diff --git a/apps/web/components/common/rich-text-editor.css b/apps/web/components/common/rich-text-editor.css
index 599a9555..47c3c416 100644
--- a/apps/web/components/common/rich-text-editor.css
+++ b/apps/web/components/common/rich-text-editor.css
@@ -94,6 +94,7 @@
/* Code blocks */
.rich-text-editor pre {
+ font-family: var(--font-mono, ui-monospace, monospace);
background: var(--muted);
border-radius: var(--radius);
padding: 0.75rem 1rem;
diff --git a/apps/web/components/markdown/CodeBlock.tsx b/apps/web/components/markdown/CodeBlock.tsx
index 6bdd4d2c..ef69afb4 100644
--- a/apps/web/components/markdown/CodeBlock.tsx
+++ b/apps/web/components/markdown/CodeBlock.tsx
@@ -141,7 +141,7 @@ export function CodeBlock({
if (mode === 'terminal') {
return (
{isLoading || !highlighted ? (
- {code}
+ {code}
) : (
)}
diff --git a/apps/web/features/issues/components/agent-live-card.tsx b/apps/web/features/issues/components/agent-live-card.tsx
new file mode 100644
index 00000000..6d1de879
--- /dev/null
+++ b/apps/web/features/issues/components/agent-live-card.tsx
@@ -0,0 +1,507 @@
+"use client";
+
+import { useState, useEffect, useCallback, useRef } from "react";
+import { Bot, ChevronRight, Loader2, ArrowDown, Brain, AlertCircle, Clock, CheckCircle2, XCircle } from "lucide-react";
+import { api } from "@/shared/api";
+import { useWSEvent } from "@/features/realtime";
+import type { TaskMessagePayload, TaskCompletedPayload, TaskFailedPayload } from "@/shared/types/events";
+import type { AgentTask } from "@/shared/types/agent";
+import { cn } from "@/lib/utils";
+import { Collapsible, CollapsibleContent, CollapsibleTrigger } from "@/components/ui/collapsible";
+import { redactSecrets } from "../utils/redact";
+
+// ─── Shared types & helpers ─────────────────────────────────────────────────
+
+/** A unified timeline entry: tool calls, thinking, text, and errors in chronological order. */
+interface TimelineItem {
+ seq: number;
+ type: "tool_use" | "tool_result" | "thinking" | "text" | "error";
+ tool?: string;
+ content?: string;
+ input?: Record
;
+ output?: string;
+}
+
+function formatElapsed(startedAt: string): string {
+ const elapsed = Date.now() - new Date(startedAt).getTime();
+ const seconds = Math.floor(elapsed / 1000);
+ if (seconds < 60) return `${seconds}s`;
+ const minutes = Math.floor(seconds / 60);
+ const secs = seconds % 60;
+ return `${minutes}m ${secs}s`;
+}
+
+function formatDuration(start: string, end: string): string {
+ const ms = new Date(end).getTime() - new Date(start).getTime();
+ const seconds = Math.floor(ms / 1000);
+ if (seconds < 60) return `${seconds}s`;
+ const minutes = Math.floor(seconds / 60);
+ const secs = seconds % 60;
+ return `${minutes}m ${secs}s`;
+}
+
+function shortenPath(p: string): string {
+ const parts = p.split("/");
+ if (parts.length <= 3) return p;
+ return ".../" + parts.slice(-2).join("/");
+}
+
+function getToolSummary(item: TimelineItem): string {
+ if (!item.input) return "";
+ const inp = item.input as Record;
+
+ // WebSearch / web search
+ if (inp.query) return inp.query;
+ // File operations
+ if (inp.file_path) return shortenPath(inp.file_path);
+ if (inp.path) return shortenPath(inp.path);
+ if (inp.pattern) return inp.pattern;
+ // Bash
+ if (inp.description) return String(inp.description);
+ if (inp.command) {
+ const cmd = String(inp.command);
+ return cmd.length > 100 ? cmd.slice(0, 100) + "..." : cmd;
+ }
+ // Agent
+ if (inp.prompt) {
+ const p = String(inp.prompt);
+ return p.length > 100 ? p.slice(0, 100) + "..." : p;
+ }
+ // Skill
+ if (inp.skill) return String(inp.skill);
+ // Fallback: show first string value
+ for (const v of Object.values(inp)) {
+ if (typeof v === "string" && v.length > 0 && v.length < 120) return v;
+ }
+ return "";
+}
+
+/** Build a chronologically ordered timeline from raw messages. */
+function buildTimeline(msgs: TaskMessagePayload[]): TimelineItem[] {
+ const items: TimelineItem[] = [];
+ for (const msg of msgs) {
+ items.push({
+ seq: msg.seq,
+ type: msg.type,
+ tool: msg.tool,
+ content: msg.content ? redactSecrets(msg.content) : msg.content,
+ input: msg.input,
+ output: msg.output ? redactSecrets(msg.output) : msg.output,
+ });
+ }
+ return items.sort((a, b) => a.seq - b.seq);
+}
+
+// ─── AgentLiveCard (real-time view) ────────────────────────────────────────
+
+interface AgentLiveCardProps {
+ issueId: string;
+ assigneeType: string | null;
+ assigneeId: string | null;
+ agentName?: string;
+}
+
+export function AgentLiveCard({ issueId, assigneeType, assigneeId, agentName }: AgentLiveCardProps) {
+ const [activeTask, setActiveTask] = useState(null);
+ const [items, setItems] = useState([]);
+ const [elapsed, setElapsed] = useState("");
+ const [autoScroll, setAutoScroll] = useState(true);
+ const scrollRef = useRef(null);
+ const seenSeqs = useRef(new Set());
+
+ // Check for active task on mount
+ useEffect(() => {
+ if (assigneeType !== "agent" || !assigneeId) {
+ setActiveTask(null);
+ return;
+ }
+
+ let cancelled = false;
+ api.getActiveTaskForIssue(issueId).then(({ task }) => {
+ if (!cancelled) {
+ setActiveTask(task);
+ if (task) {
+ api.listTaskMessages(task.id).then((msgs) => {
+ if (!cancelled) {
+ const timeline = buildTimeline(msgs);
+ setItems(timeline);
+ for (const m of msgs) seenSeqs.current.add(`${m.task_id}:${m.seq}`);
+ }
+ }).catch(() => {});
+ }
+ }
+ }).catch(() => {});
+
+ return () => { cancelled = true; };
+ }, [issueId, assigneeType, assigneeId]);
+
+ // Handle real-time task messages
+ useWSEvent(
+ "task:message",
+ useCallback((payload: unknown) => {
+ const msg = payload as TaskMessagePayload;
+ if (msg.issue_id !== issueId) return;
+ const key = `${msg.task_id}:${msg.seq}`;
+ if (seenSeqs.current.has(key)) return;
+ seenSeqs.current.add(key);
+
+ setItems((prev) => {
+ const item: TimelineItem = {
+ seq: msg.seq,
+ type: msg.type,
+ tool: msg.tool,
+ content: msg.content,
+ input: msg.input,
+ output: msg.output,
+ };
+ const next = [...prev, item];
+ next.sort((a, b) => a.seq - b.seq);
+ return next;
+ });
+ }, [issueId]),
+ );
+
+ // Handle task completion/failure
+ useWSEvent(
+ "task:completed",
+ useCallback((payload: unknown) => {
+ const p = payload as TaskCompletedPayload;
+ if (p.issue_id !== issueId) return;
+ setActiveTask(null);
+ setItems([]);
+ seenSeqs.current.clear();
+ }, [issueId]),
+ );
+
+ useWSEvent(
+ "task:failed",
+ useCallback((payload: unknown) => {
+ const p = payload as TaskFailedPayload;
+ if (p.issue_id !== issueId) return;
+ setActiveTask(null);
+ setItems([]);
+ seenSeqs.current.clear();
+ }, [issueId]),
+ );
+
+ // Pick up new tasks
+ useWSEvent(
+ "task:dispatch",
+ useCallback(() => {
+ api.getActiveTaskForIssue(issueId).then(({ task }) => {
+ if (task) {
+ setActiveTask(task);
+ setItems([]);
+ seenSeqs.current.clear();
+ }
+ }).catch(() => {});
+ }, [issueId]),
+ );
+
+ // Elapsed time
+ useEffect(() => {
+ if (!activeTask?.started_at && !activeTask?.dispatched_at) return;
+ const ref = activeTask.started_at ?? activeTask.dispatched_at!;
+ setElapsed(formatElapsed(ref));
+ const interval = setInterval(() => setElapsed(formatElapsed(ref)), 1000);
+ return () => clearInterval(interval);
+ }, [activeTask?.started_at, activeTask?.dispatched_at]);
+
+ // Auto-scroll
+ useEffect(() => {
+ if (autoScroll && scrollRef.current) {
+ scrollRef.current.scrollTop = scrollRef.current.scrollHeight;
+ }
+ }, [items, autoScroll]);
+
+ const handleScroll = useCallback(() => {
+ if (!scrollRef.current) return;
+ const { scrollTop, scrollHeight, clientHeight } = scrollRef.current;
+ setAutoScroll(scrollHeight - scrollTop - clientHeight < 40);
+ }, []);
+
+ if (!activeTask) return null;
+
+ const toolCount = items.filter((i) => i.type === "tool_use").length;
+
+ return (
+
+ {/* Header */}
+
+
+
+
+
+
+ {agentName ?? "Agent"} is working
+
+
{elapsed}
+ {toolCount > 0 && (
+
+ {toolCount} tool {toolCount === 1 ? "call" : "calls"}
+
+ )}
+
+
+ {/* Timeline content */}
+ {items.length > 0 && (
+
+ {items.map((item, idx) => (
+
+ ))}
+
+ {!autoScroll && (
+
+ )}
+
+ )}
+
+ );
+}
+
+// ─── TaskRunHistory (past execution logs) ──────────────────────────────────
+
+interface TaskRunHistoryProps {
+ issueId: string;
+ assigneeType: string | null;
+}
+
+export function TaskRunHistory({ issueId, assigneeType }: TaskRunHistoryProps) {
+ const [tasks, setTasks] = useState([]);
+ const [open, setOpen] = useState(false);
+
+ useEffect(() => {
+ if (assigneeType !== "agent") return;
+ api.listTasksByIssue(issueId).then(setTasks).catch(() => {});
+ }, [issueId, assigneeType]);
+
+ // Refresh when a task completes
+ useWSEvent(
+ "task:completed",
+ useCallback((payload: unknown) => {
+ const p = payload as TaskCompletedPayload;
+ if (p.issue_id !== issueId) return;
+ api.listTasksByIssue(issueId).then(setTasks).catch(() => {});
+ }, [issueId]),
+ );
+
+ useWSEvent(
+ "task:failed",
+ useCallback((payload: unknown) => {
+ const p = payload as TaskFailedPayload;
+ if (p.issue_id !== issueId) return;
+ api.listTasksByIssue(issueId).then(setTasks).catch(() => {});
+ }, [issueId]),
+ );
+
+ const completedTasks = tasks.filter((t) => t.status === "completed" || t.status === "failed");
+ if (completedTasks.length === 0) return null;
+
+ return (
+
+
+
+
+ Execution history ({completedTasks.length})
+
+
+
+ {completedTasks.map((task) => (
+
+ ))}
+
+
+
+ );
+}
+
+function TaskRunEntry({ task }: { task: AgentTask }) {
+ const [open, setOpen] = useState(false);
+ const [items, setItems] = useState(null);
+
+ const loadMessages = useCallback(() => {
+ if (items !== null) return; // already loaded
+ api.listTaskMessages(task.id).then((msgs) => {
+ setItems(buildTimeline(msgs));
+ }).catch(() => setItems([]));
+ }, [task.id, items]);
+
+ useEffect(() => {
+ if (open) loadMessages();
+ }, [open, loadMessages]);
+
+ const duration = task.started_at && task.completed_at
+ ? formatDuration(task.started_at, task.completed_at)
+ : null;
+
+ return (
+
+
+
+ {task.status === "completed" ? (
+
+ ) : (
+
+ )}
+
+ {new Date(task.created_at).toLocaleString(undefined, { month: "short", day: "numeric", hour: "2-digit", minute: "2-digit" })}
+
+ {duration && {duration}}
+
+ {task.status}
+
+
+
+
+ {items === null ? (
+
+
+ Loading...
+
+ ) : items.length === 0 ? (
+
No execution data recorded.
+ ) : (
+ items.map((item, idx) => (
+
+ ))
+ )}
+
+
+
+ );
+}
+
+// ─── Shared timeline row rendering ──────────────────────────────────────────
+
+function TimelineRow({ item }: { item: TimelineItem }) {
+ switch (item.type) {
+ case "tool_use":
+ return ;
+ case "tool_result":
+ return ;
+ case "thinking":
+ return ;
+ case "text":
+ return ;
+ case "error":
+ return ;
+ default:
+ return null;
+ }
+}
+
+function ToolCallRow({ item }: { item: TimelineItem }) {
+ const [open, setOpen] = useState(false);
+ const summary = getToolSummary(item);
+ const hasInput = item.input && Object.keys(item.input).length > 0;
+
+ return (
+
+
+
+ {item.tool}
+ {summary && {summary}}
+
+ {hasInput && (
+
+
+ {redactSecrets(JSON.stringify(item.input, null, 2))}
+
+
+ )}
+
+ );
+}
+
+function ToolResultRow({ item }: { item: TimelineItem }) {
+ const [open, setOpen] = useState(false);
+ const output = item.output ?? "";
+ if (!output) return null;
+
+ const preview = output.length > 120 ? output.slice(0, 120) + "..." : output;
+
+ return (
+
+
+
+
+ {item.tool ? `${item.tool} result: ` : "result: "}{preview}
+
+
+
+
+ {output.length > 4000 ? output.slice(0, 4000) + "\n... (truncated)" : output}
+
+
+
+ );
+}
+
+function ThinkingRow({ item }: { item: TimelineItem }) {
+ const [open, setOpen] = useState(false);
+ const text = item.content ?? "";
+ if (!text) return null;
+
+ const preview = text.length > 150 ? text.slice(0, 150) + "..." : text;
+
+ return (
+
+
+
+ {preview}
+
+
+
+ {text}
+
+
+
+ );
+}
+
+function TextRow({ item }: { item: TimelineItem }) {
+ const text = item.content ?? "";
+ if (!text.trim()) return null;
+ const lines = text.trim().split("\n").filter(Boolean);
+ const last = lines[lines.length - 1] ?? "";
+ if (!last) return null;
+
+ return (
+
+
+ {last}
+
+ );
+}
+
+function ErrorRow({ item }: { item: TimelineItem }) {
+ return (
+
+ );
+}
diff --git a/apps/web/features/issues/components/issue-detail.tsx b/apps/web/features/issues/components/issue-detail.tsx
index 461962c2..48a72778 100644
--- a/apps/web/features/issues/components/issue-detail.tsx
+++ b/apps/web/features/issues/components/issue-detail.tsx
@@ -60,6 +60,7 @@ import { ALL_STATUSES, STATUS_CONFIG, PRIORITY_ORDER, PRIORITY_CONFIG } from "@/
import { StatusIcon, PriorityIcon, DueDatePicker } from "@/features/issues/components";
import { CommentCard } from "./comment-card";
import { CommentInput } from "./comment-input";
+import { AgentLiveCard, TaskRunHistory } from "./agent-live-card";
import { api } from "@/shared/api";
import { useAuthStore } from "@/features/auth";
import { useWorkspaceStore, useActorName } from "@/features/workspace";
@@ -1019,6 +1020,21 @@ export function IssueDetail({ issueId, onDelete, defaultSidebarOpen = true, layo
+ {/* Agent live output */}
+
{(() => {
diff --git a/apps/web/features/issues/utils/redact.test.ts b/apps/web/features/issues/utils/redact.test.ts
new file mode 100644
index 00000000..9aee9f03
--- /dev/null
+++ b/apps/web/features/issues/utils/redact.test.ts
@@ -0,0 +1,88 @@
+import { describe, it, expect } from "vitest";
+import { redactSecrets } from "./redact";
+
+describe("redactSecrets", () => {
+ it("redacts AWS access key", () => {
+ const result = redactSecrets("key: AKIAIOSFODNN7EXAMPLE");
+ expect(result).not.toContain("AKIAIOSFODNN7EXAMPLE");
+ expect(result).toContain("[REDACTED AWS KEY]");
+ });
+
+ it("redacts AWS secret key", () => {
+ const result = redactSecrets("aws_secret_access_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY");
+ expect(result).not.toContain("wJalrXUtnFEMI");
+ });
+
+ it("redacts PEM private keys", () => {
+ const input = "-----BEGIN RSA PRIVATE KEY-----\nMIIEow...\n-----END RSA PRIVATE KEY-----";
+ const result = redactSecrets(input);
+ expect(result).not.toContain("MIIEow");
+ expect(result).toContain("[REDACTED PRIVATE KEY]");
+ });
+
+ it("redacts GitHub tokens", () => {
+ const result = redactSecrets("GITHUB_TOKEN=ghp_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmn");
+ expect(result).not.toContain("ghp_");
+ });
+
+ it("redacts GitLab tokens", () => {
+ const result = redactSecrets("glpat-AbCdEfGhIjKlMnOpQrStUvWx");
+ expect(result).not.toContain("glpat-");
+ expect(result).toContain("[REDACTED GITLAB TOKEN]");
+ });
+
+ it("redacts OpenAI/Anthropic API keys", () => {
+ const result = redactSecrets("sk-proj-abc123def456ghi789jkl012mno345");
+ expect(result).not.toContain("sk-proj");
+ expect(result).toContain("[REDACTED API KEY]");
+ });
+
+ it("redacts Slack tokens", () => {
+ const result = redactSecrets("xoxb-123456789012-1234567890123-AbCdEfGhIjKl");
+ expect(result).not.toContain("xoxb-");
+ });
+
+ it("redacts JWT tokens", () => {
+ const result = redactSecrets("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c");
+ expect(result).not.toContain("eyJhbGci");
+ expect(result).toContain("[REDACTED JWT]");
+ });
+
+ it("redacts Bearer tokens", () => {
+ const result = redactSecrets("Authorization: Bearer abc123xyz.def456");
+ expect(result).toContain("Bearer [REDACTED]");
+ expect(result).not.toContain("abc123xyz");
+ });
+
+ it("redacts connection strings", () => {
+ const result = redactSecrets("postgres://admin:s3cret@db.example.com:5432/mydb");
+ expect(result).not.toContain("s3cret");
+ });
+
+ it("redacts generic credential env vars", () => {
+ for (const key of ["PASSWORD", "SECRET", "TOKEN", "DATABASE_URL", "API_KEY"]) {
+ const result = redactSecrets(`${key}=supersecretvalue123`);
+ expect(result).toContain("[REDACTED CREDENTIAL]");
+ expect(result).not.toContain("supersecretvalue123");
+ }
+ });
+
+ it("redacts multiple secrets in one string", () => {
+ const result = redactSecrets("AKIAIOSFODNN7EXAMPLE and ghp_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmn");
+ expect(result).not.toContain("AKIAIOSFODNN7EXAMPLE");
+ expect(result).not.toContain("ghp_");
+ });
+
+ it("does not alter normal text", () => {
+ const inputs = [
+ "This is a normal commit message about fixing a bug",
+ "The function returns skip-navigation as the class name",
+ "Created PR #42 for the authentication feature",
+ "Running tests in /tmp/test-workspace/project",
+ "The API endpoint /api/issues/123 was updated",
+ ];
+ for (const input of inputs) {
+ expect(redactSecrets(input)).toBe(input);
+ }
+ });
+});
diff --git a/apps/web/features/issues/utils/redact.ts b/apps/web/features/issues/utils/redact.ts
new file mode 100644
index 00000000..066c8fa6
--- /dev/null
+++ b/apps/web/features/issues/utils/redact.ts
@@ -0,0 +1,37 @@
+/**
+ * Client-side fallback for redacting sensitive information in agent output.
+ * The server performs primary redaction; this is a safety net for display.
+ */
+
+const patterns: { re: RegExp; replacement: string }[] = [
+ // AWS access key IDs
+ { re: /\bAKIA[0-9A-Z]{16}\b/g, replacement: "[REDACTED AWS KEY]" },
+ // AWS secret access keys
+ { re: /(?:aws_secret_access_key|secret_?access_?key)\s*[=:]\s*[A-Za-z0-9/+=]{40}/gi, replacement: "[REDACTED AWS SECRET]" },
+ // PEM private keys
+ { re: /-----BEGIN[A-Z\s]*PRIVATE KEY-----[\s\S]*?-----END[A-Z\s]*PRIVATE KEY-----/g, replacement: "[REDACTED PRIVATE KEY]" },
+ // GitHub tokens
+ { re: /\b(?:ghp|gho|ghu|ghs|ghr)_[A-Za-z0-9_]{36,255}\b/g, replacement: "[REDACTED GITHUB TOKEN]" },
+ // GitLab personal access tokens
+ { re: /\bglpat-[A-Za-z0-9_-]{20,}\b/g, replacement: "[REDACTED GITLAB TOKEN]" },
+ // OpenAI / Anthropic API keys
+ { re: /\bsk-[A-Za-z0-9_-]{20,}\b/g, replacement: "[REDACTED API KEY]" },
+ // Slack tokens
+ { re: /\bxox[bporas]-[A-Za-z0-9-]{10,}\b/g, replacement: "[REDACTED SLACK TOKEN]" },
+ // JWT tokens
+ { re: /\bey[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\b/g, replacement: "[REDACTED JWT]" },
+ // Bearer tokens
+ { re: /\bBearer\s+[A-Za-z0-9\-._~+/]+=*/gi, replacement: "Bearer [REDACTED]" },
+ // Connection strings with embedded passwords
+ { re: /(?:postgres|mysql|mongodb|redis|amqp)(?:ql)?:\/\/[^:\s]+:[^@\s]+@/gi, replacement: "[REDACTED CONNECTION STRING]@" },
+ // Generic key=value secret env vars
+ { re: /(?:API_KEY|API_SECRET|SECRET_KEY|SECRET|ACCESS_TOKEN|AUTH_TOKEN|PRIVATE_KEY|DATABASE_URL|DB_PASSWORD|DB_URL|REDIS_URL|PASSWORD|TOKEN)\s*[=:]\s*\S+/gi, replacement: "[REDACTED CREDENTIAL]" },
+];
+
+export function redactSecrets(text: string): string {
+ let result = text;
+ for (const { re, replacement } of patterns) {
+ result = result.replace(re, replacement);
+ }
+ return result;
+}
diff --git a/apps/web/shared/api/client.ts b/apps/web/shared/api/client.ts
index b890a19f..d2d2fcb3 100644
--- a/apps/web/shared/api/client.ts
+++ b/apps/web/shared/api/client.ts
@@ -34,6 +34,7 @@ import type {
RuntimeHourlyActivity,
RuntimePing,
TimelineEntry,
+ TaskMessagePayload,
} from "@/shared/types";
import { type Logger, noopLogger } from "@/shared/logger";
@@ -339,6 +340,18 @@ export class ApiClient {
return this.fetch(`/api/agents/${agentId}/tasks`);
}
+ async getActiveTaskForIssue(issueId: string): Promise<{ task: AgentTask | null }> {
+ return this.fetch(`/api/issues/${issueId}/active-task`);
+ }
+
+ async listTaskMessages(taskId: string): Promise
{
+ return this.fetch(`/api/daemon/tasks/${taskId}/messages`);
+ }
+
+ async listTasksByIssue(issueId: string): Promise {
+ return this.fetch(`/api/issues/${issueId}/task-runs`);
+ }
+
async getDaemonPairingSession(token: string): Promise {
return this.fetch(`/api/daemon/pairing-sessions/${token}`);
}
diff --git a/apps/web/shared/types/events.ts b/apps/web/shared/types/events.ts
index c9fb5a59..aac9f0a9 100644
--- a/apps/web/shared/types/events.ts
+++ b/apps/web/shared/types/events.ts
@@ -20,6 +20,7 @@ export type WSEventType =
| "task:progress"
| "task:completed"
| "task:failed"
+ | "task:message"
| "inbox:new"
| "inbox:read"
| "inbox:archived"
@@ -152,6 +153,31 @@ export interface ActivityCreatedPayload {
entry: TimelineEntry;
}
+export interface TaskMessagePayload {
+ task_id: string;
+ issue_id: string;
+ seq: number;
+ type: "text" | "thinking" | "tool_use" | "tool_result" | "error";
+ tool?: string;
+ content?: string;
+ input?: Record;
+ output?: string;
+}
+
+export interface TaskCompletedPayload {
+ task_id: string;
+ agent_id: string;
+ issue_id: string;
+ status: string;
+}
+
+export interface TaskFailedPayload {
+ task_id: string;
+ agent_id: string;
+ issue_id: string;
+ status: string;
+}
+
export interface ReactionAddedPayload {
reaction: Reaction;
issue_id: string;
diff --git a/server/cmd/server/router.go b/server/cmd/server/router.go
index e7cfaf23..2de70b1e 100644
--- a/server/cmd/server/router.go
+++ b/server/cmd/server/router.go
@@ -99,6 +99,8 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route
r.Post("/tasks/{taskId}/progress", h.ReportTaskProgress)
r.Post("/tasks/{taskId}/complete", h.CompleteTask)
r.Post("/tasks/{taskId}/fail", h.FailTask)
+ r.Post("/tasks/{taskId}/messages", h.ReportTaskMessages)
+ r.Get("/tasks/{taskId}/messages", h.ListTaskMessages)
})
// Protected API routes
@@ -164,6 +166,8 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route
r.Get("/subscribers", h.ListIssueSubscribers)
r.Post("/subscribe", h.SubscribeToIssue)
r.Post("/unsubscribe", h.UnsubscribeFromIssue)
+ r.Get("/active-task", h.GetActiveTaskForIssue)
+ r.Get("/task-runs", h.ListTasksByIssue)
r.Post("/reactions", h.AddIssueReaction)
r.Delete("/reactions", h.RemoveIssueReaction)
})
diff --git a/server/internal/daemon/client.go b/server/internal/daemon/client.go
index 1c34afb2..c3bdae90 100644
--- a/server/internal/daemon/client.go
+++ b/server/internal/daemon/client.go
@@ -83,6 +83,22 @@ func (c *Client) ReportProgress(ctx context.Context, taskID, summary string, ste
}, nil)
}
+// TaskMessageData represents a single agent execution message for batch reporting.
+type TaskMessageData struct {
+ Seq int `json:"seq"`
+ Type string `json:"type"`
+ Tool string `json:"tool,omitempty"`
+ Content string `json:"content,omitempty"`
+ Input map[string]any `json:"input,omitempty"`
+ Output string `json:"output,omitempty"`
+}
+
+func (c *Client) ReportTaskMessages(ctx context.Context, taskID string, messages []TaskMessageData) error {
+ return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/messages", taskID), map[string]any{
+ "messages": messages,
+ }, nil)
+}
+
func (c *Client) CompleteTask(ctx context.Context, taskID, output, branchName, sessionID, workDir string) error {
body := map[string]any{"output": output}
if branchName != "" {
diff --git a/server/internal/daemon/daemon.go b/server/internal/daemon/daemon.go
index 6ee0e0f6..d9b66c58 100644
--- a/server/internal/daemon/daemon.go
+++ b/server/internal/daemon/daemon.go
@@ -821,22 +821,135 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, taskLo
return TaskResult{}, err
}
- // Drain message channel — log tool uses and agent text for visibility.
+ // Drain message channel — forward to server for live output + log locally.
var toolCount atomic.Int32
go func() {
+ var seq atomic.Int32
+ var mu sync.Mutex
+ var pendingText strings.Builder
+ var pendingThinking strings.Builder
+ var batch []TaskMessageData
+ callIDToTool := map[string]string{} // track callID → tool name for tool_result
+
+ flush := func() {
+ mu.Lock()
+ // Flush any accumulated thinking as a single message.
+ if pendingThinking.Len() > 0 {
+ s := seq.Add(1)
+ batch = append(batch, TaskMessageData{
+ Seq: int(s),
+ Type: "thinking",
+ Content: pendingThinking.String(),
+ })
+ pendingThinking.Reset()
+ }
+ // Flush any accumulated text as a single message.
+ if pendingText.Len() > 0 {
+ s := seq.Add(1)
+ batch = append(batch, TaskMessageData{
+ Seq: int(s),
+ Type: "text",
+ Content: pendingText.String(),
+ })
+ pendingText.Reset()
+ }
+ toSend := batch
+ batch = nil
+ mu.Unlock()
+
+ if len(toSend) > 0 {
+ sendCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ if err := d.client.ReportTaskMessages(sendCtx, task.ID, toSend); err != nil {
+ taskLog.Debug("failed to report task messages", "error", err)
+ }
+ cancel()
+ }
+ }
+
+ // Periodically flush accumulated text/thinking messages.
+ ticker := time.NewTicker(500 * time.Millisecond)
+ defer ticker.Stop()
+
+ done := make(chan struct{})
+ go func() {
+ for {
+ select {
+ case <-ticker.C:
+ flush()
+ case <-done:
+ return
+ }
+ }
+ }()
+
for msg := range session.Messages {
switch msg.Type {
case agent.MessageToolUse:
n := toolCount.Add(1)
taskLog.Info(fmt.Sprintf("tool #%d: %s", n, msg.Tool))
+ if msg.CallID != "" {
+ mu.Lock()
+ callIDToTool[msg.CallID] = msg.Tool
+ mu.Unlock()
+ }
+ s := seq.Add(1)
+ mu.Lock()
+ batch = append(batch, TaskMessageData{
+ Seq: int(s),
+ Type: "tool_use",
+ Tool: msg.Tool,
+ Input: msg.Input,
+ })
+ mu.Unlock()
+ case agent.MessageToolResult:
+ s := seq.Add(1)
+ output := msg.Output
+ if len(output) > 8192 {
+ output = output[:8192]
+ }
+ // Resolve tool name from callID if not set directly.
+ toolName := msg.Tool
+ if toolName == "" && msg.CallID != "" {
+ mu.Lock()
+ toolName = callIDToTool[msg.CallID]
+ mu.Unlock()
+ }
+ mu.Lock()
+ batch = append(batch, TaskMessageData{
+ Seq: int(s),
+ Type: "tool_result",
+ Tool: toolName,
+ Output: output,
+ })
+ mu.Unlock()
+ case agent.MessageThinking:
+ if msg.Content != "" {
+ mu.Lock()
+ pendingThinking.WriteString(msg.Content)
+ mu.Unlock()
+ }
case agent.MessageText:
if msg.Content != "" {
taskLog.Debug("agent", "text", truncateLog(msg.Content, 200))
+ mu.Lock()
+ pendingText.WriteString(msg.Content)
+ mu.Unlock()
}
case agent.MessageError:
taskLog.Error("agent error", "content", msg.Content)
+ s := seq.Add(1)
+ mu.Lock()
+ batch = append(batch, TaskMessageData{
+ Seq: int(s),
+ Type: "error",
+ Content: msg.Content,
+ })
+ mu.Unlock()
}
}
+
+ close(done)
+ flush() // Final flush after channel closes.
}()
result := <-session.Result
diff --git a/server/internal/handler/daemon.go b/server/internal/handler/daemon.go
index 30eb5d32..d91b9376 100644
--- a/server/internal/handler/daemon.go
+++ b/server/internal/handler/daemon.go
@@ -8,8 +8,10 @@ import (
"strings"
"github.com/go-chi/chi/v5"
+ "github.com/jackc/pgx/v5/pgtype"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
+ "github.com/multica-ai/multica/server/pkg/redact"
)
// ---------------------------------------------------------------------------
@@ -385,3 +387,143 @@ func (h *Handler) FailTask(w http.ResponseWriter, r *http.Request) {
slog.Info("task failed", "task_id", taskID, "agent_id", uuidToString(task.AgentID), "task_error", req.Error)
writeJSON(w, http.StatusOK, taskToResponse(*task))
}
+
+// ---------------------------------------------------------------------------
+// Task Messages (live agent output)
+// ---------------------------------------------------------------------------
+
+type TaskMessageRequest struct {
+ Seq int `json:"seq"`
+ Type string `json:"type"`
+ Tool string `json:"tool,omitempty"`
+ Content string `json:"content,omitempty"`
+ Input map[string]any `json:"input,omitempty"`
+ Output string `json:"output,omitempty"`
+}
+
+type TaskMessageBatchRequest struct {
+ Messages []TaskMessageRequest `json:"messages"`
+}
+
+// ReportTaskMessages receives a batch of agent execution messages from the daemon.
+func (h *Handler) ReportTaskMessages(w http.ResponseWriter, r *http.Request) {
+ taskID := chi.URLParam(r, "taskId")
+
+ var req TaskMessageBatchRequest
+ if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+ writeError(w, http.StatusBadRequest, "invalid request body")
+ return
+ }
+ if len(req.Messages) == 0 {
+ writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
+ return
+ }
+
+ task, err := h.Queries.GetAgentTask(r.Context(), parseUUID(taskID))
+ if err != nil {
+ writeError(w, http.StatusNotFound, "task not found")
+ return
+ }
+
+ workspaceID := ""
+ if issue, err := h.Queries.GetIssue(r.Context(), task.IssueID); err == nil {
+ workspaceID = uuidToString(issue.WorkspaceID)
+ }
+
+ for _, msg := range req.Messages {
+ // Redact sensitive information before persisting or broadcasting.
+ msg.Content = redact.Text(msg.Content)
+ msg.Output = redact.Text(msg.Output)
+ msg.Input = redact.InputMap(msg.Input)
+
+ var inputJSON []byte
+ if msg.Input != nil {
+ inputJSON, _ = json.Marshal(msg.Input)
+ }
+ h.Queries.CreateTaskMessage(r.Context(), db.CreateTaskMessageParams{
+ TaskID: parseUUID(taskID),
+ Seq: int32(msg.Seq),
+ Type: msg.Type,
+ Tool: pgtype.Text{String: msg.Tool, Valid: msg.Tool != ""},
+ Content: pgtype.Text{String: msg.Content, Valid: msg.Content != ""},
+ Input: inputJSON,
+ Output: pgtype.Text{String: msg.Output, Valid: msg.Output != ""},
+ })
+
+ if workspaceID != "" {
+ h.publish(protocol.EventTaskMessage, workspaceID, "system", "", protocol.TaskMessagePayload{
+ TaskID: taskID,
+ IssueID: uuidToString(task.IssueID),
+ Seq: msg.Seq,
+ Type: msg.Type,
+ Tool: msg.Tool,
+ Content: msg.Content,
+ Input: msg.Input,
+ Output: msg.Output,
+ })
+ }
+ }
+
+ writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
+}
+
+// ListTaskMessages returns the persisted messages for a task (for catch-up after reconnect).
+func (h *Handler) ListTaskMessages(w http.ResponseWriter, r *http.Request) {
+ taskID := chi.URLParam(r, "taskId")
+
+ messages, err := h.Queries.ListTaskMessages(r.Context(), parseUUID(taskID))
+ if err != nil {
+ writeError(w, http.StatusInternalServerError, "failed to list task messages")
+ return
+ }
+
+ resp := make([]protocol.TaskMessagePayload, len(messages))
+ for i, m := range messages {
+ var input map[string]any
+ if m.Input != nil {
+ json.Unmarshal(m.Input, &input)
+ }
+ resp[i] = protocol.TaskMessagePayload{
+ TaskID: taskID,
+ Seq: int(m.Seq),
+ Type: m.Type,
+ Tool: m.Tool.String,
+ Content: m.Content.String,
+ Input: input,
+ Output: m.Output.String,
+ }
+ }
+
+ writeJSON(w, http.StatusOK, resp)
+}
+
+// GetActiveTaskForIssue returns the currently running task for an issue, if any.
+func (h *Handler) GetActiveTaskForIssue(w http.ResponseWriter, r *http.Request) {
+ issueID := chi.URLParam(r, "id")
+
+ tasks, err := h.Queries.ListActiveTasksByIssue(r.Context(), parseUUID(issueID))
+ if err != nil || len(tasks) == 0 {
+ writeJSON(w, http.StatusOK, map[string]any{"task": nil})
+ return
+ }
+
+ writeJSON(w, http.StatusOK, map[string]any{"task": taskToResponse(tasks[0])})
+}
+
+// ListTasksByIssue returns all tasks (any status) for an issue — used for execution history.
+func (h *Handler) ListTasksByIssue(w http.ResponseWriter, r *http.Request) {
+ issueID := chi.URLParam(r, "id")
+
+ tasks, err := h.Queries.ListTasksByIssue(r.Context(), parseUUID(issueID))
+ if err != nil {
+ writeError(w, http.StatusInternalServerError, "failed to list tasks")
+ return
+ }
+
+ resp := make([]AgentTaskResponse, len(tasks))
+ for i, t := range tasks {
+ resp[i] = taskToResponse(t)
+ }
+
+ writeJSON(w, http.StatusOK, resp)
+}
diff --git a/server/migrations/026_task_messages.down.sql b/server/migrations/026_task_messages.down.sql
new file mode 100644
index 00000000..a4161d48
--- /dev/null
+++ b/server/migrations/026_task_messages.down.sql
@@ -0,0 +1 @@
+DROP TABLE IF EXISTS task_message;
diff --git a/server/migrations/026_task_messages.up.sql b/server/migrations/026_task_messages.up.sql
new file mode 100644
index 00000000..db0163d6
--- /dev/null
+++ b/server/migrations/026_task_messages.up.sql
@@ -0,0 +1,13 @@
+CREATE TABLE task_message (
+ id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+ task_id UUID NOT NULL REFERENCES agent_task_queue(id) ON DELETE CASCADE,
+ seq INTEGER NOT NULL,
+ type TEXT NOT NULL,
+ tool TEXT,
+ content TEXT,
+ input JSONB,
+ output TEXT,
+ created_at TIMESTAMPTZ NOT NULL DEFAULT now()
+);
+
+CREATE INDEX idx_task_message_task_id_seq ON task_message(task_id, seq);
diff --git a/server/pkg/agent/agent.go b/server/pkg/agent/agent.go
index d19887ed..d80a2641 100644
--- a/server/pkg/agent/agent.go
+++ b/server/pkg/agent/agent.go
@@ -42,6 +42,7 @@ type MessageType string
const (
MessageText MessageType = "text"
+ MessageThinking MessageType = "thinking"
MessageToolUse MessageType = "tool-use"
MessageToolResult MessageType = "tool-result"
MessageStatus MessageType = "status"
diff --git a/server/pkg/agent/claude.go b/server/pkg/agent/claude.go
index 610df39c..c1b78406 100644
--- a/server/pkg/agent/claude.go
+++ b/server/pkg/agent/claude.go
@@ -181,6 +181,10 @@ func (b *claudeBackend) handleAssistant(msg claudeSDKMessage, ch chan<- Message,
output.WriteString(block.Text)
trySend(ch, Message{Type: MessageText, Content: block.Text})
}
+ case "thinking":
+ if block.Text != "" {
+ trySend(ch, Message{Type: MessageThinking, Content: block.Text})
+ }
case "tool_use":
var input map[string]any
if block.Input != nil {
diff --git a/server/pkg/db/generated/agent.sql.go b/server/pkg/db/generated/agent.sql.go
index 569718dc..1de2793f 100644
--- a/server/pkg/db/generated/agent.sql.go
+++ b/server/pkg/db/generated/agent.sql.go
@@ -451,6 +451,48 @@ func (q *Queries) HasPendingTaskForIssue(ctx context.Context, issueID pgtype.UUI
return has_pending, err
}
+const listActiveTasksByIssue = `-- name: ListActiveTasksByIssue :many
+SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir FROM agent_task_queue
+WHERE issue_id = $1 AND status IN ('dispatched', 'running')
+ORDER BY created_at DESC
+`
+
+func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]AgentTaskQueue, error) {
+ rows, err := q.db.Query(ctx, listActiveTasksByIssue, issueID)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ items := []AgentTaskQueue{}
+ for rows.Next() {
+ var i AgentTaskQueue
+ if err := rows.Scan(
+ &i.ID,
+ &i.AgentID,
+ &i.IssueID,
+ &i.Status,
+ &i.Priority,
+ &i.DispatchedAt,
+ &i.StartedAt,
+ &i.CompletedAt,
+ &i.Result,
+ &i.Error,
+ &i.CreatedAt,
+ &i.Context,
+ &i.RuntimeID,
+ &i.SessionID,
+ &i.WorkDir,
+ ); err != nil {
+ return nil, err
+ }
+ items = append(items, i)
+ }
+ if err := rows.Err(); err != nil {
+ return nil, err
+ }
+ return items, nil
+}
+
const listAgentTasks = `-- name: ListAgentTasks :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir FROM agent_task_queue
WHERE agent_id = $1
@@ -579,6 +621,48 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp
return items, nil
}
+const listTasksByIssue = `-- name: ListTasksByIssue :many
+SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir FROM agent_task_queue
+WHERE issue_id = $1
+ORDER BY created_at DESC
+`
+
+func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]AgentTaskQueue, error) {
+ rows, err := q.db.Query(ctx, listTasksByIssue, issueID)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ items := []AgentTaskQueue{}
+ for rows.Next() {
+ var i AgentTaskQueue
+ if err := rows.Scan(
+ &i.ID,
+ &i.AgentID,
+ &i.IssueID,
+ &i.Status,
+ &i.Priority,
+ &i.DispatchedAt,
+ &i.StartedAt,
+ &i.CompletedAt,
+ &i.Result,
+ &i.Error,
+ &i.CreatedAt,
+ &i.Context,
+ &i.RuntimeID,
+ &i.SessionID,
+ &i.WorkDir,
+ ); err != nil {
+ return nil, err
+ }
+ items = append(items, i)
+ }
+ if err := rows.Err(); err != nil {
+ return nil, err
+ }
+ return items, nil
+}
+
const startAgentTask = `-- name: StartAgentTask :one
UPDATE agent_task_queue
SET status = 'running', started_at = now()
diff --git a/server/pkg/db/generated/models.go b/server/pkg/db/generated/models.go
index 2d0e386e..ecd575be 100644
--- a/server/pkg/db/generated/models.go
+++ b/server/pkg/db/generated/models.go
@@ -261,6 +261,18 @@ type SkillFile struct {
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
}
+type TaskMessage struct {
+ ID pgtype.UUID `json:"id"`
+ TaskID pgtype.UUID `json:"task_id"`
+ Seq int32 `json:"seq"`
+ Type string `json:"type"`
+ Tool pgtype.Text `json:"tool"`
+ Content pgtype.Text `json:"content"`
+ Input []byte `json:"input"`
+ Output pgtype.Text `json:"output"`
+ CreatedAt pgtype.Timestamptz `json:"created_at"`
+}
+
type User struct {
ID pgtype.UUID `json:"id"`
Name string `json:"name"`
diff --git a/server/pkg/db/generated/task_message.sql.go b/server/pkg/db/generated/task_message.sql.go
new file mode 100644
index 00000000..c1d09327
--- /dev/null
+++ b/server/pkg/db/generated/task_message.sql.go
@@ -0,0 +1,140 @@
+// Code generated by sqlc. DO NOT EDIT.
+// versions:
+// sqlc v1.30.0
+// source: task_message.sql
+
+package db
+
+import (
+ "context"
+
+ "github.com/jackc/pgx/v5/pgtype"
+)
+
+const createTaskMessage = `-- name: CreateTaskMessage :one
+INSERT INTO task_message (task_id, seq, type, tool, content, input, output)
+VALUES ($1, $2, $3, $4, $5, $6, $7)
+RETURNING id, task_id, seq, type, tool, content, input, output, created_at
+`
+
+type CreateTaskMessageParams struct {
+ TaskID pgtype.UUID `json:"task_id"`
+ Seq int32 `json:"seq"`
+ Type string `json:"type"`
+ Tool pgtype.Text `json:"tool"`
+ Content pgtype.Text `json:"content"`
+ Input []byte `json:"input"`
+ Output pgtype.Text `json:"output"`
+}
+
+func (q *Queries) CreateTaskMessage(ctx context.Context, arg CreateTaskMessageParams) (TaskMessage, error) {
+ row := q.db.QueryRow(ctx, createTaskMessage,
+ arg.TaskID,
+ arg.Seq,
+ arg.Type,
+ arg.Tool,
+ arg.Content,
+ arg.Input,
+ arg.Output,
+ )
+ var i TaskMessage
+ err := row.Scan(
+ &i.ID,
+ &i.TaskID,
+ &i.Seq,
+ &i.Type,
+ &i.Tool,
+ &i.Content,
+ &i.Input,
+ &i.Output,
+ &i.CreatedAt,
+ )
+ return i, err
+}
+
+const deleteTaskMessages = `-- name: DeleteTaskMessages :exec
+DELETE FROM task_message
+WHERE task_id = $1
+`
+
+func (q *Queries) DeleteTaskMessages(ctx context.Context, taskID pgtype.UUID) error {
+ _, err := q.db.Exec(ctx, deleteTaskMessages, taskID)
+ return err
+}
+
+const listTaskMessages = `-- name: ListTaskMessages :many
+SELECT id, task_id, seq, type, tool, content, input, output, created_at FROM task_message
+WHERE task_id = $1
+ORDER BY seq ASC
+`
+
+func (q *Queries) ListTaskMessages(ctx context.Context, taskID pgtype.UUID) ([]TaskMessage, error) {
+ rows, err := q.db.Query(ctx, listTaskMessages, taskID)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ items := []TaskMessage{}
+ for rows.Next() {
+ var i TaskMessage
+ if err := rows.Scan(
+ &i.ID,
+ &i.TaskID,
+ &i.Seq,
+ &i.Type,
+ &i.Tool,
+ &i.Content,
+ &i.Input,
+ &i.Output,
+ &i.CreatedAt,
+ ); err != nil {
+ return nil, err
+ }
+ items = append(items, i)
+ }
+ if err := rows.Err(); err != nil {
+ return nil, err
+ }
+ return items, nil
+}
+
+const listTaskMessagesSince = `-- name: ListTaskMessagesSince :many
+SELECT id, task_id, seq, type, tool, content, input, output, created_at FROM task_message
+WHERE task_id = $1 AND seq > $2
+ORDER BY seq ASC
+`
+
+type ListTaskMessagesSinceParams struct {
+ TaskID pgtype.UUID `json:"task_id"`
+ Seq int32 `json:"seq"`
+}
+
+func (q *Queries) ListTaskMessagesSince(ctx context.Context, arg ListTaskMessagesSinceParams) ([]TaskMessage, error) {
+ rows, err := q.db.Query(ctx, listTaskMessagesSince, arg.TaskID, arg.Seq)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ items := []TaskMessage{}
+ for rows.Next() {
+ var i TaskMessage
+ if err := rows.Scan(
+ &i.ID,
+ &i.TaskID,
+ &i.Seq,
+ &i.Type,
+ &i.Tool,
+ &i.Content,
+ &i.Input,
+ &i.Output,
+ &i.CreatedAt,
+ ); err != nil {
+ return nil, err
+ }
+ items = append(items, i)
+ }
+ if err := rows.Err(); err != nil {
+ return nil, err
+ }
+ return items, nil
+}
diff --git a/server/pkg/db/queries/agent.sql b/server/pkg/db/queries/agent.sql
index 27816de2..4540a7e9 100644
--- a/server/pkg/db/queries/agent.sql
+++ b/server/pkg/db/queries/agent.sql
@@ -129,6 +129,16 @@ SELECT * FROM agent_task_queue
WHERE runtime_id = $1 AND status IN ('queued', 'dispatched')
ORDER BY priority DESC, created_at ASC;
+-- name: ListActiveTasksByIssue :many
+SELECT * FROM agent_task_queue
+WHERE issue_id = $1 AND status IN ('dispatched', 'running')
+ORDER BY created_at DESC;
+
+-- name: ListTasksByIssue :many
+SELECT * FROM agent_task_queue
+WHERE issue_id = $1
+ORDER BY created_at DESC;
+
-- name: UpdateAgentStatus :one
UPDATE agent SET status = $2, updated_at = now()
WHERE id = $1
diff --git a/server/pkg/db/queries/task_message.sql b/server/pkg/db/queries/task_message.sql
new file mode 100644
index 00000000..3bbd3ee4
--- /dev/null
+++ b/server/pkg/db/queries/task_message.sql
@@ -0,0 +1,18 @@
+-- name: CreateTaskMessage :one
+INSERT INTO task_message (task_id, seq, type, tool, content, input, output)
+VALUES ($1, $2, $3, $4, $5, $6, $7)
+RETURNING *;
+
+-- name: ListTaskMessages :many
+SELECT * FROM task_message
+WHERE task_id = $1
+ORDER BY seq ASC;
+
+-- name: ListTaskMessagesSince :many
+SELECT * FROM task_message
+WHERE task_id = $1 AND seq > $2
+ORDER BY seq ASC;
+
+-- name: DeleteTaskMessages :exec
+DELETE FROM task_message
+WHERE task_id = $1;
diff --git a/server/pkg/protocol/events.go b/server/pkg/protocol/events.go
index bca1b598..6d9ec027 100644
--- a/server/pkg/protocol/events.go
+++ b/server/pkg/protocol/events.go
@@ -26,6 +26,7 @@ const (
EventTaskProgress = "task:progress"
EventTaskCompleted = "task:completed"
EventTaskFailed = "task:failed"
+ EventTaskMessage = "task:message"
// Inbox events
EventInboxNew = "inbox:new"
diff --git a/server/pkg/protocol/messages.go b/server/pkg/protocol/messages.go
index ce79ffa6..3733a397 100644
--- a/server/pkg/protocol/messages.go
+++ b/server/pkg/protocol/messages.go
@@ -31,6 +31,18 @@ type TaskCompletedPayload struct {
Output string `json:"output,omitempty"`
}
+// TaskMessagePayload represents a single agent execution message (tool call, text, etc.)
+type TaskMessagePayload struct {
+ TaskID string `json:"task_id"`
+ IssueID string `json:"issue_id,omitempty"`
+ Seq int `json:"seq"`
+ Type string `json:"type"` // "text", "tool_use", "tool_result", "error"
+ Tool string `json:"tool,omitempty"` // tool name for tool_use/tool_result
+ Content string `json:"content,omitempty"` // text content
+ Input map[string]any `json:"input,omitempty"` // tool input (tool_use only)
+ Output string `json:"output,omitempty"` // tool output (tool_result only)
+}
+
// DaemonRegisterPayload is sent from daemon to server on connection.
type DaemonRegisterPayload struct {
DaemonID string `json:"daemon_id"`
diff --git a/server/pkg/redact/redact.go b/server/pkg/redact/redact.go
index 878b88d8..6a64ca7b 100644
--- a/server/pkg/redact/redact.go
+++ b/server/pkg/redact/redact.go
@@ -35,11 +35,37 @@ var patterns = []secretPattern{
// Slack tokens
{regexp.MustCompile(`\bxox[bporas]-[A-Za-z0-9\-]{10,}\b`), "[REDACTED SLACK TOKEN]"},
+ // GitLab personal access tokens
+ {regexp.MustCompile(`\bglpat-[A-Za-z0-9_-]{20,}\b`), "[REDACTED GITLAB TOKEN]"},
+
+ // JWT tokens (three base64url segments)
+ {regexp.MustCompile(`\bey[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\b`), "[REDACTED JWT]"},
+
// Generic "Bearer " in output
{regexp.MustCompile(`(?i)\bBearer\s+[A-Za-z0-9\-._~+/]+=*\b`), "Bearer [REDACTED]"},
+ // Connection strings with embedded passwords
+ {regexp.MustCompile(`(?i)(?:postgres|mysql|mongodb|redis|amqp)(?:ql)?://[^:\s]+:[^@\s]+@`), "[REDACTED CONNECTION STRING]@"},
+
// Generic key=value patterns for common secret env var names
- {regexp.MustCompile(`(?i)(?:API_KEY|API_SECRET|SECRET_KEY|ACCESS_TOKEN|AUTH_TOKEN|PRIVATE_KEY|DATABASE_URL|DB_PASSWORD|REDIS_URL)\s*[=:]\s*\S+`), "[REDACTED CREDENTIAL]"},
+ {regexp.MustCompile(`(?i)(?:API_KEY|API_SECRET|SECRET_KEY|SECRET|ACCESS_TOKEN|AUTH_TOKEN|PRIVATE_KEY|DATABASE_URL|DB_PASSWORD|DB_URL|REDIS_URL|PASSWORD|TOKEN)\s*[=:]\s*\S+`), "[REDACTED CREDENTIAL]"},
+}
+
+// InputMap returns a copy of m with all string values passed through Text.
+// Non-string values are preserved as-is.
+func InputMap(m map[string]any) map[string]any {
+ if m == nil {
+ return nil
+ }
+ out := make(map[string]any, len(m))
+ for k, v := range m {
+ if s, ok := v.(string); ok {
+ out[k] = Text(s)
+ } else {
+ out[k] = v
+ }
+ }
+ return out
}
// homeDir is resolved once at init for path redaction.
diff --git a/server/pkg/redact/redact_test.go b/server/pkg/redact/redact_test.go
index 13100624..3caab19f 100644
--- a/server/pkg/redact/redact_test.go
+++ b/server/pkg/redact/redact_test.go
@@ -126,6 +126,83 @@ func TestNoFalsePositivesOnNormalText(t *testing.T) {
}
}
+func TestRedactGitLabToken(t *testing.T) {
+ t.Parallel()
+ input := "GITLAB_TOKEN=glpat-AbCdEfGhIjKlMnOpQrStUvWx"
+ got := Text(input)
+ if strings.Contains(got, "glpat-") {
+ t.Fatalf("GitLab token not redacted: %s", got)
+ }
+}
+
+func TestRedactJWT(t *testing.T) {
+ t.Parallel()
+ input := "token: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
+ got := Text(input)
+ if strings.Contains(got, "eyJhbGci") {
+ t.Fatalf("JWT not redacted: %s", got)
+ }
+}
+
+func TestRedactConnectionString(t *testing.T) {
+ t.Parallel()
+ input := "connecting to postgres://admin:s3cret@db.example.com:5432/mydb"
+ got := Text(input)
+ if strings.Contains(got, "s3cret") {
+ t.Fatalf("connection string password not redacted: %s", got)
+ }
+}
+
+func TestRedactPasswordEnvVar(t *testing.T) {
+ t.Parallel()
+ cases := []struct {
+ name string
+ input string
+ }{
+ {"PASSWORD", "PASSWORD=hunter2"},
+ {"SECRET", "SECRET=mysecretvalue"},
+ {"TOKEN", "TOKEN=abc123xyz"},
+ }
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ got := Text(tc.input)
+ if !strings.Contains(got, "[REDACTED CREDENTIAL]") {
+ t.Fatalf("expected credential redaction for %s, got: %s", tc.name, got)
+ }
+ })
+ }
+}
+
+func TestInputMap(t *testing.T) {
+ t.Parallel()
+ m := map[string]any{
+ "command": "echo sk-proj-abc123def456ghi789jkl012mno345",
+ "file_path": "/tmp/test.txt",
+ "count": 42,
+ }
+ got := InputMap(m)
+ if s, ok := got["command"].(string); ok {
+ if strings.Contains(s, "sk-proj") {
+ t.Fatalf("API key in input map not redacted: %s", s)
+ }
+ }
+ // Non-string values preserved
+ if got["count"] != 42 {
+ t.Fatalf("non-string value altered: %v", got["count"])
+ }
+ // Clean strings unchanged
+ if got["file_path"] != "/tmp/test.txt" {
+ t.Fatalf("clean string altered: %v", got["file_path"])
+ }
+}
+
+func TestInputMapNil(t *testing.T) {
+ t.Parallel()
+ if got := InputMap(nil); got != nil {
+ t.Fatalf("expected nil, got: %v", got)
+ }
+}
+
func TestRedactMultipleSecrets(t *testing.T) {
t.Parallel()
input := "Keys: AKIAIOSFODNN7EXAMPLE and ghp_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmn"