From 3123506657a225b43c1bbcb984e0a2f3ab6f20d2 Mon Sep 17 00:00:00 2001 From: Jiayuan Zhang Date: Tue, 17 Feb 2026 03:33:58 +0800 Subject: [PATCH] refactor(channels): persist route bindings across restarts --- packages/core/src/channels/manager.test.ts | 104 +++++++++++++---- packages/core/src/channels/manager.ts | 127 ++++++++++++++++++++- 2 files changed, 203 insertions(+), 28 deletions(-) diff --git a/packages/core/src/channels/manager.test.ts b/packages/core/src/channels/manager.test.ts index 652f6005..10b258e6 100644 --- a/packages/core/src/channels/manager.test.ts +++ b/packages/core/src/channels/manager.test.ts @@ -1,4 +1,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { mkdtempSync, rmSync } from "node:fs"; +import { join } from "node:path"; +import { tmpdir } from "node:os"; import type { Hub } from "../hub/hub.js"; import type { AsyncAgent } from "../agent/async-agent.js"; import type { ChannelPlugin, ChannelMessage } from "./types.js"; @@ -89,16 +92,17 @@ function createHarness() { }, }; - const manager = new ChannelManager(hub); - - const routeIncoming = (message: ChannelMessage) => { - (manager as unknown as { + const routeIncomingToManager = (target: ChannelManager, message: ChannelMessage) => { + (target as unknown as { routeIncoming: (plugin: ChannelPlugin, accountId: string, message: ChannelMessage) => void; }).routeIncoming(plugin, "default", message); }; - const getConversationIdByExternal = (externalConversationId: string): string | undefined => { - const bindings = (manager as unknown as { + const getConversationIdByExternal = ( + target: ChannelManager, + externalConversationId: string, + ): string | undefined => { + const bindings = (target as unknown as { routeBindings: Map; }).routeBindings; @@ -111,31 +115,38 @@ function createHarness() { }; return { - manager, hub, replyText, sendText, addReaction, - routeIncoming, + plugin, + createManager: (routeBindingsPath: string) => new ChannelManager(hub, { routeBindingsPath }), + routeIncomingToManager, getConversationIdByExternal, conversations, }; } describe("channel manager route isolation", () => { + let testDir: string; + beforeEach(() => { vi.useFakeTimers(); + testDir = mkdtempSync(join(tmpdir(), "channel-manager-")); }); afterEach(() => { vi.useRealTimers(); vi.restoreAllMocks(); + rmSync(testDir, { recursive: true, force: true }); }); it("suppresses pure HEARTBEAT_OK in channel outbound", async () => { - const { manager, routeIncoming, getConversationIdByExternal, conversations, replyText, sendText } = createHarness(); + const routeBindingsPath = join(testDir, "route-bindings.json"); + const { createManager, routeIncomingToManager, getConversationIdByExternal, conversations, replyText, sendText } = createHarness(); + const manager = createManager(routeBindingsPath); - routeIncoming({ + routeIncomingToManager(manager, { messageId: "in-1", conversationId: "chat-1", senderId: "user-1", @@ -143,7 +154,7 @@ describe("channel manager route isolation", () => { chatType: "direct", }); - const hubConversationId = getConversationIdByExternal("chat-1"); + const hubConversationId = getConversationIdByExternal(manager, "chat-1"); expect(hubConversationId).toBeDefined(); const harness = conversations.get(hubConversationId!); @@ -167,9 +178,11 @@ describe("channel manager route isolation", () => { }); it("keeps forwarding normal assistant replies", async () => { - const { manager, routeIncoming, getConversationIdByExternal, conversations, replyText, sendText } = createHarness(); + const routeBindingsPath = join(testDir, "route-bindings.json"); + const { createManager, routeIncomingToManager, getConversationIdByExternal, conversations, replyText, sendText } = createHarness(); + const manager = createManager(routeBindingsPath); - routeIncoming({ + routeIncomingToManager(manager, { messageId: "in-1", conversationId: "chat-1", senderId: "user-1", @@ -177,7 +190,7 @@ describe("channel manager route isolation", () => { chatType: "direct", }); - const hubConversationId = getConversationIdByExternal("chat-1"); + const hubConversationId = getConversationIdByExternal(manager, "chat-1"); expect(hubConversationId).toBeDefined(); const harness = conversations.get(hubConversationId!); @@ -211,14 +224,15 @@ describe("channel manager route isolation", () => { it("binds different external conversations to isolated hub conversations", async () => { const { - manager, + createManager, hub, - routeIncoming, + routeIncomingToManager, getConversationIdByExternal, conversations, } = createHarness(); + const manager = createManager(join(testDir, "route-bindings.json")); - routeIncoming({ + routeIncomingToManager(manager, { messageId: "in-a1", conversationId: "chat-a", senderId: "user-a", @@ -226,7 +240,7 @@ describe("channel manager route isolation", () => { chatType: "group", }); - routeIncoming({ + routeIncomingToManager(manager, { messageId: "in-b1", conversationId: "chat-b", senderId: "user-b", @@ -236,8 +250,8 @@ describe("channel manager route isolation", () => { await vi.advanceTimersByTimeAsync(600); - const convA = getConversationIdByExternal("chat-a"); - const convB = getConversationIdByExternal("chat-b"); + const convA = getConversationIdByExternal(manager, "chat-a"); + const convB = getConversationIdByExternal(manager, "chat-b"); expect(convA).toBeDefined(); expect(convB).toBeDefined(); @@ -253,7 +267,7 @@ describe("channel manager route isolation", () => { expect(harnessB?.write.mock.calls[0]?.[0]).toContain("beta"); // Same external route should reuse existing hub conversation binding. - routeIncoming({ + routeIncomingToManager(manager, { messageId: "in-a2", conversationId: "chat-a", senderId: "user-a", @@ -263,11 +277,57 @@ describe("channel manager route isolation", () => { await vi.advanceTimersByTimeAsync(600); - expect(getConversationIdByExternal("chat-a")).toBe(convA); + expect(getConversationIdByExternal(manager, "chat-a")).toBe(convA); expect((hub as unknown as { createConversation: ReturnType }).createConversation).toHaveBeenCalledTimes(2); expect(harnessA?.write).toHaveBeenCalledTimes(2); expect(harnessA?.write.mock.calls[1]?.[0]).toContain("alpha-2"); manager.stopAll(); }); + + it("restores route bindings from disk after manager restart", async () => { + const routeBindingsPath = join(testDir, "route-bindings.json"); + const { + hub, + createManager, + routeIncomingToManager, + getConversationIdByExternal, + conversations, + } = createHarness(); + + const managerA = createManager(routeBindingsPath); + routeIncomingToManager(managerA, { + messageId: "in-p1", + conversationId: "chat-persist", + senderId: "user-p", + text: "persist-1", + chatType: "direct", + }); + await vi.advanceTimersByTimeAsync(600); + + const firstConversationId = getConversationIdByExternal(managerA, "chat-persist"); + expect(firstConversationId).toBeDefined(); + const harness = conversations.get(firstConversationId!); + expect(harness?.write).toHaveBeenCalledTimes(1); + + managerA.stopAll(); + + const managerB = createManager(routeBindingsPath); + routeIncomingToManager(managerB, { + messageId: "in-p2", + conversationId: "chat-persist", + senderId: "user-p", + text: "persist-2", + chatType: "direct", + }); + await vi.advanceTimersByTimeAsync(600); + + const restoredConversationId = getConversationIdByExternal(managerB, "chat-persist"); + expect(restoredConversationId).toBe(firstConversationId); + expect((hub as unknown as { createConversation: ReturnType }).createConversation).toHaveBeenCalledTimes(1); + expect(harness?.write).toHaveBeenCalledTimes(2); + expect(harness?.write.mock.calls[1]?.[0]).toContain("persist-2"); + + managerB.stopAll(); + }); }); diff --git a/packages/core/src/channels/manager.ts b/packages/core/src/channels/manager.ts index 7b24eb12..dbb32857 100644 --- a/packages/core/src/channels/manager.ts +++ b/packages/core/src/channels/manager.ts @@ -33,6 +33,27 @@ import { describeImage } from "../media/describe-image.js"; import { describeVideo } from "../media/describe-video.js"; import { InboundDebouncer } from "./inbound-debouncer.js"; import { extname } from "node:path"; +import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs"; +import { dirname, join } from "node:path"; +import { DATA_DIR } from "@multica/utils"; + +const ROUTE_BINDING_STORE_VERSION = 1; + +interface RouteBindingStoreEntry { + routeKey: string; + hubConversationId: string; + hubAgentId: string; + updatedAt: number; +} + +interface RouteBindingStoreFile { + version: number; + bindings: RouteBindingStoreEntry[]; +} + +interface ChannelManagerOptions { + routeBindingsPath?: string; +} interface AccountHandle { channelId: string; @@ -54,7 +75,8 @@ interface RouteBinding { routeKey: string; hubConversationId: string; hubAgentId: string; - lastRoute: LastRoute; + updatedAt: number; + lastRoute: LastRoute | null; } interface PendingRoute { @@ -79,6 +101,7 @@ interface ResolveRouteResult { export class ChannelManager { private readonly hub: Hub; + private readonly routeBindingsPath: string; /** Running accounts keyed by "channelId:accountId" */ private readonly accounts = new Map(); @@ -98,13 +121,18 @@ export class ChannelManager { /** Inbound debouncer keyed by routeKey */ private debouncer: InboundDebouncer | null = null; - constructor(hub: Hub) { + constructor(hub: Hub, options?: ChannelManagerOptions) { this.hub = hub; + this.routeBindingsPath = options?.routeBindingsPath ?? join(DATA_DIR, "channels", "route-bindings.json"); + this.loadRouteBindings(); } /** Start all configured channel accounts */ async startAll(): Promise { console.log("[Channels] Starting all channels..."); + if (this.routeBindings.size === 0) { + this.loadRouteBindings(); + } const config = loadChannelsConfig(); const plugins = listChannels(); @@ -314,13 +342,33 @@ export class ChannelManager { existing.hubConversationId, existing.hubAgentId, ); + existing.updatedAt = Date.now(); this.routeBindings.set(routeKey, existing); return { binding: existing, conversation: existingConversation }; } - // Conversation runtime disappeared — remove stale binding and rebuild. - this.routeBindings.delete(routeKey); + // Conversation runtime disappeared — re-create a conversation under the same agent when possible. this.cleanupConversationState(existing.hubConversationId, { unsubscribe: true }); + const recreated = this.hub.createConversation(undefined, { agentId: existing.hubAgentId }); + const recreatedConversationId = recreated.sessionId; + const recreatedAgentId = this.hub.getConversationAgentId(recreatedConversationId) ?? existing.hubAgentId; + + existing.hubConversationId = recreatedConversationId; + existing.hubAgentId = recreatedAgentId; + existing.updatedAt = Date.now(); + existing.lastRoute = this.createRoute( + routeKey, + plugin, + accountId, + externalConversationId, + messageId, + chatType, + recreatedConversationId, + recreatedAgentId, + ); + this.routeBindings.set(routeKey, existing); + this.persistRouteBindings(); + return { binding: existing, conversation: recreated }; } const { agentId, conversation: maybeMainConversation } = this.resolveDefaultAgentAndConversation(); @@ -332,6 +380,7 @@ export class ChannelManager { routeKey, hubConversationId, hubAgentId, + updatedAt: Date.now(), lastRoute: this.createRoute( routeKey, plugin, @@ -344,6 +393,7 @@ export class ChannelManager { ), }; this.routeBindings.set(routeKey, binding); + this.persistRouteBindings(); console.log( `[Channels] route bind: ${routeKey} -> conversation=${hubConversationId} (agent=${hubAgentId})`, @@ -365,7 +415,7 @@ export class ChannelManager { private findRouteForConversation(conversationId: string): LastRoute | null { for (const binding of this.routeBindings.values()) { - if (binding.hubConversationId === conversationId) { + if (binding.hubConversationId === conversationId && binding.lastRoute) { return this.cloneRoute(binding.lastRoute); } } @@ -554,6 +604,10 @@ export class ChannelManager { } const { binding, conversation } = resolved; + if (!binding.lastRoute) { + console.error(`[Channels] Route binding missing runtime route data for ${binding.routeKey}`); + return; + } this.ensureConversationSubscribed(conversation); const routeSnapshot = this.cloneRoute(binding.lastRoute); @@ -677,12 +731,17 @@ export class ChannelManager { `[Channels] Debouncer flush dropped: conversation unavailable ${binding.hubConversationId}`, ); this.routeBindings.delete(routeKey); + this.persistRouteBindings(); this.cleanupConversationState(binding.hubConversationId, { unsubscribe: true }); return; } this.ensureConversationSubscribed(conversation); + if (!binding.lastRoute) { + console.warn(`[Channels] Debouncer flush dropped: missing lastRoute for routeKey=${routeKey}`); + return; + } const state = this.getConversationState(binding.hubConversationId); const route = this.cloneRoute(binding.lastRoute); const acks = [...state.ackBuffer]; @@ -783,11 +842,15 @@ export class ChannelManager { const removedConversationIds = new Set(); for (const [routeKey, binding] of this.routeBindings.entries()) { const route = binding.lastRoute; - if (route.plugin.id === channelId && route.deliveryCtx.accountId === accountId) { + const matchesByRoute = route + ? route.plugin.id === channelId && route.deliveryCtx.accountId === accountId + : routeKey.startsWith(`${channelId}:${accountId}:`); + if (matchesByRoute) { this.routeBindings.delete(routeKey); removedConversationIds.add(binding.hubConversationId); } } + this.persistRouteBindings(); for (const conversationId of removedConversationIds) { const stillBound = Array.from(this.routeBindings.values()) @@ -876,4 +939,56 @@ export class ChannelManager { } return infos; } + + private loadRouteBindings(): void { + if (!existsSync(this.routeBindingsPath)) return; + + try { + const raw = JSON.parse(readFileSync(this.routeBindingsPath, "utf-8")) as RouteBindingStoreFile; + const bindings = Array.isArray(raw.bindings) ? raw.bindings : []; + for (const item of bindings) { + if (!item || typeof item !== "object") continue; + if (typeof item.routeKey !== "string" || !item.routeKey.trim()) continue; + if (typeof item.hubConversationId !== "string" || !item.hubConversationId.trim()) continue; + if (typeof item.hubAgentId !== "string" || !item.hubAgentId.trim()) continue; + const routeKey = item.routeKey.trim(); + this.routeBindings.set(routeKey, { + routeKey, + hubConversationId: item.hubConversationId.trim(), + hubAgentId: item.hubAgentId.trim(), + updatedAt: typeof item.updatedAt === "number" ? item.updatedAt : Date.now(), + lastRoute: null, + }); + } + if (this.routeBindings.size > 0) { + console.log(`[Channels] Restored ${this.routeBindings.size} route binding(s) from disk`); + } + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + console.warn(`[Channels] Failed to load route bindings: ${message}`); + } + } + + private persistRouteBindings(): void { + const serialized: RouteBindingStoreFile = { + version: ROUTE_BINDING_STORE_VERSION, + bindings: Array.from(this.routeBindings.values()) + .map((binding) => ({ + routeKey: binding.routeKey, + hubConversationId: binding.hubConversationId, + hubAgentId: binding.hubAgentId, + updatedAt: binding.updatedAt, + })) + .sort((a, b) => a.routeKey.localeCompare(b.routeKey)), + }; + + try { + const dir = dirname(this.routeBindingsPath); + mkdirSync(dir, { recursive: true }); + writeFileSync(this.routeBindingsPath, JSON.stringify(serialized, null, 2), "utf-8"); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + console.warn(`[Channels] Failed to persist route bindings: ${message}`); + } + } }