fix(hub): suppress heartbeat ack messages in streamed chat

This commit is contained in:
Jiang Bohan 2026-02-06 17:25:22 +08:00
parent 9663079f48
commit 9f2a1c240f
3 changed files with 163 additions and 5 deletions

View file

@ -0,0 +1,66 @@
import { describe, expect, it } from "vitest";
import {
extractAssistantEventText,
isHeartbeatAckEvent,
} from "./heartbeat-filter.js";
describe("heartbeat-filter", () => {
it("extracts text from string content", () => {
const event = {
message: {
content: " HEARTBEAT_OK ",
},
};
expect(extractAssistantEventText(event)).toBe("HEARTBEAT_OK");
});
it("extracts text from content blocks", () => {
const event = {
message: {
content: [
{ type: "text", text: "line 1" },
{ type: "thinking", thinking: "hidden" },
{ type: "text", text: "line 2" },
],
},
};
expect(extractAssistantEventText(event)).toBe("line 1 line 2");
});
it("treats pure heartbeat token as ack", () => {
const event = {
message: {
content: [{ type: "text", text: "HEARTBEAT_OK" }],
},
};
expect(isHeartbeatAckEvent(event)).toBe(true);
});
it("treats marked-up heartbeat token as ack", () => {
const event = {
message: {
content: [{ type: "text", text: "**HEARTBEAT_OK**" }],
},
};
expect(isHeartbeatAckEvent(event)).toBe(true);
});
it("does not suppress real alert text", () => {
const event = {
message: {
content: [{ type: "text", text: "Reminder: go downstairs now." }],
},
};
expect(isHeartbeatAckEvent(event)).toBe(false);
});
it("does not suppress token plus extra content", () => {
const event = {
message: {
content: [{ type: "text", text: "HEARTBEAT_OK Reminder: check inbox." }],
},
};
expect(isHeartbeatAckEvent(event)).toBe(false);
});
});

View file

@ -0,0 +1,43 @@
import { stripHeartbeatToken } from "../heartbeat/index.js";
function collapseWhitespace(value: string): string {
return value.replace(/\s+/g, " ").trim();
}
/**
* Extract assistant text from an agent stream event.
* Supports both string and rich content-array message shapes.
*/
export function extractAssistantEventText(event: unknown): string {
if (!event || typeof event !== "object") return "";
const message = (event as { message?: unknown }).message;
if (!message || typeof message !== "object") return "";
const content = (message as { content?: unknown }).content;
if (typeof content === "string") {
return collapseWhitespace(content);
}
if (!Array.isArray(content)) return "";
const parts: string[] = [];
for (const block of content) {
if (!block || typeof block !== "object") continue;
const text = (block as { text?: unknown }).text;
if (typeof text === "string" && text.trim()) {
parts.push(text);
}
}
return collapseWhitespace(parts.join("\n"));
}
/**
* True only for pure heartbeat ACK payloads (e.g. "HEARTBEAT_OK").
* Messages that include any extra text are not suppressed.
*/
export function isHeartbeatAckEvent(event: unknown): boolean {
const text = extractAssistantEventText(event);
if (!text) return false;
const stripped = stripHeartbeatToken(text, { mode: "message" });
return stripped.shouldSkip && stripped.didStrip;
}

View file

@ -46,12 +46,14 @@ import {
type HeartbeatRunner,
} from "../heartbeat/index.js";
import { enqueueSystemEvent } from "../heartbeat/system-events.js";
import { isHeartbeatAckEvent } from "./heartbeat-filter.js";
export class Hub {
private readonly agents = new Map<string, AsyncAgent>();
private readonly agentSenders = new Map<string, string>();
private readonly agentStreamIds = new Map<string, string>();
private readonly agentStreamCounters = new Map<string, number>();
private readonly pendingAssistantStarts = new Map<string, { agentId: string; event: unknown }>();
private readonly localApprovalHandlers = new Map<string, (payload: ExecApprovalRequest) => void>();
private readonly rpc: RpcDispatcher;
private readonly approvalManager: ExecApprovalManager;
@ -362,6 +364,14 @@ export class Hub {
this.agentStreamIds.delete(agentId);
}
private clearPendingAssistantStarts(agentId: string): void {
for (const [streamId, pending] of this.pendingAssistantStarts) {
if (pending.agentId === agentId) {
this.pendingAssistantStarts.delete(streamId);
}
}
}
/** Internally read agent output and send via Gateway */
private async consumeAgent(agent: AsyncAgent): Promise<void> {
for await (const item of agent.read()) {
@ -397,18 +407,55 @@ export class Hub {
|| item.type === "tool_execution_end";
if (!shouldForward) continue;
if (item.type === "message_start") {
this.beginStream(agent.sessionId, item);
const isAssistantMessageEvent =
item.type === "message_start" || item.type === "message_update" || item.type === "message_end";
// Delay assistant message_start forwarding until we see content.
// This lets us suppress pure HEARTBEAT_OK acknowledgements end-to-end.
if (isAssistantMessageEvent && isAssistantMessage) {
if (item.type === "message_start") {
const streamId = this.beginStream(agent.sessionId, item);
this.pendingAssistantStarts.set(streamId, { agentId: agent.sessionId, event: item });
continue;
}
const streamId = this.getActiveStreamId(agent.sessionId, item);
const isHeartbeatAck = isHeartbeatAckEvent(item);
if (isHeartbeatAck) {
if (item.type === "message_end") {
this.pendingAssistantStarts.delete(streamId);
this.endStream(agent.sessionId);
}
continue;
}
const pendingStart = this.pendingAssistantStarts.get(streamId);
if (pendingStart) {
this.client.send(targetDeviceId, StreamAction, {
streamId,
agentId: agent.sessionId,
event: pendingStart.event,
});
this.pendingAssistantStarts.delete(streamId);
}
this.client.send(targetDeviceId, StreamAction, {
streamId,
agentId: agent.sessionId,
event: item,
});
if (item.type === "message_end") {
this.endStream(agent.sessionId);
}
continue;
}
const streamId = this.getActiveStreamId(agent.sessionId, item);
this.client.send(targetDeviceId, StreamAction, {
streamId,
agentId: agent.sessionId,
event: item,
});
if (item.type === "message_end") {
this.endStream(agent.sessionId);
}
}
}
}
@ -609,6 +656,7 @@ export class Hub {
this.agentSenders.delete(id);
this.agentStreamIds.delete(id);
this.agentStreamCounters.delete(id);
this.clearPendingAssistantStarts(id);
this.localApprovalHandlers.delete(id);
removeAgentRecord(id);
this.heartbeatRunner?.updateConfig();
@ -633,6 +681,7 @@ export class Hub {
this.agentSenders.delete(id);
this.agentStreamIds.delete(id);
this.agentStreamCounters.delete(id);
this.clearPendingAssistantStarts(id);
this.localApprovalHandlers.delete(id);
}
this.client.disconnect();