Async operations

Learn how to use asynchronous operations for high-throughput, non-blocking database access. Async operations allow your application to continue processing while waiting for database responses.

Why async?

Approach | Behavior | Best for
Sync | Blocks thread until response | Simple scripts, low concurrency
Async | Returns immediately, notifies on completion | High throughput, web servers, microservices

Note: The Java SDK uses virtual threads and therefore requires JDK 21 or above. Performance tests have shown that JDK 26 significantly improves virtual-thread performance in high-performance environments, so JDK 26 or above is recommended for those environments.

Async read

import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;

DataSet users = DataSet.of("test", "users");
// Non-blocking: results are published to the returned stream as they arrive.
RecordStream stream = session.query(users.id("user-1")).executeAsync(ErrorStrategy.IN_STREAM);
stream.getFirst().ifPresent(result -> {
    if (result.isOk()) {
        Record user = result.recordOrThrow();
        System.out.println("Name: " + user.getString("name"));
    }
});

Async write

import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.RecordStream;

DataSet users = DataSet.of("test", "users");
RecordStream writeStream = session.insert(users)
    .bins("name", "email")
    .id("user-1").values("Alice", "alice@example.com")
    .executeAsync(ErrorStrategy.IN_STREAM);
writeStream.forEach(result -> {
    if (result.isOk()) {
        System.out.println("Insert complete for key index " + result.index());
    }
});

Parallel operations

Execute multiple operations concurrently:

import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.function.Function;

DataSet users = DataSet.of("test", "users");
// Parallelize with virtual threads: each task runs a synchronous query and maps the first record.
Function<String, Optional<Record>> readOne = id -> {
    RecordStream s = session.query(users.id(id)).execute();
    return s.getFirst().filter(RecordResult::isOk).map(RecordResult::recordOrThrow);
};
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    CompletableFuture<Optional<Record>> u1 = CompletableFuture.supplyAsync(() -> readOne.apply("user-1"), executor);
    CompletableFuture<Optional<Record>> u2 = CompletableFuture.supplyAsync(() -> readOne.apply("user-2"), executor);
    CompletableFuture<Optional<Record>> u3 = CompletableFuture.supplyAsync(() -> readOne.apply("user-3"), executor);
    CompletableFuture.allOf(u1, u2, u3).join();
    List<Optional<Record>> results = List.of(u1.join(), u2.join(), u3.join());
}
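
The three hand-written futures above generalize to any number of IDs. The following is a minimal sketch of that fan-out using only the JDK, not part of the SDK: `FanOutSketch`, `fetchAll`, and the `lookup` function are hypothetical names, and `lookup` stands in for a blocking read such as `readOne`. The executor is a parameter so the same helper works with a virtual-thread executor or an ordinary pool.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

class FanOutSketch {
    // Starts one task per id on the given executor, then waits for all of them.
    // Results come back in the same order as the input ids.
    static <K, V> List<V> fetchAll(List<K> ids, Function<K, V> lookup, Executor executor) {
        // First pass: submit every task so they all run concurrently.
        List<CompletableFuture<V>> futures = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> lookup.apply(id), executor))
                .collect(Collectors.toList());
        // Second pass: join in input order. join() rethrows a task failure
        // wrapped in CompletionException.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // A fixed pool keeps this sketch runnable on JDKs without virtual threads.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Hypothetical lookup standing in for a blocking database read.
            Function<String, String> lookup = id -> "record-for-" + id;
            System.out.println(fetchAll(List.of("user-1", "user-2", "user-3"), lookup, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

On JDK 21 or above, passing `Executors.newVirtualThreadPerTaskExecutor()` as the executor would match the virtual-thread approach used in the example above. Submitting all tasks before joining any of them is what makes the reads overlap; mapping and joining in a single stream pipeline would run them one at a time.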

Async with callbacks (Java)

Handle results with callbacks instead of blocking:

import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;
import java.util.concurrent.CompletableFuture;

DataSet users = DataSet.of("test", "users");
CompletableFuture.runAsync(() -> {
    RecordStream stream = session.query(users.id("user-1")).executeAsync(ErrorStrategy.IN_STREAM);
    stream.getFirst().ifPresent(result -> {
        if (result.isOk()) {
            Record user = result.recordOrThrow();
            System.out.println("Got user: " + user.getString("name"));
        }
    });
});
System.out.println("Operation submitted");

Async pipeline

Chain operations together:

import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

DataSet users = DataSet.of("test", "users");
DataSet orders = DataSet.of("test", "orders");
session.query(users.id("user-1"))
    .bins("loyalty_tier")
    .executeAsync(ErrorStrategy.IN_STREAM)
    .asCompletableFuture()
    .thenCompose(results -> {
        String tier = results.get(0).recordOrThrow().getString("loyalty_tier");
        double discount = "gold".equals(tier) ? 0.20 : 0.05;
        return session.update(orders.id("order-99"))
            .bin("discount").setTo(discount)
            .executeAsync(ErrorStrategy.IN_STREAM)
            .asCompletableFuture();
    })
    .thenRun(() -> System.out.println("Discount applied!"))
    .exceptionally(ex -> {
        System.err.println("Pipeline failed: " + ex.getMessage());
        return null;
    });
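
One detail of `CompletableFuture` chains is worth seeing in isolation: when an earlier stage fails, the handler passed to `exceptionally` usually receives a `CompletionException` that wraps the original error, so `ex.getMessage()` alone may not show the root cause. The sketch below reproduces the shape of the pipeline above with JDK-only stand-ins. `PipelineSketch`, `loadTier`, and `applyDiscount` are hypothetical and do not call the SDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class PipelineSketch {
    // Hypothetical stand-in for an async read of a user's loyalty tier.
    static CompletableFuture<String> loadTier(String userId) {
        return CompletableFuture.supplyAsync(() -> {
            if (userId.isEmpty()) {
                throw new IllegalArgumentException("empty user id");
            }
            return userId.startsWith("vip-") ? "gold" : "basic";
        });
    }

    // Hypothetical stand-in for an async update that depends on the first result.
    static CompletableFuture<Double> applyDiscount(String tier) {
        double discount = "gold".equals(tier) ? 0.20 : 0.05;
        return CompletableFuture.completedFuture(discount);
    }

    static CompletableFuture<String> run(String userId) {
        return loadTier(userId)
                .thenCompose(PipelineSketch::applyDiscount) // second stage starts after the first succeeds
                .thenApply(discount -> "discount=" + discount)
                .exceptionally(ex -> {
                    // Unwrap CompletionException to report the original failure.
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause()
                            : ex;
                    return "failed: " + cause.getMessage();
                });
    }

    public static void main(String[] args) {
        System.out.println(run("vip-7").join());
        System.out.println(run("user-1").join());
        System.out.println(run("").join());
    }
}
```

Because the single `exceptionally` stage sits at the end of the chain, it covers a failure in either the read or the update. The trade-off is that the handler cannot tell which stage failed unless the exception itself carries that information.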

Async batch

Batch operations also support async:

import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;
import java.util.concurrent.CompletableFuture;

DataSet users = DataSet.of("test", "users");
CompletableFuture.runAsync(() -> {
    RecordStream stream = session
        .query(users.id("user-1"), users.id("user-2"), users.id("user-3"))
        .executeAsync(ErrorStrategy.IN_STREAM);
    long count = 0;
    try {
        while (stream.hasNext()) {
            RecordResult rr = stream.next();
            if (rr.isOk() && rr.recordOrNull() != null) {
                count++;
            }
        }
    } finally {
        stream.close();
    }
    System.out.println("Batch complete, got " + count + " records");
});

Async query

Stream query results asynchronously:

import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;
import java.util.concurrent.CompletableFuture;

DataSet users = DataSet.of("test", "users");
CompletableFuture.runAsync(() -> {
    RecordStream queryStream = session.query(users)
        .where("$.status == 'active'")
        .executeAsync(ErrorStrategy.IN_STREAM);
    queryStream.forEach((RecordResult result) -> {
        if (result.isOk()) {
            Record user = result.recordOrThrow();
            System.out.println("Active user: " + user.getString("name"));
        }
    });
});

Java reactive streams with asPublisher()

asPublisher() returns a java.util.concurrent.Flow.Publisher<RecordResult> that supports backpressure. The stream closes automatically when the subscription completes, errors, or is cancelled. The publisher is unicast only: a second subscriber receives onError(IllegalStateException).
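
To illustrate what backpressure means for a `Flow.Subscriber`, here is a minimal JDK-only sketch in which the subscriber requests one item at a time. It is an assumption-laden stand-in rather than SDK code: `BackpressureSketch`, `OneAtATime`, and `drain` are hypothetical names, and the JDK's `SubmissionPublisher<String>` takes the place of the publisher returned by `asPublisher()`. Note that `SubmissionPublisher` allows multiple subscribers, unlike the unicast publisher described above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class BackpressureSketch {
    // Subscriber that asks for one item at a time, so the publisher
    // never delivers more than the subscriber has requested.
    static final class OneAtATime<T> implements Flow.Subscriber<T> {
        final List<T> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // initial demand: exactly one item
        }
        @Override public void onNext(T item) {
            received.add(item);
            subscription.request(1); // ask for the next item only after handling this one
        }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    // Publishes the items, waits for completion, and returns what the subscriber saw.
    static List<String> drain(List<String> items) {
        OneAtATime<String> subscriber = new OneAtATime<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete once the submitted items have been delivered
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for completion", e);
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(drain(List.of("a", "b", "c")));
    }
}
```

Requesting a larger batch in `onSubscribe` and topping it up in `onNext` would trade memory for fewer request round trips. Calling `subscription.cancel()` stops delivery early.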

Async error handling

Async errors can be handled in one of two ways: with a callback lambda, or by placing error information in the stream, as the ErrorStrategy.IN_STREAM examples on this page do.
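
The difference between the two approaches can be sketched without the SDK. In the code below, everything is hypothetical (`ErrorStrategySketch`, `Result`, `withCallback`, `inStream`) and runs synchronously to keep the focus on where errors end up. It is a conceptual illustration under those assumptions, not the SDK's implementation or API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class ErrorStrategySketch {
    // Hypothetical result wrapper: holds either a value or the error that replaced it.
    static final class Result<T> {
        final T value;
        final Exception error;

        private Result(T value, Exception error) {
            this.value = value;
            this.error = error;
        }
        static <T> Result<T> ok(T value) { return new Result<>(value, null); }
        static <T> Result<T> failed(Exception error) { return new Result<>(null, error); }
        boolean isOk() { return error == null; }
    }

    // Callback style: failures are routed to onError, and only successes are returned.
    static <K, V> List<V> withCallback(List<K> keys, Function<K, V> op, Consumer<Exception> onError) {
        List<V> out = new ArrayList<>();
        for (K key : keys) {
            try {
                out.add(op.apply(key));
            } catch (RuntimeException e) {
                onError.accept(e);
            }
        }
        return out;
    }

    // In-stream style: every key yields exactly one Result, successful or not,
    // so the consumer can match failures to positions.
    static <K, V> List<Result<V>> inStream(List<K> keys, Function<K, V> op) {
        List<Result<V>> out = new ArrayList<>();
        for (K key : keys) {
            try {
                out.add(Result.ok(op.apply(key)));
            } catch (RuntimeException e) {
                out.add(Result.failed(e));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Function<String, Integer> parse = Integer::parseInt;
        List<String> keys = List.of("1", "x", "3");
        System.out.println(withCallback(keys, parse, e -> System.err.println("callback: " + e.getMessage())));
        for (Result<Integer> r : inStream(keys, parse)) {
            System.out.println(r.isOk() ? "ok " + r.value : "error " + r.error.getMessage());
        }
    }
}
```

The callback style keeps the result list clean but separates an error from the item that caused it. The in-stream style preserves that pairing, at the cost of an `isOk()` check on every result, which is the pattern the examples on this page follow.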

API reference summary

Method | Description
Java: .executeAsync(ErrorStrategy.IN_STREAM) | Non-blocking execution that returns a RecordStream filled as results arrive
Python: await <builder>.execute() on an aerospike_sdk.aio session/builder | Non-blocking execution that returns a RecordStream
CompletableFuture + virtual threads | Compose parallel synchronous execute() calls (Java)
CompletableFuture.allOf() | Wait for multiple futures (Java)
await | Wait for async result (Python)
asyncio.gather() | Wait for multiple tasks (Python)

Next steps

Behaviors

Configure timeouts and retries for async operations.
