refactor(channels): persist route bindings across restarts

This commit is contained in:
Jiayuan Zhang 2026-02-17 03:33:58 +08:00
parent c4d9f0273a
commit 3123506657
2 changed files with 203 additions and 28 deletions

View file

@ -1,4 +1,7 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { mkdtempSync, rmSync } from "node:fs";
import { join } from "node:path";
import { tmpdir } from "node:os";
import type { Hub } from "../hub/hub.js";
import type { AsyncAgent } from "../agent/async-agent.js";
import type { ChannelPlugin, ChannelMessage } from "./types.js";
@ -89,16 +92,17 @@ function createHarness() {
},
};
const manager = new ChannelManager(hub);
const routeIncoming = (message: ChannelMessage) => {
(manager as unknown as {
const routeIncomingToManager = (target: ChannelManager, message: ChannelMessage) => {
(target as unknown as {
routeIncoming: (plugin: ChannelPlugin, accountId: string, message: ChannelMessage) => void;
}).routeIncoming(plugin, "default", message);
};
const getConversationIdByExternal = (externalConversationId: string): string | undefined => {
const bindings = (manager as unknown as {
const getConversationIdByExternal = (
target: ChannelManager,
externalConversationId: string,
): string | undefined => {
const bindings = (target as unknown as {
routeBindings: Map<string, { hubConversationId: string }>;
}).routeBindings;
@ -111,31 +115,38 @@ function createHarness() {
};
return {
manager,
hub,
replyText,
sendText,
addReaction,
routeIncoming,
plugin,
createManager: (routeBindingsPath: string) => new ChannelManager(hub, { routeBindingsPath }),
routeIncomingToManager,
getConversationIdByExternal,
conversations,
};
}
describe("channel manager route isolation", () => {
let testDir: string;
beforeEach(() => {
vi.useFakeTimers();
testDir = mkdtempSync(join(tmpdir(), "channel-manager-"));
});
afterEach(() => {
vi.useRealTimers();
vi.restoreAllMocks();
rmSync(testDir, { recursive: true, force: true });
});
it("suppresses pure HEARTBEAT_OK in channel outbound", async () => {
const { manager, routeIncoming, getConversationIdByExternal, conversations, replyText, sendText } = createHarness();
const routeBindingsPath = join(testDir, "route-bindings.json");
const { createManager, routeIncomingToManager, getConversationIdByExternal, conversations, replyText, sendText } = createHarness();
const manager = createManager(routeBindingsPath);
routeIncoming({
routeIncomingToManager(manager, {
messageId: "in-1",
conversationId: "chat-1",
senderId: "user-1",
@ -143,7 +154,7 @@ describe("channel manager route isolation", () => {
chatType: "direct",
});
const hubConversationId = getConversationIdByExternal("chat-1");
const hubConversationId = getConversationIdByExternal(manager, "chat-1");
expect(hubConversationId).toBeDefined();
const harness = conversations.get(hubConversationId!);
@ -167,9 +178,11 @@ describe("channel manager route isolation", () => {
});
it("keeps forwarding normal assistant replies", async () => {
const { manager, routeIncoming, getConversationIdByExternal, conversations, replyText, sendText } = createHarness();
const routeBindingsPath = join(testDir, "route-bindings.json");
const { createManager, routeIncomingToManager, getConversationIdByExternal, conversations, replyText, sendText } = createHarness();
const manager = createManager(routeBindingsPath);
routeIncoming({
routeIncomingToManager(manager, {
messageId: "in-1",
conversationId: "chat-1",
senderId: "user-1",
@ -177,7 +190,7 @@ describe("channel manager route isolation", () => {
chatType: "direct",
});
const hubConversationId = getConversationIdByExternal("chat-1");
const hubConversationId = getConversationIdByExternal(manager, "chat-1");
expect(hubConversationId).toBeDefined();
const harness = conversations.get(hubConversationId!);
@ -211,14 +224,15 @@ describe("channel manager route isolation", () => {
it("binds different external conversations to isolated hub conversations", async () => {
const {
manager,
createManager,
hub,
routeIncoming,
routeIncomingToManager,
getConversationIdByExternal,
conversations,
} = createHarness();
const manager = createManager(join(testDir, "route-bindings.json"));
routeIncoming({
routeIncomingToManager(manager, {
messageId: "in-a1",
conversationId: "chat-a",
senderId: "user-a",
@ -226,7 +240,7 @@ describe("channel manager route isolation", () => {
chatType: "group",
});
routeIncoming({
routeIncomingToManager(manager, {
messageId: "in-b1",
conversationId: "chat-b",
senderId: "user-b",
@ -236,8 +250,8 @@ describe("channel manager route isolation", () => {
await vi.advanceTimersByTimeAsync(600);
const convA = getConversationIdByExternal("chat-a");
const convB = getConversationIdByExternal("chat-b");
const convA = getConversationIdByExternal(manager, "chat-a");
const convB = getConversationIdByExternal(manager, "chat-b");
expect(convA).toBeDefined();
expect(convB).toBeDefined();
@ -253,7 +267,7 @@ describe("channel manager route isolation", () => {
expect(harnessB?.write.mock.calls[0]?.[0]).toContain("beta");
// Same external route should reuse existing hub conversation binding.
routeIncoming({
routeIncomingToManager(manager, {
messageId: "in-a2",
conversationId: "chat-a",
senderId: "user-a",
@ -263,11 +277,57 @@ describe("channel manager route isolation", () => {
await vi.advanceTimersByTimeAsync(600);
expect(getConversationIdByExternal("chat-a")).toBe(convA);
expect(getConversationIdByExternal(manager, "chat-a")).toBe(convA);
expect((hub as unknown as { createConversation: ReturnType<typeof vi.fn> }).createConversation).toHaveBeenCalledTimes(2);
expect(harnessA?.write).toHaveBeenCalledTimes(2);
expect(harnessA?.write.mock.calls[1]?.[0]).toContain("alpha-2");
manager.stopAll();
});
it("restores route bindings from disk after manager restart", async () => {
const routeBindingsPath = join(testDir, "route-bindings.json");
const {
hub,
createManager,
routeIncomingToManager,
getConversationIdByExternal,
conversations,
} = createHarness();
const managerA = createManager(routeBindingsPath);
routeIncomingToManager(managerA, {
messageId: "in-p1",
conversationId: "chat-persist",
senderId: "user-p",
text: "persist-1",
chatType: "direct",
});
await vi.advanceTimersByTimeAsync(600);
const firstConversationId = getConversationIdByExternal(managerA, "chat-persist");
expect(firstConversationId).toBeDefined();
const harness = conversations.get(firstConversationId!);
expect(harness?.write).toHaveBeenCalledTimes(1);
managerA.stopAll();
const managerB = createManager(routeBindingsPath);
routeIncomingToManager(managerB, {
messageId: "in-p2",
conversationId: "chat-persist",
senderId: "user-p",
text: "persist-2",
chatType: "direct",
});
await vi.advanceTimersByTimeAsync(600);
const restoredConversationId = getConversationIdByExternal(managerB, "chat-persist");
expect(restoredConversationId).toBe(firstConversationId);
expect((hub as unknown as { createConversation: ReturnType<typeof vi.fn> }).createConversation).toHaveBeenCalledTimes(1);
expect(harness?.write).toHaveBeenCalledTimes(2);
expect(harness?.write.mock.calls[1]?.[0]).toContain("persist-2");
managerB.stopAll();
});
});

View file

@ -33,6 +33,27 @@ import { describeImage } from "../media/describe-image.js";
import { describeVideo } from "../media/describe-video.js";
import { InboundDebouncer } from "./inbound-debouncer.js";
import { extname } from "node:path";
import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs";
import { dirname, join } from "node:path";
import { DATA_DIR } from "@multica/utils";
const ROUTE_BINDING_STORE_VERSION = 1;
interface RouteBindingStoreEntry {
routeKey: string;
hubConversationId: string;
hubAgentId: string;
updatedAt: number;
}
interface RouteBindingStoreFile {
version: number;
bindings: RouteBindingStoreEntry[];
}
interface ChannelManagerOptions {
routeBindingsPath?: string;
}
interface AccountHandle {
channelId: string;
@ -54,7 +75,8 @@ interface RouteBinding {
routeKey: string;
hubConversationId: string;
hubAgentId: string;
lastRoute: LastRoute;
updatedAt: number;
lastRoute: LastRoute | null;
}
interface PendingRoute {
@ -79,6 +101,7 @@ interface ResolveRouteResult {
export class ChannelManager {
private readonly hub: Hub;
private readonly routeBindingsPath: string;
/** Running accounts keyed by "channelId:accountId" */
private readonly accounts = new Map<string, AccountHandle>();
@ -98,13 +121,18 @@ export class ChannelManager {
/** Inbound debouncer keyed by routeKey */
private debouncer: InboundDebouncer | null = null;
constructor(hub: Hub) {
constructor(hub: Hub, options?: ChannelManagerOptions) {
this.hub = hub;
this.routeBindingsPath = options?.routeBindingsPath ?? join(DATA_DIR, "channels", "route-bindings.json");
this.loadRouteBindings();
}
/** Start all configured channel accounts */
async startAll(): Promise<void> {
console.log("[Channels] Starting all channels...");
if (this.routeBindings.size === 0) {
this.loadRouteBindings();
}
const config = loadChannelsConfig();
const plugins = listChannels();
@ -314,13 +342,33 @@ export class ChannelManager {
existing.hubConversationId,
existing.hubAgentId,
);
existing.updatedAt = Date.now();
this.routeBindings.set(routeKey, existing);
return { binding: existing, conversation: existingConversation };
}
// Conversation runtime disappeared — remove stale binding and rebuild.
this.routeBindings.delete(routeKey);
// Conversation runtime disappeared — re-create a conversation under the same agent when possible.
this.cleanupConversationState(existing.hubConversationId, { unsubscribe: true });
const recreated = this.hub.createConversation(undefined, { agentId: existing.hubAgentId });
const recreatedConversationId = recreated.sessionId;
const recreatedAgentId = this.hub.getConversationAgentId(recreatedConversationId) ?? existing.hubAgentId;
existing.hubConversationId = recreatedConversationId;
existing.hubAgentId = recreatedAgentId;
existing.updatedAt = Date.now();
existing.lastRoute = this.createRoute(
routeKey,
plugin,
accountId,
externalConversationId,
messageId,
chatType,
recreatedConversationId,
recreatedAgentId,
);
this.routeBindings.set(routeKey, existing);
this.persistRouteBindings();
return { binding: existing, conversation: recreated };
}
const { agentId, conversation: maybeMainConversation } = this.resolveDefaultAgentAndConversation();
@ -332,6 +380,7 @@ export class ChannelManager {
routeKey,
hubConversationId,
hubAgentId,
updatedAt: Date.now(),
lastRoute: this.createRoute(
routeKey,
plugin,
@ -344,6 +393,7 @@ export class ChannelManager {
),
};
this.routeBindings.set(routeKey, binding);
this.persistRouteBindings();
console.log(
`[Channels] route bind: ${routeKey} -> conversation=${hubConversationId} (agent=${hubAgentId})`,
@ -365,7 +415,7 @@ export class ChannelManager {
private findRouteForConversation(conversationId: string): LastRoute | null {
for (const binding of this.routeBindings.values()) {
if (binding.hubConversationId === conversationId) {
if (binding.hubConversationId === conversationId && binding.lastRoute) {
return this.cloneRoute(binding.lastRoute);
}
}
@ -554,6 +604,10 @@ export class ChannelManager {
}
const { binding, conversation } = resolved;
if (!binding.lastRoute) {
console.error(`[Channels] Route binding missing runtime route data for ${binding.routeKey}`);
return;
}
this.ensureConversationSubscribed(conversation);
const routeSnapshot = this.cloneRoute(binding.lastRoute);
@ -677,12 +731,17 @@ export class ChannelManager {
`[Channels] Debouncer flush dropped: conversation unavailable ${binding.hubConversationId}`,
);
this.routeBindings.delete(routeKey);
this.persistRouteBindings();
this.cleanupConversationState(binding.hubConversationId, { unsubscribe: true });
return;
}
this.ensureConversationSubscribed(conversation);
if (!binding.lastRoute) {
console.warn(`[Channels] Debouncer flush dropped: missing lastRoute for routeKey=${routeKey}`);
return;
}
const state = this.getConversationState(binding.hubConversationId);
const route = this.cloneRoute(binding.lastRoute);
const acks = [...state.ackBuffer];
@ -783,11 +842,15 @@ export class ChannelManager {
const removedConversationIds = new Set<string>();
for (const [routeKey, binding] of this.routeBindings.entries()) {
const route = binding.lastRoute;
if (route.plugin.id === channelId && route.deliveryCtx.accountId === accountId) {
const matchesByRoute = route
? route.plugin.id === channelId && route.deliveryCtx.accountId === accountId
: routeKey.startsWith(`${channelId}:${accountId}:`);
if (matchesByRoute) {
this.routeBindings.delete(routeKey);
removedConversationIds.add(binding.hubConversationId);
}
}
this.persistRouteBindings();
for (const conversationId of removedConversationIds) {
const stillBound = Array.from(this.routeBindings.values())
@ -876,4 +939,56 @@ export class ChannelManager {
}
return infos;
}
private loadRouteBindings(): void {
if (!existsSync(this.routeBindingsPath)) return;
try {
const raw = JSON.parse(readFileSync(this.routeBindingsPath, "utf-8")) as RouteBindingStoreFile;
const bindings = Array.isArray(raw.bindings) ? raw.bindings : [];
for (const item of bindings) {
if (!item || typeof item !== "object") continue;
if (typeof item.routeKey !== "string" || !item.routeKey.trim()) continue;
if (typeof item.hubConversationId !== "string" || !item.hubConversationId.trim()) continue;
if (typeof item.hubAgentId !== "string" || !item.hubAgentId.trim()) continue;
const routeKey = item.routeKey.trim();
this.routeBindings.set(routeKey, {
routeKey,
hubConversationId: item.hubConversationId.trim(),
hubAgentId: item.hubAgentId.trim(),
updatedAt: typeof item.updatedAt === "number" ? item.updatedAt : Date.now(),
lastRoute: null,
});
}
if (this.routeBindings.size > 0) {
console.log(`[Channels] Restored ${this.routeBindings.size} route binding(s) from disk`);
}
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
console.warn(`[Channels] Failed to load route bindings: ${message}`);
}
}
private persistRouteBindings(): void {
const serialized: RouteBindingStoreFile = {
version: ROUTE_BINDING_STORE_VERSION,
bindings: Array.from(this.routeBindings.values())
.map((binding) => ({
routeKey: binding.routeKey,
hubConversationId: binding.hubConversationId,
hubAgentId: binding.hubAgentId,
updatedAt: binding.updatedAt,
}))
.sort((a, b) => a.routeKey.localeCompare(b.routeKey)),
};
try {
const dir = dirname(this.routeBindingsPath);
mkdirSync(dir, { recursive: true });
writeFileSync(this.routeBindingsPath, JSON.stringify(serialized, null, 2), "utf-8");
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
console.warn(`[Channels] Failed to persist route bindings: ${message}`);
}
}
}