Merge pull request #482 from multica-ai/agent/j/674c6839
feat(usage): add per-task token usage tracking
This commit is contained in:
commit
8403c97688
17 changed files with 644 additions and 15 deletions
|
|
@ -103,6 +103,7 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route
|
|||
r.Post("/tasks/{taskId}/progress", h.ReportTaskProgress)
|
||||
r.Post("/tasks/{taskId}/complete", h.CompleteTask)
|
||||
r.Post("/tasks/{taskId}/fail", h.FailTask)
|
||||
r.Post("/tasks/{taskId}/usage", h.ReportTaskUsage)
|
||||
r.Post("/tasks/{taskId}/messages", h.ReportTaskMessages)
|
||||
r.Get("/tasks/{taskId}/messages", h.ListTaskMessages)
|
||||
})
|
||||
|
|
@ -221,6 +222,12 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route
|
|||
})
|
||||
})
|
||||
|
||||
// Usage
|
||||
r.Route("/api/usage", func(r chi.Router) {
|
||||
r.Get("/daily", h.GetWorkspaceUsageByDay)
|
||||
r.Get("/summary", h.GetWorkspaceUsageSummary)
|
||||
})
|
||||
|
||||
// Runtimes
|
||||
r.Route("/api/runtimes", func(r chi.Router) {
|
||||
r.Get("/", h.ListAgentRuntimes)
|
||||
|
|
|
|||
|
|
@ -113,6 +113,15 @@ func (c *Client) CompleteTask(ctx context.Context, taskID, output, branchName, s
|
|||
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/complete", taskID), body, nil)
|
||||
}
|
||||
|
||||
func (c *Client) ReportTaskUsage(ctx context.Context, taskID string, usage []TaskUsageEntry) error {
|
||||
if len(usage) == 0 {
|
||||
return nil
|
||||
}
|
||||
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/usage", taskID), map[string]any{
|
||||
"usage": usage,
|
||||
}, nil)
|
||||
}
|
||||
|
||||
func (c *Client) FailTask(ctx context.Context, taskID, errMsg string) error {
|
||||
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/fail", taskID), map[string]any{
|
||||
"error": errMsg,
|
||||
|
|
|
|||
|
|
@ -837,6 +837,13 @@ func (d *Daemon) handleTask(ctx context.Context, task Task) {
|
|||
return
|
||||
}
|
||||
|
||||
// Report usage independently so it's captured even for failed/blocked tasks.
|
||||
if len(result.Usage) > 0 {
|
||||
if err := d.client.ReportTaskUsage(ctx, task.ID, result.Usage); err != nil {
|
||||
taskLog.Warn("report task usage failed", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
switch result.Status {
|
||||
case "blocked":
|
||||
if err := d.client.FailTask(ctx, task.ID, result.Comment); err != nil {
|
||||
|
|
@ -1105,6 +1112,22 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, taskLo
|
|||
"tools", toolCount.Load(),
|
||||
)
|
||||
|
||||
// Convert agent usage map to task usage entries.
|
||||
var usageEntries []TaskUsageEntry
|
||||
for model, u := range result.Usage {
|
||||
if u.InputTokens == 0 && u.OutputTokens == 0 && u.CacheReadTokens == 0 && u.CacheWriteTokens == 0 {
|
||||
continue
|
||||
}
|
||||
usageEntries = append(usageEntries, TaskUsageEntry{
|
||||
Provider: provider,
|
||||
Model: model,
|
||||
InputTokens: u.InputTokens,
|
||||
OutputTokens: u.OutputTokens,
|
||||
CacheReadTokens: u.CacheReadTokens,
|
||||
CacheWriteTokens: u.CacheWriteTokens,
|
||||
})
|
||||
}
|
||||
|
||||
switch result.Status {
|
||||
case "completed":
|
||||
if result.Output == "" {
|
||||
|
|
@ -1115,6 +1138,7 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, taskLo
|
|||
Comment: result.Output,
|
||||
SessionID: result.SessionID,
|
||||
WorkDir: env.WorkDir,
|
||||
Usage: usageEntries,
|
||||
}, nil
|
||||
case "timeout":
|
||||
return TaskResult{}, fmt.Errorf("%s timed out after %s", provider, d.cfg.AgentTimeout)
|
||||
|
|
@ -1123,7 +1147,7 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, taskLo
|
|||
if errMsg == "" {
|
||||
errMsg = fmt.Sprintf("%s execution %s", provider, result.Status)
|
||||
}
|
||||
return TaskResult{Status: "blocked", Comment: errMsg}, nil
|
||||
return TaskResult{Status: "blocked", Comment: errMsg, Usage: usageEntries}, nil
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -56,12 +56,23 @@ type SkillFileData struct {
|
|||
Content string `json:"content"`
|
||||
}
|
||||
|
||||
// TaskUsageEntry is the token usage recorded for one provider/model pair over
// the course of a single task execution.
type TaskUsageEntry struct {
	// Identity of the model the counters belong to.
	Provider string `json:"provider"`
	Model    string `json:"model"`

	// Token counters accumulated for that model.
	InputTokens      int64 `json:"input_tokens"`
	OutputTokens     int64 `json:"output_tokens"`
	CacheReadTokens  int64 `json:"cache_read_tokens"`
	CacheWriteTokens int64 `json:"cache_write_tokens"`
}
|
||||
|
||||
// TaskResult is the outcome of executing a task.
|
||||
type TaskResult struct {
|
||||
Status string `json:"status"`
|
||||
Comment string `json:"comment"`
|
||||
BranchName string `json:"branch_name,omitempty"`
|
||||
EnvType string `json:"env_type,omitempty"`
|
||||
SessionID string `json:"session_id,omitempty"` // Claude session ID for future resumption
|
||||
WorkDir string `json:"work_dir,omitempty"` // working directory used during execution
|
||||
Status string `json:"status"`
|
||||
Comment string `json:"comment"`
|
||||
BranchName string `json:"branch_name,omitempty"`
|
||||
EnvType string `json:"env_type,omitempty"`
|
||||
SessionID string `json:"session_id,omitempty"` // Claude session ID for future resumption
|
||||
WorkDir string `json:"work_dir,omitempty"` // working directory used during execution
|
||||
Usage []TaskUsageEntry `json:"usage,omitempty"` // per-model token usage
|
||||
}
|
||||
|
|
|
|||
|
|
@ -368,6 +368,45 @@ func (h *Handler) CompleteTask(w http.ResponseWriter, r *http.Request) {
|
|||
writeJSON(w, http.StatusOK, taskToResponse(*task))
|
||||
}
|
||||
|
||||
// TaskUsagePayload is one element of the "usage" array accepted by
// ReportTaskUsage: the token counters for a single provider/model pair.
// Usage is reported independently of complete/fail so that it is captured
// even when tasks fail or are blocked.
type TaskUsagePayload struct {
	// Identity of the model the counters belong to.
	Provider string `json:"provider"`
	Model    string `json:"model"`

	// Token counters for that model.
	InputTokens      int64 `json:"input_tokens"`
	OutputTokens     int64 `json:"output_tokens"`
	CacheReadTokens  int64 `json:"cache_read_tokens"`
	CacheWriteTokens int64 `json:"cache_write_tokens"`
}
|
||||
|
||||
func (h *Handler) ReportTaskUsage(w http.ResponseWriter, r *http.Request) {
|
||||
taskID := chi.URLParam(r, "taskId")
|
||||
|
||||
var req struct {
|
||||
Usage []TaskUsagePayload `json:"usage"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid request body")
|
||||
return
|
||||
}
|
||||
|
||||
for _, u := range req.Usage {
|
||||
if err := h.Queries.UpsertTaskUsage(r.Context(), db.UpsertTaskUsageParams{
|
||||
TaskID: parseUUID(taskID),
|
||||
Provider: u.Provider,
|
||||
Model: u.Model,
|
||||
InputTokens: u.InputTokens,
|
||||
OutputTokens: u.OutputTokens,
|
||||
CacheReadTokens: u.CacheReadTokens,
|
||||
CacheWriteTokens: u.CacheWriteTokens,
|
||||
}); err != nil {
|
||||
slog.Warn("upsert task usage failed", "task_id", taskID, "model", u.Model, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
||||
}
|
||||
|
||||
// GetTaskStatus returns the current status of a task.
|
||||
// Used by the daemon to check whether a task was cancelled mid-execution.
|
||||
func (h *Handler) GetTaskStatus(w http.ResponseWriter, r *http.Request) {
|
||||
|
|
|
|||
|
|
@ -192,6 +192,96 @@ func (h *Handler) GetRuntimeTaskActivity(w http.ResponseWriter, r *http.Request)
|
|||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
// GetWorkspaceUsageByDay returns daily token usage aggregated by model for the workspace.
|
||||
func (h *Handler) GetWorkspaceUsageByDay(w http.ResponseWriter, r *http.Request) {
|
||||
workspaceID := resolveWorkspaceID(r)
|
||||
since := parseSinceParam(r, 30)
|
||||
|
||||
rows, err := h.Queries.GetWorkspaceUsageByDay(r.Context(), db.GetWorkspaceUsageByDayParams{
|
||||
WorkspaceID: parseUUID(workspaceID),
|
||||
Since: since,
|
||||
})
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to get usage")
|
||||
return
|
||||
}
|
||||
|
||||
type DailyUsageRow struct {
|
||||
Date string `json:"date"`
|
||||
Model string `json:"model"`
|
||||
TotalInputTokens int64 `json:"total_input_tokens"`
|
||||
TotalOutputTokens int64 `json:"total_output_tokens"`
|
||||
TotalCacheReadTokens int64 `json:"total_cache_read_tokens"`
|
||||
TotalCacheWriteTokens int64 `json:"total_cache_write_tokens"`
|
||||
TaskCount int32 `json:"task_count"`
|
||||
}
|
||||
|
||||
resp := make([]DailyUsageRow, len(rows))
|
||||
for i, row := range rows {
|
||||
resp[i] = DailyUsageRow{
|
||||
Date: row.Date.Time.Format("2006-01-02"),
|
||||
Model: row.Model,
|
||||
TotalInputTokens: row.TotalInputTokens,
|
||||
TotalOutputTokens: row.TotalOutputTokens,
|
||||
TotalCacheReadTokens: row.TotalCacheReadTokens,
|
||||
TotalCacheWriteTokens: row.TotalCacheWriteTokens,
|
||||
TaskCount: row.TaskCount,
|
||||
}
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
// GetWorkspaceUsageSummary returns total token usage aggregated by model for the workspace.
|
||||
func (h *Handler) GetWorkspaceUsageSummary(w http.ResponseWriter, r *http.Request) {
|
||||
workspaceID := resolveWorkspaceID(r)
|
||||
since := parseSinceParam(r, 30)
|
||||
|
||||
rows, err := h.Queries.GetWorkspaceUsageSummary(r.Context(), db.GetWorkspaceUsageSummaryParams{
|
||||
WorkspaceID: parseUUID(workspaceID),
|
||||
Since: since,
|
||||
})
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to get usage summary")
|
||||
return
|
||||
}
|
||||
|
||||
type UsageSummaryRow struct {
|
||||
Model string `json:"model"`
|
||||
TotalInputTokens int64 `json:"total_input_tokens"`
|
||||
TotalOutputTokens int64 `json:"total_output_tokens"`
|
||||
TotalCacheReadTokens int64 `json:"total_cache_read_tokens"`
|
||||
TotalCacheWriteTokens int64 `json:"total_cache_write_tokens"`
|
||||
TaskCount int32 `json:"task_count"`
|
||||
}
|
||||
|
||||
resp := make([]UsageSummaryRow, len(rows))
|
||||
for i, row := range rows {
|
||||
resp[i] = UsageSummaryRow{
|
||||
Model: row.Model,
|
||||
TotalInputTokens: row.TotalInputTokens,
|
||||
TotalOutputTokens: row.TotalOutputTokens,
|
||||
TotalCacheReadTokens: row.TotalCacheReadTokens,
|
||||
TotalCacheWriteTokens: row.TotalCacheWriteTokens,
|
||||
TaskCount: row.TaskCount,
|
||||
}
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
// parseSinceParam parses the "days" query parameter and returns a timestamptz.
|
||||
func parseSinceParam(r *http.Request, defaultDays int) pgtype.Timestamptz {
|
||||
days := defaultDays
|
||||
if d := r.URL.Query().Get("days"); d != "" {
|
||||
if parsed, err := strconv.Atoi(d); err == nil && parsed > 0 && parsed <= 365 {
|
||||
days = parsed
|
||||
}
|
||||
}
|
||||
t := time.Now().AddDate(0, 0, -days)
|
||||
return pgtype.Timestamptz{Time: t, Valid: true}
|
||||
}
|
||||
|
||||
func (h *Handler) ListAgentRuntimes(w http.ResponseWriter, r *http.Request) {
|
||||
workspaceID := resolveWorkspaceID(r)
|
||||
|
||||
|
|
|
|||
1
server/migrations/032_task_usage.down.sql
Normal file
1
server/migrations/032_task_usage.down.sql
Normal file
|
|
@ -0,0 +1 @@
|
|||
-- Reverts 032_task_usage.up.sql by dropping the task_usage table.
DROP TABLE IF EXISTS task_usage;
|
||||
14
server/migrations/032_task_usage.up.sql
Normal file
14
server/migrations/032_task_usage.up.sql
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
-- Per-task token usage: one row per (task, provider, model).
CREATE TABLE task_usage (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
-- Usage rows are deleted together with the task they belong to.
task_id UUID NOT NULL REFERENCES agent_task_queue(id) ON DELETE CASCADE,
|
||||
provider TEXT NOT NULL DEFAULT '',
|
||||
model TEXT NOT NULL,
|
||||
input_tokens BIGINT NOT NULL DEFAULT 0,
|
||||
output_tokens BIGINT NOT NULL DEFAULT 0,
|
||||
cache_read_tokens BIGINT NOT NULL DEFAULT 0,
|
||||
cache_write_tokens BIGINT NOT NULL DEFAULT 0,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
-- Conflict target of the UpsertTaskUsage query (ON CONFLICT (task_id, provider, model)).
UNIQUE (task_id, provider, model)
|
||||
);
|
||||
|
||||
-- NOTE(review): the UNIQUE constraint above is backed by an index whose leading
-- column is task_id, so this separate index is probably redundant - confirm
-- before keeping it.
CREATE INDEX idx_task_usage_task_id ON task_usage(task_id);
|
||||
|
|
@ -62,6 +62,14 @@ type Message struct {
|
|||
Level string // log level (Log)
|
||||
}
|
||||
|
||||
// TokenUsage holds the token counters consumed by a single model.
type TokenUsage struct {
	InputTokens, OutputTokens         int64 // prompt and completion tokens
	CacheReadTokens, CacheWriteTokens int64 // tokens read from / written to the cache
}
|
||||
|
||||
// Result is the final outcome after an agent session completes.
|
||||
type Result struct {
|
||||
Status string // "completed", "failed", "aborted", "timeout"
|
||||
|
|
@ -69,6 +77,7 @@ type Result struct {
|
|||
Error string // error message if failed
|
||||
DurationMs int64
|
||||
SessionID string
|
||||
Usage map[string]TokenUsage // keyed by model name
|
||||
}
|
||||
|
||||
// Config configures a Backend instance.
|
||||
|
|
|
|||
|
|
@ -91,6 +91,7 @@ func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
|
|||
var sessionID string
|
||||
finalStatus := "completed"
|
||||
var finalError string
|
||||
usage := make(map[string]TokenUsage)
|
||||
|
||||
scanner := bufio.NewScanner(stdout)
|
||||
scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)
|
||||
|
|
@ -108,7 +109,7 @@ func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
|
|||
|
||||
switch msg.Type {
|
||||
case "assistant":
|
||||
b.handleAssistant(msg, msgCh, &output)
|
||||
b.handleAssistant(msg, msgCh, &output, usage)
|
||||
case "user":
|
||||
b.handleUser(msg, msgCh)
|
||||
case "system":
|
||||
|
|
@ -162,18 +163,29 @@ func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
|
|||
Error: finalError,
|
||||
DurationMs: duration.Milliseconds(),
|
||||
SessionID: sessionID,
|
||||
Usage: usage,
|
||||
}
|
||||
}()
|
||||
|
||||
return &Session{Messages: msgCh, Result: resCh}, nil
|
||||
}
|
||||
|
||||
func (b *claudeBackend) handleAssistant(msg claudeSDKMessage, ch chan<- Message, output *strings.Builder) {
|
||||
func (b *claudeBackend) handleAssistant(msg claudeSDKMessage, ch chan<- Message, output *strings.Builder, usage map[string]TokenUsage) {
|
||||
var content claudeMessageContent
|
||||
if err := json.Unmarshal(msg.Message, &content); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Accumulate token usage per model.
|
||||
if content.Usage != nil && content.Model != "" {
|
||||
u := usage[content.Model]
|
||||
u.InputTokens += content.Usage.InputTokens
|
||||
u.OutputTokens += content.Usage.OutputTokens
|
||||
u.CacheReadTokens += content.Usage.CacheReadInputTokens
|
||||
u.CacheWriteTokens += content.Usage.CacheCreationInputTokens
|
||||
usage[content.Model] = u
|
||||
}
|
||||
|
||||
for _, block := range content.Content {
|
||||
switch block.Type {
|
||||
case "text":
|
||||
|
|
@ -287,8 +299,17 @@ type claudeLogEntry struct {
|
|||
}
|
||||
|
||||
type claudeMessageContent struct {
|
||||
Role string `json:"role"`
|
||||
Role string `json:"role"`
|
||||
Model string `json:"model"`
|
||||
Content []claudeContentBlock `json:"content"`
|
||||
Usage *claudeUsage `json:"usage,omitempty"`
|
||||
}
|
||||
|
||||
// claudeUsage mirrors the "usage" object attached to a Claude message.
type claudeUsage struct {
	// Prompt and completion token counts.
	InputTokens  int64 `json:"input_tokens"`
	OutputTokens int64 `json:"output_tokens"`

	// Cache-related input token counts.
	CacheReadInputTokens     int64 `json:"cache_read_input_tokens"`
	CacheCreationInputTokens int64 `json:"cache_creation_input_tokens"`
}
|
||||
|
||||
type claudeContentBlock struct {
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ func TestClaudeHandleAssistantText(t *testing.T) {
|
|||
}),
|
||||
}
|
||||
|
||||
b.handleAssistant(msg, ch, &output)
|
||||
b.handleAssistant(msg, ch, &output, make(map[string]TokenUsage))
|
||||
|
||||
if output.String() != "Hello world" {
|
||||
t.Fatalf("expected output 'Hello world', got %q", output.String())
|
||||
|
|
@ -62,7 +62,7 @@ func TestClaudeHandleAssistantToolUse(t *testing.T) {
|
|||
}),
|
||||
}
|
||||
|
||||
b.handleAssistant(msg, ch, &output)
|
||||
b.handleAssistant(msg, ch, &output, make(map[string]TokenUsage))
|
||||
|
||||
if output.String() != "" {
|
||||
t.Fatalf("tool_use should not add to output, got %q", output.String())
|
||||
|
|
@ -162,7 +162,7 @@ func TestClaudeHandleAssistantInvalidJSON(t *testing.T) {
|
|||
}
|
||||
|
||||
// Should not panic
|
||||
b.handleAssistant(msg, ch, &output)
|
||||
b.handleAssistant(msg, ch, &output, make(map[string]TokenUsage))
|
||||
|
||||
if output.String() != "" {
|
||||
t.Fatalf("expected empty output for invalid JSON, got %q", output.String())
|
||||
|
|
|
|||
|
|
@ -220,11 +220,25 @@ func (b *codexBackend) Execute(ctx context.Context, prompt string, opts ExecOpti
|
|||
finalOutput := output.String()
|
||||
outputMu.Unlock()
|
||||
|
||||
// Build usage map from accumulated codex usage.
|
||||
var usageMap map[string]TokenUsage
|
||||
c.usageMu.Lock()
|
||||
u := c.usage
|
||||
c.usageMu.Unlock()
|
||||
if u.InputTokens > 0 || u.OutputTokens > 0 || u.CacheReadTokens > 0 || u.CacheWriteTokens > 0 {
|
||||
model := opts.Model
|
||||
if model == "" {
|
||||
model = "unknown"
|
||||
}
|
||||
usageMap = map[string]TokenUsage{model: u}
|
||||
}
|
||||
|
||||
resCh <- Result{
|
||||
Status: finalStatus,
|
||||
Output: finalOutput,
|
||||
Error: finalError,
|
||||
DurationMs: duration.Milliseconds(),
|
||||
Usage: usageMap,
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
@ -247,6 +261,9 @@ type codexClient struct {
|
|||
notificationProtocol string // "unknown", "legacy", "raw"
|
||||
turnStarted bool
|
||||
completedTurnIDs map[string]bool
|
||||
|
||||
usageMu sync.Mutex
|
||||
usage TokenUsage // accumulated from turn events
|
||||
}
|
||||
|
||||
type pendingRPC struct {
|
||||
|
|
@ -498,6 +515,8 @@ func (c *codexClient) handleEvent(msg map[string]any) {
|
|||
})
|
||||
}
|
||||
case "task_complete":
|
||||
// Extract usage from legacy task_complete if present.
|
||||
c.extractUsageFromMap(msg)
|
||||
if c.onTurnDone != nil {
|
||||
c.onTurnDone(false)
|
||||
}
|
||||
|
|
@ -535,6 +554,11 @@ func (c *codexClient) handleRawNotification(method string, params map[string]any
|
|||
c.completedTurnIDs[turnID] = true
|
||||
}
|
||||
|
||||
// Extract usage from turn/completed if present (e.g. params.turn.usage).
|
||||
if turn, ok := params["turn"].(map[string]any); ok {
|
||||
c.extractUsageFromMap(turn)
|
||||
}
|
||||
|
||||
if c.onTurnDone != nil {
|
||||
c.onTurnDone(aborted)
|
||||
}
|
||||
|
|
@ -618,6 +642,48 @@ func (c *codexClient) handleItemNotification(method string, params map[string]an
|
|||
}
|
||||
}
|
||||
|
||||
// extractUsageFromMap extracts token usage from a map that may contain
|
||||
// "usage", "token_usage", or "tokens" fields. Handles various Codex formats.
|
||||
func (c *codexClient) extractUsageFromMap(data map[string]any) {
|
||||
// Try common field names for usage data.
|
||||
var usageMap map[string]any
|
||||
for _, key := range []string{"usage", "token_usage", "tokens"} {
|
||||
if v, ok := data[key].(map[string]any); ok {
|
||||
usageMap = v
|
||||
break
|
||||
}
|
||||
}
|
||||
if usageMap == nil {
|
||||
return
|
||||
}
|
||||
|
||||
c.usageMu.Lock()
|
||||
defer c.usageMu.Unlock()
|
||||
|
||||
// Try various key conventions.
|
||||
c.usage.InputTokens += codexInt64(usageMap, "input_tokens", "input", "prompt_tokens")
|
||||
c.usage.OutputTokens += codexInt64(usageMap, "output_tokens", "output", "completion_tokens")
|
||||
c.usage.CacheReadTokens += codexInt64(usageMap, "cache_read_tokens", "cache_read_input_tokens")
|
||||
c.usage.CacheWriteTokens += codexInt64(usageMap, "cache_write_tokens", "cache_creation_input_tokens")
|
||||
}
|
||||
|
||||
// codexInt64 scans keys in order and returns the value of the first one whose
// entry in m is a non-zero float64 or int64, converted to int64. Keys that
// are missing, hold another type, or hold zero are skipped. It returns 0 when
// no key qualifies.
//
// A non-zero float64 is truncated toward zero, so a fractional value below 1
// ends the scan with a result of 0.
func codexInt64(m map[string]any, keys ...string) int64 {
	for _, name := range keys {
		switch num := m[name].(type) {
		case float64:
			if num == 0 {
				continue
			}
			return int64(num)
		case int64:
			if num == 0 {
				continue
			}
			return num
		}
	}
	return 0
}
|
||||
|
||||
// ── Helpers ──
|
||||
|
||||
func extractThreadID(result json.RawMessage) string {
|
||||
|
|
|
|||
|
|
@ -96,12 +96,25 @@ func (b *openclawBackend) Execute(ctx context.Context, prompt string, opts ExecO
|
|||
|
||||
b.cfg.Logger.Info("openclaw finished", "pid", cmd.Process.Pid, "status", scanResult.status, "duration", duration.Round(time.Millisecond).String())
|
||||
|
||||
// Build usage map. OpenClaw doesn't report model per-step, so we
|
||||
// attribute all usage to the configured model (or "unknown").
|
||||
var usage map[string]TokenUsage
|
||||
u := scanResult.usage
|
||||
if u.InputTokens > 0 || u.OutputTokens > 0 || u.CacheReadTokens > 0 || u.CacheWriteTokens > 0 {
|
||||
model := opts.Model
|
||||
if model == "" {
|
||||
model = "unknown"
|
||||
}
|
||||
usage = map[string]TokenUsage{model: u}
|
||||
}
|
||||
|
||||
resCh <- Result{
|
||||
Status: scanResult.status,
|
||||
Output: scanResult.output,
|
||||
Error: scanResult.errMsg,
|
||||
DurationMs: duration.Milliseconds(),
|
||||
SessionID: scanResult.sessionID,
|
||||
Usage: usage,
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
@ -116,6 +129,7 @@ type openclawEventResult struct {
|
|||
errMsg string
|
||||
output string
|
||||
sessionID string
|
||||
usage TokenUsage
|
||||
}
|
||||
|
||||
// processEvents reads NDJSON lines from r, dispatches events to ch, and returns
|
||||
|
|
@ -123,6 +137,7 @@ type openclawEventResult struct {
|
|||
func (b *openclawBackend) processEvents(r io.Reader, ch chan<- Message) openclawEventResult {
|
||||
var output strings.Builder
|
||||
var sessionID string
|
||||
var usage TokenUsage
|
||||
finalStatus := "completed"
|
||||
var finalError string
|
||||
|
||||
|
|
@ -160,7 +175,13 @@ func (b *openclawBackend) processEvents(r io.Reader, ch chan<- Message) openclaw
|
|||
case "step_start":
|
||||
trySend(ch, Message{Type: MessageStatus, Status: "running"})
|
||||
case "step_end":
|
||||
// Captures final session ID from step_end if present.
|
||||
// Accumulate token usage from step_end events if present.
|
||||
if event.Data != nil {
|
||||
usage.InputTokens += openclawInt64(event.Data, "inputTokens")
|
||||
usage.OutputTokens += openclawInt64(event.Data, "outputTokens")
|
||||
usage.CacheReadTokens += openclawInt64(event.Data, "cacheReadTokens")
|
||||
usage.CacheWriteTokens += openclawInt64(event.Data, "cacheWriteTokens")
|
||||
}
|
||||
case "result":
|
||||
// The result event only updates status on explicit failure. A
|
||||
// "completed" result is a no-op because finalStatus defaults to
|
||||
|
|
@ -193,6 +214,24 @@ func (b *openclawBackend) processEvents(r io.Reader, ch chan<- Message) openclaw
|
|||
errMsg: finalError,
|
||||
output: output.String(),
|
||||
sessionID: sessionID,
|
||||
usage: usage,
|
||||
}
|
||||
}
|
||||
|
||||
// openclawInt64 reads data[key] as an int64. A float64 value (what
// encoding/json produces for numbers decoded into any) is truncated to int64
// and an int64 value is returned unchanged. A missing key or any other type
// yields 0.
func openclawInt64(data map[string]any, key string) int64 {
	if f, isFloat := data[key].(float64); isFloat {
		return int64(f)
	}
	if i, isInt := data[key].(int64); isInt {
		return i
	}
	return 0
}
|
||||
|
||||
|
|
|
|||
|
|
@ -99,12 +99,25 @@ func (b *opencodeBackend) Execute(ctx context.Context, prompt string, opts ExecO
|
|||
|
||||
b.cfg.Logger.Info("opencode finished", "pid", cmd.Process.Pid, "status", scanResult.status, "duration", duration.Round(time.Millisecond).String())
|
||||
|
||||
// Build usage map. OpenCode doesn't report model per-step, so we
|
||||
// attribute all usage to the configured model (or "unknown").
|
||||
var usage map[string]TokenUsage
|
||||
u := scanResult.usage
|
||||
if u.InputTokens > 0 || u.OutputTokens > 0 || u.CacheReadTokens > 0 || u.CacheWriteTokens > 0 {
|
||||
model := opts.Model
|
||||
if model == "" {
|
||||
model = "unknown"
|
||||
}
|
||||
usage = map[string]TokenUsage{model: u}
|
||||
}
|
||||
|
||||
resCh <- Result{
|
||||
Status: scanResult.status,
|
||||
Output: scanResult.output,
|
||||
Error: scanResult.errMsg,
|
||||
DurationMs: duration.Milliseconds(),
|
||||
SessionID: scanResult.sessionID,
|
||||
Usage: usage,
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
@ -119,6 +132,7 @@ type eventResult struct {
|
|||
errMsg string
|
||||
output string
|
||||
sessionID string
|
||||
usage TokenUsage // accumulated token usage across all steps
|
||||
}
|
||||
|
||||
// processEvents reads JSON lines from r, dispatches events to ch, and returns
|
||||
|
|
@ -126,6 +140,7 @@ type eventResult struct {
|
|||
func (b *opencodeBackend) processEvents(r io.Reader, ch chan<- Message) eventResult {
|
||||
var output strings.Builder
|
||||
var sessionID string
|
||||
var usage TokenUsage
|
||||
finalStatus := "completed"
|
||||
var finalError string
|
||||
|
||||
|
|
@ -157,7 +172,15 @@ func (b *opencodeBackend) processEvents(r io.Reader, ch chan<- Message) eventRes
|
|||
case "step_start":
|
||||
trySend(ch, Message{Type: MessageStatus, Status: "running"})
|
||||
case "step_finish":
|
||||
// Captures final session ID from step_finish if present.
|
||||
// Accumulate token usage from step_finish events.
|
||||
if t := event.Part.Tokens; t != nil {
|
||||
usage.InputTokens += t.Input
|
||||
usage.OutputTokens += t.Output
|
||||
if t.Cache != nil {
|
||||
usage.CacheReadTokens += t.Cache.Read
|
||||
usage.CacheWriteTokens += t.Cache.Write
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -175,6 +198,7 @@ func (b *opencodeBackend) processEvents(r io.Reader, ch chan<- Message) eventRes
|
|||
errMsg: finalError,
|
||||
output: output.String(),
|
||||
sessionID: sessionID,
|
||||
usage: usage,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -281,6 +305,21 @@ type opencodeEventPart struct {
|
|||
Tool string `json:"tool,omitempty"`
|
||||
CallID string `json:"callID,omitempty"`
|
||||
State *opencodeToolState `json:"state,omitempty"`
|
||||
|
||||
// step_finish token usage
|
||||
Tokens *opencodeTokens `json:"tokens,omitempty"`
|
||||
}
|
||||
|
||||
// opencodeTokens represents token usage in a step_finish event.
|
||||
type opencodeTokens struct {
|
||||
Input int64 `json:"input"`
|
||||
Output int64 `json:"output"`
|
||||
Cache *opencodeCacheTokens `json:"cache,omitempty"`
|
||||
}
|
||||
|
||||
// opencodeCacheTokens is the "cache" object nested inside opencodeTokens.
type opencodeCacheTokens struct {
	Read  int64 `json:"read"`  // tokens read from the cache
	Write int64 `json:"write"` // tokens written to the cache
}
|
||||
|
||||
// opencodeToolState represents the state of a tool invocation.
|
||||
|
|
|
|||
|
|
@ -279,6 +279,18 @@ type TaskMessage struct {
|
|||
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
||||
}
|
||||
|
||||
type TaskUsage struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
TaskID pgtype.UUID `json:"task_id"`
|
||||
Provider string `json:"provider"`
|
||||
Model string `json:"model"`
|
||||
InputTokens int64 `json:"input_tokens"`
|
||||
OutputTokens int64 `json:"output_tokens"`
|
||||
CacheReadTokens int64 `json:"cache_read_tokens"`
|
||||
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
||||
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
||||
}
|
||||
|
||||
type User struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
Name string `json:"name"`
|
||||
|
|
|
|||
201
server/pkg/db/generated/task_usage.sql.go
Normal file
201
server/pkg/db/generated/task_usage.sql.go
Normal file
|
|
@ -0,0 +1,201 @@
|
|||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.30.0
|
||||
// source: task_usage.sql
|
||||
|
||||
package db
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
)
|
||||
|
||||
const getTaskUsage = `-- name: GetTaskUsage :many
|
||||
SELECT id, task_id, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, created_at FROM task_usage
|
||||
WHERE task_id = $1
|
||||
ORDER BY model
|
||||
`
|
||||
|
||||
func (q *Queries) GetTaskUsage(ctx context.Context, taskID pgtype.UUID) ([]TaskUsage, error) {
|
||||
rows, err := q.db.Query(ctx, getTaskUsage, taskID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []TaskUsage{}
|
||||
for rows.Next() {
|
||||
var i TaskUsage
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.TaskID,
|
||||
&i.Provider,
|
||||
&i.Model,
|
||||
&i.InputTokens,
|
||||
&i.OutputTokens,
|
||||
&i.CacheReadTokens,
|
||||
&i.CacheWriteTokens,
|
||||
&i.CreatedAt,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getWorkspaceUsageByDay = `-- name: GetWorkspaceUsageByDay :many
|
||||
SELECT
|
||||
DATE(atq.created_at) AS date,
|
||||
tu.model,
|
||||
SUM(tu.input_tokens)::bigint AS total_input_tokens,
|
||||
SUM(tu.output_tokens)::bigint AS total_output_tokens,
|
||||
SUM(tu.cache_read_tokens)::bigint AS total_cache_read_tokens,
|
||||
SUM(tu.cache_write_tokens)::bigint AS total_cache_write_tokens,
|
||||
COUNT(DISTINCT tu.task_id)::int AS task_count
|
||||
FROM task_usage tu
|
||||
JOIN agent_task_queue atq ON atq.id = tu.task_id
|
||||
JOIN agent a ON a.id = atq.agent_id
|
||||
WHERE a.workspace_id = $1
|
||||
AND atq.created_at >= $2::timestamptz
|
||||
GROUP BY DATE(atq.created_at), tu.model
|
||||
ORDER BY DATE(atq.created_at) DESC, tu.model
|
||||
`
|
||||
|
||||
type GetWorkspaceUsageByDayParams struct {
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
Since pgtype.Timestamptz `json:"since"`
|
||||
}
|
||||
|
||||
type GetWorkspaceUsageByDayRow struct {
|
||||
Date pgtype.Date `json:"date"`
|
||||
Model string `json:"model"`
|
||||
TotalInputTokens int64 `json:"total_input_tokens"`
|
||||
TotalOutputTokens int64 `json:"total_output_tokens"`
|
||||
TotalCacheReadTokens int64 `json:"total_cache_read_tokens"`
|
||||
TotalCacheWriteTokens int64 `json:"total_cache_write_tokens"`
|
||||
TaskCount int32 `json:"task_count"`
|
||||
}
|
||||
|
||||
func (q *Queries) GetWorkspaceUsageByDay(ctx context.Context, arg GetWorkspaceUsageByDayParams) ([]GetWorkspaceUsageByDayRow, error) {
|
||||
rows, err := q.db.Query(ctx, getWorkspaceUsageByDay, arg.WorkspaceID, arg.Since)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []GetWorkspaceUsageByDayRow{}
|
||||
for rows.Next() {
|
||||
var i GetWorkspaceUsageByDayRow
|
||||
if err := rows.Scan(
|
||||
&i.Date,
|
||||
&i.Model,
|
||||
&i.TotalInputTokens,
|
||||
&i.TotalOutputTokens,
|
||||
&i.TotalCacheReadTokens,
|
||||
&i.TotalCacheWriteTokens,
|
||||
&i.TaskCount,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getWorkspaceUsageSummary = `-- name: GetWorkspaceUsageSummary :many
|
||||
SELECT
|
||||
tu.model,
|
||||
SUM(tu.input_tokens)::bigint AS total_input_tokens,
|
||||
SUM(tu.output_tokens)::bigint AS total_output_tokens,
|
||||
SUM(tu.cache_read_tokens)::bigint AS total_cache_read_tokens,
|
||||
SUM(tu.cache_write_tokens)::bigint AS total_cache_write_tokens,
|
||||
COUNT(DISTINCT tu.task_id)::int AS task_count
|
||||
FROM task_usage tu
|
||||
JOIN agent_task_queue atq ON atq.id = tu.task_id
|
||||
JOIN agent a ON a.id = atq.agent_id
|
||||
WHERE a.workspace_id = $1
|
||||
AND atq.created_at >= $2::timestamptz
|
||||
GROUP BY tu.model
|
||||
ORDER BY (SUM(tu.input_tokens) + SUM(tu.output_tokens)) DESC
|
||||
`
|
||||
|
||||
type GetWorkspaceUsageSummaryParams struct {
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
Since pgtype.Timestamptz `json:"since"`
|
||||
}
|
||||
|
||||
type GetWorkspaceUsageSummaryRow struct {
|
||||
Model string `json:"model"`
|
||||
TotalInputTokens int64 `json:"total_input_tokens"`
|
||||
TotalOutputTokens int64 `json:"total_output_tokens"`
|
||||
TotalCacheReadTokens int64 `json:"total_cache_read_tokens"`
|
||||
TotalCacheWriteTokens int64 `json:"total_cache_write_tokens"`
|
||||
TaskCount int32 `json:"task_count"`
|
||||
}
|
||||
|
||||
func (q *Queries) GetWorkspaceUsageSummary(ctx context.Context, arg GetWorkspaceUsageSummaryParams) ([]GetWorkspaceUsageSummaryRow, error) {
|
||||
rows, err := q.db.Query(ctx, getWorkspaceUsageSummary, arg.WorkspaceID, arg.Since)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []GetWorkspaceUsageSummaryRow{}
|
||||
for rows.Next() {
|
||||
var i GetWorkspaceUsageSummaryRow
|
||||
if err := rows.Scan(
|
||||
&i.Model,
|
||||
&i.TotalInputTokens,
|
||||
&i.TotalOutputTokens,
|
||||
&i.TotalCacheReadTokens,
|
||||
&i.TotalCacheWriteTokens,
|
||||
&i.TaskCount,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
// upsertTaskUsage is the SQL text executed by (*Queries).UpsertTaskUsage. It
// inserts one task_usage row from parameters $1..$7; when a row with the same
// (task_id, provider, model) already exists, its four token counters are
// overwritten with the incoming values rather than added to them.
const upsertTaskUsage = `-- name: UpsertTaskUsage :exec
|
||||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||
ON CONFLICT (task_id, provider, model)
|
||||
DO UPDATE SET
|
||||
input_tokens = EXCLUDED.input_tokens,
|
||||
output_tokens = EXCLUDED.output_tokens,
|
||||
cache_read_tokens = EXCLUDED.cache_read_tokens,
|
||||
cache_write_tokens = EXCLUDED.cache_write_tokens
|
||||
`
|
||||
|
||||
// UpsertTaskUsageParams holds the seven bind parameters of the
// upsertTaskUsage statement. Fields are declared in the same order as the
// statement's INSERT column list ($1..$7).
type UpsertTaskUsageParams struct {
|
||||
// TaskID, Provider and Model together form the ON CONFLICT key.
TaskID pgtype.UUID `json:"task_id"`
|
||||
Provider string `json:"provider"`
|
||||
Model string `json:"model"`
|
||||
// The four token counters below replace the stored values on conflict.
InputTokens int64 `json:"input_tokens"`
|
||||
OutputTokens int64 `json:"output_tokens"`
|
||||
CacheReadTokens int64 `json:"cache_read_tokens"`
|
||||
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
||||
}
|
||||
|
||||
func (q *Queries) UpsertTaskUsage(ctx context.Context, arg UpsertTaskUsageParams) error {
|
||||
_, err := q.db.Exec(ctx, upsertTaskUsage,
|
||||
arg.TaskID,
|
||||
arg.Provider,
|
||||
arg.Model,
|
||||
arg.InputTokens,
|
||||
arg.OutputTokens,
|
||||
arg.CacheReadTokens,
|
||||
arg.CacheWriteTokens,
|
||||
)
|
||||
return err
|
||||
}
|
||||
47
server/pkg/db/queries/task_usage.sql
Normal file
47
server/pkg/db/queries/task_usage.sql
Normal file
|
|
@ -0,0 +1,47 @@
|
|||
-- name: UpsertTaskUsage :exec
-- Inserts one usage row for a (task_id, provider, model) triple. If a row
-- with that triple already exists, its four token counters are overwritten
-- with the incoming values (EXCLUDED.*), not added to the stored ones.
|
||||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||
ON CONFLICT (task_id, provider, model)
|
||||
DO UPDATE SET
|
||||
input_tokens = EXCLUDED.input_tokens,
|
||||
output_tokens = EXCLUDED.output_tokens,
|
||||
cache_read_tokens = EXCLUDED.cache_read_tokens,
|
||||
cache_write_tokens = EXCLUDED.cache_write_tokens;
|
||||
|
||||
-- name: GetTaskUsage :many
-- Returns every task_usage row recorded for the task given as $1, with all
-- columns, sorted by model.
|
||||
SELECT * FROM task_usage
|
||||
WHERE task_id = $1
|
||||
ORDER BY model;
|
||||
|
||||
-- name: GetWorkspaceUsageByDay :many
-- Per-day, per-model usage for one workspace: summed input, output,
-- cache-read and cache-write tokens plus the number of distinct tasks.
-- The day is taken from the queue entry's created_at (agent_task_queue),
-- and only entries created at or after @since are included. Results are
-- ordered newest day first, then by model.
-- NOTE(review): DATE() on created_at presumably truncates in the session
-- time zone -- confirm this matches how callers bucket days.
|
||||
SELECT
|
||||
DATE(atq.created_at) AS date,
|
||||
tu.model,
|
||||
SUM(tu.input_tokens)::bigint AS total_input_tokens,
|
||||
SUM(tu.output_tokens)::bigint AS total_output_tokens,
|
||||
SUM(tu.cache_read_tokens)::bigint AS total_cache_read_tokens,
|
||||
SUM(tu.cache_write_tokens)::bigint AS total_cache_write_tokens,
|
||||
COUNT(DISTINCT tu.task_id)::int AS task_count
|
||||
FROM task_usage tu
|
||||
JOIN agent_task_queue atq ON atq.id = tu.task_id
|
||||
JOIN agent a ON a.id = atq.agent_id
|
||||
WHERE a.workspace_id = $1
|
||||
AND atq.created_at >= @since::timestamptz
|
||||
GROUP BY DATE(atq.created_at), tu.model
|
||||
ORDER BY DATE(atq.created_at) DESC, tu.model;
|
||||
|
||||
-- name: GetWorkspaceUsageSummary :many
-- Per-model usage totals for one workspace: summed input, output,
-- cache-read and cache-write tokens plus the number of distinct tasks,
-- restricted to queue entries created at or after @since. Results are
-- ordered by combined input+output tokens, largest first; the order of
-- models with equal totals is not specified by this query.
|
||||
SELECT
|
||||
tu.model,
|
||||
SUM(tu.input_tokens)::bigint AS total_input_tokens,
|
||||
SUM(tu.output_tokens)::bigint AS total_output_tokens,
|
||||
SUM(tu.cache_read_tokens)::bigint AS total_cache_read_tokens,
|
||||
SUM(tu.cache_write_tokens)::bigint AS total_cache_write_tokens,
|
||||
COUNT(DISTINCT tu.task_id)::int AS task_count
|
||||
FROM task_usage tu
|
||||
JOIN agent_task_queue atq ON atq.id = tu.task_id
|
||||
JOIN agent a ON a.id = atq.agent_id
|
||||
WHERE a.workspace_id = $1
|
||||
AND atq.created_at >= @since::timestamptz
|
||||
GROUP BY tu.model
|
||||
ORDER BY (SUM(tu.input_tokens) + SUM(tu.output_tokens)) DESC;
|
||||
Loading…
Add table
Add a link
Reference in a new issue