feat(subagent): deferred record cleanup and error propagation

Replace immediate record deletion with archiveAtMs-based deferred
cleanup (60min retention). This keeps records queryable via
sessions_list after completion. Add sweeper to clean expired records.
Check childAgent.lastRunError after waitForIdle to detect failed runs
that resolve the promise without throwing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jiang Bohan 2026-02-11 17:09:42 +08:00
parent 299c947893
commit 2e6d419c27
2 changed files with 58 additions and 33 deletions

View file

@ -267,7 +267,7 @@ describe("subagent registry — coalescing", () => {
});
describe("subagent registry — post-announce cleanup", () => {
it("removes runs from registry after successful announcement", async () => {
it("keeps runs in registry after successful announcement with archiveAtMs", async () => {
// Mock runCoalescedAnnounceFlow to succeed
const announceModule = await import("./announce.js");
const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true);
@ -288,11 +288,20 @@ describe("subagent registry — post-announce cleanup", () => {
await flushQueue();
// Both runs should have been announced and removed from registry
// Both runs should have been announced but kept in registry with archiveAtMs
expect(spy).toHaveBeenCalled();
expect(getSubagentRun("run-a")).toBeUndefined();
expect(getSubagentRun("run-b")).toBeUndefined();
expect(listSubagentRuns("parent-1")).toHaveLength(0);
const runA = getSubagentRun("run-a");
const runB = getSubagentRun("run-b");
expect(runA).toBeDefined();
expect(runB).toBeDefined();
expect(runA!.announced).toBe(true);
expect(runB!.announced).toBe(true);
expect(runA!.archiveAtMs).toBeGreaterThan(Date.now());
expect(runB!.archiveAtMs).toBeGreaterThan(Date.now());
// Records are still queryable
expect(listSubagentRuns("parent-1")).toHaveLength(2);
spy.mockRestore();
});

View file

@ -121,7 +121,9 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent
// Enqueue in the subagent lane — the start callback and watchChildAgent
// only execute once a concurrency slot is available.
void enqueueInLane(SubagentLane.Subagent, async () => {
console.log(`[SubagentRegistry] Lane slot acquired for ${runId}, calling start()`);
start?.();
console.log(`[SubagentRegistry] start() returned, entering watchChildAgent`);
return watchChildAgent(record, timeoutSeconds);
});
@ -248,12 +250,26 @@ function watchChildAgent(record: SubagentRunRecord, timeoutSeconds?: number): Pr
// Wait for the child agent's task queue to drain (task completion),
// then trigger announce flow. Uses waitForIdle() instead of consuming
// the stream (which would conflict with Hub.consumeAgent).
console.log(`[SubagentRegistry] waitForIdle() called for child ${childSessionId}, pendingWrites=${childAgent.getPendingWrites()}`);
childAgent.waitForIdle().then(
() => cleanup({ status: "ok" }),
(err) => cleanup({
status: "error",
error: err instanceof Error ? err.message : String(err),
}),
() => {
const runtime = Date.now() - (record.startedAt ?? 0);
const runError = childAgent.lastRunError;
if (runError) {
console.log(`[SubagentRegistry] waitForIdle() resolved for child ${childSessionId} with error (runtime: ${runtime}ms): ${runError}`);
cleanup({ status: "error", error: runError });
} else {
console.log(`[SubagentRegistry] waitForIdle() resolved OK for child ${childSessionId} (runtime: ${runtime}ms)`);
cleanup({ status: "ok" });
}
},
(err) => {
console.error(`[SubagentRegistry] waitForIdle() rejected for child ${childSessionId}:`, err);
cleanup({
status: "error",
error: err instanceof Error ? err.message : String(err),
});
},
);
// Also handle explicit close (e.g., timeout kill, Hub shutdown)
@ -280,43 +296,43 @@ function captureFindings(record: SubagentRunRecord): void {
}
/**
* Phase 2: Check if all unannounced runs for this requester have completed.
* If so, send a single coalesced announcement to the parent.
* Phase 2: Announce completed-but-unannounced runs immediately.
*
* Does NOT wait for all runs to finish each completed run is announced
* as soon as its findings are captured. The three-tier delivery in
* announce.ts (steer queue direct) handles batching via the
* announce-queue debounce/collect mechanism when multiple runs complete
* close together.
*/
function checkAndAnnounce(requesterSessionId: string): void {
const allRuns = listSubagentRuns(requesterSessionId);
// Only consider unannounced runs
const pending = allRuns.filter(r => !r.announced);
if (pending.length === 0) return;
// Only consider unannounced runs that are done with findings captured
const ready = allRuns.filter(
r => !r.announced && r.endedAt !== undefined && r.findingsCaptured,
);
if (ready.length === 0) return;
// Are all unannounced runs done?
const allDone = pending.every(r => r.endedAt !== undefined);
if (!allDone) return;
// Have all had findings captured?
const allCaptured = pending.every(r => r.findingsCaptured);
if (!allCaptured) return;
// All done — send coalesced announcement
const announced = runCoalescedAnnounceFlow(requesterSessionId, pending);
// Announce all ready runs
const announced = runCoalescedAnnounceFlow(requesterSessionId, ready);
if (announced) {
for (const r of pending) {
for (const r of ready) {
r.announced = true;
r.cleanupHandled = true;
// Remove from registry immediately — findings already delivered to parent
subagentRuns.delete(r.runId);
// Keep records for querying via sessions_list; let sweeper archive later
r.archiveAtMs = Date.now() + DEFAULT_ARCHIVE_AFTER_MS;
}
persist();
if (subagentRuns.size === 0) {
stopSweeper();
}
} else {
// Allow retry — mark cleanupHandled false so initSubagentRegistry() retries
for (const r of ready) {
r.cleanupHandled = false;
}
persist();
console.warn(
`[SubagentRegistry] Coalesced announce failed for requester ${requesterSessionId}`,
`[SubagentRegistry] Announce failed for requester ${requesterSessionId}`,
);
// Leave announced=false so initSubagentRegistry() can retry on restart
}
}