From 708116afb1e070eda013bf224651c13bfa2ed3aa Mon Sep 17 00:00:00 2001 From: Jiayuan Zhang Date: Tue, 17 Feb 2026 02:30:06 +0800 Subject: [PATCH] feat(gateway): support telegram active conversation sessions --- apps/gateway/migrations/telegram-users.sql | 1 + apps/gateway/public/index.html | 2 +- apps/gateway/telegram/telegram-user.store.ts | 124 ++++++++--- apps/gateway/telegram/telegram.controller.ts | 3 +- apps/gateway/telegram/telegram.service.ts | 220 ++++++++++++++++++- apps/gateway/telegram/types.ts | 3 + docs/development.md | 18 ++ 7 files changed, 330 insertions(+), 41 deletions(-) diff --git a/apps/gateway/migrations/telegram-users.sql b/apps/gateway/migrations/telegram-users.sql index 19f1a720..376f86ce 100644 --- a/apps/gateway/migrations/telegram-users.sql +++ b/apps/gateway/migrations/telegram-users.sql @@ -7,6 +7,7 @@ CREATE TABLE telegram_users ( telegram_user_id VARCHAR(64) PRIMARY KEY, hub_id VARCHAR(64) NOT NULL, agent_id VARCHAR(64) NOT NULL, + conversation_id VARCHAR(64) NULL, device_id VARCHAR(64) NOT NULL UNIQUE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, diff --git a/apps/gateway/public/index.html b/apps/gateway/public/index.html index 421ba06d..49333633 100644 --- a/apps/gateway/public/index.html +++ b/apps/gateway/public/index.html @@ -726,7 +726,7 @@ from: deviceId, to: targetDeviceId, action: 'message', - payload: { agentId: targetAgentId, content: text }, + payload: { agentId: targetAgentId, conversationId: targetAgentId, content: text }, }); appendMsg('self', text); diff --git a/apps/gateway/telegram/telegram-user.store.ts b/apps/gateway/telegram/telegram-user.store.ts index 3d0a1f13..075e0b91 100644 --- a/apps/gateway/telegram/telegram-user.store.ts +++ b/apps/gateway/telegram/telegram-user.store.ts @@ -18,6 +18,7 @@ interface TelegramUserRow extends RowDataPacket { telegram_user_id: string; hub_id: string; agent_id: string; + conversation_id?: string | null; device_id: string; created_at: Date; updated_at: Date; @@ -38,6 +39,12 @@ export class TelegramUserStore { constructor(@Inject(DatabaseService) private readonly db: DatabaseService) {} + private hasMissingConversationColumnError(err: unknown): boolean { + if (!err || typeof err !== "object") return false; + const maybe = err as { code?: string; message?: string }; + return maybe.code === "ER_BAD_FIELD_ERROR" && (maybe.message ?? "").includes("conversation_id"); + } + /** Find user by Telegram user ID */ async findByTelegramUserId(telegramUserId: string): Promise { if (!this.db.isAvailable()) { @@ -84,25 +91,51 @@ export class TelegramUserStore { if (existing) { // Update existing user — also update device_id if provided - await this.db.execute( - `UPDATE telegram_users SET - hub_id = ?, - agent_id = ?, - device_id = ?, - telegram_username = ?, - telegram_first_name = ?, - telegram_last_name = ? - WHERE telegram_user_id = ?`, - [ - data.hubId, - data.agentId, - data.deviceId ?? existing.deviceId, - data.telegramUsername ?? null, - data.telegramFirstName ?? null, - data.telegramLastName ?? null, - data.telegramUserId, - ] - ); + const nextConversationId = data.conversationId ?? existing.conversationId ?? existing.agentId; + try { + await this.db.execute( + `UPDATE telegram_users SET + hub_id = ?, + agent_id = ?, + conversation_id = ?, + device_id = ?, + telegram_username = ?, + telegram_first_name = ?, + telegram_last_name = ? + WHERE telegram_user_id = ?`, + [ + data.hubId, + data.agentId, + nextConversationId, + data.deviceId ?? existing.deviceId, + data.telegramUsername ?? null, + data.telegramFirstName ?? null, + data.telegramLastName ?? null, + data.telegramUserId, + ] + ); + } catch (err) { + if (!this.hasMissingConversationColumnError(err)) throw err; + await this.db.execute( + `UPDATE telegram_users SET + hub_id = ?, + agent_id = ?, + device_id = ?, + telegram_username = ?, + telegram_first_name = ?, + telegram_last_name = ? + WHERE telegram_user_id = ?`, + [ + data.hubId, + data.agentId, + data.deviceId ?? existing.deviceId, + data.telegramUsername ?? null, + data.telegramFirstName ?? null, + data.telegramLastName ?? null, + data.telegramUserId, + ] + ); + } const updated = await this.findByTelegramUserId(data.telegramUserId); return updated!; @@ -110,22 +143,43 @@ export class TelegramUserStore { // Create new user with provided or generated device ID const deviceId = data.deviceId ?? `tg-${generateEncryptedId()}`; + const conversationId = data.conversationId ?? data.agentId; - await this.db.execute( - `INSERT INTO telegram_users ( - telegram_user_id, hub_id, agent_id, device_id, - telegram_username, telegram_first_name, telegram_last_name - ) VALUES (?, ?, ?, ?, ?, ?, ?)`, - [ - data.telegramUserId, - data.hubId, - data.agentId, - deviceId, - data.telegramUsername ?? null, - data.telegramFirstName ?? null, - data.telegramLastName ?? null, - ] - ); + try { + await this.db.execute( + `INSERT INTO telegram_users ( + telegram_user_id, hub_id, agent_id, conversation_id, device_id, + telegram_username, telegram_first_name, telegram_last_name + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, + [ + data.telegramUserId, + data.hubId, + data.agentId, + conversationId, + deviceId, + data.telegramUsername ?? null, + data.telegramFirstName ?? null, + data.telegramLastName ?? null, + ] + ); + } catch (err) { + if (!this.hasMissingConversationColumnError(err)) throw err; + await this.db.execute( + `INSERT INTO telegram_users ( + telegram_user_id, hub_id, agent_id, device_id, + telegram_username, telegram_first_name, telegram_last_name + ) VALUES (?, ?, ?, ?, ?, ?, ?)`, + [ + data.telegramUserId, + data.hubId, + data.agentId, + deviceId, + data.telegramUsername ?? null, + data.telegramFirstName ?? null, + data.telegramLastName ?? null, + ] + ); + } const created = await this.findByTelegramUserId(data.telegramUserId); return created!; @@ -178,6 +232,7 @@ export class TelegramUserStore { telegramUserId: data.telegramUserId, hubId: data.hubId, agentId: data.agentId, + conversationId: data.conversationId ?? existing?.conversationId ?? data.agentId, deviceId: data.deviceId ?? existing?.deviceId ?? `tg-${generateEncryptedId()}`, createdAt: existing?.createdAt ?? now, updatedAt: now, @@ -198,6 +253,7 @@ export class TelegramUserStore { telegramUserId: row.telegram_user_id, hubId: row.hub_id, agentId: row.agent_id, + conversationId: row.conversation_id ?? undefined, deviceId: row.device_id, createdAt: row.created_at, updatedAt: row.updated_at, diff --git a/apps/gateway/telegram/telegram.controller.ts b/apps/gateway/telegram/telegram.controller.ts index 22bdc6b4..e60c8f4f 100644 --- a/apps/gateway/telegram/telegram.controller.ts +++ b/apps/gateway/telegram/telegram.controller.ts @@ -29,7 +29,7 @@ export class TelegramController { @Post("connect-code") async createConnectCode( - @Body() body: { gateway: string; hubId: string; agentId: string; token: string; expires: number }, + @Body() body: { gateway: string; hubId: string; agentId: string; conversationId?: string; token: string; expires: number }, ): Promise<{ code: string; botUsername: string }> { if (!this.telegramService.isConfigured()) { throw new HttpException("Telegram bot not configured", HttpStatus.SERVICE_UNAVAILABLE); @@ -45,6 +45,7 @@ export class TelegramController { gateway: body.gateway, hubId: body.hubId, agentId: body.agentId, + ...(body.conversationId ? { conversationId: body.conversationId } : {}), token: body.token, expires: body.expires, }; diff --git a/apps/gateway/telegram/telegram.service.ts b/apps/gateway/telegram/telegram.service.ts index 3fa3cc8e..594762d0 100644 --- a/apps/gateway/telegram/telegram.service.ts +++ b/apps/gateway/telegram/telegram.service.ts @@ -92,6 +92,14 @@ interface GenerateChannelWelcomeResult { text: string; } +interface ListAgentsResult { + agents: Array<{ id: string; closed: boolean }>; +} + +interface CreateAgentResult { + id: string; +} + // ── Constants ── const VERIFY_TIMEOUT_MS = 30_000; @@ -355,6 +363,21 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy { await ctx.reply(text, { parse_mode: "HTML", reply_markup: keyboard }); }); + this.bot.command("new", async (ctx) => { + if (!this.isPrivateChat(ctx)) return; + await this.handleNewConversationCommand(ctx); + }); + + this.bot.command("session", async (ctx) => { + if (!this.isPrivateChat(ctx)) return; + await this.handleSessionCommand(ctx, ctx.match?.trim() ?? ""); + }); + + this.bot.command("sessions", async (ctx) => { + if (!this.isPrivateChat(ctx)) return; + await this.handleSessionsCommand(ctx); + }); + // Inline button callback queries this.bot.callbackQuery(CB_HOW_TO_CONNECT, async (ctx) => { await ctx.answerCallbackQuery(); @@ -529,7 +552,8 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy { const text = `Welcome back!\n\n` + `${statusEmoji} Status: ${statusText}\n` + - `Agent: ${user.agentId}\n\n` + + `Agent: ${user.agentId}\n` + + `Session: ${user.conversationId ?? user.agentId}\n\n` + (online ? `Your agent is ready. Just send a message to start chatting.` : `Your Hub is offline. Make sure the Multica Desktop app is running.`); @@ -563,7 +587,8 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy { `Connection Status\n\n` + `${statusEmoji} ${statusLabel}\n\n` + `Hub: ${user.hubId}\n` + - `Agent: ${user.agentId}\n\n` + + `Agent: ${user.agentId}\n` + + `Session: ${user.conversationId ?? user.agentId}\n\n` + (online ? `Your Hub is online and ready to receive messages.` : `Your Hub is offline. Make sure the Multica Desktop app is running.`); @@ -585,6 +610,9 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy { `Commands\n` + ` /start \u2014 Connect your account or see welcome\n` + ` /status \u2014 Check connection status\n` + + ` /new \u2014 Start a new isolated session\n` + + ` /session [id] \u2014 Show or switch current session\n` + + ` /sessions \u2014 List available sessions\n` + ` /help \u2014 Show this message\n\n` + `How to connect\n` + ` 1. Open Multica Desktop app\n` + @@ -610,6 +638,9 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy { await this.bot.api.setMyCommands([ { command: "start", description: "Connect or show welcome" }, { command: "status", description: "Check connection status" }, + { command: "new", description: "Create a new session" }, + { command: "session", description: "Show/switch current session" }, + { command: "sessions", description: "List sessions" }, { command: "help", description: "Show help and instructions" }, ]); @@ -677,6 +708,181 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy { await ctx.reply(welcome.text, { parse_mode: "HTML", reply_markup: welcome.keyboard }); } + private async handleNewConversationCommand(ctx: Context): Promise { + const telegramUserId = String(ctx.from?.id); + const user = await this.userStore.findByTelegramUserId(telegramUserId); + if (!user) { + await ctx.reply("You are not connected yet. Use /start and connect from Desktop first."); + return; + } + + if (!this.eventsGateway.isDeviceRegistered(user.hubId)) { + await ctx.reply( + "Your Hub is currently offline.\n\n" + + "Make sure the Multica Desktop app is running and connected to the Gateway.", + ); + return; + } + + if (!this.eventsGateway.isDeviceRegistered(user.deviceId)) { + this.registerVirtualDeviceForUser(user.deviceId, user.telegramUserId); + } + + try { + const created = await this.sendRpc, CreateAgentResult>( + user.deviceId, + user.hubId, + "createAgent", + {}, + VERIFY_TIMEOUT_MS, + "Create session request timed out", + ); + + await this.userStore.upsert({ + telegramUserId: user.telegramUserId, + hubId: user.hubId, + agentId: user.agentId, + conversationId: created.id, + deviceId: user.deviceId, + telegramUsername: user.telegramUsername, + telegramFirstName: user.telegramFirstName, + telegramLastName: user.telegramLastName, + }); + + await ctx.reply( + `\u2705 New session created\n\n` + + `Session: ${created.id}\n\n` + + `All next messages in this Telegram chat will use this session.`, + { parse_mode: "HTML" }, + ); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + await ctx.reply(`Failed to create session: ${message}`); + } + } + + private async handleSessionsCommand(ctx: Context): Promise { + const telegramUserId = String(ctx.from?.id); + const user = await this.userStore.findByTelegramUserId(telegramUserId); + if (!user) { + await ctx.reply("You are not connected yet. Use /start and connect from Desktop first."); + return; + } + + if (!this.eventsGateway.isDeviceRegistered(user.hubId)) { + await ctx.reply("Your Hub is offline. Open Desktop and reconnect, then try again."); + return; + } + + if (!this.eventsGateway.isDeviceRegistered(user.deviceId)) { + this.registerVirtualDeviceForUser(user.deviceId, user.telegramUserId); + } + + try { + const result = await this.sendRpc, ListAgentsResult>( + user.deviceId, + user.hubId, + "listAgents", + {}, + VERIFY_TIMEOUT_MS, + "List sessions request timed out", + ); + + const sessions = result.agents.filter((item) => !item.closed).map((item) => item.id); + if (sessions.length === 0) { + await ctx.reply("No sessions found."); + return; + } + + const current = user.conversationId ?? user.agentId; + const lines = sessions.slice(0, 20).map((id) => { + const marker = id === current ? "\u2022 current" : ""; + return `${id}${marker ? ` ${marker}` : ""}`; + }); + const extra = sessions.length > 20 ? `\n...and ${sessions.length - 20} more` : ""; + + await ctx.reply( + `Available sessions\n\n` + + `${lines.join("\n")}${extra}\n\n` + + `Use /session <id> to switch.`, + { parse_mode: "HTML" }, + ); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + await ctx.reply(`Failed to load sessions: ${message}`); + } + } + + private async handleSessionCommand(ctx: Context, input: string): Promise { + const telegramUserId = String(ctx.from?.id); + const user = await this.userStore.findByTelegramUserId(telegramUserId); + if (!user) { + await ctx.reply("You are not connected yet. Use /start and connect from Desktop first."); + return; + } + + const target = input.trim(); + const current = user.conversationId ?? user.agentId; + if (!target) { + await ctx.reply( + `Current session\n\n` + + `${current}\n\n` + + `Use /session <id> to switch.`, + { parse_mode: "HTML" }, + ); + return; + } + + if (!this.eventsGateway.isDeviceRegistered(user.hubId)) { + await ctx.reply("Your Hub is offline. Open Desktop and reconnect, then try again."); + return; + } + + if (!this.eventsGateway.isDeviceRegistered(user.deviceId)) { + this.registerVirtualDeviceForUser(user.deviceId, user.telegramUserId); + } + + try { + const result = await this.sendRpc, ListAgentsResult>( + user.deviceId, + user.hubId, + "listAgents", + {}, + VERIFY_TIMEOUT_MS, + "List sessions request timed out", + ); + + const exists = result.agents.some((item) => item.id === target && !item.closed); + if (!exists) { + await ctx.reply( + `Session not found: ${target}\n\nUse /sessions to list available sessions.`, + { parse_mode: "HTML" }, + ); + return; + } + + await this.userStore.upsert({ + telegramUserId: user.telegramUserId, + hubId: user.hubId, + agentId: user.agentId, + conversationId: target, + deviceId: user.deviceId, + telegramUsername: user.telegramUsername, + telegramFirstName: user.telegramFirstName, + telegramLastName: user.telegramLastName, + }); + + await ctx.reply( + `\u2705 Session switched\n\n` + + `Current session: ${target}`, + { parse_mode: "HTML" }, + ); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + await ctx.reply(`Failed to switch session: ${message}`); + } + } + // ── Inbound: media messages ── private async handleMediaMessage(ctx: Context, media: MediaAttachment): Promise { @@ -1073,12 +1279,14 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy { ? `Telegram @${msg.from.username}` : `Telegram ${msg?.from?.first_name ?? telegramUserId}`, }); + const mainConversationId = connectionInfo.conversationId ?? result.mainConversationId ?? result.agentId; // 5. Save to DB await this.userStore.upsert({ telegramUserId, hubId: connectionInfo.hubId, agentId: connectionInfo.agentId, + conversationId: mainConversationId, deviceId, telegramUsername: msg?.from?.username, telegramFirstName: msg?.from?.first_name, @@ -1092,7 +1300,8 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy { await ctx.reply( `\u2705 Connected successfully!\n\n` + `Hub: ${result.hubId}\n` + - `Agent: ${result.agentId}\n\n` + + `Agent: ${result.agentId}\n` + + `Session: ${mainConversationId}\n\n` + `You can now send messages to interact with your agent.`, { parse_mode: "HTML", reply_markup: successKeyboard }, ); @@ -1298,13 +1507,14 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy { } // Send message to Hub + const conversationId = user.conversationId ?? user.agentId; const message: RoutedMessage = { id: uuidv7(), uid: null, from: user.deviceId, to: user.hubId, action: "message", - payload: { agentId: user.agentId, content: text }, + payload: { agentId: user.agentId, conversationId, content: text }, }; const sent = this.eventsGateway.routeFromVirtualDevice(message); @@ -1314,7 +1524,7 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy { } this.logger.debug( - `Routed message to Hub: deviceId=${user.deviceId}, hubId=${user.hubId}, agentId=${user.agentId}`, + `Routed message to Hub: deviceId=${user.deviceId}, hubId=${user.hubId}, agentId=${user.agentId}, conversationId=${conversationId}`, ); } diff --git a/apps/gateway/telegram/types.ts b/apps/gateway/telegram/types.ts index c4bd87fc..734fad78 100644 --- a/apps/gateway/telegram/types.ts +++ b/apps/gateway/telegram/types.ts @@ -7,6 +7,8 @@ export interface TelegramUser { telegramUserId: string; hubId: string; agentId: string; + /** Current active conversation for this Telegram thread. */ + conversationId?: string | undefined; deviceId: string; createdAt: Date; updatedAt: Date; @@ -20,6 +22,7 @@ export interface TelegramUserCreate { telegramUserId: string; hubId: string; agentId: string; + conversationId?: string | undefined; deviceId?: string; telegramUsername?: string | undefined; telegramFirstName?: string | undefined; diff --git a/docs/development.md b/docs/development.md index 896f8d83..86c01b26 100644 --- a/docs/development.md +++ b/docs/development.md @@ -52,6 +52,24 @@ pnpm dev:local:archive - `MULTICA_WORKSPACE_DIR`: override workspace root - `MULTICA_RUN_LOG=1`: enable structured run-log output +## Agent / Conversation Semantics + +- `agentId`: logical owner identity (capabilities/profile scope). +- `conversationId`: isolated runtime thread under an agent. +- `sessionId`: runtime/storage id for a conversation (currently same as `conversationId`). + +Compatibility behavior: + +- If only `agentId` is provided, the runtime resolves `conversationId = agentId`. +- New integrations should pass `conversationId` explicitly. + +Telegram behavior: + +- One Telegram DM binds to one active `conversationId`. +- `/new` creates and switches to a new conversation. +- `/session ` switches the active conversation. +- `/sessions` lists available conversations. + ## Local Full-Stack Notes (`pnpm dev:local`) `pnpm dev:local` is the recommended way to run the full local stack for integration work.