Batch operations
Learn how to perform multiple database operations efficiently in a single network request. Batch operations reduce latency and improve throughput when working with multiple records.
Why use batch operations?
| Approach | 100 Records | Network Roundtrips |
|---|---|---|
| Individual operations | ~100ms+ | 100 |
| Batch operation | ~5-10ms | 1 |
Batch operations pack many requests into a single network call and fan them out to the relevant cluster nodes concurrently, dramatically reducing latency.
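As a back-of-the-envelope check of the table above, sequential individual requests pay one network roundtrip each, so total latency grows linearly with the record count, while a batch pays roughly a single roundtrip. This is a plain-Python sketch, not SDK code, and the 1 ms roundtrip figure is an assumption for illustration:

```python
# Rough latency model: each sequential request pays a full network
# roundtrip; a batch pays roughly one roundtrip for all records.
ROUNDTRIP_MS = 1.0  # assumed per-request network roundtrip (illustrative)
RECORDS = 100

individual_ms = RECORDS * ROUNDTRIP_MS  # 100 sequential roundtrips
batch_ms = 1 * ROUNDTRIP_MS             # 1 roundtrip for the whole batch

print(f"individual: {individual_ms:.0f} ms, batch: {batch_ms:.0f} ms")
```

Real numbers depend on payload size and server-side work, but the roundtrip count dominates for small records.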
Batch read
Read multiple records in one request:
```java
import java.util.List;

import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;

DataSet users = DataSet.of("test", "users");

RecordStream result = session.query(users.ids("user-1", "user-2", "user-3"))
    .execute();

// Get all records
List<Record> records = new java.util.ArrayList<>();
result.forEach(rr -> {
    if (rr.isOk()) {
        records.add(rr.recordOrThrow());
    }
});
for (Record record : records) {
    System.out.println("Name: " + record.getString("name"));
}

// Or access by index
Record first = records.get(0);  // user-1
Record second = records.get(1); // user-2
```

```python
async def demo(session):
    users = DataSet.of("test", "users")

    stream = await session.query(users.ids("user-1", "user-2", "user-3")).execute()

    # Get all records
    rows = await stream.collect()
    records = [row.record for row in rows if row.is_ok and row.record is not None]
    stream.close()

    for record in records:
        print(f"Name: {record.bins['name']}")

    # Or access by index
    first = records[0]   # user-1
    second = records[1]  # user-2
```

Batch write
Create or update multiple records:
```java
session.insert(users)
    .bins("name", "email")
    .id("user-1").values("Alice", "alice@example.com")
    .id("user-2").values("Bob", "bob@example.com")
    .id("user-3").values("Carol", "carol@example.com")
    .execute();
```

```python
async def demo(session):
    users = DataSet.of("test", "users")

    await (
        session.batch()
        .insert(users.id("user-1")).put({
            "name": "Alice",
            "email": "alice@example.com",
        })
        .insert(users.id("user-2")).put({
            "name": "Bob",
            "email": "bob@example.com",
        })
        .insert(users.id("user-3")).put({
            "name": "Carol",
            "email": "carol@example.com",
        })
        .execute()
    )
```

Batch upsert
Insert or update multiple records:
```java
session
    .upsert(users.ids("user-1", "user-2"))
    .bin("status").setTo("active")
    .bin("balance").add(500)
    .upsert(users.id("user-3"))
    .bin("status").setTo("inactive")
    .execute();
```

```python
async def demo(session):
    users = DataSet.of("test", "users")
    await (
        session.batch()
        .upsert(users.id("user-1")).put({"status": "active"})
        .upsert(users.id("user-2")).put({"status": "active"})
        .upsert(users.id("user-3")).put({"status": "inactive"})
        .execute()
    )
```

Batch delete
Delete multiple records:
```java
RecordStream result = session.delete(users.ids("user-1", "user-2", "user-3"))
    .execute();

// Check which deletes succeeded
int i = 0;
while (result.hasNext()) {
    RecordResult rr = result.next();
    System.out.println("Record " + i++ + " deleted: " + rr.asBoolean());
}
result.close();
```

```python
async def demo(session):
    users = DataSet.of("test", "users")
    stream = await (
        session.batch()
        .delete(users.id("user-1"))
        .delete(users.id("user-2"))
        .delete(users.id("user-3"))
        .execute()
    )

    # Check which deletes succeeded
    async for rr in stream:
        print(f"Deleted: {rr.as_bool()}")
    stream.close()
```

Mixed batch operations
Combine different operation types in one call. The operations are performed concurrently across the nodes in the cluster, so results may not come back in the same order as the commands were issued. Each returned item exposes a key and an index: `key()` and `index()` in Java, and the `key` and `index` attributes in Python (`row.key`, `row.index`). `key` is the unique key of the record, and `index` is the zero-based position of the command in the original batch.
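Because results can arrive out of order, the `index` field is what ties each result back to the command that produced it. A minimal illustration with plain Python objects (the `BatchRow` dataclass here is a stand-in for an SDK result row, not part of the SDK):

```python
from dataclasses import dataclass


@dataclass
class BatchRow:
    # Stand-in for an SDK result row: `index` is the zero-based
    # position of the command in the original batch.
    index: int
    key: str
    ok: bool


# Results may arrive in any order across cluster nodes...
arrived = [
    BatchRow(index=2, key="user-3", ok=True),
    BatchRow(index=0, key="user-1", ok=True),
    BatchRow(index=1, key="user-2", ok=False),
]

# ...so sort by `index` to line them back up with the commands you issued.
in_command_order = sorted(arrived, key=lambda row: row.index)
for row in in_command_order:
    print(row.index, row.key, "ok" if row.ok else "failed")
```

The same pattern applies to the real stream: collect the rows, then sort or bucket them by `row.index` when command order matters.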
```java
RecordStream stream = session
    .query(users.ids("user-1", "user-2"))
    .upsert(users.id("user-3"))
    .bin("status").setTo("active")
    .delete(users.id("user-4"))
    .execute();

stream.forEach(result -> {
    switch (result.index()) {
        case 0 -> handleUser1(result);
        case 1 -> handleUser2(result);
        case 2 -> System.out.println("Upsert: " + (result.isOk() ? "ok" : result.message()));
        case 3 -> System.out.println("Delete: " + (result.isOk() ? "ok" : result.message()));
    }
});
```

```python
async def demo(session):
    users = DataSet.of("test", "users")

    # Batch read uses query(data_set.ids(...))
    read_stream = await session.query(users.ids("user-1", "user-2")).execute()
    async for row in read_stream:
        if row.is_ok and row.record is not None:
            print(f"Read: {row.record_or_raise().bins['name']}")
    read_stream.close()

    # Batch write/delete uses session.batch()
    write_stream = await (
        session.batch()
        .upsert(users.id("user-3")).put({"status": "active"})
        .delete(users.id("user-4"))
        .execute()
    )
    async for _ in write_stream:
        pass
    write_stream.close()
```

Batch with selected bins
Read only specific bins in batch:
```java
RecordStream result = session.query(users.ids("user-1", "user-2", "user-3"))
    .bins("name", "email")
    .execute();
```

```python
async def demo(session):
    users = DataSet.of("test", "users")
    stream = await (
        session.query(users.ids("user-1", "user-2", "user-3"))
        .bins(["name", "email"])
        .execute()
    )
    rows = await stream.collect()
    records = [row.record for row in rows if row.is_ok and row.record is not None]
    stream.close()
```

Handle partial failures
Some operations in a batch may fail while others succeed:
```java
RecordStream result = session
    .update(users.ids("user-1", "nonexistent", "user-3"))
    .bin("lastSeen").setTo(System.currentTimeMillis())
    .execute(ErrorStrategy.IN_STREAM); // Place errors in the RecordStream

result.forEach(rr -> {
    String keyId = rr.key().userKey.toString();
    if (!rr.isOk()) {
        System.out.println(keyId + " failed: " + rr.message());
    } else {
        System.out.println(keyId + ": updated");
    }
});
```

```python
async def demo(session):
    users = DataSet.of("test", "users")

    stream = await session.query(
        users.ids("user-1", "nonexistent", "user-3")
    ).execute()

    i = 0
    async for row in stream:
        if not row.is_ok:
            print(f"Operation {i} failed: {row.result_code}")
        elif row.record is not None:
            print(f"Operation {i}: {row.record_or_raise().bins['name']}")
        else:
            print(f"Operation {i}: Record not found")
        i += 1
    stream.close()
```

Dynamic batch building
Build batches programmatically:
```java
import com.aerospike.client.sdk.IdValuesRowBuilder;

IdValuesRowBuilder rows = session.upsert(users)
    .bins("name", "score")
    .id("user-1").values("User 1", 10);
for (int id = 2; id <= 50; id++) {
    rows.id("user-" + id).values("User " + id, id * 10);
}
rows.execute();
```

```python
async def demo(session):
    users = DataSet.of("test", "users")
    user_ids = ["user-1", "user-2", "user-3", "user-4", "user-5"]

    stream = await session.query(users.ids(*user_ids)).execute()
    rows = await stream.collect()
    stream.close()
```

Complete example
```java
import com.aerospike.client.sdk.Cluster;
import com.aerospike.client.sdk.ClusterDefinition;
import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordStream;
import com.aerospike.client.sdk.Session;
import com.aerospike.client.sdk.policy.Behavior;

public class BatchOperationsExample {
    public static void main(String[] args) {
        try (Cluster cluster = new ClusterDefinition("localhost", 3000).connect()) {
            Session session = cluster.createSession(Behavior.DEFAULT);
            DataSet users = DataSet.of("test", "users");
            String key1 = "batch-example-1";
            String key2 = "batch-example-2";
            String key3 = "batch-example-3";

            // Cleanup so the example is repeatable.
            session.delete(users.ids(key1, key2, key3)).execute().close();

            // Batch insert
            session.insert(users)
                .bins("name", "age")
                .id(key1).values("Alice", 28)
                .id(key2).values("Bob", 35)
                .id(key3).values("Carol", 22)
                .execute();
            System.out.println("Batch insert complete");

            // Batch read
            session.query(users.ids(key1, key2, key3))
                .execute()
                .forEach(result -> {
                    Record record = result.recordOrThrow();
                    System.out.println(" - " + record.getString("name"));
                });

            // Batch update
            session
                .upsert(users.id(key1), users.id(key2))
                .bin("status").setTo("active")
                .upsert(users.id(key3))
                .bin("status").setTo("inactive")
                .execute().close();
            System.out.println("\nBatch update complete");

            // Batch delete
            session.delete(users.ids(key1, key2, key3))
                .execute().close();
            System.out.println("Batch delete complete");
        }
    }
}
```

```python
import asyncio

from aerospike_sdk import Behavior, DataSet, Client


async def main():
    async with Client("localhost:3000") as client:
        session = client.create_session(Behavior.DEFAULT)
        users = DataSet.of("test", "users")
        key1 = users.id("batch-example-1")
        key2 = users.id("batch-example-2")
        key3 = users.id("batch-example-3")

        # Cleanup so the example is repeatable.
        stream = await session.batch().delete(key1).delete(key2).delete(key3).execute()
        stream.close()

        # Batch insert
        await (
            session.batch()
            .insert(key1).put({"name": "Alice", "age": 28})
            .insert(key2).put({"name": "Bob", "age": 35})
            .insert(key3).put({"name": "Carol", "age": 22})
            .execute()
        )
        print("Batch insert complete")

        # Batch read
        stream = await session.query(
            users.ids("batch-example-1", "batch-example-2", "batch-example-3")
        ).execute()

        print("\nBatch read complete:")
        async for row in stream:
            if row.is_ok:
                record = row.record_or_raise()
                if record is not None:
                    print(f" - {record.bins.get('name')}")
        stream.close()

        # Batch update
        await (
            session.batch()
            .upsert(key1).put({"status": "active"})
            .upsert(key2).put({"status": "active"})
            .upsert(key3).put({"status": "inactive"})
            .execute()
        )
        print("\nBatch update complete")

        # Batch delete
        await (
            session.batch()
            .delete(key1)
            .delete(key2)
            .delete(key3)
            .execute()
        )
        print("Batch delete complete")


if __name__ == "__main__":
    asyncio.run(main())
```

API reference summary
| Java | Python | Description |
|---|---|---|
| `session.query(dataSet.ids(...))` | `await session.query(data_set.ids(...)).execute()` | Batch-read multiple record IDs |
| `session.insert(dataSet)` + repeated `.id().values()` | `session.batch().insert(key).put({...})` (one per key) | Batch insert with one request |
| `session.upsert(dataSet)` + repeated `.id().values()` | `session.batch().upsert(key).put({...})` (one per key) | Batch upsert with one request |
| `session.delete(dataSet.ids(...))` | `await session.batch().delete(key)…execute()` (one per key) | Batch delete multiple IDs |
| `.bins(...)` (varargs) | `.bins([...])` (list) | Select projected bins for batch reads |
| `RecordStream` / `forEach` | `RecordStream` / `async for` | Iterate per-record results |
Next steps
- Async Operations: non-blocking operations for high throughput.
- Query Records: find records with DSL queries.
- Behaviors: configure batch timeouts and retries.