
Async operations


Learn how to use asynchronous operations for high-throughput, non-blocking database access. Async operations allow your application to continue processing while waiting for database responses.

Why async?

Approach | Behavior | Best for
Sync | Blocks thread until response | Simple scripts, low concurrency
Async | Returns immediately, notifies on completion | High throughput, web servers, microservices

See Error handling for more information about async execution modes.
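The two rows of the table above can be illustrated with plain JDK types. This is a minimal sketch, not SDK code: `slowLookup` is a hypothetical stand-in for a database round trip, and `readSync` / `readAsync` are names invented for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class SyncVsAsync {
    // Hypothetical stand-in for a database round trip; not an SDK call.
    static String slowLookup(String id) {
        try {
            TimeUnit.MILLISECONDS.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "record:" + id;
    }

    // Sync: the calling thread is blocked until the value is available.
    static String readSync(String id) {
        return slowLookup(id);
    }

    // Async: returns immediately; the future completes later on another thread.
    static CompletableFuture<String> readAsync(String id) {
        return CompletableFuture.supplyAsync(() -> slowLookup(id));
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = readAsync("user-1");
        System.out.println("Submitted; the caller keeps working");
        // Register a completion handler, then wait only so the demo does not exit early.
        pending.thenAccept(v -> System.out.println("Completed: " + v)).join();
    }
}
```

In the async variant the caller decides when, or whether, to wait, which is what lets one thread keep many requests in flight.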

Async read

import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;

DataSet users = DataSet.of("test", "users");
// Non-blocking: results are published to the returned stream as they arrive.
RecordStream stream = session.query(users.id("user-1")).executeAsync(ErrorStrategy.IN_STREAM);
stream.getFirst().ifPresent(result -> {
    if (result.isOk()) {
        Record user = result.recordOrThrow();
        System.out.println("Name: " + user.getString("name"));
    }
});

📖 API reference: DataSet.of(...) | DataSet.id(...) | Session.query(Key) | ChainableQueryBuilder.executeAsync(...) | RecordStream.getFirst() | RecordResult.isOk() | RecordResult.recordOrThrow() | Record.getString(...) | ErrorStrategy | ErrorStrategy.IN_STREAM

Async write

import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.RecordStream;

DataSet users = DataSet.of("test", "users");
RecordStream writeStream = session.insert(users)
    .bins("name", "email")
    .id("user-1").values("Alice", "alice@example.com")
    .executeAsync(ErrorStrategy.IN_STREAM);
writeStream.forEach(result -> {
    if (result.isOk()) {
        System.out.println("Insert complete for key index " + result.index());
    }
});

📖 API reference: DataSet.of(...) | Session.insert(DataSet) | OperationObjectBuilder.bins(...) | IdValuesBuilder.id(...) | IdValuesRowBuilder.values(...) | ChainableQueryBuilder.executeAsync(...) | RecordStream.forEach(...) | RecordResult.isOk() | ErrorStrategy | ErrorStrategy.IN_STREAM

Parallel operations

Execute multiple operations concurrently:

import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.function.Function;

DataSet users = DataSet.of("test", "users");
// Parallelize with virtual threads: each task runs a synchronous query and maps the first record.
Function<String, Optional<Record>> readOne = id -> {
    RecordStream s = session.query(users.id(id)).execute();
    return s.getFirst().filter(RecordResult::isOk).map(RecordResult::recordOrThrow);
};
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    CompletableFuture<Optional<Record>> u1 = CompletableFuture.supplyAsync(() -> readOne.apply("user-1"), executor);
    CompletableFuture<Optional<Record>> u2 = CompletableFuture.supplyAsync(() -> readOne.apply("user-2"), executor);
    CompletableFuture<Optional<Record>> u3 = CompletableFuture.supplyAsync(() -> readOne.apply("user-3"), executor);
    // Wait for all three reads, then collect the results in submission order.
    CompletableFuture.allOf(u1, u2, u3).join();
    List<Optional<Record>> results = List.of(u1.join(), u2.join(), u3.join());
}

📖 API reference: DataSet.of(...) | Session.query(Key) | ChainableQueryBuilder.execute() | RecordStream.getFirst() | RecordResult.isOk()
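When the number of keys is not fixed at three, the same fan-out pattern can be written once over a list. The sketch below uses only JDK classes; `ParallelReads.readAll` is a hypothetical helper name, and the `read` function stands in for whatever synchronous lookup the application performs (for example, the `readOne` function above). It runs on the default `CompletableFuture` executor rather than virtual threads so that it does not depend on a specific Java version.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

final class ParallelReads {
    // Runs `read` for every id concurrently and returns the results in input order.
    static <T> List<T> readAll(List<String> ids, Function<String, T> read) {
        List<CompletableFuture<T>> futures = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> read.apply(id)))
                .collect(Collectors.toList());
        // allOf completes once every future has completed; join() then waits for that.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        // Every future is already done here, so these join() calls return at once.
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(readAll(List.of("user-1", "user-2"), id -> "record:" + id));
    }
}
```

If any single read throws, the `join()` on `allOf` rethrows it as a `CompletionException`, so callers that need partial results should handle failures inside `read`.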

Async with callbacks (Java)

Handle results with callbacks instead of blocking:

import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;
import java.util.concurrent.CompletableFuture;

DataSet users = DataSet.of("test", "users");
CompletableFuture.runAsync(() -> {
    RecordStream stream = session.query(users.id("user-1")).executeAsync(ErrorStrategy.IN_STREAM);
    stream.getFirst().ifPresent(result -> {
        if (result.isOk()) {
            Record user = result.recordOrThrow();
            System.out.println("Got user: " + user.getString("name"));
        }
    });
});
System.out.println("Operation submitted");

📖 API reference: DataSet.of(...) | DataSet.id(...) | Session.query(Key) | ChainableQueryBuilder.executeAsync(...) | RecordStream.getFirst() | RecordResult.isOk() | RecordResult.recordOrThrow() | Record.getString(...) | ErrorStrategy | ErrorStrategy.IN_STREAM

Async pipeline

Chain operations together:

import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

DataSet users = DataSet.of("test", "users");
DataSet orders = DataSet.of("test", "orders");
session.query(users.id("user-1"))
    .bins("loyalty_tier")
    .executeAsync(ErrorStrategy.IN_STREAM)
    .asCompletableFuture()
    .thenCompose(results -> {
        String tier = results.get(0).recordOrThrow().getString("loyalty_tier");
        double discount = "gold".equals(tier) ? 0.20 : 0.05;
        return session.update(orders.id("order-99"))
            .bin("discount").setTo(discount)
            .executeAsync(ErrorStrategy.IN_STREAM)
            .asCompletableFuture();
    })
    .thenRun(() -> System.out.println("Discount applied!"))
    .exceptionally(ex -> {
        System.err.println("Pipeline failed: " + ex.getMessage());
        return null;
    });

📖 API reference: DataSet.of(...) | DataSet.id(...) | Session.update(DataSet) | Session.query(Key) | OperationObjectBuilder.bins(...) | ChainableOperationBuilder.bin(...) | ChainableQueryBuilder.bin(...) | ChainableQueryBuilder.executeAsync(...) | RecordStream.asCompletableFuture() | RecordResult.recordOrThrow() | Record.getString(...) | ErrorStrategy | ErrorStrategy.IN_STREAM
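The shape of this pipeline, and how a failure in an early stage reaches the final `exceptionally` handler, can be tried out with JDK classes alone. In the sketch below `lookupTier` and `applyDiscount` are hypothetical stand-ins for the query and update above, not SDK calls. It also shows one JDK detail worth knowing: a failure raised in an earlier stage usually arrives wrapped in a `CompletionException`, so the handler unwraps it before reading the message. Whether the SDK's `asCompletableFuture()` completes exceptionally for a given error is not covered here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class PipelineSketch {
    // Hypothetical stand-in for the tier lookup; fails for ids starting with "missing".
    static CompletableFuture<String> lookupTier(String userId) {
        return CompletableFuture.supplyAsync(() -> {
            if (userId.startsWith("missing")) {
                throw new IllegalStateException("no such user: " + userId);
            }
            return "gold";
        });
    }

    // Hypothetical stand-in for the order update; returns the discount it applied.
    static CompletableFuture<Double> applyDiscount(String tier) {
        return CompletableFuture.completedFuture("gold".equals(tier) ? 0.20 : 0.05);
    }

    // Returns the original exception when it arrives wrapped in a CompletionException.
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    // Chains the two stages; a failure in either one skips ahead to exceptionally().
    static String run(String userId) {
        return lookupTier(userId)
                .thenCompose(PipelineSketch::applyDiscount)
                .thenApply(discount -> "discount=" + discount)
                .exceptionally(ex -> "failed: " + unwrap(ex).getMessage())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(run("user-1"));
        System.out.println(run("missing-1"));
    }
}
```

Because `thenCompose` and `thenApply` are skipped once a stage fails, a single `exceptionally` at the end of the chain covers every stage before it.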

Async batch

Batch operations also support asynchronous execution:

import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;
import java.util.concurrent.CompletableFuture;

DataSet users = DataSet.of("test", "users");
CompletableFuture.runAsync(() -> {
    RecordStream stream = session
        .query(users.id("user-1"), users.id("user-2"), users.id("user-3"))
        .executeAsync(ErrorStrategy.IN_STREAM);
    long count = 0;
    try {
        // Count only results that succeeded and actually carry a record.
        while (stream.hasNext()) {
            RecordResult rr = stream.next();
            if (rr.isOk() && rr.recordOrNull() != null) {
                count++;
            }
        }
    } finally {
        stream.close();
    }
    System.out.println("Batch complete, got " + count + " records");
});

📖 API reference: DataSet.of(...) | DataSet.id(...) | ChainableQueryBuilder.executeAsync(...) | RecordStream.hasNext() | RecordStream.next() | RecordStream.close() | RecordResult.isOk() | ErrorStrategy | ErrorStrategy.IN_STREAM

Async query

Stream query results asynchronously:

import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;
import java.util.concurrent.CompletableFuture;

DataSet users = DataSet.of("test", "users");
CompletableFuture.runAsync(() -> {
    RecordStream queryStream = session.query(users)
        .where("$.status == 'active'")
        .executeAsync(ErrorStrategy.IN_STREAM);
    queryStream.forEach((RecordResult result) -> {
        if (result.isOk()) {
            Record user = result.recordOrThrow();
            System.out.println("Active user: " + user.getString("name"));
        }
    });
});

📖 API reference: DataSet.of(...) | Session.query(DataSet) | ChainableQueryBuilder.where(...) | ChainableQueryBuilder.executeAsync(...) | RecordStream.forEach(...) | RecordResult.isOk() | RecordResult.recordOrThrow() | Record.getString(...) | ErrorStrategy | ErrorStrategy.IN_STREAM

Java reactive streams with asPublisher()

asPublisher() returns a java.util.concurrent.Flow.Publisher<RecordResult> with backpressure. The stream closes automatically when the subscription completes, errors, or is cancelled. The publisher is unicast only: a second subscriber receives onError(IllegalStateException).
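On the consuming side, backpressure means the subscriber asks for items explicitly with `Subscription.request(n)`. The sketch below shows a subscriber that requests one item at a time. It is an illustration under assumptions: `OneAtATimeSubscriber` is a name invented here, and the JDK's `SubmissionPublisher<String>` stands in for the publisher returned by `asPublisher()` so that the example runs without a cluster. Unlike the unicast publisher described above, `SubmissionPublisher` accepts several subscribers; only the subscriber logic is meant to carry over.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class OneAtATimeSubscriber<T> implements Flow.Subscriber<T> {
    final List<T> received = new CopyOnWriteArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1); // backpressure: ask for a single item to start
    }

    @Override
    public void onNext(T item) {
        received.add(item);
        subscription.request(1); // ask for the next item only after handling this one
    }

    @Override
    public void onError(Throwable t) {
        done.countDown();
    }

    @Override
    public void onComplete() {
        done.countDown();
    }

    // Demo helper: publishes the items through a stand-in publisher and returns what arrived.
    static List<String> demo(List<String> items) {
        OneAtATimeSubscriber<String> sub = new OneAtATimeSubscriber<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(sub);
            items.forEach(publisher::submit);
        } // close() signals onComplete once the submitted items have been delivered
        try {
            sub.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sub.received;
    }

    public static void main(String[] args) {
        System.out.println(demo(List.of("a", "b", "c")));
    }
}
```

Requesting one item per `onNext` keeps at most one unprocessed result in flight; a larger `request(n)` trades memory for throughput.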

Async error handling

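Async errors can be handled in one of two ways: handle them in a callback lambda, or have the error information placed in the stream (as with ErrorStrategy.IN_STREAM in the examples above), where each result is checked with isOk() before its record is read.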
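The difference between the two styles can be sketched without the SDK. Everything in the block below is hypothetical and exists only for illustration: `Outcome` is a toy result type (it is not `RecordResult`), and `collectInStream` / `collectWithCallback` are invented names. The sketch shows only where the failure ends up in each style, not how the SDK is configured for either one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class ErrorModes {
    // Hypothetical result type for this sketch; it is not the SDK's RecordResult.
    static final class Outcome {
        final String value;
        final Exception error;

        Outcome(String value, Exception error) {
            this.value = value;
            this.error = error;
        }

        boolean isOk() {
            return error == null;
        }
    }

    // Sample data: one success followed by one failure.
    static List<Outcome> sample() {
        return Arrays.asList(
                new Outcome("alice", null),
                new Outcome(null, new RuntimeException("timeout")));
    }

    // In-stream style: failures stay in the sequence and the consumer inspects each item.
    static List<String> collectInStream(List<Outcome> outcomes) {
        List<String> out = new ArrayList<>();
        for (Outcome o : outcomes) {
            out.add(o.isOk() ? o.value : "error: " + o.error.getMessage());
        }
        return out;
    }

    // Callback style: failures are diverted to the handler and only successes are returned.
    static List<String> collectWithCallback(List<Outcome> outcomes, Consumer<Exception> onError) {
        List<String> out = new ArrayList<>();
        for (Outcome o : outcomes) {
            if (o.isOk()) {
                out.add(o.value);
            } else {
                onError.accept(o.error);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collectInStream(sample()));
        System.out.println(collectWithCallback(sample(), e -> System.err.println("handled: " + e.getMessage())));
    }
}
```

The in-stream style keeps each failure next to the results around it, which helps with batches; the callback style keeps the result-handling path free of error checks.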

API reference summary

Method | Description
Java: .executeAsync(ErrorStrategy.IN_STREAM) | Non-blocking execution that returns a RecordStream filled as results arrive
Python: await <builder>.execute() on an aerospike_sdk.aio session/builder | Non-blocking execution that returns a RecordStream
CompletableFuture + virtual threads | Compose parallel synchronous execute() calls (Java)
CompletableFuture.allOf() | Wait for multiple futures (Java)
await | Wait for async result (Python)
asyncio.gather() | Wait for multiple tasks (Python)

Next steps

Behaviors

Configure timeouts and retries for async operations.

