diff --git a/.env.example b/.env.example new file mode 100644 index 00000000..ea1360bb --- /dev/null +++ b/.env.example @@ -0,0 +1,9 @@ +# Telegram Bot +# Get a token from @BotFather on Telegram +TELEGRAM_BOT_TOKEN= + +# Optional: webhook secret token for production +# TELEGRAM_WEBHOOK_SECRET_TOKEN= + +# Optional: webhook URL (if not set, uses long-polling mode for local dev) +# TELEGRAM_WEBHOOK_URL=https://your-domain.ngrok-free.dev/telegram/webhook diff --git a/apps/desktop/src/renderer/src/components/qr-code.tsx b/apps/desktop/src/renderer/src/components/qr-code.tsx index 0dd5eef6..413f37d6 100644 --- a/apps/desktop/src/renderer/src/components/qr-code.tsx +++ b/apps/desktop/src/renderer/src/components/qr-code.tsx @@ -1,7 +1,8 @@ -import { useState, useEffect, useCallback, useMemo, useRef } from 'react' +import { useState, useCallback, useMemo } from 'react' import { QRCodeSVG } from 'qrcode.react' import { Button } from '@multica/ui/components/ui/button' import { Copy, Check } from 'lucide-react' +import { useQRToken, useCountdown } from './qr-hooks' // ============ Types ============ @@ -22,65 +23,7 @@ export interface ConnectionQRCodeProps { size?: number } -// ============ Hooks ============ - -/** Generate a secure random token */ -function generateToken(): string { - return crypto.randomUUID() -} - -/** - * Hook to manage QR token lifecycle - * - Generates token on mount - * - Auto-refreshes when expired - * - Registers token with Hub - */ -function useQRToken(agentId: string, expirySeconds: number) { - const [token, setToken] = useState(generateToken) - const [expiresAt, setExpiresAt] = useState(() => Date.now() + expirySeconds * 1000) - - const refresh = useCallback(() => { - const newToken = generateToken() - const newExpiry = Date.now() + expirySeconds * 1000 - setToken(newToken) - setExpiresAt(newExpiry) - window.electronAPI?.hub.registerToken(newToken, agentId, newExpiry) - }, [agentId, expirySeconds]) - - // Register initial token - useEffect(() => { - window.electronAPI?.hub.registerToken(token, agentId, expiresAt) - }, []) // eslint-disable-line react-hooks/exhaustive-deps - - return { token, expiresAt, refresh } -} - -/** - * Hook for countdown timer - * Returns remaining seconds, auto-updates every second - */ -function useCountdown(expiresAt: number, onExpire: () => void) { - const [remaining, setRemaining] = useState(() => - Math.max(0, Math.ceil((expiresAt - Date.now()) / 1000)) - ) - const onExpireRef = useRef(onExpire) - onExpireRef.current = onExpire - - useEffect(() => { - // Reset when expiresAt changes - setRemaining(Math.max(0, Math.ceil((expiresAt - Date.now()) / 1000))) - - const id = setInterval(() => { - const next = Math.max(0, Math.ceil((expiresAt - Date.now()) / 1000)) - setRemaining(next) - if (next === 0) onExpireRef.current() - }, 1000) - - return () => clearInterval(id) - }, [expiresAt]) - - return remaining -} +// Hooks are in ./qr-hooks.ts (separate file for react-refresh compatibility) /** * Hook for clipboard copy with feedback @@ -121,7 +64,7 @@ function CornerAccent({ position }: { position: 'tl' | 'tr' | 'bl' | 'br' }) { } /** QR code frame with corner accents */ -function QRCodeFrame({ children }: { children: React.ReactNode }) { +export function QRCodeFrame({ children }: { children: React.ReactNode }) { return (
@@ -141,7 +84,7 @@ function formatTime(seconds: number): string { } /** Expiry timer display */ -function ExpiryTimer({ remaining }: { remaining: number }) { +export function ExpiryTimer({ remaining }: { remaining: number }) { // Derive display state from remaining seconds (no extra state needed) const isWarning = remaining > 0 && remaining < 10 diff --git a/apps/desktop/src/renderer/src/components/qr-hooks.ts b/apps/desktop/src/renderer/src/components/qr-hooks.ts new file mode 100644 index 00000000..6e250335 --- /dev/null +++ b/apps/desktop/src/renderer/src/components/qr-hooks.ts @@ -0,0 +1,59 @@ +import { useState, useEffect, useCallback, useRef } from 'react' + +/** Generate a secure random token */ +function generateToken(): string { + return crypto.randomUUID() +} + +/** + * Hook to manage QR token lifecycle + * - Generates token on mount + * - Auto-refreshes when expired + * - Registers token with Hub + */ +export function useQRToken(agentId: string, expirySeconds: number) { + const [token, setToken] = useState(generateToken) + const [expiresAt, setExpiresAt] = useState(() => Date.now() + expirySeconds * 1000) + + const refresh = useCallback(() => { + const newToken = generateToken() + const newExpiry = Date.now() + expirySeconds * 1000 + setToken(newToken) + setExpiresAt(newExpiry) + window.electronAPI?.hub.registerToken(newToken, agentId, newExpiry) + }, [agentId, expirySeconds]) + + // Register initial token + useEffect(() => { + window.electronAPI?.hub.registerToken(token, agentId, expiresAt) + }, []) // eslint-disable-line react-hooks/exhaustive-deps + + return { token, expiresAt, refresh } +} + +/** + * Hook for countdown timer + * Returns remaining seconds, auto-updates every second + */ +export function useCountdown(expiresAt: number, onExpire: () => void) { + const [remaining, setRemaining] = useState(() => + Math.max(0, Math.ceil((expiresAt - Date.now()) / 1000)) + ) + const onExpireRef = useRef(onExpire) + onExpireRef.current = onExpire + + useEffect(() => { + // Reset when expiresAt changes + setRemaining(Math.max(0, Math.ceil((expiresAt - Date.now()) / 1000))) + + const id = setInterval(() => { + const next = Math.max(0, Math.ceil((expiresAt - Date.now()) / 1000)) + setRemaining(next) + if (next === 0) onExpireRef.current() + }, 1000) + + return () => clearInterval(id) + }, [expiresAt]) + + return remaining +} diff --git a/apps/desktop/src/renderer/src/components/telegram-qr.tsx b/apps/desktop/src/renderer/src/components/telegram-qr.tsx new file mode 100644 index 00000000..6157f6cb --- /dev/null +++ b/apps/desktop/src/renderer/src/components/telegram-qr.tsx @@ -0,0 +1,113 @@ +import { useState, useEffect } from 'react' +import { QRCodeSVG } from 'qrcode.react' +import { Loader2 } from 'lucide-react' +import { useQRToken, useCountdown } from './qr-hooks' +import { QRCodeFrame, ExpiryTimer } from './qr-code' + +export interface TelegramConnectQRProps { + gateway: string + hubId: string + agentId: string + expirySeconds?: number + size?: number +} + +/** + * Telegram QR code for deep link connection flow. + * + * Generates a token, sends it to Gateway to create a short code, + * then renders a QR encoding https://t.me/{botUsername}?start={code}. + * Auto-refreshes when the token expires. + */ +export function TelegramConnectQR({ + gateway, + hubId, + agentId, + expirySeconds = 30, + size = 200, +}: TelegramConnectQRProps) { + const { token, expiresAt, refresh } = useQRToken(agentId, expirySeconds) + const remaining = useCountdown(expiresAt, refresh) + + const [deepLink, setDeepLink] = useState(null) + const [error, setError] = useState(null) + const [loading, setLoading] = useState(false) + + useEffect(() => { + let cancelled = false + + async function fetchCode() { + setLoading(true) + setError(null) + + try { + const res = await fetch(`${gateway}/telegram/connect-code`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ gateway, hubId, agentId, token, expires: expiresAt }), + }) + + if (cancelled) return + + if (!res.ok) { + if (res.status === 503) { + setError('Telegram bot not configured on Gateway') + } else { + setError(`Gateway error: ${res.statusText}`) + } + setDeepLink(null) + return + } + + const data = (await res.json()) as { code: string; botUsername: string } + if (cancelled) return + + setDeepLink(`https://t.me/${data.botUsername}?start=${data.code}`) + } catch (err) { + if (cancelled) return + setError(err instanceof Error ? err.message : 'Failed to connect to Gateway') + setDeepLink(null) + } finally { + if (!cancelled) setLoading(false) + } + } + + fetchCode() + return () => { cancelled = true } + }, [token, expiresAt, gateway, hubId, agentId]) + + if (loading) { + return ( +
+ +
+ ) + } + + if (error) { + return ( +
+

{error}

+
+ ) + } + + if (!deepLink) return null + + return ( +
+ + + + + +
+ ) +} diff --git a/apps/desktop/src/renderer/src/pages/clients.tsx b/apps/desktop/src/renderer/src/pages/clients.tsx index 88fe4494..924f6e9b 100644 --- a/apps/desktop/src/renderer/src/pages/clients.tsx +++ b/apps/desktop/src/renderer/src/pages/clients.tsx @@ -7,8 +7,6 @@ import { CardTitle, } from '@multica/ui/components/ui/card' import { Button } from '@multica/ui/components/ui/button' -import { Input } from '@multica/ui/components/ui/input' -import { Badge } from '@multica/ui/components/ui/badge' import { Tabs, TabsContent, @@ -16,169 +14,44 @@ import { TabsTrigger, } from '@multica/ui/components/ui/tabs' import { QrCode, Radio, Smartphone, WifiOff, Loader2 } from 'lucide-react' -import { useChannelsStore } from '../stores/channels' import { useHubStore, selectPrimaryAgent } from '../stores/hub' import { ConnectionQRCode } from '../components/qr-code' +import { TelegramConnectQR } from '../components/telegram-qr' import { DeviceList } from '../components/device-list' -/** Status badge color mapping */ -function statusVariant(status: string): 'default' | 'secondary' | 'destructive' | 'outline' { - switch (status) { - case 'running': return 'default' - case 'starting': return 'secondary' - case 'error': return 'destructive' - default: return 'outline' - } -} - -function TelegramCard() { - const { states, config, saveToken, removeToken, startChannel, stopChannel } = useChannelsStore() - const [token, setToken] = useState('') - const [saving, setSaving] = useState(false) - const [localError, setLocalError] = useState(null) - - // Current state and config for telegram:default - const state = states.find((s) => s.channelId === 'telegram' && s.accountId === 'default') - const savedConfig = config['telegram']?.['default'] as { botToken?: string } | undefined - const hasToken = Boolean(savedConfig?.botToken) - const isRunning = state?.status === 'running' - const isStarting = state?.status === 'starting' - - const handleSave = async () => { - if (!token.trim()) return - setSaving(true) - setLocalError(null) - const result = await saveToken('telegram', 'default', token.trim()) - if (!result.ok) { - setLocalError(result.error ?? 'Failed to save') - } else { - setToken('') // Clear input on success - } - setSaving(false) - } - - const handleRemove = async () => { - setSaving(true) - setLocalError(null) - const result = await removeToken('telegram', 'default') - if (!result.ok) { - setLocalError(result.error ?? 'Failed to remove') - } - setSaving(false) - } - - const handleToggle = async () => { - setSaving(true) - setLocalError(null) - if (isRunning || isStarting) { - await stopChannel('telegram', 'default') - } else { - await startChannel('telegram', 'default') - } - setSaving(false) - } - - // Mask the token for display: show first 5 and last 5 chars - const maskedToken = savedConfig?.botToken - ? `${savedConfig.botToken.slice(0, 5)}${'*'.repeat(10)}${savedConfig.botToken.slice(-5)}` - : null - - return ( - - -
-
- Telegram - - Connect a Telegram bot via Bot API long polling. - -
- {state && ( - - {state.status} - - )} -
-
- - {hasToken ? ( - // Token is configured — show masked token and actions -
-
- - {maskedToken} - -
- - {state?.error && ( -

{state.error}

- )} - -
- - -
-
- ) : ( - // No token — show input form -
- setToken(e.target.value)} - onKeyDown={(e) => e.key === 'Enter' && handleSave()} - /> - -
- )} - - {localError && ( -

{localError}

- )} -
-
- ) -} - function ChannelsTab() { - const { loading, error } = useChannelsStore() - - if (loading) { - return

Loading...

- } - - if (error) { - return

{error}

- } + const { hubInfo, agents } = useHubStore() + const primaryAgent = selectPrimaryAgent(agents) return ( -
+

Connect messaging platforms to chat with your agent.

- + + + +
+
+ Telegram + Scan with your phone camera to connect on Telegram. +
+
+
+ + + +
+ +

+ Discord and Slack coming soon. +

) } diff --git a/apps/desktop/src/renderer/src/pages/onboarding/components/connect-step.tsx b/apps/desktop/src/renderer/src/pages/onboarding/components/connect-step.tsx index 1ab0198a..5ad004ed 100644 --- a/apps/desktop/src/renderer/src/pages/onboarding/components/connect-step.tsx +++ b/apps/desktop/src/renderer/src/pages/onboarding/components/connect-step.tsx @@ -1,37 +1,31 @@ -import { useState } from 'react' +import { useEffect, useState, useCallback } from 'react' import { Button } from '@multica/ui/components/ui/button' -import { Input } from '@multica/ui/components/ui/input' -import { Badge } from '@multica/ui/components/ui/badge' import { Separator } from '@multica/ui/components/ui/separator' +import { ChevronLeft, Info, Check, Smartphone } from 'lucide-react' import { - HoverCard, - HoverCardContent, - HoverCardTrigger, -} from '@multica/ui/components/ui/hover-card' -import { - ChevronLeft, - Loader2, - HelpCircle, - Share2, - Check, - Info, -} from 'lucide-react' -import { useChannelsStore } from '../../../stores/channels' + AlertDialog, + AlertDialogAction, + AlertDialogCancel, + AlertDialogContent, + AlertDialogDescription, + AlertDialogFooter, + AlertDialogHeader, + AlertDialogTitle, +} from '@multica/ui/components/ui/alert-dialog' +import { useHubStore, selectPrimaryAgent } from '../../../stores/hub' +import { TelegramConnectQR } from '../../../components/telegram-qr' import { StepDots } from './step-dots' -function statusVariant( - status: string -): 'default' | 'secondary' | 'destructive' | 'outline' { - switch (status) { - case 'running': - return 'default' - case 'starting': - return 'secondary' - case 'error': - return 'destructive' - default: - return 'outline' - } +interface DeviceMeta { + userAgent?: string + platform?: string + language?: string + clientName?: string +} + +interface PendingConfirm { + deviceId: string + meta?: DeviceMeta } interface ConnectStepProps { @@ -40,34 +34,37 @@ interface ConnectStepProps { } export default function ConnectStep({ onNext, onBack }: ConnectStepProps) { - const { states, config, saveToken } = useChannelsStore() + const { hubInfo, agents } = useHubStore() + const primaryAgent = selectPrimaryAgent(agents) + const [connected, setConnected] = useState(false) + const [connectedDevice, setConnectedDevice] = useState(null) + const [pending, setPending] = useState(null) - const [token, setToken] = useState('') - const [saving, setSaving] = useState(false) - const [localError, setLocalError] = useState(null) - - const state = states.find( - (s) => s.channelId === 'telegram' && s.accountId === 'default' - ) - const savedConfig = config['telegram']?.['default'] as - | { botToken?: string } - | undefined - const hasToken = Boolean(savedConfig?.botToken) - const isRunning = state?.status === 'running' - const isStarting = state?.status === 'starting' - - const handleConnect = async () => { - if (!token.trim()) return - setSaving(true) - setLocalError(null) - const result = await saveToken('telegram', 'default', token.trim()) - if (!result.ok) { - setLocalError(result.error ?? 'Failed to connect') - } else { - setToken('') + // Listen for device confirm requests during onboarding + useEffect(() => { + window.electronAPI?.hub.onDeviceConfirmRequest((deviceId: string, meta?: DeviceMeta) => { + setPending({ deviceId, meta }) + }) + return () => { + window.electronAPI?.hub.offDeviceConfirmRequest() } - setSaving(false) - } + }, []) + + const handleAllow = useCallback(() => { + if (!pending) return + window.electronAPI?.hub.deviceConfirmResponse(pending.deviceId, true) + setConnectedDevice(pending.meta?.clientName ?? pending.deviceId) + setPending(null) + setConnected(true) + }, [pending]) + + const handleReject = useCallback(() => { + if (!pending) return + window.electronAPI?.hub.deviceConfirmResponse(pending.deviceId, false) + setPending(null) + }, [pending]) + + const deviceLabel = pending?.meta?.clientName ?? pending?.deviceId return (
@@ -84,119 +81,49 @@ export default function ConnectStep({ onNext, onBack }: ConnectStepProps) { {/* Header */}

- Connect a channel + Connect Telegram

- Create bots that talk to your local agent from anywhere. + Scan the QR code with your phone camera to connect on Telegram.

{/* Info box */}

- Your bot connects directly to this machine — - chat from your phone, tablet, or any device. + Chat with your agent from your phone via Telegram. + Your messages are routed through the Gateway to this machine.

- Telegram now. Discord, Slack, Mobile app coming soon. + Discord, Slack, and more coming soon.

- {/* Telegram card */} -
-
-
-
- + {/* QR code or connected state */} +
+ {connected ? ( +
+
+
-
-

Telegram

-

- Bot API long polling -

-
-
- -
- {/* Status badge */} - {state && ( - - {state.status} - +

Telegram connected

+ {connectedDevice && ( +
+ + {connectedDevice} +
)} - - {/* Help hover card */} - - - - - -

- Get a bot token -

-
    -
  1. - 1. - Open @BotFather in Telegram -
  2. -
  3. - 2. - Send /newbot and name your bot -
  4. -
  5. - 3. - Copy the token and paste below -
  6. -
-
-
-
- -
- {hasToken ? ( -
- -

- {isRunning - ? 'Bot is running. Send it a message to test.' - : isStarting - ? 'Starting bot...' - : 'Bot configured.'} -

-
- ) : ( -
- setToken(e.target.value)} - onKeyDown={(e) => e.key === 'Enter' && handleConnect()} - className="flex-1" - /> - -
- )} - - {localError && ( -

{localError}

- )} - {state?.status === 'error' && state.error && ( -

{state.error}

- )} -
+ ) : ( + + )}
@@ -205,15 +132,38 @@ export default function ConnectStep({ onNext, onBack }: ConnectStepProps) {
- - + )} +
+ + {/* Device confirm dialog */} + + + + New Device Connection + + {deviceLabel} wants to connect. + Allow this device? + + + + + Reject + + + Allow + + + +
) } diff --git a/apps/gateway/main.ts b/apps/gateway/main.ts index 02a9a017..ec5f40fd 100644 --- a/apps/gateway/main.ts +++ b/apps/gateway/main.ts @@ -12,6 +12,7 @@ async function bootstrap(): Promise { console.log("[Gateway] NestFactory created"); app.useLogger(app.get(Logger)); + app.enableCors(); const port = process.env["PORT"] ?? 3000; console.log(`[Gateway] Listening on port ${port}...`); diff --git a/apps/gateway/telegram/short-code-store.ts b/apps/gateway/telegram/short-code-store.ts new file mode 100644 index 00000000..85ca37d9 --- /dev/null +++ b/apps/gateway/telegram/short-code-store.ts @@ -0,0 +1,77 @@ +/** + * In-memory short code store for Telegram deep link connection flow. + * + * Maps short alphanumeric codes to full ConnectionInfo objects. + * Codes are one-time use and expire with the underlying connection token. + */ + +import { randomBytes } from "node:crypto"; +import type { ConnectionInfo } from "@multica/store/connection"; + +const CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; +const CODE_LENGTH = 12; +const CLEANUP_INTERVAL_MS = 10_000; + +interface CodeEntry { + connectionInfo: ConnectionInfo; +} + +export class ShortCodeStore { + private codes = new Map(); + private cleanupTimer: ReturnType | null = null; + + constructor() { + this.cleanupTimer = setInterval(() => this.cleanup(), CLEANUP_INTERVAL_MS); + } + + /** Store connection info and return a short code. */ + store(connectionInfo: ConnectionInfo): string { + const code = this.generateCode(); + this.codes.set(code, { connectionInfo }); + return code; + } + + /** Retrieve and delete a code (one-time use). Returns null if expired or not found. */ + consume(code: string): ConnectionInfo | null { + const entry = this.codes.get(code); + if (!entry) return null; + + this.codes.delete(code); + + // Check expiry + if (Date.now() > entry.connectionInfo.expires) { + return null; + } + + return entry.connectionInfo; + } + + /** Stop cleanup interval and clear all codes. */ + destroy(): void { + if (this.cleanupTimer) { + clearInterval(this.cleanupTimer); + this.cleanupTimer = null; + } + this.codes.clear(); + } + + private generateCode(): string { + const bytes = randomBytes(CODE_LENGTH); + let code = ""; + for (let i = 0; i < CODE_LENGTH; i++) { + code += CHARS[bytes[i]! % CHARS.length]; + } + // Ensure uniqueness (extremely unlikely collision, but safe) + if (this.codes.has(code)) return this.generateCode(); + return code; + } + + private cleanup(): void { + const now = Date.now(); + for (const [code, entry] of this.codes) { + if (now > entry.connectionInfo.expires) { + this.codes.delete(code); + } + } + } +} diff --git a/apps/gateway/telegram/telegram-format.ts b/apps/gateway/telegram/telegram-format.ts new file mode 100644 index 00000000..13711921 --- /dev/null +++ b/apps/gateway/telegram/telegram-format.ts @@ -0,0 +1,81 @@ +/** + * Markdown → Telegram HTML converter. + * + * Telegram supports a subset of HTML: + * , , , , ,
, , 
+ * + * Strategy: + * 1. Extract code blocks and inline code (protect from further processing) + * 2. Escape HTML entities in remaining text + * 3. Convert Markdown syntax to HTML tags + * 4. Restore code blocks + */ + +/** Escape HTML special characters */ +function escapeHtml(text: string): string { + return text + .replace(/&/g, "&") + .replace(//g, ">"); +} + +/** + * Convert Markdown text to Telegram-compatible HTML. + * Handles: bold, italic, strikethrough, inline code, code blocks, links, blockquotes. + */ +export function markdownToTelegramHtml(markdown: string): string { + // Placeholder system: replace code blocks/inline code with placeholders, + // process markdown on the rest, then restore. + const placeholders: string[] = []; + const placeholder = (content: string): string => { + const idx = placeholders.length; + placeholders.push(content); + return `\x00PH${idx}\x00`; + }; + + let text = markdown; + + // 1. Fenced code blocks: ```lang\n...\n``` + text = text.replace(/```(\w*)\n([\s\S]*?)```/g, (_match, lang: string, code: string) => { + const escaped = escapeHtml(code.replace(/\n$/, "")); + const langAttr = lang ? ` class="language-${escapeHtml(lang)}"` : ""; + return placeholder(`
${escaped}
`); + }); + + // 2. Inline code: `...` + text = text.replace(/`([^`\n]+)`/g, (_match, code: string) => { + return placeholder(`${escapeHtml(code)}`); + }); + + // 3. Escape HTML in remaining text + text = escapeHtml(text); + + // 4. Links: [text](url) — escape quotes in URL to prevent attribute breakout + text = text.replace(/\[([^\]]+)\]\(([^)]+)\)/g, (_m, label: string, url: string) => + `
${label}`, + ); + + // 5. Bold: **text** or __text__ + text = text.replace(/\*\*(.+?)\*\*/g, "$1"); + text = text.replace(/__(.+?)__/g, "$1"); + + // 6. Italic: *text* or _text_ (but not inside words with underscores) + text = text.replace(/(?$1
"); + text = text.replace(/(?$1"); + + // 7. Strikethrough: ~~text~~ + text = text.replace(/~~(.+?)~~/g, "$1"); + + // 8. Blockquotes: > text (at line start) + text = text.replace(/^> (.+)$/gm, "
$1
"); + // Merge adjacent blockquotes + text = text.replace(/<\/blockquote>\n
/g, "\n"); + + // 9. Headings: strip # markers, make bold + text = text.replace(/^#{1,6}\s+(.+)$/gm, "$1"); + + // Restore placeholders + text = text.replace(/\x00PH(\d+)\x00/g, (_match, idx: string) => placeholders[Number(idx)]!); + + return text; +} diff --git a/apps/gateway/telegram/telegram-user.store.ts b/apps/gateway/telegram/telegram-user.store.ts index c65afb83..3d0a1f13 100644 --- a/apps/gateway/telegram/telegram-user.store.ts +++ b/apps/gateway/telegram/telegram-user.store.ts @@ -1,9 +1,15 @@ /** - * Telegram user store - MySQL persistence layer. + * Telegram user store. + * + * Uses MySQL when MYSQL_DSN is set (production). + * Falls back to JSON file persistence when database is unavailable (local development). + * File stored at ~/.super-multica/gateway/telegram-users.json. */ import { Inject, Injectable, Logger } from "@nestjs/common"; -import { generateEncryptedId } from "@multica/utils"; +import { generateEncryptedId, DATA_DIR } from "@multica/utils"; +import { readFile, writeFile, mkdir } from "node:fs/promises"; +import { join } from "node:path"; import type { RowDataPacket } from "mysql2/promise"; import { DatabaseService } from "../database/database.service.js"; import type { TelegramUser, TelegramUserCreate } from "./types.js"; @@ -20,15 +26,24 @@ interface TelegramUserRow extends RowDataPacket { telegram_last_name: string | null; } +const LOCAL_STORE_DIR = join(DATA_DIR, "gateway"); +const LOCAL_STORE_PATH = join(LOCAL_STORE_DIR, "telegram-users.json"); + @Injectable() export class TelegramUserStore { private readonly logger = new Logger(TelegramUserStore.name); + /** Local file-backed store, keyed by telegramUserId */ + private localStore = new Map(); + private localStoreLoaded = false; constructor(@Inject(DatabaseService) private readonly db: DatabaseService) {} /** Find user by Telegram user ID */ async findByTelegramUserId(telegramUserId: string): Promise { - if (!this.db.isAvailable()) return null; + if (!this.db.isAvailable()) { + await this.ensureLocalStoreLoaded(); + return this.localStore.get(telegramUserId) ?? null; + } const rows = await this.db.query( "SELECT * FROM telegram_users WHERE telegram_user_id = ?", @@ -41,7 +56,13 @@ export class TelegramUserStore { /** Find user by device ID */ async findByDeviceId(deviceId: string): Promise { - if (!this.db.isAvailable()) return null; + if (!this.db.isAvailable()) { + await this.ensureLocalStoreLoaded(); + for (const user of this.localStore.values()) { + if (user.deviceId === deviceId) return user; + } + return null; + } const rows = await this.db.query( "SELECT * FROM telegram_users WHERE device_id = ?", @@ -55,7 +76,7 @@ export class TelegramUserStore { /** Create or update a Telegram user */ async upsert(data: TelegramUserCreate): Promise { if (!this.db.isAvailable()) { - throw new Error("Database not available"); + return this.upsertLocal(data); } // Check if user exists @@ -110,6 +131,67 @@ export class TelegramUserStore { return created!; } + // ── Local file-backed store (local development only) ── + // + // When MYSQL_DSN is not set the methods below provide a simple JSON-file + // persistence layer so Telegram user bindings survive gateway restarts. + // This is NOT intended for production use — production always uses MySQL. + + /** Load store from JSON file on first access */ + private async ensureLocalStoreLoaded(): Promise { + if (this.localStoreLoaded) return; + this.localStoreLoaded = true; + + try { + const data = await readFile(LOCAL_STORE_PATH, "utf-8"); + const records = JSON.parse(data) as Record; + for (const [key, user] of Object.entries(records)) { + // Restore Date objects from JSON strings + user.createdAt = new Date(user.createdAt); + user.updatedAt = new Date(user.updatedAt); + this.localStore.set(key, user); + } + this.logger.log(`Loaded ${this.localStore.size} Telegram user(s) from ${LOCAL_STORE_PATH}`); + } catch { + // File doesn't exist or is invalid — start fresh + } + } + + /** Persist store to JSON file */ + private async saveLocalStore(): Promise { + const obj: Record = {}; + for (const [key, user] of this.localStore) { + obj[key] = user; + } + await mkdir(LOCAL_STORE_DIR, { recursive: true }); + await writeFile(LOCAL_STORE_PATH, JSON.stringify(obj, null, 2), "utf-8"); + } + + /** Upsert to local file store */ + private async upsertLocal(data: TelegramUserCreate): Promise { + await this.ensureLocalStoreLoaded(); + + const existing = this.localStore.get(data.telegramUserId); + const now = new Date(); + + const user: TelegramUser = { + telegramUserId: data.telegramUserId, + hubId: data.hubId, + agentId: data.agentId, + deviceId: data.deviceId ?? existing?.deviceId ?? `tg-${generateEncryptedId()}`, + createdAt: existing?.createdAt ?? now, + updatedAt: now, + telegramUsername: data.telegramUsername, + telegramFirstName: data.telegramFirstName, + telegramLastName: data.telegramLastName, + }; + + this.localStore.set(data.telegramUserId, user); + await this.saveLocalStore(); + this.logger.debug(`Local upsert: telegramUserId=${data.telegramUserId}`); + return user; + } + /** Convert database row to TelegramUser object */ private rowToUser(row: TelegramUserRow): TelegramUser { return { diff --git a/apps/gateway/telegram/telegram.controller.ts b/apps/gateway/telegram/telegram.controller.ts index 2e7c2c39..22bdc6b4 100644 --- a/apps/gateway/telegram/telegram.controller.ts +++ b/apps/gateway/telegram/telegram.controller.ts @@ -1,11 +1,13 @@ /** - * Telegram webhook controller. + * Telegram controller. * - * Receives webhook requests from Telegram Bot API. + * - POST /telegram/webhook — Receives webhook requests from Telegram Bot API + * - POST /telegram/connect-code — Creates a short code for QR deep link flow */ -import { Controller, Inject, Logger, Post, Req, Res, Headers } from "@nestjs/common"; +import { Body, Controller, HttpException, HttpStatus, Inject, Logger, Post, Req, Res, Headers } from "@nestjs/common"; import { TelegramService } from "./telegram.service.js"; +import type { ConnectionInfo } from "@multica/store/connection"; // Minimal Express types for webhook handling interface ExpressRequest { @@ -25,6 +27,34 @@ export class TelegramController { constructor(@Inject(TelegramService) private readonly telegramService: TelegramService) {} + @Post("connect-code") + async createConnectCode( + @Body() body: { gateway: string; hubId: string; agentId: string; token: string; expires: number }, + ): Promise<{ code: string; botUsername: string }> { + if (!this.telegramService.isConfigured()) { + throw new HttpException("Telegram bot not configured", HttpStatus.SERVICE_UNAVAILABLE); + } + + const botUsername = this.telegramService.getBotUsername(); + if (!botUsername) { + throw new HttpException("Bot username not available", HttpStatus.INTERNAL_SERVER_ERROR); + } + + const connectionInfo: ConnectionInfo = { + type: "multica-connect", + gateway: body.gateway, + hubId: body.hubId, + agentId: body.agentId, + token: body.token, + expires: body.expires, + }; + + const code = this.telegramService.createConnectCode(connectionInfo); + this.logger.debug(`Created connect code: ${code}`); + + return { code, botUsername }; + } + @Post("webhook") async handleWebhook( @Req() req: ExpressRequest, diff --git a/apps/gateway/telegram/telegram.service.ts b/apps/gateway/telegram/telegram.service.ts index 987da5f8..61147259 100644 --- a/apps/gateway/telegram/telegram.service.ts +++ b/apps/gateway/telegram/telegram.service.ts @@ -1,20 +1,33 @@ /** * Telegram service for Gateway. * - * Handles Telegram bot interactions via webhook. + * Handles Telegram bot interactions via webhook or long-polling. + * - Webhook mode: when TELEGRAM_WEBHOOK_URL is set (production / ngrok) + * - Long-polling mode: when TELEGRAM_WEBHOOK_URL is NOT set (local development) + * * - New users: prompts to paste a multica://connect link * - Connection link: verifies with Hub via RPC, persists to DB * - Bound users: routes messages to their Hub agent + * + * Features (ported from Desktop channel plugin): + * - Markdown → Telegram HTML formatting with parse-error fallback + * - Text chunking for messages >4096 chars (paragraph-boundary split) + * - Reply-to original message + 👀 ack reaction + * - Per-chat message serialization (prevents race conditions) + * - Inbound media handling (voice, photo, video, document) */ import { Inject, Injectable, Logger } from "@nestjs/common"; -import type { OnModuleInit } from "@nestjs/common"; -import { Bot, InputFile, webhookCallback } from "grammy"; +import type { OnModuleInit, OnModuleDestroy } from "@nestjs/common"; +import { Bot, GrammyError, InputFile, webhookCallback } from "grammy"; import type { Context } from "grammy"; import { v7 as uuidv7 } from "uuid"; -import { generateEncryptedId } from "@multica/utils"; +import { writeFile, mkdir } from "node:fs/promises"; +import { join, extname } from "node:path"; +import { generateEncryptedId, MEDIA_CACHE_DIR } from "@multica/utils"; import { parseConnectionCode } from "@multica/store/connection"; import type { ConnectionInfo } from "@multica/store/connection"; +import { transcribeAudio, describeImage, describeVideo } from "@multica/core/media"; import { GatewayEvents, RequestAction, @@ -31,8 +44,12 @@ import type { StreamPayload } from "@multica/sdk"; import { EventsGateway } from "../events.gateway.js"; import { TelegramUserStore } from "./telegram-user.store.js"; import type { TelegramUser } from "./types.js"; +import { markdownToTelegramHtml } from "./telegram-format.js"; +import { ShortCodeStore } from "./short-code-store.js"; -// Minimal Express types for webhook handling +// ── Types ── + +/** Minimal Express types for webhook handling */ interface ExpressRequest { body: unknown; header: (name: string) => string | undefined; @@ -50,14 +67,83 @@ interface PendingRequest { timer: ReturnType; } +/** Tracks the originating Telegram message for reply_to and reaction cleanup */ +interface MessageContext { + telegramChatId: number; + telegramMessageId: number; +} + +/** Media attachment extracted from a Telegram message */ +interface MediaAttachment { + type: "audio" | "image" | "video" | "document"; + fileId: string; + mimeType?: string; + duration?: number; + caption?: string; +} + +// ── Constants ── + const VERIFY_TIMEOUT_MS = 30_000; +const TYPING_TIMEOUT_MS = 60_000; +const MAX_CHARS_PER_MESSAGE = 4000; // Telegram limit is 4096; leave room for HTML overhead + +// ── Helpers ── + +/** Check if a GrammyError is an HTML parse failure */ +function isParseError(err: unknown): boolean { + return err instanceof GrammyError && err.description.includes("can't parse entities"); +} + +/** + * Split text at natural boundaries so each chunk stays within Telegram's message limit. + * Prefers paragraph breaks > line breaks > spaces > hard cut. + */ +function chunkText(text: string, maxChars = MAX_CHARS_PER_MESSAGE): string[] { + if (text.length <= maxChars) return [text]; + + const chunks: string[] = []; + let remaining = text; + + while (remaining.length > 0) { + if (remaining.length <= maxChars) { + chunks.push(remaining); + break; + } + + // Find the best break point within the limit + let breakPoint = remaining.lastIndexOf("\n\n", maxChars); + if (breakPoint <= 0 || breakPoint < maxChars * 0.5) { + breakPoint = remaining.lastIndexOf("\n", maxChars); + } + if (breakPoint <= 0 || breakPoint < maxChars * 0.5) { + breakPoint = remaining.lastIndexOf(" ", maxChars); + } + if (breakPoint <= 0) { + breakPoint = maxChars; + } + + chunks.push(remaining.slice(0, breakPoint)); + remaining = remaining.slice(breakPoint).trimStart(); + } + + return chunks; +} + +// ── Service ── @Injectable() -export class TelegramService implements OnModuleInit { - private static readonly TYPING_TIMEOUT_MS = 60_000; // 1 minute safety cap +export class TelegramService implements OnModuleInit, OnModuleDestroy { private bot: Bot | null = null; + private pollingMode = false; + private botUsername: string | null = null; + private readonly shortCodeStore = new ShortCodeStore(); + private pendingRequests = new Map(); + /** Typing indicator timers, keyed by deviceId */ private typingTimers = new Map>(); + /** Tracks the originating message for reply_to & reaction cleanup, keyed by deviceId */ + private messageContexts = new Map(); private readonly logger = new Logger(TelegramService.name); @@ -66,19 +152,50 @@ export class TelegramService implements OnModuleInit { @Inject(TelegramUserStore) private readonly userStore: TelegramUserStore, ) {} + // ── Lifecycle ── + async onModuleInit(): Promise { - console.log("[TelegramService] onModuleInit starting..."); const token = process.env["TELEGRAM_BOT_TOKEN"]; if (!token) { - console.log("[TelegramService] No bot token"); - this.logger.warn("TELEGRAM_BOT_TOKEN not set, Telegram webhook disabled"); + this.logger.warn("TELEGRAM_BOT_TOKEN not set, Telegram disabled"); return; } - console.log("[TelegramService] Creating bot..."); this.bot = new Bot(token); + + // Fetch bot info (username) before setting up handlers + try { + const me = await this.bot.api.getMe(); + this.botUsername = me.username ?? null; + this.logger.log(`Telegram bot: @${this.botUsername}`); + } catch (err) { + this.logger.warn(`Failed to fetch bot info: ${err instanceof Error ? err.message : err}`); + } + this.setupHandlers(); - this.logger.log("Telegram bot initialized"); + await this.setupBotCommands(); + + const webhookUrl = process.env["TELEGRAM_WEBHOOK_URL"]; + if (webhookUrl) { + // Webhook mode — Telegram sends updates to our /telegram/webhook endpoint + this.logger.log(`Telegram bot initialized (webhook mode: ${webhookUrl})`); + } else { + // Long-polling mode — pull updates from Telegram (local development) + this.pollingMode = true; + this.bot.start({ + onStart: () => { + this.logger.log("Telegram bot initialized (long-polling mode)"); + }, + }); + } + } + + async onModuleDestroy(): Promise { + this.shortCodeStore.destroy(); + if (this.bot && this.pollingMode) { + await this.bot.stop(); + this.logger.log("Telegram bot stopped"); + } } /** Get grammY webhook callback for Express/NestJS */ @@ -89,12 +206,12 @@ export class TelegramService implements OnModuleInit { if (secretToken) { return webhookCallback(this.bot, "express", { secretToken }) as unknown as ( req: ExpressRequest, - res: ExpressResponse + res: ExpressResponse, ) => Promise; } return webhookCallback(this.bot, "express") as unknown as ( req: ExpressRequest, - res: ExpressResponse + res: ExpressResponse, ) => Promise; } @@ -103,7 +220,365 @@ export class TelegramService implements OnModuleInit { return this.bot !== null; } - /** Send message to a Telegram user by device ID */ + /** Get the bot's Telegram username (e.g. "multica_bot") */ + getBotUsername(): string | null { + return this.botUsername; + } + + /** Create a short code for a connection info (for Telegram deep link QR flow) */ + createConnectCode(connectionInfo: ConnectionInfo): string { + return this.shortCodeStore.store(connectionInfo); + } + + // ── Handler setup ── + + private setupHandlers(): void { + if (!this.bot) return; + + // Per-chat serialization middleware — ensures messages from the same chat + // are processed one at a time, preventing race conditions. + const chatQueues = new Map>(); + this.bot.use(async (ctx, next) => { + const chatId = ctx.chat?.id; + if (!chatId) return next(); + + const key = String(chatId); + const prev = chatQueues.get(key) ?? Promise.resolve(); + + const current = prev.then(() => next()).catch(() => {}); + chatQueues.set(key, current); + await current; + + // Clean up resolved entries to prevent memory leak + if (chatQueues.get(key) === current) { + chatQueues.delete(key); + } + }); + + // Bot commands (must be registered before message:text) + this.bot.command("start", async (ctx) => { + if (!this.isPrivateChat(ctx)) return; + const payload = ctx.match?.trim(); + if (payload) { + // Deep link: /start + await this.handleShortCode(ctx, String(ctx.from?.id), payload); + } else { + await ctx.reply( + "Welcome to Multica!\n\n" + + "To connect, scan a QR code from the Multica Desktop app " + + "(Clients \u2192 Channels tab), or paste a connection link here.\n\n" + + "The link looks like:\nmultica://connect?gateway=...&hub=...&agent=...&token=...&exp=...", + ); + } + }); + + this.bot.command("status", async (ctx) => { + if (!this.isPrivateChat(ctx)) return; + const telegramUserId = String(ctx.from?.id); + const user = await this.userStore.findByTelegramUserId(telegramUserId); + if (!user) { + await ctx.reply("Not connected.\n\nUse /start or scan a QR code to connect."); + return; + } + const online = this.eventsGateway.isDeviceRegistered(user.hubId); + await ctx.reply( + `Connected to Multica\n\n` + + `Hub: ${user.hubId}\n` + + `Agent: ${user.agentId}\n` + + `Status: ${online ? "Online" : "Offline"}\n\n` + + (online + ? "Your Hub is online and ready to receive messages." + : "Your Hub is offline. Make sure the Multica Desktop app is running."), + ); + }); + + this.bot.command("help", async (ctx) => { + if (!this.isPrivateChat(ctx)) return; + await ctx.reply( + "Multica Telegram Bot\n\n" + + "Commands:\n" + + "/start - Connect your account\n" + + "/status - Check connection status\n" + + "/help - Show this message\n\n" + + "To connect:\n" + + "1. Open Multica Desktop app\n" + + "2. Go to Clients \u2192 Channels\n" + + "3. Scan the Telegram QR code with your phone camera\n\n" + + "Or paste a connection link starting with:\nmultica://connect?...", + ); + }); + + // Text messages (private chats only) + this.bot.on("message:text", async (ctx) => { + if (!this.isPrivateChat(ctx)) return; + await this.handleTextMessage(ctx); + }); + + // Media messages + const mediaTypes = [ + { + filter: "message:voice" as const, + getMedia: (msg: any): MediaAttachment => ({ + type: "audio" as const, + fileId: msg.voice.file_id as string, + mimeType: msg.voice.mime_type as string | undefined, + duration: msg.voice.duration as number | undefined, + }), + }, + { + filter: "message:audio" as const, + getMedia: (msg: any): MediaAttachment => ({ + type: "audio" as const, + fileId: msg.audio.file_id as string, + mimeType: msg.audio.mime_type as string | undefined, + duration: msg.audio.duration as number | undefined, + }), + }, + { + filter: "message:photo" as const, + getMedia: (msg: any): MediaAttachment => { + // Pick the largest photo size (last in array) + const photos = msg.photo as Array<{ file_id: string }>; + const largest = photos[photos.length - 1]!; + return { + type: "image" as const, + fileId: largest.file_id, + mimeType: "image/jpeg", + }; + }, + }, + { + filter: "message:video" as const, + getMedia: (msg: any): MediaAttachment => ({ + type: "video" as const, + fileId: msg.video.file_id as string, + mimeType: msg.video.mime_type as string | undefined, + duration: msg.video.duration as number | undefined, + }), + }, + { + filter: "message:document" as const, + getMedia: (msg: any): MediaAttachment => ({ + type: "document" as const, + fileId: msg.document.file_id as string, + mimeType: msg.document.mime_type as string | undefined, + }), + }, + ] as const; + + for (const { filter, getMedia } of mediaTypes) { + this.bot.on(filter, async (ctx) => { + if (!this.isPrivateChat(ctx)) return; + await this.handleMediaMessage(ctx, getMedia(ctx.message)); + }); + } + } + + /** Only process private (direct) messages; silently ignore group chats. */ + private isPrivateChat(ctx: Context): boolean { + return ctx.chat?.type === "private"; + } + + /** Register bot commands with Telegram (shown in the menu) */ + private async setupBotCommands(): Promise { + if (!this.bot) return; + try { + await this.bot.api.setMyCommands([ + { command: "start", description: "Connect your Multica account" }, + { command: "status", description: "Check connection status" }, + { command: "help", description: "Show help" }, + ]); + this.logger.log("Telegram bot commands registered"); + } catch (err) { + this.logger.warn(`Failed to set bot commands: ${err instanceof Error ? err.message : err}`); + } + } + + /** Handle a short code from /start deep link */ + private async handleShortCode(ctx: Context, telegramUserId: string, code: string): Promise { + const connectionInfo = this.shortCodeStore.consume(code); + if (!connectionInfo) { + await ctx.reply( + "Connection code expired or invalid.\n\n" + + "QR codes are valid for 30 seconds. Please scan again from the Desktop app.", + ); + return; + } + + await this.connectUser(ctx, telegramUserId, connectionInfo); + } + + // ── Inbound: text messages ── + + private async handleTextMessage(ctx: Context): Promise { + const msg = ctx.message; + if (!msg || !msg.text) return; + + const telegramUserId = String(msg.from?.id); + const text = msg.text.trim(); + + this.logger.debug(`Received message: chatId=${msg.chat.id} from=${telegramUserId} text="${text.slice(0, 50)}"`); + + // Connection link — always handle, even for already-bound users (re-binding) + if (text.startsWith("multica://connect?")) { + await this.handleConnectionLink(ctx, telegramUserId, text); + return; + } + + if (!text) return; + + // Check if user is bound + const user = await this.userStore.findByTelegramUserId(telegramUserId); + + if (user) { + // ACK: 👀 reaction on the original message + await this.addReaction(msg.chat.id, msg.message_id, "👀"); + this.storeMessageContext(user.deviceId, msg.chat.id, msg.message_id); + await this.routeToHub(user, text, ctx); + return; + } + + // New user without connection link + await ctx.reply( + "Welcome to Multica!\n\n" + + "To get started, open the Multica Desktop app, generate a Connection Link, " + + "and paste it here.\n\n" + + "The link looks like:\nmultica://connect?gateway=...&hub=...&agent=...&token=...&exp=...", + ); + } + + // ── Inbound: media messages ── + + private async handleMediaMessage(ctx: Context, media: MediaAttachment): Promise { + const msg = ctx.message; + if (!msg) return; + + const telegramUserId = String(msg.from?.id); + const caption = (msg as any).caption as string | undefined; + + this.logger.debug(`Received ${media.type}: chatId=${msg.chat.id} from=${telegramUserId} fileId=${media.fileId}`); + + // Connection link in caption + if (caption?.startsWith("multica://connect?")) { + await this.handleConnectionLink(ctx, telegramUserId, caption); + return; + } + + // Check if user is bound + const user = await this.userStore.findByTelegramUserId(telegramUserId); + if (!user) { + await ctx.reply( + "Welcome to Multica!\n\n" + + "To get started, open the Multica Desktop app, generate a Connection Link, " + + "and paste it here.", + ); + return; + } + + // ACK: 👀 reaction + await this.addReaction(msg.chat.id, msg.message_id, "👀"); + this.storeMessageContext(user.deviceId, msg.chat.id, msg.message_id); + + // Process media → text description (async, may take a few seconds) + const processedText = await this.processMedia({ ...media, caption: caption ?? undefined }); + + await this.routeToHub(user, processedText, ctx); + } + + // ── Media processing ── + + /** + * Download a file from the Telegram Bot API and save it locally. + */ + private async downloadMedia(fileId: string): Promise { + if (!this.bot) throw new Error("Bot not initialized"); + + const file = await this.bot.api.getFile(fileId); + const filePath = file.file_path; + if (!filePath) throw new Error(`Telegram returned no file_path for fileId=${fileId}`); + + const url = `https://api.telegram.org/file/bot${this.bot.token}/${filePath}`; + const ext = extname(filePath) || ".bin"; + const localPath = join(MEDIA_CACHE_DIR, `${uuidv7()}${ext}`); + + await mkdir(MEDIA_CACHE_DIR, { recursive: true }); + + const res = await fetch(url); + if (!res.ok) throw new Error(`Failed to download file: HTTP ${res.status}`); + const buffer = Buffer.from(await res.arrayBuffer()); + await writeFile(localPath, buffer); + + this.logger.debug(`Downloaded media: ${filePath} → ${localPath}`); + return localPath; + } + + /** + * Process a media attachment into a text description for the agent. + * Uses local whisper / OpenAI Vision / ffmpeg when available; graceful fallback otherwise. + */ + private async processMedia(media: MediaAttachment): Promise { + try { + const filePath = await this.downloadMedia(media.fileId); + + if (media.type === "image") { + const description = await describeImage(filePath); + if (description) { + const parts = ["[Image]", `Description: ${description}`]; + if (media.caption) parts.push(`Caption: ${media.caption}`); + return parts.join("\n"); + } + const parts = ["[image message received]", `File: ${filePath}`]; + if (media.caption) parts.push(`Caption: ${media.caption}`); + return parts.join("\n"); + } + + if (media.type === "audio") { + const transcript = await transcribeAudio(filePath); + if (transcript) { + const parts = ["[Voice Message]", `Transcript: ${transcript}`]; + if (media.caption) parts.push(`Caption: ${media.caption}`); + return parts.join("\n"); + } + const parts = ["[audio message received]", `File: ${filePath}`]; + if (media.mimeType) parts.push(`Type: ${media.mimeType}`); + if (media.duration) parts.push(`Duration: ${media.duration}s`); + if (media.caption) parts.push(`Caption: ${media.caption}`); + return parts.join("\n"); + } + + if (media.type === "video") { + const description = await describeVideo(filePath); + if (description) { + const parts = ["[Video]", `Description: ${description}`]; + if (media.duration) parts.push(`Duration: ${media.duration}s`); + if (media.caption) parts.push(`Caption: ${media.caption}`); + return parts.join("\n"); + } + const parts = ["[video message received]", `File: ${filePath}`]; + if (media.mimeType) parts.push(`Type: ${media.mimeType}`); + if (media.duration) parts.push(`Duration: ${media.duration}s`); + if (media.caption) parts.push(`Caption: ${media.caption}`); + return parts.join("\n"); + } + + // Document — no processing, just metadata + const parts = ["[document message received]", `File: ${filePath}`]; + if (media.mimeType) parts.push(`Type: ${media.mimeType}`); + if (media.caption) parts.push(`Caption: ${media.caption}`); + return parts.join("\n"); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + this.logger.error(`Failed to process ${media.type}: ${msg}`); + return media.caption || `[${media.type} message received — processing failed]`; + } + } + + // ── Outbound: send to Telegram ── + + /** + * Send text to a Telegram user/group by deviceId. + * Applies Markdown → HTML formatting, text chunking, and reply-to. + */ async sendToTelegram(deviceId: string, text: string): Promise { if (!this.bot) return; @@ -113,18 +588,56 @@ export class TelegramService implements OnModuleInit { return; } + // Use chatId from message context (supports groups); fall back to user ID (private chat) + const context = this.messageContexts.get(deviceId); + const chatId = context?.telegramChatId ?? Number(user.telegramUserId); + const chunks = chunkText(text); + try { - await this.bot.api.sendMessage(Number(user.telegramUserId), text); - this.logger.debug(`Sent message to Telegram: telegramUserId=${user.telegramUserId}`); + for (let i = 0; i < chunks.length; i++) { + // Only reply_to on the first chunk + const replyTo = i === 0 && context ? context.telegramMessageId : undefined; + await this.sendFormatted(chatId, chunks[i]!, replyTo); + } + this.logger.debug(`Sent ${chunks.length} chunk(s) to Telegram: telegramUserId=${user.telegramUserId}`); } catch (error) { const message = error instanceof Error ? error.message : String(error); this.logger.error(`Failed to send Telegram message: deviceId=${deviceId}, error=${message}`); } } + /** + * Send a single message with HTML formatting and optional reply-to. + * Falls back to plain text if HTML parsing fails. + */ + private async sendFormatted( + chatId: number, + text: string, + replyToMessageId?: number, + ): Promise { + if (!this.bot) return; + + const html = markdownToTelegramHtml(text); + const extra: Record = { parse_mode: "HTML" }; + if (replyToMessageId) extra["reply_to_message_id"] = replyToMessageId; + + try { + await this.bot.api.sendMessage(chatId, html, extra); + } catch (err) { + if (isParseError(err)) { + this.logger.warn("HTML parse failed, retrying as plain text"); + const plainExtra: Record = {}; + if (replyToMessageId) plainExtra["reply_to_message_id"] = replyToMessageId; + await this.bot.api.sendMessage(chatId, text, plainExtra); + } else { + throw err; + } + } + } + /** Send a file (photo/document/video/audio) to a Telegram user */ private async sendFileToTelegram( - telegramUserId: string, + deviceId: string, data: Buffer, type: string, caption?: string, @@ -132,9 +645,17 @@ export class TelegramService implements OnModuleInit { ): Promise { if (!this.bot) return; - const chatId = Number(telegramUserId); + const user = await this.userStore.findByDeviceId(deviceId); + if (!user) return; + + const context = this.messageContexts.get(deviceId); + const chatId = context?.telegramChatId ?? Number(user.telegramUserId); const inputFile = new InputFile(data, filename); - const extra = caption ? { caption: caption.slice(0, 1024) } : {}; + + // Format caption as HTML with fallback + const rawCaption = caption?.slice(0, 1024); + const captionHtml = rawCaption ? markdownToTelegramHtml(rawCaption) : undefined; + const extra = captionHtml ? { caption: captionHtml, parse_mode: "HTML" as const } : {}; try { switch (type) { @@ -155,60 +676,116 @@ export class TelegramService implements OnModuleInit { await this.bot.api.sendDocument(chatId, inputFile, extra); break; } - this.logger.debug(`Sent ${type} to Telegram: telegramUserId=${telegramUserId}`); - } catch (error) { - const message = error instanceof Error ? error.message : String(error); - this.logger.error(`Failed to send ${type} to Telegram: telegramUserId=${telegramUserId}, error=${message}`); + this.logger.debug(`Sent ${type} to Telegram: deviceId=${deviceId}`); + } catch (err) { + // If HTML caption fails, retry without formatting + if (isParseError(err) && rawCaption) { + this.logger.warn("Media caption HTML parse failed, retrying as plain text"); + const plainExtra = { caption: rawCaption }; + switch (type) { + case "photo": + await this.bot.api.sendPhoto(chatId, inputFile, plainExtra); + break; + case "video": + await this.bot.api.sendVideo(chatId, inputFile, plainExtra); + break; + case "audio": + await this.bot.api.sendAudio(chatId, inputFile, plainExtra); + break; + case "voice": + await this.bot.api.sendVoice(chatId, inputFile, plainExtra); + break; + case "document": + default: + await this.bot.api.sendDocument(chatId, inputFile, plainExtra); + break; + } + } else { + const message = err instanceof Error ? err.message : String(err); + this.logger.error(`Failed to send ${type}: deviceId=${deviceId}, error=${message}`); + } } } - /** Setup bot message handlers */ - private setupHandlers(): void { - if (!this.bot) return; + // ── Reactions ── - this.bot.on("message:text", async (ctx) => { - await this.handleTextMessage(ctx); + private async addReaction(chatId: number, messageId: number, emoji: string): Promise { + if (!this.bot) return; + try { + await this.bot.api.setMessageReaction( + chatId, + messageId, + // Grammy expects a specific emoji union type; cast since our interface accepts any string + [{ type: "emoji", emoji } as unknown as { type: "emoji"; emoji: "👀" }], + ); + } catch { + // Best-effort — reaction failure is not critical + } + } + + private async removeReaction(chatId: number, messageId: number): Promise { + if (!this.bot) return; + try { + await this.bot.api.setMessageReaction(chatId, messageId, []); + } catch { + // Best-effort + } + } + + // ── Typing indicators ── + + private startTyping(deviceId: string): void { + if (this.typingTimers.has(deviceId)) return; + + const context = this.messageContexts.get(deviceId); + if (!context) return; + + const chatId = context.telegramChatId; + const send = () => { + void this.bot?.api.sendChatAction(chatId, "typing").catch(() => {}); + }; + send(); + const interval = setInterval(send, 5000); + this.typingTimers.set(deviceId, interval); + + // Safety timeout: auto-stop if no message_end/agent_error arrives + setTimeout(() => { + if (this.typingTimers.get(deviceId) === interval) { + this.stopTyping(deviceId); + } + }, TYPING_TIMEOUT_MS); + } + + private stopTyping(deviceId: string): void { + const timer = this.typingTimers.get(deviceId); + if (timer) { + clearInterval(timer); + this.typingTimers.delete(deviceId); + } + } + + // ── Message context tracking ── + + private storeMessageContext(deviceId: string, chatId: number, messageId: number): void { + this.messageContexts.set(deviceId, { + telegramChatId: chatId, + telegramMessageId: messageId, }); } - /** Handle incoming text message */ - private async handleTextMessage(ctx: Context): Promise { - const msg = ctx.message; - if (!msg || !msg.text) return; - - const telegramUserId = String(msg.from?.id); - const text = msg.text.trim(); - - this.logger.debug(`Received Telegram message: telegramUserId=${telegramUserId}, text=${text.slice(0, 50)}`); - - // Connection link — always handle, even for already-bound users (re-binding) - if (text.startsWith("multica://connect?")) { - await this.handleConnectionLink(ctx, telegramUserId, text); - return; + /** Remove context and 👀 reaction for a device after response is sent */ + private async clearMessageContext(deviceId: string): Promise { + const context = this.messageContexts.get(deviceId); + if (context) { + await this.removeReaction(context.telegramChatId, context.telegramMessageId); + this.messageContexts.delete(deviceId); } - - // Check if user is bound - const user = await this.userStore.findByTelegramUserId(telegramUserId); - - if (user) { - await this.routeToHub(user, text, ctx); - return; - } - - // New user without connection link - await ctx.reply( - "Welcome to Multica!\n\n" + - "To get started, open the Multica Desktop app, generate a Connection Link, " + - "and paste it here.\n\n" + - "The link looks like:\nmultica://connect?gateway=...&hub=...&agent=...&token=...&exp=..." - ); } - /** Handle a multica://connect? connection link */ - private async handleConnectionLink(ctx: Context, telegramUserId: string, text: string): Promise { - const msg = ctx.message; + // ── Connection & routing ── - // 1. Parse and validate the connection link + /** Handle a multica://connect? connection link pasted as text */ + private async handleConnectionLink(ctx: Context, telegramUserId: string, text: string): Promise { let connectionInfo: ConnectionInfo; try { connectionInfo = parseConnectionCode(text); @@ -218,42 +795,47 @@ export class TelegramService implements OnModuleInit { return; } - // 2. Check Hub is online + await this.connectUser(ctx, telegramUserId, connectionInfo); + } + + /** + * Shared connection flow used by both paste-link and /start deep link. + * Checks Hub online → registers virtual device → sends verify RPC → saves to DB. + */ + private async connectUser(ctx: Context, telegramUserId: string, connectionInfo: ConnectionInfo): Promise { + const msg = ctx.message; + + // 1. Check Hub is online if (!this.eventsGateway.isDeviceRegistered(connectionInfo.hubId)) { await ctx.reply( "Connection failed: Hub is not online.\n\n" + - "Make sure the Multica Desktop app is running and connected to the Gateway, then try again." + "Make sure the Multica Desktop app is running and connected to the Gateway, then try again.", ); return; } - // 3. Unregister old virtual device if user is re-binding + // 2. Unregister old virtual device if user is re-binding const existingUser = await this.userStore.findByTelegramUserId(telegramUserId); if (existingUser && this.eventsGateway.isDeviceRegistered(existingUser.deviceId)) { this.eventsGateway.unregisterVirtualDevice(existingUser.deviceId); } - // 4. Generate device ID and register virtual device + // 3. Generate device ID and register virtual device const deviceId = `tg-${generateEncryptedId()}`; this.registerVirtualDeviceForUser(deviceId, telegramUserId); - // 5. Send verify RPC + // 4. Send verify RPC try { await ctx.reply("Connecting... Please approve the connection on your Desktop app."); - const result = await this.sendVerifyRpc( - deviceId, - connectionInfo.hubId, - connectionInfo.token, - { - platform: "telegram", - clientName: msg?.from?.username - ? `Telegram @${msg.from.username}` - : `Telegram ${msg?.from?.first_name ?? telegramUserId}`, - } - ); + const result = await this.sendVerifyRpc(deviceId, connectionInfo.hubId, connectionInfo.token, { + platform: "telegram", + clientName: msg?.from?.username + ? `Telegram @${msg.from.username}` + : `Telegram ${msg?.from?.first_name ?? telegramUserId}`, + }); - // 6. Save to DB + // 5. Save to DB await this.userStore.upsert({ telegramUserId, hubId: connectionInfo.hubId, @@ -266,23 +848,25 @@ export class TelegramService implements OnModuleInit { await ctx.reply( "Connected successfully!\n\n" + - `Hub: ${result.hubId}\n` + - `Agent: ${result.agentId}\n\n` + - "You can now send messages to interact with your agent." + `Hub: ${result.hubId}\n` + + `Agent: ${result.agentId}\n\n` + + "You can now send messages to interact with your agent.", ); - this.logger.log(`Telegram user verified: telegramUserId=${telegramUserId}, hubId=${connectionInfo.hubId}, deviceId=${deviceId}`); + this.logger.log( + `Telegram user verified: telegramUserId=${telegramUserId}, hubId=${connectionInfo.hubId}, deviceId=${deviceId}`, + ); } catch (error) { // Cleanup virtual device on failure this.eventsGateway.unregisterVirtualDevice(deviceId); - // Reject all pending requests for this device - this.cleanupPendingRequests(); const message = error instanceof Error ? error.message : String(error); if (message.includes("REJECTED")) { await ctx.reply("Connection rejected.\n\nThe connection was declined on the Desktop app."); } else if (message.includes("timed out")) { - await ctx.reply("Connection timed out.\n\nPlease try again and approve the connection on your Desktop app within 30 seconds."); + await ctx.reply( + "Connection timed out.\n\nPlease try again and approve the connection on your Desktop app within 30 seconds.", + ); } else { await ctx.reply(`Connection failed: ${message}\n\nPlease try again.`); } @@ -342,7 +926,7 @@ export class TelegramService implements OnModuleInit { if (!this.eventsGateway.isDeviceRegistered(user.hubId)) { await ctx.reply( "Your Hub is currently offline.\n\n" + - "Make sure the Multica Desktop app is running and connected to the Gateway." + "Make sure the Multica Desktop app is running and connected to the Gateway.", ); return; } @@ -368,10 +952,21 @@ export class TelegramService implements OnModuleInit { return; } - this.logger.debug(`Routed message to Hub: deviceId=${user.deviceId}, hubId=${user.hubId}, agentId=${user.agentId}`); + this.logger.debug( + `Routed message to Hub: deviceId=${user.deviceId}, hubId=${user.hubId}, agentId=${user.agentId}`, + ); } - /** Register a virtual device with a sendCallback that handles RPC responses, stream events, and messages */ + // ── Virtual device registration ── + + /** + * Register a virtual device with a sendCallback that handles: + * - RPC responses (verify) + * - Stream events (typing, text delivery with formatting/chunking/reply-to) + * - File delivery + * - Regular messages + * - Errors + */ private registerVirtualDeviceForUser(deviceId: string, telegramUserId: string): void { this.eventsGateway.registerVirtualDevice(deviceId, { sendCallback: (_event: string, data: unknown) => { @@ -394,7 +989,7 @@ export class TelegramService implements OnModuleInit { return; } - // Stream event — typing indicator + extract text content for Telegram + // Stream event — typing indicator + formatted text delivery if (msg.action === StreamAction) { const streamPayload = msg.payload as StreamPayload; const event = streamPayload?.event; @@ -402,13 +997,13 @@ export class TelegramService implements OnModuleInit { // Start typing when LLM begins generating if (event.type === "message_start") { - this.startTyping(telegramUserId); + this.startTyping(deviceId); return; } - // Stop typing + send text on message_end + // Stop typing + send formatted text on message_end if (event.type === "message_end") { - this.stopTyping(telegramUserId); + this.stopTyping(deviceId); const agentMsg = (event as { message?: { content?: Array<{ type: string; text?: string }> } }).message; if (agentMsg?.content) { const textContent = agentMsg.content @@ -416,7 +1011,9 @@ export class TelegramService implements OnModuleInit { .map((c) => c.text!) .join(""); if (textContent) { - this.sendToTelegram(deviceId, textContent); + void this.sendToTelegram(deviceId, textContent).then(() => { + void this.clearMessageContext(deviceId); + }); } } return; @@ -424,7 +1021,8 @@ export class TelegramService implements OnModuleInit { // Stop typing on error if (event.type === "agent_error") { - this.stopTyping(telegramUserId); + this.stopTyping(deviceId); + void this.clearMessageContext(deviceId); return; } @@ -441,7 +1039,7 @@ export class TelegramService implements OnModuleInit { }; if (payload?.data) { void this.sendFileToTelegram( - telegramUserId, + deviceId, Buffer.from(payload.data, "base64"), payload.type ?? "document", payload.caption, @@ -455,57 +1053,22 @@ export class TelegramService implements OnModuleInit { if (msg.action === "message") { const payload = msg.payload as { content?: string; agentId?: string }; if (payload?.content) { - this.sendToTelegram(deviceId, payload.content); + void this.sendToTelegram(deviceId, payload.content); } return; } // Error messages if (msg.action === "error") { - this.stopTyping(telegramUserId); + this.stopTyping(deviceId); + void this.clearMessageContext(deviceId); const payload = msg.payload as { message?: string; code?: string }; if (payload?.message) { - this.sendToTelegram(deviceId, `Error: ${payload.message}`); + void this.sendToTelegram(deviceId, `Error: ${payload.message}`); } } }, }); } - /** Start sending "typing" indicator to Telegram at regular intervals */ - private startTyping(telegramUserId: string): void { - if (this.typingTimers.has(telegramUserId)) return; - const chatId = Number(telegramUserId); - const send = () => { - void this.bot?.api.sendChatAction(chatId, "typing").catch(() => {}); - }; - send(); - const interval = setInterval(send, 5000); - this.typingTimers.set(telegramUserId, interval); - - // Safety timeout: auto-stop if no message_end/agent_error arrives - setTimeout(() => { - if (this.typingTimers.get(telegramUserId) === interval) { - this.stopTyping(telegramUserId); - } - }, TelegramService.TYPING_TIMEOUT_MS); - } - - /** Stop the "typing" indicator for a Telegram user */ - private stopTyping(telegramUserId: string): void { - const timer = this.typingTimers.get(telegramUserId); - if (timer) { - clearInterval(timer); - this.typingTimers.delete(telegramUserId); - } - } - - /** Cleanup all pending requests (used on verify failure) */ - private cleanupPendingRequests(): void { - for (const [id, pending] of this.pendingRequests) { - clearTimeout(pending.timer); - pending.reject(new Error("Cleaned up")); - this.pendingRequests.delete(id); - } - } } diff --git a/package.json b/package.json index 568bff9e..345fe5ac 100644 --- a/package.json +++ b/package.json @@ -18,6 +18,7 @@ "dev:desktop:onboarding": "pnpm --filter @multica/desktop dev:onboarding", "dev:gateway": "pnpm --filter @multica/gateway dev", "dev:web": "pnpm --filter @multica/web dev", + "dev:local": "bash scripts/dev-local.sh", "dev:all": "concurrently \"pnpm dev:gateway\" \"pnpm dev:web\"", "build": "turbo build", "build:desktop": "pnpm --filter @multica/desktop build", diff --git a/packages/core/src/channels/index.ts b/packages/core/src/channels/index.ts index 63ecc950..cd1043f1 100644 --- a/packages/core/src/channels/index.ts +++ b/packages/core/src/channels/index.ts @@ -13,13 +13,10 @@ export type { ChannelsConfig, } from "./types.js"; -// Built-in channel plugins -import { registerChannel } from "./registry.js"; -import { telegramChannel } from "./plugins/telegram.js"; - /** Register all built-in channel plugins. Call once at startup. */ export function initChannels(): void { - registerChannel(telegramChannel); + // Telegram: use official bot via Gateway webhook instead of user-created bots. + // The long-polling plugin is kept in plugins/telegram.ts but not registered. // Future: registerChannel(discordChannel); // Future: registerChannel(feishuChannel); } diff --git a/scripts/dev-local.sh b/scripts/dev-local.sh new file mode 100755 index 00000000..80fd4f6f --- /dev/null +++ b/scripts/dev-local.sh @@ -0,0 +1,54 @@ +#!/usr/bin/env bash +# +# Local development: Gateway (with Telegram bot) + Desktop + Web (for login) +# +# Usage: +# pnpm dev:local +# +# Reads TELEGRAM_BOT_TOKEN from .env at the repo root. +# Gateway runs on port 4000 in long-polling mode (no TELEGRAM_WEBHOOK_URL needed). +# Web app runs on port 3000 (default) for OAuth login flow. +# Desktop connects to the local Gateway and uses local Web for login. + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +ROOT_DIR="$SCRIPT_DIR/.." +ENV_FILE="$ROOT_DIR/.env" + +# Load .env +if [ ! -f "$ENV_FILE" ]; then + echo "Error: .env file not found at $ENV_FILE" + echo "Copy .env.example to .env and fill in TELEGRAM_BOT_TOKEN" + exit 1 +fi + +set -a +source "$ENV_FILE" +set +a + +if [ -z "${TELEGRAM_BOT_TOKEN:-}" ]; then + echo "Error: TELEGRAM_BOT_TOKEN not set in .env" + exit 1 +fi + +echo "Starting local dev environment..." +echo " Gateway: http://localhost:4000 (Telegram long-polling mode)" +echo " Web: http://localhost:3000 (OAuth login)" +echo " Desktop: connecting to local Gateway + Web" +echo "" + +# Build shared packages first +pnpm turbo build --filter=@multica/types --filter=@multica/utils --filter=@multica/core + +# Start everything +# Gateway uses PORT=4000 to avoid conflict with Web app on port 3000 +exec pnpm concurrently \ + -n types,utils,core,gateway,web,desktop \ + -c blue,green,yellow,magenta,red,cyan \ + "pnpm --filter @multica/types dev" \ + "pnpm --filter @multica/utils dev" \ + "pnpm --filter @multica/core dev" \ + "PORT=4000 pnpm --filter @multica/gateway dev" \ + "pnpm --filter @multica/web dev" \ + "GATEWAY_URL=http://localhost:4000 MAIN_VITE_WEB_URL=http://localhost:3000 pnpm --filter @multica/desktop dev"