diff --git a/Makefile b/Makefile index 811c8637..8d5811e3 100644 --- a/Makefile +++ b/Makefile @@ -95,6 +95,10 @@ check-main: @ENV_FILE=$(MAIN_ENV_FILE) bash scripts/check.sh setup-worktree: + @if [ ! -f "$(WORKTREE_ENV_FILE)" ]; then \ + echo "==> No $(WORKTREE_ENV_FILE) found, generating..."; \ + bash scripts/init-worktree-env.sh $(WORKTREE_ENV_FILE); \ + fi @$(MAKE) setup ENV_FILE=$(WORKTREE_ENV_FILE) start-worktree: diff --git a/apps/web/app/(dashboard)/_components/app-sidebar.tsx b/apps/web/app/(dashboard)/_components/app-sidebar.tsx index c3caca71..b703e359 100644 --- a/apps/web/app/(dashboard)/_components/app-sidebar.tsx +++ b/apps/web/app/(dashboard)/_components/app-sidebar.tsx @@ -6,6 +6,7 @@ import { Inbox, ListTodo, Bot, + Monitor, BookOpen, ChevronDown, Settings, @@ -49,6 +50,7 @@ const primaryNav = [ const workspaceNav = [ { href: "/agents", label: "Agents", icon: Bot }, + { href: "/runtimes", label: "Runtimes", icon: Monitor }, { href: "/skills", label: "Skills", icon: Sparkles }, { href: "/knowledge-base", label: "Knowledge Base", icon: BookOpen }, ]; diff --git a/apps/web/app/(dashboard)/runtimes/page.tsx b/apps/web/app/(dashboard)/runtimes/page.tsx new file mode 100644 index 00000000..621147a5 --- /dev/null +++ b/apps/web/app/(dashboard)/runtimes/page.tsx @@ -0,0 +1 @@ +export { RuntimesPage as default } from "@/features/runtimes"; diff --git a/apps/web/features/runtimes/components/index.ts b/apps/web/features/runtimes/components/index.ts new file mode 100644 index 00000000..de77d9b4 --- /dev/null +++ b/apps/web/features/runtimes/components/index.ts @@ -0,0 +1 @@ +export { default as RuntimesPage } from "./runtimes-page"; diff --git a/apps/web/features/runtimes/components/runtimes-page.tsx b/apps/web/features/runtimes/components/runtimes-page.tsx new file mode 100644 index 00000000..90f27485 --- /dev/null +++ b/apps/web/features/runtimes/components/runtimes-page.tsx @@ -0,0 +1,607 @@ +"use client"; + +import { useState, useEffect, useCallback, 
useRef } from "react"; +import { + Monitor, + Cloud, + Wifi, + WifiOff, + Server, + BarChart3, + Loader2, + CheckCircle2, + XCircle, + Zap, +} from "lucide-react"; +import type { AgentRuntime, RuntimeUsage, RuntimePingStatus } from "@multica/types"; +import { Button } from "@/components/ui/button"; +import { api } from "@/shared/api"; +import { useAuthStore } from "@/features/auth"; +import { useWorkspaceStore } from "@/features/workspace"; +import { useWSEvent } from "@/features/realtime"; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function formatLastSeen(lastSeenAt: string | null): string { + if (!lastSeenAt) return "Never"; + const diff = Date.now() - new Date(lastSeenAt).getTime(); + if (diff < 60_000) return "Just now"; + if (diff < 3_600_000) return `${Math.floor(diff / 60_000)}m ago`; + if (diff < 86_400_000) return `${Math.floor(diff / 3_600_000)}h ago`; + return `${Math.floor(diff / 86_400_000)}d ago`; +} + +function formatTokens(n: number): string { + if (n >= 1_000_000) return `${(n / 1_000_000).toFixed(1)}M`; + if (n >= 1_000) return `${(n / 1_000).toFixed(1)}K`; + return n.toLocaleString(); +} + +// Pricing per million tokens (USD) +const MODEL_PRICING: Record = { + "claude-haiku-4-5": { input: 1, output: 5, cacheRead: 0.1, cacheWrite: 1.25 }, + "claude-sonnet-4-5": { input: 3, output: 15, cacheRead: 0.3, cacheWrite: 3.75 }, + "claude-sonnet-4-6": { input: 3, output: 15, cacheRead: 0.3, cacheWrite: 3.75 }, + "claude-opus-4-5": { input: 5, output: 25, cacheRead: 0.5, cacheWrite: 6.25 }, + "claude-opus-4-6": { input: 5, output: 25, cacheRead: 0.5, cacheWrite: 6.25 }, +}; + +function estimateCost(usage: RuntimeUsage): number { + // Try to find a matching model in pricing table + const model = usage.model; + let pricing = MODEL_PRICING[model]; + if (!pricing) { + // Try partial match + for (const [key, p] of 
Object.entries(MODEL_PRICING)) { + if (model.startsWith(key)) { + pricing = p; + break; + } + } + } + if (!pricing) return 0; + + return ( + (usage.input_tokens * pricing.input + + usage.output_tokens * pricing.output + + usage.cache_read_tokens * pricing.cacheRead + + usage.cache_write_tokens * pricing.cacheWrite) / + 1_000_000 + ); +} + +function RuntimeModeIcon({ mode }: { mode: string }) { + return mode === "cloud" ? ( + + ) : ( + + ); +} + +function StatusBadge({ status }: { status: string }) { + const isOnline = status === "online"; + return ( + + {isOnline ? ( + + ) : ( + + )} + {isOnline ? "Online" : "Offline"} + + ); +} + +// --------------------------------------------------------------------------- +// Runtime List Item +// --------------------------------------------------------------------------- + +function RuntimeListItem({ + runtime, + isSelected, + onClick, +}: { + runtime: AgentRuntime; + isSelected: boolean; + onClick: () => void; +}) { + return ( + + ); +} + +// --------------------------------------------------------------------------- +// Usage Section +// --------------------------------------------------------------------------- + +function UsageSection({ runtimeId }: { runtimeId: string }) { + const [usage, setUsage] = useState([]); + const [loading, setLoading] = useState(true); + + useEffect(() => { + setLoading(true); + api + .getRuntimeUsage(runtimeId, { days: 30 }) + .then(setUsage) + .catch(() => setUsage([])) + .finally(() => setLoading(false)); + }, [runtimeId]); + + if (loading) { + return ( +
Loading usage...
+ ); + } + + if (usage.length === 0) { + return ( +
+ +

+ No usage data yet +

+
+ ); + } + + // Compute totals + const totals = usage.reduce( + (acc, u) => ({ + input: acc.input + u.input_tokens, + output: acc.output + u.output_tokens, + cacheRead: acc.cacheRead + u.cache_read_tokens, + cacheWrite: acc.cacheWrite + u.cache_write_tokens, + cost: acc.cost + estimateCost(u), + }), + { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, cost: 0 }, + ); + + // Group by date for the table + const byDate = new Map(); + for (const u of usage) { + const existing = byDate.get(u.date) ?? []; + existing.push(u); + byDate.set(u.date, existing); + } + + return ( +
+ {/* Summary cards */} +
+ + + + +
+ + {totals.cost > 0 && ( +
+ + Estimated cost (30d):{" "} + + + ${totals.cost.toFixed(2)} + +
+ )} + + {/* Daily breakdown table */} +
+
+
Date
+
Model
+
Input
+
Output
+
Cache R
+
Cache W
+
+
+ {[...byDate.entries()].map(([date, rows]) => + rows.map((row, i) => ( +
+
{date}
+
{row.model}
+
+ {formatTokens(row.input_tokens)} +
+
+ {formatTokens(row.output_tokens)} +
+
+ {formatTokens(row.cache_read_tokens)} +
+
+ {formatTokens(row.cache_write_tokens)} +
+
+ )), + )} +
+
+
+ ); +} + +function TokenCard({ label, value }: { label: string; value: number }) { + return ( +
+
{label}
+
+ {formatTokens(value)} +
+
+ ); +} + +// --------------------------------------------------------------------------- +// Connection Test (Ping) +// --------------------------------------------------------------------------- + +const pingStatusConfig: Record< + RuntimePingStatus, + { label: string; icon: typeof Loader2; color: string } +> = { + pending: { label: "Waiting for daemon...", icon: Loader2, color: "text-muted-foreground" }, + running: { label: "Running test...", icon: Loader2, color: "text-info" }, + completed: { label: "Connected", icon: CheckCircle2, color: "text-success" }, + failed: { label: "Failed", icon: XCircle, color: "text-destructive" }, + timeout: { label: "Timeout", icon: XCircle, color: "text-warning" }, +}; + +function PingSection({ runtimeId }: { runtimeId: string }) { + const [status, setStatus] = useState(null); + const [output, setOutput] = useState(""); + const [error, setError] = useState(""); + const [durationMs, setDurationMs] = useState(null); + const [testing, setTesting] = useState(false); + const pollRef = useRef | null>(null); + + const cleanup = useCallback(() => { + if (pollRef.current) { + clearInterval(pollRef.current); + pollRef.current = null; + } + }, []); + + useEffect(() => cleanup, [cleanup]); + + const handleTest = async () => { + cleanup(); + setTesting(true); + setStatus("pending"); + setOutput(""); + setError(""); + setDurationMs(null); + + try { + const ping = await api.pingRuntime(runtimeId); + + // Poll for result every 2 seconds + pollRef.current = setInterval(async () => { + try { + const result = await api.getPingResult(runtimeId, ping.id); + setStatus(result.status as RuntimePingStatus); + + if (result.status === "completed") { + setOutput(result.output ?? ""); + setDurationMs(result.duration_ms ?? null); + setTesting(false); + cleanup(); + } else if (result.status === "failed" || result.status === "timeout") { + setError(result.error ?? "Unknown error"); + setDurationMs(result.duration_ms ?? 
null); + setTesting(false); + cleanup(); + } + } catch { + // ignore poll errors + } + }, 2000); + } catch { + setStatus("failed"); + setError("Failed to initiate test"); + setTesting(false); + } + }; + + const config = status ? pingStatusConfig[status] : null; + const Icon = config?.icon; + const isActive = status === "pending" || status === "running"; + + return ( +
+
+ + + {config && Icon && ( + + + {config.label} + {durationMs != null && ( + + ({(durationMs / 1000).toFixed(1)}s) + + )} + + )} +
+ + {status === "completed" && output && ( +
+
{output}
+
+ )} + + {(status === "failed" || status === "timeout") && error && ( +
+

{error}

+
+ )} +
+ ); +} + +// --------------------------------------------------------------------------- +// Runtime Detail +// --------------------------------------------------------------------------- + +function RuntimeDetail({ runtime }: { runtime: AgentRuntime }) { + return ( +
+ {/* Header */} +
+
+
+ +
+
+

{runtime.name}

+

+ {runtime.provider} ·{" "} + {runtime.device_info || "Unknown device"} +

+
+
+ +
+ + {/* Content */} +
+ {/* Info grid */} +
+ + + + + {runtime.device_info && ( + + )} + {runtime.daemon_id && ( + + )} +
+ + {/* Connection Test */} +
+

+ Connection Test +

+ +
+ + {/* Usage */} +
+

+ Token Usage (Last 30 Days) +

+ +
+ + {/* Metadata */} + {runtime.metadata && Object.keys(runtime.metadata).length > 0 && ( +
+

+ Metadata +

+
+
+                {JSON.stringify(runtime.metadata, null, 2)}
+              
+
+
+ )} + + {/* Timestamps */} +
+ + +
+
+
+ ); +} + +function InfoField({ + label, + value, + mono, +}: { + label: string; + value: string; + mono?: boolean; +}) { + return ( +
+
{label}
+
+ {value} +
+
+ ); +} + +// --------------------------------------------------------------------------- +// Page +// --------------------------------------------------------------------------- + +export default function RuntimesPage() { + const isLoading = useAuthStore((s) => s.isLoading); + const workspace = useWorkspaceStore((s) => s.workspace); + const [runtimes, setRuntimes] = useState([]); + const [selectedId, setSelectedId] = useState(""); + const [fetching, setFetching] = useState(true); + + const fetchRuntimes = useCallback(async () => { + if (!workspace) return; + try { + const data = await api.listRuntimes({ workspace_id: workspace.id }); + setRuntimes(data); + } finally { + setFetching(false); + } + }, [workspace]); + + useEffect(() => { + fetchRuntimes(); + }, [fetchRuntimes]); + + // Auto-select first runtime + useEffect(() => { + if (runtimes.length > 0 && !selectedId) { + setSelectedId(runtimes[0]!.id); + } + }, [runtimes, selectedId]); + + // Real-time updates + const handleDaemonEvent = useCallback(() => { + fetchRuntimes(); + }, [fetchRuntimes]); + + useWSEvent("daemon:register", handleDaemonEvent); + useWSEvent("daemon:heartbeat", handleDaemonEvent); + + const selected = runtimes.find((r) => r.id === selectedId) ?? null; + + if (isLoading || fetching) { + return ( +
+ Loading... +
+ ); + } + + return ( +
+ {/* Left column - runtime list */} +
+
+

Runtimes

+ + {runtimes.filter((r) => r.status === "online").length}/ + {runtimes.length} online + +
+ {runtimes.length === 0 ? ( +
+ +

+ No runtimes registered +

+

+ Run{" "} + + multica daemon start + {" "} + to register a local runtime. +

+
+ ) : ( +
+ {runtimes.map((runtime) => ( + setSelectedId(runtime.id)} + /> + ))} +
+ )} +
+ + {/* Right column - runtime detail */} +
+ {selected ? ( + + ) : ( +
+ +

Select a runtime to view details

+
+ )} +
+
+ ); +} diff --git a/apps/web/features/runtimes/index.ts b/apps/web/features/runtimes/index.ts new file mode 100644 index 00000000..5fa5b0bf --- /dev/null +++ b/apps/web/features/runtimes/index.ts @@ -0,0 +1 @@ +export { RuntimesPage } from "./components"; diff --git a/packages/sdk/src/api-client.ts b/packages/sdk/src/api-client.ts index 402d45d8..c9105f1a 100644 --- a/packages/sdk/src/api-client.ts +++ b/packages/sdk/src/api-client.ts @@ -26,6 +26,8 @@ import type { PersonalAccessToken, CreatePersonalAccessTokenRequest, CreatePersonalAccessTokenResponse, + RuntimeUsage, + RuntimePing, } from "@multica/types"; import { type SDKLogger, noopLogger } from "./logger"; @@ -232,6 +234,20 @@ export class ApiClient { return this.fetch(`/api/runtimes?${search}`); } + async getRuntimeUsage(runtimeId: string, params?: { days?: number }): Promise { + const search = new URLSearchParams(); + if (params?.days) search.set("days", String(params.days)); + return this.fetch(`/api/runtimes/${runtimeId}/usage?${search}`); + } + + async pingRuntime(runtimeId: string): Promise { + return this.fetch(`/api/runtimes/${runtimeId}/ping`, { method: "POST" }); + } + + async getPingResult(runtimeId: string, pingId: string): Promise { + return this.fetch(`/api/runtimes/${runtimeId}/ping/${pingId}`); + } + async listAgentTasks(agentId: string): Promise { return this.fetch(`/api/agents/${agentId}/tasks`); } diff --git a/packages/types/src/agent.ts b/packages/types/src/agent.ts index 7daabb79..31d83d91 100644 --- a/packages/types/src/agent.ts +++ b/packages/types/src/agent.ts @@ -142,3 +142,27 @@ export interface UpdateSkillRequest { export interface SetAgentSkillsRequest { skill_ids: string[]; } + +export type RuntimePingStatus = "pending" | "running" | "completed" | "failed" | "timeout"; + +export interface RuntimePing { + id: string; + runtime_id: string; + status: RuntimePingStatus; + output?: string; + error?: string; + duration_ms?: number; + created_at: string; + updated_at: string; +} + 
+export interface RuntimeUsage { + runtime_id: string; + date: string; + provider: string; + model: string; + input_tokens: number; + output_tokens: number; + cache_read_tokens: number; + cache_write_tokens: number; +} diff --git a/packages/types/src/index.ts b/packages/types/src/index.ts index c2e69a3c..10729698 100644 --- a/packages/types/src/index.ts +++ b/packages/types/src/index.ts @@ -17,6 +17,9 @@ export type { CreateSkillRequest, UpdateSkillRequest, SetAgentSkillsRequest, + RuntimeUsage, + RuntimePing, + RuntimePingStatus, } from "./agent.js"; export type { Workspace, Member, MemberRole, User, MemberWithUser } from "./workspace.js"; export type { InboxItem, InboxSeverity, InboxItemType } from "./inbox.js"; diff --git a/server/cmd/server/router.go b/server/cmd/server/router.go index 66c52614..cfa13a49 100644 --- a/server/cmd/server/router.go +++ b/server/cmd/server/router.go @@ -90,6 +90,8 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route r.Post("/runtimes/{runtimeId}/tasks/claim", h.ClaimTaskByRuntime) r.Get("/runtimes/{runtimeId}/tasks/pending", h.ListPendingTasksByRuntime) + r.Post("/runtimes/{runtimeId}/usage", h.ReportRuntimeUsage) + r.Post("/runtimes/{runtimeId}/ping/{pingId}/result", h.ReportPingResult) r.Post("/tasks/{taskId}/start", h.StartTask) r.Post("/tasks/{taskId}/progress", h.ReportTaskProgress) @@ -154,6 +156,9 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route r.Route("/api/runtimes", func(r chi.Router) { r.Get("/", h.ListAgentRuntimes) + r.Get("/{runtimeId}/usage", h.GetRuntimeUsage) + r.Post("/{runtimeId}/ping", h.InitiatePing) + r.Get("/{runtimeId}/ping/{pingId}", h.GetPing) }) r.Post("/api/daemon/pairing-sessions/{token}/approve", h.ApproveDaemonPairingSession) diff --git a/server/internal/daemon/client.go b/server/internal/daemon/client.go index bd060987..9985f56d 100644 --- a/server/internal/daemon/client.go +++ b/server/internal/daemon/client.go @@ -111,12 +111,37 @@ 
func (c *Client) FailTask(ctx context.Context, taskID, errMsg string) error { }, nil) } -func (c *Client) SendHeartbeat(ctx context.Context, runtimeID string) error { - return c.postJSON(ctx, "/api/daemon/heartbeat", map[string]string{ - "runtime_id": runtimeID, +func (c *Client) ReportUsage(ctx context.Context, runtimeID string, entries []map[string]any) error { + return c.postJSON(ctx, fmt.Sprintf("/api/daemon/runtimes/%s/usage", runtimeID), map[string]any{ + "entries": entries, }, nil) } +// HeartbeatResponse contains the server's response to a heartbeat, including any pending actions. +type HeartbeatResponse struct { + Status string `json:"status"` + PendingPing *PendingPing `json:"pending_ping,omitempty"` +} + +// PendingPing represents a ping test request from the server. +type PendingPing struct { + ID string `json:"id"` +} + +func (c *Client) SendHeartbeat(ctx context.Context, runtimeID string) (*HeartbeatResponse, error) { + var resp HeartbeatResponse + if err := c.postJSON(ctx, "/api/daemon/heartbeat", map[string]string{ + "runtime_id": runtimeID, + }, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *Client) ReportPingResult(ctx context.Context, runtimeID, pingID string, result map[string]any) error { + return c.postJSON(ctx, fmt.Sprintf("/api/daemon/runtimes/%s/ping/%s/result", runtimeID, pingID), result, nil) +} + func (c *Client) Register(ctx context.Context, req map[string]any) ([]Runtime, error) { var resp struct { Runtimes []Runtime `json:"runtimes"` diff --git a/server/internal/daemon/daemon.go b/server/internal/daemon/daemon.go index 5b746d65..40ce2be5 100644 --- a/server/internal/daemon/daemon.go +++ b/server/internal/daemon/daemon.go @@ -8,6 +8,7 @@ import ( "time" "github.com/multica-ai/multica/server/internal/daemon/execenv" + "github.com/multica-ai/multica/server/internal/daemon/usage" "github.com/multica-ai/multica/server/pkg/agent" ) @@ -54,7 +55,8 @@ func (d *Daemon) Run(ctx context.Context) error { runtimeIDs 
= append(runtimeIDs, rt.ID) } - go d.heartbeatLoop(ctx, runtimeIDs) + go d.heartbeatLoop(ctx, runtimes) + go d.usageScanLoop(ctx, runtimes) return d.pollLoop(ctx, runtimeIDs) } @@ -164,7 +166,7 @@ func (d *Daemon) ensurePaired(ctx context.Context) (string, error) { } } -func (d *Daemon) heartbeatLoop(ctx context.Context, runtimeIDs []string) { +func (d *Daemon) heartbeatLoop(ctx context.Context, runtimes []Runtime) { ticker := time.NewTicker(d.cfg.HeartbeatInterval) defer ticker.Stop() @@ -173,15 +175,155 @@ func (d *Daemon) heartbeatLoop(ctx context.Context, runtimeIDs []string) { case <-ctx.Done(): return case <-ticker.C: - for _, rid := range runtimeIDs { - if err := d.client.SendHeartbeat(ctx, rid); err != nil { - d.logger.Warn("heartbeat failed", "runtime_id", rid, "error", err) + for _, rt := range runtimes { + resp, err := d.client.SendHeartbeat(ctx, rt.ID) + if err != nil { + d.logger.Warn("heartbeat failed", "runtime_id", rt.ID, "error", err) + continue + } + + // Handle pending ping requests. 
+ if resp.PendingPing != nil { + go d.handlePing(ctx, rt, resp.PendingPing.ID) } } } } } +func (d *Daemon) handlePing(ctx context.Context, rt Runtime, pingID string) { + d.logger.Info("ping requested", "runtime_id", rt.ID, "ping_id", pingID, "provider", rt.Provider) + + start := time.Now() + + entry, ok := d.cfg.Agents[rt.Provider] + if !ok { + d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{ + "status": "failed", + "error": fmt.Sprintf("no agent configured for provider %q", rt.Provider), + "duration_ms": time.Since(start).Milliseconds(), + }) + return + } + + backend, err := agent.New(rt.Provider, agent.Config{ + ExecutablePath: entry.Path, + Logger: d.logger, + }) + if err != nil { + d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{ + "status": "failed", + "error": err.Error(), + "duration_ms": time.Since(start).Milliseconds(), + }) + return + } + + pingCtx, cancel := context.WithTimeout(ctx, 60*time.Second) + defer cancel() + + session, err := backend.Execute(pingCtx, "Respond with exactly one word: pong", agent.ExecOptions{ + MaxTurns: 1, + Timeout: 60 * time.Second, + }) + if err != nil { + d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{ + "status": "failed", + "error": err.Error(), + "duration_ms": time.Since(start).Milliseconds(), + }) + return + } + + // Drain messages + go func() { + for range session.Messages { + } + }() + + result := <-session.Result + durationMs := time.Since(start).Milliseconds() + + if result.Status == "completed" { + d.logger.Info("ping completed", "runtime_id", rt.ID, "ping_id", pingID, "duration_ms", durationMs) + d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{ + "status": "completed", + "output": result.Output, + "duration_ms": durationMs, + }) + } else { + errMsg := result.Error + if errMsg == "" { + errMsg = fmt.Sprintf("agent returned status: %s", result.Status) + } + d.logger.Warn("ping failed", "runtime_id", rt.ID, "ping_id", pingID, "error", errMsg) + 
d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{ + "status": "failed", + "error": errMsg, + "duration_ms": durationMs, + }) + } +} + +func (d *Daemon) usageScanLoop(ctx context.Context, runtimes []Runtime) { + scanner := usage.NewScanner(d.logger) + + // Build provider -> runtime ID mapping. + providerToRuntime := make(map[string]string) + for _, rt := range runtimes { + providerToRuntime[rt.Provider] = rt.ID + } + + report := func() { + records := scanner.Scan() + if len(records) == 0 { + return + } + + // Group records by provider to send to the correct runtime. + byProvider := make(map[string][]map[string]any) + for _, r := range records { + byProvider[r.Provider] = append(byProvider[r.Provider], map[string]any{ + "date": r.Date, + "provider": r.Provider, + "model": r.Model, + "input_tokens": r.InputTokens, + "output_tokens": r.OutputTokens, + "cache_read_tokens": r.CacheReadTokens, + "cache_write_tokens": r.CacheWriteTokens, + }) + } + + for provider, entries := range byProvider { + runtimeID, ok := providerToRuntime[provider] + if !ok { + d.logger.Debug("no runtime for provider, skipping usage report", "provider", provider) + continue + } + if err := d.client.ReportUsage(ctx, runtimeID, entries); err != nil { + d.logger.Warn("usage report failed", "provider", provider, "runtime_id", runtimeID, "error", err) + } else { + d.logger.Info("usage reported", "provider", provider, "runtime_id", runtimeID, "entries", len(entries)) + } + } + } + + // Initial scan on startup. 
+ report() + + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + report() + } + } +} + func (d *Daemon) pollLoop(ctx context.Context, runtimeIDs []string) error { pollOffset := 0 pollCount := 0 diff --git a/server/internal/daemon/usage/claude.go b/server/internal/daemon/usage/claude.go new file mode 100644 index 00000000..fa082644 --- /dev/null +++ b/server/internal/daemon/usage/claude.go @@ -0,0 +1,173 @@ +package usage + +import ( + "bufio" + "encoding/json" + "os" + "path/filepath" + "strings" + "time" +) + +// scanClaude reads Claude Code JSONL session logs from ~/.config/claude/projects/**/*.jsonl +// and extracts token usage from "assistant" message lines. +func (s *Scanner) scanClaude() []Record { + roots := claudeLogRoots() + if len(roots) == 0 { + return nil + } + + var allRecords []Record + seen := make(map[string]bool) // dedup by "messageId:requestId" + + for _, root := range roots { + files, err := filepath.Glob(filepath.Join(root, "**", "*.jsonl")) + if err != nil { + s.logger.Debug("claude glob error", "root", root, "error", err) + continue + } + // Also glob one level deeper for subagent logs + deeper, _ := filepath.Glob(filepath.Join(root, "**", "**", "*.jsonl")) + files = append(files, deeper...) + + for _, f := range files { + records := s.parseClaudeFile(f, seen) + allRecords = append(allRecords, records...) + } + } + + return mergeRecords(allRecords) +} + +// claudeLogRoots returns the directories to scan for Claude JSONL logs. 
+func claudeLogRoots() []string { + var roots []string + + // Check CLAUDE_CONFIG_DIR env var + if configDir := os.Getenv("CLAUDE_CONFIG_DIR"); configDir != "" { + for _, dir := range strings.Split(configDir, ",") { + dir = strings.TrimSpace(dir) + if dir != "" { + roots = append(roots, filepath.Join(dir, "projects")) + } + } + } + + // Standard locations + home, err := os.UserHomeDir() + if err != nil { + return roots + } + + candidates := []string{ + filepath.Join(home, ".config", "claude", "projects"), + filepath.Join(home, ".claude", "projects"), + } + for _, dir := range candidates { + if info, err := os.Stat(dir); err == nil && info.IsDir() { + roots = append(roots, dir) + } + } + + return roots +} + +// claudeLine represents the subset of a Claude JSONL line we care about. +type claudeLine struct { + Type string `json:"type"` + Timestamp string `json:"timestamp"` + RequestID string `json:"requestId"` + Message *struct { + ID string `json:"id"` + Model string `json:"model"` + Usage *struct { + InputTokens int64 `json:"input_tokens"` + OutputTokens int64 `json:"output_tokens"` + CacheReadInputTokens int64 `json:"cache_read_input_tokens"` + CacheCreationInputTokens int64 `json:"cache_creation_input_tokens"` + } `json:"usage"` + } `json:"message"` +} + +func (s *Scanner) parseClaudeFile(path string, seen map[string]bool) []Record { + f, err := os.Open(path) + if err != nil { + return nil + } + defer f.Close() + + var records []Record + scanner := bufio.NewScanner(f) + scanner.Buffer(make([]byte, 0, 256*1024), 1024*1024) // up to 1MB lines + + for scanner.Scan() { + line := scanner.Bytes() + + // Fast pre-filter: skip lines that can't contain what we need + if !bytesContains(line, `"type":"assistant"`) && !bytesContains(line, `"type": "assistant"`) { + continue + } + if !bytesContains(line, `"usage"`) { + continue + } + + var entry claudeLine + if err := json.Unmarshal(line, &entry); err != nil { + continue + } + if entry.Type != "assistant" || entry.Message == 
nil || entry.Message.Usage == nil { + continue + } + + // Dedup: Claude streaming produces multiple lines with same message.id + requestId + // with cumulative token counts. Take only the first occurrence. + dedupKey := entry.Message.ID + ":" + entry.RequestID + if dedupKey != ":" && seen[dedupKey] { + continue + } + if dedupKey != ":" { + seen[dedupKey] = true + } + + // Parse timestamp to get date + ts, err := time.Parse(time.RFC3339Nano, entry.Timestamp) + if err != nil { + ts, err = time.Parse(time.RFC3339, entry.Timestamp) + if err != nil { + continue + } + } + + model := entry.Message.Model + if model == "" { + model = "unknown" + } + + records = append(records, Record{ + Date: ts.Local().Format("2006-01-02"), + Provider: "claude", + Model: normalizeClaudeModel(model), + InputTokens: entry.Message.Usage.InputTokens, + OutputTokens: entry.Message.Usage.OutputTokens, + CacheReadTokens: entry.Message.Usage.CacheReadInputTokens, + CacheWriteTokens: entry.Message.Usage.CacheCreationInputTokens, + }) + } + + return records +} + +// normalizeClaudeModel strips common prefixes/suffixes from model names. +func normalizeClaudeModel(model string) string { + // Strip "anthropic." prefix + model = strings.TrimPrefix(model, "anthropic.") + // Strip Vertex AI prefixes like "us.anthropic." + if idx := strings.LastIndex(model, "anthropic."); idx >= 0 { + model = model[idx+len("anthropic."):] + } + return model +} + +func bytesContains(data []byte, substr string) bool { + return strings.Contains(string(data), substr) +} diff --git a/server/internal/daemon/usage/codex.go b/server/internal/daemon/usage/codex.go new file mode 100644 index 00000000..bbe17b3c --- /dev/null +++ b/server/internal/daemon/usage/codex.go @@ -0,0 +1,171 @@ +package usage + +import ( + "bufio" + "encoding/json" + "os" + "path/filepath" + "strings" +) + +// scanCodex reads Codex CLI session logs from ~/.codex/sessions/YYYY/MM/DD/*.jsonl +// and extracts token usage from "token_count" event lines. 
+func (s *Scanner) scanCodex() []Record { + root := codexLogRoot() + if root == "" { + return nil + } + + // Glob for session files: ~/.codex/sessions/YYYY/MM/DD/rollout-*.jsonl + pattern := filepath.Join(root, "*", "*", "*", "*.jsonl") + files, err := filepath.Glob(pattern) + if err != nil { + s.logger.Debug("codex glob error", "error", err) + return nil + } + + var allRecords []Record + for _, f := range files { + record := s.parseCodexFile(f) + if record != nil { + allRecords = append(allRecords, *record) + } + } + + return mergeRecords(allRecords) +} + +// codexLogRoot returns the Codex sessions directory. +func codexLogRoot() string { + if codexHome := os.Getenv("CODEX_HOME"); codexHome != "" { + dir := filepath.Join(codexHome, "sessions") + if info, err := os.Stat(dir); err == nil && info.IsDir() { + return dir + } + } + + home, err := os.UserHomeDir() + if err != nil { + return "" + } + + dir := filepath.Join(home, ".codex", "sessions") + if info, err := os.Stat(dir); err == nil && info.IsDir() { + return dir + } + return "" +} + +// codexEvent represents a line in a Codex session JSONL file. +type codexEvent struct { + Type string `json:"type"` + Timestamp string `json:"timestamp"` + Payload *struct { + Type string `json:"type"` + Msg json.RawMessage `json:"msg"` + } `json:"payload"` +} + +// codexTokenCount represents the token_count info structure. +type codexTokenCount struct { + Info *struct { + TotalTokenUsage *struct { + InputTokens int64 `json:"input_tokens"` + OutputTokens int64 `json:"output_tokens"` + CachedInputTokens int64 `json:"cached_input_tokens"` + ReasoningOutputTokens int64 `json:"reasoning_output_tokens"` + TotalTokens int64 `json:"total_tokens"` + } `json:"total_token_usage"` + Model string `json:"model"` + } `json:"info"` +} + +// parseCodexFile extracts the final cumulative token_count from a Codex session file. +// Returns nil if no usage data found. 
+func (s *Scanner) parseCodexFile(path string) *Record { + f, err := os.Open(path) + if err != nil { + return nil + } + defer f.Close() + + // Extract date from directory path: .../sessions/YYYY/MM/DD/file.jsonl + date := extractDateFromPath(path) + if date == "" { + return nil + } + + var lastUsage *codexTokenCount + var lastModel string + + scanner := bufio.NewScanner(f) + scanner.Buffer(make([]byte, 0, 256*1024), 1024*1024) + + for scanner.Scan() { + line := scanner.Bytes() + + // Fast pre-filter + if !bytesContains(line, `"token_count"`) { + continue + } + + // Try direct event format: {"type": "event_msg", "payload": {"type": "token_count", ...}} + var evt codexEvent + if err := json.Unmarshal(line, &evt); err != nil { + continue + } + + // Check if payload contains token_count + if evt.Payload != nil && evt.Payload.Type == "token_count" { + var tc codexTokenCount + if err := json.Unmarshal(evt.Payload.Msg, &tc); err == nil && tc.Info != nil && tc.Info.TotalTokenUsage != nil { + lastUsage = &tc + if tc.Info.Model != "" { + lastModel = tc.Info.Model + } + continue + } + } + + // Also try flat format where msg is at top level + var tc codexTokenCount + if err := json.Unmarshal(line, &tc); err == nil && tc.Info != nil && tc.Info.TotalTokenUsage != nil { + lastUsage = &tc + if tc.Info.Model != "" { + lastModel = tc.Info.Model + } + } + } + + if lastUsage == nil || lastUsage.Info == nil || lastUsage.Info.TotalTokenUsage == nil { + return nil + } + + model := lastModel + if model == "" { + model = "unknown" + } + + usage := lastUsage.Info.TotalTokenUsage + return &Record{ + Date: date, + Provider: "codex", + Model: model, + InputTokens: usage.InputTokens, + OutputTokens: usage.OutputTokens + usage.ReasoningOutputTokens, + CacheReadTokens: usage.CachedInputTokens, + CacheWriteTokens: 0, // Codex doesn't have cache write tokens + } +} + +// extractDateFromPath extracts YYYY-MM-DD from a path like .../sessions/2026/03/26/file.jsonl +func extractDateFromPath(path 
string) string { + parts := strings.Split(filepath.ToSlash(path), "/") + // Look for sessions/YYYY/MM/DD pattern + for i := 0; i < len(parts)-3; i++ { + if parts[i] == "sessions" && len(parts[i+1]) == 4 && len(parts[i+2]) == 2 && len(parts[i+3]) == 2 { + return parts[i+1] + "-" + parts[i+2] + "-" + parts[i+3] + } + } + return "" +} diff --git a/server/internal/daemon/usage/scanner.go b/server/internal/daemon/usage/scanner.go new file mode 100644 index 00000000..d4f015a1 --- /dev/null +++ b/server/internal/daemon/usage/scanner.go @@ -0,0 +1,68 @@ +package usage + +import ( + "log/slog" +) + +// Record represents aggregated token usage for one (date, provider, model) tuple. +type Record struct { + Date string `json:"date"` // "2006-01-02" + Provider string `json:"provider"` // "claude" or "codex" + Model string `json:"model"` + InputTokens int64 `json:"input_tokens"` + OutputTokens int64 `json:"output_tokens"` + CacheReadTokens int64 `json:"cache_read_tokens"` + CacheWriteTokens int64 `json:"cache_write_tokens"` +} + +// Scanner scans local CLI log files for token usage data. +type Scanner struct { + logger *slog.Logger +} + +// NewScanner creates a new usage scanner. +func NewScanner(logger *slog.Logger) *Scanner { + return &Scanner{logger: logger} +} + +// Scan reads local JSONL log files for both Claude Code and Codex CLI, +// and returns aggregated usage records keyed by (date, provider, model). +func (s *Scanner) Scan() []Record { + var records []Record + + claudeRecords := s.scanClaude() + records = append(records, claudeRecords...) + + codexRecords := s.scanCodex() + records = append(records, codexRecords...) + + return records +} + +// aggregation key for merging records. 
+type aggKey struct { + Date string + Provider string + Model string +} + +func mergeRecords(records []Record) []Record { + m := make(map[aggKey]*Record) + for _, r := range records { + k := aggKey{Date: r.Date, Provider: r.Provider, Model: r.Model} + if existing, ok := m[k]; ok { + existing.InputTokens += r.InputTokens + existing.OutputTokens += r.OutputTokens + existing.CacheReadTokens += r.CacheReadTokens + existing.CacheWriteTokens += r.CacheWriteTokens + } else { + copy := r + m[k] = &copy + } + } + result := make([]Record, 0, len(m)) + for _, r := range m { + result = append(result, *r) + } + return result +} diff --git a/server/internal/handler/daemon.go b/server/internal/handler/daemon.go index 6ac274bd..34c64fe6 100644 --- a/server/internal/handler/daemon.go +++ b/server/internal/handler/daemon.go @@ -132,7 +132,15 @@ func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) { } slog.Debug("daemon heartbeat", "runtime_id", req.RuntimeID) - writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) + + resp := map[string]any{"status": "ok"} + + // Check for pending ping requests for this runtime. + if pending := h.PingStore.PopPending(req.RuntimeID); pending != nil { + resp["pending_ping"] = map[string]string{"id": pending.ID} + } + + writeJSON(w, http.StatusOK, resp) } // ClaimTaskByRuntime atomically claims the next queued task for a runtime. 
diff --git a/server/internal/handler/handler.go b/server/internal/handler/handler.go index 3b555c33..db482fb6 100644 --- a/server/internal/handler/handler.go +++ b/server/internal/handler/handler.go @@ -34,6 +34,7 @@ type Handler struct { Bus *events.Bus TaskService *service.TaskService EmailService *service.EmailService + PingStore *PingStore } func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *events.Bus, emailService *service.EmailService) *Handler { @@ -50,6 +51,7 @@ func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *event Bus: bus, TaskService: service.NewTaskService(queries, hub, bus), EmailService: emailService, + PingStore: NewPingStore(), } } diff --git a/server/internal/handler/runtime.go b/server/internal/handler/runtime.go index 48d4dcef..eeb61b5a 100644 --- a/server/internal/handler/runtime.go +++ b/server/internal/handler/runtime.go @@ -3,7 +3,11 @@ package handler import ( "encoding/json" "net/http" + "strconv" + "time" + "github.com/go-chi/chi/v5" + "github.com/jackc/pgx/v5/pgtype" db "github.com/multica-ai/multica/server/pkg/db/generated" ) @@ -47,6 +51,114 @@ func runtimeToResponse(rt db.AgentRuntime) AgentRuntimeResponse { } } +// --------------------------------------------------------------------------- +// Runtime Usage +// --------------------------------------------------------------------------- + +type RuntimeUsageEntry struct { + Date string `json:"date"` + Provider string `json:"provider"` + Model string `json:"model"` + InputTokens int64 `json:"input_tokens"` + OutputTokens int64 `json:"output_tokens"` + CacheReadTokens int64 `json:"cache_read_tokens"` + CacheWriteTokens int64 `json:"cache_write_tokens"` +} + +type RuntimeUsageResponse struct { + RuntimeID string `json:"runtime_id"` + Date string `json:"date"` + Provider string `json:"provider"` + Model string `json:"model"` + InputTokens int64 `json:"input_tokens"` + OutputTokens int64 `json:"output_tokens"` + CacheReadTokens int64 
`json:"cache_read_tokens"` + CacheWriteTokens int64 `json:"cache_write_tokens"` +} + +// ReportRuntimeUsage receives usage data from the daemon (unauthenticated daemon route). +func (h *Handler) ReportRuntimeUsage(w http.ResponseWriter, r *http.Request) { + runtimeID := chi.URLParam(r, "runtimeId") + if runtimeID == "" { + writeError(w, http.StatusBadRequest, "runtimeId is required") + return + } + + var req struct { + Entries []RuntimeUsageEntry `json:"entries"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid request body") + return + } + + for _, entry := range req.Entries { + date, err := time.Parse("2006-01-02", entry.Date) + if err != nil { + continue + } + h.Queries.UpsertRuntimeUsage(r.Context(), db.UpsertRuntimeUsageParams{ + RuntimeID: parseUUID(runtimeID), + Date: pgtype.Date{Time: date, Valid: true}, + Provider: entry.Provider, + Model: entry.Model, + InputTokens: entry.InputTokens, + OutputTokens: entry.OutputTokens, + CacheReadTokens: entry.CacheReadTokens, + CacheWriteTokens: entry.CacheWriteTokens, + }) + } + + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) +} + +// GetRuntimeUsage returns usage data for a runtime (protected route). 
+func (h *Handler) GetRuntimeUsage(w http.ResponseWriter, r *http.Request) { + runtimeID := chi.URLParam(r, "runtimeId") + + rt, err := h.Queries.GetAgentRuntime(r.Context(), parseUUID(runtimeID)) + if err != nil { + writeError(w, http.StatusNotFound, "runtime not found") + return + } + + if _, ok := h.requireWorkspaceMember(w, r, uuidToString(rt.WorkspaceID), "runtime not found"); !ok { + return + } + + limit := int32(90) + if l := r.URL.Query().Get("days"); l != "" { + if parsed, err := strconv.Atoi(l); err == nil && parsed > 0 && parsed <= 365 { + limit = int32(parsed) + } + } + + rows, err := h.Queries.ListRuntimeUsage(r.Context(), db.ListRuntimeUsageParams{ + RuntimeID: parseUUID(runtimeID), + Limit: limit, + }) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to list usage") + return + } + + resp := make([]RuntimeUsageResponse, len(rows)) + for i, row := range rows { + resp[i] = RuntimeUsageResponse{ + RuntimeID: runtimeID, + Date: row.Date.Time.Format("2006-01-02"), + Provider: row.Provider, + Model: row.Model, + InputTokens: row.InputTokens, + OutputTokens: row.OutputTokens, + CacheReadTokens: row.CacheReadTokens, + CacheWriteTokens: row.CacheWriteTokens, + } + } + + writeJSON(w, http.StatusOK, resp) +} + func (h *Handler) ListAgentRuntimes(w http.ResponseWriter, r *http.Request) { workspaceID := resolveWorkspaceID(r) if _, ok := h.requireWorkspaceMember(w, r, workspaceID, "workspace not found"); !ok { diff --git a/server/internal/handler/runtime_ping.go b/server/internal/handler/runtime_ping.go new file mode 100644 index 00000000..c23c4683 --- /dev/null +++ b/server/internal/handler/runtime_ping.go @@ -0,0 +1,200 @@ +package handler + +import ( + "crypto/rand" + "encoding/hex" + "encoding/json" + "net/http" + "sync" + "time" + + "github.com/go-chi/chi/v5" +) + +// --------------------------------------------------------------------------- +// In-memory ping store +// 
--------------------------------------------------------------------------- + +// PingStatus represents the lifecycle of a runtime ping test. +type PingStatus string + +const ( + PingPending PingStatus = "pending" + PingRunning PingStatus = "running" + PingCompleted PingStatus = "completed" + PingFailed PingStatus = "failed" + PingTimeout PingStatus = "timeout" +) + +// PingRequest represents a pending or completed ping test. +type PingRequest struct { + ID string `json:"id"` + RuntimeID string `json:"runtime_id"` + Status PingStatus `json:"status"` + Output string `json:"output,omitempty"` + Error string `json:"error,omitempty"` + DurationMs int64 `json:"duration_ms,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +// PingStore is a thread-safe in-memory store for ping requests. +// Pings expire after 2 minutes. +type PingStore struct { + mu sync.Mutex + pings map[string]*PingRequest // keyed by ping ID +} + +func NewPingStore() *PingStore { + return &PingStore{ + pings: make(map[string]*PingRequest), + } +} + +func (s *PingStore) Create(runtimeID string) *PingRequest { + s.mu.Lock() + defer s.mu.Unlock() + + // Clean up old pings for this runtime + for id, p := range s.pings { + if time.Since(p.CreatedAt) > 2*time.Minute { + delete(s.pings, id) + } + } + + ping := &PingRequest{ + ID: randomID(), + RuntimeID: runtimeID, + Status: PingPending, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + s.pings[ping.ID] = ping + return ping +} + +func (s *PingStore) Get(id string) *PingRequest { + s.mu.Lock() + defer s.mu.Unlock() + + p, ok := s.pings[id] + if !ok { + return nil + } + // Check for timeout + if p.Status == PingPending && time.Since(p.CreatedAt) > 60*time.Second { + p.Status = PingTimeout + p.Error = "daemon did not respond within 60 seconds" + p.UpdatedAt = time.Now() + } + return p +} + +// PopPending returns and removes the oldest pending ping for a runtime. 
+func (s *PingStore) PopPending(runtimeID string) *PingRequest { + s.mu.Lock() + defer s.mu.Unlock() + + var oldest *PingRequest + for _, p := range s.pings { + if p.RuntimeID == runtimeID && p.Status == PingPending { + if oldest == nil || p.CreatedAt.Before(oldest.CreatedAt) { + oldest = p + } + } + } + if oldest != nil { + oldest.Status = PingRunning + oldest.UpdatedAt = time.Now() + } + return oldest +} + +func (s *PingStore) Complete(id string, output string, durationMs int64) { + s.mu.Lock() + defer s.mu.Unlock() + + if p, ok := s.pings[id]; ok { + p.Status = PingCompleted + p.Output = output + p.DurationMs = durationMs + p.UpdatedAt = time.Now() + } +} + +func (s *PingStore) Fail(id string, errMsg string, durationMs int64) { + s.mu.Lock() + defer s.mu.Unlock() + + if p, ok := s.pings[id]; ok { + p.Status = PingFailed + p.Error = errMsg + p.DurationMs = durationMs + p.UpdatedAt = time.Now() + } +} + +func randomID() string { + b := make([]byte, 16) + rand.Read(b) + return hex.EncodeToString(b) +} + +// --------------------------------------------------------------------------- +// Handlers +// --------------------------------------------------------------------------- + +// InitiatePing creates a new ping request (protected route, called by frontend). +func (h *Handler) InitiatePing(w http.ResponseWriter, r *http.Request) { + runtimeID := chi.URLParam(r, "runtimeId") + + rt, err := h.Queries.GetAgentRuntime(r.Context(), parseUUID(runtimeID)) + if err != nil { + writeError(w, http.StatusNotFound, "runtime not found") + return + } + + if _, ok := h.requireWorkspaceMember(w, r, uuidToString(rt.WorkspaceID), "runtime not found"); !ok { + return + } + + ping := h.PingStore.Create(runtimeID) + writeJSON(w, http.StatusOK, ping) +} + +// GetPing returns the status of a ping request (protected route, called by frontend). 
+func (h *Handler) GetPing(w http.ResponseWriter, r *http.Request) { + pingID := chi.URLParam(r, "pingId") + + ping := h.PingStore.Get(pingID) + if ping == nil { + writeError(w, http.StatusNotFound, "ping not found") + return + } + + writeJSON(w, http.StatusOK, ping) +} + +// ReportPingResult receives the ping result from the daemon. +func (h *Handler) ReportPingResult(w http.ResponseWriter, r *http.Request) { + pingID := chi.URLParam(r, "pingId") + + var req struct { + Status string `json:"status"` // "completed" or "failed" + Output string `json:"output"` + Error string `json:"error"` + DurationMs int64 `json:"duration_ms"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid request body") + return + } + + if req.Status == "completed" { + h.PingStore.Complete(pingID, req.Output, req.DurationMs) + } else { + h.PingStore.Fail(pingID, req.Error, req.DurationMs) + } + + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) +} diff --git a/server/migrations/013_runtime_usage.down.sql b/server/migrations/013_runtime_usage.down.sql new file mode 100644 index 00000000..de515246 --- /dev/null +++ b/server/migrations/013_runtime_usage.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS runtime_usage; diff --git a/server/migrations/013_runtime_usage.up.sql b/server/migrations/013_runtime_usage.up.sql new file mode 100644 index 00000000..3e7be6d1 --- /dev/null +++ b/server/migrations/013_runtime_usage.up.sql @@ -0,0 +1,16 @@ +CREATE TABLE runtime_usage ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + runtime_id UUID NOT NULL REFERENCES agent_runtime(id) ON DELETE CASCADE, + date DATE NOT NULL, + provider TEXT NOT NULL, + model TEXT NOT NULL DEFAULT '', + input_tokens BIGINT NOT NULL DEFAULT 0, + output_tokens BIGINT NOT NULL DEFAULT 0, + cache_read_tokens BIGINT NOT NULL DEFAULT 0, + cache_write_tokens BIGINT NOT NULL DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT 
NULL DEFAULT now(), + UNIQUE (runtime_id, date, provider, model) +); + +CREATE INDEX idx_runtime_usage_runtime_date ON runtime_usage(runtime_id, date DESC); diff --git a/server/pkg/db/generated/models.go b/server/pkg/db/generated/models.go index a7f99877..dd71c7ea 100644 --- a/server/pkg/db/generated/models.go +++ b/server/pkg/db/generated/models.go @@ -179,6 +179,20 @@ type Member struct { CreatedAt pgtype.Timestamptz `json:"created_at"` } +type RuntimeUsage struct { + ID pgtype.UUID `json:"id"` + RuntimeID pgtype.UUID `json:"runtime_id"` + Date pgtype.Date `json:"date"` + Provider string `json:"provider"` + Model string `json:"model"` + InputTokens int64 `json:"input_tokens"` + OutputTokens int64 `json:"output_tokens"` + CacheReadTokens int64 `json:"cache_read_tokens"` + CacheWriteTokens int64 `json:"cache_write_tokens"` + CreatedAt pgtype.Timestamptz `json:"created_at"` + UpdatedAt pgtype.Timestamptz `json:"updated_at"` +} + type PersonalAccessToken struct { ID pgtype.UUID `json:"id"` UserID pgtype.UUID `json:"user_id"`
diff --git a/server/pkg/db/generated/runtime_usage.sql.go b/server/pkg/db/generated/runtime_usage.sql.go
new file mode 100644
index 00000000..99ffeb21
--- /dev/null
+++ b/server/pkg/db/generated/runtime_usage.sql.go
@@ -0,0 +1,149 @@
+// Code generated by sqlc. DO NOT EDIT.
+// versions:
+//   sqlc v1.30.0
+// source: runtime_usage.sql
+
+package db
+
+import (
+	"context"
+
+	"github.com/jackc/pgx/v5/pgtype"
+)
+
+const upsertRuntimeUsage = `-- name: UpsertRuntimeUsage :exec
+INSERT INTO runtime_usage (runtime_id, date, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens)
+VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+ON CONFLICT (runtime_id, date, provider, model)
+DO UPDATE SET
+    input_tokens = EXCLUDED.input_tokens,
+    output_tokens = EXCLUDED.output_tokens,
+    cache_read_tokens = EXCLUDED.cache_read_tokens,
+    cache_write_tokens = EXCLUDED.cache_write_tokens,
+    updated_at = now()
+`
+
+type UpsertRuntimeUsageParams struct {
+	RuntimeID        pgtype.UUID `json:"runtime_id"`
+	Date             pgtype.Date `json:"date"`
+	Provider         string      `json:"provider"`
+	Model            string      `json:"model"`
+	InputTokens      int64       `json:"input_tokens"`
+	OutputTokens     int64       `json:"output_tokens"`
+	CacheReadTokens  int64       `json:"cache_read_tokens"`
+	CacheWriteTokens int64       `json:"cache_write_tokens"`
+}
+
+// UpsertRuntimeUsage stores the token counters for one
+// (runtime_id, date, provider, model) row. On conflict the existing counters
+// are replaced by the supplied values, not added to them.
+func (q *Queries) UpsertRuntimeUsage(ctx context.Context, arg UpsertRuntimeUsageParams) error {
+	_, err := q.db.Exec(ctx, upsertRuntimeUsage,
+		arg.RuntimeID,
+		arg.Date,
+		arg.Provider,
+		arg.Model,
+		arg.InputTokens,
+		arg.OutputTokens,
+		arg.CacheReadTokens,
+		arg.CacheWriteTokens,
+	)
+	return err
+}
+
+const listRuntimeUsage = `-- name: ListRuntimeUsage :many
+SELECT id, runtime_id, date, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, created_at, updated_at FROM runtime_usage
+WHERE runtime_id = $1
+ORDER BY date DESC, provider, model
+LIMIT $2
+`
+
+type ListRuntimeUsageParams struct {
+	RuntimeID pgtype.UUID `json:"runtime_id"`
+	Limit     int32       `json:"limit"`
+}
+
+// ListRuntimeUsage returns at most the requested number of usage rows for a
+// runtime, newest date first. provider and model break ties within a date so
+// the rows kept by LIMIT are deterministic.
+func (q *Queries) ListRuntimeUsage(ctx context.Context, arg ListRuntimeUsageParams) ([]RuntimeUsage, error) {
+	rows, err := q.db.Query(ctx, listRuntimeUsage, arg.RuntimeID, arg.Limit)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	items := []RuntimeUsage{}
+	for rows.Next() {
+		var i RuntimeUsage
+		if err := rows.Scan(
+			&i.ID,
+			&i.RuntimeID,
+			&i.Date,
+			&i.Provider,
+			&i.Model,
+			&i.InputTokens,
+			&i.OutputTokens,
+			&i.CacheReadTokens,
+			&i.CacheWriteTokens,
+			&i.CreatedAt,
+			&i.UpdatedAt,
+		); err != nil {
+			return nil, err
+		}
+		items = append(items, i)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+	return items, nil
+}
+
+const getRuntimeUsageSummary = `-- name: GetRuntimeUsageSummary :many
+SELECT provider, model,
+    SUM(input_tokens)::bigint AS total_input_tokens,
+    SUM(output_tokens)::bigint AS total_output_tokens,
+    SUM(cache_read_tokens)::bigint AS total_cache_read_tokens,
+    SUM(cache_write_tokens)::bigint AS total_cache_write_tokens
+FROM runtime_usage
+WHERE runtime_id = $1
+GROUP BY provider, model
+ORDER BY provider, model
+`
+
+type GetRuntimeUsageSummaryRow struct {
+	Provider              string `json:"provider"`
+	Model                 string `json:"model"`
+	TotalInputTokens      int64  `json:"total_input_tokens"`
+	TotalOutputTokens     int64  `json:"total_output_tokens"`
+	TotalCacheReadTokens  int64  `json:"total_cache_read_tokens"`
+	TotalCacheWriteTokens int64  `json:"total_cache_write_tokens"`
+}
+
+// GetRuntimeUsageSummary returns token totals per (provider, model) across
+// all usage rows stored for the runtime.
+func (q *Queries) GetRuntimeUsageSummary(ctx context.Context, runtimeID pgtype.UUID) ([]GetRuntimeUsageSummaryRow, error) {
+	rows, err := q.db.Query(ctx, getRuntimeUsageSummary, runtimeID)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	items := []GetRuntimeUsageSummaryRow{}
+	for rows.Next() {
+		var i GetRuntimeUsageSummaryRow
+		if err := rows.Scan(
+			&i.Provider,
+			&i.Model,
+			&i.TotalInputTokens,
+			&i.TotalOutputTokens,
+			&i.TotalCacheReadTokens,
+			&i.TotalCacheWriteTokens,
+		); err != nil {
+			return nil, err
+		}
+		items = append(items, i)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+	return items, nil
+}
diff --git a/server/pkg/db/queries/runtime_usage.sql b/server/pkg/db/queries/runtime_usage.sql
new file mode 100644
index 00000000..c2be6a78
--- /dev/null
+++ b/server/pkg/db/queries/runtime_usage.sql
@@ -0,0 +1,35 @@
+-- name: UpsertRuntimeUsage :exec
+-- UpsertRuntimeUsage stores the token counters for one
+-- (runtime_id, date, provider, model) row. On conflict the existing counters
+-- are replaced by the supplied values, not added to them.
+INSERT INTO runtime_usage (runtime_id, date, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens)
+VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+ON CONFLICT (runtime_id, date, provider, model)
+DO UPDATE SET
+    input_tokens = EXCLUDED.input_tokens,
+    output_tokens = EXCLUDED.output_tokens,
+    cache_read_tokens = EXCLUDED.cache_read_tokens,
+    cache_write_tokens = EXCLUDED.cache_write_tokens,
+    updated_at = now();
+
+-- name: ListRuntimeUsage :many
+-- ListRuntimeUsage returns at most the requested number of usage rows for a
+-- runtime, newest date first. provider and model break ties within a date so
+-- the rows kept by LIMIT are deterministic.
+SELECT * FROM runtime_usage
+WHERE runtime_id = $1
+ORDER BY date DESC, provider, model
+LIMIT $2;
+
+-- name: GetRuntimeUsageSummary :many
+-- GetRuntimeUsageSummary returns token totals per (provider, model) across
+-- all usage rows stored for the runtime.
+SELECT provider, model,
+    SUM(input_tokens)::bigint AS total_input_tokens,
+    SUM(output_tokens)::bigint AS total_output_tokens,
+    SUM(cache_read_tokens)::bigint AS total_cache_read_tokens,
+    SUM(cache_write_tokens)::bigint AS total_cache_write_tokens
+FROM runtime_usage
+WHERE runtime_id = $1
+GROUP BY provider, model
+ORDER BY provider, model;