diff --git a/server/cmd/server/runtime_sweeper.go b/server/cmd/server/runtime_sweeper.go index 2e58c0bf..fdb6b6d5 100644 --- a/server/cmd/server/runtime_sweeper.go +++ b/server/cmd/server/runtime_sweeper.go @@ -5,6 +5,7 @@ import ( "log/slog" "time" + "github.com/jackc/pgx/v5/pgtype" "github.com/multica-ai/multica/server/internal/events" "github.com/multica-ai/multica/server/internal/util" db "github.com/multica-ai/multica/server/pkg/db/generated" @@ -72,18 +73,7 @@ func sweepStaleRuntimes(ctx context.Context, queries *db.Queries, bus *events.Bu slog.Warn("runtime sweeper: failed to clean up stale tasks", "error", err) } else if len(failedTasks) > 0 { slog.Info("runtime sweeper: failed orphaned tasks", "count", len(failedTasks)) - for _, ft := range failedTasks { - bus.Publish(events.Event{ - Type: protocol.EventTaskFailed, - ActorType: "system", - Payload: map[string]any{ - "task_id": util.UUIDToString(ft.ID), - "agent_id": util.UUIDToString(ft.AgentID), - "issue_id": util.UUIDToString(ft.IssueID), - "status": "failed", - }, - }) - } + broadcastFailedTasks(ctx, queries, bus, failedTasks) } // Notify frontend clients so they re-fetch runtime list. @@ -118,10 +108,44 @@ func sweepStaleTasks(ctx context.Context, queries *db.Queries, bus *events.Bus) } slog.Info("task sweeper: failed stale tasks", "count", len(failedTasks)) - for _, ft := range failedTasks { + broadcastFailedTasks(ctx, queries, bus, failedTasks) +} + +// failedTask is a common interface for both sweeper result types. +type failedTask struct { + ID pgtype.UUID + AgentID pgtype.UUID + IssueID pgtype.UUID +} + +// broadcastFailedTasks publishes task:failed events with the correct WorkspaceID +// and reconciles agent status for all affected agents. +func broadcastFailedTasks(ctx context.Context, queries *db.Queries, bus *events.Bus, tasks any) { + var items []failedTask + switch ts := tasks.(type) { + case []db.FailStaleTasksRow: + for _, t := range ts { + items = append(items, failedTask{ID: t.ID, AgentID: t.AgentID, IssueID: t.IssueID}) + } + case []db.FailTasksForOfflineRuntimesRow: + for _, t := range ts { + items = append(items, failedTask{ID: t.ID, AgentID: t.AgentID, IssueID: t.IssueID}) + } + } + + affectedAgents := make(map[string]pgtype.UUID) + + for _, ft := range items { + // Look up workspace ID from the issue so the event reaches the right WS room. + workspaceID := "" + if issue, err := queries.GetIssue(ctx, ft.IssueID); err == nil { + workspaceID = util.UUIDToString(issue.WorkspaceID) + } + bus.Publish(events.Event{ - Type: protocol.EventTaskFailed, - ActorType: "system", + Type: protocol.EventTaskFailed, + WorkspaceID: workspaceID, + ActorType: "system", Payload: map[string]any{ "task_id": util.UUIDToString(ft.ID), "agent_id": util.UUIDToString(ft.AgentID), @@ -129,5 +153,38 @@ func sweepStaleTasks(ctx context.Context, queries *db.Queries, bus *events.Bus) "status": "failed", }, }) + + agentKey := util.UUIDToString(ft.AgentID) + affectedAgents[agentKey] = ft.AgentID + } + + // Reconcile status for each affected agent. + for _, agentID := range affectedAgents { + reconcileAgentStatus(ctx, queries, bus, agentID) } } + +// reconcileAgentStatus checks running task count and updates agent status. +func reconcileAgentStatus(ctx context.Context, queries *db.Queries, bus *events.Bus, agentID pgtype.UUID) { + running, err := queries.CountRunningTasks(ctx, agentID) + if err != nil { + return + } + newStatus := "idle" + if running > 0 { + newStatus = "working" + } + agent, err := queries.UpdateAgentStatus(ctx, db.UpdateAgentStatusParams{ + ID: agentID, + Status: newStatus, + }) + if err != nil { + return + } + bus.Publish(events.Event{ + Type: protocol.EventAgentStatus, + WorkspaceID: util.UUIDToString(agent.WorkspaceID), + ActorType: "system", + Payload: map[string]any{"agent_id": util.UUIDToString(agent.ID), "status": agent.Status}, + }) +}