Use transactions
Transactions allow you to execute multiple operations atomically across different records. Either all operations succeed, or none do.
What are transactions?
Traditional Aerospike operations are atomic at the single-record level. Transactions extend atomicity across multiple records:
- Atomicity: All operations commit or all abort
- Consistency: Intermediate states are never visible
- Isolation: Concurrent transactions don’t interfere
- Durability: Committed transactions survive failures
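The all-or-nothing behaviour described above can be illustrated with a toy in-memory model. This is only a sketch of the semantics, not how the Aerospike server implements transactions; `apply_atomically` and the dictionary-based `accounts` store are hypothetical names used purely for illustration.

```python
import copy


def apply_atomically(store, writes):
    """Apply every (key, bin_name, delta) write to `store`, or none of them."""
    staged = copy.deepcopy(store)  # private working copy; `store` is untouched so far
    for key, bin_name, delta in writes:
        if key not in staged:
            raise KeyError(f"record {key!r} does not exist")
        new_value = staged[key][bin_name] + delta
        if new_value < 0:
            raise ValueError(f"{key!r}.{bin_name} would become negative")
        staged[key][bin_name] = new_value
    # "Commit": publish all staged changes in one step.
    store.clear()
    store.update(staged)


accounts = {"account-A": {"balance": 1000}, "account-B": {"balance": 500}}

# Both writes succeed, so both become visible.
apply_atomically(accounts, [("account-A", "balance", -100), ("account-B", "balance", 100)])

# The second write fails, so the first one is discarded as well.
try:
    apply_atomically(accounts, [("account-A", "balance", 50), ("account-B", "balance", -5000)])
except ValueError:
    pass

print(accounts)
```

The failed second call leaves the store exactly as the first call left it, which is the property a transaction provides across multiple records.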
Start a transaction
Java:

    import com.aerospike.client.sdk.AerospikeException;
    import com.aerospike.client.sdk.DataSet;
    import com.aerospike.client.sdk.Record;
    import com.aerospike.client.sdk.Session;

    // Transactions run through Session.doInTransaction; commit happens when the block returns.
    DataSet accounts = DataSet.of("test", "accounts");

    try {
        session.doInTransaction(txn -> {
            txn.insert(accounts)
                .bins("balance")
                .id("account-A").values(1000)
                .execute();

            txn.insert(accounts)
                .bins("balance")
                .id("account-B").values(500)
                .execute();
        });
    } catch (AerospikeException e) {
        // On unsupported server/namespace configs this can fail (for example "Unsupported Server Feature").
        System.err.println("Transaction failed: " + e.getMessage());
    }

Use doInTransactionReturning(...) when you need a value back from the transaction:

    String status = session.doInTransactionReturning(txn -> {
        Record account = txn.query(accounts.id("account-A")).execute().getFirstRecord();
        long balance = account != null ? account.getLong("balance") : 0L;
        if (balance < 100) {
            txn.abort();
            return "insufficient-funds";
        }
        txn.update(accounts.id("account-A"))
            .bin("balance").add(-100)
            .execute();
        return "debited";
    });

Python:

    from aerospike_sdk import Behavior, Client, DataSet

    async with Client("localhost:3000") as client:
        session = client.create_session(Behavior.DEFAULT)
        accounts = DataSet.of("test_sc", "accounts")

        # Explicit-scope form: commit on clean exit, abort on exception.
        async with session.begin_transaction() as txn:
            await txn.insert(accounts.id("account-A")).put({"balance": 1000}).execute()
            await txn.insert(accounts.id("account-B")).put({"balance": 500}).execute()

        # Retrying-wrapper form (analogous to Java's doInTransaction):
        async def transfer(txn):
            await txn.update(accounts.id("account-A")).bin("balance").add(-100).execute()
            await txn.update(accounts.id("account-B")).bin("balance").add(100).execute()

        await session.do_in_transaction(transfer)

doInTransaction and doInTransactionReturning (Java) and do_in_transaction (Python) automatically retry the whole callable when the transaction fails for a retryable reason, typically because it was blocked by another transaction. Retries occur on the result codes MRT_BLOCKED, MRT_VERSION_MISMATCH, and TXN_FAILED. The number of retries and the delay between them are configurable at the Behavior level.
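The retry behaviour described above can be pictured as a loop that re-runs the whole callable while the failure carries a retryable result code. The following is a minimal sketch under stated assumptions: `FakeTxnError`, `run_with_retries`, and `flaky_transfer` are hypothetical stand-ins, and the code is not the SDK's actual implementation.

```python
import time

# Result codes the text above lists as retryable.
RETRYABLE_CODES = {"MRT_BLOCKED", "MRT_VERSION_MISMATCH", "TXN_FAILED"}


class FakeTxnError(Exception):
    """Hypothetical stand-in for an SDK exception that carries a result code."""

    def __init__(self, code):
        super().__init__(code)
        self.code = code


def run_with_retries(operation, max_retries=3, delay_s=0.0):
    """Re-run `operation` from the start while it fails with a retryable code."""
    retries = 0
    while True:
        try:
            return operation()
        except FakeTxnError as exc:
            if exc.code not in RETRYABLE_CODES or retries >= max_retries:
                raise  # non-retryable failure, or retry budget exhausted
            retries += 1
            time.sleep(delay_s)


calls = {"count": 0}


def flaky_transfer():
    """Fails twice as if blocked by another transaction, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise FakeTxnError("MRT_BLOCKED")
    return "committed"


result = run_with_retries(flaky_transfer)
print(result, calls["count"])
```

Because the whole callable runs again on each retry, any reads it performs are repeated too, so decisions inside it are based on fresh data.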
CRUD within a transaction
All standard operations work within transactions:
Java:

    import com.aerospike.client.sdk.DataSet;
    import com.aerospike.client.sdk.Record;
    import java.util.UUID;

    DataSet accounts = DataSet.of("test", "accounts");
    DataSet ledger = DataSet.of("test", "ledger");
    DataSet pendingTransfers = DataSet.of("test", "pending_transfers");

    session.doInTransaction(txn -> {
        Record account = txn.query(accounts.id("account-A")).execute().getFirstRecord();
        long balance = account != null ? account.getLong("balance") : 0L;

        txn.update(accounts.id("account-A"))
            .bin("balance").setTo(balance - 100)
            .execute();

        txn.insert(ledger)
            .bins("from", "amount", "timestamp")
            .id(UUID.randomUUID().toString()).values("account-A", 100, System.currentTimeMillis())
            .execute();

        txn.delete(pendingTransfers.id("transfer-123")).execute();
    });

Python:

    import uuid
    import time

    from aerospike_sdk import Behavior, Client, DataSet

    async with Client("localhost:3000") as client:
        session = client.create_session(Behavior.DEFAULT)
        accounts = DataSet.of("test_sc", "accounts")
        ledger = DataSet.of("test_sc", "ledger")
        pending = DataSet.of("test_sc", "pending_transfers")

        async def debit_and_log(txn):
            stream = await txn.query(accounts.id("account-A")).execute()
            row = await stream.first()
            stream.close()
            balance = row.record_or_raise().bins.get("balance", 0) if row else 0

            await (
                txn.update(accounts.id("account-A"))
                .bin("balance").set_to(balance - 100)
                .execute()
            )

            await (
                txn.insert(ledger.id(str(uuid.uuid4())))
                .put({"from": "account-A", "amount": 100, "timestamp": int(time.time() * 1000)})
                .execute()
            )

            await txn.delete(pending.id("transfer-123")).execute()

        await session.do_in_transaction(debit_and_log)

Commit and abort
Commit
Commit makes all transaction operations permanent:
Java:

    // With session.doInTransaction(...), commit is performed automatically after the lambda completes.
    // There is no separate txn.commit() API on Session; doInTransaction commits for you.

Python:

    # With `async with session.begin_transaction() as txn:`, the transaction is
    # committed automatically when the block exits cleanly. You can also commit
    # explicitly:
    #     status = await txn.commit()
    # `txn.commit()` invokes the underlying client commit and returns a
    # `CommitStatus`; calling it twice (or after an abort) raises RuntimeError.

Abort
Abort discards all transaction operations:
Java:

    // From inside doInTransaction, call txn.abort() to abort programmatically.
    // Outside that API, letting an exception propagate aborts the transaction.

Python:

    # Inside an `async with session.begin_transaction() as txn:` block:
    # - call `await txn.abort()` (or its alias `await txn.rollback()`) to
    #   abort programmatically, OR
    # - let an exception propagate and the context manager will abort for you.
    await txn.abort()

Exclude a specific operation from the current transaction (Java)
Use notInAnyTransaction() when a specific operation must execute outside the active transaction.
    import com.aerospike.client.sdk.DataSet;

    DataSet accounts = DataSet.of("test", "accounts");
    DataSet audit = DataSet.of("test", "audit");

    session.doInTransaction(txn -> {
        txn.update(accounts.id("acct:1"))
            .bin("balance").add(-100)
            .execute();

        // This write executes outside the transaction.
        txn.insert(audit)
            .bins("event", "amount")
            .id("audit:1").values("debit", 100)
            .notInAnyTransaction()
            .execute();
    });

Handling in-doubt state
If the client loses its connection during commit, the transaction may be left in an “in-doubt” state. The commit is retried automatically, depending on policy settings, for up to 15 seconds, which is more than enough time for the cluster to readjust.
Commit is an idempotent operation, so even if it still fails after this time, it is safe to retry the commit.
Retry patterns for transactions
Implement robust retry logic for transient failures. Note that Behaviors can be configured with automatic retries, so transient failures are typically only an issue on non-idempotent writes (which cannot easily be retried).
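To see why non-idempotent writes are the hard case, consider a write that was actually applied but whose outcome the client never learned, so the client sends it again. The sketch below uses plain dictionaries rather than the SDK; `add_to_balance`, `set_balance`, and `apply_with_blind_retry` are hypothetical helpers for illustration only.

```python
def add_to_balance(record, amount):
    """Non-idempotent: every application changes the result again."""
    record["balance"] += amount


def set_balance(record, value):
    """Idempotent: applying it twice leaves the same state as applying it once."""
    record["balance"] = value


def apply_with_blind_retry(write, record, arg):
    """Simulate an in-doubt write: it was applied, but the client retries anyway."""
    write(record, arg)  # first attempt succeeded, but the reply was lost
    write(record, arg)  # blind retry


added = {"balance": 1000}
apply_with_blind_retry(add_to_balance, added, 500)

assigned = {"balance": 1000}
apply_with_blind_retry(set_balance, assigned, 1500)

print(added["balance"], assigned["balance"])  # 2000 1500
```

The increment is applied twice and overshoots the intended balance of 1500, while the assignment ends up correct regardless of how many times it is retried.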
Java:

    import com.aerospike.client.sdk.AerospikeException;
    import com.aerospike.client.sdk.ResultCode;
    import com.aerospike.client.sdk.Session;
    import java.util.concurrent.atomic.AtomicBoolean;

    // acctKey (the account record key) and the Timeout exception type are assumed
    // to be defined or imported elsewhere in the application.
    public void transferWithRetry(Session session, int maxAttempts) throws InterruptedException {
        AtomicBoolean timeoutFailure = new AtomicBoolean();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            timeoutFailure.set(false);
            try {
                session.doInTransaction(txn -> {
                    try {
                        // Non-idempotent operations cannot handle `inDoubt` well
                        txn.update(acctKey).bin("balance").add(500).execute();
                    } catch (Timeout timeout) {
                        timeoutFailure.set(true);
                        txn.abort();
                    }
                });
                if (timeoutFailure.get()) {
                    // Timed out and aborted: back off, then try again.
                    Thread.sleep((long) Math.pow(2, attempt) * 10L);
                    continue;
                }
                return;
            } catch (AerospikeException e) {
                if ((e.getResultCode() == ResultCode.MRT_BLOCKED
                        || e.getResultCode() == ResultCode.MRT_VERSION_MISMATCH
                        || e.getResultCode() == ResultCode.TXN_FAILED)
                        && attempt < maxAttempts) {
                    Thread.sleep((long) Math.pow(2, attempt) * 10L);
                    continue;
                }
                throw e;
            }
        }
    }

Python:

    import asyncio

    from aerospike_async import ResultCode
    from aerospike_sdk.exceptions import AerospikeError

    # do_in_transaction already retries on the SDK's built-in retryable result
    # codes (MRT_BLOCKED, MRT_VERSION_MISMATCH, TXN_FAILED) according to Behavior
    # settings. Wrap it for application-level backoff on top of that.
    async def with_retry(operation, max_retries=3):
        attempt = 0
        while True:
            try:
                return await operation()
            except AerospikeError as exc:
                attempt += 1
                if attempt >= max_retries:
                    raise
                # Exponential backoff between transactional retries
                await asyncio.sleep((2 ** attempt) * 0.01)

    # Usage: pass a coroutine function that runs the transaction.
    async def transfer_funds(session, accounts, from_id, to_id, amount):
        async def op(txn):
            await txn.update(accounts.id(from_id)).bin("balance").add(-amount).execute()
            await txn.update(accounts.id(to_id)).bin("balance").add(amount).execute()

        return await session.do_in_transaction(op)

    # For example:
    # await with_retry(lambda: transfer_funds(session, accounts, "account-A", "account-B", 100))

Complete example
Java:

    import com.aerospike.client.sdk.AerospikeException;
    import com.aerospike.client.sdk.Cluster;
    import com.aerospike.client.sdk.ClusterDefinition;
    import com.aerospike.client.sdk.DataSet;
    import com.aerospike.client.sdk.Record;
    import com.aerospike.client.sdk.Session;
    import com.aerospike.client.sdk.policy.Behavior;

    public class TransactionTransferExample {
        public static void main(String[] args) {
            try (Cluster cluster = new ClusterDefinition("localhost", 3000).connect()) {
                Session session = cluster.createSession(Behavior.DEFAULT);
                DataSet accounts = DataSet.of("test", "accounts");

                String from = "txn-example-A";
                String to = "txn-example-B";
                long transfer = 100L;

                // Cleanup + seed so the example is repeatable.
                session.delete(accounts.ids(from, to)).execute();
                session.insert(accounts)
                    .bins("owner", "balance")
                    .id(from).values("Alice", 1000L)
                    .id(to).values("Bob", 300L)
                    .execute();

                // Atomic transfer.
                try {
                    session.doInTransaction(txn -> {
                        Record fromRec = txn.query(accounts.id(from)).execute().getFirstRecord();
                        Record toRec = txn.query(accounts.id(to)).execute().getFirstRecord();
                        long fromBal = fromRec != null ? fromRec.getLong("balance") : 0L;
                        long toBal = toRec != null ? toRec.getLong("balance") : 0L;

                        if (fromBal < transfer) {
                            txn.abort();
                            return;
                        }

                        txn.update(accounts.id(from))
                            .bin("balance").add(-transfer)
                            .execute();

                        txn.update(accounts.id(to))
                            .bin("balance").add(transfer)
                            .execute();
                    });

                    // Verify resulting balances.
                    Record afterFrom = session.query(accounts.id(from)).execute().getFirstRecord();
                    Record afterTo = session.query(accounts.id(to)).execute().getFirstRecord();
                    System.out.println("From balance: " + (afterFrom != null ? afterFrom.getLong("balance") : null));
                    System.out.println("To balance: " + (afterTo != null ? afterTo.getLong("balance") : null));
                } catch (AerospikeException e) {
                    System.err.println("Transaction failed: " + e.getMessage());
                    System.err.println("Ensure this namespace supports transactions (SC namespace on server 8.0+).");
                }
            }
        }
    }

Python:

    import asyncio

    from aerospike_sdk import Behavior, Client, DataSet
    from aerospike_sdk.exceptions import AerospikeError

    async def main():
        async with Client("localhost:3000") as client:
            session = client.create_session(Behavior.DEFAULT)
            # Transactions require an SC namespace on Aerospike Server 8.0+.
            accounts = DataSet.of("test_sc", "accounts")
            from_id = "txn-example-A"
            to_id = "txn-example-B"
            transfer = 100

            # Cleanup + seed so the example is repeatable.
            try:
                await session.delete(accounts.id(from_id)).execute()
                await session.delete(accounts.id(to_id)).execute()
            except AerospikeError:
                pass
            await session.insert(accounts.id(from_id)).put({"owner": "Alice", "balance": 1000}).execute()
            await session.insert(accounts.id(to_id)).put({"owner": "Bob", "balance": 300}).execute()

            # Atomic transfer: do_in_transaction commits on clean return and
            # automatically retries on retryable failures (MRT_BLOCKED, ...).
            async def transfer_op(txn):
                from_stream = await txn.query(accounts.id(from_id)).execute()
                to_stream = await txn.query(accounts.id(to_id)).execute()
                from_row = await from_stream.first()
                to_row = await to_stream.first()
                from_stream.close()
                to_stream.close()

                from_bal = from_row.record_or_raise().bins.get("balance", 0) if from_row else 0
                to_bal = to_row.record_or_raise().bins.get("balance", 0) if to_row else 0
                if from_bal < transfer:
                    await txn.abort()
                    return

                await txn.update(accounts.id(from_id)).bin("balance").add(-transfer).execute()
                await txn.update(accounts.id(to_id)).bin("balance").add(transfer).execute()

            try:
                await session.do_in_transaction(transfer_op)
            except AerospikeError as exc:
                print(f"Transaction failed: {exc}")
                print("Ensure this namespace supports transactions (SC namespace on server 8.0+).")
                return

            # Verify resulting balances.
            stream = await session.query(accounts.id(from_id), accounts.id(to_id)).execute()
            async for row in stream:
                record = row.record_or_raise()
                print(f"{row.key.value}: {record.bins.get('balance')}")
            stream.close()

    if __name__ == "__main__":
        asyncio.run(main())

Transaction limitations
| Limitation | Value | Notes |
|---|---|---|
| Max records per transaction | 128 | Configurable on server |
| Max transaction duration | 10s | Default timeout |
| Namespaces | SC only | Strong consistency required |
| Operations | Point-key CRUD | Full-set query/scan and batch APIs are not transactional |
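One way to respect the record limit in the table is to check the size of a planned transaction before starting it. The guard below is a hypothetical application-side helper, not an SDK feature; it takes the default of 128 from the table above, and since the limit is configurable on the server, the value is passed in as a parameter.

```python
MAX_RECORDS_PER_TXN = 128  # default from the table above; configurable on the server


def check_transaction_size(keys, limit=MAX_RECORDS_PER_TXN):
    """Return the number of distinct records, or raise if the plan exceeds `limit`."""
    distinct = set(keys)  # touching the same record twice counts once
    if len(distinct) > limit:
        raise ValueError(
            f"transaction would touch {len(distinct)} records; the limit is {limit}"
        )
    return len(distinct)


small_plan = ["account-A", "account-B", "account-A"]
print(check_transaction_size(small_plan))  # 2

large_plan = [f"account-{i}" for i in range(200)]
try:
    check_transaction_size(large_plan)
except ValueError as exc:
    print(exc)
```

Failing fast on the client avoids starting work that the server would reject partway through. Splitting an oversized plan into several transactions is possible, but the pieces are then no longer atomic with respect to each other.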
Best practices
- Keep transactions short: Minimize time between begin and commit
- Limit scope: Include only records that must be atomic
- Handle conflicts: Implement retry logic for concurrent modifications
- Use idempotent operations: Enables safe retries
- Monitor in-doubt state: Log and alert on in-doubt transactions
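When sizing retry logic for conflicts, it helps to know how long the backoff can take in total. The retry examples earlier on this page sleep for 2^attempt × 10 ms before each retry; the sketch below tabulates that schedule. `backoff_delays_ms` is a hypothetical helper, and the 10 ms base is taken from those examples rather than from any SDK default.

```python
def backoff_delays_ms(max_attempts, base_ms=10):
    """Delay in milliseconds slept after each failed attempt, 1-indexed."""
    return [(2 ** attempt) * base_ms for attempt in range(1, max_attempts + 1)]


delays = backoff_delays_ms(5)
print(delays)       # [20, 40, 80, 160, 320]
print(sum(delays))  # 620
```

With five attempts the worst case adds 620 ms of sleeping on top of the time spent in the transactions themselves, which is worth comparing against the transaction timeout before raising the attempt count.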
Next steps
- Error Handling: Handle transaction errors gracefully.
- Batch Operations: For non-transactional bulk operations.