Cache-aside pattern recipe
Implement the cache-aside (lazy-loading) pattern using Aerospike as your cache layer.
Use case
Reduce database load by caching:
- Expensive query results
- Frequently accessed reference data
- Computed aggregations
How it works
┌─────────────┐ 1. Check cache ┌─────────────┐│ Application │ ──────────────────── │ Aerospike ││ │ │ (Cache) ││ │ ◄─────────────────── │ ││ │ 2. Cache hit? │ ││ │ └─────────────┘│ ││ │ 3. Cache miss ┌─────────────┐│ │ ──────────────────── │ Database ││ │ │ (Source) ││ │ ◄─────────────────── │ ││ │ 4. Get data └─────────────┘│ ││ │ 5. Update cache ┌─────────────┐│ │ ──────────────────── │ Aerospike │└─────────────┘ └─────────────┘Quick implementation
import com.aerospike.client.sdk.Cluster;import com.aerospike.client.sdk.DataSet;import com.aerospike.client.sdk.Record;import com.aerospike.client.sdk.RecordResult;import com.aerospike.client.sdk.RecordStream;import com.aerospike.client.sdk.Session;import com.aerospike.client.sdk.policy.Behavior;import java.io.ByteArrayInputStream;import java.io.ByteArrayOutputStream;import java.io.IOException;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.io.Serializable;import java.util.Optional;import java.util.function.Function;
public class CacheAside<T extends Serializable> { private final Session session; private final DataSet cache; private final Function<String, T> loader; private final int ttlSeconds;
public CacheAside(Cluster cluster, String cacheName, Function<String, T> loader, int ttlSeconds) { this.session = cluster.createSession(Behavior.DEFAULT); this.cache = DataSet.of("cache", cacheName); this.loader = loader; this.ttlSeconds = ttlSeconds; }
public T get(String key) { // 1. Check cache RecordStream stream = session.query(cache.id(key)).execute(); Optional<Record> cached = stream.getFirst() .filter(RecordResult::isOk) .map(RecordResult::recordOrThrow);
if (cached.isPresent()) { // 2. Cache hit return deserialize(cached.get().getBytes("data")); }
// 3. Cache miss - load from source T value = loader.apply(key);
// 4. Update cache session.insert(cache.id(key)) .bin("data").setTo(serialize(value)) .expireRecordAfterSeconds(ttlSeconds) .execute();
return value; }
public void invalidate(String key) { session.delete(cache.id(key)).execute().close(); }
private byte[] serialize(T value) { try (ByteArrayOutputStream bout = new ByteArrayOutputStream(); ObjectOutputStream out = new ObjectOutputStream(bout)) { out.writeObject(value); out.flush(); return bout.toByteArray(); } catch (IOException e) { throw new RuntimeException("Failed to serialize cached value", e); } }
@SuppressWarnings("unchecked") private T deserialize(byte[] bytes) { try (ByteArrayInputStream bin = new ByteArrayInputStream(bytes); ObjectInputStream in = new ObjectInputStream(bin)) { return (T) in.readObject(); } catch (IOException | ClassNotFoundException e) { throw new RuntimeException("Failed to deserialize cached value", e); } }}from typing import Callable, TypeVarimport pickle
T = TypeVar('T')
class CacheAside: def __init__(self, cluster, cache_name: str, loader: Callable[[str], T], ttl_seconds: int): self.session = cluster.create_session(Behavior.DEFAULT) self.cache = DataSet.of("cache", cache_name) self.loader = loader self.ttl_seconds = ttl_seconds
async def get(self, key: str) -> T: # 1. Check cache stream = await self.session.query(self.cache.id(key)).execute() row = await stream.first() cached = row.record_or_raise() if row is not None and row.is_ok and row.record is not None else None stream.close()
if cached: # 2. Cache hit return pickle.loads(cached.bins.get("data"))
# 3. Cache miss - load from source value = self.loader(key)
# 4. Update cache await ( self.session.insert(self.cache.id(key)) .put({"data": pickle.dumps(value)}) .expire_record_after_seconds(self.ttl_seconds) .execute() )
return value
async def invalidate(self, key: str): stream = await self.session.delete(self.cache.id(key)).execute() stream.close()Usage
import com.aerospike.client.sdk.Cluster;import com.aerospike.client.sdk.ClusterDefinition;import java.io.Serializable;import java.util.Map;
record User(String id, String name) implements Serializable {}
Map<String, User> database = Map.of( "user-123", new User("user-123", "Alice"));
try (Cluster cluster = new ClusterDefinition("localhost", 3000).connect()) { // Create cache with database loader CacheAside<User> userCache = new CacheAside<>( cluster, "users", userId -> database.get(userId), // Loader function 3600 // 1 hour TTL );
// Get user - checks cache first, then source map User user = userCache.get("user-123"); System.out.println("Loaded user: " + user);
// Invalidate on update userCache.invalidate("user-123");}import asyncio
from aerospike_sdk import Client
database = { "user-123": {"id": "user-123", "name": "Alice"}}
async def main(): async with Client("localhost:3000") as client: # Create cache with database loader user_cache = CacheAside( client, cache_name="users", loader=lambda user_id: database.get(user_id), ttl_seconds=3600, # 1 hour TTL )
# Get user - checks cache first, then source map user = await user_cache.get("user-123") print(f"Loaded user: {user}")
# Invalidate on update await user_cache.invalidate("user-123")
if __name__ == "__main__": asyncio.run(main())Complete example
import com.aerospike.client.sdk.AerospikeException;import com.aerospike.client.sdk.Cluster;import com.aerospike.client.sdk.ClusterDefinition;import com.aerospike.client.sdk.DataSet;import com.aerospike.client.sdk.Record;import com.aerospike.client.sdk.RecordResult;import com.aerospike.client.sdk.RecordStream;import com.aerospike.client.sdk.ResultCode;import com.aerospike.client.sdk.Session;import com.aerospike.client.sdk.policy.Behavior;import java.io.ByteArrayInputStream;import java.io.ByteArrayOutputStream;import java.io.IOException;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.io.Serializable;import java.util.Map;import java.util.Optional;import java.util.function.Function;
public class CacheAsideExample { record User(String id, String name) implements Serializable {}
static class CacheAside<T extends Serializable> { private final Session session; private final DataSet cache; private final Function<String, T> loader; private final int ttlSeconds;
CacheAside(Cluster cluster, String setName, Function<String, T> loader, int ttlSeconds) { this.session = cluster.createSession(Behavior.DEFAULT); this.cache = DataSet.of("test", setName); this.loader = loader; this.ttlSeconds = ttlSeconds; }
T get(String key) { RecordStream read = session.query(cache.id(key)).execute(); Optional<Record> cached; try { cached = read.getFirst() .filter(RecordResult::isOk) .map(RecordResult::recordOrThrow); } finally { read.close(); } if (cached.isPresent()) { return deserialize(cached.get().getBytes("data")); }
T value = loader.apply(key); if (value == null) { return null; }
try { session.insert(cache.id(key)) .bin("data").setTo(serialize(value)) .expireRecordAfterSeconds(ttlSeconds) .execute(); } catch (AerospikeException e) { if (e.getResultCode() != ResultCode.FAIL_FORBIDDEN) { throw e; } session.insert(cache.id(key)) .bin("data").setTo(serialize(value)) .execute(); } return value; }
void invalidate(String key) { session.delete(cache.id(key)).execute().close(); }
private byte[] serialize(T value) { try (ByteArrayOutputStream bout = new ByteArrayOutputStream(); ObjectOutputStream out = new ObjectOutputStream(bout)) { out.writeObject(value); out.flush(); return bout.toByteArray(); } catch (IOException e) { throw new RuntimeException("Failed to serialize cache value", e); } }
@SuppressWarnings("unchecked") private T deserialize(byte[] data) { try (ByteArrayInputStream bin = new ByteArrayInputStream(data); ObjectInputStream in = new ObjectInputStream(bin)) { return (T) in.readObject(); } catch (IOException | ClassNotFoundException e) { throw new RuntimeException("Failed to deserialize cache value", e); } } }
public static void main(String[] args) { Map<String, User> database = Map.of("user-123", new User("user-123", "Alice")); try (Cluster cluster = new ClusterDefinition("localhost", 3000).connect()) { CacheAside<User> cache = new CacheAside<>(cluster, "cache_users", database::get, 3600);
User first = cache.get("user-123"); // miss -> load + cache User second = cache.get("user-123"); // hit System.out.println("Loaded user (miss): " + first); System.out.println("Loaded user (hit): " + second);
cache.invalidate("user-123"); } }}import asyncioimport picklefrom typing import Callable, TypeVar
from aerospike_async.exceptions import ResultCodefrom aerospike_sdk import AerospikeError, Behavior, Client, DataSet
T = TypeVar("T")
class CacheAside: def __init__(self, client, cache_name: str, loader: Callable[[str], T], ttl_seconds: int): self.session = client.create_session(Behavior.DEFAULT) self.cache = DataSet.of("test", cache_name) self.loader = loader self.ttl_seconds = ttl_seconds
async def get(self, key: str) -> T | None: stream = await self.session.query(self.cache.id(key)).execute() row = await stream.first() stream.close() if row is not None and row.is_ok and row.record is not None: return pickle.loads(row.record_or_raise().bins.get("data"))
value = self.loader(key) if value is None: return None
payload = {"data": pickle.dumps(value)} try: await ( self.session.insert(key=self.cache.id(key)).put(payload) .expire_record_after_seconds(self.ttl_seconds) .execute() ) except AerospikeError as e: if e.result_code != ResultCode.FAIL_FORBIDDEN: raise await self.session.insert(key=self.cache.id(key)).put(payload).execute() return value
async def invalidate(self, key: str): stream = await self.session.delete(key=self.cache.id(key)).execute() stream.close()
async def main(): database = {"user-123": {"id": "user-123", "name": "Alice"}}
async with Client("localhost:3000") as client: user_cache = CacheAside( client, cache_name="cache_users", loader=lambda user_id: database.get(user_id), ttl_seconds=3600, )
user1 = await user_cache.get("user-123") # miss -> load + cache user2 = await user_cache.get("user-123") # hit print(f"Loaded user (miss): {user1}") print(f"Loaded user (hit): {user2}")
await user_cache.invalidate("user-123")
if __name__ == "__main__": asyncio.run(main())Expected output
Loaded user: {id=user-123, name=Alice}Cache invalidation strategies
| Strategy | Description | When to Use |
|---|---|---|
| TTL expiration | Data expires after fixed time | Acceptable staleness |
| Write-through | Update cache on every write | Consistency required |
| Event-driven | Invalidate on change events | Event system available |
Next steps
Session Store
Build high-velocity session storage.
Rate Limiting
Implement distributed rate limiting.