feat(runtimes): add Runtimes tab with usage tracking and connection test

Add a new "Runtimes" sidebar tab to manage local agent runtimes with three
main capabilities: runtime status overview, token usage tracking (reading
Claude Code and Codex CLI local JSONL logs via daemon), and an interactive
connection test that sends a ping through the daemon to verify end-to-end
agent CLI connectivity.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jiayuan 2026-03-26 18:28:36 +08:00
parent 6fd0e2b319
commit 903fbee55d
24 changed files with 1773 additions and 9 deletions

View file

@ -95,6 +95,10 @@ check-main:
@ENV_FILE=$(MAIN_ENV_FILE) bash scripts/check.sh
setup-worktree:
@if [ ! -f "$(WORKTREE_ENV_FILE)" ]; then \
echo "==> No $(WORKTREE_ENV_FILE) found, generating..."; \
bash scripts/init-worktree-env.sh $(WORKTREE_ENV_FILE); \
fi
@$(MAKE) setup ENV_FILE=$(WORKTREE_ENV_FILE)
start-worktree:

View file

@ -6,6 +6,7 @@ import {
Inbox,
ListTodo,
Bot,
Monitor,
BookOpen,
ChevronDown,
Settings,
@ -49,6 +50,7 @@ const primaryNav = [
const workspaceNav = [
{ href: "/agents", label: "Agents", icon: Bot },
{ href: "/runtimes", label: "Runtimes", icon: Monitor },
{ href: "/skills", label: "Skills", icon: Sparkles },
{ href: "/knowledge-base", label: "Knowledge Base", icon: BookOpen },
];

View file

@ -0,0 +1 @@
export { RuntimesPage as default } from "@/features/runtimes";

View file

@ -0,0 +1 @@
export { default as RuntimesPage } from "./runtimes-page";

View file

@ -0,0 +1,607 @@
"use client";
import { useState, useEffect, useCallback, useRef } from "react";
import {
Monitor,
Cloud,
Wifi,
WifiOff,
Server,
BarChart3,
Loader2,
CheckCircle2,
XCircle,
Zap,
} from "lucide-react";
import type { AgentRuntime, RuntimeUsage, RuntimePingStatus } from "@multica/types";
import { Button } from "@/components/ui/button";
import { api } from "@/shared/api";
import { useAuthStore } from "@/features/auth";
import { useWorkspaceStore } from "@/features/workspace";
import { useWSEvent } from "@/features/realtime";
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function formatLastSeen(lastSeenAt: string | null): string {
if (!lastSeenAt) return "Never";
const diff = Date.now() - new Date(lastSeenAt).getTime();
if (diff < 60_000) return "Just now";
if (diff < 3_600_000) return `${Math.floor(diff / 60_000)}m ago`;
if (diff < 86_400_000) return `${Math.floor(diff / 3_600_000)}h ago`;
return `${Math.floor(diff / 86_400_000)}d ago`;
}
function formatTokens(n: number): string {
if (n >= 1_000_000) return `${(n / 1_000_000).toFixed(1)}M`;
if (n >= 1_000) return `${(n / 1_000).toFixed(1)}K`;
return n.toLocaleString();
}
// Pricing per million tokens (USD)
const MODEL_PRICING: Record<string, { input: number; output: number; cacheRead: number; cacheWrite: number }> = {
"claude-haiku-4-5": { input: 1, output: 5, cacheRead: 0.1, cacheWrite: 1.25 },
"claude-sonnet-4-5": { input: 3, output: 15, cacheRead: 0.3, cacheWrite: 3.75 },
"claude-sonnet-4-6": { input: 3, output: 15, cacheRead: 0.3, cacheWrite: 3.75 },
"claude-opus-4-5": { input: 5, output: 25, cacheRead: 0.5, cacheWrite: 6.25 },
"claude-opus-4-6": { input: 5, output: 25, cacheRead: 0.5, cacheWrite: 6.25 },
};
function estimateCost(usage: RuntimeUsage): number {
// Try to find a matching model in pricing table
const model = usage.model;
let pricing = MODEL_PRICING[model];
if (!pricing) {
// Try partial match
for (const [key, p] of Object.entries(MODEL_PRICING)) {
if (model.startsWith(key)) {
pricing = p;
break;
}
}
}
if (!pricing) return 0;
return (
(usage.input_tokens * pricing.input +
usage.output_tokens * pricing.output +
usage.cache_read_tokens * pricing.cacheRead +
usage.cache_write_tokens * pricing.cacheWrite) /
1_000_000
);
}
function RuntimeModeIcon({ mode }: { mode: string }) {
return mode === "cloud" ? (
<Cloud className="h-3.5 w-3.5" />
) : (
<Monitor className="h-3.5 w-3.5" />
);
}
function StatusBadge({ status }: { status: string }) {
const isOnline = status === "online";
return (
<span
className={`inline-flex items-center gap-1.5 rounded-full px-2 py-0.5 text-xs font-medium ${
isOnline
? "bg-success/10 text-success"
: "bg-muted text-muted-foreground"
}`}
>
{isOnline ? (
<Wifi className="h-3 w-3" />
) : (
<WifiOff className="h-3 w-3" />
)}
{isOnline ? "Online" : "Offline"}
</span>
);
}
// ---------------------------------------------------------------------------
// Runtime List Item
// ---------------------------------------------------------------------------
function RuntimeListItem({
runtime,
isSelected,
onClick,
}: {
runtime: AgentRuntime;
isSelected: boolean;
onClick: () => void;
}) {
return (
<button
onClick={onClick}
className={`flex w-full items-center gap-3 px-4 py-3 text-left transition-colors ${
isSelected ? "bg-accent" : "hover:bg-accent/50"
}`}
>
<div
className={`flex h-8 w-8 shrink-0 items-center justify-center rounded-lg ${
runtime.status === "online" ? "bg-success/10" : "bg-muted"
}`}
>
<RuntimeModeIcon mode={runtime.runtime_mode} />
</div>
<div className="min-w-0 flex-1">
<div className="truncate text-sm font-medium">{runtime.name}</div>
<div className="mt-0.5 truncate text-xs text-muted-foreground">
{runtime.provider} &middot; {runtime.runtime_mode}
</div>
</div>
<div
className={`h-2 w-2 shrink-0 rounded-full ${
runtime.status === "online" ? "bg-success" : "bg-muted-foreground/40"
}`}
/>
</button>
);
}
// ---------------------------------------------------------------------------
// Usage Section
// ---------------------------------------------------------------------------
function UsageSection({ runtimeId }: { runtimeId: string }) {
const [usage, setUsage] = useState<RuntimeUsage[]>([]);
const [loading, setLoading] = useState(true);
useEffect(() => {
setLoading(true);
api
.getRuntimeUsage(runtimeId, { days: 30 })
.then(setUsage)
.catch(() => setUsage([]))
.finally(() => setLoading(false));
}, [runtimeId]);
if (loading) {
return (
<div className="text-xs text-muted-foreground">Loading usage...</div>
);
}
if (usage.length === 0) {
return (
<div className="flex flex-col items-center rounded-lg border border-dashed py-6">
<BarChart3 className="h-5 w-5 text-muted-foreground/40" />
<p className="mt-2 text-xs text-muted-foreground">
No usage data yet
</p>
</div>
);
}
// Compute totals
const totals = usage.reduce(
(acc, u) => ({
input: acc.input + u.input_tokens,
output: acc.output + u.output_tokens,
cacheRead: acc.cacheRead + u.cache_read_tokens,
cacheWrite: acc.cacheWrite + u.cache_write_tokens,
cost: acc.cost + estimateCost(u),
}),
{ input: 0, output: 0, cacheRead: 0, cacheWrite: 0, cost: 0 },
);
// Group by date for the table
const byDate = new Map<string, RuntimeUsage[]>();
for (const u of usage) {
const existing = byDate.get(u.date) ?? [];
existing.push(u);
byDate.set(u.date, existing);
}
return (
<div className="space-y-4">
{/* Summary cards */}
<div className="grid grid-cols-4 gap-3">
<TokenCard label="Input" value={totals.input} />
<TokenCard label="Output" value={totals.output} />
<TokenCard label="Cache Read" value={totals.cacheRead} />
<TokenCard label="Cache Write" value={totals.cacheWrite} />
</div>
{totals.cost > 0 && (
<div className="rounded-lg border bg-muted/30 px-3 py-2">
<span className="text-xs text-muted-foreground">
Estimated cost (30d):{" "}
</span>
<span className="text-sm font-semibold">
${totals.cost.toFixed(2)}
</span>
</div>
)}
{/* Daily breakdown table */}
<div className="rounded-lg border">
<div className="grid grid-cols-[100px_1fr_80px_80px_80px_80px] gap-2 border-b px-3 py-2 text-xs font-medium text-muted-foreground">
<div>Date</div>
<div>Model</div>
<div className="text-right">Input</div>
<div className="text-right">Output</div>
<div className="text-right">Cache R</div>
<div className="text-right">Cache W</div>
</div>
<div className="max-h-64 overflow-y-auto divide-y">
{[...byDate.entries()].map(([date, rows]) =>
rows.map((row, i) => (
<div
key={`${date}-${row.model}-${i}`}
className="grid grid-cols-[100px_1fr_80px_80px_80px_80px] gap-2 px-3 py-1.5 text-xs"
>
<div className="text-muted-foreground">{date}</div>
<div className="truncate font-mono">{row.model}</div>
<div className="text-right tabular-nums">
{formatTokens(row.input_tokens)}
</div>
<div className="text-right tabular-nums">
{formatTokens(row.output_tokens)}
</div>
<div className="text-right tabular-nums">
{formatTokens(row.cache_read_tokens)}
</div>
<div className="text-right tabular-nums">
{formatTokens(row.cache_write_tokens)}
</div>
</div>
)),
)}
</div>
</div>
</div>
);
}
function TokenCard({ label, value }: { label: string; value: number }) {
return (
<div className="rounded-lg border px-3 py-2">
<div className="text-xs text-muted-foreground">{label}</div>
<div className="mt-0.5 text-sm font-semibold tabular-nums">
{formatTokens(value)}
</div>
</div>
);
}
// ---------------------------------------------------------------------------
// Connection Test (Ping)
// ---------------------------------------------------------------------------
const pingStatusConfig: Record<
RuntimePingStatus,
{ label: string; icon: typeof Loader2; color: string }
> = {
pending: { label: "Waiting for daemon...", icon: Loader2, color: "text-muted-foreground" },
running: { label: "Running test...", icon: Loader2, color: "text-info" },
completed: { label: "Connected", icon: CheckCircle2, color: "text-success" },
failed: { label: "Failed", icon: XCircle, color: "text-destructive" },
timeout: { label: "Timeout", icon: XCircle, color: "text-warning" },
};
function PingSection({ runtimeId }: { runtimeId: string }) {
const [status, setStatus] = useState<RuntimePingStatus | null>(null);
const [output, setOutput] = useState("");
const [error, setError] = useState("");
const [durationMs, setDurationMs] = useState<number | null>(null);
const [testing, setTesting] = useState(false);
const pollRef = useRef<ReturnType<typeof setInterval> | null>(null);
const cleanup = useCallback(() => {
if (pollRef.current) {
clearInterval(pollRef.current);
pollRef.current = null;
}
}, []);
useEffect(() => cleanup, [cleanup]);
const handleTest = async () => {
cleanup();
setTesting(true);
setStatus("pending");
setOutput("");
setError("");
setDurationMs(null);
try {
const ping = await api.pingRuntime(runtimeId);
// Poll for result every 2 seconds
pollRef.current = setInterval(async () => {
try {
const result = await api.getPingResult(runtimeId, ping.id);
setStatus(result.status as RuntimePingStatus);
if (result.status === "completed") {
setOutput(result.output ?? "");
setDurationMs(result.duration_ms ?? null);
setTesting(false);
cleanup();
} else if (result.status === "failed" || result.status === "timeout") {
setError(result.error ?? "Unknown error");
setDurationMs(result.duration_ms ?? null);
setTesting(false);
cleanup();
}
} catch {
// ignore poll errors
}
}, 2000);
} catch {
setStatus("failed");
setError("Failed to initiate test");
setTesting(false);
}
};
const config = status ? pingStatusConfig[status] : null;
const Icon = config?.icon;
const isActive = status === "pending" || status === "running";
return (
<div className="space-y-2">
<div className="flex items-center gap-2">
<Button
variant="outline"
size="xs"
onClick={handleTest}
disabled={testing}
>
{testing ? (
<Loader2 className="h-3 w-3 animate-spin" />
) : (
<Zap className="h-3 w-3" />
)}
{testing ? "Testing..." : "Test Connection"}
</Button>
{config && Icon && (
<span className={`inline-flex items-center gap-1 text-xs ${config.color}`}>
<Icon className={`h-3 w-3 ${isActive ? "animate-spin" : ""}`} />
{config.label}
{durationMs != null && (
<span className="text-muted-foreground">
({(durationMs / 1000).toFixed(1)}s)
</span>
)}
</span>
)}
</div>
{status === "completed" && output && (
<div className="rounded-lg border bg-success/5 px-3 py-2">
<pre className="text-xs font-mono whitespace-pre-wrap">{output}</pre>
</div>
)}
{(status === "failed" || status === "timeout") && error && (
<div className="rounded-lg border border-destructive/20 bg-destructive/5 px-3 py-2">
<p className="text-xs text-destructive">{error}</p>
</div>
)}
</div>
);
}
// ---------------------------------------------------------------------------
// Runtime Detail
// ---------------------------------------------------------------------------
function RuntimeDetail({ runtime }: { runtime: AgentRuntime }) {
return (
<div className="flex flex-1 min-h-0 flex-col">
{/* Header */}
<div className="flex items-center justify-between border-b px-6 py-4">
<div className="flex items-center gap-3">
<div
className={`flex h-9 w-9 shrink-0 items-center justify-center rounded-lg ${
runtime.status === "online" ? "bg-success/10" : "bg-muted"
}`}
>
<RuntimeModeIcon mode={runtime.runtime_mode} />
</div>
<div className="min-w-0">
<h2 className="text-sm font-semibold truncate">{runtime.name}</h2>
<p className="text-xs text-muted-foreground truncate">
{runtime.provider} &middot;{" "}
{runtime.device_info || "Unknown device"}
</p>
</div>
</div>
<StatusBadge status={runtime.status} />
</div>
{/* Content */}
<div className="flex-1 overflow-y-auto p-6 space-y-6">
{/* Info grid */}
<div className="grid grid-cols-2 gap-4">
<InfoField label="Runtime Mode" value={runtime.runtime_mode} />
<InfoField label="Provider" value={runtime.provider} />
<InfoField label="Status" value={runtime.status} />
<InfoField
label="Last Seen"
value={formatLastSeen(runtime.last_seen_at)}
/>
{runtime.device_info && (
<InfoField label="Device" value={runtime.device_info} />
)}
{runtime.daemon_id && (
<InfoField label="Daemon ID" value={runtime.daemon_id} mono />
)}
</div>
{/* Connection Test */}
<div>
<h3 className="text-xs font-medium text-muted-foreground mb-3">
Connection Test
</h3>
<PingSection runtimeId={runtime.id} />
</div>
{/* Usage */}
<div>
<h3 className="text-xs font-medium text-muted-foreground mb-3">
Token Usage (Last 30 Days)
</h3>
<UsageSection runtimeId={runtime.id} />
</div>
{/* Metadata */}
{runtime.metadata && Object.keys(runtime.metadata).length > 0 && (
<div>
<h3 className="text-xs font-medium text-muted-foreground mb-2">
Metadata
</h3>
<div className="rounded-lg border bg-muted/30 p-3">
<pre className="text-xs font-mono whitespace-pre-wrap break-all">
{JSON.stringify(runtime.metadata, null, 2)}
</pre>
</div>
</div>
)}
{/* Timestamps */}
<div className="grid grid-cols-2 gap-4 border-t pt-4">
<InfoField
label="Created"
value={new Date(runtime.created_at).toLocaleString()}
/>
<InfoField
label="Updated"
value={new Date(runtime.updated_at).toLocaleString()}
/>
</div>
</div>
</div>
);
}
function InfoField({
label,
value,
mono,
}: {
label: string;
value: string;
mono?: boolean;
}) {
return (
<div>
<div className="text-xs text-muted-foreground">{label}</div>
<div
className={`mt-0.5 text-sm truncate ${mono ? "font-mono text-xs" : ""}`}
>
{value}
</div>
</div>
);
}
// ---------------------------------------------------------------------------
// Page
// ---------------------------------------------------------------------------
export default function RuntimesPage() {
const isLoading = useAuthStore((s) => s.isLoading);
const workspace = useWorkspaceStore((s) => s.workspace);
const [runtimes, setRuntimes] = useState<AgentRuntime[]>([]);
const [selectedId, setSelectedId] = useState<string>("");
const [fetching, setFetching] = useState(true);
const fetchRuntimes = useCallback(async () => {
if (!workspace) return;
try {
const data = await api.listRuntimes({ workspace_id: workspace.id });
setRuntimes(data);
} finally {
setFetching(false);
}
}, [workspace]);
useEffect(() => {
fetchRuntimes();
}, [fetchRuntimes]);
// Auto-select first runtime
useEffect(() => {
if (runtimes.length > 0 && !selectedId) {
setSelectedId(runtimes[0]!.id);
}
}, [runtimes, selectedId]);
// Real-time updates
const handleDaemonEvent = useCallback(() => {
fetchRuntimes();
}, [fetchRuntimes]);
useWSEvent("daemon:register", handleDaemonEvent);
useWSEvent("daemon:heartbeat", handleDaemonEvent);
const selected = runtimes.find((r) => r.id === selectedId) ?? null;
if (isLoading || fetching) {
return (
<div className="flex h-full items-center justify-center text-sm text-muted-foreground">
Loading...
</div>
);
}
return (
<div className="flex flex-1 min-h-0">
{/* Left column - runtime list */}
<div className="w-72 shrink-0 overflow-y-auto border-r">
<div className="flex h-12 items-center justify-between border-b px-4">
<h1 className="text-sm font-semibold">Runtimes</h1>
<span className="text-xs text-muted-foreground">
{runtimes.filter((r) => r.status === "online").length}/
{runtimes.length} online
</span>
</div>
{runtimes.length === 0 ? (
<div className="flex flex-col items-center justify-center px-4 py-12">
<Server className="h-8 w-8 text-muted-foreground/40" />
<p className="mt-3 text-sm text-muted-foreground">
No runtimes registered
</p>
<p className="mt-1 text-xs text-muted-foreground text-center">
Run{" "}
<code className="rounded bg-muted px-1 py-0.5">
multica daemon start
</code>{" "}
to register a local runtime.
</p>
</div>
) : (
<div className="divide-y">
{runtimes.map((runtime) => (
<RuntimeListItem
key={runtime.id}
runtime={runtime}
isSelected={runtime.id === selectedId}
onClick={() => setSelectedId(runtime.id)}
/>
))}
</div>
)}
</div>
{/* Right column - runtime detail */}
<div className="flex-1 overflow-hidden">
{selected ? (
<RuntimeDetail key={selected.id} runtime={selected} />
) : (
<div className="flex h-full flex-col items-center justify-center text-muted-foreground">
<Server className="h-10 w-10 text-muted-foreground/30" />
<p className="mt-3 text-sm">Select a runtime to view details</p>
</div>
)}
</div>
</div>
);
}

View file

@ -0,0 +1 @@
export { RuntimesPage } from "./components";

View file

@ -26,6 +26,8 @@ import type {
PersonalAccessToken,
CreatePersonalAccessTokenRequest,
CreatePersonalAccessTokenResponse,
RuntimeUsage,
RuntimePing,
} from "@multica/types";
import { type SDKLogger, noopLogger } from "./logger";
@ -232,6 +234,20 @@ export class ApiClient {
return this.fetch(`/api/runtimes?${search}`);
}
async getRuntimeUsage(runtimeId: string, params?: { days?: number }): Promise<RuntimeUsage[]> {
const search = new URLSearchParams();
if (params?.days) search.set("days", String(params.days));
return this.fetch(`/api/runtimes/${runtimeId}/usage?${search}`);
}
async pingRuntime(runtimeId: string): Promise<RuntimePing> {
return this.fetch(`/api/runtimes/${runtimeId}/ping`, { method: "POST" });
}
async getPingResult(runtimeId: string, pingId: string): Promise<RuntimePing> {
return this.fetch(`/api/runtimes/${runtimeId}/ping/${pingId}`);
}
async listAgentTasks(agentId: string): Promise<AgentTask[]> {
return this.fetch(`/api/agents/${agentId}/tasks`);
}

View file

@ -142,3 +142,27 @@ export interface UpdateSkillRequest {
export interface SetAgentSkillsRequest {
skill_ids: string[];
}
export type RuntimePingStatus = "pending" | "running" | "completed" | "failed" | "timeout";
export interface RuntimePing {
id: string;
runtime_id: string;
status: RuntimePingStatus;
output?: string;
error?: string;
duration_ms?: number;
created_at: string;
updated_at: string;
}
export interface RuntimeUsage {
runtime_id: string;
date: string;
provider: string;
model: string;
input_tokens: number;
output_tokens: number;
cache_read_tokens: number;
cache_write_tokens: number;
}

View file

@ -17,6 +17,9 @@ export type {
CreateSkillRequest,
UpdateSkillRequest,
SetAgentSkillsRequest,
RuntimeUsage,
RuntimePing,
RuntimePingStatus,
} from "./agent.js";
export type { Workspace, Member, MemberRole, User, MemberWithUser } from "./workspace.js";
export type { InboxItem, InboxSeverity, InboxItemType } from "./inbox.js";

View file

@ -90,6 +90,8 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route
r.Post("/runtimes/{runtimeId}/tasks/claim", h.ClaimTaskByRuntime)
r.Get("/runtimes/{runtimeId}/tasks/pending", h.ListPendingTasksByRuntime)
r.Post("/runtimes/{runtimeId}/usage", h.ReportRuntimeUsage)
r.Post("/runtimes/{runtimeId}/ping/{pingId}/result", h.ReportPingResult)
r.Post("/tasks/{taskId}/start", h.StartTask)
r.Post("/tasks/{taskId}/progress", h.ReportTaskProgress)
@ -154,6 +156,9 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route
r.Route("/api/runtimes", func(r chi.Router) {
r.Get("/", h.ListAgentRuntimes)
r.Get("/{runtimeId}/usage", h.GetRuntimeUsage)
r.Post("/{runtimeId}/ping", h.InitiatePing)
r.Get("/{runtimeId}/ping/{pingId}", h.GetPing)
})
r.Post("/api/daemon/pairing-sessions/{token}/approve", h.ApproveDaemonPairingSession)

View file

@ -111,12 +111,37 @@ func (c *Client) FailTask(ctx context.Context, taskID, errMsg string) error {
}, nil)
}
func (c *Client) SendHeartbeat(ctx context.Context, runtimeID string) error {
return c.postJSON(ctx, "/api/daemon/heartbeat", map[string]string{
"runtime_id": runtimeID,
func (c *Client) ReportUsage(ctx context.Context, runtimeID string, entries []map[string]any) error {
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/runtimes/%s/usage", runtimeID), map[string]any{
"entries": entries,
}, nil)
}
// HeartbeatResponse contains the server's response to a heartbeat, including any pending actions.
type HeartbeatResponse struct {
Status string `json:"status"`
PendingPing *PendingPing `json:"pending_ping,omitempty"`
}
// PendingPing represents a ping test request from the server.
type PendingPing struct {
ID string `json:"id"`
}
func (c *Client) SendHeartbeat(ctx context.Context, runtimeID string) (*HeartbeatResponse, error) {
var resp HeartbeatResponse
if err := c.postJSON(ctx, "/api/daemon/heartbeat", map[string]string{
"runtime_id": runtimeID,
}, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *Client) ReportPingResult(ctx context.Context, runtimeID, pingID string, result map[string]any) error {
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/runtimes/%s/ping/%s/result", runtimeID, pingID), result, nil)
}
func (c *Client) Register(ctx context.Context, req map[string]any) ([]Runtime, error) {
var resp struct {
Runtimes []Runtime `json:"runtimes"`

View file

@ -8,6 +8,7 @@ import (
"time"
"github.com/multica-ai/multica/server/internal/daemon/execenv"
"github.com/multica-ai/multica/server/internal/daemon/usage"
"github.com/multica-ai/multica/server/pkg/agent"
)
@ -54,7 +55,8 @@ func (d *Daemon) Run(ctx context.Context) error {
runtimeIDs = append(runtimeIDs, rt.ID)
}
go d.heartbeatLoop(ctx, runtimeIDs)
go d.heartbeatLoop(ctx, runtimes)
go d.usageScanLoop(ctx, runtimes)
return d.pollLoop(ctx, runtimeIDs)
}
@ -164,7 +166,7 @@ func (d *Daemon) ensurePaired(ctx context.Context) (string, error) {
}
}
func (d *Daemon) heartbeatLoop(ctx context.Context, runtimeIDs []string) {
func (d *Daemon) heartbeatLoop(ctx context.Context, runtimes []Runtime) {
ticker := time.NewTicker(d.cfg.HeartbeatInterval)
defer ticker.Stop()
@ -173,15 +175,155 @@ func (d *Daemon) heartbeatLoop(ctx context.Context, runtimeIDs []string) {
case <-ctx.Done():
return
case <-ticker.C:
for _, rid := range runtimeIDs {
if err := d.client.SendHeartbeat(ctx, rid); err != nil {
d.logger.Warn("heartbeat failed", "runtime_id", rid, "error", err)
for _, rt := range runtimes {
resp, err := d.client.SendHeartbeat(ctx, rt.ID)
if err != nil {
d.logger.Warn("heartbeat failed", "runtime_id", rt.ID, "error", err)
continue
}
// Handle pending ping requests.
if resp.PendingPing != nil {
go d.handlePing(ctx, rt, resp.PendingPing.ID)
}
}
}
}
}
func (d *Daemon) handlePing(ctx context.Context, rt Runtime, pingID string) {
d.logger.Info("ping requested", "runtime_id", rt.ID, "ping_id", pingID, "provider", rt.Provider)
start := time.Now()
entry, ok := d.cfg.Agents[rt.Provider]
if !ok {
d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{
"status": "failed",
"error": fmt.Sprintf("no agent configured for provider %q", rt.Provider),
"duration_ms": time.Since(start).Milliseconds(),
})
return
}
backend, err := agent.New(rt.Provider, agent.Config{
ExecutablePath: entry.Path,
Logger: d.logger,
})
if err != nil {
d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{
"status": "failed",
"error": err.Error(),
"duration_ms": time.Since(start).Milliseconds(),
})
return
}
pingCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()
session, err := backend.Execute(pingCtx, "Respond with exactly one word: pong", agent.ExecOptions{
MaxTurns: 1,
Timeout: 60 * time.Second,
})
if err != nil {
d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{
"status": "failed",
"error": err.Error(),
"duration_ms": time.Since(start).Milliseconds(),
})
return
}
// Drain messages
go func() {
for range session.Messages {
}
}()
result := <-session.Result
durationMs := time.Since(start).Milliseconds()
if result.Status == "completed" {
d.logger.Info("ping completed", "runtime_id", rt.ID, "ping_id", pingID, "duration_ms", durationMs)
d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{
"status": "completed",
"output": result.Output,
"duration_ms": durationMs,
})
} else {
errMsg := result.Error
if errMsg == "" {
errMsg = fmt.Sprintf("agent returned status: %s", result.Status)
}
d.logger.Warn("ping failed", "runtime_id", rt.ID, "ping_id", pingID, "error", errMsg)
d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{
"status": "failed",
"error": errMsg,
"duration_ms": durationMs,
})
}
}
func (d *Daemon) usageScanLoop(ctx context.Context, runtimes []Runtime) {
scanner := usage.NewScanner(d.logger)
// Build provider -> runtime ID mapping.
providerToRuntime := make(map[string]string)
for _, rt := range runtimes {
providerToRuntime[rt.Provider] = rt.ID
}
report := func() {
records := scanner.Scan()
if len(records) == 0 {
return
}
// Group records by provider to send to the correct runtime.
byProvider := make(map[string][]map[string]any)
for _, r := range records {
byProvider[r.Provider] = append(byProvider[r.Provider], map[string]any{
"date": r.Date,
"provider": r.Provider,
"model": r.Model,
"input_tokens": r.InputTokens,
"output_tokens": r.OutputTokens,
"cache_read_tokens": r.CacheReadTokens,
"cache_write_tokens": r.CacheWriteTokens,
})
}
for provider, entries := range byProvider {
runtimeID, ok := providerToRuntime[provider]
if !ok {
d.logger.Debug("no runtime for provider, skipping usage report", "provider", provider)
continue
}
if err := d.client.ReportUsage(ctx, runtimeID, entries); err != nil {
d.logger.Warn("usage report failed", "provider", provider, "runtime_id", runtimeID, "error", err)
} else {
d.logger.Info("usage reported", "provider", provider, "runtime_id", runtimeID, "entries", len(entries))
}
}
}
// Initial scan on startup.
report()
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
report()
}
}
}
func (d *Daemon) pollLoop(ctx context.Context, runtimeIDs []string) error {
pollOffset := 0
pollCount := 0

View file

@ -0,0 +1,173 @@
package usage
import (
"bufio"
"encoding/json"
"os"
"path/filepath"
"strings"
"time"
)
// scanClaude reads Claude Code JSONL session logs from ~/.config/claude/projects/**/*.jsonl
// and extracts token usage from "assistant" message lines.
func (s *Scanner) scanClaude() []Record {
roots := claudeLogRoots()
if len(roots) == 0 {
return nil
}
var allRecords []Record
seen := make(map[string]bool) // dedup by "messageId:requestId"
for _, root := range roots {
files, err := filepath.Glob(filepath.Join(root, "**", "*.jsonl"))
if err != nil {
s.logger.Debug("claude glob error", "root", root, "error", err)
continue
}
// Also glob one level deeper for subagent logs
deeper, _ := filepath.Glob(filepath.Join(root, "**", "**", "*.jsonl"))
files = append(files, deeper...)
for _, f := range files {
records := s.parseClaudeFile(f, seen)
allRecords = append(allRecords, records...)
}
}
return mergeRecords(allRecords)
}
// claudeLogRoots returns the directories to scan for Claude JSONL logs.
func claudeLogRoots() []string {
var roots []string
// Check CLAUDE_CONFIG_DIR env var
if configDir := os.Getenv("CLAUDE_CONFIG_DIR"); configDir != "" {
for _, dir := range strings.Split(configDir, ",") {
dir = strings.TrimSpace(dir)
if dir != "" {
roots = append(roots, filepath.Join(dir, "projects"))
}
}
}
// Standard locations
home, err := os.UserHomeDir()
if err != nil {
return roots
}
candidates := []string{
filepath.Join(home, ".config", "claude", "projects"),
filepath.Join(home, ".claude", "projects"),
}
for _, dir := range candidates {
if info, err := os.Stat(dir); err == nil && info.IsDir() {
roots = append(roots, dir)
}
}
return roots
}
// claudeLine represents the subset of a Claude JSONL line we care about.
type claudeLine struct {
Type string `json:"type"`
Timestamp string `json:"timestamp"`
RequestID string `json:"requestId"`
Message *struct {
ID string `json:"id"`
Model string `json:"model"`
Usage *struct {
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
CacheReadInputTokens int64 `json:"cache_read_input_tokens"`
CacheCreationInputTokens int64 `json:"cache_creation_input_tokens"`
} `json:"usage"`
} `json:"message"`
}
func (s *Scanner) parseClaudeFile(path string, seen map[string]bool) []Record {
f, err := os.Open(path)
if err != nil {
return nil
}
defer f.Close()
var records []Record
scanner := bufio.NewScanner(f)
scanner.Buffer(make([]byte, 0, 256*1024), 1024*1024) // up to 1MB lines
for scanner.Scan() {
line := scanner.Bytes()
// Fast pre-filter: skip lines that can't contain what we need
if !bytesContains(line, `"type":"assistant"`) && !bytesContains(line, `"type": "assistant"`) {
continue
}
if !bytesContains(line, `"usage"`) {
continue
}
var entry claudeLine
if err := json.Unmarshal(line, &entry); err != nil {
continue
}
if entry.Type != "assistant" || entry.Message == nil || entry.Message.Usage == nil {
continue
}
// Dedup: Claude streaming produces multiple lines with same message.id + requestId
// with cumulative token counts. Take only the first occurrence.
dedupKey := entry.Message.ID + ":" + entry.RequestID
if dedupKey != ":" && seen[dedupKey] {
continue
}
if dedupKey != ":" {
seen[dedupKey] = true
}
// Parse timestamp to get date
ts, err := time.Parse(time.RFC3339Nano, entry.Timestamp)
if err != nil {
ts, err = time.Parse(time.RFC3339, entry.Timestamp)
if err != nil {
continue
}
}
model := entry.Message.Model
if model == "" {
model = "unknown"
}
records = append(records, Record{
Date: ts.Local().Format("2006-01-02"),
Provider: "claude",
Model: normalizeClaudeModel(model),
InputTokens: entry.Message.Usage.InputTokens,
OutputTokens: entry.Message.Usage.OutputTokens,
CacheReadTokens: entry.Message.Usage.CacheReadInputTokens,
CacheWriteTokens: entry.Message.Usage.CacheCreationInputTokens,
})
}
return records
}
// normalizeClaudeModel strips common prefixes/suffixes from model names.
func normalizeClaudeModel(model string) string {
// Strip "anthropic." prefix
model = strings.TrimPrefix(model, "anthropic.")
// Strip Vertex AI prefixes like "us.anthropic."
if idx := strings.LastIndex(model, "anthropic."); idx >= 0 {
model = model[idx+len("anthropic."):]
}
return model
}
func bytesContains(data []byte, substr string) bool {
return strings.Contains(string(data), substr)
}

View file

@ -0,0 +1,171 @@
package usage
import (
"bufio"
"encoding/json"
"os"
"path/filepath"
"strings"
)
// scanCodex reads Codex CLI session logs from ~/.codex/sessions/YYYY/MM/DD/*.jsonl
// and extracts token usage from "token_count" event lines.
func (s *Scanner) scanCodex() []Record {
root := codexLogRoot()
if root == "" {
return nil
}
// Glob for session files: ~/.codex/sessions/YYYY/MM/DD/rollout-*.jsonl
pattern := filepath.Join(root, "*", "*", "*", "*.jsonl")
files, err := filepath.Glob(pattern)
if err != nil {
s.logger.Debug("codex glob error", "error", err)
return nil
}
var allRecords []Record
for _, f := range files {
record := s.parseCodexFile(f)
if record != nil {
allRecords = append(allRecords, *record)
}
}
return mergeRecords(allRecords)
}
// codexLogRoot returns the Codex sessions directory.
func codexLogRoot() string {
if codexHome := os.Getenv("CODEX_HOME"); codexHome != "" {
dir := filepath.Join(codexHome, "sessions")
if info, err := os.Stat(dir); err == nil && info.IsDir() {
return dir
}
}
home, err := os.UserHomeDir()
if err != nil {
return ""
}
dir := filepath.Join(home, ".codex", "sessions")
if info, err := os.Stat(dir); err == nil && info.IsDir() {
return dir
}
return ""
}
// codexEvent represents a line in a Codex session JSONL file.
type codexEvent struct {
Type string `json:"type"`
Timestamp string `json:"timestamp"`
Payload *struct {
Type string `json:"type"`
Msg json.RawMessage `json:"msg"`
} `json:"payload"`
}
// codexTokenCount represents the token_count info structure.
type codexTokenCount struct {
Info *struct {
TotalTokenUsage *struct {
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
CachedInputTokens int64 `json:"cached_input_tokens"`
ReasoningOutputTokens int64 `json:"reasoning_output_tokens"`
TotalTokens int64 `json:"total_tokens"`
} `json:"total_token_usage"`
Model string `json:"model"`
} `json:"info"`
}
// parseCodexFile extracts the final cumulative token_count from a Codex session file.
// Returns nil if no usage data found.
func (s *Scanner) parseCodexFile(path string) *Record {
f, err := os.Open(path)
if err != nil {
return nil
}
defer f.Close()
// Extract date from directory path: .../sessions/YYYY/MM/DD/file.jsonl
date := extractDateFromPath(path)
if date == "" {
return nil
}
var lastUsage *codexTokenCount
var lastModel string
scanner := bufio.NewScanner(f)
scanner.Buffer(make([]byte, 0, 256*1024), 1024*1024)
for scanner.Scan() {
line := scanner.Bytes()
// Fast pre-filter
if !bytesContains(line, `"token_count"`) {
continue
}
// Try direct event format: {"type": "event_msg", "payload": {"type": "token_count", ...}}
var evt codexEvent
if err := json.Unmarshal(line, &evt); err != nil {
continue
}
// Check if payload contains token_count
if evt.Payload != nil && evt.Payload.Type == "token_count" {
var tc codexTokenCount
if err := json.Unmarshal(evt.Payload.Msg, &tc); err == nil && tc.Info != nil && tc.Info.TotalTokenUsage != nil {
lastUsage = &tc
if tc.Info.Model != "" {
lastModel = tc.Info.Model
}
continue
}
}
// Also try flat format where msg is at top level
var tc codexTokenCount
if err := json.Unmarshal(line, &tc); err == nil && tc.Info != nil && tc.Info.TotalTokenUsage != nil {
lastUsage = &tc
if tc.Info.Model != "" {
lastModel = tc.Info.Model
}
}
}
if lastUsage == nil || lastUsage.Info == nil || lastUsage.Info.TotalTokenUsage == nil {
return nil
}
model := lastModel
if model == "" {
model = "unknown"
}
usage := lastUsage.Info.TotalTokenUsage
return &Record{
Date: date,
Provider: "codex",
Model: model,
InputTokens: usage.InputTokens,
OutputTokens: usage.OutputTokens + usage.ReasoningOutputTokens,
CacheReadTokens: usage.CachedInputTokens,
CacheWriteTokens: 0, // Codex doesn't have cache write tokens
}
}
// extractDateFromPath extracts YYYY-MM-DD from a path like .../sessions/2026/03/26/file.jsonl
func extractDateFromPath(path string) string {
parts := strings.Split(filepath.ToSlash(path), "/")
// Look for sessions/YYYY/MM/DD pattern
for i := 0; i < len(parts)-3; i++ {
if parts[i] == "sessions" && len(parts[i+1]) == 4 && len(parts[i+2]) == 2 && len(parts[i+3]) == 2 {
return parts[i+1] + "-" + parts[i+2] + "-" + parts[i+3]
}
}
return ""
}

View file

@ -0,0 +1,68 @@
package usage
import (
"log/slog"
)
// Record represents aggregated token usage for one (date, provider, model) tuple.
type Record struct {
Date string `json:"date"` // "2006-01-02"
Provider string `json:"provider"` // "claude" or "codex"
Model string `json:"model"`
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
CacheReadTokens int64 `json:"cache_read_tokens"`
CacheWriteTokens int64 `json:"cache_write_tokens"`
}
// Scanner scans local CLI log files for token usage data.
type Scanner struct {
logger *slog.Logger
}
// NewScanner creates a new usage scanner.
func NewScanner(logger *slog.Logger) *Scanner {
return &Scanner{logger: logger}
}
// Scan reads local JSONL log files for both Claude Code and Codex CLI,
// and returns aggregated usage records keyed by (date, provider, model).
func (s *Scanner) Scan() []Record {
var records []Record
claudeRecords := s.scanClaude()
records = append(records, claudeRecords...)
codexRecords := s.scanCodex()
records = append(records, codexRecords...)
return records
}
// aggregation key for merging records.
type aggKey struct {
Date string
Provider string
Model string
}
func mergeRecords(records []Record) []Record {
m := make(map[aggKey]*Record)
for _, r := range records {
k := aggKey{Date: r.Date, Provider: r.Provider, Model: r.Model}
if existing, ok := m[k]; ok {
existing.InputTokens += r.InputTokens
existing.OutputTokens += r.OutputTokens
existing.CacheReadTokens += r.CacheReadTokens
existing.CacheWriteTokens += r.CacheWriteTokens
} else {
copy := r
m[k] = &copy
}
}
result := make([]Record, 0, len(m))
for _, r := range m {
result = append(result, *r)
}
return result
}

View file

@ -132,7 +132,15 @@ func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
}
slog.Debug("daemon heartbeat", "runtime_id", req.RuntimeID)
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
resp := map[string]any{"status": "ok"}
// Check for pending ping requests for this runtime.
if pending := h.PingStore.PopPending(req.RuntimeID); pending != nil {
resp["pending_ping"] = map[string]string{"id": pending.ID}
}
writeJSON(w, http.StatusOK, resp)
}
// ClaimTaskByRuntime atomically claims the next queued task for a runtime.

View file

@ -34,6 +34,7 @@ type Handler struct {
Bus *events.Bus
TaskService *service.TaskService
EmailService *service.EmailService
PingStore *PingStore
}
func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *events.Bus, emailService *service.EmailService) *Handler {
@ -50,6 +51,7 @@ func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *event
Bus: bus,
TaskService: service.NewTaskService(queries, hub, bus),
EmailService: emailService,
PingStore: NewPingStore(),
}
}

View file

@ -3,7 +3,11 @@ package handler
import (
"encoding/json"
"net/http"
"strconv"
"time"
"github.com/go-chi/chi/v5"
"github.com/jackc/pgx/v5/pgtype"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
@ -47,6 +51,114 @@ func runtimeToResponse(rt db.AgentRuntime) AgentRuntimeResponse {
}
}
// ---------------------------------------------------------------------------
// Runtime Usage
// ---------------------------------------------------------------------------
type RuntimeUsageEntry struct {
Date string `json:"date"`
Provider string `json:"provider"`
Model string `json:"model"`
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
CacheReadTokens int64 `json:"cache_read_tokens"`
CacheWriteTokens int64 `json:"cache_write_tokens"`
}
type RuntimeUsageResponse struct {
RuntimeID string `json:"runtime_id"`
Date string `json:"date"`
Provider string `json:"provider"`
Model string `json:"model"`
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
CacheReadTokens int64 `json:"cache_read_tokens"`
CacheWriteTokens int64 `json:"cache_write_tokens"`
}
// ReportRuntimeUsage receives usage data from the daemon (unauthenticated daemon route).
func (h *Handler) ReportRuntimeUsage(w http.ResponseWriter, r *http.Request) {
runtimeID := chi.URLParam(r, "runtimeId")
if runtimeID == "" {
writeError(w, http.StatusBadRequest, "runtimeId is required")
return
}
var req struct {
Entries []RuntimeUsageEntry `json:"entries"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
for _, entry := range req.Entries {
date, err := time.Parse("2006-01-02", entry.Date)
if err != nil {
continue
}
h.Queries.UpsertRuntimeUsage(r.Context(), db.UpsertRuntimeUsageParams{
RuntimeID: parseUUID(runtimeID),
Date: pgtype.Date{Time: date, Valid: true},
Provider: entry.Provider,
Model: entry.Model,
InputTokens: entry.InputTokens,
OutputTokens: entry.OutputTokens,
CacheReadTokens: entry.CacheReadTokens,
CacheWriteTokens: entry.CacheWriteTokens,
})
}
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}
// GetRuntimeUsage returns usage data for a runtime (protected route).
func (h *Handler) GetRuntimeUsage(w http.ResponseWriter, r *http.Request) {
runtimeID := chi.URLParam(r, "runtimeId")
rt, err := h.Queries.GetAgentRuntime(r.Context(), parseUUID(runtimeID))
if err != nil {
writeError(w, http.StatusNotFound, "runtime not found")
return
}
if _, ok := h.requireWorkspaceMember(w, r, uuidToString(rt.WorkspaceID), "runtime not found"); !ok {
return
}
limit := int32(90)
if l := r.URL.Query().Get("days"); l != "" {
if parsed, err := strconv.Atoi(l); err == nil && parsed > 0 && parsed <= 365 {
limit = int32(parsed)
}
}
rows, err := h.Queries.ListRuntimeUsage(r.Context(), db.ListRuntimeUsageParams{
RuntimeID: parseUUID(runtimeID),
Limit: limit,
})
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list usage")
return
}
resp := make([]RuntimeUsageResponse, len(rows))
for i, row := range rows {
resp[i] = RuntimeUsageResponse{
RuntimeID: runtimeID,
Date: row.Date.Time.Format("2006-01-02"),
Provider: row.Provider,
Model: row.Model,
InputTokens: row.InputTokens,
OutputTokens: row.OutputTokens,
CacheReadTokens: row.CacheReadTokens,
CacheWriteTokens: row.CacheWriteTokens,
}
}
writeJSON(w, http.StatusOK, resp)
}
func (h *Handler) ListAgentRuntimes(w http.ResponseWriter, r *http.Request) {
workspaceID := resolveWorkspaceID(r)
if _, ok := h.requireWorkspaceMember(w, r, workspaceID, "workspace not found"); !ok {

View file

@ -0,0 +1,200 @@
package handler
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"net/http"
"sync"
"time"
"github.com/go-chi/chi/v5"
)
// ---------------------------------------------------------------------------
// In-memory ping store
// ---------------------------------------------------------------------------
// PingStatus represents the lifecycle of a runtime ping test.
type PingStatus string
const (
PingPending PingStatus = "pending"
PingRunning PingStatus = "running"
PingCompleted PingStatus = "completed"
PingFailed PingStatus = "failed"
PingTimeout PingStatus = "timeout"
)
// PingRequest represents a pending or completed ping test.
type PingRequest struct {
ID string `json:"id"`
RuntimeID string `json:"runtime_id"`
Status PingStatus `json:"status"`
Output string `json:"output,omitempty"`
Error string `json:"error,omitempty"`
DurationMs int64 `json:"duration_ms,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// PingStore is a thread-safe in-memory store for ping requests.
// Pings expire after 2 minutes.
type PingStore struct {
mu sync.Mutex
pings map[string]*PingRequest // keyed by ping ID
}
func NewPingStore() *PingStore {
return &PingStore{
pings: make(map[string]*PingRequest),
}
}
func (s *PingStore) Create(runtimeID string) *PingRequest {
s.mu.Lock()
defer s.mu.Unlock()
// Clean up old pings for this runtime
for id, p := range s.pings {
if time.Since(p.CreatedAt) > 2*time.Minute {
delete(s.pings, id)
}
}
ping := &PingRequest{
ID: randomID(),
RuntimeID: runtimeID,
Status: PingPending,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
s.pings[ping.ID] = ping
return ping
}
func (s *PingStore) Get(id string) *PingRequest {
s.mu.Lock()
defer s.mu.Unlock()
p, ok := s.pings[id]
if !ok {
return nil
}
// Check for timeout
if p.Status == PingPending && time.Since(p.CreatedAt) > 60*time.Second {
p.Status = PingTimeout
p.Error = "daemon did not respond within 60 seconds"
p.UpdatedAt = time.Now()
}
return p
}
// PopPending returns and removes the oldest pending ping for a runtime.
func (s *PingStore) PopPending(runtimeID string) *PingRequest {
s.mu.Lock()
defer s.mu.Unlock()
var oldest *PingRequest
for _, p := range s.pings {
if p.RuntimeID == runtimeID && p.Status == PingPending {
if oldest == nil || p.CreatedAt.Before(oldest.CreatedAt) {
oldest = p
}
}
}
if oldest != nil {
oldest.Status = PingRunning
oldest.UpdatedAt = time.Now()
}
return oldest
}
func (s *PingStore) Complete(id string, output string, durationMs int64) {
s.mu.Lock()
defer s.mu.Unlock()
if p, ok := s.pings[id]; ok {
p.Status = PingCompleted
p.Output = output
p.DurationMs = durationMs
p.UpdatedAt = time.Now()
}
}
func (s *PingStore) Fail(id string, errMsg string, durationMs int64) {
s.mu.Lock()
defer s.mu.Unlock()
if p, ok := s.pings[id]; ok {
p.Status = PingFailed
p.Error = errMsg
p.DurationMs = durationMs
p.UpdatedAt = time.Now()
}
}
func randomID() string {
b := make([]byte, 16)
rand.Read(b)
return hex.EncodeToString(b)
}
// ---------------------------------------------------------------------------
// Handlers
// ---------------------------------------------------------------------------
// InitiatePing creates a new ping request (protected route, called by frontend).
func (h *Handler) InitiatePing(w http.ResponseWriter, r *http.Request) {
runtimeID := chi.URLParam(r, "runtimeId")
rt, err := h.Queries.GetAgentRuntime(r.Context(), parseUUID(runtimeID))
if err != nil {
writeError(w, http.StatusNotFound, "runtime not found")
return
}
if _, ok := h.requireWorkspaceMember(w, r, uuidToString(rt.WorkspaceID), "runtime not found"); !ok {
return
}
ping := h.PingStore.Create(runtimeID)
writeJSON(w, http.StatusOK, ping)
}
// GetPing returns the status of a ping request (protected route, called by frontend).
func (h *Handler) GetPing(w http.ResponseWriter, r *http.Request) {
pingID := chi.URLParam(r, "pingId")
ping := h.PingStore.Get(pingID)
if ping == nil {
writeError(w, http.StatusNotFound, "ping not found")
return
}
writeJSON(w, http.StatusOK, ping)
}
// ReportPingResult receives the ping result from the daemon.
func (h *Handler) ReportPingResult(w http.ResponseWriter, r *http.Request) {
pingID := chi.URLParam(r, "pingId")
var req struct {
Status string `json:"status"` // "completed" or "failed"
Output string `json:"output"`
Error string `json:"error"`
DurationMs int64 `json:"duration_ms"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
if req.Status == "completed" {
h.PingStore.Complete(pingID, req.Output, req.DurationMs)
} else {
h.PingStore.Fail(pingID, req.Error, req.DurationMs)
}
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}

View file

@ -0,0 +1 @@
DROP TABLE IF EXISTS runtime_usage;

View file

@ -0,0 +1,16 @@
CREATE TABLE runtime_usage (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
runtime_id UUID NOT NULL REFERENCES agent_runtime(id) ON DELETE CASCADE,
date DATE NOT NULL,
provider TEXT NOT NULL,
model TEXT NOT NULL DEFAULT '',
input_tokens BIGINT NOT NULL DEFAULT 0,
output_tokens BIGINT NOT NULL DEFAULT 0,
cache_read_tokens BIGINT NOT NULL DEFAULT 0,
cache_write_tokens BIGINT NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (runtime_id, date, provider, model)
);
CREATE INDEX idx_runtime_usage_runtime_date ON runtime_usage(runtime_id, date DESC);

View file

@ -179,6 +179,20 @@ type Member struct {
CreatedAt pgtype.Timestamptz `json:"created_at"`
}
type RuntimeUsage struct {
ID pgtype.UUID `json:"id"`
RuntimeID pgtype.UUID `json:"runtime_id"`
Date pgtype.Date `json:"date"`
Provider string `json:"provider"`
Model string `json:"model"`
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
CacheReadTokens int64 `json:"cache_read_tokens"`
CacheWriteTokens int64 `json:"cache_write_tokens"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
}
type PersonalAccessToken struct {
ID pgtype.UUID `json:"id"`
UserID pgtype.UUID `json:"user_id"`

View file

@ -0,0 +1,141 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.30.0
// source: runtime_usage.sql
package db
import (
"context"
"github.com/jackc/pgx/v5/pgtype"
)
const upsertRuntimeUsage = `-- name: UpsertRuntimeUsage :exec
INSERT INTO runtime_usage (runtime_id, date, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (runtime_id, date, provider, model)
DO UPDATE SET
input_tokens = EXCLUDED.input_tokens,
output_tokens = EXCLUDED.output_tokens,
cache_read_tokens = EXCLUDED.cache_read_tokens,
cache_write_tokens = EXCLUDED.cache_write_tokens,
updated_at = now()
`
type UpsertRuntimeUsageParams struct {
RuntimeID pgtype.UUID `json:"runtime_id"`
Date pgtype.Date `json:"date"`
Provider string `json:"provider"`
Model string `json:"model"`
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
CacheReadTokens int64 `json:"cache_read_tokens"`
CacheWriteTokens int64 `json:"cache_write_tokens"`
}
func (q *Queries) UpsertRuntimeUsage(ctx context.Context, arg UpsertRuntimeUsageParams) error {
_, err := q.db.Exec(ctx, upsertRuntimeUsage,
arg.RuntimeID,
arg.Date,
arg.Provider,
arg.Model,
arg.InputTokens,
arg.OutputTokens,
arg.CacheReadTokens,
arg.CacheWriteTokens,
)
return err
}
const listRuntimeUsage = `-- name: ListRuntimeUsage :many
SELECT id, runtime_id, date, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, created_at, updated_at FROM runtime_usage
WHERE runtime_id = $1
ORDER BY date DESC
LIMIT $2
`
type ListRuntimeUsageParams struct {
RuntimeID pgtype.UUID `json:"runtime_id"`
Limit int32 `json:"limit"`
}
func (q *Queries) ListRuntimeUsage(ctx context.Context, arg ListRuntimeUsageParams) ([]RuntimeUsage, error) {
rows, err := q.db.Query(ctx, listRuntimeUsage, arg.RuntimeID, arg.Limit)
if err != nil {
return nil, err
}
defer rows.Close()
items := []RuntimeUsage{}
for rows.Next() {
var i RuntimeUsage
if err := rows.Scan(
&i.ID,
&i.RuntimeID,
&i.Date,
&i.Provider,
&i.Model,
&i.InputTokens,
&i.OutputTokens,
&i.CacheReadTokens,
&i.CacheWriteTokens,
&i.CreatedAt,
&i.UpdatedAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getRuntimeUsageSummary = `-- name: GetRuntimeUsageSummary :many
SELECT provider, model,
SUM(input_tokens)::bigint AS total_input_tokens,
SUM(output_tokens)::bigint AS total_output_tokens,
SUM(cache_read_tokens)::bigint AS total_cache_read_tokens,
SUM(cache_write_tokens)::bigint AS total_cache_write_tokens
FROM runtime_usage
WHERE runtime_id = $1
GROUP BY provider, model
ORDER BY provider, model
`
type GetRuntimeUsageSummaryRow struct {
Provider string `json:"provider"`
Model string `json:"model"`
TotalInputTokens int64 `json:"total_input_tokens"`
TotalOutputTokens int64 `json:"total_output_tokens"`
TotalCacheReadTokens int64 `json:"total_cache_read_tokens"`
TotalCacheWriteTokens int64 `json:"total_cache_write_tokens"`
}
func (q *Queries) GetRuntimeUsageSummary(ctx context.Context, runtimeID pgtype.UUID) ([]GetRuntimeUsageSummaryRow, error) {
rows, err := q.db.Query(ctx, getRuntimeUsageSummary, runtimeID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []GetRuntimeUsageSummaryRow{}
for rows.Next() {
var i GetRuntimeUsageSummaryRow
if err := rows.Scan(
&i.Provider,
&i.Model,
&i.TotalInputTokens,
&i.TotalOutputTokens,
&i.TotalCacheReadTokens,
&i.TotalCacheWriteTokens,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}

View file

@ -0,0 +1,27 @@
-- name: UpsertRuntimeUsage :exec
INSERT INTO runtime_usage (runtime_id, date, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (runtime_id, date, provider, model)
DO UPDATE SET
input_tokens = EXCLUDED.input_tokens,
output_tokens = EXCLUDED.output_tokens,
cache_read_tokens = EXCLUDED.cache_read_tokens,
cache_write_tokens = EXCLUDED.cache_write_tokens,
updated_at = now();
-- name: ListRuntimeUsage :many
SELECT * FROM runtime_usage
WHERE runtime_id = $1
ORDER BY date DESC
LIMIT $2;
-- name: GetRuntimeUsageSummary :many
SELECT provider, model,
SUM(input_tokens)::bigint AS total_input_tokens,
SUM(output_tokens)::bigint AS total_output_tokens,
SUM(cache_read_tokens)::bigint AS total_cache_read_tokens,
SUM(cache_write_tokens)::bigint AS total_cache_write_tokens
FROM runtime_usage
WHERE runtime_id = $1
GROUP BY provider, model
ORDER BY provider, model;