From 901f5ba8045aca65b2eacd2100885eff770cb28b Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Thu, 12 Feb 2026 17:04:07 +0800 Subject: [PATCH 1/2] docs: add dashboard design plan Co-Authored-By: Claude Opus 4.5 --- docs/plans/2026-02-12-dashboard-design.md | 292 ++++++++++++++++++++++ 1 file changed, 292 insertions(+) create mode 100644 docs/plans/2026-02-12-dashboard-design.md diff --git a/docs/plans/2026-02-12-dashboard-design.md b/docs/plans/2026-02-12-dashboard-design.md new file mode 100644 index 00000000..b30a271b --- /dev/null +++ b/docs/plans/2026-02-12-dashboard-design.md @@ -0,0 +1,292 @@ +# Dashboard Design Plan + +## Overview + +Build a runtime dashboard that shows "What is my Agent doing now, what will it do next, what did it do before" instead of "How is my Agent configured". + +## Goals + +1. Real-time visibility into Agent execution state +2. Monitor all input sources (Desktop IPC, Gateway, Channels) +3. Track sub-agents, processes, scheduled tasks +4. Provide control capabilities (cancel, retry) + +--- + +## Phase 1: Core Infrastructure + +### 1.1 Unified Event Stream + +Create a dashboard subscription mechanism in Hub that exposes all observable data. + +**File:** `packages/core/src/hub/dashboard.ts` + +```typescript +interface DashboardEvent { + timestamp: number; + agentId: string; + + // Source tracking + source: { + type: "local" | "gateway" | "channel"; + deviceId?: string; // Gateway source + channel?: string; // telegram | discord | slack + accountId?: string; + conversationId?: string; + }; + + // Original event + event: AgentEvent | MulticaEvent; +} + +interface DashboardSubscription { + subscribe(callback: (event: DashboardEvent) => void): () => void; + + // Query methods + getSnapshot(): DashboardSnapshot; +} +``` + +### 1.2 Dashboard Snapshot + +Queryable state for initial load and polling fallback. + +```typescript +interface DashboardSnapshot { + // Agent state + agents: Array<{ + id: string; + isRunning: boolean; + isStreaming: boolean; + pendingWrites: number; + lastError?: string; + }>; + + // Sub-agents + subagents: SubagentRunRecord[]; + + // Processes + processes: ProcessEntry[]; + + // Heartbeat + lastHeartbeat?: HeartbeatEventPayload; + + // Cron jobs + cronJobs: CronJobStatus[]; + + // Connections + gateway: { + connectionState: ConnectionState; + connectedDevices: number; + }; + channels: ChannelAccountState[]; +} +``` + +--- + +## Phase 2: IPC Layer (Desktop) + +### 2.1 New IPC Handlers + +**File:** `apps/desktop/src/main/ipc/dashboard.ts` + +```typescript +// Subscribe to dashboard events +ipcMain.handle('dashboard:subscribe', (agentId?: string) => { + // Returns subscription ID +}); + +// Get current snapshot +ipcMain.handle('dashboard:snapshot', () => { + return hub.getDashboardSnapshot(); +}); + +// Unsubscribe +ipcMain.handle('dashboard:unsubscribe', (subscriptionId) => {}); + +// Event push to renderer +mainWindow.webContents.send('dashboard:event', event); +``` + +### 2.2 Preload API + +**File:** `apps/desktop/src/preload/index.ts` + +```typescript +dashboard: { + subscribe: () => ipcRenderer.invoke('dashboard:subscribe'), + unsubscribe: () => ipcRenderer.invoke('dashboard:unsubscribe'), + getSnapshot: () => ipcRenderer.invoke('dashboard:snapshot'), + onEvent: (callback) => ipcRenderer.on('dashboard:event', callback), +} +``` + +--- + +## Phase 3: Frontend Store + +### 3.1 Dashboard Store + +**File:** `apps/desktop/src/renderer/src/stores/dashboard.ts` + +```typescript +interface DashboardState { + // Real-time + events: DashboardEvent[]; // Rolling buffer (last 100) + currentRun: { + agentId: string; + streamId: string; + messages: StreamingMessage[]; + tools: ToolExecution[]; + } | null; + + // Snapshot data + subagents: SubagentRunRecord[]; + processes: ProcessEntry[]; + heartbeat: HeartbeatEventPayload | null; + cronJobs: CronJobStatus[]; + + // Connection status + gateway: GatewayStatus; + channels: ChannelAccountState[]; + + // Actions + subscribe: () => void; + unsubscribe: () => void; + refresh: () => void; +} +``` + +--- + +## Phase 4: UI Components + +### 4.1 Dashboard Page + +**File:** `apps/desktop/src/renderer/src/pages/dashboard.tsx` + +Layout: +``` +┌─────────────────────────────────────────────────────────┐ +│ Dashboard [Refresh] │ +├─────────────────────────────────────────────────────────┤ +│ │ +│ ┌─────────────────┐ ┌─────────────────────────────┐ │ +│ │ Agent Status │ │ Current Activity │ │ +│ │ ● Running │ │ 💬 Generating response... │ │ +│ │ Pending: 2 │ │ 🔧 exec: npm test │ │ +│ └─────────────────┘ └─────────────────────────────┘ │ +│ │ +│ ┌─────────────────────────────────────────────────┐ │ +│ │ Sub-agents │ │ +│ │ ┌─────────────────────────────────────────────┐ │ │ +│ │ │ 🟢 search-docs task: "Find API docs" │ │ │ +│ │ │ 🔵 analyze-code task: "Review PR #123" │ │ │ +│ │ │ 🟡 pending task: "Write tests" │ │ │ +│ │ └─────────────────────────────────────────────┘ │ │ +│ └─────────────────────────────────────────────────┘ │ +│ │ +│ ┌─────────────────────────────────────────────────┐ │ +│ │ Running Processes │ │ +│ │ ┌─────────────────────────────────────────────┐ │ │ +│ │ │ npm test PID: 12345 ⏱️ 2m 30s │ │ │ +│ │ │ > Running 45/100 tests... │ │ │ +│ │ └─────────────────────────────────────────────┘ │ │ +│ └─────────────────────────────────────────────────┘ │ +│ │ +│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ +│ │ Heartbeat │ │ Gateway │ │ Channels │ │ +│ │ ✅ 30s ago │ │ 🟢 2 devices │ │ 📱 Telegram │ │ +│ └──────────────┘ └──────────────┘ └──────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────┘ +``` + +### 4.2 Components + +- `AgentStatusCard` - Running/Idle/Error state +- `CurrentActivityFeed` - Real-time event stream +- `SubagentList` - Sub-agent status with progress +- `ProcessList` - Running bash processes with output preview +- `HeartbeatIndicator` - Health status +- `ConnectionStatus` - Gateway + Channels + +--- + +## Phase 5: Control Actions + +### 5.1 Process Control + +```typescript +// Stop a running process +ipcMain.handle('dashboard:stopProcess', (processId) => { + return processRegistry.stop(processId); +}); +``` + +### 5.2 Agent Control (Future) + +- Cancel current run (requires AbortController wiring) +- Cancel sub-agent +- Retry failed operation + +--- + +## Implementation Order + +### Step 1: Core Infrastructure +- [ ] Create `packages/core/src/hub/dashboard.ts` +- [ ] Add `getDashboardSnapshot()` to Hub +- [ ] Add source tracking to event stream + +### Step 2: IPC Layer +- [ ] Create `apps/desktop/src/main/ipc/dashboard.ts` +- [ ] Add preload API +- [ ] Wire up event forwarding + +### Step 3: Store +- [ ] Create `useDashboardStore` +- [ ] Implement subscription lifecycle + +### Step 4: UI +- [ ] Create dashboard page +- [ ] Build individual components +- [ ] Add to navigation + +### Step 5: Polish +- [ ] Error handling +- [ ] Loading states +- [ ] Empty states + +--- + +## Data Sources Summary + +| Data | Source | Real-time | Query | +|------|--------|-----------|-------| +| Agent events | `agent.subscribe()` | ✅ Push | - | +| Sub-agents | `listSubagentRuns()` | 🔄 Poll | ✅ | +| Processes | `PROCESS_REGISTRY` | 🔄 Poll | ✅ | +| Heartbeat | `onHeartbeatEvent()` | ✅ Push | ✅ | +| Gateway | `onConnectionStateChange()` | ✅ Push | ✅ | +| Channels | `listAccountStates()` | 🔄 Poll | ✅ | + +--- + +## Open Questions + +1. **Event buffer size** - How many events to keep in memory? +2. **Polling interval** - For non-push data, how often to refresh? +3. **Sub-agent drill-down** - Can we subscribe to child agent events? +4. **Process output streaming** - Stream tail buffer in real-time? + +--- + +## References + +- Agent event types: `packages/core/src/agent/events.ts` +- Sub-agent registry: `packages/core/src/agent/subagent/registry.ts` +- Process registry: `packages/core/src/agent/tools/process-registry.ts` +- Heartbeat: `packages/core/src/heartbeat/` +- Channel manager: `packages/core/src/channels/manager.ts` From 8199dde1b674470bf8927de05e7ba09c89d93a5c Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Thu, 12 Feb 2026 18:45:15 +0800 Subject: [PATCH 2/2] feat(core): add message source tracking and persistence - Add MessageSource type to track where user messages originate (local, gateway, channel) - Broadcast inbound messages from all channels to local Desktop UI via Hub.onInboundMessage() - Persist source field in JSONL session storage so it survives page refresh - Display source icon (Monitor/Smartphone/Send) with tooltip for non-local user messages Co-Authored-By: Claude Opus 4.5 --- apps/desktop/src/main/electron-env.d.ts | 14 +++++ apps/desktop/src/main/ipc/hub.ts | 17 +++++- apps/desktop/src/preload/index.ts | 19 ++++++ .../src/renderer/src/hooks/use-local-chat.ts | 15 ++++- packages/core/src/agent/async-agent.ts | 4 +- packages/core/src/agent/runner.ts | 12 +++- .../core/src/agent/session/session-manager.ts | 23 ++++--- packages/core/src/agent/session/types.ts | 8 +++ packages/core/src/channels/manager.ts | 15 ++++- packages/core/src/hub/hub.ts | 41 ++++++++++++- packages/core/src/hub/index.ts | 1 + packages/hooks/src/index.ts | 2 + packages/hooks/src/use-chat.ts | 14 ++++- packages/sdk/src/actions/index.ts | 1 + packages/sdk/src/actions/rpc.ts | 13 +++- packages/store/src/index.ts | 2 +- packages/store/src/types.ts | 8 +++ packages/ui/src/components/message-list.tsx | 7 ++- .../ui/src/components/message-source-icon.tsx | 61 +++++++++++++++++++ 19 files changed, 251 insertions(+), 26 deletions(-) create mode 100644 packages/ui/src/components/message-source-icon.tsx diff --git a/apps/desktop/src/main/electron-env.d.ts b/apps/desktop/src/main/electron-env.d.ts index 0718e47c..94137db5 100644 --- a/apps/desktop/src/main/electron-env.d.ts +++ b/apps/desktop/src/main/electron-env.d.ts @@ -149,6 +149,18 @@ interface ChannelAccountStateInfo { error?: string } +type MessageSource = + | { type: 'local' } + | { type: 'gateway'; deviceId: string } + | { type: 'channel'; channelId: string; accountId: string; conversationId: string } + +interface InboundMessageEvent { + agentId: string + content: string + source: MessageSource + timestamp: number +} + interface ElectronAPI { app: { getFlags: () => Promise<{ forceOnboarding: boolean }> @@ -178,6 +190,8 @@ interface ElectronAPI { offConnectionStateChanged: () => void onDevicesChanged: (callback: () => void) => void offDevicesChanged: () => void + onInboundMessage: (callback: (event: InboundMessageEvent) => void) => void + offInboundMessage: () => void } tools: { list: () => Promise diff --git a/apps/desktop/src/main/ipc/hub.ts b/apps/desktop/src/main/ipc/hub.ts index 9dc407ee..b6bd7017 100644 --- a/apps/desktop/src/main/ipc/hub.ts +++ b/apps/desktop/src/main/ipc/hub.ts @@ -389,7 +389,15 @@ export function registerHubIpcHandlers(): void { } h.channelManager.clearLastRoute() - agent.write(content) + const source = { type: 'local' as const } + // Broadcast as local source (for consistency, though UI already knows) + h.broadcastInbound({ + agentId, + content, + source, + timestamp: Date.now(), + }) + agent.write(content, { source }) safeLog(`[IPC] Local chat message sent to agent: ${agentId}`) return { ok: true } }) @@ -496,6 +504,13 @@ export function setupDeviceConfirmation(mainWindow: Electron.BrowserWindow): voi mainWindow.webContents.send('hub:connection-state-changed', state) } }) + + // Forward inbound messages from all sources (gateway, channel) to renderer + h.onInboundMessage((event) => { + if (!mainWindow.isDestroyed()) { + mainWindow.webContents.send('hub:inbound-message', event) + } + }) } /** diff --git a/apps/desktop/src/preload/index.ts b/apps/desktop/src/preload/index.ts index cc70dc46..6c9dba62 100644 --- a/apps/desktop/src/preload/index.ts +++ b/apps/desktop/src/preload/index.ts @@ -81,6 +81,19 @@ export interface LocalChatEvent { } } +// Inbound message event (from any source: local, gateway, channel) +export type MessageSource = + | { type: 'local' } + | { type: 'gateway'; deviceId: string } + | { type: 'channel'; channelId: string; accountId: string; conversationId: string } + +export interface InboundMessageEvent { + agentId: string + content: string + source: MessageSource + timestamp: number +} + // Local chat approval request (mirrors ExecApprovalRequestPayload from @multica/sdk) export interface LocalChatApproval { approvalId: string @@ -149,6 +162,12 @@ const electronAPI = { offDevicesChanged: () => { ipcRenderer.removeAllListeners('hub:devices-changed') }, + onInboundMessage: (callback: (event: InboundMessageEvent) => void) => { + ipcRenderer.on('hub:inbound-message', (_event, data: InboundMessageEvent) => callback(data)) + }, + offInboundMessage: () => { + ipcRenderer.removeAllListeners('hub:inbound-message') + }, }, // Tools management diff --git a/apps/desktop/src/renderer/src/hooks/use-local-chat.ts b/apps/desktop/src/renderer/src/hooks/use-local-chat.ts index ec3113ae..1f5fce59 100644 --- a/apps/desktop/src/renderer/src/hooks/use-local-chat.ts +++ b/apps/desktop/src/renderer/src/hooks/use-local-chat.ts @@ -1,5 +1,5 @@ import { useState, useEffect, useCallback, useRef } from 'react' -import { useChat } from '@multica/hooks/use-chat' +import { useChat, type MessageSource } from '@multica/hooks/use-chat' import type { StreamPayload, ExecApprovalRequestPayload, @@ -75,6 +75,16 @@ export function useLocalChat() { chatRef.current.addApproval(approval as ExecApprovalRequestPayload) }) + // Listen for inbound messages from all sources (gateway, channel) + // This allows the local UI to display messages from other sources + window.electronAPI.hub.onInboundMessage((event: InboundMessageEvent) => { + // Only add non-local messages (local messages are added by sendMessage) + if (event.source.type !== 'local' && event.agentId === agentId) { + chatRef.current.addUserMessage(event.content, event.agentId, event.source as MessageSource) + setIsLoading(true) + } + }) + // Fetch history with pagination window.electronAPI.localChat.getHistory(agentId, { limit: DEFAULT_MESSAGES_LIMIT }) .then((result) => { @@ -93,6 +103,7 @@ export function useLocalChat() { return () => { window.electronAPI.localChat.offEvent() window.electronAPI.localChat.offApproval() + window.electronAPI.hub.offInboundMessage() window.electronAPI.localChat.unsubscribe(agentId).catch(() => {}) } }, [agentId]) @@ -101,7 +112,7 @@ export function useLocalChat() { (text: string) => { const trimmed = text.trim() if (!trimmed || !agentId) return - chatRef.current.addUserMessage(trimmed, agentId) + chatRef.current.addUserMessage(trimmed, agentId, { type: 'local' }) chatRef.current.setError(null) window.electronAPI.localChat.send(agentId, trimmed).catch(() => {}) setIsLoading(true) diff --git a/packages/core/src/agent/async-agent.ts b/packages/core/src/agent/async-agent.ts index d5a206b9..90763ff1 100644 --- a/packages/core/src/agent/async-agent.ts +++ b/packages/core/src/agent/async-agent.ts @@ -40,6 +40,8 @@ export interface WriteInternalOptions { export interface WriteOptions { /** Disable automatic message timestamp injection */ injectTimestamp?: boolean | undefined; + /** Message source (where did this message come from?) */ + source?: import("./session/types.js").MessageSource | undefined; } export class AsyncAgent { @@ -89,7 +91,7 @@ export class AsyncAgent { return; } console.log(`[AsyncAgent:${this.sessionId.slice(0, 8)}] run() starting for message: ${content.slice(0, 80)}`); - const result = await this.agent.run(message, { displayPrompt: content }); + const result = await this.agent.run(message, { displayPrompt: content, source: options?.source }); console.log(`[AsyncAgent:${this.sessionId.slice(0, 8)}] run() completed, error=${result.error ?? "none"}`); // Flush pending session writes so waitForIdle() callers // can safely read session data from disk. diff --git a/packages/core/src/agent/runner.ts b/packages/core/src/agent/runner.ts index 105eee43..ed2ef62e 100644 --- a/packages/core/src/agent/runner.ts +++ b/packages/core/src/agent/runner.ts @@ -107,6 +107,7 @@ export class Agent { private _runMutex: Promise = Promise.resolve(); private _compactionPromise: Promise = Promise.resolve(); private currentUserDisplayPrompt: string | undefined; + private currentUserSource: import("./session/types.js").MessageSource | undefined; // MulticaEvent subscribers (parallel to PiAgentCore's subscriber list) // Typed as AgentEvent | MulticaEvent to match subscribeAll() callback signature @@ -395,7 +396,7 @@ export class Agent { async run( prompt: string, - options?: { displayPrompt?: string }, + options?: { displayPrompt?: string; source?: import("./session/types.js").MessageSource }, ): Promise { // Run-level mutex: prevents concurrent run/runInternal from mis-tagging messages return this.withRunMutex(() => this._run(prompt, options)); @@ -440,7 +441,7 @@ export class Agent { private async _run( prompt: string, - options?: { displayPrompt?: string }, + options?: { displayPrompt?: string; source?: import("./session/types.js").MessageSource }, ): Promise { // Wait for any in-flight compaction from the previous run await this._compactionPromise; @@ -448,6 +449,7 @@ export class Agent { this.refreshAuthState(); this.output.state.lastAssistantText = ""; this.currentUserDisplayPrompt = options?.displayPrompt; + this.currentUserSource = options?.source; this._isRunning = true; this._aborted = false; @@ -563,6 +565,7 @@ export class Agent { this._isRunning = false; this._aborted = false; this.currentUserDisplayPrompt = undefined; + this.currentUserSource = undefined; } } @@ -654,13 +657,16 @@ export class Agent { private handleSessionEvent(event: AgentEvent) { if (event.type === "message_end") { const message = event.message as AgentMessage; - const saveOptions: { internal?: boolean; displayContent?: UserMessage["content"] } = {}; + const saveOptions: { internal?: boolean; displayContent?: UserMessage["content"]; source?: import("./session/types.js").MessageSource } = {}; if (this._internalRun) { saveOptions.internal = true; } if (message.role === "user" && this.currentUserDisplayPrompt !== undefined) { saveOptions.displayContent = this.currentUserDisplayPrompt; } + if (message.role === "user" && this.currentUserSource !== undefined) { + saveOptions.source = this.currentUserSource; + } this.session.saveMessage(message, Object.keys(saveOptions).length > 0 ? saveOptions : undefined); // Skip compaction during internal runs — internal messages will be // rolled back from memory afterwards, so compacting now would be incorrect. diff --git a/packages/core/src/agent/session/session-manager.ts b/packages/core/src/agent/session/session-manager.ts index 9abeaf99..df494d61 100644 --- a/packages/core/src/agent/session/session-manager.ts +++ b/packages/core/src/agent/session/session-manager.ts @@ -171,14 +171,14 @@ export class SessionManager { return this.loadMessagesFromEntries(options, false); } - loadMessagesForDisplay(options?: { includeInternal?: boolean }): AgentMessage[] { + loadMessagesForDisplay(options?: { includeInternal?: boolean }): (AgentMessage & { source?: import("./types.js").MessageSource })[] { return this.loadMessagesFromEntries(options, true); } private loadMessagesFromEntries( options: { includeInternal?: boolean } | undefined, preferDisplayContent: boolean, - ): AgentMessage[] { + ): (AgentMessage & { source?: import("./types.js").MessageSource })[] { const entries = this.loadEntries(); let messages = entries .filter((entry) => { @@ -188,17 +188,19 @@ export class SessionManager { }) .map((entry) => { const messageEntry = entry as Extract; - if ( - preferDisplayContent + const base = preferDisplayContent && messageEntry.message.role === "user" && messageEntry.displayContent !== undefined - ) { - return { ...messageEntry.message, content: messageEntry.displayContent }; + ? { ...messageEntry.message, content: messageEntry.displayContent } + : messageEntry.message; + // Include source for user messages + if (messageEntry.source && messageEntry.message.role === "user") { + return { ...base, source: messageEntry.source }; } - return messageEntry.message; + return base; }); - messages = sanitizeToolCallInputs(messages); - messages = sanitizeToolUseResultPairing(messages); + messages = sanitizeToolCallInputs(messages) as typeof messages; + messages = sanitizeToolUseResultPairing(messages) as typeof messages; return messages; } @@ -230,7 +232,7 @@ export class SessionManager { saveMessage( message: AgentMessage, - options?: { internal?: boolean; displayContent?: UserMessage["content"] }, + options?: { internal?: boolean; displayContent?: UserMessage["content"]; source?: import("./types.js").MessageSource }, ) { void this.enqueue(() => appendEntry( @@ -243,6 +245,7 @@ export class SessionManager { ...(options?.displayContent !== undefined ? { displayContent: options.displayContent } : {}), + ...(options?.source !== undefined ? { source: options.source } : {}), }, { baseDir: this.baseDir }, ), diff --git a/packages/core/src/agent/session/types.ts b/packages/core/src/agent/session/types.ts index ec734424..75fdbdb1 100644 --- a/packages/core/src/agent/session/types.ts +++ b/packages/core/src/agent/session/types.ts @@ -1,6 +1,12 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core"; import type { UserMessage } from "@mariozechner/pi-ai"; +/** Message source: where did this message come from? */ +export type MessageSource = + | { type: "local" } + | { type: "gateway"; deviceId: string } + | { type: "channel"; channelId: string; accountId: string; conversationId: string }; + export type SessionMeta = { provider?: string; model?: string; @@ -22,6 +28,8 @@ export type SessionEntry = * When omitted, consumers should fall back to message.content. */ displayContent?: UserMessage["content"]; + /** Message source (only for user messages) */ + source?: MessageSource; } | { type: "meta"; meta: SessionMeta; timestamp: number } | { diff --git a/packages/core/src/channels/manager.ts b/packages/core/src/channels/manager.ts index a7d5f248..5cb21f4d 100644 --- a/packages/core/src/channels/manager.ts +++ b/packages/core/src/channels/manager.ts @@ -461,12 +461,25 @@ export class ChannelManager { const route = this.lastRoute ? { ...this.lastRoute } : null; const acks = [...this.ackBuffer]; this.ackBuffer = []; + const source = route ? { + type: "channel" as const, + channelId: route.plugin.id, + accountId: route.deliveryCtx.accountId, + conversationId: route.deliveryCtx.conversationId, + } : undefined; if (route) { this.pendingRoutes.push({ route, acks }); + // Broadcast inbound message to local listeners (Desktop UI) + this.hub.broadcastInbound({ + agentId: agent.sessionId, + content: combinedText, + source: source!, + timestamp: Date.now(), + }); } const replyTo = route?.deliveryCtx.replyToMessageId ?? "?"; console.log(`[Channels] Debouncer flushing ${combinedText.length} chars to agent (queued route replyTo=${replyTo}, acks=${acks.length})`); - agent.write(combinedText); + agent.write(combinedText, { source }); }, ); } diff --git a/packages/core/src/hub/hub.ts b/packages/core/src/hub/hub.ts index f9bc7938..a45382fd 100644 --- a/packages/core/src/hub/hub.ts +++ b/packages/core/src/hub/hub.ts @@ -51,6 +51,22 @@ import { import { enqueueSystemEvent } from "../heartbeat/system-events.js"; import { isHeartbeatAckEvent } from "./heartbeat-filter.js"; +// ============ Message Source Types ============ + +/** Message source: where did this inbound message come from? */ +export type MessageSource = + | { type: "local" } + | { type: "gateway"; deviceId: string } + | { type: "channel"; channelId: string; accountId: string; conversationId: string }; + +/** Inbound message event broadcast to all listeners */ +export interface InboundMessageEvent { + agentId: string; + content: string; + source: MessageSource; + timestamp: number; +} + export class Hub { private readonly agents = new Map(); private readonly agentSenders = new Map(); @@ -59,6 +75,7 @@ export class Hub { private readonly pendingAssistantStarts = new Map(); private readonly suppressedStreamAgents = new Set(); private readonly localApprovalHandlers = new Map void>(); + private readonly inboundListeners = new Set<(event: InboundMessageEvent) => void>(); private readonly rpc: RpcDispatcher; private readonly approvalManager: ExecApprovalManager; private readonly heartbeatListeners = new Set<(event: HeartbeatEventPayload) => void>(); @@ -268,7 +285,14 @@ export class Hub { if (agent && !agent.closed) { this.agentSenders.set(agentId, msg.from); this.channelManager.clearLastRoute(); - agent.write(content); + const source: MessageSource = { type: "gateway", deviceId: msg.from }; + this.broadcastInbound({ + agentId, + content, + source, + timestamp: Date.now(), + }); + agent.write(content, { source }); } else { console.warn(`[Hub] Agent not found or closed: ${agentId}`); } @@ -295,6 +319,21 @@ export class Hub { }; } + /** Subscribe to inbound messages from all sources. Returns unsubscribe function. */ + onInboundMessage(callback: (event: InboundMessageEvent) => void): () => void { + this.inboundListeners.add(callback); + return () => { + this.inboundListeners.delete(callback); + }; + } + + /** Broadcast an inbound message to all listeners */ + broadcastInbound(event: InboundMessageEvent): void { + for (const listener of this.inboundListeners) { + listener(event); + } + } + /** Register a one-time token for device verification (called when QR code is generated) */ registerToken(token: string, agentId: string, expiresAt: number): void { this.deviceStore.registerToken(token, agentId, expiresAt); diff --git a/packages/core/src/hub/index.ts b/packages/core/src/hub/index.ts index fa552708..0b47e705 100644 --- a/packages/core/src/hub/index.ts +++ b/packages/core/src/hub/index.ts @@ -1,3 +1,4 @@ export { Hub } from "./hub.js"; +export type { MessageSource, InboundMessageEvent } from "./hub.js"; export { getHubId } from "./hub-identity.js"; export type { HubOptions } from "./types.js"; diff --git a/packages/hooks/src/index.ts b/packages/hooks/src/index.ts index e9a61966..69756b24 100644 --- a/packages/hooks/src/index.ts +++ b/packages/hooks/src/index.ts @@ -8,9 +8,11 @@ export type { export { useChat } from "./use-chat"; export type { Message, + MessageSource, ToolStatus, ChatError, PendingApproval, + CompactionInfo, UseChatReturn, } from "./use-chat"; diff --git a/packages/hooks/src/use-chat.ts b/packages/hooks/src/use-chat.ts index 208f06b2..32d722fa 100644 --- a/packages/hooks/src/use-chat.ts +++ b/packages/hooks/src/use-chat.ts @@ -22,6 +22,12 @@ export interface CompactionInfo { reason: string; } +/** Message source: where did this message come from? */ +export type MessageSource = + | { type: "local" } + | { type: "gateway"; deviceId: string } + | { type: "channel"; channelId: string; accountId: string; conversationId: string }; + export interface Message { id: string; role: "user" | "assistant" | "toolResult" | "system"; @@ -35,6 +41,8 @@ export interface Message { isError?: boolean; systemType?: "compaction"; compaction?: CompactionInfo; + /** Message source (only for user messages) */ + source?: MessageSource; } export interface ChatError { @@ -95,7 +103,7 @@ export function useChat() { const loaded: Message[] = []; for (const m of raw) { if (m.role === "user") { - loaded.push({ id: uuidv7(), role: "user", content: toContentBlocks(m.content), agentId }); + loaded.push({ id: uuidv7(), role: "user", content: toContentBlocks(m.content), agentId, source: m.source }); } else if (m.role === "assistant") { loaded.push({ id: uuidv7(), role: "assistant", content: toContentBlocks(m.content), agentId, stopReason: m.stopReason }); } else if (m.role === "toolResult") { @@ -133,10 +141,10 @@ export function useChat() { }, [convertMessages]); /** Add a user message */ - const addUserMessage = useCallback((text: string, agentId: string) => { + const addUserMessage = useCallback((text: string, agentId: string, source?: MessageSource) => { setMessages((prev) => [ ...prev, - { id: uuidv7(), role: "user", content: [{ type: "text", text }], agentId }, + { id: uuidv7(), role: "user", content: [{ type: "text", text }], agentId, source }, ]); }, []); diff --git a/packages/sdk/src/actions/index.ts b/packages/sdk/src/actions/index.ts index b3ef94e3..13ad3a55 100644 --- a/packages/sdk/src/actions/index.ts +++ b/packages/sdk/src/actions/index.ts @@ -15,6 +15,7 @@ export { isResponseSuccess, isResponseError, type AgentMessageItem, + type MessageSource, DEFAULT_MESSAGES_LIMIT, type GetAgentMessagesParams, type GetAgentMessagesResult, diff --git a/packages/sdk/src/actions/rpc.ts b/packages/sdk/src/actions/rpc.ts index 5fd587d3..26921944 100644 --- a/packages/sdk/src/actions/rpc.ts +++ b/packages/sdk/src/actions/rpc.ts @@ -70,11 +70,20 @@ export interface GetAgentMessagesParams { limit?: number; } +/** Message source: where did this message come from? */ +export type MessageSource = + | { type: "local" } + | { type: "gateway"; deviceId: string } + | { type: "channel"; channelId: string; accountId: string; conversationId: string }; + /** * Agent message returned by getAgentMessages. - * This is pi-ai's Message type — the backend returns it as-is from SessionManager.loadMessages(). + * Extends pi-ai's Message type with optional source field. */ -export type AgentMessageItem = Message; +export type AgentMessageItem = Message & { + /** Message source (only for user messages) */ + source?: MessageSource; +}; /** getAgentMessages - response payload */ export interface GetAgentMessagesResult { diff --git a/packages/store/src/index.ts b/packages/store/src/index.ts index a048368d..fca0d40b 100644 --- a/packages/store/src/index.ts +++ b/packages/store/src/index.ts @@ -1,3 +1,3 @@ -export type { Message, ToolStatus } from "./types" +export type { Message, MessageSource, ToolStatus } from "./types" export { parseConnectionCode } from "./connection" export type { ConnectionInfo } from "./connection" diff --git a/packages/store/src/types.ts b/packages/store/src/types.ts index d0d48340..858b740e 100644 --- a/packages/store/src/types.ts +++ b/packages/store/src/types.ts @@ -2,6 +2,12 @@ import type { ContentBlock } from "@multica/sdk" export type ToolStatus = "running" | "success" | "error" | "interrupted" +/** Message source: where did this message come from? */ +export type MessageSource = + | { type: "local" } + | { type: "gateway"; deviceId: string } + | { type: "channel"; channelId: string; accountId: string; conversationId: string } + export interface CompactionInfo { removed: number kept: number @@ -23,4 +29,6 @@ export interface Message { isError?: boolean systemType?: "compaction" compaction?: CompactionInfo + /** Message source (only for user messages) */ + source?: MessageSource } diff --git a/packages/ui/src/components/message-list.tsx b/packages/ui/src/components/message-list.tsx index b2ea2fa2..2ff46d5b 100644 --- a/packages/ui/src/components/message-list.tsx +++ b/packages/ui/src/components/message-list.tsx @@ -6,6 +6,7 @@ import { StreamingMarkdown } from "@multica/ui/components/markdown/StreamingMark import { ToolCallItem } from "@multica/ui/components/tool-call-item"; import { ThinkingItem } from "@multica/ui/components/thinking-item"; import { CompactionItem } from "@multica/ui/components/compaction-item"; +import { MessageSourceIcon } from "@multica/ui/components/message-source-icon"; import { cn, getTextContent } from "@multica/ui/lib/utils"; import type { Message } from "@multica/store"; import type { ContentBlock, ToolCall, ThinkingContent } from "@multica/sdk"; @@ -114,10 +115,14 @@ export const MessageList = memo(function MessageList({ messages, streamingIds }: {(text || isStreaming) && (
+ {/* Source icon for non-local user messages */} + {msg.role === "user" && msg.source && msg.source.type !== "local" && ( + + )}
+ + + + + + {tooltip} + + + + ); +}