refactor(core): remove legacy subagent registry subsystem
This commit is contained in:
parent
292e2b9454
commit
909efb5dab
19 changed files with 3 additions and 3191 deletions
|
|
@ -13,18 +13,6 @@ export * from "./tools.js";
|
|||
export * from "./tools/policy.js";
|
||||
export * from "./tools/groups.js";
|
||||
export * from "./extract-text.js";
|
||||
// @deprecated — Old subagent registry. Use `delegate` tool instead.
|
||||
// Kept temporarily for desktop app compatibility.
|
||||
export {
|
||||
listSubagentRuns,
|
||||
getSubagentRun,
|
||||
getSubagentGroup,
|
||||
} from "./subagent/registry.js";
|
||||
export type {
|
||||
SubagentRunRecord,
|
||||
SubagentRunOutcome,
|
||||
SubagentGroup,
|
||||
} from "./subagent/types.js";
|
||||
|
||||
export {
|
||||
readClaudeCliCredentials,
|
||||
|
|
|
|||
|
|
@ -1,172 +0,0 @@
|
|||
# Subagent System
|
||||
|
||||
The subagent system allows a parent agent to spawn isolated child agents that run tasks in parallel and report results back automatically.
|
||||
|
||||
## Architecture Overview
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ Parent Agent (runner.ts) │
|
||||
│ │
|
||||
│ tools: sessions_spawn, sessions_list │
|
||||
│ state: resolvedProvider, toolsOptions │
|
||||
└──────────┬──────────────────────────────────────────────────────────┘
|
||||
│
|
||||
│ sessions_spawn(task, label, timeoutSeconds)
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ Spawn Flow (sessions-spawn.ts) │
|
||||
│ │
|
||||
│ 1. Build subagent system prompt (announce.ts) │
|
||||
│ 2. hub.createSubagent(childSessionId, { provider, model }) │
|
||||
│ 3. registerSubagentRun({ start: () => childAgent.write(task) }) │
|
||||
│ 4. Return { status: "accepted", runId, childSessionId } │
|
||||
└──────────┬──────────────────────────────────────────────────────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ Concurrency Queue (command-queue.ts) │
|
||||
│ │
|
||||
│ Lane: "subagent" — max 10 concurrent (configurable) │
|
||||
│ Queued runs wait for a slot before start() is called │
|
||||
└──────────┬──────────────────────────────────────────────────────────┘
|
||||
│ slot acquired
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ Child Agent Execution │
|
||||
│ │
|
||||
│ ┌───────────────────────────────────────────────────────────────┐ │
|
||||
│ │ AsyncAgent (async-agent.ts) │ │
|
||||
│ │ - Isolated session with restricted tools (isSubagent=true) │ │
|
||||
│ │ - Inherits parent's LLM provider │ │
|
||||
│ │ - System prompt: task focus + error reporting rules │ │
|
||||
│ │ - Tracks lastRunError for error propagation │ │
|
||||
│ └───────────────────────────────────────────────────────────────┘ │
|
||||
│ │
|
||||
│ ┌───────────────────────────────────────────────────────────────┐ │
|
||||
│ │ watchChildAgent (registry.ts) │ │
|
||||
│ │ - Sets startedAt, starts timeout timer │ │
|
||||
│ │ - waitForIdle() — waits for child's task queue to drain │ │
|
||||
│ │ - onClose() — handles explicit close (timeout kill, etc.) │ │
|
||||
│ └───────────────────────────────────────────────────────────────┘ │
|
||||
└──────────┬──────────────────────────────────────────────────────────┘
|
||||
│
|
||||
│ child completes / errors / times out
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ Completion Handling (registry.ts) │
|
||||
│ │
|
||||
│ handleRunCompletion(record) │
|
||||
│ │ │
|
||||
│ ├─ Phase 1: captureFindings() │
|
||||
│ │ - Read last assistant reply from child session JSONL │
|
||||
│ │ - Falls back to last toolResult if no assistant text │
|
||||
│ │ - Persists findings to record before session deletion │
|
||||
│ │ │
|
||||
│ ├─ Session Cleanup │
|
||||
│ │ - cleanup="delete": rm child session dir + hub.closeAgent() │
|
||||
│ │ - cleanup="keep": preserve for audit │
|
||||
│ │ │
|
||||
│ └─ Phase 2: checkAndAnnounce(requesterSessionId) │
|
||||
│ - Finds all unannounced, completed runs with findings │
|
||||
│ - Calls runCoalescedAnnounceFlow() │
|
||||
│ - Marks records: announced=true, archiveAtMs=now+60min │
|
||||
└──────────┬──────────────────────────────────────────────────────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ Announcement Delivery (announce.ts) │
|
||||
│ │
|
||||
│ runCoalescedAnnounceFlow(requesterSessionId, records) │
|
||||
│ │ │
|
||||
│ ├─ Format message: formatCoalescedAnnouncementMessage() │
|
||||
│ │ - Single record: task name, status, findings, stats │
|
||||
│ │ - Multiple records: combined report with all findings │
|
||||
│ │ │
|
||||
│ ├─ Two-tier delivery: │
|
||||
│ │ │
|
||||
│ │ Tier 1: BUSY (parent running or has pending writes) │
|
||||
│ │ └─ enqueueAnnounce() → announce-queue.ts │
|
||||
│ │ - Debounce 1s to batch nearby completions │
|
||||
│ │ - Drain via writeInternal() when parent finishes │
|
||||
│ │ │
|
||||
│ │ Tier 2: IDLE (parent not running) │
|
||||
│ │ └─ sendAnnounceDirect() │
|
||||
│ │ - writeInternal(msg, { forwardAssistant, persistResponse })│
|
||||
│ │ │
|
||||
│ └─ All delivery uses writeInternal() (marks as internal: true) │
|
||||
│ → Prevents announcement from showing as user bubble in UI │
|
||||
│ → LLM processes findings and responds naturally to user │
|
||||
└──────────┬──────────────────────────────────────────────────────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ Record Lifecycle (registry.ts) │
|
||||
│ │
|
||||
│ created → startedAt → endedAt → findingsCaptured → announced │
|
||||
│ │
|
||||
│ After announcement: │
|
||||
│ - Record kept with archiveAtMs = now + 60 min │
|
||||
│ - sessions_list can still query records during this window │
|
||||
│ - Sweeper runs every 60s, removes expired records │
|
||||
│ - When all records removed, sweeper stops │
|
||||
└─────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## Key Files
|
||||
|
||||
| File | Purpose |
|
||||
|------|---------|
|
||||
| `sessions-spawn.ts` | Tool: spawns a child agent with task, label, timeout, provider |
|
||||
| `sessions-list.ts` | Tool: lists subagent runs and their status |
|
||||
| `registry.ts` | Lifecycle management: register, watch, capture, announce, archive |
|
||||
| `announce.ts` | System prompt builder, findings reader, message formatter, delivery |
|
||||
| `announce-queue.ts` | Debounced queue for batching announcements when parent is busy |
|
||||
| `command-queue.ts` | Concurrency limiter for subagent lane slots |
|
||||
| `lanes.ts` | Lane config: max concurrency (10), default timeout (1800s) |
|
||||
| `types.ts` | Shared types: SubagentRunRecord, SubagentRunOutcome, etc. |
|
||||
| `registry-store.ts` | Persistence: save/load runs to disk for crash recovery |
|
||||
|
||||
## Provider Inheritance
|
||||
|
||||
Subagents inherit the parent's resolved LLM provider:
|
||||
|
||||
```
|
||||
runner.ts (resolvedProvider)
|
||||
→ toolsOptions.provider
|
||||
→ tools.ts (CreateToolsOptions.provider)
|
||||
→ sessions-spawn.ts (options.provider)
|
||||
→ hub.createSubagent({ provider })
|
||||
```
|
||||
|
||||
When the user switches providers via UI (`setProvider()`), `toolsOptions.provider` is updated in sync so future spawns use the new provider.
|
||||
|
||||
## Error Propagation
|
||||
|
||||
```
|
||||
Child tool error (e.g., API 401)
|
||||
→ Subagent LLM sees error, includes in final message (system prompt rule)
|
||||
→ captureFindings() reads final message
|
||||
→ Announcement includes error in findings
|
||||
→ Parent LLM sees error and can inform user
|
||||
|
||||
Child run error (e.g., missing API key for provider)
|
||||
→ AsyncAgent._lastRunError set
|
||||
→ registry.ts checks childAgent.lastRunError after waitForIdle()
|
||||
→ outcome = { status: "error", error: "No API key configured..." }
|
||||
→ Announcement: "task failed: No API key configured..."
|
||||
```
|
||||
|
||||
## Timeout Behavior
|
||||
|
||||
Default: 1800s (30 min). System prompt guides the parent LLM:
|
||||
- Simple tasks: 1800s (default)
|
||||
- Moderate tasks: 1800-2400s (30-40 min)
|
||||
- Complex tasks: 2400-3600s (40-60 min)
|
||||
|
||||
On timeout:
|
||||
1. Timeout timer fires in `watchChildAgent()`
|
||||
2. `cleanup({ status: "timeout" })` is called
|
||||
3. Child agent is closed via `hub.closeAgent()`
|
||||
4. Findings are captured from whatever the child wrote so far
|
||||
5. Announcement reports "timed out" with partial findings
|
||||
|
|
@ -1,79 +0,0 @@
|
|||
import { describe, it, expect, afterEach } from "vitest";
|
||||
import { mkdtempSync, rmSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
import { tmpdir } from "node:os";
|
||||
import { writeEntries } from "../session/storage.js";
|
||||
import { readLatestAssistantReply } from "./announce.js";
|
||||
import type { SessionEntry } from "../session/types.js";
|
||||
|
||||
describe("readLatestAssistantReply", () => {
|
||||
let testDir: string;
|
||||
|
||||
afterEach(() => {
|
||||
if (testDir) {
|
||||
rmSync(testDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
async function seedSession(sessionId: string, entries: SessionEntry[]) {
|
||||
await writeEntries(sessionId, entries, { baseDir: testDir });
|
||||
}
|
||||
|
||||
it("returns the latest non-empty assistant text when the last assistant message is tool-only", async () => {
|
||||
testDir = mkdtempSync(join(tmpdir(), "announce-test-"));
|
||||
const sessionId = "child-session-1";
|
||||
|
||||
await seedSession(sessionId, [
|
||||
{
|
||||
type: "message",
|
||||
timestamp: 1,
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "南京天气:晴,12°C。" }],
|
||||
},
|
||||
} as SessionEntry,
|
||||
{
|
||||
type: "message",
|
||||
timestamp: 2,
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [{ type: "toolCall", id: "tool-1", name: "weather", arguments: { city: "Nanjing" } }],
|
||||
},
|
||||
} as SessionEntry,
|
||||
]);
|
||||
|
||||
const result = readLatestAssistantReply(sessionId, { baseDir: testDir });
|
||||
expect(result).toBe("南京天气:晴,12°C。");
|
||||
});
|
||||
|
||||
it("falls back to latest toolResult text when no assistant text exists", async () => {
|
||||
testDir = mkdtempSync(join(tmpdir(), "announce-test-"));
|
||||
const sessionId = "child-session-2";
|
||||
|
||||
await seedSession(sessionId, [
|
||||
{
|
||||
type: "message",
|
||||
timestamp: 1,
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [{ type: "toolCall", id: "tool-2", name: "weather", arguments: { city: "Nanjing" } }],
|
||||
},
|
||||
} as SessionEntry,
|
||||
{
|
||||
type: "message",
|
||||
timestamp: 2,
|
||||
message: {
|
||||
role: "toolResult",
|
||||
toolCallId: "tool-2",
|
||||
toolName: "weather",
|
||||
content: [{ type: "text", text: "{\"city\":\"Nanjing\",\"tempC\":12,\"condition\":\"Sunny\"}" }],
|
||||
isError: false,
|
||||
},
|
||||
} as SessionEntry,
|
||||
]);
|
||||
|
||||
const result = readLatestAssistantReply(sessionId, { baseDir: testDir });
|
||||
expect(result).toContain("\"city\":\"Nanjing\"");
|
||||
expect(result).toContain("\"condition\":\"Sunny\"");
|
||||
});
|
||||
});
|
||||
|
|
@ -1,203 +0,0 @@
|
|||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
enqueueAnnounce,
|
||||
resetAnnounceQueuesForTests,
|
||||
getAnnounceQueueDepth,
|
||||
type AnnounceQueueItem,
|
||||
type AnnounceQueueSettings,
|
||||
} from "./announce-queue.js";
|
||||
|
||||
afterEach(() => {
|
||||
resetAnnounceQueuesForTests();
|
||||
});
|
||||
|
||||
function makeItem(overrides?: Partial<AnnounceQueueItem>): AnnounceQueueItem {
|
||||
return {
|
||||
prompt: "test prompt",
|
||||
summaryLine: "test summary",
|
||||
enqueuedAt: Date.now(),
|
||||
requesterSessionId: "session-1",
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
const FAST_SETTINGS: AnnounceQueueSettings = {
|
||||
mode: "followup",
|
||||
debounceMs: 0,
|
||||
cap: 20,
|
||||
dropPolicy: "old",
|
||||
};
|
||||
|
||||
describe("announce queue", () => {
|
||||
it("enqueues an item and drains via send callback", async () => {
|
||||
const sent: AnnounceQueueItem[] = [];
|
||||
const send = async (item: AnnounceQueueItem) => { sent.push(item); };
|
||||
|
||||
enqueueAnnounce({
|
||||
key: "test",
|
||||
item: makeItem(),
|
||||
settings: FAST_SETTINGS,
|
||||
send,
|
||||
});
|
||||
|
||||
// Wait for async drain
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
expect(sent).toHaveLength(1);
|
||||
expect(sent[0]!.prompt).toBe("test prompt");
|
||||
});
|
||||
|
||||
it("batches items in collect mode", async () => {
|
||||
const sent: AnnounceQueueItem[] = [];
|
||||
const send = async (item: AnnounceQueueItem) => { sent.push(item); };
|
||||
|
||||
const collectSettings: AnnounceQueueSettings = {
|
||||
mode: "collect",
|
||||
debounceMs: 0,
|
||||
cap: 20,
|
||||
dropPolicy: "old",
|
||||
};
|
||||
|
||||
enqueueAnnounce({
|
||||
key: "test",
|
||||
item: makeItem({ prompt: "prompt 1" }),
|
||||
settings: collectSettings,
|
||||
send,
|
||||
});
|
||||
enqueueAnnounce({
|
||||
key: "test",
|
||||
item: makeItem({ prompt: "prompt 2" }),
|
||||
settings: collectSettings,
|
||||
send,
|
||||
});
|
||||
enqueueAnnounce({
|
||||
key: "test",
|
||||
item: makeItem({ prompt: "prompt 3" }),
|
||||
settings: collectSettings,
|
||||
send,
|
||||
});
|
||||
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
// Collect mode batches all into one send
|
||||
expect(sent).toHaveLength(1);
|
||||
expect(sent[0]!.prompt).toContain("prompt 1");
|
||||
expect(sent[0]!.prompt).toContain("prompt 2");
|
||||
expect(sent[0]!.prompt).toContain("prompt 3");
|
||||
expect(sent[0]!.prompt).toContain("3 queued announce(s)");
|
||||
});
|
||||
|
||||
it("sends items individually in followup mode", async () => {
|
||||
const sent: AnnounceQueueItem[] = [];
|
||||
const send = async (item: AnnounceQueueItem) => { sent.push(item); };
|
||||
|
||||
enqueueAnnounce({
|
||||
key: "test",
|
||||
item: makeItem({ prompt: "prompt A" }),
|
||||
settings: FAST_SETTINGS,
|
||||
send,
|
||||
});
|
||||
enqueueAnnounce({
|
||||
key: "test",
|
||||
item: makeItem({ prompt: "prompt B" }),
|
||||
settings: FAST_SETTINGS,
|
||||
send,
|
||||
});
|
||||
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
expect(sent).toHaveLength(2);
|
||||
expect(sent[0]!.prompt).toBe("prompt A");
|
||||
expect(sent[1]!.prompt).toBe("prompt B");
|
||||
});
|
||||
|
||||
it("respects cap with 'new' drop policy (rejects new items)", async () => {
|
||||
const sent: AnnounceQueueItem[] = [];
|
||||
const send = async (item: AnnounceQueueItem) => {
|
||||
// Slow send to keep items in queue
|
||||
await new Promise((r) => setTimeout(r, 200));
|
||||
sent.push(item);
|
||||
};
|
||||
|
||||
const cappedSettings: AnnounceQueueSettings = {
|
||||
mode: "followup",
|
||||
debounceMs: 0,
|
||||
cap: 2,
|
||||
dropPolicy: "new",
|
||||
};
|
||||
|
||||
const r1 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "1" }), settings: cappedSettings, send });
|
||||
const r2 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "2" }), settings: cappedSettings, send });
|
||||
const r3 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "3" }), settings: cappedSettings, send });
|
||||
|
||||
expect(r1).toBe(true);
|
||||
expect(r2).toBe(true);
|
||||
expect(r3).toBe(false); // Rejected — cap reached
|
||||
});
|
||||
|
||||
it("respects cap with 'old' drop policy (drops oldest)", async () => {
|
||||
const sent: AnnounceQueueItem[] = [];
|
||||
const send = async (item: AnnounceQueueItem) => {
|
||||
await new Promise((r) => setTimeout(r, 200));
|
||||
sent.push(item);
|
||||
};
|
||||
|
||||
const cappedSettings: AnnounceQueueSettings = {
|
||||
mode: "followup",
|
||||
debounceMs: 0,
|
||||
cap: 2,
|
||||
dropPolicy: "old",
|
||||
};
|
||||
|
||||
enqueueAnnounce({ key: "test", item: makeItem({ prompt: "1" }), settings: cappedSettings, send });
|
||||
enqueueAnnounce({ key: "test", item: makeItem({ prompt: "2" }), settings: cappedSettings, send });
|
||||
enqueueAnnounce({ key: "test", item: makeItem({ prompt: "3" }), settings: cappedSettings, send });
|
||||
|
||||
// Queue should have items 2 and 3 (oldest was dropped)
|
||||
expect(getAnnounceQueueDepth("test")).toBeLessThanOrEqual(2);
|
||||
});
|
||||
|
||||
it("cleans up queue after drain completes", async () => {
|
||||
const sent: AnnounceQueueItem[] = [];
|
||||
const send = async (item: AnnounceQueueItem) => { sent.push(item); };
|
||||
|
||||
enqueueAnnounce({
|
||||
key: "test",
|
||||
item: makeItem(),
|
||||
settings: FAST_SETTINGS,
|
||||
send,
|
||||
});
|
||||
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
expect(sent).toHaveLength(1);
|
||||
expect(getAnnounceQueueDepth("test")).toBe(0);
|
||||
});
|
||||
|
||||
it("debounces before draining", async () => {
|
||||
const sent: AnnounceQueueItem[] = [];
|
||||
const send = async (item: AnnounceQueueItem) => { sent.push(item); };
|
||||
|
||||
const debouncedSettings: AnnounceQueueSettings = {
|
||||
mode: "followup",
|
||||
debounceMs: 100,
|
||||
cap: 20,
|
||||
dropPolicy: "old",
|
||||
};
|
||||
|
||||
enqueueAnnounce({
|
||||
key: "test",
|
||||
item: makeItem(),
|
||||
settings: debouncedSettings,
|
||||
send,
|
||||
});
|
||||
|
||||
// Should not have sent yet (debounce)
|
||||
await new Promise((r) => setTimeout(r, 30));
|
||||
expect(sent).toHaveLength(0);
|
||||
|
||||
// Wait for debounce to complete
|
||||
await new Promise((r) => setTimeout(r, 150));
|
||||
expect(sent).toHaveLength(1);
|
||||
});
|
||||
});
|
||||
|
|
@ -1,315 +0,0 @@
|
|||
/**
|
||||
* Announce queue for subagent result delivery.
|
||||
*
|
||||
* Handles queuing and batching of subagent announcements when the parent
|
||||
* agent is busy. Supports debounce, cap, drop policy, and collect mode.
|
||||
*
|
||||
* Ported from OpenClaw (MIT license), adapted for Super Multica.
|
||||
*/
|
||||
|
||||
// ============================================================================
|
||||
// Types
|
||||
// ============================================================================
|
||||
|
||||
export type AnnounceQueueMode =
|
||||
/** Try steer, no queue fallback */
|
||||
| "steer"
|
||||
/** Try steer, fall back to queue */
|
||||
| "steer-backlog"
|
||||
/** Queue and send items individually */
|
||||
| "followup"
|
||||
/** Queue and batch all items into one combined prompt */
|
||||
| "collect";
|
||||
|
||||
export type AnnounceDropPolicy =
|
||||
/** Drop oldest items when cap reached */
|
||||
| "old"
|
||||
/** Drop newest items when cap reached */
|
||||
| "new"
|
||||
/** Summarize dropped items */
|
||||
| "summarize";
|
||||
|
||||
export type AnnounceQueueItem = {
|
||||
prompt: string;
|
||||
summaryLine?: string;
|
||||
enqueuedAt: number;
|
||||
requesterSessionId: string;
|
||||
};
|
||||
|
||||
export type AnnounceQueueSettings = {
|
||||
mode: AnnounceQueueMode;
|
||||
debounceMs?: number;
|
||||
cap?: number;
|
||||
dropPolicy?: AnnounceDropPolicy;
|
||||
};
|
||||
|
||||
type AnnounceQueueState = {
|
||||
items: AnnounceQueueItem[];
|
||||
draining: boolean;
|
||||
lastEnqueuedAt: number;
|
||||
mode: AnnounceQueueMode;
|
||||
debounceMs: number;
|
||||
cap: number;
|
||||
dropPolicy: AnnounceDropPolicy;
|
||||
droppedCount: number;
|
||||
summaryLines: string[];
|
||||
send: (item: AnnounceQueueItem) => Promise<void>;
|
||||
};
|
||||
|
||||
// ============================================================================
|
||||
// Defaults
|
||||
// ============================================================================
|
||||
|
||||
const DEFAULT_DEBOUNCE_MS = 1000;
|
||||
const DEFAULT_CAP = 20;
|
||||
const DEFAULT_DROP_POLICY: AnnounceDropPolicy = "summarize";
|
||||
|
||||
export const DEFAULT_ANNOUNCE_SETTINGS: AnnounceQueueSettings = {
|
||||
mode: "steer-backlog",
|
||||
debounceMs: DEFAULT_DEBOUNCE_MS,
|
||||
cap: DEFAULT_CAP,
|
||||
dropPolicy: DEFAULT_DROP_POLICY,
|
||||
};
|
||||
|
||||
// ============================================================================
|
||||
// Module state
|
||||
// ============================================================================
|
||||
|
||||
const ANNOUNCE_QUEUES = new Map<string, AnnounceQueueState>();
|
||||
|
||||
// ============================================================================
|
||||
// Public API
|
||||
// ============================================================================
|
||||
|
||||
/**
|
||||
* Enqueue an announcement for delivery. Returns true if enqueued,
|
||||
* false if dropped (cap + "new" drop policy).
|
||||
*/
|
||||
export function enqueueAnnounce(params: {
|
||||
key: string;
|
||||
item: AnnounceQueueItem;
|
||||
settings: AnnounceQueueSettings;
|
||||
send: (item: AnnounceQueueItem) => Promise<void>;
|
||||
}): boolean {
|
||||
const queue = getOrCreateQueue(params.key, params.settings, params.send);
|
||||
queue.lastEnqueuedAt = Date.now();
|
||||
|
||||
const shouldEnqueue = applyDropPolicy(queue, params.item);
|
||||
if (!shouldEnqueue) {
|
||||
if (queue.dropPolicy === "new") {
|
||||
scheduleAnnounceDrain(params.key);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
queue.items.push(params.item);
|
||||
scheduleAnnounceDrain(params.key);
|
||||
return true;
|
||||
}
|
||||
|
||||
/** Reset all queues (for testing). */
|
||||
export function resetAnnounceQueuesForTests(): void {
|
||||
ANNOUNCE_QUEUES.clear();
|
||||
}
|
||||
|
||||
/** Get the current queue depth for a key (for testing/diagnostics). */
|
||||
export function getAnnounceQueueDepth(key: string): number {
|
||||
return ANNOUNCE_QUEUES.get(key)?.items.length ?? 0;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Queue management
|
||||
// ============================================================================
|
||||
|
||||
function getOrCreateQueue(
|
||||
key: string,
|
||||
settings: AnnounceQueueSettings,
|
||||
send: (item: AnnounceQueueItem) => Promise<void>,
|
||||
): AnnounceQueueState {
|
||||
const existing = ANNOUNCE_QUEUES.get(key);
|
||||
if (existing) {
|
||||
existing.mode = settings.mode;
|
||||
if (typeof settings.debounceMs === "number") {
|
||||
existing.debounceMs = Math.max(0, settings.debounceMs);
|
||||
}
|
||||
if (typeof settings.cap === "number" && settings.cap > 0) {
|
||||
existing.cap = Math.floor(settings.cap);
|
||||
}
|
||||
if (settings.dropPolicy) {
|
||||
existing.dropPolicy = settings.dropPolicy;
|
||||
}
|
||||
existing.send = send;
|
||||
return existing;
|
||||
}
|
||||
|
||||
const created: AnnounceQueueState = {
|
||||
items: [],
|
||||
draining: false,
|
||||
lastEnqueuedAt: 0,
|
||||
mode: settings.mode,
|
||||
debounceMs:
|
||||
typeof settings.debounceMs === "number"
|
||||
? Math.max(0, settings.debounceMs)
|
||||
: DEFAULT_DEBOUNCE_MS,
|
||||
cap:
|
||||
typeof settings.cap === "number" && settings.cap > 0
|
||||
? Math.floor(settings.cap)
|
||||
: DEFAULT_CAP,
|
||||
dropPolicy: settings.dropPolicy ?? DEFAULT_DROP_POLICY,
|
||||
droppedCount: 0,
|
||||
summaryLines: [],
|
||||
send,
|
||||
};
|
||||
ANNOUNCE_QUEUES.set(key, created);
|
||||
return created;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Drop policy
|
||||
// ============================================================================
|
||||
|
||||
function applyDropPolicy(
|
||||
queue: AnnounceQueueState,
|
||||
item: AnnounceQueueItem,
|
||||
): boolean {
|
||||
if (queue.items.length < queue.cap) {
|
||||
return true;
|
||||
}
|
||||
|
||||
switch (queue.dropPolicy) {
|
||||
case "new":
|
||||
// Reject the incoming item
|
||||
return false;
|
||||
|
||||
case "old": {
|
||||
// Drop the oldest item to make room
|
||||
const dropped = queue.items.shift();
|
||||
if (dropped) {
|
||||
queue.droppedCount++;
|
||||
const summary = dropped.summaryLine?.trim() || dropped.prompt.slice(0, 80);
|
||||
queue.summaryLines.push(summary);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
case "summarize": {
|
||||
// Drop the oldest item but keep a summary
|
||||
const dropped = queue.items.shift();
|
||||
if (dropped) {
|
||||
queue.droppedCount++;
|
||||
const summary = dropped.summaryLine?.trim() || dropped.prompt.slice(0, 80);
|
||||
queue.summaryLines.push(summary);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
default:
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Drain scheduling
|
||||
// ============================================================================
|
||||
|
||||
function scheduleAnnounceDrain(key: string): void {
|
||||
const queue = ANNOUNCE_QUEUES.get(key);
|
||||
if (!queue || queue.draining) return;
|
||||
|
||||
queue.draining = true;
|
||||
void (async () => {
|
||||
try {
|
||||
while (queue.items.length > 0 || queue.droppedCount > 0) {
|
||||
await waitForDebounce(queue);
|
||||
|
||||
if (queue.mode === "collect") {
|
||||
// Batch all items into one combined prompt
|
||||
const items = queue.items.splice(0, queue.items.length);
|
||||
const summary = buildDropSummary(queue);
|
||||
const prompt = buildCollectPrompt(items, summary);
|
||||
const last = items.at(-1);
|
||||
if (!last) break;
|
||||
await queue.send({ ...last, prompt });
|
||||
continue;
|
||||
}
|
||||
|
||||
// followup / steer-backlog: send items individually
|
||||
const summary = buildDropSummary(queue);
|
||||
if (summary) {
|
||||
const next = queue.items.shift();
|
||||
if (!next) break;
|
||||
await queue.send({ ...next, prompt: summary });
|
||||
continue;
|
||||
}
|
||||
|
||||
const next = queue.items.shift();
|
||||
if (!next) break;
|
||||
await queue.send(next);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`[AnnounceQueue] Drain failed for ${key}: ${String(err)}`);
|
||||
} finally {
|
||||
queue.draining = false;
|
||||
if (queue.items.length === 0 && queue.droppedCount === 0) {
|
||||
ANNOUNCE_QUEUES.delete(key);
|
||||
} else {
|
||||
scheduleAnnounceDrain(key);
|
||||
}
|
||||
}
|
||||
})();
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Helpers
|
||||
// ============================================================================
|
||||
|
||||
function waitForDebounce(queue: AnnounceQueueState): Promise<void> {
|
||||
const elapsed = Date.now() - queue.lastEnqueuedAt;
|
||||
const remaining = Math.max(0, queue.debounceMs - elapsed);
|
||||
if (remaining <= 0) return Promise.resolve();
|
||||
return new Promise((resolve) => setTimeout(resolve, remaining));
|
||||
}
|
||||
|
||||
function buildDropSummary(queue: AnnounceQueueState): string | undefined {
|
||||
if (queue.droppedCount === 0) return undefined;
|
||||
|
||||
const parts: string[] = [
|
||||
`[${queue.droppedCount} earlier announce(s) were summarized due to queue backlog]`,
|
||||
];
|
||||
if (queue.summaryLines.length > 0) {
|
||||
parts.push("");
|
||||
for (const line of queue.summaryLines) {
|
||||
parts.push(`- ${line}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Reset counters
|
||||
queue.droppedCount = 0;
|
||||
queue.summaryLines = [];
|
||||
|
||||
return parts.join("\n");
|
||||
}
|
||||
|
||||
function buildCollectPrompt(
|
||||
items: AnnounceQueueItem[],
|
||||
dropSummary: string | undefined,
|
||||
): string {
|
||||
const parts: string[] = [
|
||||
`[${items.length} queued announce(s) while agent was busy]`,
|
||||
"",
|
||||
];
|
||||
|
||||
for (let i = 0; i < items.length; i++) {
|
||||
parts.push(`---`);
|
||||
parts.push(`Queued #${i + 1}`);
|
||||
parts.push(items[i]!.prompt);
|
||||
parts.push("");
|
||||
}
|
||||
|
||||
if (dropSummary) {
|
||||
parts.push(dropSummary);
|
||||
parts.push("");
|
||||
}
|
||||
|
||||
return parts.join("\n");
|
||||
}
|
||||
|
|
@ -1,294 +0,0 @@
|
|||
import { describe, it, expect } from "vitest";
|
||||
import { buildSubagentSystemPrompt, formatAnnouncementMessage, formatCoalescedAnnouncementMessage } from "./announce.js";
|
||||
import type { FormatAnnouncementParams } from "./announce.js";
|
||||
import type { SubagentRunRecord } from "./types.js";
|
||||
|
||||
describe("buildSubagentSystemPrompt", () => {
|
||||
it("includes task and session context", () => {
|
||||
const prompt = buildSubagentSystemPrompt({
|
||||
requesterSessionId: "parent-123",
|
||||
childSessionId: "child-456",
|
||||
task: "Analyze the auth module for security issues",
|
||||
});
|
||||
|
||||
expect(prompt).toContain("## Subagent Rules");
|
||||
expect(prompt).toContain("Analyze the auth module for security issues");
|
||||
expect(prompt).toContain("parent-123");
|
||||
expect(prompt).toContain("child-456");
|
||||
expect(prompt).toContain("Do NOT spawn nested subagents");
|
||||
expect(prompt).toContain("## Safety");
|
||||
});
|
||||
|
||||
it("includes label when provided", () => {
|
||||
const prompt = buildSubagentSystemPrompt({
|
||||
requesterSessionId: "parent-123",
|
||||
childSessionId: "child-456",
|
||||
label: "Security Audit",
|
||||
task: "Check for vulnerabilities",
|
||||
});
|
||||
|
||||
expect(prompt).toContain('Label: "Security Audit"');
|
||||
});
|
||||
|
||||
it("omits label line when not provided", () => {
|
||||
const prompt = buildSubagentSystemPrompt({
|
||||
requesterSessionId: "parent-123",
|
||||
childSessionId: "child-456",
|
||||
task: "Do something",
|
||||
});
|
||||
|
||||
expect(prompt).not.toContain("Label:");
|
||||
});
|
||||
});
|
||||
|
||||
describe("formatAnnouncementMessage", () => {
|
||||
const baseParams: FormatAnnouncementParams = {
|
||||
runId: "run-1",
|
||||
childSessionId: "child-456",
|
||||
requesterSessionId: "parent-123",
|
||||
task: "Analyze code",
|
||||
label: "Code Analysis",
|
||||
cleanup: "delete",
|
||||
outcome: { status: "ok" },
|
||||
startedAt: 1000000,
|
||||
endedAt: 1030000,
|
||||
};
|
||||
|
||||
it("formats successful completion", () => {
|
||||
const msg = formatAnnouncementMessage({
|
||||
...baseParams,
|
||||
findings: "Found 3 issues in the auth module.",
|
||||
});
|
||||
|
||||
expect(msg).toContain('"Code Analysis" just completed successfully');
|
||||
expect(msg).toContain("Found 3 issues in the auth module.");
|
||||
expect(msg).toContain("runtime 30s");
|
||||
expect(msg).toContain("session child-456");
|
||||
});
|
||||
|
||||
it("formats error outcome", () => {
|
||||
const msg = formatAnnouncementMessage({
|
||||
...baseParams,
|
||||
outcome: { status: "error", error: "API key expired" },
|
||||
});
|
||||
|
||||
expect(msg).toContain("failed: API key expired");
|
||||
});
|
||||
|
||||
it("formats timeout outcome", () => {
|
||||
const msg = formatAnnouncementMessage({
|
||||
...baseParams,
|
||||
outcome: { status: "timeout" },
|
||||
});
|
||||
|
||||
expect(msg).toContain("timed out");
|
||||
});
|
||||
|
||||
it("shows (no output) when findings is not provided", () => {
|
||||
const msg = formatAnnouncementMessage(baseParams);
|
||||
|
||||
expect(msg).toContain("(no output)");
|
||||
});
|
||||
|
||||
it("uses task text when label is not provided", () => {
|
||||
const paramsNoLabel: FormatAnnouncementParams = {
|
||||
...baseParams,
|
||||
label: undefined,
|
||||
};
|
||||
const msg = formatAnnouncementMessage(paramsNoLabel);
|
||||
|
||||
expect(msg).toContain('"Analyze code"');
|
||||
});
|
||||
|
||||
it("formats runtime for minutes", () => {
|
||||
const msg = formatAnnouncementMessage({
|
||||
...baseParams,
|
||||
startedAt: 1000000,
|
||||
endedAt: 1150000, // 150 seconds = 2m30s
|
||||
});
|
||||
|
||||
expect(msg).toContain("runtime 2m30s");
|
||||
});
|
||||
|
||||
it("formats runtime for hours", () => {
|
||||
const msg = formatAnnouncementMessage({
|
||||
...baseParams,
|
||||
startedAt: 1000000,
|
||||
endedAt: 4600000, // 3600 seconds = 1h
|
||||
});
|
||||
|
||||
expect(msg).toContain("runtime 1h");
|
||||
});
|
||||
|
||||
it("includes summarization instruction", () => {
|
||||
const msg = formatAnnouncementMessage(baseParams);
|
||||
|
||||
expect(msg).toContain("Summarize this naturally for the user");
|
||||
expect(msg).toContain("NO_REPLY");
|
||||
});
|
||||
});
|
||||
|
||||
describe("formatCoalescedAnnouncementMessage", () => {
|
||||
function makeRecord(overrides: Partial<SubagentRunRecord> = {}): SubagentRunRecord {
|
||||
return {
|
||||
runId: "run-1",
|
||||
childSessionId: "child-1",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Default task",
|
||||
cleanup: "delete",
|
||||
createdAt: 1000000,
|
||||
startedAt: 1000000,
|
||||
endedAt: 1030000,
|
||||
outcome: { status: "ok" },
|
||||
findings: "Some findings",
|
||||
findingsCaptured: true,
|
||||
announced: false,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
it("delegates to formatAnnouncementMessage for a single record", () => {
|
||||
const record = makeRecord({ label: "Code Analysis" });
|
||||
const coalesced = formatCoalescedAnnouncementMessage([record]);
|
||||
const direct = formatAnnouncementMessage({
|
||||
runId: record.runId,
|
||||
childSessionId: record.childSessionId,
|
||||
requesterSessionId: record.requesterSessionId,
|
||||
task: record.task,
|
||||
label: record.label,
|
||||
cleanup: record.cleanup,
|
||||
outcome: record.outcome,
|
||||
startedAt: record.startedAt,
|
||||
endedAt: record.endedAt,
|
||||
findings: record.findings,
|
||||
});
|
||||
|
||||
expect(coalesced).toBe(direct);
|
||||
});
|
||||
|
||||
it("formats multiple records with all task findings and stats", () => {
|
||||
const records = [
|
||||
makeRecord({
|
||||
runId: "run-1",
|
||||
childSessionId: "child-1",
|
||||
label: "Task A",
|
||||
findings: "Found issue A",
|
||||
startedAt: 1000000,
|
||||
endedAt: 1030000,
|
||||
}),
|
||||
makeRecord({
|
||||
runId: "run-2",
|
||||
childSessionId: "child-2",
|
||||
label: "Task B",
|
||||
findings: "Found issue B",
|
||||
startedAt: 1000000,
|
||||
endedAt: 1045000, // 45 seconds
|
||||
}),
|
||||
];
|
||||
|
||||
const msg = formatCoalescedAnnouncementMessage(records);
|
||||
|
||||
expect(msg).toContain("All 2 background task(s) have completed");
|
||||
expect(msg).toContain('Task 1: "Task A"');
|
||||
expect(msg).toContain("Found issue A");
|
||||
expect(msg).toContain('Task 2: "Task B"');
|
||||
expect(msg).toContain("Found issue B");
|
||||
expect(msg).toContain("Total wall time: 45s");
|
||||
expect(msg).toContain("2 succeeded, 0 failed");
|
||||
});
|
||||
|
||||
it("reports mixed outcomes correctly", () => {
|
||||
const records = [
|
||||
makeRecord({ runId: "run-1", label: "OK Task", outcome: { status: "ok" } }),
|
||||
makeRecord({ runId: "run-2", label: "Failed Task", outcome: { status: "error", error: "crash" } }),
|
||||
makeRecord({ runId: "run-3", label: "Timeout Task", outcome: { status: "timeout" } }),
|
||||
];
|
||||
|
||||
const msg = formatCoalescedAnnouncementMessage(records);
|
||||
|
||||
expect(msg).toContain("completed successfully");
|
||||
expect(msg).toContain("failed: crash");
|
||||
expect(msg).toContain("timed out");
|
||||
expect(msg).toContain("1 succeeded, 2 failed");
|
||||
});
|
||||
|
||||
it("shows (no output) for missing findings", () => {
|
||||
const records = [
|
||||
makeRecord({ runId: "run-1", findings: undefined }),
|
||||
makeRecord({ runId: "run-2", findings: "Has output" }),
|
||||
];
|
||||
|
||||
const msg = formatCoalescedAnnouncementMessage(records);
|
||||
|
||||
expect(msg).toContain("(no output)");
|
||||
expect(msg).toContain("Has output");
|
||||
});
|
||||
|
||||
it("includes combined summary instruction for multi-record", () => {
|
||||
const records = [
|
||||
makeRecord({ runId: "run-1" }),
|
||||
makeRecord({ runId: "run-2" }),
|
||||
];
|
||||
|
||||
const msg = formatCoalescedAnnouncementMessage(records);
|
||||
|
||||
expect(msg).toContain("MUST include findings from every task item above");
|
||||
expect(msg).toContain("NO_REPLY");
|
||||
});
|
||||
|
||||
it("includes raw findings for every task in coalesced payload", () => {
|
||||
const records = [
|
||||
makeRecord({ runId: "run-1", label: "南京天气", findings: "南京:晴,12°C" }),
|
||||
makeRecord({ runId: "run-2", label: "上海天气", findings: "上海:多云,9°C" }),
|
||||
];
|
||||
|
||||
const msg = formatCoalescedAnnouncementMessage(records);
|
||||
|
||||
expect(msg).toContain("Raw findings from each task (MUST cover all items):");
|
||||
expect(msg).toContain("[1] 南京天气:");
|
||||
expect(msg).toContain("南京:晴,12°C");
|
||||
expect(msg).toContain("[2] 上海天气:");
|
||||
expect(msg).toContain("上海:多云,9°C");
|
||||
expect(msg).toContain("MUST include findings from every task item above");
|
||||
});
|
||||
|
||||
it("includes continuation prompt when next is provided", () => {
|
||||
const records = [
|
||||
makeRecord({ runId: "run-1", label: "AAPL data", findings: "AAPL revenue: $100B" }),
|
||||
makeRecord({ runId: "run-2", label: "MSFT data", findings: "MSFT revenue: $200B" }),
|
||||
];
|
||||
|
||||
const msg = formatCoalescedAnnouncementMessage(records, "Summarize all data and write a PDF investment report");
|
||||
|
||||
expect(msg).toContain("CONTINUATION TASK");
|
||||
expect(msg).toContain("Summarize all data and write a PDF investment report");
|
||||
expect(msg).toContain("AAPL revenue: $100B");
|
||||
expect(msg).toContain("MSFT revenue: $200B");
|
||||
// Should NOT contain the default summarize instruction
|
||||
expect(msg).not.toContain("Summarize these results naturally for the user");
|
||||
});
|
||||
|
||||
it("uses continuation prompt even for single record when next is provided", () => {
|
||||
const records = [
|
||||
makeRecord({ runId: "run-1", label: "Data collection", findings: "All data collected" }),
|
||||
];
|
||||
|
||||
const msg = formatCoalescedAnnouncementMessage(records, "Generate the final report");
|
||||
|
||||
expect(msg).toContain("CONTINUATION TASK");
|
||||
expect(msg).toContain("Generate the final report");
|
||||
expect(msg).toContain("All data collected");
|
||||
});
|
||||
|
||||
it("uses default summarize instruction when next is not provided", () => {
|
||||
const records = [
|
||||
makeRecord({ runId: "run-1" }),
|
||||
makeRecord({ runId: "run-2" }),
|
||||
];
|
||||
|
||||
const msg = formatCoalescedAnnouncementMessage(records);
|
||||
|
||||
expect(msg).not.toContain("CONTINUATION TASK");
|
||||
expect(msg).toContain("Summarize these results naturally for the user");
|
||||
});
|
||||
});
|
||||
|
|
@ -1,424 +0,0 @@
|
|||
/**
|
||||
* Subagent announcement flow.
|
||||
*
|
||||
* Handles result propagation from child → parent agent:
|
||||
* - Builds system prompts for child agents
|
||||
* - Reads child session output
|
||||
* - Formats and delivers announcement messages
|
||||
*/
|
||||
|
||||
import { readEntries } from "../session/storage.js";
|
||||
import { getHub } from "../../hub/hub-singleton.js";
|
||||
import { buildSystemPrompt } from "../system-prompt/index.js";
|
||||
import type {
|
||||
SubagentAnnounceParams,
|
||||
SubagentRunOutcome,
|
||||
SubagentRunRecord,
|
||||
SubagentSystemPromptParams,
|
||||
} from "./types.js";
|
||||
import { enqueueAnnounce, DEFAULT_ANNOUNCE_SETTINGS } from "./announce-queue.js";
|
||||
|
||||
/**
|
||||
* Build the system prompt injected into a subagent session.
|
||||
* Uses the structured prompt builder with "minimal" mode.
|
||||
*/
|
||||
export function buildSubagentSystemPrompt(params: SubagentSystemPromptParams): string {
|
||||
return buildSystemPrompt({
|
||||
mode: "minimal",
|
||||
subagent: {
|
||||
requesterSessionId: params.requesterSessionId,
|
||||
childSessionId: params.childSessionId,
|
||||
label: params.label,
|
||||
task: params.task,
|
||||
},
|
||||
tools: params.tools,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the latest assistant reply from a session's JSONL file.
|
||||
*/
|
||||
export function readLatestAssistantReply(
|
||||
sessionId: string,
|
||||
options?: { baseDir?: string },
|
||||
): string | undefined {
|
||||
const entries = readEntries(sessionId, options);
|
||||
let latestToolResultText: string | undefined;
|
||||
|
||||
// Walk backwards to find the last non-empty assistant reply.
|
||||
// If no assistant text exists (e.g. run ended after tool execution),
|
||||
// fall back to the latest non-empty toolResult content.
|
||||
for (let i = entries.length - 1; i >= 0; i--) {
|
||||
const entry = entries[i]!;
|
||||
if (entry.type !== "message") continue;
|
||||
|
||||
const message = entry.message;
|
||||
if (message.role === "assistant") {
|
||||
const text = extractAssistantText(message);
|
||||
if (text) return text;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (message.role === "toolResult" && !latestToolResultText) {
|
||||
const text = extractToolResultText(message);
|
||||
if (text) latestToolResultText = text;
|
||||
}
|
||||
}
|
||||
|
||||
return latestToolResultText;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract text content from an assistant message.
|
||||
* AgentMessage.content for assistant is (TextContent | ThinkingContent | ToolCall)[].
|
||||
*/
|
||||
function extractAssistantText(message: { role: string; content: unknown }): string {
|
||||
return extractTextLikeContent(message.content);
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract text content from a toolResult message.
|
||||
*/
|
||||
function extractToolResultText(message: { role: string; content: unknown }): string {
|
||||
return extractTextLikeContent(message.content);
|
||||
}
|
||||
|
||||
function extractTextLikeContent(content: unknown): string {
|
||||
if (typeof content === "string") {
|
||||
return sanitizeText(content);
|
||||
}
|
||||
|
||||
if (!Array.isArray(content)) return "";
|
||||
|
||||
const textParts: string[] = [];
|
||||
for (const block of content) {
|
||||
if (!block || typeof block !== "object") continue;
|
||||
if ("text" in block) {
|
||||
textParts.push(String((block as { text: unknown }).text));
|
||||
}
|
||||
}
|
||||
|
||||
return sanitizeText(textParts.join("\n"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Strip thinking tags and tool markers from text.
|
||||
*/
|
||||
function sanitizeText(text: string): string {
|
||||
return text
|
||||
.replace(/<thinking>[\s\S]*?<\/thinking>/g, "")
|
||||
.replace(/<tool_call>[\s\S]*?<\/tool_call>/g, "")
|
||||
.trim();
|
||||
}
|
||||
|
||||
/**
|
||||
* Format the duration between two timestamps as a human-readable string.
|
||||
*/
|
||||
function formatDuration(startMs: number, endMs: number): string {
|
||||
const totalSeconds = Math.round((endMs - startMs) / 1000);
|
||||
if (totalSeconds < 60) return `${totalSeconds}s`;
|
||||
|
||||
const minutes = Math.floor(totalSeconds / 60);
|
||||
const seconds = totalSeconds % 60;
|
||||
if (minutes < 60) return seconds > 0 ? `${minutes}m${seconds}s` : `${minutes}m`;
|
||||
|
||||
const hours = Math.floor(minutes / 60);
|
||||
const remainingMinutes = minutes % 60;
|
||||
return remainingMinutes > 0 ? `${hours}h${remainingMinutes}m` : `${hours}h`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Format a status label from an outcome.
|
||||
*/
|
||||
function formatStatusLabel(outcome: SubagentRunOutcome | undefined): string {
|
||||
if (!outcome) return "completed with unknown status";
|
||||
switch (outcome.status) {
|
||||
case "ok":
|
||||
return "completed successfully";
|
||||
case "error":
|
||||
return outcome.error ? `failed: ${outcome.error}` : "failed";
|
||||
case "timeout":
|
||||
return "timed out";
|
||||
default:
|
||||
return "completed with unknown status";
|
||||
}
|
||||
}
|
||||
|
||||
/** Parameters for formatAnnouncementMessage */
|
||||
export interface FormatAnnouncementParams {
|
||||
runId: string;
|
||||
childSessionId: string;
|
||||
requesterSessionId: string;
|
||||
task: string;
|
||||
label?: string | undefined;
|
||||
cleanup: "delete" | "keep";
|
||||
outcome?: SubagentRunOutcome | undefined;
|
||||
startedAt?: number | undefined;
|
||||
endedAt?: number | undefined;
|
||||
findings?: string | undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Format the announcement message sent to the parent agent.
|
||||
*/
|
||||
export function formatAnnouncementMessage(params: FormatAnnouncementParams): string {
|
||||
const { task, label, outcome, findings, startedAt, endedAt, childSessionId } = params;
|
||||
const displayName = label || task.slice(0, 60);
|
||||
const statusLabel = formatStatusLabel(outcome);
|
||||
|
||||
const parts: string[] = [
|
||||
`A background task "${displayName}" just ${statusLabel}.`,
|
||||
"",
|
||||
"Findings:",
|
||||
findings || "(no output)",
|
||||
];
|
||||
|
||||
// Stats line
|
||||
const stats: string[] = [];
|
||||
if (startedAt && endedAt) {
|
||||
stats.push(`runtime ${formatDuration(startedAt, endedAt)}`);
|
||||
}
|
||||
stats.push(`session ${childSessionId}`);
|
||||
|
||||
parts.push("", `Stats: ${stats.join(" • ")}`);
|
||||
|
||||
parts.push(
|
||||
"",
|
||||
"Summarize this naturally for the user. Keep it brief (1-2 sentences).",
|
||||
"Flow it into the conversation naturally.",
|
||||
"Do not mention technical details like session IDs or that this was a background task.",
|
||||
"You can respond with NO_REPLY if no announcement is needed (e.g., internal task with no user-facing result).",
|
||||
);
|
||||
|
||||
return parts.join("\n");
|
||||
}
|
||||
|
||||
/**
|
||||
* Format a coalesced announcement message from multiple completed subagent runs.
|
||||
* When only one record is provided, delegates to formatAnnouncementMessage.
|
||||
*
|
||||
* @param next — Optional continuation prompt from a SubagentGroup. When present,
|
||||
* the parent agent is instructed to execute the continuation using the combined
|
||||
* findings, rather than just summarizing.
|
||||
*/
|
||||
export function formatCoalescedAnnouncementMessage(
|
||||
records: SubagentRunRecord[],
|
||||
next?: string,
|
||||
): string {
|
||||
// Single record without continuation: delegate to existing format
|
||||
if (records.length === 1 && !next) {
|
||||
const r = records[0]!;
|
||||
return formatAnnouncementMessage({
|
||||
runId: r.runId,
|
||||
childSessionId: r.childSessionId,
|
||||
requesterSessionId: r.requesterSessionId,
|
||||
task: r.task,
|
||||
label: r.label,
|
||||
cleanup: r.cleanup,
|
||||
outcome: r.outcome,
|
||||
startedAt: r.startedAt,
|
||||
endedAt: r.endedAt,
|
||||
findings: r.findings,
|
||||
});
|
||||
}
|
||||
|
||||
// Multiple records (or single with continuation): build combined message.
|
||||
const parts: string[] = [
|
||||
`All ${records.length} background task(s) have completed. Here are the combined results:`,
|
||||
"",
|
||||
];
|
||||
|
||||
for (let i = 0; i < records.length; i++) {
|
||||
const r = records[i]!;
|
||||
const displayName = r.label || r.task.slice(0, 60);
|
||||
const statusLabel = formatStatusLabel(r.outcome);
|
||||
const durationStr = (r.startedAt && r.endedAt)
|
||||
? ` (${formatDuration(r.startedAt, r.endedAt)})`
|
||||
: "";
|
||||
|
||||
parts.push(
|
||||
`### Task ${i + 1}: "${displayName}"`,
|
||||
`Status: ${statusLabel}${durationStr}`,
|
||||
"",
|
||||
"Findings:",
|
||||
r.findings || "(no output)",
|
||||
"",
|
||||
);
|
||||
}
|
||||
|
||||
// Overall stats
|
||||
const allStartTimes = records.map(r => r.startedAt).filter(Boolean) as number[];
|
||||
const allEndTimes = records.map(r => r.endedAt).filter(Boolean) as number[];
|
||||
if (allStartTimes.length > 0 && allEndTimes.length > 0) {
|
||||
const wallTime = formatDuration(Math.min(...allStartTimes), Math.max(...allEndTimes));
|
||||
parts.push(`Total wall time: ${wallTime}`);
|
||||
}
|
||||
|
||||
const okCount = records.filter(r => r.outcome?.status === "ok").length;
|
||||
const failCount = records.length - okCount;
|
||||
parts.push(`Results: ${okCount} succeeded, ${failCount} failed/timed out`);
|
||||
|
||||
parts.push("", "Raw findings from each task (MUST cover all items):", "");
|
||||
for (let i = 0; i < records.length; i++) {
|
||||
const r = records[i]!;
|
||||
const displayName = r.label || r.task.slice(0, 60);
|
||||
parts.push(
|
||||
`[${i + 1}] ${displayName}:`,
|
||||
r.findings || "(no output)",
|
||||
"",
|
||||
);
|
||||
}
|
||||
|
||||
// Continuation vs. summarization
|
||||
if (next) {
|
||||
parts.push(
|
||||
"",
|
||||
"---",
|
||||
"",
|
||||
"CONTINUATION TASK: The user's original request requires further work using the findings above.",
|
||||
"Execute the following task now, using ALL the collected data:",
|
||||
"",
|
||||
next,
|
||||
"",
|
||||
"Use the raw findings above as your data source. Call tools as needed to complete this task.",
|
||||
"Do not mention technical details like session IDs or that these were background tasks.",
|
||||
);
|
||||
} else {
|
||||
parts.push(
|
||||
"",
|
||||
"Summarize these results naturally for the user.",
|
||||
"You MUST include findings from every task item above, without omission.",
|
||||
"Keep it concise, but preserve concrete findings from each task.",
|
||||
"Do not mention technical details like session IDs or that these were background tasks.",
|
||||
"You can respond with NO_REPLY if no announcement is needed.",
|
||||
);
|
||||
}
|
||||
|
||||
return parts.join("\n");
|
||||
}
|
||||
|
||||
/**
|
||||
* Run the coalesced announcement flow for all completed runs of a requester.
|
||||
* Uses two-tier delivery:
|
||||
* 1. Queue — if parent is busy (running or has pending writes), queue for
|
||||
* later drain via writeInternal (with debounce to batch nearby completions)
|
||||
* 2. Direct — if parent is idle, send immediately via writeInternal
|
||||
*
|
||||
* All delivery uses writeInternal() which marks the announcement prompt as
|
||||
* `internal: true` (hidden from UI). The assistant's summary response is
|
||||
* forwarded to the real-time stream (`forwardAssistant: true`) so the user
|
||||
* sees the result, and persisted to JSONL for future session loads.
|
||||
*/
|
||||
export function runCoalescedAnnounceFlow(
|
||||
requesterSessionId: string,
|
||||
records: SubagentRunRecord[],
|
||||
next?: string,
|
||||
): boolean {
|
||||
const message = formatCoalescedAnnouncementMessage(records, next);
|
||||
|
||||
try {
|
||||
const hub = getHub();
|
||||
const parentAgent = hub.getAgent(requesterSessionId);
|
||||
if (!parentAgent || parentAgent.closed) {
|
||||
console.warn(
|
||||
`[SubagentAnnounce] Parent agent not found or closed: ${requesterSessionId}`,
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
// Tier 1: BUSY — parent is running or has pending writes
|
||||
// Queue the announcement for delivery via writeInternal() after the parent
|
||||
// finishes its current work. We do NOT use steer() (cancels unrelated tool
|
||||
// calls) or followUp() (doesn't mark entries as internal, polluting the UI).
|
||||
if (parentAgent.isRunning || parentAgent.getPendingWrites() > 0) {
|
||||
enqueueAnnounce({
|
||||
key: requesterSessionId,
|
||||
item: {
|
||||
prompt: message,
|
||||
summaryLine: `${records.length} subagent(s) completed`,
|
||||
enqueuedAt: Date.now(),
|
||||
requesterSessionId,
|
||||
},
|
||||
settings: DEFAULT_ANNOUNCE_SETTINGS,
|
||||
send: async (item) => sendAnnounceDirect(requesterSessionId, item.prompt),
|
||||
});
|
||||
console.log(`[SubagentAnnounce] Queued announcement for parent: ${requesterSessionId}`);
|
||||
return true;
|
||||
}
|
||||
|
||||
// Tier 2: IDLE — parent is idle, send directly via writeInternal
|
||||
sendAnnounceDirect(requesterSessionId, message);
|
||||
return true;
|
||||
} catch (err) {
|
||||
console.error(`[SubagentAnnounce] Failed to coalesced-announce to parent:`, err);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send announcement directly to parent via writeInternal.
|
||||
* Used as Tier 3 (idle) and as the queue drain callback.
|
||||
*/
|
||||
function sendAnnounceDirect(requesterSessionId: string, message: string): void {
|
||||
try {
|
||||
const hub = getHub();
|
||||
const parentAgent = hub.getAgent(requesterSessionId);
|
||||
if (!parentAgent || parentAgent.closed) {
|
||||
console.warn(
|
||||
`[SubagentAnnounce] Parent agent not found or closed for direct send: ${requesterSessionId}`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
parentAgent.writeInternal(message, { forwardAssistant: true, persistResponse: true });
|
||||
} catch (err) {
|
||||
console.error(`[SubagentAnnounce] Failed direct announce to parent:`, err);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Run the full subagent announcement flow:
|
||||
* 1. Read child's last assistant reply
|
||||
* 2. Format announcement message
|
||||
* 3. Send to parent agent via Hub
|
||||
*
|
||||
* @deprecated Use runCoalescedAnnounceFlow instead, which supports
|
||||
* batching multiple completed runs into a single announcement.
|
||||
*/
|
||||
export function runSubagentAnnounceFlow(params: SubagentAnnounceParams): boolean {
|
||||
const { requesterSessionId, childSessionId } = params;
|
||||
|
||||
// Read child's final output
|
||||
const findings = readLatestAssistantReply(childSessionId);
|
||||
|
||||
// Format the announcement
|
||||
const message = formatAnnouncementMessage({
|
||||
runId: params.runId,
|
||||
childSessionId: params.childSessionId,
|
||||
requesterSessionId: params.requesterSessionId,
|
||||
task: params.task,
|
||||
label: params.label,
|
||||
cleanup: params.cleanup,
|
||||
outcome: params.outcome,
|
||||
startedAt: params.startedAt,
|
||||
endedAt: params.endedAt,
|
||||
findings,
|
||||
});
|
||||
|
||||
// Deliver to parent agent via Hub
|
||||
try {
|
||||
const hub = getHub();
|
||||
const parentAgent = hub.getAgent(requesterSessionId);
|
||||
if (!parentAgent || parentAgent.closed) {
|
||||
console.warn(
|
||||
`[SubagentAnnounce] Parent agent not found or closed: ${requesterSessionId}`,
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
parentAgent.writeInternal(message, { forwardAssistant: true, persistResponse: true });
|
||||
return true;
|
||||
} catch (err) {
|
||||
console.error(`[SubagentAnnounce] Failed to announce to parent:`, err);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
@ -1,117 +0,0 @@
|
|||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import {
|
||||
enqueueInLane,
|
||||
getLaneSize,
|
||||
clearLane,
|
||||
setLaneConcurrency,
|
||||
resetLanesForTests,
|
||||
} from "./command-queue.js";
|
||||
|
||||
afterEach(() => {
|
||||
resetLanesForTests();
|
||||
});
|
||||
|
||||
describe("command queue", () => {
|
||||
it("runs tasks serially by default (maxConcurrent = 1)", async () => {
|
||||
let active = 0;
|
||||
let maxActive = 0;
|
||||
const order: number[] = [];
|
||||
|
||||
const makeTask = (id: number) => async () => {
|
||||
active += 1;
|
||||
maxActive = Math.max(maxActive, active);
|
||||
order.push(id);
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
active -= 1;
|
||||
return id;
|
||||
};
|
||||
|
||||
const results = await Promise.all([
|
||||
enqueueInLane("test", makeTask(1)),
|
||||
enqueueInLane("test", makeTask(2)),
|
||||
enqueueInLane("test", makeTask(3)),
|
||||
]);
|
||||
|
||||
expect(results).toEqual([1, 2, 3]);
|
||||
expect(order).toEqual([1, 2, 3]);
|
||||
expect(maxActive).toBe(1);
|
||||
});
|
||||
|
||||
it("respects maxConcurrent limit", async () => {
|
||||
setLaneConcurrency("test", 3);
|
||||
|
||||
let active = 0;
|
||||
let maxActive = 0;
|
||||
|
||||
const makeTask = (id: number) => async () => {
|
||||
active += 1;
|
||||
maxActive = Math.max(maxActive, active);
|
||||
await new Promise((r) => setTimeout(r, 20));
|
||||
active -= 1;
|
||||
return id;
|
||||
};
|
||||
|
||||
const results = await Promise.all([
|
||||
enqueueInLane("test", makeTask(1)),
|
||||
enqueueInLane("test", makeTask(2)),
|
||||
enqueueInLane("test", makeTask(3)),
|
||||
enqueueInLane("test", makeTask(4)),
|
||||
enqueueInLane("test", makeTask(5)),
|
||||
]);
|
||||
|
||||
expect(results).toEqual([1, 2, 3, 4, 5]);
|
||||
expect(maxActive).toBe(3);
|
||||
});
|
||||
|
||||
it("reports correct lane size", async () => {
|
||||
setLaneConcurrency("test", 1);
|
||||
|
||||
let resolveFirst!: () => void;
|
||||
const blocker = new Promise<void>((r) => {
|
||||
resolveFirst = r;
|
||||
});
|
||||
|
||||
// First task blocks the lane
|
||||
const p1 = enqueueInLane("test", () => blocker);
|
||||
// Second task queued
|
||||
const p2 = enqueueInLane("test", async () => "done");
|
||||
|
||||
// 1 active + 1 queued = 2
|
||||
expect(getLaneSize("test")).toBe(2);
|
||||
|
||||
resolveFirst();
|
||||
await Promise.all([p1, p2]);
|
||||
|
||||
expect(getLaneSize("test")).toBe(0);
|
||||
});
|
||||
|
||||
it("clears pending tasks", async () => {
|
||||
setLaneConcurrency("test", 1);
|
||||
|
||||
let resolveFirst!: () => void;
|
||||
const blocker = new Promise<void>((r) => {
|
||||
resolveFirst = r;
|
||||
});
|
||||
|
||||
const p1 = enqueueInLane("test", () => blocker);
|
||||
const p2 = enqueueInLane("test", async () => "should-not-run");
|
||||
const p3 = enqueueInLane("test", async () => "should-not-run");
|
||||
|
||||
const removed = clearLane("test");
|
||||
expect(removed).toBe(2);
|
||||
|
||||
resolveFirst();
|
||||
await p1;
|
||||
|
||||
// p2 and p3 should reject
|
||||
await expect(p2).rejects.toThrow("Lane cleared");
|
||||
await expect(p3).rejects.toThrow("Lane cleared");
|
||||
|
||||
expect(getLaneSize("test")).toBe(0);
|
||||
});
|
||||
|
||||
it("returns 0 for unknown lane", () => {
|
||||
expect(getLaneSize("nonexistent")).toBe(0);
|
||||
expect(clearLane("nonexistent")).toBe(0);
|
||||
});
|
||||
});
|
||||
|
|
@ -1,158 +0,0 @@
|
|||
/**
|
||||
* Lane-based command queue for subagent concurrency control.
|
||||
*
|
||||
* Each lane maintains an independent queue with a configurable max-concurrency
|
||||
* limit. Tasks beyond the limit are queued and executed FIFO as slots free up.
|
||||
*
|
||||
* Adapted from OpenClaw's process/command-queue.ts.
|
||||
*/
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Types
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type QueueEntry = {
|
||||
task: () => Promise<unknown>;
|
||||
resolve: (value: unknown) => void;
|
||||
reject: (reason?: unknown) => void;
|
||||
enqueuedAt: number;
|
||||
warnAfterMs: number;
|
||||
onWait?: (waitMs: number, queuedAhead: number) => void;
|
||||
};
|
||||
|
||||
type LaneState = {
|
||||
lane: string;
|
||||
queue: QueueEntry[];
|
||||
active: number;
|
||||
maxConcurrent: number;
|
||||
draining: boolean;
|
||||
};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Module state
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const lanes = new Map<string, LaneState>();
|
||||
|
||||
function getLaneState(lane: string): LaneState {
|
||||
const existing = lanes.get(lane);
|
||||
if (existing) return existing;
|
||||
|
||||
const created: LaneState = {
|
||||
lane,
|
||||
queue: [],
|
||||
active: 0,
|
||||
maxConcurrent: 1,
|
||||
draining: false,
|
||||
};
|
||||
lanes.set(lane, created);
|
||||
return created;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Drain / pump
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function drainLane(lane: string): void {
|
||||
const state = getLaneState(lane);
|
||||
if (state.draining) return;
|
||||
state.draining = true;
|
||||
|
||||
const pump = () => {
|
||||
while (state.active < state.maxConcurrent && state.queue.length > 0) {
|
||||
const entry = state.queue.shift() as QueueEntry;
|
||||
const waitedMs = Date.now() - entry.enqueuedAt;
|
||||
|
||||
if (waitedMs >= entry.warnAfterMs) {
|
||||
entry.onWait?.(waitedMs, state.queue.length);
|
||||
console.warn(
|
||||
`[CommandQueue] lane wait exceeded: lane=${lane} waitedMs=${waitedMs} queueAhead=${state.queue.length}`,
|
||||
);
|
||||
}
|
||||
|
||||
state.active += 1;
|
||||
|
||||
void (async () => {
|
||||
try {
|
||||
const result = await entry.task();
|
||||
state.active -= 1;
|
||||
pump();
|
||||
entry.resolve(result);
|
||||
} catch (err) {
|
||||
state.active -= 1;
|
||||
pump();
|
||||
entry.reject(err);
|
||||
}
|
||||
})();
|
||||
}
|
||||
state.draining = false;
|
||||
};
|
||||
|
||||
pump();
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Public API
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/** Set (or update) the max concurrency for a lane. Triggers a drain. */
|
||||
export function setLaneConcurrency(lane: string, maxConcurrent: number): void {
|
||||
const state = getLaneState(lane);
|
||||
state.maxConcurrent = Math.max(1, Math.floor(maxConcurrent));
|
||||
drainLane(lane);
|
||||
}
|
||||
|
||||
/** Enqueue a task in a specific lane. Returns a promise that resolves with the task's return value. */
|
||||
export function enqueueInLane<T>(
|
||||
lane: string,
|
||||
task: () => Promise<T>,
|
||||
opts?: {
|
||||
warnAfterMs?: number;
|
||||
onWait?: (waitMs: number, queuedAhead: number) => void;
|
||||
},
|
||||
): Promise<T> {
|
||||
const warnAfterMs = opts?.warnAfterMs ?? 5_000;
|
||||
const state = getLaneState(lane);
|
||||
|
||||
return new Promise<T>((resolve, reject) => {
|
||||
state.queue.push({
|
||||
task: () => task(),
|
||||
resolve: (value) => resolve(value as T),
|
||||
reject,
|
||||
enqueuedAt: Date.now(),
|
||||
warnAfterMs,
|
||||
onWait: opts?.onWait,
|
||||
});
|
||||
drainLane(lane);
|
||||
});
|
||||
}
|
||||
|
||||
/** Number of active + queued tasks in a lane. */
|
||||
export function getLaneSize(lane: string): number {
|
||||
const state = lanes.get(lane);
|
||||
if (!state) return 0;
|
||||
return state.queue.length + state.active;
|
||||
}
|
||||
|
||||
/** Remove all pending (not yet active) tasks from a lane. Returns how many were removed. */
|
||||
export function clearLane(lane: string): number {
|
||||
const state = lanes.get(lane);
|
||||
if (!state) return 0;
|
||||
const removed = state.queue.length;
|
||||
// Reject pending tasks so callers aren't left hanging
|
||||
for (const entry of state.queue) {
|
||||
entry.reject(new Error("Lane cleared"));
|
||||
}
|
||||
state.queue.length = 0;
|
||||
return removed;
|
||||
}
|
||||
|
||||
/** Reset all lanes (for testing). */
|
||||
export function resetLanesForTests(): void {
|
||||
for (const state of lanes.values()) {
|
||||
for (const entry of state.queue) {
|
||||
entry.reject(new Error("Reset"));
|
||||
}
|
||||
}
|
||||
lanes.clear();
|
||||
}
|
||||
|
|
@ -1,40 +0,0 @@
|
|||
/**
|
||||
* Subagent orchestration system.
|
||||
*
|
||||
* Provides child agent spawning, lifecycle management,
|
||||
* persistent registry, and result announcement flow.
|
||||
*/
|
||||
|
||||
export type {
|
||||
SubagentRunOutcome,
|
||||
SubagentRunRecord,
|
||||
RegisterSubagentRunParams,
|
||||
SubagentAnnounceParams,
|
||||
SubagentSystemPromptParams,
|
||||
} from "./types.js";
|
||||
|
||||
export {
|
||||
initSubagentRegistry,
|
||||
registerSubagentRun,
|
||||
listSubagentRuns,
|
||||
releaseSubagentRun,
|
||||
getSubagentRun,
|
||||
resetSubagentRegistryForTests,
|
||||
shutdownSubagentRegistry,
|
||||
} from "./registry.js";
|
||||
|
||||
export {
|
||||
buildSubagentSystemPrompt,
|
||||
readLatestAssistantReply,
|
||||
formatAnnouncementMessage,
|
||||
runSubagentAnnounceFlow,
|
||||
formatCoalescedAnnouncementMessage,
|
||||
runCoalescedAnnounceFlow,
|
||||
} from "./announce.js";
|
||||
export type { FormatAnnouncementParams } from "./announce.js";
|
||||
|
||||
export {
|
||||
loadSubagentRuns,
|
||||
saveSubagentRuns,
|
||||
getSubagentStorePath,
|
||||
} from "./registry-store.js";
|
||||
|
|
@ -1,37 +0,0 @@
|
|||
/** Named lanes for the subagent command queue. */
|
||||
export const SubagentLane = {
|
||||
Subagent: "subagent",
|
||||
} as const;
|
||||
|
||||
/** Default maximum concurrent subagent runs. */
|
||||
export const DEFAULT_SUBAGENT_MAX_CONCURRENT = 10;
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Timeout defaults
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/** Default subagent timeout: 30 minutes. */
|
||||
export const DEFAULT_SUBAGENT_TIMEOUT_SECONDS = 1800;
|
||||
|
||||
/** Maximum safe value for setTimeout (~24.8 days). */
|
||||
const MAX_SAFE_TIMEOUT_MS = 2_147_000_000;
|
||||
|
||||
/**
|
||||
* Resolve the effective timeout in milliseconds for a subagent run.
|
||||
*
|
||||
* - `undefined` / negative → default (1800 s)
|
||||
* - `0` → no timeout (MAX_SAFE_TIMEOUT_MS)
|
||||
* - positive number → use as-is, clamped to safe range
|
||||
*/
|
||||
export function resolveSubagentTimeoutMs(overrideSeconds?: number): number {
|
||||
if (overrideSeconds === undefined || overrideSeconds === null) {
|
||||
return DEFAULT_SUBAGENT_TIMEOUT_SECONDS * 1000;
|
||||
}
|
||||
if (overrideSeconds === 0) {
|
||||
return MAX_SAFE_TIMEOUT_MS; // "no timeout"
|
||||
}
|
||||
if (overrideSeconds < 0) {
|
||||
return DEFAULT_SUBAGENT_TIMEOUT_SECONDS * 1000;
|
||||
}
|
||||
return Math.min(Math.floor(overrideSeconds) * 1000, MAX_SAFE_TIMEOUT_MS);
|
||||
}
|
||||
|
|
@ -1,76 +0,0 @@
|
|||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { SubagentRunRecord } from "./types.js";
|
||||
|
||||
const loadSubagentRunsMock = vi.fn<() => Map<string, SubagentRunRecord>>();
|
||||
const saveSubagentRunsMock = vi.fn();
|
||||
const readLatestAssistantReplyMock = vi.fn();
|
||||
const runCoalescedAnnounceFlowMock = vi.fn(() => false);
|
||||
const resolveSessionDirMock = vi.fn((sessionId: string) => `/tmp/${sessionId}`);
|
||||
const closeAgentMock = vi.fn();
|
||||
const getHubMock = vi.fn(() => ({ closeAgent: closeAgentMock }));
|
||||
const rmSyncMock = vi.fn();
|
||||
|
||||
vi.mock("./registry-store.js", () => ({
|
||||
loadSubagentRuns: loadSubagentRunsMock,
|
||||
loadSubagentGroups: vi.fn(() => new Map()),
|
||||
saveSubagentRuns: saveSubagentRunsMock,
|
||||
}));
|
||||
|
||||
vi.mock("./announce.js", () => ({
|
||||
readLatestAssistantReply: readLatestAssistantReplyMock,
|
||||
runCoalescedAnnounceFlow: runCoalescedAnnounceFlowMock,
|
||||
}));
|
||||
|
||||
vi.mock("../session/storage.js", () => ({
|
||||
resolveSessionDir: resolveSessionDirMock,
|
||||
}));
|
||||
|
||||
vi.mock("../../hub/hub-singleton.js", () => ({
|
||||
getHub: getHubMock,
|
||||
isHubInitialized: vi.fn(() => false),
|
||||
}));
|
||||
|
||||
vi.mock("node:fs", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("node:fs")>();
|
||||
return {
|
||||
...actual,
|
||||
rmSync: rmSyncMock,
|
||||
};
|
||||
});
|
||||
|
||||
describe("subagent registry recovery cleanup", () => {
|
||||
beforeEach(() => {
|
||||
vi.resetModules();
|
||||
vi.clearAllMocks();
|
||||
loadSubagentRunsMock.mockReturnValue(new Map());
|
||||
runCoalescedAnnounceFlowMock.mockReturnValue(false);
|
||||
});
|
||||
|
||||
it("deletes child session on recovery even when findings were already captured", async () => {
|
||||
const now = Date.now();
|
||||
const record: SubagentRunRecord = {
|
||||
runId: "run-1",
|
||||
childSessionId: "child-1",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "task",
|
||||
cleanup: "delete",
|
||||
createdAt: now - 1000,
|
||||
startedAt: now - 900,
|
||||
endedAt: now - 100,
|
||||
outcome: { status: "ok" },
|
||||
findings: "done",
|
||||
findingsCaptured: true,
|
||||
cleanupHandled: false,
|
||||
announced: false,
|
||||
};
|
||||
|
||||
loadSubagentRunsMock.mockReturnValue(new Map([["run-1", record]]));
|
||||
|
||||
const registry = await import("./registry.js");
|
||||
registry.initSubagentRegistry();
|
||||
|
||||
expect(readLatestAssistantReplyMock).not.toHaveBeenCalled();
|
||||
expect(resolveSessionDirMock).toHaveBeenCalledWith("child-1");
|
||||
expect(rmSyncMock).toHaveBeenCalledWith("/tmp/child-1", { recursive: true, force: true });
|
||||
});
|
||||
});
|
||||
|
|
@ -1,127 +0,0 @@
|
|||
import { describe, it, expect, beforeEach, afterEach } from "vitest";
|
||||
import { mkdtempSync, rmSync, existsSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
import { tmpdir } from "node:os";
|
||||
import type { SubagentRunRecord } from "./types.js";
|
||||
|
||||
// We need to test the store functions with a custom directory.
|
||||
// Since the store uses DATA_DIR from shared, we test the serialization logic directly.
|
||||
|
||||
describe("registry-store serialization", () => {
|
||||
let tempDir: string;
|
||||
|
||||
beforeEach(() => {
|
||||
tempDir = mkdtempSync(join(tmpdir(), "subagent-store-test-"));
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
rmSync(tempDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("round-trips SubagentRunRecord through JSON", () => {
|
||||
const record: SubagentRunRecord = {
|
||||
runId: "run-123",
|
||||
childSessionId: "child-456",
|
||||
requesterSessionId: "parent-789",
|
||||
task: "Analyze code quality",
|
||||
label: "Code Review",
|
||||
cleanup: "delete",
|
||||
createdAt: Date.now(),
|
||||
startedAt: Date.now(),
|
||||
endedAt: Date.now() + 30000,
|
||||
outcome: { status: "ok" },
|
||||
archiveAtMs: Date.now() + 3600000,
|
||||
cleanupHandled: true,
|
||||
cleanupCompletedAt: Date.now() + 30100,
|
||||
};
|
||||
|
||||
// Serialize and deserialize
|
||||
const json = JSON.stringify({ version: 1, runs: { "run-123": record } });
|
||||
const parsed = JSON.parse(json);
|
||||
|
||||
expect(parsed.version).toBe(1);
|
||||
expect(parsed.runs["run-123"]).toEqual(record);
|
||||
});
|
||||
|
||||
it("handles record with minimal fields", () => {
|
||||
const record: SubagentRunRecord = {
|
||||
runId: "run-minimal",
|
||||
childSessionId: "child-1",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Do something",
|
||||
cleanup: "keep",
|
||||
createdAt: Date.now(),
|
||||
};
|
||||
|
||||
const json = JSON.stringify({ version: 1, runs: { "run-minimal": record } });
|
||||
const parsed = JSON.parse(json);
|
||||
|
||||
expect(parsed.runs["run-minimal"].runId).toBe("run-minimal");
|
||||
expect(parsed.runs["run-minimal"].outcome).toBeUndefined();
|
||||
expect(parsed.runs["run-minimal"].label).toBeUndefined();
|
||||
});
|
||||
|
||||
it("handles error outcome serialization", () => {
|
||||
const record: SubagentRunRecord = {
|
||||
runId: "run-err",
|
||||
childSessionId: "child-err",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Fail",
|
||||
cleanup: "delete",
|
||||
createdAt: Date.now(),
|
||||
outcome: { status: "error", error: "Something went wrong" },
|
||||
};
|
||||
|
||||
const json = JSON.stringify(record);
|
||||
const parsed = JSON.parse(json) as SubagentRunRecord;
|
||||
|
||||
expect(parsed.outcome?.status).toBe("error");
|
||||
expect(parsed.outcome?.error).toBe("Something went wrong");
|
||||
});
|
||||
|
||||
it("round-trips coalescing fields (findings, findingsCaptured, announced)", () => {
|
||||
const record: SubagentRunRecord = {
|
||||
runId: "run-coalesce",
|
||||
childSessionId: "child-1",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Coalesce test",
|
||||
cleanup: "delete",
|
||||
createdAt: Date.now(),
|
||||
endedAt: Date.now() + 5000,
|
||||
outcome: { status: "ok" },
|
||||
findings: "Found 3 issues in auth module.",
|
||||
findingsCaptured: true,
|
||||
announced: true,
|
||||
};
|
||||
|
||||
const json = JSON.stringify({ version: 1, runs: { "run-coalesce": record } });
|
||||
const parsed = JSON.parse(json);
|
||||
const restored = parsed.runs["run-coalesce"] as SubagentRunRecord;
|
||||
|
||||
expect(restored.findings).toBe("Found 3 issues in auth module.");
|
||||
expect(restored.findingsCaptured).toBe(true);
|
||||
expect(restored.announced).toBe(true);
|
||||
});
|
||||
|
||||
it("round-trips record with undefined coalescing fields", () => {
|
||||
const record: SubagentRunRecord = {
|
||||
runId: "run-old",
|
||||
childSessionId: "child-1",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Old record",
|
||||
cleanup: "delete",
|
||||
createdAt: Date.now(),
|
||||
cleanupHandled: true,
|
||||
// No findings, findingsCaptured, or announced fields (old format)
|
||||
};
|
||||
|
||||
const json = JSON.stringify({ version: 1, runs: { "run-old": record } });
|
||||
const parsed = JSON.parse(json);
|
||||
const restored = parsed.runs["run-old"] as SubagentRunRecord;
|
||||
|
||||
expect(restored.findings).toBeUndefined();
|
||||
expect(restored.findingsCaptured).toBeUndefined();
|
||||
expect(restored.announced).toBeUndefined();
|
||||
expect(restored.cleanupHandled).toBe(true);
|
||||
});
|
||||
});
|
||||
|
|
@ -1,80 +0,0 @@
|
|||
/**
|
||||
* Persistent storage for subagent run records.
|
||||
*
|
||||
* File: ~/.super-multica/subagents/runs.json
|
||||
*/
|
||||
|
||||
import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
import { DATA_DIR } from "@multica/utils";
|
||||
import type { SubagentRunRecord, SubagentGroup } from "./types.js";
|
||||
|
||||
const SUBAGENTS_DIR = join(DATA_DIR, "subagents");
|
||||
const RUNS_FILE = join(SUBAGENTS_DIR, "runs.json");
|
||||
|
||||
interface SubagentRunsStore {
|
||||
version: 1;
|
||||
runs: Record<string, SubagentRunRecord>;
|
||||
groups?: Record<string, SubagentGroup> | undefined;
|
||||
}
|
||||
|
||||
function ensureDir(): void {
|
||||
if (!existsSync(SUBAGENTS_DIR)) {
|
||||
mkdirSync(SUBAGENTS_DIR, { recursive: true });
|
||||
}
|
||||
}
|
||||
|
||||
/** Get the path to the subagent store file (for testing) */
|
||||
export function getSubagentStorePath(): string {
|
||||
return RUNS_FILE;
|
||||
}
|
||||
|
||||
/** Load all persisted subagent runs */
|
||||
export function loadSubagentRuns(): Map<string, SubagentRunRecord> {
|
||||
if (!existsSync(RUNS_FILE)) return new Map();
|
||||
|
||||
try {
|
||||
const content = readFileSync(RUNS_FILE, "utf-8");
|
||||
const store = JSON.parse(content) as SubagentRunsStore;
|
||||
|
||||
if (store.version !== 1) {
|
||||
console.warn(`[SubagentStore] Unknown store version: ${store.version}, ignoring`);
|
||||
return new Map();
|
||||
}
|
||||
|
||||
return new Map(Object.entries(store.runs));
|
||||
} catch (err) {
|
||||
console.warn(`[SubagentStore] Failed to load runs:`, err);
|
||||
return new Map();
|
||||
}
|
||||
}
|
||||
|
||||
/** Load all persisted subagent groups */
|
||||
export function loadSubagentGroups(): Map<string, SubagentGroup> {
|
||||
if (!existsSync(RUNS_FILE)) return new Map();
|
||||
|
||||
try {
|
||||
const content = readFileSync(RUNS_FILE, "utf-8");
|
||||
const store = JSON.parse(content) as SubagentRunsStore;
|
||||
if (store.version !== 1 || !store.groups) return new Map();
|
||||
return new Map(Object.entries(store.groups));
|
||||
} catch {
|
||||
return new Map();
|
||||
}
|
||||
}
|
||||
|
||||
/** Save all subagent runs and groups to disk */
|
||||
export function saveSubagentRuns(
|
||||
runs: Map<string, SubagentRunRecord>,
|
||||
groups?: Map<string, SubagentGroup>,
|
||||
): void {
|
||||
ensureDir();
|
||||
|
||||
const store: SubagentRunsStore = {
|
||||
version: 1,
|
||||
runs: Object.fromEntries(runs),
|
||||
groups: groups && groups.size > 0 ? Object.fromEntries(groups) : undefined,
|
||||
};
|
||||
|
||||
writeFileSync(RUNS_FILE, JSON.stringify(store, null, 2), "utf-8");
|
||||
}
|
||||
|
|
@ -1,405 +0,0 @@
|
|||
import { describe, it, expect, beforeEach, vi } from "vitest";
|
||||
import {
|
||||
registerSubagentRun,
|
||||
listSubagentRuns,
|
||||
getSubagentRun,
|
||||
releaseSubagentRun,
|
||||
resetSubagentRegistryForTests,
|
||||
shutdownSubagentRegistry,
|
||||
} from "./registry.js";
|
||||
import { resetLanesForTests } from "./command-queue.js";
|
||||
|
||||
// Note: These tests exercise the registry's in-memory state management.
|
||||
// They do NOT test the full lifecycle (which requires a live Hub + AsyncAgent).
|
||||
|
||||
/** Wait for the command queue to process enqueued tasks. */
|
||||
const flushQueue = () => new Promise<void>((r) => setTimeout(r, 0));
|
||||
|
||||
beforeEach(() => {
|
||||
resetSubagentRegistryForTests();
|
||||
resetLanesForTests();
|
||||
});
|
||||
|
||||
describe("subagent registry", () => {
|
||||
it("registers a run and retrieves it by ID", async () => {
|
||||
const record = registerSubagentRun({
|
||||
runId: "run-1",
|
||||
childSessionId: "child-1",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Analyze code",
|
||||
label: "Code Analysis",
|
||||
});
|
||||
|
||||
expect(record.runId).toBe("run-1");
|
||||
expect(record.childSessionId).toBe("child-1");
|
||||
expect(record.requesterSessionId).toBe("parent-1");
|
||||
expect(record.task).toBe("Analyze code");
|
||||
expect(record.label).toBe("Code Analysis");
|
||||
expect(record.cleanup).toBe("delete"); // default
|
||||
expect(record.createdAt).toBeGreaterThan(0);
|
||||
|
||||
await flushQueue();
|
||||
expect(record.startedAt).toBeGreaterThan(0); // set by watchChildAgent (async via queue)
|
||||
|
||||
const retrieved = getSubagentRun("run-1");
|
||||
expect(retrieved).toBe(record);
|
||||
});
|
||||
|
||||
it("lists runs filtered by requester session", () => {
|
||||
registerSubagentRun({
|
||||
runId: "run-1",
|
||||
childSessionId: "child-1",
|
||||
requesterSessionId: "parent-A",
|
||||
task: "Task 1",
|
||||
});
|
||||
registerSubagentRun({
|
||||
runId: "run-2",
|
||||
childSessionId: "child-2",
|
||||
requesterSessionId: "parent-B",
|
||||
task: "Task 2",
|
||||
});
|
||||
registerSubagentRun({
|
||||
runId: "run-3",
|
||||
childSessionId: "child-3",
|
||||
requesterSessionId: "parent-A",
|
||||
task: "Task 3",
|
||||
});
|
||||
|
||||
const parentARuns = listSubagentRuns("parent-A");
|
||||
expect(parentARuns).toHaveLength(2);
|
||||
expect(parentARuns.map((r) => r.runId).sort()).toEqual(["run-1", "run-3"]);
|
||||
|
||||
const parentBRuns = listSubagentRuns("parent-B");
|
||||
expect(parentBRuns).toHaveLength(1);
|
||||
expect(parentBRuns[0]!.runId).toBe("run-2");
|
||||
|
||||
const emptyRuns = listSubagentRuns("parent-C");
|
||||
expect(emptyRuns).toHaveLength(0);
|
||||
});
|
||||
|
||||
it("releases a run from the registry", () => {
|
||||
registerSubagentRun({
|
||||
runId: "run-1",
|
||||
childSessionId: "child-1",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Task",
|
||||
});
|
||||
|
||||
expect(getSubagentRun("run-1")).toBeDefined();
|
||||
|
||||
const released = releaseSubagentRun("run-1");
|
||||
expect(released).toBe(true);
|
||||
expect(getSubagentRun("run-1")).toBeUndefined();
|
||||
|
||||
// Double release returns false
|
||||
const releasedAgain = releaseSubagentRun("run-1");
|
||||
expect(releasedAgain).toBe(false);
|
||||
});
|
||||
|
||||
it("applies custom cleanup value", () => {
|
||||
const record = registerSubagentRun({
|
||||
runId: "run-keep",
|
||||
childSessionId: "child-1",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Keep session",
|
||||
cleanup: "keep",
|
||||
});
|
||||
|
||||
expect(record.cleanup).toBe("keep");
|
||||
});
|
||||
|
||||
it("registers a run and ends it with error when Hub is not available", async () => {
|
||||
// Without Hub initialized, watchChildAgent detects missing Hub
|
||||
// and immediately ends the run with an error
|
||||
registerSubagentRun({
|
||||
runId: "run-no-hub",
|
||||
childSessionId: "child-1",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Running task",
|
||||
});
|
||||
|
||||
await flushQueue();
|
||||
|
||||
const record = getSubagentRun("run-no-hub");
|
||||
expect(record?.startedAt).toBeGreaterThan(0);
|
||||
expect(record?.endedAt).toBeGreaterThan(0);
|
||||
expect(record?.outcome?.status).toBe("error");
|
||||
expect(record?.outcome?.error).toContain("Hub not initialized");
|
||||
});
|
||||
|
||||
it("shutdownSubagentRegistry marks unfinished runs as ended", async () => {
|
||||
// Directly set up a record without going through watchChildAgent
|
||||
// to simulate a run that is still active
|
||||
registerSubagentRun({
|
||||
runId: "run-active",
|
||||
childSessionId: "child-1",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Running task",
|
||||
});
|
||||
|
||||
await flushQueue();
|
||||
|
||||
// The above run already ended due to no Hub; reset its endedAt
|
||||
// to simulate a truly active run
|
||||
const record = getSubagentRun("run-active");
|
||||
if (record) {
|
||||
record.endedAt = undefined;
|
||||
record.outcome = undefined;
|
||||
}
|
||||
|
||||
shutdownSubagentRegistry();
|
||||
|
||||
const after = getSubagentRun("run-active");
|
||||
expect(after?.endedAt).toBeGreaterThan(0);
|
||||
expect(after?.outcome?.status).toBe("unknown");
|
||||
});
|
||||
|
||||
it("resetSubagentRegistryForTests clears all state", () => {
|
||||
registerSubagentRun({
|
||||
runId: "run-1",
|
||||
childSessionId: "child-1",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Task",
|
||||
});
|
||||
|
||||
expect(listSubagentRuns("parent-1")).toHaveLength(1);
|
||||
|
||||
resetSubagentRegistryForTests();
|
||||
|
||||
expect(listSubagentRuns("parent-1")).toHaveLength(0);
|
||||
expect(getSubagentRun("run-1")).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe("subagent registry — coalescing", () => {
|
||||
// Without Hub, watchChildAgent ends runs immediately with "Hub not initialized".
|
||||
// This allows us to test the coalescing state transitions.
|
||||
|
||||
it("captures findings when a run completes (no Hub)", async () => {
|
||||
registerSubagentRun({
|
||||
runId: "run-1",
|
||||
childSessionId: "child-1",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Task 1",
|
||||
});
|
||||
|
||||
await flushQueue();
|
||||
|
||||
const record = getSubagentRun("run-1");
|
||||
// Run ended immediately due to no Hub
|
||||
expect(record?.endedAt).toBeGreaterThan(0);
|
||||
expect(record?.findingsCaptured).toBe(true);
|
||||
});
|
||||
|
||||
it("does not announce while sibling runs are still pending", async () => {
|
||||
// Register first run — ends immediately (no Hub)
|
||||
registerSubagentRun({
|
||||
runId: "run-1",
|
||||
childSessionId: "child-1",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Task 1",
|
||||
});
|
||||
|
||||
await flushQueue();
|
||||
|
||||
const record1 = getSubagentRun("run-1");
|
||||
expect(record1?.findingsCaptured).toBe(true);
|
||||
|
||||
// Register second run — also ends immediately
|
||||
registerSubagentRun({
|
||||
runId: "run-2",
|
||||
childSessionId: "child-2",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Task 2",
|
||||
});
|
||||
|
||||
await flushQueue();
|
||||
|
||||
const record2 = getSubagentRun("run-2");
|
||||
expect(record2?.findingsCaptured).toBe(true);
|
||||
|
||||
// Both ended, but announce fails because no Hub for parent agent.
|
||||
// The key check: both records should have findings captured.
|
||||
// announced will be false because runCoalescedAnnounceFlow fails (no Hub).
|
||||
expect(record1?.announced).toBeUndefined();
|
||||
expect(record2?.announced).toBeUndefined();
|
||||
});
|
||||
|
||||
it("single run captures findings immediately", async () => {
|
||||
registerSubagentRun({
|
||||
runId: "run-solo",
|
||||
childSessionId: "child-solo",
|
||||
requesterSessionId: "parent-solo",
|
||||
task: "Solo task",
|
||||
});
|
||||
|
||||
await flushQueue();
|
||||
|
||||
const record = getSubagentRun("run-solo");
|
||||
expect(record?.endedAt).toBeGreaterThan(0);
|
||||
expect(record?.findingsCaptured).toBe(true);
|
||||
expect(record?.outcome?.status).toBe("error");
|
||||
expect(record?.outcome?.error).toContain("Hub not initialized");
|
||||
});
|
||||
|
||||
it("shutdownSubagentRegistry captures findings for ended-but-uncaptured runs", async () => {
|
||||
registerSubagentRun({
|
||||
runId: "run-1",
|
||||
childSessionId: "child-1",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Task",
|
||||
});
|
||||
|
||||
await flushQueue();
|
||||
|
||||
const record = getSubagentRun("run-1");
|
||||
if (record) {
|
||||
// Simulate: run ended but findings not yet captured
|
||||
record.endedAt = Date.now();
|
||||
record.outcome = { status: "ok" };
|
||||
record.findingsCaptured = undefined;
|
||||
}
|
||||
|
||||
shutdownSubagentRegistry();
|
||||
|
||||
expect(record?.findingsCaptured).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe("subagent registry — silent announce mode", () => {
|
||||
// Note: In tests (no Hub), watchChildAgent completes synchronously within
|
||||
// registerSubagentRun(), so each run's lifecycle finishes before the next
|
||||
// registration call. Multi-run coalescing requires async child agents and
|
||||
// is validated in integration tests.
|
||||
|
||||
it("stores announce field on the record", () => {
|
||||
const record = registerSubagentRun({
|
||||
runId: "run-ann",
|
||||
childSessionId: "child-ann",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Task",
|
||||
announce: "silent",
|
||||
});
|
||||
expect(record.announce).toBe("silent");
|
||||
});
|
||||
|
||||
it("defaults announce to undefined (immediate behavior)", () => {
|
||||
const record = registerSubagentRun({
|
||||
runId: "run-def",
|
||||
childSessionId: "child-def",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Task",
|
||||
});
|
||||
expect(record.announce).toBeUndefined();
|
||||
});
|
||||
|
||||
it("silent runs are announced via runCoalescedAnnounceFlow", async () => {
|
||||
const announceModule = await import("./announce.js");
|
||||
const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true);
|
||||
|
||||
registerSubagentRun({
|
||||
runId: "run-s1",
|
||||
childSessionId: "child-s1",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Silent A",
|
||||
announce: "silent",
|
||||
});
|
||||
|
||||
await flushQueue();
|
||||
|
||||
// Silent run announced (via runCoalescedAnnounceFlow mock)
|
||||
const silentCalls = spy.mock.calls.filter(
|
||||
([reqId, records]) =>
|
||||
reqId === "parent-1" &&
|
||||
records.some((r: { announce?: string }) => r.announce === "silent"),
|
||||
);
|
||||
expect(silentCalls.length).toBeGreaterThanOrEqual(1);
|
||||
|
||||
const runS1 = getSubagentRun("run-s1");
|
||||
expect(runS1?.announced).toBe(true);
|
||||
expect(runS1?.announce).toBe("silent");
|
||||
|
||||
spy.mockRestore();
|
||||
});
|
||||
|
||||
it("immediate and silent runs are never mixed in the same announce call", async () => {
|
||||
const announceModule = await import("./announce.js");
|
||||
const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true);
|
||||
|
||||
// Register immediate run, then silent run
|
||||
registerSubagentRun({
|
||||
runId: "run-imm",
|
||||
childSessionId: "child-imm",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Immediate task",
|
||||
});
|
||||
registerSubagentRun({
|
||||
runId: "run-s1",
|
||||
childSessionId: "child-s1",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Silent task",
|
||||
announce: "silent",
|
||||
});
|
||||
|
||||
await flushQueue();
|
||||
|
||||
const calls = spy.mock.calls.filter(
|
||||
([reqId]) => reqId === "parent-1",
|
||||
);
|
||||
|
||||
// Immediate and silent should never be in the same announce call
|
||||
const mixedCalls = calls.filter(([, records]) => {
|
||||
const hasImm = records.some((r: { announce?: string }) => r.announce !== "silent");
|
||||
const hasSilent = records.some((r: { announce?: string }) => r.announce === "silent");
|
||||
return hasImm && hasSilent;
|
||||
});
|
||||
expect(mixedCalls).toHaveLength(0);
|
||||
|
||||
// Both should be announced (in separate calls)
|
||||
expect(getSubagentRun("run-imm")?.announced).toBe(true);
|
||||
expect(getSubagentRun("run-s1")?.announced).toBe(true);
|
||||
|
||||
spy.mockRestore();
|
||||
});
|
||||
});
|
||||
|
||||
describe("subagent registry — post-announce cleanup", () => {
|
||||
it("keeps runs in registry after successful announcement with archiveAtMs", async () => {
|
||||
// Mock runCoalescedAnnounceFlow to succeed
|
||||
const announceModule = await import("./announce.js");
|
||||
const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true);
|
||||
|
||||
// Register two runs for the same parent — both end immediately (no Hub)
|
||||
registerSubagentRun({
|
||||
runId: "run-a",
|
||||
childSessionId: "child-a",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Task A",
|
||||
});
|
||||
registerSubagentRun({
|
||||
runId: "run-b",
|
||||
childSessionId: "child-b",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Task B",
|
||||
});
|
||||
|
||||
await flushQueue();
|
||||
|
||||
// Both runs should have been announced but kept in registry with archiveAtMs
|
||||
expect(spy).toHaveBeenCalled();
|
||||
|
||||
const runA = getSubagentRun("run-a");
|
||||
const runB = getSubagentRun("run-b");
|
||||
expect(runA).toBeDefined();
|
||||
expect(runB).toBeDefined();
|
||||
expect(runA!.announced).toBe(true);
|
||||
expect(runB!.announced).toBe(true);
|
||||
expect(runA!.archiveAtMs).toBeGreaterThan(Date.now());
|
||||
expect(runB!.archiveAtMs).toBeGreaterThan(Date.now());
|
||||
|
||||
// Records are still queryable
|
||||
expect(listSubagentRuns("parent-1")).toHaveLength(2);
|
||||
|
||||
spy.mockRestore();
|
||||
});
|
||||
});
|
||||
|
|
@ -1,524 +0,0 @@
|
|||
/**
|
||||
* Subagent registry — in-memory tracking + lifecycle management.
|
||||
*
|
||||
* Tracks all active subagent runs, persists state to disk,
|
||||
* watches for child completion, and triggers announce flow.
|
||||
*/
|
||||
|
||||
import { getHub, isHubInitialized } from "../../hub/hub-singleton.js";
|
||||
import { loadSubagentRuns, saveSubagentRuns, loadSubagentGroups } from "./registry-store.js";
|
||||
import { readLatestAssistantReply, runCoalescedAnnounceFlow } from "./announce.js";
|
||||
import type {
|
||||
RegisterSubagentRunParams,
|
||||
SubagentRunRecord,
|
||||
SubagentGroup,
|
||||
} from "./types.js";
|
||||
import { resolveSessionDir } from "../session/storage.js";
|
||||
import { rmSync } from "node:fs";
|
||||
import { enqueueInLane, setLaneConcurrency } from "./command-queue.js";
|
||||
import { SubagentLane, DEFAULT_SUBAGENT_MAX_CONCURRENT, resolveSubagentTimeoutMs } from "./lanes.js";
|
||||
|
||||
/** Default archive retention: 60 minutes after completion */
|
||||
const DEFAULT_ARCHIVE_AFTER_MS = 60 * 60 * 1000;
|
||||
|
||||
/** Archive sweep interval: 60 seconds */
|
||||
const SWEEP_INTERVAL_MS = 60 * 1000;
|
||||
|
||||
// ============================================================================
|
||||
// Module-level state
|
||||
// ============================================================================
|
||||
|
||||
const subagentRuns = new Map<string, SubagentRunRecord>();
|
||||
const subagentGroups = new Map<string, SubagentGroup>();
|
||||
let sweepTimer: ReturnType<typeof setInterval> | undefined;
|
||||
const resumedRequesters = new Set<string>();
|
||||
|
||||
// ============================================================================
|
||||
// Public API
|
||||
// ============================================================================
|
||||
|
||||
/** Initialize registry from persisted state. Call once at startup. */
|
||||
export function initSubagentRegistry(): void {
|
||||
setLaneConcurrency(SubagentLane.Subagent, DEFAULT_SUBAGENT_MAX_CONCURRENT);
|
||||
|
||||
const persisted = loadSubagentRuns();
|
||||
for (const [runId, record] of persisted) {
|
||||
subagentRuns.set(runId, record);
|
||||
|
||||
// Backward compat: old records with cleanupHandled but no announced field
|
||||
if (record.cleanupHandled && record.announced === undefined) {
|
||||
record.announced = true;
|
||||
record.findingsCaptured = true;
|
||||
}
|
||||
}
|
||||
|
||||
// Restore groups
|
||||
const persistedGroups = loadSubagentGroups();
|
||||
for (const [groupId, group] of persistedGroups) {
|
||||
subagentGroups.set(groupId, group);
|
||||
}
|
||||
|
||||
// Process incomplete runs
|
||||
const affectedRequesters = new Set<string>();
|
||||
|
||||
for (const record of subagentRuns.values()) {
|
||||
if (record.announced && record.cleanupHandled) continue; // Already fully done
|
||||
|
||||
if (!record.endedAt) {
|
||||
// Child was running when process crashed — mark as ended/unknown
|
||||
record.endedAt = Date.now();
|
||||
record.outcome = { status: "unknown" };
|
||||
}
|
||||
|
||||
if (!record.findingsCaptured) {
|
||||
captureFindings(record);
|
||||
}
|
||||
|
||||
// Recovery cleanup must be independent from findings capture:
|
||||
// the process may crash after captureFindings() persisted but before deletion.
|
||||
if (record.cleanup === "delete" && !record.cleanupHandled) {
|
||||
deleteChildSession(record.childSessionId);
|
||||
}
|
||||
|
||||
affectedRequesters.add(record.requesterSessionId);
|
||||
}
|
||||
|
||||
persist();
|
||||
|
||||
// For each affected requester, check if coalesced announcement is needed
|
||||
for (const requesterId of affectedRequesters) {
|
||||
if (!resumedRequesters.has(requesterId)) {
|
||||
resumedRequesters.add(requesterId);
|
||||
checkAndAnnounce(requesterId);
|
||||
}
|
||||
}
|
||||
|
||||
if (subagentRuns.size > 0) {
|
||||
startSweeper();
|
||||
console.log(`[SubagentRegistry] Loaded ${subagentRuns.size} persisted run(s)`);
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Group management
|
||||
// ============================================================================
|
||||
|
||||
/** Create a new subagent group. Returns the group record. */
|
||||
export function createSubagentGroup(params: {
|
||||
groupId: string;
|
||||
requesterSessionId: string;
|
||||
label?: string;
|
||||
next?: string;
|
||||
}): SubagentGroup {
|
||||
const group: SubagentGroup = {
|
||||
groupId: params.groupId,
|
||||
requesterSessionId: params.requesterSessionId,
|
||||
label: params.label,
|
||||
next: params.next,
|
||||
createdAt: Date.now(),
|
||||
};
|
||||
subagentGroups.set(params.groupId, group);
|
||||
persist();
|
||||
return group;
|
||||
}
|
||||
|
||||
/** Get a group by ID. */
|
||||
export function getSubagentGroup(groupId: string): SubagentGroup | undefined {
|
||||
return subagentGroups.get(groupId);
|
||||
}
|
||||
|
||||
/** List all runs belonging to a group. */
|
||||
export function listGroupRuns(groupId: string): SubagentRunRecord[] {
|
||||
const result: SubagentRunRecord[] = [];
|
||||
for (const record of subagentRuns.values()) {
|
||||
if (record.groupId === groupId) {
|
||||
result.push(record);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/** Register a new subagent run and start tracking its lifecycle. */
|
||||
export function registerSubagentRun(params: RegisterSubagentRunParams): SubagentRunRecord {
|
||||
const {
|
||||
runId,
|
||||
childSessionId,
|
||||
requesterSessionId,
|
||||
task,
|
||||
label,
|
||||
cleanup = "delete",
|
||||
timeoutSeconds,
|
||||
announce,
|
||||
groupId,
|
||||
start,
|
||||
} = params;
|
||||
|
||||
const record: SubagentRunRecord = {
|
||||
runId,
|
||||
childSessionId,
|
||||
requesterSessionId,
|
||||
task,
|
||||
label,
|
||||
cleanup,
|
||||
announce,
|
||||
groupId,
|
||||
createdAt: Date.now(),
|
||||
};
|
||||
|
||||
subagentRuns.set(runId, record);
|
||||
persist();
|
||||
startSweeper();
|
||||
|
||||
// Enqueue in the subagent lane — the start callback and watchChildAgent
|
||||
// only execute once a concurrency slot is available.
|
||||
void enqueueInLane(SubagentLane.Subagent, async () => {
|
||||
console.log(`[SubagentRegistry] Lane slot acquired for ${runId}, calling start()`);
|
||||
start?.();
|
||||
console.log(`[SubagentRegistry] start() returned, entering watchChildAgent`);
|
||||
return watchChildAgent(record, timeoutSeconds);
|
||||
});
|
||||
|
||||
return record;
|
||||
}
|
||||
|
||||
/** List all active runs for a given requester session. */
|
||||
export function listSubagentRuns(requesterSessionId: string): SubagentRunRecord[] {
|
||||
const result: SubagentRunRecord[] = [];
|
||||
for (const record of subagentRuns.values()) {
|
||||
if (record.requesterSessionId === requesterSessionId) {
|
||||
result.push(record);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/** Remove a run from the registry. */
|
||||
export function releaseSubagentRun(runId: string): boolean {
|
||||
const deleted = subagentRuns.delete(runId);
|
||||
if (deleted) {
|
||||
persist();
|
||||
if (subagentRuns.size === 0) {
|
||||
stopSweeper();
|
||||
}
|
||||
}
|
||||
return deleted;
|
||||
}
|
||||
|
||||
/** Get a run by ID. */
|
||||
export function getSubagentRun(runId: string): SubagentRunRecord | undefined {
|
||||
return subagentRuns.get(runId);
|
||||
}
|
||||
|
||||
/** Mark all active (non-ended) runs as ended with "unknown" status. Called during Hub shutdown. */
|
||||
export function shutdownSubagentRegistry(): void {
|
||||
const now = Date.now();
|
||||
let updated = 0;
|
||||
|
||||
for (const record of subagentRuns.values()) {
|
||||
if (!record.endedAt) {
|
||||
record.endedAt = now;
|
||||
record.outcome = { status: "unknown" };
|
||||
updated++;
|
||||
}
|
||||
|
||||
// Opportunistically capture findings for ended-but-uncaptured runs
|
||||
if (record.endedAt && !record.findingsCaptured) {
|
||||
captureFindings(record);
|
||||
updated++;
|
||||
}
|
||||
}
|
||||
|
||||
if (updated > 0) {
|
||||
persist();
|
||||
console.log(`[SubagentRegistry] Processed ${updated} run(s) during shutdown`);
|
||||
}
|
||||
|
||||
stopSweeper();
|
||||
}
|
||||
|
||||
/** Reset all state (for testing). */
|
||||
export function resetSubagentRegistryForTests(): void {
|
||||
subagentRuns.clear();
|
||||
subagentGroups.clear();
|
||||
resumedRequesters.clear();
|
||||
stopSweeper();
|
||||
}
|
||||
|
||||
/** Seed a run record directly (for testing). Bypasses persistence and side effects. */
|
||||
export function seedSubagentRunForTests(record: SubagentRunRecord): void {
|
||||
subagentRuns.set(record.runId, record);
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Lifecycle watching
|
||||
// ============================================================================
|
||||
|
||||
/**
|
||||
* Watch a child agent for completion.
|
||||
* Returns a promise that resolves when the child finishes (or errors/times out),
|
||||
* keeping the command-queue lane slot occupied until then.
|
||||
*/
|
||||
function watchChildAgent(record: SubagentRunRecord, timeoutSeconds?: number): Promise<void> {
|
||||
const { childSessionId } = record;
|
||||
|
||||
// Mark as started
|
||||
record.startedAt = Date.now();
|
||||
persist();
|
||||
|
||||
const timeoutMs = resolveSubagentTimeoutMs(timeoutSeconds);
|
||||
|
||||
return new Promise<void>((resolveSlot) => {
|
||||
const cleanup = (outcome: { status: "ok" | "error" | "timeout" | "unknown"; error?: string | undefined }) => {
|
||||
if (record.endedAt) return; // Already finalized
|
||||
if (timeoutTimer) clearTimeout(timeoutTimer);
|
||||
record.endedAt = Date.now();
|
||||
record.outcome = outcome;
|
||||
persist();
|
||||
handleRunCompletion(record);
|
||||
resolveSlot(); // Release the queue slot
|
||||
};
|
||||
|
||||
// Always set a timeout (default 30 min, 0 = ~24 days via resolveSubagentTimeoutMs)
|
||||
const timeoutTimer = setTimeout(() => {
|
||||
cleanup({ status: "timeout" });
|
||||
|
||||
// Try to close the child agent
|
||||
try {
|
||||
const hub = getHub();
|
||||
hub.closeAgent(childSessionId);
|
||||
} catch {
|
||||
// Hub may not be available
|
||||
}
|
||||
}, timeoutMs);
|
||||
|
||||
// Get child agent reference (Hub may not be available in tests)
|
||||
if (!isHubInitialized()) {
|
||||
cleanup({ status: "error", error: "Hub not initialized" });
|
||||
return;
|
||||
}
|
||||
|
||||
const hub = getHub();
|
||||
const childAgent = hub.getAgent(childSessionId);
|
||||
if (!childAgent) {
|
||||
cleanup({ status: "error", error: "Child agent not found" });
|
||||
return;
|
||||
}
|
||||
|
||||
// Wait for the child agent's task queue to drain (task completion),
|
||||
// then trigger announce flow. Uses waitForIdle() instead of consuming
|
||||
// the stream (which would conflict with Hub.consumeAgent).
|
||||
console.log(`[SubagentRegistry] waitForIdle() called for child ${childSessionId}, pendingWrites=${childAgent.getPendingWrites()}`);
|
||||
childAgent.waitForIdle().then(
|
||||
() => {
|
||||
const runtime = Date.now() - (record.startedAt ?? 0);
|
||||
const runError = childAgent.lastRunError;
|
||||
if (runError) {
|
||||
console.log(`[SubagentRegistry] waitForIdle() resolved for child ${childSessionId} with error (runtime: ${runtime}ms): ${runError}`);
|
||||
cleanup({ status: "error", error: runError });
|
||||
} else {
|
||||
console.log(`[SubagentRegistry] waitForIdle() resolved OK for child ${childSessionId} (runtime: ${runtime}ms)`);
|
||||
cleanup({ status: "ok" });
|
||||
}
|
||||
},
|
||||
(err) => {
|
||||
console.error(`[SubagentRegistry] waitForIdle() rejected for child ${childSessionId}:`, err);
|
||||
cleanup({
|
||||
status: "error",
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
// Also handle explicit close (e.g., timeout kill, Hub shutdown)
|
||||
childAgent.onClose(() => {
|
||||
cleanup({ status: record.outcome?.status ?? "unknown" });
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Cleanup + Announce (two-phase: capture findings, then coalesced announce)
|
||||
// ============================================================================
|
||||
|
||||
/** Phase 1: Capture child's findings before session deletion. */
|
||||
function captureFindings(record: SubagentRunRecord): void {
|
||||
try {
|
||||
const findings = readLatestAssistantReply(record.childSessionId);
|
||||
record.findings = findings ?? undefined;
|
||||
} catch {
|
||||
record.findings = "(failed to read findings)";
|
||||
}
|
||||
record.findingsCaptured = true;
|
||||
persist();
|
||||
}
|
||||
|
||||
/**
|
||||
* Phase 2: Announce completed-but-unannounced runs.
|
||||
*
|
||||
* Three announcement paths:
|
||||
* 1. Grouped runs — wait for all runs in the group to complete, then announce
|
||||
* together with the group's `next` continuation prompt (if any).
|
||||
* 2. Ungrouped silent runs — legacy behavior: wait for ALL silent runs from
|
||||
* the same requester to complete, then announce together.
|
||||
* 3. Ungrouped immediate runs — announce per-completion (default).
|
||||
*/
|
||||
function checkAndAnnounce(requesterSessionId: string): void {
|
||||
const allRuns = listSubagentRuns(requesterSessionId);
|
||||
|
||||
// ── 1. Grouped runs: announce by group when all members complete ──
|
||||
const groupIds = new Set<string>();
|
||||
for (const r of allRuns) {
|
||||
if (r.groupId && !r.announced) groupIds.add(r.groupId);
|
||||
}
|
||||
|
||||
for (const groupId of groupIds) {
|
||||
const groupRuns = allRuns.filter(r => r.groupId === groupId);
|
||||
const unannounced = groupRuns.filter(r => !r.announced);
|
||||
const ready = unannounced.filter(r => r.endedAt !== undefined && r.findingsCaptured);
|
||||
|
||||
if (ready.length > 0 && ready.length === unannounced.length) {
|
||||
const group = subagentGroups.get(groupId);
|
||||
announceRuns(requesterSessionId, ready, group?.next);
|
||||
}
|
||||
}
|
||||
|
||||
// ── 2. Ungrouped runs: original immediate/silent logic ──
|
||||
const ungrouped = allRuns.filter(r => !r.groupId);
|
||||
|
||||
// Immediate: announce per-completion
|
||||
const immediateReady = ungrouped.filter(
|
||||
r => !r.announced && r.endedAt !== undefined && r.findingsCaptured && r.announce !== "silent",
|
||||
);
|
||||
if (immediateReady.length > 0) {
|
||||
announceRuns(requesterSessionId, immediateReady);
|
||||
}
|
||||
|
||||
// Silent: announce only when ALL ungrouped silent runs are done
|
||||
const silentRuns = ungrouped.filter(r => r.announce === "silent");
|
||||
const unannouncedSilent = silentRuns.filter(r => !r.announced);
|
||||
const silentReady = unannouncedSilent.filter(
|
||||
r => r.endedAt !== undefined && r.findingsCaptured,
|
||||
);
|
||||
|
||||
if (silentReady.length > 0 && silentReady.length === unannouncedSilent.length) {
|
||||
announceRuns(requesterSessionId, silentReady);
|
||||
}
|
||||
}
|
||||
|
||||
/** Announce a batch of completed runs and mark them as announced. */
|
||||
function announceRuns(requesterSessionId: string, runs: SubagentRunRecord[], next?: string): void {
|
||||
const announced = runCoalescedAnnounceFlow(requesterSessionId, runs, next);
|
||||
|
||||
if (announced) {
|
||||
for (const r of runs) {
|
||||
r.announced = true;
|
||||
r.cleanupHandled = true;
|
||||
// Keep records for querying via sessions_list; let sweeper archive later
|
||||
r.archiveAtMs = Date.now() + DEFAULT_ARCHIVE_AFTER_MS;
|
||||
}
|
||||
persist();
|
||||
} else {
|
||||
// Allow retry — mark cleanupHandled false so initSubagentRegistry() retries
|
||||
for (const r of runs) {
|
||||
r.cleanupHandled = false;
|
||||
}
|
||||
persist();
|
||||
console.warn(
|
||||
`[SubagentRegistry] Announce failed for requester ${requesterSessionId}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/** Entry point: called when a child completes. */
|
||||
function handleRunCompletion(record: SubagentRunRecord): void {
|
||||
// Phase 1: capture findings (before session deletion)
|
||||
if (!record.findingsCaptured) {
|
||||
captureFindings(record);
|
||||
|
||||
// Session cleanup (safe now that findings are persisted)
|
||||
if (record.cleanup === "delete") {
|
||||
deleteChildSession(record.childSessionId);
|
||||
}
|
||||
}
|
||||
|
||||
// Phase 2: coalesced announce check
|
||||
checkAndAnnounce(record.requesterSessionId);
|
||||
}
|
||||
|
||||
function deleteChildSession(sessionId: string): void {
|
||||
try {
|
||||
const sessionDir = resolveSessionDir(sessionId);
|
||||
rmSync(sessionDir, { recursive: true, force: true });
|
||||
console.log(`[SubagentRegistry] Deleted child session: ${sessionId}`);
|
||||
} catch (err) {
|
||||
console.warn(`[SubagentRegistry] Failed to delete child session ${sessionId}:`, err);
|
||||
}
|
||||
|
||||
// Also close the agent in Hub
|
||||
try {
|
||||
const hub = getHub();
|
||||
hub.closeAgent(sessionId);
|
||||
} catch {
|
||||
// Hub may not be available
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Archive sweeper
|
||||
// ============================================================================
|
||||
|
||||
function startSweeper(): void {
|
||||
if (sweepTimer) return;
|
||||
sweepTimer = setInterval(sweep, SWEEP_INTERVAL_MS);
|
||||
// Don't prevent process exit
|
||||
if (sweepTimer.unref) sweepTimer.unref();
|
||||
}
|
||||
|
||||
function stopSweeper(): void {
|
||||
if (sweepTimer) {
|
||||
clearInterval(sweepTimer);
|
||||
sweepTimer = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
function sweep(): void {
|
||||
const now = Date.now();
|
||||
let removed = 0;
|
||||
|
||||
for (const [runId, record] of subagentRuns) {
|
||||
if (record.archiveAtMs !== undefined && record.archiveAtMs <= now) {
|
||||
subagentRuns.delete(runId);
|
||||
removed++;
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up groups whose runs have all been archived
|
||||
for (const [groupId] of subagentGroups) {
|
||||
const hasActiveRuns = [...subagentRuns.values()].some(r => r.groupId === groupId);
|
||||
if (!hasActiveRuns) {
|
||||
subagentGroups.delete(groupId);
|
||||
removed++;
|
||||
}
|
||||
}
|
||||
|
||||
if (removed > 0) {
|
||||
persist();
|
||||
console.log(`[SubagentRegistry] Archived ${removed} completed run(s)/group(s)`);
|
||||
}
|
||||
|
||||
if (subagentRuns.size === 0) {
|
||||
stopSweeper();
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Persistence helper
|
||||
// ============================================================================
|
||||
|
||||
function persist(): void {
|
||||
try {
|
||||
saveSubagentRuns(subagentRuns, subagentGroups);
|
||||
} catch (err) {
|
||||
console.error(`[SubagentRegistry] Failed to persist runs:`, err);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,118 +0,0 @@
|
|||
/**
|
||||
* Subagent orchestration types.
|
||||
*
|
||||
* Models the lifecycle of spawned child agents:
|
||||
* created → started → ended → cleanup
|
||||
*/
|
||||
|
||||
/** Final outcome of a subagent run */
|
||||
export type SubagentRunOutcome = {
|
||||
status: "ok" | "error" | "timeout" | "unknown";
|
||||
error?: string | undefined;
|
||||
};
|
||||
|
||||
/**
|
||||
* A logical group of subagent runs that are tracked together.
|
||||
* Groups enable "collect all, then act" workflows:
|
||||
* all runs in a group must complete before the combined results
|
||||
* (plus an optional `next` continuation) are announced to the parent.
|
||||
*/
|
||||
export type SubagentGroup = {
|
||||
/** Unique group identifier (UUIDv7) */
|
||||
groupId: string;
|
||||
/** Session ID of the parent (requester) agent */
|
||||
requesterSessionId: string;
|
||||
/** Optional human-readable label for the group */
|
||||
label?: string | undefined;
|
||||
/** Continuation prompt executed after all runs in the group complete.
|
||||
* Injected into the announcement so the parent agent acts on the combined findings. */
|
||||
next?: string | undefined;
|
||||
/** Timestamp when the group was created */
|
||||
createdAt: number;
|
||||
};
|
||||
|
||||
/** Persistent record tracking a single subagent run */
|
||||
export type SubagentRunRecord = {
|
||||
/** Unique run identifier (UUIDv7) */
|
||||
runId: string;
|
||||
/** Session ID of the child agent */
|
||||
childSessionId: string;
|
||||
/** Session ID of the parent (requester) agent */
|
||||
requesterSessionId: string;
|
||||
/** The task description / prompt given to the child */
|
||||
task: string;
|
||||
/** Optional human-readable label */
|
||||
label?: string | undefined;
|
||||
/** Session cleanup strategy after completion */
|
||||
cleanup: "delete" | "keep";
|
||||
/** Timestamp when the run was created */
|
||||
createdAt: number;
|
||||
/** Timestamp when the child agent started execution */
|
||||
startedAt?: number | undefined;
|
||||
/** Timestamp when the child agent finished */
|
||||
endedAt?: number | undefined;
|
||||
/** Final status of the run */
|
||||
outcome?: SubagentRunOutcome | undefined;
|
||||
/** Scheduled auto-archive time (ms since epoch) */
|
||||
archiveAtMs?: number | undefined;
|
||||
/** Whether the cleanup/announce flow has been initiated */
|
||||
cleanupHandled?: boolean | undefined;
|
||||
/** Timestamp when cleanup completed */
|
||||
cleanupCompletedAt?: number | undefined;
|
||||
/** Captured findings from the child session's last assistant reply */
|
||||
findings?: string | undefined;
|
||||
/** Whether findings have been captured (safe to delete session after this) */
|
||||
findingsCaptured?: boolean | undefined;
|
||||
/** Whether the coalesced announcement has been sent to parent */
|
||||
announced?: boolean | undefined;
|
||||
/** Announcement mode: "immediate" (default) announces per-completion,
|
||||
* "silent" defers until all silent runs from the same requester complete. */
|
||||
announce?: "immediate" | "silent" | undefined;
|
||||
/** Group ID this run belongs to (if any). Runs in a group are announced
|
||||
* together when all complete, regardless of the `announce` field. */
|
||||
groupId?: string | undefined;
|
||||
};
|
||||
|
||||
/** Parameters for registering a new subagent run */
|
||||
export type RegisterSubagentRunParams = {
|
||||
runId: string;
|
||||
childSessionId: string;
|
||||
requesterSessionId: string;
|
||||
task: string;
|
||||
label?: string | undefined;
|
||||
cleanup?: "delete" | "keep" | undefined;
|
||||
timeoutSeconds?: number | undefined;
|
||||
/** Callback invoked when the queue slot is acquired (used to defer childAgent.write). */
|
||||
start?: (() => void) | undefined;
|
||||
/** Announcement mode: "immediate" (default) or "silent" (defer until all silent runs complete). */
|
||||
announce?: "immediate" | "silent" | undefined;
|
||||
/** Group ID to join. Runs in a group are announced together when all complete. */
|
||||
groupId?: string | undefined;
|
||||
/** Continuation prompt for the group. Only used on group creation (first spawn).
|
||||
* After all runs in the group complete, this prompt is included in the announcement
|
||||
* so the parent agent can act on the combined findings (e.g. summarize, write PDF). */
|
||||
next?: string | undefined;
|
||||
};
|
||||
|
||||
/** Parameters for the announce flow */
|
||||
export type SubagentAnnounceParams = {
|
||||
runId: string;
|
||||
childSessionId: string;
|
||||
requesterSessionId: string;
|
||||
task: string;
|
||||
label?: string | undefined;
|
||||
cleanup: "delete" | "keep";
|
||||
outcome?: SubagentRunOutcome | undefined;
|
||||
startedAt?: number | undefined;
|
||||
endedAt?: number | undefined;
|
||||
};
|
||||
|
||||
/** Parameters for building the subagent system prompt */
|
||||
export type SubagentSystemPromptParams = {
|
||||
requesterSessionId: string;
|
||||
childSessionId: string;
|
||||
label?: string | undefined;
|
||||
task: string;
|
||||
/** Tool names available to the subagent (for tooling summary in system prompt) */
|
||||
tools?: string[] | undefined;
|
||||
};
|
||||
|
|
@ -1,8 +1,8 @@
|
|||
/**
|
||||
* Global Hub singleton for cross-module access.
|
||||
*
|
||||
* Used by subagent tools and announce flow to interact with the Hub
|
||||
* without threading references through the entire call chain.
|
||||
* Used by modules like cron execution without threading Hub references
|
||||
* through the entire call chain.
|
||||
*/
|
||||
|
||||
import type { Hub } from "./hub.js";
|
||||
|
|
|
|||
|
|
@ -17,7 +17,6 @@ import { AsyncAgent } from "../agent/async-agent.js";
|
|||
import type { AgentOptions } from "../agent/types.js";
|
||||
import { getHubId } from "./hub-identity.js";
|
||||
import { setHub } from "./hub-singleton.js";
|
||||
import { initSubagentRegistry, shutdownSubagentRegistry } from "../agent/subagent/index.js";
|
||||
import { loadAgentRecords, addAgentRecord, removeAgentRecord } from "./agent-store.js";
|
||||
import { RpcDispatcher, RpcError } from "./rpc/dispatcher.js";
|
||||
import { createGetAgentMessagesHandler } from "./rpc/handlers/get-agent-messages.js";
|
||||
|
|
@ -144,12 +143,9 @@ export class Hub {
|
|||
});
|
||||
this.rpc.register("resolveExecApproval", createResolveExecApprovalHandler(this.approvalManager));
|
||||
|
||||
// Register as global singleton for cross-module access (subagent tools, announce flow)
|
||||
// Register as global singleton for cross-module access.
|
||||
setHub(this);
|
||||
|
||||
// Restore subagent registry from persistent state
|
||||
initSubagentRegistry();
|
||||
|
||||
// Initialize and start cron service
|
||||
this.initCronService();
|
||||
this.initHeartbeatService();
|
||||
|
|
@ -800,9 +796,6 @@ export class Hub {
|
|||
this.heartbeatUnsubscribe = null;
|
||||
this.heartbeatListeners.clear();
|
||||
|
||||
// Finalize subagent registry before closing agents
|
||||
shutdownSubagentRegistry();
|
||||
|
||||
for (const [id, agent] of this.agents) {
|
||||
agent.close();
|
||||
this.agents.delete(id);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue