Processing Query Results as a Stream of Records
This tutorial shows how to process query results as a stream of records, along with related capabilities.
This notebook requires an Aerospike database running locally, a Java kernel, and the Aerospike Java client. To create a Docker container that satisfies these requirements and holds a copy of the Aerospike notebooks, visit the Aerospike Notebooks Repo.
Introduction
The notebook shows how to:
- process query results as a stream of records,
- paginate over results,
- partition a query for parallelism, and
- resume query execution at a later time.
Refer to the companion blog post Working with Query Result Streams for more information.
Prerequisites
This tutorial assumes familiarity with the following topics:
Setup
Ensure database is running
This notebook requires that the Aerospike database is running.
import io.github.spencerpark.ijava.IJava;
import io.github.spencerpark.jupyter.kernel.magic.common.Shell;
IJava.getKernelInstance().getMagics().registerMagics(Shell.class);
%sh asd
Download and install additional components.
Install the Java client.
%%loadFromPOM
<dependencies>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<version>6.1.0</version>
</dependency>
</dependencies>
Initialize Client
Initialize the client that can be used for both sync and async processing modes.
Initialize event loops for async processing mode
We use NIO event loops for async processing, but other event loop types may also be used. Event loop initialization is needed only if asynchronous API calls are used.
import java.util.concurrent.atomic.AtomicInteger;
import com.aerospike.client.async.EventPolicy;
import com.aerospike.client.async.EventLoops;
import com.aerospike.client.async.EventLoop;
import com.aerospike.client.async.Throttles;
import com.aerospike.client.async.Monitor;
import com.aerospike.client.async.NioEventLoops;
import com.aerospike.client.listener.RecordSequenceListener;
// initialize event loops
final int NumLoops = 2;
final int CommandsPerEventLoop = 50;
final int DelayQueueSize = 50;
EventPolicy eventPolicy = new EventPolicy();
eventPolicy.maxCommandsInProcess = CommandsPerEventLoop;
eventPolicy.maxCommandsInQueue = DelayQueueSize;
EventLoops eventLoops = new NioEventLoops(eventPolicy, NumLoops);
// initialize event loop throttles
Throttles throttles = new Throttles(NumLoops, CommandsPerEventLoop);
System.out.format("Throttles initialized for %s loops with %s concurrent operations per loop.\n",
NumLoops, CommandsPerEventLoop);;
Output:
Throttles initialized for 2 loops with 50 concurrent operations per loop.
Initialize client with event loops
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Host;
import com.aerospike.client.policy.ClientPolicy;
ClientPolicy clientPolicy = new ClientPolicy();
// needed only if async apis are used
clientPolicy.eventLoops = eventLoops;
int concurrentMax = CommandsPerEventLoop * NumLoops;
if (clientPolicy.maxConnsPerNode < concurrentMax) {
clientPolicy.maxConnsPerNode = concurrentMax;
}
// initialize the client
Host[] hosts = Host.parseHosts("localhost", 3000);
AerospikeClient client = new AerospikeClient(clientPolicy, hosts);
System.out.println("Initialized the client and connected to the cluster.");;
Output:
Initialized the client and connected to the cluster.
Includes and Constants
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.Filter;
import com.aerospike.client.query.PartitionFilter;
import com.aerospike.client.query.PartitionStatus;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.Statement;
import com.aerospike.client.Record;
import com.aerospike.client.exp.Exp;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.task.IndexTask;
import com.aerospike.client.ResultCode;
import com.aerospike.client.Operation;
import com.aerospike.client.Value;
final String Namespace = "test";
final String SetIndexed = "indexed";
final String SetUnindexed = "unindexed";
final String KeyPrefix = "id-";
final Integer NumRecords = 10000;
Populate Test Data
The test data consists of NumRecords records in each set. Record i, where 1 <= i <= NumRecords, has the user key "id-<i>", an integer bin "bin1" with value i, and an integer bin "bin2" with value 10*i.
The set SetIndexed has a set index and an integer secondary index on "bin1". The set SetUnindexed has no set or secondary index, and is used to illustrate primary index query functionality.
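The test data can be modeled in a few lines of plain Java to see which records the range examples in this notebook should return. This is a sketch, not Aerospike code: the TestDataModel class and its methods are hypothetical and only reproduce the data layout described above. For a bin1 range of 7 to 13 it selects the seven records id-7 through id-13, which is what each query below returns, although the queries return them in no particular order.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the notebook's test data (not Aerospike code):
// record i has user key "id-<i>", bin1 = i and bin2 = 10 * i, for 1 <= i <= numRecords.
final class TestDataModel {
    // hypothetical in-memory stand-in for one stored record
    static final class Rec {
        final String userKey;
        final int bin1;
        final int bin2;

        Rec(String userKey, int bin1, int bin2) {
            this.userKey = userKey;
            this.bin1 = bin1;
            this.bin2 = bin2;
        }

        @Override
        public String toString() {
            return "Record key: " + userKey + ", bins: {bin1=" + bin1 + ", bin2=" + bin2 + "}";
        }
    }

    // builds the same logical records that initializeTestData() writes to each set
    static List<Rec> generate(int numRecords) {
        List<Rec> recs = new ArrayList<>();
        for (int i = 1; i <= numRecords; i++) {
            recs.add(new Rec("id-" + i, i, 10 * i));
        }
        return recs;
    }

    // records whose bin1 lies in the inclusive range [lo, hi]
    static List<Rec> inRange(List<Rec> recs, int lo, int hi) {
        List<Rec> out = new ArrayList<>();
        for (Rec r : recs) {
            if (r.bin1 >= lo && r.bin1 <= hi) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // prints id-7 through id-13 in key order
        for (Rec r : inRange(generate(10000), 7, 13)) {
            System.out.println(r);
        }
    }
}
```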
// convenience function to truncate test data
void truncateTestData() {
try {
client.truncate(null, Namespace, null, null);
}
catch (AerospikeException e) {
// ignore
}
}
// convenience function to initialize test data
void initializeTestData() {
truncateTestData();
WritePolicy wPolicy = new WritePolicy(client.writePolicyDefault);
wPolicy.sendKey = true;
for (int i=0; i < NumRecords; i++) {
Bin bin1 = new Bin("bin1", i+1);
Bin bin2 = new Bin("bin2", 10*(i+1));
Key key1 = new Key(Namespace, SetIndexed, KeyPrefix+(i+1));
Key key2 = new Key(Namespace, SetUnindexed, KeyPrefix+(i+1));
try {
client.put(wPolicy, key1, bin1, bin2);
client.put(wPolicy, key2, bin1, bin2);
}
catch (AerospikeException e) {
System.out.format("%s", e);
}
}
}
initializeTestData();
System.out.println("Test data populated.");;
Output:
Test data populated.
Create Indexes
The system-defined primary index already exists for the namespace. We create a secondary index and a set index on the set SetIndexed to demonstrate secondary index and set index query (scan) capabilities with this set.
The set SetUnindexed does not have a secondary or set index, which means a query (scan) of this set must use the primary index. We will use this set to show the primary index query (scan) capabilities.
Create Secondary Index
final String IndexName = "idx_indexed_bin1_number";
try {
IndexTask task = client.createIndex(null, Namespace, SetIndexed, IndexName,
"bin1", IndexType.NUMERIC);
task.waitTillComplete();
}
catch (AerospikeException ae) {
if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {
throw ae;
}
}
System.out.format("Created index %s on ns=%s set=%s bin=%s.",
IndexName, Namespace, SetIndexed, "bin1");;
Output:
Created index idx_indexed_bin1_number on ns=test set=indexed bin=bin1.
Create Set Index
// Enable set index on the set 'indexed'.
%sh asinfo -v "set-config:context=namespace;id=test;set=indexed;enable-index=true"
System.out.println("Set index created on set 'indexed'.");;
Output:
Set index created on set 'indexed'.
Define Convenience Functions
Define convenience functions to process results; here, processing a record simply means printing it.
// a convenience function to process a record which simply prints its user key and bins
void processRecord(Key key, Record rec) {
System.out.format("Record key: %s, bins: %s\n", key.userKey, rec.bins);
}
// a convenience function to process results
void processResults(RecordSet rs) {
int recs = 0;
try {
while (rs.next()) {
recs++;
Key key = rs.getKey();
Record rec = rs.getRecord();
processRecord(key, rec);
}
}
finally {
rs.close();
}
}
Overview
The main sections in the notebook are:
- Query results as a record stream
- Pagination
- Parallelism with query partitions
- Resuming with partition cursors
Query Results as a Stream of Records
The following examples show how all results are retrieved with one request and processed as a record stream:
- secondary index query, with a sync and an async request
- set index query (scan) with an expression filter, with a sync request
- primary index query (scan) with an expression filter, with a sync request
Note that an expression filter is different from the query filter. The former can be used with any type of query and is specified in the query policy, whereas the latter can only be used with a secondary index query and is specified in the query statement.
When the query filter is null or unspecified, the query is executed as a set scan, using the set index if one exists, or otherwise the primary index.
In the examples below, we use the expression filter only with the set and primary index queries (scans) to make the returned results equivalent to those from the secondary index query.
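The index-selection rule just described can be summarized as a small decision function. This is a hypothetical model for illustration only (IndexChoice and IndexUsed are not part of the Aerospike client): the server, not the application, makes this choice, and the expression filter is applied in addition to whichever index is used.

```java
// Hypothetical model of the index-selection rule described above (not an Aerospike API).
final class IndexChoice {
    enum IndexUsed { SECONDARY, SET, PRIMARY }

    static IndexUsed choose(boolean hasQueryFilter, boolean setHasSetIndex) {
        if (hasQueryFilter) {
            // a query filter requires the matching secondary index; without it the query fails
            return IndexUsed.SECONDARY;
        }
        // no query filter: scan via the set index if one exists, else via the primary index
        return setHasSetIndex ? IndexUsed.SET : IndexUsed.PRIMARY;
    }

    public static void main(String[] args) {
        System.out.println(choose(true, true));   // SetIndexed with Filter.range: SECONDARY
        System.out.println(choose(false, true));  // SetIndexed, expression filter only: SET
        System.out.println(choose(false, false)); // SetUnindexed, expression filter only: PRIMARY
    }
}
```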
Secondary Index Query
The secondary index filter is specified in the query statement. When a query filter is specified, the corresponding secondary index must exist, otherwise the query returns an error.
Sync Processing
QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetIndexed);
stmt.setFilter(Filter.range("bin1", 7, 13)); // range filter uses the secondary index on bin1
// sync query request returns a record stream
RecordSet rs = client.query(qPolicy, stmt);
// process record stream
processResults(rs);
Output:
Record key: id-11, bins: {bin1=11, bin2=110}
Record key: id-13, bins: {bin1=13, bin2=130}
Record key: id-8, bins: {bin1=8, bin2=80}
Record key: id-10, bins: {bin1=10, bin2=100}
Record key: id-7, bins: {bin1=7, bin2=70}
Record key: id-9, bins: {bin1=9, bin2=90}
Record key: id-12, bins: {bin1=12, bin2=120}
Async Processing
The query statement is the same, but the setup of the async request is more involved. Please see the tutorial Understanding Asynchronous Operations for details.
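The async flow combines three pieces: a throttle slot acquired before submitting, a listener whose callbacks release the slot, and a monitor that blocks the main thread until completion. As a rough analogy only, the same shape can be sketched with JDK concurrency primitives: a Semaphore stands in for Throttles and a CountDownLatch for Monitor. The Listener interface and the queryAsync method are hypothetical and do not talk to a database.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK analogy of the async pattern (not Aerospike code):
// Semaphore ~ Throttles, CountDownLatch ~ Monitor,
// Listener ~ the onRecord/onSuccess/onFailure shape of RecordSequenceListener.
final class AsyncPatternSketch {
    interface Listener {
        void onRecord(String key);
        void onSuccess();
        void onFailure(Exception e);
    }

    // hypothetical async "query": delivers records on another thread, then completes
    static void queryAsync(ExecutorService loop, List<String> keys, Listener listener) {
        loop.submit(() -> {
            try {
                for (String k : keys) {
                    listener.onRecord(k);
                }
                listener.onSuccess();
            } catch (Exception e) {
                listener.onFailure(e);
            }
        });
    }

    static int run(List<String> keys) throws InterruptedException {
        ExecutorService loop = Executors.newSingleThreadExecutor(); // like one event loop
        Semaphore throttle = new Semaphore(50);                     // like 50 slots per loop
        CountDownLatch done = new CountDownLatch(1);                // like queryMonitor
        AtomicInteger seen = new AtomicInteger();
        throttle.acquire();                                         // like throttles.waitForSlot(...)
        queryAsync(loop, keys, new Listener() {
            public void onRecord(String key) { seen.incrementAndGet(); }
            public void onSuccess() { throttle.release(); done.countDown(); }
            public void onFailure(Exception e) { throttle.release(); done.countDown(); }
        });
        done.await();                                               // like queryMonitor.waitTillComplete()
        loop.shutdown();
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(List.of("id-7", "id-8", "id-9")) + " records received");
    }
}
```

The latch gives the main thread a happens-before edge on everything the callbacks did, which is the role the Monitor plays in the notebook code.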
QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetIndexed);
stmt.setFilter(Filter.range("bin1", 7, 13)); // range filter uses the secondary index on bin1
// async request framework
// query monitor synchronizes the main thread with completion of the query
Monitor queryMonitor = new Monitor();
// submit async operation with throttle by waiting for an available slot
EventLoop eventLoop = eventLoops.next();
int eventLoopIndex = eventLoop.getIndex();
if (throttles.waitForSlot(eventLoopIndex, 1)) {
try {
// the async callback object has three methods: onRecord, onSuccess, onFailure
client.query(eventLoop, new RecordSequenceListener() {
// called for each record
public void onRecord(Key key, Record rec) throws AerospikeException {
processRecord(key, rec);
}
// called on successful completion
public void onSuccess() {
throttles.addSlot(eventLoopIndex, 1);
queryMonitor.notifyComplete(); // unblock the main thread
}
// called in case of a failure
public void onFailure(AerospikeException e) {
throttles.addSlot(eventLoopIndex, 1);
System.out.format("Error: query failed with exception - %s\n", e);
queryMonitor.notifyComplete();
}
},
qPolicy, stmt);
}
catch (Exception e) {
System.out.format("Error: exception while submitting the query - %s\n", e.getMessage());
queryMonitor.notifyComplete(); // unblock the main thread so it does not wait forever
}
}
// the main thread waits for the query to complete
queryMonitor.waitTillComplete();
Output:
Record key: id-13, bins: {bin1=13, bin2=130}
Record key: id-12, bins: {bin1=12, bin2=120}
Record key: id-9, bins: {bin1=9, bin2=90}
Record key: id-11, bins: {bin1=11, bin2=110}
Record key: id-10, bins: {bin1=10, bin2=100}
Record key: id-8, bins: {bin1=8, bin2=80}
Record key: id-7, bins: {bin1=7, bin2=70}
Set Index Query (Scan)
When no query filter is specified, the set is scanned using its set index, if one is available.
We use an equivalent expression filter so that the results are the same as the secondary index query results. The expression filter is specified in the query policy. See the tutorial Understanding Expressions for details on expressions.
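The expression filter is equivalent to the query filter because Filter.range treats both bounds as inclusive, matching the ge/le pair. The plain-Java sketch below models both as IntPredicates (an assumption for illustration, not how the server evaluates them) and counts values in 1..10000 on which they disagree; it also shows that a strict lower bound would silently drop the boundary record.

```java
import java.util.function.IntPredicate;
import java.util.stream.IntStream;

// Plain-Java model (not Aerospike code) of why
//   Exp.and(Exp.ge(Exp.intBin("bin1"), Exp.val(7)), Exp.le(Exp.intBin("bin1"), Exp.val(13)))
// selects the same records as Filter.range("bin1", 7, 13).
final class FilterEquivalence {
    // model of the range filter: begin <= v <= end
    static IntPredicate rangeFilter(int begin, int end) {
        return v -> v >= begin && v <= end;
    }

    // model of Exp.and(Exp.ge(...), Exp.le(...)) composed from two comparisons
    static IntPredicate expAndGeLe(int lo, int hi) {
        IntPredicate ge = v -> v >= lo;
        IntPredicate le = v -> v <= hi;
        return ge.and(le);
    }

    // counts values in 1..n on which the two predicates disagree
    static long disagreements(IntPredicate a, IntPredicate b, int n) {
        return IntStream.rangeClosed(1, n).filter(v -> a.test(v) != b.test(v)).count();
    }

    public static void main(String[] args) {
        IntPredicate range = rangeFilter(7, 13);
        System.out.println(disagreements(range, expAndGeLe(7, 13), 10000)); // 0
        // a strict lower bound (like using Exp.gt instead of Exp.ge) drops the value 7
        IntPredicate strict = ((IntPredicate) v -> v > 7).and(v -> v <= 13);
        System.out.println(disagreements(range, strict, 10000));            // 1
    }
}
```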
// using the set index
QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetIndexed); // a set index is used when it is available
// no query filter means a scan using a set or primary index
// use expression filter equivalent to query filter
Exp rangeFilter = Exp.and(Exp.ge(Exp.intBin("bin1"), Exp.val(7)), Exp.le(Exp.intBin("bin1"), Exp.val(13)));
qPolicy.filterExp = Exp.build(rangeFilter);
// sync query request returns a record stream
RecordSet rs = client.query(qPolicy, stmt);
// process record stream
processResults(rs);
Output:
Record key: id-12, bins: {bin1=12, bin2=120}
Record key: id-11, bins: {bin1=11, bin2=110}
Record key: id-7, bins: {bin1=7, bin2=70}
Record key: id-10, bins: {bin1=10, bin2=100}
Record key: id-8, bins: {bin1=8, bin2=80}
Record key: id-9, bins: {bin1=9, bin2=90}
Record key: id-13, bins: {bin1=13, bin2=130}
Primary Index Query (Scan)
When no query filter is specified and no set index is available, the set is scanned using the primary index.
As in the previous example, an equivalent expression filter, specified in the query policy, makes the results the same as the secondary index query results.
// using the primary index
QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetUnindexed); // the primary index is used when a set index is absent
// no query filter means a scan using a set or primary index
// use expression filter equivalent to query filter
Exp rangeFilter = Exp.and(Exp.ge(Exp.intBin("bin1"), Exp.val(7)), Exp.le(Exp.intBin("bin1"), Exp.val(13)));
qPolicy.filterExp = Exp.build(rangeFilter);
// sync query request returns a record stream
RecordSet rs = client.query(qPolicy, stmt);
// process record stream
processResults(rs);
Output:
Record key: id-12, bins: {bin1=12, bin2=120}
Record key: id-13, bins: {bin1=13, bin2=130}
Record key: id-11, bins: {bin1=11, bin2=110}
Record key: id-7, bins: {bin1=7, bin2=70}
Record key: id-8, bins: {bin1=8, bin2=80}
Record key: id-9, bins: {bin1=9, bin2=90}
Record key: id-10, bins: {bin1=10, bin2=100}
Pagination
An application can retrieve query results in chunks (pages) by specifying the maximum number of records returned in a single response and calling the queryPartitions API repeatedly until all results are retrieved. The PartitionFilter associated with the query supports the isDone test to check whether there are more records to process in the stream.
Pagination for queries on all index types is shown below.
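The pagination loop can be modeled without a database to make the isDone contract concrete. In this hypothetical PagerSketch (not the Aerospike API), each call returns at most maxRecords results and advances a cursor, and isDone becomes true once everything has been returned. With the seven matching records and a page size of 3, the loop yields pages of 3, 3 and 1 records, the same shape as the outputs that follow. A limit of 0 is treated as no limit, mirroring the notebook's later use of setMaxRecords(0).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory pager (not the Aerospike API) modeling a while (!isDone()) loop.
final class PagerSketch {
    private final List<String> results;
    private final int maxRecords; // 0 means no limit
    private int cursor = 0;       // index of the next result to return

    PagerSketch(List<String> results, int maxRecords) {
        if (maxRecords < 0) {
            throw new IllegalArgumentException("maxRecords must be >= 0");
        }
        this.results = results;
        this.maxRecords = maxRecords;
    }

    boolean isDone() {
        return cursor >= results.size();
    }

    List<String> nextPage() {
        int remaining = results.size() - cursor;
        int n = (maxRecords == 0) ? remaining : Math.min(maxRecords, remaining);
        List<String> page = new ArrayList<>(results.subList(cursor, cursor + n));
        cursor += n; // the next page starts where this one ended
        return page;
    }

    // page sizes produced by paging until isDone() is true
    static List<Integer> pageSizes(List<String> results, int maxRecords) {
        PagerSketch pager = new PagerSketch(results, maxRecords);
        List<Integer> sizes = new ArrayList<>();
        while (!pager.isDone()) {
            sizes.add(pager.nextPage().size());
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("id-7", "id-8", "id-9", "id-10", "id-11", "id-12", "id-13");
        System.out.println(pageSizes(keys, 3)); // [3, 3, 1]
    }
}
```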
Paginating Secondary Index Query Results
QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetIndexed);
stmt.setFilter(Filter.range("bin1", 7, 13)); // range filter uses the secondary index on bin1
// set the max number of records to be retrieved per page
stmt.setMaxRecords(3);
PartitionFilter pFilter = PartitionFilter.all(); // include all data partitions
int pagenum = 0;
while (!pFilter.isDone()) { // until no more results to process
pagenum++;
System.out.format("Page %d: \n", pagenum);
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);
processResults(rs);
}
Output:
Page 1:
Record key: id-10, bins: {bin1=10, bin2=100}
Record key: id-8, bins: {bin1=8, bin2=80}
Record key: id-7, bins: {bin1=7, bin2=70}
Page 2:
Record key: id-12, bins: {bin1=12, bin2=120}
Record key: id-9, bins: {bin1=9, bin2=90}
Record key: id-13, bins: {bin1=13, bin2=130}
Page 3:
Record key: id-11, bins: {bin1=11, bin2=110}
Paginating Set Index Query Results
The set index is used for SetIndexed.
// using the set index
QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetIndexed); // a set index is used when it is available
// no query filter means a scan using a set or primary index
// use expression filter equivalent to query filter
Exp rangeFilter = Exp.and(Exp.ge(Exp.intBin("bin1"), Exp.val(7)), Exp.le(Exp.intBin("bin1"), Exp.val(13)));
qPolicy.filterExp = Exp.build(rangeFilter);
// set the max number of records to be retrieved per page
stmt.setMaxRecords(3);
PartitionFilter pFilter = PartitionFilter.all(); // include all data partitions
int pagenum = 0;
while (!pFilter.isDone()) { // until no more results to process
pagenum++;
System.out.format("Page %d: \n", pagenum);
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);
processResults(rs);
}
Output:
Page 1:
Record key: id-10, bins: {bin1=10, bin2=100}
Record key: id-8, bins: {bin1=8, bin2=80}
Record key: id-7, bins: {bin1=7, bin2=70}
Page 2:
Record key: id-13, bins: {bin1=13, bin2=130}
Record key: id-12, bins: {bin1=12, bin2=120}
Record key: id-9, bins: {bin1=9, bin2=90}
Page 3:
Record key: id-11, bins: {bin1=11, bin2=110}
Paginating Primary Index Query Results
The primary index is used as there is no set index defined on SetUnindexed.
QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetUnindexed); // the primary index is used when a set index is absent
// no query filter means a scan using a set or primary index
// use expression filter equivalent to query filter
Exp rangeFilter = Exp.and(Exp.ge(Exp.intBin("bin1"), Exp.val(7)), Exp.le(Exp.intBin("bin1"), Exp.val(13)));
qPolicy.filterExp = Exp.build(rangeFilter);
// set the max number of records to be retrieved per page
stmt.setMaxRecords(3);
PartitionFilter pFilter = PartitionFilter.all(); // include all data partitions
int pagenum = 0;
while (!pFilter.isDone()) { // until no more results to process
pagenum++;
System.out.format("Page %d: \n", pagenum);
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);
processResults(rs);
}
Output:
Page 1:
Record key: id-8, bins: {bin1=8, bin2=80}
Record key: id-13, bins: {bin1=13, bin2=130}
Record key: id-7, bins: {bin1=7, bin2=70}
Page 2:
Record key: id-11, bins: {bin1=11, bin2=110}
Record key: id-9, bins: {bin1=9, bin2=90}
Record key: id-10, bins: {bin1=10, bin2=100}
Page 3:
Record key: id-12, bins: {bin1=12, bin2=120}
Parallelism with Query Partitions
The queryPartitions API lets the application select specific partitions, so it can control how work is distributed over multiple workers for the desired level of parallelism, with each worker processing the query over its assigned partitions.
Below, the 4096 partitions are split across three sub-queries. The sub-queries are executed sequentially here, but each could just as well be assigned to a separate worker or thread and processed in parallel.
A secondary index query is shown below, but partitioning works the same way with set and primary index queries, using the statement and policy shown in the earlier examples.
You can also define queries at a granularity finer than a partition. For a detailed discussion of parallelism, refer to the blog post Processing Large Data Sets in Fine-Grained Parallel Streams.
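Rather than hard-coding partition ranges, the split can be computed for any number of workers. The helper below is a sketch (PartitionSplit is hypothetical); it produces (begin, count) pairs in the shape that PartitionFilter.range(begin, count) takes. Its balanced split gives 1366, 1365 and 1365 partitions for three workers, slightly different from the hand-picked 1366/1366/1364 split in the code that follows; any contiguous, non-overlapping split that covers partitions 0-4095 works.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: divide the 4096 partitions into n contiguous {begin, count} ranges,
// the argument shape of PartitionFilter.range(begin, count).
final class PartitionSplit {
    static final int PARTITIONS = 4096;

    // returns n pairs {begin, count}; range sizes differ by at most one
    static List<int[]> split(int n) {
        if (n < 1 || n > PARTITIONS) {
            throw new IllegalArgumentException("n must be in 1.." + PARTITIONS);
        }
        List<int[]> ranges = new ArrayList<>();
        int base = PARTITIONS / n;
        int extra = PARTITIONS % n; // the first 'extra' ranges get one more partition
        int begin = 0;
        for (int i = 0; i < n; i++) {
            int count = base + (i < extra ? 1 : 0);
            ranges.add(new int[] {begin, count});
            begin += count;
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (int[] r : split(3)) {
            System.out.format("PartitionFilter.range(%d, %d) // partitions %d-%d%n",
                r[0], r[1], r[0], r[0] + r[1] - 1);
        }
    }
}
```

For n = 3 this yields ranges starting at partitions 0, 1366 and 2731; each pair could be handed to a separate worker that calls queryPartitions with its own PartitionFilter.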
QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetIndexed);
stmt.setFilter(Filter.range("bin1", 7, 13)); // range filter uses the secondary index on bin1
// create multiple sub-queries that divide and cover 0-4095 partitions
PartitionFilter pFilter1, pFilter2, pFilter3;
pFilter1 = PartitionFilter.range(0, 1366); // 0-1365 partitions
pFilter2 = PartitionFilter.range(1366, 1366); // 1366-2731 partitions
pFilter3 = PartitionFilter.range(2732, 1364); // 2732-4095 partitions
// run the sub-queries
System.out.format("Subquery 1: \n");
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter1);
processResults(rs);
System.out.format("Subquery 2: \n");
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter2);
processResults(rs);
System.out.format("Subquery 3: \n");
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter3);
processResults(rs);
Output:
Subquery 1:
Record key: id-7, bins: {bin1=7, bin2=70}
Record key: id-10, bins: {bin1=10, bin2=100}
Record key: id-8, bins: {bin1=8, bin2=80}
Subquery 2:
Record key: id-9, bins: {bin1=9, bin2=90}
Subquery 3:
Record key: id-12, bins: {bin1=12, bin2=120}
Record key: id-13, bins: {bin1=13, bin2=130}
Record key: id-11, bins: {bin1=11, bin2=110}
Resuming a Query
A query can be resumed from the point where the processing of its result stream left off, using the queryPartitions API.
After a queryPartitions request, the application can obtain the partition cursors by calling getPartitions on the PartitionFilter used in the request. The partition cursors mark the points in the corresponding partitions from which the query can resume. The cursor state can be set in another query request, with setPartitions on a new PartitionFilter, to resume processing. Note that a stream returned from a sync request must be read completely in order to resume the query correctly.
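To make the cursor idea concrete, the sketch below models it in plain Java. CursorSketch is hypothetical and greatly simplified: a real partition cursor records a position within a partition's data, whereas here it is just an index into an in-memory list per partition. A limited read advances the per-partition positions; a new reader seeded with a copy of those positions (playing the role of getPartitions and setPartitions) resumes exactly where the first one stopped, so the two reads together return every record once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of partition cursors (not the Aerospike API).
final class CursorSketch {
    private final List<List<String>> partitions; // records per partition, in scan order
    private final int[] cursors;                  // next index to read in each partition

    CursorSketch(List<List<String>> partitions, int[] startCursors) {
        this.partitions = partitions;
        this.cursors = startCursors.clone();
    }

    // snapshot of the per-partition positions, safe to save and reuse later
    int[] getCursors() {
        return cursors.clone();
    }

    // reads up to maxRecords (0 = no limit), partition by partition, advancing the cursors
    List<String> read(int maxRecords) {
        List<String> out = new ArrayList<>();
        for (int p = 0; p < partitions.size(); p++) {
            List<String> recs = partitions.get(p);
            while (cursors[p] < recs.size()) {
                if (maxRecords > 0 && out.size() >= maxRecords) {
                    return out;
                }
                out.add(recs.get(cursors[p]++));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> parts = List.of(
            List.of("id-7", "id-10"), List.of("id-8"), List.of("id-9", "id-12", "id-13"), List.of("id-11"));
        CursorSketch first = new CursorSketch(parts, new int[parts.size()]);
        System.out.println("Paused:  " + first.read(3));
        int[] saved = first.getCursors();
        System.out.println("Cursors: " + Arrays.toString(saved));
        CursorSketch resumed = new CursorSketch(parts, saved); // a separate reader
        System.out.println("Resumed: " + resumed.read(0));
    }
}
```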
The code examples below illustrate:
- resuming the same query
- setting the partition state in a different query
Resuming the Same Query
Read partially from the stream, then resubmit the query request with the same statement and policy, restoring the saved cursors in a new partition filter, to obtain a new stream for the remaining results. Note that all records are returned across the two calls.
QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetIndexed);
stmt.setFilter(Filter.range("bin1", 7, 13)); // range filter uses the secondary index on bin1
PartitionFilter pFilter = PartitionFilter.all(); // include all data partitions
stmt.setMaxRecords(3); // request 3 results
System.out.format("Paused after 3 results: \n");
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);
processResults(rs);
// get cursors in partitions
PartitionStatus[] cursors = pFilter.getPartitions();
System.out.format("\nResumed after 3 results: \n");
// cursor state is set in a new filter
PartitionFilter pFilter2 = PartitionFilter.all();
pFilter2.setPartitions(cursors); // set cursor state
stmt.setMaxRecords(0); // request all remaining results
RecordSet rs2 = client.queryPartitions(qPolicy, stmt, pFilter2);
processResults(rs2);
Output:
Paused after 3 results:
Record key: id-7, bins: {bin1=7, bin2=70}
Record key: id-10, bins: {bin1=10, bin2=100}
Record key: id-8, bins: {bin1=8, bin2=80}
Resumed after 3 results:
Record key: id-12, bins: {bin1=12, bin2=120}
Record key: id-9, bins: {bin1=9, bin2=90}
Record key: id-13, bins: {bin1=13, bin2=130}
Record key: id-11, bins: {bin1=11, bin2=110}
Setting State in a Different Query
Read partially from the stream, and resume later in a different programming context by submitting a new query request in which the saved cursor state is reinstated. The cursor state is serialized and deserialized between the two calls to illustrate arbitrarily separate programming contexts.
Note that all records are returned across the two calls.
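The example below serializes the cursors with commons-lang3's SerializationUtils and deserializes them with java.io.ObjectInputStream. Both directions can also be done with the JDK alone, as sketched here. SavedCursor is a hypothetical stand-in for PartitionStatus, used so that the sketch runs without the Aerospike client; the notebook's own round trip relies on PartitionStatus being Serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

// Plain-JDK serialization round trip for saving cursor state between programming contexts.
final class CursorSerialization {
    // hypothetical stand-in for a saved partition cursor
    static final class SavedCursor implements Serializable {
        private static final long serialVersionUID = 1L;
        final int partitionId;
        final byte[] lastDigest; // hypothetical position marker within the partition

        SavedCursor(int partitionId, byte[] lastDigest) {
            this.partitionId = partitionId;
            this.lastDigest = lastDigest;
        }
    }

    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SavedCursor[] cursors = { new SavedCursor(0, new byte[] {1, 2}), new SavedCursor(1, null) };
        byte[] saved = serialize(cursors); // these bytes could be stored in a file or database
        SavedCursor[] restored = (SavedCursor[]) deserialize(saved);
        System.out.println(restored.length + " cursors restored; first digest "
            + Arrays.toString(restored[0].lastDigest));
    }
}
```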
QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetIndexed);
stmt.setFilter(Filter.range("bin1", 7, 13)); // range filter uses the secondary index on bin1
PartitionFilter pFilter = PartitionFilter.all(); // include all data partitions
stmt.setMaxRecords(3); // request 3 results
System.out.format("Paused after 3 results: \n");
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);
processResults(rs);
// get cursors in partitions
PartitionStatus[] cursors = pFilter.getPartitions();
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.ObjectInputStream;
import org.apache.commons.lang3.SerializationUtils;
// serialize to save cursors
byte[] serialized = SerializationUtils.serialize(cursors);
System.out.format("\nResumed after 3 results - with a new query: \n");
// a new query with the same query parameters
QueryPolicy qPolicy2 = new QueryPolicy();
Statement stmt2 = new Statement();
stmt2.setNamespace(Namespace);
stmt2.setSetName(SetIndexed);
stmt2.setFilter(Filter.range("bin1", 7, 13)); // range filter uses the secondary index on bin1
PartitionFilter pFilter2 = PartitionFilter.all(); // include all data partitions
// cursors are set to resume the query from the saved state
// deserialize to restore
InputStream instr = new ByteArrayInputStream(serialized);
ObjectInputStream obj = new ObjectInputStream(instr);
PartitionStatus[] cursors2 = (PartitionStatus[]) obj.readObject();
pFilter2.setPartitions(cursors2);
stmt2.setMaxRecords(0); // request all remaining results
RecordSet rs2 = client.queryPartitions(qPolicy2, stmt2, pFilter2);
processResults(rs2);
Output:
Paused after 3 results:
Record key: id-10, bins: {bin1=10, bin2=100}
Record key: id-8, bins: {bin1=8, bin2=80}
Record key: id-7, bins: {bin1=7, bin2=70}
Resumed after 3 results - with a new query:
Record key: id-12, bins: {bin1=12, bin2=120}
Record key: id-11, bins: {bin1=11, bin2=110}
Record key: id-9, bins: {bin1=9, bin2=90}
Record key: id-13, bins: {bin1=13, bin2=130}
Cursors with Set Index Query
Query resume works as expected with set index queries.
QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetIndexed); // a set index is used when it is available
// no query filter means a scan using a set or primary index
// use expression filter equivalent to query filter
Exp rangeFilter = Exp.and(Exp.ge(Exp.intBin("bin1"), Exp.val(7)), Exp.le(Exp.intBin("bin1"), Exp.val(13)));
qPolicy.filterExp = Exp.build(rangeFilter);
PartitionFilter pFilter = PartitionFilter.all(); // include all data partitions
stmt.setMaxRecords(3); // request 3 results
System.out.format("Set index scan paused after 3 results: \n");
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);
processResults(rs);
// get cursors in partitions
PartitionStatus[] cursors = pFilter.getPartitions();
System.out.format("\nSet index scan resumed after 3 results: \n");
// cursor state is set in a new filter
PartitionFilter pFilter2 = PartitionFilter.all();
pFilter2.setPartitions(cursors); // set cursor state
stmt.setMaxRecords(0); // request all remaining results
RecordSet rs2 = client.queryPartitions(qPolicy, stmt, pFilter2);
processResults(rs2);
Output:
Set index scan paused after 3 results:
Record key: id-7, bins: {bin1=7, bin2=70}
Record key: id-8, bins: {bin1=8, bin2=80}
Record key: id-10, bins: {bin1=10, bin2=100}
Set index scan resumed after 3 results:
Record key: id-11, bins: {bin1=11, bin2=110}
Record key: id-9, bins: {bin1=9, bin2=90}
Record key: id-13, bins: {bin1=13, bin2=130}
Record key: id-12, bins: {bin1=12, bin2=120}
Cursors with Primary Index Query
Query resume works as expected with primary index queries.
QueryPolicy qPolicy = new QueryPolicy();
// query statement defines contents of query results
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SetUnindexed); // primary index is used when a set index is absent
// no query filter means a scan using a set or primary index
// use expression filter equivalent to query filter
Exp rangeFilter = Exp.and(Exp.ge(Exp.intBin("bin1"), Exp.val(7)), Exp.le(Exp.intBin("bin1"), Exp.val(13)));
qPolicy.filterExp = Exp.build(rangeFilter);
PartitionFilter pFilter = PartitionFilter.all(); // include all data partitions
stmt.setMaxRecords(3); // request 3 results
System.out.format("Primary index scan paused after 3 results: \n");
RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);
processResults(rs);
// get cursors in partitions
PartitionStatus[] cursors = pFilter.getPartitions();
System.out.format("\nPrimary index scan resumed after 3 results: \n");
// cursor state is set in a new filter
PartitionFilter pFilter2 = PartitionFilter.all();
pFilter2.setPartitions(cursors); // set cursor state
stmt.setMaxRecords(0); // request all remaining results
RecordSet rs2 = client.queryPartitions(qPolicy, stmt, pFilter2);
processResults(rs2);
Output:
Primary index scan paused after 3 results:
Record key: id-7, bins: {bin1=7, bin2=70}
Record key: id-8, bins: {bin1=8, bin2=80}
Record key: id-13, bins: {bin1=13, bin2=130}
Primary index scan resumed after 3 results:
Record key: id-10, bins: {bin1=10, bin2=100}
Record key: id-9, bins: {bin1=9, bin2=90}
Record key: id-11, bins: {bin1=11, bin2=110}
Record key: id-12, bins: {bin1=12, bin2=120}
Takeaways
The notebook showed code examples for how to process query results as a stream of records, paginate over results, partition a query for parallelism, and resume query execution at a later time.
Further Exploration and Resources
Here are some links for further exploration:
Resources
- Working with Query Result Streams (blog post)
- Expressions in Aerospike (interactive tutorial)
- Understanding Asynchronous Operations in Aerospike (interactive tutorial)
- Aerospike Developer Hub