diff --git a/server/cmd/daemon/daemon.go b/server/cmd/daemon/daemon.go index 3723d338..e4008efb 100644 --- a/server/cmd/daemon/daemon.go +++ b/server/cmd/daemon/daemon.go @@ -15,6 +15,8 @@ import ( "path/filepath" "strings" "time" + + "github.com/multica-ai/multica/server/pkg/agent" ) const ( @@ -22,11 +24,16 @@ const ( defaultDaemonConfigPath = ".multica/daemon.json" defaultPollInterval = 3 * time.Second defaultHeartbeatInterval = 15 * time.Second - defaultCodexTimeout = 20 * time.Minute - defaultRuntimeName = "Local Codex" - defaultCodexPath = "codex" + defaultAgentTimeout = 20 * time.Minute + defaultRuntimeName = "Local Agent" ) +// agentEntry describes a single available agent CLI. +type agentEntry struct { + Path string // path to CLI binary + Model string // model override (optional) +} + type config struct { ServerBaseURL string ConfigPath string @@ -34,12 +41,11 @@ type config struct { DaemonID string DeviceName string RuntimeName string - CodexPath string - CodexModel string + Agents map[string]agentEntry // "claude" -> entry, "codex" -> entry DefaultWorkdir string PollInterval time.Duration HeartbeatInterval time.Duration - CodexTimeout time.Duration + AgentTimeout time.Duration } type daemon struct { @@ -120,7 +126,7 @@ type daemonRepoRef struct { Path string `json:"path"` } -type codexTaskResult struct { +type taskResult struct { Status string `json:"status"` Comment string `json:"comment"` } @@ -144,9 +150,24 @@ func loadConfig() (config, error) { workspaceID = persisted.WorkspaceID } - codexPath := envOrDefault("MULTICA_CODEX_PATH", defaultCodexPath) - if _, err := exec.LookPath(codexPath); err != nil { - return config{}, fmt.Errorf("codex executable not found at %q: %w", codexPath, err) + // Probe available agent CLIs. + agents := map[string]agentEntry{} + claudePath := envOrDefault("MULTICA_CLAUDE_PATH", "claude") + if _, err := exec.LookPath(claudePath); err == nil { + agents["claude"] = agentEntry{ + Path: claudePath, + Model: strings.TrimSpace(os.Getenv("MULTICA_CLAUDE_MODEL")), + } + } + codexPath := envOrDefault("MULTICA_CODEX_PATH", "codex") + if _, err := exec.LookPath(codexPath); err == nil { + agents["codex"] = agentEntry{ + Path: codexPath, + Model: strings.TrimSpace(os.Getenv("MULTICA_CODEX_MODEL")), + } + } + if len(agents) == 0 { + return config{}, fmt.Errorf("no agent CLI found: install claude or codex and ensure it is on PATH") } host, err := os.Hostname() @@ -154,7 +175,7 @@ func loadConfig() (config, error) { host = "local-machine" } - defaultWorkdir := strings.TrimSpace(os.Getenv("MULTICA_CODEX_WORKDIR")) + defaultWorkdir := strings.TrimSpace(os.Getenv("MULTICA_AGENT_WORKDIR")) if defaultWorkdir == "" { defaultWorkdir, err = os.Getwd() if err != nil { @@ -174,7 +195,7 @@ func loadConfig() (config, error) { if err != nil { return config{}, err } - codexTimeout, err := durationFromEnv("MULTICA_CODEX_TIMEOUT", defaultCodexTimeout) + agentTimeout, err := durationFromEnv("MULTICA_AGENT_TIMEOUT", defaultAgentTimeout) if err != nil { return config{}, err } @@ -185,13 +206,12 @@ func loadConfig() (config, error) { WorkspaceID: workspaceID, DaemonID: envOrDefault("MULTICA_DAEMON_ID", host), DeviceName: envOrDefault("MULTICA_DAEMON_DEVICE_NAME", host), - RuntimeName: envOrDefault("MULTICA_CODEX_RUNTIME_NAME", defaultRuntimeName), - CodexPath: codexPath, - CodexModel: strings.TrimSpace(os.Getenv("MULTICA_CODEX_MODEL")), + RuntimeName: envOrDefault("MULTICA_AGENT_RUNTIME_NAME", defaultRuntimeName), + Agents: agents, DefaultWorkdir: defaultWorkdir, PollInterval: pollInterval, HeartbeatInterval: heartbeatInterval, - CodexTimeout: codexTimeout, + AgentTimeout: agentTimeout, }, nil } @@ -204,8 +224,12 @@ func newDaemon(cfg config, logger *log.Logger) *daemon { } func (d *daemon) run(ctx context.Context) error { - d.logger.Printf("starting daemon for workspace=%s server=%s runtime=%s workdir=%s", - d.cfg.WorkspaceID, d.cfg.ServerBaseURL, d.cfg.RuntimeName, d.cfg.DefaultWorkdir) + agentNames := make([]string, 0, len(d.cfg.Agents)) + for name := range d.cfg.Agents { + agentNames = append(agentNames, name) + } + d.logger.Printf("starting daemon agents=%v workspace=%s server=%s workdir=%s", + agentNames, d.cfg.WorkspaceID, d.cfg.ServerBaseURL, d.cfg.DefaultWorkdir) if strings.TrimSpace(d.cfg.WorkspaceID) == "" { workspaceID, err := d.ensurePaired(ctx) @@ -216,50 +240,68 @@ func (d *daemon) run(ctx context.Context) error { d.logger.Printf("pairing completed for workspace=%s", workspaceID) } - runtime, err := d.registerRuntime(ctx) + runtimes, err := d.registerRuntimes(ctx) if err != nil { return err } - d.logger.Printf("registered runtime id=%s provider=%s status=%s", runtime.ID, runtime.Provider, runtime.Status) + runtimeIDs := make([]string, 0, len(runtimes)) + for _, rt := range runtimes { + d.logger.Printf("registered runtime id=%s provider=%s status=%s", rt.ID, rt.Provider, rt.Status) + runtimeIDs = append(runtimeIDs, rt.ID) + } - go d.heartbeatLoop(ctx, runtime.ID) - return d.pollLoop(ctx, runtime.ID) + go d.heartbeatLoop(ctx, runtimeIDs) + return d.pollLoop(ctx, runtimeIDs) } -func (d *daemon) registerRuntime(ctx context.Context) (daemonRuntime, error) { - version, err := detectCodexVersion(ctx, d.cfg.CodexPath) - if err != nil { - return daemonRuntime{}, err +func (d *daemon) registerRuntimes(ctx context.Context) ([]daemonRuntime, error) { + var runtimes []map[string]string + for name, entry := range d.cfg.Agents { + version, err := agent.DetectVersion(ctx, entry.Path) + if err != nil { + d.logger.Printf("skip registering %s: %v", name, err) + continue + } + runtimes = append(runtimes, map[string]string{ + "name": fmt.Sprintf("Local %s", strings.Title(name)), + "type": name, + "version": version, + "status": "online", + }) + } + if len(runtimes) == 0 { + return nil, fmt.Errorf("no agent runtimes could be registered") } req := map[string]any{ "workspace_id": d.cfg.WorkspaceID, "daemon_id": d.cfg.DaemonID, "device_name": d.cfg.DeviceName, - "runtimes": []map[string]string{ - { - "name": d.cfg.RuntimeName, - "type": "codex", - "version": version, - "status": "online", - }, - }, + "runtimes": runtimes, } var resp struct { Runtimes []daemonRuntime `json:"runtimes"` } if err := d.client.postJSON(ctx, "/api/daemon/register", req, &resp); err != nil { - return daemonRuntime{}, fmt.Errorf("register runtime: %w", err) + return nil, fmt.Errorf("register runtimes: %w", err) } if len(resp.Runtimes) == 0 { - return daemonRuntime{}, fmt.Errorf("register runtime: empty response") + return nil, fmt.Errorf("register runtimes: empty response") } - return resp.Runtimes[0], nil + return resp.Runtimes, nil } func (d *daemon) ensurePaired(ctx context.Context) (string, error) { - version, err := detectCodexVersion(ctx, d.cfg.CodexPath) + // Use the first available agent for the pairing session metadata. + var firstName string + var firstEntry agentEntry + for name, entry := range d.cfg.Agents { + firstName = name + firstEntry = entry + break + } + version, err := agent.DetectVersion(ctx, firstEntry.Path) if err != nil { return "", err } @@ -268,14 +310,14 @@ func (d *daemon) ensurePaired(ctx context.Context) (string, error) { "daemon_id": d.cfg.DaemonID, "device_name": d.cfg.DeviceName, "runtime_name": d.cfg.RuntimeName, - "runtime_type": "codex", + "runtime_type": firstName, "runtime_version": version, }) if err != nil { return "", fmt.Errorf("create pairing session: %w", err) } if session.LinkURL != nil { - d.logger.Printf("open this link to pair the local Codex runtime: %s", *session.LinkURL) + d.logger.Printf("open this link to pair the daemon: %s", *session.LinkURL) } else { d.logger.Printf("pairing session created: %s", session.Token) } @@ -318,7 +360,7 @@ func (d *daemon) ensurePaired(ctx context.Context) (string, error) { } } -func (d *daemon) heartbeatLoop(ctx context.Context, runtimeID string) { +func (d *daemon) heartbeatLoop(ctx context.Context, runtimeIDs []string) { ticker := time.NewTicker(d.cfg.HeartbeatInterval) defer ticker.Stop() @@ -327,17 +369,19 @@ func (d *daemon) heartbeatLoop(ctx context.Context, runtimeID string) { case <-ctx.Done(): return case <-ticker.C: - err := d.client.postJSON(ctx, "/api/daemon/heartbeat", map[string]string{ - "runtime_id": runtimeID, - }, nil) - if err != nil { - d.logger.Printf("heartbeat failed: %v", err) + for _, rid := range runtimeIDs { + err := d.client.postJSON(ctx, "/api/daemon/heartbeat", map[string]string{ + "runtime_id": rid, + }, nil) + if err != nil { + d.logger.Printf("heartbeat failed for runtime %s: %v", rid, err) + } } } } } -func (d *daemon) pollLoop(ctx context.Context, runtimeID string) error { +func (d *daemon) pollLoop(ctx context.Context, runtimeIDs []string) error { for { select { case <-ctx.Done(): @@ -345,34 +389,39 @@ func (d *daemon) pollLoop(ctx context.Context, runtimeID string) error { default: } - task, err := d.client.claimTask(ctx, runtimeID) - if err != nil { - d.logger.Printf("claim task failed: %v", err) - if err := sleepWithContext(ctx, d.cfg.PollInterval); err != nil { - return err + claimed := false + for _, rid := range runtimeIDs { + task, err := d.client.claimTask(ctx, rid) + if err != nil { + d.logger.Printf("claim task failed for runtime %s: %v", rid, err) + continue } - continue - } - if task == nil { - if err := sleepWithContext(ctx, d.cfg.PollInterval); err != nil { - return err + if task != nil { + d.logger.Printf("poll: got task=%s issue=%s title=%q", task.ID, task.IssueID, task.Context.Issue.Title) + d.handleTask(ctx, *task) + claimed = true + break } - continue } - d.handleTask(ctx, *task) + if !claimed { + if err := sleepWithContext(ctx, d.cfg.PollInterval); err != nil { + return err + } + } } } func (d *daemon) handleTask(ctx context.Context, task daemonTask) { - d.logger.Printf("picked task=%s issue=%s title=%q", task.ID, task.IssueID, task.Context.Issue.Title) + provider := task.Context.Runtime.Provider + d.logger.Printf("picked task=%s issue=%s provider=%s title=%q", task.ID, task.IssueID, provider, task.Context.Issue.Title) if err := d.client.startTask(ctx, task.ID); err != nil { d.logger.Printf("start task %s failed: %v", task.ID, err) return } - _ = d.client.reportProgress(ctx, task.ID, "Launching Codex", 1, 2) + _ = d.client.reportProgress(ctx, task.ID, fmt.Sprintf("Launching %s", provider), 1, 2) result, err := d.runTask(ctx, task) if err != nil { @@ -397,119 +446,76 @@ func (d *daemon) handleTask(ctx context.Context, task daemonTask) { } } -func (d *daemon) runTask(ctx context.Context, task daemonTask) (codexTaskResult, error) { +func (d *daemon) runTask(ctx context.Context, task daemonTask) (taskResult, error) { + provider := task.Context.Runtime.Provider + entry, ok := d.cfg.Agents[provider] + if !ok { + return taskResult{}, fmt.Errorf("no agent configured for provider %q", provider) + } + workdir, err := resolveTaskWorkdir(d.cfg.DefaultWorkdir, task.Context.Issue.Repository) if err != nil { - return codexTaskResult{}, err + return taskResult{}, err } - prompt := buildCodexPrompt(task, workdir) - runCtx, cancel := context.WithTimeout(ctx, d.cfg.CodexTimeout) - defer cancel() + prompt := buildPrompt(task, workdir) - model := d.cfg.CodexModel - if model == "" { - model = "default" + backend, err := agent.New(provider, agent.Config{ + ExecutablePath: entry.Path, + Logger: d.logger, + }) + if err != nil { + return taskResult{}, fmt.Errorf("create agent backend: %w", err) } - startedAt := time.Now() d.logger.Printf( - "starting codex exec task=%s workdir=%s model=%s timeout=%s", - task.ID, - workdir, - model, - d.cfg.CodexTimeout, + "starting %s task=%s workdir=%s model=%s timeout=%s", + provider, task.ID, workdir, entry.Model, d.cfg.AgentTimeout, ) - result, err := runCodexExec(runCtx, d.cfg, workdir, prompt) + session, err := backend.Execute(ctx, prompt, agent.ExecOptions{ + Cwd: workdir, + Model: entry.Model, + Timeout: d.cfg.AgentTimeout, + }) if err != nil { - d.logger.Printf( - "codex exec failed task=%s duration=%s err=%v", - task.ID, - time.Since(startedAt).Round(time.Millisecond), - err, - ) - if errors.Is(runCtx.Err(), context.DeadlineExceeded) { - return codexTaskResult{}, fmt.Errorf("Codex timed out after %s", d.cfg.CodexTimeout) + return taskResult{}, err + } + + // Drain message channel (log tool uses, ignore text since Result has output) + go func() { + for msg := range session.Messages { + switch msg.Type { + case agent.MessageToolUse: + d.logger.Printf("[%s] tool-use: %s (call=%s)", provider, msg.Tool, msg.CallID) + case agent.MessageError: + d.logger.Printf("[%s] error: %s", provider, msg.Content) + } } - return codexTaskResult{}, err - } + }() - d.logger.Printf( - "codex exec finished task=%s duration=%s status=%s", - task.ID, - time.Since(startedAt).Round(time.Millisecond), - result.Status, - ) - return result, nil + result := <-session.Result + + switch result.Status { + case "completed": + if result.Output == "" { + return taskResult{}, fmt.Errorf("%s returned empty output", provider) + } + return taskResult{Status: "completed", Comment: result.Output}, nil + case "timeout": + return taskResult{}, fmt.Errorf("%s timed out after %s", provider, d.cfg.AgentTimeout) + default: + errMsg := result.Error + if errMsg == "" { + errMsg = fmt.Sprintf("%s execution %s", provider, result.Status) + } + return taskResult{Status: "blocked", Comment: errMsg}, nil + } } -func runCodexExec(ctx context.Context, cfg config, workdir, prompt string) (codexTaskResult, error) { - outputFile, err := os.CreateTemp("", "multica-codex-output-*.json") - if err != nil { - return codexTaskResult{}, fmt.Errorf("create codex output file: %w", err) - } - outputPath := outputFile.Name() - outputFile.Close() - defer os.Remove(outputPath) - - schemaFile, err := os.CreateTemp("", "multica-codex-schema-*.json") - if err != nil { - return codexTaskResult{}, fmt.Errorf("create schema file: %w", err) - } - schemaPath := schemaFile.Name() - if _, err := schemaFile.WriteString(codexResultSchema); err != nil { - schemaFile.Close() - return codexTaskResult{}, fmt.Errorf("write schema file: %w", err) - } - schemaFile.Close() - defer os.Remove(schemaPath) - - args := []string{ - "-a", "never", - "exec", - "--skip-git-repo-check", - "--sandbox", "workspace-write", - "-C", workdir, - "--output-schema", schemaPath, - "-o", outputPath, - prompt, - } - if cfg.CodexModel != "" { - args = append([]string{"-m", cfg.CodexModel}, args...) - } - - cmd := exec.CommandContext(ctx, cfg.CodexPath, args...) - var output bytes.Buffer - cmd.Stdout = &output - cmd.Stderr = &output - - if err := cmd.Run(); err != nil { - return codexTaskResult{}, fmt.Errorf("codex exec failed: %w\n%s", err, strings.TrimSpace(output.String())) - } - - data, err := os.ReadFile(outputPath) - if err != nil { - return codexTaskResult{}, fmt.Errorf("read codex result: %w", err) - } - - var result codexTaskResult - if err := json.Unmarshal(data, &result); err != nil { - return codexTaskResult{}, fmt.Errorf("parse codex result: %w", err) - } - if result.Comment == "" { - return codexTaskResult{}, fmt.Errorf("codex returned empty comment") - } - if result.Status == "" { - result.Status = "completed" - } - - return result, nil -} - -func buildCodexPrompt(task daemonTask, workdir string) string { +func buildPrompt(task daemonTask, workdir string) string { var b strings.Builder - b.WriteString("You are running as the local Codex runtime for a Multica agent.\n") + b.WriteString("You are running as a local coding agent for a Multica workspace.\n") b.WriteString("Complete the assigned issue using the local environment.\n") b.WriteString("Return a concise Markdown comment suitable for posting back to the issue.\n") b.WriteString("If you cannot complete the task because context, files, or permissions are missing, return status \"blocked\" and explain the blocker in the comment.\n\n") @@ -590,15 +596,6 @@ func resolveTaskWorkdir(defaultWorkdir string, repo *daemonRepoRef) (string, err return path, nil } -func detectCodexVersion(ctx context.Context, codexPath string) (string, error) { - cmd := exec.CommandContext(ctx, codexPath, "--version") - data, err := cmd.Output() - if err != nil { - return "", fmt.Errorf("detect codex version: %w", err) - } - return strings.TrimSpace(string(data)), nil -} - func resolveDaemonConfigPath(raw string) (string, error) { if raw != "" { return filepath.Abs(raw) @@ -810,17 +807,3 @@ func (c *daemonClient) getJSON(ctx context.Context, path string, respBody any) e return json.NewDecoder(resp.Body).Decode(respBody) } -const codexResultSchema = `{ - "type": "object", - "properties": { - "status": { - "type": "string", - "enum": ["completed", "blocked"] - }, - "comment": { - "type": "string" - } - }, - "required": ["status", "comment"], - "additionalProperties": false -}` diff --git a/server/cmd/daemon/daemon_test.go b/server/cmd/daemon/daemon_test.go index a31be2ff..99abc056 100644 --- a/server/cmd/daemon/daemon_test.go +++ b/server/cmd/daemon/daemon_test.go @@ -40,7 +40,7 @@ func TestResolveTaskWorkdirUsesRepoPathWhenPresent(t *testing.T) { func TestBuildCodexPromptIncludesIssueAndSkills(t *testing.T) { t.Parallel() - prompt := buildCodexPrompt(daemonTask{ + prompt := buildPrompt(daemonTask{ Context: daemonTaskContext{ Issue: daemonIssueContext{ Title: "Fix failing test", diff --git a/server/pkg/agent/agent.go b/server/pkg/agent/agent.go new file mode 100644 index 00000000..96a73de4 --- /dev/null +++ b/server/pkg/agent/agent.go @@ -0,0 +1,99 @@ +// Package agent provides a unified interface for executing prompts via +// coding agents (Claude Code, Codex). It mirrors the happy-cli AgentBackend +// pattern, translated to idiomatic Go. +package agent + +import ( + "context" + "fmt" + "log" + "time" +) + +// Backend is the unified interface for executing prompts via coding agents. +type Backend interface { + // Execute runs a prompt and returns a Session for streaming results. + // The caller should read from Session.Messages (optional) and wait on + // Session.Result for the final outcome. + Execute(ctx context.Context, prompt string, opts ExecOptions) (*Session, error) +} + +// ExecOptions configures a single execution. +type ExecOptions struct { + Cwd string + Model string + SystemPrompt string + MaxTurns int + Timeout time.Duration +} + +// Session represents a running agent execution. +type Session struct { + // Messages streams events as the agent works. The channel is closed + // when the agent finishes (before Result is sent). + Messages <-chan Message + // Result receives exactly one value — the final outcome — then closes. + Result <-chan Result +} + +// MessageType identifies the kind of Message. +type MessageType string + +const ( + MessageText MessageType = "text" + MessageToolUse MessageType = "tool-use" + MessageToolResult MessageType = "tool-result" + MessageStatus MessageType = "status" + MessageError MessageType = "error" + MessageLog MessageType = "log" +) + +// Message is a unified event emitted by an agent during execution. +type Message struct { + Type MessageType + Content string // text content (Text, Error, Log) + Tool string // tool name (ToolUse, ToolResult) + CallID string // tool call ID (ToolUse, ToolResult) + Input map[string]any // tool input (ToolUse) + Output string // tool output (ToolResult) + Status string // agent status string (Status) + Level string // log level (Log) +} + +// Result is the final outcome after an agent session completes. +type Result struct { + Status string // "completed", "failed", "aborted", "timeout" + Output string // accumulated text output + Error string // error message if failed + DurationMs int64 + SessionID string +} + +// Config configures a Backend instance. +type Config struct { + ExecutablePath string // path to CLI binary (claude or codex) + Env map[string]string // extra environment variables + Logger *log.Logger +} + +// New creates a Backend for the given agent type. +// Supported types: "claude", "codex". +func New(agentType string, cfg Config) (Backend, error) { + if cfg.Logger == nil { + cfg.Logger = log.Default() + } + + switch agentType { + case "claude": + return &claudeBackend{cfg: cfg}, nil + case "codex": + return &codexBackend{cfg: cfg}, nil + default: + return nil, fmt.Errorf("unknown agent type: %q (supported: claude, codex)", agentType) + } +} + +// DetectVersion runs the agent CLI with --version and returns the output. +func DetectVersion(ctx context.Context, executablePath string) (string, error) { + return detectCLIVersion(ctx, executablePath) +} diff --git a/server/pkg/agent/claude.go b/server/pkg/agent/claude.go new file mode 100644 index 00000000..7749ffbc --- /dev/null +++ b/server/pkg/agent/claude.go @@ -0,0 +1,324 @@ +package agent + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "os" + "os/exec" + "strings" + "time" +) + +// claudeBackend implements Backend by spawning the Claude Code CLI +// with --output-format stream-json. +type claudeBackend struct { + cfg Config +} + +func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOptions) (*Session, error) { + execPath := b.cfg.ExecutablePath + if execPath == "" { + execPath = "claude" + } + if _, err := exec.LookPath(execPath); err != nil { + return nil, fmt.Errorf("claude executable not found at %q: %w", execPath, err) + } + + timeout := opts.Timeout + if timeout == 0 { + timeout = 20 * time.Minute + } + runCtx, cancel := context.WithTimeout(ctx, timeout) + + args := []string{ + "--output-format", "stream-json", + "--verbose", + "--permission-mode", "bypassPermissions", + } + if opts.Model != "" { + args = append(args, "--model", opts.Model) + } + if opts.MaxTurns > 0 { + args = append(args, "--max-turns", fmt.Sprintf("%d", opts.MaxTurns)) + } + if opts.SystemPrompt != "" { + args = append(args, "--append-system-prompt", opts.SystemPrompt) + } + args = append(args, "-p", prompt) + + cmd := exec.CommandContext(runCtx, execPath, args...) + if opts.Cwd != "" { + cmd.Dir = opts.Cwd + } + cmd.Env = buildEnv(b.cfg.Env) + + stdout, err := cmd.StdoutPipe() + if err != nil { + cancel() + return nil, fmt.Errorf("claude stdout pipe: %w", err) + } + stdin, err := cmd.StdinPipe() + if err != nil { + cancel() + return nil, fmt.Errorf("claude stdin pipe: %w", err) + } + cmd.Stderr = os.Stderr + + if err := cmd.Start(); err != nil { + cancel() + return nil, fmt.Errorf("start claude: %w", err) + } + + b.cfg.Logger.Printf("[claude] started pid=%d cwd=%s model=%s", cmd.Process.Pid, opts.Cwd, opts.Model) + + msgCh := make(chan Message, 64) + resCh := make(chan Result, 1) + + go func() { + defer cancel() + defer close(msgCh) + defer close(resCh) + defer stdin.Close() + + startTime := time.Now() + var output strings.Builder + var sessionID string + finalStatus := "completed" + var finalError string + + scanner := bufio.NewScanner(stdout) + scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024) + + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + + var msg claudeSDKMessage + if err := json.Unmarshal([]byte(line), &msg); err != nil { + continue + } + + switch msg.Type { + case "assistant": + b.handleAssistant(msg, msgCh, &output) + case "user": + b.handleUser(msg, msgCh) + case "system": + if msg.SessionID != "" { + sessionID = msg.SessionID + } + trySend(msgCh, Message{Type: MessageStatus, Status: "running"}) + case "result": + sessionID = msg.SessionID + if msg.ResultText != "" { + output.Reset() + output.WriteString(msg.ResultText) + } + if msg.IsError { + finalStatus = "failed" + finalError = msg.ResultText + } + case "log": + if msg.Log != nil { + trySend(msgCh, Message{ + Type: MessageLog, + Level: msg.Log.Level, + Content: msg.Log.Message, + }) + } + case "control_request": + b.handleControlRequest(msg, stdin) + } + } + + // Wait for process exit + exitErr := cmd.Wait() + duration := time.Since(startTime) + + if runCtx.Err() == context.DeadlineExceeded { + finalStatus = "timeout" + finalError = fmt.Sprintf("claude timed out after %s", timeout) + } else if runCtx.Err() == context.Canceled { + finalStatus = "aborted" + finalError = "execution cancelled" + } else if exitErr != nil && finalStatus == "completed" { + finalStatus = "failed" + finalError = fmt.Sprintf("claude exited with error: %v", exitErr) + } + + b.cfg.Logger.Printf("[claude] finished pid=%d status=%s duration=%s", + cmd.Process.Pid, finalStatus, duration.Round(time.Millisecond)) + + resCh <- Result{ + Status: finalStatus, + Output: output.String(), + Error: finalError, + DurationMs: duration.Milliseconds(), + SessionID: sessionID, + } + }() + + return &Session{Messages: msgCh, Result: resCh}, nil +} + +func (b *claudeBackend) handleAssistant(msg claudeSDKMessage, ch chan<- Message, output *strings.Builder) { + var content claudeMessageContent + if err := json.Unmarshal(msg.Message, &content); err != nil { + return + } + + for _, block := range content.Content { + switch block.Type { + case "text": + if block.Text != "" { + output.WriteString(block.Text) + trySend(ch, Message{Type: MessageText, Content: block.Text}) + } + case "tool_use": + var input map[string]any + if block.Input != nil { + _ = json.Unmarshal(block.Input, &input) + } + trySend(ch, Message{ + Type: MessageToolUse, + Tool: block.Name, + CallID: block.ID, + Input: input, + }) + } + } +} + +func (b *claudeBackend) handleUser(msg claudeSDKMessage, ch chan<- Message) { + var content claudeMessageContent + if err := json.Unmarshal(msg.Message, &content); err != nil { + return + } + + for _, block := range content.Content { + if block.Type == "tool_result" { + resultStr := "" + if block.Content != nil { + resultStr = string(block.Content) + } + trySend(ch, Message{ + Type: MessageToolResult, + CallID: block.ToolUseID, + Output: resultStr, + }) + } + } +} + +func (b *claudeBackend) handleControlRequest(msg claudeSDKMessage, stdin interface{ Write([]byte) (int, error) }) { + // Auto-approve all tool uses in autonomous/daemon mode. + var req claudeControlRequestPayload + if err := json.Unmarshal(msg.Request, &req); err != nil { + return + } + + var inputMap map[string]any + if req.Input != nil { + _ = json.Unmarshal(req.Input, &inputMap) + } + if inputMap == nil { + inputMap = map[string]any{} + } + + response := map[string]any{ + "type": "control_response", + "response": map[string]any{ + "subtype": "success", + "request_id": msg.RequestID, + "response": map[string]any{ + "behavior": "allow", + "updatedInput": inputMap, + }, + }, + } + + data, err := json.Marshal(response) + if err != nil { + return + } + data = append(data, '\n') + _, _ = stdin.Write(data) +} + +// ── Claude SDK JSON types ── + +type claudeSDKMessage struct { + Type string `json:"type"` + Message json.RawMessage `json:"message,omitempty"` + Subtype string `json:"subtype,omitempty"` + SessionID string `json:"session_id,omitempty"` + + // result fields + ResultText string `json:"result,omitempty"` + IsError bool `json:"is_error,omitempty"` + DurationMs float64 `json:"duration_ms,omitempty"` + NumTurns int `json:"num_turns,omitempty"` + + // log fields + Log *claudeLogEntry `json:"log,omitempty"` + + // control request fields + RequestID string `json:"request_id,omitempty"` + Request json.RawMessage `json:"request,omitempty"` +} + +type claudeLogEntry struct { + Level string `json:"level"` + Message string `json:"message"` +} + +type claudeMessageContent struct { + Role string `json:"role"` + Content []claudeContentBlock `json:"content"` +} + +type claudeContentBlock struct { + Type string `json:"type"` + Text string `json:"text,omitempty"` + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Input json.RawMessage `json:"input,omitempty"` + ToolUseID string `json:"tool_use_id,omitempty"` + Content json.RawMessage `json:"content,omitempty"` +} + +type claudeControlRequestPayload struct { + Subtype string `json:"subtype"` + ToolName string `json:"tool_name,omitempty"` + Input json.RawMessage `json:"input,omitempty"` +} + +// ── Shared helpers ── + +func trySend(ch chan<- Message, msg Message) { + select { + case ch <- msg: + default: + } +} + +func buildEnv(extra map[string]string) []string { + env := os.Environ() + for k, v := range extra { + env = append(env, k+"="+v) + } + return env +} + +func detectCLIVersion(ctx context.Context, execPath string) (string, error) { + cmd := exec.CommandContext(ctx, execPath, "--version") + data, err := cmd.Output() + if err != nil { + return "", fmt.Errorf("detect version for %s: %w", execPath, err) + } + return strings.TrimSpace(string(data)), nil +} diff --git a/server/pkg/agent/codex.go b/server/pkg/agent/codex.go new file mode 100644 index 00000000..95f68fbc --- /dev/null +++ b/server/pkg/agent/codex.go @@ -0,0 +1,632 @@ +package agent + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "os" + "os/exec" + "strings" + "sync" + "time" +) + +// codexBackend implements Backend by spawning `codex app-server --listen stdio://` +// and communicating via JSON-RPC 2.0 over stdin/stdout. +type codexBackend struct { + cfg Config +} + +func (b *codexBackend) Execute(ctx context.Context, prompt string, opts ExecOptions) (*Session, error) { + execPath := b.cfg.ExecutablePath + if execPath == "" { + execPath = "codex" + } + if _, err := exec.LookPath(execPath); err != nil { + return nil, fmt.Errorf("codex executable not found at %q: %w", execPath, err) + } + + timeout := opts.Timeout + if timeout == 0 { + timeout = 20 * time.Minute + } + runCtx, cancel := context.WithTimeout(ctx, timeout) + + cmd := exec.CommandContext(runCtx, execPath, "app-server", "--listen", "stdio://") + if opts.Cwd != "" { + cmd.Dir = opts.Cwd + } + cmd.Env = buildEnv(b.cfg.Env) + + stdout, err := cmd.StdoutPipe() + if err != nil { + cancel() + return nil, fmt.Errorf("codex stdout pipe: %w", err) + } + stdin, err := cmd.StdinPipe() + if err != nil { + cancel() + return nil, fmt.Errorf("codex stdin pipe: %w", err) + } + cmd.Stderr = os.Stderr + + if err := cmd.Start(); err != nil { + cancel() + return nil, fmt.Errorf("start codex: %w", err) + } + + b.cfg.Logger.Printf("[codex] started app-server pid=%d cwd=%s", cmd.Process.Pid, opts.Cwd) + + c := &codexClient{ + cfg: b.cfg, + stdin: stdin, + pending: make(map[int]*pendingRPC), + } + + msgCh := make(chan Message, 64) + resCh := make(chan Result, 1) + + // Start reading stdout in background + go func() { + scanner := bufio.NewScanner(stdout) + scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + c.handleLine(line, msgCh) + } + c.closeAllPending(fmt.Errorf("codex process exited")) + }() + + // Drive the session lifecycle in a goroutine + go func() { + defer cancel() + defer close(msgCh) + defer close(resCh) + defer func() { + stdin.Close() + _ = cmd.Wait() + }() + + startTime := time.Now() + finalStatus := "completed" + var finalError string + var output strings.Builder + + // Drain messages to accumulate output + c.onMessage = func(msg Message) { + if msg.Type == MessageText { + output.WriteString(msg.Content) + } + trySend(msgCh, msg) + } + + // 1. Initialize handshake + _, err := c.request(runCtx, "initialize", map[string]any{ + "clientInfo": map[string]any{ + "name": "multica-agent-sdk", + "title": "Multica Agent SDK", + "version": "0.2.0", + }, + "capabilities": map[string]any{ + "experimentalApi": true, + }, + }) + if err != nil { + finalStatus = "failed" + finalError = fmt.Sprintf("codex initialize failed: %v", err) + resCh <- Result{Status: finalStatus, Error: finalError, DurationMs: time.Since(startTime).Milliseconds()} + return + } + c.notify("initialized") + + // 2. Start thread + threadResult, err := c.request(runCtx, "thread/start", map[string]any{ + "model": nilIfEmpty(opts.Model), + "modelProvider": nil, + "profile": nil, + "cwd": opts.Cwd, + "approvalPolicy": nil, + "sandbox": "workspace-write", + "config": nil, + "baseInstructions": nil, + "developerInstructions": nilIfEmpty(opts.SystemPrompt), + "compactPrompt": nil, + "includeApplyPatchTool": nil, + "experimentalRawEvents": false, + "persistExtendedHistory": true, + }) + if err != nil { + finalStatus = "failed" + finalError = fmt.Sprintf("codex thread/start failed: %v", err) + resCh <- Result{Status: finalStatus, Error: finalError, DurationMs: time.Since(startTime).Milliseconds()} + return + } + + threadID := extractThreadID(threadResult) + if threadID == "" { + finalStatus = "failed" + finalError = "codex thread/start returned no thread ID" + resCh <- Result{Status: finalStatus, Error: finalError, DurationMs: time.Since(startTime).Milliseconds()} + return + } + c.threadID = threadID + b.cfg.Logger.Printf("[codex] thread started: %s", threadID) + + // 3. Send turn and wait for completion + turnDone := make(chan bool, 1) // true = aborted + c.onTurnDone = func(aborted bool) { + select { + case turnDone <- aborted: + default: + } + } + + _, err = c.request(runCtx, "turn/start", map[string]any{ + "threadId": threadID, + "input": []map[string]any{ + {"type": "text", "text": prompt}, + }, + }) + if err != nil { + finalStatus = "failed" + finalError = fmt.Sprintf("codex turn/start failed: %v", err) + resCh <- Result{Status: finalStatus, Error: finalError, DurationMs: time.Since(startTime).Milliseconds()} + return + } + + // Wait for turn completion or context cancellation + select { + case aborted := <-turnDone: + if aborted { + finalStatus = "aborted" + finalError = "turn was aborted" + } + case <-runCtx.Done(): + if runCtx.Err() == context.DeadlineExceeded { + finalStatus = "timeout" + finalError = fmt.Sprintf("codex timed out after %s", timeout) + } else { + finalStatus = "aborted" + finalError = "execution cancelled" + } + } + + duration := time.Since(startTime) + b.cfg.Logger.Printf("[codex] finished pid=%d status=%s duration=%s", + cmd.Process.Pid, finalStatus, duration.Round(time.Millisecond)) + + resCh <- Result{ + Status: finalStatus, + Output: output.String(), + Error: finalError, + DurationMs: duration.Milliseconds(), + } + }() + + return &Session{Messages: msgCh, Result: resCh}, nil +} + +// ── codexClient: JSON-RPC 2.0 transport ── + +type codexClient struct { + cfg Config + stdin interface{ Write([]byte) (int, error) } + mu sync.Mutex + nextID int + pending map[int]*pendingRPC + threadID string + turnID string + onMessage func(Message) + onTurnDone func(aborted bool) + + notificationProtocol string // "unknown", "legacy", "raw" + turnStarted bool + completedTurnIDs map[string]bool +} + +type pendingRPC struct { + ch chan rpcResult + method string +} + +type rpcResult struct { + result json.RawMessage + err error +} + +func (c *codexClient) request(ctx context.Context, method string, params any) (json.RawMessage, error) { + c.mu.Lock() + c.nextID++ + id := c.nextID + pr := &pendingRPC{ch: make(chan rpcResult, 1), method: method} + c.pending[id] = pr + c.mu.Unlock() + + msg := map[string]any{ + "jsonrpc": "2.0", + "id": id, + "method": method, + "params": params, + } + data, err := json.Marshal(msg) + if err != nil { + c.mu.Lock() + delete(c.pending, id) + c.mu.Unlock() + return nil, err + } + data = append(data, '\n') + if _, err := c.stdin.Write(data); err != nil { + c.mu.Lock() + delete(c.pending, id) + c.mu.Unlock() + return nil, fmt.Errorf("write %s: %w", method, err) + } + + select { + case res := <-pr.ch: + return res.result, res.err + case <-ctx.Done(): + c.mu.Lock() + delete(c.pending, id) + c.mu.Unlock() + return nil, ctx.Err() + } +} + +func (c *codexClient) notify(method string) { + msg := map[string]any{ + "jsonrpc": "2.0", + "method": method, + } + data, _ := json.Marshal(msg) + data = append(data, '\n') + _, _ = c.stdin.Write(data) +} + +func (c *codexClient) respond(id int, result any) { + msg := map[string]any{ + "jsonrpc": "2.0", + "id": id, + "result": result, + } + data, _ := json.Marshal(msg) + data = append(data, '\n') + _, _ = c.stdin.Write(data) +} + +func (c *codexClient) closeAllPending(err error) { + c.mu.Lock() + defer c.mu.Unlock() + for id, pr := range c.pending { + pr.ch <- rpcResult{err: err} + delete(c.pending, id) + } +} + +func (c *codexClient) handleLine(line string, msgCh chan<- Message) { + var raw map[string]json.RawMessage + if err := json.Unmarshal([]byte(line), &raw); err != nil { + return + } + + // Check if it's a response to our request + if _, hasID := raw["id"]; hasID { + if _, hasResult := raw["result"]; hasResult { + c.handleResponse(raw) + return + } + if _, hasError := raw["error"]; hasError { + c.handleResponse(raw) + return + } + // Server request (has id + method) + if _, hasMethod := raw["method"]; hasMethod { + c.handleServerRequest(raw) + return + } + } + + // Notification (no id, has method) + if _, hasMethod := raw["method"]; hasMethod { + c.handleNotification(raw) + } +} + +func (c *codexClient) handleResponse(raw map[string]json.RawMessage) { + var id int + if err := json.Unmarshal(raw["id"], &id); err != nil { + return + } + + c.mu.Lock() + pr, ok := c.pending[id] + if ok { + delete(c.pending, id) + } + c.mu.Unlock() + + if !ok { + return + } + + if errData, hasErr := raw["error"]; hasErr { + var rpcErr struct { + Code int `json:"code"` + Message string `json:"message"` + } + _ = json.Unmarshal(errData, &rpcErr) + pr.ch <- rpcResult{err: fmt.Errorf("%s: %s (code=%d)", pr.method, rpcErr.Message, rpcErr.Code)} + } else { + pr.ch <- rpcResult{result: raw["result"]} + } +} + +func (c *codexClient) handleServerRequest(raw map[string]json.RawMessage) { + var id int + _ = json.Unmarshal(raw["id"], &id) + + var method string + _ = json.Unmarshal(raw["method"], &method) + + // Auto-approve all exec/patch requests in daemon mode + switch method { + case "item/commandExecution/requestApproval", "execCommandApproval": + c.respond(id, map[string]any{"decision": "accept"}) + case "item/fileChange/requestApproval", "applyPatchApproval": + c.respond(id, map[string]any{"decision": "accept"}) + default: + c.respond(id, map[string]any{}) + } +} + +func (c *codexClient) handleNotification(raw map[string]json.RawMessage) { + var method string + _ = json.Unmarshal(raw["method"], &method) + + var params map[string]any + if p, ok := raw["params"]; ok { + _ = json.Unmarshal(p, ¶ms) + } + + // Legacy codex/event notifications + if method == "codex/event" || strings.HasPrefix(method, "codex/event/") { + c.notificationProtocol = "legacy" + msgData, ok := params["msg"] + if !ok { + return + } + msgMap, ok := msgData.(map[string]any) + if !ok { + return + } + c.handleEvent(msgMap) + return + } + + // Raw v2 notifications + if c.notificationProtocol != "legacy" { + if c.notificationProtocol == "unknown" && + (method == "turn/started" || method == "turn/completed" || + method == "thread/started" || strings.HasPrefix(method, "item/")) { + c.notificationProtocol = "raw" + } + + if c.notificationProtocol == "raw" { + c.handleRawNotification(method, params) + } + } +} + +func (c *codexClient) handleEvent(msg map[string]any) { + msgType, _ := msg["type"].(string) + + switch msgType { + case "task_started": + c.turnStarted = true + if c.onMessage != nil { + c.onMessage(Message{Type: MessageStatus, Status: "running"}) + } + case "agent_message": + text, _ := msg["message"].(string) + if text != "" && c.onMessage != nil { + c.onMessage(Message{Type: MessageText, Content: text}) + } + case "exec_command_begin": + callID, _ := msg["call_id"].(string) + command, _ := msg["command"].(string) + if c.onMessage != nil { + c.onMessage(Message{ + Type: MessageToolUse, + Tool: "exec_command", + CallID: callID, + Input: map[string]any{"command": command}, + }) + } + case "exec_command_end": + callID, _ := msg["call_id"].(string) + output, _ := msg["output"].(string) + if c.onMessage != nil { + c.onMessage(Message{ + Type: MessageToolResult, + Tool: "exec_command", + CallID: callID, + Output: output, + }) + } + case "patch_apply_begin": + callID, _ := msg["call_id"].(string) + if c.onMessage != nil { + c.onMessage(Message{ + Type: MessageToolUse, + Tool: "patch_apply", + CallID: callID, + }) + } + case "patch_apply_end": + callID, _ := msg["call_id"].(string) + if c.onMessage != nil { + c.onMessage(Message{ + Type: MessageToolResult, + Tool: "patch_apply", + CallID: callID, + }) + } + case "task_complete": + if c.onTurnDone != nil { + c.onTurnDone(false) + } + case "turn_aborted": + if c.onTurnDone != nil { + c.onTurnDone(true) + } + } +} + +func (c *codexClient) handleRawNotification(method string, params map[string]any) { + switch method { + case "turn/started": + c.turnStarted = true + if turnID := extractNestedString(params, "turn", "id"); turnID != "" { + c.turnID = turnID + } + if c.onMessage != nil { + c.onMessage(Message{Type: MessageStatus, Status: "running"}) + } + + case "turn/completed": + turnID := extractNestedString(params, "turn", "id") + status := extractNestedString(params, "turn", "status") + aborted := status == "cancelled" || status == "canceled" || + status == "aborted" || status == "interrupted" + + if c.completedTurnIDs == nil { + c.completedTurnIDs = map[string]bool{} + } + if turnID != "" { + if c.completedTurnIDs[turnID] { + return + } + c.completedTurnIDs[turnID] = true + } + + if c.onTurnDone != nil { + c.onTurnDone(aborted) + } + + case "thread/status/changed": + statusType := extractNestedString(params, "status", "type") + if statusType == "idle" && c.turnStarted { + if c.onTurnDone != nil { + c.onTurnDone(false) + } + } + + default: + if strings.HasPrefix(method, "item/") { + c.handleItemNotification(method, params) + } + } +} + +func (c *codexClient) handleItemNotification(method string, params map[string]any) { + item, ok := params["item"].(map[string]any) + if !ok { + return + } + + itemType, _ := item["type"].(string) + itemID, _ := item["id"].(string) + + switch { + case method == "item/started" && itemType == "commandExecution": + command, _ := item["command"].(string) + if c.onMessage != nil { + c.onMessage(Message{ + Type: MessageToolUse, + Tool: "exec_command", + CallID: itemID, + Input: map[string]any{"command": command}, + }) + } + + case method == "item/completed" && itemType == "commandExecution": + output, _ := item["aggregatedOutput"].(string) + if c.onMessage != nil { + c.onMessage(Message{ + Type: MessageToolResult, + Tool: "exec_command", + CallID: itemID, + Output: output, + }) + } + + case method == "item/started" && itemType == "fileChange": + if c.onMessage != nil { + c.onMessage(Message{ + Type: MessageToolUse, + Tool: "patch_apply", + CallID: itemID, + }) + } + + case method == "item/completed" && itemType == "fileChange": + if c.onMessage != nil { + c.onMessage(Message{ + Type: MessageToolResult, + Tool: "patch_apply", + CallID: itemID, + }) + } + + case method == "item/completed" && itemType == "agentMessage": + text, _ := item["text"].(string) + if text != "" && c.onMessage != nil { + c.onMessage(Message{Type: MessageText, Content: text}) + } + phase, _ := item["phase"].(string) + if phase == "final_answer" && c.turnStarted { + if c.onTurnDone != nil { + c.onTurnDone(false) + } + } + } +} + +// ── Helpers ── + +func extractThreadID(result json.RawMessage) string { + var r struct { + Thread struct { + ID string `json:"id"` + } `json:"thread"` + } + if err := json.Unmarshal(result, &r); err != nil { + return "" + } + return r.Thread.ID +} + +func extractNestedString(m map[string]any, keys ...string) string { + current := any(m) + for _, key := range keys { + obj, ok := current.(map[string]any) + if !ok { + return "" + } + current = obj[key] + } + s, _ := current.(string) + return s +} + +func nilIfEmpty(s string) any { + if s == "" { + return nil + } + return s +}