Implementing SQL Operations: Aggregates (Part 1)
This tutorial is Part 1 of a two-part guide to implementing SQL aggregate queries in Aerospike.
This notebook requires the Aerospike Database running locally with Java kernel and Aerospike Java Client. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.
Introduction
In this notebook, we will see how specific aggregate statements in SQL can be implemented in Aerospike.
SQL is a widely known data access language. The examples in this notebook provide patterns for implementing specific SQL aggregate queries. You should be able to understand them and find them useful even without deep familiarity with SQL.
This notebook is the second in the SQL Operations series that consists of the following notebooks:
- Implementing SQL Operations: SELECT
- Implementing SQL Operations: Aggregate functions - Part 1 (this notebook) and 2
- Implementing SQL Operations: UPDATE, CREATE, and DELETE
The specific topics and aggregate functions we discuss include:
- Execution model for processing aggregates
- Pipeline of stream operators
- Types of stream operators
- Two phases of execution
- Simple aggregation state
- MIN
- MAX
- SUM
- COUNT
- Complex aggregation state: Aggregate Operator
- Multiple aggregate computations
In Part 2, we describe GROUP BY processing and some additional aggregates.
The purpose of this notebook is to illustrate Aerospike implementation for specific SQL operations. Check out Aerospike Presto Connector for ad-hoc SQL access to Aerospike data.
Working with UDF Module
All UDF functions for this notebook are placed in "aggregate_fns.lua" file under the "udf" subdirectory. If the subdirectory or file is not there, you may download the file from here and place it there using the notebook's File->Open followed by Upload/New menus.
You are encouraged to experiment with the Lua code in the module. Be sure to save your changes and then run the convenience function "registerUDF()" in a code cell for the changes to take effect.
Prerequisites
This tutorial assumes familiarity with the following topics:
Initialization
Ensure database is running
This notebook requires that the Aerospike database is running.
import io.github.spencerpark.ijava.IJava;
import io.github.spencerpark.jupyter.kernel.magic.common.Shell;
IJava.getKernelInstance().getMagics().registerMagics(Shell.class);
%sh asd
Download and install additional components.
Install the Java client.
%%loadFromPOM
<dependencies>
    <dependency>
        <groupId>com.aerospike</groupId>
        <artifactId>aerospike-client</artifactId>
        <version>5.0.0</version>
    </dependency>
</dependencies>
Connect to database and populate test data
The test data has 1000 records with user keys "id-1" through "id-1000", two integer bins (fields) "bin1" (1-1000) and "bin2" (1001-2000), and one string bin "bin3" (a random value from "A" through "E"), in the namespace "test" and set "sql-aggregate".
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.WritePolicy;
import java.util.Random;
String[] groups = {"A", "B", "C", "D", "E"};
Random rand = new Random(1);
AerospikeClient client = new AerospikeClient("localhost", 3000);
System.out.println("Initialized the client and connected to the cluster.");
String Namespace = "test";
String Set = "sql-aggregate";
WritePolicy wpolicy = new WritePolicy();
wpolicy.sendKey = true;
for (int i = 1; i <= 1000; i++) {
    Key key = new Key(Namespace, Set, "id-"+i);
    Bin bin1 = new Bin("bin1", i);
    Bin bin2 = new Bin("bin2", 1000+i);
    Bin bin3 = new Bin("bin3", groups[rand.nextInt(groups.length)]);
    client.put(wpolicy, key, bin1, bin2, bin3);
}
System.out.format("Test data populated.");;
Output:
Initialized the client and connected to the cluster.
Test data populated.
Create a secondary index
To use the query API with an index-based filter, a secondary index must exist on the filter bin. Here we create a numeric index on "bin1" in the "sql-aggregate" set.
import com.aerospike.client.policy.Policy;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.task.IndexTask;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.ResultCode;
String IndexName = "test_sql_aggregate_bin1_number_idx";
Policy policy = new Policy();
policy.socketTimeout = 0; // Do not timeout on index create.
try {
    IndexTask task = client.createIndex(policy, Namespace, Set, IndexName,
                                        "bin1", IndexType.NUMERIC);
    task.waitTillComplete();
}
catch (AerospikeException ae) {
    if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {
        throw ae;
    }
}
System.out.format("Created number index %s on ns=%s set=%s bin=%s.",
                  IndexName, Namespace, Set, "bin1");;
Output:
Created number index test_sql_aggregate_bin1_number_idx on ns=test set=sql-aggregate bin=bin1.
Execution Model for Processing Aggregates
Processing aggregates in Aerospike involves processing a stream of records through a pipeline of operators on the server as well as the client.
Four types of operators are supported: Filter, Map, Aggregate, and Reduce. The operators work with one of the following data types as input and output: Record, Integer, String, Map (the data type, not to be confused with the Map operator), and List. Only the initial filter(s) and first non-filter operator in the pipeline can consume Record type.
- Filter: Object -> Boolean; filters input objects; input and output objects are of the same type.
- Map: Object -> Object; any transformation is possible.
- Aggregate: (Current State, Object) -> New State; maintains the global "aggregate" state of the stream. While any type can be used, an (Aerospike) Map type is often used.
- Reduce: (Object, Object) -> Object; reduces two objects to a single object of the same type.
The operators may appear any number of times and in any order in the pipeline.
The operator pipeline is typically processed in two phases: first phase on server nodes and the second phase on client.
- Phase 1: Server nodes execute all operators up to and including the first reduce operation in the pipeline.
- Phase 2: The client processes results from multiple nodes through the remaining pipeline operators starting with and including the first reduce operation in the pipeline.
Thus, the first reduce operation, if specified in the pipeline, is executed on all server nodes as well as on the client. If there is no reduce operator in the pipeline, the application receives the combined results returned from the server nodes.
Post-aggregation processing involves the operators after the first reduce in the pipeline, usually for sorting, filtering, and final transformation, and takes place on the client side.
Aggregation processing in Aerospike is defined using User Defined Functions (UDFs). UDFs are written in Lua with arbitrary logic and are executed on both server and client as explained above. Since aggregates by definition involve multiple records, only stream UDFs are discussed below (versus record UDFs whose scope of execution is a single record).
A stream UDF specifies the pipeline of operators for processing aggregates. Aggregates differ in their UDFs, but the Aerospike API calls that specify aggregate processing are the same.
The UDFs and logic are described in the appropriate sections for each aggregate function below; the "count" and "sum" functions in the next section are the simplest concrete examples of such pipelines. For additional context and details, please refer to the documentation.
Simple Aggregate State with Map-Reduce
SELECT aggregate(col) FROM namespace.set
Each aggregate differs in the UDF that defines its processing pipeline, but all aggregates are specified and executed with the same API operations, shown here for synchronous execution:
void Statement::setAggregateFunction(String udfModule, String udfFunction, Value... udfArgs)
ResultSet rs = Client::queryAggregate(QueryPolicy policy, Statement stmt);
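For example, the following sketch shows the calling pattern used throughout this notebook; the "sum" function name and "bin2" argument are placeholders for whichever UDF and parameters apply, and the client, Namespace, Set, and UDFModule variables are assumed to be defined as in the cells elsewhere in this notebook.
import com.aerospike.client.Value;
import com.aerospike.client.query.Statement;
import com.aerospike.client.query.ResultSet;
// generic calling pattern (sketch): set the namespace/set, name the UDF module
// and function with its arguments, then run the aggregation query
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "sum", Value.get("bin2"));
ResultSet rs = client.queryAggregate(null, stmt);  // null -> default query policy
while (rs.next()) {
    System.out.format("Returned object: %s\n", rs.getObject());
}
rs.close();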
Simple aggregate computations are possible with a simple pipeline of map and reduce. Such simple computations can save the aggregation state in a single numeric or string value during stream processing. Examples include single aggregate computations for:
- MIN
- MAX
- SUM
- COUNT
Let us separately implement COUNT and SUM aggregate functions on a single bin.
Create User Defined Function (UDF)
Note that all UDF functions for this notebook are assumed to be in the "aggregate_fns.lua" file under the "udf" directory. Please refer to the "Working with UDF Module" section above.
As explained above, the logic for aggregation resides in a stream UDF.
COUNT
Examine the following Lua code that implements COUNT. The pipeline consists of map and reduce operators.
- the map function "rec_to_count_closure" is a closure for "rec_to_count" which takes a record and returns 1 or 0 that signifies record (if bin is unspecified) or bin (if it is specified) count. In this and subsequent examples, closures are used to access the aggregate parameters.
- the reduce function "add_values" adds the two input values and returns their sum.
-- count and sum reducer
local function add_values(val1, val2)
    return (val1 or 0) + (val2 or 0)
end

-- count mapper
-- note closures are used to access aggregate parameters such as bin
local function rec_to_count_closure(bin)
    local function rec_to_count(rec)
        -- if bin is specified: if bin exists in record return 1 else 0; if no bin is specified, return 1
        return (not bin and 1) or ((rec[bin] and 1) or 0)
    end
    return rec_to_count
end

-- count
function count(stream, bin)
    return stream : map(rec_to_count_closure(bin)) : reduce(add_values)
end
SUM
Examine the following Lua code that implements SUM. The pipeline consists of map and reduce operators.
- the map function "rec_to_bin_value_closure" is a closure for "rec_to_bin_value" which takes a record and returns the bin value. In this and subsequent examples, closures are used to access the aggregate parameters such as the bin in this case.
- the reduce function "add_values" adds the two input values and returns their sum.
-- mapper for various single bin aggregates
local function rec_to_bin_value_closure(bin)
    local function rec_to_bin_value(rec)
        -- if a numeric bin exists in record return its value; otherwise return nil
        local val = rec[bin]
        if (type(val) ~= "number") then val = nil end
        return val
    end
    return rec_to_bin_value
end

-- sum
function sum(stream, bin)
    return stream : map(rec_to_bin_value_closure(bin)) : reduce(add_values)
end
Register UDF
Register the UDF with the server by executing the following code cell.
The registerUDF() function below can be run conveniently when the UDF is modified (you are encouraged to experiment with the UDF code). The function invalidates the cache, removes the currently registered module, and registers the latest version.
import com.aerospike.client.policy.Policy;
import com.aerospike.client.task.RegisterTask;
import com.aerospike.client.Language;
import com.aerospike.client.lua.LuaConfig;
import com.aerospike.client.lua.LuaCache;
LuaConfig.SourceDirectory = "../udf";
String UDFFile = "aggregate_fns.lua";
String UDFModule = "aggregate_fns";
void registerUDF() {
    // clear the lua cache
    LuaCache.clearPackages();
    Policy policy = new Policy();
    // remove the current module, if any
    client.removeUdf(null, UDFFile);
    RegisterTask task = client.register(policy, LuaConfig.SourceDirectory+"/"+UDFFile,
                                        UDFFile, Language.LUA);
    task.waitTillComplete();
    System.out.format("Registered the UDF module %s.", UDFFile);;
}
registerUDF();
Output:
Registered the UDF module aggregate_fns.lua.
Execute UDF
SELECT COUNT(bin2) FROM test.sql-aggregate
SELECT SUM(bin2) FROM test.sql-aggregate
Here we will execute the "count" and "sum" functions on "bin2" over all 1000 records in the set. The expected sum of the bin2 values (1001 + 1002 + ... + 2000) is 1500500.
import com.aerospike.client.query.Statement;
import com.aerospike.client.Value;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.ResultSet;
// COUNT
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "count", Value.get("bin2"));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed COUNT.");
while (rs.next()) {
    Object obj = rs.getObject();
    System.out.format("Returned object: %s\n", obj.toString());
}
rs.close();
// SUM
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "sum", Value.get("bin2"));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed SUM.");
while (rs.next()) {
    Object obj = rs.getObject();
    System.out.format("Returned object: %s\n", obj.toString());
}
rs.close();
Output:
Executed COUNT.
Returned object: 1000
Executed SUM.
Returned object: 1500500
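Since "rec_to_count" returns 1 for every record when no bin is specified, invoking "count" with no bin argument yields SELECT COUNT(*) semantics. Here is a minimal sketch; with our test data we would expect 1000:
// COUNT(*) sketch: call "count" with no bin argument so every record counts as 1
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "count");
ResultSet rs = client.queryAggregate(null, stmt);
while (rs.next()) {
    System.out.format("Record count: %s\n", rs.getObject());  // expect 1000
}
rs.close();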
Implementing the WHERE Clause
SELECT agg(col) FROM namespace.set WHERE condition
The WHERE clause must be implemented using either the query's index-based filter or the UDF's stream filter (expression filters are ignored, as shown below). Let's implement this specific query:
SELECT SUM(bin2) FROM test.sql-aggregate WHERE bin1 >= 2 AND bin1 <= 7
Let's first use the query filter and then the UDF stream filter to illustrate. In both cases, the filter is 2 <= bin1 <= 7. The expected sum (1002 + 1003 + ... + 1007) is 6027.
import com.aerospike.client.query.Filter;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.exp.Exp;
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
// range filter using the secondary index on bin1
stmt.setFilter(Filter.range("bin1", 2, 7));
stmt.setAggregateFunction(UDFModule, "sum", Value.get("bin2"));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed SUM using the query filter.");
while (rs.next()) {
    Object obj = rs.getObject();
    System.out.format("Returned object: %s", obj.toString());
}
rs.close();
Output:
Executed SUM using the query filter.
Returned object: 6027
Using Filter Operator
Now let's implement the range filter as a UDF stream filter.
SUM_RANGE
Examine the following Lua code that implements the SUM with a range filter. It takes sum_bin, range_bin, and range limits range_low and range_high. The pipeline consists of filter followed by map and reduce operators.
- the filter function "range_filter" returns true if the bin value is within the range [range_low, range_high], false otherwise.
- the map function "rec_to_bin_value" takes a record and returns the numeric "bin" value. If "bin" doesn't exist or is non-numeric, returns 0.
- the reduce function "add_values" adds the two input values and returns their sum.
-- range filter
local function range_filter_closure(range_bin, range_low, range_high)
    local function range_filter(rec)
        -- if bin value is in [low,high] return true, false otherwise
        local val = rec[range_bin]
        if (not val or type(val) ~= "number") then val = nil end
        return (val and (val >= range_low and val <= range_high)) or false
    end
    return range_filter
end

-- sum of range: sum(sum_bin) where range_bin in [range_low, range_high]
function sum_range(stream, sum_bin, range_bin, range_low, range_high)
    return stream : filter(range_filter_closure(range_bin, range_low, range_high))
                  : map(rec_to_bin_value_closure(sum_bin)) : reduce(add_values)
end
Execute SUM_RANGE
With the same range (2 <= bin1 <= 7), we expect the same results.
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "sum_range",
Value.get("bin2"), Value.get("bin1"), Value.get(2), Value.get(7));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed SUM-RANGE using the filter operator.");
while (rs.next()) {
    Object obj = rs.getObject();
    System.out.format("Returned object: %s", obj.toString());
}
rs.close();
Output:
Executed SUM-RANGE using the filter operator.
Returned object: 6027
Do Not Use Expression Filters
Note that you cannot use expression filters with queryAggregate, as they are ignored. Below, all records in the set are aggregated into the sum even though the expression filter 2 <= bin1 <= 7 is specified in the policy.
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.exp.Exp;
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
QueryPolicy policy = new QueryPolicy(client.queryPolicyDefault);
policy.filterExp = Exp.build(
    Exp.and(
        Exp.ge(Exp.intBin("bin1"), Exp.val(2)),
        Exp.le(Exp.intBin("bin1"), Exp.val(7))));
stmt.setAggregateFunction(UDFModule, "sum", Value.get("bin2"));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed SUM using expression filter 2 <= bin1 <=7");
while (rs.next()) {
    Object obj = rs.getObject();
    System.out.format("Returned object: %s", obj.toString());
}
rs.close();
Output:
Executed SUM using expression filter 2 <= bin1 <= 7
Returned object: 1500500
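Expression filters are honored by regular record queries, just not by queryAggregate. As a sanity check, the sketch below reuses the policy built above in a plain record query and counts the returned records; we would expect 6 (bin1 values 2 through 7).
import com.aerospike.client.query.RecordSet;
// the same filterExp applied to a record query is honored (sketch)
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
RecordSet records = client.query(policy, stmt);
int matches = 0;
while (records.next()) matches++;
records.close();
System.out.format("Records matching 2 <= bin1 <= 7: %d\n", matches);  // expect 6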
More Simple Aggregates: MIN and MAX
SELECT MIN(bin2) FROM test.sql-aggregate
SELECT MAX(bin2) FROM test.sql-aggregate
Examine the following Lua code that implements the aggregate functions MIN and MAX.
MIN
The pipeline consists of a simple map and reduce.
- the map function "rec_to_bin_value" is as described in earlier examples.
- the reduce function returns the minimum of the input values and handles nil values appropriately.
MAX is very similar to MIN above.
-- min reducer
local function get_min(val1, val2)
    local min = nil
    if val1 then
        if val2 then
            if val1 < val2 then min = val1 else min = val2 end
        else
            min = val1
        end
    else
        if val2 then min = val2 end
    end
    return min
end

-- min
function min(stream, bin)
    return stream : map(rec_to_bin_value_closure(bin)) : reduce(get_min)
end

-- max reducer
local function get_max(val1, val2)
    local max = nil
    if val1 then
        if val2 then
            if val1 > val2 then max = val1 else max = val2 end
        else
            max = val1
        end
    else
        if val2 then max = val2 end
    end
    return max
end

-- max
function max(stream, bin)
    return stream : map(rec_to_bin_value_closure(bin)) : reduce(get_max)
end
// MIN
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "min", Value.get("bin2"));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed MIN.");
while (rs.next()) {
    Object obj = rs.getObject();
    System.out.format("Returned object: %s\n", obj.toString());
}
rs.close();
// MAX
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "max", Value.get("bin2"));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed MAX.");
while (rs.next()) {
    Object obj = rs.getObject();
    System.out.format("Returned object: %s", obj.toString());
}
rs.close();
Output:
Executed MIN.
Returned object: 1001
Executed MAX.
Returned object: 2000
Using Aggregate Operator
SELECT agg1(bin1), agg2(bin2), ... FROM namespace.set WHERE condition
The aggregate operator is used when you need to track a more complex state during stream processing, for example, to compute multiple aggregates in one query, or to compute aggregates that need other aggregates for evaluation, such as AVERAGE (SUM/COUNT) and RANGE (MAX-MIN).
We will illustrate the aggregate operator with AVERAGE and RANGE computations over bin1 and bin2, respectively. The aggregate function will compute the SUM, COUNT, MIN, and MAX of the appropriate bins, from which AVERAGE and RANGE are computed at the end.
SELECT AVERAGE(bin1), RANGE(bin2) FROM test.sql-aggregate
We will implement a new UDF "average_range" for this. With our test data, we expect AVERAGE(bin1) = (1 + 1000)/2 = 500.5 and RANGE(bin2) = 2000 - 1001 = 999.
Note that the reducer function entails merging two partial stream aggregates into one by combining their "sum", "count", "min", and "max" values ("map merge"). The final phase of the reduce happens on the client to arrive at the final aggregate state. The final map operator is a client-only operation that takes the aggregate (map) as input and outputs the average and range values.
AVERAGE_RANGE
It takes the bins whose AVERAGE and RANGE are needed. The pipeline consists of map, aggregate, reduce, and map operators.
- the map function "rec_to_bins" returns numeric values of bin_avg and bin_range.
- the aggregate function "aggregate_stats" takes the current aggregate state and two bin values and returns the new aggregate state.
- the reduce function "merge_stats" merges two aggregate state maps by adding corresponding (same key) elements and returns a merged map.
- the last map operator "compute_final_stats" takes the final value of SUM, COUNT, MIN, and MAX stats and outputs two values: AVERAGE (SUM/COUNT) and RANGE (MAX-MIN).
-- map function to compute average and range
local function compute_final_stats(stats)
    local ret = map()
    ret['AVERAGE'] = stats["sum"] / stats["count"]
    ret['RANGE'] = stats["max"] - stats["min"]
    return ret
end

-- merge partial stream maps into one
local function merge_stats(a, b)
    local ret = map()
    ret["sum"] = add_values(a["sum"], b["sum"])
    ret["count"] = add_values(a["count"], b["count"])
    ret["min"] = get_min(a["min"], b["min"])
    ret["max"] = get_max(a["max"], b["max"])
    return ret
end
-- aggregate operator to compute stream state for average_range
local function aggregate_stats(agg, val)
    agg["count"] = (agg["count"] or 0) + ((val["bin_avg"] and 1) or 0)
    agg["sum"] = (agg["sum"] or 0) + (val["bin_avg"] or 0)
    agg["min"] = get_min(agg["min"], val["bin_range"])
    agg["max"] = get_max(agg["max"], val["bin_range"])
    return agg
end

-- average_range
function average_range(stream, bin_avg, bin_range)
    local function rec_to_bins(rec)
        -- extract the values of the two bins in ret
        local ret = map()
        ret["bin_avg"] = rec[bin_avg]
        ret["bin_range"] = rec[bin_range]
        return ret
    end
    return stream : map(rec_to_bins) : aggregate(map(), aggregate_stats)
                  : reduce(merge_stats) : map(compute_final_stats)
end
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "average_range", Value.get("bin1"), Value.get("bin2"));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed AVERAGE+RANGE.");
while (rs.next()) {
    Object obj = rs.getObject();
    System.out.format("Returned object: %s", obj.toString());
}
rs.close();
Output:
Executed AVERAGE+RANGE.
Returned object: {AVERAGE=500.5, RANGE=999}
Takeaways and Conclusion
Many developers who are familiar with SQL would like to see how SQL operations translate to Aerospike. We looked at how to implement various aggregate statements. This should be broadly useful irrespective of the reader's SQL knowledge. While the examples here use synchronous execution, many operations can also be performed asynchronously.
Clean up
Remove tutorial data and close connection.
client.dropIndex(null, Namespace, Set, IndexName);
client.truncate(null, Namespace, Set, null);
client.close();
System.out.println("Removed tutorial data and closed server connection.");
Output:
Removed tutorial data and closed server connection.
Further Exploration and Resources
Here are some links for further exploration.
Resources
- Related notebooks
- Queries
- Other notebooks in the SQL series on 1) SELECT, 2) Aggregates (Part 2), and 3) UPDATE, CREATE, and DELETE.
- Aerospike Presto Connector
- Blog post
- Github repos
- Documentation
Next steps
Visit Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open, and select Upload.