Merge pull request #213 from multica-ai/codex/remove-legacy-subagent-registry

refactor: remove legacy async subagent orchestration path
This commit is contained in:
Jiayuan Zhang 2026-02-17 00:30:19 +08:00 committed by GitHub
commit db63369837
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
28 changed files with 10 additions and 3583 deletions

View file

@ -162,21 +162,7 @@ interface InboundMessageEvent {
timestamp: number
}
interface SubagentRunInfo {
runId: string
label: string | undefined
task: string
status: 'queued' | 'running' | 'ok' | 'error' | 'timeout' | 'unknown'
groupId: string | undefined
groupLabel: string | undefined
startedAt: number | undefined
endedAt: number | undefined
createdAt: number
findings: string | undefined
error: string | undefined
}
interface ElectronAPI {
interface ElectronAPI {
app: {
getFlags: () => Promise<{ forceOnboarding: boolean }>
}
@ -251,10 +237,7 @@ interface ElectronAPI {
stop: (channelId: string, accountId: string) => Promise<{ ok: boolean; error?: string }>
start: (channelId: string, accountId: string) => Promise<{ ok: boolean; error?: string }>
}
subagents: {
list: (requesterSessionId: string) => Promise<SubagentRunInfo[]>
}
cron: {
cron: {
list: () => Promise<unknown[]>
toggle: (jobId: string) => Promise<{ ok: boolean }>
remove: (jobId: string) => Promise<{ ok: boolean }>

View file

@ -11,7 +11,6 @@ export { registerCronIpcHandlers } from './cron.js'
export { registerHeartbeatIpcHandlers } from './heartbeat.js'
export { registerAppStateIpcHandlers } from './app-state.js'
export { registerAuthHandlers, setMainWindow as setAuthMainWindow, handleAuthDeepLink } from './auth.js'
export { registerSubagentsIpcHandlers } from './subagents.js'
import { registerAgentIpcHandlers, cleanupAgent } from './agent.js'
import { registerAuthHandlers } from './auth.js'
@ -23,7 +22,6 @@ import { registerChannelsIpcHandlers } from './channels.js'
import { registerCronIpcHandlers } from './cron.js'
import { registerHeartbeatIpcHandlers } from './heartbeat.js'
import { registerAppStateIpcHandlers } from './app-state.js'
import { registerSubagentsIpcHandlers } from './subagents.js'
/**
* Register all IPC handlers.
@ -40,7 +38,6 @@ export function registerAllIpcHandlers(): void {
registerHeartbeatIpcHandlers()
registerAppStateIpcHandlers()
registerAuthHandlers()
registerSubagentsIpcHandlers()
}
/**

View file

@ -1,63 +0,0 @@
/**
* Subagent IPC handlers for Electron main process.
*
* Exposes subagent registry data to the renderer process
* for the Subagent Dashboard UI.
*/
import { ipcMain } from 'electron'
import { listSubagentRuns, getSubagentGroup } from '@multica/core'
import type { SubagentRunRecord } from '@multica/core'
/** Serializable DTO for renderer consumption */
export interface SubagentRunInfo {
runId: string
label: string | undefined
task: string
status: 'queued' | 'running' | 'ok' | 'error' | 'timeout' | 'unknown'
groupId: string | undefined
groupLabel: string | undefined
startedAt: number | undefined
endedAt: number | undefined
createdAt: number
findings: string | undefined
error: string | undefined
}
function deriveStatus(record: SubagentRunRecord): SubagentRunInfo['status'] {
if (!record.startedAt) return 'queued'
if (!record.endedAt) return 'running'
return record.outcome?.status ?? 'unknown'
}
function toDTO(record: SubagentRunRecord): SubagentRunInfo {
const group = record.groupId ? getSubagentGroup(record.groupId) : undefined
return {
runId: record.runId,
label: record.label,
task: record.task,
status: deriveStatus(record),
groupId: record.groupId,
groupLabel: group?.label,
startedAt: record.startedAt,
endedAt: record.endedAt,
createdAt: record.createdAt,
findings: record.findings ? record.findings.slice(0, 500) : undefined,
error: record.outcome?.error,
}
}
/** Hide completed runs after 5 minutes */
const COMPLETED_RETENTION_MS = 5 * 60 * 1000
/**
* Register all Subagent-related IPC handlers.
*/
export function registerSubagentsIpcHandlers(): void {
ipcMain.handle('subagents:list', async (_event, requesterSessionId: string) => {
const now = Date.now()
const runs = listSubagentRuns(requesterSessionId)
return runs
.filter((r) => !r.endedAt || now - r.endedAt < COMPLETED_RETENTION_MS)
.map(toDTO)
})
}

View file

@ -105,23 +105,9 @@ export interface LocalChatApproval {
expiresAtMs: number
}
export interface SubagentRunInfo {
runId: string
label: string | undefined
task: string
status: 'queued' | 'running' | 'ok' | 'error' | 'timeout' | 'unknown'
groupId: string | undefined
groupLabel: string | undefined
startedAt: number | undefined
endedAt: number | undefined
createdAt: number
findings: string | undefined
error: string | undefined
}
// ============================================================================
// Expose typed API to Renderer process
// ============================================================================
// ============================================================================
// Expose typed API to Renderer process
// ============================================================================
const electronAPI = {
// App-level
@ -291,14 +277,8 @@ const electronAPI = {
ipcRenderer.invoke('channels:start', channelId, accountId),
},
// Subagent dashboard
subagents: {
list: (requesterSessionId: string): Promise<SubagentRunInfo[]> =>
ipcRenderer.invoke('subagents:list', requesterSessionId),
},
// Cron jobs management
cron: {
// Cron jobs management
cron: {
list: () => ipcRenderer.invoke('cron:list'),
toggle: (jobId: string) => ipcRenderer.invoke('cron:toggle', jobId),
remove: (jobId: string) => ipcRenderer.invoke('cron:remove', jobId),

View file

@ -3,13 +3,9 @@ import { useNavigate } from 'react-router-dom'
import { Loading } from '@multica/ui/components/ui/loading'
import { ChatView } from '@multica/ui/components/chat-view'
import { useLocalChat } from '../hooks/use-local-chat'
import { useSubagentPolling } from '../hooks/use-subagent-polling'
import { useSubagentsStore } from '../stores/subagents'
import { useProviderStore } from '../stores/provider'
import { ApiKeyDialog } from './api-key-dialog'
import { OAuthDialog } from './oauth-dialog'
import { SubagentStatusBar } from './subagent-status-bar'
import { SubagentDashboard } from './subagent-dashboard'
interface LocalChatProps {
initialPrompt?: string
@ -37,11 +33,6 @@ export function LocalChat({ initialPrompt }: LocalChatProps) {
const { providers, current, setProvider: switchProvider, refresh: refreshProviders } = useProviderStore()
// Subagent polling + dashboard
useSubagentPolling(agentId)
const subagentRuns = useSubagentsStore((s) => s.runs)
const [dashboardOpen, setDashboardOpen] = useState(false)
// Provider config dialog state
const [apiKeyDialogOpen, setApiKeyDialogOpen] = useState(false)
const [oauthDialogOpen, setOauthDialogOpen] = useState(false)
@ -126,12 +117,6 @@ export function LocalChat({ initialPrompt }: LocalChatProps) {
loadMore={loadMore}
resolveApproval={resolveApproval}
errorAction={errorAction}
bottomSlot={
<SubagentStatusBar
runs={subagentRuns}
onViewClick={() => setDashboardOpen(true)}
/>
}
/>
{currentMeta && currentMeta.authMethod === 'api-key' && (
@ -154,12 +139,6 @@ export function LocalChat({ initialPrompt }: LocalChatProps) {
onSuccess={handleProviderConfigSuccess}
/>
)}
<SubagentDashboard
open={dashboardOpen}
onOpenChange={setDashboardOpen}
runs={subagentRuns}
/>
</>
)
}

View file

@ -1,122 +0,0 @@
import { useState, useEffect } from 'react'
import {
Sheet,
SheetContent,
SheetHeader,
SheetTitle,
SheetDescription,
} from '@multica/ui/components/ui/sheet'
import { Badge } from '@multica/ui/components/ui/badge'
interface SubagentDashboardProps {
open: boolean
onOpenChange: (open: boolean) => void
runs: SubagentRunInfo[]
}
const STATUS_CONFIG: Record<SubagentRunInfo['status'], { label: string; variant: 'default' | 'secondary' | 'destructive' | 'outline' }> = {
running: { label: 'Running', variant: 'default' },
queued: { label: 'Queued', variant: 'secondary' },
ok: { label: 'Completed', variant: 'outline' },
error: { label: 'Error', variant: 'destructive' },
timeout: { label: 'Timeout', variant: 'destructive' },
unknown: { label: 'Unknown', variant: 'secondary' },
}
function formatElapsed(startMs: number, endMs?: number): string {
const elapsed = (endMs ?? Date.now()) - startMs
const seconds = Math.floor(elapsed / 1000)
if (seconds < 60) return `${seconds}s`
const minutes = Math.floor(seconds / 60)
const remainSec = seconds % 60
if (minutes < 60) return `${minutes}m ${remainSec}s`
const hours = Math.floor(minutes / 60)
const remainMin = minutes % 60
return `${hours}h ${remainMin}m`
}
function RunCard({ run }: { run: SubagentRunInfo }) {
const config = STATUS_CONFIG[run.status]
const isActive = run.status === 'running' || run.status === 'queued'
const [, setTick] = useState(0)
// Tick every 1s for running agents to update elapsed time
useEffect(() => {
if (!isActive) return
const timer = setInterval(() => setTick((t) => t + 1), 1000)
return () => clearInterval(timer)
}, [isActive])
return (
<div className="rounded-lg border bg-card p-3 space-y-2">
<div className="flex items-start justify-between gap-2">
<div className="min-w-0 flex-1">
<p className="text-sm font-medium truncate">
{run.label || run.task.slice(0, 80)}
</p>
{run.label && (
<p className="text-xs text-muted-foreground truncate mt-0.5">
{run.task.slice(0, 120)}
</p>
)}
</div>
<Badge variant={config.variant} className="shrink-0">
{config.label}
</Badge>
</div>
<div className="flex items-center gap-3 text-xs text-muted-foreground">
{run.startedAt && (
<span>{formatElapsed(run.startedAt, run.endedAt)}</span>
)}
{run.groupLabel && (
<span className="bg-muted px-1.5 py-0.5 rounded text-[10px]">
{run.groupLabel}
</span>
)}
</div>
{run.error && (
<p className="text-xs text-destructive bg-destructive/5 rounded px-2 py-1 font-mono">
{run.error}
</p>
)}
{run.findings && !run.error && (
<p className="text-xs text-muted-foreground bg-muted/50 rounded px-2 py-1.5 font-mono whitespace-pre-wrap line-clamp-4">
{run.findings.slice(0, 200)}
</p>
)}
</div>
)
}
export function SubagentDashboard({ open, onOpenChange, runs }: SubagentDashboardProps) {
// Sort: active first (running, queued), then by createdAt desc
const sorted = [...runs].sort((a, b) => {
const aActive = a.status === 'running' || a.status === 'queued' ? 0 : 1
const bActive = b.status === 'running' || b.status === 'queued' ? 0 : 1
if (aActive !== bActive) return aActive - bActive
return b.createdAt - a.createdAt
})
return (
<Sheet open={open} onOpenChange={onOpenChange}>
<SheetContent side="right" className="sm:max-w-md">
<SheetHeader>
<SheetTitle>Subagents ({runs.length})</SheetTitle>
<SheetDescription>Child agents spawned by the current session</SheetDescription>
</SheetHeader>
<div className="flex-1 min-h-0 overflow-y-auto px-4 pb-4 space-y-2">
{sorted.length === 0 ? (
<p className="text-sm text-muted-foreground text-center py-8">
No subagents yet
</p>
) : (
sorted.map((run) => <RunCard key={run.runId} run={run} />)
)}
</div>
</SheetContent>
</Sheet>
)
}

View file

@ -1,77 +0,0 @@
import { useState, useEffect, useRef } from 'react'
/** Auto-dismiss delay after all runs complete (ms) */
const DISMISS_DELAY_MS = 30_000
interface SubagentStatusBarProps {
runs: SubagentRunInfo[]
onViewClick: () => void
}
export function SubagentStatusBar({ runs, onViewClick }: SubagentStatusBarProps) {
const [dismissed, setDismissed] = useState(false)
const prevHadActiveRef = useRef(false)
const running = runs.filter((r) => r.status === 'running' || r.status === 'queued').length
const completed = runs.filter((r) => r.status !== 'running' && r.status !== 'queued').length
const hasActive = running > 0
// Auto-dismiss after all runs complete
useEffect(() => {
if (hasActive) {
// Reset dismissed state when new active runs appear
prevHadActiveRef.current = true
setDismissed(false)
return
}
// Only auto-dismiss if we previously had active runs (transition to all-complete)
if (!prevHadActiveRef.current || runs.length === 0) return
const timer = setTimeout(() => setDismissed(true), DISMISS_DELAY_MS)
return () => clearTimeout(timer)
}, [hasActive, runs.length])
if (runs.length === 0 || dismissed) return null
let statusText: string
if (running > 0 && completed > 0) {
statusText = `${running} running, ${completed} completed`
} else if (running > 0) {
statusText = `${running} subagent${running > 1 ? 's' : ''} running`
} else {
statusText = `${completed} completed`
}
return (
<div className="container px-4">
<div className="flex items-center justify-between h-8 px-3 rounded-lg bg-muted/50 border text-xs text-muted-foreground">
<div className="flex items-center gap-2">
{running > 0 && (
<span className="relative flex h-2 w-2">
<span className="animate-ping absolute inline-flex h-full w-full rounded-full bg-blue-400 opacity-75" />
<span className="relative inline-flex rounded-full h-2 w-2 bg-blue-500" />
</span>
)}
<span>{statusText}</span>
</div>
<div className="flex items-center gap-2">
<button
onClick={onViewClick}
className="text-xs font-medium text-foreground/70 hover:text-foreground transition-colors"
>
View
</button>
{!hasActive && (
<button
onClick={() => setDismissed(true)}
className="text-xs text-muted-foreground/50 hover:text-muted-foreground transition-colors"
>
Dismiss
</button>
)}
</div>
</div>
</div>
)
}

View file

@ -1,33 +0,0 @@
import { useEffect, useRef } from 'react'
import { useSubagentsStore, selectHasActiveRuns } from '../stores/subagents'
const ACTIVE_INTERVAL_MS = 2_000
const IDLE_INTERVAL_MS = 10_000
/**
* Polls for subagent runs at an adaptive interval.
* 2s when there are active (running/queued) runs, 10s otherwise.
*/
export function useSubagentPolling(agentId: string | null): void {
const fetch = useSubagentsStore((s) => s.fetch)
const runs = useSubagentsStore((s) => s.runs)
const hasActive = selectHasActiveRuns(runs)
const intervalRef = useRef<ReturnType<typeof setInterval> | null>(null)
useEffect(() => {
if (!agentId) return
// Fetch immediately
fetch(agentId)
const ms = hasActive ? ACTIVE_INTERVAL_MS : IDLE_INTERVAL_MS
intervalRef.current = setInterval(() => fetch(agentId), ms)
return () => {
if (intervalRef.current) {
clearInterval(intervalRef.current)
intervalRef.current = null
}
}
}, [agentId, hasActive, fetch])
}

View file

@ -1,29 +0,0 @@
import { create } from 'zustand'
interface SubagentsStore {
runs: SubagentRunInfo[]
fetch: (requesterSessionId: string) => Promise<void>
}
export const useSubagentsStore = create<SubagentsStore>()((set) => ({
runs: [],
fetch: async (requesterSessionId: string) => {
try {
const result = await window.electronAPI.subagents.list(requesterSessionId)
if (Array.isArray(result)) {
set({ runs: result })
}
} catch (err) {
console.error('[SubagentsStore] Failed to fetch:', err)
}
},
}))
export function selectRunningCount(runs: SubagentRunInfo[]): number {
return runs.filter((r) => r.status === 'running' || r.status === 'queued').length
}
export function selectHasActiveRuns(runs: SubagentRunInfo[]): boolean {
return runs.some((r) => r.status === 'running' || r.status === 'queued')
}

View file

@ -13,18 +13,6 @@ export * from "./tools.js";
export * from "./tools/policy.js";
export * from "./tools/groups.js";
export * from "./extract-text.js";
// @deprecated — Old subagent registry. Use `delegate` tool instead.
// Kept temporarily for desktop app compatibility.
export {
listSubagentRuns,
getSubagentRun,
getSubagentGroup,
} from "./subagent/registry.js";
export type {
SubagentRunRecord,
SubagentRunOutcome,
SubagentGroup,
} from "./subagent/types.js";
export {
readClaudeCliCredentials,

View file

@ -1,172 +0,0 @@
# Subagent System
The subagent system allows a parent agent to spawn isolated child agents that run tasks in parallel and report results back automatically.
## Architecture Overview
```
┌─────────────────────────────────────────────────────────────────────┐
│ Parent Agent (runner.ts) │
│ │
│ tools: sessions_spawn, sessions_list │
│ state: resolvedProvider, toolsOptions │
└──────────┬──────────────────────────────────────────────────────────┘
│ sessions_spawn(task, label, timeoutSeconds)
┌─────────────────────────────────────────────────────────────────────┐
│ Spawn Flow (sessions-spawn.ts) │
│ │
│ 1. Build subagent system prompt (announce.ts) │
│ 2. hub.createSubagent(childSessionId, { provider, model }) │
│ 3. registerSubagentRun({ start: () => childAgent.write(task) }) │
│ 4. Return { status: "accepted", runId, childSessionId } │
└──────────┬──────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ Concurrency Queue (command-queue.ts) │
│ │
│ Lane: "subagent" — max 10 concurrent (configurable) │
│ Queued runs wait for a slot before start() is called │
└──────────┬──────────────────────────────────────────────────────────┘
│ slot acquired
┌─────────────────────────────────────────────────────────────────────┐
│ Child Agent Execution │
│ │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ AsyncAgent (async-agent.ts) │ │
│ │ - Isolated session with restricted tools (isSubagent=true) │ │
│ │ - Inherits parent's LLM provider │ │
│ │ - System prompt: task focus + error reporting rules │ │
│ │ - Tracks lastRunError for error propagation │ │
│ └───────────────────────────────────────────────────────────────┘ │
│ │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ watchChildAgent (registry.ts) │ │
│ │ - Sets startedAt, starts timeout timer │ │
│ │ - waitForIdle() — waits for child's task queue to drain │ │
│ │ - onClose() — handles explicit close (timeout kill, etc.) │ │
│ └───────────────────────────────────────────────────────────────┘ │
└──────────┬──────────────────────────────────────────────────────────┘
│ child completes / errors / times out
┌─────────────────────────────────────────────────────────────────────┐
│ Completion Handling (registry.ts) │
│ │
│ handleRunCompletion(record) │
│ │ │
│ ├─ Phase 1: captureFindings() │
│ │ - Read last assistant reply from child session JSONL │
│ │ - Falls back to last toolResult if no assistant text │
│ │ - Persists findings to record before session deletion │
│ │ │
│ ├─ Session Cleanup │
│ │ - cleanup="delete": rm child session dir + hub.closeAgent() │
│ │ - cleanup="keep": preserve for audit │
│ │ │
│ └─ Phase 2: checkAndAnnounce(requesterSessionId) │
│ - Finds all unannounced, completed runs with findings │
│ - Calls runCoalescedAnnounceFlow() │
│ - Marks records: announced=true, archiveAtMs=now+60min │
└──────────┬──────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ Announcement Delivery (announce.ts) │
│ │
│ runCoalescedAnnounceFlow(requesterSessionId, records) │
│ │ │
│ ├─ Format message: formatCoalescedAnnouncementMessage() │
│ │ - Single record: task name, status, findings, stats │
│ │ - Multiple records: combined report with all findings │
│ │ │
│ ├─ Two-tier delivery: │
│ │ │
│ │ Tier 1: BUSY (parent running or has pending writes) │
│ │ └─ enqueueAnnounce() → announce-queue.ts │
│ │ - Debounce 1s to batch nearby completions │
│ │ - Drain via writeInternal() when parent finishes │
│ │ │
│ │ Tier 2: IDLE (parent not running) │
│ │ └─ sendAnnounceDirect() │
│ │ - writeInternal(msg, { forwardAssistant, persistResponse })│
│ │ │
│ └─ All delivery uses writeInternal() (marks as internal: true) │
│ → Prevents announcement from showing as user bubble in UI │
│ → LLM processes findings and responds naturally to user │
└──────────┬──────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ Record Lifecycle (registry.ts) │
│ │
│ created → startedAt → endedAt → findingsCaptured → announced │
│ │
│ After announcement: │
│ - Record kept with archiveAtMs = now + 60 min │
│ - sessions_list can still query records during this window │
│ - Sweeper runs every 60s, removes expired records │
│ - When all records removed, sweeper stops │
└─────────────────────────────────────────────────────────────────────┘
```
## Key Files
| File | Purpose |
|------|---------|
| `sessions-spawn.ts` | Tool: spawns a child agent with task, label, timeout, provider |
| `sessions-list.ts` | Tool: lists subagent runs and their status |
| `registry.ts` | Lifecycle management: register, watch, capture, announce, archive |
| `announce.ts` | System prompt builder, findings reader, message formatter, delivery |
| `announce-queue.ts` | Debounced queue for batching announcements when parent is busy |
| `command-queue.ts` | Concurrency limiter for subagent lane slots |
| `lanes.ts` | Lane config: max concurrency (10), default timeout (1800s) |
| `types.ts` | Shared types: SubagentRunRecord, SubagentRunOutcome, etc. |
| `registry-store.ts` | Persistence: save/load runs to disk for crash recovery |
## Provider Inheritance
Subagents inherit the parent's resolved LLM provider:
```
runner.ts (resolvedProvider)
→ toolsOptions.provider
→ tools.ts (CreateToolsOptions.provider)
→ sessions-spawn.ts (options.provider)
→ hub.createSubagent({ provider })
```
When the user switches providers via UI (`setProvider()`), `toolsOptions.provider` is updated in sync so future spawns use the new provider.
## Error Propagation
```
Child tool error (e.g., API 401)
→ Subagent LLM sees error, includes in final message (system prompt rule)
→ captureFindings() reads final message
→ Announcement includes error in findings
→ Parent LLM sees error and can inform user
Child run error (e.g., missing API key for provider)
→ AsyncAgent._lastRunError set
→ registry.ts checks childAgent.lastRunError after waitForIdle()
→ outcome = { status: "error", error: "No API key configured..." }
→ Announcement: "task failed: No API key configured..."
```
## Timeout Behavior
Default: 1800s (30 min). System prompt guides the parent LLM:
- Simple tasks: 1800s (default)
- Moderate tasks: 1800-2400s (30-40 min)
- Complex tasks: 2400-3600s (40-60 min)
On timeout:
1. Timeout timer fires in `watchChildAgent()`
2. `cleanup({ status: "timeout" })` is called
3. Child agent is closed via `hub.closeAgent()`
4. Findings are captured from whatever the child wrote so far
5. Announcement reports "timed out" with partial findings

View file

@ -1,79 +0,0 @@
import { describe, it, expect, afterEach } from "vitest";
import { mkdtempSync, rmSync } from "node:fs";
import { join } from "node:path";
import { tmpdir } from "node:os";
import { writeEntries } from "../session/storage.js";
import { readLatestAssistantReply } from "./announce.js";
import type { SessionEntry } from "../session/types.js";
describe("readLatestAssistantReply", () => {
let testDir: string;
afterEach(() => {
if (testDir) {
rmSync(testDir, { recursive: true, force: true });
}
});
async function seedSession(sessionId: string, entries: SessionEntry[]) {
await writeEntries(sessionId, entries, { baseDir: testDir });
}
it("returns the latest non-empty assistant text when the last assistant message is tool-only", async () => {
testDir = mkdtempSync(join(tmpdir(), "announce-test-"));
const sessionId = "child-session-1";
await seedSession(sessionId, [
{
type: "message",
timestamp: 1,
message: {
role: "assistant",
content: [{ type: "text", text: "南京天气12°C。" }],
},
} as SessionEntry,
{
type: "message",
timestamp: 2,
message: {
role: "assistant",
content: [{ type: "toolCall", id: "tool-1", name: "weather", arguments: { city: "Nanjing" } }],
},
} as SessionEntry,
]);
const result = readLatestAssistantReply(sessionId, { baseDir: testDir });
expect(result).toBe("南京天气12°C。");
});
it("falls back to latest toolResult text when no assistant text exists", async () => {
testDir = mkdtempSync(join(tmpdir(), "announce-test-"));
const sessionId = "child-session-2";
await seedSession(sessionId, [
{
type: "message",
timestamp: 1,
message: {
role: "assistant",
content: [{ type: "toolCall", id: "tool-2", name: "weather", arguments: { city: "Nanjing" } }],
},
} as SessionEntry,
{
type: "message",
timestamp: 2,
message: {
role: "toolResult",
toolCallId: "tool-2",
toolName: "weather",
content: [{ type: "text", text: "{\"city\":\"Nanjing\",\"tempC\":12,\"condition\":\"Sunny\"}" }],
isError: false,
},
} as SessionEntry,
]);
const result = readLatestAssistantReply(sessionId, { baseDir: testDir });
expect(result).toContain("\"city\":\"Nanjing\"");
expect(result).toContain("\"condition\":\"Sunny\"");
});
});

View file

@ -1,203 +0,0 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import {
enqueueAnnounce,
resetAnnounceQueuesForTests,
getAnnounceQueueDepth,
type AnnounceQueueItem,
type AnnounceQueueSettings,
} from "./announce-queue.js";
afterEach(() => {
resetAnnounceQueuesForTests();
});
function makeItem(overrides?: Partial<AnnounceQueueItem>): AnnounceQueueItem {
return {
prompt: "test prompt",
summaryLine: "test summary",
enqueuedAt: Date.now(),
requesterSessionId: "session-1",
...overrides,
};
}
const FAST_SETTINGS: AnnounceQueueSettings = {
mode: "followup",
debounceMs: 0,
cap: 20,
dropPolicy: "old",
};
describe("announce queue", () => {
it("enqueues an item and drains via send callback", async () => {
const sent: AnnounceQueueItem[] = [];
const send = async (item: AnnounceQueueItem) => { sent.push(item); };
enqueueAnnounce({
key: "test",
item: makeItem(),
settings: FAST_SETTINGS,
send,
});
// Wait for async drain
await new Promise((r) => setTimeout(r, 50));
expect(sent).toHaveLength(1);
expect(sent[0]!.prompt).toBe("test prompt");
});
it("batches items in collect mode", async () => {
const sent: AnnounceQueueItem[] = [];
const send = async (item: AnnounceQueueItem) => { sent.push(item); };
const collectSettings: AnnounceQueueSettings = {
mode: "collect",
debounceMs: 0,
cap: 20,
dropPolicy: "old",
};
enqueueAnnounce({
key: "test",
item: makeItem({ prompt: "prompt 1" }),
settings: collectSettings,
send,
});
enqueueAnnounce({
key: "test",
item: makeItem({ prompt: "prompt 2" }),
settings: collectSettings,
send,
});
enqueueAnnounce({
key: "test",
item: makeItem({ prompt: "prompt 3" }),
settings: collectSettings,
send,
});
await new Promise((r) => setTimeout(r, 50));
// Collect mode batches all into one send
expect(sent).toHaveLength(1);
expect(sent[0]!.prompt).toContain("prompt 1");
expect(sent[0]!.prompt).toContain("prompt 2");
expect(sent[0]!.prompt).toContain("prompt 3");
expect(sent[0]!.prompt).toContain("3 queued announce(s)");
});
it("sends items individually in followup mode", async () => {
const sent: AnnounceQueueItem[] = [];
const send = async (item: AnnounceQueueItem) => { sent.push(item); };
enqueueAnnounce({
key: "test",
item: makeItem({ prompt: "prompt A" }),
settings: FAST_SETTINGS,
send,
});
enqueueAnnounce({
key: "test",
item: makeItem({ prompt: "prompt B" }),
settings: FAST_SETTINGS,
send,
});
await new Promise((r) => setTimeout(r, 50));
expect(sent).toHaveLength(2);
expect(sent[0]!.prompt).toBe("prompt A");
expect(sent[1]!.prompt).toBe("prompt B");
});
it("respects cap with 'new' drop policy (rejects new items)", async () => {
const sent: AnnounceQueueItem[] = [];
const send = async (item: AnnounceQueueItem) => {
// Slow send to keep items in queue
await new Promise((r) => setTimeout(r, 200));
sent.push(item);
};
const cappedSettings: AnnounceQueueSettings = {
mode: "followup",
debounceMs: 0,
cap: 2,
dropPolicy: "new",
};
const r1 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "1" }), settings: cappedSettings, send });
const r2 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "2" }), settings: cappedSettings, send });
const r3 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "3" }), settings: cappedSettings, send });
expect(r1).toBe(true);
expect(r2).toBe(true);
expect(r3).toBe(false); // Rejected — cap reached
});
it("respects cap with 'old' drop policy (drops oldest)", async () => {
const sent: AnnounceQueueItem[] = [];
const send = async (item: AnnounceQueueItem) => {
await new Promise((r) => setTimeout(r, 200));
sent.push(item);
};
const cappedSettings: AnnounceQueueSettings = {
mode: "followup",
debounceMs: 0,
cap: 2,
dropPolicy: "old",
};
enqueueAnnounce({ key: "test", item: makeItem({ prompt: "1" }), settings: cappedSettings, send });
enqueueAnnounce({ key: "test", item: makeItem({ prompt: "2" }), settings: cappedSettings, send });
enqueueAnnounce({ key: "test", item: makeItem({ prompt: "3" }), settings: cappedSettings, send });
// Queue should have items 2 and 3 (oldest was dropped)
expect(getAnnounceQueueDepth("test")).toBeLessThanOrEqual(2);
});
it("cleans up queue after drain completes", async () => {
const sent: AnnounceQueueItem[] = [];
const send = async (item: AnnounceQueueItem) => { sent.push(item); };
enqueueAnnounce({
key: "test",
item: makeItem(),
settings: FAST_SETTINGS,
send,
});
await new Promise((r) => setTimeout(r, 50));
expect(sent).toHaveLength(1);
expect(getAnnounceQueueDepth("test")).toBe(0);
});
it("debounces before draining", async () => {
const sent: AnnounceQueueItem[] = [];
const send = async (item: AnnounceQueueItem) => { sent.push(item); };
const debouncedSettings: AnnounceQueueSettings = {
mode: "followup",
debounceMs: 100,
cap: 20,
dropPolicy: "old",
};
enqueueAnnounce({
key: "test",
item: makeItem(),
settings: debouncedSettings,
send,
});
// Should not have sent yet (debounce)
await new Promise((r) => setTimeout(r, 30));
expect(sent).toHaveLength(0);
// Wait for debounce to complete
await new Promise((r) => setTimeout(r, 150));
expect(sent).toHaveLength(1);
});
});

View file

@ -1,315 +0,0 @@
/**
* Announce queue for subagent result delivery.
*
* Handles queuing and batching of subagent announcements when the parent
* agent is busy. Supports debounce, cap, drop policy, and collect mode.
*
* Ported from OpenClaw (MIT license), adapted for Super Multica.
*/
// ============================================================================
// Types
// ============================================================================
export type AnnounceQueueMode =
/** Try steer, no queue fallback */
| "steer"
/** Try steer, fall back to queue */
| "steer-backlog"
/** Queue and send items individually */
| "followup"
/** Queue and batch all items into one combined prompt */
| "collect";
export type AnnounceDropPolicy =
/** Drop oldest items when cap reached */
| "old"
/** Drop newest items when cap reached */
| "new"
/** Summarize dropped items */
| "summarize";
export type AnnounceQueueItem = {
prompt: string;
summaryLine?: string;
enqueuedAt: number;
requesterSessionId: string;
};
export type AnnounceQueueSettings = {
mode: AnnounceQueueMode;
debounceMs?: number;
cap?: number;
dropPolicy?: AnnounceDropPolicy;
};
type AnnounceQueueState = {
items: AnnounceQueueItem[];
draining: boolean;
lastEnqueuedAt: number;
mode: AnnounceQueueMode;
debounceMs: number;
cap: number;
dropPolicy: AnnounceDropPolicy;
droppedCount: number;
summaryLines: string[];
send: (item: AnnounceQueueItem) => Promise<void>;
};
// ============================================================================
// Defaults
// ============================================================================
const DEFAULT_DEBOUNCE_MS = 1000;
const DEFAULT_CAP = 20;
const DEFAULT_DROP_POLICY: AnnounceDropPolicy = "summarize";
export const DEFAULT_ANNOUNCE_SETTINGS: AnnounceQueueSettings = {
mode: "steer-backlog",
debounceMs: DEFAULT_DEBOUNCE_MS,
cap: DEFAULT_CAP,
dropPolicy: DEFAULT_DROP_POLICY,
};
// ============================================================================
// Module state
// ============================================================================
const ANNOUNCE_QUEUES = new Map<string, AnnounceQueueState>();
// ============================================================================
// Public API
// ============================================================================
/**
* Enqueue an announcement for delivery. Returns true if enqueued,
* false if dropped (cap + "new" drop policy).
*/
export function enqueueAnnounce(params: {
key: string;
item: AnnounceQueueItem;
settings: AnnounceQueueSettings;
send: (item: AnnounceQueueItem) => Promise<void>;
}): boolean {
const queue = getOrCreateQueue(params.key, params.settings, params.send);
queue.lastEnqueuedAt = Date.now();
const shouldEnqueue = applyDropPolicy(queue, params.item);
if (!shouldEnqueue) {
if (queue.dropPolicy === "new") {
scheduleAnnounceDrain(params.key);
}
return false;
}
queue.items.push(params.item);
scheduleAnnounceDrain(params.key);
return true;
}
/** Reset all queues (for testing). */
export function resetAnnounceQueuesForTests(): void {
ANNOUNCE_QUEUES.clear();
}
/** Get the current queue depth for a key (for testing/diagnostics). */
export function getAnnounceQueueDepth(key: string): number {
return ANNOUNCE_QUEUES.get(key)?.items.length ?? 0;
}
// ============================================================================
// Queue management
// ============================================================================
function getOrCreateQueue(
key: string,
settings: AnnounceQueueSettings,
send: (item: AnnounceQueueItem) => Promise<void>,
): AnnounceQueueState {
const existing = ANNOUNCE_QUEUES.get(key);
if (existing) {
existing.mode = settings.mode;
if (typeof settings.debounceMs === "number") {
existing.debounceMs = Math.max(0, settings.debounceMs);
}
if (typeof settings.cap === "number" && settings.cap > 0) {
existing.cap = Math.floor(settings.cap);
}
if (settings.dropPolicy) {
existing.dropPolicy = settings.dropPolicy;
}
existing.send = send;
return existing;
}
const created: AnnounceQueueState = {
items: [],
draining: false,
lastEnqueuedAt: 0,
mode: settings.mode,
debounceMs:
typeof settings.debounceMs === "number"
? Math.max(0, settings.debounceMs)
: DEFAULT_DEBOUNCE_MS,
cap:
typeof settings.cap === "number" && settings.cap > 0
? Math.floor(settings.cap)
: DEFAULT_CAP,
dropPolicy: settings.dropPolicy ?? DEFAULT_DROP_POLICY,
droppedCount: 0,
summaryLines: [],
send,
};
ANNOUNCE_QUEUES.set(key, created);
return created;
}
// ============================================================================
// Drop policy
// ============================================================================
function applyDropPolicy(
queue: AnnounceQueueState,
item: AnnounceQueueItem,
): boolean {
if (queue.items.length < queue.cap) {
return true;
}
switch (queue.dropPolicy) {
case "new":
// Reject the incoming item
return false;
case "old": {
// Drop the oldest item to make room
const dropped = queue.items.shift();
if (dropped) {
queue.droppedCount++;
const summary = dropped.summaryLine?.trim() || dropped.prompt.slice(0, 80);
queue.summaryLines.push(summary);
}
return true;
}
case "summarize": {
// Drop the oldest item but keep a summary
const dropped = queue.items.shift();
if (dropped) {
queue.droppedCount++;
const summary = dropped.summaryLine?.trim() || dropped.prompt.slice(0, 80);
queue.summaryLines.push(summary);
}
return true;
}
default:
return true;
}
}
// ============================================================================
// Drain scheduling
// ============================================================================
function scheduleAnnounceDrain(key: string): void {
const queue = ANNOUNCE_QUEUES.get(key);
if (!queue || queue.draining) return;
queue.draining = true;
void (async () => {
try {
while (queue.items.length > 0 || queue.droppedCount > 0) {
await waitForDebounce(queue);
if (queue.mode === "collect") {
// Batch all items into one combined prompt
const items = queue.items.splice(0, queue.items.length);
const summary = buildDropSummary(queue);
const prompt = buildCollectPrompt(items, summary);
const last = items.at(-1);
if (!last) break;
await queue.send({ ...last, prompt });
continue;
}
// followup / steer-backlog: send items individually
const summary = buildDropSummary(queue);
if (summary) {
const next = queue.items.shift();
if (!next) break;
await queue.send({ ...next, prompt: summary });
continue;
}
const next = queue.items.shift();
if (!next) break;
await queue.send(next);
}
} catch (err) {
console.error(`[AnnounceQueue] Drain failed for ${key}: ${String(err)}`);
} finally {
queue.draining = false;
if (queue.items.length === 0 && queue.droppedCount === 0) {
ANNOUNCE_QUEUES.delete(key);
} else {
scheduleAnnounceDrain(key);
}
}
})();
}
// ============================================================================
// Helpers
// ============================================================================
function waitForDebounce(queue: AnnounceQueueState): Promise<void> {
const elapsed = Date.now() - queue.lastEnqueuedAt;
const remaining = Math.max(0, queue.debounceMs - elapsed);
if (remaining <= 0) return Promise.resolve();
return new Promise((resolve) => setTimeout(resolve, remaining));
}
function buildDropSummary(queue: AnnounceQueueState): string | undefined {
if (queue.droppedCount === 0) return undefined;
const parts: string[] = [
`[${queue.droppedCount} earlier announce(s) were summarized due to queue backlog]`,
];
if (queue.summaryLines.length > 0) {
parts.push("");
for (const line of queue.summaryLines) {
parts.push(`- ${line}`);
}
}
// Reset counters
queue.droppedCount = 0;
queue.summaryLines = [];
return parts.join("\n");
}
function buildCollectPrompt(
items: AnnounceQueueItem[],
dropSummary: string | undefined,
): string {
const parts: string[] = [
`[${items.length} queued announce(s) while agent was busy]`,
"",
];
for (let i = 0; i < items.length; i++) {
parts.push(`---`);
parts.push(`Queued #${i + 1}`);
parts.push(items[i]!.prompt);
parts.push("");
}
if (dropSummary) {
parts.push(dropSummary);
parts.push("");
}
return parts.join("\n");
}

View file

@ -1,294 +0,0 @@
import { describe, it, expect } from "vitest";
import { buildSubagentSystemPrompt, formatAnnouncementMessage, formatCoalescedAnnouncementMessage } from "./announce.js";
import type { FormatAnnouncementParams } from "./announce.js";
import type { SubagentRunRecord } from "./types.js";
describe("buildSubagentSystemPrompt", () => {
it("includes task and session context", () => {
const prompt = buildSubagentSystemPrompt({
requesterSessionId: "parent-123",
childSessionId: "child-456",
task: "Analyze the auth module for security issues",
});
expect(prompt).toContain("## Subagent Rules");
expect(prompt).toContain("Analyze the auth module for security issues");
expect(prompt).toContain("parent-123");
expect(prompt).toContain("child-456");
expect(prompt).toContain("Do NOT spawn nested subagents");
expect(prompt).toContain("## Safety");
});
it("includes label when provided", () => {
const prompt = buildSubagentSystemPrompt({
requesterSessionId: "parent-123",
childSessionId: "child-456",
label: "Security Audit",
task: "Check for vulnerabilities",
});
expect(prompt).toContain('Label: "Security Audit"');
});
it("omits label line when not provided", () => {
const prompt = buildSubagentSystemPrompt({
requesterSessionId: "parent-123",
childSessionId: "child-456",
task: "Do something",
});
expect(prompt).not.toContain("Label:");
});
});
describe("formatAnnouncementMessage", () => {
const baseParams: FormatAnnouncementParams = {
runId: "run-1",
childSessionId: "child-456",
requesterSessionId: "parent-123",
task: "Analyze code",
label: "Code Analysis",
cleanup: "delete",
outcome: { status: "ok" },
startedAt: 1000000,
endedAt: 1030000,
};
it("formats successful completion", () => {
const msg = formatAnnouncementMessage({
...baseParams,
findings: "Found 3 issues in the auth module.",
});
expect(msg).toContain('"Code Analysis" just completed successfully');
expect(msg).toContain("Found 3 issues in the auth module.");
expect(msg).toContain("runtime 30s");
expect(msg).toContain("session child-456");
});
it("formats error outcome", () => {
const msg = formatAnnouncementMessage({
...baseParams,
outcome: { status: "error", error: "API key expired" },
});
expect(msg).toContain("failed: API key expired");
});
it("formats timeout outcome", () => {
const msg = formatAnnouncementMessage({
...baseParams,
outcome: { status: "timeout" },
});
expect(msg).toContain("timed out");
});
it("shows (no output) when findings is not provided", () => {
const msg = formatAnnouncementMessage(baseParams);
expect(msg).toContain("(no output)");
});
it("uses task text when label is not provided", () => {
const paramsNoLabel: FormatAnnouncementParams = {
...baseParams,
label: undefined,
};
const msg = formatAnnouncementMessage(paramsNoLabel);
expect(msg).toContain('"Analyze code"');
});
it("formats runtime for minutes", () => {
const msg = formatAnnouncementMessage({
...baseParams,
startedAt: 1000000,
endedAt: 1150000, // 150 seconds = 2m30s
});
expect(msg).toContain("runtime 2m30s");
});
it("formats runtime for hours", () => {
const msg = formatAnnouncementMessage({
...baseParams,
startedAt: 1000000,
endedAt: 4600000, // 3600 seconds = 1h
});
expect(msg).toContain("runtime 1h");
});
it("includes summarization instruction", () => {
const msg = formatAnnouncementMessage(baseParams);
expect(msg).toContain("Summarize this naturally for the user");
expect(msg).toContain("NO_REPLY");
});
});
describe("formatCoalescedAnnouncementMessage", () => {
function makeRecord(overrides: Partial<SubagentRunRecord> = {}): SubagentRunRecord {
return {
runId: "run-1",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Default task",
cleanup: "delete",
createdAt: 1000000,
startedAt: 1000000,
endedAt: 1030000,
outcome: { status: "ok" },
findings: "Some findings",
findingsCaptured: true,
announced: false,
...overrides,
};
}
it("delegates to formatAnnouncementMessage for a single record", () => {
const record = makeRecord({ label: "Code Analysis" });
const coalesced = formatCoalescedAnnouncementMessage([record]);
const direct = formatAnnouncementMessage({
runId: record.runId,
childSessionId: record.childSessionId,
requesterSessionId: record.requesterSessionId,
task: record.task,
label: record.label,
cleanup: record.cleanup,
outcome: record.outcome,
startedAt: record.startedAt,
endedAt: record.endedAt,
findings: record.findings,
});
expect(coalesced).toBe(direct);
});
it("formats multiple records with all task findings and stats", () => {
const records = [
makeRecord({
runId: "run-1",
childSessionId: "child-1",
label: "Task A",
findings: "Found issue A",
startedAt: 1000000,
endedAt: 1030000,
}),
makeRecord({
runId: "run-2",
childSessionId: "child-2",
label: "Task B",
findings: "Found issue B",
startedAt: 1000000,
endedAt: 1045000, // 45 seconds
}),
];
const msg = formatCoalescedAnnouncementMessage(records);
expect(msg).toContain("All 2 background task(s) have completed");
expect(msg).toContain('Task 1: "Task A"');
expect(msg).toContain("Found issue A");
expect(msg).toContain('Task 2: "Task B"');
expect(msg).toContain("Found issue B");
expect(msg).toContain("Total wall time: 45s");
expect(msg).toContain("2 succeeded, 0 failed");
});
it("reports mixed outcomes correctly", () => {
const records = [
makeRecord({ runId: "run-1", label: "OK Task", outcome: { status: "ok" } }),
makeRecord({ runId: "run-2", label: "Failed Task", outcome: { status: "error", error: "crash" } }),
makeRecord({ runId: "run-3", label: "Timeout Task", outcome: { status: "timeout" } }),
];
const msg = formatCoalescedAnnouncementMessage(records);
expect(msg).toContain("completed successfully");
expect(msg).toContain("failed: crash");
expect(msg).toContain("timed out");
expect(msg).toContain("1 succeeded, 2 failed");
});
it("shows (no output) for missing findings", () => {
const records = [
makeRecord({ runId: "run-1", findings: undefined }),
makeRecord({ runId: "run-2", findings: "Has output" }),
];
const msg = formatCoalescedAnnouncementMessage(records);
expect(msg).toContain("(no output)");
expect(msg).toContain("Has output");
});
it("includes combined summary instruction for multi-record", () => {
const records = [
makeRecord({ runId: "run-1" }),
makeRecord({ runId: "run-2" }),
];
const msg = formatCoalescedAnnouncementMessage(records);
expect(msg).toContain("MUST include findings from every task item above");
expect(msg).toContain("NO_REPLY");
});
it("includes raw findings for every task in coalesced payload", () => {
const records = [
makeRecord({ runId: "run-1", label: "南京天气", findings: "南京12°C" }),
makeRecord({ runId: "run-2", label: "上海天气", findings: "上海多云9°C" }),
];
const msg = formatCoalescedAnnouncementMessage(records);
expect(msg).toContain("Raw findings from each task (MUST cover all items):");
expect(msg).toContain("[1] 南京天气:");
expect(msg).toContain("南京12°C");
expect(msg).toContain("[2] 上海天气:");
expect(msg).toContain("上海多云9°C");
expect(msg).toContain("MUST include findings from every task item above");
});
it("includes continuation prompt when next is provided", () => {
const records = [
makeRecord({ runId: "run-1", label: "AAPL data", findings: "AAPL revenue: $100B" }),
makeRecord({ runId: "run-2", label: "MSFT data", findings: "MSFT revenue: $200B" }),
];
const msg = formatCoalescedAnnouncementMessage(records, "Summarize all data and write a PDF investment report");
expect(msg).toContain("CONTINUATION TASK");
expect(msg).toContain("Summarize all data and write a PDF investment report");
expect(msg).toContain("AAPL revenue: $100B");
expect(msg).toContain("MSFT revenue: $200B");
// Should NOT contain the default summarize instruction
expect(msg).not.toContain("Summarize these results naturally for the user");
});
it("uses continuation prompt even for single record when next is provided", () => {
const records = [
makeRecord({ runId: "run-1", label: "Data collection", findings: "All data collected" }),
];
const msg = formatCoalescedAnnouncementMessage(records, "Generate the final report");
expect(msg).toContain("CONTINUATION TASK");
expect(msg).toContain("Generate the final report");
expect(msg).toContain("All data collected");
});
it("uses default summarize instruction when next is not provided", () => {
const records = [
makeRecord({ runId: "run-1" }),
makeRecord({ runId: "run-2" }),
];
const msg = formatCoalescedAnnouncementMessage(records);
expect(msg).not.toContain("CONTINUATION TASK");
expect(msg).toContain("Summarize these results naturally for the user");
});
});

View file

@ -1,424 +0,0 @@
/**
* Subagent announcement flow.
*
* Handles result propagation from child parent agent:
* - Builds system prompts for child agents
* - Reads child session output
* - Formats and delivers announcement messages
*/
import { readEntries } from "../session/storage.js";
import { getHub } from "../../hub/hub-singleton.js";
import { buildSystemPrompt } from "../system-prompt/index.js";
import type {
SubagentAnnounceParams,
SubagentRunOutcome,
SubagentRunRecord,
SubagentSystemPromptParams,
} from "./types.js";
import { enqueueAnnounce, DEFAULT_ANNOUNCE_SETTINGS } from "./announce-queue.js";
/**
* Build the system prompt injected into a subagent session.
* Uses the structured prompt builder with "minimal" mode.
*/
export function buildSubagentSystemPrompt(params: SubagentSystemPromptParams): string {
return buildSystemPrompt({
mode: "minimal",
subagent: {
requesterSessionId: params.requesterSessionId,
childSessionId: params.childSessionId,
label: params.label,
task: params.task,
},
tools: params.tools,
});
}
/**
* Read the latest assistant reply from a session's JSONL file.
*/
export function readLatestAssistantReply(
sessionId: string,
options?: { baseDir?: string },
): string | undefined {
const entries = readEntries(sessionId, options);
let latestToolResultText: string | undefined;
// Walk backwards to find the last non-empty assistant reply.
// If no assistant text exists (e.g. run ended after tool execution),
// fall back to the latest non-empty toolResult content.
for (let i = entries.length - 1; i >= 0; i--) {
const entry = entries[i]!;
if (entry.type !== "message") continue;
const message = entry.message;
if (message.role === "assistant") {
const text = extractAssistantText(message);
if (text) return text;
continue;
}
if (message.role === "toolResult" && !latestToolResultText) {
const text = extractToolResultText(message);
if (text) latestToolResultText = text;
}
}
return latestToolResultText;
}
/**
* Extract text content from an assistant message.
* AgentMessage.content for assistant is (TextContent | ThinkingContent | ToolCall)[].
*/
function extractAssistantText(message: { role: string; content: unknown }): string {
return extractTextLikeContent(message.content);
}
/**
* Extract text content from a toolResult message.
*/
function extractToolResultText(message: { role: string; content: unknown }): string {
return extractTextLikeContent(message.content);
}
function extractTextLikeContent(content: unknown): string {
if (typeof content === "string") {
return sanitizeText(content);
}
if (!Array.isArray(content)) return "";
const textParts: string[] = [];
for (const block of content) {
if (!block || typeof block !== "object") continue;
if ("text" in block) {
textParts.push(String((block as { text: unknown }).text));
}
}
return sanitizeText(textParts.join("\n"));
}
/**
* Strip thinking tags and tool markers from text.
*/
function sanitizeText(text: string): string {
return text
.replace(/<thinking>[\s\S]*?<\/thinking>/g, "")
.replace(/<tool_call>[\s\S]*?<\/tool_call>/g, "")
.trim();
}
/**
* Format the duration between two timestamps as a human-readable string.
*/
function formatDuration(startMs: number, endMs: number): string {
const totalSeconds = Math.round((endMs - startMs) / 1000);
if (totalSeconds < 60) return `${totalSeconds}s`;
const minutes = Math.floor(totalSeconds / 60);
const seconds = totalSeconds % 60;
if (minutes < 60) return seconds > 0 ? `${minutes}m${seconds}s` : `${minutes}m`;
const hours = Math.floor(minutes / 60);
const remainingMinutes = minutes % 60;
return remainingMinutes > 0 ? `${hours}h${remainingMinutes}m` : `${hours}h`;
}
/**
* Format a status label from an outcome.
*/
function formatStatusLabel(outcome: SubagentRunOutcome | undefined): string {
if (!outcome) return "completed with unknown status";
switch (outcome.status) {
case "ok":
return "completed successfully";
case "error":
return outcome.error ? `failed: ${outcome.error}` : "failed";
case "timeout":
return "timed out";
default:
return "completed with unknown status";
}
}
/** Parameters for formatAnnouncementMessage */
export interface FormatAnnouncementParams {
runId: string;
childSessionId: string;
requesterSessionId: string;
task: string;
label?: string | undefined;
cleanup: "delete" | "keep";
outcome?: SubagentRunOutcome | undefined;
startedAt?: number | undefined;
endedAt?: number | undefined;
findings?: string | undefined;
}
/**
* Format the announcement message sent to the parent agent.
*/
export function formatAnnouncementMessage(params: FormatAnnouncementParams): string {
const { task, label, outcome, findings, startedAt, endedAt, childSessionId } = params;
const displayName = label || task.slice(0, 60);
const statusLabel = formatStatusLabel(outcome);
const parts: string[] = [
`A background task "${displayName}" just ${statusLabel}.`,
"",
"Findings:",
findings || "(no output)",
];
// Stats line
const stats: string[] = [];
if (startedAt && endedAt) {
stats.push(`runtime ${formatDuration(startedAt, endedAt)}`);
}
stats.push(`session ${childSessionId}`);
parts.push("", `Stats: ${stats.join(" • ")}`);
parts.push(
"",
"Summarize this naturally for the user. Keep it brief (1-2 sentences).",
"Flow it into the conversation naturally.",
"Do not mention technical details like session IDs or that this was a background task.",
"You can respond with NO_REPLY if no announcement is needed (e.g., internal task with no user-facing result).",
);
return parts.join("\n");
}
/**
* Format a coalesced announcement message from multiple completed subagent runs.
* When only one record is provided, delegates to formatAnnouncementMessage.
*
* @param next Optional continuation prompt from a SubagentGroup. When present,
* the parent agent is instructed to execute the continuation using the combined
* findings, rather than just summarizing.
*/
export function formatCoalescedAnnouncementMessage(
records: SubagentRunRecord[],
next?: string,
): string {
// Single record without continuation: delegate to existing format
if (records.length === 1 && !next) {
const r = records[0]!;
return formatAnnouncementMessage({
runId: r.runId,
childSessionId: r.childSessionId,
requesterSessionId: r.requesterSessionId,
task: r.task,
label: r.label,
cleanup: r.cleanup,
outcome: r.outcome,
startedAt: r.startedAt,
endedAt: r.endedAt,
findings: r.findings,
});
}
// Multiple records (or single with continuation): build combined message.
const parts: string[] = [
`All ${records.length} background task(s) have completed. Here are the combined results:`,
"",
];
for (let i = 0; i < records.length; i++) {
const r = records[i]!;
const displayName = r.label || r.task.slice(0, 60);
const statusLabel = formatStatusLabel(r.outcome);
const durationStr = (r.startedAt && r.endedAt)
? ` (${formatDuration(r.startedAt, r.endedAt)})`
: "";
parts.push(
`### Task ${i + 1}: "${displayName}"`,
`Status: ${statusLabel}${durationStr}`,
"",
"Findings:",
r.findings || "(no output)",
"",
);
}
// Overall stats
const allStartTimes = records.map(r => r.startedAt).filter(Boolean) as number[];
const allEndTimes = records.map(r => r.endedAt).filter(Boolean) as number[];
if (allStartTimes.length > 0 && allEndTimes.length > 0) {
const wallTime = formatDuration(Math.min(...allStartTimes), Math.max(...allEndTimes));
parts.push(`Total wall time: ${wallTime}`);
}
const okCount = records.filter(r => r.outcome?.status === "ok").length;
const failCount = records.length - okCount;
parts.push(`Results: ${okCount} succeeded, ${failCount} failed/timed out`);
parts.push("", "Raw findings from each task (MUST cover all items):", "");
for (let i = 0; i < records.length; i++) {
const r = records[i]!;
const displayName = r.label || r.task.slice(0, 60);
parts.push(
`[${i + 1}] ${displayName}:`,
r.findings || "(no output)",
"",
);
}
// Continuation vs. summarization
if (next) {
parts.push(
"",
"---",
"",
"CONTINUATION TASK: The user's original request requires further work using the findings above.",
"Execute the following task now, using ALL the collected data:",
"",
next,
"",
"Use the raw findings above as your data source. Call tools as needed to complete this task.",
"Do not mention technical details like session IDs or that these were background tasks.",
);
} else {
parts.push(
"",
"Summarize these results naturally for the user.",
"You MUST include findings from every task item above, without omission.",
"Keep it concise, but preserve concrete findings from each task.",
"Do not mention technical details like session IDs or that these were background tasks.",
"You can respond with NO_REPLY if no announcement is needed.",
);
}
return parts.join("\n");
}
/**
* Run the coalesced announcement flow for all completed runs of a requester.
* Uses two-tier delivery:
* 1. Queue if parent is busy (running or has pending writes), queue for
* later drain via writeInternal (with debounce to batch nearby completions)
* 2. Direct if parent is idle, send immediately via writeInternal
*
* All delivery uses writeInternal() which marks the announcement prompt as
* `internal: true` (hidden from UI). The assistant's summary response is
* forwarded to the real-time stream (`forwardAssistant: true`) so the user
* sees the result, and persisted to JSONL for future session loads.
*/
export function runCoalescedAnnounceFlow(
requesterSessionId: string,
records: SubagentRunRecord[],
next?: string,
): boolean {
const message = formatCoalescedAnnouncementMessage(records, next);
try {
const hub = getHub();
const parentAgent = hub.getAgent(requesterSessionId);
if (!parentAgent || parentAgent.closed) {
console.warn(
`[SubagentAnnounce] Parent agent not found or closed: ${requesterSessionId}`,
);
return false;
}
// Tier 1: BUSY — parent is running or has pending writes
// Queue the announcement for delivery via writeInternal() after the parent
// finishes its current work. We do NOT use steer() (cancels unrelated tool
// calls) or followUp() (doesn't mark entries as internal, polluting the UI).
if (parentAgent.isRunning || parentAgent.getPendingWrites() > 0) {
enqueueAnnounce({
key: requesterSessionId,
item: {
prompt: message,
summaryLine: `${records.length} subagent(s) completed`,
enqueuedAt: Date.now(),
requesterSessionId,
},
settings: DEFAULT_ANNOUNCE_SETTINGS,
send: async (item) => sendAnnounceDirect(requesterSessionId, item.prompt),
});
console.log(`[SubagentAnnounce] Queued announcement for parent: ${requesterSessionId}`);
return true;
}
// Tier 2: IDLE — parent is idle, send directly via writeInternal
sendAnnounceDirect(requesterSessionId, message);
return true;
} catch (err) {
console.error(`[SubagentAnnounce] Failed to coalesced-announce to parent:`, err);
return false;
}
}
/**
* Send announcement directly to parent via writeInternal.
* Used as Tier 3 (idle) and as the queue drain callback.
*/
function sendAnnounceDirect(requesterSessionId: string, message: string): void {
try {
const hub = getHub();
const parentAgent = hub.getAgent(requesterSessionId);
if (!parentAgent || parentAgent.closed) {
console.warn(
`[SubagentAnnounce] Parent agent not found or closed for direct send: ${requesterSessionId}`,
);
return;
}
parentAgent.writeInternal(message, { forwardAssistant: true, persistResponse: true });
} catch (err) {
console.error(`[SubagentAnnounce] Failed direct announce to parent:`, err);
}
}
/**
* Run the full subagent announcement flow:
* 1. Read child's last assistant reply
* 2. Format announcement message
* 3. Send to parent agent via Hub
*
* @deprecated Use runCoalescedAnnounceFlow instead, which supports
* batching multiple completed runs into a single announcement.
*/
export function runSubagentAnnounceFlow(params: SubagentAnnounceParams): boolean {
const { requesterSessionId, childSessionId } = params;
// Read child's final output
const findings = readLatestAssistantReply(childSessionId);
// Format the announcement
const message = formatAnnouncementMessage({
runId: params.runId,
childSessionId: params.childSessionId,
requesterSessionId: params.requesterSessionId,
task: params.task,
label: params.label,
cleanup: params.cleanup,
outcome: params.outcome,
startedAt: params.startedAt,
endedAt: params.endedAt,
findings,
});
// Deliver to parent agent via Hub
try {
const hub = getHub();
const parentAgent = hub.getAgent(requesterSessionId);
if (!parentAgent || parentAgent.closed) {
console.warn(
`[SubagentAnnounce] Parent agent not found or closed: ${requesterSessionId}`,
);
return false;
}
parentAgent.writeInternal(message, { forwardAssistant: true, persistResponse: true });
return true;
} catch (err) {
console.error(`[SubagentAnnounce] Failed to announce to parent:`, err);
return false;
}
}

View file

@ -1,117 +0,0 @@
import { afterEach, describe, expect, it } from "vitest";
import {
enqueueInLane,
getLaneSize,
clearLane,
setLaneConcurrency,
resetLanesForTests,
} from "./command-queue.js";
afterEach(() => {
resetLanesForTests();
});
describe("command queue", () => {
it("runs tasks serially by default (maxConcurrent = 1)", async () => {
let active = 0;
let maxActive = 0;
const order: number[] = [];
const makeTask = (id: number) => async () => {
active += 1;
maxActive = Math.max(maxActive, active);
order.push(id);
await new Promise((r) => setTimeout(r, 10));
active -= 1;
return id;
};
const results = await Promise.all([
enqueueInLane("test", makeTask(1)),
enqueueInLane("test", makeTask(2)),
enqueueInLane("test", makeTask(3)),
]);
expect(results).toEqual([1, 2, 3]);
expect(order).toEqual([1, 2, 3]);
expect(maxActive).toBe(1);
});
it("respects maxConcurrent limit", async () => {
setLaneConcurrency("test", 3);
let active = 0;
let maxActive = 0;
const makeTask = (id: number) => async () => {
active += 1;
maxActive = Math.max(maxActive, active);
await new Promise((r) => setTimeout(r, 20));
active -= 1;
return id;
};
const results = await Promise.all([
enqueueInLane("test", makeTask(1)),
enqueueInLane("test", makeTask(2)),
enqueueInLane("test", makeTask(3)),
enqueueInLane("test", makeTask(4)),
enqueueInLane("test", makeTask(5)),
]);
expect(results).toEqual([1, 2, 3, 4, 5]);
expect(maxActive).toBe(3);
});
it("reports correct lane size", async () => {
setLaneConcurrency("test", 1);
let resolveFirst!: () => void;
const blocker = new Promise<void>((r) => {
resolveFirst = r;
});
// First task blocks the lane
const p1 = enqueueInLane("test", () => blocker);
// Second task queued
const p2 = enqueueInLane("test", async () => "done");
// 1 active + 1 queued = 2
expect(getLaneSize("test")).toBe(2);
resolveFirst();
await Promise.all([p1, p2]);
expect(getLaneSize("test")).toBe(0);
});
it("clears pending tasks", async () => {
setLaneConcurrency("test", 1);
let resolveFirst!: () => void;
const blocker = new Promise<void>((r) => {
resolveFirst = r;
});
const p1 = enqueueInLane("test", () => blocker);
const p2 = enqueueInLane("test", async () => "should-not-run");
const p3 = enqueueInLane("test", async () => "should-not-run");
const removed = clearLane("test");
expect(removed).toBe(2);
resolveFirst();
await p1;
// p2 and p3 should reject
await expect(p2).rejects.toThrow("Lane cleared");
await expect(p3).rejects.toThrow("Lane cleared");
expect(getLaneSize("test")).toBe(0);
});
it("returns 0 for unknown lane", () => {
expect(getLaneSize("nonexistent")).toBe(0);
expect(clearLane("nonexistent")).toBe(0);
});
});

View file

@ -1,158 +0,0 @@
/**
* Lane-based command queue for subagent concurrency control.
*
* Each lane maintains an independent queue with a configurable max-concurrency
* limit. Tasks beyond the limit are queued and executed FIFO as slots free up.
*
* Adapted from OpenClaw's process/command-queue.ts.
*/
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
type QueueEntry = {
task: () => Promise<unknown>;
resolve: (value: unknown) => void;
reject: (reason?: unknown) => void;
enqueuedAt: number;
warnAfterMs: number;
onWait?: (waitMs: number, queuedAhead: number) => void;
};
type LaneState = {
lane: string;
queue: QueueEntry[];
active: number;
maxConcurrent: number;
draining: boolean;
};
// ---------------------------------------------------------------------------
// Module state
// ---------------------------------------------------------------------------
const lanes = new Map<string, LaneState>();
function getLaneState(lane: string): LaneState {
const existing = lanes.get(lane);
if (existing) return existing;
const created: LaneState = {
lane,
queue: [],
active: 0,
maxConcurrent: 1,
draining: false,
};
lanes.set(lane, created);
return created;
}
// ---------------------------------------------------------------------------
// Drain / pump
// ---------------------------------------------------------------------------
function drainLane(lane: string): void {
const state = getLaneState(lane);
if (state.draining) return;
state.draining = true;
const pump = () => {
while (state.active < state.maxConcurrent && state.queue.length > 0) {
const entry = state.queue.shift() as QueueEntry;
const waitedMs = Date.now() - entry.enqueuedAt;
if (waitedMs >= entry.warnAfterMs) {
entry.onWait?.(waitedMs, state.queue.length);
console.warn(
`[CommandQueue] lane wait exceeded: lane=${lane} waitedMs=${waitedMs} queueAhead=${state.queue.length}`,
);
}
state.active += 1;
void (async () => {
try {
const result = await entry.task();
state.active -= 1;
pump();
entry.resolve(result);
} catch (err) {
state.active -= 1;
pump();
entry.reject(err);
}
})();
}
state.draining = false;
};
pump();
}
// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------
/** Set (or update) the max concurrency for a lane. Triggers a drain. */
export function setLaneConcurrency(lane: string, maxConcurrent: number): void {
const state = getLaneState(lane);
state.maxConcurrent = Math.max(1, Math.floor(maxConcurrent));
drainLane(lane);
}
/** Enqueue a task in a specific lane. Returns a promise that resolves with the task's return value. */
export function enqueueInLane<T>(
lane: string,
task: () => Promise<T>,
opts?: {
warnAfterMs?: number;
onWait?: (waitMs: number, queuedAhead: number) => void;
},
): Promise<T> {
const warnAfterMs = opts?.warnAfterMs ?? 5_000;
const state = getLaneState(lane);
return new Promise<T>((resolve, reject) => {
state.queue.push({
task: () => task(),
resolve: (value) => resolve(value as T),
reject,
enqueuedAt: Date.now(),
warnAfterMs,
onWait: opts?.onWait,
});
drainLane(lane);
});
}
/** Number of active + queued tasks in a lane. */
export function getLaneSize(lane: string): number {
const state = lanes.get(lane);
if (!state) return 0;
return state.queue.length + state.active;
}
/** Remove all pending (not yet active) tasks from a lane. Returns how many were removed. */
export function clearLane(lane: string): number {
const state = lanes.get(lane);
if (!state) return 0;
const removed = state.queue.length;
// Reject pending tasks so callers aren't left hanging
for (const entry of state.queue) {
entry.reject(new Error("Lane cleared"));
}
state.queue.length = 0;
return removed;
}
/** Reset all lanes (for testing). */
export function resetLanesForTests(): void {
for (const state of lanes.values()) {
for (const entry of state.queue) {
entry.reject(new Error("Reset"));
}
}
lanes.clear();
}

View file

@ -1,40 +0,0 @@
/**
* Subagent orchestration system.
*
* Provides child agent spawning, lifecycle management,
* persistent registry, and result announcement flow.
*/
export type {
SubagentRunOutcome,
SubagentRunRecord,
RegisterSubagentRunParams,
SubagentAnnounceParams,
SubagentSystemPromptParams,
} from "./types.js";
export {
initSubagentRegistry,
registerSubagentRun,
listSubagentRuns,
releaseSubagentRun,
getSubagentRun,
resetSubagentRegistryForTests,
shutdownSubagentRegistry,
} from "./registry.js";
export {
buildSubagentSystemPrompt,
readLatestAssistantReply,
formatAnnouncementMessage,
runSubagentAnnounceFlow,
formatCoalescedAnnouncementMessage,
runCoalescedAnnounceFlow,
} from "./announce.js";
export type { FormatAnnouncementParams } from "./announce.js";
export {
loadSubagentRuns,
saveSubagentRuns,
getSubagentStorePath,
} from "./registry-store.js";

View file

@ -1,37 +0,0 @@
/** Named lanes for the subagent command queue. */
export const SubagentLane = {
Subagent: "subagent",
} as const;
/** Default maximum concurrent subagent runs. */
export const DEFAULT_SUBAGENT_MAX_CONCURRENT = 10;
// ---------------------------------------------------------------------------
// Timeout defaults
// ---------------------------------------------------------------------------
/** Default subagent timeout: 30 minutes. */
export const DEFAULT_SUBAGENT_TIMEOUT_SECONDS = 1800;
/** Maximum safe value for setTimeout (~24.8 days). */
const MAX_SAFE_TIMEOUT_MS = 2_147_000_000;
/**
* Resolve the effective timeout in milliseconds for a subagent run.
*
* - `undefined` / negative default (1800 s)
* - `0` no timeout (MAX_SAFE_TIMEOUT_MS)
* - positive number use as-is, clamped to safe range
*/
export function resolveSubagentTimeoutMs(overrideSeconds?: number): number {
if (overrideSeconds === undefined || overrideSeconds === null) {
return DEFAULT_SUBAGENT_TIMEOUT_SECONDS * 1000;
}
if (overrideSeconds === 0) {
return MAX_SAFE_TIMEOUT_MS; // "no timeout"
}
if (overrideSeconds < 0) {
return DEFAULT_SUBAGENT_TIMEOUT_SECONDS * 1000;
}
return Math.min(Math.floor(overrideSeconds) * 1000, MAX_SAFE_TIMEOUT_MS);
}

View file

@ -1,76 +0,0 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { SubagentRunRecord } from "./types.js";
const loadSubagentRunsMock = vi.fn<() => Map<string, SubagentRunRecord>>();
const saveSubagentRunsMock = vi.fn();
const readLatestAssistantReplyMock = vi.fn();
const runCoalescedAnnounceFlowMock = vi.fn(() => false);
const resolveSessionDirMock = vi.fn((sessionId: string) => `/tmp/${sessionId}`);
const closeAgentMock = vi.fn();
const getHubMock = vi.fn(() => ({ closeAgent: closeAgentMock }));
const rmSyncMock = vi.fn();
vi.mock("./registry-store.js", () => ({
loadSubagentRuns: loadSubagentRunsMock,
loadSubagentGroups: vi.fn(() => new Map()),
saveSubagentRuns: saveSubagentRunsMock,
}));
vi.mock("./announce.js", () => ({
readLatestAssistantReply: readLatestAssistantReplyMock,
runCoalescedAnnounceFlow: runCoalescedAnnounceFlowMock,
}));
vi.mock("../session/storage.js", () => ({
resolveSessionDir: resolveSessionDirMock,
}));
vi.mock("../../hub/hub-singleton.js", () => ({
getHub: getHubMock,
isHubInitialized: vi.fn(() => false),
}));
vi.mock("node:fs", async (importOriginal) => {
const actual = await importOriginal<typeof import("node:fs")>();
return {
...actual,
rmSync: rmSyncMock,
};
});
describe("subagent registry recovery cleanup", () => {
beforeEach(() => {
vi.resetModules();
vi.clearAllMocks();
loadSubagentRunsMock.mockReturnValue(new Map());
runCoalescedAnnounceFlowMock.mockReturnValue(false);
});
it("deletes child session on recovery even when findings were already captured", async () => {
const now = Date.now();
const record: SubagentRunRecord = {
runId: "run-1",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "task",
cleanup: "delete",
createdAt: now - 1000,
startedAt: now - 900,
endedAt: now - 100,
outcome: { status: "ok" },
findings: "done",
findingsCaptured: true,
cleanupHandled: false,
announced: false,
};
loadSubagentRunsMock.mockReturnValue(new Map([["run-1", record]]));
const registry = await import("./registry.js");
registry.initSubagentRegistry();
expect(readLatestAssistantReplyMock).not.toHaveBeenCalled();
expect(resolveSessionDirMock).toHaveBeenCalledWith("child-1");
expect(rmSyncMock).toHaveBeenCalledWith("/tmp/child-1", { recursive: true, force: true });
});
});

View file

@ -1,127 +0,0 @@
import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { mkdtempSync, rmSync, existsSync } from "node:fs";
import { join } from "node:path";
import { tmpdir } from "node:os";
import type { SubagentRunRecord } from "./types.js";
// We need to test the store functions with a custom directory.
// Since the store uses DATA_DIR from shared, we test the serialization logic directly.
describe("registry-store serialization", () => {
let tempDir: string;
beforeEach(() => {
tempDir = mkdtempSync(join(tmpdir(), "subagent-store-test-"));
});
afterEach(() => {
rmSync(tempDir, { recursive: true, force: true });
});
it("round-trips SubagentRunRecord through JSON", () => {
const record: SubagentRunRecord = {
runId: "run-123",
childSessionId: "child-456",
requesterSessionId: "parent-789",
task: "Analyze code quality",
label: "Code Review",
cleanup: "delete",
createdAt: Date.now(),
startedAt: Date.now(),
endedAt: Date.now() + 30000,
outcome: { status: "ok" },
archiveAtMs: Date.now() + 3600000,
cleanupHandled: true,
cleanupCompletedAt: Date.now() + 30100,
};
// Serialize and deserialize
const json = JSON.stringify({ version: 1, runs: { "run-123": record } });
const parsed = JSON.parse(json);
expect(parsed.version).toBe(1);
expect(parsed.runs["run-123"]).toEqual(record);
});
it("handles record with minimal fields", () => {
const record: SubagentRunRecord = {
runId: "run-minimal",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Do something",
cleanup: "keep",
createdAt: Date.now(),
};
const json = JSON.stringify({ version: 1, runs: { "run-minimal": record } });
const parsed = JSON.parse(json);
expect(parsed.runs["run-minimal"].runId).toBe("run-minimal");
expect(parsed.runs["run-minimal"].outcome).toBeUndefined();
expect(parsed.runs["run-minimal"].label).toBeUndefined();
});
it("handles error outcome serialization", () => {
const record: SubagentRunRecord = {
runId: "run-err",
childSessionId: "child-err",
requesterSessionId: "parent-1",
task: "Fail",
cleanup: "delete",
createdAt: Date.now(),
outcome: { status: "error", error: "Something went wrong" },
};
const json = JSON.stringify(record);
const parsed = JSON.parse(json) as SubagentRunRecord;
expect(parsed.outcome?.status).toBe("error");
expect(parsed.outcome?.error).toBe("Something went wrong");
});
it("round-trips coalescing fields (findings, findingsCaptured, announced)", () => {
const record: SubagentRunRecord = {
runId: "run-coalesce",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Coalesce test",
cleanup: "delete",
createdAt: Date.now(),
endedAt: Date.now() + 5000,
outcome: { status: "ok" },
findings: "Found 3 issues in auth module.",
findingsCaptured: true,
announced: true,
};
const json = JSON.stringify({ version: 1, runs: { "run-coalesce": record } });
const parsed = JSON.parse(json);
const restored = parsed.runs["run-coalesce"] as SubagentRunRecord;
expect(restored.findings).toBe("Found 3 issues in auth module.");
expect(restored.findingsCaptured).toBe(true);
expect(restored.announced).toBe(true);
});
it("round-trips record with undefined coalescing fields", () => {
const record: SubagentRunRecord = {
runId: "run-old",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Old record",
cleanup: "delete",
createdAt: Date.now(),
cleanupHandled: true,
// No findings, findingsCaptured, or announced fields (old format)
};
const json = JSON.stringify({ version: 1, runs: { "run-old": record } });
const parsed = JSON.parse(json);
const restored = parsed.runs["run-old"] as SubagentRunRecord;
expect(restored.findings).toBeUndefined();
expect(restored.findingsCaptured).toBeUndefined();
expect(restored.announced).toBeUndefined();
expect(restored.cleanupHandled).toBe(true);
});
});

View file

@ -1,80 +0,0 @@
/**
* Persistent storage for subagent run records.
*
* File: ~/.super-multica/subagents/runs.json
*/
import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs";
import { join } from "node:path";
import { DATA_DIR } from "@multica/utils";
import type { SubagentRunRecord, SubagentGroup } from "./types.js";
const SUBAGENTS_DIR = join(DATA_DIR, "subagents");
const RUNS_FILE = join(SUBAGENTS_DIR, "runs.json");
interface SubagentRunsStore {
version: 1;
runs: Record<string, SubagentRunRecord>;
groups?: Record<string, SubagentGroup> | undefined;
}
function ensureDir(): void {
if (!existsSync(SUBAGENTS_DIR)) {
mkdirSync(SUBAGENTS_DIR, { recursive: true });
}
}
/** Get the path to the subagent store file (for testing) */
export function getSubagentStorePath(): string {
return RUNS_FILE;
}
/** Load all persisted subagent runs */
export function loadSubagentRuns(): Map<string, SubagentRunRecord> {
if (!existsSync(RUNS_FILE)) return new Map();
try {
const content = readFileSync(RUNS_FILE, "utf-8");
const store = JSON.parse(content) as SubagentRunsStore;
if (store.version !== 1) {
console.warn(`[SubagentStore] Unknown store version: ${store.version}, ignoring`);
return new Map();
}
return new Map(Object.entries(store.runs));
} catch (err) {
console.warn(`[SubagentStore] Failed to load runs:`, err);
return new Map();
}
}
/** Load all persisted subagent groups */
export function loadSubagentGroups(): Map<string, SubagentGroup> {
if (!existsSync(RUNS_FILE)) return new Map();
try {
const content = readFileSync(RUNS_FILE, "utf-8");
const store = JSON.parse(content) as SubagentRunsStore;
if (store.version !== 1 || !store.groups) return new Map();
return new Map(Object.entries(store.groups));
} catch {
return new Map();
}
}
/** Save all subagent runs and groups to disk */
export function saveSubagentRuns(
runs: Map<string, SubagentRunRecord>,
groups?: Map<string, SubagentGroup>,
): void {
ensureDir();
const store: SubagentRunsStore = {
version: 1,
runs: Object.fromEntries(runs),
groups: groups && groups.size > 0 ? Object.fromEntries(groups) : undefined,
};
writeFileSync(RUNS_FILE, JSON.stringify(store, null, 2), "utf-8");
}

View file

@ -1,405 +0,0 @@
import { describe, it, expect, beforeEach, vi } from "vitest";
import {
registerSubagentRun,
listSubagentRuns,
getSubagentRun,
releaseSubagentRun,
resetSubagentRegistryForTests,
shutdownSubagentRegistry,
} from "./registry.js";
import { resetLanesForTests } from "./command-queue.js";
// Note: These tests exercise the registry's in-memory state management.
// They do NOT test the full lifecycle (which requires a live Hub + AsyncAgent).
/** Wait for the command queue to process enqueued tasks. */
const flushQueue = () => new Promise<void>((r) => setTimeout(r, 0));
beforeEach(() => {
resetSubagentRegistryForTests();
resetLanesForTests();
});
describe("subagent registry", () => {
it("registers a run and retrieves it by ID", async () => {
const record = registerSubagentRun({
runId: "run-1",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Analyze code",
label: "Code Analysis",
});
expect(record.runId).toBe("run-1");
expect(record.childSessionId).toBe("child-1");
expect(record.requesterSessionId).toBe("parent-1");
expect(record.task).toBe("Analyze code");
expect(record.label).toBe("Code Analysis");
expect(record.cleanup).toBe("delete"); // default
expect(record.createdAt).toBeGreaterThan(0);
await flushQueue();
expect(record.startedAt).toBeGreaterThan(0); // set by watchChildAgent (async via queue)
const retrieved = getSubagentRun("run-1");
expect(retrieved).toBe(record);
});
it("lists runs filtered by requester session", () => {
registerSubagentRun({
runId: "run-1",
childSessionId: "child-1",
requesterSessionId: "parent-A",
task: "Task 1",
});
registerSubagentRun({
runId: "run-2",
childSessionId: "child-2",
requesterSessionId: "parent-B",
task: "Task 2",
});
registerSubagentRun({
runId: "run-3",
childSessionId: "child-3",
requesterSessionId: "parent-A",
task: "Task 3",
});
const parentARuns = listSubagentRuns("parent-A");
expect(parentARuns).toHaveLength(2);
expect(parentARuns.map((r) => r.runId).sort()).toEqual(["run-1", "run-3"]);
const parentBRuns = listSubagentRuns("parent-B");
expect(parentBRuns).toHaveLength(1);
expect(parentBRuns[0]!.runId).toBe("run-2");
const emptyRuns = listSubagentRuns("parent-C");
expect(emptyRuns).toHaveLength(0);
});
it("releases a run from the registry", () => {
registerSubagentRun({
runId: "run-1",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Task",
});
expect(getSubagentRun("run-1")).toBeDefined();
const released = releaseSubagentRun("run-1");
expect(released).toBe(true);
expect(getSubagentRun("run-1")).toBeUndefined();
// Double release returns false
const releasedAgain = releaseSubagentRun("run-1");
expect(releasedAgain).toBe(false);
});
it("applies custom cleanup value", () => {
const record = registerSubagentRun({
runId: "run-keep",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Keep session",
cleanup: "keep",
});
expect(record.cleanup).toBe("keep");
});
it("registers a run and ends it with error when Hub is not available", async () => {
// Without Hub initialized, watchChildAgent detects missing Hub
// and immediately ends the run with an error
registerSubagentRun({
runId: "run-no-hub",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Running task",
});
await flushQueue();
const record = getSubagentRun("run-no-hub");
expect(record?.startedAt).toBeGreaterThan(0);
expect(record?.endedAt).toBeGreaterThan(0);
expect(record?.outcome?.status).toBe("error");
expect(record?.outcome?.error).toContain("Hub not initialized");
});
it("shutdownSubagentRegistry marks unfinished runs as ended", async () => {
// Directly set up a record without going through watchChildAgent
// to simulate a run that is still active
registerSubagentRun({
runId: "run-active",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Running task",
});
await flushQueue();
// The above run already ended due to no Hub; reset its endedAt
// to simulate a truly active run
const record = getSubagentRun("run-active");
if (record) {
record.endedAt = undefined;
record.outcome = undefined;
}
shutdownSubagentRegistry();
const after = getSubagentRun("run-active");
expect(after?.endedAt).toBeGreaterThan(0);
expect(after?.outcome?.status).toBe("unknown");
});
it("resetSubagentRegistryForTests clears all state", () => {
registerSubagentRun({
runId: "run-1",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Task",
});
expect(listSubagentRuns("parent-1")).toHaveLength(1);
resetSubagentRegistryForTests();
expect(listSubagentRuns("parent-1")).toHaveLength(0);
expect(getSubagentRun("run-1")).toBeUndefined();
});
});
describe("subagent registry — coalescing", () => {
// Without Hub, watchChildAgent ends runs immediately with "Hub not initialized".
// This allows us to test the coalescing state transitions.
it("captures findings when a run completes (no Hub)", async () => {
registerSubagentRun({
runId: "run-1",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Task 1",
});
await flushQueue();
const record = getSubagentRun("run-1");
// Run ended immediately due to no Hub
expect(record?.endedAt).toBeGreaterThan(0);
expect(record?.findingsCaptured).toBe(true);
});
it("does not announce while sibling runs are still pending", async () => {
// Register first run — ends immediately (no Hub)
registerSubagentRun({
runId: "run-1",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Task 1",
});
await flushQueue();
const record1 = getSubagentRun("run-1");
expect(record1?.findingsCaptured).toBe(true);
// Register second run — also ends immediately
registerSubagentRun({
runId: "run-2",
childSessionId: "child-2",
requesterSessionId: "parent-1",
task: "Task 2",
});
await flushQueue();
const record2 = getSubagentRun("run-2");
expect(record2?.findingsCaptured).toBe(true);
// Both ended, but announce fails because no Hub for parent agent.
// The key check: both records should have findings captured.
// announced will be false because runCoalescedAnnounceFlow fails (no Hub).
expect(record1?.announced).toBeUndefined();
expect(record2?.announced).toBeUndefined();
});
it("single run captures findings immediately", async () => {
registerSubagentRun({
runId: "run-solo",
childSessionId: "child-solo",
requesterSessionId: "parent-solo",
task: "Solo task",
});
await flushQueue();
const record = getSubagentRun("run-solo");
expect(record?.endedAt).toBeGreaterThan(0);
expect(record?.findingsCaptured).toBe(true);
expect(record?.outcome?.status).toBe("error");
expect(record?.outcome?.error).toContain("Hub not initialized");
});
it("shutdownSubagentRegistry captures findings for ended-but-uncaptured runs", async () => {
registerSubagentRun({
runId: "run-1",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Task",
});
await flushQueue();
const record = getSubagentRun("run-1");
if (record) {
// Simulate: run ended but findings not yet captured
record.endedAt = Date.now();
record.outcome = { status: "ok" };
record.findingsCaptured = undefined;
}
shutdownSubagentRegistry();
expect(record?.findingsCaptured).toBe(true);
});
});
describe("subagent registry — silent announce mode", () => {
// Note: In tests (no Hub), watchChildAgent completes synchronously within
// registerSubagentRun(), so each run's lifecycle finishes before the next
// registration call. Multi-run coalescing requires async child agents and
// is validated in integration tests.
it("stores announce field on the record", () => {
const record = registerSubagentRun({
runId: "run-ann",
childSessionId: "child-ann",
requesterSessionId: "parent-1",
task: "Task",
announce: "silent",
});
expect(record.announce).toBe("silent");
});
it("defaults announce to undefined (immediate behavior)", () => {
const record = registerSubagentRun({
runId: "run-def",
childSessionId: "child-def",
requesterSessionId: "parent-1",
task: "Task",
});
expect(record.announce).toBeUndefined();
});
it("silent runs are announced via runCoalescedAnnounceFlow", async () => {
const announceModule = await import("./announce.js");
const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true);
registerSubagentRun({
runId: "run-s1",
childSessionId: "child-s1",
requesterSessionId: "parent-1",
task: "Silent A",
announce: "silent",
});
await flushQueue();
// Silent run announced (via runCoalescedAnnounceFlow mock)
const silentCalls = spy.mock.calls.filter(
([reqId, records]) =>
reqId === "parent-1" &&
records.some((r: { announce?: string }) => r.announce === "silent"),
);
expect(silentCalls.length).toBeGreaterThanOrEqual(1);
const runS1 = getSubagentRun("run-s1");
expect(runS1?.announced).toBe(true);
expect(runS1?.announce).toBe("silent");
spy.mockRestore();
});
it("immediate and silent runs are never mixed in the same announce call", async () => {
const announceModule = await import("./announce.js");
const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true);
// Register immediate run, then silent run
registerSubagentRun({
runId: "run-imm",
childSessionId: "child-imm",
requesterSessionId: "parent-1",
task: "Immediate task",
});
registerSubagentRun({
runId: "run-s1",
childSessionId: "child-s1",
requesterSessionId: "parent-1",
task: "Silent task",
announce: "silent",
});
await flushQueue();
const calls = spy.mock.calls.filter(
([reqId]) => reqId === "parent-1",
);
// Immediate and silent should never be in the same announce call
const mixedCalls = calls.filter(([, records]) => {
const hasImm = records.some((r: { announce?: string }) => r.announce !== "silent");
const hasSilent = records.some((r: { announce?: string }) => r.announce === "silent");
return hasImm && hasSilent;
});
expect(mixedCalls).toHaveLength(0);
// Both should be announced (in separate calls)
expect(getSubagentRun("run-imm")?.announced).toBe(true);
expect(getSubagentRun("run-s1")?.announced).toBe(true);
spy.mockRestore();
});
});
describe("subagent registry — post-announce cleanup", () => {
it("keeps runs in registry after successful announcement with archiveAtMs", async () => {
// Mock runCoalescedAnnounceFlow to succeed
const announceModule = await import("./announce.js");
const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true);
// Register two runs for the same parent — both end immediately (no Hub)
registerSubagentRun({
runId: "run-a",
childSessionId: "child-a",
requesterSessionId: "parent-1",
task: "Task A",
});
registerSubagentRun({
runId: "run-b",
childSessionId: "child-b",
requesterSessionId: "parent-1",
task: "Task B",
});
await flushQueue();
// Both runs should have been announced but kept in registry with archiveAtMs
expect(spy).toHaveBeenCalled();
const runA = getSubagentRun("run-a");
const runB = getSubagentRun("run-b");
expect(runA).toBeDefined();
expect(runB).toBeDefined();
expect(runA!.announced).toBe(true);
expect(runB!.announced).toBe(true);
expect(runA!.archiveAtMs).toBeGreaterThan(Date.now());
expect(runB!.archiveAtMs).toBeGreaterThan(Date.now());
// Records are still queryable
expect(listSubagentRuns("parent-1")).toHaveLength(2);
spy.mockRestore();
});
});

View file

@ -1,524 +0,0 @@
/**
* Subagent registry in-memory tracking + lifecycle management.
*
* Tracks all active subagent runs, persists state to disk,
* watches for child completion, and triggers announce flow.
*/
import { getHub, isHubInitialized } from "../../hub/hub-singleton.js";
import { loadSubagentRuns, saveSubagentRuns, loadSubagentGroups } from "./registry-store.js";
import { readLatestAssistantReply, runCoalescedAnnounceFlow } from "./announce.js";
import type {
RegisterSubagentRunParams,
SubagentRunRecord,
SubagentGroup,
} from "./types.js";
import { resolveSessionDir } from "../session/storage.js";
import { rmSync } from "node:fs";
import { enqueueInLane, setLaneConcurrency } from "./command-queue.js";
import { SubagentLane, DEFAULT_SUBAGENT_MAX_CONCURRENT, resolveSubagentTimeoutMs } from "./lanes.js";
/** Default archive retention: 60 minutes after completion */
const DEFAULT_ARCHIVE_AFTER_MS = 60 * 60 * 1000;
/** Archive sweep interval: 60 seconds */
const SWEEP_INTERVAL_MS = 60 * 1000;
// ============================================================================
// Module-level state
// ============================================================================
const subagentRuns = new Map<string, SubagentRunRecord>();
const subagentGroups = new Map<string, SubagentGroup>();
let sweepTimer: ReturnType<typeof setInterval> | undefined;
const resumedRequesters = new Set<string>();
// ============================================================================
// Public API
// ============================================================================
/** Initialize registry from persisted state. Call once at startup. */
export function initSubagentRegistry(): void {
setLaneConcurrency(SubagentLane.Subagent, DEFAULT_SUBAGENT_MAX_CONCURRENT);
const persisted = loadSubagentRuns();
for (const [runId, record] of persisted) {
subagentRuns.set(runId, record);
// Backward compat: old records with cleanupHandled but no announced field
if (record.cleanupHandled && record.announced === undefined) {
record.announced = true;
record.findingsCaptured = true;
}
}
// Restore groups
const persistedGroups = loadSubagentGroups();
for (const [groupId, group] of persistedGroups) {
subagentGroups.set(groupId, group);
}
// Process incomplete runs
const affectedRequesters = new Set<string>();
for (const record of subagentRuns.values()) {
if (record.announced && record.cleanupHandled) continue; // Already fully done
if (!record.endedAt) {
// Child was running when process crashed — mark as ended/unknown
record.endedAt = Date.now();
record.outcome = { status: "unknown" };
}
if (!record.findingsCaptured) {
captureFindings(record);
}
// Recovery cleanup must be independent from findings capture:
// the process may crash after captureFindings() persisted but before deletion.
if (record.cleanup === "delete" && !record.cleanupHandled) {
deleteChildSession(record.childSessionId);
}
affectedRequesters.add(record.requesterSessionId);
}
persist();
// For each affected requester, check if coalesced announcement is needed
for (const requesterId of affectedRequesters) {
if (!resumedRequesters.has(requesterId)) {
resumedRequesters.add(requesterId);
checkAndAnnounce(requesterId);
}
}
if (subagentRuns.size > 0) {
startSweeper();
console.log(`[SubagentRegistry] Loaded ${subagentRuns.size} persisted run(s)`);
}
}
// ============================================================================
// Group management
// ============================================================================
/** Create a new subagent group. Returns the group record. */
export function createSubagentGroup(params: {
groupId: string;
requesterSessionId: string;
label?: string;
next?: string;
}): SubagentGroup {
const group: SubagentGroup = {
groupId: params.groupId,
requesterSessionId: params.requesterSessionId,
label: params.label,
next: params.next,
createdAt: Date.now(),
};
subagentGroups.set(params.groupId, group);
persist();
return group;
}
/** Get a group by ID. */
export function getSubagentGroup(groupId: string): SubagentGroup | undefined {
return subagentGroups.get(groupId);
}
/** List all runs belonging to a group. */
export function listGroupRuns(groupId: string): SubagentRunRecord[] {
const result: SubagentRunRecord[] = [];
for (const record of subagentRuns.values()) {
if (record.groupId === groupId) {
result.push(record);
}
}
return result;
}
/** Register a new subagent run and start tracking its lifecycle. */
export function registerSubagentRun(params: RegisterSubagentRunParams): SubagentRunRecord {
const {
runId,
childSessionId,
requesterSessionId,
task,
label,
cleanup = "delete",
timeoutSeconds,
announce,
groupId,
start,
} = params;
const record: SubagentRunRecord = {
runId,
childSessionId,
requesterSessionId,
task,
label,
cleanup,
announce,
groupId,
createdAt: Date.now(),
};
subagentRuns.set(runId, record);
persist();
startSweeper();
// Enqueue in the subagent lane — the start callback and watchChildAgent
// only execute once a concurrency slot is available.
void enqueueInLane(SubagentLane.Subagent, async () => {
console.log(`[SubagentRegistry] Lane slot acquired for ${runId}, calling start()`);
start?.();
console.log(`[SubagentRegistry] start() returned, entering watchChildAgent`);
return watchChildAgent(record, timeoutSeconds);
});
return record;
}
/** List all active runs for a given requester session. */
export function listSubagentRuns(requesterSessionId: string): SubagentRunRecord[] {
const result: SubagentRunRecord[] = [];
for (const record of subagentRuns.values()) {
if (record.requesterSessionId === requesterSessionId) {
result.push(record);
}
}
return result;
}
/** Remove a run from the registry. */
export function releaseSubagentRun(runId: string): boolean {
const deleted = subagentRuns.delete(runId);
if (deleted) {
persist();
if (subagentRuns.size === 0) {
stopSweeper();
}
}
return deleted;
}
/** Get a run by ID. */
export function getSubagentRun(runId: string): SubagentRunRecord | undefined {
return subagentRuns.get(runId);
}
/** Mark all active (non-ended) runs as ended with "unknown" status. Called during Hub shutdown. */
export function shutdownSubagentRegistry(): void {
const now = Date.now();
let updated = 0;
for (const record of subagentRuns.values()) {
if (!record.endedAt) {
record.endedAt = now;
record.outcome = { status: "unknown" };
updated++;
}
// Opportunistically capture findings for ended-but-uncaptured runs
if (record.endedAt && !record.findingsCaptured) {
captureFindings(record);
updated++;
}
}
if (updated > 0) {
persist();
console.log(`[SubagentRegistry] Processed ${updated} run(s) during shutdown`);
}
stopSweeper();
}
/** Reset all state (for testing). */
export function resetSubagentRegistryForTests(): void {
subagentRuns.clear();
subagentGroups.clear();
resumedRequesters.clear();
stopSweeper();
}
/** Seed a run record directly (for testing). Bypasses persistence and side effects. */
export function seedSubagentRunForTests(record: SubagentRunRecord): void {
subagentRuns.set(record.runId, record);
}
// ============================================================================
// Lifecycle watching
// ============================================================================
/**
* Watch a child agent for completion.
* Returns a promise that resolves when the child finishes (or errors/times out),
* keeping the command-queue lane slot occupied until then.
*/
function watchChildAgent(record: SubagentRunRecord, timeoutSeconds?: number): Promise<void> {
const { childSessionId } = record;
// Mark as started
record.startedAt = Date.now();
persist();
const timeoutMs = resolveSubagentTimeoutMs(timeoutSeconds);
return new Promise<void>((resolveSlot) => {
const cleanup = (outcome: { status: "ok" | "error" | "timeout" | "unknown"; error?: string | undefined }) => {
if (record.endedAt) return; // Already finalized
if (timeoutTimer) clearTimeout(timeoutTimer);
record.endedAt = Date.now();
record.outcome = outcome;
persist();
handleRunCompletion(record);
resolveSlot(); // Release the queue slot
};
// Always set a timeout (default 30 min, 0 = ~24 days via resolveSubagentTimeoutMs)
const timeoutTimer = setTimeout(() => {
cleanup({ status: "timeout" });
// Try to close the child agent
try {
const hub = getHub();
hub.closeAgent(childSessionId);
} catch {
// Hub may not be available
}
}, timeoutMs);
// Get child agent reference (Hub may not be available in tests)
if (!isHubInitialized()) {
cleanup({ status: "error", error: "Hub not initialized" });
return;
}
const hub = getHub();
const childAgent = hub.getAgent(childSessionId);
if (!childAgent) {
cleanup({ status: "error", error: "Child agent not found" });
return;
}
// Wait for the child agent's task queue to drain (task completion),
// then trigger announce flow. Uses waitForIdle() instead of consuming
// the stream (which would conflict with Hub.consumeAgent).
console.log(`[SubagentRegistry] waitForIdle() called for child ${childSessionId}, pendingWrites=${childAgent.getPendingWrites()}`);
childAgent.waitForIdle().then(
() => {
const runtime = Date.now() - (record.startedAt ?? 0);
const runError = childAgent.lastRunError;
if (runError) {
console.log(`[SubagentRegistry] waitForIdle() resolved for child ${childSessionId} with error (runtime: ${runtime}ms): ${runError}`);
cleanup({ status: "error", error: runError });
} else {
console.log(`[SubagentRegistry] waitForIdle() resolved OK for child ${childSessionId} (runtime: ${runtime}ms)`);
cleanup({ status: "ok" });
}
},
(err) => {
console.error(`[SubagentRegistry] waitForIdle() rejected for child ${childSessionId}:`, err);
cleanup({
status: "error",
error: err instanceof Error ? err.message : String(err),
});
},
);
// Also handle explicit close (e.g., timeout kill, Hub shutdown)
childAgent.onClose(() => {
cleanup({ status: record.outcome?.status ?? "unknown" });
});
});
}
// ============================================================================
// Cleanup + Announce (two-phase: capture findings, then coalesced announce)
// ============================================================================
/** Phase 1: Capture child's findings before session deletion. */
function captureFindings(record: SubagentRunRecord): void {
try {
const findings = readLatestAssistantReply(record.childSessionId);
record.findings = findings ?? undefined;
} catch {
record.findings = "(failed to read findings)";
}
record.findingsCaptured = true;
persist();
}
/**
* Phase 2: Announce completed-but-unannounced runs.
*
* Three announcement paths:
* 1. Grouped runs wait for all runs in the group to complete, then announce
* together with the group's `next` continuation prompt (if any).
* 2. Ungrouped silent runs legacy behavior: wait for ALL silent runs from
* the same requester to complete, then announce together.
* 3. Ungrouped immediate runs announce per-completion (default).
*/
function checkAndAnnounce(requesterSessionId: string): void {
const allRuns = listSubagentRuns(requesterSessionId);
// ── 1. Grouped runs: announce by group when all members complete ──
const groupIds = new Set<string>();
for (const r of allRuns) {
if (r.groupId && !r.announced) groupIds.add(r.groupId);
}
for (const groupId of groupIds) {
const groupRuns = allRuns.filter(r => r.groupId === groupId);
const unannounced = groupRuns.filter(r => !r.announced);
const ready = unannounced.filter(r => r.endedAt !== undefined && r.findingsCaptured);
if (ready.length > 0 && ready.length === unannounced.length) {
const group = subagentGroups.get(groupId);
announceRuns(requesterSessionId, ready, group?.next);
}
}
// ── 2. Ungrouped runs: original immediate/silent logic ──
const ungrouped = allRuns.filter(r => !r.groupId);
// Immediate: announce per-completion
const immediateReady = ungrouped.filter(
r => !r.announced && r.endedAt !== undefined && r.findingsCaptured && r.announce !== "silent",
);
if (immediateReady.length > 0) {
announceRuns(requesterSessionId, immediateReady);
}
// Silent: announce only when ALL ungrouped silent runs are done
const silentRuns = ungrouped.filter(r => r.announce === "silent");
const unannouncedSilent = silentRuns.filter(r => !r.announced);
const silentReady = unannouncedSilent.filter(
r => r.endedAt !== undefined && r.findingsCaptured,
);
if (silentReady.length > 0 && silentReady.length === unannouncedSilent.length) {
announceRuns(requesterSessionId, silentReady);
}
}
/** Announce a batch of completed runs and mark them as announced. */
function announceRuns(requesterSessionId: string, runs: SubagentRunRecord[], next?: string): void {
const announced = runCoalescedAnnounceFlow(requesterSessionId, runs, next);
if (announced) {
for (const r of runs) {
r.announced = true;
r.cleanupHandled = true;
// Keep records for querying via sessions_list; let sweeper archive later
r.archiveAtMs = Date.now() + DEFAULT_ARCHIVE_AFTER_MS;
}
persist();
} else {
// Allow retry — mark cleanupHandled false so initSubagentRegistry() retries
for (const r of runs) {
r.cleanupHandled = false;
}
persist();
console.warn(
`[SubagentRegistry] Announce failed for requester ${requesterSessionId}`,
);
}
}
/** Entry point: called when a child completes. */
function handleRunCompletion(record: SubagentRunRecord): void {
// Phase 1: capture findings (before session deletion)
if (!record.findingsCaptured) {
captureFindings(record);
// Session cleanup (safe now that findings are persisted)
if (record.cleanup === "delete") {
deleteChildSession(record.childSessionId);
}
}
// Phase 2: coalesced announce check
checkAndAnnounce(record.requesterSessionId);
}
function deleteChildSession(sessionId: string): void {
try {
const sessionDir = resolveSessionDir(sessionId);
rmSync(sessionDir, { recursive: true, force: true });
console.log(`[SubagentRegistry] Deleted child session: ${sessionId}`);
} catch (err) {
console.warn(`[SubagentRegistry] Failed to delete child session ${sessionId}:`, err);
}
// Also close the agent in Hub
try {
const hub = getHub();
hub.closeAgent(sessionId);
} catch {
// Hub may not be available
}
}
// ============================================================================
// Archive sweeper
// ============================================================================
function startSweeper(): void {
if (sweepTimer) return;
sweepTimer = setInterval(sweep, SWEEP_INTERVAL_MS);
// Don't prevent process exit
if (sweepTimer.unref) sweepTimer.unref();
}
function stopSweeper(): void {
if (sweepTimer) {
clearInterval(sweepTimer);
sweepTimer = undefined;
}
}
function sweep(): void {
const now = Date.now();
let removed = 0;
for (const [runId, record] of subagentRuns) {
if (record.archiveAtMs !== undefined && record.archiveAtMs <= now) {
subagentRuns.delete(runId);
removed++;
}
}
// Clean up groups whose runs have all been archived
for (const [groupId] of subagentGroups) {
const hasActiveRuns = [...subagentRuns.values()].some(r => r.groupId === groupId);
if (!hasActiveRuns) {
subagentGroups.delete(groupId);
removed++;
}
}
if (removed > 0) {
persist();
console.log(`[SubagentRegistry] Archived ${removed} completed run(s)/group(s)`);
}
if (subagentRuns.size === 0) {
stopSweeper();
}
}
// ============================================================================
// Persistence helper
// ============================================================================
function persist(): void {
try {
saveSubagentRuns(subagentRuns, subagentGroups);
} catch (err) {
console.error(`[SubagentRegistry] Failed to persist runs:`, err);
}
}

View file

@ -1,118 +0,0 @@
/**
* Subagent orchestration types.
*
* Models the lifecycle of spawned child agents:
* created started ended cleanup
*/
/** Final outcome of a subagent run */
export type SubagentRunOutcome = {
status: "ok" | "error" | "timeout" | "unknown";
error?: string | undefined;
};
/**
* A logical group of subagent runs that are tracked together.
* Groups enable "collect all, then act" workflows:
* all runs in a group must complete before the combined results
* (plus an optional `next` continuation) are announced to the parent.
*/
export type SubagentGroup = {
/** Unique group identifier (UUIDv7) */
groupId: string;
/** Session ID of the parent (requester) agent */
requesterSessionId: string;
/** Optional human-readable label for the group */
label?: string | undefined;
/** Continuation prompt executed after all runs in the group complete.
* Injected into the announcement so the parent agent acts on the combined findings. */
next?: string | undefined;
/** Timestamp when the group was created */
createdAt: number;
};
/** Persistent record tracking a single subagent run */
export type SubagentRunRecord = {
/** Unique run identifier (UUIDv7) */
runId: string;
/** Session ID of the child agent */
childSessionId: string;
/** Session ID of the parent (requester) agent */
requesterSessionId: string;
/** The task description / prompt given to the child */
task: string;
/** Optional human-readable label */
label?: string | undefined;
/** Session cleanup strategy after completion */
cleanup: "delete" | "keep";
/** Timestamp when the run was created */
createdAt: number;
/** Timestamp when the child agent started execution */
startedAt?: number | undefined;
/** Timestamp when the child agent finished */
endedAt?: number | undefined;
/** Final status of the run */
outcome?: SubagentRunOutcome | undefined;
/** Scheduled auto-archive time (ms since epoch) */
archiveAtMs?: number | undefined;
/** Whether the cleanup/announce flow has been initiated */
cleanupHandled?: boolean | undefined;
/** Timestamp when cleanup completed */
cleanupCompletedAt?: number | undefined;
/** Captured findings from the child session's last assistant reply */
findings?: string | undefined;
/** Whether findings have been captured (safe to delete session after this) */
findingsCaptured?: boolean | undefined;
/** Whether the coalesced announcement has been sent to parent */
announced?: boolean | undefined;
/** Announcement mode: "immediate" (default) announces per-completion,
* "silent" defers until all silent runs from the same requester complete. */
announce?: "immediate" | "silent" | undefined;
/** Group ID this run belongs to (if any). Runs in a group are announced
* together when all complete, regardless of the `announce` field. */
groupId?: string | undefined;
};
/** Parameters for registering a new subagent run */
export type RegisterSubagentRunParams = {
runId: string;
childSessionId: string;
requesterSessionId: string;
task: string;
label?: string | undefined;
cleanup?: "delete" | "keep" | undefined;
timeoutSeconds?: number | undefined;
/** Callback invoked when the queue slot is acquired (used to defer childAgent.write). */
start?: (() => void) | undefined;
/** Announcement mode: "immediate" (default) or "silent" (defer until all silent runs complete). */
announce?: "immediate" | "silent" | undefined;
/** Group ID to join. Runs in a group are announced together when all complete. */
groupId?: string | undefined;
/** Continuation prompt for the group. Only used on group creation (first spawn).
* After all runs in the group complete, this prompt is included in the announcement
* so the parent agent can act on the combined findings (e.g. summarize, write PDF). */
next?: string | undefined;
};
/** Parameters for the announce flow */
export type SubagentAnnounceParams = {
runId: string;
childSessionId: string;
requesterSessionId: string;
task: string;
label?: string | undefined;
cleanup: "delete" | "keep";
outcome?: SubagentRunOutcome | undefined;
startedAt?: number | undefined;
endedAt?: number | undefined;
};
/** Parameters for building the subagent system prompt */
export type SubagentSystemPromptParams = {
requesterSessionId: string;
childSessionId: string;
label?: string | undefined;
task: string;
/** Tool names available to the subagent (for tooling summary in system prompt) */
tools?: string[] | undefined;
};

View file

@ -1,8 +1,8 @@
/**
* Global Hub singleton for cross-module access.
*
* Used by subagent tools and announce flow to interact with the Hub
* without threading references through the entire call chain.
* Used by modules like cron execution without threading Hub references
* through the entire call chain.
*/
import type { Hub } from "./hub.js";

View file

@ -17,7 +17,6 @@ import { AsyncAgent } from "../agent/async-agent.js";
import type { AgentOptions } from "../agent/types.js";
import { getHubId } from "./hub-identity.js";
import { setHub } from "./hub-singleton.js";
import { initSubagentRegistry, shutdownSubagentRegistry } from "../agent/subagent/index.js";
import { loadAgentRecords, addAgentRecord, removeAgentRecord } from "./agent-store.js";
import { RpcDispatcher, RpcError } from "./rpc/dispatcher.js";
import { createGetAgentMessagesHandler } from "./rpc/handlers/get-agent-messages.js";
@ -144,12 +143,9 @@ export class Hub {
});
this.rpc.register("resolveExecApproval", createResolveExecApprovalHandler(this.approvalManager));
// Register as global singleton for cross-module access (subagent tools, announce flow)
// Register as global singleton for cross-module access.
setHub(this);
// Restore subagent registry from persistent state
initSubagentRegistry();
// Initialize and start cron service
this.initCronService();
this.initHeartbeatService();
@ -800,9 +796,6 @@ export class Hub {
this.heartbeatUnsubscribe = null;
this.heartbeatListeners.clear();
// Finalize subagent registry before closing agents
shutdownSubagentRegistry();
for (const [id, agent] of this.agents) {
agent.close();
this.agents.delete(id);