fix(agent): filter heartbeat ACK messages from desktop event stream
The heartbeat runner uses agent.write() (normal write), so heartbeat ACK responses like "HEARTBEAT_OK" were not suppressed by the internal run filter and leaked into the desktop UI chat. The Gateway path was already fixed (Hub has delayed-start + isHeartbeatAckEvent filtering), but the local Desktop path through AsyncAgent had no such filtering. Add createFilteredHandler() to AsyncAgent that buffers message_start for assistant messages and checks subsequent events with isHeartbeatAckEvent(). Pure heartbeat ACKs are suppressed end-to-end; all other messages are forwarded normally. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
16cd5d0aaf
commit
8d6a803739
1 changed files with 73 additions and 9 deletions
|
|
@ -6,6 +6,7 @@ import type { AgentOptions, Message } from "./types.js";
|
|||
import type { MulticaEvent } from "./events.js";
|
||||
import { injectMessageTimestamp } from "./message-timestamp.js";
|
||||
import { isSilentReplyText } from "./tokens.js";
|
||||
import { isHeartbeatAckEvent } from "../hub/heartbeat-filter.js";
|
||||
|
||||
const devNull = { write: () => true } as unknown as NodeJS.WritableStream;
|
||||
|
||||
|
|
@ -45,10 +46,10 @@ export class AsyncAgent {
|
|||
// Forward raw AgentEvent and MulticaEvent into the channel.
|
||||
// Suppress forwarding during internal runs to avoid leaking
|
||||
// orchestration messages to the frontend/real-time stream.
|
||||
this.agent.subscribeAll((event: AgentEvent | MulticaEvent) => {
|
||||
if (!this.shouldForwardEvent(event)) return;
|
||||
this.channel.send(event);
|
||||
});
|
||||
// Also suppresses pure heartbeat ACK messages (e.g. "HEARTBEAT_OK").
|
||||
this.agent.subscribeAll(
|
||||
this.createFilteredHandler((event) => this.channel.send(event)),
|
||||
);
|
||||
}
|
||||
|
||||
get closed(): boolean {
|
||||
|
|
@ -160,11 +161,12 @@ export class AsyncAgent {
|
|||
*/
|
||||
subscribe(callback: (event: AgentEvent | MulticaEvent) => void): () => void {
|
||||
console.log(`[AsyncAgent] Adding subscriber for agent: ${this.sessionId}`);
|
||||
const unsubscribe = this.agent.subscribeAll((event) => {
|
||||
if (!this.shouldForwardEvent(event)) return;
|
||||
console.log(`[AsyncAgent] Event received: ${event.type}`);
|
||||
callback(event);
|
||||
});
|
||||
const unsubscribe = this.agent.subscribeAll(
|
||||
this.createFilteredHandler((event) => {
|
||||
console.log(`[AsyncAgent] Event received: ${event.type}`);
|
||||
callback(event);
|
||||
}),
|
||||
);
|
||||
return () => {
|
||||
console.log(`[AsyncAgent] Removing subscriber for agent: ${this.sessionId}`);
|
||||
unsubscribe();
|
||||
|
|
@ -225,6 +227,68 @@ export class AsyncAgent {
|
|||
return (maybeMessage as { role?: unknown }).role === "assistant";
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrap a forwarding callback with shouldForwardEvent + heartbeat ACK suppression.
|
||||
*
|
||||
* Mirrors Hub's pattern: buffer `message_start` for assistant messages, then
|
||||
* check subsequent events with `isHeartbeatAckEvent()`. If the message is a
|
||||
* pure heartbeat ACK (e.g. "HEARTBEAT_OK"), suppress the entire sequence.
|
||||
* Otherwise flush the buffered start and forward normally.
|
||||
*/
|
||||
private createFilteredHandler(
|
||||
forward: (event: AgentEvent | MulticaEvent) => void,
|
||||
): (event: AgentEvent | MulticaEvent) => void {
|
||||
let pendingStart: (AgentEvent | MulticaEvent) | null = null;
|
||||
|
||||
return (event: AgentEvent | MulticaEvent) => {
|
||||
if (!this.shouldForwardEvent(event)) return;
|
||||
|
||||
const isAssistantMsg = this.isAssistantMessageEvent(event);
|
||||
|
||||
if (!isAssistantMsg) {
|
||||
// Non-assistant event: flush any pending start, then forward
|
||||
if (pendingStart) {
|
||||
forward(pendingStart);
|
||||
pendingStart = null;
|
||||
}
|
||||
forward(event);
|
||||
return;
|
||||
}
|
||||
|
||||
// Assistant message event — apply heartbeat ACK suppression
|
||||
if (event.type === "message_start") {
|
||||
pendingStart = event;
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if this is a heartbeat ACK on content/end events
|
||||
if (isHeartbeatAckEvent(event)) {
|
||||
if (event.type === "message_end") {
|
||||
// Entire message was a heartbeat ACK — suppress it
|
||||
pendingStart = null;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Not a heartbeat ACK — flush buffered start if present, then forward
|
||||
if (pendingStart) {
|
||||
forward(pendingStart);
|
||||
pendingStart = null;
|
||||
}
|
||||
forward(event);
|
||||
};
|
||||
}
|
||||
|
||||
/** Check if an event is an assistant message event (message_start/update/end with role=assistant) */
|
||||
private isAssistantMessageEvent(event: AgentEvent | MulticaEvent): boolean {
|
||||
if (event.type !== "message_start" && event.type !== "message_update" && event.type !== "message_end") {
|
||||
return false;
|
||||
}
|
||||
const maybeMessage = (event as { message?: unknown }).message;
|
||||
if (!maybeMessage || typeof maybeMessage !== "object") return false;
|
||||
return (maybeMessage as { role?: unknown }).role === "assistant";
|
||||
}
|
||||
|
||||
/** Register a callback to be invoked when the agent is closed */
|
||||
onClose(callback: () => void): void {
|
||||
if (this._closed) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue