Batched commands
Jump to the Code block for a combined complete example.
Batched commands execute against multiple records and are issued as a single request.
Batch reads support get, exists, getHeader, and operate requests.
Batch writes, introduced in Aerospike 6.0, allow write requests against any keys, including updates, deletes, UDFs, and multi-operation operate commands.
Setup
The following examples will use the setup and record structure below to illustrate batch operations in an Aerospike database.
import (
    "fmt"
    "log"

    "github.com/aerospike/aerospike-client-go/v6"
)

// Establishes a connection to the server
client, err := aerospike.NewClient("127.0.0.1", 3000)
if err != nil {
    log.Fatal(err)
}
defer client.Close()
The record structure:
Occurred: Integer
Reported: Integer
Posted: Integer
Report: Map
{
    shape: List,
    summary: String,
    city: String,
    state: String,
    duration: String
}
Location: GeoJSON
Policies are defined for the batch parent policy as well as batch read, batch write, batch delete, and batch UDF operations. Filter Expressions can be defined within each type of batch operation policy and the batch parent policy, along with other operation-specific policies.
// Create a new batch policy
batchPolicy := aerospike.NewBatchPolicy()
batchPolicy.FilterExpression = aerospike.ExpGreater(
    // An example that will always return true
    aerospike.ExpIntVal(2),
    aerospike.ExpIntVal(1))

// Create the batch write policy
batchWritePolicy := aerospike.NewBatchWritePolicy()
batchWritePolicy.FilterExpression = aerospike.ExpGreater(
    // An example that will always return true
    aerospike.ExpIntVal(2),
    aerospike.ExpIntVal(1))
Requests
Exists
The following example creates an array of ten keys and checks for their existence in the database.
// Create batch of keys
keys := make([]*aerospike.Key, 10)
for i := 0; i < 10; i++ {
    keys[i], err = aerospike.NewKey("sandbox", "ufodata", (i + 4995))
    if err != nil {
        log.Fatal(err)
    }
}

// Check if records exist
exists, err := client.BatchExists(batchPolicy, keys)
if err != nil {
    log.Fatal(err)
}

// Results are in the same order as the keys
for i := 0; i < len(exists); i++ {
    if !exists[i] {
        // Do something
        fmt.Printf("Key: %v does not exist\n", keys[i].Value())
    }
}
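The exists check returns its booleans in the same positional order as the keys, so each result is matched to its key by index. The sketch below models only that pairing in plain Go, without the client library or a server. The helper name `missingIDs` and the sample data are hypothetical and used purely for illustration.

```go
package main

import "fmt"

// missingIDs is a hypothetical helper: given user key IDs and the
// positional results of an exists check, it returns the IDs whose
// records were not found. It assumes both slices share the same order.
func missingIDs(ids []int, exists []bool) []int {
    missing := []int{}
    for i, found := range exists {
        if i < len(ids) && !found {
            missing = append(missing, ids[i])
        }
    }
    return missing
}

func main() {
    // Sample data standing in for five keys and a made-up response.
    ids := []int{4995, 4996, 4997, 4998, 4999}
    exists := []bool{true, true, false, true, false}
    fmt.Println(missingIDs(ids, exists)) // prints [4997 4999]
}
```

Collecting the missing IDs first, rather than acting inside the loop, makes it simple to retry or report them in one place.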
Read records
The following example creates an array of ten keys and reads the records from the database, returning either the whole record or only the specified report and location bins.
// Create batch of keys
keys := make([]*aerospike.Key, 10)
for i := 0; i < 10; i++ {
    keys[i], err = aerospike.NewKey("sandbox", "ufodata", (i + 1))
    if err != nil {
        log.Fatal(err)
    }
}

// Read each whole record
records, err := client.BatchGet(batchPolicy, keys)
if err != nil {
    log.Fatal(err)
}

// Or specify bins
// records, err := client.BatchGet(batchPolicy, keys, "report", "location")

// Access the records; an entry is nil when the record was not returned
for _, record := range records {
    if record != nil {
        // Do something
        fmt.Printf("Record: %v \n", record.Bins)
    }
}
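To make the difference between a whole-record read and a bin-limited read concrete, here is a minimal plain-Go model of bin selection. It does not call the client library. `selectBins` and the sample record are hypothetical, and the model assumes that a requested bin which is absent from the record is simply left out of the result.

```go
package main

import "fmt"

// selectBins is a hypothetical model of bin selection: with no names it
// returns every bin, otherwise only the named bins that are present.
func selectBins(bins map[string]interface{}, names ...string) map[string]interface{} {
    out := make(map[string]interface{})
    if len(names) == 0 {
        for name, value := range bins {
            out[name] = value
        }
        return out
    }
    for _, name := range names {
        if value, ok := bins[name]; ok {
            out[name] = value
        }
    }
    return out
}

func main() {
    // Made-up sample record using the bin names from the examples.
    record := map[string]interface{}{
        "occurred": 20220101,
        "posted":   20220105,
        "report":   map[string]interface{}{"city": "Ann Arbor", "state": "MI"},
        "location": `{"type":"Point","coordinates":[-83.7,42.2]}`,
    }
    fmt.Println(len(selectBins(record)))                       // prints 4
    fmt.Println(len(selectBins(record, "report", "location"))) // prints 2
}
```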
Read commands
The following example creates an array of ten keys and accesses the city and state map keys to return their respective values from the report bin, for each record.
// Create batch of keys
keys := make([]*aerospike.Key, 10)
for i := 0; i < 10; i++ {
    keys[i], err = aerospike.NewKey("sandbox", "ufodata", (i + 1))
    if err != nil {
        log.Fatal(err)
    }
}

// Create map key list
mapKeys := []interface{}{"city", "state"}

// Get 'city' and 'state' from report map for each record
records, err := client.BatchGetOperate(batchPolicy, keys,
    aerospike.MapGetByKeyListOp("report", mapKeys, aerospike.MapReturnType.VALUE))
if err != nil {
    log.Fatal(err)
}

// Access the records
for _, record := range records {
    if record != nil {
        // Do something
        fmt.Printf("Record: %v \n", record.Bins)
    }
}
Read/write commands
The following example creates an array of ten keys and:
- Defines an Operation Expression that compares the occurred bin value against the provided value, 20211231, and verifies the posted bin exists to determine the boolean value of the new recent key being added to the report map.
- Returns the report bin.
// Define Operation Expressions
exp := aerospike.ExpMapPut(
    aerospike.DefaultMapPolicy(),
    aerospike.ExpStringVal("recent"),
    aerospike.ExpAnd(
        aerospike.ExpGreater(
            aerospike.ExpIntBin("occurred"),
            aerospike.ExpIntVal(20211231)),
        aerospike.ExpBinExists("posted")),
    aerospike.ExpMapBin("report"))

// Create batch of records
batchRecords := []aerospike.BatchRecordIfc{}

for i := 0; i < 10; i++ {
    // Create key
    key, err := aerospike.NewKey("sandbox", "ufodata", (i + 1))
    if err != nil {
        log.Fatal(err)
    }
    // Create record; a nil policy uses the default BatchWritePolicy
    record := aerospike.NewBatchWrite(nil, key,
        aerospike.ExpWriteOp("report", exp, aerospike.ExpWriteFlagDefault),
        aerospike.GetBinOp("report"))
    // Add to batch
    batchRecords = append(batchRecords, record)
}

// Execute the write operation and return the report bin
err = client.BatchOperate(batchPolicy, batchRecords)
if err != nil {
    log.Fatal(err)
}

// Access the records; Record is nil if the operation on that key failed
for _, records := range batchRecords {
    record := records.BatchRec()
    if record != nil && record.Record != nil {
        // Do something
        fmt.Printf("Record: %v \n", record.Record.Bins)
    }
}
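The Operation Expression above runs on the server, so it can help to see the same logic spelled out locally. The following is a plain-Go sketch of the predicate and the resulting map update, not the client API. `recentFlag` and `withRecent` are hypothetical names, and the sketch assumes a missing or non-integer occurred bin yields false, which may differ from how the server evaluates that case.

```go
package main

import "fmt"

// recentFlag models the predicate in the expression:
// occurred > 20211231 AND the posted bin exists.
// Assumption: a missing or non-integer occurred bin yields false here;
// the server's handling of that case is not modeled.
func recentFlag(bins map[string]interface{}) bool {
    occurred, isInt := bins["occurred"].(int)
    _, posted := bins["posted"]
    return isInt && occurred > 20211231 && posted
}

// withRecent returns a copy of the report map with the recent key set,
// mirroring what the map put does to the report bin.
func withRecent(bins map[string]interface{}) map[string]interface{} {
    report := map[string]interface{}{}
    if existing, ok := bins["report"].(map[string]interface{}); ok {
        for k, v := range existing {
            report[k] = v
        }
    }
    report["recent"] = recentFlag(bins)
    return report
}

func main() {
    // Made-up sample record.
    bins := map[string]interface{}{
        "occurred": 20220315,
        "posted":   20220320,
        "report":   map[string]interface{}{"city": "Tucson"},
    }
    fmt.Println(withRecent(bins)["recent"]) // prints true
    delete(bins, "posted")
    fmt.Println(withRecent(bins)["recent"]) // prints false
}
```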
Deletes
The following example deletes the records from the database.
// Create batch of keys
keys := make([]*aerospike.Key, 10)
for i := 0; i < 10; i++ {
    keys[i], err = aerospike.NewKey("sandbox", "ufodata", (i + 1))
    if err != nil {
        log.Fatal(err)
    }
}

// Delete records, passing nil to use the default BatchDeletePolicy
batchResults, err := client.BatchDelete(batchPolicy, nil, keys)
if err != nil {
    log.Fatal(err)
}

// Check the per-record results
for _, res := range batchResults {
    if res.Err != nil {
        fmt.Printf("Key: %v was not deleted: %v\n", res.Key.Value(), res.Err)
    }
}
Complex batched commands
The following example creates a list of four batch records that each use a differing set of operations.
The record with user-defined key 4000:
- uses the Operation Expression exp1, which compares the occurred bin value against the provided value, 20211231, and verifies the posted bin exists to determine the boolean value of the new recent key being added to the report map.
- returns the report bin.
The record with user-defined key 4001:
- uses the readOps slice, which contains a read Operation Expression (exp2) that gets the length of the shape list from the report map and returns the value in a synthetic bin named numShapes.
The record with user-defined key 4002:
- combines a write operation that updates the posted bin value with a map operation that updates the city value in the report map.
- returns both the posted and report bins.
The record with user-defined key 4003 is deleted from the database.
// Define Operation Expressions
exp1 := aerospike.ExpMapPut(
    aerospike.DefaultMapPolicy(),
    aerospike.ExpStringVal("recent"),
    aerospike.ExpAnd(
        aerospike.ExpGreater(
            aerospike.ExpIntBin("occurred"),
            aerospike.ExpIntVal(20211231)),
        aerospike.ExpBinExists("posted")),
    aerospike.ExpMapBin("report"))

exp2 := aerospike.ExpListSize(
    aerospike.ExpMapGetByKey(
        aerospike.MapReturnType.VALUE,
        aerospike.ExpTypeLIST,
        aerospike.ExpStringVal("shape"),
        aerospike.ExpMapBin("report")))

// Define keys
key1, err := aerospike.NewKey("sandbox", "ufodata", 4000)
if err != nil {
    log.Fatal(err)
}
key2, err := aerospike.NewKey("sandbox", "ufodata", 4001)
if err != nil {
    log.Fatal(err)
}
key3, err := aerospike.NewKey("sandbox", "ufodata", 4002)
if err != nil {
    log.Fatal(err)
}
key4, err := aerospike.NewKey("sandbox", "ufodata", 4003)
if err != nil {
    log.Fatal(err)
}

// Create batch records
batchRecords := []aerospike.BatchRecordIfc{}

// Key 4000: write the recent flag into report, then return report
batchRecords = append(batchRecords,
    aerospike.NewBatchWrite(nil, key1,
        aerospike.ExpWriteOp("report", exp1, aerospike.ExpWriteFlagDefault),
        aerospike.GetBinOp("report")))

// Key 4001: read the size of the shape list into numShapes
readOps := []*aerospike.Operation{}
readOps = append(readOps,
    aerospike.ExpReadOp("numShapes", exp2, aerospike.ExpReadFlagDefault))
batchRecords = append(batchRecords,
    aerospike.NewBatchReadOps(key2, []string{"numShapes"}, readOps))

// Key 4002: update posted and report.city, then return both bins
batchRecords = append(batchRecords,
    aerospike.NewBatchWrite(nil, key3,
        aerospike.PutOp(aerospike.NewBin("posted", 20201108)),
        aerospike.MapPutOp(aerospike.DefaultMapPolicy(), "report",
            aerospike.NewValue("city"), aerospike.NewValue("Cedarville")),
        aerospike.GetBinOp("posted"),
        aerospike.GetBinOp("report")))

// Key 4003: delete the record
batchRecords = append(batchRecords, aerospike.NewBatchDelete(nil, key4))

// Execute the batch
err = client.BatchOperate(batchPolicy, batchRecords)
if err != nil {
    log.Fatal(err)
}

// Access the records; Record is nil if the operation on that key failed
for _, records := range batchRecords {
    record := records.BatchRec()
    if record != nil && record.Record != nil {
        // Do something
        fmt.Printf("Record: %v \n", record.Record.Bins)
    }
}
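As a local illustration of what the read expression for key 4001 computes, the sketch below looks up the shape list in a report map and returns its length, which is the value the example exposes as numShapes. This is plain Go rather than the client API. The function name and sample data are hypothetical, and the behavior when the shape list is missing is an assumption of the model, not a statement about the server.

```go
package main

import "fmt"

// numShapes models the read expression: it finds the shape list in the
// report map and returns its length. The boolean is false when report
// has no shape list (how the server reports that case is not modeled).
func numShapes(report map[string]interface{}) (int, bool) {
    shapes, ok := report["shape"].([]interface{})
    if !ok {
        return 0, false
    }
    return len(shapes), true
}

func main() {
    // Made-up sample report map.
    report := map[string]interface{}{
        "shape": []interface{}{"circle", "light"},
        "city":  "Cedarville",
    }
    n, ok := numShapes(report)
    fmt.Println(n, ok) // prints 2 true
}
```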
Code block
The following single code block executes a complete batch read/write operation.
package main

import (
    "fmt"
    "log"

    "github.com/aerospike/aerospike-client-go/v6"
)

func main() {
    // Establishes a connection to the server
    client, err := aerospike.NewClient("127.0.0.1", 3000)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Create a new batch policy
    batchPolicy := aerospike.NewBatchPolicy()
    batchPolicy.FilterExpression = aerospike.ExpGreater(
        // An example that will always return true
        aerospike.ExpIntVal(2),
        aerospike.ExpIntVal(1))

    // Create the batch write policy
    batchWritePolicy := aerospike.NewBatchWritePolicy()
    batchWritePolicy.FilterExpression = aerospike.ExpGreater(
        // An example that will always return true
        aerospike.ExpIntVal(2),
        aerospike.ExpIntVal(1))

    // Define Operation Expressions
    exp := aerospike.ExpMapPut(
        aerospike.DefaultMapPolicy(),
        aerospike.ExpStringVal("recent"),
        aerospike.ExpAnd(
            aerospike.ExpGreater(
                aerospike.ExpIntBin("occurred"),
                aerospike.ExpIntVal(20211231)),
            aerospike.ExpBinExists("posted")),
        aerospike.ExpMapBin("report"))

    // Create batch of records
    batchRecords := []aerospike.BatchRecordIfc{}

    for i := 0; i < 10; i++ {
        // Create key
        key, err := aerospike.NewKey("sandbox", "ufodata", (i + 1))
        if err != nil {
            log.Fatal(err)
        }
        // Create record using the batch write policy defined above
        record := aerospike.NewBatchWrite(batchWritePolicy, key,
            aerospike.ExpWriteOp("report", exp, aerospike.ExpWriteFlagDefault),
            aerospike.GetBinOp("report"))
        // Add to batch
        batchRecords = append(batchRecords, record)
    }

    // Execute the write operation and return the report bin
    err = client.BatchOperate(batchPolicy, batchRecords)
    if err != nil {
        log.Fatal(err)
    }

    // Access the records; Record is nil if the operation on that key failed
    for _, records := range batchRecords {
        record := records.BatchRec()
        if record != nil && record.Record != nil {
            // Do something
            fmt.Printf("Record: %v \n", record.Record.Bins)
        }
    }
}