From afdfee78b943ce5cfeb27e4fe5b591774ceae284 Mon Sep 17 00:00:00 2001 From: Jiayuan Date: Tue, 31 Mar 2026 14:41:26 +0800 Subject: [PATCH] feat(daemon): add authentication for daemon API routes Issue daemon auth tokens (mdt_) on pairing session claim, bound to workspace_id + daemon_id with 1-year expiry. Add DaemonAuth middleware that validates these tokens and falls back to JWT/PAT for backward compatibility. Apply middleware to all daemon routes except pairing endpoints. --- server/cmd/server/router.go | 36 ++++--- server/internal/auth/jwt.go | 9 ++ server/internal/handler/daemon_pairing.go | 40 ++++++- server/internal/middleware/daemon_auth.go | 112 ++++++++++++++++++++ server/migrations/028_daemon_token.down.sql | 1 + server/migrations/028_daemon_token.up.sql | 11 ++ server/pkg/db/generated/daemon_token.sql.go | 88 +++++++++++++++ server/pkg/db/generated/models.go | 9 ++ server/pkg/db/queries/daemon_token.sql | 16 +++ 9 files changed, 306 insertions(+), 16 deletions(-) create mode 100644 server/internal/middleware/daemon_auth.go create mode 100644 server/migrations/028_daemon_token.down.sql create mode 100644 server/migrations/028_daemon_token.up.sql create mode 100644 server/pkg/db/generated/daemon_token.sql.go create mode 100644 server/pkg/db/queries/daemon_token.sql diff --git a/server/cmd/server/router.go b/server/cmd/server/router.go index 2de70b1e..0f70001d 100644 --- a/server/cmd/server/router.go +++ b/server/cmd/server/router.go @@ -79,28 +79,34 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route r.Post("/auth/send-code", h.SendCode) r.Post("/auth/verify-code", h.VerifyCode) - // Daemon API routes (no user auth; daemon auth deferred to later) + // Daemon API routes r.Route("/api/daemon", func(r chi.Router) { + // Pairing routes — no auth required (daemon doesn't have a token yet). r.Post("/pairing-sessions", h.CreateDaemonPairingSession) r.Get("/pairing-sessions/{token}", h.GetDaemonPairingSession) r.Post("/pairing-sessions/{token}/claim", h.ClaimDaemonPairingSession) - r.Post("/register", h.DaemonRegister) - r.Post("/deregister", h.DaemonDeregister) - r.Post("/heartbeat", h.DaemonHeartbeat) + // Authenticated daemon routes — require daemon token (mdt_) or user JWT/PAT. + r.Group(func(r chi.Router) { + r.Use(middleware.DaemonAuth(queries)) - r.Post("/runtimes/{runtimeId}/tasks/claim", h.ClaimTaskByRuntime) - r.Get("/runtimes/{runtimeId}/tasks/pending", h.ListPendingTasksByRuntime) - r.Post("/runtimes/{runtimeId}/usage", h.ReportRuntimeUsage) - r.Post("/runtimes/{runtimeId}/ping/{pingId}/result", h.ReportPingResult) + r.Post("/register", h.DaemonRegister) + r.Post("/deregister", h.DaemonDeregister) + r.Post("/heartbeat", h.DaemonHeartbeat) - r.Get("/tasks/{taskId}/status", h.GetTaskStatus) - r.Post("/tasks/{taskId}/start", h.StartTask) - r.Post("/tasks/{taskId}/progress", h.ReportTaskProgress) - r.Post("/tasks/{taskId}/complete", h.CompleteTask) - r.Post("/tasks/{taskId}/fail", h.FailTask) - r.Post("/tasks/{taskId}/messages", h.ReportTaskMessages) - r.Get("/tasks/{taskId}/messages", h.ListTaskMessages) + r.Post("/runtimes/{runtimeId}/tasks/claim", h.ClaimTaskByRuntime) + r.Get("/runtimes/{runtimeId}/tasks/pending", h.ListPendingTasksByRuntime) + r.Post("/runtimes/{runtimeId}/usage", h.ReportRuntimeUsage) + r.Post("/runtimes/{runtimeId}/ping/{pingId}/result", h.ReportPingResult) + + r.Get("/tasks/{taskId}/status", h.GetTaskStatus) + r.Post("/tasks/{taskId}/start", h.StartTask) + r.Post("/tasks/{taskId}/progress", h.ReportTaskProgress) + r.Post("/tasks/{taskId}/complete", h.CompleteTask) + r.Post("/tasks/{taskId}/fail", h.FailTask) + r.Post("/tasks/{taskId}/messages", h.ReportTaskMessages) + r.Get("/tasks/{taskId}/messages", h.ListTaskMessages) + }) }) // Protected API routes diff --git a/server/internal/auth/jwt.go b/server/internal/auth/jwt.go index 6ad212ff..f300ed70 100644 --- a/server/internal/auth/jwt.go +++ b/server/internal/auth/jwt.go @@ -37,6 +37,15 @@ func GeneratePATToken() (string, error) { return "mul_" + hex.EncodeToString(b), nil } +// GenerateDaemonToken creates a new daemon auth token: "mdt_" + 40 random hex chars. +func GenerateDaemonToken() (string, error) { + b := make([]byte, 20) // 20 bytes = 40 hex chars + if _, err := rand.Read(b); err != nil { + return "", fmt.Errorf("generate daemon token: %w", err) + } + return "mdt_" + hex.EncodeToString(b), nil +} + // HashToken returns the hex-encoded SHA-256 hash of a token string. func HashToken(token string) string { h := sha256.Sum256([]byte(token)) diff --git a/server/internal/handler/daemon_pairing.go b/server/internal/handler/daemon_pairing.go index 9cf7747f..a8bfcfda 100644 --- a/server/internal/handler/daemon_pairing.go +++ b/server/internal/handler/daemon_pairing.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "encoding/json" "fmt" + "log/slog" "net/http" "net/url" "os" @@ -14,6 +15,8 @@ import ( "github.com/go-chi/chi/v5" "github.com/jackc/pgx/v5/pgtype" + "github.com/multica-ai/multica/server/internal/auth" + db "github.com/multica-ai/multica/server/pkg/db/generated" ) const daemonPairingTTL = 10 * time.Minute @@ -50,6 +53,7 @@ type DaemonPairingSessionResponse struct { CreatedAt string `json:"created_at"` UpdatedAt string `json:"updated_at"` LinkURL *string `json:"link_url,omitempty"` + DaemonToken *string `json:"daemon_token,omitempty"` } type CreateDaemonPairingSessionRequest struct { @@ -382,5 +386,39 @@ func (h *Handler) ClaimDaemonPairingSession(w http.ResponseWriter, r *http.Reque return } - writeJSON(w, http.StatusOK, daemonPairingSessionToResponse(rec, true)) + resp := daemonPairingSessionToResponse(rec, true) + + // Issue a daemon auth token bound to the workspace and daemon. + if rec.WorkspaceID.Valid { + plainToken, err := auth.GenerateDaemonToken() + if err != nil { + slog.Error("failed to generate daemon token", "error", err) + writeError(w, http.StatusInternalServerError, "failed to generate daemon token") + return + } + hash := auth.HashToken(plainToken) + + // Revoke any existing tokens for this workspace+daemon pair. + _ = h.Queries.DeleteDaemonTokensByWorkspaceAndDaemon(r.Context(), db.DeleteDaemonTokensByWorkspaceAndDaemonParams{ + WorkspaceID: rec.WorkspaceID, + DaemonID: rec.DaemonID, + }) + + _, err = h.Queries.CreateDaemonToken(r.Context(), db.CreateDaemonTokenParams{ + TokenHash: hash, + WorkspaceID: rec.WorkspaceID, + DaemonID: rec.DaemonID, + ExpiresAt: pgtype.Timestamptz{Time: time.Now().Add(365 * 24 * time.Hour), Valid: true}, + }) + if err != nil { + slog.Error("failed to store daemon token", "error", err) + writeError(w, http.StatusInternalServerError, "failed to store daemon token") + return + } + + resp.DaemonToken = &plainToken + slog.Info("daemon token issued", "daemon_id", rec.DaemonID, "workspace_id", uuidToPtr(rec.WorkspaceID)) + } + + writeJSON(w, http.StatusOK, resp) } diff --git a/server/internal/middleware/daemon_auth.go b/server/internal/middleware/daemon_auth.go new file mode 100644 index 00000000..d91282cf --- /dev/null +++ b/server/internal/middleware/daemon_auth.go @@ -0,0 +1,112 @@ +package middleware + +import ( + "context" + "log/slog" + "net/http" + "strings" + + "github.com/golang-jwt/jwt/v5" + "github.com/multica-ai/multica/server/internal/auth" + db "github.com/multica-ai/multica/server/pkg/db/generated" +) + +// Daemon context keys. +type daemonContextKey int + +const ( + ctxKeyDaemonWorkspaceID daemonContextKey = iota + ctxKeyDaemonID +) + +// DaemonWorkspaceIDFromContext returns the workspace ID set by DaemonAuth middleware. +func DaemonWorkspaceIDFromContext(ctx context.Context) string { + id, _ := ctx.Value(ctxKeyDaemonWorkspaceID).(string) + return id +} + +// DaemonIDFromContext returns the daemon ID set by DaemonAuth middleware. +func DaemonIDFromContext(ctx context.Context) string { + id, _ := ctx.Value(ctxKeyDaemonID).(string) + return id +} + +// DaemonAuth validates daemon auth tokens (mdt_ prefix) or falls back to +// JWT/PAT validation for backward compatibility with daemons that +// authenticate via user tokens. +func DaemonAuth(queries *db.Queries) func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + authHeader := r.Header.Get("Authorization") + if authHeader == "" { + slog.Debug("daemon_auth: missing authorization header", "path", r.URL.Path) + writeError(w, http.StatusUnauthorized, "missing authorization header") + return + } + + tokenString := strings.TrimPrefix(authHeader, "Bearer ") + if tokenString == authHeader { + slog.Debug("daemon_auth: invalid format", "path", r.URL.Path) + writeError(w, http.StatusUnauthorized, "invalid authorization format") + return + } + + // Daemon token: "mdt_" prefix. + if strings.HasPrefix(tokenString, "mdt_") { + hash := auth.HashToken(tokenString) + dt, err := queries.GetDaemonTokenByHash(r.Context(), hash) + if err != nil { + slog.Warn("daemon_auth: invalid daemon token", "path", r.URL.Path, "error", err) + writeError(w, http.StatusUnauthorized, "invalid daemon token") + return + } + + ctx := context.WithValue(r.Context(), ctxKeyDaemonWorkspaceID, uuidToString(dt.WorkspaceID)) + ctx = context.WithValue(ctx, ctxKeyDaemonID, dt.DaemonID) + next.ServeHTTP(w, r.WithContext(ctx)) + return + } + + // Fallback: PAT tokens ("mul_" prefix). + if strings.HasPrefix(tokenString, "mul_") { + hash := auth.HashToken(tokenString) + pat, err := queries.GetPersonalAccessTokenByHash(r.Context(), hash) + if err != nil { + slog.Warn("daemon_auth: invalid PAT", "path", r.URL.Path, "error", err) + writeError(w, http.StatusUnauthorized, "invalid token") + return + } + r.Header.Set("X-User-ID", uuidToString(pat.UserID)) + go queries.UpdatePersonalAccessTokenLastUsed(context.Background(), pat.ID) + next.ServeHTTP(w, r) + return + } + + // Fallback: JWT tokens. + token, err := jwt.Parse(tokenString, func(token *jwt.Token) (any, error) { + if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok { + return nil, jwt.ErrSignatureInvalid + } + return auth.JWTSecret(), nil + }) + if err != nil || !token.Valid { + slog.Warn("daemon_auth: invalid token", "path", r.URL.Path, "error", err) + writeError(w, http.StatusUnauthorized, "invalid token") + return + } + + claims, ok := token.Claims.(jwt.MapClaims) + if !ok { + writeError(w, http.StatusUnauthorized, "invalid claims") + return + } + sub, ok := claims["sub"].(string) + if !ok || strings.TrimSpace(sub) == "" { + writeError(w, http.StatusUnauthorized, "invalid claims") + return + } + r.Header.Set("X-User-ID", sub) + next.ServeHTTP(w, r) + }) + } +} diff --git a/server/migrations/028_daemon_token.down.sql b/server/migrations/028_daemon_token.down.sql new file mode 100644 index 00000000..18600acc --- /dev/null +++ b/server/migrations/028_daemon_token.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS daemon_token; diff --git a/server/migrations/028_daemon_token.up.sql b/server/migrations/028_daemon_token.up.sql new file mode 100644 index 00000000..6704aa08 --- /dev/null +++ b/server/migrations/028_daemon_token.up.sql @@ -0,0 +1,11 @@ +CREATE TABLE daemon_token ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + token_hash TEXT NOT NULL, + workspace_id UUID NOT NULL REFERENCES workspace(id) ON DELETE CASCADE, + daemon_id TEXT NOT NULL, + expires_at TIMESTAMPTZ NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE UNIQUE INDEX idx_daemon_token_hash ON daemon_token(token_hash); +CREATE INDEX idx_daemon_token_workspace_daemon ON daemon_token(workspace_id, daemon_id); diff --git a/server/pkg/db/generated/daemon_token.sql.go b/server/pkg/db/generated/daemon_token.sql.go new file mode 100644 index 00000000..367d7504 --- /dev/null +++ b/server/pkg/db/generated/daemon_token.sql.go @@ -0,0 +1,88 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.30.0 +// source: daemon_token.sql + +package db + +import ( + "context" + + "github.com/jackc/pgx/v5/pgtype" +) + +const createDaemonToken = `-- name: CreateDaemonToken :one +INSERT INTO daemon_token (token_hash, workspace_id, daemon_id, expires_at) +VALUES ($1, $2, $3, $4) +RETURNING id, token_hash, workspace_id, daemon_id, expires_at, created_at +` + +type CreateDaemonTokenParams struct { + TokenHash string `json:"token_hash"` + WorkspaceID pgtype.UUID `json:"workspace_id"` + DaemonID string `json:"daemon_id"` + ExpiresAt pgtype.Timestamptz `json:"expires_at"` +} + +func (q *Queries) CreateDaemonToken(ctx context.Context, arg CreateDaemonTokenParams) (DaemonToken, error) { + row := q.db.QueryRow(ctx, createDaemonToken, + arg.TokenHash, + arg.WorkspaceID, + arg.DaemonID, + arg.ExpiresAt, + ) + var i DaemonToken + err := row.Scan( + &i.ID, + &i.TokenHash, + &i.WorkspaceID, + &i.DaemonID, + &i.ExpiresAt, + &i.CreatedAt, + ) + return i, err +} + +const deleteDaemonTokensByWorkspaceAndDaemon = `-- name: DeleteDaemonTokensByWorkspaceAndDaemon :exec +DELETE FROM daemon_token +WHERE workspace_id = $1 AND daemon_id = $2 +` + +type DeleteDaemonTokensByWorkspaceAndDaemonParams struct { + WorkspaceID pgtype.UUID `json:"workspace_id"` + DaemonID string `json:"daemon_id"` +} + +func (q *Queries) DeleteDaemonTokensByWorkspaceAndDaemon(ctx context.Context, arg DeleteDaemonTokensByWorkspaceAndDaemonParams) error { + _, err := q.db.Exec(ctx, deleteDaemonTokensByWorkspaceAndDaemon, arg.WorkspaceID, arg.DaemonID) + return err +} + +const deleteExpiredDaemonTokens = `-- name: DeleteExpiredDaemonTokens :exec +DELETE FROM daemon_token +WHERE expires_at <= now() +` + +func (q *Queries) DeleteExpiredDaemonTokens(ctx context.Context) error { + _, err := q.db.Exec(ctx, deleteExpiredDaemonTokens) + return err +} + +const getDaemonTokenByHash = `-- name: GetDaemonTokenByHash :one +SELECT id, token_hash, workspace_id, daemon_id, expires_at, created_at FROM daemon_token +WHERE token_hash = $1 AND expires_at > now() +` + +func (q *Queries) GetDaemonTokenByHash(ctx context.Context, tokenHash string) (DaemonToken, error) { + row := q.db.QueryRow(ctx, getDaemonTokenByHash, tokenHash) + var i DaemonToken + err := row.Scan( + &i.ID, + &i.TokenHash, + &i.WorkspaceID, + &i.DaemonID, + &i.ExpiresAt, + &i.CreatedAt, + ) + return i, err +} diff --git a/server/pkg/db/generated/models.go b/server/pkg/db/generated/models.go index 9547212e..78a736c5 100644 --- a/server/pkg/db/generated/models.go +++ b/server/pkg/db/generated/models.go @@ -131,6 +131,15 @@ type DaemonPairingSession struct { UpdatedAt pgtype.Timestamptz `json:"updated_at"` } +type DaemonToken struct { + ID pgtype.UUID `json:"id"` + TokenHash string `json:"token_hash"` + WorkspaceID pgtype.UUID `json:"workspace_id"` + DaemonID string `json:"daemon_id"` + ExpiresAt pgtype.Timestamptz `json:"expires_at"` + CreatedAt pgtype.Timestamptz `json:"created_at"` +} + type InboxItem struct { ID pgtype.UUID `json:"id"` WorkspaceID pgtype.UUID `json:"workspace_id"` diff --git a/server/pkg/db/queries/daemon_token.sql b/server/pkg/db/queries/daemon_token.sql new file mode 100644 index 00000000..252b17f2 --- /dev/null +++ b/server/pkg/db/queries/daemon_token.sql @@ -0,0 +1,16 @@ +-- name: CreateDaemonToken :one +INSERT INTO daemon_token (token_hash, workspace_id, daemon_id, expires_at) +VALUES ($1, $2, $3, $4) +RETURNING *; + +-- name: GetDaemonTokenByHash :one +SELECT * FROM daemon_token +WHERE token_hash = $1 AND expires_at > now(); + +-- name: DeleteDaemonTokensByWorkspaceAndDaemon :exec +DELETE FROM daemon_token +WHERE workspace_id = $1 AND daemon_id = $2; + +-- name: DeleteExpiredDaemonTokens :exec +DELETE FROM daemon_token +WHERE expires_at <= now();