Aggregations
See the Code block section for a complete, combined example.
A common way to process the results of a basic query is aggregation, which computes a function over the entire result set.
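To make "a function over the entire result set" concrete, here is a minimal sketch in plain Go (1.18+ for generics) of a fold: one accumulator is updated once per result. The `aggregate` helper and the sample values are hypothetical and illustrative only. They are not part of the Aerospike client, and they run entirely in memory rather than against a query.

```go
package main

import "fmt"

// aggregate folds every element of a result set into a single value,
// starting from init and applying step once per element.
func aggregate[T, A any](results []T, init A, step func(A, T) A) A {
	acc := init
	for _, r := range results {
		acc = step(acc, r)
	}
	return acc
}

func main() {
	// Made-up "occurred" values standing in for a query's result set.
	occurred := []int64{20210314, 20210702, 20211130}

	// Count: add 1 for each record.
	count := aggregate(occurred, 0, func(n int, _ int64) int { return n + 1 })

	// Maximum: keep the larger of the accumulator and the current value.
	latest := aggregate(occurred, int64(0), func(m, v int64) int64 {
		if v > m {
			return v
		}
		return m
	})

	fmt.Println(count, latest) // prints "3 20211130"
}
```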
Setup
The following examples use the setup below to illustrate query aggregation with a stream UDF.
```go
import (
	"fmt"
	"log"

	"github.com/aerospike/aerospike-client-go/v6"
)

// Establishes a connection to the server
client, err := aerospike.NewClient("127.0.0.1", 3000)
if err != nil {
	log.Fatal(err)
}
defer client.Close()
```
The record structure:
```
Occurred: Integer
Reported: Integer
Posted: Integer
Report: Map{
    shape: List,
    summary: String,
    city: String,
    state: String,
    duration: String
}
Location: GeoJSON
```
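As an illustration only, the record structure above could be mirrored by Go types like the following. The type names, the `Year` helper, and the sample values are hypothetical. The sketch also assumes that `shape` holds a list of strings and that the date bins hold dates encoded as `yyyymmdd` integers, which matches the `20210101` to `20211231` range used later but is not stated by the record description itself.

```go
package main

import "fmt"

// Report is a hypothetical Go mapping of the "report" map bin.
// Shape is assumed to be a list of strings.
type Report struct {
	Shape    []string
	Summary  string
	City     string
	State    string
	Duration string
}

// UFORecord is a hypothetical Go view of one record in the sandbox/ufodata set.
// Occurred, Reported, and Posted are assumed to hold yyyymmdd dates as integers.
type UFORecord struct {
	Occurred int64
	Reported int64
	Posted   int64
	Report   Report
	Location string // GeoJSON text, for example a Point
}

// Year extracts the year from the assumed yyyymmdd encoding of Occurred.
func (r UFORecord) Year() int64 {
	return r.Occurred / 10000
}

func main() {
	// Made-up values, for illustration only.
	r := UFORecord{
		Occurred: 20210704,
		Reported: 20210705,
		Posted:   20210710,
		Report:   Report{Shape: []string{"light"}, City: "Example City", State: "CA"},
		Location: `{"type":"Point","coordinates":[-122.42,37.77]}`,
	}
	fmt.Println(r.Year(), r.Report.Shape) // prints "2021 [light]"
}
```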
Stream UDF
When a query executes, it produces a stream of results containing records that you can iterate using the client API. Aerospike can also process that stream with a Stream UDF, which augments the stream with operations that process the data as it flows through.
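The idea of augmenting a stream with operations can be pictured with Go channels: a source emits values, and a stage attached to it transforms each value before the consumer sees it. This is only an analogy in plain Go under that assumption; it does not describe how Aerospike implements Stream UDFs, and `source` and `mapStage` are hypothetical names.

```go
package main

import "fmt"

// source emits the given values on a channel, like a query emitting records.
func source(vals ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

// mapStage attaches an operation to a stream: every value flowing through
// is transformed by f before it reaches the consumer.
func mapStage(in <-chan int, f func(int) int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- f(v)
		}
	}()
	return out
}

func main() {
	// The consumer iterates the augmented stream instead of the raw one.
	for v := range mapStage(source(1, 2, 3), func(x int) int { return x * 10 }) {
		fmt.Println(v) // prints 10, 20, 30 on separate lines
	}
}
```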
This example uses the Stream UDF count, from the example.lua module.
See Manage UDFs for information on registering the UDF.
```lua
-- Aggregation function to count records
local function one(rec)
  return 1
end

local function add(a, b)
  return a + b
end

function count(stream)
  return stream : map(one) : reduce(add);
end
```
count() is applied to the stream of results from a query and adds to the stream the operations to perform on the results:

- map: Maps a value from the stream to another value. In this example, the mapping function is one(), which maps each record to the value 1.
- reduce: Reduces the values from the stream into a single value. In this example, reduction adds two values from the stream at a time; those values are the 1s produced by one() through map.

The end result is a stream that contains a single value: the sum of 1 for each record in the result set, which is the record count.
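For readers more at home in Go than Lua, the logic of count() can be sketched in plain Go. This is a hypothetical translation for illustration: `Record` is a stand-in type, and map and reduce are fused into one loop here, whereas the Lua module composes them as separate stream operations. Like a reduce without a seed value, the sketch produces no value (ok is false) when the stream is empty.

```go
package main

import "fmt"

// Record is a hypothetical stand-in for a query result record.
type Record map[string]any

// one mirrors the Lua one(): every record maps to the value 1.
func one(Record) int { return 1 }

// add mirrors the Lua add(): combine two values into one.
func add(a, b int) int { return a + b }

// count mirrors stream:map(one):reduce(add). The first mapped value seeds the
// total; every later value is combined with add. ok is false for an empty stream.
func count(stream <-chan Record) (total int, ok bool) {
	for rec := range stream {
		v := one(rec)
		if !ok {
			total, ok = v, true
			continue
		}
		total = add(total, v)
	}
	return total, ok
}

func main() {
	// Made-up records standing in for query results.
	stream := make(chan Record, 3)
	stream <- Record{"occurred": 20210314}
	stream <- Record{"occurred": 20210702}
	stream <- Record{"occurred": 20211130}
	close(stream)

	if n, ok := count(stream); ok {
		fmt.Println("Count =", n) // prints "Count = 3"
	}
}
```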
Client UDF path
For client-side Stream UDF processing, you must point the client to the local location of the UDF module.
```go
// Set the local directory that contains the UDF module (example.lua)
aerospike.SetLuaPath("/home/user/udf/")
```
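Because client-side processing loads the module from that local directory, a missing file only shows up when the query runs. A small pre-flight check can catch this earlier. The `udfModuleFile` helper below is hypothetical (it is not part of the Aerospike client) and assumes the module file is named `<module>.lua` inside the directory passed to SetLuaPath.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// udfModuleFile returns the path of <module>.lua inside dir, or an error if the
// file is missing or is a directory. Hypothetical helper, run before SetLuaPath.
func udfModuleFile(dir, module string) (string, error) {
	p := filepath.Join(dir, module+".lua")
	info, err := os.Stat(p)
	if err != nil {
		return "", fmt.Errorf("UDF module %q not found in %s: %w", module, dir, err)
	}
	if info.IsDir() {
		return "", errors.New(p + " is a directory, not a Lua file")
	}
	return p, nil
}

func main() {
	p, err := udfModuleFile("/home/user/udf/", "example")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("using", p)
}
```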
Execute the query
The following example executes a secondary index query, using an index created on the occurred bin. The result is a count of all records with an occurred value between 20210101 and 20211231.
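The range filter selects index values between the two bounds, and both bounds are inclusive. The sketch below shows that selection rule in plain Go. It assumes, as the bounds suggest, that occurred stores dates as `yyyymmdd` integers, whose numeric order matches calendar order. `inRange` and `yyyymmdd` are hypothetical helpers, not client functions.

```go
package main

import "fmt"

// inRange reports whether v lies in [begin, end], with both ends inclusive.
func inRange(v, begin, end int64) bool {
	return v >= begin && v <= end
}

// yyyymmdd packs a calendar date into the integer form assumed for "occurred".
func yyyymmdd(y, m, d int) int64 {
	return int64(y*10000 + m*100 + d)
}

func main() {
	const begin, end = 20210101, 20211231
	dates := []int64{
		yyyymmdd(2020, 12, 31), // just before the range
		yyyymmdd(2021, 1, 1),   // lower bound
		yyyymmdd(2021, 12, 31), // upper bound
		yyyymmdd(2022, 1, 1),   // just after the range
	}
	for _, v := range dates {
		fmt.Println(v, inRange(v, begin, end)) // false, true, true, false
	}
}
```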
```go
// Create statement
stmt := aerospike.NewStatement("sandbox", "ufodata")

// Create index filter
if err := stmt.SetFilter(aerospike.NewRangeFilter("occurred", 20210101, 20211231)); err != nil {
	log.Fatal(err)
}

// Execute the query
recordSet, err := client.QueryAggregate(nil, stmt, "example", "count")
if err != nil {
	log.Fatal(err)
}

// Get the results
for res := range recordSet.Results() {
	if res.Err != nil {
		log.Fatal(res.Err)
	}
	// The aggregation result is returned in the "SUCCESS" bin
	fmt.Printf("Count = %v\n", res.Record.Bins["SUCCESS"])
}
```
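The results loop has two jobs: stop on an error, and pick the aggregate value out of the "SUCCESS" bin. The sketch below isolates that logic so it can run without a server. The `result` type is a hypothetical stand-in for the client's result values (a bin map or an error), and `firstAggregate` is an illustrative helper. When stopping early with the real client, closing the record set with recordSet.Close() releases the query.

```go
package main

import (
	"errors"
	"fmt"
)

// result is a hypothetical stand-in for a client query result:
// either a record's bins or an error.
type result struct {
	Bins map[string]any
	Err  error
}

// firstAggregate reads the results channel, returns the first error it sees,
// and otherwise returns the first value stored under the "SUCCESS" bin.
func firstAggregate(results <-chan result) (any, error) {
	var val any
	found := false
	for res := range results {
		if res.Err != nil {
			return nil, res.Err
		}
		if v, ok := res.Bins["SUCCESS"]; ok && !found {
			val, found = v, true
		}
	}
	if !found {
		return nil, errors.New("no aggregate value returned")
	}
	return val, nil
}

func main() {
	// A made-up aggregate value, for illustration only.
	ch := make(chan result, 1)
	ch <- result{Bins: map[string]any{"SUCCESS": 42}}
	close(ch)

	v, err := firstAggregate(ch)
	fmt.Println(v, err) // prints "42 <nil>"
}
```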
Code block
The following single code block applies a stream UDF aggregation from start to finish.
```go
package main

import (
	"fmt"
	"log"

	"github.com/aerospike/aerospike-client-go/v6"
)

func main() {
	// Establishes a connection to the server
	client, err := aerospike.NewClient("127.0.0.1", 3000)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Set the local directory that contains the UDF module (example.lua)
	aerospike.SetLuaPath("/home/user/udf/")

	// Create statement
	stmt := aerospike.NewStatement("sandbox", "ufodata")

	// Create index filter on the "occurred" bin
	if err := stmt.SetFilter(aerospike.NewRangeFilter("occurred", 20210101, 20211231)); err != nil {
		log.Fatal(err)
	}

	// Execute the query with the count() stream UDF from example.lua
	recordSet, err := client.QueryAggregate(nil, stmt, "example", "count")
	if err != nil {
		log.Fatal(err)
	}

	// Get the results
	for res := range recordSet.Results() {
		if res.Err != nil {
			log.Fatal(res.Err)
		}
		// The aggregation result is returned in the "SUCCESS" bin
		fmt.Printf("Count = %v\n", res.Record.Bins["SUCCESS"])
	}
}
```