diff --git a/package.json b/package.json index a6da1ff5..3c492741 100644 --- a/package.json +++ b/package.json @@ -63,6 +63,7 @@ "croner": "^10.0.1", "fast-glob": "^3.3.3", "grammy": "^1.39.3", + "mysql2": "^3.14.1", "json5": "^2.2.3", "linkedom": "^0.18.12", "nestjs-pino": "^4.5.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 84a3a2d7..72d97303 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -83,6 +83,9 @@ importers: linkedom: specifier: ^0.18.12 version: 0.18.12 + mysql2: + specifier: ^3.14.1 + version: 3.16.3 nestjs-pino: specifier: ^4.5.0 version: 4.5.0(@nestjs/common@11.1.12(reflect-metadata@0.2.2)(rxjs@7.8.2))(pino-http@11.0.0)(pino@10.3.0)(rxjs@7.8.2) @@ -4349,6 +4352,10 @@ packages: resolution: {integrity: sha512-wvUjBtSGN7+7SjNpq/9M2Tg350UZD3q62IFZLbRAR1bSMlCo1ZaeW+BJ+D090e4hIIZLBcTDWe4Mh4jvUDajzQ==} engines: {node: '>= 0.4'} + aws-ssl-profiles@1.1.2: + resolution: {integrity: sha512-NZKeq9AfyQvEeNlN0zSYAaWrmBffJh3IELMZfRpJVWgrpEbtEpnjvzqBPf+mxoI287JohRDoa+/nsfqqiZmF6g==} + engines: {node: '>= 6.0.0'} + axe-core@4.11.1: resolution: {integrity: sha512-BASOg+YwO2C+346x3LZOeoovTIoTrRqEsqMa6fmfAV0P+U9mFr9NsyOEpiYvFjbc64NMrSswhV50WdXzdb/Z5A==} engines: {node: '>=4'} @@ -5023,6 +5030,10 @@ packages: resolution: {integrity: sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==} engines: {node: '>=0.4.0'} + denque@2.1.0: + resolution: {integrity: sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==} + engines: {node: '>=0.10'} + depd@2.0.0: resolution: {integrity: sha512-g7nH6P6dyDioJogAAGprGpCtVImJhpPk/roCzdb3fIh61/s/nPsfR6onyMwkCAR/OlC3yBC0lESvUoQEAssIrw==} engines: {node: '>= 0.8'} @@ -5890,6 +5901,9 @@ packages: resolution: {integrity: sha512-zV/5HKTfCeKWnxG0Dmrw51hEWFGfcF2xiXqcA3+J90WDuP0SvoiSO5ORvcBsifmx/FoIjgQN3oNOGaQ5PhLFkg==} engines: {node: '>=18'} + generate-function@2.3.1: + resolution: {integrity: sha512-eeB5GfMNeevm/GRYq20ShmsaGcmI81kIX2K9XQx5miC8KdHaC6Jm0qQ8ZNeGOi7wYB8OsdxKs+Y2oVuTFuVwKQ==} + generator-function@2.0.1: resolution: {integrity: sha512-SFdFmIJi+ybC0vjlHN0ZGVGHc3lgE0DxPAT0djjVg+kjOnSqclqmj0KQ7ykTOLP6YxoqOvuAODGdcHJn+43q3g==} engines: {node: '>= 0.4'} @@ -6400,6 +6414,9 @@ packages: is-promise@4.0.0: resolution: {integrity: sha512-hvpoI6korhJMnej285dSg6nu1+e6uxs7zG3BYAm5byqDsgJNWwxzM6z6iZiAgQR4TJ30JmBTOwqZUw3WlyH3AQ==} + is-property@1.0.2: + resolution: {integrity: sha512-Ks/IoX00TtClbGQr4TWXemAnktAQvYB7HzcCxDGqEZU6oCmb2INHuOoKxbtR+HFkmYWBKv/dOZtGRiAjDhj92g==} + is-regex@1.2.1: resolution: {integrity: sha512-MjYsKHO5O7mCsmRGxWcLWheFqN9DJ/2TmngvjKXihe6efViPqc274+Fx/4fYj/r03+ESvBdTXK0V6tA3rgez1g==} engines: {node: '>= 0.4'} @@ -6929,6 +6946,10 @@ packages: resolution: {integrity: sha512-jumlc0BIUrS3qJGgIkWZsyfAM7NCWiBcCDhnd+3NNM5KbBmLTgHVfWBcg6W+rLUsIpzpERPsvwUP7CckAQSOoA==} engines: {node: '>=12'} + lru.min@1.1.4: + resolution: {integrity: sha512-DqC6n3QQ77zdFpCMASA1a3Jlb64Hv2N2DciFGkO/4L9+q/IpIAuRlKOvCXabtRW6cQf8usbmM6BE/TOPysCdIA==} + engines: {bun: '>=1.0.0', deno: '>=1.30.0', node: '>=8.0.0'} + lucide-react-native@0.563.0: resolution: {integrity: sha512-q4tYoAMorTqv+UXRYc0MyiEAOF+4Bu73zxD63EDrnGCFL+xuj+imBm3E2rIKRmME0heVHlK+98fsi8wbL92LNQ==} peerDependencies: @@ -7315,9 +7336,17 @@ packages: resolution: {integrity: sha512-WWdIxpyjEn+FhQJQQv9aQAYlHoNVdzIzUySNV1gHUPDSdZJ3yZn7pAAbQcV7B56Mvu881q9FZV+0Vx2xC44VWA==} engines: {node: ^18.17.0 || >=20.5.0} + mysql2@3.16.3: + resolution: {integrity: sha512-+3XhQEt4FEFuvGV0JjIDj4eP2OT/oIj/54dYvqhblnSzlfcxVOuj+cd15Xz6hsG4HU1a+A5+BA9gm0618C4z7A==} + engines: {node: '>= 8.0'} + mz@2.7.0: resolution: {integrity: sha512-z81GNO7nnYMEhrGh9LeymoE4+Yr0Wn5McHIZMK5cfQCl+NDX08sCZgUc9/6MHni9IWuFLm1Z3HTCXu2z9fN62Q==} + named-placeholders@1.1.6: + resolution: {integrity: sha512-Tz09sEL2EEuv5fFowm419c1+a/jSMiBjI9gHxVLrVdbUkkNUUfjsVYs9pVZu5oCon/kmRh9TfLEObFtkVxmY0w==} + engines: {node: '>=8.0.0'} + nanoid@3.3.11: resolution: {integrity: sha512-N8SpfPUnUp1bK+PMYW8qSWdl9U+wwNWI4QKxOYDy9JAro3WMX7p2OeVRF9v+347pnakNevPmiHhNmZ2HbFA76w==} engines: {node: ^10 || ^12 || ^13.7 || ^14 || >=15.0.1} @@ -8439,6 +8468,9 @@ packages: resolution: {integrity: sha512-1gnZf7DFcoIcajTjTwjwuDjzuz4PPcY2StKPlsGAQ1+YH20IRVrBaXSWmdjowTJ6u8Rc01PoYOGHXfP1mYcZNQ==} engines: {node: '>= 18'} + seq-queue@0.0.5: + resolution: {integrity: sha512-hr3Wtp/GZIc/6DAGPDcV4/9WoZhjrkXsi5B/07QgX8tsdc6ilr7BFM6PM6rbdAX1kFSDYeZGLipIZZKyQP0O5Q==} + serialize-error@2.1.0: resolution: {integrity: sha512-ghgmKt5o4Tly5yEG/UJp8qTd0AN7Xalw4XBtDEKP655B699qMEtra1WlXeE6WIvdEG481JvRxULKsInq/iNysw==} engines: {node: '>=0.10.0'} @@ -8628,6 +8660,10 @@ packages: sprintf-js@1.1.3: resolution: {integrity: sha512-Oo+0REFV59/rz3gfJNKQiBlwfHaSESl1pcGyABQsnnIfWOFt6JNj5gCog2U6MLZ//IGYD+nA8nI+mTShREReaA==} + sqlstring@2.3.3: + resolution: {integrity: sha512-qC9iz2FlN7DQl3+wjwn3802RTyjCx7sDvfQEXchwa6CWOx07/WVfh91gBmQ9fahw8snwGEWU3xGzOt4tFyHLxg==} + engines: {node: '>= 0.6'} + stable-hash@0.0.5: resolution: {integrity: sha512-+L3ccpzibovGXFK+Ap/f8LOS0ahMrHTf3xu7mMLSpEGU0EO9ucaysSylKo9eRDFNhWve/y275iPmIZ4z39a9iA==} @@ -14567,6 +14603,8 @@ snapshots: dependencies: possible-typed-array-names: 1.1.0 + aws-ssl-profiles@1.1.2: {} + axe-core@4.11.1: {} axobject-query@4.1.0: {} @@ -15301,6 +15339,8 @@ snapshots: delayed-stream@1.0.0: {} + denque@2.1.0: {} + depd@2.0.0: {} dequal@2.0.3: {} @@ -16561,6 +16601,10 @@ snapshots: transitivePeerDependencies: - supports-color + generate-function@2.3.1: + dependencies: + is-property: 1.0.2 + generator-function@2.0.1: {} gensync@1.0.0-beta.2: {} @@ -17149,6 +17193,8 @@ snapshots: is-promise@4.0.0: {} + is-property@1.0.2: {} + is-regex@1.2.1: dependencies: call-bound: 1.0.4 @@ -17630,6 +17676,8 @@ snapshots: lru-cache@7.18.3: {} + lru.min@1.1.4: {} + lucide-react-native@0.563.0(react-native-svg@15.15.1(react-native@0.81.5(@babel/core@7.28.6)(@types/react@19.1.17)(react@19.1.0))(react@19.1.0))(react-native@0.81.5(@babel/core@7.28.6)(@types/react@19.1.17)(react@19.1.0))(react@19.1.0): dependencies: react: 19.1.0 @@ -18330,12 +18378,28 @@ snapshots: mute-stream@2.0.0: {} + mysql2@3.16.3: + dependencies: + aws-ssl-profiles: 1.1.2 + denque: 2.1.0 + generate-function: 2.3.1 + iconv-lite: 0.7.2 + long: 5.3.2 + lru.min: 1.1.4 + named-placeholders: 1.1.6 + seq-queue: 0.0.5 + sqlstring: 2.3.3 + mz@2.7.0: dependencies: any-promise: 1.3.0 object-assign: 4.1.1 thenify-all: 1.6.0 + named-placeholders@1.1.6: + dependencies: + lru.min: 1.1.4 + nanoid@3.3.11: {} napi-postinstall@0.3.4: {} @@ -19691,6 +19755,8 @@ snapshots: transitivePeerDependencies: - supports-color + seq-queue@0.0.5: {} + serialize-error@2.1.0: {} serialize-error@7.0.1: @@ -19993,6 +20059,8 @@ snapshots: sprintf-js@1.1.3: optional: true + sqlstring@2.3.3: {} + stable-hash@0.0.5: {} stack-utils@2.0.6: diff --git a/src/gateway/app.module.ts b/src/gateway/app.module.ts index 03200ec5..cd478bf0 100644 --- a/src/gateway/app.module.ts +++ b/src/gateway/app.module.ts @@ -3,8 +3,10 @@ import { ServeStaticModule } from "@nestjs/serve-static"; import { LoggerModule } from "nestjs-pino"; import { join } from "node:path"; import { fileURLToPath } from "node:url"; -import { EventsGateway } from "./events.gateway.js"; import { AppController } from "./app.controller.js"; +import { DatabaseModule } from "./database/database.module.js"; +import { GatewayModule } from "./gateway.module.js"; +import { TelegramModule } from "./telegram/telegram.module.js"; const __dirname = fileURLToPath(new URL(".", import.meta.url)); const isDev = process.env.NODE_ENV !== "production"; @@ -31,8 +33,10 @@ const isDev = process.env.NODE_ENV !== "production"; level: process.env.LOG_LEVEL ?? "info", }, }), + DatabaseModule, + GatewayModule, + TelegramModule, ], - providers: [EventsGateway], controllers: [AppController], }) export class AppModule {} diff --git a/src/gateway/database/database.module.ts b/src/gateway/database/database.module.ts new file mode 100644 index 00000000..a357a4e3 --- /dev/null +++ b/src/gateway/database/database.module.ts @@ -0,0 +1,15 @@ +/** + * Database module for Gateway. + * + * Global module that provides DatabaseService to all other modules. + */ + +import { Global, Module } from "@nestjs/common"; +import { DatabaseService } from "./database.service.js"; + +@Global() +@Module({ + providers: [DatabaseService], + exports: [DatabaseService], +}) +export class DatabaseModule {} diff --git a/src/gateway/database/database.service.ts b/src/gateway/database/database.service.ts new file mode 100644 index 00000000..27a675f6 --- /dev/null +++ b/src/gateway/database/database.service.ts @@ -0,0 +1,93 @@ +/** + * MySQL database service for Gateway. + * + * Provides a connection pool and query interface. + * Configuration is read from MYSQL_DSN environment variable. + */ + +import { Injectable, Logger } from "@nestjs/common"; +import type { OnModuleInit, OnModuleDestroy } from "@nestjs/common"; +import mysql from "mysql2/promise"; +import type { Pool, PoolOptions, RowDataPacket, ResultSetHeader } from "mysql2/promise"; + +@Injectable() +export class DatabaseService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(DatabaseService.name); + private pool: Pool | null = null; + + async onModuleInit(): Promise { + console.log("[DatabaseService] onModuleInit starting..."); + const dsn = process.env["MYSQL_DSN"]; + if (!dsn) { + console.log("[DatabaseService] MYSQL_DSN not set"); + this.logger.warn("MYSQL_DSN not set, database features disabled"); + return; + } + + try { + console.log("[DatabaseService] Parsing DSN..."); + const config = this.parseDsn(dsn); + console.log("[DatabaseService] Creating pool..."); + this.pool = mysql.createPool(config); + + // Test connection + console.log("[DatabaseService] Testing connection..."); + const connection = await this.pool.getConnection(); + connection.release(); + console.log("[DatabaseService] Connected!"); + this.logger.log(`MySQL connected (host=${config.host}, database=${config.database})`); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + console.log("[DatabaseService] Error:", message); + this.logger.error(`Failed to connect to MySQL: ${message}`); + throw error; + } + } + + async onModuleDestroy(): Promise { + if (this.pool) { + await this.pool.end(); + this.logger.log("MySQL connection pool closed"); + } + } + + /** Check if database is available */ + isAvailable(): boolean { + return this.pool !== null; + } + + /** Execute a query and return rows */ + async query(sql: string, params?: unknown[]): Promise { + if (!this.pool) { + throw new Error("Database not available"); + } + const [rows] = await this.pool.query(sql, params); + return rows; + } + + /** Execute an insert/update/delete and return result */ + async execute(sql: string, params?: unknown[]): Promise { + if (!this.pool) { + throw new Error("Database not available"); + } + const [result] = await this.pool.execute(sql, params); + return result; + } + + /** Parse MySQL DSN string into connection options */ + private parseDsn(dsn: string): PoolOptions { + // Format: mysql://user:password@host:port/database + const url = new URL(dsn); + + return { + host: url.hostname, + port: url.port ? parseInt(url.port, 10) : 3306, + user: url.username, + password: url.password, + database: url.pathname.slice(1), // Remove leading / + waitForConnections: true, + connectionLimit: 10, + queueLimit: 0, + }; + } +} diff --git a/src/gateway/events.gateway.ts b/src/gateway/events.gateway.ts index ebeb2191..304ba5da 100644 --- a/src/gateway/events.gateway.ts +++ b/src/gateway/events.gateway.ts @@ -1,4 +1,4 @@ -import { Injectable } from "@nestjs/common"; +import { Injectable, Logger } from "@nestjs/common"; import { WebSocketGateway, WebSocketServer, @@ -12,7 +12,6 @@ import type { OnGatewayDisconnect, } from "@nestjs/websockets"; import type { Server, Socket } from "socket.io"; -import { InjectPinoLogger, PinoLogger } from "nestjs-pino"; import { GatewayEvents, type RoutedMessage, @@ -22,6 +21,7 @@ import { type DeviceInfo, type DeviceType, } from "@multica/sdk"; +import type { VirtualDeviceHandler } from "./types.js"; @Injectable() @WebSocketGateway({ @@ -36,10 +36,7 @@ import { export class EventsGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect { - constructor( - @InjectPinoLogger(EventsGateway.name) - private readonly logger: PinoLogger - ) {} + private readonly logger = new Logger(EventsGateway.name); @WebSocketServer() server!: Server; @@ -48,9 +45,11 @@ export class EventsGateway private deviceToSocket = new Map(); // socketId -> deviceInfo mapping private socketToDevice = new Map(); + // Virtual devices (non-socket based, e.g., Telegram) + private virtualDevices = new Map(); afterInit(_server: Server): void { - this.logger.info("WebSocket Gateway initialized"); + this.logger.log("WebSocket Gateway initialized"); } handleConnection(client: Socket): void { @@ -58,16 +57,10 @@ export class EventsGateway const deviceId = query["deviceId"] as string | undefined; const deviceType = query["deviceType"] as DeviceType | undefined; - this.logger.debug( - { socketId: client.id, deviceId, deviceType }, - "Incoming connection" - ); + this.logger.debug(`Incoming connection: socketId=${client.id}, deviceId=${deviceId}, deviceType=${deviceType}`); if (!deviceId || !deviceType) { - this.logger.warn( - { socketId: client.id }, - "Missing deviceId or deviceType in query, disconnecting" - ); + this.logger.warn(`Missing deviceId or deviceType in query, disconnecting (socketId=${client.id})`); client.disconnect(true); return; } @@ -75,10 +68,7 @@ export class EventsGateway // Check if deviceId is already in use by another socket const existingSocketId = this.deviceToSocket.get(deviceId); if (existingSocketId && existingSocketId !== client.id) { - this.logger.warn( - { deviceId, existingSocketId }, - "Device already registered by another socket, disconnecting" - ); + this.logger.warn(`Device already registered by another socket, disconnecting (deviceId=${deviceId}, existingSocketId=${existingSocketId})`); client.emit(GatewayEvents.REGISTERED, { success: false, deviceId, @@ -93,25 +83,19 @@ export class EventsGateway this.deviceToSocket.set(deviceId, client.id); this.socketToDevice.set(client.id, deviceInfo); - this.logger.info({ deviceId, deviceType }, "Device connected and registered"); + this.logger.log(`Device connected and registered: deviceId=${deviceId}, deviceType=${deviceType}`); client.emit(GatewayEvents.REGISTERED, { success: true, deviceId }); } handleDisconnect(client: Socket): void { const deviceInfo = this.socketToDevice.get(client.id); if (deviceInfo) { - this.logger.debug( - { socketId: client.id, deviceId: deviceInfo.deviceId, deviceType: deviceInfo.deviceType }, - "Device disconnecting" - ); - this.logger.info( - { deviceId: deviceInfo.deviceId, deviceType: deviceInfo.deviceType }, - "Device disconnected" - ); + this.logger.debug(`Device disconnecting: socketId=${client.id}, deviceId=${deviceInfo.deviceId}, deviceType=${deviceInfo.deviceType}`); + this.logger.log(`Device disconnected: deviceId=${deviceInfo.deviceId}, deviceType=${deviceInfo.deviceType}`); this.deviceToSocket.delete(deviceInfo.deviceId); this.socketToDevice.delete(client.id); } else { - this.logger.info({ socketId: client.id }, "Socket disconnected"); + this.logger.log(`Socket disconnected: socketId=${client.id}`); } } @@ -120,10 +104,7 @@ export class EventsGateway @MessageBody() message: RoutedMessage, @ConnectedSocket() client: Socket ): void { - this.logger.debug( - { socketId: client.id, message }, - "Received send event" - ); + this.logger.debug(`Received send event: socketId=${client.id}, messageId=${message.id}`); const senderDevice = this.socketToDevice.get(client.id); @@ -149,24 +130,27 @@ export class EventsGateway return; } - // Find target device + // Find target device — check socket-based first, then virtual const targetSocketId = this.deviceToSocket.get(message.to); - if (!targetSocketId) { - const error: SendErrorResponse = { - messageId: message.id, - error: `Device ${message.to} not found`, - code: "DEVICE_NOT_FOUND", - }; - client.emit(GatewayEvents.SEND_ERROR, error); + if (targetSocketId) { + this.logger.debug(`Routing message: id=${message.id}, from=${message.from}, to=${message.to}, action=${message.action}`); + this.server.to(targetSocketId).emit(GatewayEvents.RECEIVE, message); return; } - // Forward message - this.logger.debug( - { messageId: message.id, from: message.from, to: message.to, action: message.action }, - "Routing message" - ); - this.server.to(targetSocketId).emit(GatewayEvents.RECEIVE, message); + const virtualHandler = this.virtualDevices.get(message.to); + if (virtualHandler) { + this.logger.debug(`Routing message to virtual device: id=${message.id}, from=${message.from}, to=${message.to}, action=${message.action}`); + virtualHandler.sendCallback(GatewayEvents.RECEIVE, message); + return; + } + + const error: SendErrorResponse = { + messageId: message.id, + error: `Device ${message.to} not found`, + code: "DEVICE_NOT_FOUND", + }; + client.emit(GatewayEvents.SEND_ERROR, error); } @SubscribeMessage(GatewayEvents.LIST_DEVICES) @@ -185,7 +169,7 @@ export class EventsGateway @MessageBody() data: PingPayload, @ConnectedSocket() client: Socket ): PongResponse { - this.logger.debug({ socketId: client.id, data }, "Received ping"); + this.logger.debug(`Received ping: socketId=${client.id}`); return { event: GatewayEvents.PONG, data: "Hello from Gateway!" }; } @@ -201,9 +185,63 @@ export class EventsGateway /** Send message to specified device (for HTTP API use) */ sendToDevice(deviceId: string, event: string, data: unknown): boolean { + // Check virtual devices first + const virtualHandler = this.virtualDevices.get(deviceId); + if (virtualHandler) { + this.logger.debug(`Routing to virtual device: deviceId=${deviceId}, event=${event}`); + virtualHandler.sendCallback(event, data); + return true; + } + + // Fall back to socket-based devices const socketId = this.deviceToSocket.get(deviceId); if (!socketId) return false; this.server.to(socketId).emit(event, data); return true; } + + /** Register a virtual device (non-socket based) */ + registerVirtualDevice(deviceId: string, handler: VirtualDeviceHandler): void { + this.virtualDevices.set(deviceId, handler); + this.logger.log(`Virtual device registered: deviceId=${deviceId}`); + } + + /** Unregister a virtual device */ + unregisterVirtualDevice(deviceId: string): void { + this.virtualDevices.delete(deviceId); + this.logger.log(`Virtual device unregistered: deviceId=${deviceId}`); + } + + /** Check if a device (socket or virtual) is registered */ + isDeviceRegistered(deviceId: string): boolean { + return this.deviceToSocket.has(deviceId) || this.virtualDevices.has(deviceId); + } + + /** Route a message originating from a virtual device to its target */ + routeFromVirtualDevice(message: RoutedMessage): boolean { + // Validate sender is a registered virtual device + if (!this.virtualDevices.has(message.from)) { + this.logger.warn(`routeFromVirtualDevice: sender not a virtual device (from=${message.from})`); + return false; + } + + // Try socket-based target first + const targetSocketId = this.deviceToSocket.get(message.to); + if (targetSocketId) { + this.logger.debug(`Virtual device routing: id=${message.id}, from=${message.from}, to=${message.to}, action=${message.action}`); + this.server.to(targetSocketId).emit(GatewayEvents.RECEIVE, message); + return true; + } + + // Try virtual device target + const virtualHandler = this.virtualDevices.get(message.to); + if (virtualHandler) { + this.logger.debug(`Virtual device routing (v2v): id=${message.id}, from=${message.from}, to=${message.to}, action=${message.action}`); + virtualHandler.sendCallback(GatewayEvents.RECEIVE, message); + return true; + } + + this.logger.warn(`routeFromVirtualDevice: target not found (to=${message.to})`); + return false; + } } diff --git a/src/gateway/gateway.module.ts b/src/gateway/gateway.module.ts index 414d18d4..3b285db3 100644 --- a/src/gateway/gateway.module.ts +++ b/src/gateway/gateway.module.ts @@ -1,6 +1,7 @@ -import { Module } from "@nestjs/common"; +import { Global, Module } from "@nestjs/common"; import { EventsGateway } from "./events.gateway.js"; +@Global() @Module({ providers: [EventsGateway], exports: [EventsGateway], diff --git a/src/gateway/main.ts b/src/gateway/main.ts index 424ee6b0..02a9a017 100644 --- a/src/gateway/main.ts +++ b/src/gateway/main.ts @@ -3,16 +3,27 @@ import { NestFactory } from "@nestjs/core"; import { Logger } from "nestjs-pino"; import { AppModule } from "./app.module.js"; +console.log("[Gateway] Starting bootstrap..."); + async function bootstrap(): Promise { - const app = await NestFactory.create(AppModule, { bufferLogs: true }); - app.useLogger(app.get(Logger)); + console.log("[Gateway] Creating NestFactory..."); + try { + const app = await NestFactory.create(AppModule, { bufferLogs: true, abortOnError: false }); + console.log("[Gateway] NestFactory created"); - const port = process.env["PORT"] ?? 3000; - await app.listen(port); + app.useLogger(app.get(Logger)); - const logger = app.get(Logger); - logger.log(`Gateway is running on http://localhost:${port}`); - logger.log(`WebSocket endpoint: ws://localhost:${port}/ws`); + const port = process.env["PORT"] ?? 3000; + console.log(`[Gateway] Listening on port ${port}...`); + await app.listen(port); + + const logger = app.get(Logger); + logger.log(`Gateway is running on http://localhost:${port}`); + logger.log(`WebSocket endpoint: ws://localhost:${port}/ws`); + } catch (err) { + console.error("[Gateway] Error during bootstrap:", err); + throw err; + } } bootstrap().catch((err) => { diff --git a/src/gateway/telegram/telegram-user.store.ts b/src/gateway/telegram/telegram-user.store.ts new file mode 100644 index 00000000..5bd839ec --- /dev/null +++ b/src/gateway/telegram/telegram-user.store.ts @@ -0,0 +1,152 @@ +/** + * Telegram user store - MySQL persistence layer. + */ + +import { Inject, Injectable, Logger } from "@nestjs/common"; +import type { OnModuleInit } from "@nestjs/common"; +import { v7 as uuidv7 } from "uuid"; +import type { RowDataPacket } from "mysql2/promise"; +import { DatabaseService } from "../database/database.service.js"; +import type { TelegramUser, TelegramUserCreate } from "./types.js"; + +interface TelegramUserRow extends RowDataPacket { + telegram_user_id: string; + hub_url: string; + device_id: string; + created_at: Date; + updated_at: Date; + telegram_username: string | null; + telegram_first_name: string | null; + telegram_last_name: string | null; +} + +@Injectable() +export class TelegramUserStore implements OnModuleInit { + private readonly logger = new Logger(TelegramUserStore.name); + + constructor(@Inject(DatabaseService) private readonly db: DatabaseService) {} + + async onModuleInit(): Promise { + console.log("[TelegramUserStore] onModuleInit starting..."); + if (!this.db.isAvailable()) { + console.log("[TelegramUserStore] Database not available"); + this.logger.warn("Database not available, TelegramUserStore disabled"); + return; + } + console.log("[TelegramUserStore] Ensuring table..."); + await this.ensureTable(); + console.log("[TelegramUserStore] Done"); + } + + /** Create telegram_users table if not exists */ + private async ensureTable(): Promise { + const sql = ` + CREATE TABLE IF NOT EXISTS telegram_users ( + telegram_user_id VARCHAR(64) PRIMARY KEY, + hub_url VARCHAR(512) NOT NULL, + device_id VARCHAR(64) NOT NULL UNIQUE, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + telegram_username VARCHAR(255), + telegram_first_name VARCHAR(255), + telegram_last_name VARCHAR(255), + INDEX idx_device_id (device_id) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + `; + await this.db.execute(sql); + this.logger.log("telegram_users table ensured"); + } + + /** Find user by Telegram user ID */ + async findByTelegramUserId(telegramUserId: string): Promise { + if (!this.db.isAvailable()) return null; + + const rows = await this.db.query( + "SELECT * FROM telegram_users WHERE telegram_user_id = ?", + [telegramUserId] + ); + + if (rows.length === 0) return null; + return this.rowToUser(rows[0]!); + } + + /** Find user by device ID */ + async findByDeviceId(deviceId: string): Promise { + if (!this.db.isAvailable()) return null; + + const rows = await this.db.query( + "SELECT * FROM telegram_users WHERE device_id = ?", + [deviceId] + ); + + if (rows.length === 0) return null; + return this.rowToUser(rows[0]!); + } + + /** Create or update a Telegram user */ + async upsert(data: TelegramUserCreate): Promise { + if (!this.db.isAvailable()) { + throw new Error("Database not available"); + } + + // Check if user exists + const existing = await this.findByTelegramUserId(data.telegramUserId); + + if (existing) { + // Update existing user + await this.db.execute( + `UPDATE telegram_users SET + hub_url = ?, + telegram_username = ?, + telegram_first_name = ?, + telegram_last_name = ? + WHERE telegram_user_id = ?`, + [ + data.hubUrl, + data.telegramUsername ?? null, + data.telegramFirstName ?? null, + data.telegramLastName ?? null, + data.telegramUserId, + ] + ); + + const updated = await this.findByTelegramUserId(data.telegramUserId); + return updated!; + } + + // Create new user with generated device ID + const deviceId = `tg-${uuidv7()}`; + + await this.db.execute( + `INSERT INTO telegram_users ( + telegram_user_id, hub_url, device_id, + telegram_username, telegram_first_name, telegram_last_name + ) VALUES (?, ?, ?, ?, ?, ?)`, + [ + data.telegramUserId, + data.hubUrl, + deviceId, + data.telegramUsername ?? null, + data.telegramFirstName ?? null, + data.telegramLastName ?? null, + ] + ); + + const created = await this.findByTelegramUserId(data.telegramUserId); + return created!; + } + + /** Convert database row to TelegramUser object */ + private rowToUser(row: TelegramUserRow): TelegramUser { + return { + telegramUserId: row.telegram_user_id, + hubUrl: row.hub_url, + deviceId: row.device_id, + createdAt: row.created_at, + updatedAt: row.updated_at, + telegramUsername: row.telegram_username ?? undefined, + telegramFirstName: row.telegram_first_name ?? undefined, + telegramLastName: row.telegram_last_name ?? undefined, + }; + } +} diff --git a/src/gateway/telegram/telegram.controller.ts b/src/gateway/telegram/telegram.controller.ts new file mode 100644 index 00000000..2e7c2c39 --- /dev/null +++ b/src/gateway/telegram/telegram.controller.ts @@ -0,0 +1,67 @@ +/** + * Telegram webhook controller. + * + * Receives webhook requests from Telegram Bot API. + */ + +import { Controller, Inject, Logger, Post, Req, Res, Headers } from "@nestjs/common"; +import { TelegramService } from "./telegram.service.js"; + +// Minimal Express types for webhook handling +interface ExpressRequest { + body: unknown; + header: (name: string) => string | undefined; +} + +interface ExpressResponse { + status: (code: number) => ExpressResponse; + json: (data: unknown) => void; + headersSent: boolean; +} + +@Controller("telegram") +export class TelegramController { + private readonly logger = new Logger(TelegramController.name); + + constructor(@Inject(TelegramService) private readonly telegramService: TelegramService) {} + + @Post("webhook") + async handleWebhook( + @Req() req: ExpressRequest, + @Res() res: ExpressResponse, + @Headers("x-telegram-bot-api-secret-token") secretToken?: string + ): Promise { + // Check if Telegram is configured + if (!this.telegramService.isConfigured()) { + this.logger.warn("Telegram webhook called but bot not configured"); + res.status(503).json({ error: "Telegram not configured" }); + return; + } + + // Validate secret token if configured + const expectedToken = process.env["TELEGRAM_WEBHOOK_SECRET_TOKEN"]; + if (expectedToken && secretToken !== expectedToken) { + this.logger.warn("Invalid Telegram webhook secret token"); + res.status(401).json({ error: "Unauthorized" }); + return; + } + + // Get grammY webhook callback + const callback = this.telegramService.getWebhookCallback(); + if (!callback) { + res.status(503).json({ error: "Telegram not configured" }); + return; + } + + // Let grammY handle the request + try { + await callback(req, res); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + this.logger.error(`Telegram webhook error: ${message}`); + if (!res.headersSent) { + res.status(500).json({ error: "Internal server error" }); + } + } + } +} diff --git a/src/gateway/telegram/telegram.module.ts b/src/gateway/telegram/telegram.module.ts new file mode 100644 index 00000000..debdb7b4 --- /dev/null +++ b/src/gateway/telegram/telegram.module.ts @@ -0,0 +1,17 @@ +/** + * Telegram module for Gateway. + * + * Provides Telegram webhook functionality. + */ + +import { Module } from "@nestjs/common"; +import { TelegramController } from "./telegram.controller.js"; +import { TelegramService } from "./telegram.service.js"; +import { TelegramUserStore } from "./telegram-user.store.js"; + +@Module({ + controllers: [TelegramController], + providers: [TelegramService, TelegramUserStore], + exports: [TelegramService], +}) +export class TelegramModule {} diff --git a/src/gateway/telegram/telegram.service.ts b/src/gateway/telegram/telegram.service.ts new file mode 100644 index 00000000..9cf0bcac --- /dev/null +++ b/src/gateway/telegram/telegram.service.ts @@ -0,0 +1,273 @@ +/** + * Telegram service for Gateway. + * + * Handles Telegram bot interactions via webhook. + * - New users: prompts for Hub URL + * - Bound users: routes messages to their Hub + */ + +import { Inject, Injectable, Logger } from "@nestjs/common"; +import type { OnModuleInit } from "@nestjs/common"; +import { Bot, webhookCallback } from "grammy"; +import type { Context } from "grammy"; +import { EventsGateway } from "../events.gateway.js"; +import { TelegramUserStore } from "./telegram-user.store.js"; +import type { TelegramUser } from "./types.js"; + +// Minimal Express types for webhook handling +interface ExpressRequest { + body: unknown; + header: (name: string) => string | undefined; +} + +interface ExpressResponse { + status: (code: number) => ExpressResponse; + json: (data: unknown) => void; + headersSent: boolean; +} + +// Users in the process of binding Hub URL +interface PendingBinding { + awaitingUrl: boolean; + telegramUsername?: string | undefined; + telegramFirstName?: string | undefined; + telegramLastName?: string | undefined; +} + +@Injectable() +export class TelegramService implements OnModuleInit { + private bot: Bot | null = null; + private pendingBindings = new Map(); + + private readonly logger = new Logger(TelegramService.name); + + constructor( + @Inject(EventsGateway) private readonly eventsGateway: EventsGateway, + @Inject(TelegramUserStore) private readonly userStore: TelegramUserStore, + ) {} + + async onModuleInit(): Promise { + console.log("[TelegramService] onModuleInit starting..."); + const token = process.env["TELEGRAM_BOT_TOKEN"]; + if (!token) { + console.log("[TelegramService] No bot token"); + this.logger.warn("TELEGRAM_BOT_TOKEN not set, Telegram webhook disabled"); + return; + } + + console.log("[TelegramService] Creating bot..."); + this.bot = new Bot(token); + this.setupHandlers(); + this.logger.log("Telegram bot initialized"); + } + + /** Get grammY webhook callback for Express/NestJS */ + getWebhookCallback(): ((req: ExpressRequest, res: ExpressResponse) => Promise) | null { + if (!this.bot) return null; + + const secretToken = process.env["TELEGRAM_WEBHOOK_SECRET_TOKEN"]; + if (secretToken) { + return webhookCallback(this.bot, "express", { secretToken }) as unknown as ( + req: ExpressRequest, + res: ExpressResponse + ) => Promise; + } + return webhookCallback(this.bot, "express") as unknown as ( + req: ExpressRequest, + res: ExpressResponse + ) => Promise; + } + + /** Check if Telegram bot is configured */ + isConfigured(): boolean { + return this.bot !== null; + } + + /** Send message to a Telegram user by device ID */ + async sendToTelegram(deviceId: string, text: string): Promise { + if (!this.bot) return; + + const user = await this.userStore.findByDeviceId(deviceId); + if (!user) { + this.logger.warn(`Telegram user not found for device: deviceId=${deviceId}`); + return; + } + + try { + await this.bot.api.sendMessage(Number(user.telegramUserId), text); + this.logger.debug(`Sent message to Telegram: telegramUserId=${user.telegramUserId}`); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + this.logger.error(`Failed to send Telegram message: deviceId=${deviceId}, error=${message}`); + } + } + + /** Setup bot message handlers */ + private setupHandlers(): void { + if (!this.bot) return; + + this.bot.on("message:text", async (ctx) => { + await this.handleTextMessage(ctx); + }); + } + + /** Handle incoming text message */ + private async handleTextMessage(ctx: Context): Promise { + const msg = ctx.message; + if (!msg || !msg.text) return; + + const telegramUserId = String(msg.from?.id); + const text = msg.text.trim(); + + this.logger.debug(`Received Telegram message: telegramUserId=${telegramUserId}, text=${text.slice(0, 50)}`); + + // Check if user is bound + const user = await this.userStore.findByTelegramUserId(telegramUserId); + + if (user) { + // User is bound, route message to Hub + await this.routeToHub(user, text, ctx); + return; + } + + // Check if user is in binding process + const pending = this.pendingBindings.get(telegramUserId); + + if (pending?.awaitingUrl) { + // User is providing Hub URL + await this.handleHubUrlInput(ctx, telegramUserId, text, pending); + return; + } + + // New user, start binding process + await this.startBindingProcess(ctx, telegramUserId); + } + + /** Start the Hub binding process for a new user */ + private async startBindingProcess(ctx: Context, telegramUserId: string): Promise { + const msg = ctx.message; + + this.pendingBindings.set(telegramUserId, { + awaitingUrl: true, + telegramUsername: msg?.from?.username, + telegramFirstName: msg?.from?.first_name, + telegramLastName: msg?.from?.last_name, + }); + + await ctx.reply( + "👋 Welcome to Multica!\n\n" + + "Please enter your Hub URL to get started.\n\n" + + "Example: https://your-hub.example.com" + ); + } + + /** Handle Hub URL input from user */ + private async handleHubUrlInput( + ctx: Context, + telegramUserId: string, + url: string, + pending: PendingBinding + ): Promise { + // Validate URL format + if (!this.isValidUrl(url)) { + await ctx.reply( + "❌ Invalid URL format.\n\n" + + "Please enter a valid Hub URL.\n" + + "Example: https://your-hub.example.com" + ); + return; + } + + // Validate Hub connectivity + const isValid = await this.validateHubUrl(url); + if (!isValid) { + await ctx.reply( + "❌ Cannot connect to this Hub.\n\n" + + "Please check the URL and make sure your Hub is online.\n" + + "Then try again with the correct URL." + ); + return; + } + + // Create user record + try { + const user = await this.userStore.upsert({ + telegramUserId, + hubUrl: url, + telegramUsername: pending.telegramUsername, + telegramFirstName: pending.telegramFirstName, + telegramLastName: pending.telegramLastName, + }); + + // Register as virtual device + this.eventsGateway.registerVirtualDevice(user.deviceId, { + sendCallback: (_event, data) => { + const payload = data as { text?: string }; + if (payload.text) { + this.sendToTelegram(user.deviceId, payload.text); + } + }, + }); + + this.pendingBindings.delete(telegramUserId); + + await ctx.reply( + "✅ Hub connected successfully!\n\n" + + `Your Device ID: ${user.deviceId}\n\n` + + "You can now send messages to interact with your agent." + ); + + this.logger.log(`Telegram user bound to Hub: telegramUserId=${telegramUserId}, hubUrl=${url}, deviceId=${user.deviceId}`); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + this.logger.error(`Failed to bind Telegram user: telegramUserId=${telegramUserId}, error=${message}`); + await ctx.reply("❌ An error occurred. Please try again later."); + } + } + + /** Validate URL format */ + private isValidUrl(url: string): boolean { + try { + const parsed = new URL(url); + return parsed.protocol === "http:" || parsed.protocol === "https:"; + } catch { + return false; + } + } + + /** Validate Hub URL connectivity */ + private async validateHubUrl(url: string): Promise { + try { + // Try to connect to the Hub's health endpoint + const response = await fetch(`${url}/`, { + method: "GET", + signal: AbortSignal.timeout(5000), + }); + return response.ok; + } catch { + return false; + } + } + + /** Route message to user's Hub */ + private async routeToHub(user: TelegramUser, text: string, ctx: Context): Promise { + // Ensure virtual device is registered + if (!this.eventsGateway.isDeviceRegistered(user.deviceId)) { + this.eventsGateway.registerVirtualDevice(user.deviceId, { + sendCallback: (_event, data) => { + const payload = data as { text?: string }; + if (payload.text) { + this.sendToTelegram(user.deviceId, payload.text); + } + }, + }); + } + + // TODO: Route message to Hub via EventsGateway + // For now, just acknowledge receipt + this.logger.log(`Routing message to Hub: deviceId=${user.deviceId}, hubUrl=${user.hubUrl}`); + + // Placeholder: In full implementation, this would send to Hub + await ctx.reply(`📨 Message received. Routing to your Hub...`); + } +} diff --git a/src/gateway/telegram/types.ts b/src/gateway/telegram/types.ts new file mode 100644 index 00000000..315dd10d --- /dev/null +++ b/src/gateway/telegram/types.ts @@ -0,0 +1,27 @@ +/** + * Telegram user types for Gateway. + */ + +/** Telegram user record stored in database */ +export interface TelegramUser { + telegramUserId: string; + hubUrl: string; + deviceId: string; + createdAt: Date; + updatedAt: Date; + telegramUsername?: string | undefined; + telegramFirstName?: string | undefined; + telegramLastName?: string | undefined; +} + +/** Data required to create/update a Telegram user */ +export interface TelegramUserCreate { + telegramUserId: string; + hubUrl: string; + telegramUsername?: string | undefined; + telegramFirstName?: string | undefined; + telegramLastName?: string | undefined; +} + +// Re-export VirtualDeviceHandler from parent for convenience +export type { VirtualDeviceHandler } from "../types.js"; diff --git a/src/gateway/test-app.ts b/src/gateway/test-app.ts new file mode 100644 index 00000000..05cd0bc0 --- /dev/null +++ b/src/gateway/test-app.ts @@ -0,0 +1,29 @@ +import "reflect-metadata"; +import { NestFactory } from "@nestjs/core"; +import { Module } from "@nestjs/common"; +import { LoggerModule } from "nestjs-pino"; +import { DatabaseModule } from "./database/database.module.js"; +import { GatewayModule } from "./gateway.module.js"; +import { TelegramModule } from "./telegram/telegram.module.js"; + +@Module({ + imports: [ + LoggerModule.forRoot({ pinoHttp: { level: "debug" } }), + DatabaseModule, + GatewayModule, + TelegramModule, + ], +}) +class TestAppModule {} + +console.log("Creating test app..."); +NestFactory.create(TestAppModule, { abortOnError: true }) + .then(async (app) => { + console.log("Test app created!"); + await app.listen(3000); + console.log("Listening on 3000!"); + }) + .catch((err) => { + console.error("Error:", err); + process.exit(1); + }); diff --git a/src/gateway/types.ts b/src/gateway/types.ts new file mode 100644 index 00000000..aa0dc5fc --- /dev/null +++ b/src/gateway/types.ts @@ -0,0 +1,8 @@ +/** + * Gateway types. + */ + +/** Virtual device handler for non-socket devices (e.g., Telegram) */ +export interface VirtualDeviceHandler { + sendCallback: (event: string, data: unknown) => void | Promise; +}