Merge pull request #212 from multica-ai/codex/telegram-reply-context-queue-fix

fix(gateway): preserve Telegram reply context ordering
This commit is contained in:
Jiayuan Zhang 2026-02-17 00:23:41 +08:00 committed by GitHub
commit a1bb77c162
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 179 additions and 16 deletions

View file

@ -0,0 +1,76 @@
import assert from "node:assert/strict";
import { describe, it } from "node:test";
import { MessageContextQueue } from "./message-context-queue.js";
describe("MessageContextQueue", () => {
it("keeps the first context active while newer messages stay pending", () => {
const queue = new MessageContextQueue();
const deviceId = "device-1";
queue.enqueue(deviceId, { telegramChatId: 100, telegramMessageId: 1 });
queue.enqueue(deviceId, { telegramChatId: 100, telegramMessageId: 2 });
assert.deepEqual(queue.activate(deviceId), {
telegramChatId: 100,
telegramMessageId: 1,
});
assert.deepEqual(queue.peekForSend(deviceId), {
telegramChatId: 100,
telegramMessageId: 1,
});
assert.deepEqual(queue.release(deviceId), {
telegramChatId: 100,
telegramMessageId: 1,
});
assert.deepEqual(queue.peekForSend(deviceId), {
telegramChatId: 100,
telegramMessageId: 2,
});
});
it("releases oldest pending context when a run errors before message_start", () => {
const queue = new MessageContextQueue();
const deviceId = "device-2";
queue.enqueue(deviceId, { telegramChatId: 200, telegramMessageId: 11 });
queue.enqueue(deviceId, { telegramChatId: 200, telegramMessageId: 12 });
// No activate(): simulate agent_error before streaming starts
assert.deepEqual(queue.release(deviceId), {
telegramChatId: 200,
telegramMessageId: 11,
});
assert.deepEqual(queue.peekForSend(deviceId), {
telegramChatId: 200,
telegramMessageId: 12,
});
});
it("does not advance queue on repeated activate calls during one run", () => {
const queue = new MessageContextQueue();
const deviceId = "device-3";
queue.enqueue(deviceId, { telegramChatId: 300, telegramMessageId: 21 });
queue.enqueue(deviceId, { telegramChatId: 300, telegramMessageId: 22 });
assert.equal(queue.activate(deviceId)?.telegramMessageId, 21);
assert.equal(queue.activate(deviceId)?.telegramMessageId, 21);
assert.equal(queue.release(deviceId)?.telegramMessageId, 21);
assert.equal(queue.peekForSend(deviceId)?.telegramMessageId, 22);
});
it("isolates contexts by device", () => {
const queue = new MessageContextQueue();
queue.enqueue("a", { telegramChatId: 1, telegramMessageId: 1 });
queue.enqueue("b", { telegramChatId: 2, telegramMessageId: 2 });
assert.equal(queue.activate("a")?.telegramMessageId, 1);
assert.equal(queue.peekForSend("b")?.telegramMessageId, 2);
assert.equal(queue.release("a")?.telegramMessageId, 1);
assert.equal(queue.peekForSend("a"), undefined);
assert.equal(queue.peekForSend("b")?.telegramMessageId, 2);
});
});

View file

@ -0,0 +1,79 @@
export interface MessageContext {
telegramChatId: number;
telegramMessageId: number;
}
/**
* Tracks inbound Telegram messages per device and pairs them with outbound agent runs.
*
* Why queue + active?
* - Pending queue preserves arrival order for rapid-fire user messages.
* - Active context binds the currently running agent response to exactly one message.
*/
export class MessageContextQueue {
private readonly pending = new Map<string, MessageContext[]>();
private readonly active = new Map<string, MessageContext>();
enqueue(deviceId: string, context: MessageContext): void {
const queue = this.pending.get(deviceId);
if (queue) {
queue.push(context);
return;
}
this.pending.set(deviceId, [context]);
}
/**
* Bind the next pending context to the active run for this device.
* If a run is already active, keep it unchanged.
*/
activate(deviceId: string): MessageContext | undefined {
const current = this.active.get(deviceId);
if (current) return current;
const queue = this.pending.get(deviceId);
if (!queue || queue.length === 0) return undefined;
const next = queue.shift();
if (queue.length === 0) {
this.pending.delete(deviceId);
}
if (next) {
this.active.set(deviceId, next);
}
return next;
}
/**
* Get the context to use for outbound sends.
* Prefer active run context; otherwise fall back to oldest pending.
*/
peekForSend(deviceId: string): MessageContext | undefined {
const current = this.active.get(deviceId);
if (current) return current;
const queue = this.pending.get(deviceId);
return queue?.[0];
}
/**
* Release one context after a run completes/errors.
* Prefer active context; if none active, release oldest pending.
*/
release(deviceId: string): MessageContext | undefined {
const current = this.active.get(deviceId);
if (current) {
this.active.delete(deviceId);
return current;
}
const queue = this.pending.get(deviceId);
if (!queue || queue.length === 0) return undefined;
const next = queue.shift();
if (queue.length === 0) {
this.pending.delete(deviceId);
}
return next;
}
}

View file

@ -46,6 +46,10 @@ import { TelegramUserStore } from "./telegram-user.store.js";
import type { TelegramUser } from "./types.js";
import { markdownToTelegramHtml } from "./telegram-format.js";
import { ShortCodeStore } from "./short-code-store.js";
import {
MessageContextQueue,
type MessageContext,
} from "./message-context-queue.js";
// ── Types ──
@ -67,12 +71,6 @@ interface PendingRequest<T = unknown> {
timer: ReturnType<typeof setTimeout>;
}
/** Tracks the originating Telegram message for reply_to and reaction cleanup */
interface MessageContext {
telegramChatId: number;
telegramMessageId: number;
}
/** Media attachment extracted from a Telegram message */
interface MediaAttachment {
type: "audio" | "image" | "video" | "document";
@ -182,8 +180,11 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy {
private pendingRequests = new Map<string, PendingRequest>();
/** Typing indicator timers, keyed by deviceId */
private typingTimers = new Map<string, ReturnType<typeof setInterval>>();
/** Tracks the originating message for reply_to & reaction cleanup, keyed by deviceId */
private messageContexts = new Map<string, MessageContext>();
/**
* Per-device inbound message contexts.
* Queue preserves arrival order; active binds the current run's reply target.
*/
private readonly messageContexts = new MessageContextQueue();
/** Deduplicate welcome sends when Telegram replays updates in a short window */
private welcomeSentAt = new Map<string, number>();
@ -818,7 +819,7 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy {
}
// Use chatId from message context (supports groups); fall back to user ID (private chat)
const context = this.messageContexts.get(deviceId);
const context = this.messageContexts.peekForSend(deviceId);
const chatId = context?.telegramChatId ?? Number(user.telegramUserId);
const chunks = chunkText(text);
@ -877,7 +878,7 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy {
const user = await this.userStore.findByDeviceId(deviceId);
if (!user) return;
const context = this.messageContexts.get(deviceId);
const context = this.messageContexts.peekForSend(deviceId);
const chatId = context?.telegramChatId ?? Number(user.telegramUserId);
const inputFile = new InputFile(data, filename);
@ -966,7 +967,7 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy {
private startTyping(deviceId: string): void {
if (this.typingTimers.has(deviceId)) return;
const context = this.messageContexts.get(deviceId);
const context = this.messageContexts.peekForSend(deviceId);
if (!context) return;
const chatId = context.telegramChatId;
@ -996,18 +997,22 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy {
// ── Message context tracking ──
private storeMessageContext(deviceId: string, chatId: number, messageId: number): void {
this.messageContexts.set(deviceId, {
this.messageContexts.enqueue(deviceId, {
telegramChatId: chatId,
telegramMessageId: messageId,
});
}
/** Bind the oldest pending context to the currently running agent response. */
private activateMessageContext(deviceId: string): MessageContext | undefined {
return this.messageContexts.activate(deviceId);
}
/** Remove context and 👀 reaction for a device after response is sent */
private async clearMessageContext(deviceId: string): Promise<void> {
const context = this.messageContexts.get(deviceId);
const context = this.messageContexts.release(deviceId);
if (context) {
await this.removeReaction(context.telegramChatId, context.telegramMessageId);
this.messageContexts.delete(deviceId);
}
}
@ -1353,6 +1358,7 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy {
// Start typing when LLM begins generating
if (event.type === "message_start") {
this.activateMessageContext(deviceId);
this.startTyping(deviceId);
return;
}
@ -1373,7 +1379,7 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy {
if (narration) {
void this.sendToTelegram(deviceId, narration).then(() => {
// Re-send typing indicator — Telegram clears it when a message is sent
const ctx = this.messageContexts.get(deviceId);
const ctx = this.messageContexts.peekForSend(deviceId);
if (ctx) {
void this.bot?.api.sendChatAction(ctx.telegramChatId, "typing").catch(() => {});
}
@ -1431,7 +1437,9 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy {
if (msg.action === "message") {
const payload = msg.payload as { content?: string; agentId?: string };
if (payload?.content) {
void this.sendToTelegram(deviceId, payload.content);
void this.sendToTelegram(deviceId, payload.content).then(() => {
void this.clearMessageContext(deviceId);
});
}
return;
}