Batch Operations in Aerospike
For an interactive Jupyter notebook experience:
This tutorial describes the batch operations in Aerospike.
This notebook requires the Aerospike Database running locally with Java kernel and Aerospike Java Client. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.
Introduction
In this notebook, we will describe the batch capabilities in Aerospike.
Batch functionality in Aerospike Java Client versions before 6.0 and Aerospike Database versions before 6.0 was supported only for read operations. With the Java Client 6.0+ and Aerospike Database 6.0+ working together, batch executions are expanded to include write, UDF, and delete. The notebook focuses on the newly added capabilities. The older read batch operations are described elsewhere including here.
The specific topics covered in this notebook include:
- New batch functionality
- Code examples of the synchronous batch APIs
Prerequisites
This tutorial assumes familiarity with the following topics:
Setup
Ensure database is running
This notebook requires that Aerospike database is running.
import io.github.spencerpark.ijava.IJava;
import io.github.spencerpark.jupyter.kernel.magic.common.Shell;
IJava.getKernelInstance().getMagics().registerMagics(Shell.class);
%sh asd
Add second namespace and restart database
Open a terminal tab by selecting File->Open from the notebook menu, and then New->Terminal. Run the add-namespace.sh
script to add a namespace test2
to the config and restart the server.
~/notebooks/java/add_namespace.sh test2
Download and install additional components.
Install the Java client version 6.0 or above that supports the new batch capabilities.
%%loadFromPOM
<dependencies>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<version>6.0.0</version>
</dependency>
</dependencies>
Initialize Client
Initialize the client.
import com.aerospike.client.AerospikeClient;
AerospikeClient client = new AerospikeClient("localhost", 3000);
System.out.println("Initialized the client and connected to the cluster.");;
Output:
Initialized the client and connected to the cluster.
Define Constants and Helper Functions
Define constants for the namespaces test
and test2
, sets batch-ops
and batch-ops2
, and helper functions truncateTestData
, initializeTestData
, and printRecords
.
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.WritePolicy;
final String Namespace1 = "test";
final String Namespace2 = "test2";
final String Set1 = "batch-ops";
final String Set2 = "batch-ops2";
final String KeyPrefix = "id-";
// convenience function to truncate test data
void truncateTestData() {
try {
client.truncate(null, Namespace1, null, null);
client.truncate(null, Namespace2, null, null);
}
catch (AerospikeException e) {
// ignore
}
}
// convenience function to initialize test data
void initializeTestData() {
truncateTestData();
WritePolicy wpolicy = new WritePolicy();
wpolicy.sendKey = true;
for (int i = 1; i <= 3; i++) {
for (String ns : Arrays.asList(Namespace1, Namespace2)) {
for (String set : Arrays.asList(Set1, Set2)) {
Key key = new Key(ns, set, KeyPrefix+i);
Bin bin1 = new Bin(new String("bin1"), i);
Bin bin2 = new Bin(new String("bin2"), 10*i);
HashMap <Integer, Integer> map = new HashMap <Integer, Integer>();
for (int j = 1; j <= i; j++) {
map.put(j, j*10);
}
Bin bin3 = new Bin("bin3", map);
client.put(wpolicy, key, bin1, bin2, bin3);
}
}
}
}
// convenience function to print all records in a namespace and set
// (please note this is not an efficient implementation to scan all records across sets/namespaces.
// refer to set-index and scan documentation for additional pointers on this topic.)
import com.aerospike.client.Record;
import com.aerospike.client.ScanCallback;
import com.aerospike.client.policy.ScanPolicy;
public class ScanParallel implements ScanCallback {
public void scanCallback(Key key, Record record) {
System.out.format("\tKey %s: %s\n", key.userKey, record.bins);
}
}
void printRecords() {
System.out.println("Records in database:");
for (String ns : Arrays.asList(Namespace1, Namespace2)) {
for (String set : Arrays.asList(Set1, Set2)) {
System.out.format("Namespace: %s, set: %s: \n", ns, set);
client.scanAll(null, ns, set, new ScanParallel());
}
}
}
Populate and Examine Test Data
Populate and examine the test data. It contains 3 records each in the following 4 sets:
- set batch-ops in namespace test
- set batch-ops2 in namespace test
- set batch-ops in namespace test2
- set batch-ops2 in namespace test2
Each record has:
- user key: a unique sequential number k (1-3) prefixed with "id-"
- bin1: integer with value of k
- bin2: integer with vslue of k * 10
- bin3: map holding k keys (1-k) and corresponding values k * 10
initializeTestData();
System.out.format("Test data populated.\n");;
printRecords();
Output:
Test data populated.
Records in database:
Namespace: test, set: batch-ops:
Key id-2: {bin1=2, bin2=20, bin3={1=10, 2=20}}
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Key id-1: {bin1=1, bin2=10, bin3={1=10}}
Namespace: test, set: batch-ops2:
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Key id-1: {bin1=1, bin2=10, bin3={1=10}}
Key id-2: {bin1=2, bin2=20, bin3={1=10, 2=20}}
Namespace: test2, set: batch-ops:
Key id-1: {bin1=1, bin2=10, bin3={1=10}}
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Key id-2: {bin1=2, bin2=20, bin3={1=10, 2=20}}
Namespace: test2, set: batch-ops2:
Key id-1: {bin1=1, bin2=10, bin3={1=10}}
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Key id-2: {bin1=2, bin2=20, bin3={1=10, 2=20}}
Register UDF
In the code examples later, we will be using UDF functions in the "update_example.lua" module under "udf" directory. Register the UDF with the server by executing the following code cell. The function invalidates the cache, removes the currently registered module, and registers the latest version.
import com.aerospike.client.policy.Policy;
import com.aerospike.client.task.RegisterTask;
import com.aerospike.client.Language;
import com.aerospike.client.lua.LuaConfig;
import com.aerospike.client.lua.LuaCache;
LuaConfig.SourceDirectory = "../udf";
String UDFFile = "update_example.lua";
String UDFModule = "update_example";
void registerUDF() {
// clear the lua cache
LuaCache.clearPackages();
Policy policy = new Policy();
// remove the current module, if any
client.removeUdf(null, UDFFile);
RegisterTask task = client.register(policy, LuaConfig.SourceDirectory+"/"+UDFFile,
UDFFile, Language.LUA);
task.waitTillComplete();
System.out.format("Registered the UDF module %s.", UDFFile);;
}
registerUDF();
Output:
Registered the UDF module update_example.lua.
Import Client Modules
Import the Java Client modules used in this notebook.
import com.aerospike.client.BatchRecord;
import com.aerospike.client.BatchResults;
import com.aerospike.client.ResultCode;
import com.aerospike.client.BatchWrite;
import com.aerospike.client.BatchDelete;
import com.aerospike.client.BatchUDF;
import com.aerospike.client.BatchRead;
import com.aerospike.client.policy.BatchPolicy;
import com.aerospike.client.policy.BatchDeletePolicy;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Record;
import com.aerospike.client.Value;
import com.aerospike.client.cdt.MapOperation;
import com.aerospike.client.cdt.MapPolicy;
import com.aerospike.client.cdt.MapReturnType;
import com.aerospike.client.cdt.ListReturnType;
import com.aerospike.client.exp.Exp;
import com.aerospike.client.exp.ListExp;
import com.aerospike.client.exp.MapExp;
import com.aerospike.client.exp.ExpOperation;
import com.aerospike.client.exp.ExpReadFlags;
import com.aerospike.client.exp.ExpWriteFlags;
import com.aerospike.client.exp.Expression;
New Batch Capabilities
We will illustrate the following new batch capabilities with code examples below.
- Multi-key operate: Performs the same set of operations on multiple records.
- Multi-key UDF execute: Executes the same UDF function on multiple records.
- Multi-key delete: Deletes multiple records.
- General batch operate: Allows a separate list of operations for each record in the batch.
A few important things to keep in mind about batch operations:
- Transaction semantics. The batch operations are not transactional. The transactional boundary assured is for individual key operations. In the general batch operate function, if the key is specified multiple times, the transaction is limited to each specific occurrence.
- Atomicity. A batch is not processed atomically. There is no rollback available for partially successful operations.
- Order of execution. Order within a batch write is not guaranteed unless “in line” for write is specified.
- Maximum batch size. The maximum batch size in a request (sent to a single server node) is defined by the configurable server parameter batch-max-requests (default: 5000).
In this notebook, we will explore the synchronous version of the APIs. The asychronous versions have the same operation semantics, and can be implemented using the setup instructions in this notebook.
Multi-key Operate
BatchResults operate(BatchPolicy batchPolicy,
BatchWritePolicy writePolicy,
Key[] keys,
Operation... ops)
It allows you to specify a list of keys and a list of operations. In the operations list:
- Read and write operations can be mixed.
- Read operations must specify individual bins.
- Deletes can be specified.
- UDF operations cannot be specified.
Note in the example below:
- BatchResults contains an array of BatchRecords with resultCode, key, and record fields. The record field holds the return values by bin.
- Each successful operation always returns a result, which may be a null, for example, for a write operation.
- There may be multiple operations on the same bin. Each operation result is stored in the record field in a bin-specific result list. Use getList(binName) or getValue(binName) to get the results list, 0-based "bin relative" operation index to retrieve op results, and type cast the value appropriately. See bin2 and bin3 operations in the example below.
- For a single occurrence of a bin in operations, the results can be obtained simply using the type-specific get operation. See bin1 operation in the example below.
- In case of an error, the resultCode has the error code and the record field is null.
- You can peform read batch operations using this new API in 6.0, as well as the existing batch read capabilities.
// start with a clean initialized test data
initializeTestData();
// Batch of 8 keys, 2 in each of these namespace/set combinations:
// (test, batch-ops), (test, batch-ops2), (test2, batch-ops), (test2, batch-ops2)
int NUM_KEYS = 8;
Key[] keys = new Key[NUM_KEYS];
for (int i = 0; i < NUM_KEYS/4; i++) {
keys[i] = new Key(Namespace1, Set1, KeyPrefix + (i+1));
keys[NUM_KEYS/4+i] = new Key(Namespace1, Set2, KeyPrefix + (i+1));
keys[2*NUM_KEYS/4+i] = new Key(Namespace2, Set1, KeyPrefix + (i+1));
keys[3*NUM_KEYS/4+i] = new Key(Namespace2, Set2, KeyPrefix + (i+1));
}
// Perform the following operations on the keys.
// 1) Read: get bin1
// 2) Write: increment bin2 by 1
// 3) Read: get bin2
// 4) Write: add a map element (0, 0) to bin3
// 5) Read: get the largest value in the map bin3
// send the multi-key operate batch request
BatchResults bresults = client.operate(null, null, keys,
Operation.get("bin1"), // Op 1, single bin1 op
Operation.add(new Bin("bin2", Value.get(1))), // Op 2, first bin2 op
Operation.get("bin2"), // Op 3, second bin2 op
MapOperation.put(MapPolicy.Default, "bin3", Value.get(0),
Value.get(0)), // Op 4, first bin3 op
MapOperation.getByRank("bin3", -1, MapReturnType.VALUE) // Op 5, second bin3 op
);
// check if all operations succeeded
if (bresults.status) {
System.out.println("All batch operations succeeded.");
}
else {
System.out.println("Some batch operations failed.");
}
// process the BatchResults returned from the batch operation
for (int i = 0; i < bresults.records.length; i++) {
BatchRecord br = bresults.records[i];
Record rec = br.record;
if (br.resultCode == ResultCode.OK) { // check individual key status
long bin1Val = rec.getLong("bin1"); // bin1 has one operation, op result directly accessible
List<?> bin2Results = rec.getList("bin2"); // bin2 and bin3 have multiple ops; access results through a list
List<?> bin3Results = rec.getList("bin3");
// note the result order within each list matches ops order for the bin
System.out.format("Result[%d]: key: %s/%s/%s, bin1: %d, bin2: %d, bin3 size: %d, bin3 max val: %d\n",
i, br.key.namespace, br.key.setName, br.key.userKey, bin1Val, (long)bin2Results.get(1), (long)bin3Results.get(0), (long)bin3Results.get(1));
}
else { // error in individual key's operations
System.out.format("Result[%d]: key: %s, error: %s\n",
i, br.key, ResultCode.getResultString(br.resultCode));
}
}
Output:
All batch operations succeeded.
Result[0]: key: test/batch-ops/id-1, bin1: 1, bin2: 11, bin3 size: 2, bin3 max val: 10
Result[1]: key: test/batch-ops/id-2, bin1: 2, bin2: 21, bin3 size: 3, bin3 max val: 20
Result[2]: key: test/batch-ops2/id-1, bin1: 1, bin2: 11, bin3 size: 2, bin3 max val: 10
Result[3]: key: test/batch-ops2/id-2, bin1: 2, bin2: 21, bin3 size: 3, bin3 max val: 20
Result[4]: key: test2/batch-ops/id-1, bin1: 1, bin2: 11, bin3 size: 2, bin3 max val: 10
Result[5]: key: test2/batch-ops/id-2, bin1: 2, bin2: 21, bin3 size: 3, bin3 max val: 20
Result[6]: key: test2/batch-ops2/id-1, bin1: 1, bin2: 11, bin3 size: 2, bin3 max val: 10
Result[7]: key: test2/batch-ops2/id-2, bin1: 2, bin2: 21, bin3 size: 3, bin3 max val: 20
Verify the database state. Note the changed bin2
and bin3
in keys id-1
and id-2
in the four sets.
printRecords();
Output:
Records in database:
Namespace: test, set: batch-ops:
Key id-2: {bin1=2, bin2=21, bin3={0=0, 1=10, 2=20}}
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Key id-1: {bin1=1, bin2=11, bin3={0=0, 1=10}}
Namespace: test, set: batch-ops2:
Key id-2: {bin1=2, bin2=21, bin3={0=0, 1=10, 2=20}}
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Key id-1: {bin1=1, bin2=11, bin3={0=0, 1=10}}
Namespace: test2, set: batch-ops:
Key id-2: {bin1=2, bin2=21, bin3={0=0, 1=10, 2=20}}
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Key id-1: {bin1=1, bin2=11, bin3={0=0, 1=10}}
Namespace: test2, set: batch-ops2:
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Key id-2: {bin1=2, bin2=21, bin3={0=0, 1=10, 2=20}}
Key id-1: {bin1=1, bin2=11, bin3={0=0, 1=10}}
Multi-key UDF Execute
The multi-key batch UDF request allows the same UDF function to be executed across a batch of keys.
BatchResults execute(BatchPolicy batchPolicy,
BatchUDFPolicy udfPolicy,
Key[] keys,
String packageName,
String functionName,
Value… functionArgs)
In the example below, we execute a read-write UDF function increment_and_get
in the UDF module update_example
. The function increments the specified bin's value and returns the new value.
Note:
- UDF results are obtained with
getUDFResult()
, which returns anObject
value, which in turn must be typecast to the correct type to obtain the actual value. In the example below, the UDF returns thebin2
value in a map. - A non-existent key returns a
key not found
error. - The batch policy option
respondAllKeys
governs if the batch processing should continue even if some record operations fail. Try setting it to false, and see the results.
// start with a clean initialized test data
initializeTestData();
// create a batch of 8 keys, 2 in each of these namespace/set combinations:
// (test, batch-ops), (test, batch-ops2), (test2, batch-ops), (test2, batch-ops2)
int NUM_KEYS = 9; // one extra slot for a non-existent key
Key[] keys = new Key[NUM_KEYS];
// add a non-existent key 0 to test the error path
keys[0] = new Key(Namespace1, Set1, KeyPrefix + 0);
// populate valid keys
for (int i = 0; i < NUM_KEYS/4; i++) {
keys[i+1] = new Key(Namespace1, Set1, KeyPrefix + (i+1));
keys[NUM_KEYS/4+i+1] = new Key(Namespace1, Set2, KeyPrefix + (i+1));
keys[2*NUM_KEYS/4+i+1] = new Key(Namespace2, Set1, KeyPrefix + (i+1));
keys[3*NUM_KEYS/4+i+1] = new Key(Namespace2, Set2, KeyPrefix + (i+1));
}
// perform the UDF function "increment_and_get" on the keys.
// the function takes the bin name and increment value as parameters.
String UDFModule = "update_example";
String UDFFunction = "increment_and_get";
// send the multi-key execute batch request
BatchPolicy bPolicy = new BatchPolicy(client.batchPolicyDefault);
bPolicy.respondAllKeys = true; // set to true/false and observe effect
BatchResults bresults = client.execute(bPolicy, null, keys,
UDFModule, UDFFunction,
Value.get("bin2"),
Value.get(1)); // increment bin2 by 1
// check if all operations succeeded
if (bresults.status) {
System.out.println("All batch operations succeeded.");
}
else {
System.out.println("Some batch operations failed.");
}
// process the BatchResults returned from the batch operation
for (int i = 0; i < bresults.records.length; i++) {
BatchRecord br = bresults.records[i];
Record rec = br.record;
if (br.resultCode == ResultCode.OK) { // check individual key status
HashMap<?,?> udfMap = (HashMap<?,?>)rec.getUDFResult(); // cast udf result to map returned by udf
long bin2Val = (long)udfMap.get("bin2"); // extract bin2 value from map // cast to map
System.out.format("Result[%d]: key: %s/%s/%s, bin2: %d\n",
i, br.key.namespace, br.key.setName, br.key.userKey, bin2Val);
}
else { // error in individual key's operations
System.out.format("Result[%d]: key: %s, error: %s\n",
i, br.key, ResultCode.getResultString(br.resultCode));
}
}
Output:
Some batch operations failed.
Result[0]: key: test:batch-ops:id-0:7b4c6a2b86aa917acb41efc8485fb20040b5ec35, error: UDF returned error
Result[1]: key: test/batch-ops/id-1, bin2: 11
Result[2]: key: test/batch-ops/id-2, bin2: 21
Result[3]: key: test/batch-ops2/id-1, bin2: 11
Result[4]: key: test/batch-ops2/id-2, bin2: 21
Result[5]: key: test2/batch-ops/id-1, bin2: 11
Result[6]: key: test2/batch-ops/id-2, bin2: 21
Result[7]: key: test2/batch-ops2/id-1, bin2: 11
Result[8]: key: test2/batch-ops2/id-2, bin2: 21
Verify the database state. Note the changed bin2
value in keys id-1
and id-2
in the four sets.
printRecords()
Output:
Records in database:
Namespace: test, set: batch-ops:
Key id-2: {bin1=2, bin2=21, bin3={1=10, 2=20}}
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Key id-1: {bin1=1, bin2=11, bin3={1=10}}
Namespace: test, set: batch-ops2:
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Key id-2: {bin1=2, bin2=21, bin3={1=10, 2=20}}
Key id-1: {bin1=1, bin2=11, bin3={1=10}}
Namespace: test2, set: batch-ops:
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Key id-1: {bin1=1, bin2=11, bin3={1=10}}
Key id-2: {bin1=2, bin2=21, bin3={1=10, 2=20}}
Namespace: test2, set: batch-ops2:
Key id-2: {bin1=2, bin2=21, bin3={1=10, 2=20}}
Key id-1: {bin1=1, bin2=11, bin3={1=10}}
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Multi-key Delete
The multi-key batch delete allows a batch of records to be deleted.
BatchResults delete(BatchPolicy batchPolicy,
BatchDeletePolicy deletePolicy,
Key[] keys)
The example below shows deletion of multiple records across the two namespaces and their sets.
Note:
- The batch operate request described earlier also allows deletion of records, in addition to read/write operations.
- A non-existent key returns a
key not found
error.
// start with a clean initialized test data
initializeTestData();
// create a batch of 8 keys, 2 in each of these namespace/set combinations:
// (test, batch-ops), (test, batch-ops2), (test2, batch-ops), (test2, batch-ops2)
int NUM_KEYS = 9; // one extra slot for a non-existent key
Key[] keys = new Key[NUM_KEYS];
// add a non-existent key 0 to test the error path
keys[0] = new Key(Namespace1, Set1, KeyPrefix + 0);
// add valid keys
for (int i = 0; i < NUM_KEYS/4; i++) {
keys[i+1] = new Key(Namespace1, Set1, KeyPrefix + (i+1));
keys[NUM_KEYS/4+i+1] = new Key(Namespace1, Set2, KeyPrefix + (i+1));
keys[2*NUM_KEYS/4+i+1] = new Key(Namespace2, Set1, KeyPrefix + (i+1));
keys[3*NUM_KEYS/4+i+1] = new Key(Namespace2, Set2, KeyPrefix + (i+1));
}
// send the multi-key delete batch request
BatchResults bresults = client.delete(null, null, keys);
// check if all operations succeeded
if (bresults.status) {
System.out.println("All batch operations succeeded.");
}
else {
System.out.println("Some batch operations failed.");
}
// process the BatchResults returned from the batch operation
for (int i = 0; i < bresults.records.length; i++) {
BatchRecord br = bresults.records[i];
Record rec = br.record;
if (br.resultCode == ResultCode.OK) { // check individual key status
System.out.format("Result[%d]: key: %s/%s/%s deleted.\n",
i, br.key.namespace, br.key.setName, br.key.userKey);
}
else { // error in individual key's operations
System.out.format("Result[%d]: key: %s, error: %s\n",
i, br.key, ResultCode.getResultString(br.resultCode));
}
}
Output:
Some batch operations failed.
Result[0]: key: test:batch-ops:id-0:7b4c6a2b86aa917acb41efc8485fb20040b5ec35, error: Key not found
Result[1]: key: test/batch-ops/id-1 deleted.
Result[2]: key: test/batch-ops/id-2 deleted.
Result[3]: key: test/batch-ops2/id-1 deleted.
Result[4]: key: test/batch-ops2/id-2 deleted.
Result[5]: key: test2/batch-ops/id-1 deleted.
Result[6]: key: test2/batch-ops/id-2 deleted.
Result[7]: key: test2/batch-ops2/id-1 deleted.
Result[8]: key: test2/batch-ops2/id-2 deleted.
Verify the database state. Note the keys id-1
and id-2
in the four sets have been removed.
printRecords()
Output:
Records in database:
Namespace: test, set: batch-ops:
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Namespace: test, set: batch-ops2:
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Namespace: test2, set: batch-ops:
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Namespace: test2, set: batch-ops2:
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
General Batch Operate
In the general form of the batch operation:
- A `BatchRecord is specified using a key and the specific operation details.
- In the operation list, Read, Read-Write, Delete, and UDF operations are specified using the corresponding subclasses, namely,
BatchRead
,BatchWrite
,BatchDelete
, andBatchUDF
.
boolean operate(BatchPolicy policy,
List<BatchRecord> records)
In the code example below, we perform the following set of operations on different keys.
- Read only operations with
BatchRead
. - Read-Write operations with
BatchWrite
. - Delete with
BatchDelete
. - Read-Delete with
BatchWrite
. - UDF execution with
BatchUDF
.
Note:
- Results of multiple operations on a single bin are obtained as an ordered list.
- The general batch operate allows different operations to be peformed on different records. The batch operate described earlier allows the same set of operations across multiple records, and it does not allow UDF operations.
- The general batch operate can also be used in place of any other batch API, including the multi-key operate, multi-key UDF execute, and multi-key delete APIs described above.
- Read-only operations must use
BatchRead
, and in order to useBatchWrite
there must be at least one write operation. Deletes can be performed withBatchWrite
as well asBatchDelete
. BatchUDF
results are obtained withgetUDFResult()
, which returns anObject
value, which in turn must be typecast to the correct type to obtain the actual value. In the example below, the UDF returns thebin2
value in a map.- The error
key-not-found
does not stop batch execution even whenrespondAllKeys
policy is set to false.
// start with a clean initialized test data
initializeTestData();
// batch records array - each batch record holds a key and operations array
// a batch record can be BatchRead, BatchWrite, BatchDelete, and BatchUDF, each
//. with specific restrictions on allowed operations.
List<BatchRecord> batchRecords = new ArrayList<BatchRecord>();
// 1. Read only operations with BatchRead.
Operation[] ops1 = Operation.array(
Operation.get("bin1"),
MapOperation.getByKey("bin3", Value.get(1), MapReturnType.VALUE));
batchRecords.add(new BatchRead(new Key(Namespace1, Set1, KeyPrefix + 1), ops1));
// 2. Read-Write operations with BatchWrite.
Operation[] ops2 = Operation.array(
Operation.add(new Bin("bin2", Value.get(1))),
Operation.get("bin2"),
MapOperation.put(MapPolicy.Default, "bin3", Value.get(0), Value.get(0)),
Operation.get("bin3"));
batchRecords.add(new BatchWrite(new Key(Namespace1, Set1, KeyPrefix + 2), ops2));
// 3. Delete with BatchDelete.
batchRecords.add(new BatchDelete(new Key(Namespace1, Set1, KeyPrefix + 3)));
// 4. Read-Write-Delete with BatchWrite.
Operation[] ops4 = Operation.array(
Operation.add(new Bin("bin2", Value.get(1))),
Operation.get("bin2"),
MapOperation.put(MapPolicy.Default, "bin3", Value.get(0), Value.get(0)),
Operation.get("bin3"),
Operation.delete());
batchRecords.add(new BatchWrite(new Key(Namespace2, Set1, KeyPrefix + 1), ops4));
// 5. UDF execution with BatchUDF.
batchRecords.add(new BatchUDF(new Key(Namespace2, Set1, KeyPrefix + 2),
UDFModule,
UDFFunction,
new Value[]{Value.get("bin2"), Value.get(1)}));
// 6. Non-existent key operation.
batchRecords.add(new BatchRead(new Key(Namespace1, Set1, KeyPrefix + 0), ops1)); // key 0 does not exist
// execute the batch
BatchPolicy bPolicy = new BatchPolicy(client.batchPolicyDefault);
bPolicy.respondAllKeys = false; // note key-not-found does not stop batch execution
try {
boolean status = client.operate(bPolicy, batchRecords);
if (status) {
System.out.println("All batch operations succeeded.");
}
else {
System.out.println("Some batch operations failed.");
}
}
catch (AerospikeException e) {
System.out.format("%s", e);
}
// get and show results
// 1. Read-Only operations with BatchRead.
int i = 0;
BatchRecord batchRec = batchRecords.get(i);
Record rec = batchRec.record;
Key key = batchRec.key;
if (batchRec.resultCode == ResultCode.OK) {
Object v1 = rec.getValue("bin1");
Object v2 = rec.getValue("bin3");
System.out.format("Result[%d]: key %s/%s/%s, bin1: %s, bin3[1]: %s\n",
i, key.namespace, key.setName, key.userKey, v1, v2);
}
else {
System.out.format("Result[%d]: error: %s\n", i, ResultCode.getResultString(batchRec.resultCode));
}
// 2. Read-Write operations with BatchWrite.
i = 1;
batchRec = batchRecords.get(i);
rec = batchRec.record;
key = batchRec.key;
if (batchRec.resultCode == ResultCode.OK) {
Object v1 = rec.getValue("bin2");
Object v2 = rec.getValue("bin3");
System.out.format("Result[%d]: key %s/%s/%s, bin2 results: %s, bin3 results: %s\n",
i, key.namespace, key.setName, key.userKey, v1, v2);
}
else {
System.out.format("Result[%d]: error: %s\n", i, ResultCode.getResultString(batchRec.resultCode));
}
// 3. Delete with BatchDelete.
i = 2;
batchRec = batchRecords.get(i);
rec = batchRec.record;
key = batchRec.key;
if (batchRec.resultCode == ResultCode.OK) {
System.out.format("Result[%d]: key %s/%s/%s, deleted.\n",
i, key.namespace, key.setName, key.userKey);
}
else {
System.out.format("Result[%d]: error: %s\n", i, ResultCode.getResultString(batchRec.resultCode));
}
// 4. Read-Write-Delete with BatchWrite.
i = 3;
batchRec = batchRecords.get(i);
rec = batchRec.record;
key = batchRec.key;
if (batchRec.resultCode == ResultCode.OK) {
Object v1 = rec.getValue("bin2");
Object v2 = rec.getValue("bin3");
System.out.format("Result[%d]: key %s/%s/%s (deleted), bin2 results: %s, bin3 results: %s\n",
i, key.namespace, key.setName, key.userKey, v1, v2);
}
else {
System.out.format("Result[%d]: error: %s\n", i, ResultCode.getResultString(batchRec.resultCode));
}
// 5. UDF execution with BatchUDF.
i = 4;
batchRec = batchRecords.get(i);
rec = batchRec.record;
key = batchRec.key;
if (batchRec.resultCode == ResultCode.OK) {
HashMap<?,?> udfMap = (HashMap<?,?>)rec.getUDFResult(); // cast udf result to map returned by udf
long bin2Val = (long)udfMap.get("bin2"); // extract bin2 value from map // cast to map
System.out.format("Result[%d]: key %s/%s/%s, bin2: %s\n",
i, key.namespace, key.setName, key.userKey, bin2Val);
}
else {
System.out.format("Result[%d]: error: %s\n", i, ResultCode.getResultString(batchRec.resultCode));
}
// 6. Non-existent key operation.
i = 5;
batchRec = batchRecords.get(i);
rec = batchRec.record;
key = batchRec.key;
if (batchRec.resultCode == ResultCode.OK) {
Object v1 = rec.getValue("bin1");
Object v2 = rec.getValue("bin3");
System.out.format("Result[%d]: key %s/%s/%s, bin1: %s, bin3[1]: %s\n",
i, key.namespace, key.setName, key.userKey, v1, v2);
}
else {
System.out.format("Result[%d]: error: %s\n", i, ResultCode.getResultString(batchRec.resultCode));
}
Output:
Some batch operations failed.
Result[0]: key test/batch-ops/id-1, bin1: 1, bin3[1]: 10
Result[1]: key test/batch-ops/id-2, bin2 results: [null, 21], bin3 results: [3, {0=0, 1=10, 2=20}]
Result[2]: key test/batch-ops/id-3, deleted.
Result[3]: key test2/batch-ops/id-1 (deleted), bin2 results: [null, 11], bin3 results: [2, {0=0, 1=10}]
Result[4]: key test2/batch-ops/id-2, bin2: 21
Result[5]: error: Key not found
Verify database state. Note updates to test/batch-ops/id-2 and test2/batch-ops/id-2, and removal of test2/batch-ops/id-1 and test/batch-ops/id-3.
printRecords();
Output:
Records in database:
Namespace: test, set: batch-ops:
Key id-1: {bin1=1, bin2=10, bin3={1=10}}
Key id-2: {bin1=2, bin2=21, bin3={0=0, 1=10, 2=20}}
Namespace: test, set: batch-ops2:
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Key id-1: {bin1=1, bin2=10, bin3={1=10}}
Key id-2: {bin1=2, bin2=20, bin3={1=10, 2=20}}
Namespace: test2, set: batch-ops:
Key id-2: {bin1=2, bin2=21, bin3={1=10, 2=20}}
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Namespace: test2, set: batch-ops2:
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Key id-2: {bin1=2, bin2=20, bin3={1=10, 2=20}}
Key id-1: {bin1=1, bin2=10, bin3={1=10}}
Related Topics
We will discuss the following topics related to the new batch functionality:
- Read and Write operation expressions
- Filter expressions
- Inline processing
- Asynchronous batch processing
- Batch reads
Using Read and Write Operation Expressions
Expressions were introduced in Aerospike Database 5.7 release. Filter Expressions are used in the request policy to select records for processing. Read or Write Operation Expressions are used to retrieve a server side computation result or update a bin with it. In batch requests, Operation Expressions can be used wherever Operation is allowed, that is, in all batch operate() APIs.
Below is an example of multi-key operate using Operation Expressions. We use two operation expressions using multi-key batch operate API: a write expression to write to a new bin the results of a server side computation, and to read the results of another server side computation. The specific expression operations are:
- Write expression: bin4 = (list of values from bin3 map) - (bin2 value)
- Read expression: min(bin4) - bin1
Note:
- Below, the write expression does not write a null list to
bin4
for the keyid-1
, therefore the read expression fails.
// start with a clean initialized test data
initializeTestData();
// create a batch of 3 keys in (test, batch-ops)
int NUM_KEYS = 3;
Key[] keys = new Key[NUM_KEYS];
// add keys
for (int i = 0; i < NUM_KEYS; i++) {
keys[i] = new Key(Namespace1, Set1, KeyPrefix + (i+1));
}
// create write and read expressions
// new list = list of values from bin3 map - bin2 value
Expression writeExp = Exp.build(
ListExp.removeByValue(Exp.intBin("bin2"),
MapExp.getByIndexRange(MapReturnType.VALUE,
Exp.val(0), Exp.val(100), Exp.mapBin("bin3"))));
// min(bin4) - bin1
Expression readExp = Exp.build(
Exp.sub(
ListExp.getByRank(ListReturnType.VALUE, Exp.Type.INT,
Exp.val(0), Exp.listBin("bin4")),
Exp.intBin("bin1")));
// send the multi-key operate batch request with write and read expressions
BatchResults bresults = client.operate(null, null, keys,
ExpOperation.write("bin4", writeExp, ExpWriteFlags.DEFAULT),
ExpOperation.read("read-exp", readExp, ExpReadFlags.DEFAULT));
// check if all operations succeeded
if (bresults.status) {
System.out.println("All batch operations succeeded.");
}
else {
System.out.println("Some batch operations failed.");
}
// process the BatchResults returned from the batch operation
for (int i = 0; i < bresults.records.length; i++) {
BatchRecord br = bresults.records[i];
Record rec = br.record;
if (br.resultCode == ResultCode.OK) { // check individual key status
Object wResult = rec.getValue("bin4"); // get op result for bin4
Object rResult = rec.getValue("read-exp"); // get op result for read-exp
System.out.format("Result[%d]: key: %s/%s/%s, write-exp result: %s, read-exp: %s\n",
i, br.key.namespace, br.key.setName, br.key.userKey,
wResult, rResult);
}
else { // error in individual key's operations
System.out.format("Result[%d]: key: %s, error: %s\n",
i, br.key, ResultCode.getResultString(br.resultCode));
}
}
Output:
Some batch operations failed.
Result[0]: key: test:batch-ops:id-1:ca0d67e46d385d7634d5c845f762f9e9cd66757e, error: Operation not applicable
Result[1]: key: test/batch-ops/id-2, write-exp result: null, read-exp: 8
Result[2]: key: test/batch-ops/id-3, write-exp result: null, read-exp: 7
Verify database state. Note test/batch-ops records: id-2
and id-3
are changed, but id-1
has no bin4
.
printRecords();
Output:
Records in database:
Namespace: test, set: batch-ops:
Key id-2: {bin1=2, bin2=20, bin3={1=10, 2=20}, bin4=[10]}
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}, bin4=[10, 20]}
Key id-1: {bin1=1, bin2=10, bin3={1=10}}
Namespace: test, set: batch-ops2:
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Key id-2: {bin1=2, bin2=20, bin3={1=10, 2=20}}
Key id-1: {bin1=1, bin2=10, bin3={1=10}}
Namespace: test2, set: batch-ops:
Key id-1: {bin1=1, bin2=10, bin3={1=10}}
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Key id-2: {bin1=2, bin2=20, bin3={1=10, 2=20}}
Namespace: test2, set: batch-ops2:
Key id-1: {bin1=1, bin2=10, bin3={1=10}}
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Key id-2: {bin1=2, bin2=20, bin3={1=10, 2=20}}
Using Filter Expressions with Batch Processing
Filter expressions are typically set in the BatchPolicy
. When set in operation-specific policy such as BatchWritePolicy
or BatchDeletePolicy
, a filter expression is ignored in the multi-key operation request, but in general batch operate request it takes precedence over the one set in the BatchPolicy
. This is in line with the goal of the two batch operations: multi-key operation is meant for the same operations and filter over multiple records, whereas general batch operate is meant for different operations and potentially different filters over individual records.
In the multi-key delete example below, we set two different filters:
- in
BatchPolicy
: 5 <= bin2 <= 25 selecting keysid-1
andid-2
, and - in
BatchDeletePolicy
: 15 <= bin2 <= 35 selecting keysid-2
andid-3
.
Note, only BatchPolicy
filter has effect as the operation deletes keys id-1
and id-2
. Also, filtered out
error does not stop batch execution.
// start with a clean initialized test data
initializeTestData();
// expression filter 5 <= bin2 <= 25
BatchPolicy bPolicy = new BatchPolicy(client.batchPolicyDefault);
bPolicy.filterExp = Exp.build( // set the filter in batch policy
Exp.and(
Exp.ge(Exp.intBin("bin2"), Exp.val(5)),
Exp.le(Exp.intBin("bin2"), Exp.val(25))));
// expression filter 15 <= bin2 <= 35
BatchDeletePolicy bdPolicy = new BatchDeletePolicy(client.batchDeletePolicyDefault);
bdPolicy.filterExp = Exp.build( // is ignored
Exp.and(
Exp.ge(Exp.intBin("bin2"), Exp.val(15)),
Exp.le(Exp.intBin("bin2"), Exp.val(35))));
// create a batch of 3 keys in (test, batch-ops)
int NUM_KEYS = 3;
Key[] keys = new Key[NUM_KEYS];
// add keys
for (int i = 0; i < NUM_KEYS; i++) {
keys[i] = new Key(Namespace1, Set1, KeyPrefix + (i+1));
}
// send the multi-key delete batch request
BatchResults bresults = client.delete(bPolicy, bdPolicy, keys);
// check if all operations succeeded
if (bresults.status) {
System.out.println("All batch operations succeeded.");
}
else {
System.out.println("Some batch operations failed.");
}
// process the BatchResults returned from the batch operation
for (int i = 0; i < bresults.records.length; i++) {
BatchRecord br = bresults.records[i];
Record rec = br.record;
if (br.resultCode == ResultCode.OK) { // check individual key status
System.out.format("Result[%d]: key: %s/%s/%s deleted.\n",
i, br.key.namespace, br.key.setName, br.key.userKey);
}
else { // error in individual key's operations
System.out.format("Result[%d]: key: %s, error: %s\n",
i, br.key, ResultCode.getResultString(br.resultCode));
}
}
Output:
Some batch operations failed.
Result[0]: key: test:batch-ops:id-1:ca0d67e46d385d7634d5c845f762f9e9cd66757e, error: Transaction filtered out
Result[1]: key: test/batch-ops/id-2 deleted.
Result[2]: key: test/batch-ops/id-3 deleted.
Inline Processing
Both namespaces in this notebook container are in-memory namespaces, and therefore batch operations are processed inline by default.
In a general batch operate, we will execute these operations on the same record bin:
- write+read
- read
- UDF write+read
- read
If these operations execute in sequence or "in line", we expect the following:
- The reads in 1 and 2 should return the same value.
- The read in 3 and 4 should return the same value.
// start with a clean initialized test data
initializeTestData();
// batch records array - each batch record holds a key and operations array
List<BatchRecord> batchRecords = new ArrayList<BatchRecord>();
// 1. write+read
Operation[] ops1 = Operation.array(
Operation.add(new Bin("bin2", Value.get(1))),
Operation.get("bin2"));
batchRecords.add(new BatchWrite(new Key(Namespace1, Set1, KeyPrefix + 1), ops1));
// 2. read
Operation[] ops2 = Operation.array(
Operation.get("bin2"));
batchRecords.add(new BatchRead(new Key(Namespace1, Set1, KeyPrefix + 1), ops2));
// 3. UDF write+read
batchRecords.add(new BatchUDF(new Key(Namespace1, Set1, KeyPrefix + 1),
UDFModule,
UDFFunction,
new Value[]{Value.get("bin2"), Value.get(1)}));
// 4. read
Operation[] ops4 = Operation.array(
Operation.get("bin2"));
batchRecords.add(new BatchRead(new Key(Namespace1, Set1, KeyPrefix + 1), ops4));
// execute the batch
BatchPolicy bPolicy = new BatchPolicy(client.batchPolicyDefault);
bPolicy.allowInline = false; // set true or false and examine results
try {
client.operate(bPolicy, batchRecords);
}
catch (AerospikeException e) {
System.out.format("%s", e);
}
// get and show results
// 1. write+read
int i = 0;
BatchRecord batchRec = batchRecords.get(i);
Record rec = batchRec.record;
if (batchRec.resultCode == ResultCode.OK) {
Object v1 = rec.getValue("bin2");
System.out.format("Result[%d]: bin2: %s\n", i, v1);
}
else {
System.out.format("Result[%d]: error: %s\n", i, ResultCode.getResultString(batchRec.resultCode));
}
// 2. read
i = 1;
batchRec = batchRecords.get(i);
rec = batchRec.record;
if (batchRec.resultCode == ResultCode.OK) {
Object v1 = rec.getValue("bin2");
System.out.format("Result[%d]: bin2: %s\n", i, v1);
}
else {
System.out.format("Result[%d]: error: %s\n", i, ResultCode.getResultString(batchRec.resultCode));
}
// 3. UDF write+read
i = 2;
batchRec = batchRecords.get(i);
rec = batchRec.record;
if (batchRec.resultCode == ResultCode.OK) {
HashMap<?,?> udfMap = (HashMap<?,?>)rec.getUDFResult(); // cast udf result to map returned by udf
Object v1 = udfMap.get("bin2"); // extract bin2 value from map // cast to map
System.out.format("Result[%d]: bin2: %s\n", i, v1);
}
else {
System.out.format("Result[%d]: error: %s\n", i, ResultCode.getResultString(batchRec.resultCode));
}
// 4. read
i = 3;
batchRec = batchRecords.get(i);
rec = batchRec.record;
if (batchRec.resultCode == ResultCode.OK) {
Object v1 = rec.getValue("bin2");
System.out.format("Result[%d]: bin2: %s\n", i, v1);
}
else {
System.out.format("Result[%d]: error: %s\n", i, ResultCode.getResultString(batchRec.resultCode));
}
Output:
Result[0]: bin2: [null, 11]
Result[1]: bin2: 10
Result[2]: bin2: 12
Result[3]: bin2: 12
Another Example
In this example, we have a large batch size. A bin in the same record is incremented if it has the expected value if processing is strictly inline, otherwise the write operation will generate an error.
Set the allowInline
flag to true or false and observe the results. Note the value of bin2
for the test/batch-ops/id-2
key. It should be the number of iterations + 10 if all operations successfully executed inline.
// start with a clean initialized test data
initializeTestData();
// batch records array - each batch record holds a key and operations array
List<BatchRecord> batchRecords = new ArrayList<BatchRecord>();
// create a batch of 100 - on same record in (test, batch-ops)
int NUM_ITERS = 100;
// create write and read expressions
// increment bin2 by (1 if bin2 == expected value else unknown)
int expectedBinVal = 10;
for (int i = 0; i < NUM_ITERS; i++) {
Expression writeExp = Exp.build(
Exp.add(Exp.intBin("bin2"),
Exp.cond(
Exp.eq(Exp.intBin("bin2"), Exp.val(expectedBinVal)), Exp.val(1),
Exp.val(0))));
//Exp.unknown())));
Operation[] ops = Operation.array(
ExpOperation.write("bin2", writeExp, ExpWriteFlags.DEFAULT));
batchRecords.add(new BatchWrite(new Key(Namespace1, Set1, KeyPrefix + 1), ops));
expectedBinVal++;
}
// execute the batch
BatchPolicy bPolicy = new BatchPolicy(client.batchPolicyDefault);
bPolicy.respondAllKeys = false;
bPolicy.allowInline = false; // set true or false and examine results
System.out.format("Batch of %d records, with flags allowInline=%b.\n",
NUM_ITERS, bPolicy.allowInline);
try {
client.operate(bPolicy, batchRecords);
}
catch (AerospikeException e) {
System.out.format("%s\n", e);
}
System.out.println("Done.");
printRecords();
Output:
Batch of 100 records, with flags allowInline=false.
Done.
Records in database:
Namespace: test, set: batch-ops:
Key id-2: {bin1=2, bin2=20, bin3={1=10, 2=20}}
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Key id-1: {bin1=1, bin2=11, bin3={1=10}}
Namespace: test, set: batch-ops2:
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Key id-2: {bin1=2, bin2=20, bin3={1=10, 2=20}}
Key id-1: {bin1=1, bin2=10, bin3={1=10}}
Namespace: test2, set: batch-ops:
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Key id-1: {bin1=1, bin2=10, bin3={1=10}}
Key id-2: {bin1=2, bin2=20, bin3={1=10, 2=20}}
Namespace: test2, set: batch-ops2:
Key id-1: {bin1=1, bin2=10, bin3={1=10}}
Key id-3: {bin1=3, bin2=30, bin3={1=10, 2=20, 3=30}}
Key id-2: {bin1=2, bin2=20, bin3={1=10, 2=20}}
Asynchronous Batch Processing
Setting up the event loops for asynchronous processing is somewhat involved. There is a separate tutorial that walks through the steps; please refer to the tutorial on asynchronous processing here.
The functionality of each synchronous APIs is replicated in two asynchronous variations:
- With a list listener callback: As the name suggests, the callback gets the list of all results from the batch in one invocation.
- With a record listener callback: As the name suggests, the callback is called with every individual record in the batch.
Readers are encouraged to take a synchronous batch API above and implement its async variants, borrowing the code from the async processing tutorial.
Batch Reads
The previously supported batch read APIs have not changed to ensure that the existing code using the batch read APIs does not break. However there are the following behavior changes:
- By default, all keys in the request will be processed even if there are failures. In the old batch reads, if a node sub-batch returns an error, the entire batch operation fails.
- Failures are returned separately for each record.
- Operate also takes read expressions, which were introduced in 5.7.
- Set names are always sent. The policy option sendSetName is ignored, and is deprecated.
The read-only batch operations are illustrated here.
The newly added batch "write" operate and general batch operate functions described earlier also provide the read capabilities.
Takeaways and Conclusion
Batch requests can be effective in improving throughput as they allow one or more operations to be submitted for multiple records. Now Aerospike supports all write operations, deletes, and UDF functions in a batch mode. In this notebook we discussed and described the new batch APIs with code examples.
Clean up
Remove tutorial data and close connection.
client.truncate(null, Namespace1, null, null);
client.truncate(null, Namespace2, null, null);
client.close();
System.out.println("Removed tutorial data and closed server connection.");
Output:
Removed tutorial data and closed server connection.
Further Exploration and Resources
Here are some links for further exploration
Resources
- Related notebooks
- Blog posts
- Aerospike Developer Hub
- Github repos
- Documentation
Next steps
Visit Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open, and select Upload.