fix(runtime): add server-side sweeper to detect stale runtimes
The only path to marking a runtime offline was the daemon's deregister call on graceful shutdown. If the daemon crashed, was killed, or lost network, the status stayed "online" forever. Add a background goroutine that sweeps every 30s and marks runtimes offline after 45s without a heartbeat (3 missed intervals). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
586c3bf470
commit
b3bbf92a1d
4 changed files with 110 additions and 0 deletions
|
|
@ -65,6 +65,10 @@ func main() {
|
|||
Handler: r,
|
||||
}
|
||||
|
||||
// Start background sweeper to mark stale runtimes as offline.
|
||||
sweepCtx, sweepCancel := context.WithCancel(context.Background())
|
||||
go runRuntimeSweeper(sweepCtx, queries, bus)
|
||||
|
||||
// Graceful shutdown
|
||||
go func() {
|
||||
slog.Info("server starting", "port", port)
|
||||
|
|
@ -79,6 +83,7 @@ func main() {
|
|||
<-quit
|
||||
|
||||
slog.Info("shutting down server")
|
||||
sweepCancel()
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
|
|
|
|||
65
server/cmd/server/runtime_sweeper.go
Normal file
65
server/cmd/server/runtime_sweeper.go
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/events"
|
||||
"github.com/multica-ai/multica/server/internal/util"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
|
||||
// sweepInterval is the period between two consecutive stale-runtime checks.
const sweepInterval = 30 * time.Second

// staleThresholdSeconds is how many seconds may pass without a heartbeat
// before a runtime is marked offline. The daemon heartbeats every 15s, so
// 45s corresponds to three missed heartbeats.
const staleThresholdSeconds = 45.0
|
||||
|
||||
// runRuntimeSweeper periodically marks runtimes as offline if their
|
||||
// last_seen_at exceeds the stale threshold. This handles cases where the
|
||||
// daemon crashes or is killed without calling the deregister endpoint.
|
||||
func runRuntimeSweeper(ctx context.Context, queries *db.Queries, bus *events.Bus) {
|
||||
ticker := time.NewTicker(sweepInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
staleRows, err := queries.MarkStaleRuntimesOffline(ctx, staleThresholdSeconds)
|
||||
if err != nil {
|
||||
slog.Warn("runtime sweeper: failed to mark stale runtimes offline", "error", err)
|
||||
continue
|
||||
}
|
||||
if len(staleRows) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Collect unique workspace IDs to notify.
|
||||
workspaces := make(map[string]bool)
|
||||
for _, row := range staleRows {
|
||||
wsID := util.UUIDToString(row.WorkspaceID)
|
||||
workspaces[wsID] = true
|
||||
}
|
||||
|
||||
slog.Info("runtime sweeper: marked stale runtimes offline", "count", len(staleRows), "workspaces", len(workspaces))
|
||||
|
||||
// Notify frontend clients so they re-fetch runtime list.
|
||||
for wsID := range workspaces {
|
||||
bus.Publish(events.Event{
|
||||
Type: protocol.EventDaemonRegister,
|
||||
WorkspaceID: wsID,
|
||||
ActorType: "system",
|
||||
Payload: map[string]any{
|
||||
"action": "stale_sweep",
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -105,6 +105,39 @@ func (q *Queries) ListAgentRuntimes(ctx context.Context, workspaceID pgtype.UUID
|
|||
return items, nil
|
||||
}
|
||||
|
||||
const markStaleRuntimesOffline = `-- name: MarkStaleRuntimesOffline :many
|
||||
UPDATE agent_runtime
|
||||
SET status = 'offline', updated_at = now()
|
||||
WHERE status = 'online'
|
||||
AND last_seen_at < now() - make_interval(secs => $1::double precision)
|
||||
RETURNING id, workspace_id
|
||||
`
|
||||
|
||||
type MarkStaleRuntimesOfflineRow struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
}
|
||||
|
||||
func (q *Queries) MarkStaleRuntimesOffline(ctx context.Context, staleSeconds float64) ([]MarkStaleRuntimesOfflineRow, error) {
|
||||
rows, err := q.db.Query(ctx, markStaleRuntimesOffline, staleSeconds)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []MarkStaleRuntimesOfflineRow{}
|
||||
for rows.Next() {
|
||||
var i MarkStaleRuntimesOfflineRow
|
||||
if err := rows.Scan(&i.ID, &i.WorkspaceID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const setAgentRuntimeOffline = `-- name: SetAgentRuntimeOffline :exec
|
||||
UPDATE agent_runtime
|
||||
SET status = 'offline', updated_at = now()
|
||||
|
|
|
|||
|
|
@ -44,3 +44,10 @@ RETURNING *;
|
|||
UPDATE agent_runtime
|
||||
SET status = 'offline', updated_at = now()
|
||||
WHERE id = $1;
|
||||
|
||||
-- name: MarkStaleRuntimesOffline :many
-- Marks every online runtime whose last_seen_at is more than stale_seconds
-- seconds in the past as offline, and returns the id and workspace_id of
-- each runtime that was changed.
UPDATE agent_runtime
SET
    status = 'offline',
    updated_at = now()
WHERE
    status = 'online'
    AND last_seen_at < now() - make_interval(secs => @stale_seconds::double precision)
RETURNING
    id,
    workspace_id;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue