This notebook is the second in the series of notebooks that show how Aerospike can be used as a feature store.
This notebook requires the Aerospike Database and Spark running locally with the Aerospike Spark Connector. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.
Introduction
This notebook shows how Aerospike can be used as a Feature Store for Machine Learning applications on Spark using the Aerospike Spark Connector. It is Part 2 of the Feature Store series of notebooks and focuses on the Model Training aspects of a Feature Store. The first notebook in the series discusses Feature Engineering, and the next one describes Model Serving.
This notebook is organized as follows:
Summary of the prior (Data Engineering) notebook
Exploring features and datasets
Defining and saving a dataset
Training and saving an AI/ML model
Prerequisites
This tutorial assumes familiarity with the following topics:
You may execute shell commands including Aerospike tools like aql and asadm in the terminal tab throughout this tutorial. Open a terminal tab by selecting File->Open from the notebook menu, and then New->Terminal.
Context from Part 1 (Feature Engineering Notebook)
In the previous notebook in the Feature Store series, we showed how features engineered on the Spark platform can be efficiently stored in an Aerospike feature store. We implemented a simple example feature store interface that leverages the Aerospike Spark Connector for this purpose, defined an object model for saving and querying features, and illustrated its use with two examples.
You are encouraged to review the Feature Engineering notebook as we will use the same object model, implementation (with some extensions), and data in this notebook.
The code from Part 1 is replicated below as we will be using it later.
Code: Feature Group, Feature, and Entity
Below, we have copied over the code for Feature Group, Feature, and Entity classes for use in the following sections. Please review the object model described in the Feature Engineering notebook.
f = Feature(FG_NAME, FEATURE_AMOUNT, 'double', "Transaction amount",
            attrs={'entity': 'cctxn'}, tags=['usd'])
f.save()

FEATURE_CLASS = 'Class'
f = Feature(FG_NAME, FEATURE_CLASS, 'integer', "Label indicating fraud or not",
            attrs={'entity': 'cctxn'}, tags=['label'])
f.save()

FEATURE_PCA_XFORM = "V"
for i in range(1, 29):
    f = Feature(FG_NAME, FEATURE_PCA_XFORM+str(i), 'double', "Transformed version of PCA",
                attrs={'entity': 'cctxn'}, tags=['pca'])
    f.save()

# 3. Save feature values in entity records
ENTITY_TYPE = 'cctxn'
ID_COLUMN = 'TxnId'
Entity.saveDF(df, ENTITY_TYPE, ID_COLUMN)

print('Features stored to Feature Store.')
Output
Features stored to Feature Store.
Implementing Dataset
We created example implementations of Feature Group, Feature, and Entity objects as above. Let us now create a similar implementation of Dataset.
Object Model
A dataset is a subset of features and entities selected for an ML model. A Dataset object holds the selected features and entity instances. The actual (materialized) copy of entity records is stored outside the feature store (for instance, in a file system).
Attributes
A dataset record has the following attributes.
name: name of the data set, serves as the primary key for the record
description: human readable description
features: a list of the dataset features
predicate: query predicate to enumerate the entity instances in the dataset
location: external location where the dataset is stored
attrs: other metadata
tags: associated tags
Datasets are stored in the set “dataset-metadata”.
Operations
Dataset is used during Model Training. The following operations are needed.
create
load (get)
query (returns dataset metadata records)
materialize (returns entity records as defined by a dataset)
Dataset Implementation
Below is an example implementation of Dataset as described above.
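A minimal sketch of such an implementation is shown below. It follows the same pattern as the Feature Group, Feature, and Entity classes above; the connector format and option names, the exact schema fields, and the constructor signature are illustrative assumptions, and the namespace and seed host are assumed to be configured in the Spark session as in Part 1. Its query method uses a plain where clause; the pushdown optimization discussed next can be layered on top of it.

# Minimal sketch of a Dataset implementation (illustrative only). It follows the same
# pattern as the Feature Group, Feature, and Entity classes above. The connector format
# and option names, the schema fields, and the constructor signature are assumptions;
# the namespace and seed host are assumed to be configured in the Spark session (as in Part 1).
from pyspark.sql.types import (StructType, StructField, StringType,
                               ArrayType, MapType)

class Dataset:
    SET_NAME = 'dataset-metadata'   # set holding dataset metadata records

    schema = StructType([
        StructField('name', StringType(), False),          # primary key
        StructField('description', StringType(), True),
        StructField('entity', StringType(), True),          # entity type, e.g. 'cctxn'
        StructField('idcol', StringType(), True),           # entity record id column
        StructField('idtype', StringType(), True),          # id column type
        StructField('features', ArrayType(StringType()), True),
        StructField('predicate', StringType(), True),       # query predicate
        StructField('location', StringType(), True),        # external materialized copy
        StructField('attrs', MapType(StringType(), StringType()), True),
        StructField('tags', ArrayType(StringType()), True)])

    def __init__(self, name, description, entity, id_col, id_type, features,
                 predicate=None, location=None, attrs=None, tags=None):
        self.name, self.description = name, description
        self.entity, self.id_col, self.id_type = entity, id_col, id_type
        self.features, self.predicate, self.location = features, predicate, location
        self.attrs, self.tags = attrs or {}, tags or []

    def save(self):
        # upsert this dataset's metadata record, keyed by 'name'
        row = [(self.name, self.description, self.entity, self.id_col, self.id_type,
                self.features, self.predicate, self.location, self.attrs, self.tags)]
        spark.createDataFrame(row, Dataset.schema).write \
            .format("aerospike") \
            .option("aerospike.set", Dataset.SET_NAME) \
            .option("aerospike.updateByKey", "name") \
            .mode("append") \
            .save()

    @staticmethod
    def query(predicate):
        # return dataset metadata records matching the predicate
        return spark.read \
            .format("aerospike") \
            .schema(Dataset.schema) \
            .option("aerospike.set", Dataset.SET_NAME) \
            .load() \
            .where(predicate)

    def materialize(self, entity_schema):
        # return the entity records selected by this dataset's predicate
        return Entity.query(self.entity, self.predicate, entity_schema, self.id_col)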
To get the best performance from the Aerospike feature store, one important optimization is to "push down" processing to the database and minimize the amount of data retrieved into Spark. This is especially important when querying over a large amount of underlying data, such as when creating a dataset, and is achieved by having the database evaluate filters instead of Spark.
Currently the Spark Connector allows two mutually exclusive ways of specifying filters in a dataframe load:
The where clause
The pushdown expressions option
Only one should be specified because the underlying Aerospike database mechanisms used to process them are different and mutually exclusive; if both are specified, the pushdown expressions option takes precedence.
A where clause filter may be pushed down fully or in part, depending on whether the database supports its constituent parts and the Spark Connector takes advantage of that support. A pushdown expression filter, however, is processed entirely in the database, which ensures the best performance.
Aerospike expressions provide some filtering capabilities that are not available in Spark (such as filtering on record metadata), and expression-based filters are processed more efficiently in the database. On the other hand, the where clause offers many capabilities that are not available in Aerospike expressions. It may therefore be necessary to use both: use a pushdown expression to retrieve a dataframe, and then process it further using Spark dataframe capabilities.
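As a rough illustration of the two styles (a sketch only: entity_schema and expr_base64 are placeholders, and the set name and option names, including aerospike.pushdown.expressions, are assumptions based on the connector usage in this series):

# Two mutually exclusive ways to filter a dataframe load (a sketch; the entity set
# name "cctxn" and the option names are assumptions; entity_schema and expr_base64
# are placeholders defined elsewhere).

# 1. A where clause: parts of it may (or may not) be pushed down to the database.
df_where = spark.read \
    .format("aerospike") \
    .schema(entity_schema) \
    .option("aerospike.set", "cctxn") \
    .load() \
    .where("CC1_Amount > 1000.0")

# 2. A pushdown expression (base64-encoded Aerospike expression): evaluated
#    entirely in the database before records are returned to Spark.
df_pushdown = spark.read \
    .format("aerospike") \
    .schema(entity_schema) \
    .option("aerospike.set", "cctxn") \
    .option("aerospike.pushdown.expressions", expr_base64) \
    .load()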
Creating Pushdown Expressions
The Spark Connector currently requires the base64 encoding of the expression. Exporting the base64 encoded expression currently requires the Java client, which can be run in a parallel notebook, and entails the following steps:
Write the expression in Java.
Test the expression with the desired data.
Obtain the base64 encoding.
Use the base64 representation in this notebook as shown below.
You can run the adjunct notebook Pushdown Expressions for Spark Connector to follow the above recipe and obtain the base64 representation of an expression for use in the following examples.
Examples
We illustrate pushdown expressions with Feature class queries, but the query method implementation can be adapted for the other objects.
The examples below illustrate the capabilities and process of working with pushdown expressions; expressions are explained in more detail in the Pushdown Expressions for Spark Connector notebook.
The outer expression checks whether the value returned by the first argument is greater than 0. The first argument is the count of elements in the list bin tags that match the specified tags.
Obtain the base64 representation from the Pushdown Expressions for Spark Connector notebook; here it is "kwOVfwIAkxcFkn6SpgNsYWJlbKcDZl90YWcxk1EEpHRhZ3MA".
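A rough sketch of using this base64 string to filter feature metadata records in the database is shown below (the feature metadata set name, the pushdown option name, and Feature.schema are assumptions based on the object model above):

# Filter feature metadata records in the database with the base64-encoded expression
# (a sketch; "feature-metadata" as the set name, the option name, and Feature.schema
# are assumptions based on the object model above).
expr_base64 = "kwOVfwIAkxcFkn6SpgNsYWJlbKcDZl90YWcxk1EEpHRhZ3MA"  # tags contains 'label' or 'f_tag1'

f_pd_df = spark.read \
    .format("aerospike") \
    .schema(Feature.schema) \
    .option("aerospike.set", "feature-metadata") \
    .option("aerospike.pushdown.expressions", expr_base64) \
    .load()
f_pd_df.toPandas().head()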
Now let’s explore the features available in the Feature Store prior to using them to train a model. We will illustrate this with the querying functions on the metadata objects we have implemented above, as well as Spark functions.
Exploring Datasets
As we are interested in building a fraud detection model, let's see if there are any existing datasets that have "fraud" in their description. At present there should be no datasets in the database, until we create and save one in a later section.
ds_df = Dataset.query("description like '%fraud%'")
Let's identify feature groups for the entity type "cctxn" (credit card transactions) that have the attribute "class" = "fraud".
fg_df = FeatureGroup.query("attrs.entity == 'cctxn' and attrs.class == 'fraud'")
fg_df.toPandas().transpose().head()
Output
             0
name         CC1
description  Credit card transaction data
source       European cardholder dataset from Kaggle
attrs        {'class': 'fraud', 'entity': 'cctxn'}
tags         [kaggle, demo]
# View all available features in this feature group
f_df = Feature.query("fgname == 'CC1'")
f_df.toPandas()
Output
    fid         fgname  name    type     description                    attrs                tags
0   CC1_V23     CC1     V23     double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
1   CC1_V10     CC1     V10     double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
2   CC1_Class   CC1     Class   integer  Label indicating fraud or not  {'entity': 'cctxn'}  [label]
3   CC1_V20     CC1     V20     double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
4   CC1_V16     CC1     V16     double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
5   CC1_V1      CC1     V1      double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
6   CC1_V6      CC1     V6      double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
7   CC1_V25     CC1     V25     double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
8   CC1_V9      CC1     V9      double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
9   CC1_V2      CC1     V2      double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
10  CC1_V3      CC1     V3      double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
11  CC1_V12     CC1     V12     double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
12  CC1_V21     CC1     V21     double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
13  CC1_V27     CC1     V27     double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
14  CC1_Amount  CC1     Amount  double   Transaction amount             {'entity': 'cctxn'}  [usd]
15  CC1_V24     CC1     V24     double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
16  CC1_V7      CC1     V7      double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
17  CC1_V28     CC1     V28     double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
18  CC1_V4      CC1     V4      double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
19  CC1_V13     CC1     V13     double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
20  CC1_V17     CC1     V17     double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
21  CC1_V18     CC1     V18     double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
22  CC1_V26     CC1     V26     double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
23  CC1_V19     CC1     V19     double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
24  CC1_V14     CC1     V14     double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
25  CC1_V11     CC1     V11     double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
26  CC1_V8      CC1     V8      double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
27  CC1_V5      CC1     V5      double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
28  CC1_V22     CC1     V22     double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
29  CC1_V15     CC1     V15     double   Transformed version of PCA     {'entity': 'cctxn'}  [pca]
The features look promising for a fraud prediction model. Let’s look at the actual feature data and its characteristics by querying the entity records.
Exploring Feature Data
We can further explore the feature data to determine what features should be part of the dataset. The feature data resides in Entity records and we can use the above info to form the schema and retrieve the records.
Defining Schema
In order to query using the Aerospike Spark Connector, we must define the schema for the records.
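A schema matching these features can be built as follows (a simple sketch using Spark SQL types; it mirrors the printSchema output shown further below):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Schema for credit card transaction entity records: the record id, the label,
# the transaction amount, and the 28 PCA-transformed features V1-V28.
schema = StructType([StructField("TxnId", StringType(), False),
                     StructField("CC1_Class", IntegerType(), False),
                     StructField("CC1_Amount", DoubleType(), False)])
for i in range(1, 29):
    schema.add(StructField("CC1_V"+str(i), DoubleType(), True))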
Here we get all records from the sample data in the database. A small subset of the data would suffice in practice.
# let's get the entity records to assess the data
txn_df = Entity.query(ENTITY_TYPE, "TxnId like '%'", schema, "TxnId")
print("Records retrieved: ", txn_df.count())
txn_df.printSchema()
Output
Records retrieved: 984
root
|-- TxnId: string (nullable = false)
|-- CC1_Class: integer (nullable = false)
|-- CC1_Amount: double (nullable = false)
|-- CC1_V1: double (nullable = true)
|-- CC1_V2: double (nullable = true)
|-- CC1_V3: double (nullable = true)
|-- CC1_V4: double (nullable = true)
|-- CC1_V5: double (nullable = true)
|-- CC1_V6: double (nullable = true)
|-- CC1_V7: double (nullable = true)
|-- CC1_V8: double (nullable = true)
|-- CC1_V9: double (nullable = true)
|-- CC1_V10: double (nullable = true)
|-- CC1_V11: double (nullable = true)
|-- CC1_V12: double (nullable = true)
|-- CC1_V13: double (nullable = true)
|-- CC1_V14: double (nullable = true)
|-- CC1_V15: double (nullable = true)
|-- CC1_V16: double (nullable = true)
|-- CC1_V17: double (nullable = true)
|-- CC1_V18: double (nullable = true)
|-- CC1_V19: double (nullable = true)
|-- CC1_V20: double (nullable = true)
|-- CC1_V21: double (nullable = true)
|-- CC1_V22: double (nullable = true)
|-- CC1_V23: double (nullable = true)
|-- CC1_V24: double (nullable = true)
|-- CC1_V25: double (nullable = true)
|-- CC1_V26: double (nullable = true)
|-- CC1_V27: double (nullable = true)
|-- CC1_V28: double (nullable = true)
Examining Data
We will examine the statistical properties as well as null values of the feature columns. Note that the column CC1_Class is the label (fraud or not).
# examine the statistical properties
txn_df.describe().toPandas().transpose()
Output
summary     count  mean                   stddev                 min                max
TxnId       984    59771.279471544716     83735.17714512876      1                  99507
CC1_Class   984    0.5                    0.5002542588519272     0                  1
CC1_Amount  984    96.22459349593494      240.14239707065826     0.0                3828.04
CC1_V1      984    -2.4674030372100715    5.40712231422648       -30.552380043581   2.13238602134104
CC1_V2      984    1.9053035968231344     3.5961094277406076     -12.1142127363483  22.0577289904909
CC1_V3      984    -3.083884202829433     6.435904925385388      -31.1036848245812  3.77285685226266
CC1_V4      984    2.456780057740528      3.0427216170397466     -4.51582435488105  12.1146718424589
CC1_V5      984    -1.5617259373325372    4.202691637741722      -22.105531524316   11.0950886001596
CC1_V6      984    -0.572583991041022     1.8036571668000605     -6.40626663445964  6.47411462748849
CC1_V7      984    -2.73090333834317      5.863241960076915      -43.5572415712451  5.80253735302589
CC1_V8      984    0.26108185138806433    4.850081053008372      -41.0442609210741  20.0072083651213
CC1_V9      984    -1.301144796452937     2.266780102671618      -13.4340663182301  5.43663339611854
CC1_V10     984    -2.805194376398951     4.549492504413138      -24.5882624372475  8.73745780611353
CC1_V11     984    1.9525351017305455     2.7369799649027207     -2.33201137167952  12.0189131816199
CC1_V12     984    -2.995316874600595     4.657383279424634      -18.6837146333443  2.15205511590243
CC1_V13     984    -0.09029142836357146   1.0102129366924129     -3.12779501198771  2.81543981456255
CC1_V14     984    -3.597226605511213     4.5682405087763325     -19.2143254902614  3.44242199594215
CC1_V15     984    0.06275139057382163    1.0021871899317296     -4.49894467676621  2.47135790380837
CC1_V16     984    -2.1571248198091597    3.42439305003353       -14.1298545174931  3.13965565883069
CC1_V17     984    -3.36609535335953      5.953540928078054      -25.1627993693248  6.73938438478335
CC1_V18     984    -1.2187062731658431    2.3587681071910915     -9.49874592104677  3.79031621184375
CC1_V19     984    0.3359445791509033     1.2843379816775733     -3.68190355226504  5.2283417900513
CC1_V20     984    0.21117939872897198    1.0613528102262861     -4.12818582871798  11.0590042933942
CC1_V21     984    0.3548982757919287     2.78726704784996       -22.7976039055519  27.2028391573154
CC1_V22     984    -0.04448149211405775   1.1450798238059015     -8.88701714094871  8.36198519168435
CC1_V23     984    -0.036528942589509734  1.148960101817997      -19.2543276173719  5.46622995370963
CC1_V24     984    -0.04738043011343529   0.5866834793500019     -2.02802422921896  1.21527882183022
CC1_V25     984    0.08757054553217881    0.6404192414977025     -4.78160552206407  2.20820917836653
CC1_V26     984    0.026120460105754934   0.4682991121957343     -1.24392415371264  3.06557569653728
CC1_V27     984    0.09618165650018666    1.0037324673667467     -7.26348214633855  3.05235768679424
CC1_V28     984    0.02786530375842634    0.4429545316584082     -2.73388711897575  1.77936385243205
# check for null values
from pyspark.sql.functions import count, when, isnan
txn_df.select([count(when(isnan(c), c)).alias(c) for c in txn_df.columns]).toPandas().head()
Output
   TxnId  CC1_Class  CC1_Amount  CC1_V1  CC1_V2  CC1_V3  CC1_V4  CC1_V5  CC1_V6  CC1_V7  ...  CC1_V19  CC1_V20  CC1_V21  CC1_V22  CC1_V23  CC1_V24  CC1_V25  CC1_V26  CC1_V27  CC1_V28
0  0      0          0           0       0       0       0       0       0       0       ...  0        0        0        0        0        0        0        0        0        0

1 rows × 31 columns
Defining Dataset
Based on the above exploration, we will choose features V1-V28 for our training dataset, which we will define below.
In addition to the features, we also need to choose the data records for the dataset. We only have a small sample of the original dataset, and therefore we will use all the available records by setting the dataset query predicate to "true".
It is also possible to create a dataset of randomly sampled records by performing an "aerolookup" of randomly selected key values.
# Create a dataset with the V1-V28 features.
CC_FRAUD_DATASET = "CC_FRAUD_DETECTION"
features = ["CC1_V"+str(i) for i in range(1, 29)]
features_and_label = ["CC1_Class"] + features
ds = Dataset(CC_FRAUD_DATASET, "Training dataset for fraud detection model", "cctxn", "TxnId", "string",
             features_and_label, "true")   # remaining arguments (features, query predicate) follow the Dataset model above
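With the dataset defined and saved, its records can be used to train a model with Spark ML. The sketch below is illustrative only: the algorithm (logistic regression), the train/test split, and the output path are placeholder choices, not necessarily the ones used elsewhere in this series.

# Illustrative training sketch (algorithm, split, and save path are placeholders).
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

ds.save()   # persist the dataset metadata record

# txn_df (retrieved above) already holds all entity records, matching the "true" predicate.
assembler = VectorAssembler(inputCols=features, outputCol="fvector")
data = assembler.transform(txn_df).select("fvector", "CC1_Class")
train, test = data.randomSplit([0.8, 0.2], seed=2021)

lr = LogisticRegression(labelCol="CC1_Class", featuresCol="fvector")
model = lr.fit(train)

predictions = model.transform(test)
auc = BinaryClassificationEvaluator(labelCol="CC1_Class",
                                    metricName="areaUnderROC").evaluate(predictions)
print("Test AUC:", auc)

# Save the trained model for use during model serving (Part 3); the path is a placeholder.
model.write().overwrite().save("/tmp/cc_fraud_model")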
Takeaways and Conclusion
In this notebook, we explored how Aerospike can be used as a Feature Store for ML applications. Specifically, we showed how features and datasets stored in Aerospike can be explored and reused for model training. We implemented a simple example feature store interface that leverages the Aerospike Spark Connector capabilities for this purpose, and used its APIs to create, save, and query features and datasets for model training.
This is the second notebook in the series on how Aerospike can be used as a feature store. The first notebook discusses Feature Engineering, while the third notebook explores the use of the Aerospike Feature Store for Model Serving.
Cleaning Up
Close the Spark session and remove the tutorial data.
try:
    spark.stop()
except:
    pass   # ignore errors on close
# To remove all data in the namespace test, uncomment the following line and run:
Visit the Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, click on File->Open in the notebook menu, and select Upload.