Merge pull request #201 from multica-ai/forrestchang/agent-live-output

feat(agent): stream live agent output and execution history
This commit is contained in:
Jiayuan Zhang 2026-03-30 23:51:07 +08:00 committed by GitHub
commit 32e19f847f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 1369 additions and 7 deletions

View file

@ -94,6 +94,7 @@
/* Code blocks */
.rich-text-editor pre {
font-family: var(--font-mono, ui-monospace, monospace);
background: var(--muted);
border-radius: var(--radius);
padding: 0.75rem 1rem;

View file

@ -141,7 +141,7 @@ export function CodeBlock({
if (mode === 'terminal') {
return (
<pre className={cn('font-mono text-sm whitespace-pre-wrap', className)}>
<code>{code}</code>
<code className="font-mono">{code}</code>
</pre>
)
}
@ -151,7 +151,7 @@ export function CodeBlock({
if (isLoading || !highlighted) {
return (
<pre className={cn('font-mono text-sm whitespace-pre-wrap', className)}>
<code>{code}</code>
<code className="font-mono">{code}</code>
</pre>
)
}
@ -159,7 +159,7 @@ export function CodeBlock({
return (
<div
className={cn(
'font-mono text-sm [&_pre]:!bg-transparent [&_pre]:!p-0 [&_pre]:whitespace-pre-wrap [&_pre]:break-all [&_code]:!bg-transparent',
'font-mono text-sm [&_pre]:!bg-transparent [&_pre]:!p-0 [&_pre]:whitespace-pre-wrap [&_pre]:break-all [&_code]:!bg-transparent [&_code]:font-mono [&_pre]:font-mono',
className
)}
dangerouslySetInnerHTML={{ __html: highlighted }}
@ -206,11 +206,11 @@ export function CodeBlock({
<div className="p-3 overflow-x-auto">
{isLoading || !highlighted ? (
<pre className="font-mono text-sm whitespace-pre-wrap break-all">
<code>{code}</code>
<code className="font-mono">{code}</code>
</pre>
) : (
<div
className="font-mono text-sm [&_pre]:!bg-transparent [&_pre]:!m-0 [&_pre]:!p-0 [&_pre]:whitespace-pre-wrap [&_pre]:break-all [&_code]:!bg-transparent"
className="font-mono text-sm [&_pre]:!bg-transparent [&_pre]:!m-0 [&_pre]:!p-0 [&_pre]:whitespace-pre-wrap [&_pre]:break-all [&_code]:!bg-transparent [&_code]:font-mono [&_pre]:font-mono"
dangerouslySetInnerHTML={{ __html: highlighted }}
/>
)}

View file

@ -0,0 +1,507 @@
"use client";
import { useState, useEffect, useCallback, useRef } from "react";
import { Bot, ChevronRight, Loader2, ArrowDown, Brain, AlertCircle, Clock, CheckCircle2, XCircle } from "lucide-react";
import { api } from "@/shared/api";
import { useWSEvent } from "@/features/realtime";
import type { TaskMessagePayload, TaskCompletedPayload, TaskFailedPayload } from "@/shared/types/events";
import type { AgentTask } from "@/shared/types/agent";
import { cn } from "@/lib/utils";
import { Collapsible, CollapsibleContent, CollapsibleTrigger } from "@/components/ui/collapsible";
import { redactSecrets } from "../utils/redact";
// ─── Shared types & helpers ─────────────────────────────────────────────────
/** A unified timeline entry: tool calls, thinking, text, and errors in chronological order. */
interface TimelineItem {
seq: number;
type: "tool_use" | "tool_result" | "thinking" | "text" | "error";
tool?: string;
content?: string;
input?: Record<string, unknown>;
output?: string;
}
function formatElapsed(startedAt: string): string {
const elapsed = Date.now() - new Date(startedAt).getTime();
const seconds = Math.floor(elapsed / 1000);
if (seconds < 60) return `${seconds}s`;
const minutes = Math.floor(seconds / 60);
const secs = seconds % 60;
return `${minutes}m ${secs}s`;
}
function formatDuration(start: string, end: string): string {
const ms = new Date(end).getTime() - new Date(start).getTime();
const seconds = Math.floor(ms / 1000);
if (seconds < 60) return `${seconds}s`;
const minutes = Math.floor(seconds / 60);
const secs = seconds % 60;
return `${minutes}m ${secs}s`;
}
function shortenPath(p: string): string {
const parts = p.split("/");
if (parts.length <= 3) return p;
return ".../" + parts.slice(-2).join("/");
}
function getToolSummary(item: TimelineItem): string {
if (!item.input) return "";
const inp = item.input as Record<string, string>;
// WebSearch / web search
if (inp.query) return inp.query;
// File operations
if (inp.file_path) return shortenPath(inp.file_path);
if (inp.path) return shortenPath(inp.path);
if (inp.pattern) return inp.pattern;
// Bash
if (inp.description) return String(inp.description);
if (inp.command) {
const cmd = String(inp.command);
return cmd.length > 100 ? cmd.slice(0, 100) + "..." : cmd;
}
// Agent
if (inp.prompt) {
const p = String(inp.prompt);
return p.length > 100 ? p.slice(0, 100) + "..." : p;
}
// Skill
if (inp.skill) return String(inp.skill);
// Fallback: show first string value
for (const v of Object.values(inp)) {
if (typeof v === "string" && v.length > 0 && v.length < 120) return v;
}
return "";
}
/** Build a chronologically ordered timeline from raw messages. */
function buildTimeline(msgs: TaskMessagePayload[]): TimelineItem[] {
const items: TimelineItem[] = [];
for (const msg of msgs) {
items.push({
seq: msg.seq,
type: msg.type,
tool: msg.tool,
content: msg.content ? redactSecrets(msg.content) : msg.content,
input: msg.input,
output: msg.output ? redactSecrets(msg.output) : msg.output,
});
}
return items.sort((a, b) => a.seq - b.seq);
}
// ─── AgentLiveCard (real-time view) ────────────────────────────────────────
interface AgentLiveCardProps {
issueId: string;
assigneeType: string | null;
assigneeId: string | null;
agentName?: string;
}
export function AgentLiveCard({ issueId, assigneeType, assigneeId, agentName }: AgentLiveCardProps) {
const [activeTask, setActiveTask] = useState<AgentTask | null>(null);
const [items, setItems] = useState<TimelineItem[]>([]);
const [elapsed, setElapsed] = useState("");
const [autoScroll, setAutoScroll] = useState(true);
const scrollRef = useRef<HTMLDivElement>(null);
const seenSeqs = useRef(new Set<string>());
// Check for active task on mount
useEffect(() => {
if (assigneeType !== "agent" || !assigneeId) {
setActiveTask(null);
return;
}
let cancelled = false;
api.getActiveTaskForIssue(issueId).then(({ task }) => {
if (!cancelled) {
setActiveTask(task);
if (task) {
api.listTaskMessages(task.id).then((msgs) => {
if (!cancelled) {
const timeline = buildTimeline(msgs);
setItems(timeline);
for (const m of msgs) seenSeqs.current.add(`${m.task_id}:${m.seq}`);
}
}).catch(() => {});
}
}
}).catch(() => {});
return () => { cancelled = true; };
}, [issueId, assigneeType, assigneeId]);
// Handle real-time task messages
useWSEvent(
"task:message",
useCallback((payload: unknown) => {
const msg = payload as TaskMessagePayload;
if (msg.issue_id !== issueId) return;
const key = `${msg.task_id}:${msg.seq}`;
if (seenSeqs.current.has(key)) return;
seenSeqs.current.add(key);
setItems((prev) => {
const item: TimelineItem = {
seq: msg.seq,
type: msg.type,
tool: msg.tool,
content: msg.content,
input: msg.input,
output: msg.output,
};
const next = [...prev, item];
next.sort((a, b) => a.seq - b.seq);
return next;
});
}, [issueId]),
);
// Handle task completion/failure
useWSEvent(
"task:completed",
useCallback((payload: unknown) => {
const p = payload as TaskCompletedPayload;
if (p.issue_id !== issueId) return;
setActiveTask(null);
setItems([]);
seenSeqs.current.clear();
}, [issueId]),
);
useWSEvent(
"task:failed",
useCallback((payload: unknown) => {
const p = payload as TaskFailedPayload;
if (p.issue_id !== issueId) return;
setActiveTask(null);
setItems([]);
seenSeqs.current.clear();
}, [issueId]),
);
// Pick up new tasks
useWSEvent(
"task:dispatch",
useCallback(() => {
api.getActiveTaskForIssue(issueId).then(({ task }) => {
if (task) {
setActiveTask(task);
setItems([]);
seenSeqs.current.clear();
}
}).catch(() => {});
}, [issueId]),
);
// Elapsed time
useEffect(() => {
if (!activeTask?.started_at && !activeTask?.dispatched_at) return;
const ref = activeTask.started_at ?? activeTask.dispatched_at!;
setElapsed(formatElapsed(ref));
const interval = setInterval(() => setElapsed(formatElapsed(ref)), 1000);
return () => clearInterval(interval);
}, [activeTask?.started_at, activeTask?.dispatched_at]);
// Auto-scroll
useEffect(() => {
if (autoScroll && scrollRef.current) {
scrollRef.current.scrollTop = scrollRef.current.scrollHeight;
}
}, [items, autoScroll]);
const handleScroll = useCallback(() => {
if (!scrollRef.current) return;
const { scrollTop, scrollHeight, clientHeight } = scrollRef.current;
setAutoScroll(scrollHeight - scrollTop - clientHeight < 40);
}, []);
if (!activeTask) return null;
const toolCount = items.filter((i) => i.type === "tool_use").length;
return (
<div className="rounded-lg border border-info/20 bg-info/5">
{/* Header */}
<div className="flex items-center gap-2 px-3 py-2">
<div className="flex items-center justify-center h-5 w-5 rounded-full bg-info/10 text-info shrink-0">
<Bot className="h-3 w-3" />
</div>
<div className="flex items-center gap-1.5 text-xs font-medium min-w-0">
<Loader2 className="h-3 w-3 animate-spin text-info shrink-0" />
<span className="truncate">{agentName ?? "Agent"} is working</span>
</div>
<span className="ml-auto text-xs text-muted-foreground tabular-nums shrink-0">{elapsed}</span>
{toolCount > 0 && (
<span className="text-xs text-muted-foreground shrink-0">
{toolCount} tool {toolCount === 1 ? "call" : "calls"}
</span>
)}
</div>
{/* Timeline content */}
{items.length > 0 && (
<div
ref={scrollRef}
onScroll={handleScroll}
className="relative max-h-80 overflow-y-auto border-t border-info/10 px-3 py-2 space-y-0.5"
>
{items.map((item, idx) => (
<TimelineRow key={`${item.seq}-${idx}`} item={item} />
))}
{!autoScroll && (
<button
onClick={() => {
if (scrollRef.current) {
scrollRef.current.scrollTop = scrollRef.current.scrollHeight;
setAutoScroll(true);
}
}}
className="sticky bottom-0 left-1/2 -translate-x-1/2 flex items-center gap-1 rounded-full bg-background border px-2 py-0.5 text-xs text-muted-foreground hover:text-foreground shadow-sm"
>
<ArrowDown className="h-3 w-3" />
Latest
</button>
)}
</div>
)}
</div>
);
}
// ─── TaskRunHistory (past execution logs) ──────────────────────────────────
interface TaskRunHistoryProps {
issueId: string;
assigneeType: string | null;
}
export function TaskRunHistory({ issueId, assigneeType }: TaskRunHistoryProps) {
const [tasks, setTasks] = useState<AgentTask[]>([]);
const [open, setOpen] = useState(false);
useEffect(() => {
if (assigneeType !== "agent") return;
api.listTasksByIssue(issueId).then(setTasks).catch(() => {});
}, [issueId, assigneeType]);
// Refresh when a task completes
useWSEvent(
"task:completed",
useCallback((payload: unknown) => {
const p = payload as TaskCompletedPayload;
if (p.issue_id !== issueId) return;
api.listTasksByIssue(issueId).then(setTasks).catch(() => {});
}, [issueId]),
);
useWSEvent(
"task:failed",
useCallback((payload: unknown) => {
const p = payload as TaskFailedPayload;
if (p.issue_id !== issueId) return;
api.listTasksByIssue(issueId).then(setTasks).catch(() => {});
}, [issueId]),
);
const completedTasks = tasks.filter((t) => t.status === "completed" || t.status === "failed");
if (completedTasks.length === 0) return null;
return (
<Collapsible open={open} onOpenChange={setOpen}>
<CollapsibleTrigger className="flex w-full items-center gap-1.5 text-xs text-muted-foreground hover:text-foreground transition-colors py-1">
<ChevronRight className={cn("h-3 w-3 transition-transform", open && "rotate-90")} />
<Clock className="h-3 w-3" />
<span>Execution history ({completedTasks.length})</span>
</CollapsibleTrigger>
<CollapsibleContent>
<div className="mt-1 space-y-2">
{completedTasks.map((task) => (
<TaskRunEntry key={task.id} task={task} />
))}
</div>
</CollapsibleContent>
</Collapsible>
);
}
function TaskRunEntry({ task }: { task: AgentTask }) {
const [open, setOpen] = useState(false);
const [items, setItems] = useState<TimelineItem[] | null>(null);
const loadMessages = useCallback(() => {
if (items !== null) return; // already loaded
api.listTaskMessages(task.id).then((msgs) => {
setItems(buildTimeline(msgs));
}).catch(() => setItems([]));
}, [task.id, items]);
useEffect(() => {
if (open) loadMessages();
}, [open, loadMessages]);
const duration = task.started_at && task.completed_at
? formatDuration(task.started_at, task.completed_at)
: null;
return (
<Collapsible open={open} onOpenChange={setOpen}>
<CollapsibleTrigger className="flex w-full items-center gap-2 rounded px-2 py-1.5 text-xs hover:bg-accent/30 transition-colors border border-transparent hover:border-border">
<ChevronRight className={cn("h-3 w-3 shrink-0 text-muted-foreground transition-transform", open && "rotate-90")} />
{task.status === "completed" ? (
<CheckCircle2 className="h-3.5 w-3.5 shrink-0 text-success" />
) : (
<XCircle className="h-3.5 w-3.5 shrink-0 text-destructive" />
)}
<span className="text-muted-foreground">
{new Date(task.created_at).toLocaleString(undefined, { month: "short", day: "numeric", hour: "2-digit", minute: "2-digit" })}
</span>
{duration && <span className="text-muted-foreground">{duration}</span>}
<span className={cn("ml-auto capitalize", task.status === "completed" ? "text-success" : "text-destructive")}>
{task.status}
</span>
</CollapsibleTrigger>
<CollapsibleContent>
<div className="ml-5 mt-1 max-h-64 overflow-y-auto rounded border bg-muted/30 px-3 py-2 space-y-0.5">
{items === null ? (
<div className="flex items-center gap-2 text-xs text-muted-foreground py-2">
<Loader2 className="h-3 w-3 animate-spin" />
Loading...
</div>
) : items.length === 0 ? (
<p className="text-xs text-muted-foreground py-2">No execution data recorded.</p>
) : (
items.map((item, idx) => (
<TimelineRow key={`${item.seq}-${idx}`} item={item} />
))
)}
</div>
</CollapsibleContent>
</Collapsible>
);
}
// ─── Shared timeline row rendering ──────────────────────────────────────────
function TimelineRow({ item }: { item: TimelineItem }) {
switch (item.type) {
case "tool_use":
return <ToolCallRow item={item} />;
case "tool_result":
return <ToolResultRow item={item} />;
case "thinking":
return <ThinkingRow item={item} />;
case "text":
return <TextRow item={item} />;
case "error":
return <ErrorRow item={item} />;
default:
return null;
}
}
function ToolCallRow({ item }: { item: TimelineItem }) {
const [open, setOpen] = useState(false);
const summary = getToolSummary(item);
const hasInput = item.input && Object.keys(item.input).length > 0;
return (
<Collapsible open={open} onOpenChange={setOpen}>
<CollapsibleTrigger className="flex w-full items-center gap-1.5 rounded px-1 -mx-1 py-0.5 text-xs hover:bg-accent/30 transition-colors">
<ChevronRight
className={cn(
"h-3 w-3 shrink-0 text-muted-foreground transition-transform",
open && "rotate-90",
!hasInput && "invisible",
)}
/>
<span className="font-medium text-foreground shrink-0">{item.tool}</span>
{summary && <span className="truncate text-muted-foreground">{summary}</span>}
</CollapsibleTrigger>
{hasInput && (
<CollapsibleContent>
<pre className="ml-[18px] mt-0.5 max-h-32 overflow-auto rounded bg-muted/50 p-2 text-[11px] text-muted-foreground whitespace-pre-wrap break-all">
{redactSecrets(JSON.stringify(item.input, null, 2))}
</pre>
</CollapsibleContent>
)}
</Collapsible>
);
}
function ToolResultRow({ item }: { item: TimelineItem }) {
const [open, setOpen] = useState(false);
const output = item.output ?? "";
if (!output) return null;
const preview = output.length > 120 ? output.slice(0, 120) + "..." : output;
return (
<Collapsible open={open} onOpenChange={setOpen}>
<CollapsibleTrigger className="flex w-full items-start gap-1.5 rounded px-1 -mx-1 py-0.5 text-xs hover:bg-accent/30 transition-colors">
<ChevronRight
className={cn("h-3 w-3 shrink-0 text-muted-foreground transition-transform mt-0.5", open && "rotate-90")}
/>
<span className="text-muted-foreground/70 truncate">
{item.tool ? `${item.tool} result: ` : "result: "}{preview}
</span>
</CollapsibleTrigger>
<CollapsibleContent>
<pre className="ml-[18px] mt-0.5 max-h-40 overflow-auto rounded bg-muted/50 p-2 text-[11px] text-muted-foreground whitespace-pre-wrap break-all">
{output.length > 4000 ? output.slice(0, 4000) + "\n... (truncated)" : output}
</pre>
</CollapsibleContent>
</Collapsible>
);
}
function ThinkingRow({ item }: { item: TimelineItem }) {
const [open, setOpen] = useState(false);
const text = item.content ?? "";
if (!text) return null;
const preview = text.length > 150 ? text.slice(0, 150) + "..." : text;
return (
<Collapsible open={open} onOpenChange={setOpen}>
<CollapsibleTrigger className="flex w-full items-start gap-1.5 rounded px-1 -mx-1 py-0.5 text-xs hover:bg-accent/30 transition-colors">
<Brain className="h-3 w-3 shrink-0 text-info/60 mt-0.5" />
<span className="text-muted-foreground italic truncate">{preview}</span>
</CollapsibleTrigger>
<CollapsibleContent>
<pre className="ml-[18px] mt-0.5 max-h-40 overflow-auto rounded bg-info/5 p-2 text-[11px] text-muted-foreground whitespace-pre-wrap break-words">
{text}
</pre>
</CollapsibleContent>
</Collapsible>
);
}
function TextRow({ item }: { item: TimelineItem }) {
const text = item.content ?? "";
if (!text.trim()) return null;
const lines = text.trim().split("\n").filter(Boolean);
const last = lines[lines.length - 1] ?? "";
if (!last) return null;
return (
<div className="flex items-start gap-1.5 px-1 -mx-1 py-0.5 text-xs">
<span className="h-3 w-3 shrink-0" />
<span className="text-muted-foreground/60 truncate">{last}</span>
</div>
);
}
function ErrorRow({ item }: { item: TimelineItem }) {
return (
<div className="flex items-start gap-1.5 px-1 -mx-1 py-0.5 text-xs">
<AlertCircle className="h-3 w-3 shrink-0 text-destructive mt-0.5" />
<span className="text-destructive">{item.content}</span>
</div>
);
}

View file

@ -60,6 +60,7 @@ import { ALL_STATUSES, STATUS_CONFIG, PRIORITY_ORDER, PRIORITY_CONFIG } from "@/
import { StatusIcon, PriorityIcon, DueDatePicker } from "@/features/issues/components";
import { CommentCard } from "./comment-card";
import { CommentInput } from "./comment-input";
import { AgentLiveCard, TaskRunHistory } from "./agent-live-card";
import { api } from "@/shared/api";
import { useAuthStore } from "@/features/auth";
import { useWorkspaceStore, useActorName } from "@/features/workspace";
@ -1019,6 +1020,21 @@ export function IssueDetail({ issueId, onDelete, defaultSidebarOpen = true, layo
</div>
</div>
{/* Agent live output */}
<div className="mt-4">
<AgentLiveCard
issueId={id}
assigneeType={issue.assignee_type}
assigneeId={issue.assignee_id}
agentName={issue.assignee_type === "agent" && issue.assignee_id ? getActorName("agent", issue.assignee_id) : undefined}
/>
</div>
{/* Agent execution history */}
<div className="mt-3">
<TaskRunHistory issueId={id} assigneeType={issue.assignee_type} />
</div>
{/* Timeline entries */}
<div className="mt-4 flex flex-col gap-3">
{(() => {

View file

@ -0,0 +1,88 @@
import { describe, it, expect } from "vitest";
import { redactSecrets } from "./redact";
describe("redactSecrets", () => {
it("redacts AWS access key", () => {
const result = redactSecrets("key: AKIAIOSFODNN7EXAMPLE");
expect(result).not.toContain("AKIAIOSFODNN7EXAMPLE");
expect(result).toContain("[REDACTED AWS KEY]");
});
it("redacts AWS secret key", () => {
const result = redactSecrets("aws_secret_access_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY");
expect(result).not.toContain("wJalrXUtnFEMI");
});
it("redacts PEM private keys", () => {
const input = "-----BEGIN RSA PRIVATE KEY-----\nMIIEow...\n-----END RSA PRIVATE KEY-----";
const result = redactSecrets(input);
expect(result).not.toContain("MIIEow");
expect(result).toContain("[REDACTED PRIVATE KEY]");
});
it("redacts GitHub tokens", () => {
const result = redactSecrets("GITHUB_TOKEN=ghp_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmn");
expect(result).not.toContain("ghp_");
});
it("redacts GitLab tokens", () => {
const result = redactSecrets("glpat-AbCdEfGhIjKlMnOpQrStUvWx");
expect(result).not.toContain("glpat-");
expect(result).toContain("[REDACTED GITLAB TOKEN]");
});
it("redacts OpenAI/Anthropic API keys", () => {
const result = redactSecrets("sk-proj-abc123def456ghi789jkl012mno345");
expect(result).not.toContain("sk-proj");
expect(result).toContain("[REDACTED API KEY]");
});
it("redacts Slack tokens", () => {
const result = redactSecrets("xoxb-123456789012-1234567890123-AbCdEfGhIjKl");
expect(result).not.toContain("xoxb-");
});
it("redacts JWT tokens", () => {
const result = redactSecrets("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c");
expect(result).not.toContain("eyJhbGci");
expect(result).toContain("[REDACTED JWT]");
});
it("redacts Bearer tokens", () => {
const result = redactSecrets("Authorization: Bearer abc123xyz.def456");
expect(result).toContain("Bearer [REDACTED]");
expect(result).not.toContain("abc123xyz");
});
it("redacts connection strings", () => {
const result = redactSecrets("postgres://admin:s3cret@db.example.com:5432/mydb");
expect(result).not.toContain("s3cret");
});
it("redacts generic credential env vars", () => {
for (const key of ["PASSWORD", "SECRET", "TOKEN", "DATABASE_URL", "API_KEY"]) {
const result = redactSecrets(`${key}=supersecretvalue123`);
expect(result).toContain("[REDACTED CREDENTIAL]");
expect(result).not.toContain("supersecretvalue123");
}
});
it("redacts multiple secrets in one string", () => {
const result = redactSecrets("AKIAIOSFODNN7EXAMPLE and ghp_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmn");
expect(result).not.toContain("AKIAIOSFODNN7EXAMPLE");
expect(result).not.toContain("ghp_");
});
it("does not alter normal text", () => {
const inputs = [
"This is a normal commit message about fixing a bug",
"The function returns skip-navigation as the class name",
"Created PR #42 for the authentication feature",
"Running tests in /tmp/test-workspace/project",
"The API endpoint /api/issues/123 was updated",
];
for (const input of inputs) {
expect(redactSecrets(input)).toBe(input);
}
});
});

View file

@ -0,0 +1,37 @@
/**
* Client-side fallback for redacting sensitive information in agent output.
* The server performs primary redaction; this is a safety net for display.
*/
const patterns: { re: RegExp; replacement: string }[] = [
// AWS access key IDs
{ re: /\bAKIA[0-9A-Z]{16}\b/g, replacement: "[REDACTED AWS KEY]" },
// AWS secret access keys
{ re: /(?:aws_secret_access_key|secret_?access_?key)\s*[=:]\s*[A-Za-z0-9/+=]{40}/gi, replacement: "[REDACTED AWS SECRET]" },
// PEM private keys
{ re: /-----BEGIN[A-Z\s]*PRIVATE KEY-----[\s\S]*?-----END[A-Z\s]*PRIVATE KEY-----/g, replacement: "[REDACTED PRIVATE KEY]" },
// GitHub tokens
{ re: /\b(?:ghp|gho|ghu|ghs|ghr)_[A-Za-z0-9_]{36,255}\b/g, replacement: "[REDACTED GITHUB TOKEN]" },
// GitLab personal access tokens
{ re: /\bglpat-[A-Za-z0-9_-]{20,}\b/g, replacement: "[REDACTED GITLAB TOKEN]" },
// OpenAI / Anthropic API keys
{ re: /\bsk-[A-Za-z0-9_-]{20,}\b/g, replacement: "[REDACTED API KEY]" },
// Slack tokens
{ re: /\bxox[bporas]-[A-Za-z0-9-]{10,}\b/g, replacement: "[REDACTED SLACK TOKEN]" },
// JWT tokens
{ re: /\bey[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\b/g, replacement: "[REDACTED JWT]" },
// Bearer tokens
{ re: /\bBearer\s+[A-Za-z0-9\-._~+/]+=*/gi, replacement: "Bearer [REDACTED]" },
// Connection strings with embedded passwords
{ re: /(?:postgres|mysql|mongodb|redis|amqp)(?:ql)?:\/\/[^:\s]+:[^@\s]+@/gi, replacement: "[REDACTED CONNECTION STRING]@" },
// Generic key=value secret env vars
{ re: /(?:API_KEY|API_SECRET|SECRET_KEY|SECRET|ACCESS_TOKEN|AUTH_TOKEN|PRIVATE_KEY|DATABASE_URL|DB_PASSWORD|DB_URL|REDIS_URL|PASSWORD|TOKEN)\s*[=:]\s*\S+/gi, replacement: "[REDACTED CREDENTIAL]" },
];
export function redactSecrets(text: string): string {
let result = text;
for (const { re, replacement } of patterns) {
result = result.replace(re, replacement);
}
return result;
}

View file

@ -34,6 +34,7 @@ import type {
RuntimeHourlyActivity,
RuntimePing,
TimelineEntry,
TaskMessagePayload,
} from "@/shared/types";
import { type Logger, noopLogger } from "@/shared/logger";
@ -339,6 +340,18 @@ export class ApiClient {
return this.fetch(`/api/agents/${agentId}/tasks`);
}
async getActiveTaskForIssue(issueId: string): Promise<{ task: AgentTask | null }> {
return this.fetch(`/api/issues/${issueId}/active-task`);
}
async listTaskMessages(taskId: string): Promise<TaskMessagePayload[]> {
return this.fetch(`/api/daemon/tasks/${taskId}/messages`);
}
async listTasksByIssue(issueId: string): Promise<AgentTask[]> {
return this.fetch(`/api/issues/${issueId}/task-runs`);
}
async getDaemonPairingSession(token: string): Promise<DaemonPairingSession> {
return this.fetch(`/api/daemon/pairing-sessions/${token}`);
}

View file

@ -20,6 +20,7 @@ export type WSEventType =
| "task:progress"
| "task:completed"
| "task:failed"
| "task:message"
| "inbox:new"
| "inbox:read"
| "inbox:archived"
@ -152,6 +153,31 @@ export interface ActivityCreatedPayload {
entry: TimelineEntry;
}
export interface TaskMessagePayload {
task_id: string;
issue_id: string;
seq: number;
type: "text" | "thinking" | "tool_use" | "tool_result" | "error";
tool?: string;
content?: string;
input?: Record<string, unknown>;
output?: string;
}
export interface TaskCompletedPayload {
task_id: string;
agent_id: string;
issue_id: string;
status: string;
}
export interface TaskFailedPayload {
task_id: string;
agent_id: string;
issue_id: string;
status: string;
}
export interface ReactionAddedPayload {
reaction: Reaction;
issue_id: string;

View file

@ -99,6 +99,8 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route
r.Post("/tasks/{taskId}/progress", h.ReportTaskProgress)
r.Post("/tasks/{taskId}/complete", h.CompleteTask)
r.Post("/tasks/{taskId}/fail", h.FailTask)
r.Post("/tasks/{taskId}/messages", h.ReportTaskMessages)
r.Get("/tasks/{taskId}/messages", h.ListTaskMessages)
})
// Protected API routes
@ -164,6 +166,8 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route
r.Get("/subscribers", h.ListIssueSubscribers)
r.Post("/subscribe", h.SubscribeToIssue)
r.Post("/unsubscribe", h.UnsubscribeFromIssue)
r.Get("/active-task", h.GetActiveTaskForIssue)
r.Get("/task-runs", h.ListTasksByIssue)
r.Post("/reactions", h.AddIssueReaction)
r.Delete("/reactions", h.RemoveIssueReaction)
})

View file

@ -83,6 +83,22 @@ func (c *Client) ReportProgress(ctx context.Context, taskID, summary string, ste
}, nil)
}
// TaskMessageData represents a single agent execution message for batch reporting.
type TaskMessageData struct {
Seq int `json:"seq"`
Type string `json:"type"`
Tool string `json:"tool,omitempty"`
Content string `json:"content,omitempty"`
Input map[string]any `json:"input,omitempty"`
Output string `json:"output,omitempty"`
}
func (c *Client) ReportTaskMessages(ctx context.Context, taskID string, messages []TaskMessageData) error {
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/messages", taskID), map[string]any{
"messages": messages,
}, nil)
}
func (c *Client) CompleteTask(ctx context.Context, taskID, output, branchName, sessionID, workDir string) error {
body := map[string]any{"output": output}
if branchName != "" {

View file

@ -821,22 +821,135 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, taskLo
return TaskResult{}, err
}
// Drain message channel — log tool uses and agent text for visibility.
// Drain message channel — forward to server for live output + log locally.
var toolCount atomic.Int32
go func() {
var seq atomic.Int32
var mu sync.Mutex
var pendingText strings.Builder
var pendingThinking strings.Builder
var batch []TaskMessageData
callIDToTool := map[string]string{} // track callID → tool name for tool_result
flush := func() {
mu.Lock()
// Flush any accumulated thinking as a single message.
if pendingThinking.Len() > 0 {
s := seq.Add(1)
batch = append(batch, TaskMessageData{
Seq: int(s),
Type: "thinking",
Content: pendingThinking.String(),
})
pendingThinking.Reset()
}
// Flush any accumulated text as a single message.
if pendingText.Len() > 0 {
s := seq.Add(1)
batch = append(batch, TaskMessageData{
Seq: int(s),
Type: "text",
Content: pendingText.String(),
})
pendingText.Reset()
}
toSend := batch
batch = nil
mu.Unlock()
if len(toSend) > 0 {
sendCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := d.client.ReportTaskMessages(sendCtx, task.ID, toSend); err != nil {
taskLog.Debug("failed to report task messages", "error", err)
}
cancel()
}
}
// Periodically flush accumulated text/thinking messages.
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
done := make(chan struct{})
go func() {
for {
select {
case <-ticker.C:
flush()
case <-done:
return
}
}
}()
for msg := range session.Messages {
switch msg.Type {
case agent.MessageToolUse:
n := toolCount.Add(1)
taskLog.Info(fmt.Sprintf("tool #%d: %s", n, msg.Tool))
if msg.CallID != "" {
mu.Lock()
callIDToTool[msg.CallID] = msg.Tool
mu.Unlock()
}
s := seq.Add(1)
mu.Lock()
batch = append(batch, TaskMessageData{
Seq: int(s),
Type: "tool_use",
Tool: msg.Tool,
Input: msg.Input,
})
mu.Unlock()
case agent.MessageToolResult:
s := seq.Add(1)
output := msg.Output
if len(output) > 8192 {
output = output[:8192]
}
// Resolve tool name from callID if not set directly.
toolName := msg.Tool
if toolName == "" && msg.CallID != "" {
mu.Lock()
toolName = callIDToTool[msg.CallID]
mu.Unlock()
}
mu.Lock()
batch = append(batch, TaskMessageData{
Seq: int(s),
Type: "tool_result",
Tool: toolName,
Output: output,
})
mu.Unlock()
case agent.MessageThinking:
if msg.Content != "" {
mu.Lock()
pendingThinking.WriteString(msg.Content)
mu.Unlock()
}
case agent.MessageText:
if msg.Content != "" {
taskLog.Debug("agent", "text", truncateLog(msg.Content, 200))
mu.Lock()
pendingText.WriteString(msg.Content)
mu.Unlock()
}
case agent.MessageError:
taskLog.Error("agent error", "content", msg.Content)
s := seq.Add(1)
mu.Lock()
batch = append(batch, TaskMessageData{
Seq: int(s),
Type: "error",
Content: msg.Content,
})
mu.Unlock()
}
}
close(done)
flush() // Final flush after channel closes.
}()
result := <-session.Result

View file

@ -8,8 +8,10 @@ import (
"strings"
"github.com/go-chi/chi/v5"
"github.com/jackc/pgx/v5/pgtype"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
"github.com/multica-ai/multica/server/pkg/redact"
)
// ---------------------------------------------------------------------------
@ -385,3 +387,143 @@ func (h *Handler) FailTask(w http.ResponseWriter, r *http.Request) {
slog.Info("task failed", "task_id", taskID, "agent_id", uuidToString(task.AgentID), "task_error", req.Error)
writeJSON(w, http.StatusOK, taskToResponse(*task))
}
// ---------------------------------------------------------------------------
// Task Messages (live agent output)
// ---------------------------------------------------------------------------
type TaskMessageRequest struct {
Seq int `json:"seq"`
Type string `json:"type"`
Tool string `json:"tool,omitempty"`
Content string `json:"content,omitempty"`
Input map[string]any `json:"input,omitempty"`
Output string `json:"output,omitempty"`
}
type TaskMessageBatchRequest struct {
Messages []TaskMessageRequest `json:"messages"`
}
// ReportTaskMessages receives a batch of agent execution messages from the daemon.
func (h *Handler) ReportTaskMessages(w http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "taskId")
var req TaskMessageBatchRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
if len(req.Messages) == 0 {
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
return
}
task, err := h.Queries.GetAgentTask(r.Context(), parseUUID(taskID))
if err != nil {
writeError(w, http.StatusNotFound, "task not found")
return
}
workspaceID := ""
if issue, err := h.Queries.GetIssue(r.Context(), task.IssueID); err == nil {
workspaceID = uuidToString(issue.WorkspaceID)
}
for _, msg := range req.Messages {
// Redact sensitive information before persisting or broadcasting.
msg.Content = redact.Text(msg.Content)
msg.Output = redact.Text(msg.Output)
msg.Input = redact.InputMap(msg.Input)
var inputJSON []byte
if msg.Input != nil {
inputJSON, _ = json.Marshal(msg.Input)
}
h.Queries.CreateTaskMessage(r.Context(), db.CreateTaskMessageParams{
TaskID: parseUUID(taskID),
Seq: int32(msg.Seq),
Type: msg.Type,
Tool: pgtype.Text{String: msg.Tool, Valid: msg.Tool != ""},
Content: pgtype.Text{String: msg.Content, Valid: msg.Content != ""},
Input: inputJSON,
Output: pgtype.Text{String: msg.Output, Valid: msg.Output != ""},
})
if workspaceID != "" {
h.publish(protocol.EventTaskMessage, workspaceID, "system", "", protocol.TaskMessagePayload{
TaskID: taskID,
IssueID: uuidToString(task.IssueID),
Seq: msg.Seq,
Type: msg.Type,
Tool: msg.Tool,
Content: msg.Content,
Input: msg.Input,
Output: msg.Output,
})
}
}
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}
// ListTaskMessages returns the persisted messages for a task (for catch-up after reconnect).
func (h *Handler) ListTaskMessages(w http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "taskId")
messages, err := h.Queries.ListTaskMessages(r.Context(), parseUUID(taskID))
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list task messages")
return
}
resp := make([]protocol.TaskMessagePayload, len(messages))
for i, m := range messages {
var input map[string]any
if m.Input != nil {
json.Unmarshal(m.Input, &input)
}
resp[i] = protocol.TaskMessagePayload{
TaskID: taskID,
Seq: int(m.Seq),
Type: m.Type,
Tool: m.Tool.String,
Content: m.Content.String,
Input: input,
Output: m.Output.String,
}
}
writeJSON(w, http.StatusOK, resp)
}
// GetActiveTaskForIssue returns the currently running task for an issue, if any.
func (h *Handler) GetActiveTaskForIssue(w http.ResponseWriter, r *http.Request) {
issueID := chi.URLParam(r, "id")
tasks, err := h.Queries.ListActiveTasksByIssue(r.Context(), parseUUID(issueID))
if err != nil || len(tasks) == 0 {
writeJSON(w, http.StatusOK, map[string]any{"task": nil})
return
}
writeJSON(w, http.StatusOK, map[string]any{"task": taskToResponse(tasks[0])})
}
// ListTasksByIssue returns all tasks (any status) for an issue — used for execution history.
func (h *Handler) ListTasksByIssue(w http.ResponseWriter, r *http.Request) {
issueID := chi.URLParam(r, "id")
tasks, err := h.Queries.ListTasksByIssue(r.Context(), parseUUID(issueID))
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list tasks")
return
}
resp := make([]AgentTaskResponse, len(tasks))
for i, t := range tasks {
resp[i] = taskToResponse(t)
}
writeJSON(w, http.StatusOK, resp)
}

View file

@ -0,0 +1 @@
DROP TABLE IF EXISTS task_message;

View file

@ -0,0 +1,13 @@
CREATE TABLE task_message (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
task_id UUID NOT NULL REFERENCES agent_task_queue(id) ON DELETE CASCADE,
seq INTEGER NOT NULL,
type TEXT NOT NULL,
tool TEXT,
content TEXT,
input JSONB,
output TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_task_message_task_id_seq ON task_message(task_id, seq);

View file

@ -42,6 +42,7 @@ type MessageType string
const (
MessageText MessageType = "text"
MessageThinking MessageType = "thinking"
MessageToolUse MessageType = "tool-use"
MessageToolResult MessageType = "tool-result"
MessageStatus MessageType = "status"

View file

@ -181,6 +181,10 @@ func (b *claudeBackend) handleAssistant(msg claudeSDKMessage, ch chan<- Message,
output.WriteString(block.Text)
trySend(ch, Message{Type: MessageText, Content: block.Text})
}
case "thinking":
if block.Text != "" {
trySend(ch, Message{Type: MessageThinking, Content: block.Text})
}
case "tool_use":
var input map[string]any
if block.Input != nil {

View file

@ -451,6 +451,48 @@ func (q *Queries) HasPendingTaskForIssue(ctx context.Context, issueID pgtype.UUI
return has_pending, err
}
const listActiveTasksByIssue = `-- name: ListActiveTasksByIssue :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir FROM agent_task_queue
WHERE issue_id = $1 AND status IN ('dispatched', 'running')
ORDER BY created_at DESC
`
func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]AgentTaskQueue, error) {
rows, err := q.db.Query(ctx, listActiveTasksByIssue, issueID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []AgentTaskQueue{}
for rows.Next() {
var i AgentTaskQueue
if err := rows.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listAgentTasks = `-- name: ListAgentTasks :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir FROM agent_task_queue
WHERE agent_id = $1
@ -579,6 +621,48 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp
return items, nil
}
const listTasksByIssue = `-- name: ListTasksByIssue :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir FROM agent_task_queue
WHERE issue_id = $1
ORDER BY created_at DESC
`
func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]AgentTaskQueue, error) {
rows, err := q.db.Query(ctx, listTasksByIssue, issueID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []AgentTaskQueue{}
for rows.Next() {
var i AgentTaskQueue
if err := rows.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const startAgentTask = `-- name: StartAgentTask :one
UPDATE agent_task_queue
SET status = 'running', started_at = now()

View file

@ -261,6 +261,18 @@ type SkillFile struct {
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
}
type TaskMessage struct {
ID pgtype.UUID `json:"id"`
TaskID pgtype.UUID `json:"task_id"`
Seq int32 `json:"seq"`
Type string `json:"type"`
Tool pgtype.Text `json:"tool"`
Content pgtype.Text `json:"content"`
Input []byte `json:"input"`
Output pgtype.Text `json:"output"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
}
type User struct {
ID pgtype.UUID `json:"id"`
Name string `json:"name"`

View file

@ -0,0 +1,140 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.30.0
// source: task_message.sql
package db
import (
"context"
"github.com/jackc/pgx/v5/pgtype"
)
const createTaskMessage = `-- name: CreateTaskMessage :one
INSERT INTO task_message (task_id, seq, type, tool, content, input, output)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id, task_id, seq, type, tool, content, input, output, created_at
`
type CreateTaskMessageParams struct {
TaskID pgtype.UUID `json:"task_id"`
Seq int32 `json:"seq"`
Type string `json:"type"`
Tool pgtype.Text `json:"tool"`
Content pgtype.Text `json:"content"`
Input []byte `json:"input"`
Output pgtype.Text `json:"output"`
}
func (q *Queries) CreateTaskMessage(ctx context.Context, arg CreateTaskMessageParams) (TaskMessage, error) {
row := q.db.QueryRow(ctx, createTaskMessage,
arg.TaskID,
arg.Seq,
arg.Type,
arg.Tool,
arg.Content,
arg.Input,
arg.Output,
)
var i TaskMessage
err := row.Scan(
&i.ID,
&i.TaskID,
&i.Seq,
&i.Type,
&i.Tool,
&i.Content,
&i.Input,
&i.Output,
&i.CreatedAt,
)
return i, err
}
const deleteTaskMessages = `-- name: DeleteTaskMessages :exec
DELETE FROM task_message
WHERE task_id = $1
`
func (q *Queries) DeleteTaskMessages(ctx context.Context, taskID pgtype.UUID) error {
_, err := q.db.Exec(ctx, deleteTaskMessages, taskID)
return err
}
const listTaskMessages = `-- name: ListTaskMessages :many
SELECT id, task_id, seq, type, tool, content, input, output, created_at FROM task_message
WHERE task_id = $1
ORDER BY seq ASC
`
func (q *Queries) ListTaskMessages(ctx context.Context, taskID pgtype.UUID) ([]TaskMessage, error) {
rows, err := q.db.Query(ctx, listTaskMessages, taskID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []TaskMessage{}
for rows.Next() {
var i TaskMessage
if err := rows.Scan(
&i.ID,
&i.TaskID,
&i.Seq,
&i.Type,
&i.Tool,
&i.Content,
&i.Input,
&i.Output,
&i.CreatedAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listTaskMessagesSince = `-- name: ListTaskMessagesSince :many
SELECT id, task_id, seq, type, tool, content, input, output, created_at FROM task_message
WHERE task_id = $1 AND seq > $2
ORDER BY seq ASC
`
type ListTaskMessagesSinceParams struct {
TaskID pgtype.UUID `json:"task_id"`
Seq int32 `json:"seq"`
}
func (q *Queries) ListTaskMessagesSince(ctx context.Context, arg ListTaskMessagesSinceParams) ([]TaskMessage, error) {
rows, err := q.db.Query(ctx, listTaskMessagesSince, arg.TaskID, arg.Seq)
if err != nil {
return nil, err
}
defer rows.Close()
items := []TaskMessage{}
for rows.Next() {
var i TaskMessage
if err := rows.Scan(
&i.ID,
&i.TaskID,
&i.Seq,
&i.Type,
&i.Tool,
&i.Content,
&i.Input,
&i.Output,
&i.CreatedAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}

View file

@ -129,6 +129,16 @@ SELECT * FROM agent_task_queue
WHERE runtime_id = $1 AND status IN ('queued', 'dispatched')
ORDER BY priority DESC, created_at ASC;
-- name: ListActiveTasksByIssue :many
SELECT * FROM agent_task_queue
WHERE issue_id = $1 AND status IN ('dispatched', 'running')
ORDER BY created_at DESC;
-- name: ListTasksByIssue :many
SELECT * FROM agent_task_queue
WHERE issue_id = $1
ORDER BY created_at DESC;
-- name: UpdateAgentStatus :one
UPDATE agent SET status = $2, updated_at = now()
WHERE id = $1

View file

@ -0,0 +1,18 @@
-- name: CreateTaskMessage :one
INSERT INTO task_message (task_id, seq, type, tool, content, input, output)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING *;
-- name: ListTaskMessages :many
SELECT * FROM task_message
WHERE task_id = $1
ORDER BY seq ASC;
-- name: ListTaskMessagesSince :many
SELECT * FROM task_message
WHERE task_id = $1 AND seq > $2
ORDER BY seq ASC;
-- name: DeleteTaskMessages :exec
DELETE FROM task_message
WHERE task_id = $1;

View file

@ -26,6 +26,7 @@ const (
EventTaskProgress = "task:progress"
EventTaskCompleted = "task:completed"
EventTaskFailed = "task:failed"
EventTaskMessage = "task:message"
// Inbox events
EventInboxNew = "inbox:new"

View file

@ -31,6 +31,18 @@ type TaskCompletedPayload struct {
Output string `json:"output,omitempty"`
}
// TaskMessagePayload represents a single agent execution message (tool call, text, etc.)
type TaskMessagePayload struct {
TaskID string `json:"task_id"`
IssueID string `json:"issue_id,omitempty"`
Seq int `json:"seq"`
Type string `json:"type"` // "text", "tool_use", "tool_result", "error"
Tool string `json:"tool,omitempty"` // tool name for tool_use/tool_result
Content string `json:"content,omitempty"` // text content
Input map[string]any `json:"input,omitempty"` // tool input (tool_use only)
Output string `json:"output,omitempty"` // tool output (tool_result only)
}
// DaemonRegisterPayload is sent from daemon to server on connection.
type DaemonRegisterPayload struct {
DaemonID string `json:"daemon_id"`

View file

@ -35,11 +35,37 @@ var patterns = []secretPattern{
// Slack tokens
{regexp.MustCompile(`\bxox[bporas]-[A-Za-z0-9\-]{10,}\b`), "[REDACTED SLACK TOKEN]"},
// GitLab personal access tokens
{regexp.MustCompile(`\bglpat-[A-Za-z0-9_-]{20,}\b`), "[REDACTED GITLAB TOKEN]"},
// JWT tokens (three base64url segments)
{regexp.MustCompile(`\bey[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\b`), "[REDACTED JWT]"},
// Generic "Bearer <token>" in output
{regexp.MustCompile(`(?i)\bBearer\s+[A-Za-z0-9\-._~+/]+=*\b`), "Bearer [REDACTED]"},
// Connection strings with embedded passwords
{regexp.MustCompile(`(?i)(?:postgres|mysql|mongodb|redis|amqp)(?:ql)?://[^:\s]+:[^@\s]+@`), "[REDACTED CONNECTION STRING]@"},
// Generic key=value patterns for common secret env var names
{regexp.MustCompile(`(?i)(?:API_KEY|API_SECRET|SECRET_KEY|ACCESS_TOKEN|AUTH_TOKEN|PRIVATE_KEY|DATABASE_URL|DB_PASSWORD|REDIS_URL)\s*[=:]\s*\S+`), "[REDACTED CREDENTIAL]"},
{regexp.MustCompile(`(?i)(?:API_KEY|API_SECRET|SECRET_KEY|SECRET|ACCESS_TOKEN|AUTH_TOKEN|PRIVATE_KEY|DATABASE_URL|DB_PASSWORD|DB_URL|REDIS_URL|PASSWORD|TOKEN)\s*[=:]\s*\S+`), "[REDACTED CREDENTIAL]"},
}
// InputMap returns a copy of m with all string values passed through Text.
// Non-string values are preserved as-is.
func InputMap(m map[string]any) map[string]any {
if m == nil {
return nil
}
out := make(map[string]any, len(m))
for k, v := range m {
if s, ok := v.(string); ok {
out[k] = Text(s)
} else {
out[k] = v
}
}
return out
}
// homeDir is resolved once at init for path redaction.

View file

@ -126,6 +126,83 @@ func TestNoFalsePositivesOnNormalText(t *testing.T) {
}
}
func TestRedactGitLabToken(t *testing.T) {
t.Parallel()
input := "GITLAB_TOKEN=glpat-AbCdEfGhIjKlMnOpQrStUvWx"
got := Text(input)
if strings.Contains(got, "glpat-") {
t.Fatalf("GitLab token not redacted: %s", got)
}
}
func TestRedactJWT(t *testing.T) {
t.Parallel()
input := "token: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
got := Text(input)
if strings.Contains(got, "eyJhbGci") {
t.Fatalf("JWT not redacted: %s", got)
}
}
func TestRedactConnectionString(t *testing.T) {
t.Parallel()
input := "connecting to postgres://admin:s3cret@db.example.com:5432/mydb"
got := Text(input)
if strings.Contains(got, "s3cret") {
t.Fatalf("connection string password not redacted: %s", got)
}
}
func TestRedactPasswordEnvVar(t *testing.T) {
t.Parallel()
cases := []struct {
name string
input string
}{
{"PASSWORD", "PASSWORD=hunter2"},
{"SECRET", "SECRET=mysecretvalue"},
{"TOKEN", "TOKEN=abc123xyz"},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got := Text(tc.input)
if !strings.Contains(got, "[REDACTED CREDENTIAL]") {
t.Fatalf("expected credential redaction for %s, got: %s", tc.name, got)
}
})
}
}
func TestInputMap(t *testing.T) {
t.Parallel()
m := map[string]any{
"command": "echo sk-proj-abc123def456ghi789jkl012mno345",
"file_path": "/tmp/test.txt",
"count": 42,
}
got := InputMap(m)
if s, ok := got["command"].(string); ok {
if strings.Contains(s, "sk-proj") {
t.Fatalf("API key in input map not redacted: %s", s)
}
}
// Non-string values preserved
if got["count"] != 42 {
t.Fatalf("non-string value altered: %v", got["count"])
}
// Clean strings unchanged
if got["file_path"] != "/tmp/test.txt" {
t.Fatalf("clean string altered: %v", got["file_path"])
}
}
func TestInputMapNil(t *testing.T) {
t.Parallel()
if got := InputMap(nil); got != nil {
t.Fatalf("expected nil, got: %v", got)
}
}
func TestRedactMultipleSecrets(t *testing.T) {
t.Parallel()
input := "Keys: AKIAIOSFODNN7EXAMPLE and ghp_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmn"