From 604d74d98459a38a9da394a18c8653f2596a848d Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 10 Feb 2026 16:04:10 +0800 Subject: [PATCH] fix(gateway): route messages to virtual devices in handleSend() handleSend() only checked socket-based devices (deviceToSocket map), causing DEVICE_NOT_FOUND errors when Hub sends responses to virtual devices like Telegram. Now checks virtualDevices map as fallback. Also adds routeFromVirtualDevice() to allow virtual devices to initiate messages (e.g., verify RPC, chat messages) through the Gateway routing infrastructure. Co-Authored-By: Claude Opus 4.6 --- package.json | 1 + pnpm-lock.yaml | 68 +++++ src/gateway/app.module.ts | 8 +- src/gateway/database/database.module.ts | 15 ++ src/gateway/database/database.service.ts | 93 +++++++ src/gateway/events.gateway.ts | 134 ++++++---- src/gateway/gateway.module.ts | 3 +- src/gateway/main.ts | 25 +- src/gateway/telegram/telegram-user.store.ts | 152 +++++++++++ src/gateway/telegram/telegram.controller.ts | 67 +++++ src/gateway/telegram/telegram.module.ts | 17 ++ src/gateway/telegram/telegram.service.ts | 273 ++++++++++++++++++++ src/gateway/telegram/types.ts | 27 ++ src/gateway/test-app.ts | 29 +++ src/gateway/types.ts | 8 + 15 files changed, 862 insertions(+), 58 deletions(-) create mode 100644 src/gateway/database/database.module.ts create mode 100644 src/gateway/database/database.service.ts create mode 100644 src/gateway/telegram/telegram-user.store.ts create mode 100644 src/gateway/telegram/telegram.controller.ts create mode 100644 src/gateway/telegram/telegram.module.ts create mode 100644 src/gateway/telegram/telegram.service.ts create mode 100644 src/gateway/telegram/types.ts create mode 100644 src/gateway/test-app.ts create mode 100644 src/gateway/types.ts diff --git a/package.json b/package.json index a6da1ff5..3c492741 100644 --- a/package.json +++ b/package.json @@ -63,6 +63,7 @@ "croner": "^10.0.1", "fast-glob": "^3.3.3", "grammy": "^1.39.3", + "mysql2": "^3.14.1", "json5": "^2.2.3", "linkedom": "^0.18.12", "nestjs-pino": "^4.5.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 84a3a2d7..72d97303 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -83,6 +83,9 @@ importers: linkedom: specifier: ^0.18.12 version: 0.18.12 + mysql2: + specifier: ^3.14.1 + version: 3.16.3 nestjs-pino: specifier: ^4.5.0 version: 4.5.0(@nestjs/common@11.1.12(reflect-metadata@0.2.2)(rxjs@7.8.2))(pino-http@11.0.0)(pino@10.3.0)(rxjs@7.8.2) @@ -4349,6 +4352,10 @@ packages: resolution: {integrity: sha512-wvUjBtSGN7+7SjNpq/9M2Tg350UZD3q62IFZLbRAR1bSMlCo1ZaeW+BJ+D090e4hIIZLBcTDWe4Mh4jvUDajzQ==} engines: {node: '>= 0.4'} + aws-ssl-profiles@1.1.2: + resolution: {integrity: sha512-NZKeq9AfyQvEeNlN0zSYAaWrmBffJh3IELMZfRpJVWgrpEbtEpnjvzqBPf+mxoI287JohRDoa+/nsfqqiZmF6g==} + engines: {node: '>= 6.0.0'} + axe-core@4.11.1: resolution: {integrity: sha512-BASOg+YwO2C+346x3LZOeoovTIoTrRqEsqMa6fmfAV0P+U9mFr9NsyOEpiYvFjbc64NMrSswhV50WdXzdb/Z5A==} engines: {node: '>=4'} @@ -5023,6 +5030,10 @@ packages: resolution: {integrity: sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==} engines: {node: '>=0.4.0'} + denque@2.1.0: + resolution: {integrity: sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==} + engines: {node: '>=0.10'} + depd@2.0.0: resolution: {integrity: sha512-g7nH6P6dyDioJogAAGprGpCtVImJhpPk/roCzdb3fIh61/s/nPsfR6onyMwkCAR/OlC3yBC0lESvUoQEAssIrw==} engines: {node: '>= 0.8'} @@ -5890,6 +5901,9 @@ packages: resolution: {integrity: sha512-zV/5HKTfCeKWnxG0Dmrw51hEWFGfcF2xiXqcA3+J90WDuP0SvoiSO5ORvcBsifmx/FoIjgQN3oNOGaQ5PhLFkg==} engines: {node: '>=18'} + generate-function@2.3.1: + resolution: {integrity: sha512-eeB5GfMNeevm/GRYq20ShmsaGcmI81kIX2K9XQx5miC8KdHaC6Jm0qQ8ZNeGOi7wYB8OsdxKs+Y2oVuTFuVwKQ==} + generator-function@2.0.1: resolution: {integrity: sha512-SFdFmIJi+ybC0vjlHN0ZGVGHc3lgE0DxPAT0djjVg+kjOnSqclqmj0KQ7ykTOLP6YxoqOvuAODGdcHJn+43q3g==} engines: {node: '>= 0.4'} @@ -6400,6 +6414,9 @@ packages: is-promise@4.0.0: resolution: {integrity: sha512-hvpoI6korhJMnej285dSg6nu1+e6uxs7zG3BYAm5byqDsgJNWwxzM6z6iZiAgQR4TJ30JmBTOwqZUw3WlyH3AQ==} + is-property@1.0.2: + resolution: {integrity: sha512-Ks/IoX00TtClbGQr4TWXemAnktAQvYB7HzcCxDGqEZU6oCmb2INHuOoKxbtR+HFkmYWBKv/dOZtGRiAjDhj92g==} + is-regex@1.2.1: resolution: {integrity: sha512-MjYsKHO5O7mCsmRGxWcLWheFqN9DJ/2TmngvjKXihe6efViPqc274+Fx/4fYj/r03+ESvBdTXK0V6tA3rgez1g==} engines: {node: '>= 0.4'} @@ -6929,6 +6946,10 @@ packages: resolution: {integrity: sha512-jumlc0BIUrS3qJGgIkWZsyfAM7NCWiBcCDhnd+3NNM5KbBmLTgHVfWBcg6W+rLUsIpzpERPsvwUP7CckAQSOoA==} engines: {node: '>=12'} + lru.min@1.1.4: + resolution: {integrity: sha512-DqC6n3QQ77zdFpCMASA1a3Jlb64Hv2N2DciFGkO/4L9+q/IpIAuRlKOvCXabtRW6cQf8usbmM6BE/TOPysCdIA==} + engines: {bun: '>=1.0.0', deno: '>=1.30.0', node: '>=8.0.0'} + lucide-react-native@0.563.0: resolution: {integrity: sha512-q4tYoAMorTqv+UXRYc0MyiEAOF+4Bu73zxD63EDrnGCFL+xuj+imBm3E2rIKRmME0heVHlK+98fsi8wbL92LNQ==} peerDependencies: @@ -7315,9 +7336,17 @@ packages: resolution: {integrity: sha512-WWdIxpyjEn+FhQJQQv9aQAYlHoNVdzIzUySNV1gHUPDSdZJ3yZn7pAAbQcV7B56Mvu881q9FZV+0Vx2xC44VWA==} engines: {node: ^18.17.0 || >=20.5.0} + mysql2@3.16.3: + resolution: {integrity: sha512-+3XhQEt4FEFuvGV0JjIDj4eP2OT/oIj/54dYvqhblnSzlfcxVOuj+cd15Xz6hsG4HU1a+A5+BA9gm0618C4z7A==} + engines: {node: '>= 8.0'} + mz@2.7.0: resolution: {integrity: sha512-z81GNO7nnYMEhrGh9LeymoE4+Yr0Wn5McHIZMK5cfQCl+NDX08sCZgUc9/6MHni9IWuFLm1Z3HTCXu2z9fN62Q==} + named-placeholders@1.1.6: + resolution: {integrity: sha512-Tz09sEL2EEuv5fFowm419c1+a/jSMiBjI9gHxVLrVdbUkkNUUfjsVYs9pVZu5oCon/kmRh9TfLEObFtkVxmY0w==} + engines: {node: '>=8.0.0'} + nanoid@3.3.11: resolution: {integrity: sha512-N8SpfPUnUp1bK+PMYW8qSWdl9U+wwNWI4QKxOYDy9JAro3WMX7p2OeVRF9v+347pnakNevPmiHhNmZ2HbFA76w==} engines: {node: ^10 || ^12 || ^13.7 || ^14 || >=15.0.1} @@ -8439,6 +8468,9 @@ packages: resolution: {integrity: sha512-1gnZf7DFcoIcajTjTwjwuDjzuz4PPcY2StKPlsGAQ1+YH20IRVrBaXSWmdjowTJ6u8Rc01PoYOGHXfP1mYcZNQ==} engines: {node: '>= 18'} + seq-queue@0.0.5: + resolution: {integrity: sha512-hr3Wtp/GZIc/6DAGPDcV4/9WoZhjrkXsi5B/07QgX8tsdc6ilr7BFM6PM6rbdAX1kFSDYeZGLipIZZKyQP0O5Q==} + serialize-error@2.1.0: resolution: {integrity: sha512-ghgmKt5o4Tly5yEG/UJp8qTd0AN7Xalw4XBtDEKP655B699qMEtra1WlXeE6WIvdEG481JvRxULKsInq/iNysw==} engines: {node: '>=0.10.0'} @@ -8628,6 +8660,10 @@ packages: sprintf-js@1.1.3: resolution: {integrity: sha512-Oo+0REFV59/rz3gfJNKQiBlwfHaSESl1pcGyABQsnnIfWOFt6JNj5gCog2U6MLZ//IGYD+nA8nI+mTShREReaA==} + sqlstring@2.3.3: + resolution: {integrity: sha512-qC9iz2FlN7DQl3+wjwn3802RTyjCx7sDvfQEXchwa6CWOx07/WVfh91gBmQ9fahw8snwGEWU3xGzOt4tFyHLxg==} + engines: {node: '>= 0.6'} + stable-hash@0.0.5: resolution: {integrity: sha512-+L3ccpzibovGXFK+Ap/f8LOS0ahMrHTf3xu7mMLSpEGU0EO9ucaysSylKo9eRDFNhWve/y275iPmIZ4z39a9iA==} @@ -14567,6 +14603,8 @@ snapshots: dependencies: possible-typed-array-names: 1.1.0 + aws-ssl-profiles@1.1.2: {} + axe-core@4.11.1: {} axobject-query@4.1.0: {} @@ -15301,6 +15339,8 @@ snapshots: delayed-stream@1.0.0: {} + denque@2.1.0: {} + depd@2.0.0: {} dequal@2.0.3: {} @@ -16561,6 +16601,10 @@ snapshots: transitivePeerDependencies: - supports-color + generate-function@2.3.1: + dependencies: + is-property: 1.0.2 + generator-function@2.0.1: {} gensync@1.0.0-beta.2: {} @@ -17149,6 +17193,8 @@ snapshots: is-promise@4.0.0: {} + is-property@1.0.2: {} + is-regex@1.2.1: dependencies: call-bound: 1.0.4 @@ -17630,6 +17676,8 @@ snapshots: lru-cache@7.18.3: {} + lru.min@1.1.4: {} + lucide-react-native@0.563.0(react-native-svg@15.15.1(react-native@0.81.5(@babel/core@7.28.6)(@types/react@19.1.17)(react@19.1.0))(react@19.1.0))(react-native@0.81.5(@babel/core@7.28.6)(@types/react@19.1.17)(react@19.1.0))(react@19.1.0): dependencies: react: 19.1.0 @@ -18330,12 +18378,28 @@ snapshots: mute-stream@2.0.0: {} + mysql2@3.16.3: + dependencies: + aws-ssl-profiles: 1.1.2 + denque: 2.1.0 + generate-function: 2.3.1 + iconv-lite: 0.7.2 + long: 5.3.2 + lru.min: 1.1.4 + named-placeholders: 1.1.6 + seq-queue: 0.0.5 + sqlstring: 2.3.3 + mz@2.7.0: dependencies: any-promise: 1.3.0 object-assign: 4.1.1 thenify-all: 1.6.0 + named-placeholders@1.1.6: + dependencies: + lru.min: 1.1.4 + nanoid@3.3.11: {} napi-postinstall@0.3.4: {} @@ -19691,6 +19755,8 @@ snapshots: transitivePeerDependencies: - supports-color + seq-queue@0.0.5: {} + serialize-error@2.1.0: {} serialize-error@7.0.1: @@ -19993,6 +20059,8 @@ snapshots: sprintf-js@1.1.3: optional: true + sqlstring@2.3.3: {} + stable-hash@0.0.5: {} stack-utils@2.0.6: diff --git a/src/gateway/app.module.ts b/src/gateway/app.module.ts index 03200ec5..cd478bf0 100644 --- a/src/gateway/app.module.ts +++ b/src/gateway/app.module.ts @@ -3,8 +3,10 @@ import { ServeStaticModule } from "@nestjs/serve-static"; import { LoggerModule } from "nestjs-pino"; import { join } from "node:path"; import { fileURLToPath } from "node:url"; -import { EventsGateway } from "./events.gateway.js"; import { AppController } from "./app.controller.js"; +import { DatabaseModule } from "./database/database.module.js"; +import { GatewayModule } from "./gateway.module.js"; +import { TelegramModule } from "./telegram/telegram.module.js"; const __dirname = fileURLToPath(new URL(".", import.meta.url)); const isDev = process.env.NODE_ENV !== "production"; @@ -31,8 +33,10 @@ const isDev = process.env.NODE_ENV !== "production"; level: process.env.LOG_LEVEL ?? "info", }, }), + DatabaseModule, + GatewayModule, + TelegramModule, ], - providers: [EventsGateway], controllers: [AppController], }) export class AppModule {} diff --git a/src/gateway/database/database.module.ts b/src/gateway/database/database.module.ts new file mode 100644 index 00000000..a357a4e3 --- /dev/null +++ b/src/gateway/database/database.module.ts @@ -0,0 +1,15 @@ +/** + * Database module for Gateway. + * + * Global module that provides DatabaseService to all other modules. + */ + +import { Global, Module } from "@nestjs/common"; +import { DatabaseService } from "./database.service.js"; + +@Global() +@Module({ + providers: [DatabaseService], + exports: [DatabaseService], +}) +export class DatabaseModule {} diff --git a/src/gateway/database/database.service.ts b/src/gateway/database/database.service.ts new file mode 100644 index 00000000..27a675f6 --- /dev/null +++ b/src/gateway/database/database.service.ts @@ -0,0 +1,93 @@ +/** + * MySQL database service for Gateway. + * + * Provides a connection pool and query interface. + * Configuration is read from MYSQL_DSN environment variable. + */ + +import { Injectable, Logger } from "@nestjs/common"; +import type { OnModuleInit, OnModuleDestroy } from "@nestjs/common"; +import mysql from "mysql2/promise"; +import type { Pool, PoolOptions, RowDataPacket, ResultSetHeader } from "mysql2/promise"; + +@Injectable() +export class DatabaseService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(DatabaseService.name); + private pool: Pool | null = null; + + async onModuleInit(): Promise { + console.log("[DatabaseService] onModuleInit starting..."); + const dsn = process.env["MYSQL_DSN"]; + if (!dsn) { + console.log("[DatabaseService] MYSQL_DSN not set"); + this.logger.warn("MYSQL_DSN not set, database features disabled"); + return; + } + + try { + console.log("[DatabaseService] Parsing DSN..."); + const config = this.parseDsn(dsn); + console.log("[DatabaseService] Creating pool..."); + this.pool = mysql.createPool(config); + + // Test connection + console.log("[DatabaseService] Testing connection..."); + const connection = await this.pool.getConnection(); + connection.release(); + console.log("[DatabaseService] Connected!"); + this.logger.log(`MySQL connected (host=${config.host}, database=${config.database})`); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + console.log("[DatabaseService] Error:", message); + this.logger.error(`Failed to connect to MySQL: ${message}`); + throw error; + } + } + + async onModuleDestroy(): Promise { + if (this.pool) { + await this.pool.end(); + this.logger.log("MySQL connection pool closed"); + } + } + + /** Check if database is available */ + isAvailable(): boolean { + return this.pool !== null; + } + + /** Execute a query and return rows */ + async query(sql: string, params?: unknown[]): Promise { + if (!this.pool) { + throw new Error("Database not available"); + } + const [rows] = await this.pool.query(sql, params); + return rows; + } + + /** Execute an insert/update/delete and return result */ + async execute(sql: string, params?: unknown[]): Promise { + if (!this.pool) { + throw new Error("Database not available"); + } + const [result] = await this.pool.execute(sql, params); + return result; + } + + /** Parse MySQL DSN string into connection options */ + private parseDsn(dsn: string): PoolOptions { + // Format: mysql://user:password@host:port/database + const url = new URL(dsn); + + return { + host: url.hostname, + port: url.port ? parseInt(url.port, 10) : 3306, + user: url.username, + password: url.password, + database: url.pathname.slice(1), // Remove leading / + waitForConnections: true, + connectionLimit: 10, + queueLimit: 0, + }; + } +} diff --git a/src/gateway/events.gateway.ts b/src/gateway/events.gateway.ts index ebeb2191..304ba5da 100644 --- a/src/gateway/events.gateway.ts +++ b/src/gateway/events.gateway.ts @@ -1,4 +1,4 @@ -import { Injectable } from "@nestjs/common"; +import { Injectable, Logger } from "@nestjs/common"; import { WebSocketGateway, WebSocketServer, @@ -12,7 +12,6 @@ import type { OnGatewayDisconnect, } from "@nestjs/websockets"; import type { Server, Socket } from "socket.io"; -import { InjectPinoLogger, PinoLogger } from "nestjs-pino"; import { GatewayEvents, type RoutedMessage, @@ -22,6 +21,7 @@ import { type DeviceInfo, type DeviceType, } from "@multica/sdk"; +import type { VirtualDeviceHandler } from "./types.js"; @Injectable() @WebSocketGateway({ @@ -36,10 +36,7 @@ import { export class EventsGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect { - constructor( - @InjectPinoLogger(EventsGateway.name) - private readonly logger: PinoLogger - ) {} + private readonly logger = new Logger(EventsGateway.name); @WebSocketServer() server!: Server; @@ -48,9 +45,11 @@ export class EventsGateway private deviceToSocket = new Map(); // socketId -> deviceInfo mapping private socketToDevice = new Map(); + // Virtual devices (non-socket based, e.g., Telegram) + private virtualDevices = new Map(); afterInit(_server: Server): void { - this.logger.info("WebSocket Gateway initialized"); + this.logger.log("WebSocket Gateway initialized"); } handleConnection(client: Socket): void { @@ -58,16 +57,10 @@ export class EventsGateway const deviceId = query["deviceId"] as string | undefined; const deviceType = query["deviceType"] as DeviceType | undefined; - this.logger.debug( - { socketId: client.id, deviceId, deviceType }, - "Incoming connection" - ); + this.logger.debug(`Incoming connection: socketId=${client.id}, deviceId=${deviceId}, deviceType=${deviceType}`); if (!deviceId || !deviceType) { - this.logger.warn( - { socketId: client.id }, - "Missing deviceId or deviceType in query, disconnecting" - ); + this.logger.warn(`Missing deviceId or deviceType in query, disconnecting (socketId=${client.id})`); client.disconnect(true); return; } @@ -75,10 +68,7 @@ export class EventsGateway // Check if deviceId is already in use by another socket const existingSocketId = this.deviceToSocket.get(deviceId); if (existingSocketId && existingSocketId !== client.id) { - this.logger.warn( - { deviceId, existingSocketId }, - "Device already registered by another socket, disconnecting" - ); + this.logger.warn(`Device already registered by another socket, disconnecting (deviceId=${deviceId}, existingSocketId=${existingSocketId})`); client.emit(GatewayEvents.REGISTERED, { success: false, deviceId, @@ -93,25 +83,19 @@ export class EventsGateway this.deviceToSocket.set(deviceId, client.id); this.socketToDevice.set(client.id, deviceInfo); - this.logger.info({ deviceId, deviceType }, "Device connected and registered"); + this.logger.log(`Device connected and registered: deviceId=${deviceId}, deviceType=${deviceType}`); client.emit(GatewayEvents.REGISTERED, { success: true, deviceId }); } handleDisconnect(client: Socket): void { const deviceInfo = this.socketToDevice.get(client.id); if (deviceInfo) { - this.logger.debug( - { socketId: client.id, deviceId: deviceInfo.deviceId, deviceType: deviceInfo.deviceType }, - "Device disconnecting" - ); - this.logger.info( - { deviceId: deviceInfo.deviceId, deviceType: deviceInfo.deviceType }, - "Device disconnected" - ); + this.logger.debug(`Device disconnecting: socketId=${client.id}, deviceId=${deviceInfo.deviceId}, deviceType=${deviceInfo.deviceType}`); + this.logger.log(`Device disconnected: deviceId=${deviceInfo.deviceId}, deviceType=${deviceInfo.deviceType}`); this.deviceToSocket.delete(deviceInfo.deviceId); this.socketToDevice.delete(client.id); } else { - this.logger.info({ socketId: client.id }, "Socket disconnected"); + this.logger.log(`Socket disconnected: socketId=${client.id}`); } } @@ -120,10 +104,7 @@ export class EventsGateway @MessageBody() message: RoutedMessage, @ConnectedSocket() client: Socket ): void { - this.logger.debug( - { socketId: client.id, message }, - "Received send event" - ); + this.logger.debug(`Received send event: socketId=${client.id}, messageId=${message.id}`); const senderDevice = this.socketToDevice.get(client.id); @@ -149,24 +130,27 @@ export class EventsGateway return; } - // Find target device + // Find target device — check socket-based first, then virtual const targetSocketId = this.deviceToSocket.get(message.to); - if (!targetSocketId) { - const error: SendErrorResponse = { - messageId: message.id, - error: `Device ${message.to} not found`, - code: "DEVICE_NOT_FOUND", - }; - client.emit(GatewayEvents.SEND_ERROR, error); + if (targetSocketId) { + this.logger.debug(`Routing message: id=${message.id}, from=${message.from}, to=${message.to}, action=${message.action}`); + this.server.to(targetSocketId).emit(GatewayEvents.RECEIVE, message); return; } - // Forward message - this.logger.debug( - { messageId: message.id, from: message.from, to: message.to, action: message.action }, - "Routing message" - ); - this.server.to(targetSocketId).emit(GatewayEvents.RECEIVE, message); + const virtualHandler = this.virtualDevices.get(message.to); + if (virtualHandler) { + this.logger.debug(`Routing message to virtual device: id=${message.id}, from=${message.from}, to=${message.to}, action=${message.action}`); + virtualHandler.sendCallback(GatewayEvents.RECEIVE, message); + return; + } + + const error: SendErrorResponse = { + messageId: message.id, + error: `Device ${message.to} not found`, + code: "DEVICE_NOT_FOUND", + }; + client.emit(GatewayEvents.SEND_ERROR, error); } @SubscribeMessage(GatewayEvents.LIST_DEVICES) @@ -185,7 +169,7 @@ export class EventsGateway @MessageBody() data: PingPayload, @ConnectedSocket() client: Socket ): PongResponse { - this.logger.debug({ socketId: client.id, data }, "Received ping"); + this.logger.debug(`Received ping: socketId=${client.id}`); return { event: GatewayEvents.PONG, data: "Hello from Gateway!" }; } @@ -201,9 +185,63 @@ export class EventsGateway /** Send message to specified device (for HTTP API use) */ sendToDevice(deviceId: string, event: string, data: unknown): boolean { + // Check virtual devices first + const virtualHandler = this.virtualDevices.get(deviceId); + if (virtualHandler) { + this.logger.debug(`Routing to virtual device: deviceId=${deviceId}, event=${event}`); + virtualHandler.sendCallback(event, data); + return true; + } + + // Fall back to socket-based devices const socketId = this.deviceToSocket.get(deviceId); if (!socketId) return false; this.server.to(socketId).emit(event, data); return true; } + + /** Register a virtual device (non-socket based) */ + registerVirtualDevice(deviceId: string, handler: VirtualDeviceHandler): void { + this.virtualDevices.set(deviceId, handler); + this.logger.log(`Virtual device registered: deviceId=${deviceId}`); + } + + /** Unregister a virtual device */ + unregisterVirtualDevice(deviceId: string): void { + this.virtualDevices.delete(deviceId); + this.logger.log(`Virtual device unregistered: deviceId=${deviceId}`); + } + + /** Check if a device (socket or virtual) is registered */ + isDeviceRegistered(deviceId: string): boolean { + return this.deviceToSocket.has(deviceId) || this.virtualDevices.has(deviceId); + } + + /** Route a message originating from a virtual device to its target */ + routeFromVirtualDevice(message: RoutedMessage): boolean { + // Validate sender is a registered virtual device + if (!this.virtualDevices.has(message.from)) { + this.logger.warn(`routeFromVirtualDevice: sender not a virtual device (from=${message.from})`); + return false; + } + + // Try socket-based target first + const targetSocketId = this.deviceToSocket.get(message.to); + if (targetSocketId) { + this.logger.debug(`Virtual device routing: id=${message.id}, from=${message.from}, to=${message.to}, action=${message.action}`); + this.server.to(targetSocketId).emit(GatewayEvents.RECEIVE, message); + return true; + } + + // Try virtual device target + const virtualHandler = this.virtualDevices.get(message.to); + if (virtualHandler) { + this.logger.debug(`Virtual device routing (v2v): id=${message.id}, from=${message.from}, to=${message.to}, action=${message.action}`); + virtualHandler.sendCallback(GatewayEvents.RECEIVE, message); + return true; + } + + this.logger.warn(`routeFromVirtualDevice: target not found (to=${message.to})`); + return false; + } } diff --git a/src/gateway/gateway.module.ts b/src/gateway/gateway.module.ts index 414d18d4..3b285db3 100644 --- a/src/gateway/gateway.module.ts +++ b/src/gateway/gateway.module.ts @@ -1,6 +1,7 @@ -import { Module } from "@nestjs/common"; +import { Global, Module } from "@nestjs/common"; import { EventsGateway } from "./events.gateway.js"; +@Global() @Module({ providers: [EventsGateway], exports: [EventsGateway], diff --git a/src/gateway/main.ts b/src/gateway/main.ts index 424ee6b0..02a9a017 100644 --- a/src/gateway/main.ts +++ b/src/gateway/main.ts @@ -3,16 +3,27 @@ import { NestFactory } from "@nestjs/core"; import { Logger } from "nestjs-pino"; import { AppModule } from "./app.module.js"; +console.log("[Gateway] Starting bootstrap..."); + async function bootstrap(): Promise { - const app = await NestFactory.create(AppModule, { bufferLogs: true }); - app.useLogger(app.get(Logger)); + console.log("[Gateway] Creating NestFactory..."); + try { + const app = await NestFactory.create(AppModule, { bufferLogs: true, abortOnError: false }); + console.log("[Gateway] NestFactory created"); - const port = process.env["PORT"] ?? 3000; - await app.listen(port); + app.useLogger(app.get(Logger)); - const logger = app.get(Logger); - logger.log(`Gateway is running on http://localhost:${port}`); - logger.log(`WebSocket endpoint: ws://localhost:${port}/ws`); + const port = process.env["PORT"] ?? 3000; + console.log(`[Gateway] Listening on port ${port}...`); + await app.listen(port); + + const logger = app.get(Logger); + logger.log(`Gateway is running on http://localhost:${port}`); + logger.log(`WebSocket endpoint: ws://localhost:${port}/ws`); + } catch (err) { + console.error("[Gateway] Error during bootstrap:", err); + throw err; + } } bootstrap().catch((err) => { diff --git a/src/gateway/telegram/telegram-user.store.ts b/src/gateway/telegram/telegram-user.store.ts new file mode 100644 index 00000000..5bd839ec --- /dev/null +++ b/src/gateway/telegram/telegram-user.store.ts @@ -0,0 +1,152 @@ +/** + * Telegram user store - MySQL persistence layer. + */ + +import { Inject, Injectable, Logger } from "@nestjs/common"; +import type { OnModuleInit } from "@nestjs/common"; +import { v7 as uuidv7 } from "uuid"; +import type { RowDataPacket } from "mysql2/promise"; +import { DatabaseService } from "../database/database.service.js"; +import type { TelegramUser, TelegramUserCreate } from "./types.js"; + +interface TelegramUserRow extends RowDataPacket { + telegram_user_id: string; + hub_url: string; + device_id: string; + created_at: Date; + updated_at: Date; + telegram_username: string | null; + telegram_first_name: string | null; + telegram_last_name: string | null; +} + +@Injectable() +export class TelegramUserStore implements OnModuleInit { + private readonly logger = new Logger(TelegramUserStore.name); + + constructor(@Inject(DatabaseService) private readonly db: DatabaseService) {} + + async onModuleInit(): Promise { + console.log("[TelegramUserStore] onModuleInit starting..."); + if (!this.db.isAvailable()) { + console.log("[TelegramUserStore] Database not available"); + this.logger.warn("Database not available, TelegramUserStore disabled"); + return; + } + console.log("[TelegramUserStore] Ensuring table..."); + await this.ensureTable(); + console.log("[TelegramUserStore] Done"); + } + + /** Create telegram_users table if not exists */ + private async ensureTable(): Promise { + const sql = ` + CREATE TABLE IF NOT EXISTS telegram_users ( + telegram_user_id VARCHAR(64) PRIMARY KEY, + hub_url VARCHAR(512) NOT NULL, + device_id VARCHAR(64) NOT NULL UNIQUE, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + telegram_username VARCHAR(255), + telegram_first_name VARCHAR(255), + telegram_last_name VARCHAR(255), + INDEX idx_device_id (device_id) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + `; + await this.db.execute(sql); + this.logger.log("telegram_users table ensured"); + } + + /** Find user by Telegram user ID */ + async findByTelegramUserId(telegramUserId: string): Promise { + if (!this.db.isAvailable()) return null; + + const rows = await this.db.query( + "SELECT * FROM telegram_users WHERE telegram_user_id = ?", + [telegramUserId] + ); + + if (rows.length === 0) return null; + return this.rowToUser(rows[0]!); + } + + /** Find user by device ID */ + async findByDeviceId(deviceId: string): Promise { + if (!this.db.isAvailable()) return null; + + const rows = await this.db.query( + "SELECT * FROM telegram_users WHERE device_id = ?", + [deviceId] + ); + + if (rows.length === 0) return null; + return this.rowToUser(rows[0]!); + } + + /** Create or update a Telegram user */ + async upsert(data: TelegramUserCreate): Promise { + if (!this.db.isAvailable()) { + throw new Error("Database not available"); + } + + // Check if user exists + const existing = await this.findByTelegramUserId(data.telegramUserId); + + if (existing) { + // Update existing user + await this.db.execute( + `UPDATE telegram_users SET + hub_url = ?, + telegram_username = ?, + telegram_first_name = ?, + telegram_last_name = ? + WHERE telegram_user_id = ?`, + [ + data.hubUrl, + data.telegramUsername ?? null, + data.telegramFirstName ?? null, + data.telegramLastName ?? null, + data.telegramUserId, + ] + ); + + const updated = await this.findByTelegramUserId(data.telegramUserId); + return updated!; + } + + // Create new user with generated device ID + const deviceId = `tg-${uuidv7()}`; + + await this.db.execute( + `INSERT INTO telegram_users ( + telegram_user_id, hub_url, device_id, + telegram_username, telegram_first_name, telegram_last_name + ) VALUES (?, ?, ?, ?, ?, ?)`, + [ + data.telegramUserId, + data.hubUrl, + deviceId, + data.telegramUsername ?? null, + data.telegramFirstName ?? null, + data.telegramLastName ?? null, + ] + ); + + const created = await this.findByTelegramUserId(data.telegramUserId); + return created!; + } + + /** Convert database row to TelegramUser object */ + private rowToUser(row: TelegramUserRow): TelegramUser { + return { + telegramUserId: row.telegram_user_id, + hubUrl: row.hub_url, + deviceId: row.device_id, + createdAt: row.created_at, + updatedAt: row.updated_at, + telegramUsername: row.telegram_username ?? undefined, + telegramFirstName: row.telegram_first_name ?? undefined, + telegramLastName: row.telegram_last_name ?? undefined, + }; + } +} diff --git a/src/gateway/telegram/telegram.controller.ts b/src/gateway/telegram/telegram.controller.ts new file mode 100644 index 00000000..2e7c2c39 --- /dev/null +++ b/src/gateway/telegram/telegram.controller.ts @@ -0,0 +1,67 @@ +/** + * Telegram webhook controller. + * + * Receives webhook requests from Telegram Bot API. + */ + +import { Controller, Inject, Logger, Post, Req, Res, Headers } from "@nestjs/common"; +import { TelegramService } from "./telegram.service.js"; + +// Minimal Express types for webhook handling +interface ExpressRequest { + body: unknown; + header: (name: string) => string | undefined; +} + +interface ExpressResponse { + status: (code: number) => ExpressResponse; + json: (data: unknown) => void; + headersSent: boolean; +} + +@Controller("telegram") +export class TelegramController { + private readonly logger = new Logger(TelegramController.name); + + constructor(@Inject(TelegramService) private readonly telegramService: TelegramService) {} + + @Post("webhook") + async handleWebhook( + @Req() req: ExpressRequest, + @Res() res: ExpressResponse, + @Headers("x-telegram-bot-api-secret-token") secretToken?: string + ): Promise { + // Check if Telegram is configured + if (!this.telegramService.isConfigured()) { + this.logger.warn("Telegram webhook called but bot not configured"); + res.status(503).json({ error: "Telegram not configured" }); + return; + } + + // Validate secret token if configured + const expectedToken = process.env["TELEGRAM_WEBHOOK_SECRET_TOKEN"]; + if (expectedToken && secretToken !== expectedToken) { + this.logger.warn("Invalid Telegram webhook secret token"); + res.status(401).json({ error: "Unauthorized" }); + return; + } + + // Get grammY webhook callback + const callback = this.telegramService.getWebhookCallback(); + if (!callback) { + res.status(503).json({ error: "Telegram not configured" }); + return; + } + + // Let grammY handle the request + try { + await callback(req, res); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + this.logger.error(`Telegram webhook error: ${message}`); + if (!res.headersSent) { + res.status(500).json({ error: "Internal server error" }); + } + } + } +} diff --git a/src/gateway/telegram/telegram.module.ts b/src/gateway/telegram/telegram.module.ts new file mode 100644 index 00000000..debdb7b4 --- /dev/null +++ b/src/gateway/telegram/telegram.module.ts @@ -0,0 +1,17 @@ +/** + * Telegram module for Gateway. + * + * Provides Telegram webhook functionality. + */ + +import { Module } from "@nestjs/common"; +import { TelegramController } from "./telegram.controller.js"; +import { TelegramService } from "./telegram.service.js"; +import { TelegramUserStore } from "./telegram-user.store.js"; + +@Module({ + controllers: [TelegramController], + providers: [TelegramService, TelegramUserStore], + exports: [TelegramService], +}) +export class TelegramModule {} diff --git a/src/gateway/telegram/telegram.service.ts b/src/gateway/telegram/telegram.service.ts new file mode 100644 index 00000000..9cf0bcac --- /dev/null +++ b/src/gateway/telegram/telegram.service.ts @@ -0,0 +1,273 @@ +/** + * Telegram service for Gateway. + * + * Handles Telegram bot interactions via webhook. + * - New users: prompts for Hub URL + * - Bound users: routes messages to their Hub + */ + +import { Inject, Injectable, Logger } from "@nestjs/common"; +import type { OnModuleInit } from "@nestjs/common"; +import { Bot, webhookCallback } from "grammy"; +import type { Context } from "grammy"; +import { EventsGateway } from "../events.gateway.js"; +import { TelegramUserStore } from "./telegram-user.store.js"; +import type { TelegramUser } from "./types.js"; + +// Minimal Express types for webhook handling +interface ExpressRequest { + body: unknown; + header: (name: string) => string | undefined; +} + +interface ExpressResponse { + status: (code: number) => ExpressResponse; + json: (data: unknown) => void; + headersSent: boolean; +} + +// Users in the process of binding Hub URL +interface PendingBinding { + awaitingUrl: boolean; + telegramUsername?: string | undefined; + telegramFirstName?: string | undefined; + telegramLastName?: string | undefined; +} + +@Injectable() +export class TelegramService implements OnModuleInit { + private bot: Bot | null = null; + private pendingBindings = new Map(); + + private readonly logger = new Logger(TelegramService.name); + + constructor( + @Inject(EventsGateway) private readonly eventsGateway: EventsGateway, + @Inject(TelegramUserStore) private readonly userStore: TelegramUserStore, + ) {} + + async onModuleInit(): Promise { + console.log("[TelegramService] onModuleInit starting..."); + const token = process.env["TELEGRAM_BOT_TOKEN"]; + if (!token) { + console.log("[TelegramService] No bot token"); + this.logger.warn("TELEGRAM_BOT_TOKEN not set, Telegram webhook disabled"); + return; + } + + console.log("[TelegramService] Creating bot..."); + this.bot = new Bot(token); + this.setupHandlers(); + this.logger.log("Telegram bot initialized"); + } + + /** Get grammY webhook callback for Express/NestJS */ + getWebhookCallback(): ((req: ExpressRequest, res: ExpressResponse) => Promise) | null { + if (!this.bot) return null; + + const secretToken = process.env["TELEGRAM_WEBHOOK_SECRET_TOKEN"]; + if (secretToken) { + return webhookCallback(this.bot, "express", { secretToken }) as unknown as ( + req: ExpressRequest, + res: ExpressResponse + ) => Promise; + } + return webhookCallback(this.bot, "express") as unknown as ( + req: ExpressRequest, + res: ExpressResponse + ) => Promise; + } + + /** Check if Telegram bot is configured */ + isConfigured(): boolean { + return this.bot !== null; + } + + /** Send message to a Telegram user by device ID */ + async sendToTelegram(deviceId: string, text: string): Promise { + if (!this.bot) return; + + const user = await this.userStore.findByDeviceId(deviceId); + if (!user) { + this.logger.warn(`Telegram user not found for device: deviceId=${deviceId}`); + return; + } + + try { + await this.bot.api.sendMessage(Number(user.telegramUserId), text); + this.logger.debug(`Sent message to Telegram: telegramUserId=${user.telegramUserId}`); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + this.logger.error(`Failed to send Telegram message: deviceId=${deviceId}, error=${message}`); + } + } + + /** Setup bot message handlers */ + private setupHandlers(): void { + if (!this.bot) return; + + this.bot.on("message:text", async (ctx) => { + await this.handleTextMessage(ctx); + }); + } + + /** Handle incoming text message */ + private async handleTextMessage(ctx: Context): Promise { + const msg = ctx.message; + if (!msg || !msg.text) return; + + const telegramUserId = String(msg.from?.id); + const text = msg.text.trim(); + + this.logger.debug(`Received Telegram message: telegramUserId=${telegramUserId}, text=${text.slice(0, 50)}`); + + // Check if user is bound + const user = await this.userStore.findByTelegramUserId(telegramUserId); + + if (user) { + // User is bound, route message to Hub + await this.routeToHub(user, text, ctx); + return; + } + + // Check if user is in binding process + const pending = this.pendingBindings.get(telegramUserId); + + if (pending?.awaitingUrl) { + // User is providing Hub URL + await this.handleHubUrlInput(ctx, telegramUserId, text, pending); + return; + } + + // New user, start binding process + await this.startBindingProcess(ctx, telegramUserId); + } + + /** Start the Hub binding process for a new user */ + private async startBindingProcess(ctx: Context, telegramUserId: string): Promise { + const msg = ctx.message; + + this.pendingBindings.set(telegramUserId, { + awaitingUrl: true, + telegramUsername: msg?.from?.username, + telegramFirstName: msg?.from?.first_name, + telegramLastName: msg?.from?.last_name, + }); + + await ctx.reply( + "👋 Welcome to Multica!\n\n" + + "Please enter your Hub URL to get started.\n\n" + + "Example: https://your-hub.example.com" + ); + } + + /** Handle Hub URL input from user */ + private async handleHubUrlInput( + ctx: Context, + telegramUserId: string, + url: string, + pending: PendingBinding + ): Promise { + // Validate URL format + if (!this.isValidUrl(url)) { + await ctx.reply( + "❌ Invalid URL format.\n\n" + + "Please enter a valid Hub URL.\n" + + "Example: https://your-hub.example.com" + ); + return; + } + + // Validate Hub connectivity + const isValid = await this.validateHubUrl(url); + if (!isValid) { + await ctx.reply( + "❌ Cannot connect to this Hub.\n\n" + + "Please check the URL and make sure your Hub is online.\n" + + "Then try again with the correct URL." + ); + return; + } + + // Create user record + try { + const user = await this.userStore.upsert({ + telegramUserId, + hubUrl: url, + telegramUsername: pending.telegramUsername, + telegramFirstName: pending.telegramFirstName, + telegramLastName: pending.telegramLastName, + }); + + // Register as virtual device + this.eventsGateway.registerVirtualDevice(user.deviceId, { + sendCallback: (_event, data) => { + const payload = data as { text?: string }; + if (payload.text) { + this.sendToTelegram(user.deviceId, payload.text); + } + }, + }); + + this.pendingBindings.delete(telegramUserId); + + await ctx.reply( + "✅ Hub connected successfully!\n\n" + + `Your Device ID: ${user.deviceId}\n\n` + + "You can now send messages to interact with your agent." + ); + + this.logger.log(`Telegram user bound to Hub: telegramUserId=${telegramUserId}, hubUrl=${url}, deviceId=${user.deviceId}`); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + this.logger.error(`Failed to bind Telegram user: telegramUserId=${telegramUserId}, error=${message}`); + await ctx.reply("❌ An error occurred. Please try again later."); + } + } + + /** Validate URL format */ + private isValidUrl(url: string): boolean { + try { + const parsed = new URL(url); + return parsed.protocol === "http:" || parsed.protocol === "https:"; + } catch { + return false; + } + } + + /** Validate Hub URL connectivity */ + private async validateHubUrl(url: string): Promise { + try { + // Try to connect to the Hub's health endpoint + const response = await fetch(`${url}/`, { + method: "GET", + signal: AbortSignal.timeout(5000), + }); + return response.ok; + } catch { + return false; + } + } + + /** Route message to user's Hub */ + private async routeToHub(user: TelegramUser, text: string, ctx: Context): Promise { + // Ensure virtual device is registered + if (!this.eventsGateway.isDeviceRegistered(user.deviceId)) { + this.eventsGateway.registerVirtualDevice(user.deviceId, { + sendCallback: (_event, data) => { + const payload = data as { text?: string }; + if (payload.text) { + this.sendToTelegram(user.deviceId, payload.text); + } + }, + }); + } + + // TODO: Route message to Hub via EventsGateway + // For now, just acknowledge receipt + this.logger.log(`Routing message to Hub: deviceId=${user.deviceId}, hubUrl=${user.hubUrl}`); + + // Placeholder: In full implementation, this would send to Hub + await ctx.reply(`📨 Message received. Routing to your Hub...`); + } +} diff --git a/src/gateway/telegram/types.ts b/src/gateway/telegram/types.ts new file mode 100644 index 00000000..315dd10d --- /dev/null +++ b/src/gateway/telegram/types.ts @@ -0,0 +1,27 @@ +/** + * Telegram user types for Gateway. + */ + +/** Telegram user record stored in database */ +export interface TelegramUser { + telegramUserId: string; + hubUrl: string; + deviceId: string; + createdAt: Date; + updatedAt: Date; + telegramUsername?: string | undefined; + telegramFirstName?: string | undefined; + telegramLastName?: string | undefined; +} + +/** Data required to create/update a Telegram user */ +export interface TelegramUserCreate { + telegramUserId: string; + hubUrl: string; + telegramUsername?: string | undefined; + telegramFirstName?: string | undefined; + telegramLastName?: string | undefined; +} + +// Re-export VirtualDeviceHandler from parent for convenience +export type { VirtualDeviceHandler } from "../types.js"; diff --git a/src/gateway/test-app.ts b/src/gateway/test-app.ts new file mode 100644 index 00000000..05cd0bc0 --- /dev/null +++ b/src/gateway/test-app.ts @@ -0,0 +1,29 @@ +import "reflect-metadata"; +import { NestFactory } from "@nestjs/core"; +import { Module } from "@nestjs/common"; +import { LoggerModule } from "nestjs-pino"; +import { DatabaseModule } from "./database/database.module.js"; +import { GatewayModule } from "./gateway.module.js"; +import { TelegramModule } from "./telegram/telegram.module.js"; + +@Module({ + imports: [ + LoggerModule.forRoot({ pinoHttp: { level: "debug" } }), + DatabaseModule, + GatewayModule, + TelegramModule, + ], +}) +class TestAppModule {} + +console.log("Creating test app..."); +NestFactory.create(TestAppModule, { abortOnError: true }) + .then(async (app) => { + console.log("Test app created!"); + await app.listen(3000); + console.log("Listening on 3000!"); + }) + .catch((err) => { + console.error("Error:", err); + process.exit(1); + }); diff --git a/src/gateway/types.ts b/src/gateway/types.ts new file mode 100644 index 00000000..aa0dc5fc --- /dev/null +++ b/src/gateway/types.ts @@ -0,0 +1,8 @@ +/** + * Gateway types. + */ + +/** Virtual device handler for non-socket devices (e.g., Telegram) */ +export interface VirtualDeviceHandler { + sendCallback: (event: string, data: unknown) => void | Promise; +}