From 2e6d419c276d1d3d08693cf22b661c56446182ca Mon Sep 17 00:00:00 2001 From: Jiang Bohan Date: Wed, 11 Feb 2026 17:09:42 +0800 Subject: [PATCH] feat(subagent): deferred record cleanup and error propagation Replace immediate record deletion with archiveAtMs-based deferred cleanup (60min retention). This keeps records queryable via sessions_list after completion. Add sweeper to clean expired records. Check childAgent.lastRunError after waitForIdle to detect failed runs that resolve the promise without throwing. Co-Authored-By: Claude Opus 4.6 --- .../core/src/agent/subagent/registry.test.ts | 19 +++-- packages/core/src/agent/subagent/registry.ts | 72 +++++++++++-------- 2 files changed, 58 insertions(+), 33 deletions(-) diff --git a/packages/core/src/agent/subagent/registry.test.ts b/packages/core/src/agent/subagent/registry.test.ts index 5d5eacc3..14414e95 100644 --- a/packages/core/src/agent/subagent/registry.test.ts +++ b/packages/core/src/agent/subagent/registry.test.ts @@ -267,7 +267,7 @@ describe("subagent registry — coalescing", () => { }); describe("subagent registry — post-announce cleanup", () => { - it("removes runs from registry after successful announcement", async () => { + it("keeps runs in registry after successful announcement with archiveAtMs", async () => { // Mock runCoalescedAnnounceFlow to succeed const announceModule = await import("./announce.js"); const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true); @@ -288,11 +288,20 @@ describe("subagent registry — post-announce cleanup", () => { await flushQueue(); - // Both runs should have been announced and removed from registry + // Both runs should have been announced but kept in registry with archiveAtMs expect(spy).toHaveBeenCalled(); - expect(getSubagentRun("run-a")).toBeUndefined(); - expect(getSubagentRun("run-b")).toBeUndefined(); - expect(listSubagentRuns("parent-1")).toHaveLength(0); + + const runA = getSubagentRun("run-a"); + const runB = getSubagentRun("run-b"); + expect(runA).toBeDefined(); + expect(runB).toBeDefined(); + expect(runA!.announced).toBe(true); + expect(runB!.announced).toBe(true); + expect(runA!.archiveAtMs).toBeGreaterThan(Date.now()); + expect(runB!.archiveAtMs).toBeGreaterThan(Date.now()); + + // Records are still queryable + expect(listSubagentRuns("parent-1")).toHaveLength(2); spy.mockRestore(); }); diff --git a/packages/core/src/agent/subagent/registry.ts b/packages/core/src/agent/subagent/registry.ts index f40a72c0..665eb3fc 100644 --- a/packages/core/src/agent/subagent/registry.ts +++ b/packages/core/src/agent/subagent/registry.ts @@ -121,7 +121,9 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent // Enqueue in the subagent lane — the start callback and watchChildAgent // only execute once a concurrency slot is available. void enqueueInLane(SubagentLane.Subagent, async () => { + console.log(`[SubagentRegistry] Lane slot acquired for ${runId}, calling start()`); start?.(); + console.log(`[SubagentRegistry] start() returned, entering watchChildAgent`); return watchChildAgent(record, timeoutSeconds); }); @@ -248,12 +250,26 @@ function watchChildAgent(record: SubagentRunRecord, timeoutSeconds?: number): Pr // Wait for the child agent's task queue to drain (task completion), // then trigger announce flow. Uses waitForIdle() instead of consuming // the stream (which would conflict with Hub.consumeAgent). + console.log(`[SubagentRegistry] waitForIdle() called for child ${childSessionId}, pendingWrites=${childAgent.getPendingWrites()}`); childAgent.waitForIdle().then( - () => cleanup({ status: "ok" }), - (err) => cleanup({ - status: "error", - error: err instanceof Error ? err.message : String(err), - }), + () => { + const runtime = Date.now() - (record.startedAt ?? 0); + const runError = childAgent.lastRunError; + if (runError) { + console.log(`[SubagentRegistry] waitForIdle() resolved for child ${childSessionId} with error (runtime: ${runtime}ms): ${runError}`); + cleanup({ status: "error", error: runError }); + } else { + console.log(`[SubagentRegistry] waitForIdle() resolved OK for child ${childSessionId} (runtime: ${runtime}ms)`); + cleanup({ status: "ok" }); + } + }, + (err) => { + console.error(`[SubagentRegistry] waitForIdle() rejected for child ${childSessionId}:`, err); + cleanup({ + status: "error", + error: err instanceof Error ? err.message : String(err), + }); + }, ); // Also handle explicit close (e.g., timeout kill, Hub shutdown) @@ -280,43 +296,43 @@ function captureFindings(record: SubagentRunRecord): void { } /** - * Phase 2: Check if all unannounced runs for this requester have completed. - * If so, send a single coalesced announcement to the parent. + * Phase 2: Announce completed-but-unannounced runs immediately. + * + * Does NOT wait for all runs to finish — each completed run is announced + * as soon as its findings are captured. The three-tier delivery in + * announce.ts (steer → queue → direct) handles batching via the + * announce-queue debounce/collect mechanism when multiple runs complete + * close together. */ function checkAndAnnounce(requesterSessionId: string): void { const allRuns = listSubagentRuns(requesterSessionId); - // Only consider unannounced runs - const pending = allRuns.filter(r => !r.announced); - if (pending.length === 0) return; + // Only consider unannounced runs that are done with findings captured + const ready = allRuns.filter( + r => !r.announced && r.endedAt !== undefined && r.findingsCaptured, + ); + if (ready.length === 0) return; - // Are all unannounced runs done? - const allDone = pending.every(r => r.endedAt !== undefined); - if (!allDone) return; - - // Have all had findings captured? - const allCaptured = pending.every(r => r.findingsCaptured); - if (!allCaptured) return; - - // All done — send coalesced announcement - const announced = runCoalescedAnnounceFlow(requesterSessionId, pending); + // Announce all ready runs + const announced = runCoalescedAnnounceFlow(requesterSessionId, ready); if (announced) { - for (const r of pending) { + for (const r of ready) { r.announced = true; r.cleanupHandled = true; - // Remove from registry immediately — findings already delivered to parent - subagentRuns.delete(r.runId); + // Keep records for querying via sessions_list; let sweeper archive later + r.archiveAtMs = Date.now() + DEFAULT_ARCHIVE_AFTER_MS; } persist(); - if (subagentRuns.size === 0) { - stopSweeper(); - } } else { + // Allow retry — mark cleanupHandled false so initSubagentRegistry() retries + for (const r of ready) { + r.cleanupHandled = false; + } + persist(); console.warn( - `[SubagentRegistry] Coalesced announce failed for requester ${requesterSessionId}`, + `[SubagentRegistry] Announce failed for requester ${requesterSessionId}`, ); - // Leave announced=false so initSubagentRegistry() can retry on restart } }