# Cache-aside pattern recipe

Implement the cache-aside (lazy-loading) pattern using Aerospike as your cache layer.

::: coming soon
This page is under development. The full recipe will include:

-   Complete cache implementation
-   Cache invalidation strategies
-   Write-through vs write-behind
-   Multi-tier caching
:::

## Use case

Reduce database load by caching:

-   Expensive query results
-   Frequently accessed reference data
-   Computed aggregations

## How it works

```plaintext
┌─────────────┐    1. Check cache    ┌─────────────┐
│ Application │ ───────────────────► │  Aerospike  │
│             │                      │   (Cache)   │
│             │ ◄─────────────────── │             │
│             │    2. Cache hit?     │             │
│             │                      └─────────────┘
│             │
│             │    3. Cache miss     ┌─────────────┐
│             │ ───────────────────► │  Database   │
│             │                      │  (Source)   │
│             │ ◄─────────────────── │             │
│             │    4. Get data       └─────────────┘
│             │
│             │    5. Update cache   ┌─────────────┐
│             │ ───────────────────► │  Aerospike  │
└─────────────┘                      └─────────────┘
```
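
The diagram reduces to one read path: check Aerospike first, and only on a miss load from the source and write the result back with a TTL. The sketch below shows that shape with plain dictionaries standing in for both systems; the client-backed implementations follow in the next section.

```python
# Minimal sketch of the cache-aside flow. `cache` and `database` are plain
# dicts standing in for Aerospike and the source system.
cache: dict = {}
database = {"user-123": {"id": "user-123", "name": "Alice"}}

def get(key: str):
    value = cache.get(key)        # 1. Check cache
    if value is not None:         # 2. Cache hit
        return value
    value = database.get(key)     # 3/4. Cache miss: load from the source
    if value is not None:
        cache[key] = value        # 5. Update cache (TTL handled by Aerospike in the real version)
    return value
```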

## Quick implementation


```java
import com.aerospike.client.sdk.Cluster;
import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;
import com.aerospike.client.sdk.Session;
import com.aerospike.client.sdk.policy.Behavior;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Optional;
import java.util.function.Function;

public class CacheAside<T extends Serializable> {

    private final Session session;
    private final DataSet cache;
    private final Function<String, T> loader;
    private final int ttlSeconds;

    public CacheAside(Cluster cluster, String cacheName,
                      Function<String, T> loader, int ttlSeconds) {
        this.session = cluster.createSession(Behavior.DEFAULT);
        this.cache = DataSet.of("cache", cacheName);
        this.loader = loader;
        this.ttlSeconds = ttlSeconds;
    }

    public T get(String key) {
        // 1. Check cache
        RecordStream stream = session.query(cache.id(key)).execute();
        Optional<Record> cached;
        try {
            cached = stream.getFirst()
                .filter(RecordResult::isOk)
                .map(RecordResult::recordOrThrow);
        } finally {
            stream.close();
        }

        if (cached.isPresent()) {
            // 2. Cache hit
            return deserialize(cached.get().getBytes("data"));
        }

        // 3. Cache miss - load from source
        T value = loader.apply(key);

        // 4. Update cache
        session.insert(cache.id(key))
            .bin("data").setTo(serialize(value))
            .expireRecordAfterSeconds(ttlSeconds)
            .execute();

        return value;
    }

    public void invalidate(String key) {
        session.delete(cache.id(key)).execute().close();
    }

    private byte[] serialize(T value) {
        try (ByteArrayOutputStream bout = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bout)) {
            out.writeObject(value);
            out.flush();
            return bout.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("Failed to serialize cached value", e);
        }
    }

    @SuppressWarnings("unchecked")
    private T deserialize(byte[] bytes) {
        try (ByteArrayInputStream bin = new ByteArrayInputStream(bytes);
             ObjectInputStream in = new ObjectInputStream(bin)) {
            return (T) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("Failed to deserialize cached value", e);
        }
    }
}
```

```python
import pickle
from typing import Callable, TypeVar

from aerospike_sdk import Behavior, DataSet

T = TypeVar('T')

class CacheAside:

    def __init__(self, cluster, cache_name: str,
                 loader: Callable[[str], T], ttl_seconds: int):
        self.session = cluster.create_session(Behavior.DEFAULT)
        self.cache = DataSet.of("cache", cache_name)
        self.loader = loader
        self.ttl_seconds = ttl_seconds

    async def get(self, key: str) -> T:
        # 1. Check cache
        stream = await self.session.query(self.cache.id(key)).execute()
        row = await stream.first()
        cached = (
            row.record_or_raise()
            if row is not None and row.is_ok and row.record is not None
            else None
        )
        stream.close()

        if cached:
            # 2. Cache hit
            return pickle.loads(cached.bins.get("data"))

        # 3. Cache miss - load from source
        value = self.loader(key)

        # 4. Update cache
        await (
            self.session.insert(self.cache.id(key))
            .put({"data": pickle.dumps(value)})
            .expire_record_after_seconds(self.ttl_seconds)
            .execute()
        )

        return value

    async def invalidate(self, key: str):
        stream = await self.session.delete(self.cache.id(key)).execute()
        stream.close()
```
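
Both snippets store the cached value as a serialized blob in a single `data` bin (Java serialization in one, `pickle` in the other), which keeps the example short but ties the cached bytes to one runtime. If the cache will be read by more than one language, a text format such as JSON is a common swap; a minimal sketch of the two helpers, assuming the cached values are JSON-serializable:

```python
import json

def serialize(value) -> bytes:
    # Encode the value as UTF-8 JSON instead of pickle/Java serialization.
    return json.dumps(value).encode("utf-8")

def deserialize(data: bytes):
    return json.loads(data.decode("utf-8"))
```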

## Usage


```java
import com.aerospike.client.sdk.Cluster;
import com.aerospike.client.sdk.ClusterDefinition;

import java.io.Serializable;
import java.util.Map;

record User(String id, String name) implements Serializable {}

Map<String, User> database = Map.of(
    "user-123", new User("user-123", "Alice")
);

try (Cluster cluster = new ClusterDefinition("localhost", 3000).connect()) {
    // Create cache with database loader
    CacheAside<User> userCache = new CacheAside<>(
        cluster,
        "users",
        userId -> database.get(userId),  // Loader function
        3600  // 1 hour TTL
    );

    // Get user - checks cache first, then source map
    User user = userCache.get("user-123");
    System.out.println("Loaded user: " + user);

    // Invalidate on update
    userCache.invalidate("user-123");
}
```

```python
import asyncio

from aerospike_sdk import Client

database = {
    "user-123": {"id": "user-123", "name": "Alice"}
}

async def main():
    async with Client("localhost:3000") as client:
        # Create cache with database loader
        user_cache = CacheAside(
            client,
            cache_name="users",
            loader=lambda user_id: database.get(user_id),
            ttl_seconds=3600,  # 1 hour TTL
        )

        # Get user - checks cache first, then source map
        user = await user_cache.get("user-123")
        print(f"Loaded user: {user}")

        # Invalidate on update
        await user_cache.invalidate("user-123")

if __name__ == "__main__":
    asyncio.run(main())
```

## Complete example


```java
import com.aerospike.client.sdk.AerospikeException;
import com.aerospike.client.sdk.Cluster;
import com.aerospike.client.sdk.ClusterDefinition;
import com.aerospike.client.sdk.DataSet;
import com.aerospike.client.sdk.Record;
import com.aerospike.client.sdk.RecordResult;
import com.aerospike.client.sdk.RecordStream;
import com.aerospike.client.sdk.ResultCode;
import com.aerospike.client.sdk.Session;
import com.aerospike.client.sdk.policy.Behavior;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

public class CacheAsideExample {

    record User(String id, String name) implements Serializable {}

    static class CacheAside<T extends Serializable> {

        private final Session session;
        private final DataSet cache;
        private final Function<String, T> loader;
        private final int ttlSeconds;

        CacheAside(Cluster cluster, String setName, Function<String, T> loader, int ttlSeconds) {
            this.session = cluster.createSession(Behavior.DEFAULT);
            this.cache = DataSet.of("test", setName);
            this.loader = loader;
            this.ttlSeconds = ttlSeconds;
        }

        T get(String key) {
            // 1. Check cache
            RecordStream read = session.query(cache.id(key)).execute();
            Optional<Record> cached;
            try {
                cached = read.getFirst()
                    .filter(RecordResult::isOk)
                    .map(RecordResult::recordOrThrow);
            } finally {
                read.close();
            }

            if (cached.isPresent()) {
                // 2. Cache hit
                return deserialize(cached.get().getBytes("data"));
            }

            // 3. Cache miss - load from source
            T value = loader.apply(key);
            if (value == null) {
                return null;
            }

            // 4. Update cache with a TTL; if the server forbids setting one, retry without it
            try {
                session.insert(cache.id(key))
                    .bin("data").setTo(serialize(value))
                    .expireRecordAfterSeconds(ttlSeconds)
                    .execute();
            } catch (AerospikeException e) {
                if (e.getResultCode() != ResultCode.FAIL_FORBIDDEN) {
                    throw e;
                }
                session.insert(cache.id(key))
                    .bin("data").setTo(serialize(value))
                    .execute();
            }

            return value;
        }

        void invalidate(String key) {
            session.delete(cache.id(key)).execute().close();
        }

        private byte[] serialize(T value) {
            try (ByteArrayOutputStream bout = new ByteArrayOutputStream();
                 ObjectOutputStream out = new ObjectOutputStream(bout)) {
                out.writeObject(value);
                out.flush();
                return bout.toByteArray();
            } catch (IOException e) {
                throw new RuntimeException("Failed to serialize cache value", e);
            }
        }

        @SuppressWarnings("unchecked")
        private T deserialize(byte[] data) {
            try (ByteArrayInputStream bin = new ByteArrayInputStream(data);
                 ObjectInputStream in = new ObjectInputStream(bin)) {
                return (T) in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new RuntimeException("Failed to deserialize cache value", e);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, User> database = Map.of("user-123", new User("user-123", "Alice"));

        try (Cluster cluster = new ClusterDefinition("localhost", 3000).connect()) {
            CacheAside<User> cache = new CacheAside<>(cluster, "cache_users", database::get, 3600);

            User first = cache.get("user-123");   // miss -> load + cache
            User second = cache.get("user-123");  // hit
            System.out.println("Loaded user (miss): " + first);
            System.out.println("Loaded user (hit): " + second);

            cache.invalidate("user-123");
        }
    }
}
```

```python
import asyncio
import pickle
from typing import Callable, TypeVar

from aerospike_async.exceptions import ResultCode
from aerospike_sdk import AerospikeError, Behavior, Client, DataSet

T = TypeVar("T")

class CacheAside:

    def __init__(self, client, cache_name: str, loader: Callable[[str], T], ttl_seconds: int):
        self.session = client.create_session(Behavior.DEFAULT)
        self.cache = DataSet.of("test", cache_name)
        self.loader = loader
        self.ttl_seconds = ttl_seconds

    async def get(self, key: str) -> T | None:
        # 1. Check cache
        stream = await self.session.query(self.cache.id(key)).execute()
        row = await stream.first()
        stream.close()

        if row is not None and row.is_ok and row.record is not None:
            # 2. Cache hit
            return pickle.loads(row.record_or_raise().bins.get("data"))

        # 3. Cache miss - load from source
        value = self.loader(key)
        if value is None:
            return None

        # 4. Update cache with a TTL; if the server forbids setting one, retry without it
        payload = {"data": pickle.dumps(value)}
        try:
            await (
                self.session.insert(key=self.cache.id(key)).put(payload)
                .expire_record_after_seconds(self.ttl_seconds)
                .execute()
            )
        except AerospikeError as e:
            if e.result_code != ResultCode.FAIL_FORBIDDEN:
                raise
            await self.session.insert(key=self.cache.id(key)).put(payload).execute()

        return value

    async def invalidate(self, key: str):
        stream = await self.session.delete(key=self.cache.id(key)).execute()
        stream.close()

async def main():
    database = {"user-123": {"id": "user-123", "name": "Alice"}}

    async with Client("localhost:3000") as client:
        user_cache = CacheAside(
            client,
            cache_name="cache_users",
            loader=lambda user_id: database.get(user_id),
            ttl_seconds=3600,
        )

        user1 = await user_cache.get("user-123")  # miss -> load + cache
        user2 = await user_cache.get("user-123")  # hit
        print(f"Loaded user (miss): {user1}")
        print(f"Loaded user (hit): {user2}")

        await user_cache.invalidate("user-123")

if __name__ == "__main__":
    asyncio.run(main())
```

### Expected output

```text
Loaded user (miss): User[id=user-123, name=Alice]
Loaded user (hit): User[id=user-123, name=Alice]
```

## Cache invalidation strategies

| Strategy | Description | When to use |
| --- | --- | --- |
| TTL expiration | Data expires after a fixed time | Acceptable staleness |
| Write-through | Update cache on every write | Consistency required |
| Event-driven | Invalidate on change events | Event system available |
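
For example, a hypothetical `update_user` helper (a minimal sketch reusing the `CacheAside` class and `database` dict from the Python complete example above) pairs each source write with an explicit invalidation, so the next `get()` reloads and re-caches the fresh value:

```python
async def update_user(user_cache: CacheAside, key: str, user: dict) -> None:
    # 1. Write to the source of record first.
    database[key] = user
    # 2. Drop the cached copy so the next get() reloads the updated value.
    await user_cache.invalidate(key)
```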

## Next steps

-   [Session Store](https://aerospike.com/docs/develop/client/sdk/recipes/session-store): Build high-velocity session storage.
-   [Rate Limiting](https://aerospike.com/docs/develop/client/sdk/recipes/rate-limiting): Implement distributed rate limiting.