diff --git a/apps/desktop/.eslintrc.cjs b/apps/desktop/.eslintrc.cjs index 7dbd9d18..527508b1 100644 --- a/apps/desktop/.eslintrc.cjs +++ b/apps/desktop/.eslintrc.cjs @@ -14,5 +14,6 @@ module.exports = { 'warn', { allowConstantExport: true }, ], + '@typescript-eslint/no-unused-vars': 'warn', }, } diff --git a/apps/desktop/README.md b/apps/desktop/README.md index 0570e013..44fc7955 100644 --- a/apps/desktop/README.md +++ b/apps/desktop/README.md @@ -466,34 +466,39 @@ pnpm --filter @multica/desktop add -D @types/qrcode.react --- -## 八、待办事项 +## 八、数据流架构 -### 核心功能 +Chat 页面支持两种模式,底层使用相同的 UI 组件和 Store: -#### Memory 统一 +### Local Mode (IPC 直连) -- [ ] 统一 memory tool (key-value JSON) 和 memory.md 的逻辑 -- [ ] 考虑支持 OpenClaw 风格的 MEMORY.md + 语义搜索 +本地 Agent 对话,不需要 Gateway,直接通过 Electron IPC 通信: -#### Profile 加载优化 +``` +ChatInput → useLocalChat.sendMessage() + → IPC: localChat:send → agent.write() + → agent.subscribe() → IPC: localChat:event + → useLocalChat.onEvent() → useMessagesStore.startStream/appendStream/endStream + → MessageList 显示 +``` -- [ ] 参考 OpenClaw 的截断策略(超过 20k 字符时保留 head 70% + tail 20%) -- [ ] 子 Agent 只加载必要的 bootstrap 文件 +### Remote Mode (Gateway) -#### Agent 自我迭代 +远程 Agent 对话,通过 WebSocket 连接 Gateway: -- [ ] Agent 需要有自己迭代 Profile 的能力(更新 soul.md、user.md 等) -- [ ] 支持 Agent 主动记忆用户偏好 +``` +ChatInput → useMessagesStore.sendMessage() + → ConnectionStore.send() → WebSocket → Gateway → Hub → agent.write() + → Hub.consumeAgent() → WebSocket: stream event + → ConnectionStore.onMessage() → useMessagesStore.startStream/appendStream/endStream + → MessageList 显示 +``` -#### 本地直连模式 +### 复用层级 -- [ ] 增加通过 IPC 的方式与同一 Electron 进程里的 Agent 直接通讯 -- [ ] 本地对话不需要走 Gateway 授权流程 - -### 体验优化 - -#### Settings 页面 - -- [ ] Gateway URL 配置 -- [ ] Theme 切换 (Light / Dark / System) -- [ ] 打开 credentials.json5 按钮 +| 层级 | 组件/模块 | 复用情况 | +| ---------- | ---------------------------------------- | -------- | +| UI 层 | `MessageList`, `ChatInput` | ✅ 完全复用 | +| Store 层 | `useMessagesStore` | ✅ 完全复用 | +| Agent 层 | `AsyncAgent.write()`, `subscribe()` | ✅ 完全复用 | +| 传输层 | IPC vs WebSocket | ❌ 各自实现 | diff --git a/apps/desktop/electron/electron-env.d.ts b/apps/desktop/electron/electron-env.d.ts index 5b9669e4..880abc6a 100644 --- a/apps/desktop/electron/electron-env.d.ts +++ b/apps/desktop/electron/electron-env.d.ts @@ -85,6 +85,22 @@ interface ProfileData { userContent: string | undefined } +interface LocalChatEvent { + agentId: string + streamId?: string + type?: 'error' + content?: string + event?: { + type: 'message_start' | 'message_update' | 'message_end' | 'tool_execution_start' | 'tool_execution_end' + id?: string + message?: { + role: string + content?: Array<{ type: string; text?: string }> + } + [key: string]: unknown + } +} + interface ElectronAPI { hub: { init: () => Promise @@ -129,6 +145,14 @@ interface ElectronAPI { updateStyle: (style: string) => Promise updateUser: (content: string) => Promise } + localChat: { + subscribe: (agentId: string) => Promise<{ ok?: boolean; error?: string; alreadySubscribed?: boolean }> + unsubscribe: (agentId: string) => Promise<{ ok: boolean }> + getHistory: (agentId: string) => Promise<{ messages: Array<{ id: string; role: 'user' | 'assistant'; content: string; agentId: string }> }> + send: (agentId: string, content: string) => Promise<{ ok?: boolean; error?: string }> + onEvent: (callback: (event: LocalChatEvent) => void) => void + offEvent: () => void + } } // Used in Renderer process, expose in `preload.ts` diff --git a/apps/desktop/electron/ipc/hub.ts b/apps/desktop/electron/ipc/hub.ts index 24208586..75d1f1b6 100644 --- a/apps/desktop/electron/ipc/hub.ts +++ b/apps/desktop/electron/ipc/hub.ts @@ -4,14 +4,31 @@ * Creates and manages a Hub instance that connects to the Gateway. * This follows the same pattern as the Console app. */ -import { ipcMain } from 'electron' +import { ipcMain, type BrowserWindow } from 'electron' import { Hub } from '../../../../src/hub/hub.js' import type { ConnectionState } from '@multica/sdk' import type { AsyncAgent } from '../../../../src/agent/async-agent.js' +/** + * Extract plain text from AgentMessage content (string or content block array). + */ +function extractTextContent(content: unknown): string { + if (typeof content === 'string') return content + if (!Array.isArray(content)) return '' + return content + .filter((c: { type?: string }) => c.type === 'text') + .map((c: { text?: string }) => c.text ?? '') + .join('') +} + // Singleton Hub instance let hub: Hub | null = null let defaultAgentId: string | null = null +let mainWindowRef: BrowserWindow | null = null + +// Track which agents have active IPC subscriptions (for local direct chat) +// Value is the unsubscribe function returned by agent.subscribe() +const ipcAgentSubscriptions = new Map void>() /** * Safe log function that catches EPIPE errors. @@ -219,7 +236,8 @@ export function registerHubIpcHandlers(): void { }) /** - * Send a message to an agent. + * Send a message to an agent (for remote clients via Gateway). + * Note: For local direct chat, use 'localChat:send' instead. */ ipcMain.handle('hub:sendMessage', async (_event, agentId: string, content: string) => { const h = getHub() @@ -234,6 +252,135 @@ export function registerHubIpcHandlers(): void { return { ok: true } }) + /** + * Subscribe to local agent events (for direct IPC chat without Gateway). + * Uses agent.subscribe() which supports multiple subscribers. + */ + ipcMain.handle('localChat:subscribe', async (_event, agentId: string) => { + const h = getHub() + const agent = h.getAgent(agentId) + if (!agent) { + return { error: `Agent not found: ${agentId}` } + } + if (agent.closed) { + return { error: `Agent is closed: ${agentId}` } + } + + // Already subscribed? + if (ipcAgentSubscriptions.has(agentId)) { + return { ok: true, alreadySubscribed: true } + } + + // Track current stream ID for message grouping + let currentStreamId: string | null = null + + // Subscribe to agent events using the multi-subscriber mechanism + const unsubscribe = agent.subscribe((event) => { + if (!mainWindowRef || mainWindowRef.isDestroyed()) { + return + } + + // Filter events same as Hub.consumeAgent() + const maybeMessage = (event as { message?: { role?: string } }).message + const isAssistantMessage = maybeMessage?.role === 'assistant' + const shouldForward = + ((event.type === 'message_start' || event.type === 'message_update' || event.type === 'message_end') && isAssistantMessage) + || event.type === 'tool_execution_start' + || event.type === 'tool_execution_end' + + if (!shouldForward) return + + // Track stream ID for message grouping + if (event.type === 'message_start') { + currentStreamId = (event as { id?: string }).id ?? `stream-${Date.now()}` + safeLog(`[IPC] Starting stream: ${currentStreamId}`) + } + + safeLog(`[IPC] Sending event to renderer: ${event.type}, streamId: ${currentStreamId}`) + mainWindowRef.webContents.send('localChat:event', { + agentId, + streamId: currentStreamId, + event, + }) + + if (event.type === 'message_end') { + safeLog(`[IPC] Ending stream: ${currentStreamId}`) + currentStreamId = null + } + }) + + ipcAgentSubscriptions.set(agentId, unsubscribe) + safeLog(`[IPC] Local chat subscribed to agent: ${agentId}`) + + return { ok: true } + }) + + /** + * Unsubscribe from local agent events. + */ + ipcMain.handle('localChat:unsubscribe', async (_event, agentId: string) => { + const unsubscribe = ipcAgentSubscriptions.get(agentId) + if (unsubscribe) { + unsubscribe() + } + ipcAgentSubscriptions.delete(agentId) + safeLog(`[IPC] Local chat unsubscribed from agent: ${agentId}`) + return { ok: true } + }) + + /** + * Get message history for local chat. + * Returns messages in the same format as useMessagesStore. + */ + ipcMain.handle('localChat:getHistory', async (_event, agentId: string) => { + const h = getHub() + const agent = h.getAgent(agentId) + if (!agent) { + return { messages: [] } + } + + try { + const sessionMessages = agent.getMessages() + const messages = sessionMessages + .filter((m) => m.role === 'user' || m.role === 'assistant') + .map((m, i) => ({ + id: `history-${i}-${Date.now()}`, + role: m.role as 'user' | 'assistant', + content: extractTextContent((m as { content?: unknown }).content), + agentId, + })) + .filter((m) => m.content.length > 0) + + return { messages } + } catch { + return { messages: [] } + } + }) + + /** + * Send a message via local direct IPC (no Gateway). + * Events will be pushed to renderer via 'localChat:event' channel. + */ + ipcMain.handle('localChat:send', async (_event, agentId: string, content: string) => { + const h = getHub() + const agent = h.getAgent(agentId) + if (!agent) { + return { error: `Agent not found: ${agentId}` } + } + if (agent.closed) { + return { error: `Agent is closed: ${agentId}` } + } + + // Must be subscribed first to receive events + if (!ipcAgentSubscriptions.has(agentId)) { + return { error: 'Not subscribed to agent events. Call subscribe first.' } + } + + agent.write(content) + safeLog(`[IPC] Local chat message sent to agent: ${agentId}`) + return { ok: true } + }) + /** * Register a one-time token for device verification. * Called by the QR code component when a token is generated or refreshed. @@ -264,9 +411,12 @@ export function registerHubIpcHandlers(): void { /** * Set up device confirmation flow between Hub (main process) and renderer. + * Also stores window reference for local chat IPC events. * Must be called after both Hub initialization and window creation. */ export function setupDeviceConfirmation(mainWindow: Electron.BrowserWindow): void { + // Store reference for local chat IPC + mainWindowRef = mainWindow const h = getHub() const pendingConfirms = new Map void>() @@ -300,6 +450,12 @@ export function setupDeviceConfirmation(mainWindow: Electron.BrowserWindow): voi * Cleanup Hub resources. */ export function cleanupHub(): void { + // Unsubscribe all IPC listeners + for (const unsubscribe of ipcAgentSubscriptions.values()) { + unsubscribe() + } + ipcAgentSubscriptions.clear() + if (hub) { safeLog('[Desktop] Shutting down Hub') hub.shutdown() diff --git a/apps/desktop/electron/preload.ts b/apps/desktop/electron/preload.ts index d93ac3f1..f554ef4b 100644 --- a/apps/desktop/electron/preload.ts +++ b/apps/desktop/electron/preload.ts @@ -44,6 +44,23 @@ export interface ProfileData { userContent: string | undefined } +// Local chat event types (for direct IPC communication without Gateway) +export interface LocalChatEvent { + agentId: string + streamId?: string + type?: 'error' + content?: string + event?: { + type: 'message_start' | 'message_update' | 'message_end' | 'tool_execution_start' | 'tool_execution_end' + id?: string + message?: { + role: string + content?: Array<{ type: string; text?: string }> + } + [key: string]: unknown + } +} + // Available style options export const AGENT_STYLES = ['concise', 'warm', 'playful', 'professional'] as const export type AgentStyle = (typeof AGENT_STYLES)[number] @@ -116,6 +133,27 @@ const electronAPI = { updateStyle: (style: string) => ipcRenderer.invoke('profile:updateStyle', style), updateUser: (content: string) => ipcRenderer.invoke('profile:updateUser', content), }, + + // Local chat (direct IPC, no Gateway required) + localChat: { + /** Subscribe to agent events for local direct chat */ + subscribe: (agentId: string) => ipcRenderer.invoke('localChat:subscribe', agentId), + /** Unsubscribe from agent events */ + unsubscribe: (agentId: string) => ipcRenderer.invoke('localChat:unsubscribe', agentId), + /** Get message history for local chat */ + getHistory: (agentId: string): Promise<{ messages: Array<{ id: string; role: 'user' | 'assistant'; content: string; agentId: string }> }> => + ipcRenderer.invoke('localChat:getHistory', agentId), + /** Send message to agent via direct IPC (no Gateway) */ + send: (agentId: string, content: string) => ipcRenderer.invoke('localChat:send', agentId, content), + /** Listen for agent events */ + onEvent: (callback: (event: LocalChatEvent) => void) => { + ipcRenderer.on('localChat:event', (_event, data: LocalChatEvent) => callback(data)) + }, + /** Remove event listener */ + offEvent: () => { + ipcRenderer.removeAllListeners('localChat:event') + }, + }, } // Expose to renderer diff --git a/apps/desktop/src/hooks/use-local-chat.ts b/apps/desktop/src/hooks/use-local-chat.ts new file mode 100644 index 00000000..caacf681 --- /dev/null +++ b/apps/desktop/src/hooks/use-local-chat.ts @@ -0,0 +1,144 @@ +/** + * Hook for local direct chat with agent via IPC (no Gateway required). + * + * This hook bridges IPC events to useMessagesStore, allowing the Chat component + * to work identically in both local IPC and remote Gateway modes. + */ +import { useState, useEffect, useCallback, useRef } from 'react' +import { useMessagesStore } from '@multica/store' + +interface UseLocalChatOptions { + agentId: string +} + +interface UseLocalChatReturn { + isConnected: boolean + isLoading: boolean + sendMessage: (content: string) => void + disconnect: () => void +} + +/** + * Provides local IPC chat that uses the same useMessagesStore as Gateway mode. + * This enables full Chat component reuse. + */ +export function useLocalChat({ agentId }: UseLocalChatOptions): UseLocalChatReturn { + const [isConnected, setIsConnected] = useState(false) + const [isLoading, setIsLoading] = useState(false) + const currentStreamRef = useRef(null) + + // Subscribe to agent events on mount + useEffect(() => { + if (!agentId) return + + const subscribe = async () => { + const result = await window.electronAPI.localChat.subscribe(agentId) + if (result.ok) { + setIsConnected(true) + } + } + + subscribe() + + // Load message history from agent session + const loadHistory = async () => { + try { + const result = await window.electronAPI.localChat.getHistory(agentId) + if (result.messages && result.messages.length > 0) { + useMessagesStore.getState().loadMessages(result.messages) + } + } catch { + // History load is best-effort + } + } + loadHistory() + + // Listen for events and route to useMessagesStore + window.electronAPI.localChat.onEvent((event) => { + if (event.agentId !== agentId) return + + const store = useMessagesStore.getState() + + // Handle error + if (event.type === 'error') { + store.addAssistantMessage(event.content ?? 'Unknown error', agentId) + setIsLoading(false) + return + } + + // Handle agent events - same logic as connection-store.ts + const agentEvent = event.event + const streamId = event.streamId + if (!agentEvent || !streamId) return + + if (agentEvent.type === 'message_start') { + currentStreamRef.current = streamId + store.startStream(streamId, agentId) + // Extract initial text if any + const text = extractTextFromAgentEvent(agentEvent) + if (text) store.appendStream(streamId, text) + } else if (agentEvent.type === 'message_update') { + const text = extractTextFromAgentEvent(agentEvent) + if (text && currentStreamRef.current) { + store.appendStream(currentStreamRef.current, text) + } + } else if (agentEvent.type === 'message_end') { + const text = extractTextFromAgentEvent(agentEvent) + if (currentStreamRef.current) { + store.endStream(currentStreamRef.current, text) + currentStreamRef.current = null + } + setIsLoading(false) + } + }) + + return () => { + window.electronAPI.localChat.offEvent() + window.electronAPI.localChat.unsubscribe(agentId) + setIsConnected(false) + } + }, [agentId]) + + const sendMessage = useCallback( + async (content: string) => { + if (!content.trim() || !agentId || isLoading) return + + // Add user message to store (same as Gateway mode) + useMessagesStore.getState().addUserMessage(content.trim(), agentId) + setIsLoading(true) + + // Send via IPC + const result = await window.electronAPI.localChat.send(agentId, content.trim()) + if (result.error) { + useMessagesStore.getState().addAssistantMessage(`Error: ${result.error}`, agentId) + setIsLoading(false) + } + }, + [agentId, isLoading] + ) + + const disconnect = useCallback(() => { + useMessagesStore.getState().clearMessages() + setIsConnected(false) + setIsLoading(false) + }, []) + + return { + isConnected, + isLoading, + sendMessage, + disconnect, + } +} + +/** + * Extract text content from AgentEvent message. + * Same logic as @multica/sdk extractTextFromEvent. + */ +function extractTextFromAgentEvent(event: { message?: { content?: Array<{ type: string; text?: string }> } }): string { + if (!event.message?.content) return '' + return event.message.content + .filter((c): c is { type: 'text'; text: string } => c.type === 'text' && !!c.text) + .map((c) => c.text) + .join('') +} diff --git a/apps/desktop/src/pages/chat.tsx b/apps/desktop/src/pages/chat.tsx index 52d406ed..8bf461be 100644 --- a/apps/desktop/src/pages/chat.tsx +++ b/apps/desktop/src/pages/chat.tsx @@ -1,5 +1,239 @@ -import { Chat } from '@multica/ui/components/chat' +/** + * Chat Page - supports both Local (IPC) and Remote (Gateway) modes + * + * Both modes use the same useMessagesStore and Chat UI components. + * The difference is only in the transport layer: + * - Local: Direct IPC to agent in the same Electron process + * - Remote: WebSocket via Gateway to external Hub + */ +import { useState, useEffect, useCallback, useRef } from 'react' +import { Button } from '@multica/ui/components/ui/button' +import { ChatInput } from '@multica/ui/components/chat-input' +import { MessageList } from '@multica/ui/components/message-list' +import { ConnectPrompt } from '@multica/ui/components/connect-prompt' +import { useMessagesStore, useConnectionStore, useAutoConnect } from '@multica/store' +import { useScrollFade } from '@multica/ui/hooks/use-scroll-fade' +import { useAutoScroll } from '@multica/ui/hooks/use-auto-scroll' +import { useLocalChat } from '../hooks/use-local-chat' + +type ChatMode = 'select' | 'local' | 'remote' export default function ChatPage() { - return + const [mode, setMode] = useState('select') + const [defaultAgentId, setDefaultAgentId] = useState(null) + + // Get default agent ID on mount + useEffect(() => { + const loadAgentId = async () => { + const status = await window.electronAPI.hub.getStatus() + if (status.defaultAgent?.agentId) { + setDefaultAgentId(status.defaultAgent.agentId) + } + } + loadAgentId() + }, []) + + // Clear messages when switching modes + const handleModeChange = (newMode: ChatMode) => { + useMessagesStore.getState().clearMessages() + setMode(newMode) + } + + // Mode selection screen + if (mode === 'select') { + return ( +
+
+

Start a Conversation

+

+ Choose how you want to connect +

+
+ +
+ + + +
+ + {!defaultAgentId && ( +

+ Waiting for local agent to initialize... +

+ )} +
+ ) + } + + // Local chat mode - uses useLocalChat hook that bridges to useMessagesStore + if (mode === 'local' && defaultAgentId) { + return handleModeChange('select')} /> + } + + // Remote chat mode - uses Gateway connection + return handleModeChange('select')} /> +} + +/** + * Local Chat View - Direct IPC communication with agent + * Uses useLocalChat hook which bridges IPC events to useMessagesStore + */ +function LocalChatView({ agentId, onBack }: { agentId: string; onBack: () => void }) { + const { isConnected, isLoading, sendMessage, disconnect } = useLocalChat({ agentId }) + + // Use same stores as Gateway mode + const messages = useMessagesStore((s) => s.messages) + const streamingIds = useMessagesStore((s) => s.streamingIds) + + const mainRef = useRef(null) + const fadeStyle = useScrollFade(mainRef) + useAutoScroll(mainRef) + + const handleDisconnect = useCallback(() => { + disconnect() + onBack() + }, [disconnect, onBack]) + + return ( +
+ {/* Header */} +
+
+ + Local Agent + +
+ +
+ + {/* Messages - same component as Gateway mode */} +
+ {messages.length === 0 ? ( +
+ Send a message to start the conversation +
+ ) : ( + + )} +
+ + {/* Input - same component as Gateway mode */} +
+ +
+
+ ) +} + +/** + * Remote Chat View - Gateway connection to external Hub + * Same as the original Chat component + */ +function RemoteChatView({ onBack }: { onBack: () => void }) { + const { loading } = useAutoConnect() + + const agentId = useConnectionStore((s) => s.agentId) + const gwState = useConnectionStore((s) => s.connectionState) + const hubId = useConnectionStore((s) => s.hubId) + + const messages = useMessagesStore((s) => s.messages) + const streamingIds = useMessagesStore((s) => s.streamingIds) + + const isConnected = gwState === 'registered' && !!hubId && !!agentId + + const handleSend = useCallback((text: string) => { + const { hubId, agentId, send, connectionState } = useConnectionStore.getState() + if (connectionState !== 'registered' || !hubId || !agentId) return + useMessagesStore.getState().sendMessage(text, { hubId, agentId, send }) + }, []) + + const handleDisconnect = useCallback(() => { + useConnectionStore.getState().disconnect() + onBack() + }, [onBack]) + + const mainRef = useRef(null) + const fadeStyle = useScrollFade(mainRef) + useAutoScroll(mainRef) + + return ( +
+ {/* Header */} +
+
+ + Remote Agent +
+ {isConnected && ( + + )} +
+ + {/* Messages */} +
+ {loading ? ( +
+ Loading... +
+ ) : !isConnected ? ( + + ) : messages.length === 0 ? ( +
+ Send a message to start the conversation +
+ ) : ( + + )} +
+ + {/* Input */} +
+ +
+
+ ) } diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts index 9c928232..68475555 100644 --- a/src/agent/async-agent.ts +++ b/src/agent/async-agent.ts @@ -1,5 +1,5 @@ import { v7 as uuidv7 } from "uuid"; -import type { AgentEvent } from "@mariozechner/pi-agent-core"; +import type { AgentEvent, AgentMessage } from "@mariozechner/pi-agent-core"; import { Agent } from "./runner.js"; import { Channel } from "./channel.js"; import type { AgentOptions, Message } from "./types.js"; @@ -58,6 +58,22 @@ export class AsyncAgent { return this.channel; } + /** + * Subscribe to agent events directly (supports multiple subscribers). + * Unlike read(), this allows multiple consumers to receive the same events. + */ + subscribe(callback: (event: AgentEvent) => void): () => void { + console.log(`[AsyncAgent] Adding subscriber for agent: ${this.sessionId}`); + const unsubscribe = this.agent.subscribe((event) => { + console.log(`[AsyncAgent] Event received: ${event.type}`); + callback(event); + }); + return () => { + console.log(`[AsyncAgent] Removing subscriber for agent: ${this.sessionId}`); + unsubscribe(); + }; + } + /** Returns a promise that resolves when the current message queue is drained */ waitForIdle(): Promise { return this.queue; @@ -198,4 +214,11 @@ export class AsyncAgent { reloadSystemPrompt(): void { this.agent.reloadSystemPrompt(); } + + /** + * Get all messages from the current session. + */ + getMessages(): AgentMessage[] { + return this.agent.getMessages(); + } } diff --git a/src/agent/runner.ts b/src/agent/runner.ts index 39923470..f66ef159 100644 --- a/src/agent/runner.ts +++ b/src/agent/runner.ts @@ -482,6 +482,11 @@ export class Agent { return this.agent.state.tools?.map(t => t.name) ?? []; } + /** Get all messages from the current session */ + getMessages(): AgentMessage[] { + return this.agent.state.messages.slice(); + } + /** * Get all skills with their eligibility status. * Returns empty array if skills are disabled.