multica/docs/message-paths.md
Naiyuan Qing be71614cf5 docs(channels): add message paths documentation
Document the three independent message paths (Desktop IPC, Web
WebSocket, Channel Bot API) including send/receive flows, error
handling, lastRoute pattern, and event filtering comparison.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-06 19:35:51 +08:00

10 KiB

Message Paths — Desktop / Web / Channel

Three independent paths deliver messages to and from the Hub's agent. All three share the same AsyncAgent instance — they are just different I/O surfaces.


Overview

Desktop (Electron IPC)          Web (WebSocket via Gateway)        Channel (Bot API, e.g. Telegram)
        │                                │                                  │
        ▼                                ▼                                  ▼
  localChat:send IPC            client.send → Gateway WS           plugin.gateway (polling/webhook)
        │                                │                                  │
        ▼                                ▼                                  ▼
  hub.ts / ipc/hub.ts           hub.ts / onMessage                  manager.ts / routeIncoming
  clearLastRoute()              clearLastRoute()                    set lastRoute
        │                                │                                  │
        └────────────────► agent.write(text) ◄──────────────────────────────┘
                                     │
                                     ▼
                              AsyncAgent.run()
                                     │
                        ┌────────────┴────────────────┐
                        ▼                              ▼
                  agent.subscribe()              agent.read()
                  (multi-consumer)            (single-consumer iterable)
                        │                              │
               ┌────────┴────────┐                     ▼
               ▼                 ▼              hub.ts / consumeAgent()
        Desktop IPC        Channel Manager             │
     (ipc/hub.ts)       (manager.ts)                   ▼
               │                 │              Gateway WS → Web client
               ▼                 ▼
         localChat:event   Bot API reply
         → renderer        (via lastRoute)

Path 1: Desktop (Electron IPC)

Send (User → Agent)

Renderer: sendMessage(text)
  → IPC: localChat:send
    → ipc/hub.ts handler
      → hub.channelManager.clearLastRoute()   // reply stays in desktop
      → agent.write(text)

File: apps/desktop/electron/ipc/hub.tslocalChat:send handler (line ~373)

Receive (Agent → User)

Agent runs LLM
  → pi-agent-core fires AgentEvent
    → Agent.subscribeAll() → AsyncAgent channel + subscribers
      → agent.subscribe() callback in ipc/hub.ts
        → Filter: assistant messages + tool_execution + passthrough (compaction, agent_error)
        → IPC: mainWindow.webContents.send('localChat:event', { agentId, streamId, event })
          → Renderer: use-local-chat.ts onEvent callback
            → chat.handleStream(payload)

Files:

  • apps/desktop/electron/ipc/hub.tslocalChat:subscribe handler (line ~248)
  • apps/desktop/src/hooks/use-local-chat.tsonEvent listener (line ~54)
  • packages/hooks/src/use-chat.tshandleStream() (line ~133)

Error Handling

Agent.run() throws / returns error
  → AsyncAgent.write() catch block
    → channel.send(legacy Message)           // for read() consumers (Web)
    → agent.emitMulticaEvent({ type: "agent_error", error })  // for subscribe() consumers
      → ipc/hub.ts subscriber → passthrough event → localChat:event
        → use-local-chat.ts → chat.setError() + setIsLoading(false)

Path 2: Web (WebSocket via Gateway)

Send (User → Agent)

Web app: sendMessage(text)
  → GatewayClient.send(hubId, "message", { agentId, content })
    → Socket.io → Gateway server → routes to Hub device
      → hub.ts / onMessage handler
        → channelManager.clearLastRoute()    // reply stays in gateway
        → agentSenders.set(agentId, deviceId)
        → agent.write(content)

File: src/hub/hub.tsonMessage handler (line ~154)

Receive (Agent → User)

Agent runs LLM
  → pi-agent-core fires AgentEvent
    → Agent.subscribeAll() → AsyncAgent channel + subscribers
      → agent.read() consumed by hub.ts / consumeAgent()
        → Filter: assistant messages + tool_execution + passthrough (compaction, agent_error)
        → client.send(targetDeviceId, StreamAction, { streamId, agentId, event })
          → Socket.io → Gateway → routes to Web client device
            → GatewayClient.onMessage callback
              → use-gateway-chat.ts → chat.handleStream(payload)

Files:

  • src/hub/hub.tsconsumeAgent() (line ~314)
  • packages/hooks/src/use-gateway-chat.tsonMessage listener (line ~50)
  • packages/hooks/src/use-chat.tshandleStream() (line ~133)

Error Handling

Agent.run() throws / returns error
  → AsyncAgent.write() catch block
    → channel.send(legacy Message)           // consumed by consumeAgent() → sent as "message" action
    → agent.emitMulticaEvent({ type: "agent_error", error })
      → read() → consumeAgent() → passthrough event → StreamAction
        → GatewayClient → use-gateway-chat.ts → chat.setError() + setIsLoading(false)

Note: Legacy error Messages also reach the Web client as "message" action (a plain text fallback). The agent_error event provides structured error info for proper UI rendering.


Path 3: Channel (Bot API, e.g. Telegram)

Send (User → Agent)

User sends message in Telegram
  → grammy long-polling receives Update
    → plugin.gateway.start() callback: onMessage(channelMessage)
      → ChannelManager.routeIncoming()
        → Set lastRoute = { plugin, deliveryCtx }   // reply goes back to Telegram
        → agent.write(text)                          // same as desktop/web

File: src/channels/manager.tsrouteIncoming() (line ~233)

Receive (Agent → User)

Agent runs LLM
  → pi-agent-core fires AgentEvent
    → Agent.subscribeAll() → AsyncAgent channel + subscribers
      → agent.subscribe() callback in ChannelManager.subscribeToAgent()
        → Check: if (!lastRoute) return         // no active channel route, skip
        → Filter: only assistant messages
        → message_start → createAggregator()    // MessageAggregator buffers/chunks text
        → message_update → aggregator.handleEvent()
        → message_end → aggregator.handleEvent() → null aggregator
          → Aggregator emits text blocks
            → Block 0: plugin.outbound.replyText(deliveryCtx, text)   // Telegram reply
            → Block N: plugin.outbound.sendText(deliveryCtx, text)    // follow-up messages

Files:

  • src/channels/manager.tssubscribeToAgent() (line ~151), createAggregator() (line ~205)
  • src/hub/message-aggregator.ts — text chunking/buffering logic

Error Handling

Agent.run() throws / returns error
  → AsyncAgent.write() catch block
    → agent.emitMulticaEvent({ type: "agent_error", error })
      → subscribe() → ChannelManager subscriber
        → if lastRoute exists:
          → plugin.outbound.sendText(deliveryCtx, "[Error] ${errorMsg}")

Comparison Table

Aspect Desktop (IPC) Web (WebSocket) Channel (Bot API)
Transport Electron IPC Socket.io via Gateway Bot API (HTTP)
Send entry localChat:send client.send → Gateway routeIncoming
Receive method agent.subscribe() agent.read() (iterable) agent.subscribe()
Consumer ipc/hub.ts subscriber hub.ts consumeAgent() manager.ts subscriber
Frontend hook use-local-chat.ts use-gateway-chat.ts N/A (Bot API)
State hook use-chat.ts use-chat.ts N/A
Reply routing Always (IPC channel) agentSenders Map lastRoute pattern
clearLastRoute Yes (on send) Yes (on send) No (sets lastRoute)
Error display agent_error → UI agent_error → UI agent_error → Bot text
Tool results Rendered in UI Rendered in UI Skipped (text only)
Text chunking No (full stream) No (full stream) Yes (MessageAggregator)

lastRoute Pattern

The lastRoute tracks which channel last sent a message. When the agent replies:

  • If lastRoute is set → reply goes to that channel (e.g. Telegram)
  • If lastRoute is null → reply goes to Desktop/Web only (via their own mechanisms)

Clearing: Desktop and Web both call channelManager.clearLastRoute() before agent.write(), so channel replies stop when the user switches to desktop/web.

Setting: routeIncoming() sets lastRoute when a channel message arrives.

Desktop and Web always receive agent events regardless of lastRoute — they use their own independent delivery mechanisms (IPC subscribe / Gateway read).


Event Filtering

All three paths filter raw agent events. Only these are forwarded to consumers:

Event Type Desktop Web Channel
message_start assistant only assistant only assistant only
message_update assistant only assistant only assistant only
message_end assistant only assistant only assistant only
tool_execution_start Yes Yes No
tool_execution_end Yes Yes No
compaction_start Yes (passthrough) Yes (passthrough) No
compaction_end Yes (passthrough) Yes (passthrough) No
agent_error Yes (passthrough) Yes (passthrough) Yes (→ text)
User message events Filtered out Filtered out Filtered out