From f9d1e5c8096a474da88ff9e223c16202219de84f Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Mon, 9 Feb 2026 17:38:42 +0800 Subject: [PATCH 1/5] fix(channels): correct reply routing and ack lifecycle for rapid-fire messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace global lastRoute-based reply targeting with a FIFO route queue (pendingRoutes) that snapshots route + ack targets at each debouncer flush. Use agent_start/agent_end lifecycle instead of message_end to ensure stable routing across multi-turn runs. Track per-message 👀 acks in ackBuffer → activeAcks for precise cleanup. Two-gate typing stop: only stop when all queued runs complete. Co-Authored-By: Claude Opus 4.6 --- src/channels/inbound-debouncer.ts | 25 +++---- src/channels/manager.ts | 113 ++++++++++++++++++++++-------- 2 files changed, 97 insertions(+), 41 deletions(-) diff --git a/src/channels/inbound-debouncer.ts b/src/channels/inbound-debouncer.ts index 0ff9213f..3a8ce075 100644 --- a/src/channels/inbound-debouncer.ts +++ b/src/channels/inbound-debouncer.ts @@ -10,12 +10,9 @@ * fire immediately regardless of timer * 4. When timer fires, call the flush callback with all accumulated text * - * This prevents rapid-fire messages from triggering multiple separate Agent - * runs. Instead, messages sent within a short window are concatenated with - * newlines and dispatched as one combined prompt. - * - * Inspired by OpenClaw's createInboundDebouncer pattern. - * @see docs/channel/openclaw-research.md — Section 7.3 message preprocessing + * Each flush produces an independent agent.write() call with its own reply + * target. Messages arriving while the agent is busy are NOT accumulated — + * they get their own debounce window and independent processing. 
*/ interface PendingBatch { @@ -24,7 +21,7 @@ interface PendingBatch { /** Timestamp of the first message in this batch */ firstArrival: number; /** Idle timer — fires when no new message arrives within delayMs */ - timer: ReturnType; + timer: ReturnType | null; } export class InboundDebouncer { @@ -44,11 +41,12 @@ export class InboundDebouncer { /** Add a message to the buffer. May trigger an immediate flush if maxWaitMs exceeded. */ push(conversationId: string, text: string): void { const existing = this.pending.get(conversationId); + const preview = text.slice(0, 40) + (text.length > 40 ? "..." : ""); if (existing) { - // Append to existing batch, reset idle timer + // Append to existing batch, clear any running timer existing.texts.push(text); - clearTimeout(existing.timer); + if (existing.timer !== null) clearTimeout(existing.timer); // Check hard cap: if we've been buffering too long, flush now const elapsed = Date.now() - existing.firstArrival; @@ -58,9 +56,11 @@ export class InboundDebouncer { } // Reset idle timer + console.log(`[Debouncer] push (appending #${existing.texts.length}): "${preview}"`); existing.timer = setTimeout(() => this.flush(conversationId), this.delayMs); } else { - // Start a new batch + // Start a new batch with idle timer + console.log(`[Debouncer] push (new batch, timer=${this.delayMs}ms): "${preview}"`); const timer = setTimeout(() => this.flush(conversationId), this.delayMs); this.pending.set(conversationId, { texts: [text], @@ -75,18 +75,19 @@ export class InboundDebouncer { const batch = this.pending.get(conversationId); if (!batch) return; - clearTimeout(batch.timer); + if (batch.timer !== null) clearTimeout(batch.timer); this.pending.delete(conversationId); // Join multiple messages with newlines so the Agent sees them as one prompt const combined = batch.texts.join("\n"); + console.log(`[Debouncer] flush: ${batch.texts.length} message(s), ${combined.length} chars`); this.flushFn(conversationId, combined); } /** Clean up all 
pending timers (call on shutdown) */ dispose(): void { for (const batch of this.pending.values()) { - clearTimeout(batch.timer); + if (batch.timer !== null) clearTimeout(batch.timer); } this.pending.clear(); } diff --git a/src/channels/manager.ts b/src/channels/manager.ts index 7b0cea7c..5cbe7425 100644 --- a/src/channels/manager.ts +++ b/src/channels/manager.ts @@ -46,8 +46,19 @@ export class ChannelManager { private readonly hub: Hub; /** Running accounts keyed by "channelId:accountId" */ private readonly accounts = new Map(); - /** Where the last channel message came from (reply target) */ + /** Where the last channel message came from (used for typing/reactions/errors) */ private lastRoute: LastRoute | null = null; + /** + * FIFO queue of route snapshots + their ack targets, captured at each debouncer flush. + * Each agent.write() gets its own entry; dequeued on agent_start. + */ + private pendingRoutes: { route: LastRoute; acks: LastRoute[] }[] = []; + /** Route for the currently active agent run (set on agent_start, cleared on agent_end). */ + private activeRoute: LastRoute | null = null; + /** All messages in the current run's batch that have 👀 (cleared on agent_end). */ + private activeAcks: LastRoute[] = []; + /** Accumulates message routes for 👀 removal between debouncer flushes. */ + private ackBuffer: LastRoute[] = []; /** Unsubscribe function for the agent subscriber */ private agentUnsubscribe: (() => void) | null = null; /** Session ID of the currently subscribed agent (for stale detection) */ @@ -188,13 +199,50 @@ export class ChannelManager { this.subscribedAgentId = agent.sessionId; this.agentUnsubscribe = agent.subscribe((event) => { + const maybeMessage = (event as { message?: { role?: string } }).message; + const role = maybeMessage?.role; + + // Activate the next pending route + acks when a new agent run starts. 
+ if (event.type === "agent_start") { + const entry = this.pendingRoutes.shift(); + if (entry) { + this.activeRoute = entry.route; + this.activeAcks = entry.acks; + console.log(`[Channels] agent_start: activeRoute replyTo=${entry.route.deliveryCtx.replyToMessageId}, acks=${entry.acks.length}`); + } + } + + // Agent run complete — remove 👀 from all batch messages, conditionally stop typing. + if (event.type === "agent_end") { + for (const ack of this.activeAcks) { + if (ack.plugin.outbound.removeReaction) { + console.log(`[Channels] agent_end: removing 👀 from replyTo=${ack.deliveryCtx.replyToMessageId}`); + void ack.plugin.outbound.removeReaction(ack.deliveryCtx).catch(() => {}); + } + } + this.activeRoute = null; + this.activeAcks = []; + if (this.pendingRoutes.length === 0) { + console.log("[Channels] agent_end: no more pending, stopping typing"); + this.stopTyping(); + } else { + console.log(`[Channels] agent_end: ${this.pendingRoutes.length} pending run(s), keeping typing`); + } + } + // No active channel route — skip (reply goes to desktop/gateway only) if (!this.lastRoute) return; // Handle agent errors — notify the channel user if (event.type === "agent_error") { this.stopTyping(); - this.removeAckReaction(); + for (const ack of this.activeAcks) { + if (ack.plugin.outbound.removeReaction) { + void ack.plugin.outbound.removeReaction(ack.deliveryCtx).catch(() => {}); + } + } + this.activeRoute = null; + this.activeAcks = []; const errorMsg = (event as { message?: string }).message ?? 
"Unknown error"; console.error(`[Channels] Agent error: ${errorMsg}`); const route = this.lastRoute; @@ -206,9 +254,6 @@ export class ChannelManager { return; } - const maybeMessage = (event as { message?: { role?: string } }).message; - const role = maybeMessage?.role; - // Only forward assistant message events if (event.type === "message_start" || event.type === "message_update" || event.type === "message_end") { if (role !== "assistant") return; @@ -236,28 +281,31 @@ export class ChannelManager { this.aggregator.handleEvent(event); } - // Clean up after response complete + // Finalize aggregator per assistant message (may fire multiple times in multi-turn runs). + // Typing and ack removal are handled at agent_end, not here. if (event.type === "message_end" && role === "assistant") { - this.stopTyping(); - this.removeAckReaction(); this.aggregator = null; } }); } - /** Create a fresh aggregator wired to the current lastRoute */ + /** + * Create a fresh aggregator wired to the activeRoute (snapshotted at flush time). + * Falls back to lastRoute for non-debounced paths (e.g. direct writes). + */ private createAggregator(): void { - const route = this.lastRoute; + const route = this.activeRoute ?? this.lastRoute; if (!route) return; const { plugin, deliveryCtx } = route; + console.log(`[Channels] createAggregator: replyTo=${deliveryCtx.replyToMessageId} (source=${this.activeRoute ? "activeRoute" : "lastRoute"})`); const chunkerConfig = plugin.chunkerConfig ?? DEFAULT_CHUNKER_CONFIG; this.aggregator = new MessageAggregator( chunkerConfig, async (block) => { try { - console.log(`[Channels] Sending block ${block.index} (${block.text.length} chars${block.isFinal ? ", final" : ""}) → ${deliveryCtx.channel}:${deliveryCtx.conversationId}`); + console.log(`[Channels] Sending block ${block.index} (${block.text.length} chars${block.isFinal ? 
", final" : ""}) → ${deliveryCtx.channel}:${deliveryCtx.conversationId} replyTo=${deliveryCtx.replyToMessageId}`); if (block.index === 0) { await plugin.outbound.replyText(deliveryCtx, block.text); } else { @@ -303,12 +351,17 @@ export class ChannelManager { replyToMessageId: messageId, }, }; - console.log(`[Channels] lastRoute updated → ${plugin.id}:${conversationId}`); + console.log(`[Channels] lastRoute updated → ${plugin.id}:${conversationId} replyTo=${messageId}`); console.log(`[Channels] Forwarding to agent ${agent.sessionId}`); - // Show typing indicator and ACK reaction while agent processes + // Show typing indicator and 👀 ack on this message this.startTyping(); - this.addAckReaction(); + const ackRoute: LastRoute = { ...this.lastRoute }; + if (ackRoute.plugin.outbound.addReaction) { + console.log(`[Channels] Adding 👀 to replyTo=${messageId}`); + void ackRoute.plugin.outbound.addReaction(ackRoute.deliveryCtx, "👀").catch(() => {}); + } + this.ackBuffer.push(ackRoute); // Handle media messages (processed async, then fed through debouncer) if (message.media && plugin.downloadMedia) { @@ -406,7 +459,15 @@ export class ChannelManager { if (!this.debouncer) { this.debouncer = new InboundDebouncer( (_conversationId, combinedText) => { - console.log(`[Channels] Debouncer flushing ${combinedText.length} chars to agent`); + // Snapshot the current route + pending acks for this batch. + const route = this.lastRoute ? { ...this.lastRoute } : null; + const acks = [...this.ackBuffer]; + this.ackBuffer = []; + if (route) { + this.pendingRoutes.push({ route, acks }); + } + const replyTo = route?.deliveryCtx.replyToMessageId ?? 
"?"; + console.log(`[Channels] Debouncer flushing ${combinedText.length} chars to agent (queued route replyTo=${replyTo}, acks=${acks.length})`); agent.write(combinedText); }, ); @@ -414,20 +475,6 @@ export class ChannelManager { return this.debouncer; } - /** Add 👀 reaction to acknowledge message receipt */ - private addAckReaction(): void { - const route = this.lastRoute; - if (!route?.plugin.outbound.addReaction) return; - void route.plugin.outbound.addReaction(route.deliveryCtx, "👀").catch(() => {}); - } - - /** Remove ACK reaction when processing completes */ - private removeAckReaction(): void { - const route = this.lastRoute; - if (!route?.plugin.outbound.removeReaction) return; - void route.plugin.outbound.removeReaction(route.deliveryCtx).catch(() => {}); - } - /** Start sending typing indicators (repeats every 5s until stopped) */ private startTyping(): void { this.stopTyping(); @@ -462,6 +509,10 @@ export class ChannelManager { if (this.lastRoute && this.lastRoute.plugin.id === channelId && this.lastRoute.deliveryCtx.accountId === accountId) { this.stopTyping(); this.lastRoute = null; + this.activeRoute = null; + this.activeAcks = []; + this.ackBuffer = []; + this.pendingRoutes = []; this.aggregator = null; } @@ -495,6 +546,10 @@ export class ChannelManager { } this.accounts.clear(); this.lastRoute = null; + this.activeRoute = null; + this.activeAcks = []; + this.ackBuffer = []; + this.pendingRoutes = []; this.aggregator = null; } From ec67dd67060cdae79b721b7e0050a0869072c7f8 Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Mon, 9 Feb 2026 17:38:50 +0800 Subject: [PATCH 2/5] fix(ui): strip channel metadata prefixes from user messages in display Add stripUserMetadata() to remove timestamp envelopes and media type labels ([Voice Message] Transcript:, [Image] Description:, etc.) that are injected for LLM context but should not appear in the desktop UI. 
Co-Authored-By: Claude Opus 4.6 --- packages/ui/src/components/message-list.tsx | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/packages/ui/src/components/message-list.tsx b/packages/ui/src/components/message-list.tsx index f02d963b..ee53783a 100644 --- a/packages/ui/src/components/message-list.tsx +++ b/packages/ui/src/components/message-list.tsx @@ -22,6 +22,18 @@ function getThinkingText(blocks: ContentBlock[]): string { .join("") } +/** Strip LLM-facing metadata prefixes from user messages for clean display */ +function stripUserMetadata(text: string): string { + // Strip timestamp envelope: [Mon 2026-02-09 14:38 GMT+8] + let cleaned = text.replace(/^\[(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun)\s+\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}[^\]]*\]\s*/, "") + // Strip media type labels injected by channel media processing + cleaned = cleaned + .replace(/^\[Voice Message\]\n?Transcript:\s*/i, "") + .replace(/^\[Image\]\n?Description:\s*/i, "") + .replace(/^\[Video\]\n?Description:\s*/i, "") + return cleaned +} + /** Build a synthetic "running" toolResult Message from a ToolCall block */ function toRunningMessage(tc: ToolCall, agentId: string): Message { return { @@ -62,7 +74,8 @@ export const MessageList = memo(function MessageList({ messages, streamingIds }: return } - const text = getTextContent(msg.content) + const rawText = getTextContent(msg.content) + const text = msg.role === "user" ? stripUserMetadata(rawText) : rawText const toolCalls = msg.role === "assistant" ? getToolCalls(msg.content) : [] const thinking = msg.role === "assistant" ? 
getThinkingText(msg.content) : "" const hasThinkingBlocks = msg.role === "assistant" && msg.content.some((b) => b.type === "thinking") From c27f4e66b5172c5599db967947f867d46fe9ba6a Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Mon, 9 Feb 2026 17:38:56 +0800 Subject: [PATCH 3/5] docs(channels): update README with route queue pattern and ack lifecycle Document the new channel system design: FIFO pendingRoutes queue, activeRoute/activeAcks state, agent_start/agent_end lifecycle, InboundDebouncer, typing/reaction lifecycle, and UI metadata stripping. Co-Authored-By: Claude Opus 4.6 --- docs/channels/README.md | 163 +++++++++++++++++--- packages/ui/src/components/message-list.tsx | 11 +- 2 files changed, 148 insertions(+), 26 deletions(-) diff --git a/docs/channels/README.md b/docs/channels/README.md index 426ebdb3..4a947dbb 100644 --- a/docs/channels/README.md +++ b/docs/channels/README.md @@ -18,10 +18,23 @@ The Channel system connects external messaging platforms (Telegram, Discord, etc │ Channel Manager (manager.ts) │ │ │ │ startAll() → iterate plugins → startAccount() per account │ -│ subscribeToAgent() → listen for AI replies │ +│ ensureSubscribed() → listen for agent lifecycle events │ │ │ -│ Incoming: routeIncoming() → routeMedia() → agent.write() │ -│ Outgoing: lastRoute → aggregator → plugin.outbound.*() │ +│ Incoming: │ +│ routeIncoming() → 👀 ack + debouncer → agent.write() │ +│ Outgoing: │ +│ activeRoute → aggregator → plugin.outbound.*() │ +│ │ +│ State: │ +│ pendingRoutes[] ─(FIFO)→ activeRoute + activeAcks │ +│ ackBuffer[] ─(snapshot on flush)→ pendingRoutes[].acks │ +└──────────┬──────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ InboundDebouncer (inbound-debouncer.ts) │ +│ 500ms idle window / 2000ms hard cap per conversationId │ +│ Each flush → snapshot route + acks → agent.write() │ 
└──────────┬──────────────────────────────────────────────────┘ │ ▼ @@ -36,7 +49,7 @@ The Channel system connects external messaging platforms (Telegram, Discord, etc │ │ │ config — resolve account credentials │ │ gateway — receive messages (polling / webhook) │ -│ outbound — send replies back to platform │ +│ outbound — send replies, typing, reactions (👀 ack) │ │ downloadMedia() — download media files to local disk │ └─────────────────────────────────────────────────────────────┘ ``` @@ -63,7 +76,7 @@ interface ChannelPlugin { |---------|------|-------------| | **config** | Resolve credentials from `credentials.json5` | `listAccountIds()`, `resolveAccount()`, `isConfigured()` | | **gateway** | Receive inbound messages from the platform | `start(accountId, config, onMessage, signal)` | -| **outbound** | Send replies back to the platform | `sendText()`, `replyText()`, `sendTyping?()` | +| **outbound** | Send replies back to the platform | `sendText()`, `replyText()`, `sendTyping?()`, `addReaction?()`, `removeReaction?()` | ### downloadMedia (optional) @@ -78,9 +91,21 @@ User sends message in Telegram → grammy long-polling → onMessage callback → ChannelManager.routeIncoming() 1. Update lastRoute (reply target) - 2. Start typing indicator - 3. If media: routeMedia() → download → transcribe/describe → text - 4. agent.write(text) + 2. Start typing indicator (repeats every 5s) + 3. Add 👀 reaction to this message (ack) + 4. Push ack route to ackBuffer + 5. If media: routeMedia() → download → transcribe/describe → text + 6. Push text into InboundDebouncer + +InboundDebouncer (per conversationId): + ┌─ 500ms idle window: wait for more messages + │ If another message arrives within 500ms, reset timer and append + │ If 2000ms since first message, force-flush immediately + └─ On flush: + 1. Snapshot lastRoute → route + 2. Snapshot ackBuffer → acks, clear buffer + 3. Push { route, acks } to pendingRoutes queue + 4. 
Call agent.write(combinedText) ``` All media is converted to text before the agent sees it. See [media-handling.md](./media-handling.md) for details. @@ -88,28 +113,114 @@ All media is converted to text before the agent sees it. See [media-handling.md] ### Outbound (Agent → Platform) ``` -Agent produces reply - → agent.subscribe() in ChannelManager - → Check: if (!lastRoute) return // not from a channel, skip - → message_start → create MessageAggregator - → message_update → feed text to aggregator - → message_end → aggregator flushes final block - → Aggregator emits BlockReply chunks - → Block 0: plugin.outbound.replyText() // Telegram reply format - → Block N: plugin.outbound.sendText() // follow-up messages +agent.write() queued → agent.run() starts + → agent_start event + 1. Shift entry from pendingRoutes queue + 2. Set activeRoute = entry.route (stable for entire run) + 3. Set activeAcks = entry.acks + + → message_start (assistant) + 1. Create MessageAggregator wired to activeRoute + → message_update (assistant) + 1. Feed text deltas to aggregator + → message_end (assistant) + 1. Aggregator flushes final block, then the aggregator is set to null + (May repeat if agent does multi-turn tool calls) + + → Aggregator emits BlockReply chunks: + Block 0: plugin.outbound.replyText() // reply to original message + Block N: plugin.outbound.sendText() // follow-up messages + + → agent_end event + 1. Remove 👀 from all activeAcks messages + 2. Clear activeRoute and activeAcks + 3. If pendingRoutes is empty → stop typing + If more pending → keep typing for next run ``` The **MessageAggregator** buffers streaming LLM output and splits it into blocks at natural text boundaries (paragraphs, code blocks). This is necessary because messaging platforms cannot consume raw streaming deltas. 
-## lastRoute Pattern +## Route Queue Pattern -The `lastRoute` tracks which channel last sent a message: +The channel system uses a FIFO queue to correctly route replies when multiple messages arrive while the agent is busy. This solves the "reply-to mismatch" problem where rapid-fire messages would cause replies to target the wrong original message. -- **Channel message arrives** → `lastRoute` is set to that plugin + conversation -- **Desktop/Web message arrives** → `clearLastRoute()` is called -- **Agent replies** → if `lastRoute` is set, reply goes to that channel; otherwise skipped +### State Fields -This ensures replies go back to the originating channel. Desktop and Web always receive agent events independently via their own mechanisms (IPC / Gateway). +| Field | Type | Purpose | +|-------|------|---------| +| `lastRoute` | `LastRoute \| null` | Where the most recent channel message came from. Updated on every incoming message. | +| `pendingRoutes` | `{ route, acks }[]` | FIFO queue of snapshotted routes, one per debouncer flush. Dequeued on `agent_start`. | +| `activeRoute` | `LastRoute \| null` | Route for the currently running agent. Set on `agent_start`, cleared on `agent_end`. Stable across all turns within one run. | +| `ackBuffer` | `LastRoute[]` | Accumulates 👀 ack targets between debouncer flushes. Snapshotted and cleared on each flush. | +| `activeAcks` | `LastRoute[]` | All messages with 👀 in the current run. Cleaned up on `agent_end`. 
| + +### Lifecycle + +``` +Message A arrives → lastRoute = A, ackBuffer = [A], 👀 on A +Message B arrives (50ms) → lastRoute = B, ackBuffer = [A, B], 👀 on B + ─── 500ms idle ─── +Debouncer flushes → pendingRoutes.push({ route: B, acks: [A, B] }) + ackBuffer = [], agent.write("A\nB") + +Message C arrives → lastRoute = C, ackBuffer = [C], 👀 on C + ─── 500ms idle ─── +Debouncer flushes → pendingRoutes.push({ route: C, acks: [C] }) + ackBuffer = [], agent.write("C") + +agent_start (run 1) → activeRoute = B, activeAcks = [A, B] + (agent processes "A\nB", replies to message B) +agent_end (run 1) → remove 👀 from A and B, pendingRoutes still has 1 → keep typing + +agent_start (run 2) → activeRoute = C, activeAcks = [C] + (agent processes "C", replies to message C) +agent_end (run 2) → remove 👀 from C, pendingRoutes empty → stop typing +``` + +### Why agent_start / agent_end (not message_end) + +In multi-turn agent runs (e.g. when the agent uses tools), `message_end` fires once per assistant message — potentially multiple times per `agent.run()`. Using `message_end` for state management would: +- Clear `activeRoute` mid-run, causing the next turn's aggregator to pick up the wrong route +- Remove 👀 too early (before the agent is actually done) +- Stop typing between tool-call turns + +`agent_start` and `agent_end` fire exactly once per `agent.run()`, making them the correct lifecycle boundaries. + +### lastRoute vs activeRoute + +- **`lastRoute`** — global, updated on every incoming message. Used for: typing indicators, error reporting, creating aggregators when no activeRoute exists. +- **`activeRoute`** — per-run, set from queue on `agent_start`. Used for: reply targeting via aggregator. Guarantees that a run's reply goes to the correct message even if new messages arrive during processing. + +Desktop and Web always receive agent events independently via their own mechanisms (IPC / Gateway). 
`clearLastRoute()` is called when a desktop/web message arrives to prevent channel forwarding. + +## Inbound Debouncer + +The `InboundDebouncer` (`inbound-debouncer.ts`) batches rapid-fire messages from the same conversation into a single `agent.write()` call. This prevents the agent from processing incomplete thoughts when users send multiple short messages quickly. + +**Parameters:** +- `delayMs` (default 500ms) — idle window: how long to wait after each message before flushing +- `maxWaitMs` (default 2000ms) — hard cap: max time since first message before force-flushing + +**Behavior:** +- Messages within 500ms of each other are combined with newlines +- Messages >500ms apart get independent flushes and separate agent runs +- No busy-awareness: each flush is independent regardless of agent state +- Each flush triggers a route snapshot (lastRoute + ackBuffer) pushed to the pendingRoutes queue + +## Typing and Reaction Lifecycle + +### Typing Indicator +- **Start:** `routeIncoming()` — starts a 5s repeating interval (Telegram requires re-sending "typing" every 5s) +- **Stop:** `agent_end` — only if `pendingRoutes` is empty (all queued runs complete). If runs remain queued, typing persists. +- **Also stops on:** `clearLastRoute()` (desktop/web message), `stopAccount()`, `stopAll()`, `agent_error` + +### 👀 Ack Reaction +- **Add:** `routeIncoming()` — immediately on each message, before debouncing +- **Track:** pushed to `ackBuffer`, then snapshotted into `pendingRoutes[].acks` on debouncer flush, then moved to `activeAcks` on `agent_start` +- **Remove:** `agent_end` — iterates `activeAcks` and removes 👀 from each message +- **Also removed on:** `agent_error` + +This ensures every queued message shows 👀 while waiting, and all 👀 are cleaned up precisely when the agent finishes processing that batch. ## Configuration @@ -144,7 +255,7 @@ Each channel ID maps to accounts (keyed by account ID, typically `"default"`). 
T - [ ] `config` adapter: parse credentials from `credentials.json5` - [ ] `gateway` adapter: connect to platform, normalize messages to `ChannelMessage` -- [ ] `outbound` adapter: `sendText`, `replyText`, optional `sendTyping` +- [ ] `outbound` adapter: `sendText`, `replyText`, optional `sendTyping`, `addReaction`, `removeReaction` - [ ] `downloadMedia` (if platform supports media): download to `MEDIA_CACHE_DIR` - [ ] Group filtering: only respond to messages directed at the bot - [ ] Graceful shutdown: respect the `AbortSignal` passed to `gateway.start()` @@ -154,7 +265,8 @@ Each channel ID maps to accounts (keyed by account ID, typically `"default"`). T | File | Role | |------|------| | `src/channels/types.ts` | All type definitions (`ChannelPlugin`, `ChannelMessage`, `DeliveryContext`, etc.) | -| `src/channels/manager.ts` | `ChannelManager` — bridges plugins to the Hub's agent | +| `src/channels/manager.ts` | `ChannelManager` — bridges plugins to the Hub's agent, route queue, typing/ack lifecycle | +| `src/channels/inbound-debouncer.ts` | `InboundDebouncer` — batches rapid-fire messages per conversationId | | `src/channels/registry.ts` | Plugin registry (`registerChannel`, `listChannels`, `getChannel`) | | `src/channels/config.ts` | Load channel config from `credentials.json5` | | `src/channels/index.ts` | Bootstrap: register built-in plugins, re-export public API | @@ -165,6 +277,7 @@ Each channel ID maps to accounts (keyed by account ID, typically `"default"`). 
T | `src/media/describe-video.ts` | Video description (ffmpeg frame + Vision API) | | `src/shared/paths.ts` | `MEDIA_CACHE_DIR` path constant | | `src/hub/message-aggregator.ts` | Streaming text → block chunking for channel delivery | +| `packages/ui/src/components/message-list.tsx` | UI rendering with `stripUserMetadata()` for clean display | ## Current Plugins diff --git a/packages/ui/src/components/message-list.tsx b/packages/ui/src/components/message-list.tsx index ee53783a..3a754bc3 100644 --- a/packages/ui/src/components/message-list.tsx +++ b/packages/ui/src/components/message-list.tsx @@ -22,7 +22,16 @@ function getThinkingText(blocks: ContentBlock[]): string { .join("") } -/** Strip LLM-facing metadata prefixes from user messages for clean display */ +/** + * Strip LLM-facing metadata prefixes from user messages for clean display. + * + * TODO: This is a short-term workaround. The root cause is that agent.write() + * bakes timestamp and media-type prefixes into the message content, and + * session JSONL stores the enriched string as-is. The proper fix is to + * separate "displayContent" from "llmContent" at the storage layer so the + * UI never sees LLM context prefixes. This regex approach is fragile — + * any change to timestamp format, locale, or new media types will break it. + */ function stripUserMetadata(text: string): string { // Strip timestamp envelope: [Mon 2026-02-09 14:38 GMT+8] let cleaned = text.replace(/^\[(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun)\s+\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}[^\]]*\]\s*/, "") From 1bd25c5aec8ff7b373e2271917b5a1f18f2f8d32 Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Mon, 9 Feb 2026 18:10:23 +0800 Subject: [PATCH 4/5] fix(media): don't cache whisper binary detection failure Only cache successful whisper binary lookups. When whisper is not found, leave the cache empty so subsequent calls re-check PATH. 
This allows the app to detect a newly installed whisper without requiring a restart. Co-Authored-By: Claude Opus 4.6 --- src/media/transcribe.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/media/transcribe.ts b/src/media/transcribe.ts index 84214f9a..2d410be1 100644 --- a/src/media/transcribe.ts +++ b/src/media/transcribe.ts @@ -18,8 +18,8 @@ import { execFile, execFileSync } from "node:child_process"; import { tmpdir } from "node:os"; import { credentialManager } from "../agent/credentials.js"; -/** Cached path to local whisper binary, or false if not found */ -let cachedWhisperBin: string | false | undefined; +/** Cached path to local whisper binary (only caches success; misses re-check each time) */ +let cachedWhisperBin: string | undefined; /** Find local whisper binary in PATH */ function findWhisperBin(): string | false { @@ -35,7 +35,7 @@ function findWhisperBin(): string | false { } } - cachedWhisperBin = false; + // Don't cache failure — whisper may be installed while the process is running return false; } From 7b885673da44ffd952af096bf64606e7517b5e57 Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Mon, 9 Feb 2026 18:13:47 +0800 Subject: [PATCH 5/5] docs(skills): update whisper skill with correct transcription priority Clarify that local whisper is the primary provider (free, offline), OpenAI API is the fallback, and the skill only activates when both are unavailable. Add setup instructions noting no restart is required. 
Co-Authored-By: Claude Opus 4.6 --- skills/whisper/SKILL.md | 29 ++++++++++++++++++++++++----- src/channels/manager.ts | 2 -- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/skills/whisper/SKILL.md b/skills/whisper/SKILL.md index 1748a79d..fa9a128d 100644 --- a/skills/whisper/SKILL.md +++ b/skills/whisper/SKILL.md @@ -1,7 +1,7 @@ --- name: Audio Transcription -description: Transcribe audio files using local Whisper CLI (fallback when API is unavailable) -version: 1.0.0 +description: Transcribe audio files using local Whisper CLI when automatic pre-processing is unavailable +version: 1.1.0 metadata: emoji: "🎙️" requires: @@ -23,14 +23,33 @@ userInvocable: false disableModelInvocation: false --- -## Audio Transcription (Local Fallback) +## Audio Transcription (Agent Fallback) -Voice messages from channels are normally transcribed automatically via the OpenAI Whisper API before reaching you. This skill is only needed when the API is unavailable. +Voice messages from channels are pre-processed before reaching you. The transcription +priority is: -If you receive `[audio message received]` with a `File:` path (instead of `[Voice Message]` with a transcript), it means the API transcription was not available. Use local whisper to transcribe: +1. **Local whisper CLI** (free, offline) — requires `whisper` or `whisper-cli` in PATH +2. **OpenAI Whisper API** — requires an OpenAI API key in credentials +3. **No provider available** — you receive a raw file path instead of a transcript + +When both providers are unavailable, you will receive `[audio message received]` with a +`File:` path instead of `[Voice Message]` with a transcript. Use local whisper to +transcribe manually: ``` whisper "" --model base --output_format txt --output_dir /tmp ``` Then read the `.txt` file from `/tmp/` and respond based on the transcribed content. 
+ +### Setup + +To enable automatic local transcription (recommended): + +```bash +brew install openai-whisper +``` + +The first run will download the `base` model (~139MB) to `~/.cache/whisper/`. +No app restart is required — the binary is detected automatically on the next +voice message. diff --git a/src/channels/manager.ts b/src/channels/manager.ts index 5cbe7425..a7d5f248 100644 --- a/src/channels/manager.ts +++ b/src/channels/manager.ts @@ -265,8 +265,6 @@ export class ChannelManager { // Keep heartbeat acknowledgements internal (same behavior as desktop/gateway stream path). if (isHeartbeatAckEvent(event)) { if (event.type === "message_end") { - this.stopTyping(); - this.removeAckReaction(); this.aggregator = null; } return;