fix(agent): prevent internal run leaks in async streams

This commit is contained in:
yushen 2026-02-06 18:38:54 +08:00
parent d392238be3
commit f7267f6698
2 changed files with 151 additions and 2 deletions

View file

@ -0,0 +1,146 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { AsyncAgent } from "./async-agent.js";
const subscribeCallbacks: Array<(event: any) => void> = [];
const internalRunState = { value: false };
const runMock = vi.fn(async () => ({ text: "", thinking: undefined, error: undefined }));
const runInternalMock = vi.fn(async () => ({ text: "", thinking: undefined, error: undefined }));
const flushSessionMock = vi.fn(async () => {});
const subscribeAllMock = vi.fn((fn: (event: any) => void) => {
subscribeCallbacks.push(fn);
return () => {};
});
vi.mock("./runner.js", () => ({
Agent: class MockAgent {
sessionId = "test-session";
subscribeAll = subscribeAllMock;
run = runMock;
runInternal = runInternalMock;
flushSession = flushSessionMock;
get isInternalRun() {
return internalRunState.value;
}
getMessages() {
return [];
}
loadSessionMessages() {
return [];
}
async ensureInitialized() {}
getActiveTools() {
return [];
}
reloadTools() {
return [];
}
getSkillsWithStatus() {
return [];
}
getEligibleSkills() {
return [];
}
reloadSkills() {}
setToolStatus() {
return undefined;
}
getProfileId() {
return undefined;
}
getAgentName() {
return undefined;
}
setAgentName() {}
getUserContent() {
return undefined;
}
setUserContent() {}
getAgentStyle() {
return undefined;
}
setAgentStyle() {}
reloadSystemPrompt() {}
getProviderInfo() {
return { provider: "test", model: "test-model" };
}
setProvider() {
return { provider: "test", model: "test-model" };
}
},
}));
async function nextWithTimeout<T>(iter: AsyncIterator<T>, timeoutMs = 40): Promise<"timeout" | T> {
return await Promise.race([
iter.next().then((result) => (result.done ? "timeout" : result.value)),
new Promise<"timeout">((resolve) => setTimeout(() => resolve("timeout"), timeoutMs)),
]);
}
describe("AsyncAgent internal flow", () => {
afterEach(() => {
subscribeCallbacks.length = 0;
internalRunState.value = false;
runMock.mockReset();
runInternalMock.mockReset();
flushSessionMock.mockReset();
subscribeAllMock.mockClear();
runMock.mockResolvedValue({ text: "", thinking: undefined, error: undefined });
runInternalMock.mockResolvedValue({ text: "", thinking: undefined, error: undefined });
flushSessionMock.mockResolvedValue(undefined);
});
it("filters internal events in direct subscribe stream", () => {
const agent = new AsyncAgent();
const events: Array<{ type: string }> = [];
const unsubscribe = agent.subscribe((event) => {
events.push(event as { type: string });
});
// subscribeAll is called twice:
// 1) constructor for read() channel forwarding
// 2) subscribe() for direct callback forwarding
const subscribeCallback = subscribeCallbacks[1];
expect(subscribeCallback).toBeDefined();
internalRunState.value = true;
subscribeCallback!({ type: "message_end" });
expect(events).toHaveLength(0);
internalRunState.value = false;
subscribeCallback!({ type: "message_end" });
expect(events).toHaveLength(1);
unsubscribe();
agent.close();
});
it("does not leak internal run errors to read() stream", async () => {
runInternalMock.mockResolvedValueOnce({ text: "", thinking: undefined, error: "internal failed" });
const agent = new AsyncAgent();
const iter = agent.read()[Symbol.asyncIterator]();
agent.writeInternal("test internal");
await agent.waitForIdle();
const value = await nextWithTimeout(iter);
expect(value).toBe("timeout");
agent.close();
});
it("does not leak internal run exceptions to read() stream", async () => {
runInternalMock.mockRejectedValueOnce(new Error("internal exception"));
const agent = new AsyncAgent();
const iter = agent.read()[Symbol.asyncIterator]();
agent.writeInternal("test internal");
await agent.waitForIdle();
const value = await nextWithTimeout(iter);
expect(value).toBe("timeout");
agent.close();
});
});

View file

@ -74,12 +74,14 @@ export class AsyncAgent {
const result = await this.agent.runInternal(content);
await this.agent.flushSession();
if (result.error) {
this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` });
// Internal run errors are for diagnostics only; do not leak to user stream.
console.error(`[AsyncAgent] Internal run error: ${result.error}`);
}
})
.catch((err) => {
const message = err instanceof Error ? err.message : String(err);
this.channel.send({ id: uuidv7(), content: `[error] ${message}` });
// Internal run exceptions are for diagnostics only; do not leak to user stream.
console.error(`[AsyncAgent] Internal run failed: ${message}`);
});
}
@ -96,6 +98,7 @@ export class AsyncAgent {
subscribe(callback: (event: AgentEvent | MulticaEvent) => void): () => void {
console.log(`[AsyncAgent] Adding subscriber for agent: ${this.sessionId}`);
const unsubscribe = this.agent.subscribeAll((event) => {
if (this.agent.isInternalRun) return;
console.log(`[AsyncAgent] Event received: ${event.type}`);
callback(event);
});