From b94108768e2cf63b4d587ec7e288f95ab11c8bc1 Mon Sep 17 00:00:00 2001 From: Jiang Bohan Date: Tue, 7 Apr 2026 18:19:57 +0800 Subject: [PATCH] feat: support multiple agents running concurrently on the same issue - Relax ClaimAgentTask SQL constraint from per-issue to per-(issue, agent) serialization, allowing different agents to run in parallel on the same issue - Update GetActiveTaskForIssue API to return all active tasks (array) instead of just the first one - Refactor AgentLiveCard to render one card per active task, routing WebSocket messages by task_id for independent timelines - Fix shouldEnqueueOnComment to use per-agent dedup so a mentioned agent's pending task doesn't block the assigned agent's on_comment trigger Closes MUL-160 --- .../app/(dashboard)/issues/[id]/page.test.tsx | 2 +- .../issues/components/agent-live-card.tsx | 178 +++++++++++------- .../issues/components/issue-detail.tsx | 1 - apps/web/shared/api/client.ts | 2 +- server/internal/handler/daemon.go | 15 +- server/internal/handler/issue.go | 9 +- server/pkg/db/generated/agent.sql.go | 9 +- server/pkg/db/queries/agent.sql | 9 +- 8 files changed, 139 insertions(+), 86 deletions(-) diff --git a/apps/web/app/(dashboard)/issues/[id]/page.test.tsx b/apps/web/app/(dashboard)/issues/[id]/page.test.tsx index 7ec44b49..3360f765 100644 --- a/apps/web/app/(dashboard)/issues/[id]/page.test.tsx +++ b/apps/web/app/(dashboard)/issues/[id]/page.test.tsx @@ -182,7 +182,7 @@ vi.mock("@/shared/api", () => ({ listIssueSubscribers: vi.fn().mockResolvedValue([]), subscribeToIssue: vi.fn().mockResolvedValue(undefined), unsubscribeFromIssue: vi.fn().mockResolvedValue(undefined), - getActiveTaskForIssue: vi.fn().mockResolvedValue({ task: null }), + getActiveTasksForIssue: vi.fn().mockResolvedValue({ tasks: [] }), listTasksByIssue: vi.fn().mockResolvedValue([]), listTaskMessages: vi.fn().mockResolvedValue([]), }, diff --git a/apps/web/features/issues/components/agent-live-card.tsx b/apps/web/features/issues/components/agent-live-card.tsx index a7d2ca34..08dcb381 100644 --- a/apps/web/features/issues/components/agent-live-card.tsx +++ b/apps/web/features/issues/components/agent-live-card.tsx @@ -95,49 +95,51 @@ function buildTimeline(msgs: TaskMessagePayload[]): TimelineItem[] { return items.sort((a, b) => a.seq - b.seq); } -// ─── AgentLiveCard (real-time view) ──────────────────────────────────────── +// ─── Per-task state ───────────────────────────────────────────────────────── + +interface TaskState { + task: AgentTask; + items: TimelineItem[]; +} + +// ─── AgentLiveCard (real-time view for multiple agents) ─────────────────── interface AgentLiveCardProps { issueId: string; - agentName?: string; /** Scroll container ref — used to auto-collapse timeline on outer scroll. */ scrollContainerRef?: React.RefObject; } -export function AgentLiveCard({ issueId, agentName, scrollContainerRef }: AgentLiveCardProps) { +export function AgentLiveCard({ issueId, scrollContainerRef }: AgentLiveCardProps) { const { getActorName } = useActorName(); - const [activeTask, setActiveTask] = useState(null); - const [items, setItems] = useState([]); - const [elapsed, setElapsed] = useState(""); - const [open, setOpen] = useState(false); - const [autoScroll, setAutoScroll] = useState(true); - const [cancelling, setCancelling] = useState(false); - const scrollRef = useRef(null); - const ignoreScrollRef = useRef(false); + const [taskStates, setTaskStates] = useState>(new Map()); const seenSeqs = useRef(new Set()); - // Check for active task on mount + // Fetch active tasks on mount useEffect(() => { let cancelled = false; - api.getActiveTaskForIssue(issueId).then(({ task }) => { - if (!cancelled) { - setActiveTask(task); - if (task) { - api.listTaskMessages(task.id).then((msgs) => { - if (!cancelled) { - const timeline = buildTimeline(msgs); - setItems(timeline); - for (const m of msgs) seenSeqs.current.add(`${m.task_id}:${m.seq}`); - } - }).catch(console.error); + api.getActiveTasksForIssue(issueId).then(({ tasks }) => { + if (cancelled || tasks.length === 0) return; + const newStates = new Map(); + const loadPromises = tasks.map(async (task) => { + try { + const msgs = await api.listTaskMessages(task.id); + const timeline = buildTimeline(msgs); + for (const m of msgs) seenSeqs.current.add(`${m.task_id}:${m.seq}`); + newStates.set(task.id, { task, items: timeline }); + } catch { + newStates.set(task.id, { task, items: [] }); } - } + }); + Promise.all(loadPromises).then(() => { + if (!cancelled) setTaskStates(newStates); + }); }).catch(console.error); return () => { cancelled = true; }; }, [issueId]); - // Handle real-time task messages + // Handle real-time task messages — route by task_id useWSEvent( "task:message", useCallback((payload: unknown) => { @@ -147,64 +149,109 @@ export function AgentLiveCard({ issueId, agentName, scrollContainerRef }: AgentL if (seenSeqs.current.has(key)) return; seenSeqs.current.add(key); - setItems((prev) => { - const item: TimelineItem = { - seq: msg.seq, - type: msg.type, - tool: msg.tool, - content: msg.content, - input: msg.input, - output: msg.output, - }; - const next = [...prev, item]; - next.sort((a, b) => a.seq - b.seq); + const item: TimelineItem = { + seq: msg.seq, + type: msg.type, + tool: msg.tool, + content: msg.content, + input: msg.input, + output: msg.output, + }; + + setTaskStates((prev) => { + const next = new Map(prev); + const existing = next.get(msg.task_id); + if (existing) { + const items = [...existing.items, item].sort((a, b) => a.seq - b.seq); + next.set(msg.task_id, { ...existing, items }); + } + // If we don't have this task yet, the dispatch handler will pick it up return next; }); }, [issueId]), ); - // Handle task completion/failure/cancellation + // Handle task end events — remove only the specific task const handleTaskEnd = useCallback((payload: unknown) => { - const p = payload as { issue_id: string }; + const p = payload as { task_id: string; issue_id: string }; if (p.issue_id !== issueId) return; - setActiveTask(null); - setItems([]); - seenSeqs.current.clear(); - setCancelling(false); - setOpen(false); + setTaskStates((prev) => { + const next = new Map(prev); + next.delete(p.task_id); + return next; + }); }, [issueId]); useWSEvent("task:completed", handleTaskEnd); useWSEvent("task:failed", handleTaskEnd); useWSEvent("task:cancelled", handleTaskEnd); - // Pick up new tasks + // Pick up newly dispatched tasks useWSEvent( "task:dispatch", useCallback(() => { - if (activeTask) return; - api.getActiveTaskForIssue(issueId).then(({ task }) => { - if (task) { - setActiveTask(task); - setItems([]); - seenSeqs.current.clear(); - setOpen(false); - } + api.getActiveTasksForIssue(issueId).then(({ tasks }) => { + setTaskStates((prev) => { + const next = new Map(prev); + for (const task of tasks) { + if (!next.has(task.id)) { + next.set(task.id, { task, items: [] }); + } + } + return next; + }); }).catch(console.error); - }, [issueId, activeTask]), + }, [issueId]), ); + if (taskStates.size === 0) return null; + + const entries = Array.from(taskStates.values()); + + return ( +
+ {entries.map(({ task, items }) => ( + + ))} +
+ ); +} + +// ─── SingleAgentLiveCard (one card per running task) ────────────────────── + +interface SingleAgentLiveCardProps { + task: AgentTask; + items: TimelineItem[]; + issueId: string; + agentName: string; + scrollContainerRef?: React.RefObject; +} + +function SingleAgentLiveCard({ task, items, issueId, agentName, scrollContainerRef }: SingleAgentLiveCardProps) { + const [elapsed, setElapsed] = useState(""); + const [open, setOpen] = useState(false); + const [autoScroll, setAutoScroll] = useState(true); + const [cancelling, setCancelling] = useState(false); + const scrollRef = useRef(null); + const ignoreScrollRef = useRef(false); + // Elapsed time useEffect(() => { - if (!activeTask?.started_at && !activeTask?.dispatched_at) return; - const startRef = activeTask.started_at ?? activeTask.dispatched_at!; + if (!task.started_at && !task.dispatched_at) return; + const startRef = task.started_at ?? task.dispatched_at!; setElapsed(formatElapsed(startRef)); const interval = setInterval(() => setElapsed(formatElapsed(startRef)), 1000); return () => clearInterval(interval); - }, [activeTask?.started_at, activeTask?.dispatched_at]); + }, [task.started_at, task.dispatched_at]); // Auto-collapse timeline when outer scroll container scrolls - // (ignoreScrollRef prevents layout-induced scroll from collapsing right after expand) useEffect(() => { const container = scrollContainerRef?.current; if (!container) return; @@ -240,23 +287,20 @@ export function AgentLiveCard({ issueId, agentName, scrollContainerRef }: AgentL }, [open]); const handleCancel = useCallback(async () => { - if (!activeTask || cancelling) return; + if (cancelling) return; setCancelling(true); try { - await api.cancelTask(issueId, activeTask.id); + await api.cancelTask(issueId, task.id); } catch (e) { toast.error(e instanceof Error ? e.message : "Failed to cancel task"); setCancelling(false); } - }, [activeTask, issueId, cancelling]); - - if (!activeTask) return null; + }, [task.id, issueId, cancelling]); const toolCount = items.filter((i) => i.type === "tool_use").length; - const name = (activeTask.agent_id ? getActorName("agent", activeTask.agent_id) : agentName) ?? "Agent"; return ( -
+
{/* Header — click to toggle timeline */}
- {activeTask.agent_id ? ( - + {task.agent_id ? ( + ) : (
@@ -280,7 +324,7 @@ export function AgentLiveCard({ issueId, agentName, scrollContainerRef }: AgentL )}
- {name} is working + {agentName} is working {elapsed} {toolCount > 0 && ( {toolCount} tools diff --git a/apps/web/features/issues/components/issue-detail.tsx b/apps/web/features/issues/components/issue-detail.tsx index 43c38885..815d25fc 100644 --- a/apps/web/features/issues/components/issue-detail.tsx +++ b/apps/web/features/issues/components/issue-detail.tsx @@ -783,7 +783,6 @@ export function IssueDetail({ issueId, onDelete, defaultSidebarOpen = true, layo {/* Agent live output */} diff --git a/apps/web/shared/api/client.ts b/apps/web/shared/api/client.ts index 5d7d2e79..cf0ad4f0 100644 --- a/apps/web/shared/api/client.ts +++ b/apps/web/shared/api/client.ts @@ -379,7 +379,7 @@ export class ApiClient { return this.fetch(`/api/agents/${agentId}/tasks`); } - async getActiveTaskForIssue(issueId: string): Promise<{ task: AgentTask | null }> { + async getActiveTasksForIssue(issueId: string): Promise<{ tasks: AgentTask[] }> { return this.fetch(`/api/issues/${issueId}/active-task`); } diff --git a/server/internal/handler/daemon.go b/server/internal/handler/daemon.go index bab4ce4d..d04d8fe8 100644 --- a/server/internal/handler/daemon.go +++ b/server/internal/handler/daemon.go @@ -536,17 +536,22 @@ func (h *Handler) ListTaskMessages(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, resp) } -// GetActiveTaskForIssue returns the currently running task for an issue, if any. +// GetActiveTaskForIssue returns all currently active tasks for an issue. +// Returns { tasks: [...] } array (may be empty). func (h *Handler) GetActiveTaskForIssue(w http.ResponseWriter, r *http.Request) { issueID := chi.URLParam(r, "id") tasks, err := h.Queries.ListActiveTasksByIssue(r.Context(), parseUUID(issueID)) - if err != nil || len(tasks) == 0 { - writeJSON(w, http.StatusOK, map[string]any{"task": nil}) - return + if err != nil { + tasks = nil } - writeJSON(w, http.StatusOK, map[string]any{"task": taskToResponse(tasks[0])}) + resp := make([]AgentTaskResponse, len(tasks)) + for i, t := range tasks { + resp[i] = taskToResponse(t) + } + + writeJSON(w, http.StatusOK, map[string]any{"tasks": resp}) } // CancelTask cancels a running or queued task by ID. diff --git a/server/internal/handler/issue.go b/server/internal/handler/issue.go index 0259bb21..4fe715c1 100644 --- a/server/internal/handler/issue.go +++ b/server/internal/handler/issue.go @@ -527,9 +527,12 @@ func (h *Handler) shouldEnqueueOnComment(ctx context.Context, issue db.Issue) bo return false } // Coalescing queue: allow enqueue when a task is running (so the agent - // picks up new comments on the next cycle) but skip if a pending task - // already exists (natural dedup for rapid-fire comments). - hasPending, err := h.Queries.HasPendingTaskForIssue(ctx, issue.ID) + // picks up new comments on the next cycle) but skip if this agent already + // has a pending task (natural dedup for rapid-fire comments). + hasPending, err := h.Queries.HasPendingTaskForIssueAndAgent(ctx, db.HasPendingTaskForIssueAndAgentParams{ + IssueID: issue.ID, + AgentID: issue.AssigneeID, + }) if err != nil || hasPending { return false } diff --git a/server/pkg/db/generated/agent.sql.go b/server/pkg/db/generated/agent.sql.go index bc785374..f639208c 100644 --- a/server/pkg/db/generated/agent.sql.go +++ b/server/pkg/db/generated/agent.sql.go @@ -111,6 +111,7 @@ WHERE id = ( AND NOT EXISTS ( SELECT 1 FROM agent_task_queue active WHERE active.issue_id = atq.issue_id + AND active.agent_id = atq.agent_id AND active.status IN ('dispatched', 'running') ) ORDER BY atq.priority DESC, atq.created_at ASC @@ -120,10 +121,10 @@ WHERE id = ( RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id ` -// Claims the next queued task for an agent, enforcing per-issue serialization: -// a task is only claimable when no other task for the same issue is already -// dispatched or running. This guarantees serial execution within an issue -// while allowing parallel execution across different issues. +// Claims the next queued task for an agent, enforcing per-(issue, agent) serialization: +// a task is only claimable when no other task for the same issue AND same agent is +// already dispatched or running. This allows different agents to work on the same +// issue in parallel while preventing a single agent from running duplicate tasks. func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (AgentTaskQueue, error) { row := q.db.QueryRow(ctx, claimAgentTask, agentID) var i AgentTaskQueue diff --git a/server/pkg/db/queries/agent.sql b/server/pkg/db/queries/agent.sql index 95239b2f..5ef051b7 100644 --- a/server/pkg/db/queries/agent.sql +++ b/server/pkg/db/queries/agent.sql @@ -77,10 +77,10 @@ SELECT * FROM agent_task_queue WHERE id = $1; -- name: ClaimAgentTask :one --- Claims the next queued task for an agent, enforcing per-issue serialization: --- a task is only claimable when no other task for the same issue is already --- dispatched or running. This guarantees serial execution within an issue --- while allowing parallel execution across different issues. +-- Claims the next queued task for an agent, enforcing per-(issue, agent) serialization: +-- a task is only claimable when no other task for the same issue AND same agent is +-- already dispatched or running. This allows different agents to work on the same +-- issue in parallel while preventing a single agent from running duplicate tasks. UPDATE agent_task_queue SET status = 'dispatched', dispatched_at = now() WHERE id = ( @@ -89,6 +89,7 @@ WHERE id = ( AND NOT EXISTS ( SELECT 1 FROM agent_task_queue active WHERE active.issue_id = atq.issue_id + AND active.agent_id = atq.agent_id AND active.status IN ('dispatched', 'running') ) ORDER BY atq.priority DESC, atq.created_at ASC