# Bulk loading for distributed mode

## Overview

The guidelines on this page are a framework for efficiently bulk-loading large graph datasets into an Aerospike database using the distributed mode of the Aerospike Graph Service (AGS) bulk loader.

Loading large volumes of graph data is a computationally demanding task. The best way to optimize it is by harnessing the computational capabilities of cloud services such as Amazon Web Services (AWS) or Google Cloud Platform (GCP) in tandem with the parallel-processing tool [Apache Spark](https://spark.apache.org/).

## Prerequisites

-   An Aerospike Database instance running on your cloud service.
    
    -   See [Deploying on GCP](https://aerospike.com/docs/database/install/gcp/install/) for help with GCP.
    -   See [Deploying on AWS](https://aerospike.com/docs/database/install/aws/install/) for help with AWS.
-   A cloud bucket with write access for data processing.
    
-   Source data files stored in cloud buckets for edges and vertices in the [Gremlin CSV format](https://aerospike.com/docs/graph/2.5.0/develop/data-loading/csv-format).
    
-   The bulk loader [JAR file](https://aerospike.com/download/?software=graph-loader), stored in a cloud bucket.
    
-   An AGS properties file, stored in a cloud bucket.
    

### The properties file

The AGS properties file contains configuration information for AGS, such as the network address of the Aerospike Database. To use the bulk loader, make a copy of your [`config.properties`](https://aerospike.com/docs/graph/reference/config) file, append the relevant bulk loader configuration properties, and save it in the same cloud bucket as your source data files and bulk loader JAR file.

The following table lists some required and optional bulk loader configuration options.

| Configuration key | Required? | Default | Description |
| --- | --- | --- | --- |
| `aerospike.graphloader.edges` | yes | none | URI path to edge CSVs. On GCP: `gs://PATH_TO_BUCKET/edges/` On AWS: `s3://PATH_TO_BUCKET/edges/` |
| `aerospike.graphloader.vertices` | yes | none | URI path to vertex CSVs. On GCP: `gs://PATH_TO_BUCKET/vertices/` On AWS: `s3://PATH_TO_BUCKET/vertices/` |
| `aerospike.graphloader.sampling-percentage` | no | 0 | Percentage of loaded elements to read back after writing to verify successful bulk loading when the `-verify_output_data` flag is enabled. |
| `aerospike.graphloader.temp-directory` | yes | none | URI path to an empty directory with read/write access that can be used temporarily to store transformed intermediate data for bulk loading. Users must verify that the specified location is empty. This directory is not removed by the application after job completion. The temp directory must be on the same platform as the edges and vertices files. On GCP: `gs://PATH_TO_BUCKET/temp/` On AWS: `s3://PATH_TO_BUCKET/temp/` |

The `aerospike.graphloader.edges` and `aerospike.graphloader.vertices` options specify the location of your edge and vertex source data files. Verify that the bulk loader process has access to the cloud bucket containing your source files.
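
As a minimal sketch of the copy-and-append step, the following script copies an existing properties file and appends the bulk loader keys from the table above. The file names, the `s3://PATH_TO_BUCKET` placeholder, the stand-in base file, and the sampling value of `10` are assumptions for illustration only; substitute your own values.

```bash
#!/bin/bash

# Work in a scratch directory so the sketch does not touch real files.
workdir=$(mktemp -d)
base_config="$workdir/config.properties"        # stand-in for your AGS properties file
loader_config="$workdir/bulk-loader.properties" # hypothetical output file name
bucket="s3://PATH_TO_BUCKET"                    # placeholder, as in the table above

# Stand-in content; in practice this is your existing AGS configuration.
echo "# existing AGS settings go here" > "$base_config"

# Copy the base file, then append the bulk loader properties.
cp "$base_config" "$loader_config"
cat >> "$loader_config" <<EOF
aerospike.graphloader.vertices=${bucket}/vertices/
aerospike.graphloader.edges=${bucket}/edges/
aerospike.graphloader.temp-directory=${bucket}/temp/
aerospike.graphloader.sampling-percentage=10
EOF

cat "$loader_config"
```

After reviewing the result, upload the file to the bucket that holds your source data and JAR file.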

For a full description of the properties file and a list of configuration options, see [configuration options](https://aerospike.com/docs/graph/reference/config#aerospikegraphloader-options).
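
Before starting a job, it can help to confirm that the paths named in the properties file look the way the loader expects: source directories populated and the temp directory empty. The sketch below does this for S3 with `aws s3 ls`. The function names are hypothetical, and the check runs with the credentials of your current CLI profile, which may differ from the role the Spark job runs under. Because `aws s3 ls` matches by prefix and prints nothing on an access error, treat the result as a coarse sanity check rather than proof.

```bash
#!/bin/bash

# True if `aws s3 ls` prints at least one entry for the URI.
# An access error also prints nothing, so "no entries" is not proof of absence.
s3_has_entries() {
    [ -n "$(aws s3 ls "$1" 2>/dev/null)" ]
}

# Check that each given URI lists at least one entry.
# Returns non-zero if any URI lists nothing.
check_present() {
    local status=0 uri
    for uri in "$@"; do
        if s3_has_entries "$uri"; then
            echo "ok       $uri"
        else
            echo "MISSING  $uri"
            status=1
        fi
    done
    return "$status"
}

# Check that the temp directory lists no entries.
check_empty() {
    if s3_has_entries "$1"; then
        echo "NOT EMPTY  $1"
        return 1
    fi
    echo "ok         $1"
}

# Example calls with the placeholder paths from the table above:
# check_present "s3://PATH_TO_BUCKET/vertices/" "s3://PATH_TO_BUCKET/edges/"
# check_empty "s3://PATH_TO_BUCKET/temp/"
```

The same structure should carry over to GCS by swapping the listing command, for example `gcloud storage ls`.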

### Specifying the location of the properties file

Use the `aerospike.graphloader.config` option in your Spark command to specify the location of your AGS properties file.

::: note
The `aerospike.graphloader.config` option may be abbreviated as `-c`.
:::

### Cloud-specific options

The bulk loader supports cloud-specific authentication options for cloud services that require credentials to run a Spark job.

::: note
The cloud-specific configuration options must be included as part of the bulk loader execution command. They cannot be part of the AGS properties file.
:::

| Name | Description |
| --- | --- |
| `aerospike.graphloader.remote-user` | On AWS: your `AWS_ACCESS_KEY_ID` value. On GCP: your key file `private_key_id` value. |
| `aerospike.graphloader.remote-passkey` | On AWS: your `AWS_SECRET_ACCESS_KEY` value. On GCP: your key file `private_key` value. |
| `aerospike.graphloader.gcs-email` | On GCP: your key file `client_email` value. |

When using Google Cloud Storage (GCS) for source data files, you must configure a GCS Service Account. Your credential information can be found in the JSON key file generated for the GCS Service Account.

## Cloud CLI procedures

### Loading with Amazon Web Services (AWS) CLI

-   Install the [AWS CLI tool](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) if you don’t already have it.
-   Your default CLI profile must have sufficient permissions to create Amazon EC2 resources.

**Launch an Elastic MapReduce cluster**

The following shell script sets all the environment variables and file paths needed to launch an Elastic MapReduce (EMR) cluster and run an AGS bulk data loading job.

When you edit the script, verify that all configuration elements are correct for your environment.

-   The S3 bucket must already exist.
-   The bulk loader JAR file and properties file must be present in the specified locations.
-   Verify that the source data files are correctly specified in the properties file and accessible by your default CLI profile.

Save the edited script in a file such as `launch-cluster.sh` and run it at the command line.

```bash
#!/bin/bash

CLUSTER_NAME="Aerospike AWS Graph Cluster"
EMR_RELEASE="emr-6.15.0"

# Application logs are saved here. Edit this and all other S3 bucket names
# to match your S3 setup.
LOG_URI="s3://my_bucket/logs/"

SPARK_JOB_NAME="Aerospike Graph AWS Spark Job"
SPARK_CLASS="com.aerospike.firefly.bulkloader.SparkBulkLoaderMain"

# Update SPARK_JAR with your S3 bucket and the current bulk loader .jar file
SPARK_JAR="s3://my_bucket/jars/aerospike-graph-bulk-loader-x.y.z.jar"

# Add more Spark arguments as needed. See the main bulk loading
# documentation page for a list of available Spark flags.
SPARK_ARGS="-c,s3://my_bucket/configs/bulk-loader.properties"

AWS_REGION="us-west-1"

# Use the same subnet ID and AWS region for the EMR cluster and
# the Aerospike Database cluster for optimal performance.
#
# Note: if you create your Aerospike Database cluster with Aerolab,
# the AWS network and security group information is shown at the time of creation.
SUBNET_ID="subnet-##############" # Edit with your subnet ID
SECURITY_GROUP="sg-##############" # Edit with your security group ID

# Use Java 11.
CONFIGURATIONS='[{"Classification":"hadoop-env","Configurations":[{"Classification":"export","Configurations":[],"Properties":{"JAVA_HOME":"/usr/lib/jvm/java-11-amazon-corretto.x86_64"}}],"Properties":{}},{"Classification":"spark-env","Configurations":[{"Classification":"export","Configurations":[],"Properties":{"JAVA_HOME":"/usr/lib/jvm/java-11-amazon-corretto.x86_64"}}],"Properties":{}},{"Classification":"spark-defaults","Properties":{"spark.executorEnv.JAVA_HOME":"/usr/lib/jvm/java-11-amazon-corretto.x86_64"}}]'

# Create the EMR cluster.
echo "Creating EMR cluster..."
CLUSTER_ID=$(aws emr create-cluster \
    --name "$CLUSTER_NAME" \
    --release-label "$EMR_RELEASE" \
    --applications Name=Spark \
    --log-uri "$LOG_URI" \
    --use-default-roles \
    --instance-type m5.xlarge \
    --instance-count 4 \
    --ec2-attributes SubnetId="$SUBNET_ID",EmrManagedSlaveSecurityGroup="$SECURITY_GROUP",EmrManagedMasterSecurityGroup="$SECURITY_GROUP" \
    --configurations "$CONFIGURATIONS" \
    --query 'ClusterId' \
    --region "$AWS_REGION" \
    --output text)

# The cluster ID may be needed later for operational purposes, such as
# deactivating the cluster.
echo "Cluster ID: $CLUSTER_ID"

# Start the Spark job.
echo "Adding Spark job step..."
STEP_ID=$(aws emr add-steps --cluster-id "$CLUSTER_ID" \
    --steps Type=Spark,Name="$SPARK_JOB_NAME",ActionOnFailure=CONTINUE,Args=[--class,"$SPARK_CLASS","$SPARK_JAR",$SPARK_ARGS] \
    --query 'StepIds[0]' \
    --output text \
    --region "$AWS_REGION")

echo "Step ID: $STEP_ID"
```
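
The launch script prints a cluster ID and a step ID. As a rough sketch of how those IDs might be used afterward, the functions below poll the step state with `aws emr describe-step` until it reaches a terminal state. The function names and the 30-second default interval are assumptions for illustration, not part of the bulk loader.

```bash
#!/bin/bash

# Print the current state of an EMR step, for example PENDING, RUNNING,
# COMPLETED, FAILED, or CANCELLED.
step_state() {
    aws emr describe-step \
        --cluster-id "$1" \
        --step-id "$2" \
        --region "$3" \
        --query 'Step.Status.State' \
        --output text
}

# Poll until the step reaches a terminal state.
# Returns 0 on COMPLETED, 1 on failure states, 2 if the state cannot be read.
wait_for_step() {
    local cluster_id="$1" step_id="$2" region="$3" interval="${4:-30}"
    local state
    while true; do
        state=$(step_state "$cluster_id" "$step_id" "$region")
        echo "Step $step_id: ${state:-unknown}"
        case "$state" in
            COMPLETED) return 0 ;;
            FAILED|CANCELLED|INTERRUPTED) return 1 ;;
            "") return 2 ;;
        esac
        sleep "$interval"
    done
}

# Example usage with the variables from the launch script:
# wait_for_step "$CLUSTER_ID" "$STEP_ID" "$AWS_REGION"
#
# When the cluster is no longer needed, it can be shut down with:
# aws emr terminate-clusters --cluster-ids "$CLUSTER_ID" --region "$AWS_REGION"
```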

#### Logging

Log files for the bulk loading job are saved to the location specified in the `LOG_URI` variable.

### Loading with the `gcloud` command-line interface (CLI)

1.  Install the `gcloud` CLI if you don’t already have it.
    
2.  Use the `gcloud init` command to authorize commands on your GCP account.
    

The following shell script sets all the environment variables and file paths needed to launch a Spark job on GCP and run an AGS bulk data loading job.

Edit the script carefully to ensure that all the configuration elements are correct for your environment. The GCP bucket must already exist, and the bulk loader JAR file and properties file must be present in the specified locations. Ensure that the source data files are correctly specified in the properties file and accessible by your default CLI profile.

Save the edited script in a file such as `launch-cluster.sh` and run it at the command line when ready.

```bash
#!/bin/bash

# Edit all these variables to match your GCP environment.
# Ensure that the bulk loader .jar file is correctly named and
# accessible by your CLI profile.
dataproc_name="testcluster"
region=us-central1
zone=us-central1-a
instance_type=n2d-highmem-8
num_workers=8
project=my-project
bulk_jar_uri="gs://my_bucket/aerospike-graph-bulk-loader-x.y.z.jar"
properties_file_uri="gs://my_bucket/bulk-loader.properties"

# Create the Dataproc cluster.
gcloud dataproc clusters create "$dataproc_name" \
    --enable-component-gateway \
    --region "$region" \
    --zone "$zone" \
    --master-machine-type "$instance_type" \
    --master-boot-disk-type pd-ssd \
    --master-boot-disk-size 500 \
    --num-workers "$num_workers" \
    --worker-machine-type "$instance_type" \
    --worker-boot-disk-type pd-ssd \
    --worker-boot-disk-size 500 \
    --image-version 2.1-debian11 \
    --properties spark:spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \
    --project "$project"

# Submit the bulk loader Spark job to the cluster.
gcloud dataproc jobs submit spark \
    --class=com.aerospike.firefly.bulkloader.SparkBulkLoader \
    --jars="$bulk_jar_uri" \
    --cluster="$dataproc_name" \
    --region="$region" \
    -- -c "$properties_file_uri"
```

## Bulk data loading job stages and steps

A bulk data loading job consists of three stages:

-   Prep
-   Partially written
-   Complete

Within these three stages are several steps:

| Stage | Step | Description |
| --- | --- | --- |
| Prep | Resuming job | First step when resuming a previously started job. |
| Prep | Read vertex/edge files | Check directory structure. |
| Prep | Preflight check | Verify that the CSV files are properly formatted and parsable. |
| Prep | Temporary data writing | Intermediate transformative step to generate data for efficient writing of graph elements to the database. |
| Partially written | Supernode extraction | Detect [supernodes](https://aerospike.com/docs/graph/2.5.0/develop/query/supernodes) in the dataset to properly write them. |
| Partially written | Vertex writing | Write vertices to the database. |
| Partially written | Vertex validation | Validate accuracy of written vertices using graph traversal queries. |
| Partially written | Edge writing | Write edges to the database. |
| Partially written | Edge validation | Validate accuracy of written edges using graph traversal queries. |
| Complete | N/A | The job has completed successfully: vertices and edges are loaded, and the resume job information is removed. |

If a loading job fails at the _partially written_ stage due to an error, you can [restart](https://aerospike.com/docs/graph/2.5.0/develop/data-loading/restart) the job at the point at which it failed.

## Incremental data loading

In addition to creating new graphs with the bulk loader, you can also add data to an existing graph. Incremental data loading includes the following use cases:

-   Add vertices to an existing graph.
-   Add vertices and edges with no connections to the existing dataset.
-   Add new vertices and edges with connections to vertices in the existing dataset. Vertex IDs in the existing dataset are provided in the incremental data load CSV files.
-   Add edges to new and existing vertices. Edge IDs in the existing dataset are provided in the incremental data load CSV files.
-   Update properties of existing vertices.

To load data incrementally, add the `-incremental_load` flag to the `submit spark` command for your cloud service.

### Vertex insertion

To perform vertex insertions on an incremental dataset, the bulk loader uses the [`mergeV`](https://tinkerpop.apache.org/docs/current/reference/#mergevertex-step) TinkerPop step.

-   The bulk loader merges vertices based on the `~id` field.
    
-   If a vertex with the same `~id` value is specified multiple times in the incremental dataset, the final vertex contains all the data of the combined rows. If the incremental dataset contains rows with duplicate `~id` fields which have different values for the same properties, the final assigned property value is non-deterministic and may be any of the assigned values.
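
Because conflicting rows with the same `~id` produce a non-deterministic result, it may be worth scanning an incremental vertex file for repeated IDs before loading. The following sketch builds a small sample file and lists each `~id` value that appears on more than one row. The sample data, the property columns `name:String` and `age:Int`, and the function name are made up for illustration, and the `awk` parsing assumes no quoted fields containing commas.

```bash
#!/bin/bash

# Hypothetical sample file; v1 appears twice with different age values.
sample_csv=$(mktemp)
cat > "$sample_csv" <<'EOF'
~id,~label,name:String,age:Int
v1,person,Alice,30
v2,person,Bob,41
v1,person,Alice,31
EOF

# Print every ~id value that appears on more than one data row.
# The ~id column is located from the header row.
find_duplicate_ids() {
    awk -F',' '
        NR == 1 { for (i = 1; i <= NF; i++) if ($i == "~id") col = i; next }
        col && seen[$col]++ == 1 { print $col }
    ' "$1"
}

duplicates=$(find_duplicate_ids "$sample_csv")
echo "Duplicate ~id values: ${duplicates:-none}"
```

Rows flagged this way are not necessarily wrong, since the loader merges them, but any that disagree on a property value are worth resolving in the source data first.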
    

### Edge insertion

Edge insertions on an incremental dataset behave the same as a fresh data load. No merging occurs, and all entries in the edge dataset create a valid edge between the specified vertices.

## Spark job flags

The following flags are all optional.

| Argument | Description |
| --- | --- |
| `-incremental_load` | Add new data to an existing graph. |
| `-validate_input_data` | Perform format and data validation of all vertex and edge CSV files before writing to the Aerospike database. |
| `-verify_output_data` | Perform verification of a percentage of loaded elements, specified by `aerospike.graphloader.sampling-percentage`, by reading them back after loading. The verification process uses a traversal query. |
| `-resume` | [Resume](https://aerospike.com/docs/graph/2.5.0/develop/data-loading/restart) a previously failed job. |
| `-clear_existing_data` | Delete all existing data before beginning the new job. |
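
The sketch below shows one way these flags might be combined with the `-c` option for each cloud CLI. It assumes, following the comment in the AWS launch script, that extra flags are appended to the comma-separated `SPARK_ARGS` value, and that on GCP they follow the `--` separator. The `emr_args` helper is hypothetical.

```bash
#!/bin/bash

# Join all arguments with commas, as needed for the Args=[...] list
# passed to `aws emr add-steps`.
emr_args() {
    local IFS=','
    printf '%s\n' "$*"
}

config_uri="s3://my_bucket/configs/bulk-loader.properties"
flags=(-validate_input_data -verify_output_data)

# AWS: a single comma-separated string.
SPARK_ARGS=$(emr_args -c "$config_uri" "${flags[@]}")
echo "$SPARK_ARGS"

# GCP: pass the same flags as separate words after the `--` separator, e.g.
# gcloud dataproc jobs submit spark ... -- -c "$properties_file_uri" "${flags[@]}"
```

Keeping the flags in an array makes it easy to switch between runs, for example replacing the contents with `-resume` after a failed job or `-incremental_load` when adding to an existing graph.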