Merge pull request #126 from multica-ai/feat/subagent-announce-delivery
feat(subagent): two-tier announce delivery, provider inheritance, error propagation
This commit is contained in:
commit
16cd5d0aaf
17 changed files with 1173 additions and 49 deletions
|
|
@ -272,6 +272,18 @@ describe("AsyncAgent internal flow", () => {
|
|||
agent.close();
|
||||
});
|
||||
|
||||
it("does not persist assistant summary when result text is a NO_REPLY variant", async () => {
|
||||
runInternalMock.mockResolvedValueOnce({ text: "NO_REPLY.", thinking: undefined, error: undefined });
|
||||
const agent = new AsyncAgent();
|
||||
|
||||
agent.writeInternal("announce findings", { forwardAssistant: true, persistResponse: true });
|
||||
await agent.waitForIdle();
|
||||
|
||||
expect(persistAssistantSummaryMock).not.toHaveBeenCalled();
|
||||
|
||||
agent.close();
|
||||
});
|
||||
|
||||
it("does not persist assistant summary when result text is empty", async () => {
|
||||
runInternalMock.mockResolvedValueOnce({ text: " ", thinking: undefined, error: undefined });
|
||||
const agent = new AsyncAgent();
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import { Channel } from "./channel.js";
|
|||
import type { AgentOptions, Message } from "./types.js";
|
||||
import type { MulticaEvent } from "./events.js";
|
||||
import { injectMessageTimestamp } from "./message-timestamp.js";
|
||||
import { isSilentReplyText } from "./tokens.js";
|
||||
|
||||
const devNull = { write: () => true } as unknown as NodeJS.WritableStream;
|
||||
|
||||
|
|
@ -31,6 +32,7 @@ export class AsyncAgent {
|
|||
private pendingWrites = 0;
|
||||
private closeCallbacks: Array<() => void> = [];
|
||||
private forwardInternalAssistant = false;
|
||||
private _lastRunError: string | undefined;
|
||||
readonly sessionId: string;
|
||||
|
||||
constructor(options?: AgentOptions) {
|
||||
|
|
@ -64,13 +66,19 @@ export class AsyncAgent {
|
|||
|
||||
this.queue = this.queue
|
||||
.then(async () => {
|
||||
if (this._closed) return;
|
||||
if (this._closed) {
|
||||
console.log(`[AsyncAgent:${this.sessionId.slice(0, 8)}] write() skipped — agent closed`);
|
||||
return;
|
||||
}
|
||||
console.log(`[AsyncAgent:${this.sessionId.slice(0, 8)}] run() starting for message: ${content.slice(0, 80)}`);
|
||||
const result = await this.agent.run(message, { displayPrompt: content });
|
||||
console.log(`[AsyncAgent:${this.sessionId.slice(0, 8)}] run() completed, error=${result.error ?? "none"}`);
|
||||
// Flush pending session writes so waitForIdle() callers
|
||||
// can safely read session data from disk.
|
||||
await this.agent.flushSession();
|
||||
// Normal text is delivered via message_end event; only handle errors here
|
||||
if (result.error) {
|
||||
this._lastRunError = result.error;
|
||||
console.error(`[AsyncAgent] Agent run error: ${result.error}`);
|
||||
this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` });
|
||||
// Only emit agent_error for HTTP 401 from the LLM provider so the
|
||||
|
|
@ -83,6 +91,7 @@ export class AsyncAgent {
|
|||
})
|
||||
.catch((err) => {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
this._lastRunError = message;
|
||||
console.error(`[AsyncAgent] Agent run exception: ${message}`);
|
||||
this.channel.send({ id: uuidv7(), content: `[error] ${message}` });
|
||||
// Only emit agent_error for HTTP 401 from the LLM provider so the
|
||||
|
|
@ -120,8 +129,11 @@ export class AsyncAgent {
|
|||
// Internal run errors are for diagnostics only; do not leak to user stream.
|
||||
console.error(`[AsyncAgent] Internal run error: ${result.error}`);
|
||||
}
|
||||
// Stop forwarding BEFORE persist to avoid double-emitting the same
|
||||
// assistant message (once from runInternal streaming, once from appendMessage).
|
||||
this.forwardInternalAssistant = prevForward;
|
||||
// Persist the LLM summary so it remains in parent context for future turns
|
||||
if (persistResponse && result.text?.trim() && result.text.trim() !== "NO_REPLY") {
|
||||
if (persistResponse && result.text?.trim() && !isSilentReplyText(result.text)) {
|
||||
this.agent.persistAssistantSummary(result.text.trim());
|
||||
await this.agent.flushSession();
|
||||
}
|
||||
|
|
@ -164,6 +176,43 @@ export class AsyncAgent {
|
|||
return this.queue;
|
||||
}
|
||||
|
||||
/** Error message from the last run, if it failed. */
|
||||
get lastRunError(): string | undefined {
|
||||
return this._lastRunError;
|
||||
}
|
||||
|
||||
/** Whether the agent is currently executing a run (normal or internal). */
|
||||
get isRunning(): boolean {
|
||||
return this.agent.isRunning;
|
||||
}
|
||||
|
||||
/** Whether the underlying LLM is currently streaming a response. */
|
||||
get isStreaming(): boolean {
|
||||
return this.agent.isStreaming;
|
||||
}
|
||||
|
||||
/**
|
||||
* Steer the agent mid-run. Bypasses the serial queue and injects a message
|
||||
* directly into the PiAgentCore steering queue. The message is delivered
|
||||
* after the current tool execution completes, skipping remaining tool calls.
|
||||
*/
|
||||
steer(content: string): void {
|
||||
this.agent.steer(content);
|
||||
}
|
||||
|
||||
/**
|
||||
* Queue a follow-up message for after the current run finishes.
|
||||
* Delivered only when the agent has no more tool calls or steering messages.
|
||||
*/
|
||||
followUp(content: string): void {
|
||||
this.agent.followUp(content);
|
||||
}
|
||||
|
||||
/** Whether the underlying PiAgentCore has queued steer/followUp messages. */
|
||||
hasQueuedMessages(): boolean {
|
||||
return this.agent.hasQueuedMessages();
|
||||
}
|
||||
|
||||
private shouldForwardEvent(event: AgentEvent | MulticaEvent): boolean {
|
||||
if (!this.agent.isInternalRun) return true;
|
||||
if (!this.forwardInternalAssistant) return false;
|
||||
|
|
|
|||
|
|
@ -91,6 +91,7 @@ export class Agent {
|
|||
|
||||
// Internal run state
|
||||
private _internalRun = false;
|
||||
private _isRunning = false;
|
||||
private _runMutex: Promise<void> = Promise.resolve();
|
||||
private currentUserDisplayPrompt: string | undefined;
|
||||
|
||||
|
|
@ -304,8 +305,8 @@ export class Agent {
|
|||
const mergedToolsConfig = mergeToolsConfig(profileToolsConfig, options.tools);
|
||||
const profileDir = this.profile?.getProfileDir();
|
||||
this.toolsOptions = mergedToolsConfig
|
||||
? { ...options, tools: mergedToolsConfig, profileDir }
|
||||
: { ...options, profileDir };
|
||||
? { ...options, tools: mergedToolsConfig, profileDir, provider: this.resolvedProvider }
|
||||
: { ...options, profileDir, provider: this.resolvedProvider };
|
||||
|
||||
const tools = resolveTools(this.toolsOptions);
|
||||
if (this.debug) {
|
||||
|
|
@ -427,6 +428,7 @@ export class Agent {
|
|||
this.refreshAuthState();
|
||||
this.output.state.lastAssistantText = "";
|
||||
this.currentUserDisplayPrompt = options?.displayPrompt;
|
||||
this._isRunning = true;
|
||||
|
||||
try {
|
||||
// Early validation: check API key before calling PiAgentCore.prompt(),
|
||||
|
|
@ -489,6 +491,7 @@ export class Agent {
|
|||
: undefined;
|
||||
return { text: this.output.state.lastAssistantText, thinking, error: this.agent.state.error };
|
||||
} finally {
|
||||
this._isRunning = false;
|
||||
this.currentUserDisplayPrompt = undefined;
|
||||
}
|
||||
}
|
||||
|
|
@ -664,6 +667,40 @@ export class Agent {
|
|||
return this._internalRun;
|
||||
}
|
||||
|
||||
/** Whether a run (normal or internal) is currently executing inside _run(). */
|
||||
get isRunning(): boolean {
|
||||
return this._isRunning;
|
||||
}
|
||||
|
||||
/** Whether the underlying PiAgentCore is currently streaming an LLM response. */
|
||||
get isStreaming(): boolean {
|
||||
return this.agent.state.isStreaming;
|
||||
}
|
||||
|
||||
/**
|
||||
* Queue a steering message to interrupt the agent mid-run.
|
||||
* Delivered after current tool execution, skipping remaining tool calls.
|
||||
* Safe to call from any context (does not require the run mutex).
|
||||
*/
|
||||
steer(content: string): void {
|
||||
const msg: UserMessage = { role: "user", content, timestamp: Date.now() };
|
||||
this.agent.steer(msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Queue a follow-up message for after the current run finishes.
|
||||
* Delivered only when the agent has no more tool calls or steering messages.
|
||||
*/
|
||||
followUp(content: string): void {
|
||||
const msg: UserMessage = { role: "user", content, timestamp: Date.now() };
|
||||
this.agent.followUp(msg);
|
||||
}
|
||||
|
||||
/** Whether the underlying PiAgentCore has queued steer/followUp messages. */
|
||||
hasQueuedMessages(): boolean {
|
||||
return this.agent.hasQueuedMessages();
|
||||
}
|
||||
|
||||
/**
|
||||
* Persist a synthetic assistant message into both in-memory state and session JSONL.
|
||||
* Used after an internal run to keep the LLM summary visible in future turns
|
||||
|
|
@ -895,6 +932,13 @@ export class Agent {
|
|||
|
||||
// Update internal state
|
||||
this.resolvedProvider = providerId;
|
||||
// Keep toolsOptions.provider in sync so sessions_spawn inherits the current provider
|
||||
this.toolsOptions = { ...this.toolsOptions, provider: providerId };
|
||||
|
||||
// Reload tools so sessions_spawn picks up the new provider in its closure.
|
||||
// Without this, the existing tool instance still captures the old provider.
|
||||
const tools = resolveTools(this.toolsOptions);
|
||||
this.agent.setTools(tools);
|
||||
|
||||
// Update session metadata (save original providerId, not alias-resolved)
|
||||
this.session.saveMeta({
|
||||
|
|
@ -906,7 +950,7 @@ export class Agent {
|
|||
});
|
||||
|
||||
// Rebuild system prompt so runtime info reflects the new provider/model
|
||||
const toolNames = (this.agent.state.tools ?? []).map((t: { name: string }) => t.name);
|
||||
const toolNames = tools.map((t) => t.name);
|
||||
const systemPrompt = this.rebuildSystemPrompt(toolNames);
|
||||
if (systemPrompt) {
|
||||
this.agent.setSystemPrompt(systemPrompt);
|
||||
|
|
|
|||
172
packages/core/src/agent/subagent/README.md
Normal file
172
packages/core/src/agent/subagent/README.md
Normal file
|
|
@ -0,0 +1,172 @@
|
|||
# Subagent System
|
||||
|
||||
The subagent system allows a parent agent to spawn isolated child agents that run tasks in parallel and report results back automatically.
|
||||
|
||||
## Architecture Overview
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ Parent Agent (runner.ts) │
|
||||
│ │
|
||||
│ tools: sessions_spawn, sessions_list │
|
||||
│ state: resolvedProvider, toolsOptions │
|
||||
└──────────┬──────────────────────────────────────────────────────────┘
|
||||
│
|
||||
│ sessions_spawn(task, label, timeoutSeconds)
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ Spawn Flow (sessions-spawn.ts) │
|
||||
│ │
|
||||
│ 1. Build subagent system prompt (announce.ts) │
|
||||
│ 2. hub.createSubagent(childSessionId, { provider, model }) │
|
||||
│ 3. registerSubagentRun({ start: () => childAgent.write(task) }) │
|
||||
│ 4. Return { status: "accepted", runId, childSessionId } │
|
||||
└──────────┬──────────────────────────────────────────────────────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ Concurrency Queue (command-queue.ts) │
|
||||
│ │
|
||||
│ Lane: "subagent" — max 10 concurrent (configurable) │
|
||||
│ Queued runs wait for a slot before start() is called │
|
||||
└──────────┬──────────────────────────────────────────────────────────┘
|
||||
│ slot acquired
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ Child Agent Execution │
|
||||
│ │
|
||||
│ ┌───────────────────────────────────────────────────────────────┐ │
|
||||
│ │ AsyncAgent (async-agent.ts) │ │
|
||||
│ │ - Isolated session with restricted tools (isSubagent=true) │ │
|
||||
│ │ - Inherits parent's LLM provider │ │
|
||||
│ │ - System prompt: task focus + error reporting rules │ │
|
||||
│ │ - Tracks lastRunError for error propagation │ │
|
||||
│ └───────────────────────────────────────────────────────────────┘ │
|
||||
│ │
|
||||
│ ┌───────────────────────────────────────────────────────────────┐ │
|
||||
│ │ watchChildAgent (registry.ts) │ │
|
||||
│ │ - Sets startedAt, starts timeout timer │ │
|
||||
│ │ - waitForIdle() — waits for child's task queue to drain │ │
|
||||
│ │ - onClose() — handles explicit close (timeout kill, etc.) │ │
|
||||
│ └───────────────────────────────────────────────────────────────┘ │
|
||||
└──────────┬──────────────────────────────────────────────────────────┘
|
||||
│
|
||||
│ child completes / errors / times out
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ Completion Handling (registry.ts) │
|
||||
│ │
|
||||
│ handleRunCompletion(record) │
|
||||
│ │ │
|
||||
│ ├─ Phase 1: captureFindings() │
|
||||
│ │ - Read last assistant reply from child session JSONL │
|
||||
│ │ - Falls back to last toolResult if no assistant text │
|
||||
│ │ - Persists findings to record before session deletion │
|
||||
│ │ │
|
||||
│ ├─ Session Cleanup │
|
||||
│ │ - cleanup="delete": rm child session dir + hub.closeAgent() │
|
||||
│ │ - cleanup="keep": preserve for audit │
|
||||
│ │ │
|
||||
│ └─ Phase 2: checkAndAnnounce(requesterSessionId) │
|
||||
│ - Finds all unannounced, completed runs with findings │
|
||||
│ - Calls runCoalescedAnnounceFlow() │
|
||||
│ - Marks records: announced=true, archiveAtMs=now+60min │
|
||||
└──────────┬──────────────────────────────────────────────────────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ Announcement Delivery (announce.ts) │
|
||||
│ │
|
||||
│ runCoalescedAnnounceFlow(requesterSessionId, records) │
|
||||
│ │ │
|
||||
│ ├─ Format message: formatCoalescedAnnouncementMessage() │
|
||||
│ │ - Single record: task name, status, findings, stats │
|
||||
│ │ - Multiple records: combined report with all findings │
|
||||
│ │ │
|
||||
│ ├─ Two-tier delivery: │
|
||||
│ │ │
|
||||
│ │ Tier 1: BUSY (parent running or has pending writes) │
|
||||
│ │ └─ enqueueAnnounce() → announce-queue.ts │
|
||||
│ │ - Debounce 1s to batch nearby completions │
|
||||
│ │ - Drain via writeInternal() when parent finishes │
|
||||
│ │ │
|
||||
│ │ Tier 2: IDLE (parent not running) │
|
||||
│ │ └─ sendAnnounceDirect() │
|
||||
│ │ - writeInternal(msg, { forwardAssistant, persistResponse })│
|
||||
│ │ │
|
||||
│ └─ All delivery uses writeInternal() (marks as internal: true) │
|
||||
│ → Prevents announcement from showing as user bubble in UI │
|
||||
│ → LLM processes findings and responds naturally to user │
|
||||
└──────────┬──────────────────────────────────────────────────────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ Record Lifecycle (registry.ts) │
|
||||
│ │
|
||||
│ created → startedAt → endedAt → findingsCaptured → announced │
|
||||
│ │
|
||||
│ After announcement: │
|
||||
│ - Record kept with archiveAtMs = now + 60 min │
|
||||
│ - sessions_list can still query records during this window │
|
||||
│ - Sweeper runs every 60s, removes expired records │
|
||||
│ - When all records removed, sweeper stops │
|
||||
└─────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## Key Files
|
||||
|
||||
| File | Purpose |
|
||||
|------|---------|
|
||||
| `sessions-spawn.ts` | Tool: spawns a child agent with task, label, timeout, provider |
|
||||
| `sessions-list.ts` | Tool: lists subagent runs and their status |
|
||||
| `registry.ts` | Lifecycle management: register, watch, capture, announce, archive |
|
||||
| `announce.ts` | System prompt builder, findings reader, message formatter, delivery |
|
||||
| `announce-queue.ts` | Debounced queue for batching announcements when parent is busy |
|
||||
| `command-queue.ts` | Concurrency limiter for subagent lane slots |
|
||||
| `lanes.ts` | Lane config: max concurrency (10), default timeout (600s) |
|
||||
| `types.ts` | Shared types: SubagentRunRecord, SubagentRunOutcome, etc. |
|
||||
| `registry-store.ts` | Persistence: save/load runs to disk for crash recovery |
|
||||
|
||||
## Provider Inheritance
|
||||
|
||||
Subagents inherit the parent's resolved LLM provider:
|
||||
|
||||
```
|
||||
runner.ts (resolvedProvider)
|
||||
→ toolsOptions.provider
|
||||
→ tools.ts (CreateToolsOptions.provider)
|
||||
→ sessions-spawn.ts (options.provider)
|
||||
→ hub.createSubagent({ provider })
|
||||
```
|
||||
|
||||
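When the user switches providers via the UI (`setProvider()`), `toolsOptions.provider` is updated in sync and the tool set is re-resolved, so `sessions_spawn` (which captures the provider in its closure) uses the new provider for future spawns.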
|
||||
|
||||
## Error Propagation
|
||||
|
||||
```
|
||||
Child tool error (e.g., API 401)
|
||||
→ Subagent LLM sees error, includes in final message (system prompt rule)
|
||||
→ captureFindings() reads final message
|
||||
→ Announcement includes error in findings
|
||||
→ Parent LLM sees error and can inform user
|
||||
|
||||
Child run error (e.g., missing API key for provider)
|
||||
→ AsyncAgent._lastRunError set
|
||||
→ registry.ts checks childAgent.lastRunError after waitForIdle()
|
||||
→ outcome = { status: "error", error: "No API key configured..." }
|
||||
→ Announcement: "task failed: No API key configured..."
|
||||
```
|
||||
|
||||
## Timeout Behavior
|
||||
|
||||
Default: 600s (10 min). The system prompt guides the parent LLM to pick a timeout that matches the task's complexity:
|
||||
- Simple tasks: 600s (default)
|
||||
- Moderate tasks: 900-1200s (15-20 min)
|
||||
- Complex tasks: 1200-1800s (20-30 min)
|
||||
|
||||
On timeout:
|
||||
1. Timeout timer fires in `watchChildAgent()`
|
||||
2. `cleanup({ status: "timeout" })` is called
|
||||
3. Child agent is closed via `hub.closeAgent()`
|
||||
4. Findings are captured from whatever the child wrote so far
|
||||
5. Announcement reports "timed out" with partial findings
|
||||
203
packages/core/src/agent/subagent/announce-queue.test.ts
Normal file
203
packages/core/src/agent/subagent/announce-queue.test.ts
Normal file
|
|
@ -0,0 +1,203 @@
|
|||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
enqueueAnnounce,
|
||||
resetAnnounceQueuesForTests,
|
||||
getAnnounceQueueDepth,
|
||||
type AnnounceQueueItem,
|
||||
type AnnounceQueueSettings,
|
||||
} from "./announce-queue.js";
|
||||
|
||||
afterEach(() => {
|
||||
resetAnnounceQueuesForTests();
|
||||
});
|
||||
|
||||
/**
 * Builds an announce-queue item for tests.
 *
 * Starts from a fixed set of defaults (timestamped with the current time)
 * and lets any field supplied in `overrides` take precedence.
 */
function makeItem(overrides?: Partial<AnnounceQueueItem>): AnnounceQueueItem {
  const defaults: AnnounceQueueItem = {
    prompt: "test prompt",
    summaryLine: "test summary",
    enqueuedAt: Date.now(),
    requesterSessionId: "session-1",
  };
  return { ...defaults, ...overrides };
}
|
||||
|
||||
const FAST_SETTINGS: AnnounceQueueSettings = {
|
||||
mode: "followup",
|
||||
debounceMs: 0,
|
||||
cap: 20,
|
||||
dropPolicy: "old",
|
||||
};
|
||||
|
||||
describe("announce queue", () => {
  /** Yields to real timers long enough for the background drain loop to run. */
  const settle = (ms = 50): Promise<void> =>
    new Promise((resolve) => setTimeout(resolve, ms));

  /** Builds a `send` stub that records every delivered item. */
  function recordingSend() {
    const sent: AnnounceQueueItem[] = [];
    const send = async (item: AnnounceQueueItem): Promise<void> => {
      sent.push(item);
    };
    return { sent, send };
  }

  /**
   * Builds a `send` stub that blocks every delivery until `release()` is
   * called. This keeps items in the queue deterministically, without relying
   * on long timers that would outlive the test.
   */
  function gatedSend() {
    const sent: AnnounceQueueItem[] = [];
    let release: () => void = () => {};
    const gate = new Promise<void>((resolve) => {
      release = resolve;
    });
    const send = async (item: AnnounceQueueItem): Promise<void> => {
      await gate;
      sent.push(item);
    };
    return { sent, send, release };
  }

  it("enqueues an item and drains via send callback", async () => {
    const { sent, send } = recordingSend();

    enqueueAnnounce({ key: "test", item: makeItem(), settings: FAST_SETTINGS, send });

    // Wait for async drain
    await settle();

    expect(sent).toHaveLength(1);
    expect(sent[0]!.prompt).toBe("test prompt");
  });

  it("batches items in collect mode", async () => {
    const { sent, send } = recordingSend();

    const collectSettings: AnnounceQueueSettings = {
      mode: "collect",
      debounceMs: 0,
      cap: 20,
      dropPolicy: "old",
    };

    for (const prompt of ["prompt 1", "prompt 2", "prompt 3"]) {
      enqueueAnnounce({
        key: "test",
        item: makeItem({ prompt }),
        settings: collectSettings,
        send,
      });
    }

    await settle();

    // Collect mode batches all into one send
    expect(sent).toHaveLength(1);
    expect(sent[0]!.prompt).toContain("prompt 1");
    expect(sent[0]!.prompt).toContain("prompt 2");
    expect(sent[0]!.prompt).toContain("prompt 3");
    expect(sent[0]!.prompt).toContain("3 queued announce(s)");
  });

  it("sends items individually in followup mode", async () => {
    const { sent, send } = recordingSend();

    enqueueAnnounce({
      key: "test",
      item: makeItem({ prompt: "prompt A" }),
      settings: FAST_SETTINGS,
      send,
    });
    enqueueAnnounce({
      key: "test",
      item: makeItem({ prompt: "prompt B" }),
      settings: FAST_SETTINGS,
      send,
    });

    await settle();

    expect(sent).toHaveLength(2);
    expect(sent[0]!.prompt).toBe("prompt A");
    expect(sent[1]!.prompt).toBe("prompt B");
  });

  it("respects cap with 'new' drop policy (rejects new items)", async () => {
    const { sent, send, release } = gatedSend();

    const cappedSettings: AnnounceQueueSettings = {
      mode: "followup",
      debounceMs: 0,
      cap: 2,
      dropPolicy: "new",
    };

    const r1 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "1" }), settings: cappedSettings, send });
    const r2 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "2" }), settings: cappedSettings, send });
    const r3 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "3" }), settings: cappedSettings, send });

    expect(r1).toBe(true);
    expect(r2).toBe(true);
    expect(r3).toBe(false); // Rejected — cap reached
    expect(getAnnounceQueueDepth("test")).toBe(2);

    // Let the drain finish so no work leaks into the next test, and confirm
    // the rejected item was never delivered.
    release();
    await settle();

    expect(sent.map((item) => item.prompt)).toEqual(["1", "2"]);
  });

  it("respects cap with 'old' drop policy (drops oldest)", async () => {
    const { sent, send, release } = gatedSend();

    const cappedSettings: AnnounceQueueSettings = {
      mode: "followup",
      debounceMs: 0,
      cap: 2,
      dropPolicy: "old",
    };

    enqueueAnnounce({ key: "test", item: makeItem({ prompt: "1" }), settings: cappedSettings, send });
    enqueueAnnounce({ key: "test", item: makeItem({ prompt: "2" }), settings: cappedSettings, send });
    enqueueAnnounce({ key: "test", item: makeItem({ prompt: "3" }), settings: cappedSettings, send });

    // Queue holds exactly items 2 and 3 (oldest was dropped to make room)
    expect(getAnnounceQueueDepth("test")).toBe(2);

    release();
    await settle();

    // Two deliveries happen and the newest item is delivered last, verbatim.
    expect(sent).toHaveLength(2);
    expect(sent.at(-1)?.prompt).toBe("3");
    expect(getAnnounceQueueDepth("test")).toBe(0);
  });

  it("cleans up queue after drain completes", async () => {
    const { sent, send } = recordingSend();

    enqueueAnnounce({ key: "test", item: makeItem(), settings: FAST_SETTINGS, send });

    await settle();

    expect(sent).toHaveLength(1);
    expect(getAnnounceQueueDepth("test")).toBe(0);
  });

  it("debounces before draining", async () => {
    const { sent, send } = recordingSend();

    const debouncedSettings: AnnounceQueueSettings = {
      mode: "followup",
      debounceMs: 100,
      cap: 20,
      dropPolicy: "old",
    };

    enqueueAnnounce({ key: "test", item: makeItem(), settings: debouncedSettings, send });

    // Should not have sent yet (debounce)
    await settle(30);
    expect(sent).toHaveLength(0);

    // Wait for debounce to complete
    await settle(150);
    expect(sent).toHaveLength(1);
  });
});
|
||||
315
packages/core/src/agent/subagent/announce-queue.ts
Normal file
315
packages/core/src/agent/subagent/announce-queue.ts
Normal file
|
|
@ -0,0 +1,315 @@
|
|||
/**
|
||||
* Announce queue for subagent result delivery.
|
||||
*
|
||||
* Handles queuing and batching of subagent announcements when the parent
|
||||
* agent is busy. Supports debounce, cap, drop policy, and collect mode.
|
||||
*
|
||||
* Ported from OpenClaw (MIT license), adapted for Super Multica.
|
||||
*/
|
||||
|
||||
// ============================================================================
|
||||
// Types
|
||||
// ============================================================================
|
||||
|
||||
export type AnnounceQueueMode =
|
||||
/** Try steer, no queue fallback */
|
||||
| "steer"
|
||||
/** Try steer, fall back to queue */
|
||||
| "steer-backlog"
|
||||
/** Queue and send items individually */
|
||||
| "followup"
|
||||
/** Queue and batch all items into one combined prompt */
|
||||
| "collect";
|
||||
|
||||
export type AnnounceDropPolicy =
|
||||
/** Drop oldest items when cap reached */
|
||||
| "old"
|
||||
/** Drop newest items when cap reached */
|
||||
| "new"
|
||||
/** Summarize dropped items */
|
||||
| "summarize";
|
||||
|
||||
export type AnnounceQueueItem = {
|
||||
prompt: string;
|
||||
summaryLine?: string;
|
||||
enqueuedAt: number;
|
||||
requesterSessionId: string;
|
||||
};
|
||||
|
||||
export type AnnounceQueueSettings = {
|
||||
mode: AnnounceQueueMode;
|
||||
debounceMs?: number;
|
||||
cap?: number;
|
||||
dropPolicy?: AnnounceDropPolicy;
|
||||
};
|
||||
|
||||
type AnnounceQueueState = {
|
||||
items: AnnounceQueueItem[];
|
||||
draining: boolean;
|
||||
lastEnqueuedAt: number;
|
||||
mode: AnnounceQueueMode;
|
||||
debounceMs: number;
|
||||
cap: number;
|
||||
dropPolicy: AnnounceDropPolicy;
|
||||
droppedCount: number;
|
||||
summaryLines: string[];
|
||||
send: (item: AnnounceQueueItem) => Promise<void>;
|
||||
};
|
||||
|
||||
// ============================================================================
|
||||
// Defaults
|
||||
// ============================================================================
|
||||
|
||||
const DEFAULT_DEBOUNCE_MS = 1000;
|
||||
const DEFAULT_CAP = 20;
|
||||
const DEFAULT_DROP_POLICY: AnnounceDropPolicy = "summarize";
|
||||
|
||||
export const DEFAULT_ANNOUNCE_SETTINGS: AnnounceQueueSettings = {
|
||||
mode: "steer-backlog",
|
||||
debounceMs: DEFAULT_DEBOUNCE_MS,
|
||||
cap: DEFAULT_CAP,
|
||||
dropPolicy: DEFAULT_DROP_POLICY,
|
||||
};
|
||||
|
||||
// ============================================================================
|
||||
// Module state
|
||||
// ============================================================================
|
||||
|
||||
const ANNOUNCE_QUEUES = new Map<string, AnnounceQueueState>();
|
||||
|
||||
// ============================================================================
|
||||
// Public API
|
||||
// ============================================================================
|
||||
|
||||
/**
 * Adds an announcement to the queue registered under `params.key` and makes
 * sure a drain is scheduled for it.
 *
 * The queue is created on first use; on later calls its settings and `send`
 * callback are refreshed from `params`.
 *
 * @returns `true` when the item was accepted, `false` when the drop policy
 *          rejected it because the queue is at its cap.
 */
export function enqueueAnnounce(params: {
  key: string;
  item: AnnounceQueueItem;
  settings: AnnounceQueueSettings;
  send: (item: AnnounceQueueItem) => Promise<void>;
}): boolean {
  const { key, item, settings, send } = params;

  const queue = getOrCreateQueue(key, settings, send);
  // Stamp the enqueue attempt first: the debounce window is measured from it.
  queue.lastEnqueuedAt = Date.now();

  const accepted = applyDropPolicy(queue, item);
  if (accepted) {
    queue.items.push(item);
    scheduleAnnounceDrain(key);
    return true;
  }

  // Rejected item: under the "new" policy still make sure the backlog drains.
  if (queue.dropPolicy === "new") {
    scheduleAnnounceDrain(key);
  }
  return false;
}
|
||||
|
||||
/**
 * Reset all queues (for testing).
 *
 * A drain loop that is already running holds a direct reference to its queue
 * state, so removing the map entry alone would let it keep delivering leftover
 * items into the next test. Pending items and drop bookkeeping are therefore
 * emptied first, which makes any in-flight drain loop run out of work.
 */
export function resetAnnounceQueuesForTests(): void {
  for (const queue of ANNOUNCE_QUEUES.values()) {
    queue.items.length = 0;
    queue.summaryLines.length = 0;
    queue.droppedCount = 0;
  }
  ANNOUNCE_QUEUES.clear();
}
|
||||
|
||||
/**
 * Reports how many items are currently waiting in the queue for `key`
 * (for testing/diagnostics). A key with no registered queue has depth 0.
 */
export function getAnnounceQueueDepth(key: string): number {
  const queue = ANNOUNCE_QUEUES.get(key);
  if (queue === undefined) {
    return 0;
  }
  return queue.items.length;
}
|
||||
|
||||
// ============================================================================
|
||||
// Queue management
|
||||
// ============================================================================
|
||||
|
||||
/**
 * Returns the queue state registered under `key`, creating and registering it
 * on first use.
 *
 * For an existing queue, `mode` and `send` are always refreshed; `debounceMs`,
 * `cap` and `dropPolicy` are only overwritten when the caller supplies a usable
 * value. For a new queue, unusable or missing values fall back to the module
 * defaults.
 *
 * A value is considered usable when:
 * - `debounceMs` is a finite number (negative values are clamped to 0);
 * - `cap` floors to at least 1. A fractional cap below 1 would otherwise floor
 *   to 0 and make the queue permanently "full".
 *
 * @param key      Identifier of the queue.
 * @param settings Requested queue settings.
 * @param send     Delivery callback used by the drain loop.
 */
function getOrCreateQueue(
  key: string,
  settings: AnnounceQueueSettings,
  send: (item: AnnounceQueueItem) => Promise<void>,
): AnnounceQueueState {
  const debounceMs =
    typeof settings.debounceMs === "number" && Number.isFinite(settings.debounceMs)
      ? Math.max(0, settings.debounceMs)
      : undefined;

  // NaN fails the `>= 1` comparison, so it is rejected here as well.
  const flooredCap = typeof settings.cap === "number" ? Math.floor(settings.cap) : Number.NaN;
  const cap = flooredCap >= 1 ? flooredCap : undefined;

  const existing = ANNOUNCE_QUEUES.get(key);
  if (existing) {
    existing.mode = settings.mode;
    if (debounceMs !== undefined) {
      existing.debounceMs = debounceMs;
    }
    if (cap !== undefined) {
      existing.cap = cap;
    }
    if (settings.dropPolicy) {
      existing.dropPolicy = settings.dropPolicy;
    }
    existing.send = send;
    return existing;
  }

  const created: AnnounceQueueState = {
    items: [],
    draining: false,
    lastEnqueuedAt: 0,
    mode: settings.mode,
    debounceMs: debounceMs ?? DEFAULT_DEBOUNCE_MS,
    cap: cap ?? DEFAULT_CAP,
    dropPolicy: settings.dropPolicy ?? DEFAULT_DROP_POLICY,
    droppedCount: 0,
    summaryLines: [],
    send,
  };
  ANNOUNCE_QUEUES.set(key, created);
  return created;
}
|
||||
|
||||
// ============================================================================
|
||||
// Drop policy
|
||||
// ============================================================================
|
||||
|
||||
/**
 * Removes and returns the oldest items so that, after the incoming item is
 * pushed, the queue holds at most `cap` items. More than one item is evicted
 * only when the queue is already over its cap (e.g. the cap was lowered).
 */
function evictOverflow(queue: AnnounceQueueState): AnnounceQueueItem[] {
  const overflow = queue.items.length - queue.cap + 1;
  return queue.items.splice(0, Math.max(0, overflow));
}

/**
 * Decides whether an incoming item may be enqueued and, when the queue is at
 * its cap, makes room according to the queue's drop policy.
 *
 * - `"new"`: the incoming item is rejected; the queue is left untouched.
 * - `"old"`: the oldest item(s) are discarded silently.
 * - `"summarize"`: the oldest item(s) are discarded, and for each one
 *   `droppedCount` is incremented and a one-line summary (its `summaryLine`,
 *   or the first 80 characters of its prompt) is recorded.
 *
 * @param queue Queue state; may be mutated (items evicted, drop bookkeeping).
 * @param item  The incoming item. Currently not inspected.
 * @returns `true` if the caller should push the item, `false` if it is rejected.
 */
function applyDropPolicy(
  queue: AnnounceQueueState,
  item: AnnounceQueueItem,
): boolean {
  if (queue.items.length < queue.cap) {
    return true;
  }

  switch (queue.dropPolicy) {
    case "new":
      // Reject the incoming item
      return false;

    case "old":
      // Make room without leaving any trace of what was dropped
      evictOverflow(queue);
      return true;

    case "summarize":
      // Make room, but remember what was dropped so it can be reported later
      for (const dropped of evictOverflow(queue)) {
        queue.droppedCount++;
        queue.summaryLines.push(dropped.summaryLine?.trim() || dropped.prompt.slice(0, 80));
      }
      return true;

    default:
      return true;
  }
}
|
||||
|
||||
// ============================================================================
|
||||
// Drain scheduling
|
||||
// ============================================================================
|
||||
|
||||
/**
 * Starts the background drain loop for the queue registered under `key`,
 * unless no such queue exists or a drain is already running for it.
 *
 * The loop runs detached (fire-and-forget). Each iteration waits out the
 * debounce window and then delivers through `queue.send`:
 * - `"collect"` mode: all queued items are combined into one prompt and sent
 *   using the metadata of the last item.
 * - every other mode: items are sent one at a time. If a drop summary is
 *   pending, it is prepended to the next item's prompt so that neither the
 *   drop notice nor the item's own content is lost.
 *
 * A failure in `send` is logged and ends the current loop. Afterwards the
 * queue is unregistered when empty, or a new drain is scheduled for whatever
 * is left. Both steps are skipped when the map no longer holds this queue
 * object (e.g. after a reset), so a stale loop cannot remove a newer queue
 * registered under the same key.
 *
 * NOTE(review): relies on `buildDropSummary` consuming the queue's drop
 * bookkeeping (the loop condition depends on `droppedCount` returning to 0);
 * that helper is defined outside this block — confirm.
 */
function scheduleAnnounceDrain(key: string): void {
  const queue = ANNOUNCE_QUEUES.get(key);
  if (!queue || queue.draining) return;

  queue.draining = true;
  void (async () => {
    try {
      while (queue.items.length > 0 || queue.droppedCount > 0) {
        await waitForDebounce(queue);

        if (queue.mode === "collect") {
          // Batch all items into one combined prompt
          const items = queue.items.splice(0, queue.items.length);
          const summary = buildDropSummary(queue);
          const prompt = buildCollectPrompt(items, summary);
          const last = items.at(-1);
          if (!last) break;
          await queue.send({ ...last, prompt });
          continue;
        }

        // followup / steer-backlog: send items individually
        const summary = buildDropSummary(queue);
        const next = queue.items.shift();
        if (!next) break;
        // Keep the item's own prompt: the drop summary is added in front of it
        // rather than replacing it.
        await queue.send(summary ? { ...next, prompt: `${summary}\n\n${next.prompt}` } : next);
      }
    } catch (err) {
      console.error(`[AnnounceQueue] Drain failed for ${key}: ${String(err)}`);
    } finally {
      queue.draining = false;
      // Only touch the registry if it still refers to this queue object.
      const stillRegistered = ANNOUNCE_QUEUES.get(key) === queue;
      if (queue.items.length === 0 && queue.droppedCount === 0) {
        if (stillRegistered) {
          ANNOUNCE_QUEUES.delete(key);
        }
      } else if (stillRegistered) {
        scheduleAnnounceDrain(key);
      }
    }
  })();
}
|
||||
|
||||
// ============================================================================
|
||||
// Helpers
|
||||
// ============================================================================
|
||||
|
||||
/**
 * Wait out whatever is left of the queue's debounce window.
 *
 * The window is measured from `queue.lastEnqueuedAt` and lasts
 * `queue.debounceMs`. If it has already elapsed the returned promise is
 * resolved immediately; otherwise it resolves after the remaining time.
 * The remaining time is computed once — later enqueues do not extend this wait.
 *
 * @param queue Queue state providing `lastEnqueuedAt` and `debounceMs`.
 * @returns A promise that resolves when the window has passed.
 */
function waitForDebounce(queue: AnnounceQueueState): Promise<void> {
  const sinceLastEnqueue = Date.now() - queue.lastEnqueuedAt;
  const waitMs = Math.max(0, queue.debounceMs - sinceLastEnqueue);

  return waitMs <= 0
    ? Promise.resolve()
    : new Promise((done) => setTimeout(done, waitMs));
}
|
||||
|
||||
/**
 * Render the pending drop summary for a queue and clear it.
 *
 * When `queue.droppedCount` is zero nothing is rendered and the queue is left
 * untouched. Otherwise the result is a header line with the drop count,
 * followed (if any summary lines were recorded) by a blank line and one
 * `- ` bullet per line. `droppedCount` and `summaryLines` are reset as a side
 * effect, so each summary is produced at most once.
 *
 * @param queue Queue state; `droppedCount` and `summaryLines` are reset when a summary is produced.
 * @returns The summary text, or `undefined` when nothing was dropped.
 */
function buildDropSummary(queue: AnnounceQueueState): string | undefined {
  const { droppedCount, summaryLines } = queue;
  if (droppedCount === 0) return undefined;

  // Consume the counters before rendering so the summary is emitted only once.
  queue.droppedCount = 0;
  queue.summaryLines = [];

  const header = `[${droppedCount} earlier announce(s) were summarized due to queue backlog]`;
  if (summaryLines.length === 0) return header;

  const bullets = summaryLines.map((line) => `- ${line}`);
  return [header, "", ...bullets].join("\n");
}
|
||||
|
||||
/**
 * Merge several queued announcements into one prompt.
 *
 * Layout: a header with the item count, a blank line, then for each item a
 * `---` separator, a `Queued #n` label (1-based), the item's prompt and a
 * blank line. A non-empty `dropSummary` is appended last, followed by a
 * blank line.
 *
 * @param items       Queued items, in delivery order.
 * @param dropSummary Optional summary of items that were dropped earlier.
 * @returns The combined prompt text, lines joined with `\n`.
 */
function buildCollectPrompt(
  items: AnnounceQueueItem[],
  dropSummary: string | undefined,
): string {
  const header = `[${items.length} queued announce(s) while agent was busy]`;

  const sections = items.flatMap((entry, idx) => [
    `---`,
    `Queued #${idx + 1}`,
    entry.prompt,
    "",
  ]);

  const tail = dropSummary ? [dropSummary, ""] : [];

  return [header, "", ...sections, ...tail].join("\n");
}
|
||||
|
|
@ -16,6 +16,7 @@ import type {
|
|||
SubagentRunRecord,
|
||||
SubagentSystemPromptParams,
|
||||
} from "./types.js";
|
||||
import { enqueueAnnounce, DEFAULT_ANNOUNCE_SETTINGS } from "./announce-queue.js";
|
||||
|
||||
/**
|
||||
* Build the system prompt injected into a subagent session.
|
||||
|
|
@ -275,7 +276,15 @@ export function formatCoalescedAnnouncementMessage(
|
|||
|
||||
/**
|
||||
* Run the coalesced announcement flow for all completed runs of a requester.
|
||||
* Formats a single combined message and delivers it to the parent agent.
|
||||
* Uses two-tier delivery:
|
||||
* 1. Queue — if parent is busy (running or has pending writes), queue for
|
||||
* later drain via writeInternal (with debounce to batch nearby completions)
|
||||
* 2. Direct — if parent is idle, send immediately via writeInternal
|
||||
*
|
||||
* All delivery uses writeInternal() which marks the announcement prompt as
|
||||
* `internal: true` (hidden from UI). The assistant's summary response is
|
||||
* forwarded to the real-time stream (`forwardAssistant: true`) so the user
|
||||
* sees the result, and persisted to JSONL for future session loads.
|
||||
*/
|
||||
export function runCoalescedAnnounceFlow(
|
||||
requesterSessionId: string,
|
||||
|
|
@ -293,7 +302,28 @@ export function runCoalescedAnnounceFlow(
|
|||
return false;
|
||||
}
|
||||
|
||||
parentAgent.writeInternal(message, { forwardAssistant: true, persistResponse: true });
|
||||
// Tier 1: BUSY — parent is running or has pending writes
|
||||
// Queue the announcement for delivery via writeInternal() after the parent
|
||||
// finishes its current work. We do NOT use steer() (cancels unrelated tool
|
||||
// calls) or followUp() (doesn't mark entries as internal, polluting the UI).
|
||||
if (parentAgent.isRunning || parentAgent.getPendingWrites() > 0) {
|
||||
enqueueAnnounce({
|
||||
key: requesterSessionId,
|
||||
item: {
|
||||
prompt: message,
|
||||
summaryLine: `${records.length} subagent(s) completed`,
|
||||
enqueuedAt: Date.now(),
|
||||
requesterSessionId,
|
||||
},
|
||||
settings: DEFAULT_ANNOUNCE_SETTINGS,
|
||||
send: async (item) => sendAnnounceDirect(requesterSessionId, item.prompt),
|
||||
});
|
||||
console.log(`[SubagentAnnounce] Queued announcement for parent: ${requesterSessionId}`);
|
||||
return true;
|
||||
}
|
||||
|
||||
// Tier 2: IDLE — parent is idle, send directly via writeInternal
|
||||
sendAnnounceDirect(requesterSessionId, message);
|
||||
return true;
|
||||
} catch (err) {
|
||||
console.error(`[SubagentAnnounce] Failed to coalesced-announce to parent:`, err);
|
||||
|
|
@ -301,6 +331,26 @@ export function runCoalescedAnnounceFlow(
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send announcement directly to parent via writeInternal.
|
||||
 * Used as Tier 2 (idle) and as the queue drain callback.
|
||||
*/
|
||||
function sendAnnounceDirect(requesterSessionId: string, message: string): void {
  try {
    const parentAgent = getHub().getAgent(requesterSessionId);

    // Happy path: the parent exists and is still open, so hand it the message
    // as an internal write whose assistant reply is forwarded and persisted.
    if (parentAgent && !parentAgent.closed) {
      parentAgent.writeInternal(message, { forwardAssistant: true, persistResponse: true });
      return;
    }

    // Parent is gone or closed: nothing to deliver to, so just log it.
    console.warn(
      `[SubagentAnnounce] Parent agent not found or closed for direct send: ${requesterSessionId}`,
    );
  } catch (err) {
    // Best-effort delivery: failures are logged, never rethrown to the caller.
    console.error(`[SubagentAnnounce] Failed direct announce to parent:`, err);
  }
}
|
||||
|
||||
/**
|
||||
* Run the full subagent announcement flow:
|
||||
* 1. Read child's last assistant reply
|
||||
|
|
|
|||
|
|
@ -266,8 +266,105 @@ describe("subagent registry — coalescing", () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe("subagent registry — silent announce mode", () => {
  // Note: In tests (no Hub), watchChildAgent completes synchronously within
  // registerSubagentRun(), so each run's lifecycle finishes before the next
  // registration call. Multi-run coalescing requires async child agents and
  // is validated in integration tests.

  it("stores announce field on the record", () => {
    const record = registerSubagentRun({
      runId: "run-ann",
      childSessionId: "child-ann",
      requesterSessionId: "parent-1",
      task: "Task",
      announce: "silent",
    });
    expect(record.announce).toBe("silent");
  });

  it("defaults announce to undefined (immediate behavior)", () => {
    const record = registerSubagentRun({
      runId: "run-def",
      childSessionId: "child-def",
      requesterSessionId: "parent-1",
      task: "Task",
    });
    expect(record.announce).toBeUndefined();
  });

  it("silent runs are announced via runCoalescedAnnounceFlow", async () => {
    const announceModule = await import("./announce.js");
    const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true);

    // Restore the spy in `finally` so a failing expectation cannot leak the
    // mocked announce flow into later tests.
    try {
      registerSubagentRun({
        runId: "run-s1",
        childSessionId: "child-s1",
        requesterSessionId: "parent-1",
        task: "Silent A",
        announce: "silent",
      });

      await flushQueue();

      // Silent run announced (via runCoalescedAnnounceFlow mock)
      const silentCalls = spy.mock.calls.filter(
        ([reqId, records]) =>
          reqId === "parent-1" &&
          records.some((r: { announce?: string }) => r.announce === "silent"),
      );
      expect(silentCalls.length).toBeGreaterThanOrEqual(1);

      const runS1 = getSubagentRun("run-s1");
      expect(runS1?.announced).toBe(true);
      expect(runS1?.announce).toBe("silent");
    } finally {
      spy.mockRestore();
    }
  });

  it("immediate and silent runs are never mixed in the same announce call", async () => {
    const announceModule = await import("./announce.js");
    const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true);

    // Restore the spy in `finally` so a failing expectation cannot leak the
    // mocked announce flow into later tests.
    try {
      // Register immediate run, then silent run
      registerSubagentRun({
        runId: "run-imm",
        childSessionId: "child-imm",
        requesterSessionId: "parent-1",
        task: "Immediate task",
      });
      registerSubagentRun({
        runId: "run-s1",
        childSessionId: "child-s1",
        requesterSessionId: "parent-1",
        task: "Silent task",
        announce: "silent",
      });

      await flushQueue();

      const calls = spy.mock.calls.filter(
        ([reqId]) => reqId === "parent-1",
      );

      // Immediate and silent should never be in the same announce call
      const mixedCalls = calls.filter(([, records]) => {
        const hasImm = records.some((r: { announce?: string }) => r.announce !== "silent");
        const hasSilent = records.some((r: { announce?: string }) => r.announce === "silent");
        return hasImm && hasSilent;
      });
      expect(mixedCalls).toHaveLength(0);

      // Both should be announced (in separate calls)
      expect(getSubagentRun("run-imm")?.announced).toBe(true);
      expect(getSubagentRun("run-s1")?.announced).toBe(true);
    } finally {
      spy.mockRestore();
    }
  });
});
|
||||
|
||||
describe("subagent registry — post-announce cleanup", () => {
|
||||
it("removes runs from registry after successful announcement", async () => {
|
||||
it("keeps runs in registry after successful announcement with archiveAtMs", async () => {
|
||||
// Mock runCoalescedAnnounceFlow to succeed
|
||||
const announceModule = await import("./announce.js");
|
||||
const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true);
|
||||
|
|
@ -288,11 +385,20 @@ describe("subagent registry — post-announce cleanup", () => {
|
|||
|
||||
await flushQueue();
|
||||
|
||||
// Both runs should have been announced and removed from registry
|
||||
// Both runs should have been announced but kept in registry with archiveAtMs
|
||||
expect(spy).toHaveBeenCalled();
|
||||
expect(getSubagentRun("run-a")).toBeUndefined();
|
||||
expect(getSubagentRun("run-b")).toBeUndefined();
|
||||
expect(listSubagentRuns("parent-1")).toHaveLength(0);
|
||||
|
||||
const runA = getSubagentRun("run-a");
|
||||
const runB = getSubagentRun("run-b");
|
||||
expect(runA).toBeDefined();
|
||||
expect(runB).toBeDefined();
|
||||
expect(runA!.announced).toBe(true);
|
||||
expect(runB!.announced).toBe(true);
|
||||
expect(runA!.archiveAtMs).toBeGreaterThan(Date.now());
|
||||
expect(runB!.archiveAtMs).toBeGreaterThan(Date.now());
|
||||
|
||||
// Records are still queryable
|
||||
expect(listSubagentRuns("parent-1")).toHaveLength(2);
|
||||
|
||||
spy.mockRestore();
|
||||
});
|
||||
|
|
|
|||
|
|
@ -101,6 +101,7 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent
|
|||
label,
|
||||
cleanup = "delete",
|
||||
timeoutSeconds,
|
||||
announce,
|
||||
start,
|
||||
} = params;
|
||||
|
||||
|
|
@ -111,6 +112,7 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent
|
|||
task,
|
||||
label,
|
||||
cleanup,
|
||||
announce,
|
||||
createdAt: Date.now(),
|
||||
};
|
||||
|
||||
|
|
@ -121,7 +123,9 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent
|
|||
// Enqueue in the subagent lane — the start callback and watchChildAgent
|
||||
// only execute once a concurrency slot is available.
|
||||
void enqueueInLane(SubagentLane.Subagent, async () => {
|
||||
console.log(`[SubagentRegistry] Lane slot acquired for ${runId}, calling start()`);
|
||||
start?.();
|
||||
console.log(`[SubagentRegistry] start() returned, entering watchChildAgent`);
|
||||
return watchChildAgent(record, timeoutSeconds);
|
||||
});
|
||||
|
||||
|
|
@ -248,12 +252,26 @@ function watchChildAgent(record: SubagentRunRecord, timeoutSeconds?: number): Pr
|
|||
// Wait for the child agent's task queue to drain (task completion),
|
||||
// then trigger announce flow. Uses waitForIdle() instead of consuming
|
||||
// the stream (which would conflict with Hub.consumeAgent).
|
||||
console.log(`[SubagentRegistry] waitForIdle() called for child ${childSessionId}, pendingWrites=${childAgent.getPendingWrites()}`);
|
||||
childAgent.waitForIdle().then(
|
||||
() => cleanup({ status: "ok" }),
|
||||
(err) => cleanup({
|
||||
status: "error",
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
}),
|
||||
() => {
|
||||
const runtime = Date.now() - (record.startedAt ?? 0);
|
||||
const runError = childAgent.lastRunError;
|
||||
if (runError) {
|
||||
console.log(`[SubagentRegistry] waitForIdle() resolved for child ${childSessionId} with error (runtime: ${runtime}ms): ${runError}`);
|
||||
cleanup({ status: "error", error: runError });
|
||||
} else {
|
||||
console.log(`[SubagentRegistry] waitForIdle() resolved OK for child ${childSessionId} (runtime: ${runtime}ms)`);
|
||||
cleanup({ status: "ok" });
|
||||
}
|
||||
},
|
||||
(err) => {
|
||||
console.error(`[SubagentRegistry] waitForIdle() rejected for child ${childSessionId}:`, err);
|
||||
cleanup({
|
||||
status: "error",
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
// Also handle explicit close (e.g., timeout kill, Hub shutdown)
|
||||
|
|
@ -280,43 +298,57 @@ function captureFindings(record: SubagentRunRecord): void {
|
|||
}
|
||||
|
||||
/**
|
||||
* Phase 2: Check if all unannounced runs for this requester have completed.
|
||||
* If so, send a single coalesced announcement to the parent.
|
||||
* Phase 2: Announce completed-but-unannounced runs.
|
||||
*
|
||||
* Runs with announce="silent" are held back until ALL silent runs from the
|
||||
* same requester have completed. All other runs (immediate / undefined) are
|
||||
* announced per-completion as before.
|
||||
*/
|
||||
function checkAndAnnounce(requesterSessionId: string): void {
|
||||
const allRuns = listSubagentRuns(requesterSessionId);
|
||||
|
||||
// Only consider unannounced runs
|
||||
const pending = allRuns.filter(r => !r.announced);
|
||||
if (pending.length === 0) return;
|
||||
// ── Immediate runs: announce per-completion (default behavior) ──
|
||||
const immediateReady = allRuns.filter(
|
||||
r => !r.announced && r.endedAt !== undefined && r.findingsCaptured && r.announce !== "silent",
|
||||
);
|
||||
if (immediateReady.length > 0) {
|
||||
announceGroup(requesterSessionId, immediateReady);
|
||||
}
|
||||
|
||||
// Are all unannounced runs done?
|
||||
const allDone = pending.every(r => r.endedAt !== undefined);
|
||||
if (!allDone) return;
|
||||
// ── Silent runs: announce only when ALL silent runs are done ──
|
||||
const silentRuns = allRuns.filter(r => r.announce === "silent");
|
||||
const unannouncedSilent = silentRuns.filter(r => !r.announced);
|
||||
const silentReady = unannouncedSilent.filter(
|
||||
r => r.endedAt !== undefined && r.findingsCaptured,
|
||||
);
|
||||
|
||||
// Have all had findings captured?
|
||||
const allCaptured = pending.every(r => r.findingsCaptured);
|
||||
if (!allCaptured) return;
|
||||
// All unannounced silent runs must be ready (ended + findings captured)
|
||||
if (silentReady.length > 0 && silentReady.length === unannouncedSilent.length) {
|
||||
announceGroup(requesterSessionId, silentReady);
|
||||
}
|
||||
}
|
||||
|
||||
// All done — send coalesced announcement
|
||||
const announced = runCoalescedAnnounceFlow(requesterSessionId, pending);
|
||||
/** Announce a group of runs and mark them as announced. */
|
||||
function announceGroup(requesterSessionId: string, runs: SubagentRunRecord[]): void {
|
||||
const announced = runCoalescedAnnounceFlow(requesterSessionId, runs);
|
||||
|
||||
if (announced) {
|
||||
for (const r of pending) {
|
||||
for (const r of runs) {
|
||||
r.announced = true;
|
||||
r.cleanupHandled = true;
|
||||
// Remove from registry immediately — findings already delivered to parent
|
||||
subagentRuns.delete(r.runId);
|
||||
// Keep records for querying via sessions_list; let sweeper archive later
|
||||
r.archiveAtMs = Date.now() + DEFAULT_ARCHIVE_AFTER_MS;
|
||||
}
|
||||
persist();
|
||||
if (subagentRuns.size === 0) {
|
||||
stopSweeper();
|
||||
}
|
||||
} else {
|
||||
// Allow retry — mark cleanupHandled false so initSubagentRegistry() retries
|
||||
for (const r of runs) {
|
||||
r.cleanupHandled = false;
|
||||
}
|
||||
persist();
|
||||
console.warn(
|
||||
`[SubagentRegistry] Coalesced announce failed for requester ${requesterSessionId}`,
|
||||
`[SubagentRegistry] Announce failed for requester ${requesterSessionId}`,
|
||||
);
|
||||
// Leave announced=false so initSubagentRegistry() can retry on restart
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -45,6 +45,9 @@ export type SubagentRunRecord = {
|
|||
findingsCaptured?: boolean | undefined;
|
||||
/** Whether the coalesced announcement has been sent to parent */
|
||||
announced?: boolean | undefined;
|
||||
/** Announcement mode: "immediate" (default) announces per-completion,
|
||||
* "silent" defers until all silent runs from the same requester complete. */
|
||||
announce?: "immediate" | "silent" | undefined;
|
||||
};
|
||||
|
||||
/** Parameters for registering a new subagent run */
|
||||
|
|
@ -58,6 +61,8 @@ export type RegisterSubagentRunParams = {
|
|||
timeoutSeconds?: number | undefined;
|
||||
/** Callback invoked when the queue slot is acquired (used to defer childAgent.write). */
|
||||
start?: (() => void) | undefined;
|
||||
/** Announcement mode: "immediate" (default) or "silent" (defer until all silent runs complete). */
|
||||
announce?: "immediate" | "silent" | undefined;
|
||||
};
|
||||
|
||||
/** Parameters for the announce flow */
|
||||
|
|
|
|||
|
|
@ -262,9 +262,24 @@ export function buildConditionalToolSections(
|
|||
lines.push(
|
||||
"## Sub-Agents",
|
||||
"If a task is complex or long-running, spawn a sub-agent. It will do the work and report back when done.",
|
||||
"You can check on running sub-agents at any time.",
|
||||
"IMPORTANT: After spawning sub-agents, do NOT immediately check on them with sessions_list. " +
|
||||
"Results are delivered directly into your context automatically when the sub-agent finishes. " +
|
||||
"Continue with other tasks or finish your turn and wait for the results to arrive.",
|
||||
"You may use sessions_list to check on sub-agents only if a long time has passed or the user explicitly asks about their status.",
|
||||
"Sub-agents cannot spawn nested sub-agents.",
|
||||
"",
|
||||
"### Timeout Guidelines",
|
||||
"Set timeoutSeconds generously — a sub-agent that times out loses all its work.",
|
||||
"- Simple tasks (search, read, summarize): 600 (10 min, the default)",
|
||||
"- Moderate tasks (multi-step research, file downloads + analysis): 900–1200 (15–20 min)",
|
||||
"- Complex tasks (code generation, PDF creation, multi-file operations): 1200–1800 (20–30 min)",
|
||||
"When in doubt, use a longer timeout. It is always better to wait longer than to lose completed work.",
|
||||
"",
|
||||
"### Announce Modes",
|
||||
"- `announce: \"immediate\"` (default): Each sub-agent's findings are delivered to you as soon as it completes.",
|
||||
"- `announce: \"silent\"`: All findings are held back until every silent sub-agent finishes, then delivered as ONE combined report.",
|
||||
"Use \"silent\" when you want to collect data from multiple sub-agents first, then summarize everything at once.",
|
||||
"",
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -377,6 +392,10 @@ export function buildSubagentSection(
|
|||
"## Subagent Rules",
|
||||
"- Stay focused on the assigned task below.",
|
||||
"- Complete the task thoroughly and report your findings.",
|
||||
"- If you encounter errors (missing API keys, permission denied, tool failures, etc.), " +
|
||||
"you MUST explicitly report them in your final message. " +
|
||||
"State exactly what failed and what is needed to fix it — " +
|
||||
"the parent agent relies on your final message to understand what happened.",
|
||||
"- Do NOT initiate side actions unrelated to the task.",
|
||||
"- Do NOT attempt to communicate with the user directly.",
|
||||
"- Do NOT spawn nested subagents.",
|
||||
|
|
|
|||
39
packages/core/src/agent/tokens.test.ts
Normal file
39
packages/core/src/agent/tokens.test.ts
Normal file
|
|
@ -0,0 +1,39 @@
|
|||
import { describe, it, expect } from "vitest";
|
||||
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "./tokens.js";
|
||||
|
||||
describe("isSilentReplyText", () => {
  // Positive cases: the token alone, padded, punctuated, or trailing other text.
  it("detects exact NO_REPLY", () => {
    const verdict = isSilentReplyText("NO_REPLY");
    expect(verdict).toBe(true);
  });

  it("detects NO_REPLY with surrounding whitespace", () => {
    for (const padded of [" NO_REPLY ", "\nNO_REPLY\n"]) {
      expect(isSilentReplyText(padded)).toBe(true);
    }
  });

  it("detects NO_REPLY with trailing punctuation", () => {
    for (const punctuated of ["NO_REPLY.", "NO_REPLY.\n"]) {
      expect(isSilentReplyText(punctuated)).toBe(true);
    }
  });

  it("detects NO_REPLY at end of text", () => {
    const verdict = isSilentReplyText("I have nothing to report. NO_REPLY");
    expect(verdict).toBe(true);
  });

  // Negative cases: no text, ordinary text, and the token buried inside a word.
  it("returns false for undefined/empty", () => {
    for (const missing of [undefined, ""]) {
      expect(isSilentReplyText(missing)).toBe(false);
    }
  });

  it("returns false for normal text", () => {
    const verdict = isSilentReplyText("Here are the findings");
    expect(verdict).toBe(false);
  });

  it("returns false for NO_REPLY embedded in a word", () => {
    const verdict = isSilentReplyText("DONO_REPLYX");
    expect(verdict).toBe(false);
  });

  it("exports SILENT_REPLY_TOKEN as NO_REPLY", () => {
    expect(SILENT_REPLY_TOKEN).toBe("NO_REPLY");
  });
});
|
||||
17
packages/core/src/agent/tokens.ts
Normal file
17
packages/core/src/agent/tokens.ts
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
/** Sentinel text that marks a reply as intentionally silent. */
export const SILENT_REPLY_TOKEN = "NO_REPLY";

/**
 * Backslash-escape every regular-expression metacharacter in `value` so the
 * result can be embedded in a `RegExp` source and match `value` literally.
 */
function escapeRegExp(value: string): string {
  const metaChars = /[.*+?^${}()|[\]\\]/g;
  return value.replace(metaChars, "\\$&");
}

/**
 * Report whether `text` is a "silent reply".
 *
 * A text counts as silent when the token either
 * - opens the text (leading whitespace allowed) and is followed by the end of
 *   the text or a non-word character, or
 * - closes the text as a whole word, optionally followed only by non-word
 *   characters.
 *
 * @param text  Text to inspect; `undefined` and `""` are never silent.
 * @param token Token to look for; defaults to {@link SILENT_REPLY_TOKEN}.
 * @returns `true` when the token is found in either position.
 */
export function isSilentReplyText(
  text: string | undefined,
  token: string = SILENT_REPLY_TOKEN,
): boolean {
  if (!text) return false;

  const literal = escapeRegExp(token);

  const opensWithToken = new RegExp(`^\\s*${literal}(?=$|\\W)`).test(text);
  if (opensWithToken) return true;

  return new RegExp(`\\b${literal}\\b\\W*$`).test(text);
}
|
||||
|
|
@ -27,6 +27,8 @@ export interface CreateToolsOptions {
|
|||
isSubagent?: boolean | undefined;
|
||||
/** Session ID of the agent (passed to sessions_spawn tool) */
|
||||
sessionId?: string | undefined;
|
||||
/** Resolved provider ID of the parent agent (passed to sessions_spawn for subagent inheritance) */
|
||||
provider?: string | undefined;
|
||||
/** Callback invoked when exec tool needs approval before running a command */
|
||||
onExecApprovalNeeded?: ExecApprovalCallback | undefined;
|
||||
}
|
||||
|
|
@ -134,6 +136,7 @@ export function createAllTools(options: CreateToolsOptions | string): AgentTool<
|
|||
const sessionsSpawnTool = createSessionsSpawnTool({
|
||||
isSubagent: isSubagent ?? false,
|
||||
...(sessionId !== undefined ? { sessionId } : {}),
|
||||
...(opts.provider !== undefined ? { provider: opts.provider } : {}),
|
||||
});
|
||||
tools.push(sessionsSpawnTool as AgentTool<any>);
|
||||
|
||||
|
|
@ -168,6 +171,7 @@ export function resolveTools(options: ResolveToolsOptions): AgentTool<any>[] {
|
|||
profileDir: options.profileDir,
|
||||
isSubagent: options.isSubagent,
|
||||
sessionId: options.sessionId,
|
||||
provider: options.provider,
|
||||
onExecApprovalNeeded: options.onExecApprovalNeeded,
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -73,9 +73,9 @@ describe("sessions_list tool", () => {
|
|||
const text = result.content[0]!;
|
||||
expect(text.type).toBe("text");
|
||||
expect((text as { text: string }).text).toContain("3 total");
|
||||
expect((text as { text: string }).text).toContain("[running]");
|
||||
expect((text as { text: string }).text).toContain("[ok]");
|
||||
expect((text as { text: string }).text).toContain("[error]");
|
||||
expect((text as { text: string }).text).toContain("[RUNNING]");
|
||||
expect((text as { text: string }).text).toContain("[OK]");
|
||||
expect((text as { text: string }).text).toContain("[ERROR]");
|
||||
expect((text as { text: string }).text).toContain("Code Review");
|
||||
expect((text as { text: string }).text).toContain("Test Analysis");
|
||||
expect((text as { text: string }).text).toContain("Lint Check");
|
||||
|
|
|
|||
|
|
@ -127,7 +127,9 @@ export function createSessionsListTool(
|
|||
label: "List Subagent Runs",
|
||||
description:
|
||||
"List all subagent runs spawned by this session and their current status. " +
|
||||
"Optionally pass a runId to get detailed information about a specific run.",
|
||||
"Optionally pass a runId to get detailed information about a specific run. " +
|
||||
"NOTE: Do NOT call this immediately after spawning subagents — results arrive automatically in your context when subagents complete. " +
|
||||
"Only use this if a long time has passed or the user explicitly asks about subagent status.",
|
||||
parameters: SessionsListSchema,
|
||||
execute: async (_toolCallId, args) => {
|
||||
const { runId } = args as SessionsListArgs;
|
||||
|
|
@ -173,13 +175,52 @@ export function createSessionsListTool(
|
|||
};
|
||||
}
|
||||
|
||||
const lines = [`Subagent runs for this session: ${runs.length} total`, ""];
|
||||
const someRunning = runs.some((r) => !r.endedAt);
|
||||
|
||||
// Build status lines for each run
|
||||
const statusLines: string[] = [];
|
||||
for (let i = 0; i < runs.length; i++) {
|
||||
lines.push(formatRunSummary(runs[i]!, i, now));
|
||||
const r = runs[i]!;
|
||||
const displayName = r.label || r.task.slice(0, 60);
|
||||
const status = resolveStatus(r);
|
||||
if (status === "running") {
|
||||
const elapsed = r.startedAt ? formatElapsed(now - r.startedAt) : "just spawned";
|
||||
statusLines.push(` ${i + 1}. [RUNNING] "${displayName}" (${elapsed})`);
|
||||
} else {
|
||||
const elapsed = r.startedAt && r.endedAt ? formatElapsed(r.endedAt - r.startedAt) : "";
|
||||
const findings = r.findingsCaptured
|
||||
? (r.findings ? r.findings.slice(0, 200) + (r.findings.length > 200 ? "…" : "") : "(no output)")
|
||||
: "(findings not yet captured)";
|
||||
statusLines.push(` ${i + 1}. [${status.toUpperCase()}] "${displayName}" (${elapsed})\n Findings: ${findings}`);
|
||||
}
|
||||
}
|
||||
|
||||
const header = `Subagent runs for this session: ${runs.length} total`;
|
||||
const body = statusLines.join("\n");
|
||||
|
||||
// If any subagents are still running, return status with wait instruction.
|
||||
// We do NOT use steer() here — steer would cancel unrelated tool calls
|
||||
// that the LLM may be processing in the same batch.
|
||||
if (someRunning) {
|
||||
const runningCount = runs.filter((r) => !r.endedAt).length;
|
||||
return {
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text:
|
||||
header + "\n" + body + "\n\n" +
|
||||
`STATUS: ${runningCount} subagent(s) still running. This is normal — they need time to complete.\n` +
|
||||
"ACTION REQUIRED: Do NOT call sessions_list again. Results will be delivered into your context automatically when they finish.\n" +
|
||||
"Do NOT attempt to do this work yourself — the subagents are handling it.",
|
||||
},
|
||||
],
|
||||
details: { runs: runs.map(toResultRun) },
|
||||
};
|
||||
}
|
||||
|
||||
// All completed — normal response
|
||||
return {
|
||||
content: [{ type: "text", text: lines.join("\n") }],
|
||||
content: [{ type: "text", text: header + "\n" + body }],
|
||||
details: { runs: runs.map(toResultRun) },
|
||||
};
|
||||
},
|
||||
|
|
|
|||
|
|
@ -35,6 +35,15 @@ const SessionsSpawnSchema = Type.Object({
|
|||
minimum: 0,
|
||||
}),
|
||||
),
|
||||
announce: Type.Optional(
|
||||
Type.Union([Type.Literal("immediate"), Type.Literal("silent")], {
|
||||
description:
|
||||
"Announcement mode. 'immediate' (default): findings delivered as each subagent completes. " +
|
||||
"'silent': defer all announcements until every silent subagent from this session finishes, " +
|
||||
"then deliver one combined report. Use 'silent' when spawning multiple subagents to collect " +
|
||||
"data in parallel and you want to summarize everything at once.",
|
||||
}),
|
||||
),
|
||||
});
|
||||
|
||||
type SessionsSpawnArgs = {
|
||||
|
|
@ -43,6 +52,7 @@ type SessionsSpawnArgs = {
|
|||
model?: string;
|
||||
cleanup?: "delete" | "keep";
|
||||
timeoutSeconds?: number;
|
||||
announce?: "immediate" | "silent";
|
||||
};
|
||||
|
||||
export type SessionsSpawnResult = {
|
||||
|
|
@ -57,6 +67,8 @@ export interface CreateSessionsSpawnToolOptions {
|
|||
isSubagent?: boolean;
|
||||
/** Session ID of the current (requester) agent */
|
||||
sessionId?: string;
|
||||
/** Resolved provider ID of the parent agent (inherited by subagents) */
|
||||
provider?: string;
|
||||
}
|
||||
|
||||
export function createSessionsSpawnTool(
|
||||
|
|
@ -67,11 +79,13 @@ export function createSessionsSpawnTool(
|
|||
label: "Spawn Subagent",
|
||||
description:
|
||||
"Spawn a background subagent to handle a specific task. The subagent runs in an isolated session with its own tool set. " +
|
||||
"When it completes, its findings are announced back to you automatically. " +
|
||||
"When it completes, its findings are delivered directly into your context automatically — you do NOT need to poll or check. " +
|
||||
"IMPORTANT: After spawning subagents, continue with any other immediate tasks you have, or simply finish your turn and wait. " +
|
||||
"Do NOT call sessions_list to check on subagents you just spawned — results take time and will arrive on their own. " +
|
||||
"Use this for parallelizable work, long-running analysis, or tasks that benefit from isolation.",
|
||||
parameters: SessionsSpawnSchema,
|
||||
execute: async (_toolCallId, args) => {
|
||||
const { task, label, model, cleanup = "delete", timeoutSeconds } = args as SessionsSpawnArgs;
|
||||
const { task, label, model, cleanup = "delete", timeoutSeconds, announce } = args as SessionsSpawnArgs;
|
||||
|
||||
// Guard: subagents cannot spawn subagents
|
||||
if (options.isSubagent) {
|
||||
|
|
@ -107,6 +121,7 @@ export function createSessionsSpawnTool(
|
|||
const childAgent = hub.createSubagent(childSessionId, {
|
||||
systemPrompt,
|
||||
model,
|
||||
provider: options.provider,
|
||||
});
|
||||
|
||||
// Register the run for lifecycle tracking.
|
||||
|
|
@ -120,6 +135,7 @@ export function createSessionsSpawnTool(
|
|||
label,
|
||||
cleanup,
|
||||
timeoutSeconds,
|
||||
announce,
|
||||
start: () => childAgent.write(task),
|
||||
});
|
||||
|
||||
|
|
@ -127,7 +143,7 @@ export function createSessionsSpawnTool(
|
|||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: `Subagent spawned successfully.\n\nRun ID: ${runId}\nSession: ${childSessionId}\nTask: ${label || task.slice(0, 80)}\n\nThe subagent is now working in the background. You will receive its findings when it completes.`,
|
||||
text: `Subagent spawned successfully.\n\nRun ID: ${runId}\nSession: ${childSessionId}\nTask: ${label || task.slice(0, 80)}\n\nThe subagent is now working in the background. Its findings will be delivered directly into your context when it completes — do NOT poll or call sessions_list for it. Continue with other tasks or finish your turn.`,
|
||||
},
|
||||
],
|
||||
details: {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue