This tutorial describes asynchronous operations in Aerospike: why they are used, the architecture, and how to program with async operations.
This notebook requires the Aerospike Database running locally with Java kernel and Aerospike Java Client. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.
Introduction
In this notebook, we will see the benefits, design, and specifics of
programming with asynchronous operations in Aerospike.
Aerospike provides asynchronous APIs for many operations. We will
describe the benefits of using async operations and key abstractions in
the client related to async requests. After covering the theoretical
ground, we will show how it all comes together with specific code
examples.
The notebook tutorial has two parts:
architecture and concepts, and
coding examples.
The main topics include:
Execution models in Aerospike
Benefits of async
Key concepts
Framework for async programming
Coding examples
Prerequisites
This tutorial assumes familiarity with the following topics:
Install the Aerospike Java client and the Java Netty package by loading the dependencies below; their use is described later in the notebook.
%%loadFromPOM
<dependencies>
    <dependency>
        <groupId>com.aerospike</groupId>
        <artifactId>aerospike-client</artifactId>
        <version>5.0.0</version>
    </dependency>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.53.Final</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
Constants and Convenience Functions
We will use some constants and convenience functions throughout this
tutorial, including the namespace “test” and set “async-ops”.
final String Namespace = "test";
final String Set = "async-ops";
import com.aerospike.client.AerospikeException;

// truncate data, close client and event loops - called multiple times to initialize with different options
// described in greater detail later
void Cleanup() {
    try {
        client.truncate(null, Namespace, Set, null);
    }
    catch (AerospikeException e) {
        // ignore
    }
    client.close();
    eventLoops.close();
}
Open a Terminal Tab
You may execute shell commands including Aerospike tools like
aql and
asadm in the
terminal tab. Open a terminal tab by selecting File->Open from the
notebook menu, and then New->Terminal.
Synchronous, Asynchronous, and Background Operations
An application uses the Aerospike client library (aka the client) to
interact with Aerospike Database. The client sets up a connection to the
appropriate server node and sends in a request for execution in one of
the following modes:
Synchronous: The request thread makes the request, waits for the
response, and processes the response upon arrival.
Asynchronous: The request thread submits one or more requests, and
results are processed in one or more callback thread(s) as they arrive.
Background: The request thread submits the request, and the operation (or task) completes in the background. The submission returns immediately, while the actual operation executes separately. The application can check the completion status of the task and, after it is completed, may examine the results in the database with one or more separate requests.
Note that a background operation may be considered a special type of asynchronous operation; in Aerospike it is applicable only to updates. In this tutorial, by asynchronous operations we refer only to those that return results in a callback.
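As a quick illustration, here is a minimal sketch contrasting a synchronous get with an asynchronous get. It assumes a client and eventLoops initialized as shown later in this notebook; the record key is arbitrary.
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.listener.RecordListener;

Key key = new Key(Namespace, Set, "id-1");

// synchronous: the calling thread blocks until the result arrives
Record syncRecord = client.get(null, key);

// asynchronous: the call returns immediately; the result is delivered
// to the listener on a callback thread of the chosen event loop
client.get(eventLoops.next(), new RecordListener() {
    public void onSuccess(Key k, Record record) {
        System.out.println("Async get returned: " + record);
    }
    public void onFailure(AerospikeException e) {
        System.out.println("Async get failed: " + e.getMessage());
    }
}, null, key);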
Asynchronous Operations For Better Resource Efficiency
During the time between when a request is sent to the server and when the result arrives (the “request latency”), the client and application need not wait idly if high throughput is the goal. Higher throughput can be achieved through concurrent requests.
Synchronous: The application can spawn multiple threads and process
multiple requests in parallel, one per thread at a time.
Asynchronous: The application can process requests asynchronously by
submitting them in parallel without waiting for the results. The
results are processed as they arrive in a different “callback”
thread. An async request uses a dedicated connection to the server.
Pipeline: Multiple requests could be sent over the same connection
to the same server node, and their results received over the same
connection. Thus there is greater sharing of threads and connections
across multiple requests. Aerospike currently does not support
pipeline processing.
In many cases, asynchronous processing can be more resource efficient
and can deliver better throughput than multi-threaded synchronous
processing because threads have memory and CPU (context-switch) overhead
and their number may be limited by the OS.
On the other hand, the asynchronous model is more complex to program and debug. The application should make judicious use of synchronous, asynchronous, and background requests; a single client instance can perform all of these types of commands.
It should be noted that background operations, where applicable, typically deliver superior throughput, especially for non-UDF invocations.
Async variants are available for many operations, for example:
Batch operations: exists (array and sequence listeners), get (batch list and batch sequence listeners), get (record array and record sequence listeners), getHeader.
Query/scan: The callback handles a series of records, one record at a time.
The async methods take two additional arguments compared to their sync variants: an event loop and a listener (callback). See the code in the Async Framework section below.
Event loops: An event-loop represents the loop of “submit a request”
and “asynchronously process the result” for concurrent processing of
events or requests. Multiple event loops are used to leverage
multiple CPU cores.
Listener: The listener encapsulates the processing of results.
Listener types: Depending on the expected number of records in
the result and whether they arrive at once or individually,
different listener types are to be used such as a single record
listener, a record array listener, or a record sequence listener.
Completion handlers: A single record or record array is
processed with the success or failure handler. In a record
sequence listener, each record is processed with a “record”
handler, whereas the success handler is called to mark the end
of the sequence.
Application Call Sequence
The application is responsible for spreading requests evenly across
event loops as well as throttling the rate of requests if the request
rate can exceed the client or server capacity. The call sequence
involves these steps (see the code in Async
Framework section below):
Initialize event loops.
Implement the listener with success and failure handlers.
Submit requests across event loops, throttling to stay below maximum
outstanding requests limit.
Wait for all outstanding requests to finish.
Understanding Event Loops
Let’s look at the key concepts relating to event loops. As described
above, an event loop represents concurrent submit-callback processing of
requests. See the code in Async Framework section
below.
Number of event loops: To maximize parallelism of the client hardware, create as many event loops as there are CPU cores dedicated to the Aerospike application. An event loop is aligned with a CPU core, not with a server node or a request type.
Concurrency level: The maximum concurrency level in each event loop depends on the effective server throughput seen by the client; the aggregate concurrency across all loops should not exceed that throughput. A larger value would result in request timeouts and other failures.
Connection pools and event loops: Connection pools are allocated on a per-node basis and are independent of event loops. When an async request needs to connect to a node, it uses a connection from the node’s connection pool only for the duration of the request and then releases it.
Connection pool size: Concurrency across all loops must be supported
by the number of connections in the connection pool. The connection pool
per node should be set equal to or greater than the total number of
outstanding requests across all event loops (because all requests may go
to the same node in the extreme case).
Delay queue buffer: To buffer a temporary mismatch in processing and
submission rates, there is a delay queue buffer in front of an event
loop where requests are held until an async request slot becomes
available in the event loop. The queued request is automatically
assigned to a slot and processed without involvement of the application.
Throttling: The delay queue cannot buffer a long-running mismatch between submission and processing speeds, however; if the queue fills up, a request will not be accepted and the client will return a “delay queue full” error. The application should throttle by keeping track of outstanding requests and issuing a new request only when an outstanding one finishes. If the delay queue size is set to zero, throttling must be handled entirely in the application code.
Event Loop Variants: Netty, NIO, EPOLL
Both Netty and Direct NIO event loops are supported in Aerospike.
Netty is an asynchronous event-driven network
application framework for high-performance servers based on Java
Non-blocking IO
(NIO) package.
Epoll (event poll) is a Linux-specific construct that allows a process to monitor multiple file descriptors and get notifications when I/O is possible on them.
Netty allows users to share their existing event loops with AerospikeClient, which can improve performance. Netty event loops are also required when using TLS connections. However, Netty is an optional external library dependency.
Direct NIO event loops are lighter weight and slightly faster than Netty
defaults when not sharing event loops. Direct NIO does not have an
external library dependency.
You should consider trade-offs in using the types of event loops - refer
to the links provided for further details.
Async Framework
Below we walk through the steps in setting up a typical async operation
framework.
Initialize event loops
Initialize event loops. Allocate an event loop for each CPU core.
Examine the code snippets below.
Initialize the event policy and select the desired level of parallelism per event loop; in aggregate, it cannot exceed the server throughput.
EventPolicy eventPolicy = new EventPolicy();
final int commandsPerEventLoop = 50;
eventPolicy.maxCommandsInProcess = commandsPerEventLoop;
Select the delay queue buffer size in front of each event loop.
final int maxCommandsInQueue = 50;
eventPolicy.maxCommandsInQueue = maxCommandsInQueue;
Create the event loops object.
// here we use direct NIO and 2 event loops
final int numLoops = 2;
EventLoops eventLoops = new NioEventLoops(eventPolicy, numLoops);
In the following cell, the function InitializeEventLoops allows
initialization of different types of event loops. The function will be
called multiple times later in the notebook to experiment with different
settings.
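The full cell is not reproduced here; a sketch of what such a function could look like follows. The event loop type strings, the parameter names, and the use of NettyEventLoops for the Netty variants are assumptions for illustration.
import com.aerospike.client.async.EventPolicy;
import com.aerospike.client.async.EventLoops;
import com.aerospike.client.async.NioEventLoops;
import com.aerospike.client.async.NettyEventLoops;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;

// illustrative helper; the notebook's actual InitializeEventLoops may differ
EventLoops InitializeEventLoops(String loopType, int numLoops, int commandsPerEventLoop, int delayQueueSize) {
    EventPolicy eventPolicy = new EventPolicy();
    eventPolicy.maxCommandsInProcess = commandsPerEventLoop;
    eventPolicy.maxCommandsInQueue = delayQueueSize;
    switch (loopType) {
        case "NETTY_NIO":
            return new NettyEventLoops(eventPolicy, new NioEventLoopGroup(numLoops));
        case "NETTY_EPOLL":
            return new NettyEventLoops(eventPolicy, new EpollEventLoopGroup(numLoops));
        default:  // DIRECT_NIO
            return new NioEventLoops(eventPolicy, numLoops);
    }
}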
System.out.format("Event loops initialized with num-loops: %s, commands-per-event-loop: %s, delay-queue-size: %s.\n",
NumLoops, CommandsPerEventLoop, DelayQueueSize);;
Output:
Event loops initialized with num-loops: 2, commands-per-event-loop: 50, delay-queue-size: 50.
Initialize Client
Examine the code snippets below.
Initialize the client policy with the event loops.
ClientPolicy clientPolicy = new ClientPolicy();
clientPolicy.eventLoops = eventLoops;
Set the total concurrent commands per node by multiplying the concurrency level per event loop (maxCommandsInProcess) by the number of event loops.
int concurrentMax = commandsPerEventLoop * numLoops;
This is the maximum number of commands or requests per node if all requests go to one node. Adjust the default connection pool size of 300 if concurrentMax is larger.
if (clientPolicy.maxConnsPerNode < concurrentMax) {
    clientPolicy.maxConnsPerNode = concurrentMax;
}
Initialize the client with the client policy and seed hosts in the cluster.
Host[] hosts = Host.parseHosts("localhost", 3000);
AerospikeClient client = new AerospikeClient(clientPolicy, hosts);
In the following cell, the function InitializeClient allows
initialization of the client with specified parameters.
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.Host;
import com.aerospike.client.AerospikeClient;

// a function to initialize the client with specified parameters
AerospikeClient InitializeClient(EventLoops eventLoops, int numLoops, int commandsPerEventLoop, Host[] hosts) {
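    // The cell body is not shown above; assembled from the snippets in the previous
    // subsection, it might look like the following (a sketch; the actual cell may differ).
    ClientPolicy clientPolicy = new ClientPolicy();
    clientPolicy.eventLoops = eventLoops;
    // size the per-node connection pool for the worst case where all requests go to one node
    int concurrentMax = commandsPerEventLoop * numLoops;
    if (clientPolicy.maxConnsPerNode < concurrentMax) {
        clientPolicy.maxConnsPerNode = concurrentMax;
    }
    return new AerospikeClient(clientPolicy, hosts);
}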
Initialize event loop throttles and atomic operation count.
The event loop throttles object is initialized with the number of event loops and the commands per event loop. It provides two methods, “waitForSlot” and “addSlot”, to manage concurrency for an event loop; both take an index parameter that identifies the event loop.
Throttles throttles = new Throttles(numLoops, commandsPerEventLoop);
The operation count is used to track the number of finished operations. Because multiple callback threads access and increment it concurrently, it is defined as an AtomicInteger, which supports atomic get/increment operations.
AtomicInteger asyncOpCount = new AtomicInteger();
In the following cell, the function InitializeThrottles creates throttles for event loops with specified parameters.
import com.aerospike.client.async.Throttles;

// creates event loop throttles with specified parameters
Throttles InitializeThrottles(int numLoops, int commandsPerEventLoop) {
    return new Throttles(numLoops, commandsPerEventLoop);
}
Output:
Throttles initialized for 2 loops with 50 concurrent operations per loop.
Atomic operation count initialized.
Define Listener and Handlers
Define the listener with success and failure handlers to process results. Below, we have MyWriteListener, derived from WriteListener, to process insertion of records. It:
implements the success and failure handlers,
releases a slot in the event loop on success or failure so that another insert can proceed: throttles.addSlot(eventLoopIndex, 1);
signals completion through the monitor on failure, or when the write count reaches the expected final count: monitor.notifyComplete();
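The cell that implements MyWriteListener is not reproduced above. A minimal sketch of such a listener is shown below; it assumes the top-level throttles and asyncOpCount variables from the previous cells, and a completion Monitor (declared here for the sketch). The constructor parameters and the expectedCount field are illustrative and may differ from the actual notebook code.
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Key;
import com.aerospike.client.listener.WriteListener;
import com.aerospike.client.async.Monitor;

// completion monitor used to signal the main thread when all writes are done
Monitor monitor = new Monitor();

// illustrative write listener; the notebook's actual MyWriteListener may differ
class MyWriteListener implements WriteListener {
    private final Key key;
    private final int eventLoopIndex;
    private final int expectedCount;

    public MyWriteListener(Key key, int eventLoopIndex, int expectedCount) {
        this.key = key;
        this.eventLoopIndex = eventLoopIndex;
        this.expectedCount = expectedCount;
    }

    // success handler: free the throttle slot and signal completion when all writes are done
    public void onSuccess(Key k) {
        throttles.addSlot(eventLoopIndex, 1);
        if (asyncOpCount.incrementAndGet() == expectedCount) {
            monitor.notifyComplete();
        }
    }

    // failure handler: report the error, free the slot, and unblock the waiting thread
    public void onFailure(AerospikeException e) {
        System.out.format("Put failed: namespace=%s set=%s key=%s exception=%s\n",
            key.namespace, key.setName, key.userKey, e.getMessage());
        throttles.addSlot(eventLoopIndex, 1);
        monitor.notifyComplete();
    }
}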
While submitting async requests, it is important to stay below the planned concurrent capacity by using throttling.
The function InsertRecords below inserts the specified number of records
asynchronously with id-<index> as the user-key and two integer fields
bin1 and bin2. It keeps track of and returns the elapsed time.
Throttling is achieved by waiting for an available slot in the event loop.
if (throttles.waitForSlot(eventLoopIndex, 1)) {
    // submit async request
}
After submitting all requests, the main thread must wait for outstanding requests to complete before closing.
monitor.waitTillComplete();
import java.util.concurrent.TimeUnit;
import com.aerospike.client.Bin;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.async.EventLoop;

// inserts numRecords records asynchronously and returns the elapsed time in milliseconds
long InsertRecords(int numRecords, EventLoops eventLoops, Throttles throttles, int progressFreq) {
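    // Body sketched from the description above; the use of MyWriteListener, monitor, and
    // asyncOpCount follows the earlier cells, while the bin values and progress reporting
    // details are assumptions that may differ from the actual notebook cell.
    WritePolicy writePolicy = new WritePolicy();
    monitor = new Monitor();      // fresh completion monitor for this batch of writes
    asyncOpCount.set(0);          // reset the finished-operation counter
    long startTime = System.nanoTime();
    for (int i = 0; i < numRecords; i++) {
        EventLoop eventLoop = eventLoops.next();          // spread requests across event loops
        int eventLoopIndex = eventLoop.getIndex();
        if (throttles.waitForSlot(eventLoopIndex, 1)) {   // throttle: wait for a free slot
            Key key = new Key(Namespace, Set, "id-" + i);
            client.put(eventLoop, new MyWriteListener(key, eventLoopIndex, numRecords),
                       writePolicy, key, new Bin("bin1", i), new Bin("bin2", i));
        }
        if (progressFreq > 0 && (i + 1) % progressFreq == 0) {
            System.out.format("Submitted %d records.\n", i + 1);
        }
    }
    monitor.waitTillComplete();   // wait for all outstanding writes to finish
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
}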
Output:
Inserted 100000 records with 2 event-loops and 50 commands-per-loop in 4212 milliseconds.
Closing
Both AerospikeClient and EventLoops should be closed before program shutdown. The latest client waits for pending async commands to finish before performing the actual close, so there is no need to externally track pending async commands. Earlier versions provide a waitTillComplete() call on the Monitor object to ensure async operations are completed. The Cleanup function implemented above truncates the database and closes the client and event loops.
// truncates database and closes client and event-loops
Cleanup();
System.out.println("Removed data and closed client and event loops.");
Output:
Removed data and closed client and event loops.
Nested and Inline Async Operations
It is possible to nest a series of async calls, one in the processing
logic of another. Some simple examples of such cascaded calls are:
Retry the same operation in the failure handler
Issue an async read to validate an async write operation
Issue an async write to update a record retrieved from an async read
operation.
The following code illustrates a simple example in which each record retrieved from an async filtered scan is updated asynchronously by incrementing the value of bin2. Note the inline implementation of WriteListener. The scan filter selects records with bin1 values between 1 and 1000. Throttling and progress reporting are also present, as described above.
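The cell is not reproduced in full above. The following sketch shows the core scan-and-update pattern under some assumptions: the records have already been inserted (not shown), throttling is omitted for brevity, and the nested update uses the client's asynchronous add method to increment bin2. Variable names and the progress-report frequency are illustrative.
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.Bin;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.exp.Exp;
import com.aerospike.client.policy.ScanPolicy;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.listener.RecordSequenceListener;
import com.aerospike.client.listener.WriteListener;
import com.aerospike.client.async.Monitor;
import java.util.concurrent.atomic.AtomicInteger;

Monitor scanMonitor = new Monitor();
AtomicInteger scanCount = new AtomicInteger();
AtomicInteger updateCount = new AtomicInteger();

// scan filter: only records with 1 <= bin1 <= 1000
ScanPolicy scanPolicy = new ScanPolicy();
scanPolicy.filterExp = Exp.build(
    Exp.and(Exp.ge(Exp.intBin("bin1"), Exp.val(1)),
            Exp.le(Exp.intBin("bin1"), Exp.val(1000))));
WritePolicy writePolicy = new WritePolicy();

client.scanAll(eventLoops.next(), new RecordSequenceListener() {
    public void onRecord(Key key, Record record) throws AerospikeException {
        int n = scanCount.incrementAndGet();
        if (n % 100 == 0) {
            System.out.format("Scan returned %d records.\n", n);
        }
        // nested async update: increment bin2 of the scanned record,
        // with an inline WriteListener to process the result
        client.add(eventLoops.next(), new WriteListener() {
            public void onSuccess(Key k) {
                int done = updateCount.incrementAndGet();
                if (done % 100 == 0) {
                    System.out.format("Processed %d records.\n", done);
                }
            }
            public void onFailure(AerospikeException e) {
                System.out.format("Update failed: key=%s exception=%s\n", key.userKey, e.getMessage());
            }
        }, writePolicy, key, new Bin("bin2", 1));
    }
    public void onSuccess() {
        // end of the scanned sequence; a fuller version would also wait for the
        // outstanding nested updates to finish before signaling completion
        scanMonitor.notifyComplete();
    }
    public void onFailure(AerospikeException e) {
        System.out.format("Scan failed: exception=%s\n", e.getMessage());
        scanMonitor.notifyComplete();
    }
}, scanPolicy, Namespace, Set);

scanMonitor.waitTillComplete();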
System.out.format("Done: nested async scan and update");;
Output:
Inserted 100000 records.
Scan returned 100 records.
Processed 100 records.
Scan returned 200 records.
Processed 200 records.
Scan returned 300 records.
Processed 300 records.
Scan returned 400 records.
Processed 400 records.
Scan returned 500 records.
Processed 500 records.
Scan returned 600 records.
Processed 600 records.
Scan returned 700 records.
Processed 700 records.
Scan returned 800 records.
Processed 800 records.
Scan returned 900 records.
Processed 900 records.
Scan returned 1000 records.
Processed 1000 records.
Done: nested async scan and update
Misc Examples
Delay Queue Full Error
If the delay queue fills up, a request will not be accepted and the client returns a “delay queue full” error. Below we simulate this condition with 2 event loops, each with 25 in-process slots and a delay queue of 20 (a total capacity of 90 outstanding requests), and issue a hundred concurrent requests. The throttle is effectively turned off by setting its limit to a large value so that all requests are submitted.
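One way this condition could be reproduced with the helper functions sketched earlier is shown below; all variable names and values here are assumptions and serve only to illustrate the setup.
// re-initialize with a small per-loop capacity (assumed helpers from above)
Cleanup();                                                   // close the previous client and event loops
eventLoops = InitializeEventLoops("DIRECT_NIO", 2, 25, 20);  // 25 in-process slots + 20 queued per loop
client = InitializeClient(eventLoops, 2, 25, Host.parseHosts("localhost", 3000));
throttles = InitializeThrottles(2, 100000);                  // throttle limit set high: effectively off
int numRequests = 100;
long elapsedMs = InsertRecords(numRequests, eventLoops, throttles, 0);
System.out.format("%d ops/ms with event-loops: %d and commands-per-loop: %d.\n",
    numRequests / Math.max(elapsedMs, 1), 2, 25);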
Output:
Put failed: namespace=test set=async-ops key=id-90 exception=Error -9,0,30000,0,0: Async delay queue is full
Put failed: namespace=test set=async-ops key=id-92 exception=Error -9,0,30000,0,0: Async delay queue is full
Put failed: namespace=test set=async-ops key=id-91 exception=Error -9,0,30000,0,0: Async delay queue is full
Put failed: namespace=test set=async-ops key=id-93 exception=Error -9,0,30000,0,0: Async delay queue is full
Put failed: namespace=test set=async-ops key=id-95 exception=Error -9,0,30000,0,0: Async delay queue is full
Put failed: namespace=test set=async-ops key=id-97 exception=Error -9,0,30000,0,0: Async delay queue is full
Put failed: namespace=test set=async-ops key=id-99 exception=Error -9,0,30000,0,0: Async delay queue is full
Put failed: namespace=test set=async-ops key=id-94 exception=Error -9,0,30000,0,0: Async delay queue is full
Put failed: namespace=test set=async-ops key=id-96 exception=Error -9,0,30000,0,0: Async delay queue is full
Put failed: namespace=test set=async-ops key=id-98 exception=Error -9,0,30000,0,0: Async delay queue is full
16 ops/ms with event-loops: 2 and commands-per-loop: 25.
Comparing Different Settings
The code below allows comparison of insert throughput with different parameters: event loop type, number of event loops, and concurrency level in each loop. It does not produce meaningful results in the default notebook container setting, where the client and server run in the same container. A meaningful comparison can be drawn by pointing to the desired server cluster and adjusting the client environment accordingly.
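A sketch of how such a comparison could be driven with the helper functions above; the loop structure, parameter values, and print format are assumptions.
// sweep event-loop types, loop counts, and per-loop concurrency (assumed helpers from above)
String[] loopTypes = {"DIRECT_NIO", "NETTY_NIO", "NETTY_EPOLL"};
int[] loopCounts = {2, 4, 8};
int[] commandsPerLoop = {50, 100, 200};
int numRecords = 100000;

for (String type : loopTypes) {
    for (int nLoops : loopCounts) {
        for (int cmds : commandsPerLoop) {
            eventLoops = InitializeEventLoops(type, nLoops, cmds, cmds);
            client = InitializeClient(eventLoops, nLoops, cmds, Host.parseHosts("localhost", 3000));
            throttles = InitializeThrottles(nLoops, cmds);
            long ms = InsertRecords(numRecords, eventLoops, throttles, 0);
            System.out.format("%d ops/ms with %d %s event-loops and %d commands-per-loop.\n",
                numRecords / Math.max(ms, 1), nLoops, type, cmds);
            Cleanup();   // truncate and close before the next configuration
        }
    }
}
System.out.println("Done.");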
Output:
27 ops/ms with 2 DIRECT_NIO event-loops and 50 commands-per-loop.
23 ops/ms with 2 DIRECT_NIO event-loops and 100 commands-per-loop.
29 ops/ms with 2 DIRECT_NIO event-loops and 200 commands-per-loop.
34 ops/ms with 4 DIRECT_NIO event-loops and 50 commands-per-loop.
35 ops/ms with 4 DIRECT_NIO event-loops and 100 commands-per-loop.
33 ops/ms with 4 DIRECT_NIO event-loops and 200 commands-per-loop.
23 ops/ms with 8 DIRECT_NIO event-loops and 50 commands-per-loop.
22 ops/ms with 8 DIRECT_NIO event-loops and 100 commands-per-loop.
19 ops/ms with 8 DIRECT_NIO event-loops and 200 commands-per-loop.
21 ops/ms with 2 NETTY_NIO event-loops and 50 commands-per-loop.
25 ops/ms with 2 NETTY_NIO event-loops and 100 commands-per-loop.
27 ops/ms with 2 NETTY_NIO event-loops and 200 commands-per-loop.
32 ops/ms with 4 NETTY_NIO event-loops and 50 commands-per-loop.
31 ops/ms with 4 NETTY_NIO event-loops and 100 commands-per-loop.
33 ops/ms with 4 NETTY_NIO event-loops and 200 commands-per-loop.
22 ops/ms with 8 NETTY_NIO event-loops and 50 commands-per-loop.
22 ops/ms with 8 NETTY_NIO event-loops and 100 commands-per-loop.
20 ops/ms with 8 NETTY_NIO event-loops and 200 commands-per-loop.
20 ops/ms with 2 NETTY_EPOLL event-loops and 50 commands-per-loop.
23 ops/ms with 2 NETTY_EPOLL event-loops and 100 commands-per-loop.
24 ops/ms with 2 NETTY_EPOLL event-loops and 200 commands-per-loop.
25 ops/ms with 4 NETTY_EPOLL event-loops and 50 commands-per-loop.
25 ops/ms with 4 NETTY_EPOLL event-loops and 100 commands-per-loop.
26 ops/ms with 4 NETTY_EPOLL event-loops and 200 commands-per-loop.
23 ops/ms with 8 NETTY_EPOLL event-loops and 50 commands-per-loop.
23 ops/ms with 8 NETTY_EPOLL event-loops and 100 commands-per-loop.
21 ops/ms with 8 NETTY_EPOLL event-loops and 200 commands-per-loop.
Done.
Takeaways and Conclusion
This tutorial described the architecture of and key concepts behind asynchronous operations in the Aerospike client. It presented the programming framework in which async requests can be submitted and handled, and illustrated with code how event loops, throttling, and inline (nested) async calls are implemented. The choice of execution mode to employ - synchronous, asynchronous, or background - involves trade-offs across multiple factors, including the nature of the operations, the client and server setup, throughput needs, and programming complexity.
Clean up
Remove tutorial data and close connection.
Cleanup();
System.out.println("Removed tutorial data and closed server connection.");
Output:
Removed tutorial data and closed server connection.
Visit the Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, then click File->Open in the notebook menu and select Upload.