diff --git a/packages/core/src/channels/manager.test.ts b/packages/core/src/channels/manager.test.ts index 463eb903..652f6005 100644 --- a/packages/core/src/channels/manager.test.ts +++ b/packages/core/src/channels/manager.test.ts @@ -1,31 +1,73 @@ -import { afterEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { Hub } from "../hub/hub.js"; import type { AsyncAgent } from "../agent/async-agent.js"; -import type { ChannelPlugin } from "./types.js"; +import type { ChannelPlugin, ChannelMessage } from "./types.js"; import { ChannelManager } from "./manager.js"; type AgentEventCallback = (event: unknown) => void; -function createHarness() { +type AgentHarness = { + agent: AsyncAgent; + write: ReturnType; + emit: (event: unknown) => void; +}; + +function createAgentHarness(sessionId: string): AgentHarness { let subscriber: AgentEventCallback | null = null; + const write = vi.fn(); const agent = { - sessionId: "agent-1", + sessionId, + closed: false, subscribe: (callback: AgentEventCallback) => { subscriber = callback; return () => { subscriber = null; }; }, + write, } as unknown as AsyncAgent; + return { + agent, + write, + emit: (event: unknown) => { + subscriber?.(event); + }, + }; +} + +function createHarness() { + const conversations = new Map(); + let conversationCounter = 0; + + const createConversation = vi.fn(() => { + conversationCounter += 1; + const id = `conv-${conversationCounter}`; + const harness = createAgentHarness(id); + conversations.set(id, harness); + return harness.agent; + }); + + const mainConversation = createAgentHarness("main-conv"); + const createAgent = vi.fn(() => { + conversations.set(mainConversation.agent.sessionId, mainConversation); + return mainConversation.agent; + }); + const hub = { - listAgents: () => ["agent-1"], - getAgent: () => agent, + listAgents: vi.fn(() => ["agent-1"]), + createAgent, + createConversation, + getConversation: vi.fn((conversationId: string) => conversations.get(conversationId)?.agent), + getConversationAgentId: vi.fn(() => "agent-1"), + broadcastInbound: vi.fn(), } as unknown as Hub; const replyText = vi.fn(async () => {}); const sendText = vi.fn(async () => {}); + const addReaction = vi.fn(async () => {}); + const plugin: ChannelPlugin = { id: "telegram", meta: { @@ -43,39 +85,75 @@ function createHarness() { outbound: { replyText, sendText, + addReaction, }, }; const manager = new ChannelManager(hub); - (manager as unknown as { lastRoute: unknown }).lastRoute = { - plugin, - deliveryCtx: { - channel: "telegram", - accountId: "default", - conversationId: "chat-1", - replyToMessageId: "in-1", - }, + + const routeIncoming = (message: ChannelMessage) => { + (manager as unknown as { + routeIncoming: (plugin: ChannelPlugin, accountId: string, message: ChannelMessage) => void; + }).routeIncoming(plugin, "default", message); }; - (manager as unknown as { ensureSubscribed: () => void }).ensureSubscribed(); - const emit = (event: unknown) => subscriber?.(event); + const getConversationIdByExternal = (externalConversationId: string): string | undefined => { + const bindings = (manager as unknown as { + routeBindings: Map; + }).routeBindings; - return { manager, replyText, sendText, emit }; + for (const [routeKey, binding] of bindings.entries()) { + if (routeKey.endsWith(`:${externalConversationId}`)) { + return binding.hubConversationId; + } + } + return undefined; + }; + + return { + manager, + hub, + replyText, + sendText, + addReaction, + routeIncoming, + getConversationIdByExternal, + conversations, + }; } -describe("channel manager heartbeat filtering", () => { +describe("channel manager route isolation", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + afterEach(() => { + vi.useRealTimers(); vi.restoreAllMocks(); }); it("suppresses pure HEARTBEAT_OK in channel outbound", async () => { - const { manager, replyText, sendText, emit } = createHarness(); + const { manager, routeIncoming, getConversationIdByExternal, conversations, replyText, sendText } = createHarness(); - emit({ + routeIncoming({ + messageId: "in-1", + conversationId: "chat-1", + senderId: "user-1", + text: "hi", + chatType: "direct", + }); + + const hubConversationId = getConversationIdByExternal("chat-1"); + expect(hubConversationId).toBeDefined(); + + const harness = conversations.get(hubConversationId!); + expect(harness).toBeDefined(); + + harness!.emit({ type: "message_start", message: { role: "assistant", content: [] }, }); - emit({ + harness!.emit({ type: "message_end", message: { role: "assistant", content: [{ type: "text", text: "HEARTBEAT_OK" }] }, }); @@ -89,13 +167,27 @@ describe("channel manager heartbeat filtering", () => { }); it("keeps forwarding normal assistant replies", async () => { - const { manager, replyText, sendText, emit } = createHarness(); + const { manager, routeIncoming, getConversationIdByExternal, conversations, replyText, sendText } = createHarness(); - emit({ + routeIncoming({ + messageId: "in-1", + conversationId: "chat-1", + senderId: "user-1", + text: "hi", + chatType: "direct", + }); + + const hubConversationId = getConversationIdByExternal("chat-1"); + expect(hubConversationId).toBeDefined(); + + const harness = conversations.get(hubConversationId!); + expect(harness).toBeDefined(); + + harness!.emit({ type: "message_start", message: { role: "assistant", content: [] }, }); - emit({ + harness!.emit({ type: "message_end", message: { role: "assistant", content: [{ type: "text", text: "Reminder: check inbox." }] }, }); @@ -116,4 +208,66 @@ describe("channel manager heartbeat filtering", () => { manager.stopAll(); }); + + it("binds different external conversations to isolated hub conversations", async () => { + const { + manager, + hub, + routeIncoming, + getConversationIdByExternal, + conversations, + } = createHarness(); + + routeIncoming({ + messageId: "in-a1", + conversationId: "chat-a", + senderId: "user-a", + text: "alpha", + chatType: "group", + }); + + routeIncoming({ + messageId: "in-b1", + conversationId: "chat-b", + senderId: "user-b", + text: "beta", + chatType: "group", + }); + + await vi.advanceTimersByTimeAsync(600); + + const convA = getConversationIdByExternal("chat-a"); + const convB = getConversationIdByExternal("chat-b"); + + expect(convA).toBeDefined(); + expect(convB).toBeDefined(); + expect(convA).not.toBe(convB); + + const harnessA = conversations.get(convA!); + const harnessB = conversations.get(convB!); + + expect(harnessA?.write).toHaveBeenCalledTimes(1); + expect(harnessA?.write.mock.calls[0]?.[0]).toContain("alpha"); + + expect(harnessB?.write).toHaveBeenCalledTimes(1); + expect(harnessB?.write.mock.calls[0]?.[0]).toContain("beta"); + + // Same external route should reuse existing hub conversation binding. + routeIncoming({ + messageId: "in-a2", + conversationId: "chat-a", + senderId: "user-a", + text: "alpha-2", + chatType: "group", + }); + + await vi.advanceTimersByTimeAsync(600); + + expect(getConversationIdByExternal("chat-a")).toBe(convA); + expect((hub as unknown as { createConversation: ReturnType }).createConversation).toHaveBeenCalledTimes(2); + expect(harnessA?.write).toHaveBeenCalledTimes(2); + expect(harnessA?.write.mock.calls[1]?.[0]).toContain("alpha-2"); + + manager.stopAll(); + }); }); diff --git a/packages/core/src/channels/manager.ts b/packages/core/src/channels/manager.ts index fc62f786..7b24eb12 100644 --- a/packages/core/src/channels/manager.ts +++ b/packages/core/src/channels/manager.ts @@ -1,15 +1,17 @@ /** - * Channel Manager — bridges messaging channels to the Hub's agent. + * Channel Manager — bridges messaging channels to Hub conversations. * - * Design: One Hub, one Agent. Channels are just alternative input/output surfaces. - * - Incoming: channel message → agent.write(text) (same as desktop/gateway) - * - Outgoing: agent reply → check lastRoute → forward to originating channel + * Design: + * - Incoming channel messages are keyed by routeKey + * (channelId + accountId + externalConversationId). + * - Each routeKey is bound to one Hub conversationId. + * - Outgoing assistant events are delivered back through the bound route. * - * Uses "last route" pattern: whoever sent the last message gets the reply. + * This keeps channel routes isolated across conversations and avoids + * the old "first active agent" coupling. * * @see docs/channels/README.md — Channel system overview * @see docs/channels/media-handling.md — Media processing pipeline - * @see docs/message-paths.md — All three message paths (Desktop / Web / Channel) */ import type { Hub } from "../hub/hub.js"; @@ -39,46 +41,61 @@ interface AccountHandle { state: ChannelAccountState; } -/** Tracks where the last message came from, so replies go back there. */ interface LastRoute { + routeKey: string; plugin: ChannelPlugin; deliveryCtx: DeliveryContext; - /** Chat type of the originating message (for source prefix) */ + hubConversationId: string; + hubAgentId: string; chatType?: "direct" | "group" | undefined; } +interface RouteBinding { + routeKey: string; + hubConversationId: string; + hubAgentId: string; + lastRoute: LastRoute; +} + +interface PendingRoute { + route: LastRoute; + acks: LastRoute[]; +} + +interface ConversationState { + pendingRoutes: PendingRoute[]; + activeRoute: LastRoute | null; + activeAcks: LastRoute[]; + ackBuffer: LastRoute[]; + aggregator: MessageAggregator | null; + typingTimer: ReturnType | null; + statusMessageId: string | null; +} + +interface ResolveRouteResult { + binding: RouteBinding; + conversation: AsyncAgent; +} + export class ChannelManager { private readonly hub: Hub; + /** Running accounts keyed by "channelId:accountId" */ private readonly accounts = new Map(); - /** Where the last channel message came from (used for typing/reactions/errors) */ + + /** routeKey -> route binding */ + private readonly routeBindings = new Map(); + + /** hubConversationId -> runtime state */ + private readonly conversationStates = new Map(); + + /** hubConversationId -> unsubscribe callback */ + private readonly conversationSubscriptions = new Map void>(); + + /** Latest route seen globally (best-effort fallback for send_file) */ private lastRoute: LastRoute | null = null; - /** - * FIFO queue of route snapshots + their ack targets, captured at each debouncer flush. - * Each agent.write() gets its own entry; dequeued on agent_start. - */ - private pendingRoutes: { route: LastRoute; acks: LastRoute[] }[] = []; - /** Route for the currently active agent run (set on agent_start, cleared on agent_end). */ - private activeRoute: LastRoute | null = null; - /** All messages in the current run's batch that have 👀 (cleared on agent_end). */ - private activeAcks: LastRoute[] = []; - /** Accumulates message routes for 👀 removal between debouncer flushes. */ - private ackBuffer: LastRoute[] = []; - /** Unsubscribe function for the agent subscriber */ - private agentUnsubscribe: (() => void) | null = null; - /** Session ID of the currently subscribed agent (for stale detection) */ - private subscribedAgentId: string | null = null; - /** Current aggregator for buffering streaming responses */ - private aggregator: MessageAggregator | null = null; - /** Typing indicator interval (repeats every 5s to keep Telegram typing visible) */ - private typingTimer: ReturnType | null = null; - /** Platform message ID of the editable status message (for tool narration updates) */ - private statusMessageId: string | null = null; - /** - * Inbound message debouncer — batches rapid-fire messages from the same - * conversation into a single agent.write() call. - * Initialized lazily on first message; uses the current agent reference. - */ + + /** Inbound debouncer keyed by routeKey */ private debouncer: InboundDebouncer | null = null; constructor(hub: Hub) { @@ -112,9 +129,6 @@ export class ChannelManager { await this.startAccount(plugin.id, accountId, account); } } - - // Try to subscribe eagerly; if no agent yet, routeIncoming will retry lazily - this.ensureSubscribed(); } /** @@ -173,165 +187,314 @@ export class ChannelManager { } } - /** Get the Hub's current agent (the first active one) */ - private getHubAgent(): AsyncAgent | undefined { - const agentIds = this.hub.listAgents(); - if (agentIds.length === 0) { - console.warn("[Channels] No agent available in Hub"); - return undefined; - } - const agent = this.hub.getAgent(agentIds[0]!); - return agent; + private makeRouteKey(channelId: string, accountId: string, externalConversationId: string): string { + return `${channelId}:${accountId}:${externalConversationId}`; } - /** - * Ensure we're subscribed to the current Hub agent for outbound routing. - * Lazily called from routeIncoming — handles agent not yet available at - * startup and re-subscribes if the agent has changed. - */ - private ensureSubscribed(): void { - const agent = this.getHubAgent(); - if (!agent) return; + private cloneRoute(route: LastRoute): LastRoute { + return { + ...route, + deliveryCtx: { ...route.deliveryCtx }, + }; + } - // Already subscribed to the current agent - if (this.subscribedAgentId === agent.sessionId) return; + private createRoute( + routeKey: string, + plugin: ChannelPlugin, + accountId: string, + externalConversationId: string, + messageId: string, + chatType: "direct" | "group", + hubConversationId: string, + hubAgentId: string, + ): LastRoute { + return { + routeKey, + plugin, + deliveryCtx: { + channel: plugin.id, + accountId, + conversationId: externalConversationId, + replyToMessageId: messageId, + }, + hubConversationId, + hubAgentId, + chatType, + }; + } - // Unsubscribe from stale agent - if (this.agentUnsubscribe) { - console.log(`[Channels] Agent changed, re-subscribing (${this.subscribedAgentId} → ${agent.sessionId})`); - this.agentUnsubscribe(); + private getConversationState(conversationId: string): ConversationState { + const existing = this.conversationStates.get(conversationId); + if (existing) return existing; + + const state: ConversationState = { + pendingRoutes: [], + activeRoute: null, + activeAcks: [], + ackBuffer: [], + aggregator: null, + typingTimer: null, + statusMessageId: null, + }; + this.conversationStates.set(conversationId, state); + return state; + } + + private stopTypingForConversation(conversationId: string): void { + const state = this.conversationStates.get(conversationId); + if (!state?.typingTimer) return; + clearInterval(state.typingTimer); + state.typingTimer = null; + } + + private startTypingForRoute(route: LastRoute): void { + const state = this.getConversationState(route.hubConversationId); + this.stopTypingForConversation(route.hubConversationId); + if (!route.plugin.outbound.sendTyping) return; + + const send = () => route.plugin.outbound.sendTyping!(route.deliveryCtx).catch(() => {}); + void send(); + state.typingTimer = setInterval(send, 5000); + } + + private cleanupConversationState(conversationId: string, options?: { unsubscribe?: boolean }): void { + this.stopTypingForConversation(conversationId); + + const state = this.conversationStates.get(conversationId); + if (state) { + state.pendingRoutes = []; + state.activeRoute = null; + state.activeAcks = []; + state.ackBuffer = []; + state.aggregator = null; + state.statusMessageId = null; + this.conversationStates.delete(conversationId); } - console.log(`[Channels] Subscribing to agent ${agent.sessionId} for outbound routing`); - this.subscribedAgentId = agent.sessionId; + if (options?.unsubscribe) { + const unsubscribe = this.conversationSubscriptions.get(conversationId); + if (unsubscribe) { + unsubscribe(); + this.conversationSubscriptions.delete(conversationId); + } + } + } - this.agentUnsubscribe = agent.subscribe((event) => { - const maybeMessage = (event as { message?: { role?: string } }).message; - const role = maybeMessage?.role; + private resolveDefaultAgentAndConversation(): { agentId: string; conversation?: AsyncAgent } { + const existingAgentId = this.hub.listAgents()[0]; + if (existingAgentId) { + return { agentId: existingAgentId }; + } - // Activate the next pending route + acks when a new agent run starts. - if (event.type === "agent_start") { - const entry = this.pendingRoutes.shift(); - if (entry) { - this.activeRoute = entry.route; - this.activeAcks = entry.acks; - console.log(`[Channels] agent_start: activeRoute replyTo=${entry.route.deliveryCtx.replyToMessageId}, acks=${entry.acks.length}`); - } + const mainConversation = this.hub.createAgent(); + const agentId = this.hub.getConversationAgentId(mainConversation.sessionId) ?? mainConversation.sessionId; + return { agentId, conversation: mainConversation }; + } + + private resolveOrCreateRouteBinding( + plugin: ChannelPlugin, + accountId: string, + externalConversationId: string, + messageId: string, + chatType: "direct" | "group", + ): ResolveRouteResult | null { + const routeKey = this.makeRouteKey(plugin.id, accountId, externalConversationId); + const existing = this.routeBindings.get(routeKey); + + if (existing) { + const existingConversation = this.hub.getConversation(existing.hubConversationId); + if (existingConversation && !existingConversation.closed) { + existing.lastRoute = this.createRoute( + routeKey, + plugin, + accountId, + externalConversationId, + messageId, + chatType, + existing.hubConversationId, + existing.hubAgentId, + ); + this.routeBindings.set(routeKey, existing); + return { binding: existing, conversation: existingConversation }; } - // Agent run complete — remove 👀 from all batch messages, conditionally stop typing. - if (event.type === "agent_end") { - for (const ack of this.activeAcks) { - if (ack.plugin.outbound.removeReaction) { - console.log(`[Channels] agent_end: removing 👀 from replyTo=${ack.deliveryCtx.replyToMessageId}`); - void ack.plugin.outbound.removeReaction(ack.deliveryCtx).catch(() => {}); - } - } - this.activeRoute = null; - this.activeAcks = []; - this.statusMessageId = null; - if (this.pendingRoutes.length === 0) { - console.log("[Channels] agent_end: no more pending, stopping typing"); - this.stopTyping(); - } else { - console.log(`[Channels] agent_end: ${this.pendingRoutes.length} pending run(s), keeping typing`); - } - } + // Conversation runtime disappeared — remove stale binding and rebuild. + this.routeBindings.delete(routeKey); + this.cleanupConversationState(existing.hubConversationId, { unsubscribe: true }); + } - // No active channel route — skip (reply goes to desktop/gateway only) - if (!this.lastRoute) return; + const { agentId, conversation: maybeMainConversation } = this.resolveDefaultAgentAndConversation(); + const conversation = maybeMainConversation ?? this.hub.createConversation(undefined, { agentId }); + const hubConversationId = conversation.sessionId; + const hubAgentId = this.hub.getConversationAgentId(hubConversationId) ?? agentId; - // Handle agent errors — notify the channel user - if (event.type === "agent_error") { - this.stopTyping(); - for (const ack of this.activeAcks) { - if (ack.plugin.outbound.removeReaction) { - void ack.plugin.outbound.removeReaction(ack.deliveryCtx).catch(() => {}); - } - } - this.activeRoute = null; - this.activeAcks = []; - this.statusMessageId = null; - const errorMsg = (event as { message?: string }).message ?? "Unknown error"; - console.error(`[Channels] Agent error: ${errorMsg}`); - const route = this.lastRoute; - if (route) { - void route.plugin.outbound.sendText(route.deliveryCtx, `[Error] ${errorMsg}`).catch((err) => { - console.error(`[Channels] Failed to send error to channel: ${err}`); - }); - } - return; - } + const binding: RouteBinding = { + routeKey, + hubConversationId, + hubAgentId, + lastRoute: this.createRoute( + routeKey, + plugin, + accountId, + externalConversationId, + messageId, + chatType, + hubConversationId, + hubAgentId, + ), + }; + this.routeBindings.set(routeKey, binding); - // Only forward assistant message events - if (event.type === "message_start" || event.type === "message_update" || event.type === "message_end") { - if (role !== "assistant") return; - } else { - // Non-message events (tool_execution etc.) — skip for channels - return; - } + console.log( + `[Channels] route bind: ${routeKey} -> conversation=${hubConversationId} (agent=${hubAgentId})`, + ); - // Keep heartbeat acknowledgements internal (same behavior as desktop/gateway stream path). - if (isHeartbeatAckEvent(event)) { - if (event.type === "message_end") { - this.aggregator = null; - } - return; - } + return { binding, conversation }; + } - // Ensure aggregator exists for this response - if (event.type === "message_start") { - this.createAggregator(); - } + private ensureConversationSubscribed(conversation: AsyncAgent): void { + const conversationId = conversation.sessionId; + if (this.conversationSubscriptions.has(conversationId)) return; - // Tool narration: if the assistant message contains tool_use blocks, - // it's intermediate narration (e.g. "Let me search...") before a tool call. - // Send/edit an editable status message instead of the normal reply flow. - if (event.type === "message_end" && role === "assistant") { - const message = (event as { message?: Parameters[0] }).message; - if (hasToolUse(message)) { - this.aggregator?.reset(); - this.aggregator = null; - - const route = this.activeRoute ?? this.lastRoute; - const narration = extractText(message); - if (route && narration) { - const { plugin, deliveryCtx } = route; - void this.sendOrEditStatus(plugin, deliveryCtx, narration); - } - return; - } - } - - if (this.aggregator) { - this.aggregator.handleEvent(event); - } - - // Finalize aggregator per assistant message (may fire multiple times in multi-turn runs). - // Typing and ack removal are handled at agent_end, not here. - if (event.type === "message_end" && role === "assistant") { - this.aggregator = null; - } + console.log(`[Channels] Subscribing to conversation ${conversationId} for outbound routing`); + const unsubscribe = conversation.subscribe((event) => { + this.handleConversationEvent(conversationId, event); }); + this.conversationSubscriptions.set(conversationId, unsubscribe); } - /** - * Create a fresh aggregator wired to the activeRoute (snapshotted at flush time). - * Falls back to lastRoute for non-debounced paths (e.g. direct writes). - */ - private createAggregator(): void { - const route = this.activeRoute ?? this.lastRoute; + private findRouteForConversation(conversationId: string): LastRoute | null { + for (const binding of this.routeBindings.values()) { + if (binding.hubConversationId === conversationId) { + return this.cloneRoute(binding.lastRoute); + } + } + return null; + } + + private handleConversationEvent(conversationId: string, event: unknown): void { + const state = this.getConversationState(conversationId); + const maybeMessage = (event as { message?: { role?: string } }).message; + const role = maybeMessage?.role; + + // Activate the next pending route + acks when a new agent run starts. + if ((event as { type?: string }).type === "agent_start") { + const entry = state.pendingRoutes.shift(); + if (entry) { + state.activeRoute = entry.route; + state.activeAcks = entry.acks; + console.log( + `[Channels] agent_start: conversation=${conversationId} replyTo=${entry.route.deliveryCtx.replyToMessageId}, acks=${entry.acks.length}`, + ); + } + } + + // Agent run complete — remove 👀 from all batch messages, conditionally stop typing. + if ((event as { type?: string }).type === "agent_end") { + for (const ack of state.activeAcks) { + if (ack.plugin.outbound.removeReaction) { + console.log(`[Channels] agent_end: removing 👀 from replyTo=${ack.deliveryCtx.replyToMessageId}`); + void ack.plugin.outbound.removeReaction(ack.deliveryCtx).catch(() => {}); + } + } + state.activeRoute = null; + state.activeAcks = []; + state.statusMessageId = null; + if (state.pendingRoutes.length === 0) { + console.log(`[Channels] agent_end: conversation=${conversationId}, no more pending, stopping typing`); + this.stopTypingForConversation(conversationId); + } else { + console.log( + `[Channels] agent_end: conversation=${conversationId}, ${state.pendingRoutes.length} pending run(s), keeping typing`, + ); + } + } + + const route = state.activeRoute ?? this.findRouteForConversation(conversationId) ?? this.lastRoute; if (!route) return; + // Handle agent errors — notify the channel user + if ((event as { type?: string }).type === "agent_error") { + this.stopTypingForConversation(conversationId); + for (const ack of state.activeAcks) { + if (ack.plugin.outbound.removeReaction) { + void ack.plugin.outbound.removeReaction(ack.deliveryCtx).catch(() => {}); + } + } + state.activeRoute = null; + state.activeAcks = []; + state.statusMessageId = null; + const errorMsg = (event as { message?: string }).message ?? "Unknown error"; + console.error(`[Channels] Agent error: ${errorMsg}`); + void route.plugin.outbound.sendText(route.deliveryCtx, `[Error] ${errorMsg}`).catch((err) => { + console.error(`[Channels] Failed to send error to channel: ${err}`); + }); + return; + } + + const eventType = (event as { type?: string }).type; + + // Only forward assistant message events. + if (eventType === "message_start" || eventType === "message_update" || eventType === "message_end") { + if (role !== "assistant") return; + } else { + // Non-message events (tool_execution etc.) — skip for channels. + return; + } + + // Keep heartbeat acknowledgements internal (same behavior as desktop/gateway stream path). + if (isHeartbeatAckEvent(event)) { + if (eventType === "message_end") { + state.aggregator = null; + } + return; + } + + // Ensure aggregator exists for this response. + if (eventType === "message_start") { + this.createAggregator(conversationId, this.cloneRoute(route), state); + } + + // Tool narration: if the assistant message contains tool_use blocks, + // send/edit an editable status message instead of normal reply flow. + if (eventType === "message_end" && role === "assistant") { + const message = (event as { message?: Parameters[0] }).message; + if (hasToolUse(message)) { + state.aggregator?.reset(); + state.aggregator = null; + + const narration = extractText(message as Parameters[0]); + if (narration) { + void this.sendOrEditStatus(conversationId, route, narration); + } + return; + } + } + + if (state.aggregator) { + state.aggregator.handleEvent(event as Parameters[0]); + } + + // Finalize aggregator per assistant message. + if (eventType === "message_end" && role === "assistant") { + state.aggregator = null; + } + } + + private createAggregator(conversationId: string, route: LastRoute, state: ConversationState): void { const { plugin, deliveryCtx } = route; - console.log(`[Channels] createAggregator: replyTo=${deliveryCtx.replyToMessageId} (source=${this.activeRoute ? "activeRoute" : "lastRoute"})`); + console.log( + `[Channels] createAggregator: conversation=${conversationId} replyTo=${deliveryCtx.replyToMessageId}`, + ); const chunkerConfig = plugin.chunkerConfig ?? DEFAULT_CHUNKER_CONFIG; - this.aggregator = new MessageAggregator( + state.aggregator = new MessageAggregator( chunkerConfig, async (block) => { try { - console.log(`[Channels] Sending block ${block.index} (${block.text.length} chars${block.isFinal ? ", final" : ""}) → ${deliveryCtx.channel}:${deliveryCtx.conversationId} replyTo=${deliveryCtx.replyToMessageId}`); + console.log( + `[Channels] Sending block ${block.index} (${block.text.length} chars${block.isFinal ? ", final" : ""}) -> ${deliveryCtx.channel}:${deliveryCtx.conversationId} replyTo=${deliveryCtx.replyToMessageId}`, + ); if (block.index === 0) { await plugin.outbound.replyText(deliveryCtx, block.text); } else { @@ -345,34 +508,29 @@ export class ChannelManager { ); } - /** - * Send or edit a status message for tool narration. - * First call sends a new editable reply; subsequent calls edit the same message. - * Falls back to no-op if the plugin doesn't support editable messages. - */ private async sendOrEditStatus( - plugin: ChannelPlugin, - deliveryCtx: DeliveryContext, + conversationId: string, + route: LastRoute, text: string, ): Promise { + const state = this.getConversationState(conversationId); + try { - if (this.statusMessageId && plugin.outbound.editText) { - console.log(`[Channels] Editing status message ${this.statusMessageId}`); - await plugin.outbound.editText(deliveryCtx, this.statusMessageId, text); - } else if (plugin.outbound.replyTextEditable) { - const msgId = await plugin.outbound.replyTextEditable(deliveryCtx, text); - this.statusMessageId = msgId; + if (state.statusMessageId && route.plugin.outbound.editText) { + console.log(`[Channels] Editing status message ${state.statusMessageId}`); + await route.plugin.outbound.editText(route.deliveryCtx, state.statusMessageId, text); + } else if (route.plugin.outbound.replyTextEditable) { + const msgId = await route.plugin.outbound.replyTextEditable(route.deliveryCtx, text); + state.statusMessageId = msgId; console.log(`[Channels] Sent editable status message ${msgId}`); } - // If plugin doesn't support editable messages, silently skip (typing indicator still active) + // If plugin doesn't support editable messages, silently skip. } catch (err) { console.error(`[Channels] Failed to send/edit status: ${err}`); } } - /** - * Incoming channel message → update lastRoute → forward to Hub's agent. - */ + /** Incoming channel message -> routeKey binding -> Hub conversation write. */ private routeIncoming( plugin: ChannelPlugin, accountId: string, @@ -383,163 +541,181 @@ export class ChannelManager { `[Channels] Incoming: channel=${plugin.id} conv=${conversationId} sender=${senderId} text="${text.slice(0, 50)}${text.length > 50 ? "..." : ""}"`, ); - const agent = this.getHubAgent(); - if (!agent) { - console.error("[Channels] No agent available, dropping message"); + const resolved = this.resolveOrCreateRouteBinding( + plugin, + accountId, + conversationId, + messageId, + message.chatType, + ); + if (!resolved) { + console.error("[Channels] Failed to resolve conversation route, dropping message"); return; } - // Ensure we're subscribed to this agent (handles late startup / agent change) - this.ensureSubscribed(); + const { binding, conversation } = resolved; + this.ensureConversationSubscribed(conversation); - // Update last route — replies will go back here - this.lastRoute = { - plugin, - deliveryCtx: { - channel: plugin.id, - accountId, - conversationId, - replyToMessageId: messageId, - }, - chatType: message.chatType, - }; - console.log(`[Channels] lastRoute updated → ${plugin.id}:${conversationId} replyTo=${messageId}`); - console.log(`[Channels] Forwarding to agent ${agent.sessionId}`); + const routeSnapshot = this.cloneRoute(binding.lastRoute); + this.lastRoute = routeSnapshot; + const state = this.getConversationState(binding.hubConversationId); - // Show typing indicator and 👀 ack on this message - this.startTyping(); - const ackRoute: LastRoute = { ...this.lastRoute }; - if (ackRoute.plugin.outbound.addReaction) { + console.log( + `[Channels] route selected: ${binding.routeKey} -> conversation=${binding.hubConversationId} (agent=${binding.hubAgentId})`, + ); + + // Show typing indicator and 👀 ack on this message. + this.startTypingForRoute(routeSnapshot); + if (routeSnapshot.plugin.outbound.addReaction) { console.log(`[Channels] Adding 👀 to replyTo=${messageId}`); - void ackRoute.plugin.outbound.addReaction(ackRoute.deliveryCtx, "👀").catch(() => {}); + void routeSnapshot.plugin.outbound.addReaction(routeSnapshot.deliveryCtx, "👀").catch(() => {}); } - this.ackBuffer.push(ackRoute); + state.ackBuffer.push(routeSnapshot); - // Handle media messages (processed async, then fed through debouncer) + // Handle media messages (processed async, then fed through debouncer). if (message.media && plugin.downloadMedia) { - void this.routeMedia(plugin, accountId, message, agent); + void this.routeMedia(plugin, accountId, message, binding.routeKey); } else { - // Text messages go through debouncer to batch rapid-fire sends - this.getDebouncer(agent).push(conversationId, text); + // Text messages go through debouncer to batch rapid-fire sends. + this.getDebouncer().push(binding.routeKey, text); } } /** * Download media file, process it (transcribe/describe), and forward - * the resulting text through the debouncer to the agent. - * Media results are also debounced so that a rapid "photo + text" combo - * from the same conversation gets batched into one agent prompt. + * the resulting text through the debouncer. */ private async routeMedia( plugin: ChannelPlugin, accountId: string, message: ChannelMessage, - agent: AsyncAgent, + routeKey: string, ): Promise { const media = message.media!; - const debouncer = this.getDebouncer(agent); + const debouncer = this.getDebouncer(); try { const filePath = await plugin.downloadMedia!(media.fileId, accountId); if (media.type === "image") { - // Images: describe via Vision API before reaching agent + // Images: describe via Vision API before reaching agent. const description = await describeImage(filePath); if (description) { const parts = ["[Image]", `Description: ${description}`]; if (media.caption) parts.push(`Caption: ${media.caption}`); - debouncer.push(message.conversationId, parts.join("\n")); + debouncer.push(routeKey, parts.join("\n")); } else { - // No API key — fall back to file path + // No API key — fall back to file path. const parts = ["[image message received]", `File: ${filePath}`]; if (media.caption) parts.push(`Caption: ${media.caption}`); - debouncer.push(message.conversationId, parts.join("\n")); + debouncer.push(routeKey, parts.join("\n")); } } else if (media.type === "audio") { - // Audio: transcribe via Whisper API before reaching agent + // Audio: transcribe via Whisper API before reaching agent. const transcript = await transcribeAudio(filePath); if (transcript) { const parts = ["[Voice Message]", `Transcript: ${transcript}`]; if (media.caption) parts.push(`Caption: ${media.caption}`); - debouncer.push(message.conversationId, parts.join("\n")); + debouncer.push(routeKey, parts.join("\n")); } else { - // No API key configured — fall back to file path + // No API key configured — fall back to file path. const parts = ["[audio message received]", `File: ${filePath}`]; if (media.mimeType) parts.push(`Type: ${media.mimeType}`); if (media.duration) parts.push(`Duration: ${media.duration}s`); if (media.caption) parts.push(`Caption: ${media.caption}`); - debouncer.push(message.conversationId, parts.join("\n")); + debouncer.push(routeKey, parts.join("\n")); } } else if (media.type === "video") { - // Video: extract frame + describe via Vision API + // Video: extract frame + describe via Vision API. const description = await describeVideo(filePath); if (description) { const parts = ["[Video]", `Description: ${description}`]; if (media.duration) parts.push(`Duration: ${media.duration}s`); if (media.caption) parts.push(`Caption: ${media.caption}`); - debouncer.push(message.conversationId, parts.join("\n")); + debouncer.push(routeKey, parts.join("\n")); } else { - // ffmpeg unavailable or no API key — fall back to file path + // ffmpeg unavailable or no API key — fall back to file path. const parts = ["[video message received]", `File: ${filePath}`]; if (media.mimeType) parts.push(`Type: ${media.mimeType}`); if (media.duration) parts.push(`Duration: ${media.duration}s`); if (media.caption) parts.push(`Caption: ${media.caption}`); - debouncer.push(message.conversationId, parts.join("\n")); + debouncer.push(routeKey, parts.join("\n")); } } else { - // Document: tell agent the file path + // Document: tell agent the file path. const parts: string[] = []; - parts.push(`[document message received]`); + parts.push("[document message received]"); parts.push(`File: ${filePath}`); if (media.mimeType) parts.push(`Type: ${media.mimeType}`); if (media.caption) parts.push(`Caption: ${media.caption}`); - debouncer.push(message.conversationId, parts.join("\n")); + debouncer.push(routeKey, parts.join("\n")); } } catch (err) { const msg = err instanceof Error ? err.message : String(err); console.error(`[Channels] Failed to process media: ${msg}`); - debouncer.push(message.conversationId, message.text || `[Failed to process ${media.type}]`); + debouncer.push(routeKey, message.text || `[Failed to process ${media.type}]`); } } /** - * Get or create the inbound debouncer, wired to the given agent. - * The debouncer batches rapid-fire messages by conversationId, then - * calls agent.write() once with the combined text. + * Get or create inbound debouncer. + * Batches rapid-fire messages by routeKey, then writes once to the bound Hub conversation. */ - private getDebouncer(agent: AsyncAgent): InboundDebouncer { + private getDebouncer(): InboundDebouncer { if (!this.debouncer) { this.debouncer = new InboundDebouncer( - (_conversationId, combinedText) => { - // Snapshot the current route + pending acks for this batch. - const route = this.lastRoute ? { ...this.lastRoute } : null; - const acks = [...this.ackBuffer]; - this.ackBuffer = []; - const source = route ? { + (routeKey, combinedText) => { + const binding = this.routeBindings.get(routeKey); + if (!binding) { + console.warn(`[Channels] Debouncer flush dropped: unknown routeKey=${routeKey}`); + return; + } + + const conversation = this.hub.getConversation(binding.hubConversationId); + if (!conversation || conversation.closed) { + console.warn( + `[Channels] Debouncer flush dropped: conversation unavailable ${binding.hubConversationId}`, + ); + this.routeBindings.delete(routeKey); + this.cleanupConversationState(binding.hubConversationId, { unsubscribe: true }); + return; + } + + this.ensureConversationSubscribed(conversation); + + const state = this.getConversationState(binding.hubConversationId); + const route = this.cloneRoute(binding.lastRoute); + const acks = [...state.ackBuffer]; + state.ackBuffer = []; + + state.pendingRoutes.push({ route, acks }); + + const source = { type: "channel" as const, channelId: route.plugin.id, accountId: route.deliveryCtx.accountId, conversationId: route.deliveryCtx.conversationId, - } : undefined; - if (route) { - this.pendingRoutes.push({ route, acks }); - // Broadcast inbound message to local listeners (Desktop UI) - this.hub.broadcastInbound({ - agentId: agent.sessionId, - conversationId: agent.sessionId, - content: combinedText, - source: source!, - timestamp: Date.now(), - }); - } - // Prepend source context so the LLM knows which platform/chat type the message came from - const channelName = route?.plugin.meta.name ?? "Channel"; - const chatLabel = route?.chatType === "group" ? "group" : "private"; + }; + + // Broadcast inbound message to local listeners (Desktop UI). + this.hub.broadcastInbound({ + agentId: binding.hubAgentId, + conversationId: binding.hubConversationId, + content: combinedText, + source, + timestamp: Date.now(), + }); + + // Prepend source context so the LLM knows platform + chat type. + const channelName = route.plugin.meta.name ?? "Channel"; + const chatLabel = route.chatType === "group" ? "group" : "private"; const prefixedText = `[${channelName} · ${chatLabel}]\n${combinedText}`; - const replyTo = route?.deliveryCtx.replyToMessageId ?? "?"; - console.log(`[Channels] Debouncer flushing ${combinedText.length} chars to agent (queued route replyTo=${replyTo}, acks=${acks.length})`); - agent.write(prefixedText, { source }); + const replyTo = route.deliveryCtx.replyToMessageId ?? "?"; + console.log( + `[Channels] Debouncer flushing ${combinedText.length} chars to conversation=${binding.hubConversationId} (route replyTo=${replyTo}, acks=${acks.length})`, + ); + conversation.write(prefixedText, { source }); }, ); } @@ -547,11 +723,22 @@ export class ChannelManager { } /** - * Send a file to the active channel conversation. + * Send a file to the active channel route. * Returns true if the file was sent, false if no active route or plugin doesn't support media. */ async sendFile(filePath: string, caption?: string, type?: string): Promise { - const route = this.activeRoute ?? this.lastRoute; + let route: LastRoute | null = null; + + for (const state of this.conversationStates.values()) { + if (state.activeRoute) { + route = state.activeRoute; + break; + } + } + + if (!route) { + route = this.lastRoute; + } if (!route) return false; const { plugin, deliveryCtx } = route; @@ -584,53 +771,45 @@ export class ChannelManager { return "document"; } - /** Start sending typing indicators (repeats every 5s until stopped) */ - private startTyping(): void { - this.stopTyping(); - const route = this.lastRoute; - if (!route?.plugin.outbound.sendTyping) return; - - const send = () => route.plugin.outbound.sendTyping!(route.deliveryCtx).catch(() => {}); - void send(); - this.typingTimer = setInterval(send, 5000); - } - - /** Stop typing indicator interval */ - private stopTyping(): void { - if (this.typingTimer) { - clearInterval(this.typingTimer); - this.typingTimer = null; - } - } - /** * Stop a specific channel account. * Public so the desktop IPC layer can call it when removing config. - * Cleans up typing timer, debouncer, aggregator, and lastRoute if they - * belong to this account. */ stopAccount(channelId: string, accountId: string): void { const key = `${channelId}:${accountId}`; const handle = this.accounts.get(key); if (!handle) return; - // Clean up shared resources if they target this account - if (this.lastRoute && this.lastRoute.plugin.id === channelId && this.lastRoute.deliveryCtx.accountId === accountId) { - this.stopTyping(); + const removedConversationIds = new Set(); + for (const [routeKey, binding] of this.routeBindings.entries()) { + const route = binding.lastRoute; + if (route.plugin.id === channelId && route.deliveryCtx.accountId === accountId) { + this.routeBindings.delete(routeKey); + removedConversationIds.add(binding.hubConversationId); + } + } + + for (const conversationId of removedConversationIds) { + const stillBound = Array.from(this.routeBindings.values()) + .some((binding) => binding.hubConversationId === conversationId); + if (!stillBound) { + this.cleanupConversationState(conversationId, { unsubscribe: true }); + } + } + + if ( + this.lastRoute + && this.lastRoute.plugin.id === channelId + && this.lastRoute.deliveryCtx.accountId === accountId + ) { + this.stopTypingForConversation(this.lastRoute.hubConversationId); this.lastRoute = null; - this.activeRoute = null; - this.activeAcks = []; - this.ackBuffer = []; - this.pendingRoutes = []; - this.aggregator = null; - this.statusMessageId = null; } handle.abortController.abort(); handle.state = { ...handle.state, status: "stopped" }; this.accounts.delete(key); - // Dispose debouncer if no accounts remain if (this.accounts.size === 0 && this.debouncer) { this.debouncer.dispose(); this.debouncer = null; @@ -642,32 +821,35 @@ export class ChannelManager { /** Stop all running channel accounts */ stopAll(): void { console.log("[Channels] Stopping all channels..."); - this.stopTyping(); + this.debouncer?.dispose(); this.debouncer = null; - if (this.agentUnsubscribe) { - this.agentUnsubscribe(); - this.agentUnsubscribe = null; + + for (const unsubscribe of this.conversationSubscriptions.values()) { + unsubscribe(); } + this.conversationSubscriptions.clear(); + + for (const conversationId of this.conversationStates.keys()) { + this.stopTypingForConversation(conversationId); + } + this.conversationStates.clear(); + for (const [key, handle] of this.accounts) { handle.abortController.abort(); handle.state = { ...handle.state, status: "stopped" }; console.log(`[Channels] Stopped ${key}`); } + this.accounts.clear(); + this.routeBindings.clear(); this.lastRoute = null; - this.activeRoute = null; - this.activeAcks = []; - this.ackBuffer = []; - this.pendingRoutes = []; - this.aggregator = null; - this.statusMessageId = null; } /** Clear the last route (e.g. when desktop user sends a message) */ clearLastRoute(): void { if (this.lastRoute) { - this.stopTyping(); + this.stopTypingForConversation(this.lastRoute.hubConversationId); console.log("[Channels] lastRoute cleared (non-channel message received)"); this.lastRoute = null; }