Aggregations

Jump to the Code block for a combined complete example.

A common way to process the results of a basic query is aggregation, where you compute a function over the entire result set.

Setup

The following examples use the setup below to illustrate query aggregation with a Stream UDF.

import aerospike
from aerospike import predicates as p

The record structure:

Occurred: Integer
Reported: Integer
Posted: Integer
Report: Map
{
    shape: List,
    summary: String,
    city: String,
    state: String,
    duration: String
}
Location: GeoJSON

Stream UDF

When a query executes, it produces a stream of results. You can iterate over the records in that stream using the client API. Alternatively, Aerospike can process the stream of results with a Stream UDF, which augments the stream with operations that process the data as it flows through.

This example uses the Stream UDF count, from the example.lua module. See Manage UDFs for information on registering the UDF.

-- Aggregation function to count records
local function one(rec)
    return 1
end

local function add(a, b)
    return a + b
end

function count(stream)
    return stream : map(one) : reduce(add);
end

count() is applied to the stream of results from a query, adding to the stream the operations to perform on the results:

  • map — Maps a value from the stream to another value. In this example, mapping is defined as the function one(), which maps a record to the value 1.
  • reduce — Reduces the values from the stream into a single value. In this example, reduction is performed by adding two values from the stream, which happen to be 1s returned from the map function.

The end result is a stream that contains a single value, the sum of 1 for each record in the result set.
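To make the map and reduce steps concrete, here is a minimal pure-Python sketch of the same pipeline. It does not use Aerospike at all: the `records` list is hypothetical sample data standing in for a query's result stream, and the initial value of 0 passed to `reduce` is a choice made for this sketch so that an empty input yields 0.

```python
from functools import reduce


def one(rec):
    # Map step: every record becomes the value 1.
    return 1


def add(a, b):
    # Reduce step: combine two values from the stream into one.
    return a + b


def count(stream):
    # Apply the map step to each record, then fold the 1s into a single sum.
    # The initial value 0 is an assumption of this sketch, not part of the Lua UDF.
    return reduce(add, map(one, stream), 0)


# Hypothetical records standing in for a query's result stream.
records = [
    {"occurred": 20210105},
    {"occurred": 20210712},
    {"occurred": 20211130},
]

print(count(records))  # prints 3
```

The record contents never matter here, because `one()` ignores its argument; only the number of records in the stream affects the result.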

Client UDF path

For client-side Stream UDF processing, you must point the client to the local location of the UDF module.

# Define host configuration
config = {
    'hosts': [ ('127.0.0.1', 3000) ],
    # Set local directory
    'lua': { 'user_path': '/home/user/udf'}
}
# Establishes a connection to the server
client = aerospike.client(config).connect()
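Since the client needs to find the module locally, it can help to check that the file is present before running the query. The sketch below is one way to do that with the standard library only. The helper names `udf_module_path` and `udf_module_exists` are hypothetical and not part of the Aerospike client, and the sketch assumes the module is stored as `<module>.lua` directly inside the configured `user_path`.

```python
import os


def udf_module_path(user_path, module):
    # Hypothetical helper: build the expected location of a Lua module,
    # assuming it is stored as <module>.lua directly under user_path.
    return os.path.join(user_path, module + ".lua")


def udf_module_exists(user_path, module):
    # Hypothetical helper: True if the expected Lua file is present on disk.
    return os.path.isfile(udf_module_path(user_path, module))


# Same directory and module name as in the examples on this page.
if not udf_module_exists('/home/user/udf', 'example'):
    print('example.lua not found in /home/user/udf')
```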

Execute the query

The following example executes a secondary index query using an index created on the occurred bin. The result is a count of all records with an occurred value between 20210101 and 20211231.

# Create the query
query = client.query('sandbox', 'ufodata')
# Set index filter
query.where(p.between('occurred', 20210101, 20211231))
# Add the UDF to the query
query.apply('example', 'count', [])
# Get the result
count = query.results()
print('Count = ', count)
# Close the connection to the server
client.close()
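The value printed above is whatever `query.results()` returns. Assuming it returns a Python list that holds the single aggregated value, a small helper can unwrap it into a plain integer. The function name `unwrap_count` is hypothetical, and treating an empty list as a count of 0 is an assumption of this sketch rather than documented client behavior.

```python
def unwrap_count(results):
    # Hypothetical helper: extract the single aggregated value from the
    # list assumed to be returned by query.results().
    if not results:
        # Assumption: no values in the stream means nothing matched.
        return 0
    if len(results) != 1:
        raise ValueError('expected one aggregated value, got %d' % len(results))
    return results[0]


# Usage with a stand-in list; in the example above you would pass
# the value returned by query.results().
print('Count =', unwrap_count([42]))  # prints Count = 42
```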

Code block

The following single code block applies a stream UDF aggregation:

import aerospike
from aerospike import predicates as p
# Define host configuration
config = {
    'hosts': [ ('127.0.0.1', 3000) ],
    # Set local directory
    'lua': { 'user_path': '/home/user/udf'}
}
# Establishes a connection to the server
client = aerospike.client(config).connect()
# Create the query
query = client.query('sandbox', 'ufodata')
# Set index filter
query.where(p.between('occurred', 20210101, 20211231))
# Add the UDF to the query
query.apply('example', 'count', [])
# Get the result
count = query.results()
print('Count = ', count)
# Close the connection to the server
client.close()