
Read from and write to DataFrames

This page describes how to manage reads and writes between Aerospike and Spark DataFrames.

Prerequisites

Set up imports

Launch the spark shell:

$ spark-shell --jars path-to-aerospike-spark-connector.jar

Example of writing data into Aerospike

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession

val TEST_COUNT = 100

val simpleSchema: StructType = new StructType(
  Array(
    StructField("one", IntegerType, nullable = false),
    StructField("two", StringType, nullable = false),
    StructField("three", DoubleType, nullable = false)
  ))

val simpleDF = {
  val inputBuf = new ArrayBuffer[Row]()
  for (i <- 1 to TEST_COUNT) {
    val one = i
    val two = "two:" + i
    val three = i.toDouble
    val r = Row(one, two, three)
    inputBuf.append(r)
  }
  val inputRDD = spark.sparkContext.parallelize(inputBuf.toSeq)
  spark.createDataFrame(inputRDD, simpleSchema)
}

// Write the sample data to Aerospike
simpleDF.write
  .mode("append")
  .format("aerospike") // Aerospike-specific format
  .option("aerospike.writeset", "spark-test") // write to this set
  .option("aerospike.updateByKey", "one") // column used to construct the primary key
  .option("aerospike.write.mode", "update")
  .save()

Loading and saving DataFrames

Aerospike Connect for Spark loads data from Aerospike into a DataFrame and saves a DataFrame into Aerospike.

Loading data

val thingsDF = sqlContext.read.
format("aerospike").
option("aerospike.seedhost", "localhost:3000").
option("aerospike.namespace", "test").
option("aerospike.set", "spark-test").
load()
// show() returns Unit, so call it separately to keep thingsDF a DataFrame
thingsDF.show()

The read is configured by a number of options:

  • format("aerospike") specifies the data source used to load the DataFrame.
  • option("aerospike.seedhost", "localhost:3000") specifies the host and port of a seed node in the Aerospike cluster.
  • option("aerospike.namespace", "test") specifies the namespace to read from.
  • option("aerospike.set", "spark-test") specifies the set to read from, here "spark-test".

Spark SQL can efficiently filter bin values, which are represented as columns (for example, where lastName = 'Smith'). The filter is passed down to the Aerospike cluster and applied on the server. Here is an example using filtering:

val thingsDF = sqlContext.read.
format("aerospike").
option("aerospike.set", "spark-test").
option("aerospike.seedhost", "localhost:3000").
option("aerospike.namespace", "test").
load()
thingsDF.createOrReplaceTempView("things")
thingsDF.show()
val filteredThings = sqlContext.sql("select * from things where one = 55")
filteredThings.show()
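
The same filter can also be written with the DataFrame API instead of a SQL string. The following is a minimal sketch, assuming a DataFrame with an integer column named one, as in the sample data; filterByOne is a hypothetical helper name, not part of the connector.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper: keeps only the rows whose "one" column equals `value`.
def filterByOne(df: DataFrame, value: Int): DataFrame =
  df.filter(col("one") === value)
```

For example, filterByOne(thingsDF, 55).show() should print the same rows as the SQL query above. Calling explain() on the result prints the physical plan, which is one way to check how the filter is handled for your connector version.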

Additional meta-data columns are automatically included when reading from Aerospike. Their default names are:

  • __key: the value of the primary key, if it is stored in Aerospike
  • __digest: the record digest, as Array[Byte]
  • __generation: the generation value of the record read
  • __expiry: the expiration epoch
  • __ttl: the time to live, calculated as the expiration time minus the current time
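
When only the bin values are needed, the meta-data columns can be separated from the rest by name. This is a small sketch in plain Scala, assuming the default column names listed above are in use; metaColumns and binColumns are hypothetical names chosen for illustration.

```scala
// Default meta-data column names; adjust this set if you renamed any of them.
val metaColumns: Set[String] =
  Set("__key", "__digest", "__generation", "__expiry", "__ttl")

// Hypothetical helper: returns the column names that are not meta-data columns,
// preserving their original order.
def binColumns(allColumns: Seq[String]): Seq[String] =
  allColumns.filterNot(metaColumns.contains)
```

With a loaded DataFrame, thingsDF.select(binColumns(thingsDF.columns.toSeq).map(thingsDF.col): _*) would then keep only the bin columns.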

You can change these default column names with additional options during read or write. For example:

val thingsDF = sqlContext.read.
format("aerospike").
option("aerospike.seedhost", "localhost:3000").
option("aerospike.namespace", "test").
option("aerospike.set", "spark-test").
option("aerospike.expiryColumn", "_my_expiry_column").
load()
thingsDF.show()

Saving data

A DataFrame can be saved in Aerospike by specifying a column in the DataFrame as the Primary Key or the Digest.

Saving by Digest

In this example, the value of the digest is specified by the __digest column in the DataFrame.

val allAerospikeConfig= Map("aerospike.seedhost" -> "localhost:3000","aerospike.namespace" -> "test" ,"aerospike.set" -> "spark-test")
val thingsDF = sqlContext.read.
format("aerospike").
options(allAerospikeConfig).
load()
thingsDF.show()
thingsDF.write.
mode(SaveMode.Overwrite).
format("aerospike").
option("aerospike.seedhost", "localhost:3000").
option("aerospike.namespace", "test").
option("aerospike.set", "spark-test").
option("aerospike.updateByDigest", "__digest").
save()
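
The read in this example passes its settings as a Map, and the write can reuse the same Map instead of repeating each option. A minimal sketch, using only option names that appear on this page; connectionOptions and digestWriteOptions are illustrative names.

```scala
// Connection settings shared by reads and writes.
val connectionOptions: Map[String, String] = Map(
  "aerospike.seedhost" -> "localhost:3000",
  "aerospike.namespace" -> "test",
  "aerospike.set" -> "spark-test"
)

// Write-specific settings are added on top of the shared ones.
val digestWriteOptions: Map[String, String] =
  connectionOptions + ("aerospike.updateByDigest" -> "__digest")
```

The write then becomes thingsDF.write.mode(SaveMode.Overwrite).format("aerospike").options(digestWriteOptions).save(), which keeps the connection settings in one place.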

Saving by Key

In this example, the value of the primary key is specified by the key column in the DataFrame.

import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// The "key" column holds the primary key of each record
val schema = new StructType(Array(
  StructField("key", StringType, nullable = false),
  StructField("last", StringType, nullable = true),
  StructField("first", StringType, nullable = true),
  StructField("when", LongType, nullable = true)
))

val rows = Seq(
  Row("Fraser_Malcolm", "Fraser", "Malcolm", 1975L),
  Row("Hawke_Bob", "Hawke", "Bob", 1983L),
  Row("Keating_Paul", "Keating", "Paul", 1991L),
  Row("Howard_John", "Howard", "John", 1996L),
  Row("Rudd_Kevin", "Rudd", "Kevin", 2007L),
  Row("Gillard_Julia", "Gillard", "Julia", 2010L),
  Row("Abbott_Tony", "Abbott", "Tony", 2013L),
  Row("Turnbull_Malcolm", "Turnbull", "Malcolm", 2015L)
)

val inputRDD = sc.parallelize(rows)
val newDF = sqlContext.createDataFrame(inputRDD, schema)

newDF.write.
  mode(SaveMode.Append).
  format("aerospike").
  option("aerospike.seedhost", "localhost:3000").
  option("aerospike.namespace", "test").
  option("aerospike.set", "spark-test").
  option("aerospike.updateByKey", "key").
  save()
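
In the sample rows, each key is the last name and first name joined by an underscore. When the DataFrame does not already contain such a column, it can be derived before the write. A minimal sketch, assuming columns named last and first as in the schema above; withNameKey is a hypothetical helper.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat_ws}

// Hypothetical helper: adds a "key" column of the form last_first.
// Note that concat_ws skips null inputs, so a row with a null first name
// would get a key made of the last name only.
def withNameKey(df: DataFrame): DataFrame =
  df.withColumn("key", concat_ws("_", col("last"), col("first")))
```

The result can be written with option("aerospike.updateByKey", "key") exactly as shown above.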

Using TTL while saving

Time to live (TTL) can be set individually on each record. The TTL must be stored in a column of the DataFrame before it is saved.

To enable updates to the TTL, specify an additional option that names this column:

option("aerospike.ttlColumn", "expiry")
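
Putting the two steps together, the sketch below first adds a TTL column and then names it in the write. It assumes the TTL is expressed in seconds and that every record gets the same value; withTtl and saveWithTtl are hypothetical helpers, and the connection options mirror the other examples on this page.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.lit

// Hypothetical helper: adds an "expiry" column holding the same TTL for every row.
// Assumption: the connector interprets the value as a number of seconds.
def withTtl(df: DataFrame, ttlSeconds: Int): DataFrame =
  df.withColumn("expiry", lit(ttlSeconds))

// Writes the DataFrame and tells the connector which column holds the TTL.
def saveWithTtl(df: DataFrame): Unit =
  df.write.
    mode(SaveMode.Append).
    format("aerospike").
    option("aerospike.seedhost", "localhost:3000").
    option("aerospike.namespace", "test").
    option("aerospike.set", "spark-test").
    option("aerospike.updateByKey", "key").
    option("aerospike.ttlColumn", "expiry").
    save()
```

For example, saveWithTtl(withTtl(newDF, 3600)) would request a one-hour TTL for every record in newDF. A per-record TTL works the same way, with the column computed from row data instead of a constant.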

Using TLS

In the following example, we illustrate how to use TLS to execute a sample Spark write task.

import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val schema = new StructType(Array(
  StructField("key", StringType, nullable = false),
  StructField("last", StringType, nullable = true),
  StructField("first", StringType, nullable = true),
  StructField("when", LongType, nullable = true)
))

val rows = Seq(
  Row("Fraser_Malcolm", "Fraser", "Malcolm", 1975L),
  Row("Hawke_Bob", "Hawke", "Bob", 1983L),
  Row("Keating_Paul", "Keating", "Paul", 1991L),
  Row("Howard_John", "Howard", "John", 1996L),
  Row("Rudd_Kevin", "Rudd", "Kevin", 2007L),
  Row("Gillard_Julia", "Gillard", "Julia", 2010L),
  Row("Abbott_Tony", "Abbott", "Tony", 2013L),
  Row("Turnbull_Malcolm", "Turnbull", "Malcolm", 2015L)
)

val inputRDD = sc.parallelize(rows)
val newDF = sqlContext.createDataFrame(inputRDD, schema)

// Same write as in "Saving by Key", with the TLS options added
newDF.write.
  mode(SaveMode.Append).
  format("aerospike").
  option("aerospike.seedhost", "localhost:3000").
  option("aerospike.namespace", "test").
  option("aerospike.set", "spark-test").
  option("aerospike.updateByKey", "key").
  option("aerospike.tls.enabletls", "true").
  option("aerospike.tls.truststore-store-file", "/tls/ca.aerospike.com.truststore.jks").
  option("aerospike.tls.truststore-store-password-file", "/tls/storepass").
  option("aerospike.tls.truststore-key-password-file", "/tls/keypass").
  option("aerospike.tls.truststore-store-type", "JKS").
  option("aerospike.tls.keystore-store-file", "/tls/connector.aerospike.com.keystore.jks").
  option("aerospike.tls.keystore-store-password-file", "/tls/storepass").
  option("aerospike.tls.keystore-key-password-file", "/tls/keypass").
  option("aerospike.tls.keystore-store-type", "JKS").
  save()
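
Because the TLS settings are plain string options, they can be collected in a Map and shared between reads and writes. The sketch below is one way to do that, assuming all key material lives in a single directory and uses the file names from the example above; tlsOptions is a hypothetical helper, not a connector API.

```scala
// Hypothetical helper: builds the TLS options from the directory that holds
// the truststore, keystore, and password files.
def tlsOptions(tlsDir: String): Map[String, String] = Map(
  "aerospike.tls.enabletls" -> "true",
  "aerospike.tls.truststore-store-file" -> s"$tlsDir/ca.aerospike.com.truststore.jks",
  "aerospike.tls.truststore-store-password-file" -> s"$tlsDir/storepass",
  "aerospike.tls.truststore-key-password-file" -> s"$tlsDir/keypass",
  "aerospike.tls.truststore-store-type" -> "JKS",
  "aerospike.tls.keystore-store-file" -> s"$tlsDir/connector.aerospike.com.keystore.jks",
  "aerospike.tls.keystore-store-password-file" -> s"$tlsDir/storepass",
  "aerospike.tls.keystore-key-password-file" -> s"$tlsDir/keypass",
  "aerospike.tls.keystore-store-type" -> "JKS"
)
```

A write can then replace the nine TLS option calls with a single options(tlsOptions("/tls")) call.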

Schema

Aerospike is schemaless, whereas Spark DataFrames require a schema. To supply one, Aerospike Connect for Spark scans a sample of 100 records, reads the bin names, and infers the type of each bin.

The number of records scanned can be changed by using the option:

option("aerospike.schema.scan", 20)

Note: the schema is derived each time load() is called. If you call load() before the Aerospike namespace/set has any data, only the meta-data columns will be available.
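
To see why the sample size matters, the following plain-Scala sketch mimics the idea of sampling records and collecting the types observed for each bin. It is an illustration only and not the connector's implementation; records are modeled as Map[String, Any], and typeName and observedTypes are hypothetical names.

```scala
// Maps a sampled value to a coarse type name (illustrative categories only).
def typeName(value: Any): String = value match {
  case _: Int | _: Long     => "long"
  case _: Float | _: Double => "double"
  case _: String            => "string"
  case _: Map[_, _]         => "map"
  case _: Seq[_]            => "list"
  case _                    => "unknown"
}

// Looks at the first `sampleSize` records and returns, for each bin name,
// the set of type names seen in the sample.
def observedTypes(
    records: Seq[Map[String, Any]],
    sampleSize: Int = 100
): Map[String, Set[String]] =
  records.take(sampleSize)
    .flatMap(_.toSeq)
    .groupBy { case (bin, _) => bin }
    .map { case (bin, pairs) => bin -> pairs.map { case (_, v) => typeName(v) }.toSet }
```

A bin that first appears after the sampled records is missing from the result, which is the kind of gap that a larger sample or an explicit schema avoids.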

In production, we recommend that you specify the schema explicitly for data that includes complex types such as maps and lists, so that types and structure do not depend on inference.
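
An explicit schema is an ordinary Spark StructType. The sketch below describes a record with a list bin and a map bin; the bin names tags and attrs and their element types are hypothetical and chosen only for illustration.

```scala
import org.apache.spark.sql.types._

// Hypothetical bins: "tags" is a list of strings, "attrs" maps strings to longs.
val explicitSchema: StructType = StructType(Seq(
  StructField("one", IntegerType, nullable = true),
  StructField("tags", ArrayType(StringType), nullable = true),
  StructField("attrs", MapType(StringType, LongType), nullable = true)
))
```

Spark's DataFrameReader accepts such a schema through schema(explicitSchema), called before load(). This assumes the connector honors a reader-supplied schema, as the recommendation above implies.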
