diff --git a/packages/core/src/agent/async-agent.ts b/packages/core/src/agent/async-agent.ts index 41c33778..ce11f17a 100644 --- a/packages/core/src/agent/async-agent.ts +++ b/packages/core/src/agent/async-agent.ts @@ -6,6 +6,7 @@ import type { AgentOptions, Message } from "./types.js"; import type { MulticaEvent } from "./events.js"; import { injectMessageTimestamp } from "./message-timestamp.js"; import { isSilentReplyText } from "./tokens.js"; +import { isHeartbeatAckEvent } from "../hub/heartbeat-filter.js"; const devNull = { write: () => true } as unknown as NodeJS.WritableStream; @@ -45,10 +46,10 @@ export class AsyncAgent { // Forward raw AgentEvent and MulticaEvent into the channel. // Suppress forwarding during internal runs to avoid leaking // orchestration messages to the frontend/real-time stream. - this.agent.subscribeAll((event: AgentEvent | MulticaEvent) => { - if (!this.shouldForwardEvent(event)) return; - this.channel.send(event); - }); + // Also suppresses pure heartbeat ACK messages (e.g. "HEARTBEAT_OK"). + this.agent.subscribeAll( + this.createFilteredHandler((event) => this.channel.send(event)), + ); } get closed(): boolean { @@ -160,11 +161,12 @@ export class AsyncAgent { */ subscribe(callback: (event: AgentEvent | MulticaEvent) => void): () => void { console.log(`[AsyncAgent] Adding subscriber for agent: ${this.sessionId}`); - const unsubscribe = this.agent.subscribeAll((event) => { - if (!this.shouldForwardEvent(event)) return; - console.log(`[AsyncAgent] Event received: ${event.type}`); - callback(event); - }); + const unsubscribe = this.agent.subscribeAll( + this.createFilteredHandler((event) => { + console.log(`[AsyncAgent] Event received: ${event.type}`); + callback(event); + }), + ); return () => { console.log(`[AsyncAgent] Removing subscriber for agent: ${this.sessionId}`); unsubscribe(); @@ -225,6 +227,68 @@ export class AsyncAgent { return (maybeMessage as { role?: unknown }).role === "assistant"; } + /** + * Wrap a forwarding callback with shouldForwardEvent + heartbeat ACK suppression. + * + * Mirrors Hub's pattern: buffer `message_start` for assistant messages, then + * check subsequent events with `isHeartbeatAckEvent()`. If the message is a + * pure heartbeat ACK (e.g. "HEARTBEAT_OK"), suppress the entire sequence. + * Otherwise flush the buffered start and forward normally. + */ + private createFilteredHandler( + forward: (event: AgentEvent | MulticaEvent) => void, + ): (event: AgentEvent | MulticaEvent) => void { + let pendingStart: (AgentEvent | MulticaEvent) | null = null; + + return (event: AgentEvent | MulticaEvent) => { + if (!this.shouldForwardEvent(event)) return; + + const isAssistantMsg = this.isAssistantMessageEvent(event); + + if (!isAssistantMsg) { + // Non-assistant event: flush any pending start, then forward + if (pendingStart) { + forward(pendingStart); + pendingStart = null; + } + forward(event); + return; + } + + // Assistant message event — apply heartbeat ACK suppression + if (event.type === "message_start") { + pendingStart = event; + return; + } + + // Check if this is a heartbeat ACK on content/end events + if (isHeartbeatAckEvent(event)) { + if (event.type === "message_end") { + // Entire message was a heartbeat ACK — suppress it + pendingStart = null; + } + return; + } + + // Not a heartbeat ACK — flush buffered start if present, then forward + if (pendingStart) { + forward(pendingStart); + pendingStart = null; + } + forward(event); + }; + } + + /** Check if an event is an assistant message event (message_start/update/end with role=assistant) */ + private isAssistantMessageEvent(event: AgentEvent | MulticaEvent): boolean { + if (event.type !== "message_start" && event.type !== "message_update" && event.type !== "message_end") { + return false; + } + const maybeMessage = (event as { message?: unknown }).message; + if (!maybeMessage || typeof maybeMessage !== "object") return false; + return (maybeMessage as { role?: unknown }).role === "assistant"; + } + /** Register a callback to be invoked when the agent is closed */ onClose(callback: () => void): void { if (this._closed) {