feat(gateway): port Desktop Telegram channel features to Gateway webhook bot

Port all 7 features from the Desktop long-polling Telegram channel plugin
to the Gateway webhook-based Telegram bot:

- Markdown → Telegram HTML formatting with parse-error fallback
- Text chunking for messages >4096 chars (paragraph-boundary split)
- Reply-to original message + 👀 ack reaction lifecycle
- Group chat support (mention/reply filtering, @mention stripping)
- Per-chat message serialization (prevents race conditions)
- Inbound media handling (voice/audio/photo/video/document)
  with transcription (Whisper) and description (Vision API)
- Outbound file captions with HTML formatting

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jiayuan Zhang 2026-02-13 22:06:43 +08:00
parent 13e5492993
commit 9646b08306
2 changed files with 682 additions and 113 deletions

View file

@ -0,0 +1,81 @@
/**
* Markdown Telegram HTML converter.
*
* Telegram supports a subset of HTML:
* <b>, <i>, <u>, <s>, <code>, <pre>, <a href="...">, <blockquote>
*
* Strategy:
* 1. Extract code blocks and inline code (protect from further processing)
* 2. Escape HTML entities in remaining text
* 3. Convert Markdown syntax to HTML tags
* 4. Restore code blocks
*/
/** Escape HTML special characters */
function escapeHtml(text: string): string {
return text
.replace(/&/g, "&amp;")
.replace(/</g, "&lt;")
.replace(/>/g, "&gt;");
}
/**
* Convert Markdown text to Telegram-compatible HTML.
* Handles: bold, italic, strikethrough, inline code, code blocks, links, blockquotes.
*/
export function markdownToTelegramHtml(markdown: string): string {
// Placeholder system: replace code blocks/inline code with placeholders,
// process markdown on the rest, then restore.
const placeholders: string[] = [];
const placeholder = (content: string): string => {
const idx = placeholders.length;
placeholders.push(content);
return `\x00PH${idx}\x00`;
};
let text = markdown;
// 1. Fenced code blocks: ```lang\n...\n```
text = text.replace(/```(\w*)\n([\s\S]*?)```/g, (_match, lang: string, code: string) => {
const escaped = escapeHtml(code.replace(/\n$/, ""));
const langAttr = lang ? ` class="language-${escapeHtml(lang)}"` : "";
return placeholder(`<pre><code${langAttr}>${escaped}</code></pre>`);
});
// 2. Inline code: `...`
text = text.replace(/`([^`\n]+)`/g, (_match, code: string) => {
return placeholder(`<code>${escapeHtml(code)}</code>`);
});
// 3. Escape HTML in remaining text
text = escapeHtml(text);
// 4. Links: [text](url) — escape quotes in URL to prevent attribute breakout
text = text.replace(/\[([^\]]+)\]\(([^)]+)\)/g, (_m, label: string, url: string) =>
`<a href="${url.replace(/"/g, "&quot;")}">${label}</a>`,
);
// 5. Bold: **text** or __text__
text = text.replace(/\*\*(.+?)\*\*/g, "<b>$1</b>");
text = text.replace(/__(.+?)__/g, "<b>$1</b>");
// 6. Italic: *text* or _text_ (but not inside words with underscores)
text = text.replace(/(?<!\w)\*(?!\s)(.+?)(?<!\s)\*(?!\w)/g, "<i>$1</i>");
text = text.replace(/(?<!\w)_(?!\s)(.+?)(?<!\s)_(?!\w)/g, "<i>$1</i>");
// 7. Strikethrough: ~~text~~
text = text.replace(/~~(.+?)~~/g, "<s>$1</s>");
// 8. Blockquotes: > text (at line start)
text = text.replace(/^&gt; (.+)$/gm, "<blockquote>$1</blockquote>");
// Merge adjacent blockquotes
text = text.replace(/<\/blockquote>\n<blockquote>/g, "\n");
// 9. Headings: strip # markers, make bold
text = text.replace(/^#{1,6}\s+(.+)$/gm, "<b>$1</b>");
// Restore placeholders
text = text.replace(/\x00PH(\d+)\x00/g, (_match, idx: string) => placeholders[Number(idx)]!);
return text;
}

View file

@ -5,16 +5,27 @@
* - New users: prompts to paste a multica://connect link
* - Connection link: verifies with Hub via RPC, persists to DB
* - Bound users: routes messages to their Hub agent
*
* Features (ported from Desktop channel plugin):
* - Markdown Telegram HTML formatting with parse-error fallback
* - Text chunking for messages >4096 chars (paragraph-boundary split)
* - Reply-to original message + 👀 ack reaction
* - Group chat support (mention/reply filtering)
* - Per-chat message serialization (prevents race conditions)
* - Inbound media handling (voice, photo, video, document)
*/
import { Inject, Injectable, Logger } from "@nestjs/common";
import type { OnModuleInit } from "@nestjs/common";
import { Bot, InputFile, webhookCallback } from "grammy";
import { Bot, GrammyError, InputFile, webhookCallback } from "grammy";
import type { Context } from "grammy";
import { v7 as uuidv7 } from "uuid";
import { generateEncryptedId } from "@multica/utils";
import { writeFile, mkdir } from "node:fs/promises";
import { join, extname } from "node:path";
import { generateEncryptedId, MEDIA_CACHE_DIR } from "@multica/utils";
import { parseConnectionCode } from "@multica/store/connection";
import type { ConnectionInfo } from "@multica/store/connection";
import { transcribeAudio, describeImage, describeVideo } from "@multica/core/media";
import {
GatewayEvents,
RequestAction,
@ -31,8 +42,11 @@ import type { StreamPayload } from "@multica/sdk";
import { EventsGateway } from "../events.gateway.js";
import { TelegramUserStore } from "./telegram-user.store.js";
import type { TelegramUser } from "./types.js";
import { markdownToTelegramHtml } from "./telegram-format.js";
// Minimal Express types for webhook handling
// ── Types ──
/** Minimal Express types for webhook handling */
interface ExpressRequest {
body: unknown;
header: (name: string) => string | undefined;
@ -50,14 +64,82 @@ interface PendingRequest<T = unknown> {
timer: ReturnType<typeof setTimeout>;
}
/** Tracks the originating Telegram message for reply_to and reaction cleanup */
interface MessageContext {
telegramChatId: number;
telegramMessageId: number;
}
/** Media attachment extracted from a Telegram message */
interface MediaAttachment {
type: "audio" | "image" | "video" | "document";
fileId: string;
mimeType?: string;
duration?: number;
caption?: string;
}
// ── Constants ──
const VERIFY_TIMEOUT_MS = 30_000;
const TYPING_TIMEOUT_MS = 60_000;
const MAX_CHARS_PER_MESSAGE = 4000; // Telegram limit is 4096; leave room for HTML overhead
// ── Helpers ──
/** Check if a GrammyError is an HTML parse failure */
function isParseError(err: unknown): boolean {
return err instanceof GrammyError && err.description.includes("can't parse entities");
}
/**
* Split text at natural boundaries so each chunk stays within Telegram's message limit.
* Prefers paragraph breaks > line breaks > spaces > hard cut.
*/
function chunkText(text: string, maxChars = MAX_CHARS_PER_MESSAGE): string[] {
if (text.length <= maxChars) return [text];
const chunks: string[] = [];
let remaining = text;
while (remaining.length > 0) {
if (remaining.length <= maxChars) {
chunks.push(remaining);
break;
}
// Find the best break point within the limit
let breakPoint = remaining.lastIndexOf("\n\n", maxChars);
if (breakPoint <= 0 || breakPoint < maxChars * 0.5) {
breakPoint = remaining.lastIndexOf("\n", maxChars);
}
if (breakPoint <= 0 || breakPoint < maxChars * 0.5) {
breakPoint = remaining.lastIndexOf(" ", maxChars);
}
if (breakPoint <= 0) {
breakPoint = maxChars;
}
chunks.push(remaining.slice(0, breakPoint));
remaining = remaining.slice(breakPoint).trimStart();
}
return chunks;
}
// ── Service ──
@Injectable()
export class TelegramService implements OnModuleInit {
private static readonly TYPING_TIMEOUT_MS = 60_000; // 1 minute safety cap
private bot: Bot | null = null;
private botId: number | null = null;
private botUsername: string | null = null;
private pendingRequests = new Map<string, PendingRequest>();
/** Typing indicator timers, keyed by deviceId */
private typingTimers = new Map<string, ReturnType<typeof setInterval>>();
/** Tracks the originating message for reply_to & reaction cleanup, keyed by deviceId */
private messageContexts = new Map<string, MessageContext>();
private readonly logger = new Logger(TelegramService.name);
@ -66,19 +148,29 @@ export class TelegramService implements OnModuleInit {
@Inject(TelegramUserStore) private readonly userStore: TelegramUserStore,
) {}
// ── Lifecycle ──
async onModuleInit(): Promise<void> {
console.log("[TelegramService] onModuleInit starting...");
const token = process.env["TELEGRAM_BOT_TOKEN"];
if (!token) {
console.log("[TelegramService] No bot token");
this.logger.warn("TELEGRAM_BOT_TOKEN not set, Telegram webhook disabled");
return;
}
console.log("[TelegramService] Creating bot...");
this.bot = new Bot(token);
// Fetch bot identity for group-chat mention detection
try {
const botInfo = await this.bot.api.getMe();
this.botId = botInfo.id;
this.botUsername = botInfo.username ?? null;
this.logger.log(`Telegram bot initialized: @${this.botUsername} (id=${this.botId})`);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
this.logger.error(`Failed to get bot info: ${msg}`);
}
this.setupHandlers();
this.logger.log("Telegram bot initialized");
}
/** Get grammY webhook callback for Express/NestJS */
@ -89,12 +181,12 @@ export class TelegramService implements OnModuleInit {
if (secretToken) {
return webhookCallback(this.bot, "express", { secretToken }) as unknown as (
req: ExpressRequest,
res: ExpressResponse
res: ExpressResponse,
) => Promise<void>;
}
return webhookCallback(this.bot, "express") as unknown as (
req: ExpressRequest,
res: ExpressResponse
res: ExpressResponse,
) => Promise<void>;
}
@ -103,7 +195,311 @@ export class TelegramService implements OnModuleInit {
return this.bot !== null;
}
/** Send message to a Telegram user by device ID */
// ── Handler setup ──
private setupHandlers(): void {
if (!this.bot) return;
// Per-chat serialization middleware — ensures messages from the same chat
// are processed one at a time, preventing race conditions.
const chatQueues = new Map<string, Promise<void>>();
this.bot.use(async (ctx, next) => {
const chatId = ctx.chat?.id;
if (!chatId) return next();
const key = String(chatId);
const prev = chatQueues.get(key) ?? Promise.resolve();
const current = prev.then(() => next()).catch(() => {});
chatQueues.set(key, current);
await current;
// Clean up resolved entries to prevent memory leak
if (chatQueues.get(key) === current) {
chatQueues.delete(key);
}
});
// Text messages
this.bot.on("message:text", async (ctx) => {
if (!this.shouldProcessMessage(ctx)) return;
await this.handleTextMessage(ctx);
});
// Media messages
const mediaTypes = [
{
filter: "message:voice" as const,
getMedia: (msg: any): MediaAttachment => ({
type: "audio" as const,
fileId: msg.voice.file_id as string,
mimeType: msg.voice.mime_type as string | undefined,
duration: msg.voice.duration as number | undefined,
}),
},
{
filter: "message:audio" as const,
getMedia: (msg: any): MediaAttachment => ({
type: "audio" as const,
fileId: msg.audio.file_id as string,
mimeType: msg.audio.mime_type as string | undefined,
duration: msg.audio.duration as number | undefined,
}),
},
{
filter: "message:photo" as const,
getMedia: (msg: any): MediaAttachment => {
// Pick the largest photo size (last in array)
const photos = msg.photo as Array<{ file_id: string }>;
const largest = photos[photos.length - 1]!;
return {
type: "image" as const,
fileId: largest.file_id,
mimeType: "image/jpeg",
};
},
},
{
filter: "message:video" as const,
getMedia: (msg: any): MediaAttachment => ({
type: "video" as const,
fileId: msg.video.file_id as string,
mimeType: msg.video.mime_type as string | undefined,
duration: msg.video.duration as number | undefined,
}),
},
{
filter: "message:document" as const,
getMedia: (msg: any): MediaAttachment => ({
type: "document" as const,
fileId: msg.document.file_id as string,
mimeType: msg.document.mime_type as string | undefined,
}),
},
] as const;
for (const { filter, getMedia } of mediaTypes) {
this.bot.on(filter, async (ctx) => {
if (!this.shouldProcessMessage(ctx)) return;
await this.handleMediaMessage(ctx, getMedia(ctx.message));
});
}
}
// ── Group chat filtering ──
/**
* Decide whether this message should be processed.
* Private chats: always. Groups: only @mentions or replies to bot.
*/
private shouldProcessMessage(ctx: Context): boolean {
const msg = ctx.message;
if (!msg) return false;
const isGroup = msg.chat.type === "group" || msg.chat.type === "supergroup";
if (!isGroup) return true;
// Check text entities for @mention
if (this.botUsername) {
const text = (msg as any).text || "";
const isMentionedInText = msg.entities?.some(
(e) =>
e.type === "mention" &&
text.substring(e.offset, e.offset + e.length).toLowerCase() === `@${this.botUsername!.toLowerCase()}`,
);
if (isMentionedInText) return true;
// Check caption entities for @mention (media messages)
const caption = (msg as any).caption || "";
const captionEntities = (msg as any).caption_entities as typeof msg.entities | undefined;
const isMentionedInCaption = captionEntities?.some(
(e: any) =>
e.type === "mention" &&
caption.substring(e.offset, e.offset + e.length).toLowerCase() === `@${this.botUsername!.toLowerCase()}`,
);
if (isMentionedInCaption) return true;
}
// Check if reply to bot
const isReplyToBot = msg.reply_to_message?.from?.id === this.botId;
return isReplyToBot;
}
// ── Inbound: text messages ──
private async handleTextMessage(ctx: Context): Promise<void> {
const msg = ctx.message;
if (!msg || !msg.text) return;
const telegramUserId = String(msg.from?.id);
let text = msg.text.trim();
this.logger.debug(`Received message: chatId=${msg.chat.id} from=${telegramUserId} text="${text.slice(0, 50)}"`);
// Connection link — always handle, even for already-bound users (re-binding)
if (text.startsWith("multica://connect?")) {
await this.handleConnectionLink(ctx, telegramUserId, text);
return;
}
// Strip @mention from text for cleaner agent input
if (this.botUsername) {
text = text.replace(new RegExp(`@${this.botUsername}\\s*`, "gi"), "").trim();
}
if (!text) return;
// Check if user is bound
const user = await this.userStore.findByTelegramUserId(telegramUserId);
if (user) {
// ACK: 👀 reaction on the original message
await this.addReaction(msg.chat.id, msg.message_id, "👀");
this.storeMessageContext(user.deviceId, msg.chat.id, msg.message_id);
await this.routeToHub(user, text, ctx);
return;
}
// New user without connection link
await ctx.reply(
"Welcome to Multica!\n\n" +
"To get started, open the Multica Desktop app, generate a Connection Link, " +
"and paste it here.\n\n" +
"The link looks like:\nmultica://connect?gateway=...&hub=...&agent=...&token=...&exp=...",
);
}
// ── Inbound: media messages ──
private async handleMediaMessage(ctx: Context, media: MediaAttachment): Promise<void> {
const msg = ctx.message;
if (!msg) return;
const telegramUserId = String(msg.from?.id);
const caption = (msg as any).caption as string | undefined;
this.logger.debug(`Received ${media.type}: chatId=${msg.chat.id} from=${telegramUserId} fileId=${media.fileId}`);
// Connection link in caption
if (caption?.startsWith("multica://connect?")) {
await this.handleConnectionLink(ctx, telegramUserId, caption);
return;
}
// Check if user is bound
const user = await this.userStore.findByTelegramUserId(telegramUserId);
if (!user) {
await ctx.reply(
"Welcome to Multica!\n\n" +
"To get started, open the Multica Desktop app, generate a Connection Link, " +
"and paste it here.",
);
return;
}
// ACK: 👀 reaction
await this.addReaction(msg.chat.id, msg.message_id, "👀");
this.storeMessageContext(user.deviceId, msg.chat.id, msg.message_id);
// Process media → text description (async, may take a few seconds)
const processedText = await this.processMedia({ ...media, caption: caption ?? undefined });
await this.routeToHub(user, processedText, ctx);
}
// ── Media processing ──
/**
* Download a file from the Telegram Bot API and save it locally.
*/
private async downloadMedia(fileId: string): Promise<string> {
if (!this.bot) throw new Error("Bot not initialized");
const file = await this.bot.api.getFile(fileId);
const filePath = file.file_path;
if (!filePath) throw new Error(`Telegram returned no file_path for fileId=${fileId}`);
const url = `https://api.telegram.org/file/bot${this.bot.token}/${filePath}`;
const ext = extname(filePath) || ".bin";
const localPath = join(MEDIA_CACHE_DIR, `${uuidv7()}${ext}`);
await mkdir(MEDIA_CACHE_DIR, { recursive: true });
const res = await fetch(url);
if (!res.ok) throw new Error(`Failed to download file: HTTP ${res.status}`);
const buffer = Buffer.from(await res.arrayBuffer());
await writeFile(localPath, buffer);
this.logger.debug(`Downloaded media: ${filePath}${localPath}`);
return localPath;
}
/**
* Process a media attachment into a text description for the agent.
* Uses local whisper / OpenAI Vision / ffmpeg when available; graceful fallback otherwise.
*/
private async processMedia(media: MediaAttachment): Promise<string> {
try {
const filePath = await this.downloadMedia(media.fileId);
if (media.type === "image") {
const description = await describeImage(filePath);
if (description) {
const parts = ["[Image]", `Description: ${description}`];
if (media.caption) parts.push(`Caption: ${media.caption}`);
return parts.join("\n");
}
const parts = ["[image message received]", `File: ${filePath}`];
if (media.caption) parts.push(`Caption: ${media.caption}`);
return parts.join("\n");
}
if (media.type === "audio") {
const transcript = await transcribeAudio(filePath);
if (transcript) {
const parts = ["[Voice Message]", `Transcript: ${transcript}`];
if (media.caption) parts.push(`Caption: ${media.caption}`);
return parts.join("\n");
}
const parts = ["[audio message received]", `File: ${filePath}`];
if (media.mimeType) parts.push(`Type: ${media.mimeType}`);
if (media.duration) parts.push(`Duration: ${media.duration}s`);
if (media.caption) parts.push(`Caption: ${media.caption}`);
return parts.join("\n");
}
if (media.type === "video") {
const description = await describeVideo(filePath);
if (description) {
const parts = ["[Video]", `Description: ${description}`];
if (media.duration) parts.push(`Duration: ${media.duration}s`);
if (media.caption) parts.push(`Caption: ${media.caption}`);
return parts.join("\n");
}
const parts = ["[video message received]", `File: ${filePath}`];
if (media.mimeType) parts.push(`Type: ${media.mimeType}`);
if (media.duration) parts.push(`Duration: ${media.duration}s`);
if (media.caption) parts.push(`Caption: ${media.caption}`);
return parts.join("\n");
}
// Document — no processing, just metadata
const parts = ["[document message received]", `File: ${filePath}`];
if (media.mimeType) parts.push(`Type: ${media.mimeType}`);
if (media.caption) parts.push(`Caption: ${media.caption}`);
return parts.join("\n");
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
this.logger.error(`Failed to process ${media.type}: ${msg}`);
return media.caption || `[${media.type} message received — processing failed]`;
}
}
// ── Outbound: send to Telegram ──
/**
* Send text to a Telegram user/group by deviceId.
* Applies Markdown HTML formatting, text chunking, and reply-to.
*/
async sendToTelegram(deviceId: string, text: string): Promise<void> {
if (!this.bot) return;
@ -113,18 +509,56 @@ export class TelegramService implements OnModuleInit {
return;
}
// Use chatId from message context (supports groups); fall back to user ID (private chat)
const context = this.messageContexts.get(deviceId);
const chatId = context?.telegramChatId ?? Number(user.telegramUserId);
const chunks = chunkText(text);
try {
await this.bot.api.sendMessage(Number(user.telegramUserId), text);
this.logger.debug(`Sent message to Telegram: telegramUserId=${user.telegramUserId}`);
for (let i = 0; i < chunks.length; i++) {
// Only reply_to on the first chunk
const replyTo = i === 0 && context ? context.telegramMessageId : undefined;
await this.sendFormatted(chatId, chunks[i]!, replyTo);
}
this.logger.debug(`Sent ${chunks.length} chunk(s) to Telegram: telegramUserId=${user.telegramUserId}`);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
this.logger.error(`Failed to send Telegram message: deviceId=${deviceId}, error=${message}`);
}
}
/**
* Send a single message with HTML formatting and optional reply-to.
* Falls back to plain text if HTML parsing fails.
*/
private async sendFormatted(
chatId: number,
text: string,
replyToMessageId?: number,
): Promise<void> {
if (!this.bot) return;
const html = markdownToTelegramHtml(text);
const extra: Record<string, unknown> = { parse_mode: "HTML" };
if (replyToMessageId) extra["reply_to_message_id"] = replyToMessageId;
try {
await this.bot.api.sendMessage(chatId, html, extra);
} catch (err) {
if (isParseError(err)) {
this.logger.warn("HTML parse failed, retrying as plain text");
const plainExtra: Record<string, unknown> = {};
if (replyToMessageId) plainExtra["reply_to_message_id"] = replyToMessageId;
await this.bot.api.sendMessage(chatId, text, plainExtra);
} else {
throw err;
}
}
}
/** Send a file (photo/document/video/audio) to a Telegram user */
private async sendFileToTelegram(
telegramUserId: string,
deviceId: string,
data: Buffer,
type: string,
caption?: string,
@ -132,9 +566,17 @@ export class TelegramService implements OnModuleInit {
): Promise<void> {
if (!this.bot) return;
const chatId = Number(telegramUserId);
const user = await this.userStore.findByDeviceId(deviceId);
if (!user) return;
const context = this.messageContexts.get(deviceId);
const chatId = context?.telegramChatId ?? Number(user.telegramUserId);
const inputFile = new InputFile(data, filename);
const extra = caption ? { caption: caption.slice(0, 1024) } : {};
// Format caption as HTML with fallback
const rawCaption = caption?.slice(0, 1024);
const captionHtml = rawCaption ? markdownToTelegramHtml(rawCaption) : undefined;
const extra = captionHtml ? { caption: captionHtml, parse_mode: "HTML" as const } : {};
try {
switch (type) {
@ -155,55 +597,114 @@ export class TelegramService implements OnModuleInit {
await this.bot.api.sendDocument(chatId, inputFile, extra);
break;
}
this.logger.debug(`Sent ${type} to Telegram: telegramUserId=${telegramUserId}`);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
this.logger.error(`Failed to send ${type} to Telegram: telegramUserId=${telegramUserId}, error=${message}`);
this.logger.debug(`Sent ${type} to Telegram: deviceId=${deviceId}`);
} catch (err) {
// If HTML caption fails, retry without formatting
if (isParseError(err) && rawCaption) {
this.logger.warn("Media caption HTML parse failed, retrying as plain text");
const plainExtra = { caption: rawCaption };
switch (type) {
case "photo":
await this.bot.api.sendPhoto(chatId, inputFile, plainExtra);
break;
case "video":
await this.bot.api.sendVideo(chatId, inputFile, plainExtra);
break;
case "audio":
await this.bot.api.sendAudio(chatId, inputFile, plainExtra);
break;
case "voice":
await this.bot.api.sendVoice(chatId, inputFile, plainExtra);
break;
case "document":
default:
await this.bot.api.sendDocument(chatId, inputFile, plainExtra);
break;
}
} else {
const message = err instanceof Error ? err.message : String(err);
this.logger.error(`Failed to send ${type}: deviceId=${deviceId}, error=${message}`);
}
}
}
/** Setup bot message handlers */
private setupHandlers(): void {
if (!this.bot) return;
// ── Reactions ──
this.bot.on("message:text", async (ctx) => {
await this.handleTextMessage(ctx);
private async addReaction(chatId: number, messageId: number, emoji: string): Promise<void> {
if (!this.bot) return;
try {
await this.bot.api.setMessageReaction(
chatId,
messageId,
// Grammy expects a specific emoji union type; cast since our interface accepts any string
[{ type: "emoji", emoji } as unknown as { type: "emoji"; emoji: "👀" }],
);
} catch {
// Best-effort — reaction failure is not critical
}
}
private async removeReaction(chatId: number, messageId: number): Promise<void> {
if (!this.bot) return;
try {
await this.bot.api.setMessageReaction(chatId, messageId, []);
} catch {
// Best-effort
}
}
// ── Typing indicators ──
private startTyping(deviceId: string): void {
if (this.typingTimers.has(deviceId)) return;
const context = this.messageContexts.get(deviceId);
if (!context) return;
const chatId = context.telegramChatId;
const send = () => {
void this.bot?.api.sendChatAction(chatId, "typing").catch(() => {});
};
send();
const interval = setInterval(send, 5000);
this.typingTimers.set(deviceId, interval);
// Safety timeout: auto-stop if no message_end/agent_error arrives
setTimeout(() => {
if (this.typingTimers.get(deviceId) === interval) {
this.stopTyping(deviceId);
}
}, TYPING_TIMEOUT_MS);
}
private stopTyping(deviceId: string): void {
const timer = this.typingTimers.get(deviceId);
if (timer) {
clearInterval(timer);
this.typingTimers.delete(deviceId);
}
}
// ── Message context tracking ──
private storeMessageContext(deviceId: string, chatId: number, messageId: number): void {
this.messageContexts.set(deviceId, {
telegramChatId: chatId,
telegramMessageId: messageId,
});
}
/** Handle incoming text message */
private async handleTextMessage(ctx: Context): Promise<void> {
const msg = ctx.message;
if (!msg || !msg.text) return;
const telegramUserId = String(msg.from?.id);
const text = msg.text.trim();
this.logger.debug(`Received Telegram message: telegramUserId=${telegramUserId}, text=${text.slice(0, 50)}`);
// Connection link — always handle, even for already-bound users (re-binding)
if (text.startsWith("multica://connect?")) {
await this.handleConnectionLink(ctx, telegramUserId, text);
return;
/** Remove context and 👀 reaction for a device after response is sent */
private async clearMessageContext(deviceId: string): Promise<void> {
const context = this.messageContexts.get(deviceId);
if (context) {
await this.removeReaction(context.telegramChatId, context.telegramMessageId);
this.messageContexts.delete(deviceId);
}
// Check if user is bound
const user = await this.userStore.findByTelegramUserId(telegramUserId);
if (user) {
await this.routeToHub(user, text, ctx);
return;
}
// New user without connection link
await ctx.reply(
"Welcome to Multica!\n\n" +
"To get started, open the Multica Desktop app, generate a Connection Link, " +
"and paste it here.\n\n" +
"The link looks like:\nmultica://connect?gateway=...&hub=...&agent=...&token=...&exp=..."
);
}
// ── Connection & routing ──
/** Handle a multica://connect? connection link */
private async handleConnectionLink(ctx: Context, telegramUserId: string, text: string): Promise<void> {
const msg = ctx.message;
@ -222,7 +723,7 @@ export class TelegramService implements OnModuleInit {
if (!this.eventsGateway.isDeviceRegistered(connectionInfo.hubId)) {
await ctx.reply(
"Connection failed: Hub is not online.\n\n" +
"Make sure the Multica Desktop app is running and connected to the Gateway, then try again."
"Make sure the Multica Desktop app is running and connected to the Gateway, then try again.",
);
return;
}
@ -241,17 +742,12 @@ export class TelegramService implements OnModuleInit {
try {
await ctx.reply("Connecting... Please approve the connection on your Desktop app.");
const result = await this.sendVerifyRpc(
deviceId,
connectionInfo.hubId,
connectionInfo.token,
{
platform: "telegram",
clientName: msg?.from?.username
? `Telegram @${msg.from.username}`
: `Telegram ${msg?.from?.first_name ?? telegramUserId}`,
}
);
const result = await this.sendVerifyRpc(deviceId, connectionInfo.hubId, connectionInfo.token, {
platform: "telegram",
clientName: msg?.from?.username
? `Telegram @${msg.from.username}`
: `Telegram ${msg?.from?.first_name ?? telegramUserId}`,
});
// 6. Save to DB
await this.userStore.upsert({
@ -266,23 +762,26 @@ export class TelegramService implements OnModuleInit {
await ctx.reply(
"Connected successfully!\n\n" +
`Hub: ${result.hubId}\n` +
`Agent: ${result.agentId}\n\n` +
"You can now send messages to interact with your agent."
`Hub: ${result.hubId}\n` +
`Agent: ${result.agentId}\n\n` +
"You can now send messages to interact with your agent.",
);
this.logger.log(`Telegram user verified: telegramUserId=${telegramUserId}, hubId=${connectionInfo.hubId}, deviceId=${deviceId}`);
this.logger.log(
`Telegram user verified: telegramUserId=${telegramUserId}, hubId=${connectionInfo.hubId}, deviceId=${deviceId}`,
);
} catch (error) {
// Cleanup virtual device on failure
this.eventsGateway.unregisterVirtualDevice(deviceId);
// Reject all pending requests for this device
this.cleanupPendingRequests();
const message = error instanceof Error ? error.message : String(error);
if (message.includes("REJECTED")) {
await ctx.reply("Connection rejected.\n\nThe connection was declined on the Desktop app.");
} else if (message.includes("timed out")) {
await ctx.reply("Connection timed out.\n\nPlease try again and approve the connection on your Desktop app within 30 seconds.");
await ctx.reply(
"Connection timed out.\n\nPlease try again and approve the connection on your Desktop app within 30 seconds.",
);
} else {
await ctx.reply(`Connection failed: ${message}\n\nPlease try again.`);
}
@ -342,7 +841,7 @@ export class TelegramService implements OnModuleInit {
if (!this.eventsGateway.isDeviceRegistered(user.hubId)) {
await ctx.reply(
"Your Hub is currently offline.\n\n" +
"Make sure the Multica Desktop app is running and connected to the Gateway."
"Make sure the Multica Desktop app is running and connected to the Gateway.",
);
return;
}
@ -368,10 +867,21 @@ export class TelegramService implements OnModuleInit {
return;
}
this.logger.debug(`Routed message to Hub: deviceId=${user.deviceId}, hubId=${user.hubId}, agentId=${user.agentId}`);
this.logger.debug(
`Routed message to Hub: deviceId=${user.deviceId}, hubId=${user.hubId}, agentId=${user.agentId}`,
);
}
/** Register a virtual device with a sendCallback that handles RPC responses, stream events, and messages */
// ── Virtual device registration ──
/**
* Register a virtual device with a sendCallback that handles:
* - RPC responses (verify)
* - Stream events (typing, text delivery with formatting/chunking/reply-to)
* - File delivery
* - Regular messages
* - Errors
*/
private registerVirtualDeviceForUser(deviceId: string, telegramUserId: string): void {
this.eventsGateway.registerVirtualDevice(deviceId, {
sendCallback: (_event: string, data: unknown) => {
@ -394,7 +904,7 @@ export class TelegramService implements OnModuleInit {
return;
}
// Stream event — typing indicator + extract text content for Telegram
// Stream event — typing indicator + formatted text delivery
if (msg.action === StreamAction) {
const streamPayload = msg.payload as StreamPayload;
const event = streamPayload?.event;
@ -402,13 +912,13 @@ export class TelegramService implements OnModuleInit {
// Start typing when LLM begins generating
if (event.type === "message_start") {
this.startTyping(telegramUserId);
this.startTyping(deviceId);
return;
}
// Stop typing + send text on message_end
// Stop typing + send formatted text on message_end
if (event.type === "message_end") {
this.stopTyping(telegramUserId);
this.stopTyping(deviceId);
const agentMsg = (event as { message?: { content?: Array<{ type: string; text?: string }> } }).message;
if (agentMsg?.content) {
const textContent = agentMsg.content
@ -416,7 +926,9 @@ export class TelegramService implements OnModuleInit {
.map((c) => c.text!)
.join("");
if (textContent) {
this.sendToTelegram(deviceId, textContent);
void this.sendToTelegram(deviceId, textContent).then(() => {
void this.clearMessageContext(deviceId);
});
}
}
return;
@ -424,7 +936,8 @@ export class TelegramService implements OnModuleInit {
// Stop typing on error
if (event.type === "agent_error") {
this.stopTyping(telegramUserId);
this.stopTyping(deviceId);
void this.clearMessageContext(deviceId);
return;
}
@ -441,7 +954,7 @@ export class TelegramService implements OnModuleInit {
};
if (payload?.data) {
void this.sendFileToTelegram(
telegramUserId,
deviceId,
Buffer.from(payload.data, "base64"),
payload.type ?? "document",
payload.caption,
@ -455,50 +968,25 @@ export class TelegramService implements OnModuleInit {
if (msg.action === "message") {
const payload = msg.payload as { content?: string; agentId?: string };
if (payload?.content) {
this.sendToTelegram(deviceId, payload.content);
void this.sendToTelegram(deviceId, payload.content);
}
return;
}
// Error messages
if (msg.action === "error") {
this.stopTyping(telegramUserId);
this.stopTyping(deviceId);
void this.clearMessageContext(deviceId);
const payload = msg.payload as { message?: string; code?: string };
if (payload?.message) {
this.sendToTelegram(deviceId, `Error: ${payload.message}`);
void this.sendToTelegram(deviceId, `Error: ${payload.message}`);
}
}
},
});
}
/** Start sending "typing" indicator to Telegram at regular intervals */
private startTyping(telegramUserId: string): void {
if (this.typingTimers.has(telegramUserId)) return;
const chatId = Number(telegramUserId);
const send = () => {
void this.bot?.api.sendChatAction(chatId, "typing").catch(() => {});
};
send();
const interval = setInterval(send, 5000);
this.typingTimers.set(telegramUserId, interval);
// Safety timeout: auto-stop if no message_end/agent_error arrives
setTimeout(() => {
if (this.typingTimers.get(telegramUserId) === interval) {
this.stopTyping(telegramUserId);
}
}, TelegramService.TYPING_TIMEOUT_MS);
}
/** Stop the "typing" indicator for a Telegram user */
private stopTyping(telegramUserId: string): void {
const timer = this.typingTimers.get(telegramUserId);
if (timer) {
clearInterval(timer);
this.typingTimers.delete(telegramUserId);
}
}
// ── Cleanup ──
/** Cleanup all pending requests (used on verify failure) */
private cleanupPendingRequests(): void {