# Key lookup with aerolookup

`aerolookup` performs rapid lookups of Aerospike records that correspond to a set of keys loaded into a Spark DataFrame (DF), whether streaming or static. It is easy to use and is available in the Python, Scala, and Java client libraries. It also benefits from the performance of Aerospike's internal batch-read calls.

Follow the tutorial [Reading from and Writing to DataFrames](https://aerospike.com/docs/connectors/spark/tutorials/readwrite/) to load the records of an Aerospike set into a Spark DataFrame.

```scala
/**
 * The aerolookup API uses the contents of a Spark DataFrame column to perform large-scale lookups
 * in the Aerospike database. Columns specified in `outputSchema` are used to retrieve the relevant
 * bins from the Aerospike database and construct the columns of the resultant DataFrame. All
 * properties related to the batch-get read can be passed in `inputConf`. The contents of `inputConf`
 * are merged with the Aerospike properties present in the DataFrame's Spark session runtime
 * properties to populate the Aerospike-specific properties for the read operation. Aerospike-specific
 * properties such as `aerospike.seedhost` or `aerospike.transaction.rate` can be passed through this
 * map. To populate meta columns via `aerospike.keyColumn` or `aerospike.digestColumn`, the
 * corresponding columns must be present in `outputSchema` and the properties must be passed in
 * `inputConf`. This is necessary because the connector needs to know which columns in the output
 * schema should be set with the metadata values retrieved from the database.
 *
 * @param dataFrame    Spark DataFrame; can be streaming or static.
 * @param keyCol       column of the DataFrame used to construct the primary key for the lookup.
 * @param set          set name in the database.
 * @param outputSchema schema of the resultant DataFrame. This schema populates the fields
 *                     of the resultant DataFrame.
 * @param namespace    namespace of the `set`.
 * @param inputConf    map of properties. These properties are merged with the Aerospike
 *                     properties extracted from the Spark runtime config.
 * @return the resultant DataFrame after the lookup.
 */

// Scala
aerolookup(dataFrame: DataFrame,
  keyCol: String,
  set: String,
  outputSchema: StructType,
  namespace: String,
  inputConf: Map[String, Any] = Map[String, Any]()): DataFrame
```
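
For example, to have the resultant DataFrame carry each record's digest, the digest column must appear in the output schema and be named through `aerospike.digestColumn` in `inputConf`. A minimal sketch, assuming the connector's usual `__digest` meta-column name and `BinaryType`, where `keysDF` stands for your key DataFrame:

```scala
import com.aerospike.spark._
import org.apache.spark.sql.types._

// Output schema includes a binary column to receive each record's digest.
val schemaWithDigest = StructType(Seq(
  StructField("__digest", BinaryType, nullable = true),
  StructField("col2", StringType, nullable = true)))

// inputConf tells the connector which output column holds the digest.
// keysDF is a hypothetical DataFrame holding the lookup keys in column col1.
val withDigest = aerolookup(keysDF, "col1", "aerolookup", schemaWithDigest, "test",
  Map("aerospike.digestColumn" -> "__digest"))
```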

In the Python version of the API, `outputSchema` is the **JSON schema** of the desired DataFrame (for example, the string returned by a PySpark `StructType`'s `json()` method). The rest of the parameters have the same types as in the Scala API above.

The following example illustrates how to invoke `aerolookup` in Python applications.

```python
# Python
# sc is the SparkContext, spark is the SparkSession object

# import the package
scala_object = sc._jvm.com.aerospike.spark.PythonUtil

# get the Java object representing the dataframe
gateway_df = scala_object.aerolookup(dataframe._jdf, keyCol, set, outputSchemaAsJson, namespace, {})

# convert the Java object to a Python dataframe
aerolookup_df = pyspark.sql.DataFrame(gateway_df, spark._wrapped)
```

Here’s an example of how to use the `aerolookup` function. First, we save some sample data in the Aerospike database:

```scala
import com.aerospike.spark._
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.types._
import scala.collection.mutable.ArrayBuffer

val setName = "aerolookup"
val writeSchema = StructType(Seq(
  StructField("col1", IntegerType, false),
  StructField("col2", StringType, false),
  StructField("col3", LongType, false)
))

val inputDF = {
  val inputBuf = new ArrayBuffer[Row]()
  for (i <- 1 to 10000) {
    val col1 = i
    val col2 = i.toString
    val r = Row(col1, col2, 2 * i.toLong)
    inputBuf.append(r)
  }
  val inputRDD = sc.parallelize(inputBuf)
  spark.createDataFrame(inputRDD, writeSchema)
}

inputDF.write
  .mode(SaveMode.Append)
  .format("aerospike")
  .option("aerospike.writeset", setName)
  .option("aerospike.updateByKey", "col1")
  .save()
```
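
As an optional sanity check, the set can be read back into a DataFrame. This sketch assumes `aerospike.seedhost` and `aerospike.namespace` are already set in the Spark conf:

```scala
// Scan the set back to confirm the write landed.
val writtenDF = spark.read
  .format("aerospike")
  .option("aerospike.set", setName)
  .load()
writtenDF.show(5)
```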

Next, we look up records in the namespace `test` and set `aerolookup`, using column `col1` of the DataFrame `dfInSparkMemory`.

```scala
//schema of the output dataframe
val outputSchema = StructType(Seq(
  StructField("col2", StringType, false),
  StructField("col3", LongType, false)))

import spark.implicits._
val keys = 1 to 1000
val dfInSparkMemory: DataFrame = keys.toDF("col1")

val resultantDF = aerolookup(dfInSparkMemory, "col1", setName, outputSchema, "test")
```
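
Because keys 1 through 1000 were written above, the lookup should return one row with `col2` and `col3` per matched key:

```scala
resultantDF.show(3)
println(resultantDF.count()) // expected: 1000, one row per matched key
```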

You can also set other read-related flags, such as `aerospike.pushdown.expressions`, `aerospike.transaction.rate`, and `aerospike.schema.flexible`, as key-value pairs in the SparkConf object so that they apply to the `aerolookup` API.
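
For example, such flags can be set on the session's runtime config before calling `aerolookup` (the values below are illustrative):

```scala
// aerolookup merges these session runtime properties into its read configuration.
spark.conf.set("aerospike.transaction.rate", "5000")
spark.conf.set("aerospike.schema.flexible", "true")
```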

### Stream processing using aerolookup

```scala
import com.aerospike.spark._
import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.types._

spark.conf.set("aerospike.seedhost", "127.0.0.1")

val simpleschema: StructType = new StructType(
  Array(
    StructField("id", IntegerType, nullable = false),
    StructField("rate_code_id", LongType, nullable = true)
  ))

//streaming source (allParams is an application-level config map defined elsewhere)
val streamreader = spark
  .readStream
  .schema(simpleschema)
  .format(allParams.getOrElse("readformat", "csv"))
  .load("s3-bucket-receiving-streaming-data")

//stream processing using aerolookup
val streamWriter = streamreader.writeStream.foreachBatch((outputDf: DataFrame, bid: Long) => {
  val lookupDF = outputDf.select("id", "rate_code_id").where("id is not null")
    .groupBy("id").agg(sum("rate_code_id").alias("rate_code_id"))

  val aerospikeBatchReadDf = aerolookup(
    lookupDF, // streaming dataframe
    "id", // key column of the streaming dataframe used for the lookup into the DB
    "nyc", //set name
    simpleschema, //schema of the output dataframe
    "test", //namespace
    //config map; note that aerospike.keyColumn must be set if you want
    //the lookup key data to be present in the output dataframe
    Map("aerospike.log.level" -> "info", "aerospike.timeout" -> 864000,
      "aerospike.sockettimeout" -> 864000, "aerospike.keyColumn" -> "id")
  )

  //do some processing
  //(logWarning comes from Spark's internal Logging trait; substitute your own logger if needed)
  val lookupRdd = lookupDF.rdd.map(row => {
    if (Option(row.get(0)).isEmpty || Option(row.get(1)).isEmpty) {
      logWarning(s"lookupDF row with null : ${row.get(0)} row 2: ${row.get(1)}")
    }
    (row.getInt(0), row.getLong(1))
  })

  val aerospikeBatchReadRDD = aerospikeBatchReadDf.rdd
  val aerospikeRdd = aerospikeBatchReadRDD.map(row => {
    if (Option(row.get(0)).isEmpty || Option(row.get(1)).isEmpty) {
      logWarning(s"aerospikeRdd row with null : ${row.get(0)} row 2: ${row.get(1)}")
    }
    (row.getInt(0), row.getLong(1))
  })

  //sum the per-batch aggregate with the value already stored in the DB
  val finalRdd = lookupRdd
    .fullOuterJoin(aerospikeRdd)
    .map(row => {
      val key = row._1
      val impressions_1: Long = row._2._1.getOrElse(0L)
      val impressions_2: Long = row._2._2.getOrElse(0L)
      val impressions = impressions_1 + impressions_2
      Row(key, impressions)
    })

  //write the processed data to the DB
  val aerospikeWriteDf = spark.createDataFrame(finalRdd, simpleschema)
  aerospikeWriteDf.write.mode(SaveMode.Overwrite)
    .format("aerospike")
    .option("aerospike.writeset", "aerospike_spark_agg")
    .option("aerospike.updateByKey", "id")
    .option("aerospike.sendKey", "true")
    .save()
}).start()
```
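
`start()` returns a `StreamingQuery` handle; a typical driver then blocks on it so the stream keeps running:

```scala
// Block the driver until the streaming query is stopped or fails.
streamWriter.awaitTermination()
```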