Merge pull request #117 from multica-ai/fix/gateway-pino-deadlock

feat(gateway): Telegram channel integration with connection link verification
This commit is contained in:
LinYushen 2026-02-10 17:00:18 +08:00 committed by GitHub
commit 187199d389
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 1008 additions and 64 deletions

View file

@ -15,6 +15,7 @@ interface DeviceMeta {
userAgent?: string
platform?: string
language?: string
clientName?: string
}
interface PendingConfirm {
@ -55,9 +56,11 @@ export function DeviceConfirmDialog() {
? parseUserAgent(pending.meta.userAgent)
: null
const deviceLabel = parsed
? `${parsed.browser} on ${parsed.os}`
: pending?.deviceId
const deviceLabel = pending?.meta?.clientName
? pending.meta.clientName
: parsed
? `${parsed.browser} on ${parsed.os}`
: pending?.deviceId
return (
<AlertDialog open={pending !== null}>

View file

@ -44,9 +44,11 @@ function DeviceItem({
? parseUserAgent(device.meta.userAgent)
: null
const displayName = parsed
? `${parsed.browser} on ${parsed.os}`
: device.deviceId
const displayName = device.meta?.clientName
? device.meta.clientName
: parsed
? `${parsed.browser} on ${parsed.os}`
: device.deviceId
const handleRevoke = async () => {
setRevoking(true)

View file

@ -4,6 +4,7 @@ export interface DeviceMeta {
userAgent?: string
platform?: string
language?: string
clientName?: string
}
export interface DeviceEntry {

View file

@ -53,6 +53,7 @@
"@mariozechner/pi-coding-agent": "^0.52.9",
"@mozilla/readability": "^0.6.0",
"@multica/sdk": "workspace:*",
"@multica/store": "workspace:*",
"@nestjs/common": "^11.1.12",
"@nestjs/core": "^11.1.12",
"@nestjs/platform-express": "^11.1.12",
@ -63,6 +64,7 @@
"croner": "^10.0.1",
"fast-glob": "^3.3.3",
"grammy": "^1.39.3",
"mysql2": "^3.14.1",
"json5": "^2.2.3",
"linkedom": "^0.18.12",
"nestjs-pino": "^4.5.0",

View file

@ -133,6 +133,7 @@ export interface DeviceMeta {
userAgent?: string;
platform?: string;
language?: string;
clientName?: string;
}
/** verify - request params */

71
pnpm-lock.yaml generated
View file

@ -47,6 +47,9 @@ importers:
'@multica/sdk':
specifier: workspace:*
version: link:packages/sdk
'@multica/store':
specifier: workspace:*
version: link:packages/store
'@nestjs/common':
specifier: ^11.1.12
version: 11.1.12(reflect-metadata@0.2.2)(rxjs@7.8.2)
@ -83,6 +86,9 @@ importers:
linkedom:
specifier: ^0.18.12
version: 0.18.12
mysql2:
specifier: ^3.14.1
version: 3.16.3
nestjs-pino:
specifier: ^4.5.0
version: 4.5.0(@nestjs/common@11.1.12(reflect-metadata@0.2.2)(rxjs@7.8.2))(pino-http@11.0.0)(pino@10.3.0)(rxjs@7.8.2)
@ -4349,6 +4355,10 @@ packages:
resolution: {integrity: sha512-wvUjBtSGN7+7SjNpq/9M2Tg350UZD3q62IFZLbRAR1bSMlCo1ZaeW+BJ+D090e4hIIZLBcTDWe4Mh4jvUDajzQ==}
engines: {node: '>= 0.4'}
aws-ssl-profiles@1.1.2:
resolution: {integrity: sha512-NZKeq9AfyQvEeNlN0zSYAaWrmBffJh3IELMZfRpJVWgrpEbtEpnjvzqBPf+mxoI287JohRDoa+/nsfqqiZmF6g==}
engines: {node: '>= 6.0.0'}
axe-core@4.11.1:
resolution: {integrity: sha512-BASOg+YwO2C+346x3LZOeoovTIoTrRqEsqMa6fmfAV0P+U9mFr9NsyOEpiYvFjbc64NMrSswhV50WdXzdb/Z5A==}
engines: {node: '>=4'}
@ -5023,6 +5033,10 @@ packages:
resolution: {integrity: sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==}
engines: {node: '>=0.4.0'}
denque@2.1.0:
resolution: {integrity: sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==}
engines: {node: '>=0.10'}
depd@2.0.0:
resolution: {integrity: sha512-g7nH6P6dyDioJogAAGprGpCtVImJhpPk/roCzdb3fIh61/s/nPsfR6onyMwkCAR/OlC3yBC0lESvUoQEAssIrw==}
engines: {node: '>= 0.8'}
@ -5890,6 +5904,9 @@ packages:
resolution: {integrity: sha512-zV/5HKTfCeKWnxG0Dmrw51hEWFGfcF2xiXqcA3+J90WDuP0SvoiSO5ORvcBsifmx/FoIjgQN3oNOGaQ5PhLFkg==}
engines: {node: '>=18'}
generate-function@2.3.1:
resolution: {integrity: sha512-eeB5GfMNeevm/GRYq20ShmsaGcmI81kIX2K9XQx5miC8KdHaC6Jm0qQ8ZNeGOi7wYB8OsdxKs+Y2oVuTFuVwKQ==}
generator-function@2.0.1:
resolution: {integrity: sha512-SFdFmIJi+ybC0vjlHN0ZGVGHc3lgE0DxPAT0djjVg+kjOnSqclqmj0KQ7ykTOLP6YxoqOvuAODGdcHJn+43q3g==}
engines: {node: '>= 0.4'}
@ -6400,6 +6417,9 @@ packages:
is-promise@4.0.0:
resolution: {integrity: sha512-hvpoI6korhJMnej285dSg6nu1+e6uxs7zG3BYAm5byqDsgJNWwxzM6z6iZiAgQR4TJ30JmBTOwqZUw3WlyH3AQ==}
is-property@1.0.2:
resolution: {integrity: sha512-Ks/IoX00TtClbGQr4TWXemAnktAQvYB7HzcCxDGqEZU6oCmb2INHuOoKxbtR+HFkmYWBKv/dOZtGRiAjDhj92g==}
is-regex@1.2.1:
resolution: {integrity: sha512-MjYsKHO5O7mCsmRGxWcLWheFqN9DJ/2TmngvjKXihe6efViPqc274+Fx/4fYj/r03+ESvBdTXK0V6tA3rgez1g==}
engines: {node: '>= 0.4'}
@ -6929,6 +6949,10 @@ packages:
resolution: {integrity: sha512-jumlc0BIUrS3qJGgIkWZsyfAM7NCWiBcCDhnd+3NNM5KbBmLTgHVfWBcg6W+rLUsIpzpERPsvwUP7CckAQSOoA==}
engines: {node: '>=12'}
lru.min@1.1.4:
resolution: {integrity: sha512-DqC6n3QQ77zdFpCMASA1a3Jlb64Hv2N2DciFGkO/4L9+q/IpIAuRlKOvCXabtRW6cQf8usbmM6BE/TOPysCdIA==}
engines: {bun: '>=1.0.0', deno: '>=1.30.0', node: '>=8.0.0'}
lucide-react-native@0.563.0:
resolution: {integrity: sha512-q4tYoAMorTqv+UXRYc0MyiEAOF+4Bu73zxD63EDrnGCFL+xuj+imBm3E2rIKRmME0heVHlK+98fsi8wbL92LNQ==}
peerDependencies:
@ -7315,9 +7339,17 @@ packages:
resolution: {integrity: sha512-WWdIxpyjEn+FhQJQQv9aQAYlHoNVdzIzUySNV1gHUPDSdZJ3yZn7pAAbQcV7B56Mvu881q9FZV+0Vx2xC44VWA==}
engines: {node: ^18.17.0 || >=20.5.0}
mysql2@3.16.3:
resolution: {integrity: sha512-+3XhQEt4FEFuvGV0JjIDj4eP2OT/oIj/54dYvqhblnSzlfcxVOuj+cd15Xz6hsG4HU1a+A5+BA9gm0618C4z7A==}
engines: {node: '>= 8.0'}
mz@2.7.0:
resolution: {integrity: sha512-z81GNO7nnYMEhrGh9LeymoE4+Yr0Wn5McHIZMK5cfQCl+NDX08sCZgUc9/6MHni9IWuFLm1Z3HTCXu2z9fN62Q==}
named-placeholders@1.1.6:
resolution: {integrity: sha512-Tz09sEL2EEuv5fFowm419c1+a/jSMiBjI9gHxVLrVdbUkkNUUfjsVYs9pVZu5oCon/kmRh9TfLEObFtkVxmY0w==}
engines: {node: '>=8.0.0'}
nanoid@3.3.11:
resolution: {integrity: sha512-N8SpfPUnUp1bK+PMYW8qSWdl9U+wwNWI4QKxOYDy9JAro3WMX7p2OeVRF9v+347pnakNevPmiHhNmZ2HbFA76w==}
engines: {node: ^10 || ^12 || ^13.7 || ^14 || >=15.0.1}
@ -8439,6 +8471,9 @@ packages:
resolution: {integrity: sha512-1gnZf7DFcoIcajTjTwjwuDjzuz4PPcY2StKPlsGAQ1+YH20IRVrBaXSWmdjowTJ6u8Rc01PoYOGHXfP1mYcZNQ==}
engines: {node: '>= 18'}
seq-queue@0.0.5:
resolution: {integrity: sha512-hr3Wtp/GZIc/6DAGPDcV4/9WoZhjrkXsi5B/07QgX8tsdc6ilr7BFM6PM6rbdAX1kFSDYeZGLipIZZKyQP0O5Q==}
serialize-error@2.1.0:
resolution: {integrity: sha512-ghgmKt5o4Tly5yEG/UJp8qTd0AN7Xalw4XBtDEKP655B699qMEtra1WlXeE6WIvdEG481JvRxULKsInq/iNysw==}
engines: {node: '>=0.10.0'}
@ -8628,6 +8663,10 @@ packages:
sprintf-js@1.1.3:
resolution: {integrity: sha512-Oo+0REFV59/rz3gfJNKQiBlwfHaSESl1pcGyABQsnnIfWOFt6JNj5gCog2U6MLZ//IGYD+nA8nI+mTShREReaA==}
sqlstring@2.3.3:
resolution: {integrity: sha512-qC9iz2FlN7DQl3+wjwn3802RTyjCx7sDvfQEXchwa6CWOx07/WVfh91gBmQ9fahw8snwGEWU3xGzOt4tFyHLxg==}
engines: {node: '>= 0.6'}
stable-hash@0.0.5:
resolution: {integrity: sha512-+L3ccpzibovGXFK+Ap/f8LOS0ahMrHTf3xu7mMLSpEGU0EO9ucaysSylKo9eRDFNhWve/y275iPmIZ4z39a9iA==}
@ -14567,6 +14606,8 @@ snapshots:
dependencies:
possible-typed-array-names: 1.1.0
aws-ssl-profiles@1.1.2: {}
axe-core@4.11.1: {}
axobject-query@4.1.0: {}
@ -15301,6 +15342,8 @@ snapshots:
delayed-stream@1.0.0: {}
denque@2.1.0: {}
depd@2.0.0: {}
dequal@2.0.3: {}
@ -16561,6 +16604,10 @@ snapshots:
transitivePeerDependencies:
- supports-color
generate-function@2.3.1:
dependencies:
is-property: 1.0.2
generator-function@2.0.1: {}
gensync@1.0.0-beta.2: {}
@ -17149,6 +17196,8 @@ snapshots:
is-promise@4.0.0: {}
is-property@1.0.2: {}
is-regex@1.2.1:
dependencies:
call-bound: 1.0.4
@ -17630,6 +17679,8 @@ snapshots:
lru-cache@7.18.3: {}
lru.min@1.1.4: {}
lucide-react-native@0.563.0(react-native-svg@15.15.1(react-native@0.81.5(@babel/core@7.28.6)(@types/react@19.1.17)(react@19.1.0))(react@19.1.0))(react-native@0.81.5(@babel/core@7.28.6)(@types/react@19.1.17)(react@19.1.0))(react@19.1.0):
dependencies:
react: 19.1.0
@ -18330,12 +18381,28 @@ snapshots:
mute-stream@2.0.0: {}
mysql2@3.16.3:
dependencies:
aws-ssl-profiles: 1.1.2
denque: 2.1.0
generate-function: 2.3.1
iconv-lite: 0.7.2
long: 5.3.2
lru.min: 1.1.4
named-placeholders: 1.1.6
seq-queue: 0.0.5
sqlstring: 2.3.3
mz@2.7.0:
dependencies:
any-promise: 1.3.0
object-assign: 4.1.1
thenify-all: 1.6.0
named-placeholders@1.1.6:
dependencies:
lru.min: 1.1.4
nanoid@3.3.11: {}
napi-postinstall@0.3.4: {}
@ -19691,6 +19758,8 @@ snapshots:
transitivePeerDependencies:
- supports-color
seq-queue@0.0.5: {}
serialize-error@2.1.0: {}
serialize-error@7.0.1:
@ -19993,6 +20062,8 @@ snapshots:
sprintf-js@1.1.3:
optional: true
sqlstring@2.3.3: {}
stable-hash@0.0.5: {}
stack-utils@2.0.6:

View file

@ -3,8 +3,10 @@ import { ServeStaticModule } from "@nestjs/serve-static";
import { LoggerModule } from "nestjs-pino";
import { join } from "node:path";
import { fileURLToPath } from "node:url";
import { EventsGateway } from "./events.gateway.js";
import { AppController } from "./app.controller.js";
import { DatabaseModule } from "./database/database.module.js";
import { GatewayModule } from "./gateway.module.js";
import { TelegramModule } from "./telegram/telegram.module.js";
const __dirname = fileURLToPath(new URL(".", import.meta.url));
const isDev = process.env.NODE_ENV !== "production";
@ -31,8 +33,10 @@ const isDev = process.env.NODE_ENV !== "production";
level: process.env.LOG_LEVEL ?? "info",
},
}),
DatabaseModule,
GatewayModule,
TelegramModule,
],
providers: [EventsGateway],
controllers: [AppController],
})
export class AppModule {}

View file

@ -0,0 +1,15 @@
/**
* Database module for Gateway.
*
* Global module that provides DatabaseService to all other modules.
*/
import { Global, Module } from "@nestjs/common";
import { DatabaseService } from "./database.service.js";
@Global()
@Module({
providers: [DatabaseService],
exports: [DatabaseService],
})
export class DatabaseModule {}

View file

@ -0,0 +1,97 @@
/**
* MySQL database service for Gateway.
*
* Provides a connection pool and query interface.
* Configuration is read from MYSQL_DSN environment variable.
*/
import { Injectable, Logger } from "@nestjs/common";
import type { OnModuleInit, OnModuleDestroy } from "@nestjs/common";
import mysql from "mysql2/promise";
import type { Pool, PoolOptions, RowDataPacket, ResultSetHeader } from "mysql2/promise";
@Injectable()
export class DatabaseService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(DatabaseService.name);
private pool: Pool | null = null;
async onModuleInit(): Promise<void> {
console.log("[DatabaseService] onModuleInit starting...");
const dsn = process.env["MYSQL_DSN"];
if (!dsn) {
console.log("[DatabaseService] MYSQL_DSN not set");
this.logger.warn("MYSQL_DSN not set, database features disabled");
return;
}
try {
console.log("[DatabaseService] Parsing DSN...");
const config = this.parseDsn(dsn);
console.log("[DatabaseService] Creating pool...");
this.pool = mysql.createPool(config);
// Test connection
console.log("[DatabaseService] Testing connection...");
const connection = await this.pool.getConnection();
connection.release();
console.log("[DatabaseService] Connected!");
this.logger.log(`MySQL connected (host=${config.host}, database=${config.database})`);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
console.log("[DatabaseService] Error:", message);
this.logger.error(`Failed to connect to MySQL: ${message}`);
// Graceful degradation: close broken pool, keep this.pool = null
if (this.pool) {
await this.pool.end().catch(() => {});
this.pool = null;
}
}
}
async onModuleDestroy(): Promise<void> {
if (this.pool) {
await this.pool.end();
this.logger.log("MySQL connection pool closed");
}
}
/** Check if database is available */
isAvailable(): boolean {
return this.pool !== null;
}
/** Execute a query and return rows */
async query<T extends RowDataPacket[]>(sql: string, params?: unknown[]): Promise<T> {
if (!this.pool) {
throw new Error("Database not available");
}
const [rows] = await this.pool.query<T>(sql, params);
return rows;
}
/** Execute an insert/update/delete and return result */
async execute(sql: string, params?: unknown[]): Promise<ResultSetHeader> {
if (!this.pool) {
throw new Error("Database not available");
}
const [result] = await this.pool.execute<ResultSetHeader>(sql, params);
return result;
}
/** Parse MySQL DSN string into connection options */
private parseDsn(dsn: string): PoolOptions {
// Format: mysql://user:password@host:port/database
const url = new URL(dsn);
return {
host: url.hostname,
port: url.port ? parseInt(url.port, 10) : 3306,
user: url.username,
password: url.password,
database: url.pathname.slice(1), // Remove leading /
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0,
};
}
}

View file

@ -1,4 +1,4 @@
import { Injectable } from "@nestjs/common";
import { Injectable, Logger } from "@nestjs/common";
import {
WebSocketGateway,
WebSocketServer,
@ -12,7 +12,6 @@ import type {
OnGatewayDisconnect,
} from "@nestjs/websockets";
import type { Server, Socket } from "socket.io";
import { InjectPinoLogger, PinoLogger } from "nestjs-pino";
import {
GatewayEvents,
type RoutedMessage,
@ -22,6 +21,7 @@ import {
type DeviceInfo,
type DeviceType,
} from "@multica/sdk";
import type { VirtualDeviceHandler } from "./types.js";
@Injectable()
@WebSocketGateway({
@ -36,10 +36,7 @@ import {
export class EventsGateway
implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
{
constructor(
@InjectPinoLogger(EventsGateway.name)
private readonly logger: PinoLogger
) {}
private readonly logger = new Logger(EventsGateway.name);
@WebSocketServer()
server!: Server;
@ -48,9 +45,11 @@ export class EventsGateway
private deviceToSocket = new Map<string, string>();
// socketId -> deviceInfo mapping
private socketToDevice = new Map<string, DeviceInfo>();
// Virtual devices (non-socket based, e.g., Telegram)
private virtualDevices = new Map<string, VirtualDeviceHandler>();
afterInit(_server: Server): void {
this.logger.info("WebSocket Gateway initialized");
this.logger.log("WebSocket Gateway initialized");
}
handleConnection(client: Socket): void {
@ -58,16 +57,10 @@ export class EventsGateway
const deviceId = query["deviceId"] as string | undefined;
const deviceType = query["deviceType"] as DeviceType | undefined;
this.logger.debug(
{ socketId: client.id, deviceId, deviceType },
"Incoming connection"
);
this.logger.debug(`Incoming connection: socketId=${client.id}, deviceId=${deviceId}, deviceType=${deviceType}`);
if (!deviceId || !deviceType) {
this.logger.warn(
{ socketId: client.id },
"Missing deviceId or deviceType in query, disconnecting"
);
this.logger.warn(`Missing deviceId or deviceType in query, disconnecting (socketId=${client.id})`);
client.disconnect(true);
return;
}
@ -75,10 +68,7 @@ export class EventsGateway
// Check if deviceId is already in use by another socket
const existingSocketId = this.deviceToSocket.get(deviceId);
if (existingSocketId && existingSocketId !== client.id) {
this.logger.warn(
{ deviceId, existingSocketId },
"Device already registered by another socket, disconnecting"
);
this.logger.warn(`Device already registered by another socket, disconnecting (deviceId=${deviceId}, existingSocketId=${existingSocketId})`);
client.emit(GatewayEvents.REGISTERED, {
success: false,
deviceId,
@ -93,25 +83,19 @@ export class EventsGateway
this.deviceToSocket.set(deviceId, client.id);
this.socketToDevice.set(client.id, deviceInfo);
this.logger.info({ deviceId, deviceType }, "Device connected and registered");
this.logger.log(`Device connected and registered: deviceId=${deviceId}, deviceType=${deviceType}`);
client.emit(GatewayEvents.REGISTERED, { success: true, deviceId });
}
handleDisconnect(client: Socket): void {
const deviceInfo = this.socketToDevice.get(client.id);
if (deviceInfo) {
this.logger.debug(
{ socketId: client.id, deviceId: deviceInfo.deviceId, deviceType: deviceInfo.deviceType },
"Device disconnecting"
);
this.logger.info(
{ deviceId: deviceInfo.deviceId, deviceType: deviceInfo.deviceType },
"Device disconnected"
);
this.logger.debug(`Device disconnecting: socketId=${client.id}, deviceId=${deviceInfo.deviceId}, deviceType=${deviceInfo.deviceType}`);
this.logger.log(`Device disconnected: deviceId=${deviceInfo.deviceId}, deviceType=${deviceInfo.deviceType}`);
this.deviceToSocket.delete(deviceInfo.deviceId);
this.socketToDevice.delete(client.id);
} else {
this.logger.info({ socketId: client.id }, "Socket disconnected");
this.logger.log(`Socket disconnected: socketId=${client.id}`);
}
}
@ -120,10 +104,7 @@ export class EventsGateway
@MessageBody() message: RoutedMessage,
@ConnectedSocket() client: Socket
): void {
this.logger.debug(
{ socketId: client.id, message },
"Received send event"
);
this.logger.debug(`Received send event: socketId=${client.id}, messageId=${message.id}`);
const senderDevice = this.socketToDevice.get(client.id);
@ -149,24 +130,27 @@ export class EventsGateway
return;
}
// Find target device
// Find target device — check socket-based first, then virtual
const targetSocketId = this.deviceToSocket.get(message.to);
if (!targetSocketId) {
const error: SendErrorResponse = {
messageId: message.id,
error: `Device ${message.to} not found`,
code: "DEVICE_NOT_FOUND",
};
client.emit(GatewayEvents.SEND_ERROR, error);
if (targetSocketId) {
this.logger.debug(`Routing message: id=${message.id}, from=${message.from}, to=${message.to}, action=${message.action}`);
this.server.to(targetSocketId).emit(GatewayEvents.RECEIVE, message);
return;
}
// Forward message
this.logger.debug(
{ messageId: message.id, from: message.from, to: message.to, action: message.action },
"Routing message"
);
this.server.to(targetSocketId).emit(GatewayEvents.RECEIVE, message);
const virtualHandler = this.virtualDevices.get(message.to);
if (virtualHandler) {
this.logger.debug(`Routing message to virtual device: id=${message.id}, from=${message.from}, to=${message.to}, action=${message.action}`);
virtualHandler.sendCallback(GatewayEvents.RECEIVE, message);
return;
}
const error: SendErrorResponse = {
messageId: message.id,
error: `Device ${message.to} not found`,
code: "DEVICE_NOT_FOUND",
};
client.emit(GatewayEvents.SEND_ERROR, error);
}
@SubscribeMessage(GatewayEvents.LIST_DEVICES)
@ -185,7 +169,7 @@ export class EventsGateway
@MessageBody() data: PingPayload,
@ConnectedSocket() client: Socket
): PongResponse {
this.logger.debug({ socketId: client.id, data }, "Received ping");
this.logger.debug(`Received ping: socketId=${client.id}`);
return { event: GatewayEvents.PONG, data: "Hello from Gateway!" };
}
@ -201,9 +185,63 @@ export class EventsGateway
/** Send message to specified device (for HTTP API use) */
sendToDevice(deviceId: string, event: string, data: unknown): boolean {
// Check virtual devices first
const virtualHandler = this.virtualDevices.get(deviceId);
if (virtualHandler) {
this.logger.debug(`Routing to virtual device: deviceId=${deviceId}, event=${event}`);
virtualHandler.sendCallback(event, data);
return true;
}
// Fall back to socket-based devices
const socketId = this.deviceToSocket.get(deviceId);
if (!socketId) return false;
this.server.to(socketId).emit(event, data);
return true;
}
/** Register a virtual device (non-socket based) */
registerVirtualDevice(deviceId: string, handler: VirtualDeviceHandler): void {
this.virtualDevices.set(deviceId, handler);
this.logger.log(`Virtual device registered: deviceId=${deviceId}`);
}
/** Unregister a virtual device */
unregisterVirtualDevice(deviceId: string): void {
this.virtualDevices.delete(deviceId);
this.logger.log(`Virtual device unregistered: deviceId=${deviceId}`);
}
/** Check if a device (socket or virtual) is registered */
isDeviceRegistered(deviceId: string): boolean {
return this.deviceToSocket.has(deviceId) || this.virtualDevices.has(deviceId);
}
/** Route a message originating from a virtual device to its target */
routeFromVirtualDevice(message: RoutedMessage): boolean {
// Validate sender is a registered virtual device
if (!this.virtualDevices.has(message.from)) {
this.logger.warn(`routeFromVirtualDevice: sender not a virtual device (from=${message.from})`);
return false;
}
// Try socket-based target first
const targetSocketId = this.deviceToSocket.get(message.to);
if (targetSocketId) {
this.logger.debug(`Virtual device routing: id=${message.id}, from=${message.from}, to=${message.to}, action=${message.action}`);
this.server.to(targetSocketId).emit(GatewayEvents.RECEIVE, message);
return true;
}
// Try virtual device target
const virtualHandler = this.virtualDevices.get(message.to);
if (virtualHandler) {
this.logger.debug(`Virtual device routing (v2v): id=${message.id}, from=${message.from}, to=${message.to}, action=${message.action}`);
virtualHandler.sendCallback(GatewayEvents.RECEIVE, message);
return true;
}
this.logger.warn(`routeFromVirtualDevice: target not found (to=${message.to})`);
return false;
}
}

View file

@ -1,6 +1,7 @@
import { Module } from "@nestjs/common";
import { Global, Module } from "@nestjs/common";
import { EventsGateway } from "./events.gateway.js";
@Global()
@Module({
providers: [EventsGateway],
exports: [EventsGateway],

View file

@ -3,16 +3,27 @@ import { NestFactory } from "@nestjs/core";
import { Logger } from "nestjs-pino";
import { AppModule } from "./app.module.js";
console.log("[Gateway] Starting bootstrap...");
async function bootstrap(): Promise<void> {
const app = await NestFactory.create(AppModule, { bufferLogs: true });
app.useLogger(app.get(Logger));
console.log("[Gateway] Creating NestFactory...");
try {
const app = await NestFactory.create(AppModule, { bufferLogs: true, abortOnError: false });
console.log("[Gateway] NestFactory created");
const port = process.env["PORT"] ?? 3000;
await app.listen(port);
app.useLogger(app.get(Logger));
const logger = app.get(Logger);
logger.log(`Gateway is running on http://localhost:${port}`);
logger.log(`WebSocket endpoint: ws://localhost:${port}/ws`);
const port = process.env["PORT"] ?? 3000;
console.log(`[Gateway] Listening on port ${port}...`);
await app.listen(port);
const logger = app.get(Logger);
logger.log(`Gateway is running on http://localhost:${port}`);
logger.log(`WebSocket endpoint: ws://localhost:${port}/ws`);
} catch (err) {
console.error("[Gateway] Error during bootstrap:", err);
throw err;
}
}
bootstrap().catch((err) => {

View file

@ -0,0 +1,17 @@
-- Telegram users table for Gateway
-- Run this manually before starting the Gateway with Telegram enabled.
DROP TABLE IF EXISTS telegram_users;
CREATE TABLE telegram_users (
telegram_user_id VARCHAR(64) PRIMARY KEY,
hub_id VARCHAR(64) NOT NULL,
agent_id VARCHAR(64) NOT NULL,
device_id VARCHAR(64) NOT NULL UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
telegram_username VARCHAR(255),
telegram_first_name VARCHAR(255),
telegram_last_name VARCHAR(255),
INDEX idx_device_id (device_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

View file

@ -0,0 +1,127 @@
/**
* Telegram user store - MySQL persistence layer.
*/
import { Inject, Injectable, Logger } from "@nestjs/common";
import { v7 as uuidv7 } from "uuid";
import type { RowDataPacket } from "mysql2/promise";
import { DatabaseService } from "../database/database.service.js";
import type { TelegramUser, TelegramUserCreate } from "./types.js";
interface TelegramUserRow extends RowDataPacket {
telegram_user_id: string;
hub_id: string;
agent_id: string;
device_id: string;
created_at: Date;
updated_at: Date;
telegram_username: string | null;
telegram_first_name: string | null;
telegram_last_name: string | null;
}
@Injectable()
export class TelegramUserStore {
private readonly logger = new Logger(TelegramUserStore.name);
constructor(@Inject(DatabaseService) private readonly db: DatabaseService) {}
/** Find user by Telegram user ID */
async findByTelegramUserId(telegramUserId: string): Promise<TelegramUser | null> {
if (!this.db.isAvailable()) return null;
const rows = await this.db.query<TelegramUserRow[]>(
"SELECT * FROM telegram_users WHERE telegram_user_id = ?",
[telegramUserId]
);
if (rows.length === 0) return null;
return this.rowToUser(rows[0]!);
}
/** Find user by device ID */
async findByDeviceId(deviceId: string): Promise<TelegramUser | null> {
if (!this.db.isAvailable()) return null;
const rows = await this.db.query<TelegramUserRow[]>(
"SELECT * FROM telegram_users WHERE device_id = ?",
[deviceId]
);
if (rows.length === 0) return null;
return this.rowToUser(rows[0]!);
}
/** Create or update a Telegram user */
async upsert(data: TelegramUserCreate): Promise<TelegramUser> {
if (!this.db.isAvailable()) {
throw new Error("Database not available");
}
// Check if user exists
const existing = await this.findByTelegramUserId(data.telegramUserId);
if (existing) {
// Update existing user — also update device_id if provided
await this.db.execute(
`UPDATE telegram_users SET
hub_id = ?,
agent_id = ?,
device_id = ?,
telegram_username = ?,
telegram_first_name = ?,
telegram_last_name = ?
WHERE telegram_user_id = ?`,
[
data.hubId,
data.agentId,
data.deviceId ?? existing.deviceId,
data.telegramUsername ?? null,
data.telegramFirstName ?? null,
data.telegramLastName ?? null,
data.telegramUserId,
]
);
const updated = await this.findByTelegramUserId(data.telegramUserId);
return updated!;
}
// Create new user with provided or generated device ID
const deviceId = data.deviceId ?? `tg-${uuidv7()}`;
await this.db.execute(
`INSERT INTO telegram_users (
telegram_user_id, hub_id, agent_id, device_id,
telegram_username, telegram_first_name, telegram_last_name
) VALUES (?, ?, ?, ?, ?, ?, ?)`,
[
data.telegramUserId,
data.hubId,
data.agentId,
deviceId,
data.telegramUsername ?? null,
data.telegramFirstName ?? null,
data.telegramLastName ?? null,
]
);
const created = await this.findByTelegramUserId(data.telegramUserId);
return created!;
}
/** Convert database row to TelegramUser object */
private rowToUser(row: TelegramUserRow): TelegramUser {
return {
telegramUserId: row.telegram_user_id,
hubId: row.hub_id,
agentId: row.agent_id,
deviceId: row.device_id,
createdAt: row.created_at,
updatedAt: row.updated_at,
telegramUsername: row.telegram_username ?? undefined,
telegramFirstName: row.telegram_first_name ?? undefined,
telegramLastName: row.telegram_last_name ?? undefined,
};
}
}

View file

@ -0,0 +1,67 @@
/**
* Telegram webhook controller.
*
* Receives webhook requests from Telegram Bot API.
*/
import { Controller, Inject, Logger, Post, Req, Res, Headers } from "@nestjs/common";
import { TelegramService } from "./telegram.service.js";
// Minimal Express types for webhook handling
interface ExpressRequest {
body: unknown;
header: (name: string) => string | undefined;
}
interface ExpressResponse {
status: (code: number) => ExpressResponse;
json: (data: unknown) => void;
headersSent: boolean;
}
@Controller("telegram")
export class TelegramController {
private readonly logger = new Logger(TelegramController.name);
constructor(@Inject(TelegramService) private readonly telegramService: TelegramService) {}
@Post("webhook")
async handleWebhook(
@Req() req: ExpressRequest,
@Res() res: ExpressResponse,
@Headers("x-telegram-bot-api-secret-token") secretToken?: string
): Promise<void> {
// Check if Telegram is configured
if (!this.telegramService.isConfigured()) {
this.logger.warn("Telegram webhook called but bot not configured");
res.status(503).json({ error: "Telegram not configured" });
return;
}
// Validate secret token if configured
const expectedToken = process.env["TELEGRAM_WEBHOOK_SECRET_TOKEN"];
if (expectedToken && secretToken !== expectedToken) {
this.logger.warn("Invalid Telegram webhook secret token");
res.status(401).json({ error: "Unauthorized" });
return;
}
// Get grammY webhook callback
const callback = this.telegramService.getWebhookCallback();
if (!callback) {
res.status(503).json({ error: "Telegram not configured" });
return;
}
// Let grammY handle the request
try {
await callback(req, res);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
this.logger.error(`Telegram webhook error: ${message}`);
if (!res.headersSent) {
res.status(500).json({ error: "Internal server error" });
}
}
}
}

View file

@ -0,0 +1,17 @@
/**
* Telegram module for Gateway.
*
* Provides Telegram webhook functionality.
*/
import { Module } from "@nestjs/common";
import { TelegramController } from "./telegram.controller.js";
import { TelegramService } from "./telegram.service.js";
import { TelegramUserStore } from "./telegram-user.store.js";
@Module({
controllers: [TelegramController],
providers: [TelegramService, TelegramUserStore],
exports: [TelegramService],
})
export class TelegramModule {}

View file

@ -0,0 +1,402 @@
/**
* Telegram service for Gateway.
*
* Handles Telegram bot interactions via webhook.
* - New users: prompts to paste a multica://connect link
* - Connection link: verifies with Hub via RPC, persists to DB
* - Bound users: routes messages to their Hub agent
*/
import { Inject, Injectable, Logger } from "@nestjs/common";
import type { OnModuleInit } from "@nestjs/common";
import { Bot, webhookCallback } from "grammy";
import type { Context } from "grammy";
import { v7 as uuidv7 } from "uuid";
import { parseConnectionCode } from "@multica/store/connection";
import type { ConnectionInfo } from "@multica/store/connection";
import {
GatewayEvents,
RequestAction,
ResponseAction,
StreamAction,
type RoutedMessage,
type RequestPayload,
type ResponsePayload,
type VerifyParams,
type VerifyResult,
type DeviceMeta,
} from "@multica/sdk";
import type { StreamPayload } from "@multica/sdk";
import { EventsGateway } from "../events.gateway.js";
import { TelegramUserStore } from "./telegram-user.store.js";
import type { TelegramUser } from "./types.js";
// Minimal Express types for webhook handling
interface ExpressRequest {
body: unknown;
header: (name: string) => string | undefined;
}
interface ExpressResponse {
status: (code: number) => ExpressResponse;
json: (data: unknown) => void;
headersSent: boolean;
}
interface PendingRequest<T = unknown> {
resolve: (value: T) => void;
reject: (reason: Error) => void;
timer: ReturnType<typeof setTimeout>;
}
const VERIFY_TIMEOUT_MS = 30_000;
@Injectable()
export class TelegramService implements OnModuleInit {
private bot: Bot | null = null;
private pendingRequests = new Map<string, PendingRequest>();
private readonly logger = new Logger(TelegramService.name);
constructor(
@Inject(EventsGateway) private readonly eventsGateway: EventsGateway,
@Inject(TelegramUserStore) private readonly userStore: TelegramUserStore,
) {}
async onModuleInit(): Promise<void> {
console.log("[TelegramService] onModuleInit starting...");
const token = process.env["TELEGRAM_BOT_TOKEN"];
if (!token) {
console.log("[TelegramService] No bot token");
this.logger.warn("TELEGRAM_BOT_TOKEN not set, Telegram webhook disabled");
return;
}
console.log("[TelegramService] Creating bot...");
this.bot = new Bot(token);
this.setupHandlers();
this.logger.log("Telegram bot initialized");
}
/** Get grammY webhook callback for Express/NestJS */
getWebhookCallback(): ((req: ExpressRequest, res: ExpressResponse) => Promise<void>) | null {
if (!this.bot) return null;
const secretToken = process.env["TELEGRAM_WEBHOOK_SECRET_TOKEN"];
if (secretToken) {
return webhookCallback(this.bot, "express", { secretToken }) as unknown as (
req: ExpressRequest,
res: ExpressResponse
) => Promise<void>;
}
return webhookCallback(this.bot, "express") as unknown as (
req: ExpressRequest,
res: ExpressResponse
) => Promise<void>;
}
/** Check if Telegram bot is configured */
isConfigured(): boolean {
return this.bot !== null;
}
/** Send message to a Telegram user by device ID */
async sendToTelegram(deviceId: string, text: string): Promise<void> {
if (!this.bot) return;
const user = await this.userStore.findByDeviceId(deviceId);
if (!user) {
this.logger.warn(`Telegram user not found for device: deviceId=${deviceId}`);
return;
}
try {
await this.bot.api.sendMessage(Number(user.telegramUserId), text);
this.logger.debug(`Sent message to Telegram: telegramUserId=${user.telegramUserId}`);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
this.logger.error(`Failed to send Telegram message: deviceId=${deviceId}, error=${message}`);
}
}
/** Setup bot message handlers */
private setupHandlers(): void {
if (!this.bot) return;
this.bot.on("message:text", async (ctx) => {
await this.handleTextMessage(ctx);
});
}
/** Handle incoming text message */
private async handleTextMessage(ctx: Context): Promise<void> {
const msg = ctx.message;
if (!msg || !msg.text) return;
const telegramUserId = String(msg.from?.id);
const text = msg.text.trim();
this.logger.debug(`Received Telegram message: telegramUserId=${telegramUserId}, text=${text.slice(0, 50)}`);
// Connection link — always handle, even for already-bound users (re-binding)
if (text.startsWith("multica://connect?")) {
await this.handleConnectionLink(ctx, telegramUserId, text);
return;
}
// Check if user is bound
const user = await this.userStore.findByTelegramUserId(telegramUserId);
if (user) {
await this.routeToHub(user, text, ctx);
return;
}
// New user without connection link
await ctx.reply(
"Welcome to Multica!\n\n" +
"To get started, open the Multica Desktop app, generate a Connection Link, " +
"and paste it here.\n\n" +
"The link looks like:\nmultica://connect?gateway=...&hub=...&agent=...&token=...&exp=..."
);
}
/** Handle a multica://connect? connection link */
private async handleConnectionLink(ctx: Context, telegramUserId: string, text: string): Promise<void> {
const msg = ctx.message;
// 1. Parse and validate the connection link
let connectionInfo: ConnectionInfo;
try {
connectionInfo = parseConnectionCode(text);
} catch (error) {
const message = error instanceof Error ? error.message : "Invalid connection link";
await ctx.reply(`Connection failed: ${message}\n\nPlease generate a new link and try again.`);
return;
}
// 2. Check Hub is online
if (!this.eventsGateway.isDeviceRegistered(connectionInfo.hubId)) {
await ctx.reply(
"Connection failed: Hub is not online.\n\n" +
"Make sure the Multica Desktop app is running and connected to the Gateway, then try again."
);
return;
}
// 3. Unregister old virtual device if user is re-binding
const existingUser = await this.userStore.findByTelegramUserId(telegramUserId);
if (existingUser && this.eventsGateway.isDeviceRegistered(existingUser.deviceId)) {
this.eventsGateway.unregisterVirtualDevice(existingUser.deviceId);
}
// 4. Generate device ID and register virtual device
const deviceId = `tg-${uuidv7()}`;
this.registerVirtualDeviceForUser(deviceId, telegramUserId);
// 5. Send verify RPC
try {
await ctx.reply("Connecting... Please approve the connection on your Desktop app.");
const result = await this.sendVerifyRpc(
deviceId,
connectionInfo.hubId,
connectionInfo.token,
{
platform: "telegram",
clientName: msg?.from?.username
? `Telegram @${msg.from.username}`
: `Telegram ${msg?.from?.first_name ?? telegramUserId}`,
}
);
// 6. Save to DB
await this.userStore.upsert({
telegramUserId,
hubId: connectionInfo.hubId,
agentId: connectionInfo.agentId,
deviceId,
telegramUsername: msg?.from?.username,
telegramFirstName: msg?.from?.first_name,
telegramLastName: msg?.from?.last_name,
});
await ctx.reply(
"Connected successfully!\n\n" +
`Hub: ${result.hubId}\n` +
`Agent: ${result.agentId}\n\n` +
"You can now send messages to interact with your agent."
);
this.logger.log(`Telegram user verified: telegramUserId=${telegramUserId}, hubId=${connectionInfo.hubId}, deviceId=${deviceId}`);
} catch (error) {
// Cleanup virtual device on failure
this.eventsGateway.unregisterVirtualDevice(deviceId);
// Reject all pending requests for this device
this.cleanupPendingRequests();
const message = error instanceof Error ? error.message : String(error);
if (message.includes("REJECTED")) {
await ctx.reply("Connection rejected.\n\nThe connection was declined on the Desktop app.");
} else if (message.includes("timed out")) {
await ctx.reply("Connection timed out.\n\nPlease try again and approve the connection on your Desktop app within 30 seconds.");
} else {
await ctx.reply(`Connection failed: ${message}\n\nPlease try again.`);
}
this.logger.warn(`Telegram verify failed: telegramUserId=${telegramUserId}, error=${message}`);
}
}
/** Send a verify RPC to Hub via the virtual device */
private sendVerifyRpc(
deviceId: string,
hubId: string,
token: string,
meta: DeviceMeta,
): Promise<VerifyResult> {
return new Promise<VerifyResult>((resolve, reject) => {
const requestId = uuidv7();
const timer = setTimeout(() => {
this.pendingRequests.delete(requestId);
reject(new Error("Verify request timed out"));
}, VERIFY_TIMEOUT_MS);
this.pendingRequests.set(requestId, {
resolve: resolve as (v: unknown) => void,
reject,
timer,
});
const payload: RequestPayload<VerifyParams> = {
requestId,
method: "verify",
params: { token, meta },
};
const message: RoutedMessage<RequestPayload<VerifyParams>> = {
id: uuidv7(),
uid: null,
from: deviceId,
to: hubId,
action: RequestAction,
payload,
};
const sent = this.eventsGateway.routeFromVirtualDevice(message);
if (!sent) {
this.pendingRequests.delete(requestId);
clearTimeout(timer);
reject(new Error("Failed to route verify request to Hub"));
}
});
}
/** Route a regular chat message to the user's Hub agent */
private async routeToHub(user: TelegramUser, text: string, ctx: Context): Promise<void> {
// Ensure Hub is online
if (!this.eventsGateway.isDeviceRegistered(user.hubId)) {
await ctx.reply(
"Your Hub is currently offline.\n\n" +
"Make sure the Multica Desktop app is running and connected to the Gateway."
);
return;
}
// Ensure virtual device is registered (may have been lost on gateway restart)
if (!this.eventsGateway.isDeviceRegistered(user.deviceId)) {
this.registerVirtualDeviceForUser(user.deviceId, user.telegramUserId);
}
// Send message to Hub
const message: RoutedMessage = {
id: uuidv7(),
uid: null,
from: user.deviceId,
to: user.hubId,
action: "message",
payload: { agentId: user.agentId, content: text },
};
const sent = this.eventsGateway.routeFromVirtualDevice(message);
if (!sent) {
await ctx.reply("Failed to send message. Please try again.");
return;
}
this.logger.debug(`Routed message to Hub: deviceId=${user.deviceId}, hubId=${user.hubId}, agentId=${user.agentId}`);
}
/** Register a virtual device with a sendCallback that handles RPC responses, stream events, and messages */
private registerVirtualDeviceForUser(deviceId: string, telegramUserId: string): void {
this.eventsGateway.registerVirtualDevice(deviceId, {
sendCallback: (_event: string, data: unknown) => {
const msg = data as RoutedMessage;
if (!msg || !msg.action) return;
// RPC response — resolve/reject pending request
if (msg.action === ResponseAction) {
const response = msg.payload as ResponsePayload;
const pending = this.pendingRequests.get(response.requestId);
if (pending) {
this.pendingRequests.delete(response.requestId);
clearTimeout(pending.timer);
if (response.ok) {
pending.resolve(response.payload);
} else {
pending.reject(new Error(`RPC error [${response.error.code}]: ${response.error.message}`));
}
}
return;
}
// Stream event — extract text content for Telegram
if (msg.action === StreamAction) {
const streamPayload = msg.payload as StreamPayload;
const event = streamPayload?.event;
if (event && "type" in event && event.type === "message_end") {
// Extract final text from the message
const agentMsg = (event as { message?: { content?: Array<{ type: string; text?: string }> } }).message;
if (agentMsg?.content) {
const textContent = agentMsg.content
.filter((c) => c.type === "text" && c.text)
.map((c) => c.text!)
.join("");
if (textContent) {
this.sendToTelegram(deviceId, textContent);
}
}
}
return;
}
// Regular message (e.g., "message" action from Hub)
if (msg.action === "message") {
const payload = msg.payload as { content?: string; agentId?: string };
if (payload?.content) {
this.sendToTelegram(deviceId, payload.content);
}
return;
}
// Error messages
if (msg.action === "error") {
const payload = msg.payload as { message?: string; code?: string };
if (payload?.message) {
this.sendToTelegram(deviceId, `Error: ${payload.message}`);
}
}
},
});
}
/** Cleanup all pending requests (used on verify failure) */
private cleanupPendingRequests(): void {
for (const [id, pending] of this.pendingRequests) {
clearTimeout(pending.timer);
pending.reject(new Error("Cleaned up"));
this.pendingRequests.delete(id);
}
}
}

View file

@ -0,0 +1,30 @@
/**
* Telegram user types for Gateway.
*/
/** Telegram user record stored in database */
export interface TelegramUser {
telegramUserId: string;
hubId: string;
agentId: string;
deviceId: string;
createdAt: Date;
updatedAt: Date;
telegramUsername?: string | undefined;
telegramFirstName?: string | undefined;
telegramLastName?: string | undefined;
}
/** Data required to create/update a Telegram user */
export interface TelegramUserCreate {
telegramUserId: string;
hubId: string;
agentId: string;
deviceId?: string;
telegramUsername?: string | undefined;
telegramFirstName?: string | undefined;
telegramLastName?: string | undefined;
}
// Re-export VirtualDeviceHandler from parent for convenience
export type { VirtualDeviceHandler } from "../types.js";

29
src/gateway/test-app.ts Normal file
View file

@ -0,0 +1,29 @@
import "reflect-metadata";
import { NestFactory } from "@nestjs/core";
import { Module } from "@nestjs/common";
import { LoggerModule } from "nestjs-pino";
import { DatabaseModule } from "./database/database.module.js";
import { GatewayModule } from "./gateway.module.js";
import { TelegramModule } from "./telegram/telegram.module.js";
@Module({
imports: [
LoggerModule.forRoot({ pinoHttp: { level: "debug" } }),
DatabaseModule,
GatewayModule,
TelegramModule,
],
})
class TestAppModule {}
console.log("Creating test app...");
NestFactory.create(TestAppModule, { abortOnError: true })
.then(async (app) => {
console.log("Test app created!");
await app.listen(3000);
console.log("Listening on 3000!");
})
.catch((err) => {
console.error("Error:", err);
process.exit(1);
});

8
src/gateway/types.ts Normal file
View file

@ -0,0 +1,8 @@
/**
* Gateway types.
*/
/** Virtual device handler for non-socket devices (e.g., Telegram) */
export interface VirtualDeviceHandler {
sendCallback: (event: string, data: unknown) => void | Promise<void>;
}

View file

@ -14,6 +14,7 @@ export interface DeviceMeta {
userAgent?: string;
platform?: string;
language?: string;
clientName?: string;
}
export interface DeviceEntry {