diff --git a/src/agent/async-agent.test.ts b/src/agent/async-agent.test.ts index cd7f3c32..b4dd4276 100644 --- a/src/agent/async-agent.test.ts +++ b/src/agent/async-agent.test.ts @@ -143,4 +143,46 @@ describe("AsyncAgent internal flow", () => { agent.close(); }); + + it("forwards only assistant message_end events when writeInternal opts in", async () => { + let resolveRunInternal: ((value: { text: string; thinking: undefined; error: undefined }) => void) | undefined; + runInternalMock.mockImplementationOnce( + () => new Promise((resolve) => { + resolveRunInternal = resolve as typeof resolveRunInternal; + }), + ); + + const agent = new AsyncAgent(); + const iter = agent.read()[Symbol.asyncIterator](); + const streamCallback = subscribeCallbacks[0]; + expect(streamCallback).toBeDefined(); + + agent.writeInternal("announce", { forwardAssistant: true }); + await Promise.resolve(); + + internalRunState.value = true; + streamCallback!({ + type: "message_end", + message: { role: "user", content: [{ type: "text", text: "hidden internal prompt" }] }, + }); + streamCallback!({ + type: "message_end", + message: { role: "assistant", content: [{ type: "text", text: "visible summary" }] }, + }); + + const first = await nextWithTimeout(iter); + expect(first).not.toBe("timeout"); + if (first !== "timeout") { + expect((first as { type: string }).type).toBe("message_end"); + expect((first as { message: { role: string } }).message.role).toBe("assistant"); + } + + const second = await nextWithTimeout(iter); + expect(second).toBe("timeout"); + + resolveRunInternal!({ text: "", thinking: undefined, error: undefined }); + await agent.waitForIdle(); + internalRunState.value = false; + agent.close(); + }); }); diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts index 514a57e7..fc7fb15b 100644 --- a/src/agent/async-agent.ts +++ b/src/agent/async-agent.ts @@ -10,12 +10,18 @@ const devNull = { write: () => true } as unknown as NodeJS.WritableStream; /** Discriminated union of legacy Message, raw AgentEvent, and MulticaEvent */ export type ChannelItem = Message | AgentEvent | MulticaEvent; +export interface WriteInternalOptions { + /** Forward assistant message_end events to realtime stream during internal runs */ + forwardAssistant?: boolean | undefined; +} + export class AsyncAgent { private readonly agent: Agent; private readonly channel = new Channel(); private _closed = false; private queue: Promise = Promise.resolve(); private closeCallbacks: Array<() => void> = []; + private forwardInternalAssistant = false; readonly sessionId: string; constructor(options?: AgentOptions) { @@ -29,7 +35,7 @@ export class AsyncAgent { // Suppress forwarding during internal runs to avoid leaking // orchestration messages to the frontend/real-time stream. this.agent.subscribeAll((event: AgentEvent | MulticaEvent) => { - if (this.agent.isInternalRun) return; + if (!this.shouldForwardEvent(event)) return; this.channel.send(event); }); } @@ -63,19 +69,26 @@ export class AsyncAgent { /** * Write an internal message to agent (non-blocking, serialized queue). * Messages are persisted with `internal: true` and rolled back from - * in-memory state. Events are suppressed from the real-time stream. + * in-memory state. Events are suppressed from the real-time stream by default. */ - writeInternal(content: string): void { + writeInternal(content: string, options?: WriteInternalOptions): void { if (this._closed) throw new Error("Agent is closed"); + const forwardAssistant = options?.forwardAssistant === true; this.queue = this.queue .then(async () => { if (this._closed) return; - const result = await this.agent.runInternal(content); - await this.agent.flushSession(); - if (result.error) { - // Internal run errors are for diagnostics only; do not leak to user stream. - console.error(`[AsyncAgent] Internal run error: ${result.error}`); + const prevForward = this.forwardInternalAssistant; + this.forwardInternalAssistant = forwardAssistant; + try { + const result = await this.agent.runInternal(content); + await this.agent.flushSession(); + if (result.error) { + // Internal run errors are for diagnostics only; do not leak to user stream. + console.error(`[AsyncAgent] Internal run error: ${result.error}`); + } + } finally { + this.forwardInternalAssistant = prevForward; } }) .catch((err) => { @@ -98,7 +111,7 @@ export class AsyncAgent { subscribe(callback: (event: AgentEvent | MulticaEvent) => void): () => void { console.log(`[AsyncAgent] Adding subscriber for agent: ${this.sessionId}`); const unsubscribe = this.agent.subscribeAll((event) => { - if (this.agent.isInternalRun) return; + if (!this.shouldForwardEvent(event)) return; console.log(`[AsyncAgent] Event received: ${event.type}`); callback(event); }); @@ -113,6 +126,16 @@ export class AsyncAgent { return this.queue; } + private shouldForwardEvent(event: AgentEvent | MulticaEvent): boolean { + if (!this.agent.isInternalRun) return true; + if (!this.forwardInternalAssistant) return false; + if (event.type !== "message_end") return false; + + const maybeMessage = (event as { message?: unknown }).message; + if (!maybeMessage || typeof maybeMessage !== "object") return false; + return (maybeMessage as { role?: unknown }).role === "assistant"; + } + /** Register a callback to be invoked when the agent is closed */ onClose(callback: () => void): void { if (this._closed) { diff --git a/src/agent/subagent/announce.ts b/src/agent/subagent/announce.ts index fff82b7e..8809efdb 100644 --- a/src/agent/subagent/announce.ts +++ b/src/agent/subagent/announce.ts @@ -260,7 +260,7 @@ export function runCoalescedAnnounceFlow( return false; } - parentAgent.writeInternal(message); + parentAgent.writeInternal(message, { forwardAssistant: true }); return true; } catch (err) { console.error(`[SubagentAnnounce] Failed to coalesced-announce to parent:`, err); @@ -308,7 +308,7 @@ export function runSubagentAnnounceFlow(params: SubagentAnnounceParams): boolean return false; } - parentAgent.writeInternal(message); + parentAgent.writeInternal(message, { forwardAssistant: true }); return true; } catch (err) { console.error(`[SubagentAnnounce] Failed to announce to parent:`, err);