diff --git a/apps/web/features/runtimes/components/runtime-detail.tsx b/apps/web/features/runtimes/components/runtime-detail.tsx index 9c0919ac..3e124ff3 100644 --- a/apps/web/features/runtimes/components/runtime-detail.tsx +++ b/apps/web/features/runtimes/components/runtime-detail.tsx @@ -2,9 +2,20 @@ import type { AgentRuntime } from "@/shared/types"; import { formatLastSeen } from "../utils"; import { RuntimeModeIcon, StatusBadge, InfoField } from "./shared"; import { PingSection } from "./ping-section"; +import { UpdateSection } from "./update-section"; import { UsageSection } from "./usage-section"; +function getCliVersion(metadata: Record): string | null { + if (metadata && typeof metadata.version === "string" && metadata.version) { + return metadata.version; + } + return null; +} + export function RuntimeDetail({ runtime }: { runtime: AgentRuntime }) { + const cliVersion = + runtime.runtime_mode === "local" ? getCliVersion(runtime.metadata) : null; + return (
{/* Header */} @@ -43,6 +54,20 @@ export function RuntimeDetail({ runtime }: { runtime: AgentRuntime }) { )}
+ {/* CLI Version & Update */} + {runtime.runtime_mode === "local" && ( +
+

+ CLI Version +

+ +
+ )} + {/* Connection Test */}

diff --git a/apps/web/features/runtimes/components/update-section.tsx b/apps/web/features/runtimes/components/update-section.tsx new file mode 100644 index 00000000..b85d1ba2 --- /dev/null +++ b/apps/web/features/runtimes/components/update-section.tsx @@ -0,0 +1,227 @@ +import { useState, useEffect, useCallback, useRef } from "react"; +import { + Loader2, + CheckCircle2, + XCircle, + ArrowUpCircle, + Check, +} from "lucide-react"; +import { Button } from "@/components/ui/button"; +import { api } from "@/shared/api"; +import type { RuntimeUpdateStatus } from "@/shared/types"; + +const GITHUB_RELEASES_URL = + "https://api.github.com/repos/multica-ai/multica/releases/latest"; +const CACHE_TTL_MS = 10 * 60 * 1000; // 10 minutes + +let cachedLatestVersion: string | null = null; +let cachedAt = 0; + +async function fetchLatestVersion(): Promise { + if (cachedLatestVersion && Date.now() - cachedAt < CACHE_TTL_MS) { + return cachedLatestVersion; + } + try { + const resp = await fetch(GITHUB_RELEASES_URL, { + headers: { Accept: "application/vnd.github+json" }, + }); + if (!resp.ok) return null; + const data = await resp.json(); + cachedLatestVersion = data.tag_name ?? null; + cachedAt = Date.now(); + return cachedLatestVersion; + } catch { + return null; + } +} + +function stripV(v: string): string { + return v.replace(/^v/, ""); +} + +function isNewer(latest: string, current: string): boolean { + const l = stripV(latest).split(".").map(Number); + const c = stripV(current).split(".").map(Number); + for (let i = 0; i < Math.max(l.length, c.length); i++) { + const lv = l[i] ?? 0; + const cv = c[i] ?? 
0; + if (lv > cv) return true; + if (lv < cv) return false; + } + return false; +} + +const statusConfig: Record< + RuntimeUpdateStatus, + { label: string; icon: typeof Loader2; color: string } +> = { + pending: { + label: "Waiting for daemon...", + icon: Loader2, + color: "text-muted-foreground", + }, + running: { + label: "Updating...", + icon: Loader2, + color: "text-info", + }, + completed: { + label: "Update complete. Daemon is restarting...", + icon: CheckCircle2, + color: "text-success", + }, + failed: { label: "Update failed", icon: XCircle, color: "text-destructive" }, + timeout: { label: "Timeout", icon: XCircle, color: "text-warning" }, +}; + +interface UpdateSectionProps { + runtimeId: string; + currentVersion: string | null; + isOnline: boolean; +} + +export function UpdateSection({ + runtimeId, + currentVersion, + isOnline, +}: UpdateSectionProps) { + const [latestVersion, setLatestVersion] = useState(null); + const [status, setStatus] = useState(null); + const [error, setError] = useState(""); + const [output, setOutput] = useState(""); + const [updating, setUpdating] = useState(false); + const pollRef = useRef | null>(null); + + const cleanup = useCallback(() => { + if (pollRef.current) { + clearInterval(pollRef.current); + pollRef.current = null; + } + }, []); + + useEffect(() => cleanup, [cleanup]); + + // Fetch latest version on mount. + useEffect(() => { + fetchLatestVersion().then(setLatestVersion); + }, []); + + const handleUpdate = async () => { + if (!latestVersion) return; + cleanup(); + setUpdating(true); + setStatus("pending"); + setError(""); + setOutput(""); + + try { + const update = await api.initiateUpdate(runtimeId, latestVersion); + + pollRef.current = setInterval(async () => { + try { + const result = await api.getUpdateResult(runtimeId, update.id); + setStatus(result.status as RuntimeUpdateStatus); + + if (result.status === "completed") { + setOutput(result.output ?? 
""); + setUpdating(false); + cleanup(); + } else if ( + result.status === "failed" || + result.status === "timeout" + ) { + setError(result.error ?? "Unknown error"); + setUpdating(false); + cleanup(); + } + } catch { + // ignore poll errors + } + }, 2000); + } catch { + setStatus("failed"); + setError("Failed to initiate update"); + setUpdating(false); + } + }; + + const hasUpdate = + currentVersion && + latestVersion && + isNewer(latestVersion, currentVersion); + + const config = status ? statusConfig[status] : null; + const Icon = config?.icon; + const isActive = status === "pending" || status === "running"; + + return ( +
+
+ CLI Version: + + {currentVersion ?? "unknown"} + + + {!hasUpdate && currentVersion && latestVersion && !status && ( + + + Latest + + )} + + {hasUpdate && !status && ( + <> + + + {latestVersion} + + available + + )} + + {hasUpdate && isOnline && !status && ( + + )} + + {config && Icon && ( + + + {config.label} + + )} +
+ + {status === "completed" && output && ( +
+

{output}

+
+ )} + + {(status === "failed" || status === "timeout") && error && ( +
+

{error}

+ {status === "failed" && ( + + )} +
+ )} +
+ ); +} diff --git a/apps/web/shared/api/client.ts b/apps/web/shared/api/client.ts index 65004b34..5314e978 100644 --- a/apps/web/shared/api/client.ts +++ b/apps/web/shared/api/client.ts @@ -31,6 +31,7 @@ import type { RuntimeUsage, RuntimeHourlyActivity, RuntimePing, + RuntimeUpdate, TimelineEntry, TaskMessagePayload, Attachment, @@ -344,6 +345,23 @@ export class ApiClient { return this.fetch(`/api/runtimes/${runtimeId}/ping/${pingId}`); } + async initiateUpdate( + runtimeId: string, + targetVersion: string, + ): Promise { + return this.fetch(`/api/runtimes/${runtimeId}/update`, { + method: "POST", + body: JSON.stringify({ target_version: targetVersion }), + }); + } + + async getUpdateResult( + runtimeId: string, + updateId: string, + ): Promise { + return this.fetch(`/api/runtimes/${runtimeId}/update/${updateId}`); + } + async listAgentTasks(agentId: string): Promise { return this.fetch(`/api/agents/${agentId}/tasks`); } diff --git a/apps/web/shared/types/agent.ts b/apps/web/shared/types/agent.ts index d983c10b..232b4da4 100644 --- a/apps/web/shared/types/agent.ts +++ b/apps/web/shared/types/agent.ts @@ -174,3 +174,21 @@ export interface RuntimeHourlyActivity { hour: number; count: number; } + +export type RuntimeUpdateStatus = + | "pending" + | "running" + | "completed" + | "failed" + | "timeout"; + +export interface RuntimeUpdate { + id: string; + runtime_id: string; + status: RuntimeUpdateStatus; + target_version: string; + output?: string; + error?: string; + created_at: string; + updated_at: string; +} diff --git a/apps/web/shared/types/index.ts b/apps/web/shared/types/index.ts index 709c7f18..375221cf 100644 --- a/apps/web/shared/types/index.ts +++ b/apps/web/shared/types/index.ts @@ -21,6 +21,8 @@ export type { RuntimeHourlyActivity, RuntimePing, RuntimePingStatus, + RuntimeUpdate, + RuntimeUpdateStatus, } from "./agent"; export type { Workspace, WorkspaceRepo, Member, MemberRole, User, MemberWithUser } from "./workspace"; export type { InboxItem, 
InboxSeverity, InboxItemType } from "./inbox"; diff --git a/server/cmd/multica/cmd_daemon.go b/server/cmd/multica/cmd_daemon.go index 6c70f40c..f7bd4965 100644 --- a/server/cmd/multica/cmd_daemon.go +++ b/server/cmd/multica/cmd_daemon.go @@ -280,6 +280,39 @@ func runDaemonForeground(cmd *cobra.Command) error { if err := d.Run(ctx); err != nil && !errors.Is(err, context.Canceled) { return err } + + // Check if the daemon needs to restart after a CLI update. + if restartBin := d.RestartBinary(); restartBin != "" { + logger.Info("restarting daemon with updated binary", "path", restartBin) + + args := buildDaemonStartArgs(cmd) + child := exec.Command(restartBin, args...) + + logPath := daemonLogPathForProfile(profile) + logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + logger.Error("failed to open log file for restart", "error", err) + return nil + } + child.Stdout = logFile + child.Stderr = logFile + child.SysProcAttr = &syscall.SysProcAttr{Setsid: true} + + if err := child.Start(); err != nil { + logFile.Close() + logger.Error("failed to start new daemon", "error", err) + return nil + } + logFile.Close() + child.Process.Release() + + // Write new PID file. 
+ pidPath := daemonPIDPathForProfile(profile) + os.WriteFile(pidPath, []byte(strconv.Itoa(child.Process.Pid)), 0o644) + + logger.Info("new daemon started", "pid", child.Process.Pid) + } + return nil } diff --git a/server/cmd/multica/cmd_update.go b/server/cmd/multica/cmd_update.go index 1ed9169e..34142f8b 100644 --- a/server/cmd/multica/cmd_update.go +++ b/server/cmd/multica/cmd_update.go @@ -1,16 +1,13 @@ package main import ( - "encoding/json" "fmt" - "net/http" "os" - "os/exec" - "path/filepath" "strings" - "time" "github.com/spf13/cobra" + + "github.com/multica-ai/multica/server/internal/cli" ) var updateCmd = &cobra.Command{ @@ -19,17 +16,11 @@ var updateCmd = &cobra.Command{ RunE: runUpdate, } -// githubRelease is the subset of the GitHub releases API response we need. -type githubRelease struct { - TagName string `json:"tag_name"` - HTMLURL string `json:"html_url"` -} - func runUpdate(_ *cobra.Command, _ []string) error { fmt.Fprintf(os.Stderr, "Current version: %s (commit: %s)\n", version, commit) // Check latest version from GitHub. - latest, err := fetchLatestRelease() + latest, err := cli.FetchLatestRelease() if err != nil { fmt.Fprintf(os.Stderr, "Warning: could not check latest version: %v\n", err) } else { @@ -43,8 +34,15 @@ func runUpdate(_ *cobra.Command, _ []string) error { } // Detect installation method and update accordingly. - if isBrewInstall() { - return updateViaBrew() + if cli.IsBrewInstall() { + fmt.Fprintln(os.Stderr, "Updating via Homebrew...") + output, err := cli.UpdateViaBrew() + if err != nil { + fmt.Fprintf(os.Stderr, "%s\n", output) + return fmt.Errorf("brew upgrade failed: %w\nYou can try manually: brew upgrade multica-ai/tap/multica", err) + } + fmt.Fprintln(os.Stderr, "Update complete.") + return nil } // Not installed via brew — show manual instructions. 
@@ -57,79 +55,3 @@ func runUpdate(_ *cobra.Command, _ []string) error { fmt.Fprintln(os.Stderr, " https://github.com/multica-ai/multica/releases/latest") return nil } - -// isBrewInstall checks whether the running multica binary was installed via Homebrew. -func isBrewInstall() bool { - exePath, err := os.Executable() - if err != nil { - return false - } - // Resolve symlinks (brew links binaries from Cellar into prefix/bin). - resolved, err := filepath.EvalSymlinks(exePath) - if err != nil { - resolved = exePath - } - - // Check if the resolved path is inside a Homebrew prefix. - // Common prefixes: /opt/homebrew (Apple Silicon), /usr/local (Intel Mac), or custom. - brewPrefix := getBrewPrefix() - if brewPrefix != "" && strings.HasPrefix(resolved, brewPrefix) { - return true - } - - // Fallback: check well-known Homebrew paths. - for _, prefix := range []string{"/opt/homebrew", "/usr/local", "/home/linuxbrew/.linuxbrew"} { - if strings.HasPrefix(resolved, prefix+"/Cellar/") { - return true - } - } - return false -} - -// getBrewPrefix returns the Homebrew prefix by running `brew --prefix`, or empty string. 
-func getBrewPrefix() string { - out, err := exec.Command("brew", "--prefix").Output() - if err != nil { - return "" - } - return strings.TrimSpace(string(out)) -} - -func updateViaBrew() error { - fmt.Fprintln(os.Stderr, "Updating via Homebrew...") - - cmd := exec.Command("brew", "upgrade", "multica-ai/tap/multica") - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - if err := cmd.Run(); err != nil { - return fmt.Errorf("brew upgrade failed: %w\nYou can try manually: brew upgrade multica-ai/tap/multica", err) - } - - fmt.Fprintln(os.Stderr, "Update complete.") - return nil -} - -func fetchLatestRelease() (*githubRelease, error) { - client := &http.Client{Timeout: 10 * time.Second} - req, err := http.NewRequest(http.MethodGet, "https://api.github.com/repos/multica-ai/multica/releases/latest", nil) - if err != nil { - return nil, err - } - req.Header.Set("Accept", "application/vnd.github+json") - - resp, err := client.Do(req) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("GitHub API returned %d", resp.StatusCode) - } - - var release githubRelease - if err := json.NewDecoder(resp.Body).Decode(&release); err != nil { - return nil, err - } - return &release, nil -} diff --git a/server/cmd/server/router.go b/server/cmd/server/router.go index a792b381..509826e1 100644 --- a/server/cmd/server/router.go +++ b/server/cmd/server/router.go @@ -95,6 +95,7 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route r.Get("/runtimes/{runtimeId}/tasks/pending", h.ListPendingTasksByRuntime) r.Post("/runtimes/{runtimeId}/usage", h.ReportRuntimeUsage) r.Post("/runtimes/{runtimeId}/ping/{pingId}/result", h.ReportPingResult) + r.Post("/runtimes/{runtimeId}/update/{updateId}/result", h.ReportUpdateResult) r.Get("/tasks/{taskId}/status", h.GetTaskStatus) r.Post("/tasks/{taskId}/start", h.StartTask) @@ -224,6 +225,8 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus 
*events.Bus) chi.Route r.Get("/{runtimeId}/activity", h.GetRuntimeTaskActivity) r.Post("/{runtimeId}/ping", h.InitiatePing) r.Get("/{runtimeId}/ping/{pingId}", h.GetPing) + r.Post("/{runtimeId}/update", h.InitiateUpdate) + r.Get("/{runtimeId}/update/{updateId}", h.GetUpdate) }) // Inbox diff --git a/server/internal/cli/update.go b/server/internal/cli/update.go new file mode 100644 index 00000000..66660b03 --- /dev/null +++ b/server/internal/cli/update.go @@ -0,0 +1,95 @@ +package cli + +import ( + "encoding/json" + "fmt" + "net/http" + "os" + "os/exec" + "path/filepath" + "strings" + "time" +) + +// GitHubRelease is the subset of the GitHub releases API response we need. +type GitHubRelease struct { + TagName string `json:"tag_name"` + HTMLURL string `json:"html_url"` +} + +// FetchLatestRelease fetches the latest release tag from the multica GitHub repo. +func FetchLatestRelease() (*GitHubRelease, error) { + client := &http.Client{Timeout: 10 * time.Second} + req, err := http.NewRequest(http.MethodGet, "https://api.github.com/repos/multica-ai/multica/releases/latest", nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", "application/vnd.github+json") + + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("GitHub API returned %d", resp.StatusCode) + } + + var release GitHubRelease + if err := json.NewDecoder(resp.Body).Decode(&release); err != nil { + return nil, err + } + return &release, nil +} + +// IsBrewInstall checks whether the running multica binary was installed via Homebrew. 
+func IsBrewInstall() bool {
+	exePath, err := os.Executable()
+	if err != nil {
+		return false
+	}
+	// Homebrew links binaries from its Cellar into <prefix>/bin, so resolve
+	// symlinks first. If resolution fails, fall back to the unresolved path.
+	resolved, err := filepath.EvalSymlinks(exePath)
+	if err != nil {
+		resolved = exePath
+	}
+
+	// Check the well-known prefixes first; this avoids spawning `brew` at all
+	// in the common case.
+	for _, prefix := range []string{"/opt/homebrew", "/usr/local", "/home/linuxbrew/.linuxbrew"} {
+		if underCellar(resolved, prefix) {
+			return true
+		}
+	}
+
+	// Custom prefix reported by `brew --prefix`. Only a path inside the Cellar
+	// counts: a bare prefix match would treat every binary under e.g.
+	// /usr/local (including manually installed ones) as a Homebrew install.
+	if brewPrefix := GetBrewPrefix(); brewPrefix != "" && underCellar(resolved, brewPrefix) {
+		return true
+	}
+	return false
+}
+
+// underCellar reports whether path lies inside <prefix>/Cellar/. The trailing
+// separator prevents "<prefix>/Cellar-old/..." from matching.
+func underCellar(path, prefix string) bool {
+	return strings.HasPrefix(path, filepath.Join(prefix, "Cellar")+string(filepath.Separator))
+}
+
+// GetBrewPrefix returns the Homebrew prefix by running `brew --prefix`, or empty string.
+func GetBrewPrefix() string {
+	out, err := exec.Command("brew", "--prefix").Output()
+	if err != nil {
+		return ""
+	}
+	return strings.TrimSpace(string(out))
+}
+
+// UpdateViaBrew runs `brew upgrade multica-ai/tap/multica`.
+// Returns the combined stdout/stderr output and any error from running the
+// command. The error is returned unwrapped: callers already prefix it with
+// "brew upgrade failed:", so wrapping here would duplicate that text.
+func UpdateViaBrew() (string, error) {
+	out, err := exec.Command("brew", "upgrade", "multica-ai/tap/multica").CombinedOutput()
+	return string(out), err
+}
+
+// DetectNewBinaryPath returns the path to the multica binary after an update.
+// It uses exec.LookPath to find the binary in PATH, which will resolve to the
+// updated version after a brew upgrade.
+func DetectNewBinaryPath() (string, error) {
+	return exec.LookPath("multica")
+}
diff --git a/server/internal/daemon/client.go b/server/internal/daemon/client.go
index c3bdae90..5a50ae92 100644
--- a/server/internal/daemon/client.go
+++ b/server/internal/daemon/client.go
@@ -139,8 +139,9 @@ func (c *Client) ReportUsage(ctx context.Context, runtimeID string, entries []ma
 
 // HeartbeatResponse contains the server's response to a heartbeat, including any pending actions.
type HeartbeatResponse struct { - Status string `json:"status"` - PendingPing *PendingPing `json:"pending_ping,omitempty"` + Status string `json:"status"` + PendingPing *PendingPing `json:"pending_ping,omitempty"` + PendingUpdate *PendingUpdate `json:"pending_update,omitempty"` } // PendingPing represents a ping test request from the server. @@ -148,6 +149,12 @@ type PendingPing struct { ID string `json:"id"` } +// PendingUpdate represents a CLI update request from the server. +type PendingUpdate struct { + ID string `json:"id"` + TargetVersion string `json:"target_version"` +} + func (c *Client) SendHeartbeat(ctx context.Context, runtimeID string) (*HeartbeatResponse, error) { var resp HeartbeatResponse if err := c.postJSON(ctx, "/api/daemon/heartbeat", map[string]string{ @@ -162,6 +169,11 @@ func (c *Client) ReportPingResult(ctx context.Context, runtimeID, pingID string, return c.postJSON(ctx, fmt.Sprintf("/api/daemon/runtimes/%s/ping/%s/result", runtimeID, pingID), result, nil) } +// ReportUpdateResult sends the CLI update result back to the server. +func (c *Client) ReportUpdateResult(ctx context.Context, runtimeID, updateID string, result map[string]any) error { + return c.postJSON(ctx, fmt.Sprintf("/api/daemon/runtimes/%s/update/%s/result", runtimeID, updateID), result, nil) +} + // WorkspaceInfo holds minimal workspace metadata returned by the API. 
type WorkspaceInfo struct { ID string `json:"id"` diff --git a/server/internal/daemon/daemon.go b/server/internal/daemon/daemon.go index 6ab5ac93..cabbf60c 100644 --- a/server/internal/daemon/daemon.go +++ b/server/internal/daemon/daemon.go @@ -35,6 +35,10 @@ type Daemon struct { workspaces map[string]*workspaceState runtimeIndex map[string]Runtime // runtimeID -> Runtime for provider lookups reloading sync.Mutex // prevents concurrent reloadWorkspaces + + cancelFunc context.CancelFunc // set by Run(); called by triggerRestart + restartBinary string // non-empty after a successful update; path to the new binary + updating atomic.Bool // prevents concurrent update attempts } // New creates a new Daemon instance. @@ -52,6 +56,10 @@ func New(cfg Config, logger *slog.Logger) *Daemon { // Run starts the daemon: resolves auth, registers runtimes, then polls for tasks. func (d *Daemon) Run(ctx context.Context) error { + // Wrap context so handleUpdate can cancel the daemon for restart. + ctx, cancel := context.WithCancel(ctx) + d.cancelFunc = cancel + // Bind health port early to detect another running daemon. healthLn, err := d.listenHealth() if err != nil { @@ -98,6 +106,12 @@ func (d *Daemon) Run(ctx context.Context) error { return d.pollLoop(ctx) } +// RestartBinary returns the path to the new binary if the daemon needs to restart +// after a successful update, or empty string if no restart is needed. +func (d *Daemon) RestartBinary() string { + return d.restartBinary +} + // deregisterRuntimes notifies the server that all runtimes are going offline. func (d *Daemon) deregisterRuntimes() { runtimeIDs := d.allRuntimeIDs() @@ -440,6 +454,11 @@ func (d *Daemon) heartbeatLoop(ctx context.Context) { go d.handlePing(ctx, *rt, resp.PendingPing.ID) } } + + // Handle pending update requests. 
+ if resp.PendingUpdate != nil { + go d.handleUpdate(ctx, rid, resp.PendingUpdate) + } } } } @@ -519,6 +538,73 @@ func (d *Daemon) handlePing(ctx context.Context, rt Runtime, pingID string) { } } +// handleUpdate performs the CLI update when triggered by the server via heartbeat. +func (d *Daemon) handleUpdate(ctx context.Context, runtimeID string, update *PendingUpdate) { + // Prevent concurrent update attempts. + if !d.updating.CompareAndSwap(false, true) { + d.logger.Warn("update already in progress, ignoring", "runtime_id", runtimeID, "update_id", update.ID) + return + } + defer d.updating.Store(false) + + d.logger.Info("CLI update requested", "runtime_id", runtimeID, "update_id", update.ID, "target_version", update.TargetVersion) + + // Report running status. + d.client.ReportUpdateResult(ctx, runtimeID, update.ID, map[string]any{ + "status": "running", + }) + + // Check if installed via Homebrew. + if !cli.IsBrewInstall() { + d.logger.Warn("CLI not installed via Homebrew, cannot auto-update") + d.client.ReportUpdateResult(ctx, runtimeID, update.ID, map[string]any{ + "status": "failed", + "error": "CLI was not installed via Homebrew. Please update manually: https://github.com/multica-ai/multica/releases/latest", + }) + return + } + + // Execute brew upgrade. + d.logger.Info("updating CLI via Homebrew...") + output, err := cli.UpdateViaBrew() + if err != nil { + d.logger.Error("CLI update failed", "error", err, "output", output) + d.client.ReportUpdateResult(ctx, runtimeID, update.ID, map[string]any{ + "status": "failed", + "error": fmt.Sprintf("brew upgrade failed: %v", err), + }) + return + } + + d.logger.Info("CLI update completed successfully", "output", output) + d.client.ReportUpdateResult(ctx, runtimeID, update.ID, map[string]any{ + "status": "completed", + "output": fmt.Sprintf("Updated to %s", update.TargetVersion), + }) + + // Trigger daemon restart with the new binary. 
+ d.triggerRestart() +} + +// triggerRestart initiates a graceful daemon restart after a successful CLI update. +// It finds the new binary path and cancels the daemon context so Run() returns. +// The caller (cmd_daemon.go) checks RestartBinary() and launches the new process. +func (d *Daemon) triggerRestart() { + newBin, err := cli.DetectNewBinaryPath() + if err != nil { + d.logger.Error("could not find updated binary for restart", "error", err) + return + } + + d.logger.Info("scheduling daemon restart", "new_binary", newBin) + d.restartBinary = newBin + + // Cancel the main context to trigger graceful shutdown. + if d.cancelFunc != nil { + d.cancelFunc() + } +} + func (d *Daemon) usageScanLoop(ctx context.Context) { scanner := usage.NewScanner(d.logger) diff --git a/server/internal/handler/daemon.go b/server/internal/handler/daemon.go index a7821e91..937ddf9a 100644 --- a/server/internal/handler/daemon.go +++ b/server/internal/handler/daemon.go @@ -204,6 +204,14 @@ func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) { resp["pending_ping"] = map[string]string{"id": pending.ID} } + // Check for pending update requests for this runtime. 
+ if pending := h.UpdateStore.PopPending(req.RuntimeID); pending != nil { + resp["pending_update"] = map[string]string{ + "id": pending.ID, + "target_version": pending.TargetVersion, + } + } + writeJSON(w, http.StatusOK, resp) } diff --git a/server/internal/handler/handler.go b/server/internal/handler/handler.go index cdf81027..48f0dabc 100644 --- a/server/internal/handler/handler.go +++ b/server/internal/handler/handler.go @@ -40,6 +40,7 @@ type Handler struct { TaskService *service.TaskService EmailService *service.EmailService PingStore *PingStore + UpdateStore *UpdateStore Storage *storage.S3Storage CFSigner *auth.CloudFrontSigner } @@ -59,6 +60,7 @@ func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *event TaskService: service.NewTaskService(queries, hub, bus), EmailService: emailService, PingStore: NewPingStore(), + UpdateStore: NewUpdateStore(), Storage: s3, CFSigner: cfSigner, } diff --git a/server/internal/handler/runtime_update.go b/server/internal/handler/runtime_update.go new file mode 100644 index 00000000..f7430544 --- /dev/null +++ b/server/internal/handler/runtime_update.go @@ -0,0 +1,221 @@ +package handler + +import ( + "encoding/json" + "net/http" + "sync" + "time" + + "github.com/go-chi/chi/v5" +) + +// --------------------------------------------------------------------------- +// In-memory update store +// --------------------------------------------------------------------------- + +type UpdateStatus string + +const ( + UpdatePending UpdateStatus = "pending" + UpdateRunning UpdateStatus = "running" + UpdateCompleted UpdateStatus = "completed" + UpdateFailed UpdateStatus = "failed" + UpdateTimeout UpdateStatus = "timeout" +) + +// UpdateRequest represents a pending or completed CLI update request. 
+type UpdateRequest struct {
+	ID            string       `json:"id"`
+	RuntimeID     string       `json:"runtime_id"`
+	Status        UpdateStatus `json:"status"`
+	TargetVersion string       `json:"target_version"`
+	Output        string       `json:"output,omitempty"`
+	Error         string       `json:"error,omitempty"`
+	CreatedAt     time.Time    `json:"created_at"`
+	UpdatedAt     time.Time    `json:"updated_at"`
+}
+
+const (
+	// updateRequestTimeout is how long a request may stay pending or running
+	// before the store reports it as timed out.
+	updateRequestTimeout = 120 * time.Second
+	// updateRequestRetention is how long a request is kept before Create purges it.
+	updateRequestRetention = 5 * time.Minute
+)
+
+// inProgress reports whether the request is still pending or running.
+func (r *UpdateRequest) inProgress() bool {
+	return r.Status == UpdatePending || r.Status == UpdateRunning
+}
+
+// expire marks an in-progress request as timed out once it is older than
+// updateRequestTimeout. Terminal requests are left untouched.
+func (r *UpdateRequest) expire(now time.Time) {
+	if !r.inProgress() || now.Sub(r.CreatedAt) <= updateRequestTimeout {
+		return
+	}
+	r.Status = UpdateTimeout
+	r.Error = "update did not complete within 120 seconds"
+	r.UpdatedAt = now
+}
+
+// clone returns a copy of the request. The store hands out copies so callers
+// can read or JSON-encode the result after the store's lock is released
+// without racing with Complete/Fail.
+func (r *UpdateRequest) clone() *UpdateRequest {
+	cp := *r
+	return &cp
+}
+
+// UpdateStore is a thread-safe in-memory store for CLI update requests.
+type UpdateStore struct {
+	mu       sync.Mutex
+	requests map[string]*UpdateRequest // keyed by update ID
+}
+
+// NewUpdateStore returns an empty store.
+func NewUpdateStore() *UpdateStore {
+	return &UpdateStore{
+		requests: make(map[string]*UpdateRequest),
+	}
+}
+
+// Create registers a new pending update for runtimeID and returns a copy of it.
+// It returns errUpdateInProgress when the runtime already has a pending or
+// running update that has not timed out.
+func (s *UpdateStore) Create(runtimeID, targetVersion string) (*UpdateRequest, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	now := time.Now()
+
+	// Purge requests past the retention window and time out stale ones, so a
+	// request nobody polled cannot block new updates until it is purged.
+	for id, req := range s.requests {
+		if now.Sub(req.CreatedAt) > updateRequestRetention {
+			delete(s.requests, id)
+			continue
+		}
+		req.expire(now)
+	}
+
+	// Reject if there is already a pending or running update for this runtime.
+	for _, req := range s.requests {
+		if req.RuntimeID == runtimeID && req.inProgress() {
+			return nil, errUpdateInProgress
+		}
+	}
+
+	req := &UpdateRequest{
+		ID:            randomID(),
+		RuntimeID:     runtimeID,
+		Status:        UpdatePending,
+		TargetVersion: targetVersion,
+		CreatedAt:     now,
+		UpdatedAt:     now,
+	}
+	s.requests[req.ID] = req
+	return req.clone(), nil
+}
+
+// errUpdateInProgress is returned by Create when the runtime already has an
+// update in flight.
+var errUpdateInProgress = &updateError{msg: "an update is already in progress for this runtime"}
+
+type updateError struct{ msg string }
+
+func (e *updateError) Error() string { return e.msg }
+
+// Get returns a copy of the request with the given ID, or nil if it is unknown.
+// A request that has been pending or running for longer than
+// updateRequestTimeout is marked as timed out before it is returned.
+func (s *UpdateStore) Get(id string) *UpdateRequest {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	req, ok := s.requests[id]
+	if !ok {
+		return nil
+	}
+	req.expire(time.Now())
+	return req.clone()
+}
+
+// PopPending returns and marks as running the pending update for a runtime.
+// Requests that have already exceeded the timeout are marked as timed out
+// instead of being handed to the daemon. Returns nil when nothing is pending.
+func (s *UpdateStore) PopPending(runtimeID string) *UpdateRequest {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	now := time.Now()
+	for _, req := range s.requests {
+		if req.RuntimeID != runtimeID {
+			continue
+		}
+		req.expire(now)
+		if req.Status == UpdatePending {
+			req.Status = UpdateRunning
+			req.UpdatedAt = now
+			return req.clone()
+		}
+	}
+	return nil
+}
+
+// Complete marks the update as completed and records its output.
+// Unknown IDs are ignored.
+func (s *UpdateStore) Complete(id string, output string) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if req, ok := s.requests[id]; ok {
+		req.Status = UpdateCompleted
+		req.Output = output
+		req.UpdatedAt = time.Now()
+	}
+}
+
+// Fail marks the update as failed and records the error message.
+// Unknown IDs are ignored.
+func (s *UpdateStore) Fail(id string, errMsg string) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if req, ok := s.requests[id]; ok {
+		req.Status = UpdateFailed
+		req.Error = errMsg
+		req.UpdatedAt = time.Now()
+	}
+}
+
+// ---------------------------------------------------------------------------
+// Handlers
+// ---------------------------------------------------------------------------
+
+// InitiateUpdate creates a new CLI update request (protected route, called by frontend).
+func (h *Handler) InitiateUpdate(w http.ResponseWriter, r *http.Request) {
+	runtimeID := chi.URLParam(r, "runtimeId")
+
+	rt, err := h.Queries.GetAgentRuntime(r.Context(), parseUUID(runtimeID))
+	if err != nil {
+		writeError(w, http.StatusNotFound, "runtime not found")
+		return
+	}
+
+	// Non-members get the same "runtime not found" message as a missing runtime.
+	if _, ok := h.requireWorkspaceMember(w, r, uuidToString(rt.WorkspaceID), "runtime not found"); !ok {
+		return
+	}
+
+	var req struct {
+		TargetVersion string `json:"target_version"`
+	}
+	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+		writeError(w, http.StatusBadRequest, "invalid request body")
+		return
+	}
+	if req.TargetVersion == "" {
+		writeError(w, http.StatusBadRequest, "target_version is required")
+		return
+	}
+
+	// Create fails when the runtime already has an update in flight.
+	update, err := h.UpdateStore.Create(runtimeID, req.TargetVersion)
+	if err != nil {
+		writeError(w, http.StatusConflict, err.Error())
+		return
+	}
+
+	writeJSON(w, http.StatusOK, update)
+}
+
+// GetUpdate returns the status of an update request (protected route, called by frontend).
+// The caller must be a member of the runtime's workspace, and the update must
+// belong to the runtime named in the URL; otherwise the handler answers 404.
+func (h *Handler) GetUpdate(w http.ResponseWriter, r *http.Request) {
+	runtimeID := chi.URLParam(r, "runtimeId")
+	updateID := chi.URLParam(r, "updateId")
+
+	// Same access check as InitiateUpdate: without it, anyone who can reach
+	// this route could read another workspace's update status and output.
+	rt, err := h.Queries.GetAgentRuntime(r.Context(), parseUUID(runtimeID))
+	if err != nil {
+		writeError(w, http.StatusNotFound, "runtime not found")
+		return
+	}
+	if _, ok := h.requireWorkspaceMember(w, r, uuidToString(rt.WorkspaceID), "runtime not found"); !ok {
+		return
+	}
+
+	// An update ID that exists but belongs to a different runtime is reported
+	// as not found rather than leaked through this runtime's URL.
+	update := h.UpdateStore.Get(updateID)
+	if update == nil || update.RuntimeID != runtimeID {
+		writeError(w, http.StatusNotFound, "update not found")
+		return
+	}
+
+	writeJSON(w, http.StatusOK, update)
+}
+
+// ReportUpdateResult receives the update result from the daemon.
+func (h *Handler) ReportUpdateResult(w http.ResponseWriter, r *http.Request) {
+	runtimeID := chi.URLParam(r, "runtimeId")
+	updateID := chi.URLParam(r, "updateId")
+
+	var req struct {
+		Status string `json:"status"` // "running", "completed", or "failed"
+		Output string `json:"output"`
+		Error  string `json:"error"`
+	}
+	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+		writeError(w, http.StatusBadRequest, "invalid request body")
+		return
+	}
+
+	// Only accept results for an update that exists and belongs to the runtime
+	// in the URL. Otherwise a report for an unknown ID would be acknowledged
+	// with 200, and one runtime could complete or fail another runtime's update.
+	update := h.UpdateStore.Get(updateID)
+	if update == nil || update.RuntimeID != runtimeID {
+		writeError(w, http.StatusNotFound, "update not found")
+		return
+	}
+
+	switch req.Status {
+	case "completed":
+		h.UpdateStore.Complete(updateID, req.Output)
+	case "failed":
+		h.UpdateStore.Fail(updateID, req.Error)
+	case "running":
+		// No-op: status is already "running" from PopPending. This call is
+		// just a progress signal from the daemon to confirm it received the
+		// update command and is executing it.
+	default:
+		writeError(w, http.StatusBadRequest, "invalid status: "+req.Status)
+		return
+	}
+
+	writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
+}