Async operations
Learn how to use asynchronous operations for high-throughput, non-blocking database access. Async operations allow your application to continue processing while waiting for database responses.
Why async?
| Approach | Behavior | Best For |
|---|---|---|
| Sync | Blocks thread until response | Simple scripts, low concurrency |
| Async | Returns immediately, notifies on completion | High throughput, web servers, microservices |
Note: The Java SDK uses virtual threads and therefore requires JDK 21 or later. Performance tests have shown that JDK 26 significantly improves virtual-thread performance, so JDK 26 or later is recommended for high-performance environments.
Async read
import com.aerospike.client.sdk.DataSet;import com.aerospike.client.sdk.ErrorStrategy;import com.aerospike.client.sdk.Record;import com.aerospike.client.sdk.RecordResult;import com.aerospike.client.sdk.RecordStream;
// Handle for the ("test", "users") data set queried below.
DataSet users = DataSet.of("test", "users");

// Non-blocking: results are published to the returned stream as they arrive.
RecordStream stream = session.query(users.id("user-1")).executeAsync(ErrorStrategy.IN_STREAM);
// Act on the first result only if one is present and it reports success.
stream.getFirst().ifPresent(result -> {
    if (result.isOk()) {
        Record user = result.recordOrThrow();
        System.out.println("Name: " + user.getString("name"));
    }
});

import asyncio
from aerospike_sdk import DataSet
users = DataSet.of("test", "users")


async def read_user():
    """Read the record with id "user-1" and print its ``name`` bin.

    The stream is closed in a ``finally`` block so it is released even when
    ``first_or_raise()`` or ``record_or_raise()`` raises (as their names
    suggest they can).
    """
    stream = await session.query(users.id("user-1")).execute()
    try:
        row = await stream.first_or_raise()
        user = row.record_or_raise()
    finally:
        # Always release the stream, on success and on error alike.
        stream.close()
    print(f"Name: {user.bins.get('name')}")
# Run the async function
asyncio.run(read_user())
Async write
import com.aerospike.client.sdk.DataSet;import com.aerospike.client.sdk.ErrorStrategy;import com.aerospike.client.sdk.RecordStream;
// Handle for the ("test", "users") data set written to below.
DataSet users = DataSet.of("test", "users");

// Build an insert for id "user-1" and start it without blocking.
// NOTE(review): values(...) presumably pairs positionally with bins(...) — confirm against the SDK reference.
RecordStream writeStream = session.insert(users)
        .bins("name", "email")
        .id("user-1").values("Alice", "alice@example.com")
        .executeAsync(ErrorStrategy.IN_STREAM);
// Report every result on the stream that indicates success.
writeStream.forEach(result -> {
    if (result.isOk()) {
        System.out.println("Insert complete for key index " + result.index());
    }
});

from aerospike_sdk import DataSet
async def create_user(): users = DataSet.of("test", "users") await session.insert(key=users.id("user-1")).put( {"name": "Alice", "email": "alice@example.com"} ).execute()
print("Insert complete!")Parallel operations
Execute multiple operations concurrently:
import com.aerospike.client.sdk.DataSet;import com.aerospike.client.sdk.Record;import com.aerospike.client.sdk.RecordResult;import com.aerospike.client.sdk.RecordStream;import java.util.List;import java.util.Optional;import java.util.concurrent.CompletableFuture;import java.util.concurrent.Executors;import java.util.function.Function;
// Handle for the ("test", "users") data set read below.
DataSet users = DataSet.of("test", "users");

// Parallelize with virtual threads: each task runs a synchronous query and maps the first record.
Function<String, Optional<Record>> readOne = id -> {
    RecordStream s = session.query(users.id(id)).execute();
    // Empty when there is no first result or the first result is not OK.
    return s.getFirst().filter(RecordResult::isOk).map(RecordResult::recordOrThrow);
};

// try-with-resources closes the executor when the block exits.
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    CompletableFuture<Optional<Record>> u1 = CompletableFuture.supplyAsync(() -> readOne.apply("user-1"), executor);
    CompletableFuture<Optional<Record>> u2 = CompletableFuture.supplyAsync(() -> readOne.apply("user-2"), executor);
    CompletableFuture<Optional<Record>> u3 = CompletableFuture.supplyAsync(() -> readOne.apply("user-3"), executor);
    // Wait for all three reads before collecting their results.
    CompletableFuture.allOf(u1, u2, u3).join();
    List<Optional<Record>> results = List.of(u1.join(), u2.join(), u3.join());
}

import asyncio
from aerospike_sdk import DataSet
async def read_multiple_users(): users = DataSet.of("test", "users")
async def read_one(uid: str): stream = await session.query(users.id(uid)).execute() try: row = await stream.first() if row is None or not row.is_ok: return None return row.record_or_raise() finally: stream.close()
user_ids = ["user-1", "user-2", "user-3"] results = await asyncio.gather(*(read_one(uid) for uid in user_ids))
for rec in results: if rec is not None: print(f"Name: {rec.bins.get('name')}")Async with callbacks (Java)
Handle results with callbacks instead of blocking:
import com.aerospike.client.sdk.DataSet;import com.aerospike.client.sdk.ErrorStrategy;import com.aerospike.client.sdk.Record;import com.aerospike.client.sdk.RecordResult;import com.aerospike.client.sdk.RecordStream;import java.util.concurrent.CompletableFuture;
// Handle for the ("test", "users") data set queried below.
DataSet users = DataSet.of("test", "users");

// Submit the query and its result handler as a CompletableFuture task so the caller is not blocked.
CompletableFuture.runAsync(() -> {
    RecordStream stream = session.query(users.id("user-1")).executeAsync(ErrorStrategy.IN_STREAM);
    // The lambda runs only when a first result exists; it ignores results that are not OK.
    stream.getFirst().ifPresent(result -> {
        if (result.isOk()) {
            Record user = result.recordOrThrow();
            System.out.println("Got user: " + user.getString("name"));
        }
    });
});

// Runs right after the task is submitted; it does not wait for the task to finish.
System.out.println("Operation submitted");

# Python uses await pattern instead of callbacks
import asyncio
from aerospike_sdk import DataSet
async def get_user_with_handling():
    """Read "user-1", print its ``name`` bin, and print any error instead of raising.

    The stream is closed in an inner ``finally`` so it is released even when
    reading the first row fails; the outer handler then reports the error.
    """
    users = DataSet.of("test", "users")
    try:
        stream = await session.query(users.id("user-1")).execute()
        try:
            row = await stream.first_or_raise()
            user = row.record_or_raise()
        finally:
            # Only reached once the stream exists, so close() is always valid here.
            stream.close()
        print(f"Got user: {user.bins.get('name')}")
    except Exception as ex:
        # Example-level boundary: report the failure rather than propagate it.
        print(f"Error: {ex}")
asyncio.run(get_user_with_handling())
Async pipeline
Chain operations together:
import com.aerospike.client.sdk.DataSet;import com.aerospike.client.sdk.ErrorStrategy;import com.aerospike.client.sdk.Record;import com.aerospike.client.sdk.RecordResult;import com.aerospike.client.sdk.RecordStream;import java.util.Optional;import java.util.concurrent.CompletableFuture;
// Handles for the two data sets the pipeline touches.
DataSet users = DataSet.of("test", "users");
DataSet orders = DataSet.of("test", "orders");

// Stage 1: query user-1, requesting the "loyalty_tier" bin.
session.query(users.id("user-1"))
        .bins("loyalty_tier")
        .executeAsync(ErrorStrategy.IN_STREAM)
        .asCompletableFuture()
        // Stage 2: derive the discount from the first result and write it to order-99.
        .thenCompose(results -> {
            String tier = results.get(0).recordOrThrow().getString("loyalty_tier");
            // 0.20 for "gold", 0.05 otherwise (a null tier also falls through to 0.05).
            double discount = "gold".equals(tier) ? 0.20 : 0.05;
            return session.update(orders.id("order-99"))
                    .bin("discount").setTo(discount)
                    .executeAsync(ErrorStrategy.IN_STREAM)
                    .asCompletableFuture();
        })
        .thenRun(() -> System.out.println("Discount applied!"))
        // An exception from any earlier stage ends up here.
        .exceptionally(ex -> {
            System.err.println("Pipeline failed: " + ex.getMessage());
            return null;
        });

from aerospike_sdk import DataSet
async def increment_age(): users = DataSet.of("test", "users") stream = await session.query(users.id("user-1")).execute() row = await stream.first_or_raise() user = row.record_or_raise() stream.close()
current_age = user.bins["age"] await ( session.update(users.id("user-1")) .bin("age").set_to(current_age + 1) .execute() ) print("Age updated!")Async batch
Batch operations also support async:
import com.aerospike.client.sdk.DataSet;import com.aerospike.client.sdk.ErrorStrategy;import com.aerospike.client.sdk.RecordResult;import com.aerospike.client.sdk.RecordStream;import java.util.concurrent.CompletableFuture;
// Handle for the ("test", "users") data set read below.
DataSet users = DataSet.of("test", "users");

// Run the batch read and the counting loop on a CompletableFuture task.
CompletableFuture.runAsync(() -> {
    // One query over three ids.
    RecordStream stream = session
            .query(users.id("user-1"), users.id("user-2"), users.id("user-3"))
            .executeAsync(ErrorStrategy.IN_STREAM);
    long count = 0;
    try {
        while (stream.hasNext()) {
            RecordResult rr = stream.next();
            // Count only successful results that carry a record.
            if (rr.isOk() && rr.recordOrNull() != null) {
                count++;
            }
        }
    } finally {
        // Close the stream even if iteration throws.
        stream.close();
    }
    System.out.println("Batch complete, got " + count + " records");
});

from aerospike_sdk import DataSet
async def batch_read(): users = DataSet.of("test", "users") stream = await session.query( users.ids("user-1", "user-2", "user-3") ).execute() rows = await stream.collect() stream.close() print(f"Batch complete, got {len(rows)} results")Async query
Stream query results asynchronously:
import com.aerospike.client.sdk.DataSet;import com.aerospike.client.sdk.ErrorStrategy;import com.aerospike.client.sdk.Record;import com.aerospike.client.sdk.RecordResult;import com.aerospike.client.sdk.RecordStream;import java.util.concurrent.CompletableFuture;
// Handle for the ("test", "users") data set queried below.
DataSet users = DataSet.of("test", "users");

// Run the filtered query and its per-result handler on a CompletableFuture task.
CompletableFuture.runAsync(() -> {
    RecordStream queryStream = session.query(users)
            .where("$.status == 'active'")
            .executeAsync(ErrorStrategy.IN_STREAM);
    // Print the name of every result that reports success; other results are skipped.
    queryStream.forEach((RecordResult result) -> {
        if (result.isOk()) {
            Record user = result.recordOrThrow();
            System.out.println("Active user: " + user.getString("name"));
        }
    });
});

from aerospike_sdk import DataSet
async def query_active_users(): users = DataSet.of("test", "users") stream = await ( session.query(users) .where("$.status == 'active'") .execute() )
async for row in stream: if row.is_ok: print(f"Active user: {row.record_or_raise().bins.get('name')}") stream.close()Java reactive streams with asPublisher()
asPublisher() returns a java.util.concurrent.Flow.Publisher<RecordResult> with backpressure. The stream closes automatically when the subscription completes, errors, or is cancelled. The publisher is unicast: it supports a single subscriber, and a second subscriber receives onError(IllegalStateException).
Async error handling
Async errors can be handled either with a callback lambda or by having the error information placed in the stream (for example, ErrorStrategy.IN_STREAM, as used in the examples on this page).
API reference summary
| Method | Description |
|---|---|
| Java: .executeAsync(ErrorStrategy.IN_STREAM) | Non-blocking execution that returns a RecordStream filled as results arrive |
| Python: await <builder>.execute() on an aerospike_sdk.aio session/builder | Non-blocking execution that returns a RecordStream |
| CompletableFuture + virtual threads | Compose parallel synchronous execute() calls (Java) |
| CompletableFuture.allOf() | Wait for multiple futures (Java) |
| await | Wait for async result (Python) |
| asyncio.gather() | Wait for multiple tasks (Python) |
Next steps
Batch Operations
Efficient multi-record operations.
Behaviors
Configure timeouts and retries for async.
Error Handling
Handle async errors gracefully.