Merge pull request #471 from multica-ai/agent/j/959392dd

feat: support multiple agents running on same issue
This commit is contained in:
Bohan Jiang 2026-04-08 12:45:56 +08:00 committed by GitHub
commit 98af9f442c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 139 additions and 86 deletions

View file

@ -160,7 +160,7 @@ vi.mock("@/shared/api", () => ({
listIssueSubscribers: vi.fn().mockResolvedValue([]),
subscribeToIssue: vi.fn().mockResolvedValue(undefined),
unsubscribeFromIssue: vi.fn().mockResolvedValue(undefined),
getActiveTaskForIssue: vi.fn().mockResolvedValue({ task: null }),
getActiveTasksForIssue: vi.fn().mockResolvedValue({ tasks: [] }),
listTasksByIssue: vi.fn().mockResolvedValue([]),
listTaskMessages: vi.fn().mockResolvedValue([]),
},

View file

@ -95,49 +95,51 @@ function buildTimeline(msgs: TaskMessagePayload[]): TimelineItem[] {
return items.sort((a, b) => a.seq - b.seq);
}
// ─── AgentLiveCard (real-time view) ────────────────────────────────────────
// ─── Per-task state ─────────────────────────────────────────────────────────
interface TaskState {
task: AgentTask;
items: TimelineItem[];
}
// ─── AgentLiveCard (real-time view for multiple agents) ───────────────────
interface AgentLiveCardProps {
issueId: string;
agentName?: string;
/** Scroll container ref — used to auto-collapse timeline on outer scroll. */
scrollContainerRef?: React.RefObject<HTMLDivElement | null>;
}
export function AgentLiveCard({ issueId, agentName, scrollContainerRef }: AgentLiveCardProps) {
export function AgentLiveCard({ issueId, scrollContainerRef }: AgentLiveCardProps) {
const { getActorName } = useActorName();
const [activeTask, setActiveTask] = useState<AgentTask | null>(null);
const [items, setItems] = useState<TimelineItem[]>([]);
const [elapsed, setElapsed] = useState("");
const [open, setOpen] = useState(false);
const [autoScroll, setAutoScroll] = useState(true);
const [cancelling, setCancelling] = useState(false);
const scrollRef = useRef<HTMLDivElement>(null);
const ignoreScrollRef = useRef(false);
const [taskStates, setTaskStates] = useState<Map<string, TaskState>>(new Map());
const seenSeqs = useRef(new Set<string>());
// Check for active task on mount
// Fetch active tasks on mount
useEffect(() => {
let cancelled = false;
api.getActiveTaskForIssue(issueId).then(({ task }) => {
if (!cancelled) {
setActiveTask(task);
if (task) {
api.listTaskMessages(task.id).then((msgs) => {
if (!cancelled) {
const timeline = buildTimeline(msgs);
setItems(timeline);
for (const m of msgs) seenSeqs.current.add(`${m.task_id}:${m.seq}`);
}
}).catch(console.error);
api.getActiveTasksForIssue(issueId).then(({ tasks }) => {
if (cancelled || tasks.length === 0) return;
const newStates = new Map<string, TaskState>();
const loadPromises = tasks.map(async (task) => {
try {
const msgs = await api.listTaskMessages(task.id);
const timeline = buildTimeline(msgs);
for (const m of msgs) seenSeqs.current.add(`${m.task_id}:${m.seq}`);
newStates.set(task.id, { task, items: timeline });
} catch {
newStates.set(task.id, { task, items: [] });
}
}
});
Promise.all(loadPromises).then(() => {
if (!cancelled) setTaskStates(newStates);
});
}).catch(console.error);
return () => { cancelled = true; };
}, [issueId]);
// Handle real-time task messages
// Handle real-time task messages — route by task_id
useWSEvent(
"task:message",
useCallback((payload: unknown) => {
@ -147,64 +149,109 @@ export function AgentLiveCard({ issueId, agentName, scrollContainerRef }: AgentL
if (seenSeqs.current.has(key)) return;
seenSeqs.current.add(key);
setItems((prev) => {
const item: TimelineItem = {
seq: msg.seq,
type: msg.type,
tool: msg.tool,
content: msg.content,
input: msg.input,
output: msg.output,
};
const next = [...prev, item];
next.sort((a, b) => a.seq - b.seq);
const item: TimelineItem = {
seq: msg.seq,
type: msg.type,
tool: msg.tool,
content: msg.content,
input: msg.input,
output: msg.output,
};
setTaskStates((prev) => {
const next = new Map(prev);
const existing = next.get(msg.task_id);
if (existing) {
const items = [...existing.items, item].sort((a, b) => a.seq - b.seq);
next.set(msg.task_id, { ...existing, items });
}
// If we don't have this task yet, the dispatch handler will pick it up
return next;
});
}, [issueId]),
);
// Handle task completion/failure/cancellation
// Handle task end events — remove only the specific task
const handleTaskEnd = useCallback((payload: unknown) => {
const p = payload as { issue_id: string };
const p = payload as { task_id: string; issue_id: string };
if (p.issue_id !== issueId) return;
setActiveTask(null);
setItems([]);
seenSeqs.current.clear();
setCancelling(false);
setOpen(false);
setTaskStates((prev) => {
const next = new Map(prev);
next.delete(p.task_id);
return next;
});
}, [issueId]);
useWSEvent("task:completed", handleTaskEnd);
useWSEvent("task:failed", handleTaskEnd);
useWSEvent("task:cancelled", handleTaskEnd);
// Pick up new tasks
// Pick up newly dispatched tasks
useWSEvent(
"task:dispatch",
useCallback(() => {
if (activeTask) return;
api.getActiveTaskForIssue(issueId).then(({ task }) => {
if (task) {
setActiveTask(task);
setItems([]);
seenSeqs.current.clear();
setOpen(false);
}
api.getActiveTasksForIssue(issueId).then(({ tasks }) => {
setTaskStates((prev) => {
const next = new Map(prev);
for (const task of tasks) {
if (!next.has(task.id)) {
next.set(task.id, { task, items: [] });
}
}
return next;
});
}).catch(console.error);
}, [issueId, activeTask]),
}, [issueId]),
);
if (taskStates.size === 0) return null;
const entries = Array.from(taskStates.values());
return (
<div className="mt-4 space-y-2">
{entries.map(({ task, items }) => (
<SingleAgentLiveCard
key={task.id}
task={task}
items={items}
issueId={issueId}
agentName={task.agent_id ? getActorName("agent", task.agent_id) : "Agent"}
scrollContainerRef={scrollContainerRef}
/>
))}
</div>
);
}
// ─── SingleAgentLiveCard (one card per running task) ──────────────────────
interface SingleAgentLiveCardProps {
task: AgentTask;
items: TimelineItem[];
issueId: string;
agentName: string;
scrollContainerRef?: React.RefObject<HTMLDivElement | null>;
}
function SingleAgentLiveCard({ task, items, issueId, agentName, scrollContainerRef }: SingleAgentLiveCardProps) {
const [elapsed, setElapsed] = useState("");
const [open, setOpen] = useState(false);
const [autoScroll, setAutoScroll] = useState(true);
const [cancelling, setCancelling] = useState(false);
const scrollRef = useRef<HTMLDivElement>(null);
const ignoreScrollRef = useRef(false);
// Elapsed time
useEffect(() => {
if (!activeTask?.started_at && !activeTask?.dispatched_at) return;
const startRef = activeTask.started_at ?? activeTask.dispatched_at!;
if (!task.started_at && !task.dispatched_at) return;
const startRef = task.started_at ?? task.dispatched_at!;
setElapsed(formatElapsed(startRef));
const interval = setInterval(() => setElapsed(formatElapsed(startRef)), 1000);
return () => clearInterval(interval);
}, [activeTask?.started_at, activeTask?.dispatched_at]);
}, [task.started_at, task.dispatched_at]);
// Auto-collapse timeline when outer scroll container scrolls
// (ignoreScrollRef prevents layout-induced scroll from collapsing right after expand)
useEffect(() => {
const container = scrollContainerRef?.current;
if (!container) return;
@ -240,23 +287,20 @@ export function AgentLiveCard({ issueId, agentName, scrollContainerRef }: AgentL
}, [open]);
const handleCancel = useCallback(async () => {
if (!activeTask || cancelling) return;
if (cancelling) return;
setCancelling(true);
try {
await api.cancelTask(issueId, activeTask.id);
await api.cancelTask(issueId, task.id);
} catch (e) {
toast.error(e instanceof Error ? e.message : "Failed to cancel task");
setCancelling(false);
}
}, [activeTask, issueId, cancelling]);
if (!activeTask) return null;
}, [task.id, issueId, cancelling]);
const toolCount = items.filter((i) => i.type === "tool_use").length;
const name = (activeTask.agent_id ? getActorName("agent", activeTask.agent_id) : agentName) ?? "Agent";
return (
<div className="mt-4 sticky top-4 z-10 rounded-lg border border-info/20 bg-info/5 backdrop-blur-sm">
<div className="sticky top-4 z-10 rounded-lg border border-info/20 bg-info/5 backdrop-blur-sm">
{/* Header — click to toggle timeline */}
<div
className="group flex items-center gap-2 px-3 py-2 cursor-pointer select-none text-muted-foreground hover:text-foreground transition-colors"
@ -271,8 +315,8 @@ export function AgentLiveCard({ issueId, agentName, scrollContainerRef }: AgentL
}
}}
>
{activeTask.agent_id ? (
<ActorAvatar actorType="agent" actorId={activeTask.agent_id} size={20} />
{task.agent_id ? (
<ActorAvatar actorType="agent" actorId={task.agent_id} size={20} />
) : (
<div className="flex items-center justify-center h-5 w-5 rounded-full shrink-0 bg-info/10 text-info">
<Bot className="h-3 w-3" />
@ -280,7 +324,7 @@ export function AgentLiveCard({ issueId, agentName, scrollContainerRef }: AgentL
)}
<div className="flex items-center gap-1.5 text-xs min-w-0">
<Loader2 className="h-3 w-3 animate-spin text-info shrink-0" />
<span className="font-medium text-foreground truncate">{name} is working</span>
<span className="font-medium text-foreground truncate">{agentName} is working</span>
<span className="text-muted-foreground tabular-nums shrink-0">{elapsed}</span>
{toolCount > 0 && (
<span className="text-muted-foreground shrink-0">{toolCount} tools</span>

View file

@ -759,7 +759,6 @@ export function IssueDetail({ issueId, onDelete, defaultSidebarOpen = true, layo
{/* Agent live output */}
<AgentLiveCard
issueId={id}
agentName={issue.assignee_type === "agent" && issue.assignee_id ? getActorName("agent", issue.assignee_id) : undefined}
scrollContainerRef={scrollContainerRef}
/>

View file

@ -380,7 +380,7 @@ export class ApiClient {
return this.fetch(`/api/agents/${agentId}/tasks`);
}
async getActiveTaskForIssue(issueId: string): Promise<{ task: AgentTask | null }> {
async getActiveTasksForIssue(issueId: string): Promise<{ tasks: AgentTask[] }> {
return this.fetch(`/api/issues/${issueId}/active-task`);
}

View file

@ -536,17 +536,22 @@ func (h *Handler) ListTaskMessages(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, resp)
}
// GetActiveTaskForIssue returns the currently running task for an issue, if any.
// GetActiveTaskForIssue returns all currently active tasks for an issue.
// Returns { tasks: [...] } array (may be empty).
func (h *Handler) GetActiveTaskForIssue(w http.ResponseWriter, r *http.Request) {
issueID := chi.URLParam(r, "id")
tasks, err := h.Queries.ListActiveTasksByIssue(r.Context(), parseUUID(issueID))
if err != nil || len(tasks) == 0 {
writeJSON(w, http.StatusOK, map[string]any{"task": nil})
return
if err != nil {
tasks = nil
}
writeJSON(w, http.StatusOK, map[string]any{"task": taskToResponse(tasks[0])})
resp := make([]AgentTaskResponse, len(tasks))
for i, t := range tasks {
resp[i] = taskToResponse(t)
}
writeJSON(w, http.StatusOK, map[string]any{"tasks": resp})
}
// CancelTask cancels a running or queued task by ID.

View file

@ -549,9 +549,12 @@ func (h *Handler) shouldEnqueueOnComment(ctx context.Context, issue db.Issue) bo
return false
}
// Coalescing queue: allow enqueue when a task is running (so the agent
// picks up new comments on the next cycle) but skip if a pending task
// already exists (natural dedup for rapid-fire comments).
hasPending, err := h.Queries.HasPendingTaskForIssue(ctx, issue.ID)
// picks up new comments on the next cycle) but skip if this agent already
// has a pending task (natural dedup for rapid-fire comments).
hasPending, err := h.Queries.HasPendingTaskForIssueAndAgent(ctx, db.HasPendingTaskForIssueAndAgentParams{
IssueID: issue.ID,
AgentID: issue.AssigneeID,
})
if err != nil || hasPending {
return false
}

View file

@ -109,6 +109,7 @@ WHERE id = (
AND NOT EXISTS (
SELECT 1 FROM agent_task_queue active
WHERE active.issue_id = atq.issue_id
AND active.agent_id = atq.agent_id
AND active.status IN ('dispatched', 'running')
)
ORDER BY atq.priority DESC, atq.created_at ASC
@ -118,10 +119,10 @@ WHERE id = (
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id
`
// Claims the next queued task for an agent, enforcing per-issue serialization:
// a task is only claimable when no other task for the same issue is already
// dispatched or running. This guarantees serial execution within an issue
// while allowing parallel execution across different issues.
// Claims the next queued task for an agent, enforcing per-(issue, agent) serialization:
// a task is only claimable when no other task for the same issue AND same agent is
// already dispatched or running. This allows different agents to work on the same
// issue in parallel while preventing a single agent from running duplicate tasks.
func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (AgentTaskQueue, error) {
row := q.db.QueryRow(ctx, claimAgentTask, agentID)
var i AgentTaskQueue

View file

@ -75,10 +75,10 @@ SELECT * FROM agent_task_queue
WHERE id = $1;
-- name: ClaimAgentTask :one
-- Claims the next queued task for an agent, enforcing per-issue serialization:
-- a task is only claimable when no other task for the same issue is already
-- dispatched or running. This guarantees serial execution within an issue
-- while allowing parallel execution across different issues.
-- Claims the next queued task for an agent, enforcing per-(issue, agent) serialization:
-- a task is only claimable when no other task for the same issue AND same agent is
-- already dispatched or running. This allows different agents to work on the same
-- issue in parallel while preventing a single agent from running duplicate tasks.
UPDATE agent_task_queue
SET status = 'dispatched', dispatched_at = now()
WHERE id = (
@ -87,6 +87,7 @@ WHERE id = (
AND NOT EXISTS (
SELECT 1 FROM agent_task_queue active
WHERE active.issue_id = atq.issue_id
AND active.agent_id = atq.agent_id
AND active.status IN ('dispatched', 'running')
)
ORDER BY atq.priority DESC, atq.created_at ASC