From 4a986b1d9a97fe046fa28f6782ef7ac1e4f59a70 Mon Sep 17 00:00:00 2001 From: Jiang Bohan Date: Wed, 4 Feb 2026 16:46:33 +0800 Subject: [PATCH] feat(desktop): add local chat IPC handlers Add IPC handlers for direct local agent communication: - localChat:subscribe - Subscribe to agent events - localChat:unsubscribe - Unsubscribe from events - localChat:send - Send message to local agent - localChat:getHistory - Get message history This enables chat without Gateway connection. Co-Authored-By: Claude Opus 4.5 --- apps/desktop/electron/electron-env.d.ts | 24 ++++ apps/desktop/electron/ipc/hub.ts | 161 +++++++++++++++++++++++- apps/desktop/electron/preload.ts | 41 ++++++ 3 files changed, 224 insertions(+), 2 deletions(-) diff --git a/apps/desktop/electron/electron-env.d.ts b/apps/desktop/electron/electron-env.d.ts index 5b9669e4..880abc6a 100644 --- a/apps/desktop/electron/electron-env.d.ts +++ b/apps/desktop/electron/electron-env.d.ts @@ -85,6 +85,22 @@ interface ProfileData { userContent: string | undefined } +interface LocalChatEvent { + agentId: string + streamId?: string + type?: 'error' + content?: string + event?: { + type: 'message_start' | 'message_update' | 'message_end' | 'tool_execution_start' | 'tool_execution_end' + id?: string + message?: { + role: string + content?: Array<{ type: string; text?: string }> + } + [key: string]: unknown + } +} + interface ElectronAPI { hub: { init: () => Promise @@ -129,6 +145,14 @@ interface ElectronAPI { updateStyle: (style: string) => Promise updateUser: (content: string) => Promise } + localChat: { + subscribe: (agentId: string) => Promise<{ ok?: boolean; error?: string; alreadySubscribed?: boolean }> + unsubscribe: (agentId: string) => Promise<{ ok: boolean }> + getHistory: (agentId: string) => Promise<{ messages: Array<{ id: string; role: 'user' | 'assistant'; content: string; agentId: string }> }> + send: (agentId: string, content: string) => Promise<{ ok?: boolean; error?: string }> + onEvent: (callback: (event: LocalChatEvent) => void) => void + offEvent: () => void + } } // Used in Renderer process, expose in `preload.ts` diff --git a/apps/desktop/electron/ipc/hub.ts b/apps/desktop/electron/ipc/hub.ts index 24208586..685f1d01 100644 --- a/apps/desktop/electron/ipc/hub.ts +++ b/apps/desktop/electron/ipc/hub.ts @@ -4,14 +4,32 @@ * Creates and manages a Hub instance that connects to the Gateway. * This follows the same pattern as the Console app. */ -import { ipcMain } from 'electron' +import { ipcMain, type BrowserWindow } from 'electron' import { Hub } from '../../../../src/hub/hub.js' import type { ConnectionState } from '@multica/sdk' import type { AsyncAgent } from '../../../../src/agent/async-agent.js' +import type { AgentEvent } from '@mariozechner/pi-agent-core' + +/** + * Extract plain text from AgentMessage content (string or content block array). + */ +function extractTextContent(content: unknown): string { + if (typeof content === 'string') return content + if (!Array.isArray(content)) return '' + return content + .filter((c: { type?: string }) => c.type === 'text') + .map((c: { text?: string }) => c.text ?? '') + .join('') +} // Singleton Hub instance let hub: Hub | null = null let defaultAgentId: string | null = null +let mainWindowRef: BrowserWindow | null = null + +// Track which agents have active IPC subscriptions (for local direct chat) +// Value is the unsubscribe function returned by agent.subscribe() +const ipcAgentSubscriptions = new Map void>() /** * Safe log function that catches EPIPE errors. @@ -219,7 +237,8 @@ export function registerHubIpcHandlers(): void { }) /** - * Send a message to an agent. + * Send a message to an agent (for remote clients via Gateway). + * Note: For local direct chat, use 'localChat:send' instead. */ ipcMain.handle('hub:sendMessage', async (_event, agentId: string, content: string) => { const h = getHub() @@ -234,6 +253,135 @@ export function registerHubIpcHandlers(): void { return { ok: true } }) + /** + * Subscribe to local agent events (for direct IPC chat without Gateway). + * Uses agent.subscribe() which supports multiple subscribers. + */ + ipcMain.handle('localChat:subscribe', async (_event, agentId: string) => { + const h = getHub() + const agent = h.getAgent(agentId) + if (!agent) { + return { error: `Agent not found: ${agentId}` } + } + if (agent.closed) { + return { error: `Agent is closed: ${agentId}` } + } + + // Already subscribed? + if (ipcAgentSubscriptions.has(agentId)) { + return { ok: true, alreadySubscribed: true } + } + + // Track current stream ID for message grouping + let currentStreamId: string | null = null + + // Subscribe to agent events using the multi-subscriber mechanism + const unsubscribe = agent.subscribe((event) => { + if (!mainWindowRef || mainWindowRef.isDestroyed()) { + return + } + + // Filter events same as Hub.consumeAgent() + const maybeMessage = (event as { message?: { role?: string } }).message + const isAssistantMessage = maybeMessage?.role === 'assistant' + const shouldForward = + ((event.type === 'message_start' || event.type === 'message_update' || event.type === 'message_end') && isAssistantMessage) + || event.type === 'tool_execution_start' + || event.type === 'tool_execution_end' + + if (!shouldForward) return + + // Track stream ID for message grouping + if (event.type === 'message_start') { + currentStreamId = (event as { id?: string }).id ?? `stream-${Date.now()}` + safeLog(`[IPC] Starting stream: ${currentStreamId}`) + } + + safeLog(`[IPC] Sending event to renderer: ${event.type}, streamId: ${currentStreamId}`) + mainWindowRef.webContents.send('localChat:event', { + agentId, + streamId: currentStreamId, + event, + }) + + if (event.type === 'message_end') { + safeLog(`[IPC] Ending stream: ${currentStreamId}`) + currentStreamId = null + } + }) + + ipcAgentSubscriptions.set(agentId, unsubscribe) + safeLog(`[IPC] Local chat subscribed to agent: ${agentId}`) + + return { ok: true } + }) + + /** + * Unsubscribe from local agent events. + */ + ipcMain.handle('localChat:unsubscribe', async (_event, agentId: string) => { + const unsubscribe = ipcAgentSubscriptions.get(agentId) + if (unsubscribe) { + unsubscribe() + } + ipcAgentSubscriptions.delete(agentId) + safeLog(`[IPC] Local chat unsubscribed from agent: ${agentId}`) + return { ok: true } + }) + + /** + * Get message history for local chat. + * Returns messages in the same format as useMessagesStore. + */ + ipcMain.handle('localChat:getHistory', async (_event, agentId: string) => { + const h = getHub() + const agent = h.getAgent(agentId) + if (!agent) { + return { messages: [] } + } + + try { + const sessionMessages = await agent.getMessages() + const messages = sessionMessages + .filter((m) => m.role === 'user' || m.role === 'assistant') + .map((m, i) => ({ + id: `history-${i}-${Date.now()}`, + role: m.role as 'user' | 'assistant', + content: extractTextContent(m.content), + agentId, + })) + .filter((m) => m.content.length > 0) + + return { messages } + } catch { + return { messages: [] } + } + }) + + /** + * Send a message via local direct IPC (no Gateway). + * Events will be pushed to renderer via 'localChat:event' channel. + */ + ipcMain.handle('localChat:send', async (_event, agentId: string, content: string) => { + const h = getHub() + const agent = h.getAgent(agentId) + if (!agent) { + return { error: `Agent not found: ${agentId}` } + } + if (agent.closed) { + return { error: `Agent is closed: ${agentId}` } + } + + // Must be subscribed first to receive events + if (!ipcAgentSubscriptions.has(agentId)) { + return { error: 'Not subscribed to agent events. Call subscribe first.' } + } + + agent.write(content) + safeLog(`[IPC] Local chat message sent to agent: ${agentId}`) + return { ok: true } + }) + /** * Register a one-time token for device verification. * Called by the QR code component when a token is generated or refreshed. @@ -264,9 +412,12 @@ export function registerHubIpcHandlers(): void { /** * Set up device confirmation flow between Hub (main process) and renderer. + * Also stores window reference for local chat IPC events. * Must be called after both Hub initialization and window creation. */ export function setupDeviceConfirmation(mainWindow: Electron.BrowserWindow): void { + // Store reference for local chat IPC + mainWindowRef = mainWindow const h = getHub() const pendingConfirms = new Map void>() @@ -300,6 +451,12 @@ export function setupDeviceConfirmation(mainWindow: Electron.BrowserWindow): voi * Cleanup Hub resources. */ export function cleanupHub(): void { + // Unsubscribe all IPC listeners + for (const unsubscribe of ipcAgentSubscriptions.values()) { + unsubscribe() + } + ipcAgentSubscriptions.clear() + if (hub) { safeLog('[Desktop] Shutting down Hub') hub.shutdown() diff --git a/apps/desktop/electron/preload.ts b/apps/desktop/electron/preload.ts index d93ac3f1..21edbe7c 100644 --- a/apps/desktop/electron/preload.ts +++ b/apps/desktop/electron/preload.ts @@ -44,6 +44,23 @@ export interface ProfileData { userContent: string | undefined } +// Local chat event types (for direct IPC communication without Gateway) +export interface LocalChatEvent { + agentId: string + streamId?: string + type?: 'error' + content?: string + event?: { + type: 'message_start' | 'message_update' | 'message_end' | 'tool_execution_start' | 'tool_execution_end' + id?: string + message?: { + role: string + content?: Array<{ type: string; text?: string }> + } + [key: string]: unknown + } +} + // Available style options export const AGENT_STYLES = ['concise', 'warm', 'playful', 'professional'] as const export type AgentStyle = (typeof AGENT_STYLES)[number] @@ -116,11 +133,35 @@ const electronAPI = { updateStyle: (style: string) => ipcRenderer.invoke('profile:updateStyle', style), updateUser: (content: string) => ipcRenderer.invoke('profile:updateUser', content), }, + + // Local chat (direct IPC, no Gateway required) + localChat: { + /** Subscribe to agent events for local direct chat */ + subscribe: (agentId: string) => ipcRenderer.invoke('localChat:subscribe', agentId), + /** Unsubscribe from agent events */ + unsubscribe: (agentId: string) => ipcRenderer.invoke('localChat:unsubscribe', agentId), + /** Get message history for local chat */ + getHistory: (agentId: string): Promise<{ messages: Array<{ id: string; role: 'user' | 'assistant'; content: string; agentId: string }> }> => + ipcRenderer.invoke('localChat:getHistory', agentId), + /** Send message to agent via direct IPC (no Gateway) */ + send: (agentId: string, content: string) => ipcRenderer.invoke('localChat:send', agentId, content), + /** Listen for agent events */ + onEvent: (callback: (event: LocalChatEvent) => void) => { + ipcRenderer.on('localChat:event', (_event, data: LocalChatEvent) => callback(data)) + }, + /** Remove event listener */ + offEvent: () => { + ipcRenderer.removeAllListeners('localChat:event') + }, + }, } // Expose to renderer contextBridge.exposeInMainWorld('electronAPI', electronAPI) +// Re-export LocalChatEvent type for use in renderer +export type { LocalChatEvent } + // Also expose ipcRenderer for backward compatibility contextBridge.exposeInMainWorld('ipcRenderer', { on(...args: Parameters) {