Implementing SQL Operations: CREATE, UPDATE, DELETE
For an interactive Jupyter notebook experience:
This tutorial describes how SQL change operation equivalents can be performed in Aerospike.
This notebook requires the Aerospike Database running locally with Java kernel and Aerospike Java Client. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.
Introduction
In this notebook, we will see how specific modify statements in SQL can be implemented in Aerospike.
SQL is a widely known data access language. The examples in this notebook provide patterns for implementing specific SQL change equivalents in Aerospike. You should be able to understand them and find them useful even without a deep familiarity with SQL.
This notebook is the third in the SQL Operations series that consists of the following notebooks:
- Implementing SQL Operations: SELECT
- Implementing SQL Operations: Aggregate functions Part 1 and 2
- Implementing SQL Operations: UPDATE, CREATE, and DELETE (this notebook)
The specific topics and aggregate functions we discuss in this notebook include:
- Database change operations
- Table and Index change operations
- Record change operations
- Single record: single and multi-ops
- Multi-record operations
The purpose of this notebook is to illustrate Aerospike implementation for specific SQL operations. Check out Aerospike Presto Connector for ad-hoc SQL access to Aerospike data.
Prerequisites
This tutorial assumes familiarity with the following topics:
Initialization
Ensure database is running
This notebook requires that Aerospike Database is running.
import io.github.spencerpark.ijava.IJava;
import io.github.spencerpark.jupyter.kernel.magic.common.Shell;
IJava.getKernelInstance().getMagics().registerMagics(Shell.class);
%sh asd
Download and install additional components.
Install the Java client.
%%loadFromPOM
<dependencies>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<version>5.0.0</version>
</dependency>
</dependencies>
Connect to database and populate test data
The test data has 10 records with user-key "id-1" through "id-10", two integer bins (fields) "bin1" (1-10) and "bin2" (1001-1010) in the namespace "test" and set "sql-update".
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.WritePolicy;
AerospikeClient client = new AerospikeClient("localhost", 3000);
System.out.println("Initialized the client and connected to the cluster.");
String Namespace = "test";
String Set = "sql-update";
Output:
Initialized the client and connected to the cluster.
Open a Terminal Tab
In this tutorial, you will be executing many shell commands including Aerospike tools like asadm and aql in a terminal. Open a terminal tab by selecting File->Open from the notebook menu, and then New->Terminal.
Aerospike Data Model
Aerospike is a distributed key-value database with support for the document-oriented data model. An Aerospike Database can hold multiple "namespaces" which are equivalent to databases or schemas in the relational model. A namespace hold records (rows), organized in sets (tables) and are accessed using a unique key that serves as the record id. A record can contain one or more bins (columns), and a bin can hold a value of different data types.
The following simple data types are available in Aerospike.
- String: equivalent to text or varchar in SQL
- Integer: equivalent to Integer in SQL
- Double: equivalent to Double in SQL
- Blob/Bytes: equivalent to Blob in SQL
In addition, a bin can hold Collection Data Type (CDTs) such as List and Map, as well as complex data types such as Geospatial and HyperLogLog.
Sets and records do not conform to a schema. A set can have records with different bins, and a record can have different bins at different times. Moreover a bin is not limited to a specific type, and can hold multiple data types in different records. Type specific operations apply only to the matching bins of the right type.
For a detailed description of the data model see the Data Model overview.
Namespace Operations
CREATE Namespace
There is no API to create a namespace. A namespace is added through the config and requires a server restart.
Review the config file at /etc/aerospike/aerospike.conf by opening a terminal window through File->Open menu section followed up New->Terminal.
cat /etc/aerospike/aerospike.conf
As you can see, there is one namespace "test" configured in the server.
You can create a simple namespace "create-test" by following the following steps.
- Copy aerospike.conf to the "java" directory.
cp /etc/aerospike/aerospike.conf ~/notebooks/java/aerospike.conf
- Open aerospike.conf in the java directory from File->Open menu.
- Add these lines to add a new namespace "create-test" at the end.
namespace create-test { memory-size 1G }
- Save the file.
- In the terminal tab, stop and restart the server with the changed config.
/etc/init.d/aerospike stop; asd --config-file ~/notebooks/java/aerospike.conf
- In the terminal tab, examine the new namespace by issuing the following asadm command.
asadm -e "info namespace"
After the new namespace is successfully created, insert a few records by executing the following cell.
// define a convenience functions to examine all records in a namespace and set
import com.aerospike.client.Record;
import com.aerospike.client.ScanCallback;
import com.aerospike.client.policy.ScanPolicy;
public class ScanParallel implements ScanCallback {
public void scanCallback(Key key, Record record) {
System.out.format("namespace=%s set=%s key=%s bins=%s\n", key.namespace, key.setName, key.userKey, record.bins);
}
}
void examineRecords(String ns, String set) {
client.scanAll(null, ns, set, new ScanParallel());
}
// insert records in the new namespace and verify by reading back
String NamespaceCreateTest = "create-test";
String SetCreateTest = "test-set";
WritePolicy wpolicy = new WritePolicy();
wpolicy.sendKey = true;
for (int i = 1; i <= 10; i++) {
Key key = new Key(NamespaceCreateTest, SetCreateTest, "id-"+i);
Bin bin1 = new Bin(new String("bin1"), i);
Bin bin2 = new Bin(new String("bin2"), 1000+i);
client.put(wpolicy, key, bin1, bin2);
}
System.out.format("Inserted 10 records in ns=%s set=%s.\n", NamespaceCreateTest, SetCreateTest);
System.out.format("Retrieving all records in ns=%s.\n", NamespaceCreateTest);
examineRecords(NamespaceCreateTest, null); // null value for a set returns all records in the namespace
Output:
Inserted 10 records in ns=create-test set=test-set.
Retrieving all records in ns=create-test.
namespace=create-test set=test-set key=id-10 bins={bin1=10, bin2=1010}
namespace=create-test set=test-set key=id-5 bins={bin1=5, bin2=1005}
namespace=create-test set=test-set key=id-1 bins={bin1=1, bin2=1001}
namespace=create-test set=test-set key=id-7 bins={bin1=7, bin2=1007}
namespace=create-test set=test-set key=id-3 bins={bin1=3, bin2=1003}
namespace=create-test set=test-set key=id-8 bins={bin1=8, bin2=1008}
namespace=create-test set=test-set key=id-9 bins={bin1=9, bin2=1009}
namespace=create-test set=test-set key=id-4 bins={bin1=4, bin2=1004}
namespace=create-test set=test-set key=id-2 bins={bin1=2, bin2=1002}
namespace=create-test set=test-set key=id-6 bins={bin1=6, bin2=1006}
TRUNCATE Namespace
The truncate API removes all records in a set or the entire namespace. (If the namespace has records in a "null" set, those must be removed through iterating over them using a UDF.)
truncate(InfoPolicy policy, String ns, String set, Calendar beforeLastUpdate)
The following cell truncates all sets in the namespace "create-test", and verifies there are no records in the namespace after truncation.
// truncate the namespace
client.truncate(null, NamespaceCreateTest, null, null); // null set value truncates all sets in the namespace
// examine records in the namespace - should be empty
System.out.format("Retrieving all records in namespace %s.\n", NamespaceCreateTest);
examineRecords(NamespaceCreateTest, null); // null set value returns all records in the namespace
System.out.format("Done.");;
Output:
Retrieving all records in namespace create-test.
Done.
DELETE Namespace
There is no API to delete a namespace. A namespace has one or more dedicated storage devices, and they must be wiped clean to delete the namespace. Since "create-test" is an in-memory namespace, you can remove it from the config and restart the server to delete it. Verify deletion with the asadm command "info namespace".
- In the terminal tab, simply restart the server using the original config (without the "create-test" namespace).
/etc/init.d/aerospike restart
- In the terminal tab, examine the namespace is dropped by issuing the following asadm command.
asadm -e "info namespace"
Any operation on a deleted namespace will return an error.
import com.aerospike.client.AerospikeException;
try {
examineRecords(NamespaceCreateTest, null); // null value for a set returns all records in the namespace
}
catch (AerospikeException ae) {
System.out.format("Error in retrieving records from namespace %s: %s.\n", NamespaceCreateTest, ae.getMessage());
}
Output:
Error in retrieving records from namespace create-test: Error 20,1,30000,0,5,BB9020011AC4202 127.0.0.1 3000: Namespace not found.
Set Operations
CREATE Set
There is no explicit operation to create a set. A set is created when the first record is inserted in the set.
Create a new set "create-set" in the namespace "test" by inserting a record.
String SetCreateTest = "create-set";
Key key = new Key(Namespace, SetCreateTest, "id-100");
Bin bin1 = new Bin(new String("bin1"), 100);
Bin bin2 = new Bin(new String("bin2"), 1000+100);
client.put(wpolicy, key, bin1, bin2);
Examine the sets in the namespace by issuing the following command in the terminal tab.
aql -c "show sets"
ALTER Set
A set is schemaless, and can hold records that have different schemas or bins. A bin has no type associated with it, and can hold values of any type. Further, there are no constraints on bin values such as not null unique, default, primary/foreign key, and so on. Therefore ALTER operation on a set to modify its schema is not needed.
The following cell populates "create-set" with two records with different schemas.
Key key = new Key(Namespace, SetCreateTest, "id-101");
Bin bin1 = new Bin(new String("bin1"), "string");
Bin bin3 = new Bin(new String("bin3"), "new bin");
client.put(wpolicy, key, bin1, bin3);
Key key = new Key(Namespace, SetCreateTest, "id-102");
Bin bin1 = new Bin(new String("bin1"), 111.222);
Bin bin2 = new Bin(new String("bin2"), 0.1);
Bin bin4 = new Bin(new String("bin4"), "");
client.put(wpolicy, key, bin1, bin2, bin4);
examineRecords(Namespace, SetCreateTest);
Output:
namespace=test set=create-set key=id-101 bins={bin1=string, bin3=new bin}
namespace=test set=create-set key=id-100 bins={bin1=100, bin2=1100}
namespace=test set=create-set key=id-102 bins={bin1=111.222, bin2=0.1, bin4=}
TRUNCATE Set
All records in a set can be truncated using the truncate API:
truncate(InfoPolicy policy, String ns, String set, Calendar beforeLastUpdate)
The following truncates all records in set "create-set" in namespace "create-test".
client.truncate(null, Namespace, SetCreateTest, null);
Verify all records in the set have been deleted.
System.out.format("Retrieving all records in set=%s.\n", SetCreateTest);
examineRecords(Namespace, SetCreateTest);
System.out.format("Done.");;
Output:
Retrieving all records in set=create-set.
Done.
DROP Set
There is no notion of deleting a set as a set is just a name that a record is tagged with. The namespace must be deleted to remove the set name.
Index Operations
CREATE Index
An index is created on a bin for a specific value type. Integer, string, and geojson types are currently supported for indexing. An index supports equality and range queries for integer values, equality queries on string values, and containment queries on geojson values.
createIndex(Policy policy, String namespace, String setName, String indexName, String binName, IndexType indexType)
An index can also be created on a collection type value (List and Map) for fast access within the collection by item value (List), or key and range values (Map).
Below we illustrate creating an integer index on bin1.
import com.aerospike.client.policy.Policy;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.task.IndexTask;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.ResultCode;
String IndexName = "test_sql_update_bin1_number_idx";
Policy policy = new Policy();
policy.socketTimeout = 0; // Do not timeout on index create.
try {
IndexTask task = client.createIndex(policy, Namespace, Set, IndexName,
"bin1", IndexType.NUMERIC);
task.waitTillComplete();
}
catch (AerospikeException ae) {
if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {
throw ae;
}
}
System.out.format("Created number index %s on ns=%s set=%s bin=%s.",
IndexName, Namespace, Set, "bin1");;
Output:
Created number index test_sql_update_bin1_number_idx on ns=test set=sql-update bin=bin1.
Examine the new index in the namespace by issuing the following command in the terminal tab.
aql -c "show indexes"
DROP Index
The API dropIndex deletes an index.
dropIndex(Policy policy, String namespace, String setName, String indexName)
client.dropIndex(null, Namespace, Set, IndexName);
System.out.format("Dropped index %s on ns=%s set=%s bin=%s.",
IndexName, Namespace, Set, "bin1");;
Output:
Dropped index test_sql_update_bin1_number_idx on ns=test set=sql-update bin=bin1.
Examine the index is dropped by issuing the following command in the terminal tab.
aql -c "show indexes"
Single Record Operations
<pre>
INSERT INTO namespace.set VALUES (id=key, bin=value, ...)
UPDATE namespace.set SET (bin=value, ...) WHERE id=key
DELETE FROM namespace.set WHERE id=key
</pre>
INSERT and UPDATE Record
The put operation handles Create (Insert) and Update. The "record-exists-action" specified within the write-policy defines the operation semantics when the record already exists, with the following variants:
- create-only: Create if record doesn't exist already.
- update: Create if record doesn't exist, update otherwise.
- update-only: Update only, if record exists.
- replace: Create if record doesn't exist, replace otherwise.
- replace-only: Replace only, if record exists.
Note, there is no replace operation in SQL.
<pre>
put(WritePolicy policy, Key key, Bin... bins)
</pre>
Below we illustrate various record-exists-action options for create/update/replace of a single record.
import com.aerospike.client.policy.RecordExistsAction;
Key key = new Key(Namespace, Set, "id-100");
Bin bin1 = new Bin(new String("bin1"), 100);
Bin bin2 = new Bin(new String("bin2"), 1000+100);
// ensure the record doesn't exist by removing it
try {
client.delete(wpolicy, key);
}
catch (AerospikeException ae) {
// ignore if doesn't exist
}
// update-only must fail since the record doesn't exist
try {
wpolicy.recordExistsAction = RecordExistsAction.UPDATE_ONLY;
client.put(wpolicy, key, bin1, bin2);
System.out.format("update-only succeeded");
}
catch (AerospikeException ae) {
System.out.format("Error on update-only: %s.\n", ae.getMessage());
}
// replace-only must fail since the record doesn't exist
try {
wpolicy.recordExistsAction = RecordExistsAction.REPLACE_ONLY;
client.put(wpolicy, key, bin1, bin2);
System.out.format("replace-only succeeded");
}
catch (AerospikeException ae) {
System.out.format("Error on replace-only: %s.\n", ae.getMessage());
}
// create-only should succeed since the record doesn't exist
try {
wpolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
client.put(wpolicy, key, bin1, bin2);
Record record = client.get(null, key);
System.out.format("Create-only succeeded: key=%s bins=%s\n", key.userKey, record.bins);
}
catch (AerospikeException ae) {
System.out.format("Error on create-only: %s.\n", ae.getMessage());
}
// update should succeed irrespective of the record's existence
try {
Bin bin3 = new Bin(new String("bin3"), "new bin");
wpolicy.recordExistsAction = RecordExistsAction.UPDATE;
client.put(wpolicy, key, bin3);
Record record = client.get(null, key);
System.out.format("Update succeeded: key=%s bins=%s\n", key.userKey, record.bins);
}
catch (AerospikeException ae) {
System.out.format("Error: %s.\n", ae.getMessage());
}
// replace should succeed irrespective of the record's existence
try {
Bin bin4 = new Bin(new String("bin4"), "another bin");
wpolicy.recordExistsAction = RecordExistsAction.REPLACE;
client.put(wpolicy, key, bin4);
Record record = client.get(null, key);
System.out.format("Replace succeeded: key=%s bins=%s\n", key.userKey, record.bins);
}
catch (AerospikeException ae) {
System.out.format("Error: %s.\n", ae.getMessage());
}
Output:
Error on update-only: Error 2,1,30000,0,0,BB9020011AC4202 127.0.0.1 3000: Key not found.
Error on replace-only: Error 2,1,30000,0,0,BB9020011AC4202 127.0.0.1 3000: Key not found.
Create-only succeeded: key=id-100 bins={bin1=100, bin2=1100}
Update succeeded: key=id-100 bins={bin1=100, bin2=1100, bin3=new bin}
Replace succeeded: key=id-100 bins={bin4=another bin}
Type Specific Updates
Aerospike allows many type specific update operations. For integer and string types, they include:
UPDATE namespace.set SET (bin = bin + intval) WHERE id=key add(WritePolicy policy, Key key, Bin... bins) UPDATE namespace.set SET (bin = bin + strval) WHERE id=key append(WritePolicy policy, Key key, Bin... bins) UPDATE namespace.set SET (bin = strval + bin) WHERE id=key prepend(WritePolicy policy, Key key, Bin... bins)
Key key = new Key(Namespace, Set, "id-100");
Bin bin1 = new Bin(new String("bin1"), 100);
Bin bin2 = new Bin(new String("bin2"), "John");
wpolicy.recordExistsAction = RecordExistsAction.REPLACE;
client.put(wpolicy, key, bin1, bin2);
Record record = client.get(null, key);
System.out.format("Record: key=%s bins=%s\n", key.userKey, record.bins);
// add
bin1 = new Bin(new String("bin1"), 1);
client.add(null, key, bin1);
Record record = client.get(null, key);
System.out.format("After add: key=%s bins=%s\n", key.userKey, record.bins);
// append
bin2 = new Bin(new String("bin2"), " Doe");
client.append(null, key, bin2);
Record record = client.get(null, key);
System.out.format("After append: key=%s bins=%s\n", key.userKey, record.bins);
//prepend
bin2 = new Bin(new String("bin2"), "Mr ");
client.prepend(null, key, bin2);
Record record = client.get(null, key);
System.out.format("After prepend: key=%s bins=%s\n", key.userKey, record.bins);;
Output:
Record: key=id-100 bins={bin1=100, bin2=John}
After add: key=id-100 bins={bin1=101, bin2=John}
After append: key=id-100 bins={bin1=101, bin2=John Doe}
After prepend: key=id-100 bins={bin1=101, bin2=Mr John Doe}
In addition, List, Map, Geospatial, HyperLogLog, and Blob types allow a wide range of update operations. Refer to the tutorials on List and Map, and
documentation.DELETE Record
DELETE FROM namespace.set WHERE id=key
delete(WritePolicy policy, Key key)
Create a record, delete it, and verify deletion.
// create a record
Key key = new Key(Namespace, Set, "id-100");
Bin bin1 = new Bin(new String("bin1"), 100);
Bin bin2 = new Bin(new String("bin2"), 1000+100);
client.put(wpolicy, key, bin1, bin2);
Record record = client.get(null, key);
System.out.format("Record: key=%s bins=%s\n", key.userKey, record.bins);
// delete the record
client.delete(wpolicy, key);
System.out.format("Record deleted: key=%s\n", key.userKey);
// verify it's deleted
Record record = client.get(null, key);
if (record == null) {
System.out.format("No record found for key=%s \n", key.userKey);;
}
else {
System.out.format("Record: key=%s bins=%s\n", key.userKey, record.bins);;
}
Output:
Record: key=id-100 bins={bin1=100, bin2=1100}
Record deleted: key=id-100
No record found for key=id-100
Multi-Ops
Multiple single record updates are possible through operate.
UPDATE namespace.set SET (bin=value, ...) WHERE id=key operate(WritePolicy policy, Key key, Operation... operations)
Note, the operate function allows read (SELECT) operations to be included in the list, and executes and returns the results in the order they are specified. See this tutorial that illustrates multi-ops.
The following code creates a record with an integer and a string bin, and then in one operate call increments the integer bin, appends to the string bin, and reads the updated record back.
import com.aerospike.client.Operation;
// create a record
Key key = new Key(Namespace, Set, "id-100");
Bin bin1 = new Bin(new String("bin1"), 100);
Bin bin2 = new Bin(new String("bin2"), "John");
client.put(wpolicy, key, bin1, bin2);
Record record = client.get(null, key);
System.out.format("Record: key=%s bins=%s\n", key.userKey, record.bins);
// Add integer, update string, read record in one operate call
bin1 = new Bin(new String("bin1"), 1);
bin2 = new Bin(new String("bin2"), " Doe");
record = client.operate(null, key, Operation.add(bin1), Operation.append(bin2), Operation.get());
System.out.format("Operate results: key=%s bins=%s\n", key.userKey, record.bins);;
Output:
Record: key=id-100 bins={bin1=100, bin2=John}
Operate results: key=id-100 bins={bin1=101, bin2=John Doe}
Multi-Record Updates
UPDATE namespace.set SET (bin=value, ...) WHERE condition
DELETE FROM namespace.set WHERE condition
Multi-record updates and deletes are possibly by specifying the WHERE condition using the query filter (specified in a Statement object) and expression filter (specified in the write policy). If both filters are specified, the two are ANDed. The query filter operates on one bin and requires a supporting secondary index to exist on the bin. The remaining parts of the condition must be specified in the expression filter. For best performance, use the most selective condition in a query filter when possible.
Note, there is no API for batch insertion of multiple records.
Updates and deletes can be done in two ways:
- List of bin updates: A list of update operations can be specified, each operating on a bin.
- User Defined Function (UDF): Record-oriented UDFs implement arbitrary logic in a Lua function that is registered with the server and invoked through an API call.
List of Bin Updates
The list can be specified in two ways:
- As part of the Statement object where the query (index) filter is also specified.
Statement::setOps(Operation[] ops] AerospikeClient::execute(WritePolicy policy, Statement statement)
- As an operation list parameter in the execute call.
AerospikeClient::execute(WritePolicy policy, Statement statement, Operation[] ops)
Note:
- An operation list in the Statement object cannot have a read operation, only updates. If read operations are present, the entire execute call fails.
- Updates from Statement are ignored if there is an operation list specified as a parameter to execute.
First we will populate data, create secondary index, and define a convenience function, and then illustrate how to execute bin updates using the two methods.
// populate data, create secondary index, and define a convenience function
// add records with keys "id-1" to "id-10" and
// bins bin1 (integer values 1-10) and bin2 (integer values 1000-1010).
WritePolicy wpolicy = new WritePolicy();
wpolicy.sendKey = true;
for (int i = 1; i <= 10; i++) {
Key key = new Key(Namespace, Set, "id-"+i);
Bin bin1 = new Bin(new String("bin1"), i);
Bin bin2 = new Bin(new String("bin2"), 1000+i);
client.put(wpolicy, key, bin1, bin2);
}
System.out.format("Test data populated.\n");;
// create an integer secondary index on bin1
import com.aerospike.client.policy.Policy;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.task.IndexTask;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.ResultCode;
String IndexName = "test_sql_update_bin1_number_idx";
Policy policy = new Policy();
policy.socketTimeout = 0; // Do not timeout on index create.
try {
IndexTask task = client.createIndex(policy, Namespace, Set, IndexName,
"bin1", IndexType.NUMERIC);
task.waitTillComplete();
}
catch (AerospikeException ae) {
if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {
throw ae;
}
}
System.out.format("Created number index %s on ns=%s set=%s bin=%s.\n",
IndexName, Namespace, Set, "bin1");;
// define convenience functions
import com.aerospike.client.query.Statement;
import com.aerospike.client.query.Filter;
import com.aerospike.client.Record;
void printRecordRange(String header, String namespace, String set, String bin, int rangeLow, int rangeHigh) {
System.out.format("%s\n", header);
Statement stmt = new Statement();
stmt.setNamespace(namespace);
stmt.setSetName(set);
stmt.setFilter(Filter.range(bin, rangeLow, rangeHigh));
RecordSet rs = client.query(null, stmt);
while (rs.next()) {
Key key = rs.getKey();
Record record = rs.getRecord();
System.out.format("key=%s bins=%s\n", key.userKey, record.bins);
}
rs.close();
}
Output:
Test data populated.
Created number index test_sql_update_bin1_number_idx on ns=test set=sql-update bin=bin1.
import com.aerospike.client.Value;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.exp.Exp;
import com.aerospike.client.Operation;
import com.aerospike.client.task.ExecuteTask;
// 1. list of updates specified in Statement
printRecordRange("initial state:", Namespace, Set, "bin1", 4, 7);
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setFilter(Filter.range("bin1", 4, 7));
Operation ops[] = { new Operation(Operation.Type.ADD, "bin2", Value.get(1)),
Operation.put(new Bin("bin3", "new1"))};
stmt.setOperations(ops);
ExecuteTask task = client.execute(null, stmt);
task.waitTillComplete(3000, 3000);
printRecordRange("After executing updates in Statement with filter:", Namespace, Set, "bin1", 4, 7);
// 2. list of updates as argument to execute
printRecordRange("initial state:", Namespace, Set, "bin1", 4, 7);
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setFilter(Filter.range("bin1", 4, 7));
ExecuteTask task = client.execute(null, stmt, new Operation(Operation.Type.ADD, "bin2", Value.get(1)),
Operation.put(new Bin("bin3", "new2")));
task.waitTillComplete(3000, 3000);
printRecordRange("After executing updates in Ops list argument:", Namespace, Set, "bin1", 4, 7);
// 3. updates in Statement as well as in the execute argument - Statement operations are ignored
printRecordRange("initial state:", Namespace, Set, "bin1", 4, 7);
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setFilter(Filter.range("bin1", 4, 7));
Operation ops[] = { new Operation(Operation.Type.ADD, "bin2", Value.get(1))};
stmt.setOperations(ops);
WritePolicy policy = new WritePolicy();
policy.maxRetries = 0;
ExecuteTask task = client.execute(policy, stmt, Operation.put(new Bin("bin3", "new3")));
task.waitTillComplete(3000, 3000);
printRecordRange("After executing updates in Statement and Ops list argument (Statement updates are ignored):",
Namespace, Set, "bin1", 4, 7);
Output:
initial state:
key=id-4 bins={bin1=4, bin2=1004}
key=id-5 bins={bin1=5, bin2=1005}
key=id-6 bins={bin1=6, bin2=1006}
key=id-7 bins={bin1=7, bin2=1007}
After executing updates in Statement with filter:
key=id-4 bins={bin1=4, bin2=1005, bin3=new1}
key=id-5 bins={bin1=5, bin2=1006, bin3=new1}
key=id-6 bins={bin1=6, bin2=1007, bin3=new1}
key=id-7 bins={bin1=7, bin2=1008, bin3=new1}
initial state:
key=id-4 bins={bin1=4, bin2=1005, bin3=new1}
key=id-5 bins={bin1=5, bin2=1006, bin3=new1}
key=id-6 bins={bin1=6, bin2=1007, bin3=new1}
key=id-7 bins={bin1=7, bin2=1008, bin3=new1}
After executing updates in Ops list argument:
key=id-4 bins={bin1=4, bin2=1006, bin3=new2}
key=id-5 bins={bin1=5, bin2=1007, bin3=new2}
key=id-6 bins={bin1=6, bin2=1008, bin3=new2}
key=id-7 bins={bin1=7, bin2=1009, bin3=new2}
initial state:
key=id-4 bins={bin1=4, bin2=1006, bin3=new2}
key=id-5 bins={bin1=5, bin2=1007, bin3=new2}
key=id-6 bins={bin1=6, bin2=1008, bin3=new2}
key=id-7 bins={bin1=7, bin2=1009, bin3=new2}
After executing updates in Statement and Ops list argument (Statement updates are ignored):
key=id-4 bins={bin1=4, bin2=1006, bin3=new3}
key=id-5 bins={bin1=5, bin2=1007, bin3=new3}
key=id-6 bins={bin1=6, bin2=1008, bin3=new3}
key=id-7 bins={bin1=7, bin2=1009, bin3=new3}
Using UDF
A record oriented User Defined Function (UDF) can be used for a single record or multi-record updates.
UPDATE namespace.set SET (bin=fn(args), ...) WHERE condition
Single Record:
Object execute(WritePolicy policy, Key key, String packageName, String functionName, Value... functionArgs)
Multi-Record:
ExecuteTask execute(WritePolicy policy, Statement statement, String packageName, String functionName,
Value... functionArgs)
A multi-record update is illustrated below. The Statement object is used to specify a predicate or query filter to identify records. Any updates specified in Statement are ignored. The WHERE condition can also be specified using an expression filter as shown below.
Note:
- Cannot use updates in Statement as they will be ignored. Statement is used only to specify a query filter that leverages a secondary index.
- Cannot use UDF in Statement for updates; for updates, UDF must be specified using arguments of execute call.
Create UDF
Examine the following Lua code that updates two bins by adding and appending the values provided.
-- update the specified bins by adding and appending the values provided function add_append(rec, binName1, addVal, binName2, appendVal) rec[binName1] = rec[binName1] + addVal rec[binName2] = rec[binName2] .. appendVal aerospike:update(rec) end
Register UDF
Note, the UDF function is expected to be in "update_example.lua" file under "udf" directory.
Register the UDF with the server by executing the following code cell. The function invalidates the cache, removes the currently registered module, and registers the latest version.
import com.aerospike.client.policy.Policy;
import com.aerospike.client.task.RegisterTask;
import com.aerospike.client.Language;
import com.aerospike.client.lua.LuaConfig;
import com.aerospike.client.lua.LuaCache;
LuaConfig.SourceDirectory = "../udf";
String UDFFile = "update_example.lua";
String UDFModule = "update_example";
void registerUDF() {
// clear the lua cache
LuaCache.clearPackages();
Policy policy = new Policy();
// remove the current module, if any
client.removeUdf(null, UDFFile);
RegisterTask task = client.register(policy, LuaConfig.SourceDirectory+"/"+UDFFile,
UDFFile, Language.LUA);
task.waitTillComplete();
System.out.format("Registered the UDF module %s.", UDFFile);;
}
registerUDF();
Output:
Registered the UDF module update_example.lua.
Execute UDF
Run the UDF on multiple records using an expression filter to specify records with bin1 values in the range 4-7 (both inclusive).
printRecordRange("initial state:", Namespace, Set, "bin1", 4, 7);
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
WritePolicy policy = new WritePolicy();
policy.filterExp = Exp.build(
Exp.and(
Exp.ge(Exp.intBin("bin1"), Exp.val(4)),
Exp.le(Exp.intBin("bin1"), Exp.val(7))));
policy.maxRetries = 0;
ExecuteTask task = client.execute(policy, stmt, UDFModule, "add_append",
Value.get("bin2"), Value.get(1), Value.get("bin3"), Value.get("!!!"));
task.waitTillComplete(3000, 3000);
printRecordRange("After executing UDF add_append:",
Namespace, Set, "bin1", 4, 7);
Output:
initial state:
key=id-4 bins={bin1=4, bin2=1006, bin3=new3}
key=id-5 bins={bin1=5, bin2=1007, bin3=new3}
key=id-6 bins={bin1=6, bin2=1008, bin3=new3}
key=id-7 bins={bin1=7, bin2=1009, bin3=new3}
After executing UDF add_append:
key=id-4 bins={bin1=4, bin2=1007, bin3=new3!!!}
key=id-5 bins={bin1=5, bin2=1008, bin3=new3!!!}
key=id-6 bins={bin1=6, bin2=1009, bin3=new3!!!}
key=id-7 bins={bin1=7, bin2=1010, bin3=new3!!!}
Salient Points
- A combination of query (index) filter and expression filter is used to define selection criteria. If both filters are specified, the two are ANDed. The query filter spans one bin and requires a supporting secondary index to exist on the bin.
- Cannot use UDF in the Statement object for updates (they are used only for streaming functions to evaluate aggregates).
- Updates if specified in the Statement object are ignored when used in conjunction with updates in execute argument (either Ops or UDF).
Managing Durability and Expiration
Updates by default are applied in memory, but can be made more durable or persistent by specifying "commit-to-device" option in the namespace configuration.
A record by default is created never to expire, but a different time-to-live (ttl) can be specified. An expired record is automatically removed and its space reclaimed, thus relieving the lifecycle management burden in applications where it makes sense.
Leveraging CDTs
Use of collection data types (CDTs) like List and Map to store objects or records is a common pattern in Aerospike and is recommended for performance. Refer to this tutorial on modeling with lists.
Takeaways and Conclusion
Many developers that are familiar with SQL would like to see how SQL operations translate to Aerospike. We looked at how to implement various modification statements. This should be generally useful irrespective of the reader's SQL knowledge. While the examples here use synchronous execution, many operations can also be performed asynchronously.
Clean up
Remove tutorial data and close connection.
client.dropIndex(null, Namespace, Set, IndexName);
client.truncate(null, Namespace, null, null);
client.close();
System.out.println("Removed tutorial data and closed server connection.");
Output:
Removed tutorial data and closed server connection.
Further Exploration and Resources
Here are some links for further exploration
Resources
- Related notebooks
- Other notebooks in the SQL series on 1) SELECT, 2) Aggregates - Part 1 and Part 2.
- Working with Lists
- Working with Maps
- Aerospike Presto Connector
- Blog post
- Github repos
- Documentation
Next steps
Visit Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open in the notebook menu, and select Upload.