Read from and write to DataFrames
This page describes how to manage reads and writes between Aerospike and Spark DataFrames.
Prerequisites
- A running Aerospike Database.
- Aerospike Connect for Spark must be installed using the installation instructions.
Set up imports
Launch the spark shell:
$ spark-shell --jars path-to-aerospike-spark-connector.jar
Example of writing data into Aerospike
import scala.collection.mutable.ArrayBufferimport org.apache.spark.sql.Rowimport org.apache.spark.sql.types._import org.apache.spark.sql.functions._import org.apache.spark.sql.SaveModeimport org.apache.spark.sql.SparkSessionval TEST_COUNT= 100val simpleSchema: StructType = new StructType( Array( StructField("one", IntegerType, nullable = false), StructField("two", StringType, nullable = false), StructField("three", DoubleType, nullable = false) ))
val simpleDF = { val inputBuf= new ArrayBuffer[Row]() for (i <- 1 to TEST_COUNT){ val one = i val two = "two:"+i val three = i.toDouble val r = Row(one, two, three) inputBuf.append(r) } val inputRDD = spark.sparkContext.parallelize(inputBuf.toSeq) spark.createDataFrame(inputRDD,simpleSchema)}
//Write the Sample Data to AerospikesimpleDF.write .mode("append") .format("aerospike") //Aerospike-specific format .option("aerospike.writeset", "spark-test") //write to this set .option("aerospike.updateByKey", "one")//indicates which columns should be used for construction of primary key .option("aerospike.write.mode","update") .save()
Loading and saving DataFrames
Aerospike Connect for Spark loads data from Aerospike into a DataFrame and saves a DataFrame into Aerospike.
Loading data
val thingsDF = sqlContext.read. format("aerospike"). option("aerospike.seedhost", "localhost:3000"). option("aerospike.namespace", "test"). option("aerospike.set", "spark-test"). load(). show()
You can see that the read function is configured by a number of options, these are:
format("aerospike")
specifies the function library to load the DataFrame.option("aerospike.set", "spark-test")
specifies the Set to be used. For example, “spark-test”. Spark SQL can be used to efficiently filter (where lastName = ‘Smith’) Bin values represented as columns. The filter is passed down to the Aerospike cluster and filtering is done in the server. Here is an example using filtering:
val thingsDF = sqlContext.read. format("aerospike"). option("aerospike.set", "spark-test"). option("aerospike.seedhost", "localhost:3000"). option("aerospike.namespace", "test"). load() thingsDF.createOrReplaceTempView("things") thingsDF.show() val filteredThings = sqlContext.sql("select * from things where one = 55") filteredThings.show()
Additional meta-data columns are automatically included when reading from Aerospike, the default names are:
__key
the values of the primary key if it is stored in Aerospike__digest
the digest as Array[byte]__generation
the generation value of the record read__expiry
the expiration epoch__ttl
the time to live value calculated from the expiration - now
These meta-data column name defaults can be changed by using additional options during read or write, for example:
val thingsDF = sqlContext.read. format("aerospike"). option("aerospike.seedhost", "localhost:3000"). option("aerospike.namespace", "test"). option("aerospike.set", "spark-test"). option("aerospike.expiryColumn", "_my_expiry_column"). load() thingsDF.show()
Saving data
A DataFrame can be saved in Aerospike by specifying a column in the DataFrame as the Primary Key or the Digest.
Saving by Digest
In this example, the value of the digest is specified by the __digest
column in the DataFrame.
val allAerospikeConfig= Map("aerospike.seedhost" -> "localhost:3000","aerospike.namespace" -> "test" ,"aerospike.set" -> "spark-test") val thingsDF = sqlContext.read. format("aerospike"). options(allAerospikeConfig). load() thingsDF.show()
thingsDF.write. mode(SaveMode.Overwrite). format("aerospike"). option("aerospike.seedhost", "localhost:3000"). option("aerospike.namespace", "test"). option("aerospike.set", "spark-test"). option("aerospike.updateByDigest", "__digest"). save()
Saving by Key
In this example, the value of the primary key is specified by the key
column in the DataFrame.
import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.LongType import org.apache.spark.sql.types.StringType import org.apache.spark.sql.DataFrame import org.apache.spark.sql.Row
val schema = new StructType(Array( StructField("key",StringType,nullable = false), StructField("last",StringType,nullable = true), StructField("first",StringType,nullable = true), StructField("when",LongType,nullable = true) )) val rows = Seq( Row("Fraser_Malcolm","Fraser", "Malcolm", 1975L), Row("Hawke_Bob","Hawke", "Bob", 1983L), Row("Keating_Paul","Keating", "Paul", 1991L), Row("Howard_John","Howard", "John", 1996L), Row("Rudd_Kevin","Rudd", "Kevin", 2007L), Row("Gillard_Julia","Gillard", "Julia", 2010L), Row("Abbott_Tony","Abbott", "Tony", 2013L), Row("Tunrbull_Malcom","Tunrbull", "Malcom", 2015L) )
val inputRDD = sc.parallelize(rows)
val newDF = sqlContext.createDataFrame(inputRDD, schema)
newDF.write. mode(SaveMode.Append). format("aerospike"). option("aerospike.seedhost", "localhost:3000"). option("aerospike.namespace", "test"). option("aerospike.set", "spark-test"). option("aerospike.updateByKey", "key"). save()
Using TTL while saving
Time to live (TTL) can be set individually on each record. The TTL should be stored in a column in the DataSet before it is saved.
To enable updates to TTL, and additional option is specified:
option("aerospike.ttlColumn", "expiry")
Using TLS
In the following example, we illustrate how to use TLS to execute a sample Spark write task.
import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.LongType import org.apache.spark.sql.types.StringType import org.apache.spark.sql.DataFrame import org.apache.spark.sql.Row
val schema = new StructType(Array( StructField("key",StringType,nullable = false), StructField("last",StringType,nullable = true), StructField("first",StringType,nullable = true), StructField("when",LongType,nullable = true) )) val rows = Seq( Row("Fraser_Malcolm","Fraser", "Malcolm", 1975L), Row("Hawke_Bob","Hawke", "Bob", 1983L), Row("Keating_Paul","Keating", "Paul", 1991L), Row("Howard_John","Howard", "John", 1996L), Row("Rudd_Kevin","Rudd", "Kevin", 2007L), Row("Gillard_Julia","Gillard", "Julia", 2010L), Row("Abbott_Tony","Abbott", "Tony", 2013L), Row("Tunrbull_Malcom","Tunrbull", "Malcom", 2015L) )
val inputRDD = sc.parallelize(rows)
val newDF = sqlContext.createDataFrame(inputRDD, schema)
newDF.write. mode(SaveMode.Append). format("aerospike"). option("aerospike.seedhost", "localhost:3000"). option("aerospike.namespace", "test"). option("aerospike.set", "spark-test"). option("aerospike.updateByKey", "key"). option("aerospike.tls.enabletls", "true"). option("aerospike.tls.truststore-store-file", "/tls/ca.aerospike.com.truststore.jks"). option("aerospike.tls.truststore-store-password-file", "/tls/storepass"). option("aerospike.tls.truststore-key-password-file", "tls/keypass"). option("aerospike.tls.truststore-store-type", "JKS"). option("aerospike.tls.keystore-store-file", "/tls/connector.aerospike.com.keystore.jks"). option("aerospike.tls.keystore-store-password-file", "/tls/storepass"). option("aerospike.tls.keystore-key-password-file", "/tls/keypass"). option("aerospike.tls.keystore-store-type" , "JKS"). save()
Schema
Aerospike is Schema-less and Spark DataFrames use a Schema. To facilitate the need for schema, Aerospike Connect for Spark samples 100 records using a scan then reads the Bin names and infers the Bin type.
The number of records scanned can be changed by using the option:
option("aerospike.schema.scan", 20)
Note: the schema is derived each time load()
is called. If you call load()
before the Aerospike namespace/set has any data, only the meta-data columns will be available.
We recommend that you specify schema for data that includes complex types such as maps and lists in production, to ensure accurate type and structure inference.