From 9646b08306fa929081820ef4dbcf59b35ef811d3 Mon Sep 17 00:00:00 2001 From: Jiayuan Zhang Date: Fri, 13 Feb 2026 22:06:43 +0800 Subject: [PATCH] feat(gateway): port Desktop Telegram channel features to Gateway webhook bot MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Port all 7 features from the Desktop long-polling Telegram channel plugin to the Gateway webhook-based Telegram bot: - Markdown → Telegram HTML formatting with parse-error fallback - Text chunking for messages >4096 chars (paragraph-boundary split) - Reply-to original message + 👀 ack reaction lifecycle - Group chat support (mention/reply filtering, @mention stripping) - Per-chat message serialization (prevents race conditions) - Inbound media handling (voice/audio/photo/video/document) with transcription (Whisper) and description (Vision API) - Outbound file captions with HTML formatting Co-Authored-By: Claude Opus 4.6 --- apps/gateway/telegram/telegram-format.ts | 81 +++ apps/gateway/telegram/telegram.service.ts | 714 ++++++++++++++++++---- 2 files changed, 682 insertions(+), 113 deletions(-) create mode 100644 apps/gateway/telegram/telegram-format.ts diff --git a/apps/gateway/telegram/telegram-format.ts b/apps/gateway/telegram/telegram-format.ts new file mode 100644 index 00000000..13711921 --- /dev/null +++ b/apps/gateway/telegram/telegram-format.ts @@ -0,0 +1,81 @@ +/** + * Markdown → Telegram HTML converter. + * + * Telegram supports a subset of HTML: + * , , , , ,
<pre>, <a>, <blockquote>
+ * + * Strategy: + * 1. Extract code blocks and inline code (protect from further processing) + * 2. Escape HTML entities in remaining text + * 3. Convert Markdown syntax to HTML tags + * 4. Restore code blocks + */ + +/** Escape HTML special characters */ +function escapeHtml(text: string): string { + return text + .replace(/&/g, "&amp;") + .replace(/</g, "&lt;") + .replace(/>/g, "&gt;"); +} + +/** + * Convert Markdown text to Telegram-compatible HTML. + * Handles: bold, italic, strikethrough, inline code, code blocks, links, blockquotes. + */ +export function markdownToTelegramHtml(markdown: string): string { + // Placeholder system: replace code blocks/inline code with placeholders, + // process markdown on the rest, then restore. + const placeholders: string[] = []; + const placeholder = (content: string): string => { + const idx = placeholders.length; + placeholders.push(content); + return `\x00PH${idx}\x00`; + }; + + let text = markdown; + + // 1. Fenced code blocks: ```lang\n...\n``` + text = text.replace(/```(\w*)\n([\s\S]*?)```/g, (_match, lang: string, code: string) => { + const escaped = escapeHtml(code.replace(/\n$/, "")); + const langAttr = lang ? ` class="language-${escapeHtml(lang)}"` : ""; + return placeholder(`
<pre><code${langAttr}>${escaped}</code></pre>
`); + }); + + // 2. Inline code: `...` + text = text.replace(/`([^`\n]+)`/g, (_match, code: string) => { + return placeholder(`<code>${escapeHtml(code)}</code>`); + }); + + // 3. Escape HTML in remaining text + text = escapeHtml(text); +
+ // 4. Links: [text](url) — escape quotes in URL to prevent attribute breakout + text = text.replace(/\[([^\]]+)\]\(([^)]+)\)/g, (_m, label: string, url: string) => + `<a href="${url.replace(/"/g, "&quot;")}">${label}</a>`, + ); + + // 5. Bold: **text** or __text__ + text = text.replace(/\*\*(.+?)\*\*/g, "<b>$1</b>"); + text = text.replace(/__(.+?)__/g, "<b>$1</b>"); +
+ // 6. Italic: *text* or _text_ (but not inside words with underscores) + text = text.replace(/(?<!\w)\*(.+?)\*(?!\w)/g, "<i>$1</i>"); + text = text.replace(/(?<!\w)_(.+?)_(?!\w)/g, "<i>$1</i>"); + + // 7. Strikethrough: ~~text~~ + text = text.replace(/~~(.+?)~~/g, "<s>$1</s>"); + + // 8. Blockquotes: > text (at line start) + text = text.replace(/^&gt; (.+)$/gm, "
<blockquote>$1</blockquote>
"); + // Merge adjacent blockquotes + text = text.replace(/<\/blockquote>\n<blockquote>
/g, "\n"); + + // 9. Headings: strip # markers, make bold + text = text.replace(/^#{1,6}\s+(.+)$/gm, "$1"); + + // Restore placeholders + text = text.replace(/\x00PH(\d+)\x00/g, (_match, idx: string) => placeholders[Number(idx)]!); + + return text; +} diff --git a/apps/gateway/telegram/telegram.service.ts b/apps/gateway/telegram/telegram.service.ts index 987da5f8..2fbb220c 100644 --- a/apps/gateway/telegram/telegram.service.ts +++ b/apps/gateway/telegram/telegram.service.ts @@ -5,16 +5,27 @@ * - New users: prompts to paste a multica://connect link * - Connection link: verifies with Hub via RPC, persists to DB * - Bound users: routes messages to their Hub agent + * + * Features (ported from Desktop channel plugin): + * - Markdown → Telegram HTML formatting with parse-error fallback + * - Text chunking for messages >4096 chars (paragraph-boundary split) + * - Reply-to original message + 👀 ack reaction + * - Group chat support (mention/reply filtering) + * - Per-chat message serialization (prevents race conditions) + * - Inbound media handling (voice, photo, video, document) */ import { Inject, Injectable, Logger } from "@nestjs/common"; import type { OnModuleInit } from "@nestjs/common"; -import { Bot, InputFile, webhookCallback } from "grammy"; +import { Bot, GrammyError, InputFile, webhookCallback } from "grammy"; import type { Context } from "grammy"; import { v7 as uuidv7 } from "uuid"; -import { generateEncryptedId } from "@multica/utils"; +import { writeFile, mkdir } from "node:fs/promises"; +import { join, extname } from "node:path"; +import { generateEncryptedId, MEDIA_CACHE_DIR } from "@multica/utils"; import { parseConnectionCode } from "@multica/store/connection"; import type { ConnectionInfo } from "@multica/store/connection"; +import { transcribeAudio, describeImage, describeVideo } from "@multica/core/media"; import { GatewayEvents, RequestAction, @@ -31,8 +42,11 @@ import type { StreamPayload } from "@multica/sdk"; import { EventsGateway } from 
"../events.gateway.js"; import { TelegramUserStore } from "./telegram-user.store.js"; import type { TelegramUser } from "./types.js"; +import { markdownToTelegramHtml } from "./telegram-format.js"; -// Minimal Express types for webhook handling +// ── Types ── + +/** Minimal Express types for webhook handling */ interface ExpressRequest { body: unknown; header: (name: string) => string | undefined; @@ -50,14 +64,82 @@ interface PendingRequest { timer: ReturnType; } +/** Tracks the originating Telegram message for reply_to and reaction cleanup */ +interface MessageContext { + telegramChatId: number; + telegramMessageId: number; +} + +/** Media attachment extracted from a Telegram message */ +interface MediaAttachment { + type: "audio" | "image" | "video" | "document"; + fileId: string; + mimeType?: string; + duration?: number; + caption?: string; +} + +// ── Constants ── + const VERIFY_TIMEOUT_MS = 30_000; +const TYPING_TIMEOUT_MS = 60_000; +const MAX_CHARS_PER_MESSAGE = 4000; // Telegram limit is 4096; leave room for HTML overhead + +// ── Helpers ── + +/** Check if a GrammyError is an HTML parse failure */ +function isParseError(err: unknown): boolean { + return err instanceof GrammyError && err.description.includes("can't parse entities"); +} + +/** + * Split text at natural boundaries so each chunk stays within Telegram's message limit. + * Prefers paragraph breaks > line breaks > spaces > hard cut. 
+ */ +function chunkText(text: string, maxChars = MAX_CHARS_PER_MESSAGE): string[] { + if (text.length <= maxChars) return [text]; + + const chunks: string[] = []; + let remaining = text; + + while (remaining.length > 0) { + if (remaining.length <= maxChars) { + chunks.push(remaining); + break; + } + + // Find the best break point within the limit + let breakPoint = remaining.lastIndexOf("\n\n", maxChars); + if (breakPoint <= 0 || breakPoint < maxChars * 0.5) { + breakPoint = remaining.lastIndexOf("\n", maxChars); + } + if (breakPoint <= 0 || breakPoint < maxChars * 0.5) { + breakPoint = remaining.lastIndexOf(" ", maxChars); + } + if (breakPoint <= 0) { + breakPoint = maxChars; + } + + chunks.push(remaining.slice(0, breakPoint)); + remaining = remaining.slice(breakPoint).trimStart(); + } + + return chunks; +} + +// ── Service ── @Injectable() export class TelegramService implements OnModuleInit { - private static readonly TYPING_TIMEOUT_MS = 60_000; // 1 minute safety cap private bot: Bot | null = null; + private botId: number | null = null; + private botUsername: string | null = null; + private pendingRequests = new Map(); + /** Typing indicator timers, keyed by deviceId */ private typingTimers = new Map>(); + /** Tracks the originating message for reply_to & reaction cleanup, keyed by deviceId */ + private messageContexts = new Map(); private readonly logger = new Logger(TelegramService.name); @@ -66,19 +148,29 @@ export class TelegramService implements OnModuleInit { @Inject(TelegramUserStore) private readonly userStore: TelegramUserStore, ) {} + // ── Lifecycle ── + async onModuleInit(): Promise { - console.log("[TelegramService] onModuleInit starting..."); const token = process.env["TELEGRAM_BOT_TOKEN"]; if (!token) { - console.log("[TelegramService] No bot token"); this.logger.warn("TELEGRAM_BOT_TOKEN not set, Telegram webhook disabled"); return; } - console.log("[TelegramService] Creating bot..."); this.bot = new Bot(token); + + // Fetch bot identity for 
group-chat mention detection + try { + const botInfo = await this.bot.api.getMe(); + this.botId = botInfo.id; + this.botUsername = botInfo.username ?? null; + this.logger.log(`Telegram bot initialized: @${this.botUsername} (id=${this.botId})`); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + this.logger.error(`Failed to get bot info: ${msg}`); + } + this.setupHandlers(); - this.logger.log("Telegram bot initialized"); } /** Get grammY webhook callback for Express/NestJS */ @@ -89,12 +181,12 @@ export class TelegramService implements OnModuleInit { if (secretToken) { return webhookCallback(this.bot, "express", { secretToken }) as unknown as ( req: ExpressRequest, - res: ExpressResponse + res: ExpressResponse, ) => Promise; } return webhookCallback(this.bot, "express") as unknown as ( req: ExpressRequest, - res: ExpressResponse + res: ExpressResponse, ) => Promise; } @@ -103,7 +195,311 @@ export class TelegramService implements OnModuleInit { return this.bot !== null; } - /** Send message to a Telegram user by device ID */ + // ── Handler setup ── + + private setupHandlers(): void { + if (!this.bot) return; + + // Per-chat serialization middleware — ensures messages from the same chat + // are processed one at a time, preventing race conditions. + const chatQueues = new Map>(); + this.bot.use(async (ctx, next) => { + const chatId = ctx.chat?.id; + if (!chatId) return next(); + + const key = String(chatId); + const prev = chatQueues.get(key) ?? 
Promise.resolve(); + + const current = prev.then(() => next()).catch(() => {}); + chatQueues.set(key, current); + await current; + + // Clean up resolved entries to prevent memory leak + if (chatQueues.get(key) === current) { + chatQueues.delete(key); + } + }); + + // Text messages + this.bot.on("message:text", async (ctx) => { + if (!this.shouldProcessMessage(ctx)) return; + await this.handleTextMessage(ctx); + }); + + // Media messages + const mediaTypes = [ + { + filter: "message:voice" as const, + getMedia: (msg: any): MediaAttachment => ({ + type: "audio" as const, + fileId: msg.voice.file_id as string, + mimeType: msg.voice.mime_type as string | undefined, + duration: msg.voice.duration as number | undefined, + }), + }, + { + filter: "message:audio" as const, + getMedia: (msg: any): MediaAttachment => ({ + type: "audio" as const, + fileId: msg.audio.file_id as string, + mimeType: msg.audio.mime_type as string | undefined, + duration: msg.audio.duration as number | undefined, + }), + }, + { + filter: "message:photo" as const, + getMedia: (msg: any): MediaAttachment => { + // Pick the largest photo size (last in array) + const photos = msg.photo as Array<{ file_id: string }>; + const largest = photos[photos.length - 1]!; + return { + type: "image" as const, + fileId: largest.file_id, + mimeType: "image/jpeg", + }; + }, + }, + { + filter: "message:video" as const, + getMedia: (msg: any): MediaAttachment => ({ + type: "video" as const, + fileId: msg.video.file_id as string, + mimeType: msg.video.mime_type as string | undefined, + duration: msg.video.duration as number | undefined, + }), + }, + { + filter: "message:document" as const, + getMedia: (msg: any): MediaAttachment => ({ + type: "document" as const, + fileId: msg.document.file_id as string, + mimeType: msg.document.mime_type as string | undefined, + }), + }, + ] as const; + + for (const { filter, getMedia } of mediaTypes) { + this.bot.on(filter, async (ctx) => { + if (!this.shouldProcessMessage(ctx)) 
return; + await this.handleMediaMessage(ctx, getMedia(ctx.message)); + }); + } + } + + // ── Group chat filtering ── + + /** + * Decide whether this message should be processed. + * Private chats: always. Groups: only @mentions or replies to bot. + */ + private shouldProcessMessage(ctx: Context): boolean { + const msg = ctx.message; + if (!msg) return false; + + const isGroup = msg.chat.type === "group" || msg.chat.type === "supergroup"; + if (!isGroup) return true; + + // Check text entities for @mention + if (this.botUsername) { + const text = (msg as any).text || ""; + const isMentionedInText = msg.entities?.some( + (e) => + e.type === "mention" && + text.substring(e.offset, e.offset + e.length).toLowerCase() === `@${this.botUsername!.toLowerCase()}`, + ); + if (isMentionedInText) return true; + + // Check caption entities for @mention (media messages) + const caption = (msg as any).caption || ""; + const captionEntities = (msg as any).caption_entities as typeof msg.entities | undefined; + const isMentionedInCaption = captionEntities?.some( + (e: any) => + e.type === "mention" && + caption.substring(e.offset, e.offset + e.length).toLowerCase() === `@${this.botUsername!.toLowerCase()}`, + ); + if (isMentionedInCaption) return true; + } + + // Check if reply to bot + const isReplyToBot = msg.reply_to_message?.from?.id === this.botId; + return isReplyToBot; + } + + // ── Inbound: text messages ── + + private async handleTextMessage(ctx: Context): Promise { + const msg = ctx.message; + if (!msg || !msg.text) return; + + const telegramUserId = String(msg.from?.id); + let text = msg.text.trim(); + + this.logger.debug(`Received message: chatId=${msg.chat.id} from=${telegramUserId} text="${text.slice(0, 50)}"`); + + // Connection link — always handle, even for already-bound users (re-binding) + if (text.startsWith("multica://connect?")) { + await this.handleConnectionLink(ctx, telegramUserId, text); + return; + } + + // Strip @mention from text for cleaner agent input + 
if (this.botUsername) { + text = text.replace(new RegExp(`@${this.botUsername}\\s*`, "gi"), "").trim(); + } + if (!text) return; + + // Check if user is bound + const user = await this.userStore.findByTelegramUserId(telegramUserId); + + if (user) { + // ACK: 👀 reaction on the original message + await this.addReaction(msg.chat.id, msg.message_id, "👀"); + this.storeMessageContext(user.deviceId, msg.chat.id, msg.message_id); + await this.routeToHub(user, text, ctx); + return; + } + + // New user without connection link + await ctx.reply( + "Welcome to Multica!\n\n" + + "To get started, open the Multica Desktop app, generate a Connection Link, " + + "and paste it here.\n\n" + + "The link looks like:\nmultica://connect?gateway=...&hub=...&agent=...&token=...&exp=...", + ); + } + + // ── Inbound: media messages ── + + private async handleMediaMessage(ctx: Context, media: MediaAttachment): Promise { + const msg = ctx.message; + if (!msg) return; + + const telegramUserId = String(msg.from?.id); + const caption = (msg as any).caption as string | undefined; + + this.logger.debug(`Received ${media.type}: chatId=${msg.chat.id} from=${telegramUserId} fileId=${media.fileId}`); + + // Connection link in caption + if (caption?.startsWith("multica://connect?")) { + await this.handleConnectionLink(ctx, telegramUserId, caption); + return; + } + + // Check if user is bound + const user = await this.userStore.findByTelegramUserId(telegramUserId); + if (!user) { + await ctx.reply( + "Welcome to Multica!\n\n" + + "To get started, open the Multica Desktop app, generate a Connection Link, " + + "and paste it here.", + ); + return; + } + + // ACK: 👀 reaction + await this.addReaction(msg.chat.id, msg.message_id, "👀"); + this.storeMessageContext(user.deviceId, msg.chat.id, msg.message_id); + + // Process media → text description (async, may take a few seconds) + const processedText = await this.processMedia({ ...media, caption: caption ?? 
undefined }); + + await this.routeToHub(user, processedText, ctx); + } + + // ── Media processing ── + + /** + * Download a file from the Telegram Bot API and save it locally. + */ + private async downloadMedia(fileId: string): Promise { + if (!this.bot) throw new Error("Bot not initialized"); + + const file = await this.bot.api.getFile(fileId); + const filePath = file.file_path; + if (!filePath) throw new Error(`Telegram returned no file_path for fileId=${fileId}`); + + const url = `https://api.telegram.org/file/bot${this.bot.token}/${filePath}`; + const ext = extname(filePath) || ".bin"; + const localPath = join(MEDIA_CACHE_DIR, `${uuidv7()}${ext}`); + + await mkdir(MEDIA_CACHE_DIR, { recursive: true }); + + const res = await fetch(url); + if (!res.ok) throw new Error(`Failed to download file: HTTP ${res.status}`); + const buffer = Buffer.from(await res.arrayBuffer()); + await writeFile(localPath, buffer); + + this.logger.debug(`Downloaded media: ${filePath} → ${localPath}`); + return localPath; + } + + /** + * Process a media attachment into a text description for the agent. + * Uses local whisper / OpenAI Vision / ffmpeg when available; graceful fallback otherwise. 
+ */ + private async processMedia(media: MediaAttachment): Promise { + try { + const filePath = await this.downloadMedia(media.fileId); + + if (media.type === "image") { + const description = await describeImage(filePath); + if (description) { + const parts = ["[Image]", `Description: ${description}`]; + if (media.caption) parts.push(`Caption: ${media.caption}`); + return parts.join("\n"); + } + const parts = ["[image message received]", `File: ${filePath}`]; + if (media.caption) parts.push(`Caption: ${media.caption}`); + return parts.join("\n"); + } + + if (media.type === "audio") { + const transcript = await transcribeAudio(filePath); + if (transcript) { + const parts = ["[Voice Message]", `Transcript: ${transcript}`]; + if (media.caption) parts.push(`Caption: ${media.caption}`); + return parts.join("\n"); + } + const parts = ["[audio message received]", `File: ${filePath}`]; + if (media.mimeType) parts.push(`Type: ${media.mimeType}`); + if (media.duration) parts.push(`Duration: ${media.duration}s`); + if (media.caption) parts.push(`Caption: ${media.caption}`); + return parts.join("\n"); + } + + if (media.type === "video") { + const description = await describeVideo(filePath); + if (description) { + const parts = ["[Video]", `Description: ${description}`]; + if (media.duration) parts.push(`Duration: ${media.duration}s`); + if (media.caption) parts.push(`Caption: ${media.caption}`); + return parts.join("\n"); + } + const parts = ["[video message received]", `File: ${filePath}`]; + if (media.mimeType) parts.push(`Type: ${media.mimeType}`); + if (media.duration) parts.push(`Duration: ${media.duration}s`); + if (media.caption) parts.push(`Caption: ${media.caption}`); + return parts.join("\n"); + } + + // Document — no processing, just metadata + const parts = ["[document message received]", `File: ${filePath}`]; + if (media.mimeType) parts.push(`Type: ${media.mimeType}`); + if (media.caption) parts.push(`Caption: ${media.caption}`); + return parts.join("\n"); + } 
catch (err) { + const msg = err instanceof Error ? err.message : String(err); + this.logger.error(`Failed to process ${media.type}: ${msg}`); + return media.caption || `[${media.type} message received — processing failed]`; + } + } + + // ── Outbound: send to Telegram ── + + /** + * Send text to a Telegram user/group by deviceId. + * Applies Markdown → HTML formatting, text chunking, and reply-to. + */ async sendToTelegram(deviceId: string, text: string): Promise { if (!this.bot) return; @@ -113,18 +509,56 @@ export class TelegramService implements OnModuleInit { return; } + // Use chatId from message context (supports groups); fall back to user ID (private chat) + const context = this.messageContexts.get(deviceId); + const chatId = context?.telegramChatId ?? Number(user.telegramUserId); + const chunks = chunkText(text); + try { - await this.bot.api.sendMessage(Number(user.telegramUserId), text); - this.logger.debug(`Sent message to Telegram: telegramUserId=${user.telegramUserId}`); + for (let i = 0; i < chunks.length; i++) { + // Only reply_to on the first chunk + const replyTo = i === 0 && context ? context.telegramMessageId : undefined; + await this.sendFormatted(chatId, chunks[i]!, replyTo); + } + this.logger.debug(`Sent ${chunks.length} chunk(s) to Telegram: telegramUserId=${user.telegramUserId}`); } catch (error) { const message = error instanceof Error ? error.message : String(error); this.logger.error(`Failed to send Telegram message: deviceId=${deviceId}, error=${message}`); } } + /** + * Send a single message with HTML formatting and optional reply-to. + * Falls back to plain text if HTML parsing fails. 
+ */ + private async sendFormatted( + chatId: number, + text: string, + replyToMessageId?: number, + ): Promise { + if (!this.bot) return; + + const html = markdownToTelegramHtml(text); + const extra: Record = { parse_mode: "HTML" }; + if (replyToMessageId) extra["reply_to_message_id"] = replyToMessageId; + + try { + await this.bot.api.sendMessage(chatId, html, extra); + } catch (err) { + if (isParseError(err)) { + this.logger.warn("HTML parse failed, retrying as plain text"); + const plainExtra: Record = {}; + if (replyToMessageId) plainExtra["reply_to_message_id"] = replyToMessageId; + await this.bot.api.sendMessage(chatId, text, plainExtra); + } else { + throw err; + } + } + } + /** Send a file (photo/document/video/audio) to a Telegram user */ private async sendFileToTelegram( - telegramUserId: string, + deviceId: string, data: Buffer, type: string, caption?: string, @@ -132,9 +566,17 @@ export class TelegramService implements OnModuleInit { ): Promise { if (!this.bot) return; - const chatId = Number(telegramUserId); + const user = await this.userStore.findByDeviceId(deviceId); + if (!user) return; + + const context = this.messageContexts.get(deviceId); + const chatId = context?.telegramChatId ?? Number(user.telegramUserId); const inputFile = new InputFile(data, filename); - const extra = caption ? { caption: caption.slice(0, 1024) } : {}; + + // Format caption as HTML with fallback + const rawCaption = caption?.slice(0, 1024); + const captionHtml = rawCaption ? markdownToTelegramHtml(rawCaption) : undefined; + const extra = captionHtml ? { caption: captionHtml, parse_mode: "HTML" as const } : {}; try { switch (type) { @@ -155,55 +597,114 @@ export class TelegramService implements OnModuleInit { await this.bot.api.sendDocument(chatId, inputFile, extra); break; } - this.logger.debug(`Sent ${type} to Telegram: telegramUserId=${telegramUserId}`); - } catch (error) { - const message = error instanceof Error ? 
error.message : String(error); - this.logger.error(`Failed to send ${type} to Telegram: telegramUserId=${telegramUserId}, error=${message}`); + this.logger.debug(`Sent ${type} to Telegram: deviceId=${deviceId}`); + } catch (err) { + // If HTML caption fails, retry without formatting + if (isParseError(err) && rawCaption) { + this.logger.warn("Media caption HTML parse failed, retrying as plain text"); + const plainExtra = { caption: rawCaption }; + switch (type) { + case "photo": + await this.bot.api.sendPhoto(chatId, inputFile, plainExtra); + break; + case "video": + await this.bot.api.sendVideo(chatId, inputFile, plainExtra); + break; + case "audio": + await this.bot.api.sendAudio(chatId, inputFile, plainExtra); + break; + case "voice": + await this.bot.api.sendVoice(chatId, inputFile, plainExtra); + break; + case "document": + default: + await this.bot.api.sendDocument(chatId, inputFile, plainExtra); + break; + } + } else { + const message = err instanceof Error ? err.message : String(err); + this.logger.error(`Failed to send ${type}: deviceId=${deviceId}, error=${message}`); + } } } - /** Setup bot message handlers */ - private setupHandlers(): void { - if (!this.bot) return; + // ── Reactions ── - this.bot.on("message:text", async (ctx) => { - await this.handleTextMessage(ctx); + private async addReaction(chatId: number, messageId: number, emoji: string): Promise { + if (!this.bot) return; + try { + await this.bot.api.setMessageReaction( + chatId, + messageId, + // Grammy expects a specific emoji union type; cast since our interface accepts any string + [{ type: "emoji", emoji } as unknown as { type: "emoji"; emoji: "👀" }], + ); + } catch { + // Best-effort — reaction failure is not critical + } + } + + private async removeReaction(chatId: number, messageId: number): Promise { + if (!this.bot) return; + try { + await this.bot.api.setMessageReaction(chatId, messageId, []); + } catch { + // Best-effort + } + } + + // ── Typing indicators ── + + private 
startTyping(deviceId: string): void { + if (this.typingTimers.has(deviceId)) return; + + const context = this.messageContexts.get(deviceId); + if (!context) return; + + const chatId = context.telegramChatId; + const send = () => { + void this.bot?.api.sendChatAction(chatId, "typing").catch(() => {}); + }; + send(); + const interval = setInterval(send, 5000); + this.typingTimers.set(deviceId, interval); + + // Safety timeout: auto-stop if no message_end/agent_error arrives + setTimeout(() => { + if (this.typingTimers.get(deviceId) === interval) { + this.stopTyping(deviceId); + } + }, TYPING_TIMEOUT_MS); + } + + private stopTyping(deviceId: string): void { + const timer = this.typingTimers.get(deviceId); + if (timer) { + clearInterval(timer); + this.typingTimers.delete(deviceId); + } + } + + // ── Message context tracking ── + + private storeMessageContext(deviceId: string, chatId: number, messageId: number): void { + this.messageContexts.set(deviceId, { + telegramChatId: chatId, + telegramMessageId: messageId, }); } - /** Handle incoming text message */ - private async handleTextMessage(ctx: Context): Promise { - const msg = ctx.message; - if (!msg || !msg.text) return; - - const telegramUserId = String(msg.from?.id); - const text = msg.text.trim(); - - this.logger.debug(`Received Telegram message: telegramUserId=${telegramUserId}, text=${text.slice(0, 50)}`); - - // Connection link — always handle, even for already-bound users (re-binding) - if (text.startsWith("multica://connect?")) { - await this.handleConnectionLink(ctx, telegramUserId, text); - return; + /** Remove context and 👀 reaction for a device after response is sent */ + private async clearMessageContext(deviceId: string): Promise { + const context = this.messageContexts.get(deviceId); + if (context) { + await this.removeReaction(context.telegramChatId, context.telegramMessageId); + this.messageContexts.delete(deviceId); } - - // Check if user is bound - const user = await 
this.userStore.findByTelegramUserId(telegramUserId); - - if (user) { - await this.routeToHub(user, text, ctx); - return; - } - - // New user without connection link - await ctx.reply( - "Welcome to Multica!\n\n" + - "To get started, open the Multica Desktop app, generate a Connection Link, " + - "and paste it here.\n\n" + - "The link looks like:\nmultica://connect?gateway=...&hub=...&agent=...&token=...&exp=..." - ); } + // ── Connection & routing ── + /** Handle a multica://connect? connection link */ private async handleConnectionLink(ctx: Context, telegramUserId: string, text: string): Promise { const msg = ctx.message; @@ -222,7 +723,7 @@ export class TelegramService implements OnModuleInit { if (!this.eventsGateway.isDeviceRegistered(connectionInfo.hubId)) { await ctx.reply( "Connection failed: Hub is not online.\n\n" + - "Make sure the Multica Desktop app is running and connected to the Gateway, then try again." + "Make sure the Multica Desktop app is running and connected to the Gateway, then try again.", ); return; } @@ -241,17 +742,12 @@ export class TelegramService implements OnModuleInit { try { await ctx.reply("Connecting... Please approve the connection on your Desktop app."); - const result = await this.sendVerifyRpc( - deviceId, - connectionInfo.hubId, - connectionInfo.token, - { - platform: "telegram", - clientName: msg?.from?.username - ? `Telegram @${msg.from.username}` - : `Telegram ${msg?.from?.first_name ?? telegramUserId}`, - } - ); + const result = await this.sendVerifyRpc(deviceId, connectionInfo.hubId, connectionInfo.token, { + platform: "telegram", + clientName: msg?.from?.username + ? `Telegram @${msg.from.username}` + : `Telegram ${msg?.from?.first_name ?? telegramUserId}`, + }); // 6. 
Save to DB await this.userStore.upsert({ @@ -266,23 +762,26 @@ export class TelegramService implements OnModuleInit { await ctx.reply( "Connected successfully!\n\n" + - `Hub: ${result.hubId}\n` + - `Agent: ${result.agentId}\n\n` + - "You can now send messages to interact with your agent." + `Hub: ${result.hubId}\n` + + `Agent: ${result.agentId}\n\n` + + "You can now send messages to interact with your agent.", ); - this.logger.log(`Telegram user verified: telegramUserId=${telegramUserId}, hubId=${connectionInfo.hubId}, deviceId=${deviceId}`); + this.logger.log( + `Telegram user verified: telegramUserId=${telegramUserId}, hubId=${connectionInfo.hubId}, deviceId=${deviceId}`, + ); } catch (error) { // Cleanup virtual device on failure this.eventsGateway.unregisterVirtualDevice(deviceId); - // Reject all pending requests for this device this.cleanupPendingRequests(); const message = error instanceof Error ? error.message : String(error); if (message.includes("REJECTED")) { await ctx.reply("Connection rejected.\n\nThe connection was declined on the Desktop app."); } else if (message.includes("timed out")) { - await ctx.reply("Connection timed out.\n\nPlease try again and approve the connection on your Desktop app within 30 seconds."); + await ctx.reply( + "Connection timed out.\n\nPlease try again and approve the connection on your Desktop app within 30 seconds.", + ); } else { await ctx.reply(`Connection failed: ${message}\n\nPlease try again.`); } @@ -342,7 +841,7 @@ export class TelegramService implements OnModuleInit { if (!this.eventsGateway.isDeviceRegistered(user.hubId)) { await ctx.reply( "Your Hub is currently offline.\n\n" + - "Make sure the Multica Desktop app is running and connected to the Gateway." 
+ "Make sure the Multica Desktop app is running and connected to the Gateway.", ); return; } @@ -368,10 +867,21 @@ export class TelegramService implements OnModuleInit { return; } - this.logger.debug(`Routed message to Hub: deviceId=${user.deviceId}, hubId=${user.hubId}, agentId=${user.agentId}`); + this.logger.debug( + `Routed message to Hub: deviceId=${user.deviceId}, hubId=${user.hubId}, agentId=${user.agentId}`, + ); } - /** Register a virtual device with a sendCallback that handles RPC responses, stream events, and messages */ + // ── Virtual device registration ── + + /** + * Register a virtual device with a sendCallback that handles: + * - RPC responses (verify) + * - Stream events (typing, text delivery with formatting/chunking/reply-to) + * - File delivery + * - Regular messages + * - Errors + */ private registerVirtualDeviceForUser(deviceId: string, telegramUserId: string): void { this.eventsGateway.registerVirtualDevice(deviceId, { sendCallback: (_event: string, data: unknown) => { @@ -394,7 +904,7 @@ export class TelegramService implements OnModuleInit { return; } - // Stream event — typing indicator + extract text content for Telegram + // Stream event — typing indicator + formatted text delivery if (msg.action === StreamAction) { const streamPayload = msg.payload as StreamPayload; const event = streamPayload?.event; @@ -402,13 +912,13 @@ export class TelegramService implements OnModuleInit { // Start typing when LLM begins generating if (event.type === "message_start") { - this.startTyping(telegramUserId); + this.startTyping(deviceId); return; } - // Stop typing + send text on message_end + // Stop typing + send formatted text on message_end if (event.type === "message_end") { - this.stopTyping(telegramUserId); + this.stopTyping(deviceId); const agentMsg = (event as { message?: { content?: Array<{ type: string; text?: string }> } }).message; if (agentMsg?.content) { const textContent = agentMsg.content @@ -416,7 +926,9 @@ export class TelegramService 
implements OnModuleInit { .map((c) => c.text!) .join(""); if (textContent) { - this.sendToTelegram(deviceId, textContent); + void this.sendToTelegram(deviceId, textContent).then(() => { + void this.clearMessageContext(deviceId); + }); } } return; @@ -424,7 +936,8 @@ export class TelegramService implements OnModuleInit { // Stop typing on error if (event.type === "agent_error") { - this.stopTyping(telegramUserId); + this.stopTyping(deviceId); + void this.clearMessageContext(deviceId); return; } @@ -441,7 +954,7 @@ export class TelegramService implements OnModuleInit { }; if (payload?.data) { void this.sendFileToTelegram( - telegramUserId, + deviceId, Buffer.from(payload.data, "base64"), payload.type ?? "document", payload.caption, @@ -455,50 +968,25 @@ export class TelegramService implements OnModuleInit { if (msg.action === "message") { const payload = msg.payload as { content?: string; agentId?: string }; if (payload?.content) { - this.sendToTelegram(deviceId, payload.content); + void this.sendToTelegram(deviceId, payload.content); } return; } // Error messages if (msg.action === "error") { - this.stopTyping(telegramUserId); + this.stopTyping(deviceId); + void this.clearMessageContext(deviceId); const payload = msg.payload as { message?: string; code?: string }; if (payload?.message) { - this.sendToTelegram(deviceId, `Error: ${payload.message}`); + void this.sendToTelegram(deviceId, `Error: ${payload.message}`); } } }, }); } - /** Start sending "typing" indicator to Telegram at regular intervals */ - private startTyping(telegramUserId: string): void { - if (this.typingTimers.has(telegramUserId)) return; - const chatId = Number(telegramUserId); - const send = () => { - void this.bot?.api.sendChatAction(chatId, "typing").catch(() => {}); - }; - send(); - const interval = setInterval(send, 5000); - this.typingTimers.set(telegramUserId, interval); - - // Safety timeout: auto-stop if no message_end/agent_error arrives - setTimeout(() => { - if 
(this.typingTimers.get(telegramUserId) === interval) { - this.stopTyping(telegramUserId); - } - }, TelegramService.TYPING_TIMEOUT_MS); - } - - /** Stop the "typing" indicator for a Telegram user */ - private stopTyping(telegramUserId: string): void { - const timer = this.typingTimers.get(telegramUserId); - if (timer) { - clearInterval(timer); - this.typingTimers.delete(telegramUserId); - } - } + // ── Cleanup ── /** Cleanup all pending requests (used on verify failure) */ private cleanupPendingRequests(): void {