Bulk Data Loading Procedures for Distributed Processing
Loading large volumes of graph data is a computationally demanding task. It is best handled by combining the computational capacity of a cloud service such as Amazon Web Services (AWS) or Google Cloud Platform (GCP) with the parallel processing framework Apache Spark. This page provides guidelines for efficiently bulk-loading large graph datasets into Aerospike.
Architectural overview
The Aerospike Distributed Mode Bulk Loader is a Java executable, distributed as a JAR file. The bulk loader runs as part of a Spark job, which distributes the workload across a cluster of servers. The bulk loader reads the source data files and loads them into an Aerospike Database namespace, using configuration information stored in a configuration file. After the Spark job finishes, you can query the edges and vertices of your dataset with the Aerospike Graph Service.
All the elements of the bulk loader -- the JAR file, the Aerospike Database cluster, the source data files, the configuration file, and the Spark cluster -- must reside in the same cloud network and be able to access one another.
While this guide uses GCP as its illustrative platform, the overall process applies to other cloud providers and to on-premises deployments of Apache Spark.
Prerequisites
- Source data files for edges and vertices in the Gremlin CSV format, stored in Google Cloud Storage buckets.
- An Aerospike Database instance running on GCP. See Deploying on GCP for help.
- The Aerospike Graph bulk loader JAR file, stored in a GCP bucket.
- An Aerospike Graph configuration file, stored in a GCP bucket.
- A GCP bucket with write access for data processing.
The bulk loader can only load data into an empty database.
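If your namespace already contains records, remove them before running the bulk loader. The following is a minimal sketch using the asinfo tool's truncate-namespace info command; the host address and namespace are placeholders for your own deployment, and truncation permanently deletes every record in the namespace.

# Remove all records from the target namespace so the bulk loader
# starts against an empty database (placeholder host and namespace).
asinfo -h 10.128.0.80 -v "truncate-namespace:namespace=test"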
Source data files
The bulk loader accepts data files in the Gremlin CSV format, with vertices and edges specified in separate files. All CSV files should have header information with names for each column of data.
Aerospike Graph does not support user-provided ~id values for edges, so the ~id column is optional for edge CSV files. If your CSV file contains an ~id column, the values are ignored.
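For illustration, a minimal vertex CSV file for people data might look like the following. The name and age property columns are hypothetical examples, not required fields, and the typed headers follow the Gremlin CSV convention.

~id,~label,name:String,age:Int
1,people,Alice,34
2,people,Bob,29

A matching edge file might look like this, where ~from and ~to reference vertex ~id values and the since property column is again hypothetical:

~from,~to,~label,since:Int
1,2,knows,2015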
Data files should be stored in directories specified by the aerospike.graphloader.vertices and aerospike.graphloader.edges configuration options:

- The directory specified in aerospike.graphloader.vertices should contain one or more subdirectories of vertex CSV files.
- The directory specified in aerospike.graphloader.edges should contain one or more subdirectories of edge CSV files.
- Each subdirectory should be named for the label of the data files it contains. For example, a subdirectory of vertex files containing data about people should be named people. A subdirectory of edge files containing data about connections between the people vertices, in which each row has the knows label, should be named knows.
For example, if your Google Cloud bucket is named myBucket, that bucket should contain separate directories for edge and vertex data files, and those directories should contain subdirectories for the CSV files.

If aerospike.graphloader.vertices is set to gs://myBucket/vertices, you might have subdirectories named gs://myBucket/vertices/people and gs://myBucket/vertices/places, each containing one or more CSV files.
Example directory structure:
/myBucket
|
---- /myBucket/vertices/
|
-------- /myBucket/vertices/people/
|
------------ /myBucket/vertices/people/vert_file1.csv
------------ /myBucket/vertices/people/vert_file2.csv
|
-------- /myBucket/vertices/places/
|
------------ /myBucket/vertices/places/vert_file3.csv
------------ /myBucket/vertices/places/vert_file4.csv
|
---- /myBucket/edges/
|
-------- /myBucket/edges/worksWith/
|
------------ /myBucket/edges/worksWith/edge_file1.csv
------------ /myBucket/edges/worksWith/edge_file2.csv
|
-------- /myBucket/edges/knows/
|
------------ /myBucket/edges/knows/edge_file3.csv
------------ /myBucket/edges/knows/edge_file4.csv
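If your data files are staged locally in the same layout, copying them into the bucket with the gsutil tool might look like the following sketch, which assumes local vertices and edges directories matching the structure above (the -m flag parallelizes the upload):

# Upload the local vertices/ and edges/ directory trees to the bucket.
gsutil -m cp -r vertices edges gs://myBucket/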
The configuration file
The Aerospike Graph configuration file contains the necessary details for running the Spark job to load your data files into your Aerospike database. For a full description of the configuration file and a list of configuration options, see configuration options.
In addition to the standard Aerospike Graph .properties configuration file options, the bulk loader requires the following additional options.
Configuration key | Required? | Default | Description |
---|---|---|---|
aerospike.graphloader.edges | yes | none | URI path to edge CSV files. On GCP: gs://path-to-bucket/edges/ On AWS: s3://path-to-bucket/edges/ |
aerospike.graphloader.vertices | yes | none | URI path to vertex CSV files. On GCP: gs://path-to-bucket/vertices/ On AWS: s3://path-to-bucket/vertices/ |
aerospike.graphloader.sampling-percentage | no | 0 | Percentage of loaded elements to read back after writing to verify successful bulk loading when the -verify_output_data flag is enabled. |
aerospike.graphloader.temp-directory | yes | none | URI path to an empty directory with read/write access, used to store transformed intermediate data during bulk loading. You must ensure that the specified location is empty. This directory is not removed by the application after job completion. The temp directory must be on the same platform as the edges and vertices files. |
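Because the application does not remove the temp directory, you may need to empty it yourself before re-running a job. A sketch using gsutil, with the placeholder bucket path used elsewhere on this page:

# Recursively delete leftover intermediate data from a previous run.
gsutil -m rm -r gs://path-to-bucket/temp-data/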
Additional cloud-specific options
The bulk loader supports additional cloud-specific authentication options if your cloud service requires credentials to run a Spark job. For a complete list, see Cloud storage configuration options.
The following is an example Graph configuration file named graph.properties. The comments indicate options that you must configure with your GCP details.
# IP addresses and port of your Aerospike database cluster
aerospike.client.host = 10.128.0.80:3000,10.128.0.6:3000
# Namespace to use for data storage
aerospike.client.namespace = test
# Directory containing edge data files
aerospike.graphloader.edges = gs://path-to-bucket/edges/
# Directory containing vertex data files
aerospike.graphloader.vertices = gs://path-to-bucket/vertices/
# Directory for temporary data processing
aerospike.graphloader.temp-directory = gs://path-to-bucket/temp-data/
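The configuration file itself must also be accessible from the Spark cluster. A sketch of uploading it with gsutil, where the destination path is a placeholder that you later pass to the bulk loader with the -c argument:

# Copy the configuration file to a GCP bucket readable by the Spark job.
gsutil cp graph.properties gs://my-bucket/path-to-configuration-file/bulk-loader.properties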
Procedure for the GCP console
1. Log in to the GCP console and navigate to the Dataproc section.
2. Create a new Dataproc cluster. Select Cluster on Compute Engine in the modal window.
3. Configure the cluster to your desired settings. Ensure that the Enable component gateway box is checked.
4. When the cluster is up and running, click the Submit Job link on the Cluster Details screen.
5. Use the following job configuration options:
   - Job ID: leave the default or provide a custom job name.
   - Job type: Spark
   - Main class or jar: com.aerospike.firefly.bulkloader.SparkBulkLoaderMain
   - Jar files: link to the bulk loader JAR file, stored in a GCP bucket. Example: gs://my-bucket/jar-files/aerospike-graph-bulk-loader-2.1.0.jar
   - Arguments: the following argument is required. See Spark job arguments for a full list of arguments.

     -c gs://my-bucket/path-to-configuration-file/bulk-loader.properties

     Replace the string after -c with the path to your configuration file, stored in a GCP bucket. See the table below for descriptions of the other arguments.
6. Click the SUBMIT button when your Spark job is ready.
To view the output from your Spark job, select the JOBS tab of your Dataproc cluster, then click on the job in the jobs list.
When the job completes, you can access your graph data with the Gremlin console or a graph application. See Graph Usage for help with accessing your data.
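As an informal check, assuming a Gremlin console connected to your Aerospike Graph Service instance with a traversal source g, queries like the following should return counts that match your source data. The people label refers to the example dataset used earlier on this page.

// Illustrative verification queries against the loaded graph.
g.V().count()
g.E().count()
g.V().hasLabel('people').limit(5).valueMap()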
Spark job arguments
Argument | Required? | Default | Description |
---|---|---|---|
-c, -aerospike.graphloader.config | yes | none | GCP bucket location of the bulk loader configuration file. |
Spark job flags
Flag | Description |
---|---|
-validate_input_data | Perform format and data validation of all vertex and edge CSV files before writing to the Aerospike database. |
-verify_output_data | Verify a sample of loaded elements, at the percentage specified by aerospike.graphloader.sampling-percentage, by reading them back after loading via a traversal query. |
Bulk loader progress steps
Step | Description |
---|---|
Preflight check | Verify that the CSV files are properly formatted and parsable. |
Temp data writing | Intermediate transformation step that generates data for efficient writing of graph elements to the database. |
Supernode extraction | Detect supernodes (densely connected nodes) in the dataset so that they can be written correctly. |
Vertex writing | Write vertices to the database. |
Vertex validation | Validate the accuracy of written vertices via graph traversal queries. |
Edge writing | Write edges to the database. |
Edge validation | Validate the accuracy of written edges via graph traversal queries. |
Procedure for the gcloud command-line interface (CLI)
You can also use the gcloud CLI to perform bulk loading operations.
1. Install the gcloud CLI if you don't already have it.
2. Use the gcloud init command to authorize commands against your GCP account.
3. Use the gcloud dataproc clusters create command to set up an Apache Spark cluster in your GCP account. The following is an example command that sets up a typical cluster. Modify the command-line arguments as needed for your use case. Refer to the gcloud documentation for a complete reference of command-line arguments.

gcloud dataproc clusters create testcluster \
--enable-component-gateway \
--region us-central1 \
--zone us-central1-a \
--master-machine-type n2-standard-8 \
--master-boot-disk-type pd-ssd \
--master-boot-disk-size 500 \
--num-workers 4 \
--worker-machine-type n2-standard-4 \
--worker-boot-disk-type pd-ssd \
--worker-boot-disk-size 500 \
--image-version 2.1-debian11 \
--properties spark:spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \
--project <YOUR-PROJECT-NAME>

4. Use the gcloud dataproc jobs submit spark command to run a Spark job using the Aerospike Graph bulk loader JAR file. The following is an example command that starts a typical Spark job. Modify the command-line arguments as needed for your use case.
gcloud dataproc jobs submit spark \
--class=com.aerospike.firefly.bulkloader.SparkBulkLoaderMain \
--jars=gs://path-to-bulkloader/aerospike-graph-bulk-loader-2.1.0.jar \
--id loadjob \
--cluster=testcluster \
--region=us-central1 \
-- -c gs://path-to-properties-file -validate_input_data -verify_output_data

The -c argument contains the GCP bucket location of your Graph configuration file.
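To monitor or re-inspect the job from the CLI, you can use the standard gcloud Dataproc job commands. A sketch using the example job ID and region from the command above:

# Stream the job's driver output until it finishes.
gcloud dataproc jobs wait loadjob --region=us-central1
# Review the job's final status and configuration.
gcloud dataproc jobs describe loadjob --region=us-central1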