diff --git a/server/cmd/server/main.go b/server/cmd/server/main.go
index 7f5bf1f2..13fd3092 100644
--- a/server/cmd/server/main.go
+++ b/server/cmd/server/main.go
@@ -65,6 +65,10 @@ func main() {
 		Handler: r,
 	}
 
+	// Start background sweeper to mark stale runtimes as offline.
+	sweepCtx, sweepCancel := context.WithCancel(context.Background())
+	go runRuntimeSweeper(sweepCtx, queries, bus)
+
 	// Graceful shutdown
 	go func() {
 		slog.Info("server starting", "port", port)
@@ -79,6 +83,7 @@ func main() {
 	<-quit
 	slog.Info("shutting down server")
 
+	sweepCancel()
 	shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
 
diff --git a/server/cmd/server/runtime_sweeper.go b/server/cmd/server/runtime_sweeper.go
new file mode 100644
index 00000000..e1d47a53
--- /dev/null
+++ b/server/cmd/server/runtime_sweeper.go
@@ -0,0 +1,76 @@
+package main
+
+import (
+	"context"
+	"log/slog"
+	"time"
+
+	"github.com/multica-ai/multica/server/internal/events"
+	"github.com/multica-ai/multica/server/internal/util"
+	db "github.com/multica-ai/multica/server/pkg/db/generated"
+	"github.com/multica-ai/multica/server/pkg/protocol"
+)
+
+const (
+	// sweepInterval is how often we check for stale runtimes.
+	sweepInterval = 30 * time.Second
+	// staleThresholdSeconds marks runtimes offline if no heartbeat for this long.
+	// The daemon heartbeat interval is 15s, so 45s = 3 missed heartbeats.
+	staleThresholdSeconds = 45.0
+)
+
+// runRuntimeSweeper periodically marks runtimes as offline if their
+// last_seen_at exceeds the stale threshold. This handles cases where the
+// daemon crashes or is killed without calling the deregister endpoint.
+// It blocks until ctx is cancelled, so callers run it in its own goroutine.
+func runRuntimeSweeper(ctx context.Context, queries *db.Queries, bus *events.Bus) {
+	ticker := time.NewTicker(sweepInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			sweepStaleRuntimes(ctx, queries, bus)
+		}
+	}
+}
+
+// sweepStaleRuntimes performs one sweep: it marks stale runtimes offline and
+// publishes one event per affected workspace.
+func sweepStaleRuntimes(ctx context.Context, queries *db.Queries, bus *events.Bus) {
+	staleRows, err := queries.MarkStaleRuntimesOffline(ctx, staleThresholdSeconds)
+	if err != nil {
+		// A cancelled context means the server is shutting down, which is
+		// not a sweep failure worth warning about.
+		if ctx.Err() != nil {
+			return
+		}
+		slog.Warn("runtime sweeper: failed to mark stale runtimes offline", "error", err)
+		return
+	}
+	if len(staleRows) == 0 {
+		return
+	}
+
+	// Collect unique workspace IDs so each workspace is notified only once.
+	workspaces := make(map[string]struct{}, len(staleRows))
+	for _, row := range staleRows {
+		workspaces[util.UUIDToString(row.WorkspaceID)] = struct{}{}
+	}
+
+	slog.Info("runtime sweeper: marked stale runtimes offline", "count", len(staleRows), "workspaces", len(workspaces))
+
+	// Notify frontend clients so they re-fetch runtime list.
+	for wsID := range workspaces {
+		bus.Publish(events.Event{
+			Type:        protocol.EventDaemonRegister,
+			WorkspaceID: wsID,
+			ActorType:   "system",
+			Payload: map[string]any{
+				"action": "stale_sweep",
+			},
+		})
+	}
+}
diff --git a/server/pkg/db/generated/runtime.sql.go b/server/pkg/db/generated/runtime.sql.go
index 1946ec33..d871d72d 100644
--- a/server/pkg/db/generated/runtime.sql.go
+++ b/server/pkg/db/generated/runtime.sql.go
@@ -105,6 +105,39 @@ func (q *Queries) ListAgentRuntimes(ctx context.Context, workspaceID pgtype.UUID
 	return items, nil
 }
 
+const markStaleRuntimesOffline = `-- name: MarkStaleRuntimesOffline :many
+UPDATE agent_runtime
+SET status = 'offline', updated_at = now()
+WHERE status = 'online'
+  AND last_seen_at < now() - make_interval(secs => $1::double precision)
+RETURNING id, workspace_id
+`
+
+type MarkStaleRuntimesOfflineRow struct {
+	ID          pgtype.UUID `json:"id"`
+	WorkspaceID pgtype.UUID `json:"workspace_id"`
+}
+
+func (q *Queries) MarkStaleRuntimesOffline(ctx context.Context, staleSeconds float64) ([]MarkStaleRuntimesOfflineRow, error) {
+	rows, err := q.db.Query(ctx, markStaleRuntimesOffline, staleSeconds)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	items := []MarkStaleRuntimesOfflineRow{}
+	for rows.Next() {
+		var i MarkStaleRuntimesOfflineRow
+		if err := rows.Scan(&i.ID, &i.WorkspaceID); err != nil {
+			return nil, err
+		}
+		items = append(items, i)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+	return items, nil
+}
+
 const setAgentRuntimeOffline = `-- name: SetAgentRuntimeOffline :exec
 UPDATE agent_runtime
 SET status = 'offline', updated_at = now()
diff --git a/server/pkg/db/queries/runtime.sql b/server/pkg/db/queries/runtime.sql
index 5a40ee8a..6aabb657 100644
--- a/server/pkg/db/queries/runtime.sql
+++ b/server/pkg/db/queries/runtime.sql
@@ -44,3 +44,10 @@ RETURNING *;
 UPDATE agent_runtime
 SET status = 'offline', updated_at = now()
 WHERE id = $1;
+
+-- name: MarkStaleRuntimesOffline :many
+UPDATE agent_runtime
+SET status = 'offline', updated_at = now()
+WHERE status = 'online'
+  AND last_seen_at < now() - make_interval(secs => @stale_seconds::double precision)
+RETURNING id, workspace_id;