This tutorial is Part 1 of a two-part look at how to implement SQL aggregate queries in
Aerospike.
This notebook requires the Aerospike Database running locally with Java kernel and Aerospike Java Client. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.
Introduction
In this notebook, we will see how specific aggregate statements in SQL
can be implemented in Aerospike.
SQL is a widely known data access language. The examples in this
notebook provide patterns for implementing specific SQL aggregate
queries. You should be able to understand them and find them useful even
without deep familiarity with SQL.
This notebook is the second in the SQL Operations series that consists
of the following notebooks:
Implementing SQL Operations: SELECT
Implementing SQL Operations: Aggregate functions - Part 1 (this
notebook) and 2
Implementing SQL Operations: UPDATE, CREATE, and DELETE
The specific topics and aggregate functions we discuss include:
Execution model for processing aggregates
Pipeline of stream operators
Types of stream operators
Two phases of execution
Simple aggregation state
MIN
MAX
SUM
COUNT
Complex aggregation state: Aggregate Operator
Multiple aggregate computations
In Part 2, we describe GROUP BY processing and some additional
aggregates.
The purpose of this notebook is to illustrate the Aerospike implementation
of specific SQL operations. Check out the Aerospike Presto
Connector
for ad-hoc SQL access to Aerospike data.
Working with UDF Module
All UDF functions for this notebook are placed in the “aggregate_fns.lua”
file under the “udf” subdirectory. If the subdirectory or file is not
there, you may download the file from
here
and place it there using the notebook’s File->Open menu followed by
the Upload/New menus.
You are encouraged to experiment with the Lua code in the module. Be
sure to save your changes and then run the convenience function
“registerUDF()” in a code cell for the changes to take effect.
Prerequisites
This tutorial assumes familiarity with the basics of Aerospike, and that the following setup is in place:
The client is initialized and connected to the cluster.
The test data is populated: 1000 records with user keys “id-1” through “id-1000”,
two integer bins (fields) “bin1” (1-1000) and “bin2” (1001-2000), and
one string bin “bin3” (one of the 5 values “A” through “E”, assigned randomly), in the
namespace “test” and set “sql-aggregate”.
Create a secondary index
To use the query API with an index-based filter, a secondary index must
exist on the filter bin. Here we create a numeric index on “bin1” in the
“sql-aggregate” set.
try {
    // create a secondary numeric index on bin1 (IndexName, Namespace, Set are defined earlier in the notebook)
    IndexTask task = client.createIndex(null, Namespace, Set, IndexName, "bin1", IndexType.NUMERIC);
    task.waitTillComplete();
}
catch (AerospikeException ae) {
    if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {
        throw ae;
    }
}
System.out.format("Created number index %s on ns=%s set=%s bin=%s.",
    IndexName, Namespace, Set, "bin1");
Output:
Created number index test_sql_aggregate_bin1_number_idx on ns=test set=sql-aggregate bin=bin1.
Execution Model for Processing Aggregates
Processing aggregates in Aerospike involves processing a stream of
records through a pipeline of operators on the server as well as the client.
Four types of operators are supported: Filter, Map, Aggregate, and
Reduce. The operators work with one of the following data types as input
and output: Record, Integer, String, Map (the data type, not to be
confused with the Map operator), and List. Only the initial filter(s)
and the first non-filter operator in the pipeline can consume the Record type.
Filter: Object -> Boolean; filters input objects, input and output
objects are of the same type.
Map: Object -> Object; any transformation is possible.
Aggregate: (Current State, Object) -> New State; maintains the
global “aggregate” state of the stream. While any type can be used,
an (Aerospike) Map type is often used.
Reduce: (Object, Object) -> Object; reduces two objects to a single
object of the same type.
The operators may appear any number of times and in any order in the
pipeline.
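To make the pipeline concrete, here is an illustrative skeleton of a stream UDF in Lua; the operator functions named in it are placeholders rather than functions from this notebook's module:
-- illustrative pipeline skeleton; the operator functions are placeholders
function example_pipeline(stream)
    return stream
        : filter(record_filter)    -- Record -> Boolean; drops non-matching records
        : map(record_to_value)     -- Record -> value consumed by the reducer
        : reduce(combine_values)   -- (value, value) -> value; runs on each node and again on the client
        : map(final_transform)     -- post-aggregation transform; executes only on the client
end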
The operator pipeline is typically processed in two phases: first phase
on server nodes and the second phase on client.
Phase 1: Server nodes execute all operators up to and including the
first reduce operation in the pipeline.
Phase 2: The client processes results from multiple nodes through
the remaining pipeline operators starting with and including the
first reduce operation in the pipeline.
Thus, the first reduce operation, if specified in the pipeline, is
executed on all server nodes as well as on the client. If there is no reduce
operator in the pipeline, the application receives the combined
results returned from the server nodes.
Post aggregation processing involves operators after the first reduce in
the pipeline, usually for sorting, filtering, and final transformation,
and takes place on the client side.
Aggregation processing in Aerospike is defined using User Defined
Functions (UDFs). UDFs are written in Lua with arbitrary logic and are
executed on both server and client as explained above. Since aggregates
by definition involve multiple records, only stream UDFs are discussed
below (versus record UDFs whose scope of execution is a single record).
A stream UDF specifies the pipeline of operators for processing
aggregates. Different aggregates differ in their UDF functions, whereas
the Aerospike API calls to specify the aggregate processing are the same.
The UDFs and logic are described in appropriate sections for each
aggregate function below. For additional context and details, please
refer to the
documentation.
Simple Aggregate State with Map-Reduce
SELECT aggregate(col) FROM namespace.set
The UDF that specifies the processing pipeline differs for each aggregate,
but every aggregate is specified and executed with the same API
operation for synchronous execution:
void Statement::setAggregateFunction(String udfModule, String udfFunction, Value... udfArgs)
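The mapping from this call to the Lua entry point is positional: udfModule names the registered Lua module, udfFunction names a top-level Lua function whose first parameter is the stream, and udfArgs arrive as the remaining parameters. The following placeholder (my_agg, my_mapper_closure, and my_reducer are illustrative names, not part of the notebook's module) shows the shape of such an entry point:
-- invoked by setAggregateFunction("aggregate_fns", "my_agg", Value.get("bin1"));
-- 'stream' is supplied by the framework, 'bin' comes from udfArgs
function my_agg(stream, bin)
    -- my_mapper_closure and my_reducer are placeholders for module functions
    return stream : map(my_mapper_closure(bin)) : reduce(my_reducer)
end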
Simple aggregate computations are possible with a simple pipeline of map
and reduce. Such simple computations can save the aggregation state in a
single numeric or string value during stream processing. Examples
include single aggregate computations for:
MIN
MAX
SUM
COUNT
Let us separately implement COUNT and SUM aggregate functions on a
single bin.
Create User Defined Function (UDF)
Note, all UDF functions for this notebook are assumed to be in the
“aggregate_fns.lua” file under the “udf” directory. Please refer to the
“Working with UDF Module” section above.
As explained above, the logic for aggregation resides in a stream UDF.
COUNT
Examine the following Lua code that implements COUNT. The pipeline
consists of map and reduce operators.
the map function “rec_to_count_closure” is a closure over
“rec_to_count”, which takes a record and returns 1 or 0, signifying
a record count (if no bin is specified) or a bin count (if a bin is
specified). In this and subsequent examples, closures are used to access
the aggregate parameters.
the reduce function “add_values” adds the two input values and
returns their sum.
-- count and sum reducer
local function add_values(val1, val2)
    return (val1 or 0) + (val2 or 0)
end

-- count mapper
-- note closures are used to access aggregate parameters such as bin
local function rec_to_count_closure(bin)
    local function rec_to_count(rec)
        -- if bin is specified: if bin exists in record return 1 else 0; if no bin is specified, return 1
        return ((not bin or rec[bin]) and 1) or 0
    end
    return rec_to_count
end
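The stream function that assembles the COUNT pipeline is not shown in the excerpt above. A minimal version consistent with this mapper and reducer would look as follows (a sketch; the exact body in aggregate_fns.lua may differ):
-- COUNT: map each record to 1 or 0, then add the values across the stream
function count(stream, bin)
    return stream : map(rec_to_count_closure(bin)) : reduce(add_values)
end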
SUM
Examine the following Lua code that implements SUM. The pipeline
consists of map and reduce operators.
the map function “rec_to_bin_value_closure” is a closure over
“rec_to_bin_value”, which takes a record and returns the bin value.
In this and subsequent examples, closures are used to access the
aggregate parameters, such as the bin in this case.
the reduce function “add_values” adds the two input values and
returns their sum.
-- mapper for various single bin aggregates
local function rec_to_bin_value_closure(bin)
    local function rec_to_bin_value(rec)
        -- if a numeric bin exists in record return its value; otherwise return nil
        return (type(rec[bin]) == "number" and rec[bin]) or nil
    end
    return rec_to_bin_value
end
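The SUM pipeline then mirrors COUNT, swapping in this mapper; MIN and MAX follow the same shape with a min/max reducer in place of add_values (sketched later with get_min/get_max). A sketch consistent with the module (the exact body in aggregate_fns.lua may differ):
-- SUM: map each record to its numeric bin value, then add the values;
-- nil values from the mapper are treated as 0 by add_values
function sum(stream, bin)
    return stream : map(rec_to_bin_value_closure(bin)) : reduce(add_values)
end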
Register the UDF with the server by executing the following code cell.
The registerUDF() function below can be run conveniently when the UDF is
modified (you are encouraged to experiment with the UDF code). The
function invalidates the cache, removes the currently registered module,
and registers the latest version.
Now let’s implement a range filter in the UDF.
SUM_RANGE
Examine the following Lua code that implements the SUM with a range
filter. It takes sum_bin, range_bin, and range limits range_low and
range_high. The pipeline consists of filter followed by map and reduce
operators.
the filter function “range_filter” returns true if the bin value is
within the range [range_low, range_high], false otherwise.
the map function “rec_to_bin_value” takes a record and returns the
numeric “bin” value; if the bin doesn’t exist or is non-numeric, it
returns nil, which the reducer treats as 0.
the reduce function “add_values” adds the two input values and
returns their sum.
-- range filter
local function range_filter_closure(range_bin, range_low, range_high)
    local function range_filter(rec)
        -- if bin value is in [low,high] return true, false otherwise
        local val = rec[range_bin]
        if (not val or type(val) ~= "number") then val = nil end
        return (val and (val >= range_low and val <= range_high)) or false
    end
    return range_filter
end

-- sum of range: sum(sum_bin) where range_bin in [range_low, range_high]
function sum_range(stream, sum_bin, range_bin, range_low, range_high)
    return stream : filter(range_filter_closure(range_bin, range_low, range_high))
                  : map(rec_to_bin_value_closure(sum_bin))
                  : reduce(add_values)
end
Note that you cannot use expression filters with queryAggregate: they are
ignored. Below, all records in the set are aggregated in the sum even
though the expression filter 2 <= bin1 <= 7 is specified in the policy.
Complex Aggregation State: Aggregate Operator
SELECT agg1(bin1), agg2(bin2), ... FROM namespace.set WHERE condition
The aggregate operator is used when you need to track a more complex
state during stream processing. For example, to compute multiple
aggregates in one query or to compute aggregates that need other
aggregates for evaluation such as AVERAGE (SUM/COUNT) and RANGE
(MAX-MIN).
We will illustrate the aggregate operator for AVERAGE and RANGE
computations of bin1 and bin2 respectively. The aggregate function will
compute SUM, COUNT, MIN, and MAX of appropriate bins needed for AVERAGE
and RANGE computations at the end.
SELECT AVERAGE(bin1), RANGE(bin2), ... FROM test.sql-aggregate
We will implement a new UDF “average_range” for this.
Note that the reducer function entails merging two partial stream
aggregates into one by adding their “sum” and “count” values and taking
the min and max of their “min” and “max” values (“map merge”). The final
phase of reduce happens on the client to arrive at the final SUM, COUNT,
MIN, and MAX. The final map operator is a client-only operation that
takes the aggregate state (a map) as input and outputs the average and
range values.
AVERAGE_RANGE
It takes the bins whose AVERAGE and RANGE are needed. The pipeline
consists of map, aggregate, reduce, and map operators.
the map function “rec_to_bins” returns numeric values of bin_avg and
bin_range.
the aggregate function “aggregate_stats” takes the current aggregate
state and two bin values and returns the new aggregate state.
the reduce function “merge_stats” merges two aggregate state maps by
adding corresponding (same key) elements and returns a merged map.
the last map operator “compute_final_stats” takes the final value of
SUM, COUNT, MIN, and MAX stats and outputs two values: AVERAGE
(SUM/COUNT) and RANGE (MAX-MIN).
-- map function to compute average and range
local function compute_final_stats(stats)
    local ret = map()
    ret['AVERAGE'] = stats["sum"] / stats["count"]
    ret['RANGE'] = stats["max"] - stats["min"]
    return ret
end

-- merge partial stream maps into one
local function merge_stats(a, b)
    local ret = map()
    ret["sum"] = add_values(a["sum"], b["sum"])
    ret["count"] = add_values(a["count"], b["count"])
    ret["min"] = get_min(a["min"], b["min"])
    ret["max"] = get_max(a["max"], b["max"])
    return ret
end

-- aggregate operator to compute stream state for average_range
local function aggregate_stats(agg, val)
    agg["count"] = (agg["count"] or 0) + ((val["bin_avg"] and 1) or 0)
    agg["sum"] = (agg["sum"] or 0) + (val["bin_avg"] or 0)
    agg["min"] = get_min(agg["min"], val["bin_range"])
    agg["max"] = get_max(agg["max"], val["bin_range"])
    return agg
end
Many developers who are familiar with SQL would like to see how SQL
operations translate to Aerospike. We looked at how to implement various
aggregate statements. This should be generally useful irrespective of
the reader’s familiarity with SQL. While the examples here use synchronous
execution, many operations can also be performed asynchronously.
Visit Aerospike notebooks
repo to
run additional Aerospike notebooks. To run a different notebook,
download the notebook from the repo to your local machine, and then
click on File->Open, and select Upload.