# Rate limiting recipe

Implement distributed rate limiting using Aerospike’s atomic increment operations.

## Use case

Limit API requests per user/IP with:

-   Distributed coordination (works across multiple app servers)
-   Atomic counters (no race conditions)
-   Automatic reset via TTL

## Quick implementation: fixed window

-   [Java](#tab-panel-3042)
-   [Python](#tab-panel-3043)

```java
import com.aerospike.client.sdk.Cluster;

import com.aerospike.client.sdk.DataSet;

import com.aerospike.client.sdk.RecordResult;

import com.aerospike.client.sdk.RecordStream;

import com.aerospike.client.sdk.Session;

import com.aerospike.client.sdk.policy.Behavior;

/**
 * Rate limiter backed by one Aerospike counter record per key.
 *
 * <p>Every call to {@link #isAllowed(String)} adds 1 to the {@code count} bin of the
 * key's record in the {@code app.rate_limits} data set and applies a TTL of
 * {@code windowSeconds} to that record.
 *
 * <p>NOTE(review): the TTL is re-applied on every increment, so the expiry presumably
 * restarts with each request instead of at a fixed window boundary. Confirm against
 * the server's TTL semantics before relying on strict fixed-window behavior.
 */
public class RateLimiter {

    /** Name of the bin that holds the request counter. */
    private static final String COUNT_BIN = "count";

    private final Session session;
    private final DataSet limits;
    private final int maxRequests;
    private final int windowSeconds;

    /**
     * @param cluster       connected cluster used to create the session
     * @param maxRequests   number of requests allowed per window
     * @param windowSeconds TTL, in seconds, applied to the counter record
     */
    public RateLimiter(Cluster cluster, int maxRequests, int windowSeconds) {
        this.session = cluster.createSession(Behavior.DEFAULT);
        this.limits = DataSet.of("app", "rate_limits");
        this.maxRequests = maxRequests;
        this.windowSeconds = windowSeconds;
    }

    /**
     * Counts one request for {@code key} and reports whether it is within the limit.
     *
     * @param key identifier being limited (user ID, IP address, ...)
     * @return {@code true} when the counter after this request is at most
     *         {@code maxRequests}; also {@code true} when the write yields no OK
     *         result, because the counter then defaults to 1 (fails open)
     */
    public boolean isAllowed(String key) {
        // upsert, the write mode the complete example uses, so the first request
        // for a key creates the counter record instead of requiring it to exist.
        RecordStream stream = session.upsert(limits.id(key))
            .bin(COUNT_BIN).add(1)
            .expireRecordAfterSeconds(windowSeconds)  // Auto-reset after window
            .execute();
        try {
            long count = stream.getFirst()
                .filter(RecordResult::isOk)
                .map(RecordResult::recordOrThrow)
                .map(r -> r.getLong(COUNT_BIN))
                .orElse(1L);
            return count <= maxRequests;
        } finally {
            // Release the stream even if reading the result throws.
            stream.close();
        }
    }

    /**
     * Reads how many requests {@code key} may still make.
     *
     * @param key identifier being limited
     * @return {@code maxRequests} minus the stored counter, never below 0;
     *         {@code maxRequests} when no OK record is found
     */
    public long getRemaining(String key) {
        RecordStream stream = session.query(limits.id(key)).execute();
        try {
            long used = stream.getFirst()
                .filter(RecordResult::isOk)
                .map(RecordResult::recordOrThrow)
                .map(r -> r.getLong(COUNT_BIN))
                .orElse(0L);
            // Blocked requests still increment the counter, so clamp at zero.
            return Math.max(maxRequests - used, 0L);
        } finally {
            stream.close();
        }
    }
}
```

```python
from aerospike_sdk import Behavior, DataSet

class RateLimiter:
    """Rate limiter backed by one Aerospike counter record per key.

    Every ``is_allowed`` call adds 1 to the ``count`` bin of the key's record
    in the ``app.rate_limits`` data set and applies a TTL of
    ``window_seconds`` to that record.

    NOTE(review): the TTL is re-applied on every increment, so the expiry
    presumably restarts with each request instead of at a fixed window
    boundary. Confirm against the server's TTL semantics before relying on
    strict fixed-window behavior.
    """

    def __init__(self, cluster, max_requests: int, window_seconds: int):
        """Create a session on *cluster* and store the limit settings.

        Args:
            cluster: Connected client/cluster object exposing ``create_session``.
            max_requests: Number of requests allowed per window.
            window_seconds: TTL, in seconds, applied to the counter record.
        """
        self.session = cluster.create_session(Behavior.DEFAULT)
        self.limits = DataSet.of("app", "rate_limits")
        self.max_requests = max_requests
        self.window_seconds = window_seconds

    async def is_allowed(self, key: str) -> bool:
        """Count one request for *key* and report whether it is within the limit.

        Returns:
            True when the counter after this request is at most ``max_requests``.

        Raises:
            Whatever ``first_or_raise`` / ``record_or_raise`` raise when the
            write produced no usable result.
        """
        # upsert, the write mode the complete example uses, so the first
        # request for a key creates the counter record.
        stream = await (
            self.session.upsert(self.limits.id(key))
            .bin("count").increment_by(1)
            .expire_record_after_seconds(self.window_seconds)
            .execute()
        )
        try:
            row = await stream.first_or_raise()
            count = row.record_or_raise().bins.get("count", 0)
        finally:
            # Release the stream even when reading the result raises.
            stream.close()
        return count <= self.max_requests

    async def get_remaining(self, key: str) -> int:
        """Return how many requests *key* may still make, never below 0.

        Returns ``max_requests`` when no usable record is found.
        """
        stream = await self.session.query(self.limits.id(key)).execute()
        try:
            row = await stream.first()
            if row is None or not row.is_ok or row.record is None:
                return self.max_requests
            count = row.record_or_raise().bins.get("count", 0)
        finally:
            stream.close()
        # Blocked requests still increment the counter, so clamp at zero.
        return max(self.max_requests - count, 0)
```

## Usage

-   [Java](#tab-panel-3044)
-   [Python](#tab-panel-3045)

```java
import com.aerospike.client.sdk.Cluster;

import com.aerospike.client.sdk.ClusterDefinition;

try (Cluster cluster = new ClusterDefinition("localhost", 3000).connect()) {
    // 100 requests per 60-second window.
    RateLimiter limiter = new RateLimiter(cluster, 100, 60);
    String userId = "user-123";

    // Reject first; anything past this point is an accepted request.
    if (!limiter.isAllowed(userId)) {
        throw new RuntimeException(
            "Rate limit exceeded. Remaining: " + limiter.getRemaining(userId)
        );
    }

    // Process request
    System.out.println("Request accepted for " + userId);
}
```

```python
import asyncio

from aerospike_sdk import Client

async def main():
    """Check a single request for a sample user against a 100-per-60-seconds limit."""
    async with Client("localhost:3000") as client:
        limiter = RateLimiter(client, max_requests=100, window_seconds=60)
        user_id = "user-123"

        # Guard clause: reject first, then fall through to the accepted path.
        if not await limiter.is_allowed(user_id):
            remaining = await limiter.get_remaining(user_id)
            raise RuntimeError(f"Rate limit exceeded. Remaining: {remaining}")

        print(f"Request accepted for {user_id}")


if __name__ == "__main__":
    asyncio.run(main())
```

## Complete example

-   [Java](#tab-panel-3046)
-   [Python](#tab-panel-3047)

```java
import com.aerospike.client.sdk.AerospikeException;

import com.aerospike.client.sdk.Cluster;

import com.aerospike.client.sdk.ClusterDefinition;

import com.aerospike.client.sdk.DataSet;

import com.aerospike.client.sdk.RecordResult;

import com.aerospike.client.sdk.RecordStream;

import com.aerospike.client.sdk.ResultCode;

import com.aerospike.client.sdk.Session;

import com.aerospike.client.sdk.policy.Behavior;

/**
 * Runnable demo: sends five requests for one key against a limit of three per
 * 60 seconds and prints whether each one was allowed.
 */
public class RateLimiterExample {

    /**
     * Rate limiter backed by one counter record per key in the
     * {@code test.rate_limits} data set.
     *
     * <p>NOTE(review): {@link #isAllowed(String)} increments the counter and then
     * reads it back with a separate query, so under concurrent callers the value
     * read may already include other callers' increments. Confirm whether the
     * write result can supply the new count directly.
     */
    static class RateLimiter {

        private final Session session;
        private final DataSet limits;
        private final int maxRequests;
        private final int windowSeconds;

        /**
         * @param cluster       connected cluster used to create the session
         * @param maxRequests   number of requests allowed per window
         * @param windowSeconds TTL, in seconds, applied to the counter record
         */
        RateLimiter(Cluster cluster, int maxRequests, int windowSeconds) {
            this.session = cluster.createSession(Behavior.DEFAULT);
            this.limits = DataSet.of("test", "rate_limits");
            this.maxRequests = maxRequests;
            this.windowSeconds = windowSeconds;
        }

        /**
         * Counts one request for {@code key} and reports whether it is within the limit.
         *
         * @param key identifier being limited
         * @return {@code true} when the stored counter is at most {@code maxRequests}
         * @throws AerospikeException for any write failure other than FAIL_FORBIDDEN
         */
        boolean isAllowed(String key) {
            try {
                increment(key, true);
            } catch (AerospikeException e) {
                if (e.getResultCode() != ResultCode.FAIL_FORBIDDEN) {
                    throw e;
                }
                // The write with a TTL was refused; retry the same increment
                // without one (presumably the namespace does not accept
                // per-record TTLs; the counter then never expires on its own).
                increment(key, false);
            }
            return getCount(key) <= maxRequests;
        }

        /**
         * @param key identifier being limited
         * @return {@code maxRequests} minus the stored counter, never below 0
         */
        long getRemaining(String key) {
            return Math.max(maxRequests - getCount(key), 0);
        }

        /** Adds 1 to the counter for {@code key}, optionally applying the window TTL. */
        private void increment(String key, boolean withTtl) {
            // The result is not read here; close the stream right away, as the
            // delete in main() does.
            if (withTtl) {
                session.upsert(limits.id(key))
                    .bin("count").add(1)
                    .expireRecordAfterSeconds(windowSeconds)
                    .execute()
                    .close();
            } else {
                session.upsert(limits.id(key))
                    .bin("count").add(1)
                    .execute()
                    .close();
            }
        }

        /** Reads the stored counter for {@code key}; 0 when no OK record is found. */
        private long getCount(String key) {
            RecordStream stream = session.query(limits.id(key)).execute();
            try {
                return stream.getFirst()
                    .filter(RecordResult::isOk)
                    .map(RecordResult::recordOrThrow)
                    .map(r -> r.getLong("count"))
                    .orElse(0L);
            } finally {
                // Release the stream even if reading the result throws.
                stream.close();
            }
        }
    }

    public static void main(String[] args) {
        try (Cluster cluster = new ClusterDefinition("localhost", 3000).connect()) {
            RateLimiter limiter = new RateLimiter(cluster, 3, 60); // 3 req/min demo
            String key = "user-123";

            // Reset the demo key so output is deterministic.
            limiter.session.delete(limiter.limits.id(key)).execute().close();

            for (int i = 1; i <= 5; i++) {
                boolean allowed = limiter.isAllowed(key);
                // getRemaining() already clamps at zero.
                long remaining = limiter.getRemaining(key);
                System.out.println(
                    "Request " + i + ": " + (allowed ? "ALLOWED" : "BLOCKED") +
                    " (remaining=" + remaining + ")"
                );
            }
        }
    }
}
```

```python
import asyncio

from aerospike_async.exceptions import ResultCode

from aerospike_sdk import AerospikeError, Behavior, Client, DataSet

class RateLimiter:
    """Rate limiter backed by one counter record per key in ``test.rate_limits``.

    NOTE(review): ``is_allowed`` increments the counter and then reads it back
    with a separate query, so under concurrent callers the value read may
    already include other callers' increments. Confirm whether the write
    result can supply the new count directly.
    """

    def __init__(self, client, max_requests: int, window_seconds: int):
        """Create a session on *client* and store the limit settings.

        Args:
            client: Connected client exposing ``create_session``.
            max_requests: Number of requests allowed per window.
            window_seconds: TTL, in seconds, applied to the counter record.
        """
        self.session = client.create_session(Behavior.DEFAULT)
        self.limits = DataSet.of("test", "rate_limits")
        self.max_requests = max_requests
        self.window_seconds = window_seconds

    async def _increment(self, key: str, *, with_ttl: bool) -> None:
        """Add 1 to the counter for *key*, optionally applying the window TTL."""
        op = self.session.upsert(self.limits.id(key)).bin("count").increment_by(1)
        if with_ttl:
            op = op.expire_record_after_seconds(self.window_seconds)
        stream = await op.execute()
        # The result is not read here; close the stream instead of dropping it.
        stream.close()

    async def is_allowed(self, key: str) -> bool:
        """Count one request for *key* and report whether it is within the limit.

        Returns:
            True when the stored counter is at most ``max_requests``.

        Raises:
            AerospikeError: For any write failure other than FAIL_FORBIDDEN.
        """
        try:
            await self._increment(key, with_ttl=True)
        except AerospikeError as e:
            if e.result_code != ResultCode.FAIL_FORBIDDEN:
                raise
            # The write with a TTL was refused; retry the same increment
            # without one (presumably the namespace does not accept
            # per-record TTLs; the counter then never expires on its own).
            await self._increment(key, with_ttl=False)
        return await self.get_count(key) <= self.max_requests

    async def get_remaining(self, key: str) -> int:
        """Return how many requests *key* may still make, never below 0."""
        return max(self.max_requests - await self.get_count(key), 0)

    async def get_count(self, key: str) -> int:
        """Return the stored counter for *key*, or 0 when no usable record is found."""
        stream = await self.session.query(self.limits.id(key)).execute()
        try:
            row = await stream.first()
        finally:
            # Release the stream even when reading the first row raises.
            stream.close()
        if row is None or not row.is_ok or row.record is None:
            return 0
        return row.record_or_raise().bins.get("count", 0)

async def main():
    """Send five requests for one key against a limit of three and print each outcome."""
    async with Client("localhost:3000") as client:
        limiter = RateLimiter(client, max_requests=3, window_seconds=60)
        key = "user-123"

        # Reset the demo key so output is deterministic.
        demo_record = limiter.limits.id(key)
        reset_stream = await limiter.session.delete(key=demo_record).execute()
        reset_stream.close()

        for i in range(1, 6):
            allowed = await limiter.is_allowed(key)
            remaining = await limiter.get_remaining(key)
            if allowed:
                status = "ALLOWED"
            else:
                status = "BLOCKED"
            print(f"Request {i}: {status} (remaining={max(remaining, 0)})")


if __name__ == "__main__":
    asyncio.run(main())
```

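### Expected output

Running the complete example (limit of 3 requests, 5 attempts) prints:

```text
Request 1: ALLOWED (remaining=2)
Request 2: ALLOWED (remaining=1)
Request 3: ALLOWED (remaining=0)
Request 4: BLOCKED (remaining=0)
Request 5: BLOCKED (remaining=0)
```

The shorter snippet in the Usage section prints `Request accepted for user-123` when the request is within the limit.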

## Why Aerospike for rate limiting?

1.  **Atomic operations** — the counter increment (`add(1)` in Java, `increment_by(1)` in Python) is atomic across distributed servers
2.  **TTL-based reset** — Windows auto-expire without background jobs
3.  **Sub-millisecond latency** — Doesn’t slow down your API
4.  **Horizontal scaling** — Works across any number of app servers

## Next steps

Session Store

Build high-velocity session storage.

[Session Store →](https://aerospike.com/docs/develop/client/sdk/recipes/session-store)

Cache-Aside Pattern

Use Aerospike as a cache layer.

[Cache-Aside →](https://aerospike.com/docs/develop/client/sdk/recipes/cache-aside)