This is Part 2 of a tutorial on how to implement SQL aggregate queries in
Aerospike.
This notebook requires the Aerospike Database running locally with Java kernel and Aerospike Java Client. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.
Introduction
In this notebook, we will see how specific aggregate statements in SQL
can be implemented in Aerospike.
SQL is a widely known data access language. The examples in this
notebook provide patterns for implementing specific SQL aggregate
queries. You should be able to understand them and find them useful even
without deep familiarity with SQL.
This notebook is the third in the SQL Operations series that consists of
the following notebooks:
Implementing SQL Operations: SELECT
Implementing SQL Operations: Aggregate functions Part 1 and 2 (this
notebook)
Implementing SQL Operations: UPDATE, CREATE, and DELETE
Part 1 of Aggregate functions describes simpler aggregate processing of
a stream of records.
The specific topics and aggregate functions we discuss in this notebook
include:
Stream Partitioning with GROUP BY
Filtering partitions: HAVING
Sorting partitions: ORDER BY
Additional aggregate functions
DISTINCT
LIMIT
TOP N
The purpose of this notebook is to illustrate Aerospike implementation
for specific SQL operations. Check out Aerospike Presto
Connector
for ad-hoc SQL access to Aerospike data.
Prerequisites
This tutorial assumes familiarity with the topics covered in the earlier notebooks of this series.
Working with UDF Module
All UDF functions for this notebook are placed in the “aggregate_fns.lua”
file under the “udf” subdirectory. If the subdirectory or file is not
there, you may download the file from here and place it there using the
notebook’s File->Open menu followed by Upload/New.
You are encouraged to experiment with the Lua code in the module. Be
sure to save your changes and then run the convenience function
“registerUDF()” in a code cell for the changes to take effect.
Initialization
Ensure database is running
This notebook requires that Aerospike Database is running.
The test data has 1000 records with user keys “id-1” through “id-1000”,
two integer bins (fields) “bin1” (1-1000) and “bin2” (1001-2000), and
one string bin “bin3” (a random value from “A” through “E”), in the
namespace “test” and set “sql-aggregate”.
Initialized the client and connected to the cluster.
Test data populated
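For reference, data of this shape could be generated along the following lines. This is a sketch rather than the exact cell used here; it assumes the connected AerospikeClient instance client from the initialization step.
import com.aerospike.client.Bin;
import com.aerospike.client.Key;

// 1000 records: bin1 = 1..1000, bin2 = 1001..2000, bin3 = a random letter from "A" to "E"
String[] letters = {"A", "B", "C", "D", "E"};
java.util.Random rand = new java.util.Random();
for (int i = 1; i <= 1000; i++) {
    Key key = new Key("test", "sql-aggregate", "id-" + i);
    client.put(null, key,
               new Bin("bin1", i),
               new Bin("bin2", 1000 + i),
               new Bin("bin3", letters[rand.nextInt(letters.length)]));
}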
Create a secondary index
To use the query API with an index-based filter, a secondary index must
exist on the filter bin. Here we create a numeric index on “bin1” in the
“sql-aggregate” set.
// index creation (assumed completion of this cell); Namespace, Set, and IndexName are defined in earlier cells
try {
    IndexTask task = client.createIndex(null, Namespace, Set, IndexName, "bin1", IndexType.NUMERIC);
    task.waitTillComplete();
}
catch (AerospikeException ae) {
    if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {
        throw ae;
    }
}
System.out.format("Created number index %s on ns=%s set=%s bin=%s.",
                  IndexName, Namespace, Set, "bin1");
Output:
Created number index test_sql_aggregate_bin1_number_idx on ns=test set=sql-aggregate bin=bin1.
Execution Model for Processing Aggregates
(This section is repeated from Part 1 for convenience. Please skip to
the next section if you are familiar with the execution model.)
Processing aggregates in Aerospike involves processing a stream of
records through a pipeline of operators on server as well as client.
Four types of operators are supported: Filter, Map, Aggregate, and
Reduce. The operators work with one of the following data types as input
and output: Record, Integer, String, Map (the data type, not to be
confused with the Map operator), and List. Only the initial filter(s)
and first non-filter operator in the pipeline can consume Record type.
Filter: Object -> Boolean; filters input objects, input and output
objects are of the same type.
Map: Object -> Object; any transformation is possible.
Aggregate: (Current State, Object) -> New State; maintains the
global “aggregate” state of the stream. While any type can be used,
an Aerospike Map type is often used.
Reduce: (Object, Object) -> Object; reduces two objects to a single
object of the same type.
The operators may appear any number of times and in any order in the
pipeline.
The operator pipeline is typically processed in two phases: first phase
on server nodes and the second phase on client.
Phase 1: Server nodes execute all operators up to and including the
first reduce operation in the pipeline.
Phase 2: The client processes results from multiple nodes through
the remaining pipeline operators starting with and including the
first reduce operation in the pipeline.
Thus, the first reduce operation, if specified in the pipeline, is
executed on all server nodes as well as on the client. If there is no reduce
operator in the pipeline, the application will receive the combined
results returned from server nodes.
Post aggregation processing involves operators after the first reduce in
the pipeline, usually for sorting, filtering, and final transformation,
and takes place on the client side.
Aggregation processing in Aerospike is defined using User Defined
Functions (UDFs). UDFs are written in Lua with arbitrary logic and are
executed on both server and client as explained above. Since aggregates
by definition involve multiple records, only stream UDFs are discussed
below (versus record UDFs whose scope of execution is a single record).
A stream UDF specifies the pipeline of operators for processing
aggregates. Different aggregates differ in their UDF functions, whereas
the Aerospike API calls that specify the aggregate processing are the same.
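That common client-side invocation pattern is sketched below: a Statement identifies the namespace, set, and an optional index filter, setAggregateFunction names the registered Lua module, the stream UDF, and its arguments, and queryAggregate returns a ResultSet with the aggregated result. The UDF name and arguments here are placeholders; the sections below substitute the real ones.
import com.aerospike.client.Value;
import com.aerospike.client.query.Filter;
import com.aerospike.client.query.ResultSet;
import com.aerospike.client.query.Statement;

Statement stmt = new Statement();
stmt.setNamespace("test");
stmt.setSetName("sql-aggregate");
// optional index-based filter corresponding to a WHERE clause
stmt.setFilter(Filter.range("bin1", 101, 200));
// module "aggregate_fns" (from aggregate_fns.lua), a stream UDF, and its arguments (placeholders)
stmt.setAggregateFunction("aggregate_fns", "some_stream_udf", Value.get("bin1"));
ResultSet rs = client.queryAggregate(null, stmt);
try {
    while (rs.next()) {
        System.out.println("Returned object: " + rs.getObject());
    }
}
finally {
    rs.close();
}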
The UDFs and logic are described in appropriate sections for each
aggregate function below. For additional context and details, please
refer to the
documentation.
Register UDF
Note: all UDF functions for this notebook are assumed to be in the
“aggregate_fns.lua” file under the “udf” directory. Please refer to the
“Working with UDF Module” section above.
Register the UDF with the server by executing the following code cell.
The registerUDF() function below can be run conveniently when the UDF is
modified (you are encouraged to experiment with the UDF code). The
function invalidates the cache, removes the currently registered module,
and registers the latest version.
void registerUDF() {
    // clear the cached lua package, remove the prior registration, and register the latest version
    // (UDFDir and UDFFile are assumed to be defined in earlier cells, e.g. "udf" and "aggregate_fns.lua")
    LuaCache.clearPackages();
    client.removeUdf(null, UDFFile);
    RegisterTask task = client.register(null, UDFDir + "/" + UDFFile, UDFFile, Language.LUA);
    task.waitTillComplete();
    System.out.format("Registered the UDF module %s.", UDFFile);
}
registerUDF();
Output:
Registered the UDF module aggregate_fns.lua.
Stream Partitioning with GROUP BY
SELECT bin1, agg(bin2) FROM namespace.set WHERE inner-condition GROUP BY bin1
GROUP BY processing partitions the record stream into multiple
partitions, one for each distinct value of the grouped-by bin. The
aggregate operator in the pipeline outputs a nested map - an outer map of
all partitions or distinct values of the grouped-by bin, and an inner
map for each partition to maintain each partition’s aggregates. The bins
aggregated for each group, such as agg(bin2) in the above SQL statement,
are stored within a group’s map.
Reduce uses map-merge to merge partial aggregates. Since map-merge
currently does not handle nested maps, merging at multiple levels has
to be explicitly specified, as shown below.
The filter “inner-condition” can be specified on any bins in the record,
and can be processed using a query predicate filter and/or a stream
filter operator. This is as described in Part 1, and so the example
below will omit the WHERE clause for simplicity.
SELECT bin3, SUM(bin2) FROM test.sql-aggregate GROUP BY bin3
We will implement a new UDF “groupby_with_sum” for this.
GROUPBY_WITH_SUM
It takes two bins: the bin to group-by and the bin to sum. The pipeline
consists of map, aggregate, and reduce operators.
the map function “rec_to_group_and_bin” adds a group tag and returns
a map containing the group and the value of the bin to sum.
the aggregate function “group_sum” takes the current aggregate state
and the “groupval” map and returns the new aggregate state. It creates
a map for each distinct group value and adds the value tagged for a
group to the group’s sum.
the reduce function “merge_group_sum” is a nested map merge that
merges maps explicitly at the two levels.
-- nested map merge for group-by sum/count; explicit map merge at each nested level
local function merge_group_sum(a, b)
    local function merge_group(x, y)
        -- inner map merge
        return map.merge(x, y, add_values)
    end
    -- outer map merge
    return map.merge(a, b, merge_group)
end

-- aggregate for group-by sum
-- creates a map for each distinct group value and adds the value tagged for a group to the group's sum
local function group_sum(agg, groupval)
    if not agg[groupval["group"]] then agg[groupval["group"]] = map() end
    agg[groupval["group"]]["sum"] = (agg[groupval["group"]]["sum"] or 0) + (groupval["value"] or 0)
    return agg
end

-- group-by with sum
function groupby_with_sum(stream, bin_grpby, bin_sum)
    local function rec_to_group_and_bin(rec)
        -- tag the group by bin_grpby value, return a map containing group and bin_sum value
        local ret = map()
        ret["group"] = rec[bin_grpby]
        local val = rec[bin_sum]
        if (not val or type(val) ~= "number") then val = 0 end
        ret["value"] = val
        return ret
    end
    -- pipeline (assumed completion): map -> aggregate -> reduce
    return stream : map(rec_to_group_and_bin) : aggregate(map(), group_sum) : reduce(merge_group_sum)
end
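A sketch of the invocation, using the pattern shown earlier, with the group-by bin and the bin to sum as arguments:
Statement stmt = new Statement();
stmt.setNamespace("test");
stmt.setSetName("sql-aggregate");
stmt.setAggregateFunction("aggregate_fns", "groupby_with_sum", Value.get("bin3"), Value.get("bin2"));
ResultSet rs = client.queryAggregate(null, stmt);
try {
    while (rs.next()) {
        System.out.println("Returned object: " + rs.getObject());
    }
}
finally {
    rs.close();
}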
Returned object: {A={sum=276830}, B={sum=296246}, C={sum=260563}, D={sum=332231}, E={sum=334630}}
Filtering Partitions: HAVING
SELECT bin1, agg(bin2) FROM namespace.set WHERE inner-condition GROUP BY bin1 HAVING outer-condition
Note the inner filter “inner-condition” can be specified using any bins
in the record, whereas the outer filter and ORDER BY must use selected
(aggregated) bins from the query. We will focus on the outer condition
in the following example, which outputs the count for each distinct bin3
value in the range “B” to “E”.
SELECT bin3, COUNT(*) FROM test.sql-aggregate GROUP BY bin3 HAVING "B" <= bin3 AND bin3 <= "E"
Processing for the HAVING clause can be done by applying a filtering step
after the reduce.
Here we implement a new UDF “groupby_with_count_having” for this.
GROUPBY_WITH_COUNT_HAVING
It takes the group-by bin and the range values for the groups. The
pipeline consists of map, aggregate, reduce, and filter operators.
the map function “rec_to_group” simply returns the group-by bin
value.
the aggregate function “group_count” takes the current aggregate
state and a record’s group and returns the new aggregate state. It
creates a map for each distinct group value and increments the input
group’s count.
the reduce function “merge_group_sum” is a nested map merge that
merges maps explicitly at the two levels.
the filter function “process_having” iterates over the nested map,
applies the filter condition, and returns a slice of the input map.
-- aggregate for group-by count
-- creates a map for each distinct group value and increments the tagged group's count
local function group_count(agg, group)
    if not agg[group] then agg[group] = map() end
    agg[group]["count"] = (agg[group]["count"] or 0) + ((group and 1) or 0)
    return agg
end

-- map function for group-by processing
local function rec_to_group_closure(bin_grpby)
    local function rec_to_group(rec)
        -- returns group-by bin value in a record
        return rec[bin_grpby]
    end
    return rec_to_group
end

-- group-by having example: count(*) for groups in the range [having_range_low, having_range_high]
function groupby_with_count_having(stream, bin_grpby, having_range_low, having_range_high)
    local function process_having(stats)
        -- retains groups whose group value falls in the having range
        local ret = map()
        for key, value in map.pairs(stats) do
            if (key >= having_range_low and key <= having_range_high) then
                ret[key] = value
            end
        end
        return ret
    end
    -- pipeline (assumed completion): map -> aggregate -> reduce -> post-reduce HAVING filtering applied as a map
    return stream : map(rec_to_group_closure(bin_grpby)) : aggregate(map(), group_count) : reduce(merge_group_sum) : map(process_having)
end
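A sketch of the invocation, passing the group-by bin and the HAVING range as UDF arguments:
Statement stmt = new Statement();
stmt.setNamespace("test");
stmt.setSetName("sql-aggregate");
// range taken from the HAVING clause above
stmt.setAggregateFunction("aggregate_fns", "groupby_with_count_having",
                          Value.get("bin3"), Value.get("B"), Value.get("E"));
ResultSet rs = client.queryAggregate(null, stmt);
try {
    while (rs.next()) {
        System.out.println("Returned object: " + rs.getObject());
    }
}
finally {
    rs.close();
}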
Returned object: {D={count=222}, B={count=196}, C={count=172}}
Sorting Partitions: ORDER BY
SELECT bin1, agg(bin2) FROM namespace.set WHERE inner-condition GROUP BY bin1 HAVING outer-condition ORDER BY bin
In the following example, the count for each distinct bin3 value is
produced in ascending order of the count.
SELECT bin3, COUNT(*) FROM test.sql-aggregate GROUP BY bin3 ORDER BY COUNT
Processing for the ORDER BY clause can be done by using a map operator at
the end that outputs an ordered list.
The UDF “groupby_with_count_orderby” is very similar to the HAVING
example.
GROUPBY_WITH_COUNT_ORDERBY
It takes the group-by bin and the aggregate field to order by (“count”
here). The pipeline consists of map, aggregate, reduce, and map operators.
the map function “rec_to_group” (see above) simply returns the
group-by bin value.
the aggregate function “group_count” (see above) takes the current
aggregate state and a record’s group and returns the new aggregate
state. It creates a map for each distinct group value and increments
the input group’s count.
the reduce function “merge_group_sum” (see above) is a nested map
merge that merges maps explicitly at the two levels.
the map function “process_orderby” uses lua table’s sort function to
sort the aggregate map into a flattened ordered list in this format
[k1, v1, k2, v2, …].
-- group-by count(*) order-by count
function groupby_with_count_orderby(stream, bin_grpby, bin_orderby)
    local function orderby(t, order)
        -- collect the keys
        local keys = {}
        for k in pairs(t) do keys[#keys+1] = k end
        -- sort the keys using the order function, which compares two keys a and b within the table
        table.sort(keys, function(a, b) return order(t, a, b) end)
        -- return the iterator function
        local i = 0
        return function()
            i = i + 1
            if keys[i] then
                return keys[i], t[keys[i]]
            end
        end
    end
    local function process_orderby(stats)
        -- uses lua table sort to sort the aggregate map into a list
        -- the list has k and v added separately for each sorted entry
        local ret = list()
        local t = {}
        for k, v in map.pairs(stats) do t[k] = v end
        for k, v in orderby(t, function(t, a, b) return t[a][bin_orderby] < t[b][bin_orderby] end) do
            list.append(ret, k)
            list.append(ret, v)
        end
        return ret
    end
    -- pipeline (assumed completion): map -> aggregate -> reduce -> post-reduce ordering applied as a map
    return stream : map(rec_to_group_closure(bin_grpby)) : aggregate(map(), group_count) : reduce(merge_group_sum) : map(process_orderby)
end
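A sketch of the invocation, passing the group-by bin and the aggregate name to order by:
Statement stmt = new Statement();
stmt.setNamespace("test");
stmt.setSetName("sql-aggregate");
stmt.setAggregateFunction("aggregate_fns", "groupby_with_count_orderby",
                          Value.get("bin3"), Value.get("count"));
ResultSet rs = client.queryAggregate(null, stmt);
try {
    while (rs.next()) {
        System.out.println("Returned object: " + rs.getObject());
    }
}
finally {
    rs.close();
}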
Returned object: [C, {count=172}, A, {count=187}, B, {count=196}, D, {count=222}, E, {count=223}]
More Aggregates: DISTINCT, LIMIT, and TOP N
Let us see how DISTINCT, LIMIT, and TOP N can be processed. Only the
first two appear in SQL syntax, and the third is a special case of a
LIMIT query.
DISTINCT
SELECT DISTINCT(bin) FROM namespace.set WHERE condition
DISTINCT can be processed by storing all values in a map (in the
aggregate state) that is keyed on the value(s) of the bin(s) so only
unique values are retained.
In the following example, distinct bin3 values are produced for records
whose bin1 is in the range [101,200].
SELECT DISTINCT bin3 FROM test.sql-aggregate WHERE bin1 >= 101 AND bin1 <= 200
The UDF “distinct” implements DISTINCT on a single bin.
DISTINCT
It takes the bin and returns its distinct values. The pipeline consists
of map, aggregate, reduce, and map operators.
the map function “rec_to_bin_value” simply returns the bin value.
the aggregate function “distinct_values” takes the current aggregate
state and a value, and returns the new aggregate state. Only unique
values are retained in a map as keys.
the reduce function “merge_values” is a map merge that merges two
maps into one that has the union of their keys.
the map function “map_to_list” returns a list of map keys.
-- return map keys in a list
local function map_to_list(values)
    local ret = list()
    for k in map.keys(values) do list.append(ret, k) end
    return ret
end

-- merge partial aggregate maps
local function merge_values(a, b)
    return map.merge(a, b, function(v1, v2) return ((v1 or v2) and 1) or nil end)
end
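A sketch of the invocation; the WHERE clause maps to an index filter on bin1 (using the secondary index created earlier), and the bin whose distinct values are wanted is passed as the UDF argument:
Statement stmt = new Statement();
stmt.setNamespace("test");
stmt.setSetName("sql-aggregate");
stmt.setFilter(Filter.range("bin1", 101, 200));
stmt.setAggregateFunction("aggregate_fns", "distinct", Value.get("bin3"));
ResultSet rs = client.queryAggregate(null, stmt);
try {
    while (rs.next()) {
        System.out.println("Returned object: " + rs.getObject());
    }
}
finally {
    rs.close();
}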
LIMIT
SELECT bin FROM namespace.set WHERE condition LIMIT N
In the following example, up to 10 values in bin2 are produced for
records whose bin1 is in the range [101,200].
SELECT bin2 FROM test.sql-aggregate WHERE bin1 >= 101 AND bin1 <= 200 LIMIT 10
The UDF “limit” returns values of a single bin, with an upper limit on
the number of results returned.
LIMIT
It takes the bin and the max limit, and returns up to max bin values.
The pipeline consists of aggregate and reduce operators.
the aggregate function “list_limit” takes the current aggregate
state and a record, and returns the new aggregate state by adding
the record’s bin value to a list only if the list size is below the
limit.
the reduce function “list_merge_limit” merges two lists to retain
only max number of values.
-- limit: return at most max bin values
function limit(stream, bin, max)
    local function list_limit(agg, rec)
        -- add to the list only if the list size is below the limit
        if list.size(agg) < max then
            local ret = map()
            ret[bin] = rec[bin]
            list.append(agg, ret)
        end
        return agg
    end
    local function list_merge_limit(a, b)
        -- merge two lists, retaining at most max values
        local ret = list()
        list.concat(ret, list.take(a, max))
        list.concat(ret, list.take(b, (max > list.size(ret) and max - list.size(ret)) or 0))
        return ret
    end
    -- pipeline (assumed completion): aggregate -> reduce
    return stream : aggregate(list(), list_limit) : reduce(list_merge_limit)
end
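A sketch of the invocation, combining the index filter for the WHERE clause with the bin and the limit as UDF arguments:
Statement stmt = new Statement();
stmt.setNamespace("test");
stmt.setSetName("sql-aggregate");
stmt.setFilter(Filter.range("bin1", 101, 200));
stmt.setAggregateFunction("aggregate_fns", "limit", Value.get("bin2"), Value.get(10));
ResultSet rs = client.queryAggregate(null, stmt);
try {
    while (rs.next()) {
        System.out.println("Returned object: " + rs.getObject());
    }
}
finally {
    rs.close();
}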
TOP N
SELECT bin FROM namespace.set WHERE condition ORDER BY bin DESC LIMIT N
TOP N can be processed by retaining top N values in a list in aggregate
as well as reduce operators.
In the following example, the top 10 values in bin2 are produced for
records whose bin1 is in the range [101,200].
SELECT bin2 FROM test.sql-aggregate WHERE bin1 >= 101 AND bin1 <= 200 ORDER BY bin2 DESC LIMIT 10
The UDF “top_n” returns the top N values from a bin.
TOP_N
It takes the bin and N, and returns the top N bin values. The pipeline
consists of map, aggregate, reduce, and map operators.
the map function “rec_to_bin_value” simply returns the bin value.
the aggregate function “top_n_values” takes the current aggregate
state and a record, and returns the new aggregate state by retaining
distinct bin values in a map. It trims the retained values back to the
top N whenever they exceed a maximum limit (10*N in this code).
the reduce function “merge_values” (see above) merges two maps that
represent top n values in two partial streams.
the map function “get_top_n” returns the top N values in the map as an
ordered list. It leverages the Lua table sort function for sorting.
-- top n
function top_n(stream, bin, n)
    local function get_top_n(values)
        -- return top n values in a map as an ordered list
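A sketch of the invocation, passing the bin and N as UDF arguments along with the index filter for the WHERE clause:
Statement stmt = new Statement();
stmt.setNamespace("test");
stmt.setSetName("sql-aggregate");
stmt.setFilter(Filter.range("bin1", 101, 200));
stmt.setAggregateFunction("aggregate_fns", "top_n", Value.get("bin2"), Value.get(10));
ResultSet rs = client.queryAggregate(null, stmt);
try {
    while (rs.next()) {
        System.out.println("Returned object: " + rs.getObject());
    }
}
finally {
    rs.close();
}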
Many developers who are familiar with SQL would like to see how SQL
operations translate to Aerospike. We looked at how to implement various
aggregate statements. These patterns should be generally useful irrespective of
the reader’s SQL knowledge. While the examples here use synchronous
execution, many operations can also be performed asynchronously.
Visit Aerospike notebooks
repo to
run additional Aerospike notebooks. To run a different notebook,
download the notebook from the repo to your local machine, and then
click on File->Open, and select Upload.