diff --git a/src/hub/heartbeat-filter.test.ts b/src/hub/heartbeat-filter.test.ts new file mode 100644 index 00000000..a273a2af --- /dev/null +++ b/src/hub/heartbeat-filter.test.ts @@ -0,0 +1,66 @@ +import { describe, expect, it } from "vitest"; +import { + extractAssistantEventText, + isHeartbeatAckEvent, +} from "./heartbeat-filter.js"; + +describe("heartbeat-filter", () => { + it("extracts text from string content", () => { + const event = { + message: { + content: " HEARTBEAT_OK ", + }, + }; + expect(extractAssistantEventText(event)).toBe("HEARTBEAT_OK"); + }); + + it("extracts text from content blocks", () => { + const event = { + message: { + content: [ + { type: "text", text: "line 1" }, + { type: "thinking", thinking: "hidden" }, + { type: "text", text: "line 2" }, + ], + }, + }; + expect(extractAssistantEventText(event)).toBe("line 1 line 2"); + }); + + it("treats pure heartbeat token as ack", () => { + const event = { + message: { + content: [{ type: "text", text: "HEARTBEAT_OK" }], + }, + }; + expect(isHeartbeatAckEvent(event)).toBe(true); + }); + + it("treats marked-up heartbeat token as ack", () => { + const event = { + message: { + content: [{ type: "text", text: "**HEARTBEAT_OK**" }], + }, + }; + expect(isHeartbeatAckEvent(event)).toBe(true); + }); + + it("does not suppress real alert text", () => { + const event = { + message: { + content: [{ type: "text", text: "Reminder: go downstairs now." }], + }, + }; + expect(isHeartbeatAckEvent(event)).toBe(false); + }); + + it("does not suppress token plus extra content", () => { + const event = { + message: { + content: [{ type: "text", text: "HEARTBEAT_OK Reminder: check inbox." }], + }, + }; + expect(isHeartbeatAckEvent(event)).toBe(false); + }); +}); + diff --git a/src/hub/heartbeat-filter.ts b/src/hub/heartbeat-filter.ts new file mode 100644 index 00000000..9d16c781 --- /dev/null +++ b/src/hub/heartbeat-filter.ts @@ -0,0 +1,43 @@ +import { stripHeartbeatToken } from "../heartbeat/index.js"; + +function collapseWhitespace(value: string): string { + return value.replace(/\s+/g, " ").trim(); +} + +/** + * Extract assistant text from an agent stream event. + * Supports both string and rich content-array message shapes. + */ +export function extractAssistantEventText(event: unknown): string { + if (!event || typeof event !== "object") return ""; + const message = (event as { message?: unknown }).message; + if (!message || typeof message !== "object") return ""; + const content = (message as { content?: unknown }).content; + + if (typeof content === "string") { + return collapseWhitespace(content); + } + + if (!Array.isArray(content)) return ""; + const parts: string[] = []; + for (const block of content) { + if (!block || typeof block !== "object") continue; + const text = (block as { text?: unknown }).text; + if (typeof text === "string" && text.trim()) { + parts.push(text); + } + } + return collapseWhitespace(parts.join("\n")); +} + +/** + * True only for pure heartbeat ACK payloads (e.g. "HEARTBEAT_OK"). + * Messages that include any extra text are not suppressed. + */ +export function isHeartbeatAckEvent(event: unknown): boolean { + const text = extractAssistantEventText(event); + if (!text) return false; + const stripped = stripHeartbeatToken(text, { mode: "message" }); + return stripped.shouldSkip && stripped.didStrip; +} + diff --git a/src/hub/hub.ts b/src/hub/hub.ts index 786476c8..74501ba7 100644 --- a/src/hub/hub.ts +++ b/src/hub/hub.ts @@ -46,12 +46,14 @@ import { type HeartbeatRunner, } from "../heartbeat/index.js"; import { enqueueSystemEvent } from "../heartbeat/system-events.js"; +import { isHeartbeatAckEvent } from "./heartbeat-filter.js"; export class Hub { private readonly agents = new Map(); private readonly agentSenders = new Map(); private readonly agentStreamIds = new Map(); private readonly agentStreamCounters = new Map(); + private readonly pendingAssistantStarts = new Map(); private readonly localApprovalHandlers = new Map void>(); private readonly rpc: RpcDispatcher; private readonly approvalManager: ExecApprovalManager; @@ -362,6 +364,14 @@ export class Hub { this.agentStreamIds.delete(agentId); } + private clearPendingAssistantStarts(agentId: string): void { + for (const [streamId, pending] of this.pendingAssistantStarts) { + if (pending.agentId === agentId) { + this.pendingAssistantStarts.delete(streamId); + } + } + } + /** Internally read agent output and send via Gateway */ private async consumeAgent(agent: AsyncAgent): Promise { for await (const item of agent.read()) { @@ -397,18 +407,55 @@ export class Hub { || item.type === "tool_execution_end"; if (!shouldForward) continue; - if (item.type === "message_start") { - this.beginStream(agent.sessionId, item); + const isAssistantMessageEvent = + item.type === "message_start" || item.type === "message_update" || item.type === "message_end"; + + // Delay assistant message_start forwarding until we see content. + // This lets us suppress pure HEARTBEAT_OK acknowledgements end-to-end. + if (isAssistantMessageEvent && isAssistantMessage) { + if (item.type === "message_start") { + const streamId = this.beginStream(agent.sessionId, item); + this.pendingAssistantStarts.set(streamId, { agentId: agent.sessionId, event: item }); + continue; + } + + const streamId = this.getActiveStreamId(agent.sessionId, item); + const isHeartbeatAck = isHeartbeatAckEvent(item); + if (isHeartbeatAck) { + if (item.type === "message_end") { + this.pendingAssistantStarts.delete(streamId); + this.endStream(agent.sessionId); + } + continue; + } + + const pendingStart = this.pendingAssistantStarts.get(streamId); + if (pendingStart) { + this.client.send(targetDeviceId, StreamAction, { + streamId, + agentId: agent.sessionId, + event: pendingStart.event, + }); + this.pendingAssistantStarts.delete(streamId); + } + + this.client.send(targetDeviceId, StreamAction, { + streamId, + agentId: agent.sessionId, + event: item, + }); + if (item.type === "message_end") { + this.endStream(agent.sessionId); + } + continue; } + const streamId = this.getActiveStreamId(agent.sessionId, item); this.client.send(targetDeviceId, StreamAction, { streamId, agentId: agent.sessionId, event: item, }); - if (item.type === "message_end") { - this.endStream(agent.sessionId); - } } } } @@ -609,6 +656,7 @@ export class Hub { this.agentSenders.delete(id); this.agentStreamIds.delete(id); this.agentStreamCounters.delete(id); + this.clearPendingAssistantStarts(id); this.localApprovalHandlers.delete(id); removeAgentRecord(id); this.heartbeatRunner?.updateConfig(); @@ -633,6 +681,7 @@ export class Hub { this.agentSenders.delete(id); this.agentStreamIds.delete(id); this.agentStreamCounters.delete(id); + this.clearPendingAssistantStarts(id); this.localApprovalHandlers.delete(id); } this.client.disconnect();