Merge pull request #112 from multica-ai/NevilleQingNY/telegram-msg-issue
fix(channels): correct reply routing, ack lifecycle, and whisper detection
This commit is contained in:
commit
f7ddcec78d
6 changed files with 285 additions and 77 deletions
|
|
@ -18,10 +18,23 @@ The Channel system connects external messaging platforms (Telegram, Discord, etc
|
|||
│ Channel Manager (manager.ts) │
|
||||
│ │
|
||||
│ startAll() → iterate plugins → startAccount() per account │
|
||||
│ subscribeToAgent() → listen for AI replies │
|
||||
│ ensureSubscribed() → listen for agent lifecycle events │
|
||||
│ │
|
||||
│ Incoming: routeIncoming() → routeMedia() → agent.write() │
|
||||
│ Outgoing: lastRoute → aggregator → plugin.outbound.*() │
|
||||
│ Incoming: │
|
||||
│ routeIncoming() → 👀 ack + debouncer → agent.write() │
|
||||
│ Outgoing: │
|
||||
│ activeRoute → aggregator → plugin.outbound.*() │
|
||||
│ │
|
||||
│ State: │
|
||||
│ pendingRoutes[] ─(FIFO)→ activeRoute + activeAcks │
|
||||
│ ackBuffer[] ─(snapshot on flush)→ pendingRoutes[].acks │
|
||||
└──────────┬──────────────────────────────────────────────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────┐
|
||||
│ InboundDebouncer (inbound-debouncer.ts) │
|
||||
│ 500ms idle window / 2000ms hard cap per conversationId │
|
||||
│ Each flush → snapshot route + acks → agent.write() │
|
||||
└──────────┬──────────────────────────────────────────────────┘
|
||||
│
|
||||
▼
|
||||
|
|
@ -36,7 +49,7 @@ The Channel system connects external messaging platforms (Telegram, Discord, etc
|
|||
│ │
|
||||
│ config — resolve account credentials │
|
||||
│ gateway — receive messages (polling / webhook) │
|
||||
│ outbound — send replies back to platform │
|
||||
│ outbound — send replies, typing, reactions (👀 ack) │
|
||||
│ downloadMedia() — download media files to local disk │
|
||||
└─────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
|
@ -63,7 +76,7 @@ interface ChannelPlugin {
|
|||
|---------|------|-------------|
|
||||
| **config** | Resolve credentials from `credentials.json5` | `listAccountIds()`, `resolveAccount()`, `isConfigured()` |
|
||||
| **gateway** | Receive inbound messages from the platform | `start(accountId, config, onMessage, signal)` |
|
||||
| **outbound** | Send replies back to the platform | `sendText()`, `replyText()`, `sendTyping?()` |
|
||||
| **outbound** | Send replies back to the platform | `sendText()`, `replyText()`, `sendTyping?()`, `addReaction?()`, `removeReaction?()` |
|
||||
|
||||
### downloadMedia (optional)
|
||||
|
||||
|
|
@ -78,9 +91,21 @@ User sends message in Telegram
|
|||
→ grammy long-polling → onMessage callback
|
||||
→ ChannelManager.routeIncoming()
|
||||
1. Update lastRoute (reply target)
|
||||
2. Start typing indicator
|
||||
3. If media: routeMedia() → download → transcribe/describe → text
|
||||
4. agent.write(text)
|
||||
2. Start typing indicator (repeats every 5s)
|
||||
3. Add 👀 reaction to this message (ack)
|
||||
4. Push ack route to ackBuffer
|
||||
5. If media: routeMedia() → download → transcribe/describe → text
|
||||
6. Push text into InboundDebouncer
|
||||
|
||||
InboundDebouncer (per conversationId):
|
||||
┌─ 500ms idle window: wait for more messages
|
||||
│ If another message arrives within 500ms, reset timer and append
|
||||
│ If 2000ms have elapsed since the first message, force-flush immediately
|
||||
└─ On flush:
|
||||
1. Snapshot lastRoute → route
|
||||
2. Snapshot ackBuffer → acks, clear buffer
|
||||
3. Push { route, acks } to pendingRoutes queue
|
||||
4. Call agent.write(combinedText)
|
||||
```
|
||||
|
||||
All media is converted to text before the agent sees it. See [media-handling.md](./media-handling.md) for details.
|
||||
|
|
@ -88,28 +113,114 @@ All media is converted to text before the agent sees it. See [media-handling.md]
|
|||
### Outbound (Agent → Platform)
|
||||
|
||||
```
|
||||
Agent produces reply
|
||||
→ agent.subscribe() in ChannelManager
|
||||
→ Check: if (!lastRoute) return // not from a channel, skip
|
||||
→ message_start → create MessageAggregator
|
||||
→ message_update → feed text to aggregator
|
||||
→ message_end → aggregator flushes final block
|
||||
→ Aggregator emits BlockReply chunks
|
||||
→ Block 0: plugin.outbound.replyText() // Telegram reply format
|
||||
→ Block N: plugin.outbound.sendText() // follow-up messages
|
||||
agent.write() queued → agent.run() starts
|
||||
→ agent_start event
|
||||
1. Shift entry from pendingRoutes queue
|
||||
2. Set activeRoute = entry.route (stable for entire run)
|
||||
3. Set activeAcks = entry.acks
|
||||
|
||||
→ message_start (assistant)
|
||||
1. Create MessageAggregator wired to activeRoute
|
||||
→ message_update (assistant)
|
||||
1. Feed text deltas to aggregator
|
||||
→ message_end (assistant)
|
||||
1. Aggregator flushes final block, then is reset to null
|
||||
(May repeat if agent does multi-turn tool calls)
|
||||
|
||||
→ Aggregator emits BlockReply chunks:
|
||||
Block 0: plugin.outbound.replyText() // reply to original message
|
||||
Block N: plugin.outbound.sendText() // follow-up messages
|
||||
|
||||
→ agent_end event
|
||||
1. Remove 👀 from all activeAcks messages
|
||||
2. Clear activeRoute and activeAcks
|
||||
3. If pendingRoutes is empty → stop typing
|
||||
If more pending → keep typing for next run
|
||||
```
|
||||
|
||||
The **MessageAggregator** buffers streaming LLM output and splits it into blocks at natural text boundaries (paragraphs, code blocks). This is necessary because messaging platforms cannot consume raw streaming deltas.
|
||||
|
||||
## lastRoute Pattern
|
||||
## Route Queue Pattern
|
||||
|
||||
The `lastRoute` tracks which channel last sent a message:
|
||||
The channel system uses a FIFO queue to correctly route replies when multiple messages arrive while the agent is busy. This solves the "reply-to mismatch" problem where rapid-fire messages would cause replies to target the wrong original message.
|
||||
|
||||
- **Channel message arrives** → `lastRoute` is set to that plugin + conversation
|
||||
- **Desktop/Web message arrives** → `clearLastRoute()` is called
|
||||
- **Agent replies** → if `lastRoute` is set, reply goes to that channel; otherwise skipped
|
||||
### State Fields
|
||||
|
||||
This ensures replies go back to the originating channel. Desktop and Web always receive agent events independently via their own mechanisms (IPC / Gateway).
|
||||
| Field | Type | Purpose |
|
||||
|-------|------|---------|
|
||||
| `lastRoute` | `LastRoute \| null` | Where the most recent channel message came from. Updated on every incoming message. |
|
||||
| `pendingRoutes` | `{ route, acks }[]` | FIFO queue of snapshotted routes, one per debouncer flush. Dequeued on `agent_start`. |
|
||||
| `activeRoute` | `LastRoute \| null` | Route for the currently running agent. Set on `agent_start`, cleared on `agent_end`. Stable across all turns within one run. |
|
||||
| `ackBuffer` | `LastRoute[]` | Accumulates 👀 ack targets between debouncer flushes. Snapshotted and cleared on each flush. |
|
||||
| `activeAcks` | `LastRoute[]` | All messages with 👀 in the current run. Cleaned up on `agent_end`. |
|
||||
|
||||
### Lifecycle
|
||||
|
||||
```
|
||||
Message A arrives → lastRoute = A, ackBuffer = [A], 👀 on A
|
||||
Message B arrives (50ms) → lastRoute = B, ackBuffer = [A, B], 👀 on B
|
||||
─── 500ms idle ───
|
||||
Debouncer flushes → pendingRoutes.push({ route: B, acks: [A, B] })
|
||||
ackBuffer = [], agent.write("A\nB")
|
||||
|
||||
Message C arrives → lastRoute = C, ackBuffer = [C], 👀 on C
|
||||
─── 500ms idle ───
|
||||
Debouncer flushes → pendingRoutes.push({ route: C, acks: [C] })
|
||||
ackBuffer = [], agent.write("C")
|
||||
|
||||
agent_start (run 1) → activeRoute = B, activeAcks = [A, B]
|
||||
(agent processes "A\nB", replies to message B)
|
||||
agent_end (run 1) → remove 👀 from A and B, pendingRoutes still has 1 → keep typing
|
||||
|
||||
agent_start (run 2) → activeRoute = C, activeAcks = [C]
|
||||
(agent processes "C", replies to message C)
|
||||
agent_end (run 2) → remove 👀 from C, pendingRoutes empty → stop typing
|
||||
```
|
||||
|
||||
### Why agent_start / agent_end (not message_end)
|
||||
|
||||
In multi-turn agent runs (e.g. when the agent uses tools), `message_end` fires once per assistant message — potentially multiple times per `agent.run()`. Using `message_end` for state management would:
|
||||
- Clear `activeRoute` mid-run, causing the next turn's aggregator to pick up the wrong route
|
||||
- Remove 👀 too early (before the agent is actually done)
|
||||
- Stop typing between tool-call turns
|
||||
|
||||
`agent_start` and `agent_end` fire exactly once per `agent.run()`, making them the correct lifecycle boundaries.
|
||||
|
||||
### lastRoute vs activeRoute
|
||||
|
||||
- **`lastRoute`** — global, updated on every incoming message. Used for: typing indicators, error reporting, creating aggregators when no activeRoute exists.
|
||||
- **`activeRoute`** — per-run, set from queue on `agent_start`. Used for: reply targeting via aggregator. Guarantees that a run's reply goes to the correct message even if new messages arrive during processing.
|
||||
|
||||
Desktop and Web always receive agent events independently via their own mechanisms (IPC / Gateway). `clearLastRoute()` is called when a desktop/web message arrives to prevent channel forwarding.
|
||||
|
||||
## Inbound Debouncer
|
||||
|
||||
The `InboundDebouncer` (`inbound-debouncer.ts`) batches rapid-fire messages from the same conversation into a single `agent.write()` call. This prevents the agent from processing incomplete thoughts when users send multiple short messages quickly.
|
||||
|
||||
**Parameters:**
|
||||
- `delayMs` (default 500ms) — idle window: how long to wait after each message before flushing
|
||||
- `maxWaitMs` (default 2000ms) — hard cap: max time since first message before force-flushing
|
||||
|
||||
**Behavior:**
|
||||
- Messages within 500ms of each other are combined with newlines (until the 2000ms hard cap forces a flush)
|
||||
- Messages >500ms apart get independent flushes and separate agent runs
|
||||
- No busy-awareness: each flush is independent regardless of agent state
|
||||
- Each flush triggers a route snapshot (lastRoute + ackBuffer) pushed to the pendingRoutes queue
|
||||
|
||||
## Typing and Reaction Lifecycle
|
||||
|
||||
### Typing Indicator
|
||||
- **Start:** `routeIncoming()` — starts a 5s repeating interval (Telegram requires re-sending "typing" every 5s)
|
||||
- **Stop:** `agent_end` — only if `pendingRoutes` is empty (all queued runs complete). If runs remain queued, typing persists.
|
||||
- **Also stops on:** `clearLastRoute()` (desktop/web message), `stopAccount()`, `stopAll()`, `agent_error`
|
||||
|
||||
### 👀 Ack Reaction
|
||||
- **Add:** `routeIncoming()` — immediately on each message, before debouncing
|
||||
- **Track:** pushed to `ackBuffer`, then snapshotted into `pendingRoutes[].acks` on debouncer flush, then moved to `activeAcks` on `agent_start`
|
||||
- **Remove:** `agent_end` — iterates `activeAcks` and removes 👀 from each message
|
||||
- **Also removed on:** `agent_error`
|
||||
|
||||
This ensures every queued message shows 👀 while waiting, and all 👀 are cleaned up precisely when the agent finishes processing that batch.
|
||||
|
||||
## Configuration
|
||||
|
||||
|
|
@ -144,7 +255,7 @@ Each channel ID maps to accounts (keyed by account ID, typically `"default"`). T
|
|||
|
||||
- [ ] `config` adapter: parse credentials from `credentials.json5`
|
||||
- [ ] `gateway` adapter: connect to platform, normalize messages to `ChannelMessage`
|
||||
- [ ] `outbound` adapter: `sendText`, `replyText`, optional `sendTyping`
|
||||
- [ ] `outbound` adapter: `sendText`, `replyText`, optional `sendTyping`, `addReaction`, `removeReaction`
|
||||
- [ ] `downloadMedia` (if platform supports media): download to `MEDIA_CACHE_DIR`
|
||||
- [ ] Group filtering: only respond to messages directed at the bot
|
||||
- [ ] Graceful shutdown: respect the `AbortSignal` passed to `gateway.start()`
|
||||
|
|
@ -154,7 +265,8 @@ Each channel ID maps to accounts (keyed by account ID, typically `"default"`). T
|
|||
| File | Role |
|
||||
|------|------|
|
||||
| `src/channels/types.ts` | All type definitions (`ChannelPlugin`, `ChannelMessage`, `DeliveryContext`, etc.) |
|
||||
| `src/channels/manager.ts` | `ChannelManager` — bridges plugins to the Hub's agent |
|
||||
| `src/channels/manager.ts` | `ChannelManager` — bridges plugins to the Hub's agent, route queue, typing/ack lifecycle |
|
||||
| `src/channels/inbound-debouncer.ts` | `InboundDebouncer` — batches rapid-fire messages per conversationId |
|
||||
| `src/channels/registry.ts` | Plugin registry (`registerChannel`, `listChannels`, `getChannel`) |
|
||||
| `src/channels/config.ts` | Load channel config from `credentials.json5` |
|
||||
| `src/channels/index.ts` | Bootstrap: register built-in plugins, re-export public API |
|
||||
|
|
@ -165,6 +277,7 @@ Each channel ID maps to accounts (keyed by account ID, typically `"default"`). T
|
|||
| `src/media/describe-video.ts` | Video description (ffmpeg frame + Vision API) |
|
||||
| `src/shared/paths.ts` | `MEDIA_CACHE_DIR` path constant |
|
||||
| `src/hub/message-aggregator.ts` | Streaming text → block chunking for channel delivery |
|
||||
| `packages/ui/src/components/message-list.tsx` | UI rendering with `stripUserMetadata()` for clean display |
|
||||
|
||||
## Current Plugins
|
||||
|
||||
|
|
|
|||
|
|
@ -22,6 +22,27 @@ function getThinkingText(blocks: ContentBlock[]): string {
|
|||
.join("")
|
||||
}
|
||||
|
||||
/**
|
||||
* Strip LLM-facing metadata prefixes from user messages for clean display.
|
||||
*
|
||||
* TODO: This is a short-term workaround. The root cause is that agent.write()
|
||||
* bakes timestamp and media-type prefixes into the message content, and
|
||||
* session JSONL stores the enriched string as-is. The proper fix is to
|
||||
* separate "displayContent" from "llmContent" at the storage layer so the
|
||||
* UI never sees LLM context prefixes. This regex approach is fragile —
|
||||
* any change to timestamp format, locale, or new media types will break it.
|
||||
*/
|
||||
function stripUserMetadata(text: string): string {
|
||||
// Strip timestamp envelope: [Mon 2026-02-09 14:38 GMT+8]
|
||||
let cleaned = text.replace(/^\[(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun)\s+\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}[^\]]*\]\s*/, "")
|
||||
// Strip media type labels injected by channel media processing
|
||||
cleaned = cleaned
|
||||
.replace(/^\[Voice Message\]\n?Transcript:\s*/i, "")
|
||||
.replace(/^\[Image\]\n?Description:\s*/i, "")
|
||||
.replace(/^\[Video\]\n?Description:\s*/i, "")
|
||||
return cleaned
|
||||
}
|
||||
|
||||
/** Build a synthetic "running" toolResult Message from a ToolCall block */
|
||||
function toRunningMessage(tc: ToolCall, agentId: string): Message {
|
||||
return {
|
||||
|
|
@ -62,7 +83,8 @@ export const MessageList = memo(function MessageList({ messages, streamingIds }:
|
|||
return <ToolCallItem key={msg.id} message={msg} />
|
||||
}
|
||||
|
||||
const text = getTextContent(msg.content)
|
||||
const rawText = getTextContent(msg.content)
|
||||
const text = msg.role === "user" ? stripUserMetadata(rawText) : rawText
|
||||
const toolCalls = msg.role === "assistant" ? getToolCalls(msg.content) : []
|
||||
const thinking = msg.role === "assistant" ? getThinkingText(msg.content) : ""
|
||||
const hasThinkingBlocks = msg.role === "assistant" && msg.content.some((b) => b.type === "thinking")
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
---
|
||||
name: Audio Transcription
|
||||
description: Transcribe audio files using local Whisper CLI (fallback when API is unavailable)
|
||||
version: 1.0.0
|
||||
description: Transcribe audio files using local Whisper CLI when automatic pre-processing is unavailable
|
||||
version: 1.1.0
|
||||
metadata:
|
||||
emoji: "🎙️"
|
||||
requires:
|
||||
|
|
@ -23,14 +23,33 @@ userInvocable: false
|
|||
disableModelInvocation: false
|
||||
---
|
||||
|
||||
## Audio Transcription (Local Fallback)
|
||||
## Audio Transcription (Agent Fallback)
|
||||
|
||||
Voice messages from channels are normally transcribed automatically via the OpenAI Whisper API before reaching you. This skill is only needed when the API is unavailable.
|
||||
Voice messages from channels are pre-processed before reaching you. The transcription
|
||||
priority is:
|
||||
|
||||
If you receive `[audio message received]` with a `File:` path (instead of `[Voice Message]` with a transcript), it means the API transcription was not available. Use local whisper to transcribe:
|
||||
1. **Local whisper CLI** (free, offline) — requires `whisper` or `whisper-cli` in PATH
|
||||
2. **OpenAI Whisper API** — requires an OpenAI API key in credentials
|
||||
3. **No provider available** — you receive a raw file path instead of a transcript
|
||||
|
||||
When both providers are unavailable, you will receive `[audio message received]` with a
|
||||
`File:` path instead of `[Voice Message]` with a transcript. Use local whisper to
|
||||
transcribe manually:
|
||||
|
||||
```
|
||||
whisper "<file_path>" --model base --output_format txt --output_dir /tmp
|
||||
```
|
||||
|
||||
Then read the `.txt` file from `/tmp/` and respond based on the transcribed content.
|
||||
|
||||
### Setup
|
||||
|
||||
To enable automatic local transcription (recommended):
|
||||
|
||||
```bash
|
||||
brew install openai-whisper
|
||||
```
|
||||
|
||||
The first run will download the `base` model (~139MB) to `~/.cache/whisper/`.
|
||||
No app restart is required — the binary is detected automatically on the next
|
||||
voice message.
|
||||
|
|
|
|||
|
|
@ -10,12 +10,9 @@
|
|||
* fire immediately regardless of timer
|
||||
* 4. When timer fires, call the flush callback with all accumulated text
|
||||
*
|
||||
* This prevents rapid-fire messages from triggering multiple separate Agent
|
||||
* runs. Instead, messages sent within a short window are concatenated with
|
||||
* newlines and dispatched as one combined prompt.
|
||||
*
|
||||
* Inspired by OpenClaw's createInboundDebouncer pattern.
|
||||
* @see docs/channel/openclaw-research.md — Section 7.3 message preprocessing
|
||||
* Each flush produces an independent agent.write() call with its own reply
|
||||
* target. Messages arriving while the agent is busy are NOT accumulated —
|
||||
* they get their own debounce window and independent processing.
|
||||
*/
|
||||
|
||||
interface PendingBatch {
|
||||
|
|
@ -24,7 +21,7 @@ interface PendingBatch {
|
|||
/** Timestamp of the first message in this batch */
|
||||
firstArrival: number;
|
||||
/** Idle timer — fires when no new message arrives within delayMs */
|
||||
timer: ReturnType<typeof setTimeout>;
|
||||
timer: ReturnType<typeof setTimeout> | null;
|
||||
}
|
||||
|
||||
export class InboundDebouncer {
|
||||
|
|
@ -44,11 +41,12 @@ export class InboundDebouncer {
|
|||
/** Add a message to the buffer. May trigger an immediate flush if maxWaitMs exceeded. */
|
||||
push(conversationId: string, text: string): void {
|
||||
const existing = this.pending.get(conversationId);
|
||||
const preview = text.slice(0, 40) + (text.length > 40 ? "..." : "");
|
||||
|
||||
if (existing) {
|
||||
// Append to existing batch, reset idle timer
|
||||
// Append to existing batch, clear any running timer
|
||||
existing.texts.push(text);
|
||||
clearTimeout(existing.timer);
|
||||
if (existing.timer !== null) clearTimeout(existing.timer);
|
||||
|
||||
// Check hard cap: if we've been buffering too long, flush now
|
||||
const elapsed = Date.now() - existing.firstArrival;
|
||||
|
|
@ -58,9 +56,11 @@ export class InboundDebouncer {
|
|||
}
|
||||
|
||||
// Reset idle timer
|
||||
console.log(`[Debouncer] push (appending #${existing.texts.length}): "${preview}"`);
|
||||
existing.timer = setTimeout(() => this.flush(conversationId), this.delayMs);
|
||||
} else {
|
||||
// Start a new batch
|
||||
// Start a new batch with idle timer
|
||||
console.log(`[Debouncer] push (new batch, timer=${this.delayMs}ms): "${preview}"`);
|
||||
const timer = setTimeout(() => this.flush(conversationId), this.delayMs);
|
||||
this.pending.set(conversationId, {
|
||||
texts: [text],
|
||||
|
|
@ -75,18 +75,19 @@ export class InboundDebouncer {
|
|||
const batch = this.pending.get(conversationId);
|
||||
if (!batch) return;
|
||||
|
||||
clearTimeout(batch.timer);
|
||||
if (batch.timer !== null) clearTimeout(batch.timer);
|
||||
this.pending.delete(conversationId);
|
||||
|
||||
// Join multiple messages with newlines so the Agent sees them as one prompt
|
||||
const combined = batch.texts.join("\n");
|
||||
console.log(`[Debouncer] flush: ${batch.texts.length} message(s), ${combined.length} chars`);
|
||||
this.flushFn(conversationId, combined);
|
||||
}
|
||||
|
||||
/** Clean up all pending timers (call on shutdown) */
|
||||
dispose(): void {
|
||||
for (const batch of this.pending.values()) {
|
||||
clearTimeout(batch.timer);
|
||||
if (batch.timer !== null) clearTimeout(batch.timer);
|
||||
}
|
||||
this.pending.clear();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -46,8 +46,19 @@ export class ChannelManager {
|
|||
private readonly hub: Hub;
|
||||
/** Running accounts keyed by "channelId:accountId" */
|
||||
private readonly accounts = new Map<string, AccountHandle>();
|
||||
/** Where the last channel message came from (reply target) */
|
||||
/** Where the last channel message came from (used for typing/reactions/errors) */
|
||||
private lastRoute: LastRoute | null = null;
|
||||
/**
|
||||
* FIFO queue of route snapshots + their ack targets, captured at each debouncer flush.
|
||||
* Each agent.write() gets its own entry; dequeued on agent_start.
|
||||
*/
|
||||
private pendingRoutes: { route: LastRoute; acks: LastRoute[] }[] = [];
|
||||
/** Route for the currently active agent run (set on agent_start, cleared on agent_end). */
|
||||
private activeRoute: LastRoute | null = null;
|
||||
/** All messages in the current run's batch that have 👀 (cleared on agent_end). */
|
||||
private activeAcks: LastRoute[] = [];
|
||||
/** Accumulates routes of 👀-acked messages between debouncer flushes, for later 👀 removal. */
|
||||
private ackBuffer: LastRoute[] = [];
|
||||
/** Unsubscribe function for the agent subscriber */
|
||||
private agentUnsubscribe: (() => void) | null = null;
|
||||
/** Session ID of the currently subscribed agent (for stale detection) */
|
||||
|
|
@ -188,13 +199,50 @@ export class ChannelManager {
|
|||
this.subscribedAgentId = agent.sessionId;
|
||||
|
||||
this.agentUnsubscribe = agent.subscribe((event) => {
|
||||
const maybeMessage = (event as { message?: { role?: string } }).message;
|
||||
const role = maybeMessage?.role;
|
||||
|
||||
// Activate the next pending route + acks when a new agent run starts.
|
||||
if (event.type === "agent_start") {
|
||||
const entry = this.pendingRoutes.shift();
|
||||
if (entry) {
|
||||
this.activeRoute = entry.route;
|
||||
this.activeAcks = entry.acks;
|
||||
console.log(`[Channels] agent_start: activeRoute replyTo=${entry.route.deliveryCtx.replyToMessageId}, acks=${entry.acks.length}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Agent run complete — remove 👀 from all batch messages, conditionally stop typing.
|
||||
if (event.type === "agent_end") {
|
||||
for (const ack of this.activeAcks) {
|
||||
if (ack.plugin.outbound.removeReaction) {
|
||||
console.log(`[Channels] agent_end: removing 👀 from replyTo=${ack.deliveryCtx.replyToMessageId}`);
|
||||
void ack.plugin.outbound.removeReaction(ack.deliveryCtx).catch(() => {});
|
||||
}
|
||||
}
|
||||
this.activeRoute = null;
|
||||
this.activeAcks = [];
|
||||
if (this.pendingRoutes.length === 0) {
|
||||
console.log("[Channels] agent_end: no more pending, stopping typing");
|
||||
this.stopTyping();
|
||||
} else {
|
||||
console.log(`[Channels] agent_end: ${this.pendingRoutes.length} pending run(s), keeping typing`);
|
||||
}
|
||||
}
|
||||
|
||||
// No active channel route — skip (reply goes to desktop/gateway only)
|
||||
if (!this.lastRoute) return;
|
||||
|
||||
// Handle agent errors — notify the channel user
|
||||
if (event.type === "agent_error") {
|
||||
this.stopTyping();
|
||||
this.removeAckReaction();
|
||||
for (const ack of this.activeAcks) {
|
||||
if (ack.plugin.outbound.removeReaction) {
|
||||
void ack.plugin.outbound.removeReaction(ack.deliveryCtx).catch(() => {});
|
||||
}
|
||||
}
|
||||
this.activeRoute = null;
|
||||
this.activeAcks = [];
|
||||
const errorMsg = (event as { message?: string }).message ?? "Unknown error";
|
||||
console.error(`[Channels] Agent error: ${errorMsg}`);
|
||||
const route = this.lastRoute;
|
||||
|
|
@ -206,9 +254,6 @@ export class ChannelManager {
|
|||
return;
|
||||
}
|
||||
|
||||
const maybeMessage = (event as { message?: { role?: string } }).message;
|
||||
const role = maybeMessage?.role;
|
||||
|
||||
// Only forward assistant message events
|
||||
if (event.type === "message_start" || event.type === "message_update" || event.type === "message_end") {
|
||||
if (role !== "assistant") return;
|
||||
|
|
@ -220,8 +265,6 @@ export class ChannelManager {
|
|||
// Keep heartbeat acknowledgements internal (same behavior as desktop/gateway stream path).
|
||||
if (isHeartbeatAckEvent(event)) {
|
||||
if (event.type === "message_end") {
|
||||
this.stopTyping();
|
||||
this.removeAckReaction();
|
||||
this.aggregator = null;
|
||||
}
|
||||
return;
|
||||
|
|
@ -236,28 +279,31 @@ export class ChannelManager {
|
|||
this.aggregator.handleEvent(event);
|
||||
}
|
||||
|
||||
// Clean up after response complete
|
||||
// Finalize aggregator per assistant message (may fire multiple times in multi-turn runs).
|
||||
// Typing and ack removal are handled at agent_end, not here.
|
||||
if (event.type === "message_end" && role === "assistant") {
|
||||
this.stopTyping();
|
||||
this.removeAckReaction();
|
||||
this.aggregator = null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/** Create a fresh aggregator wired to the current lastRoute */
|
||||
/**
|
||||
* Create a fresh aggregator wired to the activeRoute (snapshotted at flush time).
|
||||
* Falls back to lastRoute for non-debounced paths (e.g. direct writes).
|
||||
*/
|
||||
private createAggregator(): void {
|
||||
const route = this.lastRoute;
|
||||
const route = this.activeRoute ?? this.lastRoute;
|
||||
if (!route) return;
|
||||
|
||||
const { plugin, deliveryCtx } = route;
|
||||
console.log(`[Channels] createAggregator: replyTo=${deliveryCtx.replyToMessageId} (source=${this.activeRoute ? "activeRoute" : "lastRoute"})`);
|
||||
const chunkerConfig = plugin.chunkerConfig ?? DEFAULT_CHUNKER_CONFIG;
|
||||
|
||||
this.aggregator = new MessageAggregator(
|
||||
chunkerConfig,
|
||||
async (block) => {
|
||||
try {
|
||||
console.log(`[Channels] Sending block ${block.index} (${block.text.length} chars${block.isFinal ? ", final" : ""}) → ${deliveryCtx.channel}:${deliveryCtx.conversationId}`);
|
||||
console.log(`[Channels] Sending block ${block.index} (${block.text.length} chars${block.isFinal ? ", final" : ""}) → ${deliveryCtx.channel}:${deliveryCtx.conversationId} replyTo=${deliveryCtx.replyToMessageId}`);
|
||||
if (block.index === 0) {
|
||||
await plugin.outbound.replyText(deliveryCtx, block.text);
|
||||
} else {
|
||||
|
|
@ -303,12 +349,17 @@ export class ChannelManager {
|
|||
replyToMessageId: messageId,
|
||||
},
|
||||
};
|
||||
console.log(`[Channels] lastRoute updated → ${plugin.id}:${conversationId}`);
|
||||
console.log(`[Channels] lastRoute updated → ${plugin.id}:${conversationId} replyTo=${messageId}`);
|
||||
console.log(`[Channels] Forwarding to agent ${agent.sessionId}`);
|
||||
|
||||
// Show typing indicator and ACK reaction while agent processes
|
||||
// Show typing indicator and 👀 ack on this message
|
||||
this.startTyping();
|
||||
this.addAckReaction();
|
||||
const ackRoute: LastRoute = { ...this.lastRoute };
|
||||
if (ackRoute.plugin.outbound.addReaction) {
|
||||
console.log(`[Channels] Adding 👀 to replyTo=${messageId}`);
|
||||
void ackRoute.plugin.outbound.addReaction(ackRoute.deliveryCtx, "👀").catch(() => {});
|
||||
}
|
||||
this.ackBuffer.push(ackRoute);
|
||||
|
||||
// Handle media messages (processed async, then fed through debouncer)
|
||||
if (message.media && plugin.downloadMedia) {
|
||||
|
|
@ -406,7 +457,15 @@ export class ChannelManager {
|
|||
if (!this.debouncer) {
|
||||
this.debouncer = new InboundDebouncer(
|
||||
(_conversationId, combinedText) => {
|
||||
console.log(`[Channels] Debouncer flushing ${combinedText.length} chars to agent`);
|
||||
// Snapshot the current route + pending acks for this batch.
|
||||
const route = this.lastRoute ? { ...this.lastRoute } : null;
|
||||
const acks = [...this.ackBuffer];
|
||||
this.ackBuffer = [];
|
||||
if (route) {
|
||||
this.pendingRoutes.push({ route, acks });
|
||||
}
|
||||
const replyTo = route?.deliveryCtx.replyToMessageId ?? "?";
|
||||
console.log(`[Channels] Debouncer flushing ${combinedText.length} chars to agent (queued route replyTo=${replyTo}, acks=${acks.length})`);
|
||||
agent.write(combinedText);
|
||||
},
|
||||
);
|
||||
|
|
@ -414,20 +473,6 @@ export class ChannelManager {
|
|||
return this.debouncer;
|
||||
}
|
||||
|
||||
/** Add 👀 reaction to acknowledge message receipt */
|
||||
private addAckReaction(): void {
|
||||
const route = this.lastRoute;
|
||||
if (!route?.plugin.outbound.addReaction) return;
|
||||
void route.plugin.outbound.addReaction(route.deliveryCtx, "👀").catch(() => {});
|
||||
}
|
||||
|
||||
/** Remove ACK reaction when processing completes */
|
||||
private removeAckReaction(): void {
|
||||
const route = this.lastRoute;
|
||||
if (!route?.plugin.outbound.removeReaction) return;
|
||||
void route.plugin.outbound.removeReaction(route.deliveryCtx).catch(() => {});
|
||||
}
|
||||
|
||||
/** Start sending typing indicators (repeats every 5s until stopped) */
|
||||
private startTyping(): void {
|
||||
this.stopTyping();
|
||||
|
|
@ -462,6 +507,10 @@ export class ChannelManager {
|
|||
if (this.lastRoute && this.lastRoute.plugin.id === channelId && this.lastRoute.deliveryCtx.accountId === accountId) {
|
||||
this.stopTyping();
|
||||
this.lastRoute = null;
|
||||
this.activeRoute = null;
|
||||
this.activeAcks = [];
|
||||
this.ackBuffer = [];
|
||||
this.pendingRoutes = [];
|
||||
this.aggregator = null;
|
||||
}
|
||||
|
||||
|
|
@ -495,6 +544,10 @@ export class ChannelManager {
|
|||
}
|
||||
this.accounts.clear();
|
||||
this.lastRoute = null;
|
||||
this.activeRoute = null;
|
||||
this.activeAcks = [];
|
||||
this.ackBuffer = [];
|
||||
this.pendingRoutes = [];
|
||||
this.aggregator = null;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -18,8 +18,8 @@ import { execFile, execFileSync } from "node:child_process";
|
|||
import { tmpdir } from "node:os";
|
||||
import { credentialManager } from "../agent/credentials.js";
|
||||
|
||||
/** Cached path to local whisper binary, or false if not found */
|
||||
let cachedWhisperBin: string | false | undefined;
|
||||
/** Cached path to local whisper binary (only caches success; misses re-check each time) */
|
||||
let cachedWhisperBin: string | undefined;
|
||||
|
||||
/** Find local whisper binary in PATH */
|
||||
function findWhisperBin(): string | false {
|
||||
|
|
@ -35,7 +35,7 @@ function findWhisperBin(): string | false {
|
|||
}
|
||||
}
|
||||
|
||||
cachedWhisperBin = false;
|
||||
// Don't cache failure — whisper may be installed while the process is running
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue