feat(agent): add subscribe method for multiple event consumers

Add AsyncAgent.subscribe() that allows multiple subscribers to receive
the same agent events, enabling local IPC chat to coexist with other
event consumers.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Jiang Bohan 2026-02-04 16:46:28 +08:00
parent c70dd338c2
commit 4086fa6985
2 changed files with 160 additions and 0 deletions

View file

@ -0,0 +1,144 @@
/**
* Hook for local direct chat with agent via IPC (no Gateway required).
*
* This hook bridges IPC events to useMessagesStore, allowing the Chat component
* to work identically in both local IPC and remote Gateway modes.
*/
import { useState, useEffect, useCallback, useRef } from 'react'
import { useMessagesStore } from '@multica/store'
interface UseLocalChatOptions {
agentId: string
}
interface UseLocalChatReturn {
isConnected: boolean
isLoading: boolean
sendMessage: (content: string) => void
disconnect: () => void
}
/**
* Provides local IPC chat that uses the same useMessagesStore as Gateway mode.
* This enables full Chat component reuse.
*/
export function useLocalChat({ agentId }: UseLocalChatOptions): UseLocalChatReturn {
const [isConnected, setIsConnected] = useState(false)
const [isLoading, setIsLoading] = useState(false)
const currentStreamRef = useRef<string | null>(null)
// Subscribe to agent events on mount
useEffect(() => {
if (!agentId) return
const subscribe = async () => {
const result = await window.electronAPI.localChat.subscribe(agentId)
if (result.ok) {
setIsConnected(true)
}
}
subscribe()
// Load message history from agent session
const loadHistory = async () => {
try {
const result = await window.electronAPI.localChat.getHistory(agentId)
if (result.messages && result.messages.length > 0) {
useMessagesStore.getState().loadMessages(result.messages)
}
} catch {
// History load is best-effort
}
}
loadHistory()
// Listen for events and route to useMessagesStore
window.electronAPI.localChat.onEvent((event) => {
if (event.agentId !== agentId) return
const store = useMessagesStore.getState()
// Handle error
if (event.type === 'error') {
store.addAssistantMessage(event.content ?? 'Unknown error', agentId)
setIsLoading(false)
return
}
// Handle agent events - same logic as connection-store.ts
const agentEvent = event.event
const streamId = event.streamId
if (!agentEvent || !streamId) return
if (agentEvent.type === 'message_start') {
currentStreamRef.current = streamId
store.startStream(streamId, agentId)
// Extract initial text if any
const text = extractTextFromAgentEvent(agentEvent)
if (text) store.appendStream(streamId, text)
} else if (agentEvent.type === 'message_update') {
const text = extractTextFromAgentEvent(agentEvent)
if (text && currentStreamRef.current) {
store.appendStream(currentStreamRef.current, text)
}
} else if (agentEvent.type === 'message_end') {
const text = extractTextFromAgentEvent(agentEvent)
if (currentStreamRef.current) {
store.endStream(currentStreamRef.current, text)
currentStreamRef.current = null
}
setIsLoading(false)
}
})
return () => {
window.electronAPI.localChat.offEvent()
window.electronAPI.localChat.unsubscribe(agentId)
setIsConnected(false)
}
}, [agentId])
const sendMessage = useCallback(
async (content: string) => {
if (!content.trim() || !agentId || isLoading) return
// Add user message to store (same as Gateway mode)
useMessagesStore.getState().addUserMessage(content.trim(), agentId)
setIsLoading(true)
// Send via IPC
const result = await window.electronAPI.localChat.send(agentId, content.trim())
if (result.error) {
useMessagesStore.getState().addAssistantMessage(`Error: ${result.error}`, agentId)
setIsLoading(false)
}
},
[agentId, isLoading]
)
const disconnect = useCallback(() => {
useMessagesStore.getState().clearMessages()
setIsConnected(false)
setIsLoading(false)
}, [])
return {
isConnected,
isLoading,
sendMessage,
disconnect,
}
}
/**
* Extract text content from AgentEvent message.
* Same logic as @multica/sdk extractTextFromEvent.
*/
function extractTextFromAgentEvent(event: { message?: { content?: Array<{ type: string; text?: string }> } }): string {
if (!event.message?.content) return ''
return event.message.content
.filter((c): c is { type: 'text'; text: string } => c.type === 'text' && !!c.text)
.map((c) => c.text)
.join('')
}

View file

@ -58,6 +58,22 @@ export class AsyncAgent {
return this.channel;
}
/**
* Subscribe to agent events directly (supports multiple subscribers).
* Unlike read(), this allows multiple consumers to receive the same events.
*/
subscribe(callback: (event: AgentEvent) => void): () => void {
console.log(`[AsyncAgent] Adding subscriber for agent: ${this.sessionId}`);
const unsubscribe = this.agent.subscribe((event) => {
console.log(`[AsyncAgent] Event received: ${event.type}`);
callback(event);
});
return () => {
console.log(`[AsyncAgent] Removing subscriber for agent: ${this.sessionId}`);
unsubscribe();
};
}
/** Returns a promise that resolves when the current message queue is drained */
waitForIdle(): Promise<void> {
return this.queue;