Understanding Asynchronous Operations
For an interactive Jupyter notebook experience:
This tutorial describes asynchronous operations in Aerospike: why they are used, the architecture, and how to program with async operations.
This notebook requires the Aerospike Database running locally with Java kernel and Aerospike Java Client. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.
Introduction
In this notebook, we will see the benefits, design, and specifics of programming with asynchronous operations in Aerospike.
Aerospike provides asynchronous APIs for many operations. We will describe the benefits of using async operations and key abstractions in the client related to async requests. After covering the theoretical ground, we will show how it all comes together with specific code examples.
The notebook tutorial has two parts:
- architecture and concepts, and
- coding examples.
The main topics include:
- Execution models in Aerospike
- Benefits of async
- Key concepts
- Framework for async programming
- Coding examples
Prerequisites
This tutorial assumes familiarity with the following topics:
Initialization
Ensure database is running
This notebook requires that Aerospike Database is running.
import io.github.spencerpark.ijava.IJava;
import io.github.spencerpark.jupyter.kernel.magic.common.Shell;
IJava.getKernelInstance().getMagics().registerMagics(Shell.class);
%sh asd
Download and install additional components.
Install the Aerospike Java client and the Java Netty package, which is described later in the notebook.
%%loadFromPOM
<dependencies>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<version>5.0.0</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.53.Final</version>
<scope>compile</scope>
</dependency>
</dependencies>
Constants and Convenience Functions
We will use some constants and convenience functions throughout this tutorial, including the namespace "test" and set "async-ops".
final String Namespace = "test";
final String Set = "async-ops";
// truncate data, close client and event loops - called multiple times to initialize with different options
// described in greater detail later
void Cleanup() {
try {
client.truncate(null, Namespace, Set, null);
}
catch (AerospikeException e) {
// ignore
}
client.close();
eventLoops.close();
};
Open a Terminal Tab
You may execute shell commands including Aerospike tools like
aql andasadm in the terminal tab. Open a terminal tab by selecting File-\>Open from the notebook menu, and then New-\>Terminal.Synchronous, Asynchronous, and Background Operations
An application uses the Aerospike client library (aka the client) to interact with Aerospike Database. The client sets up a connection to the appropriate server node and sends in a request for execution in one of the following modes:
Synchronous: The request thread makes the request, waits for the response, and processes the response upon arrival.
Asynchronous: The request thread submits one or more requests, and results are processed in one or more callback thread(s) as they arrive.
Background: The request thread submits the request and operation (or task) is completed in the background. The submission returns immediately, while the actual operation executes separately. The application can check the completion status of the task, and after it is completed may examine the results in the database with one or more separate requests.
Note, a background operation may be considered a special type of asynchronous operation, and it is applicable only for updates in Aerospike. By asynchronous operations we refer to only those that return results in a callback.
Asynchronous Operations For Better Resource Efficiency
During the time a request is sent to the server and the result arrives (“request latency”), the client and application need not wait idly if a high throughput is the goal. A higher throughput can be achieved through concurrent requests.
- Synchronous: The application can spawn multiple threads and process multiple requests in parallel, one per thread at a time.
- Asynchronous: The application can process requests asynchronously by submitting them in parallel without waiting for the results. The results are processed as they arrive in a different “callback” thread. An async request uses a dedicated connection to the server.
- Pipeline: Multiple requests could be sent over the same connection to the same server node, and their results received over the same connection. Thus there is greater sharing of threads and connections across multiple requests. Aerospike currently does not support pipeline processing.
In many cases, asynchronous processing can be more resource efficient and can deliver better throughput than multi-threaded synchronous processing because threads have memory and CPU (context-switch) overhead and their number may be limited by the OS.
On the other hand, the asynchronous model is more complex to program and debug. The application should make judicious use of synchronous, asynchronous, and background requests. The client can perform different type of commands in a single instance.
It should be noted that background operations when appropriate would typically deliver superior throughput especially in a non-UDF invocation.
Supported Asynchronous Operations
Most CRUD operations have the async variant.
- Single record operations
- add, append, delete, apply(udf), get, getHeader, operate, prepend, put, touch
- Batch operations:
- exists (array listener and sequence listener), get (batch list and batch sequence listener), get (record array and record sequence listener), getHeader
- Query/scan: Callback handles a series of records, a single record at
a time.
- query, queryPartitions
- scanAll, scanPartitions
- Metadata: createIndex, dropIndex
- Operational: info
Please refer to the API documentation for details.
Execution Model
The async methods take two additional arguments than their sync variants: the “event loops” and “listener (callback)”. See the code in Async Framework section below.
- Event loops: An event-loop represents the loop of "submit a request" and "asynchronously process the result" for concurrent processing of events or requests. Multiple event loops are used to leverage multiple CPU cores.
- Listener: The listener encapsulates the processing of results.
- Listener types: Depending on the expected number of records in the result and whether they arrive at once or individually, different listener types are to be used such as a single record listener, a record array listener, or a record sequence listener.
- Completion handlers: A single record or record array is processed with the success or failure handler. In a record sequence listener, each record is processed with a "record" handler, whereas the success handler is called to mark the end of the sequence.
Application Call Sequence
The application is responsible for spreading requests evenly across event loops as well as throttling the rate of requests if the request rate can exceed the client or server capacity. The call sequence involves these steps (see the code in Async Framework section below):
- Initialize event loops.
- Implement the listener with success and failure handlers.
- Submit requests across event loops, throttling to stay below maximum outstanding requests limit.
- Wait for all outstanding requests to finish.
Understanding Event Loops
Let's look at the key concepts relating to event loops. As described above, an event loop represents concurrent submit-callback processing of requests. See the code in Async Framework section below.
Number of event loops: In order to maximize parallelism of the client hardware, as many event loops are created as the number of cores dedicated for the Aerospike application. An event pool is aligned with a CPU core, not to a server node or a request type.
Concurrency level: The maximum concurrency level in each event loop depends on the effective server throughput seen by the client, and in aggregate may not exceed it. A larger value would result in request timeouts and other failures.
Connection pools and event loops: Connection pools are allocated on a per node basis, and are independent of event pools. When an async request needs to connect to a node, it uses a connection from the node’s connection pool only for the duration of the request and then releases it.
Connection pool size: Concurrency across all loops must be supported by the number of connections in the connection pool. The connection pool per node should be set equal to or greater than the total number of outstanding requests across all event loops (because all requests may go to the same node in the extreme case).
Delay queue buffer: To buffer a temporary mismatch in processing and submission rates, there is a delay queue buffer in front of an event loop where requests are held until an async request slot becomes available in the event loop. The queued request is automatically assigned to a slot and processed without involvement of the application.
Throttling: The delay queue cannot buffer a long running mismatch in submission and processing speeds, however, and if the wait queue fills up, a request will not be accepted and the client will return “delay queue full” error. The application should throttle by keeping track of outstanding requests and issue a new request when an outstanding one finishes. If delay queue size is set to zero, throttling must also be handled in the application code.
Event Loop Variants: Netty, NIO, EPOLL
Both Netty and Direct NIO event loops are supported in Aerospike.
Netty is an asynchronous event-driven network application framework for high-performance servers based on Java Non-blocking IO (NIO) package. Epoll (event poll)is a Linux specific construct and allows for a process to monitor multiple file descriptors and get notifications when I/O is possible on them.
Netty allows users to share their existing event loops with AerospikeClient which can improve performance. Netty event loops are also required when using TLS connections. However Netty is an optional external library dependency.
Direct NIO event loops are lighter weight and slightly faster than Netty defaults when not sharing event loops. Direct NIO does not have an external library dependency.
You should consider trade-offs in using the types of event loops - refer to the links provided for further details.
Async Framework
Below we walk through the steps in setting up a typical async operation framework.
Initialize event loops
Initialize event loops. Allocate an event loop for each CPU core.
Examine the code snippets below.
- Initialize event policy. Select level of parallelism desired; cannot exceed server throughput.
EventPolicy eventPolicy = EventPolicy(); final CommandsPerEventLoop = 50; eventPolicy.maxCommandsInProcess = commandsPerEventLoop;
- Select delay queue buffer size in front of the event loop.
maxCommandsInQueue = 50; eventPolicy.maxCommandsInQueue = maxCommandsInQueue;
- Create event loops object.
// here we use direct nio and 2 events loops numLoops = 2; EventLoops eventLoops = new NioEventLoops(eventPolicy, numLoops);
In the following cell, the function InitializeEventLoops allows initialization of different types of event loops. The function will be called multiple times later in the notebook to experiment with different settings.
import com.aerospike.client.async.EventPolicy;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import com.aerospike.client.async.NettyEventLoops;
import com.aerospike.client.async.EventLoops;
import com.aerospike.client.async.NioEventLoops;
enum EventLoopType{DIRECT_NIO, NETTY_NIO, NETTY_EPOLL};
// a function to create event loops with specified parameters
EventLoops InitializeEventLoops(EventLoopType eventLoopType, int numLoops, int commandsPerEventLoop,
int maxCommandsInQueue) {
EventPolicy eventPolicy = new EventPolicy();
eventPolicy.maxCommandsInProcess = commandsPerEventLoop;
eventPolicy.maxCommandsInQueue = maxCommandsInQueue;
EventLoops eventLoops = null;
switch(eventLoopType) {
case DIRECT_NIO:
eventLoops = new NioEventLoops(eventPolicy, numLoops);
break;
case NETTY_NIO:
NioEventLoopGroup nioGroup = new NioEventLoopGroup(numLoops);
eventLoops = new NettyEventLoops(eventPolicy, nioGroup);
break;
case NETTY_EPOLL:
EpollEventLoopGroup epollGroup = new EpollEventLoopGroup(numLoops);
eventLoops = new NettyEventLoops(eventPolicy, epollGroup);
break;
default:
System.out.println("Error: Invalid event loop type");
}
return eventLoops;
}
// initialize event loops
final int NumLoops = 2;
final int CommandsPerEventLoop = 50;
final int DelayQueueSize = 50;
EventLoops eventLoops = InitializeEventLoops(EventLoopType.DIRECT_NIO, NumLoops, CommandsPerEventLoop, DelayQueueSize);
System.out.format("Event loops initialized with num-loops: %s, commands-per-event-loop: %s, delay-queue-size: %s.\n",
NumLoops, CommandsPerEventLoop, DelayQueueSize);;
Output:
Event loops initialized with num-loops: 2, commands-per-event-loop: 50, delay-queue-size: 50.
Initialize Client
Examine the code snippets below.
- Initialize client policy with event loops.
ClientPolicy policy = new ClientPolicy(); clientPolicy.eventLoops = eventLoops;
- Set total concurrent connections per node by multiplying concurrency level at event loop (maxCommandsInProcess) by the number of event loops.
concurrentMax = commandsPerEventLoop * numLoops;
- This is the max number of commands or requests per node if all requests go to one node. Adjust the default connection pool size of 300 if concurrentMax is larger.
if (clientPolicy.maxConnsPerNode < concurrentMax) { clientPolicy.maxConnsPerNode = concurrentMax; }
- Set total concurrent connections per node by multiplying concurrency level at event loop (maxCommandsInProcess) by the number of event loops.
- Initialize the client with the client policy and seed hosts in cluster.
Host[] hosts = Host.parseHosts("localhost", 3000); AerospikeClient client = new AerospikeClient(clientPolicy, hosts);
In the following cell, the function InitializeClient allows initialization of the client with specified parameters.
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.Host;
import com.aerospike.client.AerospikeClient;
// a function to initialize the client with specified parameters
AerospikeClient InitializeClient(EventLoops eventLoops, int numLoops, int commandsPerEventLoop, Host[] hosts) {
ClientPolicy clientPolicy = new ClientPolicy();
clientPolicy.eventLoops = eventLoops;
int concurrentMax = commandsPerEventLoop * numLoops;
if (clientPolicy.maxConnsPerNode < concurrentMax) {
clientPolicy.maxConnsPerNode = concurrentMax;
}
AerospikeClient client = new AerospikeClient(clientPolicy, hosts);
return client;
}
// initialize the client
Host[] hosts = Host.parseHosts("localhost", 3000);
AerospikeClient client = InitializeClient(eventLoops, NumLoops, CommandsPerEventLoop, hosts);
System.out.print("Client initialized.\n");
Output:
Client initialized.
Initialize event loop throttles and atomic operation count.
The event loop throttles object is initialized with the number of event loops and commands per event loop. It provides two methods "waitForSlot" and "addSlot" to manage concurrency for an event loop, both take an index parameter that identifies the event loop.
throttles = new Throttles(numLoops, commandsPerEventLoop);
The operation count is used to track the number of finished operations. Because multiple callback threads access and increment it concurrently, it is defined as an AtomicInteger, which has support for atomic operation get/increment operations.
AtomicInteger asyncOpCount = new AtomicInteger();
In the following cell, the function InitializeThrottles creates throttles for event loops with specified parameters.
import com.aerospike.client.async.Throttles;
// creates event loop throttles with specified parameters
Throttles InitializeThrottles(int numLoops, int commandsPerEventLoop) {
Throttles throttles = new Throttles(numLoops, commandsPerEventLoop);
return throttles;
}
// initialize event loop throttles
Throttles throttles = InitializeThrottles(NumLoops, CommandsPerEventLoop);
System.out.format("Throttles initialized for %s loops with %s concurrent operations per loop.\n",
NumLoops, CommandsPerEventLoop);
// initialize the atomic integer to keep track of async operations count
import java.util.concurrent.atomic.AtomicInteger;
AtomicInteger asyncOpCount = new AtomicInteger();
System.out.format("Atomic operation count initialized.");;
Output:
Throttles initialized for 2 loops with 50 concurrent operations per loop.
Atomic operation count initialized.
Define Listener and Handlers
Define the listener with success and failure handlers to process results. Below, we have MyWriteListener derived from WriteListener to process insertion of records that:
- implements success and failure handlers
- releases a slot in the event loop on success or failure for another insert to proceed throttles.addSlot(eventLoopIndex, 1);
- signals completion through monitor on failure or when the write count reaches the expected final count monitor.notifyComplete();
- prints progress every "progressFreq" records
import com.aerospike.client.Key;
import com.aerospike.client.listener.WriteListener;
import com.aerospike.client.async.Monitor;
import com.aerospike.client.AerospikeException;
// write listener
// - implements success and failure handlers
// - releases a slot on success or failure for another insert to proceed
// - signals completion through monitor on failure or when the write count reaches the expected final count
// - prints progress every "progressFreq" records*/
class MyWriteListener implements WriteListener {
private final Key key;
private final int eventLoopIndex;
private final int finalCount;
private Monitor monitor;
private final int progressFreq;
public MyWriteListener(Key key, int eventLoopIndex, int finalCount, Monitor monitor, int progressFreq) {
this.key = key;
this.eventLoopIndex = eventLoopIndex;
this.finalCount = finalCount;
this.monitor = monitor;
this.progressFreq = progressFreq;
}
// Write success callback.
public void onSuccess(Key key) {
// Write succeeded.
throttles.addSlot(eventLoopIndex, 1);
int currentCount = asyncOpCount.incrementAndGet();
if ( progressFreq > 0 && currentCount % progressFreq == 0) {
System.out.format("Inserted %s records.\n", currentCount);
}
if (currentCount == finalCount) {
monitor.notifyComplete();
}
}
// Error callback.
public void onFailure(AerospikeException e) {
throttles.addSlot(eventLoopIndex, 1);
System.out.format("Put failed: namespace=%s set=%s key=%s exception=%s\n",
key.namespace, key.setName, key.userKey, e.getMessage());
monitor.notifyComplete();
}
}
System.out.print("Write listener defined.");
Output:
Write listener defined.
Submit Async Requests Using Throttling
While submitting async requests it is important to keep below the planned concurrent capacity using throttling.
The function InsertRecords below inserts the specified number of records asynchronously with id-\<index> as the user-key and two integer fields bin1 and bin2. It keeps track of and returns the elapsed time.
Throttling is achieved by waiting for an available slot in the event loop.
if (throttles.waitForSlot(eventLoopIndex, 1)) { // submit async request }
After submitting all requests, the main thread must wait for outstanding requests to complete before closing.
monitor.waitTillComplete();
import java.util.concurrent.TimeUnit;
import com.aerospike.client.Bin;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.async.EventLoop;
long InsertRecords(int numRecords, EventLoops eventLoops, Throttles throttles, int progressFreq) {
long startTime = System.nanoTime();
Monitor monitor = new Monitor();
asyncOpCount.set(0);
WritePolicy policy = new WritePolicy();
for (int i = 0; i < numRecords; i++) {
Key key = new Key(Namespace, Set, "id-"+i);
Bin bin1 = new Bin(new String("bin1"), i);
Bin bin2 = new Bin(new String("bin2"), numRecords*10+i);
EventLoop eventLoop = eventLoops.next();
int eventLoopIndex = eventLoop.getIndex();
if (throttles.waitForSlot(eventLoopIndex, 1)) {
try {
client.put(eventLoop, new MyWriteListener(key, eventLoopIndex, numRecords, monitor, progressFreq),
policy, key, bin1, bin2);
}
catch (Exception e) {
throttles.addSlot(eventLoopIndex, 1);
}
}
}
monitor.waitTillComplete();
long endTime = System.nanoTime();
return (endTime - startTime);
}
final int NumRecords = 100000;
long elapsedTime = InsertRecords(NumRecords, eventLoops, throttles, NumRecords/4);
System.out.format("Inserted %s records with %s event-loops and %s commands-per-loop in %s milliseconds.\n",
NumRecords, NumLoops, CommandsPerEventLoop, elapsedTime/1000000);;
Output:
Inserted 25000 records.
Inserted 50000 records.
Inserted 75000 records.
Inserted 100000 records.
Inserted 100000 records with 2 event-loops and 50 commands-per-loop in 4212 milliseconds.
Closing
Both AerospikeClient and EventLoops should be closed before program shutdown. The latest client waits for pending async commands to finish before performing the actual close, so there is no need to externally track pending async commands. Earlier versions provide a waitToComplete() call on Monitor object to ensure async operations are completed. The Cleanup function implemented above truncates the database and closes client and event-loops.
// truncates database and closes client and event-loops
Cleanup();
System.out.println("Removed data and closed client and event loops.");
Output:
Removed data and closed client and event loops.
Nested and Inline Async Operations
It is possible to nest a series of async calls, one in the processing logic of another. Some simple examples of such cascaded calls are:
- Retry the same operation in the failure handler
- Issue an async read to validate an async write operation
- Issue an async write to update a record retrieved from an async read operation.
The following code illustrates a simplistic example of how each record retrieved from an async filtered scan is updated asynchronously by incrementing the value of bin2. Note the inline implementation of WriteListener. The scan filter selects records between bin1 values of 1 and 1000. Throttling and progress report are also present as described above.
import com.aerospike.client.policy.ScanPolicy;
import com.aerospike.client.listener.RecordSequenceListener;
import com.aerospike.client.Record;
import com.aerospike.client.exp.Exp;
// Scan callback
class ScanRecordSequenceListener implements RecordSequenceListener {
private EventLoops eventLoops;
private Throttles throttles;
private Monitor scanMonitor;
private AtomicInteger writeCount = new AtomicInteger();
private int scanCount = 0;
private final int progressFreq;
public ScanRecordSequenceListener(EventLoops eventLoops, Throttles throttles, Monitor scanMonitor,
int progressFreq) {
this.eventLoops = eventLoops;
this.throttles = throttles;
this.scanMonitor = scanMonitor;
this.progressFreq = progressFreq;
}
public void onRecord(Key key, Record record) throws AerospikeException {
++scanCount;
if ( progressFreq > 0 && scanCount % progressFreq == 0) {
System.out.format("Scan returned %s records.\n", scanCount);
}
// submit async update operation with throttle
EventLoop eventLoop = eventLoops.next();
int eventLoopIndex = eventLoop.getIndex();
if (throttles.waitForSlot(eventLoopIndex, 1)) { // throttle by waiting for an available slot
try {
WritePolicy policy = new WritePolicy();
Bin bin2 = new Bin(new String("bin2"), 1);
client.add(eventLoop, new WriteListener() { // inline write listener
public void onSuccess(final Key key) {
// Write succeeded.
throttles.addSlot(eventLoopIndex, 1);
int currentCount = writeCount.incrementAndGet();
if ( progressFreq > 0 && currentCount % progressFreq == 0) {
System.out.format("Processed %s records.\n", currentCount);
}
}
public void onFailure(AerospikeException e) {
System.out.format("Put failed: namespace=%s set=%s key=%s exception=%s\n",
key.namespace, key.setName, key.userKey, e.getMessage());
throttles.addSlot(eventLoopIndex, 1);
int currentCount = writeCount.incrementAndGet();
if ( progressFreq > 0 && currentCount % progressFreq == 0) {
System.out.format("Processed %s records.\n", currentCount);
}
}
},
policy, key, bin2);
}
catch (Exception e) {
System.out.format("Error: exception in write listener - %s", e.getMessage());
}
}
}
public void onSuccess() {
if (scanCount != writeCount.get()) { // give the last write some time to finish
try {
Thread.sleep(100);
}
catch(InterruptedException e) {
System.out.format("Error: exception - %s", e);
}
}
scanMonitor.notifyComplete();
}
public void onFailure(AerospikeException e) {
System.out.format("Error: scan failed with exception - %s", e);
scanMonitor.notifyComplete();
}
}
// cleanup prior state
Cleanup();
// initialize data, event loops and client
int numRecords = 100000;
int numLoops = 2;
int commandsPerLoop = 25;
int delayQueueSize = 0;
eventLoops = InitializeEventLoops(EventLoopType.DIRECT_NIO, numLoops, commandsPerLoop, delayQueueSize);
client = InitializeClient(eventLoops, numLoops, commandsPerLoop, hosts);
throttles = InitializeThrottles(numLoops, commandsPerLoop);
InsertRecords(numRecords, eventLoops, throttles, 0);
System.out.format("Inserted %s records.\n", numRecords);
EventLoop eventLoop = eventLoops.next();
Monitor scanMonitor = new Monitor();
int progressFreq = 100;
// issue async scan that in turn issues async update on each returned record
ScanPolicy policy = new ScanPolicy();
policy.filterExp = Exp.build(
Exp.and(
Exp.le(Exp.intBin("bin1"), Exp.val(1000)),
Exp.ge(Exp.intBin("bin1"), Exp.val(1))));
client.scanAll(eventLoop, new ScanRecordSequenceListener(eventLoops, throttles, scanMonitor, progressFreq),
policy, Namespace, Set);
scanMonitor.waitTillComplete();
System.out.format("Done: nested async scan and update");;
Output:
Inserted 100000 records.
Scan returned 100 records.
Processed 100 records.
Scan returned 200 records.
Processed 200 records.
Scan returned 300 records.
Processed 300 records.
Scan returned 400 records.
Processed 400 records.
Scan returned 500 records.
Processed 500 records.
Scan returned 600 records.
Processed 600 records.
Scan returned 700 records.
Processed 700 records.
Scan returned 800 records.
Processed 800 records.
Scan returned 900 records.
Processed 900 records.
Scan returned 1000 records.
Processed 1000 records.
Done: nested async scan and update
Misc Examples
Delay Queue Full Error
If the delay queue fills up, a request will not be accepted and the client will return “delay queue full” error. Below we simulate this condition by having 25 slots and a delay queue of 20 in 2 event loops each (can handle total 90 outstanding requests) and issuing a hundred concurrent requests. The throttle is effectively turned off by a large setting for the number of requests to go through.
// clean up the current state
Cleanup();
// initialize data, event loops and client
int numRecords = 100;
int numLoops = 2;
int commandsPerLoop = 25;
int delayQueueSize = 20;
int noThrottle = 10000; //effectively no throttle
eventLoops = InitializeEventLoops(EventLoopType.DIRECT_NIO, numLoops, commandsPerLoop, delayQueueSize);
client = InitializeClient(eventLoops, numLoops, commandsPerLoop, hosts);
throttles = InitializeThrottles(numLoops, noThrottle);
// attempt to insert records above the available slots and delay queue capacity
long elapsedTime = InsertRecords(numRecords, eventLoops, throttles, 0);
System.out.format("%s ops/ms with event-loops: %s and commands-per-loop: %s.\n",
numRecords/(elapsedTime/1000000), numLoops, commandsPerLoop);;
Output:
Put failed: namespace=test set=async-ops key=id-90 exception=Error -9,0,30000,0,0: Async delay queue is full
Put failed: namespace=test set=async-ops key=id-92 exception=Error -9,0,30000,0,0: Async delay queue is full
Put failed: namespace=test set=async-ops key=id-91 exception=Error -9,0,30000,0,0: Async delay queue is full
Put failed: namespace=test set=async-ops key=id-93 exception=Error -9,0,30000,0,0: Async delay queue is full
Put failed: namespace=test set=async-ops key=id-95 exception=Error -9,0,30000,0,0: Async delay queue is full
Put failed: namespace=test set=async-ops key=id-97 exception=Error -9,0,30000,0,0: Async delay queue is full
Put failed: namespace=test set=async-ops key=id-99 exception=Error -9,0,30000,0,0: Async delay queue is full
Put failed: namespace=test set=async-ops key=id-94 exception=Error -9,0,30000,0,0: Async delay queue is full
Put failed: namespace=test set=async-ops key=id-96 exception=Error -9,0,30000,0,0: Async delay queue is full
Put failed: namespace=test set=async-ops key=id-98 exception=Error -9,0,30000,0,0: Async delay queue is full
16 ops/ms with event-loops: 2 and commands-per-loop: 25.
Comparing Different Settings
The code below allows comparison of insert throughput with different parameters: event loops type, number of event loops, and concurrency level in each loop. It doesn't produce meaningful results in the default notebook container setting where the client and server are running in the same container. A meaningful comparison can be drawn by pointing to the desired server cluster and also adjusting the client environment.
// Throughput with parameterized async insertion
int numRecords = 100000;
EventLoopType[] eventLoopOptions = {EventLoopType.DIRECT_NIO, EventLoopType.NETTY_NIO, EventLoopType.NETTY_EPOLL};
int[] numLoopsOptions = {2, 4, 8};
int[] commandsPerLoopOptions = {50, 100, 200};
for (EventLoopType eventLoopType: eventLoopOptions) {
for (int numLoops: numLoopsOptions) {
for (int commandsPerLoop: commandsPerLoopOptions) {
Cleanup();
eventLoops = InitializeEventLoops(eventLoopType, numLoops, commandsPerLoop, 0);
client = InitializeClient(eventLoops, numLoops, commandsPerLoop, hosts);
throttles = InitializeThrottles(numLoops, commandsPerLoop);
long elapsedTime = InsertRecords(numRecords, eventLoops, throttles, 0);
System.out.format("%s ops/ms with %s %s event-loops and %s commands-per-loop.\n",
numRecords/(elapsedTime/1000000), numLoops, eventLoopType, commandsPerLoop);
}
}
}
System.out.println("Done.");;
Output:
27 ops/ms with 2 DIRECT_NIO event-loops and 50 commands-per-loop.
23 ops/ms with 2 DIRECT_NIO event-loops and 100 commands-per-loop.
29 ops/ms with 2 DIRECT_NIO event-loops and 200 commands-per-loop.
34 ops/ms with 4 DIRECT_NIO event-loops and 50 commands-per-loop.
35 ops/ms with 4 DIRECT_NIO event-loops and 100 commands-per-loop.
33 ops/ms with 4 DIRECT_NIO event-loops and 200 commands-per-loop.
23 ops/ms with 8 DIRECT_NIO event-loops and 50 commands-per-loop.
22 ops/ms with 8 DIRECT_NIO event-loops and 100 commands-per-loop.
19 ops/ms with 8 DIRECT_NIO event-loops and 200 commands-per-loop.
21 ops/ms with 2 NETTY_NIO event-loops and 50 commands-per-loop.
25 ops/ms with 2 NETTY_NIO event-loops and 100 commands-per-loop.
27 ops/ms with 2 NETTY_NIO event-loops and 200 commands-per-loop.
32 ops/ms with 4 NETTY_NIO event-loops and 50 commands-per-loop.
31 ops/ms with 4 NETTY_NIO event-loops and 100 commands-per-loop.
33 ops/ms with 4 NETTY_NIO event-loops and 200 commands-per-loop.
22 ops/ms with 8 NETTY_NIO event-loops and 50 commands-per-loop.
22 ops/ms with 8 NETTY_NIO event-loops and 100 commands-per-loop.
20 ops/ms with 8 NETTY_NIO event-loops and 200 commands-per-loop.
20 ops/ms with 2 NETTY_EPOLL event-loops and 50 commands-per-loop.
23 ops/ms with 2 NETTY_EPOLL event-loops and 100 commands-per-loop.
24 ops/ms with 2 NETTY_EPOLL event-loops and 200 commands-per-loop.
25 ops/ms with 4 NETTY_EPOLL event-loops and 50 commands-per-loop.
25 ops/ms with 4 NETTY_EPOLL event-loops and 100 commands-per-loop.
26 ops/ms with 4 NETTY_EPOLL event-loops and 200 commands-per-loop.
23 ops/ms with 8 NETTY_EPOLL event-loops and 50 commands-per-loop.
23 ops/ms with 8 NETTY_EPOLL event-loops and 100 commands-per-loop.
21 ops/ms with 8 NETTY_EPOLL event-loops and 200 commands-per-loop.
Done.
Takeaways and Conclusion
The tutorial described the architecture of and key concepts in asynchronous operations in Aerospike client. It presented the programming framework in which async requests can be submitted and handled. It illustrated with code how event loops, throttling, inline async calls are implemented. The trade-offs that a developer needs to make for which execution modes to employ - synchronous, asynchronous, or background - involve multiple factors including the nature of operations, client and server setup, throughput needs, and programming complexity.
Clean up
Remove tutorial data and close connection.
Cleanup();
System.out.println("Removed tutorial data and closed server connection.");
Output:
Removed tutorial data and closed server connection.
Further Exploration and Resources
Here are some links for further exploration
Resources
- Related notebooks
- Aerospike Developer Hub
- Github repos
- Documentation
- Blog
Next steps
Visit Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open in the notebook menu, and select Upload.