All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
as_event_internal.h
Go to the documentation of this file.
1/*
2 * Copyright 2008-2024 Aerospike, Inc.
3 *
4 * Portions may be licensed to Aerospike, Inc. under one or more contributor
5 * license agreements.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8 * use this file except in compliance with the License. You may obtain a copy of
9 * the License at http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations under
15 * the License.
16 */
17#pragma once
18
19#include <aerospike/as_admin.h>
22#include <aerospike/as_queue.h>
23#include <aerospike/as_proto.h>
24#include <aerospike/as_socket.h>
25#include <citrusleaf/cf_ll.h>
26#include <pthread.h>
27
28#if defined(AS_USE_LIBEV)
29#include <ev.h>
30#elif defined(AS_USE_LIBUV)
31#include <uv.h>
32struct as_uv_tls;
33#elif defined(AS_USE_LIBEVENT)
34#include <event2/event.h>
35#else
36#endif
37
38#ifdef __cplusplus
39extern "C" {
40#endif
41
42/******************************************************************************
43 * TYPES
44 *****************************************************************************/
45
46#define AS_ASYNC_STATE_UNREGISTERED 0
47#define AS_ASYNC_STATE_REGISTERED 1
48#define AS_ASYNC_STATE_DELAY_QUEUE 2
49#define AS_ASYNC_STATE_CONNECT 3
50#define AS_ASYNC_STATE_TLS_CONNECT 4
51#define AS_ASYNC_STATE_AUTH_WRITE 5
52#define AS_ASYNC_STATE_AUTH_READ_HEADER 6
53#define AS_ASYNC_STATE_AUTH_READ_BODY 7
54#define AS_ASYNC_STATE_COMMAND_WRITE 8
55#define AS_ASYNC_STATE_COMMAND_READ_HEADER 9
56#define AS_ASYNC_STATE_COMMAND_READ_BODY 10
57#define AS_ASYNC_STATE_QUEUE_ERROR 11
58#define AS_ASYNC_STATE_RETRY 12
59
60#define AS_ASYNC_FLAGS_DESERIALIZE 1
61#define AS_ASYNC_FLAGS_READ 2
62#define AS_ASYNC_FLAGS_HAS_TIMER 4
63#define AS_ASYNC_FLAGS_USING_SOCKET_TIMER 8
64#define AS_ASYNC_FLAGS_EVENT_RECEIVED 16
65#define AS_ASYNC_FLAGS_FREE_BUF 32
66#define AS_ASYNC_FLAGS_LINEARIZE 64
67#define AS_ASYNC_FLAGS_HEAP_REC 128
68
69#define AS_ASYNC_AUTH_RETURN_CODE 1
70
71#define AS_EVENT_CONNECTION_COMPLETE 0
72#define AS_EVENT_CONNECTION_PENDING 1
73#define AS_EVENT_CONNECTION_ERROR 2
74
75#define AS_EVENT_QUEUE_INITIAL_CAPACITY 256
76
77struct as_event_command;
79
80typedef struct {
81#if defined(AS_USE_LIBEV)
82 struct ev_io watcher;
83 as_socket socket;
84#elif defined(AS_USE_LIBUV)
85 uv_tcp_t socket;
86 struct as_uv_tls* tls;
87 // Reuse memory for requests, because only one request is active at a time.
88 union {
89 uv_connect_t connect;
90 uv_write_t write;
91 } req;
92 uint64_t last_used;
93#elif defined(AS_USE_LIBEVENT)
94 struct event watcher;
95 as_socket socket;
96#else
97#endif
101
106
111
112typedef void (*as_event_executable) (as_event_loop* event_loop, void* udata);
113typedef bool (*as_event_parse_results_fn) (struct as_event_command* cmd);
114typedef void (*as_event_executor_complete_fn) (struct as_event_executor* executor);
115
116typedef struct as_event_command {
117#if defined(AS_USE_LIBEV)
118 struct ev_timer timer;
119#elif defined(AS_USE_LIBUV)
120 uv_timer_t timer;
121#elif defined(AS_USE_LIBEVENT)
122 struct event timer;
123#else
124#endif
127 uint32_t max_retries;
128 uint32_t iteration;
135 const char* ns;
136 void* partition; // as_partition* or as_partition_shm*
137 void* udata;
140 cf_ll_element pipe_link;
141
142 uint8_t* buf;
143 uint64_t begin; // Used for metrics
145 uint32_t write_offset;
146 uint32_t write_len;
148 uint32_t len;
149 uint32_t pos;
150
151 uint8_t type;
152 uint8_t proto_type;
154 uint8_t state;
155 uint8_t flags;
158 uint8_t replica_index_sc; // Used in batch only.
159
160 struct as_txn* txn;
161 uint8_t* ubuf; // Uncompressed send buffer. Used when compression is enabled.
162 uint32_t ubuf_size;
166
171
188
189/******************************************************************************
190 * COMMON FUNCTIONS
191 *****************************************************************************/
192
195
196void
198
199void
201
202bool
204
205bool
207
208bool
210
211void
213
214void
216
217void
219
220bool
222
223void
225
226void
228
229void
231
232void
234
235void
236as_event_executor_error(as_event_executor* executor, as_error* err, uint32_t command_count);
237
238void
239as_event_executor_cancel(as_event_executor* executor, uint32_t queued_count);
240
241void
243
244void
246
247void
249
250void
252
253void
255
256void
258
259bool
261
262bool
264
265bool
267
268bool
270
271bool
273
274void
276
277void
279
280void
282
283void
285
286/******************************************************************************
287 * IMPLEMENTATION SPECIFIC FUNCTIONS
288 *****************************************************************************/
289
290bool
292
293void
295
296/**
297 * Schedule execution of function on specified event loop.
298 * Command is placed on event loop queue and is never executed directly.
299 */
300bool
302
303void
305
306void
308
309void
311
312/******************************************************************************
313 * LIBEV INLINE FUNCTIONS
314 *****************************************************************************/
315
316#if defined(AS_USE_LIBEV)
317
318void as_ev_timer_cb(struct ev_loop* loop, ev_timer* timer, int revents);
319void as_ev_repeat_cb(struct ev_loop* loop, ev_timer* timer, int revents);
320
321static inline bool
322as_event_conn_current_trim(as_event_connection* conn, uint64_t max_socket_idle_ns)
323{
324 return as_socket_current_trim(conn->socket.last_used, max_socket_idle_ns);
325}
326
327static inline bool
328as_event_conn_current_tran(as_event_connection* conn, uint64_t max_socket_idle_ns)
329{
330 return as_socket_current_tran(conn->socket.last_used, max_socket_idle_ns);
331}
332
333static inline int
335{
336 return as_socket_validate_fd(conn->socket.fd);
337}
338
339static inline void
341{
342 as_socket_close(&conn->socket);
343 cf_free(conn);
344}
345
346static inline void
348{
349 conn->socket.last_used = cf_getns();
350}
351
352static inline void
353as_event_timer_once(as_event_command* cmd, uint64_t timeout)
354{
355 ev_timer_init(&cmd->timer, as_ev_timer_cb, (double)timeout / 1000.0, 0.0);
356 cmd->timer.data = cmd;
357 ev_timer_start(cmd->event_loop->loop, &cmd->timer);
359}
360
361static inline void
362as_event_timer_repeat(as_event_command* cmd, uint64_t repeat)
363{
364 ev_init(&cmd->timer, as_ev_repeat_cb);
365 cmd->timer.repeat = (double)repeat / 1000.0;
366 cmd->timer.data = cmd;
367 ev_timer_again(cmd->event_loop->loop, &cmd->timer);
369}
370
371static inline void
373{
374 ev_timer_again(cmd->event_loop->loop, &cmd->timer);
375}
376
377static inline void
379{
380 if (cmd->flags & AS_ASYNC_FLAGS_HAS_TIMER) {
381 ev_timer_stop(cmd->event_loop->loop, &cmd->timer);
382 }
383}
384
385static inline void
387{
388 ev_io_stop(cmd->event_loop->loop, &conn->watcher);
389 conn->watching = 0;
390}
391
392static inline void
394{
395 // This method only needed for libuv pipelined connections.
396}
397
398static inline void
400{
402}
403
404/******************************************************************************
405 * LIBUV INLINE FUNCTIONS
406 *****************************************************************************/
407
408#elif defined(AS_USE_LIBUV)
409
410void as_uv_timer_cb(uv_timer_t* timer);
411void as_uv_repeat_cb(uv_timer_t* timer);
413
414static inline bool
415as_event_conn_current_trim(as_event_connection* conn, uint64_t max_socket_idle_ns)
416{
417 return as_socket_current_trim(conn->last_used, max_socket_idle_ns);
418}
419
420static inline bool
421as_event_conn_current_tran(as_event_connection* conn, uint64_t max_socket_idle_ns)
422{
423 return as_socket_current_tran(conn->last_used, max_socket_idle_ns);
424}
425
426static inline int
428{
429 // Libuv does not have a peek function, so use fd directly.
430 uv_os_fd_t fd;
431
432 if (uv_fileno((uv_handle_t*)&conn->socket, &fd) == 0) {
434 }
435 return -1;
436}
437
438static inline void
440{
441 conn->last_used = cf_getns();
442}
443
444static inline void
445as_event_timer_once(as_event_command* cmd, uint64_t timeout)
446{
447 if (!(cmd->flags & AS_ASYNC_FLAGS_HAS_TIMER)) {
448 uv_timer_init(cmd->event_loop->loop, &cmd->timer);
449 cmd->timer.data = cmd;
450 }
451 uv_timer_start(&cmd->timer, as_uv_timer_cb, timeout, 0);
453}
454
455static inline void
456as_event_timer_repeat(as_event_command* cmd, uint64_t repeat)
457{
458 if (!(cmd->flags & AS_ASYNC_FLAGS_HAS_TIMER)) {
459 uv_timer_init(cmd->event_loop->loop, &cmd->timer);
460 cmd->timer.data = cmd;
461 }
462 uv_timer_start(&cmd->timer, as_uv_repeat_cb, repeat, repeat);
464}
465
466static inline void
468{
469 // libuv socket timers automatically repeat.
470}
471
472static inline void
474{
475 if (cmd->flags & AS_ASYNC_FLAGS_HAS_TIMER) {
476 uv_timer_stop(&cmd->timer);
477 }
478}
479
480static inline void
482{
483 // uv_read_stop() will handle case where read is already stopped.
484 // Do not set watching to zero because conn is still initialized and active.
485 // libuv works differently here.
486 uv_read_stop((uv_stream_t*)conn);
487}
488
489static inline void
491{
492 uv_read_stop((uv_stream_t*)conn);
493}
494
495void
496as_uv_timer_closed(uv_handle_t* handle);
497
498static inline void
500{
501 if (cmd->flags & AS_ASYNC_FLAGS_HAS_TIMER) {
502 // libuv requires that cmd can't be freed until timer is closed.
503 uv_close((uv_handle_t*)&cmd->timer, as_uv_timer_closed);
504 }
505 else {
507 }
508}
509
510/******************************************************************************
511 * LIBEVENT INLINE FUNCTIONS
512 *****************************************************************************/
513
514#elif defined(AS_USE_LIBEVENT)
515
516void as_libevent_timer_cb(evutil_socket_t sock, short events, void* udata);
517void as_libevent_repeat_cb(evutil_socket_t sock, short events, void* udata);
518
519static inline bool
520as_event_conn_current_trim(as_event_connection* conn, uint64_t max_socket_idle_ns)
521{
522 return as_socket_current_trim(conn->socket.last_used, max_socket_idle_ns);
523}
524
525static inline bool
526as_event_conn_current_tran(as_event_connection* conn, uint64_t max_socket_idle_ns)
527{
528 return as_socket_current_tran(conn->socket.last_used, max_socket_idle_ns);
529}
530
531static inline int
533{
534 return as_socket_validate_fd(conn->socket.fd);
535}
536
537static inline void
539{
540 as_socket_close(&conn->socket);
541 cf_free(conn);
542}
543
544static inline void
546{
547 conn->socket.last_used = cf_getns();
548}
549
550static inline void
551as_event_timer_once(as_event_command* cmd, uint64_t timeout)
552{
553 evtimer_assign(&cmd->timer, cmd->event_loop->loop, as_libevent_timer_cb, cmd);
554 struct timeval tv;
555 tv.tv_sec = (uint32_t)timeout / 1000;
556 tv.tv_usec = ((uint32_t)timeout % 1000) * 1000;
557 evtimer_add(&cmd->timer, &tv);
559}
560
561static inline void
562as_event_timer_repeat(as_event_command* cmd, uint64_t repeat)
563{
564 event_assign(&cmd->timer, cmd->event_loop->loop, -1, EV_PERSIST, as_libevent_repeat_cb, cmd);
565 struct timeval tv;
566 tv.tv_sec = (uint32_t)repeat / 1000;
567 tv.tv_usec = ((uint32_t)repeat % 1000) * 1000;
568 evtimer_add(&cmd->timer, &tv);
570}
571
572static inline void
574{
575 // libevent socket timers automatically repeat.
576}
577
578static inline void
580{
581 if (cmd->flags & AS_ASYNC_FLAGS_HAS_TIMER) {
582 evtimer_del(&cmd->timer);
583 }
584}
585
586static inline void
588{
589 event_del(&conn->watcher);
590 conn->watching = 0;
591}
592
593static inline void
595{
596 // This method only needed for libuv pipelined connections.
597}
598
599static inline void
601{
603}
604
605/******************************************************************************
606 * EVENT_LIB NOT DEFINED INLINE FUNCTIONS
607 *****************************************************************************/
608
609#else
610
611static inline bool
612as_event_conn_current_trim(as_event_connection* conn, uint64_t max_socket_idle_ns)
613{
614 return false;
615}
616
617static inline bool
618as_event_conn_current_tran(as_event_connection* conn, uint64_t max_socket_idle_ns)
619{
620 return false;
621}
622
623static inline int
625{
626 return -1;
627}
628
629static inline void
633
634static inline void
638
639static inline void
641{
642}
643
644static inline void
646{
647}
648
649static inline void
653
654static inline void
658
659static inline void
663
664static inline void
668
669static inline void
673
674#endif
675
676/******************************************************************************
677 * COMMON INLINE FUNCTIONS
678 *****************************************************************************/
679
680static inline as_event_loop*
682{
683 // Assign event loop using round robin distribution if not specified.
684 return event_loop ? event_loop : as_event_loop_get();
685}
686
687static inline void
689{
690 // Authentication write buffer is always located after command write buffer.
691 uint8_t* buf = (uint8_t*)cmd + cmd->write_offset + cmd->write_len;
692 uint32_t len = as_authenticate_set(cmd->cluster, session, buf);
693 cmd->len = cmd->write_len + len;
694 cmd->pos = cmd->write_len;
695}
696
697static inline void
699{
700 // Authenticate read buffer uses the standard read buffer (buf).
701 cmd->len = sizeof(as_proto);
702 cmd->pos = 0;
704}
705
706static inline bool
708{
709 // Authenticate read buffer uses the standard read buffer (buf).
710 as_proto* proto = (as_proto*)cmd->buf;
711
712 if (! as_event_proto_parse_auth(cmd, proto)) {
713 return false;
714 }
715
716 cmd->len = (uint32_t)proto->sz;
717 cmd->pos = 0;
719 return true;
720}
721
722static inline void
724{
725 cmd->len = cmd->write_len;
726 cmd->pos = 0;
727}
728
729static inline void
730as_async_conn_pool_init(as_async_conn_pool* pool, uint32_t min_size, uint32_t max_size)
731{
732 as_queue_init(&pool->queue, sizeof(void*), max_size);
733 pool->min_size = min_size;
734 pool->limit = max_size;
735 pool->opened = 0;
736 pool->closed = 0;
737}
738
739static inline bool
741{
742 if (pool->queue.total >= pool->limit) {
743 return false;
744 }
745 pool->queue.total++;
746 return true;
747}
748
749static inline bool
751{
752 if (pool->queue.total > pool->limit) {
753 return false;
754 }
755 return as_queue_push_head(&pool->queue, &conn);
756}
757
758static inline bool
760{
761 if (pool->queue.total > pool->limit) {
762 return false;
763 }
764 return as_queue_push(&pool->queue, &conn);
765}
766
767static inline void
774
775static inline void
782
783static inline void
785{
786 as_async_conn_pool* pool = cmd->pipe_listener != NULL ?
787 &cmd->node->pipe_conn_pools[cmd->event_loop->index] :
789
791}
792
793static inline void
795{
796 as_event_connection* conn = cmd->conn;
797
798 if (conn) {
799 if (conn->watching > 0) {
800 as_event_stop_watcher(cmd, conn);
801 as_event_release_connection(conn, pool);
803 }
804 else {
805 cf_free(conn);
807 pool->closed++;
808 }
809 }
810}
811
812static inline bool
814{
815 if (cmd->pipe_listener) {
816 return false;
817 }
818
819 as_event_stop_watcher(cmd, cmd->conn);
821 return as_event_command_retry(cmd, false);
822}
823
824static inline uint8_t*
826{
827 // Return saved uncompressed buffer when compression is enabled.
828 // Return command buffer when compression is not enabled.
829 return cmd->ubuf ? cmd->ubuf : (uint8_t*)cmd + cmd->write_offset;
830}
831
832static inline void
834{
835 // Use this function to free async commands that were never started.
836 if (cmd->node) {
837 as_node_release(cmd->node);
838 }
839
840 if (cmd->ubuf) {
841 cf_free(cmd->ubuf);
842 }
843
844 cf_free(cmd);
845}
846
847static inline void
849{
850 as_queue_destroy(&event_loop->queue);
851 as_queue_destroy(&event_loop->delay_queue);
852 as_queue_destroy(&event_loop->pipe_cb_queue);
853 pthread_mutex_destroy(&event_loop->lock);
854}
855
856#ifdef __cplusplus
857} // end extern "C"
858#endif
uint32_t as_authenticate_set(struct as_cluster_s *cluster, struct as_session_s *session, uint8_t *buffer)
static void as_node_incr_error_rate(as_node *node)
Definition as_cluster.h:720
bool as_event_command_parse_success_failure(as_event_command *cmd)
static void as_event_decr_conn(as_event_command *cmd)
bool as_event_command_retry(as_event_command *cmd, bool timeout)
void as_event_notify_error(as_event_command *cmd, as_error *err)
static void as_event_set_write(as_event_command *cmd)
bool as_event_command_parse_info(as_event_command *cmd)
void as_event_error_callback(as_event_command *cmd, as_error *err)
static bool as_event_conn_current_tran(as_event_connection *conn, uint64_t max_socket_idle_ns)
void as_event_query_complete(as_event_command *cmd)
static void as_event_stop_read(as_event_connection *conn)
static void as_event_timer_once(as_event_command *cmd, uint64_t timeout)
bool as_event_decompress(as_event_command *cmd)
bool as_event_proto_parse(as_event_command *cmd, as_proto *proto)
static bool as_event_set_auth_parse_header(as_event_command *cmd)
void(* as_event_executable)(as_event_loop *event_loop, void *udata)
void as_event_parse_error(as_event_command *cmd, as_error *err)
static void as_event_command_release(as_event_command *cmd)
void as_event_executor_cancel(as_event_executor *executor, uint32_t queued_count)
static bool as_async_conn_pool_incr_total(as_async_conn_pool *pool)
#define AS_ASYNC_FLAGS_HAS_TIMER
void as_event_socket_timeout(as_event_command *cmd)
static void as_event_set_auth_write(as_event_command *cmd, as_session *session)
bool as_event_command_parse_result(as_event_command *cmd)
void as_event_command_schedule(as_event_command *cmd)
#define AS_ASYNC_FLAGS_USING_SOCKET_TIMER
bool as_event_command_parse_header(as_event_command *cmd)
static void as_event_release_connection(as_event_connection *conn, as_async_conn_pool *pool)
void as_event_socket_error(as_event_command *cmd, as_error *err)
static void as_event_release_async_connection(as_event_command *cmd)
void as_event_connector_success(as_event_command *cmd)
static void as_event_command_destroy(as_event_command *cmd)
as_status as_event_command_execute(as_event_command *cmd, as_error *err)
bool(* as_event_parse_results_fn)(struct as_event_command *cmd)
#define AS_ASYNC_STATE_AUTH_READ_BODY
static void as_event_stop_watcher(as_event_command *cmd, as_event_connection *conn)
void as_event_connect(as_event_command *cmd, as_async_conn_pool *pool)
bool as_event_proto_parse_auth(as_event_command *cmd, as_proto *proto)
void(* as_event_executor_complete_fn)(struct as_event_executor *executor)
static as_event_loop * as_event_assign(as_event_loop *event_loop)
void as_event_connection_complete(as_event_command *cmd)
static uint8_t * as_event_get_ubuf(as_event_command *cmd)
void as_event_node_destroy(as_node *node)
void as_event_executor_error(as_event_executor *executor, as_error *err, uint32_t command_count)
static void as_event_timer_stop(as_event_command *cmd)
bool as_event_create_loop(as_event_loop *event_loop)
void as_event_response_error(as_event_command *cmd, as_error *err)
static bool as_event_socket_retry(as_event_command *cmd)
void as_event_process_timer(as_event_command *cmd)
static void as_event_loop_destroy(as_event_loop *event_loop)
#define AS_ASYNC_STATE_AUTH_READ_HEADER
static void as_event_close_connection(as_event_connection *conn)
static bool as_async_conn_pool_push_head(as_async_conn_pool *pool, as_event_connection *conn)
static void as_event_connection_timeout(as_event_command *cmd, as_async_conn_pool *pool)
void as_event_executor_complete(as_event_executor *executor)
static void as_event_set_conn_last_used(as_event_connection *conn)
static void as_event_timer_repeat(as_event_command *cmd, uint64_t repeat)
void as_event_command_free(as_event_command *cmd)
void as_event_execute_retry(as_event_command *cmd)
void as_event_close_cluster(as_cluster *cluster)
static bool as_async_conn_pool_push(as_async_conn_pool *pool, as_event_connection *conn)
void as_event_create_connections(as_node *node, as_async_conn_pool *pools)
bool as_event_execute(as_event_loop *event_loop, as_event_executable executable, void *udata)
static bool as_event_conn_current_trim(as_event_connection *conn, uint64_t max_socket_idle_ns)
void as_event_command_write_start(as_event_command *cmd)
void as_event_batch_complete(as_event_command *cmd)
static void as_async_conn_pool_init(as_async_conn_pool *pool, uint32_t min_size, uint32_t max_size)
static void as_event_timer_again(as_event_command *cmd)
static void as_event_set_auth_read_header(as_event_command *cmd)
static int as_event_conn_validate(as_event_connection *conn)
void as_event_total_timeout(as_event_command *cmd)
void as_event_register_external_loop(as_event_loop *event_loop)
void as_event_response_complete(as_event_command *cmd)
bool as_event_command_parse_deadline(as_event_command *cmd)
uint8_t as_latency_type
Definition as_latency.h:29
void(* as_pipe_listener)(void *udata, as_event_loop *event_loop)
Definition as_listener.h:72
static void as_node_release(as_node *node)
Definition as_node.h:510
as_proto proto
Definition as_proto.h:0
AS_EXTERN bool as_queue_push_head(as_queue *queue, const void *ptr)
AS_EXTERN void as_queue_destroy(as_queue *queue)
static void as_queue_decr_total(as_queue *queue)
Definition as_queue.h:217
AS_EXTERN bool as_queue_init(as_queue *queue, uint32_t item_size, uint32_t capacity)
AS_EXTERN bool as_queue_push(as_queue *queue, const void *ptr)
int as_socket_validate_fd(as_socket_fd fd)
static bool as_socket_current_tran(uint64_t last_used, uint64_t max_socket_idle_ns)
Definition as_socket.h:197
void as_socket_close(as_socket *sock)
#define as_socket_fd
Definition as_socket.h:32
static bool as_socket_current_trim(uint64_t last_used, uint64_t max_socket_idle_ns)
Definition as_socket.h:207
as_status
Definition as_status.h:30
static as_event_loop * as_event_loop_get(void)
Definition as_event.h:385
as_policy_replica
Definition as_policy.h:273
uint32_t min_size
Definition as_node.h:173
uint32_t opened
Definition as_node.h:183
uint32_t closed
Definition as_node.h:188
struct as_event_command * cmd
as_event_connection base
struct as_txn * txn
as_latency_type latency_type
as_event_loop * event_loop
cf_ll_element pipe_link
as_pipe_listener pipe_listener
as_event_parse_results_fn parse_results
as_policy_replica replica
as_event_connection * conn
as_event_state * event_state
as_event_executable executable
as_event_loop * event_loop
pthread_mutex_t lock
struct as_event_command ** commands
as_event_executor_complete_fn complete_fn
pthread_mutex_t lock
Definition as_event.h:124
as_queue queue
Definition as_event.h:125
void * loop
Definition as_event.h:120
as_queue pipe_cb_queue
Definition as_event.h:127
as_queue delay_queue
Definition as_event.h:126
uint32_t index
Definition as_event.h:129
as_async_conn_pool * pipe_conn_pools
Definition as_node.h:279
as_async_conn_pool * async_conn_pools
Definition as_node.h:274
uint32_t total
Definition as_queue.h:63
as_pipe_listener listener