Merge pull request #313 from multica-ai/agent/lambda/9c6590aa
fix(server): broadcast sweeper task:failed events to correct workspace
This commit is contained in:
commit
7b19ad7ccc
2 changed files with 397 additions and 15 deletions
|
|
@ -5,6 +5,7 @@ import (
|
|||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/multica-ai/multica/server/internal/events"
|
||||
"github.com/multica-ai/multica/server/internal/util"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
|
|
@ -72,18 +73,7 @@ func sweepStaleRuntimes(ctx context.Context, queries *db.Queries, bus *events.Bu
|
|||
slog.Warn("runtime sweeper: failed to clean up stale tasks", "error", err)
|
||||
} else if len(failedTasks) > 0 {
|
||||
slog.Info("runtime sweeper: failed orphaned tasks", "count", len(failedTasks))
|
||||
for _, ft := range failedTasks {
|
||||
bus.Publish(events.Event{
|
||||
Type: protocol.EventTaskFailed,
|
||||
ActorType: "system",
|
||||
Payload: map[string]any{
|
||||
"task_id": util.UUIDToString(ft.ID),
|
||||
"agent_id": util.UUIDToString(ft.AgentID),
|
||||
"issue_id": util.UUIDToString(ft.IssueID),
|
||||
"status": "failed",
|
||||
},
|
||||
})
|
||||
}
|
||||
broadcastFailedTasks(ctx, queries, bus, failedTasks)
|
||||
}
|
||||
|
||||
// Notify frontend clients so they re-fetch runtime list.
|
||||
|
|
@ -118,10 +108,44 @@ func sweepStaleTasks(ctx context.Context, queries *db.Queries, bus *events.Bus)
|
|||
}
|
||||
|
||||
slog.Info("task sweeper: failed stale tasks", "count", len(failedTasks))
|
||||
for _, ft := range failedTasks {
|
||||
broadcastFailedTasks(ctx, queries, bus, failedTasks)
|
||||
}
|
||||
|
||||
// failedTask is a common interface for both sweeper result types.
|
||||
type failedTask struct {
|
||||
ID pgtype.UUID
|
||||
AgentID pgtype.UUID
|
||||
IssueID pgtype.UUID
|
||||
}
|
||||
|
||||
// broadcastFailedTasks publishes task:failed events with the correct WorkspaceID
|
||||
// and reconciles agent status for all affected agents.
|
||||
func broadcastFailedTasks(ctx context.Context, queries *db.Queries, bus *events.Bus, tasks any) {
|
||||
var items []failedTask
|
||||
switch ts := tasks.(type) {
|
||||
case []db.FailStaleTasksRow:
|
||||
for _, t := range ts {
|
||||
items = append(items, failedTask{ID: t.ID, AgentID: t.AgentID, IssueID: t.IssueID})
|
||||
}
|
||||
case []db.FailTasksForOfflineRuntimesRow:
|
||||
for _, t := range ts {
|
||||
items = append(items, failedTask{ID: t.ID, AgentID: t.AgentID, IssueID: t.IssueID})
|
||||
}
|
||||
}
|
||||
|
||||
affectedAgents := make(map[string]pgtype.UUID)
|
||||
|
||||
for _, ft := range items {
|
||||
// Look up workspace ID from the issue so the event reaches the right WS room.
|
||||
workspaceID := ""
|
||||
if issue, err := queries.GetIssue(ctx, ft.IssueID); err == nil {
|
||||
workspaceID = util.UUIDToString(issue.WorkspaceID)
|
||||
}
|
||||
|
||||
bus.Publish(events.Event{
|
||||
Type: protocol.EventTaskFailed,
|
||||
ActorType: "system",
|
||||
Type: protocol.EventTaskFailed,
|
||||
WorkspaceID: workspaceID,
|
||||
ActorType: "system",
|
||||
Payload: map[string]any{
|
||||
"task_id": util.UUIDToString(ft.ID),
|
||||
"agent_id": util.UUIDToString(ft.AgentID),
|
||||
|
|
@ -129,5 +153,38 @@ func sweepStaleTasks(ctx context.Context, queries *db.Queries, bus *events.Bus)
|
|||
"status": "failed",
|
||||
},
|
||||
})
|
||||
|
||||
agentKey := util.UUIDToString(ft.AgentID)
|
||||
affectedAgents[agentKey] = ft.AgentID
|
||||
}
|
||||
|
||||
// Reconcile status for each affected agent.
|
||||
for _, agentID := range affectedAgents {
|
||||
reconcileAgentStatus(ctx, queries, bus, agentID)
|
||||
}
|
||||
}
|
||||
|
||||
// reconcileAgentStatus checks running task count and updates agent status.
|
||||
func reconcileAgentStatus(ctx context.Context, queries *db.Queries, bus *events.Bus, agentID pgtype.UUID) {
|
||||
running, err := queries.CountRunningTasks(ctx, agentID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
newStatus := "idle"
|
||||
if running > 0 {
|
||||
newStatus = "working"
|
||||
}
|
||||
agent, err := queries.UpdateAgentStatus(ctx, db.UpdateAgentStatusParams{
|
||||
ID: agentID,
|
||||
Status: newStatus,
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
bus.Publish(events.Event{
|
||||
Type: protocol.EventAgentStatus,
|
||||
WorkspaceID: util.UUIDToString(agent.WorkspaceID),
|
||||
ActorType: "system",
|
||||
Payload: map[string]any{"agent_id": util.UUIDToString(agent.ID), "status": agent.Status},
|
||||
})
|
||||
}
|
||||
|
|
|
|||
325
server/cmd/server/runtime_sweeper_test.go
Normal file
325
server/cmd/server/runtime_sweeper_test.go
Normal file
|
|
@ -0,0 +1,325 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/events"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
)
|
||||
|
||||
// setupSweeperTestFixture creates an issue and a task in the given status with
|
||||
// timestamps old enough to trigger the sweeper. Returns (issueID, agentID, taskID).
|
||||
func setupSweeperTestFixture(t *testing.T, taskStatus string) (string, string, string) {
|
||||
t.Helper()
|
||||
ctx := context.Background()
|
||||
|
||||
// Find the integration test agent
|
||||
var agentID, runtimeID string
|
||||
err := testPool.QueryRow(ctx, `
|
||||
SELECT a.id, a.runtime_id FROM agent a
|
||||
JOIN member m ON m.workspace_id = a.workspace_id
|
||||
JOIN "user" u ON u.id = m.user_id
|
||||
WHERE u.email = $1
|
||||
LIMIT 1
|
||||
`, integrationTestEmail).Scan(&agentID, &runtimeID)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to find test agent: %v", err)
|
||||
}
|
||||
|
||||
// Create an issue assigned to the agent
|
||||
var issueID string
|
||||
err = testPool.QueryRow(ctx, `
|
||||
INSERT INTO issue (workspace_id, title, status, priority, creator_type, creator_id, assignee_type, assignee_id)
|
||||
SELECT $1, 'Sweeper test issue', 'todo', 'none', 'member', m.user_id, 'agent', $2
|
||||
FROM member m WHERE m.workspace_id = $1 LIMIT 1
|
||||
RETURNING id
|
||||
`, testWorkspaceID, agentID).Scan(&issueID)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create test issue: %v", err)
|
||||
}
|
||||
|
||||
// Create a task in the desired status with old timestamps
|
||||
var taskID string
|
||||
switch taskStatus {
|
||||
case "running":
|
||||
err = testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, dispatched_at, started_at)
|
||||
VALUES ($1, $2, $3, 'running', 0, now() - interval '3 hours', now() - interval '3 hours')
|
||||
RETURNING id
|
||||
`, agentID, runtimeID, issueID).Scan(&taskID)
|
||||
case "dispatched":
|
||||
err = testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, dispatched_at)
|
||||
VALUES ($1, $2, $3, 'dispatched', 0, now() - interval '10 minutes')
|
||||
RETURNING id
|
||||
`, agentID, runtimeID, issueID).Scan(&taskID)
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create test task: %v", err)
|
||||
}
|
||||
|
||||
// Set agent status to "working"
|
||||
_, err = testPool.Exec(ctx, `UPDATE agent SET status = 'working' WHERE id = $1`, agentID)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to set agent status: %v", err)
|
||||
}
|
||||
|
||||
return issueID, agentID, taskID
|
||||
}
|
||||
|
||||
func cleanupSweeperFixture(t *testing.T, issueID, agentID string) {
|
||||
t.Helper()
|
||||
ctx := context.Background()
|
||||
testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE issue_id = $1`, issueID)
|
||||
testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID)
|
||||
testPool.Exec(ctx, `UPDATE agent SET status = 'idle' WHERE id = $1`, agentID)
|
||||
}
|
||||
|
||||
// TestSweepStaleTasksBroadcastsWithWorkspaceID verifies that when the task sweeper
|
||||
// fails a stale running task, the task:failed event is broadcast with the correct
|
||||
// WorkspaceID so it reaches frontend WebSocket clients (events without WorkspaceID
|
||||
// are silently dropped by the WS listener — that was the original bug).
|
||||
func TestSweepStaleTasksBroadcastsWithWorkspaceID(t *testing.T) {
|
||||
if testPool == nil {
|
||||
t.Skip("no database connection")
|
||||
}
|
||||
|
||||
issueID, agentID, taskID := setupSweeperTestFixture(t, "running")
|
||||
t.Cleanup(func() { cleanupSweeperFixture(t, issueID, agentID) })
|
||||
|
||||
queries := db.New(testPool)
|
||||
bus := events.New()
|
||||
|
||||
// Capture task:failed events to verify WorkspaceID is set
|
||||
var taskEvents []events.Event
|
||||
var mu sync.Mutex
|
||||
bus.Subscribe("task:failed", func(e events.Event) {
|
||||
mu.Lock()
|
||||
taskEvents = append(taskEvents, e)
|
||||
mu.Unlock()
|
||||
})
|
||||
|
||||
// Use very short timeouts to trigger the sweep on our test task
|
||||
failedTasks, err := queries.FailStaleTasks(context.Background(), db.FailStaleTasksParams{
|
||||
DispatchTimeoutSecs: 300.0,
|
||||
RunningTimeoutSecs: 1.0, // 1 second — our task is 3 hours old
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("FailStaleTasks query failed: %v", err)
|
||||
}
|
||||
if len(failedTasks) == 0 {
|
||||
t.Fatal("expected at least 1 stale task to be failed")
|
||||
}
|
||||
|
||||
// Verify our task was included
|
||||
found := false
|
||||
for _, ft := range failedTasks {
|
||||
if ft.ID.Bytes == parseUUIDBytes(taskID) {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Fatalf("expected task %s to be in failed tasks list", taskID)
|
||||
}
|
||||
|
||||
// Call broadcastFailedTasks — this is what we're testing
|
||||
broadcastFailedTasks(context.Background(), queries, bus, failedTasks)
|
||||
|
||||
// Verify the event was published with WorkspaceID (the core of the bug fix)
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
var foundEvent bool
|
||||
for _, e := range taskEvents {
|
||||
payload, _ := e.Payload.(map[string]any)
|
||||
if payload["task_id"] == taskID {
|
||||
if e.WorkspaceID == "" {
|
||||
t.Fatal("task:failed event is missing WorkspaceID — this was the original bug")
|
||||
}
|
||||
if e.WorkspaceID != testWorkspaceID {
|
||||
t.Fatalf("expected WorkspaceID %s, got %s", testWorkspaceID, e.WorkspaceID)
|
||||
}
|
||||
foundEvent = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !foundEvent {
|
||||
t.Fatalf("expected task:failed event for task %s", taskID)
|
||||
}
|
||||
|
||||
// Verify DB: task should be failed
|
||||
var status string
|
||||
err = testPool.QueryRow(context.Background(), `SELECT status FROM agent_task_queue WHERE id = $1`, taskID).Scan(&status)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to query task status: %v", err)
|
||||
}
|
||||
if status != "failed" {
|
||||
t.Fatalf("expected task status 'failed', got '%s'", status)
|
||||
}
|
||||
}
|
||||
|
||||
// TestSweepStaleTasksReconcileAgentStatus verifies that after the sweeper fails
|
||||
// stale tasks, the agent status is reconciled from "working" back to "idle".
|
||||
func TestSweepStaleTasksReconcileAgentStatus(t *testing.T) {
|
||||
if testPool == nil {
|
||||
t.Skip("no database connection")
|
||||
}
|
||||
|
||||
issueID, agentID, _ := setupSweeperTestFixture(t, "running")
|
||||
t.Cleanup(func() { cleanupSweeperFixture(t, issueID, agentID) })
|
||||
|
||||
queries := db.New(testPool)
|
||||
bus := events.New()
|
||||
|
||||
// Capture agent:status events
|
||||
var agentStatusEvents []events.Event
|
||||
var mu sync.Mutex
|
||||
bus.Subscribe("agent:status", func(e events.Event) {
|
||||
mu.Lock()
|
||||
agentStatusEvents = append(agentStatusEvents, e)
|
||||
mu.Unlock()
|
||||
})
|
||||
|
||||
// Fail stale tasks with short timeout
|
||||
failedTasks, err := queries.FailStaleTasks(context.Background(), db.FailStaleTasksParams{
|
||||
DispatchTimeoutSecs: 300.0,
|
||||
RunningTimeoutSecs: 1.0,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("FailStaleTasks failed: %v", err)
|
||||
}
|
||||
if len(failedTasks) == 0 {
|
||||
t.Fatal("expected at least 1 stale task")
|
||||
}
|
||||
|
||||
broadcastFailedTasks(context.Background(), queries, bus, failedTasks)
|
||||
|
||||
// Verify agent status is now "idle" in DB
|
||||
var agentStatus string
|
||||
err = testPool.QueryRow(context.Background(), `SELECT status FROM agent WHERE id = $1`, agentID).Scan(&agentStatus)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to query agent status: %v", err)
|
||||
}
|
||||
if agentStatus != "idle" {
|
||||
t.Fatalf("expected agent status 'idle', got '%s'", agentStatus)
|
||||
}
|
||||
|
||||
// Verify agent:status event was published with correct WorkspaceID
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if len(agentStatusEvents) == 0 {
|
||||
t.Fatal("expected agent:status event to be published")
|
||||
}
|
||||
lastEvent := agentStatusEvents[len(agentStatusEvents)-1]
|
||||
if lastEvent.WorkspaceID == "" {
|
||||
t.Fatal("agent:status event should have WorkspaceID set")
|
||||
}
|
||||
if lastEvent.WorkspaceID != testWorkspaceID {
|
||||
t.Fatalf("expected WorkspaceID %s, got %s", testWorkspaceID, lastEvent.WorkspaceID)
|
||||
}
|
||||
}
|
||||
|
||||
// TestSweepDispatchedStaleTask verifies the sweeper handles dispatched tasks
|
||||
// stuck beyond the dispatch timeout.
|
||||
func TestSweepDispatchedStaleTask(t *testing.T) {
|
||||
if testPool == nil {
|
||||
t.Skip("no database connection")
|
||||
}
|
||||
|
||||
issueID, agentID, taskID := setupSweeperTestFixture(t, "dispatched")
|
||||
t.Cleanup(func() { cleanupSweeperFixture(t, issueID, agentID) })
|
||||
|
||||
queries := db.New(testPool)
|
||||
bus := events.New()
|
||||
|
||||
// Capture task:failed events
|
||||
var taskEvents []events.Event
|
||||
var mu sync.Mutex
|
||||
bus.Subscribe("task:failed", func(e events.Event) {
|
||||
mu.Lock()
|
||||
taskEvents = append(taskEvents, e)
|
||||
mu.Unlock()
|
||||
})
|
||||
|
||||
// Fail stale tasks — dispatch timeout of 1 second (our task is 10 minutes old)
|
||||
failedTasks, err := queries.FailStaleTasks(context.Background(), db.FailStaleTasksParams{
|
||||
DispatchTimeoutSecs: 1.0,
|
||||
RunningTimeoutSecs: 9000.0,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("FailStaleTasks failed: %v", err)
|
||||
}
|
||||
if len(failedTasks) == 0 {
|
||||
t.Fatal("expected at least 1 stale dispatched task")
|
||||
}
|
||||
|
||||
broadcastFailedTasks(context.Background(), queries, bus, failedTasks)
|
||||
|
||||
// Verify DB: task should be failed
|
||||
var status string
|
||||
err = testPool.QueryRow(context.Background(), `SELECT status FROM agent_task_queue WHERE id = $1`, taskID).Scan(&status)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to query task: %v", err)
|
||||
}
|
||||
if status != "failed" {
|
||||
t.Fatalf("expected task status 'failed', got '%s'", status)
|
||||
}
|
||||
|
||||
// Verify task:failed event was published WITH WorkspaceID
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
found := false
|
||||
for _, e := range taskEvents {
|
||||
payload, _ := e.Payload.(map[string]any)
|
||||
if payload["task_id"] == taskID {
|
||||
if e.WorkspaceID == "" {
|
||||
t.Fatal("task:failed event is missing WorkspaceID — this was the bug")
|
||||
}
|
||||
if e.WorkspaceID != testWorkspaceID {
|
||||
t.Fatalf("expected WorkspaceID %s, got %s", testWorkspaceID, e.WorkspaceID)
|
||||
}
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Fatalf("expected task:failed event for task %s", taskID)
|
||||
}
|
||||
|
||||
// Verify agent status reconciled to idle
|
||||
var agentStatus string
|
||||
err = testPool.QueryRow(context.Background(), `SELECT status FROM agent WHERE id = $1`, agentID).Scan(&agentStatus)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to query agent: %v", err)
|
||||
}
|
||||
if agentStatus != "idle" {
|
||||
t.Fatalf("expected agent status 'idle' after sweep, got '%s'", agentStatus)
|
||||
}
|
||||
}
|
||||
|
||||
// parseUUIDBytes converts a UUID string to the 16-byte array used by pgtype.UUID.
//
// Dashes are stripped and the first 32 remaining characters are decoded as
// hex digit pairs; anything after them is ignored. If fewer than 32
// characters remain, the zero array is returned instead of indexing past the
// end of the string. Characters that are not hex digits decode as 0.
func parseUUIDBytes(s string) [16]byte {
	var b [16]byte
	s = strings.ReplaceAll(s, "-", "")
	if len(s) < 2*len(b) {
		// Too short to hold 16 bytes of hex; avoid an out-of-range panic.
		return b
	}
	for i := range b {
		hi := unhex(s[i*2])
		lo := unhex(s[i*2+1])
		b[i] = hi<<4 | lo
	}
	return b
}

// unhex returns the value (0-15) of a single ASCII hex digit, accepting both
// lower and upper case. Any other byte yields 0.
func unhex(c byte) byte {
	switch {
	case c >= '0' && c <= '9':
		return c - '0'
	case c >= 'a' && c <= 'f':
		return c - 'a' + 10
	case c >= 'A' && c <= 'F':
		return c - 'A' + 10
	}
	return 0
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue