
Services API

The public service boundary is runtime.services, exposed on the runtime instance returned from createLotaRuntime().

Consumer applications should call services through a runtime instance:

ts
import { createLotaRuntime } from '@lota-sdk/core'

const runtime = await createLotaRuntime(config)
await runtime.connect()

const {
  workstreamService,
  workstreamMessageService,
  memoryService,
  executionPlanService,
  attachmentService,
  generatedDocumentStorageService,
  recentActivityService,
  createWorkstreamTurnStream,
} = runtime.services

Deep imports such as @lota-sdk/core/services/*.service still exist for SDK internals, but host code should treat runtime.services.* as the supported boundary.

workstreamService

Access via runtime.services.workstreamService.

createWorkstream(userId, orgId, options?)

Creates a new workstream. Returns a NormalizedWorkstream.

ts
const ws = await workstreamService.createWorkstream(userId, orgId, {
  mode: 'group',          // 'direct' | 'group'
  title: 'My Workstream', // optional, defaults to 'New Workstream'
  agentId: 'chief',       // required for mode='direct'
  core: false,            // true for core workstreams
  coreType: 'daily-activities', // required when core=true
})

listWorkstreams(userId, orgId, options)

Lists workstreams with mode-based filtering and optional pagination.

ts
const { workstreams, hasMore } = await workstreamService.listWorkstreams(userId, orgId, {
  mode: 'group',
  core: false,
  take: 15,
  page: 1,
  includeArchived: false,
})
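When a host needs every workstream rather than a single page, the paginated call can be wrapped in a loop. The sketch below is illustrative only: `WorkstreamLister` is a hypothetical structural type standing in for the real service, and it assumes that `page` is 1-based and that `hasMore` means another page exists after the current one.

```typescript
// Hypothetical minimal shape of the service; the real SDK types are likely richer.
interface WorkstreamLister<W> {
  listWorkstreams(
    userId: string,
    orgId: string,
    options: {
      mode: 'direct' | 'group'
      core?: boolean
      take?: number
      page?: number
      includeArchived?: boolean
    },
  ): Promise<{ workstreams: W[]; hasMore: boolean }>
}

// Walks pages until hasMore is false. Assumes `page` starts at 1.
// maxPages is a local safety cap, not an SDK concept.
async function listAllGroupWorkstreams<W>(
  service: WorkstreamLister<W>,
  userId: string,
  orgId: string,
  take = 15,
  maxPages = 100,
): Promise<W[]> {
  const all: W[] = []
  for (let page = 1; page <= maxPages; page++) {
    const result = await service.listWorkstreams(userId, orgId, {
      mode: 'group',
      core: false,
      take,
      page,
      includeArchived: false,
    })
    all.push(...result.workstreams)
    if (!result.hasMore) break
  }
  return all
}
```

Passing `runtime.services.workstreamService` as `service` should satisfy the structural type if the real signature matches the one documented above.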

getWorkstream(workstreamId)

Returns a single NormalizedWorkstream by ID.

updateTitle(workstreamId, title)

Updates the workstream title. Throws for direct and core workstreams.

updateStatus(workstreamId, status)

Sets the workstream status to regular or archived. Throws for direct and core workstreams.

deleteWorkstream(workstreamId)

Deletes a workstream. Only works for non-direct, non-core workstreams.
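Because title, status, and delete operations are all restricted for direct and core workstreams, a host may prefer to turn a thrown error into a value it can show in the UI. The helper below is a sketch, not part of the SDK: `attemptWorkstreamMutation` and `MutationOutcome` are hypothetical names, and the code makes no assumption about the error type the SDK throws.

```typescript
// Hypothetical result type for a guarded service call.
type MutationOutcome<T> = { ok: true; value: T } | { ok: false; reason: string }

// Runs any service call and converts a thrown error into a result value.
async function attemptWorkstreamMutation<T>(
  action: () => Promise<T>,
): Promise<MutationOutcome<T>> {
  try {
    return { ok: true, value: await action() }
  } catch (error) {
    // The SDK's error type is not documented here, so fall back to String().
    const reason = error instanceof Error ? error.message : String(error)
    return { ok: false, reason }
  }
}
```

A call site might look like `await attemptWorkstreamMutation(() => workstreamService.updateTitle(workstreamId, 'Q3 planning'))`, then branch on `ok`.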

stopActiveRun(workstreamId)

Aborts the in-flight agent run. Returns true if a run was stopped.

appendMemoryBlock(workstreamId, entry)

Appends a note to the workstream's memory block. Triggers background compaction once the block holds more than 15 entries.

ensureBootstrapWorkstreams(userId, orgId, options?)

Creates the direct, group, and core workstreams a user requires, as defined by workstreams.bootstrap. Called during and after onboarding.

workstreamMessageService

Manages workstream message persistence and retrieval.

Access via runtime.services.workstreamMessageService.

upsertMessages({ workstreamId, messages })

Persists an array of ChatMessage objects. Creates new rows or updates existing ones.

listMessages(workstreamId)

Returns all messages ordered by creation time (ascending).

listMessageHistoryPage({ workstreamId, take, beforeMessageId? })

Returns a cursor-paginated page of messages for infinite scroll:

ts
const page = await workstreamMessageService.listMessageHistoryPage({
  workstreamId,
  take: 6,
  beforeMessageId: 'msg-abc123', // optional cursor
})
// page: { messages, hasMore, prevCursor }
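For infinite scroll, the returned cursor is typically fed back into the next request. The sketch below shows one way to walk backwards through history. It assumes that `prevCursor` is a message ID suitable for `beforeMessageId`, that it may be absent on the last page, and that each page lists messages oldest first. `HistoryReader` and `loadOlderHistory` are hypothetical names.

```typescript
// Hypothetical page shape based on the `{ messages, hasMore, prevCursor }` comment above.
interface HistoryPage<M> {
  messages: M[]
  hasMore: boolean
  prevCursor?: string | null
}

interface HistoryReader<M> {
  listMessageHistoryPage(args: {
    workstreamId: string
    take: number
    beforeMessageId?: string
  }): Promise<HistoryPage<M>>
}

// Loads up to maxPages pages, newest page first, and returns them oldest first.
async function loadOlderHistory<M>(
  reader: HistoryReader<M>,
  workstreamId: string,
  take: number,
  maxPages: number,
): Promise<M[]> {
  let collected: M[] = []
  let cursor: string | undefined
  for (let i = 0; i < maxPages; i++) {
    const page = await reader.listMessageHistoryPage({
      workstreamId,
      take,
      ...(cursor ? { beforeMessageId: cursor } : {}),
    })
    // Older pages go in front, assuming each page is ordered oldest first.
    collected = [...page.messages, ...collected]
    if (!page.hasMore || !page.prevCursor) break
    cursor = page.prevCursor
  }
  return collected
}
```

In a UI, the loop body would usually run once per scroll event rather than eagerly; the loop form here only makes the cursor hand-off explicit.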

listMessagesAfterCursor(workstreamId, afterMessageId?)

Returns all messages after a given cursor. Used for loading uncompacted messages during compaction.

listRecentMessages(workstreamId, limit)

Returns the most recent N messages.

addUserMessage({ messageId, workstreamId, content })

Creates and persists a user message.

addAgentMessage({ messageId, workstreamId, parts, metadata? })

Creates and persists an assistant message with typed parts.

memoryService

Manages memory search, creation, and fact extraction.

Access via runtime.services.memoryService.

searchOrganizationMemories(orgId, query)

Searches organization-scoped memories with vector search and reranking.

searchAgentMemories(orgId, agentName, query)

Searches agent-scoped memories.

searchAllMemoriesBatched({ orgId, agentName?, query, fastMode?, allowMultiScopeRerank? })

Batched multi-scope search across organization and agent memories with optional reranking.

getTopMemories({ orgId, agentName?, limit? })

Returns pre-seeded high-importance memories for context injection. Returns formatted text or undefined.
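Since the result may be undefined, callers need a fallback when injecting memories into a prompt. The following is a minimal sketch under stated assumptions: `TopMemoryReader` and `buildSystemPrompt` are hypothetical names, and the "Known context" heading is only an illustrative prompt layout.

```typescript
// Hypothetical structural type mirroring the documented signature.
interface TopMemoryReader {
  getTopMemories(args: {
    orgId: string
    agentName?: string
    limit?: number
  }): Promise<string | undefined>
}

// Appends the formatted memory text to a base prompt, or returns the base
// prompt unchanged when the service yields undefined (or an empty string).
async function buildSystemPrompt(
  reader: TopMemoryReader,
  basePrompt: string,
  orgId: string,
  agentName?: string,
): Promise<string> {
  const memories = await reader.getTopMemories({
    orgId,
    ...(agentName ? { agentName } : {}),
    limit: 10,
  })
  if (!memories) return basePrompt
  return `${basePrompt}\n\nKnown context:\n${memories}`
}
```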

createOrganizationMemory({ orgId, content, memoryType, metadata?, importance? })

Directly inserts a memory into the organization scope.

createAgentMemory({ orgId, agentName, content, memoryType, metadata?, importance? })

Directly inserts a memory into an agent scope.

addConversationMemories({ orgId, input, output, sourceId?, onboardStatus?, agentName?, historyMessages?, memoryBlock?, attachmentContext?, agentNames? })

Full memory extraction pipeline: assesses importance, extracts facts, classifies against existing memories, and writes to all relevant scopes.

addExtractedFactsToScopes({ orgId, facts, source, sourceMetadata?, agentNames?, acquireLock? })

Adds pre-extracted facts to organization and agent scopes.

executionPlanService

Manages contract-driven execution plans, run lifecycle, node result submission, approvals, and resume flow.

Access via runtime.services.executionPlanService.

createPlan({ organizationId, workstreamId, leadAgentId, input })

Creates a compiled execution plan from a PlanDraft and starts a new run. Throws if an active run already exists.

replacePlan({ workstreamId, organizationId, leadAgentId, input })

Replaces the active run with a new compiled plan. Marks the old run as aborted.

getActivePlanForWorkstream(workstreamId, options?)

Returns the active serialized run with nodes, events, artifacts, approvals, checkpoints, and validation issues. Returns null if none.

hasActivePlan(workstreamId)

Returns true if the workstream has an active execution run.
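Because createPlan throws when a run is already active, a host that always wants the newest plan to win can check hasActivePlan first and choose between createPlan and replacePlan. The sketch below assumes nothing about the PlanDraft or run types and keeps them generic; `PlanStarter` and `startOrReplacePlan` are hypothetical names.

```typescript
interface PlanArgs<Draft> {
  organizationId: string
  workstreamId: string
  leadAgentId: string
  input: Draft
}

// Hypothetical structural type covering the three documented methods.
interface PlanStarter<Draft, Run> {
  hasActivePlan(workstreamId: string): Promise<boolean>
  createPlan(args: PlanArgs<Draft>): Promise<Run>
  replacePlan(args: PlanArgs<Draft>): Promise<Run>
}

// Replaces the active run if one exists, otherwise creates a new one.
async function startOrReplacePlan<Draft, Run>(
  service: PlanStarter<Draft, Run>,
  args: PlanArgs<Draft>,
): Promise<{ run: Run; replaced: boolean }> {
  const replaced = await service.hasActivePlan(args.workstreamId)
  const run = replaced
    ? await service.replacePlan(args)
    : await service.createPlan(args)
  return { run, replaced }
}
```

The check and the write are not atomic in this sketch, so createPlan can still throw if another caller starts a run in between; callers should be prepared to handle that error.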

submitNodeResult({ workstreamId, emittedBy, input })

Submits a node result. The executor validates structured output, deliverables, completion checks, and failure handling before advancing the run.

resumeRun({ workstreamId, emittedBy, input })

Resumes an interrupted run from its latest checkpoint.

getActivePlanToolResult({ workstreamId, includeEvents?, includeArtifacts?, includeApprovals?, includeCheckpoints?, includeValidationIssues? })

Returns the active run formatted as an ExecutionPlanToolResultData for tool output.

attachmentService

Manages file attachment operations.

Access via runtime.services.attachmentService.

hydrateSignedFileUrlsInMessageParts({ parts, orgId, userId })

Re-signs presigned URLs in message file parts.

listReadableUploadsFromMessages({ messages, orgId, userId })

Extracts readable upload metadata from message history.

extractStoredAttachmentText({ storageKey, name, contentType })

Extracts text content from a stored attachment.
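The two read-side methods above compose naturally: list the uploads referenced by a conversation, then extract text from each. The sketch below rests on several assumptions that the reference does not confirm: that each upload metadata entry carries `storageKey`, `name`, and `contentType`, and that extraction resolves to a string. `ReadableUpload`, `AttachmentReader`, and `collectAttachmentText` are hypothetical names.

```typescript
// Assumed metadata shape; the source only says "readable upload metadata".
interface ReadableUpload {
  storageKey: string
  name: string
  contentType: string
}

// Hypothetical structural type; return types are assumptions.
interface AttachmentReader<M> {
  listReadableUploadsFromMessages(args: {
    messages: M[]
    orgId: string
    userId: string
  }): ReadableUpload[] | Promise<ReadableUpload[]>
  extractStoredAttachmentText(args: ReadableUpload): string | Promise<string>
}

// Returns extracted text keyed by file name, skipping files that fail to extract.
async function collectAttachmentText<M>(
  reader: AttachmentReader<M>,
  messages: M[],
  orgId: string,
  userId: string,
): Promise<Map<string, string>> {
  const uploads = await reader.listReadableUploadsFromMessages({ messages, orgId, userId })
  const textByName = new Map<string, string>()
  for (const upload of uploads) {
    try {
      const text = await reader.extractStoredAttachmentText({
        storageKey: upload.storageKey,
        name: upload.name,
        contentType: upload.contentType,
      })
      textByName.set(upload.name, text)
    } catch {
      // One unreadable file should not block the rest.
    }
  }
  return textByName
}
```

Keying by `name` means two uploads with the same file name overwrite each other; keying by `storageKey` would avoid that if it is unique.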

generatedDocumentStorageService

Stores runtime-owned generated artifacts, such as exported documents or pipeline outputs, in the configured S3 bucket.

Access via runtime.services.generatedDocumentStorageService.

recentActivityService

Manages recent activity events and the deduplicated activity list.

Access via runtime.services.recentActivityService.

recordEvent({ orgId, userId, source, event })

Records a raw activity event and upserts the deduplicated recent activity entry.

listRecentActivities({ orgId, userId, limit? })

Returns the visible recent activity list sorted by latest event time.
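A common host pattern is to record an event and immediately re-read the deduplicated list so the UI reflects it. The sketch below keeps the event and activity shapes generic because the reference does not define them. `RecentActivityApi` and `recordAndRefresh` are hypothetical names, and treating `source` as a plain string is an assumption.

```typescript
interface ActivityScope {
  orgId: string
  userId: string
}

// Hypothetical structural type; E and A stand in for the SDK's own types.
interface RecentActivityApi<E, A> {
  recordEvent(params: ActivityScope & { source: string; event: E }): Promise<unknown>
  listRecentActivities(params: ActivityScope & { limit?: number }): Promise<A[]>
}

// Records one event, then returns the refreshed activity list.
async function recordAndRefresh<E, A>(
  api: RecentActivityApi<E, A>,
  scope: ActivityScope,
  source: string,
  event: E,
  limit = 10,
): Promise<A[]> {
  await api.recordEvent({ ...scope, source, event })
  return api.listRecentActivities({ ...scope, limit })
}
```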

listRecentEvents({ orgId, userId, limit? })

Returns raw event history for debugging.

Transport And Approval Helpers

These runtime-owned helpers are also exposed on runtime.services for host transport layers:

  • createWorkstreamTurnStream
  • createWorkstreamApprovalContinuationStream
  • runWorkstreamTurnInBackground
  • isApprovalContinuationRequest
  • verifyMutatingApproval