From 60e66f32b3b58c309d7557ab6f1b9fce0416fe3b Mon Sep 17 00:00:00 2001 From: Jiayuan Date: Thu, 2 Apr 2026 03:30:32 +0800 Subject: [PATCH] test(server): add integration tests for sweeper event broadcasting fix Tests verify: - Stale running tasks: task:failed event has WorkspaceID set - Agent status reconciliation: agent returns to "idle" after sweep - Stale dispatched tasks: same correctness for dispatch timeout path --- server/cmd/server/runtime_sweeper_test.go | 325 ++++++++++++++++++++++ 1 file changed, 325 insertions(+) create mode 100644 server/cmd/server/runtime_sweeper_test.go diff --git a/server/cmd/server/runtime_sweeper_test.go b/server/cmd/server/runtime_sweeper_test.go new file mode 100644 index 00000000..22404760 --- /dev/null +++ b/server/cmd/server/runtime_sweeper_test.go @@ -0,0 +1,325 @@ +package main + +import ( + "context" + "strings" + "sync" + "testing" + + "github.com/multica-ai/multica/server/internal/events" + db "github.com/multica-ai/multica/server/pkg/db/generated" +) + +// setupSweeperTestFixture creates an issue and a task in the given status with +// timestamps old enough to trigger the sweeper. Returns (issueID, agentID, taskID). +func setupSweeperTestFixture(t *testing.T, taskStatus string) (string, string, string) { + t.Helper() + ctx := context.Background() + + // Find the integration test agent + var agentID, runtimeID string + err := testPool.QueryRow(ctx, ` + SELECT a.id, a.runtime_id FROM agent a + JOIN member m ON m.workspace_id = a.workspace_id + JOIN "user" u ON u.id = m.user_id + WHERE u.email = $1 + LIMIT 1 + `, integrationTestEmail).Scan(&agentID, &runtimeID) + if err != nil { + t.Fatalf("failed to find test agent: %v", err) + } + + // Create an issue assigned to the agent + var issueID string + err = testPool.QueryRow(ctx, ` + INSERT INTO issue (workspace_id, title, status, priority, creator_type, creator_id, assignee_type, assignee_id) + SELECT $1, 'Sweeper test issue', 'todo', 'none', 'member', m.user_id, 'agent', $2 + FROM member m WHERE m.workspace_id = $1 LIMIT 1 + RETURNING id + `, testWorkspaceID, agentID).Scan(&issueID) + if err != nil { + t.Fatalf("failed to create test issue: %v", err) + } + + // Create a task in the desired status with old timestamps + var taskID string + switch taskStatus { + case "running": + err = testPool.QueryRow(ctx, ` + INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, dispatched_at, started_at) + VALUES ($1, $2, $3, 'running', 0, now() - interval '3 hours', now() - interval '3 hours') + RETURNING id + `, agentID, runtimeID, issueID).Scan(&taskID) + case "dispatched": + err = testPool.QueryRow(ctx, ` + INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, dispatched_at) + VALUES ($1, $2, $3, 'dispatched', 0, now() - interval '10 minutes') + RETURNING id + `, agentID, runtimeID, issueID).Scan(&taskID) + } + if err != nil { + t.Fatalf("failed to create test task: %v", err) + } + + // Set agent status to "working" + _, err = testPool.Exec(ctx, `UPDATE agent SET status = 'working' WHERE id = $1`, agentID) + if err != nil { + t.Fatalf("failed to set agent status: %v", err) + } + + return issueID, agentID, taskID +} + +func cleanupSweeperFixture(t *testing.T, issueID, agentID string) { + t.Helper() + ctx := context.Background() + testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE issue_id = $1`, issueID) + testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) + testPool.Exec(ctx, `UPDATE agent SET status = 'idle' WHERE id = $1`, agentID) +} + +// TestSweepStaleTasksBroadcastsWithWorkspaceID verifies that when the task sweeper +// fails a stale running task, the task:failed event is broadcast with the correct +// WorkspaceID so it reaches frontend WebSocket clients (events without WorkspaceID +// are silently dropped by the WS listener — that was the original bug). +func TestSweepStaleTasksBroadcastsWithWorkspaceID(t *testing.T) { + if testPool == nil { + t.Skip("no database connection") + } + + issueID, agentID, taskID := setupSweeperTestFixture(t, "running") + t.Cleanup(func() { cleanupSweeperFixture(t, issueID, agentID) }) + + queries := db.New(testPool) + bus := events.New() + + // Capture task:failed events to verify WorkspaceID is set + var taskEvents []events.Event + var mu sync.Mutex + bus.Subscribe("task:failed", func(e events.Event) { + mu.Lock() + taskEvents = append(taskEvents, e) + mu.Unlock() + }) + + // Use very short timeouts to trigger the sweep on our test task + failedTasks, err := queries.FailStaleTasks(context.Background(), db.FailStaleTasksParams{ + DispatchTimeoutSecs: 300.0, + RunningTimeoutSecs: 1.0, // 1 second — our task is 3 hours old + }) + if err != nil { + t.Fatalf("FailStaleTasks query failed: %v", err) + } + if len(failedTasks) == 0 { + t.Fatal("expected at least 1 stale task to be failed") + } + + // Verify our task was included + found := false + for _, ft := range failedTasks { + if ft.ID.Bytes == parseUUIDBytes(taskID) { + found = true + break + } + } + if !found { + t.Fatalf("expected task %s to be in failed tasks list", taskID) + } + + // Call broadcastFailedTasks — this is what we're testing + broadcastFailedTasks(context.Background(), queries, bus, failedTasks) + + // Verify the event was published with WorkspaceID (the core of the bug fix) + mu.Lock() + defer mu.Unlock() + var foundEvent bool + for _, e := range taskEvents { + payload, _ := e.Payload.(map[string]any) + if payload["task_id"] == taskID { + if e.WorkspaceID == "" { + t.Fatal("task:failed event is missing WorkspaceID — this was the original bug") + } + if e.WorkspaceID != testWorkspaceID { + t.Fatalf("expected WorkspaceID %s, got %s", testWorkspaceID, e.WorkspaceID) + } + foundEvent = true + break + } + } + if !foundEvent { + t.Fatalf("expected task:failed event for task %s", taskID) + } + + // Verify DB: task should be failed + var status string + err = testPool.QueryRow(context.Background(), `SELECT status FROM agent_task_queue WHERE id = $1`, taskID).Scan(&status) + if err != nil { + t.Fatalf("failed to query task status: %v", err) + } + if status != "failed" { + t.Fatalf("expected task status 'failed', got '%s'", status) + } +} + +// TestSweepStaleTasksReconcileAgentStatus verifies that after the sweeper fails +// stale tasks, the agent status is reconciled from "working" back to "idle". +func TestSweepStaleTasksReconcileAgentStatus(t *testing.T) { + if testPool == nil { + t.Skip("no database connection") + } + + issueID, agentID, _ := setupSweeperTestFixture(t, "running") + t.Cleanup(func() { cleanupSweeperFixture(t, issueID, agentID) }) + + queries := db.New(testPool) + bus := events.New() + + // Capture agent:status events + var agentStatusEvents []events.Event + var mu sync.Mutex + bus.Subscribe("agent:status", func(e events.Event) { + mu.Lock() + agentStatusEvents = append(agentStatusEvents, e) + mu.Unlock() + }) + + // Fail stale tasks with short timeout + failedTasks, err := queries.FailStaleTasks(context.Background(), db.FailStaleTasksParams{ + DispatchTimeoutSecs: 300.0, + RunningTimeoutSecs: 1.0, + }) + if err != nil { + t.Fatalf("FailStaleTasks failed: %v", err) + } + if len(failedTasks) == 0 { + t.Fatal("expected at least 1 stale task") + } + + broadcastFailedTasks(context.Background(), queries, bus, failedTasks) + + // Verify agent status is now "idle" in DB + var agentStatus string + err = testPool.QueryRow(context.Background(), `SELECT status FROM agent WHERE id = $1`, agentID).Scan(&agentStatus) + if err != nil { + t.Fatalf("failed to query agent status: %v", err) + } + if agentStatus != "idle" { + t.Fatalf("expected agent status 'idle', got '%s'", agentStatus) + } + + // Verify agent:status event was published with correct WorkspaceID + mu.Lock() + defer mu.Unlock() + if len(agentStatusEvents) == 0 { + t.Fatal("expected agent:status event to be published") + } + lastEvent := agentStatusEvents[len(agentStatusEvents)-1] + if lastEvent.WorkspaceID == "" { + t.Fatal("agent:status event should have WorkspaceID set") + } + if lastEvent.WorkspaceID != testWorkspaceID { + t.Fatalf("expected WorkspaceID %s, got %s", testWorkspaceID, lastEvent.WorkspaceID) + } +} + +// TestSweepDispatchedStaleTask verifies the sweeper handles dispatched tasks +// stuck beyond the dispatch timeout. +func TestSweepDispatchedStaleTask(t *testing.T) { + if testPool == nil { + t.Skip("no database connection") + } + + issueID, agentID, taskID := setupSweeperTestFixture(t, "dispatched") + t.Cleanup(func() { cleanupSweeperFixture(t, issueID, agentID) }) + + queries := db.New(testPool) + bus := events.New() + + // Capture task:failed events + var taskEvents []events.Event + var mu sync.Mutex + bus.Subscribe("task:failed", func(e events.Event) { + mu.Lock() + taskEvents = append(taskEvents, e) + mu.Unlock() + }) + + // Fail stale tasks — dispatch timeout of 1 second (our task is 10 minutes old) + failedTasks, err := queries.FailStaleTasks(context.Background(), db.FailStaleTasksParams{ + DispatchTimeoutSecs: 1.0, + RunningTimeoutSecs: 9000.0, + }) + if err != nil { + t.Fatalf("FailStaleTasks failed: %v", err) + } + if len(failedTasks) == 0 { + t.Fatal("expected at least 1 stale dispatched task") + } + + broadcastFailedTasks(context.Background(), queries, bus, failedTasks) + + // Verify DB: task should be failed + var status string + err = testPool.QueryRow(context.Background(), `SELECT status FROM agent_task_queue WHERE id = $1`, taskID).Scan(&status) + if err != nil { + t.Fatalf("failed to query task: %v", err) + } + if status != "failed" { + t.Fatalf("expected task status 'failed', got '%s'", status) + } + + // Verify task:failed event was published WITH WorkspaceID + mu.Lock() + defer mu.Unlock() + found := false + for _, e := range taskEvents { + payload, _ := e.Payload.(map[string]any) + if payload["task_id"] == taskID { + if e.WorkspaceID == "" { + t.Fatal("task:failed event is missing WorkspaceID — this was the bug") + } + if e.WorkspaceID != testWorkspaceID { + t.Fatalf("expected WorkspaceID %s, got %s", testWorkspaceID, e.WorkspaceID) + } + found = true + break + } + } + if !found { + t.Fatalf("expected task:failed event for task %s", taskID) + } + + // Verify agent status reconciled to idle + var agentStatus string + err = testPool.QueryRow(context.Background(), `SELECT status FROM agent WHERE id = $1`, agentID).Scan(&agentStatus) + if err != nil { + t.Fatalf("failed to query agent: %v", err) + } + if agentStatus != "idle" { + t.Fatalf("expected agent status 'idle' after sweep, got '%s'", agentStatus) + } +} + +// parseUUIDBytes converts a UUID string to the 16-byte array used by pgtype.UUID. +func parseUUIDBytes(s string) [16]byte { + s = strings.ReplaceAll(s, "-", "") + var b [16]byte + for i := 0; i < 16; i++ { + hi := unhex(s[i*2]) + lo := unhex(s[i*2+1]) + b[i] = hi<<4 | lo + } + return b +} + +func unhex(c byte) byte { + switch { + case c >= '0' && c <= '9': + return c - '0' + case c >= 'a' && c <= 'f': + return c - 'a' + 10 + case c >= 'A' && c <= 'F': + return c - 'A' + 10 + } + return 0 +}