feat(daemon): unified agent SDK supporting Claude Code and Codex

Add a reusable Go agent package (server/pkg/agent/) that provides a
unified Backend interface for executing prompts via either Claude Code
or Codex. The daemon now auto-detects which CLIs are available at
startup, registers a runtime for each, and routes tasks to the correct
backend based on task.Context.Runtime.Provider.

Key changes:
- server/pkg/agent/agent.go: Backend interface, Message/Result types, factory
- server/pkg/agent/claude.go: Spawns claude CLI with stream-json, parses output
- server/pkg/agent/codex.go: Spawns codex app-server, JSON-RPC 2.0 protocol
- server/cmd/daemon/daemon.go: Multi-runtime registration, round-robin polling,
  provider-based backend selection. Removes old runCodexExec/codexResultSchema.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
yushen 2026-03-24 14:05:03 +08:00
parent 94c9b07bfb
commit bb45f17cf9
5 changed files with 1220 additions and 182 deletions

View file

@ -15,6 +15,8 @@ import (
"path/filepath"
"strings"
"time"
"github.com/multica-ai/multica/server/pkg/agent"
)
const (
@ -22,11 +24,16 @@ const (
defaultDaemonConfigPath = ".multica/daemon.json"
defaultPollInterval = 3 * time.Second
defaultHeartbeatInterval = 15 * time.Second
defaultCodexTimeout = 20 * time.Minute
defaultRuntimeName = "Local Codex"
defaultCodexPath = "codex"
defaultAgentTimeout = 20 * time.Minute
defaultRuntimeName = "Local Agent"
)
// agentEntry describes a single available agent CLI.
type agentEntry struct {
Path string // path to CLI binary
Model string // model override (optional)
}
type config struct {
ServerBaseURL string
ConfigPath string
@ -34,12 +41,11 @@ type config struct {
DaemonID string
DeviceName string
RuntimeName string
CodexPath string
CodexModel string
Agents map[string]agentEntry // "claude" -> entry, "codex" -> entry
DefaultWorkdir string
PollInterval time.Duration
HeartbeatInterval time.Duration
CodexTimeout time.Duration
AgentTimeout time.Duration
}
type daemon struct {
@ -120,7 +126,7 @@ type daemonRepoRef struct {
Path string `json:"path"`
}
type codexTaskResult struct {
type taskResult struct {
Status string `json:"status"`
Comment string `json:"comment"`
}
@ -144,9 +150,24 @@ func loadConfig() (config, error) {
workspaceID = persisted.WorkspaceID
}
codexPath := envOrDefault("MULTICA_CODEX_PATH", defaultCodexPath)
if _, err := exec.LookPath(codexPath); err != nil {
return config{}, fmt.Errorf("codex executable not found at %q: %w", codexPath, err)
// Probe available agent CLIs.
agents := map[string]agentEntry{}
claudePath := envOrDefault("MULTICA_CLAUDE_PATH", "claude")
if _, err := exec.LookPath(claudePath); err == nil {
agents["claude"] = agentEntry{
Path: claudePath,
Model: strings.TrimSpace(os.Getenv("MULTICA_CLAUDE_MODEL")),
}
}
codexPath := envOrDefault("MULTICA_CODEX_PATH", "codex")
if _, err := exec.LookPath(codexPath); err == nil {
agents["codex"] = agentEntry{
Path: codexPath,
Model: strings.TrimSpace(os.Getenv("MULTICA_CODEX_MODEL")),
}
}
if len(agents) == 0 {
return config{}, fmt.Errorf("no agent CLI found: install claude or codex and ensure it is on PATH")
}
host, err := os.Hostname()
@ -154,7 +175,7 @@ func loadConfig() (config, error) {
host = "local-machine"
}
defaultWorkdir := strings.TrimSpace(os.Getenv("MULTICA_CODEX_WORKDIR"))
defaultWorkdir := strings.TrimSpace(os.Getenv("MULTICA_AGENT_WORKDIR"))
if defaultWorkdir == "" {
defaultWorkdir, err = os.Getwd()
if err != nil {
@ -174,7 +195,7 @@ func loadConfig() (config, error) {
if err != nil {
return config{}, err
}
codexTimeout, err := durationFromEnv("MULTICA_CODEX_TIMEOUT", defaultCodexTimeout)
agentTimeout, err := durationFromEnv("MULTICA_AGENT_TIMEOUT", defaultAgentTimeout)
if err != nil {
return config{}, err
}
@ -185,13 +206,12 @@ func loadConfig() (config, error) {
WorkspaceID: workspaceID,
DaemonID: envOrDefault("MULTICA_DAEMON_ID", host),
DeviceName: envOrDefault("MULTICA_DAEMON_DEVICE_NAME", host),
RuntimeName: envOrDefault("MULTICA_CODEX_RUNTIME_NAME", defaultRuntimeName),
CodexPath: codexPath,
CodexModel: strings.TrimSpace(os.Getenv("MULTICA_CODEX_MODEL")),
RuntimeName: envOrDefault("MULTICA_AGENT_RUNTIME_NAME", defaultRuntimeName),
Agents: agents,
DefaultWorkdir: defaultWorkdir,
PollInterval: pollInterval,
HeartbeatInterval: heartbeatInterval,
CodexTimeout: codexTimeout,
AgentTimeout: agentTimeout,
}, nil
}
@ -204,8 +224,12 @@ func newDaemon(cfg config, logger *log.Logger) *daemon {
}
func (d *daemon) run(ctx context.Context) error {
d.logger.Printf("starting daemon for workspace=%s server=%s runtime=%s workdir=%s",
d.cfg.WorkspaceID, d.cfg.ServerBaseURL, d.cfg.RuntimeName, d.cfg.DefaultWorkdir)
agentNames := make([]string, 0, len(d.cfg.Agents))
for name := range d.cfg.Agents {
agentNames = append(agentNames, name)
}
d.logger.Printf("starting daemon agents=%v workspace=%s server=%s workdir=%s",
agentNames, d.cfg.WorkspaceID, d.cfg.ServerBaseURL, d.cfg.DefaultWorkdir)
if strings.TrimSpace(d.cfg.WorkspaceID) == "" {
workspaceID, err := d.ensurePaired(ctx)
@ -216,50 +240,68 @@ func (d *daemon) run(ctx context.Context) error {
d.logger.Printf("pairing completed for workspace=%s", workspaceID)
}
runtime, err := d.registerRuntime(ctx)
runtimes, err := d.registerRuntimes(ctx)
if err != nil {
return err
}
d.logger.Printf("registered runtime id=%s provider=%s status=%s", runtime.ID, runtime.Provider, runtime.Status)
runtimeIDs := make([]string, 0, len(runtimes))
for _, rt := range runtimes {
d.logger.Printf("registered runtime id=%s provider=%s status=%s", rt.ID, rt.Provider, rt.Status)
runtimeIDs = append(runtimeIDs, rt.ID)
}
go d.heartbeatLoop(ctx, runtime.ID)
return d.pollLoop(ctx, runtime.ID)
go d.heartbeatLoop(ctx, runtimeIDs)
return d.pollLoop(ctx, runtimeIDs)
}
func (d *daemon) registerRuntime(ctx context.Context) (daemonRuntime, error) {
version, err := detectCodexVersion(ctx, d.cfg.CodexPath)
if err != nil {
return daemonRuntime{}, err
func (d *daemon) registerRuntimes(ctx context.Context) ([]daemonRuntime, error) {
var runtimes []map[string]string
for name, entry := range d.cfg.Agents {
version, err := agent.DetectVersion(ctx, entry.Path)
if err != nil {
d.logger.Printf("skip registering %s: %v", name, err)
continue
}
runtimes = append(runtimes, map[string]string{
"name": fmt.Sprintf("Local %s", strings.Title(name)),
"type": name,
"version": version,
"status": "online",
})
}
if len(runtimes) == 0 {
return nil, fmt.Errorf("no agent runtimes could be registered")
}
req := map[string]any{
"workspace_id": d.cfg.WorkspaceID,
"daemon_id": d.cfg.DaemonID,
"device_name": d.cfg.DeviceName,
"runtimes": []map[string]string{
{
"name": d.cfg.RuntimeName,
"type": "codex",
"version": version,
"status": "online",
},
},
"runtimes": runtimes,
}
var resp struct {
Runtimes []daemonRuntime `json:"runtimes"`
}
if err := d.client.postJSON(ctx, "/api/daemon/register", req, &resp); err != nil {
return daemonRuntime{}, fmt.Errorf("register runtime: %w", err)
return nil, fmt.Errorf("register runtimes: %w", err)
}
if len(resp.Runtimes) == 0 {
return daemonRuntime{}, fmt.Errorf("register runtime: empty response")
return nil, fmt.Errorf("register runtimes: empty response")
}
return resp.Runtimes[0], nil
return resp.Runtimes, nil
}
func (d *daemon) ensurePaired(ctx context.Context) (string, error) {
version, err := detectCodexVersion(ctx, d.cfg.CodexPath)
// Use the first available agent for the pairing session metadata.
var firstName string
var firstEntry agentEntry
for name, entry := range d.cfg.Agents {
firstName = name
firstEntry = entry
break
}
version, err := agent.DetectVersion(ctx, firstEntry.Path)
if err != nil {
return "", err
}
@ -268,14 +310,14 @@ func (d *daemon) ensurePaired(ctx context.Context) (string, error) {
"daemon_id": d.cfg.DaemonID,
"device_name": d.cfg.DeviceName,
"runtime_name": d.cfg.RuntimeName,
"runtime_type": "codex",
"runtime_type": firstName,
"runtime_version": version,
})
if err != nil {
return "", fmt.Errorf("create pairing session: %w", err)
}
if session.LinkURL != nil {
d.logger.Printf("open this link to pair the local Codex runtime: %s", *session.LinkURL)
d.logger.Printf("open this link to pair the daemon: %s", *session.LinkURL)
} else {
d.logger.Printf("pairing session created: %s", session.Token)
}
@ -318,7 +360,7 @@ func (d *daemon) ensurePaired(ctx context.Context) (string, error) {
}
}
func (d *daemon) heartbeatLoop(ctx context.Context, runtimeID string) {
func (d *daemon) heartbeatLoop(ctx context.Context, runtimeIDs []string) {
ticker := time.NewTicker(d.cfg.HeartbeatInterval)
defer ticker.Stop()
@ -327,17 +369,19 @@ func (d *daemon) heartbeatLoop(ctx context.Context, runtimeID string) {
case <-ctx.Done():
return
case <-ticker.C:
err := d.client.postJSON(ctx, "/api/daemon/heartbeat", map[string]string{
"runtime_id": runtimeID,
}, nil)
if err != nil {
d.logger.Printf("heartbeat failed: %v", err)
for _, rid := range runtimeIDs {
err := d.client.postJSON(ctx, "/api/daemon/heartbeat", map[string]string{
"runtime_id": rid,
}, nil)
if err != nil {
d.logger.Printf("heartbeat failed for runtime %s: %v", rid, err)
}
}
}
}
}
func (d *daemon) pollLoop(ctx context.Context, runtimeID string) error {
func (d *daemon) pollLoop(ctx context.Context, runtimeIDs []string) error {
for {
select {
case <-ctx.Done():
@ -345,34 +389,39 @@ func (d *daemon) pollLoop(ctx context.Context, runtimeID string) error {
default:
}
task, err := d.client.claimTask(ctx, runtimeID)
if err != nil {
d.logger.Printf("claim task failed: %v", err)
if err := sleepWithContext(ctx, d.cfg.PollInterval); err != nil {
return err
claimed := false
for _, rid := range runtimeIDs {
task, err := d.client.claimTask(ctx, rid)
if err != nil {
d.logger.Printf("claim task failed for runtime %s: %v", rid, err)
continue
}
continue
}
if task == nil {
if err := sleepWithContext(ctx, d.cfg.PollInterval); err != nil {
return err
if task != nil {
d.logger.Printf("poll: got task=%s issue=%s title=%q", task.ID, task.IssueID, task.Context.Issue.Title)
d.handleTask(ctx, *task)
claimed = true
break
}
continue
}
d.handleTask(ctx, *task)
if !claimed {
if err := sleepWithContext(ctx, d.cfg.PollInterval); err != nil {
return err
}
}
}
}
func (d *daemon) handleTask(ctx context.Context, task daemonTask) {
d.logger.Printf("picked task=%s issue=%s title=%q", task.ID, task.IssueID, task.Context.Issue.Title)
provider := task.Context.Runtime.Provider
d.logger.Printf("picked task=%s issue=%s provider=%s title=%q", task.ID, task.IssueID, provider, task.Context.Issue.Title)
if err := d.client.startTask(ctx, task.ID); err != nil {
d.logger.Printf("start task %s failed: %v", task.ID, err)
return
}
_ = d.client.reportProgress(ctx, task.ID, "Launching Codex", 1, 2)
_ = d.client.reportProgress(ctx, task.ID, fmt.Sprintf("Launching %s", provider), 1, 2)
result, err := d.runTask(ctx, task)
if err != nil {
@ -397,119 +446,76 @@ func (d *daemon) handleTask(ctx context.Context, task daemonTask) {
}
}
func (d *daemon) runTask(ctx context.Context, task daemonTask) (codexTaskResult, error) {
func (d *daemon) runTask(ctx context.Context, task daemonTask) (taskResult, error) {
provider := task.Context.Runtime.Provider
entry, ok := d.cfg.Agents[provider]
if !ok {
return taskResult{}, fmt.Errorf("no agent configured for provider %q", provider)
}
workdir, err := resolveTaskWorkdir(d.cfg.DefaultWorkdir, task.Context.Issue.Repository)
if err != nil {
return codexTaskResult{}, err
return taskResult{}, err
}
prompt := buildCodexPrompt(task, workdir)
runCtx, cancel := context.WithTimeout(ctx, d.cfg.CodexTimeout)
defer cancel()
prompt := buildPrompt(task, workdir)
model := d.cfg.CodexModel
if model == "" {
model = "default"
backend, err := agent.New(provider, agent.Config{
ExecutablePath: entry.Path,
Logger: d.logger,
})
if err != nil {
return taskResult{}, fmt.Errorf("create agent backend: %w", err)
}
startedAt := time.Now()
d.logger.Printf(
"starting codex exec task=%s workdir=%s model=%s timeout=%s",
task.ID,
workdir,
model,
d.cfg.CodexTimeout,
"starting %s task=%s workdir=%s model=%s timeout=%s",
provider, task.ID, workdir, entry.Model, d.cfg.AgentTimeout,
)
result, err := runCodexExec(runCtx, d.cfg, workdir, prompt)
session, err := backend.Execute(ctx, prompt, agent.ExecOptions{
Cwd: workdir,
Model: entry.Model,
Timeout: d.cfg.AgentTimeout,
})
if err != nil {
d.logger.Printf(
"codex exec failed task=%s duration=%s err=%v",
task.ID,
time.Since(startedAt).Round(time.Millisecond),
err,
)
if errors.Is(runCtx.Err(), context.DeadlineExceeded) {
return codexTaskResult{}, fmt.Errorf("Codex timed out after %s", d.cfg.CodexTimeout)
return taskResult{}, err
}
// Drain message channel (log tool uses, ignore text since Result has output)
go func() {
for msg := range session.Messages {
switch msg.Type {
case agent.MessageToolUse:
d.logger.Printf("[%s] tool-use: %s (call=%s)", provider, msg.Tool, msg.CallID)
case agent.MessageError:
d.logger.Printf("[%s] error: %s", provider, msg.Content)
}
}
return codexTaskResult{}, err
}
}()
d.logger.Printf(
"codex exec finished task=%s duration=%s status=%s",
task.ID,
time.Since(startedAt).Round(time.Millisecond),
result.Status,
)
return result, nil
result := <-session.Result
switch result.Status {
case "completed":
if result.Output == "" {
return taskResult{}, fmt.Errorf("%s returned empty output", provider)
}
return taskResult{Status: "completed", Comment: result.Output}, nil
case "timeout":
return taskResult{}, fmt.Errorf("%s timed out after %s", provider, d.cfg.AgentTimeout)
default:
errMsg := result.Error
if errMsg == "" {
errMsg = fmt.Sprintf("%s execution %s", provider, result.Status)
}
return taskResult{Status: "blocked", Comment: errMsg}, nil
}
}
func runCodexExec(ctx context.Context, cfg config, workdir, prompt string) (codexTaskResult, error) {
outputFile, err := os.CreateTemp("", "multica-codex-output-*.json")
if err != nil {
return codexTaskResult{}, fmt.Errorf("create codex output file: %w", err)
}
outputPath := outputFile.Name()
outputFile.Close()
defer os.Remove(outputPath)
schemaFile, err := os.CreateTemp("", "multica-codex-schema-*.json")
if err != nil {
return codexTaskResult{}, fmt.Errorf("create schema file: %w", err)
}
schemaPath := schemaFile.Name()
if _, err := schemaFile.WriteString(codexResultSchema); err != nil {
schemaFile.Close()
return codexTaskResult{}, fmt.Errorf("write schema file: %w", err)
}
schemaFile.Close()
defer os.Remove(schemaPath)
args := []string{
"-a", "never",
"exec",
"--skip-git-repo-check",
"--sandbox", "workspace-write",
"-C", workdir,
"--output-schema", schemaPath,
"-o", outputPath,
prompt,
}
if cfg.CodexModel != "" {
args = append([]string{"-m", cfg.CodexModel}, args...)
}
cmd := exec.CommandContext(ctx, cfg.CodexPath, args...)
var output bytes.Buffer
cmd.Stdout = &output
cmd.Stderr = &output
if err := cmd.Run(); err != nil {
return codexTaskResult{}, fmt.Errorf("codex exec failed: %w\n%s", err, strings.TrimSpace(output.String()))
}
data, err := os.ReadFile(outputPath)
if err != nil {
return codexTaskResult{}, fmt.Errorf("read codex result: %w", err)
}
var result codexTaskResult
if err := json.Unmarshal(data, &result); err != nil {
return codexTaskResult{}, fmt.Errorf("parse codex result: %w", err)
}
if result.Comment == "" {
return codexTaskResult{}, fmt.Errorf("codex returned empty comment")
}
if result.Status == "" {
result.Status = "completed"
}
return result, nil
}
func buildCodexPrompt(task daemonTask, workdir string) string {
func buildPrompt(task daemonTask, workdir string) string {
var b strings.Builder
b.WriteString("You are running as the local Codex runtime for a Multica agent.\n")
b.WriteString("You are running as a local coding agent for a Multica workspace.\n")
b.WriteString("Complete the assigned issue using the local environment.\n")
b.WriteString("Return a concise Markdown comment suitable for posting back to the issue.\n")
b.WriteString("If you cannot complete the task because context, files, or permissions are missing, return status \"blocked\" and explain the blocker in the comment.\n\n")
@ -590,15 +596,6 @@ func resolveTaskWorkdir(defaultWorkdir string, repo *daemonRepoRef) (string, err
return path, nil
}
func detectCodexVersion(ctx context.Context, codexPath string) (string, error) {
cmd := exec.CommandContext(ctx, codexPath, "--version")
data, err := cmd.Output()
if err != nil {
return "", fmt.Errorf("detect codex version: %w", err)
}
return strings.TrimSpace(string(data)), nil
}
func resolveDaemonConfigPath(raw string) (string, error) {
if raw != "" {
return filepath.Abs(raw)
@ -810,17 +807,3 @@ func (c *daemonClient) getJSON(ctx context.Context, path string, respBody any) e
return json.NewDecoder(resp.Body).Decode(respBody)
}
const codexResultSchema = `{
"type": "object",
"properties": {
"status": {
"type": "string",
"enum": ["completed", "blocked"]
},
"comment": {
"type": "string"
}
},
"required": ["status", "comment"],
"additionalProperties": false
}`

View file

@ -40,7 +40,7 @@ func TestResolveTaskWorkdirUsesRepoPathWhenPresent(t *testing.T) {
func TestBuildCodexPromptIncludesIssueAndSkills(t *testing.T) {
t.Parallel()
prompt := buildCodexPrompt(daemonTask{
prompt := buildPrompt(daemonTask{
Context: daemonTaskContext{
Issue: daemonIssueContext{
Title: "Fix failing test",

99
server/pkg/agent/agent.go Normal file
View file

@ -0,0 +1,99 @@
// Package agent provides a unified interface for executing prompts via
// coding agents (Claude Code, Codex). It mirrors the happy-cli AgentBackend
// pattern, translated to idiomatic Go.
package agent
import (
"context"
"fmt"
"log"
"time"
)
// Backend is the unified interface for executing prompts via coding agents.
type Backend interface {
// Execute runs a prompt and returns a Session for streaming results.
// The caller should read from Session.Messages (optional) and wait on
// Session.Result for the final outcome.
Execute(ctx context.Context, prompt string, opts ExecOptions) (*Session, error)
}
// ExecOptions configures a single execution.
type ExecOptions struct {
Cwd string
Model string
SystemPrompt string
MaxTurns int
Timeout time.Duration
}
// Session represents a running agent execution.
type Session struct {
// Messages streams events as the agent works. The channel is closed
// when the agent finishes (before Result is sent).
Messages <-chan Message
// Result receives exactly one value — the final outcome — then closes.
Result <-chan Result
}
// MessageType identifies the kind of Message.
type MessageType string
const (
MessageText MessageType = "text"
MessageToolUse MessageType = "tool-use"
MessageToolResult MessageType = "tool-result"
MessageStatus MessageType = "status"
MessageError MessageType = "error"
MessageLog MessageType = "log"
)
// Message is a unified event emitted by an agent during execution.
type Message struct {
Type MessageType
Content string // text content (Text, Error, Log)
Tool string // tool name (ToolUse, ToolResult)
CallID string // tool call ID (ToolUse, ToolResult)
Input map[string]any // tool input (ToolUse)
Output string // tool output (ToolResult)
Status string // agent status string (Status)
Level string // log level (Log)
}
// Result is the final outcome after an agent session completes.
type Result struct {
Status string // "completed", "failed", "aborted", "timeout"
Output string // accumulated text output
Error string // error message if failed
DurationMs int64
SessionID string
}
// Config configures a Backend instance.
type Config struct {
ExecutablePath string // path to CLI binary (claude or codex)
Env map[string]string // extra environment variables
Logger *log.Logger
}
// New creates a Backend for the given agent type.
// Supported types: "claude", "codex".
func New(agentType string, cfg Config) (Backend, error) {
if cfg.Logger == nil {
cfg.Logger = log.Default()
}
switch agentType {
case "claude":
return &claudeBackend{cfg: cfg}, nil
case "codex":
return &codexBackend{cfg: cfg}, nil
default:
return nil, fmt.Errorf("unknown agent type: %q (supported: claude, codex)", agentType)
}
}
// DetectVersion runs the agent CLI with --version and returns the output.
func DetectVersion(ctx context.Context, executablePath string) (string, error) {
return detectCLIVersion(ctx, executablePath)
}

324
server/pkg/agent/claude.go Normal file
View file

@ -0,0 +1,324 @@
package agent
import (
"bufio"
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
"strings"
"time"
)
// claudeBackend implements Backend by spawning the Claude Code CLI
// with --output-format stream-json.
type claudeBackend struct {
cfg Config
}
func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOptions) (*Session, error) {
execPath := b.cfg.ExecutablePath
if execPath == "" {
execPath = "claude"
}
if _, err := exec.LookPath(execPath); err != nil {
return nil, fmt.Errorf("claude executable not found at %q: %w", execPath, err)
}
timeout := opts.Timeout
if timeout == 0 {
timeout = 20 * time.Minute
}
runCtx, cancel := context.WithTimeout(ctx, timeout)
args := []string{
"--output-format", "stream-json",
"--verbose",
"--permission-mode", "bypassPermissions",
}
if opts.Model != "" {
args = append(args, "--model", opts.Model)
}
if opts.MaxTurns > 0 {
args = append(args, "--max-turns", fmt.Sprintf("%d", opts.MaxTurns))
}
if opts.SystemPrompt != "" {
args = append(args, "--append-system-prompt", opts.SystemPrompt)
}
args = append(args, "-p", prompt)
cmd := exec.CommandContext(runCtx, execPath, args...)
if opts.Cwd != "" {
cmd.Dir = opts.Cwd
}
cmd.Env = buildEnv(b.cfg.Env)
stdout, err := cmd.StdoutPipe()
if err != nil {
cancel()
return nil, fmt.Errorf("claude stdout pipe: %w", err)
}
stdin, err := cmd.StdinPipe()
if err != nil {
cancel()
return nil, fmt.Errorf("claude stdin pipe: %w", err)
}
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
cancel()
return nil, fmt.Errorf("start claude: %w", err)
}
b.cfg.Logger.Printf("[claude] started pid=%d cwd=%s model=%s", cmd.Process.Pid, opts.Cwd, opts.Model)
msgCh := make(chan Message, 64)
resCh := make(chan Result, 1)
go func() {
defer cancel()
defer close(msgCh)
defer close(resCh)
defer stdin.Close()
startTime := time.Now()
var output strings.Builder
var sessionID string
finalStatus := "completed"
var finalError string
scanner := bufio.NewScanner(stdout)
scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
var msg claudeSDKMessage
if err := json.Unmarshal([]byte(line), &msg); err != nil {
continue
}
switch msg.Type {
case "assistant":
b.handleAssistant(msg, msgCh, &output)
case "user":
b.handleUser(msg, msgCh)
case "system":
if msg.SessionID != "" {
sessionID = msg.SessionID
}
trySend(msgCh, Message{Type: MessageStatus, Status: "running"})
case "result":
sessionID = msg.SessionID
if msg.ResultText != "" {
output.Reset()
output.WriteString(msg.ResultText)
}
if msg.IsError {
finalStatus = "failed"
finalError = msg.ResultText
}
case "log":
if msg.Log != nil {
trySend(msgCh, Message{
Type: MessageLog,
Level: msg.Log.Level,
Content: msg.Log.Message,
})
}
case "control_request":
b.handleControlRequest(msg, stdin)
}
}
// Wait for process exit
exitErr := cmd.Wait()
duration := time.Since(startTime)
if runCtx.Err() == context.DeadlineExceeded {
finalStatus = "timeout"
finalError = fmt.Sprintf("claude timed out after %s", timeout)
} else if runCtx.Err() == context.Canceled {
finalStatus = "aborted"
finalError = "execution cancelled"
} else if exitErr != nil && finalStatus == "completed" {
finalStatus = "failed"
finalError = fmt.Sprintf("claude exited with error: %v", exitErr)
}
b.cfg.Logger.Printf("[claude] finished pid=%d status=%s duration=%s",
cmd.Process.Pid, finalStatus, duration.Round(time.Millisecond))
resCh <- Result{
Status: finalStatus,
Output: output.String(),
Error: finalError,
DurationMs: duration.Milliseconds(),
SessionID: sessionID,
}
}()
return &Session{Messages: msgCh, Result: resCh}, nil
}
func (b *claudeBackend) handleAssistant(msg claudeSDKMessage, ch chan<- Message, output *strings.Builder) {
var content claudeMessageContent
if err := json.Unmarshal(msg.Message, &content); err != nil {
return
}
for _, block := range content.Content {
switch block.Type {
case "text":
if block.Text != "" {
output.WriteString(block.Text)
trySend(ch, Message{Type: MessageText, Content: block.Text})
}
case "tool_use":
var input map[string]any
if block.Input != nil {
_ = json.Unmarshal(block.Input, &input)
}
trySend(ch, Message{
Type: MessageToolUse,
Tool: block.Name,
CallID: block.ID,
Input: input,
})
}
}
}
func (b *claudeBackend) handleUser(msg claudeSDKMessage, ch chan<- Message) {
var content claudeMessageContent
if err := json.Unmarshal(msg.Message, &content); err != nil {
return
}
for _, block := range content.Content {
if block.Type == "tool_result" {
resultStr := ""
if block.Content != nil {
resultStr = string(block.Content)
}
trySend(ch, Message{
Type: MessageToolResult,
CallID: block.ToolUseID,
Output: resultStr,
})
}
}
}
func (b *claudeBackend) handleControlRequest(msg claudeSDKMessage, stdin interface{ Write([]byte) (int, error) }) {
// Auto-approve all tool uses in autonomous/daemon mode.
var req claudeControlRequestPayload
if err := json.Unmarshal(msg.Request, &req); err != nil {
return
}
var inputMap map[string]any
if req.Input != nil {
_ = json.Unmarshal(req.Input, &inputMap)
}
if inputMap == nil {
inputMap = map[string]any{}
}
response := map[string]any{
"type": "control_response",
"response": map[string]any{
"subtype": "success",
"request_id": msg.RequestID,
"response": map[string]any{
"behavior": "allow",
"updatedInput": inputMap,
},
},
}
data, err := json.Marshal(response)
if err != nil {
return
}
data = append(data, '\n')
_, _ = stdin.Write(data)
}
// ── Claude SDK JSON types ──
type claudeSDKMessage struct {
Type string `json:"type"`
Message json.RawMessage `json:"message,omitempty"`
Subtype string `json:"subtype,omitempty"`
SessionID string `json:"session_id,omitempty"`
// result fields
ResultText string `json:"result,omitempty"`
IsError bool `json:"is_error,omitempty"`
DurationMs float64 `json:"duration_ms,omitempty"`
NumTurns int `json:"num_turns,omitempty"`
// log fields
Log *claudeLogEntry `json:"log,omitempty"`
// control request fields
RequestID string `json:"request_id,omitempty"`
Request json.RawMessage `json:"request,omitempty"`
}
type claudeLogEntry struct {
Level string `json:"level"`
Message string `json:"message"`
}
type claudeMessageContent struct {
Role string `json:"role"`
Content []claudeContentBlock `json:"content"`
}
type claudeContentBlock struct {
Type string `json:"type"`
Text string `json:"text,omitempty"`
ID string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Input json.RawMessage `json:"input,omitempty"`
ToolUseID string `json:"tool_use_id,omitempty"`
Content json.RawMessage `json:"content,omitempty"`
}
type claudeControlRequestPayload struct {
Subtype string `json:"subtype"`
ToolName string `json:"tool_name,omitempty"`
Input json.RawMessage `json:"input,omitempty"`
}
// ── Shared helpers ──
func trySend(ch chan<- Message, msg Message) {
select {
case ch <- msg:
default:
}
}
func buildEnv(extra map[string]string) []string {
env := os.Environ()
for k, v := range extra {
env = append(env, k+"="+v)
}
return env
}
func detectCLIVersion(ctx context.Context, execPath string) (string, error) {
cmd := exec.CommandContext(ctx, execPath, "--version")
data, err := cmd.Output()
if err != nil {
return "", fmt.Errorf("detect version for %s: %w", execPath, err)
}
return strings.TrimSpace(string(data)), nil
}

632
server/pkg/agent/codex.go Normal file
View file

@ -0,0 +1,632 @@
package agent
import (
"bufio"
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
"strings"
"sync"
"time"
)
// codexBackend implements Backend by spawning `codex app-server --listen stdio://`
// and communicating via JSON-RPC 2.0 over stdin/stdout.
type codexBackend struct {
cfg Config
}
func (b *codexBackend) Execute(ctx context.Context, prompt string, opts ExecOptions) (*Session, error) {
execPath := b.cfg.ExecutablePath
if execPath == "" {
execPath = "codex"
}
if _, err := exec.LookPath(execPath); err != nil {
return nil, fmt.Errorf("codex executable not found at %q: %w", execPath, err)
}
timeout := opts.Timeout
if timeout == 0 {
timeout = 20 * time.Minute
}
runCtx, cancel := context.WithTimeout(ctx, timeout)
cmd := exec.CommandContext(runCtx, execPath, "app-server", "--listen", "stdio://")
if opts.Cwd != "" {
cmd.Dir = opts.Cwd
}
cmd.Env = buildEnv(b.cfg.Env)
stdout, err := cmd.StdoutPipe()
if err != nil {
cancel()
return nil, fmt.Errorf("codex stdout pipe: %w", err)
}
stdin, err := cmd.StdinPipe()
if err != nil {
cancel()
return nil, fmt.Errorf("codex stdin pipe: %w", err)
}
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
cancel()
return nil, fmt.Errorf("start codex: %w", err)
}
b.cfg.Logger.Printf("[codex] started app-server pid=%d cwd=%s", cmd.Process.Pid, opts.Cwd)
c := &codexClient{
cfg: b.cfg,
stdin: stdin,
pending: make(map[int]*pendingRPC),
}
msgCh := make(chan Message, 64)
resCh := make(chan Result, 1)
// Start reading stdout in background
go func() {
scanner := bufio.NewScanner(stdout)
scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
c.handleLine(line, msgCh)
}
c.closeAllPending(fmt.Errorf("codex process exited"))
}()
// Drive the session lifecycle in a goroutine
go func() {
defer cancel()
defer close(msgCh)
defer close(resCh)
defer func() {
stdin.Close()
_ = cmd.Wait()
}()
startTime := time.Now()
finalStatus := "completed"
var finalError string
var output strings.Builder
// Drain messages to accumulate output
c.onMessage = func(msg Message) {
if msg.Type == MessageText {
output.WriteString(msg.Content)
}
trySend(msgCh, msg)
}
// 1. Initialize handshake
_, err := c.request(runCtx, "initialize", map[string]any{
"clientInfo": map[string]any{
"name": "multica-agent-sdk",
"title": "Multica Agent SDK",
"version": "0.2.0",
},
"capabilities": map[string]any{
"experimentalApi": true,
},
})
if err != nil {
finalStatus = "failed"
finalError = fmt.Sprintf("codex initialize failed: %v", err)
resCh <- Result{Status: finalStatus, Error: finalError, DurationMs: time.Since(startTime).Milliseconds()}
return
}
c.notify("initialized")
// 2. Start thread
threadResult, err := c.request(runCtx, "thread/start", map[string]any{
"model": nilIfEmpty(opts.Model),
"modelProvider": nil,
"profile": nil,
"cwd": opts.Cwd,
"approvalPolicy": nil,
"sandbox": "workspace-write",
"config": nil,
"baseInstructions": nil,
"developerInstructions": nilIfEmpty(opts.SystemPrompt),
"compactPrompt": nil,
"includeApplyPatchTool": nil,
"experimentalRawEvents": false,
"persistExtendedHistory": true,
})
if err != nil {
finalStatus = "failed"
finalError = fmt.Sprintf("codex thread/start failed: %v", err)
resCh <- Result{Status: finalStatus, Error: finalError, DurationMs: time.Since(startTime).Milliseconds()}
return
}
threadID := extractThreadID(threadResult)
if threadID == "" {
finalStatus = "failed"
finalError = "codex thread/start returned no thread ID"
resCh <- Result{Status: finalStatus, Error: finalError, DurationMs: time.Since(startTime).Milliseconds()}
return
}
c.threadID = threadID
b.cfg.Logger.Printf("[codex] thread started: %s", threadID)
// 3. Send turn and wait for completion
turnDone := make(chan bool, 1) // true = aborted
c.onTurnDone = func(aborted bool) {
select {
case turnDone <- aborted:
default:
}
}
_, err = c.request(runCtx, "turn/start", map[string]any{
"threadId": threadID,
"input": []map[string]any{
{"type": "text", "text": prompt},
},
})
if err != nil {
finalStatus = "failed"
finalError = fmt.Sprintf("codex turn/start failed: %v", err)
resCh <- Result{Status: finalStatus, Error: finalError, DurationMs: time.Since(startTime).Milliseconds()}
return
}
// Wait for turn completion or context cancellation
select {
case aborted := <-turnDone:
if aborted {
finalStatus = "aborted"
finalError = "turn was aborted"
}
case <-runCtx.Done():
if runCtx.Err() == context.DeadlineExceeded {
finalStatus = "timeout"
finalError = fmt.Sprintf("codex timed out after %s", timeout)
} else {
finalStatus = "aborted"
finalError = "execution cancelled"
}
}
duration := time.Since(startTime)
b.cfg.Logger.Printf("[codex] finished pid=%d status=%s duration=%s",
cmd.Process.Pid, finalStatus, duration.Round(time.Millisecond))
resCh <- Result{
Status: finalStatus,
Output: output.String(),
Error: finalError,
DurationMs: duration.Milliseconds(),
}
}()
return &Session{Messages: msgCh, Result: resCh}, nil
}
// ── codexClient: JSON-RPC 2.0 transport ──
type codexClient struct {
cfg Config
stdin interface{ Write([]byte) (int, error) }
mu sync.Mutex
nextID int
pending map[int]*pendingRPC
threadID string
turnID string
onMessage func(Message)
onTurnDone func(aborted bool)
notificationProtocol string // "unknown", "legacy", "raw"
turnStarted bool
completedTurnIDs map[string]bool
}
type pendingRPC struct {
ch chan rpcResult
method string
}
type rpcResult struct {
result json.RawMessage
err error
}
func (c *codexClient) request(ctx context.Context, method string, params any) (json.RawMessage, error) {
c.mu.Lock()
c.nextID++
id := c.nextID
pr := &pendingRPC{ch: make(chan rpcResult, 1), method: method}
c.pending[id] = pr
c.mu.Unlock()
msg := map[string]any{
"jsonrpc": "2.0",
"id": id,
"method": method,
"params": params,
}
data, err := json.Marshal(msg)
if err != nil {
c.mu.Lock()
delete(c.pending, id)
c.mu.Unlock()
return nil, err
}
data = append(data, '\n')
if _, err := c.stdin.Write(data); err != nil {
c.mu.Lock()
delete(c.pending, id)
c.mu.Unlock()
return nil, fmt.Errorf("write %s: %w", method, err)
}
select {
case res := <-pr.ch:
return res.result, res.err
case <-ctx.Done():
c.mu.Lock()
delete(c.pending, id)
c.mu.Unlock()
return nil, ctx.Err()
}
}
func (c *codexClient) notify(method string) {
msg := map[string]any{
"jsonrpc": "2.0",
"method": method,
}
data, _ := json.Marshal(msg)
data = append(data, '\n')
_, _ = c.stdin.Write(data)
}
func (c *codexClient) respond(id int, result any) {
msg := map[string]any{
"jsonrpc": "2.0",
"id": id,
"result": result,
}
data, _ := json.Marshal(msg)
data = append(data, '\n')
_, _ = c.stdin.Write(data)
}
func (c *codexClient) closeAllPending(err error) {
c.mu.Lock()
defer c.mu.Unlock()
for id, pr := range c.pending {
pr.ch <- rpcResult{err: err}
delete(c.pending, id)
}
}
func (c *codexClient) handleLine(line string, msgCh chan<- Message) {
var raw map[string]json.RawMessage
if err := json.Unmarshal([]byte(line), &raw); err != nil {
return
}
// Check if it's a response to our request
if _, hasID := raw["id"]; hasID {
if _, hasResult := raw["result"]; hasResult {
c.handleResponse(raw)
return
}
if _, hasError := raw["error"]; hasError {
c.handleResponse(raw)
return
}
// Server request (has id + method)
if _, hasMethod := raw["method"]; hasMethod {
c.handleServerRequest(raw)
return
}
}
// Notification (no id, has method)
if _, hasMethod := raw["method"]; hasMethod {
c.handleNotification(raw)
}
}
func (c *codexClient) handleResponse(raw map[string]json.RawMessage) {
var id int
if err := json.Unmarshal(raw["id"], &id); err != nil {
return
}
c.mu.Lock()
pr, ok := c.pending[id]
if ok {
delete(c.pending, id)
}
c.mu.Unlock()
if !ok {
return
}
if errData, hasErr := raw["error"]; hasErr {
var rpcErr struct {
Code int `json:"code"`
Message string `json:"message"`
}
_ = json.Unmarshal(errData, &rpcErr)
pr.ch <- rpcResult{err: fmt.Errorf("%s: %s (code=%d)", pr.method, rpcErr.Message, rpcErr.Code)}
} else {
pr.ch <- rpcResult{result: raw["result"]}
}
}
func (c *codexClient) handleServerRequest(raw map[string]json.RawMessage) {
var id int
_ = json.Unmarshal(raw["id"], &id)
var method string
_ = json.Unmarshal(raw["method"], &method)
// Auto-approve all exec/patch requests in daemon mode
switch method {
case "item/commandExecution/requestApproval", "execCommandApproval":
c.respond(id, map[string]any{"decision": "accept"})
case "item/fileChange/requestApproval", "applyPatchApproval":
c.respond(id, map[string]any{"decision": "accept"})
default:
c.respond(id, map[string]any{})
}
}
func (c *codexClient) handleNotification(raw map[string]json.RawMessage) {
var method string
_ = json.Unmarshal(raw["method"], &method)
var params map[string]any
if p, ok := raw["params"]; ok {
_ = json.Unmarshal(p, &params)
}
// Legacy codex/event notifications
if method == "codex/event" || strings.HasPrefix(method, "codex/event/") {
c.notificationProtocol = "legacy"
msgData, ok := params["msg"]
if !ok {
return
}
msgMap, ok := msgData.(map[string]any)
if !ok {
return
}
c.handleEvent(msgMap)
return
}
// Raw v2 notifications
if c.notificationProtocol != "legacy" {
if c.notificationProtocol == "unknown" &&
(method == "turn/started" || method == "turn/completed" ||
method == "thread/started" || strings.HasPrefix(method, "item/")) {
c.notificationProtocol = "raw"
}
if c.notificationProtocol == "raw" {
c.handleRawNotification(method, params)
}
}
}
func (c *codexClient) handleEvent(msg map[string]any) {
msgType, _ := msg["type"].(string)
switch msgType {
case "task_started":
c.turnStarted = true
if c.onMessage != nil {
c.onMessage(Message{Type: MessageStatus, Status: "running"})
}
case "agent_message":
text, _ := msg["message"].(string)
if text != "" && c.onMessage != nil {
c.onMessage(Message{Type: MessageText, Content: text})
}
case "exec_command_begin":
callID, _ := msg["call_id"].(string)
command, _ := msg["command"].(string)
if c.onMessage != nil {
c.onMessage(Message{
Type: MessageToolUse,
Tool: "exec_command",
CallID: callID,
Input: map[string]any{"command": command},
})
}
case "exec_command_end":
callID, _ := msg["call_id"].(string)
output, _ := msg["output"].(string)
if c.onMessage != nil {
c.onMessage(Message{
Type: MessageToolResult,
Tool: "exec_command",
CallID: callID,
Output: output,
})
}
case "patch_apply_begin":
callID, _ := msg["call_id"].(string)
if c.onMessage != nil {
c.onMessage(Message{
Type: MessageToolUse,
Tool: "patch_apply",
CallID: callID,
})
}
case "patch_apply_end":
callID, _ := msg["call_id"].(string)
if c.onMessage != nil {
c.onMessage(Message{
Type: MessageToolResult,
Tool: "patch_apply",
CallID: callID,
})
}
case "task_complete":
if c.onTurnDone != nil {
c.onTurnDone(false)
}
case "turn_aborted":
if c.onTurnDone != nil {
c.onTurnDone(true)
}
}
}
func (c *codexClient) handleRawNotification(method string, params map[string]any) {
switch method {
case "turn/started":
c.turnStarted = true
if turnID := extractNestedString(params, "turn", "id"); turnID != "" {
c.turnID = turnID
}
if c.onMessage != nil {
c.onMessage(Message{Type: MessageStatus, Status: "running"})
}
case "turn/completed":
turnID := extractNestedString(params, "turn", "id")
status := extractNestedString(params, "turn", "status")
aborted := status == "cancelled" || status == "canceled" ||
status == "aborted" || status == "interrupted"
if c.completedTurnIDs == nil {
c.completedTurnIDs = map[string]bool{}
}
if turnID != "" {
if c.completedTurnIDs[turnID] {
return
}
c.completedTurnIDs[turnID] = true
}
if c.onTurnDone != nil {
c.onTurnDone(aborted)
}
case "thread/status/changed":
statusType := extractNestedString(params, "status", "type")
if statusType == "idle" && c.turnStarted {
if c.onTurnDone != nil {
c.onTurnDone(false)
}
}
default:
if strings.HasPrefix(method, "item/") {
c.handleItemNotification(method, params)
}
}
}
func (c *codexClient) handleItemNotification(method string, params map[string]any) {
item, ok := params["item"].(map[string]any)
if !ok {
return
}
itemType, _ := item["type"].(string)
itemID, _ := item["id"].(string)
switch {
case method == "item/started" && itemType == "commandExecution":
command, _ := item["command"].(string)
if c.onMessage != nil {
c.onMessage(Message{
Type: MessageToolUse,
Tool: "exec_command",
CallID: itemID,
Input: map[string]any{"command": command},
})
}
case method == "item/completed" && itemType == "commandExecution":
output, _ := item["aggregatedOutput"].(string)
if c.onMessage != nil {
c.onMessage(Message{
Type: MessageToolResult,
Tool: "exec_command",
CallID: itemID,
Output: output,
})
}
case method == "item/started" && itemType == "fileChange":
if c.onMessage != nil {
c.onMessage(Message{
Type: MessageToolUse,
Tool: "patch_apply",
CallID: itemID,
})
}
case method == "item/completed" && itemType == "fileChange":
if c.onMessage != nil {
c.onMessage(Message{
Type: MessageToolResult,
Tool: "patch_apply",
CallID: itemID,
})
}
case method == "item/completed" && itemType == "agentMessage":
text, _ := item["text"].(string)
if text != "" && c.onMessage != nil {
c.onMessage(Message{Type: MessageText, Content: text})
}
phase, _ := item["phase"].(string)
if phase == "final_answer" && c.turnStarted {
if c.onTurnDone != nil {
c.onTurnDone(false)
}
}
}
}
// ── Helpers ──
func extractThreadID(result json.RawMessage) string {
var r struct {
Thread struct {
ID string `json:"id"`
} `json:"thread"`
}
if err := json.Unmarshal(result, &r); err != nil {
return ""
}
return r.Thread.ID
}
func extractNestedString(m map[string]any, keys ...string) string {
current := any(m)
for _, key := range keys {
obj, ok := current.(map[string]any)
if !ok {
return ""
}
current = obj[key]
}
s, _ := current.(string)
return s
}
func nilIfEmpty(s string) any {
if s == "" {
return nil
}
return s
}