From 45db13cbdd7474e7922e77c8bfd23fe9c93833f3 Mon Sep 17 00:00:00 2001 From: Jiang Bohan Date: Wed, 11 Feb 2026 17:09:26 +0800 Subject: [PATCH] feat(subagent): add two-tier announce delivery with debounced queue Add announce-queue.ts for batching nearby subagent completions with debounce (1s) and collect mode. Implement two-tier delivery in announce.ts: queue when parent is busy, writeInternal when idle. All delivery uses writeInternal() to mark messages as internal, preventing announcement bubbles from appearing in the UI. Co-Authored-By: Claude Opus 4.6 --- .../src/agent/subagent/announce-queue.test.ts | 203 +++++++++++ .../core/src/agent/subagent/announce-queue.ts | 315 ++++++++++++++++++ packages/core/src/agent/subagent/announce.ts | 56 +++- 3 files changed, 571 insertions(+), 3 deletions(-) create mode 100644 packages/core/src/agent/subagent/announce-queue.test.ts create mode 100644 packages/core/src/agent/subagent/announce-queue.ts diff --git a/packages/core/src/agent/subagent/announce-queue.test.ts b/packages/core/src/agent/subagent/announce-queue.test.ts new file mode 100644 index 00000000..bff56972 --- /dev/null +++ b/packages/core/src/agent/subagent/announce-queue.test.ts @@ -0,0 +1,203 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { + enqueueAnnounce, + resetAnnounceQueuesForTests, + getAnnounceQueueDepth, + type AnnounceQueueItem, + type AnnounceQueueSettings, +} from "./announce-queue.js"; + +afterEach(() => { + resetAnnounceQueuesForTests(); +}); + +function makeItem(overrides?: Partial): AnnounceQueueItem { + return { + prompt: "test prompt", + summaryLine: "test summary", + enqueuedAt: Date.now(), + requesterSessionId: "session-1", + ...overrides, + }; +} + +const FAST_SETTINGS: AnnounceQueueSettings = { + mode: "followup", + debounceMs: 0, + cap: 20, + dropPolicy: "old", +}; + +describe("announce queue", () => { + it("enqueues an item and drains via send callback", async () => { + const sent: AnnounceQueueItem[] = []; + const send = async (item: AnnounceQueueItem) => { sent.push(item); }; + + enqueueAnnounce({ + key: "test", + item: makeItem(), + settings: FAST_SETTINGS, + send, + }); + + // Wait for async drain + await new Promise((r) => setTimeout(r, 50)); + + expect(sent).toHaveLength(1); + expect(sent[0]!.prompt).toBe("test prompt"); + }); + + it("batches items in collect mode", async () => { + const sent: AnnounceQueueItem[] = []; + const send = async (item: AnnounceQueueItem) => { sent.push(item); }; + + const collectSettings: AnnounceQueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 20, + dropPolicy: "old", + }; + + enqueueAnnounce({ + key: "test", + item: makeItem({ prompt: "prompt 1" }), + settings: collectSettings, + send, + }); + enqueueAnnounce({ + key: "test", + item: makeItem({ prompt: "prompt 2" }), + settings: collectSettings, + send, + }); + enqueueAnnounce({ + key: "test", + item: makeItem({ prompt: "prompt 3" }), + settings: collectSettings, + send, + }); + + await new Promise((r) => setTimeout(r, 50)); + + // Collect mode batches all into one send + expect(sent).toHaveLength(1); + expect(sent[0]!.prompt).toContain("prompt 1"); + expect(sent[0]!.prompt).toContain("prompt 2"); + expect(sent[0]!.prompt).toContain("prompt 3"); + expect(sent[0]!.prompt).toContain("3 queued announce(s)"); + }); + + it("sends items individually in followup mode", async () => { + const sent: AnnounceQueueItem[] = []; + const send = async (item: AnnounceQueueItem) => { sent.push(item); }; + + enqueueAnnounce({ + key: "test", + item: makeItem({ prompt: "prompt A" }), + settings: FAST_SETTINGS, + send, + }); + enqueueAnnounce({ + key: "test", + item: makeItem({ prompt: "prompt B" }), + settings: FAST_SETTINGS, + send, + }); + + await new Promise((r) => setTimeout(r, 50)); + + expect(sent).toHaveLength(2); + expect(sent[0]!.prompt).toBe("prompt A"); + expect(sent[1]!.prompt).toBe("prompt B"); + }); + + it("respects cap with 'new' drop policy (rejects new items)", async () => { + const sent: AnnounceQueueItem[] = []; + const send = async (item: AnnounceQueueItem) => { + // Slow send to keep items in queue + await new Promise((r) => setTimeout(r, 200)); + sent.push(item); + }; + + const cappedSettings: AnnounceQueueSettings = { + mode: "followup", + debounceMs: 0, + cap: 2, + dropPolicy: "new", + }; + + const r1 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "1" }), settings: cappedSettings, send }); + const r2 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "2" }), settings: cappedSettings, send }); + const r3 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "3" }), settings: cappedSettings, send }); + + expect(r1).toBe(true); + expect(r2).toBe(true); + expect(r3).toBe(false); // Rejected — cap reached + }); + + it("respects cap with 'old' drop policy (drops oldest)", async () => { + const sent: AnnounceQueueItem[] = []; + const send = async (item: AnnounceQueueItem) => { + await new Promise((r) => setTimeout(r, 200)); + sent.push(item); + }; + + const cappedSettings: AnnounceQueueSettings = { + mode: "followup", + debounceMs: 0, + cap: 2, + dropPolicy: "old", + }; + + enqueueAnnounce({ key: "test", item: makeItem({ prompt: "1" }), settings: cappedSettings, send }); + enqueueAnnounce({ key: "test", item: makeItem({ prompt: "2" }), settings: cappedSettings, send }); + enqueueAnnounce({ key: "test", item: makeItem({ prompt: "3" }), settings: cappedSettings, send }); + + // Queue should have items 2 and 3 (oldest was dropped) + expect(getAnnounceQueueDepth("test")).toBeLessThanOrEqual(2); + }); + + it("cleans up queue after drain completes", async () => { + const sent: AnnounceQueueItem[] = []; + const send = async (item: AnnounceQueueItem) => { sent.push(item); }; + + enqueueAnnounce({ + key: "test", + item: makeItem(), + settings: FAST_SETTINGS, + send, + }); + + await new Promise((r) => setTimeout(r, 50)); + + expect(sent).toHaveLength(1); + expect(getAnnounceQueueDepth("test")).toBe(0); + }); + + it("debounces before draining", async () => { + const sent: AnnounceQueueItem[] = []; + const send = async (item: AnnounceQueueItem) => { sent.push(item); }; + + const debouncedSettings: AnnounceQueueSettings = { + mode: "followup", + debounceMs: 100, + cap: 20, + dropPolicy: "old", + }; + + enqueueAnnounce({ + key: "test", + item: makeItem(), + settings: debouncedSettings, + send, + }); + + // Should not have sent yet (debounce) + await new Promise((r) => setTimeout(r, 30)); + expect(sent).toHaveLength(0); + + // Wait for debounce to complete + await new Promise((r) => setTimeout(r, 150)); + expect(sent).toHaveLength(1); + }); +}); diff --git a/packages/core/src/agent/subagent/announce-queue.ts b/packages/core/src/agent/subagent/announce-queue.ts new file mode 100644 index 00000000..8e88a346 --- /dev/null +++ b/packages/core/src/agent/subagent/announce-queue.ts @@ -0,0 +1,315 @@ +/** + * Announce queue for subagent result delivery. + * + * Handles queuing and batching of subagent announcements when the parent + * agent is busy. Supports debounce, cap, drop policy, and collect mode. + * + * Ported from OpenClaw (MIT license), adapted for Super Multica. + */ + +// ============================================================================ +// Types +// ============================================================================ + +export type AnnounceQueueMode = + /** Try steer, no queue fallback */ + | "steer" + /** Try steer, fall back to queue */ + | "steer-backlog" + /** Queue and send items individually */ + | "followup" + /** Queue and batch all items into one combined prompt */ + | "collect"; + +export type AnnounceDropPolicy = + /** Drop oldest items when cap reached */ + | "old" + /** Drop newest items when cap reached */ + | "new" + /** Summarize dropped items */ + | "summarize"; + +export type AnnounceQueueItem = { + prompt: string; + summaryLine?: string; + enqueuedAt: number; + requesterSessionId: string; +}; + +export type AnnounceQueueSettings = { + mode: AnnounceQueueMode; + debounceMs?: number; + cap?: number; + dropPolicy?: AnnounceDropPolicy; +}; + +type AnnounceQueueState = { + items: AnnounceQueueItem[]; + draining: boolean; + lastEnqueuedAt: number; + mode: AnnounceQueueMode; + debounceMs: number; + cap: number; + dropPolicy: AnnounceDropPolicy; + droppedCount: number; + summaryLines: string[]; + send: (item: AnnounceQueueItem) => Promise; +}; + +// ============================================================================ +// Defaults +// ============================================================================ + +const DEFAULT_DEBOUNCE_MS = 1000; +const DEFAULT_CAP = 20; +const DEFAULT_DROP_POLICY: AnnounceDropPolicy = "summarize"; + +export const DEFAULT_ANNOUNCE_SETTINGS: AnnounceQueueSettings = { + mode: "steer-backlog", + debounceMs: DEFAULT_DEBOUNCE_MS, + cap: DEFAULT_CAP, + dropPolicy: DEFAULT_DROP_POLICY, +}; + +// ============================================================================ +// Module state +// ============================================================================ + +const ANNOUNCE_QUEUES = new Map(); + +// ============================================================================ +// Public API +// ============================================================================ + +/** + * Enqueue an announcement for delivery. Returns true if enqueued, + * false if dropped (cap + "new" drop policy). + */ +export function enqueueAnnounce(params: { + key: string; + item: AnnounceQueueItem; + settings: AnnounceQueueSettings; + send: (item: AnnounceQueueItem) => Promise; +}): boolean { + const queue = getOrCreateQueue(params.key, params.settings, params.send); + queue.lastEnqueuedAt = Date.now(); + + const shouldEnqueue = applyDropPolicy(queue, params.item); + if (!shouldEnqueue) { + if (queue.dropPolicy === "new") { + scheduleAnnounceDrain(params.key); + } + return false; + } + + queue.items.push(params.item); + scheduleAnnounceDrain(params.key); + return true; +} + +/** Reset all queues (for testing). */ +export function resetAnnounceQueuesForTests(): void { + ANNOUNCE_QUEUES.clear(); +} + +/** Get the current queue depth for a key (for testing/diagnostics). */ +export function getAnnounceQueueDepth(key: string): number { + return ANNOUNCE_QUEUES.get(key)?.items.length ?? 0; +} + +// ============================================================================ +// Queue management +// ============================================================================ + +function getOrCreateQueue( + key: string, + settings: AnnounceQueueSettings, + send: (item: AnnounceQueueItem) => Promise, +): AnnounceQueueState { + const existing = ANNOUNCE_QUEUES.get(key); + if (existing) { + existing.mode = settings.mode; + if (typeof settings.debounceMs === "number") { + existing.debounceMs = Math.max(0, settings.debounceMs); + } + if (typeof settings.cap === "number" && settings.cap > 0) { + existing.cap = Math.floor(settings.cap); + } + if (settings.dropPolicy) { + existing.dropPolicy = settings.dropPolicy; + } + existing.send = send; + return existing; + } + + const created: AnnounceQueueState = { + items: [], + draining: false, + lastEnqueuedAt: 0, + mode: settings.mode, + debounceMs: + typeof settings.debounceMs === "number" + ? Math.max(0, settings.debounceMs) + : DEFAULT_DEBOUNCE_MS, + cap: + typeof settings.cap === "number" && settings.cap > 0 + ? Math.floor(settings.cap) + : DEFAULT_CAP, + dropPolicy: settings.dropPolicy ?? DEFAULT_DROP_POLICY, + droppedCount: 0, + summaryLines: [], + send, + }; + ANNOUNCE_QUEUES.set(key, created); + return created; +} + +// ============================================================================ +// Drop policy +// ============================================================================ + +function applyDropPolicy( + queue: AnnounceQueueState, + item: AnnounceQueueItem, +): boolean { + if (queue.items.length < queue.cap) { + return true; + } + + switch (queue.dropPolicy) { + case "new": + // Reject the incoming item + return false; + + case "old": { + // Drop the oldest item to make room + const dropped = queue.items.shift(); + if (dropped) { + queue.droppedCount++; + const summary = dropped.summaryLine?.trim() || dropped.prompt.slice(0, 80); + queue.summaryLines.push(summary); + } + return true; + } + + case "summarize": { + // Drop the oldest item but keep a summary + const dropped = queue.items.shift(); + if (dropped) { + queue.droppedCount++; + const summary = dropped.summaryLine?.trim() || dropped.prompt.slice(0, 80); + queue.summaryLines.push(summary); + } + return true; + } + + default: + return true; + } +} + +// ============================================================================ +// Drain scheduling +// ============================================================================ + +function scheduleAnnounceDrain(key: string): void { + const queue = ANNOUNCE_QUEUES.get(key); + if (!queue || queue.draining) return; + + queue.draining = true; + void (async () => { + try { + while (queue.items.length > 0 || queue.droppedCount > 0) { + await waitForDebounce(queue); + + if (queue.mode === "collect") { + // Batch all items into one combined prompt + const items = queue.items.splice(0, queue.items.length); + const summary = buildDropSummary(queue); + const prompt = buildCollectPrompt(items, summary); + const last = items.at(-1); + if (!last) break; + await queue.send({ ...last, prompt }); + continue; + } + + // followup / steer-backlog: send items individually + const summary = buildDropSummary(queue); + if (summary) { + const next = queue.items.shift(); + if (!next) break; + await queue.send({ ...next, prompt: summary }); + continue; + } + + const next = queue.items.shift(); + if (!next) break; + await queue.send(next); + } + } catch (err) { + console.error(`[AnnounceQueue] Drain failed for ${key}: ${String(err)}`); + } finally { + queue.draining = false; + if (queue.items.length === 0 && queue.droppedCount === 0) { + ANNOUNCE_QUEUES.delete(key); + } else { + scheduleAnnounceDrain(key); + } + } + })(); +} + +// ============================================================================ +// Helpers +// ============================================================================ + +function waitForDebounce(queue: AnnounceQueueState): Promise { + const elapsed = Date.now() - queue.lastEnqueuedAt; + const remaining = Math.max(0, queue.debounceMs - elapsed); + if (remaining <= 0) return Promise.resolve(); + return new Promise((resolve) => setTimeout(resolve, remaining)); +} + +function buildDropSummary(queue: AnnounceQueueState): string | undefined { + if (queue.droppedCount === 0) return undefined; + + const parts: string[] = [ + `[${queue.droppedCount} earlier announce(s) were summarized due to queue backlog]`, + ]; + if (queue.summaryLines.length > 0) { + parts.push(""); + for (const line of queue.summaryLines) { + parts.push(`- ${line}`); + } + } + + // Reset counters + queue.droppedCount = 0; + queue.summaryLines = []; + + return parts.join("\n"); +} + +function buildCollectPrompt( + items: AnnounceQueueItem[], + dropSummary: string | undefined, +): string { + const parts: string[] = [ + `[${items.length} queued announce(s) while agent was busy]`, + "", + ]; + + for (let i = 0; i < items.length; i++) { + parts.push(`---`); + parts.push(`Queued #${i + 1}`); + parts.push(items[i]!.prompt); + parts.push(""); + } + + if (dropSummary) { + parts.push(dropSummary); + parts.push(""); + } + + return parts.join("\n"); +} diff --git a/packages/core/src/agent/subagent/announce.ts b/packages/core/src/agent/subagent/announce.ts index 54c92e75..3532d1af 100644 --- a/packages/core/src/agent/subagent/announce.ts +++ b/packages/core/src/agent/subagent/announce.ts @@ -16,6 +16,7 @@ import type { SubagentRunRecord, SubagentSystemPromptParams, } from "./types.js"; +import { enqueueAnnounce, DEFAULT_ANNOUNCE_SETTINGS } from "./announce-queue.js"; /** * Build the system prompt injected into a subagent session. @@ -275,7 +276,15 @@ export function formatCoalescedAnnouncementMessage( /** * Run the coalesced announcement flow for all completed runs of a requester. - * Formats a single combined message and delivers it to the parent agent. + * Uses two-tier delivery: + * 1. Queue — if parent is busy (running or has pending writes), queue for + * later drain via writeInternal (with debounce to batch nearby completions) + * 2. Direct — if parent is idle, send immediately via writeInternal + * + * All delivery uses writeInternal() which marks entries as `internal: true`, + * preventing announcement messages from showing as user bubbles in the UI. + * We avoid steer() (cancels unrelated tool calls) and followUp() (doesn't + * mark entries as internal, polluting the chat UI). */ export function runCoalescedAnnounceFlow( requesterSessionId: string, @@ -293,7 +302,28 @@ export function runCoalescedAnnounceFlow( return false; } - parentAgent.writeInternal(message, { forwardAssistant: true, persistResponse: true }); + // Tier 1: BUSY — parent is running or has pending writes + // Queue the announcement for delivery via writeInternal() after the parent + // finishes its current work. We do NOT use steer() (cancels unrelated tool + // calls) or followUp() (doesn't mark entries as internal, polluting the UI). + if (parentAgent.isRunning || parentAgent.getPendingWrites() > 0) { + enqueueAnnounce({ + key: requesterSessionId, + item: { + prompt: message, + summaryLine: `${records.length} subagent(s) completed`, + enqueuedAt: Date.now(), + requesterSessionId, + }, + settings: DEFAULT_ANNOUNCE_SETTINGS, + send: async (item) => sendAnnounceDirect(requesterSessionId, item.prompt), + }); + console.log(`[SubagentAnnounce] Queued announcement for parent: ${requesterSessionId}`); + return true; + } + + // Tier 2: IDLE — parent is idle, send directly via writeInternal + sendAnnounceDirect(requesterSessionId, message); return true; } catch (err) { console.error(`[SubagentAnnounce] Failed to coalesced-announce to parent:`, err); @@ -301,6 +331,26 @@ export function runCoalescedAnnounceFlow( } } +/** + * Send announcement directly to parent via writeInternal. + * Used as Tier 3 (idle) and as the queue drain callback. + */ +function sendAnnounceDirect(requesterSessionId: string, message: string): void { + try { + const hub = getHub(); + const parentAgent = hub.getAgent(requesterSessionId); + if (!parentAgent || parentAgent.closed) { + console.warn( + `[SubagentAnnounce] Parent agent not found or closed for direct send: ${requesterSessionId}`, + ); + return; + } + parentAgent.writeInternal(message, { forwardAssistant: false, persistResponse: true }); + } catch (err) { + console.error(`[SubagentAnnounce] Failed direct announce to parent:`, err); + } +} + /** * Run the full subagent announcement flow: * 1. Read child's last assistant reply @@ -341,7 +391,7 @@ export function runSubagentAnnounceFlow(params: SubagentAnnounceParams): boolean return false; } - parentAgent.writeInternal(message, { forwardAssistant: true, persistResponse: true }); + parentAgent.writeInternal(message, { forwardAssistant: false, persistResponse: true }); return true; } catch (err) { console.error(`[SubagentAnnounce] Failed to announce to parent:`, err);