refactor(store): consolidate 3 stores into ConnectionStore + MessagesStore

Replace useGatewayStore, useHubStore, useDeviceId, and useHubInit with:
- ConnectionStore: WebSocket lifecycle, deviceId persistence via Zustand persist,
  fetch message history on registration via getAgentMessages RPC
- MessagesStore: simplified to current-agent-only, sendMessage accepts SendContext
  to break circular import with ConnectionStore
- useAutoConnect: returns { loading } for skeleton UI, skips connect if already
  connected (fixes Electron tab-switch reconnect), no cleanup disconnect

Removes: gateway.ts, hub.ts, hub-init.ts, device-id.ts, sonner dep from store

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Naiyuan Qing 2026-02-04 10:12:45 +08:00
parent 32c406a2a4
commit 162a86dff4
9 changed files with 308 additions and 365 deletions

View file

@ -10,7 +10,6 @@
"dependencies": {
"@multica/sdk": "workspace:*",
"react": "catalog:",
"sonner": "^2.0.7",
"uuid": "^13.0.0",
"zustand": "catalog:"
},

View file

@ -0,0 +1,228 @@
/**
* Connection Store - manages WebSocket connection lifecycle
*
* Responsibilities:
* 1. Persist deviceId (auto-generated on first run, restored from localStorage)
* 2. Establish WebSocket connection to Gateway using connection code (from QR/paste)
* 3. Maintain connection state (disconnected connecting connected registered)
* 4. Route incoming stream messages from Hub to MessagesStore
* 5. Provide send() for MessagesStore to send messages
*
* Data flow:
* connection code connect() GatewayClient(Socket.io) Gateway server
*
* onMessage callback MessagesStore
*/
import { create } from "zustand"
import { persist } from "zustand/middleware"
import { v7 as uuidv7 } from "uuid"
import {
GatewayClient,
StreamAction,
extractTextFromEvent,
type ConnectionState,
type SendErrorResponse,
type StreamPayload,
type StreamMessageEvent,
} from "@multica/sdk"
import { useMessagesStore } from "./messages"
import { clearConnection, type ConnectionInfo } from "./connection"
interface ConnectionStoreState {
deviceId: string
gatewayUrl: string | null
hubId: string | null
agentId: string | null
connectionState: ConnectionState
lastError: SendErrorResponse | null
}
interface ConnectionStoreActions {
connect: (code: ConnectionInfo) => void
disconnect: () => void
send: (to: string, action: string, payload: unknown) => void
}
export type ConnectionStore = ConnectionStoreState & ConnectionStoreActions
// Module-level singleton — only one WebSocket connection per app
let client: GatewayClient | null = null
/**
* Create a GatewayClient and bind message-handling callbacks.
*
* GatewayClient is defined in packages/sdk/src/client.ts
* It wraps Socket.io and exposes:
* - connect() establish WebSocket connection
* - disconnect() tear down connection
* - send(to, action, payload) send message to a specific device
* - request(to, method, params) send RPC request and await response
* - onStateChange(cb) listen for connection state changes
* - onMessage(cb) listen for incoming messages
* - onSendError(cb) listen for send failures
* - isRegistered / isConnected connection state checks
*
* Connection requires two params:
* - url: Gateway server address (from connection code's gateway field)
* - deviceId: unique device identifier (persisted in this store)
*
* Sending messages requires two routing params:
* - hubId: which Hub to send to (from connection code)
* - agentId: which Agent within the Hub (from connection code)
*/
function createClient(
url: string,
deviceId: string,
set: (s: Partial<ConnectionStoreState>) => void,
getState: () => ConnectionStoreState,
): GatewayClient {
return new GatewayClient({
url,
deviceId,
deviceType: "client",
})
// Sync connection state changes to the store
.onStateChange((connectionState) => {
set({ connectionState })
// Fetch message history after successful registration
if (connectionState === "registered") {
void fetchHistory(getState())
}
})
// Route incoming messages to MessagesStore
.onMessage((msg) => {
// Streaming messages: Agent replies arrive in chunks
if (msg.action === StreamAction) {
const payload = msg.payload as StreamPayload
const store = useMessagesStore.getState()
const { event } = payload
switch (event.type) {
case "message_start": {
store.startStream(payload.streamId, payload.agentId)
const text = extractTextFromEvent(event as StreamMessageEvent)
if (text) store.appendStream(payload.streamId, text)
break
}
case "message_update": {
const text = extractTextFromEvent(event as StreamMessageEvent)
store.appendStream(payload.streamId, text)
break
}
case "message_end": {
const text = extractTextFromEvent(event as StreamMessageEvent)
store.endStream(payload.streamId, text)
break
}
case "tool_execution_start":
case "tool_execution_end":
break
}
return
}
// Handle direct (non-streaming) messages
const payload = msg.payload as { agentId?: string; content?: string }
if (payload?.agentId && payload?.content) {
useMessagesStore.getState().addAssistantMessage(payload.content, payload.agentId)
}
})
.onSendError((error) => set({ lastError: error }))
}
/** Fetch message history from Hub via RPC after connection is established */
async function fetchHistory(state: ConnectionStoreState): Promise<void> {
const { hubId, agentId } = state
if (!client || !hubId || !agentId) return
try {
const result = await client.request<{
messages: Array<{ role: string; content: unknown }>
total: number
}>(hubId, "getAgentMessages", { agentId, limit: 200 })
const messages = result.messages
.filter((m) => m.role === "user" || m.role === "assistant")
.map((m) => ({
id: uuidv7(),
role: m.role as "user" | "assistant",
content: extractText(m.content),
agentId: agentId,
}))
.filter((m) => m.content.length > 0)
if (messages.length > 0) {
useMessagesStore.getState().loadMessages(messages)
}
} catch {
// History fetch is best-effort — connection still works without it
}
}
/** Extract plain text from AgentMessage content (string or content block array) */
function extractText(content: unknown): string {
if (typeof content === "string") return content
if (!Array.isArray(content)) return ""
return content
.filter((c: { type?: string }) => c.type === "text")
.map((c: { text?: string }) => c.text ?? "")
.join("")
}
export const useConnectionStore = create<ConnectionStore>()(
persist(
(set, get) => ({
deviceId: uuidv7(),
gatewayUrl: null,
hubId: null,
agentId: null,
connectionState: "disconnected",
lastError: null,
// Connect using a connection code (disconnect existing connection first)
connect: (code) => {
if (client) {
client.disconnect()
client = null
}
set({
gatewayUrl: code.gateway,
hubId: code.hubId,
agentId: code.agentId,
})
client = createClient(code.gateway, get().deviceId, set, get)
client.connect()
},
// Disconnect and clear all state (messages + saved connection code)
disconnect: () => {
if (client) {
client.disconnect()
client = null
}
useMessagesStore.getState().clearMessages()
clearConnection()
set({
connectionState: "disconnected",
gatewayUrl: null,
hubId: null,
agentId: null,
lastError: null,
})
},
// Send a message to a target device (called by MessagesStore.sendMessage)
send: (to, action, payload) => {
if (!client?.isRegistered) return
client.send(to, action, payload)
},
}),
{
name: "multica-device",
// Only persist deviceId — other fields are runtime state
partialize: (state) => ({ deviceId: state.deviceId }),
},
),
)

View file

@ -1,28 +0,0 @@
"use client"
import { useSyncExternalStore } from "react"
import { v7 as uuidv7 } from "uuid"
const STORAGE_KEY = "multica-device-id"
function getSnapshot(): string {
let id = localStorage.getItem(STORAGE_KEY)
if (!id) {
id = uuidv7()
localStorage.setItem(STORAGE_KEY, id)
}
return id
}
function subscribe(cb: () => void) {
window.addEventListener("storage", cb)
return () => window.removeEventListener("storage", cb)
}
function getServerSnapshot(): string {
return ""
}
export function useDeviceId(): string {
return useSyncExternalStore(subscribe, getSnapshot, getServerSnapshot)
}

View file

@ -1,140 +0,0 @@
import { create } from "zustand"
import { GatewayClient, StreamAction, extractTextFromEvent, type ConnectionState, type DeviceInfo, type SendErrorResponse, type StreamPayload, type StreamMessageEvent } from "@multica/sdk"
import { useMessagesStore } from "./messages"
import type { ConnectionInfo } from "./connection"
const DEFAULT_GATEWAY_URL = "http://localhost:3000"
interface GatewayState {
gatewayUrl: string
connectionState: ConnectionState
hubId: string | null
agentId: string | null
hubs: DeviceInfo[]
lastError: SendErrorResponse | null
}
interface GatewayActions {
setGatewayUrl: (url: string) => void
connect: (deviceId: string) => void
connectWithCode: (code: ConnectionInfo, deviceId: string) => void
disconnect: () => void
setHubId: (hubId: string) => void
listDevices: () => Promise<DeviceInfo[]>
send: (to: string, action: string, payload: unknown) => void
request: <T = unknown>(method: string, params?: unknown) => Promise<T>
}
export type GatewayStore = GatewayState & GatewayActions
let client: GatewayClient | null = null
function createClient(url: string, deviceId: string, set: (s: Partial<GatewayState>) => void): GatewayClient {
return new GatewayClient({
url,
deviceId,
deviceType: "client",
})
.onStateChange((connectionState) => set({ connectionState }))
.onMessage((msg) => {
if (msg.action === StreamAction) {
const payload = msg.payload as StreamPayload
const store = useMessagesStore.getState()
const { event } = payload
switch (event.type) {
case "message_start": {
store.startStream(payload.streamId, payload.agentId)
const text = extractTextFromEvent(event as StreamMessageEvent)
if (text) store.appendStream(payload.streamId, text)
break
}
case "message_update": {
const text = extractTextFromEvent(event as StreamMessageEvent)
store.appendStream(payload.streamId, text)
break
}
case "message_end": {
const text = extractTextFromEvent(event as StreamMessageEvent)
store.endStream(payload.streamId, text)
break
}
case "tool_execution_start":
case "tool_execution_end":
break
}
return
}
const payload = msg.payload as { agentId?: string; content?: string }
if (payload?.agentId && payload?.content) {
useMessagesStore.getState().addAssistantMessage(payload.content, payload.agentId)
}
})
.onSendError((error) => set({ lastError: error }))
}
export const useGatewayStore = create<GatewayStore>()((set, get) => ({
gatewayUrl: DEFAULT_GATEWAY_URL,
connectionState: "disconnected",
hubId: null,
agentId: null,
hubs: [],
lastError: null,
setGatewayUrl: (url) => set({ gatewayUrl: url }),
connect: (deviceId) => {
if (client) return
client = createClient(get().gatewayUrl, deviceId, set)
client.connect()
},
connectWithCode: (code, deviceId) => {
// Disconnect existing connection if any
if (client) {
client.disconnect()
client = null
}
set({
gatewayUrl: code.gateway,
hubId: code.hubId,
agentId: code.agentId,
})
client = createClient(code.gateway, deviceId, set)
client.connect()
},
disconnect: () => {
if (client) {
client.disconnect()
client = null
}
set({ connectionState: "disconnected", hubId: null, agentId: null, hubs: [] })
},
setHubId: (hubId) => set({ hubId }),
listDevices: async () => {
if (!client?.isRegistered) return []
const devices = await client.listDevices()
const hubs = devices.filter((d) => d.deviceType === "hub")
set({ hubs })
return devices
},
send: (to, action, payload) => {
if (!client?.isRegistered) return
client.send(to, action, payload)
},
request: <T = unknown>(method: string, params?: unknown): Promise<T> => {
const { hubId } = get()
if (!client?.isRegistered || !hubId) {
return Promise.reject(new Error("Not connected"))
}
return client.request<T>(hubId, method, params)
},
}))

View file

@ -1,42 +0,0 @@
"use client"
import { useEffect } from "react"
import { useHubStore } from "./hub"
import { useDeviceId } from "./device-id"
import { useGatewayStore } from "./gateway"
import { loadConnection } from "./connection"
export function useHubInit() {
const deviceId = useDeviceId()
const gwState = useGatewayStore((s) => s.connectionState)
const hubId = useGatewayStore((s) => s.hubId)
const agentId = useGatewayStore((s) => s.agentId)
const reset = useHubStore((s) => s.reset)
const fetchHub = useHubStore((s) => s.fetchHub)
const fetchAgents = useHubStore((s) => s.fetchAgents)
const setActiveAgentId = useHubStore((s) => s.setActiveAgentId)
// Auto-connect from saved connection code
useEffect(() => {
if (!deviceId) return
const saved = loadConnection()
if (saved) {
useGatewayStore.getState().connectWithCode(saved, deviceId)
}
return () => { useGatewayStore.getState().disconnect() }
}, [deviceId])
// Once registered with a hub, fetch hub info and agents, set active agent
useEffect(() => {
if (gwState === "registered" && hubId) {
fetchHub()
fetchAgents()
if (agentId) {
setActiveAgentId(agentId)
}
}
if (gwState === "disconnected") {
reset()
}
}, [gwState, hubId, agentId, reset, fetchHub, fetchAgents, setActiveAgentId])
}

View file

@ -1,137 +0,0 @@
import { create } from "zustand"
import { toast } from "sonner"
import { v7 as uuidv7 } from "uuid"
import type {
GetHubInfoResult,
ListAgentsResult,
CreateAgentResult,
DeleteAgentResult,
GetAgentMessagesResult,
AgentMessageItem,
} from "@multica/sdk"
import { useGatewayStore } from "./gateway"
import { useMessagesStore } from "./messages"
/** Extract plain text from agent message content (string or content block array) */
function extractText(content: string | { type: string; text?: string }[]): string {
if (typeof content === "string") return content
return content
.filter((b) => b.type === "text" && b.text)
.map((b) => b.text!)
.join("\n")
}
export type HubInfo = GetHubInfoResult
export interface Agent {
id: string
closed: boolean
}
export type HubStatus = "idle" | "loading" | "connected" | "error"
interface HubState {
status: HubStatus
hub: HubInfo | null
agents: Agent[]
activeAgentId: string | null
}
interface HubActions {
reset: () => void
setActiveAgentId: (id: string | null) => void
fetchHub: () => Promise<void>
fetchAgents: () => Promise<void>
fetchAgentMessages: (agentId: string) => Promise<void>
createAgent: (options?: Record<string, unknown>) => Promise<void>
deleteAgent: (id: string) => Promise<void>
}
export type HubStore = HubState & HubActions
export const useHubStore = create<HubStore>()((set, get) => ({
status: "idle",
hub: null,
agents: [],
activeAgentId: null,
reset: () => set({ status: "idle", hub: null, agents: [], activeAgentId: null }),
setActiveAgentId: (id) => {
set({ activeAgentId: id })
if (id) {
// Load history if no messages exist for this agent yet
const existing = useMessagesStore.getState().messages.filter((m) => m.agentId === id)
if (existing.length === 0) {
get().fetchAgentMessages(id)
}
}
},
fetchHub: async () => {
set({ status: "loading" })
try {
const { request } = useGatewayStore.getState()
const data = await request<GetHubInfoResult>("getHubInfo")
set({ hub: data, status: "connected" })
} catch {
set({ status: "error", hub: null })
}
},
fetchAgents: async () => {
try {
const { request } = useGatewayStore.getState()
const data = await request<ListAgentsResult>("listAgents")
set({ agents: data.agents })
} catch (e) {
toast.error("Failed to fetch agents")
console.error(e)
}
},
fetchAgentMessages: async (agentId) => {
try {
const { request } = useGatewayStore.getState()
const data = await request<GetAgentMessagesResult>("getAgentMessages", { agentId })
const msgs = data.messages
.filter((m): m is AgentMessageItem & { role: "user" | "assistant" } =>
m.role === "user" || m.role === "assistant"
)
.map((m) => ({
id: uuidv7(),
role: m.role,
content: extractText(m.content),
agentId,
}))
.filter((m) => m.content.length > 0)
useMessagesStore.getState().loadMessages(agentId, msgs)
} catch (e) {
console.error("Failed to fetch agent messages:", e)
}
},
createAgent: async (options?) => {
try {
const { request } = useGatewayStore.getState()
const data = await request<CreateAgentResult>("createAgent", options)
await get().fetchAgents()
if (data.id) set({ activeAgentId: data.id })
} catch (e) {
toast.error("Failed to create agent")
console.error(e)
}
},
deleteAgent: async (id) => {
if (get().activeAgentId === id) set({ activeAgentId: null })
try {
const { request } = useGatewayStore.getState()
await request<DeleteAgentResult>("deleteAgent", { id })
await get().fetchAgents()
} catch (e) {
toast.error("Failed to delete agent")
console.error(e)
}
},
}))

View file

@ -1,10 +1,7 @@
export { useHubStore } from "./hub"
export type { HubInfo, Agent, HubStatus, HubStore } from "./hub"
export { useHubInit } from "./hub-init"
export { useDeviceId } from "./device-id"
export { useConnectionStore } from "./connection-store"
export type { ConnectionStore } from "./connection-store"
export { useAutoConnect } from "./use-auto-connect"
export { useMessagesStore } from "./messages"
export type { Message, MessagesStore } from "./messages"
export { useGatewayStore } from "./gateway"
export type { GatewayStore } from "./gateway"
export type { Message, MessagesStore, SendContext } from "./messages"
export { parseConnectionCode, saveConnection, loadConnection, clearConnection } from "./connection"
export type { ConnectionInfo } from "./connection"

View file

@ -1,3 +1,20 @@
/**
* Messages Store - manages chat messages and streaming state for the current Agent
*
* Responsibilities:
* 1. Store current Agent's chat messages (replaced on Agent switch, not accumulated)
* 2. Manage streaming state (intermediate state while AI replies arrive in chunks)
* 3. Provide sendMessage() as the single entry point for sending messages
*
* Send flow:
* user input sendMessage(text)
* addUserMessage() immediately adds to local state (optimistic update)
* ConnectionStore.send() sends to Gateway Hub Agent
*
* Receive flow (driven by ConnectionStore's onMessage callback):
* Streaming: startStream appendStream (repeated) endStream
* Non-streaming: addAssistantMessage (one-shot)
*/
import { create } from "zustand"
import { v7 as uuidv7 } from "uuid"
@ -8,17 +25,26 @@ export interface Message {
agentId: string
}
/** Parameters needed to route a message through the gateway */
export interface SendContext {
hubId: string
agentId: string
send: (to: string, action: string, payload: unknown) => void
}
interface MessagesState {
messages: Message[]
streamingIds: Set<string>
}
interface MessagesActions {
sendMessage: (text: string, ctx: SendContext) => void
addUserMessage: (content: string, agentId: string) => void
addAssistantMessage: (content: string, agentId: string) => void
updateMessage: (id: string, content: string) => void
loadMessages: (agentId: string, msgs: Message[]) => void
clearMessages: (agentId?: string) => void
// Replace all messages (for Agent switch or loading history)
loadMessages: (msgs: Message[]) => void
clearMessages: () => void
startStream: (streamId: string, agentId: string) => void
appendStream: (streamId: string, content: string) => void
endStream: (streamId: string, content: string) => void
@ -30,6 +56,12 @@ export const useMessagesStore = create<MessagesStore>()((set, get) => ({
messages: [],
streamingIds: new Set<string>(),
// Single entry point for sending: optimistic local add, then send via WebSocket
sendMessage: (text, ctx) => {
get().addUserMessage(text, ctx.agentId)
ctx.send(ctx.hubId, "message", { agentId: ctx.agentId, content: text })
},
addUserMessage: (content, agentId) => {
set((s) => ({
messages: [...s.messages, { id: uuidv7(), role: "user", content, agentId }],
@ -48,18 +80,17 @@ export const useMessagesStore = create<MessagesStore>()((set, get) => ({
}))
},
loadMessages: (agentId, msgs) => {
set((s) => ({
messages: [...s.messages.filter((m) => m.agentId !== agentId), ...msgs],
}))
// Replace all messages (for Agent switch or loading history)
loadMessages: (msgs) => {
set({ messages: msgs })
},
clearMessages: (agentId?) => {
set((s) => ({
messages: agentId ? s.messages.filter((m) => m.agentId !== agentId) : [],
}))
clearMessages: () => {
set({ messages: [], streamingIds: new Set() })
},
// === The following three methods are called by ConnectionStore's onMessage callback ===
// Stream start: create an empty placeholder message and mark as streaming
startStream: (streamId, agentId) => {
set((s) => {
const ids = new Set(s.streamingIds)
@ -71,12 +102,14 @@ export const useMessagesStore = create<MessagesStore>()((set, get) => ({
})
},
// Stream update: replace message content (each update carries the full accumulated text)
appendStream: (streamId, content) => {
set((s) => ({
messages: s.messages.map((m) => (m.id === streamId ? { ...m, content } : m)),
}))
},
// Stream end: write final content, remove streaming marker
endStream: (streamId, content) => {
set((s) => {
const ids = new Set(s.streamingIds)

View file

@ -0,0 +1,33 @@
"use client"
import { useState, useEffect } from "react"
import { useConnectionStore } from "./connection-store"
import { loadConnection } from "./connection"
/** Auto-connect from saved connection code on mount, skip if already connected */
export function useAutoConnect(): { loading: boolean } {
const connectionState = useConnectionStore((s) => s.connectionState)
const [loading, setLoading] = useState(true)
useEffect(() => {
const state = useConnectionStore.getState()
if (state.connectionState !== "disconnected") {
setLoading(false)
return
}
const saved = loadConnection()
if (saved) {
state.connect(saved)
} else {
setLoading(false)
}
}, [])
useEffect(() => {
if (connectionState !== "disconnected") {
setLoading(false)
}
}, [connectionState])
return { loading }
}