Loading...
Searching...
No Matches
aerospike_query.h
Go to the documentation of this file.
1/*
2 * Copyright 2008-2023 Aerospike, Inc.
3 *
4 * Portions may be licensed to Aerospike, Inc. under one or more contributor
5 * license agreements.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8 * use this file except in compliance with the License. You may obtain a copy of
9 * the License at http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations under
15 * the License.
16 */
17#pragma once
18
19/**
20 * @defgroup query_operations Query Operations
21 * @ingroup client_operations
22 *
23 * The Aerospike Query Operations provide the ability to query data in the
24 * Aerospike database. The queries can optionally be performed on secondary indexes,
25 * which have been created in the database.
26 *
27 * ## Usage
28 *
29 * Before you can execute a query, you first need to build a query using
30 * as_query. See as_query for details on building queries.
31 *
32 * Once you have a query defined, then you can execute the query.
33 * When aerospike_query_foreach() is executed, it will process the results
34 * and create records on the stack. Because the records are on the stack,
35 * they will only be available within the context of the callback function.
36 *
37 * ## Walk-through
38 *
39 * First, we define a query using as_query. The query will be for the "test"
40 * namespace and "demo" set. We will add a where predicate on "bin2", on which
41 * we have already created a secondary index.
42 *
43 * ~~~~~~~~~~{.c}
44 * as_query query;
45 * as_query_init(&query, "test", "demo");
46 *
47 * as_query_where_init(&query, 1);
48 * as_query_where(&query, "bin2", as_integer_equals(100));
49 * ~~~~~~~~~~
50 *
51 * Now that we have a query defined, we want to execute it using
52 * aerospike_query_foreach().
53 *
54 * ~~~~~~~~~~{.c}
55 * if (aerospike_query_foreach(&as, &err, NULL, &query, callback, NULL) != AEROSPIKE_OK) {
56 * fprintf(stderr, "error(%d) %s at [%s:%d]", err.code, err.message, err.file, err.line);
57 * }
58 * ~~~~~~~~~~
59 *
60 * The callback provided to the function above is implemented as:
61 *
62 * ~~~~~~~~~~{.c}
63 * bool callback(const as_val* val, void* udata)
64 * {
65 * if (!val) {
66 * return false; // Query complete.
67 * }
68 *
69 * as_record* rec = as_record_fromval(val);
70 * // Process record
71 * // Do not call as_record_destroy() because the calling function will do that for you.
72 * return true;
73 * }
74 * ~~~~~~~~~~
75 *
76 * When you are finished with the query, you should destroy the resources allocated to it.
77 *
78 * ~~~~~~~~~~{.c}
79 * as_query_destroy(&query);
80 * ~~~~~~~~~~
81 */
82
83#include <aerospike/aerospike.h>
84#include <aerospike/as_error.h>
85#include <aerospike/as_event.h>
86#include <aerospike/as_job.h>
87#include <aerospike/as_policy.h>
88#include <aerospike/as_query.h>
89#include <aerospike/as_record.h>
90#include <aerospike/as_status.h>
91#include <aerospike/as_stream.h>
92
93#ifdef __cplusplus
94extern "C" {
95#endif
96
97/******************************************************************************
98 * TYPES
99 *****************************************************************************/
100
101/**
102 * This callback will be called for each value or record returned from a query.
103 * Multiple threads will likely be calling this callback in parallel. Therefore,
104 * your callback implementation should be thread safe.
105 *
106 * A regular foreground query always returns as_record instances:
107 *
108 * ~~~~~~~~~~{.c}
109 * bool callback(const as_val* val, void* udata)
110 * {
111 * if (!val) {
112 * return false; // Query complete.
113 * }
114 *
115 * as_record* rec = as_record_fromval(val);
116 * // Process record
117 * // Do not call as_record_destroy() because the calling function will do that for you.
118 * return true;
119 * }
120 * ~~~~~~~~~~
121 *
122 * An aggregation query using a UDF returns as_val instances. The as_val type depends on
123 * what the UDF returns:
124 *
125 * ~~~~~~~~~~{.c}
126 * bool callback(const as_val* val, void* udata)
127 * {
128 * if (!val) {
129 * return false; // Query complete.
130 * }
131 *
132 * // Ensure UDF returned val is the expected type:
133 * as_integer* i = as_integer_fromval(val);
134 *
135 * if (!i) {
136 * return false;
137 * }
138 *
139 * // Process integer
140 * return true;
141 * }
142 * ~~~~~~~~~~
143 *
144 * @param val The value received from the query.
145 * @param udata User-data provided to the calling function.
146 *
147 * @return `true` to continue to the next value. Otherwise, iteration will end.
148 * @ingroup query_operations
149 */
150typedef bool (*aerospike_query_foreach_callback)(const as_val* val, void* udata);
151
152/**
153 * Asynchronous query user callback. This function is called for each record returned.
154 * This function is also called once when the query completes or an error has occurred.
155 *
156 * @param err This error structure is only populated on command failure. NULL on success.
157 * @param record Returned record. The record will be NULL on final query completion or query
158 * error.
159 * @param udata User data that is forwarded from asynchronous command function.
160 * @param event_loop Event loop that this command was executed on. Use this event loop when
161 * running nested asynchronous commands when single threaded behavior is
162 * desired for the group of commands.
163 *
164 * @return `true` to continue to the next value. Otherwise, the query will end.
165 * @ingroup query_operations
166 */
168 as_error* err, as_record* record, void* udata, as_event_loop* event_loop
169 );
170
171/******************************************************************************
172 * FUNCTIONS
173 *****************************************************************************/
174
175/**
176 * Execute a query and call the callback function for each result item.
177 * Multiple threads will likely be calling the callback in parallel. Therefore,
178 * your callback implementation should be thread safe.
179 *
180 * ~~~~~~~~~~{.c}
181 * bool callback(const as_val* val, void* udata)
182 * {
183 * if (!val) {
184 * return false; // Query complete.
185 * }
186 *
187 * as_record* rec = as_record_fromval(val);
188 * // Process record
189 * // Do not call as_record_destroy() because the calling function will do that for you.
190 * return true;
191 * }
192 *
193 * as_query query;
194 * as_query_init(&query, "test", "demo");
195 * as_query_select(&query, "bin1");
196 * as_query_where(&query, "bin2", as_integer_equals(100));
197 *
198 * if (aerospike_query_foreach(&as, &err, NULL, &query, callback, NULL) != AEROSPIKE_OK) {
199 * fprintf(stderr, "error(%d) %s at [%s:%d]", err.code, err.message, err.file, err.line);
200 * }
201 * as_query_destroy(&query);
202 * ~~~~~~~~~~
203 *
204 * @param as Aerospike cluster instance.
205 * @param err Error detail structure that is populated if an error occurs.
206 * @param policy Query policy configuration parameters, pass in NULL for default.
207 * @param query Query definition.
208 * @param callback Query callback function called for each result value.
209 * @param udata User-data to be passed to the callback.
210 *
211 * @return AEROSPIKE_OK on success, otherwise an error.
212 * @ingroup query_operations
213 */
216 aerospike* as, as_error* err, const as_policy_query* policy, as_query* query,
217 aerospike_query_foreach_callback callback, void* udata
218 );
219
220/**
221 * Query records with a partition filter. Multiple threads will likely be calling the callback
222 * in parallel. Therefore, your callback implementation should be thread safe.
223 * Requires server version 6.0+.
224 *
225 * ~~~~~~~~~~{.c}
226 * bool callback(const as_val* val, void* udata)
227 * {
228 * if (!val) {
229 * return false; // Query complete.
230 * }
231 *
232 * as_record* rec = as_record_fromval(val);
233 * // Process record
234 * // Do not call as_record_destroy() because the calling function will do that for you.
235 * return true;
236 * }
237 *
238 * as_query query;
239 * as_query_init(&query, "test", "demo");
240 * as_query_select(&query, "bin1");
241 * as_query_where(&query, "bin2", as_integer_equals(100));
242 *
243 * as_partition_filter pf;
244 * as_partition_filter_set_range(&pf, 0, 1024);
245 *
246 * if (aerospike_query_partitions(&as, &err, NULL, &query, &pf, callback, NULL) != AEROSPIKE_OK) {
247 * fprintf(stderr, "error(%d) %s at [%s:%d]", err.code, err.message, err.file, err.line);
248 * }
249 * as_query_destroy(&query);
250 * ~~~~~~~~~~
251 *
252 * @param as Aerospike cluster instance.
253 * @param err Error detail structure that is populated if an error occurs.
254 * @param policy Query policy configuration parameters, pass in NULL for default.
255 * @param query Query definition.
256 * @param pf Partition filter.
257 * @param callback Query callback function called for each result value.
258 * @param udata User-data to be passed to the callback.
259 *
260 * @return AEROSPIKE_OK on success. Otherwise an error occurred.
261 * @ingroup query_operations
262 */
265 aerospike* as, as_error* err, const as_policy_query* policy, as_query* query,
267 );
268
269/**
270 * Asynchronously execute a query and call the listener function for each result item.
271 * Standard queries are supported, but aggregation queries are not supported in async mode.
272 *
273 * ~~~~~~~~~~{.c}
274 * bool my_listener(as_error* err, as_record* record, void* udata, as_event_loop* event_loop)
275 * {
276 * if (err) {
277 * printf("Query failed: %d %s\n", err->code, err->message);
278 * return false;
279 * }
280 *
281 * if (! record) {
282 * printf("Query ended\n");
283 * return false;
284 * }
285 *
286 * // Process record
287 * // Do not call as_record_destroy() because the calling function will do that for you.
288 * return true;
289 * }
290 *
291 * as_query query;
292 * as_query_init(&query, "test", "demo");
293 * as_query_select(&query, "bin1");
294 * as_query_where(&query, "bin2", as_integer_equals(100));
295 *
296 * if (aerospike_query_async(&as, &err, NULL, &query, my_listener, NULL, NULL) != AEROSPIKE_OK) {
297 * fprintf(stderr, "error(%d) %s at [%s:%d]", err.code, err.message, err.file, err.line);
298 * }
299 * as_query_destroy(&query);
300 * ~~~~~~~~~~
301 *
302 * @param as Aerospike cluster instance.
303 * @param err Error detail structure that is populated if an error occurs.
304 * @param policy Query policy configuration parameters, pass in NULL for default.
305 * @param query Query definition.
306 * @param listener The function to be called for each returned value.
307 * @param udata User-data to be passed to the callback.
308 * @param event_loop Event loop assigned to run this command. If NULL, an event loop will be
309 * chosen by round-robin.
310 *
311 * @return AEROSPIKE_OK if async query succesfully queued. Otherwise an error.
312 * @ingroup query_operations
313 */
316 aerospike* as, as_error* err, const as_policy_query* policy, as_query* query,
317 as_async_query_record_listener listener, void* udata, as_event_loop* event_loop
318 );
319
320/**
321 * Asynchronously query records with a partition filter. Standard queries are supported, but
322 * aggregation queries are not supported in async mode.
323 * Requires server version 6.0+.
324 *
325 * ~~~~~~~~~~{.c}
326 * bool my_listener(as_error* err, as_record* record, void* udata, as_event_loop* event_loop)
327 * {
328 * if (err) {
329 * printf("Query failed: %d %s\n", err->code, err->message);
330 * return false;
331 * }
332 *
333 * if (! record) {
334 * printf("Query ended\n");
335 * return false;
336 * }
337 *
338 * // Process record
339 * // Do not call as_record_destroy() because the calling function will do that for you.
340 * return true;
341 * }
342 *
343 * as_query query;
344 * as_query_init(&query, "test", "demo");
345 * as_query_select(&query, "bin1");
346 * as_query_where(&query, "bin2", as_integer_equals(100));
347 *
348 * as_partition_filter pf;
349 * as_partition_filter_set_range(&pf, 0, 1024);
350 *
351 * if (aerospike_query_partitions_async(&as, &err, NULL, &query, &pf, my_listener, NULL, NULL)
352 * != AEROSPIKE_OK) {
353 * fprintf(stderr, "error(%d) %s at [%s:%d]", err.code, err.message, err.file, err.line);
354 * }
355 * as_query_destroy(&query);
356 * ~~~~~~~~~~
357 *
358 * @param as Aerospike cluster instance.
359 * @param err Error detail structure that is populated if an error occurs.
360 * @param policy Query policy configuration parameters, pass in NULL for default.
361 * @param query Query definition.
362 * @param pf Partition filter.
363 * @param listener The function to be called for each returned value.
364 * @param udata User-data to be passed to the callback.
365 * @param event_loop Event loop assigned to run this command. If NULL, an event loop will be
366 * chosen by round-robin.
367 *
368 * @return AEROSPIKE_OK if async query succesfully queued. Otherwise an error.
369 * @ingroup query_operations
370 */
373 aerospike* as, as_error* err, const as_policy_query* policy, as_query* query,
374 as_partition_filter* pf, as_async_query_record_listener listener, void* udata,
375 as_event_loop* event_loop
376 );
377
378/**
379 * Apply user defined function on records that match the query filter. Records are not returned to
380 * the client. This asynchronous server call will return before the command is complete. The user
381 * can optionally wait for command completion.
382 *
383 * ~~~~~~~~~~{.c}
384 * as_query query;
385 * as_query_init(&query, "test", "demo");
386 * as_query_select(&query, "bin1");
387 * as_query_where(&query, "bin2", as_integer_equals(100));
388 * as_query_apply(&query, "my_lua.lua", "my_lua_function", NULL);
389 * uint64_t query_id = 0;
390 *
391 * if (aerospike_query_background(&as, &err, NULL, &query, &query_id) == AEROSPIKE_OK) {
392 * aerospike_query_wait(as, &err, NULL, &query, query_id, 0);
393 * }
394 * else {
395 * fprintf(stderr, "error(%d) %s at [%s:%d]", err.code, err.message, err.file, err.line);
396 * }
397 * as_query_destroy(&query);
398 * ~~~~~~~~~~
399 *
400 * @param as Aerospike cluster instance.
401 * @param err Error detail structure that is populated if an error occurs.
402 * @param policy Write configuration parameters, pass in NULL for default.
403 * @param query The query to execute against the cluster.
404 * @param query_id The id for the query job, which can be used for obtaining query status.
405 *
406 * @return AEROSPIKE_OK on success, otherwise an error.
407 * @ingroup query_operations
408 */
411 aerospike* as, as_error* err, const as_policy_write* policy,
412 const as_query* query, uint64_t* query_id
413 );
414
415/**
416 * Wait for a background query to be completed by servers.
417 *
418 * @param as Aerospike cluster instance.
419 * @param err Error detail structure that is populated if an error occurs.
420 * @param policy Info configuration parameters, pass in NULL for default.
421 * @param query The query that was executed against the cluster.
422 * @param query_id The id for the query job, which can be used for obtaining query status.
423 * @param interval_ms Polling interval in milliseconds. If zero, 1000 ms is used.
424 *
425 * @return AEROSPIKE_OK on success, otherwise an error.
426 * @ingroup query_operations
427 */
428static inline as_status
430 aerospike* as, as_error* err, const as_policy_info* policy,
431 const as_query* query, uint64_t query_id, uint32_t interval_ms
432 )
433{
434 const char* module = (query->where.size > 0)? "query" : "scan";
435 return aerospike_job_wait(as, err, policy, module, query_id, interval_ms);
436}
437
438/**
439 * Check the progress of a background query running on the database.
440 *
441 * @param as Aerospike cluster instance.
442 * @param err Error detail structure that is populated if an error occurs.
443 * @param policy Info configuration parameters, pass in NULL for default.
444 * @param query The query that was executed against the cluster.
445 * @param query_id The id for the query job, which can be used for obtaining query status.
446 * @param info Information about this background query, to be populated by this operation.
447 *
448 * @return AEROSPIKE_OK on success, otherwise an error.
449 * @ingroup query_operations
450 */
451static inline as_status
453 aerospike* as, as_error* err, const as_policy_info* policy,
454 const as_query* query, uint64_t query_id, as_job_info* info
455 )
456{
457 const char* module = (query->where.size > 0)? "query" : "scan";
458 return aerospike_job_info(as, err, policy, module, query_id, false, info);
459}
460
461#ifdef __cplusplus
462} // end extern "C"
463#endif
AS_EXTERN as_status aerospike_job_info(aerospike *as, as_error *err, const as_policy_info *policy, const char *module, uint64_t job_id, bool stop_if_in_progress, as_job_info *info)
AS_EXTERN as_status aerospike_job_wait(aerospike *as, as_error *err, const as_policy_info *policy, const char *module, uint64_t job_id, uint32_t interval_ms)
as_status
Definition as_status.h:30
#define AS_EXTERN
Definition as_std.h:25
AS_EXTERN as_status aerospike_query_background(aerospike *as, as_error *err, const as_policy_write *policy, const as_query *query, uint64_t *query_id)
AS_EXTERN as_status aerospike_query_foreach(aerospike *as, as_error *err, const as_policy_query *policy, as_query *query, aerospike_query_foreach_callback callback, void *udata)
static as_status aerospike_query_info(aerospike *as, as_error *err, const as_policy_info *policy, const as_query *query, uint64_t query_id, as_job_info *info)
static as_status aerospike_query_wait(aerospike *as, as_error *err, const as_policy_info *policy, const as_query *query, uint64_t query_id, uint32_t interval_ms)
AS_EXTERN as_status aerospike_query_partitions_async(aerospike *as, as_error *err, const as_policy_query *policy, as_query *query, as_partition_filter *pf, as_async_query_record_listener listener, void *udata, as_event_loop *event_loop)
bool(* aerospike_query_foreach_callback)(const as_val *val, void *udata)
bool(* as_async_query_record_listener)(as_error *err, as_record *record, void *udata, as_event_loop *event_loop)
AS_EXTERN as_status aerospike_query_async(aerospike *as, as_error *err, const as_policy_query *policy, as_query *query, as_async_query_record_listener listener, void *udata, as_event_loop *event_loop)
AS_EXTERN as_status aerospike_query_partitions(aerospike *as, as_error *err, const as_policy_query *policy, as_query *query, as_partition_filter *pf, aerospike_query_foreach_callback callback, void *udata)