feat(channels): add inbound debouncer, ACK reactions, and sequentialize

- InboundDebouncer: batches rapid-fire messages from the same conversation
  into a single agent.write() call (500ms idle, 2s hard cap)
- ACK reactions: add 👀 emoji on message receipt, remove on completion
  (addReaction/removeReaction on ChannelOutboundAdapter interface)
- Grammy sequentialize middleware: ensures same-chat updates are processed
  in order, preventing race conditions on shared state

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Naiyuan Qing 2026-02-09 11:43:42 +08:00
parent 614d2cfd88
commit 0500dc1d53
4 changed files with 223 additions and 12 deletions

View file

@ -0,0 +1,93 @@
/**
* Inbound message debouncer batches rapid-fire messages from the same
* conversation into a single agent.write() call.
*
* When a message arrives:
* 1. Start a timer (delayMs, default 500ms)
* 2. If another message from the same conversationId arrives before timer fires,
* reset the timer and append the text
* 3. If maxWaitMs (default 2000ms) has elapsed since the first message,
* fire immediately regardless of timer
* 4. When timer fires, call the flush callback with all accumulated text
*
* This prevents rapid-fire messages from triggering multiple separate Agent
* runs. Instead, messages sent within a short window are concatenated with
* newlines and dispatched as one combined prompt.
*
* Inspired by OpenClaw's createInboundDebouncer pattern.
* @see docs/channel/openclaw-research.md Section 7.3 message preprocessing
*/
interface PendingBatch {
/** Accumulated message texts in arrival order */
texts: string[];
/** Timestamp of the first message in this batch */
firstArrival: number;
/** Idle timer — fires when no new message arrives within delayMs */
timer: ReturnType<typeof setTimeout>;
}
export class InboundDebouncer {
private pending = new Map<string, PendingBatch>();
/**
* @param flushFn - Called when a batch is ready; receives conversationId and combined text
* @param delayMs - Idle window: how long to wait after each message before flushing (default 500ms)
* @param maxWaitMs - Hard cap: max time since first message before force-flushing (default 2000ms)
*/
constructor(
private readonly flushFn: (conversationId: string, combinedText: string) => void,
private readonly delayMs = 500,
private readonly maxWaitMs = 2000,
) {}
/** Add a message to the buffer. May trigger an immediate flush if maxWaitMs exceeded. */
push(conversationId: string, text: string): void {
const existing = this.pending.get(conversationId);
if (existing) {
// Append to existing batch, reset idle timer
existing.texts.push(text);
clearTimeout(existing.timer);
// Check hard cap: if we've been buffering too long, flush now
const elapsed = Date.now() - existing.firstArrival;
if (elapsed >= this.maxWaitMs) {
this.flush(conversationId);
return;
}
// Reset idle timer
existing.timer = setTimeout(() => this.flush(conversationId), this.delayMs);
} else {
// Start a new batch
const timer = setTimeout(() => this.flush(conversationId), this.delayMs);
this.pending.set(conversationId, {
texts: [text],
firstArrival: Date.now(),
timer,
});
}
}
/** Flush all pending messages for a conversation, invoking the flush callback */
private flush(conversationId: string): void {
const batch = this.pending.get(conversationId);
if (!batch) return;
clearTimeout(batch.timer);
this.pending.delete(conversationId);
// Join multiple messages with newlines so the Agent sees them as one prompt
const combined = batch.texts.join("\n");
this.flushFn(conversationId, combined);
}
/** Clean up all pending timers (call on shutdown) */
dispose(): void {
for (const batch of this.pending.values()) {
clearTimeout(batch.timer);
}
this.pending.clear();
}
}

View file

@ -26,6 +26,7 @@ import type { AsyncAgent } from "../agent/async-agent.js";
import { transcribeAudio } from "../media/transcribe.js";
import { describeImage } from "../media/describe-image.js";
import { describeVideo } from "../media/describe-video.js";
import { InboundDebouncer } from "./inbound-debouncer.js";
interface AccountHandle {
channelId: string;
@ -54,6 +55,12 @@ export class ChannelManager {
private aggregator: MessageAggregator | null = null;
/** Typing indicator interval (repeats every 5s to keep Telegram typing visible) */
private typingTimer: ReturnType<typeof setInterval> | null = null;
/**
* Inbound message debouncer batches rapid-fire messages from the same
* conversation into a single agent.write() call.
* Initialized lazily on first message; uses the current agent reference.
*/
private debouncer: InboundDebouncer | null = null;
constructor(hub: Hub) {
this.hub = hub;
@ -183,6 +190,7 @@ export class ChannelManager {
// Handle agent errors — notify the channel user
if (event.type === "agent_error") {
this.stopTyping();
this.removeAckReaction();
const errorMsg = (event as { error?: string }).error ?? "Unknown error";
console.error(`[Channels] Agent error: ${errorMsg}`);
const route = this.lastRoute;
@ -217,6 +225,7 @@ export class ChannelManager {
// Clean up after response complete
if (event.type === "message_end" && role === "assistant") {
this.stopTyping();
this.removeAckReaction();
this.aggregator = null;
}
});
@ -283,18 +292,25 @@ export class ChannelManager {
console.log(`[Channels] lastRoute updated → ${plugin.id}:${conversationId}`);
console.log(`[Channels] Forwarding to agent ${agent.sessionId}`);
// Show typing indicator while agent processes
// Show typing indicator and ACK reaction while agent processes
this.startTyping();
this.addAckReaction();
// Handle media messages
// Handle media messages (processed async, then fed through debouncer)
if (message.media && plugin.downloadMedia) {
void this.routeMedia(plugin, accountId, message, agent);
} else {
agent.write(text);
// Text messages go through debouncer to batch rapid-fire sends
this.getDebouncer(agent).push(conversationId, text);
}
}
/** Download media file, process it, and forward result to agent */
/**
* Download media file, process it (transcribe/describe), and forward
* the resulting text through the debouncer to the agent.
* Media results are also debounced so that a rapid "photo + text" combo
* from the same conversation gets batched into one agent prompt.
*/
private async routeMedia(
plugin: ChannelPlugin,
accountId: string,
@ -302,6 +318,7 @@ export class ChannelManager {
agent: AsyncAgent,
): Promise<void> {
const media = message.media!;
const debouncer = this.getDebouncer(agent);
try {
const filePath = await plugin.downloadMedia!(media.fileId, accountId);
@ -312,12 +329,12 @@ export class ChannelManager {
if (description) {
const parts = ["[Image]", `Description: ${description}`];
if (media.caption) parts.push(`Caption: ${media.caption}`);
agent.write(parts.join("\n"));
debouncer.push(message.conversationId, parts.join("\n"));
} else {
// No API key — fall back to file path
const parts = ["[image message received]", `File: ${filePath}`];
if (media.caption) parts.push(`Caption: ${media.caption}`);
agent.write(parts.join("\n"));
debouncer.push(message.conversationId, parts.join("\n"));
}
} else if (media.type === "audio") {
// Audio: transcribe via Whisper API before reaching agent
@ -325,14 +342,14 @@ export class ChannelManager {
if (transcript) {
const parts = ["[Voice Message]", `Transcript: ${transcript}`];
if (media.caption) parts.push(`Caption: ${media.caption}`);
agent.write(parts.join("\n"));
debouncer.push(message.conversationId, parts.join("\n"));
} else {
// No API key configured — fall back to file path
const parts = ["[audio message received]", `File: ${filePath}`];
if (media.mimeType) parts.push(`Type: ${media.mimeType}`);
if (media.duration) parts.push(`Duration: ${media.duration}s`);
if (media.caption) parts.push(`Caption: ${media.caption}`);
agent.write(parts.join("\n"));
debouncer.push(message.conversationId, parts.join("\n"));
}
} else if (media.type === "video") {
// Video: extract frame + describe via Vision API
@ -341,14 +358,14 @@ export class ChannelManager {
const parts = ["[Video]", `Description: ${description}`];
if (media.duration) parts.push(`Duration: ${media.duration}s`);
if (media.caption) parts.push(`Caption: ${media.caption}`);
agent.write(parts.join("\n"));
debouncer.push(message.conversationId, parts.join("\n"));
} else {
// ffmpeg unavailable or no API key — fall back to file path
const parts = ["[video message received]", `File: ${filePath}`];
if (media.mimeType) parts.push(`Type: ${media.mimeType}`);
if (media.duration) parts.push(`Duration: ${media.duration}s`);
if (media.caption) parts.push(`Caption: ${media.caption}`);
agent.write(parts.join("\n"));
debouncer.push(message.conversationId, parts.join("\n"));
}
} else {
// Document: tell agent the file path
@ -357,15 +374,46 @@ export class ChannelManager {
parts.push(`File: ${filePath}`);
if (media.mimeType) parts.push(`Type: ${media.mimeType}`);
if (media.caption) parts.push(`Caption: ${media.caption}`);
agent.write(parts.join("\n"));
debouncer.push(message.conversationId, parts.join("\n"));
}
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
console.error(`[Channels] Failed to process media: ${msg}`);
agent.write(message.text || `[Failed to process ${media.type}]`);
debouncer.push(message.conversationId, message.text || `[Failed to process ${media.type}]`);
}
}
/**
* Get or create the inbound debouncer, wired to the given agent.
* The debouncer batches rapid-fire messages by conversationId, then
* calls agent.write() once with the combined text.
*/
private getDebouncer(agent: AsyncAgent): InboundDebouncer {
if (!this.debouncer) {
this.debouncer = new InboundDebouncer(
(_conversationId, combinedText) => {
console.log(`[Channels] Debouncer flushing ${combinedText.length} chars to agent`);
agent.write(combinedText);
},
);
}
return this.debouncer;
}
/** Add 👀 reaction to acknowledge message receipt */
private addAckReaction(): void {
const route = this.lastRoute;
if (!route?.plugin.outbound.addReaction) return;
void route.plugin.outbound.addReaction(route.deliveryCtx, "👀").catch(() => {});
}
/** Remove ACK reaction when processing completes */
private removeAckReaction(): void {
const route = this.lastRoute;
if (!route?.plugin.outbound.removeReaction) return;
void route.plugin.outbound.removeReaction(route.deliveryCtx).catch(() => {});
}
/** Start sending typing indicators (repeats every 5s until stopped) */
private startTyping(): void {
this.stopTyping();
@ -389,6 +437,8 @@ export class ChannelManager {
stopAll(): void {
console.log("[Channels] Stopping all channels...");
this.stopTyping();
this.debouncer?.dispose();
this.debouncer = null;
if (this.agentUnsubscribe) {
this.agentUnsubscribe();
this.agentUnsubscribe = null;

View file

@ -95,6 +95,32 @@ export const telegramChannel: ChannelPlugin = {
const botUsername = botInfo.username;
console.log(`[Telegram] Starting bot: @${botUsername} (id=${botId})`);
// ── Sequentialize middleware ──
// Ensures updates from the same chat are processed one at a time,
// preventing race conditions on shared state (e.g. ChannelManager.lastRoute).
// Grammy processes updates concurrently by default — without this,
// two messages arriving near-simultaneously could interleave.
// Lightweight alternative to @grammyjs/runner's sequentialize().
// @see docs/channel/openclaw-research.md — Grammy middleware pipeline
const chatQueues = new Map<string, Promise<void>>();
bot.use(async (ctx, next) => {
const chatId = ctx.chat?.id;
if (!chatId) return next();
const key = String(chatId);
const prev = chatQueues.get(key) ?? Promise.resolve();
// Chain this handler onto the per-chat queue
const current = prev.then(() => next()).catch(() => {});
chatQueues.set(key, current);
await current;
// Clean up resolved entries to prevent memory leak
if (chatQueues.get(key) === current) {
chatQueues.delete(key);
}
});
// Handle text messages
bot.on("message:text", (ctx) => {
const msg = ctx.message;
@ -263,6 +289,38 @@ export const telegramChannel: ChannelPlugin = {
// Best-effort — typing indicator failure is not critical
}
},
async addReaction(ctx: DeliveryContext, emoji: string): Promise<void> {
const bot = bots.get(ctx.accountId);
if (!bot || !ctx.replyToMessageId) return;
try {
await bot.api.setMessageReaction(
Number(ctx.conversationId),
Number(ctx.replyToMessageId),
// Grammy expects a specific emoji union type; cast since our interface accepts any string
[{ type: "emoji", emoji } as unknown as { type: "emoji"; emoji: "👀" }],
);
} catch {
// Best-effort — reaction failure is not critical
// (e.g. bot may lack permission in some groups)
}
},
async removeReaction(ctx: DeliveryContext): Promise<void> {
const bot = bots.get(ctx.accountId);
if (!bot || !ctx.replyToMessageId) return;
try {
await bot.api.setMessageReaction(
Number(ctx.conversationId),
Number(ctx.replyToMessageId),
[], // Empty array clears all bot reactions
);
} catch {
// Best-effort
}
},
},
async downloadMedia(fileId: string, accountId: string): Promise<string> {

View file

@ -98,6 +98,16 @@ export interface ChannelOutboundAdapter {
replyText(ctx: DeliveryContext, text: string): Promise<void>;
/** Send "typing" indicator (optional, not all platforms support it) */
sendTyping?(ctx: DeliveryContext): Promise<void>;
/**
* Add a reaction emoji to the incoming message (optional).
* Used for ACK feedback e.g. 👀 to signal "processing started".
*/
addReaction?(ctx: DeliveryContext, emoji: string): Promise<void>;
/**
* Remove reaction from the incoming message (optional).
* Called when processing completes to clear the ACK indicator.
*/
removeReaction?(ctx: DeliveryContext): Promise<void>;
}
// ─── Channel Plugin ───