multica/server/cmd/server/runtime_sweeper.go
Jiayuan 67f1f49b09 fix(daemon): prevent stuck tasks from blocking queue and add concurrent execution
- Expand FailAgentTask SQL to accept dispatched OR running status
- Add FailStaleTasks server-side sweeper (dispatched >5min, running >2.5h)
- Fix daemon handleTask to fail tasks on all error paths (StartTask, CompleteTask)
- Make daemon poll loop concurrent with semaphore (default 20 parallel tasks)
- Raise default agent max_concurrent_tasks from 1 to 6 (migration 023)
- Add --max-concurrent-tasks CLI flag and MULTICA_DAEMON_MAX_CONCURRENT_TASKS env
2026-03-30 03:08:52 +08:00

133 lines
4.4 KiB
Go

package main
import (
"context"
"log/slog"
"time"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// Tuning knobs for the background sweeper. Every threshold is a number of
// seconds and is passed as-is to the db query methods.
const (
	// sweepInterval is the pause between two consecutive sweeps for stale
	// runtimes and stale tasks.
	sweepInterval = time.Second * 30

	// staleThresholdSeconds is how long a runtime may go without a heartbeat
	// before it is marked offline. With the daemon heartbeating every 15s,
	// this corresponds to 3 missed heartbeats (45s).
	staleThresholdSeconds = 3 * 15.0

	// dispatchTimeoutSeconds is the longest a task may remain 'dispatched'
	// before it is failed. Moving from dispatched to running should be
	// near-instant, so 5 minutes means something went wrong (e.g. the
	// StartTask API call failed silently).
	dispatchTimeoutSeconds = 5 * 60.0

	// runningTimeoutSeconds is the longest a task may remain 'running'
	// before it is failed. The default agent timeout is 2h, so 2.5h leaves
	// a generous buffer.
	runningTimeoutSeconds = 2.5 * 60 * 60
)
// runRuntimeSweeper is the periodic cleanup loop. Once per sweepInterval it
// marks runtimes whose last_seen_at is past the stale threshold as offline
// and then fails tasks that were left orphaned or stuck. It covers daemons
// that crashed, were killed before calling the deregister endpoint, or left
// tasks in a non-terminal state.
//
// The function blocks and only returns once ctx is done.
func runRuntimeSweeper(ctx context.Context, queries *db.Queries, bus *events.Bus) {
	tick := time.NewTicker(sweepInterval)
	defer tick.Stop()

	done := ctx.Done()
	for {
		// Wait for either shutdown or the next tick; only a tick falls
		// through to the sweep below.
		select {
		case <-done:
			return
		case <-tick.C:
		}

		// Runtimes are swept before tasks, one after the other.
		sweepStaleRuntimes(ctx, queries, bus)
		sweepStaleTasks(ctx, queries, bus)
	}
}
// sweepStaleRuntimes performs one runtime sweep: runtimes that have not
// heartbeated within staleThresholdSeconds are marked offline, tasks that
// belong to offline runtimes are failed, and events are published on bus for
// every failed task and every affected workspace.
//
// Errors are logged and never returned. If marking runtimes offline fails, or
// no runtime was stale, the function stops before touching any tasks.
func sweepStaleRuntimes(ctx context.Context, queries *db.Queries, bus *events.Bus) {
	offline, markErr := queries.MarkStaleRuntimesOffline(ctx, staleThresholdSeconds)
	switch {
	case markErr != nil:
		slog.Warn("runtime sweeper: failed to mark stale runtimes offline", "error", markErr)
		return
	case len(offline) == 0:
		return
	}

	// Several runtimes may share a workspace; de-duplicate so each workspace
	// is notified once.
	affected := make(map[string]struct{}, len(offline))
	for _, rt := range offline {
		affected[util.UUIDToString(rt.WorkspaceID)] = struct{}{}
	}
	slog.Info("runtime sweeper: marked stale runtimes offline", "count", len(offline), "workspaces", len(affected))

	// Fail the dispatched/running tasks whose runtimes just went offline.
	// A failure here is only logged: the workspace notifications below are
	// still sent.
	orphaned, failErr := queries.FailTasksForOfflineRuntimes(ctx)
	switch {
	case failErr != nil:
		slog.Warn("runtime sweeper: failed to clean up stale tasks", "error", failErr)
	case len(orphaned) > 0:
		slog.Info("runtime sweeper: failed orphaned tasks", "count", len(orphaned))
		for _, task := range orphaned {
			payload := map[string]any{
				"task_id":  util.UUIDToString(task.ID),
				"agent_id": util.UUIDToString(task.AgentID),
				"issue_id": util.UUIDToString(task.IssueID),
				"status":   "failed",
			}
			bus.Publish(events.Event{
				Type:      protocol.EventTaskFailed,
				ActorType: "system",
				Payload:   payload,
			})
		}
	}

	// Tell frontend clients of each affected workspace to re-fetch their
	// runtime list.
	for workspaceID := range affected {
		bus.Publish(events.Event{
			Type:        protocol.EventDaemonRegister,
			WorkspaceID: workspaceID,
			ActorType:   "system",
			Payload: map[string]any{
				"action": "stale_sweep",
			},
		})
	}
}
// sweepStaleTasks performs one task sweep: tasks that have stayed in
// 'dispatched' longer than dispatchTimeoutSeconds or in 'running' longer than
// runningTimeoutSeconds are failed, regardless of whether their runtime is
// still online. A task-failed event is published on bus for each of them.
//
// This catches situations such as:
//   - the agent process hanging while the daemon keeps heartbeating
//   - the daemon never reporting task completion or failure
//   - a server restart leaving tasks in a non-terminal state
//
// Errors are logged and never returned.
func sweepStaleTasks(ctx context.Context, queries *db.Queries, bus *events.Bus) {
	limits := db.FailStaleTasksParams{
		DispatchTimeoutSecs: dispatchTimeoutSeconds,
		RunningTimeoutSecs:  runningTimeoutSeconds,
	}

	stuck, sweepErr := queries.FailStaleTasks(ctx, limits)
	switch {
	case sweepErr != nil:
		slog.Warn("task sweeper: failed to clean up stale tasks", "error", sweepErr)
		return
	case len(stuck) == 0:
		return
	}

	slog.Info("task sweeper: failed stale tasks", "count", len(stuck))

	// Publish one event per task that was just failed.
	for _, task := range stuck {
		payload := map[string]any{
			"task_id":  util.UUIDToString(task.ID),
			"agent_id": util.UUIDToString(task.AgentID),
			"issue_id": util.UUIDToString(task.IssueID),
			"status":   "failed",
		}
		bus.Publish(events.Event{
			Type:      protocol.EventTaskFailed,
			ActorType: "system",
			Payload:   payload,
		})
	}
}