# Read from and write to DataFrames

This page describes how to manage reads and writes between Aerospike and Spark DataFrames.

## Prerequisites

-   A running Aerospike Database.
-   Aerospike Connect for Spark, installed per the [installation instructions](https://aerospike.com/docs/connectors/spark/installation).

### Set up imports

Launch the Spark shell:

```plaintext
$ spark-shell --jars path-to-aerospike-spark-connector.jar
```

### Example of writing data into Aerospike

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession

val TEST_COUNT = 100

val simpleSchema: StructType = new StructType(
  Array(
    StructField("one", IntegerType, nullable = false),
    StructField("two", StringType, nullable = false),
    StructField("three", DoubleType, nullable = false)
  ))

val simpleDF = {
  val inputBuf = new ArrayBuffer[Row]()
  for (i <- 1 to TEST_COUNT) {
    val one = i
    val two = "two:" + i
    val three = i.toDouble
    val r = Row(one, two, three)
    inputBuf.append(r)
  }
  val inputRDD = spark.sparkContext.parallelize(inputBuf.toSeq)
  spark.createDataFrame(inputRDD, simpleSchema)
}

// Write the sample data to Aerospike
simpleDF.write
  .mode("append")
  .format("aerospike") // Aerospike-specific format
  .option("aerospike.seedhost", "localhost:3000") // seed host and port of the Aerospike cluster
  .option("aerospike.namespace", "test") // namespace to write to
  .option("aerospike.writeset", "spark-test") // write to this set
  .option("aerospike.updateByKey", "one") // indicates which column is used to construct the primary key
  .option("aerospike.write.mode", "update")
  .save()
```

### Loading and saving DataFrames

Aerospike Connect for Spark loads data from Aerospike into a DataFrame and saves a DataFrame into Aerospike.

#### Loading data

```scala
val thingsDF = sqlContext.read.
  format("aerospike").
  option("aerospike.seedhost", "localhost:3000").
  option("aerospike.namespace", "test").
  option("aerospike.set", "spark-test").
  load()

thingsDF.show()
```

You can see that the read is configured by a number of options:

-   `format("aerospike")` specifies the Aerospike data source used to load the DataFrame.
-   `option("aerospike.seedhost", "localhost:3000")` specifies a seed host and port in the Aerospike cluster.
-   `option("aerospike.namespace", "test")` specifies the namespace to read from.
-   `option("aerospike.set", "spark-test")` specifies the set to be used, in this case “spark-test”.

Spark SQL can be used to efficiently filter Bin values represented as columns (for example, `where lastName = 'Smith'`). The filter is passed down to the Aerospike cluster, and filtering is done in the server. Here is an example using filtering:

```scala
val thingsDF = sqlContext.read.
  format("aerospike").
  option("aerospike.set", "spark-test").
  option("aerospike.seedhost", "localhost:3000").
  option("aerospike.namespace", "test").
  load()

thingsDF.createOrReplaceTempView("things")
thingsDF.show()

val filteredThings = sqlContext.sql("select * from things where one = 55")
filteredThings.show()
```

Additional meta-data columns are automatically included when reading from Aerospike. The default names are:

-   `__key` the value of the primary key, if it is stored in Aerospike
-   `__digest` the digest, as Array\[byte\]
-   `__generation` the generation value of the record read
-   `__expiry` the expiration epoch
-   `__ttl` the time to live, calculated as the expiration minus the current time
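
Because these appear as ordinary DataFrame columns, they can be selected like any other column. A minimal sketch, assuming the `thingsDF` loaded above:

```scala
// Inspect record metadata alongside the data columns (assumes thingsDF from the read above).
thingsDF.select("__key", "__digest", "__generation", "__expiry", "__ttl").show()
```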

These meta-data column name defaults can be changed by using additional options during read or write, for example:

```scala
val thingsDF = sqlContext.read.
  format("aerospike").
  option("aerospike.seedhost", "localhost:3000").
  option("aerospike.namespace", "test").
  option("aerospike.set", "spark-test").
  option("aerospike.expiryColumn", "_my_expiry_column").
  load()

thingsDF.show()
```

#### Saving data

A DataFrame can be saved in Aerospike by specifying a column in the DataFrame as the Primary Key or the Digest.

##### Saving by Digest

In this example, the value of the digest is specified by the `__digest` column in the DataFrame.

```scala
import org.apache.spark.sql.SaveMode

val allAerospikeConfig = Map(
  "aerospike.seedhost" -> "localhost:3000",
  "aerospike.namespace" -> "test",
  "aerospike.set" -> "spark-test")

val thingsDF = sqlContext.read.
  format("aerospike").
  options(allAerospikeConfig).
  load()

thingsDF.show()

thingsDF.write.
  mode(SaveMode.Overwrite).
  format("aerospike").
  option("aerospike.seedhost", "localhost:3000").
  option("aerospike.namespace", "test").
  option("aerospike.set", "spark-test").
  option("aerospike.updateByDigest", "__digest").
  save()
```

##### Saving by Key

In this example, the value of the primary key is specified by the `key` column in the DataFrame.

```scala
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode

val schema = new StructType(Array(
  StructField("key", StringType, nullable = false),
  StructField("last", StringType, nullable = true),
  StructField("first", StringType, nullable = true),
  StructField("when", LongType, nullable = true)
))

val rows = Seq(
  Row("Fraser_Malcolm", "Fraser", "Malcolm", 1975L),
  Row("Hawke_Bob", "Hawke", "Bob", 1983L),
  Row("Keating_Paul", "Keating", "Paul", 1991L),
  Row("Howard_John", "Howard", "John", 1996L),
  Row("Rudd_Kevin", "Rudd", "Kevin", 2007L),
  Row("Gillard_Julia", "Gillard", "Julia", 2010L),
  Row("Abbott_Tony", "Abbott", "Tony", 2013L),
  Row("Turnbull_Malcolm", "Turnbull", "Malcolm", 2015L)
)

val inputRDD = sc.parallelize(rows)
val newDF = sqlContext.createDataFrame(inputRDD, schema)

newDF.write.
  mode(SaveMode.Append).
  format("aerospike").
  option("aerospike.seedhost", "localhost:3000").
  option("aerospike.namespace", "test").
  option("aerospike.set", "spark-test").
  option("aerospike.updateByKey", "key").
  save()
```

##### Using TTL while saving

Time to live (TTL) can be set individually on each record. The TTL should be stored in a column of the DataFrame before it is saved.

To enable updates to TTL, an additional option is specified:

```plaintext
option("aerospike.ttlColumn", "expiry")
```
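
For example, a minimal sketch building on the `simpleDF` from the earlier write example; the `expiry` column and its 300-second TTL value are illustrative assumptions:

```scala
import org.apache.spark.sql.functions.lit

// Add an illustrative "expiry" column carrying each record's TTL (assumed to be in seconds).
val dfWithTtl = simpleDF.withColumn("expiry", lit(300L))

dfWithTtl.write.
  mode("append").
  format("aerospike").
  option("aerospike.seedhost", "localhost:3000").
  option("aerospike.namespace", "test").
  option("aerospike.writeset", "spark-test").
  option("aerospike.updateByKey", "one").
  option("aerospike.ttlColumn", "expiry"). // take each record's TTL from the "expiry" column
  save()
```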

### Using TLS

The following example illustrates how to use TLS when executing a sample Spark write.

```scala
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode

val schema = new StructType(Array(
  StructField("key", StringType, nullable = false),
  StructField("last", StringType, nullable = true),
  StructField("first", StringType, nullable = true),
  StructField("when", LongType, nullable = true)
))

val rows = Seq(
  Row("Fraser_Malcolm", "Fraser", "Malcolm", 1975L),
  Row("Hawke_Bob", "Hawke", "Bob", 1983L),
  Row("Keating_Paul", "Keating", "Paul", 1991L),
  Row("Howard_John", "Howard", "John", 1996L),
  Row("Rudd_Kevin", "Rudd", "Kevin", 2007L),
  Row("Gillard_Julia", "Gillard", "Julia", 2010L),
  Row("Abbott_Tony", "Abbott", "Tony", 2013L),
  Row("Turnbull_Malcolm", "Turnbull", "Malcolm", 2015L)
)

val inputRDD = sc.parallelize(rows)
val newDF = sqlContext.createDataFrame(inputRDD, schema)

newDF.write.
  mode(SaveMode.Append).
  format("aerospike").
  option("aerospike.seedhost", "localhost:3000").
  option("aerospike.namespace", "test").
  option("aerospike.set", "spark-test").
  option("aerospike.updateByKey", "key").
  option("aerospike.tls.enabletls", "true").
  option("aerospike.tls.truststore-store-file", "/tls/ca.aerospike.com.truststore.jks").
  option("aerospike.tls.truststore-store-password-file", "/tls/storepass").
  option("aerospike.tls.truststore-key-password-file", "/tls/keypass").
  option("aerospike.tls.truststore-store-type", "JKS").
  option("aerospike.tls.keystore-store-file", "/tls/connector.aerospike.com.keystore.jks").
  option("aerospike.tls.keystore-store-password-file", "/tls/storepass").
  option("aerospike.tls.keystore-key-password-file", "/tls/keypass").
  option("aerospike.tls.keystore-store-type", "JKS").
  save()
```

::: note
To set up TLS, you must provide all truststore-related properties that do not have default values. All artifacts, such as certificates and truststores, must be present on all Spark nodes; the Aerospike Spark connector reads these artifacts only from local paths. See [configuration](https://aerospike.com/docs/connectors/spark/configuration#authentication) for all authentication-related configuration options.
:::

### Schema

Aerospike is schema-less, while Spark DataFrames require a schema. To infer one, Aerospike Connect for Spark scans a sample of 100 records, reads the Bin names, and infers each Bin's type.

The number of records scanned can be changed by using the option:

```plaintext
option("aerospike.schema.scan", 20)
```
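
For example, a minimal sketch of the earlier read, sampling only 20 records during schema inference:

```scala
val thingsDF = sqlContext.read.
  format("aerospike").
  option("aerospike.seedhost", "localhost:3000").
  option("aerospike.namespace", "test").
  option("aerospike.set", "spark-test").
  option("aerospike.schema.scan", 20).
  load()
```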

Note: the schema is derived each time `load()` is called. If you call `load()` before the Aerospike namespace/set has any data, only the meta-data columns will be available.

**For data that includes complex types such as maps and lists, we recommend specifying the schema explicitly in production to ensure types and structure are handled accurately.**
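
As a sketch, an explicit schema can be supplied through Spark's standard `DataFrameReader.schema(...)` call; the bin names and types below are assumptions for illustration:

```scala
import org.apache.spark.sql.types._

// Hypothetical schema for a set whose "name" bin is a string and whose "tags" bin
// holds a list of strings.
val explicitSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("tags", ArrayType(StringType), nullable = true)
))

val taggedDF = sqlContext.read.
  format("aerospike").
  option("aerospike.seedhost", "localhost:3000").
  option("aerospike.namespace", "test").
  option("aerospike.set", "spark-test").
  schema(explicitSchema).
  load()
```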