refactor(cli): unify daemon into multica-cli binary with cobra subcommands

Extract daemon logic from cmd/daemon/ into internal/daemon/ package and
create a new unified CLI entry point at cmd/multica/ using cobra. The CLI
supports `daemon` as a long-running subcommand plus ctrl subcommands for
agent/runtime management, config, status, and version.

Server, migrate, and seed binaries remain unchanged.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
yushen 2026-03-24 15:44:49 +08:00
parent 41b9698dbf
commit 707b5ac6e7
24 changed files with 1673 additions and 860 deletions

1
.gitignore vendored
View file

@ -24,6 +24,7 @@ server/bin/
server/tmp/
server/migrate
server/daemon
server/multica-cli
# Test artifacts
test-results/

View file

@ -1,4 +1,4 @@
.PHONY: dev daemon build test migrate-up migrate-down sqlc seed clean setup start stop check worktree-env setup-main start-main stop-main check-main setup-worktree start-worktree stop-worktree check-worktree
.PHONY: dev daemon cli build test migrate-up migrate-down sqlc seed clean setup start stop check worktree-env setup-main start-main stop-main check-main setup-worktree start-worktree stop-worktree check-worktree
MAIN_ENV_FILE ?= .env
WORKTREE_ENV_FILE ?= .env.worktree
@ -113,11 +113,14 @@ dev:
cd server && go run ./cmd/server
daemon:
cd server && MULTICA_REPOS_ROOT="${MULTICA_REPOS_ROOT:-$(abspath .)}" go run ./cmd/daemon
cd server && MULTICA_REPOS_ROOT="${MULTICA_REPOS_ROOT:-$(abspath .)}" go run ./cmd/multica daemon
cli:
cd server && go run ./cmd/multica $(ARGS)
build:
cd server && go build -o bin/server ./cmd/server
cd server && go build -o bin/daemon ./cmd/daemon
cd server && go build -o bin/multica-cli ./cmd/multica
test:
cd server && go test ./...

View file

@ -1,816 +0,0 @@
package main
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"github.com/multica-ai/multica/server/pkg/agent"
)
const (
defaultServerURL = "ws://localhost:8080/ws"
defaultDaemonConfigPath = ".multica/daemon.json"
defaultPollInterval = 3 * time.Second
defaultHeartbeatInterval = 15 * time.Second
defaultAgentTimeout = 20 * time.Minute
defaultRuntimeName = "Local Agent"
)
// agentEntry describes a single available agent CLI.
type agentEntry struct {
Path string // path to CLI binary
Model string // model override (optional)
}
type config struct {
ServerBaseURL string
ConfigPath string
WorkspaceID string
DaemonID string
DeviceName string
RuntimeName string
Agents map[string]agentEntry // "claude" -> entry, "codex" -> entry
ReposRoot string // parent directory containing all repos
PollInterval time.Duration
HeartbeatInterval time.Duration
AgentTimeout time.Duration
}
type daemon struct {
cfg config
client *daemonClient
logger *log.Logger
}
type daemonClient struct {
baseURL string
client *http.Client
}
type daemonRuntime struct {
ID string `json:"id"`
Name string `json:"name"`
Provider string `json:"provider"`
Status string `json:"status"`
}
type daemonPairingSession struct {
Token string `json:"token"`
DaemonID string `json:"daemon_id"`
DeviceName string `json:"device_name"`
RuntimeName string `json:"runtime_name"`
RuntimeType string `json:"runtime_type"`
RuntimeVersion string `json:"runtime_version"`
WorkspaceID *string `json:"workspace_id"`
Status string `json:"status"`
ApprovedAt *string `json:"approved_at"`
ClaimedAt *string `json:"claimed_at"`
ExpiresAt string `json:"expires_at"`
LinkURL *string `json:"link_url"`
}
type daemonPersistedConfig struct {
WorkspaceID string `json:"workspace_id"`
}
type daemonTask struct {
ID string `json:"id"`
AgentID string `json:"agent_id"`
IssueID string `json:"issue_id"`
Context daemonTaskContext `json:"context"`
}
type daemonTaskContext struct {
Issue daemonIssueContext `json:"issue"`
Agent daemonAgentContext `json:"agent"`
Runtime daemonRuntimeContext `json:"runtime"`
}
type daemonIssueContext struct {
ID string `json:"id"`
Title string `json:"title"`
Description string `json:"description"`
AcceptanceCriteria []string `json:"acceptance_criteria"`
ContextRefs []string `json:"context_refs"`
Repository *daemonRepoRef `json:"repository"`
}
type daemonAgentContext struct {
ID string `json:"id"`
Name string `json:"name"`
Skills string `json:"skills"`
}
type daemonRuntimeContext struct {
ID string `json:"id"`
Name string `json:"name"`
Provider string `json:"provider"`
DeviceInfo string `json:"device_info"`
}
type daemonRepoRef struct {
URL string `json:"url"`
Branch string `json:"branch"`
Path string `json:"path"`
}
type taskResult struct {
Status string `json:"status"`
Comment string `json:"comment"`
}
func loadConfig() (config, error) {
serverBaseURL, err := normalizeServerBaseURL(envOrDefault("MULTICA_SERVER_URL", defaultServerURL))
if err != nil {
return config{}, err
}
configPath, err := resolveDaemonConfigPath(strings.TrimSpace(os.Getenv("MULTICA_DAEMON_CONFIG")))
if err != nil {
return config{}, err
}
persisted, err := loadPersistedDaemonConfig(configPath)
if err != nil {
return config{}, err
}
workspaceID := strings.TrimSpace(os.Getenv("MULTICA_WORKSPACE_ID"))
if workspaceID == "" {
workspaceID = persisted.WorkspaceID
}
// Probe available agent CLIs.
agents := map[string]agentEntry{}
claudePath := envOrDefault("MULTICA_CLAUDE_PATH", "claude")
if _, err := exec.LookPath(claudePath); err == nil {
agents["claude"] = agentEntry{
Path: claudePath,
Model: strings.TrimSpace(os.Getenv("MULTICA_CLAUDE_MODEL")),
}
}
codexPath := envOrDefault("MULTICA_CODEX_PATH", "codex")
if _, err := exec.LookPath(codexPath); err == nil {
agents["codex"] = agentEntry{
Path: codexPath,
Model: strings.TrimSpace(os.Getenv("MULTICA_CODEX_MODEL")),
}
}
if len(agents) == 0 {
return config{}, fmt.Errorf("no agent CLI found: install claude or codex and ensure it is on PATH")
}
host, err := os.Hostname()
if err != nil || strings.TrimSpace(host) == "" {
host = "local-machine"
}
reposRoot := strings.TrimSpace(os.Getenv("MULTICA_REPOS_ROOT"))
if reposRoot == "" {
reposRoot, err = os.Getwd()
if err != nil {
return config{}, fmt.Errorf("resolve working directory: %w", err)
}
}
reposRoot, err = filepath.Abs(reposRoot)
if err != nil {
return config{}, fmt.Errorf("resolve absolute repos root: %w", err)
}
pollInterval, err := durationFromEnv("MULTICA_DAEMON_POLL_INTERVAL", defaultPollInterval)
if err != nil {
return config{}, err
}
heartbeatInterval, err := durationFromEnv("MULTICA_DAEMON_HEARTBEAT_INTERVAL", defaultHeartbeatInterval)
if err != nil {
return config{}, err
}
agentTimeout, err := durationFromEnv("MULTICA_AGENT_TIMEOUT", defaultAgentTimeout)
if err != nil {
return config{}, err
}
return config{
ServerBaseURL: serverBaseURL,
ConfigPath: configPath,
WorkspaceID: workspaceID,
DaemonID: envOrDefault("MULTICA_DAEMON_ID", host),
DeviceName: envOrDefault("MULTICA_DAEMON_DEVICE_NAME", host),
RuntimeName: envOrDefault("MULTICA_AGENT_RUNTIME_NAME", defaultRuntimeName),
Agents: agents,
ReposRoot: reposRoot,
PollInterval: pollInterval,
HeartbeatInterval: heartbeatInterval,
AgentTimeout: agentTimeout,
}, nil
}
func newDaemon(cfg config, logger *log.Logger) *daemon {
return &daemon{
cfg: cfg,
client: &daemonClient{baseURL: cfg.ServerBaseURL, client: &http.Client{Timeout: 30 * time.Second}},
logger: logger,
}
}
func (d *daemon) run(ctx context.Context) error {
agentNames := make([]string, 0, len(d.cfg.Agents))
for name := range d.cfg.Agents {
agentNames = append(agentNames, name)
}
d.logger.Printf("starting daemon agents=%v workspace=%s server=%s repos_root=%s",
agentNames, d.cfg.WorkspaceID, d.cfg.ServerBaseURL, d.cfg.ReposRoot)
if strings.TrimSpace(d.cfg.WorkspaceID) == "" {
workspaceID, err := d.ensurePaired(ctx)
if err != nil {
return err
}
d.cfg.WorkspaceID = workspaceID
d.logger.Printf("pairing completed for workspace=%s", workspaceID)
}
runtimes, err := d.registerRuntimes(ctx)
if err != nil {
return err
}
runtimeIDs := make([]string, 0, len(runtimes))
for _, rt := range runtimes {
d.logger.Printf("registered runtime id=%s provider=%s status=%s", rt.ID, rt.Provider, rt.Status)
runtimeIDs = append(runtimeIDs, rt.ID)
}
go d.heartbeatLoop(ctx, runtimeIDs)
return d.pollLoop(ctx, runtimeIDs)
}
func (d *daemon) registerRuntimes(ctx context.Context) ([]daemonRuntime, error) {
var runtimes []map[string]string
for name, entry := range d.cfg.Agents {
version, err := agent.DetectVersion(ctx, entry.Path)
if err != nil {
d.logger.Printf("skip registering %s: %v", name, err)
continue
}
runtimes = append(runtimes, map[string]string{
"name": fmt.Sprintf("Local %s", strings.ToUpper(name[:1])+name[1:]),
"type": name,
"version": version,
"status": "online",
})
}
if len(runtimes) == 0 {
return nil, fmt.Errorf("no agent runtimes could be registered")
}
req := map[string]any{
"workspace_id": d.cfg.WorkspaceID,
"daemon_id": d.cfg.DaemonID,
"device_name": d.cfg.DeviceName,
"runtimes": runtimes,
}
var resp struct {
Runtimes []daemonRuntime `json:"runtimes"`
}
if err := d.client.postJSON(ctx, "/api/daemon/register", req, &resp); err != nil {
return nil, fmt.Errorf("register runtimes: %w", err)
}
if len(resp.Runtimes) == 0 {
return nil, fmt.Errorf("register runtimes: empty response")
}
return resp.Runtimes, nil
}
func (d *daemon) ensurePaired(ctx context.Context) (string, error) {
// Use a deterministic agent for the pairing session metadata (prefer codex for backward compat).
var firstName string
var firstEntry agentEntry
for _, preferred := range []string{"codex", "claude"} {
if entry, ok := d.cfg.Agents[preferred]; ok {
firstName = preferred
firstEntry = entry
break
}
}
version, err := agent.DetectVersion(ctx, firstEntry.Path)
if err != nil {
return "", err
}
session, err := d.client.createPairingSession(ctx, map[string]string{
"daemon_id": d.cfg.DaemonID,
"device_name": d.cfg.DeviceName,
"runtime_name": d.cfg.RuntimeName,
"runtime_type": firstName,
"runtime_version": version,
})
if err != nil {
return "", fmt.Errorf("create pairing session: %w", err)
}
if session.LinkURL != nil {
d.logger.Printf("open this link to pair the daemon: %s", *session.LinkURL)
} else {
d.logger.Printf("pairing session created: %s", session.Token)
}
for {
select {
case <-ctx.Done():
return "", ctx.Err()
default:
}
current, err := d.client.getPairingSession(ctx, session.Token)
if err != nil {
return "", fmt.Errorf("poll pairing session: %w", err)
}
switch current.Status {
case "approved", "claimed":
if current.WorkspaceID == nil || strings.TrimSpace(*current.WorkspaceID) == "" {
return "", fmt.Errorf("pairing session approved without workspace")
}
if err := savePersistedDaemonConfig(d.cfg.ConfigPath, daemonPersistedConfig{
WorkspaceID: strings.TrimSpace(*current.WorkspaceID),
}); err != nil {
return "", err
}
if current.Status != "claimed" {
if _, err := d.client.claimPairingSession(ctx, current.Token); err != nil {
return "", fmt.Errorf("claim pairing session: %w", err)
}
}
return strings.TrimSpace(*current.WorkspaceID), nil
case "expired":
return "", fmt.Errorf("pairing session expired before approval")
}
if err := sleepWithContext(ctx, d.cfg.PollInterval); err != nil {
return "", err
}
}
}
func (d *daemon) heartbeatLoop(ctx context.Context, runtimeIDs []string) {
ticker := time.NewTicker(d.cfg.HeartbeatInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
for _, rid := range runtimeIDs {
err := d.client.postJSON(ctx, "/api/daemon/heartbeat", map[string]string{
"runtime_id": rid,
}, nil)
if err != nil {
d.logger.Printf("heartbeat failed for runtime %s: %v", rid, err)
}
}
}
}
}
func (d *daemon) pollLoop(ctx context.Context, runtimeIDs []string) error {
pollOffset := 0
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
claimed := false
n := len(runtimeIDs)
for i := 0; i < n; i++ {
rid := runtimeIDs[(pollOffset+i)%n]
task, err := d.client.claimTask(ctx, rid)
if err != nil {
d.logger.Printf("claim task failed for runtime %s: %v", rid, err)
continue
}
if task != nil {
d.logger.Printf("poll: got task=%s issue=%s title=%q", task.ID, task.IssueID, task.Context.Issue.Title)
d.handleTask(ctx, *task)
claimed = true
pollOffset = (pollOffset + i + 1) % n
break
}
}
if !claimed {
pollOffset = (pollOffset + 1) % n
if err := sleepWithContext(ctx, d.cfg.PollInterval); err != nil {
return err
}
}
}
}
func (d *daemon) handleTask(ctx context.Context, task daemonTask) {
provider := task.Context.Runtime.Provider
d.logger.Printf("picked task=%s issue=%s provider=%s title=%q", task.ID, task.IssueID, provider, task.Context.Issue.Title)
if err := d.client.startTask(ctx, task.ID); err != nil {
d.logger.Printf("start task %s failed: %v", task.ID, err)
return
}
_ = d.client.reportProgress(ctx, task.ID, fmt.Sprintf("Launching %s", provider), 1, 2)
result, err := d.runTask(ctx, task)
if err != nil {
d.logger.Printf("task %s failed: %v", task.ID, err)
if failErr := d.client.failTask(ctx, task.ID, err.Error()); failErr != nil {
d.logger.Printf("fail task %s callback failed: %v", task.ID, failErr)
}
return
}
_ = d.client.reportProgress(ctx, task.ID, "Finishing task", 2, 2)
switch result.Status {
case "blocked":
if err := d.client.failTask(ctx, task.ID, result.Comment); err != nil {
d.logger.Printf("report blocked task %s failed: %v", task.ID, err)
}
default:
if err := d.client.completeTask(ctx, task.ID, result.Comment); err != nil {
d.logger.Printf("complete task %s failed: %v", task.ID, err)
}
}
}
func (d *daemon) runTask(ctx context.Context, task daemonTask) (taskResult, error) {
provider := task.Context.Runtime.Provider
entry, ok := d.cfg.Agents[provider]
if !ok {
return taskResult{}, fmt.Errorf("no agent configured for provider %q", provider)
}
workdir, err := resolveTaskWorkdir(d.cfg.ReposRoot, task.Context.Issue.Repository)
if err != nil {
return taskResult{}, err
}
prompt := buildPrompt(task, workdir)
backend, err := agent.New(provider, agent.Config{
ExecutablePath: entry.Path,
Logger: d.logger,
})
if err != nil {
return taskResult{}, fmt.Errorf("create agent backend: %w", err)
}
d.logger.Printf(
"starting %s task=%s workdir=%s model=%s timeout=%s",
provider, task.ID, workdir, entry.Model, d.cfg.AgentTimeout,
)
session, err := backend.Execute(ctx, prompt, agent.ExecOptions{
Cwd: workdir,
Model: entry.Model,
Timeout: d.cfg.AgentTimeout,
})
if err != nil {
return taskResult{}, err
}
// Drain message channel (log tool uses, ignore text since Result has output)
go func() {
for msg := range session.Messages {
switch msg.Type {
case agent.MessageToolUse:
d.logger.Printf("[%s] tool-use: %s (call=%s)", provider, msg.Tool, msg.CallID)
case agent.MessageError:
d.logger.Printf("[%s] error: %s", provider, msg.Content)
}
}
}()
result := <-session.Result
switch result.Status {
case "completed":
if result.Output == "" {
return taskResult{}, fmt.Errorf("%s returned empty output", provider)
}
return taskResult{Status: "completed", Comment: result.Output}, nil
case "timeout":
return taskResult{}, fmt.Errorf("%s timed out after %s", provider, d.cfg.AgentTimeout)
default:
errMsg := result.Error
if errMsg == "" {
errMsg = fmt.Sprintf("%s execution %s", provider, result.Status)
}
return taskResult{Status: "blocked", Comment: errMsg}, nil
}
}
func buildPrompt(task daemonTask, workdir string) string {
var b strings.Builder
b.WriteString("You are running as a local coding agent for a Multica workspace.\n")
b.WriteString("Complete the assigned issue using the local environment.\n")
b.WriteString("Return a concise Markdown comment suitable for posting back to the issue.\n")
b.WriteString("If you cannot complete the task because context, files, or permissions are missing, return status \"blocked\" and explain the blocker in the comment.\n\n")
fmt.Fprintf(&b, "Working directory: %s\n", workdir)
fmt.Fprintf(&b, "Agent: %s\n", task.Context.Agent.Name)
fmt.Fprintf(&b, "Issue title: %s\n\n", task.Context.Issue.Title)
if task.Context.Issue.Description != "" {
b.WriteString("Issue description:\n")
b.WriteString(task.Context.Issue.Description)
b.WriteString("\n\n")
}
if len(task.Context.Issue.AcceptanceCriteria) > 0 {
b.WriteString("Acceptance criteria:\n")
for _, item := range task.Context.Issue.AcceptanceCriteria {
fmt.Fprintf(&b, "- %s\n", item)
}
b.WriteString("\n")
}
if len(task.Context.Issue.ContextRefs) > 0 {
b.WriteString("Context refs:\n")
for _, item := range task.Context.Issue.ContextRefs {
fmt.Fprintf(&b, "- %s\n", item)
}
b.WriteString("\n")
}
if repo := task.Context.Issue.Repository; repo != nil {
b.WriteString("Repository context:\n")
if repo.URL != "" {
fmt.Fprintf(&b, "- url: %s\n", repo.URL)
}
if repo.Branch != "" {
fmt.Fprintf(&b, "- branch: %s\n", repo.Branch)
}
if repo.Path != "" {
fmt.Fprintf(&b, "- path: %s\n", repo.Path)
}
b.WriteString("\n")
}
if task.Context.Agent.Skills != "" {
b.WriteString("Agent skills/instructions:\n")
b.WriteString(task.Context.Agent.Skills)
b.WriteString("\n\n")
}
b.WriteString("Comment requirements:\n")
b.WriteString("- Lead with the outcome.\n")
b.WriteString("- Mention concrete files or commands if you changed anything.\n")
b.WriteString("- Mention blockers or follow-up actions if relevant.\n")
return b.String()
}
func resolveTaskWorkdir(reposRoot string, repo *daemonRepoRef) (string, error) {
base := reposRoot
if repo == nil || strings.TrimSpace(repo.Path) == "" {
return base, nil
}
path := strings.TrimSpace(repo.Path)
if !filepath.IsAbs(path) {
path = filepath.Join(base, path)
}
path = filepath.Clean(path)
info, err := os.Stat(path)
if err != nil {
return "", fmt.Errorf("repository path not found: %s", path)
}
if !info.IsDir() {
return "", fmt.Errorf("repository path is not a directory: %s", path)
}
return path, nil
}
func resolveDaemonConfigPath(raw string) (string, error) {
if raw != "" {
return filepath.Abs(raw)
}
home, err := os.UserHomeDir()
if err != nil {
return "", fmt.Errorf("resolve daemon config path: %w", err)
}
return filepath.Join(home, defaultDaemonConfigPath), nil
}
func loadPersistedDaemonConfig(path string) (daemonPersistedConfig, error) {
data, err := os.ReadFile(path)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return daemonPersistedConfig{}, nil
}
return daemonPersistedConfig{}, fmt.Errorf("read daemon config: %w", err)
}
var cfg daemonPersistedConfig
if err := json.Unmarshal(data, &cfg); err != nil {
return daemonPersistedConfig{}, fmt.Errorf("parse daemon config: %w", err)
}
return cfg, nil
}
func savePersistedDaemonConfig(path string, cfg daemonPersistedConfig) error {
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
return fmt.Errorf("create daemon config directory: %w", err)
}
data, err := json.MarshalIndent(cfg, "", " ")
if err != nil {
return fmt.Errorf("encode daemon config: %w", err)
}
if err := os.WriteFile(path, append(data, '\n'), 0o600); err != nil {
return fmt.Errorf("write daemon config: %w", err)
}
return nil
}
func normalizeServerBaseURL(raw string) (string, error) {
u, err := url.Parse(strings.TrimSpace(raw))
if err != nil {
return "", fmt.Errorf("invalid MULTICA_SERVER_URL: %w", err)
}
switch u.Scheme {
case "ws":
u.Scheme = "http"
case "wss":
u.Scheme = "https"
case "http", "https":
default:
return "", fmt.Errorf("MULTICA_SERVER_URL must use ws, wss, http, or https")
}
if u.Path == "/ws" {
u.Path = ""
}
u.RawPath = ""
u.RawQuery = ""
u.Fragment = ""
return strings.TrimRight(u.String(), "/"), nil
}
func durationFromEnv(key string, fallback time.Duration) (time.Duration, error) {
value := strings.TrimSpace(os.Getenv(key))
if value == "" {
return fallback, nil
}
d, err := time.ParseDuration(value)
if err != nil {
return 0, fmt.Errorf("%s: invalid duration %q: %w", key, value, err)
}
return d, nil
}
func envOrDefault(key, fallback string) string {
value := strings.TrimSpace(os.Getenv(key))
if value == "" {
return fallback
}
return value
}
func sleepWithContext(ctx context.Context, d time.Duration) error {
timer := time.NewTimer(d)
defer timer.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
return nil
}
}
func (c *daemonClient) claimTask(ctx context.Context, runtimeID string) (*daemonTask, error) {
var resp struct {
Task *daemonTask `json:"task"`
}
if err := c.postJSON(ctx, fmt.Sprintf("/api/daemon/runtimes/%s/tasks/claim", runtimeID), map[string]any{}, &resp); err != nil {
return nil, err
}
return resp.Task, nil
}
func (c *daemonClient) createPairingSession(ctx context.Context, req map[string]string) (daemonPairingSession, error) {
var resp daemonPairingSession
if err := c.postJSON(ctx, "/api/daemon/pairing-sessions", req, &resp); err != nil {
return daemonPairingSession{}, err
}
return resp, nil
}
func (c *daemonClient) getPairingSession(ctx context.Context, token string) (daemonPairingSession, error) {
var resp daemonPairingSession
if err := c.getJSON(ctx, fmt.Sprintf("/api/daemon/pairing-sessions/%s", url.PathEscape(token)), &resp); err != nil {
return daemonPairingSession{}, err
}
return resp, nil
}
func (c *daemonClient) claimPairingSession(ctx context.Context, token string) (daemonPairingSession, error) {
var resp daemonPairingSession
if err := c.postJSON(ctx, fmt.Sprintf("/api/daemon/pairing-sessions/%s/claim", url.PathEscape(token)), map[string]any{}, &resp); err != nil {
return daemonPairingSession{}, err
}
return resp, nil
}
func (c *daemonClient) startTask(ctx context.Context, taskID string) error {
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/start", taskID), map[string]any{}, nil)
}
func (c *daemonClient) reportProgress(ctx context.Context, taskID, summary string, step, total int) error {
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/progress", taskID), map[string]any{
"summary": summary,
"step": step,
"total": total,
}, nil)
}
func (c *daemonClient) completeTask(ctx context.Context, taskID, output string) error {
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/complete", taskID), map[string]any{
"output": output,
}, nil)
}
func (c *daemonClient) failTask(ctx context.Context, taskID, errMsg string) error {
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/fail", taskID), map[string]any{
"error": errMsg,
}, nil)
}
func (c *daemonClient) postJSON(ctx context.Context, path string, reqBody any, respBody any) error {
var body io.Reader
if reqBody != nil {
data, err := json.Marshal(reqBody)
if err != nil {
return err
}
body = bytes.NewReader(data)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+path, body)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := c.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
return fmt.Errorf("%s %s returned %d: %s", http.MethodPost, path, resp.StatusCode, strings.TrimSpace(string(data)))
}
if respBody == nil {
io.Copy(io.Discard, resp.Body)
return nil
}
return json.NewDecoder(resp.Body).Decode(respBody)
}
func (c *daemonClient) getJSON(ctx context.Context, path string, respBody any) error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+path, nil)
if err != nil {
return err
}
resp, err := c.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
return fmt.Errorf("%s %s returned %d: %s", http.MethodGet, path, resp.StatusCode, strings.TrimSpace(string(data)))
}
if respBody == nil {
io.Copy(io.Discard, resp.Body)
return nil
}
return json.NewDecoder(resp.Body).Decode(respBody)
}

View file

@ -1,27 +0,0 @@
package main
import (
"context"
"errors"
"log"
"os"
"os/signal"
"syscall"
)
func main() {
cfg, err := loadConfig()
if err != nil {
log.Fatal(err)
}
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
logger := log.New(os.Stdout, "multica-daemon: ", log.LstdFlags)
d := newDaemon(cfg, logger)
if err := d.run(ctx); err != nil && !errors.Is(err, context.Canceled) {
logger.Fatal(err)
}
}

View file

@ -0,0 +1,172 @@
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/spf13/cobra"
"github.com/multica-ai/multica/server/internal/cli"
)
var agentCmd = &cobra.Command{
Use: "agent",
Short: "Manage agents",
}
var agentListCmd = &cobra.Command{
Use: "list",
Short: "List agents in the workspace",
RunE: runAgentList,
}
var agentGetCmd = &cobra.Command{
Use: "get <id>",
Short: "Get agent details",
Args: cobra.ExactArgs(1),
RunE: runAgentGet,
}
var agentDeleteCmd = &cobra.Command{
Use: "delete <id>",
Short: "Delete an agent",
Args: cobra.ExactArgs(1),
RunE: runAgentDelete,
}
var agentStopCmd = &cobra.Command{
Use: "stop <id>",
Short: "Stop an agent (set status to offline)",
Args: cobra.ExactArgs(1),
RunE: runAgentStop,
}
func init() {
agentCmd.AddCommand(agentListCmd)
agentCmd.AddCommand(agentGetCmd)
agentCmd.AddCommand(agentDeleteCmd)
agentCmd.AddCommand(agentStopCmd)
agentListCmd.Flags().String("output", "table", "Output format: table or json")
}
func newAPIClient(cmd *cobra.Command) (*cli.APIClient, error) {
serverURL := resolveServerURL(cmd)
workspaceID := resolveWorkspaceID(cmd)
if serverURL == "" {
return nil, fmt.Errorf("server URL not set: use --server-url flag, MULTICA_SERVER_URL env, or 'multica-cli config set server_url <url>'")
}
return cli.NewAPIClient(serverURL, workspaceID), nil
}
func resolveServerURL(cmd *cobra.Command) string {
val := cli.FlagOrEnv(cmd, "server-url", "MULTICA_SERVER_URL", "")
if val != "" {
return val
}
cfg, err := cli.LoadCLIConfig()
if err != nil {
return "http://localhost:8080"
}
if cfg.ServerURL != "" {
return cfg.ServerURL
}
return "http://localhost:8080"
}
func resolveWorkspaceID(cmd *cobra.Command) string {
val := cli.FlagOrEnv(cmd, "workspace-id", "MULTICA_WORKSPACE_ID", "")
if val != "" {
return val
}
cfg, _ := cli.LoadCLIConfig()
return cfg.WorkspaceID
}
func runAgentList(cmd *cobra.Command, _ []string) error {
client, err := newAPIClient(cmd)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
var agents []map[string]any
path := "/api/agents"
if client.WorkspaceID != "" {
path += "?workspace_id=" + client.WorkspaceID
}
if err := client.GetJSON(ctx, path, &agents); err != nil {
return fmt.Errorf("list agents: %w", err)
}
output, _ := cmd.Flags().GetString("output")
if output == "json" {
return cli.PrintJSON(os.Stdout, agents)
}
headers := []string{"ID", "NAME", "STATUS", "RUNTIME"}
rows := make([][]string, 0, len(agents))
for _, a := range agents {
rows = append(rows, []string{
strVal(a, "id"),
strVal(a, "name"),
strVal(a, "status"),
strVal(a, "runtime_mode"),
})
}
cli.PrintTable(os.Stdout, headers, rows)
return nil
}
func runAgentGet(cmd *cobra.Command, args []string) error {
client, err := newAPIClient(cmd)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
var agent map[string]any
if err := client.GetJSON(ctx, "/api/agents/"+args[0], &agent); err != nil {
return fmt.Errorf("get agent: %w", err)
}
return cli.PrintJSON(os.Stdout, agent)
}
func runAgentDelete(cmd *cobra.Command, args []string) error {
client, err := newAPIClient(cmd)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
if err := client.DeleteJSON(ctx, "/api/agents/"+args[0]); err != nil {
return fmt.Errorf("delete agent: %w", err)
}
fmt.Fprintf(os.Stderr, "Agent %s deleted.\n", args[0])
return nil
}
func runAgentStop(cmd *cobra.Command, args []string) error {
// TODO: implement agent stop (PUT /api/agents/{id} with status=offline)
return fmt.Errorf("agent stop is not yet implemented")
}
func strVal(m map[string]any, key string) string {
v, ok := m[key]
if !ok || v == nil {
return ""
}
return fmt.Sprintf("%v", v)
}

View file

@ -0,0 +1,79 @@
package main
import (
"fmt"
"os"
"github.com/spf13/cobra"
"github.com/multica-ai/multica/server/internal/cli"
)
var configCmd = &cobra.Command{
Use: "config",
Short: "Manage CLI configuration",
}
var configShowCmd = &cobra.Command{
Use: "show",
Short: "Show current CLI configuration",
RunE: runConfigShow,
}
var configSetCmd = &cobra.Command{
Use: "set <key> <value>",
Short: "Set a CLI configuration value",
Long: "Supported keys: server_url, workspace_id",
Args: cobra.ExactArgs(2),
RunE: runConfigSet,
}
func init() {
configCmd.AddCommand(configShowCmd)
configCmd.AddCommand(configSetCmd)
}
func runConfigShow(_ *cobra.Command, _ []string) error {
cfg, err := cli.LoadCLIConfig()
if err != nil {
return err
}
path, _ := cli.CLIConfigPath()
fmt.Fprintf(os.Stdout, "Config file: %s\n", path)
fmt.Fprintf(os.Stdout, "server_url: %s\n", valueOrDefault(cfg.ServerURL, "(not set)"))
fmt.Fprintf(os.Stdout, "workspace_id: %s\n", valueOrDefault(cfg.WorkspaceID, "(not set)"))
return nil
}
func runConfigSet(_ *cobra.Command, args []string) error {
key, value := args[0], args[1]
cfg, err := cli.LoadCLIConfig()
if err != nil {
return err
}
switch key {
case "server_url":
cfg.ServerURL = value
case "workspace_id":
cfg.WorkspaceID = value
default:
return fmt.Errorf("unknown config key %q (supported: server_url, workspace_id)", key)
}
if err := cli.SaveCLIConfig(cfg); err != nil {
return err
}
fmt.Fprintf(os.Stderr, "Set %s = %s\n", key, value)
return nil
}
func valueOrDefault(v, fallback string) string {
if v == "" {
return fallback
}
return v
}

View file

@ -0,0 +1,77 @@
package main
import (
"context"
"errors"
"log"
"os"
"os/signal"
"syscall"
"github.com/spf13/cobra"
"github.com/multica-ai/multica/server/internal/cli"
"github.com/multica-ai/multica/server/internal/daemon"
)
var daemonCmd = &cobra.Command{
Use: "daemon",
Short: "Run the local agent runtime daemon",
Long: "Start the daemon process that polls for tasks and executes them using local agent CLIs (Claude, Codex).",
RunE: runDaemon,
}
func init() {
f := daemonCmd.Flags()
f.String("repos-root", "", "Base directory for task repositories (env: MULTICA_REPOS_ROOT)")
f.String("config-path", "", "Path to daemon config file (env: MULTICA_DAEMON_CONFIG)")
f.String("daemon-id", "", "Unique daemon identifier (env: MULTICA_DAEMON_ID)")
f.String("device-name", "", "Human-readable device name (env: MULTICA_DAEMON_DEVICE_NAME)")
f.String("runtime-name", "", "Runtime display name (env: MULTICA_AGENT_RUNTIME_NAME)")
f.Duration("poll-interval", 0, "Task poll interval (env: MULTICA_DAEMON_POLL_INTERVAL)")
f.Duration("heartbeat-interval", 0, "Heartbeat interval (env: MULTICA_DAEMON_HEARTBEAT_INTERVAL)")
f.Duration("agent-timeout", 0, "Per-task timeout (env: MULTICA_AGENT_TIMEOUT)")
}
func runDaemon(cmd *cobra.Command, _ []string) error {
overrides := daemon.Overrides{
ServerURL: cli.FlagOrEnv(cmd, "server-url", "MULTICA_SERVER_URL", ""),
WorkspaceID: cli.FlagOrEnv(cmd, "workspace-id", "MULTICA_WORKSPACE_ID", ""),
ReposRoot: flagString(cmd, "repos-root"),
ConfigPath: flagString(cmd, "config-path"),
DaemonID: flagString(cmd, "daemon-id"),
DeviceName: flagString(cmd, "device-name"),
RuntimeName: flagString(cmd, "runtime-name"),
}
if d, _ := cmd.Flags().GetDuration("poll-interval"); d > 0 {
overrides.PollInterval = d
}
if d, _ := cmd.Flags().GetDuration("heartbeat-interval"); d > 0 {
overrides.HeartbeatInterval = d
}
if d, _ := cmd.Flags().GetDuration("agent-timeout"); d > 0 {
overrides.AgentTimeout = d
}
cfg, err := daemon.LoadConfig(overrides)
if err != nil {
return err
}
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
logger := log.New(os.Stdout, "multica-daemon: ", log.LstdFlags)
d := daemon.New(cfg, logger)
if err := d.Run(ctx); err != nil && !errors.Is(err, context.Canceled) {
return err
}
return nil
}
func flagString(cmd *cobra.Command, name string) string {
val, _ := cmd.Flags().GetString(name)
return val
}

View file

@ -0,0 +1,63 @@
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/spf13/cobra"
"github.com/multica-ai/multica/server/internal/cli"
)
var runtimeCmd = &cobra.Command{
Use: "runtime",
Short: "Manage agent runtimes",
}
var runtimeListCmd = &cobra.Command{
Use: "list",
Short: "List agent runtimes",
RunE: runRuntimeList,
}
func init() {
runtimeCmd.AddCommand(runtimeListCmd)
runtimeListCmd.Flags().String("output", "table", "Output format: table or json")
}
func runRuntimeList(cmd *cobra.Command, _ []string) error {
client, err := newAPIClient(cmd)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
var runtimes []map[string]any
if err := client.GetJSON(ctx, "/api/runtimes", &runtimes); err != nil {
return fmt.Errorf("list runtimes: %w", err)
}
output, _ := cmd.Flags().GetString("output")
if output == "json" {
return cli.PrintJSON(os.Stdout, runtimes)
}
headers := []string{"ID", "NAME", "PROVIDER", "STATUS", "DEVICE"}
rows := make([][]string, 0, len(runtimes))
for _, r := range runtimes {
rows = append(rows, []string{
strVal(r, "id"),
strVal(r, "name"),
strVal(r, "provider"),
strVal(r, "status"),
strVal(r, "device_info"),
})
}
cli.PrintTable(os.Stdout, headers, rows)
return nil
}

View file

@ -0,0 +1,36 @@
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/spf13/cobra"
)
var statusCmd = &cobra.Command{
Use: "status",
Short: "Check server health",
RunE: runStatus,
}
func runStatus(cmd *cobra.Command, _ []string) error {
client, err := newAPIClient(cmd)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
body, err := client.HealthCheck(ctx)
if err != nil {
fmt.Fprintf(os.Stderr, "Server unreachable: %v\n", err)
return err
}
fmt.Fprintf(os.Stdout, "Server: %s\n", client.BaseURL)
fmt.Fprintf(os.Stdout, "Status: %s\n", body)
return nil
}

View file

@ -0,0 +1,15 @@
package main
import (
"fmt"
"github.com/spf13/cobra"
)
var versionCmd = &cobra.Command{
Use: "version",
Short: "Print version information",
Run: func(_ *cobra.Command, _ []string) {
fmt.Printf("multica-cli %s (commit: %s)\n", version, commit)
},
}

View file

@ -0,0 +1,38 @@
package main
import (
"os"
"github.com/spf13/cobra"
)
var (
version = "dev"
commit = "unknown"
)
var rootCmd = &cobra.Command{
Use: "multica-cli",
Short: "Multica CLI — local agent runtime and management tool",
Long: "multica-cli manages local agent runtimes and provides control commands for the Multica platform.",
SilenceUsage: true,
SilenceErrors: true,
}
func init() {
rootCmd.PersistentFlags().String("server-url", "", "Multica server URL (env: MULTICA_SERVER_URL)")
rootCmd.PersistentFlags().String("workspace-id", "", "Workspace ID (env: MULTICA_WORKSPACE_ID)")
rootCmd.AddCommand(daemonCmd)
rootCmd.AddCommand(agentCmd)
rootCmd.AddCommand(runtimeCmd)
rootCmd.AddCommand(configCmd)
rootCmd.AddCommand(statusCmd)
rootCmd.AddCommand(versionCmd)
}
func main() {
if err := rootCmd.Execute(); err != nil {
os.Exit(1)
}
}

View file

@ -5,16 +5,18 @@ go 1.26.1
require (
github.com/go-chi/chi/v5 v5.2.5
github.com/go-chi/cors v1.2.2
github.com/golang-jwt/jwt/v5 v5.3.1
github.com/gorilla/websocket v1.5.3
github.com/jackc/pgx/v5 v5.8.0
github.com/spf13/cobra v1.10.2
)
require (
github.com/golang-jwt/jwt/v5 v5.3.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgx/v5 v5.8.0 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
golang.org/x/crypto v0.49.0 // indirect
github.com/spf13/pflag v1.0.9 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/text v0.35.0 // indirect
)

View file

@ -1,4 +1,7 @@
github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug=
github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0=
github.com/go-chi/cors v1.2.2 h1:Jmey33TE+b+rB7fT8MUy1u0I4L+NARQlK6LhzKPSyQE=
@ -7,6 +10,8 @@ github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63Y
github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
@ -15,15 +20,24 @@ github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo=
github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU=
github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4=
github.com/spf13/pflag v1.0.9 h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY=
github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4=
golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View file

@ -0,0 +1,96 @@
package cli
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
)
// APIClient is a REST client for the Multica server API.
// Used by ctrl subcommands (agent, runtime, status, etc.).
type APIClient struct {
BaseURL string
WorkspaceID string
HTTPClient *http.Client
}
// NewAPIClient creates a new API client for ctrl commands.
func NewAPIClient(baseURL, workspaceID string) *APIClient {
return &APIClient{
BaseURL: strings.TrimRight(baseURL, "/"),
WorkspaceID: workspaceID,
HTTPClient: &http.Client{Timeout: 15 * time.Second},
}
}
// GetJSON performs a GET request and decodes the JSON response.
func (c *APIClient) GetJSON(ctx context.Context, path string, out any) error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.BaseURL+path, nil)
if err != nil {
return err
}
if c.WorkspaceID != "" {
req.Header.Set("X-Workspace-ID", c.WorkspaceID)
}
resp, err := c.HTTPClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
return fmt.Errorf("GET %s returned %d: %s", path, resp.StatusCode, strings.TrimSpace(string(data)))
}
if out == nil {
return nil
}
return json.NewDecoder(resp.Body).Decode(out)
}
// DeleteJSON performs a DELETE request.
func (c *APIClient) DeleteJSON(ctx context.Context, path string) error {
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, c.BaseURL+path, nil)
if err != nil {
return err
}
if c.WorkspaceID != "" {
req.Header.Set("X-Workspace-ID", c.WorkspaceID)
}
resp, err := c.HTTPClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
return fmt.Errorf("DELETE %s returned %d: %s", path, resp.StatusCode, strings.TrimSpace(string(data)))
}
return nil
}
// HealthCheck hits the /health endpoint and returns the response body.
func (c *APIClient) HealthCheck(ctx context.Context) (string, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.BaseURL+"/health", nil)
if err != nil {
return "", err
}
resp, err := c.HTTPClient.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
if resp.StatusCode >= 400 {
return "", fmt.Errorf("health check returned %d: %s", resp.StatusCode, strings.TrimSpace(string(data)))
}
return strings.TrimSpace(string(data)), nil
}

View file

@ -0,0 +1,65 @@
package cli
import (
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
)
const defaultCLIConfigPath = ".multica/config.json"
// CLIConfig holds persistent CLI settings.
type CLIConfig struct {
ServerURL string `json:"server_url,omitempty"`
WorkspaceID string `json:"workspace_id,omitempty"`
}
// CLIConfigPath returns the default path for the CLI config file.
func CLIConfigPath() (string, error) {
home, err := os.UserHomeDir()
if err != nil {
return "", fmt.Errorf("resolve CLI config path: %w", err)
}
return filepath.Join(home, defaultCLIConfigPath), nil
}
// LoadCLIConfig reads the CLI config from disk.
func LoadCLIConfig() (CLIConfig, error) {
path, err := CLIConfigPath()
if err != nil {
return CLIConfig{}, err
}
data, err := os.ReadFile(path)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return CLIConfig{}, nil
}
return CLIConfig{}, fmt.Errorf("read CLI config: %w", err)
}
var cfg CLIConfig
if err := json.Unmarshal(data, &cfg); err != nil {
return CLIConfig{}, fmt.Errorf("parse CLI config: %w", err)
}
return cfg, nil
}
// SaveCLIConfig writes the CLI config to disk.
func SaveCLIConfig(cfg CLIConfig) error {
path, err := CLIConfigPath()
if err != nil {
return err
}
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
return fmt.Errorf("create CLI config directory: %w", err)
}
data, err := json.MarshalIndent(cfg, "", " ")
if err != nil {
return fmt.Errorf("encode CLI config: %w", err)
}
if err := os.WriteFile(path, append(data, '\n'), 0o600); err != nil {
return fmt.Errorf("write CLI config: %w", err)
}
return nil
}

View file

@ -0,0 +1,21 @@
package cli
import (
"os"
"strings"
"github.com/spf13/cobra"
)
// FlagOrEnv returns the flag value if set, otherwise the environment variable value,
// otherwise the fallback.
func FlagOrEnv(cmd *cobra.Command, flagName, envKey, fallback string) string {
if cmd.Flags().Changed(flagName) {
val, _ := cmd.Flags().GetString(flagName)
return val
}
if v := strings.TrimSpace(os.Getenv(envKey)); v != "" {
return v
}
return fallback
}

View file

@ -0,0 +1,26 @@
package cli
import (
"encoding/json"
"fmt"
"io"
"strings"
"text/tabwriter"
)
// PrintTable writes a simple table with headers and rows to w.
func PrintTable(w io.Writer, headers []string, rows [][]string) {
tw := tabwriter.NewWriter(w, 0, 4, 2, ' ', 0)
fmt.Fprintln(tw, strings.Join(headers, "\t"))
for _, row := range rows {
fmt.Fprintln(tw, strings.Join(row, "\t"))
}
tw.Flush()
}
// PrintJSON writes v as indented JSON to w.
func PrintJSON(w io.Writer, v any) error {
enc := json.NewEncoder(w)
enc.SetIndent("", " ")
return enc.Encode(v)
}

View file

@ -0,0 +1,141 @@
package daemon
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
)
// Client handles HTTP communication with the Multica server daemon API.
type Client struct {
baseURL string
client *http.Client
}
// NewClient creates a new daemon API client.
func NewClient(baseURL string) *Client {
return &Client{
baseURL: baseURL,
client: &http.Client{Timeout: 30 * time.Second},
}
}
func (c *Client) ClaimTask(ctx context.Context, runtimeID string) (*Task, error) {
var resp struct {
Task *Task `json:"task"`
}
if err := c.postJSON(ctx, fmt.Sprintf("/api/daemon/runtimes/%s/tasks/claim", runtimeID), map[string]any{}, &resp); err != nil {
return nil, err
}
return resp.Task, nil
}
func (c *Client) CreatePairingSession(ctx context.Context, req map[string]string) (PairingSession, error) {
var resp PairingSession
if err := c.postJSON(ctx, "/api/daemon/pairing-sessions", req, &resp); err != nil {
return PairingSession{}, err
}
return resp, nil
}
func (c *Client) GetPairingSession(ctx context.Context, token string) (PairingSession, error) {
var resp PairingSession
if err := c.getJSON(ctx, fmt.Sprintf("/api/daemon/pairing-sessions/%s", url.PathEscape(token)), &resp); err != nil {
return PairingSession{}, err
}
return resp, nil
}
func (c *Client) ClaimPairingSession(ctx context.Context, token string) (PairingSession, error) {
var resp PairingSession
if err := c.postJSON(ctx, fmt.Sprintf("/api/daemon/pairing-sessions/%s/claim", url.PathEscape(token)), map[string]any{}, &resp); err != nil {
return PairingSession{}, err
}
return resp, nil
}
func (c *Client) StartTask(ctx context.Context, taskID string) error {
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/start", taskID), map[string]any{}, nil)
}
func (c *Client) ReportProgress(ctx context.Context, taskID, summary string, step, total int) error {
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/progress", taskID), map[string]any{
"summary": summary,
"step": step,
"total": total,
}, nil)
}
func (c *Client) CompleteTask(ctx context.Context, taskID, output string) error {
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/complete", taskID), map[string]any{
"output": output,
}, nil)
}
func (c *Client) FailTask(ctx context.Context, taskID, errMsg string) error {
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/fail", taskID), map[string]any{
"error": errMsg,
}, nil)
}
func (c *Client) postJSON(ctx context.Context, path string, reqBody any, respBody any) error {
var body io.Reader
if reqBody != nil {
data, err := json.Marshal(reqBody)
if err != nil {
return err
}
body = bytes.NewReader(data)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+path, body)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := c.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
return fmt.Errorf("%s %s returned %d: %s", http.MethodPost, path, resp.StatusCode, strings.TrimSpace(string(data)))
}
if respBody == nil {
io.Copy(io.Discard, resp.Body)
return nil
}
return json.NewDecoder(resp.Body).Decode(respBody)
}
func (c *Client) getJSON(ctx context.Context, path string, respBody any) error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+path, nil)
if err != nil {
return err
}
resp, err := c.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
return fmt.Errorf("%s %s returned %d: %s", http.MethodGet, path, resp.StatusCode, strings.TrimSpace(string(data)))
}
if respBody == nil {
io.Copy(io.Discard, resp.Body)
return nil
}
return json.NewDecoder(resp.Body).Decode(respBody)
}

View file

@ -0,0 +1,254 @@
package daemon
import (
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
)
const (
DefaultServerURL = "ws://localhost:8080/ws"
DefaultDaemonConfigPath = ".multica/daemon.json"
DefaultPollInterval = 3 * time.Second
DefaultHeartbeatInterval = 15 * time.Second
DefaultAgentTimeout = 20 * time.Minute
DefaultRuntimeName = "Local Agent"
)
// Config holds all daemon configuration.
type Config struct {
ServerBaseURL string
ConfigPath string
WorkspaceID string
DaemonID string
DeviceName string
RuntimeName string
Agents map[string]AgentEntry // "claude" -> entry, "codex" -> entry
ReposRoot string // parent directory containing all repos
PollInterval time.Duration
HeartbeatInterval time.Duration
AgentTimeout time.Duration
}
// Overrides allows CLI flags to override environment variables and defaults.
// Zero values are ignored and the env/default value is used instead.
type Overrides struct {
ServerURL string
WorkspaceID string
ReposRoot string
ConfigPath string
PollInterval time.Duration
HeartbeatInterval time.Duration
AgentTimeout time.Duration
DaemonID string
DeviceName string
RuntimeName string
}
// LoadConfig builds the daemon configuration from environment variables,
// persisted config, and optional CLI flag overrides.
func LoadConfig(overrides Overrides) (Config, error) {
// Server URL: override > env > default
rawServerURL := EnvOrDefault("MULTICA_SERVER_URL", DefaultServerURL)
if overrides.ServerURL != "" {
rawServerURL = overrides.ServerURL
}
serverBaseURL, err := NormalizeServerBaseURL(rawServerURL)
if err != nil {
return Config{}, err
}
// Config path
rawConfigPath := strings.TrimSpace(os.Getenv("MULTICA_DAEMON_CONFIG"))
if overrides.ConfigPath != "" {
rawConfigPath = overrides.ConfigPath
}
configPath, err := resolveDaemonConfigPath(rawConfigPath)
if err != nil {
return Config{}, err
}
// Load persisted config
persisted, err := LoadPersistedConfig(configPath)
if err != nil {
return Config{}, err
}
// Workspace ID: override > env > persisted
workspaceID := strings.TrimSpace(os.Getenv("MULTICA_WORKSPACE_ID"))
if workspaceID == "" {
workspaceID = persisted.WorkspaceID
}
if overrides.WorkspaceID != "" {
workspaceID = overrides.WorkspaceID
}
// Probe available agent CLIs
agents := map[string]AgentEntry{}
claudePath := EnvOrDefault("MULTICA_CLAUDE_PATH", "claude")
if _, err := exec.LookPath(claudePath); err == nil {
agents["claude"] = AgentEntry{
Path: claudePath,
Model: strings.TrimSpace(os.Getenv("MULTICA_CLAUDE_MODEL")),
}
}
codexPath := EnvOrDefault("MULTICA_CODEX_PATH", "codex")
if _, err := exec.LookPath(codexPath); err == nil {
agents["codex"] = AgentEntry{
Path: codexPath,
Model: strings.TrimSpace(os.Getenv("MULTICA_CODEX_MODEL")),
}
}
if len(agents) == 0 {
return Config{}, fmt.Errorf("no agent CLI found: install claude or codex and ensure it is on PATH")
}
// Host info
host, err := os.Hostname()
if err != nil || strings.TrimSpace(host) == "" {
host = "local-machine"
}
// Repos root: override > env > cwd
reposRoot := strings.TrimSpace(os.Getenv("MULTICA_REPOS_ROOT"))
if overrides.ReposRoot != "" {
reposRoot = overrides.ReposRoot
}
if reposRoot == "" {
reposRoot, err = os.Getwd()
if err != nil {
return Config{}, fmt.Errorf("resolve working directory: %w", err)
}
}
reposRoot, err = filepath.Abs(reposRoot)
if err != nil {
return Config{}, fmt.Errorf("resolve absolute repos root: %w", err)
}
// Durations: override > env > default
pollInterval, err := DurationFromEnv("MULTICA_DAEMON_POLL_INTERVAL", DefaultPollInterval)
if err != nil {
return Config{}, err
}
if overrides.PollInterval > 0 {
pollInterval = overrides.PollInterval
}
heartbeatInterval, err := DurationFromEnv("MULTICA_DAEMON_HEARTBEAT_INTERVAL", DefaultHeartbeatInterval)
if err != nil {
return Config{}, err
}
if overrides.HeartbeatInterval > 0 {
heartbeatInterval = overrides.HeartbeatInterval
}
agentTimeout, err := DurationFromEnv("MULTICA_AGENT_TIMEOUT", DefaultAgentTimeout)
if err != nil {
return Config{}, err
}
if overrides.AgentTimeout > 0 {
agentTimeout = overrides.AgentTimeout
}
// String overrides
daemonID := EnvOrDefault("MULTICA_DAEMON_ID", host)
if overrides.DaemonID != "" {
daemonID = overrides.DaemonID
}
deviceName := EnvOrDefault("MULTICA_DAEMON_DEVICE_NAME", host)
if overrides.DeviceName != "" {
deviceName = overrides.DeviceName
}
runtimeName := EnvOrDefault("MULTICA_AGENT_RUNTIME_NAME", DefaultRuntimeName)
if overrides.RuntimeName != "" {
runtimeName = overrides.RuntimeName
}
return Config{
ServerBaseURL: serverBaseURL,
ConfigPath: configPath,
WorkspaceID: workspaceID,
DaemonID: daemonID,
DeviceName: deviceName,
RuntimeName: runtimeName,
Agents: agents,
ReposRoot: reposRoot,
PollInterval: pollInterval,
HeartbeatInterval: heartbeatInterval,
AgentTimeout: agentTimeout,
}, nil
}
// NormalizeServerBaseURL converts a WebSocket or HTTP URL to a base HTTP URL.
func NormalizeServerBaseURL(raw string) (string, error) {
u, err := url.Parse(strings.TrimSpace(raw))
if err != nil {
return "", fmt.Errorf("invalid MULTICA_SERVER_URL: %w", err)
}
switch u.Scheme {
case "ws":
u.Scheme = "http"
case "wss":
u.Scheme = "https"
case "http", "https":
default:
return "", fmt.Errorf("MULTICA_SERVER_URL must use ws, wss, http, or https")
}
if u.Path == "/ws" {
u.Path = ""
}
u.RawPath = ""
u.RawQuery = ""
u.Fragment = ""
return strings.TrimRight(u.String(), "/"), nil
}
func resolveDaemonConfigPath(raw string) (string, error) {
if raw != "" {
return filepath.Abs(raw)
}
home, err := os.UserHomeDir()
if err != nil {
return "", fmt.Errorf("resolve daemon config path: %w", err)
}
return filepath.Join(home, DefaultDaemonConfigPath), nil
}
// LoadPersistedConfig reads the daemon config from disk.
func LoadPersistedConfig(path string) (PersistedConfig, error) {
data, err := os.ReadFile(path)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return PersistedConfig{}, nil
}
return PersistedConfig{}, fmt.Errorf("read daemon config: %w", err)
}
var cfg PersistedConfig
if err := json.Unmarshal(data, &cfg); err != nil {
return PersistedConfig{}, fmt.Errorf("parse daemon config: %w", err)
}
return cfg, nil
}
// SavePersistedConfig writes the daemon config to disk.
func SavePersistedConfig(path string, cfg PersistedConfig) error {
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
return fmt.Errorf("create daemon config directory: %w", err)
}
data, err := json.MarshalIndent(cfg, "", " ")
if err != nil {
return fmt.Errorf("encode daemon config: %w", err)
}
if err := os.WriteFile(path, append(data, '\n'), 0o600); err != nil {
return fmt.Errorf("write daemon config: %w", err)
}
return nil
}

View file

@ -0,0 +1,325 @@
package daemon
import (
"context"
"fmt"
"log"
"strings"
"time"
"github.com/multica-ai/multica/server/pkg/agent"
)
// Daemon is the local agent runtime that polls for and executes tasks.
type Daemon struct {
cfg Config
client *Client
logger *log.Logger
}
// New creates a new Daemon instance.
func New(cfg Config, logger *log.Logger) *Daemon {
return &Daemon{
cfg: cfg,
client: NewClient(cfg.ServerBaseURL),
logger: logger,
}
}
// Run starts the daemon: pairs if needed, registers runtimes, then polls for tasks.
func (d *Daemon) Run(ctx context.Context) error {
agentNames := make([]string, 0, len(d.cfg.Agents))
for name := range d.cfg.Agents {
agentNames = append(agentNames, name)
}
d.logger.Printf("starting daemon agents=%v workspace=%s server=%s repos_root=%s",
agentNames, d.cfg.WorkspaceID, d.cfg.ServerBaseURL, d.cfg.ReposRoot)
if strings.TrimSpace(d.cfg.WorkspaceID) == "" {
workspaceID, err := d.ensurePaired(ctx)
if err != nil {
return err
}
d.cfg.WorkspaceID = workspaceID
d.logger.Printf("pairing completed for workspace=%s", workspaceID)
}
runtimes, err := d.registerRuntimes(ctx)
if err != nil {
return err
}
runtimeIDs := make([]string, 0, len(runtimes))
for _, rt := range runtimes {
d.logger.Printf("registered runtime id=%s provider=%s status=%s", rt.ID, rt.Provider, rt.Status)
runtimeIDs = append(runtimeIDs, rt.ID)
}
go d.heartbeatLoop(ctx, runtimeIDs)
return d.pollLoop(ctx, runtimeIDs)
}
func (d *Daemon) registerRuntimes(ctx context.Context) ([]Runtime, error) {
var runtimes []map[string]string
for name, entry := range d.cfg.Agents {
version, err := agent.DetectVersion(ctx, entry.Path)
if err != nil {
d.logger.Printf("skip registering %s: %v", name, err)
continue
}
runtimes = append(runtimes, map[string]string{
"name": fmt.Sprintf("Local %s", strings.ToUpper(name[:1])+name[1:]),
"type": name,
"version": version,
"status": "online",
})
}
if len(runtimes) == 0 {
return nil, fmt.Errorf("no agent runtimes could be registered")
}
req := map[string]any{
"workspace_id": d.cfg.WorkspaceID,
"daemon_id": d.cfg.DaemonID,
"device_name": d.cfg.DeviceName,
"runtimes": runtimes,
}
var resp struct {
Runtimes []Runtime `json:"runtimes"`
}
if err := d.client.postJSON(ctx, "/api/daemon/register", req, &resp); err != nil {
return nil, fmt.Errorf("register runtimes: %w", err)
}
if len(resp.Runtimes) == 0 {
return nil, fmt.Errorf("register runtimes: empty response")
}
return resp.Runtimes, nil
}
func (d *Daemon) ensurePaired(ctx context.Context) (string, error) {
// Use a deterministic agent for the pairing session metadata (prefer codex for backward compat).
var firstName string
var firstEntry AgentEntry
for _, preferred := range []string{"codex", "claude"} {
if entry, ok := d.cfg.Agents[preferred]; ok {
firstName = preferred
firstEntry = entry
break
}
}
version, err := agent.DetectVersion(ctx, firstEntry.Path)
if err != nil {
return "", err
}
session, err := d.client.CreatePairingSession(ctx, map[string]string{
"daemon_id": d.cfg.DaemonID,
"device_name": d.cfg.DeviceName,
"runtime_name": d.cfg.RuntimeName,
"runtime_type": firstName,
"runtime_version": version,
})
if err != nil {
return "", fmt.Errorf("create pairing session: %w", err)
}
if session.LinkURL != nil {
d.logger.Printf("open this link to pair the daemon: %s", *session.LinkURL)
} else {
d.logger.Printf("pairing session created: %s", session.Token)
}
for {
select {
case <-ctx.Done():
return "", ctx.Err()
default:
}
current, err := d.client.GetPairingSession(ctx, session.Token)
if err != nil {
return "", fmt.Errorf("poll pairing session: %w", err)
}
switch current.Status {
case "approved", "claimed":
if current.WorkspaceID == nil || strings.TrimSpace(*current.WorkspaceID) == "" {
return "", fmt.Errorf("pairing session approved without workspace")
}
if err := SavePersistedConfig(d.cfg.ConfigPath, PersistedConfig{
WorkspaceID: strings.TrimSpace(*current.WorkspaceID),
}); err != nil {
return "", err
}
if current.Status != "claimed" {
if _, err := d.client.ClaimPairingSession(ctx, current.Token); err != nil {
return "", fmt.Errorf("claim pairing session: %w", err)
}
}
return strings.TrimSpace(*current.WorkspaceID), nil
case "expired":
return "", fmt.Errorf("pairing session expired before approval")
}
if err := SleepWithContext(ctx, d.cfg.PollInterval); err != nil {
return "", err
}
}
}
func (d *Daemon) heartbeatLoop(ctx context.Context, runtimeIDs []string) {
ticker := time.NewTicker(d.cfg.HeartbeatInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
for _, rid := range runtimeIDs {
err := d.client.postJSON(ctx, "/api/daemon/heartbeat", map[string]string{
"runtime_id": rid,
}, nil)
if err != nil {
d.logger.Printf("heartbeat failed for runtime %s: %v", rid, err)
}
}
}
}
}
func (d *Daemon) pollLoop(ctx context.Context, runtimeIDs []string) error {
pollOffset := 0
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
claimed := false
n := len(runtimeIDs)
for i := 0; i < n; i++ {
rid := runtimeIDs[(pollOffset+i)%n]
task, err := d.client.ClaimTask(ctx, rid)
if err != nil {
d.logger.Printf("claim task failed for runtime %s: %v", rid, err)
continue
}
if task != nil {
d.logger.Printf("poll: got task=%s issue=%s title=%q", task.ID, task.IssueID, task.Context.Issue.Title)
d.handleTask(ctx, *task)
claimed = true
pollOffset = (pollOffset + i + 1) % n
break
}
}
if !claimed {
pollOffset = (pollOffset + 1) % n
if err := SleepWithContext(ctx, d.cfg.PollInterval); err != nil {
return err
}
}
}
}
func (d *Daemon) handleTask(ctx context.Context, task Task) {
provider := task.Context.Runtime.Provider
d.logger.Printf("picked task=%s issue=%s provider=%s title=%q", task.ID, task.IssueID, provider, task.Context.Issue.Title)
if err := d.client.StartTask(ctx, task.ID); err != nil {
d.logger.Printf("start task %s failed: %v", task.ID, err)
return
}
_ = d.client.ReportProgress(ctx, task.ID, fmt.Sprintf("Launching %s", provider), 1, 2)
result, err := d.runTask(ctx, task)
if err != nil {
d.logger.Printf("task %s failed: %v", task.ID, err)
if failErr := d.client.FailTask(ctx, task.ID, err.Error()); failErr != nil {
d.logger.Printf("fail task %s callback failed: %v", task.ID, failErr)
}
return
}
_ = d.client.ReportProgress(ctx, task.ID, "Finishing task", 2, 2)
switch result.Status {
case "blocked":
if err := d.client.FailTask(ctx, task.ID, result.Comment); err != nil {
d.logger.Printf("report blocked task %s failed: %v", task.ID, err)
}
default:
if err := d.client.CompleteTask(ctx, task.ID, result.Comment); err != nil {
d.logger.Printf("complete task %s failed: %v", task.ID, err)
}
}
}
func (d *Daemon) runTask(ctx context.Context, task Task) (TaskResult, error) {
provider := task.Context.Runtime.Provider
entry, ok := d.cfg.Agents[provider]
if !ok {
return TaskResult{}, fmt.Errorf("no agent configured for provider %q", provider)
}
workdir, err := ResolveTaskWorkdir(d.cfg.ReposRoot, task.Context.Issue.Repository)
if err != nil {
return TaskResult{}, err
}
prompt := BuildPrompt(task, workdir)
backend, err := agent.New(provider, agent.Config{
ExecutablePath: entry.Path,
Logger: d.logger,
})
if err != nil {
return TaskResult{}, fmt.Errorf("create agent backend: %w", err)
}
d.logger.Printf(
"starting %s task=%s workdir=%s model=%s timeout=%s",
provider, task.ID, workdir, entry.Model, d.cfg.AgentTimeout,
)
session, err := backend.Execute(ctx, prompt, agent.ExecOptions{
Cwd: workdir,
Model: entry.Model,
Timeout: d.cfg.AgentTimeout,
})
if err != nil {
return TaskResult{}, err
}
// Drain message channel (log tool uses, ignore text since Result has output)
go func() {
for msg := range session.Messages {
switch msg.Type {
case agent.MessageToolUse:
d.logger.Printf("[%s] tool-use: %s (call=%s)", provider, msg.Tool, msg.CallID)
case agent.MessageError:
d.logger.Printf("[%s] error: %s", provider, msg.Content)
}
}
}()
result := <-session.Result
switch result.Status {
case "completed":
if result.Output == "" {
return TaskResult{}, fmt.Errorf("%s returned empty output", provider)
}
return TaskResult{Status: "completed", Comment: result.Output}, nil
case "timeout":
return TaskResult{}, fmt.Errorf("%s timed out after %s", provider, d.cfg.AgentTimeout)
default:
errMsg := result.Error
if errMsg == "" {
errMsg = fmt.Sprintf("%s execution %s", provider, result.Status)
}
return TaskResult{Status: "blocked", Comment: errMsg}, nil
}
}

View file

@ -1,4 +1,4 @@
package main
package daemon
import (
"os"
@ -10,9 +10,9 @@ import (
func TestNormalizeServerBaseURL(t *testing.T) {
t.Parallel()
got, err := normalizeServerBaseURL("ws://localhost:8080/ws")
got, err := NormalizeServerBaseURL("ws://localhost:8080/ws")
if err != nil {
t.Fatalf("normalizeServerBaseURL returned error: %v", err)
t.Fatalf("NormalizeServerBaseURL returned error: %v", err)
}
if got != "http://localhost:8080" {
t.Fatalf("expected http://localhost:8080, got %s", got)
@ -28,9 +28,9 @@ func TestResolveTaskWorkdirUsesRepoPathWhenPresent(t *testing.T) {
t.Fatalf("mkdir repo: %v", err)
}
got, err := resolveTaskWorkdir(root, &daemonRepoRef{Path: "repo"})
got, err := ResolveTaskWorkdir(root, &RepoRef{Path: "repo"})
if err != nil {
t.Fatalf("resolveTaskWorkdir returned error: %v", err)
t.Fatalf("ResolveTaskWorkdir returned error: %v", err)
}
if got != repoPath {
t.Fatalf("expected %s, got %s", repoPath, got)
@ -40,15 +40,15 @@ func TestResolveTaskWorkdirUsesRepoPathWhenPresent(t *testing.T) {
func TestBuildPromptIncludesIssueAndSkills(t *testing.T) {
t.Parallel()
prompt := buildPrompt(daemonTask{
Context: daemonTaskContext{
Issue: daemonIssueContext{
prompt := BuildPrompt(Task{
Context: TaskContext{
Issue: IssueContext{
Title: "Fix failing test",
Description: "Investigate and fix the test failure.",
AcceptanceCriteria: []string{"tests pass"},
ContextRefs: []string{"log snippet"},
},
Agent: daemonAgentContext{
Agent: AgentContext{
Name: "Local Codex",
Skills: "Be concise.",
},

View file

@ -0,0 +1,46 @@
package daemon
import (
"context"
"fmt"
"os"
"strings"
"time"
)
// EnvOrDefault returns the trimmed value of the environment variable key,
// falling back to fallback if empty.
func EnvOrDefault(key, fallback string) string {
value := strings.TrimSpace(os.Getenv(key))
if value == "" {
return fallback
}
return value
}
// DurationFromEnv parses a duration from an environment variable,
// returning fallback if the variable is empty.
func DurationFromEnv(key string, fallback time.Duration) (time.Duration, error) {
value := strings.TrimSpace(os.Getenv(key))
if value == "" {
return fallback, nil
}
d, err := time.ParseDuration(value)
if err != nil {
return 0, fmt.Errorf("%s: invalid duration %q: %w", key, value, err)
}
return d, nil
}
// SleepWithContext blocks for the given duration or until the context is cancelled.
func SleepWithContext(ctx context.Context, d time.Duration) error {
timer := time.NewTimer(d)
defer timer.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
return nil
}
}

View file

@ -0,0 +1,93 @@
package daemon
import (
"fmt"
"os"
"path/filepath"
"strings"
)
// BuildPrompt constructs the task prompt for an agent CLI.
func BuildPrompt(task Task, workdir string) string {
var b strings.Builder
b.WriteString("You are running as a local coding agent for a Multica workspace.\n")
b.WriteString("Complete the assigned issue using the local environment.\n")
b.WriteString("Return a concise Markdown comment suitable for posting back to the issue.\n")
b.WriteString("If you cannot complete the task because context, files, or permissions are missing, return status \"blocked\" and explain the blocker in the comment.\n\n")
fmt.Fprintf(&b, "Working directory: %s\n", workdir)
fmt.Fprintf(&b, "Agent: %s\n", task.Context.Agent.Name)
fmt.Fprintf(&b, "Issue title: %s\n\n", task.Context.Issue.Title)
if task.Context.Issue.Description != "" {
b.WriteString("Issue description:\n")
b.WriteString(task.Context.Issue.Description)
b.WriteString("\n\n")
}
if len(task.Context.Issue.AcceptanceCriteria) > 0 {
b.WriteString("Acceptance criteria:\n")
for _, item := range task.Context.Issue.AcceptanceCriteria {
fmt.Fprintf(&b, "- %s\n", item)
}
b.WriteString("\n")
}
if len(task.Context.Issue.ContextRefs) > 0 {
b.WriteString("Context refs:\n")
for _, item := range task.Context.Issue.ContextRefs {
fmt.Fprintf(&b, "- %s\n", item)
}
b.WriteString("\n")
}
if repo := task.Context.Issue.Repository; repo != nil {
b.WriteString("Repository context:\n")
if repo.URL != "" {
fmt.Fprintf(&b, "- url: %s\n", repo.URL)
}
if repo.Branch != "" {
fmt.Fprintf(&b, "- branch: %s\n", repo.Branch)
}
if repo.Path != "" {
fmt.Fprintf(&b, "- path: %s\n", repo.Path)
}
b.WriteString("\n")
}
if task.Context.Agent.Skills != "" {
b.WriteString("Agent skills/instructions:\n")
b.WriteString(task.Context.Agent.Skills)
b.WriteString("\n\n")
}
b.WriteString("Comment requirements:\n")
b.WriteString("- Lead with the outcome.\n")
b.WriteString("- Mention concrete files or commands if you changed anything.\n")
b.WriteString("- Mention blockers or follow-up actions if relevant.\n")
return b.String()
}
// ResolveTaskWorkdir determines the working directory for a task.
func ResolveTaskWorkdir(reposRoot string, repo *RepoRef) (string, error) {
base := reposRoot
if repo == nil || strings.TrimSpace(repo.Path) == "" {
return base, nil
}
path := strings.TrimSpace(repo.Path)
if !filepath.IsAbs(path) {
path = filepath.Join(base, path)
}
path = filepath.Clean(path)
info, err := os.Stat(path)
if err != nil {
return "", fmt.Errorf("repository path not found: %s", path)
}
if !info.IsDir() {
return "", fmt.Errorf("repository path is not a directory: %s", path)
}
return path, nil
}

View file

@ -0,0 +1,89 @@
package daemon
// AgentEntry describes a single available agent CLI.
type AgentEntry struct {
Path string // path to CLI binary
Model string // model override (optional)
}
// Runtime represents a registered daemon runtime.
type Runtime struct {
ID string `json:"id"`
Name string `json:"name"`
Provider string `json:"provider"`
Status string `json:"status"`
}
// PairingSession represents a daemon pairing session.
type PairingSession struct {
Token string `json:"token"`
DaemonID string `json:"daemon_id"`
DeviceName string `json:"device_name"`
RuntimeName string `json:"runtime_name"`
RuntimeType string `json:"runtime_type"`
RuntimeVersion string `json:"runtime_version"`
WorkspaceID *string `json:"workspace_id"`
Status string `json:"status"`
ApprovedAt *string `json:"approved_at"`
ClaimedAt *string `json:"claimed_at"`
ExpiresAt string `json:"expires_at"`
LinkURL *string `json:"link_url"`
}
// PersistedConfig is the JSON structure saved to ~/.multica/daemon.json.
type PersistedConfig struct {
WorkspaceID string `json:"workspace_id"`
}
// Task represents a claimed task from the server.
type Task struct {
ID string `json:"id"`
AgentID string `json:"agent_id"`
IssueID string `json:"issue_id"`
Context TaskContext `json:"context"`
}
// TaskContext contains the snapshot context for a task.
type TaskContext struct {
Issue IssueContext `json:"issue"`
Agent AgentContext `json:"agent"`
Runtime RuntimeContext `json:"runtime"`
}
// IssueContext holds issue details for task execution.
type IssueContext struct {
ID string `json:"id"`
Title string `json:"title"`
Description string `json:"description"`
AcceptanceCriteria []string `json:"acceptance_criteria"`
ContextRefs []string `json:"context_refs"`
Repository *RepoRef `json:"repository"`
}
// AgentContext holds agent details for task execution.
type AgentContext struct {
ID string `json:"id"`
Name string `json:"name"`
Skills string `json:"skills"`
}
// RuntimeContext holds runtime details for task execution.
type RuntimeContext struct {
ID string `json:"id"`
Name string `json:"name"`
Provider string `json:"provider"`
DeviceInfo string `json:"device_info"`
}
// RepoRef points to a repository for an issue.
type RepoRef struct {
URL string `json:"url"`
Branch string `json:"branch"`
Path string `json:"path"`
}
// TaskResult is the outcome of executing a task.
type TaskResult struct {
Status string `json:"status"`
Comment string `json:"comment"`
}