Primary index queries
The digest of every record in the Aerospike database is indexed in the primary index (PI) of its namespace. This enables query access to all the records in a namespace, and with the assistance of a set index, fast query access to all the records in a set.
Foreground queries
Foreground queries are read-only. The examples below show bin projection, filter expressions, and a partitioned metadata-only count.
Insert data
The examples on this page use an ad-tech user profile set profiles in namespace test. Each record represents a user with an integer age, a string region, and a map segments keyed by segment ID. The map values are lists of [ttl_epoch, category, tier].
{ "uid": "user42", "age": 28, "region": "NA", "segments": { 1001: [1704067200, "sports", "premium"], 1002: [1672531200, "travel", "standard"], 1003: [1711929600, "news", "basic"] }}Enable the set index for the profiles set so queries get direct access to the records in the set, without needing to traverse the entire primary index:
Admin+> manage config namespace test set profiles param enable-index to trueInsert two sample records:
Key key1 = new Key("test", "profiles", "user42");Map<Long, List<Object>> segs1 = new HashMap<>();segs1.put(1001L, Arrays.asList(1704067200L, "sports", "premium"));segs1.put(1002L, Arrays.asList(1672531200L, "travel", "standard"));segs1.put(1003L, Arrays.asList(1711929600L, "news", "basic"));
client.put(null, key1, new Bin("uid", "user42"), new Bin("age", 28), new Bin("region", "NA"), new Bin("segments", segs1));
Key key2 = new Key("test", "profiles", "user99");Map<Long, List<Object>> segs2 = new HashMap<>();segs2.put(2001L, Arrays.asList(1711929600L, "finance", "premium"));
client.put(null, key2, new Bin("uid", "user99"), new Bin("age", 35), new Bin("region", "EU"), new Bin("segments", segs2));key1 = ("test", "profiles", "user42")client.put(key1, { "uid": "user42", "age": 28, "region": "NA", "segments": { 1001: [1704067200, "sports", "premium"], 1002: [1672531200, "travel", "standard"], 1003: [1711929600, "news", "basic"], },})
key2 = ("test", "profiles", "user99")client.put(key2, { "uid": "user99", "age": 35, "region": "EU", "segments": { 2001: [1711929600, "finance", "premium"], },})key1, _ := as.NewKey("test", "profiles", "user42")segs1 := map[interface{}]interface{}{ 1001: []interface{}{1704067200, "sports", "premium"}, 1002: []interface{}{1672531200, "travel", "standard"}, 1003: []interface{}{1711929600, "news", "basic"},}client.PutBins(nil, key1, as.NewBin("uid", "user42"), as.NewBin("age", 28), as.NewBin("region", "NA"), as.NewBin("segments", segs1))
key2, _ := as.NewKey("test", "profiles", "user99")segs2 := map[interface{}]interface{}{ 2001: []interface{}{1711929600, "finance", "premium"},}client.PutBins(nil, key2, as.NewBin("uid", "user99"), as.NewBin("age", 35), as.NewBin("region", "EU"), as.NewBin("segments", segs2))as_key key1;as_key_init_str(&key1, "test", "profiles", "user42");
as_hashmap segs1;as_hashmap_init(&segs1, 3);
as_arraylist seg1001;as_arraylist_init(&seg1001, 3, 0);as_arraylist_append_int64(&seg1001, 1704067200);as_arraylist_append_str(&seg1001, "sports");as_arraylist_append_str(&seg1001, "premium");as_hashmap_set(&segs1, (as_val*)as_integer_new(1001), (as_val*)&seg1001);
as_arraylist seg1002;as_arraylist_init(&seg1002, 3, 0);as_arraylist_append_int64(&seg1002, 1672531200);as_arraylist_append_str(&seg1002, "travel");as_arraylist_append_str(&seg1002, "standard");as_hashmap_set(&segs1, (as_val*)as_integer_new(1002), (as_val*)&seg1002);
as_arraylist seg1003;as_arraylist_init(&seg1003, 3, 0);as_arraylist_append_int64(&seg1003, 1711929600);as_arraylist_append_str(&seg1003, "news");as_arraylist_append_str(&seg1003, "basic");as_hashmap_set(&segs1, (as_val*)as_integer_new(1003), (as_val*)&seg1003);
as_record rec1;as_record_inita(&rec1, 4);as_record_set_str(&rec1, "uid", "user42");as_record_set_int64(&rec1, "age", 28);as_record_set_str(&rec1, "region", "NA");as_record_set_map(&rec1, "segments", (as_map*)&segs1);aerospike_key_put(&as, &err, NULL, &key1, &rec1);as_record_destroy(&rec1);
as_key key2;as_key_init_str(&key2, "test", "profiles", "user99");
as_hashmap segs2;as_hashmap_init(&segs2, 1);
as_arraylist seg2001;as_arraylist_init(&seg2001, 3, 0);as_arraylist_append_int64(&seg2001, 1711929600);as_arraylist_append_str(&seg2001, "finance");as_arraylist_append_str(&seg2001, "premium");as_hashmap_set(&segs2, (as_val*)as_integer_new(2001), (as_val*)&seg2001);
as_record rec2;as_record_inita(&rec2, 4);as_record_set_str(&rec2, "uid", "user99");as_record_set_int64(&rec2, "age", 35);as_record_set_str(&rec2, "region", "EU");as_record_set_map(&rec2, "segments", (as_map*)&segs2);aerospike_key_put(&as, &err, NULL, &key2, &rec2);as_record_destroy(&rec2);Key key1 = new("test", "profiles", "user42");Dictionary<object, object> segs1 = new(){ [1001L] = new List<object> { 1704067200L, "sports", "premium" }, [1002L] = new List<object> { 1672531200L, "travel", "standard" }, [1003L] = new List<object> { 1711929600L, "news", "basic" },};client.Put(null, key1, new Bin("uid", "user42"), new Bin("age", 28), new Bin("region", "NA"), new Bin("segments", segs1));
Key key2 = new("test", "profiles", "user99");Dictionary<object, object> segs2 = new(){ [2001L] = new List<object> { 1711929600L, "finance", "premium" },};client.Put(null, key2, new Bin("uid", "user99"), new Bin("age", 35), new Bin("region", "EU"), new Bin("segments", segs2));const key1 = new Aerospike.Key("test", "profiles", "user42");await client.put(key1, { uid: "user42", age: 28, region: "NA", segments: { 1001: [1704067200, "sports", "premium"], 1002: [1672531200, "travel", "standard"], 1003: [1711929600, "news", "basic"], },});
const key2 = new Aerospike.Key("test", "profiles", "user99");await client.put(key2, { uid: "user99", age: 35, region: "EU", segments: { 2001: [1711929600, "finance", "premium"], },});Query all records in a set
Query the profiles set and project only the uid and region bins. Because the set index is enabled, the server does not need to traverse the entire primary index and skip over records that belong to other sets.
The set index gives this query direct access to the records of the profiles set.
Statement stmt = new Statement();stmt.setNamespace("test");stmt.setSetName("profiles");stmt.setBinNames("uid", "region");
try (RecordSet rs = client.query(null, stmt)) { while (rs.next()) { Record rec = rs.getRecord(); System.out.printf("uid=%s region=%s%n", rec.getString("uid"), rec.getString("region")); }}query = client.query("test", "profiles")query.select("uid", "region")
for _, _, bins in query.results(): print(f"uid={bins['uid']} region={bins['region']}")stmt := as.NewStatement("test", "profiles", "uid", "region")
rs, err := client.Query(nil, stmt)if err != nil { panic(err)}for rec := range rs.Results() { if rec.Err != nil { panic(rec.Err) } fmt.Printf("uid=%v region=%v\n", rec.Record.Bins["uid"], rec.Record.Bins["region"])}as_query q;as_query_init(&q, "test", "profiles");as_query_select(&q, "uid");as_query_select(&q, "region");
aerospike_query_foreach(&as, &err, NULL, &q, query_cb, NULL);as_query_destroy(&q);Statement stmt = new();stmt.SetNamespace("test");stmt.SetSetName("profiles");stmt.SetBinNames("uid", "region");
using RecordSet rs = client.Query(null, stmt);while (rs.Next()){ Record rec = rs.Record; Console.WriteLine($"uid={rec.GetString("uid")} region={rec.GetString("region")}");}const query = client.query("test", "profiles");query.select("uid", "region");
const stream = query.foreach();stream.on("data", (rec) => { console.log(`uid=${rec.bins.uid} region=${rec.bins.region}`);});await new Promise((resolve, reject) => { stream.on("error", reject); stream.on("end", resolve);});Query with a filter expression
Filter records using a filter expression. The expression evaluates metadata predicates first for fast-path elimination, then bin predicates. This example selects profiles updated within the last five minutes (since_update), in the "NA" region, where age >= 21, and uses operation projection to return the number of segments as a computed bin.
long fiveMinMs = 5L * 60 * 1000;
Statement stmt = new Statement();stmt.setNamespace("test");stmt.setSetName("profiles");
stmt.setOperations( ExpOperation.read("active_segments", Exp.build(MapExp.size(Exp.mapBin("segments"))), ExpReadFlags.DEFAULT));
QueryPolicy queryPolicy = new QueryPolicy();queryPolicy.filterExp = Exp.build( Exp.and( Exp.lt(Exp.sinceUpdate(), Exp.val(fiveMinMs)), Exp.eq(Exp.stringBin("region"), Exp.val("NA")), Exp.ge(Exp.intBin("age"), Exp.val(21))));
try (RecordSet rs = client.query(queryPolicy, stmt)) { while (rs.next()) { Record rec = rs.getRecord(); System.out.printf("uid=%s active_segments=%d%n", rec.getString("uid"), rec.getLong("active_segments")); }}from aerospike_helpers.expressions import base as exp, map as map_exp
five_min_ms = 5 * 60 * 1000
query = client.query("test", "profiles")
expr = exp.And( exp.LT(exp.SinceUpdateTime(), exp.Val(five_min_ms)), exp.Eq(exp.StrBin("region"), exp.Val("NA")), exp.GE(exp.IntBin("age"), exp.Val(21)),).compile()
policy = {"expressions": expr}
for _, _, bins in query.results(policy): print(f"uid={bins['uid']} segments={bins['segments']}")fiveMinMs := int64(5 * 60 * 1000)
stmt := as.NewStatement("test", "profiles")
qp := as.NewQueryPolicy()qp.FilterExpression = as.ExpAnd( as.ExpLess(as.ExpSinceUpdate(), as.ExpIntVal(fiveMinMs)), as.ExpEq(as.ExpStringBin("region"), as.ExpStringVal("NA")), as.ExpGreaterEq(as.ExpIntBin("age"), as.ExpIntVal(21)))
rs, err := client.Query(qp, stmt)if err != nil { panic(err)}for rec := range rs.Results() { if rec.Err != nil { panic(rec.Err) } fmt.Printf("uid=%v age=%v region=%v\n", rec.Record.Bins["uid"], rec.Record.Bins["age"], rec.Record.Bins["region"])}as_query q;as_query_init(&q, "test", "profiles");
as_exp_build(filter, as_exp_and( as_exp_cmp_lt(as_exp_since_update(), as_exp_int(5 * 60 * 1000)), as_exp_cmp_eq(as_exp_bin_str("region"), as_exp_str("NA")), as_exp_cmp_ge(as_exp_bin_int("age"), as_exp_int(21))));
as_policy_query p;as_policy_query_init(&p);p.base.filter_exp = filter;
aerospike_query_foreach(&as, &err, &p, &q, query_cb, NULL);as_exp_destroy(filter);as_query_destroy(&q);long fiveMinMs = 5L * 60 * 1000;
Statement stmt = new();stmt.SetNamespace("test");stmt.SetSetName("profiles");
QueryPolicy queryPolicy = new(){ filterExp = Exp.Build( Exp.And( Exp.LT(Exp.SinceUpdate(), Exp.Val(fiveMinMs)), Exp.EQ(Exp.StringBin("region"), Exp.Val("NA")), Exp.GE(Exp.IntBin("age"), Exp.Val(21))))};
using RecordSet rs = client.Query(queryPolicy, stmt);while (rs.Next()){ Record rec = rs.Record; Console.WriteLine($"uid={rec.GetString("uid")} age={rec.GetInt("age")}");}const fiveMinMs = 5 * 60 * 1000;
const query = client.query("test", "profiles");
const queryPolicy = new Aerospike.QueryPolicy({ filterExpression: exp.and( exp.lt(exp.sinceUpdate(), exp.int(fiveMinMs)), exp.eq(exp.binStr("region"), exp.str("NA")), exp.ge(exp.binInt("age"), exp.int(21)) ),});
const stream = query.foreach(queryPolicy);stream.on("data", (rec) => { console.log(`uid=${rec.bins.uid} age=${rec.bins.age}`);});await new Promise((resolve, reject) => { stream.on("error", reject); stream.on("end", resolve);});Query a single partition
Query partition 1000 for "NA" profiles with includeBinData = false to count records without transferring bin data. This pattern is a building block for parallel processing across multiple clients, where each client handles a range of partitions.
Statement stmt = new Statement();stmt.setNamespace("test");stmt.setSetName("profiles");
QueryPolicy qp = new QueryPolicy();qp.includeBinData = false;qp.filterExp = Exp.build( Exp.eq(Exp.stringBin("region"), Exp.val("NA")));
int count = 0;try (RecordSet rs = client.queryPartitions(qp, stmt, PartitionFilter.id(1000))) { while (rs.next()) { count++; }}System.out.printf("Partition 1000 NA count: %d%n", count);query = client.query("test", "profiles")
expr = exp.Eq(exp.StrBin("region"), exp.Val("NA")).compile()policy = { "expressions": expr, "partition_filter": {"begin": 1000, "count": 1},}
count = 0for _, _, bins in query.results(policy, options={"nobins": True}): count += 1
print(f"Partition 1000 NA count: {count}")stmt := as.NewStatement("test", "profiles")
qp := as.NewQueryPolicy()qp.IncludeBinData = falseqp.FilterExpression = as.ExpEq( as.ExpStringBin("region"), as.ExpStringVal("NA"))
pf := as.NewPartitionFilterById(1000)
rs, err := client.QueryPartitions(qp, stmt, pf)if err != nil { panic(err)}
count := 0for rec := range rs.Results() { if rec.Err != nil { panic(rec.Err) } count++}fmt.Printf("Partition 1000 NA count: %d\n", count)as_query q;as_query_init(&q, "test", "profiles");q.no_bins = true;
as_exp_build(filter, as_exp_cmp_eq(as_exp_bin_str("region"), as_exp_str("NA")));
as_policy_query p;as_policy_query_init(&p);p.base.filter_exp = filter;
as_partition_filter pf;as_partition_filter_set_id(&pf, 1000);
int count = 0;aerospike_query_partitions(&as, &err, &p, &q, &pf, count_cb, &count);
printf("Partition 1000 NA count: %d\n", count);as_exp_destroy(filter);as_query_destroy(&q);Statement stmt = new();stmt.SetNamespace("test");stmt.SetSetName("profiles");
QueryPolicy qp = new(){ includeBinData = false, filterExp = Exp.Build( Exp.EQ(Exp.StringBin("region"), Exp.Val("NA")))};
int count = 0;using (RecordSet rs = client.QueryPartitions(qp, stmt, PartitionFilter.Id(1000))){ while (rs.Next()) count++;}Console.WriteLine($"Partition 1000 NA count: {count}");const query = client.query("test", "profiles");query.nobins = true;query.partitions(1000, 1);
const queryPolicy = new Aerospike.QueryPolicy({ filterExpression: exp.eq(exp.binStr("region"), exp.str("NA")),});
let count = 0;const stream = query.foreach(queryPolicy);stream.on("data", () => count++);await new Promise((resolve, reject) => { stream.on("error", reject); stream.on("end", resolve);});console.log(`Partition 1000 NA count: ${count}`);Background queries
Background queries modify records in place on the server. The example below uses a background ops query with a map operation to remove expired ad-tech segments.
Remove expired segments
Remove ad-tech segments whose ttl_epoch has passed. The segments map values are lists [ttl_epoch, category, tier]. Because lists compare element-by-element, removeByValueRange(begin=null, end=[cutoff_ts]) removes all entries whose value list starts with an expired timestamp. This operation is idempotent — re-running it removes nothing if no expired segments remain.
long cutoffTs = 1704067200L; // 2024-01-01T00:00:00Z
Statement stmt = new Statement();stmt.setNamespace("test");stmt.setSetName("profiles");
QueryPolicy queryPolicy = new QueryPolicy();queryPolicy.filterExp = Exp.build( Exp.gt(MapExp.size(Exp.mapBin("segments")), Exp.val(0)));
ExecuteTask task = client.execute(null, stmt, MapOperation.removeByValueRange("segments", null, Value.get(Arrays.asList(cutoffTs)), MapReturnType.NONE));
task.waitTillComplete();from aerospike_helpers.operations import map_operationsfrom aerospike_helpers.expressions import base as exp, map as map_exp
cutoff_ts = 1704067200 # 2024-01-01T00:00:00Z
query = client.query("test", "profiles")
expr = exp.GT(map_exp.MapSize(None, exp.MapBin("segments")), exp.Val(0)).compile()policy = {"expressions": expr}
ops = [ map_operations.map_remove_by_value_range( "segments", None, [cutoff_ts], aerospike.MAP_RETURN_NONE )]query.add_ops(ops)query.execute_background(policy)cutoffTs := 1704067200 // 2024-01-01T00:00:00Z
stmt := as.NewStatement("test", "profiles")
qp := as.NewQueryPolicy()qp.FilterExpression = as.ExpGreater( as.ExpMapSize(as.ExpMapBin("segments")), as.ExpIntVal(0))
task, err := client.QueryExecute(qp, nil, stmt, as.MapRemoveByValueRangeOp("segments", nil, []interface{}{cutoffTs}, as.MapReturnType.NONE))if err != nil { panic(err)}<-task.OnComplete()int64_t cutoff_ts = 1704067200; // 2024-01-01T00:00:00Z
as_query q;as_query_init(&q, "test", "profiles");
as_exp_build(filter, as_exp_cmp_gt( as_exp_map_size(NULL, as_exp_bin_map("segments")), as_exp_int(0)));
as_arraylist cutoff_list;as_arraylist_init(&cutoff_list, 1, 0);as_arraylist_append_int64(&cutoff_list, cutoff_ts);
as_operations ops;as_operations_inita(&ops, 1);as_operations_map_remove_by_value_range( &ops, "segments", NULL, NULL, (as_val*)&cutoff_list, AS_MAP_RETURN_NONE);q.ops = &ops;
as_policy_write wp;as_policy_write_init(&wp);wp.base.filter_exp = filter;
uint64_t query_id = 0;aerospike_query_background(&as, &err, &wp, &q, &query_id);aerospike_query_wait(&as, &err, NULL, &q, query_id, 0);
as_exp_destroy(filter);as_query_destroy(&q);long cutoffTs = 1704067200L; // 2024-01-01T00:00:00Z
Statement stmt = new();stmt.SetNamespace("test");stmt.SetSetName("profiles");
WritePolicy writePolicy = new(){ filterExp = Exp.Build( Exp.GT(MapExp.Size(Exp.MapBin("segments")), Exp.Val(0)))};
ExecuteTask task = client.Execute(writePolicy, stmt, MapOperation.RemoveByValueRange("segments", null, Value.Get(new List<object> { cutoffTs }), MapReturnType.NONE));
task.Wait();const cutoffTs = 1704067200; // 2024-01-01T00:00:00Z
const query = client.query("test", "profiles");
const writePolicy = new Aerospike.WritePolicy({ filterExpression: exp.gt( exp.maps.size(exp.binMap("segments")), exp.int(0) ),});
const ops = [ Aerospike.maps.removeByValueRange("segments", null, [cutoffTs], Aerospike.maps.returnType.NONE),];
const job = await query.operate(ops, writePolicy);await job.waitUntilDone();