Version: Graph 2.3.0

Bulk loading for distributed mode

Overview

The guidelines on this page are a framework for efficiently bulk-loading large graph datasets into an Aerospike database using the distributed mode of the Aerospike Graph Service (AGS) bulk loader.

Loading large volumes of graph data is a computationally demanding task. The best way to optimize it is by harnessing the computational capabilities of cloud services such as Amazon Web Services (AWS) or Google Cloud Platform (GCP) in tandem with the parallel-processing tool Apache Spark.

Prerequisites

  • An Aerospike Database instance running on your cloud service.

  • A cloud bucket with write access for data processing.

  • Source data files for edges and vertices in the Gremlin CSV format, stored in cloud buckets (see the sketch after this list).

  • The bulk loader JAR file, stored in a cloud bucket.

  • An AGS properties file, stored in a cloud bucket.
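
The following sketch shows the general shape of the source files, following the Gremlin CSV conventions used by the bulk loader. The column sets, file names, and property values here are illustrative; see the data format documentation for the authoritative headers.

    vertices/people.csv (hypothetical):
      ~id,~label,name,age
      p1,person,Alice,29
      p2,person,Bob,34

    edges/knows.csv (hypothetical):
      ~from,~to,~label,since
      p1,p2,knows,2015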

The properties file

The AGS properties file contains configuration information for AGS, such as the network address of the Aerospike Database. To use the bulk loader, make a copy of your config.properties file, append the relevant bulk loader configuration properties, and save it in the same cloud bucket with your source data files and bulk loader JAR file.

The following table lists some required and optional bulk loader configuration options.

| Configuration key | Required? | Default | Description |
| --- | --- | --- | --- |
| aerospike.graphloader.edges | yes | none | URI path to edge CSVs. On GCP: gs://PATH_TO_BUCKET/edges/ On AWS: s3://PATH_TO_BUCKET/edges/ |
| aerospike.graphloader.vertices | yes | none | URI path to vertex CSVs. On GCP: gs://PATH_TO_BUCKET/vertices/ On AWS: s3://PATH_TO_BUCKET/vertices/ |
| aerospike.graphloader.sampling-percentage | no | 0 | Percentage of loaded elements to read back after writing to verify successful bulk loading when the -validate_output_data flag is enabled. |
| aerospike.graphloader.temp-directory | yes | none | URI path to an empty directory with read/write access, used to store transformed intermediate data during bulk loading. Verify that the specified location is empty; the directory is not removed by the application after the job completes. The temp directory must be on the same platform as the edges and vertices files. On GCP: gs://PATH_TO_BUCKET/temp/ On AWS: s3://PATH_TO_BUCKET/temp/ |

The aerospike.graphloader.edges and aerospike.graphloader.vertices options specify the location of your edge and vertex source data files. Verify that the bulk loader process has access to the cloud bucket containing your source files.

For a full description of the properties file and a list of configuration options, see configuration options.
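
As a sketch, the appended properties file might look like the following. The connection keys and values shown here are placeholders; keep whatever settings your existing config.properties already contains, and substitute your own bucket paths (gs:// on GCP, s3:// on AWS).

    # Existing AGS connection settings copied from config.properties
    # (illustrative values; use your own).
    aerospike.client.host=10.0.0.10
    aerospike.client.namespace=test

    # Bulk loader settings appended for this job.
    aerospike.graphloader.vertices=gs://my_bucket/vertices/
    aerospike.graphloader.edges=gs://my_bucket/edges/
    aerospike.graphloader.temp-directory=gs://my_bucket/temp/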

Specifying the location of the properties file

Use the aerospike.graphloader.config option in your Spark command to specify the location of your AGS properties file.

note

The aerospike.graphloader.config option may be abbreviated as -c.
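
For example, on a Spark cluster where you submit the job directly with spark-submit, the invocation might look like the following sketch. The JAR name and bucket path are placeholders; on EMR, the same arguments are passed through the add-steps command shown later on this page.

    spark-submit \
      --class com.aerospike.firefly.bulkloader.SparkBulkLoaderMain \
      aerospike-graph-bulk-loader-x.y.z.jar \
      -c gs://my_bucket/configs/bulk-loader.properties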

Cloud-specific options

The bulk loader supports cloud-specific authentication options for cloud services that require credentials to run a Spark job.

info

The cloud-specific configuration options must be included as part of the bulk loader execution command. They cannot be part of the AGS properties file.

| Name | Description |
| --- | --- |
| aerospike.graphloader.remote-user | On AWS: your AWS_ACCESS_KEY_ID value. On GCP: your key file private_key_id value. |
| aerospike.graphloader.remote-passkey | On AWS: your AWS_SECRET_ACCESS_KEY value. On GCP: your key file private_key value. |
| aerospike.graphloader.gcs-email | On GCP: your key file client_email value. |

When using Google Cloud Storage (GCS) for source data files, you must configure a GCS Service Account. Your credential values can be found in the generated JSON key file for that Service Account.
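
As a convenience, the three credential values can be read out of the key file with a small shell snippet such as the following (key.json is a placeholder path; requires the jq utility):

    # Extract the credential fields referenced in the table above.
    jq -r '.private_key_id' key.json   # value for aerospike.graphloader.remote-user
    jq -r '.private_key'    key.json   # value for aerospike.graphloader.remote-passkey
    jq -r '.client_email'   key.json   # value for aerospike.graphloader.gcs-email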

Cloud CLI procedures

Select a cloud provider from the following tabs.

Loading with Amazon Web Services (AWS) CLI

  • Install the AWS CLI tool if you don't already have it.

  • Your default CLI profile must have sufficient permissions to create Amazon EC2 resources.

Launch an Amazon EMR cluster

The following shell script sets all the environment variables and file paths needed to launch an Amazon EMR (Elastic MapReduce) cluster and run an AGS bulk data loading job.

When you edit the script, verify that every configuration element matches your environment:

  • The S3 bucket must already exist.

  • The bulk loader JAR file and properties file must be present in the specified locations.

  • The source data files must be correctly specified in the properties file and accessible by your default CLI profile.

Save the edited script in a file such as launch-cluster.sh and run it at the command line.

    #!/bin/bash

    CLUSTER_NAME="Aerospike AWS Graph Cluster"
    EMR_RELEASE="emr-6.15.0"

    # Application logs are saved here. Edit this and all other S3 bucket names
    # to match your S3 setup.
    LOG_URI="s3://my_bucket/logs/"

    SPARK_JOB_NAME="Aerospike Graph AWS Spark Job"
    SPARK_CLASS="com.aerospike.firefly.bulkloader.SparkBulkLoaderMain"

    # Update SPARK_JAR with your S3 bucket and the current bulk loader .jar file
    SPARK_JAR="s3://my_bucket/jars/aerospike-graph-bulk-loader-x.y.z.jar"

    # Add more Spark arguments as needed. See the main bulk loading
    # documentation page for a list of available Spark flags.
    SPARK_ARGS="-c,s3://my_bucket/configs/bulk-loader.properties"

    AWS_REGION="us-west-1"

    # Use the same subnet ID and AWS region for the EMR cluster and
    # the Aerospike Database cluster for optimal performance.
    #
    # Note: if you create your Aerospike Database cluster with Aerolab,
    # the AWS network and security group information is shown at the time of creation.
    SUBNET_ID="subnet-##############" # Edit with your subnet ID
    SECURITY_GROUP="sg-##############" # Edit with your security group ID

    # Use Java 11.
    CONFIGURATIONS='[{"Classification":"hadoop-env","Configurations":[{"Classification":"export","Configurations":[],"Properties":{"JAVA_HOME":"/usr/lib/jvm/java-11-amazon-corretto.x86_64"}}],"Properties":{}},{"Classification":"spark-env","Configurations":[{"Classification":"export","Configurations":[],"Properties":{"JAVA_HOME":"/usr/lib/jvm/java-11-amazon-corretto.x86_64"}}],"Properties":{}},{"Classification":"spark-defaults","Properties":{"spark.executorEnv.JAVA_HOME":"/usr/lib/jvm/java-11-amazon-corretto.x86_64"}}]'

    # Create the EMR cluster.
    echo "Creating EMR cluster..."
    CLUSTER_ID=$(aws emr create-cluster \
    --name "$CLUSTER_NAME" \
    --release-label "$EMR_RELEASE" \
    --applications Name=Spark \
    --log-uri "$LOG_URI" \
    --use-default-roles \
    --instance-type m5.xlarge \
    --instance-count 4 \
    --ec2-attributes SubnetId="$SUBNET_ID",EmrManagedSlaveSecurityGroup="$SECURITY_GROUP",EmrManagedMasterSecurityGroup="$SECURITY_GROUP" \
    --configurations "$CONFIGURATIONS" \
    --query 'ClusterId' \
    --region "$AWS_REGION" \
    --output text)

    # The cluster ID may be needed later for operational purposes, such as
    # deactivating the cluster.
    echo "Cluster ID: $CLUSTER_ID"

    # Start the Spark job.
    echo "Adding Spark job step..."
    STEP_ID=$(aws emr add-steps --cluster-id "$CLUSTER_ID" \
    --steps Type=Spark,Name="$SPARK_JOB_NAME",ActionOnFailure=CONTINUE,Args=[--class,"$SPARK_CLASS","$SPARK_JAR",$SPARK_ARGS] \
    --query 'StepIds[0]' \
    --output text \
    --region "$AWS_REGION")
    echo "Step ID: $STEP_ID"

Logging

Log files for the bulk loading job are saved to the location specified in the LOG_URI variable.
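
Because the launch script does not request auto-termination, the EMR cluster keeps running after the step finishes. One possible way to check the step status, list the logs, and shut the cluster down, reusing the variables from the launch script (the bucket name is a placeholder):

    # Poll the status of the bulk loading step (for example PENDING, RUNNING, COMPLETED, or FAILED).
    aws emr describe-step --cluster-id "$CLUSTER_ID" --step-id "$STEP_ID" \
      --region "$AWS_REGION" --query 'Step.Status.State' --output text

    # List the log files written to the LOG_URI location.
    aws s3 ls s3://my_bucket/logs/ --recursive

    # Terminate the cluster when the job is done to stop incurring EC2 costs.
    aws emr terminate-clusters --cluster-ids "$CLUSTER_ID" --region "$AWS_REGION"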

Bulk data loading job stages and steps

A bulk data loading job consists of three stages:

  • Prep
  • Partially written
  • Complete

Within these three stages are several steps:

| Stage | Step | Description |
| --- | --- | --- |
| Prep | Resuming job | First step when resuming a previously started job. |
| Prep | Read vertex/edge files | Check the directory structure. |
| Prep | Preflight check | Verify that the CSV files are properly formatted and parsable. |
| Prep | Temporary data writing | Intermediate transformation step that generates data for efficient writing of graph elements to the database. |
| Partially written | Supernode extraction | Detect supernodes in the dataset so they can be written correctly. |
| Partially written | Vertex writing | Write vertices to the database. |
| Partially written | Vertex validation | Validate the accuracy of written vertices using graph traversal queries. |
| Partially written | Edge writing | Write edges to the database. |
| Partially written | Edge validation | Validate the accuracy of written edges using graph traversal queries. |
| Complete | N/A | The job has completed successfully: vertices and edges are loaded, and resume job information is removed. |

If a loading job fails at the Partially written stage due to an error, you can restart the job from the point at which it failed by rerunning it with the -resume flag.
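
On EMR, one way to do this is to submit a new step to the same cluster with the -resume flag appended to the Spark arguments, reusing the variables from the launch script (the bucket path is a placeholder):

    # Resume a previously failed bulk loading job.
    SPARK_ARGS="-c,s3://my_bucket/configs/bulk-loader.properties,-resume"
    aws emr add-steps --cluster-id "$CLUSTER_ID" \
      --steps Type=Spark,Name="Resume bulk load",ActionOnFailure=CONTINUE,Args=[--class,"$SPARK_CLASS","$SPARK_JAR",$SPARK_ARGS] \
      --region "$AWS_REGION"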

Incremental data loading

In addition to creating new graphs with the bulk loader, you can also add data to an existing graph. Incremental data loading includes the following use cases:

  • Add vertices to an existing graph.
  • Add vertices and edges with no connections to the existing dataset.
  • Add new vertices and edges with connections to vertices in the existing dataset. Vertex IDs in the existing dataset are provided in the incremental data load CSV files.
  • Add edges to new and existing vertices. Edge IDs in the existing dataset are provided in the incremental data load CSV files.
  • Update properties of existing vertices.

To load data incrementally, add the -incremental_load flag to the Spark submit command for your cloud service.
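
For example, on EMR you could append the flag to the SPARK_ARGS variable in the launch script shown earlier (the bucket path is a placeholder):

    SPARK_ARGS="-c,s3://my_bucket/configs/bulk-loader.properties,-incremental_load"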

Vertex insertion

To perform vertex insertions on an incremental dataset, the bulk loader uses the mergeV TinkerPop step.

  • The bulk loader merges vertices based on the ~id field.

  • If a vertex with the same ~id value is specified multiple times in the incremental dataset, the final vertex contains all the data of the combined rows. If rows with the same ~id assign different values to the same property, the final value of that property is non-deterministic and may be any of the assigned values.
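
For illustration, suppose an incremental vertex file contains the following two hypothetical rows with the same ~id:

    ~id,~label,age,city
    p1,person,29,Oslo
    p1,person,31,

After the merge, vertex p1 has city Oslo (assigned by only one row) and an age of either 29 or 31, because the two rows assign different values to the same property.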

Edge insertion

Edge insertions on an incremental dataset behave the same as a fresh data load. No merging occurs, and all entries in the edge dataset create a valid edge between the specified vertices.

Spark job flags

The following flags are all optional.

| Argument | Description |
| --- | --- |
| -incremental_load | Add new data to an existing graph. |
| -validate_input_data | Perform format and data validation of all vertex and edge CSV files before writing to the Aerospike database. |
| -verify_output_data | Verify a percentage of loaded elements, specified by aerospike.graphloader.sampling-percentage, by reading them back after loading. The verification process uses a traversal query. |
| -resume | Resume a previously failed job. |
| -clear_existing_data | Delete all existing data before beginning the new job. |
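
Flags are appended to the Spark arguments after the properties-file path. For example, a SPARK_ARGS value for the EMR launch script that clears existing data and validates the input before loading might look like this (the bucket path is a placeholder):

    SPARK_ARGS="-c,s3://my_bucket/configs/bulk-loader.properties,-clear_existing_data,-validate_input_data"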