Skip to content

Redis

Overview

Redis serves two primary roles in the Lota SDK: it backs BullMQ job queues for asynchronous task processing (memory extraction, context compaction, repo indexing, etc.) and provides distributed locks for coordinating concurrent access to shared resources like organization memory. Additionally, Redis is used as an embedding cache to avoid redundant calls to the embedding API.

Public Runtime Boundary

ts
const runtime = await createLotaRuntime(config)
await runtime.connect()

const redis = runtime.redis.getConnection()
const bullmq = runtime.redis.getConnectionForBullMQ()

Use runtime.redis.* from host code. That keeps Redis ownership attached to the runtime instance you created.

Low-Level SDK Ownership

  • core/src/redis/connection.ts creates the connection manager
  • core/src/redis/index.ts exposes runtime-wide Redis accessors
  • core/src/queues/*.queue.ts consumes BullMQ connections
  • core/src/redis/org-memory-lock.ts owns distributed memory locking

@lota-sdk/core/redis is still used by SDK internals and shared queue code, but it is not the preferred host integration surface.

Runtime usage

createLotaRuntime({ redis: { url } }) wires the Redis connection manager into the runtime. Built-in workers, queue helpers, and host-owned queue wrappers should then use that runtime-backed manager instead of creating parallel Redis clients.

Connection Management

The RedisConnectionManager handles the lifecycle of ioredis connections:

  • Separate connection pools. General-purpose connections and BullMQ connections are created independently. BullMQ requires maxRetriesPerRequest: null to support its blocking command pattern — this option is incompatible with general-use connections, which is why they are separated.

  • Health checks. The connection manager runs periodic PING commands (every 30 seconds) to detect stale connections early. If a health check fails, the connection triggers its reconnection logic.

  • Auto-reconnect. ioredis handles reconnection automatically with exponential backoff. The SDK does not implement custom retry logic on top of this — it relies on ioredis defaults, which retry indefinitely with increasing delays.

  • Graceful shutdown. When runtime.disconnect() is called, all Redis connections are closed cleanly. This prevents connection leaks in test environments and during hot-reload cycles.

ts
// Configuration
redis: {
  url: 'redis://localhost:6379',   // Standard Redis connection URL
}

Distributed Locks

Org-Memory Lock

The org-memory lock prevents concurrent memory writes to the same organization. Without this lock, parallel memory extraction jobs (triggered by simultaneous turns across different workstreams) could produce duplicate or conflicting memory records.

  • Lease-based TTL. Each lock acquisition sets a TTL (time-to-live). If the process holding the lock crashes without releasing it, the lock automatically expires after the TTL, allowing other processes to proceed. This prevents deadlocks from crashed workers.

  • Scope. Locks are scoped per organization ID. Different organizations can have memory operations running concurrently without interference.

  • Host adapter. The withWorkspaceMemoryLock function provides an adapter for host-provided lock implementations. Hosts can override the default Redis-based lock with their own distributed lock mechanism if needed.

ts
// Internal usage pattern (SDK handles this automatically)
await withOrgMemoryLock(orgId, async () => {
  // Extract and persist memories
  // Only one process per org runs this block at a time
})

Embedding Cache

To reduce latency and cost, the SDK caches embedding vectors in Redis. When the same text is embedded multiple times (common during memory retrieval across turns), the cached vector is returned instead of calling the embedding API.

  • Key format. Cache keys are derived from a hash of the input text and the embedding model name, ensuring cache correctness across model changes.

  • Configurable TTL. The cache TTL is controlled by the embeddingCacheTtlSeconds config option. Embeddings are deterministic for a given model, so long TTLs are safe. The default is tuned for production use.

  • Eviction. Standard Redis eviction policies apply. If Redis memory pressure triggers eviction, embedding cache entries are evicted like any other key. The system gracefully falls back to the embedding API on cache misses.

ts
// Configuration (optional)
aiGateway: {
  // ...
  embeddingCacheTtlSeconds: 86400,   // Cache embeddings for 24 hours
}

Queue Patterns

The SDK defines several BullMQ queues for background processing. All queues share common patterns:

Job Deduplication

Jobs use composite keys to prevent duplicate processing. For example, a memory extraction job for the same workstream turn is deduplicated by {orgId}:{workstreamId}:{turnId}. If a job with the same key is already queued or active, the duplicate is silently dropped.

Delayed Jobs

Some jobs use deliberate delays for batching. Memory digest jobs, for instance, are enqueued with a 15-minute delay. If another turn completes within that window, the existing delayed job is replaced with a new one (resetting the timer). This effectively debounces memory extraction to avoid processing every single turn individually.

Exponential Backoff Retries

Failed jobs are retried with exponential backoff. Each queue defines its own retry count and backoff parameters based on the nature of the work:

QueueMax RetriesBackoffNotes
Memory extraction3ExponentialIdempotent, safe to retry
Context compaction2ExponentialRe-reads messages, safe to retry
Repo indexing3ExponentialLong-running, chunked

Concurrency Limits

Each worker specifies a concurrency limit controlling how many jobs it processes in parallel. CPU-bound work (embedding generation) uses lower concurrency, while I/O-bound work (database writes) can run at higher concurrency. Workers are configured per-queue when registered with the runtime.

ts
// Workers are registered through the runtime
const runtime = await createLotaRuntime({
  // ...
  extraWorkers: [
    { queue: 'custom-queue', concurrency: 5, processor: myProcessor },
  ],
})