diff --git a/apps/desktop/electron/ipc/hub.ts b/apps/desktop/electron/ipc/hub.ts index b038efe8..68dd5550 100644 --- a/apps/desktop/electron/ipc/hub.ts +++ b/apps/desktop/electron/ipc/hub.ts @@ -343,6 +343,9 @@ export function registerHubIpcHandlers(): void { * Get message history for local chat with pagination. * Returns raw AgentMessageItem[] so the renderer can render content blocks, * tool results, thinking blocks, etc. — same format as the Gateway RPC. + * + * Reads from session storage (not in-memory state) so that internal + * orchestration messages are excluded by default. */ ipcMain.handle('localChat:getHistory', async (_event, agentId: string, options?: { offset?: number; limit?: number }) => { const h = getHub() @@ -353,7 +356,7 @@ export function registerHubIpcHandlers(): void { try { await agent.ensureInitialized() - const allMessages = agent.getMessages() + const allMessages = agent.loadSessionMessages() const total = allMessages.length // Must match DEFAULT_MESSAGES_LIMIT from @multica/sdk/actions/rpc const limit = options?.limit ?? 
200 diff --git a/src/agent/async-agent.test.ts b/src/agent/async-agent.test.ts new file mode 100644 index 00000000..3ebe01fd --- /dev/null +++ b/src/agent/async-agent.test.ts @@ -0,0 +1,264 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { AsyncAgent } from "./async-agent.js"; + +const subscribeCallbacks: Array<(event: any) => void> = []; +const internalRunState = { value: false }; + +const runMock = vi.fn(async () => ({ text: "", thinking: undefined, error: undefined })); +const runInternalMock = vi.fn(async () => ({ text: "", thinking: undefined, error: undefined })); +const flushSessionMock = vi.fn(async () => {}); +const persistAssistantSummaryMock = vi.fn(); +const subscribeAllMock = vi.fn((fn: (event: any) => void) => { + subscribeCallbacks.push(fn); + return () => {}; +}); + +vi.mock("./runner.js", () => ({ + Agent: class MockAgent { + sessionId = "test-session"; + subscribeAll = subscribeAllMock; + run = runMock; + runInternal = runInternalMock; + flushSession = flushSessionMock; + persistAssistantSummary = persistAssistantSummaryMock; + get isInternalRun() { + return internalRunState.value; + } + getMessages() { + return []; + } + loadSessionMessages() { + return []; + } + async ensureInitialized() {} + getActiveTools() { + return []; + } + reloadTools() { + return []; + } + getSkillsWithStatus() { + return []; + } + getEligibleSkills() { + return []; + } + reloadSkills() {} + setToolStatus() { + return undefined; + } + getProfileId() { + return undefined; + } + getAgentName() { + return undefined; + } + setAgentName() {} + getUserContent() { + return undefined; + } + setUserContent() {} + getAgentStyle() { + return undefined; + } + setAgentStyle() {} + reloadSystemPrompt() {} + getProviderInfo() { + return { provider: "test", model: "test-model" }; + } + setProvider() { + return { provider: "test", model: "test-model" }; + } + }, +})); + +async function nextWithTimeout(iter: AsyncIterator, timeoutMs = 40): Promise<"timeout" | T> 
{ + return await Promise.race([ + iter.next().then((result) => (result.done ? "timeout" : result.value)), + new Promise<"timeout">((resolve) => setTimeout(() => resolve("timeout"), timeoutMs)), + ]); +} + +describe("AsyncAgent internal flow", () => { + afterEach(() => { + subscribeCallbacks.length = 0; + internalRunState.value = false; + runMock.mockReset(); + runInternalMock.mockReset(); + flushSessionMock.mockReset(); + persistAssistantSummaryMock.mockReset(); + subscribeAllMock.mockClear(); + runMock.mockResolvedValue({ text: "", thinking: undefined, error: undefined }); + runInternalMock.mockResolvedValue({ text: "", thinking: undefined, error: undefined }); + flushSessionMock.mockResolvedValue(undefined); + }); + + it("filters internal events in direct subscribe stream", () => { + const agent = new AsyncAgent(); + const events: Array<{ type: string }> = []; + + const unsubscribe = agent.subscribe((event) => { + events.push(event as { type: string }); + }); + + // subscribeAll is called twice: + // 1) constructor for read() channel forwarding + // 2) subscribe() for direct callback forwarding + const subscribeCallback = subscribeCallbacks[1]; + expect(subscribeCallback).toBeDefined(); + + internalRunState.value = true; + subscribeCallback!({ type: "message_end" }); + expect(events).toHaveLength(0); + + internalRunState.value = false; + subscribeCallback!({ type: "message_end" }); + expect(events).toHaveLength(1); + + unsubscribe(); + agent.close(); + }); + + it("does not leak internal run errors to read() stream", async () => { + runInternalMock.mockResolvedValueOnce({ text: "", thinking: undefined, error: "internal failed" }); + const agent = new AsyncAgent(); + const iter = agent.read()[Symbol.asyncIterator](); + + agent.writeInternal("test internal"); + await agent.waitForIdle(); + + const value = await nextWithTimeout(iter); + expect(value).toBe("timeout"); + + agent.close(); + }); + + it("does not leak internal run exceptions to read() stream", async () => 
{ + runInternalMock.mockRejectedValueOnce(new Error("internal exception")); + const agent = new AsyncAgent(); + const iter = agent.read()[Symbol.asyncIterator](); + + agent.writeInternal("test internal"); + await agent.waitForIdle(); + + const value = await nextWithTimeout(iter); + expect(value).toBe("timeout"); + + agent.close(); + }); + + it("forwards assistant message stream (start/update/end) when writeInternal opts in", async () => { + let resolveRunInternal: ((value: { text: string; thinking: undefined; error: undefined }) => void) | undefined; + runInternalMock.mockImplementationOnce( + () => new Promise((resolve) => { + resolveRunInternal = resolve as typeof resolveRunInternal; + }), + ); + + const agent = new AsyncAgent(); + const iter = agent.read()[Symbol.asyncIterator](); + const streamCallback = subscribeCallbacks[0]; + expect(streamCallback).toBeDefined(); + + agent.writeInternal("announce", { forwardAssistant: true }); + await Promise.resolve(); + + internalRunState.value = true; + streamCallback!({ + type: "message_start", + message: { role: "assistant", content: [] }, + }); + streamCallback!({ + type: "message_update", + message: { role: "assistant", content: [{ type: "text", text: "partial" }] }, + }); + streamCallback!({ + type: "message_end", + message: { role: "user", content: [{ type: "text", text: "hidden internal prompt" }] }, + }); + streamCallback!({ + type: "message_end", + message: { role: "assistant", content: [{ type: "text", text: "visible summary" }] }, + }); + + const first = await nextWithTimeout(iter); + expect(first).not.toBe("timeout"); + if (first !== "timeout") { + expect((first as { type: string }).type).toBe("message_start"); + expect((first as { message: { role: string } }).message.role).toBe("assistant"); + } + + const second = await nextWithTimeout(iter); + expect(second).not.toBe("timeout"); + if (second !== "timeout") { + expect((second as { type: string }).type).toBe("message_update"); + expect((second as { message: { 
role: string } }).message.role).toBe("assistant"); + } + + const third = await nextWithTimeout(iter); + expect(third).not.toBe("timeout"); + if (third !== "timeout") { + expect((third as { type: string }).type).toBe("message_end"); + expect((third as { message: { role: string } }).message.role).toBe("assistant"); + } + + const fourth = await nextWithTimeout(iter); + expect(fourth).toBe("timeout"); + + resolveRunInternal!({ text: "", thinking: undefined, error: undefined }); + await agent.waitForIdle(); + internalRunState.value = false; + agent.close(); + }); + + it("persists assistant summary when persistResponse is true and result has text", async () => { + runInternalMock.mockResolvedValueOnce({ text: "Summary of findings", thinking: undefined, error: undefined }); + const agent = new AsyncAgent(); + + agent.writeInternal("announce findings", { forwardAssistant: true, persistResponse: true }); + await agent.waitForIdle(); + + expect(persistAssistantSummaryMock).toHaveBeenCalledOnce(); + expect(persistAssistantSummaryMock).toHaveBeenCalledWith("Summary of findings"); + // flushSession called twice: once after runInternal, once after persistAssistantSummary + expect(flushSessionMock).toHaveBeenCalledTimes(2); + + agent.close(); + }); + + it("does not persist assistant summary when result text is NO_REPLY", async () => { + runInternalMock.mockResolvedValueOnce({ text: "NO_REPLY", thinking: undefined, error: undefined }); + const agent = new AsyncAgent(); + + agent.writeInternal("announce findings", { forwardAssistant: true, persistResponse: true }); + await agent.waitForIdle(); + + expect(persistAssistantSummaryMock).not.toHaveBeenCalled(); + + agent.close(); + }); + + it("does not persist assistant summary when result text is empty", async () => { + runInternalMock.mockResolvedValueOnce({ text: " ", thinking: undefined, error: undefined }); + const agent = new AsyncAgent(); + + agent.writeInternal("announce findings", { forwardAssistant: true, persistResponse: true 
}); + await agent.waitForIdle(); + + expect(persistAssistantSummaryMock).not.toHaveBeenCalled(); + + agent.close(); + }); + + it("does not persist assistant summary when persistResponse is not set", async () => { + runInternalMock.mockResolvedValueOnce({ text: "Summary of findings", thinking: undefined, error: undefined }); + const agent = new AsyncAgent(); + + agent.writeInternal("announce findings", { forwardAssistant: true }); + await agent.waitForIdle(); + + expect(persistAssistantSummaryMock).not.toHaveBeenCalled(); + + agent.close(); + }); +}); diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts index c1eb9a5e..0f7f7d88 100644 --- a/src/agent/async-agent.ts +++ b/src/agent/async-agent.ts @@ -10,6 +10,13 @@ const devNull = { write: () => true } as unknown as NodeJS.WritableStream; /** Discriminated union of legacy Message, raw AgentEvent, and MulticaEvent */ export type ChannelItem = Message | AgentEvent | MulticaEvent; +export interface WriteInternalOptions { + /** Forward assistant message_start/message_update/message_end events to the realtime stream during internal runs */ + forwardAssistant?: boolean | undefined; + /** After internal run completes, persist the LLM's summary as a non-internal assistant message */ + persistResponse?: boolean | undefined; +} + export class AsyncAgent { private readonly agent: Agent; private readonly channel = new Channel(); @@ -17,6 +24,7 @@ export class AsyncAgent { private queue: Promise = Promise.resolve(); private pendingWrites = 0; private closeCallbacks: Array<() => void> = []; + private forwardInternalAssistant = false; readonly sessionId: string; constructor(options?: AgentOptions) { @@ -26,8 +34,11 @@ export class AsyncAgent { }); this.sessionId = this.agent.sessionId; - // Forward raw AgentEvent and MulticaEvent into the channel + // Forward raw AgentEvent and MulticaEvent into the channel. + // Suppress forwarding during internal runs to avoid leaking + // orchestration messages to the frontend/real-time stream. 
this.agent.subscribeAll((event: AgentEvent | MulticaEvent) => { + if (!this.shouldForwardEvent(event)) return; this.channel.send(event); }); } @@ -62,6 +73,44 @@ export class AsyncAgent { }); } + /** + * Write an internal message to agent (non-blocking, serialized queue). + * Messages are persisted with `internal: true` and rolled back from + * in-memory state. Events are suppressed from the real-time stream by default. + */ + writeInternal(content: string, options?: WriteInternalOptions): void { + if (this._closed) throw new Error("Agent is closed"); + const forwardAssistant = options?.forwardAssistant === true; + const persistResponse = options?.persistResponse === true; + + this.queue = this.queue + .then(async () => { + if (this._closed) return; + const prevForward = this.forwardInternalAssistant; + this.forwardInternalAssistant = forwardAssistant; + try { + const result = await this.agent.runInternal(content); + await this.agent.flushSession(); + if (result.error) { + // Internal run errors are for diagnostics only; do not leak to user stream. + console.error(`[AsyncAgent] Internal run error: ${result.error}`); + } + // Persist the LLM summary so it remains in parent context for future turns + if (persistResponse && result.text?.trim() && result.text.trim() !== "NO_REPLY") { + this.agent.persistAssistantSummary(result.text.trim()); + await this.agent.flushSession(); + } + } finally { + this.forwardInternalAssistant = prevForward; + } + }) + .catch((err) => { + const message = err instanceof Error ? err.message : String(err); + // Internal run exceptions are for diagnostics only; do not leak to user stream. 
+ console.error(`[AsyncAgent] Internal run failed: ${message}`); + }); + } + /** Continuously read channel stream (AgentEvent + error Messages) */ read(): AsyncIterable { return this.channel; @@ -75,6 +124,7 @@ export class AsyncAgent { subscribe(callback: (event: AgentEvent | MulticaEvent) => void): () => void { console.log(`[AsyncAgent] Adding subscriber for agent: ${this.sessionId}`); const unsubscribe = this.agent.subscribeAll((event) => { + if (!this.shouldForwardEvent(event)) return; console.log(`[AsyncAgent] Event received: ${event.type}`); callback(event); }); @@ -89,6 +139,18 @@ export class AsyncAgent { return this.queue; } + private shouldForwardEvent(event: AgentEvent | MulticaEvent): boolean { + if (!this.agent.isInternalRun) return true; + if (!this.forwardInternalAssistant) return false; + if (event.type !== "message_start" && event.type !== "message_update" && event.type !== "message_end") { + return false; + } + + const maybeMessage = (event as { message?: unknown }).message; + if (!maybeMessage || typeof maybeMessage !== "object") return false; + return (maybeMessage as { role?: unknown }).role === "assistant"; + } + /** Register a callback to be invoked when the agent is closed */ onClose(callback: () => void): void { if (this._closed) { @@ -259,12 +321,20 @@ export class AsyncAgent { } /** - * Get all messages from the current session. + * Get all messages from the current session (in-memory state). */ getMessages(): AgentMessage[] { return this.agent.getMessages(); } + /** + * Load messages from session storage with filtering. + * By default, internal messages are excluded. + */ + loadSessionMessages(options?: { includeInternal?: boolean }): AgentMessage[] { + return this.agent.loadSessionMessages(options); + } + /** * Get current provider and model information. 
*/ diff --git a/src/agent/cli/commands/session.ts b/src/agent/cli/commands/session.ts index e726cf60..149143c7 100644 --- a/src/agent/cli/commands/session.ts +++ b/src/agent/cli/commands/session.ts @@ -22,7 +22,7 @@ ${cyan("Usage:")} multica session [options] ${cyan("Commands:")} ${yellow("list")} List all sessions - ${yellow("show")} Show session details + ${yellow("show")} Show session details (use --show-internal to include internal messages) ${yellow("delete")} Delete a session ${yellow("help")} Show this help @@ -122,7 +122,7 @@ function cmdList() { console.log(`${dim("Resume with:")} multica --session `); } -function cmdShow(sessionId: string | undefined) { +function cmdShow(sessionId: string | undefined, showInternal = false) { if (!sessionId) { console.error("Error: Session ID is required"); console.error("Usage: multica session show "); @@ -160,14 +160,25 @@ function cmdShow(sessionId: string | undefined) { console.log(cyan("─".repeat(60))); console.log(""); - // Parse and display messages + // Parse and display messages as SessionEntry objects for (const line of lines) { try { - const msg = JSON.parse(line); + const entry = JSON.parse(line); + + // Only display message entries + if (entry.type !== "message") continue; + + // Skip internal messages unless --show-internal + if (entry.internal && !showInternal) continue; + + const msg = entry.message; + if (!msg) continue; + const role = msg.role || "unknown"; const roleColor = role === "user" ? green : role === "assistant" ? cyan : dim; + const internalTag = entry.internal ? 
dim(" [internal]") : ""; - console.log(`${roleColor(`[${role}]`)}`); + console.log(`${roleColor(`[${role}]`)}${internalTag}`); if (typeof msg.content === "string") { // Truncate long content @@ -238,6 +249,7 @@ function cmdDelete(sessionId: string | undefined) { export async function sessionCommand(args: string[]): Promise { const command = (args[0] || "help") as Command; const arg1 = args[1]; + const showInternal = args.includes("--show-internal"); if (args.includes("--help") || args.includes("-h")) { printHelp(); @@ -249,7 +261,7 @@ export async function sessionCommand(args: string[]): Promise { cmdList(); break; case "show": - cmdShow(arg1); + cmdShow(arg1, showInternal); break; case "delete": cmdDelete(arg1); diff --git a/src/agent/runner.ts b/src/agent/runner.ts index d364f0fa..9433d8bc 100644 --- a/src/agent/runner.ts +++ b/src/agent/runner.ts @@ -84,6 +84,10 @@ export class Agent { private readonly stderr: NodeJS.WritableStream; private initialized = false; + // Internal run state + private _internalRun = false; + private _runMutex: Promise = Promise.resolve(); + // MulticaEvent subscribers (parallel to PiAgentCore's subscriber list) // Typed as AgentEvent | MulticaEvent to match subscribeAll() callback signature private multicaListeners: Array<(event: AgentEvent | MulticaEvent) => void> = []; @@ -353,6 +357,48 @@ export class Agent { } async run(prompt: string): Promise { + // Run-level mutex: prevents concurrent run/runInternal from mis-tagging messages + return this.withRunMutex(() => this._run(prompt)); + } + + /** + * Run a prompt as an internal turn. + * Messages are persisted with `internal: true` and rolled back from + * in-memory state after the turn completes, so they do not pollute + * the main conversation context. 
+ */ + async runInternal(prompt: string): Promise { + return this.withRunMutex(async () => { + const messageCountBefore = this.agent.state.messages.length; + this._internalRun = true; + try { + const result = await this._run(prompt); + return result; + } finally { + this._internalRun = false; + // Roll back internal messages from in-memory state + const current = this.agent.state.messages; + if (current.length > messageCountBefore) { + this.agent.replaceMessages(current.slice(0, messageCountBefore)); + } + } + }); + } + + private async withRunMutex(fn: () => Promise): Promise { + // Chain on the mutex so only one run executes at a time + const prev = this._runMutex; + let resolve: () => void; + this._runMutex = new Promise((r) => { resolve = r; }); + await prev; + try { + return await fn(); + } finally { + resolve!(); + } + } + + private async _run(prompt: string): Promise { await this.ensureInitialized(); this.output.state.lastAssistantText = ""; @@ -442,8 +488,10 @@ export class Agent { private handleSessionEvent(event: AgentEvent) { if (event.type === "message_end") { const message = event.message as AgentMessage; - this.session.saveMessage(message); - if (message.role === "assistant") { + this.session.saveMessage(message, this._internalRun ? { internal: true } : undefined); + // Skip compaction during internal runs — internal messages will be + // rolled back from memory afterwards, so compacting now would be incorrect. + if (message.role === "assistant" && !this._internalRun) { void this.maybeCompact(); } } @@ -510,6 +558,40 @@ export class Agent { return this.agent.state.tools?.map(t => t.name) ?? []; } + /** Whether the agent is currently executing an internal run */ + get isInternalRun(): boolean { + return this._internalRun; + } + + /** + * Persist a synthetic assistant message into both in-memory state and session JSONL. + * Used after an internal run to keep the LLM summary visible in future turns + * while the internal prompt stays hidden. 
+ */ + persistAssistantSummary(text: string): void { + const model = this.agent.state.model; + const message = { + role: "assistant" as const, + content: [{ type: "text" as const, text }], + api: model?.api ?? "openai-completions", + provider: model?.provider ?? "internal", + model: model?.id ?? "unknown", + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason: "stop" as const, + timestamp: Date.now(), + }; + + this.agent.appendMessage(message); + this.session.saveMessage(message); + } + /** Ensure session messages are loaded from disk (idempotent) */ async ensureInitialized(): Promise { if (this.initialized) return; @@ -521,11 +603,19 @@ export class Agent { this.initialized = true; } - /** Get all messages from the current session */ + /** Get all messages from the current session (in-memory state) */ getMessages(): AgentMessage[] { return this.agent.state.messages.slice(); } + /** + * Load messages from session storage with filtering. + * By default, internal messages are excluded. + */ + loadSessionMessages(options?: { includeInternal?: boolean }): AgentMessage[] { + return this.session.loadMessages(options); + } + /** * Get all skills with their eligibility status. * Returns empty array if skills are disabled. 
diff --git a/src/agent/session/session-manager.ts b/src/agent/session/session-manager.ts index eece48b1..6c3c49ae 100644 --- a/src/agent/session/session-manager.ts +++ b/src/agent/session/session-manager.ts @@ -167,11 +167,15 @@ export class SessionManager { return repairSessionFileIfNeeded({ sessionFile: filePath, warn }); } - loadMessages(): AgentMessage[] { + loadMessages(options?: { includeInternal?: boolean }): AgentMessage[] { const entries = this.loadEntries(); let messages = entries - .filter((entry) => entry.type === "message") - .map((entry) => entry.message); + .filter((entry) => { + if (entry.type !== "message") return false; + if (!options?.includeInternal && entry.internal) return false; + return true; + }) + .map((entry) => (entry as { type: "message"; message: AgentMessage }).message); messages = sanitizeToolCallInputs(messages); messages = sanitizeToolUseResultPairing(messages); return messages; @@ -203,11 +207,16 @@ export class SessionManager { ); } - saveMessage(message: AgentMessage) { + saveMessage(message: AgentMessage, options?: { internal?: boolean }) { void this.enqueue(() => appendEntry( this.sessionId, - { type: "message", message, timestamp: Date.now() }, + { + type: "message", + message, + timestamp: Date.now(), + ...(options?.internal ? 
{ internal: true } : {}), + }, { baseDir: this.baseDir }, ), ); diff --git a/src/agent/session/types.ts b/src/agent/session/types.ts index 2d311902..f3478649 100644 --- a/src/agent/session/types.ts +++ b/src/agent/session/types.ts @@ -11,7 +11,7 @@ export type SessionMeta = { }; export type SessionEntry = - | { type: "message"; message: AgentMessage; timestamp: number } + | { type: "message"; message: AgentMessage; timestamp: number; internal?: boolean } | { type: "meta"; meta: SessionMeta; timestamp: number } | { type: "compaction"; diff --git a/src/agent/subagent/announce-findings.test.ts b/src/agent/subagent/announce-findings.test.ts new file mode 100644 index 00000000..6a52fc76 --- /dev/null +++ b/src/agent/subagent/announce-findings.test.ts @@ -0,0 +1,67 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; + +const readEntriesMock = vi.fn(); + +vi.mock("../session/storage.js", () => ({ + readEntries: (sessionId: string) => readEntriesMock(sessionId), +})); + +import { readLatestAssistantReply } from "./announce.js"; + +describe("readLatestAssistantReply", () => { + beforeEach(() => { + readEntriesMock.mockReset(); + }); + + it("returns the latest non-empty assistant text when the last assistant message is tool-only", () => { + readEntriesMock.mockReturnValue([ + { + type: "message", + timestamp: 1, + message: { + role: "assistant", + content: [{ type: "text", text: "南京天气:晴,12°C。" }], + }, + }, + { + type: "message", + timestamp: 2, + message: { + role: "assistant", + content: [{ type: "toolCall", id: "tool-1", name: "weather", arguments: { city: "Nanjing" } }], + }, + }, + ]); + + const result = readLatestAssistantReply("child-session"); + expect(result).toBe("南京天气:晴,12°C。"); + }); + + it("falls back to latest toolResult text when no assistant text exists", () => { + readEntriesMock.mockReturnValue([ + { + type: "message", + timestamp: 1, + message: { + role: "assistant", + content: [{ type: "toolCall", id: "tool-2", name: "weather", arguments: { 
city: "Nanjing" } }], + }, + }, + { + type: "message", + timestamp: 2, + message: { + role: "toolResult", + toolCallId: "tool-2", + toolName: "weather", + content: [{ type: "text", text: "{\"city\":\"Nanjing\",\"tempC\":12,\"condition\":\"Sunny\"}" }], + isError: false, + }, + }, + ]); + + const result = readLatestAssistantReply("child-session"); + expect(result).toContain("\"city\":\"Nanjing\""); + expect(result).toContain("\"condition\":\"Sunny\""); + }); +}); diff --git a/src/agent/subagent/announce.test.ts b/src/agent/subagent/announce.test.ts index faf6ca35..f532dd5c 100644 --- a/src/agent/subagent/announce.test.ts +++ b/src/agent/subagent/announce.test.ts @@ -1,6 +1,7 @@ import { describe, it, expect } from "vitest"; -import { buildSubagentSystemPrompt, formatAnnouncementMessage } from "./announce.js"; +import { buildSubagentSystemPrompt, formatAnnouncementMessage, formatCoalescedAnnouncementMessage } from "./announce.js"; import type { FormatAnnouncementParams } from "./announce.js"; +import type { SubagentRunRecord } from "./types.js"; describe("buildSubagentSystemPrompt", () => { it("includes task and session context", () => { @@ -126,3 +127,128 @@ describe("formatAnnouncementMessage", () => { expect(msg).toContain("NO_REPLY"); }); }); + +describe("formatCoalescedAnnouncementMessage", () => { + function makeRecord(overrides: Partial = {}): SubagentRunRecord { + return { + runId: "run-1", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Default task", + cleanup: "delete", + createdAt: 1000000, + startedAt: 1000000, + endedAt: 1030000, + outcome: { status: "ok" }, + findings: "Some findings", + findingsCaptured: true, + announced: false, + ...overrides, + }; + } + + it("delegates to formatAnnouncementMessage for a single record", () => { + const record = makeRecord({ label: "Code Analysis" }); + const coalesced = formatCoalescedAnnouncementMessage([record]); + const direct = formatAnnouncementMessage({ + runId: record.runId, + 
childSessionId: record.childSessionId, + requesterSessionId: record.requesterSessionId, + task: record.task, + label: record.label, + cleanup: record.cleanup, + outcome: record.outcome, + startedAt: record.startedAt, + endedAt: record.endedAt, + findings: record.findings, + }); + + expect(coalesced).toBe(direct); + }); + + it("formats multiple records with all task findings and stats", () => { + const records = [ + makeRecord({ + runId: "run-1", + childSessionId: "child-1", + label: "Task A", + findings: "Found issue A", + startedAt: 1000000, + endedAt: 1030000, + }), + makeRecord({ + runId: "run-2", + childSessionId: "child-2", + label: "Task B", + findings: "Found issue B", + startedAt: 1000000, + endedAt: 1045000, // 45 seconds + }), + ]; + + const msg = formatCoalescedAnnouncementMessage(records); + + expect(msg).toContain("All 2 background tasks have completed"); + expect(msg).toContain('Task 1: "Task A"'); + expect(msg).toContain("Found issue A"); + expect(msg).toContain('Task 2: "Task B"'); + expect(msg).toContain("Found issue B"); + expect(msg).toContain("Total wall time: 45s"); + expect(msg).toContain("2 succeeded, 0 failed"); + }); + + it("reports mixed outcomes correctly", () => { + const records = [ + makeRecord({ runId: "run-1", label: "OK Task", outcome: { status: "ok" } }), + makeRecord({ runId: "run-2", label: "Failed Task", outcome: { status: "error", error: "crash" } }), + makeRecord({ runId: "run-3", label: "Timeout Task", outcome: { status: "timeout" } }), + ]; + + const msg = formatCoalescedAnnouncementMessage(records); + + expect(msg).toContain("completed successfully"); + expect(msg).toContain("failed: crash"); + expect(msg).toContain("timed out"); + expect(msg).toContain("1 succeeded, 2 failed"); + }); + + it("shows (no output) for missing findings", () => { + const records = [ + makeRecord({ runId: "run-1", findings: undefined }), + makeRecord({ runId: "run-2", findings: "Has output" }), + ]; + + const msg = 
formatCoalescedAnnouncementMessage(records); + + expect(msg).toContain("(no output)"); + expect(msg).toContain("Has output"); + }); + + it("includes combined summary instruction for multi-record", () => { + const records = [ + makeRecord({ runId: "run-1" }), + makeRecord({ runId: "run-2" }), + ]; + + const msg = formatCoalescedAnnouncementMessage(records); + + expect(msg).toContain("MUST include findings from every task item above"); + expect(msg).toContain("NO_REPLY"); + }); + + it("includes raw findings for every task in coalesced payload", () => { + const records = [ + makeRecord({ runId: "run-1", label: "南京天气", findings: "南京:晴,12°C" }), + makeRecord({ runId: "run-2", label: "上海天气", findings: "上海:多云,9°C" }), + ]; + + const msg = formatCoalescedAnnouncementMessage(records); + + expect(msg).toContain("Raw findings from each task (MUST cover all items):"); + expect(msg).toContain("[1] 南京天气:"); + expect(msg).toContain("南京:晴,12°C"); + expect(msg).toContain("[2] 上海天气:"); + expect(msg).toContain("上海:多云,9°C"); + expect(msg).toContain("MUST include findings from every task item above"); + }); +}); diff --git a/src/agent/subagent/announce.ts b/src/agent/subagent/announce.ts index fbfa9800..54c92e75 100644 --- a/src/agent/subagent/announce.ts +++ b/src/agent/subagent/announce.ts @@ -13,6 +13,7 @@ import { buildSystemPrompt } from "../system-prompt/index.js"; import type { SubagentAnnounceParams, SubagentRunOutcome, + SubagentRunRecord, SubagentSystemPromptParams, } from "./types.js"; @@ -38,19 +39,29 @@ export function buildSubagentSystemPrompt(params: SubagentSystemPromptParams): s */ export function readLatestAssistantReply(sessionId: string): string | undefined { const entries = readEntries(sessionId); + let latestToolResultText: string | undefined; - // Walk backwards to find last assistant message + // Walk backwards to find the last non-empty assistant reply. + // If no assistant text exists (e.g. 
run ended after tool execution), + // fall back to the latest non-empty toolResult content. for (let i = entries.length - 1; i >= 0; i--) { const entry = entries[i]!; if (entry.type !== "message") continue; const message = entry.message; - if (message.role !== "assistant") continue; + if (message.role === "assistant") { + const text = extractAssistantText(message); + if (text) return text; + continue; + } - return extractAssistantText(message); + if (message.role === "toolResult" && !latestToolResultText) { + const text = extractToolResultText(message); + if (text) latestToolResultText = text; + } } - return undefined; + return latestToolResultText; } /** @@ -58,7 +69,17 @@ export function readLatestAssistantReply(sessionId: string): string | undefined * AgentMessage.content for assistant is (TextContent | ThinkingContent | ToolCall)[]. */ function extractAssistantText(message: { role: string; content: unknown }): string { - const content = message.content; + return extractTextLikeContent(message.content); +} + +/** + * Extract text content from a toolResult message. 
+ */ +function extractToolResultText(message: { role: string; content: unknown }): string { + return extractTextLikeContent(message.content); +} + +function extractTextLikeContent(content: unknown): string { if (typeof content === "string") { return sanitizeText(content); } @@ -67,8 +88,9 @@ function extractAssistantText(message: { role: string; content: unknown }): stri const textParts: string[] = []; for (const block of content) { - if (block && typeof block === "object" && "type" in block && block.type === "text" && "text" in block) { - textParts.push(String(block.text)); + if (!block || typeof block !== "object") continue; + if ("text" in block) { + textParts.push(String((block as { text: unknown }).text)); } } @@ -167,11 +189,126 @@ export function formatAnnouncementMessage(params: FormatAnnouncementParams): str return parts.join("\n"); } +/** + * Format a coalesced announcement message from multiple completed subagent runs. + * When only one record is provided, delegates to formatAnnouncementMessage. + */ +export function formatCoalescedAnnouncementMessage( + records: SubagentRunRecord[], +): string { + // Single record: delegate to existing format for backward-compatible behavior + if (records.length === 1) { + const r = records[0]!; + return formatAnnouncementMessage({ + runId: r.runId, + childSessionId: r.childSessionId, + requesterSessionId: r.requesterSessionId, + task: r.task, + label: r.label, + cleanup: r.cleanup, + outcome: r.outcome, + startedAt: r.startedAt, + endedAt: r.endedAt, + findings: r.findings, + }); + } + + // Multiple records: build combined message. + // Include a strict raw-findings section so parent can reliably cover every task result. + const parts: string[] = [ + `All ${records.length} background tasks have completed. 
Here are the combined results:`, + "", + ]; + + for (let i = 0; i < records.length; i++) { + const r = records[i]!; + const displayName = r.label || r.task.slice(0, 60); + const statusLabel = formatStatusLabel(r.outcome); + const durationStr = (r.startedAt && r.endedAt) + ? ` (${formatDuration(r.startedAt, r.endedAt)})` + : ""; + + parts.push( + `### Task ${i + 1}: "${displayName}"`, + `Status: ${statusLabel}${durationStr}`, + "", + "Findings:", + r.findings || "(no output)", + "", + ); + } + + // Overall stats + const allStartTimes = records.map(r => r.startedAt).filter(Boolean) as number[]; + const allEndTimes = records.map(r => r.endedAt).filter(Boolean) as number[]; + if (allStartTimes.length > 0 && allEndTimes.length > 0) { + const wallTime = formatDuration(Math.min(...allStartTimes), Math.max(...allEndTimes)); + parts.push(`Total wall time: ${wallTime}`); + } + + const okCount = records.filter(r => r.outcome?.status === "ok").length; + const failCount = records.length - okCount; + parts.push(`Results: ${okCount} succeeded, ${failCount} failed/timed out`); + + parts.push("", "Raw findings from each task (MUST cover all items):", ""); + for (let i = 0; i < records.length; i++) { + const r = records[i]!; + const displayName = r.label || r.task.slice(0, 60); + parts.push( + `[${i + 1}] ${displayName}:`, + r.findings || "(no output)", + "", + ); + } + + parts.push( + "", + "Summarize these results naturally for the user.", + "You MUST include findings from every task item above, without omission.", + "Keep it concise, but preserve concrete findings from each task.", + "Do not mention technical details like session IDs or that these were background tasks.", + "You can respond with NO_REPLY if no announcement is needed.", + ); + + return parts.join("\n"); +} + +/** + * Run the coalesced announcement flow for all completed runs of a requester. + * Formats a single combined message and delivers it to the parent agent. 
+ */ +export function runCoalescedAnnounceFlow( + requesterSessionId: string, + records: SubagentRunRecord[], +): boolean { + const message = formatCoalescedAnnouncementMessage(records); + + try { + const hub = getHub(); + const parentAgent = hub.getAgent(requesterSessionId); + if (!parentAgent || parentAgent.closed) { + console.warn( + `[SubagentAnnounce] Parent agent not found or closed: ${requesterSessionId}`, + ); + return false; + } + + parentAgent.writeInternal(message, { forwardAssistant: true, persistResponse: true }); + return true; + } catch (err) { + console.error(`[SubagentAnnounce] Failed to coalesced-announce to parent:`, err); + return false; + } +} + /** * Run the full subagent announcement flow: * 1. Read child's last assistant reply * 2. Format announcement message * 3. Send to parent agent via Hub + * + * @deprecated Use runCoalescedAnnounceFlow instead, which supports + * batching multiple completed runs into a single announcement. */ export function runSubagentAnnounceFlow(params: SubagentAnnounceParams): boolean { const { requesterSessionId, childSessionId } = params; @@ -204,7 +341,7 @@ export function runSubagentAnnounceFlow(params: SubagentAnnounceParams): boolean return false; } - parentAgent.write(message); + parentAgent.writeInternal(message, { forwardAssistant: true, persistResponse: true }); return true; } catch (err) { console.error(`[SubagentAnnounce] Failed to announce to parent:`, err); diff --git a/src/agent/subagent/index.ts b/src/agent/subagent/index.ts index 2785d86e..7a934229 100644 --- a/src/agent/subagent/index.ts +++ b/src/agent/subagent/index.ts @@ -28,6 +28,8 @@ export { readLatestAssistantReply, formatAnnouncementMessage, runSubagentAnnounceFlow, + formatCoalescedAnnouncementMessage, + runCoalescedAnnounceFlow, } from "./announce.js"; export type { FormatAnnouncementParams } from "./announce.js"; diff --git a/src/agent/subagent/registry-recovery.test.ts b/src/agent/subagent/registry-recovery.test.ts new file mode 100644 
index 00000000..046db13f --- /dev/null +++ b/src/agent/subagent/registry-recovery.test.ts @@ -0,0 +1,75 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { SubagentRunRecord } from "./types.js"; + +const loadSubagentRunsMock = vi.fn<() => Map>(); +const saveSubagentRunsMock = vi.fn(); +const readLatestAssistantReplyMock = vi.fn(); +const runCoalescedAnnounceFlowMock = vi.fn(() => false); +const resolveSessionDirMock = vi.fn((sessionId: string) => `/tmp/${sessionId}`); +const closeAgentMock = vi.fn(); +const getHubMock = vi.fn(() => ({ closeAgent: closeAgentMock })); +const rmSyncMock = vi.fn(); + +vi.mock("./registry-store.js", () => ({ + loadSubagentRuns: loadSubagentRunsMock, + saveSubagentRuns: saveSubagentRunsMock, +})); + +vi.mock("./announce.js", () => ({ + readLatestAssistantReply: readLatestAssistantReplyMock, + runCoalescedAnnounceFlow: runCoalescedAnnounceFlowMock, +})); + +vi.mock("../session/storage.js", () => ({ + resolveSessionDir: resolveSessionDirMock, +})); + +vi.mock("../../hub/hub-singleton.js", () => ({ + getHub: getHubMock, + isHubInitialized: vi.fn(() => false), +})); + +vi.mock("node:fs", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + rmSync: rmSyncMock, + }; +}); + +describe("subagent registry recovery cleanup", () => { + beforeEach(() => { + vi.resetModules(); + vi.clearAllMocks(); + loadSubagentRunsMock.mockReturnValue(new Map()); + runCoalescedAnnounceFlowMock.mockReturnValue(false); + }); + + it("deletes child session on recovery even when findings were already captured", async () => { + const now = Date.now(); + const record: SubagentRunRecord = { + runId: "run-1", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "task", + cleanup: "delete", + createdAt: now - 1000, + startedAt: now - 900, + endedAt: now - 100, + outcome: { status: "ok" }, + findings: "done", + findingsCaptured: true, + cleanupHandled: false, + announced: false, + }; + + 
loadSubagentRunsMock.mockReturnValue(new Map([["run-1", record]])); + + const registry = await import("./registry.js"); + registry.initSubagentRegistry(); + + expect(readLatestAssistantReplyMock).not.toHaveBeenCalled(); + expect(resolveSessionDirMock).toHaveBeenCalledWith("child-1"); + expect(rmSyncMock).toHaveBeenCalledWith("/tmp/child-1", { recursive: true, force: true }); + }); +}); diff --git a/src/agent/subagent/registry-store.test.ts b/src/agent/subagent/registry-store.test.ts index 7247203c..bd7a5814 100644 --- a/src/agent/subagent/registry-store.test.ts +++ b/src/agent/subagent/registry-store.test.ts @@ -78,4 +78,50 @@ describe("registry-store serialization", () => { expect(parsed.outcome?.status).toBe("error"); expect(parsed.outcome?.error).toBe("Something went wrong"); }); + + it("round-trips coalescing fields (findings, findingsCaptured, announced)", () => { + const record: SubagentRunRecord = { + runId: "run-coalesce", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Coalesce test", + cleanup: "delete", + createdAt: Date.now(), + endedAt: Date.now() + 5000, + outcome: { status: "ok" }, + findings: "Found 3 issues in auth module.", + findingsCaptured: true, + announced: true, + }; + + const json = JSON.stringify({ version: 1, runs: { "run-coalesce": record } }); + const parsed = JSON.parse(json); + const restored = parsed.runs["run-coalesce"] as SubagentRunRecord; + + expect(restored.findings).toBe("Found 3 issues in auth module."); + expect(restored.findingsCaptured).toBe(true); + expect(restored.announced).toBe(true); + }); + + it("round-trips record with undefined coalescing fields", () => { + const record: SubagentRunRecord = { + runId: "run-old", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Old record", + cleanup: "delete", + createdAt: Date.now(), + cleanupHandled: true, + // No findings, findingsCaptured, or announced fields (old format) + }; + + const json = JSON.stringify({ version: 1, runs: { 
"run-old": record } }); + const parsed = JSON.parse(json); + const restored = parsed.runs["run-old"] as SubagentRunRecord; + + expect(restored.findings).toBeUndefined(); + expect(restored.findingsCaptured).toBeUndefined(); + expect(restored.announced).toBeUndefined(); + expect(restored.cleanupHandled).toBe(true); + }); }); diff --git a/src/agent/subagent/registry.test.ts b/src/agent/subagent/registry.test.ts index dda78917..2169564b 100644 --- a/src/agent/subagent/registry.test.ts +++ b/src/agent/subagent/registry.test.ts @@ -1,4 +1,4 @@ -import { describe, it, expect, beforeEach } from "vitest"; +import { describe, it, expect, beforeEach, vi } from "vitest"; import { registerSubagentRun, listSubagentRuns, @@ -159,3 +159,118 @@ describe("subagent registry", () => { expect(getSubagentRun("run-1")).toBeUndefined(); }); }); + +describe("subagent registry — coalescing", () => { + // Without Hub, watchChildAgent ends runs immediately with "Hub not initialized". + // This allows us to test the coalescing state transitions. 
+ + it("captures findings when a run completes (no Hub)", () => { + registerSubagentRun({ + runId: "run-1", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Task 1", + }); + + const record = getSubagentRun("run-1"); + // Run ended immediately due to no Hub + expect(record?.endedAt).toBeGreaterThan(0); + expect(record?.findingsCaptured).toBe(true); + }); + + it("does not announce while sibling runs are still pending", () => { + // Register first run — ends immediately (no Hub) + registerSubagentRun({ + runId: "run-1", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Task 1", + }); + + const record1 = getSubagentRun("run-1"); + expect(record1?.findingsCaptured).toBe(true); + + // Register second run — also ends immediately + registerSubagentRun({ + runId: "run-2", + childSessionId: "child-2", + requesterSessionId: "parent-1", + task: "Task 2", + }); + + const record2 = getSubagentRun("run-2"); + expect(record2?.findingsCaptured).toBe(true); + + // Both ended, but announce fails because no Hub for parent agent. + // The key check: both records should have findings captured. + // announced will be false because runCoalescedAnnounceFlow fails (no Hub). 
+ expect(record1?.announced).toBeUndefined(); + expect(record2?.announced).toBeUndefined(); + }); + + it("single run captures findings immediately", () => { + registerSubagentRun({ + runId: "run-solo", + childSessionId: "child-solo", + requesterSessionId: "parent-solo", + task: "Solo task", + }); + + const record = getSubagentRun("run-solo"); + expect(record?.endedAt).toBeGreaterThan(0); + expect(record?.findingsCaptured).toBe(true); + expect(record?.outcome?.status).toBe("error"); + expect(record?.outcome?.error).toContain("Hub not initialized"); + }); + + it("shutdownSubagentRegistry captures findings for ended-but-uncaptured runs", () => { + registerSubagentRun({ + runId: "run-1", + childSessionId: "child-1", + requesterSessionId: "parent-1", + task: "Task", + }); + + const record = getSubagentRun("run-1"); + if (record) { + // Simulate: run ended but findings not yet captured + record.endedAt = Date.now(); + record.outcome = { status: "ok" }; + record.findingsCaptured = undefined; + } + + shutdownSubagentRegistry(); + + expect(record?.findingsCaptured).toBe(true); + }); +}); + +describe("subagent registry — post-announce cleanup", () => { + it("removes runs from registry after successful announcement", async () => { + // Mock runCoalescedAnnounceFlow to succeed + const announceModule = await import("./announce.js"); + const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true); + + // Register two runs for the same parent — both end immediately (no Hub) + registerSubagentRun({ + runId: "run-a", + childSessionId: "child-a", + requesterSessionId: "parent-1", + task: "Task A", + }); + registerSubagentRun({ + runId: "run-b", + childSessionId: "child-b", + requesterSessionId: "parent-1", + task: "Task B", + }); + + // Both runs should have been announced and removed from registry + expect(spy).toHaveBeenCalled(); + expect(getSubagentRun("run-a")).toBeUndefined(); + expect(getSubagentRun("run-b")).toBeUndefined(); + 
expect(listSubagentRuns("parent-1")).toHaveLength(0); + + spy.mockRestore(); + }); +}); diff --git a/src/agent/subagent/registry.ts b/src/agent/subagent/registry.ts index d6f76b94..0f03002a 100644 --- a/src/agent/subagent/registry.ts +++ b/src/agent/subagent/registry.ts @@ -7,7 +7,7 @@ import { getHub, isHubInitialized } from "../../hub/hub-singleton.js"; import { loadSubagentRuns, saveSubagentRuns } from "./registry-store.js"; -import { runSubagentAnnounceFlow } from "./announce.js"; +import { readLatestAssistantReply, runCoalescedAnnounceFlow } from "./announce.js"; import type { RegisterSubagentRunParams, SubagentRunRecord, @@ -27,7 +27,7 @@ const SWEEP_INTERVAL_MS = 60 * 1000; const subagentRuns = new Map(); let sweepTimer: ReturnType | undefined; -const resumedRuns = new Set(); +const resumedRequesters = new Set(); // ============================================================================ // Public API @@ -39,25 +39,45 @@ export function initSubagentRegistry(): void { for (const [runId, record] of persisted) { subagentRuns.set(runId, record); - // Resume incomplete runs - if (!record.cleanupHandled) { - if (record.endedAt) { - // Completed but cleanup not done — run announce flow - if (!resumedRuns.has(runId)) { - resumedRuns.add(runId); - handleRunCompletion(record); - } - } else { - // If not ended, the child agent session is lost on restart — - // mark as ended with unknown outcome - record.endedAt = Date.now(); - record.outcome = { status: "unknown" }; - persist(); - if (!resumedRuns.has(runId)) { - resumedRuns.add(runId); - handleRunCompletion(record); - } - } + // Backward compat: old records with cleanupHandled but no announced field + if (record.cleanupHandled && record.announced === undefined) { + record.announced = true; + record.findingsCaptured = true; + } + } + + // Process incomplete runs + const affectedRequesters = new Set(); + + for (const record of subagentRuns.values()) { + if (record.announced && record.cleanupHandled) continue; // 
Already fully done + + if (!record.endedAt) { + // Child was running when process crashed — mark as ended/unknown + record.endedAt = Date.now(); + record.outcome = { status: "unknown" }; + } + + if (!record.findingsCaptured) { + captureFindings(record); + } + + // Recovery cleanup must be independent from findings capture: + // the process may crash after captureFindings() persisted but before deletion. + if (record.cleanup === "delete" && !record.cleanupHandled) { + deleteChildSession(record.childSessionId); + } + + affectedRequesters.add(record.requesterSessionId); + } + + persist(); + + // For each affected requester, check if coalesced announcement is needed + for (const requesterId of affectedRequesters) { + if (!resumedRequesters.has(requesterId)) { + resumedRequesters.add(requesterId); + checkAndAnnounce(requesterId); } } @@ -138,11 +158,17 @@ export function shutdownSubagentRegistry(): void { record.outcome = { status: "unknown" }; updated++; } + + // Opportunistically capture findings for ended-but-uncaptured runs + if (record.endedAt && !record.findingsCaptured) { + captureFindings(record); + updated++; + } } if (updated > 0) { persist(); - console.log(`[SubagentRegistry] Marked ${updated} active run(s) as ended during shutdown`); + console.log(`[SubagentRegistry] Processed ${updated} run(s) during shutdown`); } stopSweeper(); @@ -151,7 +177,7 @@ export function shutdownSubagentRegistry(): void { /** Reset all state (for testing). 
*/ export function resetSubagentRegistryForTests(): void { subagentRuns.clear(); - resumedRuns.clear(); + resumedRequesters.clear(); stopSweeper(); } @@ -222,44 +248,76 @@ function watchChildAgent(record: SubagentRunRecord, timeoutSeconds?: number): vo } // ============================================================================ -// Cleanup + Announce +// Cleanup + Announce (two-phase: capture findings, then coalesced announce) // ============================================================================ -function handleRunCompletion(record: SubagentRunRecord): void { - if (record.cleanupHandled) return; - record.cleanupHandled = true; +/** Phase 1: Capture child's findings before session deletion. */ +function captureFindings(record: SubagentRunRecord): void { + try { + const findings = readLatestAssistantReply(record.childSessionId); + record.findings = findings ?? undefined; + } catch { + record.findings = "(failed to read findings)"; + } + record.findingsCaptured = true; persist(); +} - // Run announce flow - const announced = runSubagentAnnounceFlow({ - runId: record.runId, - childSessionId: record.childSessionId, - requesterSessionId: record.requesterSessionId, - task: record.task, - label: record.label, - cleanup: record.cleanup, - outcome: record.outcome, - startedAt: record.startedAt, - endedAt: record.endedAt, - }); +/** + * Phase 2: Check if all unannounced runs for this requester have completed. + * If so, send a single coalesced announcement to the parent. + */ +function checkAndAnnounce(requesterSessionId: string): void { + const allRuns = listSubagentRuns(requesterSessionId); - if (!announced) { - console.warn(`[SubagentRegistry] Announce flow failed for run ${record.runId}`); - // Allow retry on next restart if announce failed. - record.cleanupHandled = false; + // Only consider unannounced runs + const pending = allRuns.filter(r => !r.announced); + if (pending.length === 0) return; + + // Are all unannounced runs done? 
+ const allDone = pending.every(r => r.endedAt !== undefined); + if (!allDone) return; + + // Have all had findings captured? + const allCaptured = pending.every(r => r.findingsCaptured); + if (!allCaptured) return; + + // All done — send coalesced announcement + const announced = runCoalescedAnnounceFlow(requesterSessionId, pending); + + if (announced) { + for (const r of pending) { + r.announced = true; + r.cleanupHandled = true; + // Remove from registry immediately — findings already delivered to parent + subagentRuns.delete(r.runId); + } persist(); - return; + if (subagentRuns.size === 0) { + stopSweeper(); + } + } else { + console.warn( + `[SubagentRegistry] Coalesced announce failed for requester ${requesterSessionId}`, + ); + // Leave announced=false so initSubagentRegistry() can retry on restart + } +} + +/** Entry point: called when a child completes. */ +function handleRunCompletion(record: SubagentRunRecord): void { + // Phase 1: capture findings (before session deletion) + if (!record.findingsCaptured) { + captureFindings(record); + + // Session cleanup (safe now that findings are persisted) + if (record.cleanup === "delete") { + deleteChildSession(record.childSessionId); + } } - // Handle session cleanup - if (record.cleanup === "delete") { - deleteChildSession(record.childSessionId); - } - - // Schedule archive - record.archiveAtMs = Date.now() + DEFAULT_ARCHIVE_AFTER_MS; - record.cleanupCompletedAt = Date.now(); - persist(); + // Phase 2: coalesced announce check + checkAndAnnounce(record.requesterSessionId); } function deleteChildSession(sessionId: string): void { @@ -305,7 +363,6 @@ function sweep(): void { for (const [runId, record] of subagentRuns) { if (record.archiveAtMs !== undefined && record.archiveAtMs <= now) { subagentRuns.delete(runId); - resumedRuns.delete(runId); removed++; } } diff --git a/src/agent/subagent/types.ts b/src/agent/subagent/types.ts index d4043572..8edfc0dc 100644 --- a/src/agent/subagent/types.ts +++ 
b/src/agent/subagent/types.ts @@ -39,6 +39,12 @@ export type SubagentRunRecord = { cleanupHandled?: boolean | undefined; /** Timestamp when cleanup completed */ cleanupCompletedAt?: number | undefined; + /** Captured findings from the child session's last assistant reply */ + findings?: string | undefined; + /** Whether findings have been captured (safe to delete session after this) */ + findingsCaptured?: boolean | undefined; + /** Whether the coalesced announcement has been sent to parent */ + announced?: boolean | undefined; }; /** Parameters for registering a new subagent run */ diff --git a/src/agent/tools.ts b/src/agent/tools.ts index 4385a534..fdb7ad1f 100644 --- a/src/agent/tools.ts +++ b/src/agent/tools.ts @@ -7,6 +7,7 @@ import { createProcessTool } from "./tools/process.js"; import { createGlobTool } from "./tools/glob.js"; import { createWebFetchTool, createWebSearchTool } from "./tools/web/index.js"; import { createSessionsSpawnTool } from "./tools/sessions-spawn.js"; +import { createSessionsListTool } from "./tools/sessions-list.js"; import { createMemorySearchTool } from "./tools/memory-search.js"; import { createCronTool } from "./tools/cron/index.js"; import { filterTools } from "./tools/policy.js"; @@ -133,6 +134,10 @@ export function createAllTools(options: CreateToolsOptions | string): AgentTool< }); tools.push(sessionsSpawnTool as AgentTool); + // Add sessions_list tool + const sessionsListTool = createSessionsListTool({ sessionId }); + tools.push(sessionsListTool as AgentTool); + return tools; } diff --git a/src/agent/tools/exec.ts b/src/agent/tools/exec.ts index 826795f6..41b51550 100644 --- a/src/agent/tools/exec.ts +++ b/src/agent/tools/exec.ts @@ -164,9 +164,22 @@ export function createExecTool( // Don't reject, let close event handle }); + // Signal handling: don't kill if already backgrounded + const onAbort = signal ? 
() => { + if (yielded) return; // Already backgrounded, ignore abort + if (timeout) clearTimeout(timeout); + if (yieldTimer) clearTimeout(yieldTimer); + child.kill("SIGTERM"); + } : undefined; + + if (signal && onAbort) { + signal.addEventListener("abort", onAbort, { once: true }); + } + child.on("close", (code) => { if (timeout) clearTimeout(timeout); if (yieldTimer) clearTimeout(yieldTimer); + if (signal && onAbort) signal.removeEventListener("abort", onAbort); // If already backgrounded, don't resolve again if (yielded) return; @@ -202,16 +215,6 @@ export function createExecTool( }, }); }); - - // Signal handling: don't kill if already backgrounded - if (signal) { - signal.addEventListener("abort", () => { - if (yielded) return; // Already backgrounded, ignore abort - if (timeout) clearTimeout(timeout); - if (yieldTimer) clearTimeout(yieldTimer); - child.kill("SIGTERM"); - }); - } }); }, }; diff --git a/src/agent/tools/groups.ts b/src/agent/tools/groups.ts index b2430cb4..f61c9037 100644 --- a/src/agent/tools/groups.ts +++ b/src/agent/tools/groups.ts @@ -34,7 +34,7 @@ export const TOOL_GROUPS: Record = { "group:memory": ["memory_search"], // Subagent tools - "group:subagent": ["sessions_spawn"], + "group:subagent": ["sessions_spawn", "sessions_list"], // Cron/scheduling tools "group:cron": ["cron"], diff --git a/src/agent/tools/index.ts b/src/agent/tools/index.ts index 5e4902ea..e225c54c 100644 --- a/src/agent/tools/index.ts +++ b/src/agent/tools/index.ts @@ -8,6 +8,7 @@ export { createProcessTool } from "./process.js"; export { createGlobTool } from "./glob.js"; export { createWebFetchTool, createWebSearchTool } from "./web/index.js"; export { createCronTool } from "./cron/index.js"; +export { createSessionsListTool } from "./sessions-list.js"; // Tool groups export { diff --git a/src/agent/tools/process.ts b/src/agent/tools/process.ts index 962b6a13..ad0fecb0 100644 --- a/src/agent/tools/process.ts +++ b/src/agent/tools/process.ts @@ -112,7 +112,7 @@ export 
function createProcessTool(defaultCwd?: string): AgentTool { child.kill("SIGTERM"); - }); + }, { once: true }); } resolve({ success: true }); diff --git a/src/agent/tools/sessions-list.test.ts b/src/agent/tools/sessions-list.test.ts new file mode 100644 index 00000000..a0bf6c1a --- /dev/null +++ b/src/agent/tools/sessions-list.test.ts @@ -0,0 +1,169 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import type { SubagentRunRecord } from "../subagent/types.js"; + +// Mock the registry module before importing the tool +vi.mock("../subagent/registry.js", () => ({ + listSubagentRuns: vi.fn(), + getSubagentRun: vi.fn(), +})); + +import { createSessionsListTool } from "./sessions-list.js"; +import { listSubagentRuns, getSubagentRun } from "../subagent/registry.js"; + +const mockListSubagentRuns = vi.mocked(listSubagentRuns); +const mockGetSubagentRun = vi.mocked(getSubagentRun); + +function makeRecord(overrides: Partial = {}): SubagentRunRecord { + return { + runId: "run-001", + childSessionId: "child-001", + requesterSessionId: "parent-001", + task: "Test task", + cleanup: "delete", + createdAt: 1700000000000, + ...overrides, + }; +} + +describe("sessions_list tool", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("returns empty message when no runs exist", async () => { + mockListSubagentRuns.mockReturnValue([]); + const tool = createSessionsListTool({ sessionId: "parent-001" }); + const result = await tool.execute("call-1", {}); + + expect(result.content[0]).toEqual({ + type: "text", + text: "No subagent runs for this session.", + }); + expect(result.details).toEqual({ runs: [] }); + }); + + it("lists multiple runs with correct status mapping", async () => { + const now = Date.now(); + const runs: SubagentRunRecord[] = [ + makeRecord({ + runId: "run-aaa", + label: "Code Review", + startedAt: now - 45000, + }), + makeRecord({ + runId: "run-bbb", + label: "Test Analysis", + startedAt: now - 60000, + endedAt: now - 30000, + outcome: { 
status: "ok" }, + }), + makeRecord({ + runId: "run-ccc", + label: "Lint Check", + startedAt: now - 60000, + endedAt: now, + outcome: { status: "error", error: "timeout" }, + }), + ]; + mockListSubagentRuns.mockReturnValue(runs); + + const tool = createSessionsListTool({ sessionId: "parent-001" }); + const result = await tool.execute("call-1", {}); + + const text = result.content[0]!; + expect(text.type).toBe("text"); + expect((text as { text: string }).text).toContain("3 total"); + expect((text as { text: string }).text).toContain("[running]"); + expect((text as { text: string }).text).toContain("[ok]"); + expect((text as { text: string }).text).toContain("[error]"); + expect((text as { text: string }).text).toContain("Code Review"); + expect((text as { text: string }).text).toContain("Test Analysis"); + expect((text as { text: string }).text).toContain("Lint Check"); + + expect(result.details!.runs).toHaveLength(3); + expect(result.details!.runs[0]!.status).toBe("running"); + expect(result.details!.runs[1]!.status).toBe("ok"); + expect(result.details!.runs[2]!.status).toBe("error"); + }); + + it("returns detail for a specific runId", async () => { + const now = Date.now(); + const record = makeRecord({ + runId: "run-detail", + label: "Deep Analysis", + task: "Analyze the authentication module thoroughly", + startedAt: now - 90000, + endedAt: now - 10000, + outcome: { status: "ok" }, + findings: "Found 2 potential issues in token validation.", + findingsCaptured: true, + }); + mockGetSubagentRun.mockReturnValue(record); + + const tool = createSessionsListTool({ sessionId: "parent-001" }); + const result = await tool.execute("call-1", { runId: "run-detail" }); + + const text = (result.content[0] as { text: string }).text; + expect(text).toContain("Run: run-detail"); + expect(text).toContain("Label: Deep Analysis"); + expect(text).toContain("Status: ok"); + expect(text).toContain("Found 2 potential issues"); + expect(text).toContain("Duration:"); + + 
expect(result.details!.runs).toHaveLength(1); + expect(result.details!.runs[0]!.runId).toBe("run-detail"); + }); + + it("returns not found for unknown runId", async () => { + mockGetSubagentRun.mockReturnValue(undefined); + + const tool = createSessionsListTool({ sessionId: "parent-001" }); + const result = await tool.execute("call-1", { runId: "nonexistent" }); + + const text = (result.content[0] as { text: string }).text; + expect(text).toContain("Run not found"); + expect(result.details).toEqual({ runs: [] }); + }); + + it("rejects runId belonging to a different requester", async () => { + const record = makeRecord({ + runId: "run-other", + requesterSessionId: "other-parent", + }); + mockGetSubagentRun.mockReturnValue(record); + + const tool = createSessionsListTool({ sessionId: "parent-001" }); + const result = await tool.execute("call-1", { runId: "run-other" }); + + const text = (result.content[0] as { text: string }).text; + expect(text).toContain("Run not found"); + expect(result.details).toEqual({ runs: [] }); + }); + + it("handles missing sessionId gracefully", async () => { + const tool = createSessionsListTool({}); + const result = await tool.execute("call-1", {}); + + const text = (result.content[0] as { text: string }).text; + expect(text).toContain("No session ID available"); + expect(result.details).toEqual({ runs: [] }); + }); + + it("shows findings status for running task", async () => { + const now = Date.now(); + const record = makeRecord({ + runId: "run-running", + label: "Still Running", + startedAt: now - 30000, + // no endedAt + }); + mockGetSubagentRun.mockReturnValue(record); + + const tool = createSessionsListTool({ sessionId: "parent-001" }); + const result = await tool.execute("call-1", { runId: "run-running" }); + + const text = (result.content[0] as { text: string }).text; + expect(text).toContain("Status: running"); + expect(text).toContain("Findings: (still running)"); + }); +}); diff --git a/src/agent/tools/sessions-list.ts 
/**
 * sessions_list tool — allows an agent to view its spawned subagent runs.
 *
 * Lists all subagent runs for the current session, or shows details for a
 * specific run when a runId is provided.
 */

import { Type } from "@sinclair/typebox";
import type { AgentTool } from "@mariozechner/pi-agent-core";
import { listSubagentRuns, getSubagentRun } from "../subagent/registry.js";
import type { SubagentRunRecord } from "../subagent/types.js";

/** Parameter schema for the tool: a single optional run ID. */
const SessionsListSchema = Type.Object({
  runId: Type.Optional(
    Type.String({ description: "Optional run ID to get details for a specific run. If omitted, lists all runs." }),
  ),
});

type SessionsListArgs = {
  runId?: string;
};

/** Structured payload returned in the tool result's `details` field. */
export type SessionsListResult = {
  runs: Array<{
    runId: string;
    label?: string;
    task: string;
    status: "running" | "ok" | "error" | "timeout" | "unknown";
    startedAt?: number;
    endedAt?: number;
    findings?: string;
  }>;
};

export interface CreateSessionsListToolOptions {
  /** Session ID of the current (requester) agent */
  sessionId?: string;
}

/** One entry of {@link SessionsListResult}; derived so the shapes cannot drift apart. */
type ResultRun = SessionsListResult["runs"][number];

/** Status values reported for a run. */
type RunStatus = ResultRun["status"];

/**
 * Derive the reported status of a run.
 *
 * A record without `endedAt` is reported as "running"; otherwise the recorded
 * outcome status is used, falling back to "unknown" when no outcome is present.
 */
function resolveStatus(record: SubagentRunRecord): RunStatus {
  if (!record.endedAt) return "running";
  return record.outcome?.status ?? "unknown";
}

/**
 * Render a millisecond duration as a compact string such as "42s", "3m5s",
 * "12m", "2h" or "2h15m". Seconds are dropped once the duration reaches an hour.
 *
 * @param ms - Duration in milliseconds. Negative values (e.g. from clock skew
 *   between timestamps) are treated as zero instead of printing "-5s".
 */
function formatElapsed(ms: number): string {
  const totalSeconds = Math.max(0, Math.round(ms / 1000));
  if (totalSeconds < 60) return `${totalSeconds}s`;

  const minutes = Math.floor(totalSeconds / 60);
  const seconds = totalSeconds % 60;
  if (minutes < 60) return seconds > 0 ? `${minutes}m${seconds}s` : `${minutes}m`;

  const hours = Math.floor(minutes / 60);
  const remainingMinutes = minutes % 60;
  return remainingMinutes > 0 ? `${hours}h${remainingMinutes}m` : `${hours}h`;
}

/**
 * Build the one-line summary used in list mode.
 *
 * @param record - Run to summarise.
 * @param index - Zero-based position in the list; displayed one-based.
 * @param now - Current time in epoch milliseconds, used for "started … ago".
 */
function formatRunSummary(record: SubagentRunRecord, index: number, now: number): string {
  const status = resolveStatus(record);
  // Fall back to the first 60 characters of the task when the run has no label.
  const displayName = record.label || record.task.slice(0, 60);
  const statusTag = `[${status}]`.padEnd(10);

  let timing = "";
  if (status === "running" && record.startedAt) {
    timing = `started ${formatElapsed(now - record.startedAt)} ago`;
  } else if (record.startedAt && record.endedAt) {
    timing = `completed in ${formatElapsed(record.endedAt - record.startedAt)}`;
  }

  const shortId = record.runId.slice(0, 8);
  const suffix = timing ? `(${shortId}…, ${timing})` : `(${shortId}…)`;
  return `#${index + 1} ${statusTag} "${displayName}" ${suffix}`;
}

/**
 * Build the multi-line description used in detail mode.
 *
 * @param record - Run to describe.
 * @param now - Current time in epoch milliseconds, used for the "… ago" hints.
 */
function formatRunDetail(record: SubagentRunRecord, now: number): string {
  const status = resolveStatus(record);
  const errorSuffix = record.outcome?.error ? ` — ${record.outcome.error}` : "";
  const lines: string[] = [`Run: ${record.runId}`];

  if (record.label) lines.push(`Label: ${record.label}`);
  lines.push(`Task: ${record.task}`);
  lines.push(`Status: ${status}${errorSuffix}`);
  lines.push(`Child Session: ${record.childSessionId}`);
  lines.push(`Created: ${new Date(record.createdAt).toISOString()} (${formatElapsed(now - record.createdAt)} ago)`);

  if (record.startedAt) {
    lines.push(`Started: ${new Date(record.startedAt).toISOString()} (${formatElapsed(now - record.startedAt)} ago)`);
  }
  if (record.endedAt) {
    lines.push(`Ended: ${new Date(record.endedAt).toISOString()}`);
    if (record.startedAt) {
      lines.push(`Duration: ${formatElapsed(record.endedAt - record.startedAt)}`);
    }
  }

  // Three distinct findings states: captured (possibly empty), ended but not
  // captured yet, and still running.
  if (record.findingsCaptured) {
    lines.push(`Findings: ${record.findings || "(no output)"}`);
  } else if (record.endedAt) {
    lines.push("Findings: (not yet captured)");
  } else {
    lines.push("Findings: (still running)");
  }

  if (record.announced) lines.push("Announced: yes");

  return lines.join("\n");
}

/** Project a registry record onto the structured shape returned in `details`. */
function toResultRun(record: SubagentRunRecord): ResultRun {
  return {
    runId: record.runId,
    label: record.label,
    task: record.task,
    status: resolveStatus(record),
    startedAt: record.startedAt,
    endedAt: record.endedAt,
    findings: record.findings,
  };
}

/** Assemble a tool result holding one text block plus the structured run list. */
function textResult(text: string, runs: SessionsListResult["runs"]) {
  return {
    content: [{ type: "text" as const, text }],
    details: { runs },
  };
}

/**
 * Create the `sessions_list` tool for an agent session.
 *
 * The tool has two modes:
 * - with `runId`: returns a detailed description of that run, provided the
 *   run's `requesterSessionId` matches this session;
 * - without `runId`: returns a one-line summary of every run that
 *   `listSubagentRuns` reports for this session.
 *
 * @param options - Carries the requester session ID. When it is missing the
 *   tool answers with an explanatory message and an empty run list.
 * @returns The tool definition; its `execute` resolves to a text block plus a
 *   {@link SessionsListResult} in `details`.
 */
export function createSessionsListTool(
  options: CreateSessionsListToolOptions,
): AgentTool {
  return {
    name: "sessions_list",
    label: "List Subagent Runs",
    description:
      "List all subagent runs spawned by this session and their current status. " +
      "Optionally pass a runId to get detailed information about a specific run.",
    parameters: SessionsListSchema,
    execute: async (_toolCallId, args) => {
      const { runId } = args as SessionsListArgs;
      const requesterSessionId = options.sessionId;

      if (!requesterSessionId) {
        return textResult("No session ID available. Cannot list subagent runs.", []);
      }

      const now = Date.now();

      // Detail mode: specific run
      if (runId) {
        const record = getSubagentRun(runId);
        // A run requested by a different session gets the same answer as an
        // unknown run ID.
        if (!record || record.requesterSessionId !== requesterSessionId) {
          return textResult(`Run not found: ${runId}`, []);
        }
        return textResult(formatRunDetail(record, now), [toResultRun(record)]);
      }

      // List mode: all runs for this session
      const runs = listSubagentRuns(requesterSessionId);
      if (runs.length === 0) {
        return textResult("No subagent runs for this session.", []);
      }

      const lines = [
        `Subagent runs for this session: ${runs.length} total`,
        "",
        ...runs.map((run, index) => formatRunSummary(run, index, now)),
      ];
      return textResult(lines.join("\n"), runs.map(toResultRun));
    },
  };
}