From fdba410f119d0c5ebafbcaed6691abc896d42b12 Mon Sep 17 00:00:00 2001 From: LinYushen Date: Thu, 2 Apr 2026 14:12:49 +0800 Subject: [PATCH] feat(runtime): support CLI update from web runtime page (#331) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(runtime): support CLI update from web runtime page Add the ability to update the CLI daemon from the web Runtime detail page. When a newer version is available on GitHub Releases, an update button appears. Clicking it sends an update command through the server to the daemon via the heartbeat mechanism (same pattern as ping). The daemon executes `brew upgrade`, reports the result, and restarts itself with the new binary. Changes across all three layers: - Frontend: version display, GitHub latest check, UpdateSection component - Server: UpdateStore (in-memory), heartbeat extension, 3 new endpoints - CLI: shared update logic, daemon handleUpdate + graceful restart Co-Authored-By: Claude Opus 4.6 (1M context) * fix(runtime): handle 'running' status in ReportUpdateResult The daemon sends {"status":"running"} when it starts executing the update, but ReportUpdateResult treated any non-"completed" status as failure — immediately marking the update as failed before brew upgrade even ran. Fix: use a switch statement to handle "running" as a no-op (status is already "running" from PopPending), and also timeout running updates after 120 seconds in case brew upgrade hangs. Co-Authored-By: Claude Opus 4.6 (1M context) --------- Co-authored-by: Claude Opus 4.6 (1M context) --- .../runtimes/components/runtime-detail.tsx | 25 ++ .../runtimes/components/update-section.tsx | 227 ++++++++++++++++++ apps/web/shared/api/client.ts | 18 ++ apps/web/shared/types/agent.ts | 18 ++ apps/web/shared/types/index.ts | 2 + server/cmd/multica/cmd_daemon.go | 33 +++ server/cmd/multica/cmd_update.go | 102 +------- server/cmd/server/router.go | 3 + server/internal/cli/update.go | 95 ++++++++ server/internal/daemon/client.go | 16 +- server/internal/daemon/daemon.go | 86 +++++++ server/internal/handler/daemon.go | 8 + server/internal/handler/handler.go | 2 + server/internal/handler/runtime_update.go | 221 +++++++++++++++++ 14 files changed, 764 insertions(+), 92 deletions(-) create mode 100644 apps/web/features/runtimes/components/update-section.tsx create mode 100644 server/internal/cli/update.go create mode 100644 server/internal/handler/runtime_update.go diff --git a/apps/web/features/runtimes/components/runtime-detail.tsx b/apps/web/features/runtimes/components/runtime-detail.tsx index 9c0919ac..3e124ff3 100644 --- a/apps/web/features/runtimes/components/runtime-detail.tsx +++ b/apps/web/features/runtimes/components/runtime-detail.tsx @@ -2,9 +2,20 @@ import type { AgentRuntime } from "@/shared/types"; import { formatLastSeen } from "../utils"; import { RuntimeModeIcon, StatusBadge, InfoField } from "./shared"; import { PingSection } from "./ping-section"; +import { UpdateSection } from "./update-section"; import { UsageSection } from "./usage-section"; +function getCliVersion(metadata: Record): string | null { + if (metadata && typeof metadata.version === "string" && metadata.version) { + return metadata.version; + } + return null; +} + export function RuntimeDetail({ runtime }: { runtime: AgentRuntime }) { + const cliVersion = + runtime.runtime_mode === "local" ? getCliVersion(runtime.metadata) : null; + return (
{/* Header */} @@ -43,6 +54,20 @@ export function RuntimeDetail({ runtime }: { runtime: AgentRuntime }) { )}
+ {/* CLI Version & Update */} + {runtime.runtime_mode === "local" && ( +
+

+ CLI Version +

+ +
+ )} + {/* Connection Test */}

diff --git a/apps/web/features/runtimes/components/update-section.tsx b/apps/web/features/runtimes/components/update-section.tsx new file mode 100644 index 00000000..b85d1ba2 --- /dev/null +++ b/apps/web/features/runtimes/components/update-section.tsx @@ -0,0 +1,227 @@ +import { useState, useEffect, useCallback, useRef } from "react"; +import { + Loader2, + CheckCircle2, + XCircle, + ArrowUpCircle, + Check, +} from "lucide-react"; +import { Button } from "@/components/ui/button"; +import { api } from "@/shared/api"; +import type { RuntimeUpdateStatus } from "@/shared/types"; + +const GITHUB_RELEASES_URL = + "https://api.github.com/repos/multica-ai/multica/releases/latest"; +const CACHE_TTL_MS = 10 * 60 * 1000; // 10 minutes + +let cachedLatestVersion: string | null = null; +let cachedAt = 0; + +async function fetchLatestVersion(): Promise { + if (cachedLatestVersion && Date.now() - cachedAt < CACHE_TTL_MS) { + return cachedLatestVersion; + } + try { + const resp = await fetch(GITHUB_RELEASES_URL, { + headers: { Accept: "application/vnd.github+json" }, + }); + if (!resp.ok) return null; + const data = await resp.json(); + cachedLatestVersion = data.tag_name ?? null; + cachedAt = Date.now(); + return cachedLatestVersion; + } catch { + return null; + } +} + +function stripV(v: string): string { + return v.replace(/^v/, ""); +} + +function isNewer(latest: string, current: string): boolean { + const l = stripV(latest).split(".").map(Number); + const c = stripV(current).split(".").map(Number); + for (let i = 0; i < Math.max(l.length, c.length); i++) { + const lv = l[i] ?? 0; + const cv = c[i] ?? 0; + if (lv > cv) return true; + if (lv < cv) return false; + } + return false; +} + +const statusConfig: Record< + RuntimeUpdateStatus, + { label: string; icon: typeof Loader2; color: string } +> = { + pending: { + label: "Waiting for daemon...", + icon: Loader2, + color: "text-muted-foreground", + }, + running: { + label: "Updating...", + icon: Loader2, + color: "text-info", + }, + completed: { + label: "Update complete. Daemon is restarting...", + icon: CheckCircle2, + color: "text-success", + }, + failed: { label: "Update failed", icon: XCircle, color: "text-destructive" }, + timeout: { label: "Timeout", icon: XCircle, color: "text-warning" }, +}; + +interface UpdateSectionProps { + runtimeId: string; + currentVersion: string | null; + isOnline: boolean; +} + +export function UpdateSection({ + runtimeId, + currentVersion, + isOnline, +}: UpdateSectionProps) { + const [latestVersion, setLatestVersion] = useState(null); + const [status, setStatus] = useState(null); + const [error, setError] = useState(""); + const [output, setOutput] = useState(""); + const [updating, setUpdating] = useState(false); + const pollRef = useRef | null>(null); + + const cleanup = useCallback(() => { + if (pollRef.current) { + clearInterval(pollRef.current); + pollRef.current = null; + } + }, []); + + useEffect(() => cleanup, [cleanup]); + + // Fetch latest version on mount. + useEffect(() => { + fetchLatestVersion().then(setLatestVersion); + }, []); + + const handleUpdate = async () => { + if (!latestVersion) return; + cleanup(); + setUpdating(true); + setStatus("pending"); + setError(""); + setOutput(""); + + try { + const update = await api.initiateUpdate(runtimeId, latestVersion); + + pollRef.current = setInterval(async () => { + try { + const result = await api.getUpdateResult(runtimeId, update.id); + setStatus(result.status as RuntimeUpdateStatus); + + if (result.status === "completed") { + setOutput(result.output ?? ""); + setUpdating(false); + cleanup(); + } else if ( + result.status === "failed" || + result.status === "timeout" + ) { + setError(result.error ?? "Unknown error"); + setUpdating(false); + cleanup(); + } + } catch { + // ignore poll errors + } + }, 2000); + } catch { + setStatus("failed"); + setError("Failed to initiate update"); + setUpdating(false); + } + }; + + const hasUpdate = + currentVersion && + latestVersion && + isNewer(latestVersion, currentVersion); + + const config = status ? statusConfig[status] : null; + const Icon = config?.icon; + const isActive = status === "pending" || status === "running"; + + return ( +
+
+ CLI Version: + + {currentVersion ?? "unknown"} + + + {!hasUpdate && currentVersion && latestVersion && !status && ( + + + Latest + + )} + + {hasUpdate && !status && ( + <> + + + {latestVersion} + + available + + )} + + {hasUpdate && isOnline && !status && ( + + )} + + {config && Icon && ( + + + {config.label} + + )} +
+ + {status === "completed" && output && ( +
+

{output}

+
+ )} + + {(status === "failed" || status === "timeout") && error && ( +
+

{error}

+ {status === "failed" && ( + + )} +
+ )} +
+ ); +} diff --git a/apps/web/shared/api/client.ts b/apps/web/shared/api/client.ts index 65004b34..5314e978 100644 --- a/apps/web/shared/api/client.ts +++ b/apps/web/shared/api/client.ts @@ -31,6 +31,7 @@ import type { RuntimeUsage, RuntimeHourlyActivity, RuntimePing, + RuntimeUpdate, TimelineEntry, TaskMessagePayload, Attachment, @@ -344,6 +345,23 @@ export class ApiClient { return this.fetch(`/api/runtimes/${runtimeId}/ping/${pingId}`); } + async initiateUpdate( + runtimeId: string, + targetVersion: string, + ): Promise { + return this.fetch(`/api/runtimes/${runtimeId}/update`, { + method: "POST", + body: JSON.stringify({ target_version: targetVersion }), + }); + } + + async getUpdateResult( + runtimeId: string, + updateId: string, + ): Promise { + return this.fetch(`/api/runtimes/${runtimeId}/update/${updateId}`); + } + async listAgentTasks(agentId: string): Promise { return this.fetch(`/api/agents/${agentId}/tasks`); } diff --git a/apps/web/shared/types/agent.ts b/apps/web/shared/types/agent.ts index d983c10b..232b4da4 100644 --- a/apps/web/shared/types/agent.ts +++ b/apps/web/shared/types/agent.ts @@ -174,3 +174,21 @@ export interface RuntimeHourlyActivity { hour: number; count: number; } + +export type RuntimeUpdateStatus = + | "pending" + | "running" + | "completed" + | "failed" + | "timeout"; + +export interface RuntimeUpdate { + id: string; + runtime_id: string; + status: RuntimeUpdateStatus; + target_version: string; + output?: string; + error?: string; + created_at: string; + updated_at: string; +} diff --git a/apps/web/shared/types/index.ts b/apps/web/shared/types/index.ts index 709c7f18..375221cf 100644 --- a/apps/web/shared/types/index.ts +++ b/apps/web/shared/types/index.ts @@ -21,6 +21,8 @@ export type { RuntimeHourlyActivity, RuntimePing, RuntimePingStatus, + RuntimeUpdate, + RuntimeUpdateStatus, } from "./agent"; export type { Workspace, WorkspaceRepo, Member, MemberRole, User, MemberWithUser } from "./workspace"; export type { InboxItem, InboxSeverity, InboxItemType } from "./inbox"; diff --git a/server/cmd/multica/cmd_daemon.go b/server/cmd/multica/cmd_daemon.go index 6c70f40c..f7bd4965 100644 --- a/server/cmd/multica/cmd_daemon.go +++ b/server/cmd/multica/cmd_daemon.go @@ -280,6 +280,39 @@ func runDaemonForeground(cmd *cobra.Command) error { if err := d.Run(ctx); err != nil && !errors.Is(err, context.Canceled) { return err } + + // Check if the daemon needs to restart after a CLI update. + if restartBin := d.RestartBinary(); restartBin != "" { + logger.Info("restarting daemon with updated binary", "path", restartBin) + + args := buildDaemonStartArgs(cmd) + child := exec.Command(restartBin, args...) + + logPath := daemonLogPathForProfile(profile) + logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + logger.Error("failed to open log file for restart", "error", err) + return nil + } + child.Stdout = logFile + child.Stderr = logFile + child.SysProcAttr = &syscall.SysProcAttr{Setsid: true} + + if err := child.Start(); err != nil { + logFile.Close() + logger.Error("failed to start new daemon", "error", err) + return nil + } + logFile.Close() + child.Process.Release() + + // Write new PID file. + pidPath := daemonPIDPathForProfile(profile) + os.WriteFile(pidPath, []byte(strconv.Itoa(child.Process.Pid)), 0o644) + + logger.Info("new daemon started", "pid", child.Process.Pid) + } + return nil } diff --git a/server/cmd/multica/cmd_update.go b/server/cmd/multica/cmd_update.go index 1ed9169e..34142f8b 100644 --- a/server/cmd/multica/cmd_update.go +++ b/server/cmd/multica/cmd_update.go @@ -1,16 +1,13 @@ package main import ( - "encoding/json" "fmt" - "net/http" "os" - "os/exec" - "path/filepath" "strings" - "time" "github.com/spf13/cobra" + + "github.com/multica-ai/multica/server/internal/cli" ) var updateCmd = &cobra.Command{ @@ -19,17 +16,11 @@ var updateCmd = &cobra.Command{ RunE: runUpdate, } -// githubRelease is the subset of the GitHub releases API response we need. -type githubRelease struct { - TagName string `json:"tag_name"` - HTMLURL string `json:"html_url"` -} - func runUpdate(_ *cobra.Command, _ []string) error { fmt.Fprintf(os.Stderr, "Current version: %s (commit: %s)\n", version, commit) // Check latest version from GitHub. - latest, err := fetchLatestRelease() + latest, err := cli.FetchLatestRelease() if err != nil { fmt.Fprintf(os.Stderr, "Warning: could not check latest version: %v\n", err) } else { @@ -43,8 +34,15 @@ func runUpdate(_ *cobra.Command, _ []string) error { } // Detect installation method and update accordingly. - if isBrewInstall() { - return updateViaBrew() + if cli.IsBrewInstall() { + fmt.Fprintln(os.Stderr, "Updating via Homebrew...") + output, err := cli.UpdateViaBrew() + if err != nil { + fmt.Fprintf(os.Stderr, "%s\n", output) + return fmt.Errorf("brew upgrade failed: %w\nYou can try manually: brew upgrade multica-ai/tap/multica", err) + } + fmt.Fprintln(os.Stderr, "Update complete.") + return nil } // Not installed via brew — show manual instructions. @@ -57,79 +55,3 @@ func runUpdate(_ *cobra.Command, _ []string) error { fmt.Fprintln(os.Stderr, " https://github.com/multica-ai/multica/releases/latest") return nil } - -// isBrewInstall checks whether the running multica binary was installed via Homebrew. -func isBrewInstall() bool { - exePath, err := os.Executable() - if err != nil { - return false - } - // Resolve symlinks (brew links binaries from Cellar into prefix/bin). - resolved, err := filepath.EvalSymlinks(exePath) - if err != nil { - resolved = exePath - } - - // Check if the resolved path is inside a Homebrew prefix. - // Common prefixes: /opt/homebrew (Apple Silicon), /usr/local (Intel Mac), or custom. - brewPrefix := getBrewPrefix() - if brewPrefix != "" && strings.HasPrefix(resolved, brewPrefix) { - return true - } - - // Fallback: check well-known Homebrew paths. - for _, prefix := range []string{"/opt/homebrew", "/usr/local", "/home/linuxbrew/.linuxbrew"} { - if strings.HasPrefix(resolved, prefix+"/Cellar/") { - return true - } - } - return false -} - -// getBrewPrefix returns the Homebrew prefix by running `brew --prefix`, or empty string. -func getBrewPrefix() string { - out, err := exec.Command("brew", "--prefix").Output() - if err != nil { - return "" - } - return strings.TrimSpace(string(out)) -} - -func updateViaBrew() error { - fmt.Fprintln(os.Stderr, "Updating via Homebrew...") - - cmd := exec.Command("brew", "upgrade", "multica-ai/tap/multica") - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - if err := cmd.Run(); err != nil { - return fmt.Errorf("brew upgrade failed: %w\nYou can try manually: brew upgrade multica-ai/tap/multica", err) - } - - fmt.Fprintln(os.Stderr, "Update complete.") - return nil -} - -func fetchLatestRelease() (*githubRelease, error) { - client := &http.Client{Timeout: 10 * time.Second} - req, err := http.NewRequest(http.MethodGet, "https://api.github.com/repos/multica-ai/multica/releases/latest", nil) - if err != nil { - return nil, err - } - req.Header.Set("Accept", "application/vnd.github+json") - - resp, err := client.Do(req) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("GitHub API returned %d", resp.StatusCode) - } - - var release githubRelease - if err := json.NewDecoder(resp.Body).Decode(&release); err != nil { - return nil, err - } - return &release, nil -} diff --git a/server/cmd/server/router.go b/server/cmd/server/router.go index a792b381..509826e1 100644 --- a/server/cmd/server/router.go +++ b/server/cmd/server/router.go @@ -95,6 +95,7 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route r.Get("/runtimes/{runtimeId}/tasks/pending", h.ListPendingTasksByRuntime) r.Post("/runtimes/{runtimeId}/usage", h.ReportRuntimeUsage) r.Post("/runtimes/{runtimeId}/ping/{pingId}/result", h.ReportPingResult) + r.Post("/runtimes/{runtimeId}/update/{updateId}/result", h.ReportUpdateResult) r.Get("/tasks/{taskId}/status", h.GetTaskStatus) r.Post("/tasks/{taskId}/start", h.StartTask) @@ -224,6 +225,8 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route r.Get("/{runtimeId}/activity", h.GetRuntimeTaskActivity) r.Post("/{runtimeId}/ping", h.InitiatePing) r.Get("/{runtimeId}/ping/{pingId}", h.GetPing) + r.Post("/{runtimeId}/update", h.InitiateUpdate) + r.Get("/{runtimeId}/update/{updateId}", h.GetUpdate) }) // Inbox diff --git a/server/internal/cli/update.go b/server/internal/cli/update.go new file mode 100644 index 00000000..66660b03 --- /dev/null +++ b/server/internal/cli/update.go @@ -0,0 +1,95 @@ +package cli + +import ( + "encoding/json" + "fmt" + "net/http" + "os" + "os/exec" + "path/filepath" + "strings" + "time" +) + +// GitHubRelease is the subset of the GitHub releases API response we need. +type GitHubRelease struct { + TagName string `json:"tag_name"` + HTMLURL string `json:"html_url"` +} + +// FetchLatestRelease fetches the latest release tag from the multica GitHub repo. +func FetchLatestRelease() (*GitHubRelease, error) { + client := &http.Client{Timeout: 10 * time.Second} + req, err := http.NewRequest(http.MethodGet, "https://api.github.com/repos/multica-ai/multica/releases/latest", nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", "application/vnd.github+json") + + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("GitHub API returned %d", resp.StatusCode) + } + + var release GitHubRelease + if err := json.NewDecoder(resp.Body).Decode(&release); err != nil { + return nil, err + } + return &release, nil +} + +// IsBrewInstall checks whether the running multica binary was installed via Homebrew. +func IsBrewInstall() bool { + exePath, err := os.Executable() + if err != nil { + return false + } + resolved, err := filepath.EvalSymlinks(exePath) + if err != nil { + resolved = exePath + } + + brewPrefix := GetBrewPrefix() + if brewPrefix != "" && strings.HasPrefix(resolved, brewPrefix) { + return true + } + + for _, prefix := range []string{"/opt/homebrew", "/usr/local", "/home/linuxbrew/.linuxbrew"} { + if strings.HasPrefix(resolved, prefix+"/Cellar/") { + return true + } + } + return false +} + +// GetBrewPrefix returns the Homebrew prefix by running `brew --prefix`, or empty string. +func GetBrewPrefix() string { + out, err := exec.Command("brew", "--prefix").Output() + if err != nil { + return "" + } + return strings.TrimSpace(string(out)) +} + +// UpdateViaBrew runs `brew upgrade multica-ai/tap/multica`. +// Returns the combined output and any error. +func UpdateViaBrew() (string, error) { + cmd := exec.Command("brew", "upgrade", "multica-ai/tap/multica") + out, err := cmd.CombinedOutput() + if err != nil { + return string(out), fmt.Errorf("brew upgrade failed: %w", err) + } + return string(out), nil +} + +// DetectNewBinaryPath returns the path to the multica binary after an update. +// It uses exec.LookPath to find the binary in PATH, which will resolve to the +// updated version after a brew upgrade. +func DetectNewBinaryPath() (string, error) { + return exec.LookPath("multica") +} diff --git a/server/internal/daemon/client.go b/server/internal/daemon/client.go index c3bdae90..5a50ae92 100644 --- a/server/internal/daemon/client.go +++ b/server/internal/daemon/client.go @@ -139,8 +139,9 @@ func (c *Client) ReportUsage(ctx context.Context, runtimeID string, entries []ma // HeartbeatResponse contains the server's response to a heartbeat, including any pending actions. type HeartbeatResponse struct { - Status string `json:"status"` - PendingPing *PendingPing `json:"pending_ping,omitempty"` + Status string `json:"status"` + PendingPing *PendingPing `json:"pending_ping,omitempty"` + PendingUpdate *PendingUpdate `json:"pending_update,omitempty"` } // PendingPing represents a ping test request from the server. @@ -148,6 +149,12 @@ type PendingPing struct { ID string `json:"id"` } +// PendingUpdate represents a CLI update request from the server. +type PendingUpdate struct { + ID string `json:"id"` + TargetVersion string `json:"target_version"` +} + func (c *Client) SendHeartbeat(ctx context.Context, runtimeID string) (*HeartbeatResponse, error) { var resp HeartbeatResponse if err := c.postJSON(ctx, "/api/daemon/heartbeat", map[string]string{ @@ -162,6 +169,11 @@ func (c *Client) ReportPingResult(ctx context.Context, runtimeID, pingID string, return c.postJSON(ctx, fmt.Sprintf("/api/daemon/runtimes/%s/ping/%s/result", runtimeID, pingID), result, nil) } +// ReportUpdateResult sends the CLI update result back to the server. +func (c *Client) ReportUpdateResult(ctx context.Context, runtimeID, updateID string, result map[string]any) error { + return c.postJSON(ctx, fmt.Sprintf("/api/daemon/runtimes/%s/update/%s/result", runtimeID, updateID), result, nil) +} + // WorkspaceInfo holds minimal workspace metadata returned by the API. type WorkspaceInfo struct { ID string `json:"id"` diff --git a/server/internal/daemon/daemon.go b/server/internal/daemon/daemon.go index 6ab5ac93..cabbf60c 100644 --- a/server/internal/daemon/daemon.go +++ b/server/internal/daemon/daemon.go @@ -35,6 +35,10 @@ type Daemon struct { workspaces map[string]*workspaceState runtimeIndex map[string]Runtime // runtimeID -> Runtime for provider lookups reloading sync.Mutex // prevents concurrent reloadWorkspaces + + cancelFunc context.CancelFunc // set by Run(); called by triggerRestart + restartBinary string // non-empty after a successful update; path to the new binary + updating atomic.Bool // prevents concurrent update attempts } // New creates a new Daemon instance. @@ -52,6 +56,10 @@ func New(cfg Config, logger *slog.Logger) *Daemon { // Run starts the daemon: resolves auth, registers runtimes, then polls for tasks. func (d *Daemon) Run(ctx context.Context) error { + // Wrap context so handleUpdate can cancel the daemon for restart. + ctx, cancel := context.WithCancel(ctx) + d.cancelFunc = cancel + // Bind health port early to detect another running daemon. healthLn, err := d.listenHealth() if err != nil { @@ -98,6 +106,12 @@ func (d *Daemon) Run(ctx context.Context) error { return d.pollLoop(ctx) } +// RestartBinary returns the path to the new binary if the daemon needs to restart +// after a successful update, or empty string if no restart is needed. +func (d *Daemon) RestartBinary() string { + return d.restartBinary +} + // deregisterRuntimes notifies the server that all runtimes are going offline. func (d *Daemon) deregisterRuntimes() { runtimeIDs := d.allRuntimeIDs() @@ -440,6 +454,11 @@ func (d *Daemon) heartbeatLoop(ctx context.Context) { go d.handlePing(ctx, *rt, resp.PendingPing.ID) } } + + // Handle pending update requests. + if resp.PendingUpdate != nil { + go d.handleUpdate(ctx, rid, resp.PendingUpdate) + } } } } @@ -519,6 +538,73 @@ func (d *Daemon) handlePing(ctx context.Context, rt Runtime, pingID string) { } } +// handleUpdate performs the CLI update when triggered by the server via heartbeat. +func (d *Daemon) handleUpdate(ctx context.Context, runtimeID string, update *PendingUpdate) { + // Prevent concurrent update attempts. + if !d.updating.CompareAndSwap(false, true) { + d.logger.Warn("update already in progress, ignoring", "runtime_id", runtimeID, "update_id", update.ID) + return + } + defer d.updating.Store(false) + + d.logger.Info("CLI update requested", "runtime_id", runtimeID, "update_id", update.ID, "target_version", update.TargetVersion) + + // Report running status. + d.client.ReportUpdateResult(ctx, runtimeID, update.ID, map[string]any{ + "status": "running", + }) + + // Check if installed via Homebrew. + if !cli.IsBrewInstall() { + d.logger.Warn("CLI not installed via Homebrew, cannot auto-update") + d.client.ReportUpdateResult(ctx, runtimeID, update.ID, map[string]any{ + "status": "failed", + "error": "CLI was not installed via Homebrew. Please update manually: https://github.com/multica-ai/multica/releases/latest", + }) + return + } + + // Execute brew upgrade. + d.logger.Info("updating CLI via Homebrew...") + output, err := cli.UpdateViaBrew() + if err != nil { + d.logger.Error("CLI update failed", "error", err, "output", output) + d.client.ReportUpdateResult(ctx, runtimeID, update.ID, map[string]any{ + "status": "failed", + "error": fmt.Sprintf("brew upgrade failed: %v", err), + }) + return + } + + d.logger.Info("CLI update completed successfully", "output", output) + d.client.ReportUpdateResult(ctx, runtimeID, update.ID, map[string]any{ + "status": "completed", + "output": fmt.Sprintf("Updated to %s", update.TargetVersion), + }) + + // Trigger daemon restart with the new binary. + d.triggerRestart() +} + +// triggerRestart initiates a graceful daemon restart after a successful CLI update. +// It finds the new binary path and cancels the daemon context so Run() returns. +// The caller (cmd_daemon.go) checks RestartBinary() and launches the new process. +func (d *Daemon) triggerRestart() { + newBin, err := cli.DetectNewBinaryPath() + if err != nil { + d.logger.Error("could not find updated binary for restart", "error", err) + return + } + + d.logger.Info("scheduling daemon restart", "new_binary", newBin) + d.restartBinary = newBin + + // Cancel the main context to trigger graceful shutdown. + if d.cancelFunc != nil { + d.cancelFunc() + } +} + func (d *Daemon) usageScanLoop(ctx context.Context) { scanner := usage.NewScanner(d.logger) diff --git a/server/internal/handler/daemon.go b/server/internal/handler/daemon.go index a7821e91..937ddf9a 100644 --- a/server/internal/handler/daemon.go +++ b/server/internal/handler/daemon.go @@ -204,6 +204,14 @@ func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) { resp["pending_ping"] = map[string]string{"id": pending.ID} } + // Check for pending update requests for this runtime. + if pending := h.UpdateStore.PopPending(req.RuntimeID); pending != nil { + resp["pending_update"] = map[string]string{ + "id": pending.ID, + "target_version": pending.TargetVersion, + } + } + writeJSON(w, http.StatusOK, resp) } diff --git a/server/internal/handler/handler.go b/server/internal/handler/handler.go index cdf81027..48f0dabc 100644 --- a/server/internal/handler/handler.go +++ b/server/internal/handler/handler.go @@ -40,6 +40,7 @@ type Handler struct { TaskService *service.TaskService EmailService *service.EmailService PingStore *PingStore + UpdateStore *UpdateStore Storage *storage.S3Storage CFSigner *auth.CloudFrontSigner } @@ -59,6 +60,7 @@ func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *event TaskService: service.NewTaskService(queries, hub, bus), EmailService: emailService, PingStore: NewPingStore(), + UpdateStore: NewUpdateStore(), Storage: s3, CFSigner: cfSigner, } diff --git a/server/internal/handler/runtime_update.go b/server/internal/handler/runtime_update.go new file mode 100644 index 00000000..f7430544 --- /dev/null +++ b/server/internal/handler/runtime_update.go @@ -0,0 +1,221 @@ +package handler + +import ( + "encoding/json" + "net/http" + "sync" + "time" + + "github.com/go-chi/chi/v5" +) + +// --------------------------------------------------------------------------- +// In-memory update store +// --------------------------------------------------------------------------- + +type UpdateStatus string + +const ( + UpdatePending UpdateStatus = "pending" + UpdateRunning UpdateStatus = "running" + UpdateCompleted UpdateStatus = "completed" + UpdateFailed UpdateStatus = "failed" + UpdateTimeout UpdateStatus = "timeout" +) + +// UpdateRequest represents a pending or completed CLI update request. +type UpdateRequest struct { + ID string `json:"id"` + RuntimeID string `json:"runtime_id"` + Status UpdateStatus `json:"status"` + TargetVersion string `json:"target_version"` + Output string `json:"output,omitempty"` + Error string `json:"error,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +// UpdateStore is a thread-safe in-memory store for CLI update requests. +type UpdateStore struct { + mu sync.Mutex + requests map[string]*UpdateRequest // keyed by update ID +} + +func NewUpdateStore() *UpdateStore { + return &UpdateStore{ + requests: make(map[string]*UpdateRequest), + } +} + +func (s *UpdateStore) Create(runtimeID, targetVersion string) (*UpdateRequest, error) { + s.mu.Lock() + defer s.mu.Unlock() + + // Clean up old requests (>5 minutes). + for id, req := range s.requests { + if time.Since(req.CreatedAt) > 5*time.Minute { + delete(s.requests, id) + } + } + + // Reject if there is already a pending or running update for this runtime. + for _, req := range s.requests { + if req.RuntimeID == runtimeID && (req.Status == UpdatePending || req.Status == UpdateRunning) { + return nil, errUpdateInProgress + } + } + + req := &UpdateRequest{ + ID: randomID(), + RuntimeID: runtimeID, + Status: UpdatePending, + TargetVersion: targetVersion, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + s.requests[req.ID] = req + return req, nil +} + +var errUpdateInProgress = &updateError{msg: "an update is already in progress for this runtime"} + +type updateError struct{ msg string } + +func (e *updateError) Error() string { return e.msg } + +func (s *UpdateStore) Get(id string) *UpdateRequest { + s.mu.Lock() + defer s.mu.Unlock() + + req, ok := s.requests[id] + if !ok { + return nil + } + // Check for timeout (both pending and running states). + if (req.Status == UpdatePending || req.Status == UpdateRunning) && time.Since(req.CreatedAt) > 120*time.Second { + req.Status = UpdateTimeout + req.Error = "update did not complete within 120 seconds" + req.UpdatedAt = time.Now() + } + return req +} + +// PopPending returns and marks as running the pending update for a runtime. +func (s *UpdateStore) PopPending(runtimeID string) *UpdateRequest { + s.mu.Lock() + defer s.mu.Unlock() + + for _, req := range s.requests { + if req.RuntimeID == runtimeID && req.Status == UpdatePending { + req.Status = UpdateRunning + req.UpdatedAt = time.Now() + return req + } + } + return nil +} + +func (s *UpdateStore) Complete(id string, output string) { + s.mu.Lock() + defer s.mu.Unlock() + + if req, ok := s.requests[id]; ok { + req.Status = UpdateCompleted + req.Output = output + req.UpdatedAt = time.Now() + } +} + +func (s *UpdateStore) Fail(id string, errMsg string) { + s.mu.Lock() + defer s.mu.Unlock() + + if req, ok := s.requests[id]; ok { + req.Status = UpdateFailed + req.Error = errMsg + req.UpdatedAt = time.Now() + } +} + +// --------------------------------------------------------------------------- +// Handlers +// --------------------------------------------------------------------------- + +// InitiateUpdate creates a new CLI update request (protected route, called by frontend). +func (h *Handler) InitiateUpdate(w http.ResponseWriter, r *http.Request) { + runtimeID := chi.URLParam(r, "runtimeId") + + rt, err := h.Queries.GetAgentRuntime(r.Context(), parseUUID(runtimeID)) + if err != nil { + writeError(w, http.StatusNotFound, "runtime not found") + return + } + + if _, ok := h.requireWorkspaceMember(w, r, uuidToString(rt.WorkspaceID), "runtime not found"); !ok { + return + } + + var req struct { + TargetVersion string `json:"target_version"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid request body") + return + } + if req.TargetVersion == "" { + writeError(w, http.StatusBadRequest, "target_version is required") + return + } + + update, err := h.UpdateStore.Create(runtimeID, req.TargetVersion) + if err != nil { + writeError(w, http.StatusConflict, err.Error()) + return + } + + writeJSON(w, http.StatusOK, update) +} + +// GetUpdate returns the status of an update request (protected route, called by frontend). +func (h *Handler) GetUpdate(w http.ResponseWriter, r *http.Request) { + updateID := chi.URLParam(r, "updateId") + + update := h.UpdateStore.Get(updateID) + if update == nil { + writeError(w, http.StatusNotFound, "update not found") + return + } + + writeJSON(w, http.StatusOK, update) +} + +// ReportUpdateResult receives the update result from the daemon. +func (h *Handler) ReportUpdateResult(w http.ResponseWriter, r *http.Request) { + updateID := chi.URLParam(r, "updateId") + + var req struct { + Status string `json:"status"` // "running", "completed", or "failed" + Output string `json:"output"` + Error string `json:"error"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid request body") + return + } + + switch req.Status { + case "completed": + h.UpdateStore.Complete(updateID, req.Output) + case "failed": + h.UpdateStore.Fail(updateID, req.Error) + case "running": + // No-op: status is already "running" from PopPending. This call is + // just a progress signal from the daemon to confirm it received the + // update command and is executing it. + default: + writeError(w, http.StatusBadRequest, "invalid status: "+req.Status) + return + } + + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) +}