Document the three independent message paths (Desktop IPC, Web WebSocket, Channel Bot API) including send/receive flows, error handling, lastRoute pattern, and event filtering comparison. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
10 KiB
Message Paths — Desktop / Web / Channel
Three independent paths deliver messages to and from the Hub's agent.
All three share the same AsyncAgent instance — they are just different I/O surfaces.
Overview
Desktop (Electron IPC) Web (WebSocket via Gateway) Channel (Bot API, e.g. Telegram)
│ │ │
▼ ▼ ▼
localChat:send IPC client.send → Gateway WS plugin.gateway (polling/webhook)
│ │ │
▼ ▼ ▼
hub.ts / ipc/hub.ts hub.ts / onMessage manager.ts / routeIncoming
clearLastRoute() clearLastRoute() set lastRoute
│ │ │
└────────────────► agent.write(text) ◄──────────────────────────────┘
│
▼
AsyncAgent.run()
│
┌────────────┴────────────────┐
▼ ▼
agent.subscribe() agent.read()
(multi-consumer) (single-consumer iterable)
│ │
┌────────┴────────┐ ▼
▼ ▼ hub.ts / consumeAgent()
Desktop IPC Channel Manager │
(ipc/hub.ts) (manager.ts) ▼
│ │ Gateway WS → Web client
▼ ▼
localChat:event Bot API reply
→ renderer (via lastRoute)
Path 1: Desktop (Electron IPC)
Send (User → Agent)
Renderer: sendMessage(text)
→ IPC: localChat:send
→ ipc/hub.ts handler
→ hub.channelManager.clearLastRoute() // reply stays in desktop
→ agent.write(text)
File: apps/desktop/electron/ipc/hub.ts — localChat:send handler (line ~373)
Receive (Agent → User)
Agent runs LLM
→ pi-agent-core fires AgentEvent
→ Agent.subscribeAll() → AsyncAgent channel + subscribers
→ agent.subscribe() callback in ipc/hub.ts
→ Filter: assistant messages + tool_execution + passthrough (compaction, agent_error)
→ IPC: mainWindow.webContents.send('localChat:event', { agentId, streamId, event })
→ Renderer: use-local-chat.ts onEvent callback
→ chat.handleStream(payload)
Files:
apps/desktop/electron/ipc/hub.ts—localChat:subscribehandler (line ~248)apps/desktop/src/hooks/use-local-chat.ts—onEventlistener (line ~54)packages/hooks/src/use-chat.ts—handleStream()(line ~133)
Error Handling
Agent.run() throws / returns error
→ AsyncAgent.write() catch block
→ channel.send(legacy Message) // for read() consumers (Web)
→ agent.emitMulticaEvent({ type: "agent_error", error }) // for subscribe() consumers
→ ipc/hub.ts subscriber → passthrough event → localChat:event
→ use-local-chat.ts → chat.setError() + setIsLoading(false)
Path 2: Web (WebSocket via Gateway)
Send (User → Agent)
Web app: sendMessage(text)
→ GatewayClient.send(hubId, "message", { agentId, content })
→ Socket.io → Gateway server → routes to Hub device
→ hub.ts / onMessage handler
→ channelManager.clearLastRoute() // reply stays in gateway
→ agentSenders.set(agentId, deviceId)
→ agent.write(content)
File: src/hub/hub.ts — onMessage handler (line ~154)
Receive (Agent → User)
Agent runs LLM
→ pi-agent-core fires AgentEvent
→ Agent.subscribeAll() → AsyncAgent channel + subscribers
→ agent.read() consumed by hub.ts / consumeAgent()
→ Filter: assistant messages + tool_execution + passthrough (compaction, agent_error)
→ client.send(targetDeviceId, StreamAction, { streamId, agentId, event })
→ Socket.io → Gateway → routes to Web client device
→ GatewayClient.onMessage callback
→ use-gateway-chat.ts → chat.handleStream(payload)
Files:
src/hub/hub.ts—consumeAgent()(line ~314)packages/hooks/src/use-gateway-chat.ts—onMessagelistener (line ~50)packages/hooks/src/use-chat.ts—handleStream()(line ~133)
Error Handling
Agent.run() throws / returns error
→ AsyncAgent.write() catch block
→ channel.send(legacy Message) // consumed by consumeAgent() → sent as "message" action
→ agent.emitMulticaEvent({ type: "agent_error", error })
→ read() → consumeAgent() → passthrough event → StreamAction
→ GatewayClient → use-gateway-chat.ts → chat.setError() + setIsLoading(false)
Note: Legacy error Messages also reach the Web client as "message" action (a plain text fallback). The agent_error event provides structured error info for proper UI rendering.
Path 3: Channel (Bot API, e.g. Telegram)
Send (User → Agent)
User sends message in Telegram
→ grammy long-polling receives Update
→ plugin.gateway.start() callback: onMessage(channelMessage)
→ ChannelManager.routeIncoming()
→ Set lastRoute = { plugin, deliveryCtx } // reply goes back to Telegram
→ agent.write(text) // same as desktop/web
File: src/channels/manager.ts — routeIncoming() (line ~233)
Receive (Agent → User)
Agent runs LLM
→ pi-agent-core fires AgentEvent
→ Agent.subscribeAll() → AsyncAgent channel + subscribers
→ agent.subscribe() callback in ChannelManager.subscribeToAgent()
→ Check: if (!lastRoute) return // no active channel route, skip
→ Filter: only assistant messages
→ message_start → createAggregator() // MessageAggregator buffers/chunks text
→ message_update → aggregator.handleEvent()
→ message_end → aggregator.handleEvent() → null aggregator
→ Aggregator emits text blocks
→ Block 0: plugin.outbound.replyText(deliveryCtx, text) // Telegram reply
→ Block N: plugin.outbound.sendText(deliveryCtx, text) // follow-up messages
Files:
src/channels/manager.ts—subscribeToAgent()(line ~151),createAggregator()(line ~205)src/hub/message-aggregator.ts— text chunking/buffering logic
Error Handling
Agent.run() throws / returns error
→ AsyncAgent.write() catch block
→ agent.emitMulticaEvent({ type: "agent_error", error })
→ subscribe() → ChannelManager subscriber
→ if lastRoute exists:
→ plugin.outbound.sendText(deliveryCtx, "[Error] ${errorMsg}")
Comparison Table
| Aspect | Desktop (IPC) | Web (WebSocket) | Channel (Bot API) |
|---|---|---|---|
| Transport | Electron IPC | Socket.io via Gateway | Bot API (HTTP) |
| Send entry | localChat:send |
client.send → Gateway |
routeIncoming |
| Receive method | agent.subscribe() |
agent.read() (iterable) |
agent.subscribe() |
| Consumer | ipc/hub.ts subscriber | hub.ts consumeAgent() |
manager.ts subscriber |
| Frontend hook | use-local-chat.ts |
use-gateway-chat.ts |
N/A (Bot API) |
| State hook | use-chat.ts |
use-chat.ts |
N/A |
| Reply routing | Always (IPC channel) | agentSenders Map |
lastRoute pattern |
| clearLastRoute | Yes (on send) | Yes (on send) | No (sets lastRoute) |
| Error display | agent_error → UI |
agent_error → UI |
agent_error → Bot text |
| Tool results | Rendered in UI | Rendered in UI | Skipped (text only) |
| Text chunking | No (full stream) | No (full stream) | Yes (MessageAggregator) |
lastRoute Pattern
The lastRoute tracks which channel last sent a message. When the agent replies:
- If
lastRouteis set → reply goes to that channel (e.g. Telegram) - If
lastRouteis null → reply goes to Desktop/Web only (via their own mechanisms)
Clearing: Desktop and Web both call channelManager.clearLastRoute() before agent.write(), so channel replies stop when the user switches to desktop/web.
Setting: routeIncoming() sets lastRoute when a channel message arrives.
Desktop and Web always receive agent events regardless of lastRoute — they use their own independent delivery mechanisms (IPC subscribe / Gateway read).
Event Filtering
All three paths filter raw agent events. Only these are forwarded to consumers:
| Event Type | Desktop | Web | Channel |
|---|---|---|---|
message_start |
assistant only | assistant only | assistant only |
message_update |
assistant only | assistant only | assistant only |
message_end |
assistant only | assistant only | assistant only |
tool_execution_start |
Yes | Yes | No |
tool_execution_end |
Yes | Yes | No |
compaction_start |
Yes (passthrough) | Yes (passthrough) | No |
compaction_end |
Yes (passthrough) | Yes (passthrough) | No |
agent_error |
Yes (passthrough) | Yes (passthrough) | Yes (→ text) |
| User message events | Filtered out | Filtered out | Filtered out |