
Primary index

Jump to the Code block for a combined complete example.

Basic primary index (PI) queries support the following features:

  • Filter records by set name.
  • Filter records by Filter Expressions.
  • Limit the number of records returned, useful for pagination.
  • Return only record digests and metadata (generation and TTL).
  • Return specified bins.
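
In the Node.js client, several of these features map onto members of the query object. The following is a minimal sketch rather than a definitive implementation: applyQueryOptions and its option names (bins, metadataOnly, maxRecords) are hypothetical, and it assumes a query object created with client.query() that exposes select(), nobins, and maxRecords.

```javascript
// Hypothetical helper (not part of the Aerospike client API).
// Assumes `query` exposes select(), nobins, and maxRecords.
function applyQueryOptions(query, { bins = [], metadataOnly = false, maxRecords = 0 } = {}) {
  if (metadataOnly) {
    // Return only record digests and metadata, no bin data
    query.nobins = true;
  } else if (bins.length > 0) {
    // Return only the named bins
    query.select(bins);
  }
  if (maxRecords > 0) {
    // Limit the number of records returned
    query.maxRecords = maxRecords;
  }
  return query;
}
```

With a connected client, a call such as applyQueryOptions(client.query('sandbox', 'ufodata'), { bins: ['occurred'], maxRecords: 20 }) would prepare a query that returns at most 20 records containing only the occurred bin.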

Setup

The examples on this page use the setup and record structure below to illustrate primary index queries in an Aerospike database.

const Aerospike = require('aerospike');
const exp = Aerospike.exp;
// Define host configuration
let config = {hosts: '127.0.0.1:3000'};

The record structure:

Occurred: Integer
Reported: Integer
Posted: Integer
Report: Map
{
    shape: List,
    summary: String,
    city: String,
    state: String,
    duration: String
}
Location: GeoJSON

Policies

See Basic Queries for query policy information.

Query a set

The following example queries the ufodata set in the sandbox namespace and stops the query after 20 records.

// Set max records to return
let maxRecords = 20;
let i = 0;
;(async () => {
  // Establish a connection to the server
  let client = await Aerospike.connect(config);
  // Create query
  let query = client.query('sandbox', 'ufodata');
  // Execute the query; foreach() returns a stream of records
  let recordStream = query.foreach();
  recordStream.on('error', err => {
    // Handle error
  });
  recordStream.on('data', record => {
    // Do something
    i++;
    console.info('Key: %o | Record: %o\n', record.key.key, record.bins);
    // Stop the query once the limit is reached
    if (i === maxRecords) {
      recordStream.abort();
    }
  });
  recordStream.on('end', () => {
    // Close the connection to the server
    client.close();
  });
})();
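
The event handlers above can also be wrapped in a Promise so that the caller can await the collected records. The following is a minimal sketch, not part of the client API: collectRecords is a hypothetical helper, and it assumes only that the stream emits data, error, and end events and exposes abort(), as the record stream in the example above does.

```javascript
// Hypothetical helper: collects up to `limit` records from a record stream.
// Assumes the stream emits 'data', 'error', and 'end', and that calling
// abort() eventually triggers 'end'.
function collectRecords(stream, limit) {
  return new Promise((resolve, reject) => {
    const records = [];
    let aborted = false;
    stream.on('error', reject);
    stream.on('data', (record) => {
      // Ignore anything that arrives after the limit was reached
      if (aborted) return;
      records.push(record);
      if (records.length >= limit) {
        aborted = true;
        stream.abort();
      }
    });
    stream.on('end', () => resolve(records));
  });
}
```

With a connected client, usage might look like: const records = await collectRecords(client.query('sandbox', 'ufodata').foreach(), 20);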

Query with a metadata filter

The following example queries the same namespace and set as the previous example, and adds a metadata Filter Expression that returns only records whose stored size is greater than 16 KiB.

// Create query policy
let queryPolicy = new Aerospike.QueryPolicy({
  // Match records whose stored size is greater than 16 KiB
  filterExpression: exp.gt(exp.deviceSize(), exp.int(1024 * 16))
});
;(async () => {
  // Establish a connection to the server
  let client = await Aerospike.connect(config);
  // Create query
  let query = client.query('sandbox', 'ufodata');
  // Execute the query with the policy; foreach() returns a stream of records
  let recordStream = query.foreach(queryPolicy);
  recordStream.on('error', err => {
    // Handle error
  });
  recordStream.on('data', record => {
    // Do something
    console.info('Key: %o | Record: %o\n', record.key.key, record.bins);
  });
  recordStream.on('end', () => {
    // Close the connection to the server
    client.close();
  });
})();

Query with a data filter

The following example queries the same namespace and set as the previous example, and adds a data Filter Expression that returns only records where the occurred bin value is in the inclusive range 20210101 to 20211231.

See secondary index queries for a way to run this same query more efficiently with a secondary index.

// Create query policy
let queryPolicy = new Aerospike.QueryPolicy({
  // Bind the occurred bin to a variable, then test both bounds of the range
  filterExpression: exp.let(
    exp.def('bin', exp.binInt('occurred')),
    exp.and(
      exp.ge(exp.var('bin'), exp.int(20210101)),
      exp.le(exp.var('bin'), exp.int(20211231))
    )
  )
});
;(async () => {
  // Establish a connection to the server
  let client = await Aerospike.connect(config);
  // Create query
  let query = client.query('sandbox', 'ufodata');
  // Execute the query with the policy; foreach() returns a stream of records
  let recordStream = query.foreach(queryPolicy);
  recordStream.on('error', err => {
    // Handle error
  });
  recordStream.on('data', record => {
    // Do something
    console.info('Key: %o | Record: %o\n', record.key.key, record.bins);
  });
  recordStream.on('end', () => {
    // Close the connection to the server
    client.close();
  });
})();
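
The server evaluates the Filter Expression, but the same inclusive-range logic can be mirrored in plain JavaScript, for example to unit test application code without a database. The following sketch is illustrative only: occurredInRange and toDateInt are hypothetical names, the default bounds are the ones used in the code above, and it assumes that the occurred bin stores dates as YYYYMMDD integers and that a record without an integer occurred bin should not match.

```javascript
// Hypothetical client-side mirror of the range check in the filter expression.
// Assumes `bins.occurred` is an integer in YYYYMMDD form.
function occurredInRange(bins, lower = 20210101, upper = 20211231) {
  const occurred = bins.occurred;
  // Both bounds are inclusive, matching exp.ge and exp.le
  return Number.isInteger(occurred) && occurred >= lower && occurred <= upper;
}

// Hypothetical helper: converts a Date to a YYYYMMDD integer (UTC).
function toDateInt(date) {
  return date.getUTCFullYear() * 10000 + (date.getUTCMonth() + 1) * 100 + date.getUTCDate();
}
```

Because YYYYMMDD integers sort in calendar order, a plain numeric comparison is enough to express a date range.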

Pagination

Pagination combines a partition filter with a maximum record count to return query results one page at a time. The partition filter maintains a cursor that marks the end of the current page and the beginning of the next. To move to the next page of results, execute the query again with the previously defined partition filter.

The following example executes a query with a Filter Expression that matches records with more than 3 shape items in the report map, returning 10 records per page. The query covers all 4096 partitions of the namespace.

;(async () => {
  // Establish a connection to the server
  let client = await Aerospike.connect(config);
  // Create the expression: the shape list in the report map has more than 3 items
  const expr = Aerospike.exp.gt(
    Aerospike.exp.lists.size(
      Aerospike.exp.maps.getByKey(Aerospike.exp.binMap('report'), Aerospike.exp.str('shape'), Aerospike.exp.type.LIST, Aerospike.maps.returnType.VALUE)
    ),
    Aerospike.exp.int(3)
  )
  // Create the policy
  const queryPolicy = {'filterExpression' : expr}
  // Create the query
  let query = client.query('sandbox', 'ufodata')
  // Set max records per page
  query.maxRecords = 10
  // Enable pagination
  query.paginate = true
  try {
    do {
      // Execute the query for the current page, applying the filter policy
      let stream = query.foreach(queryPolicy)
      stream.on('data', (record) => {
        // Do something
        console.log(record.bins)
      })
      // Wait for the page to finish; reject on error so the catch block handles it
      await new Promise((resolve, reject) => {
        stream.on('error', reject)
        stream.on('end', (queryState) => {
          // Save the queryState so the next iteration resumes from it
          query.nextPage(queryState)
          resolve()
        })
      })
      // Check to see if any pages are left
    } while (query.hasNextPage())
  } catch (error) {
    console.error('An error occurred at some point.', error)
    // Set the exit code without exiting, so the finally block still runs
    process.exitCode = 1
  } finally {
    // Close the connection to the server
    if (client) client.close()
  }
})();
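
The page loop above can also be packaged as an async generator so that callers consume one page at a time with for await. This is a sketch under stated assumptions, not the client's own API: queryPages is a hypothetical name, and it relies only on the foreach(), nextPage(), and hasNextPage() calls used in the example above, with the end event delivering the saved query state.

```javascript
// Hypothetical helper: yields one array of records per page.
// Assumes `query` has paginate and maxRecords already set, and that the
// stream's 'end' event passes the state needed to fetch the next page.
async function* queryPages(query, policy) {
  do {
    const page = await new Promise((resolve, reject) => {
      const records = [];
      const stream = query.foreach(policy);
      stream.on('error', reject);
      stream.on('data', (record) => records.push(record));
      stream.on('end', (queryState) => {
        // Save the cursor so the next foreach() call resumes from it
        query.nextPage(queryState);
        resolve(records);
      });
    });
    // Skip empty pages, which can occur when the final page has no records
    if (page.length > 0) yield page;
  } while (query.hasNextPage());
}
```

A caller would then write: for await (const page of queryPages(query, queryPolicy)) { /* render the page */ }. Stopping the loop early leaves the saved state on the query, so a later call could continue from the same position.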

Code block

The following single code block executes a basic PI query with a data filter.

const Aerospike = require('aerospike');
const exp = Aerospike.exp;
// Define host configuration
let config = {hosts: '127.0.0.1:3000'};
// Create query policy
let queryPolicy = new Aerospike.QueryPolicy({
  // Bind the occurred bin to a variable, then test both bounds of the range
  filterExpression: exp.let(
    exp.def('bin', exp.binInt('occurred')),
    exp.and(
      exp.ge(exp.var('bin'), exp.int(20210101)),
      exp.le(exp.var('bin'), exp.int(20211231))
    )
  )
});
;(async () => {
  // Establish a connection to the server
  let client = await Aerospike.connect(config);
  // Create query
  let query = client.query('sandbox', 'ufodata');
  // Execute the query with the policy; foreach() returns a stream of records
  let recordStream = query.foreach(queryPolicy);
  recordStream.on('error', err => {
    // Handle error
  });
  recordStream.on('data', record => {
    // Do something
    console.info('Key: %o | Record: %o\n', record.key.key, record.bins);
  });
  recordStream.on('end', () => {
    // Close the connection to the server
    client.close();
  });
})();