Bulk loading for distributed mode
Overview
The guidelines on this page are a framework for efficiently bulk-loading large graph datasets into an Aerospike database using the distributed mode of the Aerospike Graph Service (AGS) bulk loader.
Loading large volumes of graph data is a computationally demanding task. It is best optimized by combining the computational capabilities of cloud services such as Amazon Web Services (AWS) or Google Cloud Platform (GCP) with the parallel-processing framework Apache Spark.
Architectural overview
The distributed mode of the AGS bulk loader is a Java executable, distributed as a JAR file. The bulk loader runs as part of a Spark job, which distributes the workload across a cluster of servers. The bulk loader reads the source data files and loads them into an Aerospike Database namespace, using configuration information stored in a properties file. After the Spark job finishes, you can query the edges and vertices of your dataset with AGS.
Elements of the bulk loader
All elements of the bulk loader must reside in the same cloud network and be able to access one another. Those elements include:
- the JAR file
- the Aerospike Database cluster
- the source data files
- the properties file, and
- the Spark cluster
Although this guide uses GCP as an illustrative platform, the overall process applies across other cloud providers or on-premise deployments using Apache Spark.
Prerequisites
- Source data files for edges and vertices, in the Gremlin CSV format, stored in Google Cloud buckets.
- An Aerospike Database instance running on GCP. See Deploying on GCP for help.
- The bulk loader JAR file, stored in a GCP bucket.
- An AGS properties file, stored in a GCP bucket.
- A GCP bucket with write access for data processing.
Source data files
The bulk loader accepts data files in the Gremlin CSV format, with vertices and edges specified in separate files. All CSV files must have a header row naming each column of data.
AGS does not support user-provided `~id` values for edges, so the `~id` column is optional for edge CSV files. If an edge CSV file contains an `~id` column, its values are ignored.
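For illustration, minimal vertex and edge files might look like the following. The property columns and values are hypothetical; `~from` and `~to` in the edge file reference vertex `~id` values.

A vertex file, for example `vertices/people/vert_file1.csv`:

```csv
~id,name:String,age:Int
1,Alice,34
2,Bob,29
```

An edge file, for example `edges/knows/edge_file3.csv` (note the absence of an `~id` column):

```csv
~from,~to,since:Int
1,2,2015
```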
Data files must be stored in directories specified by the `aerospike.graphloader.vertices` and `aerospike.graphloader.edges` configuration options.
- The directory specified in `aerospike.graphloader.vertices` must contain one or more subdirectories of vertex CSV files.
- The directory specified in `aerospike.graphloader.edges` must contain one or more subdirectories of edge CSV files.
- Each subdirectory should be named for the label of the data files it contains. For example, a subdirectory of vertex files containing data about people should be named `people`. A subdirectory of edge files describing connections among the `people` vertices, in which each row has the `knows` label, should be named `knows`.
For example, if your Google Cloud bucket is named `MY_BUCKET`, that bucket must contain separate directories for edge and vertex data files, and those directories must contain subdirectories for the CSV files.
If `aerospike.graphloader.vertices` is set to `gs://MY_BUCKET/vertices`, you might have subdirectories named `gs://MY_BUCKET/vertices/people` and `gs://MY_BUCKET/vertices/places`, each containing one or more CSV files.
Example directory structure:

```
/MY_BUCKET
|
---- /MY_BUCKET/vertices/
|
-------- /MY_BUCKET/vertices/people/
|
------------ /MY_BUCKET/vertices/people/vert_file1.csv
------------ /MY_BUCKET/vertices/people/vert_file2.csv
|
-------- /MY_BUCKET/vertices/places/
|
------------ /MY_BUCKET/vertices/places/vert_file3.csv
------------ /MY_BUCKET/vertices/places/vert_file4.csv
|
---- /MY_BUCKET/edges/
|
-------- /MY_BUCKET/edges/worksWith/
|
------------ /MY_BUCKET/edges/worksWith/edge_file1.csv
------------ /MY_BUCKET/edges/worksWith/edge_file2.csv
|
-------- /MY_BUCKET/edges/knows/
|
------------ /MY_BUCKET/edges/knows/edge_file3.csv
------------ /MY_BUCKET/edges/knows/edge_file4.csv
```
The properties file
The AGS properties file contains the necessary details for running the Spark job to load your data files into your Aerospike database. For a full description of the properties file and a list of configuration options, see configuration options.
In addition to the standard `.properties` file, using the bulk loader requires some additional configuration options.
| Configuration key | Required? | Default | Description |
| --- | --- | --- | --- |
| `aerospike.graphloader.edges` | yes | none | URI path to edge CSV files. On GCP: `gs://PATH_TO_BUCKET/edges/`. On AWS: `s3://PATH_TO_BUCKET/edges/`. |
| `aerospike.graphloader.vertices` | yes | none | URI path to vertex CSV files. On GCP: `gs://PATH_TO_BUCKET/vertices/`. On AWS: `s3://PATH_TO_BUCKET/vertices/`. |
| `aerospike.graphloader.sampling-percentage` | no | 0 | Percentage of loaded elements to read back after writing to verify successful bulk loading when the `-verify_output_data` flag is enabled. |
| `aerospike.graphloader.temp-directory` | yes | none | URI path to an empty directory with read/write access, used to store transformed intermediate data during bulk loading. You must ensure that the specified location is empty. This directory is not removed by the application after job completion. The temp directory must be on the same platform as the edge and vertex files. |
Additional cloud-specific options
The bulk loader supports additional cloud-specific authentication options if your cloud service requires credentials to run a Spark job. For a complete list, see Cloud storage configuration options.
The following is an example properties file named `graph.properties`. The commented lines indicate options you must configure with your GCP details.
```properties
aerospike.client.host = 10.128.0.80:3000,10.128.0.6:3000 # IP addresses and port of your Aerospike database cluster
aerospike.client.namespace = test # Namespace to use for data storage
aerospike.graphloader.edges = gs://PATH_TO_BUCKET/edges/ # Directory containing edge data files
aerospike.graphloader.vertices = gs://PATH_TO_BUCKET/vertices/ # Directory containing vertex data files
aerospike.graphloader.temp-directory = gs://PATH_TO_BUCKET/temp-data/ # Directory for data processing
```
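Before submitting a job, it can save a failed run to confirm that the required keys are present in your properties file. The following is a minimal sketch, not part of the bulk loader; the required-key list is drawn from the example and tables above, and it parses only simple `key = value` lines:

```python
# Sanity-check a bulk loader properties file for required keys.
# The key list reflects the example above; adjust it for your deployment.
REQUIRED_KEYS = {
    "aerospike.client.host",
    "aerospike.client.namespace",
    "aerospike.graphloader.edges",
    "aerospike.graphloader.vertices",
    "aerospike.graphloader.temp-directory",
}

def missing_keys(properties_text):
    """Parse simple 'key = value' lines and report required keys that are absent."""
    present = set()
    for line in properties_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comment lines
        key = line.split("=", 1)[0].strip()
        present.add(key)
    return sorted(REQUIRED_KEYS - present)
```

For example, `missing_keys("aerospike.client.host = 10.0.0.1:3000")` reports the other four keys as missing.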
Procedure for the GCP console
1. Log in to the GCP console and navigate to the Dataproc section.
2. Create a Dataproc cluster. Select Cluster on Compute Engine in the modal window.
3. Configure the cluster with the settings you want. Verify that the Enable component gateway box is checked.
4. When the cluster is up and running, click the Submit Job link on the Cluster Details screen.
5. Use the following job configuration options:
   - Job ID: Leave the default or provide a custom job name.
   - Job type: Spark
   - Main class or jar: `com.aerospike.firefly.bulkloader.SparkBulkLoaderMain`
   - Jar files: Link to the bulk loader JAR file, stored in a GCP bucket. Example: `gs://MY_BUCKET/jar-files/aerospike-graph-bulk-loader-2.2.0.jar`
   - Arguments: The following argument is required. See Spark job arguments for a full list of arguments.

     `-c gs://MY_BUCKET/PATH_TO_PROPERTIES_FILE/bulk-loader.properties`

     Replace the string after `-c` with the path to your properties file, stored in a GCP bucket. See the following tables for descriptions of the other arguments and flags.
6. Click the SUBMIT button when your Spark job is ready.
7. To view the output from your Spark job, select the JOBS tab of your Dataproc cluster, then click the job in the jobs list.
8. When the job is finished, you can access your graph data with the Gremlin console or a graph application. See Graph Usage for help with accessing your data.
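For example, a quick spot-check from the Gremlin console might look like the following. The `people` label is illustrative; substitute a label from your own dataset:

```
gremlin> g.V().count()
gremlin> g.E().count()
gremlin> g.V().hasLabel('people').limit(3).valueMap()
```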
Spark job arguments
| Argument | Required? | Default | Description |
| --- | --- | --- | --- |
| `-c`, `-aerospike.graphloader.config` | yes | none | GCP bucket location of the bulk loader properties file. |
Spark job flags
| Flag | Description |
| --- | --- |
| `-validate_input_data` | Perform format and data validation of all vertex and edge CSV files before writing to the Aerospike database. |
| `-verify_output_data` | Perform verification of a percentage of loaded elements, specified by `aerospike.graphloader.sampling-percentage`, by reading them back after loading. The verification process uses a traversal query. |
| `-incremental_load` | Load data into an existing graph. |
Bulk loader progress steps
| Step | Description |
| --- | --- |
| Preflight check | Verify that the CSV files are properly formatted and parsable. |
| Temp data writing | Intermediate transformation step that generates data for efficient writing of graph elements to the database. |
| Supernode extraction | Detect supernodes (densely connected nodes) in the dataset so that they are written correctly. |
| Vertex writing | Write vertices to the database. |
| Vertex validation | Validate the accuracy of written vertices using graph traversal queries. |
| Edge writing | Write edges to the database. |
| Edge validation | Validate the accuracy of written edges using graph traversal queries. |
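As a rough local approximation of the preflight check, you can verify CSV headers before submitting a job. This sketch assumes the Gremlin CSV conventions described earlier (`~id` for vertex files, `~from` and `~to` for edge files); the function names are illustrative and not part of the bulk loader:

```python
import csv
import io

def vertex_header_problems(header):
    """A vertex CSV must provide an ~id column."""
    return [] if "~id" in header else ["missing ~id column"]

def edge_header_problems(header):
    """An edge CSV must provide ~from and ~to columns; a ~id column is ignored."""
    return [f"missing {col} column" for col in ("~from", "~to") if col not in header]

def csv_header_problems(text, kind):
    """Parse the first row of a CSV string and check the required columns."""
    header = next(csv.reader(io.StringIO(text)))
    check = vertex_header_problems if kind == "vertex" else edge_header_problems
    return check(header)
```

For example, `csv_header_problems("~id,name:String\n1,Alice\n", "vertex")` returns an empty list, while a vertex file without an `~id` column is flagged.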
Procedure for the gcloud command-line interface (CLI)
You can also use the `gcloud` CLI to perform bulk loading operations.
1. Install the `gcloud` CLI if you don't already have it.
2. Use the `gcloud init` command to authorize commands on your GCP account.
3. Use the `gcloud dataproc clusters create` command to set up an Apache Spark cluster in your GCP account. The following is an example command to set up a typical cluster. Modify the command-line arguments as necessary for your use case. Refer to the `gcloud` documentation for a complete reference of command-line arguments.

   ```shell
   gcloud dataproc clusters create testcluster \
       --enable-component-gateway \
       --region us-central1 \
       --zone us-central1-a \
       --master-machine-type n2-standard-8 \
       --master-boot-disk-type pd-ssd \
       --master-boot-disk-size 500 \
       --num-workers 4 \
       --worker-machine-type n2-standard-4 \
       --worker-boot-disk-type pd-ssd \
       --worker-boot-disk-size 500 \
       --image-version 2.1-debian11 \
       --properties spark:spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \
       --project <YOUR-PROJECT-NAME>
   ```

4. Use the `gcloud dataproc jobs submit spark` command to run a Spark job using the Aerospike bulk loader JAR file. The following is an example command to start a typical Spark job. Modify the command-line arguments as necessary for your use case.

   ```shell
   gcloud dataproc jobs submit spark \
       --class=com.aerospike.firefly.bulkloader.SparkBulkLoaderMain \
       --jars=gs://path-to-bulkloader/aerospike-graph-bulk-loader-2.2.0.jar \
       --id loadjob \
       --cluster=testcluster \
       --region=us-central1 \
       -- -c gs://path-to-properties-file -validate_input_data -verify_output_data
   ```

   The `-c` argument contains the GCP bucket location of your Graph properties file.
Incremental data loading
Incremental data loading allows you to:
- Add vertices to an existing graph.
- Add edges to new and existing vertices.
- Update properties of existing vertices.
To load data incrementally, add the `-incremental_load` flag to the `submit spark` command.
```shell
gcloud dataproc jobs submit spark \
    --class=com.aerospike.firefly.bulkloader.SparkBulkLoaderMain \
    --jars=gs://path-to-bulkloader/aerospike-graph-bulk-loader-2.2.0.jar \
    --id loadjob \
    --cluster=testcluster \
    --region=us-central1 \
    -- -c gs://path-to-properties-file -incremental_load
```