diff --git a/server/cmd/multica/cmd_daemon.go b/server/cmd/multica/cmd_daemon.go index f0e6184c..9e16a386 100644 --- a/server/cmd/multica/cmd_daemon.go +++ b/server/cmd/multica/cmd_daemon.go @@ -33,7 +33,6 @@ func init() { func runDaemon(cmd *cobra.Command, _ []string) error { overrides := daemon.Overrides{ ServerURL: cli.FlagOrEnv(cmd, "server-url", "MULTICA_SERVER_URL", ""), - WorkspaceID: cli.FlagOrEnv(cmd, "workspace-id", "MULTICA_WORKSPACE_ID", ""), DaemonID: flagString(cmd, "daemon-id"), DeviceName: flagString(cmd, "device-name"), RuntimeName: flagString(cmd, "runtime-name"), @@ -69,4 +68,3 @@ func flagString(cmd *cobra.Command, name string) string { val, _ := cmd.Flags().GetString(name) return val } - diff --git a/server/cmd/multica/cmd_workspace.go b/server/cmd/multica/cmd_workspace.go new file mode 100644 index 00000000..3fa2464e --- /dev/null +++ b/server/cmd/multica/cmd_workspace.go @@ -0,0 +1,132 @@ +package main + +import ( + "context" + "fmt" + "os" + "text/tabwriter" + "time" + + "github.com/spf13/cobra" + + "github.com/multica-ai/multica/server/internal/cli" +) + +var workspaceCmd = &cobra.Command{ + Use: "workspace", + Short: "Manage watched workspaces for the daemon", +} + +var workspaceWatchCmd = &cobra.Command{ + Use: "watch ", + Short: "Add a workspace to the daemon watch list", + Args: cobra.ExactArgs(1), + RunE: runWorkspaceWatch, +} + +var workspaceUnwatchCmd = &cobra.Command{ + Use: "unwatch ", + Short: "Remove a workspace from the daemon watch list", + Args: cobra.ExactArgs(1), + RunE: runWorkspaceUnwatch, +} + +var workspaceListCmd = &cobra.Command{ + Use: "list", + Short: "List watched workspaces", + RunE: runWorkspaceList, +} + +func init() { + workspaceCmd.AddCommand(workspaceWatchCmd) + workspaceCmd.AddCommand(workspaceUnwatchCmd) + workspaceCmd.AddCommand(workspaceListCmd) +} + +func runWorkspaceWatch(cmd *cobra.Command, args []string) error { + workspaceID := args[0] + + // Validate the workspace exists by calling the API. + serverURL := resolveServerURL(cmd) + token := resolveToken() + if token == "" { + return fmt.Errorf("not authenticated: run 'multica auth login' first") + } + + client := cli.NewAPIClient(serverURL, "", token) + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + var ws struct { + ID string `json:"id"` + Name string `json:"name"` + } + if err := client.GetJSON(ctx, "/api/workspaces/"+workspaceID, &ws); err != nil { + return fmt.Errorf("workspace not found: %w", err) + } + + cfg, err := cli.LoadCLIConfig() + if err != nil { + return err + } + + if !cfg.AddWatchedWorkspace(ws.ID, ws.Name) { + fmt.Fprintf(os.Stderr, "Already watching workspace %s (%s)\n", ws.ID, ws.Name) + return nil + } + + // Also set as default workspace_id if none is set. + if cfg.WorkspaceID == "" { + cfg.WorkspaceID = ws.ID + } + + if err := cli.SaveCLIConfig(cfg); err != nil { + return err + } + + fmt.Fprintf(os.Stderr, "Watching workspace %s (%s)\n", ws.ID, ws.Name) + return nil +} + +func runWorkspaceUnwatch(_ *cobra.Command, args []string) error { + workspaceID := args[0] + + cfg, err := cli.LoadCLIConfig() + if err != nil { + return err + } + + if !cfg.RemoveWatchedWorkspace(workspaceID) { + return fmt.Errorf("workspace %s is not being watched", workspaceID) + } + + if err := cli.SaveCLIConfig(cfg); err != nil { + return err + } + + fmt.Fprintf(os.Stderr, "Stopped watching workspace %s\n", workspaceID) + return nil +} + +func runWorkspaceList(_ *cobra.Command, _ []string) error { + cfg, err := cli.LoadCLIConfig() + if err != nil { + return err + } + + if len(cfg.WatchedWorkspaces) == 0 { + fmt.Fprintln(os.Stderr, "No watched workspaces. Run 'multica workspace watch ' to add one.") + return nil + } + + w := tabwriter.NewWriter(os.Stdout, 0, 4, 2, ' ', 0) + fmt.Fprintln(w, "ID\tNAME") + for _, ws := range cfg.WatchedWorkspaces { + name := ws.Name + if name == "" { + name = "-" + } + fmt.Fprintf(w, "%s\t%s\n", ws.ID, name) + } + return w.Flush() +} diff --git a/server/cmd/multica/main.go b/server/cmd/multica/main.go index 95d29807..50699489 100644 --- a/server/cmd/multica/main.go +++ b/server/cmd/multica/main.go @@ -28,6 +28,7 @@ func init() { rootCmd.AddCommand(daemonCmd) rootCmd.AddCommand(agentCmd) rootCmd.AddCommand(runtimeCmd) + rootCmd.AddCommand(workspaceCmd) rootCmd.AddCommand(configCmd) rootCmd.AddCommand(statusCmd) rootCmd.AddCommand(versionCmd) diff --git a/server/internal/cli/config.go b/server/internal/cli/config.go index 7aec4c5d..ba3225df 100644 --- a/server/internal/cli/config.go +++ b/server/internal/cli/config.go @@ -10,11 +10,40 @@ import ( const defaultCLIConfigPath = ".multica/config.json" +// WatchedWorkspace represents a workspace the daemon should monitor for tasks. +type WatchedWorkspace struct { + ID string `json:"id"` + Name string `json:"name,omitempty"` +} + // CLIConfig holds persistent CLI settings. type CLIConfig struct { - ServerURL string `json:"server_url,omitempty"` - WorkspaceID string `json:"workspace_id,omitempty"` - Token string `json:"token,omitempty"` + ServerURL string `json:"server_url,omitempty"` + WorkspaceID string `json:"workspace_id,omitempty"` + Token string `json:"token,omitempty"` + WatchedWorkspaces []WatchedWorkspace `json:"watched_workspaces,omitempty"` +} + +// AddWatchedWorkspace adds a workspace to the watch list. Returns true if added. +func (c *CLIConfig) AddWatchedWorkspace(id, name string) bool { + for _, w := range c.WatchedWorkspaces { + if w.ID == id { + return false + } + } + c.WatchedWorkspaces = append(c.WatchedWorkspaces, WatchedWorkspace{ID: id, Name: name}) + return true +} + +// RemoveWatchedWorkspace removes a workspace from the watch list. Returns true if found. +func (c *CLIConfig) RemoveWatchedWorkspace(id string) bool { + for i, w := range c.WatchedWorkspaces { + if w.ID == id { + c.WatchedWorkspaces = append(c.WatchedWorkspaces[:i], c.WatchedWorkspaces[i+1:]...) + return true + } + } + return false } // CLIConfigPath returns the default path for the CLI config file. diff --git a/server/internal/daemon/config.go b/server/internal/daemon/config.go index ced3d018..7084b57c 100644 --- a/server/internal/daemon/config.go +++ b/server/internal/daemon/config.go @@ -16,13 +16,12 @@ const ( DefaultHeartbeatInterval = 15 * time.Second DefaultAgentTimeout = 2 * time.Hour DefaultRuntimeName = "Local Agent" + DefaultConfigReloadInterval = 5 * time.Second ) // Config holds all daemon configuration. type Config struct { ServerBaseURL string - WorkspaceID string - Token string DaemonID string DeviceName string RuntimeName string @@ -38,7 +37,6 @@ type Config struct { // Zero values are ignored and the env/default value is used instead. type Overrides struct { ServerURL string - WorkspaceID string WorkspacesRoot string PollInterval time.Duration HeartbeatInterval time.Duration @@ -48,8 +46,8 @@ type Overrides struct { RuntimeName string } -// LoadConfig builds the daemon configuration from environment variables, -// persisted config, and optional CLI flag overrides. +// LoadConfig builds the daemon configuration from environment variables +// and optional CLI flag overrides. func LoadConfig(overrides Overrides) (Config, error) { // Server URL: override > env > default rawServerURL := envOrDefault("MULTICA_SERVER_URL", DefaultServerURL) @@ -61,12 +59,6 @@ func LoadConfig(overrides Overrides) (Config, error) { return Config{}, err } - // Workspace ID: override > env (optional — resolved at runtime if empty) - workspaceID := strings.TrimSpace(os.Getenv("MULTICA_WORKSPACE_ID")) - if overrides.WorkspaceID != "" { - workspaceID = overrides.WorkspaceID - } - // Probe available agent CLIs agents := map[string]AgentEntry{} claudePath := envOrDefault("MULTICA_CLAUDE_PATH", "claude") @@ -156,7 +148,6 @@ func LoadConfig(overrides Overrides) (Config, error) { return Config{ ServerBaseURL: serverBaseURL, - WorkspaceID: workspaceID, DaemonID: daemonID, DeviceName: deviceName, RuntimeName: runtimeName, @@ -192,4 +183,3 @@ func NormalizeServerBaseURL(raw string) (string, error) { u.Fragment = "" return strings.TrimRight(u.String(), "/"), nil } - diff --git a/server/internal/daemon/daemon.go b/server/internal/daemon/daemon.go index 5f137878..5ea867cb 100644 --- a/server/internal/daemon/daemon.go +++ b/server/internal/daemon/daemon.go @@ -4,7 +4,9 @@ import ( "context" "fmt" "log/slog" + "os" "strings" + "sync" "time" "github.com/multica-ai/multica/server/internal/cli" @@ -12,21 +14,10 @@ import ( "github.com/multica-ai/multica/server/pkg/agent" ) -// cliConfigData holds the fields we need from the CLI config. -type cliConfigData struct { - Token string - WorkspaceID string -} - -func loadCLIConfig() (cliConfigData, error) { - cfg, err := cli.LoadCLIConfig() - if err != nil { - return cliConfigData{}, err - } - return cliConfigData{ - Token: cfg.Token, - WorkspaceID: cfg.WorkspaceID, - }, nil +// workspaceState tracks registered runtimes for a single workspace. +type workspaceState struct { + workspaceID string + runtimeIDs []string } // Daemon is the local agent runtime that polls for and executes tasks. @@ -34,14 +25,18 @@ type Daemon struct { cfg Config client *Client logger *slog.Logger + + mu sync.Mutex + workspaces map[string]*workspaceState } // New creates a new Daemon instance. func New(cfg Config, logger *slog.Logger) *Daemon { return &Daemon{ - cfg: cfg, - client: NewClient(cfg.ServerBaseURL), - logger: logger, + cfg: cfg, + client: NewClient(cfg.ServerBaseURL), + logger: logger, + workspaces: make(map[string]*workspaceState), } } @@ -53,68 +48,83 @@ func (d *Daemon) Run(ctx context.Context) error { } d.logger.Info("starting daemon", "agents", agentNames, "server", d.cfg.ServerBaseURL) - // Resolve auth token and workspace from CLI config. - if err := d.resolveAuth(ctx); err != nil { + // Load auth token from CLI config. + if err := d.resolveAuth(); err != nil { return err } - runtimes, err := d.registerRuntimes(ctx) - if err != nil { + // Load and register watched workspaces. + if err := d.loadWatchedWorkspaces(ctx); err != nil { return err } - runtimeIDs := make([]string, 0, len(runtimes)) - for _, rt := range runtimes { - d.logger.Info("registered runtime", "id", rt.ID, "provider", rt.Provider, "status", rt.Status) - runtimeIDs = append(runtimeIDs, rt.ID) + + runtimeIDs := d.allRuntimeIDs() + if len(runtimeIDs) == 0 { + return fmt.Errorf("no runtimes registered") } - go d.heartbeatLoop(ctx, runtimeIDs) - return d.pollLoop(ctx, runtimeIDs) + // Start config watcher for hot-reload. + go d.configWatchLoop(ctx) + + go d.heartbeatLoop(ctx) + return d.pollLoop(ctx) } -// resolveAuth loads the CLI auth token and workspace ID. -// If not authenticated, it waits and retries periodically until the user logs in. -func (d *Daemon) resolveAuth(ctx context.Context) error { - // If workspace ID is already set via flag/env, just need a token. - if d.cfg.WorkspaceID != "" { - if d.cfg.Token != "" { - d.client.SetToken(d.cfg.Token) - d.logger.Info("authenticated", "workspace_id", d.cfg.WorkspaceID) - return nil - } - } - - // Try loading from CLI config. - cfg, _ := loadCLIConfig() - if cfg.Token != "" { - d.client.SetToken(cfg.Token) - if d.cfg.WorkspaceID == "" && cfg.WorkspaceID != "" { - d.cfg.WorkspaceID = cfg.WorkspaceID - } - } - - if d.cfg.Token == "" && cfg.Token == "" { +// resolveAuth loads the auth token from the CLI config. +func (d *Daemon) resolveAuth() error { + cfg, _ := cli.LoadCLIConfig() + if cfg.Token == "" { d.logger.Warn("not authenticated — run 'multica auth login' to authenticate, then restart the daemon") return fmt.Errorf("not authenticated: run 'multica auth login' first") } + d.client.SetToken(cfg.Token) + d.logger.Info("authenticated") + return nil +} - // If we have a token but no workspace ID, fetch the user's workspaces. - if d.cfg.WorkspaceID == "" { - ws, err := d.client.ListWorkspaces(ctx) +// loadWatchedWorkspaces reads watched workspaces from CLI config and registers runtimes. +func (d *Daemon) loadWatchedWorkspaces(ctx context.Context) error { + cfg, err := cli.LoadCLIConfig() + if err != nil { + return fmt.Errorf("load CLI config: %w", err) + } + + if len(cfg.WatchedWorkspaces) == 0 { + return fmt.Errorf("no watched workspaces configured: run 'multica workspace watch ' to add one") + } + + for _, ws := range cfg.WatchedWorkspaces { + runtimes, err := d.registerRuntimesForWorkspace(ctx, ws.ID) if err != nil { - return fmt.Errorf("failed to fetch workspaces: %w (is your token valid? try 'multica auth login')", err) + d.logger.Error("failed to register runtimes", "workspace_id", ws.ID, "name", ws.Name, "error", err) + continue } - if len(ws) == 0 { - return fmt.Errorf("no workspaces found for this account") + runtimeIDs := make([]string, len(runtimes)) + for i, rt := range runtimes { + runtimeIDs[i] = rt.ID + d.logger.Info("registered runtime", "workspace_id", ws.ID, "runtime_id", rt.ID, "provider", rt.Provider) } - d.cfg.WorkspaceID = ws[0].ID - d.logger.Info("using workspace", "workspace_id", ws[0].ID, "name", ws[0].Name) + d.mu.Lock() + d.workspaces[ws.ID] = &workspaceState{workspaceID: ws.ID, runtimeIDs: runtimeIDs} + d.mu.Unlock() + d.logger.Info("watching workspace", "workspace_id", ws.ID, "name", ws.Name, "runtimes", len(runtimes)) } return nil } -func (d *Daemon) registerRuntimes(ctx context.Context) ([]Runtime, error) { +// allRuntimeIDs returns all runtime IDs across all watched workspaces. +func (d *Daemon) allRuntimeIDs() []string { + d.mu.Lock() + defer d.mu.Unlock() + var ids []string + for _, ws := range d.workspaces { + ids = append(ids, ws.runtimeIDs...) + } + return ids +} + +func (d *Daemon) registerRuntimesForWorkspace(ctx context.Context, workspaceID string) ([]Runtime, error) { var runtimes []map[string]string for name, entry := range d.cfg.Agents { version, err := agent.DetectVersion(ctx, entry.Path) @@ -134,7 +144,7 @@ func (d *Daemon) registerRuntimes(ctx context.Context) ([]Runtime, error) { } req := map[string]any{ - "workspace_id": d.cfg.WorkspaceID, + "workspace_id": workspaceID, "daemon_id": d.cfg.DaemonID, "device_name": d.cfg.DeviceName, "runtimes": runtimes, @@ -150,8 +160,91 @@ func (d *Daemon) registerRuntimes(ctx context.Context) ([]Runtime, error) { return rts, nil } +// configWatchLoop periodically checks for config file changes and reloads workspaces. +func (d *Daemon) configWatchLoop(ctx context.Context) { + configPath, err := cli.CLIConfigPath() + if err != nil { + d.logger.Warn("cannot watch config file", "error", err) + return + } -func (d *Daemon) heartbeatLoop(ctx context.Context, runtimeIDs []string) { + var lastModTime time.Time + if info, err := os.Stat(configPath); err == nil { + lastModTime = info.ModTime() + } + + ticker := time.NewTicker(DefaultConfigReloadInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + info, err := os.Stat(configPath) + if err != nil { + continue + } + if !info.ModTime().After(lastModTime) { + continue + } + lastModTime = info.ModTime() + d.reloadWorkspaces(ctx) + } + } +} + +// reloadWorkspaces reconciles the active workspace set with the config file. +func (d *Daemon) reloadWorkspaces(ctx context.Context) { + cfg, err := cli.LoadCLIConfig() + if err != nil { + d.logger.Warn("reload config failed", "error", err) + return + } + + newIDs := make(map[string]string) // id -> name + for _, ws := range cfg.WatchedWorkspaces { + newIDs[ws.ID] = ws.Name + } + + d.mu.Lock() + currentIDs := make(map[string]bool) + for id := range d.workspaces { + currentIDs[id] = true + } + d.mu.Unlock() + + // Register runtimes for newly added workspaces. + for id, name := range newIDs { + if !currentIDs[id] { + runtimes, err := d.registerRuntimesForWorkspace(ctx, id) + if err != nil { + d.logger.Error("register runtimes for new workspace failed", "workspace_id", id, "error", err) + continue + } + runtimeIDs := make([]string, len(runtimes)) + for i, rt := range runtimes { + runtimeIDs[i] = rt.ID + } + d.mu.Lock() + d.workspaces[id] = &workspaceState{workspaceID: id, runtimeIDs: runtimeIDs} + d.mu.Unlock() + d.logger.Info("now watching workspace", "workspace_id", id, "name", name) + } + } + + // Remove workspaces no longer in config. + for id := range currentIDs { + if _, ok := newIDs[id]; !ok { + d.mu.Lock() + delete(d.workspaces, id) + d.mu.Unlock() + d.logger.Info("stopped watching workspace", "workspace_id", id) + } + } +} + +func (d *Daemon) heartbeatLoop(ctx context.Context) { ticker := time.NewTicker(d.cfg.HeartbeatInterval) defer ticker.Stop() @@ -160,7 +253,7 @@ func (d *Daemon) heartbeatLoop(ctx context.Context, runtimeIDs []string) { case <-ctx.Done(): return case <-ticker.C: - for _, rid := range runtimeIDs { + for _, rid := range d.allRuntimeIDs() { if err := d.client.SendHeartbeat(ctx, rid); err != nil { d.logger.Warn("heartbeat failed", "runtime_id", rid, "error", err) } @@ -169,7 +262,7 @@ func (d *Daemon) heartbeatLoop(ctx context.Context, runtimeIDs []string) { } } -func (d *Daemon) pollLoop(ctx context.Context, runtimeIDs []string) error { +func (d *Daemon) pollLoop(ctx context.Context) error { pollOffset := 0 pollCount := 0 for { @@ -179,6 +272,14 @@ func (d *Daemon) pollLoop(ctx context.Context, runtimeIDs []string) error { default: } + runtimeIDs := d.allRuntimeIDs() + if len(runtimeIDs) == 0 { + if err := sleepWithContext(ctx, d.cfg.PollInterval); err != nil { + return err + } + continue + } + claimed := false n := len(runtimeIDs) for i := 0; i < n; i++ {