
Using Aerospike Connect For Spark

Ken Tune
Senior Solutions Architect
October 29, 2020 | 6 min read

Aerospike is a highly scalable key-value database offering best-in-class performance. It is typically deployed in real-time environments managing terabyte-to-petabyte data volumes.

Aerospike typically runs alongside other scalable distributed software, such as Kafka for system coupling or Spark for analytics. The Aerospike Connect product line makes this integration as easy as possible.

This article looks at how Aerospike Spark Connect works in practice by offering a comprehensive, easily reproducible end-to-end example using aerospike-ansible.

Database Cluster Setup

First, take a look at Ansible for Aerospike, which explains how to use aerospike-ansible.

In this example, I set cluster_instance_type to c5d.18xlarge in vars/cluster-config.yml.

Follow the instructions up to and including one touch setup. You’ll get as far as

ansible-playbook aws-setup-plus-aerospike-install.yml 
ansible-playbook aerospike-java-client-setup.yml

which will give you a 3 node cluster by default, plus a client instance with relevant software installed.

Spark Cluster Setup

This is done via

ansible-playbook spark-cluster-setup.yml

For this example, prior to running, I set spark_instance_type to c5d.4xlarge in vars/cluster-config.yml.

This playbook creates a 3 node Spark cluster, of the given instance type, with Spark installed and running. It also installs Aerospike Spark Connect.

Note that you will need to set enterprise: true and provide a path to a valid Aerospike feature key using feature_key: /your/path/to/key in vars/cluster-config.yml. You must therefore be either a licensed Aerospike customer or running an Aerospike trial.
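For reference, the vars/cluster-config.yml entries used in this walkthrough are sketched below. This is an illustrative excerpt only; see the aerospike-ansible repo for the full file.

cluster_instance_type: c5d.18xlarge   # Aerospike cluster nodes
spark_instance_type: c5d.4xlarge      # Spark cluster nodes
enterprise: true                      # required for Aerospike Spark Connect
feature_key: /your/path/to/key        # path to a valid Aerospike feature key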

Near the end of the process you will see

TASK [Spark master IP & master internal url]
{
   "msg": "Spark master is 3.88.237.103. 
           Spark master internal url is spark://10.0.2.122:7077."
}

Make a note of the Spark master internal URL; it is needed later.

Load Data

Our example uses 20 million records from the 1 billion-record NYC taxi ride corpus, available in compressed form at https://aerospike-ken-tune.s3.amazonaws.com/nyc-taxi-data/trips_xaa.csv.gz. We load the data into Aerospike using the Aerospike Loader, which is installed on the client machine set up above. First of all, we get the addresses of the hosts in the Aerospike cluster; these are needed later.

source ./scripts/cluster-ip-address-list.sh

Sample output

Adds cluster ips to this array - AERO_CLUSTER_IPS
Use as ${AERO_CLUSTER_IPS[index]}
There are 3 entries
##########################################################
cluster IP addresses : Public : 3.87.14.39, Private : 10.0.2.58
cluster IP addresses : Public : 3.89.113.231, Private : 10.0.0.234
cluster IP addresses : Public : 23.20.193.64, Private : 10.0.1.95
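The script adds these addresses to the AERO_CLUSTER_IPS bash array, so individual nodes can be addressed directly. For example:

echo ${AERO_CLUSTER_IPS[0]} # address of the first cluster node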

The Aerospike Loader requires a config file telling it how to load the data into Aerospike. This maps CSV column positions to named and typed bins. A sample entry looks like

{ 
   "name": "pkup_datetime", 
   "value": { 
      "column_position": 3, 
      "type": "timestamp", 
      "encoding": "yyyy-MM-dd hh:mm:ss", 
      "dst_type": "integer" 
   }
}

Here, column 3 of the CSV is parsed as a timestamp in yyyy-MM-dd hh:mm:ss format and stored as an integer (epoch seconds) in a bin named pkup_datetime. The full config is provided in the repo at recipes/aerospike-spark-demo/nyc-taxi-data-aero-loader-config.json. We upload this to the client instance.

source ./scripts/client-ip-address-list.sh 
scp -i .aws.pem ./recipes/aerospike-spark-demo/nyc-taxi-data-aero-loader-config.json ec2-user@${AERO_CLIENT_IPS[0]}:~

Next, get the data onto the client machine. There's more than one way to do this, but you need to plan ahead, as the dataset is 7.6 GB when uncompressed. I used the steps below, but the details will depend on your instance's drives and filesystem.

./scripts/client-quick-ssh.sh         # log in to the client instance, then:
sudo mkfs.ext4 /dev/nvme1n1           # format the instance's NVMe drive
sudo mkdir /data
sudo mount -t ext4 /dev/nvme1n1 /data # mount it at /data
sudo chmod 777 /data
wget -P /data https://aerospike-ken-tune.s3.amazonaws.com/nyc-taxi-data/trips_xaa.csv.gz
gunzip /data/trips_xaa.csv.gz
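As an optional sanity check before loading, you can confirm the uncompressed file holds the expected number of rows:

wc -l /data/trips_xaa.csv # expect roughly 20,000,000 lines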

Finally, we load the data in, using the config file we uploaded.

cd ~/aerospike-loader 
./run_loader -h 10.0.0.234 -p 3000 -n test -c ~/nyc-taxi-data-aero-loader-config.json /data/trips_xaa.csv

Note we’re using one of the cluster ip addresses we recorded earlier.

Using Spark

Log in to one of the Spark nodes. aerospike-ansible provides a utility script for this:

./scripts/spark-quick-ssh.sh

Start up a Spark shell, using the Spark master URL we saw when running the Spark cluster setup playbook.

/spark/bin/spark-shell --master spark://10.0.2.122:7077

Import relevant libraries

import org.apache.spark.sql.{SQLContext, SparkSession, SaveMode}
import org.apache.spark.SparkConf
import java.util.Date
import java.text.SimpleDateFormat

Supply the Aerospike configuration. Note that we use the cluster IP recorded previously:

spark.conf.set("aerospike.seedhost", "10.0.0.234") 
spark.conf.set("aerospike.namespace", "test")

Define a view and a UDF that we will be using:

val sqlContext = spark.sqlContext

sqlContext.udf.register("getYearFromSeconds", (seconds: Long) => (new SimpleDateFormat("yyyy")).format(1000 * seconds))

val taxi = sqlContext.read.format("com.aerospike.spark.sql").option("aerospike.set", "nyc-taxi-data").load

taxi.createOrReplaceTempView("taxi")
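Before running the full queries, a quick, optional sanity check (not part of the original walkthrough) confirms the view and UDF are wired up:

// Inspect the schema the connector inferred from the Aerospike bins
taxi.printSchema()

// Spot-check the UDF: 1404172800 seconds is 2014-07-01 00:00:00 UTC,
// so this should show 2014 (subject to the JVM's default time zone)
sqlContext.sql("SELECT getYearFromSeconds(1404172800) AS trip_year").show()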

Finally, we run our queries:

// Journeys grouped by cab type
val result = sqlContext.sql("SELECT cab_type,count(*) count FROM taxi GROUP BY cab_type")
result.show()

+--------+--------+                                                             
|cab_type|   count|
+--------+--------+
|   green|20000000|
+--------+--------+

// Average fare for different passenger counts
val result = sqlContext.sql("SELECT passenger_cnt, round(avg(total_amount),2) avg_amount FROM taxi GROUP BY passenger_cnt ORDER BY passenger_cnt")
result.show()

+-------------+----------+                                                      
|passenger_cnt|avg_amount|
+-------------+----------+
|            0|     10.86|
|            1|     14.63|
|            2|     15.75|
|            3|     15.87|
|            4|     15.85|
|            5|     14.76|
|            6|     15.42|
|            7|     23.74|
|            8|     19.52|
|            9|      34.9|
+-------------+----------+

// Number of journeys for each passenger count, by year
val result = sqlContext.sql("SELECT passenger_cnt,getYearFromSeconds(pkup_datetime) trip_year, count(*) count FROM taxi GROUP BY passenger_cnt, getYearFromSeconds(pkup_datetime) order by passenger_cnt");
result.show()

+-------------+---------+--------+                                              
|passenger_cnt|trip_year|   count|
+-------------+---------+--------+
|            0|     2014|    4106|
|            1|     2014|16557518|
|            2|     2014| 1473578|
|            3|     2014|  507862|
|            4|     2014|  160714|
|            5|     2014|  939276|
|            6|     2014|  355846|
|            7|     2014|     492|
|            8|     2014|     494|
|            9|     2014|     114|
+-------------+---------+--------+

// Number of trips for each passenger count/distance combination
// Ordered by year, then trip count descending
val result = sqlContext.sql("SELECT passenger_cnt,getYearFromSeconds(pkup_datetime) trip_year,round(trip_distance) distance,count(*) trips FROM taxi GROUP BY passenger_cnt,getYearFromSeconds(pkup_datetime),round(trip_distance) ORDER BY trip_year,trips desc")
result.show()

+-------------+---------+--------+-------+                                      
|passenger_cnt|trip_year|distance|  trips|
+-------------+---------+--------+-------+
|            1|     2014|     1.0|5321230|
|            1|     2014|     2.0|3500458|
|            1|     2014|     3.0|2166462|
|            1|     2014|     4.0|1418494|
|            1|     2014|     5.0| 918460|
|            1|     2014|     0.0| 868210|
|            1|     2014|     6.0| 653646|
|            1|     2014|     7.0| 488416|
|            2|     2014|     1.0| 433746|
|            1|     2014|     8.0| 345728|
|            2|     2014|     2.0| 305578|
|            5|     2014|     1.0| 302120|
|            1|     2014|     9.0| 226278|
|            5|     2014|     2.0| 199968|
|            2|     2014|     3.0| 199522|
|            1|     2014|    10.0| 163928|
|            3|     2014|     1.0| 145580|
|            2|     2014|     4.0| 137152|
|            5|     2014|     3.0| 122714|
|            1|     2014|    11.0| 117570|
+-------------+---------+--------+-------+
only showing top 20 rows

Conclusion

This shows how quickly you can get up and running with a large data corpus. The example used 20 million rows, but it is easily extended to the full corpus. It also shows how much of the setup the aerospike-ansible tooling takes care of for you.