From dc60cb754d8c6328251110068f1041a6a7946193 Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 10 Feb 2026 16:04:35 +0800 Subject: [PATCH] feat(telegram): replace Hub URL binding with connection link verification Replace the two-step Hub URL binding flow with a multica://connect connection link flow that uses the same verify RPC handshake as the SDK/web clients. Changes: - types.ts: replace hubUrl with hubId + agentId fields - telegram-user.store.ts: update DB schema (hub_id, agent_id columns), accept explicit deviceId in upsert - telegram.service.ts: rewrite with parseConnectionCode validation, verify RPC via routeFromVirtualDevice, pending request map for RPC promise tracking, smart sendCallback for stream/message/RPC - package.json: add @multica/store workspace dependency Flow: user pastes multica://connect link -> parse & validate -> check Hub online -> register virtual device -> verify RPC to Hub -> Desktop approval -> save to DB -> route messages Co-Authored-By: Claude Opus 4.6 --- package.json | 1 + pnpm-lock.yaml | 3 + src/gateway/telegram/telegram-user.store.ts | 30 +- src/gateway/telegram/telegram.service.ts | 374 +++++++++++++------- src/gateway/telegram/types.ts | 7 +- 5 files changed, 279 insertions(+), 136 deletions(-) diff --git a/package.json b/package.json index 3c492741..fd91fcea 100644 --- a/package.json +++ b/package.json @@ -53,6 +53,7 @@ "@mariozechner/pi-coding-agent": "^0.52.9", "@mozilla/readability": "^0.6.0", "@multica/sdk": "workspace:*", + "@multica/store": "workspace:*", "@nestjs/common": "^11.1.12", "@nestjs/core": "^11.1.12", "@nestjs/platform-express": "^11.1.12", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 72d97303..5f0841c8 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -47,6 +47,9 @@ importers: '@multica/sdk': specifier: workspace:* version: link:packages/sdk + '@multica/store': + specifier: workspace:* + version: link:packages/store '@nestjs/common': specifier: ^11.1.12 version: 11.1.12(reflect-metadata@0.2.2)(rxjs@7.8.2) diff --git a/src/gateway/telegram/telegram-user.store.ts b/src/gateway/telegram/telegram-user.store.ts index 5bd839ec..1570bc8d 100644 --- a/src/gateway/telegram/telegram-user.store.ts +++ b/src/gateway/telegram/telegram-user.store.ts @@ -11,7 +11,8 @@ import type { TelegramUser, TelegramUserCreate } from "./types.js"; interface TelegramUserRow extends RowDataPacket { telegram_user_id: string; - hub_url: string; + hub_id: string; + agent_id: string; device_id: string; created_at: Date; updated_at: Date; @@ -43,7 +44,8 @@ export class TelegramUserStore implements OnModuleInit { const sql = ` CREATE TABLE IF NOT EXISTS telegram_users ( telegram_user_id VARCHAR(64) PRIMARY KEY, - hub_url VARCHAR(512) NOT NULL, + hub_id VARCHAR(64) NOT NULL, + agent_id VARCHAR(64) NOT NULL, device_id VARCHAR(64) NOT NULL UNIQUE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, @@ -93,16 +95,20 @@ export class TelegramUserStore implements OnModuleInit { const existing = await this.findByTelegramUserId(data.telegramUserId); if (existing) { - // Update existing user + // Update existing user — also update device_id if provided await this.db.execute( `UPDATE telegram_users SET - hub_url = ?, + hub_id = ?, + agent_id = ?, + device_id = ?, telegram_username = ?, telegram_first_name = ?, telegram_last_name = ? WHERE telegram_user_id = ?`, [ - data.hubUrl, + data.hubId, + data.agentId, + data.deviceId ?? existing.deviceId, data.telegramUsername ?? null, data.telegramFirstName ?? null, data.telegramLastName ?? null, @@ -114,17 +120,18 @@ export class TelegramUserStore implements OnModuleInit { return updated!; } - // Create new user with generated device ID - const deviceId = `tg-${uuidv7()}`; + // Create new user with provided or generated device ID + const deviceId = data.deviceId ?? `tg-${uuidv7()}`; await this.db.execute( `INSERT INTO telegram_users ( - telegram_user_id, hub_url, device_id, + telegram_user_id, hub_id, agent_id, device_id, telegram_username, telegram_first_name, telegram_last_name - ) VALUES (?, ?, ?, ?, ?, ?)`, + ) VALUES (?, ?, ?, ?, ?, ?, ?)`, [ data.telegramUserId, - data.hubUrl, + data.hubId, + data.agentId, deviceId, data.telegramUsername ?? null, data.telegramFirstName ?? null, @@ -140,7 +147,8 @@ export class TelegramUserStore implements OnModuleInit { private rowToUser(row: TelegramUserRow): TelegramUser { return { telegramUserId: row.telegram_user_id, - hubUrl: row.hub_url, + hubId: row.hub_id, + agentId: row.agent_id, deviceId: row.device_id, createdAt: row.created_at, updatedAt: row.updated_at, diff --git a/src/gateway/telegram/telegram.service.ts b/src/gateway/telegram/telegram.service.ts index 9cf0bcac..1076b2de 100644 --- a/src/gateway/telegram/telegram.service.ts +++ b/src/gateway/telegram/telegram.service.ts @@ -2,14 +2,30 @@ * Telegram service for Gateway. * * Handles Telegram bot interactions via webhook. - * - New users: prompts for Hub URL - * - Bound users: routes messages to their Hub + * - New users: prompts to paste a multica://connect link + * - Connection link: verifies with Hub via RPC, persists to DB + * - Bound users: routes messages to their Hub agent */ import { Inject, Injectable, Logger } from "@nestjs/common"; import type { OnModuleInit } from "@nestjs/common"; import { Bot, webhookCallback } from "grammy"; import type { Context } from "grammy"; +import { v7 as uuidv7 } from "uuid"; +import { parseConnectionCode } from "@multica/store/connection"; +import type { ConnectionInfo } from "@multica/store/connection"; +import { + GatewayEvents, + RequestAction, + ResponseAction, + StreamAction, + type RoutedMessage, + type RequestPayload, + type ResponsePayload, + type VerifyParams, + type VerifyResult, +} from "@multica/sdk"; +import type { StreamPayload } from "@multica/sdk"; import { EventsGateway } from "../events.gateway.js"; import { TelegramUserStore } from "./telegram-user.store.js"; import type { TelegramUser } from "./types.js"; @@ -26,18 +42,18 @@ interface ExpressResponse { headersSent: boolean; } -// Users in the process of binding Hub URL -interface PendingBinding { - awaitingUrl: boolean; - telegramUsername?: string | undefined; - telegramFirstName?: string | undefined; - telegramLastName?: string | undefined; +interface PendingRequest { + resolve: (value: T) => void; + reject: (reason: Error) => void; + timer: ReturnType; } +const VERIFY_TIMEOUT_MS = 30_000; + @Injectable() export class TelegramService implements OnModuleInit { private bot: Bot | null = null; - private pendingBindings = new Map(); + private pendingRequests = new Map(); private readonly logger = new Logger(TelegramService.name); @@ -121,153 +137,265 @@ export class TelegramService implements OnModuleInit { this.logger.debug(`Received Telegram message: telegramUserId=${telegramUserId}, text=${text.slice(0, 50)}`); + // Connection link — always handle, even for already-bound users (re-binding) + if (text.startsWith("multica://connect?")) { + await this.handleConnectionLink(ctx, telegramUserId, text); + return; + } + // Check if user is bound const user = await this.userStore.findByTelegramUserId(telegramUserId); if (user) { - // User is bound, route message to Hub await this.routeToHub(user, text, ctx); return; } - // Check if user is in binding process - const pending = this.pendingBindings.get(telegramUserId); - - if (pending?.awaitingUrl) { - // User is providing Hub URL - await this.handleHubUrlInput(ctx, telegramUserId, text, pending); - return; - } - - // New user, start binding process - await this.startBindingProcess(ctx, telegramUserId); - } - - /** Start the Hub binding process for a new user */ - private async startBindingProcess(ctx: Context, telegramUserId: string): Promise { - const msg = ctx.message; - - this.pendingBindings.set(telegramUserId, { - awaitingUrl: true, - telegramUsername: msg?.from?.username, - telegramFirstName: msg?.from?.first_name, - telegramLastName: msg?.from?.last_name, - }); - + // New user without connection link await ctx.reply( - "👋 Welcome to Multica!\n\n" + - "Please enter your Hub URL to get started.\n\n" + - "Example: https://your-hub.example.com" + "Welcome to Multica!\n\n" + + "To get started, open the Multica Desktop app, generate a Connection Link, " + + "and paste it here.\n\n" + + "The link looks like:\nmultica://connect?gateway=...&hub=...&agent=...&token=...&exp=..." ); } - /** Handle Hub URL input from user */ - private async handleHubUrlInput( - ctx: Context, - telegramUserId: string, - url: string, - pending: PendingBinding - ): Promise { - // Validate URL format - if (!this.isValidUrl(url)) { - await ctx.reply( - "❌ Invalid URL format.\n\n" + - "Please enter a valid Hub URL.\n" + - "Example: https://your-hub.example.com" - ); - return; - } + /** Handle a multica://connect? connection link */ + private async handleConnectionLink(ctx: Context, telegramUserId: string, text: string): Promise { + const msg = ctx.message; - // Validate Hub connectivity - const isValid = await this.validateHubUrl(url); - if (!isValid) { - await ctx.reply( - "❌ Cannot connect to this Hub.\n\n" + - "Please check the URL and make sure your Hub is online.\n" + - "Then try again with the correct URL." - ); - return; - } - - // Create user record + // 1. Parse and validate the connection link + let connectionInfo: ConnectionInfo; try { - const user = await this.userStore.upsert({ + connectionInfo = parseConnectionCode(text); + } catch (error) { + const message = error instanceof Error ? error.message : "Invalid connection link"; + await ctx.reply(`Connection failed: ${message}\n\nPlease generate a new link and try again.`); + return; + } + + // 2. Check Hub is online + if (!this.eventsGateway.isDeviceRegistered(connectionInfo.hubId)) { + await ctx.reply( + "Connection failed: Hub is not online.\n\n" + + "Make sure the Multica Desktop app is running and connected to the Gateway, then try again." + ); + return; + } + + // 3. Unregister old virtual device if user is re-binding + const existingUser = await this.userStore.findByTelegramUserId(telegramUserId); + if (existingUser && this.eventsGateway.isDeviceRegistered(existingUser.deviceId)) { + this.eventsGateway.unregisterVirtualDevice(existingUser.deviceId); + } + + // 4. Generate device ID and register virtual device + const deviceId = `tg-${uuidv7()}`; + this.registerVirtualDeviceForUser(deviceId, telegramUserId); + + // 5. Send verify RPC + try { + await ctx.reply("Connecting... Please approve the connection on your Desktop app."); + + const result = await this.sendVerifyRpc( + deviceId, + connectionInfo.hubId, + connectionInfo.token, + { + platform: "telegram", + telegramUserId, + telegramUsername: msg?.from?.username, + telegramFirstName: msg?.from?.first_name, + } + ); + + // 6. Save to DB + await this.userStore.upsert({ telegramUserId, - hubUrl: url, - telegramUsername: pending.telegramUsername, - telegramFirstName: pending.telegramFirstName, - telegramLastName: pending.telegramLastName, + hubId: connectionInfo.hubId, + agentId: connectionInfo.agentId, + deviceId, + telegramUsername: msg?.from?.username, + telegramFirstName: msg?.from?.first_name, + telegramLastName: msg?.from?.last_name, }); - // Register as virtual device - this.eventsGateway.registerVirtualDevice(user.deviceId, { - sendCallback: (_event, data) => { - const payload = data as { text?: string }; - if (payload.text) { - this.sendToTelegram(user.deviceId, payload.text); - } - }, - }); - - this.pendingBindings.delete(telegramUserId); - await ctx.reply( - "✅ Hub connected successfully!\n\n" + - `Your Device ID: ${user.deviceId}\n\n` + + "Connected successfully!\n\n" + + `Hub: ${result.hubId}\n` + + `Agent: ${result.agentId}\n\n` + "You can now send messages to interact with your agent." ); - this.logger.log(`Telegram user bound to Hub: telegramUserId=${telegramUserId}, hubUrl=${url}, deviceId=${user.deviceId}`); + this.logger.log(`Telegram user verified: telegramUserId=${telegramUserId}, hubId=${connectionInfo.hubId}, deviceId=${deviceId}`); } catch (error) { + // Cleanup virtual device on failure + this.eventsGateway.unregisterVirtualDevice(deviceId); + // Reject all pending requests for this device + this.cleanupPendingRequests(); + const message = error instanceof Error ? error.message : String(error); - this.logger.error(`Failed to bind Telegram user: telegramUserId=${telegramUserId}, error=${message}`); - await ctx.reply("❌ An error occurred. Please try again later."); + if (message.includes("REJECTED")) { + await ctx.reply("Connection rejected.\n\nThe connection was declined on the Desktop app."); + } else if (message.includes("timed out")) { + await ctx.reply("Connection timed out.\n\nPlease try again and approve the connection on your Desktop app within 30 seconds."); + } else { + await ctx.reply(`Connection failed: ${message}\n\nPlease try again.`); + } + + this.logger.warn(`Telegram verify failed: telegramUserId=${telegramUserId}, error=${message}`); } } - /** Validate URL format */ - private isValidUrl(url: string): boolean { - try { - const parsed = new URL(url); - return parsed.protocol === "http:" || parsed.protocol === "https:"; - } catch { - return false; - } - } + /** Send a verify RPC to Hub via the virtual device */ + private sendVerifyRpc( + deviceId: string, + hubId: string, + token: string, + meta: Record, + ): Promise { + return new Promise((resolve, reject) => { + const requestId = uuidv7(); - /** Validate Hub URL connectivity */ - private async validateHubUrl(url: string): Promise { - try { - // Try to connect to the Hub's health endpoint - const response = await fetch(`${url}/`, { - method: "GET", - signal: AbortSignal.timeout(5000), + const timer = setTimeout(() => { + this.pendingRequests.delete(requestId); + reject(new Error("Verify request timed out")); + }, VERIFY_TIMEOUT_MS); + + this.pendingRequests.set(requestId, { + resolve: resolve as (v: unknown) => void, + reject, + timer, }); - return response.ok; - } catch { - return false; - } + + const payload: RequestPayload = { + requestId, + method: "verify", + params: { token, meta }, + }; + + const message: RoutedMessage> = { + id: uuidv7(), + uid: null, + from: deviceId, + to: hubId, + action: RequestAction, + payload, + }; + + const sent = this.eventsGateway.routeFromVirtualDevice(message); + if (!sent) { + this.pendingRequests.delete(requestId); + clearTimeout(timer); + reject(new Error("Failed to route verify request to Hub")); + } + }); } - /** Route message to user's Hub */ + /** Route a regular chat message to the user's Hub agent */ private async routeToHub(user: TelegramUser, text: string, ctx: Context): Promise { - // Ensure virtual device is registered - if (!this.eventsGateway.isDeviceRegistered(user.deviceId)) { - this.eventsGateway.registerVirtualDevice(user.deviceId, { - sendCallback: (_event, data) => { - const payload = data as { text?: string }; - if (payload.text) { - this.sendToTelegram(user.deviceId, payload.text); - } - }, - }); + // Ensure Hub is online + if (!this.eventsGateway.isDeviceRegistered(user.hubId)) { + await ctx.reply( + "Your Hub is currently offline.\n\n" + + "Make sure the Multica Desktop app is running and connected to the Gateway." + ); + return; } - // TODO: Route message to Hub via EventsGateway - // For now, just acknowledge receipt - this.logger.log(`Routing message to Hub: deviceId=${user.deviceId}, hubUrl=${user.hubUrl}`); + // Ensure virtual device is registered (may have been lost on gateway restart) + if (!this.eventsGateway.isDeviceRegistered(user.deviceId)) { + this.registerVirtualDeviceForUser(user.deviceId, user.telegramUserId); + } - // Placeholder: In full implementation, this would send to Hub - await ctx.reply(`📨 Message received. Routing to your Hub...`); + // Send message to Hub + const message: RoutedMessage = { + id: uuidv7(), + uid: null, + from: user.deviceId, + to: user.hubId, + action: "message", + payload: { agentId: user.agentId, content: text }, + }; + + const sent = this.eventsGateway.routeFromVirtualDevice(message); + if (!sent) { + await ctx.reply("Failed to send message. Please try again."); + return; + } + + this.logger.debug(`Routed message to Hub: deviceId=${user.deviceId}, hubId=${user.hubId}, agentId=${user.agentId}`); + } + + /** Register a virtual device with a sendCallback that handles RPC responses, stream events, and messages */ + private registerVirtualDeviceForUser(deviceId: string, telegramUserId: string): void { + this.eventsGateway.registerVirtualDevice(deviceId, { + sendCallback: (_event: string, data: unknown) => { + const msg = data as RoutedMessage; + if (!msg || !msg.action) return; + + // RPC response — resolve/reject pending request + if (msg.action === ResponseAction) { + const response = msg.payload as ResponsePayload; + const pending = this.pendingRequests.get(response.requestId); + if (pending) { + this.pendingRequests.delete(response.requestId); + clearTimeout(pending.timer); + if (response.ok) { + pending.resolve(response.payload); + } else { + pending.reject(new Error(`RPC error [${response.error.code}]: ${response.error.message}`)); + } + } + return; + } + + // Stream event — extract text content for Telegram + if (msg.action === StreamAction) { + const streamPayload = msg.payload as StreamPayload; + const event = streamPayload?.event; + if (event && "type" in event && event.type === "message_end") { + // Extract final text from the message + const agentMsg = (event as { message?: { content?: Array<{ type: string; text?: string }> } }).message; + if (agentMsg?.content) { + const textContent = agentMsg.content + .filter((c) => c.type === "text" && c.text) + .map((c) => c.text!) + .join(""); + if (textContent) { + this.sendToTelegram(deviceId, textContent); + } + } + } + return; + } + + // Regular message (e.g., "message" action from Hub) + if (msg.action === "message") { + const payload = msg.payload as { content?: string; agentId?: string }; + if (payload?.content) { + this.sendToTelegram(deviceId, payload.content); + } + return; + } + + // Error messages + if (msg.action === "error") { + const payload = msg.payload as { message?: string; code?: string }; + if (payload?.message) { + this.sendToTelegram(deviceId, `Error: ${payload.message}`); + } + } + }, + }); + } + + /** Cleanup all pending requests (used on verify failure) */ + private cleanupPendingRequests(): void { + for (const [id, pending] of this.pendingRequests) { + clearTimeout(pending.timer); + pending.reject(new Error("Cleaned up")); + this.pendingRequests.delete(id); + } } } diff --git a/src/gateway/telegram/types.ts b/src/gateway/telegram/types.ts index 315dd10d..c4bd87fc 100644 --- a/src/gateway/telegram/types.ts +++ b/src/gateway/telegram/types.ts @@ -5,7 +5,8 @@ /** Telegram user record stored in database */ export interface TelegramUser { telegramUserId: string; - hubUrl: string; + hubId: string; + agentId: string; deviceId: string; createdAt: Date; updatedAt: Date; @@ -17,7 +18,9 @@ export interface TelegramUser { /** Data required to create/update a Telegram user */ export interface TelegramUserCreate { telegramUserId: string; - hubUrl: string; + hubId: string; + agentId: string; + deviceId?: string; telegramUsername?: string | undefined; telegramFirstName?: string | undefined; telegramLastName?: string | undefined;