multica/server/cmd/multica/cmd_daemon.go
Jiayuan 8fa1b163a6 feat(daemon): add --profile flag for multi-environment isolation
Allow running multiple daemon instances against different servers (e.g.
production and local dev) simultaneously. Each profile gets isolated
config, PID file, log file, health port, and workspaces root.

Usage:
  multica login --profile dev --server-url http://localhost:8080
  multica daemon start --profile dev

Default profile (no --profile flag) behavior is unchanged.

Closes MUL-42
2026-03-30 20:21:23 +08:00

428 lines
12 KiB
Go

package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"os/exec"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/spf13/cobra"
"github.com/multica-ai/multica/server/internal/cli"
"github.com/multica-ai/multica/server/internal/daemon"
logger_pkg "github.com/multica-ai/multica/server/internal/logger"
)
var daemonCmd = &cobra.Command{
Use: "daemon",
Short: "Manage the local agent runtime daemon",
}
var daemonStartCmd = &cobra.Command{
Use: "start",
Short: "Start the local agent runtime daemon",
Long: "Start the daemon process that polls for tasks and executes them using local agent CLIs (Claude, Codex).\nRuns in the background by default. Use --foreground to run in the current terminal.",
RunE: runDaemonStart,
}
var daemonStopCmd = &cobra.Command{
Use: "stop",
Short: "Stop the running daemon",
RunE: runDaemonStop,
}
var daemonStatusCmd = &cobra.Command{
Use: "status",
Short: "Show daemon status",
RunE: runDaemonStatus,
}
var daemonLogsCmd = &cobra.Command{
Use: "logs",
Short: "Show daemon logs",
RunE: runDaemonLogs,
}
func init() {
f := daemonStartCmd.Flags()
f.Bool("foreground", false, "Run in the foreground instead of background")
f.String("daemon-id", "", "Unique daemon identifier (env: MULTICA_DAEMON_ID)")
f.String("device-name", "", "Human-readable device name (env: MULTICA_DAEMON_DEVICE_NAME)")
f.String("runtime-name", "", "Runtime display name (env: MULTICA_AGENT_RUNTIME_NAME)")
f.Duration("poll-interval", 0, "Task poll interval (env: MULTICA_DAEMON_POLL_INTERVAL)")
f.Duration("heartbeat-interval", 0, "Heartbeat interval (env: MULTICA_DAEMON_HEARTBEAT_INTERVAL)")
f.Duration("agent-timeout", 0, "Per-task timeout (env: MULTICA_AGENT_TIMEOUT)")
f.Int("max-concurrent-tasks", 0, "Max tasks running in parallel (env: MULTICA_DAEMON_MAX_CONCURRENT_TASKS)")
daemonLogsCmd.Flags().BoolP("follow", "f", false, "Follow log output")
daemonLogsCmd.Flags().IntP("lines", "n", 50, "Number of lines to show")
daemonStatusCmd.Flags().String("output", "table", "Output format: table or json")
daemonCmd.AddCommand(daemonStartCmd)
daemonCmd.AddCommand(daemonStopCmd)
daemonCmd.AddCommand(daemonStatusCmd)
daemonCmd.AddCommand(daemonLogsCmd)
}
// daemonDirForProfile returns the state directory for the given profile.
// Empty profile → ~/.multica/, named profile → ~/.multica/profiles/<name>/.
func daemonDirForProfile(profile string) string {
dir, err := cli.ProfileDir(profile)
if err != nil {
return ""
}
return dir
}
func daemonPIDPathForProfile(profile string) string {
return daemonDirForProfile(profile) + "/daemon.pid"
}
func daemonLogPathForProfile(profile string) string {
return daemonDirForProfile(profile) + "/daemon.log"
}
// healthPortForProfile returns the health check port for the given profile.
// Default profile uses the standard port (19514). Named profiles get a
// deterministic offset derived from the profile name.
func healthPortForProfile(profile string) int {
if profile == "" {
return daemon.DefaultHealthPort
}
// Simple hash: sum of bytes mod 1000, offset from base+1.
var h int
for _, b := range []byte(profile) {
h += int(b)
}
return daemon.DefaultHealthPort + 1 + (h % 1000)
}
// --- daemon start ---
func runDaemonStart(cmd *cobra.Command, _ []string) error {
foreground, _ := cmd.Flags().GetBool("foreground")
if foreground {
return runDaemonForeground(cmd)
}
return runDaemonBackground(cmd)
}
func runDaemonBackground(cmd *cobra.Command) error {
profile := resolveProfile(cmd)
healthPort := healthPortForProfile(profile)
// Check if daemon is already running.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
health := checkDaemonHealthOnPort(ctx, healthPort)
if health["status"] == "running" {
label := "daemon"
if profile != "" {
label = fmt.Sprintf("daemon [%s]", profile)
}
return fmt.Errorf("%s is already running (pid %v)", label, health["pid"])
}
// Resolve current executable.
exePath, err := os.Executable()
if err != nil {
return fmt.Errorf("resolve executable path: %w", err)
}
// Build child args: daemon start --foreground + forwarded flags.
args := buildDaemonStartArgs(cmd)
// Ensure daemon directory exists.
dir := daemonDirForProfile(profile)
if err := os.MkdirAll(dir, 0o755); err != nil {
return fmt.Errorf("create daemon directory: %w", err)
}
logPath := daemonLogPathForProfile(profile)
logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
if err != nil {
return fmt.Errorf("open log file %s: %w", logPath, err)
}
child := exec.Command(exePath, args...)
child.Stdout = logFile
child.Stderr = logFile
child.SysProcAttr = &syscall.SysProcAttr{Setsid: true}
if err := child.Start(); err != nil {
logFile.Close()
return fmt.Errorf("start daemon: %w", err)
}
logFile.Close()
// Detach: we don't Wait() on the child — it runs independently.
child.Process.Release()
// Write PID file.
pidPath := daemonPIDPathForProfile(profile)
if err := os.WriteFile(pidPath, []byte(strconv.Itoa(child.Process.Pid)), 0o644); err != nil {
fmt.Fprintf(os.Stderr, "Warning: could not write PID file: %v\n", err)
}
// Wait briefly and verify daemon started via health endpoint.
time.Sleep(2 * time.Second)
ctx2, cancel2 := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel2()
health = checkDaemonHealthOnPort(ctx2, healthPort)
if health["status"] != "running" {
fmt.Fprintf(os.Stderr, "Daemon may not have started successfully. Check logs:\n %s\n", logPath)
return nil
}
if profile != "" {
fmt.Fprintf(os.Stderr, "Daemon [%s] started (pid %d)\n", profile, child.Process.Pid)
} else {
fmt.Fprintf(os.Stderr, "Daemon started (pid %d)\n", child.Process.Pid)
}
fmt.Fprintf(os.Stderr, "Logs: %s\n", logPath)
return nil
}
// buildDaemonStartArgs constructs args for the background child process.
func buildDaemonStartArgs(cmd *cobra.Command) []string {
args := []string{"daemon", "start", "--foreground"}
if v := flagString(cmd, "daemon-id"); v != "" {
args = append(args, "--daemon-id", v)
}
if v := flagString(cmd, "device-name"); v != "" {
args = append(args, "--device-name", v)
}
if v := flagString(cmd, "runtime-name"); v != "" {
args = append(args, "--runtime-name", v)
}
if d, _ := cmd.Flags().GetDuration("poll-interval"); d > 0 {
args = append(args, "--poll-interval", d.String())
}
if d, _ := cmd.Flags().GetDuration("heartbeat-interval"); d > 0 {
args = append(args, "--heartbeat-interval", d.String())
}
if d, _ := cmd.Flags().GetDuration("agent-timeout"); d > 0 {
args = append(args, "--agent-timeout", d.String())
}
if n, _ := cmd.Flags().GetInt("max-concurrent-tasks"); n > 0 {
args = append(args, "--max-concurrent-tasks", strconv.Itoa(n))
}
// Forward global persistent flags.
if v, _ := cmd.Flags().GetString("server-url"); v != "" {
args = append(args, "--server-url", v)
}
if v := resolveProfile(cmd); v != "" {
args = append(args, "--profile", v)
}
return args
}
func runDaemonForeground(cmd *cobra.Command) error {
profile := resolveProfile(cmd)
serverURL := cli.FlagOrEnv(cmd, "server-url", "MULTICA_SERVER_URL", "")
if serverURL == "" {
if c, err := cli.LoadCLIConfigForProfile(profile); err == nil && c.ServerURL != "" {
serverURL = c.ServerURL
}
}
overrides := daemon.Overrides{
ServerURL: serverURL,
DaemonID: flagString(cmd, "daemon-id"),
DeviceName: flagString(cmd, "device-name"),
RuntimeName: flagString(cmd, "runtime-name"),
Profile: profile,
HealthPort: healthPortForProfile(profile),
}
if d, _ := cmd.Flags().GetDuration("poll-interval"); d > 0 {
overrides.PollInterval = d
}
if d, _ := cmd.Flags().GetDuration("heartbeat-interval"); d > 0 {
overrides.HeartbeatInterval = d
}
if d, _ := cmd.Flags().GetDuration("agent-timeout"); d > 0 {
overrides.AgentTimeout = d
}
if n, _ := cmd.Flags().GetInt("max-concurrent-tasks"); n > 0 {
overrides.MaxConcurrentTasks = n
}
cfg, err := daemon.LoadConfig(overrides)
if err != nil {
return err
}
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
logger := logger_pkg.NewLogger("daemon")
d := daemon.New(cfg, logger)
// Write PID file so "daemon stop" can find us.
if dir := daemonDirForProfile(profile); dir != "" {
os.MkdirAll(dir, 0o755)
os.WriteFile(daemonPIDPathForProfile(profile), []byte(strconv.Itoa(os.Getpid())), 0o644)
}
defer os.Remove(daemonPIDPathForProfile(profile))
if err := d.Run(ctx); err != nil && !errors.Is(err, context.Canceled) {
return err
}
return nil
}
// --- daemon stop ---
func runDaemonStop(cmd *cobra.Command, _ []string) error {
profile := resolveProfile(cmd)
healthPort := healthPortForProfile(profile)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
health := checkDaemonHealthOnPort(ctx, healthPort)
if health["status"] != "running" {
label := "Daemon"
if profile != "" {
label = fmt.Sprintf("Daemon [%s]", profile)
}
fmt.Fprintf(os.Stderr, "%s is not running.\n", label)
return nil
}
pid, ok := health["pid"].(float64)
if !ok || pid == 0 {
return fmt.Errorf("could not determine daemon PID from health endpoint")
}
process, err := os.FindProcess(int(pid))
if err != nil {
return fmt.Errorf("find process %d: %w", int(pid), err)
}
if err := process.Signal(syscall.SIGTERM); err != nil {
return fmt.Errorf("stop daemon (pid %d): %w", int(pid), err)
}
fmt.Fprintf(os.Stderr, "Stopping daemon (pid %d)...\n", int(pid))
// Poll health endpoint until daemon is gone.
for i := 0; i < 10; i++ {
time.Sleep(500 * time.Millisecond)
ctx2, cancel2 := context.WithTimeout(context.Background(), 1*time.Second)
h := checkDaemonHealthOnPort(ctx2, healthPort)
cancel2()
if h["status"] != "running" {
os.Remove(daemonPIDPathForProfile(profile))
fmt.Fprintln(os.Stderr, "Daemon stopped.")
return nil
}
}
fmt.Fprintln(os.Stderr, "Daemon is still stopping. It may be finishing a running task.")
return nil
}
// --- daemon status ---
func runDaemonStatus(cmd *cobra.Command, _ []string) error {
profile := resolveProfile(cmd)
healthPort := healthPortForProfile(profile)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
health := checkDaemonHealthOnPort(ctx, healthPort)
output, _ := cmd.Flags().GetString("output")
if output == "json" {
return cli.PrintJSON(os.Stdout, health)
}
label := "Daemon"
if profile != "" {
label = fmt.Sprintf("Daemon [%s]", profile)
}
if health["status"] != "running" {
fmt.Fprintf(os.Stdout, "%s: stopped\n", label)
return nil
}
fmt.Fprintf(os.Stdout, "%s: running (pid %v, uptime %v)\n", label, health["pid"], health["uptime"])
if agents, ok := health["agents"].([]any); ok && len(agents) > 0 {
parts := make([]string, len(agents))
for i, a := range agents {
parts[i] = fmt.Sprint(a)
}
fmt.Fprintf(os.Stdout, "Agents: %s\n", strings.Join(parts, ", "))
}
if ws, ok := health["workspaces"].([]any); ok {
fmt.Fprintf(os.Stdout, "Workspaces: %d\n", len(ws))
}
return nil
}
// --- daemon logs ---
func runDaemonLogs(cmd *cobra.Command, _ []string) error {
profile := resolveProfile(cmd)
logPath := daemonLogPathForProfile(profile)
if _, err := os.Stat(logPath); os.IsNotExist(err) {
return fmt.Errorf("no log file found at %s\nThe daemon may not have been started in background mode", logPath)
}
follow, _ := cmd.Flags().GetBool("follow")
lines, _ := cmd.Flags().GetInt("lines")
args := []string{"-n", strconv.Itoa(lines)}
if follow {
args = append(args, "-f")
}
args = append(args, logPath)
tail := exec.Command("tail", args...)
tail.Stdout = os.Stdout
tail.Stderr = os.Stderr
return tail.Run()
}
// checkDaemonHealthOnPort calls the daemon's local health endpoint on the given port.
func checkDaemonHealthOnPort(ctx context.Context, port int) map[string]any {
addr := fmt.Sprintf("http://127.0.0.1:%d/health", port)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, addr, nil)
if err != nil {
return map[string]any{"status": "stopped"}
}
httpClient := &http.Client{Timeout: 2 * time.Second}
resp, err := httpClient.Do(req)
if err != nil {
return map[string]any{"status": "stopped"}
}
defer resp.Body.Close()
var result map[string]any
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return map[string]any{"status": "stopped"}
}
return result
}
// flagString returns a string flag value or empty string.
func flagString(cmd *cobra.Command, name string) string {
val, _ := cmd.Flags().GetString(name)
return val
}