Aggregations
There are various ways to process the results of a secondary index query. A common one is aggregation, where you apply a function to the entire result set of a query.
Many developers use SQL to define aggregation queries against a database. For example, the following SQL statement counts rows in a database:
SELECT count(*)
FROM test.demo
WHERE d = 50
This counts the number of records in test.demo where column d equals 50. In Aerospike, you can use a query and a Stream UDF to do this.
Defining a Query
as_query query;
as_query_init(&query, "test", "demoset");

as_query_where_inita(&query, 1);
as_query_where(&query, "d", integer_equals(50));

as_query_apply(&query, "mymodule", "mycount", NULL);
Use this query object to find records in the namespace test within the set demoset, looking for records whose bin d has an integer value of 50. The mycount() Stream UDF is applied to the result set of the query. (Stream UDFs are in the mymodule UDF module.)
Defining a Stream UDF
The mycount() Stream UDF allows you to process a stream of data:
local function one(rec)
    return 1
end

local function add(a, b)
    return a + b
end

function mycount(stream)
    return stream : map(one) : reduce(add);
end
The mycount() Stream UDF is applied to a stream of results from the query. It adds the following operations to the stream to process the results:
map: Maps a value from the stream to another value. In this example, mapping is defined as one(), which maps a record to the value 1.
reduce: Reduces the values from the stream to a single value. In this example, reduction is performed by adding two values from the stream, which are the 1s from map.
The end result is a stream that contains a single value: the count (or the sum of 1 for each record in the result set).
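The map and reduce steps described above can be sketched in plain C, without the Aerospike client, to show why the result is a count. This is only an illustration: the record struct and the in-memory array standing in for the stream are hypothetical, and the sketch returns 0 for an empty stream, which is a choice made here rather than documented server behavior.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a record; only the bin "d" is modeled. */
typedef struct {
    long d;
} record;

/* map step: every record becomes the value 1, like one() in the Lua module. */
static long one(const record *rec)
{
    (void)rec; /* the record's contents are ignored */
    return 1;
}

/* reduce step: combine two values into one, like add() in the Lua module. */
static long add(long a, long b)
{
    return a + b;
}

/* Apply map, then reduce, over an in-memory "stream" of n records. */
static long mycount(const record *stream, size_t n)
{
    assert(stream != NULL || n == 0);

    long total = 0; /* assumption: an empty stream counts as 0 in this sketch */
    for (size_t i = 0; i < n; i++) {
        total = add(total, one(&stream[i]));
    }
    return total;
}
```

Because one() discards the record and always yields 1, the running sum maintained by add() is the number of records that reached the stream.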
Registering UDFs
Before you run a query that uses the Stream UDF, the UDF module must be registered with the Aerospike server.
as_error err;

if (aerospike_udf_put(&as, &err, NULL, "mymodule", AS_UDF_TYPE_LUA, &udf_content) != AEROSPIKE_OK) {
    LOG("aerospike_udf_put() returned %d - %s", err.code, err.message);
}
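The registration call above uses udf_content without showing where it comes from. One plausible preparation step, sketched here under assumptions, is to read the module's Lua source from disk into a byte buffer. The helper read_file() is hypothetical and uses only the C standard library. The resulting buffer and size would still need to be wrapped in the client's byte type (for example with as_bytes_init_wrap(), if your client version provides it) before being passed to aerospike_udf_put().

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

/* Read a whole file into a malloc'd buffer.
 * Returns NULL on failure; on success stores the byte count in *size_out.
 * The caller owns the returned buffer and must free() it. */
static unsigned char *read_file(const char *path, size_t *size_out)
{
    assert(path != NULL && size_out != NULL);

    FILE *fp = fopen(path, "rb");
    if (fp == NULL) {
        return NULL;
    }

    size_t cap = 4096, len = 0;
    unsigned char *buf = malloc(cap);
    if (buf == NULL) {
        fclose(fp);
        return NULL;
    }

    size_t n;
    while ((n = fread(buf + len, 1, cap - len, fp)) > 0) {
        len += n;
        if (len == cap) { /* buffer full: double its capacity */
            unsigned char *tmp = realloc(buf, cap * 2);
            if (tmp == NULL) {
                free(buf);
                fclose(fp);
                return NULL;
            }
            buf = tmp;
            cap *= 2;
        }
    }

    if (ferror(fp)) { /* distinguish a read error from end of file */
        free(buf);
        fclose(fp);
        return NULL;
    }

    fclose(fp);
    *size_out = len;
    return buf;
}
```

Reading in chunks and growing the buffer avoids relying on fseek()/ftell() to determine the file size, which keeps the sketch portable.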
Executing Queries
Execute the query using aerospike_query_foreach():
if (aerospike_query_foreach(&as, &err, NULL, &query, each_value, NULL) != AEROSPIKE_OK) {
    fprintf(stderr, "err(%d) %s at [%s:%d]\n", err.code, err.message, err.file, err.line);
}
Processing the Results
The each_value() callback is called for each value returned from the query:
bool each_value(const as_val *val, void *udata) {
    if (val == NULL) {
        // query is complete
        return true;
    }

    as_integer *ival = as_integer_fromval(val);
    if (ival == NULL) {
        // abort the query
        return false;
    }

    // process the value
    return true;
}
The example above returns a single integer: the count of the records that satisfy the query.
To make an object available on every invocation of the callback, pass it as the udata argument (the last parameter) of aerospike_query_foreach(). The example above passes NULL.
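The udata mechanism can be sketched without the Aerospike client. In the sketch below, count_state, each_long(), and fake_foreach() are all hypothetical stand-ins: a plain long replaces as_val, and fake_foreach() only mimics the calling convention described above (one call per value, then one call with NULL when the results are complete). It is not the client's implementation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical accumulator handed to the callback through udata. */
typedef struct {
    long total;   /* sum of the values seen so far */
    size_t calls; /* number of non-NULL values delivered */
} count_state;

/* Same shape as each_value(), with a plain long standing in for as_val. */
static bool each_long(const long *val, void *udata)
{
    count_state *state = udata;

    if (val == NULL) {
        return true; /* results are complete */
    }

    state->total += *val;
    state->calls++;
    return true; /* keep receiving values */
}

/* Stand-in driver: invokes the callback once per value, then once with
 * NULL to signal completion. Stops early if the callback returns false. */
static void fake_foreach(const long *vals, size_t n,
                         bool (*cb)(const long *, void *), void *udata)
{
    assert(cb != NULL);

    for (size_t i = 0; i < n; i++) {
        if (!cb(&vals[i], udata)) {
            return;
        }
    }
    cb(NULL, udata);
}
```

Passing state through udata keeps the callback free of global variables, so the same callback can serve several queries with separate accumulators.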
Cleaning Up Resources
After the query results have been processed, the client can safely destroy the query object and its member objects using as_query_destroy(). Note that the example avoids an explicit as_query_destroy() call by using a stack-allocated as_query object and as_query_where_inita(), which avoids using the internal heap.