feat(gateway): support telegram active conversation sessions

This commit is contained in:
Jiayuan Zhang 2026-02-17 02:30:06 +08:00
parent 16753d719b
commit 708116afb1
7 changed files with 330 additions and 41 deletions

View file

@ -7,6 +7,7 @@ CREATE TABLE telegram_users (
telegram_user_id VARCHAR(64) PRIMARY KEY,
hub_id VARCHAR(64) NOT NULL,
agent_id VARCHAR(64) NOT NULL,
conversation_id VARCHAR(64) NULL,
device_id VARCHAR(64) NOT NULL UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

View file

@ -726,7 +726,7 @@
from: deviceId,
to: targetDeviceId,
action: 'message',
payload: { agentId: targetAgentId, content: text },
payload: { agentId: targetAgentId, conversationId: targetAgentId, content: text },
});
appendMsg('self', text);

View file

@ -18,6 +18,7 @@ interface TelegramUserRow extends RowDataPacket {
telegram_user_id: string;
hub_id: string;
agent_id: string;
conversation_id?: string | null;
device_id: string;
created_at: Date;
updated_at: Date;
@ -38,6 +39,12 @@ export class TelegramUserStore {
constructor(@Inject(DatabaseService) private readonly db: DatabaseService) {}
private hasMissingConversationColumnError(err: unknown): boolean {
if (!err || typeof err !== "object") return false;
const maybe = err as { code?: string; message?: string };
return maybe.code === "ER_BAD_FIELD_ERROR" && (maybe.message ?? "").includes("conversation_id");
}
/** Find user by Telegram user ID */
async findByTelegramUserId(telegramUserId: string): Promise<TelegramUser | null> {
if (!this.db.isAvailable()) {
@ -84,25 +91,51 @@ export class TelegramUserStore {
if (existing) {
// Update existing user — also update device_id if provided
await this.db.execute(
`UPDATE telegram_users SET
hub_id = ?,
agent_id = ?,
device_id = ?,
telegram_username = ?,
telegram_first_name = ?,
telegram_last_name = ?
WHERE telegram_user_id = ?`,
[
data.hubId,
data.agentId,
data.deviceId ?? existing.deviceId,
data.telegramUsername ?? null,
data.telegramFirstName ?? null,
data.telegramLastName ?? null,
data.telegramUserId,
]
);
const nextConversationId = data.conversationId ?? existing.conversationId ?? existing.agentId;
try {
await this.db.execute(
`UPDATE telegram_users SET
hub_id = ?,
agent_id = ?,
conversation_id = ?,
device_id = ?,
telegram_username = ?,
telegram_first_name = ?,
telegram_last_name = ?
WHERE telegram_user_id = ?`,
[
data.hubId,
data.agentId,
nextConversationId,
data.deviceId ?? existing.deviceId,
data.telegramUsername ?? null,
data.telegramFirstName ?? null,
data.telegramLastName ?? null,
data.telegramUserId,
]
);
} catch (err) {
if (!this.hasMissingConversationColumnError(err)) throw err;
await this.db.execute(
`UPDATE telegram_users SET
hub_id = ?,
agent_id = ?,
device_id = ?,
telegram_username = ?,
telegram_first_name = ?,
telegram_last_name = ?
WHERE telegram_user_id = ?`,
[
data.hubId,
data.agentId,
data.deviceId ?? existing.deviceId,
data.telegramUsername ?? null,
data.telegramFirstName ?? null,
data.telegramLastName ?? null,
data.telegramUserId,
]
);
}
const updated = await this.findByTelegramUserId(data.telegramUserId);
return updated!;
@ -110,22 +143,43 @@ export class TelegramUserStore {
// Create new user with provided or generated device ID
const deviceId = data.deviceId ?? `tg-${generateEncryptedId()}`;
const conversationId = data.conversationId ?? data.agentId;
await this.db.execute(
`INSERT INTO telegram_users (
telegram_user_id, hub_id, agent_id, device_id,
telegram_username, telegram_first_name, telegram_last_name
) VALUES (?, ?, ?, ?, ?, ?, ?)`,
[
data.telegramUserId,
data.hubId,
data.agentId,
deviceId,
data.telegramUsername ?? null,
data.telegramFirstName ?? null,
data.telegramLastName ?? null,
]
);
try {
await this.db.execute(
`INSERT INTO telegram_users (
telegram_user_id, hub_id, agent_id, conversation_id, device_id,
telegram_username, telegram_first_name, telegram_last_name
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
[
data.telegramUserId,
data.hubId,
data.agentId,
conversationId,
deviceId,
data.telegramUsername ?? null,
data.telegramFirstName ?? null,
data.telegramLastName ?? null,
]
);
} catch (err) {
if (!this.hasMissingConversationColumnError(err)) throw err;
await this.db.execute(
`INSERT INTO telegram_users (
telegram_user_id, hub_id, agent_id, device_id,
telegram_username, telegram_first_name, telegram_last_name
) VALUES (?, ?, ?, ?, ?, ?, ?)`,
[
data.telegramUserId,
data.hubId,
data.agentId,
deviceId,
data.telegramUsername ?? null,
data.telegramFirstName ?? null,
data.telegramLastName ?? null,
]
);
}
const created = await this.findByTelegramUserId(data.telegramUserId);
return created!;
@ -178,6 +232,7 @@ export class TelegramUserStore {
telegramUserId: data.telegramUserId,
hubId: data.hubId,
agentId: data.agentId,
conversationId: data.conversationId ?? existing?.conversationId ?? data.agentId,
deviceId: data.deviceId ?? existing?.deviceId ?? `tg-${generateEncryptedId()}`,
createdAt: existing?.createdAt ?? now,
updatedAt: now,
@ -198,6 +253,7 @@ export class TelegramUserStore {
telegramUserId: row.telegram_user_id,
hubId: row.hub_id,
agentId: row.agent_id,
conversationId: row.conversation_id ?? undefined,
deviceId: row.device_id,
createdAt: row.created_at,
updatedAt: row.updated_at,

View file

@ -29,7 +29,7 @@ export class TelegramController {
@Post("connect-code")
async createConnectCode(
@Body() body: { gateway: string; hubId: string; agentId: string; token: string; expires: number },
@Body() body: { gateway: string; hubId: string; agentId: string; conversationId?: string; token: string; expires: number },
): Promise<{ code: string; botUsername: string }> {
if (!this.telegramService.isConfigured()) {
throw new HttpException("Telegram bot not configured", HttpStatus.SERVICE_UNAVAILABLE);
@ -45,6 +45,7 @@ export class TelegramController {
gateway: body.gateway,
hubId: body.hubId,
agentId: body.agentId,
...(body.conversationId ? { conversationId: body.conversationId } : {}),
token: body.token,
expires: body.expires,
};

View file

@ -92,6 +92,14 @@ interface GenerateChannelWelcomeResult {
text: string;
}
interface ListAgentsResult {
agents: Array<{ id: string; closed: boolean }>;
}
interface CreateAgentResult {
id: string;
}
// ── Constants ──
const VERIFY_TIMEOUT_MS = 30_000;
@ -355,6 +363,21 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy {
await ctx.reply(text, { parse_mode: "HTML", reply_markup: keyboard });
});
this.bot.command("new", async (ctx) => {
if (!this.isPrivateChat(ctx)) return;
await this.handleNewConversationCommand(ctx);
});
this.bot.command("session", async (ctx) => {
if (!this.isPrivateChat(ctx)) return;
await this.handleSessionCommand(ctx, ctx.match?.trim() ?? "");
});
this.bot.command("sessions", async (ctx) => {
if (!this.isPrivateChat(ctx)) return;
await this.handleSessionsCommand(ctx);
});
// Inline button callback queries
this.bot.callbackQuery(CB_HOW_TO_CONNECT, async (ctx) => {
await ctx.answerCallbackQuery();
@ -529,7 +552,8 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy {
const text =
`<b>Welcome back!</b>\n\n` +
`${statusEmoji} Status: <b>${statusText}</b>\n` +
`Agent: <code>${user.agentId}</code>\n\n` +
`Agent: <code>${user.agentId}</code>\n` +
`Session: <code>${user.conversationId ?? user.agentId}</code>\n\n` +
(online
? `Your agent is ready. Just send a message to start chatting.`
: `Your Hub is offline. Make sure the Multica Desktop app is running.`);
@ -563,7 +587,8 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy {
`<b>Connection Status</b>\n\n` +
`${statusEmoji} <b>${statusLabel}</b>\n\n` +
`Hub: <code>${user.hubId}</code>\n` +
`Agent: <code>${user.agentId}</code>\n\n` +
`Agent: <code>${user.agentId}</code>\n` +
`Session: <code>${user.conversationId ?? user.agentId}</code>\n\n` +
(online
? `Your Hub is online and ready to receive messages.`
: `Your Hub is offline. Make sure the Multica Desktop app is running.`);
@ -585,6 +610,9 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy {
`<b>Commands</b>\n` +
` /start \u2014 Connect your account or see welcome\n` +
` /status \u2014 Check connection status\n` +
` /new \u2014 Start a new isolated session\n` +
` /session [id] \u2014 Show or switch current session\n` +
` /sessions \u2014 List available sessions\n` +
` /help \u2014 Show this message\n\n` +
`<b>How to connect</b>\n` +
` <b>1.</b> Open Multica Desktop app\n` +
@ -610,6 +638,9 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy {
await this.bot.api.setMyCommands([
{ command: "start", description: "Connect or show welcome" },
{ command: "status", description: "Check connection status" },
{ command: "new", description: "Create a new session" },
{ command: "session", description: "Show/switch current session" },
{ command: "sessions", description: "List sessions" },
{ command: "help", description: "Show help and instructions" },
]);
@ -677,6 +708,181 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy {
await ctx.reply(welcome.text, { parse_mode: "HTML", reply_markup: welcome.keyboard });
}
private async handleNewConversationCommand(ctx: Context): Promise<void> {
const telegramUserId = String(ctx.from?.id);
const user = await this.userStore.findByTelegramUserId(telegramUserId);
if (!user) {
await ctx.reply("You are not connected yet. Use /start and connect from Desktop first.");
return;
}
if (!this.eventsGateway.isDeviceRegistered(user.hubId)) {
await ctx.reply(
"Your Hub is currently offline.\n\n" +
"Make sure the Multica Desktop app is running and connected to the Gateway.",
);
return;
}
if (!this.eventsGateway.isDeviceRegistered(user.deviceId)) {
this.registerVirtualDeviceForUser(user.deviceId, user.telegramUserId);
}
try {
const created = await this.sendRpc<Record<string, never>, CreateAgentResult>(
user.deviceId,
user.hubId,
"createAgent",
{},
VERIFY_TIMEOUT_MS,
"Create session request timed out",
);
await this.userStore.upsert({
telegramUserId: user.telegramUserId,
hubId: user.hubId,
agentId: user.agentId,
conversationId: created.id,
deviceId: user.deviceId,
telegramUsername: user.telegramUsername,
telegramFirstName: user.telegramFirstName,
telegramLastName: user.telegramLastName,
});
await ctx.reply(
`<b>\u2705 New session created</b>\n\n` +
`Session: <code>${created.id}</code>\n\n` +
`All next messages in this Telegram chat will use this session.`,
{ parse_mode: "HTML" },
);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
await ctx.reply(`Failed to create session: ${message}`);
}
}
private async handleSessionsCommand(ctx: Context): Promise<void> {
const telegramUserId = String(ctx.from?.id);
const user = await this.userStore.findByTelegramUserId(telegramUserId);
if (!user) {
await ctx.reply("You are not connected yet. Use /start and connect from Desktop first.");
return;
}
if (!this.eventsGateway.isDeviceRegistered(user.hubId)) {
await ctx.reply("Your Hub is offline. Open Desktop and reconnect, then try again.");
return;
}
if (!this.eventsGateway.isDeviceRegistered(user.deviceId)) {
this.registerVirtualDeviceForUser(user.deviceId, user.telegramUserId);
}
try {
const result = await this.sendRpc<Record<string, never>, ListAgentsResult>(
user.deviceId,
user.hubId,
"listAgents",
{},
VERIFY_TIMEOUT_MS,
"List sessions request timed out",
);
const sessions = result.agents.filter((item) => !item.closed).map((item) => item.id);
if (sessions.length === 0) {
await ctx.reply("No sessions found.");
return;
}
const current = user.conversationId ?? user.agentId;
const lines = sessions.slice(0, 20).map((id) => {
const marker = id === current ? "\u2022 current" : "";
return `<code>${id}</code>${marker ? ` ${marker}` : ""}`;
});
const extra = sessions.length > 20 ? `\n...and ${sessions.length - 20} more` : "";
await ctx.reply(
`<b>Available sessions</b>\n\n` +
`${lines.join("\n")}${extra}\n\n` +
`Use <code>/session &lt;id&gt;</code> to switch.`,
{ parse_mode: "HTML" },
);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
await ctx.reply(`Failed to load sessions: ${message}`);
}
}
private async handleSessionCommand(ctx: Context, input: string): Promise<void> {
const telegramUserId = String(ctx.from?.id);
const user = await this.userStore.findByTelegramUserId(telegramUserId);
if (!user) {
await ctx.reply("You are not connected yet. Use /start and connect from Desktop first.");
return;
}
const target = input.trim();
const current = user.conversationId ?? user.agentId;
if (!target) {
await ctx.reply(
`<b>Current session</b>\n\n` +
`<code>${current}</code>\n\n` +
`Use <code>/session &lt;id&gt;</code> to switch.`,
{ parse_mode: "HTML" },
);
return;
}
if (!this.eventsGateway.isDeviceRegistered(user.hubId)) {
await ctx.reply("Your Hub is offline. Open Desktop and reconnect, then try again.");
return;
}
if (!this.eventsGateway.isDeviceRegistered(user.deviceId)) {
this.registerVirtualDeviceForUser(user.deviceId, user.telegramUserId);
}
try {
const result = await this.sendRpc<Record<string, never>, ListAgentsResult>(
user.deviceId,
user.hubId,
"listAgents",
{},
VERIFY_TIMEOUT_MS,
"List sessions request timed out",
);
const exists = result.agents.some((item) => item.id === target && !item.closed);
if (!exists) {
await ctx.reply(
`Session not found: <code>${target}</code>\n\nUse /sessions to list available sessions.`,
{ parse_mode: "HTML" },
);
return;
}
await this.userStore.upsert({
telegramUserId: user.telegramUserId,
hubId: user.hubId,
agentId: user.agentId,
conversationId: target,
deviceId: user.deviceId,
telegramUsername: user.telegramUsername,
telegramFirstName: user.telegramFirstName,
telegramLastName: user.telegramLastName,
});
await ctx.reply(
`<b>\u2705 Session switched</b>\n\n` +
`Current session: <code>${target}</code>`,
{ parse_mode: "HTML" },
);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
await ctx.reply(`Failed to switch session: ${message}`);
}
}
// ── Inbound: media messages ──
private async handleMediaMessage(ctx: Context, media: MediaAttachment): Promise<void> {
@ -1073,12 +1279,14 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy {
? `Telegram @${msg.from.username}`
: `Telegram ${msg?.from?.first_name ?? telegramUserId}`,
});
const mainConversationId = connectionInfo.conversationId ?? result.mainConversationId ?? result.agentId;
// 5. Save to DB
await this.userStore.upsert({
telegramUserId,
hubId: connectionInfo.hubId,
agentId: connectionInfo.agentId,
conversationId: mainConversationId,
deviceId,
telegramUsername: msg?.from?.username,
telegramFirstName: msg?.from?.first_name,
@ -1092,7 +1300,8 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy {
await ctx.reply(
`<b>\u2705 Connected successfully!</b>\n\n` +
`Hub: <code>${result.hubId}</code>\n` +
`Agent: <code>${result.agentId}</code>\n\n` +
`Agent: <code>${result.agentId}</code>\n` +
`Session: <code>${mainConversationId}</code>\n\n` +
`You can now send messages to interact with your agent.`,
{ parse_mode: "HTML", reply_markup: successKeyboard },
);
@ -1298,13 +1507,14 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy {
}
// Send message to Hub
const conversationId = user.conversationId ?? user.agentId;
const message: RoutedMessage = {
id: uuidv7(),
uid: null,
from: user.deviceId,
to: user.hubId,
action: "message",
payload: { agentId: user.agentId, content: text },
payload: { agentId: user.agentId, conversationId, content: text },
};
const sent = this.eventsGateway.routeFromVirtualDevice(message);
@ -1314,7 +1524,7 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy {
}
this.logger.debug(
`Routed message to Hub: deviceId=${user.deviceId}, hubId=${user.hubId}, agentId=${user.agentId}`,
`Routed message to Hub: deviceId=${user.deviceId}, hubId=${user.hubId}, agentId=${user.agentId}, conversationId=${conversationId}`,
);
}

View file

@ -7,6 +7,8 @@ export interface TelegramUser {
telegramUserId: string;
hubId: string;
agentId: string;
/** Current active conversation for this Telegram thread. */
conversationId?: string | undefined;
deviceId: string;
createdAt: Date;
updatedAt: Date;
@ -20,6 +22,7 @@ export interface TelegramUserCreate {
telegramUserId: string;
hubId: string;
agentId: string;
conversationId?: string | undefined;
deviceId?: string;
telegramUsername?: string | undefined;
telegramFirstName?: string | undefined;

View file

@ -52,6 +52,24 @@ pnpm dev:local:archive
- `MULTICA_WORKSPACE_DIR`: override workspace root
- `MULTICA_RUN_LOG=1`: enable structured run-log output
## Agent / Conversation Semantics
- `agentId`: logical owner identity (capabilities/profile scope).
- `conversationId`: isolated runtime thread under an agent.
- `sessionId`: runtime/storage id for a conversation (currently same as `conversationId`).
Compatibility behavior:
- If only `agentId` is provided, the runtime resolves `conversationId = agentId`.
- New integrations should pass `conversationId` explicitly.
Telegram behavior:
- One Telegram DM binds to one active `conversationId`.
- `/new` creates and switches to a new conversation.
- `/session <id>` switches the active conversation.
- `/sessions` lists available conversations.
## Local Full-Stack Notes (`pnpm dev:local`)
`pnpm dev:local` is the recommended way to run the full local stack for integration work.