diff --git a/apps/web/features/issues/components/agent-live-card.tsx b/apps/web/features/issues/components/agent-live-card.tsx index 71424fa8..f47fa34d 100644 --- a/apps/web/features/issues/components/agent-live-card.tsx +++ b/apps/web/features/issues/components/agent-live-card.tsx @@ -1,7 +1,7 @@ "use client"; import { useState, useEffect, useCallback, useRef } from "react"; -import { Bot, ChevronRight, Loader2, Terminal, FileText, AlertCircle, ArrowDown } from "lucide-react"; +import { Bot, ChevronRight, Loader2, ArrowDown, Brain, AlertCircle, Clock, CheckCircle2, XCircle } from "lucide-react"; import { api } from "@/shared/api"; import { useWSEvent } from "@/features/realtime"; import type { TaskMessagePayload, TaskCompletedPayload, TaskFailedPayload } from "@/shared/types/events"; @@ -9,23 +9,16 @@ import type { AgentTask } from "@/shared/types/agent"; import { cn } from "@/lib/utils"; import { Collapsible, CollapsibleContent, CollapsibleTrigger } from "@/components/ui/collapsible"; -interface AgentLiveCardProps { - issueId: string; - assigneeType: string | null; - assigneeId: string | null; - agentName?: string; -} +// ─── Shared types & helpers ───────────────────────────────────────────────── -// Icons for common tool names -function ToolIcon({ tool }: { tool: string }) { - const name = tool.toLowerCase(); - if (name.includes("bash") || name.includes("shell") || name.includes("terminal")) { - return ; - } - if (name.includes("read") || name.includes("write") || name.includes("edit") || name.includes("glob") || name.includes("grep")) { - return ; - } - return ; +/** A unified timeline entry: tool calls, thinking, text, and errors in chronological order. */ +interface TimelineItem { + seq: number; + type: "tool_use" | "tool_result" | "thinking" | "text" | "error"; + tool?: string; + content?: string; + input?: Record; + output?: string; } function formatElapsed(startedAt: string): string { @@ -37,22 +30,83 @@ function formatElapsed(startedAt: string): string { return `${minutes}m ${secs}s`; } -interface ToolCallEntry { - seq: number; - tool: string; - input?: Record; - output?: string; +function formatDuration(start: string, end: string): string { + const ms = new Date(end).getTime() - new Date(start).getTime(); + const seconds = Math.floor(ms / 1000); + if (seconds < 60) return `${seconds}s`; + const minutes = Math.floor(seconds / 60); + const secs = seconds % 60; + return `${minutes}m ${secs}s`; +} + +function shortenPath(p: string): string { + const parts = p.split("/"); + if (parts.length <= 3) return p; + return ".../" + parts.slice(-2).join("/"); +} + +function getToolSummary(item: TimelineItem): string { + if (!item.input) return ""; + const inp = item.input as Record; + + // WebSearch / web search + if (inp.query) return inp.query; + // File operations + if (inp.file_path) return shortenPath(inp.file_path); + if (inp.path) return shortenPath(inp.path); + if (inp.pattern) return inp.pattern; + // Bash + if (inp.description) return String(inp.description); + if (inp.command) { + const cmd = String(inp.command); + return cmd.length > 100 ? cmd.slice(0, 100) + "..." : cmd; + } + // Agent + if (inp.prompt) { + const p = String(inp.prompt); + return p.length > 100 ? p.slice(0, 100) + "..." : p; + } + // Skill + if (inp.skill) return String(inp.skill); + // Fallback: show first string value + for (const v of Object.values(inp)) { + if (typeof v === "string" && v.length > 0 && v.length < 120) return v; + } + return ""; +} + +/** Build a chronologically ordered timeline from raw messages. */ +function buildTimeline(msgs: TaskMessagePayload[]): TimelineItem[] { + const items: TimelineItem[] = []; + for (const msg of msgs) { + items.push({ + seq: msg.seq, + type: msg.type, + tool: msg.tool, + content: msg.content, + input: msg.input, + output: msg.output, + }); + } + return items.sort((a, b) => a.seq - b.seq); +} + +// ─── AgentLiveCard (real-time view) ──────────────────────────────────────── + +interface AgentLiveCardProps { + issueId: string; + assigneeType: string | null; + assigneeId: string | null; + agentName?: string; } export function AgentLiveCard({ issueId, assigneeType, assigneeId, agentName }: AgentLiveCardProps) { const [activeTask, setActiveTask] = useState(null); - const [messages, setMessages] = useState([]); - const [toolCalls, setToolCalls] = useState([]); - const [currentText, setCurrentText] = useState(""); + const [items, setItems] = useState([]); const [elapsed, setElapsed] = useState(""); const [autoScroll, setAutoScroll] = useState(true); const scrollRef = useRef(null); - const contentRef = useRef(null); + const seenSeqs = useRef(new Set()); // Check for active task on mount useEffect(() => { @@ -65,11 +119,12 @@ export function AgentLiveCard({ issueId, assigneeType, assigneeId, agentName }: api.getActiveTaskForIssue(issueId).then(({ task }) => { if (!cancelled) { setActiveTask(task); - // If there's an active task, fetch existing messages for catch-up if (task) { api.listTaskMessages(task.id).then((msgs) => { if (!cancelled) { - applyMessages(msgs); + const timeline = buildTimeline(msgs); + setItems(timeline); + for (const m of msgs) seenSeqs.current.add(`${m.task_id}:${m.seq}`); } }).catch(() => {}); } @@ -79,92 +134,41 @@ export function AgentLiveCard({ issueId, assigneeType, assigneeId, agentName }: return () => { cancelled = true; }; }, [issueId, assigneeType, assigneeId]); - // Process messages into tool calls and text - const applyMessages = useCallback((msgs: TaskMessagePayload[]) => { - const newToolCalls: ToolCallEntry[] = []; - let text = ""; - - for (const msg of msgs) { - switch (msg.type) { - case "tool_use": - newToolCalls.push({ seq: msg.seq, tool: msg.tool ?? "", input: msg.input }); - break; - case "tool_result": - // Attach output to matching tool call - for (let i = newToolCalls.length - 1; i >= 0; i--) { - const tc = newToolCalls[i]; - if (tc && tc.tool === msg.tool && !tc.output) { - tc.output = msg.output; - break; - } - } - break; - case "text": - text += msg.content ?? ""; - break; - case "error": - text += `\n[Error] ${msg.content ?? ""}\n`; - break; - } - } - - setToolCalls(newToolCalls); - setCurrentText(text); - setMessages(msgs); - }, []); - // Handle real-time task messages useWSEvent( "task:message", useCallback((payload: unknown) => { const msg = payload as TaskMessagePayload; if (msg.issue_id !== issueId) return; + const key = `${msg.task_id}:${msg.seq}`; + if (seenSeqs.current.has(key)) return; + seenSeqs.current.add(key); - setMessages((prev) => { - if (prev.some((m) => m.seq === msg.seq && m.task_id === msg.task_id)) return prev; - return [...prev, msg]; + setItems((prev) => { + const item: TimelineItem = { + seq: msg.seq, + type: msg.type, + tool: msg.tool, + content: msg.content, + input: msg.input, + output: msg.output, + }; + const next = [...prev, item]; + next.sort((a, b) => a.seq - b.seq); + return next; }); - - switch (msg.type) { - case "tool_use": - setToolCalls((prev) => [ - ...prev, - { seq: msg.seq, tool: msg.tool ?? "", input: msg.input }, - ]); - break; - case "tool_result": - setToolCalls((prev) => { - const updated = [...prev]; - for (let i = updated.length - 1; i >= 0; i--) { - const tc = updated[i]; - if (tc && tc.tool === msg.tool && !tc.output) { - updated[i] = { ...tc, output: msg.output }; - break; - } - } - return updated; - }); - break; - case "text": - setCurrentText((prev) => prev + (msg.content ?? "")); - break; - case "error": - setCurrentText((prev) => prev + `\n[Error] ${msg.content ?? ""}\n`); - break; - } }, [issueId]), ); - // Handle task completion - hide the live card + // Handle task completion/failure useWSEvent( "task:completed", useCallback((payload: unknown) => { const p = payload as TaskCompletedPayload; if (p.issue_id !== issueId) return; setActiveTask(null); - setMessages([]); - setToolCalls([]); - setCurrentText(""); + setItems([]); + seenSeqs.current.clear(); }, [issueId]), ); @@ -174,37 +178,31 @@ export function AgentLiveCard({ issueId, assigneeType, assigneeId, agentName }: const p = payload as TaskFailedPayload; if (p.issue_id !== issueId) return; setActiveTask(null); - setMessages([]); - setToolCalls([]); - setCurrentText(""); + setItems([]); + seenSeqs.current.clear(); }, [issueId]), ); - // Also pick up new tasks starting (task:dispatch) + // Pick up new tasks useWSEvent( "task:dispatch", - useCallback((payload: unknown) => { - const p = payload as { task_id: string; issue_id?: string }; - // We don't have issue_id in dispatch payload, re-fetch + useCallback(() => { api.getActiveTaskForIssue(issueId).then(({ task }) => { if (task) { setActiveTask(task); - setMessages([]); - setToolCalls([]); - setCurrentText(""); + setItems([]); + seenSeqs.current.clear(); } }).catch(() => {}); }, [issueId]), ); - // Update elapsed time + // Elapsed time useEffect(() => { if (!activeTask?.started_at && !activeTask?.dispatched_at) return; const ref = activeTask.started_at ?? activeTask.dispatched_at!; setElapsed(formatElapsed(ref)); - const interval = setInterval(() => { - setElapsed(formatElapsed(ref)); - }, 1000); + const interval = setInterval(() => setElapsed(formatElapsed(ref)), 1000); return () => clearInterval(interval); }, [activeTask?.started_at, activeTask?.dispatched_at]); @@ -213,7 +211,7 @@ export function AgentLiveCard({ issueId, assigneeType, assigneeId, agentName }: if (autoScroll && scrollRef.current) { scrollRef.current.scrollTop = scrollRef.current.scrollHeight; } - }, [toolCalls, currentText, autoScroll]); + }, [items, autoScroll]); const handleScroll = useCallback(() => { if (!scrollRef.current) return; @@ -223,50 +221,38 @@ export function AgentLiveCard({ issueId, assigneeType, assigneeId, agentName }: if (!activeTask) return null; - const lastTextLines = currentText.trim().split("\n").filter(Boolean); - const lastLine = lastTextLines[lastTextLines.length - 1] ?? ""; + const toolCount = items.filter((i) => i.type === "tool_use").length; return (
{/* Header */}
-
+
-
- - {agentName ?? "Agent"} is working +
+ + {agentName ?? "Agent"} is working
- {elapsed} - {toolCalls.length > 0 && ( - - {toolCalls.length} tool {toolCalls.length === 1 ? "call" : "calls"} + {elapsed} + {toolCount > 0 && ( + + {toolCount} tool {toolCount === 1 ? "call" : "calls"} )}
- {/* Content */} - {(toolCalls.length > 0 || currentText) && ( + {/* Timeline content */} + {items.length > 0 && (
-
- {toolCalls.map((tc, idx) => ( - - ))} + {items.map((item, idx) => ( + + ))} - {/* Current thinking text (last line only) */} - {lastLine && ( -
- - {lastLine} -
- )} -
- - {/* Scroll to bottom button */} {!autoScroll && (
+ {/* Agent execution history */} +
+ +
+ {/* Timeline entries */}
{(() => { diff --git a/apps/web/shared/api/client.ts b/apps/web/shared/api/client.ts index 89e2f107..8483a1ad 100644 --- a/apps/web/shared/api/client.ts +++ b/apps/web/shared/api/client.ts @@ -318,6 +318,10 @@ export class ApiClient { return this.fetch(`/api/daemon/tasks/${taskId}/messages`); } + async listTasksByIssue(issueId: string): Promise { + return this.fetch(`/api/issues/${issueId}/task-runs`); + } + async getDaemonPairingSession(token: string): Promise { return this.fetch(`/api/daemon/pairing-sessions/${token}`); } diff --git a/apps/web/shared/types/events.ts b/apps/web/shared/types/events.ts index d5eab3bf..4e22b5e2 100644 --- a/apps/web/shared/types/events.ts +++ b/apps/web/shared/types/events.ts @@ -153,7 +153,7 @@ export interface TaskMessagePayload { task_id: string; issue_id: string; seq: number; - type: "text" | "tool_use" | "tool_result" | "error"; + type: "text" | "thinking" | "tool_use" | "tool_result" | "error"; tool?: string; content?: string; input?: Record; diff --git a/server/cmd/server/router.go b/server/cmd/server/router.go index ffa0c714..be9e7ff4 100644 --- a/server/cmd/server/router.go +++ b/server/cmd/server/router.go @@ -167,6 +167,7 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route r.Post("/subscribe", h.SubscribeToIssue) r.Post("/unsubscribe", h.UnsubscribeFromIssue) r.Get("/active-task", h.GetActiveTaskForIssue) + r.Get("/task-runs", h.ListTasksByIssue) }) }) diff --git a/server/internal/daemon/daemon.go b/server/internal/daemon/daemon.go index 158370a4..d9b66c58 100644 --- a/server/internal/daemon/daemon.go +++ b/server/internal/daemon/daemon.go @@ -827,10 +827,22 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, taskLo var seq atomic.Int32 var mu sync.Mutex var pendingText strings.Builder + var pendingThinking strings.Builder var batch []TaskMessageData + callIDToTool := map[string]string{} // track callID → tool name for tool_result flush := func() { mu.Lock() + // Flush any accumulated thinking as a single message. + if pendingThinking.Len() > 0 { + s := seq.Add(1) + batch = append(batch, TaskMessageData{ + Seq: int(s), + Type: "thinking", + Content: pendingThinking.String(), + }) + pendingThinking.Reset() + } // Flush any accumulated text as a single message. if pendingText.Len() > 0 { s := seq.Add(1) @@ -854,7 +866,7 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, taskLo } } - // Periodically flush accumulated text messages. + // Periodically flush accumulated text/thinking messages. ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() @@ -875,30 +887,47 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, taskLo case agent.MessageToolUse: n := toolCount.Add(1) taskLog.Info(fmt.Sprintf("tool #%d: %s", n, msg.Tool)) + if msg.CallID != "" { + mu.Lock() + callIDToTool[msg.CallID] = msg.Tool + mu.Unlock() + } s := seq.Add(1) mu.Lock() batch = append(batch, TaskMessageData{ - Seq: int(s), - Type: "tool_use", - Tool: msg.Tool, + Seq: int(s), + Type: "tool_use", + Tool: msg.Tool, Input: msg.Input, }) mu.Unlock() case agent.MessageToolResult: s := seq.Add(1) - // Truncate large tool results for the live feed. output := msg.Output if len(output) > 8192 { output = output[:8192] } + // Resolve tool name from callID if not set directly. + toolName := msg.Tool + if toolName == "" && msg.CallID != "" { + mu.Lock() + toolName = callIDToTool[msg.CallID] + mu.Unlock() + } mu.Lock() batch = append(batch, TaskMessageData{ Seq: int(s), Type: "tool_result", - Tool: msg.Tool, + Tool: toolName, Output: output, }) mu.Unlock() + case agent.MessageThinking: + if msg.Content != "" { + mu.Lock() + pendingThinking.WriteString(msg.Content) + mu.Unlock() + } case agent.MessageText: if msg.Content != "" { taskLog.Debug("agent", "text", truncateLog(msg.Content, 200)) diff --git a/server/internal/handler/daemon.go b/server/internal/handler/daemon.go index fd209ec9..b5bf102c 100644 --- a/server/internal/handler/daemon.go +++ b/server/internal/handler/daemon.go @@ -503,3 +503,21 @@ func (h *Handler) GetActiveTaskForIssue(w http.ResponseWriter, r *http.Request) writeJSON(w, http.StatusOK, map[string]any{"task": taskToResponse(tasks[0])}) } + +// ListTasksByIssue returns all tasks (any status) for an issue — used for execution history. +func (h *Handler) ListTasksByIssue(w http.ResponseWriter, r *http.Request) { + issueID := chi.URLParam(r, "id") + + tasks, err := h.Queries.ListTasksByIssue(r.Context(), parseUUID(issueID)) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to list tasks") + return + } + + resp := make([]AgentTaskResponse, len(tasks)) + for i, t := range tasks { + resp[i] = taskToResponse(t) + } + + writeJSON(w, http.StatusOK, resp) +} diff --git a/server/pkg/agent/agent.go b/server/pkg/agent/agent.go index d19887ed..d80a2641 100644 --- a/server/pkg/agent/agent.go +++ b/server/pkg/agent/agent.go @@ -42,6 +42,7 @@ type MessageType string const ( MessageText MessageType = "text" + MessageThinking MessageType = "thinking" MessageToolUse MessageType = "tool-use" MessageToolResult MessageType = "tool-result" MessageStatus MessageType = "status" diff --git a/server/pkg/agent/claude.go b/server/pkg/agent/claude.go index 610df39c..c1b78406 100644 --- a/server/pkg/agent/claude.go +++ b/server/pkg/agent/claude.go @@ -181,6 +181,10 @@ func (b *claudeBackend) handleAssistant(msg claudeSDKMessage, ch chan<- Message, output.WriteString(block.Text) trySend(ch, Message{Type: MessageText, Content: block.Text}) } + case "thinking": + if block.Text != "" { + trySend(ch, Message{Type: MessageThinking, Content: block.Text}) + } case "tool_use": var input map[string]any if block.Input != nil { diff --git a/server/pkg/db/generated/agent.sql.go b/server/pkg/db/generated/agent.sql.go index 3657c088..1de2793f 100644 --- a/server/pkg/db/generated/agent.sql.go +++ b/server/pkg/db/generated/agent.sql.go @@ -621,6 +621,48 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp return items, nil } +const listTasksByIssue = `-- name: ListTasksByIssue :many +SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir FROM agent_task_queue +WHERE issue_id = $1 +ORDER BY created_at DESC +` + +func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]AgentTaskQueue, error) { + rows, err := q.db.Query(ctx, listTasksByIssue, issueID) + if err != nil { + return nil, err + } + defer rows.Close() + items := []AgentTaskQueue{} + for rows.Next() { + var i AgentTaskQueue + if err := rows.Scan( + &i.ID, + &i.AgentID, + &i.IssueID, + &i.Status, + &i.Priority, + &i.DispatchedAt, + &i.StartedAt, + &i.CompletedAt, + &i.Result, + &i.Error, + &i.CreatedAt, + &i.Context, + &i.RuntimeID, + &i.SessionID, + &i.WorkDir, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const startAgentTask = `-- name: StartAgentTask :one UPDATE agent_task_queue SET status = 'running', started_at = now() diff --git a/server/pkg/db/queries/agent.sql b/server/pkg/db/queries/agent.sql index dce917ba..4540a7e9 100644 --- a/server/pkg/db/queries/agent.sql +++ b/server/pkg/db/queries/agent.sql @@ -134,6 +134,11 @@ SELECT * FROM agent_task_queue WHERE issue_id = $1 AND status IN ('dispatched', 'running') ORDER BY created_at DESC; +-- name: ListTasksByIssue :many +SELECT * FROM agent_task_queue +WHERE issue_id = $1 +ORDER BY created_at DESC; + -- name: UpdateAgentStatus :one UPDATE agent SET status = $2, updated_at = now() WHERE id = $1