diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts index 97b47378..c5bcdc07 100644 --- a/src/agent/async-agent.ts +++ b/src/agent/async-agent.ts @@ -14,6 +14,7 @@ export class AsyncAgent { private readonly channel = new Channel(); private _closed = false; private queue: Promise = Promise.resolve(); + private closeCallbacks: Array<() => void> = []; readonly sessionId: string; constructor(options?: AgentOptions) { @@ -57,10 +58,33 @@ export class AsyncAgent { return this.channel; } - /** Close agent, stop all reads */ + /** Returns a promise that resolves when the current message queue is drained */ + waitForIdle(): Promise { + return this.queue; + } + + /** Register a callback to be invoked when the agent is closed */ + onClose(callback: () => void): void { + if (this._closed) { + // Already closed, fire immediately + callback(); + return; + } + this.closeCallbacks.push(callback); + } + + /** Close agent, stop all reads, fire close callbacks */ close(): void { if (this._closed) return; this._closed = true; this.channel.close(); + for (const cb of this.closeCallbacks) { + try { + cb(); + } catch { + // Don't let callback errors prevent other callbacks + } + } + this.closeCallbacks = []; } } diff --git a/src/agent/subagent/announce.test.ts b/src/agent/subagent/announce.test.ts new file mode 100644 index 00000000..8f1a7140 --- /dev/null +++ b/src/agent/subagent/announce.test.ts @@ -0,0 +1,127 @@ +import { describe, it, expect } from "vitest"; +import { buildSubagentSystemPrompt, formatAnnouncementMessage } from "./announce.js"; +import type { FormatAnnouncementParams } from "./announce.js"; + +describe("buildSubagentSystemPrompt", () => { + it("includes task and session context", () => { + const prompt = buildSubagentSystemPrompt({ + requesterSessionId: "parent-123", + childSessionId: "child-456", + task: "Analyze the auth module for security issues", + }); + + expect(prompt).toContain("You are a subagent spawned to complete a specific task"); + expect(prompt).toContain("Analyze the auth module for security issues"); + expect(prompt).toContain("parent-123"); + expect(prompt).toContain("child-456"); + expect(prompt).toContain("Do NOT spawn nested subagents"); + }); + + it("includes label when provided", () => { + const prompt = buildSubagentSystemPrompt({ + requesterSessionId: "parent-123", + childSessionId: "child-456", + label: "Security Audit", + task: "Check for vulnerabilities", + }); + + expect(prompt).toContain('Label: "Security Audit"'); + }); + + it("omits label line when not provided", () => { + const prompt = buildSubagentSystemPrompt({ + requesterSessionId: "parent-123", + childSessionId: "child-456", + task: "Do something", + }); + + expect(prompt).not.toContain("Label:"); + }); +}); + +describe("formatAnnouncementMessage", () => { + const baseParams: FormatAnnouncementParams = { + runId: "run-1", + childSessionId: "child-456", + requesterSessionId: "parent-123", + task: "Analyze code", + label: "Code Analysis", + cleanup: "delete", + outcome: { status: "ok" }, + startedAt: 1000000, + endedAt: 1030000, + }; + + it("formats successful completion", () => { + const msg = formatAnnouncementMessage({ + ...baseParams, + findings: "Found 3 issues in the auth module.", + }); + + expect(msg).toContain('"Code Analysis" just completed successfully'); + expect(msg).toContain("Found 3 issues in the auth module."); + expect(msg).toContain("runtime 30s"); + expect(msg).toContain("session child-456"); + }); + + it("formats error outcome", () => { + const msg = formatAnnouncementMessage({ + ...baseParams, + outcome: { status: "error", error: "API key expired" }, + }); + + expect(msg).toContain("failed: API key expired"); + }); + + it("formats timeout outcome", () => { + const msg = formatAnnouncementMessage({ + ...baseParams, + outcome: { status: "timeout" }, + }); + + expect(msg).toContain("timed out"); + }); + + it("shows (no output) when findings is not provided", () => { + const msg = formatAnnouncementMessage(baseParams); + + expect(msg).toContain("(no output)"); + }); + + it("uses task text when label is not provided", () => { + const paramsNoLabel: FormatAnnouncementParams = { + ...baseParams, + label: undefined, + }; + const msg = formatAnnouncementMessage(paramsNoLabel); + + expect(msg).toContain('"Analyze code"'); + }); + + it("formats runtime for minutes", () => { + const msg = formatAnnouncementMessage({ + ...baseParams, + startedAt: 1000000, + endedAt: 1150000, // 150 seconds = 2m30s + }); + + expect(msg).toContain("runtime 2m30s"); + }); + + it("formats runtime for hours", () => { + const msg = formatAnnouncementMessage({ + ...baseParams, + startedAt: 1000000, + endedAt: 4600000, // 3600 seconds = 1h + }); + + expect(msg).toContain("runtime 1h"); + }); + + it("includes summarization instruction", () => { + const msg = formatAnnouncementMessage(baseParams); + + expect(msg).toContain("Summarize this naturally for the user"); + expect(msg).toContain("NO_REPLY"); + }); +}); diff --git a/src/agent/subagent/announce.ts b/src/agent/subagent/announce.ts new file mode 100644 index 00000000..e5d06c51 --- /dev/null +++ b/src/agent/subagent/announce.ts @@ -0,0 +1,226 @@ +/** + * Subagent announcement flow. + * + * Handles result propagation from child → parent agent: + * - Builds system prompts for child agents + * - Reads child session output + * - Formats and delivers announcement messages + */ + +import { readEntries } from "../session/storage.js"; +import { getHub } from "../../hub/hub-singleton.js"; +import type { + SubagentAnnounceParams, + SubagentRunOutcome, + SubagentSystemPromptParams, +} from "./types.js"; + +/** + * Build the system prompt injected into a subagent session. + */ +export function buildSubagentSystemPrompt(params: SubagentSystemPromptParams): string { + const { requesterSessionId, childSessionId, label, task } = params; + + const lines: string[] = [ + "You are a subagent spawned to complete a specific task.", + "", + "## Rules", + "- Stay focused on the assigned task below.", + "- Complete the task thoroughly and report your findings.", + "- Do NOT initiate side actions unrelated to the task.", + "- Do NOT attempt to communicate with the user directly.", + "- Do NOT spawn nested subagents.", + "- Your session is ephemeral and will be cleaned up after completion.", + "", + "## Context", + `Requester session: ${requesterSessionId}`, + `Child session: ${childSessionId}`, + ]; + + if (label) { + lines.push(`Label: "${label}"`); + } + + lines.push("", "## Task", task); + + return lines.join("\n"); +} + +/** + * Read the latest assistant reply from a session's JSONL file. + */ +export function readLatestAssistantReply(sessionId: string): string | undefined { + const entries = readEntries(sessionId); + + // Walk backwards to find last assistant message + for (let i = entries.length - 1; i >= 0; i--) { + const entry = entries[i]!; + if (entry.type !== "message") continue; + + const message = entry.message; + if (message.role !== "assistant") continue; + + return extractAssistantText(message); + } + + return undefined; +} + +/** + * Extract text content from an assistant message. + * AgentMessage.content for assistant is (TextContent | ThinkingContent | ToolCall)[]. + */ +function extractAssistantText(message: { role: string; content: unknown }): string { + const content = message.content; + if (typeof content === "string") { + return sanitizeText(content); + } + + if (!Array.isArray(content)) return ""; + + const textParts: string[] = []; + for (const block of content) { + if (block && typeof block === "object" && "type" in block && block.type === "text" && "text" in block) { + textParts.push(String(block.text)); + } + } + + return sanitizeText(textParts.join("\n")); +} + +/** + * Strip thinking tags and tool markers from text. + */ +function sanitizeText(text: string): string { + return text + .replace(/[\s\S]*?<\/thinking>/g, "") + .replace(/[\s\S]*?<\/tool_call>/g, "") + .trim(); +} + +/** + * Format the duration between two timestamps as a human-readable string. + */ +function formatDuration(startMs: number, endMs: number): string { + const totalSeconds = Math.round((endMs - startMs) / 1000); + if (totalSeconds < 60) return `${totalSeconds}s`; + + const minutes = Math.floor(totalSeconds / 60); + const seconds = totalSeconds % 60; + if (minutes < 60) return seconds > 0 ? `${minutes}m${seconds}s` : `${minutes}m`; + + const hours = Math.floor(minutes / 60); + const remainingMinutes = minutes % 60; + return remainingMinutes > 0 ? `${hours}h${remainingMinutes}m` : `${hours}h`; +} + +/** + * Format a status label from an outcome. + */ +function formatStatusLabel(outcome: SubagentRunOutcome | undefined): string { + if (!outcome) return "completed with unknown status"; + switch (outcome.status) { + case "ok": + return "completed successfully"; + case "error": + return outcome.error ? `failed: ${outcome.error}` : "failed"; + case "timeout": + return "timed out"; + default: + return "completed with unknown status"; + } +} + +/** Parameters for formatAnnouncementMessage */ +export interface FormatAnnouncementParams { + runId: string; + childSessionId: string; + requesterSessionId: string; + task: string; + label?: string | undefined; + cleanup: "delete" | "keep"; + outcome?: SubagentRunOutcome | undefined; + startedAt?: number | undefined; + endedAt?: number | undefined; + findings?: string | undefined; +} + +/** + * Format the announcement message sent to the parent agent. + */ +export function formatAnnouncementMessage(params: FormatAnnouncementParams): string { + const { task, label, outcome, findings, startedAt, endedAt, childSessionId } = params; + const displayName = label || task.slice(0, 60); + const statusLabel = formatStatusLabel(outcome); + + const parts: string[] = [ + `A background task "${displayName}" just ${statusLabel}.`, + "", + "Findings:", + findings || "(no output)", + ]; + + // Stats line + const stats: string[] = []; + if (startedAt && endedAt) { + stats.push(`runtime ${formatDuration(startedAt, endedAt)}`); + } + stats.push(`session ${childSessionId}`); + + parts.push("", `Stats: ${stats.join(" • ")}`); + + parts.push( + "", + "Summarize this naturally for the user. Keep it brief (1-2 sentences).", + "Flow it into the conversation naturally.", + "Do not mention technical details like session IDs or that this was a background task.", + "You can respond with NO_REPLY if no announcement is needed (e.g., internal task with no user-facing result).", + ); + + return parts.join("\n"); +} + +/** + * Run the full subagent announcement flow: + * 1. Read child's last assistant reply + * 2. Format announcement message + * 3. Send to parent agent via Hub + */ +export function runSubagentAnnounceFlow(params: SubagentAnnounceParams): boolean { + const { requesterSessionId, childSessionId } = params; + + // Read child's final output + const findings = readLatestAssistantReply(childSessionId); + + // Format the announcement + const message = formatAnnouncementMessage({ + runId: params.runId, + childSessionId: params.childSessionId, + requesterSessionId: params.requesterSessionId, + task: params.task, + label: params.label, + cleanup: params.cleanup, + outcome: params.outcome, + startedAt: params.startedAt, + endedAt: params.endedAt, + findings, + }); + + // Deliver to parent agent via Hub + try { + const hub = getHub(); + const parentAgent = hub.getAgent(requesterSessionId); + if (!parentAgent || parentAgent.closed) { + console.warn( + `[SubagentAnnounce] Parent agent not found or closed: ${requesterSessionId}`, + ); + return false; + } + + parentAgent.write(message); + return true; + } catch (err) { + console.error(`[SubagentAnnounce] Failed to announce to parent:`, err); + return false; + } +} diff --git a/src/agent/subagent/index.ts b/src/agent/subagent/index.ts new file mode 100644 index 00000000..2785d86e --- /dev/null +++ b/src/agent/subagent/index.ts @@ -0,0 +1,38 @@ +/** + * Subagent orchestration system. + * + * Provides child agent spawning, lifecycle management, + * persistent registry, and result announcement flow. + */ + +export type { + SubagentRunOutcome, + SubagentRunRecord, + RegisterSubagentRunParams, + SubagentAnnounceParams, + SubagentSystemPromptParams, +} from "./types.js"; + +export { + initSubagentRegistry, + registerSubagentRun, + listSubagentRuns, + releaseSubagentRun, + getSubagentRun, + resetSubagentRegistryForTests, + shutdownSubagentRegistry, +} from "./registry.js"; + +export { + buildSubagentSystemPrompt, + readLatestAssistantReply, + formatAnnouncementMessage, + runSubagentAnnounceFlow, +} from "./announce.js"; +export type { FormatAnnouncementParams } from "./announce.js"; + +export { + loadSubagentRuns, + saveSubagentRuns, + getSubagentStorePath, +} from "./registry-store.js"; diff --git a/src/agent/subagent/registry-store.test.ts b/src/agent/subagent/registry-store.test.ts new file mode 100644 index 00000000..7247203c --- /dev/null +++ b/src/agent/subagent/registry-store.test.ts @@ -0,0 +1,81 @@ +import { describe, it, expect, beforeEach, afterEach } from "vitest"; +import { mkdtempSync, rmSync, existsSync } from "node:fs"; +import { join } from "node:path"; +import { tmpdir } from "node:os"; +import type { SubagentRunRecord } from "./types.js"; + +// We need to test the store functions with a custom directory. +// Since the store uses DATA_DIR from shared, we test the serialization logic directly. + +describe("registry-store serialization", () => { + let tempDir: string; + + beforeEach(() => { + tempDir = mkdtempSync(join(tmpdir(), "subagent-store-test-")); + }); + + afterEach(() => { + rmSync(tempDir, { recursive: true, force: true }); + }); + + it("round-trips SubagentRunRecord through JSON", () => { + const record: SubagentRunRecord = { + runId: "run-123", + childSessionId: "child-456", + requesterSessionId: "parent-789", + task: "Analyze code quality", + label: "Code Review", + cleanup: "delete", + createdAt: Date.now(), + startedAt: Date.now(), + endedAt: Date.now() + 30000, + outcome: { status: "ok" }, + archiveAtMs: Date.now() + 3600000, + cleanupHandled: true, + cleanupCompletedAt: Date.now() + 30100, + }; + + // Serialize and deserialize + const json = JSON.stringify({ version: 1, runs: { "run-123": record } }); + const parsed = JSON.parse(json); + + expect(parsed.version).toBe(1); + expect(parsed.runs["run-123"]).toEqual(record); + }); + + it("handles record with minimal fields", () => { + const record: SubagentRunRecord = { + runId: "run-minimal", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Do something", + cleanup: "keep", + createdAt: Date.now(), + }; + + const json = JSON.stringify({ version: 1, runs: { "run-minimal": record } }); + const parsed = JSON.parse(json); + + expect(parsed.runs["run-minimal"].runId).toBe("run-minimal"); + expect(parsed.runs["run-minimal"].outcome).toBeUndefined(); + expect(parsed.runs["run-minimal"].label).toBeUndefined(); + }); + + it("handles error outcome serialization", () => { + const record: SubagentRunRecord = { + runId: "run-err", + childSessionId: "child-err", + requesterSessionId: "parent-1", + task: "Fail", + cleanup: "delete", + createdAt: Date.now(), + outcome: { status: "error", error: "Something went wrong" }, + }; + + const json = JSON.stringify(record); + const parsed = JSON.parse(json) as SubagentRunRecord; + + expect(parsed.outcome?.status).toBe("error"); + expect(parsed.outcome?.error).toBe("Something went wrong"); + }); +}); diff --git a/src/agent/subagent/registry-store.ts b/src/agent/subagent/registry-store.ts new file mode 100644 index 00000000..b96a315c --- /dev/null +++ b/src/agent/subagent/registry-store.ts @@ -0,0 +1,61 @@ +/** + * Persistent storage for subagent run records. + * + * File: ~/.super-multica/subagents/runs.json + */ + +import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs"; +import { join } from "node:path"; +import { DATA_DIR } from "../../shared/index.js"; +import type { SubagentRunRecord } from "./types.js"; + +const SUBAGENTS_DIR = join(DATA_DIR, "subagents"); +const RUNS_FILE = join(SUBAGENTS_DIR, "runs.json"); + +interface SubagentRunsStore { + version: 1; + runs: Record; +} + +function ensureDir(): void { + if (!existsSync(SUBAGENTS_DIR)) { + mkdirSync(SUBAGENTS_DIR, { recursive: true }); + } +} + +/** Get the path to the subagent store file (for testing) */ +export function getSubagentStorePath(): string { + return RUNS_FILE; +} + +/** Load all persisted subagent runs */ +export function loadSubagentRuns(): Map { + if (!existsSync(RUNS_FILE)) return new Map(); + + try { + const content = readFileSync(RUNS_FILE, "utf-8"); + const store = JSON.parse(content) as SubagentRunsStore; + + if (store.version !== 1) { + console.warn(`[SubagentStore] Unknown store version: ${store.version}, ignoring`); + return new Map(); + } + + return new Map(Object.entries(store.runs)); + } catch (err) { + console.warn(`[SubagentStore] Failed to load runs:`, err); + return new Map(); + } +} + +/** Save all subagent runs to disk */ +export function saveSubagentRuns(runs: Map): void { + ensureDir(); + + const store: SubagentRunsStore = { + version: 1, + runs: Object.fromEntries(runs), + }; + + writeFileSync(RUNS_FILE, JSON.stringify(store, null, 2), "utf-8"); +} diff --git a/src/agent/subagent/registry.test.ts b/src/agent/subagent/registry.test.ts new file mode 100644 index 00000000..dda78917 --- /dev/null +++ b/src/agent/subagent/registry.test.ts @@ -0,0 +1,161 @@ +import { describe, it, expect, beforeEach } from "vitest"; +import { + registerSubagentRun, + listSubagentRuns, + getSubagentRun, + releaseSubagentRun, + resetSubagentRegistryForTests, + shutdownSubagentRegistry, +} from "./registry.js"; + +// Note: These tests exercise the registry's in-memory state management. +// They do NOT test the full lifecycle (which requires a live Hub + AsyncAgent). + +beforeEach(() => { + resetSubagentRegistryForTests(); +}); + +describe("subagent registry", () => { + it("registers a run and retrieves it by ID", () => { + const record = registerSubagentRun({ + runId: "run-1", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Analyze code", + label: "Code Analysis", + }); + + expect(record.runId).toBe("run-1"); + expect(record.childSessionId).toBe("child-1"); + expect(record.requesterSessionId).toBe("parent-1"); + expect(record.task).toBe("Analyze code"); + expect(record.label).toBe("Code Analysis"); + expect(record.cleanup).toBe("delete"); // default + expect(record.createdAt).toBeGreaterThan(0); + expect(record.startedAt).toBeGreaterThan(0); // set by watchChildAgent + + const retrieved = getSubagentRun("run-1"); + expect(retrieved).toBe(record); + }); + + it("lists runs filtered by requester session", () => { + registerSubagentRun({ + runId: "run-1", + childSessionId: "child-1", + requesterSessionId: "parent-A", + task: "Task 1", + }); + registerSubagentRun({ + runId: "run-2", + childSessionId: "child-2", + requesterSessionId: "parent-B", + task: "Task 2", + }); + registerSubagentRun({ + runId: "run-3", + childSessionId: "child-3", + requesterSessionId: "parent-A", + task: "Task 3", + }); + + const parentARuns = listSubagentRuns("parent-A"); + expect(parentARuns).toHaveLength(2); + expect(parentARuns.map((r) => r.runId).sort()).toEqual(["run-1", "run-3"]); + + const parentBRuns = listSubagentRuns("parent-B"); + expect(parentBRuns).toHaveLength(1); + expect(parentBRuns[0]!.runId).toBe("run-2"); + + const emptyRuns = listSubagentRuns("parent-C"); + expect(emptyRuns).toHaveLength(0); + }); + + it("releases a run from the registry", () => { + registerSubagentRun({ + runId: "run-1", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Task", + }); + + expect(getSubagentRun("run-1")).toBeDefined(); + + const released = releaseSubagentRun("run-1"); + expect(released).toBe(true); + expect(getSubagentRun("run-1")).toBeUndefined(); + + // Double release returns false + const releasedAgain = releaseSubagentRun("run-1"); + expect(releasedAgain).toBe(false); + }); + + it("applies custom cleanup value", () => { + const record = registerSubagentRun({ + runId: "run-keep", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Keep session", + cleanup: "keep", + }); + + expect(record.cleanup).toBe("keep"); + }); + + it("registers a run and ends it with error when Hub is not available", () => { + // Without Hub initialized, watchChildAgent detects missing Hub + // and immediately ends the run with an error + registerSubagentRun({ + runId: "run-no-hub", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Running task", + }); + + const record = getSubagentRun("run-no-hub"); + expect(record?.startedAt).toBeGreaterThan(0); + expect(record?.endedAt).toBeGreaterThan(0); + expect(record?.outcome?.status).toBe("error"); + expect(record?.outcome?.error).toContain("Hub not initialized"); + }); + + it("shutdownSubagentRegistry marks unfinished runs as ended", () => { + // Directly set up a record without going through watchChildAgent + // to simulate a run that is still active + registerSubagentRun({ + runId: "run-active", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Running task", + }); + + // The above run already ended due to no Hub; reset its endedAt + // to simulate a truly active run + const record = getSubagentRun("run-active"); + if (record) { + record.endedAt = undefined; + record.outcome = undefined; + } + + shutdownSubagentRegistry(); + + const after = getSubagentRun("run-active"); + expect(after?.endedAt).toBeGreaterThan(0); + expect(after?.outcome?.status).toBe("unknown"); + }); + + it("resetSubagentRegistryForTests clears all state", () => { + registerSubagentRun({ + runId: "run-1", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Task", + }); + + expect(listSubagentRuns("parent-1")).toHaveLength(1); + + resetSubagentRegistryForTests(); + + expect(listSubagentRuns("parent-1")).toHaveLength(0); + expect(getSubagentRun("run-1")).toBeUndefined(); + }); +}); diff --git a/src/agent/subagent/registry.ts b/src/agent/subagent/registry.ts new file mode 100644 index 00000000..d6f76b94 --- /dev/null +++ b/src/agent/subagent/registry.ts @@ -0,0 +1,333 @@ +/** + * Subagent registry — in-memory tracking + lifecycle management. + * + * Tracks all active subagent runs, persists state to disk, + * watches for child completion, and triggers announce flow. + */ + +import { getHub, isHubInitialized } from "../../hub/hub-singleton.js"; +import { loadSubagentRuns, saveSubagentRuns } from "./registry-store.js"; +import { runSubagentAnnounceFlow } from "./announce.js"; +import type { + RegisterSubagentRunParams, + SubagentRunRecord, +} from "./types.js"; +import { resolveSessionDir } from "../session/storage.js"; +import { rmSync } from "node:fs"; + +/** Default archive retention: 60 minutes after completion */ +const DEFAULT_ARCHIVE_AFTER_MS = 60 * 60 * 1000; + +/** Archive sweep interval: 60 seconds */ +const SWEEP_INTERVAL_MS = 60 * 1000; + +// ============================================================================ +// Module-level state +// ============================================================================ + +const subagentRuns = new Map(); +let sweepTimer: ReturnType | undefined; +const resumedRuns = new Set(); + +// ============================================================================ +// Public API +// ============================================================================ + +/** Initialize registry from persisted state. Call once at startup. */ +export function initSubagentRegistry(): void { + const persisted = loadSubagentRuns(); + for (const [runId, record] of persisted) { + subagentRuns.set(runId, record); + + // Resume incomplete runs + if (!record.cleanupHandled) { + if (record.endedAt) { + // Completed but cleanup not done — run announce flow + if (!resumedRuns.has(runId)) { + resumedRuns.add(runId); + handleRunCompletion(record); + } + } else { + // If not ended, the child agent session is lost on restart — + // mark as ended with unknown outcome + record.endedAt = Date.now(); + record.outcome = { status: "unknown" }; + persist(); + if (!resumedRuns.has(runId)) { + resumedRuns.add(runId); + handleRunCompletion(record); + } + } + } + } + + if (subagentRuns.size > 0) { + startSweeper(); + console.log(`[SubagentRegistry] Loaded ${subagentRuns.size} persisted run(s)`); + } +} + +/** Register a new subagent run and start tracking its lifecycle. */ +export function registerSubagentRun(params: RegisterSubagentRunParams): SubagentRunRecord { + const { + runId, + childSessionId, + requesterSessionId, + task, + label, + cleanup = "delete", + timeoutSeconds, + } = params; + + const record: SubagentRunRecord = { + runId, + childSessionId, + requesterSessionId, + task, + label, + cleanup, + createdAt: Date.now(), + }; + + subagentRuns.set(runId, record); + persist(); + startSweeper(); + + // Start watching the child agent for completion + watchChildAgent(record, timeoutSeconds); + + return record; +} + +/** List all active runs for a given requester session. */ +export function listSubagentRuns(requesterSessionId: string): SubagentRunRecord[] { + const result: SubagentRunRecord[] = []; + for (const record of subagentRuns.values()) { + if (record.requesterSessionId === requesterSessionId) { + result.push(record); + } + } + return result; +} + +/** Remove a run from the registry. */ +export function releaseSubagentRun(runId: string): boolean { + const deleted = subagentRuns.delete(runId); + if (deleted) { + persist(); + if (subagentRuns.size === 0) { + stopSweeper(); + } + } + return deleted; +} + +/** Get a run by ID. */ +export function getSubagentRun(runId: string): SubagentRunRecord | undefined { + return subagentRuns.get(runId); +} + +/** Mark all active (non-ended) runs as ended with "unknown" status. Called during Hub shutdown. */ +export function shutdownSubagentRegistry(): void { + const now = Date.now(); + let updated = 0; + + for (const record of subagentRuns.values()) { + if (!record.endedAt) { + record.endedAt = now; + record.outcome = { status: "unknown" }; + updated++; + } + } + + if (updated > 0) { + persist(); + console.log(`[SubagentRegistry] Marked ${updated} active run(s) as ended during shutdown`); + } + + stopSweeper(); +} + +/** Reset all state (for testing). */ +export function resetSubagentRegistryForTests(): void { + subagentRuns.clear(); + resumedRuns.clear(); + stopSweeper(); +} + +// ============================================================================ +// Lifecycle watching +// ============================================================================ + +function watchChildAgent(record: SubagentRunRecord, timeoutSeconds?: number): void { + const { childSessionId } = record; + + // Mark as started + record.startedAt = Date.now(); + persist(); + + const cleanup = (outcome: { status: "ok" | "error" | "timeout" | "unknown"; error?: string | undefined }) => { + if (record.endedAt) return; // Already finalized + if (timeoutTimer) clearTimeout(timeoutTimer); + record.endedAt = Date.now(); + record.outcome = outcome; + persist(); + handleRunCompletion(record); + }; + + // Set up timeout if specified + let timeoutTimer: ReturnType | undefined; + if (timeoutSeconds && timeoutSeconds > 0) { + timeoutTimer = setTimeout(() => { + cleanup({ status: "timeout" }); + + // Try to close the child agent + try { + const hub = getHub(); + hub.closeAgent(childSessionId); + } catch { + // Hub may not be available + } + }, timeoutSeconds * 1000); + } + + // Get child agent reference (Hub may not be available in tests) + if (!isHubInitialized()) { + cleanup({ status: "error", error: "Hub not initialized" }); + return; + } + + const hub = getHub(); + const childAgent = hub.getAgent(childSessionId); + if (!childAgent) { + cleanup({ status: "error", error: "Child agent not found" }); + return; + } + + // Wait for the child agent's task queue to drain (task completion), + // then trigger announce flow. Uses waitForIdle() instead of consuming + // the stream (which would conflict with Hub.consumeAgent). + childAgent.waitForIdle().then( + () => cleanup({ status: "ok" }), + (err) => cleanup({ + status: "error", + error: err instanceof Error ? err.message : String(err), + }), + ); + + // Also handle explicit close (e.g., timeout kill, Hub shutdown) + childAgent.onClose(() => { + cleanup({ status: record.outcome?.status ?? "unknown" }); + }); +} + +// ============================================================================ +// Cleanup + Announce +// ============================================================================ + +function handleRunCompletion(record: SubagentRunRecord): void { + if (record.cleanupHandled) return; + record.cleanupHandled = true; + persist(); + + // Run announce flow + const announced = runSubagentAnnounceFlow({ + runId: record.runId, + childSessionId: record.childSessionId, + requesterSessionId: record.requesterSessionId, + task: record.task, + label: record.label, + cleanup: record.cleanup, + outcome: record.outcome, + startedAt: record.startedAt, + endedAt: record.endedAt, + }); + + if (!announced) { + console.warn(`[SubagentRegistry] Announce flow failed for run ${record.runId}`); + // Allow retry on next restart if announce failed. + record.cleanupHandled = false; + persist(); + return; + } + + // Handle session cleanup + if (record.cleanup === "delete") { + deleteChildSession(record.childSessionId); + } + + // Schedule archive + record.archiveAtMs = Date.now() + DEFAULT_ARCHIVE_AFTER_MS; + record.cleanupCompletedAt = Date.now(); + persist(); +} + +function deleteChildSession(sessionId: string): void { + try { + const sessionDir = resolveSessionDir(sessionId); + rmSync(sessionDir, { recursive: true, force: true }); + console.log(`[SubagentRegistry] Deleted child session: ${sessionId}`); + } catch (err) { + console.warn(`[SubagentRegistry] Failed to delete child session ${sessionId}:`, err); + } + + // Also close the agent in Hub + try { + const hub = getHub(); + hub.closeAgent(sessionId); + } catch { + // Hub may not be available + } +} + +// ============================================================================ +// Archive sweeper +// ============================================================================ + +function startSweeper(): void { + if (sweepTimer) return; + sweepTimer = setInterval(sweep, SWEEP_INTERVAL_MS); + // Don't prevent process exit + if (sweepTimer.unref) sweepTimer.unref(); +} + +function stopSweeper(): void { + if (sweepTimer) { + clearInterval(sweepTimer); + sweepTimer = undefined; + } +} + +function sweep(): void { + const now = Date.now(); + let removed = 0; + + for (const [runId, record] of subagentRuns) { + if (record.archiveAtMs !== undefined && record.archiveAtMs <= now) { + subagentRuns.delete(runId); + resumedRuns.delete(runId); + removed++; + } + } + + if (removed > 0) { + persist(); + console.log(`[SubagentRegistry] Archived ${removed} completed run(s)`); + } + + if (subagentRuns.size === 0) { + stopSweeper(); + } +} + +// ============================================================================ +// Persistence helper +// ============================================================================ + +function persist(): void { + try { + saveSubagentRuns(subagentRuns); + } catch (err) { + console.error(`[SubagentRegistry] Failed to persist runs:`, err); + } +} diff --git a/src/agent/subagent/types.ts b/src/agent/subagent/types.ts new file mode 100644 index 00000000..dfbf62bb --- /dev/null +++ b/src/agent/subagent/types.ts @@ -0,0 +1,74 @@ +/** + * Subagent orchestration types. + * + * Models the lifecycle of spawned child agents: + * created → started → ended → cleanup + */ + +/** Final outcome of a subagent run */ +export type SubagentRunOutcome = { + status: "ok" | "error" | "timeout" | "unknown"; + error?: string | undefined; +}; + +/** Persistent record tracking a single subagent run */ +export type SubagentRunRecord = { + /** Unique run identifier (UUIDv7) */ + runId: string; + /** Session ID of the child agent */ + childSessionId: string; + /** Session ID of the parent (requester) agent */ + requesterSessionId: string; + /** The task description / prompt given to the child */ + task: string; + /** Optional human-readable label */ + label?: string | undefined; + /** Session cleanup strategy after completion */ + cleanup: "delete" | "keep"; + /** Timestamp when the run was created */ + createdAt: number; + /** Timestamp when the child agent started execution */ + startedAt?: number | undefined; + /** Timestamp when the child agent finished */ + endedAt?: number | undefined; + /** Final status of the run */ + outcome?: SubagentRunOutcome | undefined; + /** Scheduled auto-archive time (ms since epoch) */ + archiveAtMs?: number | undefined; + /** Whether the cleanup/announce flow has been initiated */ + cleanupHandled?: boolean | undefined; + /** Timestamp when cleanup completed */ + cleanupCompletedAt?: number | undefined; +}; + +/** Parameters for registering a new subagent run */ +export type RegisterSubagentRunParams = { + runId: string; + childSessionId: string; + requesterSessionId: string; + task: string; + label?: string | undefined; + cleanup?: "delete" | "keep" | undefined; + timeoutSeconds?: number | undefined; +}; + +/** Parameters for the announce flow */ +export type SubagentAnnounceParams = { + runId: string; + childSessionId: string; + requesterSessionId: string; + task: string; + label?: string | undefined; + cleanup: "delete" | "keep"; + outcome?: SubagentRunOutcome | undefined; + startedAt?: number | undefined; + endedAt?: number | undefined; +}; + +/** Parameters for building the subagent system prompt */ +export type SubagentSystemPromptParams = { + requesterSessionId: string; + childSessionId: string; + label?: string | undefined; + task: string; +}; diff --git a/src/agent/tools.ts b/src/agent/tools.ts index 8190cb86..56c9b766 100644 --- a/src/agent/tools.ts +++ b/src/agent/tools.ts @@ -6,6 +6,7 @@ import { createProcessTool } from "./tools/process.js"; import { createGlobTool } from "./tools/glob.js"; import { createWebFetchTool, createWebSearchTool } from "./tools/web/index.js"; import { createMemoryTools } from "./tools/memory/index.js"; +import { createSessionsSpawnTool } from "./tools/sessions-spawn.js"; import { filterTools } from "./tools/policy.js"; import { isMulticaError, isRetryableError } from "../shared/errors.js"; @@ -19,6 +20,10 @@ export interface CreateToolsOptions { profileId?: string | undefined; /** Base directory for profiles (optional) */ profileBaseDir?: string | undefined; + /** Whether this agent is a subagent (passed to sessions_spawn tool) */ + isSubagent?: boolean | undefined; + /** Session ID of the agent (passed to sessions_spawn tool) */ + sessionId?: string | undefined; } type ToolErrorPayload = { @@ -88,7 +93,7 @@ function wrapTool( export function createAllTools(options: CreateToolsOptions | string): AgentTool[] { // Support legacy string argument for backwards compatibility const opts: CreateToolsOptions = typeof options === "string" ? { cwd: options } : options; - const { cwd, profileId, profileBaseDir } = opts; + const { cwd, profileId, profileBaseDir, isSubagent, sessionId } = opts; const baseTools = createCodingTools(cwd).filter( (tool) => tool.name !== "bash", @@ -118,6 +123,13 @@ export function createAllTools(options: CreateToolsOptions | string): AgentTool< tools.push(...memoryTools); } + // Add sessions_spawn tool (will be filtered by policy for subagents) + const sessionsSpawnTool = createSessionsSpawnTool({ + isSubagent: isSubagent ?? false, + sessionId, + }); + tools.push(sessionsSpawnTool as AgentTool); + return tools; } @@ -138,6 +150,8 @@ export function resolveTools(options: AgentOptions): AgentTool[] { cwd, profileId: options.profileId, profileBaseDir: options.profileBaseDir, + isSubagent: options.isSubagent, + sessionId: options.sessionId, }); // Apply policy filtering diff --git a/src/agent/tools/groups.ts b/src/agent/tools/groups.ts index a886b22b..1e9edf6c 100644 --- a/src/agent/tools/groups.ts +++ b/src/agent/tools/groups.ts @@ -35,6 +35,9 @@ export const TOOL_GROUPS: Record = { // Memory tools (requires profileId) "group:memory": ["memory_get", "memory_set", "memory_delete", "memory_list"], + // Subagent tools + "group:subagent": ["sessions_spawn"], + // All core tools "group:core": [ "read", @@ -76,16 +79,8 @@ export const TOOL_PROFILES: Record { + it("has correct name and description", () => { + const tool = createSessionsSpawnTool({ isSubagent: false, sessionId: "test-session" }); + expect(tool.name).toBe("sessions_spawn"); + expect(tool.label).toBe("Spawn Subagent"); + expect(tool.description).toContain("Spawn a background subagent"); + }); + + it("rejects spawn from subagent sessions", async () => { + const tool = createSessionsSpawnTool({ isSubagent: true, sessionId: "child-session" }); + + const result = await tool.execute( + "call-1", + { task: "do something" } as any, + new AbortController().signal, + ); + + expect(result.details.status).toBe("error"); + expect(result.details.error).toContain("not allowed from sub-agent sessions"); + const firstContent = result.content[0] as { type: string; text: string }; + expect(firstContent.text).toContain("not allowed"); + }); + + it("fails gracefully when Hub is not initialized", async () => { + const tool = createSessionsSpawnTool({ isSubagent: false, sessionId: "parent-session" }); + + const result = await tool.execute( + "call-2", + { task: "analyze code", label: "Code Analysis" } as any, + new AbortController().signal, + ); + + // Should get an error because Hub singleton is not set up in test + expect(result.details.status).toBe("error"); + expect(result.details.error).toContain("Hub"); + }); +}); diff --git a/src/agent/tools/sessions-spawn.ts b/src/agent/tools/sessions-spawn.ts new file mode 100644 index 00000000..9ae2cc69 --- /dev/null +++ b/src/agent/tools/sessions-spawn.ts @@ -0,0 +1,143 @@ +/** + * sessions_spawn tool — allows a parent agent to spawn subagent runs. + * + * Subagents run in isolated sessions with restricted tools. + * Results are announced back to the parent when the child completes. + */ + +import { v7 as uuidv7 } from "uuid"; +import { Type } from "@sinclair/typebox"; +import type { AgentTool } from "@mariozechner/pi-agent-core"; +import { getHub } from "../../hub/hub-singleton.js"; +import { buildSubagentSystemPrompt } from "../subagent/announce.js"; +import { registerSubagentRun } from "../subagent/registry.js"; + +const SessionsSpawnSchema = Type.Object({ + task: Type.String({ description: "The task for the subagent to perform.", minLength: 1 }), + label: Type.Optional( + Type.String({ description: "Human-readable label for this background task." }), + ), + model: Type.Optional( + Type.String({ description: "Override the LLM model for the subagent (e.g. 'gpt-4o', 'claude-sonnet')." }), + ), + cleanup: Type.Optional( + Type.Union([Type.Literal("delete"), Type.Literal("keep")], { + description: "Session cleanup after completion. 'delete' removes session files, 'keep' preserves for audit. Default: 'delete'.", + }), + ), + timeoutSeconds: Type.Optional( + Type.Number({ + description: "Execution timeout in seconds. The subagent will be terminated if it exceeds this.", + minimum: 1, + }), + ), +}); + +type SessionsSpawnArgs = { + task: string; + label?: string; + model?: string; + cleanup?: "delete" | "keep"; + timeoutSeconds?: number; +}; + +export type SessionsSpawnResult = { + status: "accepted" | "error"; + childSessionId?: string; + runId?: string; + error?: string; +}; + +export interface CreateSessionsSpawnToolOptions { + /** Whether the current agent is itself a subagent */ + isSubagent?: boolean; + /** Session ID of the current (requester) agent */ + sessionId?: string; +} + +export function createSessionsSpawnTool( + options: CreateSessionsSpawnToolOptions, +): AgentTool { + return { + name: "sessions_spawn", + label: "Spawn Subagent", + description: + "Spawn a background subagent to handle a specific task. The subagent runs in an isolated session with its own tool set. " + + "When it completes, its findings are announced back to you automatically. " + + "Use this for parallelizable work, long-running analysis, or tasks that benefit from isolation.", + parameters: SessionsSpawnSchema, + execute: async (_toolCallId, args) => { + const { task, label, model, cleanup = "delete", timeoutSeconds } = args as SessionsSpawnArgs; + + // Guard: subagents cannot spawn subagents + if (options.isSubagent) { + return { + content: [{ type: "text", text: "Error: sessions_spawn is not allowed from sub-agent sessions." }], + details: { + status: "error", + error: "sessions_spawn is not allowed from sub-agent sessions", + }, + }; + } + + const requesterSessionId = options.sessionId ?? "unknown"; + const runId = uuidv7(); + const childSessionId = uuidv7(); + + // Build system prompt for the child + const systemPrompt = buildSubagentSystemPrompt({ + requesterSessionId, + childSessionId, + label, + task, + }); + + // Spawn child agent via Hub + try { + const hub = getHub(); + const childAgent = hub.createSubagent(childSessionId, { + systemPrompt, + model, + }); + + // Write the task to the child (non-blocking) before registering, + // so waitForIdle() observes the queued work. + childAgent.write(task); + + // Register the run for lifecycle tracking + registerSubagentRun({ + runId, + childSessionId, + requesterSessionId, + task, + label, + cleanup, + timeoutSeconds, + }); + + return { + content: [ + { + type: "text", + text: `Subagent spawned successfully.\n\nRun ID: ${runId}\nSession: ${childSessionId}\nTask: ${label || task.slice(0, 80)}\n\nThe subagent is now working in the background. You will receive its findings when it completes.`, + }, + ], + details: { + status: "accepted", + childSessionId, + runId, + }, + }; + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + return { + content: [{ type: "text", text: `Error spawning subagent: ${message}` }], + details: { + status: "error", + error: message, + }, + }; + } + }, + }; +} diff --git a/src/hub/hub-singleton.ts b/src/hub/hub-singleton.ts new file mode 100644 index 00000000..5d04d0f7 --- /dev/null +++ b/src/hub/hub-singleton.ts @@ -0,0 +1,28 @@ +/** + * Global Hub singleton for cross-module access. + * + * Used by subagent tools and announce flow to interact with the Hub + * without threading references through the entire call chain. + */ + +import type { Hub } from "./hub.js"; + +let _hub: Hub | undefined; + +/** Set the global Hub instance. Called once during Hub construction. */ +export function setHub(hub: Hub): void { + _hub = hub; +} + +/** Get the global Hub instance. Throws if not yet initialized. */ +export function getHub(): Hub { + if (!_hub) { + throw new Error("[Hub] Hub singleton not initialized. Ensure Hub is constructed before accessing."); + } + return _hub; +} + +/** Check if the Hub singleton has been initialized. */ +export function isHubInitialized(): boolean { + return _hub !== undefined; +} diff --git a/src/hub/hub.ts b/src/hub/hub.ts index 10236ff3..31818950 100644 --- a/src/hub/hub.ts +++ b/src/hub/hub.ts @@ -9,7 +9,10 @@ import { type ResponseErrorPayload, } from "@multica/sdk"; import { AsyncAgent } from "../agent/async-agent.js"; +import type { AgentOptions } from "../agent/types.js"; import { getHubId } from "./hub-identity.js"; +import { setHub } from "./hub-singleton.js"; +import { initSubagentRegistry, shutdownSubagentRegistry } from "../agent/subagent/index.js"; import { loadAgentRecords, addAgentRecord, removeAgentRecord } from "./agent-store.js"; import { RpcDispatcher, RpcError } from "./rpc/dispatcher.js"; import { createGetAgentMessagesHandler } from "./rpc/handlers/get-agent-messages.js"; @@ -48,6 +51,12 @@ export class Hub { this.rpc.register("deleteAgent", createDeleteAgentHandler(this)); this.rpc.register("updateGateway", createUpdateGatewayHandler(this)); + // Register as global singleton for cross-module access (subagent tools, announce flow) + setHub(this); + + // Restore subagent registry from persistent state + initSubagentRegistry(); + this.client = this.createClient(this.url); this.client.connect(); this.restoreAgents(); @@ -243,6 +252,27 @@ export class Hub { } } + /** Create a subagent with specific options (isSubagent, systemPrompt, model) */ + createSubagent(sessionId: string, options: Omit = {}): AsyncAgent { + const existing = this.agents.get(sessionId); + if (existing && !existing.closed) { + return existing; + } + + const agent = new AsyncAgent({ + ...options, + sessionId, + isSubagent: true, + }); + this.agents.set(agent.sessionId, agent); + + // Subagents are ephemeral — don't persist to agent store + void this.consumeAgent(agent); + + console.log(`[Hub] Subagent created: ${agent.sessionId}`); + return agent; + } + getAgent(id: string): AsyncAgent | undefined { return this.agents.get(id); } @@ -266,6 +296,9 @@ export class Hub { } shutdown(): void { + // Finalize subagent registry before closing agents + shutdownSubagentRegistry(); + for (const [id, agent] of this.agents) { agent.close(); this.agents.delete(id);