multica/server/pkg/agent/openclaw.go
Jiang Bohan fa0c0fe747 fix(usage): address review feedback — independent usage reporting + all providers
1. Separate ReportTaskUsage endpoint (POST /api/daemon/tasks/{id}/usage)
   so usage is captured independently of complete/fail — fixes usage loss
   for failed/blocked tasks.

2. Add usage tracking for all four providers:
   - Claude: already done (stream-json message.usage)
   - OpenCode: extract from step_finish.part.tokens
   - OpenClaw: extract from step_end.data token fields
   - Codex: extract from turn/completed and task_complete usage fields

3. Remove usage from CompleteTask payload — all usage goes through the
   dedicated endpoint now.
2026-04-08 13:23:54 +08:00

352 lines
9.7 KiB
Go

package agent
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"os/exec"
"strings"
"time"
)
// openclawBackend implements Backend by spawning `openclaw agent -p <prompt>
// --output-format stream-json --yes` and reading streaming NDJSON events from
// stdout — similar to the opencode backend.
type openclawBackend struct {
cfg Config
}
func (b *openclawBackend) Execute(ctx context.Context, prompt string, opts ExecOptions) (*Session, error) {
execPath := b.cfg.ExecutablePath
if execPath == "" {
execPath = "openclaw"
}
if _, err := exec.LookPath(execPath); err != nil {
return nil, fmt.Errorf("openclaw executable not found at %q: %w", execPath, err)
}
timeout := opts.Timeout
if timeout == 0 {
timeout = 20 * time.Minute
}
runCtx, cancel := context.WithTimeout(ctx, timeout)
args := []string{"agent", "--output-format", "stream-json", "--yes"}
if opts.Model != "" {
args = append(args, "--model", opts.Model)
}
if opts.SystemPrompt != "" {
args = append(args, "--system-prompt", opts.SystemPrompt)
}
if opts.MaxTurns > 0 {
args = append(args, "--max-turns", fmt.Sprintf("%d", opts.MaxTurns))
}
if opts.ResumeSessionID != "" {
args = append(args, "--session", opts.ResumeSessionID)
}
args = append(args, "-p", prompt)
cmd := exec.CommandContext(runCtx, execPath, args...)
if opts.Cwd != "" {
cmd.Dir = opts.Cwd
}
cmd.Env = buildEnv(b.cfg.Env)
stdout, err := cmd.StdoutPipe()
if err != nil {
cancel()
return nil, fmt.Errorf("openclaw stdout pipe: %w", err)
}
cmd.Stderr = newLogWriter(b.cfg.Logger, "[openclaw:stderr] ")
if err := cmd.Start(); err != nil {
cancel()
return nil, fmt.Errorf("start openclaw: %w", err)
}
b.cfg.Logger.Info("openclaw started", "pid", cmd.Process.Pid, "cwd", opts.Cwd, "model", opts.Model)
msgCh := make(chan Message, 256)
resCh := make(chan Result, 1)
go func() {
defer cancel()
defer close(msgCh)
defer close(resCh)
startTime := time.Now()
scanResult := b.processEvents(stdout, msgCh)
// Wait for process exit.
exitErr := cmd.Wait()
duration := time.Since(startTime)
if runCtx.Err() == context.DeadlineExceeded {
scanResult.status = "timeout"
scanResult.errMsg = fmt.Sprintf("openclaw timed out after %s", timeout)
} else if runCtx.Err() == context.Canceled {
scanResult.status = "aborted"
scanResult.errMsg = "execution cancelled"
} else if exitErr != nil && scanResult.status == "completed" {
scanResult.status = "failed"
scanResult.errMsg = fmt.Sprintf("openclaw exited with error: %v", exitErr)
}
b.cfg.Logger.Info("openclaw finished", "pid", cmd.Process.Pid, "status", scanResult.status, "duration", duration.Round(time.Millisecond).String())
// Build usage map. OpenClaw doesn't report model per-step, so we
// attribute all usage to the configured model (or "unknown").
var usage map[string]TokenUsage
u := scanResult.usage
if u.InputTokens > 0 || u.OutputTokens > 0 || u.CacheReadTokens > 0 || u.CacheWriteTokens > 0 {
model := opts.Model
if model == "" {
model = "unknown"
}
usage = map[string]TokenUsage{model: u}
}
resCh <- Result{
Status: scanResult.status,
Output: scanResult.output,
Error: scanResult.errMsg,
DurationMs: duration.Milliseconds(),
SessionID: scanResult.sessionID,
Usage: usage,
}
}()
return &Session{Messages: msgCh, Result: resCh}, nil
}
// ── Event handlers ──
// openclawEventResult holds accumulated state from processing the event stream.
type openclawEventResult struct {
status string
errMsg string
output string
sessionID string
usage TokenUsage
}
// processEvents reads NDJSON lines from r, dispatches events to ch, and returns
// the accumulated result.
func (b *openclawBackend) processEvents(r io.Reader, ch chan<- Message) openclawEventResult {
var output strings.Builder
var sessionID string
var usage TokenUsage
finalStatus := "completed"
var finalError string
scanner := bufio.NewScanner(r)
scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
var event openclawEvent
if err := json.Unmarshal([]byte(line), &event); err != nil {
continue
}
if event.SessionID != "" {
sessionID = event.SessionID
}
switch event.Type {
case "text":
b.handleOCTextEvent(event, ch, &output)
case "thinking":
b.handleOCThinkingEvent(event, ch)
case "tool_call":
b.handleOCToolCallEvent(event, ch)
case "error":
// NOTE: error events unconditionally set finalStatus to "failed" and
// it stays sticky — subsequent text or result events won't revert it.
// This is intentional: once an error fires, the session is considered
// failed regardless of later events.
b.handleOCErrorEvent(event, ch, &finalStatus, &finalError)
case "step_start":
trySend(ch, Message{Type: MessageStatus, Status: "running"})
case "step_end":
// Accumulate token usage from step_end events if present.
if event.Data != nil {
usage.InputTokens += openclawInt64(event.Data, "inputTokens")
usage.OutputTokens += openclawInt64(event.Data, "outputTokens")
usage.CacheReadTokens += openclawInt64(event.Data, "cacheReadTokens")
usage.CacheWriteTokens += openclawInt64(event.Data, "cacheWriteTokens")
}
case "result":
// The result event only updates status on explicit failure. A
// "completed" result is a no-op because finalStatus defaults to
// "completed". Any unrecognized status (e.g. "partial") is also
// treated as success — update this if OpenClaw adds new statuses.
if event.Data != nil {
if s, ok := event.Data["status"].(string); ok && s != "" {
if s == "error" || s == "failed" {
finalStatus = "failed"
if msg, ok := event.Data["error"].(string); ok {
finalError = msg
}
}
}
}
}
}
// Check for scanner errors (e.g. broken pipe, read errors).
if scanErr := scanner.Err(); scanErr != nil {
b.cfg.Logger.Warn("openclaw stdout scanner error", "error", scanErr)
if finalStatus == "completed" {
finalStatus = "failed"
finalError = fmt.Sprintf("stdout read error: %v", scanErr)
}
}
return openclawEventResult{
status: finalStatus,
errMsg: finalError,
output: output.String(),
sessionID: sessionID,
usage: usage,
}
}
// openclawInt64 safely extracts an int64 from a JSON-decoded map value (which
// may be float64 due to Go's JSON number handling).
func openclawInt64(data map[string]any, key string) int64 {
v, ok := data[key]
if !ok {
return 0
}
switch n := v.(type) {
case float64:
return int64(n)
case int64:
return n
default:
return 0
}
}
func (b *openclawBackend) handleOCTextEvent(event openclawEvent, ch chan<- Message, output *strings.Builder) {
text := openclawExtractText(event.Data)
if text != "" {
output.WriteString(text)
trySend(ch, Message{Type: MessageText, Content: text})
}
}
func (b *openclawBackend) handleOCThinkingEvent(event openclawEvent, ch chan<- Message) {
text := openclawExtractText(event.Data)
if text != "" {
trySend(ch, Message{Type: MessageThinking, Content: text})
}
}
// handleOCToolCallEvent processes "tool_call" events from OpenClaw. A single
// tool_call event may contain both the call and result when the tool has
// completed (status == "completed").
func (b *openclawBackend) handleOCToolCallEvent(event openclawEvent, ch chan<- Message) {
if event.Data == nil {
return
}
name, _ := event.Data["name"].(string)
callID, _ := event.Data["callId"].(string)
// Extract input.
var input map[string]any
if raw, ok := event.Data["input"]; ok {
if m, ok := raw.(map[string]any); ok {
input = m
}
}
// Emit the tool-use message.
trySend(ch, Message{
Type: MessageToolUse,
Tool: name,
CallID: callID,
Input: input,
})
// If the tool has completed, also emit a tool-result message.
status, _ := event.Data["status"].(string)
if status == "completed" {
outputStr := extractToolOutput(event.Data["output"])
trySend(ch, Message{
Type: MessageToolResult,
Tool: name,
CallID: callID,
Output: outputStr,
})
}
}
func (b *openclawBackend) handleOCErrorEvent(event openclawEvent, ch chan<- Message, finalStatus, finalError *string) {
errMsg := ""
if event.Data != nil {
if msg, ok := event.Data["message"].(string); ok {
errMsg = msg
}
if errMsg == "" {
if code, ok := event.Data["code"].(string); ok {
errMsg = code
}
}
}
if errMsg == "" {
errMsg = "unknown openclaw error"
}
b.cfg.Logger.Warn("openclaw error event", "error", errMsg)
trySend(ch, Message{Type: MessageError, Content: errMsg})
*finalStatus = "failed"
*finalError = errMsg
}
// openclawExtractText extracts text content from an openclaw event data map.
// Supports both flat {"text": "..."} and nested {"content": {"text": "..."}} layouts.
func openclawExtractText(data map[string]any) string {
if data == nil {
return ""
}
// Try "text" field directly.
if text, ok := data["text"].(string); ok {
return text
}
// Try nested "content.text".
if content, ok := data["content"].(map[string]any); ok {
if text, ok := content["text"].(string); ok {
return text
}
}
return ""
}
// ── JSON types for `openclaw agent --output-format stream-json` stdout events ──
// openclawEvent represents a single NDJSON line from OpenClaw's stream-json output.
//
// Event types:
//
// "step_start" — agent step begins
// "text" — text output from agent
// "thinking" — model reasoning/thinking
// "tool_call" — tool invocation with call and result
// "error" — error from openclaw
// "step_end" — agent step completes
// "result" — final result with status
type openclawEvent struct {
Type string `json:"type"`
SessionID string `json:"sessionId,omitempty"`
Data map[string]any `json:"data,omitempty"`
}