The only path to marking a runtime offline was the daemon's deregister call on graceful shutdown. If the daemon crashed, was killed, or lost network connectivity, the status stayed "online" forever. Add a background goroutine that sweeps every 30s and marks runtimes offline after 45s without a heartbeat (3 missed heartbeat intervals). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
95 lines
2.3 KiB
Go
95 lines
2.3 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
|
|
"github.com/multica-ai/multica/server/internal/events"
|
|
"github.com/multica-ai/multica/server/internal/logger"
|
|
"github.com/multica-ai/multica/server/internal/realtime"
|
|
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
|
)
|
|
|
|
func main() {
|
|
logger.Init()
|
|
|
|
port := os.Getenv("PORT")
|
|
if port == "" {
|
|
port = "8080"
|
|
}
|
|
|
|
dbURL := os.Getenv("DATABASE_URL")
|
|
if dbURL == "" {
|
|
dbURL = "postgres://multica:multica@localhost:5432/multica?sslmode=disable"
|
|
}
|
|
|
|
// Connect to database
|
|
ctx := context.Background()
|
|
pool, err := pgxpool.New(ctx, dbURL)
|
|
if err != nil {
|
|
slog.Error("unable to connect to database", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
defer pool.Close()
|
|
|
|
if err := pool.Ping(ctx); err != nil {
|
|
slog.Error("unable to ping database", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
slog.Info("connected to database")
|
|
|
|
bus := events.New()
|
|
hub := realtime.NewHub()
|
|
go hub.Run()
|
|
registerListeners(bus, hub)
|
|
|
|
queries := db.New(pool)
|
|
// Order matters: subscriber listeners must register BEFORE notification listeners.
|
|
// The notification listener queries the subscriber table to determine recipients,
|
|
// so subscribers must be written first within the same synchronous event dispatch.
|
|
registerSubscriberListeners(bus, queries)
|
|
registerActivityListeners(bus, queries)
|
|
registerNotificationListeners(bus, queries)
|
|
|
|
r := NewRouter(pool, hub, bus)
|
|
|
|
srv := &http.Server{
|
|
Addr: ":" + port,
|
|
Handler: r,
|
|
}
|
|
|
|
// Start background sweeper to mark stale runtimes as offline.
|
|
sweepCtx, sweepCancel := context.WithCancel(context.Background())
|
|
go runRuntimeSweeper(sweepCtx, queries, bus)
|
|
|
|
// Graceful shutdown
|
|
go func() {
|
|
slog.Info("server starting", "port", port)
|
|
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
slog.Error("server error", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
}()
|
|
|
|
quit := make(chan os.Signal, 1)
|
|
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
|
|
<-quit
|
|
|
|
slog.Info("shutting down server")
|
|
sweepCancel()
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
if err := srv.Shutdown(shutdownCtx); err != nil {
|
|
slog.Error("server forced to shutdown", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
slog.Info("server stopped")
|
|
}
|