Tutorial: Asynchronous API in Go
This tutorial demonstrates concurrent, asynchronous data ingestion and retrieval with the Aerospike Go client, using goroutines and channels.
The Aerospike Go client supports both single and batch operations. While batch operations already leverage internal concurrency for efficiency, you can further improve performance by running multiple operations concurrently using goroutines. This guide covers:
- Single async operations: running individual Get/Put operations.
- Batch async operations: running multiple batch operations concurrently or leveraging built-in batch concurrency.
Unlike some other language clients, the Go client doesn’t have explicit async/await patterns. Instead, Go uses its native concurrency primitives for async operations.
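As a minimal illustration of this approach, the following sketch (plain Go, no Aerospike calls) runs work in a goroutine and delivers the result on a channel that the caller reads when it is ready; this is the pattern every example in this tutorial builds on.
package main

import "fmt"

// asyncDouble runs its work in a goroutine and returns a channel that
// will receive the result once the work completes.
func asyncDouble(n int) <-chan int {
    out := make(chan int, 1) // buffered so the goroutine never blocks
    go func() {
        out <- n * 2 // do the work and publish the result
        close(out)
    }()
    return out
}

func main() {
    resultChan := asyncDouble(21)
    // The caller can do other work here, then collect the result.
    fmt.Println(<-resultChan) // prints 42
}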
Prerequisites
Before using async operations, you should have:
- Go 1.24.0 installed.
- Aerospike Go client v8 installed:
  go get github.com/aerospike/aerospike-client-go/v8
- Aerospike Database running and accessible (the command after this list shows one way to start a local instance).
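If you do not already have a database available, one common option is the Aerospike Community Edition Docker image; the container name below is arbitrary:
docker run -d --name aerospike -p 3000-3002:3000-3002 aerospike/aerospike-server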
Initializations
Client setup
Establish a connection to your Aerospike cluster and configure policies for your operations:
package main

import (
    "log"

    as "github.com/aerospike/aerospike-client-go/v8"
)

func main() {
    // Connect to the Aerospike cluster
    client, err := as.NewClient("127.0.0.1", 3000)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Write policy for Put operations
    writePolicy := as.NewWritePolicy(0, 0)
    // Send the user key to the server so it is stored with the record
    writePolicy.SendKey = true

    // Read policy for Get operations
    readPolicy := as.NewPolicy()

    // Batch policy for batch operations
    batchPolicy := as.NewBatchPolicy()
    batchPolicy.ConcurrentNodes = 5 // Number of nodes queried concurrently

    // Your async operations (using writePolicy, readPolicy, and batchPolicy) go here
}

Single async operations
Single async operations let you execute individual Get/Put calls independently. By using goroutines, you can run these operations concurrently on different records. The following example demonstrates how to perform a Get/Put on a single record using a goroutine. The same pattern can be extended to handle multiple records concurrently, as sketched after the write example below.
asyncPut operations
The following example demonstrates how to write single records:
import (
    "fmt"
    "log"

    as "github.com/aerospike/aerospike-client-go/v8"
)

const (
    namespace = "test"
    set       = "demo"
)

// AsyncPutResult holds the result of an async Put operation
type AsyncPutResult struct {
    Key   *as.Key
    Error error
}

// asyncPut performs a single Put operation asynchronously
func asyncPut(client *as.Client, policy *as.WritePolicy, key *as.Key, bins ...*as.Bin) <-chan AsyncPutResult {
    resultChan := make(chan AsyncPutResult, 1) // buffered so the goroutine never blocks

    go func() {
        err := client.PutBins(policy, key, bins...)
        resultChan <- AsyncPutResult{Key: key, Error: err}
        close(resultChan)
    }()

    return resultChan
}

...

client, err := as.NewClient("127.0.0.1", 3000)
if err != nil {
    log.Fatal(err)
}
defer client.Close()

writePolicy := as.NewWritePolicy(0, 0)
writePolicy.SendKey = true

// Create a single key and bins
key, _ := as.NewKey(namespace, set, "key-1")
bins := []*as.Bin{
    as.NewBin("id", 1),
    as.NewBin("value", "hello"),
}

// Start the async Put
resultChan := asyncPut(client, writePolicy, key, bins...)

// Wait for the result
result := <-resultChan

if result.Error != nil {
    log.Printf("Error writing key %s: %v", result.Key.Value(), result.Error)
} else {
    fmt.Printf("Successfully wrote key %s\n", result.Key.Value())
}
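The same helper extends naturally to many records. The following sketch reuses asyncPut, client, and writePolicy from the example above, starts one goroutine per record, and then collects every result; the record count and bin values are illustrative.
// Fan out several async Put operations, one goroutine per record.
// Each asyncPut call returns its own buffered channel, so no goroutine
// blocks while the caller is still launching work.
channels := make([]<-chan AsyncPutResult, 0, 10)

for i := 0; i < 10; i++ {
    key, err := as.NewKey(namespace, set, fmt.Sprintf("key-%d", i))
    if err != nil {
        log.Fatal(err)
    }

    bins := []*as.Bin{
        as.NewBin("id", i),
        as.NewBin("value", fmt.Sprintf("hello-%d", i)),
    }
    channels = append(channels, asyncPut(client, writePolicy, key, bins...))
}

// Collect the results; the order matches the order the writes were started.
for _, ch := range channels {
    result := <-ch
    if result.Error != nil {
        log.Printf("Error writing key %s: %v", result.Key.Value(), result.Error)
    } else {
        fmt.Printf("Successfully wrote key %s\n", result.Key.Value())
    }
}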
asyncGet operations
The following example demonstrates how to read single records:
import (
    "fmt"
    "log"

    as "github.com/aerospike/aerospike-client-go/v8"
)

const (
    namespace = "test"
    set       = "demo"
)

// AsyncGetResult holds the result of an async Get operation
type AsyncGetResult struct {
    Key    *as.Key
    Record *as.Record
    Error  error
}

// asyncGet performs a single Get operation in a goroutine
func asyncGet(client *as.Client, policy *as.BasePolicy, key *as.Key, binNames ...string) <-chan AsyncGetResult {
    resultChan := make(chan AsyncGetResult, 1)

    go func() {
        record, err := client.Get(policy, key, binNames...)
        resultChan <- AsyncGetResult{Key: key, Record: record, Error: err}
        close(resultChan)
    }()

    return resultChan
}

...

client, err := as.NewClient("127.0.0.1", 3000)
if err != nil {
    log.Fatal(err)
}
defer client.Close()

// Create a read policy (NewPolicy returns a *BasePolicy)
readPolicy := as.NewPolicy()

// Single async Get operation
key, err := as.NewKey(namespace, set, "key-1")
if err != nil {
    log.Fatal(err)
}

resultChan := asyncGet(client, readPolicy, key)
result := <-resultChan

if result.Error != nil {
    log.Printf("Error reading key %s: %v", result.Key.Value(), result.Error)
} else if result.Record == nil {
    log.Printf("Key %s not found", result.Key.Value())
} else {
    fmt.Printf("Successfully read key %s: %v\n", result.Key.Value(), result.Record.Bins)
}
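Because asyncGet hands back a channel, the caller does not have to block indefinitely. The sketch below reuses asyncGet, client, readPolicy, and key from the example above and adds a wait timeout with a select statement (add "time" to the imports); the 100 ms value is illustrative, and any timeouts configured on the read policy still apply to the underlying call.
// Wait for the async Get result, but give up after a timeout.
// This only stops the caller from waiting; the Get call in the goroutine
// still runs to completion under the policy's own timeout settings.
select {
case result := <-asyncGet(client, readPolicy, key):
    if result.Error != nil {
        log.Printf("Error reading key %s: %v", result.Key.Value(), result.Error)
    } else if result.Record != nil {
        fmt.Printf("Read key %s: %v\n", result.Key.Value(), result.Record.Bins)
    }
case <-time.After(100 * time.Millisecond):
    log.Printf("Timed out waiting for key %s", key.Value())
}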
asyncOperate operations
You can also perform async Operate operations that combine reads and writes in a single request:
import (
    "fmt"
    "log"

    as "github.com/aerospike/aerospike-client-go/v8"
)

const (
    namespace = "test"
    set       = "demo"
)

// AsyncOperateResult holds the result of an async Operate operation
type AsyncOperateResult struct {
    Key    *as.Key
    Record *as.Record
    Error  error
}

// asyncOperate performs a single Operate operation asynchronously
func asyncOperate(client *as.Client, policy *as.WritePolicy, key *as.Key, ops ...*as.Operation) <-chan AsyncOperateResult {
    resultChan := make(chan AsyncOperateResult, 1)

    go func() {
        record, err := client.Operate(policy, key, ops...)
        resultChan <- AsyncOperateResult{Key: key, Record: record, Error: err}
        close(resultChan)
    }()

    return resultChan
}

...

client, err := as.NewClient("127.0.0.1", 3000)
if err != nil {
    log.Fatal(err)
}
defer client.Close()

writePolicy := as.NewWritePolicy(0, 0)
writePolicy.SendKey = true

// Single key
key, _ := as.NewKey(namespace, set, "key-1")

// Define operations: Put + Add + Get
ops := []*as.Operation{
    as.PutOp(as.NewBin("counter", 1)),
    as.AddOp(as.NewBin("counter", 1)),
    as.GetOp(),
}

// Start the async Operate
resultChan := asyncOperate(client, writePolicy, key, ops...)

// Wait for the result
res := <-resultChan

// Check the result
if res.Error != nil {
    log.Printf("Error operating on key %s: %v", res.Key.Value(), res.Error)
} else if res.Record == nil {
    log.Printf("No record returned for key %s", res.Key.Value())
} else {
    fmt.Printf("Operate successful for key %s: %v\n", res.Key.Value(), res.Record.Bins)
}

Batch async operations
You can also run multiple batch operations concurrently for even better performance. Wrap each batch call in a goroutine so it doesn't block the others.
BatchWrite operations
The following example demonstrates concurrent batch write operations:
import (
    "fmt"
    "log"
    "time"

    as "github.com/aerospike/aerospike-client-go/v8"
)

const (
    namespace = "test"
    set       = "demo"
)

// AsyncBatchWriteResult holds the result of a batch write
type AsyncBatchWriteResult struct {
    BatchIndex int
    Keys       []*as.Key
    Error      error
}

// asyncBatchPut performs a batch put asynchronously
func asyncBatchPut(client *as.Client, policy *as.BatchPolicy, writePolicy *as.BatchWritePolicy, batchIdx int, keys []*as.Key, bins []*as.Bin, results chan<- AsyncBatchWriteResult) {
    go func() {
        // Make sure a panic in this goroutine still reports a result
        defer func() {
            if r := recover(); r != nil {
                results <- AsyncBatchWriteResult{BatchIndex: batchIdx, Keys: keys, Error: fmt.Errorf("panic: %v", r)}
            }
        }()

        // Convert bins to operations
        ops := make([]*as.Operation, len(bins))
        for i, bin := range bins {
            ops[i] = as.PutOp(bin)
        }

        batchRecords := make([]as.BatchRecordIfc, len(keys))
        for i, key := range keys {
            batchRecords[i] = as.NewBatchWrite(writePolicy, key, ops...)
        }

        err := client.BatchOperate(policy, batchRecords)
        results <- AsyncBatchWriteResult{BatchIndex: batchIdx, Keys: keys, Error: err}
    }()
}

func exampleAsyncBatchPuts(client *as.Client) error {
    // Run multiple batch write operations concurrently
    batchPolicy := as.NewBatchPolicy()
    batchPolicy.ConcurrentNodes = 5

    // Create a BatchWritePolicy (not a BasePolicy or WritePolicy)
    writePolicy := as.NewBatchWritePolicy()
    writePolicy.SendKey = true

    batchSize := 50
    totalKeys := 500
    numBatches := (totalKeys + batchSize - 1) / batchSize

    // Create all keys
    allKeys := make([]*as.Key, totalKeys)
    for i := 0; i < totalKeys; i++ {
        key, err := as.NewKey(namespace, set, fmt.Sprintf("batch-key-%d", i))
        if err != nil {
            return err
        }
        allKeys[i] = key
    }

    // Shared results channel
    results := make(chan AsyncBatchWriteResult, numBatches)

    // Start all batch writes
    for batchIdx := 0; batchIdx < numBatches; batchIdx++ {
        start := batchIdx * batchSize
        end := start + batchSize
        if end > totalKeys {
            end = totalKeys
        }
        batchKeys := allKeys[start:end]
        bins := []*as.Bin{
            as.NewBin("batchId", batchIdx),
            as.NewBin("timestamp", time.Now().Unix()),
        }

        asyncBatchPut(client, batchPolicy, writePolicy, batchIdx, batchKeys, bins, results)
    }

    // A single goroutine (main) reads all results
    successCount := 0
    errorCount := 0
    for i := 0; i < numBatches; i++ {
        res := <-results
        if res.Error != nil {
            log.Printf("Batch %d failed: %v", res.BatchIndex, res.Error)
            errorCount++
        } else {
            log.Printf("Batch %d completed successfully (%d keys)", res.BatchIndex, len(res.Keys))
            successCount++
        }
    }

    close(results)

    log.Printf("Batch writes complete: %d successful batches, %d failed batches", successCount, errorCount)
    return nil
}

...

client, err := as.NewClient("127.0.0.1", 3000)
if err != nil {
    log.Fatal(err)
}
defer client.Close()

if err := exampleAsyncBatchPuts(client); err != nil {
    log.Println("Batch writes failed:", err)
} else {
    log.Println("All batch writes completed successfully")
}
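In the example above, the reader knows exactly how many results to expect, so it reads that many values and then closes the channel. An alternative collection pattern, sketched below under the same setup (it reuses client, batchPolicy, writePolicy, allKeys, and the batch sizing variables from the example above and needs "sync" in the imports), closes the channel from a helper goroutine once a sync.WaitGroup reports that every batch has finished, letting the reader simply range over the channel.
// Alternative: close the results channel via a WaitGroup and range over it.
var wg sync.WaitGroup
results := make(chan AsyncBatchWriteResult, numBatches)

for batchIdx := 0; batchIdx < numBatches; batchIdx++ {
    start := batchIdx * batchSize
    end := start + batchSize
    if end > totalKeys {
        end = totalKeys
    }
    batchKeys := allKeys[start:end]

    wg.Add(1)
    go func(idx int, keys []*as.Key) {
        defer wg.Done()

        batchRecords := make([]as.BatchRecordIfc, len(keys))
        for i, key := range keys {
            batchRecords[i] = as.NewBatchWrite(writePolicy, key, as.PutOp(as.NewBin("batchId", idx)))
        }

        err := client.BatchOperate(batchPolicy, batchRecords)
        results <- AsyncBatchWriteResult{BatchIndex: idx, Keys: keys, Error: err}
    }(batchIdx, batchKeys)
}

// Close the channel once every goroutine has reported, then drain it.
go func() {
    wg.Wait()
    close(results)
}()

for res := range results {
    if res.Error != nil {
        log.Printf("Batch %d failed: %v", res.BatchIndex, res.Error)
    } else {
        log.Printf("Batch %d completed successfully (%d keys)", res.BatchIndex, len(res.Keys))
    }
}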
BatchGet operations
The following example demonstrates concurrent batch read operations:
import (
    "fmt"
    "log"

    as "github.com/aerospike/aerospike-client-go/v8"
)

const (
    namespace = "test"
    set       = "demo"
)

// AsyncBatchReadResult holds the result of an async batch read operation
type AsyncBatchReadResult struct {
    BatchIndex int
    Keys       []*as.Key
    Records    []*as.Record
    Error      error
}

// asyncBatchGet performs a batch Get operation in a goroutine
func asyncBatchGet(
    client *as.Client,
    policy *as.BatchPolicy,
    batchIdx int,
    keys []*as.Key,
    results chan<- AsyncBatchReadResult,
    binNames ...string,
) {
    go func() {
        // Protect against panics
        defer func() {
            if r := recover(); r != nil {
                results <- AsyncBatchReadResult{
                    BatchIndex: batchIdx,
                    Keys:       keys,
                    Error:      fmt.Errorf("panic: %v", r),
                }
            }
        }()

        // Perform the batch Get
        records, err := client.BatchGet(policy, keys, binNames...)

        // Send the result
        results <- AsyncBatchReadResult{
            BatchIndex: batchIdx,
            Keys:       keys,
            Records:    records,
            Error:      err,
        }
    }()
}

// Example: run multiple batch read operations concurrently
func exampleAsyncBatchGets(client *as.Client) error {
    batchPolicy := as.NewBatchPolicy()
    batchPolicy.ConcurrentNodes = 5

    batchSize := 50
    totalKeys := 500
    numBatches := (totalKeys + batchSize - 1) / batchSize

    // Create all keys
    allKeys := make([]*as.Key, totalKeys)
    for i := 0; i < totalKeys; i++ {
        key, err := as.NewKey(namespace, set, fmt.Sprintf("batch-key-%d", i))
        if err != nil {
            return err
        }
        allKeys[i] = key
    }

    // Shared results channel
    results := make(chan AsyncBatchReadResult, numBatches)

    // Start all batch read goroutines
    for batchIdx := 0; batchIdx < numBatches; batchIdx++ {
        start := batchIdx * batchSize
        end := start + batchSize
        if end > totalKeys {
            end = totalKeys
        }

        batchKeys := allKeys[start:end]

        asyncBatchGet(client, batchPolicy, batchIdx, batchKeys, results)
    }

    // Collect results in a single goroutine (main)
    totalRecords := 0
    errorCount := 0

    for i := 0; i < numBatches; i++ {
        res := <-results

        if res.Error != nil {
            log.Printf("Batch %d read failed: %v", res.BatchIndex, res.Error)
            errorCount++
            continue
        }

        // Count found records
        found := 0
        for _, r := range res.Records {
            if r != nil {
                found++
            }
        }

        log.Printf("Batch %d read: %d records found out of %d keys", res.BatchIndex, found, len(res.Keys))

        totalRecords += found
    }

    close(results)

    log.Printf("Batch reads complete: %d total records found, %d failed batches", totalRecords, errorCount)

    return nil
}

...

client, err := as.NewClient("127.0.0.1", 3000)
if err != nil {
    log.Fatal("Failed to connect:", err)
}
defer client.Close()

log.Println("Running async batch GET example...")

if err := exampleAsyncBatchGets(client); err != nil {
    log.Println("Batch reads failed:", err)
} else {
    log.Println("All batch reads completed successfully")
}

BatchOperate operations
You can perform batch operate operations concurrently:
import (
    "fmt"
    "log"

    as "github.com/aerospike/aerospike-client-go/v8"
)

const (
    namespace = "test"
    set       = "demo"
)

// AsyncBatchOperateResult holds the result of a batch operate
type AsyncBatchOperateResult struct {
    BatchIndex int
    Error      error
}

// asyncBatchOperate runs a whole batch in a goroutine
func asyncBatchOperate(
    client *as.Client,
    batchPolicy *as.BatchPolicy,
    writePolicy *as.BatchWritePolicy,
    batchIdx int,
    keys []*as.Key,
    results chan<- AsyncBatchOperateResult,
) {
    go func() {
        // Ensure a panic does not crash the program
        defer func() {
            if r := recover(); r != nil {
                results <- AsyncBatchOperateResult{
                    BatchIndex: batchIdx,
                    Error:      fmt.Errorf("panic: %v", r),
                }
            }
        }()

        // Build operations for each record
        batchRecords := make([]as.BatchRecordIfc, len(keys))

        for i, key := range keys {
            ops := []*as.Operation{
                as.PutOp(as.NewBin("batchId", batchIdx)),
                as.AddOp(as.NewBin("counter", 1)),
                as.GetOp(),
            }
            batchRecords[i] = as.NewBatchWrite(writePolicy, key, ops...)
        }

        // Perform the batch operate
        err := client.BatchOperate(batchPolicy, batchRecords)

        // Send the result to the shared channel
        results <- AsyncBatchOperateResult{
            BatchIndex: batchIdx,
            Error:      err,
        }
    }()
}

func exampleAsyncBatchOperate(client *as.Client) error {
    batchPolicy := as.NewBatchPolicy()
    batchPolicy.ConcurrentNodes = 5

    writePolicy := as.NewBatchWritePolicy()
    writePolicy.SendKey = true

    // Split the keys into batches
    batchSize := 50
    totalKeys := 500
    numBatches := (totalKeys + batchSize - 1) / batchSize

    // Create all keys
    allKeys := make([]*as.Key, totalKeys)
    for i := 0; i < totalKeys; i++ {
        key, err := as.NewKey(namespace, set, fmt.Sprintf("batch-key-%d", i))
        if err != nil {
            return err
        }
        allKeys[i] = key
    }

    // Shared channel for all results
    results := make(chan AsyncBatchOperateResult, numBatches)

    // Launch a goroutine for each batch
    for batchIdx := 0; batchIdx < numBatches; batchIdx++ {
        start := batchIdx * batchSize
        end := start + batchSize
        if end > totalKeys {
            end = totalKeys
        }

        batchKeys := allKeys[start:end]

        asyncBatchOperate(client, batchPolicy, writePolicy, batchIdx, batchKeys, results)
    }

    // Single reader loop
    successCount := 0
    errorCount := 0

    for i := 0; i < numBatches; i++ {
        res := <-results

        if res.Error != nil {
            log.Printf("Batch %d failed: %v", res.BatchIndex, res.Error)
            errorCount++
        } else {
            log.Printf("Batch %d OK", res.BatchIndex)
            successCount++
        }
    }

    close(results)

    log.Printf("Batch Operate done: %d success, %d failed", successCount, errorCount)

    return nil
}

...

client, err := as.NewClient("127.0.0.1", 3000)
if err != nil {
    log.Fatal(err)
}
defer client.Close()

log.Println("Running async batch OPERATE example...")

if err := exampleAsyncBatchOperate(client); err != nil {
    log.Println("Batch operate failed:", err)
} else {
    log.Println("All batch operate calls completed successfully")
}

Using batch policy concurrency
The batch operations already leverage internal concurrency. You can control this with the ConcurrentNodes setting:
// Configure batch policy for optimal concurrency
batchPolicy := as.NewBatchPolicy()
batchPolicy.ConcurrentNodes = 10       // Process up to 10 nodes concurrently
batchPolicy.AllowPartialResults = true // Continue even if some keys fail

// The batch operation will automatically use goroutines internally
records, err := client.BatchGet(batchPolicy, keys, "bin1", "bin2")

Cleanup
Proper cleanup is essential to prevent resource leaks and ensure graceful shutdown.
Closing the client
Always close the client when done:
client, err := as.NewClient("127.0.0.1", 3000)
if err != nil {
    log.Fatal(err)
}
defer client.Close() // Ensures the client is closed even on panic
// Your operations here
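Before the deferred Close runs, wait for any goroutines you started; otherwise their in-flight operations may fail while the client is shutting down. A minimal sketch using sync.WaitGroup follows; doWork is a hypothetical placeholder for one of the async patterns above.
// Wait for all in-flight goroutines before the deferred client.Close() runs.
var wg sync.WaitGroup

for i := 0; i < 10; i++ {
    wg.Add(1)
    go func(i int) {
        defer wg.Done()
        // doWork is a placeholder for an operation that uses the client,
        // e.g. one of the asyncPut/asyncGet patterns shown above.
        doWork(client, i)
    }(i)
}

// Block until every goroutine has finished its work.
wg.Wait()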
Best practices
- Use batch operations when possible: batch operations are more efficient than multiple single operations.
- Context usage: use contexts for timeout and cancellation support (see the sketch after this list).
- Batch policy tuning: adjust ConcurrentNodes based on your cluster size and workload.
- Connection pooling: the client manages connection pooling automatically, but ensure proper client initialization.
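The client calls themselves do not take a context, but the channel-based pattern makes it easy to honor one. The sketch below reuses asyncPut, client, and writePolicy from earlier, assumes "context" and "time" are imported, and stops launching and waiting for work once the context expires; the deadline, key names, and record count are illustrative.
// Stop issuing async writes once the context is cancelled or times out.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

for i := 0; i < 1000 && ctx.Err() == nil; i++ {
    key, err := as.NewKey(namespace, set, fmt.Sprintf("key-%d", i))
    if err != nil {
        log.Fatal(err)
    }

    select {
    case result := <-asyncPut(client, writePolicy, key, as.NewBin("id", i)):
        if result.Error != nil {
            log.Printf("Error writing key %s: %v", result.Key.Value(), result.Error)
        }
    case <-ctx.Done():
        // The in-flight Put still completes in its goroutine under the
        // write policy's own timeouts; the caller just stops waiting.
        log.Printf("Stopping early: %v", ctx.Err())
    }
}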