diff --git a/packages/core/src/agent/async-agent.test.ts b/packages/core/src/agent/async-agent.test.ts index 26e7cc9d..e9700d03 100644 --- a/packages/core/src/agent/async-agent.test.ts +++ b/packages/core/src/agent/async-agent.test.ts @@ -272,6 +272,18 @@ describe("AsyncAgent internal flow", () => { agent.close(); }); + it("does not persist assistant summary when result text is a NO_REPLY variant", async () => { + runInternalMock.mockResolvedValueOnce({ text: "NO_REPLY.", thinking: undefined, error: undefined }); + const agent = new AsyncAgent(); + + agent.writeInternal("announce findings", { forwardAssistant: true, persistResponse: true }); + await agent.waitForIdle(); + + expect(persistAssistantSummaryMock).not.toHaveBeenCalled(); + + agent.close(); + }); + it("does not persist assistant summary when result text is empty", async () => { runInternalMock.mockResolvedValueOnce({ text: " ", thinking: undefined, error: undefined }); const agent = new AsyncAgent(); diff --git a/packages/core/src/agent/async-agent.ts b/packages/core/src/agent/async-agent.ts index 30e1f94d..41c33778 100644 --- a/packages/core/src/agent/async-agent.ts +++ b/packages/core/src/agent/async-agent.ts @@ -5,6 +5,7 @@ import { Channel } from "./channel.js"; import type { AgentOptions, Message } from "./types.js"; import type { MulticaEvent } from "./events.js"; import { injectMessageTimestamp } from "./message-timestamp.js"; +import { isSilentReplyText } from "./tokens.js"; const devNull = { write: () => true } as unknown as NodeJS.WritableStream; @@ -31,6 +32,7 @@ export class AsyncAgent { private pendingWrites = 0; private closeCallbacks: Array<() => void> = []; private forwardInternalAssistant = false; + private _lastRunError: string | undefined; readonly sessionId: string; constructor(options?: AgentOptions) { @@ -64,13 +66,19 @@ export class AsyncAgent { this.queue = this.queue .then(async () => { - if (this._closed) return; + if (this._closed) { + console.log(`[AsyncAgent:${this.sessionId.slice(0, 8)}] write() skipped — agent closed`); + return; + } + console.log(`[AsyncAgent:${this.sessionId.slice(0, 8)}] run() starting for message: ${content.slice(0, 80)}`); const result = await this.agent.run(message, { displayPrompt: content }); + console.log(`[AsyncAgent:${this.sessionId.slice(0, 8)}] run() completed, error=${result.error ?? "none"}`); // Flush pending session writes so waitForIdle() callers // can safely read session data from disk. await this.agent.flushSession(); // Normal text is delivered via message_end event; only handle errors here if (result.error) { + this._lastRunError = result.error; console.error(`[AsyncAgent] Agent run error: ${result.error}`); this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` }); // Only emit agent_error for HTTP 401 from the LLM provider so the @@ -83,6 +91,7 @@ export class AsyncAgent { }) .catch((err) => { const message = err instanceof Error ? err.message : String(err); + this._lastRunError = message; console.error(`[AsyncAgent] Agent run exception: ${message}`); this.channel.send({ id: uuidv7(), content: `[error] ${message}` }); // Only emit agent_error for HTTP 401 from the LLM provider so the @@ -120,8 +129,11 @@ export class AsyncAgent { // Internal run errors are for diagnostics only; do not leak to user stream. console.error(`[AsyncAgent] Internal run error: ${result.error}`); } + // Stop forwarding BEFORE persist to avoid double-emitting the same + // assistant message (once from runInternal streaming, once from appendMessage). + this.forwardInternalAssistant = prevForward; // Persist the LLM summary so it remains in parent context for future turns - if (persistResponse && result.text?.trim() && result.text.trim() !== "NO_REPLY") { + if (persistResponse && result.text?.trim() && !isSilentReplyText(result.text)) { this.agent.persistAssistantSummary(result.text.trim()); await this.agent.flushSession(); } @@ -164,6 +176,43 @@ export class AsyncAgent { return this.queue; } + /** Error message from the last run, if it failed. */ + get lastRunError(): string | undefined { + return this._lastRunError; + } + + /** Whether the agent is currently executing a run (normal or internal). */ + get isRunning(): boolean { + return this.agent.isRunning; + } + + /** Whether the underlying LLM is currently streaming a response. */ + get isStreaming(): boolean { + return this.agent.isStreaming; + } + + /** + * Steer the agent mid-run. Bypasses the serial queue and injects a message + * directly into the PiAgentCore steering queue. The message is delivered + * after the current tool execution completes, skipping remaining tool calls. + */ + steer(content: string): void { + this.agent.steer(content); + } + + /** + * Queue a follow-up message for after the current run finishes. + * Delivered only when the agent has no more tool calls or steering messages. + */ + followUp(content: string): void { + this.agent.followUp(content); + } + + /** Whether the underlying PiAgentCore has queued steer/followUp messages. */ + hasQueuedMessages(): boolean { + return this.agent.hasQueuedMessages(); + } + private shouldForwardEvent(event: AgentEvent | MulticaEvent): boolean { if (!this.agent.isInternalRun) return true; if (!this.forwardInternalAssistant) return false; diff --git a/packages/core/src/agent/runner.ts b/packages/core/src/agent/runner.ts index beaa4faf..636a263d 100644 --- a/packages/core/src/agent/runner.ts +++ b/packages/core/src/agent/runner.ts @@ -91,6 +91,7 @@ export class Agent { // Internal run state private _internalRun = false; + private _isRunning = false; private _runMutex: Promise = Promise.resolve(); private currentUserDisplayPrompt: string | undefined; @@ -304,8 +305,8 @@ export class Agent { const mergedToolsConfig = mergeToolsConfig(profileToolsConfig, options.tools); const profileDir = this.profile?.getProfileDir(); this.toolsOptions = mergedToolsConfig - ? { ...options, tools: mergedToolsConfig, profileDir } - : { ...options, profileDir }; + ? { ...options, tools: mergedToolsConfig, profileDir, provider: this.resolvedProvider } + : { ...options, profileDir, provider: this.resolvedProvider }; const tools = resolveTools(this.toolsOptions); if (this.debug) { @@ -427,6 +428,7 @@ export class Agent { this.refreshAuthState(); this.output.state.lastAssistantText = ""; this.currentUserDisplayPrompt = options?.displayPrompt; + this._isRunning = true; try { // Early validation: check API key before calling PiAgentCore.prompt(), @@ -489,6 +491,7 @@ export class Agent { : undefined; return { text: this.output.state.lastAssistantText, thinking, error: this.agent.state.error }; } finally { + this._isRunning = false; this.currentUserDisplayPrompt = undefined; } } @@ -664,6 +667,40 @@ export class Agent { return this._internalRun; } + /** Whether a run (normal or internal) is currently executing inside _run(). */ + get isRunning(): boolean { + return this._isRunning; + } + + /** Whether the underlying PiAgentCore is currently streaming an LLM response. */ + get isStreaming(): boolean { + return this.agent.state.isStreaming; + } + + /** + * Queue a steering message to interrupt the agent mid-run. + * Delivered after current tool execution, skipping remaining tool calls. + * Safe to call from any context (does not require the run mutex). + */ + steer(content: string): void { + const msg: UserMessage = { role: "user", content, timestamp: Date.now() }; + this.agent.steer(msg); + } + + /** + * Queue a follow-up message for after the current run finishes. + * Delivered only when the agent has no more tool calls or steering messages. + */ + followUp(content: string): void { + const msg: UserMessage = { role: "user", content, timestamp: Date.now() }; + this.agent.followUp(msg); + } + + /** Whether the underlying PiAgentCore has queued steer/followUp messages. */ + hasQueuedMessages(): boolean { + return this.agent.hasQueuedMessages(); + } + /** * Persist a synthetic assistant message into both in-memory state and session JSONL. * Used after an internal run to keep the LLM summary visible in future turns @@ -895,6 +932,13 @@ export class Agent { // Update internal state this.resolvedProvider = providerId; + // Keep toolsOptions.provider in sync so sessions_spawn inherits the current provider + this.toolsOptions = { ...this.toolsOptions, provider: providerId }; + + // Reload tools so sessions_spawn picks up the new provider in its closure. + // Without this, the existing tool instance still captures the old provider. + const tools = resolveTools(this.toolsOptions); + this.agent.setTools(tools); // Update session metadata (save original providerId, not alias-resolved) this.session.saveMeta({ @@ -906,7 +950,7 @@ export class Agent { }); // Rebuild system prompt so runtime info reflects the new provider/model - const toolNames = (this.agent.state.tools ?? []).map((t: { name: string }) => t.name); + const toolNames = tools.map((t) => t.name); const systemPrompt = this.rebuildSystemPrompt(toolNames); if (systemPrompt) { this.agent.setSystemPrompt(systemPrompt); diff --git a/packages/core/src/agent/subagent/README.md b/packages/core/src/agent/subagent/README.md new file mode 100644 index 00000000..e007eab1 --- /dev/null +++ b/packages/core/src/agent/subagent/README.md @@ -0,0 +1,172 @@ +# Subagent System + +The subagent system allows a parent agent to spawn isolated child agents that run tasks in parallel and report results back automatically. + +## Architecture Overview + +``` +┌─────────────────────────────────────────────────────────────────────┐ +│ Parent Agent (runner.ts) │ +│ │ +│ tools: sessions_spawn, sessions_list │ +│ state: resolvedProvider, toolsOptions │ +└──────────┬──────────────────────────────────────────────────────────┘ + │ + │ sessions_spawn(task, label, timeoutSeconds) + ▼ +┌─────────────────────────────────────────────────────────────────────┐ +│ Spawn Flow (sessions-spawn.ts) │ +│ │ +│ 1. Build subagent system prompt (announce.ts) │ +│ 2. hub.createSubagent(childSessionId, { provider, model }) │ +│ 3. registerSubagentRun({ start: () => childAgent.write(task) }) │ +│ 4. Return { status: "accepted", runId, childSessionId } │ +└──────────┬──────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────┐ +│ Concurrency Queue (command-queue.ts) │ +│ │ +│ Lane: "subagent" — max 10 concurrent (configurable) │ +│ Queued runs wait for a slot before start() is called │ +└──────────┬──────────────────────────────────────────────────────────┘ + │ slot acquired + ▼ +┌─────────────────────────────────────────────────────────────────────┐ +│ Child Agent Execution │ +│ │ +│ ┌───────────────────────────────────────────────────────────────┐ │ +│ │ AsyncAgent (async-agent.ts) │ │ +│ │ - Isolated session with restricted tools (isSubagent=true) │ │ +│ │ - Inherits parent's LLM provider │ │ +│ │ - System prompt: task focus + error reporting rules │ │ +│ │ - Tracks lastRunError for error propagation │ │ +│ └───────────────────────────────────────────────────────────────┘ │ +│ │ +│ ┌───────────────────────────────────────────────────────────────┐ │ +│ │ watchChildAgent (registry.ts) │ │ +│ │ - Sets startedAt, starts timeout timer │ │ +│ │ - waitForIdle() — waits for child's task queue to drain │ │ +│ │ - onClose() — handles explicit close (timeout kill, etc.) │ │ +│ └───────────────────────────────────────────────────────────────┘ │ +└──────────┬──────────────────────────────────────────────────────────┘ + │ + │ child completes / errors / times out + ▼ +┌─────────────────────────────────────────────────────────────────────┐ +│ Completion Handling (registry.ts) │ +│ │ +│ handleRunCompletion(record) │ +│ │ │ +│ ├─ Phase 1: captureFindings() │ +│ │ - Read last assistant reply from child session JSONL │ +│ │ - Falls back to last toolResult if no assistant text │ +│ │ - Persists findings to record before session deletion │ +│ │ │ +│ ├─ Session Cleanup │ +│ │ - cleanup="delete": rm child session dir + hub.closeAgent() │ +│ │ - cleanup="keep": preserve for audit │ +│ │ │ +│ └─ Phase 2: checkAndAnnounce(requesterSessionId) │ +│ - Finds all unannounced, completed runs with findings │ +│ - Calls runCoalescedAnnounceFlow() │ +│ - Marks records: announced=true, archiveAtMs=now+60min │ +└──────────┬──────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────┐ +│ Announcement Delivery (announce.ts) │ +│ │ +│ runCoalescedAnnounceFlow(requesterSessionId, records) │ +│ │ │ +│ ├─ Format message: formatCoalescedAnnouncementMessage() │ +│ │ - Single record: task name, status, findings, stats │ +│ │ - Multiple records: combined report with all findings │ +│ │ │ +│ ├─ Two-tier delivery: │ +│ │ │ +│ │ Tier 1: BUSY (parent running or has pending writes) │ +│ │ └─ enqueueAnnounce() → announce-queue.ts │ +│ │ - Debounce 1s to batch nearby completions │ +│ │ - Drain via writeInternal() when parent finishes │ +│ │ │ +│ │ Tier 2: IDLE (parent not running) │ +│ │ └─ sendAnnounceDirect() │ +│ │ - writeInternal(msg, { forwardAssistant, persistResponse })│ +│ │ │ +│ └─ All delivery uses writeInternal() (marks as internal: true) │ +│ → Prevents announcement from showing as user bubble in UI │ +│ → LLM processes findings and responds naturally to user │ +└──────────┬──────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────┐ +│ Record Lifecycle (registry.ts) │ +│ │ +│ created → startedAt → endedAt → findingsCaptured → announced │ +│ │ +│ After announcement: │ +│ - Record kept with archiveAtMs = now + 60 min │ +│ - sessions_list can still query records during this window │ +│ - Sweeper runs every 60s, removes expired records │ +│ - When all records removed, sweeper stops │ +└─────────────────────────────────────────────────────────────────────┘ +``` + +## Key Files + +| File | Purpose | +|------|---------| +| `sessions-spawn.ts` | Tool: spawns a child agent with task, label, timeout, provider | +| `sessions-list.ts` | Tool: lists subagent runs and their status | +| `registry.ts` | Lifecycle management: register, watch, capture, announce, archive | +| `announce.ts` | System prompt builder, findings reader, message formatter, delivery | +| `announce-queue.ts` | Debounced queue for batching announcements when parent is busy | +| `command-queue.ts` | Concurrency limiter for subagent lane slots | +| `lanes.ts` | Lane config: max concurrency (10), default timeout (600s) | +| `types.ts` | Shared types: SubagentRunRecord, SubagentRunOutcome, etc. | +| `registry-store.ts` | Persistence: save/load runs to disk for crash recovery | + +## Provider Inheritance + +Subagents inherit the parent's resolved LLM provider: + +``` +runner.ts (resolvedProvider) + → toolsOptions.provider + → tools.ts (CreateToolsOptions.provider) + → sessions-spawn.ts (options.provider) + → hub.createSubagent({ provider }) +``` + +When the user switches providers via UI (`setProvider()`), `toolsOptions.provider` is updated in sync so future spawns use the new provider. + +## Error Propagation + +``` +Child tool error (e.g., API 401) + → Subagent LLM sees error, includes in final message (system prompt rule) + → captureFindings() reads final message + → Announcement includes error in findings + → Parent LLM sees error and can inform user + +Child run error (e.g., missing API key for provider) + → AsyncAgent._lastRunError set + → registry.ts checks childAgent.lastRunError after waitForIdle() + → outcome = { status: "error", error: "No API key configured..." } + → Announcement: "task failed: No API key configured..." +``` + +## Timeout Behavior + +Default: 600s (10 min). System prompt guides the parent LLM: +- Simple tasks: 600s (default) +- Moderate tasks: 900-1200s (15-20 min) +- Complex tasks: 1200-1800s (20-30 min) + +On timeout: +1. Timeout timer fires in `watchChildAgent()` +2. `cleanup({ status: "timeout" })` is called +3. Child agent is closed via `hub.closeAgent()` +4. Findings are captured from whatever the child wrote so far +5. Announcement reports "timed out" with partial findings diff --git a/packages/core/src/agent/subagent/announce-queue.test.ts b/packages/core/src/agent/subagent/announce-queue.test.ts new file mode 100644 index 00000000..bff56972 --- /dev/null +++ b/packages/core/src/agent/subagent/announce-queue.test.ts @@ -0,0 +1,203 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { + enqueueAnnounce, + resetAnnounceQueuesForTests, + getAnnounceQueueDepth, + type AnnounceQueueItem, + type AnnounceQueueSettings, +} from "./announce-queue.js"; + +afterEach(() => { + resetAnnounceQueuesForTests(); +}); + +function makeItem(overrides?: Partial): AnnounceQueueItem { + return { + prompt: "test prompt", + summaryLine: "test summary", + enqueuedAt: Date.now(), + requesterSessionId: "session-1", + ...overrides, + }; +} + +const FAST_SETTINGS: AnnounceQueueSettings = { + mode: "followup", + debounceMs: 0, + cap: 20, + dropPolicy: "old", +}; + +describe("announce queue", () => { + it("enqueues an item and drains via send callback", async () => { + const sent: AnnounceQueueItem[] = []; + const send = async (item: AnnounceQueueItem) => { sent.push(item); }; + + enqueueAnnounce({ + key: "test", + item: makeItem(), + settings: FAST_SETTINGS, + send, + }); + + // Wait for async drain + await new Promise((r) => setTimeout(r, 50)); + + expect(sent).toHaveLength(1); + expect(sent[0]!.prompt).toBe("test prompt"); + }); + + it("batches items in collect mode", async () => { + const sent: AnnounceQueueItem[] = []; + const send = async (item: AnnounceQueueItem) => { sent.push(item); }; + + const collectSettings: AnnounceQueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 20, + dropPolicy: "old", + }; + + enqueueAnnounce({ + key: "test", + item: makeItem({ prompt: "prompt 1" }), + settings: collectSettings, + send, + }); + enqueueAnnounce({ + key: "test", + item: makeItem({ prompt: "prompt 2" }), + settings: collectSettings, + send, + }); + enqueueAnnounce({ + key: "test", + item: makeItem({ prompt: "prompt 3" }), + settings: collectSettings, + send, + }); + + await new Promise((r) => setTimeout(r, 50)); + + // Collect mode batches all into one send + expect(sent).toHaveLength(1); + expect(sent[0]!.prompt).toContain("prompt 1"); + expect(sent[0]!.prompt).toContain("prompt 2"); + expect(sent[0]!.prompt).toContain("prompt 3"); + expect(sent[0]!.prompt).toContain("3 queued announce(s)"); + }); + + it("sends items individually in followup mode", async () => { + const sent: AnnounceQueueItem[] = []; + const send = async (item: AnnounceQueueItem) => { sent.push(item); }; + + enqueueAnnounce({ + key: "test", + item: makeItem({ prompt: "prompt A" }), + settings: FAST_SETTINGS, + send, + }); + enqueueAnnounce({ + key: "test", + item: makeItem({ prompt: "prompt B" }), + settings: FAST_SETTINGS, + send, + }); + + await new Promise((r) => setTimeout(r, 50)); + + expect(sent).toHaveLength(2); + expect(sent[0]!.prompt).toBe("prompt A"); + expect(sent[1]!.prompt).toBe("prompt B"); + }); + + it("respects cap with 'new' drop policy (rejects new items)", async () => { + const sent: AnnounceQueueItem[] = []; + const send = async (item: AnnounceQueueItem) => { + // Slow send to keep items in queue + await new Promise((r) => setTimeout(r, 200)); + sent.push(item); + }; + + const cappedSettings: AnnounceQueueSettings = { + mode: "followup", + debounceMs: 0, + cap: 2, + dropPolicy: "new", + }; + + const r1 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "1" }), settings: cappedSettings, send }); + const r2 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "2" }), settings: cappedSettings, send }); + const r3 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "3" }), settings: cappedSettings, send }); + + expect(r1).toBe(true); + expect(r2).toBe(true); + expect(r3).toBe(false); // Rejected — cap reached + }); + + it("respects cap with 'old' drop policy (drops oldest)", async () => { + const sent: AnnounceQueueItem[] = []; + const send = async (item: AnnounceQueueItem) => { + await new Promise((r) => setTimeout(r, 200)); + sent.push(item); + }; + + const cappedSettings: AnnounceQueueSettings = { + mode: "followup", + debounceMs: 0, + cap: 2, + dropPolicy: "old", + }; + + enqueueAnnounce({ key: "test", item: makeItem({ prompt: "1" }), settings: cappedSettings, send }); + enqueueAnnounce({ key: "test", item: makeItem({ prompt: "2" }), settings: cappedSettings, send }); + enqueueAnnounce({ key: "test", item: makeItem({ prompt: "3" }), settings: cappedSettings, send }); + + // Queue should have items 2 and 3 (oldest was dropped) + expect(getAnnounceQueueDepth("test")).toBeLessThanOrEqual(2); + }); + + it("cleans up queue after drain completes", async () => { + const sent: AnnounceQueueItem[] = []; + const send = async (item: AnnounceQueueItem) => { sent.push(item); }; + + enqueueAnnounce({ + key: "test", + item: makeItem(), + settings: FAST_SETTINGS, + send, + }); + + await new Promise((r) => setTimeout(r, 50)); + + expect(sent).toHaveLength(1); + expect(getAnnounceQueueDepth("test")).toBe(0); + }); + + it("debounces before draining", async () => { + const sent: AnnounceQueueItem[] = []; + const send = async (item: AnnounceQueueItem) => { sent.push(item); }; + + const debouncedSettings: AnnounceQueueSettings = { + mode: "followup", + debounceMs: 100, + cap: 20, + dropPolicy: "old", + }; + + enqueueAnnounce({ + key: "test", + item: makeItem(), + settings: debouncedSettings, + send, + }); + + // Should not have sent yet (debounce) + await new Promise((r) => setTimeout(r, 30)); + expect(sent).toHaveLength(0); + + // Wait for debounce to complete + await new Promise((r) => setTimeout(r, 150)); + expect(sent).toHaveLength(1); + }); +}); diff --git a/packages/core/src/agent/subagent/announce-queue.ts b/packages/core/src/agent/subagent/announce-queue.ts new file mode 100644 index 00000000..8e88a346 --- /dev/null +++ b/packages/core/src/agent/subagent/announce-queue.ts @@ -0,0 +1,315 @@ +/** + * Announce queue for subagent result delivery. + * + * Handles queuing and batching of subagent announcements when the parent + * agent is busy. Supports debounce, cap, drop policy, and collect mode. + * + * Ported from OpenClaw (MIT license), adapted for Super Multica. + */ + +// ============================================================================ +// Types +// ============================================================================ + +export type AnnounceQueueMode = + /** Try steer, no queue fallback */ + | "steer" + /** Try steer, fall back to queue */ + | "steer-backlog" + /** Queue and send items individually */ + | "followup" + /** Queue and batch all items into one combined prompt */ + | "collect"; + +export type AnnounceDropPolicy = + /** Drop oldest items when cap reached */ + | "old" + /** Drop newest items when cap reached */ + | "new" + /** Summarize dropped items */ + | "summarize"; + +export type AnnounceQueueItem = { + prompt: string; + summaryLine?: string; + enqueuedAt: number; + requesterSessionId: string; +}; + +export type AnnounceQueueSettings = { + mode: AnnounceQueueMode; + debounceMs?: number; + cap?: number; + dropPolicy?: AnnounceDropPolicy; +}; + +type AnnounceQueueState = { + items: AnnounceQueueItem[]; + draining: boolean; + lastEnqueuedAt: number; + mode: AnnounceQueueMode; + debounceMs: number; + cap: number; + dropPolicy: AnnounceDropPolicy; + droppedCount: number; + summaryLines: string[]; + send: (item: AnnounceQueueItem) => Promise; +}; + +// ============================================================================ +// Defaults +// ============================================================================ + +const DEFAULT_DEBOUNCE_MS = 1000; +const DEFAULT_CAP = 20; +const DEFAULT_DROP_POLICY: AnnounceDropPolicy = "summarize"; + +export const DEFAULT_ANNOUNCE_SETTINGS: AnnounceQueueSettings = { + mode: "steer-backlog", + debounceMs: DEFAULT_DEBOUNCE_MS, + cap: DEFAULT_CAP, + dropPolicy: DEFAULT_DROP_POLICY, +}; + +// ============================================================================ +// Module state +// ============================================================================ + +const ANNOUNCE_QUEUES = new Map(); + +// ============================================================================ +// Public API +// ============================================================================ + +/** + * Enqueue an announcement for delivery. Returns true if enqueued, + * false if dropped (cap + "new" drop policy). + */ +export function enqueueAnnounce(params: { + key: string; + item: AnnounceQueueItem; + settings: AnnounceQueueSettings; + send: (item: AnnounceQueueItem) => Promise; +}): boolean { + const queue = getOrCreateQueue(params.key, params.settings, params.send); + queue.lastEnqueuedAt = Date.now(); + + const shouldEnqueue = applyDropPolicy(queue, params.item); + if (!shouldEnqueue) { + if (queue.dropPolicy === "new") { + scheduleAnnounceDrain(params.key); + } + return false; + } + + queue.items.push(params.item); + scheduleAnnounceDrain(params.key); + return true; +} + +/** Reset all queues (for testing). */ +export function resetAnnounceQueuesForTests(): void { + ANNOUNCE_QUEUES.clear(); +} + +/** Get the current queue depth for a key (for testing/diagnostics). */ +export function getAnnounceQueueDepth(key: string): number { + return ANNOUNCE_QUEUES.get(key)?.items.length ?? 0; +} + +// ============================================================================ +// Queue management +// ============================================================================ + +function getOrCreateQueue( + key: string, + settings: AnnounceQueueSettings, + send: (item: AnnounceQueueItem) => Promise, +): AnnounceQueueState { + const existing = ANNOUNCE_QUEUES.get(key); + if (existing) { + existing.mode = settings.mode; + if (typeof settings.debounceMs === "number") { + existing.debounceMs = Math.max(0, settings.debounceMs); + } + if (typeof settings.cap === "number" && settings.cap > 0) { + existing.cap = Math.floor(settings.cap); + } + if (settings.dropPolicy) { + existing.dropPolicy = settings.dropPolicy; + } + existing.send = send; + return existing; + } + + const created: AnnounceQueueState = { + items: [], + draining: false, + lastEnqueuedAt: 0, + mode: settings.mode, + debounceMs: + typeof settings.debounceMs === "number" + ? Math.max(0, settings.debounceMs) + : DEFAULT_DEBOUNCE_MS, + cap: + typeof settings.cap === "number" && settings.cap > 0 + ? Math.floor(settings.cap) + : DEFAULT_CAP, + dropPolicy: settings.dropPolicy ?? DEFAULT_DROP_POLICY, + droppedCount: 0, + summaryLines: [], + send, + }; + ANNOUNCE_QUEUES.set(key, created); + return created; +} + +// ============================================================================ +// Drop policy +// ============================================================================ + +function applyDropPolicy( + queue: AnnounceQueueState, + item: AnnounceQueueItem, +): boolean { + if (queue.items.length < queue.cap) { + return true; + } + + switch (queue.dropPolicy) { + case "new": + // Reject the incoming item + return false; + + case "old": { + // Drop the oldest item to make room + const dropped = queue.items.shift(); + if (dropped) { + queue.droppedCount++; + const summary = dropped.summaryLine?.trim() || dropped.prompt.slice(0, 80); + queue.summaryLines.push(summary); + } + return true; + } + + case "summarize": { + // Drop the oldest item but keep a summary + const dropped = queue.items.shift(); + if (dropped) { + queue.droppedCount++; + const summary = dropped.summaryLine?.trim() || dropped.prompt.slice(0, 80); + queue.summaryLines.push(summary); + } + return true; + } + + default: + return true; + } +} + +// ============================================================================ +// Drain scheduling +// ============================================================================ + +function scheduleAnnounceDrain(key: string): void { + const queue = ANNOUNCE_QUEUES.get(key); + if (!queue || queue.draining) return; + + queue.draining = true; + void (async () => { + try { + while (queue.items.length > 0 || queue.droppedCount > 0) { + await waitForDebounce(queue); + + if (queue.mode === "collect") { + // Batch all items into one combined prompt + const items = queue.items.splice(0, queue.items.length); + const summary = buildDropSummary(queue); + const prompt = buildCollectPrompt(items, summary); + const last = items.at(-1); + if (!last) break; + await queue.send({ ...last, prompt }); + continue; + } + + // followup / steer-backlog: send items individually + const summary = buildDropSummary(queue); + if (summary) { + const next = queue.items.shift(); + if (!next) break; + await queue.send({ ...next, prompt: summary }); + continue; + } + + const next = queue.items.shift(); + if (!next) break; + await queue.send(next); + } + } catch (err) { + console.error(`[AnnounceQueue] Drain failed for ${key}: ${String(err)}`); + } finally { + queue.draining = false; + if (queue.items.length === 0 && queue.droppedCount === 0) { + ANNOUNCE_QUEUES.delete(key); + } else { + scheduleAnnounceDrain(key); + } + } + })(); +} + +// ============================================================================ +// Helpers +// ============================================================================ + +function waitForDebounce(queue: AnnounceQueueState): Promise { + const elapsed = Date.now() - queue.lastEnqueuedAt; + const remaining = Math.max(0, queue.debounceMs - elapsed); + if (remaining <= 0) return Promise.resolve(); + return new Promise((resolve) => setTimeout(resolve, remaining)); +} + +function buildDropSummary(queue: AnnounceQueueState): string | undefined { + if (queue.droppedCount === 0) return undefined; + + const parts: string[] = [ + `[${queue.droppedCount} earlier announce(s) were summarized due to queue backlog]`, + ]; + if (queue.summaryLines.length > 0) { + parts.push(""); + for (const line of queue.summaryLines) { + parts.push(`- ${line}`); + } + } + + // Reset counters + queue.droppedCount = 0; + queue.summaryLines = []; + + return parts.join("\n"); +} + +function buildCollectPrompt( + items: AnnounceQueueItem[], + dropSummary: string | undefined, +): string { + const parts: string[] = [ + `[${items.length} queued announce(s) while agent was busy]`, + "", + ]; + + for (let i = 0; i < items.length; i++) { + parts.push(`---`); + parts.push(`Queued #${i + 1}`); + parts.push(items[i]!.prompt); + parts.push(""); + } + + if (dropSummary) { + parts.push(dropSummary); + parts.push(""); + } + + return parts.join("\n"); +} diff --git a/packages/core/src/agent/subagent/announce.ts b/packages/core/src/agent/subagent/announce.ts index 54c92e75..c1cf3330 100644 --- a/packages/core/src/agent/subagent/announce.ts +++ b/packages/core/src/agent/subagent/announce.ts @@ -16,6 +16,7 @@ import type { SubagentRunRecord, SubagentSystemPromptParams, } from "./types.js"; +import { enqueueAnnounce, DEFAULT_ANNOUNCE_SETTINGS } from "./announce-queue.js"; /** * Build the system prompt injected into a subagent session. @@ -275,7 +276,15 @@ export function formatCoalescedAnnouncementMessage( /** * Run the coalesced announcement flow for all completed runs of a requester. - * Formats a single combined message and delivers it to the parent agent. + * Uses two-tier delivery: + * 1. Queue — if parent is busy (running or has pending writes), queue for + * later drain via writeInternal (with debounce to batch nearby completions) + * 2. Direct — if parent is idle, send immediately via writeInternal + * + * All delivery uses writeInternal() which marks the announcement prompt as + * `internal: true` (hidden from UI). The assistant's summary response is + * forwarded to the real-time stream (`forwardAssistant: true`) so the user + * sees the result, and persisted to JSONL for future session loads. */ export function runCoalescedAnnounceFlow( requesterSessionId: string, @@ -293,7 +302,28 @@ export function runCoalescedAnnounceFlow( return false; } - parentAgent.writeInternal(message, { forwardAssistant: true, persistResponse: true }); + // Tier 1: BUSY — parent is running or has pending writes + // Queue the announcement for delivery via writeInternal() after the parent + // finishes its current work. We do NOT use steer() (cancels unrelated tool + // calls) or followUp() (doesn't mark entries as internal, polluting the UI). + if (parentAgent.isRunning || parentAgent.getPendingWrites() > 0) { + enqueueAnnounce({ + key: requesterSessionId, + item: { + prompt: message, + summaryLine: `${records.length} subagent(s) completed`, + enqueuedAt: Date.now(), + requesterSessionId, + }, + settings: DEFAULT_ANNOUNCE_SETTINGS, + send: async (item) => sendAnnounceDirect(requesterSessionId, item.prompt), + }); + console.log(`[SubagentAnnounce] Queued announcement for parent: ${requesterSessionId}`); + return true; + } + + // Tier 2: IDLE — parent is idle, send directly via writeInternal + sendAnnounceDirect(requesterSessionId, message); return true; } catch (err) { console.error(`[SubagentAnnounce] Failed to coalesced-announce to parent:`, err); @@ -301,6 +331,26 @@ export function runCoalescedAnnounceFlow( } } +/** + * Send announcement directly to parent via writeInternal. + * Used as Tier 3 (idle) and as the queue drain callback. + */ +function sendAnnounceDirect(requesterSessionId: string, message: string): void { + try { + const hub = getHub(); + const parentAgent = hub.getAgent(requesterSessionId); + if (!parentAgent || parentAgent.closed) { + console.warn( + `[SubagentAnnounce] Parent agent not found or closed for direct send: ${requesterSessionId}`, + ); + return; + } + parentAgent.writeInternal(message, { forwardAssistant: true, persistResponse: true }); + } catch (err) { + console.error(`[SubagentAnnounce] Failed direct announce to parent:`, err); + } +} + /** * Run the full subagent announcement flow: * 1. Read child's last assistant reply diff --git a/packages/core/src/agent/subagent/registry.test.ts b/packages/core/src/agent/subagent/registry.test.ts index 5d5eacc3..e1886dac 100644 --- a/packages/core/src/agent/subagent/registry.test.ts +++ b/packages/core/src/agent/subagent/registry.test.ts @@ -266,8 +266,105 @@ describe("subagent registry — coalescing", () => { }); }); +describe("subagent registry — silent announce mode", () => { + // Note: In tests (no Hub), watchChildAgent completes synchronously within + // registerSubagentRun(), so each run's lifecycle finishes before the next + // registration call. Multi-run coalescing requires async child agents and + // is validated in integration tests. + + it("stores announce field on the record", () => { + const record = registerSubagentRun({ + runId: "run-ann", + childSessionId: "child-ann", + requesterSessionId: "parent-1", + task: "Task", + announce: "silent", + }); + expect(record.announce).toBe("silent"); + }); + + it("defaults announce to undefined (immediate behavior)", () => { + const record = registerSubagentRun({ + runId: "run-def", + childSessionId: "child-def", + requesterSessionId: "parent-1", + task: "Task", + }); + expect(record.announce).toBeUndefined(); + }); + + it("silent runs are announced via runCoalescedAnnounceFlow", async () => { + const announceModule = await import("./announce.js"); + const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true); + + registerSubagentRun({ + runId: "run-s1", + childSessionId: "child-s1", + requesterSessionId: "parent-1", + task: "Silent A", + announce: "silent", + }); + + await flushQueue(); + + // Silent run announced (via runCoalescedAnnounceFlow mock) + const silentCalls = spy.mock.calls.filter( + ([reqId, records]) => + reqId === "parent-1" && + records.some((r: { announce?: string }) => r.announce === "silent"), + ); + expect(silentCalls.length).toBeGreaterThanOrEqual(1); + + const runS1 = getSubagentRun("run-s1"); + expect(runS1?.announced).toBe(true); + expect(runS1?.announce).toBe("silent"); + + spy.mockRestore(); + }); + + it("immediate and silent runs are never mixed in the same announce call", async () => { + const announceModule = await import("./announce.js"); + const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true); + + // Register immediate run, then silent run + registerSubagentRun({ + runId: "run-imm", + childSessionId: "child-imm", + requesterSessionId: "parent-1", + task: "Immediate task", + }); + registerSubagentRun({ + runId: "run-s1", + childSessionId: "child-s1", + requesterSessionId: "parent-1", + task: "Silent task", + announce: "silent", + }); + + await flushQueue(); + + const calls = spy.mock.calls.filter( + ([reqId]) => reqId === "parent-1", + ); + + // Immediate and silent should never be in the same announce call + const mixedCalls = calls.filter(([, records]) => { + const hasImm = records.some((r: { announce?: string }) => r.announce !== "silent"); + const hasSilent = records.some((r: { announce?: string }) => r.announce === "silent"); + return hasImm && hasSilent; + }); + expect(mixedCalls).toHaveLength(0); + + // Both should be announced (in separate calls) + expect(getSubagentRun("run-imm")?.announced).toBe(true); + expect(getSubagentRun("run-s1")?.announced).toBe(true); + + spy.mockRestore(); + }); +}); + describe("subagent registry — post-announce cleanup", () => { - it("removes runs from registry after successful announcement", async () => { + it("keeps runs in registry after successful announcement with archiveAtMs", async () => { // Mock runCoalescedAnnounceFlow to succeed const announceModule = await import("./announce.js"); const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true); @@ -288,11 +385,20 @@ describe("subagent registry — post-announce cleanup", () => { await flushQueue(); - // Both runs should have been announced and removed from registry + // Both runs should have been announced but kept in registry with archiveAtMs expect(spy).toHaveBeenCalled(); - expect(getSubagentRun("run-a")).toBeUndefined(); - expect(getSubagentRun("run-b")).toBeUndefined(); - expect(listSubagentRuns("parent-1")).toHaveLength(0); + + const runA = getSubagentRun("run-a"); + const runB = getSubagentRun("run-b"); + expect(runA).toBeDefined(); + expect(runB).toBeDefined(); + expect(runA!.announced).toBe(true); + expect(runB!.announced).toBe(true); + expect(runA!.archiveAtMs).toBeGreaterThan(Date.now()); + expect(runB!.archiveAtMs).toBeGreaterThan(Date.now()); + + // Records are still queryable + expect(listSubagentRuns("parent-1")).toHaveLength(2); spy.mockRestore(); }); diff --git a/packages/core/src/agent/subagent/registry.ts b/packages/core/src/agent/subagent/registry.ts index f40a72c0..c19a5529 100644 --- a/packages/core/src/agent/subagent/registry.ts +++ b/packages/core/src/agent/subagent/registry.ts @@ -101,6 +101,7 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent label, cleanup = "delete", timeoutSeconds, + announce, start, } = params; @@ -111,6 +112,7 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent task, label, cleanup, + announce, createdAt: Date.now(), }; @@ -121,7 +123,9 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent // Enqueue in the subagent lane — the start callback and watchChildAgent // only execute once a concurrency slot is available. void enqueueInLane(SubagentLane.Subagent, async () => { + console.log(`[SubagentRegistry] Lane slot acquired for ${runId}, calling start()`); start?.(); + console.log(`[SubagentRegistry] start() returned, entering watchChildAgent`); return watchChildAgent(record, timeoutSeconds); }); @@ -248,12 +252,26 @@ function watchChildAgent(record: SubagentRunRecord, timeoutSeconds?: number): Pr // Wait for the child agent's task queue to drain (task completion), // then trigger announce flow. Uses waitForIdle() instead of consuming // the stream (which would conflict with Hub.consumeAgent). + console.log(`[SubagentRegistry] waitForIdle() called for child ${childSessionId}, pendingWrites=${childAgent.getPendingWrites()}`); childAgent.waitForIdle().then( - () => cleanup({ status: "ok" }), - (err) => cleanup({ - status: "error", - error: err instanceof Error ? err.message : String(err), - }), + () => { + const runtime = Date.now() - (record.startedAt ?? 0); + const runError = childAgent.lastRunError; + if (runError) { + console.log(`[SubagentRegistry] waitForIdle() resolved for child ${childSessionId} with error (runtime: ${runtime}ms): ${runError}`); + cleanup({ status: "error", error: runError }); + } else { + console.log(`[SubagentRegistry] waitForIdle() resolved OK for child ${childSessionId} (runtime: ${runtime}ms)`); + cleanup({ status: "ok" }); + } + }, + (err) => { + console.error(`[SubagentRegistry] waitForIdle() rejected for child ${childSessionId}:`, err); + cleanup({ + status: "error", + error: err instanceof Error ? err.message : String(err), + }); + }, ); // Also handle explicit close (e.g., timeout kill, Hub shutdown) @@ -280,43 +298,57 @@ function captureFindings(record: SubagentRunRecord): void { } /** - * Phase 2: Check if all unannounced runs for this requester have completed. - * If so, send a single coalesced announcement to the parent. + * Phase 2: Announce completed-but-unannounced runs. + * + * Runs with announce="silent" are held back until ALL silent runs from the + * same requester have completed. All other runs (immediate / undefined) are + * announced per-completion as before. */ function checkAndAnnounce(requesterSessionId: string): void { const allRuns = listSubagentRuns(requesterSessionId); - // Only consider unannounced runs - const pending = allRuns.filter(r => !r.announced); - if (pending.length === 0) return; + // ── Immediate runs: announce per-completion (default behavior) ── + const immediateReady = allRuns.filter( + r => !r.announced && r.endedAt !== undefined && r.findingsCaptured && r.announce !== "silent", + ); + if (immediateReady.length > 0) { + announceGroup(requesterSessionId, immediateReady); + } - // Are all unannounced runs done? - const allDone = pending.every(r => r.endedAt !== undefined); - if (!allDone) return; + // ── Silent runs: announce only when ALL silent runs are done ── + const silentRuns = allRuns.filter(r => r.announce === "silent"); + const unannouncedSilent = silentRuns.filter(r => !r.announced); + const silentReady = unannouncedSilent.filter( + r => r.endedAt !== undefined && r.findingsCaptured, + ); - // Have all had findings captured? - const allCaptured = pending.every(r => r.findingsCaptured); - if (!allCaptured) return; + // All unannounced silent runs must be ready (ended + findings captured) + if (silentReady.length > 0 && silentReady.length === unannouncedSilent.length) { + announceGroup(requesterSessionId, silentReady); + } +} - // All done — send coalesced announcement - const announced = runCoalescedAnnounceFlow(requesterSessionId, pending); +/** Announce a group of runs and mark them as announced. */ +function announceGroup(requesterSessionId: string, runs: SubagentRunRecord[]): void { + const announced = runCoalescedAnnounceFlow(requesterSessionId, runs); if (announced) { - for (const r of pending) { + for (const r of runs) { r.announced = true; r.cleanupHandled = true; - // Remove from registry immediately — findings already delivered to parent - subagentRuns.delete(r.runId); + // Keep records for querying via sessions_list; let sweeper archive later + r.archiveAtMs = Date.now() + DEFAULT_ARCHIVE_AFTER_MS; } persist(); - if (subagentRuns.size === 0) { - stopSweeper(); - } } else { + // Allow retry — mark cleanupHandled false so initSubagentRegistry() retries + for (const r of runs) { + r.cleanupHandled = false; + } + persist(); console.warn( - `[SubagentRegistry] Coalesced announce failed for requester ${requesterSessionId}`, + `[SubagentRegistry] Announce failed for requester ${requesterSessionId}`, ); - // Leave announced=false so initSubagentRegistry() can retry on restart } } diff --git a/packages/core/src/agent/subagent/types.ts b/packages/core/src/agent/subagent/types.ts index 1d3ca0ac..96277181 100644 --- a/packages/core/src/agent/subagent/types.ts +++ b/packages/core/src/agent/subagent/types.ts @@ -45,6 +45,9 @@ export type SubagentRunRecord = { findingsCaptured?: boolean | undefined; /** Whether the coalesced announcement has been sent to parent */ announced?: boolean | undefined; + /** Announcement mode: "immediate" (default) announces per-completion, + * "silent" defers until all silent runs from the same requester complete. */ + announce?: "immediate" | "silent" | undefined; }; /** Parameters for registering a new subagent run */ @@ -58,6 +61,8 @@ export type RegisterSubagentRunParams = { timeoutSeconds?: number | undefined; /** Callback invoked when the queue slot is acquired (used to defer childAgent.write). */ start?: (() => void) | undefined; + /** Announcement mode: "immediate" (default) or "silent" (defer until all silent runs complete). */ + announce?: "immediate" | "silent" | undefined; }; /** Parameters for the announce flow */ diff --git a/packages/core/src/agent/system-prompt/sections.ts b/packages/core/src/agent/system-prompt/sections.ts index 6d696e7d..97bcedef 100644 --- a/packages/core/src/agent/system-prompt/sections.ts +++ b/packages/core/src/agent/system-prompt/sections.ts @@ -262,9 +262,24 @@ export function buildConditionalToolSections( lines.push( "## Sub-Agents", "If a task is complex or long-running, spawn a sub-agent. It will do the work and report back when done.", - "You can check on running sub-agents at any time.", + "IMPORTANT: After spawning sub-agents, do NOT immediately check on them with sessions_list. " + + "Results are delivered directly into your context automatically when the sub-agent finishes. " + + "Continue with other tasks or finish your turn and wait for the results to arrive.", + "You may use sessions_list to check on sub-agents only if a long time has passed or the user explicitly asks about their status.", "Sub-agents cannot spawn nested sub-agents.", "", + "### Timeout Guidelines", + "Set timeoutSeconds generously — a sub-agent that times out loses all its work.", + "- Simple tasks (search, read, summarize): 600 (10 min, the default)", + "- Moderate tasks (multi-step research, file downloads + analysis): 900–1200 (15–20 min)", + "- Complex tasks (code generation, PDF creation, multi-file operations): 1200–1800 (20–30 min)", + "When in doubt, use a longer timeout. It is always better to wait longer than to lose completed work.", + "", + "### Announce Modes", + "- `announce: \"immediate\"` (default): Each sub-agent's findings are delivered to you as soon as it completes.", + "- `announce: \"silent\"`: All findings are held back until every silent sub-agent finishes, then delivered as ONE combined report.", + "Use \"silent\" when you want to collect data from multiple sub-agents first, then summarize everything at once.", + "", ); } @@ -377,6 +392,10 @@ export function buildSubagentSection( "## Subagent Rules", "- Stay focused on the assigned task below.", "- Complete the task thoroughly and report your findings.", + "- If you encounter errors (missing API keys, permission denied, tool failures, etc.), " + + "you MUST explicitly report them in your final message. " + + "State exactly what failed and what is needed to fix it — " + + "the parent agent relies on your final message to understand what happened.", "- Do NOT initiate side actions unrelated to the task.", "- Do NOT attempt to communicate with the user directly.", "- Do NOT spawn nested subagents.", diff --git a/packages/core/src/agent/tokens.test.ts b/packages/core/src/agent/tokens.test.ts new file mode 100644 index 00000000..d15f1d4f --- /dev/null +++ b/packages/core/src/agent/tokens.test.ts @@ -0,0 +1,39 @@ +import { describe, it, expect } from "vitest"; +import { isSilentReplyText, SILENT_REPLY_TOKEN } from "./tokens.js"; + +describe("isSilentReplyText", () => { + it("detects exact NO_REPLY", () => { + expect(isSilentReplyText("NO_REPLY")).toBe(true); + }); + + it("detects NO_REPLY with surrounding whitespace", () => { + expect(isSilentReplyText(" NO_REPLY ")).toBe(true); + expect(isSilentReplyText("\nNO_REPLY\n")).toBe(true); + }); + + it("detects NO_REPLY with trailing punctuation", () => { + expect(isSilentReplyText("NO_REPLY.")).toBe(true); + expect(isSilentReplyText("NO_REPLY.\n")).toBe(true); + }); + + it("detects NO_REPLY at end of text", () => { + expect(isSilentReplyText("I have nothing to report. NO_REPLY")).toBe(true); + }); + + it("returns false for undefined/empty", () => { + expect(isSilentReplyText(undefined)).toBe(false); + expect(isSilentReplyText("")).toBe(false); + }); + + it("returns false for normal text", () => { + expect(isSilentReplyText("Here are the findings")).toBe(false); + }); + + it("returns false for NO_REPLY embedded in a word", () => { + expect(isSilentReplyText("DONO_REPLYX")).toBe(false); + }); + + it("exports SILENT_REPLY_TOKEN as NO_REPLY", () => { + expect(SILENT_REPLY_TOKEN).toBe("NO_REPLY"); + }); +}); diff --git a/packages/core/src/agent/tokens.ts b/packages/core/src/agent/tokens.ts new file mode 100644 index 00000000..2ec4136f --- /dev/null +++ b/packages/core/src/agent/tokens.ts @@ -0,0 +1,17 @@ +export const SILENT_REPLY_TOKEN = "NO_REPLY"; + +function escapeRegExp(value: string): string { + return value.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); +} + +export function isSilentReplyText( + text: string | undefined, + token: string = SILENT_REPLY_TOKEN, +): boolean { + if (!text) return false; + const escaped = escapeRegExp(token); + const prefix = new RegExp(`^\\s*${escaped}(?=$|\\W)`); + if (prefix.test(text)) return true; + const suffix = new RegExp(`\\b${escaped}\\b\\W*$`); + return suffix.test(text); +} diff --git a/packages/core/src/agent/tools.ts b/packages/core/src/agent/tools.ts index 7905fc74..edc43dd2 100644 --- a/packages/core/src/agent/tools.ts +++ b/packages/core/src/agent/tools.ts @@ -27,6 +27,8 @@ export interface CreateToolsOptions { isSubagent?: boolean | undefined; /** Session ID of the agent (passed to sessions_spawn tool) */ sessionId?: string | undefined; + /** Resolved provider ID of the parent agent (passed to sessions_spawn for subagent inheritance) */ + provider?: string | undefined; /** Callback invoked when exec tool needs approval before running a command */ onExecApprovalNeeded?: ExecApprovalCallback | undefined; } @@ -134,6 +136,7 @@ export function createAllTools(options: CreateToolsOptions | string): AgentTool< const sessionsSpawnTool = createSessionsSpawnTool({ isSubagent: isSubagent ?? false, ...(sessionId !== undefined ? { sessionId } : {}), + ...(opts.provider !== undefined ? { provider: opts.provider } : {}), }); tools.push(sessionsSpawnTool as AgentTool); @@ -168,6 +171,7 @@ export function resolveTools(options: ResolveToolsOptions): AgentTool[] { profileDir: options.profileDir, isSubagent: options.isSubagent, sessionId: options.sessionId, + provider: options.provider, onExecApprovalNeeded: options.onExecApprovalNeeded, }); diff --git a/packages/core/src/agent/tools/sessions-list.test.ts b/packages/core/src/agent/tools/sessions-list.test.ts index a0bf6c1a..989493de 100644 --- a/packages/core/src/agent/tools/sessions-list.test.ts +++ b/packages/core/src/agent/tools/sessions-list.test.ts @@ -73,9 +73,9 @@ describe("sessions_list tool", () => { const text = result.content[0]!; expect(text.type).toBe("text"); expect((text as { text: string }).text).toContain("3 total"); - expect((text as { text: string }).text).toContain("[running]"); - expect((text as { text: string }).text).toContain("[ok]"); - expect((text as { text: string }).text).toContain("[error]"); + expect((text as { text: string }).text).toContain("[RUNNING]"); + expect((text as { text: string }).text).toContain("[OK]"); + expect((text as { text: string }).text).toContain("[ERROR]"); expect((text as { text: string }).text).toContain("Code Review"); expect((text as { text: string }).text).toContain("Test Analysis"); expect((text as { text: string }).text).toContain("Lint Check"); diff --git a/packages/core/src/agent/tools/sessions-list.ts b/packages/core/src/agent/tools/sessions-list.ts index 19a0809e..988bfa1c 100644 --- a/packages/core/src/agent/tools/sessions-list.ts +++ b/packages/core/src/agent/tools/sessions-list.ts @@ -127,7 +127,9 @@ export function createSessionsListTool( label: "List Subagent Runs", description: "List all subagent runs spawned by this session and their current status. " + - "Optionally pass a runId to get detailed information about a specific run.", + "Optionally pass a runId to get detailed information about a specific run. " + + "NOTE: Do NOT call this immediately after spawning subagents — results arrive automatically in your context when subagents complete. " + + "Only use this if a long time has passed or the user explicitly asks about subagent status.", parameters: SessionsListSchema, execute: async (_toolCallId, args) => { const { runId } = args as SessionsListArgs; @@ -173,13 +175,52 @@ export function createSessionsListTool( }; } - const lines = [`Subagent runs for this session: ${runs.length} total`, ""]; + const someRunning = runs.some((r) => !r.endedAt); + + // Build status lines for each run + const statusLines: string[] = []; for (let i = 0; i < runs.length; i++) { - lines.push(formatRunSummary(runs[i]!, i, now)); + const r = runs[i]!; + const displayName = r.label || r.task.slice(0, 60); + const status = resolveStatus(r); + if (status === "running") { + const elapsed = r.startedAt ? formatElapsed(now - r.startedAt) : "just spawned"; + statusLines.push(` ${i + 1}. [RUNNING] "${displayName}" (${elapsed})`); + } else { + const elapsed = r.startedAt && r.endedAt ? formatElapsed(r.endedAt - r.startedAt) : ""; + const findings = r.findingsCaptured + ? (r.findings ? r.findings.slice(0, 200) + (r.findings.length > 200 ? "…" : "") : "(no output)") + : "(findings not yet captured)"; + statusLines.push(` ${i + 1}. [${status.toUpperCase()}] "${displayName}" (${elapsed})\n Findings: ${findings}`); + } } + const header = `Subagent runs for this session: ${runs.length} total`; + const body = statusLines.join("\n"); + + // If any subagents are still running, return status with wait instruction. + // We do NOT use steer() here — steer would cancel unrelated tool calls + // that the LLM may be processing in the same batch. + if (someRunning) { + const runningCount = runs.filter((r) => !r.endedAt).length; + return { + content: [ + { + type: "text", + text: + header + "\n" + body + "\n\n" + + `STATUS: ${runningCount} subagent(s) still running. This is normal — they need time to complete.\n` + + "ACTION REQUIRED: Do NOT call sessions_list again. Results will be delivered into your context automatically when they finish.\n" + + "Do NOT attempt to do this work yourself — the subagents are handling it.", + }, + ], + details: { runs: runs.map(toResultRun) }, + }; + } + + // All completed — normal response return { - content: [{ type: "text", text: lines.join("\n") }], + content: [{ type: "text", text: header + "\n" + body }], details: { runs: runs.map(toResultRun) }, }; }, diff --git a/packages/core/src/agent/tools/sessions-spawn.ts b/packages/core/src/agent/tools/sessions-spawn.ts index 35c0017b..1df31e41 100644 --- a/packages/core/src/agent/tools/sessions-spawn.ts +++ b/packages/core/src/agent/tools/sessions-spawn.ts @@ -35,6 +35,15 @@ const SessionsSpawnSchema = Type.Object({ minimum: 0, }), ), + announce: Type.Optional( + Type.Union([Type.Literal("immediate"), Type.Literal("silent")], { + description: + "Announcement mode. 'immediate' (default): findings delivered as each subagent completes. " + + "'silent': defer all announcements until every silent subagent from this session finishes, " + + "then deliver one combined report. Use 'silent' when spawning multiple subagents to collect " + + "data in parallel and you want to summarize everything at once.", + }), + ), }); type SessionsSpawnArgs = { @@ -43,6 +52,7 @@ type SessionsSpawnArgs = { model?: string; cleanup?: "delete" | "keep"; timeoutSeconds?: number; + announce?: "immediate" | "silent"; }; export type SessionsSpawnResult = { @@ -57,6 +67,8 @@ export interface CreateSessionsSpawnToolOptions { isSubagent?: boolean; /** Session ID of the current (requester) agent */ sessionId?: string; + /** Resolved provider ID of the parent agent (inherited by subagents) */ + provider?: string; } export function createSessionsSpawnTool( @@ -67,11 +79,13 @@ export function createSessionsSpawnTool( label: "Spawn Subagent", description: "Spawn a background subagent to handle a specific task. The subagent runs in an isolated session with its own tool set. " + - "When it completes, its findings are announced back to you automatically. " + + "When it completes, its findings are delivered directly into your context automatically — you do NOT need to poll or check. " + + "IMPORTANT: After spawning subagents, continue with any other immediate tasks you have, or simply finish your turn and wait. " + + "Do NOT call sessions_list to check on subagents you just spawned — results take time and will arrive on their own. " + "Use this for parallelizable work, long-running analysis, or tasks that benefit from isolation.", parameters: SessionsSpawnSchema, execute: async (_toolCallId, args) => { - const { task, label, model, cleanup = "delete", timeoutSeconds } = args as SessionsSpawnArgs; + const { task, label, model, cleanup = "delete", timeoutSeconds, announce } = args as SessionsSpawnArgs; // Guard: subagents cannot spawn subagents if (options.isSubagent) { @@ -107,6 +121,7 @@ export function createSessionsSpawnTool( const childAgent = hub.createSubagent(childSessionId, { systemPrompt, model, + provider: options.provider, }); // Register the run for lifecycle tracking. @@ -120,6 +135,7 @@ export function createSessionsSpawnTool( label, cleanup, timeoutSeconds, + announce, start: () => childAgent.write(task), }); @@ -127,7 +143,7 @@ export function createSessionsSpawnTool( content: [ { type: "text", - text: `Subagent spawned successfully.\n\nRun ID: ${runId}\nSession: ${childSessionId}\nTask: ${label || task.slice(0, 80)}\n\nThe subagent is now working in the background. You will receive its findings when it completes.`, + text: `Subagent spawned successfully.\n\nRun ID: ${runId}\nSession: ${childSessionId}\nTask: ${label || task.slice(0, 80)}\n\nThe subagent is now working in the background. Its findings will be delivered directly into your context when it completes — do NOT poll or call sessions_list for it. Continue with other tasks or finish your turn.`, }, ], details: {