diff --git a/packages/store/package.json b/packages/store/package.json index 61a1b936..e9e891f3 100644 --- a/packages/store/package.json +++ b/packages/store/package.json @@ -10,7 +10,6 @@ "dependencies": { "@multica/sdk": "workspace:*", "react": "catalog:", - "sonner": "^2.0.7", "uuid": "^13.0.0", "zustand": "catalog:" }, diff --git a/packages/store/src/connection-store.ts b/packages/store/src/connection-store.ts new file mode 100644 index 00000000..76589656 --- /dev/null +++ b/packages/store/src/connection-store.ts @@ -0,0 +1,228 @@ +/** + * Connection Store - manages WebSocket connection lifecycle + * + * Responsibilities: + * 1. Persist deviceId (auto-generated on first run, restored from localStorage) + * 2. Establish WebSocket connection to Gateway using connection code (from QR/paste) + * 3. Maintain connection state (disconnected → connecting → connected → registered) + * 4. Route incoming stream messages from Hub to MessagesStore + * 5. Provide send() for MessagesStore to send messages + * + * Data flow: + * connection code → connect() → GatewayClient(Socket.io) → Gateway server + * ↓ + * onMessage callback → MessagesStore + */ +import { create } from "zustand" +import { persist } from "zustand/middleware" +import { v7 as uuidv7 } from "uuid" +import { + GatewayClient, + StreamAction, + extractTextFromEvent, + type ConnectionState, + type SendErrorResponse, + type StreamPayload, + type StreamMessageEvent, +} from "@multica/sdk" +import { useMessagesStore } from "./messages" +import { clearConnection, type ConnectionInfo } from "./connection" + +interface ConnectionStoreState { + deviceId: string + gatewayUrl: string | null + hubId: string | null + agentId: string | null + connectionState: ConnectionState + lastError: SendErrorResponse | null +} + +interface ConnectionStoreActions { + connect: (code: ConnectionInfo) => void + disconnect: () => void + send: (to: string, action: string, payload: unknown) => void +} + +export type ConnectionStore = ConnectionStoreState & ConnectionStoreActions + +// Module-level singleton — only one WebSocket connection per app +let client: GatewayClient | null = null + +/** + * Create a GatewayClient and bind message-handling callbacks. + * + * GatewayClient is defined in packages/sdk/src/client.ts + * It wraps Socket.io and exposes: + * - connect() establish WebSocket connection + * - disconnect() tear down connection + * - send(to, action, payload) send message to a specific device + * - request(to, method, params) send RPC request and await response + * - onStateChange(cb) listen for connection state changes + * - onMessage(cb) listen for incoming messages + * - onSendError(cb) listen for send failures + * - isRegistered / isConnected connection state checks + * + * Connection requires two params: + * - url: Gateway server address (from connection code's gateway field) + * - deviceId: unique device identifier (persisted in this store) + * + * Sending messages requires two routing params: + * - hubId: which Hub to send to (from connection code) + * - agentId: which Agent within the Hub (from connection code) + */ +function createClient( + url: string, + deviceId: string, + set: (s: Partial) => void, + getState: () => ConnectionStoreState, +): GatewayClient { + return new GatewayClient({ + url, + deviceId, + deviceType: "client", + }) + // Sync connection state changes to the store + .onStateChange((connectionState) => { + set({ connectionState }) + // Fetch message history after successful registration + if (connectionState === "registered") { + void fetchHistory(getState()) + } + }) + // Route incoming messages to MessagesStore + .onMessage((msg) => { + // Streaming messages: Agent replies arrive in chunks + if (msg.action === StreamAction) { + const payload = msg.payload as StreamPayload + const store = useMessagesStore.getState() + const { event } = payload + + switch (event.type) { + case "message_start": { + store.startStream(payload.streamId, payload.agentId) + const text = extractTextFromEvent(event as StreamMessageEvent) + if (text) store.appendStream(payload.streamId, text) + break + } + case "message_update": { + const text = extractTextFromEvent(event as StreamMessageEvent) + store.appendStream(payload.streamId, text) + break + } + case "message_end": { + const text = extractTextFromEvent(event as StreamMessageEvent) + store.endStream(payload.streamId, text) + break + } + case "tool_execution_start": + case "tool_execution_end": + break + } + return + } + + // Handle direct (non-streaming) messages + const payload = msg.payload as { agentId?: string; content?: string } + if (payload?.agentId && payload?.content) { + useMessagesStore.getState().addAssistantMessage(payload.content, payload.agentId) + } + }) + .onSendError((error) => set({ lastError: error })) +} + +/** Fetch message history from Hub via RPC after connection is established */ +async function fetchHistory(state: ConnectionStoreState): Promise { + const { hubId, agentId } = state + if (!client || !hubId || !agentId) return + + try { + const result = await client.request<{ + messages: Array<{ role: string; content: unknown }> + total: number + }>(hubId, "getAgentMessages", { agentId, limit: 200 }) + + const messages = result.messages + .filter((m) => m.role === "user" || m.role === "assistant") + .map((m) => ({ + id: uuidv7(), + role: m.role as "user" | "assistant", + content: extractText(m.content), + agentId: agentId, + })) + .filter((m) => m.content.length > 0) + + if (messages.length > 0) { + useMessagesStore.getState().loadMessages(messages) + } + } catch { + // History fetch is best-effort — connection still works without it + } +} + +/** Extract plain text from AgentMessage content (string or content block array) */ +function extractText(content: unknown): string { + if (typeof content === "string") return content + if (!Array.isArray(content)) return "" + return content + .filter((c: { type?: string }) => c.type === "text") + .map((c: { text?: string }) => c.text ?? "") + .join("") +} + +export const useConnectionStore = create()( + persist( + (set, get) => ({ + deviceId: uuidv7(), + gatewayUrl: null, + hubId: null, + agentId: null, + connectionState: "disconnected", + lastError: null, + + // Connect using a connection code (disconnect existing connection first) + connect: (code) => { + if (client) { + client.disconnect() + client = null + } + + set({ + gatewayUrl: code.gateway, + hubId: code.hubId, + agentId: code.agentId, + }) + + client = createClient(code.gateway, get().deviceId, set, get) + client.connect() + }, + + // Disconnect and clear all state (messages + saved connection code) + disconnect: () => { + if (client) { + client.disconnect() + client = null + } + useMessagesStore.getState().clearMessages() + clearConnection() + set({ + connectionState: "disconnected", + gatewayUrl: null, + hubId: null, + agentId: null, + lastError: null, + }) + }, + + // Send a message to a target device (called by MessagesStore.sendMessage) + send: (to, action, payload) => { + if (!client?.isRegistered) return + client.send(to, action, payload) + }, + }), + { + name: "multica-device", + // Only persist deviceId — other fields are runtime state + partialize: (state) => ({ deviceId: state.deviceId }), + }, + ), +) diff --git a/packages/store/src/device-id.ts b/packages/store/src/device-id.ts deleted file mode 100644 index 7f88fc64..00000000 --- a/packages/store/src/device-id.ts +++ /dev/null @@ -1,28 +0,0 @@ -"use client" - -import { useSyncExternalStore } from "react" -import { v7 as uuidv7 } from "uuid" - -const STORAGE_KEY = "multica-device-id" - -function getSnapshot(): string { - let id = localStorage.getItem(STORAGE_KEY) - if (!id) { - id = uuidv7() - localStorage.setItem(STORAGE_KEY, id) - } - return id -} - -function subscribe(cb: () => void) { - window.addEventListener("storage", cb) - return () => window.removeEventListener("storage", cb) -} - -function getServerSnapshot(): string { - return "" -} - -export function useDeviceId(): string { - return useSyncExternalStore(subscribe, getSnapshot, getServerSnapshot) -} diff --git a/packages/store/src/gateway.ts b/packages/store/src/gateway.ts deleted file mode 100644 index 009848d4..00000000 --- a/packages/store/src/gateway.ts +++ /dev/null @@ -1,140 +0,0 @@ -import { create } from "zustand" -import { GatewayClient, StreamAction, extractTextFromEvent, type ConnectionState, type DeviceInfo, type SendErrorResponse, type StreamPayload, type StreamMessageEvent } from "@multica/sdk" -import { useMessagesStore } from "./messages" -import type { ConnectionInfo } from "./connection" - -const DEFAULT_GATEWAY_URL = "http://localhost:3000" - -interface GatewayState { - gatewayUrl: string - connectionState: ConnectionState - hubId: string | null - agentId: string | null - hubs: DeviceInfo[] - lastError: SendErrorResponse | null -} - -interface GatewayActions { - setGatewayUrl: (url: string) => void - connect: (deviceId: string) => void - connectWithCode: (code: ConnectionInfo, deviceId: string) => void - disconnect: () => void - setHubId: (hubId: string) => void - listDevices: () => Promise - send: (to: string, action: string, payload: unknown) => void - request: (method: string, params?: unknown) => Promise -} - -export type GatewayStore = GatewayState & GatewayActions - -let client: GatewayClient | null = null - -function createClient(url: string, deviceId: string, set: (s: Partial) => void): GatewayClient { - return new GatewayClient({ - url, - deviceId, - deviceType: "client", - }) - .onStateChange((connectionState) => set({ connectionState })) - .onMessage((msg) => { - if (msg.action === StreamAction) { - const payload = msg.payload as StreamPayload - const store = useMessagesStore.getState() - const { event } = payload - - switch (event.type) { - case "message_start": { - store.startStream(payload.streamId, payload.agentId) - const text = extractTextFromEvent(event as StreamMessageEvent) - if (text) store.appendStream(payload.streamId, text) - break - } - case "message_update": { - const text = extractTextFromEvent(event as StreamMessageEvent) - store.appendStream(payload.streamId, text) - break - } - case "message_end": { - const text = extractTextFromEvent(event as StreamMessageEvent) - store.endStream(payload.streamId, text) - break - } - case "tool_execution_start": - case "tool_execution_end": - break - } - return - } - - const payload = msg.payload as { agentId?: string; content?: string } - if (payload?.agentId && payload?.content) { - useMessagesStore.getState().addAssistantMessage(payload.content, payload.agentId) - } - }) - .onSendError((error) => set({ lastError: error })) -} - -export const useGatewayStore = create()((set, get) => ({ - gatewayUrl: DEFAULT_GATEWAY_URL, - connectionState: "disconnected", - hubId: null, - agentId: null, - hubs: [], - lastError: null, - - setGatewayUrl: (url) => set({ gatewayUrl: url }), - - connect: (deviceId) => { - if (client) return - client = createClient(get().gatewayUrl, deviceId, set) - client.connect() - }, - - connectWithCode: (code, deviceId) => { - // Disconnect existing connection if any - if (client) { - client.disconnect() - client = null - } - - set({ - gatewayUrl: code.gateway, - hubId: code.hubId, - agentId: code.agentId, - }) - - client = createClient(code.gateway, deviceId, set) - client.connect() - }, - - disconnect: () => { - if (client) { - client.disconnect() - client = null - } - set({ connectionState: "disconnected", hubId: null, agentId: null, hubs: [] }) - }, - - setHubId: (hubId) => set({ hubId }), - - listDevices: async () => { - if (!client?.isRegistered) return [] - const devices = await client.listDevices() - const hubs = devices.filter((d) => d.deviceType === "hub") - set({ hubs }) - return devices - }, - - send: (to, action, payload) => { - if (!client?.isRegistered) return - client.send(to, action, payload) - }, - - request: (method: string, params?: unknown): Promise => { - const { hubId } = get() - if (!client?.isRegistered || !hubId) { - return Promise.reject(new Error("Not connected")) - } - return client.request(hubId, method, params) - }, -})) diff --git a/packages/store/src/hub-init.ts b/packages/store/src/hub-init.ts deleted file mode 100644 index d9735981..00000000 --- a/packages/store/src/hub-init.ts +++ /dev/null @@ -1,42 +0,0 @@ -"use client" - -import { useEffect } from "react" -import { useHubStore } from "./hub" -import { useDeviceId } from "./device-id" -import { useGatewayStore } from "./gateway" -import { loadConnection } from "./connection" - -export function useHubInit() { - const deviceId = useDeviceId() - const gwState = useGatewayStore((s) => s.connectionState) - const hubId = useGatewayStore((s) => s.hubId) - const agentId = useGatewayStore((s) => s.agentId) - const reset = useHubStore((s) => s.reset) - const fetchHub = useHubStore((s) => s.fetchHub) - const fetchAgents = useHubStore((s) => s.fetchAgents) - const setActiveAgentId = useHubStore((s) => s.setActiveAgentId) - - // Auto-connect from saved connection code - useEffect(() => { - if (!deviceId) return - const saved = loadConnection() - if (saved) { - useGatewayStore.getState().connectWithCode(saved, deviceId) - } - return () => { useGatewayStore.getState().disconnect() } - }, [deviceId]) - - // Once registered with a hub, fetch hub info and agents, set active agent - useEffect(() => { - if (gwState === "registered" && hubId) { - fetchHub() - fetchAgents() - if (agentId) { - setActiveAgentId(agentId) - } - } - if (gwState === "disconnected") { - reset() - } - }, [gwState, hubId, agentId, reset, fetchHub, fetchAgents, setActiveAgentId]) -} diff --git a/packages/store/src/hub.ts b/packages/store/src/hub.ts deleted file mode 100644 index f06ca1fe..00000000 --- a/packages/store/src/hub.ts +++ /dev/null @@ -1,137 +0,0 @@ -import { create } from "zustand" -import { toast } from "sonner" -import { v7 as uuidv7 } from "uuid" -import type { - GetHubInfoResult, - ListAgentsResult, - CreateAgentResult, - DeleteAgentResult, - GetAgentMessagesResult, - AgentMessageItem, -} from "@multica/sdk" -import { useGatewayStore } from "./gateway" -import { useMessagesStore } from "./messages" - -/** Extract plain text from agent message content (string or content block array) */ -function extractText(content: string | { type: string; text?: string }[]): string { - if (typeof content === "string") return content - return content - .filter((b) => b.type === "text" && b.text) - .map((b) => b.text!) - .join("\n") -} - -export type HubInfo = GetHubInfoResult - -export interface Agent { - id: string - closed: boolean -} - -export type HubStatus = "idle" | "loading" | "connected" | "error" - -interface HubState { - status: HubStatus - hub: HubInfo | null - agents: Agent[] - activeAgentId: string | null -} - -interface HubActions { - reset: () => void - setActiveAgentId: (id: string | null) => void - fetchHub: () => Promise - fetchAgents: () => Promise - fetchAgentMessages: (agentId: string) => Promise - createAgent: (options?: Record) => Promise - deleteAgent: (id: string) => Promise -} - -export type HubStore = HubState & HubActions - -export const useHubStore = create()((set, get) => ({ - status: "idle", - hub: null, - agents: [], - activeAgentId: null, - - reset: () => set({ status: "idle", hub: null, agents: [], activeAgentId: null }), - - setActiveAgentId: (id) => { - set({ activeAgentId: id }) - if (id) { - // Load history if no messages exist for this agent yet - const existing = useMessagesStore.getState().messages.filter((m) => m.agentId === id) - if (existing.length === 0) { - get().fetchAgentMessages(id) - } - } - }, - - fetchHub: async () => { - set({ status: "loading" }) - try { - const { request } = useGatewayStore.getState() - const data = await request("getHubInfo") - set({ hub: data, status: "connected" }) - } catch { - set({ status: "error", hub: null }) - } - }, - - fetchAgents: async () => { - try { - const { request } = useGatewayStore.getState() - const data = await request("listAgents") - set({ agents: data.agents }) - } catch (e) { - toast.error("Failed to fetch agents") - console.error(e) - } - }, - - fetchAgentMessages: async (agentId) => { - try { - const { request } = useGatewayStore.getState() - const data = await request("getAgentMessages", { agentId }) - const msgs = data.messages - .filter((m): m is AgentMessageItem & { role: "user" | "assistant" } => - m.role === "user" || m.role === "assistant" - ) - .map((m) => ({ - id: uuidv7(), - role: m.role, - content: extractText(m.content), - agentId, - })) - .filter((m) => m.content.length > 0) - useMessagesStore.getState().loadMessages(agentId, msgs) - } catch (e) { - console.error("Failed to fetch agent messages:", e) - } - }, - - createAgent: async (options?) => { - try { - const { request } = useGatewayStore.getState() - const data = await request("createAgent", options) - await get().fetchAgents() - if (data.id) set({ activeAgentId: data.id }) - } catch (e) { - toast.error("Failed to create agent") - console.error(e) - } - }, - - deleteAgent: async (id) => { - if (get().activeAgentId === id) set({ activeAgentId: null }) - try { - const { request } = useGatewayStore.getState() - await request("deleteAgent", { id }) - await get().fetchAgents() - } catch (e) { - toast.error("Failed to delete agent") - console.error(e) - } - }, -})) diff --git a/packages/store/src/index.ts b/packages/store/src/index.ts index 6b42afab..b4e5a097 100644 --- a/packages/store/src/index.ts +++ b/packages/store/src/index.ts @@ -1,10 +1,7 @@ -export { useHubStore } from "./hub" -export type { HubInfo, Agent, HubStatus, HubStore } from "./hub" -export { useHubInit } from "./hub-init" -export { useDeviceId } from "./device-id" +export { useConnectionStore } from "./connection-store" +export type { ConnectionStore } from "./connection-store" +export { useAutoConnect } from "./use-auto-connect" export { useMessagesStore } from "./messages" -export type { Message, MessagesStore } from "./messages" -export { useGatewayStore } from "./gateway" -export type { GatewayStore } from "./gateway" +export type { Message, MessagesStore, SendContext } from "./messages" export { parseConnectionCode, saveConnection, loadConnection, clearConnection } from "./connection" export type { ConnectionInfo } from "./connection" diff --git a/packages/store/src/messages.ts b/packages/store/src/messages.ts index a25625d9..0ced06aa 100644 --- a/packages/store/src/messages.ts +++ b/packages/store/src/messages.ts @@ -1,3 +1,20 @@ +/** + * Messages Store - manages chat messages and streaming state for the current Agent + * + * Responsibilities: + * 1. Store current Agent's chat messages (replaced on Agent switch, not accumulated) + * 2. Manage streaming state (intermediate state while AI replies arrive in chunks) + * 3. Provide sendMessage() as the single entry point for sending messages + * + * Send flow: + * user input → sendMessage(text) + * → addUserMessage() immediately adds to local state (optimistic update) + * → ConnectionStore.send() sends to Gateway → Hub → Agent + * + * Receive flow (driven by ConnectionStore's onMessage callback): + * Streaming: startStream → appendStream (repeated) → endStream + * Non-streaming: addAssistantMessage (one-shot) + */ import { create } from "zustand" import { v7 as uuidv7 } from "uuid" @@ -8,17 +25,26 @@ export interface Message { agentId: string } +/** Parameters needed to route a message through the gateway */ +export interface SendContext { + hubId: string + agentId: string + send: (to: string, action: string, payload: unknown) => void +} + interface MessagesState { messages: Message[] streamingIds: Set } interface MessagesActions { + sendMessage: (text: string, ctx: SendContext) => void addUserMessage: (content: string, agentId: string) => void addAssistantMessage: (content: string, agentId: string) => void updateMessage: (id: string, content: string) => void - loadMessages: (agentId: string, msgs: Message[]) => void - clearMessages: (agentId?: string) => void + // Replace all messages (for Agent switch or loading history) + loadMessages: (msgs: Message[]) => void + clearMessages: () => void startStream: (streamId: string, agentId: string) => void appendStream: (streamId: string, content: string) => void endStream: (streamId: string, content: string) => void @@ -30,6 +56,12 @@ export const useMessagesStore = create()((set, get) => ({ messages: [], streamingIds: new Set(), + // Single entry point for sending: optimistic local add, then send via WebSocket + sendMessage: (text, ctx) => { + get().addUserMessage(text, ctx.agentId) + ctx.send(ctx.hubId, "message", { agentId: ctx.agentId, content: text }) + }, + addUserMessage: (content, agentId) => { set((s) => ({ messages: [...s.messages, { id: uuidv7(), role: "user", content, agentId }], @@ -48,18 +80,17 @@ export const useMessagesStore = create()((set, get) => ({ })) }, - loadMessages: (agentId, msgs) => { - set((s) => ({ - messages: [...s.messages.filter((m) => m.agentId !== agentId), ...msgs], - })) + // Replace all messages (for Agent switch or loading history) + loadMessages: (msgs) => { + set({ messages: msgs }) }, - clearMessages: (agentId?) => { - set((s) => ({ - messages: agentId ? s.messages.filter((m) => m.agentId !== agentId) : [], - })) + clearMessages: () => { + set({ messages: [], streamingIds: new Set() }) }, + // === The following three methods are called by ConnectionStore's onMessage callback === + // Stream start: create an empty placeholder message and mark as streaming startStream: (streamId, agentId) => { set((s) => { const ids = new Set(s.streamingIds) @@ -71,12 +102,14 @@ export const useMessagesStore = create()((set, get) => ({ }) }, + // Stream update: replace message content (each update carries the full accumulated text) appendStream: (streamId, content) => { set((s) => ({ messages: s.messages.map((m) => (m.id === streamId ? { ...m, content } : m)), })) }, + // Stream end: write final content, remove streaming marker endStream: (streamId, content) => { set((s) => { const ids = new Set(s.streamingIds) diff --git a/packages/store/src/use-auto-connect.ts b/packages/store/src/use-auto-connect.ts new file mode 100644 index 00000000..e01a7b1f --- /dev/null +++ b/packages/store/src/use-auto-connect.ts @@ -0,0 +1,33 @@ +"use client" + +import { useState, useEffect } from "react" +import { useConnectionStore } from "./connection-store" +import { loadConnection } from "./connection" + +/** Auto-connect from saved connection code on mount, skip if already connected */ +export function useAutoConnect(): { loading: boolean } { + const connectionState = useConnectionStore((s) => s.connectionState) + const [loading, setLoading] = useState(true) + + useEffect(() => { + const state = useConnectionStore.getState() + if (state.connectionState !== "disconnected") { + setLoading(false) + return + } + const saved = loadConnection() + if (saved) { + state.connect(saved) + } else { + setLoading(false) + } + }, []) + + useEffect(() => { + if (connectionState !== "disconnected") { + setLoading(false) + } + }, [connectionState]) + + return { loading } +}