Queues and Workers

The Lota SDK uses BullMQ backed by Redis for all background job processing. Workers handle memory extraction, context compaction, title generation, skill extraction, memory consolidation, and more. This architecture keeps the chat request path fast by deferring heavy or non-urgent work to asynchronous queues.

Overview

After each chat turn, the SDK enqueues background jobs through post-turn hooks. These jobs land in BullMQ queues stored in Redis. Dedicated worker processes pick up jobs, execute them, and report completion or failure. All workers share the same Redis instance used by the runtime.

```
Chat Turn
    │
    ▼
Post-Turn Hooks
    │
    ▼
BullMQ Queues ◄──────► Redis (port 6382)
    │
    ▼
┌────────────────────────────────────────┐
│ Workers                                │
│  • post-chat-memory                    │
│  • regular-chat-memory-digest          │
│  • context-compaction                  │
│  • memory-consolidation                │
│  • skill-extraction                    │
│  • workstream-title-generation         │
│  • recent-activity-title-refinement    │
└────────────────────────────────────────┘
    │
    ▼
SurrealDB / AI Gateway
(persist results, call LLMs)
```

Jobs flow in one direction: chat turn produces events, hooks enqueue jobs, workers consume them. Workers that need LLM calls (memory extraction, title generation, skill extraction) reach the AI gateway. Workers that persist state connect to SurrealDB.
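As a rough illustration of this fan-out, a post-turn hook can be modeled as a function that maps a finished turn to the jobs to enqueue. This is a hypothetical sketch, not the SDK's actual hook API; the queue names and deduplication key formats come from the sections below, but the types and function are invented for illustration.

```typescript
// Hypothetical sketch of the post-turn fan-out: inspect the finished turn
// and return descriptors for the background jobs to enqueue. The types and
// function names are illustrative, not the SDK's API.
interface TurnEvent {
  orgId: string
  workstreamId: string
  tokenCount: number
  compactionThreshold: number
}

interface JobDescriptor {
  queue: string
  deduplicationId?: string
}

function planPostTurnJobs(turn: TurnEvent): JobDescriptor[] {
  const jobs: JobDescriptor[] = [
    // Memory extraction runs after every turn.
    { queue: 'post-chat-memory' },
    // Digest and skill extraction are deduplicated per org (15-min window).
    { queue: 'regular-chat-memory-digest', deduplicationId: `regular-chat-digest:${turn.orgId}` },
    { queue: 'skill-extraction', deduplicationId: `skill-extraction:${turn.orgId}` },
  ]
  // Compaction only fires once the context outgrows the threshold.
  if (turn.tokenCount > turn.compactionThreshold) {
    jobs.push({
      queue: 'context-compaction',
      deduplicationId: `compact:workstream:${turn.workstreamId}`,
    })
  }
  return jobs
}
```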

Built-in Workers

The SDK ships seven built-in workers. Each has a dedicated queue, enqueue function, and start function.

| Worker | Queue Name | Purpose | Trigger | Concurrency | Lock Duration |
| --- | --- | --- | --- | --- | --- |
| Post-Chat Memory | post-chat-memory | Extracts facts and memories from conversation turns | After each chat turn | 3 | 15 min |
| Regular Chat Memory Digest | regular-chat-memory-digest | Batch-processes accumulated memories from post-onboarding conversations | 15-min deduplication window per org | 1 | 10 min |
| Context Compaction | context-compaction | Summarizes old messages when the conversation context grows too large | When token count exceeds threshold | 2 | 5 min |
| Memory Consolidation | memory-consolidation | Archives stale memories, resolves contradictions, merges duplicates | Recurring every 24 hours (scheduled) | 1 | 10 min |
| Skill Extraction | skill-extraction | Identifies reusable procedures and skills from conversations | 15-min deduplication window per org | 1 | 10 min |
| Workstream Title Generation | workstream-title-generation | Generates descriptive titles for new workstreams | When a workstream has no custom title | 2 | 1 min |
| Recent Activity Title Refinement | recent-activity-title-refinement | Refines sidebar entry titles for recent activity items | After turns that update activity | 2 | 5 min |

Worker details

Post-Chat Memory extracts structured facts from each user/agent exchange. It receives the full conversation context including history messages, memory block, and attachment context. Extracted memories are stored via memoryService.addConversationMemories().

Regular Chat Memory Digest runs on a 15-minute delayed schedule per organization. When a turn is completed, the job is enqueued with a deduplication key (regular-chat-digest:{orgId}), so multiple turns within the window are batched into a single digest run. This worker executes in a sandboxed child process.

Context Compaction fires when a workstream's token count exceeds the configured threshold. It marks the workstream as compacting, runs contextCompactionService.compactWorkstreamHistory(), then clears the compacting flag. Jobs are deduplicated per entity (compact:{domain}:{entityId}).
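The mark/compact/clear sequence is naturally a try/finally pattern, so the compacting flag is cleared even when compaction fails. The sketch below illustrates that shape with stand-in types; the real worker calls the SDK's workstream and compaction services, which are not reproduced here.

```typescript
// Illustrative sketch of the compaction flow: mark the workstream as
// compacting, run the compaction, and clear the flag even on failure.
// The store and compact callback are stand-ins, not the SDK's services.
interface WorkstreamStore {
  setCompacting(id: string, value: boolean): Promise<void>
}

async function compactWorkstream(
  store: WorkstreamStore,
  compact: (id: string) => Promise<void>,
  workstreamId: string,
): Promise<void> {
  await store.setCompacting(workstreamId, true)
  try {
    await compact(workstreamId)
  } finally {
    // Clear the flag whether compaction succeeded or threw.
    await store.setCompacting(workstreamId, false)
  }
}
```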

Memory Consolidation runs on a 24-hour recurring schedule. It scans all organization memories, archives stale entries, resolves contradictions, and merges near-duplicates. It can also be triggered on-demand for a specific scope. This worker executes in a sandboxed child process.

Skill Extraction identifies reusable procedures from conversations. Like the memory digest worker, it uses a 15-minute deduplication window per organization and runs in a sandboxed child process.

Workstream Title Generation generates a descriptive title from the first messages in a workstream. Jobs are deduplicated by workstream ID (workstream-title:{workstreamId}).

Recent Activity Title Refinement refines the short titles shown in the sidebar for recent activity entries. Jobs are deduplicated by activity ID (recent-activity-title:{activityId}).

Starting Workers

Start workers after creating and connecting the runtime:

```ts
import { createLotaRuntime } from '@lota-sdk/core'

const runtime = await createLotaRuntime(config)
await runtime.connect()

// Start all built-in workers
runtime.workers.startPostChatMemoryWorker()
runtime.workers.startRegularChatMemoryDigestWorker()
runtime.workers.startContextCompactionWorker()
runtime.workers.startMemoryConsolidationWorker()
runtime.workers.startSkillExtractionWorker()
runtime.workers.startWorkstreamTitleGenerationWorker()
runtime.workers.startRecentActivityTitleRefinementWorker()

// Schedule the recurring memory consolidation job (every 24 hours)
runtime.workers.scheduleRecurringConsolidation()
```

Each start*Worker() call returns a WorkerHandle that you can use for graceful shutdown:

```ts
interface WorkerHandle {
  worker: Worker    // BullMQ Worker instance
  shutdown: () => Promise<void>
}
```

Custom Workers

Consumer applications can register extra workers through the extraWorkers option in the runtime configuration:

```ts
const runtime = await createLotaRuntime({
  // ... other config
  extraWorkers: {
    documentProcessor: (redis) => startDocumentProcessorWorker(redis),
    reportGenerator: (redis) => startReportGeneratorWorker(redis),
  },
})
```

Extra workers are stored in the runtime extensions state and can be started alongside the built-in workers. This keeps all worker lifecycle management centralized in the runtime.
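A minimal sketch of how such a registry might be iterated to start every extra worker with the shared Redis connection. The types and the `startExtraWorkers` helper are assumptions for illustration; the real runtime stores these factories in its extensions state and passes its managed ioredis connection.

```typescript
// Hypothetical sketch: start each registered extra worker with the shared
// connection and collect the handles for later shutdown. Types are
// illustrative, not the SDK's internals.
type WorkerFactory = (redis: unknown) => { shutdown: () => Promise<void> }

function startExtraWorkers(
  extraWorkers: Record<string, WorkerFactory>,
  redis: unknown,
): Map<string, { shutdown: () => Promise<void> }> {
  const handles = new Map<string, { shutdown: () => Promise<void> }>()
  for (const [name, factory] of Object.entries(extraWorkers)) {
    handles.set(name, factory(redis))
  }
  return handles
}
```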

Queue Configuration

Each queue is configured with sensible defaults for its workload. Here is a breakdown of the common configuration options:

Default job options

Every queue sets defaultJobOptions on the BullMQ Queue instance:

```ts
{
  removeOnComplete: 200,   // Keep last 200 completed jobs for inspection
  removeOnFail: 200,       // Keep last 200 failed jobs for debugging
  attempts: 2,             // Retry failed jobs up to 2 times
  backoff: {
    type: 'exponential',   // Exponential backoff between retries
    delay: 3_000,          // Base delay in milliseconds
  },
}
```

The exact values vary by worker. For example, post-chat memory uses 3 attempts with a 2-second base delay, while memory consolidation uses 2 attempts with a 5-second base delay.
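With BullMQ's built-in exponential strategy, the delay before retry n is the base delay doubled n − 1 times. A small sketch of that schedule:

```typescript
// Retry delay under BullMQ's built-in exponential backoff strategy:
// delay * 2^(attemptsMade - 1), where attemptsMade counts from 1.
function exponentialBackoffDelay(baseDelayMs: number, attemptsMade: number): number {
  return baseDelayMs * 2 ** (attemptsMade - 1)
}

// With the defaults above (base delay 3_000 ms):
// retry 1 waits 3_000 ms, retry 2 waits 6_000 ms
```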

Concurrency and lock duration

Worker concurrency and lock duration are set when creating the BullMQ Worker instance:

| Worker | Concurrency | Lock Duration | Max Stalled Count |
| --- | --- | --- | --- |
| Post-Chat Memory | 3 | 900,000 ms (15 min) | 10 |
| Regular Chat Memory Digest | 1 | 600,000 ms (10 min) | default |
| Context Compaction | 2 | 300,000 ms (5 min) | default |
| Memory Consolidation | 1 | 600,000 ms (10 min) | default |
| Skill Extraction | 1 | 600,000 ms (10 min) | default |
| Workstream Title Generation | 2 | 60,000 ms (1 min) | default |
| Recent Activity Title Refinement | 2 | 300,000 ms (5 min) | default |

Deduplication

Several workers use BullMQ's built-in deduplication to prevent redundant work:

  • Context Compaction: Deduplicates by compact:{domain}:{entityId}, so concurrent compaction requests for the same workstream collapse into one job.
  • Regular Chat Memory Digest: Deduplicates by regular-chat-digest:{orgId} with a 15-minute delay window. Multiple turns within the window produce a single digest job.
  • Skill Extraction: Deduplicates by skill-extraction:{orgId} with a 15-minute delay window, similar to memory digest.
  • Workstream Title Generation: Deduplicates by workstream-title:{workstreamId}, so only one title generation runs per workstream.
  • Recent Activity Title Refinement: Deduplicates by recent-activity-title:{activityId}.
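The key formats above can be collected as small builder functions. Only the key patterns come from this document; the builders themselves, and the enqueue call shown in the trailing comment, are an illustrative sketch rather than the SDK's code.

```typescript
// Builders for the deduplication key patterns listed above. The key
// formats are from the docs; the functions are illustrative.
const dedupKeys = {
  contextCompaction: (domain: string, entityId: string) => `compact:${domain}:${entityId}`,
  regularChatDigest: (orgId: string) => `regular-chat-digest:${orgId}`,
  skillExtraction: (orgId: string) => `skill-extraction:${orgId}`,
  workstreamTitle: (workstreamId: string) => `workstream-title:${workstreamId}`,
  recentActivityTitle: (activityId: string) => `recent-activity-title:${activityId}`,
}

// In BullMQ, such a key is passed as the job's deduplication id, e.g.:
// queue.add('digest', payload, {
//   deduplication: { id: dedupKeys.regularChatDigest(orgId), ttl: 15 * 60_000 },
// })
```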

Worker Bootstrap for Sandboxed Processes

Some workers (memory consolidation, regular chat memory digest, skill extraction) run their processor in a sandboxed child process. BullMQ spawns a separate process using the worker file path. Since createLotaRuntime() was never called in that child process, the global database service has no backing instance.

The initializeSandboxedWorkerRuntime() function handles this:

```ts
import { initializeSandboxedWorkerRuntime } from '@lota-sdk/core/workers/bootstrap'

// Called at the top of sandboxed worker files
await initializeSandboxedWorkerRuntime()
```

This function:

  1. Configures the logger
  2. Creates a SurrealDBService instance from environment variables (SURREALDB_URL, SURREALDB_NAMESPACE, SURREALDB_USER, SURREALDB_PASSWORD)
  3. Connects to the database with startup retry logic
  4. Connects any configured plugin databases
  5. Waits for the database schema bootstrap to complete (using DB_SCHEMA_FINGERPRINT to verify schema readiness)

The initialization is idempotent: calling it multiple times reuses the same promise.
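That promise-reuse idempotency is a common memoization pattern, sketched below with invented names (the SDK's internal variable and function names are not shown in this document):

```typescript
// Minimal sketch of promise-reuse idempotency: the first call kicks off
// initialization; every later call returns the same in-flight (or
// settled) promise, so the work runs at most once.
let initCount = 0
let initPromise: Promise<void> | undefined

async function doInit(): Promise<void> {
  initCount++
  // ... connect to the database, wait for schema bootstrap, etc.
}

function initializeOnce(): Promise<void> {
  initPromise ??= doInit()
  return initPromise
}
```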

Required environment variables for sandboxed workers

| Variable | Required | Description |
| --- | --- | --- |
| SURREALDB_URL | Yes | WebSocket URL for SurrealDB (e.g., ws://localhost:8003/rpc) |
| SURREALDB_NAMESPACE | Yes | SurrealDB namespace |
| SURREALDB_USER | No | SurrealDB username |
| SURREALDB_PASSWORD | No | SurrealDB password |
| DB_SCHEMA_FINGERPRINT | No | Expected schema fingerprint for readiness check |
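A sandboxed worker can fail fast if the required variables are missing. The variable names below come from the table above; the validation helper itself is a hypothetical sketch, not part of the SDK.

```typescript
// Illustrative env validation for a sandboxed worker: require the two
// mandatory SurrealDB variables, pass the rest through as optional.
function readSandboxedWorkerEnv(env: Record<string, string | undefined>) {
  const required = ['SURREALDB_URL', 'SURREALDB_NAMESPACE'] as const
  for (const name of required) {
    if (!env[name]) throw new Error(`Missing required env var: ${name}`)
  }
  return {
    url: env.SURREALDB_URL!,
    namespace: env.SURREALDB_NAMESPACE!,
    user: env.SURREALDB_USER,                     // optional
    password: env.SURREALDB_PASSWORD,             // optional
    schemaFingerprint: env.DB_SCHEMA_FINGERPRINT, // optional readiness check
  }
}
```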

Redis Connection

All queues and workers obtain their Redis connection through getRedisConnectionForBullMQ(). This returns an ioredis instance configured for BullMQ compatibility.

For host-owned queues (e.g., document processing), use the runtime's Redis accessor instead of importing the raw connection:

```ts
import { createDocumentProcessorQueueRuntime } from '@lota-sdk/core/queues/document-processor.queue'

const documentProcessorQueue = createDocumentProcessorQueueRuntime({
  getConnectionForBullMQ: runtime.redis.getConnectionForBullMQ,
  getWorkerPath: () => '/absolute/path/to/document-processor.worker.ts',
  logger,
})
```

This ensures the worker uses the same managed Redis connection as the rest of the runtime.

Graceful Shutdown

Every start*Worker() function returns a WorkerHandle with a shutdown method. Call it during application teardown to drain in-progress jobs gracefully:

```ts
const handles = [
  runtime.workers.startContextCompactionWorker(),
  runtime.workers.startPostChatMemoryWorker(),
  runtime.workers.startRegularChatMemoryDigestWorker(),
]

// During shutdown:
await Promise.all(handles.map((h) => h.shutdown()))
```

The shutdown flow:

  1. Calls worker.close() on the BullMQ worker, which stops picking up new jobs and waits for in-progress jobs to finish.
  2. Logs the shutdown event.

Signal handling

When a worker runs as a standalone process (detected via import.meta.main), the SDK automatically registers SIGINT and SIGTERM handlers:

```ts
// Automatic when running as: bun run core/src/queues/context-compaction.queue.ts
if (import.meta.main) {
  startContextCompactionWorker()
}
```

The signal handler:

  1. Initiates graceful shutdown
  2. Sets a forced-exit timeout (default: 10 seconds)
  3. Calls process.exit(0) once the worker closes or the timeout fires
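The forced-exit behavior amounts to racing graceful shutdown against a timer. The sketch below models that race; it returns which side won instead of calling process.exit(0), and the function name and shape are illustrative rather than the SDK's exact code.

```typescript
// Illustrative sketch of the forced-exit timeout: race the worker's
// graceful shutdown against a timer. A real signal handler would call
// process.exit(0) once either side settles.
async function shutdownWithTimeout(
  shutdown: () => Promise<void>,
  timeoutMs = 10_000, // default forced-exit timeout
): Promise<'closed' | 'timeout'> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timedOut = new Promise<'timeout'>((resolve) => {
    timer = setTimeout(() => resolve('timeout'), timeoutMs)
  })
  const closed = shutdown().then(() => 'closed' as const)
  const result = await Promise.race([closed, timedOut])
  clearTimeout(timer) // avoid keeping the process alive after a clean close
  return result
}
```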

You can disable automatic signal registration when starting workers inside a larger application:

```ts
const handle = startContextCompactionWorker({ registerSignals: false })
```

Document Processing Queue

The SDK provides a reusable queue helper for document ingestion workflows through createDocumentProcessorQueueRuntime(). This is a generic factory that consumers can use to build their own document processing pipelines.

```ts
import { createDocumentProcessorQueueRuntime } from '@lota-sdk/core/queues/document-processor.queue'
import type { DocumentProcessorJob } from '@lota-sdk/core/queues/document-processor.queue'

const docQueue = createDocumentProcessorQueueRuntime({
  getConnectionForBullMQ: runtime.redis.getConnectionForBullMQ,
  getWorkerPath: () => '/absolute/path/to/my-doc-worker.ts',
  logger,
  queueName: 'my-document-processor',    // optional, defaults to 'document-processor'
  workerName: 'My document processor',   // optional, defaults to 'Document processor'
  concurrency: 4,                        // optional, defaults to 4
  lockDuration: 300_000,                 // optional, defaults to 5 min
})

// Enqueue a document
await docQueue.enqueue({
  orgId: 'org_123',
  source: 'upload',
  sourceId: 'doc_456',
  title: 'Quarterly Report',
  sourceCanonicalKey: 'reports/q4-2025',
  sourceVersionKey: 'v2',
  text: 'Full document text...',
})

// Start the worker
const handle = docQueue.startWorker()
```

The factory returns two functions:

  • enqueue(job) -- Adds a job to the queue. Jobs are deduplicated using a SHA-256 hash of the job's identifying fields (orgId, source, sourceId, sourceCanonicalKey, sourceVersionKey, title).
  • startWorker(options?) -- Starts a BullMQ worker that processes jobs using the provided worker file path. Returns a standard WorkerHandle.
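Hash-based deduplication of this kind can be sketched with node:crypto. The field list comes from the text above, but the field order, separator, and function name below are assumptions; the SDK's actual hashing details are not specified here.

```typescript
import { createHash } from 'node:crypto'

// Sketch of identity-hash deduplication: derive a stable job id from the
// identifying fields so identical documents collapse into one job.
interface DocumentIdentity {
  orgId: string
  source: string
  sourceId: string
  sourceCanonicalKey: string
  sourceVersionKey: string
  title: string
}

function documentJobId(job: DocumentIdentity): string {
  const material = [
    job.orgId,
    job.source,
    job.sourceId,
    job.sourceCanonicalKey,
    job.sourceVersionKey,
    job.title,
  ].join('\u0000') // NUL separator avoids ambiguous concatenations
  return createHash('sha256').update(material).digest('hex')
}
```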

DocumentProcessorJob interface

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| orgId | string | Yes | Organization identifier |
| source | string | Yes | Source channel (e.g., 'upload', 'integration') |
| sourceId | string | Yes | Source-specific document identifier |
| title | string | Yes | Document title |
| sourceCanonicalKey | string | Yes | Canonical key for deduplication |
| sourceVersionKey | string | Yes | Version key for deduplication |
| contentType | string | No | MIME type of the document |
| text | string | No | Pre-extracted text content |
| attachment | object | No | Reference to a stored attachment (storageKey, name, contentType, sizeBytes) |
| metadata | Record<string, unknown> | No | Arbitrary metadata |

File Layout

core/src/
  queues/
    context-compaction.queue.ts              # Queue + worker + enqueue
    post-chat-memory.queue.ts                # Queue + worker + enqueue
    regular-chat-memory-digest.queue.ts      # Queue + worker + enqueue
    regular-chat-memory-digest.config.ts     # Deduplication + delay config
    memory-consolidation.queue.ts            # Queue + worker + enqueue + recurring schedule
    skill-extraction.queue.ts                # Queue + worker + enqueue
    skill-extraction.config.ts               # Deduplication + delay config
    workstream-title-generation.queue.ts     # Queue + worker + enqueue
    recent-activity-title-refinement.queue.ts # Queue + worker + enqueue
    document-processor.queue.ts              # Generic document processing factory
  workers/
    worker-utils.ts                          # Shared utilities (events, shutdown, tracing)
    bootstrap.ts                             # Sandboxed worker initialization
    memory-consolidation.worker.ts           # Sandboxed processor for memory consolidation
    regular-chat-memory-digest.worker.ts     # Sandboxed processor for memory digest
    regular-chat-memory-digest.runner.ts     # Runner logic for memory digest
    regular-chat-memory-digest.helpers.ts    # Helpers for memory digest
    skill-extraction.worker.ts               # Sandboxed processor for skill extraction
    skill-extraction.runner.ts               # Runner logic for skill extraction
    utils/                                   # Non-worker support files (chunkers, extractors)