diff --git a/packages/core/src/agent/subagent/command-queue.test.ts b/packages/core/src/agent/subagent/command-queue.test.ts new file mode 100644 index 00000000..a9bd6966 --- /dev/null +++ b/packages/core/src/agent/subagent/command-queue.test.ts @@ -0,0 +1,117 @@ +import { afterEach, describe, expect, it } from "vitest"; +import { + enqueueInLane, + getLaneSize, + clearLane, + setLaneConcurrency, + resetLanesForTests, +} from "./command-queue.js"; + +afterEach(() => { + resetLanesForTests(); +}); + +describe("command queue", () => { + it("runs tasks serially by default (maxConcurrent = 1)", async () => { + let active = 0; + let maxActive = 0; + const order: number[] = []; + + const makeTask = (id: number) => async () => { + active += 1; + maxActive = Math.max(maxActive, active); + order.push(id); + await new Promise((r) => setTimeout(r, 10)); + active -= 1; + return id; + }; + + const results = await Promise.all([ + enqueueInLane("test", makeTask(1)), + enqueueInLane("test", makeTask(2)), + enqueueInLane("test", makeTask(3)), + ]); + + expect(results).toEqual([1, 2, 3]); + expect(order).toEqual([1, 2, 3]); + expect(maxActive).toBe(1); + }); + + it("respects maxConcurrent limit", async () => { + setLaneConcurrency("test", 3); + + let active = 0; + let maxActive = 0; + + const makeTask = (id: number) => async () => { + active += 1; + maxActive = Math.max(maxActive, active); + await new Promise((r) => setTimeout(r, 20)); + active -= 1; + return id; + }; + + const results = await Promise.all([ + enqueueInLane("test", makeTask(1)), + enqueueInLane("test", makeTask(2)), + enqueueInLane("test", makeTask(3)), + enqueueInLane("test", makeTask(4)), + enqueueInLane("test", makeTask(5)), + ]); + + expect(results).toEqual([1, 2, 3, 4, 5]); + expect(maxActive).toBe(3); + }); + + it("reports correct lane size", async () => { + setLaneConcurrency("test", 1); + + let resolveFirst!: () => void; + const blocker = new Promise((r) => { + resolveFirst = r; + }); + + // First task blocks the lane + const p1 = enqueueInLane("test", () => blocker); + // Second task queued + const p2 = enqueueInLane("test", async () => "done"); + + // 1 active + 1 queued = 2 + expect(getLaneSize("test")).toBe(2); + + resolveFirst(); + await Promise.all([p1, p2]); + + expect(getLaneSize("test")).toBe(0); + }); + + it("clears pending tasks", async () => { + setLaneConcurrency("test", 1); + + let resolveFirst!: () => void; + const blocker = new Promise((r) => { + resolveFirst = r; + }); + + const p1 = enqueueInLane("test", () => blocker); + const p2 = enqueueInLane("test", async () => "should-not-run"); + const p3 = enqueueInLane("test", async () => "should-not-run"); + + const removed = clearLane("test"); + expect(removed).toBe(2); + + resolveFirst(); + await p1; + + // p2 and p3 should reject + await expect(p2).rejects.toThrow("Lane cleared"); + await expect(p3).rejects.toThrow("Lane cleared"); + + expect(getLaneSize("test")).toBe(0); + }); + + it("returns 0 for unknown lane", () => { + expect(getLaneSize("nonexistent")).toBe(0); + expect(clearLane("nonexistent")).toBe(0); + }); +}); diff --git a/packages/core/src/agent/subagent/command-queue.ts b/packages/core/src/agent/subagent/command-queue.ts new file mode 100644 index 00000000..470378d2 --- /dev/null +++ b/packages/core/src/agent/subagent/command-queue.ts @@ -0,0 +1,158 @@ +/** + * Lane-based command queue for subagent concurrency control. + * + * Each lane maintains an independent queue with a configurable max-concurrency + * limit. Tasks beyond the limit are queued and executed FIFO as slots free up. + * + * Adapted from OpenClaw's process/command-queue.ts. + */ + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +type QueueEntry = { + task: () => Promise; + resolve: (value: unknown) => void; + reject: (reason?: unknown) => void; + enqueuedAt: number; + warnAfterMs: number; + onWait?: (waitMs: number, queuedAhead: number) => void; +}; + +type LaneState = { + lane: string; + queue: QueueEntry[]; + active: number; + maxConcurrent: number; + draining: boolean; +}; + +// --------------------------------------------------------------------------- +// Module state +// --------------------------------------------------------------------------- + +const lanes = new Map(); + +function getLaneState(lane: string): LaneState { + const existing = lanes.get(lane); + if (existing) return existing; + + const created: LaneState = { + lane, + queue: [], + active: 0, + maxConcurrent: 1, + draining: false, + }; + lanes.set(lane, created); + return created; +} + +// --------------------------------------------------------------------------- +// Drain / pump +// --------------------------------------------------------------------------- + +function drainLane(lane: string): void { + const state = getLaneState(lane); + if (state.draining) return; + state.draining = true; + + const pump = () => { + while (state.active < state.maxConcurrent && state.queue.length > 0) { + const entry = state.queue.shift() as QueueEntry; + const waitedMs = Date.now() - entry.enqueuedAt; + + if (waitedMs >= entry.warnAfterMs) { + entry.onWait?.(waitedMs, state.queue.length); + console.warn( + `[CommandQueue] lane wait exceeded: lane=${lane} waitedMs=${waitedMs} queueAhead=${state.queue.length}`, + ); + } + + state.active += 1; + + void (async () => { + try { + const result = await entry.task(); + state.active -= 1; + pump(); + entry.resolve(result); + } catch (err) { + state.active -= 1; + pump(); + entry.reject(err); + } + })(); + } + state.draining = false; + }; + + pump(); +} + +// --------------------------------------------------------------------------- +// Public API +// --------------------------------------------------------------------------- + +/** Set (or update) the max concurrency for a lane. Triggers a drain. */ +export function setLaneConcurrency(lane: string, maxConcurrent: number): void { + const state = getLaneState(lane); + state.maxConcurrent = Math.max(1, Math.floor(maxConcurrent)); + drainLane(lane); +} + +/** Enqueue a task in a specific lane. Returns a promise that resolves with the task's return value. */ +export function enqueueInLane( + lane: string, + task: () => Promise, + opts?: { + warnAfterMs?: number; + onWait?: (waitMs: number, queuedAhead: number) => void; + }, +): Promise { + const warnAfterMs = opts?.warnAfterMs ?? 5_000; + const state = getLaneState(lane); + + return new Promise((resolve, reject) => { + state.queue.push({ + task: () => task(), + resolve: (value) => resolve(value as T), + reject, + enqueuedAt: Date.now(), + warnAfterMs, + onWait: opts?.onWait, + }); + drainLane(lane); + }); +} + +/** Number of active + queued tasks in a lane. */ +export function getLaneSize(lane: string): number { + const state = lanes.get(lane); + if (!state) return 0; + return state.queue.length + state.active; +} + +/** Remove all pending (not yet active) tasks from a lane. Returns how many were removed. */ +export function clearLane(lane: string): number { + const state = lanes.get(lane); + if (!state) return 0; + const removed = state.queue.length; + // Reject pending tasks so callers aren't left hanging + for (const entry of state.queue) { + entry.reject(new Error("Lane cleared")); + } + state.queue.length = 0; + return removed; +} + +/** Reset all lanes (for testing). */ +export function resetLanesForTests(): void { + for (const state of lanes.values()) { + for (const entry of state.queue) { + entry.reject(new Error("Reset")); + } + } + lanes.clear(); +} diff --git a/packages/core/src/agent/subagent/lanes.ts b/packages/core/src/agent/subagent/lanes.ts new file mode 100644 index 00000000..88649722 --- /dev/null +++ b/packages/core/src/agent/subagent/lanes.ts @@ -0,0 +1,37 @@ +/** Named lanes for the subagent command queue. */ +export const SubagentLane = { + Subagent: "subagent", +} as const; + +/** Default maximum concurrent subagent runs. */ +export const DEFAULT_SUBAGENT_MAX_CONCURRENT = 10; + +// --------------------------------------------------------------------------- +// Timeout defaults +// --------------------------------------------------------------------------- + +/** Default subagent timeout: 10 minutes. */ +export const DEFAULT_SUBAGENT_TIMEOUT_SECONDS = 600; + +/** Maximum safe value for setTimeout (~24.8 days). */ +const MAX_SAFE_TIMEOUT_MS = 2_147_000_000; + +/** + * Resolve the effective timeout in milliseconds for a subagent run. + * + * - `undefined` / negative → default (600 s) + * - `0` → no timeout (MAX_SAFE_TIMEOUT_MS) + * - positive number → use as-is, clamped to safe range + */ +export function resolveSubagentTimeoutMs(overrideSeconds?: number): number { + if (overrideSeconds === undefined || overrideSeconds === null) { + return DEFAULT_SUBAGENT_TIMEOUT_SECONDS * 1000; + } + if (overrideSeconds === 0) { + return MAX_SAFE_TIMEOUT_MS; // "no timeout" + } + if (overrideSeconds < 0) { + return DEFAULT_SUBAGENT_TIMEOUT_SECONDS * 1000; + } + return Math.min(Math.floor(overrideSeconds) * 1000, MAX_SAFE_TIMEOUT_MS); +}