# Aerospike Spark Connector Tutorial for Scala

Tested with Spark connector 3.1.0, Java 8, Apache Spark 3.0.2, Python 3.7, Scala 2.12.11, and the [Spylon](https://pypi.org/project/spylon-kernel/) kernel.

## Getting Started

### Download the appropriate [Aerospike Connect for Spark](https://aerospike.com/download/connector/spark/)

Set `launcher.jars` to the path of the downloaded connector JAR.

```scala
%%init_spark
launcher.jars = ["aerospike-spark-assembly-3.1.0.jar"]
launcher.master = "local[*]"
```

```scala
// Specify the seed host of the Aerospike server
val AS_HOST = "172.16.39.192:3000"
```

> Output
> 
> ```plaintext
> Intitializing Scala interpreter ...
> 
> Spark Web UI available at http://192.168.106.119:4040
> 
> SparkContext available as 'sc' (version = 3.0.2, master = local[*], app id = local-1626732681010)
> 
> SparkSession available as 'spark'
> 
> AS_HOST: String = 172.16.39.192:3000
> ```

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode
import com.aerospike.spark.sql.AerospikeConnection
import org.apache.spark.sql.SparkSession
```

> Output
> 
> ```plaintext
> import scala.collection.mutable.ArrayBuffer
> 
> import org.apache.spark.sql.Row
> 
> import org.apache.spark.sql.types._
> 
> import org.apache.spark.sql.functions._
> 
> import org.apache.spark.sql.SaveMode
> 
> import com.aerospike.spark.sql.AerospikeConnection
> 
> import org.apache.spark.sql.SparkSession
> ```

## Schema in the Spark Connector

-   Aerospike is schemaless, whereas Spark requires a schema. Once the schema is determined (either inferred or user-provided), the data within the bins must conform to its types.
    
-   To infer the schema, the connector samples a set of records (configurable through `aerospike.schema.scan`) to determine the names of the bins/columns and their types. The derived schema therefore depends entirely on the sampled records.
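
The dependence on sampled records can be illustrated with a toy model. The sketch below is not the connector's inference code: `Record`, `typeName`, and `inferSchema` are hypothetical names, and records are modeled as plain Scala maps. It only shows how a bin that appears outside the sampled records is missing from the derived schema.

```scala
// Toy model only: this is NOT the connector's inference code.
object SchemaSamplingSketch {
  // A record is modeled as bin name -> value.
  type Record = Map[String, Any]

  // Name the type of a single bin value (only two types, for brevity).
  def typeName(value: Any): String = value match {
    case _: Long   => "long"
    case _: String => "string"
    case _         => "unknown"
  }

  // Derive bin names and types from the first `scanSize` records only.
  def inferSchema(records: Seq[Record], scanSize: Int): Map[String, String] =
    records.take(scanSize)
      .flatMap(_.toSeq)
      .map { case (bin, value) => bin -> typeName(value) }
      .toMap

  val records: Seq[Record] = Seq(
    Map("id" -> 1L, "name" -> "name1"),
    Map("id" -> 2L, "name" -> "name2"),
    Map("id" -> 3L, "name" -> "name3", "salary" -> 50000L) // only this record has `salary`
  )

  val narrow: Map[String, String] = inferSchema(records, scanSize = 2) // does not see `salary`
  val wide: Map[String, String]   = inferSchema(records, scanSize = 3) // sees `salary`
}
```

In this model, `narrow` has no `salary` column while `wide` does, which is why providing a schema is the safer choice when bins vary across records.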
    

::: note
A provided schema that lists only bins does not include `__key`. To query using `__key`, add it to the provided schema with the appropriate type. Metadata columns such as `__generation` or `__ttl` can be added in the same way.

```scala
val schemaWithPK: StructType = new StructType(Array(
  StructField("__key", IntegerType, nullable = false),
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false),
  StructField("salary", IntegerType, nullable = false)))
```
:::
::: note
We recommend that you provide a schema for queries that involve [collection data types](https://aerospike.com/docs/develop/data-types/collections) (CDTs) such as lists, maps, and mixed types. Using schema inference for CDTs may cause unexpected issues.
:::
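
To make the `__key` note concrete, here is a minimal sketch of a read that uses such a schema and filters on `__key`. `KeySchemaSketch` and `readByKey` are hypothetical names, and the sketch assumes a `SparkSession` that already carries the `aerospike.seedhost` and `aerospike.namespace` settings, as configured in the next section.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object KeySchemaSketch {
  // Same fields as in the note above: the bins plus the `__key` column.
  val schemaWithPK: StructType = StructType(Seq(
    StructField("__key", IntegerType, nullable = false),
    StructField("id", IntegerType, nullable = false),
    StructField("name", StringType, nullable = false),
    StructField("age", IntegerType, nullable = false),
    StructField("salary", IntegerType, nullable = false)))

  // Hypothetical helper: loads one set with the explicit schema and filters on `__key`.
  // Assumes `spark` is already configured with the Aerospike seed host and namespace.
  def readByKey(spark: SparkSession, setName: String, key: Int): DataFrame =
    spark.read
      .format("aerospike")
      .schema(schemaWithPK)
      .option("aerospike.set", setName)
      .load()
      .where(col("__key") === key)
}
```

Once the sample data has been written, a call such as `KeySchemaSketch.readByKey(spark2, "scala_input_data", 829).show()` would be one way to use it.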

## Create sample data and write it into Aerospike Database

```scala
// Configure the Aerospike connection on a copy of the current Spark configuration
val conf = sc.getConf.clone()
conf.set("aerospike.seedhost", AS_HOST)
conf.set("aerospike.namespace", "test")
conf.set("aerospike.log.level", "info")

// Close the default session and create one that carries the Aerospike settings
spark.close()
val spark2 = SparkSession.builder().config(conf).master("local[2]").getOrCreate()

// Create test data
val num_records = 1000
val rand = scala.util.Random

val schema: StructType = new StructType(
  Array(
    StructField("id", IntegerType, nullable = false),
    StructField("name", StringType, nullable = false),
    StructField("age", IntegerType, nullable = false),
    StructField("salary", IntegerType, nullable = false)
  ))

val inputDF = {
  val inputBuf = new ArrayBuffer[Row]()
  for (i <- 1 to num_records) {
    val name = "name" + i
    val age = i % 100
    val salary = 50000 + rand.nextInt(50000)
    val id = i
    val r = Row(id, name, age, salary)
    inputBuf.append(r)
  }
  val inputRDD = spark2.sparkContext.parallelize(inputBuf.toSeq)
  spark2.createDataFrame(inputRDD, schema)
}

inputDF.show(10)

// Write the sample data to Aerospike
inputDF.write.mode(SaveMode.Overwrite)
  .format("aerospike") // Aerospike-specific format
  .option("aerospike.writeset", "scala_input_data") // write to this set
  .option("aerospike.updateByKey", "id") // column used to construct the primary key
  .option("aerospike.sendKey", "true")
  .save()
```

> Output
> 
> ```plaintext
> +---+------+---+------+
> 
> | id|  name|age|salary|
> 
> +---+------+---+------+
> 
> |  1| name1|  1| 71425|
> 
> |  2| name2|  2| 64969|
> 
> |  3| name3|  3| 76504|
> 
> |  4| name4|  4| 86652|
> 
> |  5| name5|  5| 72894|
> 
> |  6| name6|  6| 80305|
> 
> |  7| name7|  7| 68467|
> 
> |  8| name8|  8| 91715|
> 
> |  9| name9|  9| 81021|
> 
> | 10|name10| 10| 85134|
> 
> +---+------+---+------+
> 
> only showing top 10 rows
> 
> conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@43fd6f26
> 
> spark2: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@32d3db7d
> 
> num_records: Int = 1000
> 
> rand: util.Random.type = scala.util.Random$@698c02f8
> 
> schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,false), StructField(name,StringType,false), StructField(age,IntegerType,false), StructField(salary,IntegerType,false))
> 
> inputDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
> ```

### Using Spark SQL syntax

```scala
/*
Aerospike DB needs a primary key for record insertion. You must therefore identify the
primary key column when writing data to the DB, for example with
.option("aerospike.updateByKey", "id"), where "id" is the name of the column to use
as the primary key.
*/
val insertDFWithSchema = spark2
  .sqlContext
  .read
  .format("aerospike")
  .schema(schema)
  .option("aerospike.set", "scala_input_data")
  .load()

val sqlView = "inserttable"
insertDFWithSchema.createOrReplaceTempView(sqlView)

// The V2 data source doesn't allow inserting into a view.
spark2.sql(s"select * from $sqlView").show()
```

> Output
> 
> ```plaintext
> +---+-------+---+------+
> 
> | id|   name|age|salary|
> 
> +---+-------+---+------+
> 
> |132|name132| 32| 54949|
> 
> |647|name647| 47| 96580|
> 
> | 45| name45| 45| 82480|
> 
> |558|name558| 58| 91583|
> 
> |608|name608|  8| 83286|
> 
> |687|name687| 87| 96887|
> 
> |372|name372| 72| 64562|
> 
> |335|name335| 35| 59378|
> 
> |911|name911| 11| 93950|
> 
> |352|name352| 52| 57325|
> 
> | 94| name94| 94| 70982|
> 
> |890|name890| 90| 55053|
> 
> |334|name334| 34| 88603|
> 
> |907|name907|  7| 87233|
> 
> |148|name148| 48| 62904|
> 
> |315|name315| 15| 64200|
> 
> |163|name163| 63| 62598|
> 
> |882|name882| 82| 63215|
> 
> |602|name602|  2| 73376|
> 
> |673|name673| 73| 98636|
> 
> +---+-------+---+------+
> 
> only showing top 20 rows
> 
> insertDFWithSchema: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
> 
> sqlView: String = inserttable
> ```

## Use connector schema inference to load data into a DataFrame without specifying any schema

```scala
// Create a Spark DataFrame by using the connector's schema inference mechanism
val loadedDFWithoutSchema = spark2
  .sqlContext
  .read
  .format("aerospike")
  .option("aerospike.set", "scala_input_data") // read the data from this set
  .load

loadedDFWithoutSchema.printSchema()
// Notice that the schema of the loaded data has some additional fields.
// When the connector infers the schema, it also adds internal metadata.
```

> Output
> 
> ```plaintext
> root
> 
>  |-- __key: string (nullable = true)
> 
>  |-- __digest: binary (nullable = true)
> 
>  |-- __expiry: integer (nullable = false)
> 
>  |-- __generation: integer (nullable = false)
> 
>  |-- __ttl: integer (nullable = false)
> 
>  |-- age: long (nullable = true)
> 
>  |-- name: string (nullable = true)
> 
>  |-- salary: long (nullable = true)
> 
>  |-- id: long (nullable = true)
> 
> loadedDFWithoutSchema: org.apache.spark.sql.DataFrame = [__key: string, __digest: binary ... 7 more fields]
> ```
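
If only the bins are needed after an inferred load, the metadata columns can be projected away. The sketch below assumes that every metadata column starts with `__`, as in the schema printed above; `BinColumnsSketch`, `binColumns`, and `dropMetadata` are hypothetical helper names, not part of the connector.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object BinColumnsSketch {
  // Assumption: connector metadata columns all start with "__", as in the printed schema.
  // A user bin whose name starts with "__" would be dropped as well.
  def binColumns(allColumns: Seq[String]): Seq[String] =
    allColumns.filterNot(_.startsWith("__"))

  // Keeps only the bin columns of a DataFrame loaded through schema inference.
  def dropMetadata(df: DataFrame): DataFrame =
    df.select(binColumns(df.columns.toSeq).map(col): _*)
}
```

For example, `BinColumnsSketch.dropMetadata(loadedDFWithoutSchema)` would keep `age`, `name`, `salary`, and `id`.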

## Load data into a DataFrame with a user-specified schema

```scala
// Data can be loaded with a known schema as well.
val loadedDFWithSchema = spark2
  .sqlContext
  .read
  .format("aerospike")
  .schema(schema)
  .option("aerospike.set", "scala_input_data")
  .load

loadedDFWithSchema.show(5)
```

> Output
> 
> ```plaintext
> +---+-------+---+------+
> 
> | id|   name|age|salary|
> 
> +---+-------+---+------+
> 
> |132|name132| 32| 54949|
> 
> |647|name647| 47| 96580|
> 
> | 45| name45| 45| 82480|
> 
> |558|name558| 58| 91583|
> 
> |608|name608|  8| 83286|
> 
> +---+-------+---+------+
> 
> only showing top 5 rows
> 
> loadedDFWithSchema: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
> ```

## Writing Sample Collection Data Types (CDT) data into Aerospike

```scala
val complex_data_json = "resources/nested_data.json"

val alias = StructType(List(
  StructField("first_name", StringType, false),
  StructField("last_name", StringType, false)))

val name = StructType(List(
  StructField("first_name", StringType, false),
  StructField("aliases", ArrayType(alias), false)))

val street_adress = StructType(List(
  StructField("street_name", StringType, false),
  StructField("apt_number", IntegerType, false)))

val address = StructType(List(
  StructField("zip", LongType, false),
  StructField("street", street_adress, false),
  StructField("city", StringType, false)))

val workHistory = StructType(List(
  StructField("company_name", StringType, false),
  StructField("company_address", address, false),
  StructField("worked_from", StringType, false)))

val person = StructType(List(
  StructField("name", name, false, Metadata.empty),
  StructField("SSN", StringType, false, Metadata.empty),
  StructField("home_address", ArrayType(address), false),
  StructField("work_history", ArrayType(workHistory), false)))

// Read the nested JSON file with the explicit schema
val cmplx_data_with_schema = spark2.read.schema(person).json(complex_data_json)
cmplx_data_with_schema.printSchema()

cmplx_data_with_schema.write.mode(SaveMode.Overwrite)
  .format("aerospike") // Aerospike-specific format
  .option("aerospike.seedhost", AS_HOST) // DB hostname; multiple hosts can be added, delimited with ":"
  .option("aerospike.namespace", "test") // use this namespace
  .option("aerospike.writeset", "scala_complex_input_data") // write to this set
  .option("aerospike.updateByKey", "name.first_name") // column used to construct the primary key
  .save()
```

> Output
> 
> ```plaintext
> root
> 
>  |-- name: struct (nullable = true)
> 
>  |    |-- first_name: string (nullable = true)
> 
>  |    |-- aliases: array (nullable = true)
> 
>  |    |    |-- element: struct (containsNull = true)
> 
>  |    |    |    |-- first_name: string (nullable = true)
> 
>  |    |    |    |-- last_name: string (nullable = true)
> 
>  |-- SSN: string (nullable = true)
> 
>  |-- home_address: array (nullable = true)
> 
>  |    |-- element: struct (containsNull = true)
> 
>  |    |    |-- zip: long (nullable = true)
> 
>  |    |    |-- street: struct (nullable = true)
> 
>  |    |    |    |-- street_name: string (nullable = true)
> 
>  |    |    |    |-- apt_number: integer (nullable = true)
> 
>  |    |    |-- city: string (nullable = true)
> 
>  |-- work_history: array (nullable = true)
> 
>  |    |-- element: struct (containsNull = true)
> 
>  |    |    |-- company_name: string (nullable = true)
> 
>  |    |    |-- company_address: struct (nullable = true)
> 
>  |    |    |    |-- zip: long (nullable = true)
> 
>  |    |    |    |-- street: struct (nullable = true)
> 
>  |    |    |    |    |-- street_name: string (nullable = true)
> 
>  |    |    |    |    |-- apt_number: integer (nullable = true)
> 
>  |    |    |    |-- city: string (nullable = true)
> 
>  |    |    |-- worked_from: string (nullable = true)
> 
> complex_data_json: String = resources/nested_data.json
> 
> alias: org.apache.spark.sql.types.StructType = StructType(StructField(first_name,StringType,false), StructField(last_name,StringType,false))
> 
> name: org.apache.spark.sql.types.StructType = StructType(StructField(first_name,StringType,false), StructField(aliases,ArrayType(StructType(StructField(first_name,StringType,false), StructField(last_name,StringType,false)),true),false))
> 
> street_adress: org.apache.spark.sql.types.StructType = StructType(StructField(street_name,StringType,false), StructField(apt_number,IntegerType,false))
> 
> address: org.apache.spark.sql.types.StructType = StructType(StructField(zip,LongType,false), StructField(street,StructType(StructField(street_name,StringType,false), StructField(apt_number,IntegerType,false)),fal...
> ```

## Load Collection Data Types (CDT) into a DataFrame with a user-specified schema

```scala
val loadedComplexDFWithSchema = spark2
  .sqlContext
  .read
  .format("aerospike")
  .option("aerospike.set", "scala_complex_input_data") // read the data from this set
  .schema(person)
  .load

loadedComplexDFWithSchema.show(2)
loadedComplexDFWithSchema.printSchema()
loadedComplexDFWithSchema.cache()

// Note the difference in the types of the loaded data in both cases.
// With a schema, the complex types are loaded exactly as specified.
```

> Output
> 
> ```plaintext
> +--------------------+-----------+--------------------+--------------------+
> 
> |                name|        SSN|        home_address|        work_history|
> 
> +--------------------+-----------+--------------------+--------------------+
> 
> |[Kurt, [[Jacob, R...|533-07-8760|[[37634, [Hammond...|[[Atkins Group, [...|
> 
> |[Jamie, [[Patrici...|569-31-4715|[[53379, [James I...|[[Brown, Miller a...|
> 
> +--------------------+-----------+--------------------+--------------------+
> 
> only showing top 2 rows
> 
> root
> 
>  |-- name: struct (nullable = false)
> 
>  |    |-- first_name: string (nullable = false)
> 
>  |    |-- aliases: array (nullable = false)
> 
>  |    |    |-- element: struct (containsNull = true)
> 
>  |    |    |    |-- first_name: string (nullable = false)
> 
>  |    |    |    |-- last_name: string (nullable = false)
> 
>  |-- SSN: string (nullable = false)
> 
>  |-- home_address: array (nullable = false)
> 
>  |    |-- element: struct (containsNull = true)
> 
>  |    |    |-- zip: long (nullable = false)
> 
>  |    |    |-- street: struct (nullable = false)
> 
>  |    |    |    |-- street_name: string (nullable = false)
> 
>  |    |    |    |-- apt_number: integer (nullable = false)
> 
>  |    |    |-- city: string (nullable = false)
> 
>  |-- work_history: array (nullable = false)
> 
>  |    |-- element: struct (containsNull = true)
> 
>  |    |    |-- company_name: string (nullable = false)
> 
>  |    |    |-- company_address: struct (nullable = false)
> 
>  |    |    |    |-- zip: long (nullable = false)
> 
>  |    |    |    |-- street: struct (nullable = false)
> 
>  |    |    |    |    |-- street_name: string (nullable = false)
> 
>  |    |    |    |    |-- apt_number: integer (nullable = false)
> 
>  |    |    |    |-- city: string (nullable = false)
> 
>  |    |    |-- worked_from: string (nullable = false)
> 
> loadedComplexDFWithSchema: org.apache.spark.sql.DataFrame = [name: struct<first_name: string, aliases: array<struct<first_name:string,last_name:string>>>, SSN: string ... 2 more fields]
> 
> res5: loadedComplexDFWithSchema.type = [name: struct<first_name: string, aliases: array<struct<first_name:string,last_name:string>>>, SSN: string ... 2 more fields]
> ```

## Querying Aerospike Data using SparkSQL

### Things to keep in mind

1.  Queries whose predicate involves the primary key or digest trigger [aerospike\_batch\_get()](https://aerospike.com/docs/develop/client/java/usage/multi/batch/) and run very quickly. An example is a query on `__key` or `__digest` with no `OR` between two bins.
2.  All other queries may entail a full scan of the Aerospike DB if they cannot be converted to an Aerospike batch get.

## Queries that include Primary Key in the Predicate

In batch get queries, filters can also be applied to metadata columns such as `__generation` or `__ttl`. To do so, expose these columns through the schema (if a schema is provided).

```scala
val batchGet1 = spark2.sqlContext
  .read
  .format("aerospike")
  .option("aerospike.set", "scala_input_data")
  .option("aerospike.keyType", "int") // hints the primary key (PK) type when no schema is provided
  .load.where("__key = 829")

batchGet1.show()
// Be aware that the Aerospike database supports only equality tests on PKs in a primary key query.
// A where clause such as "__key > 10" therefore results in a scan query.
```

> Output
> 
> ```plaintext
> +-----+--------------------+--------+------------+-----+---+-------+------+---+
> 
> |__key|            __digest|__expiry|__generation|__ttl|age|   name|salary| id|
> 
> +-----+--------------------+--------+------------+-----+---+-------+------+---+
> 
> |  829|[C0 B6 C4 DE 68 D...|       0|           1|   -1| 29|name829| 92238|829|
> 
> +-----+--------------------+--------+------------+-----+---+-------+------+---+
> 
> batchGet1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [__key: int, __digest: binary ... 7 more fields]
> ```

```scala
// This query applies an OR between PK subqueries
val somePrimaryKeys = 1.to(10).toSeq
val someMoreKeys = 12.to(14).toSeq

val batchGet2 = spark2.sqlContext
  .read
  .format("aerospike")
  .option("aerospike.set", "scala_input_data")
  .option("aerospike.keyType", "int") // hints the primary key (PK) type when the schema is inferred
  .load.where(col("__key").isin(somePrimaryKeys: _*) || col("__key").isin(someMoreKeys: _*))

batchGet2.show(15)
// We should get 13 records in total.
```

> Output
> 
> ```plaintext
> +-----+--------------------+--------+------------+-----+---+------+------+---+
> 
> |__key|            __digest|__expiry|__generation|__ttl|age|  name|salary| id|
> 
> +-----+--------------------+--------+------------+-----+---+------+------+---+
> 
> |    1|[89 31 AB FE 54 D...|       0|           1|   -1|  1| name1| 71425|  1|
> 
> |    4|[93 F1 65 F0 E8 9...|       0|           1|   -1|  4| name4| 86652|  4|
> 
> |    3|[D4 A1 0B A5 12 0...|       0|           1|   -1|  3| name3| 76504|  3|
> 
> |    7|[30 94 D4 E7 9E 8...|       0|           1|   -1|  7| name7| 68467|  7|
> 
> |    5|[3E F5 94 A9 3A A...|       0|           1|   -1|  5| name5| 72894|  5|
> 
> |   14|[06 66 ED 38 08 F...|       0|           1|   -1| 14|name14| 53533| 14|
> 
> |   13|[EA 78 AB 39 FC C...|       0|           1|   -1| 13|name13| 98475| 13|
> 
> |    2|[41 DB A8 23 03 4...|       0|           1|   -1|  2| name2| 64969|  2|
> 
> |    8|[60 AB E7 17 C8 5...|       0|           1|   -1|  8| name8| 91715|  8|
> 
> |    9|[1B 6D CD D8 D2 5...|       0|           1|   -1|  9| name9| 81021|  9|
> 
> |    6|[C2 4D 37 CC 2B 2...|       0|           1|   -1|  6| name6| 80305|  6|
> 
> |   12|[F8 4E EC 27 8F 1...|       0|           1|   -1| 12|name12| 80583| 12|
> 
> |   10|[8D 0F 84 CD B0 7...|       0|           1|   -1| 10|name10| 85134| 10|
> 
> +-----+--------------------+--------+------------+-----+---+------+------+---+
> 
> somePrimaryKeys: scala.collection.immutable.Range = Range 1 to 10
> 
> someMoreKeys: scala.collection.immutable.Range = Range 12 to 14
> 
> batchGet2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [__key: int, __digest: binary ... 7 more fields]
> ```

## Queries that do not include Primary Key in the Predicate

```scala
val somePrimaryKeys = 1.to(10).toSeq

val scanQuery1 = spark2.sqlContext
  .read
  .format("aerospike")
  .option("aerospike.set", "scala_input_data")
  .option("aerospike.keyType", "int") // hints the primary key (PK) type when the schema is inferred
  .load.where(col("__key").isin(somePrimaryKeys: _*) || col("age") > 50)

scanQuery1.show()
// Because there is an OR between the PKs and a bin, this is treated as a scan query.
// Primary keys are not stored in bins (by default), hence only filters corresponding to bins are honored.
```

> Output
> 
> ```plaintext
> +-----+--------------------+--------+------------+-----+---+-------+------+---+
> 
> |__key|            __digest|__expiry|__generation|__ttl|age|   name|salary| id|
> 
> +-----+--------------------+--------+------------+-----+---+-------+------+---+
> 
> |  558|[14 80 A2 9D D2 E...|       0|           1|   -1| 58|name558| 91583|558|
> 
> |  687|[1A 30 21 88 39 A...|       0|           1|   -1| 87|name687| 96887|687|
> 
> |  372|[1B 40 51 DD 64 F...|       0|           1|   -1| 72|name372| 64562|372|
> 
> |  352|[23 A0 99 06 1F 7...|       0|           1|   -1| 52|name352| 57325|352|
> 
> |   94|[26 E0 C4 85 CE 9...|       0|           1|   -1| 94| name94| 70982| 94|
> 
> |  890|[26 30 F7 1A D3 A...|       0|           1|   -1| 90|name890| 55053|890|
> 
> |  163|[3E D0 72 42 15 9...|       0|           1|   -1| 63|name163| 62598|163|
> 
> |  882|[3E C0 28 CE F2 5...|       0|           1|   -1| 82|name882| 63215|882|
> 
> |  673|[45 10 C1 D6 80 3...|       0|           1|   -1| 73|name673| 98636|673|
> 
> |  991|[47 A0 D4 EC 12 1...|       0|           1|   -1| 91|name991| 92557|991|
> 
> |  293|[48 40 20 B0 E6 D...|       0|           1|   -1| 93|name293| 82140|293|
> 
> |  679|[57 80 24 4F 1D 3...|       0|           1|   -1| 79|name679| 66054|679|
> 
> |  153|[5D E0 05 75 BF 3...|       0|           1|   -1| 53|name153| 79642|153|
> 
> |  485|[6B 80 7E E1 A4 5...|       0|           1|   -1| 85|name485| 54741|485|
> 
> |  997|[72 10 81 9D E2 E...|       0|           1|   -1| 97|name997| 80278|997|
> 
> |  482|[85 B0 B1 3F 49 A...|       0|           1|   -1| 82|name482| 79900|482|
> 
> |  166|[8A 00 3E 64 19 D...|       0|           1|   -1| 66|name166| 70542|166|
> 
> |  590|[8C 20 A4 28 BE 7...|       0|           1|   -1| 90|name590| 67077|590|
> 
> |  689|[9B 00 70 22 F0 8...|       0|           1|   -1| 89|name689| 93761|689|
> 
> |  895|[9D A0 9D 91 AE 8...|       0|           1|   -1| 95|name895| 76241|895|
> 
> +-----+--------------------+--------+------------+-----+---+-------+------+---+
> 
> only showing top 20 rows
> 
> somePrimaryKeys: scala.collection.immutable.Range = Range 1 to 10
> 
> scanQuery1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [__key: int, __digest: binary ... 7 more fields]
> ```
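
The query examples above can be summarized with a toy model. The sketch below is not the connector's planner: `Predicate` and `usesBatchGet` are hypothetical names, and the rules encode only the cases shown in this tutorial (equality or `isin` on `__key`, an `OR` between PK subqueries, an `OR` with a bin filter, and a range test on `__key`).

```scala
// Toy model only: this is NOT how the connector plans queries internally.
object QueryPlanSketch {
  sealed trait Predicate
  final case class KeyEquals(key: Long) extends Predicate      // __key = 829
  final case class KeyIn(keys: Seq[Long]) extends Predicate    // __key isin (...)
  final case class KeyGreaterThan(key: Long) extends Predicate // __key > 10
  final case class BinFilter(bin: String) extends Predicate    // for example age > 50
  final case class Or(left: Predicate, right: Predicate) extends Predicate

  // True when, by the rules stated in this tutorial, the predicate maps to a batch get.
  def usesBatchGet(p: Predicate): Boolean = p match {
    case KeyEquals(_) | KeyIn(_) => true                                 // equality tests on the PK
    case Or(l, r)                => usesBatchGet(l) && usesBatchGet(r)   // OR of PK subqueries only
    case KeyGreaterThan(_)       => false                                // range on PK: scan
    case BinFilter(_)            => false                                // bin filter: scan
  }
}
```

In this model, `batchGet1` and `batchGet2` map to a batch get, while `scanQuery1` and a `__key > 10` filter map to a scan.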

## Sampling from Aerospike DB

-   Sampling a specified number of records from Aerospike (`aerospike.sample.size`) considerably reduces data movement between the Aerospike and Spark clusters. Depending on the `aerospike.partition.factor` setting, you may get more records than requested, so use this property together with the Spark `limit()` function to get exactly the specified number of records. The sample read is not randomized; if you need a random sample, read more records than you need and apply the Spark `sample()` function. You can also combine sampling with `aerospike.recordspersecond` to control the load on the Aerospike server.

-   For more information, see the [documentation](https://aerospike.com/docs/connectors/spark/configuration) page.
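
The advice about oversampling and then randomizing can be sketched as follows. Note that this sketch uses `orderBy(rand(seed))` followed by `limit(n)` instead of `sample()`, because `sample()` takes a fraction and returns only an approximate row count. `RandomSampleSketch` and `randomizedSample` are hypothetical names, and the helper works on any DataFrame, including one loaded with `aerospike.sample.size`.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.rand

object RandomSampleSketch {
  // Shuffle the rows with a seeded random sort key, then keep exactly `n` of them.
  // Intended for a DataFrame that was already reduced by an oversized sample read.
  def randomizedSample(df: DataFrame, n: Int, seed: Long): DataFrame =
    df.orderBy(rand(seed)).limit(n)
}
```

Sorting by a random key shuffles the whole input, so it is best applied to the already reduced sample rather than to a full set.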
    

```scala
// number_of_spark_partitions (num_sp) = 2^{aerospike.partition.factor}
// total number of records = Math.ceil((float)aerospike.sample.size/num_sp) * (num_sp)
// use a lower partition factor for more accurate sampling
val setname = "scala_input_data"
val sample_size = 101

val df3 = spark2.read.format("aerospike")
  .option("aerospike.partition.factor", "2")
  .option("aerospike.set", setname)
  .option("aerospike.sample.size", "101") // samples approximately the specified number of records
  .load()

val df4 = spark2.read.format("aerospike")
  .option("aerospike.partition.factor", "6")
  .option("aerospike.set", setname)
  .option("aerospike.sample.size", "101") // samples approximately the specified number of records
  .load()

// More records are read than requested because of the partitioning logic tied to the
// partition factor, as described earlier. The Spark limit() function is therefore used
// additionally to return the desired number of records.
val count3 = df3.count()
val count4 = df4.count()

// Note how limit() returns only 101 records from df4, which has 128 records.
val dfWithLimit = df4.limit(101)
val limitCount = dfWithLimit.count()
```

> Output
> 
> ```plaintext
> setname: String = scala_input_data
> 
> sample_size: Int = 101
> 
> df3: org.apache.spark.sql.DataFrame = [__key: string, __digest: binary ... 7 more fields]
> 
> df4: org.apache.spark.sql.DataFrame = [__key: string, __digest: binary ... 7 more fields]
> 
> count3: Long = 104
> 
> count4: Long = 128
> 
> dfWithLimit: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [__key: string, __digest: binary ... 7 more fields]
> 
> limitCount: Long = 101
> ```
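
The counts above follow from the formula given in the code comments. As a small worked check in plain Scala (no Spark required), the sketch below evaluates that formula; `SampleCountSketch`, `sparkPartitions`, and `recordsRead` are hypothetical names introduced only for this illustration.

```scala
object SampleCountSketch {
  // Number of Spark partitions: 2^partitionFactor.
  def sparkPartitions(partitionFactor: Int): Int = 1 << partitionFactor

  // Records read: ceil(sampleSize / partitions) * partitions.
  def recordsRead(sampleSize: Int, partitionFactor: Int): Int = {
    val partitions = sparkPartitions(partitionFactor)
    val perPartition = math.ceil(sampleSize.toDouble / partitions).toInt
    perPartition * partitions
  }
}
```

With a sample size of 101, a partition factor of 2 gives 4 partitions of 26 records (104 in total), and a partition factor of 6 gives 64 partitions of 2 records (128 in total), matching `count3` and `count4`.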