All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
as_event.h
Go to the documentation of this file.
1/*
2 * Copyright 2008-2024 Aerospike, Inc.
3 *
4 * Portions may be licensed to Aerospike, Inc. under one or more contributor
5 * license agreements.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8 * use this file except in compliance with the License. You may obtain a copy of
9 * the License at http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations under
15 * the License.
16 */
17#pragma once
18
19#include <aerospike/as_error.h>
20#include <aerospike/as_queue.h>
21#include <pthread.h>
22
23/**
24 * @defgroup async_events Event Framework Abstraction
25 *
26 * Generic asynchronous events abstraction. Designed to support multiple event libraries.
27 * Only one library is supported per build.
28 */
29#if defined(AS_USE_LIBEV) || defined(AS_USE_LIBUV) || defined(AS_USE_LIBEVENT)
30#define AS_EVENT_LIB_DEFINED 1
31#endif
32
33#if defined(AS_USE_LIBEV)
34#include <ev.h>
35#elif defined(AS_USE_LIBUV)
36#include <uv.h>
37#elif defined(AS_USE_LIBEVENT)
38#include <event2/event_struct.h>
39#include <aerospike/as_vector.h>
40#else
41#endif
42
43#ifdef __cplusplus
44extern "C" {
45#endif
46
47/******************************************************************************
48 * TYPES
49 *****************************************************************************/
50
51/**
52 * Asynchronous event loop configuration.
53 *
54 * @ingroup async_events
55 */
56typedef struct as_policy_event {
57 /**
58 * Maximum number of async commands that can be processed in each event loop at any point in
59 * time. Each executing non-pipeline async command requires a socket connection. Consuming too
60 * many sockets can negatively affect application reliability and performance. If the user does
61 * not limit async command count in their application, this field should be used to enforce a
62 * limit internally in the client.
63 *
64 * If this limit is reached, the next async command will be placed on the event loop's delay
65 * queue for later execution. If this limit is zero, all async commands will be executed
66 * immediately and the delay queue will not be used.
67 *
68 * If defined, a reasonable value is 40. The optimal value will depend on cpu count, cpu speed,
69 * network bandwidth and the number of event loops employed.
70 *
71 * Default: 0 (execute all async commands immediately)
72 */
74
75 /**
76 * Maximum number of async commands that can be stored in each event loop's delay queue for
77 * later execution. Queued commands consume memory, but they do not consume sockets. This
78 * limit should be defined when it's possible that the application executes so many async
79 * commands that memory could be exhausted.
80 *
81 * If this limit is reached, the next async command will be rejected with error code
82 * AEROSPIKE_ERR_ASYNC_QUEUE_FULL. If this limit is zero, all async commands will be accepted
83 * into the delay queue.
84 *
85 * The optimal value will depend on your application's magnitude of command bursts and the
86 * amount of memory available to store commands.
87 *
88 * Default: 0 (no delay queue limit)
89 */
91
92 /**
93 * Initial capacity of each event loop's delay queue. The delay queue can resize beyond this
94 * initial capacity.
95 *
96 * Default: 256 (if delay queue is used)
97 */
100
101/**
102 * Generic asynchronous event loop abstraction. There is one event loop per thread.
103 * Event loops can be created by the client, or can reference externally created event loops.
104 *
105 * @ingroup async_events
106 */
107typedef struct as_event_loop {
108#if defined(AS_USE_LIBEV)
109 struct ev_loop* loop;
110 struct ev_async wakeup;
111#elif defined(AS_USE_LIBUV)
112 uv_loop_t* loop;
113 uv_async_t* wakeup;
114#elif defined(AS_USE_LIBEVENT)
115 struct event_base* loop;
116 struct event wakeup;
117 struct event trim;
118 as_vector clusters;
119#else
120 void* loop;
121#endif
122
124 pthread_mutex_t lock;
128 pthread_t thread;
129 uint32_t index;
133 // Count of consecutive errors occurring before event loop registration.
134 // Used to prevent deep recursion.
135 uint32_t errors;
139
140/******************************************************************************
141 * GLOBAL VARIABLES
142 *****************************************************************************/
143
146AS_EXTERN extern uint32_t as_event_loop_size;
148
149/******************************************************************************
150 * PUBLIC FUNCTIONS
151 *****************************************************************************/
152
153/**
154 * Initialize event loop configuration variables.
155 *
156 * @ingroup async_events
157 */
158static inline void
160{
161 policy->max_commands_in_process = 0;
162 policy->max_commands_in_queue = 0;
163 policy->queue_initial_capacity = 256;
164}
165
166/**
167 * Create new aerospike internal event loops with default event policy. These event loops are used
168 * exclusively for aerospike database commands and are not shared with the application for other
169 * tasks. If shared event loops are desired, use as_event_set_external_loop_capacity() and
170 * as_event_set_external_loop() instead.
171 *
172 * This function must be called before aerospike_connect().
173 *
174 * @param capacity Number of event loops to create.
175 * @return Event loop array.
176 *
177 * @ingroup async_events
178 */
180as_event_create_loops(uint32_t capacity);
181
182/**
183 * Create new aerospike internal event loops with specified event policy. These event loops are used
184 * exclusively for aerospike database commands and are not shared with the application for other
185 * tasks. If shared event loops are desired, use as_event_set_external_loop_capacity() and
186 * as_set_external_event_loop() instead.
187 *
188 * This function must be called before aerospike_connect().
189 *
190 * @param err The as_error to be populated if an error occurs.
191 * @param policy Event loop configuration. Pass in NULL for default configuration.
192 * @param capacity Number of event loops to create.
193 * @param event_loops Created event loops. Pass in NULL if event loops do not need to be retrieved.
194 * @return AEROSPIKE_OK If successful. Otherwise an error.
195 *
196 * @ingroup async_events
197 */
199as_create_event_loops(as_error* err, as_policy_event* policy, uint32_t capacity, as_event_loop** event_loops);
200
201/**
202 * Set the number of aerospike external event loops. This method should be called when the
203 * application wants to share event loops with the client. This reduces resource usage and
204 * can increase performance.
205 *
206 * This method is used in conjunction with as_event_set_external_loop() or
207 * as_set_external_event_loop() to fully define the external loop to the client and obtain a
208 * reference to the client's event loop abstraction.
209 *
210 * ~~~~~~~~~~{.c}
211 * struct {
212 * pthread_t thread;
213 * struct ev_loop* loop;
214 * as_event_loop* as_loop;
215 * } my_loop;
216 *
217 * static void* my_loop_worker_thread(void* udata)
218 * {
219 * struct my_loop* myloop = udata;
220 * myloop->loop = ev_loop_new(EVFLAG_AUTO);
221 * myloop->as_loop = as_event_set_external_loop(myloop->loop);
222 * ev_loop(myloop->loop, 0);
223 * ev_loop_destroy(myloop->loop);
224 * return NULL;
225 * }
226 *
227 * int capacity = 8;
228 * struct my_loop* loops = malloc(sizeof(struct my_loop) * capacity);
229 * as_event_set_external_loop_capacity(capacity);
230 *
231 * for (int i = 0; i < capacity; i++) {
232 * struct my_loop* myloop = &loops[i];
233 * return pthread_create(&myloop->thread, NULL, my_loop_worker_thread, myloop) == 0;
234 * }
235 * ~~~~~~~~~~
236 *
237 * @param capacity Number of externally created event loops.
238 * @return True if all external loops were initialized.
239 *
240 * @ingroup async_events
241 */
242AS_EXTERN bool
244
245/**
246 * Register an aerospike external event loop with the client with default event policy.
247 *
248 * This method should be called when the calling program wants to share event loops with the client.
249 * This reduces resource usage and can increase performance.
250 *
251 * This method must be called in the same thread as the event loop that is being registered.
252 *
253 * This method is used in conjunction with as_event_set_external_loop_capacity() to fully define
254 * the external loop to the client and obtain a reference to the client's event loop abstraction.
255 *
256 * ~~~~~~~~~~{.c}
257 * struct {
258 * pthread_t thread;
259 * struct ev_loop* loop;
260 * as_event_loop* as_loop;
261 * } my_loop;
262 *
263 * static void* my_loop_worker_thread(void* udata)
264 * {
265 * struct my_loop* myloop = udata;
266 * myloop->loop = ev_loop_new(EVFLAG_AUTO);
267 * myloop->as_loop = as_event_set_external_loop(myloop->loop);
268 * ev_loop(myloop->loop, 0);
269 * ev_loop_destroy(myloop->loop);
270 * return NULL;
271 * }
272 *
273 * int capacity = 8;
274 * struct my_loop* loops = malloc(sizeof(struct my_loop) * capacity);
275 * as_event_set_external_loop_capacity(capacity);
276 *
277 * for (int i = 0; i < capacity; i++) {
278 * struct my_loop* myloop = &loops[i];
279 * return pthread_create(&myloop->thread, NULL, my_loop_worker_thread, myloop) == 0;
280 * }
281 * ~~~~~~~~~~
282 *
283 * @param loop External event loop.
284 * @return Client's generic event loop abstraction that is used in client async commands.
285 * Returns NULL if external loop capacity would be exceeded.
286 *
287 * @ingroup async_events
288 */
291
292/**
293 * Register an aerospike external event loop with the client with specified event policy.
294 *
295 * This method should be called when the calling program wants to share event loops with the client.
296 * This reduces resource usage and can increase performance.
297 *
298 * This method must be called in the same thread as the event loop that is being registered.
299 *
300 * This method is used in conjunction with as_event_set_external_loop_capacity() to fully define
301 * the external loop to the client and obtain a reference to the client's event loop abstraction.
302 *
303 * ~~~~~~~~~~{.c}
304 * struct {
305 * pthread_t thread;
306 * struct ev_loop* loop;
307 * as_event_loop* as_loop;
308 * } my_loop;
309 *
310 * static void* my_loop_worker_thread(void* udata)
311 * {
312 * struct my_loop* myloop = udata;
313 * myloop->loop = ev_loop_new(EVFLAG_AUTO);
314 *
315 * as_policy_event policy;
316 * as_policy_event_init(&policy);
317 * policy.max_commands_in_process = 30;
318 *
319 * as_error err;
320 * if (as_set_external_event_loop(&err, &policy, myloop->loop, &myloop->as_loop) != AEROSPIKE_OK) {
321 * printf("Failed to set event loop: %d %s\n", err.code, err.message);
322 * return NULL;
323 * }
324 * myloop->as_loop = as_event_set_external_loop(myloop->loop);
325 * ev_loop(myloop->loop, 0);
326 * ev_loop_destroy(myloop->loop);
327 * return NULL;
328 * }
329 *
330 * int capacity = 8;
331 * struct my_loop* loops = malloc(sizeof(struct my_loop) * capacity);
332 * as_event_set_external_loop_capacity(capacity);
333 *
334 * for (int i = 0; i < capacity; i++) {
335 * struct my_loop* myloop = &loops[i];
336 * return pthread_create(&myloop->thread, NULL, my_loop_worker_thread, myloop) == 0;
337 * }
338 * ~~~~~~~~~~
339 *
340 * @param err The as_error to be populated if an error occurs.
341 * @param policy Event loop configuration. Pass in NULL for default configuration.
342 * @param loop External event loop.
343 * @param event_loop Created event loop.
344 * @return AEROSPIKE_OK If successful. Otherwise an error.
345 *
346 * @ingroup async_events
347 */
350
351/**
352 * Find client's event loop abstraction given the external event loop.
353 *
354 * @param loop External event loop.
355 * @return Client's generic event loop abstraction that is used in client async commands.
356 * Returns NULL if loop not found.
357 *
358 * @ingroup async_events
359 */
362
363/**
364 * Retrieve event loop by array index.
365 *
366 * @param index Event loop array index.
367 * @return Client's generic event loop abstraction that is used in client async commands.
368 *
369 * @ingroup async_events
370 */
371static inline as_event_loop*
376
377/**
378 * Retrieve the next event loop using round-robin distribution.
379 *
380 * @return Client's generic event loop abstraction that is used in client async commands.
381 *
382 * @ingroup async_events
383 */
384static inline as_event_loop*
386{
387 // The last event loop points to the first event loop to create a circular linked list.
388 // Not atomic because doesn't need to be exactly accurate.
390 as_event_loop_current = event_loop->next;
391 return event_loop;
392}
393
394/**
395 * Return the approximate number of commands currently being processed on
396 * the event loop. The value is approximate because the call may be from a
397 * different thread than the event loop’s thread and there are no locks or
398 * atomics used.
399 *
400 * @ingroup async_events
401 */
402static inline int
404{
405 return event_loop->pending;
406}
407
408/**
409 * Return the approximate number of commands stored on this event loop's
410 * delay queue that have not been started yet. The value is approximate
411 * because the call may be from a different thread than the event loop’s
412 * thread and there are no locks or atomics used.
413 *
414 * @ingroup async_events
415 */
416static inline uint32_t
418{
419 return as_queue_size(&event_loop->delay_queue);
420}
421
422/**
423 * Close internal event loops and release watchers for internal and external event loops.
424 * The global event loop array will also be destroyed for internal event loops.
425 *
426 * This method should be called once on program shutdown if as_event_create_loops() or
427 * as_event_set_external_loop_capacity() was called.
428 *
429 * The shutdown sequence is slightly different for internal and external event loops.
430 *
431 * Internal:
432 * ~~~~~~~~~~{.c}
433 * as_event_close_loops();
434 * ~~~~~~~~~~
435 *
436 * External:
437 * ~~~~~~~~~~{.c}
438 * as_event_close_loops();
439 * Join on external loop threads.
440 * as_event_destroy_loops();
441 * ~~~~~~~~~~
442 *
443 * @return True if event loop close was successful. If false, as_event_destroy_loops() should
444 * not be called.
445 *
446 * @ingroup async_events
447 */
448AS_EXTERN bool
450
451/**
452 * Close internal event loop and release internal/external event loop watchers.
453 * This optional method can be used instead of as_event_close_loops().
454 * If used, must be called from event loop's thread.
455 */
456AS_EXTERN void
458
459/**
460 * Destroy global event loop array. This function only needs to be called for external
461 * event loops.
462 *
463 * @ingroup async_events
464 */
465AS_EXTERN void
467
468/******************************************************************************
469 * LIBEVENT SINGLE THREAD MODE FUNCTIONS
470 *****************************************************************************/
471
472#if defined(AS_USE_LIBEVENT)
473struct aerospike_s;
474
475/**
476 * Event loop close aerospike listener
477 *
478 * @ingroup async_events
479 */
480typedef void (*as_event_close_listener) (void* udata);
481
482/**
483 * Set flag to signify that all async commands will be created in their associated event loop thread.
484 * If enabled, the client can remove locks associated with sending async commands to the event loop.
485 * This flag is only referenced when running the client with the libevent framework.
486 *
487 * By default, async single thread mode is false.
488 *
489 * @ingroup async_events
490 */
491static inline void
492as_event_set_single_thread(bool single_thread)
493{
494 as_event_single_thread = single_thread; // Plain assignment to the global as_event_single_thread flag.
495}
496
497/**
498 * Register aerospike instance with event loop.
499 * Should only be called in libevent single-thread mode.
500 * The call must occur in the event loop's thread.
501 *
502 * @ingroup async_events
503 */
504AS_EXTERN void
505as_event_loop_register_aerospike(as_event_loop* event_loop, struct aerospike_s* as);
506
507/**
508 * Unregister and free aerospike instance resources associated with event loop.
509 * Should only be called in libevent single-thread mode.
510 * The call must occur in the event loop's thread.
511 *
512 * Listener is called when all aerospike instance async commands have completed
513 * on this event loop. Do not call aerospike_close() until listeners return on all
514 * event loops.
515 *
516 * @ingroup async_events
517 */
518AS_EXTERN void
519as_event_loop_close_aerospike(
520 as_event_loop* event_loop, struct aerospike_s* as, as_event_close_listener listener, void* udata
521 );
522
523#endif
524
525#ifdef __cplusplus
526} // end extern "C"
527#endif
AS_EXTERN void as_event_close_loop(as_event_loop *event_loop)
AS_EXTERN as_event_loop * as_event_loop_current
AS_EXTERN uint32_t as_event_loop_size
AS_EXTERN as_event_loop * as_event_loops
AS_EXTERN bool as_event_single_thread
static uint32_t as_queue_size(as_queue *queue)
Definition as_queue.h:114
as_status
Definition as_status.h:30
#define AS_EXTERN
Definition as_std.h:25
AS_EXTERN bool as_event_set_external_loop_capacity(uint32_t capacity)
AS_EXTERN as_status as_create_event_loops(as_error *err, as_policy_event *policy, uint32_t capacity, as_event_loop **event_loops)
AS_EXTERN bool as_event_close_loops(void)
AS_EXTERN as_event_loop * as_event_loop_find(void *loop)
AS_EXTERN as_event_loop * as_event_create_loops(uint32_t capacity)
static void as_policy_event_init(as_policy_event *policy)
Definition as_event.h:159
static as_event_loop * as_event_loop_get_by_index(uint32_t index)
Definition as_event.h:372
static uint32_t as_event_loop_get_queue_size(as_event_loop *event_loop)
Definition as_event.h:417
AS_EXTERN void as_event_destroy_loops(void)
static int as_event_loop_get_process_size(as_event_loop *event_loop)
Definition as_event.h:403
AS_EXTERN as_event_loop * as_event_set_external_loop(void *loop)
static as_event_loop * as_event_loop_get(void)
Definition as_event.h:385
AS_EXTERN as_status as_set_external_event_loop(as_error *err, as_policy_event *policy, void *loop, as_event_loop **event_loop)
pthread_mutex_t lock
Definition as_event.h:124
bool pipe_cb_calling
Definition as_event.h:137
int max_commands_in_process
Definition as_event.h:131
uint32_t max_commands_in_queue
Definition as_event.h:130
as_queue queue
Definition as_event.h:125
pthread_t thread
Definition as_event.h:128
struct as_event_loop * next
Definition as_event.h:123
bool using_delay_queue
Definition as_event.h:136
void * loop
Definition as_event.h:120
as_queue pipe_cb_queue
Definition as_event.h:127
as_queue delay_queue
Definition as_event.h:126
uint32_t errors
Definition as_event.h:135
uint32_t index
Definition as_event.h:129
uint32_t queue_initial_capacity
Definition as_event.h:98
uint32_t max_commands_in_queue
Definition as_event.h:90
int max_commands_in_process
Definition as_event.h:73