Create and use transactions
This page describes how to create and use transactions, which were introduced in Aerospike Database 8.0.0. A transaction is an encapsulation of several commands, isolated from commands outside the transaction, and executed atomically.
To ensure strict serializability, transactions in Aerospike require CP consistency mode, known in Aerospike as the strong-consistency (SC) namespace configuration. A transaction can guarantee one of two outcomes: either all commands succeed together, or one or more commands fail, in which case you can request to roll back to the state of the records prior to the attempted transaction. No commands outside the transaction can see the state changes being created inside the transaction.
Transaction best practices
- Remember that transactions and single-record commands can be mixed in the same SC namespace.
- Make sure to call abort when your app gives up on a transaction. Not calling abort invokes unnecessary monitor activity, and leaves records “locked” for much longer than necessary.
- Use
batch-writesto write a group of independent records in a transaction. A batch is especially efficient inside a transaction. - Don’t mix expiration and non-durable deletes with transactions. This is a general recommendation for strong consistency namespaces.
- Pay attention to tombstone accumulation and tomb-raider configs.
- Do not truncate in a namespace with active transactions. If you must use transactions, see Pause and drain transactions before truncating for steps to first disable and drain existing transactions.
- Do not mix transactions with active-active XDR, unless you’re certain that you are not writing to the same records on both sides. Use stretch clusters (multi-site clustering) instead of XDR if you intend to use transactions this way.
Create a transaction
The following steps describe how to create a transaction, bind it to commands, and commit. A transaction object is attached to each command’s policy so the server can logically group related requests.
-
Create a transaction.
import com.aerospike.client.Txn;Txn txn = new Txn();System.out.println("Begin txn: " + txn.getId());import aerospiketxn = aerospike.Transaction()print(f"Begin txn: {txn.id}")import as "github.com/aerospike/aerospike-client-go/v8"txn := as.NewTxn()fmt.Printf("Begin txn: %d\n", txn.Id())#include <aerospike/as_txn.h>as_txn txn;as_txn_init(&txn);using Aerospike.Client;Txn txn = new Txn();Console.WriteLine("Begin txn: " + txn.Id);const Aerospike = require("aerospike");const txn = new Aerospike.Transaction();console.log("Begin txn:", txn.id); -
Issue commands within a try block and attach the transaction to each command’s policy.
try {WritePolicy wp = client.copyWritePolicyDefault();wp.txn = txn;Key key1 = new Key(namespace, set, 1);client.put(wp, key1, new Bin("a", "val1"));}try:wp = {"txn": txn}key = (namespace, set_name, 1)client.put(key, {"a": "val1"}, policy=wp)wp := as.NewWritePolicy(0, 0)wp.Txn = txnkey, _ := as.NewKey(namespace, set, 1)err := client.PutBins(wp, key, as.NewBin("a", "val1"))if err != nil {client.Abort(txn)log.Fatal(err)}as_policy_write pw;as_policy_write_init(&pw);pw.base.txn = &txn;as_key key;as_key_init_int64(&key, namespace, set, 1);as_record rec;as_record_inita(&rec, 1);as_record_set_str(&rec, "a", "val1");aerospike_key_put(&as, &err, &pw, &key, &rec);if (err.code != AEROSPIKE_OK) {aerospike_abort(&as, &err, &txn, NULL);}as_record_destroy(&rec);try{WritePolicy wp = client.WritePolicyDefault.Clone();wp.Txn = txn;Key key1 = new Key(ns, set, 1);client.Put(wp, key1, new Bin("a", "val1"));}const policy = { txn };try {const key = new Aerospike.Key(ns, set, 1);await client.put(key, { a: "val1" }, {}, policy);} -
Commit the transaction.
System.out.println("Commit txn: " + txn.getId());client.commit(txn);print(f"Commit txn: {txn.id}")client.commit(txn)_, err = client.Commit(txn)if err != nil {log.Fatal(err)}as_commit_status commit_status;aerospike_commit(&as, &err, &txn, &commit_status);as_txn_destroy(&txn);Console.WriteLine("Commit txn: " + txn.Id);client.Commit(txn);await client.commit(txn);
How to handle transaction errors
When an exception is thrown during a transaction, abort the transaction to roll back all changes and release locked records.
catch (AerospikeException ae) { client.abort(txn); if (ae.getResultCode() == ResultCode.MRT_BLOCKED) { // Transaction was blocked by another transaction — retry } if (ae.getResultCode() == ResultCode.MRT_EXPIRED) { // Transaction expired before commit or abort — retry } throw ae;}catch (Throwable t) { client.abort(txn); throw t;}except aerospike.exception.AerospikeError as ae: client.abort(txn) print(f"Error code: {ae.code}, message: {ae.msg}") raiseexcept Exception: client.abort(txn) raiseif err != nil { client.Abort(txn) if asErr, ok := err.(*as.AerospikeError); ok { if asErr.ResultCode == types.MRT_BLOCKED { // Transaction was blocked by another transaction — retry } if asErr.ResultCode == types.MRT_EXPIRED { // Transaction expired — retry } } log.Fatal(err)}if (err.code != AEROSPIKE_OK) { aerospike_abort(&as, &err, &txn, NULL); if (err.code == AEROSPIKE_MRT_BLOCKED) { // Transaction was blocked — retry } if (err.code == AEROSPIKE_MRT_EXPIRED) { // Transaction expired — retry } as_txn_destroy(&txn);}catch (AerospikeException ae){ client.Abort(txn); if (ae.Result == ResultCode.MRT_BLOCKED) { // Transaction was blocked by another transaction — retry } if (ae.Result == ResultCode.MRT_EXPIRED) { // Transaction expired — retry } throw;}catch (Exception){ client.Abort(txn); throw;}catch (err) { await client.abort(txn); if (err.code === Aerospike.status.MRT_BLOCKED) { // Transaction was blocked by another transaction — retry } if (err.code === Aerospike.status.MRT_EXPIRED) { // Transaction expired — retry } throw err;}Status codes
The following status codes can be returned during a transaction:
MRT_BLOCKED: The transaction was blocked by another transaction accessing the same record.MRT_EXPIRED: The transaction deadline was reached without a commit or abort.
if (ae.getResultCode() == ResultCode.MRT_BLOCKED) { // Transaction was blocked by another transaction — retry}if (ae.getResultCode() == ResultCode.MRT_EXPIRED) { // Transaction expired before commit or abort — retry}if ae.code == 120: # MRT_BLOCKED # Transaction was blocked by another transaction — retry passif ae.code == 122: # MRT_EXPIRED # Transaction expired before commit or abort — retry passif asErr.ResultCode == types.MRT_BLOCKED { // Transaction was blocked by another transaction — retry}if asErr.ResultCode == types.MRT_EXPIRED { // Transaction expired before commit or abort — retry}if (err.code == AEROSPIKE_MRT_BLOCKED) { // Transaction was blocked by another transaction — retry}if (err.code == AEROSPIKE_MRT_EXPIRED) { // Transaction expired before commit or abort — retry}if (ae.Result == ResultCode.MRT_BLOCKED){ // Transaction was blocked by another transaction — retry}if (ae.Result == ResultCode.MRT_EXPIRED){ // Transaction expired before commit or abort — retry}if (err.code === Aerospike.status.MRT_BLOCKED) { // Transaction was blocked by another transaction — retry}if (err.code === Aerospike.status.MRT_EXPIRED) { // Transaction expired before commit or abort — retry}Failed commits
A commit may fail if the read verify step fails. Aerospike attempts to do the read, even if it is dirty during the command execution step. Aerospike later sends an exception with the MRT_VERSION_MISMATCH result code during the commit if the read is determined to be invalid due to version mismatch. This indicates that another command outside the transaction changed the targeted record. The correct way to handle this situation is to retry the entire transaction.
There is also a chance that the transaction succeeds but the commit fails. An example of this would be if there is a problem updating the transaction monitor on the server. In these situations, Aerospike returns a commit exception, and if there is uncertainty about whether the transaction was committed fully, it sets the inDoubt flag to true.
To safely handle this situation, catch the exception from the commit and implement separate handling logic as follows.
try { client.commit(txn);}catch (AerospikeException.Commit ce) { if (ce.getInDoubt()) { try { client.commit(txn); } catch (AerospikeException.Commit ce2) { if (ce2.getInDoubt()) { // Recommit still in doubt — log records for later cleanup } } } else { if (ce.getResultCode() == ResultCode.MRT_VERSION_MISMATCH) { // Read was invalidated by an outside write — retry transaction } else { // Other commit failure — retry transaction } }}catch (Throwable t) { // Handle non-Aerospike exception}try: client.commit(txn)except aerospike.exception.AerospikeError as ae: if ae.in_doubt: try: client.commit(txn) except aerospike.exception.AerospikeError as ae2: if ae2.in_doubt: # Recommit still in doubt — log records for later cleanup pass else: if ae.code == 121: # MRT_VERSION_MISMATCH # Read was invalidated by an outside write — retry transaction pass else: # Other commit failure — retry transaction passexcept Exception: # Handle non-Aerospike exception pass_, err = client.Commit(txn)if err != nil { if asErr, ok := err.(*as.AerospikeError); ok { if asErr.InDoubt { _, err2 := client.Commit(txn) if err2 != nil { if asErr2, ok := err2.(*as.AerospikeError); ok && asErr2.InDoubt { // Recommit still in doubt — log records for later cleanup } } } else if asErr.ResultCode == types.MRT_VERSION_MISMATCH { // Read was invalidated by an outside write — retry transaction } else { // Other commit failure — retry transaction } }}as_commit_status commit_status;as_status status = aerospike_commit(&as, &err, &txn, &commit_status);
if (status != AEROSPIKE_OK) { if (err.in_doubt) { as_error err2; as_status status2 = aerospike_commit(&as, &err2, &txn, &commit_status); if (status2 != AEROSPIKE_OK && err2.in_doubt) { // Recommit still in doubt — log records for later cleanup } } else if (err.code == AEROSPIKE_MRT_VERSION_MISMATCH) { // Read was invalidated by an outside write — retry transaction } else { // Other commit failure — retry transaction }}as_txn_destroy(&txn);try{ client.Commit(txn);}catch (AerospikeException.Commit ce){ if (ce.InDoubt) { try { client.Commit(txn); } catch (AerospikeException.Commit ce2) { if (ce2.InDoubt) { // Recommit still in doubt — log records for later cleanup } } } else { if (ce.Result == ResultCode.MRT_VERSION_MISMATCH) { // Read was invalidated by an outside write — retry transaction } else { // Other commit failure — retry transaction } }}catch (Exception){ // Handle non-Aerospike exception}try { await client.commit(txn);} catch (err) { if (err.inDoubt) { try { await client.commit(txn); } catch (err2) { if (err2.inDoubt) { // Recommit still in doubt — log records for later cleanup } } } else if (err.code === Aerospike.status.MRT_VERSION_MISMATCH) { // Read was invalidated by an outside write — retry transaction } else { // Other commit failure — retry transaction }}Examples
Setting a transaction timeout
You can set a timeout (in seconds) that dictates the total allowed time to complete all of the commands in the transaction and commit. The timeout clock starts when the first write request for the transaction is submitted; read requests do not start the timeout clock.
Txn txn = new Txn();txn.setTimeout(20);
try { WritePolicy wp = client.copyWritePolicyDefault(); wp.txn = txn;
// Timeout clock starts after this operation Key key1 = new Key(namespace, set, 1); client.put(wp, key1, new Bin("a", "val1"));
Key key2 = new Key(namespace, set, 2); client.put(wp, key2, new Bin("b", "val2"));}catch (Throwable t) { client.abort(txn); throw t;}
client.commit(txn);txn = aerospike.Transaction()txn.timeout = 20
try: wp = {"txn": txn}
# Timeout clock starts after this operation client.put((namespace, set_name, 1), {"a": "val1"}, policy=wp) client.put((namespace, set_name, 2), {"b": "val2"}, policy=wp)except Exception: client.abort(txn) raise
client.commit(txn)txn := as.NewTxn()txn.SetTimeout(20 * time.Second)
wp := as.NewWritePolicy(0, 0)wp.Txn = txn
// Timeout clock starts after this operationkey1, _ := as.NewKey(namespace, set, 1)err := client.PutBins(wp, key1, as.NewBin("a", "val1"))if err != nil { client.Abort(txn) log.Fatal(err)}
key2, _ := as.NewKey(namespace, set, 2)err = client.PutBins(wp, key2, as.NewBin("b", "val2"))if err != nil { client.Abort(txn) log.Fatal(err)}
_, err = client.Commit(txn)as_txn txn;as_txn_init(&txn);as_txn_set_timeout(&txn, 20);
as_policy_write pw;as_policy_write_init(&pw);pw.base.txn = &txn;
// Timeout clock starts after this operationas_key key1;as_key_init_int64(&key1, namespace, set, 1);as_record rec1;as_record_inita(&rec1, 1);as_record_set_str(&rec1, "a", "val1");aerospike_key_put(&as, &err, &pw, &key1, &rec1);as_record_destroy(&rec1);
as_key key2;as_key_init_int64(&key2, namespace, set, 2);as_record rec2;as_record_inita(&rec2, 1);as_record_set_str(&rec2, "b", "val2");aerospike_key_put(&as, &err, &pw, &key2, &rec2);as_record_destroy(&rec2);
as_commit_status commit_status;aerospike_commit(&as, &err, &txn, &commit_status);as_txn_destroy(&txn);Txn txn = new Txn();txn.Timeout = 20;
try{ WritePolicy wp = client.WritePolicyDefault.Clone(); wp.Txn = txn;
// Timeout clock starts after this operation Key key1 = new Key(ns, set, 1); client.Put(wp, key1, new Bin("a", "val1"));
Key key2 = new Key(ns, set, 2); client.Put(wp, key2, new Bin("b", "val2"));}catch (Exception){ client.Abort(txn); throw;}
client.Commit(txn);const txn = new Aerospike.Transaction();txn.setTimeout(20);
const policy = { txn };
try { // Timeout clock starts after this operation await client.put(new Aerospike.Key(ns, set, 1), { a: "val1" }, {}, policy); await client.put(new Aerospike.Key(ns, set, 2), { b: "val2" }, {}, policy);} catch (err) { await client.abort(txn); throw err;}
await client.commit(txn);Batched writes
For batched writes inside a transaction, attach the transaction to the batch policy.
Txn txn = new Txn();
BatchPolicy bp = client.copyBatchPolicyDefault();bp.txn = txn;
BatchWritePolicy bwp = client.copyBatchWritePolicyDefault();
Key[] keys = new Key[] { new Key(namespace, set, 1), new Key(namespace, set, 2), new Key(namespace, set, 3)};
try { client.operate(bp, bwp, keys, Operation.put(new Bin("color", "blue")) );}catch (AerospikeException ae) { client.abort(txn); throw ae;}
client.commit(txn);from aerospike_helpers.batch import records as brfrom aerospike_helpers.operations import operations as op
txn = aerospike.Transaction()
keys = [ (namespace, set_name, 1), (namespace, set_name, 2), (namespace, set_name, 3),]
try: bp = {"txn": txn} batch_records = br.BatchRecords( [br.Write(key, [op.write("color", "blue")]) for key in keys] ) client.batch_write(batch_records, policy_batch=bp)except Exception: client.abort(txn) raise
client.commit(txn)txn := as.NewTxn()
bp := as.NewBatchPolicy()bp.Txn = txn
key1, _ := as.NewKey(namespace, set, 1)key2, _ := as.NewKey(namespace, set, 2)key3, _ := as.NewKey(namespace, set, 3)
records := []as.BatchRecordIfc{ as.NewBatchWrite(nil, key1, as.PutOp(as.NewBin("color", "blue"))), as.NewBatchWrite(nil, key2, as.PutOp(as.NewBin("color", "blue"))), as.NewBatchWrite(nil, key3, as.PutOp(as.NewBin("color", "blue"))),}
err := client.BatchOperate(bp, records)if err != nil { client.Abort(txn) log.Fatal(err)}
_, err = client.Commit(txn)as_txn txn;as_txn_init(&txn);
as_policy_batch pb;as_policy_batch_init(&pb);pb.base.txn = &txn;
as_operations ops1, ops2, ops3;as_operations_inita(&ops1, 1);as_operations_add_write_str(&ops1, "color", "blue");as_operations_inita(&ops2, 1);as_operations_add_write_str(&ops2, "color", "blue");as_operations_inita(&ops3, 1);as_operations_add_write_str(&ops3, "color", "blue");
as_batch_records records;as_batch_records_inita(&records, 3);
as_batch_write_record* bw;bw = as_batch_write_reserve(&records);as_key_init_int64(&bw->key, namespace, set, 1);bw->ops = &ops1;
bw = as_batch_write_reserve(&records);as_key_init_int64(&bw->key, namespace, set, 2);bw->ops = &ops2;
bw = as_batch_write_reserve(&records);as_key_init_int64(&bw->key, namespace, set, 3);bw->ops = &ops3;
aerospike_batch_write(&as, &err, &pb, &records);as_batch_records_destroy(&records);
as_commit_status commit_status;aerospike_commit(&as, &err, &txn, &commit_status);as_txn_destroy(&txn);Txn txn = new Txn();
BatchPolicy bp = new BatchPolicy(client.BatchPolicyDefault);bp.Txn = txn;
Key[] keys = new Key[]{ new Key(ns, set, 1), new Key(ns, set, 2), new Key(ns, set, 3)};
try{ client.Operate(bp, null, keys, Operation.Put(new Bin("color", "blue")) );}catch (AerospikeException){ client.Abort(txn); throw;}
client.Commit(txn);const txn = new Aerospike.Transaction();
const bp = new Aerospike.BatchPolicy({ txn });
const records = [ { type: Aerospike.batchType.BATCH_WRITE, key: new Aerospike.Key(ns, set, 1), ops: [Aerospike.operations.write("color", "blue")] }, { type: Aerospike.batchType.BATCH_WRITE, key: new Aerospike.Key(ns, set, 2), ops: [Aerospike.operations.write("color", "blue")] }, { type: Aerospike.batchType.BATCH_WRITE, key: new Aerospike.Key(ns, set, 3), ops: [Aerospike.operations.write("color", "blue")] },];
try { await client.batchWrite(records, bp);} catch (err) { await client.abort(txn); throw err;}
await client.commit(txn);Put, get, and delete in the same transaction
The following example shows multiple commands tied to the same transaction.
Txn txn = new Txn();
try { WritePolicy wp = client.copyWritePolicyDefault(); wp.txn = txn;
Key key1 = new Key(namespace, set, 1); client.put(wp, key1, new Bin("a", "val1"));
Policy p = client.copyReadPolicyDefault(); p.txn = txn;
Key key2 = new Key(namespace, set, 3); Record rec = client.get(p, key2);
WritePolicy dp = client.copyWritePolicyDefault(); dp.txn = txn; dp.durableDelete = true; client.delete(dp, key2);}catch (Throwable t) { client.abort(txn); throw t;}
client.commit(txn);txn = aerospike.Transaction()
try: wp = {"txn": txn} client.put((namespace, set_name, 1), {"a": "val1"}, policy=wp)
rp = {"txn": txn} _, _, bins = client.get((namespace, set_name, 3), policy=rp)
dp = {"txn": txn, "durable_delete": True} client.remove((namespace, set_name, 3), policy=dp)except Exception: client.abort(txn) raise
client.commit(txn)txn := as.NewTxn()
wp := as.NewWritePolicy(0, 0)wp.Txn = txn
key1, _ := as.NewKey(namespace, set, 1)err := client.PutBins(wp, key1, as.NewBin("a", "val1"))if err != nil { client.Abort(txn) log.Fatal(err)}
rp := as.NewPolicy()rp.Txn = txn
key2, _ := as.NewKey(namespace, set, 3)rec, err := client.Get(rp, key2)if err != nil { client.Abort(txn) log.Fatal(err)}_ = rec
dp := as.NewWritePolicy(0, 0)dp.Txn = txndp.DurableDelete = true_, err = client.Delete(dp, key2)if err != nil { client.Abort(txn) log.Fatal(err)}
_, err = client.Commit(txn)as_txn txn;as_txn_init(&txn);
as_policy_write pw;as_policy_write_init(&pw);pw.base.txn = &txn;
as_key key1;as_key_init_int64(&key1, namespace, set, 1);as_record rec1;as_record_inita(&rec1, 1);as_record_set_str(&rec1, "a", "val1");aerospike_key_put(&as, &err, &pw, &key1, &rec1);as_record_destroy(&rec1);
as_policy_read pr;as_policy_read_init(&pr);pr.base.txn = &txn;
as_key key2;as_key_init_int64(&key2, namespace, set, 3);as_record* p_rec = NULL;aerospike_key_get(&as, &err, &pr, &key2, &p_rec);if (p_rec) as_record_destroy(p_rec);
as_policy_remove pd;as_policy_remove_init(&pd);pd.base.txn = &txn;pd.durable_delete = true;aerospike_key_remove(&as, &err, &pd, &key2);
as_commit_status commit_status;aerospike_commit(&as, &err, &txn, &commit_status);as_txn_destroy(&txn);Txn txn = new Txn();
try{ WritePolicy wp = client.WritePolicyDefault.Clone(); wp.Txn = txn;
Key key1 = new Key(ns, set, 1); client.Put(wp, key1, new Bin("a", "val1"));
Policy p = client.ReadPolicyDefault.Clone(); p.Txn = txn;
Key key2 = new Key(ns, set, 3); Record rec = client.Get(p, key2);
WritePolicy dp = client.WritePolicyDefault.Clone(); dp.Txn = txn; dp.durableDelete = true; client.Delete(dp, key2);}catch (Exception){ client.Abort(txn); throw;}
client.Commit(txn);const txn = new Aerospike.Transaction();const policy = { txn };
try { await client.put(new Aerospike.Key(ns, set, 1), { a: "val1" }, {}, policy);
const rec = await client.get(new Aerospike.Key(ns, set, 3), policy);
const dp = new Aerospike.WritePolicy({ txn, durableDelete: true }); await client.remove(new Aerospike.Key(ns, set, 3), dp);} catch (err) { await client.abort(txn); throw err;}
await client.commit(txn);