Model Serving with Aerospike Feature Store
This notebook is the third in the series of notebooks that show how Aerospike can be used as a feature store.
This notebook requires the Aerospike Database and Spark running locally with Aerospike Spark Connector. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.
Introduction
This notebook shows how Aerospike can be used as a Feature Store for Machine Learning applications on Spark using Aerospike Spark Connector. It is Part 3 of the Feature Store series of notebooks, and focuses on Model Serving aspects concerning a Feature Store. The first two notebooks in the series discuss Feature Engineering and Model Training.
This notebook is organized as follows:
- Summary of the prior (Feature Engineering and Model Training) notebooks
- Load the trained and saved model for making a prediction.
- Use Aerospike API to retrieve precomputed features.
- Implement and test a web service that combines the above elements, that is, accesses features, runs the model and returns the prediction.
Prerequisites
This tutorial assumes familiarity with the following topics:
- Aerospike Notebooks - Readme and Tips
- Hello World
- Feature Store with Aerospike (Part 1)
- Model Training with Aerospike Feature Store (Part 2)
Setup
Set up the Aerospike Server, Spark Server, and Spark Connector.
Ensure Database Is Running
This notebook requires that the Aerospike database is running.
!asd >& /dev/null
!pgrep -x asd >/dev/null && echo "Aerospike database is running!" || echo "**Aerospike database is not running!**"
Output:
Aerospike database is running!
Initialize Client
Initialize Python Client used to access features stored in the Aerospike feature store.
import aerospike
import sys
# connect to the database
config = {
    'hosts': [('127.0.0.1', 3000)]
}
try:
    client = aerospike.client(config).connect()
except:
    print("failed to connect to the cluster with", config['hosts'])
    sys.exit(1)
print('Client initialized and connected to database')
Output:
Client initialized and connected to database
Initialize Spark
We will be using Spark functionality in this notebook.
Initialize Paths and Env Variables
# directory where spark notebook requisites are installed
#SPARK_NB_DIR = '/home/jovyan/notebooks/spark'
SPARK_NB_DIR = '/opt/spark-nb'
SPARK_HOME = SPARK_NB_DIR + '/spark-3.0.3-bin-hadoop3.2'
# IP Address or DNS name for one host in your Aerospike cluster
AS_HOST = "localhost"
# Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure
AS_NAMESPACE = "test"
AEROSPIKE_SPARK_JAR_VERSION = "3.2.0"
AS_PORT = 3000  # Usually 3000, but change here if not
AS_CONNECTION_STRING = AS_HOST + ":" + str(AS_PORT)
# Next we locate the Spark installation - this will be found using the SPARK_HOME environment variable that you will have set
import findspark
findspark.init(SPARK_HOME)
# Aerospike Spark Connector related settings
import os
AEROSPIKE_JAR_PATH = "aerospike-spark-assembly-" + AEROSPIKE_SPARK_JAR_VERSION + ".jar"
os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars ' + SPARK_NB_DIR + '/' + AEROSPIKE_JAR_PATH + ' pyspark-shell'
Configure Spark Session
Please visit Configuring Aerospike Connect for Spark for more information about the properties used on this page.
# imports
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType
sc = SparkContext.getOrCreate()
conf = sc._conf.setAll([("aerospike.namespace", AS_NAMESPACE), ("aerospike.seedhost", AS_CONNECTION_STRING)])
sc.stop()
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)
sqlContext = SQLContext(sc)
Access Shell Commands
You may execute shell commands including Aerospike tools like aql and asadm in the terminal tab throughout this tutorial. Open a terminal tab by selecting File->Open from the notebook menu, and then New->Terminal.
Prior Context
In the prior two notebooks on Feature Engineering and Model Training, we saved feature data to the feature store and trained an ML model on it, respectively. If the saved feature data or the model is not available in this environment, you can run the following cells to recreate them.
Feature Data
Run the following cells ONLY IF the database does not have the feature data for credit card transactions from the prior notebooks (Part 1 or Part 2). You will need to convert them to Code cells before you can run them.
# read and transform the sample credit card transactions data from a csv file
from pyspark.sql.functions import expr
df = spark.read.options(header="True", inferSchema="True") \
    .csv("resources/creditcard_small.csv") \
    .orderBy(['_c0'], ascending=[True])
new_col_names = ['CC1_' + (c if c != '_c0' else 'OldIdx') for c in df.columns]
df = df.toDF(*new_col_names) \
    .withColumn('TxnId', expr('CC1_OldIdx+1').cast(StringType())) \
    .select(['TxnId','CC1_Class','CC1_Amount'] + ['CC1_V'+str(i) for i in range(1,29)])
#df.toPandas().head()

# Save feature values in entity records
ENTITY_TYPE = 'cctxn'
ID_COLUMN = 'TxnId'
df.write \
    .mode('overwrite') \
    .format("aerospike") \
    .option("aerospike.writeset", ENTITY_TYPE+'-features') \
    .option("aerospike.updateByKey", ID_COLUMN) \
    .save()
print('Features stored to Feature Store.')
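As a quick sanity check, you can read a few of the stored records back through the Spark connector. This is a sketch under the assumption that the connector's aerospike.set read option selects the entity set written above; it is optional and not part of the original notebook flow.
# Optional sanity check (a sketch): read a few stored feature records back through the connector.
# Assumes the "aerospike.set" read option selects the entity set written above.
check_df = spark.read \
    .format('aerospike') \
    .option('aerospike.set', ENTITY_TYPE + '-features') \
    .load()
check_df.select('TxnId', 'CC1_Class', 'CC1_Amount').show(5)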
Trained Model
If the saved model fs_model_rf is not in the resources directory, first extract it from the archive using the following commands in the terminal tab:
cd /home/jovyan/notebooks/spark/resources
tar -xvzf fs_model_rf.tar.gz
Components of Web Service
The main components of the web service are:
- Loading the model
- Retrieving the required features
- Running the model to make a prediction
Loading Model
Load the RandomForestClassifier model that we saved in the Model Training notebook (Part 2).
from pyspark.ml.classification import RandomForestClassificationModel
rf_model = RandomForestClassificationModel.read().load("resources/fs_model_rf")
print("Loaded Random Forest Classification model.")
Output:
Loaded Random Forest Classification model.
Retrieving Features
The Python Client provides a convenient API to access specific features from the entity set, as shown below. Recall that the model uses features CC1_V1 through CC1_V28. We also need to construct a schema for the dataframe, which is needed to run the model.
namespace = 'test'
entity_set = 'cctxn-features'
txnid = '5' # dummy value, the web service will get the id from the request params
record_key = (namespace, entity_set, txnid)
features = ["CC1_V"+str(i) for i in range(1,29)] # need features CC1_V1-CC1_V28
schema = StructType()
for i in range(1, 29):  # all features are of type float or Double
    schema.add("CC1_V" + str(i), DoubleType(), True)
# get the needed features
try:
    (key, meta, bins) = client.select(record_key, features)
except:
    print('failed to get record')
    sys.exit(1)
# create an input dataframe for the model
featureBuf = [tuple([bins[f] for f in features])]
featureRDD = spark.sparkContext.parallelize(featureBuf)
featureDF = spark.createDataFrame(featureRDD, schema)
featureDF.toPandas().transpose()
Output:
                 0
CC1_V1   -1.158233
CC1_V2    0.877737
CC1_V3    1.548718
CC1_V4    0.403034
CC1_V5   -0.407193
CC1_V6    0.095921
CC1_V7    0.592941
CC1_V8   -0.270533
CC1_V9    0.817739
CC1_V10   0.753074
CC1_V11  -0.822843
CC1_V12   0.538196
CC1_V13   1.345852
CC1_V14  -1.119670
CC1_V15   0.175121
CC1_V16  -0.451449
CC1_V17  -0.237033
CC1_V18  -0.038195
CC1_V19   0.803487
CC1_V20   0.408542
CC1_V21  -0.009431
CC1_V22   0.798278
CC1_V23  -0.137458
CC1_V24   0.141267
CC1_V25  -0.206010
CC1_V26   0.502292
CC1_V27   0.219422
CC1_V28   0.215153
Running Model
Construct Feature Vector
We first construct a feature vector from the input features, as required by the model interface. The model only uses the fvector column created by VectorAssembler.
from pyspark.ml.feature import VectorAssembler
# create a feature vector from features
assembler = VectorAssembler(inputCols=features, outputCol="fvector")
featureVectorDF = assembler.transform(featureDF)
featureVectorDF.toPandas().transpose()
Output:
                 0
CC1_V1   -1.158233
CC1_V2    0.877737
CC1_V3    1.548718
CC1_V4    0.403034
CC1_V5   -0.407193
CC1_V6    0.095921
CC1_V7    0.592941
CC1_V8   -0.270533
CC1_V9    0.817739
CC1_V10   0.753074
CC1_V11  -0.822843
CC1_V12   0.538196
CC1_V13   1.345852
CC1_V14   -1.11967
CC1_V15   0.175121
CC1_V16  -0.451449
CC1_V17  -0.237033
CC1_V18  -0.038195
CC1_V19   0.803487
CC1_V20   0.408542
CC1_V21  -0.009431
CC1_V22   0.798278
CC1_V23  -0.137458
CC1_V24   0.141267
CC1_V25   -0.20601
CC1_V26   0.502292
CC1_V27   0.219422
CC1_V28   0.215153
fvector  [-1.15823309349523, 0.877736754848451, 1.54871...
Predict
Call the model's transform function to predict. We input only a dataframe with the fvector column, and use only two columns from the prediction dataframe record: probability and prediction. The threshold for the fraud/no-fraud decision is 50%.
rf_prediction = rf_model.transform(featureVectorDF['fvector',])
result = rf_prediction['probability', 'prediction'].collect()[0]
print('normal txn prob: ', result[0][0])
print('fraud prob: ', result[0][1])
print('prediction: ', 'no fraud' if result[0][1] < 0.5 else 'fraud')
Output:
normal txn prob: 0.9361028509004404
fraud prob: 0.0638971490995595
prediction: no fraud
Model Serving with Web Service
Let's create a simple web service that serves the model, using the Flask framework. The web service takes txnid as a query parameter, retrieves the features from the feature store, runs the model, and returns the prediction.
Note that this model serving example is not realistic, as we use only precomputed features for inference, and we trained and tested the model on the same data. Nonetheless, the example serves its purpose, which is to illustrate the use of a feature store for model serving, and it should not be difficult to adapt the patterns shown here into a realistic example.
# stop the existing spark session before starting the web service
spark.stop()
Install Web Service Framework
Open a terminal tab, and install the Flask framework with the following commands.
pip install flask
pip install flask_restful
Examine Web Service File
First open the file resources/fs-model-ws.py that implements the web service using the Flask framework, and examine its contents.
Note that it is mostly the code in the above cells organized to run as a Flask web service. You can learn more about Flask here.
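For orientation, here is a minimal sketch of what such a service could look like, assembled from the cells above. The structure and names are assumptions for illustration only; the actual resources/fs-model-ws.py may differ (for example, it may use flask_restful and environment-specific Spark setup).
# A minimal sketch (assumed outline, not the actual contents of resources/fs-model-ws.py)
import aerospike
from flask import Flask, request, jsonify
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, DoubleType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassificationModel

app = Flask(__name__)

# one-time initialization: Aerospike client, Spark session, and the saved model
client = aerospike.client({'hosts': [('127.0.0.1', 3000)]}).connect()
spark = SparkSession.builder.appName('fs-model-ws').getOrCreate()
rf_model = RandomForestClassificationModel.read().load('resources/fs_model_rf')

features = ['CC1_V' + str(i) for i in range(1, 29)]
schema = StructType()
for f in features:
    schema.add(f, DoubleType(), True)
assembler = VectorAssembler(inputCols=features, outputCol='fvector')

@app.route('/')
def predict():
    txnid = request.args.get('txnid')
    # retrieve the precomputed features for this transaction from the feature store
    _, _, bins = client.select(('test', 'cctxn-features', txnid), features)
    featureDF = spark.createDataFrame([tuple(bins[f] for f in features)], schema)
    result = rf_model.transform(assembler.transform(featureDF)) \
                     .select('probability', 'prediction').collect()[0]
    return jsonify({'normal_prob': float(result[0][0]),
                    'fraud_prob': float(result[0][1]),
                    'prediction': 'no fraud' if result[0][1] < 0.5 else 'fraud'})

if __name__ == '__main__':
    app.run(debug=True)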
Run Web Service
Run the web service by opening a terminal tab and running the following commands in it:
- cd /home/jovyan/notebooks/spark/resources
- python fs-model-ws.py
You can ignore the warning messages. After the "Debugger is active" message, the service is ready to receive requests.
Send Requests to Web Service
Let's call the web service to predict the outcome for a transaction id.
We can submit requests through the curl command as shown below. We can test with a few normal transactions (ids: 1, 2, 3, 10) and a few fraudulent transactions (ids: 6337, 120506, 150669).
You can query the database to view other fraud and normal transaction ids. As you may recall, TxnId and CC1_Class are the bins for the transaction id and label respectively.
# Send a request to the model web service running at 127.0.0.1:5000
!curl http://127.0.0.1:5000/?txnid=1
Output:
{
"fraud_prob": 0.052470997597310345,
"normal_prob": 0.9475290024026897,
"prediction": "no fraud"
}
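If you prefer Python to curl, you can also send requests with the requests library. This is a hypothetical alternative, assuming requests is installed in the notebook environment.
# Hypothetical alternative to curl: query the web service from Python with the requests library
import requests
for txnid in ['1', '6337']:  # one normal and one fraudulent transaction id
    resp = requests.get('http://127.0.0.1:5000/', params={'txnid': txnid})
    print(txnid, resp.json())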
# You can query a transaction in the database.
# remember CC1_Class is the label with 1 indicating a fraudulent transaction
!aql -c "select TxnId, CC1_Class from test.cctxn-features where PK = '6337'"
Output:
select TxnId, CC1_Class from test.cctxn-features where PK = '6337'
+--------+------------+
| TxnId | CC1_Class |
+--------+------------+
| "6337" | 1 |
+--------+------------+
1 row in set (0.000 secs)
OK
Takeaways and Conclusion
In this notebook, we explored how Aerospike can be used as a Feature Store for ML applications. Specifically, we showed how the precomputed features stored in the Aerospike feature store can be used at model serving time. We implemented a simple web service that loads the trained model, and then for each request, retrieves features, runs the model, and returns the model prediction.
This is the third notebook in the series of notebooks on how Aerospike can be used as a feature store. The first and second notebooks discussed Feature Engineering and Model Training aspects respectively.
Cleaning Up
Shut down the web service by hitting Ctrl-C in the tab in which it is running.
Close the spark session, and remove the tutorial data by executing the cell below.
try:
    spark.stop()
except:
    pass  # ignore
# To remove all data in the namespace test, uncomment the following line and run:
#!aql -c "truncate test"
Further Exploration and Resources
Here are some links for further exploration.
Resources
- Related notebooks
- Related blog posts
- Aerospike Developer Hub
- Github repos
Exploring Other Notebooks
Visit Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open in the notebook menu, and select Upload.