# Batch operations

Learn how to perform multiple database operations efficiently in a single network request. Batch operations reduce latency and improve throughput when working with multiple records.

## Why use batch operations?

| Approach | Latency (100 records) | Network round trips |
| --- | --- | --- |
| Individual operations | ~100ms+ | 100 |
| Batch operation | ~5-10ms | 1 |

Batch operations combine multiple requests into a single network call and fan the work out to the relevant cluster nodes concurrently, dramatically reducing latency.
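As a back-of-the-envelope model (illustrative numbers only, not measurements from this SDK): if each round trip costs a fixed RTT, total latency scales with the number of round trips, so collapsing 100 individual requests into one batch removes 99 RTTs.

```python
def total_latency_ms(roundtrips: int, rtt_ms: float, per_record_ms: float, records: int) -> float:
    """Simple latency model: network round trips plus per-record server work."""
    return roundtrips * rtt_ms + records * per_record_ms

# 100 individual requests vs. one batch, assuming a hypothetical 1 ms RTT
# and 0.01 ms of server-side work per record.
individual = total_latency_ms(roundtrips=100, rtt_ms=1.0, per_record_ms=0.01, records=100)
batched = total_latency_ms(roundtrips=1, rtt_ms=1.0, per_record_ms=0.01, records=100)

print(round(individual, 2))  # 101.0
print(round(batched, 2))     # 2.0
```

Under this model the batch path is dominated by per-record server work rather than network overhead, which is the effect the table above illustrates.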

## Batch read

Read multiple records in one request:

-   [Java](#tab-panel-3090)
-   [Python](#tab-panel-3091)

```java
import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;

DataSet users = DataSet.of("test", "users");

RecordStream result = session.query(users.ids("user-1", "user-2", "user-3"))
    .execute();

// Get all records
List<Record> records = new java.util.ArrayList<>();
result.forEach(rr -> {
    if (rr.isOk()) {
        records.add(rr.recordOrThrow());
    }
});

for (Record record : records) {
    System.out.println("Name: " + record.getString("name"));
}

// Or access by index
Record first = records.get(0);  // user-1
Record second = records.get(1); // user-2
```

```python
async def demo(session):
    users = DataSet.of("test", "users")

    stream = await session.query(users.ids("user-1", "user-2", "user-3")).execute()

    # Get all records
    rows = await stream.collect()
    records = [row.record for row in rows if row.is_ok and row.record is not None]
    stream.close()

    for record in records:
        print(f"Name: {record.bins['name']}")

    # Or access by index
    first = records[0]   # user-1
    second = records[1]  # user-2
```

## Batch write

Create or update multiple records:

-   [Java](#tab-panel-3092)
-   [Python](#tab-panel-3093)

```java
session.insert(users)
    .bins("name", "email")
    .id("user-1").values("Alice", "alice@example.com")
    .id("user-2").values("Bob", "bob@example.com")
    .id("user-3").values("Carol", "carol@example.com")
    .execute();
```

```python
async def demo(session):
    users = DataSet.of("test", "users")

    await (
        session.batch()
        .insert(users.id("user-1")).put({
            "name": "Alice",
            "email": "alice@example.com",
        })
        .insert(users.id("user-2")).put({
            "name": "Bob",
            "email": "bob@example.com",
        })
        .insert(users.id("user-3")).put({
            "name": "Carol",
            "email": "carol@example.com",
        })
        .execute()
    )
```

## Batch upsert

Insert or update multiple records:

-   [Java](#tab-panel-3094)
-   [Python](#tab-panel-3095)

```java
session
    .upsert(users.ids("user-1", "user-2"))
        .bin("status").setTo("active")
        .bin("balance").add(500)
    .upsert(users.id("user-3"))
        .bin("status").setTo("inactive")
    .execute();
```

```python
async def demo(session):
    users = DataSet.of("test", "users")

    await (
        session.batch()
        .upsert(users.id("user-1")).put({"status": "active"})
        .upsert(users.id("user-2")).put({"status": "active"})
        .upsert(users.id("user-3")).put({"status": "inactive"})
        .execute()
    )
```

## Batch delete

Delete multiple records:

-   [Java](#tab-panel-3096)
-   [Python](#tab-panel-3097)

```java
RecordStream result = session.delete(users.ids("user-1", "user-2", "user-3"))
    .execute();

// Check which deletes succeeded
int i = 0;
while (result.hasNext()) {
    RecordResult rr = result.next();
    System.out.println("Record " + i++ + " deleted: " + rr.asBoolean());
}
result.close();
```

```python
async def demo(session):
    users = DataSet.of("test", "users")

    stream = await (
        session.batch()
        .delete(users.id("user-1"))
        .delete(users.id("user-2"))
        .delete(users.id("user-3"))
        .execute()
    )

    # Check which deletes succeeded
    async for rr in stream:
        print(f"Deleted: {rr.as_bool()}")
    stream.close()
```

## Mixed batch operations

Combine different operation types in one call. The per-key operations are executed concurrently across the nodes in the cluster, so results may stream back in a different order than the commands were issued. Each returned item therefore exposes a key and an index: `key()` and `index()` in Java, and the `key` and `index` attributes in Python (`row.key`, `row.index`). `key` is the unique key of the record, and `index` is the zero-based position of the command in the original batch.

-   [Java](#tab-panel-3098)
-   [Python](#tab-panel-3099)

```java
RecordStream stream = session
    .query(users.ids("user-1", "user-2"))
    .upsert(users.id("user-3"))
        .bin("status").setTo("active")
    .delete(users.id("user-4"))
    .execute();

stream.forEach(result -> {
    switch (result.index()) {
        case 0 -> handleUser1(result);
        case 1 -> handleUser2(result);
        case 2 -> System.out.println("Upsert: " + (result.isOk() ? "ok" : result.message()));
        case 3 -> System.out.println("Delete: " + (result.isOk() ? "ok" : result.message()));
    }
});
```

```python
async def demo(session):
    users = DataSet.of("test", "users")

    # Batch read uses query(data_set.ids(...))
    read_stream = await session.query(users.ids("user-1", "user-2")).execute()
    async for row in read_stream:
        if row.is_ok and row.record is not None:
            print(f"Read: {row.record_or_raise().bins['name']}")
    read_stream.close()

    # Batch write/delete uses session.batch()
    write_stream = await (
        session.batch()
        .upsert(users.id("user-3")).put({"status": "active"})
        .delete(users.id("user-4"))
        .execute()
    )
    async for _ in write_stream:
        pass
    write_stream.close()
```
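Because per-key results stream back as each node responds, arrival order is not reliable; the index is what lets you put results back in command order. A minimal sketch of that reordering pattern, using plain Python stand-ins rather than SDK types:

```python
from dataclasses import dataclass

@dataclass
class FakeResult:
    index: int    # zero-based position of the command in the original batch
    payload: str

def in_command_order(results, batch_size):
    """Slot each streamed result into its original command position."""
    ordered = [None] * batch_size
    for r in results:
        ordered[r.index] = r
    return ordered

# Results arrive out of order, as they might from different nodes.
arrived = [FakeResult(2, "delete ok"), FakeResult(0, "read user-1"), FakeResult(1, "read user-2")]
ordered = in_command_order(arrived, batch_size=3)
print([r.payload for r in ordered])  # ['read user-1', 'read user-2', 'delete ok']
```

The same idea applies whether you iterate a Java `RecordStream` or a Python `async for` loop: collect, then index by `index`.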

## Batch with selected bins

Read only specific bins in batch:

-   [Java](#tab-panel-3100)
-   [Python](#tab-panel-3101)

```java
RecordStream result = session.query(users.ids("user-1", "user-2", "user-3"))
    .bins("name", "email")
    .execute();
```

```python
async def demo(session):
    users = DataSet.of("test", "users")

    stream = await (
        session.query(users.ids("user-1", "user-2", "user-3"))
        .bins(["name", "email"])
        .execute()
    )
    rows = await stream.collect()
    records = [row.record for row in rows if row.is_ok and row.record is not None]
    stream.close()
```

## Handle partial failures

Some operations in a batch may fail while others succeed:

-   [Java](#tab-panel-3102)
-   [Python](#tab-panel-3103)

```java
RecordStream result = session
    .update(users.ids("user-1", "nonexistent", "user-3"))
    .bin("lastSeen").setTo(System.currentTimeMillis())
    .execute(ErrorStrategy.IN_STREAM);  // Place errors in the RecordStream

result.forEach(rr -> {
    String keyId = rr.key().userKey.toString();
    if (!rr.isOk()) {
        System.out.println(keyId + " failed: " + rr.message());
    } else {
        System.out.println(keyId + ": updated");
    }
});
```

::: note
In Python, `session.query(data_set.ids(...))` yields a result only for keys that were found. Missing keys are filtered from the stream rather than appearing as failed results. The failed branch below is shown for completeness and parity with other languages; in Python it won’t fire for missing keys.
:::

```python
async def demo(session):
    users = DataSet.of("test", "users")

    stream = await session.query(
        users.ids("user-1", "nonexistent", "user-3")
    ).execute()

    i = 0
    async for row in stream:
        if not row.is_ok:
            print(f"Operation {i} failed: {row.result_code}")
        elif row.record is not None:
            print(f"Operation {i}: {row.record_or_raise().bins['name']}")
        else:
            print(f"Operation {i}: Record not found")
        i += 1
    stream.close()
```
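The branching above boils down to partitioning per-record results into successes and failures so each group can be handled (or retried) separately. Sketched generically with hypothetical result objects, not SDK types:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class FakeRow:
    key: str
    is_ok: bool
    message: Optional[str] = None

def partition(rows):
    """Split batch results into successes and failures for separate handling."""
    ok = [r for r in rows if r.is_ok]
    failed = [r for r in rows if not r.is_ok]
    return ok, failed

rows = [FakeRow("user-1", True), FakeRow("nonexistent", False, "not found"), FakeRow("user-3", True)]
ok, failed = partition(rows)
print([r.key for r in ok])      # ['user-1', 'user-3']
print([r.key for r in failed])  # ['nonexistent']
```

A common follow-up is to re-batch only the failed keys for a retry, which keeps the retry request as small as possible.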

## Dynamic batch building

Build batches programmatically:

-   [Java](#tab-panel-3104)
-   [Python](#tab-panel-3105)

```java
import com.aerospike.client.sdk.IdValuesRowBuilder;

IdValuesRowBuilder rows = session.upsert(users)
    .bins("name", "score")
    .id("user-1").values("User 1", 10);

for (int id = 2; id <= 50; id++) {
    rows.id("user-" + id).values("User " + id, id * 10);
}

rows.execute();
```

```python
async def demo(session):
    users = DataSet.of("test", "users")

    # Accumulate write operations in a loop, then execute once.
    batch = session.batch()
    for i in range(1, 51):
        batch = batch.upsert(users.id(f"user-{i}")).put({"name": f"User {i}", "score": i * 10})

    stream = await batch.execute()
    stream.close()
```
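When a dynamically built batch can grow without bound, it is common to cap its size and issue several requests instead of one enormous one. A helper for chunking IDs, in plain Python and independent of the SDK (the chunk size here is arbitrary, not an SDK limit):

```python
def chunked(items, size):
    """Yield fixed-size chunks so very large batches can be sent as bounded requests."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

user_ids = [f"user-{i}" for i in range(1, 11)]
batches = list(chunked(user_ids, size=4))
print([len(b) for b in batches])  # [4, 4, 2]
```

Each chunk can then be passed to `data_set.ids(*chunk)` for a bounded batch read, or used to drive a loop-built write batch as above.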

## Complete example

-   [Java](#tab-panel-3106)
-   [Python](#tab-panel-3107)

```java
import com.aerospike.client.sdk.Cluster;
import com.aerospike.client.sdk.ClusterDefinition;
import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.Session;
import com.aerospike.client.sdk.policy.Behavior;

public class BatchOperationsExample {
    public static void main(String[] args) {
        try (Cluster cluster = new ClusterDefinition("localhost", 3000).connect()) {
            Session session = cluster.createSession(Behavior.DEFAULT);
            DataSet users = DataSet.of("test", "users");

            String key1 = "batch-example-1";
            String key2 = "batch-example-2";
            String key3 = "batch-example-3";

            // Cleanup so the example is repeatable.
            session.delete(users.ids(key1, key2, key3)).execute().close();

            // Batch insert
            session.insert(users)
                .bins("name", "age")
                .id(key1).values("Alice", 28)
                .id(key2).values("Bob", 35)
                .id(key3).values("Carol", 22)
                .execute();
            System.out.println("Batch insert complete");

            // Batch read
            session.query(users.ids(key1, key2, key3))
                .execute()
                .forEach(result -> {
                    Record record = result.recordOrThrow();
                    System.out.println("  - " + record.getString("name"));
                });

            // Batch update
            session
                .upsert(users.ids(key1, key2))
                    .bin("status").setTo("active")
                .upsert(users.id(key3))
                    .bin("status").setTo("inactive")
                .execute().close();
            System.out.println("\nBatch update complete");

            // Batch delete
            session.delete(users.ids(key1, key2, key3))
                .execute().close();
            System.out.println("Batch delete complete");
        }
    }
}
```

```python
import asyncio

from aerospike_sdk import Behavior, Client, DataSet

async def main():
    async with Client("localhost:3000") as client:
        session = client.create_session(Behavior.DEFAULT)
        users = DataSet.of("test", "users")

        key1 = users.id("batch-example-1")
        key2 = users.id("batch-example-2")
        key3 = users.id("batch-example-3")

        # Cleanup so the example is repeatable.
        stream = await session.batch().delete(key1).delete(key2).delete(key3).execute()
        stream.close()

        # Batch insert
        await (
            session.batch()
            .insert(key1).put({"name": "Alice", "age": 28})
            .insert(key2).put({"name": "Bob", "age": 35})
            .insert(key3).put({"name": "Carol", "age": 22})
            .execute()
        )
        print("Batch insert complete")

        # Batch read
        stream = await session.query(
            users.ids("batch-example-1", "batch-example-2", "batch-example-3")
        ).execute()
        print("\nBatch read complete:")
        async for row in stream:
            if row.is_ok:
                record = row.record_or_raise()
                if record is not None:
                    print(f"  - {record.bins.get('name')}")
        stream.close()

        # Batch update
        await (
            session.batch()
            .upsert(key1).put({"status": "active"})
            .upsert(key2).put({"status": "active"})
            .upsert(key3).put({"status": "inactive"})
            .execute()
        )
        print("\nBatch update complete")

        # Batch delete
        await (
            session.batch()
            .delete(key1)
            .delete(key2)
            .delete(key3)
            .execute()
        )
        print("Batch delete complete")

if __name__ == "__main__":
    asyncio.run(main())
```

## API reference summary

| Java | Python | Description |
| --- | --- | --- |
| `session.query(dataSet.ids(...))` | `await session.query(data_set.ids(...)).execute()` | Batch-read multiple record IDs |
| `session.insert(dataSet)` + repeated `.id().values()` | `session.batch().insert(key).put({...})` (one per key) | Batch insert with one request |
| `session.upsert(dataSet)` + repeated `.id().values()` | `session.batch().upsert(key).put({...})` (one per key) | Batch upsert with one request |
| `session.delete(dataSet.ids(...))` | `await session.batch().delete(key)…execute()` (one per key) | Batch delete multiple IDs |
| `.bins(...)` (varargs) | `.bins([...])` (list) | Select projected bins for batch reads |
| `RecordStream` / `forEach` | `RecordStream` / `async for` | Iterate per-record results |

## Next steps

-   [Async Operations →](https://aerospike.com/docs/develop/client/sdk/usage/async): Non-blocking operations for high throughput.
-   [Query Records →](https://aerospike.com/docs/develop/client/sdk/usage/query): Find records with DSL queries.
-   [Behaviors →](https://aerospike.com/docs/develop/client/sdk/concepts/behaviors): Configure batch timeouts and retries.