refactor(channels): bind route keys to isolated conversations

This commit is contained in:
Jiayuan Zhang 2026-02-17 03:12:47 +08:00
parent f0f6055031
commit dee70ea659
2 changed files with 668 additions and 332 deletions

View file

@ -1,31 +1,73 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { Hub } from "../hub/hub.js";
import type { AsyncAgent } from "../agent/async-agent.js";
import type { ChannelPlugin } from "./types.js";
import type { ChannelPlugin, ChannelMessage } from "./types.js";
import { ChannelManager } from "./manager.js";
type AgentEventCallback = (event: unknown) => void;
function createHarness() {
type AgentHarness = {
agent: AsyncAgent;
write: ReturnType<typeof vi.fn>;
emit: (event: unknown) => void;
};
function createAgentHarness(sessionId: string): AgentHarness {
let subscriber: AgentEventCallback | null = null;
const write = vi.fn();
const agent = {
sessionId: "agent-1",
sessionId,
closed: false,
subscribe: (callback: AgentEventCallback) => {
subscriber = callback;
return () => {
subscriber = null;
};
},
write,
} as unknown as AsyncAgent;
return {
agent,
write,
emit: (event: unknown) => {
subscriber?.(event);
},
};
}
function createHarness() {
const conversations = new Map<string, AgentHarness>();
let conversationCounter = 0;
const createConversation = vi.fn(() => {
conversationCounter += 1;
const id = `conv-${conversationCounter}`;
const harness = createAgentHarness(id);
conversations.set(id, harness);
return harness.agent;
});
const mainConversation = createAgentHarness("main-conv");
const createAgent = vi.fn(() => {
conversations.set(mainConversation.agent.sessionId, mainConversation);
return mainConversation.agent;
});
const hub = {
listAgents: () => ["agent-1"],
getAgent: () => agent,
listAgents: vi.fn(() => ["agent-1"]),
createAgent,
createConversation,
getConversation: vi.fn((conversationId: string) => conversations.get(conversationId)?.agent),
getConversationAgentId: vi.fn(() => "agent-1"),
broadcastInbound: vi.fn(),
} as unknown as Hub;
const replyText = vi.fn(async () => {});
const sendText = vi.fn(async () => {});
const addReaction = vi.fn(async () => {});
const plugin: ChannelPlugin = {
id: "telegram",
meta: {
@ -43,39 +85,75 @@ function createHarness() {
outbound: {
replyText,
sendText,
addReaction,
},
};
const manager = new ChannelManager(hub);
(manager as unknown as { lastRoute: unknown }).lastRoute = {
plugin,
deliveryCtx: {
channel: "telegram",
accountId: "default",
conversationId: "chat-1",
replyToMessageId: "in-1",
},
const routeIncoming = (message: ChannelMessage) => {
(manager as unknown as {
routeIncoming: (plugin: ChannelPlugin, accountId: string, message: ChannelMessage) => void;
}).routeIncoming(plugin, "default", message);
};
(manager as unknown as { ensureSubscribed: () => void }).ensureSubscribed();
const emit = (event: unknown) => subscriber?.(event);
const getConversationIdByExternal = (externalConversationId: string): string | undefined => {
const bindings = (manager as unknown as {
routeBindings: Map<string, { hubConversationId: string }>;
}).routeBindings;
return { manager, replyText, sendText, emit };
for (const [routeKey, binding] of bindings.entries()) {
if (routeKey.endsWith(`:${externalConversationId}`)) {
return binding.hubConversationId;
}
}
return undefined;
};
return {
manager,
hub,
replyText,
sendText,
addReaction,
routeIncoming,
getConversationIdByExternal,
conversations,
};
}
describe("channel manager heartbeat filtering", () => {
describe("channel manager route isolation", () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
vi.restoreAllMocks();
});
it("suppresses pure HEARTBEAT_OK in channel outbound", async () => {
const { manager, replyText, sendText, emit } = createHarness();
const { manager, routeIncoming, getConversationIdByExternal, conversations, replyText, sendText } = createHarness();
emit({
routeIncoming({
messageId: "in-1",
conversationId: "chat-1",
senderId: "user-1",
text: "hi",
chatType: "direct",
});
const hubConversationId = getConversationIdByExternal("chat-1");
expect(hubConversationId).toBeDefined();
const harness = conversations.get(hubConversationId!);
expect(harness).toBeDefined();
harness!.emit({
type: "message_start",
message: { role: "assistant", content: [] },
});
emit({
harness!.emit({
type: "message_end",
message: { role: "assistant", content: [{ type: "text", text: "HEARTBEAT_OK" }] },
});
@ -89,13 +167,27 @@ describe("channel manager heartbeat filtering", () => {
});
it("keeps forwarding normal assistant replies", async () => {
const { manager, replyText, sendText, emit } = createHarness();
const { manager, routeIncoming, getConversationIdByExternal, conversations, replyText, sendText } = createHarness();
emit({
routeIncoming({
messageId: "in-1",
conversationId: "chat-1",
senderId: "user-1",
text: "hi",
chatType: "direct",
});
const hubConversationId = getConversationIdByExternal("chat-1");
expect(hubConversationId).toBeDefined();
const harness = conversations.get(hubConversationId!);
expect(harness).toBeDefined();
harness!.emit({
type: "message_start",
message: { role: "assistant", content: [] },
});
emit({
harness!.emit({
type: "message_end",
message: { role: "assistant", content: [{ type: "text", text: "Reminder: check inbox." }] },
});
@ -116,4 +208,66 @@ describe("channel manager heartbeat filtering", () => {
manager.stopAll();
});
it("binds different external conversations to isolated hub conversations", async () => {
const {
manager,
hub,
routeIncoming,
getConversationIdByExternal,
conversations,
} = createHarness();
routeIncoming({
messageId: "in-a1",
conversationId: "chat-a",
senderId: "user-a",
text: "alpha",
chatType: "group",
});
routeIncoming({
messageId: "in-b1",
conversationId: "chat-b",
senderId: "user-b",
text: "beta",
chatType: "group",
});
await vi.advanceTimersByTimeAsync(600);
const convA = getConversationIdByExternal("chat-a");
const convB = getConversationIdByExternal("chat-b");
expect(convA).toBeDefined();
expect(convB).toBeDefined();
expect(convA).not.toBe(convB);
const harnessA = conversations.get(convA!);
const harnessB = conversations.get(convB!);
expect(harnessA?.write).toHaveBeenCalledTimes(1);
expect(harnessA?.write.mock.calls[0]?.[0]).toContain("alpha");
expect(harnessB?.write).toHaveBeenCalledTimes(1);
expect(harnessB?.write.mock.calls[0]?.[0]).toContain("beta");
// Same external route should reuse existing hub conversation binding.
routeIncoming({
messageId: "in-a2",
conversationId: "chat-a",
senderId: "user-a",
text: "alpha-2",
chatType: "group",
});
await vi.advanceTimersByTimeAsync(600);
expect(getConversationIdByExternal("chat-a")).toBe(convA);
expect((hub as unknown as { createConversation: ReturnType<typeof vi.fn> }).createConversation).toHaveBeenCalledTimes(2);
expect(harnessA?.write).toHaveBeenCalledTimes(2);
expect(harnessA?.write.mock.calls[1]?.[0]).toContain("alpha-2");
manager.stopAll();
});
});

File diff suppressed because it is too large Load diff