Aggregations
The Code block section at the end combines the steps below into one complete example.
A common way to process the results of a basic query is aggregation, where you compute a function over the entire result set.
Setup
The following examples use this setup to illustrate query aggregation with a stream UDF.
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.query.Filter;
import com.aerospike.client.query.ResultSet;
import com.aerospike.client.query.Statement;
import com.aerospike.client.lua.LuaConfig;
// Establishes a connection to the server
AerospikeClient client = new AerospikeClient("127.0.0.1", 3000);

The record structure:
Occurred: Integer
Reported: Integer
Posted: Integer
Report: Map{
    shape: List,
    summary: String,
    city: String,
    state: String,
    duration: String
}
Location: GeoJSON

Stream UDF
When a query executes, it produces a stream of results. That stream contains records that you can iterate using the client API. Alternatively, Aerospike can process the stream of results with a Stream UDF, which augments the stream with operations that process the data flowing through it.
This example uses the Stream UDF count from the example.lua module.
See Manage UDFs for information on registering the UDF.
-- Aggregation function to count records
local function one(rec)
    return 1
end

local function add(a, b)
    return a + b
end

function count(stream)
    return stream : map(one) : reduce(add);
end

count() is applied to the stream of query results and adds to it the operations to perform on those results:
map: Maps each value in the stream to another value. In this example, the mapping function is one(), which maps each record to the value 1.
reduce: Reduces the values in the stream to a single value. In this example, the reduction function is add(), which adds two values from the stream; those values are the 1s returned by the map step and the partial sums built from them.
The end result is a stream containing a single value: the sum of a 1 for every record in the result set, which is the record count.
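To make the map and reduce steps concrete, the same pipeline can be sketched in plain Java with java.util.stream. This is only a local analogy: it runs entirely in the JVM rather than on Aerospike server nodes, the class and method names are hypothetical, and the string elements stand in for records. Unlike the Lua reduce, Java's reduce here takes an explicit identity value of 0.

```java
import java.util.List;
import java.util.stream.Stream;

public class CountSketch {
    // Mirrors one(rec) from example.lua: every element maps to 1.
    static long one(Object record) {
        return 1L;
    }

    // Mirrors add(a, b) from example.lua: sums two values (1s or partial sums).
    static long add(long a, long b) {
        return a + b;
    }

    // Mirrors count(stream): map each record to 1, then reduce by addition.
    // The identity 0 makes an empty input return 0 in this sketch; that is a
    // choice of the sketch, not necessarily what the Lua reduce does.
    static long count(Stream<?> records) {
        return records.map(CountSketch::one).reduce(0L, CountSketch::add);
    }

    public static void main(String[] args) {
        // Hypothetical stand-ins for three query results.
        List<String> records = List.of("rec-a", "rec-b", "rec-c");
        System.out.println("Count = " + count(records.stream())); // Count = 3
    }
}
```

Because addition is associative, the partial sums can be combined in any grouping, which is what lets a reduce step of this kind be split across several producers and merged afterwards.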
Client UDF path
The final reduce phase of a Stream UDF aggregation runs in the client, so the client needs access to a local copy of the UDF module. Point the client to the directory that contains it.
// Set local directory
LuaConfig.SourceDirectory = "/home/user/udf";

Execute the query
The following example executes a secondary index query that uses an index created on the occurred bin.
The result is a count of all records with an occurred value between 20210101 and 20211231, inclusive.
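The integer bounds in the filter look like dates written as YYYYMMDD. Assuming the occurred bin stores dates in that form, the plain-Java sketch below (hypothetical class and method names; it does not call the Aerospike client) shows how such values can be built from a LocalDate and why a numeric range works: for this encoding, numeric order matches calendar order. The inRange check mirrors the inclusive begin and end bounds of Filter.range.

```java
import java.time.LocalDate;

public class OccurredRangeSketch {
    // Assumption: occurred stores a date as the integer YYYYMMDD,
    // as the bounds 20210101 and 20211231 suggest.
    static long toYyyymmdd(LocalDate date) {
        return date.getYear() * 10_000L + date.getMonthValue() * 100L + date.getDayOfMonth();
    }

    // Inclusive on both ends, mirroring the begin and end values of Filter.range.
    static boolean inRange(long value, long begin, long end) {
        return value >= begin && value <= end;
    }

    public static void main(String[] args) {
        long begin = toYyyymmdd(LocalDate.of(2021, 1, 1));   // 20210101
        long end = toYyyymmdd(LocalDate.of(2021, 12, 31));   // 20211231

        System.out.println(inRange(toYyyymmdd(LocalDate.of(2021, 7, 4)), begin, end)); // true
        System.out.println(inRange(toYyyymmdd(LocalDate.of(2022, 1, 1)), begin, end)); // false
        System.out.println(inRange(end, begin, end));                                  // true
    }
}
```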
// Create statement
Statement stmt = new Statement();

// Set namespace and set name
stmt.setNamespace("sandbox");
stmt.setSetName("ufodata");

// Create index filter
stmt.setFilter(Filter.range("occurred", 20210101, 20211231));

// Execute the query
ResultSet resultSet = client.queryAggregate(null, stmt, "example", "count");

// Get the result, then release the result set
try {
    if (resultSet.next()) {
        Object result = resultSet.getObject();
        System.out.println("Count = " + result);
    }
} finally {
    resultSet.close();
}
// Close the connection to the server
client.close();

Code block
The following single code block applies a stream UDF aggregation, combining all of the steps above.
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.query.Filter;
import com.aerospike.client.query.ResultSet;
import com.aerospike.client.query.Statement;
import com.aerospike.client.lua.LuaConfig;

public class QueryAggregation {
    public static void main(String[] args) {
        // Establishes a connection to the server
        AerospikeClient client = new AerospikeClient("127.0.0.1", 3000);

        try {
            // Set local directory
            LuaConfig.SourceDirectory = "/home/user/udf";

            // Create statement
            Statement stmt = new Statement();

            // Set namespace and set name
            stmt.setNamespace("sandbox");
            stmt.setSetName("ufodata");

            // Create index filter
            stmt.setFilter(Filter.range("occurred", 20210101, 20211231));

            // Execute the query
            ResultSet resultSet = client.queryAggregate(null, stmt, "example", "count");

            // Get the result, then release the result set
            try {
                if (resultSet.next()) {
                    Object result = resultSet.getObject();
                    System.out.println("Count = " + result);
                }
            } finally {
                resultSet.close();
            }
        } finally {
            // Close the connection to the server
            client.close();
        }
    }
}
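ResultSet.getObject() returns a plain Object, so code that needs the count as a number has to convert it. A minimal sketch follows; the class and method names are hypothetical, it converts through java.lang.Number rather than assuming a specific boxed type, and it maps a missing value to 0, on the assumption that an aggregation over no matching records may produce no value at all.

```java
public class AggregateResultSketch {
    // Hypothetical helper: turns the Object from ResultSet.getObject() into a long.
    // A null argument stands for "the aggregation produced no value".
    static long toCount(Object result) {
        if (result == null) {
            return 0L;
        }
        if (result instanceof Number) {
            return ((Number) result).longValue();
        }
        throw new IllegalArgumentException(
                "Unexpected aggregation result type: " + result.getClass().getName());
    }

    public static void main(String[] args) {
        System.out.println(toCount(42L));  // 42
        System.out.println(toCount(null)); // 0
    }
}
```

In the example above, the result could then be read as `long count = resultSet.next() ? AggregateResultSketch.toCount(resultSet.getObject()) : 0L;`.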