# Model Serving with Aerospike Feature Store

#### For an interactive Jupyter notebook experience [ ![Binder Hub](https://static.mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/aerospike-examples/interactive-notebooks/main?filepath=spark/feature-store-model-serving.ipynb)

This notebook is the third in the series of notebooks that show how Aerospike can be used as a feature store.

This notebook requires the Aerospike Database and Spark running locally with Aerospike Spark Connector. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the [Aerospike Notebooks Repo](https://github.com/aerospike-examples/interactive-notebooks).

## Introduction

This notebook shows how Aerospike can be used as a Feature Store for Machine Learning applications on Spark using Aerospike Spark Connector. It is Part 3 of the Feature Store series of notebooks, and focuses on Model Serving aspects concerning a Feature Store. The first two notebooks in the series discuss [Feature Engineering](https://aerospike.com/docs/develop/tutorials/applications/feature-store/feature-engineering) and [Model Training](https://aerospike.com/docs/develop/tutorials/applications/feature-store/model-training).

 ![Reference Architecture](https://aerospike.com/docs/_astro/fs-arch.B9xQVJC0_2pM3a4.jpg)

This notebook is organized as follows:

-   Summary of the prior (Data Engineering and Model Training) notebooks
-   Load the trained and saved model for making a prediction.
-   Use Aerospike API to retrieve precomputed features.
-   Implement and test a web service that combines the above elements, that is, accesses features, runs the model and returns the prediction.

## Prerequisites

This tutorial assumes familiarity with the following topics:

-   [Aerospike Notebooks - Readme and Tips](https://aerospike.com/docs/develop/tutorials/intro/readme-tips)
-   [Hello World](https://aerospike.com/docs/develop/tutorials/intro/hello-world-python)
-   [Feature Store with Aerospike (Part 1)](https://aerospike.com/docs/develop/tutorials/applications/feature-store/feature-engineering)
-   [Model Training with Aerospike Feature Store (Part 2)](https://aerospike.com/docs/develop/tutorials/applications/feature-store/model-training)

## Setup

Set up Aerospike Server. Spark Server, and Spark Connector.

### Ensure Database Is Running

This notebook requires that Aerospike database is running.

```python
!asd >& /dev/null

!pgrep -x asd >/dev/null && echo "Aerospike database is running!" || echo "**Aerospike database is not running!**"
```

Output

```plaintext
Aerospike database is running!
```

### Initialize Client

Initialize Python Client used to access features stored in the Aerospike feature store.

```python
import aerospike

import sys

# connect to the database

config = {

  'hosts': [ ('127.0.0.1', 3000) ]

}

try:

  client = aerospike.client(config).connect()

except:

  print("failed to connect to the cluster with", config['hosts'])

  sys.exit(1)

print('Client initialized and connected to database')
```

Output

```plaintext
Client initialized and connected to database
```

### Initialize Spark

We will be using Spark functionality in this notebook.

#### Initialize Paths and Env Variables

```python
# directory where spark notebook requisites are installed

#SPARK_NB_DIR = '/home/jovyan/notebooks/spark'

SPARK_NB_DIR = '/opt/spark-nb'

SPARK_HOME = SPARK_NB_DIR + '/spark-3.0.3-bin-hadoop3.2'
```

```python
# IP Address or DNS name for one host in your Aerospike cluster

AS_HOST ="localhost"

# Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure

AS_NAMESPACE = "test"

AEROSPIKE_SPARK_JAR_VERSION="3.2.0"

AS_PORT = 3000 # Usually 3000, but change here if not

AS_CONNECTION_STRING = AS_HOST + ":"+ str(AS_PORT)
```

```python
# Next we locate the Spark installation using the SPARK_HOME environment variable that you have set

import findspark

findspark.init(SPARK_HOME)
```

```python
# Aerospike Spark Connector related settings

import os

AEROSPIKE_JAR_PATH= "aerospike-spark-assembly-"+AEROSPIKE_SPARK_JAR_VERSION+".jar"

os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars ' + SPARK_NB_DIR + '/' + AEROSPIKE_JAR_PATH + ' pyspark-shell'
```

#### Configure Spark Session

Please visit [Configuring Aerospike Connect for Spark](https://aerospike.com/docs/connectors/spark/configuration) for more information about the properties used on this page.

```python
# imports

import pyspark

from pyspark.context import SparkContext

from pyspark.sql.context import SQLContext

from pyspark.sql.session import SparkSession

from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType
```

```python
sc = SparkContext.getOrCreate()

conf=sc._conf.setAll([("aerospike.namespace",AS_NAMESPACE),("aerospike.seedhost",AS_CONNECTION_STRING)])

sc.stop()

sc = pyspark.SparkContext(conf=conf)

spark = SparkSession(sc)

sqlContext = SQLContext(sc)
```

### Access Shell Commands

You may execute shell commands including Aerospike tools like [aql](https://aerospike.com/docs/database/tools/aql) and [asadm](https://aerospike.com/docs/database/tools/asadm) in the terminal tab throughout this tutorial. Open a terminal tab by selecting File->Open from the notebook menu, and then New->Terminal.

## Prior Context

In the prior two notebooks on Feature Engineering and Model Training, we saved feature data to the feature store, and trained an ML model on it, respectively. If the saved feature data or the model is not available in this environment, you can run the following cells to recreate them.

### Feature Data

Run the following cells ONLY IF the database does not have the feature data for credit card transactions from the prior notebooks (Part 1 or Part 2). You must convert them to `Code` cells before you can run them.

```python
# read and transform the sample credit card transactions data from a csv file

from pyspark.sql.functions import expr

df = spark.read.options(header="True", inferSchema="True") \

    .csv("resources/creditcard_small.csv") \

    . orderBy(['_c0'], ascending=[True])

new_col_names = ['CC1_' + (c if c != '_c0' else 'OldIdx') for c in df.columns]

df = df.toDF(*new_col_names) \

        .withColumn('TxnId', expr('CC1_OldIdx+1').cast(StringType())) \

        .select(['TxnId','CC1_Class','CC1_Amount']+['CC1_V'+str(i) for i in range(1,29)])

#df.toPandas().head()# Save feature values in entity records

ENTITY_TYPE = 'cctxn'

ID_COLUMN = 'TxnId'

df.write \

    .mode('overwrite') \

    .format("aerospike")  \

    .option("aerospike.writeset", ENTITY_TYPE+'-features')\

    .option("aerospike.updateByKey", ID_COLUMN) \

    .save()

print('Features stored to Feature Store.')
```

### Trained Model

If the saved model `fs_model_rf` is not in the resources directory, first extract it from the archive using the following commands in the terminal tab:

Terminal window

```bash
cd /home/jovyan/notebooks/spark/resources

tar -xvzf fs_model_rf.tar.gz
```

## Components of Web Service

The main components of the web service are:

1.  Loading the model
    
2.  Retrieving the required features
    
3.  Running the model to make a prediction
    

### Loading Model

Load the RandomForestClassifier model that we saved in the Model Training notebook (Part 2).

```python
from pyspark.ml.classification import RandomForestClassificationModel

rf_model = RandomForestClassificationModel.read().load("resources/fs_model_rf")

print("Loaded Random Forest Classification model.")
```

Output

```plaintext
Loaded Random Forest Classification model.
```

### Retrieving Features

The Python Client provides a convenient API to access specific features from the entity set as shown below. Recall, the model uses features CC1\_V1 through CC1-V28. We also need to construct a schema for the dataframe which is needed to run the model.

```python
namespace = 'test'

entity_set = 'cctxn-features'

txnid = '5' # dummy value, the web service will get the id from the request params

record_key = (namespace, entity_set, txnid)

features = ["CC1_V"+str(i) for i in range(1,29)] # need features CC1_V1-CC1_V28

schema = StructType()

for i in range(1,29): # all features are of type float or Double

    schema.add("CC1_V"+str(i), DoubleType(), True)

# get the needed features

try:

    (key, meta, bins) = client.select(record_key, features)

except:

  print('failed to get record')

  sys.exit(1)

# create an input dataframe for the model

featureBuf = [tuple([bins[f] for f in features])]

featureRDD = spark.sparkContext.parallelize(featureBuf)

featureDF = spark.createDataFrame(featureRDD, schema)

featureDF.toPandas().transpose()
```

Output

|  | 0 |
| --- | --- |
| CC1\_V1 | \-1.158233 |
| CC1\_V2 | 0.877737 |
| CC1\_V3 | 1.548718 |
| CC1\_V4 | 0.403034 |
| CC1\_V5 | \-0.407193 |
| CC1\_V6 | 0.095921 |
| CC1\_V7 | 0.592941 |
| CC1\_V8 | \-0.270533 |
| CC1\_V9 | 0.817739 |
| CC1\_V10 | 0.753074 |
| CC1\_V11 | \-0.822843 |
| CC1\_V12 | 0.538196 |
| CC1\_V13 | 1.345852 |
| CC1\_V14 | \-1.119670 |
| CC1\_V15 | 0.175121 |
| CC1\_V16 | \-0.451449 |
| CC1\_V17 | \-0.237033 |
| CC1\_V18 | \-0.038195 |
| CC1\_V19 | 0.803487 |
| CC1\_V20 | 0.408542 |
| CC1\_V21 | \-0.009431 |
| CC1\_V22 | 0.798278 |
| CC1\_V23 | \-0.137458 |
| CC1\_V24 | 0.141267 |
| CC1\_V25 | \-0.206010 |
| CC1\_V26 | 0.502292 |
| CC1\_V27 | 0.219422 |
| CC1\_V28 | 0.215153 |

### Running Model

#### Construct Feature Vector

We first construct a feature vector from the input features as required by the model interface. The model only uses fvector column created by VectorAssembler.

```python
from pyspark.ml.feature import VectorAssembler

# create a feature vector from features

assembler = VectorAssembler(inputCols=features, outputCol="fvector")

featureVectorDF = assembler.transform(featureDF)

featureVectorDF.toPandas().transpose()
```

Output

|  | 0 |
| --- | --- |
| CC1\_V1 | \-1.158233 |
| CC1\_V2 | 0.877737 |
| CC1\_V3 | 1.548718 |
| CC1\_V4 | 0.403034 |
| CC1\_V5 | \-0.407193 |
| CC1\_V6 | 0.095921 |
| CC1\_V7 | 0.592941 |
| CC1\_V8 | \-0.270533 |
| CC1\_V9 | 0.817739 |
| CC1\_V10 | 0.753074 |
| CC1\_V11 | \-0.822843 |
| CC1\_V12 | 0.538196 |
| CC1\_V13 | 1.345852 |
| CC1\_V14 | \-1.11967 |
| CC1\_V15 | 0.175121 |
| CC1\_V16 | \-0.451449 |
| CC1\_V17 | \-0.237033 |
| CC1\_V18 | \-0.038195 |
| CC1\_V19 | 0.803487 |
| CC1\_V20 | 0.408542 |
| CC1\_V21 | \-0.009431 |
| CC1\_V22 | 0.798278 |
| CC1\_V23 | \-0.137458 |
| CC1\_V24 | 0.141267 |
| CC1\_V25 | \-0.20601 |
| CC1\_V26 | 0.502292 |
| CC1\_V27 | 0.219422 |
| CC1\_V28 | 0.215153 |
| fvector | \[-1.15823309349523, 0.877736754848451, 1.54871… |

#### Predict

Call the model’s transform function to predict. We input only a dataframe with `fvector` column, and use only two columns from the prediction dataframe record: `probablity` and `prediction`. The threshold for fraud/no-fraud decision is 50%.

```python
rf_prediction = rf_model.transform(featureVectorDF['fvector',])

result = rf_prediction['probability', 'prediction'].collect()[0]

print('normal txn prob: ', result[0][0])

print('fraud prob: ', result[0][1])

print('prediction: ', 'no fraud' if result[0][1] < 0.5  else 'fraud')
```

Output

```plaintext
normal txn prob:  0.9361028509004404

fraud prob:  0.0638971490995595

prediction:  no fraud
```

## Model Serving with Web Service

Let’s create a simple web service that serves the model. We will use the Flask framework to create the web service. The web service takes txnid as the query parameter, retrieves the features from the feature store, runs the model, and returns the prediction.

Note, this model serving example is not realistic as we are using only precomputed features for inference. Also, we have trained and tested the model with the same data. Nonetheless, the example serves the purpose which is to illustrate the use of a feature store for model serving. It should not be difficult to use the patterns shown here to devise a realistic example.

```python
# stop the existing spark session before starting the web service

spark.stop()
```

#### Install Web Service Framework

Open a terminal tab, and install the Flask framework with the following command.

Terminal window

```bash
pip install flask

pip install flask_restful
```

#### Examine Web Service File

First open the file `resources/fs-model-ws.py` that implements the web service using Flask framework, and examine its contents.

Note that it is mostly the code in the above cells organized to run as a Flask web service. You can learn more about Flask [here](https://flask.palletsprojects.com/en/2.0.x/quickstart/#a-minimal-application).

#### Run Web Service

Run the web service by opening a terminal tab and running the following commands in it:

1.  `cd /home/jovyan/notebooks/spark/resources`
    
2.  `python fs-model-ws.py`
    

You can ignore the warning messages. After the “Debugger is active” message, the service is ready to receive requests.

#### Send Requests to Web Service

Let’s call the web service to predict the outcome for a transaction id.

We can submit requests through the `curl` command as in the following example. You can test with a few normal transactions (ids: 1, 2, 3, 10), and a few fraud transactions (ids: 6337, 120506, 150669).

You can query the database to view other fraud and normal transaction ids. As you may recall, TxnId and CC1\_Class are the bins for the transaction id and label respectively.

```python
# Send a request to the model web service running at 127.0.0.1:5000

!curl http://127.0.0.1:5000/?txnid=1
```

Output

```plaintext
{

  "fraud_prob": 0.052470997597310345,

  "normal_prob": 0.9475290024026897,

  "prediction": "no fraud"

}
```

```python
# You can query a transaction in the database.

# remember CC1_Class is the label with 1 indicating a fraudulent transaction

!aql -c "select TxnId, CC1_Class from test.cctxn-features where PK = '6337'"
```

Output

```plaintext
select TxnId, CC1_Class from test.cctxn-features where PK = '6337'

+--------+------------+

| TxnId  | CC1_Class  |

+--------+------------+

| "6337" | 1          |

+--------+------------+

1 row in set (0.000 secs)

OK
```

## Takeaways and Conclusion

In this notebook, we explored how Aerospike can be used as a Feature Store for ML applications. Specifically, we showed how the precomputed features stored in the Aerospike feature store can be used at model serving time. We implemented a simple web service that loads the trained model, and then for each request, retrieves features, runs the model, and returns the model prediction.

This is the third notebook in the series of notebooks on how Aerospike can be used as a feature store. The first and second notebooks discussed [Feature Engineering](https://aerospike.com/docs/develop/tutorials/applications/feature-store/feature-engineering) and [Model Training](https://aerospike.com/docs/develop/tutorials/applications/feature-store/model-training) aspects respectively.

## Cleaning Up

Shut down the web service by hitting Ctrl-C in the tab in which it is running.

Close the spark session, and remove the tutorial data by executing the cell below.

```python
try:

    spark.stop()

except:

    ; # ignore

# To remove all data in the namespace test, uncomment the following line and run:

#!aql -c "truncate test"
```

## Further Exploration and Resources

Here are some links for further exploration.

### Resources

-   Related notebooks
    -   [Feature Store with Aerospike (Part 1)](https://aerospike.com/docs/develop/tutorials/applications/feature-store/feature-engineering)
    -   [Model Training with Aerospike Feature Store (Part 2)](https://aerospike.com/docs/develop/tutorials/applications/feature-store/model-training)
    -   [Aerospike Connect for Spark Tutorial for Python](https://aerospike.com/docs/develop/tutorials/spark/spark-connector-python)
-   Related blog posts
    -   [Let AI/ML workloads take off with Aerospike and Spark 3.0](https://medium.com/aerospike-developer-blog/let-ai-ml-workloads-take-off-with-aerospike-and-spark-3-0-82de2d834b99)
    -   [Using Aerospike Connect For Spark](https://medium.com/aerospike-developer-blog/aerospike-is-a-highly-scalable-key-value-database-offering-best-in-class-performance-5922450aaa78)
-   GitHub repos
    -   [Spark Aerospike Example](https://github.com/aerospike-examples/spark-aerospike-example)

### Exploring Other Notebooks

Visit [Aerospike notebooks repo](https://github.com/aerospike-examples/interactive-notebooks) to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open in the notebook menu, and select Upload.