diff --git a/packages/store/src/index.ts b/packages/store/src/index.ts index b4e5a097..f2ee2670 100644 --- a/packages/store/src/index.ts +++ b/packages/store/src/index.ts @@ -2,6 +2,6 @@ export { useConnectionStore } from "./connection-store" export type { ConnectionStore } from "./connection-store" export { useAutoConnect } from "./use-auto-connect" export { useMessagesStore } from "./messages" -export type { Message, MessagesStore, SendContext } from "./messages" +export type { Message, MessagesStore, SendContext, ToolStatus } from "./messages" export { parseConnectionCode, saveConnection, loadConnection, clearConnection } from "./connection" export type { ConnectionInfo } from "./connection" diff --git a/packages/store/src/messages.ts b/packages/store/src/messages.ts index 0ced06aa..61827471 100644 --- a/packages/store/src/messages.ts +++ b/packages/store/src/messages.ts @@ -1,28 +1,33 @@ /** - * Messages Store - manages chat messages and streaming state for the current Agent + * Messages Store - manages chat messages and streaming state * - * Responsibilities: - * 1. Store current Agent's chat messages (replaced on Agent switch, not accumulated) - * 2. Manage streaming state (intermediate state while AI replies arrive in chunks) - * 3. Provide sendMessage() as the single entry point for sending messages + * Data model mirrors the backend (pi-ai / pi-agent-core) exactly: + * - UserMessage: { role: "user", content: ContentBlock[] } + * - AssistantMessage: { role: "assistant", content: ContentBlock[] } + * - ToolResultMessage: { role: "toolResult", toolCallId, toolName, content, isError } * - * Send flow: - * user input → sendMessage(text) - * → addUserMessage() immediately adds to local state (optimistic update) - * → ConnectionStore.send() sends to Gateway → Hub → Agent - * - * Receive flow (driven by ConnectionStore's onMessage callback): - * Streaming: startStream → appendStream (repeated) → endStream - * Non-streaming: addAssistantMessage (one-shot) + * Streaming simply updates the content of the current assistant message in-place. + * Tool execution events (start/end) create / update toolResult messages. */ import { create } from "zustand" import { v7 as uuidv7 } from "uuid" +import type { ContentBlock } from "@multica/sdk" + +export type ToolStatus = "running" | "success" | "error" | "interrupted" export interface Message { id: string - role: "user" | "assistant" - content: string + role: "user" | "assistant" | "toolResult" + content: ContentBlock[] agentId: string + // AssistantMessage metadata + stopReason?: string + // ToolResult fields (only when role === "toolResult") + toolCallId?: string + toolName?: string + toolArgs?: Record + toolStatus?: ToolStatus + isError?: boolean } /** Parameters needed to route a message through the gateway */ @@ -41,13 +46,16 @@ interface MessagesActions { sendMessage: (text: string, ctx: SendContext) => void addUserMessage: (content: string, agentId: string) => void addAssistantMessage: (content: string, agentId: string) => void - updateMessage: (id: string, content: string) => void - // Replace all messages (for Agent switch or loading history) + updateMessage: (id: string, content: ContentBlock[]) => void loadMessages: (msgs: Message[]) => void clearMessages: () => void + // Streaming startStream: (streamId: string, agentId: string) => void - appendStream: (streamId: string, content: string) => void - endStream: (streamId: string, content: string) => void + appendStream: (streamId: string, content: ContentBlock[]) => void + endStream: (streamId: string, content: ContentBlock[], stopReason?: string) => void + // Tool execution lifecycle + startToolExecution: (agentId: string, toolCallId: string, toolName: string, args?: unknown) => void + endToolExecution: (toolCallId: string, result?: unknown, isError?: boolean) => void } export type MessagesStore = MessagesState & MessagesActions @@ -56,7 +64,6 @@ export const useMessagesStore = create()((set, get) => ({ messages: [], streamingIds: new Set(), - // Single entry point for sending: optimistic local add, then send via WebSocket sendMessage: (text, ctx) => { get().addUserMessage(text, ctx.agentId) ctx.send(ctx.hubId, "message", { agentId: ctx.agentId, content: text }) @@ -64,13 +71,23 @@ export const useMessagesStore = create()((set, get) => ({ addUserMessage: (content, agentId) => { set((s) => ({ - messages: [...s.messages, { id: uuidv7(), role: "user", content, agentId }], + messages: [...s.messages, { + id: uuidv7(), + role: "user", + content: [{ type: "text" as const, text: content }], + agentId, + }], })) }, addAssistantMessage: (content, agentId) => { set((s) => ({ - messages: [...s.messages, { id: uuidv7(), role: "assistant", content, agentId }], + messages: [...s.messages, { + id: uuidv7(), + role: "assistant", + content: [{ type: "text" as const, text: content }], + agentId, + }], })) }, @@ -80,7 +97,6 @@ export const useMessagesStore = create()((set, get) => ({ })) }, - // Replace all messages (for Agent switch or loading history) loadMessages: (msgs) => { set({ messages: msgs }) }, @@ -89,35 +105,76 @@ export const useMessagesStore = create()((set, get) => ({ set({ messages: [], streamingIds: new Set() }) }, - // === The following three methods are called by ConnectionStore's onMessage callback === - // Stream start: create an empty placeholder message and mark as streaming + // --- Streaming: build assistant message incrementally --- + startStream: (streamId, agentId) => { set((s) => { const ids = new Set(s.streamingIds) ids.add(streamId) return { - messages: [...s.messages, { id: streamId, role: "assistant" as const, content: "", agentId }], + messages: [...s.messages, { id: streamId, role: "assistant" as const, content: [], agentId }], streamingIds: ids, } }) }, - // Stream update: replace message content (each update carries the full accumulated text) + // Replace the entire content array with the latest partial snapshot appendStream: (streamId, content) => { set((s) => ({ messages: s.messages.map((m) => (m.id === streamId ? { ...m, content } : m)), })) }, - // Stream end: write final content, remove streaming marker - endStream: (streamId, content) => { + endStream: (streamId, content, stopReason) => { set((s) => { const ids = new Set(s.streamingIds) ids.delete(streamId) return { - messages: s.messages.map((m) => (m.id === streamId ? { ...m, content } : m)), + messages: s.messages.map((m) => { + if (m.id === streamId) return { ...m, content, stopReason } + // Interrupt any still-running tool executions + if (m.role === "toolResult" && m.toolStatus === "running") { + return { ...m, toolStatus: "interrupted" as ToolStatus } + } + return m + }), streamingIds: ids, } }) }, + + // --- Tool execution: create / update toolResult messages --- + + startToolExecution: (agentId, toolCallId, toolName, args) => { + set((s) => ({ + messages: [...s.messages, { + id: uuidv7(), + role: "toolResult" as const, + content: [], + agentId, + toolCallId, + toolName, + toolArgs: args as Record | undefined, + toolStatus: "running" as ToolStatus, + isError: false, + }], + })) + }, + + endToolExecution: (toolCallId, result, isError) => { + set((s) => ({ + messages: s.messages.map((m) => + m.role === "toolResult" && m.toolCallId === toolCallId + ? { + ...m, + toolStatus: (isError ? "error" : "success") as ToolStatus, + isError: isError ?? false, + content: result != null + ? [{ type: "text" as const, text: typeof result === "string" ? result : JSON.stringify(result) }] + : [], + } + : m + ), + })) + }, })) diff --git a/packages/ui/src/components/message-list.tsx b/packages/ui/src/components/message-list.tsx index 937230e6..b372d96f 100644 --- a/packages/ui/src/components/message-list.tsx +++ b/packages/ui/src/components/message-list.tsx @@ -2,8 +2,23 @@ import { MemoizedMarkdown } from "@multica/ui/components/markdown"; import { StreamingMarkdown } from "@multica/ui/components/markdown/StreamingMarkdown"; +import { ToolCallItem } from "@multica/ui/components/tool-call-item"; import { cn } from "@multica/ui/lib/utils"; import type { Message } from "@multica/store"; +import type { ContentBlock } from "@multica/sdk"; + +/** Extract plain text from ContentBlock[] */ +function getTextContent(blocks: ContentBlock[]): string { + return blocks + .filter((b): b is { type: "text"; text: string } => b.type === "text") + .map((b) => b.text) + .join("") +} + +/** Check if content has any toolCall blocks */ +function getToolCalls(blocks: ContentBlock[]) { + return blocks.filter((b): b is Extract => b.type === "toolCall") +} interface MessageListProps { messages: Message[] @@ -12,9 +27,26 @@ interface MessageListProps { export function MessageList({ messages, streamingIds }: MessageListProps) { return ( -
+
{messages.map((msg) => { + // ToolResult messages → render as tool execution item + if (msg.role === "toolResult") { + return + } + + const text = getTextContent(msg.content) + const toolCalls = msg.role === "assistant" ? getToolCalls(msg.content) : [] const isStreaming = streamingIds.has(msg.id) + + // Skip empty assistant messages that only contain toolCalls (no text) + // The toolCalls are visible via the subsequent toolResult entries + if (msg.role === "assistant" && !text && toolCalls.length > 0 && !isStreaming) { + return null + } + + // Skip completely empty messages + if (!text && !isStreaming) return null + return (
{isStreaming ? ( - + ) : ( - {msg.content} + {text} )}