diff --git a/apps/gateway/telegram/message-context-queue.test.ts b/apps/gateway/telegram/message-context-queue.test.ts new file mode 100644 index 00000000..16208389 --- /dev/null +++ b/apps/gateway/telegram/message-context-queue.test.ts @@ -0,0 +1,76 @@ +import assert from "node:assert/strict"; +import { describe, it } from "node:test"; +import { MessageContextQueue } from "./message-context-queue.js"; + +describe("MessageContextQueue", () => { + it("keeps the first context active while newer messages stay pending", () => { + const queue = new MessageContextQueue(); + const deviceId = "device-1"; + + queue.enqueue(deviceId, { telegramChatId: 100, telegramMessageId: 1 }); + queue.enqueue(deviceId, { telegramChatId: 100, telegramMessageId: 2 }); + + assert.deepEqual(queue.activate(deviceId), { + telegramChatId: 100, + telegramMessageId: 1, + }); + assert.deepEqual(queue.peekForSend(deviceId), { + telegramChatId: 100, + telegramMessageId: 1, + }); + + assert.deepEqual(queue.release(deviceId), { + telegramChatId: 100, + telegramMessageId: 1, + }); + assert.deepEqual(queue.peekForSend(deviceId), { + telegramChatId: 100, + telegramMessageId: 2, + }); + }); + + it("releases oldest pending context when a run errors before message_start", () => { + const queue = new MessageContextQueue(); + const deviceId = "device-2"; + + queue.enqueue(deviceId, { telegramChatId: 200, telegramMessageId: 11 }); + queue.enqueue(deviceId, { telegramChatId: 200, telegramMessageId: 12 }); + + // No activate(): simulate agent_error before streaming starts + assert.deepEqual(queue.release(deviceId), { + telegramChatId: 200, + telegramMessageId: 11, + }); + assert.deepEqual(queue.peekForSend(deviceId), { + telegramChatId: 200, + telegramMessageId: 12, + }); + }); + + it("does not advance queue on repeated activate calls during one run", () => { + const queue = new MessageContextQueue(); + const deviceId = "device-3"; + + queue.enqueue(deviceId, { telegramChatId: 300, telegramMessageId: 21 }); + queue.enqueue(deviceId, { telegramChatId: 300, telegramMessageId: 22 }); + + assert.equal(queue.activate(deviceId)?.telegramMessageId, 21); + assert.equal(queue.activate(deviceId)?.telegramMessageId, 21); + + assert.equal(queue.release(deviceId)?.telegramMessageId, 21); + assert.equal(queue.peekForSend(deviceId)?.telegramMessageId, 22); + }); + + it("isolates contexts by device", () => { + const queue = new MessageContextQueue(); + + queue.enqueue("a", { telegramChatId: 1, telegramMessageId: 1 }); + queue.enqueue("b", { telegramChatId: 2, telegramMessageId: 2 }); + + assert.equal(queue.activate("a")?.telegramMessageId, 1); + assert.equal(queue.peekForSend("b")?.telegramMessageId, 2); + assert.equal(queue.release("a")?.telegramMessageId, 1); + assert.equal(queue.peekForSend("a"), undefined); + assert.equal(queue.peekForSend("b")?.telegramMessageId, 2); + }); +}); diff --git a/apps/gateway/telegram/message-context-queue.ts b/apps/gateway/telegram/message-context-queue.ts new file mode 100644 index 00000000..fd630bb7 --- /dev/null +++ b/apps/gateway/telegram/message-context-queue.ts @@ -0,0 +1,79 @@ +export interface MessageContext { + telegramChatId: number; + telegramMessageId: number; +} + +/** + * Tracks inbound Telegram messages per device and pairs them with outbound agent runs. + * + * Why queue + active? + * - Pending queue preserves arrival order for rapid-fire user messages. + * - Active context binds the currently running agent response to exactly one message. + */ +export class MessageContextQueue { + private readonly pending = new Map(); + private readonly active = new Map(); + + enqueue(deviceId: string, context: MessageContext): void { + const queue = this.pending.get(deviceId); + if (queue) { + queue.push(context); + return; + } + this.pending.set(deviceId, [context]); + } + + /** + * Bind the next pending context to the active run for this device. + * If a run is already active, keep it unchanged. + */ + activate(deviceId: string): MessageContext | undefined { + const current = this.active.get(deviceId); + if (current) return current; + + const queue = this.pending.get(deviceId); + if (!queue || queue.length === 0) return undefined; + + const next = queue.shift(); + if (queue.length === 0) { + this.pending.delete(deviceId); + } + if (next) { + this.active.set(deviceId, next); + } + return next; + } + + /** + * Get the context to use for outbound sends. + * Prefer active run context; otherwise fall back to oldest pending. + */ + peekForSend(deviceId: string): MessageContext | undefined { + const current = this.active.get(deviceId); + if (current) return current; + + const queue = this.pending.get(deviceId); + return queue?.[0]; + } + + /** + * Release one context after a run completes/errors. + * Prefer active context; if none active, release oldest pending. + */ + release(deviceId: string): MessageContext | undefined { + const current = this.active.get(deviceId); + if (current) { + this.active.delete(deviceId); + return current; + } + + const queue = this.pending.get(deviceId); + if (!queue || queue.length === 0) return undefined; + + const next = queue.shift(); + if (queue.length === 0) { + this.pending.delete(deviceId); + } + return next; + } +} diff --git a/apps/gateway/telegram/telegram.service.ts b/apps/gateway/telegram/telegram.service.ts index e4aedcf9..3fa3cc8e 100644 --- a/apps/gateway/telegram/telegram.service.ts +++ b/apps/gateway/telegram/telegram.service.ts @@ -46,6 +46,10 @@ import { TelegramUserStore } from "./telegram-user.store.js"; import type { TelegramUser } from "./types.js"; import { markdownToTelegramHtml } from "./telegram-format.js"; import { ShortCodeStore } from "./short-code-store.js"; +import { + MessageContextQueue, + type MessageContext, +} from "./message-context-queue.js"; // ── Types ── @@ -67,12 +71,6 @@ interface PendingRequest { timer: ReturnType; } -/** Tracks the originating Telegram message for reply_to and reaction cleanup */ -interface MessageContext { - telegramChatId: number; - telegramMessageId: number; -} - /** Media attachment extracted from a Telegram message */ interface MediaAttachment { type: "audio" | "image" | "video" | "document"; @@ -182,8 +180,11 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy { private pendingRequests = new Map(); /** Typing indicator timers, keyed by deviceId */ private typingTimers = new Map>(); - /** Tracks the originating message for reply_to & reaction cleanup, keyed by deviceId */ - private messageContexts = new Map(); + /** + * Per-device inbound message contexts. + * Queue preserves arrival order; active binds the current run's reply target. + */ + private readonly messageContexts = new MessageContextQueue(); /** Deduplicate welcome sends when Telegram replays updates in a short window */ private welcomeSentAt = new Map(); @@ -818,7 +819,7 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy { } // Use chatId from message context (supports groups); fall back to user ID (private chat) - const context = this.messageContexts.get(deviceId); + const context = this.messageContexts.peekForSend(deviceId); const chatId = context?.telegramChatId ?? Number(user.telegramUserId); const chunks = chunkText(text); @@ -877,7 +878,7 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy { const user = await this.userStore.findByDeviceId(deviceId); if (!user) return; - const context = this.messageContexts.get(deviceId); + const context = this.messageContexts.peekForSend(deviceId); const chatId = context?.telegramChatId ?? Number(user.telegramUserId); const inputFile = new InputFile(data, filename); @@ -966,7 +967,7 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy { private startTyping(deviceId: string): void { if (this.typingTimers.has(deviceId)) return; - const context = this.messageContexts.get(deviceId); + const context = this.messageContexts.peekForSend(deviceId); if (!context) return; const chatId = context.telegramChatId; @@ -996,18 +997,22 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy { // ── Message context tracking ── private storeMessageContext(deviceId: string, chatId: number, messageId: number): void { - this.messageContexts.set(deviceId, { + this.messageContexts.enqueue(deviceId, { telegramChatId: chatId, telegramMessageId: messageId, }); } + /** Bind the oldest pending context to the currently running agent response. */ + private activateMessageContext(deviceId: string): MessageContext | undefined { + return this.messageContexts.activate(deviceId); + } + /** Remove context and 👀 reaction for a device after response is sent */ private async clearMessageContext(deviceId: string): Promise { - const context = this.messageContexts.get(deviceId); + const context = this.messageContexts.release(deviceId); if (context) { await this.removeReaction(context.telegramChatId, context.telegramMessageId); - this.messageContexts.delete(deviceId); } } @@ -1353,6 +1358,7 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy { // Start typing when LLM begins generating if (event.type === "message_start") { + this.activateMessageContext(deviceId); this.startTyping(deviceId); return; } @@ -1373,7 +1379,7 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy { if (narration) { void this.sendToTelegram(deviceId, narration).then(() => { // Re-send typing indicator — Telegram clears it when a message is sent - const ctx = this.messageContexts.get(deviceId); + const ctx = this.messageContexts.peekForSend(deviceId); if (ctx) { void this.bot?.api.sendChatAction(ctx.telegramChatId, "typing").catch(() => {}); } @@ -1431,7 +1437,9 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy { if (msg.action === "message") { const payload = msg.payload as { content?: string; agentId?: string }; if (payload?.content) { - void this.sendToTelegram(deviceId, payload.content); + void this.sendToTelegram(deviceId, payload.content).then(() => { + void this.clearMessageContext(deviceId); + }); } return; }