Background queries
Jump to the Code block for a combined complete example.
A client application can also issue an asynchronous background query to the database and apply a series of write transaction operations to each record. This is more efficient than running a query to retrieve records and then updating them, in cases where the data does not need to be manipulated on the client side. Transactional operations are also typically more efficient than Lua UDFs because the server doesn't need to translate internal objects to another language. Many client libraries also provide an API to poll for the completion of a background query.
Refer to Background Queries for more information.
Setup
The following examples will use the setup and record structure below to illustrate background queries in an Aerospike database.
import aerospike
from aerospike_helpers import expressions as exp
from aerospike_helpers.operations import operations, expression_operations, map_operations
from aerospike import predicates as p

# Define host configuration
config = {
    'hosts': [ ('127.0.0.1', 3000) ]
}
# Establishes a connection to the server
client = aerospike.client(config).connect()
The record structure:
Occurred: Integer
Reported: Integer
Posted: Integer
Report: Map
{
    shape: List,
    summary: String,
    city: String,
    state: String,
    duration: String
}
Location: GeoJSON
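To make the structure concrete, here is a minimal sketch of one such record written as a plain Python dictionary. Every value is invented for illustration. The lowercase bin names follow the code examples in this page, the YYYYMMDD integer encoding of dates is an assumption based on the range values used later, and the GeoJSON value is shown as a plain dict rather than a client GeoJSON object.

```python
# Hypothetical sample record; all values are made up for illustration.
sample_record = {
    "occurred": 20210704,  # assumed integer-coded date, YYYYMMDD
    "reported": 20210705,
    "posted": 20210710,
    "report": {
        "shape": ["circle", "light"],
        "summary": "Bright round object hovering over the lake",
        "city": "Springfield",
        "state": "IL",
        "duration": "5 minutes",
    },
    # GeoJSON point as a plain dict; coordinates are [longitude, latitude].
    "location": {"type": "Point", "coordinates": [-89.65, 39.78]},
}

# Print each bin name with the Python type of its value.
for bin_name, value in sample_record.items():
    print(f"{bin_name}: {type(value).__name__}")
```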
Policies
Background queries can define policies to pass to the executed task.
The following example creates a policy that defines a filter expression looking for records that do not already have a numShapes bin.
# Build the expression
expr = exp.Not(exp.BinExists('numShapes')).compile()

# Create new write policy
write_policy = {'expressions': expr}
Query
Just like basic queries, background queries can be run on the primary index or, using a filter, on a secondary index.
Primary index
The following example creates a primary index background query using the Filter Expression defined in the policies example to find all records without a numShapes bin, then get the length of the shape key in the report map and write that value to a new bin called numShapes.
# Create the query
query = client.query('sandbox', 'ufodata')

# Define operations
op_expr = exp.ListSize(
    None,
    exp.MapGetByKey(None, aerospike.MAP_RETURN_VALUE, exp.ResultType.LIST, 'shape', exp.MapBin('report'))
).compile()

ops = [
    expression_operations.expression_write('numShapes', op_expr, aerospike.EXP_WRITE_DEFAULT)
]

# Add ops to query
query.add_ops(ops)
# Execute the query, passing the write policy so the filter expression is applied
id = query.execute_background(write_policy)
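The effect of this background query on a single record can be modeled locally in plain Python. The sketch below is only an illustration of the semantics described above, not how the server executes the query. The function name apply_num_shapes and the sample records are hypothetical, and records are represented as plain dictionaries of bins.

```python
import copy


def apply_num_shapes(record):
    """Local model: return an updated copy of the bins, or None if filtered out."""
    # Models the filter expression Not(BinExists('numShapes')).
    if "numShapes" in record:
        return None
    updated = copy.deepcopy(record)
    # Models ListSize over the 'shape' list inside the 'report' map,
    # written to the new 'numShapes' bin.
    updated["numShapes"] = len(updated["report"]["shape"])
    return updated


# Hypothetical records: the first lacks numShapes, the second already has it.
records = [
    {"report": {"shape": ["circle", "light"]}},
    {"report": {"shape": ["disk"]}, "numShapes": 1},
]
results = [apply_num_shapes(r) for r in records]
print(results)
```

Because the filter skips records that already have a numShapes bin, running the same background query a second time would leave previously updated records untouched.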
Secondary index
The following example uses a secondary index created on the occurred bin.
asadm -e 'enable; manage sindex create numeric occurred_idx ns sandbox set ufodata bin occurred'
The example then creates a secondary index background query that uses a Filter Expression checking for the existence of a posted bin on records that have an occurred bin value inclusively between 20210101 and 20211231, and updates the report map by adding a recent key with a value of true.
# Build the expression: match records that have a posted bin
expr = exp.BinExists('posted').compile()
# Create new write policy
write_policy = {'expressions': expr}

# Create the query
query = client.query('sandbox', 'ufodata')

# Set index filter
query.where(p.between('occurred', 20210101, 20211231))

# Define operations
ops = [
    map_operations.map_put('report', 'recent', True)
]

# Add ops to query
query.add_ops(ops)
# Execute the query, passing the write policy so the filter expression is applied
id = query.execute_background(write_policy)
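The two-stage selection described in the paragraph above (an inclusive range on the indexed occurred bin, then an existence check on the posted bin) can be modeled locally as follows. This is a plain Python sketch of the described semantics only. The helper names selected and mark_recent and the sample records are hypothetical.

```python
import copy


def selected(record, low=20210101, high=20211231):
    """True if the record passes both the index range and the existence check."""
    occurred = record.get("occurred")
    # Both bounds are inclusive, as described for the index filter.
    in_range = isinstance(occurred, int) and low <= occurred <= high
    return in_range and "posted" in record


def mark_recent(record):
    """Return a copy with report['recent'] set to True, modeling the map put."""
    updated = copy.deepcopy(record)
    updated["report"]["recent"] = True
    return updated


# Hypothetical records covering the boundary and the two ways to be skipped.
records = [
    {"occurred": 20210101, "posted": 20210110, "report": {}},  # on the lower bound
    {"occurred": 20211231, "report": {}},                      # in range, no posted bin
    {"occurred": 20220101, "posted": 20220105, "report": {}},  # outside the range
]
updated = [mark_recent(r) if selected(r) else r for r in records]
print([r["report"].get("recent", False) for r in updated])
```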
Tracking
Once a background query has been started, the client can request the job's status from the cluster nodes to check whether the task has completed.
# Return the query status
job_info = client.job_info(id, aerospike.JOB_QUERY)

def query_status(x):
    return {
        0: 'Query not found',
        1: 'Query in progress, ' + str(job_info['progress_pct']) + '% complete',
        2: 'Query complete, ' + str(job_info['records_read']) + ' records read'
    }[x]

print(query_status(job_info['status']))

# Close the connection to the server
client.close()
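The example above checks the status once. To wait for completion, the status call can be repeated in a loop. The sketch below is one hedged way to do that. The wait_for_job helper, its parameters, and the fake responses are hypothetical, and the status codes 0, 1 and 2 are taken from the mapping used above. The status source is passed in as a callable so the loop can be run without a server.

```python
import time

# Status codes as used by the status mapping in the example above.
NOT_FOUND, IN_PROGRESS, COMPLETE = 0, 1, 2


def wait_for_job(fetch_info, interval=0.5, timeout=30.0):
    """Call fetch_info() until the job is no longer in progress or timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        info = fetch_info()
        if info["status"] != IN_PROGRESS:
            # Either complete or not found; the caller decides how to handle each.
            return info
        if time.monotonic() >= deadline:
            raise TimeoutError("background query still in progress at timeout")
        time.sleep(interval)


# Stand-in for the real status call: in progress twice, then complete.
fake_responses = iter([
    {"status": IN_PROGRESS, "progress_pct": 40, "records_read": 400},
    {"status": IN_PROGRESS, "progress_pct": 90, "records_read": 900},
    {"status": COMPLETE, "progress_pct": 100, "records_read": 1000},
])
final = wait_for_job(lambda: next(fake_responses), interval=0.01)
print(final["status"], final["records_read"])
```

With a live connection, the callable would presumably wrap the same call shown above, for example lambda: client.job_info(id, aerospike.JOB_QUERY), and the wait would need to finish before client.close() is called.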
Code block
The following single code block combines the steps above to execute a background query.
import aerospike
from aerospike_helpers import expressions as exp
from aerospike_helpers.operations import operations, expression_operations, map_operations
from aerospike import predicates as p

# Define host configuration
config = {
    'hosts': [ ('127.0.0.1', 3000) ]
}
# Establishes a connection to the server
client = aerospike.client(config).connect()

# Build the expression
expr = exp.Not(exp.BinExists('numShapes')).compile()

# Create new write policy
write_policy = {'expressions': expr}

# Create the query
query = client.query('sandbox', 'ufodata')

# Define operations
op_expr = exp.ListSize(
    None,
    exp.MapGetByKey(None, aerospike.MAP_RETURN_VALUE, exp.ResultType.LIST, 'shape', exp.MapBin('report'))
).compile()

ops = [
    expression_operations.expression_write('numShapes', op_expr, aerospike.EXP_WRITE_DEFAULT)
]

# Add ops to query
query.add_ops(ops)
# Execute the query, passing the write policy so the filter expression is applied
id = query.execute_background(write_policy)
# Return the query status
job_info = client.job_info(id, aerospike.JOB_QUERY)

def query_status(x):
    return {
        0: 'Query not found',
        1: 'Query in progress, ' + str(job_info['progress_pct']) + '% complete',
        2: 'Query complete, ' + str(job_info['records_read']) + ' records read'
    }[x]

print(query_status(job_info['status']))

# Close the connection to the server
client.close()