Async C Client: Opening the Door to Event-Based Processing

Brian Nichols
March 16, 2016|11 min read

The Aerospike C Interface Does Async

The Aerospike C/C++ client, version 4.0, is the latest client to support asynchronous programming. In doing so, it joins the Java and C# clients. The C client has async interface functions for put, get, operate, apply, and other single-record commands, as well as for batch, scan, and query commands (except for Lua aggregation queries).

Sync vs. Async

A synchronous call to issue a command to the database ties up a thread and a socket connection to the database for the duration of the command. This approach doesn’t scale well for an application that may need to issue hundreds of overlapping commands, such as retrieving user profiles, recording stock ticks, or updating auction bids.

On the other hand, using asynchronous calls, a single thread can issue many commands and take callbacks as they complete. This approach minimizes both the number of threads required and context switches, lowers resource utilization, and enables better scaling.

Threads vs. Event Loops

Concurrent software is typically written as separate threads of execution that can run in parallel on multiple cores. In this model, your code has to guard against simultaneous data access to avoid race conditions, deadlocks, and other issues. Large numbers of threads are also usually inefficient, because each thread consumes memory and switching between threads costs CPU time.

Writing concurrent programs in multithreaded style isn’t the only way. A lot of concurrent software is organized around an “event loop” running in a single thread. The event loop manages the queuing and execution of functions on the event loop’s thread. This is in some ways a simpler concurrency model; your code never has to worry about simultaneous access by another thread because the code is organized into functions that the event loop calls serially, never in parallel. This model is as lightweight as possible because the switching between tasks is done via calling and returning from functions instead of via context switching between threads.

The event loop style of concurrent programming does have its challenges. Perhaps the main drawback is that event loop code can be hard to understand, debug, and modify because the functions that do the work aren’t organized in the familiar structure of a top-level function and a call tree of functions under it. Instead, there are many independent top-level functions, invoked by an external force: the event loop. Instead of A calling B and then C, A schedules B to be called, and then B schedules C. Or perhaps A schedules both B and C. In either case, the event loop will call B and then C when it can (in order). It can quickly get complicated.
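
To make the "A schedules B, and then B schedules C" pattern concrete, here is a minimal sketch of a toy event loop in C. It is not libev or libuv, and every name in it (event_loop, loop_schedule, loop_run, task_a and so on) is hypothetical, invented only for this illustration. The loop is just a fixed-size FIFO of function pointers that are called one at a time.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_TASKS 16

typedef struct event_loop event_loop;
typedef void (*task_fn)(event_loop* loop);

// A fixed-size FIFO of pending tasks; a toy stand-in for a real event loop.
struct event_loop {
    task_fn tasks[MAX_TASKS];
    size_t head;
    size_t tail;
    char trace[MAX_TASKS + 1];  // Records the order in which tasks ran.
    size_t n_run;
};

void
loop_init(event_loop* loop)
{
    memset(loop, 0, sizeof(*loop));
}

// Queue a task; it runs later, after the tasks queued ahead of it.
void
loop_schedule(event_loop* loop, task_fn fn)
{
    assert(loop->tail < MAX_TASKS);
    loop->tasks[loop->tail++] = fn;
}

void
loop_note(event_loop* loop, char name)
{
    loop->trace[loop->n_run++] = name;
}

void
task_c(event_loop* loop)
{
    loop_note(loop, 'C');
}

void
task_b(event_loop* loop)
{
    loop_note(loop, 'B');
    loop_schedule(loop, task_c);  // B schedules C rather than calling it.
}

void
task_a(event_loop* loop)
{
    loop_note(loop, 'A');
    loop_schedule(loop, task_b);  // A schedules B and returns right away.
}

// Run queued tasks one at a time, in order, until none remain.
void
loop_run(event_loop* loop)
{
    while (loop->head < loop->tail) {
        task_fn fn = loop->tasks[loop->head++];
        fn(loop);
    }
}
```

Calling loop_init, then loop_schedule with task_a, then loop_run leaves the string "ABC" in the trace field. Notice that no function in the sketch calls another task directly; the only caller of every task is loop_run, which is what makes the control flow harder to follow than an ordinary call tree.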

Perhaps one of the greatest war stories on the subject of concurrent programming is the 2004 blog post, Multithreaded toolkits: A failed dream? by Graham Hamilton. Anyone doing event loop programming should find it a fun read.

Standard I/O Event Loop Libraries

The two most popular C/C++ event loop libraries are libuv and libev. Libuv is a Windows-compatible offshoot of libev, which is in turn an offshoot of libevent. The Node.js event model is based on libuv.

Why are we talking about event loops? Because the Aerospike C client’s async commands must deliver all of their results on some thread, and you want that thread to be one that runs an existing event loop. If you wish to use your own thread pool, or you wish to support a different event loop, the translation should be straightforward.

If your application is in the C/C++ event loop world and you want to use Aerospike, your day has come.

Async And The C Client

You can build the Aerospike C client library from source with either libev or libuv. The binary download of the C client comes in both libev and libuv forms. The client library’s as_event_loop abstract type insulates both the client and the app from the particular event loop implementation underneath. This lets the developer change which event loop library is used, or extend the client to support another event loop implementation. Async commands and callbacks communicate via an as_event_loop* argument.

If you’ve used the Aerospike client’s scan and query commands, you’ve already noticed that they deliver their results in callback style. The async versions of scan and query do so as well, but they return immediately rather than after the last result callback has occurred; a NULL value passed to the callback indicates no more results. Another difference is that the async versions of these functions, like the other async functions, deliver all callbacks via the event loop thread, whereas the sync version typically delivers the callbacks in multiple threads.
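
The NULL-terminated callback convention can be sketched without the client library. The types and names below (fake_record, scan_state, scan_listener) are hypothetical stand-ins, not the client's own declarations, and the real listener signature carries more arguments than this. The point is only the shape of the logic: count records until a NULL arrives, then treat the scan as finished.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

// Hypothetical stand-in for a record delivered by a scan or query.
typedef struct {
    int64_t id;
} fake_record;

// State shared across callbacks through the user-data pointer.
typedef struct {
    uint32_t n_records;
    bool done;
} scan_state;

// Result callback: called once per record, then once more with NULL.
void
scan_listener(const fake_record* record, void* udata)
{
    scan_state* state = udata;

    if (record == NULL) {
        // No more results; this is where the app would move on.
        state->done = true;
        return;
    }

    // A record must never arrive after the end-of-results marker.
    assert(! state->done);
    state->n_records++;
}
```

Because the async call returns before any result arrives, work that depends on the full result set belongs in the NULL branch of the callback rather than after the call that started the scan.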

I’ve put together a short tutorial to show what it’s like to use the C client async interface.

Async C Tutorial Excerpts

This tutorial demonstrates how to read/write large batches using async programming interfaces. It is excerpted from the full tutorial and discusses the source code in async_tutorial.c on GitHub.

Async Commands and Event Loops

This is how an Aerospike client async command works:

  • The app initializes an event loop, which the client will use to deliver callbacks to the app.

  • The app then asks the client to connect to the database.

Then:

  • The app calls a client async function (from any thread), passing event loop and completion callback arguments.

  • The function returns.

  • The client hands the command to a socket for transmission to the appropriate node in the cluster.

  • The client gets a response from the node.

  • The client requests that the event loop call the app’s completion callback.

  • The event loop calls the callback on the event loop’s thread.

Callbacks don’t need special coding for concurrency because they are called serially in a single thread, the one used by the event loop.
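
The steps above can be simulated in a few lines of plain C. This is a sketch under stated assumptions, not the client's implementation: fake_loop, fake_put_async, fake_loop_run and count_listener are hypothetical names, there is no network, and every command is assumed to succeed. It shows the two properties that matter: the async call returns before the callback runs, and callbacks are delivered serially by the loop.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define MAX_PENDING 8

typedef struct fake_loop fake_loop;
typedef void (*put_listener)(int status, void* udata, fake_loop* loop);

// A command whose response has arrived but whose callback has not run yet.
typedef struct {
    put_listener listener;
    void* udata;
    int status;
} completion;

struct fake_loop {
    completion pending[MAX_PENDING];
    uint32_t n_pending;
};

// Hypothetical stand-in for a client async call. It records the command
// and returns immediately, without invoking the listener.
int
fake_put_async(fake_loop* loop, put_listener listener, void* udata)
{
    assert(listener != NULL);

    if (loop->n_pending == MAX_PENDING) {
        return -1;  // Could not queue the command.
    }

    completion* c = &loop->pending[loop->n_pending++];
    c->listener = listener;
    c->udata = udata;
    c->status = 0;  // Pretend the node replied with success.
    return 0;
}

// Stand-in for the event loop thread: deliver callbacks one at a time.
void
fake_loop_run(fake_loop* loop)
{
    for (uint32_t i = 0; i < loop->n_pending; i++) {
        completion* c = &loop->pending[i];
        c->listener(c->status, c->udata, loop);
    }
    loop->n_pending = 0;
}

// Completion callback. A plain increment is enough because callbacks
// never overlap on a single loop.
void
count_listener(int status, void* udata, fake_loop* loop)
{
    (void)loop;
    uint32_t* n_ok = udata;

    if (status == 0) {
        (*n_ok)++;
    }
}
```

If the app calls fake_put_async twice with a pointer to a zeroed counter, the counter is still zero when the calls return. It only becomes 2 after fake_loop_run has delivered both callbacks.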

Initialize Event Loops

The app tells the client how many event loops to use. The tutorial app uses only one event loop. The easy way is for the app to ask the client to create the event loops:

if (! as_event_create_loops(1)) {
    printf("Failed to create event loop\n");
    return -1;
}

Alternatively, the app can create its own event loops and share them with the client. The full tutorial document shows how.

Writing Records

A command goes through several steps:

app ➔ client ➔ network ➔ node ➔ network ➔ client ➔ app

You can think of these steps as stages in a pipeline. As with any pipeline, you will get the most throughput when all the components of the pipeline are kept busy; some of the components can be overloaded if you apply too much pressure.

The tutorial app writes 5,000 records. If it were to wait for each write to complete before starting the next one, the app would not be keeping more than one component of the pipeline busy at a time. The app can get a lot more throughput by continually feeding overlapping commands into the pipeline so that all components always have something to do. Async programming makes this easy.
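
A rough way to see the gain is Little's law: the number of commands completing per second is at most the number in flight divided by the time each one takes. The helper below is a back-of-the-envelope sketch, not part of the tutorial. The function name is hypothetical, and it assumes the per-command latency stays the same as more commands overlap, which stops being true once any stage of the pipeline saturates.

```c
#include <assert.h>
#include <stdint.h>

// Upper bound on commands per second when `in_flight` commands overlap
// and each takes `latency_us` microseconds end to end (Little's law).
// Assumes latency does not grow with load.
uint64_t
max_commands_per_sec(uint32_t in_flight, uint32_t latency_us)
{
    assert(latency_us > 0);
    return (uint64_t)in_flight * 1000000u / latency_us;
}
```

As an illustration only, take a round trip of 1 ms (an assumed figure, not a measurement from the tutorial). One command at a time gives at most 1,000 writes per second, so 5,000 records take about 5 seconds. With 100 commands in flight, the bound rises to 100,000 writes per second, or about 50 ms for the same 5,000 records.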

The tutorial app keeps 100 write commands in flight like this:

  • The app starts by issuing 100 async commands in the main thread.

  • In the completion callback, the app issues another command to replace the one that just completed.

Preventing Exhaustion of Client Resources

Each async command consumes resources, including memory and a socket connection, so it is possible to exhaust these resources with too many concurrent commands. Code that handles resource exhaustion errors is difficult to write, so the tutorial app avoids such errors by limiting the number of in-flight commands.

I arrived at the number 100 above by starting with a lower value, testing with increasing values until there were timeout errors, then backing off. If you have higher network latency, more nodes, more flash devices per node, or other differences from my test setup, you could arrive at a higher number.

The app also tunes a client parameter relating to socket connections, discussed later.

Controlling Writes

The tutorial app uses a counter struct to keep track of writes. The app limits the number of writes in flight to queue_size.

typedef struct {
   uint32_t next_id;     // Key of next record to write.
   uint32_t max;         // Number of records to write.
   uint32_t count;       // Records written.
   uint32_t queue_size;  // Maximum records allowed in flight
} counter;

Writing Records With Async

The app initializes the counter struct, where max_records is the number to be written:

counter counter = {
    .next_id = 0,
    .max = max_records,
    .count = 0,
    .queue_size = 100,
};

The app then starts things off:

static void
write_records_async(counter* counter)
{
    // Use same event loop for all records.
    as_event_loop* event_loop = as_event_loop_get();

    // Write queue_size commands on the async queue.
    for (uint32_t i = 0; i < counter->queue_size; i++) {
        if (! write_record(event_loop, counter)) {
            break;
        }
    }
}

The write_listener callback then issues a new command each time it is called until all records are written, thus keeping the number of in-flight commands constant:

static void
write_listener(as_error* err, void* udata, as_event_loop* event_loop)
{
    ...
    // Atomic increment is not necessary since only one event loop is used.
    if (++counter->count == counter->max) {
        // We have reached total records.
        printf("Wrote %u records\n", counter->count);

        // Records can now be read in a batch.
        batch_read(event_loop, counter->max);
        return;
    }

    // Check if we need to write another record.
    if (counter->next_id < counter->max) {
        write_record(event_loop, counter);
    }
}

As mentioned in the section above on Async Commands and Event Loops, this callback works without special coding for concurrency because callbacks are called serially (in the thread used by the event loop).

The tutorial app’s write_record function is a wrapper for the aerospike_key_put_async function. The write_record function passes write_listener as the completion callback argument to keep the process going. This is like looping, or you could say it’s like recursion, but it’s definitely trickier than non-async programming.

static bool
write_record(as_event_loop* event_loop, counter* counter)
{
    ...
    // Returns false if the command could not be queued.
    return aerospike_key_put_async(&as, &err, NULL, &key, &rec,
            write_listener, counter, event_loop, NULL) == AEROSPIKE_OK;
}
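
To check that this pattern really holds the number of in-flight commands at queue_size, the logic can be simulated without a database. The sketch below mirrors the shape of write_record and write_listener, but everything in it is a stand-in: sim_counter adds two bookkeeping fields that the tutorial's counter does not have, the sim_ functions are hypothetical, and completions are simply delivered one after another in a loop instead of by an event loop.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef struct {
    uint32_t next_id;         // Key of next record to write.
    uint32_t max;             // Number of records to write.
    uint32_t count;           // Records written.
    uint32_t queue_size;      // Maximum records allowed in flight.
    uint32_t in_flight;       // Simulation only: commands awaiting completion.
    uint32_t peak_in_flight;  // Simulation only: highest in_flight seen.
} sim_counter;

// Stand-in for write_record(): "send" the next record, if any remain.
bool
sim_write_record(sim_counter* c)
{
    if (c->next_id >= c->max) {
        return false;
    }

    c->next_id++;
    c->in_flight++;

    if (c->in_flight > c->peak_in_flight) {
        c->peak_in_flight = c->in_flight;
    }
    return true;
}

// Stand-in for write_listener(): one command finished, so start another.
void
sim_write_listener(sim_counter* c)
{
    assert(c->in_flight > 0);
    c->in_flight--;

    if (++c->count == c->max) {
        return;  // All records written.
    }

    if (c->next_id < c->max) {
        sim_write_record(c);
    }
}

// Prime the window, then complete commands one at a time, as a
// single-threaded event loop would.
void
sim_run(sim_counter* c)
{
    for (uint32_t i = 0; i < c->queue_size; i++) {
        if (! sim_write_record(c)) {
            break;
        }
    }

    while (c->in_flight > 0) {
        sim_write_listener(c);
    }
}
```

Running sim_run on a counter with max set to 5,000 and queue_size set to 100 finishes with count at 5,000 and peak_in_flight at exactly 100. The window fills once at the start, stays full while records remain, and drains over the last 100 completions.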

Tuning Client Connections to the Database

The tutorial app also limits async socket connections by using the following configuration:

as_config cfg;
as_config_init(&cfg);
cfg.async_max_conns_per_node = 200;

These connection limits are distributed across the number of event loops, so if the app were using 2 event loops, the 200 max connections per node would be divided into 100 max connections per node per event loop.

This number needs to be greater than or equal to queue_size. Here it’s twice as large, which is plenty.
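
The arithmetic is simple enough to capture in a helper. This is a sketch with hypothetical function names, and it rests on two assumptions of mine: that the per-node limit is split evenly across event loops using integer division, and that queue_size is the in-flight limit for a single event loop whose commands could, in the worst case, all target the same node.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

// Per-node connection limit available to each event loop, assuming the
// total is split evenly across loops.
uint32_t
conns_per_node_per_loop(uint32_t async_max_conns_per_node, uint32_t n_loops)
{
    assert(n_loops > 0);
    return async_max_conns_per_node / n_loops;
}

// True when every in-flight command on one loop could get its own
// connection even if all of them go to the same node.
bool
queue_size_fits(uint32_t queue_size, uint32_t async_max_conns_per_node,
        uint32_t n_loops)
{
    return queue_size <=
            conns_per_node_per_loop(async_max_conns_per_node, n_loops);
}
```

With the tutorial's values, one event loop gets all 200 connections per node, so a queue_size of 100 fits with room to spare. With 2 event loops the per-loop limit drops to 100 and a queue_size of 100 still just fits; with 4 loops it would not.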

Read Records With Async Batch

Reading a large number of records is easier than writing because the batch read async function handles pipelining.

The tutorial app uses async batch read to read the records just written, in one batched call:

static void
batch_read(as_event_loop* event_loop, uint32_t max_records)
{
    // Make a batch of all the keys we inserted.
    as_batch_read_records* records = as_batch_read_create(max_records);

    for (uint32_t i = 0; i < max_records; i++) {
        as_batch_read_record* record = as_batch_read_reserve(records);
        as_key_init_int64(&record->key, g_namespace, g_set, (int64_t)i);
        record->read_all_bins = true;
    }

    // Read these keys.
    as_error err;
    if (aerospike_batch_read_async(&as, &err, NULL, records, 
                batch_listener, NULL, event_loop) != AEROSPIKE_OK) {
        batch_listener(&err, records, NULL, event_loop);
    }
}

static void
batch_listener(as_error* err, as_batch_read_records* records, void* udata, as_event_loop* event_loop)
{
    if (err) {
        printf("aerospike_batch_read_async() returned %d - %s\n",
                err->code, err->message);
        as_batch_read_destroy(records);
        as_monitor_notify(&app_complete_monitor);
        return;
    }

    as_vector* list = &records->list;

    uint32_t n_found = 0;

    for (uint32_t i = 0; i < list->size; i++) {
        as_batch_read_record* record = as_vector_get(list, i);

        if (record->result == AEROSPIKE_OK) {
            n_found++;
        }
        else if (record->result == AEROSPIKE_ERR_RECORD_NOT_FOUND) {
            // The transaction succeeded but the record doesn't exist.
            printf("AEROSPIKE_ERR_RECORD_NOT_FOUND\n");
        }
        else {
            // The transaction failed.
            printf("Error %d\n", record->result);
        }
    }

    printf("Found %u/%u records\n", n_found, list->size);
    as_batch_read_destroy(records);
    as_monitor_notify(&app_complete_monitor);
}

Error Handling

The tutorial app’s error handling is minimal to keep it simple. There are many strategies for dealing with errors, depending on whether the application needs to retry the command, compensate for failure or undo the effect of an operation (e.g. an increment). However, that’s a topic for another day!

Where to Get More Information

As always, we need your help and input to continue to enhance your experience on Aerospike. Please contribute your feedback, ideas, and questions to our user forum, file GitHub issues, or create a pull request for the next great feature you’d like to contribute to the Aerospike user community!