# Async operations

Learn how to use asynchronous operations for high-throughput, non-blocking database access. Async operations allow your application to continue processing while waiting for database responses.

## Why async?

| Approach | Behavior | Best For |
| --- | --- | --- |
| **Sync** | Blocks thread until response | Simple scripts, low concurrency |
| **Async** | Returns immediately, notifies on completion | High throughput, web servers, microservices |
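To make the contrast in the table concrete, here is a minimal sketch that uses only Python's standard library. `fake_db_read` is a hypothetical stand-in for a database call (it is not part of the SDK), and the 50 ms latency is an assumed value chosen for illustration. Awaiting the reads one at a time behaves like blocking code, while issuing them together keeps all requests in flight at once.

```python
import asyncio
import time

LATENCY_S = 0.05  # simulated round-trip time; an assumption for illustration


async def fake_db_read(key: str) -> dict:
    """Hypothetical stand-in for a database read; not an SDK call."""
    await asyncio.sleep(LATENCY_S)  # yields to the event loop while "waiting"
    return {"key": key}


async def read_sequentially(keys):
    # Each read waits for the previous one to finish.
    return [await fake_db_read(k) for k in keys]


async def read_concurrently(keys):
    # All reads are in flight at once; total time is roughly one round trip.
    return await asyncio.gather(*(fake_db_read(k) for k in keys))


def timed(coro_fn, keys):
    # Run one of the coroutines above and measure wall-clock time.
    start = time.perf_counter()
    results = asyncio.run(coro_fn(keys))
    return results, time.perf_counter() - start


keys = [f"user-{i}" for i in range(10)]
seq_results, seq_elapsed = timed(read_sequentially, keys)
con_results, con_elapsed = timed(read_concurrently, keys)
print(f"sequential: {seq_elapsed:.2f}s, concurrent: {con_elapsed:.2f}s")
```

Both variants return the same records in the same order; only the elapsed time differs.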

Note: The Java SDK uses virtual threads and therefore requires JDK 21 or later. Performance tests have shown that JDK 26 significantly improves virtual-thread performance, so JDK 26 or later is recommended for high-performance environments.

## Async read

```java
import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;

DataSet users = DataSet.of("test", "users");

// Non-blocking: results are published to the returned stream as they arrive.
RecordStream stream = session.query(users.id("user-1")).executeAsync(ErrorStrategy.IN_STREAM);

stream.getFirst().ifPresent(result -> {
    if (result.isOk()) {
        Record user = result.recordOrThrow();
        System.out.println("Name: " + user.getString("name"));
    }
});
```

```python
import asyncio

from aerospike_sdk import DataSet

users = DataSet.of("test", "users")

async def read_user():
    stream = await session.query(users.id("user-1")).execute()
    row = await stream.first_or_raise()
    user = row.record_or_raise()
    stream.close()
    print(f"Name: {user.bins.get('name')}")

# Run the async function
asyncio.run(read_user())
```

## Async write

```java
import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.RecordStream;

DataSet users = DataSet.of("test", "users");

RecordStream writeStream = session.insert(users)
    .bins("name", "email")
    .id("user-1").values("Alice", "alice@example.com")
    .executeAsync(ErrorStrategy.IN_STREAM);

writeStream.forEach(result -> {
    if (result.isOk()) {
        System.out.println("Insert complete for key index " + result.index());
    }
});
```

```python
from aerospike_sdk import DataSet

async def create_user():
    users = DataSet.of("test", "users")
    await session.insert(key=users.id("user-1")).put(
        {"name": "Alice", "email": "alice@example.com"}
    ).execute()
    print("Insert complete!")
```

## Parallel operations

Execute multiple operations concurrently:

```java
import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.function.Function;

DataSet users = DataSet.of("test", "users");

// Parallelize with virtual threads: each task runs a synchronous query and maps the first record.
Function<String, Optional<Record>> readOne = id -> {
    RecordStream s = session.query(users.id(id)).execute();
    return s.getFirst().filter(RecordResult::isOk).map(RecordResult::recordOrThrow);
};

try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    CompletableFuture<Optional<Record>> u1 = CompletableFuture.supplyAsync(() -> readOne.apply("user-1"), executor);
    CompletableFuture<Optional<Record>> u2 = CompletableFuture.supplyAsync(() -> readOne.apply("user-2"), executor);
    CompletableFuture<Optional<Record>> u3 = CompletableFuture.supplyAsync(() -> readOne.apply("user-3"), executor);

    CompletableFuture.allOf(u1, u2, u3).join();

    List<Optional<Record>> results = List.of(u1.join(), u2.join(), u3.join());
}
```

```python
import asyncio

from aerospike_sdk import DataSet

async def read_multiple_users():
    users = DataSet.of("test", "users")

    async def read_one(uid: str):
        stream = await session.query(users.id(uid)).execute()
        try:
            row = await stream.first()
            if row is None or not row.is_ok:
                return None
            return row.record_or_raise()
        finally:
            stream.close()

    user_ids = ["user-1", "user-2", "user-3"]
    results = await asyncio.gather(*(read_one(uid) for uid in user_ids))

    for rec in results:
        if rec is not None:
            print(f"Name: {rec.bins.get('name')}")
```
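When the list of IDs is large, starting every read at once may put more load on the client or server than intended. One common way to cap the number of in-flight operations is `asyncio.Semaphore`. The sketch below is illustrative only: `read_one` is a hypothetical stand-in that simulates latency instead of calling the SDK, and `MAX_IN_FLIGHT` is an assumed limit.

```python
import asyncio

MAX_IN_FLIGHT = 3  # assumed limit for illustration; tune for your workload

current = 0  # reads in flight right now
peak = 0     # highest value of `current` observed


async def read_one(uid: str) -> str:
    """Hypothetical stand-in for a single-record read; not an SDK call."""
    global current, peak
    current += 1
    peak = max(peak, current)
    await asyncio.sleep(0.01)  # simulated latency
    current -= 1
    return f"record:{uid}"


async def read_bounded(user_ids):
    limit = asyncio.Semaphore(MAX_IN_FLIGHT)

    async def guarded(uid):
        async with limit:  # waits here while MAX_IN_FLIGHT reads are running
            return await read_one(uid)

    # gather preserves input order regardless of completion order
    return await asyncio.gather(*(guarded(uid) for uid in user_ids))


user_ids = [f"user-{i}" for i in range(10)]
results = asyncio.run(read_bounded(user_ids))
print(f"read {len(results)} records, peak concurrency {peak}")
```

In real code, the body of `read_one` would be replaced with the query, first-row, and close sequence from the example above.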

## Async with callbacks (Java)

Handle results with callbacks instead of blocking:

```java
import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;

import java.util.concurrent.CompletableFuture;

DataSet users = DataSet.of("test", "users");

CompletableFuture.runAsync(() -> {
    RecordStream stream = session.query(users.id("user-1")).executeAsync(ErrorStrategy.IN_STREAM);
    stream.getFirst().ifPresent(result -> {
        if (result.isOk()) {
            Record user = result.recordOrThrow();
            System.out.println("Got user: " + user.getString("name"));
        }
    });
});

System.out.println("Operation submitted");
```

```python
# Python uses the await pattern instead of callbacks
import asyncio

from aerospike_sdk import DataSet

async def get_user_with_handling():
    users = DataSet.of("test", "users")
    try:
        stream = await session.query(users.id("user-1")).execute()
        row = await stream.first_or_raise()
        user = row.record_or_raise()
        stream.close()
        print(f"Got user: {user.bins.get('name')}")
    except Exception as ex:
        print(f"Error: {ex}")

asyncio.run(get_user_with_handling())
```

## Async pipeline

Chain operations together:

```java
import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

DataSet users = DataSet.of("test", "users");
DataSet orders = DataSet.of("test", "orders");

session.query(users.id("user-1"))
    .bins("loyalty_tier")
    .executeAsync(ErrorStrategy.IN_STREAM)
    .asCompletableFuture()
    .thenCompose(results -> {
        String tier = results.get(0).recordOrThrow().getString("loyalty_tier");
        double discount = "gold".equals(tier) ? 0.20 : 0.05;
        return session.update(orders.id("order-99"))
            .bin("discount").setTo(discount)
            .executeAsync(ErrorStrategy.IN_STREAM)
            .asCompletableFuture();
    })
    .thenRun(() -> System.out.println("Discount applied!"))
    .exceptionally(ex -> {
        System.err.println("Pipeline failed: " + ex.getMessage());
        return null;
    });
```

```python
from aerospike_sdk import DataSet

async def increment_age():
    users = DataSet.of("test", "users")

    stream = await session.query(users.id("user-1")).execute()
    row = await stream.first_or_raise()
    user = row.record_or_raise()
    stream.close()

    current_age = user.bins["age"]

    await (
        session.update(users.id("user-1"))
        .bin("age").set_to(current_age + 1)
        .execute()
    )

    print("Age updated!")
```

## Async batch

Batch operations also support async:

```java
import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;

import java.util.concurrent.CompletableFuture;

DataSet users = DataSet.of("test", "users");

CompletableFuture.runAsync(() -> {
    RecordStream stream = session
        .query(users.id("user-1"), users.id("user-2"), users.id("user-3"))
        .executeAsync(ErrorStrategy.IN_STREAM);

    long count = 0;
    try {
        while (stream.hasNext()) {
            RecordResult rr = stream.next();
            if (rr.isOk() && rr.recordOrNull() != null) {
                count++;
            }
        }
    } finally {
        stream.close();
    }

    System.out.println("Batch complete, got " + count + " records");
});
```

```python
from aerospike_sdk import DataSet

async def batch_read():
    users = DataSet.of("test", "users")

    stream = await session.query(
        users.ids("user-1", "user-2", "user-3")
    ).execute()

    rows = await stream.collect()
    stream.close()

    print(f"Batch complete, got {len(rows)} results")
```

## Async query

Stream query results asynchronously:

```java
import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.ErrorStrategy;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;

import java.util.concurrent.CompletableFuture;

DataSet users = DataSet.of("test", "users");

CompletableFuture.runAsync(() -> {
    RecordStream queryStream = session.query(users)
        .where("$.status == 'active'")
        .executeAsync(ErrorStrategy.IN_STREAM);

    queryStream.forEach((RecordResult result) -> {
        if (result.isOk()) {
            Record user = result.recordOrThrow();
            System.out.println("Active user: " + user.getString("name"));
        }
    });
});
```

```python
from aerospike_sdk import DataSet

async def query_active_users():
    users = DataSet.of("test", "users")

    stream = await (
        session.query(users)
        .where("$.status == 'active'")
        .execute()
    )

    async for row in stream:
        if row.is_ok:
            print(f"Active user: {row.record_or_raise().bins.get('name')}")

    stream.close()
```

## Java reactive streams with `asPublisher()`

`asPublisher()` returns a `java.util.concurrent.Flow.Publisher<RecordResult>` that supports backpressure. The stream closes automatically when the subscription completes, errors, or is cancelled. The publisher is unicast only: a second subscriber receives `onError(IllegalStateException)`.

## Async error handling

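Async errors can be handled in one of two ways: with a callback lambda, or by placing the error information in the result stream. The Java examples on this page use the second approach (`ErrorStrategy.IN_STREAM`) and check each result with `isOk()` before reading the record.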
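The difference between the two styles can be sketched without the SDK, using only `asyncio`. Everything here is an analogy under stated assumptions: `fetch` is a hypothetical operation that fails for one key, `asyncio.gather(..., return_exceptions=True)` plays the role of delivering errors alongside successful results, and `on_error` is a hypothetical caller-supplied callback. None of these names are part of the SDK's error-strategy API.

```python
import asyncio


async def fetch(key: str) -> str:
    """Hypothetical operation: fails for one key to simulate a record error."""
    await asyncio.sleep(0)
    if key == "missing":
        raise KeyError(key)
    return f"record:{key}"


async def errors_in_results(keys):
    # Failures are returned alongside successes, in input order,
    # so one bad key does not abort the whole batch.
    return await asyncio.gather(*(fetch(k) for k in keys), return_exceptions=True)


async def errors_to_callback(key, on_error):
    # The failure is handed to a caller-supplied callback instead.
    try:
        return await fetch(key)
    except Exception as ex:
        on_error(ex)
        return None


outcomes = asyncio.run(errors_in_results(["user-1", "missing", "user-2"]))
ok = [o for o in outcomes if not isinstance(o, Exception)]
failed = [o for o in outcomes if isinstance(o, Exception)]

seen = []  # errors collected by the callback
value = asyncio.run(errors_to_callback("missing", lambda ex: seen.append(ex)))
print(ok, failed, value, seen)
```

With errors in the results, the consumer decides per item how to react. With a callback, error handling is separated from the code that processes successful results.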

## API reference summary

| Method | Description |
| --- | --- |
| Java: `.executeAsync(ErrorStrategy.IN_STREAM)` | Non-blocking execution that returns a `RecordStream` filled as results arrive |
| Python: `await <builder>.execute()` on an `aerospike_sdk.aio` session/builder | Non-blocking execution that returns a `RecordStream` |
| `CompletableFuture` + virtual threads | Compose parallel synchronous `execute()` calls (Java) |
| `CompletableFuture.allOf()` | Wait for multiple futures (Java) |
| `await` | Wait for async result (Python) |
| `asyncio.gather()` | Wait for multiple tasks (Python) |

## Next steps

-   [Batch Operations](https://aerospike.com/docs/develop/client/sdk/usage/batch): Efficient multi-record operations.
-   [Behaviors](https://aerospike.com/docs/develop/client/sdk/concepts/behaviors): Configure timeouts and retries for async.
-   [Error Handling](https://aerospike.com/docs/develop/client/sdk/concepts/errors): Handle async errors gracefully.