diff --git a/src/channels/manager.ts b/src/channels/manager.ts index 82f440cf..de371f9a 100644 --- a/src/channels/manager.ts +++ b/src/channels/manager.ts @@ -1,14 +1,11 @@ /** - * Channel Manager — orchestrates channel plugin lifecycles and message routing. + * Channel Manager — bridges messaging channels to the Hub's agent. * - * For each configured channel account: - * 1. Starts the gateway adapter (receive messages) - * 2. Routes incoming messages to per-conversation Agents - * 3. Collects Agent responses via MessageAggregator - * 4. Sends responses back via the outbound adapter + * Design: One Hub, one Agent. Channels are just alternative input/output surfaces. + * - Incoming: channel message → agent.write(text) (same as desktop/gateway) + * - Outgoing: agent reply → check lastRoute → forward to originating channel * - * Channel is just a messenger — it doesn't manage context or history. - * That's the Agent's job. + * Uses "last route" pattern: whoever sent the last message gets the reply. */ import type { Hub } from "../hub/hub.js"; @@ -30,12 +27,22 @@ interface AccountHandle { state: ChannelAccountState; } +/** Tracks where the last message came from, so replies go back there. */ +interface LastRoute { + plugin: ChannelPlugin; + deliveryCtx: DeliveryContext; +} + export class ChannelManager { private readonly hub: Hub; /** Running accounts keyed by "channelId:accountId" */ private readonly accounts = new Map(); - /** Agents keyed by "channelId:conversationId" for per-conversation isolation */ - private readonly conversationAgents = new Map(); + /** Where the last channel message came from (reply target) */ + private lastRoute: LastRoute | null = null; + /** Unsubscribe function for the agent subscriber */ + private agentUnsubscribe: (() => void) | null = null; + /** Current aggregator for buffering streaming responses */ + private aggregator: MessageAggregator | null = null; constructor(hub: Hub) { this.hub = hub; @@ -68,6 +75,9 @@ export class ChannelManager { await this.startAccount(plugin.id, accountId, account); } } + + // Subscribe to the Hub's agent for outbound routing + this.subscribeToAgent(); } /** Start a specific channel account */ @@ -100,8 +110,6 @@ export class ChannelManager { console.log(`[Channels] Starting ${key}`); try { - // Start gateway — this begins receiving messages - // The promise may resolve immediately (polling started) or stay pending (long-connection) const startPromise = plugin.gateway.start( accountId, accountConfig, @@ -111,8 +119,6 @@ export class ChannelManager { abortController.signal, ); - // Don't await forever — the start() might be long-running (e.g. polling loop) - // Give it a moment to fail fast if credentials are wrong await Promise.race([ startPromise, new Promise((resolve) => setTimeout(resolve, 3000)), @@ -127,36 +133,102 @@ export class ChannelManager { } } - /** Stop all running channel accounts */ - stopAll(): void { - console.log("[Channels] Stopping all channels..."); - for (const [key, handle] of this.accounts) { - handle.abortController.abort(); - handle.state = { ...handle.state, status: "stopped" }; - console.log(`[Channels] Stopped ${key}`); + /** Get the Hub's current agent (the first active one) */ + private getHubAgent(): AsyncAgent | undefined { + const agentIds = this.hub.listAgents(); + if (agentIds.length === 0) { + console.warn("[Channels] No agent available in Hub"); + return undefined; } - this.accounts.clear(); - this.conversationAgents.clear(); - } - - /** Get or create an Agent for a specific conversation */ - private getOrCreateAgent(channelId: string, conversationId: string): AsyncAgent { - const key = `${channelId}:${conversationId}`; - const existing = this.conversationAgents.get(key); - if (existing && !existing.closed) { - return existing; - } - - const agent = this.hub.createAgent(); - this.conversationAgents.set(key, agent); + const agent = this.hub.getAgent(agentIds[0]!); return agent; } /** - * Route an incoming message to the appropriate Agent and wire the response - * back to the channel via MessageAggregator. - * - * This is the core bridge logic — generalized for any channel. + * Subscribe to the Hub's agent events (once, persistent). + * When AI replies and lastRoute points to a channel, forward the reply there. + */ + private subscribeToAgent(): void { + const agent = this.getHubAgent(); + if (!agent) { + console.warn("[Channels] No agent to subscribe to, channel replies will not be routed"); + return; + } + + console.log(`[Channels] Subscribing to agent ${agent.sessionId} for outbound routing`); + + this.agentUnsubscribe = agent.subscribe((event) => { + // No active channel route — skip (reply goes to desktop/gateway only) + if (!this.lastRoute) return; + + // Handle agent errors — notify the channel user + if (event.type === "agent_error") { + const errorMsg = (event as { error?: string }).error ?? "Unknown error"; + console.error(`[Channels] Agent error: ${errorMsg}`); + const route = this.lastRoute; + if (route) { + void route.plugin.outbound.sendText(route.deliveryCtx, `[Error] ${errorMsg}`).catch((err) => { + console.error(`[Channels] Failed to send error to channel: ${err}`); + }); + } + return; + } + + const maybeMessage = (event as { message?: { role?: string } }).message; + const role = maybeMessage?.role; + + // Only forward assistant message events + if (event.type === "message_start" || event.type === "message_update" || event.type === "message_end") { + if (role !== "assistant") return; + } else { + // Non-message events (tool_execution etc.) — skip for channels + return; + } + + // Ensure aggregator exists for this response + if (event.type === "message_start") { + this.createAggregator(); + } + + if (this.aggregator) { + this.aggregator.handleEvent(event); + } + + // Clean up after response complete + if (event.type === "message_end" && role === "assistant") { + this.aggregator = null; + } + }); + } + + /** Create a fresh aggregator wired to the current lastRoute */ + private createAggregator(): void { + const route = this.lastRoute; + if (!route) return; + + const { plugin, deliveryCtx } = route; + const chunkerConfig = plugin.chunkerConfig ?? DEFAULT_CHUNKER_CONFIG; + + this.aggregator = new MessageAggregator( + chunkerConfig, + async (block) => { + try { + console.log(`[Channels] Sending block ${block.index} (${block.text.length} chars${block.isFinal ? ", final" : ""}) → ${deliveryCtx.channel}:${deliveryCtx.conversationId}`); + if (block.index === 0) { + await plugin.outbound.replyText(deliveryCtx, block.text); + } else { + await plugin.outbound.sendText(deliveryCtx, block.text); + } + } catch (err) { + console.error(`[Channels] Failed to send reply: ${err}`); + } + }, + () => {}, + ); + } + + /** + * Incoming channel message → update lastRoute → forward to Hub's agent. */ private routeIncoming( plugin: ChannelPlugin, @@ -165,67 +237,57 @@ export class ChannelManager { ): void { const { conversationId, senderId, text, messageId } = message; console.log( - `[Channels] Incoming message: channel=${plugin.id} conv=${conversationId} sender=${senderId} text="${text.slice(0, 50)}${text.length > 50 ? "..." : ""}"`, + `[Channels] Incoming: channel=${plugin.id} conv=${conversationId} sender=${senderId} text="${text.slice(0, 50)}${text.length > 50 ? "..." : ""}"`, ); - // Find or create Agent for this conversation - const agent = this.getOrCreateAgent(plugin.id, conversationId); - const isNew = !this.conversationAgents.has(`${plugin.id}:${conversationId}`) ? "new" : "existing"; - console.log(`[Channels] Routing to agent: key=${plugin.id}:${conversationId} agentId=${agent.sessionId} (${isNew})`); + const agent = this.getHubAgent(); + if (!agent) { + console.error("[Channels] No agent available, dropping message"); + return; + } - // Build delivery context for outbound replies - const deliveryCtx: DeliveryContext = { - channel: plugin.id, - accountId, - conversationId, - replyToMessageId: messageId, + // Update last route — replies will go back here + this.lastRoute = { + plugin, + deliveryCtx: { + channel: plugin.id, + accountId, + conversationId, + replyToMessageId: messageId, + }, }; + console.log(`[Channels] lastRoute updated → ${plugin.id}:${conversationId}`); + console.log(`[Channels] Forwarding to agent ${agent.sessionId}`); - // Use channel-specific chunker config or defaults - const chunkerConfig = plugin.chunkerConfig ?? DEFAULT_CHUNKER_CONFIG; - - // Create a fresh aggregator for this message's response - const aggregator = new MessageAggregator( - chunkerConfig, - async (block) => { - try { - console.log(`[Channels] Block ${block.index} ready (${block.text.length} chars${block.isFinal ? ", final" : ""}), sending reply`); - if (block.index === 0) { - await plugin.outbound.replyText(deliveryCtx, block.text); - } else { - await plugin.outbound.sendText(deliveryCtx, block.text); - } - if (block.isFinal) { - console.log(`[Channels] Response complete: channel=${plugin.id} conv=${conversationId} blocks=${block.index + 1}`); - } - } catch (err) { - console.error(`[Channels] Failed to send reply: ${err}`); - } - }, - (_event) => { - // Pass-through events (tool_execution, compaction, etc.) - // Could add typing indicators per-channel later - }, - ); - - // Subscribe to agent events BEFORE writing the message - console.log("[Channels] Agent subscribed, sending message to agent"); - const unsubscribe = agent.subscribe((event) => { - aggregator.handleEvent(event); - - // Unsubscribe after the response is complete - if (event.type === "message_end") { - const maybeMessage = (event as { message?: { role?: string } }).message; - if (maybeMessage?.role === "assistant") { - unsubscribe(); - } - } - }); - - // Send user message to the agent + // Same as typing in the desktop chat agent.write(text); } + /** Stop all running channel accounts */ + stopAll(): void { + console.log("[Channels] Stopping all channels..."); + if (this.agentUnsubscribe) { + this.agentUnsubscribe(); + this.agentUnsubscribe = null; + } + for (const [key, handle] of this.accounts) { + handle.abortController.abort(); + handle.state = { ...handle.state, status: "stopped" }; + console.log(`[Channels] Stopped ${key}`); + } + this.accounts.clear(); + this.lastRoute = null; + this.aggregator = null; + } + + /** Clear the last route (e.g. when desktop user sends a message) */ + clearLastRoute(): void { + if (this.lastRoute) { + console.log("[Channels] lastRoute cleared (non-channel message received)"); + this.lastRoute = null; + } + } + /** Get status of all accounts */ listAccountStates(): ChannelAccountState[] { return Array.from(this.accounts.values()).map((h) => ({ ...h.state })); diff --git a/src/hub/hub.ts b/src/hub/hub.ts index 2bee8078..780a3c1b 100644 --- a/src/hub/hub.ts +++ b/src/hub/hub.ts @@ -197,6 +197,7 @@ export class Hub { const agent = this.agents.get(agentId); if (agent && !agent.closed) { this.agentSenders.set(agentId, msg.from); + this.channelManager.clearLastRoute(); agent.write(content); } else { console.warn(`[Hub] Agent not found or closed: ${agentId}`); @@ -323,12 +324,12 @@ export class Hub { content: item.content, }); } else { - // Compaction events: forward with synthetic streamId (no stream tracking) - const isCompactionEvent = - item.type === "compaction_start" || item.type === "compaction_end"; - if (isCompactionEvent) { + // Passthrough events: forward with synthetic streamId (no stream tracking) + const isPassthroughEvent = + item.type === "compaction_start" || item.type === "compaction_end" || item.type === "agent_error"; + if (isPassthroughEvent) { this.client.send(targetDeviceId, StreamAction, { - streamId: `compaction:${agent.sessionId}`, + streamId: `system:${agent.sessionId}`, agentId: agent.sessionId, event: item, });