Loading...
Searching...
No Matches
as_partition_tracker.h
Go to the documentation of this file.
1/*
2 * Copyright 2008-2024 Aerospike, Inc.
3 *
4 * Portions may be licensed to Aerospike, Inc. under one or more contributor
5 * license agreements.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8 * use this file except in compliance with the License. You may obtain a copy of
9 * the License at http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations under
15 * the License.
16 */
17#pragma once
18
19#include <aerospike/as_std.h>
22#include <aerospike/as_policy.h>
23#include <aerospike/as_vector.h>
24#include <pthread.h>
25
26#ifdef __cplusplus
27extern "C" {
28#endif
29
30/******************************************************************************
31 * TYPES
32 *****************************************************************************/
33struct as_node_s;
34struct as_cluster_s;
35struct as_error_s;
36
37/**
38 * @private
39 * List of partitions assigned to a node.
40 */
41typedef struct as_node_partitions_s {
42 struct as_node_s* node;
45 uint64_t record_count;
46 uint64_t record_max;
48 bool retry;
50
51/**
52 * @private
53 * Scan/Query partition tracker.
54 */
55typedef struct as_partition_tracker_s {
56 pthread_mutex_t lock;
58 uint32_t node_capacity;
59 struct as_node_s* node_filter;
62 uint64_t max_records;
63 uint64_t record_count;
64 uint64_t deadline;
69 uint32_t total_timeout;
70 uint32_t max_retries;
71 uint32_t iteration;
74
75/******************************************************************************
76 * FUNCTIONS
77 ******************************************************************************/
78
79void
81 as_partition_tracker* pt, struct as_cluster_s* cluster, const as_policy_base* policy,
82 uint64_t max_records, as_policy_replica replica, as_partitions_status** parts_all,
83 bool paginate, uint32_t cluster_size
84 );
85
86void
88 as_partition_tracker* pt, struct as_cluster_s* cluster, const as_policy_base* policy,
89 uint64_t max_records, as_policy_replica replica, as_partitions_status** parts_all, bool paginate, struct as_node_s* node
90 );
91
94 as_partition_tracker* pt, struct as_cluster_s* cluster, const as_policy_base* policy,
95 uint64_t max_records, as_policy_replica replica, as_partitions_status** parts_all,
96 bool paginate, uint32_t cluster_size, as_partition_filter* pf, struct as_error_s* err
97 );
98
101 as_partition_tracker* pt, struct as_cluster_s* cluster, const char* ns, struct as_error_s* err
102 );
103
104void
106 as_partition_tracker* pt, as_node_partitions* np, uint32_t part_id
107 );
108
109static inline void
111 as_partition_tracker* pt, as_node_partitions* np, as_digest* digest, uint32_t n_partitions
112 )
113{
114 uint32_t part_id = as_partition_getid(digest->value, n_partitions);
116 ps->parts[part_id - ps->part_begin].digest = *digest;
117 np->record_count++;
118}
119
120static inline void
122 as_partition_tracker* pt, as_node_partitions* np, as_digest* digest, uint64_t bval,
123 uint32_t n_partitions
124 )
125{
126 uint32_t part_id = as_partition_getid(digest->value, n_partitions);
128 as_partition_status* p = &ps->parts[part_id - ps->part_begin];
129 p->digest = *digest;
130 p->bval = bval;
131 np->record_count++;
132}
133
134static inline bool
136{
137 // Sync scan/query runs in multiple threads, so atomics are required.
138 if (pt && pt->check_max && (as_aaf_uint64(&pt->record_count, 1) > pt->max_records)) {
139 // Record was returned, but would exceed max_records. Discard record
140 // and mark node for retry on next scan/query page.
141 np->retry = true;
142 return true;
143 }
144 return false;
145}
146
147static inline bool
149{
150 // Async scan/query runs in a single event loop thread, so atomics are not necessary.
151 if (pt && pt->check_max && (++pt->record_count > pt->max_records)) {
152 // Record was returned, but would exceed max_records. Discard record
153 // and mark node for retry on next scan/query page.
154 np->retry = true;
155 return true;
156 }
157 return false;
158}
159
160static inline uint16_t
162{
163 return *(uint16_t*)as_vector_get(list, index);
164}
165
166static inline as_partition_status*
168{
169 uint16_t part_id = *(uint16_t*)as_vector_get(list, index);
171 return &ps->parts[part_id - ps->part_begin];
172}
173
176 as_partition_tracker* pt, struct as_cluster_s* cluster, struct as_error_s* err
177 );
178
179bool
182 );
183
184void
186
187static inline void
189{
190 // Mark all partitions for retry on fatal errors.
191 if (parts_all) {
192 parts_all->retry = true;
193 }
194}
195
196#ifdef __cplusplus
197} // end extern "C"
198#endif
#define as_aaf_uint64(_target, _value)
static uint32_t as_partition_getid(const uint8_t *digest, uint32_t n_partitions)
void as_partition_tracker_init_nodes(as_partition_tracker *pt, struct as_cluster_s *cluster, const as_policy_base *policy, uint64_t max_records, as_policy_replica replica, as_partitions_status **parts_all, bool paginate, uint32_t cluster_size)
static as_partition_status * as_partition_tracker_get_status(as_partition_tracker *pt, as_vector *list, uint32_t index)
bool as_partition_tracker_should_retry(as_partition_tracker *pt, as_node_partitions *np, as_status status)
void as_partition_tracker_init_node(as_partition_tracker *pt, struct as_cluster_s *cluster, const as_policy_base *policy, uint64_t max_records, as_policy_replica replica, as_partitions_status **parts_all, bool paginate, struct as_node_s *node)
static bool as_partition_tracker_reached_max_records_sync(as_partition_tracker *pt, as_node_partitions *np)
static void as_partition_tracker_set_last(as_partition_tracker *pt, as_node_partitions *np, as_digest *digest, uint64_t bval, uint32_t n_partitions)
as_status as_partition_tracker_assign(as_partition_tracker *pt, struct as_cluster_s *cluster, const char *ns, struct as_error_s *err)
static void as_partition_tracker_set_digest(as_partition_tracker *pt, as_node_partitions *np, as_digest *digest, uint32_t n_partitions)
static uint16_t as_partition_tracker_get_id(as_vector *list, uint32_t index)
static bool as_partition_tracker_reached_max_records_async(as_partition_tracker *pt, as_node_partitions *np)
as_status as_partition_tracker_init_filter(as_partition_tracker *pt, struct as_cluster_s *cluster, const as_policy_base *policy, uint64_t max_records, as_policy_replica replica, as_partitions_status **parts_all, bool paginate, uint32_t cluster_size, as_partition_filter *pf, struct as_error_s *err)
as_status as_partition_tracker_is_complete(as_partition_tracker *pt, struct as_cluster_s *cluster, struct as_error_s *err)
void as_partition_tracker_part_unavailable(as_partition_tracker *pt, as_node_partitions *np, uint32_t part_id)
void as_partition_tracker_destroy(as_partition_tracker *pt)
static void as_partition_error(as_partitions_status *parts_all)
as_status
Definition as_status.h:30
static void * as_vector_get(as_vector *vector, uint32_t index)
Definition as_vector.h:112
as_policy_replica
Definition as_policy.h:272
as_digest_value value
Definition as_key.h:82
struct as_node_s * node
struct as_node_s * node_filter
as_partitions_status * parts_all
as_policy_replica replica
as_partition_status parts[]