From 5d63727a047bcce948aec253b1fcc17fb95d4de3 Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Fri, 6 Feb 2026 15:47:36 +0800 Subject: [PATCH 01/33] feat(channels): add channel plugin system with types, registry, and config Introduces the extensible channel plugin architecture for messaging platform integrations. - ChannelPlugin interface with config, gateway, and outbound adapters - Plugin registry with register/get/list operations - Config loader for ~/.super-multica/channels.json5 Co-Authored-By: Claude Opus 4.6 --- src/channels/config.ts | 30 ++++++++++ src/channels/registry.ts | 31 ++++++++++ src/channels/types.ts | 126 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 187 insertions(+) create mode 100644 src/channels/config.ts create mode 100644 src/channels/registry.ts create mode 100644 src/channels/types.ts diff --git a/src/channels/config.ts b/src/channels/config.ts new file mode 100644 index 00000000..0d897e68 --- /dev/null +++ b/src/channels/config.ts @@ -0,0 +1,30 @@ +/** + * Channel configuration loader. + * + * Reads ~/.super-multica/channels.json5 for channel credentials and settings. + */ + +import { existsSync, readFileSync } from "node:fs"; +import { join } from "node:path"; +import JSON5 from "json5"; +import { DATA_DIR } from "../shared/paths.js"; +import type { ChannelsConfig } from "./types.js"; + +export const CHANNELS_CONFIG_PATH = join(DATA_DIR, "channels.json5"); + +/** Load channels config from ~/.super-multica/channels.json5 */ +export function loadChannelsConfig(): ChannelsConfig { + if (!existsSync(CHANNELS_CONFIG_PATH)) { + console.log(`[Channels] No channels.json5 found, skipping`); + return {}; + } + try { + const raw = readFileSync(CHANNELS_CONFIG_PATH, "utf8"); + const config = JSON5.parse(raw) as ChannelsConfig; + console.log(`[Channels] Loaded config from ${CHANNELS_CONFIG_PATH}`); + return config; + } catch (err) { + console.warn(`[Channels] Failed to parse ${CHANNELS_CONFIG_PATH}: ${err}`); + return {}; + } +} diff --git a/src/channels/registry.ts b/src/channels/registry.ts new file mode 100644 index 00000000..1eb27efd --- /dev/null +++ b/src/channels/registry.ts @@ -0,0 +1,31 @@ +/** + * Channel plugin registry. + * + * Simple array + Map registry. Plugins are registered at startup + * via registerChannel() and looked up by ID. + */ + +import type { ChannelPlugin } from "./types.js"; + +const plugins: ChannelPlugin[] = []; +const pluginMap = new Map(); + +/** Register a channel plugin. Throws if ID is already registered. */ +export function registerChannel(plugin: ChannelPlugin): void { + if (pluginMap.has(plugin.id)) { + throw new Error(`Channel plugin "${plugin.id}" is already registered`); + } + plugins.push(plugin); + pluginMap.set(plugin.id, plugin); + console.log(`[Channels] Registered plugin: ${plugin.id}`); +} + +/** Get a registered channel plugin by ID */ +export function getChannel(id: string): ChannelPlugin | undefined { + return pluginMap.get(id); +} + +/** List all registered channel plugins */ +export function listChannels(): readonly ChannelPlugin[] { + return plugins; +} diff --git a/src/channels/types.ts b/src/channels/types.ts new file mode 100644 index 00000000..32f165fb --- /dev/null +++ b/src/channels/types.ts @@ -0,0 +1,126 @@ +/** + * Channel plugin system types. + * + * Each messaging platform (Telegram, Discord, Feishu, etc.) implements the + * ChannelPlugin interface with three adapters: config, gateway, outbound. + */ + +import type { BlockChunkerConfig } from "../hub/block-chunker.js"; + +// ─── Normalized Incoming Message ─── + +/** Platform-agnostic incoming message */ +export interface ChannelMessage { + /** Unique message ID from the platform */ + messageId: string; + /** Conversation ID (group ID or DM chat ID) */ + conversationId: string; + /** Sender identifier on the platform */ + senderId: string; + /** Plain text content */ + text: string; + /** Chat type: "direct" (1:1) or "group" */ + chatType: "direct" | "group"; +} + +// ─── Delivery Context ─── + +/** Context for sending a reply back to a specific conversation */ +export interface DeliveryContext { + /** Channel plugin ID (e.g. "telegram", "discord") */ + channel: string; + /** Account identifier (supports multi-account per channel) */ + accountId: string; + /** Target conversation ID */ + conversationId: string; + /** Original message ID (for reply-style responses) */ + replyToMessageId?: string | undefined; +} + +// ─── Config Adapter ─── + +/** Resolves and validates channel credentials from the config file */ +export interface ChannelConfigAdapter> { + /** List all configured account IDs for this channel */ + listAccountIds(config: ChannelsConfig): string[]; + /** Resolve a specific account's config */ + resolveAccount(config: ChannelsConfig, accountId: string): TAccount | undefined; + /** Check if a given account config has all required credentials */ + isConfigured(account: TAccount): boolean; +} + +// ─── Gateway Adapter ─── + +/** Manages the lifecycle of a channel account connection (receiving messages) */ +export interface ChannelGatewayAdapter { + /** + * Start receiving messages for an account. + * Must respect the AbortSignal for graceful shutdown. + */ + start( + accountId: string, + config: Record, + onMessage: (message: ChannelMessage) => void, + signal: AbortSignal, + ): Promise; +} + +// ─── Outbound Adapter ─── + +/** Sends messages back to the platform */ +export interface ChannelOutboundAdapter { + /** Send a text message to a conversation */ + sendText(ctx: DeliveryContext, text: string): Promise; + /** Reply to a specific message */ + replyText(ctx: DeliveryContext, text: string): Promise; +} + +// ─── Channel Plugin ─── + +/** The main plugin interface. Each channel implements this. */ +export interface ChannelPlugin { + /** Unique channel identifier (e.g. "telegram", "discord", "feishu") */ + readonly id: string; + /** Display metadata */ + readonly meta: { + name: string; + description: string; + }; + /** Optional chunker config override per channel */ + readonly chunkerConfig?: BlockChunkerConfig | undefined; + /** Config resolution adapter */ + readonly config: ChannelConfigAdapter; + /** Connection lifecycle adapter (receive messages) */ + readonly gateway: ChannelGatewayAdapter; + /** Message sending adapter */ + readonly outbound: ChannelOutboundAdapter; +} + +// ─── Channels Config File Shape ─── + +/** + * Shape of ~/.super-multica/channels.json5 + * + * Each top-level key is a channel ID. Under it, each key is an account ID. + * Example: + * { + * telegram: { default: { botToken: "xxx" } }, + * discord: { default: { botToken: "xxx" } }, + * } + */ +export interface ChannelsConfig { + [channelId: string]: { + [accountId: string]: Record; + } | undefined; +} + +// ─── Account State ─── + +export type ChannelAccountStatus = "stopped" | "starting" | "running" | "error"; + +export interface ChannelAccountState { + channelId: string; + accountId: string; + status: ChannelAccountStatus; + error?: string | undefined; +} From 971d68b605e7b5ebecb9bfcd3bfb80fcccf1a25a Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Fri, 6 Feb 2026 15:47:41 +0800 Subject: [PATCH 02/33] feat(channels): add ChannelManager and Telegram plugin ChannelManager orchestrates channel lifecycles and routes messages to per-conversation Agents. Telegram plugin uses grammy for long polling with group @mention detection. Co-Authored-By: Claude Opus 4.6 --- src/channels/index.ts | 25 ++++ src/channels/manager.ts | 233 +++++++++++++++++++++++++++++++ src/channels/plugins/telegram.ts | 137 ++++++++++++++++++ 3 files changed, 395 insertions(+) create mode 100644 src/channels/index.ts create mode 100644 src/channels/manager.ts create mode 100644 src/channels/plugins/telegram.ts diff --git a/src/channels/index.ts b/src/channels/index.ts new file mode 100644 index 00000000..3bbdd20f --- /dev/null +++ b/src/channels/index.ts @@ -0,0 +1,25 @@ +/** + * Channel system bootstrap and exports. + */ + +export { ChannelManager } from "./manager.js"; +export { registerChannel, getChannel, listChannels } from "./registry.js"; +export { loadChannelsConfig, CHANNELS_CONFIG_PATH } from "./config.js"; +export type { + ChannelPlugin, + ChannelMessage, + DeliveryContext, + ChannelAccountState, + ChannelsConfig, +} from "./types.js"; + +// Built-in channel plugins +import { registerChannel } from "./registry.js"; +import { telegramChannel } from "./plugins/telegram.js"; + +/** Register all built-in channel plugins. Call once at startup. */ +export function initChannels(): void { + registerChannel(telegramChannel); + // Future: registerChannel(discordChannel); + // Future: registerChannel(feishuChannel); +} diff --git a/src/channels/manager.ts b/src/channels/manager.ts new file mode 100644 index 00000000..82f440cf --- /dev/null +++ b/src/channels/manager.ts @@ -0,0 +1,233 @@ +/** + * Channel Manager — orchestrates channel plugin lifecycles and message routing. + * + * For each configured channel account: + * 1. Starts the gateway adapter (receive messages) + * 2. Routes incoming messages to per-conversation Agents + * 3. Collects Agent responses via MessageAggregator + * 4. Sends responses back via the outbound adapter + * + * Channel is just a messenger — it doesn't manage context or history. + * That's the Agent's job. + */ + +import type { Hub } from "../hub/hub.js"; +import type { + ChannelPlugin, + ChannelMessage, + ChannelAccountState, + DeliveryContext, +} from "./types.js"; +import { listChannels } from "./registry.js"; +import { loadChannelsConfig } from "./config.js"; +import { MessageAggregator, DEFAULT_CHUNKER_CONFIG } from "../hub/message-aggregator.js"; +import type { AsyncAgent } from "../agent/async-agent.js"; + +interface AccountHandle { + channelId: string; + accountId: string; + abortController: AbortController; + state: ChannelAccountState; +} + +export class ChannelManager { + private readonly hub: Hub; + /** Running accounts keyed by "channelId:accountId" */ + private readonly accounts = new Map(); + /** Agents keyed by "channelId:conversationId" for per-conversation isolation */ + private readonly conversationAgents = new Map(); + + constructor(hub: Hub) { + this.hub = hub; + } + + /** Start all configured channel accounts */ + async startAll(): Promise { + console.log("[Channels] Starting all channels..."); + const config = loadChannelsConfig(); + const plugins = listChannels(); + + if (plugins.length === 0) { + console.log("[Channels] No plugins registered"); + return; + } + + for (const plugin of plugins) { + const accountIds = plugin.config.listAccountIds(config); + if (accountIds.length === 0) { + console.log(`[Channels] Skipping ${plugin.id} (not configured)`); + continue; + } + + for (const accountId of accountIds) { + const account = plugin.config.resolveAccount(config, accountId); + if (!account || !plugin.config.isConfigured(account)) { + console.log(`[Channels] Skipping ${plugin.id}:${accountId} (incomplete config)`); + continue; + } + await this.startAccount(plugin.id, accountId, account); + } + } + } + + /** Start a specific channel account */ + private async startAccount( + channelId: string, + accountId: string, + accountConfig: Record, + ): Promise { + const key = `${channelId}:${accountId}`; + if (this.accounts.has(key)) { + console.warn(`[Channels] ${key} is already running`); + return; + } + + const plugin = listChannels().find((p) => p.id === channelId); + if (!plugin) { + console.error(`[Channels] Plugin "${channelId}" not found`); + return; + } + + const abortController = new AbortController(); + const handle: AccountHandle = { + channelId, + accountId, + abortController, + state: { channelId, accountId, status: "starting" }, + }; + this.accounts.set(key, handle); + + console.log(`[Channels] Starting ${key}`); + + try { + // Start gateway — this begins receiving messages + // The promise may resolve immediately (polling started) or stay pending (long-connection) + const startPromise = plugin.gateway.start( + accountId, + accountConfig, + (message: ChannelMessage) => { + this.routeIncoming(plugin, accountId, message); + }, + abortController.signal, + ); + + // Don't await forever — the start() might be long-running (e.g. polling loop) + // Give it a moment to fail fast if credentials are wrong + await Promise.race([ + startPromise, + new Promise((resolve) => setTimeout(resolve, 3000)), + ]); + + handle.state = { channelId, accountId, status: "running" }; + console.log(`[Channels] ${key} is running`); + } catch (err) { + const errorMsg = err instanceof Error ? err.message : String(err); + handle.state = { channelId, accountId, status: "error", error: errorMsg }; + console.error(`[Channels] Failed to start ${key}: ${errorMsg}`); + } + } + + /** Stop all running channel accounts */ + stopAll(): void { + console.log("[Channels] Stopping all channels..."); + for (const [key, handle] of this.accounts) { + handle.abortController.abort(); + handle.state = { ...handle.state, status: "stopped" }; + console.log(`[Channels] Stopped ${key}`); + } + this.accounts.clear(); + this.conversationAgents.clear(); + } + + /** Get or create an Agent for a specific conversation */ + private getOrCreateAgent(channelId: string, conversationId: string): AsyncAgent { + const key = `${channelId}:${conversationId}`; + const existing = this.conversationAgents.get(key); + if (existing && !existing.closed) { + return existing; + } + + const agent = this.hub.createAgent(); + this.conversationAgents.set(key, agent); + return agent; + } + + /** + * Route an incoming message to the appropriate Agent and wire the response + * back to the channel via MessageAggregator. + * + * This is the core bridge logic — generalized for any channel. + */ + private routeIncoming( + plugin: ChannelPlugin, + accountId: string, + message: ChannelMessage, + ): void { + const { conversationId, senderId, text, messageId } = message; + console.log( + `[Channels] Incoming message: channel=${plugin.id} conv=${conversationId} sender=${senderId} text="${text.slice(0, 50)}${text.length > 50 ? "..." : ""}"`, + ); + + // Find or create Agent for this conversation + const agent = this.getOrCreateAgent(plugin.id, conversationId); + const isNew = !this.conversationAgents.has(`${plugin.id}:${conversationId}`) ? "new" : "existing"; + console.log(`[Channels] Routing to agent: key=${plugin.id}:${conversationId} agentId=${agent.sessionId} (${isNew})`); + + // Build delivery context for outbound replies + const deliveryCtx: DeliveryContext = { + channel: plugin.id, + accountId, + conversationId, + replyToMessageId: messageId, + }; + + // Use channel-specific chunker config or defaults + const chunkerConfig = plugin.chunkerConfig ?? DEFAULT_CHUNKER_CONFIG; + + // Create a fresh aggregator for this message's response + const aggregator = new MessageAggregator( + chunkerConfig, + async (block) => { + try { + console.log(`[Channels] Block ${block.index} ready (${block.text.length} chars${block.isFinal ? ", final" : ""}), sending reply`); + if (block.index === 0) { + await plugin.outbound.replyText(deliveryCtx, block.text); + } else { + await plugin.outbound.sendText(deliveryCtx, block.text); + } + if (block.isFinal) { + console.log(`[Channels] Response complete: channel=${plugin.id} conv=${conversationId} blocks=${block.index + 1}`); + } + } catch (err) { + console.error(`[Channels] Failed to send reply: ${err}`); + } + }, + (_event) => { + // Pass-through events (tool_execution, compaction, etc.) + // Could add typing indicators per-channel later + }, + ); + + // Subscribe to agent events BEFORE writing the message + console.log("[Channels] Agent subscribed, sending message to agent"); + const unsubscribe = agent.subscribe((event) => { + aggregator.handleEvent(event); + + // Unsubscribe after the response is complete + if (event.type === "message_end") { + const maybeMessage = (event as { message?: { role?: string } }).message; + if (maybeMessage?.role === "assistant") { + unsubscribe(); + } + } + }); + + // Send user message to the agent + agent.write(text); + } + + /** Get status of all accounts */ + listAccountStates(): ChannelAccountState[] { + return Array.from(this.accounts.values()).map((h) => ({ ...h.state })); + } +} diff --git a/src/channels/plugins/telegram.ts b/src/channels/plugins/telegram.ts new file mode 100644 index 00000000..dde00244 --- /dev/null +++ b/src/channels/plugins/telegram.ts @@ -0,0 +1,137 @@ +/** + * Telegram channel plugin. + * + * Uses grammy to connect to Telegram Bot API via long polling. + * - Private chats: all messages are processed + * - Group chats: only messages that @mention the bot or reply to the bot + */ + +import { Bot } from "grammy"; +import type { ChannelPlugin, ChannelMessage, ChannelConfigAdapter, ChannelsConfig, DeliveryContext } from "../types.js"; + +/** Telegram account config shape */ +interface TelegramAccountConfig { + botToken: string; +} + +/** Keep bot instances per account for outbound use */ +const bots = new Map(); + +export const telegramChannel: ChannelPlugin = { + id: "telegram", + meta: { + name: "Telegram", + description: "Telegram bot integration via long polling", + }, + + config: { + listAccountIds(config: ChannelsConfig): string[] { + const section = config["telegram"]; + return section ? Object.keys(section) : []; + }, + + resolveAccount(config: ChannelsConfig, accountId: string): Record | undefined { + return config["telegram"]?.[accountId]; + }, + + isConfigured(account: Record): boolean { + return Boolean((account as unknown as TelegramAccountConfig).botToken); + }, + } satisfies ChannelConfigAdapter, + + gateway: { + async start( + accountId: string, + config: Record, + onMessage: (message: ChannelMessage) => void, + signal: AbortSignal, + ): Promise { + const { botToken } = config as unknown as TelegramAccountConfig; + + const bot = new Bot(botToken); + bots.set(accountId, bot); + + // Get bot info for mention detection + const botInfo = await bot.api.getMe(); + const botUsername = botInfo.username; + console.log(`[Telegram] Starting bot: @${botUsername}`); + + // Handle text messages + bot.on("message:text", (ctx) => { + const msg = ctx.message; + const isGroup = msg.chat.type === "group" || msg.chat.type === "supergroup"; + + // In groups, only respond if bot is mentioned or replied to + if (isGroup) { + const isMentioned = msg.entities?.some( + (e) => + e.type === "mention" && + msg.text.substring(e.offset, e.offset + e.length).toLowerCase() === `@${botUsername?.toLowerCase()}`, + ); + const isReplyToBot = msg.reply_to_message?.from?.is_bot === true; + + if (!isMentioned && !isReplyToBot) { + return; // Ignore group messages not directed at bot + } + console.log(`[Telegram] Received message: chatId=${msg.chat.id} from=${msg.from?.id} type=group text="${msg.text.slice(0, 50)}"`); + } else { + console.log(`[Telegram] Received message: chatId=${msg.chat.id} from=${msg.from?.id} type=direct text="${msg.text.slice(0, 50)}"`); + } + + // Strip @mention from text for cleaner agent input + let text = msg.text; + if (botUsername) { + text = text.replace(new RegExp(`@${botUsername}\\s*`, "gi"), "").trim(); + } + if (!text) return; + + onMessage({ + messageId: String(msg.message_id), + conversationId: String(msg.chat.id), + senderId: String(msg.from?.id ?? "unknown"), + text, + chatType: isGroup ? "group" : "direct", + }); + }); + + // Graceful shutdown on abort + signal.addEventListener("abort", () => { + console.log("[Telegram] Bot stopped"); + bot.stop(); + bots.delete(accountId); + }); + + // Start long polling (this blocks until bot.stop() is called) + console.log("[Telegram] Bot is polling for messages"); + bot.start({ + onStart: () => { + // Already logged above + }, + }); + }, + }, + + outbound: { + async sendText(ctx: DeliveryContext, text: string): Promise { + const bot = bots.get(ctx.accountId); + if (!bot) throw new Error(`No Telegram bot for account ${ctx.accountId}`); + + console.log(`[Telegram] Sending message to chatId=${ctx.conversationId}`); + await bot.api.sendMessage(Number(ctx.conversationId), text); + }, + + async replyText(ctx: DeliveryContext, text: string): Promise { + const bot = bots.get(ctx.accountId); + if (!bot) throw new Error(`No Telegram bot for account ${ctx.accountId}`); + + if (ctx.replyToMessageId) { + console.log(`[Telegram] Sending reply to chatId=${ctx.conversationId} (replyTo=${ctx.replyToMessageId})`); + await bot.api.sendMessage(Number(ctx.conversationId), text, { + reply_to_message_id: Number(ctx.replyToMessageId), + }); + } else { + await telegramChannel.outbound.sendText(ctx, text); + } + }, + }, +}; From 9922355d0c5555a62fca91f86239901e6c88261e Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Fri, 6 Feb 2026 15:47:47 +0800 Subject: [PATCH 03/33] feat(hub): integrate ChannelManager into Hub lifecycle Wire up channel system in Hub constructor and shutdown. Add grammy dependency for Telegram bot support. Co-Authored-By: Claude Opus 4.6 --- package.json | 6 +++++- pnpm-lock.yaml | 26 ++++++++++++++++++++++++-- src/hub/hub.ts | 13 +++++++++++++ 3 files changed, 42 insertions(+), 3 deletions(-) diff --git a/package.json b/package.json index 38c7ec1a..e5d4d863 100644 --- a/package.json +++ b/package.json @@ -30,7 +30,10 @@ "license": "ISC", "packageManager": "pnpm@10.28.2", "pnpm": { - "onlyBuiltDependencies": ["electron", "esbuild"] + "onlyBuiltDependencies": [ + "electron", + "esbuild" + ] }, "devDependencies": { "@types/node": "catalog:", @@ -58,6 +61,7 @@ "@nestjs/websockets": "^11.1.12", "@sinclair/typebox": "^0.34.41", "fast-glob": "^3.3.3", + "grammy": "^1.39.3", "json5": "^2.2.3", "linkedom": "^0.18.12", "nestjs-pino": "^4.5.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 4e60a83f..a63c92f0 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -71,6 +71,9 @@ importers: fast-glob: specifier: ^3.3.3 version: 3.3.3 + grammy: + specifier: ^1.39.3 + version: 1.39.3 json5: specifier: ^2.2.3 version: 2.2.3 @@ -1829,6 +1832,9 @@ packages: '@modelcontextprotocol/sdk': optional: true + '@grammyjs/types@3.23.0': + resolution: {integrity: sha512-D3jQ4UWERPsyR3op/YFudMMIPNTU47vy7L51uO9/73tMELmjO/+LX5N36/Y0CG5IQfIsz43MxiHI5rgsK0/k+g==} + '@hono/node-server@1.19.9': resolution: {integrity: sha512-vHL6w3ecZsky+8P5MD+eFfaGTyCeOHUIFYMGpQGbrBTSmNNoxv0if69rEZ5giu36weC5saFuznL411gRX7bJDw==} engines: {node: '>=18.14.1'} @@ -5741,6 +5747,10 @@ packages: graceful-fs@4.2.11: resolution: {integrity: sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ==} + grammy@1.39.3: + resolution: {integrity: sha512-7arRRoOtOh9UwMwANZ475kJrWV6P3/EGNooeHlY0/SwZv4t3ZZ3Uiz9cAXK8Zg9xSdgmm8T21kx6n7SZaWvOcw==} + engines: {node: ^12.20.0 || >=14.13.1} + graphemer@1.4.0: resolution: {integrity: sha512-EtKwoO6kxCL9WO5xipiHTZlSzBm7WLT627TqC/uVRd0HKmq8NXyebnNYxDoBi7wt8eTWrUrKXCOVaFq9x1kgag==} @@ -11181,6 +11191,8 @@ snapshots: - supports-color - utf-8-validate + '@grammyjs/types@3.23.0': {} + '@hono/node-server@1.19.9(hono@4.11.7)': dependencies: hono: 4.11.7 @@ -14935,7 +14947,7 @@ snapshots: transitivePeerDependencies: - supports-color - eslint-module-utils@2.12.1(@typescript-eslint/parser@8.54.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1)): + eslint-module-utils@2.12.1(@typescript-eslint/parser@8.54.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0)(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)): dependencies: debug: 3.2.7 optionalDependencies: @@ -14966,7 +14978,7 @@ snapshots: doctrine: 2.1.0 eslint: 9.39.2(jiti@2.6.1) eslint-import-resolver-node: 0.3.9 - eslint-module-utils: 2.12.1(@typescript-eslint/parser@8.54.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1)) + eslint-module-utils: 2.12.1(@typescript-eslint/parser@8.54.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0)(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)) hasown: 2.0.2 is-core-module: 2.16.1 is-glob: 4.0.3 @@ -15881,6 +15893,16 @@ snapshots: graceful-fs@4.2.11: {} + grammy@1.39.3: + dependencies: + '@grammyjs/types': 3.23.0 + abort-controller: 3.0.0 + debug: 4.4.3 + node-fetch: 2.7.0 + transitivePeerDependencies: + - encoding + - supports-color + graphemer@1.4.0: {} graphql@16.12.0: {} diff --git a/src/hub/hub.ts b/src/hub/hub.ts index 8ba68d64..2bee8078 100644 --- a/src/hub/hub.ts +++ b/src/hub/hub.ts @@ -30,6 +30,7 @@ import { evaluateCommandSafety, requiresApproval } from "../agent/tools/exec-saf import { addAllowlistEntry, recordAllowlistUse, matchAllowlist } from "../agent/tools/exec-allowlist.js"; import type { ExecApprovalCallback, ExecApprovalConfig, ApprovalResult, ExecApprovalRequest } from "../agent/tools/exec-approval-types.js"; import { readProfileConfig, writeProfileConfig } from "../agent/profile/storage.js"; +import { ChannelManager, initChannels } from "../channels/index.js"; export class Hub { private readonly agents = new Map(); @@ -43,6 +44,7 @@ export class Hub { readonly deviceStore: DeviceStore; private _onConfirmDevice: ((deviceId: string, agentId: string, meta?: DeviceMeta) => Promise) | null = null; private _stateChangeListeners: ((state: ConnectionState) => void)[] = []; + readonly channelManager: ChannelManager; url: string; readonly path: string; readonly hubId: string; @@ -103,6 +105,14 @@ export class Hub { this.client = this.createClient(this.url); this.client.connect(); this.restoreAgents(); + + // Initialize channel plugin system + console.log("[Hub] Initializing channel system..."); + initChannels(); + this.channelManager = new ChannelManager(this); + void this.channelManager.startAll().then(() => { + console.log("[Hub] Channel system started"); + }); } /** Restore agents from persistent storage */ @@ -508,6 +518,9 @@ export class Hub { } shutdown(): void { + // Stop all channel connections + this.channelManager.stopAll(); + // Finalize subagent registry before closing agents shutdownSubagentRegistry(); From 24df8b02e7ec2616a2b53a15ed8d3441dc10ab3a Mon Sep 17 00:00:00 2001 From: Jiang Bohan Date: Fri, 6 Feb 2026 15:48:29 +0800 Subject: [PATCH 04/33] fix(agent): reload credentials dynamically --- src/agent/credentials.ts | 44 +++++++++++++++++--- src/agent/providers/registry.ts | 2 +- src/agent/runner.ts | 71 ++++++++++++++++++++++++++++++--- 3 files changed, 106 insertions(+), 11 deletions(-) diff --git a/src/agent/credentials.ts b/src/agent/credentials.ts index 223798e6..5e553f73 100644 --- a/src/agent/credentials.ts +++ b/src/agent/credentials.ts @@ -1,4 +1,4 @@ -import { existsSync, readFileSync, writeFileSync, mkdirSync } from "node:fs"; +import { existsSync, readFileSync, writeFileSync, mkdirSync, statSync } from "node:fs"; import { join, dirname } from "node:path"; import { homedir } from "node:os"; import JSON5 from "json5"; @@ -90,6 +90,8 @@ export class CredentialManager { private coreConfig: CredentialsConfig | null = null; private skillsConfig: SkillsEnvConfig | null = null; private resolvedSkillsEnv: Record | null = null; + private coreMtimeMs: number | null = null; + private skillsMtimeMs: number | null = null; private isDisabled(): boolean { if (process.env.SMC_CREDENTIALS_DISABLE === "1") return true; @@ -99,17 +101,32 @@ export class CredentialManager { private loadCore(): void { const path = getCredentialsPath(); const disabled = this.isDisabled(); + let mtimeMs: number | null = null; - if (this.corePath === path && this.disabledState === disabled && this.coreConfig) { + if (!disabled && existsSync(path)) { + try { + mtimeMs = statSync(path).mtimeMs; + } catch { + mtimeMs = null; + } + } + + if ( + this.corePath === path + && this.disabledState === disabled + && this.coreConfig + && this.coreMtimeMs === mtimeMs + ) { return; } this.corePath = path; this.disabledState = disabled; this.coreConfig = null; + this.coreMtimeMs = mtimeMs; if (disabled) return; - if (!existsSync(path)) return; + if (mtimeMs === null) return; const raw = readFileSync(path, "utf8"); try { @@ -123,8 +140,22 @@ export class CredentialManager { private loadSkillsEnv(): void { const path = getSkillsEnvPath(); const disabled = this.isDisabled(); + let mtimeMs: number | null = null; - if (this.skillsPath === path && this.disabledState === disabled && this.resolvedSkillsEnv) { + if (!disabled && existsSync(path)) { + try { + mtimeMs = statSync(path).mtimeMs; + } catch { + mtimeMs = null; + } + } + + if ( + this.skillsPath === path + && this.disabledState === disabled + && this.resolvedSkillsEnv + && this.skillsMtimeMs === mtimeMs + ) { return; } @@ -132,9 +163,10 @@ export class CredentialManager { this.disabledState = disabled; this.skillsConfig = null; this.resolvedSkillsEnv = null; + this.skillsMtimeMs = mtimeMs; if (disabled) return; - if (!existsSync(path)) return; + if (mtimeMs === null) return; const raw = readFileSync(path, "utf8"); try { @@ -228,6 +260,8 @@ export class CredentialManager { this.coreConfig = null; this.skillsConfig = null; this.resolvedSkillsEnv = null; + this.coreMtimeMs = null; + this.skillsMtimeMs = null; } /** diff --git a/src/agent/providers/registry.ts b/src/agent/providers/registry.ts index cd406482..2ea1f93a 100644 --- a/src/agent/providers/registry.ts +++ b/src/agent/providers/registry.ts @@ -264,7 +264,7 @@ export function getLoginInstructions(providerId: string): string { if (info.authMethod === "oauth") { if (info.loginCommand) { - return `Run: ${info.loginCommand}\nThen restart Super Multica to use the credentials.`; + return `Run: ${info.loginCommand}\nThen retry in Super Multica to use the credentials.`; } } diff --git a/src/agent/runner.ts b/src/agent/runner.ts index 65fd528b..9b732e91 100644 --- a/src/agent/runner.ts +++ b/src/agent/runner.ts @@ -95,6 +95,7 @@ export class Agent { private profileCandidates: string[]; private profileIndex: number; private readonly pinnedProfile: boolean; + private readonly explicitApiKey: boolean; /** Current session ID */ readonly sessionId: string; @@ -126,6 +127,7 @@ export class Agent { // === Auth profile resolution === this.pinnedProfile = !!(options.authProfileId || options.apiKey); + this.explicitApiKey = !!options.apiKey; if (options.apiKey) { // Explicit API key — no rotation @@ -159,11 +161,9 @@ export class Agent { : 0; } - this.agent = new PiAgentCore( - this.currentApiKey - ? { getApiKey: (_provider: string) => this.currentApiKey! } - : {}, - ); + this.agent = new PiAgentCore({ + getApiKey: (_provider: string) => this.currentApiKey ?? "", + }); // Load Agent Profile (if profileId is specified) // Every Agent should have a Profile for memory, tools config, and other settings @@ -354,6 +354,10 @@ export class Agent { async run(prompt: string): Promise { await this.ensureInitialized(); + this.refreshAuthState(); + if (!this.currentApiKey) { + throw new Error(`No API key configured for provider: ${this.resolvedProvider}`); + } this.output.state.lastAssistantText = ""; const canRotate = !this.pinnedProfile && this.profileCandidates.length > 1; @@ -433,12 +437,67 @@ export class Agent { this.currentApiKey = apiKey; this.currentProfileId = candidateId; this.profileIndex = nextIndex; + this.updateSessionApiKey(); return true; } return false; } + private refreshAuthState(): void { + if (this.explicitApiKey) { + return; + } + + const store = loadAuthProfileStore(); + + if (this.pinnedProfile) { + const profileId = this.currentProfileId ?? this.resolvedProvider; + this.currentApiKey = resolveApiKeyForProfile(profileId) ?? resolveApiKey(this.resolvedProvider); + this.currentProfileId = profileId; + this.profileCandidates = []; + this.profileIndex = 0; + this.updateSessionApiKey(); + return; + } + + const candidates = resolveAuthProfileOrder(this.resolvedProvider, store); + this.profileCandidates = candidates; + + if (this.currentProfileId) { + const currentIndex = candidates.indexOf(this.currentProfileId); + if (currentIndex >= 0) { + const stats = store.usageStats?.[this.currentProfileId]; + if (!stats || !isProfileInCooldown(stats)) { + const apiKey = resolveApiKeyForProfile(this.currentProfileId); + if (apiKey) { + this.currentApiKey = apiKey; + this.profileIndex = currentIndex; + this.updateSessionApiKey(); + return; + } + } + } + } + + const resolved = resolveApiKeyForProvider(this.resolvedProvider); + if (resolved) { + this.currentApiKey = resolved.apiKey; + this.currentProfileId = resolved.profileId; + this.profileIndex = Math.max(0, candidates.indexOf(resolved.profileId)); + } else { + this.currentApiKey = undefined; + this.currentProfileId = undefined; + this.profileIndex = 0; + } + this.updateSessionApiKey(); + } + + private updateSessionApiKey(): void { + if (this.session.getCompactionMode() !== "summary") return; + this.session.setApiKey(this.currentApiKey); + } + private handleSessionEvent(event: AgentEvent) { if (event.type === "message_end") { const message = event.message as AgentMessage; @@ -680,6 +739,8 @@ export class Agent { throw new Error(`No API key configured for provider: ${providerId}`); } + this.updateSessionApiKey(); + // Update the agent's model and API key const baseUrl = resolveBaseUrl(actualProvider); const modelWithBaseUrl = baseUrl ? { ...model, baseUrl } : model; From 112ae6cac964653caf641562b9b048da9b933a16 Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Fri, 6 Feb 2026 16:00:44 +0800 Subject: [PATCH 05/33] refactor(channels): read config from credentials.json5 instead of separate file Move channel configuration into the existing credentials.json5 under a `channels` section, matching OpenClaw's single-config-file approach. Co-Authored-By: Claude Opus 4.6 --- src/agent/credentials.ts | 8 ++++++++ src/channels/config.ts | 28 +++++++++------------------- src/channels/index.ts | 2 +- 3 files changed, 18 insertions(+), 20 deletions(-) diff --git a/src/agent/credentials.ts b/src/agent/credentials.ts index 223798e6..5abeca35 100644 --- a/src/agent/credentials.ts +++ b/src/agent/credentials.ts @@ -31,6 +31,8 @@ export type CredentialsConfig = { order?: Record | undefined; } | undefined; tools?: Record | undefined; + /** Channel plugin configs (telegram, discord, etc.) */ + channels?: Record> | undefined> | undefined; }; type SkillsEnvConfig = { @@ -217,6 +219,12 @@ export class CredentialManager { ); } + /** Get channel plugin configs from credentials.json5 `channels` section. */ + getChannelsConfig(): Record> | undefined> { + this.loadCore(); + return this.coreConfig?.channels ?? {}; + } + getResolvedEnvSnapshot(): Record { return { ...this.getResolvedSkillsEnv() }; } diff --git a/src/channels/config.ts b/src/channels/config.ts index 0d897e68..94b49e1e 100644 --- a/src/channels/config.ts +++ b/src/channels/config.ts @@ -1,30 +1,20 @@ /** * Channel configuration loader. * - * Reads ~/.super-multica/channels.json5 for channel credentials and settings. + * Reads the `channels` section from ~/.super-multica/credentials.json5. */ -import { existsSync, readFileSync } from "node:fs"; -import { join } from "node:path"; -import JSON5 from "json5"; -import { DATA_DIR } from "../shared/paths.js"; +import { credentialManager } from "../agent/credentials.js"; import type { ChannelsConfig } from "./types.js"; -export const CHANNELS_CONFIG_PATH = join(DATA_DIR, "channels.json5"); - -/** Load channels config from ~/.super-multica/channels.json5 */ +/** Load channels config from credentials.json5 `channels` section */ export function loadChannelsConfig(): ChannelsConfig { - if (!existsSync(CHANNELS_CONFIG_PATH)) { - console.log(`[Channels] No channels.json5 found, skipping`); - return {}; - } - try { - const raw = readFileSync(CHANNELS_CONFIG_PATH, "utf8"); - const config = JSON5.parse(raw) as ChannelsConfig; - console.log(`[Channels] Loaded config from ${CHANNELS_CONFIG_PATH}`); - return config; - } catch (err) { - console.warn(`[Channels] Failed to parse ${CHANNELS_CONFIG_PATH}: ${err}`); + const channels = credentialManager.getChannelsConfig(); + const keys = Object.keys(channels); + if (keys.length === 0) { + console.log("[Channels] No channels configured in credentials.json5, skipping"); return {}; } + console.log(`[Channels] Loaded config for: ${keys.join(", ")}`); + return channels; } diff --git a/src/channels/index.ts b/src/channels/index.ts index 3bbdd20f..63ecc950 100644 --- a/src/channels/index.ts +++ b/src/channels/index.ts @@ -4,7 +4,7 @@ export { ChannelManager } from "./manager.js"; export { registerChannel, getChannel, listChannels } from "./registry.js"; -export { loadChannelsConfig, CHANNELS_CONFIG_PATH } from "./config.js"; +export { loadChannelsConfig } from "./config.js"; export type { ChannelPlugin, ChannelMessage, From 287e6d5c4f920f029fbac90da14b6db756dd4f0e Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Fri, 6 Feb 2026 16:07:27 +0800 Subject: [PATCH 06/33] fix(channels): catch telegram bot polling errors gracefully Handle 409 conflict (another bot instance running) with a clear error message instead of an unhandled promise rejection. Co-Authored-By: Claude Opus 4.6 --- src/channels/plugins/telegram.ts | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/channels/plugins/telegram.ts b/src/channels/plugins/telegram.ts index dde00244..9e2c4dce 100644 --- a/src/channels/plugins/telegram.ts +++ b/src/channels/plugins/telegram.ts @@ -101,12 +101,20 @@ export const telegramChannel: ChannelPlugin = { bots.delete(accountId); }); - // Start long polling (this blocks until bot.stop() is called) + // Start long polling (fire-and-forget, errors are caught here) console.log("[Telegram] Bot is polling for messages"); bot.start({ onStart: () => { // Already logged above }, + }).catch((err: unknown) => { + const msg = err instanceof Error ? err.message : String(err); + if (msg.includes("409") || msg.includes("Conflict")) { + console.error(`[Telegram] Bot conflict: another instance is already polling with this token. Stop the other process and restart.`); + } else { + console.error(`[Telegram] Bot polling error: ${msg}`); + } + bots.delete(accountId); }); }, }, From ee95102613c931c45cf88ea1ad51c8e402b0a9bd Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Fri, 6 Feb 2026 17:13:43 +0800 Subject: [PATCH 07/33] docs(channel): add OpenClaw channel system source code research MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Comprehensive research document covering OpenClaw's channel architecture, plugin system, message flow (inbound → agent → outbound), Telegram integration details, routing/session management, and security model. Co-Authored-By: Claude Opus 4.6 --- docs/channel/openclaw-research.md | 1186 +++++++++++++++++++++++++++++ 1 file changed, 1186 insertions(+) create mode 100644 docs/channel/openclaw-research.md diff --git a/docs/channel/openclaw-research.md b/docs/channel/openclaw-research.md new file mode 100644 index 00000000..c804d5ef --- /dev/null +++ b/docs/channel/openclaw-research.md @@ -0,0 +1,1186 @@ +# OpenClaw Channel 系统源码调研 + +> 源码位置: `~/Desktop/参考项目/openclaw` +> +> 调研目的: 深入理解 OpenClaw 的 Channel 架构、消息流转机制、第三方渠道集成模式,为 Super Multica 的 Channel 系统设计提供参考。 + +--- + +## 目录 + +1. [项目整体结构](#1-项目整体结构) +2. [Channel 插件体系架构](#2-channel-插件体系架构) +3. [核心类型定义](#3-核心类型定义) +4. [插件发现与加载机制](#4-插件发现与加载机制) +5. [路由与会话管理](#5-路由与会话管理) +6. [消息流转: 正常发消息 → AI 回复 全链路](#6-消息流转-正常发消息--ai-回复-全链路) +7. [第三方渠道集成: Telegram 完整流程](#7-第三方渠道集成-telegram-完整流程) +8. [Outbound 投递管线](#8-outbound-投递管线) +9. [安全与访问控制](#9-安全与访问控制) +10. [Channel Manager 生命周期管理](#10-channel-manager-生命周期管理) +11. [设计亮点与可借鉴之处](#11-设计亮点与可借鉴之处) +12. [关键文件索引](#12-关键文件索引) + +--- + +## 1. 项目整体结构 + +``` +openclaw/ +├── src/ # 核心模块 +│ ├── channels/ # Channel 插件系统 (类型、注册表、工具函数) +│ │ ├── registry.ts # 内置 Channel 元信息注册表 +│ │ ├── plugins/ # 插件类型定义与加载 +│ │ │ ├── types.core.ts # 基础类型 (ChannelId, ChannelMeta, ChannelCapabilities) +│ │ │ ├── types.adapters.ts # Adapter 接口 (Config, Outbound, Gateway, Security...) +│ │ │ ├── types.plugin.ts # ChannelPlugin 顶层接口 +│ │ │ ├── catalog.ts # 插件发现与目录管理 +│ │ │ └── load.ts # 插件加载 (带缓存) +│ │ ├── mention-gating.ts # 群组 @提及 门控逻辑 +│ │ ├── sender-identity.ts # 发送者身份验证 +│ │ ├── chat-type.ts # 聊天类型标准化 (direct/group/channel/thread) +│ │ └── ack-reactions.ts # ACK 表情反应 +│ ├── telegram/ # Telegram 内置实现 +│ │ ├── monitor.ts # 长轮询/Webhook 启动入口 +│ │ ├── webhook.ts # HTTP Webhook 服务器 +│ │ ├── bot.ts # Grammy Bot 创建与中间件编排 +│ │ ├── bot-handlers.ts # 消息/回调/反应处理器注册 +│ │ ├── bot-message.ts # 消息处理器工厂 +│ │ ├── bot-message-context.ts # Inbound 上下文构建 (路由、安全、信封) +│ │ ├── bot-message-dispatch.ts # 调度到 Agent 并处理流式回复 +│ │ ├── bot/delivery.ts # 回复投递 (文本分块、媒体、线程) +│ │ └── send.ts # 独立 Outbound 发送函数 +│ ├── routing/ # 消息路由与会话管理 +│ │ ├── resolve-route.ts # Agent 路由解析 (binding 匹配) +│ │ ├── bindings.ts # 路由绑定配置读取 +│ │ └── session-key.ts # 会话 Key 构建 (DM/Group/Thread) +│ ├── plugins/ # 通用插件系统 +│ │ ├── registry.ts # 插件注册表 (工具/钩子/Channel/Provider) +│ │ ├── runtime.ts # 全局插件注册表单例 (Symbol-based) +│ │ ├── loader.ts # 插件加载器 (jiti + discovery) +│ │ └── discovery.ts # 插件发现 +│ ├── infra/outbound/ # Outbound 投递基础设施 +│ │ ├── deliver.ts # 主投递编排 +│ │ ├── payloads.ts # Payload 标准化 +│ │ ├── channel-selection.ts # 多 Channel 选择 +│ │ └── target-resolver.ts # 目标解析 (带缓存) +│ ├── auto-reply/ # Agent 回复管线 +│ │ ├── dispatch.ts # 入站消息调度 +│ │ ├── reply/ # 回复生成 +│ │ │ ├── dispatch-from-config.ts # 核心回复流程 +│ │ │ └── get-reply.ts # LLM 调用 +│ │ ├── types.ts # ReplyPayload, GetReplyOptions +│ │ └── envelope.ts # 消息信封格式化 +│ ├── gateway/ # WebSocket 网关 +│ │ └── server-channels.ts # ChannelManager 生命周期管理 +│ └── config/ # 配置类型 +│ ├── types.channels.ts # Channel 配置汇总 +│ └── types.telegram.ts # Telegram 专属配置 +├── extensions/ # 33+ 外部插件 +│ ├── telegram/ # Telegram Channel 插件 +│ │ ├── index.ts # 插件入口 (register) +│ │ └── src/ +│ │ ├── channel.ts # ChannelPlugin 完整实现 +│ │ └── runtime.ts # 全局 Runtime +│ ├── discord/ # Discord Channel 插件 +│ ├── slack/ # Slack Channel 插件 +│ ├── signal/ # Signal Channel 插件 +│ └── ... # 更多渠道 +└── apps/ # Web/Desktop 应用 +``` + +--- + +## 2. Channel 插件体系架构 + +OpenClaw 采用**插件化 Adapter 模式**来统一所有 Channel 的接入。每个 Channel 实现一个 `ChannelPlugin` 合约,包含多个可选的 Adapter: + +``` +┌─────────────────────────────────────────────────────────────┐ +│ ChannelPlugin │ +├─────────┬──────────┬──────────┬──────────┬─────────────────┤ +│ config │ outbound │ gateway │ security │ 其他 Adapter │ +│ │ │ │ │ │ +│ 账号管理 │ 消息发送 │ 生命周期 │ 访问控制 │ groups, mentions │ +│ 启用检查 │ 媒体发送 │ start │ DM策略 │ directory, status │ +│ 配置描述 │ 目标解析 │ stop │ 告警收集 │ actions, threading│ +│ │ 文本分块 │ QR登录 │ │ heartbeat, setup │ +└─────────┴──────────┴──────────┴──────────┴─────────────────┘ +``` + +### ChannelPlugin 接口 + +```typescript +// src/channels/plugins/types.plugin.ts +type ChannelPlugin = { + id: ChannelId; // "telegram" | "discord" | ... + meta: ChannelMeta; // 元信息 (标签、图标、文档路径) + capabilities: ChannelCapabilities; // 能力声明 (chatTypes, reactions, threads...) + + // --- 必选 Adapter --- + config: ChannelConfigAdapter; // 账号配置管理 + + // --- 可选 Adapter --- + outbound?: ChannelOutboundAdapter; // 消息发送 + gateway?: ChannelGatewayAdapter; // 生命周期 (启动/停止/QR登录) + security?: ChannelSecurityAdapter; // DM安全策略 + setup?: ChannelSetupAdapter; // 初始化配置 + groups?: ChannelGroupAdapter; // 群组行为 + mentions?: ChannelMentionAdapter; // @提及处理 + status?: ChannelStatusAdapter; // 状态监控 + directory?: ChannelDirectoryAdapter; // 联系人/群组目录 + actions?: ChannelMessageActionAdapter; // 消息动作 (反应、按钮、卡片) + threading?: ChannelThreadingAdapter; // 线程处理 + streaming?: ChannelStreamingAdapter; // 流式输出 + messaging?: ChannelMessagingAdapter; // 目标格式化 + auth?: ChannelAuthAdapter; // 认证 + heartbeat?: ChannelHeartbeatAdapter; // 心跳检测 + pairing?: ChannelPairingAdapter; // 配对/白名单 + elevated?: ChannelElevatedAdapter; // 提权 + commands?: ChannelCommandAdapter; // 命令控制 + agentPrompt?: ChannelAgentPromptAdapter; // Agent 提示词 + resolver?: ChannelResolverAdapter; // 目标解析 + agentTools?: ChannelAgentToolFactory; // Channel 自带的 Agent 工具 +}; +``` + +### Adapter 职责一览 + +| Adapter | 职责 | 关键方法 | +|---------|------|---------| +| **config** | 账号管理 | `listAccountIds`, `resolveAccount`, `isConfigured`, `isEnabled` | +| **outbound** | 消息发送 | `sendText`, `sendMedia`, `sendPayload`, `resolveTarget` | +| **gateway** | 生命周期 | `startAccount`, `stopAccount`, `loginWithQrStart`, `loginWithQrWait` | +| **security** | 访问控制 | `resolveDmPolicy`, `collectWarnings` | +| **groups** | 群组行为 | `resolveRequireMention`, `resolveToolPolicy` | +| **status** | 状态监控 | `probeAccount`, `auditAccount`, `buildAccountSnapshot` | +| **directory** | 目录查询 | `listPeers`, `listGroups`, `listGroupMembers` | +| **actions** | 消息交互 | `handleAction` (reactions, buttons, cards, polls) | +| **mentions** | @提及 | `stripMentions`, `stripPatterns` | +| **setup** | 初始化 | `applyAccountConfig`, `validateInput` | +| **pairing** | 配对 | `normalizeAllowEntry`, `notifyApproval` | + +--- + +## 3. 核心类型定义 + +### ChannelCapabilities — 渠道能力声明 + +```typescript +// src/channels/plugins/types.core.ts +type ChannelCapabilities = { + chatTypes: Array<"direct" | "group" | "channel" | "thread">; + polls?: boolean; // 原生投票 + reactions?: boolean; // 表情反应 + edit?: boolean; // 编辑消息 + unsend?: boolean; // 撤回消息 + reply?: boolean; // 引用回复 + threads?: boolean; // 线程支持 + media?: boolean; // 媒体支持 + nativeCommands?: boolean; // 原生命令 (如 Telegram /start) + blockStreaming?: boolean; // 流式输出聚合 +}; +``` + +### ChannelMeta — 渠道元信息 + +```typescript +type ChannelMeta = { + id: ChannelId; + label: string; // "Telegram" + selectionLabel: string; // "Telegram (Bot API)" + detailLabel?: string; // "Telegram Bot" + docsPath: string; // "/channels/telegram" + blurb: string; // 简介 + systemImage?: string; // SF Symbol 图标名 + aliases?: string[]; // 别名 + order?: number; // 排序权重 + // ... +}; +``` + +### ChannelAccountSnapshot — 账号运行时快照 + +```typescript +type ChannelAccountSnapshot = { + accountId: string; + name?: string; + enabled?: boolean; + configured?: boolean; + running?: boolean; + connected?: boolean; + lastConnectedAt?: number | null; + lastMessageAt?: number | null; + lastError?: string | null; + lastStartAt?: number | null; + lastStopAt?: number | null; + dmPolicy?: string; + allowFrom?: string[]; + // ... +}; +``` + +### ReplyPayload — Agent 回复载荷 + +```typescript +// src/auto-reply/types.ts +type ReplyPayload = { + text?: string; + mediaUrl?: string; + mediaUrls?: string[]; + replyToId?: string; + audioAsVoice?: boolean; + isError?: boolean; + channelData?: Record; +}; +``` + +--- + +## 4. 插件发现与加载机制 + +### 4.1 插件发现 + +``` +发现源 (优先级从高到低): + 1. config — 配置文件指定的路径 (plugins.load.paths) + 2. workspace — 项目本地 extensions/ + 3. global — ~/.super-multica/extensions/ + 4. bundled — 内置 extensions/ +``` + +每个插件目录需包含 `openclaw.plugin.json` 清单文件,声明插件 ID、名称、类型、配置 Schema 等。 + +### 4.2 插件加载流程 + +``` +loadOpenClawPlugins(options) + │ + ├─ normalizePluginsConfig() // 处理 allow/deny 列表 + ├─ 检查缓存 (cacheKey = workspace + plugins config) + │ + ├─ discoverOpenClawPlugins() // 扫描插件候选 + ├─ loadPluginManifestRegistry() // 加载清单文件 + │ + ├─ for each candidate: + │ ├─ 检查启用/禁用状态 + │ ├─ 验证配置 Schema (JSON Schema) + │ ├─ jiti(candidate.source) // 使用 jiti 动态加载 TypeScript + │ ├─ 解析 module export (default export or register/activate) + │ ├─ createApi(record, config) // 创建插件 API + │ └─ register(api) // 调用插件注册函数 + │ ├─ api.registerChannel({ plugin: channelPlugin }) + │ ├─ api.registerTool(tool) + │ ├─ api.registerHook(events, handler) + │ └─ api.registerProvider(provider) + │ + ├─ setActivePluginRegistry(registry) // 设置为全局活跃注册表 + └─ return registry +``` + +### 4.3 插件注册表 (全局单例) + +```typescript +// src/plugins/runtime.ts +const REGISTRY_STATE = Symbol.for("openclaw.pluginRegistryState"); + +// 使用 Symbol 确保跨模块共享同一个注册表实例 +type RegistryState = { + registry: PluginRegistry | null; + key: string | null; +}; + +export function setActivePluginRegistry(registry: PluginRegistry, cacheKey?: string); +export function getActivePluginRegistry(): PluginRegistry | null; +export function requireActivePluginRegistry(): PluginRegistry; +``` + +### 4.4 Telegram 插件注册示例 + +```typescript +// extensions/telegram/index.ts +const plugin = { + id: "telegram", + name: "Telegram", + register(api: OpenClawPluginApi) { + setTelegramRuntime(api.runtime); // 保存全局 Runtime + api.registerChannel({ plugin: telegramPlugin }); // 注册 Channel 插件 + }, +}; +export default plugin; +``` + +--- + +## 5. 路由与会话管理 + +### 5.1 路由解析 (Binding 匹配) + +当一条消息进入系统时,需要确定由哪个 Agent 处理。OpenClaw 使用 **Binding 优先级匹配**: + +```typescript +// src/routing/resolve-route.ts +function resolveAgentRoute(input: ResolveAgentRouteInput): ResolvedAgentRoute { + // input: { cfg, channel, accountId, peer, parentPeer, guildId, teamId } + + // 1. 过滤出匹配 channel + accountId 的 bindings + const bindings = listBindings(cfg).filter(b => + matchesChannel(b.match, channel) && matchesAccountId(b.match?.accountId, accountId) + ); + + // 2. 按优先级匹配 + // peer (DM/群组精确匹配) → parentPeer (线程父级继承) + // → guild (Discord服务器) → team (MS Teams团队) + // → account (账号级别) → channel (渠道级别) → default (默认Agent) + + // 3. 返回结果 + return { + agentId: "assistant", + channel: "telegram", + accountId: "default", + sessionKey: "agent:assistant:peer:telegram:default:dm:123456", + mainSessionKey: "agent:assistant:main", + matchedBy: "binding.peer", // 调试信息 + }; +} +``` + +### 5.2 Binding 配置 + +```typescript +// 配置文件中的 bindings 数组 +type AgentBinding = { + agentId: string; // 目标 Agent ID + match?: { + channel?: string; // "telegram" + accountId?: string; // "default" 或 "*" (匹配所有) + peer?: { kind: string; id: string }; // 精确匹配特定聊天 + guildId?: string; // Discord 服务器 + teamId?: string; // MS Teams 团队 + }; +}; +``` + +### 5.3 Session Key 构建 + +Session Key 是会话持久化的核心标识,格式根据 DM Scope 不同而变化: + +``` +DM Scope 模式: + "main" → agent:{agentId}:main + "per-peer" → agent:{agentId}:dm:{peerId} + "per-channel-peer" → agent:{agentId}:{channel}:dm:{peerId} + "per-account-channel-peer"→ agent:{agentId}:{channel}:{accountId}:dm:{peerId} + +Group/Channel: + → agent:{agentId}:{channel}:{peerKind}:{peerId} + +Thread (线程): + → {baseSessionKey}:thread:{threadId} +``` + +**Identity Linking**: 支持跨渠道身份关联,例如 Telegram 用户 `123` 和 WhatsApp 用户 `456` 映射到同一个 canonical ID,共享同一个 session。 + +```typescript +// 配置 +session: { + dmScope: "per-peer", + identityLinks: { + "alice": ["telegram:123", "whatsapp:456"], + } +} +``` + +--- + +## 6. 消息流转: 正常发消息 → AI 回复 全链路 + +以下是一条用户消息从进入系统到 AI 回复的**完整流转路径**: + +``` + ┌─────────────────────────────────┐ + │ 用户发送消息 │ + └──────────────┬──────────────────┘ + │ + ┌──────────────▼──────────────────┐ + │ Channel 接收 (Inbound) │ + │ Polling / Webhook / WebSocket │ + └──────────────┬──────────────────┘ + │ + ┌─────────────────────▼─────────────────────┐ + │ 消息预处理 (bot-handlers) │ + │ │ + │ • 去重 (update offset + dedup) │ + │ • 媒体组缓冲 (multi-image → single event) │ + │ • 文本片段组装 (>4000 字符分片重组) │ + │ • Inbound 防抖 (快速连续消息合并) │ + │ • 媒体文件下载 (图片/视频/语音/贴纸) │ + └─────────────────────┬─────────────────────┘ + │ + ┌─────────────────────▼─────────────────────┐ + │ 上下文构建 (bot-message-context) │ + │ │ + │ 1. 解析 chatType (DM / Group / Thread) │ + │ 2. 解析 Agent 路由 (resolveAgentRoute) │ + │ → agentId, sessionKey │ + │ 3. 安全检查: │ + │ - DM 策略 (pairing/allowlist/open) │ + │ - Group 策略 (open/allowlist/disabled) │ + │ 4. @提及检测与门控 │ + │ - 显式 @bot 提及 │ + │ - 正则模式匹配 │ + │ - 回复链隐式提及 │ + │ 5. 发送 ACK 表情 (👀 处理中) │ + │ 6. 构建消息信封 [Channel From Time] body │ + │ 7. 提取上下文 (引用/转发/位置/群历史) │ + └─────────────────────┬─────────────────────┘ + │ + ┌─────────────────────▼─────────────────────┐ + │ 调度到 Agent (bot-message-dispatch) │ + │ │ + │ 1. 设置流式输出模式: │ + │ - "off": 等完整回复再发送 │ + │ - "partial": 逐 token 实时编辑消息 │ + │ - "block": 语义块级流式 │ + │ 2. 调用 dispatchReplyFromConfig() │ + └─────────────────────┬─────────────────────┘ + │ + ┌───────────────────────────▼───────────────────────────┐ + │ Auto-Reply 管线 (dispatch-from-config) │ + │ │ + │ 1. 检查重复入站消息 │ + │ 2. 触发 message_received 钩子 (插件) │ + │ 3. 检查 /stop 命令 (快速中断) │ + │ 4. 调用 getReplyFromConfig() → LLM 推理 │ + │ ├─ 加载 session transcript │ + │ ├─ 构建 Agent 上下文 (system prompt + tools + skills)│ + │ ├─ 调用 LLM (OpenAI/Anthropic/DeepSeek/...) │ + │ ├─ 执行 tools (如需要) │ + │ └─ 生成 ReplyPayload │ + │ 5. 应用 TTS (如配置) │ + │ 6. 处理跨 Channel 回复路由 │ + └───────────────────────────┬───────────────────────────┘ + │ + ┌─────────────────────▼─────────────────────┐ + │ 回复投递 (delivery / deliver.ts) │ + │ │ + │ 1. 加载 Channel Outbound Adapter │ + │ 2. 标准化 Payload (解析指令, 合并媒体) │ + │ 3. 文本分块: │ + │ - 按字符限制 (Telegram: 4096) │ + │ - 按段落/Markdown 块 │ + │ - Signal: Markdown → 富文本样式 │ + │ 4. 发送: │ + │ - sendText(text) │ + │ - sendMedia(caption, mediaUrl) │ + │ - sendPayload(payload) (channelData) │ + │ 5. 线程引用 (replyToId / threadId) │ + │ 6. 移除 ACK 表情 │ + │ 7. 记录 session transcript │ + └─────────────────────┬─────────────────────┘ + │ + ┌──────────────▼──────────────────┐ + │ Channel 发送 (Outbound) │ + │ Channel API → 用户收到回复 │ + └──────────────────────────────────┘ +``` + +### 关键步骤详解 + +#### Step 1: 消息接收 + +Channel 通过两种方式接收消息: +- **Long Polling**: 主动轮询 API 获取新消息 (Telegram, WhatsApp) +- **Webhook**: 被动接收 HTTP POST 推送 (Telegram 可选, Google Chat) +- **WebSocket**: 实时双向连接 (Discord via discord.js, Slack Socket Mode) + +#### Step 2: 消息预处理 + +```typescript +// Telegram 特有的预处理: + +// 1. 媒体组缓冲 — Telegram 将多图消息拆成多个 update,需要合并 +const MEDIA_GROUP_TIMEOUT_MS = 1500; // 等待 1.5s 收集同组媒体 + +// 2. 文本片段重组 — 超长消息被 Telegram 分片 +const TEXT_FRAGMENT_START_THRESHOLD = 4000; // >4000字符触发分片检测 + +// 3. Inbound 防抖 — 用户快速连发消息时合并处理 +createInboundDebouncer({ delayMs, maxWaitMs }); +``` + +#### Step 3: 路由解析 + +```typescript +const route = resolveAgentRoute({ + cfg, + channel: "telegram", + accountId: "default", + peer: { kind: "dm", id: "123456" }, +}); +// → { agentId: "assistant", sessionKey: "agent:assistant:main", matchedBy: "default" } +``` + +#### Step 4: Agent 调用 + +核心函数 `getReplyFromConfig()` 负责: +1. 从 sessionKey 加载历史 transcript +2. 根据 agentId 加载 Agent 配置 (system prompt, tools, skills) +3. 调用 LLM Provider (支持 OpenAI, Anthropic, DeepSeek, Kimi, Groq, Mistral, Google, Together) +4. 处理 tool calls (循环执行) +5. 返回 `ReplyPayload[]` + +#### Step 5: 回复投递 + +```typescript +await deliverOutboundPayloads({ + cfg, + channel: "telegram", + to: "123456", + accountId: "default", + payloads: [{ text: "Hello! I'm your AI assistant.", mediaUrl: "..." }], + replyToId: originalMessageId, +}); +``` + +--- + +## 7. 第三方渠道集成: Telegram 完整流程 + +以 Telegram 为例,详细说明第三方渠道的集成方式和消息流转。 + +### 7.1 插件注册 + +```typescript +// extensions/telegram/index.ts +export default { + id: "telegram", + register(api: OpenClawPluginApi) { + setTelegramRuntime(api.runtime); + api.registerChannel({ plugin: telegramPlugin }); + }, +}; +``` + +### 7.2 Channel Plugin 实现 + +```typescript +// extensions/telegram/src/channel.ts +export const telegramPlugin: ChannelPlugin = { + id: "telegram", + meta: getChatChannelMeta("telegram"), + capabilities: { + chatTypes: ["direct", "group", "channel", "thread"], + reactions: true, + threads: true, + media: true, + nativeCommands: true, + blockStreaming: true, + }, + + config: { + listAccountIds(cfg) { + // 返回配置中的所有 Telegram 账号 ID + return Object.keys(cfg.channels?.telegram?.accounts ?? {}); + }, + resolveAccount(cfg, accountId) { + // 解析账号配置 (botToken, dmPolicy, allowFrom 等) + }, + isConfigured(account) { + // 检查 botToken 是否存在 + }, + isEnabled(account) { + return account.enabled !== false; + }, + }, + + outbound: { + deliveryMode: "direct", // 直接调用 Bot API + textChunkLimit: 4000, // Telegram 限制 + chunker: markdownToTelegramChunks, // Markdown → Telegram HTML 分块 + + async sendText(ctx) { + return sendMessageTelegram(ctx.to, ctx.text, { + accountId: ctx.accountId, + replyToId: ctx.replyToId, + threadId: ctx.threadId, + }); + }, + + async sendMedia(ctx) { + return sendMessageTelegram(ctx.to, ctx.text, { + mediaUrl: ctx.mediaUrl, + accountId: ctx.accountId, + }); + }, + + resolveTarget({ to, allowFrom, accountId }) { + // 验证并标准化 Telegram chat ID + // 支持: 纯数字 ID, @username, t.me/ 链接 + }, + }, + + gateway: { + async startAccount(ctx) { + // 启动 Telegram 监听 + return monitorTelegramProvider({ + cfg: ctx.cfg, + accountId: ctx.accountId, + abortSignal: ctx.abortSignal, + setStatus: ctx.setStatus, + }); + }, + + async stopAccount(ctx) { + // 通过 AbortController 停止 + ctx.abortSignal.abort(); + }, + }, + + security: { + resolveDmPolicy(ctx) { + return { + policy: ctx.account.dmPolicy ?? "pairing", + allowFrom: ctx.account.allowFrom, + approveHint: "approve via /allow command", + }; + }, + }, +}; +``` + +### 7.3 Telegram 消息接收 (Inbound) 详细流程 + +``` +Telegram 用户发送消息 + │ + ▼ +┌───────────────────────────────────────┐ +│ Telegram API Server │ +│ (api.telegram.org) │ +└───────────────┬───────────────────────┘ + │ + ┌───────────┴───────────┐ + │ │ + ▼ ▼ +┌─────────┐ ┌──────────┐ +│ Polling │ │ Webhook │ +│ (默认) │ │ (可选) │ +│ │ │ │ +│ Grammy │ │ HTTP POST│ +│ Runner │ │ /webhook │ +│ getUpdates│ │ grammy │ +│ + backoff│ │ callback │ +└────┬─────┘ └────┬─────┘ + │ │ + └──────────┬──────────┘ + │ + ▼ +┌──────────────────────────────────────┐ +│ Grammy Middleware Pipeline │ +│ │ +│ 1. apiThrottler() — 速率限制 │ +│ 2. sequentialize() — 按 chat/topic │ +│ 序列化更新, 保证处理顺序 │ +│ 3. 原始更新日志 (debug) │ +│ 4. Update offset 追踪 + 去重 │ +└───────────────┬──────────────────────┘ + │ + ▼ +┌──────────────────────────────────────┐ +│ bot.on("message") Handler │ +│ (bot-handlers.ts — 928 行) │ +│ │ +│ 1. 验证 chatType, 群组策略 │ +│ 2. 文本片段缓冲 (>4000字符) │ +│ 3. 媒体组缓冲 (多图合并) │ +│ 4. 单媒体解析 (resolveMedia) │ +│ 5. Inbound 防抖 (快速连发合并) │ +│ 6. 调用 processMessage() │ +└───────────────┬──────────────────────┘ + │ + ▼ +┌──────────────────────────────────────┐ +│ processMessage() │ +│ (bot-message.ts) │ +│ │ +│ ┌─ buildTelegramMessageContext() │ +│ │ (bot-message-context.ts 700行) │ +│ │ │ +│ │ • 记录 channel activity │ +│ │ • 解析 chatType + threadId │ +│ │ • resolveAgentRoute() → agent │ +│ │ • DM 安全检查: │ +│ │ - "pairing": 发送配对码 │ +│ │ - "allowlist": 检查白名单 │ +│ │ - "open": 放行 │ +│ │ • Group 安全检查 │ +│ │ • @提及检测: │ +│ │ - @bot 显式提及 │ +│ │ - 正则模式匹配 │ +│ │ - 回复链隐式提及 │ +│ │ • Mention Gating (群组中未提及 │ +│ │ 则跳过) │ +│ │ • 发送 ACK 反应 (👀) │ +│ │ • formatInboundEnvelope() │ +│ │ • 提取引用/转发/位置/贴纸 │ +│ │ • 群组历史上下文 │ +│ └────────────────────────┐ │ +│ │ │ +│ ┌─ dispatchTelegramMessage() │ +│ │ (bot-message-dispatch.ts 357行) │ +│ │ │ +│ │ • 配置流式模式: │ +│ │ - "off": 完整回复后发送 │ +│ │ - "partial": token级实时编辑 │ +│ │ - "block": 语义块级流式 │ +│ │ • dispatchReplyFromConfig() │ +│ │ → getReplyFromConfig() │ +│ │ → LLM 推理 + Tool 执行 │ +│ │ • 流式回调: │ +│ │ - onBlockReply → 编辑草稿消息 │ +│ │ - onToolResult → 中间结果 │ +│ │ • deliverReplies() → 发送回复 │ +│ │ • 移除 ACK 反应 │ +│ └───────────────────────────────────┘ +└──────────────────────────────────────┘ +``` + +### 7.4 Telegram 消息发送 (Outbound) 详细流程 + +```typescript +// src/telegram/send.ts — 754 行 + +async function sendMessageTelegram( + to: string, + text: string, + opts?: { + mediaUrl?: string; + accountId?: string; + replyToId?: string; + threadId?: string | number; + retry?: OutboundRetryConfig; + } +): Promise { + + // 1. 解析账号配置、Bot Token、代理 + const account = resolveAccount(cfg, opts.accountId); + const token = account.botToken; + const api = new Api(token, { proxy }); + + // 2. 标准化 chatId + // 支持: "123456", "@channel_name", "t.me/+xxxxx" + const chatId = normalizeTelegramChatId(to); + + // 3. 文本转换: Markdown → Telegram HTML + const html = markdownToTelegramHtml(text); + + // 4. 发送消息 + if (opts.mediaUrl) { + // 带媒体: sendPhoto / sendVideo / sendAudio / sendVoice / sendDocument / sendAnimation + const mediaType = detectMediaType(opts.mediaUrl); + const result = await api[`send${mediaType}`](chatId, { + caption: html, + parse_mode: "HTML", + reply_parameters: opts.replyToId ? { message_id: opts.replyToId } : undefined, + message_thread_id: opts.threadId, + }); + return { channel: "telegram", messageId: result.message_id, chatId }; + } else { + // 纯文本 + const result = await api.sendMessage(chatId, html, { + parse_mode: "HTML", + reply_parameters: opts.replyToId ? { message_id: opts.replyToId } : undefined, + message_thread_id: opts.threadId, + link_preview_options: { is_disabled: !account.linkPreview }, + }); + return { channel: "telegram", messageId: result.message_id, chatId }; + } + + // 5. 错误处理 + // - HTML 解析失败 → 降级为纯文本重试 + // - 网络错误 → 指数退避重试 + // - 语音消息被禁止 → 降级为文档发送 + + // 6. 记录已发送消息 (用于反应追踪) +} +``` + +### 7.5 Telegram 配置示例 + +```json5 +// ~/.super-multica/credentials.json5 +{ + channels: { + telegram: { + accounts: { + default: { + botToken: "123456:ABC-DEF...", // BotFather 获取 + // 或 tokenFile: "/path/to/token", // 密钥管理器 + dmPolicy: "pairing", // DM 安全策略 + allowFrom: [123456789], // 白名单 (Telegram user ID) + groupPolicy: "open", // 群组策略 + streamMode: "partial", // 流式输出模式 + textChunkLimit: 4000, // 文本分块大小 + replyToMode: "first", // 引用回复模式 + reactionLevel: "ack", // ACK 反应级别 + actions: { + reactions: true, + sendMessage: true, + }, + groups: { + "-1001234567890": { // 群组 ID + requireMention: true, // 需要 @提及 + tools: { allow: ["search", "calculator"] }, + topics: { + "42": { enabled: true }, // 论坛 topic + }, + }, + }, + }, + }, + }, + }, +} +``` + +--- + +## 8. Outbound 投递管线 + +### 8.1 投递编排 + +```typescript +// src/infra/outbound/deliver.ts + +async function deliverOutboundPayloads(params: { + cfg: OpenClawConfig; + channel: "telegram" | "discord" | "slack" | ...; + to: string; // 目标 ID + accountId?: string; + payloads: ReplyPayload[]; // 回复载荷数组 + replyToId?: string; // 引用的消息 ID + threadId?: string | number;// 线程 ID + abortSignal?: AbortSignal; // 中止信号 + mirror?: { // Session transcript 镜像 + sessionKey: string; + text?: string; + }; +}): Promise { + + // 1. 加载 Channel Outbound Adapter + const handler = await createChannelHandler({ + cfg, channel, to, accountId, ... + }); + // → loadChannelOutboundAdapter(channel) + // → plugin.outbound.sendText / sendMedia + + // 2. 标准化 Payload + const normalized = normalizeReplyPayloadsForDelivery(payloads); + // → 解析文本指令 (mediaUrl, replyToId) + // → 合并多个媒体 URL + // → 过滤空/静默 payload + + // 3. 逐个 Payload 发送 + for (const payload of normalized) { + if (payload.mediaUrls.length === 0) { + // 纯文本 → 分块发送 + await sendTextChunks(payload.text); + } else { + // 带媒体 → 逐媒体发送 (首个附带 caption) + for (const url of payload.mediaUrls) { + await handler.sendMedia(first ? payload.text : "", url); + } + } + } + + // 4. 镜像到 session transcript + if (params.mirror) { + await appendAssistantMessageToSessionTranscript(mirror); + } +} +``` + +### 8.2 文本分块策略 + +``` +分块模式: + "length" (默认) — 按字符限制硬切 (chunker 函数) + "newline" — 先按段落/换行拆分, 再按字符限制 + +特殊处理: + Signal — Markdown → 富文本样式 (SignalTextStyleRange) + Telegram — Markdown → HTML (Telegram flavor) + Discord — 原生 Markdown + Embed +``` + +### 8.3 Channel 选择 + +当系统需要主动发消息(非回复),需要确定使用哪个 Channel: + +```typescript +// src/infra/outbound/channel-selection.ts + +async function resolveMessageChannelSelection(params: { + cfg: OpenClawConfig; + channel?: string; +}) { + // 1. 如果指定了 channel, 直接使用 + // 2. 列出所有已配置的 channel + // 3. 只有一个 → 自动选择 + // 4. 多个 → 抛错要求明确指定 +} + +async function listConfiguredMessageChannels(cfg) { + // 遍历所有已注册的 channel 插件 + // 检查每个插件是否有启用且已配置的账号 + for (const plugin of listChannelPlugins()) { + if (await isPluginConfigured(plugin, cfg)) { + channels.push(plugin.id); + } + } +} +``` + +--- + +## 9. 安全与访问控制 + +### 9.1 DM 安全策略 + +``` +策略类型 (dmPolicy): + "pairing" (默认) — 未知发送者收到配对码, 需管理员批准 + "allowlist" — 仅允许 allowFrom 列表中的用户 + "open" — 允许所有 DM (需 allowFrom 包含 "*") + "disabled" — 忽略所有 DM +``` + +**配对流程 (Pairing)**: +1. 未知用户发送 DM +2. 系统生成配对码,回复给用户 +3. 管理员通过 `/allow` 命令批准 +4. 用户 ID 被加入持久化白名单 + +### 9.2 群组安全策略 + +``` +策略类型 (groupPolicy): + "open" — 绕过 allowFrom, 仅受 mention-gating 控制 + "allowlist" — 仅允许 groupAllowFrom/allowFrom 中的发送者 + "disabled" — 完全阻止群消息 +``` + +### 9.3 Mention Gating (提及门控) + +```typescript +// src/channels/mention-gating.ts + +function resolveMentionGating(params: { + requireMention: boolean; // 是否需要 @提及 + canDetectMention: boolean; // 渠道是否能检测提及 + wasMentioned: boolean; // 是否被提及 + implicitMention?: boolean; // 隐式提及 (回复链) + shouldBypassMention?: boolean; // 命令绕过 +}): { + effectiveWasMentioned: boolean; + shouldSkip: boolean; // true = 跳过处理 +}; +``` + +在群组中,当 `requireMention = true` 时: +- 显式 `@bot` 提及 → 处理 +- 回复 bot 消息 (隐式提及) → 处理 +- 授权用户发送控制命令 → 绕过门控 +- 其他消息 → 跳过 + +--- + +## 10. Channel Manager 生命周期管理 + +```typescript +// src/gateway/server-channels.ts + +type ChannelManager = { + getRuntimeSnapshot: () => ChannelRuntimeSnapshot; // 获取所有 channel 运行状态 + startChannels: () => Promise; // 启动所有已配置 channel + startChannel: (channel, accountId?) => Promise; // 启动单个 channel + stopChannel: (channel, accountId?) => Promise; // 停止单个 channel + markChannelLoggedOut: (channelId, cleared, accountId?) => void; // 标记登出 +}; +``` + +### 启动流程 + +``` +createChannelManager(opts) + │ + ├─ startChannels() + │ └─ for each plugin in listChannelPlugins(): + │ └─ startChannel(plugin.id) + │ + └─ startChannel(channelId, accountId?) + │ + ├─ 获取 plugin = getChannelPlugin(channelId) + ├─ 获取 startAccount = plugin.gateway.startAccount + │ + ├─ for each accountId in plugin.config.listAccountIds(cfg): + │ ├─ 检查是否已启动 (store.tasks.has(id)) + │ ├─ 解析账号配置: plugin.config.resolveAccount(cfg, id) + │ ├─ 检查启用状态: plugin.config.isEnabled(account, cfg) + │ ├─ 检查配置完整: plugin.config.isConfigured(account, cfg) + │ │ + │ ├─ 创建 AbortController + │ ├─ 更新运行状态: setRuntime(running: true, lastStartAt: now) + │ │ + │ └─ startAccount({ + │ cfg, accountId, account, + │ runtime, + │ abortSignal: abort.signal, + │ log: channelLogs[channelId], + │ getStatus, setStatus, + │ }) + │ │ + │ └─ (Telegram) → monitorTelegramProvider() + │ → Grammy Runner / Webhook Server + │ + └─ 错误处理: + ├─ catch → setRuntime(lastError: message) + └─ finally → setRuntime(running: false, lastStopAt: now) +``` + +### 运行时状态追踪 + +```typescript +type ChannelRuntimeStore = { + aborts: Map; // 每个账号的中止控制器 + tasks: Map>; // 每个账号的运行任务 + runtimes: Map; // 每个账号的状态快照 +}; +``` + +`getRuntimeSnapshot()` 聚合所有 channel 的账号状态,用于 UI 展示和健康监控。 + +--- + +## 11. 设计亮点与可借鉴之处 + +### 11.1 Adapter 模式 + +每个 Channel 只需实现必要的 Adapter,无需实现全部。这种**可选 Adapter 组合**模式比传统的继承/全量接口更灵活: + +```typescript +// 最简 Channel 实现只需要: +{ + id: "my-channel", + meta: { ... }, + capabilities: { chatTypes: ["direct"] }, + config: { listAccountIds, resolveAccount }, // 必选 + outbound: { sendText, sendMedia }, // 发消息 + gateway: { startAccount }, // 生命周期 +} +``` + +### 11.2 插件发现的层级优先级 + +``` +config > workspace > global > bundled +``` + +允许用户在项目级、全局级、以及内置级别分别管理插件,高优先级覆盖低优先级。 + +### 11.3 Session Key 的灵活设计 + +通过 `dmScope` 控制 DM 会话的隔离粒度: +- `"main"` — 所有 DM 共享一个 session (跨渠道统一上下文) +- `"per-peer"` — 每个联系人独立 session +- `"per-channel-peer"` — 每个渠道+联系人独立 +- `"per-account-channel-peer"` — 最细粒度 + +配合 `identityLinks` 实现跨渠道身份关联。 + +### 11.4 流式输出的三级模式 + +``` +"off" — 完整回复后一次性发送 +"partial" — Token 级实时编辑消息 (Telegram editMessageText) +"block" — 语义块级流式 (一段完成后发送) +``` + +### 11.5 安全模型分层 + +``` +DM 层: dmPolicy (pairing/allowlist/open/disabled) +Group 层: groupPolicy (open/allowlist/disabled) +提及层: mention-gating (requireMention + 检测) +命令层: command-gating (权限控制) +``` + +### 11.6 统一的 Outbound 投递管线 + +所有 Channel 共享同一个 `deliverOutboundPayloads()` 入口,通过 `loadChannelOutboundAdapter()` 动态加载具体 Channel 的发送逻辑。文本分块、Payload 标准化、错误处理、transcript 镜像等逻辑全部复用。 + +### 11.7 值得注意的工程实践 + +- **Update Offset 持久化** — Telegram 轮询重启后从上次 offset 恢复,避免重复处理 +- **媒体组缓冲** — 解决 Telegram 多图消息拆分为多个 update 的问题 +- **文本片段重组** — 解决超长消息被 Telegram 拆分的问题 +- **Grammy sequentialize** — 保证同一聊天的消息按顺序处理 +- **AbortController** — 优雅的生命周期控制 +- **Symbol.for 全局单例** — 跨模块共享插件注册表 + +--- + +## 12. 关键文件索引 + +### Inbound 链路 +| 文件 | 行数 | 职责 | +|------|------|------| +| `src/telegram/monitor.ts` | 215 | 长轮询/Webhook 启动入口 | +| `src/telegram/webhook.ts` | 127 | HTTP Webhook 服务器 | +| `src/telegram/bot.ts` | 494 | Grammy Bot 创建与中间件编排 | +| `src/telegram/bot-handlers.ts` | 928 | 消息/回调/反应处理器注册 | +| `src/telegram/bot-message.ts` | 92 | 消息处理器工厂 | +| `src/telegram/bot-message-context.ts` | 700 | Inbound 上下文构建 | +| `src/telegram/bot-message-dispatch.ts` | 357 | 调度到 Agent 并处理流式回复 | + +### Outbound 链路 +| 文件 | 行数 | 职责 | +|------|------|------| +| `src/infra/outbound/deliver.ts` | 376 | 主投递编排 | +| `src/infra/outbound/payloads.ts` | ~150 | Payload 标准化 | +| `src/infra/outbound/channel-selection.ts` | ~100 | 多 Channel 选择 | +| `src/telegram/send.ts` | 754 | Telegram 发送函数 | +| `src/telegram/bot/delivery.ts` | 562 | Telegram 回复投递 | + +### 插件系统 +| 文件 | 行数 | 职责 | +|------|------|------| +| `src/plugins/registry.ts` | ~350 | 插件注册表 | +| `src/plugins/runtime.ts` | ~50 | 全局单例管理 | +| `src/plugins/loader.ts` | ~400 | 插件加载器 | +| `src/channels/plugins/types.plugin.ts` | 85 | ChannelPlugin 接口 | +| `src/channels/plugins/types.adapters.ts` | 313 | Adapter 接口 | +| `src/channels/plugins/types.core.ts` | 332 | 基础类型 | +| `src/channels/plugins/catalog.ts` | ~300 | 插件发现与目录 | + +### 路由与会话 +| 文件 | 行数 | 职责 | +|------|------|------| +| `src/routing/resolve-route.ts` | 261 | Agent 路由解析 | +| `src/routing/bindings.ts` | 121 | 路由绑定 | +| `src/routing/session-key.ts` | 250 | Session Key 构建 | + +### 生命周期 +| 文件 | 行数 | 职责 | +|------|------|------| +| `src/gateway/server-channels.ts` | 309 | ChannelManager | +| `src/channels/registry.ts` | 180 | 内置 Channel 注册表 | +| `extensions/telegram/index.ts` | ~15 | 插件入口 | +| `extensions/telegram/src/channel.ts` | 482 | Telegram ChannelPlugin 实现 | + +### 配置 +| 文件 | 职责 | +|------|------| +| `src/config/types.channels.ts` | Channel 配置汇总 | +| `src/config/types.telegram.ts` | Telegram 专属配置 (~200 行) | From 9bb1fd6e12ec5ff1675c635e46daa8dbb4f273b5 Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Fri, 6 Feb 2026 19:35:25 +0800 Subject: [PATCH 08/33] refactor(channels): rewrite ChannelManager with lastRoute pattern Replace per-conversation agent creation with single Hub agent model. Messages from channels are routed to the existing Hub agent via agent.write(), and replies are sent back through the lastRoute context. Desktop and Gateway paths call clearLastRoute() so channel replies stop when the user switches input surface. Co-Authored-By: Claude Opus 4.6 --- src/channels/manager.ts | 246 +++++++++++++++++++++++++--------------- src/hub/hub.ts | 11 +- 2 files changed, 160 insertions(+), 97 deletions(-) diff --git a/src/channels/manager.ts b/src/channels/manager.ts index 82f440cf..de371f9a 100644 --- a/src/channels/manager.ts +++ b/src/channels/manager.ts @@ -1,14 +1,11 @@ /** - * Channel Manager — orchestrates channel plugin lifecycles and message routing. + * Channel Manager — bridges messaging channels to the Hub's agent. * - * For each configured channel account: - * 1. Starts the gateway adapter (receive messages) - * 2. Routes incoming messages to per-conversation Agents - * 3. Collects Agent responses via MessageAggregator - * 4. Sends responses back via the outbound adapter + * Design: One Hub, one Agent. Channels are just alternative input/output surfaces. + * - Incoming: channel message → agent.write(text) (same as desktop/gateway) + * - Outgoing: agent reply → check lastRoute → forward to originating channel * - * Channel is just a messenger — it doesn't manage context or history. - * That's the Agent's job. + * Uses "last route" pattern: whoever sent the last message gets the reply. */ import type { Hub } from "../hub/hub.js"; @@ -30,12 +27,22 @@ interface AccountHandle { state: ChannelAccountState; } +/** Tracks where the last message came from, so replies go back there. */ +interface LastRoute { + plugin: ChannelPlugin; + deliveryCtx: DeliveryContext; +} + export class ChannelManager { private readonly hub: Hub; /** Running accounts keyed by "channelId:accountId" */ private readonly accounts = new Map(); - /** Agents keyed by "channelId:conversationId" for per-conversation isolation */ - private readonly conversationAgents = new Map(); + /** Where the last channel message came from (reply target) */ + private lastRoute: LastRoute | null = null; + /** Unsubscribe function for the agent subscriber */ + private agentUnsubscribe: (() => void) | null = null; + /** Current aggregator for buffering streaming responses */ + private aggregator: MessageAggregator | null = null; constructor(hub: Hub) { this.hub = hub; @@ -68,6 +75,9 @@ export class ChannelManager { await this.startAccount(plugin.id, accountId, account); } } + + // Subscribe to the Hub's agent for outbound routing + this.subscribeToAgent(); } /** Start a specific channel account */ @@ -100,8 +110,6 @@ export class ChannelManager { console.log(`[Channels] Starting ${key}`); try { - // Start gateway — this begins receiving messages - // The promise may resolve immediately (polling started) or stay pending (long-connection) const startPromise = plugin.gateway.start( accountId, accountConfig, @@ -111,8 +119,6 @@ export class ChannelManager { abortController.signal, ); - // Don't await forever — the start() might be long-running (e.g. polling loop) - // Give it a moment to fail fast if credentials are wrong await Promise.race([ startPromise, new Promise((resolve) => setTimeout(resolve, 3000)), @@ -127,36 +133,102 @@ export class ChannelManager { } } - /** Stop all running channel accounts */ - stopAll(): void { - console.log("[Channels] Stopping all channels..."); - for (const [key, handle] of this.accounts) { - handle.abortController.abort(); - handle.state = { ...handle.state, status: "stopped" }; - console.log(`[Channels] Stopped ${key}`); + /** Get the Hub's current agent (the first active one) */ + private getHubAgent(): AsyncAgent | undefined { + const agentIds = this.hub.listAgents(); + if (agentIds.length === 0) { + console.warn("[Channels] No agent available in Hub"); + return undefined; } - this.accounts.clear(); - this.conversationAgents.clear(); - } - - /** Get or create an Agent for a specific conversation */ - private getOrCreateAgent(channelId: string, conversationId: string): AsyncAgent { - const key = `${channelId}:${conversationId}`; - const existing = this.conversationAgents.get(key); - if (existing && !existing.closed) { - return existing; - } - - const agent = this.hub.createAgent(); - this.conversationAgents.set(key, agent); + const agent = this.hub.getAgent(agentIds[0]!); return agent; } /** - * Route an incoming message to the appropriate Agent and wire the response - * back to the channel via MessageAggregator. - * - * This is the core bridge logic — generalized for any channel. + * Subscribe to the Hub's agent events (once, persistent). + * When AI replies and lastRoute points to a channel, forward the reply there. + */ + private subscribeToAgent(): void { + const agent = this.getHubAgent(); + if (!agent) { + console.warn("[Channels] No agent to subscribe to, channel replies will not be routed"); + return; + } + + console.log(`[Channels] Subscribing to agent ${agent.sessionId} for outbound routing`); + + this.agentUnsubscribe = agent.subscribe((event) => { + // No active channel route — skip (reply goes to desktop/gateway only) + if (!this.lastRoute) return; + + // Handle agent errors — notify the channel user + if (event.type === "agent_error") { + const errorMsg = (event as { error?: string }).error ?? "Unknown error"; + console.error(`[Channels] Agent error: ${errorMsg}`); + const route = this.lastRoute; + if (route) { + void route.plugin.outbound.sendText(route.deliveryCtx, `[Error] ${errorMsg}`).catch((err) => { + console.error(`[Channels] Failed to send error to channel: ${err}`); + }); + } + return; + } + + const maybeMessage = (event as { message?: { role?: string } }).message; + const role = maybeMessage?.role; + + // Only forward assistant message events + if (event.type === "message_start" || event.type === "message_update" || event.type === "message_end") { + if (role !== "assistant") return; + } else { + // Non-message events (tool_execution etc.) — skip for channels + return; + } + + // Ensure aggregator exists for this response + if (event.type === "message_start") { + this.createAggregator(); + } + + if (this.aggregator) { + this.aggregator.handleEvent(event); + } + + // Clean up after response complete + if (event.type === "message_end" && role === "assistant") { + this.aggregator = null; + } + }); + } + + /** Create a fresh aggregator wired to the current lastRoute */ + private createAggregator(): void { + const route = this.lastRoute; + if (!route) return; + + const { plugin, deliveryCtx } = route; + const chunkerConfig = plugin.chunkerConfig ?? DEFAULT_CHUNKER_CONFIG; + + this.aggregator = new MessageAggregator( + chunkerConfig, + async (block) => { + try { + console.log(`[Channels] Sending block ${block.index} (${block.text.length} chars${block.isFinal ? ", final" : ""}) → ${deliveryCtx.channel}:${deliveryCtx.conversationId}`); + if (block.index === 0) { + await plugin.outbound.replyText(deliveryCtx, block.text); + } else { + await plugin.outbound.sendText(deliveryCtx, block.text); + } + } catch (err) { + console.error(`[Channels] Failed to send reply: ${err}`); + } + }, + () => {}, + ); + } + + /** + * Incoming channel message → update lastRoute → forward to Hub's agent. */ private routeIncoming( plugin: ChannelPlugin, @@ -165,67 +237,57 @@ export class ChannelManager { ): void { const { conversationId, senderId, text, messageId } = message; console.log( - `[Channels] Incoming message: channel=${plugin.id} conv=${conversationId} sender=${senderId} text="${text.slice(0, 50)}${text.length > 50 ? "..." : ""}"`, + `[Channels] Incoming: channel=${plugin.id} conv=${conversationId} sender=${senderId} text="${text.slice(0, 50)}${text.length > 50 ? "..." : ""}"`, ); - // Find or create Agent for this conversation - const agent = this.getOrCreateAgent(plugin.id, conversationId); - const isNew = !this.conversationAgents.has(`${plugin.id}:${conversationId}`) ? "new" : "existing"; - console.log(`[Channels] Routing to agent: key=${plugin.id}:${conversationId} agentId=${agent.sessionId} (${isNew})`); + const agent = this.getHubAgent(); + if (!agent) { + console.error("[Channels] No agent available, dropping message"); + return; + } - // Build delivery context for outbound replies - const deliveryCtx: DeliveryContext = { - channel: plugin.id, - accountId, - conversationId, - replyToMessageId: messageId, + // Update last route — replies will go back here + this.lastRoute = { + plugin, + deliveryCtx: { + channel: plugin.id, + accountId, + conversationId, + replyToMessageId: messageId, + }, }; + console.log(`[Channels] lastRoute updated → ${plugin.id}:${conversationId}`); + console.log(`[Channels] Forwarding to agent ${agent.sessionId}`); - // Use channel-specific chunker config or defaults - const chunkerConfig = plugin.chunkerConfig ?? DEFAULT_CHUNKER_CONFIG; - - // Create a fresh aggregator for this message's response - const aggregator = new MessageAggregator( - chunkerConfig, - async (block) => { - try { - console.log(`[Channels] Block ${block.index} ready (${block.text.length} chars${block.isFinal ? ", final" : ""}), sending reply`); - if (block.index === 0) { - await plugin.outbound.replyText(deliveryCtx, block.text); - } else { - await plugin.outbound.sendText(deliveryCtx, block.text); - } - if (block.isFinal) { - console.log(`[Channels] Response complete: channel=${plugin.id} conv=${conversationId} blocks=${block.index + 1}`); - } - } catch (err) { - console.error(`[Channels] Failed to send reply: ${err}`); - } - }, - (_event) => { - // Pass-through events (tool_execution, compaction, etc.) - // Could add typing indicators per-channel later - }, - ); - - // Subscribe to agent events BEFORE writing the message - console.log("[Channels] Agent subscribed, sending message to agent"); - const unsubscribe = agent.subscribe((event) => { - aggregator.handleEvent(event); - - // Unsubscribe after the response is complete - if (event.type === "message_end") { - const maybeMessage = (event as { message?: { role?: string } }).message; - if (maybeMessage?.role === "assistant") { - unsubscribe(); - } - } - }); - - // Send user message to the agent + // Same as typing in the desktop chat agent.write(text); } + /** Stop all running channel accounts */ + stopAll(): void { + console.log("[Channels] Stopping all channels..."); + if (this.agentUnsubscribe) { + this.agentUnsubscribe(); + this.agentUnsubscribe = null; + } + for (const [key, handle] of this.accounts) { + handle.abortController.abort(); + handle.state = { ...handle.state, status: "stopped" }; + console.log(`[Channels] Stopped ${key}`); + } + this.accounts.clear(); + this.lastRoute = null; + this.aggregator = null; + } + + /** Clear the last route (e.g. when desktop user sends a message) */ + clearLastRoute(): void { + if (this.lastRoute) { + console.log("[Channels] lastRoute cleared (non-channel message received)"); + this.lastRoute = null; + } + } + /** Get status of all accounts */ listAccountStates(): ChannelAccountState[] { return Array.from(this.accounts.values()).map((h) => ({ ...h.state })); diff --git a/src/hub/hub.ts b/src/hub/hub.ts index 2bee8078..780a3c1b 100644 --- a/src/hub/hub.ts +++ b/src/hub/hub.ts @@ -197,6 +197,7 @@ export class Hub { const agent = this.agents.get(agentId); if (agent && !agent.closed) { this.agentSenders.set(agentId, msg.from); + this.channelManager.clearLastRoute(); agent.write(content); } else { console.warn(`[Hub] Agent not found or closed: ${agentId}`); @@ -323,12 +324,12 @@ export class Hub { content: item.content, }); } else { - // Compaction events: forward with synthetic streamId (no stream tracking) - const isCompactionEvent = - item.type === "compaction_start" || item.type === "compaction_end"; - if (isCompactionEvent) { + // Passthrough events: forward with synthetic streamId (no stream tracking) + const isPassthroughEvent = + item.type === "compaction_start" || item.type === "compaction_end" || item.type === "agent_error"; + if (isPassthroughEvent) { this.client.send(targetDeviceId, StreamAction, { - streamId: `compaction:${agent.sessionId}`, + streamId: `system:${agent.sessionId}`, agentId: agent.sessionId, event: item, }); From 4a503ecf4c9e1c855f54776e4ef256b9ce67ba45 Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Fri, 6 Feb 2026 19:35:32 +0800 Subject: [PATCH 09/33] feat(agent): add AgentErrorEvent for error propagation via subscribe Add agent_error event type to MulticaEvent union so errors from agent runs reach subscribe() consumers (Desktop IPC + Channel). Make emitMulticaEvent public on Runner so AsyncAgent can emit errors. Co-Authored-By: Claude Opus 4.6 --- src/agent/async-agent.ts | 4 ++++ src/agent/events.ts | 8 +++++++- src/agent/runner.ts | 2 +- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts index db22b843..910e2742 100644 --- a/src/agent/async-agent.ts +++ b/src/agent/async-agent.ts @@ -48,12 +48,16 @@ export class AsyncAgent { await this.agent.flushSession(); // Normal text is delivered via message_end event; only handle errors here if (result.error) { + console.error(`[AsyncAgent] Agent run error: ${result.error}`); this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` }); + this.agent.emitMulticaEvent({ type: "agent_error", error: result.error }); } }) .catch((err) => { const message = err instanceof Error ? err.message : String(err); + console.error(`[AsyncAgent] Agent run exception: ${message}`); this.channel.send({ id: uuidv7(), content: `[error] ${message}` }); + this.agent.emitMulticaEvent({ type: "agent_error", error: message }); }); } diff --git a/src/agent/events.ts b/src/agent/events.ts index 8eb8b422..de46f02a 100644 --- a/src/agent/events.ts +++ b/src/agent/events.ts @@ -26,5 +26,11 @@ export type CompactionEndEvent = { reason: "count" | "tokens" | "summary" | "pruning"; }; +/** Emitted when the agent encounters an error (LLM failure, quota exceeded, etc.) */ +export type AgentErrorEvent = { + type: "agent_error"; + error: string; +}; + /** Union of all Multica-specific events */ -export type MulticaEvent = CompactionStartEvent | CompactionEndEvent; +export type MulticaEvent = CompactionStartEvent | CompactionEndEvent | AgentErrorEvent; diff --git a/src/agent/runner.ts b/src/agent/runner.ts index 65fd528b..479cd473 100644 --- a/src/agent/runner.ts +++ b/src/agent/runner.ts @@ -342,7 +342,7 @@ export class Agent { }; } - private emitMulticaEvent(event: MulticaEvent): void { + emitMulticaEvent(event: MulticaEvent): void { for (const fn of this.multicaListeners) { try { fn(event); From 72598322d14992004c8ede446ef919573ebf41c7 Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Fri, 6 Feb 2026 19:35:39 +0800 Subject: [PATCH 10/33] feat(desktop): handle agent_error events and clearLastRoute in IPC Forward agent_error as passthrough event to renderer. Add clearLastRoute() calls in hub:sendMessage and localChat:send handlers so channel replies stop when desktop sends a message. Handle agent_error in use-local-chat to show error UI. Co-Authored-By: Claude Opus 4.6 --- apps/desktop/electron/ipc/hub.ts | 12 +++++++----- apps/desktop/src/hooks/use-local-chat.ts | 8 ++++++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/apps/desktop/electron/ipc/hub.ts b/apps/desktop/electron/ipc/hub.ts index b038efe8..288c4ef0 100644 --- a/apps/desktop/electron/ipc/hub.ts +++ b/apps/desktop/electron/ipc/hub.ts @@ -236,6 +236,7 @@ export function registerHubIpcHandlers(): void { if (agent.closed) { return { error: `Agent is closed: ${agentId}` } } + h.channelManager.clearLastRoute() agent.write(content) return { ok: true } }) @@ -268,11 +269,11 @@ export function registerHubIpcHandlers(): void { return } - // Compaction events: forward with no stream tracking - const isCompactionEvent = - event.type === 'compaction_start' || event.type === 'compaction_end' - if (isCompactionEvent) { - safeLog(`[IPC] Sending compaction event to renderer: ${event.type}`) + // Compaction and error events: forward with no stream tracking + const isPassthroughEvent = + event.type === 'compaction_start' || event.type === 'compaction_end' || event.type === 'agent_error' + if (isPassthroughEvent) { + safeLog(`[IPC] Sending ${event.type} event to renderer`) mainWindowRef.webContents.send('localChat:event', { agentId, streamId: null, @@ -384,6 +385,7 @@ export function registerHubIpcHandlers(): void { return { error: 'Not subscribed to agent events. Call subscribe first.' } } + h.channelManager.clearLastRoute() agent.write(content) safeLog(`[IPC] Local chat message sent to agent: ${agentId}`) return { ok: true } diff --git a/apps/desktop/src/hooks/use-local-chat.ts b/apps/desktop/src/hooks/use-local-chat.ts index cf637f25..ec7e9677 100644 --- a/apps/desktop/src/hooks/use-local-chat.ts +++ b/apps/desktop/src/hooks/use-local-chat.ts @@ -56,6 +56,14 @@ export function useLocalChat() { const payload = data as unknown as StreamPayload if (!payload.event) return + // Handle agent errors as transient UI feedback (not persisted to history) + if (payload.event.type === 'agent_error') { + const errorMsg = (payload.event as { error?: string }).error ?? 'Unknown error' + chatRef.current.setError({ code: 'AGENT_ERROR', message: errorMsg }) + setIsLoading(false) + return + } + chatRef.current.handleStream(payload) if (payload.event.type === 'message_start') setIsLoading(true) if (payload.event.type === 'message_end') setIsLoading(false) From e99600c3562cc49b690dc62f640a176830be621f Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Fri, 6 Feb 2026 19:35:46 +0800 Subject: [PATCH 11/33] fix(desktop): register ChatPage element in router The /chat route was missing its element prop, causing React Router to render an empty Outlet. Co-Authored-By: Claude Opus 4.6 --- apps/desktop/src/App.tsx | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/desktop/src/App.tsx b/apps/desktop/src/App.tsx index b67f362f..df6ae8b1 100644 --- a/apps/desktop/src/App.tsx +++ b/apps/desktop/src/App.tsx @@ -1,6 +1,7 @@ import { createHashRouter, RouterProvider } from 'react-router-dom' import Layout from './pages/layout' import HomePage from './pages/home' +import ChatPage from './pages/chat' import ToolsPage from './pages/tools' import SkillsPage from './pages/skills' @@ -10,7 +11,7 @@ const router = createHashRouter([ element: , children: [ { index: true, element: }, - { path: 'chat' }, + { path: 'chat', element: }, { path: 'tools', element: }, { path: 'skills', element: }, ], From be71614cf57be4b6ab90adf54e805e429f5b8569 Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Fri, 6 Feb 2026 19:35:51 +0800 Subject: [PATCH 12/33] docs(channels): add message paths documentation Document the three independent message paths (Desktop IPC, Web WebSocket, Channel Bot API) including send/receive flows, error handling, lastRoute pattern, and event filtering comparison. Co-Authored-By: Claude Opus 4.6 --- docs/message-paths.md | 232 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 232 insertions(+) create mode 100644 docs/message-paths.md diff --git a/docs/message-paths.md b/docs/message-paths.md new file mode 100644 index 00000000..2b8d24c7 --- /dev/null +++ b/docs/message-paths.md @@ -0,0 +1,232 @@ +# Message Paths — Desktop / Web / Channel + +Three independent paths deliver messages to and from the Hub's agent. +All three share the same `AsyncAgent` instance — they are just different I/O surfaces. + +--- + +## Overview + +``` +Desktop (Electron IPC) Web (WebSocket via Gateway) Channel (Bot API, e.g. Telegram) + │ │ │ + ▼ ▼ ▼ + localChat:send IPC client.send → Gateway WS plugin.gateway (polling/webhook) + │ │ │ + ▼ ▼ ▼ + hub.ts / ipc/hub.ts hub.ts / onMessage manager.ts / routeIncoming + clearLastRoute() clearLastRoute() set lastRoute + │ │ │ + └────────────────► agent.write(text) ◄──────────────────────────────┘ + │ + ▼ + AsyncAgent.run() + │ + ┌────────────┴────────────────┐ + ▼ ▼ + agent.subscribe() agent.read() + (multi-consumer) (single-consumer iterable) + │ │ + ┌────────┴────────┐ ▼ + ▼ ▼ hub.ts / consumeAgent() + Desktop IPC Channel Manager │ + (ipc/hub.ts) (manager.ts) ▼ + │ │ Gateway WS → Web client + ▼ ▼ + localChat:event Bot API reply + → renderer (via lastRoute) +``` + +--- + +## Path 1: Desktop (Electron IPC) + +### Send (User → Agent) + +``` +Renderer: sendMessage(text) + → IPC: localChat:send + → ipc/hub.ts handler + → hub.channelManager.clearLastRoute() // reply stays in desktop + → agent.write(text) +``` + +**File**: `apps/desktop/electron/ipc/hub.ts` — `localChat:send` handler (line ~373) + +### Receive (Agent → User) + +``` +Agent runs LLM + → pi-agent-core fires AgentEvent + → Agent.subscribeAll() → AsyncAgent channel + subscribers + → agent.subscribe() callback in ipc/hub.ts + → Filter: assistant messages + tool_execution + passthrough (compaction, agent_error) + → IPC: mainWindow.webContents.send('localChat:event', { agentId, streamId, event }) + → Renderer: use-local-chat.ts onEvent callback + → chat.handleStream(payload) +``` + +**Files**: +- `apps/desktop/electron/ipc/hub.ts` — `localChat:subscribe` handler (line ~248) +- `apps/desktop/src/hooks/use-local-chat.ts` — `onEvent` listener (line ~54) +- `packages/hooks/src/use-chat.ts` — `handleStream()` (line ~133) + +### Error Handling + +``` +Agent.run() throws / returns error + → AsyncAgent.write() catch block + → channel.send(legacy Message) // for read() consumers (Web) + → agent.emitMulticaEvent({ type: "agent_error", error }) // for subscribe() consumers + → ipc/hub.ts subscriber → passthrough event → localChat:event + → use-local-chat.ts → chat.setError() + setIsLoading(false) +``` + +--- + +## Path 2: Web (WebSocket via Gateway) + +### Send (User → Agent) + +``` +Web app: sendMessage(text) + → GatewayClient.send(hubId, "message", { agentId, content }) + → Socket.io → Gateway server → routes to Hub device + → hub.ts / onMessage handler + → channelManager.clearLastRoute() // reply stays in gateway + → agentSenders.set(agentId, deviceId) + → agent.write(content) +``` + +**File**: `src/hub/hub.ts` — `onMessage` handler (line ~154) + +### Receive (Agent → User) + +``` +Agent runs LLM + → pi-agent-core fires AgentEvent + → Agent.subscribeAll() → AsyncAgent channel + subscribers + → agent.read() consumed by hub.ts / consumeAgent() + → Filter: assistant messages + tool_execution + passthrough (compaction, agent_error) + → client.send(targetDeviceId, StreamAction, { streamId, agentId, event }) + → Socket.io → Gateway → routes to Web client device + → GatewayClient.onMessage callback + → use-gateway-chat.ts → chat.handleStream(payload) +``` + +**Files**: +- `src/hub/hub.ts` — `consumeAgent()` (line ~314) +- `packages/hooks/src/use-gateway-chat.ts` — `onMessage` listener (line ~50) +- `packages/hooks/src/use-chat.ts` — `handleStream()` (line ~133) + +### Error Handling + +``` +Agent.run() throws / returns error + → AsyncAgent.write() catch block + → channel.send(legacy Message) // consumed by consumeAgent() → sent as "message" action + → agent.emitMulticaEvent({ type: "agent_error", error }) + → read() → consumeAgent() → passthrough event → StreamAction + → GatewayClient → use-gateway-chat.ts → chat.setError() + setIsLoading(false) +``` + +**Note**: Legacy error Messages also reach the Web client as `"message"` action (a plain text fallback). The `agent_error` event provides structured error info for proper UI rendering. + +--- + +## Path 3: Channel (Bot API, e.g. Telegram) + +### Send (User → Agent) + +``` +User sends message in Telegram + → grammy long-polling receives Update + → plugin.gateway.start() callback: onMessage(channelMessage) + → ChannelManager.routeIncoming() + → Set lastRoute = { plugin, deliveryCtx } // reply goes back to Telegram + → agent.write(text) // same as desktop/web +``` + +**File**: `src/channels/manager.ts` — `routeIncoming()` (line ~233) + +### Receive (Agent → User) + +``` +Agent runs LLM + → pi-agent-core fires AgentEvent + → Agent.subscribeAll() → AsyncAgent channel + subscribers + → agent.subscribe() callback in ChannelManager.subscribeToAgent() + → Check: if (!lastRoute) return // no active channel route, skip + → Filter: only assistant messages + → message_start → createAggregator() // MessageAggregator buffers/chunks text + → message_update → aggregator.handleEvent() + → message_end → aggregator.handleEvent() → null aggregator + → Aggregator emits text blocks + → Block 0: plugin.outbound.replyText(deliveryCtx, text) // Telegram reply + → Block N: plugin.outbound.sendText(deliveryCtx, text) // follow-up messages +``` + +**Files**: +- `src/channels/manager.ts` — `subscribeToAgent()` (line ~151), `createAggregator()` (line ~205) +- `src/hub/message-aggregator.ts` — text chunking/buffering logic + +### Error Handling + +``` +Agent.run() throws / returns error + → AsyncAgent.write() catch block + → agent.emitMulticaEvent({ type: "agent_error", error }) + → subscribe() → ChannelManager subscriber + → if lastRoute exists: + → plugin.outbound.sendText(deliveryCtx, "[Error] ${errorMsg}") +``` + +--- + +## Comparison Table + +| Aspect | Desktop (IPC) | Web (WebSocket) | Channel (Bot API) | +|---------------------|------------------------|---------------------------|--------------------------| +| **Transport** | Electron IPC | Socket.io via Gateway | Bot API (HTTP) | +| **Send entry** | `localChat:send` | `client.send` → Gateway | `routeIncoming` | +| **Receive method** | `agent.subscribe()` | `agent.read()` (iterable) | `agent.subscribe()` | +| **Consumer** | ipc/hub.ts subscriber | hub.ts `consumeAgent()` | manager.ts subscriber | +| **Frontend hook** | `use-local-chat.ts` | `use-gateway-chat.ts` | N/A (Bot API) | +| **State hook** | `use-chat.ts` | `use-chat.ts` | N/A | +| **Reply routing** | Always (IPC channel) | `agentSenders` Map | `lastRoute` pattern | +| **clearLastRoute** | Yes (on send) | Yes (on send) | No (sets lastRoute) | +| **Error display** | `agent_error` → UI | `agent_error` → UI | `agent_error` → Bot text | +| **Tool results** | Rendered in UI | Rendered in UI | Skipped (text only) | +| **Text chunking** | No (full stream) | No (full stream) | Yes (MessageAggregator) | + +--- + +## lastRoute Pattern + +The `lastRoute` tracks which channel last sent a message. When the agent replies: +- If `lastRoute` is set → reply goes to that channel (e.g. Telegram) +- If `lastRoute` is null → reply goes to Desktop/Web only (via their own mechanisms) + +**Clearing**: Desktop and Web both call `channelManager.clearLastRoute()` before `agent.write()`, so channel replies stop when the user switches to desktop/web. + +**Setting**: `routeIncoming()` sets `lastRoute` when a channel message arrives. + +Desktop and Web always receive agent events regardless of `lastRoute` — they use their own independent delivery mechanisms (IPC subscribe / Gateway read). + +--- + +## Event Filtering + +All three paths filter raw agent events. Only these are forwarded to consumers: + +| Event Type | Desktop | Web | Channel | +|-------------------------|---------|-----|---------| +| `message_start` | assistant only | assistant only | assistant only | +| `message_update` | assistant only | assistant only | assistant only | +| `message_end` | assistant only | assistant only | assistant only | +| `tool_execution_start` | Yes | Yes | No | +| `tool_execution_end` | Yes | Yes | No | +| `compaction_start` | Yes (passthrough) | Yes (passthrough) | No | +| `compaction_end` | Yes (passthrough) | Yes (passthrough) | No | +| `agent_error` | Yes (passthrough) | Yes (passthrough) | Yes (→ text) | +| User message events | Filtered out | Filtered out | Filtered out | From 56ebe613dbf009fe688d6af0e3d75a625e5aeabf Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Mon, 9 Feb 2026 07:30:35 +0800 Subject: [PATCH 13/33] fix(hooks): handle agent_error events in useGatewayChat Desktop path already forwards agent_error to chat.setError() via use-local-chat.ts, but the Web/Gateway path was missing this handling. Add agent_error interception in the StreamAction branch so Web clients render LLM errors the same way Desktop does. Co-Authored-By: Claude Opus 4.6 --- packages/hooks/src/use-gateway-chat.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/packages/hooks/src/use-gateway-chat.ts b/packages/hooks/src/use-gateway-chat.ts index 4613bbfb..1871de28 100644 --- a/packages/hooks/src/use-gateway-chat.ts +++ b/packages/hooks/src/use-gateway-chat.ts @@ -50,6 +50,12 @@ export function useGatewayChat({ client, hubId, agentId }: UseGatewayChatOptions client.onMessage((msg) => { if (msg.action === StreamAction) { const payload = msg.payload as StreamPayload; + if (payload.event.type === "agent_error") { + const errorMsg = (payload.event as { error?: string }).error ?? "Unknown error"; + chat.setError({ code: "AGENT_ERROR", message: errorMsg }); + setIsLoading(false); + return; + } chat.handleStream(payload); if (payload.event.type === "message_start") setIsLoading(true); if (payload.event.type === "message_end") setIsLoading(false); From 1819f4196d22b04ee7e2c13521cd4611f4fa7ecd Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Mon, 9 Feb 2026 08:04:13 +0800 Subject: [PATCH 14/33] fix(sdk): add AgentErrorEvent to StreamPayload type StreamPayload.event was missing the agent_error event type, causing TypeScript errors in useGatewayChat and useLocalChat where the comparison payload.event.type === "agent_error" had no type overlap. Co-Authored-By: Claude Opus 4.6 --- packages/sdk/src/actions/index.ts | 1 + packages/sdk/src/actions/stream.ts | 12 +++++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/packages/sdk/src/actions/index.ts b/packages/sdk/src/actions/index.ts index c378a6d6..b3ef94e3 100644 --- a/packages/sdk/src/actions/index.ts +++ b/packages/sdk/src/actions/index.ts @@ -38,6 +38,7 @@ export { type CompactionEvent, type CompactionStartEvent, type CompactionEndEvent, + type AgentErrorEvent, type ContentBlock, type TextContent, type ThinkingContent, diff --git a/packages/sdk/src/actions/stream.ts b/packages/sdk/src/actions/stream.ts index c0249353..19aa9482 100644 --- a/packages/sdk/src/actions/stream.ts +++ b/packages/sdk/src/actions/stream.ts @@ -45,16 +45,22 @@ export type CompactionEndEvent = { /** Union of all compaction events */ export type CompactionEvent = CompactionStartEvent | CompactionEndEvent; +/** Emitted when the agent encounters an error (LLM failure, quota exceeded, etc.) */ +export type AgentErrorEvent = { + type: "agent_error"; + error: string; +}; + // --- Stream event types --- /** - * Hub forwards AgentEvent from pi-agent-core and CompactionEvent as-is. - * StreamPayload wraps them with routing metadata. + * Hub forwards AgentEvent from pi-agent-core, CompactionEvent, + * and AgentErrorEvent as-is. StreamPayload wraps them with routing metadata. */ export interface StreamPayload { streamId: string; agentId: string; - event: AgentEvent | CompactionEvent; + event: AgentEvent | CompactionEvent | AgentErrorEvent; } /** Extract thinking/reasoning content from an AgentEvent that carries a message */ From 54b3ebe9e951592d43b587e86450371ba2468174 Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Mon, 9 Feb 2026 08:04:19 +0800 Subject: [PATCH 15/33] feat(ui): render error messages with Markdown Use MemoizedMarkdown for error messages so links are clickable instead of plain text. Co-Authored-By: Claude Opus 4.6 --- packages/ui/src/components/chat-view.tsx | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/packages/ui/src/components/chat-view.tsx b/packages/ui/src/components/chat-view.tsx index 89c6217a..07c5a197 100644 --- a/packages/ui/src/components/chat-view.tsx +++ b/packages/ui/src/components/chat-view.tsx @@ -5,6 +5,7 @@ import { Button } from "@multica/ui/components/ui/button"; import { Skeleton } from "@multica/ui/components/ui/skeleton"; import { ChatInput } from "@multica/ui/components/chat-input"; import { MessageList } from "@multica/ui/components/message-list"; +import { MemoizedMarkdown } from "@multica/ui/components/markdown"; import { MulticaIcon } from "@multica/ui/components/multica-icon"; import { ExecApprovalItem } from "@multica/ui/components/exec-approval-item"; import { useScrollFade } from "@multica/ui/hooks/use-scroll-fade"; @@ -218,7 +219,11 @@ export function ChatView({ {error && (
- {error.message} + + + {error.message} + + {onDisconnect && (
From 0542c82fe60d5a5b5786ef99fc81c77230b2d83e Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Mon, 9 Feb 2026 12:50:15 +0800 Subject: [PATCH 31/33] feat(channels): add credential write and per-account lifecycle control Add setChannelAccountConfig/removeChannelAccountConfig to CredentialManager for persisting channel tokens. Make ChannelManager.startAccount public and add stopAccount for individual account lifecycle control via IPC. Co-Authored-By: Claude Opus 4.6 --- src/agent/credentials.ts | 69 ++++++++++++++++++++++++++++++++++++++++ src/channels/manager.ts | 22 +++++++++++-- 2 files changed, 89 insertions(+), 2 deletions(-) diff --git a/src/agent/credentials.ts b/src/agent/credentials.ts index 5abeca35..aabf3c6b 100644 --- a/src/agent/credentials.ts +++ b/src/agent/credentials.ts @@ -330,6 +330,75 @@ export class CredentialManager { this.reset(); } + /** + * Set a channel account config and save to credentials.json5. + * Creates the channels section if it doesn't exist. + * Used by the desktop Channels page to persist bot tokens. + */ + setChannelAccountConfig(channelId: string, accountId: string, accountConfig: Record): void { + const path = getCredentialsPath(); + + let config: CredentialsConfig = { version: 1 }; + if (existsSync(path)) { + try { + const raw = readFileSync(path, "utf8"); + config = JSON5.parse(raw) as CredentialsConfig; + } catch { + config = { version: 1 }; + } + } + + // Ensure channels.[channelId] structure exists + if (!config.channels) { + config.channels = {}; + } + if (!config.channels[channelId]) { + config.channels[channelId] = {}; + } + + // Set or update the account config + config.channels[channelId]![accountId] = accountConfig; + + mkdirSync(dirname(path), { recursive: true }); + const content = JSON.stringify(config, null, 2); + writeFileSync(path, content, "utf8"); + + this.reset(); + } + + /** + * Remove a channel account config from credentials.json5. + * Cleans up the parent channel section if no accounts remain. + */ + removeChannelAccountConfig(channelId: string, accountId: string): void { + const path = getCredentialsPath(); + if (!existsSync(path)) return; + + let config: CredentialsConfig; + try { + const raw = readFileSync(path, "utf8"); + config = JSON5.parse(raw) as CredentialsConfig; + } catch { + return; + } + + const channelSection = config.channels?.[channelId]; + if (!channelSection) return; + + delete channelSection[accountId]; + + // Clean up empty channel section + if (Object.keys(channelSection).length === 0) { + delete config.channels![channelId]; + } + + mkdirSync(dirname(path), { recursive: true }); + const content = JSON.stringify(config, null, 2); + writeFileSync(path, content, "utf8"); + + this.reset(); + } + /** * Set the default LLM provider and save to credentials.json5. */ diff --git a/src/channels/manager.ts b/src/channels/manager.ts index 213b6dc8..fb094065 100644 --- a/src/channels/manager.ts +++ b/src/channels/manager.ts @@ -98,8 +98,11 @@ export class ChannelManager { this.ensureSubscribed(); } - /** Start a specific channel account */ - private async startAccount( + /** + * Start a specific channel account. + * Public so the desktop IPC layer can call it after saving config. + */ + async startAccount( channelId: string, accountId: string, accountConfig: Record, @@ -433,6 +436,21 @@ export class ChannelManager { } } + /** + * Stop a specific channel account. + * Public so the desktop IPC layer can call it when removing config. + */ + stopAccount(channelId: string, accountId: string): void { + const key = `${channelId}:${accountId}`; + const handle = this.accounts.get(key); + if (!handle) return; + + handle.abortController.abort(); + handle.state = { ...handle.state, status: "stopped" }; + this.accounts.delete(key); + console.log(`[Channels] Stopped ${key}`); + } + /** Stop all running channel accounts */ stopAll(): void { console.log("[Channels] Stopping all channels..."); From c99675b6e449d75530622770b112877c6a21b900 Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Mon, 9 Feb 2026 12:50:24 +0800 Subject: [PATCH 32/33] feat(desktop): add Channels configuration page with Telegram support Add IPC handlers, preload API, useChannels hook, and Channels page UI. Users can save/remove Telegram bot tokens and start/stop bots directly from the desktop app with immediate effect and persistence across restarts. Co-Authored-By: Claude Opus 4.6 --- apps/desktop/electron/electron-env.d.ts | 15 ++ apps/desktop/electron/ipc/channels.ts | 133 ++++++++++++++++++ apps/desktop/electron/ipc/index.ts | 3 + apps/desktop/electron/preload.ts | 20 +++ apps/desktop/src/App.tsx | 2 + apps/desktop/src/hooks/use-channels.ts | 121 ++++++++++++++++ apps/desktop/src/pages/channels.tsx | 175 ++++++++++++++++++++++++ apps/desktop/src/pages/layout.tsx | 2 + 8 files changed, 471 insertions(+) create mode 100644 apps/desktop/electron/ipc/channels.ts create mode 100644 apps/desktop/src/hooks/use-channels.ts create mode 100644 apps/desktop/src/pages/channels.tsx diff --git a/apps/desktop/electron/electron-env.d.ts b/apps/desktop/electron/electron-env.d.ts index d9a5846e..b53664a4 100644 --- a/apps/desktop/electron/electron-env.d.ts +++ b/apps/desktop/electron/electron-env.d.ts @@ -132,6 +132,13 @@ interface CurrentProviderInfo { available: boolean } +interface ChannelAccountStateInfo { + channelId: string + accountId: string + status: 'stopped' | 'starting' | 'running' | 'error' + error?: string +} + interface ElectronAPI { hub: { init: () => Promise @@ -190,6 +197,14 @@ interface ElectronAPI { saveApiKey: (providerId: string, apiKey: string) => Promise<{ ok: boolean; error?: string }> importOAuth: (providerId: string) => Promise<{ ok: boolean; expiresAt?: number; error?: string }> } + channels: { + listStates: () => Promise + getConfig: () => Promise> | undefined>> + saveToken: (channelId: string, accountId: string, token: string) => Promise<{ ok: boolean; error?: string }> + removeToken: (channelId: string, accountId: string) => Promise<{ ok: boolean; error?: string }> + stop: (channelId: string, accountId: string) => Promise<{ ok: boolean }> + start: (channelId: string, accountId: string) => Promise<{ ok: boolean; error?: string }> + } localChat: { subscribe: (agentId: string) => Promise<{ ok?: boolean; error?: string; alreadySubscribed?: boolean }> unsubscribe: (agentId: string) => Promise<{ ok: boolean }> diff --git a/apps/desktop/electron/ipc/channels.ts b/apps/desktop/electron/ipc/channels.ts new file mode 100644 index 00000000..dd85ef74 --- /dev/null +++ b/apps/desktop/electron/ipc/channels.ts @@ -0,0 +1,133 @@ +/** + * Channel IPC handlers for Electron main process. + * + * Manages channel account configuration, start/stop lifecycle. + * The Channels page in the renderer uses these to configure + * Telegram (and future channels) with immediate effect. + */ +import { ipcMain } from 'electron' +import { getCurrentHub } from './hub.js' +import { credentialManager } from '../../../../src/agent/credentials.js' +import { listChannels } from '../../../../src/channels/registry.js' + +/** + * Register all Channel-related IPC handlers. + */ +export function registerChannelsIpcHandlers(): void { + /** + * List all channel account states (running / stopped / error). + */ + ipcMain.handle('channels:listStates', async () => { + const hub = getCurrentHub() + if (!hub) return [] + return hub.channelManager.listAccountStates() + }) + + /** + * Get the channels config from credentials.json5. + * Returns the raw `channels` section: { telegram: { default: { botToken: "..." } } } + */ + ipcMain.handle('channels:getConfig', async () => { + return credentialManager.getChannelsConfig() + }) + + /** + * Save a channel account token and start the bot immediately. + * Flow: write to credentials.json5 → start the channel account. + */ + ipcMain.handle( + 'channels:saveToken', + async (_event, channelId: string, accountId: string, token: string): Promise<{ ok: boolean; error?: string }> => { + try { + const hub = getCurrentHub() + if (!hub) return { ok: false, error: 'Hub not initialized' } + + // Find the plugin to validate channelId + const plugin = listChannels().find((p) => p.id === channelId) + if (!plugin) return { ok: false, error: `Unknown channel: ${channelId}` } + + // Persist config to credentials.json5 + credentialManager.setChannelAccountConfig(channelId, accountId, { botToken: token }) + console.log(`[IPC] Channel config saved: ${channelId}:${accountId}`) + + // Stop existing account if running (e.g. token update) + hub.channelManager.stopAccount(channelId, accountId) + + // Start the account with the new config + await hub.channelManager.startAccount(channelId, accountId, { botToken: token }) + console.log(`[IPC] Channel started: ${channelId}:${accountId}`) + + return { ok: true } + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + console.error(`[IPC] Failed to save channel token: ${message}`) + return { ok: false, error: message } + } + } + ) + + /** + * Remove a channel account token and stop the bot. + */ + ipcMain.handle( + 'channels:removeToken', + async (_event, channelId: string, accountId: string): Promise<{ ok: boolean; error?: string }> => { + try { + const hub = getCurrentHub() + if (!hub) return { ok: false, error: 'Hub not initialized' } + + // Stop the account + hub.channelManager.stopAccount(channelId, accountId) + + // Remove from credentials.json5 + credentialManager.removeChannelAccountConfig(channelId, accountId) + console.log(`[IPC] Channel config removed: ${channelId}:${accountId}`) + + return { ok: true } + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + console.error(`[IPC] Failed to remove channel token: ${message}`) + return { ok: false, error: message } + } + } + ) + + /** + * Stop a channel account without removing its config. + */ + ipcMain.handle( + 'channels:stop', + async (_event, channelId: string, accountId: string): Promise<{ ok: boolean }> => { + const hub = getCurrentHub() + if (!hub) return { ok: false } + hub.channelManager.stopAccount(channelId, accountId) + return { ok: true } + } + ) + + /** + * Start a channel account using its saved config. + */ + ipcMain.handle( + 'channels:start', + async (_event, channelId: string, accountId: string): Promise<{ ok: boolean; error?: string }> => { + try { + const hub = getCurrentHub() + if (!hub) return { ok: false, error: 'Hub not initialized' } + + // Read config from credentials + const config = credentialManager.getChannelsConfig() + const accountConfig = config[channelId]?.[accountId] + if (!accountConfig) { + return { ok: false, error: `No config found for ${channelId}:${accountId}` } + } + + await hub.channelManager.startAccount(channelId, accountId, accountConfig) + return { ok: true } + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + return { ok: false, error: message } + } + } + ) +} diff --git a/apps/desktop/electron/ipc/index.ts b/apps/desktop/electron/ipc/index.ts index fc11179c..7c7c596e 100644 --- a/apps/desktop/electron/ipc/index.ts +++ b/apps/desktop/electron/ipc/index.ts @@ -6,12 +6,14 @@ export { registerSkillsIpcHandlers } from './skills.js' export { registerHubIpcHandlers, cleanupHub, initializeHub, setupDeviceConfirmation } from './hub.js' export { registerProfileIpcHandlers } from './profile.js' export { registerProviderIpcHandlers } from './provider.js' +export { registerChannelsIpcHandlers } from './channels.js' import { registerAgentIpcHandlers, cleanupAgent } from './agent.js' import { registerSkillsIpcHandlers } from './skills.js' import { registerHubIpcHandlers, cleanupHub, initializeHub } from './hub.js' import { registerProfileIpcHandlers } from './profile.js' import { registerProviderIpcHandlers } from './provider.js' +import { registerChannelsIpcHandlers } from './channels.js' /** * Register all IPC handlers. @@ -23,6 +25,7 @@ export function registerAllIpcHandlers(): void { registerSkillsIpcHandlers() registerProfileIpcHandlers() registerProviderIpcHandlers() + registerChannelsIpcHandlers() } /** diff --git a/apps/desktop/electron/preload.ts b/apps/desktop/electron/preload.ts index a0d3d00e..6c963a94 100644 --- a/apps/desktop/electron/preload.ts +++ b/apps/desktop/electron/preload.ts @@ -201,6 +201,26 @@ const electronAPI = { ipcRenderer.invoke('provider:importOAuth', providerId), }, + // Channel management (Telegram, Discord, etc.) + channels: { + /** List all channel account states */ + listStates: () => ipcRenderer.invoke('channels:listStates'), + /** Get channels config from credentials.json5 */ + getConfig: () => ipcRenderer.invoke('channels:getConfig'), + /** Save a channel token and start the bot immediately */ + saveToken: (channelId: string, accountId: string, token: string) => + ipcRenderer.invoke('channels:saveToken', channelId, accountId, token), + /** Remove a channel token and stop the bot */ + removeToken: (channelId: string, accountId: string) => + ipcRenderer.invoke('channels:removeToken', channelId, accountId), + /** Stop a channel account */ + stop: (channelId: string, accountId: string) => + ipcRenderer.invoke('channels:stop', channelId, accountId), + /** Start a channel account from saved config */ + start: (channelId: string, accountId: string) => + ipcRenderer.invoke('channels:start', channelId, accountId), + }, + // Local chat (direct IPC, no Gateway required) localChat: { /** Subscribe to agent events for local direct chat */ diff --git a/apps/desktop/src/App.tsx b/apps/desktop/src/App.tsx index df6ae8b1..ac89454e 100644 --- a/apps/desktop/src/App.tsx +++ b/apps/desktop/src/App.tsx @@ -4,6 +4,7 @@ import HomePage from './pages/home' import ChatPage from './pages/chat' import ToolsPage from './pages/tools' import SkillsPage from './pages/skills' +import ChannelsPage from './pages/channels' const router = createHashRouter([ { @@ -14,6 +15,7 @@ const router = createHashRouter([ { path: 'chat', element: }, { path: 'tools', element: }, { path: 'skills', element: }, + { path: 'channels', element: }, ], }, ]) diff --git a/apps/desktop/src/hooks/use-channels.ts b/apps/desktop/src/hooks/use-channels.ts new file mode 100644 index 00000000..29f52100 --- /dev/null +++ b/apps/desktop/src/hooks/use-channels.ts @@ -0,0 +1,121 @@ +/** + * Hook for managing channel accounts (Telegram, Discord, etc.) in the Desktop App. + * + * Provides state and actions for the Channels settings page: + * - List channel account states (running / stopped / error) + * - Read channel config (tokens) + * - Save / remove tokens with immediate start/stop + */ +import { useState, useEffect, useCallback } from 'react' + +interface UseChannelsReturn { + /** Runtime states of all channel accounts */ + states: ChannelAccountStateInfo[] + /** Raw channel config from credentials.json5 */ + config: Record> | undefined> + /** Loading state */ + loading: boolean + /** Error message if any */ + error: string | null + /** Refresh states and config */ + refresh: () => Promise + /** Save a bot token — persists to file and starts the bot immediately */ + saveToken: (channelId: string, accountId: string, token: string) => Promise<{ ok: boolean; error?: string }> + /** Remove a bot token — stops the bot and removes from file */ + removeToken: (channelId: string, accountId: string) => Promise<{ ok: boolean; error?: string }> + /** Stop a channel account without removing config */ + stopChannel: (channelId: string, accountId: string) => Promise + /** Start a channel account from saved config */ + startChannel: (channelId: string, accountId: string) => Promise +} + +export function useChannels(): UseChannelsReturn { + const [states, setStates] = useState([]) + const [config, setConfig] = useState> | undefined>>({}) + const [loading, setLoading] = useState(true) + const [error, setError] = useState(null) + + const refresh = useCallback(async () => { + setLoading(true) + setError(null) + + try { + const [stateList, channelConfig] = await Promise.all([ + window.electronAPI.channels.listStates(), + window.electronAPI.channels.getConfig(), + ]) + + setStates(stateList) + setConfig(channelConfig) + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + setError(message) + console.error('[useChannels] Failed to load:', message) + } finally { + setLoading(false) + } + }, []) + + useEffect(() => { + refresh() + }, [refresh]) + + const saveToken = useCallback(async (channelId: string, accountId: string, token: string) => { + setError(null) + try { + const result = await window.electronAPI.channels.saveToken(channelId, accountId, token) + if (!result.ok) { + setError(result.error ?? 'Failed to save token') + } + // Refresh to pick up new state + await refresh() + return result + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + setError(message) + return { ok: false, error: message } + } + }, [refresh]) + + const removeToken = useCallback(async (channelId: string, accountId: string) => { + setError(null) + try { + const result = await window.electronAPI.channels.removeToken(channelId, accountId) + if (!result.ok) { + setError(result.error ?? 'Failed to remove token') + } + await refresh() + return result + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + setError(message) + return { ok: false, error: message } + } + }, [refresh]) + + const stopChannel = useCallback(async (channelId: string, accountId: string) => { + await window.electronAPI.channels.stop(channelId, accountId) + await refresh() + }, [refresh]) + + const startChannel = useCallback(async (channelId: string, accountId: string) => { + setError(null) + const result = await window.electronAPI.channels.start(channelId, accountId) + if (!result.ok) { + setError(result.error ?? 'Failed to start channel') + } + await refresh() + }, [refresh]) + + return { + states, + config, + loading, + error, + refresh, + saveToken, + removeToken, + stopChannel, + startChannel, + } +} diff --git a/apps/desktop/src/pages/channels.tsx b/apps/desktop/src/pages/channels.tsx new file mode 100644 index 00000000..1f7b38cd --- /dev/null +++ b/apps/desktop/src/pages/channels.tsx @@ -0,0 +1,175 @@ +import { useState } from 'react' +import { + Card, + CardContent, + CardDescription, + CardHeader, + CardTitle, +} from '@multica/ui/components/ui/card' +import { Button } from '@multica/ui/components/ui/button' +import { Input } from '@multica/ui/components/ui/input' +import { Badge } from '@multica/ui/components/ui/badge' +import { useChannels } from '../hooks/use-channels' + +/** Status badge color mapping */ +function statusVariant(status: string): 'default' | 'secondary' | 'destructive' | 'outline' { + switch (status) { + case 'running': return 'default' + case 'starting': return 'secondary' + case 'error': return 'destructive' + default: return 'outline' + } +} + +function TelegramCard() { + const { states, config, saveToken, removeToken, startChannel, stopChannel } = useChannels() + const [token, setToken] = useState('') + const [saving, setSaving] = useState(false) + const [localError, setLocalError] = useState(null) + + // Current state and config for telegram:default + const state = states.find((s) => s.channelId === 'telegram' && s.accountId === 'default') + const savedConfig = config['telegram']?.['default'] as { botToken?: string } | undefined + const hasToken = Boolean(savedConfig?.botToken) + const isRunning = state?.status === 'running' + const isStarting = state?.status === 'starting' + + const handleSave = async () => { + if (!token.trim()) return + setSaving(true) + setLocalError(null) + const result = await saveToken('telegram', 'default', token.trim()) + if (!result.ok) { + setLocalError(result.error ?? 'Failed to save') + } else { + setToken('') // Clear input on success + } + setSaving(false) + } + + const handleRemove = async () => { + setSaving(true) + setLocalError(null) + const result = await removeToken('telegram', 'default') + if (!result.ok) { + setLocalError(result.error ?? 'Failed to remove') + } + setSaving(false) + } + + const handleToggle = async () => { + setSaving(true) + setLocalError(null) + if (isRunning || isStarting) { + await stopChannel('telegram', 'default') + } else { + await startChannel('telegram', 'default') + } + setSaving(false) + } + + // Mask the token for display: show first 5 and last 5 chars + const maskedToken = savedConfig?.botToken + ? `${savedConfig.botToken.slice(0, 5)}${'*'.repeat(10)}${savedConfig.botToken.slice(-5)}` + : null + + return ( + + +
+
+ Telegram + + Connect a Telegram bot via Bot API long polling. + +
+ {state && ( + + {state.status} + + )} +
+
+ + {hasToken ? ( + // Token is configured — show masked token and actions +
+
+ + {maskedToken} + +
+ + {state?.error && ( +

{state.error}

+ )} + +
+ + +
+
+ ) : ( + // No token — show input form +
+ setToken(e.target.value)} + onKeyDown={(e) => e.key === 'Enter' && handleSave()} + /> + +
+ )} + + {localError && ( +

{localError}

+ )} +
+
+ ) +} + +export default function ChannelsPage() { + const { loading, error } = useChannels() + + return ( +
+
+

Channels

+

+ Connect messaging platforms to your Agent. +

+
+ + {loading ? ( +

Loading...

+ ) : error ? ( +

{error}

+ ) : ( + + )} +
+ ) +} diff --git a/apps/desktop/src/pages/layout.tsx b/apps/desktop/src/pages/layout.tsx index 85e97e15..47e5af0d 100644 --- a/apps/desktop/src/pages/layout.tsx +++ b/apps/desktop/src/pages/layout.tsx @@ -8,6 +8,7 @@ import { CodeIcon, PlugIcon, Comment01Icon, + Share08Icon, } from '@hugeicons/core-free-icons' import { cn } from '@multica/ui/lib/utils' import { DeviceConfirmDialog } from '../components/device-confirm-dialog' @@ -18,6 +19,7 @@ const tabs = [ { path: '/chat', label: 'Chat', icon: Comment01Icon }, { path: '/tools', label: 'Tools', icon: CodeIcon }, { path: '/skills', label: 'Skills', icon: PlugIcon }, + { path: '/channels', label: 'Channels', icon: Share08Icon }, ] export default function Layout() { From 43d11a6e5d388afbacbd4da1508cca648992b5c2 Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Mon, 9 Feb 2026 13:05:34 +0800 Subject: [PATCH 33/33] fix(channels): address code review issues - Fix double useChannels() instantiation: call once in ChannelsPage, pass as props to TelegramCard - Mask bot tokens in channels:getConfig before sending to renderer - Add input validation (isValidId, token length) on all IPC handlers - Fix stopAccount() to clean up typingTimer, lastRoute, aggregator, and debouncer when stopping the account they belong to - Add try/catch to stopChannel/startChannel in useChannels hook - Consistent return type { ok, error? } on channels:stop handler - Add tooltip hint on disabled Remove button Co-Authored-By: Claude Opus 4.6 --- apps/desktop/electron/electron-env.d.ts | 2 +- apps/desktop/electron/ipc/channels.ts | 55 ++++++++++++++++++++++--- apps/desktop/src/hooks/use-channels.ts | 28 +++++++++---- apps/desktop/src/pages/channels.tsx | 12 +++--- src/channels/manager.ts | 16 +++++++ 5 files changed, 95 insertions(+), 18 deletions(-) diff --git a/apps/desktop/electron/electron-env.d.ts b/apps/desktop/electron/electron-env.d.ts index b53664a4..98b087ac 100644 --- a/apps/desktop/electron/electron-env.d.ts +++ b/apps/desktop/electron/electron-env.d.ts @@ -202,7 +202,7 @@ interface ElectronAPI { getConfig: () => Promise> | undefined>> saveToken: (channelId: string, accountId: string, token: string) => Promise<{ ok: boolean; error?: string }> removeToken: (channelId: string, accountId: string) => Promise<{ ok: boolean; error?: string }> - stop: (channelId: string, accountId: string) => Promise<{ ok: boolean }> + stop: (channelId: string, accountId: string) => Promise<{ ok: boolean; error?: string }> start: (channelId: string, accountId: string) => Promise<{ ok: boolean; error?: string }> } localChat: { diff --git a/apps/desktop/electron/ipc/channels.ts b/apps/desktop/electron/ipc/channels.ts index dd85ef74..49662e94 100644 --- a/apps/desktop/electron/ipc/channels.ts +++ b/apps/desktop/electron/ipc/channels.ts @@ -10,6 +10,21 @@ import { getCurrentHub } from './hub.js' import { credentialManager } from '../../../../src/agent/credentials.js' import { listChannels } from '../../../../src/channels/registry.js' +/** Validate that a string is a safe identifier (alphanumeric, dashes, underscores) */ +function isValidId(value: unknown): value is string { + return typeof value === 'string' && /^[a-zA-Z0-9_-]+$/.test(value) && value.length <= 64 +} + +/** + * Mask a token string for safe display: show first 5 and last 5 chars. + * Returns undefined if the input is not a string. + */ +function maskToken(token: unknown): string | undefined { + if (typeof token !== 'string' || token.length === 0) return undefined + if (token.length <= 12) return '*'.repeat(token.length) + return `${token.slice(0, 5)}${'*'.repeat(10)}${token.slice(-5)}` +} + /** * Register all Channel-related IPC handlers. */ @@ -25,20 +40,42 @@ export function registerChannelsIpcHandlers(): void { /** * Get the channels config from credentials.json5. - * Returns the raw `channels` section: { telegram: { default: { botToken: "..." } } } + * Returns a sanitized version with tokens masked (not the raw secret values). */ ipcMain.handle('channels:getConfig', async () => { - return credentialManager.getChannelsConfig() + const raw = credentialManager.getChannelsConfig() + // Mask secret values before sending to renderer + const masked: Record> | undefined> = {} + for (const [channelId, accounts] of Object.entries(raw)) { + if (!accounts) continue + const maskedAccounts: Record> = {} + for (const [accountId, accountConfig] of Object.entries(accounts)) { + const maskedConfig = { ...accountConfig } + if ('botToken' in maskedConfig) { + maskedConfig.botToken = maskToken(maskedConfig.botToken) + } + maskedAccounts[accountId] = maskedConfig + } + masked[channelId] = maskedAccounts + } + return masked }) /** * Save a channel account token and start the bot immediately. - * Flow: write to credentials.json5 → start the channel account. + * Flow: validate → write to credentials.json5 → start the channel account. */ ipcMain.handle( 'channels:saveToken', async (_event, channelId: string, accountId: string, token: string): Promise<{ ok: boolean; error?: string }> => { try { + // Validate inputs + if (!isValidId(channelId)) return { ok: false, error: 'Invalid channel ID' } + if (!isValidId(accountId)) return { ok: false, error: 'Invalid account ID' } + if (typeof token !== 'string' || token.trim().length === 0 || token.length > 256) { + return { ok: false, error: 'Invalid token' } + } + const hub = getCurrentHub() if (!hub) return { ok: false, error: 'Hub not initialized' } @@ -73,6 +110,9 @@ export function registerChannelsIpcHandlers(): void { 'channels:removeToken', async (_event, channelId: string, accountId: string): Promise<{ ok: boolean; error?: string }> => { try { + if (!isValidId(channelId)) return { ok: false, error: 'Invalid channel ID' } + if (!isValidId(accountId)) return { ok: false, error: 'Invalid account ID' } + const hub = getCurrentHub() if (!hub) return { ok: false, error: 'Hub not initialized' } @@ -97,9 +137,11 @@ export function registerChannelsIpcHandlers(): void { */ ipcMain.handle( 'channels:stop', - async (_event, channelId: string, accountId: string): Promise<{ ok: boolean }> => { + async (_event, channelId: string, accountId: string): Promise<{ ok: boolean; error?: string }> => { + if (!isValidId(channelId)) return { ok: false, error: 'Invalid channel ID' } + if (!isValidId(accountId)) return { ok: false, error: 'Invalid account ID' } const hub = getCurrentHub() - if (!hub) return { ok: false } + if (!hub) return { ok: false, error: 'Hub not initialized' } hub.channelManager.stopAccount(channelId, accountId) return { ok: true } } @@ -112,6 +154,9 @@ export function registerChannelsIpcHandlers(): void { 'channels:start', async (_event, channelId: string, accountId: string): Promise<{ ok: boolean; error?: string }> => { try { + if (!isValidId(channelId)) return { ok: false, error: 'Invalid channel ID' } + if (!isValidId(accountId)) return { ok: false, error: 'Invalid account ID' } + const hub = getCurrentHub() if (!hub) return { ok: false, error: 'Hub not initialized' } diff --git a/apps/desktop/src/hooks/use-channels.ts b/apps/desktop/src/hooks/use-channels.ts index 29f52100..ba8ae386 100644 --- a/apps/desktop/src/hooks/use-channels.ts +++ b/apps/desktop/src/hooks/use-channels.ts @@ -8,7 +8,7 @@ */ import { useState, useEffect, useCallback } from 'react' -interface UseChannelsReturn { +export interface UseChannelsReturn { /** Runtime states of all channel accounts */ states: ChannelAccountStateInfo[] /** Raw channel config from credentials.json5 */ @@ -94,17 +94,31 @@ export function useChannels(): UseChannelsReturn { }, [refresh]) const stopChannel = useCallback(async (channelId: string, accountId: string) => { - await window.electronAPI.channels.stop(channelId, accountId) - await refresh() + setError(null) + try { + const result = await window.electronAPI.channels.stop(channelId, accountId) + if (!result.ok) { + setError(result.error ?? 'Failed to stop channel') + } + await refresh() + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + setError(message) + } }, [refresh]) const startChannel = useCallback(async (channelId: string, accountId: string) => { setError(null) - const result = await window.electronAPI.channels.start(channelId, accountId) - if (!result.ok) { - setError(result.error ?? 'Failed to start channel') + try { + const result = await window.electronAPI.channels.start(channelId, accountId) + if (!result.ok) { + setError(result.error ?? 'Failed to start channel') + } + await refresh() + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + setError(message) } - await refresh() }, [refresh]) return { diff --git a/apps/desktop/src/pages/channels.tsx b/apps/desktop/src/pages/channels.tsx index 1f7b38cd..81bc8926 100644 --- a/apps/desktop/src/pages/channels.tsx +++ b/apps/desktop/src/pages/channels.tsx @@ -9,7 +9,7 @@ import { import { Button } from '@multica/ui/components/ui/button' import { Input } from '@multica/ui/components/ui/input' import { Badge } from '@multica/ui/components/ui/badge' -import { useChannels } from '../hooks/use-channels' +import { useChannels, type UseChannelsReturn } from '../hooks/use-channels' /** Status badge color mapping */ function statusVariant(status: string): 'default' | 'secondary' | 'destructive' | 'outline' { @@ -21,8 +21,8 @@ function statusVariant(status: string): 'default' | 'secondary' | 'destructive' } } -function TelegramCard() { - const { states, config, saveToken, removeToken, startChannel, stopChannel } = useChannels() +function TelegramCard({ channels }: { channels: UseChannelsReturn }) { + const { states, config, saveToken, removeToken, startChannel, stopChannel } = channels const [token, setToken] = useState('') const [saving, setSaving] = useState(false) const [localError, setLocalError] = useState(null) @@ -118,6 +118,7 @@ function TelegramCard() { size="sm" onClick={handleRemove} disabled={saving || isRunning} + title={isRunning ? 'Stop the bot before removing' : undefined} > Remove @@ -152,7 +153,8 @@ function TelegramCard() { } export default function ChannelsPage() { - const { loading, error } = useChannels() + const channels = useChannels() + const { loading, error } = channels return (
@@ -168,7 +170,7 @@ export default function ChannelsPage() { ) : error ? (

{error}

) : ( - + )}
) diff --git a/src/channels/manager.ts b/src/channels/manager.ts index fb094065..8762d674 100644 --- a/src/channels/manager.ts +++ b/src/channels/manager.ts @@ -439,15 +439,31 @@ export class ChannelManager { /** * Stop a specific channel account. * Public so the desktop IPC layer can call it when removing config. + * Cleans up typing timer, debouncer, aggregator, and lastRoute if they + * belong to this account. */ stopAccount(channelId: string, accountId: string): void { const key = `${channelId}:${accountId}`; const handle = this.accounts.get(key); if (!handle) return; + // Clean up shared resources if they target this account + if (this.lastRoute && this.lastRoute.plugin.id === channelId && this.lastRoute.deliveryCtx.accountId === accountId) { + this.stopTyping(); + this.lastRoute = null; + this.aggregator = null; + } + handle.abortController.abort(); handle.state = { ...handle.state, status: "stopped" }; this.accounts.delete(key); + + // Dispose debouncer if no accounts remain + if (this.accounts.size === 0 && this.debouncer) { + this.debouncer.dispose(); + this.debouncer = null; + } + console.log(`[Channels] Stopped ${key}`); }