From a80d61f8e1ae8bea931491af6433b5f3180a37c9 Mon Sep 17 00:00:00 2001 From: LinYushen Date: Thu, 2 Apr 2026 14:12:00 +0800 Subject: [PATCH] fix(task): enforce per-issue serial execution in task claiming (#330) Add NOT EXISTS check to ClaimAgentTask SQL to prevent claiming a queued task when the same issue already has a dispatched/running task. This ensures serial execution within an issue while preserving parallel execution across different issues (concurrency group pattern). Also add defensive guard in the frontend task:dispatch handler to avoid replacing an active task's LiveLog timeline mid-execution. Closes MUL-183 Co-authored-by: Claude Opus 4.6 (1M context) --- apps/web/features/issues/components/agent-live-card.tsx | 7 +++++-- server/pkg/db/generated/agent.sql.go | 9 +++++++++ server/pkg/db/queries/agent.sql | 9 +++++++++ 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/apps/web/features/issues/components/agent-live-card.tsx b/apps/web/features/issues/components/agent-live-card.tsx index 6917fc0f..aff2a124 100644 --- a/apps/web/features/issues/components/agent-live-card.tsx +++ b/apps/web/features/issues/components/agent-live-card.tsx @@ -194,10 +194,13 @@ export function AgentLiveCard({ issueId, agentName }: AgentLiveCardProps) { }, [issueId]), ); - // Pick up new tasks + // Pick up new tasks — skip if we're already showing an active task to avoid + // replacing its timeline mid-execution (per-issue serialization in the + // backend prevents this race, but this is a defensive safeguard). useWSEvent( "task:dispatch", useCallback(() => { + if (activeTask) return; api.getActiveTaskForIssue(issueId).then(({ task }) => { if (task) { setActiveTask(task); @@ -205,7 +208,7 @@ export function AgentLiveCard({ issueId, agentName }: AgentLiveCardProps) { seenSeqs.current.clear(); } }).catch(() => {}); - }, [issueId]), + }, [issueId, activeTask]), ); // Elapsed time diff --git a/server/pkg/db/generated/agent.sql.go b/server/pkg/db/generated/agent.sql.go index befca00e..06984472 100644 --- a/server/pkg/db/generated/agent.sql.go +++ b/server/pkg/db/generated/agent.sql.go @@ -59,6 +59,11 @@ SET status = 'dispatched', dispatched_at = now() WHERE id = ( SELECT atq.id FROM agent_task_queue atq WHERE atq.agent_id = $1 AND atq.status = 'queued' + AND NOT EXISTS ( + SELECT 1 FROM agent_task_queue active + WHERE active.issue_id = atq.issue_id + AND active.status IN ('dispatched', 'running') + ) ORDER BY atq.priority DESC, atq.created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED @@ -66,6 +71,10 @@ WHERE id = ( RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id ` +// Claims the next queued task for an agent, enforcing per-issue serialization: +// a task is only claimable when no other task for the same issue is already +// dispatched or running. This guarantees serial execution within an issue +// while allowing parallel execution across different issues. func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (AgentTaskQueue, error) { row := q.db.QueryRow(ctx, claimAgentTask, agentID) var i AgentTaskQueue diff --git a/server/pkg/db/queries/agent.sql b/server/pkg/db/queries/agent.sql index 4511200b..b55476b8 100644 --- a/server/pkg/db/queries/agent.sql +++ b/server/pkg/db/queries/agent.sql @@ -60,11 +60,20 @@ SELECT * FROM agent_task_queue WHERE id = $1; -- name: ClaimAgentTask :one +-- Claims the next queued task for an agent, enforcing per-issue serialization: +-- a task is only claimable when no other task for the same issue is already +-- dispatched or running. This guarantees serial execution within an issue +-- while allowing parallel execution across different issues. UPDATE agent_task_queue SET status = 'dispatched', dispatched_at = now() WHERE id = ( SELECT atq.id FROM agent_task_queue atq WHERE atq.agent_id = $1 AND atq.status = 'queued' + AND NOT EXISTS ( + SELECT 1 FROM agent_task_queue active + WHERE active.issue_id = atq.issue_id + AND active.status IN ('dispatched', 'running') + ) ORDER BY atq.priority DESC, atq.created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED