From f7267f6698f66114c3e45e6bae4dfc94dc9f97c9 Mon Sep 17 00:00:00 2001 From: yushen Date: Fri, 6 Feb 2026 18:38:54 +0800 Subject: [PATCH] fix(agent): prevent internal run leaks in async streams --- src/agent/async-agent.test.ts | 146 ++++++++++++++++++++++++++++++++++ src/agent/async-agent.ts | 7 +- 2 files changed, 151 insertions(+), 2 deletions(-) create mode 100644 src/agent/async-agent.test.ts diff --git a/src/agent/async-agent.test.ts b/src/agent/async-agent.test.ts new file mode 100644 index 00000000..cd7f3c32 --- /dev/null +++ b/src/agent/async-agent.test.ts @@ -0,0 +1,146 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { AsyncAgent } from "./async-agent.js"; + +const subscribeCallbacks: Array<(event: any) => void> = []; +const internalRunState = { value: false }; + +const runMock = vi.fn(async () => ({ text: "", thinking: undefined, error: undefined })); +const runInternalMock = vi.fn(async () => ({ text: "", thinking: undefined, error: undefined })); +const flushSessionMock = vi.fn(async () => {}); +const subscribeAllMock = vi.fn((fn: (event: any) => void) => { + subscribeCallbacks.push(fn); + return () => {}; +}); + +vi.mock("./runner.js", () => ({ + Agent: class MockAgent { + sessionId = "test-session"; + subscribeAll = subscribeAllMock; + run = runMock; + runInternal = runInternalMock; + flushSession = flushSessionMock; + get isInternalRun() { + return internalRunState.value; + } + getMessages() { + return []; + } + loadSessionMessages() { + return []; + } + async ensureInitialized() {} + getActiveTools() { + return []; + } + reloadTools() { + return []; + } + getSkillsWithStatus() { + return []; + } + getEligibleSkills() { + return []; + } + reloadSkills() {} + setToolStatus() { + return undefined; + } + getProfileId() { + return undefined; + } + getAgentName() { + return undefined; + } + setAgentName() {} + getUserContent() { + return undefined; + } + setUserContent() {} + getAgentStyle() { + return undefined; + } + setAgentStyle() {} + reloadSystemPrompt() {} + getProviderInfo() { + return { provider: "test", model: "test-model" }; + } + setProvider() { + return { provider: "test", model: "test-model" }; + } + }, +})); + +async function nextWithTimeout(iter: AsyncIterator, timeoutMs = 40): Promise<"timeout" | T> { + return await Promise.race([ + iter.next().then((result) => (result.done ? "timeout" : result.value)), + new Promise<"timeout">((resolve) => setTimeout(() => resolve("timeout"), timeoutMs)), + ]); +} + +describe("AsyncAgent internal flow", () => { + afterEach(() => { + subscribeCallbacks.length = 0; + internalRunState.value = false; + runMock.mockReset(); + runInternalMock.mockReset(); + flushSessionMock.mockReset(); + subscribeAllMock.mockClear(); + runMock.mockResolvedValue({ text: "", thinking: undefined, error: undefined }); + runInternalMock.mockResolvedValue({ text: "", thinking: undefined, error: undefined }); + flushSessionMock.mockResolvedValue(undefined); + }); + + it("filters internal events in direct subscribe stream", () => { + const agent = new AsyncAgent(); + const events: Array<{ type: string }> = []; + + const unsubscribe = agent.subscribe((event) => { + events.push(event as { type: string }); + }); + + // subscribeAll is called twice: + // 1) constructor for read() channel forwarding + // 2) subscribe() for direct callback forwarding + const subscribeCallback = subscribeCallbacks[1]; + expect(subscribeCallback).toBeDefined(); + + internalRunState.value = true; + subscribeCallback!({ type: "message_end" }); + expect(events).toHaveLength(0); + + internalRunState.value = false; + subscribeCallback!({ type: "message_end" }); + expect(events).toHaveLength(1); + + unsubscribe(); + agent.close(); + }); + + it("does not leak internal run errors to read() stream", async () => { + runInternalMock.mockResolvedValueOnce({ text: "", thinking: undefined, error: "internal failed" }); + const agent = new AsyncAgent(); + const iter = agent.read()[Symbol.asyncIterator](); + + agent.writeInternal("test internal"); + await agent.waitForIdle(); + + const value = await nextWithTimeout(iter); + expect(value).toBe("timeout"); + + agent.close(); + }); + + it("does not leak internal run exceptions to read() stream", async () => { + runInternalMock.mockRejectedValueOnce(new Error("internal exception")); + const agent = new AsyncAgent(); + const iter = agent.read()[Symbol.asyncIterator](); + + agent.writeInternal("test internal"); + await agent.waitForIdle(); + + const value = await nextWithTimeout(iter); + expect(value).toBe("timeout"); + + agent.close(); + }); +}); diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts index e7aa7c2e..514a57e7 100644 --- a/src/agent/async-agent.ts +++ b/src/agent/async-agent.ts @@ -74,12 +74,14 @@ export class AsyncAgent { const result = await this.agent.runInternal(content); await this.agent.flushSession(); if (result.error) { - this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` }); + // Internal run errors are for diagnostics only; do not leak to user stream. + console.error(`[AsyncAgent] Internal run error: ${result.error}`); } }) .catch((err) => { const message = err instanceof Error ? err.message : String(err); - this.channel.send({ id: uuidv7(), content: `[error] ${message}` }); + // Internal run exceptions are for diagnostics only; do not leak to user stream. + console.error(`[AsyncAgent] Internal run failed: ${message}`); }); } @@ -96,6 +98,7 @@ export class AsyncAgent { subscribe(callback: (event: AgentEvent | MulticaEvent) => void): () => void { console.log(`[AsyncAgent] Adding subscriber for agent: ${this.sessionId}`); const unsubscribe = this.agent.subscribeAll((event) => { + if (this.agent.isInternalRun) return; console.log(`[AsyncAgent] Event received: ${event.type}`); callback(event); });