From 604d74d98459a38a9da394a18c8653f2596a848d Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 10 Feb 2026 16:04:10 +0800 Subject: [PATCH 1/9] fix(gateway): route messages to virtual devices in handleSend() handleSend() only checked socket-based devices (deviceToSocket map), causing DEVICE_NOT_FOUND errors when Hub sends responses to virtual devices like Telegram. Now checks virtualDevices map as fallback. Also adds routeFromVirtualDevice() to allow virtual devices to initiate messages (e.g., verify RPC, chat messages) through the Gateway routing infrastructure. Co-Authored-By: Claude Opus 4.6 --- package.json | 1 + pnpm-lock.yaml | 68 +++++ src/gateway/app.module.ts | 8 +- src/gateway/database/database.module.ts | 15 ++ src/gateway/database/database.service.ts | 93 +++++++ src/gateway/events.gateway.ts | 134 ++++++---- src/gateway/gateway.module.ts | 3 +- src/gateway/main.ts | 25 +- src/gateway/telegram/telegram-user.store.ts | 152 +++++++++++ src/gateway/telegram/telegram.controller.ts | 67 +++++ src/gateway/telegram/telegram.module.ts | 17 ++ src/gateway/telegram/telegram.service.ts | 273 ++++++++++++++++++++ src/gateway/telegram/types.ts | 27 ++ src/gateway/test-app.ts | 29 +++ src/gateway/types.ts | 8 + 15 files changed, 862 insertions(+), 58 deletions(-) create mode 100644 src/gateway/database/database.module.ts create mode 100644 src/gateway/database/database.service.ts create mode 100644 src/gateway/telegram/telegram-user.store.ts create mode 100644 src/gateway/telegram/telegram.controller.ts create mode 100644 src/gateway/telegram/telegram.module.ts create mode 100644 src/gateway/telegram/telegram.service.ts create mode 100644 src/gateway/telegram/types.ts create mode 100644 src/gateway/test-app.ts create mode 100644 src/gateway/types.ts diff --git a/package.json b/package.json index a6da1ff5..3c492741 100644 --- a/package.json +++ b/package.json @@ -63,6 +63,7 @@ "croner": "^10.0.1", "fast-glob": "^3.3.3", "grammy": "^1.39.3", + "mysql2": "^3.14.1", "json5": "^2.2.3", "linkedom": "^0.18.12", "nestjs-pino": "^4.5.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 84a3a2d7..72d97303 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -83,6 +83,9 @@ importers: linkedom: specifier: ^0.18.12 version: 0.18.12 + mysql2: + specifier: ^3.14.1 + version: 3.16.3 nestjs-pino: specifier: ^4.5.0 version: 4.5.0(@nestjs/common@11.1.12(reflect-metadata@0.2.2)(rxjs@7.8.2))(pino-http@11.0.0)(pino@10.3.0)(rxjs@7.8.2) @@ -4349,6 +4352,10 @@ packages: resolution: {integrity: sha512-wvUjBtSGN7+7SjNpq/9M2Tg350UZD3q62IFZLbRAR1bSMlCo1ZaeW+BJ+D090e4hIIZLBcTDWe4Mh4jvUDajzQ==} engines: {node: '>= 0.4'} + aws-ssl-profiles@1.1.2: + resolution: {integrity: sha512-NZKeq9AfyQvEeNlN0zSYAaWrmBffJh3IELMZfRpJVWgrpEbtEpnjvzqBPf+mxoI287JohRDoa+/nsfqqiZmF6g==} + engines: {node: '>= 6.0.0'} + axe-core@4.11.1: resolution: {integrity: sha512-BASOg+YwO2C+346x3LZOeoovTIoTrRqEsqMa6fmfAV0P+U9mFr9NsyOEpiYvFjbc64NMrSswhV50WdXzdb/Z5A==} engines: {node: '>=4'} @@ -5023,6 +5030,10 @@ packages: resolution: {integrity: sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==} engines: {node: '>=0.4.0'} + denque@2.1.0: + resolution: {integrity: sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==} + engines: {node: '>=0.10'} + depd@2.0.0: resolution: {integrity: sha512-g7nH6P6dyDioJogAAGprGpCtVImJhpPk/roCzdb3fIh61/s/nPsfR6onyMwkCAR/OlC3yBC0lESvUoQEAssIrw==} engines: {node: '>= 0.8'} @@ -5890,6 +5901,9 @@ packages: resolution: {integrity: sha512-zV/5HKTfCeKWnxG0Dmrw51hEWFGfcF2xiXqcA3+J90WDuP0SvoiSO5ORvcBsifmx/FoIjgQN3oNOGaQ5PhLFkg==} engines: {node: '>=18'} + generate-function@2.3.1: + resolution: {integrity: sha512-eeB5GfMNeevm/GRYq20ShmsaGcmI81kIX2K9XQx5miC8KdHaC6Jm0qQ8ZNeGOi7wYB8OsdxKs+Y2oVuTFuVwKQ==} + generator-function@2.0.1: resolution: {integrity: sha512-SFdFmIJi+ybC0vjlHN0ZGVGHc3lgE0DxPAT0djjVg+kjOnSqclqmj0KQ7ykTOLP6YxoqOvuAODGdcHJn+43q3g==} engines: {node: '>= 0.4'} @@ -6400,6 +6414,9 @@ packages: is-promise@4.0.0: resolution: {integrity: sha512-hvpoI6korhJMnej285dSg6nu1+e6uxs7zG3BYAm5byqDsgJNWwxzM6z6iZiAgQR4TJ30JmBTOwqZUw3WlyH3AQ==} + is-property@1.0.2: + resolution: {integrity: sha512-Ks/IoX00TtClbGQr4TWXemAnktAQvYB7HzcCxDGqEZU6oCmb2INHuOoKxbtR+HFkmYWBKv/dOZtGRiAjDhj92g==} + is-regex@1.2.1: resolution: {integrity: sha512-MjYsKHO5O7mCsmRGxWcLWheFqN9DJ/2TmngvjKXihe6efViPqc274+Fx/4fYj/r03+ESvBdTXK0V6tA3rgez1g==} engines: {node: '>= 0.4'} @@ -6929,6 +6946,10 @@ packages: resolution: {integrity: sha512-jumlc0BIUrS3qJGgIkWZsyfAM7NCWiBcCDhnd+3NNM5KbBmLTgHVfWBcg6W+rLUsIpzpERPsvwUP7CckAQSOoA==} engines: {node: '>=12'} + lru.min@1.1.4: + resolution: {integrity: sha512-DqC6n3QQ77zdFpCMASA1a3Jlb64Hv2N2DciFGkO/4L9+q/IpIAuRlKOvCXabtRW6cQf8usbmM6BE/TOPysCdIA==} + engines: {bun: '>=1.0.0', deno: '>=1.30.0', node: '>=8.0.0'} + lucide-react-native@0.563.0: resolution: {integrity: sha512-q4tYoAMorTqv+UXRYc0MyiEAOF+4Bu73zxD63EDrnGCFL+xuj+imBm3E2rIKRmME0heVHlK+98fsi8wbL92LNQ==} peerDependencies: @@ -7315,9 +7336,17 @@ packages: resolution: {integrity: sha512-WWdIxpyjEn+FhQJQQv9aQAYlHoNVdzIzUySNV1gHUPDSdZJ3yZn7pAAbQcV7B56Mvu881q9FZV+0Vx2xC44VWA==} engines: {node: ^18.17.0 || >=20.5.0} + mysql2@3.16.3: + resolution: {integrity: sha512-+3XhQEt4FEFuvGV0JjIDj4eP2OT/oIj/54dYvqhblnSzlfcxVOuj+cd15Xz6hsG4HU1a+A5+BA9gm0618C4z7A==} + engines: {node: '>= 8.0'} + mz@2.7.0: resolution: {integrity: sha512-z81GNO7nnYMEhrGh9LeymoE4+Yr0Wn5McHIZMK5cfQCl+NDX08sCZgUc9/6MHni9IWuFLm1Z3HTCXu2z9fN62Q==} + named-placeholders@1.1.6: + resolution: {integrity: sha512-Tz09sEL2EEuv5fFowm419c1+a/jSMiBjI9gHxVLrVdbUkkNUUfjsVYs9pVZu5oCon/kmRh9TfLEObFtkVxmY0w==} + engines: {node: '>=8.0.0'} + nanoid@3.3.11: resolution: {integrity: sha512-N8SpfPUnUp1bK+PMYW8qSWdl9U+wwNWI4QKxOYDy9JAro3WMX7p2OeVRF9v+347pnakNevPmiHhNmZ2HbFA76w==} engines: {node: ^10 || ^12 || ^13.7 || ^14 || >=15.0.1} @@ -8439,6 +8468,9 @@ packages: resolution: {integrity: sha512-1gnZf7DFcoIcajTjTwjwuDjzuz4PPcY2StKPlsGAQ1+YH20IRVrBaXSWmdjowTJ6u8Rc01PoYOGHXfP1mYcZNQ==} engines: {node: '>= 18'} + seq-queue@0.0.5: + resolution: {integrity: sha512-hr3Wtp/GZIc/6DAGPDcV4/9WoZhjrkXsi5B/07QgX8tsdc6ilr7BFM6PM6rbdAX1kFSDYeZGLipIZZKyQP0O5Q==} + serialize-error@2.1.0: resolution: {integrity: sha512-ghgmKt5o4Tly5yEG/UJp8qTd0AN7Xalw4XBtDEKP655B699qMEtra1WlXeE6WIvdEG481JvRxULKsInq/iNysw==} engines: {node: '>=0.10.0'} @@ -8628,6 +8660,10 @@ packages: sprintf-js@1.1.3: resolution: {integrity: sha512-Oo+0REFV59/rz3gfJNKQiBlwfHaSESl1pcGyABQsnnIfWOFt6JNj5gCog2U6MLZ//IGYD+nA8nI+mTShREReaA==} + sqlstring@2.3.3: + resolution: {integrity: sha512-qC9iz2FlN7DQl3+wjwn3802RTyjCx7sDvfQEXchwa6CWOx07/WVfh91gBmQ9fahw8snwGEWU3xGzOt4tFyHLxg==} + engines: {node: '>= 0.6'} + stable-hash@0.0.5: resolution: {integrity: sha512-+L3ccpzibovGXFK+Ap/f8LOS0ahMrHTf3xu7mMLSpEGU0EO9ucaysSylKo9eRDFNhWve/y275iPmIZ4z39a9iA==} @@ -14567,6 +14603,8 @@ snapshots: dependencies: possible-typed-array-names: 1.1.0 + aws-ssl-profiles@1.1.2: {} + axe-core@4.11.1: {} axobject-query@4.1.0: {} @@ -15301,6 +15339,8 @@ snapshots: delayed-stream@1.0.0: {} + denque@2.1.0: {} + depd@2.0.0: {} dequal@2.0.3: {} @@ -16561,6 +16601,10 @@ snapshots: transitivePeerDependencies: - supports-color + generate-function@2.3.1: + dependencies: + is-property: 1.0.2 + generator-function@2.0.1: {} gensync@1.0.0-beta.2: {} @@ -17149,6 +17193,8 @@ snapshots: is-promise@4.0.0: {} + is-property@1.0.2: {} + is-regex@1.2.1: dependencies: call-bound: 1.0.4 @@ -17630,6 +17676,8 @@ snapshots: lru-cache@7.18.3: {} + lru.min@1.1.4: {} + lucide-react-native@0.563.0(react-native-svg@15.15.1(react-native@0.81.5(@babel/core@7.28.6)(@types/react@19.1.17)(react@19.1.0))(react@19.1.0))(react-native@0.81.5(@babel/core@7.28.6)(@types/react@19.1.17)(react@19.1.0))(react@19.1.0): dependencies: react: 19.1.0 @@ -18330,12 +18378,28 @@ snapshots: mute-stream@2.0.0: {} + mysql2@3.16.3: + dependencies: + aws-ssl-profiles: 1.1.2 + denque: 2.1.0 + generate-function: 2.3.1 + iconv-lite: 0.7.2 + long: 5.3.2 + lru.min: 1.1.4 + named-placeholders: 1.1.6 + seq-queue: 0.0.5 + sqlstring: 2.3.3 + mz@2.7.0: dependencies: any-promise: 1.3.0 object-assign: 4.1.1 thenify-all: 1.6.0 + named-placeholders@1.1.6: + dependencies: + lru.min: 1.1.4 + nanoid@3.3.11: {} napi-postinstall@0.3.4: {} @@ -19691,6 +19755,8 @@ snapshots: transitivePeerDependencies: - supports-color + seq-queue@0.0.5: {} + serialize-error@2.1.0: {} serialize-error@7.0.1: @@ -19993,6 +20059,8 @@ snapshots: sprintf-js@1.1.3: optional: true + sqlstring@2.3.3: {} + stable-hash@0.0.5: {} stack-utils@2.0.6: diff --git a/src/gateway/app.module.ts b/src/gateway/app.module.ts index 03200ec5..cd478bf0 100644 --- a/src/gateway/app.module.ts +++ b/src/gateway/app.module.ts @@ -3,8 +3,10 @@ import { ServeStaticModule } from "@nestjs/serve-static"; import { LoggerModule } from "nestjs-pino"; import { join } from "node:path"; import { fileURLToPath } from "node:url"; -import { EventsGateway } from "./events.gateway.js"; import { AppController } from "./app.controller.js"; +import { DatabaseModule } from "./database/database.module.js"; +import { GatewayModule } from "./gateway.module.js"; +import { TelegramModule } from "./telegram/telegram.module.js"; const __dirname = fileURLToPath(new URL(".", import.meta.url)); const isDev = process.env.NODE_ENV !== "production"; @@ -31,8 +33,10 @@ const isDev = process.env.NODE_ENV !== "production"; level: process.env.LOG_LEVEL ?? "info", }, }), + DatabaseModule, + GatewayModule, + TelegramModule, ], - providers: [EventsGateway], controllers: [AppController], }) export class AppModule {} diff --git a/src/gateway/database/database.module.ts b/src/gateway/database/database.module.ts new file mode 100644 index 00000000..a357a4e3 --- /dev/null +++ b/src/gateway/database/database.module.ts @@ -0,0 +1,15 @@ +/** + * Database module for Gateway. + * + * Global module that provides DatabaseService to all other modules. + */ + +import { Global, Module } from "@nestjs/common"; +import { DatabaseService } from "./database.service.js"; + +@Global() +@Module({ + providers: [DatabaseService], + exports: [DatabaseService], +}) +export class DatabaseModule {} diff --git a/src/gateway/database/database.service.ts b/src/gateway/database/database.service.ts new file mode 100644 index 00000000..27a675f6 --- /dev/null +++ b/src/gateway/database/database.service.ts @@ -0,0 +1,93 @@ +/** + * MySQL database service for Gateway. + * + * Provides a connection pool and query interface. + * Configuration is read from MYSQL_DSN environment variable. + */ + +import { Injectable, Logger } from "@nestjs/common"; +import type { OnModuleInit, OnModuleDestroy } from "@nestjs/common"; +import mysql from "mysql2/promise"; +import type { Pool, PoolOptions, RowDataPacket, ResultSetHeader } from "mysql2/promise"; + +@Injectable() +export class DatabaseService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(DatabaseService.name); + private pool: Pool | null = null; + + async onModuleInit(): Promise { + console.log("[DatabaseService] onModuleInit starting..."); + const dsn = process.env["MYSQL_DSN"]; + if (!dsn) { + console.log("[DatabaseService] MYSQL_DSN not set"); + this.logger.warn("MYSQL_DSN not set, database features disabled"); + return; + } + + try { + console.log("[DatabaseService] Parsing DSN..."); + const config = this.parseDsn(dsn); + console.log("[DatabaseService] Creating pool..."); + this.pool = mysql.createPool(config); + + // Test connection + console.log("[DatabaseService] Testing connection..."); + const connection = await this.pool.getConnection(); + connection.release(); + console.log("[DatabaseService] Connected!"); + this.logger.log(`MySQL connected (host=${config.host}, database=${config.database})`); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + console.log("[DatabaseService] Error:", message); + this.logger.error(`Failed to connect to MySQL: ${message}`); + throw error; + } + } + + async onModuleDestroy(): Promise { + if (this.pool) { + await this.pool.end(); + this.logger.log("MySQL connection pool closed"); + } + } + + /** Check if database is available */ + isAvailable(): boolean { + return this.pool !== null; + } + + /** Execute a query and return rows */ + async query(sql: string, params?: unknown[]): Promise { + if (!this.pool) { + throw new Error("Database not available"); + } + const [rows] = await this.pool.query(sql, params); + return rows; + } + + /** Execute an insert/update/delete and return result */ + async execute(sql: string, params?: unknown[]): Promise { + if (!this.pool) { + throw new Error("Database not available"); + } + const [result] = await this.pool.execute(sql, params); + return result; + } + + /** Parse MySQL DSN string into connection options */ + private parseDsn(dsn: string): PoolOptions { + // Format: mysql://user:password@host:port/database + const url = new URL(dsn); + + return { + host: url.hostname, + port: url.port ? parseInt(url.port, 10) : 3306, + user: url.username, + password: url.password, + database: url.pathname.slice(1), // Remove leading / + waitForConnections: true, + connectionLimit: 10, + queueLimit: 0, + }; + } +} diff --git a/src/gateway/events.gateway.ts b/src/gateway/events.gateway.ts index ebeb2191..304ba5da 100644 --- a/src/gateway/events.gateway.ts +++ b/src/gateway/events.gateway.ts @@ -1,4 +1,4 @@ -import { Injectable } from "@nestjs/common"; +import { Injectable, Logger } from "@nestjs/common"; import { WebSocketGateway, WebSocketServer, @@ -12,7 +12,6 @@ import type { OnGatewayDisconnect, } from "@nestjs/websockets"; import type { Server, Socket } from "socket.io"; -import { InjectPinoLogger, PinoLogger } from "nestjs-pino"; import { GatewayEvents, type RoutedMessage, @@ -22,6 +21,7 @@ import { type DeviceInfo, type DeviceType, } from "@multica/sdk"; +import type { VirtualDeviceHandler } from "./types.js"; @Injectable() @WebSocketGateway({ @@ -36,10 +36,7 @@ import { export class EventsGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect { - constructor( - @InjectPinoLogger(EventsGateway.name) - private readonly logger: PinoLogger - ) {} + private readonly logger = new Logger(EventsGateway.name); @WebSocketServer() server!: Server; @@ -48,9 +45,11 @@ export class EventsGateway private deviceToSocket = new Map(); // socketId -> deviceInfo mapping private socketToDevice = new Map(); + // Virtual devices (non-socket based, e.g., Telegram) + private virtualDevices = new Map(); afterInit(_server: Server): void { - this.logger.info("WebSocket Gateway initialized"); + this.logger.log("WebSocket Gateway initialized"); } handleConnection(client: Socket): void { @@ -58,16 +57,10 @@ export class EventsGateway const deviceId = query["deviceId"] as string | undefined; const deviceType = query["deviceType"] as DeviceType | undefined; - this.logger.debug( - { socketId: client.id, deviceId, deviceType }, - "Incoming connection" - ); + this.logger.debug(`Incoming connection: socketId=${client.id}, deviceId=${deviceId}, deviceType=${deviceType}`); if (!deviceId || !deviceType) { - this.logger.warn( - { socketId: client.id }, - "Missing deviceId or deviceType in query, disconnecting" - ); + this.logger.warn(`Missing deviceId or deviceType in query, disconnecting (socketId=${client.id})`); client.disconnect(true); return; } @@ -75,10 +68,7 @@ export class EventsGateway // Check if deviceId is already in use by another socket const existingSocketId = this.deviceToSocket.get(deviceId); if (existingSocketId && existingSocketId !== client.id) { - this.logger.warn( - { deviceId, existingSocketId }, - "Device already registered by another socket, disconnecting" - ); + this.logger.warn(`Device already registered by another socket, disconnecting (deviceId=${deviceId}, existingSocketId=${existingSocketId})`); client.emit(GatewayEvents.REGISTERED, { success: false, deviceId, @@ -93,25 +83,19 @@ export class EventsGateway this.deviceToSocket.set(deviceId, client.id); this.socketToDevice.set(client.id, deviceInfo); - this.logger.info({ deviceId, deviceType }, "Device connected and registered"); + this.logger.log(`Device connected and registered: deviceId=${deviceId}, deviceType=${deviceType}`); client.emit(GatewayEvents.REGISTERED, { success: true, deviceId }); } handleDisconnect(client: Socket): void { const deviceInfo = this.socketToDevice.get(client.id); if (deviceInfo) { - this.logger.debug( - { socketId: client.id, deviceId: deviceInfo.deviceId, deviceType: deviceInfo.deviceType }, - "Device disconnecting" - ); - this.logger.info( - { deviceId: deviceInfo.deviceId, deviceType: deviceInfo.deviceType }, - "Device disconnected" - ); + this.logger.debug(`Device disconnecting: socketId=${client.id}, deviceId=${deviceInfo.deviceId}, deviceType=${deviceInfo.deviceType}`); + this.logger.log(`Device disconnected: deviceId=${deviceInfo.deviceId}, deviceType=${deviceInfo.deviceType}`); this.deviceToSocket.delete(deviceInfo.deviceId); this.socketToDevice.delete(client.id); } else { - this.logger.info({ socketId: client.id }, "Socket disconnected"); + this.logger.log(`Socket disconnected: socketId=${client.id}`); } } @@ -120,10 +104,7 @@ export class EventsGateway @MessageBody() message: RoutedMessage, @ConnectedSocket() client: Socket ): void { - this.logger.debug( - { socketId: client.id, message }, - "Received send event" - ); + this.logger.debug(`Received send event: socketId=${client.id}, messageId=${message.id}`); const senderDevice = this.socketToDevice.get(client.id); @@ -149,24 +130,27 @@ export class EventsGateway return; } - // Find target device + // Find target device — check socket-based first, then virtual const targetSocketId = this.deviceToSocket.get(message.to); - if (!targetSocketId) { - const error: SendErrorResponse = { - messageId: message.id, - error: `Device ${message.to} not found`, - code: "DEVICE_NOT_FOUND", - }; - client.emit(GatewayEvents.SEND_ERROR, error); + if (targetSocketId) { + this.logger.debug(`Routing message: id=${message.id}, from=${message.from}, to=${message.to}, action=${message.action}`); + this.server.to(targetSocketId).emit(GatewayEvents.RECEIVE, message); return; } - // Forward message - this.logger.debug( - { messageId: message.id, from: message.from, to: message.to, action: message.action }, - "Routing message" - ); - this.server.to(targetSocketId).emit(GatewayEvents.RECEIVE, message); + const virtualHandler = this.virtualDevices.get(message.to); + if (virtualHandler) { + this.logger.debug(`Routing message to virtual device: id=${message.id}, from=${message.from}, to=${message.to}, action=${message.action}`); + virtualHandler.sendCallback(GatewayEvents.RECEIVE, message); + return; + } + + const error: SendErrorResponse = { + messageId: message.id, + error: `Device ${message.to} not found`, + code: "DEVICE_NOT_FOUND", + }; + client.emit(GatewayEvents.SEND_ERROR, error); } @SubscribeMessage(GatewayEvents.LIST_DEVICES) @@ -185,7 +169,7 @@ export class EventsGateway @MessageBody() data: PingPayload, @ConnectedSocket() client: Socket ): PongResponse { - this.logger.debug({ socketId: client.id, data }, "Received ping"); + this.logger.debug(`Received ping: socketId=${client.id}`); return { event: GatewayEvents.PONG, data: "Hello from Gateway!" }; } @@ -201,9 +185,63 @@ export class EventsGateway /** Send message to specified device (for HTTP API use) */ sendToDevice(deviceId: string, event: string, data: unknown): boolean { + // Check virtual devices first + const virtualHandler = this.virtualDevices.get(deviceId); + if (virtualHandler) { + this.logger.debug(`Routing to virtual device: deviceId=${deviceId}, event=${event}`); + virtualHandler.sendCallback(event, data); + return true; + } + + // Fall back to socket-based devices const socketId = this.deviceToSocket.get(deviceId); if (!socketId) return false; this.server.to(socketId).emit(event, data); return true; } + + /** Register a virtual device (non-socket based) */ + registerVirtualDevice(deviceId: string, handler: VirtualDeviceHandler): void { + this.virtualDevices.set(deviceId, handler); + this.logger.log(`Virtual device registered: deviceId=${deviceId}`); + } + + /** Unregister a virtual device */ + unregisterVirtualDevice(deviceId: string): void { + this.virtualDevices.delete(deviceId); + this.logger.log(`Virtual device unregistered: deviceId=${deviceId}`); + } + + /** Check if a device (socket or virtual) is registered */ + isDeviceRegistered(deviceId: string): boolean { + return this.deviceToSocket.has(deviceId) || this.virtualDevices.has(deviceId); + } + + /** Route a message originating from a virtual device to its target */ + routeFromVirtualDevice(message: RoutedMessage): boolean { + // Validate sender is a registered virtual device + if (!this.virtualDevices.has(message.from)) { + this.logger.warn(`routeFromVirtualDevice: sender not a virtual device (from=${message.from})`); + return false; + } + + // Try socket-based target first + const targetSocketId = this.deviceToSocket.get(message.to); + if (targetSocketId) { + this.logger.debug(`Virtual device routing: id=${message.id}, from=${message.from}, to=${message.to}, action=${message.action}`); + this.server.to(targetSocketId).emit(GatewayEvents.RECEIVE, message); + return true; + } + + // Try virtual device target + const virtualHandler = this.virtualDevices.get(message.to); + if (virtualHandler) { + this.logger.debug(`Virtual device routing (v2v): id=${message.id}, from=${message.from}, to=${message.to}, action=${message.action}`); + virtualHandler.sendCallback(GatewayEvents.RECEIVE, message); + return true; + } + + this.logger.warn(`routeFromVirtualDevice: target not found (to=${message.to})`); + return false; + } } diff --git a/src/gateway/gateway.module.ts b/src/gateway/gateway.module.ts index 414d18d4..3b285db3 100644 --- a/src/gateway/gateway.module.ts +++ b/src/gateway/gateway.module.ts @@ -1,6 +1,7 @@ -import { Module } from "@nestjs/common"; +import { Global, Module } from "@nestjs/common"; import { EventsGateway } from "./events.gateway.js"; +@Global() @Module({ providers: [EventsGateway], exports: [EventsGateway], diff --git a/src/gateway/main.ts b/src/gateway/main.ts index 424ee6b0..02a9a017 100644 --- a/src/gateway/main.ts +++ b/src/gateway/main.ts @@ -3,16 +3,27 @@ import { NestFactory } from "@nestjs/core"; import { Logger } from "nestjs-pino"; import { AppModule } from "./app.module.js"; +console.log("[Gateway] Starting bootstrap..."); + async function bootstrap(): Promise { - const app = await NestFactory.create(AppModule, { bufferLogs: true }); - app.useLogger(app.get(Logger)); + console.log("[Gateway] Creating NestFactory..."); + try { + const app = await NestFactory.create(AppModule, { bufferLogs: true, abortOnError: false }); + console.log("[Gateway] NestFactory created"); - const port = process.env["PORT"] ?? 3000; - await app.listen(port); + app.useLogger(app.get(Logger)); - const logger = app.get(Logger); - logger.log(`Gateway is running on http://localhost:${port}`); - logger.log(`WebSocket endpoint: ws://localhost:${port}/ws`); + const port = process.env["PORT"] ?? 3000; + console.log(`[Gateway] Listening on port ${port}...`); + await app.listen(port); + + const logger = app.get(Logger); + logger.log(`Gateway is running on http://localhost:${port}`); + logger.log(`WebSocket endpoint: ws://localhost:${port}/ws`); + } catch (err) { + console.error("[Gateway] Error during bootstrap:", err); + throw err; + } } bootstrap().catch((err) => { diff --git a/src/gateway/telegram/telegram-user.store.ts b/src/gateway/telegram/telegram-user.store.ts new file mode 100644 index 00000000..5bd839ec --- /dev/null +++ b/src/gateway/telegram/telegram-user.store.ts @@ -0,0 +1,152 @@ +/** + * Telegram user store - MySQL persistence layer. + */ + +import { Inject, Injectable, Logger } from "@nestjs/common"; +import type { OnModuleInit } from "@nestjs/common"; +import { v7 as uuidv7 } from "uuid"; +import type { RowDataPacket } from "mysql2/promise"; +import { DatabaseService } from "../database/database.service.js"; +import type { TelegramUser, TelegramUserCreate } from "./types.js"; + +interface TelegramUserRow extends RowDataPacket { + telegram_user_id: string; + hub_url: string; + device_id: string; + created_at: Date; + updated_at: Date; + telegram_username: string | null; + telegram_first_name: string | null; + telegram_last_name: string | null; +} + +@Injectable() +export class TelegramUserStore implements OnModuleInit { + private readonly logger = new Logger(TelegramUserStore.name); + + constructor(@Inject(DatabaseService) private readonly db: DatabaseService) {} + + async onModuleInit(): Promise { + console.log("[TelegramUserStore] onModuleInit starting..."); + if (!this.db.isAvailable()) { + console.log("[TelegramUserStore] Database not available"); + this.logger.warn("Database not available, TelegramUserStore disabled"); + return; + } + console.log("[TelegramUserStore] Ensuring table..."); + await this.ensureTable(); + console.log("[TelegramUserStore] Done"); + } + + /** Create telegram_users table if not exists */ + private async ensureTable(): Promise { + const sql = ` + CREATE TABLE IF NOT EXISTS telegram_users ( + telegram_user_id VARCHAR(64) PRIMARY KEY, + hub_url VARCHAR(512) NOT NULL, + device_id VARCHAR(64) NOT NULL UNIQUE, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + telegram_username VARCHAR(255), + telegram_first_name VARCHAR(255), + telegram_last_name VARCHAR(255), + INDEX idx_device_id (device_id) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + `; + await this.db.execute(sql); + this.logger.log("telegram_users table ensured"); + } + + /** Find user by Telegram user ID */ + async findByTelegramUserId(telegramUserId: string): Promise { + if (!this.db.isAvailable()) return null; + + const rows = await this.db.query( + "SELECT * FROM telegram_users WHERE telegram_user_id = ?", + [telegramUserId] + ); + + if (rows.length === 0) return null; + return this.rowToUser(rows[0]!); + } + + /** Find user by device ID */ + async findByDeviceId(deviceId: string): Promise { + if (!this.db.isAvailable()) return null; + + const rows = await this.db.query( + "SELECT * FROM telegram_users WHERE device_id = ?", + [deviceId] + ); + + if (rows.length === 0) return null; + return this.rowToUser(rows[0]!); + } + + /** Create or update a Telegram user */ + async upsert(data: TelegramUserCreate): Promise { + if (!this.db.isAvailable()) { + throw new Error("Database not available"); + } + + // Check if user exists + const existing = await this.findByTelegramUserId(data.telegramUserId); + + if (existing) { + // Update existing user + await this.db.execute( + `UPDATE telegram_users SET + hub_url = ?, + telegram_username = ?, + telegram_first_name = ?, + telegram_last_name = ? + WHERE telegram_user_id = ?`, + [ + data.hubUrl, + data.telegramUsername ?? null, + data.telegramFirstName ?? null, + data.telegramLastName ?? null, + data.telegramUserId, + ] + ); + + const updated = await this.findByTelegramUserId(data.telegramUserId); + return updated!; + } + + // Create new user with generated device ID + const deviceId = `tg-${uuidv7()}`; + + await this.db.execute( + `INSERT INTO telegram_users ( + telegram_user_id, hub_url, device_id, + telegram_username, telegram_first_name, telegram_last_name + ) VALUES (?, ?, ?, ?, ?, ?)`, + [ + data.telegramUserId, + data.hubUrl, + deviceId, + data.telegramUsername ?? null, + data.telegramFirstName ?? null, + data.telegramLastName ?? null, + ] + ); + + const created = await this.findByTelegramUserId(data.telegramUserId); + return created!; + } + + /** Convert database row to TelegramUser object */ + private rowToUser(row: TelegramUserRow): TelegramUser { + return { + telegramUserId: row.telegram_user_id, + hubUrl: row.hub_url, + deviceId: row.device_id, + createdAt: row.created_at, + updatedAt: row.updated_at, + telegramUsername: row.telegram_username ?? undefined, + telegramFirstName: row.telegram_first_name ?? undefined, + telegramLastName: row.telegram_last_name ?? undefined, + }; + } +} diff --git a/src/gateway/telegram/telegram.controller.ts b/src/gateway/telegram/telegram.controller.ts new file mode 100644 index 00000000..2e7c2c39 --- /dev/null +++ b/src/gateway/telegram/telegram.controller.ts @@ -0,0 +1,67 @@ +/** + * Telegram webhook controller. + * + * Receives webhook requests from Telegram Bot API. + */ + +import { Controller, Inject, Logger, Post, Req, Res, Headers } from "@nestjs/common"; +import { TelegramService } from "./telegram.service.js"; + +// Minimal Express types for webhook handling +interface ExpressRequest { + body: unknown; + header: (name: string) => string | undefined; +} + +interface ExpressResponse { + status: (code: number) => ExpressResponse; + json: (data: unknown) => void; + headersSent: boolean; +} + +@Controller("telegram") +export class TelegramController { + private readonly logger = new Logger(TelegramController.name); + + constructor(@Inject(TelegramService) private readonly telegramService: TelegramService) {} + + @Post("webhook") + async handleWebhook( + @Req() req: ExpressRequest, + @Res() res: ExpressResponse, + @Headers("x-telegram-bot-api-secret-token") secretToken?: string + ): Promise { + // Check if Telegram is configured + if (!this.telegramService.isConfigured()) { + this.logger.warn("Telegram webhook called but bot not configured"); + res.status(503).json({ error: "Telegram not configured" }); + return; + } + + // Validate secret token if configured + const expectedToken = process.env["TELEGRAM_WEBHOOK_SECRET_TOKEN"]; + if (expectedToken && secretToken !== expectedToken) { + this.logger.warn("Invalid Telegram webhook secret token"); + res.status(401).json({ error: "Unauthorized" }); + return; + } + + // Get grammY webhook callback + const callback = this.telegramService.getWebhookCallback(); + if (!callback) { + res.status(503).json({ error: "Telegram not configured" }); + return; + } + + // Let grammY handle the request + try { + await callback(req, res); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + this.logger.error(`Telegram webhook error: ${message}`); + if (!res.headersSent) { + res.status(500).json({ error: "Internal server error" }); + } + } + } +} diff --git a/src/gateway/telegram/telegram.module.ts b/src/gateway/telegram/telegram.module.ts new file mode 100644 index 00000000..debdb7b4 --- /dev/null +++ b/src/gateway/telegram/telegram.module.ts @@ -0,0 +1,17 @@ +/** + * Telegram module for Gateway. + * + * Provides Telegram webhook functionality. + */ + +import { Module } from "@nestjs/common"; +import { TelegramController } from "./telegram.controller.js"; +import { TelegramService } from "./telegram.service.js"; +import { TelegramUserStore } from "./telegram-user.store.js"; + +@Module({ + controllers: [TelegramController], + providers: [TelegramService, TelegramUserStore], + exports: [TelegramService], +}) +export class TelegramModule {} diff --git a/src/gateway/telegram/telegram.service.ts b/src/gateway/telegram/telegram.service.ts new file mode 100644 index 00000000..9cf0bcac --- /dev/null +++ b/src/gateway/telegram/telegram.service.ts @@ -0,0 +1,273 @@ +/** + * Telegram service for Gateway. + * + * Handles Telegram bot interactions via webhook. + * - New users: prompts for Hub URL + * - Bound users: routes messages to their Hub + */ + +import { Inject, Injectable, Logger } from "@nestjs/common"; +import type { OnModuleInit } from "@nestjs/common"; +import { Bot, webhookCallback } from "grammy"; +import type { Context } from "grammy"; +import { EventsGateway } from "../events.gateway.js"; +import { TelegramUserStore } from "./telegram-user.store.js"; +import type { TelegramUser } from "./types.js"; + +// Minimal Express types for webhook handling +interface ExpressRequest { + body: unknown; + header: (name: string) => string | undefined; +} + +interface ExpressResponse { + status: (code: number) => ExpressResponse; + json: (data: unknown) => void; + headersSent: boolean; +} + +// Users in the process of binding Hub URL +interface PendingBinding { + awaitingUrl: boolean; + telegramUsername?: string | undefined; + telegramFirstName?: string | undefined; + telegramLastName?: string | undefined; +} + +@Injectable() +export class TelegramService implements OnModuleInit { + private bot: Bot | null = null; + private pendingBindings = new Map(); + + private readonly logger = new Logger(TelegramService.name); + + constructor( + @Inject(EventsGateway) private readonly eventsGateway: EventsGateway, + @Inject(TelegramUserStore) private readonly userStore: TelegramUserStore, + ) {} + + async onModuleInit(): Promise { + console.log("[TelegramService] onModuleInit starting..."); + const token = process.env["TELEGRAM_BOT_TOKEN"]; + if (!token) { + console.log("[TelegramService] No bot token"); + this.logger.warn("TELEGRAM_BOT_TOKEN not set, Telegram webhook disabled"); + return; + } + + console.log("[TelegramService] Creating bot..."); + this.bot = new Bot(token); + this.setupHandlers(); + this.logger.log("Telegram bot initialized"); + } + + /** Get grammY webhook callback for Express/NestJS */ + getWebhookCallback(): ((req: ExpressRequest, res: ExpressResponse) => Promise) | null { + if (!this.bot) return null; + + const secretToken = process.env["TELEGRAM_WEBHOOK_SECRET_TOKEN"]; + if (secretToken) { + return webhookCallback(this.bot, "express", { secretToken }) as unknown as ( + req: ExpressRequest, + res: ExpressResponse + ) => Promise; + } + return webhookCallback(this.bot, "express") as unknown as ( + req: ExpressRequest, + res: ExpressResponse + ) => Promise; + } + + /** Check if Telegram bot is configured */ + isConfigured(): boolean { + return this.bot !== null; + } + + /** Send message to a Telegram user by device ID */ + async sendToTelegram(deviceId: string, text: string): Promise { + if (!this.bot) return; + + const user = await this.userStore.findByDeviceId(deviceId); + if (!user) { + this.logger.warn(`Telegram user not found for device: deviceId=${deviceId}`); + return; + } + + try { + await this.bot.api.sendMessage(Number(user.telegramUserId), text); + this.logger.debug(`Sent message to Telegram: telegramUserId=${user.telegramUserId}`); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + this.logger.error(`Failed to send Telegram message: deviceId=${deviceId}, error=${message}`); + } + } + + /** Setup bot message handlers */ + private setupHandlers(): void { + if (!this.bot) return; + + this.bot.on("message:text", async (ctx) => { + await this.handleTextMessage(ctx); + }); + } + + /** Handle incoming text message */ + private async handleTextMessage(ctx: Context): Promise { + const msg = ctx.message; + if (!msg || !msg.text) return; + + const telegramUserId = String(msg.from?.id); + const text = msg.text.trim(); + + this.logger.debug(`Received Telegram message: telegramUserId=${telegramUserId}, text=${text.slice(0, 50)}`); + + // Check if user is bound + const user = await this.userStore.findByTelegramUserId(telegramUserId); + + if (user) { + // User is bound, route message to Hub + await this.routeToHub(user, text, ctx); + return; + } + + // Check if user is in binding process + const pending = this.pendingBindings.get(telegramUserId); + + if (pending?.awaitingUrl) { + // User is providing Hub URL + await this.handleHubUrlInput(ctx, telegramUserId, text, pending); + return; + } + + // New user, start binding process + await this.startBindingProcess(ctx, telegramUserId); + } + + /** Start the Hub binding process for a new user */ + private async startBindingProcess(ctx: Context, telegramUserId: string): Promise { + const msg = ctx.message; + + this.pendingBindings.set(telegramUserId, { + awaitingUrl: true, + telegramUsername: msg?.from?.username, + telegramFirstName: msg?.from?.first_name, + telegramLastName: msg?.from?.last_name, + }); + + await ctx.reply( + "👋 Welcome to Multica!\n\n" + + "Please enter your Hub URL to get started.\n\n" + + "Example: https://your-hub.example.com" + ); + } + + /** Handle Hub URL input from user */ + private async handleHubUrlInput( + ctx: Context, + telegramUserId: string, + url: string, + pending: PendingBinding + ): Promise { + // Validate URL format + if (!this.isValidUrl(url)) { + await ctx.reply( + "❌ Invalid URL format.\n\n" + + "Please enter a valid Hub URL.\n" + + "Example: https://your-hub.example.com" + ); + return; + } + + // Validate Hub connectivity + const isValid = await this.validateHubUrl(url); + if (!isValid) { + await ctx.reply( + "❌ Cannot connect to this Hub.\n\n" + + "Please check the URL and make sure your Hub is online.\n" + + "Then try again with the correct URL." + ); + return; + } + + // Create user record + try { + const user = await this.userStore.upsert({ + telegramUserId, + hubUrl: url, + telegramUsername: pending.telegramUsername, + telegramFirstName: pending.telegramFirstName, + telegramLastName: pending.telegramLastName, + }); + + // Register as virtual device + this.eventsGateway.registerVirtualDevice(user.deviceId, { + sendCallback: (_event, data) => { + const payload = data as { text?: string }; + if (payload.text) { + this.sendToTelegram(user.deviceId, payload.text); + } + }, + }); + + this.pendingBindings.delete(telegramUserId); + + await ctx.reply( + "✅ Hub connected successfully!\n\n" + + `Your Device ID: ${user.deviceId}\n\n` + + "You can now send messages to interact with your agent." + ); + + this.logger.log(`Telegram user bound to Hub: telegramUserId=${telegramUserId}, hubUrl=${url}, deviceId=${user.deviceId}`); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + this.logger.error(`Failed to bind Telegram user: telegramUserId=${telegramUserId}, error=${message}`); + await ctx.reply("❌ An error occurred. Please try again later."); + } + } + + /** Validate URL format */ + private isValidUrl(url: string): boolean { + try { + const parsed = new URL(url); + return parsed.protocol === "http:" || parsed.protocol === "https:"; + } catch { + return false; + } + } + + /** Validate Hub URL connectivity */ + private async validateHubUrl(url: string): Promise { + try { + // Try to connect to the Hub's health endpoint + const response = await fetch(`${url}/`, { + method: "GET", + signal: AbortSignal.timeout(5000), + }); + return response.ok; + } catch { + return false; + } + } + + /** Route message to user's Hub */ + private async routeToHub(user: TelegramUser, text: string, ctx: Context): Promise { + // Ensure virtual device is registered + if (!this.eventsGateway.isDeviceRegistered(user.deviceId)) { + this.eventsGateway.registerVirtualDevice(user.deviceId, { + sendCallback: (_event, data) => { + const payload = data as { text?: string }; + if (payload.text) { + this.sendToTelegram(user.deviceId, payload.text); + } + }, + }); + } + + // TODO: Route message to Hub via EventsGateway + // For now, just acknowledge receipt + this.logger.log(`Routing message to Hub: deviceId=${user.deviceId}, hubUrl=${user.hubUrl}`); + + // Placeholder: In full implementation, this would send to Hub + await ctx.reply(`📨 Message received. Routing to your Hub...`); + } +} diff --git a/src/gateway/telegram/types.ts b/src/gateway/telegram/types.ts new file mode 100644 index 00000000..315dd10d --- /dev/null +++ b/src/gateway/telegram/types.ts @@ -0,0 +1,27 @@ +/** + * Telegram user types for Gateway. + */ + +/** Telegram user record stored in database */ +export interface TelegramUser { + telegramUserId: string; + hubUrl: string; + deviceId: string; + createdAt: Date; + updatedAt: Date; + telegramUsername?: string | undefined; + telegramFirstName?: string | undefined; + telegramLastName?: string | undefined; +} + +/** Data required to create/update a Telegram user */ +export interface TelegramUserCreate { + telegramUserId: string; + hubUrl: string; + telegramUsername?: string | undefined; + telegramFirstName?: string | undefined; + telegramLastName?: string | undefined; +} + +// Re-export VirtualDeviceHandler from parent for convenience +export type { VirtualDeviceHandler } from "../types.js"; diff --git a/src/gateway/test-app.ts b/src/gateway/test-app.ts new file mode 100644 index 00000000..05cd0bc0 --- /dev/null +++ b/src/gateway/test-app.ts @@ -0,0 +1,29 @@ +import "reflect-metadata"; +import { NestFactory } from "@nestjs/core"; +import { Module } from "@nestjs/common"; +import { LoggerModule } from "nestjs-pino"; +import { DatabaseModule } from "./database/database.module.js"; +import { GatewayModule } from "./gateway.module.js"; +import { TelegramModule } from "./telegram/telegram.module.js"; + +@Module({ + imports: [ + LoggerModule.forRoot({ pinoHttp: { level: "debug" } }), + DatabaseModule, + GatewayModule, + TelegramModule, + ], +}) +class TestAppModule {} + +console.log("Creating test app..."); +NestFactory.create(TestAppModule, { abortOnError: true }) + .then(async (app) => { + console.log("Test app created!"); + await app.listen(3000); + console.log("Listening on 3000!"); + }) + .catch((err) => { + console.error("Error:", err); + process.exit(1); + }); diff --git a/src/gateway/types.ts b/src/gateway/types.ts new file mode 100644 index 00000000..aa0dc5fc --- /dev/null +++ b/src/gateway/types.ts @@ -0,0 +1,8 @@ +/** + * Gateway types. + */ + +/** Virtual device handler for non-socket devices (e.g., Telegram) */ +export interface VirtualDeviceHandler { + sendCallback: (event: string, data: unknown) => void | Promise; +} From dc60cb754d8c6328251110068f1041a6a7946193 Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 10 Feb 2026 16:04:35 +0800 Subject: [PATCH 2/9] feat(telegram): replace Hub URL binding with connection link verification Replace the two-step Hub URL binding flow with a multica://connect connection link flow that uses the same verify RPC handshake as the SDK/web clients. Changes: - types.ts: replace hubUrl with hubId + agentId fields - telegram-user.store.ts: update DB schema (hub_id, agent_id columns), accept explicit deviceId in upsert - telegram.service.ts: rewrite with parseConnectionCode validation, verify RPC via routeFromVirtualDevice, pending request map for RPC promise tracking, smart sendCallback for stream/message/RPC - package.json: add @multica/store workspace dependency Flow: user pastes multica://connect link -> parse & validate -> check Hub online -> register virtual device -> verify RPC to Hub -> Desktop approval -> save to DB -> route messages Co-Authored-By: Claude Opus 4.6 --- package.json | 1 + pnpm-lock.yaml | 3 + src/gateway/telegram/telegram-user.store.ts | 30 +- src/gateway/telegram/telegram.service.ts | 374 +++++++++++++------- src/gateway/telegram/types.ts | 7 +- 5 files changed, 279 insertions(+), 136 deletions(-) diff --git a/package.json b/package.json index 3c492741..fd91fcea 100644 --- a/package.json +++ b/package.json @@ -53,6 +53,7 @@ "@mariozechner/pi-coding-agent": "^0.52.9", "@mozilla/readability": "^0.6.0", "@multica/sdk": "workspace:*", + "@multica/store": "workspace:*", "@nestjs/common": "^11.1.12", "@nestjs/core": "^11.1.12", "@nestjs/platform-express": "^11.1.12", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 72d97303..5f0841c8 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -47,6 +47,9 @@ importers: '@multica/sdk': specifier: workspace:* version: link:packages/sdk + '@multica/store': + specifier: workspace:* + version: link:packages/store '@nestjs/common': specifier: ^11.1.12 version: 11.1.12(reflect-metadata@0.2.2)(rxjs@7.8.2) diff --git a/src/gateway/telegram/telegram-user.store.ts b/src/gateway/telegram/telegram-user.store.ts index 5bd839ec..1570bc8d 100644 --- a/src/gateway/telegram/telegram-user.store.ts +++ b/src/gateway/telegram/telegram-user.store.ts @@ -11,7 +11,8 @@ import type { TelegramUser, TelegramUserCreate } from "./types.js"; interface TelegramUserRow extends RowDataPacket { telegram_user_id: string; - hub_url: string; + hub_id: string; + agent_id: string; device_id: string; created_at: Date; updated_at: Date; @@ -43,7 +44,8 @@ export class TelegramUserStore implements OnModuleInit { const sql = ` CREATE TABLE IF NOT EXISTS telegram_users ( telegram_user_id VARCHAR(64) PRIMARY KEY, - hub_url VARCHAR(512) NOT NULL, + hub_id VARCHAR(64) NOT NULL, + agent_id VARCHAR(64) NOT NULL, device_id VARCHAR(64) NOT NULL UNIQUE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, @@ -93,16 +95,20 @@ export class TelegramUserStore implements OnModuleInit { const existing = await this.findByTelegramUserId(data.telegramUserId); if (existing) { - // Update existing user + // Update existing user — also update device_id if provided await this.db.execute( `UPDATE telegram_users SET - hub_url = ?, + hub_id = ?, + agent_id = ?, + device_id = ?, telegram_username = ?, telegram_first_name = ?, telegram_last_name = ? WHERE telegram_user_id = ?`, [ - data.hubUrl, + data.hubId, + data.agentId, + data.deviceId ?? existing.deviceId, data.telegramUsername ?? null, data.telegramFirstName ?? null, data.telegramLastName ?? null, @@ -114,17 +120,18 @@ export class TelegramUserStore implements OnModuleInit { return updated!; } - // Create new user with generated device ID - const deviceId = `tg-${uuidv7()}`; + // Create new user with provided or generated device ID + const deviceId = data.deviceId ?? `tg-${uuidv7()}`; await this.db.execute( `INSERT INTO telegram_users ( - telegram_user_id, hub_url, device_id, + telegram_user_id, hub_id, agent_id, device_id, telegram_username, telegram_first_name, telegram_last_name - ) VALUES (?, ?, ?, ?, ?, ?)`, + ) VALUES (?, ?, ?, ?, ?, ?, ?)`, [ data.telegramUserId, - data.hubUrl, + data.hubId, + data.agentId, deviceId, data.telegramUsername ?? null, data.telegramFirstName ?? null, @@ -140,7 +147,8 @@ export class TelegramUserStore implements OnModuleInit { private rowToUser(row: TelegramUserRow): TelegramUser { return { telegramUserId: row.telegram_user_id, - hubUrl: row.hub_url, + hubId: row.hub_id, + agentId: row.agent_id, deviceId: row.device_id, createdAt: row.created_at, updatedAt: row.updated_at, diff --git a/src/gateway/telegram/telegram.service.ts b/src/gateway/telegram/telegram.service.ts index 9cf0bcac..1076b2de 100644 --- a/src/gateway/telegram/telegram.service.ts +++ b/src/gateway/telegram/telegram.service.ts @@ -2,14 +2,30 @@ * Telegram service for Gateway. * * Handles Telegram bot interactions via webhook. - * - New users: prompts for Hub URL - * - Bound users: routes messages to their Hub + * - New users: prompts to paste a multica://connect link + * - Connection link: verifies with Hub via RPC, persists to DB + * - Bound users: routes messages to their Hub agent */ import { Inject, Injectable, Logger } from "@nestjs/common"; import type { OnModuleInit } from "@nestjs/common"; import { Bot, webhookCallback } from "grammy"; import type { Context } from "grammy"; +import { v7 as uuidv7 } from "uuid"; +import { parseConnectionCode } from "@multica/store/connection"; +import type { ConnectionInfo } from "@multica/store/connection"; +import { + GatewayEvents, + RequestAction, + ResponseAction, + StreamAction, + type RoutedMessage, + type RequestPayload, + type ResponsePayload, + type VerifyParams, + type VerifyResult, +} from "@multica/sdk"; +import type { StreamPayload } from "@multica/sdk"; import { EventsGateway } from "../events.gateway.js"; import { TelegramUserStore } from "./telegram-user.store.js"; import type { TelegramUser } from "./types.js"; @@ -26,18 +42,18 @@ interface ExpressResponse { headersSent: boolean; } -// Users in the process of binding Hub URL -interface PendingBinding { - awaitingUrl: boolean; - telegramUsername?: string | undefined; - telegramFirstName?: string | undefined; - telegramLastName?: string | undefined; +interface PendingRequest { + resolve: (value: T) => void; + reject: (reason: Error) => void; + timer: ReturnType; } +const VERIFY_TIMEOUT_MS = 30_000; + @Injectable() export class TelegramService implements OnModuleInit { private bot: Bot | null = null; - private pendingBindings = new Map(); + private pendingRequests = new Map(); private readonly logger = new Logger(TelegramService.name); @@ -121,153 +137,265 @@ export class TelegramService implements OnModuleInit { this.logger.debug(`Received Telegram message: telegramUserId=${telegramUserId}, text=${text.slice(0, 50)}`); + // Connection link — always handle, even for already-bound users (re-binding) + if (text.startsWith("multica://connect?")) { + await this.handleConnectionLink(ctx, telegramUserId, text); + return; + } + // Check if user is bound const user = await this.userStore.findByTelegramUserId(telegramUserId); if (user) { - // User is bound, route message to Hub await this.routeToHub(user, text, ctx); return; } - // Check if user is in binding process - const pending = this.pendingBindings.get(telegramUserId); - - if (pending?.awaitingUrl) { - // User is providing Hub URL - await this.handleHubUrlInput(ctx, telegramUserId, text, pending); - return; - } - - // New user, start binding process - await this.startBindingProcess(ctx, telegramUserId); - } - - /** Start the Hub binding process for a new user */ - private async startBindingProcess(ctx: Context, telegramUserId: string): Promise { - const msg = ctx.message; - - this.pendingBindings.set(telegramUserId, { - awaitingUrl: true, - telegramUsername: msg?.from?.username, - telegramFirstName: msg?.from?.first_name, - telegramLastName: msg?.from?.last_name, - }); - + // New user without connection link await ctx.reply( - "👋 Welcome to Multica!\n\n" + - "Please enter your Hub URL to get started.\n\n" + - "Example: https://your-hub.example.com" + "Welcome to Multica!\n\n" + + "To get started, open the Multica Desktop app, generate a Connection Link, " + + "and paste it here.\n\n" + + "The link looks like:\nmultica://connect?gateway=...&hub=...&agent=...&token=...&exp=..." ); } - /** Handle Hub URL input from user */ - private async handleHubUrlInput( - ctx: Context, - telegramUserId: string, - url: string, - pending: PendingBinding - ): Promise { - // Validate URL format - if (!this.isValidUrl(url)) { - await ctx.reply( - "❌ Invalid URL format.\n\n" + - "Please enter a valid Hub URL.\n" + - "Example: https://your-hub.example.com" - ); - return; - } + /** Handle a multica://connect? connection link */ + private async handleConnectionLink(ctx: Context, telegramUserId: string, text: string): Promise { + const msg = ctx.message; - // Validate Hub connectivity - const isValid = await this.validateHubUrl(url); - if (!isValid) { - await ctx.reply( - "❌ Cannot connect to this Hub.\n\n" + - "Please check the URL and make sure your Hub is online.\n" + - "Then try again with the correct URL." - ); - return; - } - - // Create user record + // 1. Parse and validate the connection link + let connectionInfo: ConnectionInfo; try { - const user = await this.userStore.upsert({ + connectionInfo = parseConnectionCode(text); + } catch (error) { + const message = error instanceof Error ? error.message : "Invalid connection link"; + await ctx.reply(`Connection failed: ${message}\n\nPlease generate a new link and try again.`); + return; + } + + // 2. Check Hub is online + if (!this.eventsGateway.isDeviceRegistered(connectionInfo.hubId)) { + await ctx.reply( + "Connection failed: Hub is not online.\n\n" + + "Make sure the Multica Desktop app is running and connected to the Gateway, then try again." + ); + return; + } + + // 3. Unregister old virtual device if user is re-binding + const existingUser = await this.userStore.findByTelegramUserId(telegramUserId); + if (existingUser && this.eventsGateway.isDeviceRegistered(existingUser.deviceId)) { + this.eventsGateway.unregisterVirtualDevice(existingUser.deviceId); + } + + // 4. Generate device ID and register virtual device + const deviceId = `tg-${uuidv7()}`; + this.registerVirtualDeviceForUser(deviceId, telegramUserId); + + // 5. Send verify RPC + try { + await ctx.reply("Connecting... Please approve the connection on your Desktop app."); + + const result = await this.sendVerifyRpc( + deviceId, + connectionInfo.hubId, + connectionInfo.token, + { + platform: "telegram", + telegramUserId, + telegramUsername: msg?.from?.username, + telegramFirstName: msg?.from?.first_name, + } + ); + + // 6. Save to DB + await this.userStore.upsert({ telegramUserId, - hubUrl: url, - telegramUsername: pending.telegramUsername, - telegramFirstName: pending.telegramFirstName, - telegramLastName: pending.telegramLastName, + hubId: connectionInfo.hubId, + agentId: connectionInfo.agentId, + deviceId, + telegramUsername: msg?.from?.username, + telegramFirstName: msg?.from?.first_name, + telegramLastName: msg?.from?.last_name, }); - // Register as virtual device - this.eventsGateway.registerVirtualDevice(user.deviceId, { - sendCallback: (_event, data) => { - const payload = data as { text?: string }; - if (payload.text) { - this.sendToTelegram(user.deviceId, payload.text); - } - }, - }); - - this.pendingBindings.delete(telegramUserId); - await ctx.reply( - "✅ Hub connected successfully!\n\n" + - `Your Device ID: ${user.deviceId}\n\n` + + "Connected successfully!\n\n" + + `Hub: ${result.hubId}\n` + + `Agent: ${result.agentId}\n\n` + "You can now send messages to interact with your agent." ); - this.logger.log(`Telegram user bound to Hub: telegramUserId=${telegramUserId}, hubUrl=${url}, deviceId=${user.deviceId}`); + this.logger.log(`Telegram user verified: telegramUserId=${telegramUserId}, hubId=${connectionInfo.hubId}, deviceId=${deviceId}`); } catch (error) { + // Cleanup virtual device on failure + this.eventsGateway.unregisterVirtualDevice(deviceId); + // Reject all pending requests for this device + this.cleanupPendingRequests(); + const message = error instanceof Error ? error.message : String(error); - this.logger.error(`Failed to bind Telegram user: telegramUserId=${telegramUserId}, error=${message}`); - await ctx.reply("❌ An error occurred. Please try again later."); + if (message.includes("REJECTED")) { + await ctx.reply("Connection rejected.\n\nThe connection was declined on the Desktop app."); + } else if (message.includes("timed out")) { + await ctx.reply("Connection timed out.\n\nPlease try again and approve the connection on your Desktop app within 30 seconds."); + } else { + await ctx.reply(`Connection failed: ${message}\n\nPlease try again.`); + } + + this.logger.warn(`Telegram verify failed: telegramUserId=${telegramUserId}, error=${message}`); } } - /** Validate URL format */ - private isValidUrl(url: string): boolean { - try { - const parsed = new URL(url); - return parsed.protocol === "http:" || parsed.protocol === "https:"; - } catch { - return false; - } - } + /** Send a verify RPC to Hub via the virtual device */ + private sendVerifyRpc( + deviceId: string, + hubId: string, + token: string, + meta: Record, + ): Promise { + return new Promise((resolve, reject) => { + const requestId = uuidv7(); - /** Validate Hub URL connectivity */ - private async validateHubUrl(url: string): Promise { - try { - // Try to connect to the Hub's health endpoint - const response = await fetch(`${url}/`, { - method: "GET", - signal: AbortSignal.timeout(5000), + const timer = setTimeout(() => { + this.pendingRequests.delete(requestId); + reject(new Error("Verify request timed out")); + }, VERIFY_TIMEOUT_MS); + + this.pendingRequests.set(requestId, { + resolve: resolve as (v: unknown) => void, + reject, + timer, }); - return response.ok; - } catch { - return false; - } + + const payload: RequestPayload = { + requestId, + method: "verify", + params: { token, meta }, + }; + + const message: RoutedMessage> = { + id: uuidv7(), + uid: null, + from: deviceId, + to: hubId, + action: RequestAction, + payload, + }; + + const sent = this.eventsGateway.routeFromVirtualDevice(message); + if (!sent) { + this.pendingRequests.delete(requestId); + clearTimeout(timer); + reject(new Error("Failed to route verify request to Hub")); + } + }); } - /** Route message to user's Hub */ + /** Route a regular chat message to the user's Hub agent */ private async routeToHub(user: TelegramUser, text: string, ctx: Context): Promise { - // Ensure virtual device is registered - if (!this.eventsGateway.isDeviceRegistered(user.deviceId)) { - this.eventsGateway.registerVirtualDevice(user.deviceId, { - sendCallback: (_event, data) => { - const payload = data as { text?: string }; - if (payload.text) { - this.sendToTelegram(user.deviceId, payload.text); - } - }, - }); + // Ensure Hub is online + if (!this.eventsGateway.isDeviceRegistered(user.hubId)) { + await ctx.reply( + "Your Hub is currently offline.\n\n" + + "Make sure the Multica Desktop app is running and connected to the Gateway." + ); + return; } - // TODO: Route message to Hub via EventsGateway - // For now, just acknowledge receipt - this.logger.log(`Routing message to Hub: deviceId=${user.deviceId}, hubUrl=${user.hubUrl}`); + // Ensure virtual device is registered (may have been lost on gateway restart) + if (!this.eventsGateway.isDeviceRegistered(user.deviceId)) { + this.registerVirtualDeviceForUser(user.deviceId, user.telegramUserId); + } - // Placeholder: In full implementation, this would send to Hub - await ctx.reply(`📨 Message received. Routing to your Hub...`); + // Send message to Hub + const message: RoutedMessage = { + id: uuidv7(), + uid: null, + from: user.deviceId, + to: user.hubId, + action: "message", + payload: { agentId: user.agentId, content: text }, + }; + + const sent = this.eventsGateway.routeFromVirtualDevice(message); + if (!sent) { + await ctx.reply("Failed to send message. Please try again."); + return; + } + + this.logger.debug(`Routed message to Hub: deviceId=${user.deviceId}, hubId=${user.hubId}, agentId=${user.agentId}`); + } + + /** Register a virtual device with a sendCallback that handles RPC responses, stream events, and messages */ + private registerVirtualDeviceForUser(deviceId: string, telegramUserId: string): void { + this.eventsGateway.registerVirtualDevice(deviceId, { + sendCallback: (_event: string, data: unknown) => { + const msg = data as RoutedMessage; + if (!msg || !msg.action) return; + + // RPC response — resolve/reject pending request + if (msg.action === ResponseAction) { + const response = msg.payload as ResponsePayload; + const pending = this.pendingRequests.get(response.requestId); + if (pending) { + this.pendingRequests.delete(response.requestId); + clearTimeout(pending.timer); + if (response.ok) { + pending.resolve(response.payload); + } else { + pending.reject(new Error(`RPC error [${response.error.code}]: ${response.error.message}`)); + } + } + return; + } + + // Stream event — extract text content for Telegram + if (msg.action === StreamAction) { + const streamPayload = msg.payload as StreamPayload; + const event = streamPayload?.event; + if (event && "type" in event && event.type === "message_end") { + // Extract final text from the message + const agentMsg = (event as { message?: { content?: Array<{ type: string; text?: string }> } }).message; + if (agentMsg?.content) { + const textContent = agentMsg.content + .filter((c) => c.type === "text" && c.text) + .map((c) => c.text!) + .join(""); + if (textContent) { + this.sendToTelegram(deviceId, textContent); + } + } + } + return; + } + + // Regular message (e.g., "message" action from Hub) + if (msg.action === "message") { + const payload = msg.payload as { content?: string; agentId?: string }; + if (payload?.content) { + this.sendToTelegram(deviceId, payload.content); + } + return; + } + + // Error messages + if (msg.action === "error") { + const payload = msg.payload as { message?: string; code?: string }; + if (payload?.message) { + this.sendToTelegram(deviceId, `Error: ${payload.message}`); + } + } + }, + }); + } + + /** Cleanup all pending requests (used on verify failure) */ + private cleanupPendingRequests(): void { + for (const [id, pending] of this.pendingRequests) { + clearTimeout(pending.timer); + pending.reject(new Error("Cleaned up")); + this.pendingRequests.delete(id); + } } } diff --git a/src/gateway/telegram/types.ts b/src/gateway/telegram/types.ts index 315dd10d..c4bd87fc 100644 --- a/src/gateway/telegram/types.ts +++ b/src/gateway/telegram/types.ts @@ -5,7 +5,8 @@ /** Telegram user record stored in database */ export interface TelegramUser { telegramUserId: string; - hubUrl: string; + hubId: string; + agentId: string; deviceId: string; createdAt: Date; updatedAt: Date; @@ -17,7 +18,9 @@ export interface TelegramUser { /** Data required to create/update a Telegram user */ export interface TelegramUserCreate { telegramUserId: string; - hubUrl: string; + hubId: string; + agentId: string; + deviceId?: string; telegramUsername?: string | undefined; telegramFirstName?: string | undefined; telegramLastName?: string | undefined; From 483f4a2f59ebf72e8efbb726c35de800380addce Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 10 Feb 2026 16:04:47 +0800 Subject: [PATCH 3/9] fix(gateway): graceful degradation when MySQL is unavailable Instead of throwing and crashing the gateway when MySQL connection fails, close the broken pool and continue with pool=null so the isAvailable() guard handles it gracefully. Co-Authored-By: Claude Opus 4.6 --- src/gateway/database/database.service.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/gateway/database/database.service.ts b/src/gateway/database/database.service.ts index 27a675f6..1360aa11 100644 --- a/src/gateway/database/database.service.ts +++ b/src/gateway/database/database.service.ts @@ -40,7 +40,11 @@ export class DatabaseService implements OnModuleInit, OnModuleDestroy { const message = error instanceof Error ? error.message : String(error); console.log("[DatabaseService] Error:", message); this.logger.error(`Failed to connect to MySQL: ${message}`); - throw error; + // Graceful degradation: close broken pool, keep this.pool = null + if (this.pool) { + await this.pool.end().catch(() => {}); + this.pool = null; + } } } From 6284ffd74a7c4dc68e8d897a8b4f62255574aed9 Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 10 Feb 2026 16:06:58 +0800 Subject: [PATCH 4/9] fix(telegram): drop and recreate telegram_users table for schema change The old table had hub_url column; new schema uses hub_id + agent_id. CREATE TABLE IF NOT EXISTS won't alter existing tables, so drop first. Co-Authored-By: Claude Opus 4.6 --- src/gateway/telegram/telegram-user.store.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/gateway/telegram/telegram-user.store.ts b/src/gateway/telegram/telegram-user.store.ts index 1570bc8d..d7f64cf9 100644 --- a/src/gateway/telegram/telegram-user.store.ts +++ b/src/gateway/telegram/telegram-user.store.ts @@ -39,10 +39,11 @@ export class TelegramUserStore implements OnModuleInit { console.log("[TelegramUserStore] Done"); } - /** Create telegram_users table if not exists */ + /** Drop and recreate telegram_users table (schema changed: hub_url -> hub_id + agent_id) */ private async ensureTable(): Promise { + await this.db.execute("DROP TABLE IF EXISTS telegram_users"); const sql = ` - CREATE TABLE IF NOT EXISTS telegram_users ( + CREATE TABLE telegram_users ( telegram_user_id VARCHAR(64) PRIMARY KEY, hub_id VARCHAR(64) NOT NULL, agent_id VARCHAR(64) NOT NULL, From 1547da3279c8df66edf8a9eb7ee48ee44d0e10dc Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 10 Feb 2026 16:08:52 +0800 Subject: [PATCH 5/9] refactor(telegram): remove auto table creation, add SQL script for ops Move table DDL to scripts/telegram-users.sql for manual execution by ops. Remove OnModuleInit and ensureTable() from TelegramUserStore. Co-Authored-By: Claude Opus 4.6 --- scripts/telegram-users.sql | 17 ++++++++++ src/gateway/telegram/telegram-user.store.ts | 36 +-------------------- 2 files changed, 18 insertions(+), 35 deletions(-) create mode 100644 scripts/telegram-users.sql diff --git a/scripts/telegram-users.sql b/scripts/telegram-users.sql new file mode 100644 index 00000000..19f1a720 --- /dev/null +++ b/scripts/telegram-users.sql @@ -0,0 +1,17 @@ +-- Telegram users table for Gateway +-- Run this manually before starting the Gateway with Telegram enabled. + +DROP TABLE IF EXISTS telegram_users; + +CREATE TABLE telegram_users ( + telegram_user_id VARCHAR(64) PRIMARY KEY, + hub_id VARCHAR(64) NOT NULL, + agent_id VARCHAR(64) NOT NULL, + device_id VARCHAR(64) NOT NULL UNIQUE, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + telegram_username VARCHAR(255), + telegram_first_name VARCHAR(255), + telegram_last_name VARCHAR(255), + INDEX idx_device_id (device_id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/src/gateway/telegram/telegram-user.store.ts b/src/gateway/telegram/telegram-user.store.ts index d7f64cf9..ba2e807f 100644 --- a/src/gateway/telegram/telegram-user.store.ts +++ b/src/gateway/telegram/telegram-user.store.ts @@ -3,7 +3,6 @@ */ import { Inject, Injectable, Logger } from "@nestjs/common"; -import type { OnModuleInit } from "@nestjs/common"; import { v7 as uuidv7 } from "uuid"; import type { RowDataPacket } from "mysql2/promise"; import { DatabaseService } from "../database/database.service.js"; @@ -22,44 +21,11 @@ interface TelegramUserRow extends RowDataPacket { } @Injectable() -export class TelegramUserStore implements OnModuleInit { +export class TelegramUserStore { private readonly logger = new Logger(TelegramUserStore.name); constructor(@Inject(DatabaseService) private readonly db: DatabaseService) {} - async onModuleInit(): Promise { - console.log("[TelegramUserStore] onModuleInit starting..."); - if (!this.db.isAvailable()) { - console.log("[TelegramUserStore] Database not available"); - this.logger.warn("Database not available, TelegramUserStore disabled"); - return; - } - console.log("[TelegramUserStore] Ensuring table..."); - await this.ensureTable(); - console.log("[TelegramUserStore] Done"); - } - - /** Drop and recreate telegram_users table (schema changed: hub_url -> hub_id + agent_id) */ - private async ensureTable(): Promise { - await this.db.execute("DROP TABLE IF EXISTS telegram_users"); - const sql = ` - CREATE TABLE telegram_users ( - telegram_user_id VARCHAR(64) PRIMARY KEY, - hub_id VARCHAR(64) NOT NULL, - agent_id VARCHAR(64) NOT NULL, - device_id VARCHAR(64) NOT NULL UNIQUE, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - telegram_username VARCHAR(255), - telegram_first_name VARCHAR(255), - telegram_last_name VARCHAR(255), - INDEX idx_device_id (device_id) - ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 - `; - await this.db.execute(sql); - this.logger.log("telegram_users table ensured"); - } - /** Find user by Telegram user ID */ async findByTelegramUserId(telegramUserId: string): Promise { if (!this.db.isAvailable()) return null; From bc78dcdca97118e1137f9f3fd27c0d815a827604 Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 10 Feb 2026 16:10:29 +0800 Subject: [PATCH 6/9] chore: move SQL schema to migrations/ directory Co-Authored-By: Claude Opus 4.6 --- {scripts => migrations}/telegram-users.sql | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename {scripts => migrations}/telegram-users.sql (100%) diff --git a/scripts/telegram-users.sql b/migrations/telegram-users.sql similarity index 100% rename from scripts/telegram-users.sql rename to migrations/telegram-users.sql From c9836d60d42b7d93b477b94a375f0a6ca021da26 Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 10 Feb 2026 16:11:47 +0800 Subject: [PATCH 7/9] chore: move SQL schema to src/gateway/migrations/ Co-Authored-By: Claude Opus 4.6 --- {migrations => src/gateway/migrations}/telegram-users.sql | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename {migrations => src/gateway/migrations}/telegram-users.sql (100%) diff --git a/migrations/telegram-users.sql b/src/gateway/migrations/telegram-users.sql similarity index 100% rename from migrations/telegram-users.sql rename to src/gateway/migrations/telegram-users.sql From a94b6e8b1c640548784001c38194840d299655d3 Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 10 Feb 2026 16:19:08 +0800 Subject: [PATCH 8/9] fix(telegram): use DeviceMeta interface for verify RPC meta parameter Pack Telegram user info (userId, username, firstName) into the userAgent field instead of passing custom fields that don't conform to the SDK's DeviceMeta interface. Co-Authored-By: Claude Opus 4.6 --- src/gateway/telegram/telegram.service.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/gateway/telegram/telegram.service.ts b/src/gateway/telegram/telegram.service.ts index 1076b2de..6d4372ea 100644 --- a/src/gateway/telegram/telegram.service.ts +++ b/src/gateway/telegram/telegram.service.ts @@ -24,6 +24,7 @@ import { type ResponsePayload, type VerifyParams, type VerifyResult, + type DeviceMeta, } from "@multica/sdk"; import type { StreamPayload } from "@multica/sdk"; import { EventsGateway } from "../events.gateway.js"; @@ -203,9 +204,7 @@ export class TelegramService implements OnModuleInit { connectionInfo.token, { platform: "telegram", - telegramUserId, - telegramUsername: msg?.from?.username, - telegramFirstName: msg?.from?.first_name, + userAgent: `Telegram ${telegramUserId} @${msg?.from?.username ?? ""} ${msg?.from?.first_name ?? ""}`.trim(), } ); @@ -252,7 +251,7 @@ export class TelegramService implements OnModuleInit { deviceId: string, hubId: string, token: string, - meta: Record, + meta: DeviceMeta, ): Promise { return new Promise((resolve, reject) => { const requestId = uuidv7(); From d367e77c0a18641829b3d9d1dfd5fc1fd7671ed7 Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 10 Feb 2026 16:29:17 +0800 Subject: [PATCH 9/9] feat(device): add clientName to DeviceMeta for multi-client display Add clientName field to DeviceMeta so non-browser clients (Telegram, Discord, etc.) can provide a human-readable label instead of relying on userAgent parsing. Desktop UI now prioritizes clientName over parsed UA string, fixing "Unknown on Unknown" display for Telegram connections. Co-Authored-By: Claude Opus 4.6 --- apps/desktop/src/components/device-confirm-dialog.tsx | 9 ++++++--- apps/desktop/src/components/device-list.tsx | 8 +++++--- apps/desktop/src/hooks/use-devices.ts | 1 + packages/sdk/src/actions/rpc.ts | 1 + src/gateway/telegram/telegram.service.ts | 4 +++- src/hub/device-store.ts | 1 + 6 files changed, 17 insertions(+), 7 deletions(-) diff --git a/apps/desktop/src/components/device-confirm-dialog.tsx b/apps/desktop/src/components/device-confirm-dialog.tsx index 81420891..4b49aaad 100644 --- a/apps/desktop/src/components/device-confirm-dialog.tsx +++ b/apps/desktop/src/components/device-confirm-dialog.tsx @@ -15,6 +15,7 @@ interface DeviceMeta { userAgent?: string platform?: string language?: string + clientName?: string } interface PendingConfirm { @@ -55,9 +56,11 @@ export function DeviceConfirmDialog() { ? parseUserAgent(pending.meta.userAgent) : null - const deviceLabel = parsed - ? `${parsed.browser} on ${parsed.os}` - : pending?.deviceId + const deviceLabel = pending?.meta?.clientName + ? pending.meta.clientName + : parsed + ? `${parsed.browser} on ${parsed.os}` + : pending?.deviceId return ( diff --git a/apps/desktop/src/components/device-list.tsx b/apps/desktop/src/components/device-list.tsx index 5e5bc897..76183e16 100644 --- a/apps/desktop/src/components/device-list.tsx +++ b/apps/desktop/src/components/device-list.tsx @@ -44,9 +44,11 @@ function DeviceItem({ ? parseUserAgent(device.meta.userAgent) : null - const displayName = parsed - ? `${parsed.browser} on ${parsed.os}` - : device.deviceId + const displayName = device.meta?.clientName + ? device.meta.clientName + : parsed + ? `${parsed.browser} on ${parsed.os}` + : device.deviceId const handleRevoke = async () => { setRevoking(true) diff --git a/apps/desktop/src/hooks/use-devices.ts b/apps/desktop/src/hooks/use-devices.ts index 3a7c2936..1e93a72b 100644 --- a/apps/desktop/src/hooks/use-devices.ts +++ b/apps/desktop/src/hooks/use-devices.ts @@ -4,6 +4,7 @@ export interface DeviceMeta { userAgent?: string platform?: string language?: string + clientName?: string } export interface DeviceEntry { diff --git a/packages/sdk/src/actions/rpc.ts b/packages/sdk/src/actions/rpc.ts index ddbc78f7..5fd587d3 100644 --- a/packages/sdk/src/actions/rpc.ts +++ b/packages/sdk/src/actions/rpc.ts @@ -133,6 +133,7 @@ export interface DeviceMeta { userAgent?: string; platform?: string; language?: string; + clientName?: string; } /** verify - request params */ diff --git a/src/gateway/telegram/telegram.service.ts b/src/gateway/telegram/telegram.service.ts index 6d4372ea..66e4742b 100644 --- a/src/gateway/telegram/telegram.service.ts +++ b/src/gateway/telegram/telegram.service.ts @@ -204,7 +204,9 @@ export class TelegramService implements OnModuleInit { connectionInfo.token, { platform: "telegram", - userAgent: `Telegram ${telegramUserId} @${msg?.from?.username ?? ""} ${msg?.from?.first_name ?? ""}`.trim(), + clientName: msg?.from?.username + ? `Telegram @${msg.from.username}` + : `Telegram ${msg?.from?.first_name ?? telegramUserId}`, } ); diff --git a/src/hub/device-store.ts b/src/hub/device-store.ts index 97f54aff..0b1d31b3 100644 --- a/src/hub/device-store.ts +++ b/src/hub/device-store.ts @@ -14,6 +14,7 @@ export interface DeviceMeta { userAgent?: string; platform?: string; language?: string; + clientName?: string; } export interface DeviceEntry {