Rate limiting recipe
Implement distributed rate limiting using Aerospike’s atomic increment operations.
Use case
Limit API requests per user/IP with:
- Distributed coordination (works across multiple app servers)
- Atomic counters (no race conditions)
- Automatic reset via TTL
Quick implementation: fixed window
import com.aerospike.client.sdk.Cluster;import com.aerospike.client.sdk.DataSet;import com.aerospike.client.sdk.RecordResult;import com.aerospike.client.sdk.RecordStream;import com.aerospike.client.sdk.Session;import com.aerospike.client.sdk.policy.Behavior;
/**
 * Rate limiter that keeps one counter record per key in the
 * {@code app.rate_limits} data set.
 *
 * <p>Each call to {@link #isAllowed(String)} increments the counter and
 * (re)applies a record expiry of {@code windowSeconds}.
 *
 * <p>NOTE(review): because the expiry is applied on every increment, the
 * window presumably restarts with each request rather than being anchored to
 * the first one; confirm against the server's TTL semantics.
 */
public class RateLimiter {
    private final Session session;
    private final DataSet limits;
    private final int maxRequests;
    private final int windowSeconds;

    /**
     * @param cluster       connected cluster used to create the session
     * @param maxRequests   maximum number of requests allowed per window
     * @param windowSeconds record expiry applied on each increment, in seconds
     */
    public RateLimiter(Cluster cluster, int maxRequests, int windowSeconds) {
        this.session = cluster.createSession(Behavior.DEFAULT);
        this.limits = DataSet.of("app", "rate_limits");
        this.maxRequests = maxRequests;
        this.windowSeconds = windowSeconds;
    }

    /**
     * Counts one request for {@code key} and reports whether it is within the limit.
     *
     * @param key identifier being limited (for example a user id or IP address)
     * @return {@code true} when the counter after this request is at most
     *         {@code maxRequests}
     */
    public boolean isAllowed(String key) {
        // upsert (as in the complete example) so the very first request for a
        // key can create the counter record instead of requiring it to exist.
        RecordStream stream = session.upsert(limits.id(key))
            .bin("count").add(1)
            .expireRecordAfterSeconds(windowSeconds) // Auto-reset after window
            .execute();
        try {
            // If no OK result comes back the count falls back to 1, i.e. the
            // request is allowed.
            long count = stream.getFirst()
                .filter(RecordResult::isOk)
                .map(RecordResult::recordOrThrow)
                .map(r -> r.getLong("count"))
                .orElse(1L);
            return count <= maxRequests;
        } finally {
            stream.close();
        }
    }

    /**
     * Returns how many requests are still available for {@code key}.
     *
     * @param key identifier being limited
     * @return {@code maxRequests} when no OK record is found, otherwise
     *         {@code maxRequests - count}, never less than 0
     */
    public long getRemaining(String key) {
        RecordStream stream = session.query(limits.id(key)).execute();
        try {
            long remaining = stream.getFirst()
                .filter(RecordResult::isOk)
                .map(RecordResult::recordOrThrow)
                .map(r -> maxRequests - r.getLong("count"))
                .orElse((long) maxRequests);
            // The counter keeps growing past the limit; never report a
            // negative number of remaining requests.
            return Math.max(remaining, 0L);
        } finally {
            stream.close();
        }
    }
}

from aerospike_sdk import Behavior, DataSet
class RateLimiter:
    """Rate limiter that keeps one counter record per key in ``app.rate_limits``.

    Each call to :meth:`is_allowed` increments the counter and (re)applies a
    record expiry of ``window_seconds``.

    NOTE(review): because the expiry is applied on every increment, the window
    presumably restarts with each request rather than being anchored to the
    first one; confirm against the server's TTL semantics.
    """

    def __init__(self, cluster, max_requests: int, window_seconds: int):
        """Create a session on *cluster* and remember the limit settings."""
        self.session = cluster.create_session(Behavior.DEFAULT)
        self.limits = DataSet.of("app", "rate_limits")
        self.max_requests = max_requests
        self.window_seconds = window_seconds

    async def is_allowed(self, key: str) -> bool:
        """Count one request for *key* and report whether it is within the limit.

        Returns ``True`` when the counter after this request is at most
        ``max_requests``.
        """
        # upsert (as in the complete example) so the very first request for a
        # key can create the counter record instead of requiring it to exist.
        stream = await (
            self.session.upsert(self.limits.id(key))
            .bin("count").increment_by(1)
            .expire_record_after_seconds(self.window_seconds)
            .execute()
        )
        try:
            row = await stream.first_or_raise()
            count = row.record_or_raise().bins.get("count", 0)
        finally:
            # Close the stream even when one of the *_or_raise calls raises.
            stream.close()

        return count <= self.max_requests

    async def get_remaining(self, key: str) -> int:
        """Return how many requests are still available for *key*.

        Returns ``max_requests`` when no usable record is found, otherwise
        ``max_requests - count``, never less than 0.
        """
        stream = await self.session.query(self.limits.id(key)).execute()
        try:
            row = await stream.first()
            if row is None or not row.is_ok or row.record is None:
                return self.max_requests
            count = row.record_or_raise().bins.get("count", 0)
        finally:
            stream.close()
        # The counter keeps growing past the limit; never report a negative
        # number of remaining requests.
        return max(self.max_requests - count, 0)


Usage
import com.aerospike.client.sdk.Cluster;import com.aerospike.client.sdk.ClusterDefinition;
try (Cluster cluster = new ClusterDefinition("localhost", 3000).connect()) { RateLimiter limiter = new RateLimiter(cluster, 100, 60); // 100 req/min String userId = "user-123";
if (limiter.isAllowed(userId)) { // Process request System.out.println("Request accepted for " + userId); } else { throw new RuntimeException( "Rate limit exceeded. Remaining: " + limiter.getRemaining(userId) ); }}import asyncio
from aerospike_sdk import Client
async def main():
    """Check a single request for ``user-123`` against a 100-per-60-seconds limit."""
    async with Client("localhost:3000") as client:
        limiter = RateLimiter(client, max_requests=100, window_seconds=60)
        user_id = "user-123"

        # Reject first, so the accepted path reads straight through.
        if not await limiter.is_allowed(user_id):
            remaining = await limiter.get_remaining(user_id)
            raise RuntimeError(f"Rate limit exceeded. Remaining: {remaining}")

        print(f"Request accepted for {user_id}")
if __name__ == "__main__": asyncio.run(main())
Complete example
import com.aerospike.client.sdk.AerospikeException;import com.aerospike.client.sdk.Cluster;import com.aerospike.client.sdk.ClusterDefinition;import com.aerospike.client.sdk.DataSet;import com.aerospike.client.sdk.RecordResult;import com.aerospike.client.sdk.RecordStream;import com.aerospike.client.sdk.ResultCode;import com.aerospike.client.sdk.Session;import com.aerospike.client.sdk.policy.Behavior;
/**
 * Runnable demo: sends five requests for one key through a limiter that
 * allows three, printing whether each request was allowed.
 */
public class RateLimiterExample {
    /**
     * Rate limiter that keeps one counter record per key in the
     * {@code test.rate_limits} data set.
     */
    static class RateLimiter {
        private final Session session;
        private final DataSet limits;
        private final int maxRequests;
        private final int windowSeconds;

        /**
         * @param cluster       connected cluster used to create the session
         * @param maxRequests   maximum number of requests allowed per window
         * @param windowSeconds record expiry applied on each increment, in seconds
         */
        RateLimiter(Cluster cluster, int maxRequests, int windowSeconds) {
            this.session = cluster.createSession(Behavior.DEFAULT);
            this.limits = DataSet.of("test", "rate_limits");
            this.maxRequests = maxRequests;
            this.windowSeconds = windowSeconds;
        }

        /**
         * Counts one request for {@code key} and reports whether the counter,
         * read back afterwards, is at most {@code maxRequests}.
         *
         * @throws AerospikeException for any failure other than FAIL_FORBIDDEN
         */
        boolean isAllowed(String key) {
            try {
                // The result stream is not needed; close it like the delete in main().
                session.upsert(limits.id(key))
                    .bin("count").add(1)
                    .expireRecordAfterSeconds(windowSeconds)
                    .execute()
                    .close();
            } catch (AerospikeException e) {
                if (e.getResultCode() != ResultCode.FAIL_FORBIDDEN) {
                    throw e;
                }
                // NOTE(review): presumably the server refused the write because
                // of the expiry; retry the same increment without one. Confirm.
                session.upsert(limits.id(key))
                    .bin("count").add(1)
                    .execute()
                    .close();
            }
            return getCount(key) <= maxRequests;
        }

        /** Returns the requests still available for {@code key}, never below 0. */
        long getRemaining(String key) {
            return Math.max(maxRequests - getCount(key), 0);
        }

        /** Reads the current counter for {@code key}; 0 when no OK record is found. */
        private long getCount(String key) {
            RecordStream stream = session.query(limits.id(key)).execute();
            try {
                return stream.getFirst()
                    .filter(RecordResult::isOk)
                    .map(RecordResult::recordOrThrow)
                    .map(r -> r.getLong("count"))
                    .orElse(0L);
            } finally {
                stream.close();
            }
        }
    }

    public static void main(String[] args) {
        try (Cluster cluster = new ClusterDefinition("localhost", 3000).connect()) {
            RateLimiter limiter = new RateLimiter(cluster, 3, 60); // 3 req/min demo
            String key = "user-123";
            // Reset the demo key so output is deterministic.
            limiter.session.delete(limiter.limits.id(key)).execute().close();

            for (int i = 1; i <= 5; i++) {
                boolean allowed = limiter.isAllowed(key);
                // getRemaining() already clamps at 0, so no extra Math.max here.
                long remaining = limiter.getRemaining(key);
                System.out.println(
                    "Request " + i + ": " + (allowed ? "ALLOWED" : "BLOCKED")
                        + " (remaining=" + remaining + ")"
                );
            }
        }
    }
}

import asyncio
from aerospike_async.exceptions import ResultCode
from aerospike_sdk import AerospikeError, Behavior, Client, DataSet
class RateLimiter:
    """Rate limiter that keeps one counter record per key in ``test.rate_limits``."""

    def __init__(self, client, max_requests: int, window_seconds: int):
        """Create a session on *client* and remember the limit settings."""
        self.session = client.create_session(Behavior.DEFAULT)
        self.limits = DataSet.of("test", "rate_limits")
        self.max_requests = max_requests
        self.window_seconds = window_seconds

    async def is_allowed(self, key: str) -> bool:
        """Count one request for *key* and report whether it is within the limit.

        The counter is read back after the increment and compared with
        ``max_requests``. Errors other than ``FAIL_FORBIDDEN`` are re-raised.
        """
        try:
            stream = await (
                self.session.upsert(self.limits.id(key))
                .bin("count").increment_by(1)
                .expire_record_after_seconds(self.window_seconds)
                .execute()
            )
        except AerospikeError as e:
            if e.result_code != ResultCode.FAIL_FORBIDDEN:
                raise
            # NOTE(review): presumably the server refused the write because of
            # the expiry; retry the same increment without one. Confirm.
            stream = await (
                self.session.upsert(self.limits.id(key))
                .bin("count").increment_by(1)
                .execute()
            )
        # The result stream is not needed; close it like every other stream here.
        stream.close()
        return await self.get_count(key) <= self.max_requests

    async def get_remaining(self, key: str) -> int:
        """Return the requests still available for *key*, never below 0."""
        return max(self.max_requests - await self.get_count(key), 0)

    async def get_count(self, key: str) -> int:
        """Read the current counter for *key*; 0 when no usable record is found."""
        stream = await self.session.query(self.limits.id(key)).execute()
        try:
            row = await stream.first()
        finally:
            # Close the stream even if reading the first row raises.
            stream.close()
        if row is None or not row.is_ok or row.record is None:
            return 0
        return row.record_or_raise().bins.get("count", 0)
async def main():
    """Send five requests for one key through a limiter that allows three."""
    async with Client("localhost:3000") as client:
        limiter = RateLimiter(client, max_requests=3, window_seconds=60)
        key = "user-123"

        # Reset the demo key so output is deterministic.
        record_id = limiter.limits.id(key)
        delete_stream = await limiter.session.delete(key=record_id).execute()
        delete_stream.close()

        for i in range(1, 6):
            allowed = await limiter.is_allowed(key)
            remaining = await limiter.get_remaining(key)
            if allowed:
                status = "ALLOWED"
            else:
                status = "BLOCKED"
            print(f"Request {i}: {status} (remaining={max(remaining, 0)})")
if __name__ == "__main__": asyncio.run(main())
Expected output
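Usage snippet:
Request accepted for user-123
Complete example:
Request 1: ALLOWED (remaining=2)
Request 2: ALLOWED (remaining=1)
Request 3: ALLOWED (remaining=0)
Request 4: BLOCKED (remaining=0)
Request 5: BLOCKED (remaining=0)
Why Aerospike for rate limiting?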
- Atomic operations — increment() is atomic across distributed servers
- TTL-based reset — Windows auto-expire without background jobs
- Sub-millisecond latency — Doesn’t slow down your API
- Horizontal scaling — Works across any number of app servers
Next steps
Session Store
Build high-velocity session storage.
Cache-Aside Pattern
Use Aerospike as a cache layer.