feat(subagent): add two-tier announce delivery with debounced queue

Add announce-queue.ts for batching nearby subagent completions with
debounce (1s) and collect mode. Implement two-tier delivery in
announce.ts: queue when parent is busy, writeInternal when idle.
All delivery uses writeInternal() to mark messages as internal,
preventing announcement bubbles from appearing in the UI.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jiang Bohan 2026-02-11 17:09:26 +08:00
parent 7562009a83
commit 45db13cbdd
3 changed files with 571 additions and 3 deletions

View file

@ -0,0 +1,203 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import {
enqueueAnnounce,
resetAnnounceQueuesForTests,
getAnnounceQueueDepth,
type AnnounceQueueItem,
type AnnounceQueueSettings,
} from "./announce-queue.js";
afterEach(() => {
resetAnnounceQueuesForTests();
});
function makeItem(overrides?: Partial<AnnounceQueueItem>): AnnounceQueueItem {
return {
prompt: "test prompt",
summaryLine: "test summary",
enqueuedAt: Date.now(),
requesterSessionId: "session-1",
...overrides,
};
}
const FAST_SETTINGS: AnnounceQueueSettings = {
mode: "followup",
debounceMs: 0,
cap: 20,
dropPolicy: "old",
};
describe("announce queue", () => {
it("enqueues an item and drains via send callback", async () => {
const sent: AnnounceQueueItem[] = [];
const send = async (item: AnnounceQueueItem) => { sent.push(item); };
enqueueAnnounce({
key: "test",
item: makeItem(),
settings: FAST_SETTINGS,
send,
});
// Wait for async drain
await new Promise((r) => setTimeout(r, 50));
expect(sent).toHaveLength(1);
expect(sent[0]!.prompt).toBe("test prompt");
});
it("batches items in collect mode", async () => {
const sent: AnnounceQueueItem[] = [];
const send = async (item: AnnounceQueueItem) => { sent.push(item); };
const collectSettings: AnnounceQueueSettings = {
mode: "collect",
debounceMs: 0,
cap: 20,
dropPolicy: "old",
};
enqueueAnnounce({
key: "test",
item: makeItem({ prompt: "prompt 1" }),
settings: collectSettings,
send,
});
enqueueAnnounce({
key: "test",
item: makeItem({ prompt: "prompt 2" }),
settings: collectSettings,
send,
});
enqueueAnnounce({
key: "test",
item: makeItem({ prompt: "prompt 3" }),
settings: collectSettings,
send,
});
await new Promise((r) => setTimeout(r, 50));
// Collect mode batches all into one send
expect(sent).toHaveLength(1);
expect(sent[0]!.prompt).toContain("prompt 1");
expect(sent[0]!.prompt).toContain("prompt 2");
expect(sent[0]!.prompt).toContain("prompt 3");
expect(sent[0]!.prompt).toContain("3 queued announce(s)");
});
it("sends items individually in followup mode", async () => {
const sent: AnnounceQueueItem[] = [];
const send = async (item: AnnounceQueueItem) => { sent.push(item); };
enqueueAnnounce({
key: "test",
item: makeItem({ prompt: "prompt A" }),
settings: FAST_SETTINGS,
send,
});
enqueueAnnounce({
key: "test",
item: makeItem({ prompt: "prompt B" }),
settings: FAST_SETTINGS,
send,
});
await new Promise((r) => setTimeout(r, 50));
expect(sent).toHaveLength(2);
expect(sent[0]!.prompt).toBe("prompt A");
expect(sent[1]!.prompt).toBe("prompt B");
});
it("respects cap with 'new' drop policy (rejects new items)", async () => {
const sent: AnnounceQueueItem[] = [];
const send = async (item: AnnounceQueueItem) => {
// Slow send to keep items in queue
await new Promise((r) => setTimeout(r, 200));
sent.push(item);
};
const cappedSettings: AnnounceQueueSettings = {
mode: "followup",
debounceMs: 0,
cap: 2,
dropPolicy: "new",
};
const r1 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "1" }), settings: cappedSettings, send });
const r2 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "2" }), settings: cappedSettings, send });
const r3 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "3" }), settings: cappedSettings, send });
expect(r1).toBe(true);
expect(r2).toBe(true);
expect(r3).toBe(false); // Rejected — cap reached
});
it("respects cap with 'old' drop policy (drops oldest)", async () => {
const sent: AnnounceQueueItem[] = [];
const send = async (item: AnnounceQueueItem) => {
await new Promise((r) => setTimeout(r, 200));
sent.push(item);
};
const cappedSettings: AnnounceQueueSettings = {
mode: "followup",
debounceMs: 0,
cap: 2,
dropPolicy: "old",
};
enqueueAnnounce({ key: "test", item: makeItem({ prompt: "1" }), settings: cappedSettings, send });
enqueueAnnounce({ key: "test", item: makeItem({ prompt: "2" }), settings: cappedSettings, send });
enqueueAnnounce({ key: "test", item: makeItem({ prompt: "3" }), settings: cappedSettings, send });
// Queue should have items 2 and 3 (oldest was dropped)
expect(getAnnounceQueueDepth("test")).toBeLessThanOrEqual(2);
});
it("cleans up queue after drain completes", async () => {
const sent: AnnounceQueueItem[] = [];
const send = async (item: AnnounceQueueItem) => { sent.push(item); };
enqueueAnnounce({
key: "test",
item: makeItem(),
settings: FAST_SETTINGS,
send,
});
await new Promise((r) => setTimeout(r, 50));
expect(sent).toHaveLength(1);
expect(getAnnounceQueueDepth("test")).toBe(0);
});
it("debounces before draining", async () => {
const sent: AnnounceQueueItem[] = [];
const send = async (item: AnnounceQueueItem) => { sent.push(item); };
const debouncedSettings: AnnounceQueueSettings = {
mode: "followup",
debounceMs: 100,
cap: 20,
dropPolicy: "old",
};
enqueueAnnounce({
key: "test",
item: makeItem(),
settings: debouncedSettings,
send,
});
// Should not have sent yet (debounce)
await new Promise((r) => setTimeout(r, 30));
expect(sent).toHaveLength(0);
// Wait for debounce to complete
await new Promise((r) => setTimeout(r, 150));
expect(sent).toHaveLength(1);
});
});

View file

@ -0,0 +1,315 @@
/**
* Announce queue for subagent result delivery.
*
* Handles queuing and batching of subagent announcements when the parent
* agent is busy. Supports debounce, cap, drop policy, and collect mode.
*
* Ported from OpenClaw (MIT license), adapted for Super Multica.
*/
// ============================================================================
// Types
// ============================================================================
export type AnnounceQueueMode =
/** Try steer, no queue fallback */
| "steer"
/** Try steer, fall back to queue */
| "steer-backlog"
/** Queue and send items individually */
| "followup"
/** Queue and batch all items into one combined prompt */
| "collect";
export type AnnounceDropPolicy =
/** Drop oldest items when cap reached */
| "old"
/** Drop newest items when cap reached */
| "new"
/** Summarize dropped items */
| "summarize";
export type AnnounceQueueItem = {
prompt: string;
summaryLine?: string;
enqueuedAt: number;
requesterSessionId: string;
};
export type AnnounceQueueSettings = {
mode: AnnounceQueueMode;
debounceMs?: number;
cap?: number;
dropPolicy?: AnnounceDropPolicy;
};
type AnnounceQueueState = {
items: AnnounceQueueItem[];
draining: boolean;
lastEnqueuedAt: number;
mode: AnnounceQueueMode;
debounceMs: number;
cap: number;
dropPolicy: AnnounceDropPolicy;
droppedCount: number;
summaryLines: string[];
send: (item: AnnounceQueueItem) => Promise<void>;
};
// ============================================================================
// Defaults
// ============================================================================
const DEFAULT_DEBOUNCE_MS = 1000;
const DEFAULT_CAP = 20;
const DEFAULT_DROP_POLICY: AnnounceDropPolicy = "summarize";
export const DEFAULT_ANNOUNCE_SETTINGS: AnnounceQueueSettings = {
mode: "steer-backlog",
debounceMs: DEFAULT_DEBOUNCE_MS,
cap: DEFAULT_CAP,
dropPolicy: DEFAULT_DROP_POLICY,
};
// ============================================================================
// Module state
// ============================================================================
const ANNOUNCE_QUEUES = new Map<string, AnnounceQueueState>();
// ============================================================================
// Public API
// ============================================================================
/**
* Enqueue an announcement for delivery. Returns true if enqueued,
* false if dropped (cap + "new" drop policy).
*/
export function enqueueAnnounce(params: {
key: string;
item: AnnounceQueueItem;
settings: AnnounceQueueSettings;
send: (item: AnnounceQueueItem) => Promise<void>;
}): boolean {
const queue = getOrCreateQueue(params.key, params.settings, params.send);
queue.lastEnqueuedAt = Date.now();
const shouldEnqueue = applyDropPolicy(queue, params.item);
if (!shouldEnqueue) {
if (queue.dropPolicy === "new") {
scheduleAnnounceDrain(params.key);
}
return false;
}
queue.items.push(params.item);
scheduleAnnounceDrain(params.key);
return true;
}
/** Reset all queues (for testing). */
export function resetAnnounceQueuesForTests(): void {
ANNOUNCE_QUEUES.clear();
}
/** Get the current queue depth for a key (for testing/diagnostics). */
export function getAnnounceQueueDepth(key: string): number {
return ANNOUNCE_QUEUES.get(key)?.items.length ?? 0;
}
// ============================================================================
// Queue management
// ============================================================================
function getOrCreateQueue(
key: string,
settings: AnnounceQueueSettings,
send: (item: AnnounceQueueItem) => Promise<void>,
): AnnounceQueueState {
const existing = ANNOUNCE_QUEUES.get(key);
if (existing) {
existing.mode = settings.mode;
if (typeof settings.debounceMs === "number") {
existing.debounceMs = Math.max(0, settings.debounceMs);
}
if (typeof settings.cap === "number" && settings.cap > 0) {
existing.cap = Math.floor(settings.cap);
}
if (settings.dropPolicy) {
existing.dropPolicy = settings.dropPolicy;
}
existing.send = send;
return existing;
}
const created: AnnounceQueueState = {
items: [],
draining: false,
lastEnqueuedAt: 0,
mode: settings.mode,
debounceMs:
typeof settings.debounceMs === "number"
? Math.max(0, settings.debounceMs)
: DEFAULT_DEBOUNCE_MS,
cap:
typeof settings.cap === "number" && settings.cap > 0
? Math.floor(settings.cap)
: DEFAULT_CAP,
dropPolicy: settings.dropPolicy ?? DEFAULT_DROP_POLICY,
droppedCount: 0,
summaryLines: [],
send,
};
ANNOUNCE_QUEUES.set(key, created);
return created;
}
// ============================================================================
// Drop policy
// ============================================================================
function applyDropPolicy(
queue: AnnounceQueueState,
item: AnnounceQueueItem,
): boolean {
if (queue.items.length < queue.cap) {
return true;
}
switch (queue.dropPolicy) {
case "new":
// Reject the incoming item
return false;
case "old": {
// Drop the oldest item to make room
const dropped = queue.items.shift();
if (dropped) {
queue.droppedCount++;
const summary = dropped.summaryLine?.trim() || dropped.prompt.slice(0, 80);
queue.summaryLines.push(summary);
}
return true;
}
case "summarize": {
// Drop the oldest item but keep a summary
const dropped = queue.items.shift();
if (dropped) {
queue.droppedCount++;
const summary = dropped.summaryLine?.trim() || dropped.prompt.slice(0, 80);
queue.summaryLines.push(summary);
}
return true;
}
default:
return true;
}
}
// ============================================================================
// Drain scheduling
// ============================================================================
function scheduleAnnounceDrain(key: string): void {
const queue = ANNOUNCE_QUEUES.get(key);
if (!queue || queue.draining) return;
queue.draining = true;
void (async () => {
try {
while (queue.items.length > 0 || queue.droppedCount > 0) {
await waitForDebounce(queue);
if (queue.mode === "collect") {
// Batch all items into one combined prompt
const items = queue.items.splice(0, queue.items.length);
const summary = buildDropSummary(queue);
const prompt = buildCollectPrompt(items, summary);
const last = items.at(-1);
if (!last) break;
await queue.send({ ...last, prompt });
continue;
}
// followup / steer-backlog: send items individually
const summary = buildDropSummary(queue);
if (summary) {
const next = queue.items.shift();
if (!next) break;
await queue.send({ ...next, prompt: summary });
continue;
}
const next = queue.items.shift();
if (!next) break;
await queue.send(next);
}
} catch (err) {
console.error(`[AnnounceQueue] Drain failed for ${key}: ${String(err)}`);
} finally {
queue.draining = false;
if (queue.items.length === 0 && queue.droppedCount === 0) {
ANNOUNCE_QUEUES.delete(key);
} else {
scheduleAnnounceDrain(key);
}
}
})();
}
// ============================================================================
// Helpers
// ============================================================================
function waitForDebounce(queue: AnnounceQueueState): Promise<void> {
const elapsed = Date.now() - queue.lastEnqueuedAt;
const remaining = Math.max(0, queue.debounceMs - elapsed);
if (remaining <= 0) return Promise.resolve();
return new Promise((resolve) => setTimeout(resolve, remaining));
}
function buildDropSummary(queue: AnnounceQueueState): string | undefined {
if (queue.droppedCount === 0) return undefined;
const parts: string[] = [
`[${queue.droppedCount} earlier announce(s) were summarized due to queue backlog]`,
];
if (queue.summaryLines.length > 0) {
parts.push("");
for (const line of queue.summaryLines) {
parts.push(`- ${line}`);
}
}
// Reset counters
queue.droppedCount = 0;
queue.summaryLines = [];
return parts.join("\n");
}
function buildCollectPrompt(
items: AnnounceQueueItem[],
dropSummary: string | undefined,
): string {
const parts: string[] = [
`[${items.length} queued announce(s) while agent was busy]`,
"",
];
for (let i = 0; i < items.length; i++) {
parts.push(`---`);
parts.push(`Queued #${i + 1}`);
parts.push(items[i]!.prompt);
parts.push("");
}
if (dropSummary) {
parts.push(dropSummary);
parts.push("");
}
return parts.join("\n");
}

View file

@ -16,6 +16,7 @@ import type {
SubagentRunRecord,
SubagentSystemPromptParams,
} from "./types.js";
import { enqueueAnnounce, DEFAULT_ANNOUNCE_SETTINGS } from "./announce-queue.js";
/**
* Build the system prompt injected into a subagent session.
@ -275,7 +276,15 @@ export function formatCoalescedAnnouncementMessage(
/**
* Run the coalesced announcement flow for all completed runs of a requester.
* Formats a single combined message and delivers it to the parent agent.
* Uses two-tier delivery:
* 1. Queue if parent is busy (running or has pending writes), queue for
* later drain via writeInternal (with debounce to batch nearby completions)
* 2. Direct if parent is idle, send immediately via writeInternal
*
* All delivery uses writeInternal() which marks entries as `internal: true`,
* preventing announcement messages from showing as user bubbles in the UI.
* We avoid steer() (cancels unrelated tool calls) and followUp() (doesn't
* mark entries as internal, polluting the chat UI).
*/
export function runCoalescedAnnounceFlow(
requesterSessionId: string,
@ -293,7 +302,28 @@ export function runCoalescedAnnounceFlow(
return false;
}
parentAgent.writeInternal(message, { forwardAssistant: true, persistResponse: true });
// Tier 1: BUSY — parent is running or has pending writes
// Queue the announcement for delivery via writeInternal() after the parent
// finishes its current work. We do NOT use steer() (cancels unrelated tool
// calls) or followUp() (doesn't mark entries as internal, polluting the UI).
if (parentAgent.isRunning || parentAgent.getPendingWrites() > 0) {
enqueueAnnounce({
key: requesterSessionId,
item: {
prompt: message,
summaryLine: `${records.length} subagent(s) completed`,
enqueuedAt: Date.now(),
requesterSessionId,
},
settings: DEFAULT_ANNOUNCE_SETTINGS,
send: async (item) => sendAnnounceDirect(requesterSessionId, item.prompt),
});
console.log(`[SubagentAnnounce] Queued announcement for parent: ${requesterSessionId}`);
return true;
}
// Tier 2: IDLE — parent is idle, send directly via writeInternal
sendAnnounceDirect(requesterSessionId, message);
return true;
} catch (err) {
console.error(`[SubagentAnnounce] Failed to coalesced-announce to parent:`, err);
@ -301,6 +331,26 @@ export function runCoalescedAnnounceFlow(
}
}
/**
* Send announcement directly to parent via writeInternal.
* Used as Tier 3 (idle) and as the queue drain callback.
*/
function sendAnnounceDirect(requesterSessionId: string, message: string): void {
try {
const hub = getHub();
const parentAgent = hub.getAgent(requesterSessionId);
if (!parentAgent || parentAgent.closed) {
console.warn(
`[SubagentAnnounce] Parent agent not found or closed for direct send: ${requesterSessionId}`,
);
return;
}
parentAgent.writeInternal(message, { forwardAssistant: false, persistResponse: true });
} catch (err) {
console.error(`[SubagentAnnounce] Failed direct announce to parent:`, err);
}
}
/**
* Run the full subagent announcement flow:
* 1. Read child's last assistant reply
@ -341,7 +391,7 @@ export function runSubagentAnnounceFlow(params: SubagentAnnounceParams): boolean
return false;
}
parentAgent.writeInternal(message, { forwardAssistant: true, persistResponse: true });
parentAgent.writeInternal(message, { forwardAssistant: false, persistResponse: true });
return true;
} catch (err) {
console.error(`[SubagentAnnounce] Failed to announce to parent:`, err);