Session store recipe
Build a high-performance session store for web applications using the Developer SDK.
Use case
Store user session data with:
- Sub-millisecond reads for every page load
- Automatic expiration (TTL)
- High write throughput for session updates
Quick implementation
import com.aerospike.client.sdk.AerospikeException;import com.aerospike.client.sdk.Cluster;import com.aerospike.client.sdk.DataSet;import com.aerospike.client.sdk.RecordResult;import com.aerospike.client.sdk.RecordStream;import com.aerospike.client.sdk.ResultCode;import com.aerospike.client.sdk.Session;import com.aerospike.client.sdk.policy.Behavior;import java.util.Map;import java.util.Optional;
public class SessionStore { private final Session session; private final DataSet sessions; private final int ttlSeconds;
/**
 * Creates a store bound to the {@code DataSet.of("test", "sessions")} data set.
 *
 * @param cluster    cluster used to create the SDK session (with {@code Behavior.DEFAULT})
 * @param ttlSeconds expiration, in seconds, requested whenever a record is saved or touched
 */
public SessionStore(Cluster cluster, int ttlSeconds) {
    this.ttlSeconds = ttlSeconds;
    this.sessions = DataSet.of("test", "sessions");
    this.session = cluster.createSession(Behavior.DEFAULT);
}
public void save(String sessionId, Map<String, Object> data) { try { session.upsert(sessions.id(sessionId)) .bin("data").setTo(data) .expireRecordAfterSeconds(ttlSeconds) .execute(); } catch (AerospikeException e) { // Some local/dev server policies disallow TTL updates (FailForbidden). if (e.getResultCode() != ResultCode.FAIL_FORBIDDEN) { throw e; } session.upsert(sessions.id(sessionId)) .bin("data").setTo(data) .execute(); } }
/**
 * Reads the "data" bin of the record keyed by {@code sessionId}.
 *
 * @param sessionId key of the session record
 * @return the stored map, or an empty {@code Optional} when the query yields no first
 *         result, the result is not ok, or the "data" bin is absent
 */
public Optional<Map<String, Object>> get(String sessionId) {
    RecordStream stream = session.query(sessions.id(sessionId)).execute();
    try {
        return stream.getFirst()
            .filter(RecordResult::isOk)
            .map(RecordResult::recordOrThrow)
            .map(r -> (Map<String, Object>) r.bins.get("data"));
    } finally {
        // Close on every path; previously an exception in the chain skipped close().
        stream.close();
    }
}
/**
 * Deletes the record keyed by {@code sessionId}.
 *
 * @param sessionId key of the session record to remove
 */
public void delete(String sessionId) {
    session.delete(sessions.id(sessionId))
        .execute();
}
public void touch(String sessionId) { try { session.touch(sessions.id(sessionId)) .expireRecordAfterSeconds(ttlSeconds) .execute(); } catch (AerospikeException e) { if (e.getResultCode() != ResultCode.FAIL_FORBIDDEN) { throw e; } } }}
from aerospike_async.exceptions import ResultCode
from aerospike_sdk import AerospikeError, Behavior, DataSet
class SessionStore:
    """Session store that keeps each session in the ``DataSet.of("test", "sessions")`` data set."""

    def __init__(self, cluster, ttl_seconds: int):
        """Create the SDK session and remember the TTL.

        Args:
            cluster: Object providing ``create_session``; it is called with
                ``Behavior.DEFAULT``.
            ttl_seconds: Expiration, in seconds, requested when a record is
                saved or touched.
        """
        self.ttl_seconds = ttl_seconds
        self.sessions = DataSet.of("test", "sessions")
        self.session = cluster.create_session(Behavior.DEFAULT)
async def save(self, session_id: str, data: dict):
    """Upsert ``data`` into the record's "data" bin and request a TTL.

    If the write is rejected with ``ResultCode.FAIL_FORBIDDEN``, the same
    write is repeated once without an expiration.

    Args:
        session_id: Key of the session record.
        data: Session attributes to store.

    Raises:
        AerospikeError: For any result code other than ``FAIL_FORBIDDEN``,
            or if the repeated write fails.
    """
    try:
        with_ttl = (
            self.session.upsert(self.sessions.id(session_id))
            .put({"data": data})
            .expire_record_after_seconds(self.ttl_seconds)
        )
        await with_ttl.execute()
    except AerospikeError as err:
        # Some local/dev server policies disallow TTL updates (FailForbidden).
        if err.result_code == ResultCode.FAIL_FORBIDDEN:
            without_ttl = self.session.upsert(
                self.sessions.id(session_id)
            ).put({"data": data})
            await without_ttl.execute()
        else:
            raise
async def get(self, session_id: str) -> dict | None: stream = await self.session.query(self.sessions.id(session_id)).execute() row = await stream.first() if row is None or not row.is_ok or row.record is None: stream.close() return None data = row.record_or_raise().bins.get("data") stream.close() return data
async def delete(self, session_id: str):
    """Delete the session record and close the stream the delete returns.

    Args:
        session_id: Key of the session record to remove.
    """
    record_key = self.sessions.id(session_id)
    removal = self.session.delete(record_key)
    result_stream = await removal.execute()
    result_stream.close()
async def touch(self, session_id: str): try: await ( self.session.touch(self.sessions.id(session_id)) .expire_record_after_seconds(self.ttl_seconds) .execute() ) except AerospikeError as e: if e.result_code != ResultCode.FAIL_FORBIDDEN: raise
Usage
import com.aerospike.client.sdk.Cluster;import com.aerospike.client.sdk.ClusterDefinition;import java.util.Map;import java.util.Optional;
try (Cluster cluster = new ClusterDefinition("localhost", 3000).connect()) { SessionStore store = new SessionStore(cluster, 3600); // 1 hour TTL
// Save session store.save("sess-abc123", Map.of( "userId", "user-1", "role", "admin", "loginTime", System.currentTimeMillis() ));
// Get session Optional<Map<String, Object>> session = store.get("sess-abc123"); System.out.println("Loaded session: " + session.orElse(Map.of()));
// Extend session on activity store.touch("sess-abc123");
// Logout store.delete("sess-abc123");}
import asyncio
import time
from aerospike_sdk import Client
async def main():
    """Walk one session through save, load, TTL refresh and delete."""
    async with Client("localhost:3000") as client:
        store = SessionStore(client, ttl_seconds=3600)  # 1 hour TTL
        session_id = "sess-abc123"

        # Save session
        attributes = {
            "user_id": "user-1",
            "role": "admin",
            "login_time": time.time(),
        }
        await store.save(session_id, attributes)

        # Get session
        loaded = await store.get(session_id)
        print(f"Loaded session: {loaded}")

        # Extend session on activity
        await store.touch(session_id)

        # Logout
        await store.delete(session_id)
if __name__ == "__main__": asyncio.run(main())
Complete example
import com.aerospike.client.sdk.AerospikeException;import com.aerospike.client.sdk.Cluster;import com.aerospike.client.sdk.ClusterDefinition;import com.aerospike.client.sdk.DataSet;import com.aerospike.client.sdk.RecordResult;import com.aerospike.client.sdk.RecordStream;import com.aerospike.client.sdk.ResultCode;import com.aerospike.client.sdk.Session;import com.aerospike.client.sdk.policy.Behavior;import java.util.Map;import java.util.Optional;
public class SessionStoreExample { static class SessionStore { private final Session session; private final DataSet sessions; private final int ttlSeconds;
/**
 * Creates a store bound to the {@code DataSet.of("test", "sessions")} data set.
 *
 * @param cluster    cluster used to create the SDK session (with {@code Behavior.DEFAULT})
 * @param ttlSeconds expiration, in seconds, requested whenever a record is saved or touched
 */
SessionStore(Cluster cluster, int ttlSeconds) {
    this.ttlSeconds = ttlSeconds;
    this.sessions = DataSet.of("test", "sessions");
    this.session = cluster.createSession(Behavior.DEFAULT);
}
void save(String sessionId, Map<String, Object> data) { try { session.upsert(sessions.id(sessionId)) .bin("data").setTo(data) .expireRecordAfterSeconds(ttlSeconds) .execute(); } catch (AerospikeException e) { // Some local/dev server policies disallow TTL updates (FailForbidden). if (e.getResultCode() != ResultCode.FAIL_FORBIDDEN) { throw e; } session.upsert(sessions.id(sessionId)) .bin("data").setTo(data) .execute(); } }
/**
 * Reads the "data" bin of the record keyed by {@code sessionId}.
 *
 * @param sessionId key of the session record
 * @return the stored map, or an empty {@code Optional} when the query yields no first
 *         result, the result is not ok, or the "data" bin is absent
 */
Optional<Map<String, Object>> get(String sessionId) {
    RecordStream stream = session.query(sessions.id(sessionId)).execute();
    try {
        Optional<Map<String, Object>> data = stream.getFirst()
            .filter(RecordResult::isOk)
            .map(RecordResult::recordOrThrow)
            .map(record -> (Map<String, Object>) record.bins.get("data"));
        return data;
    } finally {
        // The stream is closed whether or not the lookup above succeeded.
        stream.close();
    }
}
/**
 * Touches the record keyed by {@code sessionId}, asking for it to expire after
 * {@code ttlSeconds}. A {@code FAIL_FORBIDDEN} rejection is ignored.
 *
 * @param sessionId key of the session record
 * @throws AerospikeException for any result code other than {@code FAIL_FORBIDDEN}
 */
void touch(String sessionId) {
    try {
        session.touch(sessions.id(sessionId))
            .expireRecordAfterSeconds(ttlSeconds)
            .execute();
    } catch (AerospikeException e) {
        boolean ttlForbidden = e.getResultCode() == ResultCode.FAIL_FORBIDDEN;
        if (!ttlForbidden) {
            throw e;
        }
        // FAIL_FORBIDDEN: the expiration is left as it was.
    }
}
/**
 * Deletes the record keyed by {@code sessionId}.
 *
 * @param sessionId key of the session record to remove
 */
void delete(String sessionId) {
    session.delete(sessions.id(sessionId))
        .execute();
}
} // closes the nested SessionStore class
public static void main(String[] args) { try (Cluster cluster = new ClusterDefinition("localhost", 3000).connect()) { SessionStore store = new SessionStore(cluster, 3600); // 1 hour TTL String sessionId = "sess-complete-1";
store.save(sessionId, Map.of( "userId", "user-1", "role", "admin", "loginTime", System.currentTimeMillis() ));
Optional<Map<String, Object>> loaded = store.get(sessionId); System.out.println("Loaded session: " + loaded.orElse(Map.of()));
store.touch(sessionId); store.delete(sessionId); } }}
import asyncio
import time
from aerospike_async.exceptions import ResultCode
from aerospike_sdk import AerospikeError, Behavior, Client, DataSet
class SessionStore:
    """Session store that keeps each session in the ``DataSet.of("test", "sessions")`` data set."""

    def __init__(self, client, ttl_seconds: int):
        """Create the SDK session and remember the TTL.

        Args:
            client: Object providing ``create_session``; it is called with
                ``Behavior.DEFAULT``.
            ttl_seconds: Expiration, in seconds, requested when a record is
                saved or touched.
        """
        self.ttl_seconds = ttl_seconds
        self.sessions = DataSet.of("test", "sessions")
        self.session = client.create_session(Behavior.DEFAULT)
async def save(self, session_id: str, data: dict):
    """Upsert ``data`` into the record's "data" bin and request a TTL.

    If the write is rejected with ``ResultCode.FAIL_FORBIDDEN``, the same
    write is repeated once without an expiration.

    Args:
        session_id: Key of the session record.
        data: Session attributes to store.

    Raises:
        AerospikeError: For any result code other than ``FAIL_FORBIDDEN``,
            or if the repeated write fails.
    """
    try:
        with_ttl = (
            self.session.upsert(self.sessions.id(session_id))
            .put({"data": data})
            .expire_record_after_seconds(self.ttl_seconds)
        )
        await with_ttl.execute()
    except AerospikeError as err:
        # Some local/dev server policies disallow TTL updates (FailForbidden).
        if err.result_code == ResultCode.FAIL_FORBIDDEN:
            without_ttl = self.session.upsert(
                self.sessions.id(session_id)
            ).put({"data": data})
            await without_ttl.execute()
        else:
            raise
async def get(self, session_id: str) -> dict | None: stream = await self.session.query(self.sessions.id(session_id)).execute() row = await stream.first() stream.close() if row is None or not row.is_ok or row.record is None: return None return row.record_or_raise().bins.get("data")
async def touch(self, session_id: str):
    """Touch the session record, requesting ``ttl_seconds`` as its expiration.

    A ``ResultCode.FAIL_FORBIDDEN`` rejection is ignored.

    Args:
        session_id: Key of the session record.

    Raises:
        AerospikeError: For any result code other than ``FAIL_FORBIDDEN``.
    """
    try:
        refresh = self.session.touch(
            self.sessions.id(session_id)
        ).expire_record_after_seconds(self.ttl_seconds)
        await refresh.execute()
    except AerospikeError as err:
        if err.result_code == ResultCode.FAIL_FORBIDDEN:
            return
        raise
async def delete(self, session_id: str):
    """Delete the session record and close the stream the delete returns.

    Args:
        session_id: Key of the session record to remove.
    """
    record_key = self.sessions.id(session_id)
    removal = self.session.delete(key=record_key)
    result_stream = await removal.execute()
    result_stream.close()
async def main():
    """Walk one session through save, load, TTL refresh and delete."""
    async with Client("localhost:3000") as client:
        store = SessionStore(client, ttl_seconds=3600)  # 1 hour TTL
        session_id = "sess-complete-1"

        attributes = {
            "user_id": "user-1",
            "role": "admin",
            "login_time": time.time(),
        }
        await store.save(session_id, attributes)

        restored = await store.get(session_id)
        print(f"Loaded session: {restored}")

        await store.touch(session_id)
        await store.delete(session_id)
if __name__ == "__main__": asyncio.run(main())
Expected output
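Loaded session: {user_id=user-1, role=admin, login_time=<timestamp>}
(Exact formatting depends on the language: the Java sample prints its camelCase keys in `key=value` form, while the Python sample prints a dict with quoted keys.)
Performance tips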
- Use `Behavior.DEFAULT` as a safe starting point for session workloads
- Set an appropriate TTL to auto-expire inactive sessions
- Use `touch()` to extend TTL on user activity without rewriting data
Next steps
Rate Limiting
Build rate limiters with atomic counters.
Cache-Aside Pattern
Use Aerospike as a cache layer.