diff --git a/src/agent/subagent/announce.ts b/src/agent/subagent/announce.ts index fbfa9800..ea541d5b 100644 --- a/src/agent/subagent/announce.ts +++ b/src/agent/subagent/announce.ts @@ -13,6 +13,7 @@ import { buildSystemPrompt } from "../system-prompt/index.js"; import type { SubagentAnnounceParams, SubagentRunOutcome, + SubagentRunRecord, SubagentSystemPromptParams, } from "./types.js"; @@ -167,11 +168,114 @@ export function formatAnnouncementMessage(params: FormatAnnouncementParams): str return parts.join("\n"); } +/** + * Format a coalesced announcement message from multiple completed subagent runs. + * When only one record is provided, delegates to formatAnnouncementMessage. + */ +export function formatCoalescedAnnouncementMessage( + records: SubagentRunRecord[], +): string { + // Single record: delegate to existing format for backward-compatible behavior + if (records.length === 1) { + const r = records[0]!; + return formatAnnouncementMessage({ + runId: r.runId, + childSessionId: r.childSessionId, + requesterSessionId: r.requesterSessionId, + task: r.task, + label: r.label, + cleanup: r.cleanup, + outcome: r.outcome, + startedAt: r.startedAt, + endedAt: r.endedAt, + findings: r.findings, + }); + } + + // Multiple records: build combined message + const parts: string[] = [ + `All ${records.length} background tasks have completed. Here are the combined results:`, + "", + ]; + + for (let i = 0; i < records.length; i++) { + const r = records[i]!; + const displayName = r.label || r.task.slice(0, 60); + const statusLabel = formatStatusLabel(r.outcome); + const durationStr = (r.startedAt && r.endedAt) + ? ` (${formatDuration(r.startedAt, r.endedAt)})` + : ""; + + parts.push( + `### Task ${i + 1}: "${displayName}"`, + `Status: ${statusLabel}${durationStr}`, + "", + "Findings:", + r.findings || "(no output)", + "", + ); + } + + // Overall stats + const allStartTimes = records.map(r => r.startedAt).filter(Boolean) as number[]; + const allEndTimes = records.map(r => r.endedAt).filter(Boolean) as number[]; + if (allStartTimes.length > 0 && allEndTimes.length > 0) { + const wallTime = formatDuration(Math.min(...allStartTimes), Math.max(...allEndTimes)); + parts.push(`Total wall time: ${wallTime}`); + } + + const okCount = records.filter(r => r.outcome?.status === "ok").length; + const failCount = records.length - okCount; + parts.push(`Results: ${okCount} succeeded, ${failCount} failed/timed out`); + + parts.push( + "", + "Summarize these results naturally for the user.", + "Present the combined findings as a coherent summary, not a list of separate reports.", + "Keep it concise but cover the key findings from each task.", + "Do not mention technical details like session IDs or that these were background tasks.", + "You can respond with NO_REPLY if no announcement is needed.", + ); + + return parts.join("\n"); +} + +/** + * Run the coalesced announcement flow for all completed runs of a requester. + * Formats a single combined message and delivers it to the parent agent. + */ +export function runCoalescedAnnounceFlow( + requesterSessionId: string, + records: SubagentRunRecord[], +): boolean { + const message = formatCoalescedAnnouncementMessage(records); + + try { + const hub = getHub(); + const parentAgent = hub.getAgent(requesterSessionId); + if (!parentAgent || parentAgent.closed) { + console.warn( + `[SubagentAnnounce] Parent agent not found or closed: ${requesterSessionId}`, + ); + return false; + } + + parentAgent.write(message); + return true; + } catch (err) { + console.error(`[SubagentAnnounce] Failed to coalesced-announce to parent:`, err); + return false; + } +} + /** * Run the full subagent announcement flow: * 1. Read child's last assistant reply * 2. Format announcement message * 3. Send to parent agent via Hub + * + * @deprecated Use runCoalescedAnnounceFlow instead, which supports + * batching multiple completed runs into a single announcement. */ export function runSubagentAnnounceFlow(params: SubagentAnnounceParams): boolean { const { requesterSessionId, childSessionId } = params; diff --git a/src/agent/subagent/index.ts b/src/agent/subagent/index.ts index 2785d86e..7a934229 100644 --- a/src/agent/subagent/index.ts +++ b/src/agent/subagent/index.ts @@ -28,6 +28,8 @@ export { readLatestAssistantReply, formatAnnouncementMessage, runSubagentAnnounceFlow, + formatCoalescedAnnouncementMessage, + runCoalescedAnnounceFlow, } from "./announce.js"; export type { FormatAnnouncementParams } from "./announce.js"; diff --git a/src/agent/subagent/registry.ts b/src/agent/subagent/registry.ts index d6f76b94..980c2793 100644 --- a/src/agent/subagent/registry.ts +++ b/src/agent/subagent/registry.ts @@ -7,7 +7,7 @@ import { getHub, isHubInitialized } from "../../hub/hub-singleton.js"; import { loadSubagentRuns, saveSubagentRuns } from "./registry-store.js"; -import { runSubagentAnnounceFlow } from "./announce.js"; +import { readLatestAssistantReply, runCoalescedAnnounceFlow } from "./announce.js"; import type { RegisterSubagentRunParams, SubagentRunRecord, @@ -27,7 +27,7 @@ const SWEEP_INTERVAL_MS = 60 * 1000; const subagentRuns = new Map(); let sweepTimer: ReturnType | undefined; -const resumedRuns = new Set(); +const resumedRequesters = new Set(); // ============================================================================ // Public API @@ -39,26 +39,43 @@ export function initSubagentRegistry(): void { for (const [runId, record] of persisted) { subagentRuns.set(runId, record); - // Resume incomplete runs - if (!record.cleanupHandled) { - if (record.endedAt) { - // Completed but cleanup not done — run announce flow - if (!resumedRuns.has(runId)) { - resumedRuns.add(runId); - handleRunCompletion(record); - } - } else { - // If not ended, the child agent session is lost on restart — - // mark as ended with unknown outcome - record.endedAt = Date.now(); - record.outcome = { status: "unknown" }; - persist(); - if (!resumedRuns.has(runId)) { - resumedRuns.add(runId); - handleRunCompletion(record); - } + // Backward compat: old records with cleanupHandled but no announced field + if (record.cleanupHandled && record.announced === undefined) { + record.announced = true; + record.findingsCaptured = true; + } + } + + // Process incomplete runs + const affectedRequesters = new Set(); + + for (const record of subagentRuns.values()) { + if (record.announced && record.cleanupHandled) continue; // Already fully done + + if (!record.endedAt) { + // Child was running when process crashed — mark as ended/unknown + record.endedAt = Date.now(); + record.outcome = { status: "unknown" }; + } + + if (!record.findingsCaptured) { + captureFindings(record); + if (record.cleanup === "delete") { + deleteChildSession(record.childSessionId); } } + + affectedRequesters.add(record.requesterSessionId); + } + + persist(); + + // For each affected requester, check if coalesced announcement is needed + for (const requesterId of affectedRequesters) { + if (!resumedRequesters.has(requesterId)) { + resumedRequesters.add(requesterId); + checkAndAnnounce(requesterId); + } } if (subagentRuns.size > 0) { @@ -138,11 +155,17 @@ export function shutdownSubagentRegistry(): void { record.outcome = { status: "unknown" }; updated++; } + + // Opportunistically capture findings for ended-but-uncaptured runs + if (record.endedAt && !record.findingsCaptured) { + captureFindings(record); + updated++; + } } if (updated > 0) { persist(); - console.log(`[SubagentRegistry] Marked ${updated} active run(s) as ended during shutdown`); + console.log(`[SubagentRegistry] Processed ${updated} run(s) during shutdown`); } stopSweeper(); @@ -151,7 +174,7 @@ export function shutdownSubagentRegistry(): void { /** Reset all state (for testing). */ export function resetSubagentRegistryForTests(): void { subagentRuns.clear(); - resumedRuns.clear(); + resumedRequesters.clear(); stopSweeper(); } @@ -222,44 +245,73 @@ function watchChildAgent(record: SubagentRunRecord, timeoutSeconds?: number): vo } // ============================================================================ -// Cleanup + Announce +// Cleanup + Announce (two-phase: capture findings, then coalesced announce) // ============================================================================ -function handleRunCompletion(record: SubagentRunRecord): void { - if (record.cleanupHandled) return; - record.cleanupHandled = true; +/** Phase 1: Capture child's findings before session deletion. */ +function captureFindings(record: SubagentRunRecord): void { + try { + const findings = readLatestAssistantReply(record.childSessionId); + record.findings = findings ?? undefined; + } catch { + record.findings = "(failed to read findings)"; + } + record.findingsCaptured = true; persist(); +} - // Run announce flow - const announced = runSubagentAnnounceFlow({ - runId: record.runId, - childSessionId: record.childSessionId, - requesterSessionId: record.requesterSessionId, - task: record.task, - label: record.label, - cleanup: record.cleanup, - outcome: record.outcome, - startedAt: record.startedAt, - endedAt: record.endedAt, - }); +/** + * Phase 2: Check if all unannounced runs for this requester have completed. + * If so, send a single coalesced announcement to the parent. + */ +function checkAndAnnounce(requesterSessionId: string): void { + const allRuns = listSubagentRuns(requesterSessionId); - if (!announced) { - console.warn(`[SubagentRegistry] Announce flow failed for run ${record.runId}`); - // Allow retry on next restart if announce failed. - record.cleanupHandled = false; + // Only consider unannounced runs + const pending = allRuns.filter(r => !r.announced); + if (pending.length === 0) return; + + // Are all unannounced runs done? + const allDone = pending.every(r => r.endedAt !== undefined); + if (!allDone) return; + + // Have all had findings captured? + const allCaptured = pending.every(r => r.findingsCaptured); + if (!allCaptured) return; + + // All done — send coalesced announcement + const announced = runCoalescedAnnounceFlow(requesterSessionId, pending); + + if (announced) { + for (const r of pending) { + r.announced = true; + r.cleanupHandled = true; + r.archiveAtMs = Date.now() + DEFAULT_ARCHIVE_AFTER_MS; + r.cleanupCompletedAt = Date.now(); + } persist(); - return; + } else { + console.warn( + `[SubagentRegistry] Coalesced announce failed for requester ${requesterSessionId}`, + ); + // Leave announced=false so initSubagentRegistry() can retry on restart + } +} + +/** Entry point: called when a child completes. */ +function handleRunCompletion(record: SubagentRunRecord): void { + // Phase 1: capture findings (before session deletion) + if (!record.findingsCaptured) { + captureFindings(record); + + // Session cleanup (safe now that findings are persisted) + if (record.cleanup === "delete") { + deleteChildSession(record.childSessionId); + } } - // Handle session cleanup - if (record.cleanup === "delete") { - deleteChildSession(record.childSessionId); - } - - // Schedule archive - record.archiveAtMs = Date.now() + DEFAULT_ARCHIVE_AFTER_MS; - record.cleanupCompletedAt = Date.now(); - persist(); + // Phase 2: coalesced announce check + checkAndAnnounce(record.requesterSessionId); } function deleteChildSession(sessionId: string): void { @@ -305,7 +357,6 @@ function sweep(): void { for (const [runId, record] of subagentRuns) { if (record.archiveAtMs !== undefined && record.archiveAtMs <= now) { subagentRuns.delete(runId); - resumedRuns.delete(runId); removed++; } } diff --git a/src/agent/subagent/types.ts b/src/agent/subagent/types.ts index d4043572..8edfc0dc 100644 --- a/src/agent/subagent/types.ts +++ b/src/agent/subagent/types.ts @@ -39,6 +39,12 @@ export type SubagentRunRecord = { cleanupHandled?: boolean | undefined; /** Timestamp when cleanup completed */ cleanupCompletedAt?: number | undefined; + /** Captured findings from the child session's last assistant reply */ + findings?: string | undefined; + /** Whether findings have been captured (safe to delete session after this) */ + findingsCaptured?: boolean | undefined; + /** Whether the coalesced announcement has been sent to parent */ + announced?: boolean | undefined; }; /** Parameters for registering a new subagent run */