Merge pull request #146 from multica-ai/feat/message-source-tracking
feat(core): add message source tracking and persistence
This commit is contained in:
commit
71433fc4ed
20 changed files with 543 additions and 26 deletions
14
apps/desktop/src/main/electron-env.d.ts
vendored
14
apps/desktop/src/main/electron-env.d.ts
vendored
|
|
@ -149,6 +149,18 @@ interface ChannelAccountStateInfo {
|
|||
error?: string
|
||||
}
|
||||
|
||||
type MessageSource =
|
||||
| { type: 'local' }
|
||||
| { type: 'gateway'; deviceId: string }
|
||||
| { type: 'channel'; channelId: string; accountId: string; conversationId: string }
|
||||
|
||||
interface InboundMessageEvent {
|
||||
agentId: string
|
||||
content: string
|
||||
source: MessageSource
|
||||
timestamp: number
|
||||
}
|
||||
|
||||
interface ElectronAPI {
|
||||
app: {
|
||||
getFlags: () => Promise<{ forceOnboarding: boolean }>
|
||||
|
|
@ -178,6 +190,8 @@ interface ElectronAPI {
|
|||
offConnectionStateChanged: () => void
|
||||
onDevicesChanged: (callback: () => void) => void
|
||||
offDevicesChanged: () => void
|
||||
onInboundMessage: (callback: (event: InboundMessageEvent) => void) => void
|
||||
offInboundMessage: () => void
|
||||
}
|
||||
tools: {
|
||||
list: () => Promise<ToolInfo[]>
|
||||
|
|
|
|||
|
|
@ -391,7 +391,15 @@ export function registerHubIpcHandlers(): void {
|
|||
}
|
||||
|
||||
h.channelManager.clearLastRoute()
|
||||
agent.write(content)
|
||||
const source = { type: 'local' as const }
|
||||
// Broadcast as local source (for consistency, though UI already knows)
|
||||
h.broadcastInbound({
|
||||
agentId,
|
||||
content,
|
||||
source,
|
||||
timestamp: Date.now(),
|
||||
})
|
||||
agent.write(content, { source })
|
||||
safeLog(`[IPC] Local chat message sent to agent: ${agentId}`)
|
||||
return { ok: true }
|
||||
})
|
||||
|
|
@ -498,6 +506,13 @@ export function setupDeviceConfirmation(mainWindow: Electron.BrowserWindow): voi
|
|||
mainWindow.webContents.send('hub:connection-state-changed', state)
|
||||
}
|
||||
})
|
||||
|
||||
// Forward inbound messages from all sources (gateway, channel) to renderer
|
||||
h.onInboundMessage((event) => {
|
||||
if (!mainWindow.isDestroyed()) {
|
||||
mainWindow.webContents.send('hub:inbound-message', event)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -81,6 +81,19 @@ export interface LocalChatEvent {
|
|||
}
|
||||
}
|
||||
|
||||
// Inbound message event (from any source: local, gateway, channel)
|
||||
export type MessageSource =
|
||||
| { type: 'local' }
|
||||
| { type: 'gateway'; deviceId: string }
|
||||
| { type: 'channel'; channelId: string; accountId: string; conversationId: string }
|
||||
|
||||
export interface InboundMessageEvent {
|
||||
agentId: string
|
||||
content: string
|
||||
source: MessageSource
|
||||
timestamp: number
|
||||
}
|
||||
|
||||
// Local chat approval request (mirrors ExecApprovalRequestPayload from @multica/sdk)
|
||||
export interface LocalChatApproval {
|
||||
approvalId: string
|
||||
|
|
@ -149,6 +162,12 @@ const electronAPI = {
|
|||
offDevicesChanged: () => {
|
||||
ipcRenderer.removeAllListeners('hub:devices-changed')
|
||||
},
|
||||
onInboundMessage: (callback: (event: InboundMessageEvent) => void) => {
|
||||
ipcRenderer.on('hub:inbound-message', (_event, data: InboundMessageEvent) => callback(data))
|
||||
},
|
||||
offInboundMessage: () => {
|
||||
ipcRenderer.removeAllListeners('hub:inbound-message')
|
||||
},
|
||||
},
|
||||
|
||||
// Tools management
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
import { useState, useEffect, useCallback, useRef } from 'react'
|
||||
import { useChat } from '@multica/hooks/use-chat'
|
||||
import { useChat, type MessageSource } from '@multica/hooks/use-chat'
|
||||
import type {
|
||||
StreamPayload,
|
||||
ExecApprovalRequestPayload,
|
||||
|
|
@ -75,6 +75,16 @@ export function useLocalChat() {
|
|||
chatRef.current.addApproval(approval as ExecApprovalRequestPayload)
|
||||
})
|
||||
|
||||
// Listen for inbound messages from all sources (gateway, channel)
|
||||
// This allows the local UI to display messages from other sources
|
||||
window.electronAPI.hub.onInboundMessage((event: InboundMessageEvent) => {
|
||||
// Only add non-local messages (local messages are added by sendMessage)
|
||||
if (event.source.type !== 'local' && event.agentId === agentId) {
|
||||
chatRef.current.addUserMessage(event.content, event.agentId, event.source as MessageSource)
|
||||
setIsLoading(true)
|
||||
}
|
||||
})
|
||||
|
||||
// Fetch history with pagination
|
||||
window.electronAPI.localChat.getHistory(agentId, { limit: DEFAULT_MESSAGES_LIMIT })
|
||||
.then((result) => {
|
||||
|
|
@ -93,6 +103,7 @@ export function useLocalChat() {
|
|||
return () => {
|
||||
window.electronAPI.localChat.offEvent()
|
||||
window.electronAPI.localChat.offApproval()
|
||||
window.electronAPI.hub.offInboundMessage()
|
||||
window.electronAPI.localChat.unsubscribe(agentId).catch(() => {})
|
||||
}
|
||||
}, [agentId])
|
||||
|
|
@ -101,7 +112,7 @@ export function useLocalChat() {
|
|||
(text: string) => {
|
||||
const trimmed = text.trim()
|
||||
if (!trimmed || !agentId) return
|
||||
chatRef.current.addUserMessage(trimmed, agentId)
|
||||
chatRef.current.addUserMessage(trimmed, agentId, { type: 'local' })
|
||||
chatRef.current.setError(null)
|
||||
window.electronAPI.localChat.send(agentId, trimmed).catch(() => {})
|
||||
setIsLoading(true)
|
||||
|
|
|
|||
292
docs/plans/2026-02-12-dashboard-design.md
Normal file
292
docs/plans/2026-02-12-dashboard-design.md
Normal file
|
|
@ -0,0 +1,292 @@
|
|||
# Dashboard Design Plan
|
||||
|
||||
## Overview
|
||||
|
||||
Build a runtime dashboard that shows "What is my Agent doing now, what will it do next, what did it do before" instead of "How is my Agent configured".
|
||||
|
||||
## Goals
|
||||
|
||||
1. Real-time visibility into Agent execution state
|
||||
2. Monitor all input sources (Desktop IPC, Gateway, Channels)
|
||||
3. Track sub-agents, processes, scheduled tasks
|
||||
4. Provide control capabilities (cancel, retry)
|
||||
|
||||
---
|
||||
|
||||
## Phase 1: Core Infrastructure
|
||||
|
||||
### 1.1 Unified Event Stream
|
||||
|
||||
Create a dashboard subscription mechanism in Hub that exposes all observable data.
|
||||
|
||||
**File:** `packages/core/src/hub/dashboard.ts`
|
||||
|
||||
```typescript
|
||||
interface DashboardEvent {
|
||||
timestamp: number;
|
||||
agentId: string;
|
||||
|
||||
// Source tracking
|
||||
source: {
|
||||
type: "local" | "gateway" | "channel";
|
||||
deviceId?: string; // Gateway source
|
||||
channel?: string; // telegram | discord | slack
|
||||
accountId?: string;
|
||||
conversationId?: string;
|
||||
};
|
||||
|
||||
// Original event
|
||||
event: AgentEvent | MulticaEvent;
|
||||
}
|
||||
|
||||
interface DashboardSubscription {
|
||||
subscribe(callback: (event: DashboardEvent) => void): () => void;
|
||||
|
||||
// Query methods
|
||||
getSnapshot(): DashboardSnapshot;
|
||||
}
|
||||
```
|
||||
|
||||
### 1.2 Dashboard Snapshot
|
||||
|
||||
Queryable state for initial load and polling fallback.
|
||||
|
||||
```typescript
|
||||
interface DashboardSnapshot {
|
||||
// Agent state
|
||||
agents: Array<{
|
||||
id: string;
|
||||
isRunning: boolean;
|
||||
isStreaming: boolean;
|
||||
pendingWrites: number;
|
||||
lastError?: string;
|
||||
}>;
|
||||
|
||||
// Sub-agents
|
||||
subagents: SubagentRunRecord[];
|
||||
|
||||
// Processes
|
||||
processes: ProcessEntry[];
|
||||
|
||||
// Heartbeat
|
||||
lastHeartbeat?: HeartbeatEventPayload;
|
||||
|
||||
// Cron jobs
|
||||
cronJobs: CronJobStatus[];
|
||||
|
||||
// Connections
|
||||
gateway: {
|
||||
connectionState: ConnectionState;
|
||||
connectedDevices: number;
|
||||
};
|
||||
channels: ChannelAccountState[];
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Phase 2: IPC Layer (Desktop)
|
||||
|
||||
### 2.1 New IPC Handlers
|
||||
|
||||
**File:** `apps/desktop/src/main/ipc/dashboard.ts`
|
||||
|
||||
```typescript
|
||||
// Subscribe to dashboard events
|
||||
ipcMain.handle('dashboard:subscribe', (agentId?: string) => {
|
||||
// Returns subscription ID
|
||||
});
|
||||
|
||||
// Get current snapshot
|
||||
ipcMain.handle('dashboard:snapshot', () => {
|
||||
return hub.getDashboardSnapshot();
|
||||
});
|
||||
|
||||
// Unsubscribe
|
||||
ipcMain.handle('dashboard:unsubscribe', (subscriptionId) => {});
|
||||
|
||||
// Event push to renderer
|
||||
mainWindow.webContents.send('dashboard:event', event);
|
||||
```
|
||||
|
||||
### 2.2 Preload API
|
||||
|
||||
**File:** `apps/desktop/src/preload/index.ts`
|
||||
|
||||
```typescript
|
||||
dashboard: {
|
||||
subscribe: () => ipcRenderer.invoke('dashboard:subscribe'),
|
||||
unsubscribe: () => ipcRenderer.invoke('dashboard:unsubscribe'),
|
||||
getSnapshot: () => ipcRenderer.invoke('dashboard:snapshot'),
|
||||
onEvent: (callback) => ipcRenderer.on('dashboard:event', callback),
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Phase 3: Frontend Store
|
||||
|
||||
### 3.1 Dashboard Store
|
||||
|
||||
**File:** `apps/desktop/src/renderer/src/stores/dashboard.ts`
|
||||
|
||||
```typescript
|
||||
interface DashboardState {
|
||||
// Real-time
|
||||
events: DashboardEvent[]; // Rolling buffer (last 100)
|
||||
currentRun: {
|
||||
agentId: string;
|
||||
streamId: string;
|
||||
messages: StreamingMessage[];
|
||||
tools: ToolExecution[];
|
||||
} | null;
|
||||
|
||||
// Snapshot data
|
||||
subagents: SubagentRunRecord[];
|
||||
processes: ProcessEntry[];
|
||||
heartbeat: HeartbeatEventPayload | null;
|
||||
cronJobs: CronJobStatus[];
|
||||
|
||||
// Connection status
|
||||
gateway: GatewayStatus;
|
||||
channels: ChannelAccountState[];
|
||||
|
||||
// Actions
|
||||
subscribe: () => void;
|
||||
unsubscribe: () => void;
|
||||
refresh: () => void;
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Phase 4: UI Components
|
||||
|
||||
### 4.1 Dashboard Page
|
||||
|
||||
**File:** `apps/desktop/src/renderer/src/pages/dashboard.tsx`
|
||||
|
||||
Layout:
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────┐
|
||||
│ Dashboard [Refresh] │
|
||||
├─────────────────────────────────────────────────────────┤
|
||||
│ │
|
||||
│ ┌─────────────────┐ ┌─────────────────────────────┐ │
|
||||
│ │ Agent Status │ │ Current Activity │ │
|
||||
│ │ ● Running │ │ 💬 Generating response... │ │
|
||||
│ │ Pending: 2 │ │ 🔧 exec: npm test │ │
|
||||
│ └─────────────────┘ └─────────────────────────────┘ │
|
||||
│ │
|
||||
│ ┌─────────────────────────────────────────────────┐ │
|
||||
│ │ Sub-agents │ │
|
||||
│ │ ┌─────────────────────────────────────────────┐ │ │
|
||||
│ │ │ 🟢 search-docs task: "Find API docs" │ │ │
|
||||
│ │ │ 🔵 analyze-code task: "Review PR #123" │ │ │
|
||||
│ │ │ 🟡 pending task: "Write tests" │ │ │
|
||||
│ │ └─────────────────────────────────────────────┘ │ │
|
||||
│ └─────────────────────────────────────────────────┘ │
|
||||
│ │
|
||||
│ ┌─────────────────────────────────────────────────┐ │
|
||||
│ │ Running Processes │ │
|
||||
│ │ ┌─────────────────────────────────────────────┐ │ │
|
||||
│ │ │ npm test PID: 12345 ⏱️ 2m 30s │ │ │
|
||||
│ │ │ > Running 45/100 tests... │ │ │
|
||||
│ │ └─────────────────────────────────────────────┘ │ │
|
||||
│ └─────────────────────────────────────────────────┘ │
|
||||
│ │
|
||||
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
|
||||
│ │ Heartbeat │ │ Gateway │ │ Channels │ │
|
||||
│ │ ✅ 30s ago │ │ 🟢 2 devices │ │ 📱 Telegram │ │
|
||||
│ └──────────────┘ └──────────────┘ └──────────────┘ │
|
||||
│ │
|
||||
└─────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
### 4.2 Components
|
||||
|
||||
- `AgentStatusCard` - Running/Idle/Error state
|
||||
- `CurrentActivityFeed` - Real-time event stream
|
||||
- `SubagentList` - Sub-agent status with progress
|
||||
- `ProcessList` - Running bash processes with output preview
|
||||
- `HeartbeatIndicator` - Health status
|
||||
- `ConnectionStatus` - Gateway + Channels
|
||||
|
||||
---
|
||||
|
||||
## Phase 5: Control Actions
|
||||
|
||||
### 5.1 Process Control
|
||||
|
||||
```typescript
|
||||
// Stop a running process
|
||||
ipcMain.handle('dashboard:stopProcess', (processId) => {
|
||||
return processRegistry.stop(processId);
|
||||
});
|
||||
```
|
||||
|
||||
### 5.2 Agent Control (Future)
|
||||
|
||||
- Cancel current run (requires AbortController wiring)
|
||||
- Cancel sub-agent
|
||||
- Retry failed operation
|
||||
|
||||
---
|
||||
|
||||
## Implementation Order
|
||||
|
||||
### Step 1: Core Infrastructure
|
||||
- [ ] Create `packages/core/src/hub/dashboard.ts`
|
||||
- [ ] Add `getDashboardSnapshot()` to Hub
|
||||
- [ ] Add source tracking to event stream
|
||||
|
||||
### Step 2: IPC Layer
|
||||
- [ ] Create `apps/desktop/src/main/ipc/dashboard.ts`
|
||||
- [ ] Add preload API
|
||||
- [ ] Wire up event forwarding
|
||||
|
||||
### Step 3: Store
|
||||
- [ ] Create `useDashboardStore`
|
||||
- [ ] Implement subscription lifecycle
|
||||
|
||||
### Step 4: UI
|
||||
- [ ] Create dashboard page
|
||||
- [ ] Build individual components
|
||||
- [ ] Add to navigation
|
||||
|
||||
### Step 5: Polish
|
||||
- [ ] Error handling
|
||||
- [ ] Loading states
|
||||
- [ ] Empty states
|
||||
|
||||
---
|
||||
|
||||
## Data Sources Summary
|
||||
|
||||
| Data | Source | Real-time | Query |
|
||||
|------|--------|-----------|-------|
|
||||
| Agent events | `agent.subscribe()` | ✅ Push | - |
|
||||
| Sub-agents | `listSubagentRuns()` | 🔄 Poll | ✅ |
|
||||
| Processes | `PROCESS_REGISTRY` | 🔄 Poll | ✅ |
|
||||
| Heartbeat | `onHeartbeatEvent()` | ✅ Push | ✅ |
|
||||
| Gateway | `onConnectionStateChange()` | ✅ Push | ✅ |
|
||||
| Channels | `listAccountStates()` | 🔄 Poll | ✅ |
|
||||
|
||||
---
|
||||
|
||||
## Open Questions
|
||||
|
||||
1. **Event buffer size** - How many events to keep in memory?
|
||||
2. **Polling interval** - For non-push data, how often to refresh?
|
||||
3. **Sub-agent drill-down** - Can we subscribe to child agent events?
|
||||
4. **Process output streaming** - Stream tail buffer in real-time?
|
||||
|
||||
---
|
||||
|
||||
## References
|
||||
|
||||
- Agent event types: `packages/core/src/agent/events.ts`
|
||||
- Sub-agent registry: `packages/core/src/agent/subagent/registry.ts`
|
||||
- Process registry: `packages/core/src/agent/tools/process-registry.ts`
|
||||
- Heartbeat: `packages/core/src/heartbeat/`
|
||||
- Channel manager: `packages/core/src/channels/manager.ts`
|
||||
|
|
@ -40,6 +40,8 @@ export interface WriteInternalOptions {
|
|||
export interface WriteOptions {
|
||||
/** Disable automatic message timestamp injection */
|
||||
injectTimestamp?: boolean | undefined;
|
||||
/** Message source (where did this message come from?) */
|
||||
source?: import("./session/types.js").MessageSource | undefined;
|
||||
}
|
||||
|
||||
export class AsyncAgent {
|
||||
|
|
@ -89,7 +91,7 @@ export class AsyncAgent {
|
|||
return;
|
||||
}
|
||||
console.log(`[AsyncAgent:${this.sessionId.slice(0, 8)}] run() starting for message: ${content.slice(0, 80)}`);
|
||||
const result = await this.agent.run(message, { displayPrompt: content });
|
||||
const result = await this.agent.run(message, { displayPrompt: content, source: options?.source });
|
||||
console.log(`[AsyncAgent:${this.sessionId.slice(0, 8)}] run() completed, error=${result.error ?? "none"}`);
|
||||
// Flush pending session writes so waitForIdle() callers
|
||||
// can safely read session data from disk.
|
||||
|
|
|
|||
|
|
@ -108,6 +108,7 @@ export class Agent {
|
|||
private _runMutex: Promise<void> = Promise.resolve();
|
||||
private _compactionPromise: Promise<void> = Promise.resolve();
|
||||
private currentUserDisplayPrompt: string | undefined;
|
||||
private currentUserSource: import("./session/types.js").MessageSource | undefined;
|
||||
|
||||
// MulticaEvent subscribers (parallel to PiAgentCore's subscriber list)
|
||||
// Typed as AgentEvent | MulticaEvent to match subscribeAll() callback signature
|
||||
|
|
@ -408,7 +409,7 @@ export class Agent {
|
|||
|
||||
async run(
|
||||
prompt: string,
|
||||
options?: { displayPrompt?: string },
|
||||
options?: { displayPrompt?: string; source?: import("./session/types.js").MessageSource },
|
||||
): Promise<AgentRunResult> {
|
||||
// Run-level mutex: prevents concurrent run/runInternal from mis-tagging messages
|
||||
return this.withRunMutex(() => this._run(prompt, options));
|
||||
|
|
@ -453,7 +454,7 @@ export class Agent {
|
|||
|
||||
private async _run(
|
||||
prompt: string,
|
||||
options?: { displayPrompt?: string },
|
||||
options?: { displayPrompt?: string; source?: import("./session/types.js").MessageSource },
|
||||
): Promise<AgentRunResult> {
|
||||
// Wait for any in-flight compaction from the previous run
|
||||
await this._compactionPromise;
|
||||
|
|
@ -461,6 +462,7 @@ export class Agent {
|
|||
this.refreshAuthState();
|
||||
this.output.state.lastAssistantText = "";
|
||||
this.currentUserDisplayPrompt = options?.displayPrompt;
|
||||
this.currentUserSource = options?.source;
|
||||
this._isRunning = true;
|
||||
this._aborted = false;
|
||||
|
||||
|
|
@ -576,6 +578,7 @@ export class Agent {
|
|||
this._isRunning = false;
|
||||
this._aborted = false;
|
||||
this.currentUserDisplayPrompt = undefined;
|
||||
this.currentUserSource = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -667,13 +670,16 @@ export class Agent {
|
|||
private handleSessionEvent(event: AgentEvent) {
|
||||
if (event.type === "message_end") {
|
||||
const message = event.message as AgentMessage;
|
||||
const saveOptions: { internal?: boolean; displayContent?: UserMessage["content"] } = {};
|
||||
const saveOptions: { internal?: boolean; displayContent?: UserMessage["content"]; source?: import("./session/types.js").MessageSource } = {};
|
||||
if (this._internalRun) {
|
||||
saveOptions.internal = true;
|
||||
}
|
||||
if (message.role === "user" && this.currentUserDisplayPrompt !== undefined) {
|
||||
saveOptions.displayContent = this.currentUserDisplayPrompt;
|
||||
}
|
||||
if (message.role === "user" && this.currentUserSource !== undefined) {
|
||||
saveOptions.source = this.currentUserSource;
|
||||
}
|
||||
this.session.saveMessage(message, Object.keys(saveOptions).length > 0 ? saveOptions : undefined);
|
||||
// Skip compaction during internal runs — internal messages will be
|
||||
// rolled back from memory afterwards, so compacting now would be incorrect.
|
||||
|
|
|
|||
|
|
@ -171,14 +171,14 @@ export class SessionManager {
|
|||
return this.loadMessagesFromEntries(options, false);
|
||||
}
|
||||
|
||||
loadMessagesForDisplay(options?: { includeInternal?: boolean }): AgentMessage[] {
|
||||
loadMessagesForDisplay(options?: { includeInternal?: boolean }): (AgentMessage & { source?: import("./types.js").MessageSource })[] {
|
||||
return this.loadMessagesFromEntries(options, true);
|
||||
}
|
||||
|
||||
private loadMessagesFromEntries(
|
||||
options: { includeInternal?: boolean } | undefined,
|
||||
preferDisplayContent: boolean,
|
||||
): AgentMessage[] {
|
||||
): (AgentMessage & { source?: import("./types.js").MessageSource })[] {
|
||||
const entries = this.loadEntries();
|
||||
let messages = entries
|
||||
.filter((entry) => {
|
||||
|
|
@ -188,17 +188,19 @@ export class SessionManager {
|
|||
})
|
||||
.map((entry) => {
|
||||
const messageEntry = entry as Extract<SessionEntry, { type: "message" }>;
|
||||
if (
|
||||
preferDisplayContent
|
||||
const base = preferDisplayContent
|
||||
&& messageEntry.message.role === "user"
|
||||
&& messageEntry.displayContent !== undefined
|
||||
) {
|
||||
return { ...messageEntry.message, content: messageEntry.displayContent };
|
||||
? { ...messageEntry.message, content: messageEntry.displayContent }
|
||||
: messageEntry.message;
|
||||
// Include source for user messages
|
||||
if (messageEntry.source && messageEntry.message.role === "user") {
|
||||
return { ...base, source: messageEntry.source };
|
||||
}
|
||||
return messageEntry.message;
|
||||
return base;
|
||||
});
|
||||
messages = sanitizeToolCallInputs(messages);
|
||||
messages = sanitizeToolUseResultPairing(messages);
|
||||
messages = sanitizeToolCallInputs(messages) as typeof messages;
|
||||
messages = sanitizeToolUseResultPairing(messages) as typeof messages;
|
||||
return messages;
|
||||
}
|
||||
|
||||
|
|
@ -230,7 +232,7 @@ export class SessionManager {
|
|||
|
||||
saveMessage(
|
||||
message: AgentMessage,
|
||||
options?: { internal?: boolean; displayContent?: UserMessage["content"] },
|
||||
options?: { internal?: boolean; displayContent?: UserMessage["content"]; source?: import("./types.js").MessageSource },
|
||||
) {
|
||||
void this.enqueue(() =>
|
||||
appendEntry(
|
||||
|
|
@ -243,6 +245,7 @@ export class SessionManager {
|
|||
...(options?.displayContent !== undefined
|
||||
? { displayContent: options.displayContent }
|
||||
: {}),
|
||||
...(options?.source !== undefined ? { source: options.source } : {}),
|
||||
},
|
||||
{ baseDir: this.baseDir },
|
||||
),
|
||||
|
|
|
|||
|
|
@ -1,6 +1,12 @@
|
|||
import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
||||
import type { UserMessage } from "@mariozechner/pi-ai";
|
||||
|
||||
/** Message source: where did this message come from? */
|
||||
export type MessageSource =
|
||||
| { type: "local" }
|
||||
| { type: "gateway"; deviceId: string }
|
||||
| { type: "channel"; channelId: string; accountId: string; conversationId: string };
|
||||
|
||||
export type SessionMeta = {
|
||||
provider?: string;
|
||||
model?: string;
|
||||
|
|
@ -22,6 +28,8 @@ export type SessionEntry =
|
|||
* When omitted, consumers should fall back to message.content.
|
||||
*/
|
||||
displayContent?: UserMessage["content"];
|
||||
/** Message source (only for user messages) */
|
||||
source?: MessageSource;
|
||||
}
|
||||
| { type: "meta"; meta: SessionMeta; timestamp: number }
|
||||
| {
|
||||
|
|
|
|||
|
|
@ -461,12 +461,25 @@ export class ChannelManager {
|
|||
const route = this.lastRoute ? { ...this.lastRoute } : null;
|
||||
const acks = [...this.ackBuffer];
|
||||
this.ackBuffer = [];
|
||||
const source = route ? {
|
||||
type: "channel" as const,
|
||||
channelId: route.plugin.id,
|
||||
accountId: route.deliveryCtx.accountId,
|
||||
conversationId: route.deliveryCtx.conversationId,
|
||||
} : undefined;
|
||||
if (route) {
|
||||
this.pendingRoutes.push({ route, acks });
|
||||
// Broadcast inbound message to local listeners (Desktop UI)
|
||||
this.hub.broadcastInbound({
|
||||
agentId: agent.sessionId,
|
||||
content: combinedText,
|
||||
source: source!,
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
}
|
||||
const replyTo = route?.deliveryCtx.replyToMessageId ?? "?";
|
||||
console.log(`[Channels] Debouncer flushing ${combinedText.length} chars to agent (queued route replyTo=${replyTo}, acks=${acks.length})`);
|
||||
agent.write(combinedText);
|
||||
agent.write(combinedText, { source });
|
||||
},
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -51,6 +51,22 @@ import {
|
|||
import { enqueueSystemEvent } from "../heartbeat/system-events.js";
|
||||
import { isHeartbeatAckEvent } from "./heartbeat-filter.js";
|
||||
|
||||
// ============ Message Source Types ============
|
||||
|
||||
/** Message source: where did this inbound message come from? */
|
||||
export type MessageSource =
|
||||
| { type: "local" }
|
||||
| { type: "gateway"; deviceId: string }
|
||||
| { type: "channel"; channelId: string; accountId: string; conversationId: string };
|
||||
|
||||
/** Inbound message event broadcast to all listeners */
|
||||
export interface InboundMessageEvent {
|
||||
agentId: string;
|
||||
content: string;
|
||||
source: MessageSource;
|
||||
timestamp: number;
|
||||
}
|
||||
|
||||
export class Hub {
|
||||
private readonly agents = new Map<string, AsyncAgent>();
|
||||
private readonly agentSenders = new Map<string, string>();
|
||||
|
|
@ -59,6 +75,7 @@ export class Hub {
|
|||
private readonly pendingAssistantStarts = new Map<string, { agentId: string; event: unknown }>();
|
||||
private readonly suppressedStreamAgents = new Set<string>();
|
||||
private readonly localApprovalHandlers = new Map<string, (payload: ExecApprovalRequest) => void>();
|
||||
private readonly inboundListeners = new Set<(event: InboundMessageEvent) => void>();
|
||||
private readonly rpc: RpcDispatcher;
|
||||
private readonly approvalManager: ExecApprovalManager;
|
||||
private readonly heartbeatListeners = new Set<(event: HeartbeatEventPayload) => void>();
|
||||
|
|
@ -268,7 +285,14 @@ export class Hub {
|
|||
if (agent && !agent.closed) {
|
||||
this.agentSenders.set(agentId, msg.from);
|
||||
this.channelManager.clearLastRoute();
|
||||
agent.write(content);
|
||||
const source: MessageSource = { type: "gateway", deviceId: msg.from };
|
||||
this.broadcastInbound({
|
||||
agentId,
|
||||
content,
|
||||
source,
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
agent.write(content, { source });
|
||||
} else {
|
||||
console.warn(`[Hub] Agent not found or closed: ${agentId}`);
|
||||
}
|
||||
|
|
@ -295,6 +319,21 @@ export class Hub {
|
|||
};
|
||||
}
|
||||
|
||||
/** Subscribe to inbound messages from all sources. Returns unsubscribe function. */
|
||||
onInboundMessage(callback: (event: InboundMessageEvent) => void): () => void {
|
||||
this.inboundListeners.add(callback);
|
||||
return () => {
|
||||
this.inboundListeners.delete(callback);
|
||||
};
|
||||
}
|
||||
|
||||
/** Broadcast an inbound message to all listeners */
|
||||
broadcastInbound(event: InboundMessageEvent): void {
|
||||
for (const listener of this.inboundListeners) {
|
||||
listener(event);
|
||||
}
|
||||
}
|
||||
|
||||
/** Register a one-time token for device verification (called when QR code is generated) */
|
||||
registerToken(token: string, agentId: string, expiresAt: number): void {
|
||||
this.deviceStore.registerToken(token, agentId, expiresAt);
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
export { Hub } from "./hub.js";
|
||||
export type { MessageSource, InboundMessageEvent } from "./hub.js";
|
||||
export { getHubId } from "./hub-identity.js";
|
||||
export type { HubOptions } from "./types.js";
|
||||
|
|
|
|||
|
|
@ -8,9 +8,11 @@ export type {
|
|||
export { useChat } from "./use-chat";
|
||||
export type {
|
||||
Message,
|
||||
MessageSource,
|
||||
ToolStatus,
|
||||
ChatError,
|
||||
PendingApproval,
|
||||
CompactionInfo,
|
||||
UseChatReturn,
|
||||
} from "./use-chat";
|
||||
|
||||
|
|
|
|||
|
|
@ -22,6 +22,12 @@ export interface CompactionInfo {
|
|||
reason: string;
|
||||
}
|
||||
|
||||
/** Message source: where did this message come from? */
|
||||
export type MessageSource =
|
||||
| { type: "local" }
|
||||
| { type: "gateway"; deviceId: string }
|
||||
| { type: "channel"; channelId: string; accountId: string; conversationId: string };
|
||||
|
||||
export interface Message {
|
||||
id: string;
|
||||
role: "user" | "assistant" | "toolResult" | "system";
|
||||
|
|
@ -35,6 +41,8 @@ export interface Message {
|
|||
isError?: boolean;
|
||||
systemType?: "compaction";
|
||||
compaction?: CompactionInfo;
|
||||
/** Message source (only for user messages) */
|
||||
source?: MessageSource;
|
||||
}
|
||||
|
||||
export interface ChatError {
|
||||
|
|
@ -95,7 +103,7 @@ export function useChat() {
|
|||
const loaded: Message[] = [];
|
||||
for (const m of raw) {
|
||||
if (m.role === "user") {
|
||||
loaded.push({ id: uuidv7(), role: "user", content: toContentBlocks(m.content), agentId });
|
||||
loaded.push({ id: uuidv7(), role: "user", content: toContentBlocks(m.content), agentId, source: m.source });
|
||||
} else if (m.role === "assistant") {
|
||||
loaded.push({ id: uuidv7(), role: "assistant", content: toContentBlocks(m.content), agentId, stopReason: m.stopReason });
|
||||
} else if (m.role === "toolResult") {
|
||||
|
|
@ -133,10 +141,10 @@ export function useChat() {
|
|||
}, [convertMessages]);
|
||||
|
||||
/** Add a user message */
|
||||
const addUserMessage = useCallback((text: string, agentId: string) => {
|
||||
const addUserMessage = useCallback((text: string, agentId: string, source?: MessageSource) => {
|
||||
setMessages((prev) => [
|
||||
...prev,
|
||||
{ id: uuidv7(), role: "user", content: [{ type: "text", text }], agentId },
|
||||
{ id: uuidv7(), role: "user", content: [{ type: "text", text }], agentId, source },
|
||||
]);
|
||||
}, []);
|
||||
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ export {
|
|||
isResponseSuccess,
|
||||
isResponseError,
|
||||
type AgentMessageItem,
|
||||
type MessageSource,
|
||||
DEFAULT_MESSAGES_LIMIT,
|
||||
type GetAgentMessagesParams,
|
||||
type GetAgentMessagesResult,
|
||||
|
|
|
|||
|
|
@ -70,11 +70,20 @@ export interface GetAgentMessagesParams {
|
|||
limit?: number;
|
||||
}
|
||||
|
||||
/** Message source: where did this message come from? */
|
||||
export type MessageSource =
|
||||
| { type: "local" }
|
||||
| { type: "gateway"; deviceId: string }
|
||||
| { type: "channel"; channelId: string; accountId: string; conversationId: string };
|
||||
|
||||
/**
|
||||
* Agent message returned by getAgentMessages.
|
||||
* This is pi-ai's Message type — the backend returns it as-is from SessionManager.loadMessages().
|
||||
* Extends pi-ai's Message type with optional source field.
|
||||
*/
|
||||
export type AgentMessageItem = Message;
|
||||
export type AgentMessageItem = Message & {
|
||||
/** Message source (only for user messages) */
|
||||
source?: MessageSource;
|
||||
};
|
||||
|
||||
/** getAgentMessages - response payload */
|
||||
export interface GetAgentMessagesResult {
|
||||
|
|
|
|||
|
|
@ -1,3 +1,3 @@
|
|||
export type { Message, ToolStatus } from "./types"
|
||||
export type { Message, MessageSource, ToolStatus } from "./types"
|
||||
export { parseConnectionCode } from "./connection"
|
||||
export type { ConnectionInfo } from "./connection"
|
||||
|
|
|
|||
|
|
@ -2,6 +2,12 @@ import type { ContentBlock } from "@multica/sdk"
|
|||
|
||||
export type ToolStatus = "running" | "success" | "error" | "interrupted"
|
||||
|
||||
/** Message source: where did this message come from? */
|
||||
export type MessageSource =
|
||||
| { type: "local" }
|
||||
| { type: "gateway"; deviceId: string }
|
||||
| { type: "channel"; channelId: string; accountId: string; conversationId: string }
|
||||
|
||||
export interface CompactionInfo {
|
||||
removed: number
|
||||
kept: number
|
||||
|
|
@ -23,4 +29,6 @@ export interface Message {
|
|||
isError?: boolean
|
||||
systemType?: "compaction"
|
||||
compaction?: CompactionInfo
|
||||
/** Message source (only for user messages) */
|
||||
source?: MessageSource
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import { StreamingMarkdown } from "@multica/ui/components/markdown/StreamingMark
|
|||
import { ToolCallItem } from "@multica/ui/components/tool-call-item";
|
||||
import { ThinkingItem } from "@multica/ui/components/thinking-item";
|
||||
import { CompactionItem } from "@multica/ui/components/compaction-item";
|
||||
import { MessageSourceIcon } from "@multica/ui/components/message-source-icon";
|
||||
import { cn, getTextContent } from "@multica/ui/lib/utils";
|
||||
import type { Message } from "@multica/store";
|
||||
import type { ContentBlock, ToolCall, ThinkingContent } from "@multica/sdk";
|
||||
|
|
@ -114,10 +115,14 @@ export const MessageList = memo(function MessageList({ messages, streamingIds }:
|
|||
{(text || isStreaming) && (
|
||||
<div
|
||||
className={cn(
|
||||
"flex",
|
||||
"flex items-center gap-1.5",
|
||||
msg.role === "user" ? "justify-end" : "justify-start"
|
||||
)}
|
||||
>
|
||||
{/* Source icon for non-local user messages */}
|
||||
{msg.role === "user" && msg.source && msg.source.type !== "local" && (
|
||||
<MessageSourceIcon source={msg.source} />
|
||||
)}
|
||||
<div
|
||||
className={cn(
|
||||
msg.role === "user" ? "bg-muted rounded-md max-w-[60%] py-1 px-2.5 my-2" : "w-full py-1 px-2.5 my-1"
|
||||
|
|
|
|||
61
packages/ui/src/components/message-source-icon.tsx
Normal file
61
packages/ui/src/components/message-source-icon.tsx
Normal file
|
|
@ -0,0 +1,61 @@
|
|||
"use client";
|
||||
|
||||
import { Monitor, Smartphone, Send } from "lucide-react";
|
||||
import {
|
||||
Tooltip,
|
||||
TooltipContent,
|
||||
TooltipProvider,
|
||||
TooltipTrigger,
|
||||
} from "@multica/ui/components/ui/tooltip";
|
||||
import type { MessageSource } from "@multica/store";
|
||||
|
||||
interface MessageSourceIconProps {
|
||||
source: MessageSource;
|
||||
className?: string;
|
||||
}
|
||||
|
||||
/** Get icon component for message source */
|
||||
function getSourceIcon(source: MessageSource) {
|
||||
switch (source.type) {
|
||||
case "local":
|
||||
return Monitor;
|
||||
case "gateway":
|
||||
return Smartphone;
|
||||
case "channel":
|
||||
return Send;
|
||||
default:
|
||||
return Monitor;
|
||||
}
|
||||
}
|
||||
|
||||
/** Get tooltip text for message source */
|
||||
function getSourceTooltip(source: MessageSource): string {
|
||||
switch (source.type) {
|
||||
case "local":
|
||||
return "Local";
|
||||
case "gateway":
|
||||
return `Remote: ${source.deviceId}`;
|
||||
case "channel":
|
||||
return `${source.channelId}: ${source.conversationId}`;
|
||||
default:
|
||||
return "Unknown";
|
||||
}
|
||||
}
|
||||
|
||||
export function MessageSourceIcon({ source, className }: MessageSourceIconProps) {
|
||||
const Icon = getSourceIcon(source);
|
||||
const tooltip = getSourceTooltip(source);
|
||||
|
||||
return (
|
||||
<TooltipProvider>
|
||||
<Tooltip>
|
||||
<TooltipTrigger className="inline-flex">
|
||||
<Icon className={className ?? "w-3 h-3 text-muted-foreground"} />
|
||||
</TooltipTrigger>
|
||||
<TooltipContent side="top" className="text-xs">
|
||||
{tooltip}
|
||||
</TooltipContent>
|
||||
</Tooltip>
|
||||
</TooltipProvider>
|
||||
);
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue