docs(channels): add message paths documentation
Document the three independent message paths (Desktop IPC, Web WebSocket, Channel Bot API) including send/receive flows, error handling, lastRoute pattern, and event filtering comparison. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
e99600c356
commit
be71614cf5
1 changed files with 232 additions and 0 deletions
232
docs/message-paths.md
Normal file
232
docs/message-paths.md
Normal file
|
|
@ -0,0 +1,232 @@
|
|||
# Message Paths — Desktop / Web / Channel
|
||||
|
||||
Three independent paths deliver messages to and from the Hub's agent.
|
||||
All three share the same `AsyncAgent` instance — they are just different I/O surfaces.
|
||||
|
||||
---
|
||||
|
||||
## Overview
|
||||
|
||||
```
|
||||
Desktop (Electron IPC) Web (WebSocket via Gateway) Channel (Bot API, e.g. Telegram)
|
||||
│ │ │
|
||||
▼ ▼ ▼
|
||||
localChat:send IPC client.send → Gateway WS plugin.gateway (polling/webhook)
|
||||
│ │ │
|
||||
▼ ▼ ▼
|
||||
hub.ts / ipc/hub.ts hub.ts / onMessage manager.ts / routeIncoming
|
||||
clearLastRoute() clearLastRoute() set lastRoute
|
||||
│ │ │
|
||||
└────────────────► agent.write(text) ◄──────────────────────────────┘
|
||||
│
|
||||
▼
|
||||
AsyncAgent.run()
|
||||
│
|
||||
┌────────────┴────────────────┐
|
||||
▼ ▼
|
||||
agent.subscribe() agent.read()
|
||||
(multi-consumer) (single-consumer iterable)
|
||||
│ │
|
||||
┌────────┴────────┐ ▼
|
||||
▼ ▼ hub.ts / consumeAgent()
|
||||
Desktop IPC Channel Manager │
|
||||
(ipc/hub.ts) (manager.ts) ▼
|
||||
│ │ Gateway WS → Web client
|
||||
▼ ▼
|
||||
localChat:event Bot API reply
|
||||
→ renderer (via lastRoute)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Path 1: Desktop (Electron IPC)
|
||||
|
||||
### Send (User → Agent)
|
||||
|
||||
```
|
||||
Renderer: sendMessage(text)
|
||||
→ IPC: localChat:send
|
||||
→ ipc/hub.ts handler
|
||||
→ hub.channelManager.clearLastRoute() // reply stays in desktop
|
||||
→ agent.write(text)
|
||||
```
|
||||
|
||||
**File**: `apps/desktop/electron/ipc/hub.ts` — `localChat:send` handler (line ~373)
|
||||
|
||||
### Receive (Agent → User)
|
||||
|
||||
```
|
||||
Agent runs LLM
|
||||
→ pi-agent-core fires AgentEvent
|
||||
→ Agent.subscribeAll() → AsyncAgent channel + subscribers
|
||||
→ agent.subscribe() callback in ipc/hub.ts
|
||||
→ Filter: assistant messages + tool_execution + passthrough (compaction, agent_error)
|
||||
→ IPC: mainWindow.webContents.send('localChat:event', { agentId, streamId, event })
|
||||
→ Renderer: use-local-chat.ts onEvent callback
|
||||
→ chat.handleStream(payload)
|
||||
```
|
||||
|
||||
**Files**:
|
||||
- `apps/desktop/electron/ipc/hub.ts` — `localChat:subscribe` handler (line ~248)
|
||||
- `apps/desktop/src/hooks/use-local-chat.ts` — `onEvent` listener (line ~54)
|
||||
- `packages/hooks/src/use-chat.ts` — `handleStream()` (line ~133)
|
||||
|
||||
### Error Handling
|
||||
|
||||
```
|
||||
Agent.run() throws / returns error
|
||||
→ AsyncAgent.write() catch block
|
||||
→ channel.send(legacy Message) // for read() consumers (Web)
|
||||
→ agent.emitMulticaEvent({ type: "agent_error", error }) // for subscribe() consumers
|
||||
→ ipc/hub.ts subscriber → passthrough event → localChat:event
|
||||
→ use-local-chat.ts → chat.setError() + setIsLoading(false)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Path 2: Web (WebSocket via Gateway)
|
||||
|
||||
### Send (User → Agent)
|
||||
|
||||
```
|
||||
Web app: sendMessage(text)
|
||||
→ GatewayClient.send(hubId, "message", { agentId, content })
|
||||
→ Socket.io → Gateway server → routes to Hub device
|
||||
→ hub.ts / onMessage handler
|
||||
→ channelManager.clearLastRoute() // reply stays in gateway
|
||||
→ agentSenders.set(agentId, deviceId)
|
||||
→ agent.write(content)
|
||||
```
|
||||
|
||||
**File**: `src/hub/hub.ts` — `onMessage` handler (line ~154)
|
||||
|
||||
### Receive (Agent → User)
|
||||
|
||||
```
|
||||
Agent runs LLM
|
||||
→ pi-agent-core fires AgentEvent
|
||||
→ Agent.subscribeAll() → AsyncAgent channel + subscribers
|
||||
→ agent.read() consumed by hub.ts / consumeAgent()
|
||||
→ Filter: assistant messages + tool_execution + passthrough (compaction, agent_error)
|
||||
→ client.send(targetDeviceId, StreamAction, { streamId, agentId, event })
|
||||
→ Socket.io → Gateway → routes to Web client device
|
||||
→ GatewayClient.onMessage callback
|
||||
→ use-gateway-chat.ts → chat.handleStream(payload)
|
||||
```
|
||||
|
||||
**Files**:
|
||||
- `src/hub/hub.ts` — `consumeAgent()` (line ~314)
|
||||
- `packages/hooks/src/use-gateway-chat.ts` — `onMessage` listener (line ~50)
|
||||
- `packages/hooks/src/use-chat.ts` — `handleStream()` (line ~133)
|
||||
|
||||
### Error Handling
|
||||
|
||||
```
|
||||
Agent.run() throws / returns error
|
||||
→ AsyncAgent.write() catch block
|
||||
→ channel.send(legacy Message) // consumed by consumeAgent() → sent as "message" action
|
||||
→ agent.emitMulticaEvent({ type: "agent_error", error })
|
||||
→ read() → consumeAgent() → passthrough event → StreamAction
|
||||
→ GatewayClient → use-gateway-chat.ts → chat.setError() + setIsLoading(false)
|
||||
```
|
||||
|
||||
**Note**: Legacy error Messages also reach the Web client as `"message"` action (a plain text fallback). The `agent_error` event provides structured error info for proper UI rendering.
|
||||
|
||||
---
|
||||
|
||||
## Path 3: Channel (Bot API, e.g. Telegram)
|
||||
|
||||
### Send (User → Agent)
|
||||
|
||||
```
|
||||
User sends message in Telegram
|
||||
→ grammy long-polling receives Update
|
||||
→ plugin.gateway.start() callback: onMessage(channelMessage)
|
||||
→ ChannelManager.routeIncoming()
|
||||
→ Set lastRoute = { plugin, deliveryCtx } // reply goes back to Telegram
|
||||
→ agent.write(text) // same as desktop/web
|
||||
```
|
||||
|
||||
**File**: `src/channels/manager.ts` — `routeIncoming()` (line ~233)
|
||||
|
||||
### Receive (Agent → User)
|
||||
|
||||
```
|
||||
Agent runs LLM
|
||||
→ pi-agent-core fires AgentEvent
|
||||
→ Agent.subscribeAll() → AsyncAgent channel + subscribers
|
||||
→ agent.subscribe() callback in ChannelManager.subscribeToAgent()
|
||||
→ Check: if (!lastRoute) return // no active channel route, skip
|
||||
→ Filter: only assistant messages
|
||||
→ message_start → createAggregator() // MessageAggregator buffers/chunks text
|
||||
→ message_update → aggregator.handleEvent()
|
||||
→ message_end → aggregator.handleEvent() → null aggregator
|
||||
→ Aggregator emits text blocks
|
||||
→ Block 0: plugin.outbound.replyText(deliveryCtx, text) // Telegram reply
|
||||
→ Block N: plugin.outbound.sendText(deliveryCtx, text) // follow-up messages
|
||||
```
|
||||
|
||||
**Files**:
|
||||
- `src/channels/manager.ts` — `subscribeToAgent()` (line ~151), `createAggregator()` (line ~205)
|
||||
- `src/hub/message-aggregator.ts` — text chunking/buffering logic
|
||||
|
||||
### Error Handling
|
||||
|
||||
```
|
||||
Agent.run() throws / returns error
|
||||
→ AsyncAgent.write() catch block
|
||||
→ agent.emitMulticaEvent({ type: "agent_error", error })
|
||||
→ subscribe() → ChannelManager subscriber
|
||||
→ if lastRoute exists:
|
||||
→ plugin.outbound.sendText(deliveryCtx, "[Error] ${errorMsg}")
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Comparison Table
|
||||
|
||||
| Aspect | Desktop (IPC) | Web (WebSocket) | Channel (Bot API) |
|
||||
|---------------------|------------------------|---------------------------|--------------------------|
|
||||
| **Transport** | Electron IPC | Socket.io via Gateway | Bot API (HTTP) |
|
||||
| **Send entry** | `localChat:send` | `client.send` → Gateway | `routeIncoming` |
|
||||
| **Receive method** | `agent.subscribe()` | `agent.read()` (iterable) | `agent.subscribe()` |
|
||||
| **Consumer** | ipc/hub.ts subscriber | hub.ts `consumeAgent()` | manager.ts subscriber |
|
||||
| **Frontend hook** | `use-local-chat.ts` | `use-gateway-chat.ts` | N/A (Bot API) |
|
||||
| **State hook** | `use-chat.ts` | `use-chat.ts` | N/A |
|
||||
| **Reply routing** | Always (IPC channel) | `agentSenders` Map | `lastRoute` pattern |
|
||||
| **clearLastRoute** | Yes (on send) | Yes (on send) | No (sets lastRoute) |
|
||||
| **Error display** | `agent_error` → UI | `agent_error` → UI | `agent_error` → Bot text |
|
||||
| **Tool results** | Rendered in UI | Rendered in UI | Skipped (text only) |
|
||||
| **Text chunking** | No (full stream) | No (full stream) | Yes (MessageAggregator) |
|
||||
|
||||
---
|
||||
|
||||
## lastRoute Pattern
|
||||
|
||||
The `lastRoute` tracks which channel last sent a message. When the agent replies:
|
||||
- If `lastRoute` is set → reply goes to that channel (e.g. Telegram)
|
||||
- If `lastRoute` is null → reply goes to Desktop/Web only (via their own mechanisms)
|
||||
|
||||
**Clearing**: Desktop and Web both call `channelManager.clearLastRoute()` before `agent.write()`, so channel replies stop when the user switches to desktop/web.
|
||||
|
||||
**Setting**: `routeIncoming()` sets `lastRoute` when a channel message arrives.
|
||||
|
||||
Desktop and Web always receive agent events regardless of `lastRoute` — they use their own independent delivery mechanisms (IPC subscribe / Gateway read).
|
||||
|
||||
---
|
||||
|
||||
## Event Filtering
|
||||
|
||||
All three paths filter raw agent events. Only these are forwarded to consumers:
|
||||
|
||||
| Event Type | Desktop | Web | Channel |
|
||||
|-------------------------|---------|-----|---------|
|
||||
| `message_start` | assistant only | assistant only | assistant only |
|
||||
| `message_update` | assistant only | assistant only | assistant only |
|
||||
| `message_end` | assistant only | assistant only | assistant only |
|
||||
| `tool_execution_start` | Yes | Yes | No |
|
||||
| `tool_execution_end` | Yes | Yes | No |
|
||||
| `compaction_start` | Yes (passthrough) | Yes (passthrough) | No |
|
||||
| `compaction_end` | Yes (passthrough) | Yes (passthrough) | No |
|
||||
| `agent_error` | Yes (passthrough) | Yes (passthrough) | Yes (→ text) |
|
||||
| User message events | Filtered out | Filtered out | Filtered out |
|
||||
Loading…
Add table
Add a link
Reference in a new issue