diff --git a/src/channels/inbound-debouncer.ts b/src/channels/inbound-debouncer.ts new file mode 100644 index 00000000..0ff9213f --- /dev/null +++ b/src/channels/inbound-debouncer.ts @@ -0,0 +1,93 @@ +/** + * Inbound message debouncer — batches rapid-fire messages from the same + * conversation into a single agent.write() call. + * + * When a message arrives: + * 1. Start a timer (delayMs, default 500ms) + * 2. If another message from the same conversationId arrives before timer fires, + * reset the timer and append the text + * 3. If maxWaitMs (default 2000ms) has elapsed since the first message, + * fire immediately regardless of timer + * 4. When timer fires, call the flush callback with all accumulated text + * + * This prevents rapid-fire messages from triggering multiple separate Agent + * runs. Instead, messages sent within a short window are concatenated with + * newlines and dispatched as one combined prompt. + * + * Inspired by OpenClaw's createInboundDebouncer pattern. + * @see docs/channel/openclaw-research.md — Section 7.3 message preprocessing + */ + +interface PendingBatch { + /** Accumulated message texts in arrival order */ + texts: string[]; + /** Timestamp of the first message in this batch */ + firstArrival: number; + /** Idle timer — fires when no new message arrives within delayMs */ + timer: ReturnType; +} + +export class InboundDebouncer { + private pending = new Map(); + + /** + * @param flushFn - Called when a batch is ready; receives conversationId and combined text + * @param delayMs - Idle window: how long to wait after each message before flushing (default 500ms) + * @param maxWaitMs - Hard cap: max time since first message before force-flushing (default 2000ms) + */ + constructor( + private readonly flushFn: (conversationId: string, combinedText: string) => void, + private readonly delayMs = 500, + private readonly maxWaitMs = 2000, + ) {} + + /** Add a message to the buffer. May trigger an immediate flush if maxWaitMs exceeded. */ + push(conversationId: string, text: string): void { + const existing = this.pending.get(conversationId); + + if (existing) { + // Append to existing batch, reset idle timer + existing.texts.push(text); + clearTimeout(existing.timer); + + // Check hard cap: if we've been buffering too long, flush now + const elapsed = Date.now() - existing.firstArrival; + if (elapsed >= this.maxWaitMs) { + this.flush(conversationId); + return; + } + + // Reset idle timer + existing.timer = setTimeout(() => this.flush(conversationId), this.delayMs); + } else { + // Start a new batch + const timer = setTimeout(() => this.flush(conversationId), this.delayMs); + this.pending.set(conversationId, { + texts: [text], + firstArrival: Date.now(), + timer, + }); + } + } + + /** Flush all pending messages for a conversation, invoking the flush callback */ + private flush(conversationId: string): void { + const batch = this.pending.get(conversationId); + if (!batch) return; + + clearTimeout(batch.timer); + this.pending.delete(conversationId); + + // Join multiple messages with newlines so the Agent sees them as one prompt + const combined = batch.texts.join("\n"); + this.flushFn(conversationId, combined); + } + + /** Clean up all pending timers (call on shutdown) */ + dispose(): void { + for (const batch of this.pending.values()) { + clearTimeout(batch.timer); + } + this.pending.clear(); + } +} diff --git a/src/channels/manager.ts b/src/channels/manager.ts index e54b97ad..213b6dc8 100644 --- a/src/channels/manager.ts +++ b/src/channels/manager.ts @@ -26,6 +26,7 @@ import type { AsyncAgent } from "../agent/async-agent.js"; import { transcribeAudio } from "../media/transcribe.js"; import { describeImage } from "../media/describe-image.js"; import { describeVideo } from "../media/describe-video.js"; +import { InboundDebouncer } from "./inbound-debouncer.js"; interface AccountHandle { channelId: string; @@ -54,6 +55,12 @@ export class ChannelManager { private aggregator: MessageAggregator | null = null; /** Typing indicator interval (repeats every 5s to keep Telegram typing visible) */ private typingTimer: ReturnType | null = null; + /** + * Inbound message debouncer — batches rapid-fire messages from the same + * conversation into a single agent.write() call. + * Initialized lazily on first message; uses the current agent reference. + */ + private debouncer: InboundDebouncer | null = null; constructor(hub: Hub) { this.hub = hub; @@ -183,6 +190,7 @@ export class ChannelManager { // Handle agent errors — notify the channel user if (event.type === "agent_error") { this.stopTyping(); + this.removeAckReaction(); const errorMsg = (event as { error?: string }).error ?? "Unknown error"; console.error(`[Channels] Agent error: ${errorMsg}`); const route = this.lastRoute; @@ -217,6 +225,7 @@ export class ChannelManager { // Clean up after response complete if (event.type === "message_end" && role === "assistant") { this.stopTyping(); + this.removeAckReaction(); this.aggregator = null; } }); @@ -283,18 +292,25 @@ export class ChannelManager { console.log(`[Channels] lastRoute updated → ${plugin.id}:${conversationId}`); console.log(`[Channels] Forwarding to agent ${agent.sessionId}`); - // Show typing indicator while agent processes + // Show typing indicator and ACK reaction while agent processes this.startTyping(); + this.addAckReaction(); - // Handle media messages + // Handle media messages (processed async, then fed through debouncer) if (message.media && plugin.downloadMedia) { void this.routeMedia(plugin, accountId, message, agent); } else { - agent.write(text); + // Text messages go through debouncer to batch rapid-fire sends + this.getDebouncer(agent).push(conversationId, text); } } - /** Download media file, process it, and forward result to agent */ + /** + * Download media file, process it (transcribe/describe), and forward + * the resulting text through the debouncer to the agent. + * Media results are also debounced so that a rapid "photo + text" combo + * from the same conversation gets batched into one agent prompt. + */ private async routeMedia( plugin: ChannelPlugin, accountId: string, @@ -302,6 +318,7 @@ export class ChannelManager { agent: AsyncAgent, ): Promise { const media = message.media!; + const debouncer = this.getDebouncer(agent); try { const filePath = await plugin.downloadMedia!(media.fileId, accountId); @@ -312,12 +329,12 @@ export class ChannelManager { if (description) { const parts = ["[Image]", `Description: ${description}`]; if (media.caption) parts.push(`Caption: ${media.caption}`); - agent.write(parts.join("\n")); + debouncer.push(message.conversationId, parts.join("\n")); } else { // No API key — fall back to file path const parts = ["[image message received]", `File: ${filePath}`]; if (media.caption) parts.push(`Caption: ${media.caption}`); - agent.write(parts.join("\n")); + debouncer.push(message.conversationId, parts.join("\n")); } } else if (media.type === "audio") { // Audio: transcribe via Whisper API before reaching agent @@ -325,14 +342,14 @@ export class ChannelManager { if (transcript) { const parts = ["[Voice Message]", `Transcript: ${transcript}`]; if (media.caption) parts.push(`Caption: ${media.caption}`); - agent.write(parts.join("\n")); + debouncer.push(message.conversationId, parts.join("\n")); } else { // No API key configured — fall back to file path const parts = ["[audio message received]", `File: ${filePath}`]; if (media.mimeType) parts.push(`Type: ${media.mimeType}`); if (media.duration) parts.push(`Duration: ${media.duration}s`); if (media.caption) parts.push(`Caption: ${media.caption}`); - agent.write(parts.join("\n")); + debouncer.push(message.conversationId, parts.join("\n")); } } else if (media.type === "video") { // Video: extract frame + describe via Vision API @@ -341,14 +358,14 @@ export class ChannelManager { const parts = ["[Video]", `Description: ${description}`]; if (media.duration) parts.push(`Duration: ${media.duration}s`); if (media.caption) parts.push(`Caption: ${media.caption}`); - agent.write(parts.join("\n")); + debouncer.push(message.conversationId, parts.join("\n")); } else { // ffmpeg unavailable or no API key — fall back to file path const parts = ["[video message received]", `File: ${filePath}`]; if (media.mimeType) parts.push(`Type: ${media.mimeType}`); if (media.duration) parts.push(`Duration: ${media.duration}s`); if (media.caption) parts.push(`Caption: ${media.caption}`); - agent.write(parts.join("\n")); + debouncer.push(message.conversationId, parts.join("\n")); } } else { // Document: tell agent the file path @@ -357,15 +374,46 @@ export class ChannelManager { parts.push(`File: ${filePath}`); if (media.mimeType) parts.push(`Type: ${media.mimeType}`); if (media.caption) parts.push(`Caption: ${media.caption}`); - agent.write(parts.join("\n")); + debouncer.push(message.conversationId, parts.join("\n")); } } catch (err) { const msg = err instanceof Error ? err.message : String(err); console.error(`[Channels] Failed to process media: ${msg}`); - agent.write(message.text || `[Failed to process ${media.type}]`); + debouncer.push(message.conversationId, message.text || `[Failed to process ${media.type}]`); } } + /** + * Get or create the inbound debouncer, wired to the given agent. + * The debouncer batches rapid-fire messages by conversationId, then + * calls agent.write() once with the combined text. + */ + private getDebouncer(agent: AsyncAgent): InboundDebouncer { + if (!this.debouncer) { + this.debouncer = new InboundDebouncer( + (_conversationId, combinedText) => { + console.log(`[Channels] Debouncer flushing ${combinedText.length} chars to agent`); + agent.write(combinedText); + }, + ); + } + return this.debouncer; + } + + /** Add 👀 reaction to acknowledge message receipt */ + private addAckReaction(): void { + const route = this.lastRoute; + if (!route?.plugin.outbound.addReaction) return; + void route.plugin.outbound.addReaction(route.deliveryCtx, "👀").catch(() => {}); + } + + /** Remove ACK reaction when processing completes */ + private removeAckReaction(): void { + const route = this.lastRoute; + if (!route?.plugin.outbound.removeReaction) return; + void route.plugin.outbound.removeReaction(route.deliveryCtx).catch(() => {}); + } + /** Start sending typing indicators (repeats every 5s until stopped) */ private startTyping(): void { this.stopTyping(); @@ -389,6 +437,8 @@ export class ChannelManager { stopAll(): void { console.log("[Channels] Stopping all channels..."); this.stopTyping(); + this.debouncer?.dispose(); + this.debouncer = null; if (this.agentUnsubscribe) { this.agentUnsubscribe(); this.agentUnsubscribe = null; diff --git a/src/channels/plugins/telegram.ts b/src/channels/plugins/telegram.ts index ecfa33a5..a4bb02c2 100644 --- a/src/channels/plugins/telegram.ts +++ b/src/channels/plugins/telegram.ts @@ -95,6 +95,32 @@ export const telegramChannel: ChannelPlugin = { const botUsername = botInfo.username; console.log(`[Telegram] Starting bot: @${botUsername} (id=${botId})`); + // ── Sequentialize middleware ── + // Ensures updates from the same chat are processed one at a time, + // preventing race conditions on shared state (e.g. ChannelManager.lastRoute). + // Grammy processes updates concurrently by default — without this, + // two messages arriving near-simultaneously could interleave. + // Lightweight alternative to @grammyjs/runner's sequentialize(). + // @see docs/channel/openclaw-research.md — Grammy middleware pipeline + const chatQueues = new Map>(); + bot.use(async (ctx, next) => { + const chatId = ctx.chat?.id; + if (!chatId) return next(); + + const key = String(chatId); + const prev = chatQueues.get(key) ?? Promise.resolve(); + + // Chain this handler onto the per-chat queue + const current = prev.then(() => next()).catch(() => {}); + chatQueues.set(key, current); + await current; + + // Clean up resolved entries to prevent memory leak + if (chatQueues.get(key) === current) { + chatQueues.delete(key); + } + }); + // Handle text messages bot.on("message:text", (ctx) => { const msg = ctx.message; @@ -263,6 +289,38 @@ export const telegramChannel: ChannelPlugin = { // Best-effort — typing indicator failure is not critical } }, + + async addReaction(ctx: DeliveryContext, emoji: string): Promise { + const bot = bots.get(ctx.accountId); + if (!bot || !ctx.replyToMessageId) return; + + try { + await bot.api.setMessageReaction( + Number(ctx.conversationId), + Number(ctx.replyToMessageId), + // Grammy expects a specific emoji union type; cast since our interface accepts any string + [{ type: "emoji", emoji } as unknown as { type: "emoji"; emoji: "👀" }], + ); + } catch { + // Best-effort — reaction failure is not critical + // (e.g. bot may lack permission in some groups) + } + }, + + async removeReaction(ctx: DeliveryContext): Promise { + const bot = bots.get(ctx.accountId); + if (!bot || !ctx.replyToMessageId) return; + + try { + await bot.api.setMessageReaction( + Number(ctx.conversationId), + Number(ctx.replyToMessageId), + [], // Empty array clears all bot reactions + ); + } catch { + // Best-effort + } + }, }, async downloadMedia(fileId: string, accountId: string): Promise { diff --git a/src/channels/types.ts b/src/channels/types.ts index a1aa58e9..43967759 100644 --- a/src/channels/types.ts +++ b/src/channels/types.ts @@ -98,6 +98,16 @@ export interface ChannelOutboundAdapter { replyText(ctx: DeliveryContext, text: string): Promise; /** Send "typing" indicator (optional, not all platforms support it) */ sendTyping?(ctx: DeliveryContext): Promise; + /** + * Add a reaction emoji to the incoming message (optional). + * Used for ACK feedback — e.g. 👀 to signal "processing started". + */ + addReaction?(ctx: DeliveryContext, emoji: string): Promise; + /** + * Remove reaction from the incoming message (optional). + * Called when processing completes to clear the ACK indicator. + */ + removeReaction?(ctx: DeliveryContext): Promise; } // ─── Channel Plugin ───