Loading...
Searching...
No Matches
as_event.h
Go to the documentation of this file.
1/*
2 * Copyright 2008-2024 Aerospike, Inc.
3 *
4 * Portions may be licensed to Aerospike, Inc. under one or more contributor
5 * license agreements.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8 * use this file except in compliance with the License. You may obtain a copy of
9 * the License at http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations under
15 * the License.
16 */
17#pragma once
18
19#include <aerospike/as_error.h>
20#include <aerospike/as_queue.h>
21#include <pthread.h>
22
23/**
24 * @defgroup async_events Event Framework Abstraction
25 *
26 * Generic asynchronous events abstraction. Designed to support multiple event libraries.
27 * Only one library is supported per build.
28 */
29#if defined(AS_USE_LIBEV) || defined(AS_USE_LIBUV) || defined(AS_USE_LIBEVENT)
30#define AS_EVENT_LIB_DEFINED 1
31#endif
32
33#if defined(AS_USE_LIBEV)
34#include <ev.h>
35#elif defined(AS_USE_LIBUV)
36#include <uv.h>
37#elif defined(AS_USE_LIBEVENT)
38#include <event2/event_struct.h>
39#include <aerospike/as_vector.h>
40#else
41#endif
42
43#ifdef __cplusplus
44extern "C" {
45#endif
46
47/******************************************************************************
48 * TYPES
49 *****************************************************************************/
50
51/**
52 * Asynchronous event loop configuration.
53 *
54 * @ingroup async_events
55 */
56typedef struct as_policy_event {
57 /**
58 * Maximum number of async commands that can be processed in each event loop at any point in
59 * time. Each executing non-pipeline async command requires a socket connection. Consuming too
60 * many sockets can negatively affect application reliability and performance. If the user does
61 * not limit async command count in their application, this field should be used to enforce a
62 * limit internally in the client.
63 *
64 * If this limit is reached, the next async command will be placed on the event loop's delay
65 * queue for later execution. If this limit is zero, all async commands will be executed
66 * immediately and the delay queue will not be used.
67 *
68 * If defined, a reasonable value is 40. The optimal value will depend on cpu count, cpu speed,
69 * network bandwidth and the number of event loops employed.
70 *
71 * Default: 0 (execute all async commands immediately)
72 */
74
75 /**
76 * Maximum number of async commands that can be stored in each event loop's delay queue for
77 * later execution. Queued commands consume memory, but they do not consume sockets. This
78 * limit should be defined when it's possible that the application executes so many async
79 * commands that memory could be exhausted.
80 *
81 * If this limit is reached, the next async command will be rejected with error code
82 * AEROSPIKE_ERR_ASYNC_QUEUE_FULL. If this limit is zero, all async commands will be accepted
83 * into the delay queue.
84 *
85 * The optimal value will depend on your application's magnitude of command bursts and the
86 * amount of memory available to store commands.
87 *
88 * Default: 0 (no delay queue limit)
89 */
91
92 /**
93 * Initial capacity of each event loop's delay queue. The delay queue can resize beyond this
94 * initial capacity.
95 *
96 * Default: 256 (if delay queue is used)
97 */
100
101/**
102 * Generic asynchronous event loop abstraction. There is one event loop per thread.
103 * Event loops can be created by the client, or can reference externally created event loops.
104 *
105 * @ingroup async_events
106 */
107typedef struct as_event_loop {
108#if defined(AS_USE_LIBEV)
109 struct ev_loop* loop;
110 struct ev_async wakeup;
111#elif defined(AS_USE_LIBUV)
112 uv_loop_t* loop;
113 uv_async_t* wakeup;
114#elif defined(AS_USE_LIBEVENT)
115 struct event_base* loop;
116 struct event wakeup;
117 struct event trim;
118 as_vector clusters;
119#else
120 void* loop;
121#endif
122
124 pthread_mutex_t lock;
128 pthread_t thread;
129 uint32_t index;
133 // Count of consecutive errors occurring before event loop registration.
134 // Used to prevent deep recursion.
135 uint32_t errors;
139
140/******************************************************************************
141 * GLOBAL VARIABLES
142 *****************************************************************************/
143
146AS_EXTERN extern uint32_t as_event_loop_size;
148
149/******************************************************************************
150 * PUBLIC FUNCTIONS
151 *****************************************************************************/
152
153/**
154 * Initialize event loop configuration variables.
155 *
156 * @ingroup async_events
157 */
158static inline void
160{
161 policy->max_commands_in_process = 0;
162 policy->max_commands_in_queue = 0;
163 policy->queue_initial_capacity = 256;
164}
165
166/**
167 * Create new event loops with default event policy.
168 *
169 * This method should only be called when async client commands will be used and the calling program
170 * itself is not async. If this method is used, it must be called before aerospike_connect().
171 *
172 * @param capacity Number of event loops to create.
173 * @return Event loop array.
174 *
175 * @ingroup async_events
176 */
178as_event_create_loops(uint32_t capacity);
179
180/**
181 * Create new event loops with specified event policy.
182 *
183 * This method should only be called when async client commands will be used and the calling program
184 * itself is not async. If this method is used, it must be called before aerospike_connect().
185 *
186 * @param err The as_error to be populated if an error occurs.
187 * @param policy Event loop configuration. Pass in NULL for default configuration.
188 * @param capacity Number of event loops to create.
189 * @param event_loops Created event loops. Pass in NULL if event loops do not need to be retrieved.
190 * @return AEROSPIKE_OK If successful. Otherwise an error.
191 *
192 * @ingroup async_events
193 */
195as_create_event_loops(as_error* err, as_policy_event* policy, uint32_t capacity, as_event_loop** event_loops);
196
197/**
198 * Set the number of externally created event loops. This method should be called when the
199 * calling program wants to share event loops with the client. This reduces resource usage and
200 * can increase performance.
201 *
202 * This method is used in conjunction with as_event_set_external_loop() to fully define
203 * the external loop to the client and obtain a reference to the client's event loop abstraction.
204 *
205 * ~~~~~~~~~~{.c}
206 * struct {
207 * pthread_t thread;
208 * struct ev_loop* loop;
209 * as_event_loop* as_loop;
210 * } my_loop;
211 *
212 * static void* my_loop_worker_thread(void* udata)
213 * {
214 * struct my_loop* myloop = udata;
215 * myloop->loop = ev_loop_new(EVFLAG_AUTO);
216 * myloop->as_loop = as_event_set_external_loop(myloop->loop);
217 * ev_loop(myloop->loop, 0);
218 * ev_loop_destroy(myloop->loop);
219 * return NULL;
220 * }
221 *
222 * int capacity = 8;
223 * struct my_loop* loops = malloc(sizeof(struct my_loop) * capacity);
224 * as_event_set_external_loop_capacity(capacity);
225 *
226 * for (int i = 0; i < capacity; i++) {
227 * struct my_loop* myloop = &loops[i];
228 * return pthread_create(&myloop->thread, NULL, my_loop_worker_thread, myloop) == 0;
229 * }
230 * ~~~~~~~~~~
231 *
232 * @param capacity Number of externally created event loops.
233 * @return True if all external loops were initialized.
234 *
235 * @ingroup async_events
236 */
237AS_EXTERN bool
239
240/**
241 * Register an external event loop with the client with default event policy.
242 *
243 * This method should be called when the calling program wants to share event loops with the client.
244 * This reduces resource usage and can increase performance.
245 *
246 * This method must be called in the same thread as the event loop that is being registered.
247 *
248 * This method is used in conjunction with as_event_set_external_loop_capacity() to fully define
249 * the external loop to the client and obtain a reference to the client's event loop abstraction.
250 *
251 * ~~~~~~~~~~{.c}
252 * struct {
253 * pthread_t thread;
254 * struct ev_loop* loop;
255 * as_event_loop* as_loop;
256 * } my_loop;
257 *
258 * static void* my_loop_worker_thread(void* udata)
259 * {
260 * struct my_loop* myloop = udata;
261 * myloop->loop = ev_loop_new(EVFLAG_AUTO);
262 * myloop->as_loop = as_event_set_external_loop(myloop->loop);
263 * ev_loop(myloop->loop, 0);
264 * ev_loop_destroy(myloop->loop);
265 * return NULL;
266 * }
267 *
268 * int capacity = 8;
269 * struct my_loop* loops = malloc(sizeof(struct my_loop) * capacity);
270 * as_event_set_external_loop_capacity(capacity);
271 *
272 * for (int i = 0; i < capacity; i++) {
273 * struct my_loop* myloop = &loops[i];
274 * return pthread_create(&myloop->thread, NULL, my_loop_worker_thread, myloop) == 0;
275 * }
276 * ~~~~~~~~~~
277 *
278 * @param loop External event loop.
279 * @return Client's generic event loop abstraction that is used in client async commands.
280 * Returns NULL if external loop capacity would be exceeded.
281 *
282 * @ingroup async_events
283 */
286
287/**
288 * Register an external event loop with the client with specified event policy.
289 *
290 * This method should be called when the calling program wants to share event loops with the client.
291 * This reduces resource usage and can increase performance.
292 *
293 * This method must be called in the same thread as the event loop that is being registered.
294 *
295 * This method is used in conjunction with as_event_set_external_loop_capacity() to fully define
296 * the external loop to the client and obtain a reference to the client's event loop abstraction.
297 *
298 * ~~~~~~~~~~{.c}
299 * struct {
300 * pthread_t thread;
301 * struct ev_loop* loop;
302 * as_event_loop* as_loop;
303 * } my_loop;
304 *
305 * static void* my_loop_worker_thread(void* udata)
306 * {
307 * struct my_loop* myloop = udata;
308 * myloop->loop = ev_loop_new(EVFLAG_AUTO);
309 *
310 * as_policy_event policy;
311 * as_policy_event_init(&policy);
312 * policy.max_commands_in_process = 30;
313 *
314 * as_error err;
315 * if (as_set_external_event_loop(&err, &policy, myloop->loop, &myloop->as_loop) != AEROSPIKE_OK) {
316 * printf("Failed to set event loop: %d %s\n", err.code, err.message);
317 * return NULL;
318 * }
319 * myloop->as_loop = as_event_set_external_loop(myloop->loop);
320 * ev_loop(myloop->loop, 0);
321 * ev_loop_destroy(myloop->loop);
322 * return NULL;
323 * }
324 *
325 * int capacity = 8;
326 * struct my_loop* loops = malloc(sizeof(struct my_loop) * capacity);
327 * as_event_set_external_loop_capacity(capacity);
328 *
329 * for (int i = 0; i < capacity; i++) {
330 * struct my_loop* myloop = &loops[i];
331 * return pthread_create(&myloop->thread, NULL, my_loop_worker_thread, myloop) == 0;
332 * }
333 * ~~~~~~~~~~
334 *
335 * @param err The as_error to be populated if an error occurs.
336 * @param policy Event loop configuration. Pass in NULL for default configuration.
337 * @param loop External event loop.
338 * @param event_loop Created event loop.
339 * @return AEROSPIKE_OK If successful. Otherwise an error.
340 *
341 * @ingroup async_events
342 */
345
346/**
347 * Find client's event loop abstraction given the external event loop.
348 *
349 * @param loop External event loop.
350 * @return Client's generic event loop abstraction that is used in client async commands.
351 * Returns NULL if loop not found.
352 *
353 * @ingroup async_events
354 */
357
358/**
359 * Retrieve event loop by array index.
360 *
361 * @param index Event loop array index.
362 * @return Client's generic event loop abstraction that is used in client async commands.
363 *
364 * @ingroup async_events
365 */
366static inline as_event_loop*
371
372/**
373 * Retrieve the next event loop using round robin distribution.
374 *
375 * @return Client's generic event loop abstraction that is used in client async commands.
376 *
377 * @ingroup async_events
378 */
379static inline as_event_loop*
381{
382 // The last event loop points to the first event loop to create a circular linked list.
383 // Not atomic because doesn't need to be exactly accurate.
385 as_event_loop_current = event_loop->next;
386 return event_loop;
387}
388
389/**
390 * Return the approximate number of commands currently being processed on
391 * the event loop. The value is approximate because the call may be from a
392 * different thread than the event loop’s thread and there are no locks or
393 * atomics used.
394 *
395 * @ingroup async_events
396 */
397static inline int
399{
400 return event_loop->pending;
401}
402
403/**
404 * Return the approximate number of commands stored on this event loop's
405 * delay queue that have not been started yet. The value is approximate
406 * because the call may be from a different thread than the event loop’s
407 * thread and there are no locks or atomics used.
408 *
409 * @ingroup async_events
410 */
411static inline uint32_t
413{
414 return as_queue_size(&event_loop->delay_queue);
415}
416
417/**
418 * Close internal event loops and release watchers for internal and external event loops.
419 * The global event loop array will also be destroyed for internal event loops.
420 *
421 * This method should be called once on program shutdown if as_event_create_loops() or
422 * as_event_set_external_loop_capacity() was called.
423 *
424 * The shutdown sequence is slightly different for internal and external event loops.
425 *
426 * Internal:
427 * ~~~~~~~~~~{.c}
428 * as_event_close_loops();
429 * ~~~~~~~~~~
430 *
431 * External:
432 * ~~~~~~~~~~{.c}
433 * as_event_close_loops();
434 * // Join on external loop threads.
435 * as_event_destroy_loops();
436 * ~~~~~~~~~~
437 *
438 * @return True if event loop close was successful. If false, as_event_destroy_loops() should
439 * not be called.
440 *
441 * @ingroup async_events
442 */
443AS_EXTERN bool
445
446/**
447 * Close internal event loop and release internal/external event loop watchers.
448 * This optional method can be used instead of as_event_close_loops().
449 * If used, must be called from event loop's thread.
450 */
451AS_EXTERN void
453
454/**
455 * Destroy global event loop array. This function only needs to be called for external
456 * event loops.
457 *
458 * @ingroup async_events
459 */
460AS_EXTERN void
462
463/******************************************************************************
464 * LIBEVENT SINGLE THREAD MODE FUNCTIONS
465 *****************************************************************************/
466
467#if defined(AS_USE_LIBEVENT)
468struct aerospike_s;
469
470/**
471 * Event loop close aerospike listener.
 *
 * Callback type accepted by as_event_loop_close_aerospike(). It takes a single
 * opaque udata pointer and returns nothing.
472 *
473 * @ingroup async_events
474 */
475typedef void (*as_event_close_listener) (void* udata);
476
477/**
478 * Set flag to signify that all async commands will be created in their associated event loop thread.
479 * If enabled, the client can remove locks associated with sending async commands to the event loop.
480 * This flag is only referenced when running the client with the libevent framework.
481 *
482 * By default, async single thread mode is false.
483 *
 * @param single_thread New value for the global as_event_single_thread flag.
 *
484 * @ingroup async_events
485 */
486static inline void
487as_event_set_single_thread(bool single_thread)
488{
489 as_event_single_thread = single_thread; // Plain store to the global flag; no lock or atomic is used here.
490}
491
492/**
493 * Register aerospike instance with event loop.
494 * Should only be called in libevent single-thread mode.
495 * The call must occur in the event loop's thread.
496 *
 * @param event_loop Event loop to register the aerospike instance with.
 * @param as Aerospike instance to register.
 *
497 * @ingroup async_events
498 */
499AS_EXTERN void
500as_event_loop_register_aerospike(as_event_loop* event_loop, struct aerospike_s* as);
501
502/**
503 * Unregister and free aerospike instance resources associated with event loop.
504 * Should only be called in libevent single-thread mode.
505 * The call must occur in the event loop's thread.
506 *
507 * Listener is called when all aerospike instance async commands have completed
508 * on this event loop. Do not call aerospike_close() until listeners return on all
509 * event loops.
510 *
 * @param event_loop Event loop that the aerospike instance is unregistered from.
 * @param as Aerospike instance whose resources on this event loop are freed.
 * @param listener Called when all of the instance's async commands have completed
 * on this event loop.
 * @param udata User data; presumably handed to the listener unchanged (the listener
 * type takes a single void* udata) - confirm against the implementation.
 *
511 * @ingroup async_events
512 */
513AS_EXTERN void
514as_event_loop_close_aerospike(
515 as_event_loop* event_loop, struct aerospike_s* as, as_event_close_listener listener, void* udata
516 );
517
518#endif
519
520#ifdef __cplusplus
521} // end extern "C"
522#endif
AS_EXTERN void as_event_close_loop(as_event_loop *event_loop)
AS_EXTERN as_event_loop * as_event_loop_current
AS_EXTERN uint32_t as_event_loop_size
AS_EXTERN as_event_loop * as_event_loops
AS_EXTERN bool as_event_single_thread
static uint32_t as_queue_size(as_queue *queue)
Definition as_queue.h:114
as_status
Definition as_status.h:30
#define AS_EXTERN
Definition as_std.h:25
AS_EXTERN bool as_event_set_external_loop_capacity(uint32_t capacity)
AS_EXTERN as_status as_create_event_loops(as_error *err, as_policy_event *policy, uint32_t capacity, as_event_loop **event_loops)
AS_EXTERN bool as_event_close_loops(void)
AS_EXTERN as_event_loop * as_event_loop_find(void *loop)
AS_EXTERN as_event_loop * as_event_create_loops(uint32_t capacity)
static void as_policy_event_init(as_policy_event *policy)
Definition as_event.h:159
static as_event_loop * as_event_loop_get_by_index(uint32_t index)
Definition as_event.h:367
static uint32_t as_event_loop_get_queue_size(as_event_loop *event_loop)
Definition as_event.h:412
AS_EXTERN void as_event_destroy_loops(void)
static int as_event_loop_get_process_size(as_event_loop *event_loop)
Definition as_event.h:398
AS_EXTERN as_event_loop * as_event_set_external_loop(void *loop)
static as_event_loop * as_event_loop_get(void)
Definition as_event.h:380
AS_EXTERN as_status as_set_external_event_loop(as_error *err, as_policy_event *policy, void *loop, as_event_loop **event_loop)
pthread_mutex_t lock
Definition as_event.h:124
bool pipe_cb_calling
Definition as_event.h:137
int max_commands_in_process
Definition as_event.h:131
uint32_t max_commands_in_queue
Definition as_event.h:130
as_queue queue
Definition as_event.h:125
pthread_t thread
Definition as_event.h:128
struct as_event_loop * next
Definition as_event.h:123
bool using_delay_queue
Definition as_event.h:136
void * loop
Definition as_event.h:120
as_queue pipe_cb_queue
Definition as_event.h:127
as_queue delay_queue
Definition as_event.h:126
uint32_t errors
Definition as_event.h:135
uint32_t index
Definition as_event.h:129
uint32_t queue_initial_capacity
Definition as_event.h:98
uint32_t max_commands_in_queue
Definition as_event.h:90
int max_commands_in_process
Definition as_event.h:73