
Reading from and Writing to DataFrames

Be sure to follow the installation instructions and complete the installation of Aerospike Connect for Spark. In this section of the tutorial we will walk through reading data from an Aerospike database into a Spark DataFrame and writing a Spark DataFrame to an Aerospike database.

Set up imports

Launch the spark shell:

$ spark-shell --jars path-to-aerospike-spark-connector.jar

Example of writing data into Aerospike

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession

// Number of sample records to generate
val TEST_COUNT = 100

val simpleSchema: StructType = new StructType(
  Array(
    StructField("one", IntegerType, nullable = false),
    StructField("two", StringType, nullable = false),
    StructField("three", DoubleType, nullable = false)
  ))

// Build a DataFrame of TEST_COUNT rows that matches simpleSchema
val simpleDF = {
  val inputBuf = new ArrayBuffer[Row]()
  for (i <- 1 to TEST_COUNT) {
    val one = i
    val two = "two:" + i
    val three = i.toDouble
    val r = Row(one, two, three)
    inputBuf.append(r)
  }
  val inputRDD = spark.sparkContext.parallelize(inputBuf.toSeq)
  spark.createDataFrame(inputRDD, simpleSchema)
}

// Write the sample data to Aerospike
simpleDF.write
  .mode("append")
  .format("aerospike") // Aerospike-specific format
  .option("aerospike.writeset", "spark-test") // write to this set
  .option("aerospike.updateByKey", "one") // column(s) used to construct the primary key
  .option("aerospike.write.mode", "update")
  .save()
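
To confirm the write, the set can be read back into a DataFrame. The following is a minimal sketch that assumes the default connection settings used elsewhere in this tutorial (seed host localhost:3000 and namespace "test"):

val writtenDF = spark.read.
  format("aerospike").
  option("aerospike.seedhost", "localhost:3000").
  option("aerospike.namespace", "test").
  option("aerospike.set", "spark-test").
  load()
// Each of the 100 records written above should appear as a row,
// along with the connector's meta-data columns.
writtenDF.show()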

Loading and Saving DataFrames

Aerospike Connect for Spark provides functions to load data from Aerospike into a DataFrame and to save a DataFrame to Aerospike.

Loading data

val thingsDF = sqlContext.read.
  format("aerospike").
  option("aerospike.seedhost", "localhost:3000").
  option("aerospike.namespace", "test").
  option("aerospike.set", "spark-test").
  load()
thingsDF.show()

The read function is configured by a number of options:

  • format("aerospike") specifies the data source used to load the DataFrame.
  • option("aerospike.seedhost", "localhost:3000") specifies a seed host of the Aerospike cluster.
  • option("aerospike.namespace", "test") specifies the Namespace to be used, e.g. "test".
  • option("aerospike.set", "spark-test") specifies the Set to be used, e.g. "spark-test".

Spark SQL can be used to efficiently filter Bin values represented as columns (e.g. where lastName = 'Smith'). The filter is pushed down to the Aerospike cluster, so the filtering is done on the server. Here is an example using filtering:
val thingsDF = sqlContext.read.
  format("aerospike").
  option("aerospike.set", "spark-test").
  option("aerospike.seedhost", "localhost:3000").
  option("aerospike.namespace", "test").
  load()
thingsDF.createOrReplaceTempView("things")
thingsDF.show()
val filteredThings = sqlContext.sql("select * from things where one = 55")
filteredThings.show()
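
The same filter can be expressed with the DataFrame API instead of Spark SQL. This is a minimal sketch; the server-side filtering described above is expected to apply in the same way:

import org.apache.spark.sql.functions.col

// Equivalent to: select * from things where one = 55
val filteredThingsDF = thingsDF.filter(col("one") === 55)
filteredThingsDF.show()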

Additional meta-data columns are automatically included when reading from Aerospike. The default names are:

  • __key the values of the primary key if it is stored in Aerospike
  • __digest the digest as Array[byte]
  • __generation the generation value of the record read
  • __expiry the expiration epoch
  • __ttl the time-to-live value, calculated as expiration minus now
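
For example, the meta-data columns can be selected alongside the Bin columns of the DataFrame loaded above (a minimal sketch using the Bin names from the earlier write example):

// __key is only populated if the primary key was stored in Aerospike
thingsDF.select("__key", "__digest", "__generation", "__expiry", "__ttl",
  "one", "two", "three").show()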

These meta-data column name defaults can be changed by using additional options during read or write, for example:

val thingsDF = sqlContext.read.
  format("aerospike").
  option("aerospike.seedhost", "localhost:3000").
  option("aerospike.namespace", "test").
  option("aerospike.set", "spark-test").
  option("aerospike.expiryColumn", "_my_expiry_column").
  load()
thingsDF.show()

Saving data

A DataFrame can be saved in Aerospike by specifying a column in the DataFrame as the Primary Key or the Digest.

Saving by Digest

In this example, the value of the digest is specified by the __digest column in the DataFrame.

val allAerospikeConfig = Map(
  "aerospike.seedhost" -> "localhost:3000",
  "aerospike.namespace" -> "test",
  "aerospike.set" -> "spark-test"
)
val thingsDF = sqlContext.read.
  format("aerospike").
  options(allAerospikeConfig).
  load()
thingsDF.show()

thingsDF.write.
  mode(SaveMode.Overwrite).
  format("aerospike").
  option("aerospike.seedhost", "localhost:3000").
  option("aerospike.namespace", "test").
  option("aerospike.set", "spark-test").
  option("aerospike.updateByDigest", "__digest").
  save()

Saving by Key

In this example, the value of the primary key is specified by the key column in the DataFrame.

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row

val schema = new StructType(Array(
  StructField("key", StringType, nullable = false),
  StructField("last", StringType, nullable = true),
  StructField("first", StringType, nullable = true),
  StructField("when", LongType, nullable = true)
))
val rows = Seq(
  Row("Fraser_Malcolm", "Fraser", "Malcolm", 1975L),
  Row("Hawke_Bob", "Hawke", "Bob", 1983L),
  Row("Keating_Paul", "Keating", "Paul", 1991L),
  Row("Howard_John", "Howard", "John", 1996L),
  Row("Rudd_Kevin", "Rudd", "Kevin", 2007L),
  Row("Gillard_Julia", "Gillard", "Julia", 2010L),
  Row("Abbott_Tony", "Abbott", "Tony", 2013L),
  Row("Turnbull_Malcolm", "Turnbull", "Malcolm", 2015L)
)

val inputRDD = sc.parallelize(rows)

val newDF = sqlContext.createDataFrame(inputRDD, schema)

newDF.write.
  mode(SaveMode.Append).
  format("aerospike").
  option("aerospike.seedhost", "localhost:3000").
  option("aerospike.namespace", "test").
  option("aerospike.set", "spark-test").
  option("aerospike.updateByKey", "key").
  save()

Using TTL while saving

Time to live (TTL) can be set individually on each record. The TTL should be stored in a column of the DataFrame before it is saved.

To enable updates to TTL, an additional option is specified:

    option("aerospike.ttlColumn", "expiry")

Using TLS

In the following example, we illustrate how to use TLS to execute a sample Spark write task.

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row

val schema = new StructType(Array(
  StructField("key", StringType, nullable = false),
  StructField("last", StringType, nullable = true),
  StructField("first", StringType, nullable = true),
  StructField("when", LongType, nullable = true)
))
val rows = Seq(
  Row("Fraser_Malcolm", "Fraser", "Malcolm", 1975L),
  Row("Hawke_Bob", "Hawke", "Bob", 1983L),
  Row("Keating_Paul", "Keating", "Paul", 1991L),
  Row("Howard_John", "Howard", "John", 1996L),
  Row("Rudd_Kevin", "Rudd", "Kevin", 2007L),
  Row("Gillard_Julia", "Gillard", "Julia", 2010L),
  Row("Abbott_Tony", "Abbott", "Tony", 2013L),
  Row("Turnbull_Malcolm", "Turnbull", "Malcolm", 2015L)
)

val inputRDD = sc.parallelize(rows)

val newDF = sqlContext.createDataFrame(inputRDD, schema)

newDF.write.
  mode(SaveMode.Append).
  format("aerospike").
  option("aerospike.seedhost", "localhost:3000").
  option("aerospike.namespace", "test").
  option("aerospike.set", "spark-test").
  option("aerospike.updateByKey", "key").
  option("aerospike.tls.enabletls", "true").
  option("aerospike.tls.truststore-store-file", "/tls/ca.aerospike.com.truststore.jks").
  option("aerospike.tls.truststore-store-password-file", "/tls/storepass").
  option("aerospike.tls.truststore-key-password-file", "/tls/keypass").
  option("aerospike.tls.truststore-store-type", "JKS").
  option("aerospike.tls.keystore-store-file", "/tls/connector.aerospike.com.keystore.jks").
  option("aerospike.tls.keystore-store-password-file", "/tls/storepass").
  option("aerospike.tls.keystore-key-password-file", "/tls/keypass").
  option("aerospike.tls.keystore-store-type", "JKS").
  save()

note

When setting up TLS, all truststore-related properties that do not have default values must be set. All artifacts, such as certificates and truststores, must be present on every Spark node, because the Aerospike Spark connector reads these artifacts only from local paths. See the configuration page for all authentication-related configuration options.

Schema

Aerospike is schema-less, while Spark DataFrames require a schema. To satisfy this need, Aerospike Connect for Spark scans a sample of 100 records, reads the Bin names, and infers each Bin's type.

The number of records scanned can be changed by using the option:

    option("aerospike.schema.scan", 20)

Note: the schema is derived each time load() is called. If you call load() before the Aerospike namespace/set has any data, only the meta-data columns will be available.
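
The inferred schema can be inspected with the standard Spark printSchema() call on a DataFrame loaded as shown earlier:

// Prints the inferred Bin columns plus the meta-data columns
thingsDF.printSchema()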

In production, we recommend specifying an explicit schema for data that includes complex types, such as maps and lists, to ensure accurate type and structure handling.
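
As a minimal sketch, an explicit schema can be supplied through Spark's standard schema(...) call on the reader, assuming the connector honors a user-provided schema; the column names and types below are illustrative assumptions, not part of the examples above:

import org.apache.spark.sql.types._

// Hypothetical schema for a set containing a scalar Bin and a list Bin
val explicitSchema = new StructType(Array(
  StructField("one", IntegerType, nullable = true),
  StructField("tags", ArrayType(StringType), nullable = true)
))

val typedDF = sqlContext.read.
  format("aerospike").
  option("aerospike.seedhost", "localhost:3000").
  option("aerospike.namespace", "test").
  option("aerospike.set", "spark-test").
  schema(explicitSchema).
  load()
typedDF.printSchema()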