Version: Graph 2.2.0

Bulk loading for distributed mode

Overview

This page provides a framework for efficiently bulk-loading large graph datasets into an Aerospike database using the distributed mode of the Aerospike Graph Service (AGS) bulk loader.

Loading large volumes of graph data is a computationally demanding task. The most efficient approach pairs the parallel-processing capabilities of Apache Spark with the scalable compute resources of cloud services such as Amazon Web Services (AWS) or Google Cloud Platform (GCP).

Architectural overview

The distributed mode of the AGS bulk loader is a Java executable, distributed as a JAR file. The bulk loader runs as part of a Spark job, which distributes the workload across a cluster of servers. The bulk loader reads the source data files and loads them into an Aerospike Database namespace, using configuration information stored in a properties file. After the Spark job finishes, you can query the edges and vertices of your dataset with AGS.

Elements of the bulk loader

All elements of the bulk loader must reside in the same cloud network and be able to access one another. Those elements include:

  • the JAR file
  • the Aerospike Database cluster
  • the source data files
  • the properties file
  • the Spark cluster
note

Although this guide uses GCP as an illustrative platform, the overall process applies across other cloud providers or on-premise deployments using Apache Spark.

Prerequisites

  • Source data files for edges and vertices in the Gremlin CSV format, stored in Google Cloud Storage buckets.

  • An Aerospike Database instance running on GCP. See Deploying on GCP for help.

  • The bulk loader JAR file, stored in a GCP bucket.

  • An AGS properties file, stored in a GCP bucket.

  • A GCP bucket with write access for data processing.

Source data files

The bulk loader accepts data files in the Gremlin CSV format, with vertices and edges specified in separate files. Every CSV file must include a header row that names each column of data.

note

AGS does not support user-provided ~id values for edges, so the ~id column is optional for edge CSV files. If your CSV file contains an ~id column, the values are ignored.
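
For illustration, a minimal vertex file for a people label might look like the following. All labels, property names, values, and type suffixes here are hypothetical; the ~id and ~label header columns follow the Gremlin CSV format.

~id,~label,name:String,age:Int
1,people,Alice,34
2,people,Bob,29

A matching edge file for a knows label could look like this. Note the absence of a ~id column, per the note above:

~from,~to,~label,since:Int
1,2,knows,2015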

Data files must be stored in directories specified by the aerospike.graphloader.vertices and aerospike.graphloader.edges configuration options.

  • The directory specified in aerospike.graphloader.vertices must contain one or more subdirectories of vertex CSV files.

  • The directory specified in aerospike.graphloader.edges must contain one or more subdirectories of edge CSV files.

  • Each subdirectory should be named for the label of the data files it contains. For example, a subdirectory of vertex files containing data about people should be named people. A subdirectory of edge files containing data about connections in the people vertices, in which each row has the knows label, should be named knows.

For example, if your Google Cloud bucket is named MY_BUCKET, it must contain separate directories for edge and vertex data files, and those directories must contain subdirectories for the CSV files. If aerospike.graphloader.vertices is set to gs://MY_BUCKET/vertices, you might have subdirectories named gs://MY_BUCKET/vertices/people and gs://MY_BUCKET/vertices/places, each containing one or more CSV files.

Example directory structure:

/MY_BUCKET 
|
---- /MY_BUCKET/vertices/
|
-------- /MY_BUCKET/vertices/people/
|
------------ /MY_BUCKET/vertices/people/vert_file1.csv
------------ /MY_BUCKET/vertices/people/vert_file2.csv
|
-------- /MY_BUCKET/vertices/places/
|
------------ /MY_BUCKET/vertices/places/vert_file3.csv
------------ /MY_BUCKET/vertices/places/vert_file4.csv
|
---- /MY_BUCKET/edges/
|
-------- /MY_BUCKET/edges/worksWith/
|
------------ /MY_BUCKET/edges/worksWith/edge_file1.csv
------------ /MY_BUCKET/edges/worksWith/edge_file2.csv
|
-------- /MY_BUCKET/edges/knows/
|
------------ /MY_BUCKET/edges/knows/edge_file3.csv
------------ /MY_BUCKET/edges/knows/edge_file4.csv

The properties file

The AGS properties file contains the necessary details for running the Spark job to load your data files into your Aerospike database. For a full description of the properties file and a list of configuration options, see configuration options.

In addition to the standard .properties file options, the bulk loader requires the following configuration options.

| Configuration key | Required? | Default | Description |
|---|---|---|---|
| aerospike.graphloader.edges | yes | none | URI path to edge CSV files. On GCP: gs://PATH_TO_BUCKET/edges/. On AWS: s3://PATH_TO_BUCKET/edges/. |
| aerospike.graphloader.vertices | yes | none | URI path to vertex CSV files. On GCP: gs://PATH_TO_BUCKET/vertices/. On AWS: s3://PATH_TO_BUCKET/vertices/. |
| aerospike.graphloader.sampling-percentage | no | 0 | Percentage of loaded elements to read back after writing to verify successful bulk loading when the -verify_output_data flag is enabled. |
| aerospike.graphloader.temp-directory | yes | none | URI path to an empty directory with read/write access, used to store transformed intermediate data during bulk loading. You must ensure that the specified location is empty. The directory is not removed after the job completes, and it must be on the same platform as the edge and vertex files. |
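
For example, to read back 10 percent of loaded elements when output verification is enabled, you could add a line like the following to your properties file (the value 10 is illustrative):

aerospike.graphloader.sampling-percentage = 10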

Additional cloud-specific options

The bulk loader supports additional cloud-specific authentication options if your cloud service requires credentials to run a Spark job. For a complete list, see Cloud storage configuration options.

The following is an example properties file named graph.properties. The comments indicate options that you must configure with your GCP details. Because the Java properties format treats everything after the equals sign as part of the value, comments appear on their own lines rather than inline.

# IP addresses and port of your Aerospike database cluster
aerospike.client.host = 10.128.0.80:3000,10.128.0.6:3000
# Namespace to use for data storage
aerospike.client.namespace = test
# Directory containing edge data files
aerospike.graphloader.edges = gs://PATH_TO_BUCKET/edges/
# Directory containing vertex data files
aerospike.graphloader.vertices = gs://PATH_TO_BUCKET/vertices/
# Directory for temporary data processing
aerospike.graphloader.temp-directory = gs://PATH_TO_BUCKET/temp-data/

Procedure for the GCP console

  1. Log in to the GCP console and navigate to the Dataproc section.

  2. Create a Dataproc cluster. Select Cluster on Compute Engine in the modal window.

  3. Configure the cluster to the settings you want. Verify that the Enable component gateway box is checked.

  4. When the cluster is up and running, click the Submit Job link from the Cluster Details screen.

  5. Use the following job configuration options:

    • Job ID: Leave the default or provide a custom job name.

    • Job type: Spark

    • Main class or jar: com.aerospike.firefly.bulkloader.SparkBulkLoaderMain

    • Jar files: Link to the bulk loader JAR file, stored in a GCP bucket. Example: gs://MY_BUCKET/jar-files/aerospike-graph-bulk-loader-2.2.0.jar

    • Arguments: The following argument is required. See Spark job arguments for a full list of arguments.

      -c gs://MY_BUCKET/PATH_TO_PROPERTIES_FILE/bulk-loader.properties    

      Replace the string after -c with the path to your properties file, stored in a GCP bucket.

      See the following tables for descriptions of the available arguments and flags.

  6. Click the SUBMIT button when your Spark job is ready.

To view the output from your Spark job, select the JOBS tab of your Dataproc cluster, then click on the job in the jobs list.

When the job is finished, you can access your graph data with the Gremlin console or a graph application. See Graph Usage for help with accessing your data.
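
As a quick sanity check, you can run a few read-only queries from the Gremlin console. The following sketch assumes the hypothetical people and knows labels used in the earlier CSV examples:

// Count all vertices loaded into the graph
g.V().count()
// Inspect a few vertices with the 'people' label
g.V().hasLabel('people').limit(5).valueMap()
// Count all 'knows' edges
g.E().hasLabel('knows').count()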

Spark job arguments

| Argument | Required? | Default | Description |
|---|---|---|---|
| -c, -aerospike.graphloader.config | yes | none | GCP bucket location of the bulk loader properties file. |

Spark job flags

| Flag | Description |
|---|---|
| -validate_input_data | Perform format and data validation of all vertex and edge CSV files before writing to the Aerospike database. |
| -verify_output_data | Perform verification of a percentage of loaded elements, specified by aerospike.graphloader.sampling-percentage, by reading them back after loading. The verification process uses a traversal query. |
| -incremental_load | Load data into an existing graph. |
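
For example, to validate the input files and verify a sample of the output in a single job, you could pass both flags after the -c argument in the Arguments field (the path is a placeholder):

-c gs://MY_BUCKET/PATH_TO_PROPERTIES_FILE/bulk-loader.properties -validate_input_data -verify_output_data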

Bulk loader progress steps

| Step | Description |
|---|---|
| Preflight check | Verify that the CSV files are properly formatted and parsable. |
| Temp data writing | Intermediate transformation step that generates data for efficient writing of graph elements to the database. |
| Supernode extraction | Detect supernodes (densely connected vertices) in the dataset so that they can be written correctly. |
| Vertex writing | Write vertices to the database. |
| Vertex validation | Validate the accuracy of written vertices using graph traversal queries. |
| Edge writing | Write edges to the database. |
| Edge validation | Validate the accuracy of written edges using graph traversal queries. |

Procedure for the gcloud command-line interface (CLI)

You can also use the gcloud CLI to perform bulk loading operations.

  1. Install the gcloud CLI if you don't already have it.

  2. Use the gcloud init command to authorize commands on your GCP account.

  3. Use the gcloud dataproc clusters create command to set up an Apache Spark cluster in your GCP account.

    The following is an example command to set up a typical cluster. Modify the command line arguments as necessary for your use case. Refer to the gcloud documentation for a complete reference of command line arguments.

    gcloud dataproc clusters create testcluster \
    --enable-component-gateway \
    --region us-central1 \
    --zone us-central1-a \
    --master-machine-type n2-standard-8 \
    --master-boot-disk-type pd-ssd \
    --master-boot-disk-size 500 \
    --num-workers 4 \
    --worker-machine-type n2-standard-4 \
    --worker-boot-disk-type pd-ssd \
    --worker-boot-disk-size 500 \
    --image-version 2.1-debian11 \
    --properties spark:spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \
    --project <YOUR-PROJECT-NAME>
  4. Use the gcloud dataproc jobs submit spark command to run a Spark job using the Aerospike Bulk Loader JAR file.

    The following is an example command to start a typical Spark job. Modify the command line arguments as necessary for your use case.

    gcloud dataproc jobs submit spark \
    --class=com.aerospike.firefly.bulkloader.SparkBulkLoaderMain \
    --jars=gs://path-to-bulkloader/aerospike-graph-bulk-loader-2.2.0.jar \
    --id loadjob \
    --cluster=testcluster \
    --region=us-central1 \
    -- -c gs://path-to-properties-file -validate_input_data -verify_output_data

    The -c argument contains the GCP bucket location of your Graph properties file.

Incremental data loading

Incremental data loading allows you to:

  • Add vertices to an existing graph.
  • Add edges to new and existing vertices.
  • Update properties of existing vertices.

To load data incrementally, add the -incremental_load flag to the submit spark command.

gcloud dataproc jobs submit spark \
--class=com.aerospike.firefly.bulkloader.SparkBulkLoaderMain \
--jars=gs://path-to-bulkloader/aerospike-graph-bulk-loader-2.2.0.jar \
--id loadjob \
--cluster=testcluster \
--region=us-central1 \
-- -c gs://path-to-properties-file -incremental_load