diff --git a/apps/web/features/issues/components/agent-live-card.tsx b/apps/web/features/issues/components/agent-live-card.tsx new file mode 100644 index 00000000..71424fa8 --- /dev/null +++ b/apps/web/features/issues/components/agent-live-card.tsx @@ -0,0 +1,354 @@ +"use client"; + +import { useState, useEffect, useCallback, useRef } from "react"; +import { Bot, ChevronRight, Loader2, Terminal, FileText, AlertCircle, ArrowDown } from "lucide-react"; +import { api } from "@/shared/api"; +import { useWSEvent } from "@/features/realtime"; +import type { TaskMessagePayload, TaskCompletedPayload, TaskFailedPayload } from "@/shared/types/events"; +import type { AgentTask } from "@/shared/types/agent"; +import { cn } from "@/lib/utils"; +import { Collapsible, CollapsibleContent, CollapsibleTrigger } from "@/components/ui/collapsible"; + +interface AgentLiveCardProps { + issueId: string; + assigneeType: string | null; + assigneeId: string | null; + agentName?: string; +} + +// Icons for common tool names +function ToolIcon({ tool }: { tool: string }) { + const name = tool.toLowerCase(); + if (name.includes("bash") || name.includes("shell") || name.includes("terminal")) { + return ; + } + if (name.includes("read") || name.includes("write") || name.includes("edit") || name.includes("glob") || name.includes("grep")) { + return ; + } + return ; +} + +function formatElapsed(startedAt: string): string { + const elapsed = Date.now() - new Date(startedAt).getTime(); + const seconds = Math.floor(elapsed / 1000); + if (seconds < 60) return `${seconds}s`; + const minutes = Math.floor(seconds / 60); + const secs = seconds % 60; + return `${minutes}m ${secs}s`; +} + +interface ToolCallEntry { + seq: number; + tool: string; + input?: Record; + output?: string; +} + +export function AgentLiveCard({ issueId, assigneeType, assigneeId, agentName }: AgentLiveCardProps) { + const [activeTask, setActiveTask] = useState(null); + const [messages, setMessages] = useState([]); + const [toolCalls, setToolCalls] = useState([]); + const [currentText, setCurrentText] = useState(""); + const [elapsed, setElapsed] = useState(""); + const [autoScroll, setAutoScroll] = useState(true); + const scrollRef = useRef(null); + const contentRef = useRef(null); + + // Check for active task on mount + useEffect(() => { + if (assigneeType !== "agent" || !assigneeId) { + setActiveTask(null); + return; + } + + let cancelled = false; + api.getActiveTaskForIssue(issueId).then(({ task }) => { + if (!cancelled) { + setActiveTask(task); + // If there's an active task, fetch existing messages for catch-up + if (task) { + api.listTaskMessages(task.id).then((msgs) => { + if (!cancelled) { + applyMessages(msgs); + } + }).catch(() => {}); + } + } + }).catch(() => {}); + + return () => { cancelled = true; }; + }, [issueId, assigneeType, assigneeId]); + + // Process messages into tool calls and text + const applyMessages = useCallback((msgs: TaskMessagePayload[]) => { + const newToolCalls: ToolCallEntry[] = []; + let text = ""; + + for (const msg of msgs) { + switch (msg.type) { + case "tool_use": + newToolCalls.push({ seq: msg.seq, tool: msg.tool ?? "", input: msg.input }); + break; + case "tool_result": + // Attach output to matching tool call + for (let i = newToolCalls.length - 1; i >= 0; i--) { + const tc = newToolCalls[i]; + if (tc && tc.tool === msg.tool && !tc.output) { + tc.output = msg.output; + break; + } + } + break; + case "text": + text += msg.content ?? ""; + break; + case "error": + text += `\n[Error] ${msg.content ?? ""}\n`; + break; + } + } + + setToolCalls(newToolCalls); + setCurrentText(text); + setMessages(msgs); + }, []); + + // Handle real-time task messages + useWSEvent( + "task:message", + useCallback((payload: unknown) => { + const msg = payload as TaskMessagePayload; + if (msg.issue_id !== issueId) return; + + setMessages((prev) => { + if (prev.some((m) => m.seq === msg.seq && m.task_id === msg.task_id)) return prev; + return [...prev, msg]; + }); + + switch (msg.type) { + case "tool_use": + setToolCalls((prev) => [ + ...prev, + { seq: msg.seq, tool: msg.tool ?? "", input: msg.input }, + ]); + break; + case "tool_result": + setToolCalls((prev) => { + const updated = [...prev]; + for (let i = updated.length - 1; i >= 0; i--) { + const tc = updated[i]; + if (tc && tc.tool === msg.tool && !tc.output) { + updated[i] = { ...tc, output: msg.output }; + break; + } + } + return updated; + }); + break; + case "text": + setCurrentText((prev) => prev + (msg.content ?? "")); + break; + case "error": + setCurrentText((prev) => prev + `\n[Error] ${msg.content ?? ""}\n`); + break; + } + }, [issueId]), + ); + + // Handle task completion - hide the live card + useWSEvent( + "task:completed", + useCallback((payload: unknown) => { + const p = payload as TaskCompletedPayload; + if (p.issue_id !== issueId) return; + setActiveTask(null); + setMessages([]); + setToolCalls([]); + setCurrentText(""); + }, [issueId]), + ); + + useWSEvent( + "task:failed", + useCallback((payload: unknown) => { + const p = payload as TaskFailedPayload; + if (p.issue_id !== issueId) return; + setActiveTask(null); + setMessages([]); + setToolCalls([]); + setCurrentText(""); + }, [issueId]), + ); + + // Also pick up new tasks starting (task:dispatch) + useWSEvent( + "task:dispatch", + useCallback((payload: unknown) => { + const p = payload as { task_id: string; issue_id?: string }; + // We don't have issue_id in dispatch payload, re-fetch + api.getActiveTaskForIssue(issueId).then(({ task }) => { + if (task) { + setActiveTask(task); + setMessages([]); + setToolCalls([]); + setCurrentText(""); + } + }).catch(() => {}); + }, [issueId]), + ); + + // Update elapsed time + useEffect(() => { + if (!activeTask?.started_at && !activeTask?.dispatched_at) return; + const ref = activeTask.started_at ?? activeTask.dispatched_at!; + setElapsed(formatElapsed(ref)); + const interval = setInterval(() => { + setElapsed(formatElapsed(ref)); + }, 1000); + return () => clearInterval(interval); + }, [activeTask?.started_at, activeTask?.dispatched_at]); + + // Auto-scroll + useEffect(() => { + if (autoScroll && scrollRef.current) { + scrollRef.current.scrollTop = scrollRef.current.scrollHeight; + } + }, [toolCalls, currentText, autoScroll]); + + const handleScroll = useCallback(() => { + if (!scrollRef.current) return; + const { scrollTop, scrollHeight, clientHeight } = scrollRef.current; + setAutoScroll(scrollHeight - scrollTop - clientHeight < 40); + }, []); + + if (!activeTask) return null; + + const lastTextLines = currentText.trim().split("\n").filter(Boolean); + const lastLine = lastTextLines[lastTextLines.length - 1] ?? ""; + + return ( +
+ {/* Header */} +
+
+ +
+
+ + {agentName ?? "Agent"} is working +
+ {elapsed} + {toolCalls.length > 0 && ( + + {toolCalls.length} tool {toolCalls.length === 1 ? "call" : "calls"} + + )} +
+ + {/* Content */} + {(toolCalls.length > 0 || currentText) && ( +
+
+ {toolCalls.map((tc, idx) => ( + + ))} + + {/* Current thinking text (last line only) */} + {lastLine && ( +
+ + {lastLine} +
+ )} +
+ + {/* Scroll to bottom button */} + {!autoScroll && ( + + )} +
+ )} +
+ ); +} + +function ToolCallRow({ entry }: { entry: ToolCallEntry }) { + const [open, setOpen] = useState(false); + + // Extract a short summary from tool input + const summary = getToolSummary(entry); + const hasDetails = entry.output || (entry.input && Object.keys(entry.input).length > 0); + + return ( + + + + + {entry.tool} + {summary && {summary}} + {entry.output !== undefined && ( + + )} + {entry.output === undefined && ( + + )} + + + {entry.output && ( +
+            {entry.output.length > 2000 ? entry.output.slice(0, 2000) + "\n..." : entry.output}
+          
+ )} +
+
+ ); +} + +function getToolSummary(entry: ToolCallEntry): string { + if (!entry.input) return ""; + const { file_path, path, pattern, command, description } = entry.input as Record; + + // Shorten file paths + if (file_path) return shortenPath(file_path); + if (path) return shortenPath(path); + if (pattern) return pattern; + if (description) return description; + if (command) { + const cmd = String(command); + return cmd.length > 80 ? cmd.slice(0, 80) + "..." : cmd; + } + return ""; +} + +function shortenPath(p: string): string { + const parts = p.split("/"); + if (parts.length <= 3) return p; + return ".../" + parts.slice(-2).join("/"); +} diff --git a/apps/web/features/issues/components/issue-detail.tsx b/apps/web/features/issues/components/issue-detail.tsx index 987d4f77..de516bc4 100644 --- a/apps/web/features/issues/components/issue-detail.tsx +++ b/apps/web/features/issues/components/issue-detail.tsx @@ -60,6 +60,7 @@ import { ALL_STATUSES, STATUS_CONFIG, PRIORITY_ORDER, PRIORITY_CONFIG } from "@/ import { StatusIcon, PriorityIcon, DueDatePicker } from "@/features/issues/components"; import { CommentCard } from "./comment-card"; import { CommentInput } from "./comment-input"; +import { AgentLiveCard } from "./agent-live-card"; import { api } from "@/shared/api"; import { useAuthStore } from "@/features/auth"; import { useWorkspaceStore, useActorName } from "@/features/workspace"; @@ -856,6 +857,16 @@ export function IssueDetail({ issueId, onDelete, defaultSidebarOpen = true, layo + {/* Agent live output */} +
+ +
+ {/* Timeline entries */}
{(() => { diff --git a/apps/web/shared/api/client.ts b/apps/web/shared/api/client.ts index 5516621b..89e2f107 100644 --- a/apps/web/shared/api/client.ts +++ b/apps/web/shared/api/client.ts @@ -32,6 +32,7 @@ import type { RuntimeHourlyActivity, RuntimePing, TimelineEntry, + TaskMessagePayload, } from "@/shared/types"; import { type Logger, noopLogger } from "@/shared/logger"; @@ -309,6 +310,14 @@ export class ApiClient { return this.fetch(`/api/agents/${agentId}/tasks`); } + async getActiveTaskForIssue(issueId: string): Promise<{ task: AgentTask | null }> { + return this.fetch(`/api/issues/${issueId}/active-task`); + } + + async listTaskMessages(taskId: string): Promise { + return this.fetch(`/api/daemon/tasks/${taskId}/messages`); + } + async getDaemonPairingSession(token: string): Promise { return this.fetch(`/api/daemon/pairing-sessions/${token}`); } diff --git a/apps/web/shared/types/events.ts b/apps/web/shared/types/events.ts index 15991d07..d5eab3bf 100644 --- a/apps/web/shared/types/events.ts +++ b/apps/web/shared/types/events.ts @@ -20,6 +20,7 @@ export type WSEventType = | "task:progress" | "task:completed" | "task:failed" + | "task:message" | "inbox:new" | "inbox:read" | "inbox:archived" @@ -147,3 +148,28 @@ export interface ActivityCreatedPayload { issue_id: string; entry: TimelineEntry; } + +export interface TaskMessagePayload { + task_id: string; + issue_id: string; + seq: number; + type: "text" | "tool_use" | "tool_result" | "error"; + tool?: string; + content?: string; + input?: Record; + output?: string; +} + +export interface TaskCompletedPayload { + task_id: string; + agent_id: string; + issue_id: string; + status: string; +} + +export interface TaskFailedPayload { + task_id: string; + agent_id: string; + issue_id: string; + status: string; +} diff --git a/server/cmd/server/router.go b/server/cmd/server/router.go index 8797a35b..ffa0c714 100644 --- a/server/cmd/server/router.go +++ b/server/cmd/server/router.go @@ -99,6 +99,8 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route r.Post("/tasks/{taskId}/progress", h.ReportTaskProgress) r.Post("/tasks/{taskId}/complete", h.CompleteTask) r.Post("/tasks/{taskId}/fail", h.FailTask) + r.Post("/tasks/{taskId}/messages", h.ReportTaskMessages) + r.Get("/tasks/{taskId}/messages", h.ListTaskMessages) }) // Protected API routes @@ -164,6 +166,7 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route r.Get("/subscribers", h.ListIssueSubscribers) r.Post("/subscribe", h.SubscribeToIssue) r.Post("/unsubscribe", h.UnsubscribeFromIssue) + r.Get("/active-task", h.GetActiveTaskForIssue) }) }) diff --git a/server/internal/daemon/client.go b/server/internal/daemon/client.go index 1c34afb2..c3bdae90 100644 --- a/server/internal/daemon/client.go +++ b/server/internal/daemon/client.go @@ -83,6 +83,22 @@ func (c *Client) ReportProgress(ctx context.Context, taskID, summary string, ste }, nil) } +// TaskMessageData represents a single agent execution message for batch reporting. +type TaskMessageData struct { + Seq int `json:"seq"` + Type string `json:"type"` + Tool string `json:"tool,omitempty"` + Content string `json:"content,omitempty"` + Input map[string]any `json:"input,omitempty"` + Output string `json:"output,omitempty"` +} + +func (c *Client) ReportTaskMessages(ctx context.Context, taskID string, messages []TaskMessageData) error { + return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/messages", taskID), map[string]any{ + "messages": messages, + }, nil) +} + func (c *Client) CompleteTask(ctx context.Context, taskID, output, branchName, sessionID, workDir string) error { body := map[string]any{"output": output} if branchName != "" { diff --git a/server/internal/daemon/daemon.go b/server/internal/daemon/daemon.go index 6ee0e0f6..158370a4 100644 --- a/server/internal/daemon/daemon.go +++ b/server/internal/daemon/daemon.go @@ -821,22 +821,106 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, taskLo return TaskResult{}, err } - // Drain message channel — log tool uses and agent text for visibility. + // Drain message channel — forward to server for live output + log locally. var toolCount atomic.Int32 go func() { + var seq atomic.Int32 + var mu sync.Mutex + var pendingText strings.Builder + var batch []TaskMessageData + + flush := func() { + mu.Lock() + // Flush any accumulated text as a single message. + if pendingText.Len() > 0 { + s := seq.Add(1) + batch = append(batch, TaskMessageData{ + Seq: int(s), + Type: "text", + Content: pendingText.String(), + }) + pendingText.Reset() + } + toSend := batch + batch = nil + mu.Unlock() + + if len(toSend) > 0 { + sendCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + if err := d.client.ReportTaskMessages(sendCtx, task.ID, toSend); err != nil { + taskLog.Debug("failed to report task messages", "error", err) + } + cancel() + } + } + + // Periodically flush accumulated text messages. + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + + done := make(chan struct{}) + go func() { + for { + select { + case <-ticker.C: + flush() + case <-done: + return + } + } + }() + for msg := range session.Messages { switch msg.Type { case agent.MessageToolUse: n := toolCount.Add(1) taskLog.Info(fmt.Sprintf("tool #%d: %s", n, msg.Tool)) + s := seq.Add(1) + mu.Lock() + batch = append(batch, TaskMessageData{ + Seq: int(s), + Type: "tool_use", + Tool: msg.Tool, + Input: msg.Input, + }) + mu.Unlock() + case agent.MessageToolResult: + s := seq.Add(1) + // Truncate large tool results for the live feed. + output := msg.Output + if len(output) > 8192 { + output = output[:8192] + } + mu.Lock() + batch = append(batch, TaskMessageData{ + Seq: int(s), + Type: "tool_result", + Tool: msg.Tool, + Output: output, + }) + mu.Unlock() case agent.MessageText: if msg.Content != "" { taskLog.Debug("agent", "text", truncateLog(msg.Content, 200)) + mu.Lock() + pendingText.WriteString(msg.Content) + mu.Unlock() } case agent.MessageError: taskLog.Error("agent error", "content", msg.Content) + s := seq.Add(1) + mu.Lock() + batch = append(batch, TaskMessageData{ + Seq: int(s), + Type: "error", + Content: msg.Content, + }) + mu.Unlock() } } + + close(done) + flush() // Final flush after channel closes. }() result := <-session.Result diff --git a/server/internal/handler/daemon.go b/server/internal/handler/daemon.go index 30eb5d32..fd209ec9 100644 --- a/server/internal/handler/daemon.go +++ b/server/internal/handler/daemon.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/go-chi/chi/v5" + "github.com/jackc/pgx/v5/pgtype" db "github.com/multica-ai/multica/server/pkg/db/generated" "github.com/multica-ai/multica/server/pkg/protocol" ) @@ -385,3 +386,120 @@ func (h *Handler) FailTask(w http.ResponseWriter, r *http.Request) { slog.Info("task failed", "task_id", taskID, "agent_id", uuidToString(task.AgentID), "task_error", req.Error) writeJSON(w, http.StatusOK, taskToResponse(*task)) } + +// --------------------------------------------------------------------------- +// Task Messages (live agent output) +// --------------------------------------------------------------------------- + +type TaskMessageRequest struct { + Seq int `json:"seq"` + Type string `json:"type"` + Tool string `json:"tool,omitempty"` + Content string `json:"content,omitempty"` + Input map[string]any `json:"input,omitempty"` + Output string `json:"output,omitempty"` +} + +type TaskMessageBatchRequest struct { + Messages []TaskMessageRequest `json:"messages"` +} + +// ReportTaskMessages receives a batch of agent execution messages from the daemon. +func (h *Handler) ReportTaskMessages(w http.ResponseWriter, r *http.Request) { + taskID := chi.URLParam(r, "taskId") + + var req TaskMessageBatchRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid request body") + return + } + if len(req.Messages) == 0 { + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) + return + } + + task, err := h.Queries.GetAgentTask(r.Context(), parseUUID(taskID)) + if err != nil { + writeError(w, http.StatusNotFound, "task not found") + return + } + + workspaceID := "" + if issue, err := h.Queries.GetIssue(r.Context(), task.IssueID); err == nil { + workspaceID = uuidToString(issue.WorkspaceID) + } + + for _, msg := range req.Messages { + var inputJSON []byte + if msg.Input != nil { + inputJSON, _ = json.Marshal(msg.Input) + } + h.Queries.CreateTaskMessage(r.Context(), db.CreateTaskMessageParams{ + TaskID: parseUUID(taskID), + Seq: int32(msg.Seq), + Type: msg.Type, + Tool: pgtype.Text{String: msg.Tool, Valid: msg.Tool != ""}, + Content: pgtype.Text{String: msg.Content, Valid: msg.Content != ""}, + Input: inputJSON, + Output: pgtype.Text{String: msg.Output, Valid: msg.Output != ""}, + }) + + if workspaceID != "" { + h.publish(protocol.EventTaskMessage, workspaceID, "system", "", protocol.TaskMessagePayload{ + TaskID: taskID, + IssueID: uuidToString(task.IssueID), + Seq: msg.Seq, + Type: msg.Type, + Tool: msg.Tool, + Content: msg.Content, + Input: msg.Input, + Output: msg.Output, + }) + } + } + + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) +} + +// ListTaskMessages returns the persisted messages for a task (for catch-up after reconnect). +func (h *Handler) ListTaskMessages(w http.ResponseWriter, r *http.Request) { + taskID := chi.URLParam(r, "taskId") + + messages, err := h.Queries.ListTaskMessages(r.Context(), parseUUID(taskID)) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to list task messages") + return + } + + resp := make([]protocol.TaskMessagePayload, len(messages)) + for i, m := range messages { + var input map[string]any + if m.Input != nil { + json.Unmarshal(m.Input, &input) + } + resp[i] = protocol.TaskMessagePayload{ + TaskID: taskID, + Seq: int(m.Seq), + Type: m.Type, + Tool: m.Tool.String, + Content: m.Content.String, + Input: input, + Output: m.Output.String, + } + } + + writeJSON(w, http.StatusOK, resp) +} + +// GetActiveTaskForIssue returns the currently running task for an issue, if any. +func (h *Handler) GetActiveTaskForIssue(w http.ResponseWriter, r *http.Request) { + issueID := chi.URLParam(r, "id") + + tasks, err := h.Queries.ListActiveTasksByIssue(r.Context(), parseUUID(issueID)) + if err != nil || len(tasks) == 0 { + writeJSON(w, http.StatusOK, map[string]any{"task": nil}) + return + } + + writeJSON(w, http.StatusOK, map[string]any{"task": taskToResponse(tasks[0])}) +} diff --git a/server/migrations/026_task_messages.down.sql b/server/migrations/026_task_messages.down.sql new file mode 100644 index 00000000..a4161d48 --- /dev/null +++ b/server/migrations/026_task_messages.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS task_message; diff --git a/server/migrations/026_task_messages.up.sql b/server/migrations/026_task_messages.up.sql new file mode 100644 index 00000000..db0163d6 --- /dev/null +++ b/server/migrations/026_task_messages.up.sql @@ -0,0 +1,13 @@ +CREATE TABLE task_message ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + task_id UUID NOT NULL REFERENCES agent_task_queue(id) ON DELETE CASCADE, + seq INTEGER NOT NULL, + type TEXT NOT NULL, + tool TEXT, + content TEXT, + input JSONB, + output TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX idx_task_message_task_id_seq ON task_message(task_id, seq); diff --git a/server/pkg/db/generated/agent.sql.go b/server/pkg/db/generated/agent.sql.go index 569718dc..3657c088 100644 --- a/server/pkg/db/generated/agent.sql.go +++ b/server/pkg/db/generated/agent.sql.go @@ -451,6 +451,48 @@ func (q *Queries) HasPendingTaskForIssue(ctx context.Context, issueID pgtype.UUI return has_pending, err } +const listActiveTasksByIssue = `-- name: ListActiveTasksByIssue :many +SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir FROM agent_task_queue +WHERE issue_id = $1 AND status IN ('dispatched', 'running') +ORDER BY created_at DESC +` + +func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]AgentTaskQueue, error) { + rows, err := q.db.Query(ctx, listActiveTasksByIssue, issueID) + if err != nil { + return nil, err + } + defer rows.Close() + items := []AgentTaskQueue{} + for rows.Next() { + var i AgentTaskQueue + if err := rows.Scan( + &i.ID, + &i.AgentID, + &i.IssueID, + &i.Status, + &i.Priority, + &i.DispatchedAt, + &i.StartedAt, + &i.CompletedAt, + &i.Result, + &i.Error, + &i.CreatedAt, + &i.Context, + &i.RuntimeID, + &i.SessionID, + &i.WorkDir, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const listAgentTasks = `-- name: ListAgentTasks :many SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir FROM agent_task_queue WHERE agent_id = $1 diff --git a/server/pkg/db/generated/models.go b/server/pkg/db/generated/models.go index c91bfe1d..2b27c9d3 100644 --- a/server/pkg/db/generated/models.go +++ b/server/pkg/db/generated/models.go @@ -241,6 +241,18 @@ type SkillFile struct { UpdatedAt pgtype.Timestamptz `json:"updated_at"` } +type TaskMessage struct { + ID pgtype.UUID `json:"id"` + TaskID pgtype.UUID `json:"task_id"` + Seq int32 `json:"seq"` + Type string `json:"type"` + Tool pgtype.Text `json:"tool"` + Content pgtype.Text `json:"content"` + Input []byte `json:"input"` + Output pgtype.Text `json:"output"` + CreatedAt pgtype.Timestamptz `json:"created_at"` +} + type User struct { ID pgtype.UUID `json:"id"` Name string `json:"name"` diff --git a/server/pkg/db/generated/task_message.sql.go b/server/pkg/db/generated/task_message.sql.go new file mode 100644 index 00000000..c1d09327 --- /dev/null +++ b/server/pkg/db/generated/task_message.sql.go @@ -0,0 +1,140 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.30.0 +// source: task_message.sql + +package db + +import ( + "context" + + "github.com/jackc/pgx/v5/pgtype" +) + +const createTaskMessage = `-- name: CreateTaskMessage :one +INSERT INTO task_message (task_id, seq, type, tool, content, input, output) +VALUES ($1, $2, $3, $4, $5, $6, $7) +RETURNING id, task_id, seq, type, tool, content, input, output, created_at +` + +type CreateTaskMessageParams struct { + TaskID pgtype.UUID `json:"task_id"` + Seq int32 `json:"seq"` + Type string `json:"type"` + Tool pgtype.Text `json:"tool"` + Content pgtype.Text `json:"content"` + Input []byte `json:"input"` + Output pgtype.Text `json:"output"` +} + +func (q *Queries) CreateTaskMessage(ctx context.Context, arg CreateTaskMessageParams) (TaskMessage, error) { + row := q.db.QueryRow(ctx, createTaskMessage, + arg.TaskID, + arg.Seq, + arg.Type, + arg.Tool, + arg.Content, + arg.Input, + arg.Output, + ) + var i TaskMessage + err := row.Scan( + &i.ID, + &i.TaskID, + &i.Seq, + &i.Type, + &i.Tool, + &i.Content, + &i.Input, + &i.Output, + &i.CreatedAt, + ) + return i, err +} + +const deleteTaskMessages = `-- name: DeleteTaskMessages :exec +DELETE FROM task_message +WHERE task_id = $1 +` + +func (q *Queries) DeleteTaskMessages(ctx context.Context, taskID pgtype.UUID) error { + _, err := q.db.Exec(ctx, deleteTaskMessages, taskID) + return err +} + +const listTaskMessages = `-- name: ListTaskMessages :many +SELECT id, task_id, seq, type, tool, content, input, output, created_at FROM task_message +WHERE task_id = $1 +ORDER BY seq ASC +` + +func (q *Queries) ListTaskMessages(ctx context.Context, taskID pgtype.UUID) ([]TaskMessage, error) { + rows, err := q.db.Query(ctx, listTaskMessages, taskID) + if err != nil { + return nil, err + } + defer rows.Close() + items := []TaskMessage{} + for rows.Next() { + var i TaskMessage + if err := rows.Scan( + &i.ID, + &i.TaskID, + &i.Seq, + &i.Type, + &i.Tool, + &i.Content, + &i.Input, + &i.Output, + &i.CreatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listTaskMessagesSince = `-- name: ListTaskMessagesSince :many +SELECT id, task_id, seq, type, tool, content, input, output, created_at FROM task_message +WHERE task_id = $1 AND seq > $2 +ORDER BY seq ASC +` + +type ListTaskMessagesSinceParams struct { + TaskID pgtype.UUID `json:"task_id"` + Seq int32 `json:"seq"` +} + +func (q *Queries) ListTaskMessagesSince(ctx context.Context, arg ListTaskMessagesSinceParams) ([]TaskMessage, error) { + rows, err := q.db.Query(ctx, listTaskMessagesSince, arg.TaskID, arg.Seq) + if err != nil { + return nil, err + } + defer rows.Close() + items := []TaskMessage{} + for rows.Next() { + var i TaskMessage + if err := rows.Scan( + &i.ID, + &i.TaskID, + &i.Seq, + &i.Type, + &i.Tool, + &i.Content, + &i.Input, + &i.Output, + &i.CreatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} diff --git a/server/pkg/db/queries/agent.sql b/server/pkg/db/queries/agent.sql index 27816de2..dce917ba 100644 --- a/server/pkg/db/queries/agent.sql +++ b/server/pkg/db/queries/agent.sql @@ -129,6 +129,11 @@ SELECT * FROM agent_task_queue WHERE runtime_id = $1 AND status IN ('queued', 'dispatched') ORDER BY priority DESC, created_at ASC; +-- name: ListActiveTasksByIssue :many +SELECT * FROM agent_task_queue +WHERE issue_id = $1 AND status IN ('dispatched', 'running') +ORDER BY created_at DESC; + -- name: UpdateAgentStatus :one UPDATE agent SET status = $2, updated_at = now() WHERE id = $1 diff --git a/server/pkg/db/queries/task_message.sql b/server/pkg/db/queries/task_message.sql new file mode 100644 index 00000000..3bbd3ee4 --- /dev/null +++ b/server/pkg/db/queries/task_message.sql @@ -0,0 +1,18 @@ +-- name: CreateTaskMessage :one +INSERT INTO task_message (task_id, seq, type, tool, content, input, output) +VALUES ($1, $2, $3, $4, $5, $6, $7) +RETURNING *; + +-- name: ListTaskMessages :many +SELECT * FROM task_message +WHERE task_id = $1 +ORDER BY seq ASC; + +-- name: ListTaskMessagesSince :many +SELECT * FROM task_message +WHERE task_id = $1 AND seq > $2 +ORDER BY seq ASC; + +-- name: DeleteTaskMessages :exec +DELETE FROM task_message +WHERE task_id = $1; diff --git a/server/pkg/protocol/events.go b/server/pkg/protocol/events.go index 3edd5bdb..25a5f602 100644 --- a/server/pkg/protocol/events.go +++ b/server/pkg/protocol/events.go @@ -22,6 +22,7 @@ const ( EventTaskProgress = "task:progress" EventTaskCompleted = "task:completed" EventTaskFailed = "task:failed" + EventTaskMessage = "task:message" // Inbox events EventInboxNew = "inbox:new" diff --git a/server/pkg/protocol/messages.go b/server/pkg/protocol/messages.go index ce79ffa6..3733a397 100644 --- a/server/pkg/protocol/messages.go +++ b/server/pkg/protocol/messages.go @@ -31,6 +31,18 @@ type TaskCompletedPayload struct { Output string `json:"output,omitempty"` } +// TaskMessagePayload represents a single agent execution message (tool call, text, etc.) +type TaskMessagePayload struct { + TaskID string `json:"task_id"` + IssueID string `json:"issue_id,omitempty"` + Seq int `json:"seq"` + Type string `json:"type"` // "text", "tool_use", "tool_result", "error" + Tool string `json:"tool,omitempty"` // tool name for tool_use/tool_result + Content string `json:"content,omitempty"` // text content + Input map[string]any `json:"input,omitempty"` // tool input (tool_use only) + Output string `json:"output,omitempty"` // tool output (tool_result only) +} + // DaemonRegisterPayload is sent from daemon to server on connection. type DaemonRegisterPayload struct { DaemonID string `json:"daemon_id"`