Feature Store with Aerospike
This notebook is the first in the series of notebooks that show how Aerospike can be used as a feature store.
This notebook requires the Aerospike Database and Spark running locally with the Aerospike Spark Connector. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.
Introduction
This notebook demonstrates how Aerospike can be used as a Feature Store for Machine Learning applications on Spark using the Aerospike Spark Connector. It is part of the Feature Store series of notebooks, and focuses on the data model and Feature Engineering aspects of a Feature Store. The subsequent notebook(s) will describe the Model Training and Model Serving aspects of an ML application that deal with a Feature Store.
This notebook is organized in two parts:
- The first part explains the key objects, data model, and operations of a Feature Store, and provides a simple implementation on top of the Aerospike Database.
- The second part shows with two examples how the model developed in the first part can be used for Feature Engineering and storing features in the Feature Store.
Prerequisites
This tutorial assumes familiarity with the following topics:
Setup
Set up Aerospike Server, Spark Server, and Spark Connector.
Ensure Database Is Running
This notebook requires that the Aerospike database is running.
!asd >& /dev/null
!pgrep -x asd >/dev/null && echo "Aerospike database is running!" || echo "**Aerospike database is not running!**"
Output:
Aerospike database is running!
Initialize Spark
We will be using Spark functionality in this notebook.
Initialize Paths and Env Variables
# directory where spark notebook requisites are installed
#SPARK_NB_DIR = '/home/jovyan/notebooks/spark'
SPARK_NB_DIR = '/opt/spark-nb'
SPARK_HOME = SPARK_NB_DIR + '/spark-3.0.3-bin-hadoop3.2'
# IP Address or DNS name for one host in your Aerospike cluster
AS_HOST ="localhost"
# Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure
AS_NAMESPACE = "test"
AEROSPIKE_SPARK_JAR_VERSION="3.2.0"
AS_PORT = 3000 # Usually 3000, but change here if not
AS_CONNECTION_STRING = AS_HOST + ":"+ str(AS_PORT)
# Next we locate the Spark installation using the SPARK_HOME path defined above
import findspark
findspark.init(SPARK_HOME)
# Aerospike Spark Connector related settings
import os
AEROSPIKE_JAR_PATH= "aerospike-spark-assembly-"+AEROSPIKE_SPARK_JAR_VERSION+".jar"
os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars ' + SPARK_NB_DIR + '/' + AEROSPIKE_JAR_PATH + ' pyspark-shell'
Configure Spark Session
Please visit Configuring Aerospike Connect for Spark for more information about the properties used on this page.
# imports
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType
sc = SparkContext.getOrCreate()
conf=sc._conf.setAll([("aerospike.namespace",AS_NAMESPACE),("aerospike.seedhost",AS_CONNECTION_STRING)])
sc.stop()
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)
sqlContext = SQLContext(sc)
Access Shell Commands
You may execute shell commands including Aerospike tools like aql and asadm in the terminal tab throughout this tutorial. Open a terminal tab by selecting File->Open from the notebook menu, and then New->Terminal.
Online and Offline Feature Stores
Aerospike can be used as a Feature Store in cases where the broader capabilities of a commercial Feature Store are not warranted, and where it is instead critical to have:
- Sub-millisecond online access to features
- Scaling to a very large number of entities and features
- Convenient and reliable distributed deployment
- Small server footprint for a low cost of operation
An Online Feature Store provides access to features in real time, and as such requires speed and, for many deployments, scalability. The Online Store only stores the latest version of feature values.
Aerospike is purpose built for the performance and scale requirements of an Online Store.
An Offline Store requires historic data to be maintained for “time travel” use cases, that is, to create datasets at a particular point in time for training and other needs. A time series of feature values can be maintained in Aerospike as a map (timestamp->value) for each feature. Aerospike's native APIs allow efficient implementation of an Offline Store.
Offline-Online Consistency
An essential requirement for a Feature Store is to maintain Online-Offline consistency to ensure that the models that were trained with the Offline (historic) data remain valid in the production (Online) environment. Typically this means convenient, efficient, and timely sync of data from the Offline to the Online Store. Sometimes a reverse sync from the Online to the Offline Store may be required if only the Online Store may be updated.
Synchronization in either direction between Online and Offline Stores can be achieved through various streaming connectors that Aerospike provides (such as for Kafka and Pulsar).
What is also necessary is to be able to access a specific version “as of time” (a special case being the latest version) for various use cases using the appropriate client library or connector.
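The timestamp->value map and the "as of time" access described above can be illustrated with a minimal, database-free sketch. It assumes integer timestamps and uses a plain Python dict in place of an Aerospike map bin; the helper names `value_as_of` and `latest_value` and the sample values are hypothetical and are not part of any Aerospike API.

```python
from bisect import bisect_right

def value_as_of(series, as_of):
    """Return the value with the latest timestamp <= as_of, or None if there is none."""
    timestamps = sorted(series)
    idx = bisect_right(timestamps, as_of)
    if idx == 0:
        return None
    return series[timestamps[idx - 1]]

def latest_value(series):
    """Return the most recent value, which is what an Online Store would hold."""
    if not series:
        return None
    return series[max(series)]

# Hypothetical time series (timestamp -> value) for one feature of one entity instance.
avg_spend = {1609459200: 42.0, 1612137600: 45.5, 1614556800: 40.1}

print(value_as_of(avg_spend, 1613000000))  # version valid at the requested time
print(latest_value(avg_spend))             # special case: the latest version
```

The latest version is simply the "as of now" case, which is why an Online Store can be viewed as a projection of the Offline Store.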
Our focus in this Feature Store series will be primarily on the Online Store.
Design Considerations
The key design criteria for a Feature Store with Aerospike are access and storage efficiency. (The other key criterion, Offline-Online synchronization, will not be discussed further here. For our purpose, we assume that it can be handled with an appropriate change notification and update setup between the Offline and Online Stores.)
We will assume the following object model which is typical in a Feature Store:
- Precomputed Feature Values are stored for Entities such as users, credit card transactions, and sensors. Features are organized in Feature Groups. A Feature Group signifies the set of features that share common data sources and pipeline, and are created and updated together.
- A Dataset is a snapshot of specified Features across a set of Entity instances. The snapshot is stored outside the Feature Store, whereas the Feature Store holds the metadata such as the snapshot query and location.
- The Online Store keeps only the latest version of Feature Values, whereas the Offline Store maintains a time series of Feature Values.
Online Store
There are multiple ways to store the latest Feature Values, each with its trade-offs. Here are two possible options:
- One Entity instance record holds all Feature Values across multiple Feature Groups.
  - Stored in a set named after the Entity type (e.g., "user-features").
  - Read and write for a Feature Group's Features entails listing the Feature bins. Accessing a Feature vector at Model Serving time entails enumerating the model's Feature bins across Feature Groups. To avoid Feature name collisions across Feature Groups, a bin name needs to include both the Feature Group and Feature name such as Feature-Group:Feature.
  - Pros:
    - Fast and convenient read and write access to all features of an entity.
  - Cons:
    - A single last-update-time must be shared among all Feature Groups without additional mechanisms.
- Multiple Entity instance records, one for each Feature Group.
  - Each Feature Group's values are stored in its own set named in the form Entity:FeatureGroup-features such as "user:fg1-features".
  - Read and write for a Feature Group's Features entail listing the Feature bins. Accessing a Feature vector at Model Serving time entails multiple requests over Feature Groups. Bin names are the same as Feature names since they are unique within a Feature Group.
  - Pros:
    - Provides a better organization.
    - Allows for a large number of Features if they may exceed the record size limit.
    - Allows a separate timestamp for each Feature Group since each is updated separately.
  - Cons:
    - Multiple requests must be issued to access Features across Feature Groups.
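The naming conventions of the two options can be made concrete with a small sketch. The function names and the example entity, feature group, and feature names are hypothetical; only the set and bin name formats come from the description above.

```python
def single_record_layout(entity_type, feature_group, feature):
    """Option 1: one record per entity instance; bin names are qualified by feature group."""
    set_name = f"{entity_type}-features"
    bin_name = f"{feature_group}:{feature}"
    return set_name, bin_name

def per_group_layout(entity_type, feature_group, feature):
    """Option 2: one record per entity instance and feature group; bin name is the feature name."""
    set_name = f"{entity_type}:{feature_group}-features"
    bin_name = feature
    return set_name, bin_name

# Where the same (hypothetical) feature would live under each option.
print(single_record_layout("user", "fg1", "age"))
print(per_group_layout("user", "fg1", "age"))
```

With the first layout a model's feature vector is read from one record; with the second, one read is needed per feature group that the model draws from.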
Offline Store
The design requires multiple versions of Feature Values to be stored, each with its timestamp. One way of modeling such a time series of feature values in Aerospike is to store each feature as a map timestamp->value.
Modeling for Size
With historic versions stored in it, the Offline Store has to account for the data size problem: both in terms of individual record size as well as overall database size.
- Record size
  - Limit the number of versions in each record by designating a record by its time duration. So Feature Values for an Entity instance will have multiple records by hour, day, week, month, or year, etc., depending on the frequency of updates. The record will have a compound key of the form entity-key:duration-id like "123:202101", for example, for a record holding feature values during January 2021 for a user with id 123.
  - As discussed above, the record size can be kept small by storing each Feature Group's Feature Values in its own set.
- Database size
  - To limit the size of the Offline Store, older data can be archived in a batch process. An indexed bin in a record holds the record's duration specifier, allowing efficient access to old records for archival. For example, weekly records will have an indexed bin with the year and week values concatenated like "202140".
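As a rough illustration of the compound key and duration specifier described above, the following sketch derives both from a date. The function names are hypothetical, and the use of ISO week numbering for weekly records is an assumption; the text does not specify which week convention applies.

```python
from datetime import date

def duration_id(day, granularity):
    """Return the duration specifier for the period that contains `day`."""
    if granularity == "month":
        return day.strftime("%Y%m")
    if granularity == "week":
        # Assumption: ISO 8601 week numbering.
        iso_year, iso_week, _ = day.isocalendar()
        return f"{iso_year}{iso_week:02d}"
    raise ValueError("unsupported granularity: " + granularity)

def record_key(entity_key, day, granularity):
    """Return the compound key in the form entity-key:duration-id."""
    return f"{entity_key}:{duration_id(day, granularity)}"

print(record_key("123", date(2021, 1, 15), "month"))
print(duration_id(date(2021, 10, 6), "week"))
```

The duration specifier on its own is the value that would be stored in the indexed bin used to find old records for archival.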
Improving Performance and Scalability
Before we dive into the implementation, here are some relevant performance and scalability aspects in use of the Spark Connector.
Random access with primary key
Random access using the primary key must use the __key bin for direct access through the primary index. Many times the unique user key is duplicated for convenience in another bin, but the equality predicate using another bin will result in a scan. This is shown below in the single object load methods.
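A minimal sketch of such a primary-key lookup follows. It uses the same reader options as the implementation later in this notebook; the helper names `key_predicate` and `load_by_key` are hypothetical, and the quoting helper assumes Spark SQL's default backslash escaping inside string literals. The caller is expected to pass an active Spark session and a schema that includes a "__key" field.

```python
def key_predicate(key):
    """Build an equality predicate on the __key column for a string key."""
    # Assumption: backslash escaping is enabled for SQL string literals (Spark default).
    escaped = key.replace("\\", "\\\\").replace('"', '\\"')
    return f'__key = "{escaped}"'

def load_by_key(spark, set_name, schema, key):
    """Read the record with the given primary key from an Aerospike set.

    `schema` must include a "__key" field so that the predicate is applied
    to the primary key rather than to an ordinary bin.
    """
    return (spark.read
            .format("aerospike")
            .option("aerospike.set", set_name)
            .schema(schema)
            .load()
            .where(key_predicate(key)))

print(key_predicate("fg_name1"))
```

A predicate such as name = "fg_name1" on a regular bin would return the same record but, as noted above, at the cost of a scan.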
Efficient scan with set indexes
A set index should be enabled on a set for faster scans over the set. Data exploration and dataset creation operations benefit because they scan only the records in the set rather than the entire namespace. The speedup can be significant because the namespace can be very large compared to, say, the metadata sets. In the following implementation, we enable set indexes on all sets.
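The set index is enabled per set with an asinfo command, as the implementation cells in this notebook do. As a convenience sketch, the command strings for all sets used in this notebook can be generated as follows; the helper name `set_index_command` is hypothetical, and the command format is copied from the cells below rather than from a complete reference.

```python
def set_index_command(namespace, set_name):
    """Return the asinfo command that enables the set index on one set."""
    return ('asinfo -v "set-config:context=namespace;'
            f'id={namespace};set={set_name};enable-index=true"')

# Sets referenced in this notebook, in the "test" namespace.
for set_name in ["fg-metadata", "feature-metadata", "dataset-metadata", "cctxn-features"]:
    print(set_index_command("test", set_name))
```

Each printed line can be run from a terminal tab, or prefixed with ! in a notebook cell.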
Aerospike Expression pushdown
To minimize the amount of data retrieved from the database, query predicates must be "pushed down", or in other words, processed in the database and not on Spark. We will illustrate how expression pushdown is implemented with examples in the Model Training notebook.
Performance tuning parameters
Several tuning parameters such as partition factor and compression are available for optimizing performance with the Spark Connector. The implementation here does not make use of them, but you are encouraged to explore them elsewhere.
Use of secondary indexes
While Aerospike provides secondary indexes, the Spark Connector currently does not leverage them. This performance enhancement is planned for the future.
Defining a Simple Feature Store
To make it concrete, let us define and implement a simple Feature Store.
Main Objects
The main objects in a Feature Store as discussed above are Feature Group, Feature, Entity, and Dataset. These are explained below.
Feature Group
Machine Learning features are grouped by the source of data and processing pipeline. A Feature Group includes many features, and has the following attributes that are useful during feature exploration:
- name: a unique name
- description: human readable description
- source: source of fact data
- attrs: other metadata
- tags: associated tags
A feature group is stored in the set "fg-metadata".
Feature
A Feature consists of the following attributes:
- fid: the record's primary key consists of the string concatenation of feature group name and feature name in the form: fgname_fname
- fgname: the name of the feature group that the feature belongs to
- name: feature name that is unique within the feature group
- type: feature type (integer, double, string, and possibly others)
- description: human readable description
- attrs: various performance stats and other metadata
- tags: associated tags
A feature is stored in the set "feature-metadata".
Entity
Features are computed and stored for an Entity such as a user, credit card, or sensor. Feature values are stored per Entity instance. Features in multiple feature groups for an Entity type are combined in one Entity record. A feature values record for an Entity has these attributes:
- id_col: the column containing the id of the entity instance that serves as the record key
- feature specific bins: each feature is stored in a bin named after the feature in the format fgname_fname.
- timestamp: update timestamp
Entity records for an entity type are stored in an entity-type specific set "entitytype-features". For example, the set "user-features" holds features for user instances, and "cc-features" holds features for credit card instances.
Dataset
A Dataset is a subset of features and entities selected to train an ML model. A Dataset object holds the selected features and entity instance definitions. The actual copy of entities is stored outside the feature store (for instance, in a file system). A dataset record has the following attributes.
- name: name of the data set, serves as the primary key for the record
- description: human readable description
- features: a list of the dataset features
- predicate: query predicate to enumerate the entity instances in the dataset
- location: external location where the dataset is stored
- attrs: other metadata
- tags: associated tags
Datasets are stored in the set "dataset-metadata".
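The Dataset implementation is deferred to the Model Training notebook, so the following is only an illustrative sketch of the record described above, written as a plain Python dataclass. The class name `DatasetMetadata`, the `to_row` helper, and all sample values are hypothetical; the field names follow the attribute list.

```python
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class DatasetMetadata:
    name: str                 # primary key of the record
    description: str          # human readable description
    features: List[str]       # dataset features
    predicate: str            # query predicate that enumerates the entity instances
    location: str             # external location where the snapshot is stored
    attrs: Dict[str, str] = field(default_factory=dict)
    tags: List[str] = field(default_factory=list)

    def to_row(self):
        """Return the attributes as a tuple, in the order of the attribute list."""
        return (self.name, self.description, self.features, self.predicate,
                self.location, self.attrs, self.tags)

# Hypothetical example values.
ds = DatasetMetadata(
    name="ds1",
    description="sample training dataset",
    features=["fg1_f_name1", "fg1_f_name2"],
    predicate="fg1_f_name1 > 0",
    location="resources/ds1.csv",
    tags=["example"],
)
print(ds.to_row())
```

A tuple in this shape could be written to the "dataset-metadata" set in the same way the Feature Group and Feature classes below write their records.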
Operations
The following operations are implemented for the different use scenarios:
Feature Engineering
- Feature Group
  - create or update (save)
  - load (get)
- Feature
  - create or update
  - load
- Entity
  - create or update using a dataframe
  - load
Model Training
- Feature Group
  - load
  - query by various attributes
- Feature
  - query
- Entity
  - query
- Dataset
  - create
  - load
  - query
Model Serving
- Entity
  - get a specific feature vector for an entity instance
Example Implementation
The following code is a simple implementation of the above operations. These operations will be illustrated in this and follow-up notebooks.
Feature Group Implementation
import copy

class FeatureGroup:
    schema = StructType([StructField("name", StringType(), False),
                         StructField("description", StringType(), True),
                         StructField("source", StringType(), True),
                         StructField("attrs", MapType(StringType(), StringType()), True),
                         StructField("tags", ArrayType(StringType()), True)])

    def __init__(self, name, description, source, attrs, tags):
        self.name = name
        self.description = description
        self.source = source
        self.attrs = attrs
        self.tags = tags
        return

    def __str__(self):
        return str(self.__class__) + ": " + str(self.__dict__)

    def save(self):
        inputBuf = [(self.name, self.description, self.source, self.attrs, self.tags)]
        inputRDD = spark.sparkContext.parallelize(inputBuf)
        inputDF = spark.createDataFrame(inputRDD, FeatureGroup.schema)
        #Write the data frame to Aerospike, the name field is used as the key
        inputDF.write \
            .mode('overwrite') \
            .format("aerospike") \
            .option("aerospike.writeset", "fg-metadata")\
            .option("aerospike.updateByKey", "name") \
            .save()
        return

    @staticmethod
    def load(name):
        fg = None
        # add the __key column so the lookup goes through the primary index
        schema = copy.deepcopy(FeatureGroup.schema)
        schema.add("__key", StringType(), False)
        fgdf = spark.read \
            .format("aerospike") \
            .option("aerospike.set", "fg-metadata") \
            .schema(schema) \
            .load().where("__key = \"" + name + "\"")
        if fgdf.count() > 0:
            fgtuple = fgdf.collect()[0]
            # drop the trailing __key column before constructing the object
            fg = FeatureGroup(*fgtuple[:-1])
        return fg

    @staticmethod
    def query(predicate): #returns a dataframe
        fg_df = spark.read \
            .format("aerospike") \
            .schema(FeatureGroup.schema) \
            .option("aerospike.set", "fg-metadata") \
            .load().where(predicate)
        return fg_df
# Enable set index on fg-metadata
!asinfo -v "set-config:context=namespace;id=test;set=fg-metadata;enable-index=true"
Output:
ok
# test feature group
# test save and load
# save
fg1 = FeatureGroup("fg_name1", "fg_desc1", "fg_source1",
{"fg_attr1":"1", "fg_attr2":"two"}, ["fg_tag1", "fg_tag2"])
fg1.save()
# load
fg2 = FeatureGroup.load("fg_name1")
print(fg2, '\n')
# test query
print("Feature Groups with name ending with '_name1' and having attribute 'fg_attr1'='1':")
df = FeatureGroup.query("name like '%_name1' and attrs.fg_attr1 == '1'")
df.show()
Output:
<class '__main__.FeatureGroup'>: {'name': 'fg_name1', 'description': 'fg_desc1', 'source': 'fg_source1', 'attrs': {'fg_attr2': 'two', 'fg_attr1': '1'}, 'tags': ['fg_tag1', 'fg_tag2']}
Feature Groups with name ending with '_name1' and having attribute 'fg_attr1'='1':
+--------+-----------+----------+--------------------+------------------+
| name|description| source| attrs| tags|
+--------+-----------+----------+--------------------+------------------+
|fg_name1| fg_desc1|fg_source1|[fg_attr1 -> 1, f...|[fg_tag1, fg_tag2]|
+--------+-----------+----------+--------------------+------------------+
Feature Implementation
class Feature:
    schema = StructType([StructField("fid", StringType(), False),
                         StructField("fgname", StringType(), False),
                         StructField("name", StringType(), False),
                         StructField("type", StringType(), False),
                         StructField("description", StringType(), True),
                         StructField("attrs", MapType(StringType(), StringType()), True),
                         StructField("tags", ArrayType(StringType()), True)])

    def __init__(self, fgname, name, ftype, description, attrs, tags):
        self.fid = fgname + '_' + name
        self.fgname = fgname
        self.name = name
        self.ftype = ftype
        self.description = description
        self.attrs = attrs
        self.tags = tags
        return

    def __str__(self):
        return str(self.__class__) + ": " + str(self.__dict__)

    def save(self):
        inputBuf = [(self.fid, self.fgname, self.name, self.ftype, self.description, self.attrs, self.tags)]
        inputRDD = spark.sparkContext.parallelize(inputBuf)
        inputDF = spark.createDataFrame(inputRDD, Feature.schema)
        #Write the data frame to Aerospike, the fid field is used as the key
        inputDF.write \
            .mode('overwrite') \
            .format("aerospike") \
            .option("aerospike.writeset", "feature-metadata")\
            .option("aerospike.updateByKey", "fid") \
            .save()
        return

    @staticmethod
    def load(fgname, name):
        f = None
        # add the __key column so the lookup goes through the primary index
        schema = copy.deepcopy(Feature.schema)
        schema.add("__key", StringType(), False)
        f_df = spark.read \
            .format("aerospike") \
            .schema(schema) \
            .option("aerospike.set", "feature-metadata") \
            .load().where("__key = \"" + fgname+'_'+name + "\"")
        if f_df.count() > 0:
            f_tuple = f_df.collect()[0]
            # skip the leading fid (recomputed in __init__) and the trailing __key
            f = Feature(*f_tuple[1:-1])
        return f

    @staticmethod
    def query(predicate): #returns a dataframe
        f_df = spark.read \
            .format("aerospike") \
            .schema(Feature.schema) \
            .option("aerospike.set", "feature-metadata") \
            .load().where(predicate)
        return f_df
# Enable set index on feature-metadata
!asinfo -v "set-config:context=namespace;id=test;set=feature-metadata;enable-index=true"
Output:
ok
# test Feature
# test save and load
# save
feature1 = Feature("fg_name1", "f_name1", "integer", "f_desc1",
{"f_attr1":"1", "f_attr2":"two"}, ["f_tag1", "f_tag2"])
feature1.save()
# load
f1 = Feature.load("fg_name1", "f_name1")
print(f1, '\n')
# test query
feature2 = Feature("fg_name1", "f_name2", "double", "f_desc2",
{"f_attr1":"1.0", "f_attr3":"three"}, ["f_tag1", "f_tag3"])
feature2.save()
print("Features having name starting with 'f_name' and tagged with 'f_tag1':")
f_df = Feature.query("name like 'f_name%' and array_contains(tags, 'f_tag1')")
f_df.show()
Output:
<class '__main__.Feature'>: {'fid': 'fg_name1_f_name1', 'fgname': 'fg_name1', 'name': 'f_name1', 'ftype': 'integer', 'description': 'f_desc1', 'attrs': {'f_attr2': 'two', 'f_attr1': '1'}, 'tags': ['f_tag1', 'f_tag2']}
Features having name starting with 'f_name' and tagged with 'f_tag1':
+----------------+--------+-------+-------+-----------+--------------------+----------------+
| fid| fgname| name| type|description| attrs| tags|
+----------------+--------+-------+-------+-----------+--------------------+----------------+
| fgname1_f_name1| fgname1|f_name1|integer| f_desc1|[etype -> etype1,...|[f_tag1, f_tag2]|
|fg_name1_f_name2|fg_name1|f_name2| double| f_desc2|[f_attr1 -> 1.0, ...|[f_tag1, f_tag3]|
| fgname1_f_name2| fgname1|f_name2| double| f_desc2|[etype -> etype1,...|[f_tag1, f_tag3]|
|fg_name1_f_name1|fg_name1|f_name1|integer| f_desc1|[f_attr1 -> 1, f_...|[f_tag1, f_tag2]|
+----------------+--------+-------+-------+-----------+--------------------+----------------+
Entity Implementation
class Entity:
    def __init__(self, etype, record, id_col):
        # record is an array of triples (name, type, value)
        self.etype = etype
        self.record = record
        self.id_col = id_col
        return

    def __str__(self):
        return str(self.__class__) + ": " + str(self.__dict__)

    @staticmethod
    def get_schema(record):
        schema = StructType()
        for f in record:
            schema.add(f[0], f[1], True)
        return schema

    @staticmethod
    def get_id_type(schema, id_col):
        return schema[id_col].dataType.typeName()

    def save(self, schema):
        fvalues = [f[2] for f in self.record]
        inputBuf = [tuple(fvalues)]
        inputRDD = spark.sparkContext.parallelize(inputBuf)
        inputDF = spark.createDataFrame(inputRDD, schema)
        #Write the data frame to Aerospike, the id_col field is used as the key
        inputDF.write \
            .mode('overwrite') \
            .format("aerospike") \
            .option("aerospike.writeset", self.etype+'-features')\
            .option("aerospike.updateByKey", self.id_col) \
            .save()
        return

    @staticmethod
    def load(etype, eid, schema, id_col):
        ent = None
        ent_df = spark.read \
            .format("aerospike") \
            .schema(schema) \
            .option("aerospike.set", etype+'-features') \
            .option("aerospike.keyType", "string") \
            .load().where(id_col + " = \"" + eid + "\"")
        if ent_df.count() > 0:
            ent_tuple = ent_df.collect()[0]
            record = [(schema[i].name, schema[i].dataType.typeName(), fv) for i, fv in enumerate(ent_tuple)]
            ent = Entity(etype, record, id_col)
        return ent

    @staticmethod
    def saveDF(df, etype, id_col): # save a dataframe
        # df: dataframe consisting of feature values
        # etype: entity type (such as user or sensor)
        # id_col: column name that holds the primary key
        #Write the data frame to Aerospike, the column in id_col is used as the key
        df.write \
            .mode('overwrite') \
            .format("aerospike") \
            .option("aerospike.writeset", etype+'-features')\
            .option("aerospike.updateByKey", id_col) \
            .save()
        return

    @staticmethod
    def query(etype, predicate, schema, id_col): #returns a dataframe
        ent_df = spark.read \
            .format("aerospike") \
            .schema(schema) \
            .option("aerospike.set", etype+'-features') \
            .option("aerospike.keyType", Entity.get_id_type(schema, id_col)) \
            .load().where(predicate)
        return ent_df

    @staticmethod
    def get_feature_vector(etype, eid, feature_list): # elements in feature_list are in "fg_name|name" form
        # deferred to Model Serving notebook
        pass
# Enable set index on cctxn-features
!asinfo -v "set-config:context=namespace;id=test;set=cctxn-features;enable-index=true"
Output:
ok
# test Entity
# test save and load
# save
features1 = [('fg1:f_name1', IntegerType(), 1), ('fg1:f_name2', DoubleType(), 2.0), ('fg1:f_name3', StringType(), 'three')]
record1 = [('eid', StringType(), 'eid1')] + features1
ent1 = Entity('entity_type1', record1, 'eid')
schema = Entity.get_schema(record1)
ent1.save(schema);
# load
e1 = Entity.load('entity_type1', 'eid1', schema, 'eid')
print(e1, '\n')
# test query
features2 = [('fg1:f_name1', IntegerType(), 10), ('fg1:f_name2', DoubleType(), 20.0), ('fg1:f_name3', StringType(), 'thirty')]
record2 = [('eid', StringType(), 'eid2')] + features2
ent2 = Entity('entity_type2', record2, 'eid')
ent2.save(schema);
# query 1
print("Instances of entity type entity_type1 with eid ending in 1:")
instances = Entity.query('entity_type1', 'eid like "%1"', schema, 'eid')
instances.show()
# query 2
print("Instances of entity type entity_type2 with eid in ['eid2']:")
instances = Entity.query('entity_type2', 'eid in ("eid2")', schema, 'eid')
instances.show()
Output:
<class '__main__.Entity'>: {'etype': 'entity_type1', 'record': [('eid', 'string', 'eid1'), ('fg1:f_name1', 'integer', 1), ('fg1:f_name2', 'double', 2.0), ('fg1:f_name3', 'string', 'three')], 'id_col': 'eid'}
Instances of entity type entity_type1 with eid ending in 1:
+----+-----------+-----------+-----------+
| eid|fg1:f_name1|fg1:f_name2|fg1:f_name3|
+----+-----------+-----------+-----------+
|eid1| 1| 2.0| three|
+----+-----------+-----------+-----------+
Instances of entity type entity_type2 with eid in ['eid2']:
+----+-----------+-----------+-----------+
| eid|fg1:f_name1|fg1:f_name2|fg1:f_name3|
+----+-----------+-----------+-----------+
|eid2| 10| 20.0| thirty|
+----+-----------+-----------+-----------+
# test save dataframe
from pyspark.sql import SparkSession, Row
data = [{"ID": 'eid_1', "fg2_feature_1": 1, "fg2_feature_2": 12.40},
{"ID": 'eid_2', "fg2_feature_1": 2, "fg2_feature_2": 30.10},
{"ID": 'eid_3', "fg2_feature_1": 3, "fg2_feature_2": 100.01}
]
# create and save a dataframe
df = spark.createDataFrame([Row(**i) for i in data])
df.show()
Entity.saveDF(df, "etype_1", "ID")
Output:
+-----+-------------+-------------+
| ID|fg2_feature_1|fg2_feature_2|
+-----+-------------+-------------+
|eid_1| 1| 12.4|
|eid_2| 2| 30.1|
|eid_3| 3| 100.01|
+-----+-------------+-------------+
# test query operation that returns a dataframe
# need to define a schema first
schema = StructType([
StructField('ID', StringType(), False),
StructField('fg2_feature_1', IntegerType(), True),
StructField('fg2_feature_2', DoubleType(), True)])
queryDF = Entity.query('etype_1', 'ID in ("eid_1", "eid_3")', schema, 'ID')
queryDF.show()
Output:
+-----+-------------+-------------+
| ID|fg2_feature_1|fg2_feature_2|
+-----+-------------+-------------+
|eid_1| 1| 12.4|
|eid_3| 3| 100.01|
+-----+-------------+-------------+
Dataset Implementation
# Deferred to the second notebook in this series on Model Training.
Using Feature Store
Let us now see how the feature store objects and operations can be leveraged through the various phases of the ML flow. In this notebook, the focus is on Feature Engineering. Future notebooks will look into Model Training and Model Serving scenarios.
Example: Credit Card Fraud Data
The demo data is abridged from its original version from here. It represents real transactions by European cardholders in 2013. The original dataset has close to 300K transactions, whereas the abridged version used here contains about a thousand records.
We will illustrate use of the feature store for Feature Engineering through the following sequence:
- Read the demo data from a csv file.
- Perform feature engineering tasks.
- Save the engineered features to the Feature Store.
Read Data into Dataframe
Read and examine the columns and rows.
The data contains 28 PCA-transformed columns ("V1"-"V28"), the untransformed "Time" and "Amount" columns, and 1 label ("Class") column.
import pandas as pd
data=pd.read_csv("resources/creditcard_small.csv")
data.info()
Output:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 984 entries, 0 to 983
Data columns (total 32 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 Unnamed: 0 984 non-null int64
1 Time 984 non-null float64
2 V1 984 non-null float64
3 V2 984 non-null float64
4 V3 984 non-null float64
5 V4 984 non-null float64
6 V5 984 non-null float64
7 V6 984 non-null float64
8 V7 984 non-null float64
9 V8 984 non-null float64
10 V9 984 non-null float64
11 V10 984 non-null float64
12 V11 984 non-null float64
13 V12 984 non-null float64
14 V13 984 non-null float64
15 V14 984 non-null float64
16 V15 984 non-null float64
17 V16 984 non-null float64
18 V17 984 non-null float64
19 V18 984 non-null float64
20 V19 984 non-null float64
21 V20 984 non-null float64
22 V21 984 non-null float64
23 V22 984 non-null float64
24 V23 984 non-null float64
25 V24 984 non-null float64
26 V25 984 non-null float64
27 V26 984 non-null float64
28 V27 984 non-null float64
29 V28 984 non-null float64
30 Amount 984 non-null float64
31 Class 984 non-null int64
dtypes: float64(30), int64(2)
memory usage: 246.1 KB
The data contains all fraudulent transactions from the original dataset, and the same number of randomly selected non-fraudulent transactions.
total = len(data)
normal = len(data[data.Class == 0])
fraudulent = len(data[data.Class == 1])
fraud_percentage = round(fraudulent/total*100, 2)
print('Total number of transactions: {}'.format(total))
print('Number of normal transactions {}'.format(normal))
print('Number of fraudulent transactions: {}'.format(fraudulent))
Output:
Total number of transactions: 984
Number of normal transactions 492
Number of fraudulent transactions: 492
Here is how the data looks:
data.head()
Output:
   Unnamed: 0      Time         V1         V2         V3         V4         V5         V6         V7         V8  ...        V21        V22        V23        V24        V25        V26        V27        V28  Amount  Class
0         121      77.0  -0.427191   0.745708   1.761811  -0.165130   0.058298  -0.213413   0.647323   0.073464  ...  -0.201681  -0.432070   0.013164   0.161606  -0.401310   0.047423   0.102549  -0.116571    9.12      0
1      248296  153875.0  -0.613696   3.698772  -5.534941   5.620486   1.649263  -2.335145  -0.907188   0.706362  ...   0.319261  -0.471379  -0.075890  -0.667909  -0.642848   0.070600   0.488410   0.292345    0.00      1
2         239     160.0   1.171439   0.474974   0.011761   1.264303   0.116234  -0.865986   0.554393  -0.276375  ...   0.070051   0.278843  -0.097491   0.426278   0.744938  -0.274728   0.008472   0.015492   20.00      0
3      239501  150139.0  -6.682832  -2.714268  -5.774530   1.449792  -0.661836  -1.148650   0.849686   0.433427  ...   0.220526   1.187013   0.335821   0.215683   0.803110   0.044033  -0.054988   0.082337  237.26      1
4      143336   85285.0  -6.713407   3.921104  -9.746678   5.148263  -5.151563  -2.099389  -5.937767   3.578780  ...   0.954272  -0.451086   0.127214  -0.339450   0.394096   1.075295   1.649906  -0.394905  252.92      1
5 rows × 32 columns
Perform Feature Engineering Tasks
We will perform some simple data transformations:
- order the rows
- rename the columns to include the feature group prefix "CC1"
- add a transaction id column "TxnId" and assign a unique generated value to it
- select (drop the index column) and order columns
# rename the index column from the original dataset
data = data.rename(columns={"Unnamed: 0": "OldIdx"})
# order the rows by timestamp (original index order)
data = data.sort_values("OldIdx")
data.reset_index(drop=True, inplace=True)
data
Output:
     OldIdx      Time         V1         V2         V3         V4         V5         V6         V7         V8  ...        V21        V22        V23        V24        V25        V26        V27        V28  Amount  Class
  0       0       0.0  -1.359807  -0.072781   2.536347   1.378155  -0.338321   0.462388   0.239599   0.098698  ...  -0.018307   0.277838  -0.110474   0.066928   0.128539  -0.189115   0.133558  -0.021053  149.62      0
  1       1       0.0   1.191857   0.266151   0.166480   0.448154   0.060018  -0.082361  -0.078803   0.085102  ...  -0.225775  -0.638672   0.101288  -0.339846   0.167170   0.125895  -0.008983   0.014724    2.69      0
  2       2       1.0  -1.358354  -1.340163   1.773209   0.379780  -0.503198   1.800499   0.791461   0.247676  ...   0.247998   0.771679   0.909412  -0.689281  -0.327642  -0.139097  -0.055353  -0.059752  378.66      0
  3       3       1.0  -0.966272  -0.185226   1.792993  -0.863291  -0.010309   1.247203   0.237609   0.377436  ...  -0.108300   0.005274  -0.190321  -1.175575   0.647376  -0.221929   0.062723   0.061458  123.50      0
  4       4       2.0  -1.158233   0.877737   1.548718   0.403034  -0.407193   0.095921   0.592941  -0.270533  ...  -0.009431   0.798278  -0.137458   0.141267  -0.206010   0.502292   0.219422   0.215153   69.99      0
...     ...       ...        ...        ...        ...        ...        ...        ...        ...        ...  ...        ...        ...        ...        ...        ...        ...        ...        ...     ...    ...
979  279863  169142.0  -1.927883   1.125653  -4.518331   1.749293  -1.566487  -2.010494  -0.882850   0.697211  ...   0.778584  -0.319189   0.639419  -0.294885   0.537503   0.788395   0.292680   0.147968  390.00      1
980  280143  169347.0   1.378559   1.289381  -5.004247   1.411850   0.442581  -1.326536  -1.413170   0.248525  ...   0.370612   0.028234  -0.145640  -0.081049   0.521875   0.739467   0.389152   0.186637    0.76      1
981  280149  169351.0  -0.676143   1.126366  -2.213700   0.468308  -1.120541  -0.003346  -2.234739   1.210158  ...   0.751826   0.834108   0.190944   0.032070  -0.739695   0.471111   0.385107   0.194361   77.89      1
982  281144  169966.0  -3.113832   0.585864  -5.399730   1.817092  -0.840618  -2.943548  -2.208002   1.058733  ...   0.583276  -0.269209  -0.456108  -0.183659  -0.328168   0.606116   0.884876  -0.253700  245.00      1
983  281674  170348.0   1.991976   0.158476  -2.583441   0.408670   1.151147  -0.096695   0.223050  -0.068384  ...  -0.164350  -0.295135  -0.072173  -0.450261   0.313267  -0.289617   0.002988  -0.015309   42.53      1
984 rows × 32 columns
# rename the columns to include the feature group prefix "CC1"
curr_columns = data.columns
data = data.rename(columns=dict(zip(curr_columns, ["CC1_"+c for c in curr_columns])))
data
Output:
CC1_OldIdx CC1_Time CC1_V1 CC1_V2 CC1_V3 CC1_V4 CC1_V5 CC1_V6 CC1_V7 CC1_V8 ... CC1_V21 CC1_V22 CC1_V23 CC1_V24 CC1_V25 CC1_V26 CC1_V27 CC1_V28 CC1_Amount CC1_Class
0 0 0.0 -1.359807 -0.072781 2.536347 1.378155 -0.338321 0.462388 0.239599 0.098698 ... -0.018307 0.277838 -0.110474 0.066928 0.128539 -0.189115 0.133558 -0.021053 149.62 0
1 1 0.0 1.191857 0.266151 0.166480 0.448154 0.060018 -0.082361 -0.078803 0.085102 ... -0.225775 -0.638672 0.101288 -0.339846 0.167170 0.125895 -0.008983 0.014724 2.69 0
2 2 1.0 -1.358354 -1.340163 1.773209 0.379780 -0.503198 1.800499 0.791461 0.247676 ... 0.247998 0.771679 0.909412 -0.689281 -0.327642 -0.139097 -0.055353 -0.059752 378.66 0
3 3 1.0 -0.966272 -0.185226 1.792993 -0.863291 -0.010309 1.247203 0.237609 0.377436 ... -0.108300 0.005274 -0.190321 -1.175575 0.647376 -0.221929 0.062723 0.061458 123.50 0
4 4 2.0 -1.158233 0.877737 1.548718 0.403034 -0.407193 0.095921 0.592941 -0.270533 ... -0.009431 0.798278 -0.137458 0.141267 -0.206010 0.502292 0.219422 0.215153 69.99 0
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
979 279863 169142.0 -1.927883 1.125653 -4.518331 1.749293 -1.566487 -2.010494 -0.882850 0.697211 ... 0.778584 -0.319189 0.639419 -0.294885 0.537503 0.788395 0.292680 0.147968 390.00 1
980 280143 169347.0 1.378559 1.289381 -5.004247 1.411850 0.442581 -1.326536 -1.413170 0.248525 ... 0.370612 0.028234 -0.145640 -0.081049 0.521875 0.739467 0.389152 0.186637 0.76 1
981 280149 169351.0 -0.676143 1.126366 -2.213700 0.468308 -1.120541 -0.003346 -2.234739 1.210158 ... 0.751826 0.834108 0.190944 0.032070 -0.739695 0.471111 0.385107 0.194361 77.89 1
982 281144 169966.0 -3.113832 0.585864 -5.399730 1.817092 -0.840618 -2.943548 -2.208002 1.058733 ... 0.583276 -0.269209 -0.456108 -0.183659 -0.328168 0.606116 0.884876 -0.253700 245.00 1
983 281674 170348.0 1.991976 0.158476 -2.583441 0.408670 1.151147 -0.096695 0.223050 -0.068384 ... -0.164350 -0.295135 -0.072173 -0.450261 0.313267 -0.289617 0.002988 -0.015309 42.53 1
984 rows × 32 columns
Create a Spark dataframe to save data to Aerospike efficiently through the Aerospike Spark Connector.
sparkDF = spark.createDataFrame(data)
sparkDF.printSchema()
sparkDF.toPandas()
Output:
root
|-- CC1_OldIdx: long (nullable = true)
|-- CC1_Time: double (nullable = true)
|-- CC1_V1: double (nullable = true)
|-- CC1_V2: double (nullable = true)
|-- CC1_V3: double (nullable = true)
|-- CC1_V4: double (nullable = true)
|-- CC1_V5: double (nullable = true)
|-- CC1_V6: double (nullable = true)
|-- CC1_V7: double (nullable = true)
|-- CC1_V8: double (nullable = true)
|-- CC1_V9: double (nullable = true)
|-- CC1_V10: double (nullable = true)
|-- CC1_V11: double (nullable = true)
|-- CC1_V12: double (nullable = true)
|-- CC1_V13: double (nullable = true)
|-- CC1_V14: double (nullable = true)
|-- CC1_V15: double (nullable = true)
|-- CC1_V16: double (nullable = true)
|-- CC1_V17: double (nullable = true)
|-- CC1_V18: double (nullable = true)
|-- CC1_V19: double (nullable = true)
|-- CC1_V20: double (nullable = true)
|-- CC1_V21: double (nullable = true)
|-- CC1_V22: double (nullable = true)
|-- CC1_V23: double (nullable = true)
|-- CC1_V24: double (nullable = true)
|-- CC1_V25: double (nullable = true)
|-- CC1_V26: double (nullable = true)
|-- CC1_V27: double (nullable = true)
|-- CC1_V28: double (nullable = true)
|-- CC1_Amount: double (nullable = true)
|-- CC1_Class: long (nullable = true)
Output:
CC1_OldIdx CC1_Time CC1_V1 CC1_V2 CC1_V3 CC1_V4 CC1_V5 CC1_V6 CC1_V7 CC1_V8 ... CC1_V21 CC1_V22 CC1_V23 CC1_V24 CC1_V25 CC1_V26 CC1_V27 CC1_V28 CC1_Amount CC1_Class
0 0 0.0 -1.359807 -0.072781 2.536347 1.378155 -0.338321 0.462388 0.239599 0.098698 ... -0.018307 0.277838 -0.110474 0.066928 0.128539 -0.189115 0.133558 -0.021053 149.62 0
1 1 0.0 1.191857 0.266151 0.166480 0.448154 0.060018 -0.082361 -0.078803 0.085102 ... -0.225775 -0.638672 0.101288 -0.339846 0.167170 0.125895 -0.008983 0.014724 2.69 0
2 2 1.0 -1.358354 -1.340163 1.773209 0.379780 -0.503198 1.800499 0.791461 0.247676 ... 0.247998 0.771679 0.909412 -0.689281 -0.327642 -0.139097 -0.055353 -0.059752 378.66 0
3 3 1.0 -0.966272 -0.185226 1.792993 -0.863291 -0.010309 1.247203 0.237609 0.377436 ... -0.108300 0.005274 -0.190321 -1.175575 0.647376 -0.221929 0.062723 0.061458 123.50 0
4 4 2.0 -1.158233 0.877737 1.548718 0.403034 -0.407193 0.095921 0.592941 -0.270533 ... -0.009431 0.798278 -0.137458 0.141267 -0.206010 0.502292 0.219422 0.215153 69.99 0
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
979 279863 169142.0 -1.927883 1.125653 -4.518331 1.749293 -1.566487 -2.010494 -0.882850 0.697211 ... 0.778584 -0.319189 0.639419 -0.294885 0.537503 0.788395 0.292680 0.147968 390.00 1
980 280143 169347.0 1.378559 1.289381 -5.004247 1.411850 0.442581 -1.326536 -1.413170 0.248525 ... 0.370612 0.028234 -0.145640 -0.081049 0.521875 0.739467 0.389152 0.186637 0.76 1
981 280149 169351.0 -0.676143 1.126366 -2.213700 0.468308 -1.120541 -0.003346 -2.234739 1.210158 ... 0.751826 0.834108 0.190944 0.032070 -0.739695 0.471111 0.385107 0.194361 77.89 1
982 281144 169966.0 -3.113832 0.585864 -5.399730 1.817092 -0.840618 -2.943548 -2.208002 1.058733 ... 0.583276 -0.269209 -0.456108 -0.183659 -0.328168 0.606116 0.884876 -0.253700 245.00 1
983 281674 170348.0 1.991976 0.158476 -2.583441 0.408670 1.151147 -0.096695 0.223050 -0.068384 ... -0.164350 -0.295135 -0.072173 -0.450261 0.313267 -0.289617 0.002988 -0.015309 42.53 1
984 rows × 32 columns
Add an identifier column TxnId. Generate and assign a unique value to it.
# add a new column TxnId for transaction id
# define a udf to update the new TxnId column
import pyspark.sql.functions as F
curr_txn_id = 1
def get_txn_id():
    global curr_txn_id
    txn_id = str(curr_txn_id)
    curr_txn_id += 1
    return txn_id
txn_id_udf = F.UserDefinedFunction(get_txn_id, StringType())
sparkDF = sparkDF.withColumn('TxnId', txn_id_udf())
# select the needed columns
sparkDF = sparkDF.select(['TxnId','CC1_Class','CC1_Amount']+['CC1_V'+str(i) for i in range(1,29)])
sparkDF.show(3, truncate=3)
Output:
+-----+---------+----------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|TxnId|CC1_Class|CC1_Amount|CC1_V1|CC1_V2|CC1_V3|CC1_V4|CC1_V5|CC1_V6|CC1_V7|CC1_V8|CC1_V9|CC1_V10|CC1_V11|CC1_V12|CC1_V13|CC1_V14|CC1_V15|CC1_V16|CC1_V17|CC1_V18|CC1_V19|CC1_V20|CC1_V21|CC1_V22|CC1_V23|CC1_V24|CC1_V25|CC1_V26|CC1_V27|CC1_V28|
+-----+---------+----------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
| 1| 0| 149| -1.| -0.| 2.5| 1.3| -0.| 0.4| 0.2| 0.0| 0.3| 0.0| -0.| -0.| -0.| -0.| 1.4| -0.| 0.2| 0.0| 0.4| 0.2| -0.| 0.2| -0.| 0.0| 0.1| -0.| 0.1| -0.|
| 2| 0| 2.6| 1.1| 0.2| 0.1| 0.4| 0.0| -0.| -0.| 0.0| -0.| -0.| 1.6| 1.0| 0.4| -0.| 0.6| 0.4| -0.| -0.| -0.| -0.| -0.| -0.| 0.1| -0.| 0.1| 0.1| -0.| 0.0|
| 3| 0| 378| -1.| -1.| 1.7| 0.3| -0.| 1.8| 0.7| 0.2| -1.| 0.2| 0.6| 0.0| 0.7| -0.| 2.3| -2.| 1.1| -0.| -2.| 0.5| 0.2| 0.7| 0.9| -0.| -0.| -0.| -0.| -0.|
+-----+---------+----------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
only showing top 3 rows
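The counter-based UDF above keeps its state in a Python global, so each Python worker process holds its own copy of `curr_txn_id`, and values may repeat whenever rows are processed by more than one worker. The sketch below uses plain Python with no Spark dependency to illustrate one common remedy: derive each ID from the partition's starting offset plus the row's position within the partition, similar in spirit to `RDD.zipWithIndex`. The function name `assign_ids` and the list-of-lists model of partitions are assumptions made for illustration, not part of the notebook's API.

```python
from itertools import accumulate

def assign_ids(partitions, start=1):
    """Assign consecutive, globally unique string IDs to rows held in partitions.

    `partitions` is a list of lists of rows (a hypothetical stand-in for Spark partitions).
    """
    sizes = [len(p) for p in partitions]
    # Offset of each partition = start + number of rows in all earlier partitions.
    offsets = [start] + [start + s for s in accumulate(sizes)][:-1]
    return [
        [(str(offset + i), row) for i, row in enumerate(part)]
        for offset, part in zip(offsets, partitions)
    ]

partitions = [["a", "b", "c"], ["d"], ["e", "f"]]
labelled = assign_ids(partitions)
ids = [txn_id for part in labelled for txn_id, _ in part]
print(ids)
```

In PySpark itself, `pyspark.sql.functions.monotonically_increasing_id()` is a built-in alternative that yields unique, though not consecutive, 64-bit IDs.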
Save Features to Feature Store
We are ready to save the features in the Spark dataframe to Aerospike.
Of the three steps below, the first two are needed only once, when the feature group and its features are first created. Subsequent updates use only the third step.
- Create a feature group.
- Create feature metadata.
- Save feature values in entity records.
# 1. Create a feature group.
FG_NAME = 'CC1'
FG_DESCRIPTION = 'Credit card transaction data'
FG_SOURCE = 'European cardholder dataset from Kaggle'
fg = FeatureGroup(FG_NAME, FG_DESCRIPTION, FG_SOURCE,
attrs={'entity':'cctxn', 'class':'fraud'}, tags=['kaggle', 'demo'])
fg.save()
# 2. Create feature metadata
FEATURE_AMOUNT = 'Amount'
f = Feature(FG_NAME, FEATURE_AMOUNT, 'double', "Transaction amount",
attrs={'entity':'cctxn'}, tags=['usd'])
f.save()
FEATURE_CLASS = 'Class'
f = Feature(FG_NAME, FEATURE_CLASS, 'integer', "Label indicating fraud or not",
attrs={'entity':'cctxn'}, tags=['label'])
f.save()
FEATURE_PCA_XFORM = "V"
for i in range(1,29):
    f = Feature(FG_NAME, FEATURE_PCA_XFORM+str(i), 'double', "Transformed version of PCA",
                attrs={'entity':'cctxn'}, tags=['pca'])
    f.save()
# 3. Save feature values in entity records
ENTITY_TYPE = 'cctxn'
ID_COLUMN = 'TxnId'
Entity.saveDF(sparkDF, ENTITY_TYPE, ID_COLUMN)
print('Features stored to Feature Store.')
Output:
Features stored to Feature Store.
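The split between one-time metadata creation (steps 1 and 2) and repeatable value writes (step 3) can be sketched with a toy in-memory store. Everything here is hypothetical: `InMemoryFeatureStore` and its methods are illustrative stand-ins, not the `FeatureGroup`, `Feature`, and `Entity` classes defined earlier in this notebook, and the merge-on-write behavior is an assumption of the sketch.

```python
class InMemoryFeatureStore:
    """Toy stand-in for a feature store; all names here are hypothetical."""

    def __init__(self):
        self.groups = {}     # group name -> metadata dict
        self.features = {}   # (group, feature) -> metadata dict
        self.entities = {}   # (entity type, id) -> {column: value}

    def register_group(self, name, description, source):
        # Step 1: one-time creation; later calls leave existing metadata alone.
        self.groups.setdefault(name, {"description": description, "source": source})

    def register_feature(self, group, name, dtype, description):
        # Step 2: one-time creation of per-feature metadata.
        if group not in self.groups:
            raise KeyError(f"unknown feature group: {group}")
        self.features.setdefault((group, name), {"type": dtype, "description": description})

    def save_values(self, entity_type, id_column, rows):
        # Step 3: repeated on every update; merges new values into the entity record.
        for row in rows:
            record = self.entities.setdefault((entity_type, row[id_column]), {})
            record.update(row)

store = InMemoryFeatureStore()
store.register_group("CC1", "Credit card transaction data", "demo")
store.register_feature("CC1", "Amount", "double", "Transaction amount")
store.save_values("cctxn", "TxnId", [{"TxnId": "1", "CC1_Amount": 149.62}])
# A later update needs only step 3.
store.save_values("cctxn", "TxnId", [{"TxnId": "1", "CC1_Class": 0}])
print(store.entities[("cctxn", "1")])
```

Keeping metadata registration idempotent means a pipeline can call all three steps on every run without overwriting earlier descriptions.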
Query Features
Let's issue a query against the saved features to get fraudulent transactions with amount greater than $2000. There appears to be only one such transaction in the demo data.
schema = StructType([StructField(ID_COLUMN, StringType(), False),
StructField(FG_NAME+'_'+FEATURE_CLASS, IntegerType(), False),
StructField(FG_NAME+'_'+FEATURE_AMOUNT, DoubleType(), False)])
for i in range(1,29):
    schema.add(FG_NAME+'_'+FEATURE_PCA_XFORM+str(i), DoubleType(), True)
queryDF = Entity.query(ENTITY_TYPE, 'CC1_Class == 1 and CC1_Amount > 2000', schema, ID_COLUMN)
queryDF.show(5, truncate=7)
Output:
+------+---------+----------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
| TxnId|CC1_Class|CC1_Amount| CC1_V1| CC1_V2| CC1_V3| CC1_V4| CC1_V5| CC1_V6| CC1_V7| CC1_V8| CC1_V9|CC1_V10|CC1_V11|CC1_V12|CC1_V13|CC1_V14|CC1_V15|CC1_V16|CC1_V17|CC1_V18|CC1_V19|CC1_V20|CC1_V21|CC1_V22|CC1_V23|CC1_V24|CC1_V25|CC1_V26|CC1_V27|CC1_V28|
+------+---------+----------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|176050| 1| 2125.87|-2.0...|-7.1...|-4.0...|1.30...|-2.0...|-0.0...|2.88...|-0.7...|1.46...|-1.5...|-1.3...|-0.2...|-1.5...|1.07...|0.38...|-0.6...|0.09...|0.33...|0.05...|3.97...|1.24...|-1.0...|-1.8...|0.65...|-0.4...|-0.8...|-0.3...|0.31...|
+------+---------+----------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
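The query schema follows a fixed naming pattern: the ID column first, then each feature as `<group>_<feature>`. A small helper can generate the expected column names so they can be checked against the dataframe that was saved. The helper name `feature_columns` and its parameters are assumptions for illustration only.

```python
def feature_columns(id_column, group, scalar_features, indexed_prefix, count):
    """Return the ID column followed by '<group>_<feature>' column names."""
    names = [id_column]
    names += [f"{group}_{name}" for name in scalar_features]
    # Indexed features such as V1..V28 share a prefix and a 1-based suffix.
    names += [f"{group}_{indexed_prefix}{i}" for i in range(1, count + 1)]
    return names

columns = feature_columns("TxnId", "CC1", ["Class", "Amount"], "V", 28)
print(len(columns))
print(columns[:4], columns[-1])
```

Comparing this list with `queryDF.columns` is one way to confirm that the schema and the stored bins agree.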
Examine the data through aql with the following command:
!aql -c "select * from test.cctxn-features where PK='176050'"
Output:
select * from test.cctxn-features where PK='176050'
+------------+-----------+-------------------+-------------------+-------------------+--------------------+-------------------+-----------------+-------------------+--------------------+--------------------+-------------------+--------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+--------------------+--------------------+-------------------+-------------------+-------------------+------------------+-------------------+---------------------+------------------+--------------------+-----------------+----------+
| CC1_Amount | CC1_Class | CC1_V1 | CC1_V10 | CC1_V11 | CC1_V12 | CC1_V13 | CC1_V14 | CC1_V15 | CC1_V16 | CC1_V17 | CC1_V18 | CC1_V19 | CC1_V2 | CC1_V20 | CC1_V21 | CC1_V22 | CC1_V23 | CC1_V24 | CC1_V25 | CC1_V26 | CC1_V27 | CC1_V28 | CC1_V3 | CC1_V4 | CC1_V5 | CC1_V6 | CC1_V7 | CC1_V8 | CC1_V9 | TxnId |
+------------+-----------+-------------------+-------------------+-------------------+--------------------+-------------------+-----------------+-------------------+--------------------+--------------------+-------------------+--------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+--------------------+--------------------+-------------------+-------------------+-------------------+------------------+-------------------+---------------------+------------------+--------------------+-----------------+----------+
| 2125.87 | 1 | -2.00345953080582 | -1.53160798206082 | -1.39432826167269 | -0.220718797789479 | -1.53099043146804 | 1.0752476539262 | 0.388383209307268 | -0.660655352312646 | 0.0933209955444861 | 0.335742221574637 | 0.0575510393537501 | -7.15904171709445 | 3.97321702726744 | 1.24428677489095 | -1.01523228673153 | -1.80098486605048 | 0.657585626965743 | -0.435617246752788 | -0.894508922176968 | -0.39755738695085 | 0.314261714087509 | -4.05097631587393 | 1.30957974749918 | -2.05810158798669 | -0.0986209270722274 | 2.88008272715204 | -0.727484046608914 | 1.4603805509699 | "176050" |
+------------+-----------+-------------------+-------------------+-------------------+--------------------+-------------------+-----------------+-------------------+--------------------+--------------------+-------------------+--------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+--------------------+--------------------+-------------------+-------------------+-------------------+------------------+-------------------+---------------------+------------------+--------------------+-----------------+----------+
1 row in set (0.001 secs)
OK
Example: Salary Data
Let's look at another example of using the feature store for feature engineering, taken from another Aerospike Spark notebook.
We will illustrate use of the feature store for Feature Engineering through the following sequence:
- Generate demo data.
- Perform feature engineering tasks.
- Save the engineered features to Feature Store.
Generate Demo Data
# We create age vs salary data, using three different Gaussian distributions
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import math
# Make sure we get the same results every time this notebook is run
# Otherwise the results may occasionally differ from those described in the text
np.random.seed(12345)
# Create covariance matrix from std devs + correlation
def covariance_matrix(std_dev_1,std_dev_2,correlation):
    return [[std_dev_1 ** 2, correlation * std_dev_1 * std_dev_2],
            [correlation * std_dev_1 * std_dev_2, std_dev_2 ** 2]]
# Return a bivariate sample given means/std dev/correlation
def age_salary_sample(distribution_params,sample_size):
    mean = [distribution_params["age_mean"], distribution_params["salary_mean"]]
    cov = covariance_matrix(distribution_params["age_std_dev"],distribution_params["salary_std_dev"],
                            distribution_params["age_salary_correlation"])
    return np.random.multivariate_normal(mean, cov, sample_size).T
# Define the characteristics of our age/salary distribution
age_salary_distribution_1 = {"age_mean":25,"salary_mean":50000,
"age_std_dev":1,"salary_std_dev":5000,"age_salary_correlation":0.3}
age_salary_distribution_2 = {"age_mean":45,"salary_mean":80000,
"age_std_dev":4,"salary_std_dev":8000,"age_salary_correlation":0.7}
age_salary_distribution_3 = {"age_mean":35,"salary_mean":70000,
"age_std_dev":2,"salary_std_dev":9000,"age_salary_correlation":0.1}
distribution_data = [age_salary_distribution_1,age_salary_distribution_2,age_salary_distribution_3]
# Sample age/salary data for each distribution
sample_size_1 = 100
sample_size_2 = 120
sample_size_3 = 80
sample_sizes = [sample_size_1,sample_size_2,sample_size_3]
group_1_ages,group_1_salaries = age_salary_sample(age_salary_distribution_1,sample_size=sample_size_1)
group_2_ages,group_2_salaries = age_salary_sample(age_salary_distribution_2,sample_size=sample_size_2)
group_3_ages,group_3_salaries = age_salary_sample(age_salary_distribution_3,sample_size=sample_size_3)
ages=np.concatenate([group_1_ages,group_2_ages,group_3_ages])
salaries=np.concatenate([group_1_salaries,group_2_salaries,group_3_salaries])
print("Data created")
Output:
Data created
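For reference, the `covariance_matrix` helper above builds the standard 2×2 covariance matrix of a bivariate normal distribution from two standard deviations and a correlation coefficient. The symbols below (σ_a for the age standard deviation, σ_s for the salary standard deviation, ρ for their correlation) are notation introduced here for illustration. The second line works through the parameters of the first distribution (σ_a = 1, σ_s = 5000, ρ = 0.3).

```latex
\Sigma =
\begin{pmatrix}
\sigma_a^2 & \rho\,\sigma_a\sigma_s \\
\rho\,\sigma_a\sigma_s & \sigma_s^2
\end{pmatrix},
\qquad
\Sigma_1 =
\begin{pmatrix}
1^2 & 0.3 \cdot 1 \cdot 5000 \\
0.3 \cdot 1 \cdot 5000 & 5000^2
\end{pmatrix}
=
\begin{pmatrix}
1 & 1500 \\
1500 & 2.5 \times 10^{7}
\end{pmatrix}
```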
Perform Feature Engineering Tasks
Display simulated age/salary data.
# Plot the sample data
group_1_colour, group_2_colour, group_3_colour ='red','blue', 'pink'
plt.xlabel('Age',fontsize=10)
plt.ylabel("Salary",fontsize=10)
plt.scatter(group_1_ages,group_1_salaries,c=group_1_colour,label="Group 1")
plt.scatter(group_2_ages,group_2_salaries,c=group_2_colour,label="Group 2")
plt.scatter(group_3_ages,group_3_salaries,c=group_3_colour,label="Group 3")
plt.legend(loc='upper left')
plt.show()
Output:
[Scatter plot of Salary versus Age, with Group 1 in red, Group 2 in blue, and Group 3 in pink]
# Turn the above records into a Data Frame
# First of all, create a list of tuples, one per individual
inputBuf = []
for i in range(0, len(ages)):
    id = i + 1 # Avoid counting from zero
    name = "Individual: {:03d}".format(id)
    # Note we need to make sure values are typed correctly
    # salary will have type numpy.float64 - if it is not cast as below, an error will be thrown
    age = float(ages[i])
    salary = int(salaries[i])
    inputBuf.append((id, name, age, salary))
# Convert to an RDD
inputRDD = spark.sparkContext.parallelize(inputBuf)
# Convert to a data frame using a schema
# Note the feature group SAL is prefixed to each feature column
schema = StructType([
StructField("id", IntegerType(), True),
StructField("SAL_name", StringType(), True),
StructField("SAL_age", DoubleType(), True),
StructField("SAL_salary",IntegerType(), True)
])
inputDF=spark.createDataFrame(inputRDD,schema)
inputDF.show(5)
Output:
+---+---------------+------------------+----------+
| id| SAL_name| SAL_age|SAL_salary|
+---+---------------+------------------+----------+
| 1|Individual: 001| 25.39547052370498| 48976|
| 2|Individual: 002|24.314035458986748| 47402|
| 3|Individual: 003|26.918958635987888| 59828|
| 4|Individual: 004| 25.29664106310324| 50464|
| 5|Individual: 005|26.419729731447458| 53845|
+---+---------------+------------------+----------+
only showing top 5 rows
Save Features to Feature Store
- Create a feature group.
- Create feature metadata.
- Save feature values in entity records.
# 1. Create a feature group.
FG_NAME = 'SAL'
FG_DESCRIPTION = 'Age salary data'
FG_SOURCE = 'Generated demo data'
fg = FeatureGroup(FG_NAME, FG_DESCRIPTION, FG_SOURCE,
attrs={'access':'all'}, tags=['test'])
fg.save()
# 2. Create features metadata.
FEATURE_NAME = 'name'
f = Feature(FG_NAME, FEATURE_NAME, "string", "Name of the person",
attrs={'unique':'no'}, tags=['test'])
f.save()
FEATURE_AGE = 'age'
f = Feature(FG_NAME, FEATURE_AGE, "double", "Age of the person",
attrs={'range':'0-100'}, tags=['test'])
f.save()
FEATURE_SALARY = 'salary'
f = Feature(FG_NAME, FEATURE_SALARY, 'integer', "Salary of the person",
attrs={'range':'20-999K'}, tags=['test'])
f.save()
# 3. Save feature values in entity records.
ENTITY_TYPE = 'user'
ID_COLUMN = 'id'
Entity.saveDF(inputDF, ENTITY_TYPE, ID_COLUMN)
print('Features stored to Feature Store.')
Output:
Features stored to Feature Store.
Query Features
Issue a query against the saved features to get a dataframe with records in age group 25 to 30. There are 50 such users in the dataset.
queryDF = Entity.query(ENTITY_TYPE, '`SAL_age` between 25 and 30', schema, ID_COLUMN)
print("Number of users of age between 25 and 30: {}".format(queryDF.count()))
queryDF.show(5)
Output:
Number of users of age between 25 and 30: 50
+---+---------------+------------------+----------+
| id| SAL_name| SAL_age|SAL_salary|
+---+---------------+------------------+----------+
| 85|Individual: 085|26.288287033377657| 59603|
| 1|Individual: 001| 25.39547052370498| 48976|
| 14|Individual: 014| 25.59043077849547| 51513|
| 79|Individual: 079|25.887490702675926| 48162|
| 4|Individual: 004| 25.29664106310324| 50464|
+---+---------------+------------------+----------+
only showing top 5 rows
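The `between 25 and 30` predicate in the query is inclusive at both ends in Spark SQL. As a minimal sketch of the same filter in plain Python, the snippet below applies it to the first five rows shown earlier by `inputDF.show(5)`; the helper name `age_between` is an assumption for illustration.

```python
# First five rows of the salary data: (id, SAL_name, SAL_age, SAL_salary)
rows = [
    (1, "Individual: 001", 25.39547052370498, 48976),
    (2, "Individual: 002", 24.314035458986748, 47402),
    (3, "Individual: 003", 26.918958635987888, 59828),
    (4, "Individual: 004", 25.29664106310324, 50464),
    (5, "Individual: 005", 26.419729731447458, 53845),
]

def age_between(rows, low, high):
    """Keep rows whose age lies in the closed interval [low, high]."""
    return [r for r in rows if low <= r[2] <= high]

selected = age_between(rows, 25, 30)
print([r[0] for r in selected])
```

Only the second individual, aged about 24.3, falls outside the range in this sample.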
Examine the data through aql with the following command:
!aql -c "select * from test.user-features where PK=85"
Output:
select * from test.user-features where PK=85
+----+-------------------+-------------------+------------+
| id | SAL_age | SAL_name | SAL_salary |
+----+-------------------+-------------------+------------+
| 85 | 26.28828703337766 | "Individual: 085" | 59603 |
+----+-------------------+-------------------+------------+
1 row in set (0.001 secs)
OK
Takeaways and Conclusion
In this notebook, we explored how Aerospike can be used as a Feature Store for ML applications. Specifically, we showed how features engineered on the Spark platform can be stored efficiently in an Aerospike feature store via the Aerospike Spark Connector. We implemented a simple example feature store interface that leverages the Aerospike Spark Connector for this purpose, and used its API to save and query features created in two feature engineering examples.
This is the first in a series of notebooks on how Aerospike can be used as a feature store. Subsequent notebooks in this series will explore the use of the Aerospike Feature Store for Model Training and Model Serving.
Cleaning Up
Close the Spark session by executing the cell below. To also remove the tutorial data, uncomment the truncate command in the cell before running it.
try:
    spark.stop()
except Exception:
    pass  # ignore errors, for example if the session is already stopped
# To remove all data in the namespace test, uncomment the following line and run:
#!aql -c "truncate test"
Further Exploration and Resources
Here are some links for further exploration.
Resources
- Related notebooks
- Related blog posts
- Aerospike Developer Hub
- Github repos
Exploring Other Notebooks
Visit Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open in the notebook menu, and select Upload.