# Queues and Workers
The Lota SDK uses BullMQ backed by Redis for all background job processing. Workers handle memory extraction, context compaction, title generation, skill extraction, memory consolidation, and more. This architecture keeps the chat request path fast by deferring heavy or non-urgent work to asynchronous queues.
## Overview
After each chat turn, the SDK enqueues background jobs through post-turn hooks. These jobs land in BullMQ queues stored in Redis. Dedicated worker processes pick up jobs, execute them, and report completion or failure. All workers share the same Redis instance used by the runtime.
```
Chat Turn
    │
    ▼
Post-Turn Hooks
    │
    ▼
BullMQ Queues ◄──────► Redis (port 6382)
    │
    ▼
┌────────────────────────────────────────┐
│                Workers                 │
│  • post-chat-memory                    │
│  • regular-chat-memory-digest          │
│  • context-compaction                  │
│  • memory-consolidation                │
│  • skill-extraction                    │
│  • workstream-title-generation         │
│  • recent-activity-title-refinement    │
└────────────────────────────────────────┘
    │
    ▼
SurrealDB / AI Gateway
(persist results, call LLMs)
```

Jobs flow in one direction: a chat turn produces events, hooks enqueue jobs, and workers consume them. Workers that need LLM calls (memory extraction, title generation, skill extraction) reach the AI gateway; workers that persist state connect to SurrealDB.
## Built-in Workers
The SDK ships seven built-in workers. Each has a dedicated queue, enqueue function, and start function.
| Worker | Queue Name | Purpose | Trigger | Concurrency | Lock Duration |
|---|---|---|---|---|---|
| Post-Chat Memory | `post-chat-memory` | Extracts facts and memories from conversation turns | After each chat turn | 3 | 15 min |
| Regular Chat Memory Digest | `regular-chat-memory-digest` | Batch-processes accumulated memories from post-onboarding conversations | 15-min deduplication window per org | 1 | 10 min |
| Context Compaction | `context-compaction` | Summarizes old messages when the conversation context grows too large | When token count exceeds threshold | 2 | 5 min |
| Memory Consolidation | `memory-consolidation` | Archives stale memories, resolves contradictions, merges duplicates | Recurring every 24 hours (scheduled) | 1 | 10 min |
| Skill Extraction | `skill-extraction` | Identifies reusable procedures and skills from conversations | 15-min deduplication window per org | 1 | 10 min |
| Workstream Title Generation | `workstream-title-generation` | Generates descriptive titles for new workstreams | When a workstream has no custom title | 2 | 1 min |
| Recent Activity Title Refinement | `recent-activity-title-refinement` | Refines sidebar entry titles for recent activity items | After turns that update activity | 2 | 5 min |
### Worker details
**Post-Chat Memory** extracts structured facts from each user/agent exchange. It receives the full conversation context, including history messages, the memory block, and attachment context. Extracted memories are stored via `memoryService.addConversationMemories()`.

**Regular Chat Memory Digest** runs on a 15-minute delayed schedule per organization. When a turn completes, the job is enqueued with a deduplication key (`regular-chat-digest:{orgId}`), so multiple turns within the window are batched into a single digest run. This worker executes in a sandboxed child process.

**Context Compaction** fires when a workstream's token count exceeds the configured threshold. It marks the workstream as compacting, runs `contextCompactionService.compactWorkstreamHistory()`, then clears the compacting flag. Jobs are deduplicated per entity (`compact:{domain}:{entityId}`).

**Memory Consolidation** runs on a 24-hour recurring schedule. It scans all organization memories, archives stale entries, resolves contradictions, and merges near-duplicates. It can also be triggered on demand for a specific scope. This worker executes in a sandboxed child process.

**Skill Extraction** identifies reusable procedures from conversations. Like the memory digest worker, it uses a 15-minute deduplication window per organization and runs in a sandboxed child process.

**Workstream Title Generation** generates a descriptive title from the first messages in a workstream. Jobs are deduplicated by workstream ID (`workstream-title:{workstreamId}`).

**Recent Activity Title Refinement** refines the short titles shown in the sidebar for recent activity entries. Jobs are deduplicated by activity ID (`recent-activity-title:{activityId}`).
## Starting Workers
Start workers after creating and connecting the runtime:
```ts
import { createLotaRuntime } from '@lota-sdk/core'

const runtime = await createLotaRuntime(config)
await runtime.connect()

// Start all built-in workers
runtime.workers.startPostChatMemoryWorker()
runtime.workers.startRegularChatMemoryDigestWorker()
runtime.workers.startContextCompactionWorker()
runtime.workers.startMemoryConsolidationWorker()
runtime.workers.startSkillExtractionWorker()
runtime.workers.startWorkstreamTitleGenerationWorker()
runtime.workers.startRecentActivityTitleRefinementWorker()

// Schedule the recurring memory consolidation job (every 24 hours)
runtime.workers.scheduleRecurringConsolidation()
```

Each `start*Worker()` call returns a `WorkerHandle` that you can use for graceful shutdown:
```ts
interface WorkerHandle {
  worker: Worker // BullMQ Worker instance
  shutdown: () => Promise<void>
}
```

## Custom Workers
Consumer applications can register extra workers through the `extraWorkers` option in the runtime configuration:
```ts
const runtime = await createLotaRuntime({
  // ... other config
  extraWorkers: {
    documentProcessor: (redis) => startDocumentProcessorWorker(redis),
    reportGenerator: (redis) => startReportGeneratorWorker(redis),
  },
})
```

Extra workers are stored in the runtime extensions state and can be started alongside the built-in workers. This keeps all worker lifecycle management centralized in the runtime.
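The factory shape above can be sketched in plain TypeScript. Everything below is illustrative (the `RedisLike` type, the stub factory, and `startExtraWorkers` are not SDK exports); it only shows the pattern of registering factories that receive the shared Redis connection and return handles:

```typescript
// Illustrative types: the real SDK passes its managed ioredis connection.
type RedisLike = unknown

interface WorkerHandle {
  shutdown: () => Promise<void>
}

type ExtraWorkerFactory = (redis: RedisLike) => WorkerHandle

// Hypothetical factory: a real one would create a BullMQ Worker bound
// to the shared connection and return its handle.
function startDocumentProcessorWorker(redis: RedisLike): WorkerHandle {
  void redis
  return { shutdown: async () => { /* close the underlying worker */ } }
}

// Start every registered extra worker and collect the handles,
// mirroring how the runtime centralizes worker lifecycle management.
function startExtraWorkers(
  extras: Record<string, ExtraWorkerFactory>,
  redis: RedisLike,
): WorkerHandle[] {
  return Object.values(extras).map((factory) => factory(redis))
}
```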
## Queue Configuration
Each queue is configured with sensible defaults for its workload. Here is a breakdown of the common configuration options:
### Default job options
Every queue sets `defaultJobOptions` on the BullMQ `Queue` instance:
```ts
{
  removeOnComplete: 200, // Keep the last 200 completed jobs for inspection
  removeOnFail: 200,     // Keep the last 200 failed jobs for debugging
  attempts: 2,           // Retry failed jobs up to 2 times
  backoff: {
    type: 'exponential', // Exponential backoff between retries
    delay: 3_000,        // Base delay in milliseconds
  },
}
```

The exact values vary by worker. For example, post-chat memory uses 3 attempts with a 2-second base delay, while memory consolidation uses 2 attempts with a 5-second base delay.
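To reason about retry timing, the standard exponential backoff formula (base delay doubled for each subsequent attempt) can be written as a small helper. This is a sketch for illustration, not SDK code:

```typescript
// delay(attempt) = baseDelayMs * 2^(attempt - 1), where attempt starts at 1.
// With a 3-second base delay, the first retry waits 3s and the second 6s.
function exponentialBackoffDelay(baseDelayMs: number, attempt: number): number {
  return baseDelayMs * 2 ** (attempt - 1)
}
```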
### Concurrency and lock duration
Worker concurrency and lock duration are set when creating the BullMQ `Worker` instance:
| Worker | Concurrency | Lock Duration | Max Stalled Count |
|---|---|---|---|
| Post-Chat Memory | 3 | 900,000 ms (15 min) | 10 |
| Regular Chat Memory Digest | 1 | 600,000 ms (10 min) | default |
| Context Compaction | 2 | 300,000 ms (5 min) | default |
| Memory Consolidation | 1 | 600,000 ms (10 min) | default |
| Skill Extraction | 1 | 600,000 ms (10 min) | default |
| Workstream Title Generation | 2 | 60,000 ms (1 min) | default |
| Recent Activity Title Refinement | 2 | 300,000 ms (5 min) | default |
### Deduplication
Several workers use BullMQ's built-in deduplication to prevent redundant work:
- **Context Compaction**: deduplicates by `compact:{domain}:{entityId}`, so concurrent compaction requests for the same workstream collapse into one job.
- **Regular Chat Memory Digest**: deduplicates by `regular-chat-digest:{orgId}` with a 15-minute delay window. Multiple turns within the window produce a single digest job.
- **Skill Extraction**: deduplicates by `skill-extraction:{orgId}` with a 15-minute delay window, similar to the memory digest.
- **Workstream Title Generation**: deduplicates by `workstream-title:{workstreamId}`, so only one title generation runs per workstream.
- **Recent Activity Title Refinement**: deduplicates by `recent-activity-title:{activityId}`.
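The deduplication IDs above are simple string templates. The helpers below build two of them (the templates come from this page; the commented enqueue call is an illustrative use of BullMQ's `deduplication` job option with a made-up `ttl`):

```typescript
// Build the per-entity compaction deduplication ID.
function compactionDedupId(domain: string, entityId: string): string {
  return `compact:${domain}:${entityId}`
}

// Build the per-organization digest deduplication ID.
function digestDedupId(orgId: string): string {
  return `regular-chat-digest:${orgId}`
}

// Illustrative enqueue (not SDK code):
// await queue.add('compact', payload, {
//   deduplication: { id: compactionDedupId('workstream', 'ws_1'), ttl: 15 * 60_000 },
// })
```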
## Worker Bootstrap for Sandboxed Processes
Some workers (memory consolidation, regular chat memory digest, skill extraction) run their processor in a sandboxed child process. BullMQ spawns a separate process using the worker file path. Since `createLotaRuntime()` was never called in that child process, the global database service has no backing instance.
The `initializeSandboxedWorkerRuntime()` function handles this:
```ts
import { initializeSandboxedWorkerRuntime } from '@lota-sdk/core/workers/bootstrap'

// Called at the top of sandboxed worker files
await initializeSandboxedWorkerRuntime()
```

This function:
- Configures the logger
- Creates a `SurrealDBService` instance from environment variables (`SURREALDB_URL`, `SURREALDB_NAMESPACE`, `SURREALDB_USER`, `SURREALDB_PASSWORD`)
- Connects to the database with startup retry logic
- Connects any configured plugin databases
- Waits for the database schema bootstrap to complete (using `DB_SCHEMA_FINGERPRINT` to verify schema readiness)
The initialization is idempotent: calling it multiple times reuses the same promise.
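The memoized-promise pattern behind that idempotency can be sketched as follows (names here are illustrative, not the SDK's internals): the first call starts the work, and every later call returns the same in-flight or settled promise.

```typescript
let initPromise: Promise<void> | undefined
let initCount = 0 // stands in for the side effects of connecting and bootstrapping

function initializeOnce(): Promise<void> {
  if (!initPromise) {
    // Only the first caller creates the promise; everyone else reuses it.
    initPromise = (async () => {
      initCount++
    })()
  }
  return initPromise
}
```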
### Required environment variables for sandboxed workers
| Variable | Required | Description |
|---|---|---|
| `SURREALDB_URL` | Yes | WebSocket URL for SurrealDB (e.g., `ws://localhost:8003/rpc`) |
| `SURREALDB_NAMESPACE` | Yes | SurrealDB namespace |
| `SURREALDB_USER` | No | SurrealDB username |
| `SURREALDB_PASSWORD` | No | SurrealDB password |
| `DB_SCHEMA_FINGERPRINT` | No | Expected schema fingerprint for readiness check |
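For local development, these might be exported as shown below. The URL format matches the example in the table; the namespace, credentials, and fingerprint values are placeholders:

```shell
export SURREALDB_URL="ws://localhost:8003/rpc"
export SURREALDB_NAMESPACE="my-namespace"       # placeholder
export SURREALDB_USER="root"                    # placeholder, optional
export SURREALDB_PASSWORD="root"                # placeholder, optional
# Optional: only set when schema readiness should be verified
export DB_SCHEMA_FINGERPRINT="<fingerprint>"
```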
## Redis Connection
All queues and workers obtain their Redis connection through `getRedisConnectionForBullMQ()`. This returns an `ioredis` instance configured for BullMQ compatibility.
For host-owned queues (e.g., document processing), use the runtime's Redis accessor instead of importing the raw connection:
```ts
import { createDocumentProcessorQueueRuntime } from '@lota-sdk/core/queues/document-processor.queue'

const documentProcessorQueue = createDocumentProcessorQueueRuntime({
  getConnectionForBullMQ: runtime.redis.getConnectionForBullMQ,
  getWorkerPath: () => '/absolute/path/to/document-processor.worker.ts',
  logger,
})
```

This ensures the worker uses the same managed Redis connection as the rest of the runtime.
## Graceful Shutdown
Every `start*Worker()` function returns a `WorkerHandle` with a `shutdown` method. Call it during application teardown to drain in-progress jobs gracefully:
```ts
const handles = [
  runtime.workers.startContextCompactionWorker(),
  runtime.workers.startPostChatMemoryWorker(),
  runtime.workers.startRegularChatMemoryDigestWorker(),
]

// During shutdown:
await Promise.all(handles.map((h) => h.shutdown()))
```

The shutdown flow:
- Calls `worker.close()` on the BullMQ worker, which stops picking up new jobs and waits for in-progress jobs to finish.
- Logs the shutdown event.
### Signal handling
When a worker runs as a standalone process (detected via `import.meta.main`), the SDK automatically registers `SIGINT` and `SIGTERM` handlers:
```ts
// Automatic when running as: bun run core/src/queues/context-compaction.queue.ts
if (import.meta.main) {
  startContextCompactionWorker()
}
```

The signal handler:
- Initiates graceful shutdown
- Sets a forced-exit timeout (default: 10 seconds)
- Calls `process.exit(0)` once the worker closes or the timeout fires
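The close-or-timeout race described above can be sketched as a standalone function. This is illustrative (the function name and return values are made up); a real signal handler would call `process.exit(0)` after the race settles:

```typescript
// Race a graceful close against a forced-exit timeout.
async function shutdownWithTimeout(
  close: () => Promise<void>,
  timeoutMs = 10_000,
): Promise<'closed' | 'timed-out'> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<'timed-out'>((resolve) => {
    timer = setTimeout(() => resolve('timed-out'), timeoutMs)
  })
  const result = await Promise.race([
    close().then(() => 'closed' as const),
    timeout,
  ])
  clearTimeout(timer)
  return result
  // A standalone worker process would then call process.exit(0).
}
```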
You can disable automatic signal registration when starting workers inside a larger application:
```ts
const handle = startContextCompactionWorker({ registerSignals: false })
```

## Document Processing Queue
The SDK provides a reusable queue helper for document ingestion workflows through `createDocumentProcessorQueueRuntime()`. This is a generic factory that consumers can use to build their own document processing pipelines.
```ts
import { createDocumentProcessorQueueRuntime } from '@lota-sdk/core/queues/document-processor.queue'
import type { DocumentProcessorJob } from '@lota-sdk/core/queues/document-processor.queue'

const docQueue = createDocumentProcessorQueueRuntime({
  getConnectionForBullMQ: runtime.redis.getConnectionForBullMQ,
  getWorkerPath: () => '/absolute/path/to/my-doc-worker.ts',
  logger,
  queueName: 'my-document-processor', // optional, defaults to 'document-processor'
  workerName: 'My document processor', // optional, defaults to 'Document processor'
  concurrency: 4, // optional, defaults to 4
  lockDuration: 300_000, // optional, defaults to 5 min
})

// Enqueue a document
await docQueue.enqueue({
  orgId: 'org_123',
  source: 'upload',
  sourceId: 'doc_456',
  title: 'Quarterly Report',
  sourceCanonicalKey: 'reports/q4-2025',
  sourceVersionKey: 'v2',
  text: 'Full document text...',
})

// Start the worker
const handle = docQueue.startWorker()
```

The factory returns two functions:
- `enqueue(job)`: adds a job to the queue. Jobs are deduplicated using a SHA-256 hash of the job's identifying fields (`orgId`, `source`, `sourceId`, `sourceCanonicalKey`, `sourceVersionKey`, `title`).
- `startWorker(options?)`: starts a BullMQ worker that processes jobs using the provided worker file path. Returns a standard `WorkerHandle`.
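A content-based deduplication ID like the one `enqueue(job)` uses could be derived as sketched below. The exact field order and separator the SDK uses are not documented here, so treat this as an illustration of the technique rather than the SDK's implementation:

```typescript
import { createHash } from 'node:crypto'

// Only the identifying fields participate in the hash.
interface DocumentIdentity {
  orgId: string
  source: string
  sourceId: string
  sourceCanonicalKey: string
  sourceVersionKey: string
  title: string
}

function documentDedupId(job: DocumentIdentity): string {
  const material = [
    job.orgId,
    job.source,
    job.sourceId,
    job.sourceCanonicalKey,
    job.sourceVersionKey,
    job.title,
  ].join('\u0000') // NUL separator avoids ambiguous concatenation
  return createHash('sha256').update(material).digest('hex')
}
```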
### `DocumentProcessorJob` interface
| Field | Type | Required | Description |
|---|---|---|---|
| `orgId` | `string` | Yes | Organization identifier |
| `source` | `string` | Yes | Source channel (e.g., `'upload'`, `'integration'`) |
| `sourceId` | `string` | Yes | Source-specific document identifier |
| `title` | `string` | Yes | Document title |
| `sourceCanonicalKey` | `string` | Yes | Canonical key for deduplication |
| `sourceVersionKey` | `string` | Yes | Version key for deduplication |
| `contentType` | `string` | No | MIME type of the document |
| `text` | `string` | No | Pre-extracted text content |
| `attachment` | `object` | No | Reference to a stored attachment (`storageKey`, `name`, `contentType`, `sizeBytes`) |
| `metadata` | `Record<string, unknown>` | No | Arbitrary metadata |
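The table above can be read as a TypeScript declaration. This is a sketch reconstructed from the table; the SDK's actual declaration may differ in details such as the attachment field types, which are assumptions here:

```typescript
interface DocumentProcessorJob {
  orgId: string
  source: string // e.g., 'upload', 'integration'
  sourceId: string
  title: string
  sourceCanonicalKey: string // canonical key for deduplication
  sourceVersionKey: string   // version key for deduplication
  contentType?: string       // MIME type of the document
  text?: string              // pre-extracted text content
  attachment?: {             // reference to a stored attachment
    storageKey: string
    name: string
    contentType: string
    sizeBytes: number
  }
  metadata?: Record<string, unknown>
}

// A minimal valid job contains only the required fields:
const sample: DocumentProcessorJob = {
  orgId: 'org_123',
  source: 'upload',
  sourceId: 'doc_456',
  title: 'Quarterly Report',
  sourceCanonicalKey: 'reports/q4-2025',
  sourceVersionKey: 'v2',
}
```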
## File Layout
```
core/src/
  queues/
    context-compaction.queue.ts               # Queue + worker + enqueue
    post-chat-memory.queue.ts                 # Queue + worker + enqueue
    regular-chat-memory-digest.queue.ts       # Queue + worker + enqueue
    regular-chat-memory-digest.config.ts      # Deduplication + delay config
    memory-consolidation.queue.ts             # Queue + worker + enqueue + recurring schedule
    skill-extraction.queue.ts                 # Queue + worker + enqueue
    skill-extraction.config.ts                # Deduplication + delay config
    workstream-title-generation.queue.ts      # Queue + worker + enqueue
    recent-activity-title-refinement.queue.ts # Queue + worker + enqueue
    document-processor.queue.ts               # Generic document processing factory
  workers/
    worker-utils.ts                           # Shared utilities (events, shutdown, tracing)
    bootstrap.ts                              # Sandboxed worker initialization
    memory-consolidation.worker.ts            # Sandboxed processor for memory consolidation
    regular-chat-memory-digest.worker.ts      # Sandboxed processor for memory digest
    regular-chat-memory-digest.runner.ts      # Runner logic for memory digest
    regular-chat-memory-digest.helpers.ts     # Helpers for memory digest
    skill-extraction.worker.ts                # Sandboxed processor for skill extraction
    skill-extraction.runner.ts                # Runner logic for skill extraction
    utils/                                    # Non-worker support files (chunkers, extractors)
```