Version: Graph 2.2.0

Bulk loading for distributed mode

Overview

The guidelines on this page are a framework for efficiently bulk-loading large graph datasets into an Aerospike database using the distributed mode of the Aerospike Graph Service (AGS) bulk loader.

Loading large volumes of graph data is a computationally demanding task. It is best handled by combining the computational capacity of cloud services such as Amazon Web Services (AWS) or Google Cloud Platform (GCP) with the parallel-processing framework Apache Spark.

Architectural overview

The distributed mode of the AGS bulk loader is a Java executable, distributed as a JAR file. The bulk loader runs as part of a Spark job, which distributes the workload across a cluster of servers.

The bulk loader reads the source data files and loads them using configuration information stored in the AGS properties file. After the Spark job finishes, you can query the edges and vertices of your dataset with AGS.

Elements of the bulk loader

All elements of the bulk loader must reside in the same cloud network and be able to access one another. Those elements include:

  • the JAR file
  • the Aerospike Database cluster
  • the source data files
  • the properties file
  • the Spark cluster

Prerequisites

  • An Aerospike Database instance running on your cloud service.

  • Source data files for edges and vertices, in the Gremlin CSV format, stored in cloud buckets.

  • The bulk loader JAR file, stored in a cloud bucket.

  • An AGS properties file, stored in a cloud bucket.

Source data files

The bulk loader accepts data files in the Gremlin CSV format, with vertices and edges specified in separate files. All CSV files must have header information with names for each column of data.

note

AGS does not support user-provided ~id values for edges, so the ~id column is optional for edge CSV files. If your CSV file contains an ~id column, the values are ignored.
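As an illustration, the following commands build a minimal local mock of this layout, with one vertex file and one edge file sketched in the Gremlin CSV style (header row naming each column, property columns as name:Type). All file, directory, label, and property names here are hypothetical:

```shell
# Hypothetical local mock of a bucket layout with one vertex and one edge subdirectory.
mkdir -p MY_BUCKET/vertices/people MY_BUCKET/edges/knows

# Vertex CSV: the header names each column; property columns use name:Type.
cat > MY_BUCKET/vertices/people/vert_file1.csv <<'EOF'
~id,~label,name:String,age:Int
v1,people,Alice,34
v2,people,Bob,29
EOF

# Edge CSV: ~from and ~to reference vertex ~id values. No ~id column is
# included, since AGS ignores user-provided edge IDs.
cat > MY_BUCKET/edges/knows/edge_file1.csv <<'EOF'
~from,~to,~label,since:Int
v1,v2,knows,2015
EOF
```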

Data files must be stored in directories specified by the aerospike.graphloader.vertices and aerospike.graphloader.edges configuration options.

  • The directory specified in aerospike.graphloader.vertices must contain one or more subdirectories of vertex CSV files.

  • The directory specified in aerospike.graphloader.edges must contain one or more subdirectories of edge CSV files.

  • Each subdirectory should be named for the label of the data files it contains. For example, a subdirectory of vertex files containing data about people should be named people. A subdirectory of edge files containing data about connections in the people vertices, in which each row has the knows label, should be named knows.

For example, if your cloud bucket is named MY_BUCKET, that bucket must contain separate directories for edge and vertex data files, and those directories must contain subdirectories for the CSV files. If aerospike.graphloader.vertices is set to gs://MY_BUCKET/vertices, you might have subdirectories named gs://MY_BUCKET/vertices/people and gs://MY_BUCKET/vertices/places, each containing one or more CSV files.

Example cloud directory structure:

/MY_BUCKET 
|
---- /MY_BUCKET/vertices/
|
-------- /MY_BUCKET/vertices/people/
|
------------ /MY_BUCKET/vertices/people/vert_file1.csv
------------ /MY_BUCKET/vertices/people/vert_file2.csv
|
-------- /MY_BUCKET/vertices/places/
|
------------ /MY_BUCKET/vertices/places/vert_file3.csv
------------ /MY_BUCKET/vertices/places/vert_file4.csv
|
---- /MY_BUCKET/edges/
|
-------- /MY_BUCKET/edges/worksWith/
|
------------ /MY_BUCKET/edges/worksWith/edge_file1.csv
------------ /MY_BUCKET/edges/worksWith/edge_file2.csv
|
-------- /MY_BUCKET/edges/knows/
|
------------ /MY_BUCKET/edges/knows/edge_file3.csv
------------ /MY_BUCKET/edges/knows/edge_file4.csv
|
---- /MY_BUCKET/configs/bulk-loader.properties
---- /MY_BUCKET/logs

The properties file

The AGS properties file contains configuration information for AGS, such as the network address of the Aerospike Database. To use the bulk loader, make a copy of your config.properties file, append the relevant bulk loader configuration properties, and save it in the same cloud bucket with your source data files and bulk loader JAR file.

The following list describes some required and optional bulk loader configuration options.

aerospike.graphloader.edges (required; no default)
    URI path to edge CSVs.
    On GCP: gs://PATH_TO_BUCKET/edges/ On AWS: s3://PATH_TO_BUCKET/edges/

aerospike.graphloader.vertices (required; no default)
    URI path to vertex CSVs.
    On GCP: gs://PATH_TO_BUCKET/vertices/ On AWS: s3://PATH_TO_BUCKET/vertices/

aerospike.graphloader.sampling-percentage (optional; default: 0)
    Percentage of loaded elements to read back after writing to verify successful bulk loading when the -validate_output_data flag is enabled.

aerospike.graphloader.temp-directory (required; no default)
    URI path to an empty directory with read/write access that is used to temporarily store transformed intermediate data for bulk loading. Users must verify that the specified location is empty. This directory is not removed by the application after job completion. The temp directory must be on the same platform as the edges and vertices files.
    On GCP: gs://PATH_TO_BUCKET/temp/ On AWS: s3://PATH_TO_BUCKET/temp/

The aerospike.graphloader.edges and aerospike.graphloader.vertices options specify the location of your edge and vertex source data files. Verify that the bulk loader process has access to the cloud bucket containing your source files.
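For example, a copy of config.properties with the bulk loader settings appended might look like the following sketch. The host address, bucket names, and paths are hypothetical placeholders; substitute values from your own environment:

```properties
# Existing AGS settings from config.properties (hypothetical values)
aerospike.client.host=10.0.0.10
aerospike.client.port=3000

# Appended bulk loader settings (hypothetical bucket paths)
aerospike.graphloader.vertices=gs://MY_BUCKET/vertices/
aerospike.graphloader.edges=gs://MY_BUCKET/edges/
aerospike.graphloader.temp-directory=gs://MY_BUCKET/temp/
aerospike.graphloader.sampling-percentage=10
```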

For a full description of the properties file and a list of configuration options, see configuration options.

Specifying the location of the properties file

Use the -aerospike.graphloader.config option in your Spark command to specify the location of your AGS properties file.

note

The -aerospike.graphloader.config option may be abbreviated as -c.
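On a generic Spark cluster, this option is passed as an application argument after the JAR, for example (the JAR version and bucket paths are hypothetical; the main class name is the one used throughout this page):

```shell
spark-submit \
  --class com.aerospike.firefly.bulkloader.SparkBulkLoaderMain \
  gs://MY_BUCKET/jars/aerospike-graph-bulk-loader-x.y.z.jar \
  -c gs://MY_BUCKET/configs/bulk-loader.properties
```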

Cloud-specific options

The bulk loader supports cloud-specific authentication options for cloud services that require credentials to run a Spark job.

info

The cloud-specific configuration options must be included as part of the bulk loader execution command. They cannot be part of the AGS properties file.

aerospike.graphloader.remote-user
    On AWS: your AWS_ACCESS_KEY_ID value. On GCP: your key file private_key_id value.

aerospike.graphloader.remote-passkey
    On AWS: your AWS_SECRET_ACCESS_KEY value. On GCP: your key file private_key value.

aerospike.graphloader.gcs-email
    On GCP: your key file client_email value.

When using Google Cloud Storage (GCS) for source data files, you must configure a GCS service account. Your credential information can be found in the generated JSON key file for the service account.
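Because these options must appear on the execution command rather than in the properties file, one approach is to append them to the Spark arguments alongside -c. The following sketch assumes the same comma-separated, leading-dash argument style used for -c elsewhere on this page; the bucket path and credential placeholders are hypothetical values taken from a service-account key file:

```shell
SPARK_ARGS="-c,gs://MY_BUCKET/configs/bulk-loader.properties,\
-aerospike.graphloader.remote-user,PRIVATE_KEY_ID,\
-aerospike.graphloader.remote-passkey,PRIVATE_KEY,\
-aerospike.graphloader.gcs-email,SERVICE_ACCOUNT_EMAIL"
```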

Cloud CLI procedures


Loading with Amazon Web Services (AWS) CLI

  • Install the AWS CLI tool if you don't already have it.

  • Your default CLI profile must have sufficient permissions to create Amazon EC2 resources.

    Launch an Amazon EMR cluster

    The following shell script sets all the environment variables and file paths needed to launch an Amazon EMR (Elastic MapReduce) cluster and run an AGS bulk data loading job.

    When you edit the script, verify that all configuration elements are correct for your environment.

  • The S3 bucket must already exist.

  • The bulk loader JAR file and properties file must be present in the specified locations.

  • Verify that the source data files are correctly specified in the properties file and accessible by your default CLI profile.

    Save the edited script in a file such as launch-cluster.sh and run it at the command line.

    #!/bin/bash

    CLUSTER_NAME="Aerospike AWS Graph Cluster"
    EMR_RELEASE="emr-6.15.0"

    # Application logs are saved here. Edit this and all other S3 bucket names
    # to match your S3 setup.
    LOG_URI="s3://my_bucket/logs/"

    SPARK_JOB_NAME="Aerospike Graph AWS Spark Job"
    SPARK_CLASS="com.aerospike.firefly.bulkloader.SparkBulkLoaderMain"

    # Update SPARK_JAR with your S3 bucket and the current bulk loader .jar file
    SPARK_JAR="s3://my_bucket/jars/aerospike-graph-bulk-loader-x.y.z.jar"

    # Add more Spark arguments as needed. See the main bulk loading
    # documentation page for a list of available Spark flags.
    SPARK_ARGS="-c,s3://my_bucket/configs/bulk-loader.properties"

    AWS_REGION="us-west-1"

    # Use the same subnet ID and AWS region for the EMR cluster and
    # the Aerospike Database cluster for optimal performance.
    #
    # Note: if you create your Aerospike Database cluster with Aerolab,
    # the AWS network and security group information is shown at the time of creation.
    SUBNET_ID="subnet-##############" # Edit with your subnet ID
    SECURITY_GROUP="sg-##############" # Edit with your security group ID

    # Use Java 11.
    CONFIGURATIONS='[{"Classification":"hadoop-env","Configurations":[{"Classification":"export","Configurations":[],"Properties":{"JAVA_HOME":"/usr/lib/jvm/java-11-amazon-corretto.x86_64"}}],"Properties":{}},{"Classification":"spark-env","Configurations":[{"Classification":"export","Configurations":[],"Properties":{"JAVA_HOME":"/usr/lib/jvm/java-11-amazon-corretto.x86_64"}}],"Properties":{}},{"Classification":"spark-defaults","Properties":{"spark.executorEnv.JAVA_HOME":"/usr/lib/jvm/java-11-amazon-corretto.x86_64"}}]'

    # Create the EMR cluster.
    echo "Creating EMR cluster..."
    CLUSTER_ID=$(aws emr create-cluster \
    --name "$CLUSTER_NAME" \
    --release-label "$EMR_RELEASE" \
    --applications Name=Spark \
    --log-uri "$LOG_URI" \
    --use-default-roles \
    --instance-type m5.xlarge \
    --instance-count 4 \
    --ec2-attributes SubnetId="$SUBNET_ID",EmrManagedSlaveSecurityGroup="$SECURITY_GROUP",EmrManagedMasterSecurityGroup="$SECURITY_GROUP" \
    --configurations "$CONFIGURATIONS" \
    --query 'ClusterId' \
    --region "$AWS_REGION" \
    --output text)

    # The cluster ID may be needed later for operational purposes, such as
    # deactivating the cluster.
    echo "Cluster ID: $CLUSTER_ID"

    # Start the Spark job.
    echo "Adding Spark job step..."
    STEP_ID=$(aws emr add-steps --cluster-id "$CLUSTER_ID" \
    --steps Type=Spark,Name="$SPARK_JOB_NAME",ActionOnFailure=CONTINUE,Args=[--class,"$SPARK_CLASS","$SPARK_JAR",$SPARK_ARGS] \
    --query 'StepIds[0]' \
    --output text \
    --region "$AWS_REGION")
    echo "Step ID: $STEP_ID"
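After submission, the step's progress can be checked with the standard AWS CLI describe-step command, using the cluster and step IDs echoed by the script above:

```shell
# Poll the status of the bulk loading step.
# The state moves through PENDING and RUNNING to COMPLETED (or FAILED).
aws emr describe-step \
  --cluster-id "$CLUSTER_ID" \
  --step-id "$STEP_ID" \
  --region "$AWS_REGION" \
  --query 'Step.Status.State' \
  --output text
```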

    Logging

    Log files for the bulk loading job are saved to the location specified in the LOG_URI variable.

Bulk data loading job stages and steps

A bulk data loading job consists of three stages:

  • Prep
  • Partially loaded
  • Complete

Within these three stages are several steps:

Prep stage:

  • Read vertex/edge files: Check the directory structure.
  • Preflight check: Verify that the CSV files are properly formatted and parsable.
  • Temporary data writing: Intermediate transformation step that generates data for efficient writing of graph elements to the database.

Partially loaded stage:

  • Supernode extraction: Detect supernodes in the dataset so that they are written correctly.
  • Vertex writing: Write vertices to the database.
  • Vertex validation: Validate the accuracy of written vertices using graph traversal queries.
  • Edge writing: Write edges to the database.
  • Edge validation: Validate the accuracy of written edges using graph traversal queries.

Complete stage:

  • The job has completed successfully; vertices and edges are loaded.

Incremental data loading

Incremental data loading allows you to:

  • Add vertices to an existing graph.
  • Add edges to new and existing vertices.
  • Update properties of existing vertices.

To load data incrementally, add the -incremental_load flag to the spark-submit command for your cloud service.
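For the AWS script above, this means appending the flag to the comma-separated Spark arguments, for example (the bucket path is the hypothetical one used in the script):

```shell
# Append -incremental_load to the Spark arguments passed to the bulk loader.
SPARK_ARGS="-c,s3://my_bucket/configs/bulk-loader.properties,-incremental_load"
```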

Spark job flags

The following flags are all optional.

-incremental_load
    Add new data to an existing graph.

-validate_input_data
    Perform format and data validation of all vertex and edge CSV files before writing to the Aerospike database.

-validate_output_data
    Verify a percentage of loaded elements, specified by aerospike.graphloader.sampling-percentage, by reading them back after loading. The verification process uses a traversal query.

-clear_existing_data
    Delete all existing data before beginning the new job.