All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
as_async.h
Go to the documentation of this file.
1/*
2 * Copyright 2008-2025 Aerospike, Inc.
3 *
4 * Portions may be licensed to Aerospike, Inc. under one or more contributor
5 * license agreements.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8 * use this file except in compliance with the License. You may obtain a copy of
9 * the License at http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations under
15 * the License.
16 */
17#pragma once
18
24#include <citrusleaf/alloc.h>
25
26#ifdef __cplusplus
27extern "C" {
28#endif
29
30/******************************************************************************
31 * TYPES
32 *****************************************************************************/
33
// Async command type codes (0..10).
// NOTE(review): presumably these are the values stored in as_event_command.type;
// as_async_record_command_create() in this file assigns a caller-supplied `type`
// to cmd->type. Confirm against the callers.
34#define AS_ASYNC_TYPE_WRITE 0
35#define AS_ASYNC_TYPE_RECORD 1
36#define AS_ASYNC_TYPE_VALUE 2
37#define AS_ASYNC_TYPE_BATCH 3
38#define AS_ASYNC_TYPE_SCAN 4
39#define AS_ASYNC_TYPE_QUERY 5
40#define AS_ASYNC_TYPE_INFO 6
41#define AS_ASYNC_TYPE_SCAN_PARTITION 7
42#define AS_ASYNC_TYPE_QUERY_PARTITION 8
43#define AS_ASYNC_TYPE_CONNECTOR 9
44#define AS_ASYNC_TYPE_TXN_MONITOR 10
45
// Extra bytes added to every command allocation by the *_command_create()
// functions in this file; their sizing comments call it the
// "auth max buffer size".
46#define AS_AUTHENTICATION_MAX_SIZE 158
47
// Connection status codes. They are not referenced in the visible part of this
// file, so their exact usage must be confirmed at the call sites.
48#define AS_ASYNC_CONNECTION_COMPLETE 0
49#define AS_ASYNC_CONNECTION_PENDING 1
50#define AS_ASYNC_CONNECTION_ERROR 2
51
57
63
69
75
76/******************************************************************************
77 * FUNCTIONS
78 *****************************************************************************/
79
/**
 * Allocate and initialize an async write command.
 *
 * NOTE(review): this is a rendered listing, not the raw header. Listing lines
 * 81 (the function name, as_async_write_command_create per the page's
 * cross-reference), 92, 108-110, 117 and 119 are absent from this extract, so
 * the body shown here is incomplete.
 *
 * One block is allocated with cf_malloc() and sized as
 * sizeof(as_async_write_command) + size + AS_AUTHENTICATION_MAX_SIZE, rounded
 * up to a multiple of 1024 bytes.
 *
 * Visible initialization:
 * - timeouts and max_retries are copied from `policy`; `txn` is policy->txn
 * - replica is passed through as_command_write_replica()
 * - event_loop is passed through as_event_assign()
 * - ns, partition and replica_size are taken from `pi`
 * - node is NULL, iteration is 0, flags is 0, replica_index is 0
 * - udata, parse_results, pipe_listener, ubuf and ubuf_size are stored as given
 * - `listener` is stored on the write-command wrapper (wcmd)
 *
 * Returns the new command as an as_event_command pointer.
 */
80static inline as_event_command*
82 as_cluster* cluster, const as_policy_base* policy, as_partition_info* pi,
83 as_policy_replica replica, as_async_write_listener listener, void* udata,
84 as_event_loop* event_loop, as_pipe_listener pipe_listener, size_t size,
85 as_event_parse_results_fn parse_results, uint8_t* ubuf, uint32_t ubuf_size
86 )
87{
88 // Allocate enough memory to cover: struct size + write buffer size + auth max buffer size
89 // Then, round up memory size in 1KB increments.
90 size_t s = (sizeof(as_async_write_command) + size + AS_AUTHENTICATION_MAX_SIZE + 1023) & ~1023;
// NOTE(review): no NULL check on the cf_malloc() result is visible before cmd
// is dereferenced; confirm cf_malloc()'s out-of-memory behavior.
91 as_event_command* cmd = (as_event_command*)cf_malloc(s);
// NOTE(review): listing line 92 is missing here; `wcmd`, used below, is
// presumably declared there as cmd viewed as as_async_write_command*.
// The field is named total_deadline but receives the policy's timeout value
// here; any conversion to an absolute deadline is not visible in this file.
93 cmd->total_deadline = policy->total_timeout;
94 cmd->socket_timeout = policy->socket_timeout;
95 cmd->max_retries = policy->max_retries;
96 cmd->iteration = 0;
97 cmd->replica = as_command_write_replica(replica);
98 cmd->event_loop = as_event_assign(event_loop);
99 cmd->cluster = cluster;
100 cmd->node = NULL;
101 cmd->ns = pi->ns;
102 cmd->partition = pi->partition;
103 cmd->udata = udata;
104 cmd->parse_results = parse_results;
105 cmd->pipe_listener = pipe_listener;
106 cmd->buf = wcmd->space;
// Bytes left in the allocation after the struct and the `size`-byte write buffer.
107 cmd->read_capacity = (uint32_t)(s - size - sizeof(as_async_write_command));
// NOTE(review): listing lines 108-110 are missing here.
111 cmd->flags = 0;
112 cmd->replica_size = pi->replica_size;
113 cmd->replica_index = 0;
114 cmd->txn = policy->txn;
115 cmd->ubuf = ubuf;
116 cmd->ubuf_size = ubuf_size;
// NOTE(review): listing lines 117 and 119 are missing around the next statement.
118 wcmd->listener = listener;
120 return cmd;
121}
122
/**
 * Allocate and initialize an async command that delivers a record to
 * an as_async_record_listener.
 *
 * NOTE(review): this is a rendered listing, not the raw header. Listing lines
 * 124 (the function name, as_async_record_command_create per the page's
 * cross-reference), 137, 154-155, 159, 163 and 173 are absent from this
 * extract, so the body shown here is incomplete.
 *
 * One block is allocated with cf_malloc() and sized as
 * sizeof(as_async_record_command) + size + AS_AUTHENTICATION_MAX_SIZE, rounded
 * up to a multiple of 4096 bytes.
 *
 * Visible initialization:
 * - timeouts and max_retries are copied from `policy`; `txn` is policy->txn
 * - replica, replica_index, type, flags and latency_type are stored exactly as
 *   passed (replica is NOT routed through as_command_write_replica() here)
 * - event_loop is passed through as_event_assign()
 * - ns, partition and replica_size are taken from `pi`
 * - node is NULL, iteration is 0
 * - udata, parse_results, pipe_listener, ubuf and ubuf_size are stored as given
 * - `listener` is stored on the record-command wrapper (rcmd)
 *
 * Returns the new command as an as_event_command pointer.
 */
123static inline as_event_command*
125 as_cluster* cluster, const as_policy_base* policy, as_partition_info* pi,
126 as_policy_replica replica, uint8_t replica_index, bool deserialize, bool heap_rec,
127 uint8_t flags, as_async_record_listener listener, void* udata, as_event_loop* event_loop,
128 as_pipe_listener pipe_listener, size_t size, as_event_parse_results_fn parse_results,
129 uint8_t type, as_latency_type latency_type, uint8_t* ubuf, uint32_t ubuf_size
130 )
131{
132 // Allocate enough memory to cover: struct size + write buffer size + auth max buffer size
133 // Then, round up memory size in 4KB increments to reduce fragmentation and to allow socket
134 // read to reuse buffer for small socket write sizes.
135 size_t s = (sizeof(as_async_record_command) + size + AS_AUTHENTICATION_MAX_SIZE + 4095) & ~4095;
// NOTE(review): no NULL check on the cf_malloc() result is visible before cmd
// is dereferenced; confirm cf_malloc()'s out-of-memory behavior.
136 as_event_command* cmd = (as_event_command*)cf_malloc(s);
// NOTE(review): listing line 137 is missing here; `rcmd`, used below, is
// presumably declared there as cmd viewed as as_async_record_command*.
138 cmd->total_deadline = policy->total_timeout;
139 cmd->socket_timeout = policy->socket_timeout;
140 cmd->max_retries = policy->max_retries;
141 cmd->iteration = 0;
142 cmd->replica = replica;
143 cmd->event_loop = as_event_assign(event_loop);
144 cmd->cluster = cluster;
145 cmd->node = NULL;
146 cmd->ns = pi->ns;
147 cmd->partition = pi->partition;
148 cmd->udata = udata;
149 cmd->parse_results = parse_results;
150 cmd->pipe_listener = pipe_listener;
151 cmd->buf = rcmd->space;
// Bytes left in the allocation after the struct and the `size`-byte write buffer.
152 cmd->read_capacity = (uint32_t)(s - size - sizeof(as_async_record_command));
153 cmd->type = type;
// NOTE(review): listing lines 154-155 are missing here.
156 cmd->flags = flags;
157
// NOTE(review): the bodies of the two conditionals below (listing lines 159
// and 163) are missing from this extract. The page's cross-references list
// AS_ASYNC_FLAGS_DESERIALIZE and AS_ASYNC_FLAGS_HEAP_REC, so they presumably
// set those bits in cmd->flags; confirm against the real header.
158 if (deserialize) {
160 }
161
162 if (heap_rec) {
164 }
165
166 cmd->replica_size = pi->replica_size;
167 cmd->replica_index = replica_index;
168 cmd->txn = policy->txn;
169 cmd->ubuf = ubuf;
170 cmd->ubuf_size = ubuf_size;
171 cmd->latency_type = latency_type;
172 rcmd->listener = listener;
// NOTE(review): listing line 173 is missing here.
174 return cmd;
175}
176
/**
 * Allocate and initialize an async command that delivers a value to
 * an as_async_value_listener.
 *
 * NOTE(review): this is a rendered listing, not the raw header. Listing lines
 * 178 (the function name, as_async_value_command_create per the page's
 * cross-reference), 190, 206-208, 215 and 217 are absent from this extract,
 * so the body shown here is incomplete.
 *
 * One block is allocated with cf_malloc() and sized as
 * sizeof(as_async_value_command) + size + AS_AUTHENTICATION_MAX_SIZE, rounded
 * up to a multiple of 4096 bytes.
 *
 * Visible initialization:
 * - timeouts and max_retries are copied from `policy`; `txn` is policy->txn
 * - replica is passed through as_command_write_replica()
 * - event_loop is passed through as_event_assign()
 * - ns, partition and replica_size are taken from `pi`
 * - node is NULL, iteration is 0, flags is 0, replica_index is 0
 * - udata, parse_results, pipe_listener, ubuf and ubuf_size are stored as given
 * - `listener` is stored on the value-command wrapper (vcmd)
 *
 * Returns the new command as an as_event_command pointer.
 */
177static inline as_event_command*
179 as_cluster* cluster, const as_policy_base* policy, as_partition_info* pi,
180 as_policy_replica replica, as_async_value_listener listener, void* udata,
181 as_event_loop* event_loop, as_pipe_listener pipe_listener, size_t size,
182 as_event_parse_results_fn parse_results, uint8_t* ubuf, uint32_t ubuf_size
183 )
184{
185 // Allocate enough memory to cover: struct size + write buffer size + auth max buffer size
186 // Then, round up memory size in 4KB increments to reduce fragmentation and to allow socket
187 // read to reuse buffer for small socket write sizes.
188 size_t s = (sizeof(as_async_value_command) + size + AS_AUTHENTICATION_MAX_SIZE + 4095) & ~4095;
// NOTE(review): no NULL check on the cf_malloc() result is visible before cmd
// is dereferenced; confirm cf_malloc()'s out-of-memory behavior.
189 as_event_command* cmd = (as_event_command*)cf_malloc(s);
// NOTE(review): listing line 190 is missing here; `vcmd`, used below, is
// presumably declared there as cmd viewed as as_async_value_command*.
191 cmd->total_deadline = policy->total_timeout;
192 cmd->socket_timeout = policy->socket_timeout;
193 cmd->max_retries = policy->max_retries;
194 cmd->iteration = 0;
195 cmd->replica = as_command_write_replica(replica);
196 cmd->event_loop = as_event_assign(event_loop);
197 cmd->cluster = cluster;
198 cmd->node = NULL;
199 cmd->ns = pi->ns;
200 cmd->partition = pi->partition;
201 cmd->udata = udata;
202 cmd->parse_results = parse_results;
203 cmd->pipe_listener = pipe_listener;
204 cmd->buf = vcmd->space;
// Bytes left in the allocation after the struct and the `size`-byte write buffer.
205 cmd->read_capacity = (uint32_t)(s - size - sizeof(as_async_value_command));
// NOTE(review): listing lines 206-208 are missing here.
209 cmd->flags = 0;
210 cmd->replica_size = pi->replica_size;
211 cmd->replica_index = 0;
212 cmd->txn = policy->txn;
213 cmd->ubuf = ubuf;
214 cmd->ubuf_size = ubuf_size;
// NOTE(review): listing lines 215 and 217 are missing around the next statement.
216 vcmd->listener = listener;
218 return cmd;
219}
220
/**
 * Allocate and initialize an async info command bound to a specific node.
 *
 * NOTE(review): this is a rendered listing, not the raw header. Listing lines
 * 222 (the function name, as_async_info_command_create per the page's
 * cross-reference), 231, 236, 243, 247-249, 256 and 258 are absent from this
 * extract, so the body shown here is incomplete.
 *
 * One block is allocated with cf_malloc() and sized as
 * sizeof(as_async_info_command) + size + AS_AUTHENTICATION_MAX_SIZE, rounded
 * up to a multiple of 1024 bytes.
 *
 * Visible initialization:
 * - total_deadline and socket_timeout are both set from policy->timeout
 * - max_retries is fixed at 1; iteration is 0; flags is 0
 * - cluster is node->cluster and node is the supplied `node`
 * - ns, partition, pipe_listener, txn and ubuf are NULL; ubuf_size is 0
 * - replica_size is 1 and replica_index is 0
 * - event_loop is passed through as_event_assign()
 * - `listener` is stored on the info-command wrapper (icmd)
 *
 * Returns the new command as an as_event_command pointer.
 */
221static inline as_event_command*
223 as_node* node, const as_policy_info* policy, as_async_info_listener listener, void* udata,
224 as_event_loop* event_loop, size_t size
225 )
226{
227 // Allocate enough memory to cover: struct size + write buffer size + auth max buffer size
228 // Then, round up memory size in 1KB increments.
229 size_t s = (sizeof(as_async_info_command) + size + AS_AUTHENTICATION_MAX_SIZE + 1023) & ~1023;
// NOTE(review): no NULL check on the cf_malloc() result is visible before cmd
// is dereferenced; confirm cf_malloc()'s out-of-memory behavior.
230 as_event_command* cmd = (as_event_command*)cf_malloc(s);
// NOTE(review): listing line 231 is missing here; `icmd`, used below, is
// presumably declared there as cmd viewed as as_async_info_command*.
232 cmd->total_deadline = policy->timeout;
233 cmd->socket_timeout = policy->timeout;
234 cmd->max_retries = 1;
235 cmd->iteration = 0;
// NOTE(review): listing line 236 is missing here.
237 cmd->event_loop = as_event_assign(event_loop);
238 cmd->cluster = node->cluster;
239 cmd->node = node;
240 cmd->ns = NULL;
241 cmd->partition = NULL;
242 cmd->udata = udata;
// NOTE(review): listing line 243 is missing here.
244 cmd->pipe_listener = NULL;
245 cmd->buf = icmd->space;
// Bytes left in the allocation after the struct and the `size`-byte write buffer.
246 cmd->read_capacity = (uint32_t)(s - size - sizeof(as_async_info_command));
// NOTE(review): listing lines 247-249 are missing here.
250 cmd->flags = 0;
251 cmd->replica_size = 1;
252 cmd->replica_index = 0;
253 cmd->txn = NULL;
254 cmd->ubuf = NULL;
255 cmd->ubuf_size = 0;
// NOTE(review): listing lines 256 and 258 are missing around the next statement.
257 icmd->listener = listener;
259 return cmd;
260}
261
262#ifdef __cplusplus
263} // end extern "C"
264#endif
#define AS_ASYNC_TYPE_VALUE
Definition as_async.h:36
#define AS_ASYNC_TYPE_INFO
Definition as_async.h:40
static as_event_command * as_async_write_command_create(as_cluster *cluster, const as_policy_base *policy, as_partition_info *pi, as_policy_replica replica, as_async_write_listener listener, void *udata, as_event_loop *event_loop, as_pipe_listener pipe_listener, size_t size, as_event_parse_results_fn parse_results, uint8_t *ubuf, uint32_t ubuf_size)
Definition as_async.h:81
static as_event_command * as_async_record_command_create(as_cluster *cluster, const as_policy_base *policy, as_partition_info *pi, as_policy_replica replica, uint8_t replica_index, bool deserialize, bool heap_rec, uint8_t flags, as_async_record_listener listener, void *udata, as_event_loop *event_loop, as_pipe_listener pipe_listener, size_t size, as_event_parse_results_fn parse_results, uint8_t type, as_latency_type latency_type, uint8_t *ubuf, uint32_t ubuf_size)
Definition as_async.h:124
#define AS_ASYNC_TYPE_WRITE
Definition as_async.h:34
static as_event_command * as_async_value_command_create(as_cluster *cluster, const as_policy_base *policy, as_partition_info *pi, as_policy_replica replica, as_async_value_listener listener, void *udata, as_event_loop *event_loop, as_pipe_listener pipe_listener, size_t size, as_event_parse_results_fn parse_results, uint8_t *ubuf, uint32_t ubuf_size)
Definition as_async.h:178
#define AS_AUTHENTICATION_MAX_SIZE
Definition as_async.h:46
static as_event_command * as_async_info_command_create(as_node *node, const as_policy_info *policy, as_async_info_listener listener, void *udata, as_event_loop *event_loop, size_t size)
Definition as_async.h:222
static void as_cluster_add_command_count(as_cluster *cluster)
Definition as_cluster.h:616
static as_policy_replica as_command_write_replica(as_policy_replica replica)
Definition as_command.h:780
bool as_event_command_parse_info(as_event_command *cmd)
#define AS_ASYNC_STATE_UNREGISTERED
#define AS_ASYNC_FLAGS_DESERIALIZE
bool(* as_event_parse_results_fn)(struct as_event_command *cmd)
static as_event_loop * as_event_assign(as_event_loop *event_loop)
#define AS_ASYNC_FLAGS_HEAP_REC
#define AS_LATENCY_TYPE_NONE
Definition as_latency.h:36
#define AS_LATENCY_TYPE_WRITE
Definition as_latency.h:32
uint8_t as_latency_type
Definition as_latency.h:29
void(* as_pipe_listener)(void *udata, as_event_loop *event_loop)
Definition as_listener.h:72
void(* as_async_value_listener)(as_error *err, as_val *val, void *udata, as_event_loop *event_loop)
Definition as_listener.h:61
void(* as_async_write_listener)(as_error *err, void *udata, as_event_loop *event_loop)
Definition as_listener.h:36
void(* as_async_record_listener)(as_error *err, as_record *record, void *udata, as_event_loop *event_loop)
Definition as_listener.h:48
void(* as_async_info_listener)(as_error *err, char *response, void *udata, as_event_loop *event_loop)
Definition as_listener.h:85
uint8_t type
Definition as_proto.h:1
#define AS_MESSAGE_TYPE
Definition as_proto.h:38
#define AS_INFO_MESSAGE_TYPE
Definition as_proto.h:36
as_policy_replica
Definition as_policy.h:273
@ AS_POLICY_REPLICA_MASTER
Definition as_policy.h:278
as_event_command command
Definition as_async.h:71
as_async_info_listener listener
Definition as_async.h:72
as_async_record_listener listener
Definition as_async.h:60
as_event_command command
Definition as_async.h:59
as_async_value_listener listener
Definition as_async.h:66
as_event_command command
Definition as_async.h:65
as_event_command command
Definition as_async.h:53
as_async_write_listener listener
Definition as_async.h:54
struct as_txn * txn
as_latency_type latency_type
as_event_loop * event_loop
as_pipe_listener pipe_listener
as_event_parse_results_fn parse_results
as_policy_replica replica
struct as_cluster_s * cluster
Definition as_node.h:263
const char * ns
uint32_t socket_timeout
Definition as_policy.h:449
uint32_t total_timeout
Definition as_policy.h:464
struct as_txn * txn
Definition as_policy.h:538
uint32_t max_retries
Definition as_policy.h:486
uint32_t timeout
Definition as_policy.h:1486