Loading...
Searching...
No Matches
as_async.h
Go to the documentation of this file.
1/*
2 * Copyright 2008-2024 Aerospike, Inc.
3 *
4 * Portions may be licensed to Aerospike, Inc. under one or more contributor
5 * license agreements.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8 * use this file except in compliance with the License. You may obtain a copy of
9 * the License at http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations under
15 * the License.
16 */
17#pragma once
18
24#include <citrusleaf/alloc.h>
25
26#ifdef __cplusplus
27extern "C" {
28#endif
29
30/******************************************************************************
31 * TYPES
32 *****************************************************************************/
33
// Async command type identifiers, numbered consecutively from 0 to 9.
// NOTE(review): presumably stored in a command's type field; the lines that would assign
// them are not visible in this listing - confirm against the original header.
34#define AS_ASYNC_TYPE_WRITE 0
35#define AS_ASYNC_TYPE_RECORD 1
36#define AS_ASYNC_TYPE_VALUE 2
37#define AS_ASYNC_TYPE_BATCH 3
38#define AS_ASYNC_TYPE_SCAN 4
39#define AS_ASYNC_TYPE_QUERY 5
40#define AS_ASYNC_TYPE_INFO 6
41#define AS_ASYNC_TYPE_SCAN_PARTITION 7
42#define AS_ASYNC_TYPE_QUERY_PARTITION 8
43#define AS_ASYNC_TYPE_CONNECTOR 9
44
// Extra bytes added to every command allocation by the create functions in this file; their
// comments describe it as the "auth max buffer size".
45#define AS_AUTHENTICATION_MAX_SIZE 158
46
// Connection status codes.
// NOTE(review): not referenced anywhere in this listing - confirm where they are consumed.
47#define AS_ASYNC_CONNECTION_COMPLETE 0
48#define AS_ASYNC_CONNECTION_PENDING 1
49#define AS_ASYNC_CONNECTION_ERROR 2
50
56
62
68
74
75/******************************************************************************
76 * FUNCTIONS
77 *****************************************************************************/
78
/**
 * Create an async write command: allocate a single block holding the command struct plus its
 * data buffer, then fill in the common as_event_command fields.
 * The signature index at the end of this listing names this function
 * as_async_write_command_create().
 *
 * @param cluster        Stored in cmd->cluster and passed to as_cluster_add_tran().
 * @param policy         Source of total_timeout, socket_timeout and max_retries.
 * @param pi             Source of ns, partition and replica_size.
 * @param replica        Stored after being converted by as_command_write_replica().
 * @param listener       Stored in wcmd->listener.
 * @param udata          Stored unchanged in cmd->udata.
 * @param event_loop     Passed through as_event_assign(); the result is stored.
 * @param pipe_listener  Stored unchanged in cmd->pipe_listener.
 * @param size           Write buffer size in bytes; included in the allocation size.
 * @param parse_results  Stored unchanged in cmd->parse_results.
 * @return The newly allocated command.
 *
 * NOTE(review): this listing skips source lines 80, 91, 107-109 and 113 (see the embedded
 * line numbers). `wcmd` is used below but its declaration is not visible here; presumably it
 * is `cmd` viewed as as_async_write_command - confirm against the original header.
 * NOTE(review): the cf_malloc() result is dereferenced without a NULL check in the visible
 * lines - confirm cf_malloc()'s out-of-memory behavior.
 */
79static inline as_event_command*
81 as_cluster* cluster, const as_policy_base* policy, as_partition_info* pi,
82 as_policy_replica replica, as_async_write_listener listener, void* udata,
83 as_event_loop* event_loop, as_pipe_listener pipe_listener, size_t size,
84 as_event_parse_results_fn parse_results
85 )
86{
87 // Allocate enough memory to cover: struct size + write buffer size + auth max buffer size
88 // Then, round up memory size in 1KB increments.
89 size_t s = (sizeof(as_async_write_command) + size + AS_AUTHENTICATION_MAX_SIZE + 1023) & ~1023;
90 as_event_command* cmd = (as_event_command*)cf_malloc(s);
// total_deadline receives the raw timeout value from the policy at this point.
92 cmd->total_deadline = policy->total_timeout;
93 cmd->socket_timeout = policy->socket_timeout;
94 cmd->max_retries = policy->max_retries;
95 cmd->iteration = 0;
96 cmd->replica = as_command_write_replica(replica);
97 cmd->event_loop = as_event_assign(event_loop);
98 cmd->cluster = cluster;
// No node is selected at creation time.
99 cmd->node = NULL;
100 cmd->ns = pi->ns;
101 cmd->partition = pi->partition;
102 cmd->udata = udata;
103 cmd->parse_results = parse_results;
104 cmd->pipe_listener = pipe_listener;
// The data buffer is the `space` member of the write command struct.
105 cmd->buf = wcmd->space;
// Bytes of the allocation left over after the struct and `size` (auth allowance + rounding).
106 cmd->read_capacity = (uint32_t)(s - size - sizeof(as_async_write_command));
110 cmd->flags = 0;
111 cmd->replica_size = pi->replica_size;
112 cmd->replica_index = 0;
114 wcmd->listener = listener;
115 as_cluster_add_tran(cluster);
116 return cmd;
117}
118
/**
 * Create an async record command: allocate a single block holding the command struct plus
 * its data buffer, then fill in the common as_event_command fields.
 * The signature index at the end of this listing names this function
 * as_async_record_command_create().
 *
 * @param cluster        Stored in cmd->cluster and passed to as_cluster_add_tran().
 * @param policy         Source of total_timeout, socket_timeout and max_retries.
 * @param pi             Source of ns, partition and replica_size.
 * @param replica        Stored unchanged in cmd->replica.
 * @param replica_index  Stored unchanged in cmd->replica_index.
 * @param deserialize    Guards a conditional whose body is not visible in this listing.
 * @param heap_rec       Guards a conditional whose body is not visible in this listing.
 * @param flags          Initial value of cmd->flags.
 * @param listener       Stored in rcmd->listener.
 * @param udata          Stored unchanged in cmd->udata.
 * @param event_loop     Passed through as_event_assign(); the result is stored.
 * @param pipe_listener  Stored unchanged in cmd->pipe_listener.
 * @param size           Write buffer size in bytes; included in the allocation size.
 * @param parse_results  Stored unchanged in cmd->parse_results.
 * @param latency_type   Stored unchanged in cmd->latency_type.
 * @return The newly allocated command.
 *
 * NOTE(review): this listing skips source lines 120, 133, 149-151, 155 and 159 (see the
 * embedded line numbers). `rcmd` is used below but its declaration is not visible here;
 * presumably it is `cmd` viewed as as_async_record_command - confirm against the original
 * header.
 * NOTE(review): the cf_malloc() result is dereferenced without a NULL check in the visible
 * lines - confirm cf_malloc()'s out-of-memory behavior.
 */
119static inline as_event_command*
121 as_cluster* cluster, const as_policy_base* policy, as_partition_info* pi,
122 as_policy_replica replica, uint8_t replica_index, bool deserialize, bool heap_rec,
123 uint8_t flags, as_async_record_listener listener, void* udata, as_event_loop* event_loop,
124 as_pipe_listener pipe_listener, size_t size, as_event_parse_results_fn parse_results,
125 as_latency_type latency_type
126 )
127{
128 // Allocate enough memory to cover: struct size + write buffer size + auth max buffer size
129 // Then, round up memory size in 4KB increments to reduce fragmentation and to allow socket
130 // read to reuse buffer for small socket write sizes.
131 size_t s = (sizeof(as_async_record_command) + size + AS_AUTHENTICATION_MAX_SIZE + 4095) & ~4095;
132 as_event_command* cmd = (as_event_command*)cf_malloc(s);
// total_deadline receives the raw timeout value from the policy at this point.
134 cmd->total_deadline = policy->total_timeout;
135 cmd->socket_timeout = policy->socket_timeout;
136 cmd->max_retries = policy->max_retries;
137 cmd->iteration = 0;
138 cmd->replica = replica;
139 cmd->event_loop = as_event_assign(event_loop);
140 cmd->cluster = cluster;
// No node is selected at creation time.
141 cmd->node = NULL;
142 cmd->ns = pi->ns;
143 cmd->partition = pi->partition;
144 cmd->udata = udata;
145 cmd->parse_results = parse_results;
146 cmd->pipe_listener = pipe_listener;
// The data buffer is the `space` member of the record command struct.
147 cmd->buf = rcmd->space;
// Bytes of the allocation left over after the struct and `size` (auth allowance + rounding).
148 cmd->read_capacity = (uint32_t)(s - size - sizeof(as_async_record_command));
152 cmd->flags = flags;
153
// NOTE(review): body (source line 155) is missing from this listing; the index at the end
// mentions AS_ASYNC_FLAGS_DESERIALIZE, so this presumably sets that flag - confirm.
154 if (deserialize) {
156 }
157
// NOTE(review): body (source line 159) is missing from this listing; the index at the end
// mentions AS_ASYNC_FLAGS_HEAP_REC, so this presumably sets that flag - confirm.
158 if (heap_rec) {
160 }
161
162 cmd->replica_size = pi->replica_size;
163 cmd->replica_index = replica_index;
164 cmd->latency_type = latency_type;
165 rcmd->listener = listener;
166 as_cluster_add_tran(cluster);
167 return cmd;
168}
169
/**
 * Create an async value command: allocate a single block holding the command struct plus its
 * data buffer, then fill in the common as_event_command fields.
 * The signature index at the end of this listing names this function
 * as_async_value_command_create().
 *
 * @param cluster        Stored in cmd->cluster and passed to as_cluster_add_tran().
 * @param policy         Source of total_timeout, socket_timeout and max_retries.
 * @param pi             Source of ns, partition and replica_size.
 * @param replica        Stored after being converted by as_command_write_replica().
 * @param listener       Stored in vcmd->listener.
 * @param udata          Stored unchanged in cmd->udata.
 * @param event_loop     Passed through as_event_assign(); the result is stored.
 * @param pipe_listener  Stored unchanged in cmd->pipe_listener.
 * @param size           Write buffer size in bytes; included in the allocation size.
 * @param parse_results  Stored unchanged in cmd->parse_results.
 * @return The newly allocated command.
 *
 * NOTE(review): this listing skips source lines 171, 183, 199-201 and 205 (see the embedded
 * line numbers). `vcmd` is used below but its declaration is not visible here; presumably it
 * is `cmd` viewed as as_async_value_command - confirm against the original header.
 * NOTE(review): the cf_malloc() result is dereferenced without a NULL check in the visible
 * lines - confirm cf_malloc()'s out-of-memory behavior.
 */
170static inline as_event_command*
172 as_cluster* cluster, const as_policy_base* policy, as_partition_info* pi,
173 as_policy_replica replica, as_async_value_listener listener, void* udata,
174 as_event_loop* event_loop, as_pipe_listener pipe_listener, size_t size,
175 as_event_parse_results_fn parse_results
176 )
177{
178 // Allocate enough memory to cover: struct size + write buffer size + auth max buffer size
179 // Then, round up memory size in 4KB increments to reduce fragmentation and to allow socket
180 // read to reuse buffer for small socket write sizes.
181 size_t s = (sizeof(as_async_value_command) + size + AS_AUTHENTICATION_MAX_SIZE + 4095) & ~4095;
182 as_event_command* cmd = (as_event_command*)cf_malloc(s);
// total_deadline receives the raw timeout value from the policy at this point.
184 cmd->total_deadline = policy->total_timeout;
185 cmd->socket_timeout = policy->socket_timeout;
186 cmd->max_retries = policy->max_retries;
187 cmd->iteration = 0;
188 cmd->replica = as_command_write_replica(replica);
189 cmd->event_loop = as_event_assign(event_loop);
190 cmd->cluster = cluster;
// No node is selected at creation time.
191 cmd->node = NULL;
192 cmd->ns = pi->ns;
193 cmd->partition = pi->partition;
194 cmd->udata = udata;
195 cmd->parse_results = parse_results;
196 cmd->pipe_listener = pipe_listener;
// The data buffer is the `space` member of the value command struct.
197 cmd->buf = vcmd->space;
// Bytes of the allocation left over after the struct and `size` (auth allowance + rounding).
198 cmd->read_capacity = (uint32_t)(s - size - sizeof(as_async_value_command));
202 cmd->flags = 0;
203 cmd->replica_size = pi->replica_size;
204 cmd->replica_index = 0;
206 vcmd->listener = listener;
207 as_cluster_add_tran(cluster);
208 return cmd;
209}
210
/**
 * Create an async info command bound to a specific node: allocate a single block holding the
 * command struct plus its data buffer, then fill in the common as_event_command fields.
 * The signature index at the end of this listing names this function
 * as_async_info_command_create().
 *
 * @param node        Target node; stored in cmd->node, and node->cluster in cmd->cluster.
 * @param policy      policy->timeout is used for both total_deadline and socket_timeout.
 * @param listener    Stored in icmd->listener.
 * @param udata       Stored unchanged in cmd->udata.
 * @param event_loop  Passed through as_event_assign(); the result is stored.
 * @param size        Write buffer size in bytes; included in the allocation size.
 * @return The newly allocated command.
 *
 * NOTE(review): this listing skips source lines 212, 221, 226, 233, 237-239, 243 and 245
 * (see the embedded line numbers). `icmd` is used below but its declaration is not visible
 * here; presumably it is `cmd` viewed as as_async_info_command. No assignment to
 * cmd->replica or cmd->parse_results and no as_cluster_add_tran() call is visible, unlike
 * the other create functions in this file - confirm against the original header.
 * NOTE(review): the cf_malloc() result is dereferenced without a NULL check in the visible
 * lines - confirm cf_malloc()'s out-of-memory behavior.
 */
211static inline as_event_command*
213 as_node* node, const as_policy_info* policy, as_async_info_listener listener, void* udata,
214 as_event_loop* event_loop, size_t size
215 )
216{
217 // Allocate enough memory to cover: struct size + write buffer size + auth max buffer size
218 // Then, round up memory size in 1KB increments.
219 size_t s = (sizeof(as_async_info_command) + size + AS_AUTHENTICATION_MAX_SIZE + 1023) & ~1023;
220 as_event_command* cmd = (as_event_command*)cf_malloc(s);
// The info policy has a single timeout; it feeds both the total and the socket timeout.
222 cmd->total_deadline = policy->timeout;
223 cmd->socket_timeout = policy->timeout;
224 cmd->max_retries = 1;
225 cmd->iteration = 0;
227 cmd->event_loop = as_event_assign(event_loop);
228 cmd->cluster = node->cluster;
// The node is fixed by the caller, so no namespace or partition is recorded.
229 cmd->node = node;
230 cmd->ns = NULL;
231 cmd->partition = NULL;
232 cmd->udata = udata;
234 cmd->pipe_listener = NULL;
// The data buffer is the `space` member of the info command struct.
235 cmd->buf = icmd->space;
// Bytes of the allocation left over after the struct and `size` (auth allowance + rounding).
236 cmd->read_capacity = (uint32_t)(s - size - sizeof(as_async_info_command));
240 cmd->flags = 0;
241 cmd->replica_size = 1;
242 cmd->replica_index = 0;
244 icmd->listener = listener;
246 return cmd;
247}
248
249#ifdef __cplusplus
250} // end extern "C"
251#endif
#define AS_ASYNC_TYPE_VALUE
Definition as_async.h:36
static as_event_command * as_async_write_command_create(as_cluster *cluster, const as_policy_base *policy, as_partition_info *pi, as_policy_replica replica, as_async_write_listener listener, void *udata, as_event_loop *event_loop, as_pipe_listener pipe_listener, size_t size, as_event_parse_results_fn parse_results)
Definition as_async.h:80
#define AS_ASYNC_TYPE_INFO
Definition as_async.h:40
#define AS_ASYNC_TYPE_RECORD
Definition as_async.h:35
#define AS_ASYNC_TYPE_WRITE
Definition as_async.h:34
static as_event_command * as_async_record_command_create(as_cluster *cluster, const as_policy_base *policy, as_partition_info *pi, as_policy_replica replica, uint8_t replica_index, bool deserialize, bool heap_rec, uint8_t flags, as_async_record_listener listener, void *udata, as_event_loop *event_loop, as_pipe_listener pipe_listener, size_t size, as_event_parse_results_fn parse_results, as_latency_type latency_type)
Definition as_async.h:120
#define AS_AUTHENTICATION_MAX_SIZE
Definition as_async.h:45
static as_event_command * as_async_info_command_create(as_node *node, const as_policy_info *policy, as_async_info_listener listener, void *udata, as_event_loop *event_loop, size_t size)
Definition as_async.h:212
static as_event_command * as_async_value_command_create(as_cluster *cluster, const as_policy_base *policy, as_partition_info *pi, as_policy_replica replica, as_async_value_listener listener, void *udata, as_event_loop *event_loop, as_pipe_listener pipe_listener, size_t size, as_event_parse_results_fn parse_results)
Definition as_async.h:171
static void as_cluster_add_tran(as_cluster *cluster)
Definition as_cluster.h:616
static as_policy_replica as_command_write_replica(as_policy_replica replica)
Definition as_command.h:690
bool as_event_command_parse_info(as_event_command *cmd)
#define AS_ASYNC_STATE_UNREGISTERED
#define AS_ASYNC_FLAGS_DESERIALIZE
bool(* as_event_parse_results_fn)(struct as_event_command *cmd)
static as_event_loop * as_event_assign(as_event_loop *event_loop)
#define AS_ASYNC_FLAGS_HEAP_REC
#define AS_LATENCY_TYPE_NONE
Definition as_latency.h:36
#define AS_LATENCY_TYPE_WRITE
Definition as_latency.h:32
uint8_t as_latency_type
Definition as_latency.h:29
void(* as_pipe_listener)(void *udata, as_event_loop *event_loop)
Definition as_listener.h:72
void(* as_async_value_listener)(as_error *err, as_val *val, void *udata, as_event_loop *event_loop)
Definition as_listener.h:61
void(* as_async_write_listener)(as_error *err, void *udata, as_event_loop *event_loop)
Definition as_listener.h:36
void(* as_async_record_listener)(as_error *err, as_record *record, void *udata, as_event_loop *event_loop)
Definition as_listener.h:48
void(* as_async_info_listener)(as_error *err, char *response, void *udata, as_event_loop *event_loop)
Definition as_listener.h:85
#define AS_MESSAGE_TYPE
Definition as_proto.h:38
#define AS_INFO_MESSAGE_TYPE
Definition as_proto.h:36
as_policy_replica
Definition as_policy.h:272
@ AS_POLICY_REPLICA_MASTER
Definition as_policy.h:277
as_event_command command
Definition as_async.h:70
as_async_info_listener listener
Definition as_async.h:71
as_async_record_listener listener
Definition as_async.h:59
as_event_command command
Definition as_async.h:58
as_async_value_listener listener
Definition as_async.h:65
as_event_command command
Definition as_async.h:64
as_event_command command
Definition as_async.h:52
as_async_write_listener listener
Definition as_async.h:53
as_latency_type latency_type
as_event_loop * event_loop
as_pipe_listener pipe_listener
as_event_parse_results_fn parse_results
as_policy_replica replica
struct as_cluster_s * cluster
Definition as_node.h:280
const char * ns
uint32_t socket_timeout
Definition as_policy.h:448
uint32_t total_timeout
Definition as_policy.h:463
uint32_t max_retries
Definition as_policy.h:485
uint32_t timeout
Definition as_policy.h:1402