multica/server/pkg/agent/codex.go
Naiyuan Qing 8983a9fefa feat(logging): add structured logging across server and SDK
Replace raw fmt/log calls with structured slog logger (Go) and
console-based logger (TypeScript). Add request logging middleware.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 10:57:11 +08:00

653 lines
16 KiB
Go

package agent
import (
"bufio"
"context"
"encoding/json"
"fmt"
"os/exec"
"strings"
"sync"
"time"
)
// codexBackend is the Backend that drives the `codex` executable: every
// Execute call launches `codex app-server --listen stdio://` and exchanges
// JSON-RPC 2.0 messages with it over the child's stdin/stdout.
type codexBackend struct {
	cfg Config // executable path, extra environment and logger used by Execute
}
// Execute runs prompt as a single turn against a freshly spawned
// `codex app-server --listen stdio://` process.
//
// It returns an error only when the process cannot be started (executable
// missing, pipe setup, spawn failure). After that, progress is streamed on
// Session.Messages and exactly one Result is delivered on Session.Result.
// Both channels are closed once the session has been torn down.
//
// The session is bounded by opts.Timeout (20 minutes when zero) and by ctx.
// It ends when one of these happens first:
//   - the turn completes or is aborted;
//   - the context ends;
//   - codex stops producing stdout before the turn completes, because the
//     process exited or a line could not be read.
func (b *codexBackend) Execute(ctx context.Context, prompt string, opts ExecOptions) (*Session, error) {
	execPath := b.cfg.ExecutablePath
	if execPath == "" {
		execPath = "codex"
	}
	if _, err := exec.LookPath(execPath); err != nil {
		return nil, fmt.Errorf("codex executable not found at %q: %w", execPath, err)
	}

	timeout := opts.Timeout
	if timeout == 0 {
		timeout = 20 * time.Minute
	}
	runCtx, cancel := context.WithTimeout(ctx, timeout)

	// CommandContext kills the process once runCtx is done; the lifecycle
	// goroutine relies on that (via cancel) to force shutdown.
	cmd := exec.CommandContext(runCtx, execPath, "app-server", "--listen", "stdio://")
	if opts.Cwd != "" {
		cmd.Dir = opts.Cwd
	}
	cmd.Env = buildEnv(b.cfg.Env)

	stdout, err := cmd.StdoutPipe()
	if err != nil {
		cancel()
		return nil, fmt.Errorf("codex stdout pipe: %w", err)
	}
	stdin, err := cmd.StdinPipe()
	if err != nil {
		cancel()
		return nil, fmt.Errorf("codex stdin pipe: %w", err)
	}
	cmd.Stderr = newLogWriter(b.cfg.Logger, "[codex:stderr] ")

	if err := cmd.Start(); err != nil {
		cancel()
		return nil, fmt.Errorf("start codex: %w", err)
	}
	b.cfg.Logger.Info("codex started app-server", "pid", cmd.Process.Pid, "cwd", opts.Cwd)

	msgCh := make(chan Message, 256)
	resCh := make(chan Result, 1)

	// Text output is accumulated by the reader goroutine and read by the
	// lifecycle goroutine, hence the mutex.
	var outputMu sync.Mutex
	var output strings.Builder

	// turnDone is created before the reader goroutine starts, so there is no
	// race between the reader signalling it and the lifecycle goroutine
	// waiting on it. true = aborted.
	turnDone := make(chan bool, 1)

	c := &codexClient{
		cfg:                  b.cfg,
		stdin:                stdin,
		pending:              make(map[int]*pendingRPC),
		notificationProtocol: "unknown",
		onMessage: func(msg Message) {
			if msg.Type == MessageText {
				outputMu.Lock()
				output.WriteString(msg.Content)
				outputMu.Unlock()
			}
			trySend(msgCh, msg)
		},
		onTurnDone: func(aborted bool) {
			// Non-blocking: only the first completion signal matters.
			select {
			case turnDone <- aborted:
			default:
			}
		},
	}

	// readerDone is closed when the stdout reader exits. scanErr is written
	// before that close, so it is safe to read after <-readerDone.
	readerDone := make(chan struct{})
	var scanErr error
	go func() {
		defer close(readerDone)
		scanner := bufio.NewScanner(stdout)
		scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)
		for scanner.Scan() {
			line := strings.TrimSpace(scanner.Text())
			if line == "" {
				continue
			}
			c.handleLine(line)
		}
		exitErr := fmt.Errorf("codex process exited")
		if err := scanner.Err(); err != nil {
			// For example bufio.ErrTooLong. The reader stops here, so surface
			// the cause instead of leaving the session waiting for a timeout.
			scanErr = err
			exitErr = fmt.Errorf("codex stdout read failed: %w", err)
		}
		c.closeAllPending(exitErr)
	}()

	// Lifecycle goroutine.
	// Shutdown: close stdin and cancel runCtx (which kills codex), so stdout
	// reaches EOF and the reader exits. Then reap the process, and only
	// then close the channels.
	go func() {
		defer cancel()
		defer close(msgCh)
		defer close(resCh)
		defer func() {
			// Runs first among the defers, on every exit path including
			// early failures.
			// The reader must be drained before Wait: os/exec documents that
			// calling Wait before all reads from StdoutPipe have completed is
			// incorrect.
			// It must also be drained before msgCh is closed, because the
			// reader may still be calling onMessage, and a send on a closed
			// channel panics.
			_ = stdin.Close()
			cancel()
			<-readerDone
			_ = cmd.Wait()
		}()

		startTime := time.Now()

		// fail reports a setup failure that happened before the turn ran.
		fail := func(reason string) {
			b.cfg.Logger.Info("codex finished", "pid", cmd.Process.Pid, "status", "failed", "error", reason)
			resCh <- Result{Status: "failed", Error: reason, DurationMs: time.Since(startTime).Milliseconds()}
		}

		// 1. Initialize handshake.
		_, err := c.request(runCtx, "initialize", map[string]any{
			"clientInfo": map[string]any{
				"name":    "multica-agent-sdk",
				"title":   "Multica Agent SDK",
				"version": "0.2.0",
			},
			"capabilities": map[string]any{
				"experimentalApi": true,
			},
		})
		if err != nil {
			fail(fmt.Sprintf("codex initialize failed: %v", err))
			return
		}
		c.notify("initialized")

		// 2. Start thread.
		threadResult, err := c.request(runCtx, "thread/start", map[string]any{
			"model":                  nilIfEmpty(opts.Model),
			"modelProvider":          nil,
			"profile":                nil,
			"cwd":                    opts.Cwd,
			"approvalPolicy":         nil,
			"sandbox":                "workspace-write",
			"config":                 nil,
			"baseInstructions":       nil,
			"developerInstructions":  nilIfEmpty(opts.SystemPrompt),
			"compactPrompt":          nil,
			"includeApplyPatchTool":  nil,
			"experimentalRawEvents":  false,
			"persistExtendedHistory": true,
		})
		if err != nil {
			fail(fmt.Sprintf("codex thread/start failed: %v", err))
			return
		}
		threadID := extractThreadID(threadResult)
		if threadID == "" {
			fail("codex thread/start returned no thread ID")
			return
		}
		c.threadID = threadID
		b.cfg.Logger.Info("codex thread started", "thread_id", threadID)

		// 3. Send the turn and wait for it to finish.
		_, err = c.request(runCtx, "turn/start", map[string]any{
			"threadId": threadID,
			"input": []map[string]any{
				{"type": "text", "text": prompt},
			},
		})
		if err != nil {
			fail(fmt.Sprintf("codex turn/start failed: %v", err))
			return
		}

		finalStatus := "completed"
		var finalError string
		// recordTurnDone applies the outcome carried by a turnDone signal.
		recordTurnDone := func(aborted bool) {
			if aborted {
				finalStatus = "aborted"
				finalError = "turn was aborted"
			}
		}
		// recordCtxDone distinguishes a deadline from a caller cancellation.
		recordCtxDone := func() {
			if runCtx.Err() == context.DeadlineExceeded {
				finalStatus = "timeout"
				finalError = fmt.Sprintf("codex timed out after %s", timeout)
			} else {
				finalStatus = "aborted"
				finalError = "execution cancelled"
			}
		}

		select {
		case aborted := <-turnDone:
			recordTurnDone(aborted)
		case <-runCtx.Done():
			recordCtxDone()
		case <-readerDone:
			// stdout closed before a completion signal was observed here.
			// Without this case, a crashed codex would only be noticed at
			// timeout.
			// The completion notification may have been one of the last
			// lines read, so check turnDone once more before reporting
			// failure.
			select {
			case aborted := <-turnDone:
				recordTurnDone(aborted)
			default:
				switch {
				case runCtx.Err() != nil:
					recordCtxDone()
				case scanErr != nil:
					finalStatus = "failed"
					finalError = fmt.Sprintf("codex stdout read failed: %v", scanErr)
				default:
					finalStatus = "failed"
					finalError = "codex process exited before turn completed"
				}
			}
		}

		duration := time.Since(startTime)
		b.cfg.Logger.Info("codex finished", "pid", cmd.Process.Pid, "status", finalStatus, "duration", duration.Round(time.Millisecond).String())

		// Close stdin and cancel so the long-running app-server exits and
		// releases stdout. Otherwise the reader would block in Scan forever.
		_ = stdin.Close()
		cancel()
		// Wait for the reader so that all output has been accumulated.
		<-readerDone

		outputMu.Lock()
		finalOutput := output.String()
		outputMu.Unlock()

		resCh <- Result{
			Status:     finalStatus,
			Output:     finalOutput,
			Error:      finalError,
			DurationMs: duration.Milliseconds(),
		}
	}()

	return &Session{Messages: msgCh, Result: resCh}, nil
}
// ── codexClient: JSON-RPC 2.0 transport ──

// codexClient is the JSON-RPC 2.0 peer for one codex app-server process. It
// writes requests, notifications and responses to the child's stdin, one
// JSON object per line. Lines read from the child's stdout are dispatched
// through handleLine.
//
// Concurrency: mu guards nextID and pending, which are shared between the
// goroutine issuing requests and the stdout reader. In this file, the
// protocol and turn-tracking fields are touched only from handleLine's call
// tree.
type codexClient struct {
	cfg   Config
	stdin interface{ Write(p []byte) (n int, err error) } // child's stdin

	mu      sync.Mutex
	nextID  int                 // last request id issued; the first id is 1
	pending map[int]*pendingRPC // in-flight requests keyed by id

	threadID string // thread id returned by thread/start
	turnID   string // id from the latest turn/started notification

	onMessage  func(Message)      // receives translated progress messages
	onTurnDone func(aborted bool) // invoked when a turn is seen to finish

	notificationProtocol string          // "unknown", "legacy", "raw"
	turnStarted          bool            // set once a turn start has been observed
	completedTurnIDs     map[string]bool // de-duplicates turn/completed by turn id
}
// pendingRPC tracks one outstanding request until its outcome is delivered.
type pendingRPC struct {
	ch     chan rpcResult // capacity 1; receives at most one outcome
	method string         // request method, used to label error messages
}

// rpcResult is the outcome of a request: the raw "result" payload on
// success, or a non-nil err for a JSON-RPC error or transport shutdown.
type rpcResult struct {
	result json.RawMessage
	err    error
}
// request sends a JSON-RPC request and blocks until its response arrives or
// ctx is done.
//
// It returns the raw "result" payload. The error is one of:
//   - the error delivered by the reader (an RPC error or shutdown);
//   - a marshal or write error;
//   - ctx.Err().
//
// The request is removed from the pending table on every path that returns
// without a delivered outcome.
func (c *codexClient) request(ctx context.Context, method string, params any) (json.RawMessage, error) {
	waiter := &pendingRPC{ch: make(chan rpcResult, 1), method: method}

	c.mu.Lock()
	c.nextID++
	id := c.nextID
	c.pending[id] = waiter
	c.mu.Unlock()

	forget := func() {
		c.mu.Lock()
		delete(c.pending, id)
		c.mu.Unlock()
	}

	payload, err := json.Marshal(map[string]any{
		"jsonrpc": "2.0",
		"id":      id,
		"method":  method,
		"params":  params,
	})
	if err != nil {
		forget()
		return nil, err
	}

	if _, werr := c.stdin.Write(append(payload, '\n')); werr != nil {
		forget()
		return nil, fmt.Errorf("write %s: %w", method, werr)
	}

	select {
	case <-ctx.Done():
		forget()
		return nil, ctx.Err()
	case outcome := <-waiter.ch:
		return outcome.result, outcome.err
	}
}
// notify sends a parameterless JSON-RPC notification. It has no id, so no
// reply is expected.
func (c *codexClient) notify(method string) {
	// A map of two strings always marshals, so the error is not checked.
	line, _ := json.Marshal(map[string]any{"jsonrpc": "2.0", "method": method})
	// Best-effort delivery: a failed write is deliberately ignored here.
	_, _ = c.stdin.Write(append(line, '\n'))
}
// respond answers a server-initiated request with the given id and result.
// Delivery is best-effort. Write errors are ignored. If result cannot be
// marshalled, only a bare newline is written.
func (c *codexClient) respond(id int, result any) {
	line, _ := json.Marshal(map[string]any{
		"jsonrpc": "2.0",
		"id":      id,
		"result":  result,
	})
	_, _ = c.stdin.Write(append(line, '\n'))
}
// closeAllPending fails every in-flight request with err and empties the
// pending table. It is used when the stdout reader stops and no further
// responses can arrive.
//
// The sends never block: each channel has capacity 1, and outcomes are only
// sent after an entry has been removed from the table.
func (c *codexClient) closeAllPending(err error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for id, waiter := range c.pending {
		delete(c.pending, id)
		waiter.ch <- rpcResult{err: err}
	}
}
// handleLine classifies one stdout line and dispatches it:
//   - a response (id plus result or error) goes to handleResponse;
//   - a server request (id plus method) goes to handleServerRequest;
//   - a notification (method without id) goes to handleNotification.
//
// Lines that are not JSON objects, or that match none of these shapes, are
// ignored.
func (c *codexClient) handleLine(line string) {
	var fields map[string]json.RawMessage
	if json.Unmarshal([]byte(line), &fields) != nil {
		return
	}

	_, hasID := fields["id"]
	_, hasResult := fields["result"]
	_, hasError := fields["error"]
	_, hasMethod := fields["method"]

	switch {
	case hasID && (hasResult || hasError):
		c.handleResponse(fields)
	case hasID && hasMethod:
		c.handleServerRequest(fields)
	case !hasID && hasMethod:
		c.handleNotification(fields)
	}
}
// handleResponse delivers a JSON-RPC response to the request waiting on its
// id. Responses whose id is not an integer, or is not pending, are dropped.
// An "error" member becomes an error labelled with the request method.
func (c *codexClient) handleResponse(raw map[string]json.RawMessage) {
	var id int
	if json.Unmarshal(raw["id"], &id) != nil {
		return
	}

	c.mu.Lock()
	waiter, found := c.pending[id]
	delete(c.pending, id) // no-op when absent
	c.mu.Unlock()
	if !found {
		return
	}

	errData, isError := raw["error"]
	if !isError {
		waiter.ch <- rpcResult{result: raw["result"]}
		return
	}

	var rpcErr struct {
		Code    int    `json:"code"`
		Message string `json:"message"`
	}
	// Best-effort decode: a malformed error object still fails the call,
	// just with empty details.
	_ = json.Unmarshal(errData, &rpcErr)
	waiter.ch <- rpcResult{err: fmt.Errorf("%s: %s (code=%d)", waiter.method, rpcErr.Message, rpcErr.Code)}
}
// handleServerRequest answers a JSON-RPC request initiated by the codex
// app-server. In daemon mode, every command-execution and file-change
// approval is granted. Any other request receives an empty result.
//
// The two protocol generations use different approval vocabularies:
//   - the v2 item/*/requestApproval methods expect the decision "accept";
//   - the legacy execCommandApproval / applyPatchApproval methods expect a
//     ReviewDecision, whose approve value is "approved".
//
// Sending "accept" to the legacy methods is not a valid ReviewDecision.
func (c *codexClient) handleServerRequest(raw map[string]json.RawMessage) {
	// NOTE(review): a non-integer id decodes to 0 and is echoed back as 0.
	// This assumes the server only issues integer request ids — confirm.
	var id int
	_ = json.Unmarshal(raw["id"], &id)
	var method string
	_ = json.Unmarshal(raw["method"], &method)

	switch method {
	case "item/commandExecution/requestApproval", "item/fileChange/requestApproval":
		c.respond(id, map[string]any{"decision": "accept"})
	case "execCommandApproval", "applyPatchApproval":
		c.respond(id, map[string]any{"decision": "approved"})
	default:
		c.respond(id, map[string]any{})
	}
}
// handleNotification routes a server notification to the legacy or raw (v2)
// handler, depending on which protocol the server is using.
//
// Protocol selection:
//   - Any codex/event notification switches the client to "legacy", which
//     suppresses raw handling from then on.
//   - While still "unknown", the first turn/started, turn/completed,
//     thread/started or item/* notification selects "raw".
func (c *codexClient) handleNotification(raw map[string]json.RawMessage) {
	var method string
	_ = json.Unmarshal(raw["method"], &method)
	var params map[string]any
	if rawParams, present := raw["params"]; present {
		_ = json.Unmarshal(rawParams, &params)
	}

	isLegacy := method == "codex/event" || strings.HasPrefix(method, "codex/event/")
	if isLegacy {
		c.notificationProtocol = "legacy"
		if event, ok := params["msg"].(map[string]any); ok {
			c.handleEvent(event)
		}
		return
	}

	if c.notificationProtocol == "legacy" {
		return
	}

	if c.notificationProtocol == "unknown" {
		switch {
		case method == "turn/started",
			method == "turn/completed",
			method == "thread/started",
			strings.HasPrefix(method, "item/"):
			c.notificationProtocol = "raw"
		}
	}

	if c.notificationProtocol == "raw" {
		c.handleRawNotification(method, params)
	}
}
// handleEvent translates one legacy codex/event payload (the "msg" object)
// into Messages and turn-completion signals.
//
// Legacy exec events are decoded leniently so that tool messages are not
// silently empty:
//   - "command" is accepted either as a string or as an argv array, which is
//     joined with spaces for display.
//   - Output is taken from the first non-empty field among "output",
//     "aggregated_output", "formatted_output" and "stdout".
//
// NOTE(review): the fallback field names follow the upstream codex
// ExecCommandBegin/End events — confirm against the codex version in use.
func (c *codexClient) handleEvent(msg map[string]any) {
	emit := func(m Message) {
		if c.onMessage != nil {
			c.onMessage(m)
		}
	}
	signalDone := func(aborted bool) {
		if c.onTurnDone != nil {
			c.onTurnDone(aborted)
		}
	}

	msgType, _ := msg["type"].(string)
	callID, _ := msg["call_id"].(string)

	switch msgType {
	case "task_started":
		c.turnStarted = true
		emit(Message{Type: MessageStatus, Status: "running"})

	case "agent_message":
		if text, _ := msg["message"].(string); text != "" {
			emit(Message{Type: MessageText, Content: text})
		}

	case "exec_command_begin":
		var command string
		switch v := msg["command"].(type) {
		case string:
			command = v
		case []any:
			// argv form: joined for display only; shell quoting is not
			// preserved.
			parts := make([]string, 0, len(v))
			for _, arg := range v {
				if s, ok := arg.(string); ok {
					parts = append(parts, s)
				}
			}
			command = strings.Join(parts, " ")
		}
		emit(Message{
			Type:   MessageToolUse,
			Tool:   "exec_command",
			CallID: callID,
			Input:  map[string]any{"command": command},
		})

	case "exec_command_end":
		var output string
		for _, key := range []string{"output", "aggregated_output", "formatted_output", "stdout"} {
			if s, _ := msg[key].(string); s != "" {
				output = s
				break
			}
		}
		emit(Message{
			Type:   MessageToolResult,
			Tool:   "exec_command",
			CallID: callID,
			Output: output,
		})

	case "patch_apply_begin":
		emit(Message{Type: MessageToolUse, Tool: "patch_apply", CallID: callID})

	case "patch_apply_end":
		emit(Message{Type: MessageToolResult, Tool: "patch_apply", CallID: callID})

	case "task_complete":
		signalDone(false)

	case "turn_aborted":
		signalDone(true)
	}
}
// handleRawNotification handles one v2 app-server notification:
//   - turn/started records the turn and emits a "running" status.
//   - turn/completed signals completion once per turn id. The status values
//     "cancelled", "canceled", "aborted" and "interrupted" count as aborted.
//   - thread/status/changed to "idle" after a turn has started counts as
//     normal completion.
//   - item/* notifications are forwarded to handleItemNotification.
//
// NOTE(review): any other turn status, including a failure status if codex
// reports one, is signalled as a normal completion, because onTurnDone only
// carries an aborted flag.
func (c *codexClient) handleRawNotification(method string, params map[string]any) {
	signalDone := func(aborted bool) {
		if c.onTurnDone != nil {
			c.onTurnDone(aborted)
		}
	}

	switch method {
	case "turn/started":
		c.turnStarted = true
		if id := extractNestedString(params, "turn", "id"); id != "" {
			c.turnID = id
		}
		if c.onMessage != nil {
			c.onMessage(Message{Type: MessageStatus, Status: "running"})
		}

	case "turn/completed":
		id := extractNestedString(params, "turn", "id")
		aborted := false
		switch extractNestedString(params, "turn", "status") {
		case "cancelled", "canceled", "aborted", "interrupted":
			aborted = true
		}
		if c.completedTurnIDs == nil {
			c.completedTurnIDs = map[string]bool{}
		}
		if id != "" {
			if c.completedTurnIDs[id] {
				return // duplicate completion for this turn
			}
			c.completedTurnIDs[id] = true
		}
		signalDone(aborted)

	case "thread/status/changed":
		if c.turnStarted && extractNestedString(params, "status", "type") == "idle" {
			signalDone(false)
		}

	default:
		if strings.HasPrefix(method, "item/") {
			c.handleItemNotification(method, params)
		}
	}
}
// handleItemNotification maps v2 item/started and item/completed
// notifications to tool-use, tool-result and text Messages:
//   - commandExecution items map to the "exec_command" tool.
//   - fileChange items map to the "patch_apply" tool.
//   - A completed agentMessage emits its text. When its phase is
//     "final_answer" and a turn has started, it also signals normal turn
//     completion.
//
// Notifications without an "item" object are ignored.
func (c *codexClient) handleItemNotification(method string, params map[string]any) {
	item, isObject := params["item"].(map[string]any)
	if !isObject {
		return
	}
	kind, _ := item["type"].(string)
	callID, _ := item["id"].(string)

	emit := func(m Message) {
		if c.onMessage != nil {
			c.onMessage(m)
		}
	}

	switch method {
	case "item/started":
		switch kind {
		case "commandExecution":
			cmdText, _ := item["command"].(string)
			emit(Message{
				Type:   MessageToolUse,
				Tool:   "exec_command",
				CallID: callID,
				Input:  map[string]any{"command": cmdText},
			})
		case "fileChange":
			emit(Message{Type: MessageToolUse, Tool: "patch_apply", CallID: callID})
		}

	case "item/completed":
		switch kind {
		case "commandExecution":
			out, _ := item["aggregatedOutput"].(string)
			emit(Message{
				Type:   MessageToolResult,
				Tool:   "exec_command",
				CallID: callID,
				Output: out,
			})
		case "fileChange":
			emit(Message{Type: MessageToolResult, Tool: "patch_apply", CallID: callID})
		case "agentMessage":
			if text, _ := item["text"].(string); text != "" {
				emit(Message{Type: MessageText, Content: text})
			}
			phase, _ := item["phase"].(string)
			if phase == "final_answer" && c.turnStarted && c.onTurnDone != nil {
				c.onTurnDone(false)
			}
		}
	}
}
// ── Helpers ──

// extractThreadID returns thread.id from a thread/start result, or "" when
// the payload is not valid JSON, has no thread object, or the id is not a
// string.
func extractThreadID(result json.RawMessage) string {
	var payload struct {
		Thread *struct {
			ID string `json:"id"`
		} `json:"thread"`
	}
	if json.Unmarshal(result, &payload) != nil || payload.Thread == nil {
		return ""
	}
	return payload.Thread.ID
}
// extractNestedString follows keys through nested map[string]any values and
// returns the string found at the end of the path. It returns "" when any
// intermediate value is not a map, or when the final value is not a string.
// This includes calling it with no keys.
func extractNestedString(m map[string]any, keys ...string) string {
	var node any = m
	for _, key := range keys {
		obj, isMap := node.(map[string]any)
		if !isMap {
			return ""
		}
		node = obj[key]
	}
	if str, isStr := node.(string); isStr {
		return str
	}
	return ""
}
// nilIfEmpty returns s as an any value, or an untyped nil when s is empty.
// This makes an unset option marshal to JSON null instead of "".
func nilIfEmpty(s string) any {
	if s != "" {
		return s
	}
	return nil
}