Loading...
Searching...
No Matches
as_cluster.h
Go to the documentation of this file.
1/*
2 * Copyright 2008-2024 Aerospike, Inc.
3 *
4 * Portions may be licensed to Aerospike, Inc. under one or more contributor
5 * license agreements.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8 * use this file except in compliance with the License. You may obtain a copy of
9 * the License at http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations under
15 * the License.
16 */
17#pragma once
18
19#include <aerospike/as_atomic.h>
20#include <aerospike/as_config.h>
22#include <aerospike/as_node.h>
24#include <aerospike/as_policy.h>
26
27#ifdef __cplusplus
28extern "C" {
29#endif
30
31/******************************************************************************
32 * TYPES
33 *****************************************************************************/
34
35/**
36 * @private
37 * Reference counted array of server node pointers.
38 */
39typedef struct as_nodes_s {
40 /**
41 * @private
42 * Reference count of node array.
43 */
44 uint32_t ref_count;
45
46 /**
47 * @private
48 * Length of node array.
49 */
50 uint32_t size;
51
52 /**
53 * @private
54 * Server node array.
55 */
56 as_node* array[];
57} as_nodes;
58
59/**
60 * @private
61 * Cluster state for an event loop.
62 */
63typedef struct as_event_state_s {
64 /**
65 * Cluster's pending command count for this event loop.
66 */
68
69 /**
70 * Is cluster closed for this event loop.
71 */
72 bool closed;
74
75/**
76 * @private
77 * Reference counted release function definition.
78 */
79typedef void (*as_release_fn) (void* value);
80
81/**
82 * @private
83 * Reference counted data to be garbage collected.
84 */
85typedef struct as_gc_item_s {
86 /**
87 * @private
88 * Reference counted data to be garbage collected.
89 */
90 void* data;
91
92 /**
93 * @private
94 * Release function.
95 */
98
99/**
100 * Cluster of server nodes.
101 */
102typedef struct as_cluster_s {
103 /**
104 * @private
105 * Active nodes in cluster.
106 */
108
109 /**
110 * @private
111 * Hints for best node for a partition.
112 */
114
115 /**
116 * @private
117 * Nodes to be garbage collected.
118 */
119 as_vector* /* <as_gc_item> */ gc;
120
121 /**
122 * @private
123 * Shared memory implementation of cluster.
124 */
125 struct as_shm_info_s* shm_info;
126
127 /**
128 * @private
129 * User name in UTF-8 encoded bytes.
130 */
131 char* user;
132
133 /**
134 * @private
135 * Password in clear text.
136 */
137 char* password;
138
139 /**
140 * @private
141 * Password in hashed format.
142 */
144
145 /**
146 * @private
147 * Expected cluster name for all nodes. May be null.
148 */
150
151 /**
152 * Cluster event function that will be called when nodes are added/removed from the cluster.
153 */
154 as_cluster_event_callback event_callback;
155
156 /**
157 * Cluster event user data that will be passed back to event_callback.
158 */
160
161 /**
162 * Cluster state for all event loops.
163 */
165
166 /**
167 * @private
168 * Initial seed hosts specified by user.
169 */
170 as_vector* /* <as_host> */ seeds;
171
172 /**
173 * @private
174 * An IP translation table is used in cases where different clients use different server
175 * IP addresses. This may be necessary when using clients from both inside and outside
176 * a local area network. Default is no translation.
177 *
178 * The key is the IP address returned from friend info requests to other servers. The
179 * value is the real IP address used to connect to the server.
180 */
181 as_vector* /* <as_addr_map> */ ip_map;
182
183 /**
184 * @private
185 * TLS parameters
186 */
188
189 /**
190 * @private
191 * Pool of threads used to query server nodes in parallel for batch, scan and query.
192 */
194
195 /**
196 * @private
197 * Cluster tend thread.
198 */
199 pthread_t tend_thread;
200
201 /**
202 * @private
203 * Lock for adding/removing seeds.
204 */
205 pthread_mutex_t seed_lock;
206
207 /**
208 * @private
209 * Lock for metrics operations.
210 */
211 pthread_mutex_t metrics_lock;
212
213 /**
214 * @private
215 * Lock for the tend thread to wait on with the tend interval as timeout.
216 * Normally locked, resulting in waiting a full interval between
217 * tend iterations. Upon cluster shutdown, unlocked by the main
218 * thread, allowing a fast termination of the tend thread.
219 */
220 pthread_mutex_t tend_lock;
221
222 /**
223 * @private
224 * Tend thread condition variable to be used with tend_lock.
225 */
226 pthread_cond_t tend_cond;
227
228 /**
229 * @private
230 * Maximum socket idle to validate connections in transactions.
231 */
233
234 /**
235 * @private
236 * Maximum socket idle to trim peak connections to min connections.
237 */
239
240 /**
241 * @private
242 * Rack ids
243 */
245
246 /**
247 * @private
248 * Rack ids size
249 */
251
252 /**
253 * @private
254 * Max errors per node per error_rate_window.
255 */
257
258 /**
259 * @private
260 * Number of tend iterations defining window for max_error_rate.
261 */
263
264 /**
265 * @private
266 * Milliseconds between cluster tends.
267 */
269
270 /**
271 * @private
272 * Cluster tend counter.
273 */
274 uint32_t tend_count;
275
276 /**
277 * @private
278 * Minimum sync connections per node.
279 */
281
282 /**
283 * @private
284 * Maximum sync connections per node.
285 */
287
288 /**
289 * @private
290 * Minimum async connections per node.
291 */
293
294 /**
295 * @private
296 * Maximum async (non-pipeline) connections per node.
297 */
299
300 /**
301 * @private
302 * Maximum pipeline connections per node.
303 */
305
306 /**
307 * @private
308 * Number of synchronous connection pools used for each node.
309 */
311
312 /**
313 * @private
314 * Initial connection timeout in milliseconds.
315 */
317
318 /**
319 * @private
320 * Node login timeout in milliseconds.
321 */
323
324 /**
325 * @private
326 * Random node index.
327 */
328 uint32_t node_index;
329
330 /**
331 * @private
332 * Count of add node failures in the most recent cluster tend iteration.
333 */
335
336 /**
337 * @private
338 * Assign tend thread to this specific CPU ID.
339 */
341
342 /**
343 * @private
344 * Authentication mode.
345 */
346 as_auth_mode auth_mode;
347
348 /**
349 * @private
350 * Total number of data partitions used by cluster.
351 */
352 uint16_t n_partitions;
353
354 /**
355 * @private
356 * If "services-alternate" should be used instead of "services"
357 */
359
360 /**
361 * @private
362 * Request server rack ids.
363 */
365
366 /**
367 * @private
368 * Is authentication enabled
369 */
371
372 /**
373 * @private
374 * Does cluster support partition queries.
375 */
377
378 /**
379 * @private
380 * Fail on cluster init if seed node and all peers are not reachable.
381 */
383
384 /**
385 * @private
386 * Should continue to tend cluster.
387 */
388 volatile bool valid;
389
390 /**
391 * @private
392 * Is metrics collection enabled.
393 */
395
396 /**
397 * @private
398 * Number of cluster tend iterations between metrics notification events. One tend iteration
399 * is defined as as_config.tender_interval (default 1 second) plus the time to tend all
400 * nodes. This is set using as_policy_metrics.
401 */
403
404 /**
405 * @private
406 * Number of elapsed time range buckets in latency histograms. This is set using as_policy_metrics.
407 */
409
410 /**
411 * @private
412 * Power of 2 multiple between each range bucket in latency histograms starting at column 3. The bucket units
413 * are in milliseconds. The first 2 buckets are "<=1ms" and ">1ms". Examples:
414 *
415 * ~~~~~~~~~~{.c}
416 * // latencyColumns=7 latencyShift=1
417 * <=1ms >1ms >2ms >4ms >8ms >16ms >32ms
418 *
419 * // latencyColumns=5 latencyShift=3
420 * <=1ms >1ms >8ms >64ms >512ms
421 * ~~~~~~~~~~
422 *
423 * This is set using as_policy_metrics.
424 */
426
427 /**
428 * @private
429 * Listeners that handle metrics notification events. The default listener implementation
430 * writes the metrics snapshot to a file which will later be read and forwarded to
431 * OpenTelemetry by a separate offline application.
432 *
433 * The listener could be overridden to send the metrics snapshot directly to OpenTelemetry.
434 *
435 * This is set using as_policy_metrics.
436 */
438
439 /**
440 * @private
441 * Transaction retry count. There can be multiple retries for a single transaction.
442 * The value is cumulative and not reset per metrics interval.
443 */
444 uint64_t retry_count;
445
446 /**
447 * @private
448 * Transaction count. The value is cumulative and not reset per metrics interval.
449 */
450 uint64_t tran_count;
451
452 /**
453 * @private
454 * Delay queue timeout count. The value is cumulative and not reset per metrics interval.
455 */
457
458} as_cluster;
459
460/******************************************************************************
461 * FUNCTIONS
462 ******************************************************************************/
463
464/**
465 * Create and initialize cluster.
466 */
469
470/**
471 * Close all connections and release memory associated with cluster.
472 */
473void
475
476/**
477 * Is cluster connected to any server nodes.
478 */
479bool
481
482/**
483 * Get all node names in cluster.
484 */
485void
486as_cluster_get_node_names(as_cluster* cluster, int* n_nodes, char** node_names);
487
488/**
489 * Reserve reference counted access to cluster nodes.
490 */
491static inline as_nodes*
493{
494 as_nodes* nodes = (as_nodes*)as_load_ptr((void* const*)&cluster->nodes);
495 as_incr_uint32(&nodes->ref_count);
496 return nodes;
497}
498
499/**
500 * Release each individual node and free nodes struct.
501 */
502AS_EXTERN void
504
505/**
506 * Release reference counted access to cluster nodes.
507 */
508static inline void
510{
511 if (as_aaf_uint32_rls(&nodes->ref_count, -1) == 0) {
512 as_fence_acq();
513 as_nodes_destroy(nodes);
514 }
515}
516
517/**
518 * Reserve nodes. Return error if cluster is empty.
519 */
522
523/**
524 * Release nodes.
525 */
526static inline void
531
532/**
533 * Verify cluster contains nodes and return node count.
534 */
536as_cluster_validate_size(as_cluster* cluster, as_error* err, uint32_t* size);
537
538/**
539 * Add seed to cluster.
540 */
541AS_EXTERN void
542as_cluster_add_seed(as_cluster* cluster, const char* hostname, const char* tls_name, uint16_t port);
543
544/**
545 * Remove seed from cluster.
546 */
547AS_EXTERN void
548as_cluster_remove_seed(as_cluster* cluster, const char* hostname, uint16_t port);
549
550/**
551 * @private
552 * Change user and password that is used to authenticate with cluster servers.
553 */
554void
555as_cluster_change_password(as_cluster* cluster, const char* user, const char* password, const char* password_hash);
556
557/**
558 * @private
559 * Get random node in the cluster.
560 * as_node_release() must be called when done with node.
561 */
564
565/**
566 * @private
567 * Get node given node name.
568 * as_node_release() must be called when done with node.
569 */
571as_node_get_by_name(as_cluster* cluster, const char* name);
572
573/**
574 * @private
575 * Get mapped node given partition and replica. This function does not reserve the node.
576 * The caller must reserve the node for future use.
577 */
578as_node*
580 as_cluster* cluster, const char* ns, as_partition* p, as_node* prev_node,
581 as_policy_replica replica, uint8_t replica_size, uint8_t* replica_index
582 );
583
584struct as_partition_shm_s;
585
586/**
587 * @private
588 * Get mapped node given partition and replica. The function does not reserve the node.
589 * The caller must reserve the node for future use.
590 */
591as_node*
593 as_cluster* cluster, const char* ns, struct as_partition_shm_s* partition,
594 as_node* prev_node, as_policy_replica replica, uint8_t replica_size, uint8_t* replica_index
595 );
596
597/**
598 * @private
599 * Enable the collection of metrics
600 */
603
604/**
605 * @private
606 * Disable the collection of metrics
607 */
610
611/**
612 * @private
613 * Increment transaction count when metrics are enabled.
614 */
615static inline void
617{
618 if (cluster->metrics_enabled) {
619 as_incr_uint64(&cluster->tran_count);
620 }
621}
622
623/**
624 * @private
625 * Return transaction count. The value is cumulative and not reset per metrics interval.
626 */
627static inline uint64_t
629{
630 return as_load_uint64(&cluster->tran_count);
631}
632
633/**
634 * @private
635 * Increment transaction retry count.
636 */
637static inline void
639{
640 as_incr_uint64(&cluster->retry_count);
641}
642
643/**
644 * @private
645 * Add transaction retry count. There can be multiple retries for a single transaction.
646 */
647static inline void
648as_cluster_add_retries(as_cluster* cluster, uint32_t count)
649{
650 as_faa_uint64(&cluster->retry_count, count);
651}
652
653/**
654 * @private
655 * Return transaction retry count. The value is cumulative and not reset per metrics interval.
656 */
657static inline uint64_t
659{
660 return as_load_uint64(&cluster->retry_count);
661}
662
663/**
664 * @private
665 * Increment async delay queue timeout count.
666 */
667static inline void
672
673/**
674 * @private
675 * Return async delay queue timeout count.
676 */
677static inline uint64_t
682
683/**
684 * @private
685 * Get mapped node given partition and replica. This function does not reserve the node.
686 * The caller must reserve the node for future use.
687 */
688static inline as_node*
690 as_cluster* cluster, const char* ns, void* partition, as_node* prev_node,
691 as_policy_replica replica, uint8_t replica_size, uint8_t* replica_index
692 )
693{
694 if (cluster->shm_info) {
695 return as_partition_shm_get_node(cluster, ns, (struct as_partition_shm_s*)partition,
696 prev_node, replica, replica_size, replica_index);
697 }
698 else {
699 return as_partition_reg_get_node(cluster, ns, (as_partition*)partition, prev_node,
700 replica, replica_size, replica_index);
701 }
702}
703
704/**
705 * @private
706 * Increment node's error count.
707 */
708static inline void
710{
711 if (node->cluster->max_error_rate > 0) {
713 }
714}
715
716/**
717 * @private
718 * Reset node's error count.
719 */
720static inline void
725
726/**
727 * @private
728 * Get node's error count.
729 */
730static inline uint32_t
732{
733 return as_load_uint32(&node->error_rate);
734}
735
736/**
737 * @private
738 * Validate node's error count.
739 */
740static inline bool
742{
743 uint32_t max = node->cluster->max_error_rate;
744 return max == 0 || max >= as_load_uint32(&node->error_rate);
745}
746
747/**
748 * @private
749 * Close connection and increment node's error count.
750 */
751static inline void
753{
754 as_node_close_connection(node, sock, pool);
756}
757
758/**
759 * @private
760 * Put connection in pool and increment node's error count.
761 */
762static inline void
768
769#ifdef __cplusplus
770} // end extern "C"
771#endif
#define as_fence_acq()
#define as_load_ptr(_target)
#define as_incr_uint64(_target)
#define as_faa_uint64(_target, _value)
#define as_incr_uint32(_target)
#define as_load_uint64(_target)
#define as_aaf_uint32_rls(_target, _value)
#define as_load_uint32(_target)
#define as_store_uint32(_target, _value)
static void as_node_close_conn_error(as_node *node, as_socket *sock, as_conn_pool *pool)
Definition as_cluster.h:752
static uint64_t as_cluster_get_tran_count(const as_cluster *cluster)
Definition as_cluster.h:628
as_status as_cluster_validate_size(as_cluster *cluster, as_error *err, uint32_t *size)
static uint64_t as_cluster_get_delay_queue_timeout_count(const as_cluster *cluster)
Definition as_cluster.h:678
void as_cluster_destroy(as_cluster *cluster)
AS_EXTERN void as_cluster_remove_seed(as_cluster *cluster, const char *hostname, uint16_t port)
static void as_cluster_add_retries(as_cluster *cluster, uint32_t count)
Definition as_cluster.h:648
as_node * as_partition_reg_get_node(as_cluster *cluster, const char *ns, as_partition *p, as_node *prev_node, as_policy_replica replica, uint8_t replica_size, uint8_t *replica_index)
void as_cluster_change_password(as_cluster *cluster, const char *user, const char *password, const char *password_hash)
static void as_node_incr_error_rate(as_node *node)
Definition as_cluster.h:709
static void as_cluster_release_all_nodes(as_nodes *nodes)
Definition as_cluster.h:527
static as_nodes * as_nodes_reserve(as_cluster *cluster)
Definition as_cluster.h:492
as_node * as_partition_shm_get_node(as_cluster *cluster, const char *ns, struct as_partition_shm_s *partition, as_node *prev_node, as_policy_replica replica, uint8_t replica_size, uint8_t *replica_index)
static void as_cluster_add_delay_queue_timeout(as_cluster *cluster)
Definition as_cluster.h:668
static void as_cluster_add_retry(as_cluster *cluster)
Definition as_cluster.h:638
as_status as_cluster_enable_metrics(as_error *err, as_cluster *cluster, as_metrics_policy *policy)
as_status as_cluster_create(as_config *config, as_error *err, as_cluster **cluster)
static uint64_t as_cluster_get_retry_count(const as_cluster *cluster)
Definition as_cluster.h:658
AS_EXTERN void as_nodes_destroy(as_nodes *nodes)
static bool as_node_valid_error_rate(as_node *node)
Definition as_cluster.h:741
void as_cluster_get_node_names(as_cluster *cluster, int *n_nodes, char **node_names)
static as_node * as_partition_get_node(as_cluster *cluster, const char *ns, void *partition, as_node *prev_node, as_policy_replica replica, uint8_t replica_size, uint8_t *replica_index)
Definition as_cluster.h:689
static void as_nodes_release(as_nodes *nodes)
Definition as_cluster.h:509
static uint32_t as_node_get_error_rate(as_node *node)
Definition as_cluster.h:731
as_status as_cluster_disable_metrics(as_error *err, as_cluster *cluster)
bool as_cluster_is_connected(as_cluster *cluster)
static void as_node_put_conn_error(as_node *node, as_socket *sock)
Definition as_cluster.h:763
AS_EXTERN as_status as_cluster_reserve_all_nodes(as_cluster *cluster, as_error *err, as_nodes **nodes)
AS_EXTERN as_node * as_node_get_by_name(as_cluster *cluster, const char *name)
void(* as_release_fn)(void *value)
Definition as_cluster.h:79
static void as_cluster_add_tran(as_cluster *cluster)
Definition as_cluster.h:616
AS_EXTERN as_node * as_node_get_random(as_cluster *cluster)
static void as_node_reset_error_rate(as_node *node)
Definition as_cluster.h:721
AS_EXTERN void as_cluster_add_seed(as_cluster *cluster, const char *hostname, const char *tls_name, uint16_t port)
static void as_node_close_connection(as_node *node, as_socket *sock, as_conn_pool *pool)
Definition as_node.h:593
static void as_node_put_connection(as_node *node, as_socket *sock)
Definition as_node.h:616
as_status
Definition as_status.h:30
#define AS_EXTERN
Definition as_std.h:25
as_policy_replica
Definition as_policy.h:272
as_partition_tables partition_tables
Definition as_cluster.h:113
uint32_t metrics_latency_shift
Definition as_cluster.h:425
pthread_cond_t tend_cond
Definition as_cluster.h:226
as_cluster_event_callback event_callback
Definition as_cluster.h:154
bool fail_if_not_connected
Definition as_cluster.h:382
int tend_thread_cpu
Definition as_cluster.h:340
uint32_t login_timeout_ms
Definition as_cluster.h:322
pthread_t tend_thread
Definition as_cluster.h:199
as_vector * seeds
Definition as_cluster.h:170
as_auth_mode auth_mode
Definition as_cluster.h:346
int * rack_ids
Definition as_cluster.h:244
uint16_t n_partitions
Definition as_cluster.h:352
uint32_t tend_count
Definition as_cluster.h:274
uint32_t rack_ids_size
Definition as_cluster.h:250
bool use_services_alternate
Definition as_cluster.h:358
uint64_t delay_queue_timeout_count
Definition as_cluster.h:456
uint32_t async_min_conns_per_node
Definition as_cluster.h:292
uint32_t tend_interval
Definition as_cluster.h:268
uint32_t metrics_interval
Definition as_cluster.h:402
char * cluster_name
Definition as_cluster.h:149
uint64_t tran_count
Definition as_cluster.h:450
as_event_state * event_state
Definition as_cluster.h:164
volatile bool valid
Definition as_cluster.h:388
void * event_callback_udata
Definition as_cluster.h:159
uint32_t min_conns_per_node
Definition as_cluster.h:280
uint32_t conn_timeout_ms
Definition as_cluster.h:316
bool auth_enabled
Definition as_cluster.h:370
uint32_t conn_pools_per_node
Definition as_cluster.h:310
as_vector * gc
Definition as_cluster.h:119
as_thread_pool thread_pool
Definition as_cluster.h:193
char * password_hash
Definition as_cluster.h:143
char * user
Definition as_cluster.h:131
bool metrics_enabled
Definition as_cluster.h:394
pthread_mutex_t seed_lock
Definition as_cluster.h:205
as_tls_context * tls_ctx
Definition as_cluster.h:187
uint64_t max_socket_idle_ns_trim
Definition as_cluster.h:238
as_metrics_listeners metrics_listeners
Definition as_cluster.h:437
uint32_t node_index
Definition as_cluster.h:328
char * password
Definition as_cluster.h:137
pthread_mutex_t metrics_lock
Definition as_cluster.h:211
pthread_mutex_t tend_lock
Definition as_cluster.h:220
uint32_t error_rate_window
Definition as_cluster.h:262
uint32_t max_conns_per_node
Definition as_cluster.h:286
uint32_t invalid_node_count
Definition as_cluster.h:334
bool has_partition_query
Definition as_cluster.h:376
uint64_t retry_count
Definition as_cluster.h:444
bool rack_aware
Definition as_cluster.h:364
uint32_t pipe_max_conns_per_node
Definition as_cluster.h:304
as_nodes * nodes
Definition as_cluster.h:107
struct as_shm_info_s * shm_info
Definition as_cluster.h:125
uint32_t async_max_conns_per_node
Definition as_cluster.h:298
uint64_t max_socket_idle_ns_tran
Definition as_cluster.h:232
as_vector * ip_map
Definition as_cluster.h:181
uint32_t max_error_rate
Definition as_cluster.h:256
uint32_t metrics_latency_columns
Definition as_cluster.h:408
void * data
Definition as_cluster.h:90
as_release_fn release_fn
Definition as_cluster.h:96
struct as_cluster_s * cluster
Definition as_node.h:280
uint32_t error_rate
Definition as_node.h:348
uint32_t ref_count
Definition as_cluster.h:44
uint32_t size
Definition as_cluster.h:50