feat(subagent): integrate command queue and configurable timeout

Route the subagent registry and sessions_spawn through the lane-based
command queue so that sub-agents respect the maximum concurrency. Add
resolveSubagentTimeoutMs(), which applies a 10-minute default, treats 0 as "no timeout", and clamps the result to a safe range.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jiang Bohan 2026-02-10 19:27:26 +08:00
parent 240fdd1286
commit 683dfa759b
4 changed files with 99 additions and 56 deletions

View file

@ -7,16 +7,21 @@ import {
resetSubagentRegistryForTests,
shutdownSubagentRegistry,
} from "./registry.js";
import { resetLanesForTests } from "./command-queue.js";
// Note: These tests exercise the registry's in-memory state management.
// They do NOT test the full lifecycle (which requires a live Hub + AsyncAgent).
/**
 * Yield for one zero-delay timer tick so that tasks already handed to the
 * command queue get a chance to run before the test continues.
 */
const flushQueue = (): Promise<void> =>
  new Promise<void>((resolve) => {
    setTimeout(resolve, 0);
  });
// Reset shared module state before every test, using the dedicated
// test-reset helpers for the subagent registry and the command-queue lanes.
beforeEach(function resetSharedState(): void {
  resetSubagentRegistryForTests();
  resetLanesForTests();
});
describe("subagent registry", () => {
it("registers a run and retrieves it by ID", () => {
it("registers a run and retrieves it by ID", async () => {
const record = registerSubagentRun({
runId: "run-1",
childSessionId: "child-1",
@ -32,7 +37,9 @@ describe("subagent registry", () => {
expect(record.label).toBe("Code Analysis");
expect(record.cleanup).toBe("delete"); // default
expect(record.createdAt).toBeGreaterThan(0);
expect(record.startedAt).toBeGreaterThan(0); // set by watchChildAgent
await flushQueue();
expect(record.startedAt).toBeGreaterThan(0); // set by watchChildAgent (async via queue)
const retrieved = getSubagentRun("run-1");
expect(retrieved).toBe(record);
@ -101,7 +108,7 @@ describe("subagent registry", () => {
expect(record.cleanup).toBe("keep");
});
it("registers a run and ends it with error when Hub is not available", () => {
it("registers a run and ends it with error when Hub is not available", async () => {
// Without Hub initialized, watchChildAgent detects missing Hub
// and immediately ends the run with an error
registerSubagentRun({
@ -111,6 +118,8 @@ describe("subagent registry", () => {
task: "Running task",
});
await flushQueue();
const record = getSubagentRun("run-no-hub");
expect(record?.startedAt).toBeGreaterThan(0);
expect(record?.endedAt).toBeGreaterThan(0);
@ -118,7 +127,7 @@ describe("subagent registry", () => {
expect(record?.outcome?.error).toContain("Hub not initialized");
});
it("shutdownSubagentRegistry marks unfinished runs as ended", () => {
it("shutdownSubagentRegistry marks unfinished runs as ended", async () => {
// Directly set up a record without going through watchChildAgent
// to simulate a run that is still active
registerSubagentRun({
@ -128,6 +137,8 @@ describe("subagent registry", () => {
task: "Running task",
});
await flushQueue();
// The above run already ended due to no Hub; reset its endedAt
// to simulate a truly active run
const record = getSubagentRun("run-active");
@ -164,7 +175,7 @@ describe("subagent registry — coalescing", () => {
// Without Hub, watchChildAgent ends runs immediately with "Hub not initialized".
// This allows us to test the coalescing state transitions.
it("captures findings when a run completes (no Hub)", () => {
it("captures findings when a run completes (no Hub)", async () => {
registerSubagentRun({
runId: "run-1",
childSessionId: "child-1",
@ -172,13 +183,15 @@ describe("subagent registry — coalescing", () => {
task: "Task 1",
});
await flushQueue();
const record = getSubagentRun("run-1");
// Run ended immediately due to no Hub
expect(record?.endedAt).toBeGreaterThan(0);
expect(record?.findingsCaptured).toBe(true);
});
it("does not announce while sibling runs are still pending", () => {
it("does not announce while sibling runs are still pending", async () => {
// Register first run — ends immediately (no Hub)
registerSubagentRun({
runId: "run-1",
@ -187,6 +200,8 @@ describe("subagent registry — coalescing", () => {
task: "Task 1",
});
await flushQueue();
const record1 = getSubagentRun("run-1");
expect(record1?.findingsCaptured).toBe(true);
@ -198,6 +213,8 @@ describe("subagent registry — coalescing", () => {
task: "Task 2",
});
await flushQueue();
const record2 = getSubagentRun("run-2");
expect(record2?.findingsCaptured).toBe(true);
@ -208,7 +225,7 @@ describe("subagent registry — coalescing", () => {
expect(record2?.announced).toBeUndefined();
});
it("single run captures findings immediately", () => {
it("single run captures findings immediately", async () => {
registerSubagentRun({
runId: "run-solo",
childSessionId: "child-solo",
@ -216,6 +233,8 @@ describe("subagent registry — coalescing", () => {
task: "Solo task",
});
await flushQueue();
const record = getSubagentRun("run-solo");
expect(record?.endedAt).toBeGreaterThan(0);
expect(record?.findingsCaptured).toBe(true);
@ -223,7 +242,7 @@ describe("subagent registry — coalescing", () => {
expect(record?.outcome?.error).toContain("Hub not initialized");
});
it("shutdownSubagentRegistry captures findings for ended-but-uncaptured runs", () => {
it("shutdownSubagentRegistry captures findings for ended-but-uncaptured runs", async () => {
registerSubagentRun({
runId: "run-1",
childSessionId: "child-1",
@ -231,6 +250,8 @@ describe("subagent registry — coalescing", () => {
task: "Task",
});
await flushQueue();
const record = getSubagentRun("run-1");
if (record) {
// Simulate: run ended but findings not yet captured
@ -265,6 +286,8 @@ describe("subagent registry — post-announce cleanup", () => {
task: "Task B",
});
await flushQueue();
// Both runs should have been announced and removed from registry
expect(spy).toHaveBeenCalled();
expect(getSubagentRun("run-a")).toBeUndefined();

View file

@ -14,6 +14,8 @@ import type {
} from "./types.js";
import { resolveSessionDir } from "../session/storage.js";
import { rmSync } from "node:fs";
import { enqueueInLane, setLaneConcurrency } from "./command-queue.js";
import { SubagentLane, DEFAULT_SUBAGENT_MAX_CONCURRENT, resolveSubagentTimeoutMs } from "./lanes.js";
/** Default archive retention: 60 minutes after completion */
const DEFAULT_ARCHIVE_AFTER_MS = 60 * 60 * 1000;
@ -35,6 +37,8 @@ const resumedRequesters = new Set<string>();
/** Initialize registry from persisted state. Call once at startup. */
export function initSubagentRegistry(): void {
setLaneConcurrency(SubagentLane.Subagent, DEFAULT_SUBAGENT_MAX_CONCURRENT);
const persisted = loadSubagentRuns();
for (const [runId, record] of persisted) {
subagentRuns.set(runId, record);
@ -97,6 +101,7 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent
label,
cleanup = "delete",
timeoutSeconds,
start,
} = params;
const record: SubagentRunRecord = {
@ -113,8 +118,12 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent
persist();
startSweeper();
// Start watching the child agent for completion
watchChildAgent(record, timeoutSeconds);
// Enqueue in the subagent lane — the start callback and watchChildAgent
// only execute once a concurrency slot is available.
void enqueueInLane(SubagentLane.Subagent, async () => {
start?.();
return watchChildAgent(record, timeoutSeconds);
});
return record;
}
@ -185,26 +194,33 @@ export function resetSubagentRegistryForTests(): void {
// Lifecycle watching
// ============================================================================
function watchChildAgent(record: SubagentRunRecord, timeoutSeconds?: number): void {
/**
* Watch a child agent for completion.
* Returns a promise that resolves when the child finishes (or errors/times out),
* keeping the command-queue lane slot occupied until then.
*/
function watchChildAgent(record: SubagentRunRecord, timeoutSeconds?: number): Promise<void> {
const { childSessionId } = record;
// Mark as started
record.startedAt = Date.now();
persist();
const cleanup = (outcome: { status: "ok" | "error" | "timeout" | "unknown"; error?: string | undefined }) => {
if (record.endedAt) return; // Already finalized
if (timeoutTimer) clearTimeout(timeoutTimer);
record.endedAt = Date.now();
record.outcome = outcome;
persist();
handleRunCompletion(record);
};
const timeoutMs = resolveSubagentTimeoutMs(timeoutSeconds);
// Set up timeout if specified
let timeoutTimer: ReturnType<typeof setTimeout> | undefined;
if (timeoutSeconds && timeoutSeconds > 0) {
timeoutTimer = setTimeout(() => {
return new Promise<void>((resolveSlot) => {
const cleanup = (outcome: { status: "ok" | "error" | "timeout" | "unknown"; error?: string | undefined }) => {
if (record.endedAt) return; // Already finalized
if (timeoutTimer) clearTimeout(timeoutTimer);
record.endedAt = Date.now();
record.outcome = outcome;
persist();
handleRunCompletion(record);
resolveSlot(); // Release the queue slot
};
// Always arm a timer: resolveSubagentTimeoutMs supplies the 10-minute default, and 0 ("no timeout") maps to a ~24-day ceiling rather than disabling the timer.
const timeoutTimer = setTimeout(() => {
cleanup({ status: "timeout" });
// Try to close the child agent
@ -214,36 +230,36 @@ function watchChildAgent(record: SubagentRunRecord, timeoutSeconds?: number): vo
} catch {
// Hub may not be available
}
}, timeoutSeconds * 1000);
}
}, timeoutMs);
// Get child agent reference (Hub may not be available in tests)
if (!isHubInitialized()) {
cleanup({ status: "error", error: "Hub not initialized" });
return;
}
// Get child agent reference (Hub may not be available in tests)
if (!isHubInitialized()) {
cleanup({ status: "error", error: "Hub not initialized" });
return;
}
const hub = getHub();
const childAgent = hub.getAgent(childSessionId);
if (!childAgent) {
cleanup({ status: "error", error: "Child agent not found" });
return;
}
const hub = getHub();
const childAgent = hub.getAgent(childSessionId);
if (!childAgent) {
cleanup({ status: "error", error: "Child agent not found" });
return;
}
// Wait for the child agent's task queue to drain (task completion),
// then trigger announce flow. Uses waitForIdle() instead of consuming
// the stream (which would conflict with Hub.consumeAgent).
childAgent.waitForIdle().then(
() => cleanup({ status: "ok" }),
(err) => cleanup({
status: "error",
error: err instanceof Error ? err.message : String(err),
}),
);
// Wait for the child agent's task queue to drain (task completion),
// then trigger announce flow. Uses waitForIdle() instead of consuming
// the stream (which would conflict with Hub.consumeAgent).
childAgent.waitForIdle().then(
() => cleanup({ status: "ok" }),
(err) => cleanup({
status: "error",
error: err instanceof Error ? err.message : String(err),
}),
);
// Also handle explicit close (e.g., timeout kill, Hub shutdown)
childAgent.onClose(() => {
cleanup({ status: record.outcome?.status ?? "unknown" });
// Also handle explicit close (e.g., timeout kill, Hub shutdown)
childAgent.onClose(() => {
cleanup({ status: record.outcome?.status ?? "unknown" });
});
});
}

View file

@ -56,6 +56,8 @@ export type RegisterSubagentRunParams = {
label?: string | undefined;
cleanup?: "delete" | "keep" | undefined;
timeoutSeconds?: number | undefined;
/** Callback invoked when the queue slot is acquired (used to defer childAgent.write). */
start?: (() => void) | undefined;
};
/** Parameters for the announce flow */

View file

@ -28,8 +28,11 @@ const SessionsSpawnSchema = Type.Object({
),
timeoutSeconds: Type.Optional(
Type.Number({
description: "Execution timeout in seconds. The subagent will be terminated if it exceeds this.",
minimum: 1,
description:
"Execution timeout in seconds. Default: 600 (10 min). " +
"Set to 0 for no timeout (useful for complex, long-running tasks). " +
"The subagent will be terminated if it exceeds this limit.",
minimum: 0,
}),
),
});
@ -106,11 +109,9 @@ export function createSessionsSpawnTool(
model,
});
// Write the task to the child (non-blocking) before registering,
// so waitForIdle() observes the queued work.
childAgent.write(task);
// Register the run for lifecycle tracking
// Register the run for lifecycle tracking.
// The write is deferred via the start callback so the child only
// begins work once a concurrency slot is available in the queue.
registerSubagentRun({
runId,
childSessionId,
@ -119,6 +120,7 @@ export function createSessionsSpawnTool(
label,
cleanup,
timeoutSeconds,
start: () => childAgent.write(task),
});
return {