feat(runtime): add local codex daemon pairing

This commit is contained in:
Jiayuan Zhang 2026-03-24 12:03:14 +08:00
parent c6960d39b9
commit cdfa63af15
36 changed files with 2579 additions and 309 deletions

View file

@ -10,6 +10,17 @@ DATABASE_URL=postgres://multica:multica@localhost:5432/multica?sslmode=disable
PORT=8080
JWT_SECRET=change-me-in-production
MULTICA_SERVER_URL=ws://localhost:8080/ws
MULTICA_APP_URL=http://localhost:3000
MULTICA_DAEMON_CONFIG=
MULTICA_WORKSPACE_ID=
MULTICA_DAEMON_ID=
MULTICA_DAEMON_DEVICE_NAME=
MULTICA_DAEMON_POLL_INTERVAL=3s
MULTICA_DAEMON_HEARTBEAT_INTERVAL=15s
MULTICA_CODEX_PATH=codex
MULTICA_CODEX_MODEL=
MULTICA_CODEX_WORKDIR=
MULTICA_CODEX_TIMEOUT=20m
# Google OAuth
GOOGLE_CLIENT_ID=

View file

@ -113,7 +113,7 @@ dev:
cd server && go run ./cmd/server
daemon:
cd server && go run ./cmd/daemon
cd server && MULTICA_CODEX_WORKDIR="${MULTICA_CODEX_WORKDIR:-$(abspath .)}" go run ./cmd/daemon
build:
cd server && go build -o bin/server ./cmd/server

View file

@ -106,6 +106,27 @@ See [`.env.example`](.env.example) for all available variables:
- `PORT` — Backend server port (default: 8080)
- `FRONTEND_PORT` / `FRONTEND_ORIGIN` — Frontend port and browser origin
- `JWT_SECRET` — JWT signing secret
- `MULTICA_APP_URL` — Browser origin used when generating local runtime pairing links
- `MULTICA_DAEMON_CONFIG` — Optional path for the daemon's persisted local config
- `MULTICA_WORKSPACE_ID` — Optional dev override for the workspace id; normal usage should rely on browser pairing instead
- `MULTICA_DAEMON_ID` / `MULTICA_DAEMON_DEVICE_NAME` — Stable daemon identity for local runtime registration
- `MULTICA_CODEX_PATH` / `MULTICA_CODEX_MODEL` — Codex executable and optional model override for local task execution
- `MULTICA_CODEX_WORKDIR` — Default working directory used by the local Codex runtime
- `GOOGLE_CLIENT_ID` / `GOOGLE_CLIENT_SECRET` — Google OAuth (optional)
- `NEXT_PUBLIC_API_URL` — Frontend → backend API URL
- `NEXT_PUBLIC_WS_URL` — Frontend → backend WebSocket URL
## Local Codex Daemon
The local daemon currently supports one local runtime type: `codex`.
1. Start the daemon with `make daemon`.
2. If the daemon does not already know its workspace, it prints a pairing link in the terminal.
3. Open that link in the browser, sign in, and choose the workspace that should own the local Codex runtime.
4. The daemon stores the approved workspace locally in `MULTICA_DAEMON_CONFIG` or `~/.multica/daemon.json`.
5. The daemon registers the local Codex runtime via `/api/daemon/register`.
6. Create an agent in Multica and bind it to that runtime.
7. Assign an issue to the agent and move the issue to `todo`.
8. The daemon claims the task, runs `codex exec`, and reports the final comment back to the issue.
For local development you can still set `MULTICA_WORKSPACE_ID` directly to skip pairing, but that should be treated as a debug shortcut rather than the normal flow.

View file

@ -6,6 +6,7 @@ import userEvent from "@testing-library/user-event";
vi.mock("next/navigation", () => ({
useRouter: () => ({ push: vi.fn() }),
usePathname: () => "/login",
useSearchParams: () => new URLSearchParams(),
}));
// Mock auth-context
@ -68,7 +69,7 @@ describe("LoginPage", () => {
await user.click(screen.getByRole("button", { name: "Sign in" }));
await waitFor(() => {
expect(mockLogin).toHaveBeenCalledWith("test@multica.ai", "Test User");
expect(mockLogin).toHaveBeenCalledWith("test@multica.ai", "Test User", undefined);
});
});
@ -81,7 +82,7 @@ describe("LoginPage", () => {
await user.click(screen.getByRole("button", { name: "Sign in" }));
await waitFor(() => {
expect(mockLogin).toHaveBeenCalledWith("test@multica.ai", undefined);
expect(mockLogin).toHaveBeenCalledWith("test@multica.ai", undefined, undefined);
});
});

View file

@ -1,10 +1,12 @@
"use client";
import { useState } from "react";
import { useSearchParams } from "next/navigation";
import { useAuth } from "../../../lib/auth-context";
export default function LoginPage() {
const { login, isLoading } = useAuth();
const searchParams = useSearchParams();
const [email, setEmail] = useState("");
const [name, setName] = useState("");
const [error, setError] = useState("");
@ -19,7 +21,7 @@ export default function LoginPage() {
setError("");
setSubmitting(true);
try {
await login(email, name || undefined);
await login(email, name || undefined, searchParams.get("next") || undefined);
} catch (err) {
setError("Login failed. Make sure the server is running.");
setSubmitting(false);

View file

@ -72,50 +72,8 @@ function generateId(): string {
return `${Date.now()}-${Math.random().toString(36).slice(2, 9)}`;
}
// ---------------------------------------------------------------------------
// Mock Runtime Devices (will be replaced with real daemon registration API)
// ---------------------------------------------------------------------------
const MOCK_RUNTIME_DEVICES: RuntimeDevice[] = [
{
id: "runtime-cloud",
name: "Multica Agent",
runtime_mode: "cloud",
status: "online",
device_info: "Cloud",
},
{
id: "runtime-macbook",
name: "Jiayuan's MacBook Pro",
runtime_mode: "local",
status: "online",
device_info: "macOS 15.4 · Claude Code v1.2",
},
{
id: "runtime-linux",
name: "Dev Server (gpu-01)",
runtime_mode: "local",
status: "online",
device_info: "Ubuntu 24.04 · Codex v0.8",
},
{
id: "runtime-ci",
name: "CI Runner",
runtime_mode: "local",
status: "offline",
device_info: "Linux · GitHub Actions",
},
];
function getRuntimeDevice(agent: Agent): RuntimeDevice | undefined {
const runtimeId = agent.runtime_config?.runtime_id as string | undefined;
if (runtimeId) {
return MOCK_RUNTIME_DEVICES.find((d) => d.id === runtimeId);
}
if (agent.runtime_mode === "cloud") {
return MOCK_RUNTIME_DEVICES.find((d) => d.runtime_mode === "cloud");
}
return undefined;
function getRuntimeDevice(agent: Agent, runtimes: RuntimeDevice[]): RuntimeDevice | undefined {
return runtimes.find((runtime) => runtime.id === agent.runtime_id);
}
// ---------------------------------------------------------------------------
@ -123,32 +81,36 @@ function getRuntimeDevice(agent: Agent): RuntimeDevice | undefined {
// ---------------------------------------------------------------------------
function CreateAgentDialog({
runtimes,
onClose,
onCreate,
}: {
runtimes: RuntimeDevice[];
onClose: () => void;
onCreate: (data: CreateAgentRequest) => Promise<void>;
}) {
const [name, setName] = useState("");
const [description, setDescription] = useState("");
const [selectedRuntimeId, setSelectedRuntimeId] = useState(MOCK_RUNTIME_DEVICES[0]!.id);
const [selectedRuntimeId, setSelectedRuntimeId] = useState(runtimes[0]?.id ?? "");
const [creating, setCreating] = useState(false);
const [runtimeOpen, setRuntimeOpen] = useState(false);
const selectedRuntime = MOCK_RUNTIME_DEVICES.find((d) => d.id === selectedRuntimeId)!;
useEffect(() => {
if (!selectedRuntimeId && runtimes[0]) {
setSelectedRuntimeId(runtimes[0].id);
}
}, [runtimes, selectedRuntimeId]);
const selectedRuntime = runtimes.find((d) => d.id === selectedRuntimeId) ?? null;
const handleSubmit = async () => {
if (!name.trim()) return;
if (!name.trim() || !selectedRuntime) return;
setCreating(true);
try {
await onCreate({
name: name.trim(),
description: description.trim(),
runtime_mode: selectedRuntime.runtime_mode,
runtime_config: {
runtime_id: selectedRuntime.id,
runtime_name: selectedRuntime.name,
},
triggers: [{ id: generateId(), type: "on_assign", enabled: true, config: {} }],
});
onClose();
@ -205,23 +167,28 @@ function CreateAgentDialog({
<button
type="button"
onClick={() => setRuntimeOpen(!runtimeOpen)}
disabled={runtimes.length === 0}
className="flex w-full items-center gap-3 rounded-md border bg-background px-3 py-2.5 text-left text-sm transition-colors hover:bg-accent/50 focus:outline-none focus:ring-2 focus:ring-ring"
>
{selectedRuntime.runtime_mode === "cloud" ? (
{selectedRuntime?.runtime_mode === "cloud" ? (
<Cloud className="h-4 w-4 shrink-0 text-muted-foreground" />
) : (
<Monitor className="h-4 w-4 shrink-0 text-muted-foreground" />
)}
<div className="min-w-0 flex-1">
<div className="flex items-center gap-2">
<span className="truncate font-medium">{selectedRuntime.name}</span>
{selectedRuntime.runtime_mode === "cloud" && (
<span className="truncate font-medium">
{selectedRuntime?.name ?? "No runtime available"}
</span>
{selectedRuntime?.runtime_mode === "cloud" && (
<span className="shrink-0 rounded bg-blue-500/10 px-1.5 py-0.5 text-[10px] font-medium text-blue-600">
Cloud
</span>
)}
</div>
<div className="text-xs text-muted-foreground">{selectedRuntime.device_info}</div>
<div className="text-xs text-muted-foreground">
{selectedRuntime?.device_info ?? "Register a runtime before creating an agent"}
</div>
</div>
<ChevronDown className={`h-4 w-4 shrink-0 text-muted-foreground transition-transform ${runtimeOpen ? "rotate-180" : ""}`} />
</button>
@ -230,7 +197,7 @@ function CreateAgentDialog({
<>
<div className="fixed inset-0 z-40" onClick={() => setRuntimeOpen(false)} />
<div className="absolute left-0 right-0 top-full z-50 mt-1 max-h-60 overflow-y-auto rounded-lg border bg-popover p-1 shadow-md">
{MOCK_RUNTIME_DEVICES.map((device) => (
{runtimes.map((device) => (
<button
key={device.id}
onClick={() => {
@ -280,7 +247,7 @@ function CreateAgentDialog({
</button>
<button
onClick={handleSubmit}
disabled={creating || !name.trim()}
disabled={creating || !name.trim() || !selectedRuntime}
className="rounded-md bg-primary px-3 py-1.5 text-sm font-medium text-primary-foreground hover:bg-primary/90 disabled:opacity-50"
>
{creating ? "Creating..." : "Create"}
@ -899,15 +866,17 @@ const detailTabs: { id: DetailTab; label: string; icon: typeof FileText }[] = [
function AgentDetail({
agent,
runtimes,
onUpdate,
onDelete,
}: {
agent: Agent;
runtimes: RuntimeDevice[];
onUpdate: (id: string, data: Partial<Agent>) => Promise<void>;
onDelete: (id: string) => Promise<void>;
}) {
const st = statusConfig[agent.status];
const runtimeDevice = getRuntimeDevice(agent);
const runtimeDevice = getRuntimeDevice(agent, runtimes);
const [activeTab, setActiveTab] = useState<DetailTab>("skills");
const [showMenu, setShowMenu] = useState(false);
const [confirmDelete, setConfirmDelete] = useState(false);
@ -1055,9 +1024,21 @@ function AgentDetail({
// ---------------------------------------------------------------------------
export default function AgentsPage() {
const { agents, refreshAgents, isLoading } = useAuth();
const { agents, refreshAgents, workspace, isLoading } = useAuth();
const [selectedId, setSelectedId] = useState<string>("");
const [showCreate, setShowCreate] = useState(false);
const [runtimes, setRuntimes] = useState<RuntimeDevice[]>([]);
useEffect(() => {
if (!workspace) {
setRuntimes([]);
return;
}
api
.listRuntimes({ workspace_id: workspace.id })
.then(setRuntimes)
.catch(() => setRuntimes([]));
}, [workspace]);
// Select first agent on initial load
useEffect(() => {
@ -1147,6 +1128,7 @@ export default function AgentsPage() {
{selected ? (
<AgentDetail
agent={selected}
runtimes={runtimes}
onUpdate={handleUpdate}
onDelete={handleDelete}
/>
@ -1167,6 +1149,7 @@ export default function AgentsPage() {
{showCreate && (
<CreateAgentDialog
runtimes={runtimes}
onClose={() => setShowCreate(false)}
onCreate={handleCreate}
/>

View file

@ -260,7 +260,7 @@ export default function SettingsPage() {
Name
</label>
<input
type="text"
type="search"
value={profileName}
onChange={(e) => setProfileName(e.target.value)}
className="mt-1 w-full rounded-md border bg-background px-3 py-2 text-sm focus:outline-none focus:ring-2 focus:ring-ring"
@ -291,7 +291,7 @@ export default function SettingsPage() {
className="flex items-center gap-1.5 rounded-md bg-primary px-3 py-1.5 text-xs font-medium text-primary-foreground hover:bg-primary/90 disabled:opacity-50"
>
<Save className="h-3 w-3" />
{profileSaving ? "Saving..." : "Save"}
{profileSaving ? "Updating..." : "Update Profile"}
</button>
</div>
</div>

View file

@ -0,0 +1,155 @@
"use client";
import Link from "next/link";
import { useEffect, useMemo, useState } from "react";
import { useSearchParams } from "next/navigation";
import type { DaemonPairingSession } from "@multica/types";
import { api } from "../../../lib/api";
import { useAuth } from "../../../lib/auth-context";
function formatExpiresAt(value: string) {
return new Date(value).toLocaleString("en-US", {
month: "short",
day: "numeric",
hour: "numeric",
minute: "2-digit",
});
}
export default function LocalDaemonPairPage() {
const searchParams = useSearchParams();
const token = searchParams.get("token") ?? "";
const { user, workspaces, workspace, isLoading } = useAuth();
const [session, setSession] = useState<DaemonPairingSession | null>(null);
const [selectedWorkspaceId, setSelectedWorkspaceId] = useState("");
const [loading, setLoading] = useState(true);
const [submitting, setSubmitting] = useState(false);
const [error, setError] = useState("");
const nextLoginURL = useMemo(() => {
const next = `/pair/local?token=${encodeURIComponent(token)}`;
return `/login?next=${encodeURIComponent(next)}`;
}, [token]);
useEffect(() => {
if (!token) {
setError("Missing pairing token.");
setLoading(false);
return;
}
setLoading(true);
api.getDaemonPairingSession(token)
.then((value) => {
setSession(value);
setSelectedWorkspaceId(value.workspace_id || workspace?.id || workspaces[0]?.id || "");
})
.catch((err) => setError(err instanceof Error ? err.message : "Failed to load pairing session."))
.finally(() => setLoading(false));
}, [token, workspace?.id, workspaces]);
const approve = async () => {
if (!token || !selectedWorkspaceId) return;
setSubmitting(true);
setError("");
try {
const approved = await api.approveDaemonPairingSession(token, {
workspace_id: selectedWorkspaceId,
});
setSession(approved);
} catch (err) {
setError(err instanceof Error ? err.message : "Failed to approve pairing session.");
} finally {
setSubmitting(false);
}
};
return (
<div className="flex min-h-screen items-center justify-center bg-canvas px-6 py-12">
<div className="w-full max-w-xl rounded-2xl border bg-background p-8 shadow-sm">
<div>
<h1 className="text-2xl font-semibold">Connect Local Codex Runtime</h1>
<p className="mt-2 text-sm text-muted-foreground">
Approve this pairing request to register your local Codex runtime with a workspace.
</p>
</div>
{loading || isLoading ? (
<div className="mt-8 text-sm text-muted-foreground">Loading pairing session...</div>
) : error ? (
<div className="mt-8 rounded-lg border border-red-200 bg-red-50 px-4 py-3 text-sm text-red-700">
{error}
</div>
) : session ? (
<>
<div className="mt-6 rounded-xl border bg-muted/30 p-4">
<div className="text-sm font-medium">{session.runtime_name}</div>
<div className="mt-1 text-sm text-muted-foreground">
{session.device_name}
{session.runtime_version ? ` · ${session.runtime_version}` : ""}
</div>
<div className="mt-1 text-xs uppercase tracking-wide text-muted-foreground">
{session.runtime_type}
</div>
<div className="mt-3 text-xs text-muted-foreground">
Expires {formatExpiresAt(session.expires_at)}
</div>
</div>
{!user ? (
<div className="mt-6 space-y-3">
<p className="text-sm text-muted-foreground">
Sign in first, then choose which workspace should own this local runtime.
</p>
<Link
href={nextLoginURL}
className="inline-flex rounded-md bg-primary px-4 py-2 text-sm font-medium text-primary-foreground hover:bg-primary/90"
>
Sign in to continue
</Link>
</div>
) : session.status === "approved" || session.status === "claimed" ? (
<div className="mt-6 rounded-xl border border-emerald-200 bg-emerald-50 px-4 py-3 text-sm text-emerald-800">
This runtime is linked to a workspace. Return to the daemon window to finish setup.
</div>
) : session.status === "expired" ? (
<div className="mt-6 rounded-xl border border-amber-200 bg-amber-50 px-4 py-3 text-sm text-amber-800">
This pairing link expired. Restart the daemon to generate a new link.
</div>
) : workspaces.length === 0 ? (
<div className="mt-6 rounded-xl border px-4 py-3 text-sm text-muted-foreground">
You do not have a workspace yet. Create one first, then reopen this pairing link.
</div>
) : (
<div className="mt-6 space-y-4">
<div>
<label className="mb-2 block text-sm font-medium">Workspace</label>
<select
value={selectedWorkspaceId}
onChange={(e) => setSelectedWorkspaceId(e.target.value)}
className="w-full rounded-md border bg-background px-3 py-2 text-sm"
>
{workspaces.map((item) => (
<option key={item.id} value={item.id}>
{item.name}
</option>
))}
</select>
</div>
<button
type="button"
onClick={approve}
disabled={submitting || !selectedWorkspaceId}
className="inline-flex rounded-md bg-primary px-4 py-2 text-sm font-medium text-primary-foreground hover:bg-primary/90 disabled:opacity-50"
>
{submitting ? "Registering..." : "Register runtime"}
</button>
</div>
)}
</>
) : null}
</div>
</div>
);
}

View file

@ -76,6 +76,7 @@ const mockAgents: Agent[] = [
{
id: "agent-1",
workspace_id: "ws-1",
runtime_id: "runtime-1",
name: "Claude",
description: "",
avatar_url: null,

View file

@ -19,7 +19,7 @@ interface AuthContextValue {
members: MemberWithUser[];
agents: Agent[];
isLoading: boolean;
login: (email: string, name?: string) => Promise<void>;
login: (email: string, name?: string, redirectTo?: string) => Promise<void>;
logout: () => void;
switchWorkspace: (workspaceId: string) => Promise<void>;
createWorkspace: (data: { name: string; slug: string; description?: string }) => Promise<Workspace>;
@ -121,7 +121,7 @@ export function AuthProvider({ children }: { children: ReactNode }) {
})();
}, [hydrateWorkspace]);
const login = useCallback(async (email: string, name?: string) => {
const login = useCallback(async (email: string, name?: string, redirectTo?: string) => {
const { token, user: u } = await api.login(email, name);
api.setToken(token);
localStorage.setItem("multica_token", token);
@ -130,7 +130,7 @@ export function AuthProvider({ children }: { children: ReactNode }) {
const wsList = await api.listWorkspaces();
await hydrateWorkspace(wsList);
router.push("/issues");
router.push(redirectTo || "/issues");
}, [hydrateWorkspace, router]);
const logout = useCallback(() => {

View file

@ -43,6 +43,7 @@ export const mockAgents: Agent[] = [
{
id: "agent-1",
workspace_id: "ws-1",
runtime_id: "runtime-1",
name: "Claude Agent",
description: "",
avatar_url: null,

View file

@ -11,6 +11,9 @@ import type {
CreateAgentRequest,
UpdateAgentRequest,
AgentTask,
AgentRuntime,
DaemonPairingSession,
ApproveDaemonPairingSessionRequest,
InboxItem,
Comment,
Workspace,
@ -187,10 +190,31 @@ export class ApiClient {
await this.fetch(`/api/agents/${id}`, { method: "DELETE" });
}
async listRuntimes(params?: { workspace_id?: string }): Promise<AgentRuntime[]> {
const search = new URLSearchParams();
const wsId = params?.workspace_id ?? this.workspaceId;
if (wsId) search.set("workspace_id", wsId);
return this.fetch(`/api/runtimes?${search}`);
}
async listAgentTasks(agentId: string): Promise<AgentTask[]> {
return this.fetch(`/api/agents/${agentId}/tasks`);
}
async getDaemonPairingSession(token: string): Promise<DaemonPairingSession> {
return this.fetch(`/api/daemon/pairing-sessions/${token}`);
}
async approveDaemonPairingSession(
token: string,
data: ApproveDaemonPairingSessionRequest,
): Promise<DaemonPairingSession> {
return this.fetch(`/api/daemon/pairing-sessions/${token}/approve`, {
method: "POST",
body: JSON.stringify(data),
});
}
// Inbox
async listInbox(): Promise<InboxItem[]> {
return this.fetch("/api/inbox");

View file

@ -8,12 +8,21 @@ export type AgentTriggerType = "on_assign" | "scheduled";
export interface RuntimeDevice {
id: string;
workspace_id: string;
daemon_id: string | null;
name: string;
runtime_mode: AgentRuntimeMode;
provider: string;
status: "online" | "offline";
device_info: string;
metadata: Record<string, unknown>;
last_seen_at: string | null;
created_at: string;
updated_at: string;
}
export type AgentRuntime = RuntimeDevice;
export interface AgentTool {
id: string;
name: string;
@ -33,6 +42,7 @@ export interface AgentTrigger {
export interface AgentTask {
id: string;
agent_id: string;
runtime_id: string;
issue_id: string;
status: "queued" | "dispatched" | "running" | "completed" | "failed" | "cancelled";
priority: number;
@ -47,6 +57,7 @@ export interface AgentTask {
export interface Agent {
id: string;
workspace_id: string;
runtime_id: string;
name: string;
description: string;
avatar_url: string | null;
@ -67,7 +78,7 @@ export interface CreateAgentRequest {
name: string;
description?: string;
avatar_url?: string;
runtime_mode?: AgentRuntimeMode;
runtime_id: string;
runtime_config?: Record<string, unknown>;
visibility?: AgentVisibility;
max_concurrent_tasks?: number;
@ -80,6 +91,7 @@ export interface UpdateAgentRequest {
name?: string;
description?: string;
avatar_url?: string;
runtime_id?: string;
runtime_config?: Record<string, unknown>;
visibility?: AgentVisibility;
status?: AgentStatus;

View file

@ -0,0 +1,22 @@
export type DaemonPairingSessionStatus = "pending" | "approved" | "claimed" | "expired";
export interface DaemonPairingSession {
token: string;
daemon_id: string;
device_name: string;
runtime_name: string;
runtime_type: string;
runtime_version: string;
workspace_id: string | null;
status: DaemonPairingSessionStatus;
approved_at: string | null;
claimed_at: string | null;
expires_at: string;
created_at: string;
updated_at: string;
link_url?: string | null;
}
export interface ApproveDaemonPairingSessionRequest {
workspace_id: string;
}

View file

@ -8,6 +8,7 @@ export type {
AgentTool,
AgentTrigger,
AgentTask,
AgentRuntime,
RuntimeDevice,
CreateAgentRequest,
UpdateAgentRequest,
@ -15,5 +16,6 @@ export type {
export type { Workspace, Member, MemberRole, User, MemberWithUser } from "./workspace.js";
export type { InboxItem, InboxSeverity, InboxItemType } from "./inbox.js";
export type { Comment, CommentType, CommentAuthorType } from "./comment.js";
export type { DaemonPairingSession, DaemonPairingSessionStatus, ApproveDaemonPairingSessionRequest } from "./daemon.js";
export type * from "./events.js";
export type * from "./api.js";

826
server/cmd/daemon/daemon.go Normal file
View file

@ -0,0 +1,826 @@
package main
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
)
const (
defaultServerURL = "ws://localhost:8080/ws"
defaultDaemonConfigPath = ".multica/daemon.json"
defaultPollInterval = 3 * time.Second
defaultHeartbeatInterval = 15 * time.Second
defaultCodexTimeout = 20 * time.Minute
defaultRuntimeName = "Local Codex"
defaultCodexPath = "codex"
)
type config struct {
ServerBaseURL string
ConfigPath string
WorkspaceID string
DaemonID string
DeviceName string
RuntimeName string
CodexPath string
CodexModel string
DefaultWorkdir string
PollInterval time.Duration
HeartbeatInterval time.Duration
CodexTimeout time.Duration
}
type daemon struct {
cfg config
client *daemonClient
logger *log.Logger
}
type daemonClient struct {
baseURL string
client *http.Client
}
type daemonRuntime struct {
ID string `json:"id"`
Name string `json:"name"`
Provider string `json:"provider"`
Status string `json:"status"`
}
type daemonPairingSession struct {
Token string `json:"token"`
DaemonID string `json:"daemon_id"`
DeviceName string `json:"device_name"`
RuntimeName string `json:"runtime_name"`
RuntimeType string `json:"runtime_type"`
RuntimeVersion string `json:"runtime_version"`
WorkspaceID *string `json:"workspace_id"`
Status string `json:"status"`
ApprovedAt *string `json:"approved_at"`
ClaimedAt *string `json:"claimed_at"`
ExpiresAt string `json:"expires_at"`
LinkURL *string `json:"link_url"`
}
type daemonPersistedConfig struct {
WorkspaceID string `json:"workspace_id"`
}
type daemonTask struct {
ID string `json:"id"`
AgentID string `json:"agent_id"`
IssueID string `json:"issue_id"`
Context daemonTaskContext `json:"context"`
}
type daemonTaskContext struct {
Issue daemonIssueContext `json:"issue"`
Agent daemonAgentContext `json:"agent"`
Runtime daemonRuntimeContext `json:"runtime"`
}
type daemonIssueContext struct {
ID string `json:"id"`
Title string `json:"title"`
Description string `json:"description"`
AcceptanceCriteria []string `json:"acceptance_criteria"`
ContextRefs []string `json:"context_refs"`
Repository *daemonRepoRef `json:"repository"`
}
type daemonAgentContext struct {
ID string `json:"id"`
Name string `json:"name"`
Skills string `json:"skills"`
}
type daemonRuntimeContext struct {
ID string `json:"id"`
Name string `json:"name"`
Provider string `json:"provider"`
DeviceInfo string `json:"device_info"`
}
type daemonRepoRef struct {
URL string `json:"url"`
Branch string `json:"branch"`
Path string `json:"path"`
}
type codexTaskResult struct {
Status string `json:"status"`
Comment string `json:"comment"`
}
func loadConfig() (config, error) {
serverBaseURL, err := normalizeServerBaseURL(envOrDefault("MULTICA_SERVER_URL", defaultServerURL))
if err != nil {
return config{}, err
}
configPath, err := resolveDaemonConfigPath(strings.TrimSpace(os.Getenv("MULTICA_DAEMON_CONFIG")))
if err != nil {
return config{}, err
}
persisted, err := loadPersistedDaemonConfig(configPath)
if err != nil {
return config{}, err
}
workspaceID := strings.TrimSpace(os.Getenv("MULTICA_WORKSPACE_ID"))
if workspaceID == "" {
workspaceID = persisted.WorkspaceID
}
codexPath := envOrDefault("MULTICA_CODEX_PATH", defaultCodexPath)
if _, err := exec.LookPath(codexPath); err != nil {
return config{}, fmt.Errorf("codex executable not found at %q: %w", codexPath, err)
}
host, err := os.Hostname()
if err != nil || strings.TrimSpace(host) == "" {
host = "local-machine"
}
defaultWorkdir := strings.TrimSpace(os.Getenv("MULTICA_CODEX_WORKDIR"))
if defaultWorkdir == "" {
defaultWorkdir, err = os.Getwd()
if err != nil {
return config{}, fmt.Errorf("resolve working directory: %w", err)
}
}
defaultWorkdir, err = filepath.Abs(defaultWorkdir)
if err != nil {
return config{}, fmt.Errorf("resolve absolute workdir: %w", err)
}
pollInterval, err := durationFromEnv("MULTICA_DAEMON_POLL_INTERVAL", defaultPollInterval)
if err != nil {
return config{}, err
}
heartbeatInterval, err := durationFromEnv("MULTICA_DAEMON_HEARTBEAT_INTERVAL", defaultHeartbeatInterval)
if err != nil {
return config{}, err
}
codexTimeout, err := durationFromEnv("MULTICA_CODEX_TIMEOUT", defaultCodexTimeout)
if err != nil {
return config{}, err
}
return config{
ServerBaseURL: serverBaseURL,
ConfigPath: configPath,
WorkspaceID: workspaceID,
DaemonID: envOrDefault("MULTICA_DAEMON_ID", host),
DeviceName: envOrDefault("MULTICA_DAEMON_DEVICE_NAME", host),
RuntimeName: envOrDefault("MULTICA_CODEX_RUNTIME_NAME", defaultRuntimeName),
CodexPath: codexPath,
CodexModel: strings.TrimSpace(os.Getenv("MULTICA_CODEX_MODEL")),
DefaultWorkdir: defaultWorkdir,
PollInterval: pollInterval,
HeartbeatInterval: heartbeatInterval,
CodexTimeout: codexTimeout,
}, nil
}
func newDaemon(cfg config, logger *log.Logger) *daemon {
return &daemon{
cfg: cfg,
client: &daemonClient{baseURL: cfg.ServerBaseURL, client: &http.Client{Timeout: 30 * time.Second}},
logger: logger,
}
}
func (d *daemon) run(ctx context.Context) error {
d.logger.Printf("starting daemon for workspace=%s server=%s runtime=%s workdir=%s",
d.cfg.WorkspaceID, d.cfg.ServerBaseURL, d.cfg.RuntimeName, d.cfg.DefaultWorkdir)
if strings.TrimSpace(d.cfg.WorkspaceID) == "" {
workspaceID, err := d.ensurePaired(ctx)
if err != nil {
return err
}
d.cfg.WorkspaceID = workspaceID
d.logger.Printf("pairing completed for workspace=%s", workspaceID)
}
runtime, err := d.registerRuntime(ctx)
if err != nil {
return err
}
d.logger.Printf("registered runtime id=%s provider=%s status=%s", runtime.ID, runtime.Provider, runtime.Status)
go d.heartbeatLoop(ctx, runtime.ID)
return d.pollLoop(ctx, runtime.ID)
}
func (d *daemon) registerRuntime(ctx context.Context) (daemonRuntime, error) {
version, err := detectCodexVersion(ctx, d.cfg.CodexPath)
if err != nil {
return daemonRuntime{}, err
}
req := map[string]any{
"workspace_id": d.cfg.WorkspaceID,
"daemon_id": d.cfg.DaemonID,
"device_name": d.cfg.DeviceName,
"runtimes": []map[string]string{
{
"name": d.cfg.RuntimeName,
"type": "codex",
"version": version,
"status": "online",
},
},
}
var resp struct {
Runtimes []daemonRuntime `json:"runtimes"`
}
if err := d.client.postJSON(ctx, "/api/daemon/register", req, &resp); err != nil {
return daemonRuntime{}, fmt.Errorf("register runtime: %w", err)
}
if len(resp.Runtimes) == 0 {
return daemonRuntime{}, fmt.Errorf("register runtime: empty response")
}
return resp.Runtimes[0], nil
}
func (d *daemon) ensurePaired(ctx context.Context) (string, error) {
version, err := detectCodexVersion(ctx, d.cfg.CodexPath)
if err != nil {
return "", err
}
session, err := d.client.createPairingSession(ctx, map[string]string{
"daemon_id": d.cfg.DaemonID,
"device_name": d.cfg.DeviceName,
"runtime_name": d.cfg.RuntimeName,
"runtime_type": "codex",
"runtime_version": version,
})
if err != nil {
return "", fmt.Errorf("create pairing session: %w", err)
}
if session.LinkURL != nil {
d.logger.Printf("open this link to pair the local Codex runtime: %s", *session.LinkURL)
} else {
d.logger.Printf("pairing session created: %s", session.Token)
}
for {
select {
case <-ctx.Done():
return "", ctx.Err()
default:
}
current, err := d.client.getPairingSession(ctx, session.Token)
if err != nil {
return "", fmt.Errorf("poll pairing session: %w", err)
}
switch current.Status {
case "approved", "claimed":
if current.WorkspaceID == nil || strings.TrimSpace(*current.WorkspaceID) == "" {
return "", fmt.Errorf("pairing session approved without workspace")
}
if err := savePersistedDaemonConfig(d.cfg.ConfigPath, daemonPersistedConfig{
WorkspaceID: strings.TrimSpace(*current.WorkspaceID),
}); err != nil {
return "", err
}
if current.Status != "claimed" {
if _, err := d.client.claimPairingSession(ctx, current.Token); err != nil {
return "", fmt.Errorf("claim pairing session: %w", err)
}
}
return strings.TrimSpace(*current.WorkspaceID), nil
case "expired":
return "", fmt.Errorf("pairing session expired before approval")
}
if err := sleepWithContext(ctx, d.cfg.PollInterval); err != nil {
return "", err
}
}
}
func (d *daemon) heartbeatLoop(ctx context.Context, runtimeID string) {
ticker := time.NewTicker(d.cfg.HeartbeatInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
err := d.client.postJSON(ctx, "/api/daemon/heartbeat", map[string]string{
"runtime_id": runtimeID,
}, nil)
if err != nil {
d.logger.Printf("heartbeat failed: %v", err)
}
}
}
}
func (d *daemon) pollLoop(ctx context.Context, runtimeID string) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
task, err := d.client.claimTask(ctx, runtimeID)
if err != nil {
d.logger.Printf("claim task failed: %v", err)
if err := sleepWithContext(ctx, d.cfg.PollInterval); err != nil {
return err
}
continue
}
if task == nil {
if err := sleepWithContext(ctx, d.cfg.PollInterval); err != nil {
return err
}
continue
}
d.handleTask(ctx, *task)
}
}
func (d *daemon) handleTask(ctx context.Context, task daemonTask) {
d.logger.Printf("picked task=%s issue=%s title=%q", task.ID, task.IssueID, task.Context.Issue.Title)
if err := d.client.startTask(ctx, task.ID); err != nil {
d.logger.Printf("start task %s failed: %v", task.ID, err)
return
}
_ = d.client.reportProgress(ctx, task.ID, "Launching Codex", 1, 2)
result, err := d.runTask(ctx, task)
if err != nil {
d.logger.Printf("task %s failed: %v", task.ID, err)
if failErr := d.client.failTask(ctx, task.ID, err.Error()); failErr != nil {
d.logger.Printf("fail task %s callback failed: %v", task.ID, failErr)
}
return
}
_ = d.client.reportProgress(ctx, task.ID, "Finishing task", 2, 2)
switch result.Status {
case "blocked":
if err := d.client.failTask(ctx, task.ID, result.Comment); err != nil {
d.logger.Printf("report blocked task %s failed: %v", task.ID, err)
}
default:
if err := d.client.completeTask(ctx, task.ID, result.Comment); err != nil {
d.logger.Printf("complete task %s failed: %v", task.ID, err)
}
}
}
func (d *daemon) runTask(ctx context.Context, task daemonTask) (codexTaskResult, error) {
workdir, err := resolveTaskWorkdir(d.cfg.DefaultWorkdir, task.Context.Issue.Repository)
if err != nil {
return codexTaskResult{}, err
}
prompt := buildCodexPrompt(task, workdir)
runCtx, cancel := context.WithTimeout(ctx, d.cfg.CodexTimeout)
defer cancel()
model := d.cfg.CodexModel
if model == "" {
model = "default"
}
startedAt := time.Now()
d.logger.Printf(
"starting codex exec task=%s workdir=%s model=%s timeout=%s",
task.ID,
workdir,
model,
d.cfg.CodexTimeout,
)
result, err := runCodexExec(runCtx, d.cfg, workdir, prompt)
if err != nil {
d.logger.Printf(
"codex exec failed task=%s duration=%s err=%v",
task.ID,
time.Since(startedAt).Round(time.Millisecond),
err,
)
if errors.Is(runCtx.Err(), context.DeadlineExceeded) {
return codexTaskResult{}, fmt.Errorf("Codex timed out after %s", d.cfg.CodexTimeout)
}
return codexTaskResult{}, err
}
d.logger.Printf(
"codex exec finished task=%s duration=%s status=%s",
task.ID,
time.Since(startedAt).Round(time.Millisecond),
result.Status,
)
return result, nil
}
func runCodexExec(ctx context.Context, cfg config, workdir, prompt string) (codexTaskResult, error) {
outputFile, err := os.CreateTemp("", "multica-codex-output-*.json")
if err != nil {
return codexTaskResult{}, fmt.Errorf("create codex output file: %w", err)
}
outputPath := outputFile.Name()
outputFile.Close()
defer os.Remove(outputPath)
schemaFile, err := os.CreateTemp("", "multica-codex-schema-*.json")
if err != nil {
return codexTaskResult{}, fmt.Errorf("create schema file: %w", err)
}
schemaPath := schemaFile.Name()
if _, err := schemaFile.WriteString(codexResultSchema); err != nil {
schemaFile.Close()
return codexTaskResult{}, fmt.Errorf("write schema file: %w", err)
}
schemaFile.Close()
defer os.Remove(schemaPath)
args := []string{
"-a", "never",
"exec",
"--skip-git-repo-check",
"--sandbox", "workspace-write",
"-C", workdir,
"--output-schema", schemaPath,
"-o", outputPath,
prompt,
}
if cfg.CodexModel != "" {
args = append([]string{"-m", cfg.CodexModel}, args...)
}
cmd := exec.CommandContext(ctx, cfg.CodexPath, args...)
var output bytes.Buffer
cmd.Stdout = &output
cmd.Stderr = &output
if err := cmd.Run(); err != nil {
return codexTaskResult{}, fmt.Errorf("codex exec failed: %w\n%s", err, strings.TrimSpace(output.String()))
}
data, err := os.ReadFile(outputPath)
if err != nil {
return codexTaskResult{}, fmt.Errorf("read codex result: %w", err)
}
var result codexTaskResult
if err := json.Unmarshal(data, &result); err != nil {
return codexTaskResult{}, fmt.Errorf("parse codex result: %w", err)
}
if result.Comment == "" {
return codexTaskResult{}, fmt.Errorf("codex returned empty comment")
}
if result.Status == "" {
result.Status = "completed"
}
return result, nil
}
func buildCodexPrompt(task daemonTask, workdir string) string {
var b strings.Builder
b.WriteString("You are running as the local Codex runtime for a Multica agent.\n")
b.WriteString("Complete the assigned issue using the local environment.\n")
b.WriteString("Return a concise Markdown comment suitable for posting back to the issue.\n")
b.WriteString("If you cannot complete the task because context, files, or permissions are missing, return status \"blocked\" and explain the blocker in the comment.\n\n")
fmt.Fprintf(&b, "Working directory: %s\n", workdir)
fmt.Fprintf(&b, "Agent: %s\n", task.Context.Agent.Name)
fmt.Fprintf(&b, "Issue title: %s\n\n", task.Context.Issue.Title)
if task.Context.Issue.Description != "" {
b.WriteString("Issue description:\n")
b.WriteString(task.Context.Issue.Description)
b.WriteString("\n\n")
}
if len(task.Context.Issue.AcceptanceCriteria) > 0 {
b.WriteString("Acceptance criteria:\n")
for _, item := range task.Context.Issue.AcceptanceCriteria {
fmt.Fprintf(&b, "- %s\n", item)
}
b.WriteString("\n")
}
if len(task.Context.Issue.ContextRefs) > 0 {
b.WriteString("Context refs:\n")
for _, item := range task.Context.Issue.ContextRefs {
fmt.Fprintf(&b, "- %s\n", item)
}
b.WriteString("\n")
}
if repo := task.Context.Issue.Repository; repo != nil {
b.WriteString("Repository context:\n")
if repo.URL != "" {
fmt.Fprintf(&b, "- url: %s\n", repo.URL)
}
if repo.Branch != "" {
fmt.Fprintf(&b, "- branch: %s\n", repo.Branch)
}
if repo.Path != "" {
fmt.Fprintf(&b, "- path: %s\n", repo.Path)
}
b.WriteString("\n")
}
if task.Context.Agent.Skills != "" {
b.WriteString("Agent skills/instructions:\n")
b.WriteString(task.Context.Agent.Skills)
b.WriteString("\n\n")
}
b.WriteString("Comment requirements:\n")
b.WriteString("- Lead with the outcome.\n")
b.WriteString("- Mention concrete files or commands if you changed anything.\n")
b.WriteString("- Mention blockers or follow-up actions if relevant.\n")
return b.String()
}
func resolveTaskWorkdir(defaultWorkdir string, repo *daemonRepoRef) (string, error) {
base := defaultWorkdir
if repo == nil || strings.TrimSpace(repo.Path) == "" {
return base, nil
}
path := strings.TrimSpace(repo.Path)
if !filepath.IsAbs(path) {
path = filepath.Join(base, path)
}
path = filepath.Clean(path)
info, err := os.Stat(path)
if err != nil {
return "", fmt.Errorf("repository path not found: %s", path)
}
if !info.IsDir() {
return "", fmt.Errorf("repository path is not a directory: %s", path)
}
return path, nil
}
func detectCodexVersion(ctx context.Context, codexPath string) (string, error) {
cmd := exec.CommandContext(ctx, codexPath, "--version")
data, err := cmd.Output()
if err != nil {
return "", fmt.Errorf("detect codex version: %w", err)
}
return strings.TrimSpace(string(data)), nil
}
func resolveDaemonConfigPath(raw string) (string, error) {
if raw != "" {
return filepath.Abs(raw)
}
home, err := os.UserHomeDir()
if err != nil {
return "", fmt.Errorf("resolve daemon config path: %w", err)
}
return filepath.Join(home, defaultDaemonConfigPath), nil
}
func loadPersistedDaemonConfig(path string) (daemonPersistedConfig, error) {
data, err := os.ReadFile(path)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return daemonPersistedConfig{}, nil
}
return daemonPersistedConfig{}, fmt.Errorf("read daemon config: %w", err)
}
var cfg daemonPersistedConfig
if err := json.Unmarshal(data, &cfg); err != nil {
return daemonPersistedConfig{}, fmt.Errorf("parse daemon config: %w", err)
}
return cfg, nil
}
func savePersistedDaemonConfig(path string, cfg daemonPersistedConfig) error {
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
return fmt.Errorf("create daemon config directory: %w", err)
}
data, err := json.MarshalIndent(cfg, "", " ")
if err != nil {
return fmt.Errorf("encode daemon config: %w", err)
}
if err := os.WriteFile(path, append(data, '\n'), 0o600); err != nil {
return fmt.Errorf("write daemon config: %w", err)
}
return nil
}
func normalizeServerBaseURL(raw string) (string, error) {
u, err := url.Parse(strings.TrimSpace(raw))
if err != nil {
return "", fmt.Errorf("invalid MULTICA_SERVER_URL: %w", err)
}
switch u.Scheme {
case "ws":
u.Scheme = "http"
case "wss":
u.Scheme = "https"
case "http", "https":
default:
return "", fmt.Errorf("MULTICA_SERVER_URL must use ws, wss, http, or https")
}
if u.Path == "/ws" {
u.Path = ""
}
u.RawPath = ""
u.RawQuery = ""
u.Fragment = ""
return strings.TrimRight(u.String(), "/"), nil
}
func durationFromEnv(key string, fallback time.Duration) (time.Duration, error) {
value := strings.TrimSpace(os.Getenv(key))
if value == "" {
return fallback, nil
}
d, err := time.ParseDuration(value)
if err != nil {
return 0, fmt.Errorf("%s: invalid duration %q: %w", key, value, err)
}
return d, nil
}
func envOrDefault(key, fallback string) string {
value := strings.TrimSpace(os.Getenv(key))
if value == "" {
return fallback
}
return value
}
func sleepWithContext(ctx context.Context, d time.Duration) error {
timer := time.NewTimer(d)
defer timer.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
return nil
}
}
func (c *daemonClient) claimTask(ctx context.Context, runtimeID string) (*daemonTask, error) {
var resp struct {
Task *daemonTask `json:"task"`
}
if err := c.postJSON(ctx, fmt.Sprintf("/api/daemon/runtimes/%s/tasks/claim", runtimeID), map[string]any{}, &resp); err != nil {
return nil, err
}
return resp.Task, nil
}
func (c *daemonClient) createPairingSession(ctx context.Context, req map[string]string) (daemonPairingSession, error) {
var resp daemonPairingSession
if err := c.postJSON(ctx, "/api/daemon/pairing-sessions", req, &resp); err != nil {
return daemonPairingSession{}, err
}
return resp, nil
}
func (c *daemonClient) getPairingSession(ctx context.Context, token string) (daemonPairingSession, error) {
var resp daemonPairingSession
if err := c.getJSON(ctx, fmt.Sprintf("/api/daemon/pairing-sessions/%s", url.PathEscape(token)), &resp); err != nil {
return daemonPairingSession{}, err
}
return resp, nil
}
func (c *daemonClient) claimPairingSession(ctx context.Context, token string) (daemonPairingSession, error) {
var resp daemonPairingSession
if err := c.postJSON(ctx, fmt.Sprintf("/api/daemon/pairing-sessions/%s/claim", url.PathEscape(token)), map[string]any{}, &resp); err != nil {
return daemonPairingSession{}, err
}
return resp, nil
}
func (c *daemonClient) startTask(ctx context.Context, taskID string) error {
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/start", taskID), map[string]any{}, nil)
}
func (c *daemonClient) reportProgress(ctx context.Context, taskID, summary string, step, total int) error {
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/progress", taskID), map[string]any{
"summary": summary,
"step": step,
"total": total,
}, nil)
}
func (c *daemonClient) completeTask(ctx context.Context, taskID, output string) error {
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/complete", taskID), map[string]any{
"output": output,
}, nil)
}
func (c *daemonClient) failTask(ctx context.Context, taskID, errMsg string) error {
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/fail", taskID), map[string]any{
"error": errMsg,
}, nil)
}
func (c *daemonClient) postJSON(ctx context.Context, path string, reqBody any, respBody any) error {
var body io.Reader
if reqBody != nil {
data, err := json.Marshal(reqBody)
if err != nil {
return err
}
body = bytes.NewReader(data)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+path, body)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := c.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
return fmt.Errorf("%s %s returned %d: %s", http.MethodPost, path, resp.StatusCode, strings.TrimSpace(string(data)))
}
if respBody == nil {
io.Copy(io.Discard, resp.Body)
return nil
}
return json.NewDecoder(resp.Body).Decode(respBody)
}
func (c *daemonClient) getJSON(ctx context.Context, path string, respBody any) error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+path, nil)
if err != nil {
return err
}
resp, err := c.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
return fmt.Errorf("%s %s returned %d: %s", http.MethodGet, path, resp.StatusCode, strings.TrimSpace(string(data)))
}
if respBody == nil {
io.Copy(io.Discard, resp.Body)
return nil
}
return json.NewDecoder(resp.Body).Decode(respBody)
}
const codexResultSchema = `{
"type": "object",
"properties": {
"status": {
"type": "string",
"enum": ["completed", "blocked"]
},
"comment": {
"type": "string"
}
},
"required": ["status", "comment"],
"additionalProperties": false
}`

View file

@ -0,0 +1,63 @@
package main
import (
"os"
"path/filepath"
"strings"
"testing"
)
func TestNormalizeServerBaseURL(t *testing.T) {
t.Parallel()
got, err := normalizeServerBaseURL("ws://localhost:8080/ws")
if err != nil {
t.Fatalf("normalizeServerBaseURL returned error: %v", err)
}
if got != "http://localhost:8080" {
t.Fatalf("expected http://localhost:8080, got %s", got)
}
}
func TestResolveTaskWorkdirUsesRepoPathWhenPresent(t *testing.T) {
t.Parallel()
root := t.TempDir()
repoPath := filepath.Join(root, "repo")
if err := os.Mkdir(repoPath, 0o755); err != nil {
t.Fatalf("mkdir repo: %v", err)
}
got, err := resolveTaskWorkdir(root, &daemonRepoRef{Path: "repo"})
if err != nil {
t.Fatalf("resolveTaskWorkdir returned error: %v", err)
}
if got != repoPath {
t.Fatalf("expected %s, got %s", repoPath, got)
}
}
func TestBuildCodexPromptIncludesIssueAndSkills(t *testing.T) {
t.Parallel()
prompt := buildCodexPrompt(daemonTask{
Context: daemonTaskContext{
Issue: daemonIssueContext{
Title: "Fix failing test",
Description: "Investigate and fix the test failure.",
AcceptanceCriteria: []string{"tests pass"},
ContextRefs: []string{"log snippet"},
},
Agent: daemonAgentContext{
Name: "Local Codex",
Skills: "Be concise.",
},
},
}, "/tmp/work")
for _, want := range []string{"Fix failing test", "Investigate and fix the test failure.", "tests pass", "log snippet", "Be concise."} {
if !strings.Contains(prompt, want) {
t.Fatalf("prompt missing %q", want)
}
}
}

View file

@ -1,7 +1,8 @@
package main
import (
"fmt"
"context"
"errors"
"log"
"os"
"os/signal"
@ -9,24 +10,18 @@ import (
)
func main() {
serverURL := os.Getenv("MULTICA_SERVER_URL")
if serverURL == "" {
port := os.Getenv("PORT")
if port == "" {
port = "8080"
}
serverURL = "ws://localhost:" + port + "/ws"
cfg, err := loadConfig()
if err != nil {
log.Fatal(err)
}
fmt.Println("Multica Daemon starting...")
fmt.Printf("Connecting to server: %s\n", serverURL)
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
// TODO: Implement daemon connection, heartbeat, and task runner
log.Println("Daemon is running. Press Ctrl+C to stop.")
logger := log.New(os.Stdout, "multica-daemon: ", log.LstdFlags)
d := newDaemon(cfg, logger)
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("Daemon stopped")
if err := d.run(ctx); err != nil && !errors.Is(err, context.Canceled) {
logger.Fatal(err)
}
}

View file

@ -114,10 +114,37 @@ func main() {
continue
}
err = pool.QueryRow(ctx, `
INSERT INTO agent (workspace_id, name, description, runtime_mode, status, owner_id, skills, tools, triggers)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8::jsonb, $9::jsonb)
INSERT INTO agent_runtime (workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at)
VALUES (
$1,
NULL,
$2,
$3,
$4,
$5,
$6,
'{"seed":true}'::jsonb,
CASE WHEN $5 = 'online' THEN now() ELSE NULL END
)
RETURNING id
`, workspaceID, a.name, a.description, a.runtimeMode, a.status, userID, a.skills, a.tools, a.triggers).Scan(&agentID)
`,
workspaceID,
a.name+" Runtime",
a.runtimeMode,
map[string]string{"cloud": "multica_agent", "local": "seed_local"}[a.runtimeMode],
map[bool]string{true: "offline", false: "online"}[a.status == "offline"],
map[string]string{"cloud": "Seeded cloud runtime", "local": "Seeded local runtime"}[a.runtimeMode],
).Scan(&agentID)
if err != nil {
log.Printf("Failed to create runtime for agent %s: %v", a.name, err)
continue
}
runtimeID := agentID
err = pool.QueryRow(ctx, `
INSERT INTO agent (workspace_id, name, description, runtime_mode, runtime_id, status, owner_id, skills, tools, triggers)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::jsonb, $10::jsonb)
RETURNING id
`, workspaceID, a.name, a.description, a.runtimeMode, runtimeID, a.status, userID, a.skills, a.tools, a.triggers).Scan(&agentID)
if err != nil {
log.Printf("Failed to create agent %s: %v", a.name, err)
continue

View file

@ -52,7 +52,7 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub) chi.Router {
r.Use(chimw.RequestID)
r.Use(cors.Handler(cors.Options{
AllowedOrigins: allowedOrigins(),
AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
AllowedMethods: []string{"GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"},
AllowedHeaders: []string{"Accept", "Authorization", "Content-Type", "X-Workspace-ID"},
AllowCredentials: true,
MaxAge: 300,
@ -74,14 +74,16 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub) chi.Router {
// Daemon API routes (no user auth; daemon auth deferred to later)
r.Route("/api/daemon", func(r chi.Router) {
r.Post("/pairing-sessions", h.CreateDaemonPairingSession)
r.Get("/pairing-sessions/{token}", h.GetDaemonPairingSession)
r.Post("/pairing-sessions/{token}/claim", h.ClaimDaemonPairingSession)
r.Post("/register", h.DaemonRegister)
r.Post("/heartbeat", h.DaemonHeartbeat)
// Task claiming (daemon polls for work)
r.Post("/agents/{agentId}/tasks/claim", h.ClaimTask)
r.Get("/agents/{agentId}/tasks/pending", h.ListPendingTasks)
r.Post("/runtimes/{runtimeId}/tasks/claim", h.ClaimTaskByRuntime)
r.Get("/runtimes/{runtimeId}/tasks/pending", h.ListPendingTasksByRuntime)
// Task lifecycle (daemon reports status)
r.Post("/tasks/{taskId}/start", h.StartTask)
r.Post("/tasks/{taskId}/progress", h.ReportTaskProgress)
r.Post("/tasks/{taskId}/complete", h.CompleteTask)
@ -127,6 +129,12 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub) chi.Router {
})
})
r.Route("/api/runtimes", func(r chi.Router) {
r.Get("/", h.ListAgentRuntimes)
})
r.Post("/api/daemon/pairing-sessions/{token}/approve", h.ApproveDaemonPairingSession)
// Inbox
r.Route("/api/inbox", func(r chi.Router) {
r.Get("/", h.ListInbox)

View file

@ -12,6 +12,7 @@ import (
type AgentResponse struct {
ID string `json:"id"`
WorkspaceID string `json:"workspace_id"`
RuntimeID string `json:"runtime_id"`
Name string `json:"name"`
Description string `json:"description"`
AvatarURL *string `json:"avatar_url"`
@ -56,6 +57,7 @@ func agentToResponse(a db.Agent) AgentResponse {
return AgentResponse{
ID: uuidToString(a.ID),
WorkspaceID: uuidToString(a.WorkspaceID),
RuntimeID: uuidToString(a.RuntimeID),
Name: a.Name,
Description: a.Description,
AvatarURL: textToPtr(a.AvatarUrl),
@ -76,6 +78,7 @@ func agentToResponse(a db.Agent) AgentResponse {
type AgentTaskResponse struct {
ID string `json:"id"`
AgentID string `json:"agent_id"`
RuntimeID string `json:"runtime_id"`
IssueID string `json:"issue_id"`
Status string `json:"status"`
Priority int32 `json:"priority"`
@ -100,6 +103,7 @@ func taskToResponse(t db.AgentTaskQueue) AgentTaskResponse {
return AgentTaskResponse{
ID: uuidToString(t.ID),
AgentID: uuidToString(t.AgentID),
RuntimeID: uuidToString(t.RuntimeID),
IssueID: uuidToString(t.IssueID),
Status: t.Status,
Priority: t.Priority,
@ -146,7 +150,7 @@ type CreateAgentRequest struct {
Name string `json:"name"`
Description string `json:"description"`
AvatarURL *string `json:"avatar_url"`
RuntimeMode string `json:"runtime_mode"`
RuntimeID string `json:"runtime_id"`
RuntimeConfig any `json:"runtime_config"`
Visibility string `json:"visibility"`
MaxConcurrentTasks int32 `json:"max_concurrent_tasks"`
@ -176,8 +180,9 @@ func (h *Handler) CreateAgent(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusBadRequest, "name is required")
return
}
if req.RuntimeMode == "" {
req.RuntimeMode = "local"
if req.RuntimeID == "" {
writeError(w, http.StatusBadRequest, "runtime_id is required")
return
}
if req.Visibility == "" {
req.Visibility = "workspace"
@ -186,6 +191,15 @@ func (h *Handler) CreateAgent(w http.ResponseWriter, r *http.Request) {
req.MaxConcurrentTasks = 1
}
runtime, err := h.Queries.GetAgentRuntimeForWorkspace(r.Context(), db.GetAgentRuntimeForWorkspaceParams{
ID: parseUUID(req.RuntimeID),
WorkspaceID: parseUUID(workspaceID),
})
if err != nil {
writeError(w, http.StatusBadRequest, "invalid runtime_id")
return
}
rc, _ := json.Marshal(req.RuntimeConfig)
if req.RuntimeConfig == nil {
rc = []byte("{}")
@ -206,8 +220,9 @@ func (h *Handler) CreateAgent(w http.ResponseWriter, r *http.Request) {
Name: req.Name,
Description: req.Description,
AvatarUrl: ptrToText(req.AvatarURL),
RuntimeMode: req.RuntimeMode,
RuntimeMode: runtime.RuntimeMode,
RuntimeConfig: rc,
RuntimeID: runtime.ID,
Visibility: req.Visibility,
MaxConcurrentTasks: req.MaxConcurrentTasks,
OwnerID: parseUUID(ownerID),
@ -220,6 +235,11 @@ func (h *Handler) CreateAgent(w http.ResponseWriter, r *http.Request) {
return
}
if runtime.Status == "online" {
h.TaskService.ReconcileAgentStatus(r.Context(), agent.ID)
agent, _ = h.Queries.GetAgent(r.Context(), agent.ID)
}
writeJSON(w, http.StatusCreated, agentToResponse(agent))
}
@ -227,6 +247,7 @@ type UpdateAgentRequest struct {
Name *string `json:"name"`
Description *string `json:"description"`
AvatarURL *string `json:"avatar_url"`
RuntimeID *string `json:"runtime_id"`
RuntimeConfig any `json:"runtime_config"`
Visibility *string `json:"visibility"`
Status *string `json:"status"`
@ -268,6 +289,18 @@ func (h *Handler) UpdateAgent(w http.ResponseWriter, r *http.Request) {
rc, _ := json.Marshal(req.RuntimeConfig)
params.RuntimeConfig = rc
}
if req.RuntimeID != nil {
runtime, err := h.Queries.GetAgentRuntimeForWorkspace(r.Context(), db.GetAgentRuntimeForWorkspaceParams{
ID: parseUUID(*req.RuntimeID),
WorkspaceID: agent.WorkspaceID,
})
if err != nil {
writeError(w, http.StatusBadRequest, "invalid runtime_id")
return
}
params.RuntimeID = runtime.ID
params.RuntimeMode = pgtype.Text{String: runtime.RuntimeMode, Valid: true}
}
if req.Visibility != nil {
params.Visibility = pgtype.Text{String: *req.Visibility, Valid: true}
}

View file

@ -2,7 +2,9 @@ package handler
import (
"encoding/json"
"fmt"
"net/http"
"strings"
"github.com/go-chi/chi/v5"
db "github.com/multica-ai/multica/server/pkg/db/generated"
@ -13,9 +15,11 @@ import (
// ---------------------------------------------------------------------------
type DaemonRegisterRequest struct {
WorkspaceID string `json:"workspace_id"`
DaemonID string `json:"daemon_id"`
AgentID string `json:"agent_id"`
DeviceName string `json:"device_name"`
Runtimes []struct {
Name string `json:"name"`
Type string `json:"type"`
Version string `json:"version"`
Status string `json:"status"`
@ -29,36 +33,68 @@ func (h *Handler) DaemonRegister(w http.ResponseWriter, r *http.Request) {
return
}
if req.DaemonID == "" || req.AgentID == "" {
writeError(w, http.StatusBadRequest, "daemon_id and agent_id are required")
if req.DaemonID == "" {
writeError(w, http.StatusBadRequest, "daemon_id is required")
return
}
if req.WorkspaceID == "" {
writeError(w, http.StatusBadRequest, "workspace_id is required")
return
}
if len(req.Runtimes) == 0 {
writeError(w, http.StatusBadRequest, "at least one runtime is required")
return
}
runtimeInfo, _ := json.Marshal(req.Runtimes)
resp := make([]AgentRuntimeResponse, 0, len(req.Runtimes))
for _, runtime := range req.Runtimes {
provider := strings.TrimSpace(runtime.Type)
if provider == "" {
provider = "unknown"
}
name := strings.TrimSpace(runtime.Name)
if name == "" {
name = provider
if req.DeviceName != "" {
name = fmt.Sprintf("%s (%s)", provider, req.DeviceName)
}
}
deviceInfo := strings.TrimSpace(req.DeviceName)
if runtime.Version != "" && deviceInfo != "" {
deviceInfo = fmt.Sprintf("%s · %s", deviceInfo, runtime.Version)
} else if runtime.Version != "" {
deviceInfo = runtime.Version
}
status := "online"
if runtime.Status == "offline" {
status = "offline"
}
metadata, _ := json.Marshal(map[string]any{
"version": runtime.Version,
})
conn, err := h.Queries.UpsertDaemonConnection(r.Context(), db.UpsertDaemonConnectionParams{
AgentID: parseUUID(req.AgentID),
DaemonID: req.DaemonID,
RuntimeInfo: runtimeInfo,
registered, err := h.Queries.UpsertAgentRuntime(r.Context(), db.UpsertAgentRuntimeParams{
WorkspaceID: parseUUID(req.WorkspaceID),
DaemonID: strToText(req.DaemonID),
Name: name,
RuntimeMode: "local",
Provider: provider,
Status: status,
DeviceInfo: deviceInfo,
Metadata: metadata,
})
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to register daemon: "+err.Error())
writeError(w, http.StatusInternalServerError, "failed to register runtime: "+err.Error())
return
}
resp = append(resp, runtimeToResponse(registered))
}
// Reconcile agent status (set to idle if no running tasks)
h.TaskService.ReconcileAgentStatus(r.Context(), parseUUID(req.AgentID))
writeJSON(w, http.StatusOK, map[string]any{
"connection_id": uuidToString(conn.ID),
"status": conn.Status,
})
writeJSON(w, http.StatusOK, map[string]any{"runtimes": resp})
}
type DaemonHeartbeatRequest struct {
DaemonID string `json:"daemon_id"`
AgentID string `json:"agent_id"`
CurrentTasks int `json:"current_tasks"`
RuntimeID string `json:"runtime_id"`
}
func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
@ -68,10 +104,12 @@ func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
return
}
err := h.Queries.UpdateDaemonHeartbeat(r.Context(), db.UpdateDaemonHeartbeatParams{
DaemonID: req.DaemonID,
AgentID: parseUUID(req.AgentID),
})
if req.RuntimeID == "" {
writeError(w, http.StatusBadRequest, "runtime_id is required")
return
}
_, err := h.Queries.UpdateAgentRuntimeHeartbeat(r.Context(), parseUUID(req.RuntimeID))
if err != nil {
writeError(w, http.StatusInternalServerError, "heartbeat failed")
return
@ -80,15 +118,11 @@ func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}
// ---------------------------------------------------------------------------
// Task Lifecycle (called by daemon)
// ---------------------------------------------------------------------------
// ClaimTaskByRuntime atomically claims the next queued task for a runtime.
func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) {
runtimeID := chi.URLParam(r, "runtimeId")
// ClaimTask atomically claims the next queued task for an agent.
func (h *Handler) ClaimTask(w http.ResponseWriter, r *http.Request) {
agentID := chi.URLParam(r, "agentId")
task, err := h.TaskService.ClaimTask(r.Context(), parseUUID(agentID))
task, err := h.TaskService.ClaimTaskForRuntime(r.Context(), parseUUID(runtimeID))
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to claim task: "+err.Error())
return
@ -102,11 +136,11 @@ func (h *Handler) ClaimTask(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]any{"task": taskToResponse(*task)})
}
// ListPendingTasks returns queued/dispatched tasks for an agent.
func (h *Handler) ListPendingTasks(w http.ResponseWriter, r *http.Request) {
agentID := chi.URLParam(r, "agentId")
// ListPendingTasksByRuntime returns queued/dispatched tasks for a runtime.
func (h *Handler) ListPendingTasksByRuntime(w http.ResponseWriter, r *http.Request) {
runtimeID := chi.URLParam(r, "runtimeId")
tasks, err := h.Queries.ListPendingTasks(r.Context(), parseUUID(agentID))
tasks, err := h.Queries.ListPendingTasksByRuntime(r.Context(), parseUUID(runtimeID))
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list pending tasks")
return
@ -120,6 +154,10 @@ func (h *Handler) ListPendingTasks(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, resp)
}
// ---------------------------------------------------------------------------
// Task Lifecycle (called by daemon)
// ---------------------------------------------------------------------------
// StartTask marks a dispatched task as running.
func (h *Handler) StartTask(w http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "taskId")

View file

@ -0,0 +1,386 @@
package handler
import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"net/url"
"os"
"strings"
"time"
"github.com/go-chi/chi/v5"
"github.com/jackc/pgx/v5/pgtype"
)
const daemonPairingTTL = 10 * time.Minute
type daemonPairingSessionRecord struct {
Token string
DaemonID string
DeviceName string
RuntimeName string
RuntimeType string
RuntimeVersion string
WorkspaceID pgtype.UUID
ApprovedBy pgtype.UUID
Status string
ApprovedAt pgtype.Timestamptz
ClaimedAt pgtype.Timestamptz
ExpiresAt pgtype.Timestamptz
CreatedAt pgtype.Timestamptz
UpdatedAt pgtype.Timestamptz
}
type DaemonPairingSessionResponse struct {
Token string `json:"token"`
DaemonID string `json:"daemon_id"`
DeviceName string `json:"device_name"`
RuntimeName string `json:"runtime_name"`
RuntimeType string `json:"runtime_type"`
RuntimeVersion string `json:"runtime_version"`
WorkspaceID *string `json:"workspace_id"`
Status string `json:"status"`
ApprovedAt *string `json:"approved_at"`
ClaimedAt *string `json:"claimed_at"`
ExpiresAt string `json:"expires_at"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
LinkURL *string `json:"link_url,omitempty"`
}
type CreateDaemonPairingSessionRequest struct {
DaemonID string `json:"daemon_id"`
DeviceName string `json:"device_name"`
RuntimeName string `json:"runtime_name"`
RuntimeType string `json:"runtime_type"`
RuntimeVersion string `json:"runtime_version"`
}
type ApproveDaemonPairingSessionRequest struct {
WorkspaceID string `json:"workspace_id"`
}
func daemonAppBaseURL() string {
for _, key := range []string{"MULTICA_APP_URL", "FRONTEND_ORIGIN"} {
if value := strings.TrimSpace(os.Getenv(key)); value != "" {
return strings.TrimRight(value, "/")
}
}
return "http://localhost:3000"
}
func daemonPairingLinkURL(token string) string {
base := daemonAppBaseURL()
return base + "/pair/local?token=" + url.QueryEscape(token)
}
func daemonPairingSessionToResponse(rec daemonPairingSessionRecord, includeLink bool) DaemonPairingSessionResponse {
resp := DaemonPairingSessionResponse{
Token: rec.Token,
DaemonID: rec.DaemonID,
DeviceName: rec.DeviceName,
RuntimeName: rec.RuntimeName,
RuntimeType: rec.RuntimeType,
RuntimeVersion: rec.RuntimeVersion,
WorkspaceID: uuidToPtr(rec.WorkspaceID),
Status: rec.Status,
ApprovedAt: timestampToPtr(rec.ApprovedAt),
ClaimedAt: timestampToPtr(rec.ClaimedAt),
ExpiresAt: timestampToString(rec.ExpiresAt),
CreatedAt: timestampToString(rec.CreatedAt),
UpdatedAt: timestampToString(rec.UpdatedAt),
}
if includeLink {
link := daemonPairingLinkURL(rec.Token)
resp.LinkURL = &link
}
return resp
}
func randomDaemonPairingToken() (string, error) {
bytes := make([]byte, 16)
if _, err := rand.Read(bytes); err != nil {
return "", err
}
return hex.EncodeToString(bytes), nil
}
func (h *Handler) getDaemonPairingSession(ctx context.Context, token string) (daemonPairingSessionRecord, error) {
if h.DB == nil {
return daemonPairingSessionRecord{}, fmt.Errorf("database executor is not configured")
}
var rec daemonPairingSessionRecord
err := h.DB.QueryRow(ctx, `
SELECT
token,
daemon_id,
device_name,
runtime_name,
runtime_type,
runtime_version,
workspace_id,
approved_by,
status,
approved_at,
claimed_at,
expires_at,
created_at,
updated_at
FROM daemon_pairing_session
WHERE token = $1
`, token).Scan(
&rec.Token,
&rec.DaemonID,
&rec.DeviceName,
&rec.RuntimeName,
&rec.RuntimeType,
&rec.RuntimeVersion,
&rec.WorkspaceID,
&rec.ApprovedBy,
&rec.Status,
&rec.ApprovedAt,
&rec.ClaimedAt,
&rec.ExpiresAt,
&rec.CreatedAt,
&rec.UpdatedAt,
)
if err != nil {
return daemonPairingSessionRecord{}, err
}
if rec.Status == "pending" && rec.ExpiresAt.Valid && rec.ExpiresAt.Time.Before(time.Now()) {
if _, err := h.DB.Exec(ctx, `
UPDATE daemon_pairing_session
SET status = 'expired', updated_at = now()
WHERE token = $1 AND status = 'pending'
`, token); err == nil {
rec.Status = "expired"
rec.UpdatedAt = pgtype.Timestamptz{Time: time.Now(), Valid: true}
}
}
return rec, nil
}
func (h *Handler) CreateDaemonPairingSession(w http.ResponseWriter, r *http.Request) {
if h.DB == nil {
writeError(w, http.StatusInternalServerError, "database executor is not configured")
return
}
var req CreateDaemonPairingSessionRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
req.DaemonID = strings.TrimSpace(req.DaemonID)
req.DeviceName = strings.TrimSpace(req.DeviceName)
req.RuntimeName = strings.TrimSpace(req.RuntimeName)
req.RuntimeType = strings.TrimSpace(req.RuntimeType)
req.RuntimeVersion = strings.TrimSpace(req.RuntimeVersion)
if req.DaemonID == "" {
writeError(w, http.StatusBadRequest, "daemon_id is required")
return
}
if req.DeviceName == "" {
writeError(w, http.StatusBadRequest, "device_name is required")
return
}
if req.RuntimeName == "" {
writeError(w, http.StatusBadRequest, "runtime_name is required")
return
}
if req.RuntimeType == "" {
writeError(w, http.StatusBadRequest, "runtime_type is required")
return
}
token, err := randomDaemonPairingToken()
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to create pairing token")
return
}
expiresAt := time.Now().Add(daemonPairingTTL)
var rec daemonPairingSessionRecord
err = h.DB.QueryRow(r.Context(), `
INSERT INTO daemon_pairing_session (
token,
daemon_id,
device_name,
runtime_name,
runtime_type,
runtime_version,
expires_at
) VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING
token,
daemon_id,
device_name,
runtime_name,
runtime_type,
runtime_version,
workspace_id,
approved_by,
status,
approved_at,
claimed_at,
expires_at,
created_at,
updated_at
`,
token,
req.DaemonID,
req.DeviceName,
req.RuntimeName,
req.RuntimeType,
req.RuntimeVersion,
expiresAt,
).Scan(
&rec.Token,
&rec.DaemonID,
&rec.DeviceName,
&rec.RuntimeName,
&rec.RuntimeType,
&rec.RuntimeVersion,
&rec.WorkspaceID,
&rec.ApprovedBy,
&rec.Status,
&rec.ApprovedAt,
&rec.ClaimedAt,
&rec.ExpiresAt,
&rec.CreatedAt,
&rec.UpdatedAt,
)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to create pairing session")
return
}
writeJSON(w, http.StatusCreated, daemonPairingSessionToResponse(rec, true))
}
func (h *Handler) GetDaemonPairingSession(w http.ResponseWriter, r *http.Request) {
token := chi.URLParam(r, "token")
rec, err := h.getDaemonPairingSession(r.Context(), token)
if err != nil {
writeError(w, http.StatusNotFound, "pairing session not found")
return
}
writeJSON(w, http.StatusOK, daemonPairingSessionToResponse(rec, true))
}
func (h *Handler) ApproveDaemonPairingSession(w http.ResponseWriter, r *http.Request) {
token := chi.URLParam(r, "token")
rec, err := h.getDaemonPairingSession(r.Context(), token)
if err != nil {
writeError(w, http.StatusNotFound, "pairing session not found")
return
}
if rec.Status == "expired" {
writeError(w, http.StatusBadRequest, "pairing session expired")
return
}
if rec.Status == "claimed" {
writeError(w, http.StatusBadRequest, "pairing session already claimed")
return
}
if rec.Status == "approved" {
writeJSON(w, http.StatusOK, daemonPairingSessionToResponse(rec, true))
return
}
var req ApproveDaemonPairingSessionRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
if req.WorkspaceID == "" {
writeError(w, http.StatusBadRequest, "workspace_id is required")
return
}
userID, ok := requireUserID(w, r)
if !ok {
return
}
if _, ok := h.requireWorkspaceMember(w, r, req.WorkspaceID, "workspace not found"); !ok {
return
}
if h.DB == nil {
writeError(w, http.StatusInternalServerError, "database executor is not configured")
return
}
if _, err := h.DB.Exec(r.Context(), `
UPDATE daemon_pairing_session
SET
workspace_id = $2,
approved_by = $3,
status = 'approved',
approved_at = now(),
updated_at = now()
WHERE token = $1 AND status = 'pending'
`, token, parseUUID(req.WorkspaceID), parseUUID(userID)); err != nil {
writeError(w, http.StatusInternalServerError, "failed to approve pairing session")
return
}
rec, err = h.getDaemonPairingSession(r.Context(), token)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to reload pairing session")
return
}
writeJSON(w, http.StatusOK, daemonPairingSessionToResponse(rec, true))
}
func (h *Handler) ClaimDaemonPairingSession(w http.ResponseWriter, r *http.Request) {
token := chi.URLParam(r, "token")
rec, err := h.getDaemonPairingSession(r.Context(), token)
if err != nil {
writeError(w, http.StatusNotFound, "pairing session not found")
return
}
if rec.Status == "claimed" {
writeJSON(w, http.StatusOK, daemonPairingSessionToResponse(rec, true))
return
}
if rec.Status != "approved" {
writeError(w, http.StatusBadRequest, "pairing session is not approved")
return
}
if h.DB == nil {
writeError(w, http.StatusInternalServerError, "database executor is not configured")
return
}
if _, err := h.DB.Exec(r.Context(), `
UPDATE daemon_pairing_session
SET
status = 'claimed',
claimed_at = now(),
updated_at = now()
WHERE token = $1 AND status = 'approved'
`, token); err != nil {
writeError(w, http.StatusInternalServerError, "failed to claim pairing session")
return
}
rec, err = h.getDaemonPairingSession(r.Context(), token)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to reload pairing session")
return
}
writeJSON(w, http.StatusOK, daemonPairingSessionToResponse(rec, true))
}

View file

@ -20,16 +20,29 @@ type txStarter interface {
Begin(ctx context.Context) (pgx.Tx, error)
}
type dbExecutor interface {
Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
}
type Handler struct {
Queries *db.Queries
DB dbExecutor
TxStarter txStarter
Hub *realtime.Hub
TaskService *service.TaskService
}
func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub) *Handler {
var executor dbExecutor
if candidate, ok := txStarter.(dbExecutor); ok {
executor = candidate
}
return &Handler{
Queries: queries,
DB: executor,
TxStarter: txStarter,
Hub: hub,
TaskService: service.NewTaskService(queries, hub),

View file

@ -1,6 +1,7 @@
package handler
import (
"context"
"encoding/json"
"io"
"net/http"
@ -34,6 +35,12 @@ type IssueResponse struct {
UpdatedAt string `json:"updated_at"`
}
type agentTriggerSnapshot struct {
Type string `json:"type"`
Enabled bool `json:"enabled"`
Config map[string]any `json:"config"`
}
func issueToResponse(i db.Issue) IssueResponse {
var ac []any
if i.AcceptanceCriteria != nil {
@ -258,8 +265,8 @@ func (h *Handler) CreateIssue(w http.ResponseWriter, r *http.Request) {
h.broadcast("inbox:new", map[string]any{"item": inboxToResponse(inboxItem)})
}
// If assigned to an agent, enqueue a task with context
if issue.AssigneeType.String == "agent" {
// Only ready issues in todo are enqueued for agents.
if h.shouldEnqueueAgentTask(r.Context(), issue) {
h.TaskService.EnqueueTaskForIssue(r.Context(), issue)
}
}
@ -283,12 +290,12 @@ type UpdateIssueRequest struct {
func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
current, ok := h.loadIssueForUser(w, r, id)
prevIssue, ok := h.loadIssueForUser(w, r, id)
if !ok {
return
}
// Read body as raw bytes so we can detect which fields were explicitly sent
// Read body as raw bytes so we can detect which fields were explicitly sent.
bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
writeError(w, http.StatusBadRequest, "failed to read request body")
@ -307,10 +314,10 @@ func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) {
// Pre-fill nullable fields (bare sqlc.narg) with current values
params := db.UpdateIssueParams{
ID: current.ID,
AssigneeType: current.AssigneeType,
AssigneeID: current.AssigneeID,
DueDate: current.DueDate,
ID: prevIssue.ID,
AssigneeType: prevIssue.AssigneeType,
AssigneeID: prevIssue.AssigneeID,
DueDate: prevIssue.DueDate,
}
// COALESCE fields — only set when explicitly provided
@ -379,16 +386,21 @@ func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) {
resp := issueToResponse(issue)
h.broadcast("issue:updated", map[string]any{"issue": resp})
// If assignee changed, handle agent task queue
if req.AssigneeType != nil || req.AssigneeID != nil {
// Cancel any existing agent tasks for this issue
assigneeChanged := (req.AssigneeType != nil || req.AssigneeID != nil) &&
(prevIssue.AssigneeType.String != issue.AssigneeType.String || uuidToString(prevIssue.AssigneeID) != uuidToString(issue.AssigneeID))
statusChanged := req.Status != nil && prevIssue.Status != issue.Status
// If assignee or readiness status changed, reconcile the task queue.
if assigneeChanged || statusChanged {
h.TaskService.CancelTasksForIssue(r.Context(), issue.ID)
// If newly assigned to an agent, enqueue a task with context
if issue.AssigneeType.Valid && issue.AssigneeType.String == "agent" && issue.AssigneeID.Valid {
if h.shouldEnqueueAgentTask(r.Context(), issue) {
h.TaskService.EnqueueTaskForIssue(r.Context(), issue)
}
}
// If assignee changed, create a notification for the new assignee.
if assigneeChanged {
// Create inbox notification for new assignee
if issue.AssigneeType.Valid && issue.AssigneeID.Valid {
inboxItem, err := h.Queries.CreateInboxItem(r.Context(), db.CreateInboxItemParams{
@ -427,6 +439,34 @@ func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, resp)
}
func (h *Handler) shouldEnqueueAgentTask(ctx context.Context, issue db.Issue) bool {
if issue.Status != "todo" {
return false
}
if !issue.AssigneeType.Valid || issue.AssigneeType.String != "agent" || !issue.AssigneeID.Valid {
return false
}
agent, err := h.Queries.GetAgent(ctx, issue.AssigneeID)
if err != nil || !agent.RuntimeID.Valid {
return false
}
if agent.Triggers == nil || len(agent.Triggers) == 0 {
return true
}
var triggers []agentTriggerSnapshot
if err := json.Unmarshal(agent.Triggers, &triggers); err != nil {
return false
}
for _, trigger := range triggers {
if trigger.Type == "on_assign" && trigger.Enabled {
return true
}
}
return false
}
func (h *Handler) DeleteIssue(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
if _, ok := h.loadIssueForUser(w, r, id); !ok {

View file

@ -0,0 +1,68 @@
package handler
import (
"encoding/json"
"net/http"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
type AgentRuntimeResponse struct {
ID string `json:"id"`
WorkspaceID string `json:"workspace_id"`
DaemonID *string `json:"daemon_id"`
Name string `json:"name"`
RuntimeMode string `json:"runtime_mode"`
Provider string `json:"provider"`
Status string `json:"status"`
DeviceInfo string `json:"device_info"`
Metadata any `json:"metadata"`
LastSeenAt *string `json:"last_seen_at"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
}
func runtimeToResponse(rt db.AgentRuntime) AgentRuntimeResponse {
var metadata any
if rt.Metadata != nil {
json.Unmarshal(rt.Metadata, &metadata)
}
if metadata == nil {
metadata = map[string]any{}
}
return AgentRuntimeResponse{
ID: uuidToString(rt.ID),
WorkspaceID: uuidToString(rt.WorkspaceID),
DaemonID: textToPtr(rt.DaemonID),
Name: rt.Name,
RuntimeMode: rt.RuntimeMode,
Provider: rt.Provider,
Status: rt.Status,
DeviceInfo: rt.DeviceInfo,
Metadata: metadata,
LastSeenAt: timestampToPtr(rt.LastSeenAt),
CreatedAt: timestampToString(rt.CreatedAt),
UpdatedAt: timestampToString(rt.UpdatedAt),
}
}
func (h *Handler) ListAgentRuntimes(w http.ResponseWriter, r *http.Request) {
workspaceID := resolveWorkspaceID(r)
if _, ok := h.requireWorkspaceMember(w, r, workspaceID, "workspace not found"); !ok {
return
}
runtimes, err := h.Queries.ListAgentRuntimes(r.Context(), parseUUID(workspaceID))
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list runtimes")
return
}
resp := make([]AgentRuntimeResponse, len(runtimes))
for i, rt := range runtimes {
resp[i] = runtimeToResponse(rt)
}
writeJSON(w, http.StatusOK, resp)
}

View file

@ -8,9 +8,9 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/internal/realtime"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
@ -29,11 +29,25 @@ func (s *TaskService) EnqueueTaskForIssue(ctx context.Context, issue db.Issue) (
return db.AgentTaskQueue{}, fmt.Errorf("issue has no assignee")
}
snapshot := buildContextSnapshot(issue)
agent, err := s.Queries.GetAgent(ctx, issue.AssigneeID)
if err != nil {
return db.AgentTaskQueue{}, fmt.Errorf("load agent: %w", err)
}
if !agent.RuntimeID.Valid {
return db.AgentTaskQueue{}, fmt.Errorf("agent has no runtime")
}
runtime, err := s.Queries.GetAgentRuntime(ctx, agent.RuntimeID)
if err != nil {
return db.AgentTaskQueue{}, fmt.Errorf("load runtime: %w", err)
}
snapshot := buildContextSnapshot(issue, agent, runtime)
contextJSON, _ := json.Marshal(snapshot)
task, err := s.Queries.CreateAgentTaskWithContext(ctx, db.CreateAgentTaskWithContextParams{
AgentID: issue.AssigneeID,
RuntimeID: agent.RuntimeID,
IssueID: issue.ID,
Priority: priorityToInt(issue.Priority),
Context: contextJSON,
@ -83,6 +97,34 @@ func (s *TaskService) ClaimTask(ctx context.Context, agentID pgtype.UUID) (*db.A
return &task, nil
}
// ClaimTaskForRuntime claims the next runnable task for a runtime while
// still respecting each agent's max_concurrent_tasks limit.
func (s *TaskService) ClaimTaskForRuntime(ctx context.Context, runtimeID pgtype.UUID) (*db.AgentTaskQueue, error) {
tasks, err := s.Queries.ListPendingTasksByRuntime(ctx, runtimeID)
if err != nil {
return nil, fmt.Errorf("list pending tasks: %w", err)
}
triedAgents := map[string]struct{}{}
for _, candidate := range tasks {
agentKey := util.UUIDToString(candidate.AgentID)
if _, seen := triedAgents[agentKey]; seen {
continue
}
triedAgents[agentKey] = struct{}{}
task, err := s.ClaimTask(ctx, candidate.AgentID)
if err != nil {
return nil, err
}
if task != nil && task.RuntimeID == runtimeID {
return task, nil
}
}
return nil, nil
}
// StartTask transitions a dispatched task to running and syncs issue status.
func (s *TaskService) StartTask(ctx context.Context, taskID pgtype.UUID) (*db.AgentTaskQueue, error) {
task, err := s.Queries.StartAgentTask(ctx, taskID)
@ -91,10 +133,13 @@ func (s *TaskService) StartTask(ctx context.Context, taskID pgtype.UUID) (*db.Ag
}
// Sync issue → in_progress
s.Queries.UpdateIssueStatus(ctx, db.UpdateIssueStatusParams{
issue, err := s.Queries.UpdateIssueStatus(ctx, db.UpdateIssueStatusParams{
ID: task.IssueID,
Status: "in_progress",
})
if err == nil {
s.broadcastIssueUpdated(issue)
}
return &task, nil
}
@ -110,10 +155,23 @@ func (s *TaskService) CompleteTask(ctx context.Context, taskID pgtype.UUID, resu
}
// Sync issue → in_review
s.Queries.UpdateIssueStatus(ctx, db.UpdateIssueStatusParams{
issue, issueErr := s.Queries.UpdateIssueStatus(ctx, db.UpdateIssueStatusParams{
ID: task.IssueID,
Status: "in_review",
})
if issueErr == nil {
s.broadcastIssueUpdated(issue)
}
var payload protocol.TaskCompletedPayload
if err := json.Unmarshal(result, &payload); err == nil {
if payload.Output != "" {
s.createAgentComment(ctx, task.IssueID, task.AgentID, payload.Output, "comment")
}
}
if issueErr == nil {
s.createInboxForIssueCreator(ctx, issue, "review_requested", "attention", "Review requested: "+issue.Title, "")
}
// Reconcile agent status
s.ReconcileAgentStatus(ctx, task.AgentID)
@ -135,10 +193,19 @@ func (s *TaskService) FailTask(ctx context.Context, taskID pgtype.UUID, errMsg s
}
// Sync issue → blocked
s.Queries.UpdateIssueStatus(ctx, db.UpdateIssueStatusParams{
issue, issueErr := s.Queries.UpdateIssueStatus(ctx, db.UpdateIssueStatusParams{
ID: task.IssueID,
Status: "blocked",
})
if issueErr == nil {
s.broadcastIssueUpdated(issue)
}
if errMsg != "" {
s.createAgentComment(ctx, task.IssueID, task.AgentID, errMsg, "system")
}
if issueErr == nil {
s.createInboxForIssueCreator(ctx, issue, "agent_blocked", "action_required", "Agent blocked: "+issue.Title, errMsg)
}
// Reconcile agent status
s.ReconcileAgentStatus(ctx, task.AgentID)
@ -183,7 +250,7 @@ func (s *TaskService) updateAgentStatus(ctx context.Context, agentID pgtype.UUID
s.broadcast(protocol.EventAgentStatus, map[string]any{"agent": agentToMap(agent)})
}
func buildContextSnapshot(issue db.Issue) protocol.TaskDispatchPayload {
func buildContextSnapshot(issue db.Issue, agent db.Agent, runtime db.AgentRuntime) map[string]any {
var ac []string
if issue.AcceptanceCriteria != nil {
json.Unmarshal(issue.AcceptanceCriteria, &ac)
@ -198,13 +265,38 @@ func buildContextSnapshot(issue db.Issue) protocol.TaskDispatchPayload {
json.Unmarshal(issue.Repository, repo)
}
return protocol.TaskDispatchPayload{
IssueID: util.UUIDToString(issue.ID),
Title: issue.Title,
Description: issue.Description.String,
AcceptanceCriteria: ac,
ContextRefs: cr,
Repository: repo,
var tools any
if agent.Tools != nil {
json.Unmarshal(agent.Tools, &tools)
}
var metadata any
if runtime.Metadata != nil {
json.Unmarshal(runtime.Metadata, &metadata)
}
return map[string]any{
"issue": map[string]any{
"id": util.UUIDToString(issue.ID),
"title": issue.Title,
"description": issue.Description.String,
"acceptance_criteria": ac,
"context_refs": cr,
"repository": repo,
},
"agent": map[string]any{
"id": util.UUIDToString(agent.ID),
"name": agent.Name,
"skills": agent.Skills,
"tools": tools,
},
"runtime": map[string]any{
"id": util.UUIDToString(runtime.ID),
"name": runtime.Name,
"runtime_mode": runtime.RuntimeMode,
"provider": runtime.Provider,
"device_info": runtime.DeviceInfo,
"metadata": metadata,
},
}
}
@ -224,11 +316,15 @@ func priorityToInt(p string) int32 {
}
func (s *TaskService) broadcastTaskDispatch(task db.AgentTaskQueue) {
var payload protocol.TaskDispatchPayload
var payload map[string]any
if task.Context != nil {
json.Unmarshal(task.Context, &payload)
}
payload.TaskID = util.UUIDToString(task.ID)
if payload == nil {
payload = map[string]any{}
}
payload["task_id"] = util.UUIDToString(task.ID)
payload["runtime_id"] = util.UUIDToString(task.RuntimeID)
s.broadcast(protocol.EventTaskDispatch, payload)
}
@ -253,6 +349,108 @@ func (s *TaskService) broadcast(eventType string, payload any) {
s.Hub.Broadcast(data)
}
func (s *TaskService) broadcastIssueUpdated(issue db.Issue) {
s.broadcast(protocol.EventIssueUpdated, map[string]any{
"issue": issueToMap(issue),
})
}
func (s *TaskService) createAgentComment(ctx context.Context, issueID, agentID pgtype.UUID, content, commentType string) {
if content == "" {
return
}
s.Queries.CreateComment(ctx, db.CreateCommentParams{
IssueID: issueID,
AuthorType: "agent",
AuthorID: agentID,
Content: content,
Type: commentType,
})
}
func (s *TaskService) createInboxForIssueCreator(ctx context.Context, issue db.Issue, itemType, severity, title, body string) {
if issue.CreatorType != "member" {
return
}
item, err := s.Queries.CreateInboxItem(ctx, db.CreateInboxItemParams{
WorkspaceID: issue.WorkspaceID,
RecipientType: "member",
RecipientID: issue.CreatorID,
Type: itemType,
Severity: severity,
IssueID: issue.ID,
Title: title,
Body: util.PtrToText(&body),
})
if err != nil {
return
}
s.broadcast(protocol.EventInboxNew, map[string]any{
"item": inboxToMap(item),
})
}
func issueToMap(issue db.Issue) map[string]any {
var ac []any
if issue.AcceptanceCriteria != nil {
json.Unmarshal(issue.AcceptanceCriteria, &ac)
}
if ac == nil {
ac = []any{}
}
var cr []any
if issue.ContextRefs != nil {
json.Unmarshal(issue.ContextRefs, &cr)
}
if cr == nil {
cr = []any{}
}
var repo any
if issue.Repository != nil {
json.Unmarshal(issue.Repository, &repo)
}
return map[string]any{
"id": util.UUIDToString(issue.ID),
"workspace_id": util.UUIDToString(issue.WorkspaceID),
"title": issue.Title,
"description": util.TextToPtr(issue.Description),
"status": issue.Status,
"priority": issue.Priority,
"assignee_type": util.TextToPtr(issue.AssigneeType),
"assignee_id": util.UUIDToPtr(issue.AssigneeID),
"creator_type": issue.CreatorType,
"creator_id": util.UUIDToString(issue.CreatorID),
"parent_issue_id": util.UUIDToPtr(issue.ParentIssueID),
"acceptance_criteria": ac,
"context_refs": cr,
"repository": repo,
"position": issue.Position,
"due_date": util.TimestampToPtr(issue.DueDate),
"created_at": util.TimestampToString(issue.CreatedAt),
"updated_at": util.TimestampToString(issue.UpdatedAt),
}
}
func inboxToMap(item db.InboxItem) map[string]any {
return map[string]any{
"id": util.UUIDToString(item.ID),
"workspace_id": util.UUIDToString(item.WorkspaceID),
"recipient_type": item.RecipientType,
"recipient_id": util.UUIDToString(item.RecipientID),
"type": item.Type,
"severity": item.Severity,
"issue_id": util.UUIDToPtr(item.IssueID),
"title": item.Title,
"body": util.TextToPtr(item.Body),
"read": item.Read,
"archived": item.Archived,
"created_at": util.TimestampToString(item.CreatedAt),
}
}
// agentToMap builds a simple map for broadcasting agent status updates.
func agentToMap(a db.Agent) map[string]any {
var rc any
@ -270,6 +468,7 @@ func agentToMap(a db.Agent) map[string]any {
return map[string]any{
"id": util.UUIDToString(a.ID),
"workspace_id": util.UUIDToString(a.WorkspaceID),
"runtime_id": util.UUIDToString(a.RuntimeID),
"name": a.Name,
"description": a.Description,
"avatar_url": util.TextToPtr(a.AvatarUrl),

View file

@ -0,0 +1,13 @@
DROP INDEX IF EXISTS idx_agent_task_queue_runtime_pending;
DROP INDEX IF EXISTS idx_agent_runtime_status;
DROP INDEX IF EXISTS idx_agent_runtime_workspace;
ALTER TABLE agent_task_queue
DROP CONSTRAINT IF EXISTS agent_task_queue_runtime_id_fkey,
DROP COLUMN IF EXISTS runtime_id;
ALTER TABLE agent
DROP CONSTRAINT IF EXISTS agent_runtime_id_fkey,
DROP COLUMN IF EXISTS runtime_id;
DROP TABLE IF EXISTS agent_runtime;

View file

@ -0,0 +1,92 @@
CREATE TABLE agent_runtime (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workspace_id UUID NOT NULL REFERENCES workspace(id) ON DELETE CASCADE,
daemon_id TEXT,
name TEXT NOT NULL,
runtime_mode TEXT NOT NULL CHECK (runtime_mode IN ('local', 'cloud')),
provider TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'offline' CHECK (status IN ('online', 'offline')),
device_info TEXT NOT NULL DEFAULT '',
metadata JSONB NOT NULL DEFAULT '{}',
last_seen_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (workspace_id, daemon_id, provider)
);
ALTER TABLE agent
ADD COLUMN runtime_id UUID;
INSERT INTO agent_runtime (
workspace_id,
daemon_id,
name,
runtime_mode,
provider,
status,
device_info,
metadata,
last_seen_at,
created_at,
updated_at
)
SELECT
a.workspace_id,
NULL,
COALESCE(NULLIF(a.runtime_config->>'runtime_name', ''), a.name || ' Runtime'),
a.runtime_mode,
COALESCE(
NULLIF(a.runtime_config->>'provider', ''),
CASE
WHEN a.runtime_mode = 'cloud' THEN 'multica_agent'
ELSE 'legacy_local'
END
),
CASE
WHEN a.status = 'offline' THEN 'offline'
ELSE 'online'
END,
COALESCE(
NULLIF(a.runtime_config->>'runtime_name', ''),
CASE
WHEN a.runtime_mode = 'cloud' THEN 'Cloud runtime'
ELSE 'Local runtime'
END
),
jsonb_build_object('migrated_agent_id', a.id::text),
CASE
WHEN a.status = 'offline' THEN NULL
ELSE a.updated_at
END,
a.created_at,
a.updated_at
FROM agent a;
UPDATE agent a
SET runtime_id = ar.id
FROM agent_runtime ar
WHERE ar.metadata->>'migrated_agent_id' = a.id::text;
ALTER TABLE agent
ALTER COLUMN runtime_id SET NOT NULL,
ADD CONSTRAINT agent_runtime_id_fkey
FOREIGN KEY (runtime_id) REFERENCES agent_runtime(id) ON DELETE RESTRICT;
ALTER TABLE agent_task_queue
ADD COLUMN runtime_id UUID;
UPDATE agent_task_queue atq
SET runtime_id = a.runtime_id
FROM agent a
WHERE a.id = atq.agent_id;
ALTER TABLE agent_task_queue
ALTER COLUMN runtime_id SET NOT NULL,
ADD CONSTRAINT agent_task_queue_runtime_id_fkey
FOREIGN KEY (runtime_id) REFERENCES agent_runtime(id) ON DELETE CASCADE;
CREATE INDEX idx_agent_runtime_workspace ON agent_runtime(workspace_id);
CREATE INDEX idx_agent_runtime_status ON agent_runtime(workspace_id, status);
CREATE INDEX idx_agent_task_queue_runtime_pending
ON agent_task_queue(runtime_id, priority DESC, created_at ASC)
WHERE status IN ('queued', 'dispatched');

View file

@ -0,0 +1 @@
DROP TABLE IF EXISTS daemon_pairing_session;

View file

@ -0,0 +1,20 @@
CREATE TABLE daemon_pairing_session (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
token TEXT NOT NULL UNIQUE,
daemon_id TEXT NOT NULL,
device_name TEXT NOT NULL,
runtime_name TEXT NOT NULL,
runtime_type TEXT NOT NULL,
runtime_version TEXT NOT NULL DEFAULT '',
workspace_id UUID REFERENCES workspace(id) ON DELETE CASCADE,
approved_by UUID REFERENCES "user"(id) ON DELETE SET NULL,
status TEXT NOT NULL DEFAULT 'pending' CHECK (status IN ('pending', 'approved', 'claimed', 'expired')),
approved_at TIMESTAMPTZ,
claimed_at TIMESTAMPTZ,
expires_at TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_daemon_pairing_session_token ON daemon_pairing_session (token);
CREATE INDEX idx_daemon_pairing_session_status_expires ON daemon_pairing_session (status, expires_at);

View file

@ -32,7 +32,7 @@ WHERE id = (
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id
`
func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (AgentTaskQueue, error) {
@ -51,6 +51,7 @@ func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (Agen
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
)
return i, err
}
@ -59,7 +60,7 @@ const completeAgentTask = `-- name: CompleteAgentTask :one
UPDATE agent_task_queue
SET status = 'completed', completed_at = now(), result = $2
WHERE id = $1 AND status = 'running'
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id
`
type CompleteAgentTaskParams struct {
@ -83,6 +84,7 @@ func (q *Queries) CompleteAgentTask(ctx context.Context, arg CompleteAgentTaskPa
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
)
return i, err
}
@ -102,10 +104,10 @@ func (q *Queries) CountRunningTasks(ctx context.Context, agentID pgtype.UUID) (i
const createAgent = `-- name: CreateAgent :one
INSERT INTO agent (
workspace_id, name, description, avatar_url, runtime_mode,
runtime_config, visibility, max_concurrent_tasks, owner_id,
runtime_config, runtime_id, visibility, max_concurrent_tasks, owner_id,
skills, tools, triggers
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, skills, tools, triggers
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, skills, tools, triggers, runtime_id
`
type CreateAgentParams struct {
@ -115,6 +117,7 @@ type CreateAgentParams struct {
AvatarUrl pgtype.Text `json:"avatar_url"`
RuntimeMode string `json:"runtime_mode"`
RuntimeConfig []byte `json:"runtime_config"`
RuntimeID pgtype.UUID `json:"runtime_id"`
Visibility string `json:"visibility"`
MaxConcurrentTasks int32 `json:"max_concurrent_tasks"`
OwnerID pgtype.UUID `json:"owner_id"`
@ -131,6 +134,7 @@ func (q *Queries) CreateAgent(ctx context.Context, arg CreateAgentParams) (Agent
arg.AvatarUrl,
arg.RuntimeMode,
arg.RuntimeConfig,
arg.RuntimeID,
arg.Visibility,
arg.MaxConcurrentTasks,
arg.OwnerID,
@ -156,24 +160,31 @@ func (q *Queries) CreateAgent(ctx context.Context, arg CreateAgentParams) (Agent
&i.Skills,
&i.Tools,
&i.Triggers,
&i.RuntimeID,
)
return i, err
}
const createAgentTask = `-- name: CreateAgentTask :one
INSERT INTO agent_task_queue (agent_id, issue_id, status, priority)
VALUES ($1, $2, 'queued', $3)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority)
VALUES ($1, $2, $3, 'queued', $4)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id
`
type CreateAgentTaskParams struct {
AgentID pgtype.UUID `json:"agent_id"`
RuntimeID pgtype.UUID `json:"runtime_id"`
IssueID pgtype.UUID `json:"issue_id"`
Priority int32 `json:"priority"`
}
func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams) (AgentTaskQueue, error) {
row := q.db.QueryRow(ctx, createAgentTask, arg.AgentID, arg.IssueID, arg.Priority)
row := q.db.QueryRow(ctx, createAgentTask,
arg.AgentID,
arg.RuntimeID,
arg.IssueID,
arg.Priority,
)
var i AgentTaskQueue
err := row.Scan(
&i.ID,
@ -188,18 +199,20 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
)
return i, err
}
const createAgentTaskWithContext = `-- name: CreateAgentTaskWithContext :one
INSERT INTO agent_task_queue (agent_id, issue_id, status, priority, context)
VALUES ($1, $2, 'queued', $3, $4)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, context)
VALUES ($1, $2, $3, 'queued', $4, $5)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id
`
type CreateAgentTaskWithContextParams struct {
AgentID pgtype.UUID `json:"agent_id"`
RuntimeID pgtype.UUID `json:"runtime_id"`
IssueID pgtype.UUID `json:"issue_id"`
Priority int32 `json:"priority"`
Context []byte `json:"context"`
@ -208,6 +221,7 @@ type CreateAgentTaskWithContextParams struct {
func (q *Queries) CreateAgentTaskWithContext(ctx context.Context, arg CreateAgentTaskWithContextParams) (AgentTaskQueue, error) {
row := q.db.QueryRow(ctx, createAgentTaskWithContext,
arg.AgentID,
arg.RuntimeID,
arg.IssueID,
arg.Priority,
arg.Context,
@ -226,6 +240,7 @@ func (q *Queries) CreateAgentTaskWithContext(ctx context.Context, arg CreateAgen
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
)
return i, err
}
@ -239,22 +254,11 @@ func (q *Queries) DeleteAgent(ctx context.Context, id pgtype.UUID) error {
return err
}
const disconnectDaemon = `-- name: DisconnectDaemon :exec
UPDATE daemon_connection
SET status = 'disconnected', updated_at = now()
WHERE daemon_id = $1
`
func (q *Queries) DisconnectDaemon(ctx context.Context, daemonID string) error {
_, err := q.db.Exec(ctx, disconnectDaemon, daemonID)
return err
}
const failAgentTask = `-- name: FailAgentTask :one
UPDATE agent_task_queue
SET status = 'failed', completed_at = now(), error = $2
WHERE id = $1 AND status = 'running'
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id
`
type FailAgentTaskParams struct {
@ -278,12 +282,13 @@ func (q *Queries) FailAgentTask(ctx context.Context, arg FailAgentTaskParams) (A
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
)
return i, err
}
const getAgent = `-- name: GetAgent :one
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, skills, tools, triggers FROM agent
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, skills, tools, triggers, runtime_id FROM agent
WHERE id = $1
`
@ -307,12 +312,13 @@ func (q *Queries) GetAgent(ctx context.Context, id pgtype.UUID) (Agent, error) {
&i.Skills,
&i.Tools,
&i.Triggers,
&i.RuntimeID,
)
return i, err
}
const getAgentTask = `-- name: GetAgentTask :one
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id FROM agent_task_queue
WHERE id = $1
`
@ -332,12 +338,13 @@ func (q *Queries) GetAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQu
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
)
return i, err
}
const listAgentTasks = `-- name: ListAgentTasks :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id FROM agent_task_queue
WHERE agent_id = $1
ORDER BY created_at DESC
`
@ -364,6 +371,7 @@ func (q *Queries) ListAgentTasks(ctx context.Context, agentID pgtype.UUID) ([]Ag
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
); err != nil {
return nil, err
}
@ -376,7 +384,7 @@ func (q *Queries) ListAgentTasks(ctx context.Context, agentID pgtype.UUID) ([]Ag
}
const listAgents = `-- name: ListAgents :many
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, skills, tools, triggers FROM agent
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, skills, tools, triggers, runtime_id FROM agent
WHERE workspace_id = $1
ORDER BY created_at ASC
`
@ -407,6 +415,7 @@ func (q *Queries) ListAgents(ctx context.Context, workspaceID pgtype.UUID) ([]Ag
&i.Skills,
&i.Tools,
&i.Triggers,
&i.RuntimeID,
); err != nil {
return nil, err
}
@ -418,14 +427,14 @@ func (q *Queries) ListAgents(ctx context.Context, workspaceID pgtype.UUID) ([]Ag
return items, nil
}
const listPendingTasks = `-- name: ListPendingTasks :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context FROM agent_task_queue
WHERE agent_id = $1 AND status IN ('queued', 'dispatched')
const listPendingTasksByRuntime = `-- name: ListPendingTasksByRuntime :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id FROM agent_task_queue
WHERE runtime_id = $1 AND status IN ('queued', 'dispatched')
ORDER BY priority DESC, created_at ASC
`
func (q *Queries) ListPendingTasks(ctx context.Context, agentID pgtype.UUID) ([]AgentTaskQueue, error) {
rows, err := q.db.Query(ctx, listPendingTasks, agentID)
func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtype.UUID) ([]AgentTaskQueue, error) {
rows, err := q.db.Query(ctx, listPendingTasksByRuntime, runtimeID)
if err != nil {
return nil, err
}
@ -446,6 +455,7 @@ func (q *Queries) ListPendingTasks(ctx context.Context, agentID pgtype.UUID) ([]
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
); err != nil {
return nil, err
}
@ -461,7 +471,7 @@ const startAgentTask = `-- name: StartAgentTask :one
UPDATE agent_task_queue
SET status = 'running', started_at = now()
WHERE id = $1 AND status = 'dispatched'
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id
`
func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
@ -480,6 +490,7 @@ func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTask
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
)
return i, err
}
@ -490,15 +501,17 @@ UPDATE agent SET
description = COALESCE($3, description),
avatar_url = COALESCE($4, avatar_url),
runtime_config = COALESCE($5, runtime_config),
visibility = COALESCE($6, visibility),
status = COALESCE($7, status),
max_concurrent_tasks = COALESCE($8, max_concurrent_tasks),
skills = COALESCE($9, skills),
tools = COALESCE($10, tools),
triggers = COALESCE($11, triggers),
runtime_mode = COALESCE($6, runtime_mode),
runtime_id = COALESCE($7, runtime_id),
visibility = COALESCE($8, visibility),
status = COALESCE($9, status),
max_concurrent_tasks = COALESCE($10, max_concurrent_tasks),
skills = COALESCE($11, skills),
tools = COALESCE($12, tools),
triggers = COALESCE($13, triggers),
updated_at = now()
WHERE id = $1
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, skills, tools, triggers
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, skills, tools, triggers, runtime_id
`
type UpdateAgentParams struct {
@ -507,6 +520,8 @@ type UpdateAgentParams struct {
Description pgtype.Text `json:"description"`
AvatarUrl pgtype.Text `json:"avatar_url"`
RuntimeConfig []byte `json:"runtime_config"`
RuntimeMode pgtype.Text `json:"runtime_mode"`
RuntimeID pgtype.UUID `json:"runtime_id"`
Visibility pgtype.Text `json:"visibility"`
Status pgtype.Text `json:"status"`
MaxConcurrentTasks pgtype.Int4 `json:"max_concurrent_tasks"`
@ -522,6 +537,8 @@ func (q *Queries) UpdateAgent(ctx context.Context, arg UpdateAgentParams) (Agent
arg.Description,
arg.AvatarUrl,
arg.RuntimeConfig,
arg.RuntimeMode,
arg.RuntimeID,
arg.Visibility,
arg.Status,
arg.MaxConcurrentTasks,
@ -547,6 +564,7 @@ func (q *Queries) UpdateAgent(ctx context.Context, arg UpdateAgentParams) (Agent
&i.Skills,
&i.Tools,
&i.Triggers,
&i.RuntimeID,
)
return i, err
}
@ -554,7 +572,7 @@ func (q *Queries) UpdateAgent(ctx context.Context, arg UpdateAgentParams) (Agent
const updateAgentStatus = `-- name: UpdateAgentStatus :one
UPDATE agent SET status = $2, updated_at = now()
WHERE id = $1
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, skills, tools, triggers
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, skills, tools, triggers, runtime_id
`
type UpdateAgentStatusParams struct {
@ -582,52 +600,7 @@ func (q *Queries) UpdateAgentStatus(ctx context.Context, arg UpdateAgentStatusPa
&i.Skills,
&i.Tools,
&i.Triggers,
)
return i, err
}
const updateDaemonHeartbeat = `-- name: UpdateDaemonHeartbeat :exec
UPDATE daemon_connection
SET last_heartbeat_at = now(), updated_at = now()
WHERE daemon_id = $1 AND agent_id = $2
`
type UpdateDaemonHeartbeatParams struct {
DaemonID string `json:"daemon_id"`
AgentID pgtype.UUID `json:"agent_id"`
}
func (q *Queries) UpdateDaemonHeartbeat(ctx context.Context, arg UpdateDaemonHeartbeatParams) error {
_, err := q.db.Exec(ctx, updateDaemonHeartbeat, arg.DaemonID, arg.AgentID)
return err
}
const upsertDaemonConnection = `-- name: UpsertDaemonConnection :one
INSERT INTO daemon_connection (agent_id, daemon_id, status, last_heartbeat_at, runtime_info)
VALUES ($1, $2, 'connected', now(), $3)
ON CONFLICT ON CONSTRAINT uq_daemon_agent
DO UPDATE SET status = 'connected', last_heartbeat_at = now(), runtime_info = $3, updated_at = now()
RETURNING id, agent_id, daemon_id, status, last_heartbeat_at, runtime_info, created_at, updated_at
`
type UpsertDaemonConnectionParams struct {
AgentID pgtype.UUID `json:"agent_id"`
DaemonID string `json:"daemon_id"`
RuntimeInfo []byte `json:"runtime_info"`
}
func (q *Queries) UpsertDaemonConnection(ctx context.Context, arg UpsertDaemonConnectionParams) (DaemonConnection, error) {
row := q.db.QueryRow(ctx, upsertDaemonConnection, arg.AgentID, arg.DaemonID, arg.RuntimeInfo)
var i DaemonConnection
err := row.Scan(
&i.ID,
&i.AgentID,
&i.DaemonID,
&i.Status,
&i.LastHeartbeatAt,
&i.RuntimeInfo,
&i.CreatedAt,
&i.UpdatedAt,
&i.RuntimeID,
)
return i, err
}

View file

@ -36,6 +36,22 @@ type Agent struct {
Skills string `json:"skills"`
Tools []byte `json:"tools"`
Triggers []byte `json:"triggers"`
RuntimeID pgtype.UUID `json:"runtime_id"`
}
type AgentRuntime struct {
ID pgtype.UUID `json:"id"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
DaemonID pgtype.Text `json:"daemon_id"`
Name string `json:"name"`
RuntimeMode string `json:"runtime_mode"`
Provider string `json:"provider"`
Status string `json:"status"`
DeviceInfo string `json:"device_info"`
Metadata []byte `json:"metadata"`
LastSeenAt pgtype.Timestamptz `json:"last_seen_at"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
}
type AgentTaskQueue struct {
@ -51,6 +67,7 @@ type AgentTaskQueue struct {
Error pgtype.Text `json:"error"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
Context []byte `json:"context"`
RuntimeID pgtype.UUID `json:"runtime_id"`
}
type Comment struct {

View file

@ -0,0 +1,197 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.30.0
// source: runtime.sql
package db
import (
"context"
"github.com/jackc/pgx/v5/pgtype"
)
const getAgentRuntime = `-- name: GetAgentRuntime :one
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at FROM agent_runtime
WHERE id = $1
`
func (q *Queries) GetAgentRuntime(ctx context.Context, id pgtype.UUID) (AgentRuntime, error) {
row := q.db.QueryRow(ctx, getAgentRuntime, id)
var i AgentRuntime
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.DaemonID,
&i.Name,
&i.RuntimeMode,
&i.Provider,
&i.Status,
&i.DeviceInfo,
&i.Metadata,
&i.LastSeenAt,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const getAgentRuntimeForWorkspace = `-- name: GetAgentRuntimeForWorkspace :one
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at FROM agent_runtime
WHERE id = $1 AND workspace_id = $2
`
type GetAgentRuntimeForWorkspaceParams struct {
ID pgtype.UUID `json:"id"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
}
func (q *Queries) GetAgentRuntimeForWorkspace(ctx context.Context, arg GetAgentRuntimeForWorkspaceParams) (AgentRuntime, error) {
row := q.db.QueryRow(ctx, getAgentRuntimeForWorkspace, arg.ID, arg.WorkspaceID)
var i AgentRuntime
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.DaemonID,
&i.Name,
&i.RuntimeMode,
&i.Provider,
&i.Status,
&i.DeviceInfo,
&i.Metadata,
&i.LastSeenAt,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const listAgentRuntimes = `-- name: ListAgentRuntimes :many
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at FROM agent_runtime
WHERE workspace_id = $1
ORDER BY created_at ASC
`
func (q *Queries) ListAgentRuntimes(ctx context.Context, workspaceID pgtype.UUID) ([]AgentRuntime, error) {
rows, err := q.db.Query(ctx, listAgentRuntimes, workspaceID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []AgentRuntime{}
for rows.Next() {
var i AgentRuntime
if err := rows.Scan(
&i.ID,
&i.WorkspaceID,
&i.DaemonID,
&i.Name,
&i.RuntimeMode,
&i.Provider,
&i.Status,
&i.DeviceInfo,
&i.Metadata,
&i.LastSeenAt,
&i.CreatedAt,
&i.UpdatedAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const updateAgentRuntimeHeartbeat = `-- name: UpdateAgentRuntimeHeartbeat :one
UPDATE agent_runtime
SET status = 'online', last_seen_at = now(), updated_at = now()
WHERE id = $1
RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at
`
func (q *Queries) UpdateAgentRuntimeHeartbeat(ctx context.Context, id pgtype.UUID) (AgentRuntime, error) {
row := q.db.QueryRow(ctx, updateAgentRuntimeHeartbeat, id)
var i AgentRuntime
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.DaemonID,
&i.Name,
&i.RuntimeMode,
&i.Provider,
&i.Status,
&i.DeviceInfo,
&i.Metadata,
&i.LastSeenAt,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const upsertAgentRuntime = `-- name: UpsertAgentRuntime :one
INSERT INTO agent_runtime (
workspace_id,
daemon_id,
name,
runtime_mode,
provider,
status,
device_info,
metadata,
last_seen_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, now())
ON CONFLICT (workspace_id, daemon_id, provider)
DO UPDATE SET
name = EXCLUDED.name,
runtime_mode = EXCLUDED.runtime_mode,
status = EXCLUDED.status,
device_info = EXCLUDED.device_info,
metadata = EXCLUDED.metadata,
last_seen_at = now(),
updated_at = now()
RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at
`
type UpsertAgentRuntimeParams struct {
WorkspaceID pgtype.UUID `json:"workspace_id"`
DaemonID pgtype.Text `json:"daemon_id"`
Name string `json:"name"`
RuntimeMode string `json:"runtime_mode"`
Provider string `json:"provider"`
Status string `json:"status"`
DeviceInfo string `json:"device_info"`
Metadata []byte `json:"metadata"`
}
func (q *Queries) UpsertAgentRuntime(ctx context.Context, arg UpsertAgentRuntimeParams) (AgentRuntime, error) {
row := q.db.QueryRow(ctx, upsertAgentRuntime,
arg.WorkspaceID,
arg.DaemonID,
arg.Name,
arg.RuntimeMode,
arg.Provider,
arg.Status,
arg.DeviceInfo,
arg.Metadata,
)
var i AgentRuntime
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.DaemonID,
&i.Name,
&i.RuntimeMode,
&i.Provider,
&i.Status,
&i.DeviceInfo,
&i.Metadata,
&i.LastSeenAt,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}

View file

@ -10,9 +10,9 @@ WHERE id = $1;
-- name: CreateAgent :one
INSERT INTO agent (
workspace_id, name, description, avatar_url, runtime_mode,
runtime_config, visibility, max_concurrent_tasks, owner_id,
runtime_config, runtime_id, visibility, max_concurrent_tasks, owner_id,
skills, tools, triggers
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
RETURNING *;
-- name: UpdateAgent :one
@ -21,6 +21,8 @@ UPDATE agent SET
description = COALESCE(sqlc.narg('description'), description),
avatar_url = COALESCE(sqlc.narg('avatar_url'), avatar_url),
runtime_config = COALESCE(sqlc.narg('runtime_config'), runtime_config),
runtime_mode = COALESCE(sqlc.narg('runtime_mode'), runtime_mode),
runtime_id = COALESCE(sqlc.narg('runtime_id'), runtime_id),
visibility = COALESCE(sqlc.narg('visibility'), visibility),
status = COALESCE(sqlc.narg('status'), status),
max_concurrent_tasks = COALESCE(sqlc.narg('max_concurrent_tasks'), max_concurrent_tasks),
@ -40,8 +42,8 @@ WHERE agent_id = $1
ORDER BY created_at DESC;
-- name: CreateAgentTask :one
INSERT INTO agent_task_queue (agent_id, issue_id, status, priority)
VALUES ($1, $2, 'queued', $3)
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority)
VALUES ($1, $2, $3, 'queued', $4)
RETURNING *;
-- name: CancelAgentTasksByIssue :exec
@ -54,8 +56,8 @@ SELECT * FROM agent_task_queue
WHERE id = $1;
-- name: CreateAgentTaskWithContext :one
INSERT INTO agent_task_queue (agent_id, issue_id, status, priority, context)
VALUES ($1, $2, 'queued', $3, $4)
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, context)
VALUES ($1, $2, $3, 'queued', $4, $5)
RETURNING *;
-- name: ClaimAgentTask :one
@ -92,29 +94,12 @@ RETURNING *;
SELECT count(*) FROM agent_task_queue
WHERE agent_id = $1 AND status IN ('dispatched', 'running');
-- name: ListPendingTasks :many
-- name: ListPendingTasksByRuntime :many
SELECT * FROM agent_task_queue
WHERE agent_id = $1 AND status IN ('queued', 'dispatched')
WHERE runtime_id = $1 AND status IN ('queued', 'dispatched')
ORDER BY priority DESC, created_at ASC;
-- name: UpdateAgentStatus :one
UPDATE agent SET status = $2, updated_at = now()
WHERE id = $1
RETURNING *;
-- name: UpsertDaemonConnection :one
INSERT INTO daemon_connection (agent_id, daemon_id, status, last_heartbeat_at, runtime_info)
VALUES ($1, $2, 'connected', now(), $3)
ON CONFLICT ON CONSTRAINT uq_daemon_agent
DO UPDATE SET status = 'connected', last_heartbeat_at = now(), runtime_info = $3, updated_at = now()
RETURNING *;
-- name: UpdateDaemonHeartbeat :exec
UPDATE daemon_connection
SET last_heartbeat_at = now(), updated_at = now()
WHERE daemon_id = $1 AND agent_id = $2;
-- name: DisconnectDaemon :exec
UPDATE daemon_connection
SET status = 'disconnected', updated_at = now()
WHERE daemon_id = $1;

View file

@ -0,0 +1,41 @@
-- name: ListAgentRuntimes :many
SELECT * FROM agent_runtime
WHERE workspace_id = $1
ORDER BY created_at ASC;
-- name: GetAgentRuntime :one
SELECT * FROM agent_runtime
WHERE id = $1;
-- name: GetAgentRuntimeForWorkspace :one
SELECT * FROM agent_runtime
WHERE id = $1 AND workspace_id = $2;
-- name: UpsertAgentRuntime :one
INSERT INTO agent_runtime (
workspace_id,
daemon_id,
name,
runtime_mode,
provider,
status,
device_info,
metadata,
last_seen_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, now())
ON CONFLICT (workspace_id, daemon_id, provider)
DO UPDATE SET
name = EXCLUDED.name,
runtime_mode = EXCLUDED.runtime_mode,
status = EXCLUDED.status,
device_info = EXCLUDED.device_info,
metadata = EXCLUDED.metadata,
last_seen_at = now(),
updated_at = now()
RETURNING *;
-- name: UpdateAgentRuntimeHeartbeat :one
UPDATE agent_runtime
SET status = 'online', last_seen_at = now(), updated_at = now()
WHERE id = $1
RETURNING *;