From 909efb5dabb32e593aa5907734d15cf78c789b33 Mon Sep 17 00:00:00 2001 From: Jiayuan Zhang Date: Tue, 17 Feb 2026 00:07:15 +0800 Subject: [PATCH 1/2] refactor(core): remove legacy subagent registry subsystem --- packages/core/src/agent/index.ts | 12 - packages/core/src/agent/subagent/README.md | 172 ------ .../agent/subagent/announce-findings.test.ts | 79 --- .../src/agent/subagent/announce-queue.test.ts | 203 ------- .../core/src/agent/subagent/announce-queue.ts | 315 ----------- .../core/src/agent/subagent/announce.test.ts | 294 ---------- packages/core/src/agent/subagent/announce.ts | 424 -------------- .../src/agent/subagent/command-queue.test.ts | 117 ---- .../core/src/agent/subagent/command-queue.ts | 158 ------ packages/core/src/agent/subagent/index.ts | 40 -- packages/core/src/agent/subagent/lanes.ts | 37 -- .../agent/subagent/registry-recovery.test.ts | 76 --- .../src/agent/subagent/registry-store.test.ts | 127 ----- .../core/src/agent/subagent/registry-store.ts | 80 --- .../core/src/agent/subagent/registry.test.ts | 405 -------------- packages/core/src/agent/subagent/registry.ts | 524 ------------------ packages/core/src/agent/subagent/types.ts | 118 ---- packages/core/src/hub/hub-singleton.ts | 4 +- packages/core/src/hub/hub.ts | 9 +- 19 files changed, 3 insertions(+), 3191 deletions(-) delete mode 100644 packages/core/src/agent/subagent/README.md delete mode 100644 packages/core/src/agent/subagent/announce-findings.test.ts delete mode 100644 packages/core/src/agent/subagent/announce-queue.test.ts delete mode 100644 packages/core/src/agent/subagent/announce-queue.ts delete mode 100644 packages/core/src/agent/subagent/announce.test.ts delete mode 100644 packages/core/src/agent/subagent/announce.ts delete mode 100644 packages/core/src/agent/subagent/command-queue.test.ts delete mode 100644 packages/core/src/agent/subagent/command-queue.ts delete mode 100644 packages/core/src/agent/subagent/index.ts delete mode 100644 packages/core/src/agent/subagent/lanes.ts delete mode 100644 packages/core/src/agent/subagent/registry-recovery.test.ts delete mode 100644 packages/core/src/agent/subagent/registry-store.test.ts delete mode 100644 packages/core/src/agent/subagent/registry-store.ts delete mode 100644 packages/core/src/agent/subagent/registry.test.ts delete mode 100644 packages/core/src/agent/subagent/registry.ts delete mode 100644 packages/core/src/agent/subagent/types.ts diff --git a/packages/core/src/agent/index.ts b/packages/core/src/agent/index.ts index 01488856..b391a2c2 100644 --- a/packages/core/src/agent/index.ts +++ b/packages/core/src/agent/index.ts @@ -13,18 +13,6 @@ export * from "./tools.js"; export * from "./tools/policy.js"; export * from "./tools/groups.js"; export * from "./extract-text.js"; -// @deprecated — Old subagent registry. Use `delegate` tool instead. -// Kept temporarily for desktop app compatibility. -export { - listSubagentRuns, - getSubagentRun, - getSubagentGroup, -} from "./subagent/registry.js"; -export type { - SubagentRunRecord, - SubagentRunOutcome, - SubagentGroup, -} from "./subagent/types.js"; export { readClaudeCliCredentials, diff --git a/packages/core/src/agent/subagent/README.md b/packages/core/src/agent/subagent/README.md deleted file mode 100644 index 9455cde9..00000000 --- a/packages/core/src/agent/subagent/README.md +++ /dev/null @@ -1,172 +0,0 @@ -# Subagent System - -The subagent system allows a parent agent to spawn isolated child agents that run tasks in parallel and report results back automatically. - -## Architecture Overview - -``` -┌─────────────────────────────────────────────────────────────────────┐ -│ Parent Agent (runner.ts) │ -│ │ -│ tools: sessions_spawn, sessions_list │ -│ state: resolvedProvider, toolsOptions │ -└──────────┬──────────────────────────────────────────────────────────┘ - │ - │ sessions_spawn(task, label, timeoutSeconds) - ▼ -┌─────────────────────────────────────────────────────────────────────┐ -│ Spawn Flow (sessions-spawn.ts) │ -│ │ -│ 1. Build subagent system prompt (announce.ts) │ -│ 2. hub.createSubagent(childSessionId, { provider, model }) │ -│ 3. registerSubagentRun({ start: () => childAgent.write(task) }) │ -│ 4. Return { status: "accepted", runId, childSessionId } │ -└──────────┬──────────────────────────────────────────────────────────┘ - │ - ▼ -┌─────────────────────────────────────────────────────────────────────┐ -│ Concurrency Queue (command-queue.ts) │ -│ │ -│ Lane: "subagent" — max 10 concurrent (configurable) │ -│ Queued runs wait for a slot before start() is called │ -└──────────┬──────────────────────────────────────────────────────────┘ - │ slot acquired - ▼ -┌─────────────────────────────────────────────────────────────────────┐ -│ Child Agent Execution │ -│ │ -│ ┌───────────────────────────────────────────────────────────────┐ │ -│ │ AsyncAgent (async-agent.ts) │ │ -│ │ - Isolated session with restricted tools (isSubagent=true) │ │ -│ │ - Inherits parent's LLM provider │ │ -│ │ - System prompt: task focus + error reporting rules │ │ -│ │ - Tracks lastRunError for error propagation │ │ -│ └───────────────────────────────────────────────────────────────┘ │ -│ │ -│ ┌───────────────────────────────────────────────────────────────┐ │ -│ │ watchChildAgent (registry.ts) │ │ -│ │ - Sets startedAt, starts timeout timer │ │ -│ │ - waitForIdle() — waits for child's task queue to drain │ │ -│ │ - onClose() — handles explicit close (timeout kill, etc.) │ │ -│ └───────────────────────────────────────────────────────────────┘ │ -└──────────┬──────────────────────────────────────────────────────────┘ - │ - │ child completes / errors / times out - ▼ -┌─────────────────────────────────────────────────────────────────────┐ -│ Completion Handling (registry.ts) │ -│ │ -│ handleRunCompletion(record) │ -│ │ │ -│ ├─ Phase 1: captureFindings() │ -│ │ - Read last assistant reply from child session JSONL │ -│ │ - Falls back to last toolResult if no assistant text │ -│ │ - Persists findings to record before session deletion │ -│ │ │ -│ ├─ Session Cleanup │ -│ │ - cleanup="delete": rm child session dir + hub.closeAgent() │ -│ │ - cleanup="keep": preserve for audit │ -│ │ │ -│ └─ Phase 2: checkAndAnnounce(requesterSessionId) │ -│ - Finds all unannounced, completed runs with findings │ -│ - Calls runCoalescedAnnounceFlow() │ -│ - Marks records: announced=true, archiveAtMs=now+60min │ -└──────────┬──────────────────────────────────────────────────────────┘ - │ - ▼ -┌─────────────────────────────────────────────────────────────────────┐ -│ Announcement Delivery (announce.ts) │ -│ │ -│ runCoalescedAnnounceFlow(requesterSessionId, records) │ -│ │ │ -│ ├─ Format message: formatCoalescedAnnouncementMessage() │ -│ │ - Single record: task name, status, findings, stats │ -│ │ - Multiple records: combined report with all findings │ -│ │ │ -│ ├─ Two-tier delivery: │ -│ │ │ -│ │ Tier 1: BUSY (parent running or has pending writes) │ -│ │ └─ enqueueAnnounce() → announce-queue.ts │ -│ │ - Debounce 1s to batch nearby completions │ -│ │ - Drain via writeInternal() when parent finishes │ -│ │ │ -│ │ Tier 2: IDLE (parent not running) │ -│ │ └─ sendAnnounceDirect() │ -│ │ - writeInternal(msg, { forwardAssistant, persistResponse })│ -│ │ │ -│ └─ All delivery uses writeInternal() (marks as internal: true) │ -│ → Prevents announcement from showing as user bubble in UI │ -│ → LLM processes findings and responds naturally to user │ -└──────────┬──────────────────────────────────────────────────────────┘ - │ - ▼ -┌─────────────────────────────────────────────────────────────────────┐ -│ Record Lifecycle (registry.ts) │ -│ │ -│ created → startedAt → endedAt → findingsCaptured → announced │ -│ │ -│ After announcement: │ -│ - Record kept with archiveAtMs = now + 60 min │ -│ - sessions_list can still query records during this window │ -│ - Sweeper runs every 60s, removes expired records │ -│ - When all records removed, sweeper stops │ -└─────────────────────────────────────────────────────────────────────┘ -``` - -## Key Files - -| File | Purpose | -|------|---------| -| `sessions-spawn.ts` | Tool: spawns a child agent with task, label, timeout, provider | -| `sessions-list.ts` | Tool: lists subagent runs and their status | -| `registry.ts` | Lifecycle management: register, watch, capture, announce, archive | -| `announce.ts` | System prompt builder, findings reader, message formatter, delivery | -| `announce-queue.ts` | Debounced queue for batching announcements when parent is busy | -| `command-queue.ts` | Concurrency limiter for subagent lane slots | -| `lanes.ts` | Lane config: max concurrency (10), default timeout (1800s) | -| `types.ts` | Shared types: SubagentRunRecord, SubagentRunOutcome, etc. | -| `registry-store.ts` | Persistence: save/load runs to disk for crash recovery | - -## Provider Inheritance - -Subagents inherit the parent's resolved LLM provider: - -``` -runner.ts (resolvedProvider) - → toolsOptions.provider - → tools.ts (CreateToolsOptions.provider) - → sessions-spawn.ts (options.provider) - → hub.createSubagent({ provider }) -``` - -When the user switches providers via UI (`setProvider()`), `toolsOptions.provider` is updated in sync so future spawns use the new provider. - -## Error Propagation - -``` -Child tool error (e.g., API 401) - → Subagent LLM sees error, includes in final message (system prompt rule) - → captureFindings() reads final message - → Announcement includes error in findings - → Parent LLM sees error and can inform user - -Child run error (e.g., missing API key for provider) - → AsyncAgent._lastRunError set - → registry.ts checks childAgent.lastRunError after waitForIdle() - → outcome = { status: "error", error: "No API key configured..." } - → Announcement: "task failed: No API key configured..." -``` - -## Timeout Behavior - -Default: 1800s (30 min). System prompt guides the parent LLM: -- Simple tasks: 1800s (default) -- Moderate tasks: 1800-2400s (30-40 min) -- Complex tasks: 2400-3600s (40-60 min) - -On timeout: -1. Timeout timer fires in `watchChildAgent()` -2. `cleanup({ status: "timeout" })` is called -3. Child agent is closed via `hub.closeAgent()` -4. Findings are captured from whatever the child wrote so far -5. Announcement reports "timed out" with partial findings diff --git a/packages/core/src/agent/subagent/announce-findings.test.ts b/packages/core/src/agent/subagent/announce-findings.test.ts deleted file mode 100644 index 71f76448..00000000 --- a/packages/core/src/agent/subagent/announce-findings.test.ts +++ /dev/null @@ -1,79 +0,0 @@ -import { describe, it, expect, afterEach } from "vitest"; -import { mkdtempSync, rmSync } from "node:fs"; -import { join } from "node:path"; -import { tmpdir } from "node:os"; -import { writeEntries } from "../session/storage.js"; -import { readLatestAssistantReply } from "./announce.js"; -import type { SessionEntry } from "../session/types.js"; - -describe("readLatestAssistantReply", () => { - let testDir: string; - - afterEach(() => { - if (testDir) { - rmSync(testDir, { recursive: true, force: true }); - } - }); - - async function seedSession(sessionId: string, entries: SessionEntry[]) { - await writeEntries(sessionId, entries, { baseDir: testDir }); - } - - it("returns the latest non-empty assistant text when the last assistant message is tool-only", async () => { - testDir = mkdtempSync(join(tmpdir(), "announce-test-")); - const sessionId = "child-session-1"; - - await seedSession(sessionId, [ - { - type: "message", - timestamp: 1, - message: { - role: "assistant", - content: [{ type: "text", text: "南京天气:晴,12°C。" }], - }, - } as SessionEntry, - { - type: "message", - timestamp: 2, - message: { - role: "assistant", - content: [{ type: "toolCall", id: "tool-1", name: "weather", arguments: { city: "Nanjing" } }], - }, - } as SessionEntry, - ]); - - const result = readLatestAssistantReply(sessionId, { baseDir: testDir }); - expect(result).toBe("南京天气:晴,12°C。"); - }); - - it("falls back to latest toolResult text when no assistant text exists", async () => { - testDir = mkdtempSync(join(tmpdir(), "announce-test-")); - const sessionId = "child-session-2"; - - await seedSession(sessionId, [ - { - type: "message", - timestamp: 1, - message: { - role: "assistant", - content: [{ type: "toolCall", id: "tool-2", name: "weather", arguments: { city: "Nanjing" } }], - }, - } as SessionEntry, - { - type: "message", - timestamp: 2, - message: { - role: "toolResult", - toolCallId: "tool-2", - toolName: "weather", - content: [{ type: "text", text: "{\"city\":\"Nanjing\",\"tempC\":12,\"condition\":\"Sunny\"}" }], - isError: false, - }, - } as SessionEntry, - ]); - - const result = readLatestAssistantReply(sessionId, { baseDir: testDir }); - expect(result).toContain("\"city\":\"Nanjing\""); - expect(result).toContain("\"condition\":\"Sunny\""); - }); -}); diff --git a/packages/core/src/agent/subagent/announce-queue.test.ts b/packages/core/src/agent/subagent/announce-queue.test.ts deleted file mode 100644 index bff56972..00000000 --- a/packages/core/src/agent/subagent/announce-queue.test.ts +++ /dev/null @@ -1,203 +0,0 @@ -import { afterEach, describe, expect, it, vi } from "vitest"; -import { - enqueueAnnounce, - resetAnnounceQueuesForTests, - getAnnounceQueueDepth, - type AnnounceQueueItem, - type AnnounceQueueSettings, -} from "./announce-queue.js"; - -afterEach(() => { - resetAnnounceQueuesForTests(); -}); - -function makeItem(overrides?: Partial): AnnounceQueueItem { - return { - prompt: "test prompt", - summaryLine: "test summary", - enqueuedAt: Date.now(), - requesterSessionId: "session-1", - ...overrides, - }; -} - -const FAST_SETTINGS: AnnounceQueueSettings = { - mode: "followup", - debounceMs: 0, - cap: 20, - dropPolicy: "old", -}; - -describe("announce queue", () => { - it("enqueues an item and drains via send callback", async () => { - const sent: AnnounceQueueItem[] = []; - const send = async (item: AnnounceQueueItem) => { sent.push(item); }; - - enqueueAnnounce({ - key: "test", - item: makeItem(), - settings: FAST_SETTINGS, - send, - }); - - // Wait for async drain - await new Promise((r) => setTimeout(r, 50)); - - expect(sent).toHaveLength(1); - expect(sent[0]!.prompt).toBe("test prompt"); - }); - - it("batches items in collect mode", async () => { - const sent: AnnounceQueueItem[] = []; - const send = async (item: AnnounceQueueItem) => { sent.push(item); }; - - const collectSettings: AnnounceQueueSettings = { - mode: "collect", - debounceMs: 0, - cap: 20, - dropPolicy: "old", - }; - - enqueueAnnounce({ - key: "test", - item: makeItem({ prompt: "prompt 1" }), - settings: collectSettings, - send, - }); - enqueueAnnounce({ - key: "test", - item: makeItem({ prompt: "prompt 2" }), - settings: collectSettings, - send, - }); - enqueueAnnounce({ - key: "test", - item: makeItem({ prompt: "prompt 3" }), - settings: collectSettings, - send, - }); - - await new Promise((r) => setTimeout(r, 50)); - - // Collect mode batches all into one send - expect(sent).toHaveLength(1); - expect(sent[0]!.prompt).toContain("prompt 1"); - expect(sent[0]!.prompt).toContain("prompt 2"); - expect(sent[0]!.prompt).toContain("prompt 3"); - expect(sent[0]!.prompt).toContain("3 queued announce(s)"); - }); - - it("sends items individually in followup mode", async () => { - const sent: AnnounceQueueItem[] = []; - const send = async (item: AnnounceQueueItem) => { sent.push(item); }; - - enqueueAnnounce({ - key: "test", - item: makeItem({ prompt: "prompt A" }), - settings: FAST_SETTINGS, - send, - }); - enqueueAnnounce({ - key: "test", - item: makeItem({ prompt: "prompt B" }), - settings: FAST_SETTINGS, - send, - }); - - await new Promise((r) => setTimeout(r, 50)); - - expect(sent).toHaveLength(2); - expect(sent[0]!.prompt).toBe("prompt A"); - expect(sent[1]!.prompt).toBe("prompt B"); - }); - - it("respects cap with 'new' drop policy (rejects new items)", async () => { - const sent: AnnounceQueueItem[] = []; - const send = async (item: AnnounceQueueItem) => { - // Slow send to keep items in queue - await new Promise((r) => setTimeout(r, 200)); - sent.push(item); - }; - - const cappedSettings: AnnounceQueueSettings = { - mode: "followup", - debounceMs: 0, - cap: 2, - dropPolicy: "new", - }; - - const r1 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "1" }), settings: cappedSettings, send }); - const r2 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "2" }), settings: cappedSettings, send }); - const r3 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "3" }), settings: cappedSettings, send }); - - expect(r1).toBe(true); - expect(r2).toBe(true); - expect(r3).toBe(false); // Rejected — cap reached - }); - - it("respects cap with 'old' drop policy (drops oldest)", async () => { - const sent: AnnounceQueueItem[] = []; - const send = async (item: AnnounceQueueItem) => { - await new Promise((r) => setTimeout(r, 200)); - sent.push(item); - }; - - const cappedSettings: AnnounceQueueSettings = { - mode: "followup", - debounceMs: 0, - cap: 2, - dropPolicy: "old", - }; - - enqueueAnnounce({ key: "test", item: makeItem({ prompt: "1" }), settings: cappedSettings, send }); - enqueueAnnounce({ key: "test", item: makeItem({ prompt: "2" }), settings: cappedSettings, send }); - enqueueAnnounce({ key: "test", item: makeItem({ prompt: "3" }), settings: cappedSettings, send }); - - // Queue should have items 2 and 3 (oldest was dropped) - expect(getAnnounceQueueDepth("test")).toBeLessThanOrEqual(2); - }); - - it("cleans up queue after drain completes", async () => { - const sent: AnnounceQueueItem[] = []; - const send = async (item: AnnounceQueueItem) => { sent.push(item); }; - - enqueueAnnounce({ - key: "test", - item: makeItem(), - settings: FAST_SETTINGS, - send, - }); - - await new Promise((r) => setTimeout(r, 50)); - - expect(sent).toHaveLength(1); - expect(getAnnounceQueueDepth("test")).toBe(0); - }); - - it("debounces before draining", async () => { - const sent: AnnounceQueueItem[] = []; - const send = async (item: AnnounceQueueItem) => { sent.push(item); }; - - const debouncedSettings: AnnounceQueueSettings = { - mode: "followup", - debounceMs: 100, - cap: 20, - dropPolicy: "old", - }; - - enqueueAnnounce({ - key: "test", - item: makeItem(), - settings: debouncedSettings, - send, - }); - - // Should not have sent yet (debounce) - await new Promise((r) => setTimeout(r, 30)); - expect(sent).toHaveLength(0); - - // Wait for debounce to complete - await new Promise((r) => setTimeout(r, 150)); - expect(sent).toHaveLength(1); - }); -}); diff --git a/packages/core/src/agent/subagent/announce-queue.ts b/packages/core/src/agent/subagent/announce-queue.ts deleted file mode 100644 index 8e88a346..00000000 --- a/packages/core/src/agent/subagent/announce-queue.ts +++ /dev/null @@ -1,315 +0,0 @@ -/** - * Announce queue for subagent result delivery. - * - * Handles queuing and batching of subagent announcements when the parent - * agent is busy. Supports debounce, cap, drop policy, and collect mode. - * - * Ported from OpenClaw (MIT license), adapted for Super Multica. - */ - -// ============================================================================ -// Types -// ============================================================================ - -export type AnnounceQueueMode = - /** Try steer, no queue fallback */ - | "steer" - /** Try steer, fall back to queue */ - | "steer-backlog" - /** Queue and send items individually */ - | "followup" - /** Queue and batch all items into one combined prompt */ - | "collect"; - -export type AnnounceDropPolicy = - /** Drop oldest items when cap reached */ - | "old" - /** Drop newest items when cap reached */ - | "new" - /** Summarize dropped items */ - | "summarize"; - -export type AnnounceQueueItem = { - prompt: string; - summaryLine?: string; - enqueuedAt: number; - requesterSessionId: string; -}; - -export type AnnounceQueueSettings = { - mode: AnnounceQueueMode; - debounceMs?: number; - cap?: number; - dropPolicy?: AnnounceDropPolicy; -}; - -type AnnounceQueueState = { - items: AnnounceQueueItem[]; - draining: boolean; - lastEnqueuedAt: number; - mode: AnnounceQueueMode; - debounceMs: number; - cap: number; - dropPolicy: AnnounceDropPolicy; - droppedCount: number; - summaryLines: string[]; - send: (item: AnnounceQueueItem) => Promise; -}; - -// ============================================================================ -// Defaults -// ============================================================================ - -const DEFAULT_DEBOUNCE_MS = 1000; -const DEFAULT_CAP = 20; -const DEFAULT_DROP_POLICY: AnnounceDropPolicy = "summarize"; - -export const DEFAULT_ANNOUNCE_SETTINGS: AnnounceQueueSettings = { - mode: "steer-backlog", - debounceMs: DEFAULT_DEBOUNCE_MS, - cap: DEFAULT_CAP, - dropPolicy: DEFAULT_DROP_POLICY, -}; - -// ============================================================================ -// Module state -// ============================================================================ - -const ANNOUNCE_QUEUES = new Map(); - -// ============================================================================ -// Public API -// ============================================================================ - -/** - * Enqueue an announcement for delivery. Returns true if enqueued, - * false if dropped (cap + "new" drop policy). - */ -export function enqueueAnnounce(params: { - key: string; - item: AnnounceQueueItem; - settings: AnnounceQueueSettings; - send: (item: AnnounceQueueItem) => Promise; -}): boolean { - const queue = getOrCreateQueue(params.key, params.settings, params.send); - queue.lastEnqueuedAt = Date.now(); - - const shouldEnqueue = applyDropPolicy(queue, params.item); - if (!shouldEnqueue) { - if (queue.dropPolicy === "new") { - scheduleAnnounceDrain(params.key); - } - return false; - } - - queue.items.push(params.item); - scheduleAnnounceDrain(params.key); - return true; -} - -/** Reset all queues (for testing). */ -export function resetAnnounceQueuesForTests(): void { - ANNOUNCE_QUEUES.clear(); -} - -/** Get the current queue depth for a key (for testing/diagnostics). */ -export function getAnnounceQueueDepth(key: string): number { - return ANNOUNCE_QUEUES.get(key)?.items.length ?? 0; -} - -// ============================================================================ -// Queue management -// ============================================================================ - -function getOrCreateQueue( - key: string, - settings: AnnounceQueueSettings, - send: (item: AnnounceQueueItem) => Promise, -): AnnounceQueueState { - const existing = ANNOUNCE_QUEUES.get(key); - if (existing) { - existing.mode = settings.mode; - if (typeof settings.debounceMs === "number") { - existing.debounceMs = Math.max(0, settings.debounceMs); - } - if (typeof settings.cap === "number" && settings.cap > 0) { - existing.cap = Math.floor(settings.cap); - } - if (settings.dropPolicy) { - existing.dropPolicy = settings.dropPolicy; - } - existing.send = send; - return existing; - } - - const created: AnnounceQueueState = { - items: [], - draining: false, - lastEnqueuedAt: 0, - mode: settings.mode, - debounceMs: - typeof settings.debounceMs === "number" - ? Math.max(0, settings.debounceMs) - : DEFAULT_DEBOUNCE_MS, - cap: - typeof settings.cap === "number" && settings.cap > 0 - ? Math.floor(settings.cap) - : DEFAULT_CAP, - dropPolicy: settings.dropPolicy ?? DEFAULT_DROP_POLICY, - droppedCount: 0, - summaryLines: [], - send, - }; - ANNOUNCE_QUEUES.set(key, created); - return created; -} - -// ============================================================================ -// Drop policy -// ============================================================================ - -function applyDropPolicy( - queue: AnnounceQueueState, - item: AnnounceQueueItem, -): boolean { - if (queue.items.length < queue.cap) { - return true; - } - - switch (queue.dropPolicy) { - case "new": - // Reject the incoming item - return false; - - case "old": { - // Drop the oldest item to make room - const dropped = queue.items.shift(); - if (dropped) { - queue.droppedCount++; - const summary = dropped.summaryLine?.trim() || dropped.prompt.slice(0, 80); - queue.summaryLines.push(summary); - } - return true; - } - - case "summarize": { - // Drop the oldest item but keep a summary - const dropped = queue.items.shift(); - if (dropped) { - queue.droppedCount++; - const summary = dropped.summaryLine?.trim() || dropped.prompt.slice(0, 80); - queue.summaryLines.push(summary); - } - return true; - } - - default: - return true; - } -} - -// ============================================================================ -// Drain scheduling -// ============================================================================ - -function scheduleAnnounceDrain(key: string): void { - const queue = ANNOUNCE_QUEUES.get(key); - if (!queue || queue.draining) return; - - queue.draining = true; - void (async () => { - try { - while (queue.items.length > 0 || queue.droppedCount > 0) { - await waitForDebounce(queue); - - if (queue.mode === "collect") { - // Batch all items into one combined prompt - const items = queue.items.splice(0, queue.items.length); - const summary = buildDropSummary(queue); - const prompt = buildCollectPrompt(items, summary); - const last = items.at(-1); - if (!last) break; - await queue.send({ ...last, prompt }); - continue; - } - - // followup / steer-backlog: send items individually - const summary = buildDropSummary(queue); - if (summary) { - const next = queue.items.shift(); - if (!next) break; - await queue.send({ ...next, prompt: summary }); - continue; - } - - const next = queue.items.shift(); - if (!next) break; - await queue.send(next); - } - } catch (err) { - console.error(`[AnnounceQueue] Drain failed for ${key}: ${String(err)}`); - } finally { - queue.draining = false; - if (queue.items.length === 0 && queue.droppedCount === 0) { - ANNOUNCE_QUEUES.delete(key); - } else { - scheduleAnnounceDrain(key); - } - } - })(); -} - -// ============================================================================ -// Helpers -// ============================================================================ - -function waitForDebounce(queue: AnnounceQueueState): Promise { - const elapsed = Date.now() - queue.lastEnqueuedAt; - const remaining = Math.max(0, queue.debounceMs - elapsed); - if (remaining <= 0) return Promise.resolve(); - return new Promise((resolve) => setTimeout(resolve, remaining)); -} - -function buildDropSummary(queue: AnnounceQueueState): string | undefined { - if (queue.droppedCount === 0) return undefined; - - const parts: string[] = [ - `[${queue.droppedCount} earlier announce(s) were summarized due to queue backlog]`, - ]; - if (queue.summaryLines.length > 0) { - parts.push(""); - for (const line of queue.summaryLines) { - parts.push(`- ${line}`); - } - } - - // Reset counters - queue.droppedCount = 0; - queue.summaryLines = []; - - return parts.join("\n"); -} - -function buildCollectPrompt( - items: AnnounceQueueItem[], - dropSummary: string | undefined, -): string { - const parts: string[] = [ - `[${items.length} queued announce(s) while agent was busy]`, - "", - ]; - - for (let i = 0; i < items.length; i++) { - parts.push(`---`); - parts.push(`Queued #${i + 1}`); - parts.push(items[i]!.prompt); - parts.push(""); - } - - if (dropSummary) { - parts.push(dropSummary); - parts.push(""); - } - - return parts.join("\n"); -} diff --git a/packages/core/src/agent/subagent/announce.test.ts b/packages/core/src/agent/subagent/announce.test.ts deleted file mode 100644 index efba367b..00000000 --- a/packages/core/src/agent/subagent/announce.test.ts +++ /dev/null @@ -1,294 +0,0 @@ -import { describe, it, expect } from "vitest"; -import { buildSubagentSystemPrompt, formatAnnouncementMessage, formatCoalescedAnnouncementMessage } from "./announce.js"; -import type { FormatAnnouncementParams } from "./announce.js"; -import type { SubagentRunRecord } from "./types.js"; - -describe("buildSubagentSystemPrompt", () => { - it("includes task and session context", () => { - const prompt = buildSubagentSystemPrompt({ - requesterSessionId: "parent-123", - childSessionId: "child-456", - task: "Analyze the auth module for security issues", - }); - - expect(prompt).toContain("## Subagent Rules"); - expect(prompt).toContain("Analyze the auth module for security issues"); - expect(prompt).toContain("parent-123"); - expect(prompt).toContain("child-456"); - expect(prompt).toContain("Do NOT spawn nested subagents"); - expect(prompt).toContain("## Safety"); - }); - - it("includes label when provided", () => { - const prompt = buildSubagentSystemPrompt({ - requesterSessionId: "parent-123", - childSessionId: "child-456", - label: "Security Audit", - task: "Check for vulnerabilities", - }); - - expect(prompt).toContain('Label: "Security Audit"'); - }); - - it("omits label line when not provided", () => { - const prompt = buildSubagentSystemPrompt({ - requesterSessionId: "parent-123", - childSessionId: "child-456", - task: "Do something", - }); - - expect(prompt).not.toContain("Label:"); - }); -}); - -describe("formatAnnouncementMessage", () => { - const baseParams: FormatAnnouncementParams = { - runId: "run-1", - childSessionId: "child-456", - requesterSessionId: "parent-123", - task: "Analyze code", - label: "Code Analysis", - cleanup: "delete", - outcome: { status: "ok" }, - startedAt: 1000000, - endedAt: 1030000, - }; - - it("formats successful completion", () => { - const msg = formatAnnouncementMessage({ - ...baseParams, - findings: "Found 3 issues in the auth module.", - }); - - expect(msg).toContain('"Code Analysis" just completed successfully'); - expect(msg).toContain("Found 3 issues in the auth module."); - expect(msg).toContain("runtime 30s"); - expect(msg).toContain("session child-456"); - }); - - it("formats error outcome", () => { - const msg = formatAnnouncementMessage({ - ...baseParams, - outcome: { status: "error", error: "API key expired" }, - }); - - expect(msg).toContain("failed: API key expired"); - }); - - it("formats timeout outcome", () => { - const msg = formatAnnouncementMessage({ - ...baseParams, - outcome: { status: "timeout" }, - }); - - expect(msg).toContain("timed out"); - }); - - it("shows (no output) when findings is not provided", () => { - const msg = formatAnnouncementMessage(baseParams); - - expect(msg).toContain("(no output)"); - }); - - it("uses task text when label is not provided", () => { - const paramsNoLabel: FormatAnnouncementParams = { - ...baseParams, - label: undefined, - }; - const msg = formatAnnouncementMessage(paramsNoLabel); - - expect(msg).toContain('"Analyze code"'); - }); - - it("formats runtime for minutes", () => { - const msg = formatAnnouncementMessage({ - ...baseParams, - startedAt: 1000000, - endedAt: 1150000, // 150 seconds = 2m30s - }); - - expect(msg).toContain("runtime 2m30s"); - }); - - it("formats runtime for hours", () => { - const msg = formatAnnouncementMessage({ - ...baseParams, - startedAt: 1000000, - endedAt: 4600000, // 3600 seconds = 1h - }); - - expect(msg).toContain("runtime 1h"); - }); - - it("includes summarization instruction", () => { - const msg = formatAnnouncementMessage(baseParams); - - expect(msg).toContain("Summarize this naturally for the user"); - expect(msg).toContain("NO_REPLY"); - }); -}); - -describe("formatCoalescedAnnouncementMessage", () => { - function makeRecord(overrides: Partial = {}): SubagentRunRecord { - return { - runId: "run-1", - childSessionId: "child-1", - requesterSessionId: "parent-1", - task: "Default task", - cleanup: "delete", - createdAt: 1000000, - startedAt: 1000000, - endedAt: 1030000, - outcome: { status: "ok" }, - findings: "Some findings", - findingsCaptured: true, - announced: false, - ...overrides, - }; - } - - it("delegates to formatAnnouncementMessage for a single record", () => { - const record = makeRecord({ label: "Code Analysis" }); - const coalesced = formatCoalescedAnnouncementMessage([record]); - const direct = formatAnnouncementMessage({ - runId: record.runId, - childSessionId: record.childSessionId, - requesterSessionId: record.requesterSessionId, - task: record.task, - label: record.label, - cleanup: record.cleanup, - outcome: record.outcome, - startedAt: record.startedAt, - endedAt: record.endedAt, - findings: record.findings, - }); - - expect(coalesced).toBe(direct); - }); - - it("formats multiple records with all task findings and stats", () => { - const records = [ - makeRecord({ - runId: "run-1", - childSessionId: "child-1", - label: "Task A", - findings: "Found issue A", - startedAt: 1000000, - endedAt: 1030000, - }), - makeRecord({ - runId: "run-2", - childSessionId: "child-2", - label: "Task B", - findings: "Found issue B", - startedAt: 1000000, - endedAt: 1045000, // 45 seconds - }), - ]; - - const msg = formatCoalescedAnnouncementMessage(records); - - expect(msg).toContain("All 2 background task(s) have completed"); - expect(msg).toContain('Task 1: "Task A"'); - expect(msg).toContain("Found issue A"); - expect(msg).toContain('Task 2: "Task B"'); - expect(msg).toContain("Found issue B"); - expect(msg).toContain("Total wall time: 45s"); - expect(msg).toContain("2 succeeded, 0 failed"); - }); - - it("reports mixed outcomes correctly", () => { - const records = [ - makeRecord({ runId: "run-1", label: "OK Task", outcome: { status: "ok" } }), - makeRecord({ runId: "run-2", label: "Failed Task", outcome: { status: "error", error: "crash" } }), - makeRecord({ runId: "run-3", label: "Timeout Task", outcome: { status: "timeout" } }), - ]; - - const msg = formatCoalescedAnnouncementMessage(records); - - expect(msg).toContain("completed successfully"); - expect(msg).toContain("failed: crash"); - expect(msg).toContain("timed out"); - expect(msg).toContain("1 succeeded, 2 failed"); - }); - - it("shows (no output) for missing findings", () => { - const records = [ - makeRecord({ runId: "run-1", findings: undefined }), - makeRecord({ runId: "run-2", findings: "Has output" }), - ]; - - const msg = formatCoalescedAnnouncementMessage(records); - - expect(msg).toContain("(no output)"); - expect(msg).toContain("Has output"); - }); - - it("includes combined summary instruction for multi-record", () => { - const records = [ - makeRecord({ runId: "run-1" }), - makeRecord({ runId: "run-2" }), - ]; - - const msg = formatCoalescedAnnouncementMessage(records); - - expect(msg).toContain("MUST include findings from every task item above"); - expect(msg).toContain("NO_REPLY"); - }); - - it("includes raw findings for every task in coalesced payload", () => { - const records = [ - makeRecord({ runId: "run-1", label: "南京天气", findings: "南京:晴,12°C" }), - makeRecord({ runId: "run-2", label: "上海天气", findings: "上海:多云,9°C" }), - ]; - - const msg = formatCoalescedAnnouncementMessage(records); - - expect(msg).toContain("Raw findings from each task (MUST cover all items):"); - expect(msg).toContain("[1] 南京天气:"); - expect(msg).toContain("南京:晴,12°C"); - expect(msg).toContain("[2] 上海天气:"); - expect(msg).toContain("上海:多云,9°C"); - expect(msg).toContain("MUST include findings from every task item above"); - }); - - it("includes continuation prompt when next is provided", () => { - const records = [ - makeRecord({ runId: "run-1", label: "AAPL data", findings: "AAPL revenue: $100B" }), - makeRecord({ runId: "run-2", label: "MSFT data", findings: "MSFT revenue: $200B" }), - ]; - - const msg = formatCoalescedAnnouncementMessage(records, "Summarize all data and write a PDF investment report"); - - expect(msg).toContain("CONTINUATION TASK"); - expect(msg).toContain("Summarize all data and write a PDF investment report"); - expect(msg).toContain("AAPL revenue: $100B"); - expect(msg).toContain("MSFT revenue: $200B"); - // Should NOT contain the default summarize instruction - expect(msg).not.toContain("Summarize these results naturally for the user"); - }); - - it("uses continuation prompt even for single record when next is provided", () => { - const records = [ - makeRecord({ runId: "run-1", label: "Data collection", findings: "All data collected" }), - ]; - - const msg = formatCoalescedAnnouncementMessage(records, "Generate the final report"); - - expect(msg).toContain("CONTINUATION TASK"); - expect(msg).toContain("Generate the final report"); - expect(msg).toContain("All data collected"); - }); - - it("uses default summarize instruction when next is not provided", () => { - const records = [ - makeRecord({ runId: "run-1" }), - makeRecord({ runId: "run-2" }), - ]; - - const msg = formatCoalescedAnnouncementMessage(records); - - expect(msg).not.toContain("CONTINUATION TASK"); - expect(msg).toContain("Summarize these results naturally for the user"); - }); -}); diff --git a/packages/core/src/agent/subagent/announce.ts b/packages/core/src/agent/subagent/announce.ts deleted file mode 100644 index 3ab8eeeb..00000000 --- a/packages/core/src/agent/subagent/announce.ts +++ /dev/null @@ -1,424 +0,0 @@ -/** - * Subagent announcement flow. - * - * Handles result propagation from child → parent agent: - * - Builds system prompts for child agents - * - Reads child session output - * - Formats and delivers announcement messages - */ - -import { readEntries } from "../session/storage.js"; -import { getHub } from "../../hub/hub-singleton.js"; -import { buildSystemPrompt } from "../system-prompt/index.js"; -import type { - SubagentAnnounceParams, - SubagentRunOutcome, - SubagentRunRecord, - SubagentSystemPromptParams, -} from "./types.js"; -import { enqueueAnnounce, DEFAULT_ANNOUNCE_SETTINGS } from "./announce-queue.js"; - -/** - * Build the system prompt injected into a subagent session. - * Uses the structured prompt builder with "minimal" mode. - */ -export function buildSubagentSystemPrompt(params: SubagentSystemPromptParams): string { - return buildSystemPrompt({ - mode: "minimal", - subagent: { - requesterSessionId: params.requesterSessionId, - childSessionId: params.childSessionId, - label: params.label, - task: params.task, - }, - tools: params.tools, - }); -} - -/** - * Read the latest assistant reply from a session's JSONL file. - */ -export function readLatestAssistantReply( - sessionId: string, - options?: { baseDir?: string }, -): string | undefined { - const entries = readEntries(sessionId, options); - let latestToolResultText: string | undefined; - - // Walk backwards to find the last non-empty assistant reply. - // If no assistant text exists (e.g. run ended after tool execution), - // fall back to the latest non-empty toolResult content. - for (let i = entries.length - 1; i >= 0; i--) { - const entry = entries[i]!; - if (entry.type !== "message") continue; - - const message = entry.message; - if (message.role === "assistant") { - const text = extractAssistantText(message); - if (text) return text; - continue; - } - - if (message.role === "toolResult" && !latestToolResultText) { - const text = extractToolResultText(message); - if (text) latestToolResultText = text; - } - } - - return latestToolResultText; -} - -/** - * Extract text content from an assistant message. - * AgentMessage.content for assistant is (TextContent | ThinkingContent | ToolCall)[]. - */ -function extractAssistantText(message: { role: string; content: unknown }): string { - return extractTextLikeContent(message.content); -} - -/** - * Extract text content from a toolResult message. - */ -function extractToolResultText(message: { role: string; content: unknown }): string { - return extractTextLikeContent(message.content); -} - -function extractTextLikeContent(content: unknown): string { - if (typeof content === "string") { - return sanitizeText(content); - } - - if (!Array.isArray(content)) return ""; - - const textParts: string[] = []; - for (const block of content) { - if (!block || typeof block !== "object") continue; - if ("text" in block) { - textParts.push(String((block as { text: unknown }).text)); - } - } - - return sanitizeText(textParts.join("\n")); -} - -/** - * Strip thinking tags and tool markers from text. - */ -function sanitizeText(text: string): string { - return text - .replace(/[\s\S]*?<\/thinking>/g, "") - .replace(/[\s\S]*?<\/tool_call>/g, "") - .trim(); -} - -/** - * Format the duration between two timestamps as a human-readable string. - */ -function formatDuration(startMs: number, endMs: number): string { - const totalSeconds = Math.round((endMs - startMs) / 1000); - if (totalSeconds < 60) return `${totalSeconds}s`; - - const minutes = Math.floor(totalSeconds / 60); - const seconds = totalSeconds % 60; - if (minutes < 60) return seconds > 0 ? `${minutes}m${seconds}s` : `${minutes}m`; - - const hours = Math.floor(minutes / 60); - const remainingMinutes = minutes % 60; - return remainingMinutes > 0 ? `${hours}h${remainingMinutes}m` : `${hours}h`; -} - -/** - * Format a status label from an outcome. - */ -function formatStatusLabel(outcome: SubagentRunOutcome | undefined): string { - if (!outcome) return "completed with unknown status"; - switch (outcome.status) { - case "ok": - return "completed successfully"; - case "error": - return outcome.error ? `failed: ${outcome.error}` : "failed"; - case "timeout": - return "timed out"; - default: - return "completed with unknown status"; - } -} - -/** Parameters for formatAnnouncementMessage */ -export interface FormatAnnouncementParams { - runId: string; - childSessionId: string; - requesterSessionId: string; - task: string; - label?: string | undefined; - cleanup: "delete" | "keep"; - outcome?: SubagentRunOutcome | undefined; - startedAt?: number | undefined; - endedAt?: number | undefined; - findings?: string | undefined; -} - -/** - * Format the announcement message sent to the parent agent. - */ -export function formatAnnouncementMessage(params: FormatAnnouncementParams): string { - const { task, label, outcome, findings, startedAt, endedAt, childSessionId } = params; - const displayName = label || task.slice(0, 60); - const statusLabel = formatStatusLabel(outcome); - - const parts: string[] = [ - `A background task "${displayName}" just ${statusLabel}.`, - "", - "Findings:", - findings || "(no output)", - ]; - - // Stats line - const stats: string[] = []; - if (startedAt && endedAt) { - stats.push(`runtime ${formatDuration(startedAt, endedAt)}`); - } - stats.push(`session ${childSessionId}`); - - parts.push("", `Stats: ${stats.join(" • ")}`); - - parts.push( - "", - "Summarize this naturally for the user. Keep it brief (1-2 sentences).", - "Flow it into the conversation naturally.", - "Do not mention technical details like session IDs or that this was a background task.", - "You can respond with NO_REPLY if no announcement is needed (e.g., internal task with no user-facing result).", - ); - - return parts.join("\n"); -} - -/** - * Format a coalesced announcement message from multiple completed subagent runs. - * When only one record is provided, delegates to formatAnnouncementMessage. - * - * @param next — Optional continuation prompt from a SubagentGroup. When present, - * the parent agent is instructed to execute the continuation using the combined - * findings, rather than just summarizing. - */ -export function formatCoalescedAnnouncementMessage( - records: SubagentRunRecord[], - next?: string, -): string { - // Single record without continuation: delegate to existing format - if (records.length === 1 && !next) { - const r = records[0]!; - return formatAnnouncementMessage({ - runId: r.runId, - childSessionId: r.childSessionId, - requesterSessionId: r.requesterSessionId, - task: r.task, - label: r.label, - cleanup: r.cleanup, - outcome: r.outcome, - startedAt: r.startedAt, - endedAt: r.endedAt, - findings: r.findings, - }); - } - - // Multiple records (or single with continuation): build combined message. - const parts: string[] = [ - `All ${records.length} background task(s) have completed. Here are the combined results:`, - "", - ]; - - for (let i = 0; i < records.length; i++) { - const r = records[i]!; - const displayName = r.label || r.task.slice(0, 60); - const statusLabel = formatStatusLabel(r.outcome); - const durationStr = (r.startedAt && r.endedAt) - ? ` (${formatDuration(r.startedAt, r.endedAt)})` - : ""; - - parts.push( - `### Task ${i + 1}: "${displayName}"`, - `Status: ${statusLabel}${durationStr}`, - "", - "Findings:", - r.findings || "(no output)", - "", - ); - } - - // Overall stats - const allStartTimes = records.map(r => r.startedAt).filter(Boolean) as number[]; - const allEndTimes = records.map(r => r.endedAt).filter(Boolean) as number[]; - if (allStartTimes.length > 0 && allEndTimes.length > 0) { - const wallTime = formatDuration(Math.min(...allStartTimes), Math.max(...allEndTimes)); - parts.push(`Total wall time: ${wallTime}`); - } - - const okCount = records.filter(r => r.outcome?.status === "ok").length; - const failCount = records.length - okCount; - parts.push(`Results: ${okCount} succeeded, ${failCount} failed/timed out`); - - parts.push("", "Raw findings from each task (MUST cover all items):", ""); - for (let i = 0; i < records.length; i++) { - const r = records[i]!; - const displayName = r.label || r.task.slice(0, 60); - parts.push( - `[${i + 1}] ${displayName}:`, - r.findings || "(no output)", - "", - ); - } - - // Continuation vs. summarization - if (next) { - parts.push( - "", - "---", - "", - "CONTINUATION TASK: The user's original request requires further work using the findings above.", - "Execute the following task now, using ALL the collected data:", - "", - next, - "", - "Use the raw findings above as your data source. Call tools as needed to complete this task.", - "Do not mention technical details like session IDs or that these were background tasks.", - ); - } else { - parts.push( - "", - "Summarize these results naturally for the user.", - "You MUST include findings from every task item above, without omission.", - "Keep it concise, but preserve concrete findings from each task.", - "Do not mention technical details like session IDs or that these were background tasks.", - "You can respond with NO_REPLY if no announcement is needed.", - ); - } - - return parts.join("\n"); -} - -/** - * Run the coalesced announcement flow for all completed runs of a requester. - * Uses two-tier delivery: - * 1. Queue — if parent is busy (running or has pending writes), queue for - * later drain via writeInternal (with debounce to batch nearby completions) - * 2. Direct — if parent is idle, send immediately via writeInternal - * - * All delivery uses writeInternal() which marks the announcement prompt as - * `internal: true` (hidden from UI). The assistant's summary response is - * forwarded to the real-time stream (`forwardAssistant: true`) so the user - * sees the result, and persisted to JSONL for future session loads. - */ -export function runCoalescedAnnounceFlow( - requesterSessionId: string, - records: SubagentRunRecord[], - next?: string, -): boolean { - const message = formatCoalescedAnnouncementMessage(records, next); - - try { - const hub = getHub(); - const parentAgent = hub.getAgent(requesterSessionId); - if (!parentAgent || parentAgent.closed) { - console.warn( - `[SubagentAnnounce] Parent agent not found or closed: ${requesterSessionId}`, - ); - return false; - } - - // Tier 1: BUSY — parent is running or has pending writes - // Queue the announcement for delivery via writeInternal() after the parent - // finishes its current work. We do NOT use steer() (cancels unrelated tool - // calls) or followUp() (doesn't mark entries as internal, polluting the UI). - if (parentAgent.isRunning || parentAgent.getPendingWrites() > 0) { - enqueueAnnounce({ - key: requesterSessionId, - item: { - prompt: message, - summaryLine: `${records.length} subagent(s) completed`, - enqueuedAt: Date.now(), - requesterSessionId, - }, - settings: DEFAULT_ANNOUNCE_SETTINGS, - send: async (item) => sendAnnounceDirect(requesterSessionId, item.prompt), - }); - console.log(`[SubagentAnnounce] Queued announcement for parent: ${requesterSessionId}`); - return true; - } - - // Tier 2: IDLE — parent is idle, send directly via writeInternal - sendAnnounceDirect(requesterSessionId, message); - return true; - } catch (err) { - console.error(`[SubagentAnnounce] Failed to coalesced-announce to parent:`, err); - return false; - } -} - -/** - * Send announcement directly to parent via writeInternal. - * Used as Tier 3 (idle) and as the queue drain callback. - */ -function sendAnnounceDirect(requesterSessionId: string, message: string): void { - try { - const hub = getHub(); - const parentAgent = hub.getAgent(requesterSessionId); - if (!parentAgent || parentAgent.closed) { - console.warn( - `[SubagentAnnounce] Parent agent not found or closed for direct send: ${requesterSessionId}`, - ); - return; - } - parentAgent.writeInternal(message, { forwardAssistant: true, persistResponse: true }); - } catch (err) { - console.error(`[SubagentAnnounce] Failed direct announce to parent:`, err); - } -} - -/** - * Run the full subagent announcement flow: - * 1. Read child's last assistant reply - * 2. Format announcement message - * 3. Send to parent agent via Hub - * - * @deprecated Use runCoalescedAnnounceFlow instead, which supports - * batching multiple completed runs into a single announcement. - */ -export function runSubagentAnnounceFlow(params: SubagentAnnounceParams): boolean { - const { requesterSessionId, childSessionId } = params; - - // Read child's final output - const findings = readLatestAssistantReply(childSessionId); - - // Format the announcement - const message = formatAnnouncementMessage({ - runId: params.runId, - childSessionId: params.childSessionId, - requesterSessionId: params.requesterSessionId, - task: params.task, - label: params.label, - cleanup: params.cleanup, - outcome: params.outcome, - startedAt: params.startedAt, - endedAt: params.endedAt, - findings, - }); - - // Deliver to parent agent via Hub - try { - const hub = getHub(); - const parentAgent = hub.getAgent(requesterSessionId); - if (!parentAgent || parentAgent.closed) { - console.warn( - `[SubagentAnnounce] Parent agent not found or closed: ${requesterSessionId}`, - ); - return false; - } - - parentAgent.writeInternal(message, { forwardAssistant: true, persistResponse: true }); - return true; - } catch (err) { - console.error(`[SubagentAnnounce] Failed to announce to parent:`, err); - return false; - } -} diff --git a/packages/core/src/agent/subagent/command-queue.test.ts b/packages/core/src/agent/subagent/command-queue.test.ts deleted file mode 100644 index a9bd6966..00000000 --- a/packages/core/src/agent/subagent/command-queue.test.ts +++ /dev/null @@ -1,117 +0,0 @@ -import { afterEach, describe, expect, it } from "vitest"; -import { - enqueueInLane, - getLaneSize, - clearLane, - setLaneConcurrency, - resetLanesForTests, -} from "./command-queue.js"; - -afterEach(() => { - resetLanesForTests(); -}); - -describe("command queue", () => { - it("runs tasks serially by default (maxConcurrent = 1)", async () => { - let active = 0; - let maxActive = 0; - const order: number[] = []; - - const makeTask = (id: number) => async () => { - active += 1; - maxActive = Math.max(maxActive, active); - order.push(id); - await new Promise((r) => setTimeout(r, 10)); - active -= 1; - return id; - }; - - const results = await Promise.all([ - enqueueInLane("test", makeTask(1)), - enqueueInLane("test", makeTask(2)), - enqueueInLane("test", makeTask(3)), - ]); - - expect(results).toEqual([1, 2, 3]); - expect(order).toEqual([1, 2, 3]); - expect(maxActive).toBe(1); - }); - - it("respects maxConcurrent limit", async () => { - setLaneConcurrency("test", 3); - - let active = 0; - let maxActive = 0; - - const makeTask = (id: number) => async () => { - active += 1; - maxActive = Math.max(maxActive, active); - await new Promise((r) => setTimeout(r, 20)); - active -= 1; - return id; - }; - - const results = await Promise.all([ - enqueueInLane("test", makeTask(1)), - enqueueInLane("test", makeTask(2)), - enqueueInLane("test", makeTask(3)), - enqueueInLane("test", makeTask(4)), - enqueueInLane("test", makeTask(5)), - ]); - - expect(results).toEqual([1, 2, 3, 4, 5]); - expect(maxActive).toBe(3); - }); - - it("reports correct lane size", async () => { - setLaneConcurrency("test", 1); - - let resolveFirst!: () => void; - const blocker = new Promise((r) => { - resolveFirst = r; - }); - - // First task blocks the lane - const p1 = enqueueInLane("test", () => blocker); - // Second task queued - const p2 = enqueueInLane("test", async () => "done"); - - // 1 active + 1 queued = 2 - expect(getLaneSize("test")).toBe(2); - - resolveFirst(); - await Promise.all([p1, p2]); - - expect(getLaneSize("test")).toBe(0); - }); - - it("clears pending tasks", async () => { - setLaneConcurrency("test", 1); - - let resolveFirst!: () => void; - const blocker = new Promise((r) => { - resolveFirst = r; - }); - - const p1 = enqueueInLane("test", () => blocker); - const p2 = enqueueInLane("test", async () => "should-not-run"); - const p3 = enqueueInLane("test", async () => "should-not-run"); - - const removed = clearLane("test"); - expect(removed).toBe(2); - - resolveFirst(); - await p1; - - // p2 and p3 should reject - await expect(p2).rejects.toThrow("Lane cleared"); - await expect(p3).rejects.toThrow("Lane cleared"); - - expect(getLaneSize("test")).toBe(0); - }); - - it("returns 0 for unknown lane", () => { - expect(getLaneSize("nonexistent")).toBe(0); - expect(clearLane("nonexistent")).toBe(0); - }); -}); diff --git a/packages/core/src/agent/subagent/command-queue.ts b/packages/core/src/agent/subagent/command-queue.ts deleted file mode 100644 index 470378d2..00000000 --- a/packages/core/src/agent/subagent/command-queue.ts +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Lane-based command queue for subagent concurrency control. - * - * Each lane maintains an independent queue with a configurable max-concurrency - * limit. Tasks beyond the limit are queued and executed FIFO as slots free up. - * - * Adapted from OpenClaw's process/command-queue.ts. - */ - -// --------------------------------------------------------------------------- -// Types -// --------------------------------------------------------------------------- - -type QueueEntry = { - task: () => Promise; - resolve: (value: unknown) => void; - reject: (reason?: unknown) => void; - enqueuedAt: number; - warnAfterMs: number; - onWait?: (waitMs: number, queuedAhead: number) => void; -}; - -type LaneState = { - lane: string; - queue: QueueEntry[]; - active: number; - maxConcurrent: number; - draining: boolean; -}; - -// --------------------------------------------------------------------------- -// Module state -// --------------------------------------------------------------------------- - -const lanes = new Map(); - -function getLaneState(lane: string): LaneState { - const existing = lanes.get(lane); - if (existing) return existing; - - const created: LaneState = { - lane, - queue: [], - active: 0, - maxConcurrent: 1, - draining: false, - }; - lanes.set(lane, created); - return created; -} - -// --------------------------------------------------------------------------- -// Drain / pump -// --------------------------------------------------------------------------- - -function drainLane(lane: string): void { - const state = getLaneState(lane); - if (state.draining) return; - state.draining = true; - - const pump = () => { - while (state.active < state.maxConcurrent && state.queue.length > 0) { - const entry = state.queue.shift() as QueueEntry; - const waitedMs = Date.now() - entry.enqueuedAt; - - if (waitedMs >= entry.warnAfterMs) { - entry.onWait?.(waitedMs, state.queue.length); - console.warn( - `[CommandQueue] lane wait exceeded: lane=${lane} waitedMs=${waitedMs} queueAhead=${state.queue.length}`, - ); - } - - state.active += 1; - - void (async () => { - try { - const result = await entry.task(); - state.active -= 1; - pump(); - entry.resolve(result); - } catch (err) { - state.active -= 1; - pump(); - entry.reject(err); - } - })(); - } - state.draining = false; - }; - - pump(); -} - -// --------------------------------------------------------------------------- -// Public API -// --------------------------------------------------------------------------- - -/** Set (or update) the max concurrency for a lane. Triggers a drain. */ -export function setLaneConcurrency(lane: string, maxConcurrent: number): void { - const state = getLaneState(lane); - state.maxConcurrent = Math.max(1, Math.floor(maxConcurrent)); - drainLane(lane); -} - -/** Enqueue a task in a specific lane. Returns a promise that resolves with the task's return value. */ -export function enqueueInLane( - lane: string, - task: () => Promise, - opts?: { - warnAfterMs?: number; - onWait?: (waitMs: number, queuedAhead: number) => void; - }, -): Promise { - const warnAfterMs = opts?.warnAfterMs ?? 5_000; - const state = getLaneState(lane); - - return new Promise((resolve, reject) => { - state.queue.push({ - task: () => task(), - resolve: (value) => resolve(value as T), - reject, - enqueuedAt: Date.now(), - warnAfterMs, - onWait: opts?.onWait, - }); - drainLane(lane); - }); -} - -/** Number of active + queued tasks in a lane. */ -export function getLaneSize(lane: string): number { - const state = lanes.get(lane); - if (!state) return 0; - return state.queue.length + state.active; -} - -/** Remove all pending (not yet active) tasks from a lane. Returns how many were removed. */ -export function clearLane(lane: string): number { - const state = lanes.get(lane); - if (!state) return 0; - const removed = state.queue.length; - // Reject pending tasks so callers aren't left hanging - for (const entry of state.queue) { - entry.reject(new Error("Lane cleared")); - } - state.queue.length = 0; - return removed; -} - -/** Reset all lanes (for testing). */ -export function resetLanesForTests(): void { - for (const state of lanes.values()) { - for (const entry of state.queue) { - entry.reject(new Error("Reset")); - } - } - lanes.clear(); -} diff --git a/packages/core/src/agent/subagent/index.ts b/packages/core/src/agent/subagent/index.ts deleted file mode 100644 index 7a934229..00000000 --- a/packages/core/src/agent/subagent/index.ts +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Subagent orchestration system. - * - * Provides child agent spawning, lifecycle management, - * persistent registry, and result announcement flow. - */ - -export type { - SubagentRunOutcome, - SubagentRunRecord, - RegisterSubagentRunParams, - SubagentAnnounceParams, - SubagentSystemPromptParams, -} from "./types.js"; - -export { - initSubagentRegistry, - registerSubagentRun, - listSubagentRuns, - releaseSubagentRun, - getSubagentRun, - resetSubagentRegistryForTests, - shutdownSubagentRegistry, -} from "./registry.js"; - -export { - buildSubagentSystemPrompt, - readLatestAssistantReply, - formatAnnouncementMessage, - runSubagentAnnounceFlow, - formatCoalescedAnnouncementMessage, - runCoalescedAnnounceFlow, -} from "./announce.js"; -export type { FormatAnnouncementParams } from "./announce.js"; - -export { - loadSubagentRuns, - saveSubagentRuns, - getSubagentStorePath, -} from "./registry-store.js"; diff --git a/packages/core/src/agent/subagent/lanes.ts b/packages/core/src/agent/subagent/lanes.ts deleted file mode 100644 index 87bfecc3..00000000 --- a/packages/core/src/agent/subagent/lanes.ts +++ /dev/null @@ -1,37 +0,0 @@ -/** Named lanes for the subagent command queue. */ -export const SubagentLane = { - Subagent: "subagent", -} as const; - -/** Default maximum concurrent subagent runs. */ -export const DEFAULT_SUBAGENT_MAX_CONCURRENT = 10; - -// --------------------------------------------------------------------------- -// Timeout defaults -// --------------------------------------------------------------------------- - -/** Default subagent timeout: 30 minutes. */ -export const DEFAULT_SUBAGENT_TIMEOUT_SECONDS = 1800; - -/** Maximum safe value for setTimeout (~24.8 days). */ -const MAX_SAFE_TIMEOUT_MS = 2_147_000_000; - -/** - * Resolve the effective timeout in milliseconds for a subagent run. - * - * - `undefined` / negative → default (1800 s) - * - `0` → no timeout (MAX_SAFE_TIMEOUT_MS) - * - positive number → use as-is, clamped to safe range - */ -export function resolveSubagentTimeoutMs(overrideSeconds?: number): number { - if (overrideSeconds === undefined || overrideSeconds === null) { - return DEFAULT_SUBAGENT_TIMEOUT_SECONDS * 1000; - } - if (overrideSeconds === 0) { - return MAX_SAFE_TIMEOUT_MS; // "no timeout" - } - if (overrideSeconds < 0) { - return DEFAULT_SUBAGENT_TIMEOUT_SECONDS * 1000; - } - return Math.min(Math.floor(overrideSeconds) * 1000, MAX_SAFE_TIMEOUT_MS); -} diff --git a/packages/core/src/agent/subagent/registry-recovery.test.ts b/packages/core/src/agent/subagent/registry-recovery.test.ts deleted file mode 100644 index da9bc14f..00000000 --- a/packages/core/src/agent/subagent/registry-recovery.test.ts +++ /dev/null @@ -1,76 +0,0 @@ -import { beforeEach, describe, expect, it, vi } from "vitest"; -import type { SubagentRunRecord } from "./types.js"; - -const loadSubagentRunsMock = vi.fn<() => Map>(); -const saveSubagentRunsMock = vi.fn(); -const readLatestAssistantReplyMock = vi.fn(); -const runCoalescedAnnounceFlowMock = vi.fn(() => false); -const resolveSessionDirMock = vi.fn((sessionId: string) => `/tmp/${sessionId}`); -const closeAgentMock = vi.fn(); -const getHubMock = vi.fn(() => ({ closeAgent: closeAgentMock })); -const rmSyncMock = vi.fn(); - -vi.mock("./registry-store.js", () => ({ - loadSubagentRuns: loadSubagentRunsMock, - loadSubagentGroups: vi.fn(() => new Map()), - saveSubagentRuns: saveSubagentRunsMock, -})); - -vi.mock("./announce.js", () => ({ - readLatestAssistantReply: readLatestAssistantReplyMock, - runCoalescedAnnounceFlow: runCoalescedAnnounceFlowMock, -})); - -vi.mock("../session/storage.js", () => ({ - resolveSessionDir: resolveSessionDirMock, -})); - -vi.mock("../../hub/hub-singleton.js", () => ({ - getHub: getHubMock, - isHubInitialized: vi.fn(() => false), -})); - -vi.mock("node:fs", async (importOriginal) => { - const actual = await importOriginal(); - return { - ...actual, - rmSync: rmSyncMock, - }; -}); - -describe("subagent registry recovery cleanup", () => { - beforeEach(() => { - vi.resetModules(); - vi.clearAllMocks(); - loadSubagentRunsMock.mockReturnValue(new Map()); - runCoalescedAnnounceFlowMock.mockReturnValue(false); - }); - - it("deletes child session on recovery even when findings were already captured", async () => { - const now = Date.now(); - const record: SubagentRunRecord = { - runId: "run-1", - childSessionId: "child-1", - requesterSessionId: "parent-1", - task: "task", - cleanup: "delete", - createdAt: now - 1000, - startedAt: now - 900, - endedAt: now - 100, - outcome: { status: "ok" }, - findings: "done", - findingsCaptured: true, - cleanupHandled: false, - announced: false, - }; - - loadSubagentRunsMock.mockReturnValue(new Map([["run-1", record]])); - - const registry = await import("./registry.js"); - registry.initSubagentRegistry(); - - expect(readLatestAssistantReplyMock).not.toHaveBeenCalled(); - expect(resolveSessionDirMock).toHaveBeenCalledWith("child-1"); - expect(rmSyncMock).toHaveBeenCalledWith("/tmp/child-1", { recursive: true, force: true }); - }); -}); diff --git a/packages/core/src/agent/subagent/registry-store.test.ts b/packages/core/src/agent/subagent/registry-store.test.ts deleted file mode 100644 index bd7a5814..00000000 --- a/packages/core/src/agent/subagent/registry-store.test.ts +++ /dev/null @@ -1,127 +0,0 @@ -import { describe, it, expect, beforeEach, afterEach } from "vitest"; -import { mkdtempSync, rmSync, existsSync } from "node:fs"; -import { join } from "node:path"; -import { tmpdir } from "node:os"; -import type { SubagentRunRecord } from "./types.js"; - -// We need to test the store functions with a custom directory. -// Since the store uses DATA_DIR from shared, we test the serialization logic directly. - -describe("registry-store serialization", () => { - let tempDir: string; - - beforeEach(() => { - tempDir = mkdtempSync(join(tmpdir(), "subagent-store-test-")); - }); - - afterEach(() => { - rmSync(tempDir, { recursive: true, force: true }); - }); - - it("round-trips SubagentRunRecord through JSON", () => { - const record: SubagentRunRecord = { - runId: "run-123", - childSessionId: "child-456", - requesterSessionId: "parent-789", - task: "Analyze code quality", - label: "Code Review", - cleanup: "delete", - createdAt: Date.now(), - startedAt: Date.now(), - endedAt: Date.now() + 30000, - outcome: { status: "ok" }, - archiveAtMs: Date.now() + 3600000, - cleanupHandled: true, - cleanupCompletedAt: Date.now() + 30100, - }; - - // Serialize and deserialize - const json = JSON.stringify({ version: 1, runs: { "run-123": record } }); - const parsed = JSON.parse(json); - - expect(parsed.version).toBe(1); - expect(parsed.runs["run-123"]).toEqual(record); - }); - - it("handles record with minimal fields", () => { - const record: SubagentRunRecord = { - runId: "run-minimal", - childSessionId: "child-1", - requesterSessionId: "parent-1", - task: "Do something", - cleanup: "keep", - createdAt: Date.now(), - }; - - const json = JSON.stringify({ version: 1, runs: { "run-minimal": record } }); - const parsed = JSON.parse(json); - - expect(parsed.runs["run-minimal"].runId).toBe("run-minimal"); - expect(parsed.runs["run-minimal"].outcome).toBeUndefined(); - expect(parsed.runs["run-minimal"].label).toBeUndefined(); - }); - - it("handles error outcome serialization", () => { - const record: SubagentRunRecord = { - runId: "run-err", - childSessionId: "child-err", - requesterSessionId: "parent-1", - task: "Fail", - cleanup: "delete", - createdAt: Date.now(), - outcome: { status: "error", error: "Something went wrong" }, - }; - - const json = JSON.stringify(record); - const parsed = JSON.parse(json) as SubagentRunRecord; - - expect(parsed.outcome?.status).toBe("error"); - expect(parsed.outcome?.error).toBe("Something went wrong"); - }); - - it("round-trips coalescing fields (findings, findingsCaptured, announced)", () => { - const record: SubagentRunRecord = { - runId: "run-coalesce", - childSessionId: "child-1", - requesterSessionId: "parent-1", - task: "Coalesce test", - cleanup: "delete", - createdAt: Date.now(), - endedAt: Date.now() + 5000, - outcome: { status: "ok" }, - findings: "Found 3 issues in auth module.", - findingsCaptured: true, - announced: true, - }; - - const json = JSON.stringify({ version: 1, runs: { "run-coalesce": record } }); - const parsed = JSON.parse(json); - const restored = parsed.runs["run-coalesce"] as SubagentRunRecord; - - expect(restored.findings).toBe("Found 3 issues in auth module."); - expect(restored.findingsCaptured).toBe(true); - expect(restored.announced).toBe(true); - }); - - it("round-trips record with undefined coalescing fields", () => { - const record: SubagentRunRecord = { - runId: "run-old", - childSessionId: "child-1", - requesterSessionId: "parent-1", - task: "Old record", - cleanup: "delete", - createdAt: Date.now(), - cleanupHandled: true, - // No findings, findingsCaptured, or announced fields (old format) - }; - - const json = JSON.stringify({ version: 1, runs: { "run-old": record } }); - const parsed = JSON.parse(json); - const restored = parsed.runs["run-old"] as SubagentRunRecord; - - expect(restored.findings).toBeUndefined(); - expect(restored.findingsCaptured).toBeUndefined(); - expect(restored.announced).toBeUndefined(); - expect(restored.cleanupHandled).toBe(true); - }); -}); diff --git a/packages/core/src/agent/subagent/registry-store.ts b/packages/core/src/agent/subagent/registry-store.ts deleted file mode 100644 index 17fc8c8f..00000000 --- a/packages/core/src/agent/subagent/registry-store.ts +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Persistent storage for subagent run records. - * - * File: ~/.super-multica/subagents/runs.json - */ - -import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs"; -import { join } from "node:path"; -import { DATA_DIR } from "@multica/utils"; -import type { SubagentRunRecord, SubagentGroup } from "./types.js"; - -const SUBAGENTS_DIR = join(DATA_DIR, "subagents"); -const RUNS_FILE = join(SUBAGENTS_DIR, "runs.json"); - -interface SubagentRunsStore { - version: 1; - runs: Record; - groups?: Record | undefined; -} - -function ensureDir(): void { - if (!existsSync(SUBAGENTS_DIR)) { - mkdirSync(SUBAGENTS_DIR, { recursive: true }); - } -} - -/** Get the path to the subagent store file (for testing) */ -export function getSubagentStorePath(): string { - return RUNS_FILE; -} - -/** Load all persisted subagent runs */ -export function loadSubagentRuns(): Map { - if (!existsSync(RUNS_FILE)) return new Map(); - - try { - const content = readFileSync(RUNS_FILE, "utf-8"); - const store = JSON.parse(content) as SubagentRunsStore; - - if (store.version !== 1) { - console.warn(`[SubagentStore] Unknown store version: ${store.version}, ignoring`); - return new Map(); - } - - return new Map(Object.entries(store.runs)); - } catch (err) { - console.warn(`[SubagentStore] Failed to load runs:`, err); - return new Map(); - } -} - -/** Load all persisted subagent groups */ -export function loadSubagentGroups(): Map { - if (!existsSync(RUNS_FILE)) return new Map(); - - try { - const content = readFileSync(RUNS_FILE, "utf-8"); - const store = JSON.parse(content) as SubagentRunsStore; - if (store.version !== 1 || !store.groups) return new Map(); - return new Map(Object.entries(store.groups)); - } catch { - return new Map(); - } -} - -/** Save all subagent runs and groups to disk */ -export function saveSubagentRuns( - runs: Map, - groups?: Map, -): void { - ensureDir(); - - const store: SubagentRunsStore = { - version: 1, - runs: Object.fromEntries(runs), - groups: groups && groups.size > 0 ? Object.fromEntries(groups) : undefined, - }; - - writeFileSync(RUNS_FILE, JSON.stringify(store, null, 2), "utf-8"); -} diff --git a/packages/core/src/agent/subagent/registry.test.ts b/packages/core/src/agent/subagent/registry.test.ts deleted file mode 100644 index e1886dac..00000000 --- a/packages/core/src/agent/subagent/registry.test.ts +++ /dev/null @@ -1,405 +0,0 @@ -import { describe, it, expect, beforeEach, vi } from "vitest"; -import { - registerSubagentRun, - listSubagentRuns, - getSubagentRun, - releaseSubagentRun, - resetSubagentRegistryForTests, - shutdownSubagentRegistry, -} from "./registry.js"; -import { resetLanesForTests } from "./command-queue.js"; - -// Note: These tests exercise the registry's in-memory state management. -// They do NOT test the full lifecycle (which requires a live Hub + AsyncAgent). - -/** Wait for the command queue to process enqueued tasks. */ -const flushQueue = () => new Promise((r) => setTimeout(r, 0)); - -beforeEach(() => { - resetSubagentRegistryForTests(); - resetLanesForTests(); -}); - -describe("subagent registry", () => { - it("registers a run and retrieves it by ID", async () => { - const record = registerSubagentRun({ - runId: "run-1", - childSessionId: "child-1", - requesterSessionId: "parent-1", - task: "Analyze code", - label: "Code Analysis", - }); - - expect(record.runId).toBe("run-1"); - expect(record.childSessionId).toBe("child-1"); - expect(record.requesterSessionId).toBe("parent-1"); - expect(record.task).toBe("Analyze code"); - expect(record.label).toBe("Code Analysis"); - expect(record.cleanup).toBe("delete"); // default - expect(record.createdAt).toBeGreaterThan(0); - - await flushQueue(); - expect(record.startedAt).toBeGreaterThan(0); // set by watchChildAgent (async via queue) - - const retrieved = getSubagentRun("run-1"); - expect(retrieved).toBe(record); - }); - - it("lists runs filtered by requester session", () => { - registerSubagentRun({ - runId: "run-1", - childSessionId: "child-1", - requesterSessionId: "parent-A", - task: "Task 1", - }); - registerSubagentRun({ - runId: "run-2", - childSessionId: "child-2", - requesterSessionId: "parent-B", - task: "Task 2", - }); - registerSubagentRun({ - runId: "run-3", - childSessionId: "child-3", - requesterSessionId: "parent-A", - task: "Task 3", - }); - - const parentARuns = listSubagentRuns("parent-A"); - expect(parentARuns).toHaveLength(2); - expect(parentARuns.map((r) => r.runId).sort()).toEqual(["run-1", "run-3"]); - - const parentBRuns = listSubagentRuns("parent-B"); - expect(parentBRuns).toHaveLength(1); - expect(parentBRuns[0]!.runId).toBe("run-2"); - - const emptyRuns = listSubagentRuns("parent-C"); - expect(emptyRuns).toHaveLength(0); - }); - - it("releases a run from the registry", () => { - registerSubagentRun({ - runId: "run-1", - childSessionId: "child-1", - requesterSessionId: "parent-1", - task: "Task", - }); - - expect(getSubagentRun("run-1")).toBeDefined(); - - const released = releaseSubagentRun("run-1"); - expect(released).toBe(true); - expect(getSubagentRun("run-1")).toBeUndefined(); - - // Double release returns false - const releasedAgain = releaseSubagentRun("run-1"); - expect(releasedAgain).toBe(false); - }); - - it("applies custom cleanup value", () => { - const record = registerSubagentRun({ - runId: "run-keep", - childSessionId: "child-1", - requesterSessionId: "parent-1", - task: "Keep session", - cleanup: "keep", - }); - - expect(record.cleanup).toBe("keep"); - }); - - it("registers a run and ends it with error when Hub is not available", async () => { - // Without Hub initialized, watchChildAgent detects missing Hub - // and immediately ends the run with an error - registerSubagentRun({ - runId: "run-no-hub", - childSessionId: "child-1", - requesterSessionId: "parent-1", - task: "Running task", - }); - - await flushQueue(); - - const record = getSubagentRun("run-no-hub"); - expect(record?.startedAt).toBeGreaterThan(0); - expect(record?.endedAt).toBeGreaterThan(0); - expect(record?.outcome?.status).toBe("error"); - expect(record?.outcome?.error).toContain("Hub not initialized"); - }); - - it("shutdownSubagentRegistry marks unfinished runs as ended", async () => { - // Directly set up a record without going through watchChildAgent - // to simulate a run that is still active - registerSubagentRun({ - runId: "run-active", - childSessionId: "child-1", - requesterSessionId: "parent-1", - task: "Running task", - }); - - await flushQueue(); - - // The above run already ended due to no Hub; reset its endedAt - // to simulate a truly active run - const record = getSubagentRun("run-active"); - if (record) { - record.endedAt = undefined; - record.outcome = undefined; - } - - shutdownSubagentRegistry(); - - const after = getSubagentRun("run-active"); - expect(after?.endedAt).toBeGreaterThan(0); - expect(after?.outcome?.status).toBe("unknown"); - }); - - it("resetSubagentRegistryForTests clears all state", () => { - registerSubagentRun({ - runId: "run-1", - childSessionId: "child-1", - requesterSessionId: "parent-1", - task: "Task", - }); - - expect(listSubagentRuns("parent-1")).toHaveLength(1); - - resetSubagentRegistryForTests(); - - expect(listSubagentRuns("parent-1")).toHaveLength(0); - expect(getSubagentRun("run-1")).toBeUndefined(); - }); -}); - -describe("subagent registry — coalescing", () => { - // Without Hub, watchChildAgent ends runs immediately with "Hub not initialized". - // This allows us to test the coalescing state transitions. - - it("captures findings when a run completes (no Hub)", async () => { - registerSubagentRun({ - runId: "run-1", - childSessionId: "child-1", - requesterSessionId: "parent-1", - task: "Task 1", - }); - - await flushQueue(); - - const record = getSubagentRun("run-1"); - // Run ended immediately due to no Hub - expect(record?.endedAt).toBeGreaterThan(0); - expect(record?.findingsCaptured).toBe(true); - }); - - it("does not announce while sibling runs are still pending", async () => { - // Register first run — ends immediately (no Hub) - registerSubagentRun({ - runId: "run-1", - childSessionId: "child-1", - requesterSessionId: "parent-1", - task: "Task 1", - }); - - await flushQueue(); - - const record1 = getSubagentRun("run-1"); - expect(record1?.findingsCaptured).toBe(true); - - // Register second run — also ends immediately - registerSubagentRun({ - runId: "run-2", - childSessionId: "child-2", - requesterSessionId: "parent-1", - task: "Task 2", - }); - - await flushQueue(); - - const record2 = getSubagentRun("run-2"); - expect(record2?.findingsCaptured).toBe(true); - - // Both ended, but announce fails because no Hub for parent agent. - // The key check: both records should have findings captured. - // announced will be false because runCoalescedAnnounceFlow fails (no Hub). - expect(record1?.announced).toBeUndefined(); - expect(record2?.announced).toBeUndefined(); - }); - - it("single run captures findings immediately", async () => { - registerSubagentRun({ - runId: "run-solo", - childSessionId: "child-solo", - requesterSessionId: "parent-solo", - task: "Solo task", - }); - - await flushQueue(); - - const record = getSubagentRun("run-solo"); - expect(record?.endedAt).toBeGreaterThan(0); - expect(record?.findingsCaptured).toBe(true); - expect(record?.outcome?.status).toBe("error"); - expect(record?.outcome?.error).toContain("Hub not initialized"); - }); - - it("shutdownSubagentRegistry captures findings for ended-but-uncaptured runs", async () => { - registerSubagentRun({ - runId: "run-1", - childSessionId: "child-1", - requesterSessionId: "parent-1", - task: "Task", - }); - - await flushQueue(); - - const record = getSubagentRun("run-1"); - if (record) { - // Simulate: run ended but findings not yet captured - record.endedAt = Date.now(); - record.outcome = { status: "ok" }; - record.findingsCaptured = undefined; - } - - shutdownSubagentRegistry(); - - expect(record?.findingsCaptured).toBe(true); - }); -}); - -describe("subagent registry — silent announce mode", () => { - // Note: In tests (no Hub), watchChildAgent completes synchronously within - // registerSubagentRun(), so each run's lifecycle finishes before the next - // registration call. Multi-run coalescing requires async child agents and - // is validated in integration tests. - - it("stores announce field on the record", () => { - const record = registerSubagentRun({ - runId: "run-ann", - childSessionId: "child-ann", - requesterSessionId: "parent-1", - task: "Task", - announce: "silent", - }); - expect(record.announce).toBe("silent"); - }); - - it("defaults announce to undefined (immediate behavior)", () => { - const record = registerSubagentRun({ - runId: "run-def", - childSessionId: "child-def", - requesterSessionId: "parent-1", - task: "Task", - }); - expect(record.announce).toBeUndefined(); - }); - - it("silent runs are announced via runCoalescedAnnounceFlow", async () => { - const announceModule = await import("./announce.js"); - const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true); - - registerSubagentRun({ - runId: "run-s1", - childSessionId: "child-s1", - requesterSessionId: "parent-1", - task: "Silent A", - announce: "silent", - }); - - await flushQueue(); - - // Silent run announced (via runCoalescedAnnounceFlow mock) - const silentCalls = spy.mock.calls.filter( - ([reqId, records]) => - reqId === "parent-1" && - records.some((r: { announce?: string }) => r.announce === "silent"), - ); - expect(silentCalls.length).toBeGreaterThanOrEqual(1); - - const runS1 = getSubagentRun("run-s1"); - expect(runS1?.announced).toBe(true); - expect(runS1?.announce).toBe("silent"); - - spy.mockRestore(); - }); - - it("immediate and silent runs are never mixed in the same announce call", async () => { - const announceModule = await import("./announce.js"); - const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true); - - // Register immediate run, then silent run - registerSubagentRun({ - runId: "run-imm", - childSessionId: "child-imm", - requesterSessionId: "parent-1", - task: "Immediate task", - }); - registerSubagentRun({ - runId: "run-s1", - childSessionId: "child-s1", - requesterSessionId: "parent-1", - task: "Silent task", - announce: "silent", - }); - - await flushQueue(); - - const calls = spy.mock.calls.filter( - ([reqId]) => reqId === "parent-1", - ); - - // Immediate and silent should never be in the same announce call - const mixedCalls = calls.filter(([, records]) => { - const hasImm = records.some((r: { announce?: string }) => r.announce !== "silent"); - const hasSilent = records.some((r: { announce?: string }) => r.announce === "silent"); - return hasImm && hasSilent; - }); - expect(mixedCalls).toHaveLength(0); - - // Both should be announced (in separate calls) - expect(getSubagentRun("run-imm")?.announced).toBe(true); - expect(getSubagentRun("run-s1")?.announced).toBe(true); - - spy.mockRestore(); - }); -}); - -describe("subagent registry — post-announce cleanup", () => { - it("keeps runs in registry after successful announcement with archiveAtMs", async () => { - // Mock runCoalescedAnnounceFlow to succeed - const announceModule = await import("./announce.js"); - const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true); - - // Register two runs for the same parent — both end immediately (no Hub) - registerSubagentRun({ - runId: "run-a", - childSessionId: "child-a", - requesterSessionId: "parent-1", - task: "Task A", - }); - registerSubagentRun({ - runId: "run-b", - childSessionId: "child-b", - requesterSessionId: "parent-1", - task: "Task B", - }); - - await flushQueue(); - - // Both runs should have been announced but kept in registry with archiveAtMs - expect(spy).toHaveBeenCalled(); - - const runA = getSubagentRun("run-a"); - const runB = getSubagentRun("run-b"); - expect(runA).toBeDefined(); - expect(runB).toBeDefined(); - expect(runA!.announced).toBe(true); - expect(runB!.announced).toBe(true); - expect(runA!.archiveAtMs).toBeGreaterThan(Date.now()); - expect(runB!.archiveAtMs).toBeGreaterThan(Date.now()); - - // Records are still queryable - expect(listSubagentRuns("parent-1")).toHaveLength(2); - - spy.mockRestore(); - }); -}); diff --git a/packages/core/src/agent/subagent/registry.ts b/packages/core/src/agent/subagent/registry.ts deleted file mode 100644 index ca26f5dc..00000000 --- a/packages/core/src/agent/subagent/registry.ts +++ /dev/null @@ -1,524 +0,0 @@ -/** - * Subagent registry — in-memory tracking + lifecycle management. - * - * Tracks all active subagent runs, persists state to disk, - * watches for child completion, and triggers announce flow. - */ - -import { getHub, isHubInitialized } from "../../hub/hub-singleton.js"; -import { loadSubagentRuns, saveSubagentRuns, loadSubagentGroups } from "./registry-store.js"; -import { readLatestAssistantReply, runCoalescedAnnounceFlow } from "./announce.js"; -import type { - RegisterSubagentRunParams, - SubagentRunRecord, - SubagentGroup, -} from "./types.js"; -import { resolveSessionDir } from "../session/storage.js"; -import { rmSync } from "node:fs"; -import { enqueueInLane, setLaneConcurrency } from "./command-queue.js"; -import { SubagentLane, DEFAULT_SUBAGENT_MAX_CONCURRENT, resolveSubagentTimeoutMs } from "./lanes.js"; - -/** Default archive retention: 60 minutes after completion */ -const DEFAULT_ARCHIVE_AFTER_MS = 60 * 60 * 1000; - -/** Archive sweep interval: 60 seconds */ -const SWEEP_INTERVAL_MS = 60 * 1000; - -// ============================================================================ -// Module-level state -// ============================================================================ - -const subagentRuns = new Map(); -const subagentGroups = new Map(); -let sweepTimer: ReturnType | undefined; -const resumedRequesters = new Set(); - -// ============================================================================ -// Public API -// ============================================================================ - -/** Initialize registry from persisted state. Call once at startup. */ -export function initSubagentRegistry(): void { - setLaneConcurrency(SubagentLane.Subagent, DEFAULT_SUBAGENT_MAX_CONCURRENT); - - const persisted = loadSubagentRuns(); - for (const [runId, record] of persisted) { - subagentRuns.set(runId, record); - - // Backward compat: old records with cleanupHandled but no announced field - if (record.cleanupHandled && record.announced === undefined) { - record.announced = true; - record.findingsCaptured = true; - } - } - - // Restore groups - const persistedGroups = loadSubagentGroups(); - for (const [groupId, group] of persistedGroups) { - subagentGroups.set(groupId, group); - } - - // Process incomplete runs - const affectedRequesters = new Set(); - - for (const record of subagentRuns.values()) { - if (record.announced && record.cleanupHandled) continue; // Already fully done - - if (!record.endedAt) { - // Child was running when process crashed — mark as ended/unknown - record.endedAt = Date.now(); - record.outcome = { status: "unknown" }; - } - - if (!record.findingsCaptured) { - captureFindings(record); - } - - // Recovery cleanup must be independent from findings capture: - // the process may crash after captureFindings() persisted but before deletion. - if (record.cleanup === "delete" && !record.cleanupHandled) { - deleteChildSession(record.childSessionId); - } - - affectedRequesters.add(record.requesterSessionId); - } - - persist(); - - // For each affected requester, check if coalesced announcement is needed - for (const requesterId of affectedRequesters) { - if (!resumedRequesters.has(requesterId)) { - resumedRequesters.add(requesterId); - checkAndAnnounce(requesterId); - } - } - - if (subagentRuns.size > 0) { - startSweeper(); - console.log(`[SubagentRegistry] Loaded ${subagentRuns.size} persisted run(s)`); - } -} - -// ============================================================================ -// Group management -// ============================================================================ - -/** Create a new subagent group. Returns the group record. */ -export function createSubagentGroup(params: { - groupId: string; - requesterSessionId: string; - label?: string; - next?: string; -}): SubagentGroup { - const group: SubagentGroup = { - groupId: params.groupId, - requesterSessionId: params.requesterSessionId, - label: params.label, - next: params.next, - createdAt: Date.now(), - }; - subagentGroups.set(params.groupId, group); - persist(); - return group; -} - -/** Get a group by ID. */ -export function getSubagentGroup(groupId: string): SubagentGroup | undefined { - return subagentGroups.get(groupId); -} - -/** List all runs belonging to a group. */ -export function listGroupRuns(groupId: string): SubagentRunRecord[] { - const result: SubagentRunRecord[] = []; - for (const record of subagentRuns.values()) { - if (record.groupId === groupId) { - result.push(record); - } - } - return result; -} - -/** Register a new subagent run and start tracking its lifecycle. */ -export function registerSubagentRun(params: RegisterSubagentRunParams): SubagentRunRecord { - const { - runId, - childSessionId, - requesterSessionId, - task, - label, - cleanup = "delete", - timeoutSeconds, - announce, - groupId, - start, - } = params; - - const record: SubagentRunRecord = { - runId, - childSessionId, - requesterSessionId, - task, - label, - cleanup, - announce, - groupId, - createdAt: Date.now(), - }; - - subagentRuns.set(runId, record); - persist(); - startSweeper(); - - // Enqueue in the subagent lane — the start callback and watchChildAgent - // only execute once a concurrency slot is available. - void enqueueInLane(SubagentLane.Subagent, async () => { - console.log(`[SubagentRegistry] Lane slot acquired for ${runId}, calling start()`); - start?.(); - console.log(`[SubagentRegistry] start() returned, entering watchChildAgent`); - return watchChildAgent(record, timeoutSeconds); - }); - - return record; -} - -/** List all active runs for a given requester session. */ -export function listSubagentRuns(requesterSessionId: string): SubagentRunRecord[] { - const result: SubagentRunRecord[] = []; - for (const record of subagentRuns.values()) { - if (record.requesterSessionId === requesterSessionId) { - result.push(record); - } - } - return result; -} - -/** Remove a run from the registry. */ -export function releaseSubagentRun(runId: string): boolean { - const deleted = subagentRuns.delete(runId); - if (deleted) { - persist(); - if (subagentRuns.size === 0) { - stopSweeper(); - } - } - return deleted; -} - -/** Get a run by ID. */ -export function getSubagentRun(runId: string): SubagentRunRecord | undefined { - return subagentRuns.get(runId); -} - -/** Mark all active (non-ended) runs as ended with "unknown" status. Called during Hub shutdown. */ -export function shutdownSubagentRegistry(): void { - const now = Date.now(); - let updated = 0; - - for (const record of subagentRuns.values()) { - if (!record.endedAt) { - record.endedAt = now; - record.outcome = { status: "unknown" }; - updated++; - } - - // Opportunistically capture findings for ended-but-uncaptured runs - if (record.endedAt && !record.findingsCaptured) { - captureFindings(record); - updated++; - } - } - - if (updated > 0) { - persist(); - console.log(`[SubagentRegistry] Processed ${updated} run(s) during shutdown`); - } - - stopSweeper(); -} - -/** Reset all state (for testing). */ -export function resetSubagentRegistryForTests(): void { - subagentRuns.clear(); - subagentGroups.clear(); - resumedRequesters.clear(); - stopSweeper(); -} - -/** Seed a run record directly (for testing). Bypasses persistence and side effects. */ -export function seedSubagentRunForTests(record: SubagentRunRecord): void { - subagentRuns.set(record.runId, record); -} - -// ============================================================================ -// Lifecycle watching -// ============================================================================ - -/** - * Watch a child agent for completion. - * Returns a promise that resolves when the child finishes (or errors/times out), - * keeping the command-queue lane slot occupied until then. - */ -function watchChildAgent(record: SubagentRunRecord, timeoutSeconds?: number): Promise { - const { childSessionId } = record; - - // Mark as started - record.startedAt = Date.now(); - persist(); - - const timeoutMs = resolveSubagentTimeoutMs(timeoutSeconds); - - return new Promise((resolveSlot) => { - const cleanup = (outcome: { status: "ok" | "error" | "timeout" | "unknown"; error?: string | undefined }) => { - if (record.endedAt) return; // Already finalized - if (timeoutTimer) clearTimeout(timeoutTimer); - record.endedAt = Date.now(); - record.outcome = outcome; - persist(); - handleRunCompletion(record); - resolveSlot(); // Release the queue slot - }; - - // Always set a timeout (default 30 min, 0 = ~24 days via resolveSubagentTimeoutMs) - const timeoutTimer = setTimeout(() => { - cleanup({ status: "timeout" }); - - // Try to close the child agent - try { - const hub = getHub(); - hub.closeAgent(childSessionId); - } catch { - // Hub may not be available - } - }, timeoutMs); - - // Get child agent reference (Hub may not be available in tests) - if (!isHubInitialized()) { - cleanup({ status: "error", error: "Hub not initialized" }); - return; - } - - const hub = getHub(); - const childAgent = hub.getAgent(childSessionId); - if (!childAgent) { - cleanup({ status: "error", error: "Child agent not found" }); - return; - } - - // Wait for the child agent's task queue to drain (task completion), - // then trigger announce flow. Uses waitForIdle() instead of consuming - // the stream (which would conflict with Hub.consumeAgent). - console.log(`[SubagentRegistry] waitForIdle() called for child ${childSessionId}, pendingWrites=${childAgent.getPendingWrites()}`); - childAgent.waitForIdle().then( - () => { - const runtime = Date.now() - (record.startedAt ?? 0); - const runError = childAgent.lastRunError; - if (runError) { - console.log(`[SubagentRegistry] waitForIdle() resolved for child ${childSessionId} with error (runtime: ${runtime}ms): ${runError}`); - cleanup({ status: "error", error: runError }); - } else { - console.log(`[SubagentRegistry] waitForIdle() resolved OK for child ${childSessionId} (runtime: ${runtime}ms)`); - cleanup({ status: "ok" }); - } - }, - (err) => { - console.error(`[SubagentRegistry] waitForIdle() rejected for child ${childSessionId}:`, err); - cleanup({ - status: "error", - error: err instanceof Error ? err.message : String(err), - }); - }, - ); - - // Also handle explicit close (e.g., timeout kill, Hub shutdown) - childAgent.onClose(() => { - cleanup({ status: record.outcome?.status ?? "unknown" }); - }); - }); -} - -// ============================================================================ -// Cleanup + Announce (two-phase: capture findings, then coalesced announce) -// ============================================================================ - -/** Phase 1: Capture child's findings before session deletion. */ -function captureFindings(record: SubagentRunRecord): void { - try { - const findings = readLatestAssistantReply(record.childSessionId); - record.findings = findings ?? undefined; - } catch { - record.findings = "(failed to read findings)"; - } - record.findingsCaptured = true; - persist(); -} - -/** - * Phase 2: Announce completed-but-unannounced runs. - * - * Three announcement paths: - * 1. Grouped runs — wait for all runs in the group to complete, then announce - * together with the group's `next` continuation prompt (if any). - * 2. Ungrouped silent runs — legacy behavior: wait for ALL silent runs from - * the same requester to complete, then announce together. - * 3. Ungrouped immediate runs — announce per-completion (default). - */ -function checkAndAnnounce(requesterSessionId: string): void { - const allRuns = listSubagentRuns(requesterSessionId); - - // ── 1. Grouped runs: announce by group when all members complete ── - const groupIds = new Set(); - for (const r of allRuns) { - if (r.groupId && !r.announced) groupIds.add(r.groupId); - } - - for (const groupId of groupIds) { - const groupRuns = allRuns.filter(r => r.groupId === groupId); - const unannounced = groupRuns.filter(r => !r.announced); - const ready = unannounced.filter(r => r.endedAt !== undefined && r.findingsCaptured); - - if (ready.length > 0 && ready.length === unannounced.length) { - const group = subagentGroups.get(groupId); - announceRuns(requesterSessionId, ready, group?.next); - } - } - - // ── 2. Ungrouped runs: original immediate/silent logic ── - const ungrouped = allRuns.filter(r => !r.groupId); - - // Immediate: announce per-completion - const immediateReady = ungrouped.filter( - r => !r.announced && r.endedAt !== undefined && r.findingsCaptured && r.announce !== "silent", - ); - if (immediateReady.length > 0) { - announceRuns(requesterSessionId, immediateReady); - } - - // Silent: announce only when ALL ungrouped silent runs are done - const silentRuns = ungrouped.filter(r => r.announce === "silent"); - const unannouncedSilent = silentRuns.filter(r => !r.announced); - const silentReady = unannouncedSilent.filter( - r => r.endedAt !== undefined && r.findingsCaptured, - ); - - if (silentReady.length > 0 && silentReady.length === unannouncedSilent.length) { - announceRuns(requesterSessionId, silentReady); - } -} - -/** Announce a batch of completed runs and mark them as announced. */ -function announceRuns(requesterSessionId: string, runs: SubagentRunRecord[], next?: string): void { - const announced = runCoalescedAnnounceFlow(requesterSessionId, runs, next); - - if (announced) { - for (const r of runs) { - r.announced = true; - r.cleanupHandled = true; - // Keep records for querying via sessions_list; let sweeper archive later - r.archiveAtMs = Date.now() + DEFAULT_ARCHIVE_AFTER_MS; - } - persist(); - } else { - // Allow retry — mark cleanupHandled false so initSubagentRegistry() retries - for (const r of runs) { - r.cleanupHandled = false; - } - persist(); - console.warn( - `[SubagentRegistry] Announce failed for requester ${requesterSessionId}`, - ); - } -} - -/** Entry point: called when a child completes. */ -function handleRunCompletion(record: SubagentRunRecord): void { - // Phase 1: capture findings (before session deletion) - if (!record.findingsCaptured) { - captureFindings(record); - - // Session cleanup (safe now that findings are persisted) - if (record.cleanup === "delete") { - deleteChildSession(record.childSessionId); - } - } - - // Phase 2: coalesced announce check - checkAndAnnounce(record.requesterSessionId); -} - -function deleteChildSession(sessionId: string): void { - try { - const sessionDir = resolveSessionDir(sessionId); - rmSync(sessionDir, { recursive: true, force: true }); - console.log(`[SubagentRegistry] Deleted child session: ${sessionId}`); - } catch (err) { - console.warn(`[SubagentRegistry] Failed to delete child session ${sessionId}:`, err); - } - - // Also close the agent in Hub - try { - const hub = getHub(); - hub.closeAgent(sessionId); - } catch { - // Hub may not be available - } -} - -// ============================================================================ -// Archive sweeper -// ============================================================================ - -function startSweeper(): void { - if (sweepTimer) return; - sweepTimer = setInterval(sweep, SWEEP_INTERVAL_MS); - // Don't prevent process exit - if (sweepTimer.unref) sweepTimer.unref(); -} - -function stopSweeper(): void { - if (sweepTimer) { - clearInterval(sweepTimer); - sweepTimer = undefined; - } -} - -function sweep(): void { - const now = Date.now(); - let removed = 0; - - for (const [runId, record] of subagentRuns) { - if (record.archiveAtMs !== undefined && record.archiveAtMs <= now) { - subagentRuns.delete(runId); - removed++; - } - } - - // Clean up groups whose runs have all been archived - for (const [groupId] of subagentGroups) { - const hasActiveRuns = [...subagentRuns.values()].some(r => r.groupId === groupId); - if (!hasActiveRuns) { - subagentGroups.delete(groupId); - removed++; - } - } - - if (removed > 0) { - persist(); - console.log(`[SubagentRegistry] Archived ${removed} completed run(s)/group(s)`); - } - - if (subagentRuns.size === 0) { - stopSweeper(); - } -} - -// ============================================================================ -// Persistence helper -// ============================================================================ - -function persist(): void { - try { - saveSubagentRuns(subagentRuns, subagentGroups); - } catch (err) { - console.error(`[SubagentRegistry] Failed to persist runs:`, err); - } -} diff --git a/packages/core/src/agent/subagent/types.ts b/packages/core/src/agent/subagent/types.ts deleted file mode 100644 index 50b07ba6..00000000 --- a/packages/core/src/agent/subagent/types.ts +++ /dev/null @@ -1,118 +0,0 @@ -/** - * Subagent orchestration types. - * - * Models the lifecycle of spawned child agents: - * created → started → ended → cleanup - */ - -/** Final outcome of a subagent run */ -export type SubagentRunOutcome = { - status: "ok" | "error" | "timeout" | "unknown"; - error?: string | undefined; -}; - -/** - * A logical group of subagent runs that are tracked together. - * Groups enable "collect all, then act" workflows: - * all runs in a group must complete before the combined results - * (plus an optional `next` continuation) are announced to the parent. - */ -export type SubagentGroup = { - /** Unique group identifier (UUIDv7) */ - groupId: string; - /** Session ID of the parent (requester) agent */ - requesterSessionId: string; - /** Optional human-readable label for the group */ - label?: string | undefined; - /** Continuation prompt executed after all runs in the group complete. - * Injected into the announcement so the parent agent acts on the combined findings. */ - next?: string | undefined; - /** Timestamp when the group was created */ - createdAt: number; -}; - -/** Persistent record tracking a single subagent run */ -export type SubagentRunRecord = { - /** Unique run identifier (UUIDv7) */ - runId: string; - /** Session ID of the child agent */ - childSessionId: string; - /** Session ID of the parent (requester) agent */ - requesterSessionId: string; - /** The task description / prompt given to the child */ - task: string; - /** Optional human-readable label */ - label?: string | undefined; - /** Session cleanup strategy after completion */ - cleanup: "delete" | "keep"; - /** Timestamp when the run was created */ - createdAt: number; - /** Timestamp when the child agent started execution */ - startedAt?: number | undefined; - /** Timestamp when the child agent finished */ - endedAt?: number | undefined; - /** Final status of the run */ - outcome?: SubagentRunOutcome | undefined; - /** Scheduled auto-archive time (ms since epoch) */ - archiveAtMs?: number | undefined; - /** Whether the cleanup/announce flow has been initiated */ - cleanupHandled?: boolean | undefined; - /** Timestamp when cleanup completed */ - cleanupCompletedAt?: number | undefined; - /** Captured findings from the child session's last assistant reply */ - findings?: string | undefined; - /** Whether findings have been captured (safe to delete session after this) */ - findingsCaptured?: boolean | undefined; - /** Whether the coalesced announcement has been sent to parent */ - announced?: boolean | undefined; - /** Announcement mode: "immediate" (default) announces per-completion, - * "silent" defers until all silent runs from the same requester complete. */ - announce?: "immediate" | "silent" | undefined; - /** Group ID this run belongs to (if any). Runs in a group are announced - * together when all complete, regardless of the `announce` field. */ - groupId?: string | undefined; -}; - -/** Parameters for registering a new subagent run */ -export type RegisterSubagentRunParams = { - runId: string; - childSessionId: string; - requesterSessionId: string; - task: string; - label?: string | undefined; - cleanup?: "delete" | "keep" | undefined; - timeoutSeconds?: number | undefined; - /** Callback invoked when the queue slot is acquired (used to defer childAgent.write). */ - start?: (() => void) | undefined; - /** Announcement mode: "immediate" (default) or "silent" (defer until all silent runs complete). */ - announce?: "immediate" | "silent" | undefined; - /** Group ID to join. Runs in a group are announced together when all complete. */ - groupId?: string | undefined; - /** Continuation prompt for the group. Only used on group creation (first spawn). - * After all runs in the group complete, this prompt is included in the announcement - * so the parent agent can act on the combined findings (e.g. summarize, write PDF). */ - next?: string | undefined; -}; - -/** Parameters for the announce flow */ -export type SubagentAnnounceParams = { - runId: string; - childSessionId: string; - requesterSessionId: string; - task: string; - label?: string | undefined; - cleanup: "delete" | "keep"; - outcome?: SubagentRunOutcome | undefined; - startedAt?: number | undefined; - endedAt?: number | undefined; -}; - -/** Parameters for building the subagent system prompt */ -export type SubagentSystemPromptParams = { - requesterSessionId: string; - childSessionId: string; - label?: string | undefined; - task: string; - /** Tool names available to the subagent (for tooling summary in system prompt) */ - tools?: string[] | undefined; -}; diff --git a/packages/core/src/hub/hub-singleton.ts b/packages/core/src/hub/hub-singleton.ts index 5d04d0f7..d28e6c33 100644 --- a/packages/core/src/hub/hub-singleton.ts +++ b/packages/core/src/hub/hub-singleton.ts @@ -1,8 +1,8 @@ /** * Global Hub singleton for cross-module access. * - * Used by subagent tools and announce flow to interact with the Hub - * without threading references through the entire call chain. + * Used by modules like cron execution without threading Hub references + * through the entire call chain. */ import type { Hub } from "./hub.js"; diff --git a/packages/core/src/hub/hub.ts b/packages/core/src/hub/hub.ts index b852905d..2032e72a 100644 --- a/packages/core/src/hub/hub.ts +++ b/packages/core/src/hub/hub.ts @@ -17,7 +17,6 @@ import { AsyncAgent } from "../agent/async-agent.js"; import type { AgentOptions } from "../agent/types.js"; import { getHubId } from "./hub-identity.js"; import { setHub } from "./hub-singleton.js"; -import { initSubagentRegistry, shutdownSubagentRegistry } from "../agent/subagent/index.js"; import { loadAgentRecords, addAgentRecord, removeAgentRecord } from "./agent-store.js"; import { RpcDispatcher, RpcError } from "./rpc/dispatcher.js"; import { createGetAgentMessagesHandler } from "./rpc/handlers/get-agent-messages.js"; @@ -144,12 +143,9 @@ export class Hub { }); this.rpc.register("resolveExecApproval", createResolveExecApprovalHandler(this.approvalManager)); - // Register as global singleton for cross-module access (subagent tools, announce flow) + // Register as global singleton for cross-module access. setHub(this); - // Restore subagent registry from persistent state - initSubagentRegistry(); - // Initialize and start cron service this.initCronService(); this.initHeartbeatService(); @@ -800,9 +796,6 @@ export class Hub { this.heartbeatUnsubscribe = null; this.heartbeatListeners.clear(); - // Finalize subagent registry before closing agents - shutdownSubagentRegistry(); - for (const [id, agent] of this.agents) { agent.close(); this.agents.delete(id); From db0f8b3f7b14670fad2e77f8c4e5891aa8f6de5c Mon Sep 17 00:00:00 2001 From: Jiayuan Zhang Date: Tue, 17 Feb 2026 00:07:23 +0800 Subject: [PATCH 2/2] refactor(desktop): drop legacy subagent dashboard wiring --- apps/desktop/src/main/electron-env.d.ts | 21 +-- apps/desktop/src/main/ipc/index.ts | 3 - apps/desktop/src/main/ipc/subagents.ts | 63 --------- apps/desktop/src/preload/index.ts | 30 +---- .../renderer/src/components/local-chat.tsx | 21 --- .../src/components/subagent-dashboard.tsx | 122 ------------------ .../src/components/subagent-status-bar.tsx | 77 ----------- .../src/hooks/use-subagent-polling.ts | 33 ----- .../src/renderer/src/stores/subagents.ts | 29 ----- 9 files changed, 7 insertions(+), 392 deletions(-) delete mode 100644 apps/desktop/src/main/ipc/subagents.ts delete mode 100644 apps/desktop/src/renderer/src/components/subagent-dashboard.tsx delete mode 100644 apps/desktop/src/renderer/src/components/subagent-status-bar.tsx delete mode 100644 apps/desktop/src/renderer/src/hooks/use-subagent-polling.ts delete mode 100644 apps/desktop/src/renderer/src/stores/subagents.ts diff --git a/apps/desktop/src/main/electron-env.d.ts b/apps/desktop/src/main/electron-env.d.ts index c4d13f11..70720780 100644 --- a/apps/desktop/src/main/electron-env.d.ts +++ b/apps/desktop/src/main/electron-env.d.ts @@ -162,21 +162,7 @@ interface InboundMessageEvent { timestamp: number } -interface SubagentRunInfo { - runId: string - label: string | undefined - task: string - status: 'queued' | 'running' | 'ok' | 'error' | 'timeout' | 'unknown' - groupId: string | undefined - groupLabel: string | undefined - startedAt: number | undefined - endedAt: number | undefined - createdAt: number - findings: string | undefined - error: string | undefined -} - -interface ElectronAPI { +interface ElectronAPI { app: { getFlags: () => Promise<{ forceOnboarding: boolean }> } @@ -251,10 +237,7 @@ interface ElectronAPI { stop: (channelId: string, accountId: string) => Promise<{ ok: boolean; error?: string }> start: (channelId: string, accountId: string) => Promise<{ ok: boolean; error?: string }> } - subagents: { - list: (requesterSessionId: string) => Promise - } - cron: { + cron: { list: () => Promise toggle: (jobId: string) => Promise<{ ok: boolean }> remove: (jobId: string) => Promise<{ ok: boolean }> diff --git a/apps/desktop/src/main/ipc/index.ts b/apps/desktop/src/main/ipc/index.ts index eed30e01..42abe57a 100644 --- a/apps/desktop/src/main/ipc/index.ts +++ b/apps/desktop/src/main/ipc/index.ts @@ -11,7 +11,6 @@ export { registerCronIpcHandlers } from './cron.js' export { registerHeartbeatIpcHandlers } from './heartbeat.js' export { registerAppStateIpcHandlers } from './app-state.js' export { registerAuthHandlers, setMainWindow as setAuthMainWindow, handleAuthDeepLink } from './auth.js' -export { registerSubagentsIpcHandlers } from './subagents.js' import { registerAgentIpcHandlers, cleanupAgent } from './agent.js' import { registerAuthHandlers } from './auth.js' @@ -23,7 +22,6 @@ import { registerChannelsIpcHandlers } from './channels.js' import { registerCronIpcHandlers } from './cron.js' import { registerHeartbeatIpcHandlers } from './heartbeat.js' import { registerAppStateIpcHandlers } from './app-state.js' -import { registerSubagentsIpcHandlers } from './subagents.js' /** * Register all IPC handlers. @@ -40,7 +38,6 @@ export function registerAllIpcHandlers(): void { registerHeartbeatIpcHandlers() registerAppStateIpcHandlers() registerAuthHandlers() - registerSubagentsIpcHandlers() } /** diff --git a/apps/desktop/src/main/ipc/subagents.ts b/apps/desktop/src/main/ipc/subagents.ts deleted file mode 100644 index ea901a91..00000000 --- a/apps/desktop/src/main/ipc/subagents.ts +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Subagent IPC handlers for Electron main process. - * - * Exposes subagent registry data to the renderer process - * for the Subagent Dashboard UI. - */ -import { ipcMain } from 'electron' -import { listSubagentRuns, getSubagentGroup } from '@multica/core' -import type { SubagentRunRecord } from '@multica/core' - -/** Serializable DTO for renderer consumption */ -export interface SubagentRunInfo { - runId: string - label: string | undefined - task: string - status: 'queued' | 'running' | 'ok' | 'error' | 'timeout' | 'unknown' - groupId: string | undefined - groupLabel: string | undefined - startedAt: number | undefined - endedAt: number | undefined - createdAt: number - findings: string | undefined - error: string | undefined -} - -function deriveStatus(record: SubagentRunRecord): SubagentRunInfo['status'] { - if (!record.startedAt) return 'queued' - if (!record.endedAt) return 'running' - return record.outcome?.status ?? 'unknown' -} - -function toDTO(record: SubagentRunRecord): SubagentRunInfo { - const group = record.groupId ? getSubagentGroup(record.groupId) : undefined - return { - runId: record.runId, - label: record.label, - task: record.task, - status: deriveStatus(record), - groupId: record.groupId, - groupLabel: group?.label, - startedAt: record.startedAt, - endedAt: record.endedAt, - createdAt: record.createdAt, - findings: record.findings ? record.findings.slice(0, 500) : undefined, - error: record.outcome?.error, - } -} - -/** Hide completed runs after 5 minutes */ -const COMPLETED_RETENTION_MS = 5 * 60 * 1000 - -/** - * Register all Subagent-related IPC handlers. - */ -export function registerSubagentsIpcHandlers(): void { - ipcMain.handle('subagents:list', async (_event, requesterSessionId: string) => { - const now = Date.now() - const runs = listSubagentRuns(requesterSessionId) - return runs - .filter((r) => !r.endedAt || now - r.endedAt < COMPLETED_RETENTION_MS) - .map(toDTO) - }) -} diff --git a/apps/desktop/src/preload/index.ts b/apps/desktop/src/preload/index.ts index e317dc33..3e282bee 100644 --- a/apps/desktop/src/preload/index.ts +++ b/apps/desktop/src/preload/index.ts @@ -105,23 +105,9 @@ export interface LocalChatApproval { expiresAtMs: number } -export interface SubagentRunInfo { - runId: string - label: string | undefined - task: string - status: 'queued' | 'running' | 'ok' | 'error' | 'timeout' | 'unknown' - groupId: string | undefined - groupLabel: string | undefined - startedAt: number | undefined - endedAt: number | undefined - createdAt: number - findings: string | undefined - error: string | undefined -} - -// ============================================================================ -// Expose typed API to Renderer process -// ============================================================================ +// ============================================================================ +// Expose typed API to Renderer process +// ============================================================================ const electronAPI = { // App-level @@ -291,14 +277,8 @@ const electronAPI = { ipcRenderer.invoke('channels:start', channelId, accountId), }, - // Subagent dashboard - subagents: { - list: (requesterSessionId: string): Promise => - ipcRenderer.invoke('subagents:list', requesterSessionId), - }, - - // Cron jobs management - cron: { + // Cron jobs management + cron: { list: () => ipcRenderer.invoke('cron:list'), toggle: (jobId: string) => ipcRenderer.invoke('cron:toggle', jobId), remove: (jobId: string) => ipcRenderer.invoke('cron:remove', jobId), diff --git a/apps/desktop/src/renderer/src/components/local-chat.tsx b/apps/desktop/src/renderer/src/components/local-chat.tsx index 0f842a05..7cbded04 100644 --- a/apps/desktop/src/renderer/src/components/local-chat.tsx +++ b/apps/desktop/src/renderer/src/components/local-chat.tsx @@ -3,13 +3,9 @@ import { useNavigate } from 'react-router-dom' import { Loading } from '@multica/ui/components/ui/loading' import { ChatView } from '@multica/ui/components/chat-view' import { useLocalChat } from '../hooks/use-local-chat' -import { useSubagentPolling } from '../hooks/use-subagent-polling' -import { useSubagentsStore } from '../stores/subagents' import { useProviderStore } from '../stores/provider' import { ApiKeyDialog } from './api-key-dialog' import { OAuthDialog } from './oauth-dialog' -import { SubagentStatusBar } from './subagent-status-bar' -import { SubagentDashboard } from './subagent-dashboard' interface LocalChatProps { initialPrompt?: string @@ -37,11 +33,6 @@ export function LocalChat({ initialPrompt }: LocalChatProps) { const { providers, current, setProvider: switchProvider, refresh: refreshProviders } = useProviderStore() - // Subagent polling + dashboard - useSubagentPolling(agentId) - const subagentRuns = useSubagentsStore((s) => s.runs) - const [dashboardOpen, setDashboardOpen] = useState(false) - // Provider config dialog state const [apiKeyDialogOpen, setApiKeyDialogOpen] = useState(false) const [oauthDialogOpen, setOauthDialogOpen] = useState(false) @@ -126,12 +117,6 @@ export function LocalChat({ initialPrompt }: LocalChatProps) { loadMore={loadMore} resolveApproval={resolveApproval} errorAction={errorAction} - bottomSlot={ - setDashboardOpen(true)} - /> - } /> {currentMeta && currentMeta.authMethod === 'api-key' && ( @@ -154,12 +139,6 @@ export function LocalChat({ initialPrompt }: LocalChatProps) { onSuccess={handleProviderConfigSuccess} /> )} - - ) } diff --git a/apps/desktop/src/renderer/src/components/subagent-dashboard.tsx b/apps/desktop/src/renderer/src/components/subagent-dashboard.tsx deleted file mode 100644 index bac29773..00000000 --- a/apps/desktop/src/renderer/src/components/subagent-dashboard.tsx +++ /dev/null @@ -1,122 +0,0 @@ -import { useState, useEffect } from 'react' -import { - Sheet, - SheetContent, - SheetHeader, - SheetTitle, - SheetDescription, -} from '@multica/ui/components/ui/sheet' -import { Badge } from '@multica/ui/components/ui/badge' - -interface SubagentDashboardProps { - open: boolean - onOpenChange: (open: boolean) => void - runs: SubagentRunInfo[] -} - -const STATUS_CONFIG: Record = { - running: { label: 'Running', variant: 'default' }, - queued: { label: 'Queued', variant: 'secondary' }, - ok: { label: 'Completed', variant: 'outline' }, - error: { label: 'Error', variant: 'destructive' }, - timeout: { label: 'Timeout', variant: 'destructive' }, - unknown: { label: 'Unknown', variant: 'secondary' }, -} - -function formatElapsed(startMs: number, endMs?: number): string { - const elapsed = (endMs ?? Date.now()) - startMs - const seconds = Math.floor(elapsed / 1000) - if (seconds < 60) return `${seconds}s` - const minutes = Math.floor(seconds / 60) - const remainSec = seconds % 60 - if (minutes < 60) return `${minutes}m ${remainSec}s` - const hours = Math.floor(minutes / 60) - const remainMin = minutes % 60 - return `${hours}h ${remainMin}m` -} - -function RunCard({ run }: { run: SubagentRunInfo }) { - const config = STATUS_CONFIG[run.status] - const isActive = run.status === 'running' || run.status === 'queued' - const [, setTick] = useState(0) - - // Tick every 1s for running agents to update elapsed time - useEffect(() => { - if (!isActive) return - const timer = setInterval(() => setTick((t) => t + 1), 1000) - return () => clearInterval(timer) - }, [isActive]) - - return ( -
-
-
-

- {run.label || run.task.slice(0, 80)} -

- {run.label && ( -

- {run.task.slice(0, 120)} -

- )} -
- - {config.label} - -
- -
- {run.startedAt && ( - {formatElapsed(run.startedAt, run.endedAt)} - )} - {run.groupLabel && ( - - {run.groupLabel} - - )} -
- - {run.error && ( -

- {run.error} -

- )} - - {run.findings && !run.error && ( -

- {run.findings.slice(0, 200)} -

- )} -
- ) -} - -export function SubagentDashboard({ open, onOpenChange, runs }: SubagentDashboardProps) { - // Sort: active first (running, queued), then by createdAt desc - const sorted = [...runs].sort((a, b) => { - const aActive = a.status === 'running' || a.status === 'queued' ? 0 : 1 - const bActive = b.status === 'running' || b.status === 'queued' ? 0 : 1 - if (aActive !== bActive) return aActive - bActive - return b.createdAt - a.createdAt - }) - - return ( - - - - Subagents ({runs.length}) - Child agents spawned by the current session - -
- {sorted.length === 0 ? ( -

- No subagents yet -

- ) : ( - sorted.map((run) => ) - )} -
-
-
- ) -} diff --git a/apps/desktop/src/renderer/src/components/subagent-status-bar.tsx b/apps/desktop/src/renderer/src/components/subagent-status-bar.tsx deleted file mode 100644 index 60e4125f..00000000 --- a/apps/desktop/src/renderer/src/components/subagent-status-bar.tsx +++ /dev/null @@ -1,77 +0,0 @@ -import { useState, useEffect, useRef } from 'react' - -/** Auto-dismiss delay after all runs complete (ms) */ -const DISMISS_DELAY_MS = 30_000 - -interface SubagentStatusBarProps { - runs: SubagentRunInfo[] - onViewClick: () => void -} - -export function SubagentStatusBar({ runs, onViewClick }: SubagentStatusBarProps) { - const [dismissed, setDismissed] = useState(false) - const prevHadActiveRef = useRef(false) - - const running = runs.filter((r) => r.status === 'running' || r.status === 'queued').length - const completed = runs.filter((r) => r.status !== 'running' && r.status !== 'queued').length - const hasActive = running > 0 - - // Auto-dismiss after all runs complete - useEffect(() => { - if (hasActive) { - // Reset dismissed state when new active runs appear - prevHadActiveRef.current = true - setDismissed(false) - return - } - - // Only auto-dismiss if we previously had active runs (transition to all-complete) - if (!prevHadActiveRef.current || runs.length === 0) return - - const timer = setTimeout(() => setDismissed(true), DISMISS_DELAY_MS) - return () => clearTimeout(timer) - }, [hasActive, runs.length]) - - if (runs.length === 0 || dismissed) return null - - let statusText: string - if (running > 0 && completed > 0) { - statusText = `${running} running, ${completed} completed` - } else if (running > 0) { - statusText = `${running} subagent${running > 1 ? 's' : ''} running` - } else { - statusText = `${completed} completed` - } - - return ( -
-
-
- {running > 0 && ( - - - - - )} - {statusText} -
-
- - {!hasActive && ( - - )} -
-
-
- ) -} diff --git a/apps/desktop/src/renderer/src/hooks/use-subagent-polling.ts b/apps/desktop/src/renderer/src/hooks/use-subagent-polling.ts deleted file mode 100644 index 78f73d77..00000000 --- a/apps/desktop/src/renderer/src/hooks/use-subagent-polling.ts +++ /dev/null @@ -1,33 +0,0 @@ -import { useEffect, useRef } from 'react' -import { useSubagentsStore, selectHasActiveRuns } from '../stores/subagents' - -const ACTIVE_INTERVAL_MS = 2_000 -const IDLE_INTERVAL_MS = 10_000 - -/** - * Polls for subagent runs at an adaptive interval. - * 2s when there are active (running/queued) runs, 10s otherwise. - */ -export function useSubagentPolling(agentId: string | null): void { - const fetch = useSubagentsStore((s) => s.fetch) - const runs = useSubagentsStore((s) => s.runs) - const hasActive = selectHasActiveRuns(runs) - const intervalRef = useRef | null>(null) - - useEffect(() => { - if (!agentId) return - - // Fetch immediately - fetch(agentId) - - const ms = hasActive ? ACTIVE_INTERVAL_MS : IDLE_INTERVAL_MS - intervalRef.current = setInterval(() => fetch(agentId), ms) - - return () => { - if (intervalRef.current) { - clearInterval(intervalRef.current) - intervalRef.current = null - } - } - }, [agentId, hasActive, fetch]) -} diff --git a/apps/desktop/src/renderer/src/stores/subagents.ts b/apps/desktop/src/renderer/src/stores/subagents.ts deleted file mode 100644 index 249ed321..00000000 --- a/apps/desktop/src/renderer/src/stores/subagents.ts +++ /dev/null @@ -1,29 +0,0 @@ -import { create } from 'zustand' - -interface SubagentsStore { - runs: SubagentRunInfo[] - fetch: (requesterSessionId: string) => Promise -} - -export const useSubagentsStore = create()((set) => ({ - runs: [], - - fetch: async (requesterSessionId: string) => { - try { - const result = await window.electronAPI.subagents.list(requesterSessionId) - if (Array.isArray(result)) { - set({ runs: result }) - } - } catch (err) { - console.error('[SubagentsStore] Failed to fetch:', err) - } - }, -})) - -export function selectRunningCount(runs: SubagentRunInfo[]): number { - return runs.filter((r) => r.status === 'running' || r.status === 'queued').length -} - -export function selectHasActiveRuns(runs: SubagentRunInfo[]): boolean { - return runs.some((r) => r.status === 'running' || r.status === 'queued') -}