feat(runtime): support CLI update from web runtime page (#331)

* feat(runtime): support CLI update from web runtime page

Add the ability to update the CLI daemon from the web Runtime detail page.
When a newer version is available on GitHub Releases, an update button
appears. Clicking it sends an update command through the server to the
daemon via the heartbeat mechanism (same pattern as ping). The daemon
executes `brew upgrade`, reports the result, and restarts itself with the
new binary.

Changes across all three layers:
- Frontend: version display, GitHub latest check, UpdateSection component
- Server: UpdateStore (in-memory), heartbeat extension, 3 new endpoints
- CLI: shared update logic, daemon handleUpdate + graceful restart

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(runtime): handle 'running' status in ReportUpdateResult

The daemon sends {"status":"running"} when it starts executing the
update, but ReportUpdateResult treated any non-"completed" status as
failure — immediately marking the update as failed before brew upgrade
even ran.

Fix: use a switch statement to handle "running" as a no-op (status is
already "running" from PopPending), and also timeout running updates
after 120 seconds in case brew upgrade hangs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
LinYushen 2026-04-02 14:12:49 +08:00 committed by GitHub
parent a80d61f8e1
commit fdba410f11
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 764 additions and 92 deletions

View file

@ -2,9 +2,20 @@ import type { AgentRuntime } from "@/shared/types";
import { formatLastSeen } from "../utils"; import { formatLastSeen } from "../utils";
import { RuntimeModeIcon, StatusBadge, InfoField } from "./shared"; import { RuntimeModeIcon, StatusBadge, InfoField } from "./shared";
import { PingSection } from "./ping-section"; import { PingSection } from "./ping-section";
import { UpdateSection } from "./update-section";
import { UsageSection } from "./usage-section"; import { UsageSection } from "./usage-section";
function getCliVersion(metadata: Record<string, unknown>): string | null {
if (metadata && typeof metadata.version === "string" && metadata.version) {
return metadata.version;
}
return null;
}
export function RuntimeDetail({ runtime }: { runtime: AgentRuntime }) { export function RuntimeDetail({ runtime }: { runtime: AgentRuntime }) {
const cliVersion =
runtime.runtime_mode === "local" ? getCliVersion(runtime.metadata) : null;
return ( return (
<div className="flex h-full flex-col"> <div className="flex h-full flex-col">
{/* Header */} {/* Header */}
@ -43,6 +54,20 @@ export function RuntimeDetail({ runtime }: { runtime: AgentRuntime }) {
)} )}
</div> </div>
{/* CLI Version & Update */}
{runtime.runtime_mode === "local" && (
<div>
<h3 className="text-xs font-medium text-muted-foreground mb-3">
CLI Version
</h3>
<UpdateSection
runtimeId={runtime.id}
currentVersion={cliVersion}
isOnline={runtime.status === "online"}
/>
</div>
)}
{/* Connection Test */} {/* Connection Test */}
<div> <div>
<h3 className="text-xs font-medium text-muted-foreground mb-3"> <h3 className="text-xs font-medium text-muted-foreground mb-3">

View file

@ -0,0 +1,227 @@
import { useState, useEffect, useCallback, useRef } from "react";
import {
Loader2,
CheckCircle2,
XCircle,
ArrowUpCircle,
Check,
} from "lucide-react";
import { Button } from "@/components/ui/button";
import { api } from "@/shared/api";
import type { RuntimeUpdateStatus } from "@/shared/types";
const GITHUB_RELEASES_URL =
"https://api.github.com/repos/multica-ai/multica/releases/latest";
const CACHE_TTL_MS = 10 * 60 * 1000; // 10 minutes
let cachedLatestVersion: string | null = null;
let cachedAt = 0;
async function fetchLatestVersion(): Promise<string | null> {
if (cachedLatestVersion && Date.now() - cachedAt < CACHE_TTL_MS) {
return cachedLatestVersion;
}
try {
const resp = await fetch(GITHUB_RELEASES_URL, {
headers: { Accept: "application/vnd.github+json" },
});
if (!resp.ok) return null;
const data = await resp.json();
cachedLatestVersion = data.tag_name ?? null;
cachedAt = Date.now();
return cachedLatestVersion;
} catch {
return null;
}
}
function stripV(v: string): string {
return v.replace(/^v/, "");
}
function isNewer(latest: string, current: string): boolean {
const l = stripV(latest).split(".").map(Number);
const c = stripV(current).split(".").map(Number);
for (let i = 0; i < Math.max(l.length, c.length); i++) {
const lv = l[i] ?? 0;
const cv = c[i] ?? 0;
if (lv > cv) return true;
if (lv < cv) return false;
}
return false;
}
const statusConfig: Record<
RuntimeUpdateStatus,
{ label: string; icon: typeof Loader2; color: string }
> = {
pending: {
label: "Waiting for daemon...",
icon: Loader2,
color: "text-muted-foreground",
},
running: {
label: "Updating...",
icon: Loader2,
color: "text-info",
},
completed: {
label: "Update complete. Daemon is restarting...",
icon: CheckCircle2,
color: "text-success",
},
failed: { label: "Update failed", icon: XCircle, color: "text-destructive" },
timeout: { label: "Timeout", icon: XCircle, color: "text-warning" },
};
interface UpdateSectionProps {
runtimeId: string;
currentVersion: string | null;
isOnline: boolean;
}
export function UpdateSection({
runtimeId,
currentVersion,
isOnline,
}: UpdateSectionProps) {
const [latestVersion, setLatestVersion] = useState<string | null>(null);
const [status, setStatus] = useState<RuntimeUpdateStatus | null>(null);
const [error, setError] = useState("");
const [output, setOutput] = useState("");
const [updating, setUpdating] = useState(false);
const pollRef = useRef<ReturnType<typeof setInterval> | null>(null);
const cleanup = useCallback(() => {
if (pollRef.current) {
clearInterval(pollRef.current);
pollRef.current = null;
}
}, []);
useEffect(() => cleanup, [cleanup]);
// Fetch latest version on mount.
useEffect(() => {
fetchLatestVersion().then(setLatestVersion);
}, []);
const handleUpdate = async () => {
if (!latestVersion) return;
cleanup();
setUpdating(true);
setStatus("pending");
setError("");
setOutput("");
try {
const update = await api.initiateUpdate(runtimeId, latestVersion);
pollRef.current = setInterval(async () => {
try {
const result = await api.getUpdateResult(runtimeId, update.id);
setStatus(result.status as RuntimeUpdateStatus);
if (result.status === "completed") {
setOutput(result.output ?? "");
setUpdating(false);
cleanup();
} else if (
result.status === "failed" ||
result.status === "timeout"
) {
setError(result.error ?? "Unknown error");
setUpdating(false);
cleanup();
}
} catch {
// ignore poll errors
}
}, 2000);
} catch {
setStatus("failed");
setError("Failed to initiate update");
setUpdating(false);
}
};
const hasUpdate =
currentVersion &&
latestVersion &&
isNewer(latestVersion, currentVersion);
const config = status ? statusConfig[status] : null;
const Icon = config?.icon;
const isActive = status === "pending" || status === "running";
return (
<div className="space-y-2">
<div className="flex items-center gap-2 flex-wrap">
<span className="text-xs text-muted-foreground">CLI Version:</span>
<span className="text-xs font-mono">
{currentVersion ?? "unknown"}
</span>
{!hasUpdate && currentVersion && latestVersion && !status && (
<span className="inline-flex items-center gap-1 text-xs text-success">
<Check className="h-3 w-3" />
Latest
</span>
)}
{hasUpdate && !status && (
<>
<span className="text-xs text-muted-foreground"></span>
<span className="text-xs font-mono text-info">
{latestVersion}
</span>
<span className="text-xs text-muted-foreground">available</span>
</>
)}
{hasUpdate && isOnline && !status && (
<Button
variant="outline"
size="xs"
onClick={handleUpdate}
disabled={updating}
>
<ArrowUpCircle className="h-3 w-3" />
Update
</Button>
)}
{config && Icon && (
<span
className={`inline-flex items-center gap-1 text-xs ${config.color}`}
>
<Icon className={`h-3 w-3 ${isActive ? "animate-spin" : ""}`} />
{config.label}
</span>
)}
</div>
{status === "completed" && output && (
<div className="rounded-lg border bg-success/5 px-3 py-2">
<p className="text-xs text-success">{output}</p>
</div>
)}
{(status === "failed" || status === "timeout") && error && (
<div className="rounded-lg border border-destructive/20 bg-destructive/5 px-3 py-2">
<p className="text-xs text-destructive">{error}</p>
{status === "failed" && (
<Button
variant="ghost"
size="xs"
className="mt-1"
onClick={handleUpdate}
>
Retry
</Button>
)}
</div>
)}
</div>
);
}

View file

@ -31,6 +31,7 @@ import type {
RuntimeUsage, RuntimeUsage,
RuntimeHourlyActivity, RuntimeHourlyActivity,
RuntimePing, RuntimePing,
RuntimeUpdate,
TimelineEntry, TimelineEntry,
TaskMessagePayload, TaskMessagePayload,
Attachment, Attachment,
@ -344,6 +345,23 @@ export class ApiClient {
return this.fetch(`/api/runtimes/${runtimeId}/ping/${pingId}`); return this.fetch(`/api/runtimes/${runtimeId}/ping/${pingId}`);
} }
async initiateUpdate(
runtimeId: string,
targetVersion: string,
): Promise<RuntimeUpdate> {
return this.fetch(`/api/runtimes/${runtimeId}/update`, {
method: "POST",
body: JSON.stringify({ target_version: targetVersion }),
});
}
async getUpdateResult(
runtimeId: string,
updateId: string,
): Promise<RuntimeUpdate> {
return this.fetch(`/api/runtimes/${runtimeId}/update/${updateId}`);
}
async listAgentTasks(agentId: string): Promise<AgentTask[]> { async listAgentTasks(agentId: string): Promise<AgentTask[]> {
return this.fetch(`/api/agents/${agentId}/tasks`); return this.fetch(`/api/agents/${agentId}/tasks`);
} }

View file

@ -174,3 +174,21 @@ export interface RuntimeHourlyActivity {
hour: number; hour: number;
count: number; count: number;
} }
export type RuntimeUpdateStatus =
| "pending"
| "running"
| "completed"
| "failed"
| "timeout";
export interface RuntimeUpdate {
id: string;
runtime_id: string;
status: RuntimeUpdateStatus;
target_version: string;
output?: string;
error?: string;
created_at: string;
updated_at: string;
}

View file

@ -21,6 +21,8 @@ export type {
RuntimeHourlyActivity, RuntimeHourlyActivity,
RuntimePing, RuntimePing,
RuntimePingStatus, RuntimePingStatus,
RuntimeUpdate,
RuntimeUpdateStatus,
} from "./agent"; } from "./agent";
export type { Workspace, WorkspaceRepo, Member, MemberRole, User, MemberWithUser } from "./workspace"; export type { Workspace, WorkspaceRepo, Member, MemberRole, User, MemberWithUser } from "./workspace";
export type { InboxItem, InboxSeverity, InboxItemType } from "./inbox"; export type { InboxItem, InboxSeverity, InboxItemType } from "./inbox";

View file

@ -280,6 +280,39 @@ func runDaemonForeground(cmd *cobra.Command) error {
if err := d.Run(ctx); err != nil && !errors.Is(err, context.Canceled) { if err := d.Run(ctx); err != nil && !errors.Is(err, context.Canceled) {
return err return err
} }
// Check if the daemon needs to restart after a CLI update.
if restartBin := d.RestartBinary(); restartBin != "" {
logger.Info("restarting daemon with updated binary", "path", restartBin)
args := buildDaemonStartArgs(cmd)
child := exec.Command(restartBin, args...)
logPath := daemonLogPathForProfile(profile)
logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
if err != nil {
logger.Error("failed to open log file for restart", "error", err)
return nil
}
child.Stdout = logFile
child.Stderr = logFile
child.SysProcAttr = &syscall.SysProcAttr{Setsid: true}
if err := child.Start(); err != nil {
logFile.Close()
logger.Error("failed to start new daemon", "error", err)
return nil
}
logFile.Close()
child.Process.Release()
// Write new PID file.
pidPath := daemonPIDPathForProfile(profile)
os.WriteFile(pidPath, []byte(strconv.Itoa(child.Process.Pid)), 0o644)
logger.Info("new daemon started", "pid", child.Process.Pid)
}
return nil return nil
} }

View file

@ -1,16 +1,13 @@
package main package main
import ( import (
"encoding/json"
"fmt" "fmt"
"net/http"
"os" "os"
"os/exec"
"path/filepath"
"strings" "strings"
"time"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/multica-ai/multica/server/internal/cli"
) )
var updateCmd = &cobra.Command{ var updateCmd = &cobra.Command{
@ -19,17 +16,11 @@ var updateCmd = &cobra.Command{
RunE: runUpdate, RunE: runUpdate,
} }
// githubRelease is the subset of the GitHub releases API response we need.
type githubRelease struct {
TagName string `json:"tag_name"`
HTMLURL string `json:"html_url"`
}
func runUpdate(_ *cobra.Command, _ []string) error { func runUpdate(_ *cobra.Command, _ []string) error {
fmt.Fprintf(os.Stderr, "Current version: %s (commit: %s)\n", version, commit) fmt.Fprintf(os.Stderr, "Current version: %s (commit: %s)\n", version, commit)
// Check latest version from GitHub. // Check latest version from GitHub.
latest, err := fetchLatestRelease() latest, err := cli.FetchLatestRelease()
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "Warning: could not check latest version: %v\n", err) fmt.Fprintf(os.Stderr, "Warning: could not check latest version: %v\n", err)
} else { } else {
@ -43,8 +34,15 @@ func runUpdate(_ *cobra.Command, _ []string) error {
} }
// Detect installation method and update accordingly. // Detect installation method and update accordingly.
if isBrewInstall() { if cli.IsBrewInstall() {
return updateViaBrew() fmt.Fprintln(os.Stderr, "Updating via Homebrew...")
output, err := cli.UpdateViaBrew()
if err != nil {
fmt.Fprintf(os.Stderr, "%s\n", output)
return fmt.Errorf("brew upgrade failed: %w\nYou can try manually: brew upgrade multica-ai/tap/multica", err)
}
fmt.Fprintln(os.Stderr, "Update complete.")
return nil
} }
// Not installed via brew — show manual instructions. // Not installed via brew — show manual instructions.
@ -57,79 +55,3 @@ func runUpdate(_ *cobra.Command, _ []string) error {
fmt.Fprintln(os.Stderr, " https://github.com/multica-ai/multica/releases/latest") fmt.Fprintln(os.Stderr, " https://github.com/multica-ai/multica/releases/latest")
return nil return nil
} }
// isBrewInstall checks whether the running multica binary was installed via Homebrew.
func isBrewInstall() bool {
exePath, err := os.Executable()
if err != nil {
return false
}
// Resolve symlinks (brew links binaries from Cellar into prefix/bin).
resolved, err := filepath.EvalSymlinks(exePath)
if err != nil {
resolved = exePath
}
// Check if the resolved path is inside a Homebrew prefix.
// Common prefixes: /opt/homebrew (Apple Silicon), /usr/local (Intel Mac), or custom.
brewPrefix := getBrewPrefix()
if brewPrefix != "" && strings.HasPrefix(resolved, brewPrefix) {
return true
}
// Fallback: check well-known Homebrew paths.
for _, prefix := range []string{"/opt/homebrew", "/usr/local", "/home/linuxbrew/.linuxbrew"} {
if strings.HasPrefix(resolved, prefix+"/Cellar/") {
return true
}
}
return false
}
// getBrewPrefix returns the Homebrew prefix by running `brew --prefix`, or empty string.
func getBrewPrefix() string {
out, err := exec.Command("brew", "--prefix").Output()
if err != nil {
return ""
}
return strings.TrimSpace(string(out))
}
func updateViaBrew() error {
fmt.Fprintln(os.Stderr, "Updating via Homebrew...")
cmd := exec.Command("brew", "upgrade", "multica-ai/tap/multica")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
return fmt.Errorf("brew upgrade failed: %w\nYou can try manually: brew upgrade multica-ai/tap/multica", err)
}
fmt.Fprintln(os.Stderr, "Update complete.")
return nil
}
func fetchLatestRelease() (*githubRelease, error) {
client := &http.Client{Timeout: 10 * time.Second}
req, err := http.NewRequest(http.MethodGet, "https://api.github.com/repos/multica-ai/multica/releases/latest", nil)
if err != nil {
return nil, err
}
req.Header.Set("Accept", "application/vnd.github+json")
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("GitHub API returned %d", resp.StatusCode)
}
var release githubRelease
if err := json.NewDecoder(resp.Body).Decode(&release); err != nil {
return nil, err
}
return &release, nil
}

View file

@ -95,6 +95,7 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route
r.Get("/runtimes/{runtimeId}/tasks/pending", h.ListPendingTasksByRuntime) r.Get("/runtimes/{runtimeId}/tasks/pending", h.ListPendingTasksByRuntime)
r.Post("/runtimes/{runtimeId}/usage", h.ReportRuntimeUsage) r.Post("/runtimes/{runtimeId}/usage", h.ReportRuntimeUsage)
r.Post("/runtimes/{runtimeId}/ping/{pingId}/result", h.ReportPingResult) r.Post("/runtimes/{runtimeId}/ping/{pingId}/result", h.ReportPingResult)
r.Post("/runtimes/{runtimeId}/update/{updateId}/result", h.ReportUpdateResult)
r.Get("/tasks/{taskId}/status", h.GetTaskStatus) r.Get("/tasks/{taskId}/status", h.GetTaskStatus)
r.Post("/tasks/{taskId}/start", h.StartTask) r.Post("/tasks/{taskId}/start", h.StartTask)
@ -224,6 +225,8 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route
r.Get("/{runtimeId}/activity", h.GetRuntimeTaskActivity) r.Get("/{runtimeId}/activity", h.GetRuntimeTaskActivity)
r.Post("/{runtimeId}/ping", h.InitiatePing) r.Post("/{runtimeId}/ping", h.InitiatePing)
r.Get("/{runtimeId}/ping/{pingId}", h.GetPing) r.Get("/{runtimeId}/ping/{pingId}", h.GetPing)
r.Post("/{runtimeId}/update", h.InitiateUpdate)
r.Get("/{runtimeId}/update/{updateId}", h.GetUpdate)
}) })
// Inbox // Inbox

View file

@ -0,0 +1,95 @@
package cli
import (
"encoding/json"
"fmt"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
)
// GitHubRelease is the subset of the GitHub releases API response we need.
type GitHubRelease struct {
TagName string `json:"tag_name"`
HTMLURL string `json:"html_url"`
}
// FetchLatestRelease fetches the latest release tag from the multica GitHub repo.
func FetchLatestRelease() (*GitHubRelease, error) {
client := &http.Client{Timeout: 10 * time.Second}
req, err := http.NewRequest(http.MethodGet, "https://api.github.com/repos/multica-ai/multica/releases/latest", nil)
if err != nil {
return nil, err
}
req.Header.Set("Accept", "application/vnd.github+json")
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("GitHub API returned %d", resp.StatusCode)
}
var release GitHubRelease
if err := json.NewDecoder(resp.Body).Decode(&release); err != nil {
return nil, err
}
return &release, nil
}
// IsBrewInstall checks whether the running multica binary was installed via Homebrew.
func IsBrewInstall() bool {
exePath, err := os.Executable()
if err != nil {
return false
}
resolved, err := filepath.EvalSymlinks(exePath)
if err != nil {
resolved = exePath
}
brewPrefix := GetBrewPrefix()
if brewPrefix != "" && strings.HasPrefix(resolved, brewPrefix) {
return true
}
for _, prefix := range []string{"/opt/homebrew", "/usr/local", "/home/linuxbrew/.linuxbrew"} {
if strings.HasPrefix(resolved, prefix+"/Cellar/") {
return true
}
}
return false
}
// GetBrewPrefix returns the Homebrew prefix by running `brew --prefix`, or empty string.
func GetBrewPrefix() string {
out, err := exec.Command("brew", "--prefix").Output()
if err != nil {
return ""
}
return strings.TrimSpace(string(out))
}
// UpdateViaBrew runs `brew upgrade multica-ai/tap/multica`.
// Returns the combined output and any error.
func UpdateViaBrew() (string, error) {
cmd := exec.Command("brew", "upgrade", "multica-ai/tap/multica")
out, err := cmd.CombinedOutput()
if err != nil {
return string(out), fmt.Errorf("brew upgrade failed: %w", err)
}
return string(out), nil
}
// DetectNewBinaryPath returns the path to the multica binary after an update.
// It uses exec.LookPath to find the binary in PATH, which will resolve to the
// updated version after a brew upgrade.
func DetectNewBinaryPath() (string, error) {
return exec.LookPath("multica")
}

View file

@ -139,8 +139,9 @@ func (c *Client) ReportUsage(ctx context.Context, runtimeID string, entries []ma
// HeartbeatResponse contains the server's response to a heartbeat, including any pending actions. // HeartbeatResponse contains the server's response to a heartbeat, including any pending actions.
type HeartbeatResponse struct { type HeartbeatResponse struct {
Status string `json:"status"` Status string `json:"status"`
PendingPing *PendingPing `json:"pending_ping,omitempty"` PendingPing *PendingPing `json:"pending_ping,omitempty"`
PendingUpdate *PendingUpdate `json:"pending_update,omitempty"`
} }
// PendingPing represents a ping test request from the server. // PendingPing represents a ping test request from the server.
@ -148,6 +149,12 @@ type PendingPing struct {
ID string `json:"id"` ID string `json:"id"`
} }
// PendingUpdate represents a CLI update request from the server.
type PendingUpdate struct {
ID string `json:"id"`
TargetVersion string `json:"target_version"`
}
func (c *Client) SendHeartbeat(ctx context.Context, runtimeID string) (*HeartbeatResponse, error) { func (c *Client) SendHeartbeat(ctx context.Context, runtimeID string) (*HeartbeatResponse, error) {
var resp HeartbeatResponse var resp HeartbeatResponse
if err := c.postJSON(ctx, "/api/daemon/heartbeat", map[string]string{ if err := c.postJSON(ctx, "/api/daemon/heartbeat", map[string]string{
@ -162,6 +169,11 @@ func (c *Client) ReportPingResult(ctx context.Context, runtimeID, pingID string,
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/runtimes/%s/ping/%s/result", runtimeID, pingID), result, nil) return c.postJSON(ctx, fmt.Sprintf("/api/daemon/runtimes/%s/ping/%s/result", runtimeID, pingID), result, nil)
} }
// ReportUpdateResult sends the CLI update result back to the server.
func (c *Client) ReportUpdateResult(ctx context.Context, runtimeID, updateID string, result map[string]any) error {
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/runtimes/%s/update/%s/result", runtimeID, updateID), result, nil)
}
// WorkspaceInfo holds minimal workspace metadata returned by the API. // WorkspaceInfo holds minimal workspace metadata returned by the API.
type WorkspaceInfo struct { type WorkspaceInfo struct {
ID string `json:"id"` ID string `json:"id"`

View file

@ -35,6 +35,10 @@ type Daemon struct {
workspaces map[string]*workspaceState workspaces map[string]*workspaceState
runtimeIndex map[string]Runtime // runtimeID -> Runtime for provider lookups runtimeIndex map[string]Runtime // runtimeID -> Runtime for provider lookups
reloading sync.Mutex // prevents concurrent reloadWorkspaces reloading sync.Mutex // prevents concurrent reloadWorkspaces
cancelFunc context.CancelFunc // set by Run(); called by triggerRestart
restartBinary string // non-empty after a successful update; path to the new binary
updating atomic.Bool // prevents concurrent update attempts
} }
// New creates a new Daemon instance. // New creates a new Daemon instance.
@ -52,6 +56,10 @@ func New(cfg Config, logger *slog.Logger) *Daemon {
// Run starts the daemon: resolves auth, registers runtimes, then polls for tasks. // Run starts the daemon: resolves auth, registers runtimes, then polls for tasks.
func (d *Daemon) Run(ctx context.Context) error { func (d *Daemon) Run(ctx context.Context) error {
// Wrap context so handleUpdate can cancel the daemon for restart.
ctx, cancel := context.WithCancel(ctx)
d.cancelFunc = cancel
// Bind health port early to detect another running daemon. // Bind health port early to detect another running daemon.
healthLn, err := d.listenHealth() healthLn, err := d.listenHealth()
if err != nil { if err != nil {
@ -98,6 +106,12 @@ func (d *Daemon) Run(ctx context.Context) error {
return d.pollLoop(ctx) return d.pollLoop(ctx)
} }
// RestartBinary returns the path to the new binary if the daemon needs to restart
// after a successful update, or empty string if no restart is needed.
func (d *Daemon) RestartBinary() string {
return d.restartBinary
}
// deregisterRuntimes notifies the server that all runtimes are going offline. // deregisterRuntimes notifies the server that all runtimes are going offline.
func (d *Daemon) deregisterRuntimes() { func (d *Daemon) deregisterRuntimes() {
runtimeIDs := d.allRuntimeIDs() runtimeIDs := d.allRuntimeIDs()
@ -440,6 +454,11 @@ func (d *Daemon) heartbeatLoop(ctx context.Context) {
go d.handlePing(ctx, *rt, resp.PendingPing.ID) go d.handlePing(ctx, *rt, resp.PendingPing.ID)
} }
} }
// Handle pending update requests.
if resp.PendingUpdate != nil {
go d.handleUpdate(ctx, rid, resp.PendingUpdate)
}
} }
} }
} }
@ -519,6 +538,73 @@ func (d *Daemon) handlePing(ctx context.Context, rt Runtime, pingID string) {
} }
} }
// handleUpdate performs the CLI update when triggered by the server via heartbeat.
func (d *Daemon) handleUpdate(ctx context.Context, runtimeID string, update *PendingUpdate) {
// Prevent concurrent update attempts.
if !d.updating.CompareAndSwap(false, true) {
d.logger.Warn("update already in progress, ignoring", "runtime_id", runtimeID, "update_id", update.ID)
return
}
defer d.updating.Store(false)
d.logger.Info("CLI update requested", "runtime_id", runtimeID, "update_id", update.ID, "target_version", update.TargetVersion)
// Report running status.
d.client.ReportUpdateResult(ctx, runtimeID, update.ID, map[string]any{
"status": "running",
})
// Check if installed via Homebrew.
if !cli.IsBrewInstall() {
d.logger.Warn("CLI not installed via Homebrew, cannot auto-update")
d.client.ReportUpdateResult(ctx, runtimeID, update.ID, map[string]any{
"status": "failed",
"error": "CLI was not installed via Homebrew. Please update manually: https://github.com/multica-ai/multica/releases/latest",
})
return
}
// Execute brew upgrade.
d.logger.Info("updating CLI via Homebrew...")
output, err := cli.UpdateViaBrew()
if err != nil {
d.logger.Error("CLI update failed", "error", err, "output", output)
d.client.ReportUpdateResult(ctx, runtimeID, update.ID, map[string]any{
"status": "failed",
"error": fmt.Sprintf("brew upgrade failed: %v", err),
})
return
}
d.logger.Info("CLI update completed successfully", "output", output)
d.client.ReportUpdateResult(ctx, runtimeID, update.ID, map[string]any{
"status": "completed",
"output": fmt.Sprintf("Updated to %s", update.TargetVersion),
})
// Trigger daemon restart with the new binary.
d.triggerRestart()
}
// triggerRestart initiates a graceful daemon restart after a successful CLI update.
// It finds the new binary path and cancels the daemon context so Run() returns.
// The caller (cmd_daemon.go) checks RestartBinary() and launches the new process.
func (d *Daemon) triggerRestart() {
newBin, err := cli.DetectNewBinaryPath()
if err != nil {
d.logger.Error("could not find updated binary for restart", "error", err)
return
}
d.logger.Info("scheduling daemon restart", "new_binary", newBin)
d.restartBinary = newBin
// Cancel the main context to trigger graceful shutdown.
if d.cancelFunc != nil {
d.cancelFunc()
}
}
func (d *Daemon) usageScanLoop(ctx context.Context) { func (d *Daemon) usageScanLoop(ctx context.Context) {
scanner := usage.NewScanner(d.logger) scanner := usage.NewScanner(d.logger)

View file

@ -204,6 +204,14 @@ func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
resp["pending_ping"] = map[string]string{"id": pending.ID} resp["pending_ping"] = map[string]string{"id": pending.ID}
} }
// Check for pending update requests for this runtime.
if pending := h.UpdateStore.PopPending(req.RuntimeID); pending != nil {
resp["pending_update"] = map[string]string{
"id": pending.ID,
"target_version": pending.TargetVersion,
}
}
writeJSON(w, http.StatusOK, resp) writeJSON(w, http.StatusOK, resp)
} }

View file

@ -40,6 +40,7 @@ type Handler struct {
TaskService *service.TaskService TaskService *service.TaskService
EmailService *service.EmailService EmailService *service.EmailService
PingStore *PingStore PingStore *PingStore
UpdateStore *UpdateStore
Storage *storage.S3Storage Storage *storage.S3Storage
CFSigner *auth.CloudFrontSigner CFSigner *auth.CloudFrontSigner
} }
@ -59,6 +60,7 @@ func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *event
TaskService: service.NewTaskService(queries, hub, bus), TaskService: service.NewTaskService(queries, hub, bus),
EmailService: emailService, EmailService: emailService,
PingStore: NewPingStore(), PingStore: NewPingStore(),
UpdateStore: NewUpdateStore(),
Storage: s3, Storage: s3,
CFSigner: cfSigner, CFSigner: cfSigner,
} }

View file

@ -0,0 +1,221 @@
package handler
import (
"encoding/json"
"net/http"
"sync"
"time"
"github.com/go-chi/chi/v5"
)
// ---------------------------------------------------------------------------
// In-memory update store
// ---------------------------------------------------------------------------
type UpdateStatus string
const (
UpdatePending UpdateStatus = "pending"
UpdateRunning UpdateStatus = "running"
UpdateCompleted UpdateStatus = "completed"
UpdateFailed UpdateStatus = "failed"
UpdateTimeout UpdateStatus = "timeout"
)
// UpdateRequest represents a pending or completed CLI update request.
type UpdateRequest struct {
ID string `json:"id"`
RuntimeID string `json:"runtime_id"`
Status UpdateStatus `json:"status"`
TargetVersion string `json:"target_version"`
Output string `json:"output,omitempty"`
Error string `json:"error,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// UpdateStore is a thread-safe in-memory store for CLI update requests.
type UpdateStore struct {
mu sync.Mutex
requests map[string]*UpdateRequest // keyed by update ID
}
func NewUpdateStore() *UpdateStore {
return &UpdateStore{
requests: make(map[string]*UpdateRequest),
}
}
func (s *UpdateStore) Create(runtimeID, targetVersion string) (*UpdateRequest, error) {
s.mu.Lock()
defer s.mu.Unlock()
// Clean up old requests (>5 minutes).
for id, req := range s.requests {
if time.Since(req.CreatedAt) > 5*time.Minute {
delete(s.requests, id)
}
}
// Reject if there is already a pending or running update for this runtime.
for _, req := range s.requests {
if req.RuntimeID == runtimeID && (req.Status == UpdatePending || req.Status == UpdateRunning) {
return nil, errUpdateInProgress
}
}
req := &UpdateRequest{
ID: randomID(),
RuntimeID: runtimeID,
Status: UpdatePending,
TargetVersion: targetVersion,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
s.requests[req.ID] = req
return req, nil
}
var errUpdateInProgress = &updateError{msg: "an update is already in progress for this runtime"}
type updateError struct{ msg string }
func (e *updateError) Error() string { return e.msg }
func (s *UpdateStore) Get(id string) *UpdateRequest {
s.mu.Lock()
defer s.mu.Unlock()
req, ok := s.requests[id]
if !ok {
return nil
}
// Check for timeout (both pending and running states).
if (req.Status == UpdatePending || req.Status == UpdateRunning) && time.Since(req.CreatedAt) > 120*time.Second {
req.Status = UpdateTimeout
req.Error = "update did not complete within 120 seconds"
req.UpdatedAt = time.Now()
}
return req
}
// PopPending returns and marks as running the pending update for a runtime.
func (s *UpdateStore) PopPending(runtimeID string) *UpdateRequest {
s.mu.Lock()
defer s.mu.Unlock()
for _, req := range s.requests {
if req.RuntimeID == runtimeID && req.Status == UpdatePending {
req.Status = UpdateRunning
req.UpdatedAt = time.Now()
return req
}
}
return nil
}
func (s *UpdateStore) Complete(id string, output string) {
s.mu.Lock()
defer s.mu.Unlock()
if req, ok := s.requests[id]; ok {
req.Status = UpdateCompleted
req.Output = output
req.UpdatedAt = time.Now()
}
}
func (s *UpdateStore) Fail(id string, errMsg string) {
s.mu.Lock()
defer s.mu.Unlock()
if req, ok := s.requests[id]; ok {
req.Status = UpdateFailed
req.Error = errMsg
req.UpdatedAt = time.Now()
}
}
// ---------------------------------------------------------------------------
// Handlers
// ---------------------------------------------------------------------------
// InitiateUpdate creates a new CLI update request (protected route, called by frontend).
func (h *Handler) InitiateUpdate(w http.ResponseWriter, r *http.Request) {
runtimeID := chi.URLParam(r, "runtimeId")
rt, err := h.Queries.GetAgentRuntime(r.Context(), parseUUID(runtimeID))
if err != nil {
writeError(w, http.StatusNotFound, "runtime not found")
return
}
if _, ok := h.requireWorkspaceMember(w, r, uuidToString(rt.WorkspaceID), "runtime not found"); !ok {
return
}
var req struct {
TargetVersion string `json:"target_version"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
if req.TargetVersion == "" {
writeError(w, http.StatusBadRequest, "target_version is required")
return
}
update, err := h.UpdateStore.Create(runtimeID, req.TargetVersion)
if err != nil {
writeError(w, http.StatusConflict, err.Error())
return
}
writeJSON(w, http.StatusOK, update)
}
// GetUpdate returns the status of an update request (protected route, called by frontend).
func (h *Handler) GetUpdate(w http.ResponseWriter, r *http.Request) {
updateID := chi.URLParam(r, "updateId")
update := h.UpdateStore.Get(updateID)
if update == nil {
writeError(w, http.StatusNotFound, "update not found")
return
}
writeJSON(w, http.StatusOK, update)
}
// ReportUpdateResult receives the update result from the daemon.
func (h *Handler) ReportUpdateResult(w http.ResponseWriter, r *http.Request) {
updateID := chi.URLParam(r, "updateId")
var req struct {
Status string `json:"status"` // "running", "completed", or "failed"
Output string `json:"output"`
Error string `json:"error"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
switch req.Status {
case "completed":
h.UpdateStore.Complete(updateID, req.Output)
case "failed":
h.UpdateStore.Fail(updateID, req.Error)
case "running":
// No-op: status is already "running" from PopPending. This call is
// just a progress signal from the daemon to confirm it received the
// update command and is executing it.
default:
writeError(w, http.StatusBadRequest, "invalid status: "+req.Status)
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}