This notebook is the first in the series of notebooks that show how Aerospike can be used as a feature store.
This notebook requires the Aerospike Database and Spark running locally with the Aerospike Spark Connector. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.
Introduction
This notebook demonstrates how Aerospike can be used as a Feature Store for Machine Learning applications on Spark using the Aerospike Spark Connector. It is part of the Feature Store series of notebooks, and focuses on the data model and Feature Engineering aspects of a Feature Store. Subsequent notebooks will describe the Model Training and Model Serving aspects of an ML application that works with a Feature Store.
This notebook is organized in two parts:
The first part explains the key objects, data model, and operations of a Feature Store, and provides a simple implementation on top of the Aerospike Database.
The second part shows, through two examples, how the model developed in the first part can be used for Feature Engineering and for storing features in the Feature Store.
Prerequisites
This tutorial assumes familiarity with the following topics:
You may execute shell commands including Aerospike tools like aql and asadm in the terminal tab throughout this tutorial. Open a terminal tab by selecting File->Open from the notebook menu, and then New->Terminal.
Online and Offline Feature Stores
Aerospike can be used as a Feature Store when the broader capabilities of a commercial Feature Store are not warranted, and it is instead critical to have:
Sub-millisecond online access to features
Scaling to a very large number of entities and features
Convenient and reliable distributed deployment
Small server footprint for a low cost of operation
An Online Feature Store provides access to features in real time, and as such requires speed and, for many deployments, scalability. The Online Store only stores the latest version of feature values.
Aerospike is purpose built for the performance and scale requirements of an Online Store.
An Offline Store requires historic data to be maintained for “time travel” use cases, that is, to create datasets at a particular point in time for training and other needs. A time series of feature values can be maintained in Aerospike as a map (timestamp->value) for each feature. Aerospike’s native APIs allow efficient implementation of an Offline Store.
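As a rough sketch of this approach using the Aerospike Python client (the namespace test, set name, feature bin, and key are hypothetical), each write appends the new value to the feature's timestamp-keyed map:

import time
import aerospike
from aerospike_helpers.operations import map_operations as mo

# Connect to the local Aerospike server assumed by this tutorial.
client = aerospike.client({'hosts': [('localhost', 3000)]}).connect()

# Record holding a Feature Group's features for user 123.
key = ('test', 'user-features', '123')

# Append the current value of a hypothetical feature to its timestamp->value map bin.
ops = [mo.map_put('feature_1', int(time.time()), 12.4)]
client.operate(key, ops)
client.close()

Reading a feature value "as of" a time then becomes a map lookup by timestamp range.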
Offline-Online Consistency
An essential requirement for a Feature Store is to maintain Online-Offline consistency to ensure that the models that were trained with the Offline (historic) data remain valid in the production (Online) environment. Typically this means convenient, efficient, and timely sync of data from the Offline to the Online Store. Sometimes a reverse sync from the Online to the Offline Store may be required if only the Online Store may be updated.
Synchronization in either direction between Online and Offline Stores can be achieved through various streaming connectors that Aerospike provides (such as for Kafka and Pulsar).
It is also necessary to be able to access a specific version "as of" a given time (a special case being the latest version) for various use cases, using the appropriate client library or connector.
Our focus in this Feature Store series will be primarily on the Online Store.
Design Considerations
The key design criteria for a Feature Store with Aerospike are access and storage efficiency. (The other key criterion, Offline-Online synchronization, will not be discussed further here; for our purpose, we assume it can be handled with an appropriate change-notification and update setup between the Offline and Online Stores.)
We will assume the following object model which is typical in a Feature Store:
Precomputed Feature Values are stored for Entities such as users, credit card transactions, and sensors.
Features are organized in Feature Groups. A Feature Group signifies the set of features that share common data sources, pipeline, and are created and updated together.
A Dataset is a snapshot of specified Features across a set of Entity instances. The snapshot is stored outside the Feature Store, whereas the Feature Store holds the metadata such as the snapshot query and location.
The Online Store keeps only the latest version of Feature Values, whereas the Offline store maintains a time series of Feature Values.
Online Store
There are multiple ways to store the latest Feature Values, each with its trade-offs. Here are two possible options:
One Entity instance record holds all Feature Values across multiple Feature Groups.
Stored in a set named after the Entity type. For example, “user-features”.
Read and write for Features in a Feature Group entail listing the Feature bins. Accessing a Feature vector at Model Serving time entails enumerating the model’s Feature bins across Feature Groups. To avoid Feature name collisions across Feature Groups, a bin name needs to include both the Feature Group and Feature name, such as FeatureGroup:Feature.
Pros:
Fast and convenient read and write access to all features of an entity.
Cons:
A single last-update-time must be shared among all Feature Groups without additional mechanisms.
Multiple Entity instance records, one for each Feature Group.
Each Feature Group’s values are stored in its own set, named in the form Entity:FeatureGroup-features, such as “user:fg1-features”.
Read and write for a Feature Group’s Features entail listing the Feature bins. Accessing a Feature vector at Model Serving time entails multiple requests across Feature Groups. Bin names are the same as Feature names, since they are unique within a Feature Group.
Pros:
Provides a better organization.
Allows for a large number of Features that might otherwise exceed the record size limit.
Allows a separate timestamp for each Feature Group since each is updated separately.
Cons:
Multiple requests must be issued to access Features across Feature Groups.
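To make the trade-off concrete, here is a sketch of how the same (hypothetical) Features could be laid out under each option:

# Option 1: one record per Entity instance in set "user-features";
# bin names are qualified with the Feature Group to avoid collisions.
option1_record = {
    'fg1:age': 35,
    'fg1:avg_txn_amt': 42.5,
    'fg2:login_count': 17,
    'timestamp': 1609459200,   # one last-update-time shared by all Feature Groups
}

# Option 2: one record per Entity instance per Feature Group;
# set "user:fg1-features" holds only fg1's bins, with its own timestamp.
option2_fg1_record = {
    'age': 35,
    'avg_txn_amt': 42.5,
    'timestamp': 1609459200,
}
option2_fg2_record = {         # in set "user:fg2-features", updated independently
    'login_count': 17,
    'timestamp': 1609462800,
}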
Offline Store
The design requires multiple versions of Feature Values to be stored, each with its timestamp. One way of modeling such a time series of feature values in Aerospike is to store each feature as a map timestamp->value.
Modeling for Size
With historic versions stored in it, the Offline Store has to account for data size, both in terms of individual record size and overall database size.
Record size
Limit the number of versions in each record by designating a record for each time duration. Thus Feature Values for an Entity instance will span multiple records, by hour, day, week, month, or year, depending on the frequency of updates. The record has a compound key of the form entity-key:duration-id, such as “123:202101” for the record holding feature values during January 2021 for the user with id 123.
As discussed above, the record size can be kept small by storing each Feature Group’s Feature Values in its own set.
Database size
To limit the size of the Offline Store, older data can be archived in a batch process. An indexed bin in a record holds the record’s duration specifier, allowing efficient access to old records for archival. For example, weekly records will have an indexed bin with the year and week values concatenated, like “202140”.
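For instance, assuming a secondary index on a hypothetical duration bin, an archival job could fetch all records for a given week with the Aerospike Python client:

import aerospike
from aerospike import predicates as p

client = aerospike.client({'hosts': [('localhost', 3000)]}).connect()

# Select the weekly records for week 40 of 2021 through the indexed bin.
query = client.query('test', 'user:fg1-features')
query.where(p.equals('duration', '202140'))

def archive(record):
    key, meta, bins = record
    # ... copy bins to archival storage, then delete or expire the record

query.foreach(archive)
client.close()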
Improving Performance and Scalability
Before we dive into the implementation, here are some relevant performance and scalability aspects in use of the Spark Connector.
Random access with primary key
Random access using the primary key must use the __key bin for direct access through the primary index. Often the unique user key is duplicated in another bin for convenience, but an equality predicate on that other bin results in a scan. This is shown below in the single-object load methods.
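For instance, a single-record lookup through the Spark Connector can look like this sketch (a previously defined schema, set name, and record key are assumed, as is a Spark session already configured with the connector's seed host and namespace):

# Primary-key lookup: the __key predicate is served by the primary index.
one_rec_df = spark.read \
    .format('aerospike') \
    .schema(schema) \
    .option('aerospike.set', 'user-features') \
    .load() \
    .where("__key = 'eid_1'")

# In contrast, .where("ID = 'eid_1'") on a duplicated key bin triggers a scan.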
Efficient scan with set indexes
A set index should be enabled on a set for faster scans over it. Data exploration as well as dataset creation operations benefit by scanning only the records in a set instead of the entire namespace, a significant speedup since the namespace can be very large compared to, say, the metadata sets. In the following implementation, we enable set indexes on all sets.
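A set index can be enabled dynamically, for example with the asinfo tool from a terminal or a notebook cell (namespace test and set name assumed; syntax per Aerospike server 5.6+):

!asinfo -v "set-config:context=namespace;id=test;set=user-features;enable-index=true"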
Aerospike Expression pushdown
To minimize the amount of data retrieved from the database, query predicates must be “pushed down”, or in other words, processed in the database and not on Spark. We will illustrate how expression pushdown is implemented with examples in the Model Training notebook.
Performance tuning parameters
Several tuning parameters such as partition factor and compression are available for optimizing performance with the Spark Connector. The implementation here does not make use of them, but you are encouraged to explore them elsewhere.
Use of secondary indexes
While Aerospike provides secondary indexes, the Spark Connector currently does not leverage them. This performance enhancement is planned in the future.
Defining a Simple Feature Store
To make it concrete, let us define and implement a simple Feature Store.
Main Objects
The main objects in a Feature Store as discussed above are Feature Group, Feature, Entity, and Dataset. These are explained below.
Feature Group
Machine Learning features are grouped by the source of data and processing pipeline. A Feature Group includes many features, and has the following attributes that are useful during feature exploration:
name: a unique name
description: human readable description
source: source of fact data
attrs: other metadata
tags: associated tags
A feature group is stored in the set “fg-metadata”.
Feature
A Feature consists of the following attributes:
fid: the record’s primary key consists of the string concatenation of feature group name and feature name in the form: fgname_fname
fgname: the name of the feature group that the feature belongs to
name: feature name that is unique within the feature group
type: feature type (integer, double, string, and possibly others)
description: human readable description
attrs: various performance stats and other metadata
tags: associated tags
A feature is stored in the set “feature-metadata”.
Entity
Features are computed and stored for an Entity such as a user, credit card, or sensor. Feature values are stored per Entity instance. Features in multiple feature groups for an Entity type are combined in one Entity record. A feature values record for an Entity has these attributes:
id_col: the column containing the id of the entity instance that serves as the record key
feature specific bins: each feature is stored in a bin named after the feature in the format fgname_fname.
timestamp: update timestamp
Entity records for an entity type are stored in an entity-type specific set “entitytype-features”. For example, the set “user-features” holds features for user instances, and “cc-features” holds features for credit card instances.
Dataset
A Dataset is a subset of features and entities selected to train an ML model. A Dataset object holds the selected features and entity instance definitions. The actual copy of entities is stored outside the feature store (for instance, in a file system). A dataset record has the following attributes.
name: name of the data set, serves as the primary key for the record
description: human readable description
features: a list of the dataset features
predicate: query predicate to enumerate the entity instances in the dataset
location: external location where the dataset is stored
attrs: other metadata
tags: associated tags
Datasets are stored in the set “dataset-metadata”.
Operations
The following operations are implemented for the different use scenarios:
Feature Engineering
Feature Group
create or update (save)
load (get)
Feature
create or update
load
Entity
create or update using a dataframe
load
Model Training
Feature Group
load
query by various attributes
Feature
query
Entity
query
Dataset
create
load
query
Model Serving
Entity
get a specific feature vector for an entity instance
Example Implementation
The following code is a simple implementation of the above operations. These operations will be illustrated in this and follow-up notebooks.
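The full implementation is elided here; the following minimal sketch shows the flavor of the Entity save and query operations, assuming the Spark session is already configured with the connector's seed host and namespace (the aerospike.writeset, aerospike.updateByKey, and aerospike.set options follow the Aerospike Spark Connector's conventions):

class Entity:
    @staticmethod
    def saveDF(df, entity_type, id_col):
        # Write the dataframe to the set "<entity_type>-features",
        # keying each record by the value in id_col.
        df.write \
          .mode('append') \
          .format('aerospike') \
          .option('aerospike.writeset', entity_type + '-features') \
          .option('aerospike.updateByKey', id_col) \
          .save()

    @staticmethod
    def query(entity_type, predicate, schema, id_col):
        # Read matching records into a dataframe; simple predicates are
        # pushed down to the database by the connector.
        # (id_col is kept for symmetry with saveDF.)
        return spark.read \
                    .format('aerospike') \
                    .schema(schema) \
                    .option('aerospike.set', entity_type + '-features') \
                    .load() \
                    .where(predicate)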
# Test the save operation ('data' is a list of row dicts defined in an earlier, elided cell).
from pyspark.sql import Row

df = spark.createDataFrame([Row(**i) for i in data])
df.show()
Entity.saveDF(df, "etype_1", "ID")
Output
+-----+-------------+-------------+
| ID|fg2_feature_1|fg2_feature_2|
+-----+-------------+-------------+
|eid_1| 1| 12.4|
|eid_2| 2| 30.1|
|eid_3| 3| 100.01|
+-----+-------------+-------------+
# test query operation that returns a dataframe
# need to define a schema first
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

schema = StructType([
    StructField('ID', StringType(), False),
    StructField('fg2_feature_1', IntegerType(), True),
    StructField('fg2_feature_2', DoubleType(), True)])
queryDF = Entity.query('etype_1', 'ID in ("eid_1", "eid_3")', schema, 'ID')
queryDF.show()
Output
+-----+-------------+-------------+
| ID|fg2_feature_1|fg2_feature_2|
+-----+-------------+-------------+
|eid_1| 1| 12.4|
|eid_3| 3| 100.01|
+-----+-------------+-------------+
Dataset Implementation
# Deferred to the second notebook in this series on Model Training.
Using Feature Store
Let us now see how the feature store objects and operations can be leveraged through the various phases of the ML flow. In this notebook, the focus is on Feature Engineering. Future notebooks will look into Model Training and Model Serving scenarios.
Example: Credit Card Fraud Data
The demo data is abridged from the original Kaggle dataset of real transactions by European cardholders in 2013. The original dataset has close to 300K transactions, whereas the abridged version used here contains about a thousand records.
We will illustrate use of the feature store for Feature Engineering through the following sequence:
Read the demo data from a csv file.
Perform feature engineering tasks.
Save the engineered features to the Feature Store.
Read Data into Dataframe
Read and examine the columns and rows.
The data contains 29 feature columns, the PCA-transformed features (“V1”–“V28”) plus the transaction amount, and one label column (“Class”).
We are ready to save the features in the Spark dataframe to Aerospike.
Of the following three steps, the first two are needed only the first time, when the feature group and features are created; subsequent updates use only the third step.
Create a feature group.
Create feature metadata.
Save feature values in entity records.
# 1. Create a feature group.
FG_NAME = 'CC1'
FG_DESCRIPTION = 'Credit card transaction data'
FG_SOURCE = 'European cardholder dataset from Kaggle'
# Constructor signature assumed by analogy with Feature below; attrs and tags omitted.
fg = FeatureGroup(FG_NAME, FG_DESCRIPTION, FG_SOURCE)
fg.save()

# 2. Create feature metadata.
FEATURE_AMOUNT = 'Amount'
f = Feature(FG_NAME, FEATURE_AMOUNT, 'double', "Transaction amount",
            attrs={'entity': 'cctxn'}, tags=['usd'])
f.save()
FEATURE_CLASS = 'Class'
f = Feature(FG_NAME, FEATURE_CLASS, 'integer', "Label indicating fraud or not",
            attrs={'entity': 'cctxn'}, tags=['label'])
f.save()
FEATURE_PCA_XFORM = "V"
for i in range(1, 29):
    f = Feature(FG_NAME, FEATURE_PCA_XFORM + str(i), 'double', "Transformed version of PCA",
                attrs={'entity': 'cctxn'}, tags=['pca'])
    f.save()
# 3. Save feature values in entity records.
ENTITY_TYPE = 'cctxn'
ID_COLUMN = 'TxnId'
Entity.saveDF(sparkDF, ENTITY_TYPE, ID_COLUMN)
print('Features stored to Feature Store.')
Output
Features stored to Feature Store.
Query Features
Let’s issue a query against the saved features to get fraudulent transactions with amount greater than $2000. There appears to be only one such transaction in the demo data.
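A sketch of that query using the Entity interface above (bin names follow the fgname_fname convention, so the features of interest are CC1_Amount and CC1_Class):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Schema covering only the columns we need for this query.
txn_schema = StructType([
    StructField('TxnId', StringType(), False),
    StructField('CC1_Amount', DoubleType(), True),
    StructField('CC1_Class', IntegerType(), True)])

# Fraudulent transactions (Class = 1) with amount over $2000.
fraud_df = Entity.query('cctxn', 'CC1_Class = 1 and CC1_Amount > 2000',
                        txn_schema, 'TxnId')
fraud_df.show()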
In this notebook, we explored how Aerospike can be used as a Feature Store for ML applications. Specifically, we showed how features engineered on the Spark platform can be efficiently stored in the Aerospike feature store via the Aerospike Spark Connector. We implemented a simple example feature store interface that leverages the Aerospike Spark Connector's capabilities for this purpose, and used its API to save and query features created in two data engineering examples.
This is a first in the series of notebooks on how Aerospike can be used as a feature store. Subsequent notebooks in this series will explore use of Aerospike Feature Store for Model Training and Model Serving.
Cleaning Up
Close the spark session, and remove the tutorial data by executing the cell below.
try:
    spark.stop()
except:
    pass  # ignore errors if the session is already stopped
# To remove all data in the namespace test, uncomment the following line and run:
Visit Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open in the notebook menu, and select Upload.