diff --git a/packages/core/src/agent/subagent/registry.test.ts b/packages/core/src/agent/subagent/registry.test.ts index 2169564b..5d5eacc3 100644 --- a/packages/core/src/agent/subagent/registry.test.ts +++ b/packages/core/src/agent/subagent/registry.test.ts @@ -7,16 +7,21 @@ import { resetSubagentRegistryForTests, shutdownSubagentRegistry, } from "./registry.js"; +import { resetLanesForTests } from "./command-queue.js"; // Note: These tests exercise the registry's in-memory state management. // They do NOT test the full lifecycle (which requires a live Hub + AsyncAgent). +/** Wait for the command queue to process enqueued tasks. */ +const flushQueue = () => new Promise((r) => setTimeout(r, 0)); + beforeEach(() => { resetSubagentRegistryForTests(); + resetLanesForTests(); }); describe("subagent registry", () => { - it("registers a run and retrieves it by ID", () => { + it("registers a run and retrieves it by ID", async () => { const record = registerSubagentRun({ runId: "run-1", childSessionId: "child-1", @@ -32,7 +37,9 @@ describe("subagent registry", () => { expect(record.label).toBe("Code Analysis"); expect(record.cleanup).toBe("delete"); // default expect(record.createdAt).toBeGreaterThan(0); - expect(record.startedAt).toBeGreaterThan(0); // set by watchChildAgent + + await flushQueue(); + expect(record.startedAt).toBeGreaterThan(0); // set by watchChildAgent (async via queue) const retrieved = getSubagentRun("run-1"); expect(retrieved).toBe(record); @@ -101,7 +108,7 @@ describe("subagent registry", () => { expect(record.cleanup).toBe("keep"); }); - it("registers a run and ends it with error when Hub is not available", () => { + it("registers a run and ends it with error when Hub is not available", async () => { // Without Hub initialized, watchChildAgent detects missing Hub // and immediately ends the run with an error registerSubagentRun({ @@ -111,6 +118,8 @@ describe("subagent registry", () => { task: "Running task", }); + await flushQueue(); + const record = getSubagentRun("run-no-hub"); expect(record?.startedAt).toBeGreaterThan(0); expect(record?.endedAt).toBeGreaterThan(0); @@ -118,7 +127,7 @@ describe("subagent registry", () => { expect(record?.outcome?.error).toContain("Hub not initialized"); }); - it("shutdownSubagentRegistry marks unfinished runs as ended", () => { + it("shutdownSubagentRegistry marks unfinished runs as ended", async () => { // Directly set up a record without going through watchChildAgent // to simulate a run that is still active registerSubagentRun({ @@ -128,6 +137,8 @@ describe("subagent registry", () => { task: "Running task", }); + await flushQueue(); + // The above run already ended due to no Hub; reset its endedAt // to simulate a truly active run const record = getSubagentRun("run-active"); @@ -164,7 +175,7 @@ describe("subagent registry — coalescing", () => { // Without Hub, watchChildAgent ends runs immediately with "Hub not initialized". // This allows us to test the coalescing state transitions. - it("captures findings when a run completes (no Hub)", () => { + it("captures findings when a run completes (no Hub)", async () => { registerSubagentRun({ runId: "run-1", childSessionId: "child-1", @@ -172,13 +183,15 @@ describe("subagent registry — coalescing", () => { task: "Task 1", }); + await flushQueue(); + const record = getSubagentRun("run-1"); // Run ended immediately due to no Hub expect(record?.endedAt).toBeGreaterThan(0); expect(record?.findingsCaptured).toBe(true); }); - it("does not announce while sibling runs are still pending", () => { + it("does not announce while sibling runs are still pending", async () => { // Register first run — ends immediately (no Hub) registerSubagentRun({ runId: "run-1", @@ -187,6 +200,8 @@ describe("subagent registry — coalescing", () => { task: "Task 1", }); + await flushQueue(); + const record1 = getSubagentRun("run-1"); expect(record1?.findingsCaptured).toBe(true); @@ -198,6 +213,8 @@ describe("subagent registry — coalescing", () => { task: "Task 2", }); + await flushQueue(); + const record2 = getSubagentRun("run-2"); expect(record2?.findingsCaptured).toBe(true); @@ -208,7 +225,7 @@ describe("subagent registry — coalescing", () => { expect(record2?.announced).toBeUndefined(); }); - it("single run captures findings immediately", () => { + it("single run captures findings immediately", async () => { registerSubagentRun({ runId: "run-solo", childSessionId: "child-solo", @@ -216,6 +233,8 @@ describe("subagent registry — coalescing", () => { task: "Solo task", }); + await flushQueue(); + const record = getSubagentRun("run-solo"); expect(record?.endedAt).toBeGreaterThan(0); expect(record?.findingsCaptured).toBe(true); @@ -223,7 +242,7 @@ describe("subagent registry — coalescing", () => { expect(record?.outcome?.error).toContain("Hub not initialized"); }); - it("shutdownSubagentRegistry captures findings for ended-but-uncaptured runs", () => { + it("shutdownSubagentRegistry captures findings for ended-but-uncaptured runs", async () => { registerSubagentRun({ runId: "run-1", childSessionId: "child-1", @@ -231,6 +250,8 @@ describe("subagent registry — coalescing", () => { task: "Task", }); + await flushQueue(); + const record = getSubagentRun("run-1"); if (record) { // Simulate: run ended but findings not yet captured @@ -265,6 +286,8 @@ describe("subagent registry — post-announce cleanup", () => { task: "Task B", }); + await flushQueue(); + // Both runs should have been announced and removed from registry expect(spy).toHaveBeenCalled(); expect(getSubagentRun("run-a")).toBeUndefined(); diff --git a/packages/core/src/agent/subagent/registry.ts b/packages/core/src/agent/subagent/registry.ts index 0f03002a..f40a72c0 100644 --- a/packages/core/src/agent/subagent/registry.ts +++ b/packages/core/src/agent/subagent/registry.ts @@ -14,6 +14,8 @@ import type { } from "./types.js"; import { resolveSessionDir } from "../session/storage.js"; import { rmSync } from "node:fs"; +import { enqueueInLane, setLaneConcurrency } from "./command-queue.js"; +import { SubagentLane, DEFAULT_SUBAGENT_MAX_CONCURRENT, resolveSubagentTimeoutMs } from "./lanes.js"; /** Default archive retention: 60 minutes after completion */ const DEFAULT_ARCHIVE_AFTER_MS = 60 * 60 * 1000; @@ -35,6 +37,8 @@ const resumedRequesters = new Set(); /** Initialize registry from persisted state. Call once at startup. */ export function initSubagentRegistry(): void { + setLaneConcurrency(SubagentLane.Subagent, DEFAULT_SUBAGENT_MAX_CONCURRENT); + const persisted = loadSubagentRuns(); for (const [runId, record] of persisted) { subagentRuns.set(runId, record); @@ -97,6 +101,7 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent label, cleanup = "delete", timeoutSeconds, + start, } = params; const record: SubagentRunRecord = { @@ -113,8 +118,12 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent persist(); startSweeper(); - // Start watching the child agent for completion - watchChildAgent(record, timeoutSeconds); + // Enqueue in the subagent lane — the start callback and watchChildAgent + // only execute once a concurrency slot is available. + void enqueueInLane(SubagentLane.Subagent, async () => { + start?.(); + return watchChildAgent(record, timeoutSeconds); + }); return record; } @@ -185,26 +194,33 @@ export function resetSubagentRegistryForTests(): void { // Lifecycle watching // ============================================================================ -function watchChildAgent(record: SubagentRunRecord, timeoutSeconds?: number): void { +/** + * Watch a child agent for completion. + * Returns a promise that resolves when the child finishes (or errors/times out), + * keeping the command-queue lane slot occupied until then. + */ +function watchChildAgent(record: SubagentRunRecord, timeoutSeconds?: number): Promise { const { childSessionId } = record; // Mark as started record.startedAt = Date.now(); persist(); - const cleanup = (outcome: { status: "ok" | "error" | "timeout" | "unknown"; error?: string | undefined }) => { - if (record.endedAt) return; // Already finalized - if (timeoutTimer) clearTimeout(timeoutTimer); - record.endedAt = Date.now(); - record.outcome = outcome; - persist(); - handleRunCompletion(record); - }; + const timeoutMs = resolveSubagentTimeoutMs(timeoutSeconds); - // Set up timeout if specified - let timeoutTimer: ReturnType | undefined; - if (timeoutSeconds && timeoutSeconds > 0) { - timeoutTimer = setTimeout(() => { + return new Promise((resolveSlot) => { + const cleanup = (outcome: { status: "ok" | "error" | "timeout" | "unknown"; error?: string | undefined }) => { + if (record.endedAt) return; // Already finalized + if (timeoutTimer) clearTimeout(timeoutTimer); + record.endedAt = Date.now(); + record.outcome = outcome; + persist(); + handleRunCompletion(record); + resolveSlot(); // Release the queue slot + }; + + // Always set a timeout (default 10 min, 0 = ~24 days via resolveSubagentTimeoutMs) + const timeoutTimer = setTimeout(() => { cleanup({ status: "timeout" }); // Try to close the child agent @@ -214,36 +230,36 @@ function watchChildAgent(record: SubagentRunRecord, timeoutSeconds?: number): vo } catch { // Hub may not be available } - }, timeoutSeconds * 1000); - } + }, timeoutMs); - // Get child agent reference (Hub may not be available in tests) - if (!isHubInitialized()) { - cleanup({ status: "error", error: "Hub not initialized" }); - return; - } + // Get child agent reference (Hub may not be available in tests) + if (!isHubInitialized()) { + cleanup({ status: "error", error: "Hub not initialized" }); + return; + } - const hub = getHub(); - const childAgent = hub.getAgent(childSessionId); - if (!childAgent) { - cleanup({ status: "error", error: "Child agent not found" }); - return; - } + const hub = getHub(); + const childAgent = hub.getAgent(childSessionId); + if (!childAgent) { + cleanup({ status: "error", error: "Child agent not found" }); + return; + } - // Wait for the child agent's task queue to drain (task completion), - // then trigger announce flow. Uses waitForIdle() instead of consuming - // the stream (which would conflict with Hub.consumeAgent). - childAgent.waitForIdle().then( - () => cleanup({ status: "ok" }), - (err) => cleanup({ - status: "error", - error: err instanceof Error ? err.message : String(err), - }), - ); + // Wait for the child agent's task queue to drain (task completion), + // then trigger announce flow. Uses waitForIdle() instead of consuming + // the stream (which would conflict with Hub.consumeAgent). + childAgent.waitForIdle().then( + () => cleanup({ status: "ok" }), + (err) => cleanup({ + status: "error", + error: err instanceof Error ? err.message : String(err), + }), + ); - // Also handle explicit close (e.g., timeout kill, Hub shutdown) - childAgent.onClose(() => { - cleanup({ status: record.outcome?.status ?? "unknown" }); + // Also handle explicit close (e.g., timeout kill, Hub shutdown) + childAgent.onClose(() => { + cleanup({ status: record.outcome?.status ?? "unknown" }); + }); }); } diff --git a/packages/core/src/agent/subagent/types.ts b/packages/core/src/agent/subagent/types.ts index 8edfc0dc..1d3ca0ac 100644 --- a/packages/core/src/agent/subagent/types.ts +++ b/packages/core/src/agent/subagent/types.ts @@ -56,6 +56,8 @@ export type RegisterSubagentRunParams = { label?: string | undefined; cleanup?: "delete" | "keep" | undefined; timeoutSeconds?: number | undefined; + /** Callback invoked when the queue slot is acquired (used to defer childAgent.write). */ + start?: (() => void) | undefined; }; /** Parameters for the announce flow */ diff --git a/packages/core/src/agent/tools/sessions-spawn.ts b/packages/core/src/agent/tools/sessions-spawn.ts index fc93199e..35c0017b 100644 --- a/packages/core/src/agent/tools/sessions-spawn.ts +++ b/packages/core/src/agent/tools/sessions-spawn.ts @@ -28,8 +28,11 @@ const SessionsSpawnSchema = Type.Object({ ), timeoutSeconds: Type.Optional( Type.Number({ - description: "Execution timeout in seconds. The subagent will be terminated if it exceeds this.", - minimum: 1, + description: + "Execution timeout in seconds. Default: 600 (10 min). " + + "Set to 0 for no timeout (useful for complex, long-running tasks). " + + "The subagent will be terminated if it exceeds this limit.", + minimum: 0, }), ), }); @@ -106,11 +109,9 @@ export function createSessionsSpawnTool( model, }); - // Write the task to the child (non-blocking) before registering, - // so waitForIdle() observes the queued work. - childAgent.write(task); - - // Register the run for lifecycle tracking + // Register the run for lifecycle tracking. + // The write is deferred via the start callback so the child only + // begins work once a concurrency slot is available in the queue. registerSubagentRun({ runId, childSessionId, @@ -119,6 +120,7 @@ export function createSessionsSpawnTool( label, cleanup, timeoutSeconds, + start: () => childAgent.write(task), }); return {