Loading...
Searching...
No Matches
as_queue_mt.h
Go to the documentation of this file.
/*
 * Copyright 2008-2019 Aerospike, Inc.
 *
 * Portions may be licensed to Aerospike, Inc. under one or more contributor
 * license agreements.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
17#pragma once
18
19#include <aerospike/as_queue.h>
20#include <aerospike/as_std.h>
21#include <pthread.h>
22
23#ifdef __cplusplus
24extern "C" {
25#endif
26
27/******************************************************************************
28 * TYPES
29 ******************************************************************************/
30
31/**
32 * A thread-safe multi-threaded dynamic queue implementation.
33 * as_queue_mt is not part of the generic as_val family.
34 */
35typedef struct as_queue_mt_s {
36 /**
37 * The queue.
38 */
40
41 /**
42 * The lock.
43 */
44 pthread_mutex_t lock;
45
46 /**
47 * The notify/wait condition variable.
48 */
49 pthread_cond_t cond;
51
52/******************************************************************************
53 * MACROS
54 ******************************************************************************/
55
/**
 * Special wait_ms values accepted by as_queue_mt_pop() and
 * as_queue_mt_pop_tail().
 *
 * AS_QUEUE_FOREVER: wait without a time limit for an entry.
 * AS_QUEUE_NOWAIT:  do not wait if the queue is empty.
 *
 * The negative value is parenthesized so that it expands as a single
 * operand in any expression context.
 */
#define AS_QUEUE_FOREVER (-1)
#define AS_QUEUE_NOWAIT 0
58
/**
 * Initialize a stack allocated as_queue_mt, with item storage on the stack.
 * as_queue_mt_inita() will transfer stack memory to the heap if a resize is
 * required.
 *
 * The embedded queue is set up with as_queue_inita(); the lock and the
 * condition variable are then initialized with default attributes (NULL).
 *
 * NOTE: this macro expands to several separate statements and already ends
 * with a semicolon. Do not use it as the unbraced body of an if/else/loop;
 * only the first statement would be controlled by the condition. The
 * return values of pthread_mutex_init()/pthread_cond_init() are discarded.
 *
 * @param __q          Pointer to the as_queue_mt to initialize.
 * @param __item_size  Forwarded unchanged to as_queue_inita().
 * @param __capacity   Forwarded unchanged to as_queue_inita().
 */
#define as_queue_mt_inita(__q, __item_size, __capacity)\
as_queue_inita(&(__q)->queue, __item_size, __capacity);\
pthread_mutex_init(&(__q)->lock, NULL);\
pthread_cond_init(&(__q)->cond, NULL);
68
69/******************************************************************************
70 * FUNCTIONS
71 ******************************************************************************/
72
73/**
74 * Initialize a stack allocated as_queue, with item storage on the heap.
75 */
76AS_EXTERN bool
77as_queue_mt_init(as_queue_mt* queue, uint32_t item_size, uint32_t capacity);
78
79/**
80 * Create a heap allocated as_queue, with item storage on the heap.
81 */
83as_queue_mt_create(uint32_t item_size, uint32_t capacity);
84
85/**
86 * Release queue memory.
87 */
88static inline void
90{
91 pthread_cond_destroy(&queue->cond);
92 pthread_mutex_destroy(&queue->lock);
93 as_queue_destroy(&queue->queue);
94}
95
96/**
97 * Get the number of elements currently in the queue.
98 */
99static inline uint32_t
101{
102 pthread_mutex_lock(&queue->lock);
103 uint32_t size = as_queue_size(&queue->queue);
104 pthread_mutex_unlock(&queue->lock);
105 return size;
106}
107
108/**
109 * Is queue empty?
110 */
111static inline bool
113{
114 pthread_mutex_lock(&queue->lock);
115 bool empty = as_queue_empty(&queue->queue);
116 pthread_mutex_unlock(&queue->lock);
117 return empty;
118}
119
120/**
121 * Push to the tail of the queue.
122 */
123static inline bool
124as_queue_mt_push(as_queue_mt* queue, const void* ptr)
125{
126 pthread_mutex_lock(&queue->lock);
127 bool status = as_queue_push(&queue->queue, ptr);
128
129 if (status) {
130 pthread_cond_signal(&queue->cond);
131 }
132 pthread_mutex_unlock(&queue->lock);
133 return status;
134}
135
136/**
137 * Push element on the queue only if size < capacity.
138 */
139static inline bool
140as_queue_mt_push_limit(as_queue_mt* queue, const void* ptr)
141{
142 pthread_mutex_lock(&queue->lock);
143 bool status = as_queue_push_limit(&queue->queue, ptr);
144
145 if (status) {
146 pthread_cond_signal(&queue->cond);
147 }
148 pthread_mutex_unlock(&queue->lock);
149 return status;
150}
151
152/**
153 * Push to the front of the queue.
154 */
155static inline bool
156as_queue_mt_push_head(as_queue_mt* queue, const void* ptr)
157{
158 pthread_mutex_lock(&queue->lock);
159 bool status = as_queue_push_head(&queue->queue, ptr);
160
161 if (status) {
162 pthread_cond_signal(&queue->cond);
163 }
164 pthread_mutex_unlock(&queue->lock);
165 return status;
166}
167
168/**
169 * Push to the front of the queue only if size < capacity.
170 */
171static inline bool
173{
174 pthread_mutex_lock(&queue->lock);
175 bool status = as_queue_push_head_limit(&queue->queue, ptr);
176
177 if (status) {
178 pthread_cond_signal(&queue->cond);
179 }
180 pthread_mutex_unlock(&queue->lock);
181 return status;
182}
183
184/**
185 * Pop from the head of the queue.
186 *
187 * If the queue is empty, wait_ms is the maximum time in milliseconds to wait
188 * for an available entry. If wait_ms is AS_QUEUE_FOREVER (-1), the wait time will be forever.
189 * If wait_ms is AS_QUEUE_NOWAIT (0), the function will not wait.
190 *
191 * The return value is true if an entry was successfully retrieved.
192 */
193AS_EXTERN bool
194as_queue_mt_pop(as_queue_mt* queue, void* ptr, int wait_ms);
195
196/**
197 * Pop from the tail of the queue.
198 *
199 * If the queue is empty, wait_ms is the maximum time in milliseconds to wait
200 * for an available entry. If wait_ms is AS_QUEUE_FOREVER (-1), the wait time will be forever.
201 * If wait_ms is AS_QUEUE_NOWAIT (0), the function will not wait.
202 *
203 * The return value is true if an entry was successfully retrieved.
204 */
205AS_EXTERN bool
206as_queue_mt_pop_tail(as_queue_mt* queue, void* ptr, int wait_ms);
207
208#ifdef __cplusplus
209} // end extern "C"
210#endif
AS_EXTERN bool as_queue_push_limit(as_queue *queue, const void *ptr)
AS_EXTERN bool as_queue_push_head(as_queue *queue, const void *ptr)
AS_EXTERN void as_queue_destroy(as_queue *queue)
static bool as_queue_empty(as_queue *queue)
Definition as_queue.h:123
AS_EXTERN bool as_queue_push_head_limit(as_queue *queue, const void *ptr)
static uint32_t as_queue_size(as_queue *queue)
Definition as_queue.h:114
AS_EXTERN bool as_queue_push(as_queue *queue, const void *ptr)
static bool as_queue_mt_push_head_limit(as_queue_mt *queue, const void *ptr)
AS_EXTERN bool as_queue_mt_pop_tail(as_queue_mt *queue, void *ptr, int wait_ms)
static uint32_t as_queue_mt_size(as_queue_mt *queue)
static bool as_queue_mt_empty(as_queue_mt *queue)
AS_EXTERN bool as_queue_mt_pop(as_queue_mt *queue, void *ptr, int wait_ms)
AS_EXTERN as_queue_mt * as_queue_mt_create(uint32_t item_size, uint32_t capacity)
static bool as_queue_mt_push_limit(as_queue_mt *queue, const void *ptr)
AS_EXTERN bool as_queue_mt_init(as_queue_mt *queue, uint32_t item_size, uint32_t capacity)
static void as_queue_mt_destroy(as_queue_mt *queue)
Definition as_queue_mt.h:89
static bool as_queue_mt_push_head(as_queue_mt *queue, const void *ptr)
static bool as_queue_mt_push(as_queue_mt *queue, const void *ptr)
#define AS_EXTERN
Definition as_std.h:25
as_queue queue
Definition as_queue_mt.h:39
pthread_cond_t cond
Definition as_queue_mt.h:49
pthread_mutex_t lock
Definition as_queue_mt.h:44