
Aerospike provides several mechanisms for accessing large data sets over parallel streams to match worker throughput in parallel computations. This article explains the key mechanisms, and describes specific schemes for defining data splits and a framework for testing them.

Follow along in the interactive notebook Splitting Large Data Sets for Parallel Processing of Queries.

## Parallel Processing of Large Data Sets

In order to process large data sets, a common scheme is to split the data into partitions and assign a worker task to process each partition. The partitioning scheme must have the following properties:

- The partitions are collectively exhaustive, meaning they cover the entire data set, and mutually exclusive, meaning they do not overlap.
- They are deterministically and efficiently computed.
- They are accessible in an efficient and flexible manner as required by worker tasks, for example, in smaller chunks at a time.

It is also critical for the application or platform to have an efficient mechanism to coordinate with workers and aggregate results to benefit from such a partitioning scheme.

## Data Partitions in Aerospike

Aerospike organizes records in a namespace in 4096 partitions. The partitions are:

- uniformly balanced, meaning they hold approximately the same number of records that are hashed to partitions using the RIPEMD160 hash function, and
- uniformly distributed across cluster nodes, meaning each node has about the same number of partitions. To be precise, every node has the same number of partitions when the cluster size is a power of two; otherwise some nodes hold more partitions than others.

All three types of Aerospike indexes (primary, set, and secondary) are partition oriented. That is, they are split by partitions at each node (in releases 5.7+), and queries are processed at each node over individual partitions. A client can request that a query be processed over specific partitions so that multiple client workers can work in parallel. As a result, up to 4096 parallel streams, one per partition, can be set up to process the data.

Pagination is supported with Aerospike queries where the client can process a chunk of records at a time by repeatedly asking for a certain number of records until all records are retrieved.

## Splitting Data Sets Beyond 4096

Many data processing platforms allow more worker tasks than 4096. For example, Spark allows up to 32K worker tasks to run in parallel. Trino allows theoretical concurrency of greater than 4K.

Aerospike allows for more than 4096 data splits by allowing a partition to be divided into sub-partitions efficiently. The scheme is based on the `digest-modulo` function, which can divide a partition into an arbitrary number of non-overlapping and collectively complete sub-partitions. It involves adding the filter expression `digest % N == i`, for `0 <= i < N`, where the `digest` is the hashed key of the record and `i` identifies the sub-partition.
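The digest-modulo idea can be illustrated with a minimal sketch. It assumes a stand-in digest (SHA-1 from Python's `hashlib`, chosen only because it is always available; Aerospike itself uses RIPEMD160) and hypothetical helper names `digest_of` and `sub_partition`. It does not call any Aerospike API; it only shows that `digest % N == i` assigns every record to exactly one of N sub-partitions.

```python
import hashlib

def digest_of(key: str) -> int:
    """Stand-in digest (assumption): SHA-1 of the key, read as an integer."""
    return int.from_bytes(hashlib.sha1(key.encode()).digest(), "big")

def sub_partition(digest: int, n: int) -> int:
    """Sub-partition id i such that digest % n == i, with 0 <= i < n."""
    return digest % n

N = 5  # modulo factor: number of sub-partitions
keys = [f"user-{k}" for k in range(1000)]

# Group keys by the sub-partition their digest falls into.
buckets = {i: [] for i in range(N)}
for key in keys:
    buckets[sub_partition(digest_of(key), N)].append(key)

# Each key lands in exactly one bucket, so the buckets do not overlap
# and together they cover all keys.
total_assigned = sum(len(b) for b in buckets.values())
all_assigned = set().union(*buckets.values())
```

Because membership depends only on the digest, a worker responsible for sub-partition `i` can decide whether a record is its own without looking at the record's bins.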

The advantage of the digest-modulo function is that it can be evaluated without reading individual records from the storage device (such as SSDs). Digests of all records are held in the primary index, which resides in memory. Therefore, determining the membership of a digest, and equivalently of the corresponding record, in a sub-partition is fast. Each sub-partition stream needs to read only its records from the potentially slower storage device, although it needs to perform the in-memory digest-modulo evaluation, which is much faster, for all records.

This scheme works for any query as a query uses a primary, set, or secondary index, and the digest information is accessible via all indexes. The primary index directly holds digests of records, whereas the set and secondary indexes hold the primary index location of the record. A lookup provides the digest information.

## Defining and Assigning Splits

The problem can be stated as: How are the splits over a data set defined and assigned to N worker tasks, where N can vary from 1 to any arbitrarily large number? In reality, there is an upper bound on N on a given platform, either because the platform defines an absolute limit or because the overhead of processing a large number of parallel streams and coordinating across them can negate the benefits.

An Aerospike partition ID varies from 0 to 4095. If a partition is divided into `m` sub-partitions, each sub-partition is identified by a tuple of partition ID `p`, sub-partition ID `s` (where `0 <= s < m`), and modulo factor `m`: `(p, s, m)`. Each split needs an assignment of a list of partitions and/or sub-partitions. For example, a split `i` can have:

`Split i -> [pi1, pi2, pi3, …, (psi1, si1, m), (psi2, si2, m), …]`

It is important to understand what partitions or sub-partitions can be requested in a single Aerospike API call:

- Full partitions and sub-partitions cannot be mixed in a call.
- Full partitions must be consecutive in order, that is, expressible as `(pstart-id, pcount)`.
- Sub-partitions must be consecutive, belong to consecutive partitions, and use the same modulo factor, that is, expressible as `(pi, pcount, sstart-id, scount, m)`.

The goal is to achieve best efficiency with the operations available in the APIs.

We will adhere to these constraints in the following discussion.

## Split Assignment Schemes

We will examine three variations of split assignment.

If N is the requested number of splits:

- At-most N splits (can be fewer), same sized, one API call per split.
- At-least N splits (can be more), same sized, one API call per split.
- Exactly N splits, same sized, up to three API calls per split.

The first two schemes allow only discrete split counts, namely factors or multiples of 4096, so that every split receives the same amount of data (a run of full partitions or a single sub-partition). They choose the allowed count closest to N in the stated direction, and each split is processed with one API call.

The third one allows any number of splits with the same sized data assignment of partitions and/or sub-partitions. Each split however may require up to three API calls.

These schemes are described in detail below.

### At-Most N Splits

In this case, the returned splits can be fewer in number, matching the closest lower factor or multiple of 4096.

- Case 1: N < 8192: Full partition assignments
  - Returned number of splits F is the closest *factor* of 4096 that is <= N.
  - Number of partitions in each split, n: 4096/F
  - Partitions in split i: `(start = i*n, count = n)`
- Case 2: N >= 8192: Sub-partition assignment
  - Returned number of splits M is the closest *multiple* of 4096 that is <= N.
  - Number of sub-partitions or modulo-factor, m: M/4096
  - Sub-partition in split i: `(floor(i/m), i%m, m)`
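The At-Most N rules can be sketched in a few lines of Python. This is an illustrative sketch, not the notebook's implementation: the function name `at_most_n_splits` and the tuple tags `"partitions"` and `"sub_partition"` are hypothetical, and the factors of 4096 are taken to be the powers of two from 1 to 4096.

```python
TOTAL_PARTITIONS = 4096

def at_most_n_splits(n: int):
    """Return at most n equal-sized splits, one descriptor per split.

    Full-partition splits are ("partitions", start, count);
    sub-partition splits are ("sub_partition", p, s, m).
    """
    if n < 1:
        raise ValueError("n must be a positive integer")
    if n < 2 * TOTAL_PARTITIONS:
        # Case 1: largest factor of 4096 (a power of two) that is <= n.
        f = 1 << (min(n, TOTAL_PARTITIONS).bit_length() - 1)
        per_split = TOTAL_PARTITIONS // f
        return [("partitions", i * per_split, per_split) for i in range(f)]
    # Case 2: largest multiple of 4096 that is <= n.
    m = n // TOTAL_PARTITIONS          # modulo factor
    total = m * TOTAL_PARTITIONS       # returned number of splits
    return [("sub_partition", i // m, i % m, m) for i in range(total)]
```

For example, a request for 6 splits yields 4 splits of 1024 partitions each, and a request for 10,000 splits yields 8192 splits, each covering one of two sub-partitions of a partition.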

### At-Least N Splits

In this case, the returned splits can be more in number, matching the closest higher factor or multiple of 4096.

- Case 1: N <= 4096: Full partition assignments
  - Returned number of splits F is the closest *factor* of 4096 that is >= N.
  - Number of partitions in each split, n: 4096/F
  - Partitions in split i: `(start = i*n, count = n)`
- Case 2: N > 4096: Sub-partition assignment
  - Returned number of splits M is the closest *multiple* of 4096 that is >= N.
  - Number of sub-partitions or modulo-factor, m: M/4096
  - Sub-partition in split i: `(floor(i/m), i%m, m)`
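A matching sketch for At-Least N follows. As before, the function name `at_least_n_splits` and the tuple tags are hypothetical. The sketch assumes that sub-partition assignment applies to every N above 4096, since no factor of 4096 can be >= N in that range.

```python
TOTAL_PARTITIONS = 4096

def at_least_n_splits(n: int):
    """Return at least n equal-sized splits, one descriptor per split.

    Full-partition splits are ("partitions", start, count);
    sub-partition splits are ("sub_partition", p, s, m).
    """
    if n < 1:
        raise ValueError("n must be a positive integer")
    if n <= TOTAL_PARTITIONS:
        # Case 1: smallest factor of 4096 (a power of two) that is >= n.
        f = 1 << (n - 1).bit_length()
        per_split = TOTAL_PARTITIONS // f
        return [("partitions", i * per_split, per_split) for i in range(f)]
    # Case 2: smallest multiple of 4096 that is >= n (ceiling division).
    m = -(-n // TOTAL_PARTITIONS)      # modulo factor
    total = m * TOTAL_PARTITIONS       # returned number of splits
    return [("sub_partition", i // m, i % m, m) for i in range(total)]
```

A request for 6 splits yields 8 splits of 512 partitions each, and a request for 5000 splits yields 8192 single sub-partition splits with a modulo factor of 2.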

### Exactly N Splits

In this case, exactly N splits of equal size are created.

- Each of the 4096 partitions is divided into N sub-partitions, resulting in a total of 4096 * N sub-partitions.
- Each split is assigned 4096 sub-partitions in the following manner:
  - Sub-partitions are enumerated vertically from sub-partition 0 to N-1 in each partition, starting at partition 0 and ending at partition 4095.
  - After assigning 4096 consecutive sub-partitions to a split, the next split gets the following 4096 sub-partitions, and so on.

Thus, sub-partitions in a split fall in one or more of the following three groups, each of which can be retrieved using one API call:

- Up to 4095 (including none) consecutive sub-partitions in the starting partition
- Up to 4096 (including none) consecutive full partitions
- Up to 4095 (including none) consecutive sub-partitions in the ending partition

For example, if 3 splits are desired, each split can have 4096/3 or 1365 1/3 partitions. In this scheme, the first split would consist of:

- 0 sub-partitions in partition 0
- 1365 full partitions: 0-1364
- 1 (0th) of 3 sub-partitions in partition 1365

And the next (second) split will consist of:

- 2 (1-2) of 3 sub-partitions in partition 1365
- 1364 full partitions: 1366-2729
- 2 (0-1) of 3 sub-partitions in partition 2730
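The enumeration above can be sketched as follows. This is one possible reading of the scheme, not the notebook's code: the function names and the group tuples `("full", start_partition, count)` and `("sub", partition, start_sub, count, m)` are hypothetical. Sub-partition `s` of partition `p` gets the global index `p*N + s`, and split `k` takes indexes `k*4096` through `(k+1)*4096 - 1`.

```python
TOTAL_PARTITIONS = 4096

def exactly_n_splits(n: int):
    """Return n splits; each split is a list of up to three groups."""
    if n < 1:
        raise ValueError("n must be a positive integer")
    splits = []
    for k in range(n):
        start = k * TOTAL_PARTITIONS            # first global index (inclusive)
        end = start + TOTAL_PARTITIONS          # last global index (exclusive)
        p_start, s_start = divmod(start, n)
        p_end, s_end = divmod(end, n)
        groups = []
        if p_start == p_end:
            # The whole split fits inside a single partition.
            groups.append(("sub", p_start, s_start, s_end - s_start, n))
        else:
            first_full = p_start
            if s_start > 0:
                # Trailing sub-partitions of the starting partition.
                groups.append(("sub", p_start, s_start, n - s_start, n))
                first_full = p_start + 1
            if p_end > first_full:
                # Run of consecutive full partitions.
                groups.append(("full", first_full, p_end - first_full))
            if s_end > 0:
                # Leading sub-partitions of the ending partition.
                groups.append(("sub", p_end, 0, s_end, n))
        splits.append(groups)
    return splits

def split_size(groups, n: int) -> int:
    """Number of sub-partitions in a split (a full partition counts as n)."""
    return sum(g[2] * n if g[0] == "full" else g[3] for g in groups)
```

With `n = 3`, the first two entries of `exactly_n_splits(3)` correspond to the two splits listed above, and `split_size` returns 4096 for every split.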

The algorithm details with code and examples are available in the notebook tutorial.

### Alternative Ways of Splitting

Splits can be assigned in many other ways. For example, the notebook shows two additional examples of the Exactly N Splits scheme. You can also experiment with a different scheme in the notebook or your target environment.

## Parallel Query Framework

The parallel stream processing from the above split assignments can be tested with the following simple framework that is implemented in the notebook tutorial. It can be tweaked to suit the needs of the intended workload and environment.

The test data consists of 100K records (can be changed) of ~1KB size, with a secondary index defined on an integer bin.

### Processing Flow

The processing takes place as follows (tunable parameters are *italicized*):

- Split assignments are made for the requested *number of splits* and the desired *split type*.
- The desired *number of workers* (threads) are created. All workers start at the same time to process the splits. Each worker thread does the following in a loop until there are no unprocessed splits available:
  - Obtain the *next scheduled* split.
  - Create one or more query requests over the split’s partitions and sub-partitions and process them sequentially.
    - Assign the *secondary-index query predicate* depending on the requested query type.
    - Create the requested *filter expression*. Append it (with AND) to the sub-partition filter expression if one is being used, otherwise use it separately.
    - Process the query with the filter in the requested *mode* (sync or async).
      - Get *chunk-size* records at a time until all records are retrieved.
      - Process the records using the *stream processing implementation*. The notebook example has CountAndSum processing that:
        - Aggregates the number of records in a count by the worker.
        - Aggregates an integer bin value in a sum by the worker.
        - Aggregates count and sum across all workers at the end.
- Wait for all workers to finish, and output the aggregated results from stream processing.

In the CountAndSum example, the total number of processed records and the sum of the integer bin across all records must be the same for a given query predicate and filter irrespective of the number of splits, split type, number of workers, and processing mode.
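The flow and the invariance property can be tried out with a small self-contained simulation. The sketch below does not use the Aerospike client: the records live in an in-memory dictionary, the partition assignment is an arbitrary stand-in hash, and the names `query_split`, `chunks`, and `run` are hypothetical. It uses full-partition splits only and assumes the number of splits is a factor of 4096.

```python
import queue
import threading
from itertools import islice

TOTAL_PARTITIONS = 4096

# Simulated data set: 10,000 records with an integer bin "bin1",
# placed in partitions by a stand-in hash (assumption, not RIPEMD160).
BY_PARTITION = {}
for value in range(1, 10_001):
    pid = (value * 2654435761) % TOTAL_PARTITIONS
    BY_PARTITION.setdefault(pid, []).append({"bin1": value})

def query_split(start, count, predicate):
    """Simulated query over partitions start .. start+count-1."""
    for pid in range(start, start + count):
        for rec in BY_PARTITION.get(pid, ()):
            if predicate(rec):
                yield rec

def chunks(records, chunk_size):
    """Yield lists of up to chunk_size records until the stream is exhausted."""
    it = iter(records)
    while chunk := list(islice(it, chunk_size)):
        yield chunk

def run(num_splits, num_workers, chunk_size=100):
    """Process all splits with num_workers threads; return (count, sum)."""
    per_split = TOTAL_PARTITIONS // num_splits
    work = queue.Queue()
    for i in range(num_splits):
        work.put((i * per_split, per_split))
    is_even = lambda rec: rec["bin1"] % 2 == 0   # filter: even values only
    totals = {"count": 0, "sum": 0}
    lock = threading.Lock()

    def worker():
        count = total = 0                        # per-worker aggregates
        while True:
            try:
                start, n = work.get_nowait()     # next scheduled split
            except queue.Empty:
                break
            for chunk in chunks(query_split(start, n, is_even), chunk_size):
                count += len(chunk)
                total += sum(rec["bin1"] for rec in chunk)
        with lock:                               # aggregate across workers
            totals["count"] += count
            totals["sum"] += total

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return totals["count"], totals["sum"]
```

Calling `run(1, 1)`, `run(64, 8)`, and `run(4096, 16)` should all return the same pair, which is the consistency check described above.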

A summary of split assignments and worker statistics can be optionally printed.

### Parameters and Variations

- **Number of splits**: Any number of splits over the data set may be requested. Example range 1-10K.
- **Split type**: One of the three variations discussed above can be requested: At-Most N, At-Least N, and Exactly N.
- **Number of workers**: The desired parallelism in processing, example values range between 1-10K.
- **Query index type**: Either primary- or secondary-index query can be specified.
- **Secondary-index predicate**: In case of a secondary-index query, a secondary-index predicate is specified. The default secondary-index predicate is `50001 <= bin1 <= 100000`.
- **Filter expression**: An optional filter expression can also be specified. The default filter expression is `bin1 % 2 == 0`, that is, only even-valued records will be retrieved.
- **Chunk size**: Or page size for iterative retrieval of records in a split.
- **Processing mode**: Either sync or async processing mode to process the query results may be selected.
- **Stream processing**: How records are aggregated or otherwise processed; can be customized by overriding the abstract class StreamProcessing.
- **Work scheduling**: How splits are assigned to workers; can be customized by overriding the abstract class WorkScheduling.
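To give a feel for the two extension points, here is a rough Python sketch of what such abstract classes might look like. Only the class names StreamProcessing, WorkScheduling, and CountAndSum come from the article; the method names (`process_chunk`, `merge`, `result`, `next_split`) and the `InOrderScheduling` class are assumptions made for illustration and may differ from the notebook's actual interfaces.

```python
import threading
from abc import ABC, abstractmethod
from collections import deque

class StreamProcessing(ABC):
    """How a worker processes records and how worker results are combined."""

    @abstractmethod
    def process_chunk(self, records): ...

    @abstractmethod
    def merge(self, other): ...

    @abstractmethod
    def result(self): ...

class CountAndSum(StreamProcessing):
    """Counts records and sums an integer bin."""

    def __init__(self, bin_name="bin1"):
        self.bin_name = bin_name
        self.count = 0
        self.total = 0

    def process_chunk(self, records):
        self.count += len(records)
        self.total += sum(rec[self.bin_name] for rec in records)

    def merge(self, other):
        # Fold another worker's aggregates into this one.
        self.count += other.count
        self.total += other.total

    def result(self):
        return self.count, self.total

class WorkScheduling(ABC):
    """How splits are handed out to workers."""

    @abstractmethod
    def next_split(self): ...

class InOrderScheduling(WorkScheduling):
    """Hands out splits in their original order; returns None when done."""

    def __init__(self, splits):
        self._splits = deque(splits)
        self._lock = threading.Lock()

    def next_split(self):
        with self._lock:
            return self._splits.popleft() if self._splits else None
```

A different scheduler, for instance one that hands out the largest splits first, would only need to override `next_split`.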

The notebook illustrates many interesting variations, and you can play with additional ones.

## Use Cases for Fine-Grained Parallelism

Processing speed can benefit from a very high degree of parallelism for a very large data set processed with transforms, aggregations, and updates.

Multiple data sets that need to be joined, and require shuffling subsets across a large number of worker nodes, may not benefit from a very high degree of parallelism. In such cases, the cost of transfer of data in subsequent steps across a large number of worker nodes can limit the benefit of fine-grained retrieval streams. A Cost Based Optimizer (CBO) on the processing platform should be able to determine the best level of parallelism for data access from Aerospike for a given query.

It would be useful to examine simple heuristics for choosing the level of parallelism in complex computations over a data set. In a future post, we will explore the optimal level of parallelism given the potentially conflicting goals of throughput, response time, resource cost, and utilization.