From 3c93ebaf1ccc197b1bb1215f86bead5831fa1480 Mon Sep 17 00:00:00 2001 From: Jiayuan Date: Mon, 30 Mar 2026 22:53:28 +0800 Subject: [PATCH 1/4] feat(agent): stream live agent output to issue detail page When an agent is working on an issue, users can now see real-time output in the issue detail page instead of waiting for completion. Backend: - Add task_message table and migration for persisting agent messages - Add POST /api/daemon/tasks/{id}/messages endpoint for daemon to report structured messages (tool_use, tool_result, text, error) in batches - Add GET /api/daemon/tasks/{id}/messages for catch-up after reconnect - Add GET /api/issues/{id}/active-task to check for running tasks - Broadcast task:message events via WebSocket - Daemon forwards agent session messages with 500ms text throttling Frontend: - Add AgentLiveCard component showing live tool calls, text output, and progress indicators with auto-scroll - Wire into issue detail timeline with WS subscription and HTTP catch-up - Card appears when agent is working, disappears on completion/failure --- .../issues/components/agent-live-card.tsx | 354 ++++++++++++++++++ .../issues/components/issue-detail.tsx | 11 + apps/web/shared/api/client.ts | 9 + apps/web/shared/types/events.ts | 26 ++ server/cmd/server/router.go | 3 + server/internal/daemon/client.go | 16 + server/internal/daemon/daemon.go | 86 ++++- server/internal/handler/daemon.go | 118 ++++++ server/migrations/026_task_messages.down.sql | 1 + server/migrations/026_task_messages.up.sql | 13 + server/pkg/db/generated/agent.sql.go | 42 +++ server/pkg/db/generated/models.go | 12 + server/pkg/db/generated/task_message.sql.go | 140 +++++++ server/pkg/db/queries/agent.sql | 5 + server/pkg/db/queries/task_message.sql | 18 + server/pkg/protocol/events.go | 1 + server/pkg/protocol/messages.go | 12 + 17 files changed, 866 insertions(+), 1 deletion(-) create mode 100644 apps/web/features/issues/components/agent-live-card.tsx create mode 100644 server/migrations/026_task_messages.down.sql create mode 100644 server/migrations/026_task_messages.up.sql create mode 100644 server/pkg/db/generated/task_message.sql.go create mode 100644 server/pkg/db/queries/task_message.sql diff --git a/apps/web/features/issues/components/agent-live-card.tsx b/apps/web/features/issues/components/agent-live-card.tsx new file mode 100644 index 00000000..71424fa8 --- /dev/null +++ b/apps/web/features/issues/components/agent-live-card.tsx @@ -0,0 +1,354 @@ +"use client"; + +import { useState, useEffect, useCallback, useRef } from "react"; +import { Bot, ChevronRight, Loader2, Terminal, FileText, AlertCircle, ArrowDown } from "lucide-react"; +import { api } from "@/shared/api"; +import { useWSEvent } from "@/features/realtime"; +import type { TaskMessagePayload, TaskCompletedPayload, TaskFailedPayload } from "@/shared/types/events"; +import type { AgentTask } from "@/shared/types/agent"; +import { cn } from "@/lib/utils"; +import { Collapsible, CollapsibleContent, CollapsibleTrigger } from "@/components/ui/collapsible"; + +interface AgentLiveCardProps { + issueId: string; + assigneeType: string | null; + assigneeId: string | null; + agentName?: string; +} + +// Icons for common tool names +function ToolIcon({ tool }: { tool: string }) { + const name = tool.toLowerCase(); + if (name.includes("bash") || name.includes("shell") || name.includes("terminal")) { + return ; + } + if (name.includes("read") || name.includes("write") || name.includes("edit") || name.includes("glob") || name.includes("grep")) { + return ; + } + return ; +} + +function formatElapsed(startedAt: string): string { + const elapsed = Date.now() - new Date(startedAt).getTime(); + const seconds = Math.floor(elapsed / 1000); + if (seconds < 60) return `${seconds}s`; + const minutes = Math.floor(seconds / 60); + const secs = seconds % 60; + return `${minutes}m ${secs}s`; +} + +interface ToolCallEntry { + seq: number; + tool: string; + input?: Record; + output?: string; +} + +export function AgentLiveCard({ issueId, assigneeType, assigneeId, agentName }: AgentLiveCardProps) { + const [activeTask, setActiveTask] = useState(null); + const [messages, setMessages] = useState([]); + const [toolCalls, setToolCalls] = useState([]); + const [currentText, setCurrentText] = useState(""); + const [elapsed, setElapsed] = useState(""); + const [autoScroll, setAutoScroll] = useState(true); + const scrollRef = useRef(null); + const contentRef = useRef(null); + + // Check for active task on mount + useEffect(() => { + if (assigneeType !== "agent" || !assigneeId) { + setActiveTask(null); + return; + } + + let cancelled = false; + api.getActiveTaskForIssue(issueId).then(({ task }) => { + if (!cancelled) { + setActiveTask(task); + // If there's an active task, fetch existing messages for catch-up + if (task) { + api.listTaskMessages(task.id).then((msgs) => { + if (!cancelled) { + applyMessages(msgs); + } + }).catch(() => {}); + } + } + }).catch(() => {}); + + return () => { cancelled = true; }; + }, [issueId, assigneeType, assigneeId]); + + // Process messages into tool calls and text + const applyMessages = useCallback((msgs: TaskMessagePayload[]) => { + const newToolCalls: ToolCallEntry[] = []; + let text = ""; + + for (const msg of msgs) { + switch (msg.type) { + case "tool_use": + newToolCalls.push({ seq: msg.seq, tool: msg.tool ?? "", input: msg.input }); + break; + case "tool_result": + // Attach output to matching tool call + for (let i = newToolCalls.length - 1; i >= 0; i--) { + const tc = newToolCalls[i]; + if (tc && tc.tool === msg.tool && !tc.output) { + tc.output = msg.output; + break; + } + } + break; + case "text": + text += msg.content ?? ""; + break; + case "error": + text += `\n[Error] ${msg.content ?? ""}\n`; + break; + } + } + + setToolCalls(newToolCalls); + setCurrentText(text); + setMessages(msgs); + }, []); + + // Handle real-time task messages + useWSEvent( + "task:message", + useCallback((payload: unknown) => { + const msg = payload as TaskMessagePayload; + if (msg.issue_id !== issueId) return; + + setMessages((prev) => { + if (prev.some((m) => m.seq === msg.seq && m.task_id === msg.task_id)) return prev; + return [...prev, msg]; + }); + + switch (msg.type) { + case "tool_use": + setToolCalls((prev) => [ + ...prev, + { seq: msg.seq, tool: msg.tool ?? "", input: msg.input }, + ]); + break; + case "tool_result": + setToolCalls((prev) => { + const updated = [...prev]; + for (let i = updated.length - 1; i >= 0; i--) { + const tc = updated[i]; + if (tc && tc.tool === msg.tool && !tc.output) { + updated[i] = { ...tc, output: msg.output }; + break; + } + } + return updated; + }); + break; + case "text": + setCurrentText((prev) => prev + (msg.content ?? "")); + break; + case "error": + setCurrentText((prev) => prev + `\n[Error] ${msg.content ?? ""}\n`); + break; + } + }, [issueId]), + ); + + // Handle task completion - hide the live card + useWSEvent( + "task:completed", + useCallback((payload: unknown) => { + const p = payload as TaskCompletedPayload; + if (p.issue_id !== issueId) return; + setActiveTask(null); + setMessages([]); + setToolCalls([]); + setCurrentText(""); + }, [issueId]), + ); + + useWSEvent( + "task:failed", + useCallback((payload: unknown) => { + const p = payload as TaskFailedPayload; + if (p.issue_id !== issueId) return; + setActiveTask(null); + setMessages([]); + setToolCalls([]); + setCurrentText(""); + }, [issueId]), + ); + + // Also pick up new tasks starting (task:dispatch) + useWSEvent( + "task:dispatch", + useCallback((payload: unknown) => { + const p = payload as { task_id: string; issue_id?: string }; + // We don't have issue_id in dispatch payload, re-fetch + api.getActiveTaskForIssue(issueId).then(({ task }) => { + if (task) { + setActiveTask(task); + setMessages([]); + setToolCalls([]); + setCurrentText(""); + } + }).catch(() => {}); + }, [issueId]), + ); + + // Update elapsed time + useEffect(() => { + if (!activeTask?.started_at && !activeTask?.dispatched_at) return; + const ref = activeTask.started_at ?? activeTask.dispatched_at!; + setElapsed(formatElapsed(ref)); + const interval = setInterval(() => { + setElapsed(formatElapsed(ref)); + }, 1000); + return () => clearInterval(interval); + }, [activeTask?.started_at, activeTask?.dispatched_at]); + + // Auto-scroll + useEffect(() => { + if (autoScroll && scrollRef.current) { + scrollRef.current.scrollTop = scrollRef.current.scrollHeight; + } + }, [toolCalls, currentText, autoScroll]); + + const handleScroll = useCallback(() => { + if (!scrollRef.current) return; + const { scrollTop, scrollHeight, clientHeight } = scrollRef.current; + setAutoScroll(scrollHeight - scrollTop - clientHeight < 40); + }, []); + + if (!activeTask) return null; + + const lastTextLines = currentText.trim().split("\n").filter(Boolean); + const lastLine = lastTextLines[lastTextLines.length - 1] ?? ""; + + return ( +
+ {/* Header */} +
+
+ +
+
+ + {agentName ?? "Agent"} is working +
+ {elapsed} + {toolCalls.length > 0 && ( + + {toolCalls.length} tool {toolCalls.length === 1 ? "call" : "calls"} + + )} +
+ + {/* Content */} + {(toolCalls.length > 0 || currentText) && ( +
+
+ {toolCalls.map((tc, idx) => ( + + ))} + + {/* Current thinking text (last line only) */} + {lastLine && ( +
+ + {lastLine} +
+ )} +
+ + {/* Scroll to bottom button */} + {!autoScroll && ( + + )} +
+ )} +
+ ); +} + +function ToolCallRow({ entry }: { entry: ToolCallEntry }) { + const [open, setOpen] = useState(false); + + // Extract a short summary from tool input + const summary = getToolSummary(entry); + const hasDetails = entry.output || (entry.input && Object.keys(entry.input).length > 0); + + return ( + + + + + {entry.tool} + {summary && {summary}} + {entry.output !== undefined && ( + + )} + {entry.output === undefined && ( + + )} + + + {entry.output && ( +
+            {entry.output.length > 2000 ? entry.output.slice(0, 2000) + "\n..." : entry.output}
+          
+ )} +
+
+ ); +} + +function getToolSummary(entry: ToolCallEntry): string { + if (!entry.input) return ""; + const { file_path, path, pattern, command, description } = entry.input as Record; + + // Shorten file paths + if (file_path) return shortenPath(file_path); + if (path) return shortenPath(path); + if (pattern) return pattern; + if (description) return description; + if (command) { + const cmd = String(command); + return cmd.length > 80 ? cmd.slice(0, 80) + "..." : cmd; + } + return ""; +} + +function shortenPath(p: string): string { + const parts = p.split("/"); + if (parts.length <= 3) return p; + return ".../" + parts.slice(-2).join("/"); +} diff --git a/apps/web/features/issues/components/issue-detail.tsx b/apps/web/features/issues/components/issue-detail.tsx index 987d4f77..de516bc4 100644 --- a/apps/web/features/issues/components/issue-detail.tsx +++ b/apps/web/features/issues/components/issue-detail.tsx @@ -60,6 +60,7 @@ import { ALL_STATUSES, STATUS_CONFIG, PRIORITY_ORDER, PRIORITY_CONFIG } from "@/ import { StatusIcon, PriorityIcon, DueDatePicker } from "@/features/issues/components"; import { CommentCard } from "./comment-card"; import { CommentInput } from "./comment-input"; +import { AgentLiveCard } from "./agent-live-card"; import { api } from "@/shared/api"; import { useAuthStore } from "@/features/auth"; import { useWorkspaceStore, useActorName } from "@/features/workspace"; @@ -856,6 +857,16 @@ export function IssueDetail({ issueId, onDelete, defaultSidebarOpen = true, layo + {/* Agent live output */} +
+ +
+ {/* Timeline entries */}
{(() => { diff --git a/apps/web/shared/api/client.ts b/apps/web/shared/api/client.ts index 5516621b..89e2f107 100644 --- a/apps/web/shared/api/client.ts +++ b/apps/web/shared/api/client.ts @@ -32,6 +32,7 @@ import type { RuntimeHourlyActivity, RuntimePing, TimelineEntry, + TaskMessagePayload, } from "@/shared/types"; import { type Logger, noopLogger } from "@/shared/logger"; @@ -309,6 +310,14 @@ export class ApiClient { return this.fetch(`/api/agents/${agentId}/tasks`); } + async getActiveTaskForIssue(issueId: string): Promise<{ task: AgentTask | null }> { + return this.fetch(`/api/issues/${issueId}/active-task`); + } + + async listTaskMessages(taskId: string): Promise { + return this.fetch(`/api/daemon/tasks/${taskId}/messages`); + } + async getDaemonPairingSession(token: string): Promise { return this.fetch(`/api/daemon/pairing-sessions/${token}`); } diff --git a/apps/web/shared/types/events.ts b/apps/web/shared/types/events.ts index 15991d07..d5eab3bf 100644 --- a/apps/web/shared/types/events.ts +++ b/apps/web/shared/types/events.ts @@ -20,6 +20,7 @@ export type WSEventType = | "task:progress" | "task:completed" | "task:failed" + | "task:message" | "inbox:new" | "inbox:read" | "inbox:archived" @@ -147,3 +148,28 @@ export interface ActivityCreatedPayload { issue_id: string; entry: TimelineEntry; } + +export interface TaskMessagePayload { + task_id: string; + issue_id: string; + seq: number; + type: "text" | "tool_use" | "tool_result" | "error"; + tool?: string; + content?: string; + input?: Record; + output?: string; +} + +export interface TaskCompletedPayload { + task_id: string; + agent_id: string; + issue_id: string; + status: string; +} + +export interface TaskFailedPayload { + task_id: string; + agent_id: string; + issue_id: string; + status: string; +} diff --git a/server/cmd/server/router.go b/server/cmd/server/router.go index 8797a35b..ffa0c714 100644 --- a/server/cmd/server/router.go +++ b/server/cmd/server/router.go @@ -99,6 +99,8 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route r.Post("/tasks/{taskId}/progress", h.ReportTaskProgress) r.Post("/tasks/{taskId}/complete", h.CompleteTask) r.Post("/tasks/{taskId}/fail", h.FailTask) + r.Post("/tasks/{taskId}/messages", h.ReportTaskMessages) + r.Get("/tasks/{taskId}/messages", h.ListTaskMessages) }) // Protected API routes @@ -164,6 +166,7 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route r.Get("/subscribers", h.ListIssueSubscribers) r.Post("/subscribe", h.SubscribeToIssue) r.Post("/unsubscribe", h.UnsubscribeFromIssue) + r.Get("/active-task", h.GetActiveTaskForIssue) }) }) diff --git a/server/internal/daemon/client.go b/server/internal/daemon/client.go index 1c34afb2..c3bdae90 100644 --- a/server/internal/daemon/client.go +++ b/server/internal/daemon/client.go @@ -83,6 +83,22 @@ func (c *Client) ReportProgress(ctx context.Context, taskID, summary string, ste }, nil) } +// TaskMessageData represents a single agent execution message for batch reporting. +type TaskMessageData struct { + Seq int `json:"seq"` + Type string `json:"type"` + Tool string `json:"tool,omitempty"` + Content string `json:"content,omitempty"` + Input map[string]any `json:"input,omitempty"` + Output string `json:"output,omitempty"` +} + +func (c *Client) ReportTaskMessages(ctx context.Context, taskID string, messages []TaskMessageData) error { + return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/messages", taskID), map[string]any{ + "messages": messages, + }, nil) +} + func (c *Client) CompleteTask(ctx context.Context, taskID, output, branchName, sessionID, workDir string) error { body := map[string]any{"output": output} if branchName != "" { diff --git a/server/internal/daemon/daemon.go b/server/internal/daemon/daemon.go index 6ee0e0f6..158370a4 100644 --- a/server/internal/daemon/daemon.go +++ b/server/internal/daemon/daemon.go @@ -821,22 +821,106 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, taskLo return TaskResult{}, err } - // Drain message channel — log tool uses and agent text for visibility. + // Drain message channel — forward to server for live output + log locally. var toolCount atomic.Int32 go func() { + var seq atomic.Int32 + var mu sync.Mutex + var pendingText strings.Builder + var batch []TaskMessageData + + flush := func() { + mu.Lock() + // Flush any accumulated text as a single message. + if pendingText.Len() > 0 { + s := seq.Add(1) + batch = append(batch, TaskMessageData{ + Seq: int(s), + Type: "text", + Content: pendingText.String(), + }) + pendingText.Reset() + } + toSend := batch + batch = nil + mu.Unlock() + + if len(toSend) > 0 { + sendCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + if err := d.client.ReportTaskMessages(sendCtx, task.ID, toSend); err != nil { + taskLog.Debug("failed to report task messages", "error", err) + } + cancel() + } + } + + // Periodically flush accumulated text messages. + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + + done := make(chan struct{}) + go func() { + for { + select { + case <-ticker.C: + flush() + case <-done: + return + } + } + }() + for msg := range session.Messages { switch msg.Type { case agent.MessageToolUse: n := toolCount.Add(1) taskLog.Info(fmt.Sprintf("tool #%d: %s", n, msg.Tool)) + s := seq.Add(1) + mu.Lock() + batch = append(batch, TaskMessageData{ + Seq: int(s), + Type: "tool_use", + Tool: msg.Tool, + Input: msg.Input, + }) + mu.Unlock() + case agent.MessageToolResult: + s := seq.Add(1) + // Truncate large tool results for the live feed. + output := msg.Output + if len(output) > 8192 { + output = output[:8192] + } + mu.Lock() + batch = append(batch, TaskMessageData{ + Seq: int(s), + Type: "tool_result", + Tool: msg.Tool, + Output: output, + }) + mu.Unlock() case agent.MessageText: if msg.Content != "" { taskLog.Debug("agent", "text", truncateLog(msg.Content, 200)) + mu.Lock() + pendingText.WriteString(msg.Content) + mu.Unlock() } case agent.MessageError: taskLog.Error("agent error", "content", msg.Content) + s := seq.Add(1) + mu.Lock() + batch = append(batch, TaskMessageData{ + Seq: int(s), + Type: "error", + Content: msg.Content, + }) + mu.Unlock() } } + + close(done) + flush() // Final flush after channel closes. }() result := <-session.Result diff --git a/server/internal/handler/daemon.go b/server/internal/handler/daemon.go index 30eb5d32..fd209ec9 100644 --- a/server/internal/handler/daemon.go +++ b/server/internal/handler/daemon.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/go-chi/chi/v5" + "github.com/jackc/pgx/v5/pgtype" db "github.com/multica-ai/multica/server/pkg/db/generated" "github.com/multica-ai/multica/server/pkg/protocol" ) @@ -385,3 +386,120 @@ func (h *Handler) FailTask(w http.ResponseWriter, r *http.Request) { slog.Info("task failed", "task_id", taskID, "agent_id", uuidToString(task.AgentID), "task_error", req.Error) writeJSON(w, http.StatusOK, taskToResponse(*task)) } + +// --------------------------------------------------------------------------- +// Task Messages (live agent output) +// --------------------------------------------------------------------------- + +type TaskMessageRequest struct { + Seq int `json:"seq"` + Type string `json:"type"` + Tool string `json:"tool,omitempty"` + Content string `json:"content,omitempty"` + Input map[string]any `json:"input,omitempty"` + Output string `json:"output,omitempty"` +} + +type TaskMessageBatchRequest struct { + Messages []TaskMessageRequest `json:"messages"` +} + +// ReportTaskMessages receives a batch of agent execution messages from the daemon. +func (h *Handler) ReportTaskMessages(w http.ResponseWriter, r *http.Request) { + taskID := chi.URLParam(r, "taskId") + + var req TaskMessageBatchRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid request body") + return + } + if len(req.Messages) == 0 { + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) + return + } + + task, err := h.Queries.GetAgentTask(r.Context(), parseUUID(taskID)) + if err != nil { + writeError(w, http.StatusNotFound, "task not found") + return + } + + workspaceID := "" + if issue, err := h.Queries.GetIssue(r.Context(), task.IssueID); err == nil { + workspaceID = uuidToString(issue.WorkspaceID) + } + + for _, msg := range req.Messages { + var inputJSON []byte + if msg.Input != nil { + inputJSON, _ = json.Marshal(msg.Input) + } + h.Queries.CreateTaskMessage(r.Context(), db.CreateTaskMessageParams{ + TaskID: parseUUID(taskID), + Seq: int32(msg.Seq), + Type: msg.Type, + Tool: pgtype.Text{String: msg.Tool, Valid: msg.Tool != ""}, + Content: pgtype.Text{String: msg.Content, Valid: msg.Content != ""}, + Input: inputJSON, + Output: pgtype.Text{String: msg.Output, Valid: msg.Output != ""}, + }) + + if workspaceID != "" { + h.publish(protocol.EventTaskMessage, workspaceID, "system", "", protocol.TaskMessagePayload{ + TaskID: taskID, + IssueID: uuidToString(task.IssueID), + Seq: msg.Seq, + Type: msg.Type, + Tool: msg.Tool, + Content: msg.Content, + Input: msg.Input, + Output: msg.Output, + }) + } + } + + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) +} + +// ListTaskMessages returns the persisted messages for a task (for catch-up after reconnect). +func (h *Handler) ListTaskMessages(w http.ResponseWriter, r *http.Request) { + taskID := chi.URLParam(r, "taskId") + + messages, err := h.Queries.ListTaskMessages(r.Context(), parseUUID(taskID)) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to list task messages") + return + } + + resp := make([]protocol.TaskMessagePayload, len(messages)) + for i, m := range messages { + var input map[string]any + if m.Input != nil { + json.Unmarshal(m.Input, &input) + } + resp[i] = protocol.TaskMessagePayload{ + TaskID: taskID, + Seq: int(m.Seq), + Type: m.Type, + Tool: m.Tool.String, + Content: m.Content.String, + Input: input, + Output: m.Output.String, + } + } + + writeJSON(w, http.StatusOK, resp) +} + +// GetActiveTaskForIssue returns the currently running task for an issue, if any. +func (h *Handler) GetActiveTaskForIssue(w http.ResponseWriter, r *http.Request) { + issueID := chi.URLParam(r, "id") + + tasks, err := h.Queries.ListActiveTasksByIssue(r.Context(), parseUUID(issueID)) + if err != nil || len(tasks) == 0 { + writeJSON(w, http.StatusOK, map[string]any{"task": nil}) + return + } + + writeJSON(w, http.StatusOK, map[string]any{"task": taskToResponse(tasks[0])}) +} diff --git a/server/migrations/026_task_messages.down.sql b/server/migrations/026_task_messages.down.sql new file mode 100644 index 00000000..a4161d48 --- /dev/null +++ b/server/migrations/026_task_messages.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS task_message; diff --git a/server/migrations/026_task_messages.up.sql b/server/migrations/026_task_messages.up.sql new file mode 100644 index 00000000..db0163d6 --- /dev/null +++ b/server/migrations/026_task_messages.up.sql @@ -0,0 +1,13 @@ +CREATE TABLE task_message ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + task_id UUID NOT NULL REFERENCES agent_task_queue(id) ON DELETE CASCADE, + seq INTEGER NOT NULL, + type TEXT NOT NULL, + tool TEXT, + content TEXT, + input JSONB, + output TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX idx_task_message_task_id_seq ON task_message(task_id, seq); diff --git a/server/pkg/db/generated/agent.sql.go b/server/pkg/db/generated/agent.sql.go index 569718dc..3657c088 100644 --- a/server/pkg/db/generated/agent.sql.go +++ b/server/pkg/db/generated/agent.sql.go @@ -451,6 +451,48 @@ func (q *Queries) HasPendingTaskForIssue(ctx context.Context, issueID pgtype.UUI return has_pending, err } +const listActiveTasksByIssue = `-- name: ListActiveTasksByIssue :many +SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir FROM agent_task_queue +WHERE issue_id = $1 AND status IN ('dispatched', 'running') +ORDER BY created_at DESC +` + +func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]AgentTaskQueue, error) { + rows, err := q.db.Query(ctx, listActiveTasksByIssue, issueID) + if err != nil { + return nil, err + } + defer rows.Close() + items := []AgentTaskQueue{} + for rows.Next() { + var i AgentTaskQueue + if err := rows.Scan( + &i.ID, + &i.AgentID, + &i.IssueID, + &i.Status, + &i.Priority, + &i.DispatchedAt, + &i.StartedAt, + &i.CompletedAt, + &i.Result, + &i.Error, + &i.CreatedAt, + &i.Context, + &i.RuntimeID, + &i.SessionID, + &i.WorkDir, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const listAgentTasks = `-- name: ListAgentTasks :many SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir FROM agent_task_queue WHERE agent_id = $1 diff --git a/server/pkg/db/generated/models.go b/server/pkg/db/generated/models.go index c91bfe1d..2b27c9d3 100644 --- a/server/pkg/db/generated/models.go +++ b/server/pkg/db/generated/models.go @@ -241,6 +241,18 @@ type SkillFile struct { UpdatedAt pgtype.Timestamptz `json:"updated_at"` } +type TaskMessage struct { + ID pgtype.UUID `json:"id"` + TaskID pgtype.UUID `json:"task_id"` + Seq int32 `json:"seq"` + Type string `json:"type"` + Tool pgtype.Text `json:"tool"` + Content pgtype.Text `json:"content"` + Input []byte `json:"input"` + Output pgtype.Text `json:"output"` + CreatedAt pgtype.Timestamptz `json:"created_at"` +} + type User struct { ID pgtype.UUID `json:"id"` Name string `json:"name"` diff --git a/server/pkg/db/generated/task_message.sql.go b/server/pkg/db/generated/task_message.sql.go new file mode 100644 index 00000000..c1d09327 --- /dev/null +++ b/server/pkg/db/generated/task_message.sql.go @@ -0,0 +1,140 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.30.0 +// source: task_message.sql + +package db + +import ( + "context" + + "github.com/jackc/pgx/v5/pgtype" +) + +const createTaskMessage = `-- name: CreateTaskMessage :one +INSERT INTO task_message (task_id, seq, type, tool, content, input, output) +VALUES ($1, $2, $3, $4, $5, $6, $7) +RETURNING id, task_id, seq, type, tool, content, input, output, created_at +` + +type CreateTaskMessageParams struct { + TaskID pgtype.UUID `json:"task_id"` + Seq int32 `json:"seq"` + Type string `json:"type"` + Tool pgtype.Text `json:"tool"` + Content pgtype.Text `json:"content"` + Input []byte `json:"input"` + Output pgtype.Text `json:"output"` +} + +func (q *Queries) CreateTaskMessage(ctx context.Context, arg CreateTaskMessageParams) (TaskMessage, error) { + row := q.db.QueryRow(ctx, createTaskMessage, + arg.TaskID, + arg.Seq, + arg.Type, + arg.Tool, + arg.Content, + arg.Input, + arg.Output, + ) + var i TaskMessage + err := row.Scan( + &i.ID, + &i.TaskID, + &i.Seq, + &i.Type, + &i.Tool, + &i.Content, + &i.Input, + &i.Output, + &i.CreatedAt, + ) + return i, err +} + +const deleteTaskMessages = `-- name: DeleteTaskMessages :exec +DELETE FROM task_message +WHERE task_id = $1 +` + +func (q *Queries) DeleteTaskMessages(ctx context.Context, taskID pgtype.UUID) error { + _, err := q.db.Exec(ctx, deleteTaskMessages, taskID) + return err +} + +const listTaskMessages = `-- name: ListTaskMessages :many +SELECT id, task_id, seq, type, tool, content, input, output, created_at FROM task_message +WHERE task_id = $1 +ORDER BY seq ASC +` + +func (q *Queries) ListTaskMessages(ctx context.Context, taskID pgtype.UUID) ([]TaskMessage, error) { + rows, err := q.db.Query(ctx, listTaskMessages, taskID) + if err != nil { + return nil, err + } + defer rows.Close() + items := []TaskMessage{} + for rows.Next() { + var i TaskMessage + if err := rows.Scan( + &i.ID, + &i.TaskID, + &i.Seq, + &i.Type, + &i.Tool, + &i.Content, + &i.Input, + &i.Output, + &i.CreatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listTaskMessagesSince = `-- name: ListTaskMessagesSince :many +SELECT id, task_id, seq, type, tool, content, input, output, created_at FROM task_message +WHERE task_id = $1 AND seq > $2 +ORDER BY seq ASC +` + +type ListTaskMessagesSinceParams struct { + TaskID pgtype.UUID `json:"task_id"` + Seq int32 `json:"seq"` +} + +func (q *Queries) ListTaskMessagesSince(ctx context.Context, arg ListTaskMessagesSinceParams) ([]TaskMessage, error) { + rows, err := q.db.Query(ctx, listTaskMessagesSince, arg.TaskID, arg.Seq) + if err != nil { + return nil, err + } + defer rows.Close() + items := []TaskMessage{} + for rows.Next() { + var i TaskMessage + if err := rows.Scan( + &i.ID, + &i.TaskID, + &i.Seq, + &i.Type, + &i.Tool, + &i.Content, + &i.Input, + &i.Output, + &i.CreatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} diff --git a/server/pkg/db/queries/agent.sql b/server/pkg/db/queries/agent.sql index 27816de2..dce917ba 100644 --- a/server/pkg/db/queries/agent.sql +++ b/server/pkg/db/queries/agent.sql @@ -129,6 +129,11 @@ SELECT * FROM agent_task_queue WHERE runtime_id = $1 AND status IN ('queued', 'dispatched') ORDER BY priority DESC, created_at ASC; +-- name: ListActiveTasksByIssue :many +SELECT * FROM agent_task_queue +WHERE issue_id = $1 AND status IN ('dispatched', 'running') +ORDER BY created_at DESC; + -- name: UpdateAgentStatus :one UPDATE agent SET status = $2, updated_at = now() WHERE id = $1 diff --git a/server/pkg/db/queries/task_message.sql b/server/pkg/db/queries/task_message.sql new file mode 100644 index 00000000..3bbd3ee4 --- /dev/null +++ b/server/pkg/db/queries/task_message.sql @@ -0,0 +1,18 @@ +-- name: CreateTaskMessage :one +INSERT INTO task_message (task_id, seq, type, tool, content, input, output) +VALUES ($1, $2, $3, $4, $5, $6, $7) +RETURNING *; + +-- name: ListTaskMessages :many +SELECT * FROM task_message +WHERE task_id = $1 +ORDER BY seq ASC; + +-- name: ListTaskMessagesSince :many +SELECT * FROM task_message +WHERE task_id = $1 AND seq > $2 +ORDER BY seq ASC; + +-- name: DeleteTaskMessages :exec +DELETE FROM task_message +WHERE task_id = $1; diff --git a/server/pkg/protocol/events.go b/server/pkg/protocol/events.go index 3edd5bdb..25a5f602 100644 --- a/server/pkg/protocol/events.go +++ b/server/pkg/protocol/events.go @@ -22,6 +22,7 @@ const ( EventTaskProgress = "task:progress" EventTaskCompleted = "task:completed" EventTaskFailed = "task:failed" + EventTaskMessage = "task:message" // Inbox events EventInboxNew = "inbox:new" diff --git a/server/pkg/protocol/messages.go b/server/pkg/protocol/messages.go index ce79ffa6..3733a397 100644 --- a/server/pkg/protocol/messages.go +++ b/server/pkg/protocol/messages.go @@ -31,6 +31,18 @@ type TaskCompletedPayload struct { Output string `json:"output,omitempty"` } +// TaskMessagePayload represents a single agent execution message (tool call, text, etc.) +type TaskMessagePayload struct { + TaskID string `json:"task_id"` + IssueID string `json:"issue_id,omitempty"` + Seq int `json:"seq"` + Type string `json:"type"` // "text", "tool_use", "tool_result", "error" + Tool string `json:"tool,omitempty"` // tool name for tool_use/tool_result + Content string `json:"content,omitempty"` // text content + Input map[string]any `json:"input,omitempty"` // tool input (tool_use only) + Output string `json:"output,omitempty"` // tool output (tool_result only) +} + // DaemonRegisterPayload is sent from daemon to server on connection. type DaemonRegisterPayload struct { DaemonID string `json:"daemon_id"` From 1e2052c689b170ed91e6b9b6849798dd81d210a1 Mon Sep 17 00:00:00 2001 From: Jiayuan Date: Mon, 30 Mar 2026 23:10:54 +0800 Subject: [PATCH 2/4] feat(agent): improve live output UI and add execution history MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix duplicate icons in tool call rows (use chevron only for expand/collapse) - Show detailed tool information (WebSearch queries, Agent prompts, Skill names) - Add thinking/reasoning rows with Brain icon and expandable content - Show tool results as separate chronological entries with previews - Add TaskRunHistory component for viewing past agent execution logs - Add listTasksByIssue API endpoint and task-runs route - Support thinking content blocks in agent SDK (MessageThinking type) - Improve callID→toolName mapping in daemon message forwarding --- .../issues/components/agent-live-card.tsx | 526 +++++++++++------- .../issues/components/issue-detail.tsx | 7 +- apps/web/shared/api/client.ts | 4 + apps/web/shared/types/events.ts | 2 +- server/cmd/server/router.go | 1 + server/internal/daemon/daemon.go | 41 +- server/internal/handler/daemon.go | 18 + server/pkg/agent/agent.go | 1 + server/pkg/agent/claude.go | 4 + server/pkg/db/generated/agent.sql.go | 42 ++ server/pkg/db/queries/agent.sql | 5 + 11 files changed, 456 insertions(+), 195 deletions(-) diff --git a/apps/web/features/issues/components/agent-live-card.tsx b/apps/web/features/issues/components/agent-live-card.tsx index 71424fa8..f47fa34d 100644 --- a/apps/web/features/issues/components/agent-live-card.tsx +++ b/apps/web/features/issues/components/agent-live-card.tsx @@ -1,7 +1,7 @@ "use client"; import { useState, useEffect, useCallback, useRef } from "react"; -import { Bot, ChevronRight, Loader2, Terminal, FileText, AlertCircle, ArrowDown } from "lucide-react"; +import { Bot, ChevronRight, Loader2, ArrowDown, Brain, AlertCircle, Clock, CheckCircle2, XCircle } from "lucide-react"; import { api } from "@/shared/api"; import { useWSEvent } from "@/features/realtime"; import type { TaskMessagePayload, TaskCompletedPayload, TaskFailedPayload } from "@/shared/types/events"; @@ -9,23 +9,16 @@ import type { AgentTask } from "@/shared/types/agent"; import { cn } from "@/lib/utils"; import { Collapsible, CollapsibleContent, CollapsibleTrigger } from "@/components/ui/collapsible"; -interface AgentLiveCardProps { - issueId: string; - assigneeType: string | null; - assigneeId: string | null; - agentName?: string; -} +// ─── Shared types & helpers ───────────────────────────────────────────────── -// Icons for common tool names -function ToolIcon({ tool }: { tool: string }) { - const name = tool.toLowerCase(); - if (name.includes("bash") || name.includes("shell") || name.includes("terminal")) { - return ; - } - if (name.includes("read") || name.includes("write") || name.includes("edit") || name.includes("glob") || name.includes("grep")) { - return ; - } - return ; +/** A unified timeline entry: tool calls, thinking, text, and errors in chronological order. */ +interface TimelineItem { + seq: number; + type: "tool_use" | "tool_result" | "thinking" | "text" | "error"; + tool?: string; + content?: string; + input?: Record; + output?: string; } function formatElapsed(startedAt: string): string { @@ -37,22 +30,83 @@ function formatElapsed(startedAt: string): string { return `${minutes}m ${secs}s`; } -interface ToolCallEntry { - seq: number; - tool: string; - input?: Record; - output?: string; +function formatDuration(start: string, end: string): string { + const ms = new Date(end).getTime() - new Date(start).getTime(); + const seconds = Math.floor(ms / 1000); + if (seconds < 60) return `${seconds}s`; + const minutes = Math.floor(seconds / 60); + const secs = seconds % 60; + return `${minutes}m ${secs}s`; +} + +function shortenPath(p: string): string { + const parts = p.split("/"); + if (parts.length <= 3) return p; + return ".../" + parts.slice(-2).join("/"); +} + +function getToolSummary(item: TimelineItem): string { + if (!item.input) return ""; + const inp = item.input as Record; + + // WebSearch / web search + if (inp.query) return inp.query; + // File operations + if (inp.file_path) return shortenPath(inp.file_path); + if (inp.path) return shortenPath(inp.path); + if (inp.pattern) return inp.pattern; + // Bash + if (inp.description) return String(inp.description); + if (inp.command) { + const cmd = String(inp.command); + return cmd.length > 100 ? cmd.slice(0, 100) + "..." : cmd; + } + // Agent + if (inp.prompt) { + const p = String(inp.prompt); + return p.length > 100 ? p.slice(0, 100) + "..." : p; + } + // Skill + if (inp.skill) return String(inp.skill); + // Fallback: show first string value + for (const v of Object.values(inp)) { + if (typeof v === "string" && v.length > 0 && v.length < 120) return v; + } + return ""; +} + +/** Build a chronologically ordered timeline from raw messages. */ +function buildTimeline(msgs: TaskMessagePayload[]): TimelineItem[] { + const items: TimelineItem[] = []; + for (const msg of msgs) { + items.push({ + seq: msg.seq, + type: msg.type, + tool: msg.tool, + content: msg.content, + input: msg.input, + output: msg.output, + }); + } + return items.sort((a, b) => a.seq - b.seq); +} + +// ─── AgentLiveCard (real-time view) ──────────────────────────────────────── + +interface AgentLiveCardProps { + issueId: string; + assigneeType: string | null; + assigneeId: string | null; + agentName?: string; } export function AgentLiveCard({ issueId, assigneeType, assigneeId, agentName }: AgentLiveCardProps) { const [activeTask, setActiveTask] = useState(null); - const [messages, setMessages] = useState([]); - const [toolCalls, setToolCalls] = useState([]); - const [currentText, setCurrentText] = useState(""); + const [items, setItems] = useState([]); const [elapsed, setElapsed] = useState(""); const [autoScroll, setAutoScroll] = useState(true); const scrollRef = useRef(null); - const contentRef = useRef(null); + const seenSeqs = useRef(new Set()); // Check for active task on mount useEffect(() => { @@ -65,11 +119,12 @@ export function AgentLiveCard({ issueId, assigneeType, assigneeId, agentName }: api.getActiveTaskForIssue(issueId).then(({ task }) => { if (!cancelled) { setActiveTask(task); - // If there's an active task, fetch existing messages for catch-up if (task) { api.listTaskMessages(task.id).then((msgs) => { if (!cancelled) { - applyMessages(msgs); + const timeline = buildTimeline(msgs); + setItems(timeline); + for (const m of msgs) seenSeqs.current.add(`${m.task_id}:${m.seq}`); } }).catch(() => {}); } @@ -79,92 +134,41 @@ export function AgentLiveCard({ issueId, assigneeType, assigneeId, agentName }: return () => { cancelled = true; }; }, [issueId, assigneeType, assigneeId]); - // Process messages into tool calls and text - const applyMessages = useCallback((msgs: TaskMessagePayload[]) => { - const newToolCalls: ToolCallEntry[] = []; - let text = ""; - - for (const msg of msgs) { - switch (msg.type) { - case "tool_use": - newToolCalls.push({ seq: msg.seq, tool: msg.tool ?? "", input: msg.input }); - break; - case "tool_result": - // Attach output to matching tool call - for (let i = newToolCalls.length - 1; i >= 0; i--) { - const tc = newToolCalls[i]; - if (tc && tc.tool === msg.tool && !tc.output) { - tc.output = msg.output; - break; - } - } - break; - case "text": - text += msg.content ?? ""; - break; - case "error": - text += `\n[Error] ${msg.content ?? ""}\n`; - break; - } - } - - setToolCalls(newToolCalls); - setCurrentText(text); - setMessages(msgs); - }, []); - // Handle real-time task messages useWSEvent( "task:message", useCallback((payload: unknown) => { const msg = payload as TaskMessagePayload; if (msg.issue_id !== issueId) return; + const key = `${msg.task_id}:${msg.seq}`; + if (seenSeqs.current.has(key)) return; + seenSeqs.current.add(key); - setMessages((prev) => { - if (prev.some((m) => m.seq === msg.seq && m.task_id === msg.task_id)) return prev; - return [...prev, msg]; + setItems((prev) => { + const item: TimelineItem = { + seq: msg.seq, + type: msg.type, + tool: msg.tool, + content: msg.content, + input: msg.input, + output: msg.output, + }; + const next = [...prev, item]; + next.sort((a, b) => a.seq - b.seq); + return next; }); - - switch (msg.type) { - case "tool_use": - setToolCalls((prev) => [ - ...prev, - { seq: msg.seq, tool: msg.tool ?? "", input: msg.input }, - ]); - break; - case "tool_result": - setToolCalls((prev) => { - const updated = [...prev]; - for (let i = updated.length - 1; i >= 0; i--) { - const tc = updated[i]; - if (tc && tc.tool === msg.tool && !tc.output) { - updated[i] = { ...tc, output: msg.output }; - break; - } - } - return updated; - }); - break; - case "text": - setCurrentText((prev) => prev + (msg.content ?? "")); - break; - case "error": - setCurrentText((prev) => prev + `\n[Error] ${msg.content ?? ""}\n`); - break; - } }, [issueId]), ); - // Handle task completion - hide the live card + // Handle task completion/failure useWSEvent( "task:completed", useCallback((payload: unknown) => { const p = payload as TaskCompletedPayload; if (p.issue_id !== issueId) return; setActiveTask(null); - setMessages([]); - setToolCalls([]); - setCurrentText(""); + setItems([]); + seenSeqs.current.clear(); }, [issueId]), ); @@ -174,37 +178,31 @@ export function AgentLiveCard({ issueId, assigneeType, assigneeId, agentName }: const p = payload as TaskFailedPayload; if (p.issue_id !== issueId) return; setActiveTask(null); - setMessages([]); - setToolCalls([]); - setCurrentText(""); + setItems([]); + seenSeqs.current.clear(); }, [issueId]), ); - // Also pick up new tasks starting (task:dispatch) + // Pick up new tasks useWSEvent( "task:dispatch", - useCallback((payload: unknown) => { - const p = payload as { task_id: string; issue_id?: string }; - // We don't have issue_id in dispatch payload, re-fetch + useCallback(() => { api.getActiveTaskForIssue(issueId).then(({ task }) => { if (task) { setActiveTask(task); - setMessages([]); - setToolCalls([]); - setCurrentText(""); + setItems([]); + seenSeqs.current.clear(); } }).catch(() => {}); }, [issueId]), ); - // Update elapsed time + // Elapsed time useEffect(() => { if (!activeTask?.started_at && !activeTask?.dispatched_at) return; const ref = activeTask.started_at ?? activeTask.dispatched_at!; setElapsed(formatElapsed(ref)); - const interval = setInterval(() => { - setElapsed(formatElapsed(ref)); - }, 1000); + const interval = setInterval(() => setElapsed(formatElapsed(ref)), 1000); return () => clearInterval(interval); }, [activeTask?.started_at, activeTask?.dispatched_at]); @@ -213,7 +211,7 @@ export function AgentLiveCard({ issueId, assigneeType, assigneeId, agentName }: if (autoScroll && scrollRef.current) { scrollRef.current.scrollTop = scrollRef.current.scrollHeight; } - }, [toolCalls, currentText, autoScroll]); + }, [items, autoScroll]); const handleScroll = useCallback(() => { if (!scrollRef.current) return; @@ -223,50 +221,38 @@ export function AgentLiveCard({ issueId, assigneeType, assigneeId, agentName }: if (!activeTask) return null; - const lastTextLines = currentText.trim().split("\n").filter(Boolean); - const lastLine = lastTextLines[lastTextLines.length - 1] ?? ""; + const toolCount = items.filter((i) => i.type === "tool_use").length; return (
{/* Header */}
-
+
-
- - {agentName ?? "Agent"} is working +
+ + {agentName ?? "Agent"} is working
- {elapsed} - {toolCalls.length > 0 && ( - - {toolCalls.length} tool {toolCalls.length === 1 ? "call" : "calls"} + {elapsed} + {toolCount > 0 && ( + + {toolCount} tool {toolCount === 1 ? "call" : "calls"} )}
- {/* Content */} - {(toolCalls.length > 0 || currentText) && ( + {/* Timeline content */} + {items.length > 0 && (
-
- {toolCalls.map((tc, idx) => ( - - ))} + {items.map((item, idx) => ( + + ))} - {/* Current thinking text (last line only) */} - {lastLine && ( -
- - {lastLine} -
- )} -
- - {/* Scroll to bottom button */} {!autoScroll && (
+ {/* Agent execution history */} +
+ +
+ {/* Timeline entries */}
{(() => { diff --git a/apps/web/shared/api/client.ts b/apps/web/shared/api/client.ts index 89e2f107..8483a1ad 100644 --- a/apps/web/shared/api/client.ts +++ b/apps/web/shared/api/client.ts @@ -318,6 +318,10 @@ export class ApiClient { return this.fetch(`/api/daemon/tasks/${taskId}/messages`); } + async listTasksByIssue(issueId: string): Promise { + return this.fetch(`/api/issues/${issueId}/task-runs`); + } + async getDaemonPairingSession(token: string): Promise { return this.fetch(`/api/daemon/pairing-sessions/${token}`); } diff --git a/apps/web/shared/types/events.ts b/apps/web/shared/types/events.ts index d5eab3bf..4e22b5e2 100644 --- a/apps/web/shared/types/events.ts +++ b/apps/web/shared/types/events.ts @@ -153,7 +153,7 @@ export interface TaskMessagePayload { task_id: string; issue_id: string; seq: number; - type: "text" | "tool_use" | "tool_result" | "error"; + type: "text" | "thinking" | "tool_use" | "tool_result" | "error"; tool?: string; content?: string; input?: Record; diff --git a/server/cmd/server/router.go b/server/cmd/server/router.go index ffa0c714..be9e7ff4 100644 --- a/server/cmd/server/router.go +++ b/server/cmd/server/router.go @@ -167,6 +167,7 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route r.Post("/subscribe", h.SubscribeToIssue) r.Post("/unsubscribe", h.UnsubscribeFromIssue) r.Get("/active-task", h.GetActiveTaskForIssue) + r.Get("/task-runs", h.ListTasksByIssue) }) }) diff --git a/server/internal/daemon/daemon.go b/server/internal/daemon/daemon.go index 158370a4..d9b66c58 100644 --- a/server/internal/daemon/daemon.go +++ b/server/internal/daemon/daemon.go @@ -827,10 +827,22 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, taskLo var seq atomic.Int32 var mu sync.Mutex var pendingText strings.Builder + var pendingThinking strings.Builder var batch []TaskMessageData + callIDToTool := map[string]string{} // track callID → tool name for tool_result flush := func() { mu.Lock() + // Flush any accumulated thinking as a single message. + if pendingThinking.Len() > 0 { + s := seq.Add(1) + batch = append(batch, TaskMessageData{ + Seq: int(s), + Type: "thinking", + Content: pendingThinking.String(), + }) + pendingThinking.Reset() + } // Flush any accumulated text as a single message. if pendingText.Len() > 0 { s := seq.Add(1) @@ -854,7 +866,7 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, taskLo } } - // Periodically flush accumulated text messages. + // Periodically flush accumulated text/thinking messages. ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() @@ -875,30 +887,47 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, taskLo case agent.MessageToolUse: n := toolCount.Add(1) taskLog.Info(fmt.Sprintf("tool #%d: %s", n, msg.Tool)) + if msg.CallID != "" { + mu.Lock() + callIDToTool[msg.CallID] = msg.Tool + mu.Unlock() + } s := seq.Add(1) mu.Lock() batch = append(batch, TaskMessageData{ - Seq: int(s), - Type: "tool_use", - Tool: msg.Tool, + Seq: int(s), + Type: "tool_use", + Tool: msg.Tool, Input: msg.Input, }) mu.Unlock() case agent.MessageToolResult: s := seq.Add(1) - // Truncate large tool results for the live feed. output := msg.Output if len(output) > 8192 { output = output[:8192] } + // Resolve tool name from callID if not set directly. + toolName := msg.Tool + if toolName == "" && msg.CallID != "" { + mu.Lock() + toolName = callIDToTool[msg.CallID] + mu.Unlock() + } mu.Lock() batch = append(batch, TaskMessageData{ Seq: int(s), Type: "tool_result", - Tool: msg.Tool, + Tool: toolName, Output: output, }) mu.Unlock() + case agent.MessageThinking: + if msg.Content != "" { + mu.Lock() + pendingThinking.WriteString(msg.Content) + mu.Unlock() + } case agent.MessageText: if msg.Content != "" { taskLog.Debug("agent", "text", truncateLog(msg.Content, 200)) diff --git a/server/internal/handler/daemon.go b/server/internal/handler/daemon.go index fd209ec9..b5bf102c 100644 --- a/server/internal/handler/daemon.go +++ b/server/internal/handler/daemon.go @@ -503,3 +503,21 @@ func (h *Handler) GetActiveTaskForIssue(w http.ResponseWriter, r *http.Request) writeJSON(w, http.StatusOK, map[string]any{"task": taskToResponse(tasks[0])}) } + +// ListTasksByIssue returns all tasks (any status) for an issue — used for execution history. +func (h *Handler) ListTasksByIssue(w http.ResponseWriter, r *http.Request) { + issueID := chi.URLParam(r, "id") + + tasks, err := h.Queries.ListTasksByIssue(r.Context(), parseUUID(issueID)) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to list tasks") + return + } + + resp := make([]AgentTaskResponse, len(tasks)) + for i, t := range tasks { + resp[i] = taskToResponse(t) + } + + writeJSON(w, http.StatusOK, resp) +} diff --git a/server/pkg/agent/agent.go b/server/pkg/agent/agent.go index d19887ed..d80a2641 100644 --- a/server/pkg/agent/agent.go +++ b/server/pkg/agent/agent.go @@ -42,6 +42,7 @@ type MessageType string const ( MessageText MessageType = "text" + MessageThinking MessageType = "thinking" MessageToolUse MessageType = "tool-use" MessageToolResult MessageType = "tool-result" MessageStatus MessageType = "status" diff --git a/server/pkg/agent/claude.go b/server/pkg/agent/claude.go index 610df39c..c1b78406 100644 --- a/server/pkg/agent/claude.go +++ b/server/pkg/agent/claude.go @@ -181,6 +181,10 @@ func (b *claudeBackend) handleAssistant(msg claudeSDKMessage, ch chan<- Message, output.WriteString(block.Text) trySend(ch, Message{Type: MessageText, Content: block.Text}) } + case "thinking": + if block.Text != "" { + trySend(ch, Message{Type: MessageThinking, Content: block.Text}) + } case "tool_use": var input map[string]any if block.Input != nil { diff --git a/server/pkg/db/generated/agent.sql.go b/server/pkg/db/generated/agent.sql.go index 3657c088..1de2793f 100644 --- a/server/pkg/db/generated/agent.sql.go +++ b/server/pkg/db/generated/agent.sql.go @@ -621,6 +621,48 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp return items, nil } +const listTasksByIssue = `-- name: ListTasksByIssue :many +SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir FROM agent_task_queue +WHERE issue_id = $1 +ORDER BY created_at DESC +` + +func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]AgentTaskQueue, error) { + rows, err := q.db.Query(ctx, listTasksByIssue, issueID) + if err != nil { + return nil, err + } + defer rows.Close() + items := []AgentTaskQueue{} + for rows.Next() { + var i AgentTaskQueue + if err := rows.Scan( + &i.ID, + &i.AgentID, + &i.IssueID, + &i.Status, + &i.Priority, + &i.DispatchedAt, + &i.StartedAt, + &i.CompletedAt, + &i.Result, + &i.Error, + &i.CreatedAt, + &i.Context, + &i.RuntimeID, + &i.SessionID, + &i.WorkDir, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const startAgentTask = `-- name: StartAgentTask :one UPDATE agent_task_queue SET status = 'running', started_at = now() diff --git a/server/pkg/db/queries/agent.sql b/server/pkg/db/queries/agent.sql index dce917ba..4540a7e9 100644 --- a/server/pkg/db/queries/agent.sql +++ b/server/pkg/db/queries/agent.sql @@ -134,6 +134,11 @@ SELECT * FROM agent_task_queue WHERE issue_id = $1 AND status IN ('dispatched', 'running') ORDER BY created_at DESC; +-- name: ListTasksByIssue :many +SELECT * FROM agent_task_queue +WHERE issue_id = $1 +ORDER BY created_at DESC; + -- name: UpdateAgentStatus :one UPDATE agent SET status = $2, updated_at = now() WHERE id = $1 From 83f325c58667498cc10db0d6d6688048d4145d90 Mon Sep 17 00:00:00 2001 From: Jiayuan Date: Mon, 30 Mar 2026 23:27:09 +0800 Subject: [PATCH 3/4] fix(ui): apply mono font directly to code/pre elements Browser UA stylesheet sets font-family: monospace on and
,
overriding the inherited Geist Mono from parent containers. Apply
font-mono explicitly on these elements so they use the project's
monospace font instead of the browser default.
---
 apps/web/components/common/rich-text-editor.css |  1 +
 apps/web/components/markdown/CodeBlock.tsx      | 10 +++++-----
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/apps/web/components/common/rich-text-editor.css b/apps/web/components/common/rich-text-editor.css
index 599a9555..47c3c416 100644
--- a/apps/web/components/common/rich-text-editor.css
+++ b/apps/web/components/common/rich-text-editor.css
@@ -94,6 +94,7 @@
 
 /* Code blocks */
 .rich-text-editor pre {
+  font-family: var(--font-mono, ui-monospace, monospace);
   background: var(--muted);
   border-radius: var(--radius);
   padding: 0.75rem 1rem;
diff --git a/apps/web/components/markdown/CodeBlock.tsx b/apps/web/components/markdown/CodeBlock.tsx
index 6bdd4d2c..ef69afb4 100644
--- a/apps/web/components/markdown/CodeBlock.tsx
+++ b/apps/web/components/markdown/CodeBlock.tsx
@@ -141,7 +141,7 @@ export function CodeBlock({
   if (mode === 'terminal') {
     return (
       
-        {code}
+        {code}
       
) } @@ -151,7 +151,7 @@ export function CodeBlock({ if (isLoading || !highlighted) { return (
-          {code}
+          {code}
         
) } @@ -159,7 +159,7 @@ export function CodeBlock({ return (
{isLoading || !highlighted ? (
-            {code}
+            {code}
           
) : (
)} From a00485cf135a1c08f33fc7539657b6044d916819 Mon Sep 17 00:00:00 2001 From: Jiayuan Date: Mon, 30 Mar 2026 23:38:49 +0800 Subject: [PATCH 4/4] feat(security): redact sensitive information in agent live output Server-side (primary): Apply redact.Text/InputMap on task message content, output, and input fields before DB persistence and WebSocket broadcast. Extended redact package with GitLab tokens, JWTs, connection strings, and PASSWORD/SECRET/TOKEN env var patterns. Frontend (fallback): redactSecrets utility mirrors server patterns, applied in buildTimeline and ToolCallRow render as a safety net. --- .../issues/components/agent-live-card.tsx | 7 +- apps/web/features/issues/utils/redact.test.ts | 88 +++++++++++++++++++ apps/web/features/issues/utils/redact.ts | 37 ++++++++ server/internal/handler/daemon.go | 6 ++ server/pkg/redact/redact.go | 28 +++++- server/pkg/redact/redact_test.go | 77 ++++++++++++++++ 6 files changed, 239 insertions(+), 4 deletions(-) create mode 100644 apps/web/features/issues/utils/redact.test.ts create mode 100644 apps/web/features/issues/utils/redact.ts diff --git a/apps/web/features/issues/components/agent-live-card.tsx b/apps/web/features/issues/components/agent-live-card.tsx index f47fa34d..6d1de879 100644 --- a/apps/web/features/issues/components/agent-live-card.tsx +++ b/apps/web/features/issues/components/agent-live-card.tsx @@ -8,6 +8,7 @@ import type { TaskMessagePayload, TaskCompletedPayload, TaskFailedPayload } from import type { AgentTask } from "@/shared/types/agent"; import { cn } from "@/lib/utils"; import { Collapsible, CollapsibleContent, CollapsibleTrigger } from "@/components/ui/collapsible"; +import { redactSecrets } from "../utils/redact"; // ─── Shared types & helpers ───────────────────────────────────────────────── @@ -83,9 +84,9 @@ function buildTimeline(msgs: TaskMessagePayload[]): TimelineItem[] { seq: msg.seq, type: msg.type, tool: msg.tool, - content: msg.content, + content: msg.content ? redactSecrets(msg.content) : msg.content, input: msg.input, - output: msg.output, + output: msg.output ? redactSecrets(msg.output) : msg.output, }); } return items.sort((a, b) => a.seq - b.seq); @@ -425,7 +426,7 @@ function ToolCallRow({ item }: { item: TimelineItem }) { {hasInput && (
-            {JSON.stringify(item.input, null, 2)}
+            {redactSecrets(JSON.stringify(item.input, null, 2))}
           
)} diff --git a/apps/web/features/issues/utils/redact.test.ts b/apps/web/features/issues/utils/redact.test.ts new file mode 100644 index 00000000..9aee9f03 --- /dev/null +++ b/apps/web/features/issues/utils/redact.test.ts @@ -0,0 +1,88 @@ +import { describe, it, expect } from "vitest"; +import { redactSecrets } from "./redact"; + +describe("redactSecrets", () => { + it("redacts AWS access key", () => { + const result = redactSecrets("key: AKIAIOSFODNN7EXAMPLE"); + expect(result).not.toContain("AKIAIOSFODNN7EXAMPLE"); + expect(result).toContain("[REDACTED AWS KEY]"); + }); + + it("redacts AWS secret key", () => { + const result = redactSecrets("aws_secret_access_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"); + expect(result).not.toContain("wJalrXUtnFEMI"); + }); + + it("redacts PEM private keys", () => { + const input = "-----BEGIN RSA PRIVATE KEY-----\nMIIEow...\n-----END RSA PRIVATE KEY-----"; + const result = redactSecrets(input); + expect(result).not.toContain("MIIEow"); + expect(result).toContain("[REDACTED PRIVATE KEY]"); + }); + + it("redacts GitHub tokens", () => { + const result = redactSecrets("GITHUB_TOKEN=ghp_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmn"); + expect(result).not.toContain("ghp_"); + }); + + it("redacts GitLab tokens", () => { + const result = redactSecrets("glpat-AbCdEfGhIjKlMnOpQrStUvWx"); + expect(result).not.toContain("glpat-"); + expect(result).toContain("[REDACTED GITLAB TOKEN]"); + }); + + it("redacts OpenAI/Anthropic API keys", () => { + const result = redactSecrets("sk-proj-abc123def456ghi789jkl012mno345"); + expect(result).not.toContain("sk-proj"); + expect(result).toContain("[REDACTED API KEY]"); + }); + + it("redacts Slack tokens", () => { + const result = redactSecrets("xoxb-123456789012-1234567890123-AbCdEfGhIjKl"); + expect(result).not.toContain("xoxb-"); + }); + + it("redacts JWT tokens", () => { + const result = redactSecrets("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"); + expect(result).not.toContain("eyJhbGci"); + expect(result).toContain("[REDACTED JWT]"); + }); + + it("redacts Bearer tokens", () => { + const result = redactSecrets("Authorization: Bearer abc123xyz.def456"); + expect(result).toContain("Bearer [REDACTED]"); + expect(result).not.toContain("abc123xyz"); + }); + + it("redacts connection strings", () => { + const result = redactSecrets("postgres://admin:s3cret@db.example.com:5432/mydb"); + expect(result).not.toContain("s3cret"); + }); + + it("redacts generic credential env vars", () => { + for (const key of ["PASSWORD", "SECRET", "TOKEN", "DATABASE_URL", "API_KEY"]) { + const result = redactSecrets(`${key}=supersecretvalue123`); + expect(result).toContain("[REDACTED CREDENTIAL]"); + expect(result).not.toContain("supersecretvalue123"); + } + }); + + it("redacts multiple secrets in one string", () => { + const result = redactSecrets("AKIAIOSFODNN7EXAMPLE and ghp_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmn"); + expect(result).not.toContain("AKIAIOSFODNN7EXAMPLE"); + expect(result).not.toContain("ghp_"); + }); + + it("does not alter normal text", () => { + const inputs = [ + "This is a normal commit message about fixing a bug", + "The function returns skip-navigation as the class name", + "Created PR #42 for the authentication feature", + "Running tests in /tmp/test-workspace/project", + "The API endpoint /api/issues/123 was updated", + ]; + for (const input of inputs) { + expect(redactSecrets(input)).toBe(input); + } + }); +}); diff --git a/apps/web/features/issues/utils/redact.ts b/apps/web/features/issues/utils/redact.ts new file mode 100644 index 00000000..066c8fa6 --- /dev/null +++ b/apps/web/features/issues/utils/redact.ts @@ -0,0 +1,37 @@ +/** + * Client-side fallback for redacting sensitive information in agent output. + * The server performs primary redaction; this is a safety net for display. + */ + +const patterns: { re: RegExp; replacement: string }[] = [ + // AWS access key IDs + { re: /\bAKIA[0-9A-Z]{16}\b/g, replacement: "[REDACTED AWS KEY]" }, + // AWS secret access keys + { re: /(?:aws_secret_access_key|secret_?access_?key)\s*[=:]\s*[A-Za-z0-9/+=]{40}/gi, replacement: "[REDACTED AWS SECRET]" }, + // PEM private keys + { re: /-----BEGIN[A-Z\s]*PRIVATE KEY-----[\s\S]*?-----END[A-Z\s]*PRIVATE KEY-----/g, replacement: "[REDACTED PRIVATE KEY]" }, + // GitHub tokens + { re: /\b(?:ghp|gho|ghu|ghs|ghr)_[A-Za-z0-9_]{36,255}\b/g, replacement: "[REDACTED GITHUB TOKEN]" }, + // GitLab personal access tokens + { re: /\bglpat-[A-Za-z0-9_-]{20,}\b/g, replacement: "[REDACTED GITLAB TOKEN]" }, + // OpenAI / Anthropic API keys + { re: /\bsk-[A-Za-z0-9_-]{20,}\b/g, replacement: "[REDACTED API KEY]" }, + // Slack tokens + { re: /\bxox[bporas]-[A-Za-z0-9-]{10,}\b/g, replacement: "[REDACTED SLACK TOKEN]" }, + // JWT tokens + { re: /\bey[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\b/g, replacement: "[REDACTED JWT]" }, + // Bearer tokens + { re: /\bBearer\s+[A-Za-z0-9\-._~+/]+=*/gi, replacement: "Bearer [REDACTED]" }, + // Connection strings with embedded passwords + { re: /(?:postgres|mysql|mongodb|redis|amqp)(?:ql)?:\/\/[^:\s]+:[^@\s]+@/gi, replacement: "[REDACTED CONNECTION STRING]@" }, + // Generic key=value secret env vars + { re: /(?:API_KEY|API_SECRET|SECRET_KEY|SECRET|ACCESS_TOKEN|AUTH_TOKEN|PRIVATE_KEY|DATABASE_URL|DB_PASSWORD|DB_URL|REDIS_URL|PASSWORD|TOKEN)\s*[=:]\s*\S+/gi, replacement: "[REDACTED CREDENTIAL]" }, +]; + +export function redactSecrets(text: string): string { + let result = text; + for (const { re, replacement } of patterns) { + result = result.replace(re, replacement); + } + return result; +} diff --git a/server/internal/handler/daemon.go b/server/internal/handler/daemon.go index b5bf102c..d91b9376 100644 --- a/server/internal/handler/daemon.go +++ b/server/internal/handler/daemon.go @@ -11,6 +11,7 @@ import ( "github.com/jackc/pgx/v5/pgtype" db "github.com/multica-ai/multica/server/pkg/db/generated" "github.com/multica-ai/multica/server/pkg/protocol" + "github.com/multica-ai/multica/server/pkg/redact" ) // --------------------------------------------------------------------------- @@ -430,6 +431,11 @@ func (h *Handler) ReportTaskMessages(w http.ResponseWriter, r *http.Request) { } for _, msg := range req.Messages { + // Redact sensitive information before persisting or broadcasting. + msg.Content = redact.Text(msg.Content) + msg.Output = redact.Text(msg.Output) + msg.Input = redact.InputMap(msg.Input) + var inputJSON []byte if msg.Input != nil { inputJSON, _ = json.Marshal(msg.Input) diff --git a/server/pkg/redact/redact.go b/server/pkg/redact/redact.go index 878b88d8..6a64ca7b 100644 --- a/server/pkg/redact/redact.go +++ b/server/pkg/redact/redact.go @@ -35,11 +35,37 @@ var patterns = []secretPattern{ // Slack tokens {regexp.MustCompile(`\bxox[bporas]-[A-Za-z0-9\-]{10,}\b`), "[REDACTED SLACK TOKEN]"}, + // GitLab personal access tokens + {regexp.MustCompile(`\bglpat-[A-Za-z0-9_-]{20,}\b`), "[REDACTED GITLAB TOKEN]"}, + + // JWT tokens (three base64url segments) + {regexp.MustCompile(`\bey[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\b`), "[REDACTED JWT]"}, + // Generic "Bearer " in output {regexp.MustCompile(`(?i)\bBearer\s+[A-Za-z0-9\-._~+/]+=*\b`), "Bearer [REDACTED]"}, + // Connection strings with embedded passwords + {regexp.MustCompile(`(?i)(?:postgres|mysql|mongodb|redis|amqp)(?:ql)?://[^:\s]+:[^@\s]+@`), "[REDACTED CONNECTION STRING]@"}, + // Generic key=value patterns for common secret env var names - {regexp.MustCompile(`(?i)(?:API_KEY|API_SECRET|SECRET_KEY|ACCESS_TOKEN|AUTH_TOKEN|PRIVATE_KEY|DATABASE_URL|DB_PASSWORD|REDIS_URL)\s*[=:]\s*\S+`), "[REDACTED CREDENTIAL]"}, + {regexp.MustCompile(`(?i)(?:API_KEY|API_SECRET|SECRET_KEY|SECRET|ACCESS_TOKEN|AUTH_TOKEN|PRIVATE_KEY|DATABASE_URL|DB_PASSWORD|DB_URL|REDIS_URL|PASSWORD|TOKEN)\s*[=:]\s*\S+`), "[REDACTED CREDENTIAL]"}, +} + +// InputMap returns a copy of m with all string values passed through Text. +// Non-string values are preserved as-is. +func InputMap(m map[string]any) map[string]any { + if m == nil { + return nil + } + out := make(map[string]any, len(m)) + for k, v := range m { + if s, ok := v.(string); ok { + out[k] = Text(s) + } else { + out[k] = v + } + } + return out } // homeDir is resolved once at init for path redaction. diff --git a/server/pkg/redact/redact_test.go b/server/pkg/redact/redact_test.go index 13100624..3caab19f 100644 --- a/server/pkg/redact/redact_test.go +++ b/server/pkg/redact/redact_test.go @@ -126,6 +126,83 @@ func TestNoFalsePositivesOnNormalText(t *testing.T) { } } +func TestRedactGitLabToken(t *testing.T) { + t.Parallel() + input := "GITLAB_TOKEN=glpat-AbCdEfGhIjKlMnOpQrStUvWx" + got := Text(input) + if strings.Contains(got, "glpat-") { + t.Fatalf("GitLab token not redacted: %s", got) + } +} + +func TestRedactJWT(t *testing.T) { + t.Parallel() + input := "token: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c" + got := Text(input) + if strings.Contains(got, "eyJhbGci") { + t.Fatalf("JWT not redacted: %s", got) + } +} + +func TestRedactConnectionString(t *testing.T) { + t.Parallel() + input := "connecting to postgres://admin:s3cret@db.example.com:5432/mydb" + got := Text(input) + if strings.Contains(got, "s3cret") { + t.Fatalf("connection string password not redacted: %s", got) + } +} + +func TestRedactPasswordEnvVar(t *testing.T) { + t.Parallel() + cases := []struct { + name string + input string + }{ + {"PASSWORD", "PASSWORD=hunter2"}, + {"SECRET", "SECRET=mysecretvalue"}, + {"TOKEN", "TOKEN=abc123xyz"}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := Text(tc.input) + if !strings.Contains(got, "[REDACTED CREDENTIAL]") { + t.Fatalf("expected credential redaction for %s, got: %s", tc.name, got) + } + }) + } +} + +func TestInputMap(t *testing.T) { + t.Parallel() + m := map[string]any{ + "command": "echo sk-proj-abc123def456ghi789jkl012mno345", + "file_path": "/tmp/test.txt", + "count": 42, + } + got := InputMap(m) + if s, ok := got["command"].(string); ok { + if strings.Contains(s, "sk-proj") { + t.Fatalf("API key in input map not redacted: %s", s) + } + } + // Non-string values preserved + if got["count"] != 42 { + t.Fatalf("non-string value altered: %v", got["count"]) + } + // Clean strings unchanged + if got["file_path"] != "/tmp/test.txt" { + t.Fatalf("clean string altered: %v", got["file_path"]) + } +} + +func TestInputMapNil(t *testing.T) { + t.Parallel() + if got := InputMap(nil); got != nil { + t.Fatalf("expected nil, got: %v", got) + } +} + func TestRedactMultipleSecrets(t *testing.T) { t.Parallel() input := "Keys: AKIAIOSFODNN7EXAMPLE and ghp_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmn"