Prerequisites and setup

This tutorial assumes Aerospike and Spark are running and your Part 1 class definitions are loaded. Use the provided feature_store_tutorial.ipynb notebook, then continue in Part 2.
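Before running Part 2, you can confirm that the notebook still holds the Part 1 objects this page relies on. The sketch below is a hypothetical pre-flight helper and is not part of the tutorial notebook. It assumes the names to look for are spark, FeatureGroup, Feature, and Entity, since those are the ones Cells 1 and 2 call. It uses the same membership test as the require_part2 helper that Cell 1 defines.

```python
# Hypothetical pre-flight check (not part of the tutorial notebook).
# Assumed list of names the Part 2 cells expect the Part 1 session to provide,
# based on the calls made in Cells 1-2.
REQUIRED_NAMES = ["spark", "FeatureGroup", "Feature", "Entity"]


def missing_names(names, namespace):
    """Return the names that are not defined in the given namespace dict."""
    return [name for name in names if name not in namespace]


def check_prerequisites(namespace, names=REQUIRED_NAMES):
    """Raise a readable error if any required name is missing; else return True."""
    missing = missing_names(names, namespace)
    if missing:
        raise RuntimeError(
            f"Missing {missing}. Start Spark and load the Part 1 class definitions first."
        )
    return True


# In the notebook you would pass globals(); here a plain dict stands in for it.
example_namespace = {"spark": object(), "FeatureGroup": type, "Feature": type}
print(missing_names(REQUIRED_NAMES, example_namespace))  # ['Entity']
```

In a notebook cell, check_prerequisites(globals()) fails immediately if, for example, the kernel was restarted and the Part 1 definitions were not reloaded.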

Continue your notebook

Open feature_store_tutorial.ipynb. Part 2 starts at its own Cell 1. Part 2 deliberately uses a new synthetic training slice, so IDs, dataset names, and output paths differ from the smaller Part 1 examples. This part covers Cells 1-13.

Prepare Part 2 data

Part 2 begins with two pre-filled notebook cells. Run them in order:

  1. Run Cell 1 to set shared constants and validate that the Part 1 feature metadata is available.
  2. Run Cell 2 to generate labeled driver data and load it into Aerospike.

Cell 1: Setup constants and validate environment

```python
from pyspark.sql.types import (
    StructType, StructField, StringType, DoubleType, LongType, IntegerType
)

try:
    import numpy as np
except ModuleNotFoundError:
    np = None

if np is None:
    print("NumPy not found; continuing (install later if your local kernel requires it).")
else:
    print(f"NumPy import OK ({np.__version__})")

print(f"Spark version: {spark.version}")

# Shared constants used across Part 2 pages
ENTITY_TYPE = "driver"
ENTITY_ID_COL = "driver_id"
FEATURE_COLUMNS = ["ds_decl_rate", "ds_avg_rating", "da_trips_today"]
LABEL_COL = "label"
TRAIN_COLUMNS = FEATURE_COLUMNS + [LABEL_COL]
TRAINING_PREDICATE = "driver_id >= 'driver_001' and driver_id <= 'driver_100'"
DATASET_NAME = "trip-decline-risk-v1"
DATASET_LOCATION = "./datasets/trip-decline-risk-v1/"

SCHEMA = StructType([
    StructField(ENTITY_ID_COL, StringType(), False),
    StructField("ds_decl_rate", DoubleType(), True),
    StructField("ds_avg_rating", DoubleType(), True),
    StructField("da_trips_today", LongType(), True),
    StructField(LABEL_COL, IntegerType(), True)
])


def require_part2(names):
    missing = [name for name in names if name not in globals()]
    if missing:
        raise ValueError(
            f"Missing setup variables: {missing}. Re-run Part 2 Cells 1-2."
        )


# Guardrail: Part 1 feature metadata must exist
fg = FeatureGroup.load("driver-stats")
if fg is None:
    raise ValueError("Feature group 'driver-stats' not found. Run Part 1 first.")

# Ensure driver-activity metadata exists (safe to re-run)
FeatureGroup(
    "driver-activity",
    "Driver activity metrics from GPS pipeline",
    "kafka://events.gps.pings",
    {"owner": "ml-platform", "refresh": "realtime", "entity": "driver"},
    ["driver", "activity"]
).save()
Feature(
    "driver-activity", "trips_today", "integer",
    "Number of completed trips today",
    {"baseline_mean": "8"},
    ["driver", "activity"]
).save()

print("Setup and metadata validation complete.")
```
Setup and validation output:

```text
NumPy import OK (2.4.3)
Spark version: 3.5.3
Setup and metadata validation complete.
```

The NumPy and Spark version lines depend on your local environment. If NumPy is not installed, Cell 1 prints a notice instead of the version and continues.
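TRAINING_PREDICATE selects drivers with a string range, not a numeric one, so it relies on the zero-padded IDs that Cell 2 generates (driver_001 through driver_100). The standalone sketch below assumes the predicate behaves like an ordinary lexicographic string comparison and evaluates the same bounds in plain Python. It shows why the padding matters: the bounds match all 100 padded IDs but only three unpadded ones.

```python
# Standalone sketch: evaluates the TRAINING_PREDICATE bounds with plain Python
# string comparison (assumes the predicate is evaluated lexicographically).
LOW, HIGH = "driver_001", "driver_100"


def in_training_range(driver_id):
    """Mirror of "driver_id >= 'driver_001' and driver_id <= 'driver_100'"."""
    return LOW <= driver_id <= HIGH


# Zero-padded IDs, as generated in Cell 2, versus unpadded IDs.
padded_ids = [f"driver_{i:03d}" for i in range(1, 121)]
unpadded_ids = [f"driver_{i}" for i in range(1, 121)]

padded_hits = [d for d in padded_ids if in_training_range(d)]
unpadded_hits = [d for d in unpadded_ids if in_training_range(d)]

print(len(padded_hits))  # 100
print(unpadded_hits)     # ['driver_1', 'driver_10', 'driver_100']
```

Three-digit padding covers IDs up to driver_999. A larger ID space would need a wider pad so that string order keeps matching numeric order.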

Cell 2: Generate labeled data and load training DataFrame

```python
from pyspark.sql import Row
import random

if "require_part2" not in globals():
    raise ValueError("Missing Part 2 bootstrap helper. Re-run Part 2 Cell 1.")

require_part2([
    "ENTITY_TYPE", "ENTITY_ID_COL", "FEATURE_COLUMNS", "LABEL_COL",
    "TRAINING_PREDICATE", "SCHEMA"
])

random.seed(42)
rows = []
for i in range(1, 101):
    driver_id = f"driver_{i:03d}"
    if random.random() < 0.2:
        decline_rate = random.uniform(0.10, 0.25)
        avg_rating = random.uniform(4.0, 4.5)
    else:
        decline_rate = random.uniform(0.01, 0.08)
        avg_rating = random.uniform(4.5, 4.98)
    trips_today = random.randint(0, 15)
    label = 1 if decline_rate >= 0.10 else 0
    rows.append(
        Row(
            driver_id=driver_id,
            ds_decl_rate=round(decline_rate, 3),
            ds_avg_rating=round(avg_rating, 2),
            da_trips_today=trips_today,
            label=label
        )
    )

bootstrap_df = spark.createDataFrame(rows, SCHEMA)
Entity.saveDF(bootstrap_df, ENTITY_TYPE, ENTITY_ID_COL)

labeled_df = Entity.query(ENTITY_TYPE, TRAINING_PREDICATE, SCHEMA, ENTITY_ID_COL)
print(f"Saved training slice records: {labeled_df.count()}")
labeled_df.groupBy(LABEL_COL).count().orderBy(LABEL_COL).show()
```
Data generation and DataFrame load output:

```text
Saved training slice records: 100
+-----+-----+
|label|count|
+-----+-----+
|    0|   82|
|    1|   18|
+-----+-----+
```
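The labeling rule in Cell 2 can be checked without Spark or Aerospike. The sketch below rebuilds the same rows as plain dictionaries, using the same seed and the same order of random calls. It then confirms two properties of the slice. First, every driver ID falls inside TRAINING_PREDICATE. Second, the label can be read directly from the rounded ds_decl_rate feature, because the two decline-rate ranges (0.01-0.08 and 0.10-0.25) never overlap. The helper name generate_rows and the variable sketch_rows are hypothetical and are not part of the notebook.

```python
import random
from collections import Counter


def generate_rows(seed=42, n=100):
    """Rebuild Cell 2's synthetic rows as dicts (hypothetical helper, no Spark)."""
    rng = random.Random(seed)  # same sequence as random.seed(seed) in Cell 2
    rows = []
    for i in range(1, n + 1):
        driver_id = f"driver_{i:03d}"
        # Roughly 20% of drivers are drawn from the high-decline branch.
        if rng.random() < 0.2:
            decline_rate = rng.uniform(0.10, 0.25)
            avg_rating = rng.uniform(4.0, 4.5)
        else:
            decline_rate = rng.uniform(0.01, 0.08)
            avg_rating = rng.uniform(4.5, 4.98)
        trips_today = rng.randint(0, 15)
        rows.append({
            "driver_id": driver_id,
            "ds_decl_rate": round(decline_rate, 3),
            "ds_avg_rating": round(avg_rating, 2),
            "da_trips_today": trips_today,
            # Same rule as Cell 2: the label uses the unrounded decline rate.
            "label": 1 if decline_rate >= 0.10 else 0,
        })
    return rows


sketch_rows = generate_rows()
label_counts = Counter(row["label"] for row in sketch_rows)
print(dict(sorted(label_counts.items())))

# Every ID falls inside the TRAINING_PREDICATE string range.
assert all("driver_001" <= r["driver_id"] <= "driver_100" for r in sketch_rows)
# The branches do not overlap, so the rounded feature alone recovers the label.
assert all((r["ds_decl_rate"] >= 0.10) == (r["label"] == 1) for r in sketch_rows)
```

Because the random calls follow the same order as Cell 2, label_counts should match the groupBy output on the same Python version. The assertions hold regardless of the exact counts.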

You now have the shared setup constants and a labeled training DataFrame (labeled_df) for the rest of Part 2.
