From 6786af4aa3cdbc9260fae6a107aca0269c76047d Mon Sep 17 00:00:00 2001 From: yushen Date: Fri, 6 Feb 2026 17:24:39 +0800 Subject: [PATCH 01/13] feat(subagent): coalesce announcements into single parent notification When multiple children complete, buffer findings per-child and only send one combined announcement to the parent after all unannounced runs for the same requester have finished. This avoids N separate LLM calls and gives the parent a complete picture of all results. Co-Authored-By: Claude Opus 4.6 --- src/agent/subagent/announce.ts | 104 ++++++++++++++++++++++ src/agent/subagent/index.ts | 2 + src/agent/subagent/registry.ts | 157 ++++++++++++++++++++++----------- src/agent/subagent/types.ts | 6 ++ 4 files changed, 216 insertions(+), 53 deletions(-) diff --git a/src/agent/subagent/announce.ts b/src/agent/subagent/announce.ts index fbfa9800..ea541d5b 100644 --- a/src/agent/subagent/announce.ts +++ b/src/agent/subagent/announce.ts @@ -13,6 +13,7 @@ import { buildSystemPrompt } from "../system-prompt/index.js"; import type { SubagentAnnounceParams, SubagentRunOutcome, + SubagentRunRecord, SubagentSystemPromptParams, } from "./types.js"; @@ -167,11 +168,114 @@ export function formatAnnouncementMessage(params: FormatAnnouncementParams): str return parts.join("\n"); } +/** + * Format a coalesced announcement message from multiple completed subagent runs. + * When only one record is provided, delegates to formatAnnouncementMessage. + */ +export function formatCoalescedAnnouncementMessage( + records: SubagentRunRecord[], +): string { + // Single record: delegate to existing format for backward-compatible behavior + if (records.length === 1) { + const r = records[0]!; + return formatAnnouncementMessage({ + runId: r.runId, + childSessionId: r.childSessionId, + requesterSessionId: r.requesterSessionId, + task: r.task, + label: r.label, + cleanup: r.cleanup, + outcome: r.outcome, + startedAt: r.startedAt, + endedAt: r.endedAt, + findings: r.findings, + }); + } + + // Multiple records: build combined message + const parts: string[] = [ + `All ${records.length} background tasks have completed. Here are the combined results:`, + "", + ]; + + for (let i = 0; i < records.length; i++) { + const r = records[i]!; + const displayName = r.label || r.task.slice(0, 60); + const statusLabel = formatStatusLabel(r.outcome); + const durationStr = (r.startedAt && r.endedAt) + ? ` (${formatDuration(r.startedAt, r.endedAt)})` + : ""; + + parts.push( + `### Task ${i + 1}: "${displayName}"`, + `Status: ${statusLabel}${durationStr}`, + "", + "Findings:", + r.findings || "(no output)", + "", + ); + } + + // Overall stats + const allStartTimes = records.map(r => r.startedAt).filter(Boolean) as number[]; + const allEndTimes = records.map(r => r.endedAt).filter(Boolean) as number[]; + if (allStartTimes.length > 0 && allEndTimes.length > 0) { + const wallTime = formatDuration(Math.min(...allStartTimes), Math.max(...allEndTimes)); + parts.push(`Total wall time: ${wallTime}`); + } + + const okCount = records.filter(r => r.outcome?.status === "ok").length; + const failCount = records.length - okCount; + parts.push(`Results: ${okCount} succeeded, ${failCount} failed/timed out`); + + parts.push( + "", + "Summarize these results naturally for the user.", + "Present the combined findings as a coherent summary, not a list of separate reports.", + "Keep it concise but cover the key findings from each task.", + "Do not mention technical details like session IDs or that these were background tasks.", + "You can respond with NO_REPLY if no announcement is needed.", + ); + + return parts.join("\n"); +} + +/** + * Run the coalesced announcement flow for all completed runs of a requester. + * Formats a single combined message and delivers it to the parent agent. + */ +export function runCoalescedAnnounceFlow( + requesterSessionId: string, + records: SubagentRunRecord[], +): boolean { + const message = formatCoalescedAnnouncementMessage(records); + + try { + const hub = getHub(); + const parentAgent = hub.getAgent(requesterSessionId); + if (!parentAgent || parentAgent.closed) { + console.warn( + `[SubagentAnnounce] Parent agent not found or closed: ${requesterSessionId}`, + ); + return false; + } + + parentAgent.write(message); + return true; + } catch (err) { + console.error(`[SubagentAnnounce] Failed to coalesced-announce to parent:`, err); + return false; + } +} + /** * Run the full subagent announcement flow: * 1. Read child's last assistant reply * 2. Format announcement message * 3. Send to parent agent via Hub + * + * @deprecated Use runCoalescedAnnounceFlow instead, which supports + * batching multiple completed runs into a single announcement. */ export function runSubagentAnnounceFlow(params: SubagentAnnounceParams): boolean { const { requesterSessionId, childSessionId } = params; diff --git a/src/agent/subagent/index.ts b/src/agent/subagent/index.ts index 2785d86e..7a934229 100644 --- a/src/agent/subagent/index.ts +++ b/src/agent/subagent/index.ts @@ -28,6 +28,8 @@ export { readLatestAssistantReply, formatAnnouncementMessage, runSubagentAnnounceFlow, + formatCoalescedAnnouncementMessage, + runCoalescedAnnounceFlow, } from "./announce.js"; export type { FormatAnnouncementParams } from "./announce.js"; diff --git a/src/agent/subagent/registry.ts b/src/agent/subagent/registry.ts index d6f76b94..980c2793 100644 --- a/src/agent/subagent/registry.ts +++ b/src/agent/subagent/registry.ts @@ -7,7 +7,7 @@ import { getHub, isHubInitialized } from "../../hub/hub-singleton.js"; import { loadSubagentRuns, saveSubagentRuns } from "./registry-store.js"; -import { runSubagentAnnounceFlow } from "./announce.js"; +import { readLatestAssistantReply, runCoalescedAnnounceFlow } from "./announce.js"; import type { RegisterSubagentRunParams, SubagentRunRecord, @@ -27,7 +27,7 @@ const SWEEP_INTERVAL_MS = 60 * 1000; const subagentRuns = new Map(); let sweepTimer: ReturnType | undefined; -const resumedRuns = new Set(); +const resumedRequesters = new Set(); // ============================================================================ // Public API @@ -39,26 +39,43 @@ export function initSubagentRegistry(): void { for (const [runId, record] of persisted) { subagentRuns.set(runId, record); - // Resume incomplete runs - if (!record.cleanupHandled) { - if (record.endedAt) { - // Completed but cleanup not done — run announce flow - if (!resumedRuns.has(runId)) { - resumedRuns.add(runId); - handleRunCompletion(record); - } - } else { - // If not ended, the child agent session is lost on restart — - // mark as ended with unknown outcome - record.endedAt = Date.now(); - record.outcome = { status: "unknown" }; - persist(); - if (!resumedRuns.has(runId)) { - resumedRuns.add(runId); - handleRunCompletion(record); - } + // Backward compat: old records with cleanupHandled but no announced field + if (record.cleanupHandled && record.announced === undefined) { + record.announced = true; + record.findingsCaptured = true; + } + } + + // Process incomplete runs + const affectedRequesters = new Set(); + + for (const record of subagentRuns.values()) { + if (record.announced && record.cleanupHandled) continue; // Already fully done + + if (!record.endedAt) { + // Child was running when process crashed — mark as ended/unknown + record.endedAt = Date.now(); + record.outcome = { status: "unknown" }; + } + + if (!record.findingsCaptured) { + captureFindings(record); + if (record.cleanup === "delete") { + deleteChildSession(record.childSessionId); } } + + affectedRequesters.add(record.requesterSessionId); + } + + persist(); + + // For each affected requester, check if coalesced announcement is needed + for (const requesterId of affectedRequesters) { + if (!resumedRequesters.has(requesterId)) { + resumedRequesters.add(requesterId); + checkAndAnnounce(requesterId); + } } if (subagentRuns.size > 0) { @@ -138,11 +155,17 @@ export function shutdownSubagentRegistry(): void { record.outcome = { status: "unknown" }; updated++; } + + // Opportunistically capture findings for ended-but-uncaptured runs + if (record.endedAt && !record.findingsCaptured) { + captureFindings(record); + updated++; + } } if (updated > 0) { persist(); - console.log(`[SubagentRegistry] Marked ${updated} active run(s) as ended during shutdown`); + console.log(`[SubagentRegistry] Processed ${updated} run(s) during shutdown`); } stopSweeper(); @@ -151,7 +174,7 @@ export function shutdownSubagentRegistry(): void { /** Reset all state (for testing). */ export function resetSubagentRegistryForTests(): void { subagentRuns.clear(); - resumedRuns.clear(); + resumedRequesters.clear(); stopSweeper(); } @@ -222,44 +245,73 @@ function watchChildAgent(record: SubagentRunRecord, timeoutSeconds?: number): vo } // ============================================================================ -// Cleanup + Announce +// Cleanup + Announce (two-phase: capture findings, then coalesced announce) // ============================================================================ -function handleRunCompletion(record: SubagentRunRecord): void { - if (record.cleanupHandled) return; - record.cleanupHandled = true; +/** Phase 1: Capture child's findings before session deletion. */ +function captureFindings(record: SubagentRunRecord): void { + try { + const findings = readLatestAssistantReply(record.childSessionId); + record.findings = findings ?? undefined; + } catch { + record.findings = "(failed to read findings)"; + } + record.findingsCaptured = true; persist(); +} - // Run announce flow - const announced = runSubagentAnnounceFlow({ - runId: record.runId, - childSessionId: record.childSessionId, - requesterSessionId: record.requesterSessionId, - task: record.task, - label: record.label, - cleanup: record.cleanup, - outcome: record.outcome, - startedAt: record.startedAt, - endedAt: record.endedAt, - }); +/** + * Phase 2: Check if all unannounced runs for this requester have completed. + * If so, send a single coalesced announcement to the parent. + */ +function checkAndAnnounce(requesterSessionId: string): void { + const allRuns = listSubagentRuns(requesterSessionId); - if (!announced) { - console.warn(`[SubagentRegistry] Announce flow failed for run ${record.runId}`); - // Allow retry on next restart if announce failed. - record.cleanupHandled = false; + // Only consider unannounced runs + const pending = allRuns.filter(r => !r.announced); + if (pending.length === 0) return; + + // Are all unannounced runs done? + const allDone = pending.every(r => r.endedAt !== undefined); + if (!allDone) return; + + // Have all had findings captured? + const allCaptured = pending.every(r => r.findingsCaptured); + if (!allCaptured) return; + + // All done — send coalesced announcement + const announced = runCoalescedAnnounceFlow(requesterSessionId, pending); + + if (announced) { + for (const r of pending) { + r.announced = true; + r.cleanupHandled = true; + r.archiveAtMs = Date.now() + DEFAULT_ARCHIVE_AFTER_MS; + r.cleanupCompletedAt = Date.now(); + } persist(); - return; + } else { + console.warn( + `[SubagentRegistry] Coalesced announce failed for requester ${requesterSessionId}`, + ); + // Leave announced=false so initSubagentRegistry() can retry on restart + } +} + +/** Entry point: called when a child completes. */ +function handleRunCompletion(record: SubagentRunRecord): void { + // Phase 1: capture findings (before session deletion) + if (!record.findingsCaptured) { + captureFindings(record); + + // Session cleanup (safe now that findings are persisted) + if (record.cleanup === "delete") { + deleteChildSession(record.childSessionId); + } } - // Handle session cleanup - if (record.cleanup === "delete") { - deleteChildSession(record.childSessionId); - } - - // Schedule archive - record.archiveAtMs = Date.now() + DEFAULT_ARCHIVE_AFTER_MS; - record.cleanupCompletedAt = Date.now(); - persist(); + // Phase 2: coalesced announce check + checkAndAnnounce(record.requesterSessionId); } function deleteChildSession(sessionId: string): void { @@ -305,7 +357,6 @@ function sweep(): void { for (const [runId, record] of subagentRuns) { if (record.archiveAtMs !== undefined && record.archiveAtMs <= now) { subagentRuns.delete(runId); - resumedRuns.delete(runId); removed++; } } diff --git a/src/agent/subagent/types.ts b/src/agent/subagent/types.ts index d4043572..8edfc0dc 100644 --- a/src/agent/subagent/types.ts +++ b/src/agent/subagent/types.ts @@ -39,6 +39,12 @@ export type SubagentRunRecord = { cleanupHandled?: boolean | undefined; /** Timestamp when cleanup completed */ cleanupCompletedAt?: number | undefined; + /** Captured findings from the child session's last assistant reply */ + findings?: string | undefined; + /** Whether findings have been captured (safe to delete session after this) */ + findingsCaptured?: boolean | undefined; + /** Whether the coalesced announcement has been sent to parent */ + announced?: boolean | undefined; }; /** Parameters for registering a new subagent run */ From 7f9a0181c84d37839d6b9e6deccca11b3d8128b9 Mon Sep 17 00:00:00 2001 From: yushen Date: Fri, 6 Feb 2026 17:24:44 +0800 Subject: [PATCH 02/13] test(subagent): add coalescing announcement tests Cover formatCoalescedAnnouncementMessage (single/multi record, mixed outcomes, missing findings), registry coalescing state transitions, shutdown capture, and new field serialization. Co-Authored-By: Claude Opus 4.6 --- src/agent/subagent/announce.test.ts | 112 +++++++++++++++++++++- src/agent/subagent/registry-store.test.ts | 46 +++++++++ src/agent/subagent/registry.test.ts | 85 ++++++++++++++++ 3 files changed, 242 insertions(+), 1 deletion(-) diff --git a/src/agent/subagent/announce.test.ts b/src/agent/subagent/announce.test.ts index faf6ca35..28fafee1 100644 --- a/src/agent/subagent/announce.test.ts +++ b/src/agent/subagent/announce.test.ts @@ -1,6 +1,7 @@ import { describe, it, expect } from "vitest"; -import { buildSubagentSystemPrompt, formatAnnouncementMessage } from "./announce.js"; +import { buildSubagentSystemPrompt, formatAnnouncementMessage, formatCoalescedAnnouncementMessage } from "./announce.js"; import type { FormatAnnouncementParams } from "./announce.js"; +import type { SubagentRunRecord } from "./types.js"; describe("buildSubagentSystemPrompt", () => { it("includes task and session context", () => { @@ -126,3 +127,112 @@ describe("formatAnnouncementMessage", () => { expect(msg).toContain("NO_REPLY"); }); }); + +describe("formatCoalescedAnnouncementMessage", () => { + function makeRecord(overrides: Partial = {}): SubagentRunRecord { + return { + runId: "run-1", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Default task", + cleanup: "delete", + createdAt: 1000000, + startedAt: 1000000, + endedAt: 1030000, + outcome: { status: "ok" }, + findings: "Some findings", + findingsCaptured: true, + announced: false, + ...overrides, + }; + } + + it("delegates to formatAnnouncementMessage for a single record", () => { + const record = makeRecord({ label: "Code Analysis" }); + const coalesced = formatCoalescedAnnouncementMessage([record]); + const direct = formatAnnouncementMessage({ + runId: record.runId, + childSessionId: record.childSessionId, + requesterSessionId: record.requesterSessionId, + task: record.task, + label: record.label, + cleanup: record.cleanup, + outcome: record.outcome, + startedAt: record.startedAt, + endedAt: record.endedAt, + findings: record.findings, + }); + + expect(coalesced).toBe(direct); + }); + + it("formats multiple records with all task findings and stats", () => { + const records = [ + makeRecord({ + runId: "run-1", + childSessionId: "child-1", + label: "Task A", + findings: "Found issue A", + startedAt: 1000000, + endedAt: 1030000, + }), + makeRecord({ + runId: "run-2", + childSessionId: "child-2", + label: "Task B", + findings: "Found issue B", + startedAt: 1000000, + endedAt: 1045000, // 45 seconds + }), + ]; + + const msg = formatCoalescedAnnouncementMessage(records); + + expect(msg).toContain("All 2 background tasks have completed"); + expect(msg).toContain('Task 1: "Task A"'); + expect(msg).toContain("Found issue A"); + expect(msg).toContain('Task 2: "Task B"'); + expect(msg).toContain("Found issue B"); + expect(msg).toContain("Total wall time: 45s"); + expect(msg).toContain("2 succeeded, 0 failed"); + }); + + it("reports mixed outcomes correctly", () => { + const records = [ + makeRecord({ runId: "run-1", label: "OK Task", outcome: { status: "ok" } }), + makeRecord({ runId: "run-2", label: "Failed Task", outcome: { status: "error", error: "crash" } }), + makeRecord({ runId: "run-3", label: "Timeout Task", outcome: { status: "timeout" } }), + ]; + + const msg = formatCoalescedAnnouncementMessage(records); + + expect(msg).toContain("completed successfully"); + expect(msg).toContain("failed: crash"); + expect(msg).toContain("timed out"); + expect(msg).toContain("1 succeeded, 2 failed"); + }); + + it("shows (no output) for missing findings", () => { + const records = [ + makeRecord({ runId: "run-1", findings: undefined }), + makeRecord({ runId: "run-2", findings: "Has output" }), + ]; + + const msg = formatCoalescedAnnouncementMessage(records); + + expect(msg).toContain("(no output)"); + expect(msg).toContain("Has output"); + }); + + it("includes combined summary instruction for multi-record", () => { + const records = [ + makeRecord({ runId: "run-1" }), + makeRecord({ runId: "run-2" }), + ]; + + const msg = formatCoalescedAnnouncementMessage(records); + + expect(msg).toContain("combined findings"); + expect(msg).toContain("NO_REPLY"); + }); +}); diff --git a/src/agent/subagent/registry-store.test.ts b/src/agent/subagent/registry-store.test.ts index 7247203c..bd7a5814 100644 --- a/src/agent/subagent/registry-store.test.ts +++ b/src/agent/subagent/registry-store.test.ts @@ -78,4 +78,50 @@ describe("registry-store serialization", () => { expect(parsed.outcome?.status).toBe("error"); expect(parsed.outcome?.error).toBe("Something went wrong"); }); + + it("round-trips coalescing fields (findings, findingsCaptured, announced)", () => { + const record: SubagentRunRecord = { + runId: "run-coalesce", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Coalesce test", + cleanup: "delete", + createdAt: Date.now(), + endedAt: Date.now() + 5000, + outcome: { status: "ok" }, + findings: "Found 3 issues in auth module.", + findingsCaptured: true, + announced: true, + }; + + const json = JSON.stringify({ version: 1, runs: { "run-coalesce": record } }); + const parsed = JSON.parse(json); + const restored = parsed.runs["run-coalesce"] as SubagentRunRecord; + + expect(restored.findings).toBe("Found 3 issues in auth module."); + expect(restored.findingsCaptured).toBe(true); + expect(restored.announced).toBe(true); + }); + + it("round-trips record with undefined coalescing fields", () => { + const record: SubagentRunRecord = { + runId: "run-old", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Old record", + cleanup: "delete", + createdAt: Date.now(), + cleanupHandled: true, + // No findings, findingsCaptured, or announced fields (old format) + }; + + const json = JSON.stringify({ version: 1, runs: { "run-old": record } }); + const parsed = JSON.parse(json); + const restored = parsed.runs["run-old"] as SubagentRunRecord; + + expect(restored.findings).toBeUndefined(); + expect(restored.findingsCaptured).toBeUndefined(); + expect(restored.announced).toBeUndefined(); + expect(restored.cleanupHandled).toBe(true); + }); }); diff --git a/src/agent/subagent/registry.test.ts b/src/agent/subagent/registry.test.ts index dda78917..9d0fe931 100644 --- a/src/agent/subagent/registry.test.ts +++ b/src/agent/subagent/registry.test.ts @@ -159,3 +159,88 @@ describe("subagent registry", () => { expect(getSubagentRun("run-1")).toBeUndefined(); }); }); + +describe("subagent registry — coalescing", () => { + // Without Hub, watchChildAgent ends runs immediately with "Hub not initialized". + // This allows us to test the coalescing state transitions. + + it("captures findings when a run completes (no Hub)", () => { + registerSubagentRun({ + runId: "run-1", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Task 1", + }); + + const record = getSubagentRun("run-1"); + // Run ended immediately due to no Hub + expect(record?.endedAt).toBeGreaterThan(0); + expect(record?.findingsCaptured).toBe(true); + }); + + it("does not announce while sibling runs are still pending", () => { + // Register first run — ends immediately (no Hub) + registerSubagentRun({ + runId: "run-1", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Task 1", + }); + + const record1 = getSubagentRun("run-1"); + expect(record1?.findingsCaptured).toBe(true); + + // Register second run — also ends immediately + registerSubagentRun({ + runId: "run-2", + childSessionId: "child-2", + requesterSessionId: "parent-1", + task: "Task 2", + }); + + const record2 = getSubagentRun("run-2"); + expect(record2?.findingsCaptured).toBe(true); + + // Both ended, but announce fails because no Hub for parent agent. + // The key check: both records should have findings captured. + // announced will be false because runCoalescedAnnounceFlow fails (no Hub). + expect(record1?.announced).toBeUndefined(); + expect(record2?.announced).toBeUndefined(); + }); + + it("single run captures findings immediately", () => { + registerSubagentRun({ + runId: "run-solo", + childSessionId: "child-solo", + requesterSessionId: "parent-solo", + task: "Solo task", + }); + + const record = getSubagentRun("run-solo"); + expect(record?.endedAt).toBeGreaterThan(0); + expect(record?.findingsCaptured).toBe(true); + expect(record?.outcome?.status).toBe("error"); + expect(record?.outcome?.error).toContain("Hub not initialized"); + }); + + it("shutdownSubagentRegistry captures findings for ended-but-uncaptured runs", () => { + registerSubagentRun({ + runId: "run-1", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Task", + }); + + const record = getSubagentRun("run-1"); + if (record) { + // Simulate: run ended but findings not yet captured + record.endedAt = Date.now(); + record.outcome = { status: "ok" }; + record.findingsCaptured = undefined; + } + + shutdownSubagentRegistry(); + + expect(record?.findingsCaptured).toBe(true); + }); +}); From 77260796488af735b1af9d0ac5e6ba75b9cb36fd Mon Sep 17 00:00:00 2001 From: yushen Date: Fri, 6 Feb 2026 17:31:21 +0800 Subject: [PATCH 03/13] fix(subagent): recover delete cleanup after crash --- src/agent/subagent/registry-recovery.test.ts | 75 ++++++++++++++++++++ src/agent/subagent/registry.ts | 9 ++- 2 files changed, 81 insertions(+), 3 deletions(-) create mode 100644 src/agent/subagent/registry-recovery.test.ts diff --git a/src/agent/subagent/registry-recovery.test.ts b/src/agent/subagent/registry-recovery.test.ts new file mode 100644 index 00000000..046db13f --- /dev/null +++ b/src/agent/subagent/registry-recovery.test.ts @@ -0,0 +1,75 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { SubagentRunRecord } from "./types.js"; + +const loadSubagentRunsMock = vi.fn<() => Map>(); +const saveSubagentRunsMock = vi.fn(); +const readLatestAssistantReplyMock = vi.fn(); +const runCoalescedAnnounceFlowMock = vi.fn(() => false); +const resolveSessionDirMock = vi.fn((sessionId: string) => `/tmp/${sessionId}`); +const closeAgentMock = vi.fn(); +const getHubMock = vi.fn(() => ({ closeAgent: closeAgentMock })); +const rmSyncMock = vi.fn(); + +vi.mock("./registry-store.js", () => ({ + loadSubagentRuns: loadSubagentRunsMock, + saveSubagentRuns: saveSubagentRunsMock, +})); + +vi.mock("./announce.js", () => ({ + readLatestAssistantReply: readLatestAssistantReplyMock, + runCoalescedAnnounceFlow: runCoalescedAnnounceFlowMock, +})); + +vi.mock("../session/storage.js", () => ({ + resolveSessionDir: resolveSessionDirMock, +})); + +vi.mock("../../hub/hub-singleton.js", () => ({ + getHub: getHubMock, + isHubInitialized: vi.fn(() => false), +})); + +vi.mock("node:fs", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + rmSync: rmSyncMock, + }; +}); + +describe("subagent registry recovery cleanup", () => { + beforeEach(() => { + vi.resetModules(); + vi.clearAllMocks(); + loadSubagentRunsMock.mockReturnValue(new Map()); + runCoalescedAnnounceFlowMock.mockReturnValue(false); + }); + + it("deletes child session on recovery even when findings were already captured", async () => { + const now = Date.now(); + const record: SubagentRunRecord = { + runId: "run-1", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "task", + cleanup: "delete", + createdAt: now - 1000, + startedAt: now - 900, + endedAt: now - 100, + outcome: { status: "ok" }, + findings: "done", + findingsCaptured: true, + cleanupHandled: false, + announced: false, + }; + + loadSubagentRunsMock.mockReturnValue(new Map([["run-1", record]])); + + const registry = await import("./registry.js"); + registry.initSubagentRegistry(); + + expect(readLatestAssistantReplyMock).not.toHaveBeenCalled(); + expect(resolveSessionDirMock).toHaveBeenCalledWith("child-1"); + expect(rmSyncMock).toHaveBeenCalledWith("/tmp/child-1", { recursive: true, force: true }); + }); +}); diff --git a/src/agent/subagent/registry.ts b/src/agent/subagent/registry.ts index 980c2793..0dbfa971 100644 --- a/src/agent/subagent/registry.ts +++ b/src/agent/subagent/registry.ts @@ -60,9 +60,12 @@ export function initSubagentRegistry(): void { if (!record.findingsCaptured) { captureFindings(record); - if (record.cleanup === "delete") { - deleteChildSession(record.childSessionId); - } + } + + // Recovery cleanup must be independent from findings capture: + // the process may crash after captureFindings() persisted but before deletion. + if (record.cleanup === "delete" && !record.cleanupHandled) { + deleteChildSession(record.childSessionId); } affectedRequesters.add(record.requesterSessionId); From 9cc89cf29778300f548f126c226e48d907467ef8 Mon Sep 17 00:00:00 2001 From: yushen Date: Fri, 6 Feb 2026 17:58:22 +0800 Subject: [PATCH 04/13] feat(subagent): add sessions_list tool for viewing spawned sub-tasks Adds a new `sessions_list` tool to the Subagent tool group, allowing agents to query the status of their spawned sub-tasks. Supports both list mode (all runs) and detail mode (specific runId). Co-Authored-By: Claude Opus 4.6 --- src/agent/tools.ts | 5 + src/agent/tools/groups.ts | 2 +- src/agent/tools/index.ts | 1 + src/agent/tools/sessions-list.test.ts | 169 +++++++++++++++++++++++ src/agent/tools/sessions-list.ts | 187 ++++++++++++++++++++++++++ 5 files changed, 363 insertions(+), 1 deletion(-) create mode 100644 src/agent/tools/sessions-list.test.ts create mode 100644 src/agent/tools/sessions-list.ts diff --git a/src/agent/tools.ts b/src/agent/tools.ts index 4385a534..fdb7ad1f 100644 --- a/src/agent/tools.ts +++ b/src/agent/tools.ts @@ -7,6 +7,7 @@ import { createProcessTool } from "./tools/process.js"; import { createGlobTool } from "./tools/glob.js"; import { createWebFetchTool, createWebSearchTool } from "./tools/web/index.js"; import { createSessionsSpawnTool } from "./tools/sessions-spawn.js"; +import { createSessionsListTool } from "./tools/sessions-list.js"; import { createMemorySearchTool } from "./tools/memory-search.js"; import { createCronTool } from "./tools/cron/index.js"; import { filterTools } from "./tools/policy.js"; @@ -133,6 +134,10 @@ export function createAllTools(options: CreateToolsOptions | string): AgentTool< }); tools.push(sessionsSpawnTool as AgentTool); + // Add sessions_list tool + const sessionsListTool = createSessionsListTool({ sessionId }); + tools.push(sessionsListTool as AgentTool); + return tools; } diff --git a/src/agent/tools/groups.ts b/src/agent/tools/groups.ts index b2430cb4..f61c9037 100644 --- a/src/agent/tools/groups.ts +++ b/src/agent/tools/groups.ts @@ -34,7 +34,7 @@ export const TOOL_GROUPS: Record = { "group:memory": ["memory_search"], // Subagent tools - "group:subagent": ["sessions_spawn"], + "group:subagent": ["sessions_spawn", "sessions_list"], // Cron/scheduling tools "group:cron": ["cron"], diff --git a/src/agent/tools/index.ts b/src/agent/tools/index.ts index 5e4902ea..e225c54c 100644 --- a/src/agent/tools/index.ts +++ b/src/agent/tools/index.ts @@ -8,6 +8,7 @@ export { createProcessTool } from "./process.js"; export { createGlobTool } from "./glob.js"; export { createWebFetchTool, createWebSearchTool } from "./web/index.js"; export { createCronTool } from "./cron/index.js"; +export { createSessionsListTool } from "./sessions-list.js"; // Tool groups export { diff --git a/src/agent/tools/sessions-list.test.ts b/src/agent/tools/sessions-list.test.ts new file mode 100644 index 00000000..a0bf6c1a --- /dev/null +++ b/src/agent/tools/sessions-list.test.ts @@ -0,0 +1,169 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import type { SubagentRunRecord } from "../subagent/types.js"; + +// Mock the registry module before importing the tool +vi.mock("../subagent/registry.js", () => ({ + listSubagentRuns: vi.fn(), + getSubagentRun: vi.fn(), +})); + +import { createSessionsListTool } from "./sessions-list.js"; +import { listSubagentRuns, getSubagentRun } from "../subagent/registry.js"; + +const mockListSubagentRuns = vi.mocked(listSubagentRuns); +const mockGetSubagentRun = vi.mocked(getSubagentRun); + +function makeRecord(overrides: Partial = {}): SubagentRunRecord { + return { + runId: "run-001", + childSessionId: "child-001", + requesterSessionId: "parent-001", + task: "Test task", + cleanup: "delete", + createdAt: 1700000000000, + ...overrides, + }; +} + +describe("sessions_list tool", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("returns empty message when no runs exist", async () => { + mockListSubagentRuns.mockReturnValue([]); + const tool = createSessionsListTool({ sessionId: "parent-001" }); + const result = await tool.execute("call-1", {}); + + expect(result.content[0]).toEqual({ + type: "text", + text: "No subagent runs for this session.", + }); + expect(result.details).toEqual({ runs: [] }); + }); + + it("lists multiple runs with correct status mapping", async () => { + const now = Date.now(); + const runs: SubagentRunRecord[] = [ + makeRecord({ + runId: "run-aaa", + label: "Code Review", + startedAt: now - 45000, + }), + makeRecord({ + runId: "run-bbb", + label: "Test Analysis", + startedAt: now - 60000, + endedAt: now - 30000, + outcome: { status: "ok" }, + }), + makeRecord({ + runId: "run-ccc", + label: "Lint Check", + startedAt: now - 60000, + endedAt: now, + outcome: { status: "error", error: "timeout" }, + }), + ]; + mockListSubagentRuns.mockReturnValue(runs); + + const tool = createSessionsListTool({ sessionId: "parent-001" }); + const result = await tool.execute("call-1", {}); + + const text = result.content[0]!; + expect(text.type).toBe("text"); + expect((text as { text: string }).text).toContain("3 total"); + expect((text as { text: string }).text).toContain("[running]"); + expect((text as { text: string }).text).toContain("[ok]"); + expect((text as { text: string }).text).toContain("[error]"); + expect((text as { text: string }).text).toContain("Code Review"); + expect((text as { text: string }).text).toContain("Test Analysis"); + expect((text as { text: string }).text).toContain("Lint Check"); + + expect(result.details!.runs).toHaveLength(3); + expect(result.details!.runs[0]!.status).toBe("running"); + expect(result.details!.runs[1]!.status).toBe("ok"); + expect(result.details!.runs[2]!.status).toBe("error"); + }); + + it("returns detail for a specific runId", async () => { + const now = Date.now(); + const record = makeRecord({ + runId: "run-detail", + label: "Deep Analysis", + task: "Analyze the authentication module thoroughly", + startedAt: now - 90000, + endedAt: now - 10000, + outcome: { status: "ok" }, + findings: "Found 2 potential issues in token validation.", + findingsCaptured: true, + }); + mockGetSubagentRun.mockReturnValue(record); + + const tool = createSessionsListTool({ sessionId: "parent-001" }); + const result = await tool.execute("call-1", { runId: "run-detail" }); + + const text = (result.content[0] as { text: string }).text; + expect(text).toContain("Run: run-detail"); + expect(text).toContain("Label: Deep Analysis"); + expect(text).toContain("Status: ok"); + expect(text).toContain("Found 2 potential issues"); + expect(text).toContain("Duration:"); + + expect(result.details!.runs).toHaveLength(1); + expect(result.details!.runs[0]!.runId).toBe("run-detail"); + }); + + it("returns not found for unknown runId", async () => { + mockGetSubagentRun.mockReturnValue(undefined); + + const tool = createSessionsListTool({ sessionId: "parent-001" }); + const result = await tool.execute("call-1", { runId: "nonexistent" }); + + const text = (result.content[0] as { text: string }).text; + expect(text).toContain("Run not found"); + expect(result.details).toEqual({ runs: [] }); + }); + + it("rejects runId belonging to a different requester", async () => { + const record = makeRecord({ + runId: "run-other", + requesterSessionId: "other-parent", + }); + mockGetSubagentRun.mockReturnValue(record); + + const tool = createSessionsListTool({ sessionId: "parent-001" }); + const result = await tool.execute("call-1", { runId: "run-other" }); + + const text = (result.content[0] as { text: string }).text; + expect(text).toContain("Run not found"); + expect(result.details).toEqual({ runs: [] }); + }); + + it("handles missing sessionId gracefully", async () => { + const tool = createSessionsListTool({}); + const result = await tool.execute("call-1", {}); + + const text = (result.content[0] as { text: string }).text; + expect(text).toContain("No session ID available"); + expect(result.details).toEqual({ runs: [] }); + }); + + it("shows findings status for running task", async () => { + const now = Date.now(); + const record = makeRecord({ + runId: "run-running", + label: "Still Running", + startedAt: now - 30000, + // no endedAt + }); + mockGetSubagentRun.mockReturnValue(record); + + const tool = createSessionsListTool({ sessionId: "parent-001" }); + const result = await tool.execute("call-1", { runId: "run-running" }); + + const text = (result.content[0] as { text: string }).text; + expect(text).toContain("Status: running"); + expect(text).toContain("Findings: (still running)"); + }); +}); diff --git a/src/agent/tools/sessions-list.ts b/src/agent/tools/sessions-list.ts new file mode 100644 index 00000000..1106b21e --- /dev/null +++ b/src/agent/tools/sessions-list.ts @@ -0,0 +1,187 @@ +/** + * sessions_list tool — allows an agent to view its spawned subagent runs. + * + * Lists all subagent runs for the current session, or shows details for a + * specific run when a runId is provided. + */ + +import { Type } from "@sinclair/typebox"; +import type { AgentTool } from "@mariozechner/pi-agent-core"; +import { listSubagentRuns, getSubagentRun } from "../subagent/registry.js"; +import type { SubagentRunRecord } from "../subagent/types.js"; + +const SessionsListSchema = Type.Object({ + runId: Type.Optional( + Type.String({ description: "Optional run ID to get details for a specific run. If omitted, lists all runs." }), + ), +}); + +type SessionsListArgs = { + runId?: string; +}; + +export type SessionsListResult = { + runs: Array<{ + runId: string; + label?: string; + task: string; + status: "running" | "ok" | "error" | "timeout" | "unknown"; + startedAt?: number; + endedAt?: number; + findings?: string; + }>; +}; + +export interface CreateSessionsListToolOptions { + /** Session ID of the current (requester) agent */ + sessionId?: string; +} + +function resolveStatus(record: SubagentRunRecord): "running" | "ok" | "error" | "timeout" | "unknown" { + if (!record.endedAt) return "running"; + return record.outcome?.status ?? "unknown"; +} + +function formatElapsed(ms: number): string { + const totalSeconds = Math.round(ms / 1000); + if (totalSeconds < 60) return `${totalSeconds}s`; + const minutes = Math.floor(totalSeconds / 60); + const seconds = totalSeconds % 60; + if (minutes < 60) return seconds > 0 ? `${minutes}m${seconds}s` : `${minutes}m`; + const hours = Math.floor(minutes / 60); + const remainingMinutes = minutes % 60; + return remainingMinutes > 0 ? `${hours}h${remainingMinutes}m` : `${hours}h`; +} + +function formatRunSummary(record: SubagentRunRecord, index: number, now: number): string { + const status = resolveStatus(record); + const displayName = record.label || record.task.slice(0, 60); + const statusTag = `[${status}]`.padEnd(10); + + let timing = ""; + if (status === "running" && record.startedAt) { + timing = `started ${formatElapsed(now - record.startedAt)} ago`; + } else if (record.startedAt && record.endedAt) { + timing = `completed in ${formatElapsed(record.endedAt - record.startedAt)}`; + } + + const parts = [`#${index + 1} ${statusTag} "${displayName}"`]; + if (timing) parts.push(`(${record.runId.slice(0, 8)}…, ${timing})`); + else parts.push(`(${record.runId.slice(0, 8)}…)`); + + return parts.join(" "); +} + +function formatRunDetail(record: SubagentRunRecord, now: number): string { + const status = resolveStatus(record); + const lines: string[] = [ + `Run: ${record.runId}`, + ]; + + if (record.label) lines.push(`Label: ${record.label}`); + lines.push(`Task: ${record.task}`); + lines.push(`Status: ${status}${record.outcome?.error ? ` — ${record.outcome.error}` : ""}`); + lines.push(`Child Session: ${record.childSessionId}`); + lines.push(`Created: ${new Date(record.createdAt).toISOString()} (${formatElapsed(now - record.createdAt)} ago)`); + + if (record.startedAt) { + lines.push(`Started: ${new Date(record.startedAt).toISOString()} (${formatElapsed(now - record.startedAt)} ago)`); + } + if (record.endedAt) { + lines.push(`Ended: ${new Date(record.endedAt).toISOString()}`); + if (record.startedAt) { + lines.push(`Duration: ${formatElapsed(record.endedAt - record.startedAt)}`); + } + } + + if (record.findingsCaptured) { + lines.push(`Findings: ${record.findings || "(no output)"}`); + } else if (record.endedAt) { + lines.push("Findings: (not yet captured)"); + } else { + lines.push("Findings: (still running)"); + } + + if (record.announced) lines.push("Announced: yes"); + + return lines.join("\n"); +} + +function toResultRun(record: SubagentRunRecord) { + return { + runId: record.runId, + label: record.label, + task: record.task, + status: resolveStatus(record), + startedAt: record.startedAt, + endedAt: record.endedAt, + findings: record.findings, + }; +} + +export function createSessionsListTool( + options: CreateSessionsListToolOptions, +): AgentTool { + return { + name: "sessions_list", + label: "List Subagent Runs", + description: + "List all subagent runs spawned by this session and their current status. " + + "Optionally pass a runId to get detailed information about a specific run.", + parameters: SessionsListSchema, + execute: async (_toolCallId, args) => { + const { runId } = args as SessionsListArgs; + const requesterSessionId = options.sessionId; + + if (!requesterSessionId) { + return { + content: [{ type: "text", text: "No session ID available. Cannot list subagent runs." }], + details: { runs: [] }, + }; + } + + const now = Date.now(); + + // Detail mode: specific run + if (runId) { + const record = getSubagentRun(runId); + if (!record) { + return { + content: [{ type: "text", text: `Run not found: ${runId}` }], + details: { runs: [] }, + }; + } + if (record.requesterSessionId !== requesterSessionId) { + return { + content: [{ type: "text", text: `Run not found: ${runId}` }], + details: { runs: [] }, + }; + } + return { + content: [{ type: "text", text: formatRunDetail(record, now) }], + details: { runs: [toResultRun(record)] }, + }; + } + + // List mode: all runs for this session + const runs = listSubagentRuns(requesterSessionId); + + if (runs.length === 0) { + return { + content: [{ type: "text", text: "No subagent runs for this session." }], + details: { runs: [] }, + }; + } + + const lines = [`Subagent runs for this session: ${runs.length} total`, ""]; + for (let i = 0; i < runs.length; i++) { + lines.push(formatRunSummary(runs[i]!, i, now)); + } + + return { + content: [{ type: "text", text: lines.join("\n") }], + details: { runs: runs.map(toResultRun) }, + }; + }, + }; +} From d392238be30c8605be029ab3e0a254bfcc3f721c Mon Sep 17 00:00:00 2001 From: yushen Date: Fri, 6 Feb 2026 18:34:11 +0800 Subject: [PATCH 05/13] feat(subagent): tag internal orchestration messages and filter from user-facing history Announcement messages from subagent completion flows were persisted as regular user messages, polluting conversation history and leaking orchestration instructions to frontend/CLI. - Add `internal?: boolean` to SessionEntry message variant - SessionManager.saveMessage accepts { internal: true } option - SessionManager.loadMessages filters internal by default - Agent.runInternal() tags messages as internal, rolls back from memory - Agent.withRunMutex() prevents concurrent run/runInternal mis-tagging - AsyncAgent.writeInternal() suppresses event forwarding during internal runs - Announce flows use writeInternal() instead of write() - Desktop IPC getHistory reads from session storage (filtered) - CLI session show parses SessionEntry, supports --show-internal flag Co-Authored-By: Claude Opus 4.6 --- apps/desktop/electron/ipc/hub.ts | 5 ++- src/agent/async-agent.ts | 38 +++++++++++++++- src/agent/cli/commands/session.ts | 24 +++++++--- src/agent/runner.ts | 67 ++++++++++++++++++++++++++-- src/agent/session/session-manager.ts | 19 +++++--- src/agent/session/types.ts | 2 +- src/agent/subagent/announce.ts | 4 +- 7 files changed, 139 insertions(+), 20 deletions(-) diff --git a/apps/desktop/electron/ipc/hub.ts b/apps/desktop/electron/ipc/hub.ts index b038efe8..68dd5550 100644 --- a/apps/desktop/electron/ipc/hub.ts +++ b/apps/desktop/electron/ipc/hub.ts @@ -343,6 +343,9 @@ export function registerHubIpcHandlers(): void { * Get message history for local chat with pagination. * Returns raw AgentMessageItem[] so the renderer can render content blocks, * tool results, thinking blocks, etc. — same format as the Gateway RPC. + * + * Reads from session storage (not in-memory state) so that internal + * orchestration messages are excluded by default. */ ipcMain.handle('localChat:getHistory', async (_event, agentId: string, options?: { offset?: number; limit?: number }) => { const h = getHub() @@ -353,7 +356,7 @@ export function registerHubIpcHandlers(): void { try { await agent.ensureInitialized() - const allMessages = agent.getMessages() + const allMessages = agent.loadSessionMessages() const total = allMessages.length // Must match DEFAULT_MESSAGES_LIMIT from @multica/sdk/actions/rpc const limit = options?.limit ?? 200 diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts index db22b843..e7aa7c2e 100644 --- a/src/agent/async-agent.ts +++ b/src/agent/async-agent.ts @@ -25,8 +25,11 @@ export class AsyncAgent { }); this.sessionId = this.agent.sessionId; - // Forward raw AgentEvent and MulticaEvent into the channel + // Forward raw AgentEvent and MulticaEvent into the channel. + // Suppress forwarding during internal runs to avoid leaking + // orchestration messages to the frontend/real-time stream. this.agent.subscribeAll((event: AgentEvent | MulticaEvent) => { + if (this.agent.isInternalRun) return; this.channel.send(event); }); } @@ -57,6 +60,29 @@ export class AsyncAgent { }); } + /** + * Write an internal message to agent (non-blocking, serialized queue). + * Messages are persisted with `internal: true` and rolled back from + * in-memory state. Events are suppressed from the real-time stream. + */ + writeInternal(content: string): void { + if (this._closed) throw new Error("Agent is closed"); + + this.queue = this.queue + .then(async () => { + if (this._closed) return; + const result = await this.agent.runInternal(content); + await this.agent.flushSession(); + if (result.error) { + this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` }); + } + }) + .catch((err) => { + const message = err instanceof Error ? err.message : String(err); + this.channel.send({ id: uuidv7(), content: `[error] ${message}` }); + }); + } + /** Continuously read channel stream (AgentEvent + error Messages) */ read(): AsyncIterable { return this.channel; @@ -226,12 +252,20 @@ export class AsyncAgent { } /** - * Get all messages from the current session. + * Get all messages from the current session (in-memory state). */ getMessages(): AgentMessage[] { return this.agent.getMessages(); } + /** + * Load messages from session storage with filtering. + * By default, internal messages are excluded. + */ + loadSessionMessages(options?: { includeInternal?: boolean }): AgentMessage[] { + return this.agent.loadSessionMessages(options); + } + /** * Get current provider and model information. */ diff --git a/src/agent/cli/commands/session.ts b/src/agent/cli/commands/session.ts index e726cf60..149143c7 100644 --- a/src/agent/cli/commands/session.ts +++ b/src/agent/cli/commands/session.ts @@ -22,7 +22,7 @@ ${cyan("Usage:")} multica session [options] ${cyan("Commands:")} ${yellow("list")} List all sessions - ${yellow("show")} Show session details + ${yellow("show")} Show session details (use --show-internal to include internal messages) ${yellow("delete")} Delete a session ${yellow("help")} Show this help @@ -122,7 +122,7 @@ function cmdList() { console.log(`${dim("Resume with:")} multica --session `); } -function cmdShow(sessionId: string | undefined) { +function cmdShow(sessionId: string | undefined, showInternal = false) { if (!sessionId) { console.error("Error: Session ID is required"); console.error("Usage: multica session show "); @@ -160,14 +160,25 @@ function cmdShow(sessionId: string | undefined) { console.log(cyan("─".repeat(60))); console.log(""); - // Parse and display messages + // Parse and display messages as SessionEntry objects for (const line of lines) { try { - const msg = JSON.parse(line); + const entry = JSON.parse(line); + + // Only display message entries + if (entry.type !== "message") continue; + + // Skip internal messages unless --show-internal + if (entry.internal && !showInternal) continue; + + const msg = entry.message; + if (!msg) continue; + const role = msg.role || "unknown"; const roleColor = role === "user" ? green : role === "assistant" ? cyan : dim; + const internalTag = entry.internal ? dim(" [internal]") : ""; - console.log(`${roleColor(`[${role}]`)}`); + console.log(`${roleColor(`[${role}]`)}${internalTag}`); if (typeof msg.content === "string") { // Truncate long content @@ -238,6 +249,7 @@ function cmdDelete(sessionId: string | undefined) { export async function sessionCommand(args: string[]): Promise { const command = (args[0] || "help") as Command; const arg1 = args[1]; + const showInternal = args.includes("--show-internal"); if (args.includes("--help") || args.includes("-h")) { printHelp(); @@ -249,7 +261,7 @@ export async function sessionCommand(args: string[]): Promise { cmdList(); break; case "show": - cmdShow(arg1); + cmdShow(arg1, showInternal); break; case "delete": cmdDelete(arg1); diff --git a/src/agent/runner.ts b/src/agent/runner.ts index 65fd528b..8955b04c 100644 --- a/src/agent/runner.ts +++ b/src/agent/runner.ts @@ -84,6 +84,10 @@ export class Agent { private readonly stderr: NodeJS.WritableStream; private initialized = false; + // Internal run state + private _internalRun = false; + private _runMutex: Promise = Promise.resolve(); + // MulticaEvent subscribers (parallel to PiAgentCore's subscriber list) // Typed as AgentEvent | MulticaEvent to match subscribeAll() callback signature private multicaListeners: Array<(event: AgentEvent | MulticaEvent) => void> = []; @@ -353,6 +357,48 @@ export class Agent { } async run(prompt: string): Promise { + // Run-level mutex: prevents concurrent run/runInternal from mis-tagging messages + return this.withRunMutex(() => this._run(prompt)); + } + + /** + * Run a prompt as an internal turn. + * Messages are persisted with `internal: true` and rolled back from + * in-memory state after the turn completes, so they do not pollute + * the main conversation context. + */ + async runInternal(prompt: string): Promise { + return this.withRunMutex(async () => { + const messageCountBefore = this.agent.state.messages.length; + this._internalRun = true; + try { + const result = await this._run(prompt); + return result; + } finally { + this._internalRun = false; + // Roll back internal messages from in-memory state + const current = this.agent.state.messages; + if (current.length > messageCountBefore) { + this.agent.replaceMessages(current.slice(0, messageCountBefore)); + } + } + }); + } + + private async withRunMutex(fn: () => Promise): Promise { + // Chain on the mutex so only one run executes at a time + const prev = this._runMutex; + let resolve: () => void; + this._runMutex = new Promise((r) => { resolve = r; }); + await prev; + try { + return await fn(); + } finally { + resolve!(); + } + } + + private async _run(prompt: string): Promise { await this.ensureInitialized(); this.output.state.lastAssistantText = ""; @@ -442,8 +488,10 @@ export class Agent { private handleSessionEvent(event: AgentEvent) { if (event.type === "message_end") { const message = event.message as AgentMessage; - this.session.saveMessage(message); - if (message.role === "assistant") { + this.session.saveMessage(message, this._internalRun ? { internal: true } : undefined); + // Skip compaction during internal runs — internal messages will be + // rolled back from memory afterwards, so compacting now would be incorrect. + if (message.role === "assistant" && !this._internalRun) { void this.maybeCompact(); } } @@ -510,6 +558,11 @@ export class Agent { return this.agent.state.tools?.map(t => t.name) ?? []; } + /** Whether the agent is currently executing an internal run */ + get isInternalRun(): boolean { + return this._internalRun; + } + /** Ensure session messages are loaded from disk (idempotent) */ async ensureInitialized(): Promise { if (this.initialized) return; @@ -521,11 +574,19 @@ export class Agent { this.initialized = true; } - /** Get all messages from the current session */ + /** Get all messages from the current session (in-memory state) */ getMessages(): AgentMessage[] { return this.agent.state.messages.slice(); } + /** + * Load messages from session storage with filtering. + * By default, internal messages are excluded. + */ + loadSessionMessages(options?: { includeInternal?: boolean }): AgentMessage[] { + return this.session.loadMessages(options); + } + /** * Get all skills with their eligibility status. * Returns empty array if skills are disabled. diff --git a/src/agent/session/session-manager.ts b/src/agent/session/session-manager.ts index eece48b1..6c3c49ae 100644 --- a/src/agent/session/session-manager.ts +++ b/src/agent/session/session-manager.ts @@ -167,11 +167,15 @@ export class SessionManager { return repairSessionFileIfNeeded({ sessionFile: filePath, warn }); } - loadMessages(): AgentMessage[] { + loadMessages(options?: { includeInternal?: boolean }): AgentMessage[] { const entries = this.loadEntries(); let messages = entries - .filter((entry) => entry.type === "message") - .map((entry) => entry.message); + .filter((entry) => { + if (entry.type !== "message") return false; + if (!options?.includeInternal && entry.internal) return false; + return true; + }) + .map((entry) => (entry as { type: "message"; message: AgentMessage }).message); messages = sanitizeToolCallInputs(messages); messages = sanitizeToolUseResultPairing(messages); return messages; @@ -203,11 +207,16 @@ export class SessionManager { ); } - saveMessage(message: AgentMessage) { + saveMessage(message: AgentMessage, options?: { internal?: boolean }) { void this.enqueue(() => appendEntry( this.sessionId, - { type: "message", message, timestamp: Date.now() }, + { + type: "message", + message, + timestamp: Date.now(), + ...(options?.internal ? { internal: true } : {}), + }, { baseDir: this.baseDir }, ), ); diff --git a/src/agent/session/types.ts b/src/agent/session/types.ts index 2d311902..f3478649 100644 --- a/src/agent/session/types.ts +++ b/src/agent/session/types.ts @@ -11,7 +11,7 @@ export type SessionMeta = { }; export type SessionEntry = - | { type: "message"; message: AgentMessage; timestamp: number } + | { type: "message"; message: AgentMessage; timestamp: number; internal?: boolean } | { type: "meta"; meta: SessionMeta; timestamp: number } | { type: "compaction"; diff --git a/src/agent/subagent/announce.ts b/src/agent/subagent/announce.ts index ea541d5b..fff82b7e 100644 --- a/src/agent/subagent/announce.ts +++ b/src/agent/subagent/announce.ts @@ -260,7 +260,7 @@ export function runCoalescedAnnounceFlow( return false; } - parentAgent.write(message); + parentAgent.writeInternal(message); return true; } catch (err) { console.error(`[SubagentAnnounce] Failed to coalesced-announce to parent:`, err); @@ -308,7 +308,7 @@ export function runSubagentAnnounceFlow(params: SubagentAnnounceParams): boolean return false; } - parentAgent.write(message); + parentAgent.writeInternal(message); return true; } catch (err) { console.error(`[SubagentAnnounce] Failed to announce to parent:`, err); From f7267f6698f66114c3e45e6bae4dfc94dc9f97c9 Mon Sep 17 00:00:00 2001 From: yushen Date: Fri, 6 Feb 2026 18:38:54 +0800 Subject: [PATCH 06/13] fix(agent): prevent internal run leaks in async streams --- src/agent/async-agent.test.ts | 146 ++++++++++++++++++++++++++++++++++ src/agent/async-agent.ts | 7 +- 2 files changed, 151 insertions(+), 2 deletions(-) create mode 100644 src/agent/async-agent.test.ts diff --git a/src/agent/async-agent.test.ts b/src/agent/async-agent.test.ts new file mode 100644 index 00000000..cd7f3c32 --- /dev/null +++ b/src/agent/async-agent.test.ts @@ -0,0 +1,146 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { AsyncAgent } from "./async-agent.js"; + +const subscribeCallbacks: Array<(event: any) => void> = []; +const internalRunState = { value: false }; + +const runMock = vi.fn(async () => ({ text: "", thinking: undefined, error: undefined })); +const runInternalMock = vi.fn(async () => ({ text: "", thinking: undefined, error: undefined })); +const flushSessionMock = vi.fn(async () => {}); +const subscribeAllMock = vi.fn((fn: (event: any) => void) => { + subscribeCallbacks.push(fn); + return () => {}; +}); + +vi.mock("./runner.js", () => ({ + Agent: class MockAgent { + sessionId = "test-session"; + subscribeAll = subscribeAllMock; + run = runMock; + runInternal = runInternalMock; + flushSession = flushSessionMock; + get isInternalRun() { + return internalRunState.value; + } + getMessages() { + return []; + } + loadSessionMessages() { + return []; + } + async ensureInitialized() {} + getActiveTools() { + return []; + } + reloadTools() { + return []; + } + getSkillsWithStatus() { + return []; + } + getEligibleSkills() { + return []; + } + reloadSkills() {} + setToolStatus() { + return undefined; + } + getProfileId() { + return undefined; + } + getAgentName() { + return undefined; + } + setAgentName() {} + getUserContent() { + return undefined; + } + setUserContent() {} + getAgentStyle() { + return undefined; + } + setAgentStyle() {} + reloadSystemPrompt() {} + getProviderInfo() { + return { provider: "test", model: "test-model" }; + } + setProvider() { + return { provider: "test", model: "test-model" }; + } + }, +})); + +async function nextWithTimeout(iter: AsyncIterator, timeoutMs = 40): Promise<"timeout" | T> { + return await Promise.race([ + iter.next().then((result) => (result.done ? "timeout" : result.value)), + new Promise<"timeout">((resolve) => setTimeout(() => resolve("timeout"), timeoutMs)), + ]); +} + +describe("AsyncAgent internal flow", () => { + afterEach(() => { + subscribeCallbacks.length = 0; + internalRunState.value = false; + runMock.mockReset(); + runInternalMock.mockReset(); + flushSessionMock.mockReset(); + subscribeAllMock.mockClear(); + runMock.mockResolvedValue({ text: "", thinking: undefined, error: undefined }); + runInternalMock.mockResolvedValue({ text: "", thinking: undefined, error: undefined }); + flushSessionMock.mockResolvedValue(undefined); + }); + + it("filters internal events in direct subscribe stream", () => { + const agent = new AsyncAgent(); + const events: Array<{ type: string }> = []; + + const unsubscribe = agent.subscribe((event) => { + events.push(event as { type: string }); + }); + + // subscribeAll is called twice: + // 1) constructor for read() channel forwarding + // 2) subscribe() for direct callback forwarding + const subscribeCallback = subscribeCallbacks[1]; + expect(subscribeCallback).toBeDefined(); + + internalRunState.value = true; + subscribeCallback!({ type: "message_end" }); + expect(events).toHaveLength(0); + + internalRunState.value = false; + subscribeCallback!({ type: "message_end" }); + expect(events).toHaveLength(1); + + unsubscribe(); + agent.close(); + }); + + it("does not leak internal run errors to read() stream", async () => { + runInternalMock.mockResolvedValueOnce({ text: "", thinking: undefined, error: "internal failed" }); + const agent = new AsyncAgent(); + const iter = agent.read()[Symbol.asyncIterator](); + + agent.writeInternal("test internal"); + await agent.waitForIdle(); + + const value = await nextWithTimeout(iter); + expect(value).toBe("timeout"); + + agent.close(); + }); + + it("does not leak internal run exceptions to read() stream", async () => { + runInternalMock.mockRejectedValueOnce(new Error("internal exception")); + const agent = new AsyncAgent(); + const iter = agent.read()[Symbol.asyncIterator](); + + agent.writeInternal("test internal"); + await agent.waitForIdle(); + + const value = await nextWithTimeout(iter); + expect(value).toBe("timeout"); + + agent.close(); + }); +}); diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts index e7aa7c2e..514a57e7 100644 --- a/src/agent/async-agent.ts +++ b/src/agent/async-agent.ts @@ -74,12 +74,14 @@ export class AsyncAgent { const result = await this.agent.runInternal(content); await this.agent.flushSession(); if (result.error) { - this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` }); + // Internal run errors are for diagnostics only; do not leak to user stream. + console.error(`[AsyncAgent] Internal run error: ${result.error}`); } }) .catch((err) => { const message = err instanceof Error ? err.message : String(err); - this.channel.send({ id: uuidv7(), content: `[error] ${message}` }); + // Internal run exceptions are for diagnostics only; do not leak to user stream. + console.error(`[AsyncAgent] Internal run failed: ${message}`); }); } @@ -96,6 +98,7 @@ export class AsyncAgent { subscribe(callback: (event: AgentEvent | MulticaEvent) => void): () => void { console.log(`[AsyncAgent] Adding subscriber for agent: ${this.sessionId}`); const unsubscribe = this.agent.subscribeAll((event) => { + if (this.agent.isInternalRun) return; console.log(`[AsyncAgent] Event received: ${event.type}`); callback(event); }); From 9687e7f2a6da03f7d48c8d17e73eecd5f24eea6b Mon Sep 17 00:00:00 2001 From: yushen Date: Fri, 6 Feb 2026 18:58:54 +0800 Subject: [PATCH 07/13] fix(subagent): forward announce replies from internal runs --- src/agent/async-agent.test.ts | 42 ++++++++++++++++++++++++++++++++++ src/agent/async-agent.ts | 41 +++++++++++++++++++++++++-------- src/agent/subagent/announce.ts | 4 ++-- 3 files changed, 76 insertions(+), 11 deletions(-) diff --git a/src/agent/async-agent.test.ts b/src/agent/async-agent.test.ts index cd7f3c32..b4dd4276 100644 --- a/src/agent/async-agent.test.ts +++ b/src/agent/async-agent.test.ts @@ -143,4 +143,46 @@ describe("AsyncAgent internal flow", () => { agent.close(); }); + + it("forwards only assistant message_end events when writeInternal opts in", async () => { + let resolveRunInternal: ((value: { text: string; thinking: undefined; error: undefined }) => void) | undefined; + runInternalMock.mockImplementationOnce( + () => new Promise((resolve) => { + resolveRunInternal = resolve as typeof resolveRunInternal; + }), + ); + + const agent = new AsyncAgent(); + const iter = agent.read()[Symbol.asyncIterator](); + const streamCallback = subscribeCallbacks[0]; + expect(streamCallback).toBeDefined(); + + agent.writeInternal("announce", { forwardAssistant: true }); + await Promise.resolve(); + + internalRunState.value = true; + streamCallback!({ + type: "message_end", + message: { role: "user", content: [{ type: "text", text: "hidden internal prompt" }] }, + }); + streamCallback!({ + type: "message_end", + message: { role: "assistant", content: [{ type: "text", text: "visible summary" }] }, + }); + + const first = await nextWithTimeout(iter); + expect(first).not.toBe("timeout"); + if (first !== "timeout") { + expect((first as { type: string }).type).toBe("message_end"); + expect((first as { message: { role: string } }).message.role).toBe("assistant"); + } + + const second = await nextWithTimeout(iter); + expect(second).toBe("timeout"); + + resolveRunInternal!({ text: "", thinking: undefined, error: undefined }); + await agent.waitForIdle(); + internalRunState.value = false; + agent.close(); + }); }); diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts index 514a57e7..fc7fb15b 100644 --- a/src/agent/async-agent.ts +++ b/src/agent/async-agent.ts @@ -10,12 +10,18 @@ const devNull = { write: () => true } as unknown as NodeJS.WritableStream; /** Discriminated union of legacy Message, raw AgentEvent, and MulticaEvent */ export type ChannelItem = Message | AgentEvent | MulticaEvent; +export interface WriteInternalOptions { + /** Forward assistant message_end events to realtime stream during internal runs */ + forwardAssistant?: boolean | undefined; +} + export class AsyncAgent { private readonly agent: Agent; private readonly channel = new Channel(); private _closed = false; private queue: Promise = Promise.resolve(); private closeCallbacks: Array<() => void> = []; + private forwardInternalAssistant = false; readonly sessionId: string; constructor(options?: AgentOptions) { @@ -29,7 +35,7 @@ export class AsyncAgent { // Suppress forwarding during internal runs to avoid leaking // orchestration messages to the frontend/real-time stream. this.agent.subscribeAll((event: AgentEvent | MulticaEvent) => { - if (this.agent.isInternalRun) return; + if (!this.shouldForwardEvent(event)) return; this.channel.send(event); }); } @@ -63,19 +69,26 @@ export class AsyncAgent { /** * Write an internal message to agent (non-blocking, serialized queue). * Messages are persisted with `internal: true` and rolled back from - * in-memory state. Events are suppressed from the real-time stream. + * in-memory state. Events are suppressed from the real-time stream by default. */ - writeInternal(content: string): void { + writeInternal(content: string, options?: WriteInternalOptions): void { if (this._closed) throw new Error("Agent is closed"); + const forwardAssistant = options?.forwardAssistant === true; this.queue = this.queue .then(async () => { if (this._closed) return; - const result = await this.agent.runInternal(content); - await this.agent.flushSession(); - if (result.error) { - // Internal run errors are for diagnostics only; do not leak to user stream. - console.error(`[AsyncAgent] Internal run error: ${result.error}`); + const prevForward = this.forwardInternalAssistant; + this.forwardInternalAssistant = forwardAssistant; + try { + const result = await this.agent.runInternal(content); + await this.agent.flushSession(); + if (result.error) { + // Internal run errors are for diagnostics only; do not leak to user stream. + console.error(`[AsyncAgent] Internal run error: ${result.error}`); + } + } finally { + this.forwardInternalAssistant = prevForward; } }) .catch((err) => { @@ -98,7 +111,7 @@ export class AsyncAgent { subscribe(callback: (event: AgentEvent | MulticaEvent) => void): () => void { console.log(`[AsyncAgent] Adding subscriber for agent: ${this.sessionId}`); const unsubscribe = this.agent.subscribeAll((event) => { - if (this.agent.isInternalRun) return; + if (!this.shouldForwardEvent(event)) return; console.log(`[AsyncAgent] Event received: ${event.type}`); callback(event); }); @@ -113,6 +126,16 @@ export class AsyncAgent { return this.queue; } + private shouldForwardEvent(event: AgentEvent | MulticaEvent): boolean { + if (!this.agent.isInternalRun) return true; + if (!this.forwardInternalAssistant) return false; + if (event.type !== "message_end") return false; + + const maybeMessage = (event as { message?: unknown }).message; + if (!maybeMessage || typeof maybeMessage !== "object") return false; + return (maybeMessage as { role?: unknown }).role === "assistant"; + } + /** Register a callback to be invoked when the agent is closed */ onClose(callback: () => void): void { if (this._closed) { diff --git a/src/agent/subagent/announce.ts b/src/agent/subagent/announce.ts index fff82b7e..8809efdb 100644 --- a/src/agent/subagent/announce.ts +++ b/src/agent/subagent/announce.ts @@ -260,7 +260,7 @@ export function runCoalescedAnnounceFlow( return false; } - parentAgent.writeInternal(message); + parentAgent.writeInternal(message, { forwardAssistant: true }); return true; } catch (err) { console.error(`[SubagentAnnounce] Failed to coalesced-announce to parent:`, err); @@ -308,7 +308,7 @@ export function runSubagentAnnounceFlow(params: SubagentAnnounceParams): boolean return false; } - parentAgent.writeInternal(message); + parentAgent.writeInternal(message, { forwardAssistant: true }); return true; } catch (err) { console.error(`[SubagentAnnounce] Failed to announce to parent:`, err); From 4d6017e782a99981d4f2ee050f09e6d52cccfd24 Mon Sep 17 00:00:00 2001 From: yushen Date: Fri, 6 Feb 2026 19:04:40 +0800 Subject: [PATCH 08/13] fix(subagent): forward assistant stream events for internal announce --- src/agent/async-agent.test.ts | 28 +++++++++++++++++++++++++--- src/agent/async-agent.ts | 4 +++- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/src/agent/async-agent.test.ts b/src/agent/async-agent.test.ts index b4dd4276..3d359d3d 100644 --- a/src/agent/async-agent.test.ts +++ b/src/agent/async-agent.test.ts @@ -144,7 +144,7 @@ describe("AsyncAgent internal flow", () => { agent.close(); }); - it("forwards only assistant message_end events when writeInternal opts in", async () => { + it("forwards assistant message stream (start/update/end) when writeInternal opts in", async () => { let resolveRunInternal: ((value: { text: string; thinking: undefined; error: undefined }) => void) | undefined; runInternalMock.mockImplementationOnce( () => new Promise((resolve) => { @@ -161,6 +161,14 @@ describe("AsyncAgent internal flow", () => { await Promise.resolve(); internalRunState.value = true; + streamCallback!({ + type: "message_start", + message: { role: "assistant", content: [] }, + }); + streamCallback!({ + type: "message_update", + message: { role: "assistant", content: [{ type: "text", text: "partial" }] }, + }); streamCallback!({ type: "message_end", message: { role: "user", content: [{ type: "text", text: "hidden internal prompt" }] }, @@ -173,12 +181,26 @@ describe("AsyncAgent internal flow", () => { const first = await nextWithTimeout(iter); expect(first).not.toBe("timeout"); if (first !== "timeout") { - expect((first as { type: string }).type).toBe("message_end"); + expect((first as { type: string }).type).toBe("message_start"); expect((first as { message: { role: string } }).message.role).toBe("assistant"); } const second = await nextWithTimeout(iter); - expect(second).toBe("timeout"); + expect(second).not.toBe("timeout"); + if (second !== "timeout") { + expect((second as { type: string }).type).toBe("message_update"); + expect((second as { message: { role: string } }).message.role).toBe("assistant"); + } + + const third = await nextWithTimeout(iter); + expect(third).not.toBe("timeout"); + if (third !== "timeout") { + expect((third as { type: string }).type).toBe("message_end"); + expect((third as { message: { role: string } }).message.role).toBe("assistant"); + } + + const fourth = await nextWithTimeout(iter); + expect(fourth).toBe("timeout"); resolveRunInternal!({ text: "", thinking: undefined, error: undefined }); await agent.waitForIdle(); diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts index fc7fb15b..a284871b 100644 --- a/src/agent/async-agent.ts +++ b/src/agent/async-agent.ts @@ -129,7 +129,9 @@ export class AsyncAgent { private shouldForwardEvent(event: AgentEvent | MulticaEvent): boolean { if (!this.agent.isInternalRun) return true; if (!this.forwardInternalAssistant) return false; - if (event.type !== "message_end") return false; + if (event.type !== "message_start" && event.type !== "message_update" && event.type !== "message_end") { + return false; + } const maybeMessage = (event as { message?: unknown }).message; if (!maybeMessage || typeof maybeMessage !== "object") return false; From 12075b96f2bbbfc81a31dc7041ea3097bfb45349 Mon Sep 17 00:00:00 2001 From: yushen Date: Fri, 6 Feb 2026 19:10:44 +0800 Subject: [PATCH 09/13] fix(subagent): capture latest non-empty findings from child runs --- src/agent/subagent/announce-findings.test.ts | 67 ++++++++++++++++++++ src/agent/subagent/announce.ts | 35 ++++++++-- 2 files changed, 95 insertions(+), 7 deletions(-) create mode 100644 src/agent/subagent/announce-findings.test.ts diff --git a/src/agent/subagent/announce-findings.test.ts b/src/agent/subagent/announce-findings.test.ts new file mode 100644 index 00000000..6a52fc76 --- /dev/null +++ b/src/agent/subagent/announce-findings.test.ts @@ -0,0 +1,67 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; + +const readEntriesMock = vi.fn(); + +vi.mock("../session/storage.js", () => ({ + readEntries: (sessionId: string) => readEntriesMock(sessionId), +})); + +import { readLatestAssistantReply } from "./announce.js"; + +describe("readLatestAssistantReply", () => { + beforeEach(() => { + readEntriesMock.mockReset(); + }); + + it("returns the latest non-empty assistant text when the last assistant message is tool-only", () => { + readEntriesMock.mockReturnValue([ + { + type: "message", + timestamp: 1, + message: { + role: "assistant", + content: [{ type: "text", text: "南京天气:晴,12°C。" }], + }, + }, + { + type: "message", + timestamp: 2, + message: { + role: "assistant", + content: [{ type: "toolCall", id: "tool-1", name: "weather", arguments: { city: "Nanjing" } }], + }, + }, + ]); + + const result = readLatestAssistantReply("child-session"); + expect(result).toBe("南京天气:晴,12°C。"); + }); + + it("falls back to latest toolResult text when no assistant text exists", () => { + readEntriesMock.mockReturnValue([ + { + type: "message", + timestamp: 1, + message: { + role: "assistant", + content: [{ type: "toolCall", id: "tool-2", name: "weather", arguments: { city: "Nanjing" } }], + }, + }, + { + type: "message", + timestamp: 2, + message: { + role: "toolResult", + toolCallId: "tool-2", + toolName: "weather", + content: [{ type: "text", text: "{\"city\":\"Nanjing\",\"tempC\":12,\"condition\":\"Sunny\"}" }], + isError: false, + }, + }, + ]); + + const result = readLatestAssistantReply("child-session"); + expect(result).toContain("\"city\":\"Nanjing\""); + expect(result).toContain("\"condition\":\"Sunny\""); + }); +}); diff --git a/src/agent/subagent/announce.ts b/src/agent/subagent/announce.ts index 8809efdb..7afeb707 100644 --- a/src/agent/subagent/announce.ts +++ b/src/agent/subagent/announce.ts @@ -39,19 +39,29 @@ export function buildSubagentSystemPrompt(params: SubagentSystemPromptParams): s */ export function readLatestAssistantReply(sessionId: string): string | undefined { const entries = readEntries(sessionId); + let latestToolResultText: string | undefined; - // Walk backwards to find last assistant message + // Walk backwards to find the last non-empty assistant reply. + // If no assistant text exists (e.g. run ended after tool execution), + // fall back to the latest non-empty toolResult content. for (let i = entries.length - 1; i >= 0; i--) { const entry = entries[i]!; if (entry.type !== "message") continue; const message = entry.message; - if (message.role !== "assistant") continue; + if (message.role === "assistant") { + const text = extractAssistantText(message); + if (text) return text; + continue; + } - return extractAssistantText(message); + if (message.role === "toolResult" && !latestToolResultText) { + const text = extractToolResultText(message); + if (text) latestToolResultText = text; + } } - return undefined; + return latestToolResultText; } /** @@ -59,7 +69,17 @@ export function readLatestAssistantReply(sessionId: string): string | undefined * AgentMessage.content for assistant is (TextContent | ThinkingContent | ToolCall)[]. */ function extractAssistantText(message: { role: string; content: unknown }): string { - const content = message.content; + return extractTextLikeContent(message.content); +} + +/** + * Extract text content from a toolResult message. + */ +function extractToolResultText(message: { role: string; content: unknown }): string { + return extractTextLikeContent(message.content); +} + +function extractTextLikeContent(content: unknown): string { if (typeof content === "string") { return sanitizeText(content); } @@ -68,8 +88,9 @@ function extractAssistantText(message: { role: string; content: unknown }): stri const textParts: string[] = []; for (const block of content) { - if (block && typeof block === "object" && "type" in block && block.type === "text" && "text" in block) { - textParts.push(String(block.text)); + if (!block || typeof block !== "object") continue; + if ("text" in block) { + textParts.push(String((block as { text: unknown }).text)); } } From a50239a9f5eeb61887d5dbfcaa07ea8bd9acf4b6 Mon Sep 17 00:00:00 2001 From: yushen Date: Fri, 6 Feb 2026 19:12:51 +0800 Subject: [PATCH 10/13] fix(subagent): require coalesced announce to include all findings --- src/agent/subagent/announce.test.ts | 18 +++++++++++++++++- src/agent/subagent/announce.ts | 18 +++++++++++++++--- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/src/agent/subagent/announce.test.ts b/src/agent/subagent/announce.test.ts index 28fafee1..f532dd5c 100644 --- a/src/agent/subagent/announce.test.ts +++ b/src/agent/subagent/announce.test.ts @@ -232,7 +232,23 @@ describe("formatCoalescedAnnouncementMessage", () => { const msg = formatCoalescedAnnouncementMessage(records); - expect(msg).toContain("combined findings"); + expect(msg).toContain("MUST include findings from every task item above"); expect(msg).toContain("NO_REPLY"); }); + + it("includes raw findings for every task in coalesced payload", () => { + const records = [ + makeRecord({ runId: "run-1", label: "南京天气", findings: "南京:晴,12°C" }), + makeRecord({ runId: "run-2", label: "上海天气", findings: "上海:多云,9°C" }), + ]; + + const msg = formatCoalescedAnnouncementMessage(records); + + expect(msg).toContain("Raw findings from each task (MUST cover all items):"); + expect(msg).toContain("[1] 南京天气:"); + expect(msg).toContain("南京:晴,12°C"); + expect(msg).toContain("[2] 上海天气:"); + expect(msg).toContain("上海:多云,9°C"); + expect(msg).toContain("MUST include findings from every task item above"); + }); }); diff --git a/src/agent/subagent/announce.ts b/src/agent/subagent/announce.ts index 7afeb707..5f9b5438 100644 --- a/src/agent/subagent/announce.ts +++ b/src/agent/subagent/announce.ts @@ -213,7 +213,8 @@ export function formatCoalescedAnnouncementMessage( }); } - // Multiple records: build combined message + // Multiple records: build combined message. + // Include a strict raw-findings section so parent can reliably cover every task result. const parts: string[] = [ `All ${records.length} background tasks have completed. Here are the combined results:`, "", @@ -249,11 +250,22 @@ export function formatCoalescedAnnouncementMessage( const failCount = records.length - okCount; parts.push(`Results: ${okCount} succeeded, ${failCount} failed/timed out`); + parts.push("", "Raw findings from each task (MUST cover all items):", ""); + for (let i = 0; i < records.length; i++) { + const r = records[i]!; + const displayName = r.label || r.task.slice(0, 60); + parts.push( + `[${i + 1}] ${displayName}:`, + r.findings || "(no output)", + "", + ); + } + parts.push( "", "Summarize these results naturally for the user.", - "Present the combined findings as a coherent summary, not a list of separate reports.", - "Keep it concise but cover the key findings from each task.", + "You MUST include findings from every task item above, without omission.", + "Keep it concise, but preserve concrete findings from each task.", "Do not mention technical details like session IDs or that these were background tasks.", "You can respond with NO_REPLY if no announcement is needed.", ); From a3acd732e055e081a543df330748b69b7c9baf4d Mon Sep 17 00:00:00 2001 From: yushen Date: Fri, 6 Feb 2026 19:38:18 +0800 Subject: [PATCH 11/13] fix(subagent): persist LLM summary after internal announce to parent context After child subagents complete, the coalesced announcement runs as an internal turn which rolls back all messages from the parent's in-memory context. This causes the parent LLM to lose findings in subsequent turns. Add persistResponse option to writeInternal that re-injects the LLM's summary as a non-internal assistant message after the internal run completes. The internal prompt stays hidden while the summary persists in both memory and session JSONL for future turns. Co-Authored-By: Claude Opus 4.6 --- src/agent/async-agent.test.ts | 54 ++++++++++++++++++++++++++++++++++ src/agent/async-agent.ts | 8 +++++ src/agent/runner.ts | 29 ++++++++++++++++++ src/agent/subagent/announce.ts | 4 +-- 4 files changed, 93 insertions(+), 2 deletions(-) diff --git a/src/agent/async-agent.test.ts b/src/agent/async-agent.test.ts index 3d359d3d..3ebe01fd 100644 --- a/src/agent/async-agent.test.ts +++ b/src/agent/async-agent.test.ts @@ -7,6 +7,7 @@ const internalRunState = { value: false }; const runMock = vi.fn(async () => ({ text: "", thinking: undefined, error: undefined })); const runInternalMock = vi.fn(async () => ({ text: "", thinking: undefined, error: undefined })); const flushSessionMock = vi.fn(async () => {}); +const persistAssistantSummaryMock = vi.fn(); const subscribeAllMock = vi.fn((fn: (event: any) => void) => { subscribeCallbacks.push(fn); return () => {}; @@ -19,6 +20,7 @@ vi.mock("./runner.js", () => ({ run = runMock; runInternal = runInternalMock; flushSession = flushSessionMock; + persistAssistantSummary = persistAssistantSummaryMock; get isInternalRun() { return internalRunState.value; } @@ -84,6 +86,7 @@ describe("AsyncAgent internal flow", () => { runMock.mockReset(); runInternalMock.mockReset(); flushSessionMock.mockReset(); + persistAssistantSummaryMock.mockReset(); subscribeAllMock.mockClear(); runMock.mockResolvedValue({ text: "", thinking: undefined, error: undefined }); runInternalMock.mockResolvedValue({ text: "", thinking: undefined, error: undefined }); @@ -207,4 +210,55 @@ describe("AsyncAgent internal flow", () => { internalRunState.value = false; agent.close(); }); + + it("persists assistant summary when persistResponse is true and result has text", async () => { + runInternalMock.mockResolvedValueOnce({ text: "Summary of findings", thinking: undefined, error: undefined }); + const agent = new AsyncAgent(); + + agent.writeInternal("announce findings", { forwardAssistant: true, persistResponse: true }); + await agent.waitForIdle(); + + expect(persistAssistantSummaryMock).toHaveBeenCalledOnce(); + expect(persistAssistantSummaryMock).toHaveBeenCalledWith("Summary of findings"); + // flushSession called twice: once after runInternal, once after persistAssistantSummary + expect(flushSessionMock).toHaveBeenCalledTimes(2); + + agent.close(); + }); + + it("does not persist assistant summary when result text is NO_REPLY", async () => { + runInternalMock.mockResolvedValueOnce({ text: "NO_REPLY", thinking: undefined, error: undefined }); + const agent = new AsyncAgent(); + + agent.writeInternal("announce findings", { forwardAssistant: true, persistResponse: true }); + await agent.waitForIdle(); + + expect(persistAssistantSummaryMock).not.toHaveBeenCalled(); + + agent.close(); + }); + + it("does not persist assistant summary when result text is empty", async () => { + runInternalMock.mockResolvedValueOnce({ text: " ", thinking: undefined, error: undefined }); + const agent = new AsyncAgent(); + + agent.writeInternal("announce findings", { forwardAssistant: true, persistResponse: true }); + await agent.waitForIdle(); + + expect(persistAssistantSummaryMock).not.toHaveBeenCalled(); + + agent.close(); + }); + + it("does not persist assistant summary when persistResponse is not set", async () => { + runInternalMock.mockResolvedValueOnce({ text: "Summary of findings", thinking: undefined, error: undefined }); + const agent = new AsyncAgent(); + + agent.writeInternal("announce findings", { forwardAssistant: true }); + await agent.waitForIdle(); + + expect(persistAssistantSummaryMock).not.toHaveBeenCalled(); + + agent.close(); + }); }); diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts index a284871b..b3b76ec1 100644 --- a/src/agent/async-agent.ts +++ b/src/agent/async-agent.ts @@ -13,6 +13,8 @@ export type ChannelItem = Message | AgentEvent | MulticaEvent; export interface WriteInternalOptions { /** Forward assistant message_end events to realtime stream during internal runs */ forwardAssistant?: boolean | undefined; + /** After internal run completes, persist the LLM's summary as a non-internal assistant message */ + persistResponse?: boolean | undefined; } export class AsyncAgent { @@ -74,6 +76,7 @@ export class AsyncAgent { writeInternal(content: string, options?: WriteInternalOptions): void { if (this._closed) throw new Error("Agent is closed"); const forwardAssistant = options?.forwardAssistant === true; + const persistResponse = options?.persistResponse === true; this.queue = this.queue .then(async () => { @@ -87,6 +90,11 @@ export class AsyncAgent { // Internal run errors are for diagnostics only; do not leak to user stream. console.error(`[AsyncAgent] Internal run error: ${result.error}`); } + // Persist the LLM summary so it remains in parent context for future turns + if (persistResponse && result.text?.trim() && result.text.trim() !== "NO_REPLY") { + this.agent.persistAssistantSummary(result.text.trim()); + await this.agent.flushSession(); + } } finally { this.forwardInternalAssistant = prevForward; } diff --git a/src/agent/runner.ts b/src/agent/runner.ts index 8955b04c..41c0f7a2 100644 --- a/src/agent/runner.ts +++ b/src/agent/runner.ts @@ -563,6 +563,35 @@ export class Agent { return this._internalRun; } + /** + * Persist a synthetic assistant message into both in-memory state and session JSONL. + * Used after an internal run to keep the LLM summary visible in future turns + * while the internal prompt stays hidden. + */ + persistAssistantSummary(text: string): void { + const model = this.agent.state.model; + const message = { + role: "assistant" as const, + content: [{ type: "text" as const, text }], + api: model?.api ?? "openai-completions", + provider: model?.provider ?? "internal", + model: model?.id ?? "unknown", + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason: "stop" as const, + timestamp: Date.now(), + }; + + this.agent.appendMessage(message); + this.session.saveMessage(message); + } + /** Ensure session messages are loaded from disk (idempotent) */ async ensureInitialized(): Promise { if (this.initialized) return; diff --git a/src/agent/subagent/announce.ts b/src/agent/subagent/announce.ts index 5f9b5438..54c92e75 100644 --- a/src/agent/subagent/announce.ts +++ b/src/agent/subagent/announce.ts @@ -293,7 +293,7 @@ export function runCoalescedAnnounceFlow( return false; } - parentAgent.writeInternal(message, { forwardAssistant: true }); + parentAgent.writeInternal(message, { forwardAssistant: true, persistResponse: true }); return true; } catch (err) { console.error(`[SubagentAnnounce] Failed to coalesced-announce to parent:`, err); @@ -341,7 +341,7 @@ export function runSubagentAnnounceFlow(params: SubagentAnnounceParams): boolean return false; } - parentAgent.writeInternal(message, { forwardAssistant: true }); + parentAgent.writeInternal(message, { forwardAssistant: true, persistResponse: true }); return true; } catch (err) { console.error(`[SubagentAnnounce] Failed to announce to parent:`, err); From a3f5a21561c6ad0f92067cef988fb7b4f7cb20dd Mon Sep 17 00:00:00 2001 From: yushen Date: Fri, 6 Feb 2026 19:53:31 +0800 Subject: [PATCH 12/13] fix(subagent): clear run records immediately after successful announcement Instead of keeping announced runs in the registry for 60 minutes (archive sweeper), delete them right after findings are delivered to the parent. This prevents stale completed tasks from appearing in sessions_list on subsequent parent turns. Co-Authored-By: Claude Opus 4.6 --- src/agent/subagent/registry.test.ts | 32 ++++++++++++++++++++++++++++- src/agent/subagent/registry.ts | 7 +++++-- 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/src/agent/subagent/registry.test.ts b/src/agent/subagent/registry.test.ts index 9d0fe931..2169564b 100644 --- a/src/agent/subagent/registry.test.ts +++ b/src/agent/subagent/registry.test.ts @@ -1,4 +1,4 @@ -import { describe, it, expect, beforeEach } from "vitest"; +import { describe, it, expect, beforeEach, vi } from "vitest"; import { registerSubagentRun, listSubagentRuns, @@ -244,3 +244,33 @@ describe("subagent registry — coalescing", () => { expect(record?.findingsCaptured).toBe(true); }); }); + +describe("subagent registry — post-announce cleanup", () => { + it("removes runs from registry after successful announcement", async () => { + // Mock runCoalescedAnnounceFlow to succeed + const announceModule = await import("./announce.js"); + const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true); + + // Register two runs for the same parent — both end immediately (no Hub) + registerSubagentRun({ + runId: "run-a", + childSessionId: "child-a", + requesterSessionId: "parent-1", + task: "Task A", + }); + registerSubagentRun({ + runId: "run-b", + childSessionId: "child-b", + requesterSessionId: "parent-1", + task: "Task B", + }); + + // Both runs should have been announced and removed from registry + expect(spy).toHaveBeenCalled(); + expect(getSubagentRun("run-a")).toBeUndefined(); + expect(getSubagentRun("run-b")).toBeUndefined(); + expect(listSubagentRuns("parent-1")).toHaveLength(0); + + spy.mockRestore(); + }); +}); diff --git a/src/agent/subagent/registry.ts b/src/agent/subagent/registry.ts index 0dbfa971..0f03002a 100644 --- a/src/agent/subagent/registry.ts +++ b/src/agent/subagent/registry.ts @@ -289,10 +289,13 @@ function checkAndAnnounce(requesterSessionId: string): void { for (const r of pending) { r.announced = true; r.cleanupHandled = true; - r.archiveAtMs = Date.now() + DEFAULT_ARCHIVE_AFTER_MS; - r.cleanupCompletedAt = Date.now(); + // Remove from registry immediately — findings already delivered to parent + subagentRuns.delete(r.runId); } persist(); + if (subagentRuns.size === 0) { + stopSweeper(); + } } else { console.warn( `[SubagentRegistry] Coalesced announce failed for requester ${requesterSessionId}`, From c24fafadeb02282362743c07e4b9ed2de2e252ee Mon Sep 17 00:00:00 2001 From: yushen Date: Fri, 6 Feb 2026 19:58:31 +0800 Subject: [PATCH 13/13] fix(tools): prevent AbortSignal listener leak in exec and process tools Each tool call added an abort listener to the shared agent signal without cleanup, exceeding the default 10-listener limit after 11+ exec calls. Fix by using { once: true } and removing the listener on child process close (exec) to prevent accumulation. Co-Authored-By: Claude Opus 4.6 --- src/agent/tools/exec.ts | 23 +++++++++++++---------- src/agent/tools/process.ts | 2 +- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/agent/tools/exec.ts b/src/agent/tools/exec.ts index 826795f6..41b51550 100644 --- a/src/agent/tools/exec.ts +++ b/src/agent/tools/exec.ts @@ -164,9 +164,22 @@ export function createExecTool( // Don't reject, let close event handle }); + // Signal handling: don't kill if already backgrounded + const onAbort = signal ? () => { + if (yielded) return; // Already backgrounded, ignore abort + if (timeout) clearTimeout(timeout); + if (yieldTimer) clearTimeout(yieldTimer); + child.kill("SIGTERM"); + } : undefined; + + if (signal && onAbort) { + signal.addEventListener("abort", onAbort, { once: true }); + } + child.on("close", (code) => { if (timeout) clearTimeout(timeout); if (yieldTimer) clearTimeout(yieldTimer); + if (signal && onAbort) signal.removeEventListener("abort", onAbort); // If already backgrounded, don't resolve again if (yielded) return; @@ -202,16 +215,6 @@ export function createExecTool( }, }); }); - - // Signal handling: don't kill if already backgrounded - if (signal) { - signal.addEventListener("abort", () => { - if (yielded) return; // Already backgrounded, ignore abort - if (timeout) clearTimeout(timeout); - if (yieldTimer) clearTimeout(yieldTimer); - child.kill("SIGTERM"); - }); - } }); }, }; diff --git a/src/agent/tools/process.ts b/src/agent/tools/process.ts index 962b6a13..ad0fecb0 100644 --- a/src/agent/tools/process.ts +++ b/src/agent/tools/process.ts @@ -112,7 +112,7 @@ export function createProcessTool(defaultCwd?: string): AgentTool { child.kill("SIGTERM"); - }); + }, { once: true }); } resolve({ success: true });