All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
as_node.h
Go to the documentation of this file.
1/*
2 * Copyright 2008-2025 Aerospike, Inc.
3 *
4 * Portions may be licensed to Aerospike, Inc. under one or more contributor
5 * license agreements.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8 * use this file except in compliance with the License. You may obtain a copy of
9 * the License at http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations under
15 * the License.
16 */
17#pragma once
18
19#include <aerospike/as_atomic.h>
20#include <aerospike/as_config.h>
22#include <aerospike/as_error.h>
23#include <aerospike/as_event.h>
25#include <aerospike/as_socket.h>
27#include <aerospike/as_queue.h>
28#include <aerospike/as_vector.h>
29
30#if !defined(_MSC_VER)
31#include <netinet/in.h>
32#include <sys/uio.h>
33#endif
34
35#ifdef __cplusplus
36extern "C" {
37#endif
38
39/******************************************************************************
40 * MACROS
41 *****************************************************************************/
42
43/**
44 * Maximum size (including NULL byte) of a hostname.
45 */
46#define AS_HOSTNAME_SIZE 256
47
48/**
49 * Maximum size of node name
50 */
51#define AS_NODE_NAME_SIZE 20
52
53// Leave this is in for backwards compatibility.
54#define AS_NODE_NAME_MAX_SIZE AS_NODE_NAME_SIZE
55
56#define AS_FEATURES_PARTITION_SCAN (1 << 0)
57#define AS_FEATURES_QUERY_SHOW (1 << 1)
58#define AS_FEATURES_BATCH_ANY (1 << 2)
59#define AS_FEATURES_PARTITION_QUERY (1 << 3)
60
61#define AS_ADDRESS4_MAX 4
62#define AS_ADDRESS6_MAX 8
63
64/******************************************************************************
65 * TYPES
66 *****************************************************************************/
67
68/**
69 * Socket address information.
70 */
71typedef struct as_address_s {
72 /**
73 * Socket IP address.
74 */
75 struct sockaddr_storage addr;
76
77 /**
78 * Socket IP address string representation including port.
79 */
81
83
84/**
85 * @private
86 * Rack.
87 */
88typedef struct as_rack_s {
89 /**
90 * Namespace
91 */
93
94 /**
95 * Rack ID
96 */
98
99} as_rack;
100
101/**
102 * @private
103 * Racks.
104 */
105typedef struct as_racks_s {
106 /**
107 * Reference count of racks array.
108 */
109 uint32_t ref_count;
110
111 /**
112 * Rack ID when all namespaces use same rack.
113 */
115
116 /**
117 * Length of racks array.
118 */
119 uint32_t size;
120
121 /**
122 * Pad to 8 byte boundary.
123 */
124 uint32_t pad;
125
126 /**
127 * Racks array.
128 */
129 as_rack racks[];
130
131} as_racks;
132
133/**
134 * @private
135 * Session info.
136 */
137typedef struct as_session_s {
138 /**
139 * Reference count of session.
140 */
141 uint32_t ref_count;
142
143 /**
144 * Session token length.
145 */
146 uint32_t token_length;
147
148 /**
149 * Session expiration for this node.
150 */
151 uint64_t expiration;
152
153 /**
154 * Session token for this node.
155 */
156 uint8_t token[];
157
158} as_session;
159
160/**
161 * @private
162 * Async connection pool.
163 */
164typedef struct as_async_conn_pool_s {
165 /**
166 * Async connection queue.
167 */
169
170 /**
171 * Min connections allowed for this pool.
172 */
173 uint32_t min_size;
174
175 /**
176 * Max connections allowed for this pool.
177 */
178 uint32_t limit;
179
180 /**
181 * Total async connections opened.
182 */
183 uint32_t opened;
184
185 /**
186 * Total async connections closed.
187 */
188 uint32_t closed;
189
191
192/**
193 * Node metrics latency bucket struct
194 */
195typedef struct as_node_metrics_s {
198
199struct as_cluster_s;
200
201/**
202 * Server node representation.
203 */
204typedef struct as_node_s {
205 /**
206 * Reference count of node.
207 */
208 uint32_t ref_count;
209
210 /**
211 * Reference count of node in partition maps.
212 */
214
215 /**
216 * Server's generation count for partition management.
217 */
219
220 /**
221 * Features supported by server. Stored in bitmap.
222 */
223 uint32_t features;
224
225 /**
226 * TLS certificate name (needed for TLS only, NULL otherwise).
227 */
228 char* tls_name;
229
230 /**
231 * The name of the node.
232 */
234
235 /**
236 * Primary address index into addresses array.
237 */
239
240 /**
241 * Number of IPv4 addresses.
242 */
244
245 /**
246 * Number of IPv6 addresses.
247 */
249
250 /**
251 * Array of IP addresses. Not thread-safe.
252 */
254
255 /**
256 * Optional hostname. Not thread-safe.
257 */
258 char* hostname;
259
260 /**
261 * Cluster from which this node resides.
262 */
263 struct as_cluster_s* cluster;
264
265 /**
266 * Pools of current, cached sockets.
267 */
269
270 /**
271 * Array of connection pools used in async commands. There is one pool per node/event loop.
272 * Only used by event loop threads. Not thread-safe.
273 */
275
276 /**
277 * Pool of connections used in pipelined async commands. Also not thread-safe.
278 */
280
281 /**
282 * Authentication session.
283 */
285
286 /**
287 * Racks data.
288 */
290
291 /**
292 * Node metrics
293 */
295
296 /**
297 * Socket used exclusively for cluster tend thread info requests.
298 */
300
301 /**
302 * Command error count since node was initialized. If the error is retryable, multiple errors per
303 * command may occur.
304 */
305 uint64_t error_count;
306
307 /**
308 * Command timeout count since node was initialized. If the timeout is retryable (ie socketTimeout),
309 * multiple timeouts per command may occur.
310 */
312
313 /**
314 * Connection queue iterator. Not atomic by design.
315 */
316 uint32_t conn_iter;
317
318 /**
319 * Total sync connections opened.
320 */
322
323 /**
324 * Total sync connections closed.
325 */
327
328 /**
329 * Error count for this node's error_rate_window.
330 */
331 uint32_t error_rate;
332
333 /**
334 * Server's generation count for peers.
335 */
337
338 /**
339 * Number of peers returned by server node.
340 */
341 uint32_t peers_count;
342
343 /**
344 * Server's generation count for partition rebalancing.
345 */
347
348 /**
349 * Number of other nodes that consider this node a member of the cluster.
350 */
351 uint32_t friends;
352
353 /**
354 * Number of consecutive info request failures.
355 */
356 uint32_t failures;
357
358 /**
359 * Shared memory node array index.
360 */
361 uint32_t index;
362
363 /**
364 * Should user login to avoid session expiration.
365 */
367
368 /**
369 * Is node currently active.
370 */
371 uint8_t active;
372
373 /**
374 * Did partition change in current cluster tend.
375 */
377
378 /**
379 * Did rebalance generation change in current cluster tend.
380 */
382
383} as_node;
384
385/**
386 * @private
387 * Node discovery information.
388 */
389typedef struct as_node_info_s {
390 /**
391 * Node name.
392 */
394
395 /**
396 * Features supported by server. Stored in bitmap.
397 */
398 uint32_t features;
399
400 /**
401 * Host.
402 */
404
405 /**
406 * Validated socket.
407 */
409
410 /**
411 * Socket address.
412 */
413 struct sockaddr_storage addr;
414
415 /**
416 * Authentication session.
417 */
419
421
422/******************************************************************************
423 * FUNCTIONS
424 ******************************************************************************/
425
426/**
427 * @private
428 * Create new cluster node.
429 */
430as_node*
431as_node_create(struct as_cluster_s* cluster, as_node_info* node_info);
432
433/**
434 * @private
435 * Close all connections in pool and free resources.
436 */
437AS_EXTERN void
439
440/**
441 * @private
442 * Destroy node metrics.
443 */
444void
446
447/**
448 * @private
449 * Create configured minimum number of connections.
450 */
451void
453
454/**
455 * @private
456 * Check if node is active from a command thread.
457 */
458static inline bool
460{
461 return (bool)as_load_uint8_acq(&node->active);
462}
463
464/**
465 * @private
466 * Set node to inactive.
467 */
468static inline void
470{
471 // Make volatile write so changes are reflected in other threads.
472 as_store_uint8_rls(&node->active, false);
473}
474
475/**
476 * @private
477 * Read volatile node.
478 */
479static inline as_node*
481{
482 return (as_node*)as_load_ptr((void* const*)node);
483}
484
485/**
486 * @private
487 * Reserve existing cluster node.
488 */
489static inline void
491{
493}
494
495/**
496 * @private
497 * Set volatile node.
498 */
499static inline void
501{
502 as_store_ptr_rls((void**)trg, src);
503}
504
505/**
506 * @private
507 * Release existing cluster node.
508 */
509static inline void
511{
512 if (as_aaf_uint32_rls(&node->ref_count, -1) == 0) {
513 as_fence_acq();
514 as_node_destroy(node);
515 }
516}
517
518/**
519 * @private
520 * Release node on next cluster tend iteration.
521 */
522void
524
525/**
526 * @private
527 * Add socket address to node addresses.
528 */
529void
530as_node_add_address(as_node* node, struct sockaddr* addr);
531
532/**
533 * @private
534 * Set hostname.
535 */
536void
537as_node_set_hostname(as_node* node, const char* hostname);
538
539/**
540 * Get primary socket address.
541 */
542static inline as_address*
544{
545 return &node->addresses[node->address_index];
546}
547
548/**
549 * Get socket address as a string.
550 */
551static inline const char*
553{
554 return node->addresses[node->address_index].name;
555}
556
557/**
558 * @private
559 * Attempt to authenticate given current cluster's user and password.
560 */
562as_node_authenticate_connection(struct as_cluster_s* cluster, uint64_t deadline_ms);
563
564/**
565 * @private
566 * Get a connection to the given node from pool and validate. Return 0 on success.
567 */
569as_node_get_connection(as_error* err, as_node* node, uint32_t socket_timeout, uint64_t deadline_ms, as_socket* sock);
570
571/**
572 * @private
573 * Close a node's connection and update node/pool statistics.
574 */
575static inline void
582
583/**
584 * @private
585 * Close a node's connection and update node statistics.
586 */
587static inline void
593
594/**
595 * @private
596 * Put connection back into pool.
597 */
598static inline void
600{
601 // Save pool.
602 as_conn_pool* pool = sock->pool;
603
604 // Update last used timestamp.
605 sock->last_used = cf_getns();
606
607 // Put into pool.
608 if (! as_conn_pool_push_head(pool, sock)) {
609 as_node_close_connection(node, sock, pool);
610 }
611}
612
613/**
614 * @private
615 * Balance sync connections.
616 */
617void
619
620/**
621 * @private
622 * Are hosts equal.
623 */
624static inline bool
626{
627 return strcmp(h1->name, h2->name) == 0 && h1->port == h2->port;
628}
629
630/**
631 * @private
632 * Destroy node_info contents.
633 */
634static inline void
636{
637 as_socket_close(&node_info->socket);
638 cf_free(node_info->session);
639}
640
641/**
642 * @private
643 * Tell tend thread to perform another node login.
644 */
645void
647
648/**
649 * @private
650 * Does node contain rack.
651 */
652bool
653as_node_has_rack(as_node* node, const char* ns, int rack_id);
654
655/**
656 * @private
657 * Record latency of type latency_type for node
658 */
659void
660as_node_add_latency(as_node* node, as_latency_type latency_type, uint64_t elapsed);
661
662struct as_metrics_policy_s;
663
664/**
665 * @private
666 * Enable metrics at the node level
667 */
668void
669as_node_enable_metrics(as_node* node, const struct as_metrics_policy_s* policy);
670
671/**
672 * Return command error count. The value is cumulative and not reset per metrics interval.
673 */
674static inline uint64_t
676{
677 return as_load_uint64(&node->error_count);
678}
679
680/**
681 * Increment command error count. If the error is retryable, multiple errors per
682 * command may occur.
683 */
684static inline void
689
690/**
691 * Return command timeout count. The value is cumulative and not reset per metrics interval.
692 */
693static inline uint64_t
698
699/**
700 * Increment command timeout count. If the timeout is retryable (ie socketTimeout),
701 * multiple timeouts per command may occur.
702 */
703static inline void
708
709/**
710 * @private
711 * Volatile read session pointer.
712 */
713static inline as_session*
715{
716 return (as_session*)as_load_ptr((void* const*)session);
717}
718
719/**
720 * @private
721 * Release existing session.
722 */
723static inline void
725{
726 if (as_aaf_uint32_rls(&session->ref_count, -1) == 0) {
727 cf_free(session);
728 }
729}
730
731#ifdef __cplusplus
732} // end extern "C"
733#endif
#define AS_IP_ADDRESS_SIZE
Definition as_address.h:32
#define as_fence_acq()
#define as_load_ptr(_target)
#define as_incr_uint64(_target)
#define as_incr_uint32(_target)
#define as_store_ptr_rls(_target, _value)
#define as_store_uint8_rls(_target, _value)
#define as_load_uint8_acq(_target)
#define as_load_uint64(_target)
#define as_aaf_uint32_rls(_target, _value)
static bool as_conn_pool_push_head(as_conn_pool *pool, as_socket *sock)
static void as_conn_pool_decr(as_conn_pool *pool)
uint8_t as_latency_type
Definition as_latency.h:29
AS_EXTERN void as_node_destroy(as_node *node)
static void as_node_deactivate(as_node *node)
Definition as_node.h:469
static void as_node_info_destroy(as_node_info *node_info)
Definition as_node.h:635
static uint64_t as_node_get_error_count(as_node *node)
Definition as_node.h:675
void as_node_balance_connections(as_node *node)
static as_session * as_session_load(as_session **session)
Definition as_node.h:714
void as_node_create_min_connections(as_node *node)
bool as_node_has_rack(as_node *node, const char *ns, int rack_id)
static void as_session_release(as_session *session)
Definition as_node.h:724
static as_address * as_node_get_address(as_node *node)
Definition as_node.h:543
static void as_node_close_connection(as_node *node, as_socket *sock, as_conn_pool *pool)
Definition as_node.h:576
as_node * as_node_create(struct as_cluster_s *cluster, as_node_info *node_info)
static bool as_host_equals(as_host *h1, as_host *h2)
Definition as_node.h:625
static void as_node_store(as_node **trg, as_node *src)
Definition as_node.h:500
void as_node_destroy_metrics(as_node *node)
as_status as_node_get_connection(as_error *err, as_node *node, uint32_t socket_timeout, uint64_t deadline_ms, as_socket *sock)
static bool as_node_is_active(const as_node *node)
Definition as_node.h:459
as_status as_node_authenticate_connection(struct as_cluster_s *cluster, uint64_t deadline_ms)
static const char * as_node_get_address_string(as_node *node)
Definition as_node.h:552
void as_node_add_address(as_node *node, struct sockaddr *addr)
static void as_node_close_socket(as_node *node, as_socket *sock)
Definition as_node.h:588
static as_node * as_node_load(as_node **node)
Definition as_node.h:480
#define AS_NODE_NAME_SIZE
Definition as_node.h:51
void as_node_release_delayed(as_node *node)
void as_node_enable_metrics(as_node *node, const struct as_metrics_policy_s *policy)
void as_node_add_latency(as_node *node, as_latency_type latency_type, uint64_t elapsed)
static void as_node_add_error(as_node *node)
Definition as_node.h:685
static void as_node_put_connection(as_node *node, as_socket *sock)
Definition as_node.h:599
static void as_node_reserve(as_node *node)
Definition as_node.h:490
static void as_node_add_timeout(as_node *node)
Definition as_node.h:704
void as_node_set_hostname(as_node *node, const char *hostname)
void as_node_signal_login(as_node *node)
static uint64_t as_node_get_timeout_count(as_node *node)
Definition as_node.h:694
static void as_node_release(as_node *node)
Definition as_node.h:510
#define AS_MAX_NAMESPACE_SIZE
void as_socket_close(as_socket *sock)
as_status
Definition as_status.h:30
#define AS_EXTERN
Definition as_std.h:25
char name[AS_IP_ADDRESS_SIZE]
Definition as_node.h:80
uint32_t min_size
Definition as_node.h:173
uint32_t opened
Definition as_node.h:183
uint32_t closed
Definition as_node.h:188
char * name
Definition as_host.h:37
uint16_t port
Definition as_host.h:47
as_host host
Definition as_node.h:403
as_session * session
Definition as_node.h:418
as_socket socket
Definition as_node.h:408
uint32_t features
Definition as_node.h:398
as_latency_buckets * latency
Definition as_node.h:196
as_session * session
Definition as_node.h:284
uint64_t error_count
Definition as_node.h:305
uint32_t peers_count
Definition as_node.h:341
bool rebalance_changed
Definition as_node.h:381
as_conn_pool * sync_conn_pools
Definition as_node.h:268
uint32_t ref_count
Definition as_node.h:208
as_node_metrics * metrics
Definition as_node.h:294
uint32_t address4_size
Definition as_node.h:243
uint64_t timeout_count
Definition as_node.h:311
struct as_cluster_s * cluster
Definition as_node.h:263
uint32_t partition_generation
Definition as_node.h:218
uint32_t failures
Definition as_node.h:356
uint32_t peers_generation
Definition as_node.h:336
uint32_t address_index
Definition as_node.h:238
as_address * addresses
Definition as_node.h:253
uint32_t friends
Definition as_node.h:351
as_async_conn_pool * pipe_conn_pools
Definition as_node.h:279
uint32_t sync_conns_opened
Definition as_node.h:321
char * hostname
Definition as_node.h:258
uint32_t index
Definition as_node.h:361
uint32_t conn_iter
Definition as_node.h:316
uint32_t sync_conns_closed
Definition as_node.h:326
as_async_conn_pool * async_conn_pools
Definition as_node.h:274
char * tls_name
Definition as_node.h:228
uint8_t active
Definition as_node.h:371
uint8_t perform_login
Definition as_node.h:366
uint32_t address6_size
Definition as_node.h:248
uint32_t features
Definition as_node.h:223
uint32_t error_rate
Definition as_node.h:331
as_racks * racks
Definition as_node.h:289
uint32_t partition_ref_count
Definition as_node.h:213
uint32_t rebalance_generation
Definition as_node.h:346
bool partition_changed
Definition as_node.h:376
as_socket info_socket
Definition as_node.h:299
int rack_id
Definition as_node.h:97
uint32_t ref_count
Definition as_node.h:109
uint32_t pad
Definition as_node.h:124
uint32_t size
Definition as_node.h:119
int rack_id
Definition as_node.h:114
uint32_t ref_count
Definition as_node.h:141
uint32_t token_length
Definition as_node.h:146
uint64_t expiration
Definition as_node.h:151
struct as_conn_pool_s * pool
Definition as_socket.h:87
uint64_t last_used
Definition as_socket.h:88