feat: support multiple agents running concurrently on the same issue

- Relax ClaimAgentTask SQL constraint from per-issue to per-(issue, agent)
  serialization, allowing different agents to run in parallel on the same issue
- Update GetActiveTaskForIssue API to return all active tasks (array) instead of
  just the first one
- Refactor AgentLiveCard to render one card per active task, routing WebSocket
  messages by task_id for independent timelines
- Fix shouldEnqueueOnComment to use per-agent dedup so a mentioned agent's
  pending task doesn't block the assigned agent's on_comment trigger

Closes MUL-160
This commit is contained in:
Jiang Bohan 2026-04-07 18:19:57 +08:00
parent 638033c9ff
commit b94108768e
8 changed files with 139 additions and 86 deletions

View file

@ -182,7 +182,7 @@ vi.mock("@/shared/api", () => ({
listIssueSubscribers: vi.fn().mockResolvedValue([]), listIssueSubscribers: vi.fn().mockResolvedValue([]),
subscribeToIssue: vi.fn().mockResolvedValue(undefined), subscribeToIssue: vi.fn().mockResolvedValue(undefined),
unsubscribeFromIssue: vi.fn().mockResolvedValue(undefined), unsubscribeFromIssue: vi.fn().mockResolvedValue(undefined),
getActiveTaskForIssue: vi.fn().mockResolvedValue({ task: null }), getActiveTasksForIssue: vi.fn().mockResolvedValue({ tasks: [] }),
listTasksByIssue: vi.fn().mockResolvedValue([]), listTasksByIssue: vi.fn().mockResolvedValue([]),
listTaskMessages: vi.fn().mockResolvedValue([]), listTaskMessages: vi.fn().mockResolvedValue([]),
}, },

View file

@ -95,49 +95,51 @@ function buildTimeline(msgs: TaskMessagePayload[]): TimelineItem[] {
return items.sort((a, b) => a.seq - b.seq); return items.sort((a, b) => a.seq - b.seq);
} }
// ─── AgentLiveCard (real-time view) ──────────────────────────────────────── // ─── Per-task state ─────────────────────────────────────────────────────────
interface TaskState {
task: AgentTask;
items: TimelineItem[];
}
// ─── AgentLiveCard (real-time view for multiple agents) ───────────────────
interface AgentLiveCardProps { interface AgentLiveCardProps {
issueId: string; issueId: string;
agentName?: string;
/** Scroll container ref — used to auto-collapse timeline on outer scroll. */ /** Scroll container ref — used to auto-collapse timeline on outer scroll. */
scrollContainerRef?: React.RefObject<HTMLDivElement | null>; scrollContainerRef?: React.RefObject<HTMLDivElement | null>;
} }
export function AgentLiveCard({ issueId, agentName, scrollContainerRef }: AgentLiveCardProps) { export function AgentLiveCard({ issueId, scrollContainerRef }: AgentLiveCardProps) {
const { getActorName } = useActorName(); const { getActorName } = useActorName();
const [activeTask, setActiveTask] = useState<AgentTask | null>(null); const [taskStates, setTaskStates] = useState<Map<string, TaskState>>(new Map());
const [items, setItems] = useState<TimelineItem[]>([]);
const [elapsed, setElapsed] = useState("");
const [open, setOpen] = useState(false);
const [autoScroll, setAutoScroll] = useState(true);
const [cancelling, setCancelling] = useState(false);
const scrollRef = useRef<HTMLDivElement>(null);
const ignoreScrollRef = useRef(false);
const seenSeqs = useRef(new Set<string>()); const seenSeqs = useRef(new Set<string>());
// Check for active task on mount // Fetch active tasks on mount
useEffect(() => { useEffect(() => {
let cancelled = false; let cancelled = false;
api.getActiveTaskForIssue(issueId).then(({ task }) => { api.getActiveTasksForIssue(issueId).then(({ tasks }) => {
if (!cancelled) { if (cancelled || tasks.length === 0) return;
setActiveTask(task); const newStates = new Map<string, TaskState>();
if (task) { const loadPromises = tasks.map(async (task) => {
api.listTaskMessages(task.id).then((msgs) => { try {
if (!cancelled) { const msgs = await api.listTaskMessages(task.id);
const timeline = buildTimeline(msgs); const timeline = buildTimeline(msgs);
setItems(timeline); for (const m of msgs) seenSeqs.current.add(`${m.task_id}:${m.seq}`);
for (const m of msgs) seenSeqs.current.add(`${m.task_id}:${m.seq}`); newStates.set(task.id, { task, items: timeline });
} } catch {
}).catch(console.error); newStates.set(task.id, { task, items: [] });
} }
} });
Promise.all(loadPromises).then(() => {
if (!cancelled) setTaskStates(newStates);
});
}).catch(console.error); }).catch(console.error);
return () => { cancelled = true; }; return () => { cancelled = true; };
}, [issueId]); }, [issueId]);
// Handle real-time task messages // Handle real-time task messages — route by task_id
useWSEvent( useWSEvent(
"task:message", "task:message",
useCallback((payload: unknown) => { useCallback((payload: unknown) => {
@ -147,64 +149,109 @@ export function AgentLiveCard({ issueId, agentName, scrollContainerRef }: AgentL
if (seenSeqs.current.has(key)) return; if (seenSeqs.current.has(key)) return;
seenSeqs.current.add(key); seenSeqs.current.add(key);
setItems((prev) => { const item: TimelineItem = {
const item: TimelineItem = { seq: msg.seq,
seq: msg.seq, type: msg.type,
type: msg.type, tool: msg.tool,
tool: msg.tool, content: msg.content,
content: msg.content, input: msg.input,
input: msg.input, output: msg.output,
output: msg.output, };
};
const next = [...prev, item]; setTaskStates((prev) => {
next.sort((a, b) => a.seq - b.seq); const next = new Map(prev);
const existing = next.get(msg.task_id);
if (existing) {
const items = [...existing.items, item].sort((a, b) => a.seq - b.seq);
next.set(msg.task_id, { ...existing, items });
}
// If we don't have this task yet, the dispatch handler will pick it up
return next; return next;
}); });
}, [issueId]), }, [issueId]),
); );
// Handle task completion/failure/cancellation // Handle task end events — remove only the specific task
const handleTaskEnd = useCallback((payload: unknown) => { const handleTaskEnd = useCallback((payload: unknown) => {
const p = payload as { issue_id: string }; const p = payload as { task_id: string; issue_id: string };
if (p.issue_id !== issueId) return; if (p.issue_id !== issueId) return;
setActiveTask(null); setTaskStates((prev) => {
setItems([]); const next = new Map(prev);
seenSeqs.current.clear(); next.delete(p.task_id);
setCancelling(false); return next;
setOpen(false); });
}, [issueId]); }, [issueId]);
useWSEvent("task:completed", handleTaskEnd); useWSEvent("task:completed", handleTaskEnd);
useWSEvent("task:failed", handleTaskEnd); useWSEvent("task:failed", handleTaskEnd);
useWSEvent("task:cancelled", handleTaskEnd); useWSEvent("task:cancelled", handleTaskEnd);
// Pick up new tasks // Pick up newly dispatched tasks
useWSEvent( useWSEvent(
"task:dispatch", "task:dispatch",
useCallback(() => { useCallback(() => {
if (activeTask) return; api.getActiveTasksForIssue(issueId).then(({ tasks }) => {
api.getActiveTaskForIssue(issueId).then(({ task }) => { setTaskStates((prev) => {
if (task) { const next = new Map(prev);
setActiveTask(task); for (const task of tasks) {
setItems([]); if (!next.has(task.id)) {
seenSeqs.current.clear(); next.set(task.id, { task, items: [] });
setOpen(false); }
} }
return next;
});
}).catch(console.error); }).catch(console.error);
}, [issueId, activeTask]), }, [issueId]),
); );
if (taskStates.size === 0) return null;
const entries = Array.from(taskStates.values());
return (
<div className="mt-4 space-y-2">
{entries.map(({ task, items }) => (
<SingleAgentLiveCard
key={task.id}
task={task}
items={items}
issueId={issueId}
agentName={task.agent_id ? getActorName("agent", task.agent_id) : "Agent"}
scrollContainerRef={scrollContainerRef}
/>
))}
</div>
);
}
// ─── SingleAgentLiveCard (one card per running task) ──────────────────────
interface SingleAgentLiveCardProps {
task: AgentTask;
items: TimelineItem[];
issueId: string;
agentName: string;
scrollContainerRef?: React.RefObject<HTMLDivElement | null>;
}
function SingleAgentLiveCard({ task, items, issueId, agentName, scrollContainerRef }: SingleAgentLiveCardProps) {
const [elapsed, setElapsed] = useState("");
const [open, setOpen] = useState(false);
const [autoScroll, setAutoScroll] = useState(true);
const [cancelling, setCancelling] = useState(false);
const scrollRef = useRef<HTMLDivElement>(null);
const ignoreScrollRef = useRef(false);
// Elapsed time // Elapsed time
useEffect(() => { useEffect(() => {
if (!activeTask?.started_at && !activeTask?.dispatched_at) return; if (!task.started_at && !task.dispatched_at) return;
const startRef = activeTask.started_at ?? activeTask.dispatched_at!; const startRef = task.started_at ?? task.dispatched_at!;
setElapsed(formatElapsed(startRef)); setElapsed(formatElapsed(startRef));
const interval = setInterval(() => setElapsed(formatElapsed(startRef)), 1000); const interval = setInterval(() => setElapsed(formatElapsed(startRef)), 1000);
return () => clearInterval(interval); return () => clearInterval(interval);
}, [activeTask?.started_at, activeTask?.dispatched_at]); }, [task.started_at, task.dispatched_at]);
// Auto-collapse timeline when outer scroll container scrolls // Auto-collapse timeline when outer scroll container scrolls
// (ignoreScrollRef prevents layout-induced scroll from collapsing right after expand)
useEffect(() => { useEffect(() => {
const container = scrollContainerRef?.current; const container = scrollContainerRef?.current;
if (!container) return; if (!container) return;
@ -240,23 +287,20 @@ export function AgentLiveCard({ issueId, agentName, scrollContainerRef }: AgentL
}, [open]); }, [open]);
const handleCancel = useCallback(async () => { const handleCancel = useCallback(async () => {
if (!activeTask || cancelling) return; if (cancelling) return;
setCancelling(true); setCancelling(true);
try { try {
await api.cancelTask(issueId, activeTask.id); await api.cancelTask(issueId, task.id);
} catch (e) { } catch (e) {
toast.error(e instanceof Error ? e.message : "Failed to cancel task"); toast.error(e instanceof Error ? e.message : "Failed to cancel task");
setCancelling(false); setCancelling(false);
} }
}, [activeTask, issueId, cancelling]); }, [task.id, issueId, cancelling]);
if (!activeTask) return null;
const toolCount = items.filter((i) => i.type === "tool_use").length; const toolCount = items.filter((i) => i.type === "tool_use").length;
const name = (activeTask.agent_id ? getActorName("agent", activeTask.agent_id) : agentName) ?? "Agent";
return ( return (
<div className="mt-4 sticky top-4 z-10 rounded-lg border border-info/20 bg-info/5 backdrop-blur-sm"> <div className="sticky top-4 z-10 rounded-lg border border-info/20 bg-info/5 backdrop-blur-sm">
{/* Header — click to toggle timeline */} {/* Header — click to toggle timeline */}
<div <div
className="group flex items-center gap-2 px-3 py-2 cursor-pointer select-none text-muted-foreground hover:text-foreground transition-colors" className="group flex items-center gap-2 px-3 py-2 cursor-pointer select-none text-muted-foreground hover:text-foreground transition-colors"
@ -271,8 +315,8 @@ export function AgentLiveCard({ issueId, agentName, scrollContainerRef }: AgentL
} }
}} }}
> >
{activeTask.agent_id ? ( {task.agent_id ? (
<ActorAvatar actorType="agent" actorId={activeTask.agent_id} size={20} /> <ActorAvatar actorType="agent" actorId={task.agent_id} size={20} />
) : ( ) : (
<div className="flex items-center justify-center h-5 w-5 rounded-full shrink-0 bg-info/10 text-info"> <div className="flex items-center justify-center h-5 w-5 rounded-full shrink-0 bg-info/10 text-info">
<Bot className="h-3 w-3" /> <Bot className="h-3 w-3" />
@ -280,7 +324,7 @@ export function AgentLiveCard({ issueId, agentName, scrollContainerRef }: AgentL
)} )}
<div className="flex items-center gap-1.5 text-xs min-w-0"> <div className="flex items-center gap-1.5 text-xs min-w-0">
<Loader2 className="h-3 w-3 animate-spin text-info shrink-0" /> <Loader2 className="h-3 w-3 animate-spin text-info shrink-0" />
<span className="font-medium text-foreground truncate">{name} is working</span> <span className="font-medium text-foreground truncate">{agentName} is working</span>
<span className="text-muted-foreground tabular-nums shrink-0">{elapsed}</span> <span className="text-muted-foreground tabular-nums shrink-0">{elapsed}</span>
{toolCount > 0 && ( {toolCount > 0 && (
<span className="text-muted-foreground shrink-0">{toolCount} tools</span> <span className="text-muted-foreground shrink-0">{toolCount} tools</span>

View file

@ -783,7 +783,6 @@ export function IssueDetail({ issueId, onDelete, defaultSidebarOpen = true, layo
{/* Agent live output */} {/* Agent live output */}
<AgentLiveCard <AgentLiveCard
issueId={id} issueId={id}
agentName={issue.assignee_type === "agent" && issue.assignee_id ? getActorName("agent", issue.assignee_id) : undefined}
scrollContainerRef={scrollContainerRef} scrollContainerRef={scrollContainerRef}
/> />

View file

@ -379,7 +379,7 @@ export class ApiClient {
return this.fetch(`/api/agents/${agentId}/tasks`); return this.fetch(`/api/agents/${agentId}/tasks`);
} }
async getActiveTaskForIssue(issueId: string): Promise<{ task: AgentTask | null }> { async getActiveTasksForIssue(issueId: string): Promise<{ tasks: AgentTask[] }> {
return this.fetch(`/api/issues/${issueId}/active-task`); return this.fetch(`/api/issues/${issueId}/active-task`);
} }

View file

@ -536,17 +536,22 @@ func (h *Handler) ListTaskMessages(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, resp) writeJSON(w, http.StatusOK, resp)
} }
// GetActiveTaskForIssue returns the currently running task for an issue, if any. // GetActiveTaskForIssue returns all currently active tasks for an issue.
// Returns { tasks: [...] } array (may be empty).
func (h *Handler) GetActiveTaskForIssue(w http.ResponseWriter, r *http.Request) { func (h *Handler) GetActiveTaskForIssue(w http.ResponseWriter, r *http.Request) {
issueID := chi.URLParam(r, "id") issueID := chi.URLParam(r, "id")
tasks, err := h.Queries.ListActiveTasksByIssue(r.Context(), parseUUID(issueID)) tasks, err := h.Queries.ListActiveTasksByIssue(r.Context(), parseUUID(issueID))
if err != nil || len(tasks) == 0 { if err != nil {
writeJSON(w, http.StatusOK, map[string]any{"task": nil}) tasks = nil
return
} }
writeJSON(w, http.StatusOK, map[string]any{"task": taskToResponse(tasks[0])}) resp := make([]AgentTaskResponse, len(tasks))
for i, t := range tasks {
resp[i] = taskToResponse(t)
}
writeJSON(w, http.StatusOK, map[string]any{"tasks": resp})
} }
// CancelTask cancels a running or queued task by ID. // CancelTask cancels a running or queued task by ID.

View file

@ -527,9 +527,12 @@ func (h *Handler) shouldEnqueueOnComment(ctx context.Context, issue db.Issue) bo
return false return false
} }
// Coalescing queue: allow enqueue when a task is running (so the agent // Coalescing queue: allow enqueue when a task is running (so the agent
// picks up new comments on the next cycle) but skip if a pending task // picks up new comments on the next cycle) but skip if this agent already
// already exists (natural dedup for rapid-fire comments). // has a pending task (natural dedup for rapid-fire comments).
hasPending, err := h.Queries.HasPendingTaskForIssue(ctx, issue.ID) hasPending, err := h.Queries.HasPendingTaskForIssueAndAgent(ctx, db.HasPendingTaskForIssueAndAgentParams{
IssueID: issue.ID,
AgentID: issue.AssigneeID,
})
if err != nil || hasPending { if err != nil || hasPending {
return false return false
} }

View file

@ -111,6 +111,7 @@ WHERE id = (
AND NOT EXISTS ( AND NOT EXISTS (
SELECT 1 FROM agent_task_queue active SELECT 1 FROM agent_task_queue active
WHERE active.issue_id = atq.issue_id WHERE active.issue_id = atq.issue_id
AND active.agent_id = atq.agent_id
AND active.status IN ('dispatched', 'running') AND active.status IN ('dispatched', 'running')
) )
ORDER BY atq.priority DESC, atq.created_at ASC ORDER BY atq.priority DESC, atq.created_at ASC
@ -120,10 +121,10 @@ WHERE id = (
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id
` `
// Claims the next queued task for an agent, enforcing per-issue serialization: // Claims the next queued task for an agent, enforcing per-(issue, agent) serialization:
// a task is only claimable when no other task for the same issue is already // a task is only claimable when no other task for the same issue AND same agent is
// dispatched or running. This guarantees serial execution within an issue // already dispatched or running. This allows different agents to work on the same
// while allowing parallel execution across different issues. // issue in parallel while preventing a single agent from running duplicate tasks.
func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (AgentTaskQueue, error) { func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (AgentTaskQueue, error) {
row := q.db.QueryRow(ctx, claimAgentTask, agentID) row := q.db.QueryRow(ctx, claimAgentTask, agentID)
var i AgentTaskQueue var i AgentTaskQueue

View file

@ -77,10 +77,10 @@ SELECT * FROM agent_task_queue
WHERE id = $1; WHERE id = $1;
-- name: ClaimAgentTask :one -- name: ClaimAgentTask :one
-- Claims the next queued task for an agent, enforcing per-issue serialization: -- Claims the next queued task for an agent, enforcing per-(issue, agent) serialization:
-- a task is only claimable when no other task for the same issue is already -- a task is only claimable when no other task for the same issue AND same agent is
-- dispatched or running. This guarantees serial execution within an issue -- already dispatched or running. This allows different agents to work on the same
-- while allowing parallel execution across different issues. -- issue in parallel while preventing a single agent from running duplicate tasks.
UPDATE agent_task_queue UPDATE agent_task_queue
SET status = 'dispatched', dispatched_at = now() SET status = 'dispatched', dispatched_at = now()
WHERE id = ( WHERE id = (
@ -89,6 +89,7 @@ WHERE id = (
AND NOT EXISTS ( AND NOT EXISTS (
SELECT 1 FROM agent_task_queue active SELECT 1 FROM agent_task_queue active
WHERE active.issue_id = atq.issue_id WHERE active.issue_id = atq.issue_id
AND active.agent_id = atq.agent_id
AND active.status IN ('dispatched', 'running') AND active.status IN ('dispatched', 'running')
) )
ORDER BY atq.priority DESC, atq.created_at ASC ORDER BY atq.priority DESC, atq.created_at ASC