diff --git a/docs/channels/README.md b/docs/channels/README.md index 426ebdb3..4a947dbb 100644 --- a/docs/channels/README.md +++ b/docs/channels/README.md @@ -18,10 +18,23 @@ The Channel system connects external messaging platforms (Telegram, Discord, etc │ Channel Manager (manager.ts) │ │ │ │ startAll() → iterate plugins → startAccount() per account │ -│ subscribeToAgent() → listen for AI replies │ +│ ensureSubscribed() → listen for agent lifecycle events │ │ │ -│ Incoming: routeIncoming() → routeMedia() → agent.write() │ -│ Outgoing: lastRoute → aggregator → plugin.outbound.*() │ +│ Incoming: │ +│ routeIncoming() → 👀 ack + debouncer → agent.write() │ +│ Outgoing: │ +│ activeRoute → aggregator → plugin.outbound.*() │ +│ │ +│ State: │ +│ pendingRoutes[] ─(FIFO)→ activeRoute + activeAcks │ +│ ackBuffer[] ─(snapshot on flush)→ pendingRoutes[].acks │ +└──────────┬──────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ InboundDebouncer (inbound-debouncer.ts) │ +│ 500ms idle window / 2000ms hard cap per conversationId │ +│ Each flush → snapshot route + acks → agent.write() │ └──────────┬──────────────────────────────────────────────────┘ │ ▼ @@ -36,7 +49,7 @@ The Channel system connects external messaging platforms (Telegram, Discord, etc │ │ │ config — resolve account credentials │ │ gateway — receive messages (polling / webhook) │ -│ outbound — send replies back to platform │ +│ outbound — send replies, typing, reactions (👀 ack) │ │ downloadMedia() — download media files to local disk │ └─────────────────────────────────────────────────────────────┘ ``` @@ -63,7 +76,7 @@ interface ChannelPlugin { |---------|------|-------------| | **config** | Resolve credentials from `credentials.json5` | `listAccountIds()`, `resolveAccount()`, `isConfigured()` | | **gateway** | Receive inbound messages from the platform | `start(accountId, config, onMessage, signal)` | -| **outbound** | Send replies back to the 
platform | `sendText()`, `replyText()`, `sendTyping?()` | +| **outbound** | Send replies back to the platform | `sendText()`, `replyText()`, `sendTyping?()`, `addReaction?()`, `removeReaction?()` | ### downloadMedia (optional) @@ -78,9 +91,21 @@ User sends message in Telegram → grammy long-polling → onMessage callback → ChannelManager.routeIncoming() 1. Update lastRoute (reply target) - 2. Start typing indicator - 3. If media: routeMedia() → download → transcribe/describe → text - 4. agent.write(text) + 2. Start typing indicator (repeats every 5s) + 3. Add 👀 reaction to this message (ack) + 4. Push ack route to ackBuffer + 5. If media: routeMedia() → download → transcribe/describe → text + 6. Push text into InboundDebouncer + +InboundDebouncer (per conversationId): + ┌─ 500ms idle window: wait for more messages + │ If another message arrives within 500ms, reset timer and append + │ If 2000ms since first message, force-flush immediately + └─ On flush: + 1. Snapshot lastRoute → route + 2. Snapshot ackBuffer → acks, clear buffer + 3. Push { route, acks } to pendingRoutes queue + 4. Call agent.write(combinedText) ``` All media is converted to text before the agent sees it. See [media-handling.md](./media-handling.md) for details. @@ -88,28 +113,114 @@ All media is converted to text before the agent sees it. See [media-handling.md] ### Outbound (Agent → Platform) ``` -Agent produces reply - → agent.subscribe() in ChannelManager - → Check: if (!lastRoute) return // not from a channel, skip - → message_start → create MessageAggregator - → message_update → feed text to aggregator - → message_end → aggregator flushes final block - → Aggregator emits BlockReply chunks - → Block 0: plugin.outbound.replyText() // Telegram reply format - → Block N: plugin.outbound.sendText() // follow-up messages +agent.write() queued → agent.run() starts + → agent_start event + 1. Shift entry from pendingRoutes queue + 2. Set activeRoute = entry.route (stable for entire run) + 3. 
Set activeAcks = entry.acks + + → message_start (assistant) + 1. Create MessageAggregator wired to activeRoute + → message_update (assistant) + 1. Feed text deltas to aggregator + → message_end (assistant) + 1. Aggregator flushes final block, then the aggregator is set to null + (May repeat if agent does multi-turn tool calls) + + → Aggregator emits BlockReply chunks: + Block 0: plugin.outbound.replyText() // reply to original message + Block N: plugin.outbound.sendText() // follow-up messages + + → agent_end event + 1. Remove 👀 from all activeAcks messages + 2. Clear activeRoute and activeAcks + 3. If pendingRoutes is empty → stop typing + If more pending → keep typing for next run ``` The **MessageAggregator** buffers streaming LLM output and splits it into blocks at natural text boundaries (paragraphs, code blocks). This is necessary because messaging platforms cannot consume raw streaming deltas. -## lastRoute Pattern +## Route Queue Pattern -The `lastRoute` tracks which channel last sent a message: +The channel system uses a FIFO queue to correctly route replies when multiple messages arrive while the agent is busy. This solves the "reply-to mismatch" problem where rapid-fire messages would cause replies to target the wrong original message. -- **Channel message arrives** → `lastRoute` is set to that plugin + conversation -- **Desktop/Web message arrives** → `clearLastRoute()` is called -- **Agent replies** → if `lastRoute` is set, reply goes to that channel; otherwise skipped +### State Fields -This ensures replies go back to the originating channel. Desktop and Web always receive agent events independently via their own mechanisms (IPC / Gateway). +| Field | Type | Purpose | +|-------|------|---------| +| `lastRoute` | `LastRoute \| null` | Where the most recent channel message came from. Updated on every incoming message. | +| `pendingRoutes` | `{ route, acks }[]` | FIFO queue of snapshotted routes, one per debouncer flush. Dequeued on `agent_start`. 
| +| `activeRoute` | `LastRoute \| null` | Route for the currently running agent. Set on `agent_start`, cleared on `agent_end`. Stable across all turns within one run. | +| `ackBuffer` | `LastRoute[]` | Accumulates 👀 ack targets between debouncer flushes. Snapshotted and cleared on each flush. | +| `activeAcks` | `LastRoute[]` | All messages with 👀 in the current run. Cleaned up on `agent_end`. | + +### Lifecycle + +``` +Message A arrives → lastRoute = A, ackBuffer = [A], 👀 on A +Message B arrives (50ms) → lastRoute = B, ackBuffer = [A, B], 👀 on B + ─── 500ms idle ─── +Debouncer flushes → pendingRoutes.push({ route: B, acks: [A, B] }) + ackBuffer = [], agent.write("A\nB") + +Message C arrives → lastRoute = C, ackBuffer = [C], 👀 on C + ─── 500ms idle ─── +Debouncer flushes → pendingRoutes.push({ route: C, acks: [C] }) + ackBuffer = [], agent.write("C") + +agent_start (run 1) → activeRoute = B, activeAcks = [A, B] + (agent processes "A\nB", replies to message B) +agent_end (run 1) → remove 👀 from A and B, pendingRoutes still has 1 → keep typing + +agent_start (run 2) → activeRoute = C, activeAcks = [C] + (agent processes "C", replies to message C) +agent_end (run 2) → remove 👀 from C, pendingRoutes empty → stop typing +``` + +### Why agent_start / agent_end (not message_end) + +In multi-turn agent runs (e.g. when the agent uses tools), `message_end` fires once per assistant message — potentially multiple times per `agent.run()`. Using `message_end` for state management would: +- Clear `activeRoute` mid-run, causing the next turn's aggregator to pick up the wrong route +- Remove 👀 too early (before the agent is actually done) +- Stop typing between tool-call turns + +`agent_start` and `agent_end` fire exactly once per `agent.run()`, making them the correct lifecycle boundaries. + +### lastRoute vs activeRoute + +- **`lastRoute`** — global, updated on every incoming message. Used for: typing indicators, error reporting, creating aggregators when no activeRoute exists. 
+- **`activeRoute`** — per-run, set from queue on `agent_start`. Used for: reply targeting via aggregator. Guarantees that a run's reply goes to the correct message even if new messages arrive during processing. + +Desktop and Web always receive agent events independently via their own mechanisms (IPC / Gateway). `clearLastRoute()` is called when a desktop/web message arrives to prevent channel forwarding. + +## Inbound Debouncer + +The `InboundDebouncer` (`inbound-debouncer.ts`) batches rapid-fire messages from the same conversation into a single `agent.write()` call. This prevents the agent from processing incomplete thoughts when users send multiple short messages quickly. + +**Parameters:** +- `delayMs` (default 500ms) — idle window: how long to wait after each message before flushing +- `maxWaitMs` (default 2000ms) — hard cap: max time since first message before force-flushing + +**Behavior:** +- Messages within 500ms of each other are combined with newlines +- Messages >500ms apart get independent flushes and separate agent runs +- No busy-awareness: each flush is independent regardless of agent state +- Each flush triggers a route snapshot (lastRoute + ackBuffer) pushed to the pendingRoutes queue + +## Typing and Reaction Lifecycle + +### Typing Indicator +- **Start:** `routeIncoming()` — starts a 5s repeating interval (Telegram requires re-sending "typing" every 5s) +- **Stop:** `agent_end` — only if `pendingRoutes` is empty (all queued runs complete). If runs remain queued, typing persists. 
+- **Also stops on:** `clearLastRoute()` (desktop/web message), `stopAccount()`, `stopAll()`, `agent_error` + +### 👀 Ack Reaction +- **Add:** `routeIncoming()` — immediately on each message, before debouncing +- **Track:** pushed to `ackBuffer`, then snapshotted into `pendingRoutes[].acks` on debouncer flush, then moved to `activeAcks` on `agent_start` +- **Remove:** `agent_end` — iterates `activeAcks` and removes 👀 from each message +- **Also removed on:** `agent_error` + +This ensures every queued message shows 👀 while waiting, and that 👀 is removed when the agent finishes (or errors on) that batch. Caveat: `stopAccount()` / `stopAll()`, and a debouncer flush that finds `lastRoute` already cleared, discard their tracked acks without calling `removeReaction`, so 👀 can be left behind in those cases. ## Configuration @@ -144,7 +255,7 @@ Each channel ID maps to accounts (keyed by account ID, typically `"default"`). T - [ ] `config` adapter: parse credentials from `credentials.json5` - [ ] `gateway` adapter: connect to platform, normalize messages to `ChannelMessage` -- [ ] `outbound` adapter: `sendText`, `replyText`, optional `sendTyping` +- [ ] `outbound` adapter: `sendText`, `replyText`, optional `sendTyping`, `addReaction`, `removeReaction` - [ ] `downloadMedia` (if platform supports media): download to `MEDIA_CACHE_DIR` - [ ] Group filtering: only respond to messages directed at the bot - [ ] Graceful shutdown: respect the `AbortSignal` passed to `gateway.start()` @@ -154,7 +265,8 @@ Each channel ID maps to accounts (keyed by account ID, typically `"default"`). T | File | Role | |------|------| | `src/channels/types.ts` | All type definitions (`ChannelPlugin`, `ChannelMessage`, `DeliveryContext`, etc.) 
| -| `src/channels/manager.ts` | `ChannelManager` — bridges plugins to the Hub's agent | +| `src/channels/manager.ts` | `ChannelManager` — bridges plugins to the Hub's agent, route queue, typing/ack lifecycle | +| `src/channels/inbound-debouncer.ts` | `InboundDebouncer` — batches rapid-fire messages per conversationId | | `src/channels/registry.ts` | Plugin registry (`registerChannel`, `listChannels`, `getChannel`) | | `src/channels/config.ts` | Load channel config from `credentials.json5` | | `src/channels/index.ts` | Bootstrap: register built-in plugins, re-export public API | @@ -165,6 +277,7 @@ Each channel ID maps to accounts (keyed by account ID, typically `"default"`). T | `src/media/describe-video.ts` | Video description (ffmpeg frame + Vision API) | | `src/shared/paths.ts` | `MEDIA_CACHE_DIR` path constant | | `src/hub/message-aggregator.ts` | Streaming text → block chunking for channel delivery | +| `packages/ui/src/components/message-list.tsx` | UI rendering with `stripUserMetadata()` for clean display | ## Current Plugins diff --git a/packages/ui/src/components/message-list.tsx b/packages/ui/src/components/message-list.tsx index f02d963b..3a754bc3 100644 --- a/packages/ui/src/components/message-list.tsx +++ b/packages/ui/src/components/message-list.tsx @@ -22,6 +22,27 @@ function getThinkingText(blocks: ContentBlock[]): string { .join("") } +/** + * Strip LLM-facing metadata prefixes from user messages for clean display. + * + * TODO: This is a short-term workaround. The root cause is that agent.write() + * bakes timestamp and media-type prefixes into the message content, and + * session JSONL stores the enriched string as-is. The proper fix is to + * separate "displayContent" from "llmContent" at the storage layer so the + * UI never sees LLM context prefixes. This regex approach is fragile — + * any change to timestamp format, locale, or new media types will break it. 
+ */ +function stripUserMetadata(text: string): string { + // Strip timestamp envelope: [Mon 2026-02-09 14:38 GMT+8] + let cleaned = text.replace(/^\[(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun)\s+\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}[^\]]*\]\s*/, "") + // Strip media type labels injected by channel media processing + cleaned = cleaned + .replace(/^\[Voice Message\]\n?Transcript:\s*/i, "") + .replace(/^\[Image\]\n?Description:\s*/i, "") + .replace(/^\[Video\]\n?Description:\s*/i, "") + return cleaned +} + /** Build a synthetic "running" toolResult Message from a ToolCall block */ function toRunningMessage(tc: ToolCall, agentId: string): Message { return { @@ -62,7 +83,8 @@ export const MessageList = memo(function MessageList({ messages, streamingIds }: return } - const text = getTextContent(msg.content) + const rawText = getTextContent(msg.content) + const text = msg.role === "user" ? stripUserMetadata(rawText) : rawText const toolCalls = msg.role === "assistant" ? getToolCalls(msg.content) : [] const thinking = msg.role === "assistant" ? getThinkingText(msg.content) : "" const hasThinkingBlocks = msg.role === "assistant" && msg.content.some((b) => b.type === "thinking") diff --git a/skills/whisper/SKILL.md b/skills/whisper/SKILL.md index 1748a79d..fa9a128d 100644 --- a/skills/whisper/SKILL.md +++ b/skills/whisper/SKILL.md @@ -1,7 +1,7 @@ --- name: Audio Transcription -description: Transcribe audio files using local Whisper CLI (fallback when API is unavailable) -version: 1.0.0 +description: Transcribe audio files using local Whisper CLI when automatic pre-processing is unavailable +version: 1.1.0 metadata: emoji: "🎙️" requires: @@ -23,14 +23,33 @@ userInvocable: false disableModelInvocation: false --- -## Audio Transcription (Local Fallback) +## Audio Transcription (Agent Fallback) -Voice messages from channels are normally transcribed automatically via the OpenAI Whisper API before reaching you. This skill is only needed when the API is unavailable. 
+Voice messages from channels are pre-processed before reaching you. The transcription +priority is: -If you receive `[audio message received]` with a `File:` path (instead of `[Voice Message]` with a transcript), it means the API transcription was not available. Use local whisper to transcribe: +1. **Local whisper CLI** (free, offline) — requires `whisper` or `whisper-cli` in PATH +2. **OpenAI Whisper API** — requires an OpenAI API key in credentials +3. **No provider available** — you receive a raw file path instead of a transcript + +When neither provider was available during pre-processing, you will receive `[audio message received]` with a +`File:` path instead of `[Voice Message]` with a transcript. In that case, transcribe the file yourself with the +whisper CLI (see Setup below if it is not installed): ``` whisper "" --model base --output_format txt --output_dir /tmp ``` Then read the `.txt` file from `/tmp/` and respond based on the transcribed content. + +### Setup + +To enable automatic local transcription (recommended): + +```bash +brew install openai-whisper +``` + +The first run will download the `base` model (~139MB) to `~/.cache/whisper/`. +No app restart is required — the binary is detected automatically on the next +voice message. diff --git a/src/channels/inbound-debouncer.ts b/src/channels/inbound-debouncer.ts index 0ff9213f..3a8ce075 100644 --- a/src/channels/inbound-debouncer.ts +++ b/src/channels/inbound-debouncer.ts @@ -10,12 +10,9 @@ * fire immediately regardless of timer * 4. When timer fires, call the flush callback with all accumulated text * - * This prevents rapid-fire messages from triggering multiple separate Agent - * runs. Instead, messages sent within a short window are concatenated with - * newlines and dispatched as one combined prompt. - * - * Inspired by OpenClaw's createInboundDebouncer pattern. - * @see docs/channel/openclaw-research.md — Section 7.3 message preprocessing + * Each flush produces an independent agent.write() call with its own reply + * target. 
Messages arriving while the agent is busy are NOT accumulated — + * they get their own debounce window and independent processing. */ interface PendingBatch { @@ -24,7 +21,7 @@ interface PendingBatch { /** Timestamp of the first message in this batch */ firstArrival: number; /** Idle timer — fires when no new message arrives within delayMs */ - timer: ReturnType; + timer: ReturnType | null; } export class InboundDebouncer { @@ -44,11 +41,12 @@ export class InboundDebouncer { /** Add a message to the buffer. May trigger an immediate flush if maxWaitMs exceeded. */ push(conversationId: string, text: string): void { const existing = this.pending.get(conversationId); + const preview = text.slice(0, 40) + (text.length > 40 ? "..." : ""); if (existing) { - // Append to existing batch, reset idle timer + // Append to existing batch, clear any running timer existing.texts.push(text); - clearTimeout(existing.timer); + if (existing.timer !== null) clearTimeout(existing.timer); // Check hard cap: if we've been buffering too long, flush now const elapsed = Date.now() - existing.firstArrival; @@ -58,9 +56,11 @@ export class InboundDebouncer { } // Reset idle timer + console.log(`[Debouncer] push (appending #${existing.texts.length}): "${preview}"`); existing.timer = setTimeout(() => this.flush(conversationId), this.delayMs); } else { - // Start a new batch + // Start a new batch with idle timer + console.log(`[Debouncer] push (new batch, timer=${this.delayMs}ms): "${preview}"`); const timer = setTimeout(() => this.flush(conversationId), this.delayMs); this.pending.set(conversationId, { texts: [text], @@ -75,18 +75,19 @@ export class InboundDebouncer { const batch = this.pending.get(conversationId); if (!batch) return; - clearTimeout(batch.timer); + if (batch.timer !== null) clearTimeout(batch.timer); this.pending.delete(conversationId); // Join multiple messages with newlines so the Agent sees them as one prompt const combined = batch.texts.join("\n"); + 
console.log(`[Debouncer] flush: ${batch.texts.length} message(s), ${combined.length} chars`); this.flushFn(conversationId, combined); } /** Clean up all pending timers (call on shutdown) */ dispose(): void { for (const batch of this.pending.values()) { - clearTimeout(batch.timer); + if (batch.timer !== null) clearTimeout(batch.timer); } this.pending.clear(); } diff --git a/src/channels/manager.ts b/src/channels/manager.ts index 7b0cea7c..a7d5f248 100644 --- a/src/channels/manager.ts +++ b/src/channels/manager.ts @@ -46,8 +46,19 @@ export class ChannelManager { private readonly hub: Hub; /** Running accounts keyed by "channelId:accountId" */ private readonly accounts = new Map(); - /** Where the last channel message came from (reply target) */ + /** Where the last channel message came from (used for typing/reactions/errors) */ private lastRoute: LastRoute | null = null; + /** + * FIFO queue of route snapshots + their ack targets, captured at each debouncer flush. + * Each agent.write() gets its own entry; dequeued on agent_start. + */ + private pendingRoutes: { route: LastRoute; acks: LastRoute[] }[] = []; + /** Route for the currently active agent run (set on agent_start, cleared on agent_end). */ + private activeRoute: LastRoute | null = null; + /** All messages in the current run's batch that have 👀 (cleared on agent_end). */ + private activeAcks: LastRoute[] = []; + /** Accumulates message routes for 👀 removal between debouncer flushes. 
*/ + private ackBuffer: LastRoute[] = []; /** Unsubscribe function for the agent subscriber */ private agentUnsubscribe: (() => void) | null = null; /** Session ID of the currently subscribed agent (for stale detection) */ @@ -188,13 +199,50 @@ export class ChannelManager { this.subscribedAgentId = agent.sessionId; this.agentUnsubscribe = agent.subscribe((event) => { + const maybeMessage = (event as { message?: { role?: string } }).message; + const role = maybeMessage?.role; + + // Activate the next pending route + acks when a new agent run starts. + if (event.type === "agent_start") { + const entry = this.pendingRoutes.shift(); + if (entry) { + this.activeRoute = entry.route; + this.activeAcks = entry.acks; + console.log(`[Channels] agent_start: activeRoute replyTo=${entry.route.deliveryCtx.replyToMessageId}, acks=${entry.acks.length}`); + } + } + + // Agent run complete — remove 👀 from all batch messages, conditionally stop typing. + if (event.type === "agent_end") { + for (const ack of this.activeAcks) { + if (ack.plugin.outbound.removeReaction) { + console.log(`[Channels] agent_end: removing 👀 from replyTo=${ack.deliveryCtx.replyToMessageId}`); + void ack.plugin.outbound.removeReaction(ack.deliveryCtx).catch(() => {}); + } + } + this.activeRoute = null; + this.activeAcks = []; + if (this.pendingRoutes.length === 0) { + console.log("[Channels] agent_end: no more pending, stopping typing"); + this.stopTyping(); + } else { + console.log(`[Channels] agent_end: ${this.pendingRoutes.length} pending run(s), keeping typing`); + } + } + // No active channel route — skip (reply goes to desktop/gateway only) if (!this.lastRoute) return; // Handle agent errors — notify the channel user if (event.type === "agent_error") { this.stopTyping(); - this.removeAckReaction(); + for (const ack of this.activeAcks) { + if (ack.plugin.outbound.removeReaction) { + void ack.plugin.outbound.removeReaction(ack.deliveryCtx).catch(() => {}); + } + } + this.activeRoute = null; + 
this.activeAcks = []; const errorMsg = (event as { message?: string }).message ?? "Unknown error"; console.error(`[Channels] Agent error: ${errorMsg}`); const route = this.lastRoute; @@ -206,9 +254,6 @@ export class ChannelManager { return; } - const maybeMessage = (event as { message?: { role?: string } }).message; - const role = maybeMessage?.role; - // Only forward assistant message events if (event.type === "message_start" || event.type === "message_update" || event.type === "message_end") { if (role !== "assistant") return; @@ -220,8 +265,6 @@ export class ChannelManager { // Keep heartbeat acknowledgements internal (same behavior as desktop/gateway stream path). if (isHeartbeatAckEvent(event)) { if (event.type === "message_end") { - this.stopTyping(); - this.removeAckReaction(); this.aggregator = null; } return; @@ -236,28 +279,31 @@ export class ChannelManager { this.aggregator.handleEvent(event); } - // Clean up after response complete + // Finalize aggregator per assistant message (may fire multiple times in multi-turn runs). + // Typing and ack removal are handled at agent_end, not here. if (event.type === "message_end" && role === "assistant") { - this.stopTyping(); - this.removeAckReaction(); this.aggregator = null; } }); } - /** Create a fresh aggregator wired to the current lastRoute */ + /** + * Create a fresh aggregator wired to the activeRoute (snapshotted at flush time). + * Falls back to lastRoute for non-debounced paths (e.g. direct writes). + */ private createAggregator(): void { - const route = this.lastRoute; + const route = this.activeRoute ?? this.lastRoute; if (!route) return; const { plugin, deliveryCtx } = route; + console.log(`[Channels] createAggregator: replyTo=${deliveryCtx.replyToMessageId} (source=${this.activeRoute ? "activeRoute" : "lastRoute"})`); const chunkerConfig = plugin.chunkerConfig ?? 
DEFAULT_CHUNKER_CONFIG; this.aggregator = new MessageAggregator( chunkerConfig, async (block) => { try { - console.log(`[Channels] Sending block ${block.index} (${block.text.length} chars${block.isFinal ? ", final" : ""}) → ${deliveryCtx.channel}:${deliveryCtx.conversationId}`); + console.log(`[Channels] Sending block ${block.index} (${block.text.length} chars${block.isFinal ? ", final" : ""}) → ${deliveryCtx.channel}:${deliveryCtx.conversationId} replyTo=${deliveryCtx.replyToMessageId}`); if (block.index === 0) { await plugin.outbound.replyText(deliveryCtx, block.text); } else { @@ -303,12 +349,17 @@ export class ChannelManager { replyToMessageId: messageId, }, }; - console.log(`[Channels] lastRoute updated → ${plugin.id}:${conversationId}`); + console.log(`[Channels] lastRoute updated → ${plugin.id}:${conversationId} replyTo=${messageId}`); console.log(`[Channels] Forwarding to agent ${agent.sessionId}`); - // Show typing indicator and ACK reaction while agent processes + // Show typing indicator and 👀 ack on this message this.startTyping(); - this.addAckReaction(); + const ackRoute: LastRoute = { ...this.lastRoute }; + if (ackRoute.plugin.outbound.addReaction) { + console.log(`[Channels] Adding 👀 to replyTo=${messageId}`); + void ackRoute.plugin.outbound.addReaction(ackRoute.deliveryCtx, "👀").catch(() => {}); + } + this.ackBuffer.push(ackRoute); // Handle media messages (processed async, then fed through debouncer) if (message.media && plugin.downloadMedia) { @@ -406,7 +457,15 @@ export class ChannelManager { if (!this.debouncer) { this.debouncer = new InboundDebouncer( (_conversationId, combinedText) => { - console.log(`[Channels] Debouncer flushing ${combinedText.length} chars to agent`); + // Snapshot the current route + pending acks for this batch. + const route = this.lastRoute ? 
{ ...this.lastRoute } : null; + const acks = [...this.ackBuffer]; + this.ackBuffer = []; + if (route) { + this.pendingRoutes.push({ route, acks }); + } + const replyTo = route?.deliveryCtx.replyToMessageId ?? "?"; + console.log(`[Channels] Debouncer flushing ${combinedText.length} chars to agent (queued route replyTo=${replyTo}, acks=${acks.length})`); agent.write(combinedText); }, ); @@ -414,20 +473,6 @@ export class ChannelManager { return this.debouncer; } - /** Add 👀 reaction to acknowledge message receipt */ - private addAckReaction(): void { - const route = this.lastRoute; - if (!route?.plugin.outbound.addReaction) return; - void route.plugin.outbound.addReaction(route.deliveryCtx, "👀").catch(() => {}); - } - - /** Remove ACK reaction when processing completes */ - private removeAckReaction(): void { - const route = this.lastRoute; - if (!route?.plugin.outbound.removeReaction) return; - void route.plugin.outbound.removeReaction(route.deliveryCtx).catch(() => {}); - } - /** Start sending typing indicators (repeats every 5s until stopped) */ private startTyping(): void { this.stopTyping(); @@ -462,6 +507,10 @@ export class ChannelManager { if (this.lastRoute && this.lastRoute.plugin.id === channelId && this.lastRoute.deliveryCtx.accountId === accountId) { this.stopTyping(); this.lastRoute = null; + this.activeRoute = null; + this.activeAcks = []; + this.ackBuffer = []; + this.pendingRoutes = []; this.aggregator = null; } @@ -495,6 +544,10 @@ export class ChannelManager { } this.accounts.clear(); this.lastRoute = null; + this.activeRoute = null; + this.activeAcks = []; + this.ackBuffer = []; + this.pendingRoutes = []; this.aggregator = null; } diff --git a/src/media/transcribe.ts b/src/media/transcribe.ts index 84214f9a..2d410be1 100644 --- a/src/media/transcribe.ts +++ b/src/media/transcribe.ts @@ -18,8 +18,8 @@ import { execFile, execFileSync } from "node:child_process"; import { tmpdir } from "node:os"; import { credentialManager } from 
"../agent/credentials.js"; -/** Cached path to local whisper binary, or false if not found */ -let cachedWhisperBin: string | false | undefined; +/** Cached path to local whisper binary (only caches success; misses re-check each time) */ +let cachedWhisperBin: string | undefined; /** Find local whisper binary in PATH */ function findWhisperBin(): string | false { @@ -35,7 +35,7 @@ function findWhisperBin(): string | false { } } - cachedWhisperBin = false; + // Don't cache failure — whisper may be installed while the process is running return false; }