From 85f71e0084f7e6701377c50db00028f158811b12 Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 3 Feb 2026 16:48:49 +0800 Subject: [PATCH 1/7] feat(agent): add subagent orchestration core Implement subagent registry, persistence store, and announcement flow for child agent lifecycle management. Includes types, registry with auto-archive sweeper, session JSONL reading for result extraction, and formatted announcement message delivery. Co-Authored-By: Claude Opus 4.5 --- src/agent/subagent/announce.test.ts | 127 +++++++++ src/agent/subagent/announce.ts | 226 +++++++++++++++ src/agent/subagent/index.ts | 37 +++ src/agent/subagent/registry-store.test.ts | 81 ++++++ src/agent/subagent/registry-store.ts | 61 ++++ src/agent/subagent/registry.ts | 321 ++++++++++++++++++++++ src/agent/subagent/types.ts | 74 +++++ 7 files changed, 927 insertions(+) create mode 100644 src/agent/subagent/announce.test.ts create mode 100644 src/agent/subagent/announce.ts create mode 100644 src/agent/subagent/index.ts create mode 100644 src/agent/subagent/registry-store.test.ts create mode 100644 src/agent/subagent/registry-store.ts create mode 100644 src/agent/subagent/registry.ts create mode 100644 src/agent/subagent/types.ts diff --git a/src/agent/subagent/announce.test.ts b/src/agent/subagent/announce.test.ts new file mode 100644 index 00000000..8f1a7140 --- /dev/null +++ b/src/agent/subagent/announce.test.ts @@ -0,0 +1,127 @@ +import { describe, it, expect } from "vitest"; +import { buildSubagentSystemPrompt, formatAnnouncementMessage } from "./announce.js"; +import type { FormatAnnouncementParams } from "./announce.js"; + +describe("buildSubagentSystemPrompt", () => { + it("includes task and session context", () => { + const prompt = buildSubagentSystemPrompt({ + requesterSessionId: "parent-123", + childSessionId: "child-456", + task: "Analyze the auth module for security issues", + }); + + expect(prompt).toContain("You are a subagent spawned to complete a specific task"); + expect(prompt).toContain("Analyze the auth module for security issues"); + expect(prompt).toContain("parent-123"); + expect(prompt).toContain("child-456"); + expect(prompt).toContain("Do NOT spawn nested subagents"); + }); + + it("includes label when provided", () => { + const prompt = buildSubagentSystemPrompt({ + requesterSessionId: "parent-123", + childSessionId: "child-456", + label: "Security Audit", + task: "Check for vulnerabilities", + }); + + expect(prompt).toContain('Label: "Security Audit"'); + }); + + it("omits label line when not provided", () => { + const prompt = buildSubagentSystemPrompt({ + requesterSessionId: "parent-123", + childSessionId: "child-456", + task: "Do something", + }); + + expect(prompt).not.toContain("Label:"); + }); +}); + +describe("formatAnnouncementMessage", () => { + const baseParams: FormatAnnouncementParams = { + runId: "run-1", + childSessionId: "child-456", + requesterSessionId: "parent-123", + task: "Analyze code", + label: "Code Analysis", + cleanup: "delete", + outcome: { status: "ok" }, + startedAt: 1000000, + endedAt: 1030000, + }; + + it("formats successful completion", () => { + const msg = formatAnnouncementMessage({ + ...baseParams, + findings: "Found 3 issues in the auth module.", + }); + + expect(msg).toContain('"Code Analysis" just completed successfully'); + expect(msg).toContain("Found 3 issues in the auth module."); + expect(msg).toContain("runtime 30s"); + expect(msg).toContain("session child-456"); + }); + + it("formats error outcome", () => { + const msg = formatAnnouncementMessage({ + ...baseParams, + outcome: { status: "error", error: "API key expired" }, + }); + + expect(msg).toContain("failed: API key expired"); + }); + + it("formats timeout outcome", () => { + const msg = formatAnnouncementMessage({ + ...baseParams, + outcome: { status: "timeout" }, + }); + + expect(msg).toContain("timed out"); + }); + + it("shows (no output) when findings is not provided", () => { + const msg = formatAnnouncementMessage(baseParams); + + expect(msg).toContain("(no output)"); + }); + + it("uses task text when label is not provided", () => { + const paramsNoLabel: FormatAnnouncementParams = { + ...baseParams, + label: undefined, + }; + const msg = formatAnnouncementMessage(paramsNoLabel); + + expect(msg).toContain('"Analyze code"'); + }); + + it("formats runtime for minutes", () => { + const msg = formatAnnouncementMessage({ + ...baseParams, + startedAt: 1000000, + endedAt: 1150000, // 150 seconds = 2m30s + }); + + expect(msg).toContain("runtime 2m30s"); + }); + + it("formats runtime for hours", () => { + const msg = formatAnnouncementMessage({ + ...baseParams, + startedAt: 1000000, + endedAt: 4600000, // 3600 seconds = 1h + }); + + expect(msg).toContain("runtime 1h"); + }); + + it("includes summarization instruction", () => { + const msg = formatAnnouncementMessage(baseParams); + + expect(msg).toContain("Summarize this naturally for the user"); + expect(msg).toContain("NO_REPLY"); + }); +}); diff --git a/src/agent/subagent/announce.ts b/src/agent/subagent/announce.ts new file mode 100644 index 00000000..e5d06c51 --- /dev/null +++ b/src/agent/subagent/announce.ts @@ -0,0 +1,226 @@ +/** + * Subagent announcement flow. + * + * Handles result propagation from child → parent agent: + * - Builds system prompts for child agents + * - Reads child session output + * - Formats and delivers announcement messages + */ + +import { readEntries } from "../session/storage.js"; +import { getHub } from "../../hub/hub-singleton.js"; +import type { + SubagentAnnounceParams, + SubagentRunOutcome, + SubagentSystemPromptParams, +} from "./types.js"; + +/** + * Build the system prompt injected into a subagent session. + */ +export function buildSubagentSystemPrompt(params: SubagentSystemPromptParams): string { + const { requesterSessionId, childSessionId, label, task } = params; + + const lines: string[] = [ + "You are a subagent spawned to complete a specific task.", + "", + "## Rules", + "- Stay focused on the assigned task below.", + "- Complete the task thoroughly and report your findings.", + "- Do NOT initiate side actions unrelated to the task.", + "- Do NOT attempt to communicate with the user directly.", + "- Do NOT spawn nested subagents.", + "- Your session is ephemeral and will be cleaned up after completion.", + "", + "## Context", + `Requester session: ${requesterSessionId}`, + `Child session: ${childSessionId}`, + ]; + + if (label) { + lines.push(`Label: "${label}"`); + } + + lines.push("", "## Task", task); + + return lines.join("\n"); +} + +/** + * Read the latest assistant reply from a session's JSONL file. + */ +export function readLatestAssistantReply(sessionId: string): string | undefined { + const entries = readEntries(sessionId); + + // Walk backwards to find last assistant message + for (let i = entries.length - 1; i >= 0; i--) { + const entry = entries[i]!; + if (entry.type !== "message") continue; + + const message = entry.message; + if (message.role !== "assistant") continue; + + return extractAssistantText(message); + } + + return undefined; +} + +/** + * Extract text content from an assistant message. + * AgentMessage.content for assistant is (TextContent | ThinkingContent | ToolCall)[]. + */ +function extractAssistantText(message: { role: string; content: unknown }): string { + const content = message.content; + if (typeof content === "string") { + return sanitizeText(content); + } + + if (!Array.isArray(content)) return ""; + + const textParts: string[] = []; + for (const block of content) { + if (block && typeof block === "object" && "type" in block && block.type === "text" && "text" in block) { + textParts.push(String(block.text)); + } + } + + return sanitizeText(textParts.join("\n")); +} + +/** + * Strip thinking tags and tool markers from text. + */ +function sanitizeText(text: string): string { + return text + .replace(/[\s\S]*?<\/thinking>/g, "") + .replace(/[\s\S]*?<\/tool_call>/g, "") + .trim(); +} + +/** + * Format the duration between two timestamps as a human-readable string. + */ +function formatDuration(startMs: number, endMs: number): string { + const totalSeconds = Math.round((endMs - startMs) / 1000); + if (totalSeconds < 60) return `${totalSeconds}s`; + + const minutes = Math.floor(totalSeconds / 60); + const seconds = totalSeconds % 60; + if (minutes < 60) return seconds > 0 ? `${minutes}m${seconds}s` : `${minutes}m`; + + const hours = Math.floor(minutes / 60); + const remainingMinutes = minutes % 60; + return remainingMinutes > 0 ? `${hours}h${remainingMinutes}m` : `${hours}h`; +} + +/** + * Format a status label from an outcome. + */ +function formatStatusLabel(outcome: SubagentRunOutcome | undefined): string { + if (!outcome) return "completed with unknown status"; + switch (outcome.status) { + case "ok": + return "completed successfully"; + case "error": + return outcome.error ? `failed: ${outcome.error}` : "failed"; + case "timeout": + return "timed out"; + default: + return "completed with unknown status"; + } +} + +/** Parameters for formatAnnouncementMessage */ +export interface FormatAnnouncementParams { + runId: string; + childSessionId: string; + requesterSessionId: string; + task: string; + label?: string | undefined; + cleanup: "delete" | "keep"; + outcome?: SubagentRunOutcome | undefined; + startedAt?: number | undefined; + endedAt?: number | undefined; + findings?: string | undefined; +} + +/** + * Format the announcement message sent to the parent agent. + */ +export function formatAnnouncementMessage(params: FormatAnnouncementParams): string { + const { task, label, outcome, findings, startedAt, endedAt, childSessionId } = params; + const displayName = label || task.slice(0, 60); + const statusLabel = formatStatusLabel(outcome); + + const parts: string[] = [ + `A background task "${displayName}" just ${statusLabel}.`, + "", + "Findings:", + findings || "(no output)", + ]; + + // Stats line + const stats: string[] = []; + if (startedAt && endedAt) { + stats.push(`runtime ${formatDuration(startedAt, endedAt)}`); + } + stats.push(`session ${childSessionId}`); + + parts.push("", `Stats: ${stats.join(" • ")}`); + + parts.push( + "", + "Summarize this naturally for the user. Keep it brief (1-2 sentences).", + "Flow it into the conversation naturally.", + "Do not mention technical details like session IDs or that this was a background task.", + "You can respond with NO_REPLY if no announcement is needed (e.g., internal task with no user-facing result).", + ); + + return parts.join("\n"); +} + +/** + * Run the full subagent announcement flow: + * 1. Read child's last assistant reply + * 2. Format announcement message + * 3. Send to parent agent via Hub + */ +export function runSubagentAnnounceFlow(params: SubagentAnnounceParams): boolean { + const { requesterSessionId, childSessionId } = params; + + // Read child's final output + const findings = readLatestAssistantReply(childSessionId); + + // Format the announcement + const message = formatAnnouncementMessage({ + runId: params.runId, + childSessionId: params.childSessionId, + requesterSessionId: params.requesterSessionId, + task: params.task, + label: params.label, + cleanup: params.cleanup, + outcome: params.outcome, + startedAt: params.startedAt, + endedAt: params.endedAt, + findings, + }); + + // Deliver to parent agent via Hub + try { + const hub = getHub(); + const parentAgent = hub.getAgent(requesterSessionId); + if (!parentAgent || parentAgent.closed) { + console.warn( + `[SubagentAnnounce] Parent agent not found or closed: ${requesterSessionId}`, + ); + return false; + } + + parentAgent.write(message); + return true; + } catch (err) { + console.error(`[SubagentAnnounce] Failed to announce to parent:`, err); + return false; + } +} diff --git a/src/agent/subagent/index.ts b/src/agent/subagent/index.ts new file mode 100644 index 00000000..d0a24d99 --- /dev/null +++ b/src/agent/subagent/index.ts @@ -0,0 +1,37 @@ +/** + * Subagent orchestration system. + * + * Provides child agent spawning, lifecycle management, + * persistent registry, and result announcement flow. + */ + +export type { + SubagentRunOutcome, + SubagentRunRecord, + RegisterSubagentRunParams, + SubagentAnnounceParams, + SubagentSystemPromptParams, +} from "./types.js"; + +export { + initSubagentRegistry, + registerSubagentRun, + listSubagentRuns, + releaseSubagentRun, + getSubagentRun, + resetSubagentRegistryForTests, +} from "./registry.js"; + +export { + buildSubagentSystemPrompt, + readLatestAssistantReply, + formatAnnouncementMessage, + runSubagentAnnounceFlow, +} from "./announce.js"; +export type { FormatAnnouncementParams } from "./announce.js"; + +export { + loadSubagentRuns, + saveSubagentRuns, + getSubagentStorePath, +} from "./registry-store.js"; diff --git a/src/agent/subagent/registry-store.test.ts b/src/agent/subagent/registry-store.test.ts new file mode 100644 index 00000000..7247203c --- /dev/null +++ b/src/agent/subagent/registry-store.test.ts @@ -0,0 +1,81 @@ +import { describe, it, expect, beforeEach, afterEach } from "vitest"; +import { mkdtempSync, rmSync, existsSync } from "node:fs"; +import { join } from "node:path"; +import { tmpdir } from "node:os"; +import type { SubagentRunRecord } from "./types.js"; + +// We need to test the store functions with a custom directory. +// Since the store uses DATA_DIR from shared, we test the serialization logic directly. + +describe("registry-store serialization", () => { + let tempDir: string; + + beforeEach(() => { + tempDir = mkdtempSync(join(tmpdir(), "subagent-store-test-")); + }); + + afterEach(() => { + rmSync(tempDir, { recursive: true, force: true }); + }); + + it("round-trips SubagentRunRecord through JSON", () => { + const record: SubagentRunRecord = { + runId: "run-123", + childSessionId: "child-456", + requesterSessionId: "parent-789", + task: "Analyze code quality", + label: "Code Review", + cleanup: "delete", + createdAt: Date.now(), + startedAt: Date.now(), + endedAt: Date.now() + 30000, + outcome: { status: "ok" }, + archiveAtMs: Date.now() + 3600000, + cleanupHandled: true, + cleanupCompletedAt: Date.now() + 30100, + }; + + // Serialize and deserialize + const json = JSON.stringify({ version: 1, runs: { "run-123": record } }); + const parsed = JSON.parse(json); + + expect(parsed.version).toBe(1); + expect(parsed.runs["run-123"]).toEqual(record); + }); + + it("handles record with minimal fields", () => { + const record: SubagentRunRecord = { + runId: "run-minimal", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Do something", + cleanup: "keep", + createdAt: Date.now(), + }; + + const json = JSON.stringify({ version: 1, runs: { "run-minimal": record } }); + const parsed = JSON.parse(json); + + expect(parsed.runs["run-minimal"].runId).toBe("run-minimal"); + expect(parsed.runs["run-minimal"].outcome).toBeUndefined(); + expect(parsed.runs["run-minimal"].label).toBeUndefined(); + }); + + it("handles error outcome serialization", () => { + const record: SubagentRunRecord = { + runId: "run-err", + childSessionId: "child-err", + requesterSessionId: "parent-1", + task: "Fail", + cleanup: "delete", + createdAt: Date.now(), + outcome: { status: "error", error: "Something went wrong" }, + }; + + const json = JSON.stringify(record); + const parsed = JSON.parse(json) as SubagentRunRecord; + + expect(parsed.outcome?.status).toBe("error"); + expect(parsed.outcome?.error).toBe("Something went wrong"); + }); +}); diff --git a/src/agent/subagent/registry-store.ts b/src/agent/subagent/registry-store.ts new file mode 100644 index 00000000..b96a315c --- /dev/null +++ b/src/agent/subagent/registry-store.ts @@ -0,0 +1,61 @@ +/** + * Persistent storage for subagent run records. + * + * File: ~/.super-multica/subagents/runs.json + */ + +import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs"; +import { join } from "node:path"; +import { DATA_DIR } from "../../shared/index.js"; +import type { SubagentRunRecord } from "./types.js"; + +const SUBAGENTS_DIR = join(DATA_DIR, "subagents"); +const RUNS_FILE = join(SUBAGENTS_DIR, "runs.json"); + +interface SubagentRunsStore { + version: 1; + runs: Record; +} + +function ensureDir(): void { + if (!existsSync(SUBAGENTS_DIR)) { + mkdirSync(SUBAGENTS_DIR, { recursive: true }); + } +} + +/** Get the path to the subagent store file (for testing) */ +export function getSubagentStorePath(): string { + return RUNS_FILE; +} + +/** Load all persisted subagent runs */ +export function loadSubagentRuns(): Map { + if (!existsSync(RUNS_FILE)) return new Map(); + + try { + const content = readFileSync(RUNS_FILE, "utf-8"); + const store = JSON.parse(content) as SubagentRunsStore; + + if (store.version !== 1) { + console.warn(`[SubagentStore] Unknown store version: ${store.version}, ignoring`); + return new Map(); + } + + return new Map(Object.entries(store.runs)); + } catch (err) { + console.warn(`[SubagentStore] Failed to load runs:`, err); + return new Map(); + } +} + +/** Save all subagent runs to disk */ +export function saveSubagentRuns(runs: Map): void { + ensureDir(); + + const store: SubagentRunsStore = { + version: 1, + runs: Object.fromEntries(runs), + }; + + writeFileSync(RUNS_FILE, JSON.stringify(store, null, 2), "utf-8"); +} diff --git a/src/agent/subagent/registry.ts b/src/agent/subagent/registry.ts new file mode 100644 index 00000000..9915e4a0 --- /dev/null +++ b/src/agent/subagent/registry.ts @@ -0,0 +1,321 @@ +/** + * Subagent registry — in-memory tracking + lifecycle management. + * + * Tracks all active subagent runs, persists state to disk, + * watches for child completion, and triggers announce flow. + */ + +import { getHub } from "../../hub/hub-singleton.js"; +import { loadSubagentRuns, saveSubagentRuns } from "./registry-store.js"; +import { runSubagentAnnounceFlow } from "./announce.js"; +import type { + RegisterSubagentRunParams, + SubagentRunRecord, +} from "./types.js"; +import { resolveSessionDir } from "../session/storage.js"; +import { rmSync } from "node:fs"; + +/** Default archive retention: 60 minutes after completion */ +const DEFAULT_ARCHIVE_AFTER_MS = 60 * 60 * 1000; + +/** Archive sweep interval: 60 seconds */ +const SWEEP_INTERVAL_MS = 60 * 1000; + +// ============================================================================ +// Module-level state +// ============================================================================ + +const subagentRuns = new Map(); +let sweepTimer: ReturnType | undefined; +const resumedRuns = new Set(); + +// ============================================================================ +// Public API +// ============================================================================ + +/** Initialize registry from persisted state. Call once at startup. */ +export function initSubagentRegistry(): void { + const persisted = loadSubagentRuns(); + for (const [runId, record] of persisted) { + subagentRuns.set(runId, record); + + // Resume incomplete runs + if (!record.cleanupHandled) { + if (record.endedAt) { + // Completed but cleanup not done — run announce flow + if (!resumedRuns.has(runId)) { + resumedRuns.add(runId); + handleRunCompletion(record); + } + } + // If not ended, the child agent session is lost on restart — + // mark as ended with unknown outcome + else if (!record.startedAt) { + record.endedAt = Date.now(); + record.outcome = { status: "unknown" }; + persist(); + if (!resumedRuns.has(runId)) { + resumedRuns.add(runId); + handleRunCompletion(record); + } + } + } + } + + if (subagentRuns.size > 0) { + startSweeper(); + console.log(`[SubagentRegistry] Loaded ${subagentRuns.size} persisted run(s)`); + } +} + +/** Register a new subagent run and start tracking its lifecycle. */ +export function registerSubagentRun(params: RegisterSubagentRunParams): SubagentRunRecord { + const { + runId, + childSessionId, + requesterSessionId, + task, + label, + cleanup = "delete", + timeoutSeconds, + } = params; + + const record: SubagentRunRecord = { + runId, + childSessionId, + requesterSessionId, + task, + label, + cleanup, + createdAt: Date.now(), + }; + + subagentRuns.set(runId, record); + persist(); + startSweeper(); + + // Start watching the child agent for completion + watchChildAgent(record, timeoutSeconds); + + return record; +} + +/** List all active runs for a given requester session. */ +export function listSubagentRuns(requesterSessionId: string): SubagentRunRecord[] { + const result: SubagentRunRecord[] = []; + for (const record of subagentRuns.values()) { + if (record.requesterSessionId === requesterSessionId) { + result.push(record); + } + } + return result; +} + +/** Remove a run from the registry. */ +export function releaseSubagentRun(runId: string): boolean { + const deleted = subagentRuns.delete(runId); + if (deleted) { + persist(); + if (subagentRuns.size === 0) { + stopSweeper(); + } + } + return deleted; +} + +/** Get a run by ID. */ +export function getSubagentRun(runId: string): SubagentRunRecord | undefined { + return subagentRuns.get(runId); +} + +/** Reset all state (for testing). */ +export function resetSubagentRegistryForTests(): void { + subagentRuns.clear(); + resumedRuns.clear(); + stopSweeper(); +} + +// ============================================================================ +// Lifecycle watching +// ============================================================================ + +function watchChildAgent(record: SubagentRunRecord, timeoutSeconds?: number): void { + const { runId, childSessionId } = record; + + // Mark as started + record.startedAt = Date.now(); + persist(); + + // Set up timeout if specified + let timeoutTimer: ReturnType | undefined; + if (timeoutSeconds && timeoutSeconds > 0) { + timeoutTimer = setTimeout(() => { + if (!record.endedAt) { + record.endedAt = Date.now(); + record.outcome = { status: "timeout" }; + persist(); + + // Try to close the child agent + try { + const hub = getHub(); + hub.closeAgent(childSessionId); + } catch { + // Hub may not be available + } + + handleRunCompletion(record); + } + }, timeoutSeconds * 1000); + } + + // Watch the child agent's channel for closure + void (async () => { + try { + const hub = getHub(); + const childAgent = hub.getAgent(childSessionId); + if (!childAgent) { + record.endedAt = Date.now(); + record.outcome = { status: "error", error: "Child agent not found" }; + persist(); + handleRunCompletion(record); + return; + } + + // Consume the child's output stream — when it ends, the agent is done + for await (const item of childAgent.read()) { + // Check for error messages + if ("content" in item && typeof item.content === "string" && item.content.startsWith("[error]")) { + record.outcome = { status: "error", error: item.content }; + } + } + + // Stream ended — child agent completed + if (!record.endedAt) { + if (timeoutTimer) clearTimeout(timeoutTimer); + record.endedAt = Date.now(); + if (!record.outcome) { + record.outcome = { status: "ok" }; + } + persist(); + handleRunCompletion(record); + } + } catch (err) { + if (!record.endedAt) { + if (timeoutTimer) clearTimeout(timeoutTimer); + record.endedAt = Date.now(); + record.outcome = { + status: "error", + error: err instanceof Error ? err.message : String(err), + }; + persist(); + handleRunCompletion(record); + } + } + })(); +} + +// ============================================================================ +// Cleanup + Announce +// ============================================================================ + +function handleRunCompletion(record: SubagentRunRecord): void { + if (record.cleanupHandled) return; + record.cleanupHandled = true; + persist(); + + // Run announce flow + const announced = runSubagentAnnounceFlow({ + runId: record.runId, + childSessionId: record.childSessionId, + requesterSessionId: record.requesterSessionId, + task: record.task, + label: record.label, + cleanup: record.cleanup, + outcome: record.outcome, + startedAt: record.startedAt, + endedAt: record.endedAt, + }); + + if (!announced) { + console.warn(`[SubagentRegistry] Announce flow failed for run ${record.runId}`); + } + + // Handle session cleanup + if (record.cleanup === "delete") { + deleteChildSession(record.childSessionId); + } + + // Schedule archive + record.archiveAtMs = Date.now() + DEFAULT_ARCHIVE_AFTER_MS; + record.cleanupCompletedAt = Date.now(); + persist(); +} + +function deleteChildSession(sessionId: string): void { + try { + const sessionDir = resolveSessionDir(sessionId); + rmSync(sessionDir, { recursive: true, force: true }); + console.log(`[SubagentRegistry] Deleted child session: ${sessionId}`); + } catch (err) { + console.warn(`[SubagentRegistry] Failed to delete child session ${sessionId}:`, err); + } + + // Also close the agent in Hub + try { + const hub = getHub(); + hub.closeAgent(sessionId); + } catch { + // Hub may not be available + } +} + +// ============================================================================ +// Archive sweeper +// ============================================================================ + +function startSweeper(): void { + if (sweepTimer) return; + sweepTimer = setInterval(sweep, SWEEP_INTERVAL_MS); + // Don't prevent process exit + if (sweepTimer.unref) sweepTimer.unref(); +} + +function stopSweeper(): void { + if (sweepTimer) { + clearInterval(sweepTimer); + sweepTimer = undefined; + } +} + +function sweep(): void { + const now = Date.now(); + let removed = 0; + + for (const [runId, record] of subagentRuns) { + if (record.archiveAtMs && record.archiveAtMs <= now) { + subagentRuns.delete(runId); + removed++; + } + } + + if (removed > 0) { + persist(); + console.log(`[SubagentRegistry] Archived ${removed} completed run(s)`); + } + + if (subagentRuns.size === 0) { + stopSweeper(); + } +} + +// ============================================================================ +// Persistence helper +// ============================================================================ + +function persist(): void { + try { + saveSubagentRuns(subagentRuns); + } catch (err) { + console.error(`[SubagentRegistry] Failed to persist runs:`, err); + } +} diff --git a/src/agent/subagent/types.ts b/src/agent/subagent/types.ts new file mode 100644 index 00000000..dfbf62bb --- /dev/null +++ b/src/agent/subagent/types.ts @@ -0,0 +1,74 @@ +/** + * Subagent orchestration types. + * + * Models the lifecycle of spawned child agents: + * created → started → ended → cleanup + */ + +/** Final outcome of a subagent run */ +export type SubagentRunOutcome = { + status: "ok" | "error" | "timeout" | "unknown"; + error?: string | undefined; +}; + +/** Persistent record tracking a single subagent run */ +export type SubagentRunRecord = { + /** Unique run identifier (UUIDv7) */ + runId: string; + /** Session ID of the child agent */ + childSessionId: string; + /** Session ID of the parent (requester) agent */ + requesterSessionId: string; + /** The task description / prompt given to the child */ + task: string; + /** Optional human-readable label */ + label?: string | undefined; + /** Session cleanup strategy after completion */ + cleanup: "delete" | "keep"; + /** Timestamp when the run was created */ + createdAt: number; + /** Timestamp when the child agent started execution */ + startedAt?: number | undefined; + /** Timestamp when the child agent finished */ + endedAt?: number | undefined; + /** Final status of the run */ + outcome?: SubagentRunOutcome | undefined; + /** Scheduled auto-archive time (ms since epoch) */ + archiveAtMs?: number | undefined; + /** Whether the cleanup/announce flow has been initiated */ + cleanupHandled?: boolean | undefined; + /** Timestamp when cleanup completed */ + cleanupCompletedAt?: number | undefined; +}; + +/** Parameters for registering a new subagent run */ +export type RegisterSubagentRunParams = { + runId: string; + childSessionId: string; + requesterSessionId: string; + task: string; + label?: string | undefined; + cleanup?: "delete" | "keep" | undefined; + timeoutSeconds?: number | undefined; +}; + +/** Parameters for the announce flow */ +export type SubagentAnnounceParams = { + runId: string; + childSessionId: string; + requesterSessionId: string; + task: string; + label?: string | undefined; + cleanup: "delete" | "keep"; + outcome?: SubagentRunOutcome | undefined; + startedAt?: number | undefined; + endedAt?: number | undefined; +}; + +/** Parameters for building the subagent system prompt */ +export type SubagentSystemPromptParams = { + requesterSessionId: string; + childSessionId: string; + label?: string | undefined; + task: string; +}; From 5b6a1c695337d42ae20b97514a48904bde6e79d2 Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 3 Feb 2026 16:49:05 +0800 Subject: [PATCH 2/7] feat(hub): add singleton pattern and subagent spawning Add global Hub singleton for cross-module access by subagent tools. Add createSubagent() method to Hub for spawning ephemeral child agents with isSubagent flag and custom system prompts. Co-Authored-By: Claude Opus 4.5 --- src/hub/hub-singleton.ts | 28 ++++++++++++++++++++++++++++ src/hub/hub.ts | 26 ++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) create mode 100644 src/hub/hub-singleton.ts diff --git a/src/hub/hub-singleton.ts b/src/hub/hub-singleton.ts new file mode 100644 index 00000000..5d04d0f7 --- /dev/null +++ b/src/hub/hub-singleton.ts @@ -0,0 +1,28 @@ +/** + * Global Hub singleton for cross-module access. + * + * Used by subagent tools and announce flow to interact with the Hub + * without threading references through the entire call chain. + */ + +import type { Hub } from "./hub.js"; + +let _hub: Hub | undefined; + +/** Set the global Hub instance. Called once during Hub construction. */ +export function setHub(hub: Hub): void { + _hub = hub; +} + +/** Get the global Hub instance. Throws if not yet initialized. */ +export function getHub(): Hub { + if (!_hub) { + throw new Error("[Hub] Hub singleton not initialized. Ensure Hub is constructed before accessing."); + } + return _hub; +} + +/** Check if the Hub singleton has been initialized. */ +export function isHubInitialized(): boolean { + return _hub !== undefined; +} diff --git a/src/hub/hub.ts b/src/hub/hub.ts index 10236ff3..bc86bfdb 100644 --- a/src/hub/hub.ts +++ b/src/hub/hub.ts @@ -9,7 +9,9 @@ import { type ResponseErrorPayload, } from "@multica/sdk"; import { AsyncAgent } from "../agent/async-agent.js"; +import type { AgentOptions } from "../agent/types.js"; import { getHubId } from "./hub-identity.js"; +import { setHub } from "./hub-singleton.js"; import { loadAgentRecords, addAgentRecord, removeAgentRecord } from "./agent-store.js"; import { RpcDispatcher, RpcError } from "./rpc/dispatcher.js"; import { createGetAgentMessagesHandler } from "./rpc/handlers/get-agent-messages.js"; @@ -48,6 +50,9 @@ export class Hub { this.rpc.register("deleteAgent", createDeleteAgentHandler(this)); this.rpc.register("updateGateway", createUpdateGatewayHandler(this)); + // Register as global singleton for cross-module access (subagent tools, announce flow) + setHub(this); + this.client = this.createClient(this.url); this.client.connect(); this.restoreAgents(); @@ -243,6 +248,27 @@ export class Hub { } } + /** Create a subagent with specific options (isSubagent, systemPrompt, model) */ + createSubagent(sessionId: string, options: Omit = {}): AsyncAgent { + const existing = this.agents.get(sessionId); + if (existing && !existing.closed) { + return existing; + } + + const agent = new AsyncAgent({ + ...options, + sessionId, + isSubagent: true, + }); + this.agents.set(agent.sessionId, agent); + + // Subagents are ephemeral — don't persist to agent store + void this.consumeAgent(agent); + + console.log(`[Hub] Subagent created: ${agent.sessionId}`); + return agent; + } + getAgent(id: string): AsyncAgent | undefined { return this.agents.get(id); } From 83b557a6fc1b999f82e4c5f2aa20a3e644ef3830 Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 3 Feb 2026 16:49:27 +0800 Subject: [PATCH 3/7] feat(agent): add sessions_spawn tool for subagent orchestration Register sessions_spawn tool in the tool system with TypeBox schema. Subagents are blocked from spawning nested subagents via both tool policy (DEFAULT_SUBAGENT_TOOL_DENY) and runtime guard. Add group:subagent tool group and parentSessionId to AgentOptions. Co-Authored-By: Claude Opus 4.5 --- src/agent/tools.ts | 16 ++- src/agent/tools/groups.ts | 15 +-- src/agent/tools/sessions-spawn.test.ts | 40 +++++++ src/agent/tools/sessions-spawn.ts | 142 +++++++++++++++++++++++++ src/agent/types.ts | 2 + 5 files changed, 204 insertions(+), 11 deletions(-) create mode 100644 src/agent/tools/sessions-spawn.test.ts create mode 100644 src/agent/tools/sessions-spawn.ts diff --git a/src/agent/tools.ts b/src/agent/tools.ts index 8190cb86..56c9b766 100644 --- a/src/agent/tools.ts +++ b/src/agent/tools.ts @@ -6,6 +6,7 @@ import { createProcessTool } from "./tools/process.js"; import { createGlobTool } from "./tools/glob.js"; import { createWebFetchTool, createWebSearchTool } from "./tools/web/index.js"; import { createMemoryTools } from "./tools/memory/index.js"; +import { createSessionsSpawnTool } from "./tools/sessions-spawn.js"; import { filterTools } from "./tools/policy.js"; import { isMulticaError, isRetryableError } from "../shared/errors.js"; @@ -19,6 +20,10 @@ export interface CreateToolsOptions { profileId?: string | undefined; /** Base directory for profiles (optional) */ profileBaseDir?: string | undefined; + /** Whether this agent is a subagent (passed to sessions_spawn tool) */ + isSubagent?: boolean | undefined; + /** Session ID of the agent (passed to sessions_spawn tool) */ + sessionId?: string | undefined; } type ToolErrorPayload = { @@ -88,7 +93,7 @@ function wrapTool( export function createAllTools(options: CreateToolsOptions | string): AgentTool[] { // Support legacy string argument for backwards compatibility const opts: CreateToolsOptions = typeof options === "string" ? { cwd: options } : options; - const { cwd, profileId, profileBaseDir } = opts; + const { cwd, profileId, profileBaseDir, isSubagent, sessionId } = opts; const baseTools = createCodingTools(cwd).filter( (tool) => tool.name !== "bash", @@ -118,6 +123,13 @@ export function createAllTools(options: CreateToolsOptions | string): AgentTool< tools.push(...memoryTools); } + // Add sessions_spawn tool (will be filtered by policy for subagents) + const sessionsSpawnTool = createSessionsSpawnTool({ + isSubagent: isSubagent ?? false, + sessionId, + }); + tools.push(sessionsSpawnTool as AgentTool); + return tools; } @@ -138,6 +150,8 @@ export function resolveTools(options: AgentOptions): AgentTool[] { cwd, profileId: options.profileId, profileBaseDir: options.profileBaseDir, + isSubagent: options.isSubagent, + sessionId: options.sessionId, }); // Apply policy filtering diff --git a/src/agent/tools/groups.ts b/src/agent/tools/groups.ts index a886b22b..1e9edf6c 100644 --- a/src/agent/tools/groups.ts +++ b/src/agent/tools/groups.ts @@ -35,6 +35,9 @@ export const TOOL_GROUPS: Record = { // Memory tools (requires profileId) "group:memory": ["memory_get", "memory_set", "memory_delete", "memory_list"], + // Subagent tools + "group:subagent": ["sessions_spawn"], + // All core tools "group:core": [ "read", @@ -76,16 +79,8 @@ export const TOOL_PROFILES: Record { + it("has correct name and description", () => { + const tool = createSessionsSpawnTool({ isSubagent: false, sessionId: "test-session" }); + expect(tool.name).toBe("sessions_spawn"); + expect(tool.label).toBe("Spawn Subagent"); + expect(tool.description).toContain("Spawn a background subagent"); + }); + + it("rejects spawn from subagent sessions", async () => { + const tool = createSessionsSpawnTool({ isSubagent: true, sessionId: "child-session" }); + + const result = await tool.execute( + "call-1", + { task: "do something" } as any, + new AbortController().signal, + ); + + expect(result.details.status).toBe("error"); + expect(result.details.error).toContain("not allowed from sub-agent sessions"); + const firstContent = result.content[0] as { type: string; text: string }; + expect(firstContent.text).toContain("not allowed"); + }); + + it("fails gracefully when Hub is not initialized", async () => { + const tool = createSessionsSpawnTool({ isSubagent: false, sessionId: "parent-session" }); + + const result = await tool.execute( + "call-2", + { task: "analyze code", label: "Code Analysis" } as any, + new AbortController().signal, + ); + + // Should get an error because Hub singleton is not set up in test + expect(result.details.status).toBe("error"); + expect(result.details.error).toContain("Hub"); + }); +}); diff --git a/src/agent/tools/sessions-spawn.ts b/src/agent/tools/sessions-spawn.ts new file mode 100644 index 00000000..a8ca4f96 --- /dev/null +++ b/src/agent/tools/sessions-spawn.ts @@ -0,0 +1,142 @@ +/** + * sessions_spawn tool — allows a parent agent to spawn subagent runs. + * + * Subagents run in isolated sessions with restricted tools. + * Results are announced back to the parent when the child completes. + */ + +import { v7 as uuidv7 } from "uuid"; +import { Type } from "@sinclair/typebox"; +import type { AgentTool } from "@mariozechner/pi-agent-core"; +import { getHub } from "../../hub/hub-singleton.js"; +import { buildSubagentSystemPrompt } from "../subagent/announce.js"; +import { registerSubagentRun } from "../subagent/registry.js"; + +const SessionsSpawnSchema = Type.Object({ + task: Type.String({ description: "The task for the subagent to perform." }), + label: Type.Optional( + Type.String({ description: "Human-readable label for this background task." }), + ), + model: Type.Optional( + Type.String({ description: "Override the LLM model for the subagent (e.g. 'gpt-4o', 'claude-sonnet')." }), + ), + cleanup: Type.Optional( + Type.Union([Type.Literal("delete"), Type.Literal("keep")], { + description: "Session cleanup after completion. 'delete' removes session files, 'keep' preserves for audit. Default: 'delete'.", + }), + ), + timeoutSeconds: Type.Optional( + Type.Number({ + description: "Execution timeout in seconds. The subagent will be terminated if it exceeds this.", + minimum: 1, + }), + ), +}); + +type SessionsSpawnArgs = { + task: string; + label?: string; + model?: string; + cleanup?: "delete" | "keep"; + timeoutSeconds?: number; +}; + +export type SessionsSpawnResult = { + status: "accepted" | "error"; + childSessionId?: string; + runId?: string; + error?: string; +}; + +export interface CreateSessionsSpawnToolOptions { + /** Whether the current agent is itself a subagent */ + isSubagent?: boolean; + /** Session ID of the current (requester) agent */ + sessionId?: string; +} + +export function createSessionsSpawnTool( + options: CreateSessionsSpawnToolOptions, +): AgentTool { + return { + name: "sessions_spawn", + label: "Spawn Subagent", + description: + "Spawn a background subagent to handle a specific task. The subagent runs in an isolated session with its own tool set. " + + "When it completes, its findings are announced back to you automatically. " + + "Use this for parallelizable work, long-running analysis, or tasks that benefit from isolation.", + parameters: SessionsSpawnSchema, + execute: async (_toolCallId, args) => { + const { task, label, model, cleanup = "delete", timeoutSeconds } = args as SessionsSpawnArgs; + + // Guard: subagents cannot spawn subagents + if (options.isSubagent) { + return { + content: [{ type: "text", text: "Error: sessions_spawn is not allowed from sub-agent sessions." }], + details: { + status: "error", + error: "sessions_spawn is not allowed from sub-agent sessions", + }, + }; + } + + const requesterSessionId = options.sessionId ?? "unknown"; + const runId = uuidv7(); + const childSessionId = uuidv7(); + + // Build system prompt for the child + const systemPrompt = buildSubagentSystemPrompt({ + requesterSessionId, + childSessionId, + label, + task, + }); + + // Spawn child agent via Hub + try { + const hub = getHub(); + const childAgent = hub.createSubagent(childSessionId, { + systemPrompt, + model, + }); + + // Register the run for lifecycle tracking + registerSubagentRun({ + runId, + childSessionId, + requesterSessionId, + task, + label, + cleanup, + timeoutSeconds, + }); + + // Write the task to the child (non-blocking) + childAgent.write(task); + + return { + content: [ + { + type: "text", + text: `Subagent spawned successfully.\n\nRun ID: ${runId}\nSession: ${childSessionId}\nTask: ${label || task.slice(0, 80)}\n\nThe subagent is now working in the background. You will receive its findings when it completes.`, + }, + ], + details: { + status: "accepted", + childSessionId, + runId, + }, + }; + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + return { + content: [{ type: "text", text: `Error spawning subagent: ${message}` }], + details: { + status: "error", + error: message, + }, + }; + } + }, + }; +} diff --git a/src/agent/types.ts b/src/agent/types.ts index 75e53ad1..06ffb217 100644 --- a/src/agent/types.ts +++ b/src/agent/types.ts @@ -66,6 +66,8 @@ export type AgentOptions = { tools?: ToolsConfig | undefined; /** Whether this is a subagent (applies restricted tool set) */ isSubagent?: boolean | undefined; + /** Parent session ID (for subagent lineage tracking) */ + parentSessionId?: string | undefined; }; export interface Message { From 918b5d294d595858d7c5f79e775cc3e2f9990214 Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 3 Feb 2026 17:01:49 +0800 Subject: [PATCH 4/7] fix(agent): resolve stream race condition and add lifecycle tests Replace for-await stream consumption in watchChildAgent with waitForIdle() + onClose() callbacks on AsyncAgent. This prevents conflict with Hub.consumeAgent() which also reads the Channel. Add shutdownSubagentRegistry() for clean Hub shutdown, guard Hub access with isHubInitialized(), and clean resumedRuns in sweep. Co-Authored-By: Claude Opus 4.5 --- src/agent/async-agent.ts | 26 ++++- src/agent/subagent/index.ts | 1 + src/agent/subagent/registry.test.ts | 161 ++++++++++++++++++++++++++++ src/agent/subagent/registry.ts | 125 +++++++++++---------- 4 files changed, 254 insertions(+), 59 deletions(-) create mode 100644 src/agent/subagent/registry.test.ts diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts index 97b47378..c5bcdc07 100644 --- a/src/agent/async-agent.ts +++ b/src/agent/async-agent.ts @@ -14,6 +14,7 @@ export class AsyncAgent { private readonly channel = new Channel(); private _closed = false; private queue: Promise = Promise.resolve(); + private closeCallbacks: Array<() => void> = []; readonly sessionId: string; constructor(options?: AgentOptions) { @@ -57,10 +58,33 @@ export class AsyncAgent { return this.channel; } - /** Close agent, stop all reads */ + /** Returns a promise that resolves when the current message queue is drained */ + waitForIdle(): Promise { + return this.queue; + } + + /** Register a callback to be invoked when the agent is closed */ + onClose(callback: () => void): void { + if (this._closed) { + // Already closed, fire immediately + callback(); + return; + } + this.closeCallbacks.push(callback); + } + + /** Close agent, stop all reads, fire close callbacks */ close(): void { if (this._closed) return; this._closed = true; this.channel.close(); + for (const cb of this.closeCallbacks) { + try { + cb(); + } catch { + // Don't let callback errors prevent other callbacks + } + } + this.closeCallbacks = []; } } diff --git a/src/agent/subagent/index.ts b/src/agent/subagent/index.ts index d0a24d99..2785d86e 100644 --- a/src/agent/subagent/index.ts +++ b/src/agent/subagent/index.ts @@ -20,6 +20,7 @@ export { releaseSubagentRun, getSubagentRun, resetSubagentRegistryForTests, + shutdownSubagentRegistry, } from "./registry.js"; export { diff --git a/src/agent/subagent/registry.test.ts b/src/agent/subagent/registry.test.ts new file mode 100644 index 00000000..dda78917 --- /dev/null +++ b/src/agent/subagent/registry.test.ts @@ -0,0 +1,161 @@ +import { describe, it, expect, beforeEach } from "vitest"; +import { + registerSubagentRun, + listSubagentRuns, + getSubagentRun, + releaseSubagentRun, + resetSubagentRegistryForTests, + shutdownSubagentRegistry, +} from "./registry.js"; + +// Note: These tests exercise the registry's in-memory state management. +// They do NOT test the full lifecycle (which requires a live Hub + AsyncAgent). + +beforeEach(() => { + resetSubagentRegistryForTests(); +}); + +describe("subagent registry", () => { + it("registers a run and retrieves it by ID", () => { + const record = registerSubagentRun({ + runId: "run-1", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Analyze code", + label: "Code Analysis", + }); + + expect(record.runId).toBe("run-1"); + expect(record.childSessionId).toBe("child-1"); + expect(record.requesterSessionId).toBe("parent-1"); + expect(record.task).toBe("Analyze code"); + expect(record.label).toBe("Code Analysis"); + expect(record.cleanup).toBe("delete"); // default + expect(record.createdAt).toBeGreaterThan(0); + expect(record.startedAt).toBeGreaterThan(0); // set by watchChildAgent + + const retrieved = getSubagentRun("run-1"); + expect(retrieved).toBe(record); + }); + + it("lists runs filtered by requester session", () => { + registerSubagentRun({ + runId: "run-1", + childSessionId: "child-1", + requesterSessionId: "parent-A", + task: "Task 1", + }); + registerSubagentRun({ + runId: "run-2", + childSessionId: "child-2", + requesterSessionId: "parent-B", + task: "Task 2", + }); + registerSubagentRun({ + runId: "run-3", + childSessionId: "child-3", + requesterSessionId: "parent-A", + task: "Task 3", + }); + + const parentARuns = listSubagentRuns("parent-A"); + expect(parentARuns).toHaveLength(2); + expect(parentARuns.map((r) => r.runId).sort()).toEqual(["run-1", "run-3"]); + + const parentBRuns = listSubagentRuns("parent-B"); + expect(parentBRuns).toHaveLength(1); + expect(parentBRuns[0]!.runId).toBe("run-2"); + + const emptyRuns = listSubagentRuns("parent-C"); + expect(emptyRuns).toHaveLength(0); + }); + + it("releases a run from the registry", () => { + registerSubagentRun({ + runId: "run-1", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Task", + }); + + expect(getSubagentRun("run-1")).toBeDefined(); + + const released = releaseSubagentRun("run-1"); + expect(released).toBe(true); + expect(getSubagentRun("run-1")).toBeUndefined(); + + // Double release returns false + const releasedAgain = releaseSubagentRun("run-1"); + expect(releasedAgain).toBe(false); + }); + + it("applies custom cleanup value", () => { + const record = registerSubagentRun({ + runId: "run-keep", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Keep session", + cleanup: "keep", + }); + + expect(record.cleanup).toBe("keep"); + }); + + it("registers a run and ends it with error when Hub is not available", () => { + // Without Hub initialized, watchChildAgent detects missing Hub + // and immediately ends the run with an error + registerSubagentRun({ + runId: "run-no-hub", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Running task", + }); + + const record = getSubagentRun("run-no-hub"); + expect(record?.startedAt).toBeGreaterThan(0); + expect(record?.endedAt).toBeGreaterThan(0); + expect(record?.outcome?.status).toBe("error"); + expect(record?.outcome?.error).toContain("Hub not initialized"); + }); + + it("shutdownSubagentRegistry marks unfinished runs as ended", () => { + // Directly set up a record without going through watchChildAgent + // to simulate a run that is still active + registerSubagentRun({ + runId: "run-active", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Running task", + }); + + // The above run already ended due to no Hub; reset its endedAt + // to simulate a truly active run + const record = getSubagentRun("run-active"); + if (record) { + record.endedAt = undefined; + record.outcome = undefined; + } + + shutdownSubagentRegistry(); + + const after = getSubagentRun("run-active"); + expect(after?.endedAt).toBeGreaterThan(0); + expect(after?.outcome?.status).toBe("unknown"); + }); + + it("resetSubagentRegistryForTests clears all state", () => { + registerSubagentRun({ + runId: "run-1", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Task", + }); + + expect(listSubagentRuns("parent-1")).toHaveLength(1); + + resetSubagentRegistryForTests(); + + expect(listSubagentRuns("parent-1")).toHaveLength(0); + expect(getSubagentRun("run-1")).toBeUndefined(); + }); +}); diff --git a/src/agent/subagent/registry.ts b/src/agent/subagent/registry.ts index 9915e4a0..b2cc8446 100644 --- a/src/agent/subagent/registry.ts +++ b/src/agent/subagent/registry.ts @@ -5,7 +5,7 @@ * watches for child completion, and triggers announce flow. */ -import { getHub } from "../../hub/hub-singleton.js"; +import { getHub, isHubInitialized } from "../../hub/hub-singleton.js"; import { loadSubagentRuns, saveSubagentRuns } from "./registry-store.js"; import { runSubagentAnnounceFlow } from "./announce.js"; import type { @@ -128,6 +128,27 @@ export function getSubagentRun(runId: string): SubagentRunRecord | undefined { return subagentRuns.get(runId); } +/** Mark all active (non-ended) runs as ended with "unknown" status. Called during Hub shutdown. */ +export function shutdownSubagentRegistry(): void { + const now = Date.now(); + let updated = 0; + + for (const record of subagentRuns.values()) { + if (!record.endedAt) { + record.endedAt = now; + record.outcome = { status: "unknown" }; + updated++; + } + } + + if (updated > 0) { + persist(); + console.log(`[SubagentRegistry] Marked ${updated} active run(s) as ended during shutdown`); + } + + stopSweeper(); +} + /** Reset all state (for testing). */ export function resetSubagentRegistryForTests(): void { subagentRuns.clear(); @@ -140,78 +161,65 @@ export function resetSubagentRegistryForTests(): void { // ============================================================================ function watchChildAgent(record: SubagentRunRecord, timeoutSeconds?: number): void { - const { runId, childSessionId } = record; + const { childSessionId } = record; // Mark as started record.startedAt = Date.now(); persist(); + const cleanup = (outcome: { status: "ok" | "error" | "timeout" | "unknown"; error?: string | undefined }) => { + if (record.endedAt) return; // Already finalized + if (timeoutTimer) clearTimeout(timeoutTimer); + record.endedAt = Date.now(); + record.outcome = outcome; + persist(); + handleRunCompletion(record); + }; + // Set up timeout if specified let timeoutTimer: ReturnType | undefined; if (timeoutSeconds && timeoutSeconds > 0) { timeoutTimer = setTimeout(() => { - if (!record.endedAt) { - record.endedAt = Date.now(); - record.outcome = { status: "timeout" }; - persist(); + cleanup({ status: "timeout" }); - // Try to close the child agent - try { - const hub = getHub(); - hub.closeAgent(childSessionId); - } catch { - // Hub may not be available - } - - handleRunCompletion(record); + // Try to close the child agent + try { + const hub = getHub(); + hub.closeAgent(childSessionId); + } catch { + // Hub may not be available } }, timeoutSeconds * 1000); } - // Watch the child agent's channel for closure - void (async () => { - try { - const hub = getHub(); - const childAgent = hub.getAgent(childSessionId); - if (!childAgent) { - record.endedAt = Date.now(); - record.outcome = { status: "error", error: "Child agent not found" }; - persist(); - handleRunCompletion(record); - return; - } + // Get child agent reference (Hub may not be available in tests) + if (!isHubInitialized()) { + cleanup({ status: "error", error: "Hub not initialized" }); + return; + } - // Consume the child's output stream — when it ends, the agent is done - for await (const item of childAgent.read()) { - // Check for error messages - if ("content" in item && typeof item.content === "string" && item.content.startsWith("[error]")) { - record.outcome = { status: "error", error: item.content }; - } - } + const hub = getHub(); + const childAgent = hub.getAgent(childSessionId); + if (!childAgent) { + cleanup({ status: "error", error: "Child agent not found" }); + return; + } - // Stream ended — child agent completed - if (!record.endedAt) { - if (timeoutTimer) clearTimeout(timeoutTimer); - record.endedAt = Date.now(); - if (!record.outcome) { - record.outcome = { status: "ok" }; - } - persist(); - handleRunCompletion(record); - } - } catch (err) { - if (!record.endedAt) { - if (timeoutTimer) clearTimeout(timeoutTimer); - record.endedAt = Date.now(); - record.outcome = { - status: "error", - error: err instanceof Error ? err.message : String(err), - }; - persist(); - handleRunCompletion(record); - } - } - })(); + // Wait for the child agent's task queue to drain (task completion), + // then trigger announce flow. Uses waitForIdle() instead of consuming + // the stream (which would conflict with Hub.consumeAgent). + childAgent.waitForIdle().then( + () => cleanup({ status: "ok" }), + (err) => cleanup({ + status: "error", + error: err instanceof Error ? err.message : String(err), + }), + ); + + // Also handle explicit close (e.g., timeout kill, Hub shutdown) + childAgent.onClose(() => { + cleanup({ status: record.outcome?.status ?? "unknown" }); + }); } // ============================================================================ @@ -292,8 +300,9 @@ function sweep(): void { let removed = 0; for (const [runId, record] of subagentRuns) { - if (record.archiveAtMs && record.archiveAtMs <= now) { + if (record.archiveAtMs !== undefined && record.archiveAtMs <= now) { subagentRuns.delete(runId); + resumedRuns.delete(runId); removed++; } } From 6bd068a97f73435ff391a126847801e26f4b9181 Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 3 Feb 2026 17:01:56 +0800 Subject: [PATCH 5/7] fix(hub): init subagent registry on startup and cleanup on shutdown Call initSubagentRegistry() after setHub() in Hub constructor to restore persisted runs. Call shutdownSubagentRegistry() before closing agents in shutdown() to mark active runs as ended. Co-Authored-By: Claude Opus 4.5 --- src/hub/hub.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/hub/hub.ts b/src/hub/hub.ts index bc86bfdb..31818950 100644 --- a/src/hub/hub.ts +++ b/src/hub/hub.ts @@ -12,6 +12,7 @@ import { AsyncAgent } from "../agent/async-agent.js"; import type { AgentOptions } from "../agent/types.js"; import { getHubId } from "./hub-identity.js"; import { setHub } from "./hub-singleton.js"; +import { initSubagentRegistry, shutdownSubagentRegistry } from "../agent/subagent/index.js"; import { loadAgentRecords, addAgentRecord, removeAgentRecord } from "./agent-store.js"; import { RpcDispatcher, RpcError } from "./rpc/dispatcher.js"; import { createGetAgentMessagesHandler } from "./rpc/handlers/get-agent-messages.js"; @@ -53,6 +54,9 @@ export class Hub { // Register as global singleton for cross-module access (subagent tools, announce flow) setHub(this); + // Restore subagent registry from persistent state + initSubagentRegistry(); + this.client = this.createClient(this.url); this.client.connect(); this.restoreAgents(); @@ -292,6 +296,9 @@ export class Hub { } shutdown(): void { + // Finalize subagent registry before closing agents + shutdownSubagentRegistry(); + for (const [id, agent] of this.agents) { agent.close(); this.agents.delete(id); From 1cd778b84570b9a9140a8c3e5150561fd23f8b86 Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 3 Feb 2026 17:02:03 +0800 Subject: [PATCH 6/7] fix(agent): add task minLength validation, remove unused parentSessionId Add minLength:1 to sessions_spawn task parameter to prevent empty task strings. Remove parentSessionId from AgentOptions as the subagent registry tracks lineage via requesterSessionId. Co-Authored-By: Claude Opus 4.5 --- src/agent/tools/sessions-spawn.ts | 2 +- src/agent/types.ts | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/agent/tools/sessions-spawn.ts b/src/agent/tools/sessions-spawn.ts index a8ca4f96..0dfb5c8c 100644 --- a/src/agent/tools/sessions-spawn.ts +++ b/src/agent/tools/sessions-spawn.ts @@ -13,7 +13,7 @@ import { buildSubagentSystemPrompt } from "../subagent/announce.js"; import { registerSubagentRun } from "../subagent/registry.js"; const SessionsSpawnSchema = Type.Object({ - task: Type.String({ description: "The task for the subagent to perform." }), + task: Type.String({ description: "The task for the subagent to perform.", minLength: 1 }), label: Type.Optional( Type.String({ description: "Human-readable label for this background task." }), ), diff --git a/src/agent/types.ts b/src/agent/types.ts index 06ffb217..75e53ad1 100644 --- a/src/agent/types.ts +++ b/src/agent/types.ts @@ -66,8 +66,6 @@ export type AgentOptions = { tools?: ToolsConfig | undefined; /** Whether this is a subagent (applies restricted tool set) */ isSubagent?: boolean | undefined; - /** Parent session ID (for subagent lineage tracking) */ - parentSessionId?: string | undefined; }; export interface Message { From 30f23459cec38dce45f62127ea47fca1cd5d4773 Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 3 Feb 2026 17:22:41 +0800 Subject: [PATCH 7/7] fix(agent): align subagent lifecycle with openclaw --- src/agent/subagent/registry.ts | 11 +++++++---- src/agent/tools/sessions-spawn.ts | 7 ++++--- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/agent/subagent/registry.ts b/src/agent/subagent/registry.ts index b2cc8446..d6f76b94 100644 --- a/src/agent/subagent/registry.ts +++ b/src/agent/subagent/registry.ts @@ -47,10 +47,9 @@ export function initSubagentRegistry(): void { resumedRuns.add(runId); handleRunCompletion(record); } - } - // If not ended, the child agent session is lost on restart — - // mark as ended with unknown outcome - else if (!record.startedAt) { + } else { + // If not ended, the child agent session is lost on restart — + // mark as ended with unknown outcome record.endedAt = Date.now(); record.outcome = { status: "unknown" }; persist(); @@ -246,6 +245,10 @@ function handleRunCompletion(record: SubagentRunRecord): void { if (!announced) { console.warn(`[SubagentRegistry] Announce flow failed for run ${record.runId}`); + // Allow retry on next restart if announce failed. + record.cleanupHandled = false; + persist(); + return; } // Handle session cleanup diff --git a/src/agent/tools/sessions-spawn.ts b/src/agent/tools/sessions-spawn.ts index 0dfb5c8c..9ae2cc69 100644 --- a/src/agent/tools/sessions-spawn.ts +++ b/src/agent/tools/sessions-spawn.ts @@ -100,6 +100,10 @@ export function createSessionsSpawnTool( model, }); + // Write the task to the child (non-blocking) before registering, + // so waitForIdle() observes the queued work. + childAgent.write(task); + // Register the run for lifecycle tracking registerSubagentRun({ runId, @@ -111,9 +115,6 @@ export function createSessionsSpawnTool( timeoutSeconds, }); - // Write the task to the child (non-blocking) - childAgent.write(task); - return { content: [ {