feat(subagent): add lane-based command queue with concurrency control
Introduces a command queue system adapted from OpenClaw to prevent unbounded sub-agent spawning. Default max concurrency: 10. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
9d719c66af
commit
240fdd1286
3 changed files with 312 additions and 0 deletions
117
packages/core/src/agent/subagent/command-queue.test.ts
Normal file
117
packages/core/src/agent/subagent/command-queue.test.ts
Normal file
|
|
@ -0,0 +1,117 @@
|
|||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import {
|
||||
enqueueInLane,
|
||||
getLaneSize,
|
||||
clearLane,
|
||||
setLaneConcurrency,
|
||||
resetLanesForTests,
|
||||
} from "./command-queue.js";
|
||||
|
||||
afterEach(() => {
|
||||
resetLanesForTests();
|
||||
});
|
||||
|
||||
describe("command queue", () => {
|
||||
it("runs tasks serially by default (maxConcurrent = 1)", async () => {
|
||||
let active = 0;
|
||||
let maxActive = 0;
|
||||
const order: number[] = [];
|
||||
|
||||
const makeTask = (id: number) => async () => {
|
||||
active += 1;
|
||||
maxActive = Math.max(maxActive, active);
|
||||
order.push(id);
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
active -= 1;
|
||||
return id;
|
||||
};
|
||||
|
||||
const results = await Promise.all([
|
||||
enqueueInLane("test", makeTask(1)),
|
||||
enqueueInLane("test", makeTask(2)),
|
||||
enqueueInLane("test", makeTask(3)),
|
||||
]);
|
||||
|
||||
expect(results).toEqual([1, 2, 3]);
|
||||
expect(order).toEqual([1, 2, 3]);
|
||||
expect(maxActive).toBe(1);
|
||||
});
|
||||
|
||||
it("respects maxConcurrent limit", async () => {
|
||||
setLaneConcurrency("test", 3);
|
||||
|
||||
let active = 0;
|
||||
let maxActive = 0;
|
||||
|
||||
const makeTask = (id: number) => async () => {
|
||||
active += 1;
|
||||
maxActive = Math.max(maxActive, active);
|
||||
await new Promise((r) => setTimeout(r, 20));
|
||||
active -= 1;
|
||||
return id;
|
||||
};
|
||||
|
||||
const results = await Promise.all([
|
||||
enqueueInLane("test", makeTask(1)),
|
||||
enqueueInLane("test", makeTask(2)),
|
||||
enqueueInLane("test", makeTask(3)),
|
||||
enqueueInLane("test", makeTask(4)),
|
||||
enqueueInLane("test", makeTask(5)),
|
||||
]);
|
||||
|
||||
expect(results).toEqual([1, 2, 3, 4, 5]);
|
||||
expect(maxActive).toBe(3);
|
||||
});
|
||||
|
||||
it("reports correct lane size", async () => {
|
||||
setLaneConcurrency("test", 1);
|
||||
|
||||
let resolveFirst!: () => void;
|
||||
const blocker = new Promise<void>((r) => {
|
||||
resolveFirst = r;
|
||||
});
|
||||
|
||||
// First task blocks the lane
|
||||
const p1 = enqueueInLane("test", () => blocker);
|
||||
// Second task queued
|
||||
const p2 = enqueueInLane("test", async () => "done");
|
||||
|
||||
// 1 active + 1 queued = 2
|
||||
expect(getLaneSize("test")).toBe(2);
|
||||
|
||||
resolveFirst();
|
||||
await Promise.all([p1, p2]);
|
||||
|
||||
expect(getLaneSize("test")).toBe(0);
|
||||
});
|
||||
|
||||
it("clears pending tasks", async () => {
|
||||
setLaneConcurrency("test", 1);
|
||||
|
||||
let resolveFirst!: () => void;
|
||||
const blocker = new Promise<void>((r) => {
|
||||
resolveFirst = r;
|
||||
});
|
||||
|
||||
const p1 = enqueueInLane("test", () => blocker);
|
||||
const p2 = enqueueInLane("test", async () => "should-not-run");
|
||||
const p3 = enqueueInLane("test", async () => "should-not-run");
|
||||
|
||||
const removed = clearLane("test");
|
||||
expect(removed).toBe(2);
|
||||
|
||||
resolveFirst();
|
||||
await p1;
|
||||
|
||||
// p2 and p3 should reject
|
||||
await expect(p2).rejects.toThrow("Lane cleared");
|
||||
await expect(p3).rejects.toThrow("Lane cleared");
|
||||
|
||||
expect(getLaneSize("test")).toBe(0);
|
||||
});
|
||||
|
||||
it("returns 0 for unknown lane", () => {
|
||||
expect(getLaneSize("nonexistent")).toBe(0);
|
||||
expect(clearLane("nonexistent")).toBe(0);
|
||||
});
|
||||
});
|
||||
158
packages/core/src/agent/subagent/command-queue.ts
Normal file
158
packages/core/src/agent/subagent/command-queue.ts
Normal file
|
|
@ -0,0 +1,158 @@
|
|||
/**
|
||||
* Lane-based command queue for subagent concurrency control.
|
||||
*
|
||||
* Each lane maintains an independent queue with a configurable max-concurrency
|
||||
* limit. Tasks beyond the limit are queued and executed FIFO as slots free up.
|
||||
*
|
||||
* Adapted from OpenClaw's process/command-queue.ts.
|
||||
*/
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Types
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type QueueEntry = {
|
||||
task: () => Promise<unknown>;
|
||||
resolve: (value: unknown) => void;
|
||||
reject: (reason?: unknown) => void;
|
||||
enqueuedAt: number;
|
||||
warnAfterMs: number;
|
||||
onWait?: (waitMs: number, queuedAhead: number) => void;
|
||||
};
|
||||
|
||||
type LaneState = {
|
||||
lane: string;
|
||||
queue: QueueEntry[];
|
||||
active: number;
|
||||
maxConcurrent: number;
|
||||
draining: boolean;
|
||||
};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Module state
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const lanes = new Map<string, LaneState>();
|
||||
|
||||
function getLaneState(lane: string): LaneState {
|
||||
const existing = lanes.get(lane);
|
||||
if (existing) return existing;
|
||||
|
||||
const created: LaneState = {
|
||||
lane,
|
||||
queue: [],
|
||||
active: 0,
|
||||
maxConcurrent: 1,
|
||||
draining: false,
|
||||
};
|
||||
lanes.set(lane, created);
|
||||
return created;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Drain / pump
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function drainLane(lane: string): void {
|
||||
const state = getLaneState(lane);
|
||||
if (state.draining) return;
|
||||
state.draining = true;
|
||||
|
||||
const pump = () => {
|
||||
while (state.active < state.maxConcurrent && state.queue.length > 0) {
|
||||
const entry = state.queue.shift() as QueueEntry;
|
||||
const waitedMs = Date.now() - entry.enqueuedAt;
|
||||
|
||||
if (waitedMs >= entry.warnAfterMs) {
|
||||
entry.onWait?.(waitedMs, state.queue.length);
|
||||
console.warn(
|
||||
`[CommandQueue] lane wait exceeded: lane=${lane} waitedMs=${waitedMs} queueAhead=${state.queue.length}`,
|
||||
);
|
||||
}
|
||||
|
||||
state.active += 1;
|
||||
|
||||
void (async () => {
|
||||
try {
|
||||
const result = await entry.task();
|
||||
state.active -= 1;
|
||||
pump();
|
||||
entry.resolve(result);
|
||||
} catch (err) {
|
||||
state.active -= 1;
|
||||
pump();
|
||||
entry.reject(err);
|
||||
}
|
||||
})();
|
||||
}
|
||||
state.draining = false;
|
||||
};
|
||||
|
||||
pump();
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Public API
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/** Set (or update) the max concurrency for a lane. Triggers a drain. */
|
||||
export function setLaneConcurrency(lane: string, maxConcurrent: number): void {
|
||||
const state = getLaneState(lane);
|
||||
state.maxConcurrent = Math.max(1, Math.floor(maxConcurrent));
|
||||
drainLane(lane);
|
||||
}
|
||||
|
||||
/** Enqueue a task in a specific lane. Returns a promise that resolves with the task's return value. */
|
||||
export function enqueueInLane<T>(
|
||||
lane: string,
|
||||
task: () => Promise<T>,
|
||||
opts?: {
|
||||
warnAfterMs?: number;
|
||||
onWait?: (waitMs: number, queuedAhead: number) => void;
|
||||
},
|
||||
): Promise<T> {
|
||||
const warnAfterMs = opts?.warnAfterMs ?? 5_000;
|
||||
const state = getLaneState(lane);
|
||||
|
||||
return new Promise<T>((resolve, reject) => {
|
||||
state.queue.push({
|
||||
task: () => task(),
|
||||
resolve: (value) => resolve(value as T),
|
||||
reject,
|
||||
enqueuedAt: Date.now(),
|
||||
warnAfterMs,
|
||||
onWait: opts?.onWait,
|
||||
});
|
||||
drainLane(lane);
|
||||
});
|
||||
}
|
||||
|
||||
/** Number of active + queued tasks in a lane. */
|
||||
export function getLaneSize(lane: string): number {
|
||||
const state = lanes.get(lane);
|
||||
if (!state) return 0;
|
||||
return state.queue.length + state.active;
|
||||
}
|
||||
|
||||
/** Remove all pending (not yet active) tasks from a lane. Returns how many were removed. */
|
||||
export function clearLane(lane: string): number {
|
||||
const state = lanes.get(lane);
|
||||
if (!state) return 0;
|
||||
const removed = state.queue.length;
|
||||
// Reject pending tasks so callers aren't left hanging
|
||||
for (const entry of state.queue) {
|
||||
entry.reject(new Error("Lane cleared"));
|
||||
}
|
||||
state.queue.length = 0;
|
||||
return removed;
|
||||
}
|
||||
|
||||
/** Reset all lanes (for testing). */
|
||||
export function resetLanesForTests(): void {
|
||||
for (const state of lanes.values()) {
|
||||
for (const entry of state.queue) {
|
||||
entry.reject(new Error("Reset"));
|
||||
}
|
||||
}
|
||||
lanes.clear();
|
||||
}
|
||||
37
packages/core/src/agent/subagent/lanes.ts
Normal file
37
packages/core/src/agent/subagent/lanes.ts
Normal file
|
|
@ -0,0 +1,37 @@
|
|||
/** Named lanes for the subagent command queue. */
|
||||
export const SubagentLane = {
|
||||
Subagent: "subagent",
|
||||
} as const;
|
||||
|
||||
/** Default maximum concurrent subagent runs. */
|
||||
export const DEFAULT_SUBAGENT_MAX_CONCURRENT = 10;
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Timeout defaults
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/** Default subagent timeout: 10 minutes. */
|
||||
export const DEFAULT_SUBAGENT_TIMEOUT_SECONDS = 600;
|
||||
|
||||
/** Maximum safe value for setTimeout (~24.8 days). */
|
||||
const MAX_SAFE_TIMEOUT_MS = 2_147_000_000;
|
||||
|
||||
/**
|
||||
* Resolve the effective timeout in milliseconds for a subagent run.
|
||||
*
|
||||
* - `undefined` / negative → default (600 s)
|
||||
* - `0` → no timeout (MAX_SAFE_TIMEOUT_MS)
|
||||
* - positive number → use as-is, clamped to safe range
|
||||
*/
|
||||
export function resolveSubagentTimeoutMs(overrideSeconds?: number): number {
|
||||
if (overrideSeconds === undefined || overrideSeconds === null) {
|
||||
return DEFAULT_SUBAGENT_TIMEOUT_SECONDS * 1000;
|
||||
}
|
||||
if (overrideSeconds === 0) {
|
||||
return MAX_SAFE_TIMEOUT_MS; // "no timeout"
|
||||
}
|
||||
if (overrideSeconds < 0) {
|
||||
return DEFAULT_SUBAGENT_TIMEOUT_SECONDS * 1000;
|
||||
}
|
||||
return Math.min(Math.floor(overrideSeconds) * 1000, MAX_SAFE_TIMEOUT_MS);
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue