Aerospike queries allow filtering based on a predicate, and User Defined
Functions (UDFs) offer arbitrary server-side processing. This notebook
illustrates how a query and a UDF can be combined in a useful pattern.
Two examples are given: the first combines a query with a UDF aggregate
function, and the second combines a query, a predicate expression, and a
UDF update function. The code for the first example is also available in
this repo.
This notebook requires the Aerospike Database running locally, along with the Java kernel and the Aerospike Java Client. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.
Getting Started
Ensure database is running
This notebook requires that the Aerospike Database is running.
The test data consists of ten records with user keys 1 through 10 in the
namespace “test” and set “demo”. Each record has two bins (fields),
“binint” and “binstr”, initialized respectively with the user key and a
string of the form “(id). (name)”.
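// Create a numeric secondary index on binint; tolerate the index already existing.
try {
    IndexTask task = client.createIndex(null, Namespace, Set, IndexName, BinInt, IndexType.NUMERIC);
    task.waitTillComplete();
}
catch (AerospikeException ae) {
    if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {
        throw ae;
    }
}
System.out.format("Created index %s on ns=%s set=%s bin=%s.",
    IndexName, Namespace, Set, BinInt);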
Output:
Created index idx_numeric_test_demo_binint on ns=test set=demo bin=binint.
Part 1: Query with aggregate function
In the first example, we will demonstrate how an aggregate function
(sum) is calculated over a stream of records returned by a query. First
we will create the aggregate function in a UDF module. A UDF is like a
stored procedure: it is executed on all server nodes of the cluster. The
partial, node-specific results streamed back from the nodes are then
combined locally using the same UDF. For this reason, the UDF module must
be registered with the server for the first phase of parallel processing
across all nodes, and must also be available locally for the final phase
of aggregation.
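To make the two phases concrete, here is a plain-Java simulation that roughly mirrors the flow; it does not use the Aerospike client. The class name, the split of values across two hypothetical nodes, and the use of 0 as a starting value are illustrative assumptions (the Lua reduce takes no identity argument). The sketch also shows why the reducer should be associative: the client combines partial results whose grouping depends on how records happen to be spread across nodes.

```java
import java.util.List;
import java.util.function.LongBinaryOperator;

// Simulation of two-phase aggregation: each "node" reduces its own records,
// then the client reduces the per-node partials with the SAME reducer.
class TwoPhaseSum {
    // Plays the role of the Lua reducer; addition is associative, so the
    // grouping of partial results does not change the final answer.
    static final LongBinaryOperator REDUCER = (a, b) -> a + b;

    // Phase 1 (server side, per node): reduce the node's local bin values.
    static long nodePartial(List<Long> localValues) {
        long acc = 0; // starting value 0 is a simplification for this sketch
        for (long v : localValues) {
            acc = REDUCER.applyAsLong(acc, v);
        }
        return acc;
    }

    // Phase 2 (client side): combine the partial result from every node.
    static long combine(List<List<Long>> valuesPerNode) {
        long total = 0;
        for (List<Long> node : valuesPerNode) {
            total = REDUCER.applyAsLong(total, nodePartial(node));
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical bin values spread over two nodes.
        List<List<Long>> perNode = List.of(List.of(4L, 6L), List.of(5L, 7L));
        System.out.println(combine(perNode)); // prints 22
    }
}
```

Regrouping the same values, for example all on one node, yields the same total, which is the property the final client-side phase relies on.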
Create UDF module and aggregate function
Examine the following Lua code, which aggregates (reduces) a stream of
records into a sum of bin values. Create a “udf” directory and place the
file sum_example.lua with this content in it. Alternatively, execute the
following two cells to achieve the same effect.
-- sum_example.lua
local function reducer(val1, val2)
    return val1 + val2
end
function sum_single_bin(stream, name)
    local function mapper(rec)
        return rec[name]
    end
    return stream : map(mapper) : reduce(reducer)
end
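The Lua pipeline maps each record to a single bin value and then reduces the values pairwise. A rough Java analogue using java.util.stream is sketched below; the class name and the map-based record model are assumptions for illustration, and, like the Lua version, it assumes every record has the named bin. One property worth noting: Stream.reduce without an identity returns an Optional that is empty for an empty stream, which loosely parallels the case where a query returns no records and so no sum is produced.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Java analogue of sum_single_bin: map each record to one bin value, then reduce.
class SumSingleBin {
    // Records are modeled as bin-name -> value maps (a simplification of Aerospike records).
    // Assumes every record contains the bin; a missing bin would cause a NullPointerException.
    static Optional<Long> sumSingleBin(List<Map<String, Long>> records, String name) {
        return records.stream()
                .map(rec -> rec.get(name))   // mapper: extract the named bin
                .reduce((a, b) -> a + b);    // reducer: add two values
    }

    public static void main(String[] args) {
        List<Map<String, Long>> recs = List.of(
                Map.of("binint", 4L), Map.of("binint", 5L),
                Map.of("binint", 6L), Map.of("binint", 7L));
        System.out.println(sumSingleBin(recs, "binint"));      // Optional[22]
        System.out.println(sumSingleBin(List.of(), "binint")); // Optional.empty
    }
}
```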
import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.FileWriter;
// Write the given Lua source to ./udf/<name>, creating the directory if needed.
void CreateUDFModule(String name, String code) {
    try {
        if (!Files.exists(Paths.get("./udf"))) {
            Files.createDirectory(Paths.get("./udf"));
        }
        // try-with-resources closes the writer even if write() fails
        try (FileWriter fw = new FileWriter("./udf/" + name)) {
            fw.write(code);
        }
    }
    catch (Exception e) {
        System.out.format("Failed to create Lua module %s, exception: %s.",
            "udf/" + name, e);
    }
}
// Execute this cell to create UDF module "udf/sum_example.lua".
// To execute, first convert the cell type from markdown to code.
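// Iterate over the aggregation results returned to the client.
int count = 0;
try {
    while (rs.next()) {
        long sum = (Long) rs.getObject();
        if (expected == (int) sum) {
            System.out.format("Sum matched! Value=%d.", expected);
        }
        else {
            System.out.format("Sum mismatch: Expected %d. Received %d.", expected, (int) sum);
        }
        count++;
    }
    if (count == 0) {
        System.out.println("Query failed. No records returned.");
    }
}
finally {
    rs.close();
}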
Output:
Processing results:
Sum matched! Value=22.
Part 2: Query, predicate expression, and UDF update
We will illustrate an update UDF function with a query and predicate
expression.
Let’s say we want to multiply the integer bin value by 5 in all records
whose integer bin value is between 2 and 9 AND whose string bin value
contains either “Smith” or “Jones”.
Records with user-keys 3, 4, 6 and 8 meet these conditions.
This update can be achieved in different ways using a combination of
query, predicate expression, and UDF. For the purpose of this exercise,
we use a query with the “between” predicate, a predicate expression for
string comparison, and a UDF to update the integer bin.
Let’s define them one by one, starting with a new UDF.
Create UDF module with update function
Examine the code below. It simply multiplies a bin value by the input
factor and updates the record.
Create a “udf” directory (if it does not already exist) and place the file update_example.lua with this content in it. Alternatively, execute the following cell to achieve the same effect.
-- update_example.lua
function multiplyBy(rec, binName, factor)
    rec[binName] = rec[binName] * factor
    aerospike:update(rec)
end
// Execute this cell to create UDF module "udf/update_example.lua".
// To execute, first convert the cell type from markdown to code.
System.out.format("Registered the UDF module %s.", UDFFile);
Output:
Registered the UDF module update_example.lua.
Define the query statement
Specify the namespace, set, bins, and query filter.
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setBinNames(BinInt, BinStr);
int begin = 3;
int end = 9;
// Filter is evaluated using a secondary index and therefore can only reference an indexed bin.
stmt.setFilter(Filter.range(BinInt, begin, end));
System.out.format("Query on ns=%s set=%s, with bin %s >= %d <= %d",
    Namespace, Set, BinInt, begin, end);
Output:
Query on ns=test set=demo, with bin binint >= 3 <= 9
Define the predicate expression filter
In addition to the predicate in the query (which requires a secondary
index), additional filtering can be specified using a predicate
expression. A predicate expression is specified as part of the request
policy and does not require a secondary index. It is evaluated on each
record that satisfies the query predicate, and only the records for which
it evaluates to true are processed further (in this case, updated by the
UDF).
Here the predicate expression tests whether the string bin contains
either “smith” or “jones”. We use an expression with an OR clause that
combines two regular expression matches.
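The logic of this predicate can be mimicked in plain Java: each regular-expression match becomes a Predicate, and the OR clause becomes Predicate.or. In the Aerospike Java client itself, the expression would be built with the Exp API (for example, Exp.or over two Exp.regexCompare terms on Exp.stringBin) and attached to the request policy's filterExp; treat those names as pointers to verify against the client documentation rather than exact signatures. Case-insensitive matching is an assumption in this sketch, since the text refers to both “Smith” and “smith”; the class name is hypothetical.

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Mirrors the shape of the predicate expression: OR(regex1, regex2) on the string bin.
// Case-insensitive matching is an assumption made for this sketch.
class NamePredicateSketch {
    // One regular-expression match, true if the pattern occurs anywhere in the string.
    static Predicate<String> regexMatch(String regex) {
        Pattern p = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
        return s -> p.matcher(s).find();
    }

    // The OR clause combining the two matches.
    static final Predicate<String> SMITH_OR_JONES =
            regexMatch("smith").or(regexMatch("jones"));

    public static void main(String[] args) {
        System.out.println(SMITH_OR_JONES.test("4. Jane Jones")); // true
        System.out.println(SMITH_OR_JONES.test("5. Jane Doe"));   // false
    }
}
```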
// Predicate Expressions are applied on query results on server side.
System.out.println("Index dropped and server connection closed.");
Output:
Index dropped and server connection closed.
Explore other query, expression, and UDF capabilities
Feel free to check out the code example in the repo, and also explore
other examples and capabilities of queries, expressions, and UDFs.
Next steps
Visit the Aerospike notebooks repo to run additional Aerospike notebooks.
To run a different notebook, download the notebook from the repo to your
local machine, then click File->Open and select Upload.