diff --git a/.gitignore b/.gitignore index 95713cb0..d902de1c 100644 --- a/.gitignore +++ b/.gitignore @@ -24,6 +24,7 @@ server/bin/ server/tmp/ server/migrate server/daemon +server/multica-cli # Test artifacts test-results/ diff --git a/Makefile b/Makefile index 31d3b693..d5c0b55b 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: dev daemon build test migrate-up migrate-down sqlc seed clean setup start stop check worktree-env setup-main start-main stop-main check-main setup-worktree start-worktree stop-worktree check-worktree +.PHONY: dev daemon cli build test migrate-up migrate-down sqlc seed clean setup start stop check worktree-env setup-main start-main stop-main check-main setup-worktree start-worktree stop-worktree check-worktree MAIN_ENV_FILE ?= .env WORKTREE_ENV_FILE ?= .env.worktree @@ -113,11 +113,14 @@ dev: cd server && go run ./cmd/server daemon: - cd server && MULTICA_REPOS_ROOT="${MULTICA_REPOS_ROOT:-$(abspath .)}" go run ./cmd/daemon + cd server && MULTICA_REPOS_ROOT="${MULTICA_REPOS_ROOT:-$(abspath .)}" go run ./cmd/multica daemon + +cli: + cd server && go run ./cmd/multica $(ARGS) build: cd server && go build -o bin/server ./cmd/server - cd server && go build -o bin/daemon ./cmd/daemon + cd server && go build -o bin/multica-cli ./cmd/multica test: cd server && go test ./... diff --git a/server/cmd/daemon/daemon.go b/server/cmd/daemon/daemon.go deleted file mode 100644 index dfea6bc5..00000000 --- a/server/cmd/daemon/daemon.go +++ /dev/null @@ -1,816 +0,0 @@ -package main - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "io" - "log" - "net/http" - "net/url" - "os" - "os/exec" - "path/filepath" - "strings" - "time" - - "github.com/multica-ai/multica/server/pkg/agent" -) - -const ( - defaultServerURL = "ws://localhost:8080/ws" - defaultDaemonConfigPath = ".multica/daemon.json" - defaultPollInterval = 3 * time.Second - defaultHeartbeatInterval = 15 * time.Second - defaultAgentTimeout = 20 * time.Minute - defaultRuntimeName = "Local Agent" -) - -// agentEntry describes a single available agent CLI. -type agentEntry struct { - Path string // path to CLI binary - Model string // model override (optional) -} - -type config struct { - ServerBaseURL string - ConfigPath string - WorkspaceID string - DaemonID string - DeviceName string - RuntimeName string - Agents map[string]agentEntry // "claude" -> entry, "codex" -> entry - ReposRoot string // parent directory containing all repos - PollInterval time.Duration - HeartbeatInterval time.Duration - AgentTimeout time.Duration -} - -type daemon struct { - cfg config - client *daemonClient - logger *log.Logger -} - -type daemonClient struct { - baseURL string - client *http.Client -} - -type daemonRuntime struct { - ID string `json:"id"` - Name string `json:"name"` - Provider string `json:"provider"` - Status string `json:"status"` -} - -type daemonPairingSession struct { - Token string `json:"token"` - DaemonID string `json:"daemon_id"` - DeviceName string `json:"device_name"` - RuntimeName string `json:"runtime_name"` - RuntimeType string `json:"runtime_type"` - RuntimeVersion string `json:"runtime_version"` - WorkspaceID *string `json:"workspace_id"` - Status string `json:"status"` - ApprovedAt *string `json:"approved_at"` - ClaimedAt *string `json:"claimed_at"` - ExpiresAt string `json:"expires_at"` - LinkURL *string `json:"link_url"` -} - -type daemonPersistedConfig struct { - WorkspaceID string `json:"workspace_id"` -} - -type daemonTask struct { - ID string `json:"id"` - AgentID string `json:"agent_id"` - IssueID string `json:"issue_id"` - Context daemonTaskContext `json:"context"` -} - -type daemonTaskContext struct { - Issue daemonIssueContext `json:"issue"` - Agent daemonAgentContext `json:"agent"` - Runtime daemonRuntimeContext `json:"runtime"` -} - -type daemonIssueContext struct { - ID string `json:"id"` - Title string `json:"title"` - Description string `json:"description"` - AcceptanceCriteria []string `json:"acceptance_criteria"` - ContextRefs []string `json:"context_refs"` - Repository *daemonRepoRef `json:"repository"` -} - -type daemonAgentContext struct { - ID string `json:"id"` - Name string `json:"name"` - Skills string `json:"skills"` -} - -type daemonRuntimeContext struct { - ID string `json:"id"` - Name string `json:"name"` - Provider string `json:"provider"` - DeviceInfo string `json:"device_info"` -} - -type daemonRepoRef struct { - URL string `json:"url"` - Branch string `json:"branch"` - Path string `json:"path"` -} - -type taskResult struct { - Status string `json:"status"` - Comment string `json:"comment"` -} - -func loadConfig() (config, error) { - serverBaseURL, err := normalizeServerBaseURL(envOrDefault("MULTICA_SERVER_URL", defaultServerURL)) - if err != nil { - return config{}, err - } - - configPath, err := resolveDaemonConfigPath(strings.TrimSpace(os.Getenv("MULTICA_DAEMON_CONFIG"))) - if err != nil { - return config{}, err - } - persisted, err := loadPersistedDaemonConfig(configPath) - if err != nil { - return config{}, err - } - workspaceID := strings.TrimSpace(os.Getenv("MULTICA_WORKSPACE_ID")) - if workspaceID == "" { - workspaceID = persisted.WorkspaceID - } - - // Probe available agent CLIs. - agents := map[string]agentEntry{} - claudePath := envOrDefault("MULTICA_CLAUDE_PATH", "claude") - if _, err := exec.LookPath(claudePath); err == nil { - agents["claude"] = agentEntry{ - Path: claudePath, - Model: strings.TrimSpace(os.Getenv("MULTICA_CLAUDE_MODEL")), - } - } - codexPath := envOrDefault("MULTICA_CODEX_PATH", "codex") - if _, err := exec.LookPath(codexPath); err == nil { - agents["codex"] = agentEntry{ - Path: codexPath, - Model: strings.TrimSpace(os.Getenv("MULTICA_CODEX_MODEL")), - } - } - if len(agents) == 0 { - return config{}, fmt.Errorf("no agent CLI found: install claude or codex and ensure it is on PATH") - } - - host, err := os.Hostname() - if err != nil || strings.TrimSpace(host) == "" { - host = "local-machine" - } - - reposRoot := strings.TrimSpace(os.Getenv("MULTICA_REPOS_ROOT")) - if reposRoot == "" { - reposRoot, err = os.Getwd() - if err != nil { - return config{}, fmt.Errorf("resolve working directory: %w", err) - } - } - reposRoot, err = filepath.Abs(reposRoot) - if err != nil { - return config{}, fmt.Errorf("resolve absolute repos root: %w", err) - } - - pollInterval, err := durationFromEnv("MULTICA_DAEMON_POLL_INTERVAL", defaultPollInterval) - if err != nil { - return config{}, err - } - heartbeatInterval, err := durationFromEnv("MULTICA_DAEMON_HEARTBEAT_INTERVAL", defaultHeartbeatInterval) - if err != nil { - return config{}, err - } - agentTimeout, err := durationFromEnv("MULTICA_AGENT_TIMEOUT", defaultAgentTimeout) - if err != nil { - return config{}, err - } - - return config{ - ServerBaseURL: serverBaseURL, - ConfigPath: configPath, - WorkspaceID: workspaceID, - DaemonID: envOrDefault("MULTICA_DAEMON_ID", host), - DeviceName: envOrDefault("MULTICA_DAEMON_DEVICE_NAME", host), - RuntimeName: envOrDefault("MULTICA_AGENT_RUNTIME_NAME", defaultRuntimeName), - Agents: agents, - ReposRoot: reposRoot, - PollInterval: pollInterval, - HeartbeatInterval: heartbeatInterval, - AgentTimeout: agentTimeout, - }, nil -} - -func newDaemon(cfg config, logger *log.Logger) *daemon { - return &daemon{ - cfg: cfg, - client: &daemonClient{baseURL: cfg.ServerBaseURL, client: &http.Client{Timeout: 30 * time.Second}}, - logger: logger, - } -} - -func (d *daemon) run(ctx context.Context) error { - agentNames := make([]string, 0, len(d.cfg.Agents)) - for name := range d.cfg.Agents { - agentNames = append(agentNames, name) - } - d.logger.Printf("starting daemon agents=%v workspace=%s server=%s repos_root=%s", - agentNames, d.cfg.WorkspaceID, d.cfg.ServerBaseURL, d.cfg.ReposRoot) - - if strings.TrimSpace(d.cfg.WorkspaceID) == "" { - workspaceID, err := d.ensurePaired(ctx) - if err != nil { - return err - } - d.cfg.WorkspaceID = workspaceID - d.logger.Printf("pairing completed for workspace=%s", workspaceID) - } - - runtimes, err := d.registerRuntimes(ctx) - if err != nil { - return err - } - runtimeIDs := make([]string, 0, len(runtimes)) - for _, rt := range runtimes { - d.logger.Printf("registered runtime id=%s provider=%s status=%s", rt.ID, rt.Provider, rt.Status) - runtimeIDs = append(runtimeIDs, rt.ID) - } - - go d.heartbeatLoop(ctx, runtimeIDs) - return d.pollLoop(ctx, runtimeIDs) -} - -func (d *daemon) registerRuntimes(ctx context.Context) ([]daemonRuntime, error) { - var runtimes []map[string]string - for name, entry := range d.cfg.Agents { - version, err := agent.DetectVersion(ctx, entry.Path) - if err != nil { - d.logger.Printf("skip registering %s: %v", name, err) - continue - } - runtimes = append(runtimes, map[string]string{ - "name": fmt.Sprintf("Local %s", strings.ToUpper(name[:1])+name[1:]), - "type": name, - "version": version, - "status": "online", - }) - } - if len(runtimes) == 0 { - return nil, fmt.Errorf("no agent runtimes could be registered") - } - - req := map[string]any{ - "workspace_id": d.cfg.WorkspaceID, - "daemon_id": d.cfg.DaemonID, - "device_name": d.cfg.DeviceName, - "runtimes": runtimes, - } - - var resp struct { - Runtimes []daemonRuntime `json:"runtimes"` - } - if err := d.client.postJSON(ctx, "/api/daemon/register", req, &resp); err != nil { - return nil, fmt.Errorf("register runtimes: %w", err) - } - if len(resp.Runtimes) == 0 { - return nil, fmt.Errorf("register runtimes: empty response") - } - return resp.Runtimes, nil -} - -func (d *daemon) ensurePaired(ctx context.Context) (string, error) { - // Use a deterministic agent for the pairing session metadata (prefer codex for backward compat). - var firstName string - var firstEntry agentEntry - for _, preferred := range []string{"codex", "claude"} { - if entry, ok := d.cfg.Agents[preferred]; ok { - firstName = preferred - firstEntry = entry - break - } - } - version, err := agent.DetectVersion(ctx, firstEntry.Path) - if err != nil { - return "", err - } - - session, err := d.client.createPairingSession(ctx, map[string]string{ - "daemon_id": d.cfg.DaemonID, - "device_name": d.cfg.DeviceName, - "runtime_name": d.cfg.RuntimeName, - "runtime_type": firstName, - "runtime_version": version, - }) - if err != nil { - return "", fmt.Errorf("create pairing session: %w", err) - } - if session.LinkURL != nil { - d.logger.Printf("open this link to pair the daemon: %s", *session.LinkURL) - } else { - d.logger.Printf("pairing session created: %s", session.Token) - } - - for { - select { - case <-ctx.Done(): - return "", ctx.Err() - default: - } - - current, err := d.client.getPairingSession(ctx, session.Token) - if err != nil { - return "", fmt.Errorf("poll pairing session: %w", err) - } - - switch current.Status { - case "approved", "claimed": - if current.WorkspaceID == nil || strings.TrimSpace(*current.WorkspaceID) == "" { - return "", fmt.Errorf("pairing session approved without workspace") - } - if err := savePersistedDaemonConfig(d.cfg.ConfigPath, daemonPersistedConfig{ - WorkspaceID: strings.TrimSpace(*current.WorkspaceID), - }); err != nil { - return "", err - } - if current.Status != "claimed" { - if _, err := d.client.claimPairingSession(ctx, current.Token); err != nil { - return "", fmt.Errorf("claim pairing session: %w", err) - } - } - return strings.TrimSpace(*current.WorkspaceID), nil - case "expired": - return "", fmt.Errorf("pairing session expired before approval") - } - - if err := sleepWithContext(ctx, d.cfg.PollInterval); err != nil { - return "", err - } - } -} - -func (d *daemon) heartbeatLoop(ctx context.Context, runtimeIDs []string) { - ticker := time.NewTicker(d.cfg.HeartbeatInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - for _, rid := range runtimeIDs { - err := d.client.postJSON(ctx, "/api/daemon/heartbeat", map[string]string{ - "runtime_id": rid, - }, nil) - if err != nil { - d.logger.Printf("heartbeat failed for runtime %s: %v", rid, err) - } - } - } - } -} - -func (d *daemon) pollLoop(ctx context.Context, runtimeIDs []string) error { - pollOffset := 0 - for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - claimed := false - n := len(runtimeIDs) - for i := 0; i < n; i++ { - rid := runtimeIDs[(pollOffset+i)%n] - task, err := d.client.claimTask(ctx, rid) - if err != nil { - d.logger.Printf("claim task failed for runtime %s: %v", rid, err) - continue - } - if task != nil { - d.logger.Printf("poll: got task=%s issue=%s title=%q", task.ID, task.IssueID, task.Context.Issue.Title) - d.handleTask(ctx, *task) - claimed = true - pollOffset = (pollOffset + i + 1) % n - break - } - } - - if !claimed { - pollOffset = (pollOffset + 1) % n - if err := sleepWithContext(ctx, d.cfg.PollInterval); err != nil { - return err - } - } - } -} - -func (d *daemon) handleTask(ctx context.Context, task daemonTask) { - provider := task.Context.Runtime.Provider - d.logger.Printf("picked task=%s issue=%s provider=%s title=%q", task.ID, task.IssueID, provider, task.Context.Issue.Title) - - if err := d.client.startTask(ctx, task.ID); err != nil { - d.logger.Printf("start task %s failed: %v", task.ID, err) - return - } - - _ = d.client.reportProgress(ctx, task.ID, fmt.Sprintf("Launching %s", provider), 1, 2) - - result, err := d.runTask(ctx, task) - if err != nil { - d.logger.Printf("task %s failed: %v", task.ID, err) - if failErr := d.client.failTask(ctx, task.ID, err.Error()); failErr != nil { - d.logger.Printf("fail task %s callback failed: %v", task.ID, failErr) - } - return - } - - _ = d.client.reportProgress(ctx, task.ID, "Finishing task", 2, 2) - - switch result.Status { - case "blocked": - if err := d.client.failTask(ctx, task.ID, result.Comment); err != nil { - d.logger.Printf("report blocked task %s failed: %v", task.ID, err) - } - default: - if err := d.client.completeTask(ctx, task.ID, result.Comment); err != nil { - d.logger.Printf("complete task %s failed: %v", task.ID, err) - } - } -} - -func (d *daemon) runTask(ctx context.Context, task daemonTask) (taskResult, error) { - provider := task.Context.Runtime.Provider - entry, ok := d.cfg.Agents[provider] - if !ok { - return taskResult{}, fmt.Errorf("no agent configured for provider %q", provider) - } - - workdir, err := resolveTaskWorkdir(d.cfg.ReposRoot, task.Context.Issue.Repository) - if err != nil { - return taskResult{}, err - } - - prompt := buildPrompt(task, workdir) - - backend, err := agent.New(provider, agent.Config{ - ExecutablePath: entry.Path, - Logger: d.logger, - }) - if err != nil { - return taskResult{}, fmt.Errorf("create agent backend: %w", err) - } - - d.logger.Printf( - "starting %s task=%s workdir=%s model=%s timeout=%s", - provider, task.ID, workdir, entry.Model, d.cfg.AgentTimeout, - ) - - session, err := backend.Execute(ctx, prompt, agent.ExecOptions{ - Cwd: workdir, - Model: entry.Model, - Timeout: d.cfg.AgentTimeout, - }) - if err != nil { - return taskResult{}, err - } - - // Drain message channel (log tool uses, ignore text since Result has output) - go func() { - for msg := range session.Messages { - switch msg.Type { - case agent.MessageToolUse: - d.logger.Printf("[%s] tool-use: %s (call=%s)", provider, msg.Tool, msg.CallID) - case agent.MessageError: - d.logger.Printf("[%s] error: %s", provider, msg.Content) - } - } - }() - - result := <-session.Result - - switch result.Status { - case "completed": - if result.Output == "" { - return taskResult{}, fmt.Errorf("%s returned empty output", provider) - } - return taskResult{Status: "completed", Comment: result.Output}, nil - case "timeout": - return taskResult{}, fmt.Errorf("%s timed out after %s", provider, d.cfg.AgentTimeout) - default: - errMsg := result.Error - if errMsg == "" { - errMsg = fmt.Sprintf("%s execution %s", provider, result.Status) - } - return taskResult{Status: "blocked", Comment: errMsg}, nil - } -} - -func buildPrompt(task daemonTask, workdir string) string { - var b strings.Builder - b.WriteString("You are running as a local coding agent for a Multica workspace.\n") - b.WriteString("Complete the assigned issue using the local environment.\n") - b.WriteString("Return a concise Markdown comment suitable for posting back to the issue.\n") - b.WriteString("If you cannot complete the task because context, files, or permissions are missing, return status \"blocked\" and explain the blocker in the comment.\n\n") - - fmt.Fprintf(&b, "Working directory: %s\n", workdir) - fmt.Fprintf(&b, "Agent: %s\n", task.Context.Agent.Name) - fmt.Fprintf(&b, "Issue title: %s\n\n", task.Context.Issue.Title) - - if task.Context.Issue.Description != "" { - b.WriteString("Issue description:\n") - b.WriteString(task.Context.Issue.Description) - b.WriteString("\n\n") - } - - if len(task.Context.Issue.AcceptanceCriteria) > 0 { - b.WriteString("Acceptance criteria:\n") - for _, item := range task.Context.Issue.AcceptanceCriteria { - fmt.Fprintf(&b, "- %s\n", item) - } - b.WriteString("\n") - } - - if len(task.Context.Issue.ContextRefs) > 0 { - b.WriteString("Context refs:\n") - for _, item := range task.Context.Issue.ContextRefs { - fmt.Fprintf(&b, "- %s\n", item) - } - b.WriteString("\n") - } - - if repo := task.Context.Issue.Repository; repo != nil { - b.WriteString("Repository context:\n") - if repo.URL != "" { - fmt.Fprintf(&b, "- url: %s\n", repo.URL) - } - if repo.Branch != "" { - fmt.Fprintf(&b, "- branch: %s\n", repo.Branch) - } - if repo.Path != "" { - fmt.Fprintf(&b, "- path: %s\n", repo.Path) - } - b.WriteString("\n") - } - - if task.Context.Agent.Skills != "" { - b.WriteString("Agent skills/instructions:\n") - b.WriteString(task.Context.Agent.Skills) - b.WriteString("\n\n") - } - - b.WriteString("Comment requirements:\n") - b.WriteString("- Lead with the outcome.\n") - b.WriteString("- Mention concrete files or commands if you changed anything.\n") - b.WriteString("- Mention blockers or follow-up actions if relevant.\n") - - return b.String() -} - -func resolveTaskWorkdir(reposRoot string, repo *daemonRepoRef) (string, error) { - base := reposRoot - if repo == nil || strings.TrimSpace(repo.Path) == "" { - return base, nil - } - - path := strings.TrimSpace(repo.Path) - if !filepath.IsAbs(path) { - path = filepath.Join(base, path) - } - path = filepath.Clean(path) - - info, err := os.Stat(path) - if err != nil { - return "", fmt.Errorf("repository path not found: %s", path) - } - if !info.IsDir() { - return "", fmt.Errorf("repository path is not a directory: %s", path) - } - return path, nil -} - -func resolveDaemonConfigPath(raw string) (string, error) { - if raw != "" { - return filepath.Abs(raw) - } - - home, err := os.UserHomeDir() - if err != nil { - return "", fmt.Errorf("resolve daemon config path: %w", err) - } - return filepath.Join(home, defaultDaemonConfigPath), nil -} - -func loadPersistedDaemonConfig(path string) (daemonPersistedConfig, error) { - data, err := os.ReadFile(path) - if err != nil { - if errors.Is(err, os.ErrNotExist) { - return daemonPersistedConfig{}, nil - } - return daemonPersistedConfig{}, fmt.Errorf("read daemon config: %w", err) - } - - var cfg daemonPersistedConfig - if err := json.Unmarshal(data, &cfg); err != nil { - return daemonPersistedConfig{}, fmt.Errorf("parse daemon config: %w", err) - } - return cfg, nil -} - -func savePersistedDaemonConfig(path string, cfg daemonPersistedConfig) error { - if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { - return fmt.Errorf("create daemon config directory: %w", err) - } - data, err := json.MarshalIndent(cfg, "", " ") - if err != nil { - return fmt.Errorf("encode daemon config: %w", err) - } - if err := os.WriteFile(path, append(data, '\n'), 0o600); err != nil { - return fmt.Errorf("write daemon config: %w", err) - } - return nil -} - -func normalizeServerBaseURL(raw string) (string, error) { - u, err := url.Parse(strings.TrimSpace(raw)) - if err != nil { - return "", fmt.Errorf("invalid MULTICA_SERVER_URL: %w", err) - } - switch u.Scheme { - case "ws": - u.Scheme = "http" - case "wss": - u.Scheme = "https" - case "http", "https": - default: - return "", fmt.Errorf("MULTICA_SERVER_URL must use ws, wss, http, or https") - } - if u.Path == "/ws" { - u.Path = "" - } - u.RawPath = "" - u.RawQuery = "" - u.Fragment = "" - return strings.TrimRight(u.String(), "/"), nil -} - -func durationFromEnv(key string, fallback time.Duration) (time.Duration, error) { - value := strings.TrimSpace(os.Getenv(key)) - if value == "" { - return fallback, nil - } - d, err := time.ParseDuration(value) - if err != nil { - return 0, fmt.Errorf("%s: invalid duration %q: %w", key, value, err) - } - return d, nil -} - -func envOrDefault(key, fallback string) string { - value := strings.TrimSpace(os.Getenv(key)) - if value == "" { - return fallback - } - return value -} - -func sleepWithContext(ctx context.Context, d time.Duration) error { - timer := time.NewTimer(d) - defer timer.Stop() - - select { - case <-ctx.Done(): - return ctx.Err() - case <-timer.C: - return nil - } -} - -func (c *daemonClient) claimTask(ctx context.Context, runtimeID string) (*daemonTask, error) { - var resp struct { - Task *daemonTask `json:"task"` - } - if err := c.postJSON(ctx, fmt.Sprintf("/api/daemon/runtimes/%s/tasks/claim", runtimeID), map[string]any{}, &resp); err != nil { - return nil, err - } - return resp.Task, nil -} - -func (c *daemonClient) createPairingSession(ctx context.Context, req map[string]string) (daemonPairingSession, error) { - var resp daemonPairingSession - if err := c.postJSON(ctx, "/api/daemon/pairing-sessions", req, &resp); err != nil { - return daemonPairingSession{}, err - } - return resp, nil -} - -func (c *daemonClient) getPairingSession(ctx context.Context, token string) (daemonPairingSession, error) { - var resp daemonPairingSession - if err := c.getJSON(ctx, fmt.Sprintf("/api/daemon/pairing-sessions/%s", url.PathEscape(token)), &resp); err != nil { - return daemonPairingSession{}, err - } - return resp, nil -} - -func (c *daemonClient) claimPairingSession(ctx context.Context, token string) (daemonPairingSession, error) { - var resp daemonPairingSession - if err := c.postJSON(ctx, fmt.Sprintf("/api/daemon/pairing-sessions/%s/claim", url.PathEscape(token)), map[string]any{}, &resp); err != nil { - return daemonPairingSession{}, err - } - return resp, nil -} - -func (c *daemonClient) startTask(ctx context.Context, taskID string) error { - return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/start", taskID), map[string]any{}, nil) -} - -func (c *daemonClient) reportProgress(ctx context.Context, taskID, summary string, step, total int) error { - return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/progress", taskID), map[string]any{ - "summary": summary, - "step": step, - "total": total, - }, nil) -} - -func (c *daemonClient) completeTask(ctx context.Context, taskID, output string) error { - return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/complete", taskID), map[string]any{ - "output": output, - }, nil) -} - -func (c *daemonClient) failTask(ctx context.Context, taskID, errMsg string) error { - return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/fail", taskID), map[string]any{ - "error": errMsg, - }, nil) -} - -func (c *daemonClient) postJSON(ctx context.Context, path string, reqBody any, respBody any) error { - var body io.Reader - if reqBody != nil { - data, err := json.Marshal(reqBody) - if err != nil { - return err - } - body = bytes.NewReader(data) - } - - req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+path, body) - if err != nil { - return err - } - req.Header.Set("Content-Type", "application/json") - - resp, err := c.client.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - - if resp.StatusCode >= 400 { - data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) - return fmt.Errorf("%s %s returned %d: %s", http.MethodPost, path, resp.StatusCode, strings.TrimSpace(string(data))) - } - if respBody == nil { - io.Copy(io.Discard, resp.Body) - return nil - } - return json.NewDecoder(resp.Body).Decode(respBody) -} - -func (c *daemonClient) getJSON(ctx context.Context, path string, respBody any) error { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+path, nil) - if err != nil { - return err - } - - resp, err := c.client.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - - if resp.StatusCode >= 400 { - data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) - return fmt.Errorf("%s %s returned %d: %s", http.MethodGet, path, resp.StatusCode, strings.TrimSpace(string(data))) - } - if respBody == nil { - io.Copy(io.Discard, resp.Body) - return nil - } - return json.NewDecoder(resp.Body).Decode(respBody) -} - diff --git a/server/cmd/daemon/main.go b/server/cmd/daemon/main.go deleted file mode 100644 index 11d9fd93..00000000 --- a/server/cmd/daemon/main.go +++ /dev/null @@ -1,27 +0,0 @@ -package main - -import ( - "context" - "errors" - "log" - "os" - "os/signal" - "syscall" -) - -func main() { - cfg, err := loadConfig() - if err != nil { - log.Fatal(err) - } - - ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) - defer stop() - - logger := log.New(os.Stdout, "multica-daemon: ", log.LstdFlags) - d := newDaemon(cfg, logger) - - if err := d.run(ctx); err != nil && !errors.Is(err, context.Canceled) { - logger.Fatal(err) - } -} diff --git a/server/cmd/multica/cmd_agent.go b/server/cmd/multica/cmd_agent.go new file mode 100644 index 00000000..69fcaf60 --- /dev/null +++ b/server/cmd/multica/cmd_agent.go @@ -0,0 +1,172 @@ +package main + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/spf13/cobra" + + "github.com/multica-ai/multica/server/internal/cli" +) + +var agentCmd = &cobra.Command{ + Use: "agent", + Short: "Manage agents", +} + +var agentListCmd = &cobra.Command{ + Use: "list", + Short: "List agents in the workspace", + RunE: runAgentList, +} + +var agentGetCmd = &cobra.Command{ + Use: "get ", + Short: "Get agent details", + Args: cobra.ExactArgs(1), + RunE: runAgentGet, +} + +var agentDeleteCmd = &cobra.Command{ + Use: "delete ", + Short: "Delete an agent", + Args: cobra.ExactArgs(1), + RunE: runAgentDelete, +} + +var agentStopCmd = &cobra.Command{ + Use: "stop ", + Short: "Stop an agent (set status to offline)", + Args: cobra.ExactArgs(1), + RunE: runAgentStop, +} + +func init() { + agentCmd.AddCommand(agentListCmd) + agentCmd.AddCommand(agentGetCmd) + agentCmd.AddCommand(agentDeleteCmd) + agentCmd.AddCommand(agentStopCmd) + + agentListCmd.Flags().String("output", "table", "Output format: table or json") +} + +func newAPIClient(cmd *cobra.Command) (*cli.APIClient, error) { + serverURL := resolveServerURL(cmd) + workspaceID := resolveWorkspaceID(cmd) + + if serverURL == "" { + return nil, fmt.Errorf("server URL not set: use --server-url flag, MULTICA_SERVER_URL env, or 'multica-cli config set server_url '") + } + + return cli.NewAPIClient(serverURL, workspaceID), nil +} + +func resolveServerURL(cmd *cobra.Command) string { + val := cli.FlagOrEnv(cmd, "server-url", "MULTICA_SERVER_URL", "") + if val != "" { + return val + } + cfg, err := cli.LoadCLIConfig() + if err != nil { + return "http://localhost:8080" + } + if cfg.ServerURL != "" { + return cfg.ServerURL + } + return "http://localhost:8080" +} + +func resolveWorkspaceID(cmd *cobra.Command) string { + val := cli.FlagOrEnv(cmd, "workspace-id", "MULTICA_WORKSPACE_ID", "") + if val != "" { + return val + } + cfg, _ := cli.LoadCLIConfig() + return cfg.WorkspaceID +} + +func runAgentList(cmd *cobra.Command, _ []string) error { + client, err := newAPIClient(cmd) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + var agents []map[string]any + path := "/api/agents" + if client.WorkspaceID != "" { + path += "?workspace_id=" + client.WorkspaceID + } + if err := client.GetJSON(ctx, path, &agents); err != nil { + return fmt.Errorf("list agents: %w", err) + } + + output, _ := cmd.Flags().GetString("output") + if output == "json" { + return cli.PrintJSON(os.Stdout, agents) + } + + headers := []string{"ID", "NAME", "STATUS", "RUNTIME"} + rows := make([][]string, 0, len(agents)) + for _, a := range agents { + rows = append(rows, []string{ + strVal(a, "id"), + strVal(a, "name"), + strVal(a, "status"), + strVal(a, "runtime_mode"), + }) + } + cli.PrintTable(os.Stdout, headers, rows) + return nil +} + +func runAgentGet(cmd *cobra.Command, args []string) error { + client, err := newAPIClient(cmd) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + var agent map[string]any + if err := client.GetJSON(ctx, "/api/agents/"+args[0], &agent); err != nil { + return fmt.Errorf("get agent: %w", err) + } + + return cli.PrintJSON(os.Stdout, agent) +} + +func runAgentDelete(cmd *cobra.Command, args []string) error { + client, err := newAPIClient(cmd) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + if err := client.DeleteJSON(ctx, "/api/agents/"+args[0]); err != nil { + return fmt.Errorf("delete agent: %w", err) + } + + fmt.Fprintf(os.Stderr, "Agent %s deleted.\n", args[0]) + return nil +} + +func runAgentStop(cmd *cobra.Command, args []string) error { + // TODO: implement agent stop (PUT /api/agents/{id} with status=offline) + return fmt.Errorf("agent stop is not yet implemented") +} + +func strVal(m map[string]any, key string) string { + v, ok := m[key] + if !ok || v == nil { + return "" + } + return fmt.Sprintf("%v", v) +} diff --git a/server/cmd/multica/cmd_config.go b/server/cmd/multica/cmd_config.go new file mode 100644 index 00000000..a9b5d1f5 --- /dev/null +++ b/server/cmd/multica/cmd_config.go @@ -0,0 +1,79 @@ +package main + +import ( + "fmt" + "os" + + "github.com/spf13/cobra" + + "github.com/multica-ai/multica/server/internal/cli" +) + +var configCmd = &cobra.Command{ + Use: "config", + Short: "Manage CLI configuration", +} + +var configShowCmd = &cobra.Command{ + Use: "show", + Short: "Show current CLI configuration", + RunE: runConfigShow, +} + +var configSetCmd = &cobra.Command{ + Use: "set ", + Short: "Set a CLI configuration value", + Long: "Supported keys: server_url, workspace_id", + Args: cobra.ExactArgs(2), + RunE: runConfigSet, +} + +func init() { + configCmd.AddCommand(configShowCmd) + configCmd.AddCommand(configSetCmd) +} + +func runConfigShow(_ *cobra.Command, _ []string) error { + cfg, err := cli.LoadCLIConfig() + if err != nil { + return err + } + + path, _ := cli.CLIConfigPath() + fmt.Fprintf(os.Stdout, "Config file: %s\n", path) + fmt.Fprintf(os.Stdout, "server_url: %s\n", valueOrDefault(cfg.ServerURL, "(not set)")) + fmt.Fprintf(os.Stdout, "workspace_id: %s\n", valueOrDefault(cfg.WorkspaceID, "(not set)")) + return nil +} + +func runConfigSet(_ *cobra.Command, args []string) error { + key, value := args[0], args[1] + + cfg, err := cli.LoadCLIConfig() + if err != nil { + return err + } + + switch key { + case "server_url": + cfg.ServerURL = value + case "workspace_id": + cfg.WorkspaceID = value + default: + return fmt.Errorf("unknown config key %q (supported: server_url, workspace_id)", key) + } + + if err := cli.SaveCLIConfig(cfg); err != nil { + return err + } + + fmt.Fprintf(os.Stderr, "Set %s = %s\n", key, value) + return nil +} + +func valueOrDefault(v, fallback string) string { + if v == "" { + return fallback + } + return v +} diff --git a/server/cmd/multica/cmd_daemon.go b/server/cmd/multica/cmd_daemon.go new file mode 100644 index 00000000..e70f2a5f --- /dev/null +++ b/server/cmd/multica/cmd_daemon.go @@ -0,0 +1,77 @@ +package main + +import ( + "context" + "errors" + "log" + "os" + "os/signal" + "syscall" + + "github.com/spf13/cobra" + + "github.com/multica-ai/multica/server/internal/cli" + "github.com/multica-ai/multica/server/internal/daemon" +) + +var daemonCmd = &cobra.Command{ + Use: "daemon", + Short: "Run the local agent runtime daemon", + Long: "Start the daemon process that polls for tasks and executes them using local agent CLIs (Claude, Codex).", + RunE: runDaemon, +} + +func init() { + f := daemonCmd.Flags() + f.String("repos-root", "", "Base directory for task repositories (env: MULTICA_REPOS_ROOT)") + f.String("config-path", "", "Path to daemon config file (env: MULTICA_DAEMON_CONFIG)") + f.String("daemon-id", "", "Unique daemon identifier (env: MULTICA_DAEMON_ID)") + f.String("device-name", "", "Human-readable device name (env: MULTICA_DAEMON_DEVICE_NAME)") + f.String("runtime-name", "", "Runtime display name (env: MULTICA_AGENT_RUNTIME_NAME)") + f.Duration("poll-interval", 0, "Task poll interval (env: MULTICA_DAEMON_POLL_INTERVAL)") + f.Duration("heartbeat-interval", 0, "Heartbeat interval (env: MULTICA_DAEMON_HEARTBEAT_INTERVAL)") + f.Duration("agent-timeout", 0, "Per-task timeout (env: MULTICA_AGENT_TIMEOUT)") +} + +func runDaemon(cmd *cobra.Command, _ []string) error { + overrides := daemon.Overrides{ + ServerURL: cli.FlagOrEnv(cmd, "server-url", "MULTICA_SERVER_URL", ""), + WorkspaceID: cli.FlagOrEnv(cmd, "workspace-id", "MULTICA_WORKSPACE_ID", ""), + ReposRoot: flagString(cmd, "repos-root"), + ConfigPath: flagString(cmd, "config-path"), + DaemonID: flagString(cmd, "daemon-id"), + DeviceName: flagString(cmd, "device-name"), + RuntimeName: flagString(cmd, "runtime-name"), + } + if d, _ := cmd.Flags().GetDuration("poll-interval"); d > 0 { + overrides.PollInterval = d + } + if d, _ := cmd.Flags().GetDuration("heartbeat-interval"); d > 0 { + overrides.HeartbeatInterval = d + } + if d, _ := cmd.Flags().GetDuration("agent-timeout"); d > 0 { + overrides.AgentTimeout = d + } + + cfg, err := daemon.LoadConfig(overrides) + if err != nil { + return err + } + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + logger := log.New(os.Stdout, "multica-daemon: ", log.LstdFlags) + d := daemon.New(cfg, logger) + + if err := d.Run(ctx); err != nil && !errors.Is(err, context.Canceled) { + return err + } + return nil +} + +func flagString(cmd *cobra.Command, name string) string { + val, _ := cmd.Flags().GetString(name) + return val +} + diff --git a/server/cmd/multica/cmd_runtime.go b/server/cmd/multica/cmd_runtime.go new file mode 100644 index 00000000..a507579f --- /dev/null +++ b/server/cmd/multica/cmd_runtime.go @@ -0,0 +1,63 @@ +package main + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/spf13/cobra" + + "github.com/multica-ai/multica/server/internal/cli" +) + +var runtimeCmd = &cobra.Command{ + Use: "runtime", + Short: "Manage agent runtimes", +} + +var runtimeListCmd = &cobra.Command{ + Use: "list", + Short: "List agent runtimes", + RunE: runRuntimeList, +} + +func init() { + runtimeCmd.AddCommand(runtimeListCmd) + + runtimeListCmd.Flags().String("output", "table", "Output format: table or json") +} + +func runRuntimeList(cmd *cobra.Command, _ []string) error { + client, err := newAPIClient(cmd) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + var runtimes []map[string]any + if err := client.GetJSON(ctx, "/api/runtimes", &runtimes); err != nil { + return fmt.Errorf("list runtimes: %w", err) + } + + output, _ := cmd.Flags().GetString("output") + if output == "json" { + return cli.PrintJSON(os.Stdout, runtimes) + } + + headers := []string{"ID", "NAME", "PROVIDER", "STATUS", "DEVICE"} + rows := make([][]string, 0, len(runtimes)) + for _, r := range runtimes { + rows = append(rows, []string{ + strVal(r, "id"), + strVal(r, "name"), + strVal(r, "provider"), + strVal(r, "status"), + strVal(r, "device_info"), + }) + } + cli.PrintTable(os.Stdout, headers, rows) + return nil +} diff --git a/server/cmd/multica/cmd_status.go b/server/cmd/multica/cmd_status.go new file mode 100644 index 00000000..5f9bb8ab --- /dev/null +++ b/server/cmd/multica/cmd_status.go @@ -0,0 +1,36 @@ +package main + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/spf13/cobra" +) + +var statusCmd = &cobra.Command{ + Use: "status", + Short: "Check server health", + RunE: runStatus, +} + +func runStatus(cmd *cobra.Command, _ []string) error { + client, err := newAPIClient(cmd) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + body, err := client.HealthCheck(ctx) + if err != nil { + fmt.Fprintf(os.Stderr, "Server unreachable: %v\n", err) + return err + } + + fmt.Fprintf(os.Stdout, "Server: %s\n", client.BaseURL) + fmt.Fprintf(os.Stdout, "Status: %s\n", body) + return nil +} diff --git a/server/cmd/multica/cmd_version.go b/server/cmd/multica/cmd_version.go new file mode 100644 index 00000000..99c7fb7a --- /dev/null +++ b/server/cmd/multica/cmd_version.go @@ -0,0 +1,15 @@ +package main + +import ( + "fmt" + + "github.com/spf13/cobra" +) + +var versionCmd = &cobra.Command{ + Use: "version", + Short: "Print version information", + Run: func(_ *cobra.Command, _ []string) { + fmt.Printf("multica-cli %s (commit: %s)\n", version, commit) + }, +} diff --git a/server/cmd/multica/main.go b/server/cmd/multica/main.go new file mode 100644 index 00000000..1b06c9e8 --- /dev/null +++ b/server/cmd/multica/main.go @@ -0,0 +1,38 @@ +package main + +import ( + "os" + + "github.com/spf13/cobra" +) + +var ( + version = "dev" + commit = "unknown" +) + +var rootCmd = &cobra.Command{ + Use: "multica-cli", + Short: "Multica CLI — local agent runtime and management tool", + Long: "multica-cli manages local agent runtimes and provides control commands for the Multica platform.", + SilenceUsage: true, + SilenceErrors: true, +} + +func init() { + rootCmd.PersistentFlags().String("server-url", "", "Multica server URL (env: MULTICA_SERVER_URL)") + rootCmd.PersistentFlags().String("workspace-id", "", "Workspace ID (env: MULTICA_WORKSPACE_ID)") + + rootCmd.AddCommand(daemonCmd) + rootCmd.AddCommand(agentCmd) + rootCmd.AddCommand(runtimeCmd) + rootCmd.AddCommand(configCmd) + rootCmd.AddCommand(statusCmd) + rootCmd.AddCommand(versionCmd) +} + +func main() { + if err := rootCmd.Execute(); err != nil { + os.Exit(1) + } +} diff --git a/server/go.mod b/server/go.mod index f1f1f05f..2e2aaa76 100644 --- a/server/go.mod +++ b/server/go.mod @@ -5,16 +5,18 @@ go 1.26.1 require ( github.com/go-chi/chi/v5 v5.2.5 github.com/go-chi/cors v1.2.2 + github.com/golang-jwt/jwt/v5 v5.3.1 github.com/gorilla/websocket v1.5.3 + github.com/jackc/pgx/v5 v5.8.0 + github.com/spf13/cobra v1.10.2 ) require ( - github.com/golang-jwt/jwt/v5 v5.3.1 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect - github.com/jackc/pgx/v5 v5.8.0 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect - golang.org/x/crypto v0.49.0 // indirect + github.com/spf13/pflag v1.0.9 // indirect golang.org/x/sync v0.20.0 // indirect golang.org/x/text v0.35.0 // indirect ) diff --git a/server/go.sum b/server/go.sum index a1fa3d9a..40d399bf 100644 --- a/server/go.sum +++ b/server/go.sum @@ -1,4 +1,7 @@ +github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug= github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0= github.com/go-chi/cors v1.2.2 h1:Jmey33TE+b+rB7fT8MUy1u0I4L+NARQlK6LhzKPSyQE= @@ -7,6 +10,8 @@ github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63Y github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= @@ -15,15 +20,24 @@ github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo= github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU= +github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4= +github.com/spf13/pflag v1.0.9 h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY= +github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4= -golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8= golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/server/internal/cli/client.go b/server/internal/cli/client.go new file mode 100644 index 00000000..90badbb6 --- /dev/null +++ b/server/internal/cli/client.go @@ -0,0 +1,96 @@ +package cli + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "time" +) + +// APIClient is a REST client for the Multica server API. +// Used by ctrl subcommands (agent, runtime, status, etc.). +type APIClient struct { + BaseURL string + WorkspaceID string + HTTPClient *http.Client +} + +// NewAPIClient creates a new API client for ctrl commands. +func NewAPIClient(baseURL, workspaceID string) *APIClient { + return &APIClient{ + BaseURL: strings.TrimRight(baseURL, "/"), + WorkspaceID: workspaceID, + HTTPClient: &http.Client{Timeout: 15 * time.Second}, + } +} + +// GetJSON performs a GET request and decodes the JSON response. +func (c *APIClient) GetJSON(ctx context.Context, path string, out any) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.BaseURL+path, nil) + if err != nil { + return err + } + if c.WorkspaceID != "" { + req.Header.Set("X-Workspace-ID", c.WorkspaceID) + } + + resp, err := c.HTTPClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + return fmt.Errorf("GET %s returned %d: %s", path, resp.StatusCode, strings.TrimSpace(string(data))) + } + if out == nil { + return nil + } + return json.NewDecoder(resp.Body).Decode(out) +} + +// DeleteJSON performs a DELETE request. +func (c *APIClient) DeleteJSON(ctx context.Context, path string) error { + req, err := http.NewRequestWithContext(ctx, http.MethodDelete, c.BaseURL+path, nil) + if err != nil { + return err + } + if c.WorkspaceID != "" { + req.Header.Set("X-Workspace-ID", c.WorkspaceID) + } + + resp, err := c.HTTPClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + return fmt.Errorf("DELETE %s returned %d: %s", path, resp.StatusCode, strings.TrimSpace(string(data))) + } + return nil +} + +// HealthCheck hits the /health endpoint and returns the response body. +func (c *APIClient) HealthCheck(ctx context.Context) (string, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.BaseURL+"/health", nil) + if err != nil { + return "", err + } + resp, err := c.HTTPClient.Do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() + + data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + if resp.StatusCode >= 400 { + return "", fmt.Errorf("health check returned %d: %s", resp.StatusCode, strings.TrimSpace(string(data))) + } + return strings.TrimSpace(string(data)), nil +} diff --git a/server/internal/cli/config.go b/server/internal/cli/config.go new file mode 100644 index 00000000..a2aa1bb8 --- /dev/null +++ b/server/internal/cli/config.go @@ -0,0 +1,65 @@ +package cli + +import ( + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" +) + +const defaultCLIConfigPath = ".multica/config.json" + +// CLIConfig holds persistent CLI settings. +type CLIConfig struct { + ServerURL string `json:"server_url,omitempty"` + WorkspaceID string `json:"workspace_id,omitempty"` +} + +// CLIConfigPath returns the default path for the CLI config file. +func CLIConfigPath() (string, error) { + home, err := os.UserHomeDir() + if err != nil { + return "", fmt.Errorf("resolve CLI config path: %w", err) + } + return filepath.Join(home, defaultCLIConfigPath), nil +} + +// LoadCLIConfig reads the CLI config from disk. +func LoadCLIConfig() (CLIConfig, error) { + path, err := CLIConfigPath() + if err != nil { + return CLIConfig{}, err + } + data, err := os.ReadFile(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return CLIConfig{}, nil + } + return CLIConfig{}, fmt.Errorf("read CLI config: %w", err) + } + var cfg CLIConfig + if err := json.Unmarshal(data, &cfg); err != nil { + return CLIConfig{}, fmt.Errorf("parse CLI config: %w", err) + } + return cfg, nil +} + +// SaveCLIConfig writes the CLI config to disk. +func SaveCLIConfig(cfg CLIConfig) error { + path, err := CLIConfigPath() + if err != nil { + return err + } + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + return fmt.Errorf("create CLI config directory: %w", err) + } + data, err := json.MarshalIndent(cfg, "", " ") + if err != nil { + return fmt.Errorf("encode CLI config: %w", err) + } + if err := os.WriteFile(path, append(data, '\n'), 0o600); err != nil { + return fmt.Errorf("write CLI config: %w", err) + } + return nil +} diff --git a/server/internal/cli/flags.go b/server/internal/cli/flags.go new file mode 100644 index 00000000..c67d92ae --- /dev/null +++ b/server/internal/cli/flags.go @@ -0,0 +1,21 @@ +package cli + +import ( + "os" + "strings" + + "github.com/spf13/cobra" +) + +// FlagOrEnv returns the flag value if set, otherwise the environment variable value, +// otherwise the fallback. +func FlagOrEnv(cmd *cobra.Command, flagName, envKey, fallback string) string { + if cmd.Flags().Changed(flagName) { + val, _ := cmd.Flags().GetString(flagName) + return val + } + if v := strings.TrimSpace(os.Getenv(envKey)); v != "" { + return v + } + return fallback +} diff --git a/server/internal/cli/output.go b/server/internal/cli/output.go new file mode 100644 index 00000000..e403038c --- /dev/null +++ b/server/internal/cli/output.go @@ -0,0 +1,26 @@ +package cli + +import ( + "encoding/json" + "fmt" + "io" + "strings" + "text/tabwriter" +) + +// PrintTable writes a simple table with headers and rows to w. +func PrintTable(w io.Writer, headers []string, rows [][]string) { + tw := tabwriter.NewWriter(w, 0, 4, 2, ' ', 0) + fmt.Fprintln(tw, strings.Join(headers, "\t")) + for _, row := range rows { + fmt.Fprintln(tw, strings.Join(row, "\t")) + } + tw.Flush() +} + +// PrintJSON writes v as indented JSON to w. +func PrintJSON(w io.Writer, v any) error { + enc := json.NewEncoder(w) + enc.SetIndent("", " ") + return enc.Encode(v) +} diff --git a/server/internal/daemon/client.go b/server/internal/daemon/client.go new file mode 100644 index 00000000..8f2c58d9 --- /dev/null +++ b/server/internal/daemon/client.go @@ -0,0 +1,141 @@ +package daemon + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "time" +) + +// Client handles HTTP communication with the Multica server daemon API. +type Client struct { + baseURL string + client *http.Client +} + +// NewClient creates a new daemon API client. +func NewClient(baseURL string) *Client { + return &Client{ + baseURL: baseURL, + client: &http.Client{Timeout: 30 * time.Second}, + } +} + +func (c *Client) ClaimTask(ctx context.Context, runtimeID string) (*Task, error) { + var resp struct { + Task *Task `json:"task"` + } + if err := c.postJSON(ctx, fmt.Sprintf("/api/daemon/runtimes/%s/tasks/claim", runtimeID), map[string]any{}, &resp); err != nil { + return nil, err + } + return resp.Task, nil +} + +func (c *Client) CreatePairingSession(ctx context.Context, req map[string]string) (PairingSession, error) { + var resp PairingSession + if err := c.postJSON(ctx, "/api/daemon/pairing-sessions", req, &resp); err != nil { + return PairingSession{}, err + } + return resp, nil +} + +func (c *Client) GetPairingSession(ctx context.Context, token string) (PairingSession, error) { + var resp PairingSession + if err := c.getJSON(ctx, fmt.Sprintf("/api/daemon/pairing-sessions/%s", url.PathEscape(token)), &resp); err != nil { + return PairingSession{}, err + } + return resp, nil +} + +func (c *Client) ClaimPairingSession(ctx context.Context, token string) (PairingSession, error) { + var resp PairingSession + if err := c.postJSON(ctx, fmt.Sprintf("/api/daemon/pairing-sessions/%s/claim", url.PathEscape(token)), map[string]any{}, &resp); err != nil { + return PairingSession{}, err + } + return resp, nil +} + +func (c *Client) StartTask(ctx context.Context, taskID string) error { + return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/start", taskID), map[string]any{}, nil) +} + +func (c *Client) ReportProgress(ctx context.Context, taskID, summary string, step, total int) error { + return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/progress", taskID), map[string]any{ + "summary": summary, + "step": step, + "total": total, + }, nil) +} + +func (c *Client) CompleteTask(ctx context.Context, taskID, output string) error { + return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/complete", taskID), map[string]any{ + "output": output, + }, nil) +} + +func (c *Client) FailTask(ctx context.Context, taskID, errMsg string) error { + return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/fail", taskID), map[string]any{ + "error": errMsg, + }, nil) +} + +func (c *Client) postJSON(ctx context.Context, path string, reqBody any, respBody any) error { + var body io.Reader + if reqBody != nil { + data, err := json.Marshal(reqBody) + if err != nil { + return err + } + body = bytes.NewReader(data) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+path, body) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + resp, err := c.client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + return fmt.Errorf("%s %s returned %d: %s", http.MethodPost, path, resp.StatusCode, strings.TrimSpace(string(data))) + } + if respBody == nil { + io.Copy(io.Discard, resp.Body) + return nil + } + return json.NewDecoder(resp.Body).Decode(respBody) +} + +func (c *Client) getJSON(ctx context.Context, path string, respBody any) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+path, nil) + if err != nil { + return err + } + + resp, err := c.client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + return fmt.Errorf("%s %s returned %d: %s", http.MethodGet, path, resp.StatusCode, strings.TrimSpace(string(data))) + } + if respBody == nil { + io.Copy(io.Discard, resp.Body) + return nil + } + return json.NewDecoder(resp.Body).Decode(respBody) +} diff --git a/server/internal/daemon/config.go b/server/internal/daemon/config.go new file mode 100644 index 00000000..6c474771 --- /dev/null +++ b/server/internal/daemon/config.go @@ -0,0 +1,254 @@ +package daemon + +import ( + "encoding/json" + "errors" + "fmt" + "net/url" + "os" + "os/exec" + "path/filepath" + "strings" + "time" +) + +const ( + DefaultServerURL = "ws://localhost:8080/ws" + DefaultDaemonConfigPath = ".multica/daemon.json" + DefaultPollInterval = 3 * time.Second + DefaultHeartbeatInterval = 15 * time.Second + DefaultAgentTimeout = 20 * time.Minute + DefaultRuntimeName = "Local Agent" +) + +// Config holds all daemon configuration. +type Config struct { + ServerBaseURL string + ConfigPath string + WorkspaceID string + DaemonID string + DeviceName string + RuntimeName string + Agents map[string]AgentEntry // "claude" -> entry, "codex" -> entry + ReposRoot string // parent directory containing all repos + PollInterval time.Duration + HeartbeatInterval time.Duration + AgentTimeout time.Duration +} + +// Overrides allows CLI flags to override environment variables and defaults. +// Zero values are ignored and the env/default value is used instead. +type Overrides struct { + ServerURL string + WorkspaceID string + ReposRoot string + ConfigPath string + PollInterval time.Duration + HeartbeatInterval time.Duration + AgentTimeout time.Duration + DaemonID string + DeviceName string + RuntimeName string +} + +// LoadConfig builds the daemon configuration from environment variables, +// persisted config, and optional CLI flag overrides. +func LoadConfig(overrides Overrides) (Config, error) { + // Server URL: override > env > default + rawServerURL := EnvOrDefault("MULTICA_SERVER_URL", DefaultServerURL) + if overrides.ServerURL != "" { + rawServerURL = overrides.ServerURL + } + serverBaseURL, err := NormalizeServerBaseURL(rawServerURL) + if err != nil { + return Config{}, err + } + + // Config path + rawConfigPath := strings.TrimSpace(os.Getenv("MULTICA_DAEMON_CONFIG")) + if overrides.ConfigPath != "" { + rawConfigPath = overrides.ConfigPath + } + configPath, err := resolveDaemonConfigPath(rawConfigPath) + if err != nil { + return Config{}, err + } + + // Load persisted config + persisted, err := LoadPersistedConfig(configPath) + if err != nil { + return Config{}, err + } + + // Workspace ID: override > env > persisted + workspaceID := strings.TrimSpace(os.Getenv("MULTICA_WORKSPACE_ID")) + if workspaceID == "" { + workspaceID = persisted.WorkspaceID + } + if overrides.WorkspaceID != "" { + workspaceID = overrides.WorkspaceID + } + + // Probe available agent CLIs + agents := map[string]AgentEntry{} + claudePath := EnvOrDefault("MULTICA_CLAUDE_PATH", "claude") + if _, err := exec.LookPath(claudePath); err == nil { + agents["claude"] = AgentEntry{ + Path: claudePath, + Model: strings.TrimSpace(os.Getenv("MULTICA_CLAUDE_MODEL")), + } + } + codexPath := EnvOrDefault("MULTICA_CODEX_PATH", "codex") + if _, err := exec.LookPath(codexPath); err == nil { + agents["codex"] = AgentEntry{ + Path: codexPath, + Model: strings.TrimSpace(os.Getenv("MULTICA_CODEX_MODEL")), + } + } + if len(agents) == 0 { + return Config{}, fmt.Errorf("no agent CLI found: install claude or codex and ensure it is on PATH") + } + + // Host info + host, err := os.Hostname() + if err != nil || strings.TrimSpace(host) == "" { + host = "local-machine" + } + + // Repos root: override > env > cwd + reposRoot := strings.TrimSpace(os.Getenv("MULTICA_REPOS_ROOT")) + if overrides.ReposRoot != "" { + reposRoot = overrides.ReposRoot + } + if reposRoot == "" { + reposRoot, err = os.Getwd() + if err != nil { + return Config{}, fmt.Errorf("resolve working directory: %w", err) + } + } + reposRoot, err = filepath.Abs(reposRoot) + if err != nil { + return Config{}, fmt.Errorf("resolve absolute repos root: %w", err) + } + + // Durations: override > env > default + pollInterval, err := DurationFromEnv("MULTICA_DAEMON_POLL_INTERVAL", DefaultPollInterval) + if err != nil { + return Config{}, err + } + if overrides.PollInterval > 0 { + pollInterval = overrides.PollInterval + } + + heartbeatInterval, err := DurationFromEnv("MULTICA_DAEMON_HEARTBEAT_INTERVAL", DefaultHeartbeatInterval) + if err != nil { + return Config{}, err + } + if overrides.HeartbeatInterval > 0 { + heartbeatInterval = overrides.HeartbeatInterval + } + + agentTimeout, err := DurationFromEnv("MULTICA_AGENT_TIMEOUT", DefaultAgentTimeout) + if err != nil { + return Config{}, err + } + if overrides.AgentTimeout > 0 { + agentTimeout = overrides.AgentTimeout + } + + // String overrides + daemonID := EnvOrDefault("MULTICA_DAEMON_ID", host) + if overrides.DaemonID != "" { + daemonID = overrides.DaemonID + } + + deviceName := EnvOrDefault("MULTICA_DAEMON_DEVICE_NAME", host) + if overrides.DeviceName != "" { + deviceName = overrides.DeviceName + } + + runtimeName := EnvOrDefault("MULTICA_AGENT_RUNTIME_NAME", DefaultRuntimeName) + if overrides.RuntimeName != "" { + runtimeName = overrides.RuntimeName + } + + return Config{ + ServerBaseURL: serverBaseURL, + ConfigPath: configPath, + WorkspaceID: workspaceID, + DaemonID: daemonID, + DeviceName: deviceName, + RuntimeName: runtimeName, + Agents: agents, + ReposRoot: reposRoot, + PollInterval: pollInterval, + HeartbeatInterval: heartbeatInterval, + AgentTimeout: agentTimeout, + }, nil +} + +// NormalizeServerBaseURL converts a WebSocket or HTTP URL to a base HTTP URL. +func NormalizeServerBaseURL(raw string) (string, error) { + u, err := url.Parse(strings.TrimSpace(raw)) + if err != nil { + return "", fmt.Errorf("invalid MULTICA_SERVER_URL: %w", err) + } + switch u.Scheme { + case "ws": + u.Scheme = "http" + case "wss": + u.Scheme = "https" + case "http", "https": + default: + return "", fmt.Errorf("MULTICA_SERVER_URL must use ws, wss, http, or https") + } + if u.Path == "/ws" { + u.Path = "" + } + u.RawPath = "" + u.RawQuery = "" + u.Fragment = "" + return strings.TrimRight(u.String(), "/"), nil +} + +func resolveDaemonConfigPath(raw string) (string, error) { + if raw != "" { + return filepath.Abs(raw) + } + home, err := os.UserHomeDir() + if err != nil { + return "", fmt.Errorf("resolve daemon config path: %w", err) + } + return filepath.Join(home, DefaultDaemonConfigPath), nil +} + +// LoadPersistedConfig reads the daemon config from disk. +func LoadPersistedConfig(path string) (PersistedConfig, error) { + data, err := os.ReadFile(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return PersistedConfig{}, nil + } + return PersistedConfig{}, fmt.Errorf("read daemon config: %w", err) + } + var cfg PersistedConfig + if err := json.Unmarshal(data, &cfg); err != nil { + return PersistedConfig{}, fmt.Errorf("parse daemon config: %w", err) + } + return cfg, nil +} + +// SavePersistedConfig writes the daemon config to disk. +func SavePersistedConfig(path string, cfg PersistedConfig) error { + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + return fmt.Errorf("create daemon config directory: %w", err) + } + data, err := json.MarshalIndent(cfg, "", " ") + if err != nil { + return fmt.Errorf("encode daemon config: %w", err) + } + if err := os.WriteFile(path, append(data, '\n'), 0o600); err != nil { + return fmt.Errorf("write daemon config: %w", err) + } + return nil +} diff --git a/server/internal/daemon/daemon.go b/server/internal/daemon/daemon.go new file mode 100644 index 00000000..401241fc --- /dev/null +++ b/server/internal/daemon/daemon.go @@ -0,0 +1,325 @@ +package daemon + +import ( + "context" + "fmt" + "log" + "strings" + "time" + + "github.com/multica-ai/multica/server/pkg/agent" +) + +// Daemon is the local agent runtime that polls for and executes tasks. +type Daemon struct { + cfg Config + client *Client + logger *log.Logger +} + +// New creates a new Daemon instance. +func New(cfg Config, logger *log.Logger) *Daemon { + return &Daemon{ + cfg: cfg, + client: NewClient(cfg.ServerBaseURL), + logger: logger, + } +} + +// Run starts the daemon: pairs if needed, registers runtimes, then polls for tasks. +func (d *Daemon) Run(ctx context.Context) error { + agentNames := make([]string, 0, len(d.cfg.Agents)) + for name := range d.cfg.Agents { + agentNames = append(agentNames, name) + } + d.logger.Printf("starting daemon agents=%v workspace=%s server=%s repos_root=%s", + agentNames, d.cfg.WorkspaceID, d.cfg.ServerBaseURL, d.cfg.ReposRoot) + + if strings.TrimSpace(d.cfg.WorkspaceID) == "" { + workspaceID, err := d.ensurePaired(ctx) + if err != nil { + return err + } + d.cfg.WorkspaceID = workspaceID + d.logger.Printf("pairing completed for workspace=%s", workspaceID) + } + + runtimes, err := d.registerRuntimes(ctx) + if err != nil { + return err + } + runtimeIDs := make([]string, 0, len(runtimes)) + for _, rt := range runtimes { + d.logger.Printf("registered runtime id=%s provider=%s status=%s", rt.ID, rt.Provider, rt.Status) + runtimeIDs = append(runtimeIDs, rt.ID) + } + + go d.heartbeatLoop(ctx, runtimeIDs) + return d.pollLoop(ctx, runtimeIDs) +} + +func (d *Daemon) registerRuntimes(ctx context.Context) ([]Runtime, error) { + var runtimes []map[string]string + for name, entry := range d.cfg.Agents { + version, err := agent.DetectVersion(ctx, entry.Path) + if err != nil { + d.logger.Printf("skip registering %s: %v", name, err) + continue + } + runtimes = append(runtimes, map[string]string{ + "name": fmt.Sprintf("Local %s", strings.ToUpper(name[:1])+name[1:]), + "type": name, + "version": version, + "status": "online", + }) + } + if len(runtimes) == 0 { + return nil, fmt.Errorf("no agent runtimes could be registered") + } + + req := map[string]any{ + "workspace_id": d.cfg.WorkspaceID, + "daemon_id": d.cfg.DaemonID, + "device_name": d.cfg.DeviceName, + "runtimes": runtimes, + } + + var resp struct { + Runtimes []Runtime `json:"runtimes"` + } + if err := d.client.postJSON(ctx, "/api/daemon/register", req, &resp); err != nil { + return nil, fmt.Errorf("register runtimes: %w", err) + } + if len(resp.Runtimes) == 0 { + return nil, fmt.Errorf("register runtimes: empty response") + } + return resp.Runtimes, nil +} + +func (d *Daemon) ensurePaired(ctx context.Context) (string, error) { + // Use a deterministic agent for the pairing session metadata (prefer codex for backward compat). + var firstName string + var firstEntry AgentEntry + for _, preferred := range []string{"codex", "claude"} { + if entry, ok := d.cfg.Agents[preferred]; ok { + firstName = preferred + firstEntry = entry + break + } + } + version, err := agent.DetectVersion(ctx, firstEntry.Path) + if err != nil { + return "", err + } + + session, err := d.client.CreatePairingSession(ctx, map[string]string{ + "daemon_id": d.cfg.DaemonID, + "device_name": d.cfg.DeviceName, + "runtime_name": d.cfg.RuntimeName, + "runtime_type": firstName, + "runtime_version": version, + }) + if err != nil { + return "", fmt.Errorf("create pairing session: %w", err) + } + if session.LinkURL != nil { + d.logger.Printf("open this link to pair the daemon: %s", *session.LinkURL) + } else { + d.logger.Printf("pairing session created: %s", session.Token) + } + + for { + select { + case <-ctx.Done(): + return "", ctx.Err() + default: + } + + current, err := d.client.GetPairingSession(ctx, session.Token) + if err != nil { + return "", fmt.Errorf("poll pairing session: %w", err) + } + + switch current.Status { + case "approved", "claimed": + if current.WorkspaceID == nil || strings.TrimSpace(*current.WorkspaceID) == "" { + return "", fmt.Errorf("pairing session approved without workspace") + } + if err := SavePersistedConfig(d.cfg.ConfigPath, PersistedConfig{ + WorkspaceID: strings.TrimSpace(*current.WorkspaceID), + }); err != nil { + return "", err + } + if current.Status != "claimed" { + if _, err := d.client.ClaimPairingSession(ctx, current.Token); err != nil { + return "", fmt.Errorf("claim pairing session: %w", err) + } + } + return strings.TrimSpace(*current.WorkspaceID), nil + case "expired": + return "", fmt.Errorf("pairing session expired before approval") + } + + if err := SleepWithContext(ctx, d.cfg.PollInterval); err != nil { + return "", err + } + } +} + +func (d *Daemon) heartbeatLoop(ctx context.Context, runtimeIDs []string) { + ticker := time.NewTicker(d.cfg.HeartbeatInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + for _, rid := range runtimeIDs { + err := d.client.postJSON(ctx, "/api/daemon/heartbeat", map[string]string{ + "runtime_id": rid, + }, nil) + if err != nil { + d.logger.Printf("heartbeat failed for runtime %s: %v", rid, err) + } + } + } + } +} + +func (d *Daemon) pollLoop(ctx context.Context, runtimeIDs []string) error { + pollOffset := 0 + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + claimed := false + n := len(runtimeIDs) + for i := 0; i < n; i++ { + rid := runtimeIDs[(pollOffset+i)%n] + task, err := d.client.ClaimTask(ctx, rid) + if err != nil { + d.logger.Printf("claim task failed for runtime %s: %v", rid, err) + continue + } + if task != nil { + d.logger.Printf("poll: got task=%s issue=%s title=%q", task.ID, task.IssueID, task.Context.Issue.Title) + d.handleTask(ctx, *task) + claimed = true + pollOffset = (pollOffset + i + 1) % n + break + } + } + + if !claimed { + pollOffset = (pollOffset + 1) % n + if err := SleepWithContext(ctx, d.cfg.PollInterval); err != nil { + return err + } + } + } +} + +func (d *Daemon) handleTask(ctx context.Context, task Task) { + provider := task.Context.Runtime.Provider + d.logger.Printf("picked task=%s issue=%s provider=%s title=%q", task.ID, task.IssueID, provider, task.Context.Issue.Title) + + if err := d.client.StartTask(ctx, task.ID); err != nil { + d.logger.Printf("start task %s failed: %v", task.ID, err) + return + } + + _ = d.client.ReportProgress(ctx, task.ID, fmt.Sprintf("Launching %s", provider), 1, 2) + + result, err := d.runTask(ctx, task) + if err != nil { + d.logger.Printf("task %s failed: %v", task.ID, err) + if failErr := d.client.FailTask(ctx, task.ID, err.Error()); failErr != nil { + d.logger.Printf("fail task %s callback failed: %v", task.ID, failErr) + } + return + } + + _ = d.client.ReportProgress(ctx, task.ID, "Finishing task", 2, 2) + + switch result.Status { + case "blocked": + if err := d.client.FailTask(ctx, task.ID, result.Comment); err != nil { + d.logger.Printf("report blocked task %s failed: %v", task.ID, err) + } + default: + if err := d.client.CompleteTask(ctx, task.ID, result.Comment); err != nil { + d.logger.Printf("complete task %s failed: %v", task.ID, err) + } + } +} + +func (d *Daemon) runTask(ctx context.Context, task Task) (TaskResult, error) { + provider := task.Context.Runtime.Provider + entry, ok := d.cfg.Agents[provider] + if !ok { + return TaskResult{}, fmt.Errorf("no agent configured for provider %q", provider) + } + + workdir, err := ResolveTaskWorkdir(d.cfg.ReposRoot, task.Context.Issue.Repository) + if err != nil { + return TaskResult{}, err + } + + prompt := BuildPrompt(task, workdir) + + backend, err := agent.New(provider, agent.Config{ + ExecutablePath: entry.Path, + Logger: d.logger, + }) + if err != nil { + return TaskResult{}, fmt.Errorf("create agent backend: %w", err) + } + + d.logger.Printf( + "starting %s task=%s workdir=%s model=%s timeout=%s", + provider, task.ID, workdir, entry.Model, d.cfg.AgentTimeout, + ) + + session, err := backend.Execute(ctx, prompt, agent.ExecOptions{ + Cwd: workdir, + Model: entry.Model, + Timeout: d.cfg.AgentTimeout, + }) + if err != nil { + return TaskResult{}, err + } + + // Drain message channel (log tool uses, ignore text since Result has output) + go func() { + for msg := range session.Messages { + switch msg.Type { + case agent.MessageToolUse: + d.logger.Printf("[%s] tool-use: %s (call=%s)", provider, msg.Tool, msg.CallID) + case agent.MessageError: + d.logger.Printf("[%s] error: %s", provider, msg.Content) + } + } + }() + + result := <-session.Result + + switch result.Status { + case "completed": + if result.Output == "" { + return TaskResult{}, fmt.Errorf("%s returned empty output", provider) + } + return TaskResult{Status: "completed", Comment: result.Output}, nil + case "timeout": + return TaskResult{}, fmt.Errorf("%s timed out after %s", provider, d.cfg.AgentTimeout) + default: + errMsg := result.Error + if errMsg == "" { + errMsg = fmt.Sprintf("%s execution %s", provider, result.Status) + } + return TaskResult{Status: "blocked", Comment: errMsg}, nil + } +} diff --git a/server/cmd/daemon/daemon_test.go b/server/internal/daemon/daemon_test.go similarity index 75% rename from server/cmd/daemon/daemon_test.go rename to server/internal/daemon/daemon_test.go index 42efebc4..bca2b976 100644 --- a/server/cmd/daemon/daemon_test.go +++ b/server/internal/daemon/daemon_test.go @@ -1,4 +1,4 @@ -package main +package daemon import ( "os" @@ -10,9 +10,9 @@ import ( func TestNormalizeServerBaseURL(t *testing.T) { t.Parallel() - got, err := normalizeServerBaseURL("ws://localhost:8080/ws") + got, err := NormalizeServerBaseURL("ws://localhost:8080/ws") if err != nil { - t.Fatalf("normalizeServerBaseURL returned error: %v", err) + t.Fatalf("NormalizeServerBaseURL returned error: %v", err) } if got != "http://localhost:8080" { t.Fatalf("expected http://localhost:8080, got %s", got) @@ -28,9 +28,9 @@ func TestResolveTaskWorkdirUsesRepoPathWhenPresent(t *testing.T) { t.Fatalf("mkdir repo: %v", err) } - got, err := resolveTaskWorkdir(root, &daemonRepoRef{Path: "repo"}) + got, err := ResolveTaskWorkdir(root, &RepoRef{Path: "repo"}) if err != nil { - t.Fatalf("resolveTaskWorkdir returned error: %v", err) + t.Fatalf("ResolveTaskWorkdir returned error: %v", err) } if got != repoPath { t.Fatalf("expected %s, got %s", repoPath, got) @@ -40,15 +40,15 @@ func TestResolveTaskWorkdirUsesRepoPathWhenPresent(t *testing.T) { func TestBuildPromptIncludesIssueAndSkills(t *testing.T) { t.Parallel() - prompt := buildPrompt(daemonTask{ - Context: daemonTaskContext{ - Issue: daemonIssueContext{ + prompt := BuildPrompt(Task{ + Context: TaskContext{ + Issue: IssueContext{ Title: "Fix failing test", Description: "Investigate and fix the test failure.", AcceptanceCriteria: []string{"tests pass"}, ContextRefs: []string{"log snippet"}, }, - Agent: daemonAgentContext{ + Agent: AgentContext{ Name: "Local Codex", Skills: "Be concise.", }, diff --git a/server/internal/daemon/helpers.go b/server/internal/daemon/helpers.go new file mode 100644 index 00000000..75bb2c24 --- /dev/null +++ b/server/internal/daemon/helpers.go @@ -0,0 +1,46 @@ +package daemon + +import ( + "context" + "fmt" + "os" + "strings" + "time" +) + +// EnvOrDefault returns the trimmed value of the environment variable key, +// falling back to fallback if empty. +func EnvOrDefault(key, fallback string) string { + value := strings.TrimSpace(os.Getenv(key)) + if value == "" { + return fallback + } + return value +} + +// DurationFromEnv parses a duration from an environment variable, +// returning fallback if the variable is empty. +func DurationFromEnv(key string, fallback time.Duration) (time.Duration, error) { + value := strings.TrimSpace(os.Getenv(key)) + if value == "" { + return fallback, nil + } + d, err := time.ParseDuration(value) + if err != nil { + return 0, fmt.Errorf("%s: invalid duration %q: %w", key, value, err) + } + return d, nil +} + +// SleepWithContext blocks for the given duration or until the context is cancelled. +func SleepWithContext(ctx context.Context, d time.Duration) error { + timer := time.NewTimer(d) + defer timer.Stop() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil + } +} diff --git a/server/internal/daemon/prompt.go b/server/internal/daemon/prompt.go new file mode 100644 index 00000000..0c9ba155 --- /dev/null +++ b/server/internal/daemon/prompt.go @@ -0,0 +1,93 @@ +package daemon + +import ( + "fmt" + "os" + "path/filepath" + "strings" +) + +// BuildPrompt constructs the task prompt for an agent CLI. +func BuildPrompt(task Task, workdir string) string { + var b strings.Builder + b.WriteString("You are running as a local coding agent for a Multica workspace.\n") + b.WriteString("Complete the assigned issue using the local environment.\n") + b.WriteString("Return a concise Markdown comment suitable for posting back to the issue.\n") + b.WriteString("If you cannot complete the task because context, files, or permissions are missing, return status \"blocked\" and explain the blocker in the comment.\n\n") + + fmt.Fprintf(&b, "Working directory: %s\n", workdir) + fmt.Fprintf(&b, "Agent: %s\n", task.Context.Agent.Name) + fmt.Fprintf(&b, "Issue title: %s\n\n", task.Context.Issue.Title) + + if task.Context.Issue.Description != "" { + b.WriteString("Issue description:\n") + b.WriteString(task.Context.Issue.Description) + b.WriteString("\n\n") + } + + if len(task.Context.Issue.AcceptanceCriteria) > 0 { + b.WriteString("Acceptance criteria:\n") + for _, item := range task.Context.Issue.AcceptanceCriteria { + fmt.Fprintf(&b, "- %s\n", item) + } + b.WriteString("\n") + } + + if len(task.Context.Issue.ContextRefs) > 0 { + b.WriteString("Context refs:\n") + for _, item := range task.Context.Issue.ContextRefs { + fmt.Fprintf(&b, "- %s\n", item) + } + b.WriteString("\n") + } + + if repo := task.Context.Issue.Repository; repo != nil { + b.WriteString("Repository context:\n") + if repo.URL != "" { + fmt.Fprintf(&b, "- url: %s\n", repo.URL) + } + if repo.Branch != "" { + fmt.Fprintf(&b, "- branch: %s\n", repo.Branch) + } + if repo.Path != "" { + fmt.Fprintf(&b, "- path: %s\n", repo.Path) + } + b.WriteString("\n") + } + + if task.Context.Agent.Skills != "" { + b.WriteString("Agent skills/instructions:\n") + b.WriteString(task.Context.Agent.Skills) + b.WriteString("\n\n") + } + + b.WriteString("Comment requirements:\n") + b.WriteString("- Lead with the outcome.\n") + b.WriteString("- Mention concrete files or commands if you changed anything.\n") + b.WriteString("- Mention blockers or follow-up actions if relevant.\n") + + return b.String() +} + +// ResolveTaskWorkdir determines the working directory for a task. +func ResolveTaskWorkdir(reposRoot string, repo *RepoRef) (string, error) { + base := reposRoot + if repo == nil || strings.TrimSpace(repo.Path) == "" { + return base, nil + } + + path := strings.TrimSpace(repo.Path) + if !filepath.IsAbs(path) { + path = filepath.Join(base, path) + } + path = filepath.Clean(path) + + info, err := os.Stat(path) + if err != nil { + return "", fmt.Errorf("repository path not found: %s", path) + } + if !info.IsDir() { + return "", fmt.Errorf("repository path is not a directory: %s", path) + } + return path, nil +} diff --git a/server/internal/daemon/types.go b/server/internal/daemon/types.go new file mode 100644 index 00000000..785edef1 --- /dev/null +++ b/server/internal/daemon/types.go @@ -0,0 +1,89 @@ +package daemon + +// AgentEntry describes a single available agent CLI. +type AgentEntry struct { + Path string // path to CLI binary + Model string // model override (optional) +} + +// Runtime represents a registered daemon runtime. +type Runtime struct { + ID string `json:"id"` + Name string `json:"name"` + Provider string `json:"provider"` + Status string `json:"status"` +} + +// PairingSession represents a daemon pairing session. +type PairingSession struct { + Token string `json:"token"` + DaemonID string `json:"daemon_id"` + DeviceName string `json:"device_name"` + RuntimeName string `json:"runtime_name"` + RuntimeType string `json:"runtime_type"` + RuntimeVersion string `json:"runtime_version"` + WorkspaceID *string `json:"workspace_id"` + Status string `json:"status"` + ApprovedAt *string `json:"approved_at"` + ClaimedAt *string `json:"claimed_at"` + ExpiresAt string `json:"expires_at"` + LinkURL *string `json:"link_url"` +} + +// PersistedConfig is the JSON structure saved to ~/.multica/daemon.json. +type PersistedConfig struct { + WorkspaceID string `json:"workspace_id"` +} + +// Task represents a claimed task from the server. +type Task struct { + ID string `json:"id"` + AgentID string `json:"agent_id"` + IssueID string `json:"issue_id"` + Context TaskContext `json:"context"` +} + +// TaskContext contains the snapshot context for a task. +type TaskContext struct { + Issue IssueContext `json:"issue"` + Agent AgentContext `json:"agent"` + Runtime RuntimeContext `json:"runtime"` +} + +// IssueContext holds issue details for task execution. +type IssueContext struct { + ID string `json:"id"` + Title string `json:"title"` + Description string `json:"description"` + AcceptanceCriteria []string `json:"acceptance_criteria"` + ContextRefs []string `json:"context_refs"` + Repository *RepoRef `json:"repository"` +} + +// AgentContext holds agent details for task execution. +type AgentContext struct { + ID string `json:"id"` + Name string `json:"name"` + Skills string `json:"skills"` +} + +// RuntimeContext holds runtime details for task execution. +type RuntimeContext struct { + ID string `json:"id"` + Name string `json:"name"` + Provider string `json:"provider"` + DeviceInfo string `json:"device_info"` +} + +// RepoRef points to a repository for an issue. +type RepoRef struct { + URL string `json:"url"` + Branch string `json:"branch"` + Path string `json:"path"` +} + +// TaskResult is the outcome of executing a task. +type TaskResult struct { + Status string `json:"status"` + Comment string `json:"comment"` +}