Async operations
Learn how to use asynchronous operations for high-throughput, non-blocking database access. Async operations allow your application to continue processing while waiting for database responses.
Why async?
| Approach | Behavior | Best For |
|---|---|---|
| Sync | Blocks thread until response | Simple scripts, low concurrency |
| Async | Returns immediately, notifies on completion | High throughput, web servers, microservices |
See Error handling for more information about async execution modes.
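The throughput difference in the table comes from overlapping the time spent waiting on responses. The following minimal sketch illustrates that with the standard library only. `fake_read` is a hypothetical stand-in that sleeps to simulate latency; it is not an SDK call.

```python
import asyncio
import time

# Hypothetical stand-in for a database call: it only sleeps to simulate latency.
async def fake_read(key: str, delay: float = 0.1) -> dict:
    await asyncio.sleep(delay)
    return {"key": key}

async def sequential(keys):
    # Each read finishes before the next one starts, so the waits add up.
    return [await fake_read(k) for k in keys]

async def concurrent(keys):
    # All reads are in flight at once; gather returns results in input order.
    return await asyncio.gather(*(fake_read(k) for k in keys))

async def timed(fn, keys):
    start = time.perf_counter()
    results = await fn(keys)
    return results, time.perf_counter() - start

async def main():
    keys = ["user-1", "user-2", "user-3"]
    seq_results, seq_time = await timed(sequential, keys)
    con_results, con_time = await timed(concurrent, keys)
    return seq_results, seq_time, con_results, con_time

seq_results, seq_time, con_results, con_time = asyncio.run(main())
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

Both variants return the same records; only the total wall-clock time differs, because the concurrent variant waits for all three simulated responses at the same time.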
Async read
import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;
DataSet users = DataSet.of("test", "users");
// Non-blocking: results are published to the returned stream as they arrive.
RecordStream stream = session.query(users.id("user-1")).executeAsync(ErrorStrategy.IN_STREAM);
stream.getFirst().ifPresent(result -> {
    if (result.isOk()) {
        Record user = result.recordOrThrow();
        System.out.println("Name: " + user.getString("name"));
    }
});
📖 API reference:
DataSet.of(...)|DataSet.id(...)|Session.query(Key)|ChainableQueryBuilder.executeAsync(...)|RecordStream.getFirst()|RecordResult.isOk()|RecordResult.recordOrThrow()|Record.getString(...)|ErrorStrategy|ErrorStrategy.IN_STREAM
import asyncio
from aerospike_sdk import DataSet
users = DataSet.of("test", "users")
async def read_user():
    stream = await session.query(users.id("user-1")).execute()
    row = await stream.first_or_raise()
    user = row.record_or_raise()
    stream.close()
    print(f"Name: {user.bins.get('name')}")
# Run the async function
asyncio.run(read_user())
📖 API reference:
DataSet.of()|DataSet.id()|Session.query()|RecordResult.record_or_raise()|RecordStream.first_or_raise()|RecordStream.close()|QueryBuilder.execute()
Async write
import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.RecordStream;
DataSet users = DataSet.of("test", "users");
RecordStream writeStream = session.insert(users)
    .bins("name", "email")
    .id("user-1").values("Alice", "alice@example.com")
    .executeAsync(ErrorStrategy.IN_STREAM);
writeStream.forEach(result -> {
    if (result.isOk()) {
        System.out.println("Insert complete for key index " + result.index());
    }
});
📖 API reference:
DataSet.of(...)|Session.insert(DataSet)|OperationObjectBuilder.bins(...)|IdValuesBuilder.id(...)|IdValuesRowBuilder.values(...)|ChainableQueryBuilder.executeAsync(...)|RecordStream.forEach(...)|RecordResult.isOk()|ErrorStrategy|ErrorStrategy.IN_STREAM
from aerospike_sdk import DataSet
async def create_user():
    users = DataSet.of("test", "users")
    await session.insert(key=users.id("user-1")).put(
        {"name": "Alice", "email": "alice@example.com"}
    ).execute()
    print("Insert complete!")
📖 API reference:
DataSet.of()|DataSet.id()|Session.insert()|WriteSegmentBuilder.put()|WriteSegmentBuilder.execute()
Parallel operations
Execute multiple operations concurrently:
import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.function.Function;
DataSet users = DataSet.of("test", "users");
// Parallelize with virtual threads: each task runs a synchronous query and maps the first record.
Function<String, Optional<Record>> readOne = id -> {
    RecordStream s = session.query(users.id(id)).execute();
    return s.getFirst().filter(RecordResult::isOk).map(RecordResult::recordOrThrow);
};
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    CompletableFuture<Optional<Record>> u1 = CompletableFuture.supplyAsync(() -> readOne.apply("user-1"), executor);
    CompletableFuture<Optional<Record>> u2 = CompletableFuture.supplyAsync(() -> readOne.apply("user-2"), executor);
    CompletableFuture<Optional<Record>> u3 = CompletableFuture.supplyAsync(() -> readOne.apply("user-3"), executor);
    CompletableFuture.allOf(u1, u2, u3).join();
    List<Optional<Record>> results = List.of(u1.join(), u2.join(), u3.join());
}
📖 API reference:
DataSet.of(...)|Session.query(Key)|ChainableQueryBuilder.execute()|RecordStream.getFirst()|RecordResult.isOk()
import asyncio
from aerospike_sdk import DataSet
async def read_multiple_users():
    users = DataSet.of("test", "users")
    async def read_one(uid: str):
        stream = await session.query(users.id(uid)).execute()
        try:
            row = await stream.first()
            if row is None or not row.is_ok:
                return None
            return row.record_or_raise()
        finally:
            stream.close()
    user_ids = ["user-1", "user-2", "user-3"]
    results = await asyncio.gather(*(read_one(uid) for uid in user_ids))
    for rec in results:
        if rec is not None:
            print(f"Name: {rec.bins.get('name')}")
📖 API reference:
DataSet.of()|DataSet.id()|Session.query()|RecordResult.record_or_raise()|RecordStream.first()|RecordStream.close()|QueryBuilder.execute()
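`asyncio.gather()` starts every coroutine it is given at once, which can be too much for a long list of keys. One common way to cap the number of operations in flight is an `asyncio.Semaphore`. The sketch below uses only the standard library. `fake_read` and `max_in_flight` are hypothetical names for illustration, not part of the SDK.

```python
import asyncio

# Hypothetical stand-in for a single-record read; not an SDK call.
async def fake_read(uid: str) -> dict:
    await asyncio.sleep(0.01)
    return {"id": uid}

async def read_all(user_ids, max_in_flight: int = 2):
    semaphore = asyncio.Semaphore(max_in_flight)
    in_flight = 0
    peak = 0  # highest number of reads observed running at the same time

    async def read_one(uid):
        nonlocal in_flight, peak
        async with semaphore:  # waits here while max_in_flight reads are running
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await fake_read(uid)
            finally:
                in_flight -= 1

    # gather still returns results in the order of user_ids.
    results = await asyncio.gather(*(read_one(uid) for uid in user_ids))
    return results, peak

results, peak = asyncio.run(read_all([f"user-{i}" for i in range(1, 7)]))
print(f"read {len(results)} records, peak concurrency {peak}")
```

The semaphore limits concurrency without changing the shape of the calling code: callers still await a single `gather()` and receive results in input order.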
Async with callbacks (Java)
Handle results with callbacks instead of blocking:
import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;
import java.util.concurrent.CompletableFuture;
DataSet users = DataSet.of("test", "users");
CompletableFuture.runAsync(() -> {
    RecordStream stream = session.query(users.id("user-1")).executeAsync(ErrorStrategy.IN_STREAM);
    stream.getFirst().ifPresent(result -> {
        if (result.isOk()) {
            Record user = result.recordOrThrow();
            System.out.println("Got user: " + user.getString("name"));
        }
    });
});
System.out.println("Operation submitted");
📖 API reference:
DataSet.of(...)|DataSet.id(...)|Session.query(Key)|ChainableQueryBuilder.executeAsync(...)|RecordStream.getFirst()|RecordResult.isOk()|RecordResult.recordOrThrow()|Record.getString(...)|ErrorStrategy|ErrorStrategy.IN_STREAM
# Python uses await pattern instead of callbacks
import asyncio
from aerospike_sdk import DataSet
async def get_user_with_handling():
    users = DataSet.of("test", "users")
    try:
        stream = await session.query(users.id("user-1")).execute()
        row = await stream.first_or_raise()
        user = row.record_or_raise()
        stream.close()
        print(f"Got user: {user.bins.get('name')}")
    except Exception as ex:
        print(f"Error: {ex}")
asyncio.run(get_user_with_handling())
📖 API reference:
DataSet.of()|DataSet.id()|Session.query()|RecordResult.record_or_raise()|RecordStream.first_or_raise()|RecordStream.close()|QueryBuilder.execute()
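The Python example above awaits the result inline. When a callback shape is wanted in Python as well, plain asyncio offers `Task.add_done_callback()`. The following is a minimal sketch using only the standard library. `fake_read` and `on_done` are hypothetical names, and nothing here is an SDK call.

```python
import asyncio

# Hypothetical stand-in for a record read; not an SDK call.
async def fake_read(uid: str) -> dict:
    await asyncio.sleep(0.01)
    return {"name": "Alice", "id": uid}

received = []

def on_done(task: asyncio.Task) -> None:
    # Called by the event loop once the task has finished.
    if task.cancelled():
        return
    error = task.exception()
    if error is not None:
        received.append(f"Error: {error}")
    else:
        received.append(f"Got user: {task.result()['name']}")

async def main():
    task = asyncio.create_task(fake_read("user-1"))
    task.add_done_callback(on_done)
    received.append("Operation submitted")  # recorded before the read completes
    await task              # keep the loop alive until the read finishes
    await asyncio.sleep(0)  # give the scheduled callback a turn to run

asyncio.run(main())
print(received)
```

The callback checks `cancelled()` before `exception()` because calling `exception()` on a cancelled task raises `CancelledError`.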
Async pipeline
Chain operations together:
import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
DataSet users = DataSet.of("test", "users");
DataSet orders = DataSet.of("test", "orders");
session.query(users.id("user-1"))
    .bins("loyalty_tier")
    .executeAsync(ErrorStrategy.IN_STREAM)
    .asCompletableFuture()
    .thenCompose(results -> {
        String tier = results.get(0).recordOrThrow().getString("loyalty_tier");
        double discount = "gold".equals(tier) ? 0.20 : 0.05;
        return session.update(orders.id("order-99"))
            .bin("discount").setTo(discount)
            .executeAsync(ErrorStrategy.IN_STREAM)
            .asCompletableFuture();
    })
    .thenRun(() -> System.out.println("Discount applied!"))
    .exceptionally(ex -> {
        System.err.println("Pipeline failed: " + ex.getMessage());
        return null;
    });
📖 API reference:
DataSet.of(...)|DataSet.id(...)|Session.update(DataSet)|Session.query(Key)|OperationObjectBuilder.bins(...)|ChainableOperationBuilder.bin(...)|ChainableQueryBuilder.bin(...)|ChainableQueryBuilder.executeAsync(...)|RecordStream.asCompletableFuture()|RecordResult.recordOrThrow()|Record.getString(...)|ErrorStrategy|ErrorStrategy.IN_STREAM
from aerospike_sdk import DataSet
async def increment_age():
    users = DataSet.of("test", "users")
    stream = await session.query(users.id("user-1")).execute()
    row = await stream.first_or_raise()
    user = row.record_or_raise()
    stream.close()
    current_age = user.bins["age"]
    await (
        session.update(users.id("user-1"))
        .bin("age").set_to(current_age + 1)
        .execute()
    )
    print("Age updated!")
📖 API reference:
DataSet.of()|DataSet.id()|Session.query()|Session.update()|QueryBuilder.bin()|WriteSegmentBuilder.set_to()|WriteSegmentBuilder.bin()|RecordResult.record_or_raise()|RecordStream.first_or_raise()|RecordStream.close()|QueryBuilder.execute()|WriteSegmentBuilder.execute()
Async batch
Batch operations also support async:
import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;
import java.util.concurrent.CompletableFuture;
DataSet users = DataSet.of("test", "users");
CompletableFuture.runAsync(() -> {
    RecordStream stream = session
        .query(users.id("user-1"), users.id("user-2"), users.id("user-3"))
        .executeAsync(ErrorStrategy.IN_STREAM);
    long count = 0;
    try {
        while (stream.hasNext()) {
            RecordResult rr = stream.next();
            if (rr.isOk() && rr.recordOrNull() != null) {
                count++;
            }
        }
    } finally {
        stream.close();
    }
    System.out.println("Batch complete, got " + count + " records");
});
📖 API reference:
DataSet.of(...)|DataSet.id(...)|ChainableQueryBuilder.executeAsync(...)|RecordStream.hasNext()|RecordStream.next()|RecordStream.close()|RecordResult.isOk()|ErrorStrategy|ErrorStrategy.IN_STREAM
from aerospike_sdk import DataSet
async def batch_read():
    users = DataSet.of("test", "users")
    stream = await session.query(
        users.ids("user-1", "user-2", "user-3")
    ).execute()
    rows = await stream.collect()
    stream.close()
    print(f"Batch complete, got {len(rows)} results")
📖 API reference:
DataSet.of()|DataSet.ids()|Session.query()|RecordStream.collect()|RecordStream.close()|QueryBuilder.execute()
Async query
Stream query results asynchronously:
import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;
import java.util.concurrent.CompletableFuture;
DataSet users = DataSet.of("test", "users");
CompletableFuture.runAsync(() -> {
    RecordStream queryStream = session.query(users)
        .where("$.status == 'active'")
        .executeAsync(ErrorStrategy.IN_STREAM);
    queryStream.forEach((RecordResult result) -> {
        if (result.isOk()) {
            Record user = result.recordOrThrow();
            System.out.println("Active user: " + user.getString("name"));
        }
    });
});
📖 API reference:
DataSet.of(...)|Session.query(DataSet)|ChainableQueryBuilder.where(...)|ChainableQueryBuilder.executeAsync(...)|RecordStream.forEach(...)|RecordResult.isOk()|RecordResult.recordOrThrow()|Record.getString(...)|ErrorStrategy|ErrorStrategy.IN_STREAM
from aerospike_sdk import DataSet
async def query_active_users():
    users = DataSet.of("test", "users")
    stream = await (
        session.query(users)
        .where("$.status == 'active'")
        .execute()
    )
    async for row in stream:
        if row.is_ok:
            print(f"Active user: {row.record_or_raise().bins.get('name')}")
    stream.close()
📖 API reference:
DataSet.of()|Session.query()|QueryBuilder.where()|RecordResult.record_or_raise()|RecordStream.close()|QueryBuilder.execute()
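In the Python query example, `stream.close()` runs only if the loop finishes without raising. Wrapping the iteration in `try`/`finally`, as the parallel example on this page does, closes the stream on failure too. The sketch below shows the pattern against a hypothetical `FakeStream` that merely imitates the shape used above (async iteration plus `close()`). It is not the SDK's `RecordStream`, and rows are plain dicts for illustration.

```python
import asyncio

# Hypothetical stream: supports `async for` and close(), like the example above.
class FakeStream:
    def __init__(self, rows):
        self._rows = list(rows)
        self.closed = False

    def __aiter__(self):
        return self._iterate()

    async def _iterate(self):
        for row in self._rows:
            await asyncio.sleep(0)  # yield to the event loop between rows
            yield row

    def close(self):
        self.closed = True

async def active_names(stream):
    names = []
    try:
        async for row in stream:
            if row["status"] == "active":  # raises KeyError on a malformed row
                names.append(row["name"])
    finally:
        stream.close()  # runs whether the loop completed or raised
    return names

stream = FakeStream([
    {"name": "Alice", "status": "active"},
    {"name": "Bob", "status": "inactive"},
])
names = asyncio.run(active_names(stream))
print(names, stream.closed)
```

If processing a row raises, the exception still propagates to the caller, but the stream has been closed by the time it does.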
Java reactive streams with asPublisher()
asPublisher() returns a java.util.concurrent.Flow.Publisher<RecordResult> with backpressure. The stream closes automatically when the subscription completes, errors, or is cancelled. The publisher is unicast only: a second subscriber receives onError(IllegalStateException).
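Backpressure means the producer cannot run arbitrarily far ahead of a slow consumer. The Java Flow API achieves this through demand signalled by the subscriber. The Python sketch below is only an analogy for that idea, built on a bounded `asyncio.Queue` from the standard library. It does not use the SDK or reproduce the Flow protocol, and all names in it are hypothetical.

```python
import asyncio

async def producer(queue: asyncio.Queue, count: int, log: list) -> None:
    for i in range(count):
        await queue.put(i)         # suspends while the queue is full
        log.append(queue.qsize())  # record how many items are buffered
    await queue.put(None)          # sentinel: no more items

async def consumer(queue: asyncio.Queue) -> list:
    items = []
    while True:
        item = await queue.get()
        if item is None:
            return items
        await asyncio.sleep(0.001)  # simulate slow processing
        items.append(item)

async def main():
    queue = asyncio.Queue(maxsize=2)  # the bound is what creates backpressure
    log = []
    _, items = await asyncio.gather(producer(queue, 10, log), consumer(queue))
    return items, log

items, buffered = asyncio.run(main())
print(f"consumed {len(items)} items, max buffered {max(buffered)}")
```

Because the queue holds at most two items, the producer is paused whenever the consumer falls behind, so memory use stays bounded regardless of how many results are produced.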
Async error handling
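Async errors can be handled in one of two ways: with a callback lambda, or in the stream itself, where error information is delivered alongside the results (as the ErrorStrategy.IN_STREAM examples on this page do).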
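The difference between the two approaches can be sketched with plain asyncio. In the first function, failures are returned in the result list next to the successes, which loosely mirrors the in-stream idea. In the second, failures are routed to a handler function. `fake_read`, `errors_in_results`, and `errors_via_handler` are hypothetical names for illustration and are not SDK APIs.

```python
import asyncio

# Hypothetical stand-in for a read that fails for one key; not an SDK call.
async def fake_read(uid: str) -> dict:
    await asyncio.sleep(0)
    if uid == "missing":
        raise KeyError(uid)
    return {"id": uid}

async def errors_in_results(uids):
    # Failures come back as exception objects in the same position as their request.
    return await asyncio.gather(*(fake_read(u) for u in uids), return_exceptions=True)

async def errors_via_handler(uids, on_error):
    # Failures are passed to a handler; only successes are returned.
    ok = []
    for uid in uids:
        try:
            ok.append(await fake_read(uid))
        except KeyError as ex:
            on_error(uid, ex)
    return ok

uids = ["user-1", "missing", "user-2"]
mixed = asyncio.run(errors_in_results(uids))
failed = []
ok = asyncio.run(errors_via_handler(uids, lambda uid, ex: failed.append(uid)))
print(mixed, ok, failed)
```

With errors in the results, the caller inspects each entry and decides per item. With a handler, the success path stays free of error checks, at the cost of handling failures in a separate place.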
API reference summary
| Method | Description |
|---|---|
| Java: .executeAsync(ErrorStrategy.IN_STREAM) | Non-blocking execution that returns a RecordStream filled as results arrive |
| Python: await <builder>.execute() on an aerospike_sdk.aio session/builder | Non-blocking execution that returns a RecordStream |
| CompletableFuture + virtual threads | Compose parallel synchronous execute() calls (Java) |
| CompletableFuture.allOf() | Wait for multiple futures (Java) |
| await | Wait for async result (Python) |
| asyncio.gather() | Wait for multiple tasks (Python) |
Next steps
Batch Operations
Efficient multi-record operations.
Behaviors
Configure timeouts and retries for async.
Error Handling
Handle async errors gracefully.