From 96f83c0bc6dd01a4e956ac8a4d7b7b88d76c6134 Mon Sep 17 00:00:00 2001 From: Jiang Bohan Date: Mon, 9 Feb 2026 16:53:41 +0800 Subject: [PATCH] fix(channels): suppress heartbeat ack in outbound replies --- src/channels/manager.test.ts | 119 +++++++++++++++++++++++++++++++++++ src/channels/manager.ts | 11 ++++ 2 files changed, 130 insertions(+) create mode 100644 src/channels/manager.test.ts diff --git a/src/channels/manager.test.ts b/src/channels/manager.test.ts new file mode 100644 index 00000000..463eb903 --- /dev/null +++ b/src/channels/manager.test.ts @@ -0,0 +1,119 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import type { Hub } from "../hub/hub.js"; +import type { AsyncAgent } from "../agent/async-agent.js"; +import type { ChannelPlugin } from "./types.js"; +import { ChannelManager } from "./manager.js"; + +type AgentEventCallback = (event: unknown) => void; + +function createHarness() { + let subscriber: AgentEventCallback | null = null; + + const agent = { + sessionId: "agent-1", + subscribe: (callback: AgentEventCallback) => { + subscriber = callback; + return () => { + subscriber = null; + }; + }, + } as unknown as AsyncAgent; + + const hub = { + listAgents: () => ["agent-1"], + getAgent: () => agent, + } as unknown as Hub; + + const replyText = vi.fn(async () => {}); + const sendText = vi.fn(async () => {}); + const plugin: ChannelPlugin = { + id: "telegram", + meta: { + name: "Telegram", + description: "test", + }, + config: { + listAccountIds: () => [], + resolveAccount: () => undefined, + isConfigured: () => false, + }, + gateway: { + start: async () => {}, + }, + outbound: { + replyText, + sendText, + }, + }; + + const manager = new ChannelManager(hub); + (manager as unknown as { lastRoute: unknown }).lastRoute = { + plugin, + deliveryCtx: { + channel: "telegram", + accountId: "default", + conversationId: "chat-1", + replyToMessageId: "in-1", + }, + }; + (manager as unknown as { ensureSubscribed: () => void }).ensureSubscribed(); + + const emit = (event: unknown) => subscriber?.(event); + + return { manager, replyText, sendText, emit }; +} + +describe("channel manager heartbeat filtering", () => { + afterEach(() => { + vi.restoreAllMocks(); + }); + + it("suppresses pure HEARTBEAT_OK in channel outbound", async () => { + const { manager, replyText, sendText, emit } = createHarness(); + + emit({ + type: "message_start", + message: { role: "assistant", content: [] }, + }); + emit({ + type: "message_end", + message: { role: "assistant", content: [{ type: "text", text: "HEARTBEAT_OK" }] }, + }); + + await Promise.resolve(); + + expect(replyText).not.toHaveBeenCalled(); + expect(sendText).not.toHaveBeenCalled(); + + manager.stopAll(); + }); + + it("keeps forwarding normal assistant replies", async () => { + const { manager, replyText, sendText, emit } = createHarness(); + + emit({ + type: "message_start", + message: { role: "assistant", content: [] }, + }); + emit({ + type: "message_end", + message: { role: "assistant", content: [{ type: "text", text: "Reminder: check inbox." }] }, + }); + + await Promise.resolve(); + + expect(replyText).toHaveBeenCalledTimes(1); + expect(replyText).toHaveBeenCalledWith( + { + channel: "telegram", + accountId: "default", + conversationId: "chat-1", + replyToMessageId: "in-1", + }, + "Reminder: check inbox.", + ); + expect(sendText).not.toHaveBeenCalled(); + + manager.stopAll(); + }); +}); diff --git a/src/channels/manager.ts b/src/channels/manager.ts index b0042cc1..7b0cea7c 100644 --- a/src/channels/manager.ts +++ b/src/channels/manager.ts @@ -22,6 +22,7 @@ import type { import { listChannels } from "./registry.js"; import { loadChannelsConfig } from "./config.js"; import { MessageAggregator, DEFAULT_CHUNKER_CONFIG } from "../hub/message-aggregator.js"; +import { isHeartbeatAckEvent } from "../hub/heartbeat-filter.js"; import type { AsyncAgent } from "../agent/async-agent.js"; import { transcribeAudio } from "../media/transcribe.js"; import { describeImage } from "../media/describe-image.js"; @@ -216,6 +217,16 @@ export class ChannelManager { return; } + // Keep heartbeat acknowledgements internal (same behavior as desktop/gateway stream path). + if (isHeartbeatAckEvent(event)) { + if (event.type === "message_end") { + this.stopTyping(); + this.removeAckReaction(); + this.aggregator = null; + } + return; + } + // Ensure aggregator exists for this response if (event.type === "message_start") { this.createAggregator();