
Aggregations


For a complete, combined example, see the Code block section.

A common way to process the results of a basic query is aggregation, where you compute a function over the entire result set.

Setup

The following examples use the setup below to illustrate query aggregation with a stream UDF.

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.query.Filter;
import com.aerospike.client.query.ResultSet;
import com.aerospike.client.query.Statement;
import com.aerospike.client.lua.LuaConfig;

// Establishes a connection to the server
AerospikeClient client = new AerospikeClient("127.0.0.1", 3000);

The record structure:

Occurred: Integer
Reported: Integer
Posted: Integer
Report: Map
{
    shape: List,
    summary: String,
    city: String,
    state: String,
    duration: String
}
Location: GeoJSON

Stream UDF

When a query executes, it produces a stream of results. You can iterate over the records in that stream with the client API, or you can have Aerospike process the stream with a Stream UDF. A Stream UDF attaches operations to the stream, and those operations process the data as it flows through.

This example uses the Stream UDF count, from the example.lua module. See Manage UDFs for information on registering the UDF.

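-- Aggregation function to count records

-- Maps any record to the value 1
local function one(rec)
    return 1
end

-- Adds two values from the stream
local function add(a, b)
    return a + b
end

-- Counts records by mapping each to 1 and summing the results
function count(stream)
    return stream : map(one) : reduce(add)
end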

count() is applied to the stream of results from a query, adding to the stream the operations to perform on the results:

  • map — Maps a value from the stream to another value. In this example, mapping is defined as the function one(), which maps a record to the value 1.
  • reduce — Reduces the values from the stream into a single value. In this example, reduction is performed by adding two values from the stream, which happen to be 1s returned from the map function.

The end result is a stream that contains a single value, the sum of 1 for each record in the result set.
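
As a rough analogy only, the same map-then-reduce shape can be written with plain Java streams. The sketch below does not use the Aerospike client or Lua. The class name CountSketch and the sample records are hypothetical, and each record is modelled as a simple Map of bin names to values. Unlike the Lua version, this sketch passes an identity value of 0 to reduce, so an empty input yields 0.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, stdlib-only analogy of the Lua count() stream UDF.
class CountSketch {
    // Counterpart of one(rec): every record maps to the value 1.
    static long one(Map<String, Object> rec) {
        return 1L;
    }

    // Counterpart of add(a, b): combines two values from the stream.
    static long add(long a, long b) {
        return a + b;
    }

    // Counterpart of count(stream): map each record to 1, then sum the 1s.
    static long count(List<Map<String, Object>> records) {
        return records.stream()
                .map(CountSketch::one)
                .reduce(0L, CountSketch::add);
    }

    public static void main(String[] args) {
        // Made-up sample records, for illustration only.
        List<Map<String, Object>> records = List.of(
                Map.of("occurred", 20210314),
                Map.of("occurred", 20210704),
                Map.of("occurred", 20211031));
        System.out.println("Count = " + count(records)); // prints "Count = 3"
    }
}
```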

Client UDF path

For client-side Stream UDF processing, you must point the client to the local location of the UDF module.

// Set local directory
LuaConfig.SourceDirectory = "/home/user/udf";
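
The client needs a local copy of the module because, as far as the aggregation model goes, each server node reduces its own share of the records and the client then applies the final reduce to those partial results. The sketch below imitates that two-phase flow in plain Java, without the Aerospike client. The class name TwoPhaseCountSketch, the method names, and the way records are split across "nodes" are all hypothetical.

```java
import java.util.List;

// Hypothetical, stdlib-only illustration of a two-phase count.
class TwoPhaseCountSketch {
    static long add(long a, long b) {
        return a + b;
    }

    // Phase 1 (stands in for one server node): map each record to 1 and reduce locally.
    static long partialCount(List<String> recordsOnNode) {
        return recordsOnNode.stream()
                .map(rec -> 1L)
                .reduce(0L, TwoPhaseCountSketch::add);
    }

    // Phase 2 (stands in for the client): reduce the per-node partial results
    // with the same add function.
    static long finalCount(List<List<String>> nodes) {
        return nodes.stream()
                .map(TwoPhaseCountSketch::partialCount)
                .reduce(0L, TwoPhaseCountSketch::add);
    }

    public static void main(String[] args) {
        // Three made-up nodes holding 2, 1 and 3 matching records.
        List<List<String>> nodes = List.of(
                List.of("a", "b"),
                List.of("c"),
                List.of("d", "e", "f"));
        System.out.println("Count = " + finalCount(nodes)); // prints "Count = 6"
    }
}
```

Because the same reduce function runs in both phases, it should give the same answer however the records are grouped, which holds for addition.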

Execute the query

The following example executes a secondary index query, using an index created on the occurred bin. The result is a count of all records with an occurred value between 20210101 and 20211231.

// Create statement
Statement stmt = new Statement();

// Set namespace and set name
stmt.setNamespace("sandbox");
stmt.setSetName("ufodata");

// Create index filter
stmt.setFilter(Filter.range("occurred", 20210101, 20211231));

// Execute the query
ResultSet resultSet = client.queryAggregate(null, stmt, "example", "count");

try {
    // Get the result
    if (resultSet.next()) {
        Object result = resultSet.getObject();
        System.out.println("Count = " + result);
    }
} finally {
    // Release the resources held by the result set
    resultSet.close();
}

// Close the connection to the server
client.close();
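
The range bounds 20210101 and 20211231 look like dates packed into integers as yyyyMMdd. That encoding is an assumption drawn from the values, not something the page states. Under that assumption, the following sketch derives the bounds for a calendar year and checks a value against them locally. The class DateBoundsSketch and its methods are hypothetical helpers, and the inclusive comparison reflects how the range filter is understood to treat both ends.

```java
import java.time.LocalDate;

// Hypothetical helper, assuming the occurred bin stores dates as yyyyMMdd integers.
class DateBoundsSketch {
    // Packs a date into an integer such as 20210101.
    static int toYyyyMmDd(LocalDate date) {
        return date.getYear() * 10_000 + date.getMonthValue() * 100 + date.getDayOfMonth();
    }

    // Returns {first day, last day} of the given year in yyyyMMdd form.
    static int[] yearBounds(int year) {
        return new int[] {
                toYyyyMmDd(LocalDate.of(year, 1, 1)),
                toYyyyMmDd(LocalDate.of(year, 12, 31))
        };
    }

    // Local check that treats both ends of the range as inclusive.
    static boolean inRange(int value, int begin, int end) {
        return value >= begin && value <= end;
    }

    public static void main(String[] args) {
        int[] bounds = yearBounds(2021);
        System.out.println(bounds[0] + " .. " + bounds[1]); // prints "20210101 .. 20211231"
    }
}
```

The two values from yearBounds(2021) match the arguments passed to Filter.range in the query above.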

Code block

The following single code block applies a stream UDF aggregation:
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.query.Filter;
import com.aerospike.client.query.ResultSet;
import com.aerospike.client.query.Statement;
import com.aerospike.client.lua.LuaConfig;

// Establishes a connection to the server
AerospikeClient client = new AerospikeClient("127.0.0.1", 3000);

// Set local directory
LuaConfig.SourceDirectory = "/home/user/udf";

// Create statement
Statement stmt = new Statement();

// Set namespace and set name
stmt.setNamespace("sandbox");
stmt.setSetName("ufodata");

// Create index filter
stmt.setFilter(Filter.range("occurred", 20210101, 20211231));

// Execute the query
ResultSet resultSet = client.queryAggregate(null, stmt, "example", "count");

try {
    // Get the result
    if (resultSet.next()) {
        Object result = resultSet.getObject();
        System.out.println("Count = " + result);
    }
} finally {
    // Release the resources held by the result set
    resultSet.close();
}

// Close the connection to the server
client.close();
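
The example prints the aggregation result as a plain Object. If the count is needed as a number, one cautious option is to convert it explicitly. The sketch below assumes the value arrives as some java.lang.Number. The exact runtime type is not stated on this page, and ResultSketch and toCount are hypothetical names.

```java
// Hypothetical helper for turning an aggregation result into a long.
class ResultSketch {
    // Assumes the result is a java.lang.Number (for example Long or Double);
    // anything else, including null, is rejected rather than guessed at.
    static long toCount(Object result) {
        if (result instanceof Number) {
            return ((Number) result).longValue();
        }
        throw new IllegalArgumentException("Unexpected aggregation result: " + result);
    }

    public static void main(String[] args) {
        System.out.println(toCount(Long.valueOf(42))); // prints "42"
    }
}
```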