Loading...
Searching...
No Matches
as_event_internal.h
Go to the documentation of this file.
1/*
2 * Copyright 2008-2024 Aerospike, Inc.
3 *
4 * Portions may be licensed to Aerospike, Inc. under one or more contributor
5 * license agreements.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8 * use this file except in compliance with the License. You may obtain a copy of
9 * the License at http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations under
15 * the License.
16 */
17#pragma once
18
19#include <aerospike/as_admin.h>
22#include <aerospike/as_queue.h>
23#include <aerospike/as_proto.h>
24#include <aerospike/as_socket.h>
25#include <citrusleaf/cf_ll.h>
26#include <pthread.h>
27
28#if defined(AS_USE_LIBEV)
29#include <ev.h>
30#elif defined(AS_USE_LIBUV)
31#include <uv.h>
32struct as_uv_tls;
33#elif defined(AS_USE_LIBEVENT)
34#include <event2/event.h>
35#else
36#endif
37
38#ifdef __cplusplus
39extern "C" {
40#endif
41
42/******************************************************************************
43 * TYPES
44 *****************************************************************************/
45
46#define AS_ASYNC_STATE_UNREGISTERED 0
47#define AS_ASYNC_STATE_REGISTERED 1
48#define AS_ASYNC_STATE_DELAY_QUEUE 2
49#define AS_ASYNC_STATE_CONNECT 3
50#define AS_ASYNC_STATE_TLS_CONNECT 4
51#define AS_ASYNC_STATE_AUTH_WRITE 5
52#define AS_ASYNC_STATE_AUTH_READ_HEADER 6
53#define AS_ASYNC_STATE_AUTH_READ_BODY 7
54#define AS_ASYNC_STATE_COMMAND_WRITE 8
55#define AS_ASYNC_STATE_COMMAND_READ_HEADER 9
56#define AS_ASYNC_STATE_COMMAND_READ_BODY 10
57#define AS_ASYNC_STATE_QUEUE_ERROR 11
58#define AS_ASYNC_STATE_RETRY 12
59
60#define AS_ASYNC_FLAGS_DESERIALIZE 1
61#define AS_ASYNC_FLAGS_READ 2
62#define AS_ASYNC_FLAGS_HAS_TIMER 4
63#define AS_ASYNC_FLAGS_USING_SOCKET_TIMER 8
64#define AS_ASYNC_FLAGS_EVENT_RECEIVED 16
65#define AS_ASYNC_FLAGS_FREE_BUF 32
66#define AS_ASYNC_FLAGS_LINEARIZE 64
67#define AS_ASYNC_FLAGS_HEAP_REC 128
68
69#define AS_ASYNC_AUTH_RETURN_CODE 1
70
71#define AS_EVENT_CONNECTION_COMPLETE 0
72#define AS_EVENT_CONNECTION_PENDING 1
73#define AS_EVENT_CONNECTION_ERROR 2
74
75#define AS_EVENT_QUEUE_INITIAL_CAPACITY 256
76
77struct as_event_command;
79
80typedef struct {
81#if defined(AS_USE_LIBEV)
82 struct ev_io watcher;
83 as_socket socket;
84#elif defined(AS_USE_LIBUV)
85 uv_tcp_t socket;
86 struct as_uv_tls* tls;
87 // Reuse memory for requests, because only one request is active at a time.
88 union {
89 uv_connect_t connect;
90 uv_write_t write;
91 } req;
92 uint64_t last_used;
93#elif defined(AS_USE_LIBEVENT)
94 struct event watcher;
95 as_socket socket;
96#else
97#endif
101
106
111
112typedef void (*as_event_executable) (as_event_loop* event_loop, void* udata);
113typedef bool (*as_event_parse_results_fn) (struct as_event_command* cmd);
114typedef void (*as_event_executor_complete_fn) (struct as_event_executor* executor);
115
116typedef struct as_event_command {
117#if defined(AS_USE_LIBEV)
118 struct ev_timer timer;
119#elif defined(AS_USE_LIBUV)
120 uv_timer_t timer;
121#elif defined(AS_USE_LIBEVENT)
122 struct event timer;
123#else
124#endif
127 uint32_t max_retries;
128 uint32_t iteration;
135 const char* ns;
136 void* partition; // as_partition* or as_partition_shm*
137 void* udata;
140 cf_ll_element pipe_link;
141
142 uint8_t* buf;
143 uint64_t begin; // Used for metrics
145 uint32_t write_offset;
146 uint32_t write_len;
148 uint32_t len;
149 uint32_t pos;
150
151 uint8_t type;
152 uint8_t proto_type;
154 uint8_t state;
155 uint8_t flags;
158 uint8_t replica_index_sc; // Used in batch only.
162
167
184
185/******************************************************************************
186 * COMMON FUNCTIONS
187 *****************************************************************************/
188
191
192void
194
195void
197
198bool
200
201bool
203
204bool
206
207void
209
210void
212
213void
215
216bool
218
219void
221
222void
224
225void
227
228void
229as_event_executor_error(as_event_executor* executor, as_error* err, uint32_t command_count);
230
231void
232as_event_executor_cancel(as_event_executor* executor, uint32_t queued_count);
233
234void
236
237void
239
240void
242
243void
245
246void
248
249void
251
252bool
254
255bool
257
258bool
260
261bool
263
264void
266
267void
269
270void
272
273void
275
276/******************************************************************************
277 * IMPLEMENTATION SPECIFIC FUNCTIONS
278 *****************************************************************************/
279
280bool
282
283void
285
286/**
287 * Schedule execution of function on specified event loop.
288 * Command is placed on event loop queue and is never executed directly.
289 */
290bool
292
293void
295
296void
298
299void
301
302/******************************************************************************
303 * LIBEV INLINE FUNCTIONS
304 *****************************************************************************/
305
306#if defined(AS_USE_LIBEV)
307
308void as_ev_timer_cb(struct ev_loop* loop, ev_timer* timer, int revents);
309void as_ev_repeat_cb(struct ev_loop* loop, ev_timer* timer, int revents);
310
311static inline bool
312as_event_conn_current_trim(as_event_connection* conn, uint64_t max_socket_idle_ns)
313{
314 return as_socket_current_trim(conn->socket.last_used, max_socket_idle_ns);
315}
316
317static inline bool
318as_event_conn_current_tran(as_event_connection* conn, uint64_t max_socket_idle_ns)
319{
320 return as_socket_current_tran(conn->socket.last_used, max_socket_idle_ns);
321}
322
323static inline int
325{
326 return as_socket_validate_fd(conn->socket.fd);
327}
328
329static inline void
331{
332 as_socket_close(&conn->socket);
333 cf_free(conn);
334}
335
336static inline void
338{
339 conn->socket.last_used = cf_getns();
340}
341
342static inline void
343as_event_timer_once(as_event_command* cmd, uint64_t timeout)
344{
345 ev_timer_init(&cmd->timer, as_ev_timer_cb, (double)timeout / 1000.0, 0.0);
346 cmd->timer.data = cmd;
347 ev_timer_start(cmd->event_loop->loop, &cmd->timer);
349}
350
351static inline void
352as_event_timer_repeat(as_event_command* cmd, uint64_t repeat)
353{
354 ev_init(&cmd->timer, as_ev_repeat_cb);
355 cmd->timer.repeat = (double)repeat / 1000.0;
356 cmd->timer.data = cmd;
357 ev_timer_again(cmd->event_loop->loop, &cmd->timer);
359}
360
361static inline void
363{
364 ev_timer_again(cmd->event_loop->loop, &cmd->timer);
365}
366
367static inline void
369{
370 if (cmd->flags & AS_ASYNC_FLAGS_HAS_TIMER) {
371 ev_timer_stop(cmd->event_loop->loop, &cmd->timer);
372 }
373}
374
375static inline void
377{
378 ev_io_stop(cmd->event_loop->loop, &conn->watcher);
379 conn->watching = 0;
380}
381
382static inline void
384{
385 // This method only needed for libuv pipelined connections.
386}
387
388static inline void
390{
392}
393
394/******************************************************************************
395 * LIBUV INLINE FUNCTIONS
396 *****************************************************************************/
397
398#elif defined(AS_USE_LIBUV)
399
400void as_uv_timer_cb(uv_timer_t* timer);
401void as_uv_repeat_cb(uv_timer_t* timer);
403
404static inline bool
405as_event_conn_current_trim(as_event_connection* conn, uint64_t max_socket_idle_ns)
406{
407 return as_socket_current_trim(conn->last_used, max_socket_idle_ns);
408}
409
410static inline bool
411as_event_conn_current_tran(as_event_connection* conn, uint64_t max_socket_idle_ns)
412{
413 return as_socket_current_tran(conn->last_used, max_socket_idle_ns);
414}
415
416static inline int
418{
419 // Libuv does not have a peek function, so use fd directly.
420 uv_os_fd_t fd;
421
422 if (uv_fileno((uv_handle_t*)&conn->socket, &fd) == 0) {
424 }
425 return -1;
426}
427
428static inline void
430{
431 conn->last_used = cf_getns();
432}
433
434static inline void
435as_event_timer_once(as_event_command* cmd, uint64_t timeout)
436{
437 if (!(cmd->flags & AS_ASYNC_FLAGS_HAS_TIMER)) {
438 uv_timer_init(cmd->event_loop->loop, &cmd->timer);
439 cmd->timer.data = cmd;
440 }
441 uv_timer_start(&cmd->timer, as_uv_timer_cb, timeout, 0);
443}
444
445static inline void
446as_event_timer_repeat(as_event_command* cmd, uint64_t repeat)
447{
448 if (!(cmd->flags & AS_ASYNC_FLAGS_HAS_TIMER)) {
449 uv_timer_init(cmd->event_loop->loop, &cmd->timer);
450 cmd->timer.data = cmd;
451 }
452 uv_timer_start(&cmd->timer, as_uv_repeat_cb, repeat, repeat);
454}
455
456static inline void
458{
459 // libuv socket timers automatically repeat.
460}
461
462static inline void
464{
465 if (cmd->flags & AS_ASYNC_FLAGS_HAS_TIMER) {
466 uv_timer_stop(&cmd->timer);
467 }
468}
469
470static inline void
472{
473 // uv_read_stop() will handle case where read is already stopped.
474 // Do not set watching to zero because conn is still initialized and active.
475 // libuv works differently here.
476 uv_read_stop((uv_stream_t*)conn);
477}
478
479static inline void
481{
482 uv_read_stop((uv_stream_t*)conn);
483}
484
485void
486as_uv_timer_closed(uv_handle_t* handle);
487
488static inline void
490{
491 if (cmd->flags & AS_ASYNC_FLAGS_HAS_TIMER) {
492 // libuv requires that cmd can't be freed until timer is closed.
493 uv_close((uv_handle_t*)&cmd->timer, as_uv_timer_closed);
494 }
495 else {
497 }
498}
499
500/******************************************************************************
501 * LIBEVENT INLINE FUNCTIONS
502 *****************************************************************************/
503
504#elif defined(AS_USE_LIBEVENT)
505
506void as_libevent_timer_cb(evutil_socket_t sock, short events, void* udata);
507void as_libevent_repeat_cb(evutil_socket_t sock, short events, void* udata);
508
509static inline bool
510as_event_conn_current_trim(as_event_connection* conn, uint64_t max_socket_idle_ns)
511{
512 return as_socket_current_trim(conn->socket.last_used, max_socket_idle_ns);
513}
514
515static inline bool
516as_event_conn_current_tran(as_event_connection* conn, uint64_t max_socket_idle_ns)
517{
518 return as_socket_current_tran(conn->socket.last_used, max_socket_idle_ns);
519}
520
521static inline int
523{
524 return as_socket_validate_fd(conn->socket.fd);
525}
526
527static inline void
529{
530 as_socket_close(&conn->socket);
531 cf_free(conn);
532}
533
534static inline void
536{
537 conn->socket.last_used = cf_getns();
538}
539
540static inline void
541as_event_timer_once(as_event_command* cmd, uint64_t timeout)
542{
543 evtimer_assign(&cmd->timer, cmd->event_loop->loop, as_libevent_timer_cb, cmd);
544 struct timeval tv;
545 tv.tv_sec = (uint32_t)timeout / 1000;
546 tv.tv_usec = ((uint32_t)timeout % 1000) * 1000;
547 evtimer_add(&cmd->timer, &tv);
549}
550
551static inline void
552as_event_timer_repeat(as_event_command* cmd, uint64_t repeat)
553{
554 event_assign(&cmd->timer, cmd->event_loop->loop, -1, EV_PERSIST, as_libevent_repeat_cb, cmd);
555 struct timeval tv;
556 tv.tv_sec = (uint32_t)repeat / 1000;
557 tv.tv_usec = ((uint32_t)repeat % 1000) * 1000;
558 evtimer_add(&cmd->timer, &tv);
560}
561
562static inline void
564{
565 // libevent socket timers automatically repeat.
566}
567
568static inline void
570{
571 if (cmd->flags & AS_ASYNC_FLAGS_HAS_TIMER) {
572 evtimer_del(&cmd->timer);
573 }
574}
575
576static inline void
578{
579 event_del(&conn->watcher);
580 conn->watching = 0;
581}
582
583static inline void
585{
586 // This method only needed for libuv pipelined connections.
587}
588
589static inline void
591{
593}
594
595/******************************************************************************
596 * EVENT_LIB NOT DEFINED INLINE FUNCTIONS
597 *****************************************************************************/
598
599#else
600
601static inline bool
602as_event_conn_current_trim(as_event_connection* conn, uint64_t max_socket_idle_ns)
603{
604 return false;
605}
606
607static inline bool
608as_event_conn_current_tran(as_event_connection* conn, uint64_t max_socket_idle_ns)
609{
610 return false;
611}
612
613static inline int
615{
616 return -1;
617}
618
619static inline void
623
624static inline void
628
629static inline void
631{
632}
633
634static inline void
636{
637}
638
639static inline void
643
644static inline void
648
649static inline void
653
654static inline void
658
659static inline void
663
664#endif
665
666/******************************************************************************
667 * COMMON INLINE FUNCTIONS
668 *****************************************************************************/
669
670static inline as_event_loop*
672{
673 // Assign event loop using round robin distribution if not specified.
674 return event_loop ? event_loop : as_event_loop_get();
675}
676
677static inline void
679{
680 // Authentication write buffer is always located after command write buffer.
681 uint8_t* buf = (uint8_t*)cmd + cmd->write_offset + cmd->write_len;
682 uint32_t len = as_authenticate_set(cmd->cluster, session, buf);
683 cmd->len = cmd->write_len + len;
684 cmd->pos = cmd->write_len;
685}
686
687static inline void
689{
690 // Authenticate read buffer uses the standard read buffer (buf).
691 cmd->len = sizeof(as_proto);
692 cmd->pos = 0;
694}
695
696static inline bool
698{
699 // Authenticate read buffer uses the standard read buffer (buf).
700 as_proto* proto = (as_proto*)cmd->buf;
701
702 if (! as_event_proto_parse_auth(cmd, proto)) {
703 return false;
704 }
705
706 cmd->len = (uint32_t)proto->sz;
707 cmd->pos = 0;
709 return true;
710}
711
712static inline void
714{
715 cmd->len = cmd->write_len;
716 cmd->pos = 0;
717}
718
719static inline void
720as_async_conn_pool_init(as_async_conn_pool* pool, uint32_t min_size, uint32_t max_size)
721{
722 as_queue_init(&pool->queue, sizeof(void*), max_size);
723 pool->min_size = min_size;
724 pool->limit = max_size;
725 pool->opened = 0;
726 pool->closed = 0;
727}
728
729static inline bool
731{
732 if (pool->queue.total >= pool->limit) {
733 return false;
734 }
735 pool->queue.total++;
736 return true;
737}
738
739static inline bool
741{
742 if (pool->queue.total > pool->limit) {
743 return false;
744 }
745 return as_queue_push_head(&pool->queue, &conn);
746}
747
748static inline bool
750{
751 if (pool->queue.total > pool->limit) {
752 return false;
753 }
754 return as_queue_push(&pool->queue, &conn);
755}
756
757static inline void
764
765static inline void
772
773static inline void
775{
776 as_async_conn_pool* pool = cmd->pipe_listener != NULL ?
777 &cmd->node->pipe_conn_pools[cmd->event_loop->index] :
779
781}
782
783static inline void
785{
786 as_event_connection* conn = cmd->conn;
787
788 if (conn) {
789 if (conn->watching > 0) {
790 as_event_stop_watcher(cmd, conn);
791 as_event_release_connection(conn, pool);
793 }
794 else {
795 cf_free(conn);
797 pool->closed++;
798 }
799 }
800}
801
802static inline bool
804{
805 if (cmd->pipe_listener) {
806 return false;
807 }
808
809 as_event_stop_watcher(cmd, cmd->conn);
811 return as_event_command_retry(cmd, false);
812}
813
814static inline void
816{
817 // Use this function to free batch/scan/query commands that were never started.
818 as_node_release(cmd->node);
819 cf_free(cmd);
820}
821
822static inline void
824{
825 as_queue_destroy(&event_loop->queue);
826 as_queue_destroy(&event_loop->delay_queue);
827 as_queue_destroy(&event_loop->pipe_cb_queue);
828 pthread_mutex_destroy(&event_loop->lock);
829}
830
831#ifdef __cplusplus
832} // end extern "C"
833#endif
uint32_t as_authenticate_set(struct as_cluster_s *cluster, struct as_session_s *session, uint8_t *buffer)
static void as_node_incr_error_rate(as_node *node)
Definition as_cluster.h:709
bool as_event_command_parse_success_failure(as_event_command *cmd)
static void as_event_decr_conn(as_event_command *cmd)
bool as_event_command_retry(as_event_command *cmd, bool timeout)
void as_event_notify_error(as_event_command *cmd, as_error *err)
static void as_event_set_write(as_event_command *cmd)
bool as_event_command_parse_info(as_event_command *cmd)
void as_event_error_callback(as_event_command *cmd, as_error *err)
static bool as_event_conn_current_tran(as_event_connection *conn, uint64_t max_socket_idle_ns)
void as_event_query_complete(as_event_command *cmd)
static void as_event_stop_read(as_event_connection *conn)
static void as_event_timer_once(as_event_command *cmd, uint64_t timeout)
bool as_event_decompress(as_event_command *cmd)
bool as_event_proto_parse(as_event_command *cmd, as_proto *proto)
static bool as_event_set_auth_parse_header(as_event_command *cmd)
void(* as_event_executable)(as_event_loop *event_loop, void *udata)
void as_event_parse_error(as_event_command *cmd, as_error *err)
static void as_event_command_release(as_event_command *cmd)
void as_event_executor_cancel(as_event_executor *executor, uint32_t queued_count)
static bool as_async_conn_pool_incr_total(as_async_conn_pool *pool)
#define AS_ASYNC_FLAGS_HAS_TIMER
void as_event_socket_timeout(as_event_command *cmd)
static void as_event_set_auth_write(as_event_command *cmd, as_session *session)
bool as_event_command_parse_result(as_event_command *cmd)
void as_event_command_schedule(as_event_command *cmd)
#define AS_ASYNC_FLAGS_USING_SOCKET_TIMER
bool as_event_command_parse_header(as_event_command *cmd)
static void as_event_release_connection(as_event_connection *conn, as_async_conn_pool *pool)
void as_event_socket_error(as_event_command *cmd, as_error *err)
static void as_event_release_async_connection(as_event_command *cmd)
void as_event_connector_success(as_event_command *cmd)
static void as_event_command_destroy(as_event_command *cmd)
as_status as_event_command_execute(as_event_command *cmd, as_error *err)
bool(* as_event_parse_results_fn)(struct as_event_command *cmd)
#define AS_ASYNC_STATE_AUTH_READ_BODY
static void as_event_stop_watcher(as_event_command *cmd, as_event_connection *conn)
void as_event_connect(as_event_command *cmd, as_async_conn_pool *pool)
bool as_event_proto_parse_auth(as_event_command *cmd, as_proto *proto)
void(* as_event_executor_complete_fn)(struct as_event_executor *executor)
static as_event_loop * as_event_assign(as_event_loop *event_loop)
void as_event_connection_complete(as_event_command *cmd)
void as_event_node_destroy(as_node *node)
void as_event_executor_error(as_event_executor *executor, as_error *err, uint32_t command_count)
static void as_event_timer_stop(as_event_command *cmd)
bool as_event_create_loop(as_event_loop *event_loop)
void as_event_response_error(as_event_command *cmd, as_error *err)
static bool as_event_socket_retry(as_event_command *cmd)
void as_event_process_timer(as_event_command *cmd)
static void as_event_loop_destroy(as_event_loop *event_loop)
#define AS_ASYNC_STATE_AUTH_READ_HEADER
static void as_event_close_connection(as_event_connection *conn)
static bool as_async_conn_pool_push_head(as_async_conn_pool *pool, as_event_connection *conn)
static void as_event_connection_timeout(as_event_command *cmd, as_async_conn_pool *pool)
void as_event_executor_complete(as_event_executor *executor)
static void as_event_set_conn_last_used(as_event_connection *conn)
static void as_event_timer_repeat(as_event_command *cmd, uint64_t repeat)
void as_event_command_free(as_event_command *cmd)
void as_event_execute_retry(as_event_command *cmd)
void as_event_close_cluster(as_cluster *cluster)
static bool as_async_conn_pool_push(as_async_conn_pool *pool, as_event_connection *conn)
void as_event_create_connections(as_node *node, as_async_conn_pool *pools)
bool as_event_execute(as_event_loop *event_loop, as_event_executable executable, void *udata)
static bool as_event_conn_current_trim(as_event_connection *conn, uint64_t max_socket_idle_ns)
void as_event_command_write_start(as_event_command *cmd)
void as_event_batch_complete(as_event_command *cmd)
static void as_async_conn_pool_init(as_async_conn_pool *pool, uint32_t min_size, uint32_t max_size)
static void as_event_timer_again(as_event_command *cmd)
static void as_event_set_auth_read_header(as_event_command *cmd)
static int as_event_conn_validate(as_event_connection *conn)
void as_event_total_timeout(as_event_command *cmd)
void as_event_register_external_loop(as_event_loop *event_loop)
uint8_t as_latency_type
Definition as_latency.h:29
void(* as_pipe_listener)(void *udata, as_event_loop *event_loop)
Definition as_listener.h:72
static void as_node_release(as_node *node)
Definition as_node.h:527
as_proto proto
Definition as_proto.h:0
AS_EXTERN bool as_queue_push_head(as_queue *queue, const void *ptr)
AS_EXTERN void as_queue_destroy(as_queue *queue)
static void as_queue_decr_total(as_queue *queue)
Definition as_queue.h:217
AS_EXTERN bool as_queue_init(as_queue *queue, uint32_t item_size, uint32_t capacity)
AS_EXTERN bool as_queue_push(as_queue *queue, const void *ptr)
int as_socket_validate_fd(as_socket_fd fd)
static bool as_socket_current_tran(uint64_t last_used, uint64_t max_socket_idle_ns)
Definition as_socket.h:197
void as_socket_close(as_socket *sock)
#define as_socket_fd
Definition as_socket.h:32
static bool as_socket_current_trim(uint64_t last_used, uint64_t max_socket_idle_ns)
Definition as_socket.h:207
as_status
Definition as_status.h:30
static as_event_loop * as_event_loop_get(void)
Definition as_event.h:380
as_policy_replica
Definition as_policy.h:272
uint32_t min_size
Definition as_node.h:190
uint32_t opened
Definition as_node.h:200
uint32_t closed
Definition as_node.h:205
struct as_event_command * cmd
as_event_connection base
as_latency_type latency_type
as_event_loop * event_loop
cf_ll_element pipe_link
as_pipe_listener pipe_listener
as_event_parse_results_fn parse_results
as_policy_replica replica
as_event_connection * conn
as_event_state * event_state
as_event_executable executable
as_event_loop * event_loop
pthread_mutex_t lock
struct as_event_command ** commands
as_event_executor_complete_fn complete_fn
pthread_mutex_t lock
Definition as_event.h:124
as_queue queue
Definition as_event.h:125
void * loop
Definition as_event.h:120
as_queue pipe_cb_queue
Definition as_event.h:127
as_queue delay_queue
Definition as_event.h:126
uint32_t index
Definition as_event.h:129
as_async_conn_pool * pipe_conn_pools
Definition as_node.h:296
as_async_conn_pool * async_conn_pools
Definition as_node.h:291
uint32_t total
Definition as_queue.h:63
as_pipe_listener listener