From 971d68b605e7b5ebecb9bfcd3bfb80fcccf1a25a Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Fri, 6 Feb 2026 15:47:41 +0800 Subject: [PATCH] feat(channels): add ChannelManager and Telegram plugin ChannelManager orchestrates channel lifecycles and routes messages to per-conversation Agents. Telegram plugin uses grammy for long polling with group @mention detection. Co-Authored-By: Claude Opus 4.6 --- src/channels/index.ts | 25 ++++ src/channels/manager.ts | 233 +++++++++++++++++++++++++++++++ src/channels/plugins/telegram.ts | 137 ++++++++++++++++++ 3 files changed, 395 insertions(+) create mode 100644 src/channels/index.ts create mode 100644 src/channels/manager.ts create mode 100644 src/channels/plugins/telegram.ts diff --git a/src/channels/index.ts b/src/channels/index.ts new file mode 100644 index 00000000..3bbdd20f --- /dev/null +++ b/src/channels/index.ts @@ -0,0 +1,25 @@ +/** + * Channel system bootstrap and exports. + */ + +export { ChannelManager } from "./manager.js"; +export { registerChannel, getChannel, listChannels } from "./registry.js"; +export { loadChannelsConfig, CHANNELS_CONFIG_PATH } from "./config.js"; +export type { + ChannelPlugin, + ChannelMessage, + DeliveryContext, + ChannelAccountState, + ChannelsConfig, +} from "./types.js"; + +// Built-in channel plugins +import { registerChannel } from "./registry.js"; +import { telegramChannel } from "./plugins/telegram.js"; + +/** Register all built-in channel plugins. Call once at startup. */ +export function initChannels(): void { + registerChannel(telegramChannel); + // Future: registerChannel(discordChannel); + // Future: registerChannel(feishuChannel); +} diff --git a/src/channels/manager.ts b/src/channels/manager.ts new file mode 100644 index 00000000..82f440cf --- /dev/null +++ b/src/channels/manager.ts @@ -0,0 +1,233 @@ +/** + * Channel Manager — orchestrates channel plugin lifecycles and message routing. + * + * For each configured channel account: + * 1. Starts the gateway adapter (receive messages) + * 2. Routes incoming messages to per-conversation Agents + * 3. Collects Agent responses via MessageAggregator + * 4. Sends responses back via the outbound adapter + * + * Channel is just a messenger — it doesn't manage context or history. + * That's the Agent's job. + */ + +import type { Hub } from "../hub/hub.js"; +import type { + ChannelPlugin, + ChannelMessage, + ChannelAccountState, + DeliveryContext, +} from "./types.js"; +import { listChannels } from "./registry.js"; +import { loadChannelsConfig } from "./config.js"; +import { MessageAggregator, DEFAULT_CHUNKER_CONFIG } from "../hub/message-aggregator.js"; +import type { AsyncAgent } from "../agent/async-agent.js"; + +interface AccountHandle { + channelId: string; + accountId: string; + abortController: AbortController; + state: ChannelAccountState; +} + +export class ChannelManager { + private readonly hub: Hub; + /** Running accounts keyed by "channelId:accountId" */ + private readonly accounts = new Map(); + /** Agents keyed by "channelId:conversationId" for per-conversation isolation */ + private readonly conversationAgents = new Map(); + + constructor(hub: Hub) { + this.hub = hub; + } + + /** Start all configured channel accounts */ + async startAll(): Promise { + console.log("[Channels] Starting all channels..."); + const config = loadChannelsConfig(); + const plugins = listChannels(); + + if (plugins.length === 0) { + console.log("[Channels] No plugins registered"); + return; + } + + for (const plugin of plugins) { + const accountIds = plugin.config.listAccountIds(config); + if (accountIds.length === 0) { + console.log(`[Channels] Skipping ${plugin.id} (not configured)`); + continue; + } + + for (const accountId of accountIds) { + const account = plugin.config.resolveAccount(config, accountId); + if (!account || !plugin.config.isConfigured(account)) { + console.log(`[Channels] Skipping ${plugin.id}:${accountId} (incomplete config)`); + continue; + } + await this.startAccount(plugin.id, accountId, account); + } + } + } + + /** Start a specific channel account */ + private async startAccount( + channelId: string, + accountId: string, + accountConfig: Record, + ): Promise { + const key = `${channelId}:${accountId}`; + if (this.accounts.has(key)) { + console.warn(`[Channels] ${key} is already running`); + return; + } + + const plugin = listChannels().find((p) => p.id === channelId); + if (!plugin) { + console.error(`[Channels] Plugin "${channelId}" not found`); + return; + } + + const abortController = new AbortController(); + const handle: AccountHandle = { + channelId, + accountId, + abortController, + state: { channelId, accountId, status: "starting" }, + }; + this.accounts.set(key, handle); + + console.log(`[Channels] Starting ${key}`); + + try { + // Start gateway — this begins receiving messages + // The promise may resolve immediately (polling started) or stay pending (long-connection) + const startPromise = plugin.gateway.start( + accountId, + accountConfig, + (message: ChannelMessage) => { + this.routeIncoming(plugin, accountId, message); + }, + abortController.signal, + ); + + // Don't await forever — the start() might be long-running (e.g. polling loop) + // Give it a moment to fail fast if credentials are wrong + await Promise.race([ + startPromise, + new Promise((resolve) => setTimeout(resolve, 3000)), + ]); + + handle.state = { channelId, accountId, status: "running" }; + console.log(`[Channels] ${key} is running`); + } catch (err) { + const errorMsg = err instanceof Error ? err.message : String(err); + handle.state = { channelId, accountId, status: "error", error: errorMsg }; + console.error(`[Channels] Failed to start ${key}: ${errorMsg}`); + } + } + + /** Stop all running channel accounts */ + stopAll(): void { + console.log("[Channels] Stopping all channels..."); + for (const [key, handle] of this.accounts) { + handle.abortController.abort(); + handle.state = { ...handle.state, status: "stopped" }; + console.log(`[Channels] Stopped ${key}`); + } + this.accounts.clear(); + this.conversationAgents.clear(); + } + + /** Get or create an Agent for a specific conversation */ + private getOrCreateAgent(channelId: string, conversationId: string): AsyncAgent { + const key = `${channelId}:${conversationId}`; + const existing = this.conversationAgents.get(key); + if (existing && !existing.closed) { + return existing; + } + + const agent = this.hub.createAgent(); + this.conversationAgents.set(key, agent); + return agent; + } + + /** + * Route an incoming message to the appropriate Agent and wire the response + * back to the channel via MessageAggregator. + * + * This is the core bridge logic — generalized for any channel. + */ + private routeIncoming( + plugin: ChannelPlugin, + accountId: string, + message: ChannelMessage, + ): void { + const { conversationId, senderId, text, messageId } = message; + console.log( + `[Channels] Incoming message: channel=${plugin.id} conv=${conversationId} sender=${senderId} text="${text.slice(0, 50)}${text.length > 50 ? "..." : ""}"`, + ); + + // Find or create Agent for this conversation + const agent = this.getOrCreateAgent(plugin.id, conversationId); + const isNew = !this.conversationAgents.has(`${plugin.id}:${conversationId}`) ? "new" : "existing"; + console.log(`[Channels] Routing to agent: key=${plugin.id}:${conversationId} agentId=${agent.sessionId} (${isNew})`); + + // Build delivery context for outbound replies + const deliveryCtx: DeliveryContext = { + channel: plugin.id, + accountId, + conversationId, + replyToMessageId: messageId, + }; + + // Use channel-specific chunker config or defaults + const chunkerConfig = plugin.chunkerConfig ?? DEFAULT_CHUNKER_CONFIG; + + // Create a fresh aggregator for this message's response + const aggregator = new MessageAggregator( + chunkerConfig, + async (block) => { + try { + console.log(`[Channels] Block ${block.index} ready (${block.text.length} chars${block.isFinal ? ", final" : ""}), sending reply`); + if (block.index === 0) { + await plugin.outbound.replyText(deliveryCtx, block.text); + } else { + await plugin.outbound.sendText(deliveryCtx, block.text); + } + if (block.isFinal) { + console.log(`[Channels] Response complete: channel=${plugin.id} conv=${conversationId} blocks=${block.index + 1}`); + } + } catch (err) { + console.error(`[Channels] Failed to send reply: ${err}`); + } + }, + (_event) => { + // Pass-through events (tool_execution, compaction, etc.) + // Could add typing indicators per-channel later + }, + ); + + // Subscribe to agent events BEFORE writing the message + console.log("[Channels] Agent subscribed, sending message to agent"); + const unsubscribe = agent.subscribe((event) => { + aggregator.handleEvent(event); + + // Unsubscribe after the response is complete + if (event.type === "message_end") { + const maybeMessage = (event as { message?: { role?: string } }).message; + if (maybeMessage?.role === "assistant") { + unsubscribe(); + } + } + }); + + // Send user message to the agent + agent.write(text); + } + + /** Get status of all accounts */ + listAccountStates(): ChannelAccountState[] { + return Array.from(this.accounts.values()).map((h) => ({ ...h.state })); + } +} diff --git a/src/channels/plugins/telegram.ts b/src/channels/plugins/telegram.ts new file mode 100644 index 00000000..dde00244 --- /dev/null +++ b/src/channels/plugins/telegram.ts @@ -0,0 +1,137 @@ +/** + * Telegram channel plugin. + * + * Uses grammy to connect to Telegram Bot API via long polling. + * - Private chats: all messages are processed + * - Group chats: only messages that @mention the bot or reply to the bot + */ + +import { Bot } from "grammy"; +import type { ChannelPlugin, ChannelMessage, ChannelConfigAdapter, ChannelsConfig, DeliveryContext } from "../types.js"; + +/** Telegram account config shape */ +interface TelegramAccountConfig { + botToken: string; +} + +/** Keep bot instances per account for outbound use */ +const bots = new Map(); + +export const telegramChannel: ChannelPlugin = { + id: "telegram", + meta: { + name: "Telegram", + description: "Telegram bot integration via long polling", + }, + + config: { + listAccountIds(config: ChannelsConfig): string[] { + const section = config["telegram"]; + return section ? Object.keys(section) : []; + }, + + resolveAccount(config: ChannelsConfig, accountId: string): Record | undefined { + return config["telegram"]?.[accountId]; + }, + + isConfigured(account: Record): boolean { + return Boolean((account as unknown as TelegramAccountConfig).botToken); + }, + } satisfies ChannelConfigAdapter, + + gateway: { + async start( + accountId: string, + config: Record, + onMessage: (message: ChannelMessage) => void, + signal: AbortSignal, + ): Promise { + const { botToken } = config as unknown as TelegramAccountConfig; + + const bot = new Bot(botToken); + bots.set(accountId, bot); + + // Get bot info for mention detection + const botInfo = await bot.api.getMe(); + const botUsername = botInfo.username; + console.log(`[Telegram] Starting bot: @${botUsername}`); + + // Handle text messages + bot.on("message:text", (ctx) => { + const msg = ctx.message; + const isGroup = msg.chat.type === "group" || msg.chat.type === "supergroup"; + + // In groups, only respond if bot is mentioned or replied to + if (isGroup) { + const isMentioned = msg.entities?.some( + (e) => + e.type === "mention" && + msg.text.substring(e.offset, e.offset + e.length).toLowerCase() === `@${botUsername?.toLowerCase()}`, + ); + const isReplyToBot = msg.reply_to_message?.from?.is_bot === true; + + if (!isMentioned && !isReplyToBot) { + return; // Ignore group messages not directed at bot + } + console.log(`[Telegram] Received message: chatId=${msg.chat.id} from=${msg.from?.id} type=group text="${msg.text.slice(0, 50)}"`); + } else { + console.log(`[Telegram] Received message: chatId=${msg.chat.id} from=${msg.from?.id} type=direct text="${msg.text.slice(0, 50)}"`); + } + + // Strip @mention from text for cleaner agent input + let text = msg.text; + if (botUsername) { + text = text.replace(new RegExp(`@${botUsername}\\s*`, "gi"), "").trim(); + } + if (!text) return; + + onMessage({ + messageId: String(msg.message_id), + conversationId: String(msg.chat.id), + senderId: String(msg.from?.id ?? "unknown"), + text, + chatType: isGroup ? "group" : "direct", + }); + }); + + // Graceful shutdown on abort + signal.addEventListener("abort", () => { + console.log("[Telegram] Bot stopped"); + bot.stop(); + bots.delete(accountId); + }); + + // Start long polling (this blocks until bot.stop() is called) + console.log("[Telegram] Bot is polling for messages"); + bot.start({ + onStart: () => { + // Already logged above + }, + }); + }, + }, + + outbound: { + async sendText(ctx: DeliveryContext, text: string): Promise { + const bot = bots.get(ctx.accountId); + if (!bot) throw new Error(`No Telegram bot for account ${ctx.accountId}`); + + console.log(`[Telegram] Sending message to chatId=${ctx.conversationId}`); + await bot.api.sendMessage(Number(ctx.conversationId), text); + }, + + async replyText(ctx: DeliveryContext, text: string): Promise { + const bot = bots.get(ctx.accountId); + if (!bot) throw new Error(`No Telegram bot for account ${ctx.accountId}`); + + if (ctx.replyToMessageId) { + console.log(`[Telegram] Sending reply to chatId=${ctx.conversationId} (replyTo=${ctx.replyToMessageId})`); + await bot.api.sendMessage(Number(ctx.conversationId), text, { + reply_to_message_id: Number(ctx.replyToMessageId), + }); + } else { + await telegramChannel.outbound.sendText(ctx, text); + } + }, + }, +};