From c6960d39b9ad35f0cddc68740dd72040b10a0365 Mon Sep 17 00:00:00 2001 From: Jiayuan Zhang Date: Tue, 24 Mar 2026 12:02:44 +0800 Subject: [PATCH 1/3] chore(claude): default to replacing stale flows --- CLAUDE.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CLAUDE.md b/CLAUDE.md index 87be1702..57c1342a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -54,6 +54,9 @@ docker compose down # Stop PostgreSQL - Go code follows standard Go conventions (gofmt, go vet). - Keep comments in code **English only**. - Prefer existing patterns/components over introducing parallel abstractions. +- Unless the user explicitly asks for backwards compatibility, do **not** add compatibility layers, fallback paths, dual-write logic, legacy adapters, or temporary shims. +- If a flow or API is being replaced and the product is not yet live, prefer removing the old path instead of preserving both old and new behavior. +- Treat compatibility code as a maintenance cost, not a default safety mechanism. Avoid "just in case" branches that make the codebase harder to reason about. - Avoid broad refactors unless required by the task. ## 5. UI/UX Rules From cdfa63af15aaf0373a4635a5904bf2010b8cc4e6 Mon Sep 17 00:00:00 2001 From: Jiayuan Zhang Date: Tue, 24 Mar 2026 12:03:14 +0800 Subject: [PATCH 2/3] feat(runtime): add local codex daemon pairing --- .env.example | 11 + Makefile | 2 +- README.md | 21 + apps/web/app/(auth)/login/page.test.tsx | 5 +- apps/web/app/(auth)/login/page.tsx | 4 +- apps/web/app/(dashboard)/agents/page.tsx | 103 +-- apps/web/app/(dashboard)/settings/page.tsx | 4 +- apps/web/app/pair/local/page.tsx | 155 ++++ apps/web/lib/auth-context.test.tsx | 1 + apps/web/lib/auth-context.tsx | 6 +- apps/web/test/helpers.tsx | 1 + packages/sdk/src/api-client.ts | 24 + packages/types/src/agent.ts | 14 +- packages/types/src/daemon.ts | 22 + packages/types/src/index.ts | 2 + server/cmd/daemon/daemon.go | 826 ++++++++++++++++++ server/cmd/daemon/daemon_test.go | 63 ++ server/cmd/daemon/main.go | 29 +- server/cmd/seed/main.go | 33 +- server/cmd/server/router.go | 18 +- server/internal/handler/agent.go | 41 +- server/internal/handler/daemon.go | 118 ++- server/internal/handler/daemon_pairing.go | 386 ++++++++ server/internal/handler/handler.go | 13 + server/internal/handler/issue.go | 102 ++- server/internal/handler/runtime.go | 68 ++ server/internal/service/task.go | 237 ++++- .../004_agent_runtime_loop.down.sql | 13 + .../migrations/004_agent_runtime_loop.up.sql | 92 ++ server/migrations/005_daemon_pairing.down.sql | 1 + server/migrations/005_daemon_pairing.up.sql | 20 + server/pkg/db/generated/agent.sql.go | 163 ++-- server/pkg/db/generated/models.go | 17 + server/pkg/db/generated/runtime.sql.go | 197 +++++ server/pkg/db/queries/agent.sql | 35 +- server/pkg/db/queries/runtime.sql | 41 + 36 files changed, 2579 insertions(+), 309 deletions(-) create mode 100644 apps/web/app/pair/local/page.tsx create mode 100644 packages/types/src/daemon.ts create mode 100644 server/cmd/daemon/daemon.go create mode 100644 server/cmd/daemon/daemon_test.go create mode 100644 server/internal/handler/daemon_pairing.go create mode 100644 server/internal/handler/runtime.go create mode 100644 server/migrations/004_agent_runtime_loop.down.sql create mode 100644 server/migrations/004_agent_runtime_loop.up.sql create mode 100644 server/migrations/005_daemon_pairing.down.sql create mode 100644 server/migrations/005_daemon_pairing.up.sql create mode 100644 server/pkg/db/generated/runtime.sql.go create mode 100644 server/pkg/db/queries/runtime.sql diff --git a/.env.example b/.env.example index 1d54a44d..3c410d41 100644 --- a/.env.example +++ b/.env.example @@ -10,6 +10,17 @@ DATABASE_URL=postgres://multica:multica@localhost:5432/multica?sslmode=disable PORT=8080 JWT_SECRET=change-me-in-production MULTICA_SERVER_URL=ws://localhost:8080/ws +MULTICA_APP_URL=http://localhost:3000 +MULTICA_DAEMON_CONFIG= +MULTICA_WORKSPACE_ID= +MULTICA_DAEMON_ID= +MULTICA_DAEMON_DEVICE_NAME= +MULTICA_DAEMON_POLL_INTERVAL=3s +MULTICA_DAEMON_HEARTBEAT_INTERVAL=15s +MULTICA_CODEX_PATH=codex +MULTICA_CODEX_MODEL= +MULTICA_CODEX_WORKDIR= +MULTICA_CODEX_TIMEOUT=20m # Google OAuth GOOGLE_CLIENT_ID= diff --git a/Makefile b/Makefile index 0b99e759..9fe220c2 100644 --- a/Makefile +++ b/Makefile @@ -113,7 +113,7 @@ dev: cd server && go run ./cmd/server daemon: - cd server && go run ./cmd/daemon + cd server && MULTICA_CODEX_WORKDIR="${MULTICA_CODEX_WORKDIR:-$(abspath .)}" go run ./cmd/daemon build: cd server && go build -o bin/server ./cmd/server diff --git a/README.md b/README.md index 3cc64671..e821dbf8 100644 --- a/README.md +++ b/README.md @@ -106,6 +106,27 @@ See [`.env.example`](.env.example) for all available variables: - `PORT` — Backend server port (default: 8080) - `FRONTEND_PORT` / `FRONTEND_ORIGIN` — Frontend port and browser origin - `JWT_SECRET` — JWT signing secret +- `MULTICA_APP_URL` — Browser origin used when generating local runtime pairing links +- `MULTICA_DAEMON_CONFIG` — Optional path for the daemon's persisted local config +- `MULTICA_WORKSPACE_ID` — Optional dev override for the workspace id; normal usage should rely on browser pairing instead +- `MULTICA_DAEMON_ID` / `MULTICA_DAEMON_DEVICE_NAME` — Stable daemon identity for local runtime registration +- `MULTICA_CODEX_PATH` / `MULTICA_CODEX_MODEL` — Codex executable and optional model override for local task execution +- `MULTICA_CODEX_WORKDIR` — Default working directory used by the local Codex runtime - `GOOGLE_CLIENT_ID` / `GOOGLE_CLIENT_SECRET` — Google OAuth (optional) - `NEXT_PUBLIC_API_URL` — Frontend → backend API URL - `NEXT_PUBLIC_WS_URL` — Frontend → backend WebSocket URL + +## Local Codex Daemon + +The local daemon currently supports one local runtime type: `codex`. + +1. Start the daemon with `make daemon`. +2. If the daemon does not already know its workspace, it prints a pairing link in the terminal. +3. Open that link in the browser, sign in, and choose the workspace that should own the local Codex runtime. +4. The daemon stores the approved workspace locally in `MULTICA_DAEMON_CONFIG` or `~/.multica/daemon.json`. +5. The daemon registers the local Codex runtime via `/api/daemon/register`. +6. Create an agent in Multica and bind it to that runtime. +7. Assign an issue to the agent and move the issue to `todo`. +8. The daemon claims the task, runs `codex exec`, and reports the final comment back to the issue. + +For local development you can still set `MULTICA_WORKSPACE_ID` directly to skip pairing, but that should be treated as a debug shortcut rather than the normal flow. diff --git a/apps/web/app/(auth)/login/page.test.tsx b/apps/web/app/(auth)/login/page.test.tsx index cb49d361..d9d97164 100644 --- a/apps/web/app/(auth)/login/page.test.tsx +++ b/apps/web/app/(auth)/login/page.test.tsx @@ -6,6 +6,7 @@ import userEvent from "@testing-library/user-event"; vi.mock("next/navigation", () => ({ useRouter: () => ({ push: vi.fn() }), usePathname: () => "/login", + useSearchParams: () => new URLSearchParams(), })); // Mock auth-context @@ -68,7 +69,7 @@ describe("LoginPage", () => { await user.click(screen.getByRole("button", { name: "Sign in" })); await waitFor(() => { - expect(mockLogin).toHaveBeenCalledWith("test@multica.ai", "Test User"); + expect(mockLogin).toHaveBeenCalledWith("test@multica.ai", "Test User", undefined); }); }); @@ -81,7 +82,7 @@ describe("LoginPage", () => { await user.click(screen.getByRole("button", { name: "Sign in" })); await waitFor(() => { - expect(mockLogin).toHaveBeenCalledWith("test@multica.ai", undefined); + expect(mockLogin).toHaveBeenCalledWith("test@multica.ai", undefined, undefined); }); }); diff --git a/apps/web/app/(auth)/login/page.tsx b/apps/web/app/(auth)/login/page.tsx index 49c50fc5..222891e8 100644 --- a/apps/web/app/(auth)/login/page.tsx +++ b/apps/web/app/(auth)/login/page.tsx @@ -1,10 +1,12 @@ "use client"; import { useState } from "react"; +import { useSearchParams } from "next/navigation"; import { useAuth } from "../../../lib/auth-context"; export default function LoginPage() { const { login, isLoading } = useAuth(); + const searchParams = useSearchParams(); const [email, setEmail] = useState(""); const [name, setName] = useState(""); const [error, setError] = useState(""); @@ -19,7 +21,7 @@ export default function LoginPage() { setError(""); setSubmitting(true); try { - await login(email, name || undefined); + await login(email, name || undefined, searchParams.get("next") || undefined); } catch (err) { setError("Login failed. Make sure the server is running."); setSubmitting(false); diff --git a/apps/web/app/(dashboard)/agents/page.tsx b/apps/web/app/(dashboard)/agents/page.tsx index 4517f1f7..8879788d 100644 --- a/apps/web/app/(dashboard)/agents/page.tsx +++ b/apps/web/app/(dashboard)/agents/page.tsx @@ -72,50 +72,8 @@ function generateId(): string { return `${Date.now()}-${Math.random().toString(36).slice(2, 9)}`; } -// --------------------------------------------------------------------------- -// Mock Runtime Devices (will be replaced with real daemon registration API) -// --------------------------------------------------------------------------- - -const MOCK_RUNTIME_DEVICES: RuntimeDevice[] = [ - { - id: "runtime-cloud", - name: "Multica Agent", - runtime_mode: "cloud", - status: "online", - device_info: "Cloud", - }, - { - id: "runtime-macbook", - name: "Jiayuan's MacBook Pro", - runtime_mode: "local", - status: "online", - device_info: "macOS 15.4 · Claude Code v1.2", - }, - { - id: "runtime-linux", - name: "Dev Server (gpu-01)", - runtime_mode: "local", - status: "online", - device_info: "Ubuntu 24.04 · Codex v0.8", - }, - { - id: "runtime-ci", - name: "CI Runner", - runtime_mode: "local", - status: "offline", - device_info: "Linux · GitHub Actions", - }, -]; - -function getRuntimeDevice(agent: Agent): RuntimeDevice | undefined { - const runtimeId = agent.runtime_config?.runtime_id as string | undefined; - if (runtimeId) { - return MOCK_RUNTIME_DEVICES.find((d) => d.id === runtimeId); - } - if (agent.runtime_mode === "cloud") { - return MOCK_RUNTIME_DEVICES.find((d) => d.runtime_mode === "cloud"); - } - return undefined; +function getRuntimeDevice(agent: Agent, runtimes: RuntimeDevice[]): RuntimeDevice | undefined { + return runtimes.find((runtime) => runtime.id === agent.runtime_id); } // --------------------------------------------------------------------------- @@ -123,32 +81,36 @@ function getRuntimeDevice(agent: Agent): RuntimeDevice | undefined { // --------------------------------------------------------------------------- function CreateAgentDialog({ + runtimes, onClose, onCreate, }: { + runtimes: RuntimeDevice[]; onClose: () => void; onCreate: (data: CreateAgentRequest) => Promise; }) { const [name, setName] = useState(""); const [description, setDescription] = useState(""); - const [selectedRuntimeId, setSelectedRuntimeId] = useState(MOCK_RUNTIME_DEVICES[0]!.id); + const [selectedRuntimeId, setSelectedRuntimeId] = useState(runtimes[0]?.id ?? ""); const [creating, setCreating] = useState(false); const [runtimeOpen, setRuntimeOpen] = useState(false); - const selectedRuntime = MOCK_RUNTIME_DEVICES.find((d) => d.id === selectedRuntimeId)!; + useEffect(() => { + if (!selectedRuntimeId && runtimes[0]) { + setSelectedRuntimeId(runtimes[0].id); + } + }, [runtimes, selectedRuntimeId]); + + const selectedRuntime = runtimes.find((d) => d.id === selectedRuntimeId) ?? null; const handleSubmit = async () => { - if (!name.trim()) return; + if (!name.trim() || !selectedRuntime) return; setCreating(true); try { await onCreate({ name: name.trim(), description: description.trim(), - runtime_mode: selectedRuntime.runtime_mode, - runtime_config: { - runtime_id: selectedRuntime.id, - runtime_name: selectedRuntime.name, - }, + runtime_id: selectedRuntime.id, triggers: [{ id: generateId(), type: "on_assign", enabled: true, config: {} }], }); onClose(); @@ -205,23 +167,28 @@ function CreateAgentDialog({ @@ -230,7 +197,7 @@ function CreateAgentDialog({ <>
setRuntimeOpen(false)} />
- {MOCK_RUNTIME_DEVICES.map((device) => ( + {runtimes.map((device) => (
diff --git a/apps/web/app/pair/local/page.tsx b/apps/web/app/pair/local/page.tsx new file mode 100644 index 00000000..79234b5b --- /dev/null +++ b/apps/web/app/pair/local/page.tsx @@ -0,0 +1,155 @@ +"use client"; + +import Link from "next/link"; +import { useEffect, useMemo, useState } from "react"; +import { useSearchParams } from "next/navigation"; +import type { DaemonPairingSession } from "@multica/types"; +import { api } from "../../../lib/api"; +import { useAuth } from "../../../lib/auth-context"; + +function formatExpiresAt(value: string) { + return new Date(value).toLocaleString("en-US", { + month: "short", + day: "numeric", + hour: "numeric", + minute: "2-digit", + }); +} + +export default function LocalDaemonPairPage() { + const searchParams = useSearchParams(); + const token = searchParams.get("token") ?? ""; + const { user, workspaces, workspace, isLoading } = useAuth(); + const [session, setSession] = useState(null); + const [selectedWorkspaceId, setSelectedWorkspaceId] = useState(""); + const [loading, setLoading] = useState(true); + const [submitting, setSubmitting] = useState(false); + const [error, setError] = useState(""); + + const nextLoginURL = useMemo(() => { + const next = `/pair/local?token=${encodeURIComponent(token)}`; + return `/login?next=${encodeURIComponent(next)}`; + }, [token]); + + useEffect(() => { + if (!token) { + setError("Missing pairing token."); + setLoading(false); + return; + } + + setLoading(true); + api.getDaemonPairingSession(token) + .then((value) => { + setSession(value); + setSelectedWorkspaceId(value.workspace_id || workspace?.id || workspaces[0]?.id || ""); + }) + .catch((err) => setError(err instanceof Error ? err.message : "Failed to load pairing session.")) + .finally(() => setLoading(false)); + }, [token, workspace?.id, workspaces]); + + const approve = async () => { + if (!token || !selectedWorkspaceId) return; + setSubmitting(true); + setError(""); + try { + const approved = await api.approveDaemonPairingSession(token, { + workspace_id: selectedWorkspaceId, + }); + setSession(approved); + } catch (err) { + setError(err instanceof Error ? err.message : "Failed to approve pairing session."); + } finally { + setSubmitting(false); + } + }; + + return ( +
+
+
+

Connect Local Codex Runtime

+

+ Approve this pairing request to register your local Codex runtime with a workspace. +

+
+ + {loading || isLoading ? ( +
Loading pairing session...
+ ) : error ? ( +
+ {error} +
+ ) : session ? ( + <> +
+
{session.runtime_name}
+
+ {session.device_name} + {session.runtime_version ? ` · ${session.runtime_version}` : ""} +
+
+ {session.runtime_type} +
+
+ Expires {formatExpiresAt(session.expires_at)} +
+
+ + {!user ? ( +
+

+ Sign in first, then choose which workspace should own this local runtime. +

+ + Sign in to continue + +
+ ) : session.status === "approved" || session.status === "claimed" ? ( +
+ This runtime is linked to a workspace. Return to the daemon window to finish setup. +
+ ) : session.status === "expired" ? ( +
+ This pairing link expired. Restart the daemon to generate a new link. +
+ ) : workspaces.length === 0 ? ( +
+ You do not have a workspace yet. Create one first, then reopen this pairing link. +
+ ) : ( +
+
+ + +
+ + +
+ )} + + ) : null} +
+
+ ); +} diff --git a/apps/web/lib/auth-context.test.tsx b/apps/web/lib/auth-context.test.tsx index 5da73903..cb8b3763 100644 --- a/apps/web/lib/auth-context.test.tsx +++ b/apps/web/lib/auth-context.test.tsx @@ -76,6 +76,7 @@ const mockAgents: Agent[] = [ { id: "agent-1", workspace_id: "ws-1", + runtime_id: "runtime-1", name: "Claude", description: "", avatar_url: null, diff --git a/apps/web/lib/auth-context.tsx b/apps/web/lib/auth-context.tsx index 5b64a016..280812c6 100644 --- a/apps/web/lib/auth-context.tsx +++ b/apps/web/lib/auth-context.tsx @@ -19,7 +19,7 @@ interface AuthContextValue { members: MemberWithUser[]; agents: Agent[]; isLoading: boolean; - login: (email: string, name?: string) => Promise; + login: (email: string, name?: string, redirectTo?: string) => Promise; logout: () => void; switchWorkspace: (workspaceId: string) => Promise; createWorkspace: (data: { name: string; slug: string; description?: string }) => Promise; @@ -121,7 +121,7 @@ export function AuthProvider({ children }: { children: ReactNode }) { })(); }, [hydrateWorkspace]); - const login = useCallback(async (email: string, name?: string) => { + const login = useCallback(async (email: string, name?: string, redirectTo?: string) => { const { token, user: u } = await api.login(email, name); api.setToken(token); localStorage.setItem("multica_token", token); @@ -130,7 +130,7 @@ export function AuthProvider({ children }: { children: ReactNode }) { const wsList = await api.listWorkspaces(); await hydrateWorkspace(wsList); - router.push("/issues"); + router.push(redirectTo || "/issues"); }, [hydrateWorkspace, router]); const logout = useCallback(() => { diff --git a/apps/web/test/helpers.tsx b/apps/web/test/helpers.tsx index 169ffbbb..fd03c4b7 100644 --- a/apps/web/test/helpers.tsx +++ b/apps/web/test/helpers.tsx @@ -43,6 +43,7 @@ export const mockAgents: Agent[] = [ { id: "agent-1", workspace_id: "ws-1", + runtime_id: "runtime-1", name: "Claude Agent", description: "", avatar_url: null, diff --git a/packages/sdk/src/api-client.ts b/packages/sdk/src/api-client.ts index 6e7a2385..c2bdadbd 100644 --- a/packages/sdk/src/api-client.ts +++ b/packages/sdk/src/api-client.ts @@ -11,6 +11,9 @@ import type { CreateAgentRequest, UpdateAgentRequest, AgentTask, + AgentRuntime, + DaemonPairingSession, + ApproveDaemonPairingSessionRequest, InboxItem, Comment, Workspace, @@ -187,10 +190,31 @@ export class ApiClient { await this.fetch(`/api/agents/${id}`, { method: "DELETE" }); } + async listRuntimes(params?: { workspace_id?: string }): Promise { + const search = new URLSearchParams(); + const wsId = params?.workspace_id ?? this.workspaceId; + if (wsId) search.set("workspace_id", wsId); + return this.fetch(`/api/runtimes?${search}`); + } + async listAgentTasks(agentId: string): Promise { return this.fetch(`/api/agents/${agentId}/tasks`); } + async getDaemonPairingSession(token: string): Promise { + return this.fetch(`/api/daemon/pairing-sessions/${token}`); + } + + async approveDaemonPairingSession( + token: string, + data: ApproveDaemonPairingSessionRequest, + ): Promise { + return this.fetch(`/api/daemon/pairing-sessions/${token}/approve`, { + method: "POST", + body: JSON.stringify(data), + }); + } + // Inbox async listInbox(): Promise { return this.fetch("/api/inbox"); diff --git a/packages/types/src/agent.ts b/packages/types/src/agent.ts index dbd2b46e..2a83c2b6 100644 --- a/packages/types/src/agent.ts +++ b/packages/types/src/agent.ts @@ -8,12 +8,21 @@ export type AgentTriggerType = "on_assign" | "scheduled"; export interface RuntimeDevice { id: string; + workspace_id: string; + daemon_id: string | null; name: string; runtime_mode: AgentRuntimeMode; + provider: string; status: "online" | "offline"; device_info: string; + metadata: Record; + last_seen_at: string | null; + created_at: string; + updated_at: string; } +export type AgentRuntime = RuntimeDevice; + export interface AgentTool { id: string; name: string; @@ -33,6 +42,7 @@ export interface AgentTrigger { export interface AgentTask { id: string; agent_id: string; + runtime_id: string; issue_id: string; status: "queued" | "dispatched" | "running" | "completed" | "failed" | "cancelled"; priority: number; @@ -47,6 +57,7 @@ export interface AgentTask { export interface Agent { id: string; workspace_id: string; + runtime_id: string; name: string; description: string; avatar_url: string | null; @@ -67,7 +78,7 @@ export interface CreateAgentRequest { name: string; description?: string; avatar_url?: string; - runtime_mode?: AgentRuntimeMode; + runtime_id: string; runtime_config?: Record; visibility?: AgentVisibility; max_concurrent_tasks?: number; @@ -80,6 +91,7 @@ export interface UpdateAgentRequest { name?: string; description?: string; avatar_url?: string; + runtime_id?: string; runtime_config?: Record; visibility?: AgentVisibility; status?: AgentStatus; diff --git a/packages/types/src/daemon.ts b/packages/types/src/daemon.ts new file mode 100644 index 00000000..459a67a5 --- /dev/null +++ b/packages/types/src/daemon.ts @@ -0,0 +1,22 @@ +export type DaemonPairingSessionStatus = "pending" | "approved" | "claimed" | "expired"; + +export interface DaemonPairingSession { + token: string; + daemon_id: string; + device_name: string; + runtime_name: string; + runtime_type: string; + runtime_version: string; + workspace_id: string | null; + status: DaemonPairingSessionStatus; + approved_at: string | null; + claimed_at: string | null; + expires_at: string; + created_at: string; + updated_at: string; + link_url?: string | null; +} + +export interface ApproveDaemonPairingSessionRequest { + workspace_id: string; +} diff --git a/packages/types/src/index.ts b/packages/types/src/index.ts index 3834a5d0..d1788e69 100644 --- a/packages/types/src/index.ts +++ b/packages/types/src/index.ts @@ -8,6 +8,7 @@ export type { AgentTool, AgentTrigger, AgentTask, + AgentRuntime, RuntimeDevice, CreateAgentRequest, UpdateAgentRequest, @@ -15,5 +16,6 @@ export type { export type { Workspace, Member, MemberRole, User, MemberWithUser } from "./workspace.js"; export type { InboxItem, InboxSeverity, InboxItemType } from "./inbox.js"; export type { Comment, CommentType, CommentAuthorType } from "./comment.js"; +export type { DaemonPairingSession, DaemonPairingSessionStatus, ApproveDaemonPairingSessionRequest } from "./daemon.js"; export type * from "./events.js"; export type * from "./api.js"; diff --git a/server/cmd/daemon/daemon.go b/server/cmd/daemon/daemon.go new file mode 100644 index 00000000..3723d338 --- /dev/null +++ b/server/cmd/daemon/daemon.go @@ -0,0 +1,826 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "net/http" + "net/url" + "os" + "os/exec" + "path/filepath" + "strings" + "time" +) + +const ( + defaultServerURL = "ws://localhost:8080/ws" + defaultDaemonConfigPath = ".multica/daemon.json" + defaultPollInterval = 3 * time.Second + defaultHeartbeatInterval = 15 * time.Second + defaultCodexTimeout = 20 * time.Minute + defaultRuntimeName = "Local Codex" + defaultCodexPath = "codex" +) + +type config struct { + ServerBaseURL string + ConfigPath string + WorkspaceID string + DaemonID string + DeviceName string + RuntimeName string + CodexPath string + CodexModel string + DefaultWorkdir string + PollInterval time.Duration + HeartbeatInterval time.Duration + CodexTimeout time.Duration +} + +type daemon struct { + cfg config + client *daemonClient + logger *log.Logger +} + +type daemonClient struct { + baseURL string + client *http.Client +} + +type daemonRuntime struct { + ID string `json:"id"` + Name string `json:"name"` + Provider string `json:"provider"` + Status string `json:"status"` +} + +type daemonPairingSession struct { + Token string `json:"token"` + DaemonID string `json:"daemon_id"` + DeviceName string `json:"device_name"` + RuntimeName string `json:"runtime_name"` + RuntimeType string `json:"runtime_type"` + RuntimeVersion string `json:"runtime_version"` + WorkspaceID *string `json:"workspace_id"` + Status string `json:"status"` + ApprovedAt *string `json:"approved_at"` + ClaimedAt *string `json:"claimed_at"` + ExpiresAt string `json:"expires_at"` + LinkURL *string `json:"link_url"` +} + +type daemonPersistedConfig struct { + WorkspaceID string `json:"workspace_id"` +} + +type daemonTask struct { + ID string `json:"id"` + AgentID string `json:"agent_id"` + IssueID string `json:"issue_id"` + Context daemonTaskContext `json:"context"` +} + +type daemonTaskContext struct { + Issue daemonIssueContext `json:"issue"` + Agent daemonAgentContext `json:"agent"` + Runtime daemonRuntimeContext `json:"runtime"` +} + +type daemonIssueContext struct { + ID string `json:"id"` + Title string `json:"title"` + Description string `json:"description"` + AcceptanceCriteria []string `json:"acceptance_criteria"` + ContextRefs []string `json:"context_refs"` + Repository *daemonRepoRef `json:"repository"` +} + +type daemonAgentContext struct { + ID string `json:"id"` + Name string `json:"name"` + Skills string `json:"skills"` +} + +type daemonRuntimeContext struct { + ID string `json:"id"` + Name string `json:"name"` + Provider string `json:"provider"` + DeviceInfo string `json:"device_info"` +} + +type daemonRepoRef struct { + URL string `json:"url"` + Branch string `json:"branch"` + Path string `json:"path"` +} + +type codexTaskResult struct { + Status string `json:"status"` + Comment string `json:"comment"` +} + +func loadConfig() (config, error) { + serverBaseURL, err := normalizeServerBaseURL(envOrDefault("MULTICA_SERVER_URL", defaultServerURL)) + if err != nil { + return config{}, err + } + + configPath, err := resolveDaemonConfigPath(strings.TrimSpace(os.Getenv("MULTICA_DAEMON_CONFIG"))) + if err != nil { + return config{}, err + } + persisted, err := loadPersistedDaemonConfig(configPath) + if err != nil { + return config{}, err + } + workspaceID := strings.TrimSpace(os.Getenv("MULTICA_WORKSPACE_ID")) + if workspaceID == "" { + workspaceID = persisted.WorkspaceID + } + + codexPath := envOrDefault("MULTICA_CODEX_PATH", defaultCodexPath) + if _, err := exec.LookPath(codexPath); err != nil { + return config{}, fmt.Errorf("codex executable not found at %q: %w", codexPath, err) + } + + host, err := os.Hostname() + if err != nil || strings.TrimSpace(host) == "" { + host = "local-machine" + } + + defaultWorkdir := strings.TrimSpace(os.Getenv("MULTICA_CODEX_WORKDIR")) + if defaultWorkdir == "" { + defaultWorkdir, err = os.Getwd() + if err != nil { + return config{}, fmt.Errorf("resolve working directory: %w", err) + } + } + defaultWorkdir, err = filepath.Abs(defaultWorkdir) + if err != nil { + return config{}, fmt.Errorf("resolve absolute workdir: %w", err) + } + + pollInterval, err := durationFromEnv("MULTICA_DAEMON_POLL_INTERVAL", defaultPollInterval) + if err != nil { + return config{}, err + } + heartbeatInterval, err := durationFromEnv("MULTICA_DAEMON_HEARTBEAT_INTERVAL", defaultHeartbeatInterval) + if err != nil { + return config{}, err + } + codexTimeout, err := durationFromEnv("MULTICA_CODEX_TIMEOUT", defaultCodexTimeout) + if err != nil { + return config{}, err + } + + return config{ + ServerBaseURL: serverBaseURL, + ConfigPath: configPath, + WorkspaceID: workspaceID, + DaemonID: envOrDefault("MULTICA_DAEMON_ID", host), + DeviceName: envOrDefault("MULTICA_DAEMON_DEVICE_NAME", host), + RuntimeName: envOrDefault("MULTICA_CODEX_RUNTIME_NAME", defaultRuntimeName), + CodexPath: codexPath, + CodexModel: strings.TrimSpace(os.Getenv("MULTICA_CODEX_MODEL")), + DefaultWorkdir: defaultWorkdir, + PollInterval: pollInterval, + HeartbeatInterval: heartbeatInterval, + CodexTimeout: codexTimeout, + }, nil +} + +func newDaemon(cfg config, logger *log.Logger) *daemon { + return &daemon{ + cfg: cfg, + client: &daemonClient{baseURL: cfg.ServerBaseURL, client: &http.Client{Timeout: 30 * time.Second}}, + logger: logger, + } +} + +func (d *daemon) run(ctx context.Context) error { + d.logger.Printf("starting daemon for workspace=%s server=%s runtime=%s workdir=%s", + d.cfg.WorkspaceID, d.cfg.ServerBaseURL, d.cfg.RuntimeName, d.cfg.DefaultWorkdir) + + if strings.TrimSpace(d.cfg.WorkspaceID) == "" { + workspaceID, err := d.ensurePaired(ctx) + if err != nil { + return err + } + d.cfg.WorkspaceID = workspaceID + d.logger.Printf("pairing completed for workspace=%s", workspaceID) + } + + runtime, err := d.registerRuntime(ctx) + if err != nil { + return err + } + d.logger.Printf("registered runtime id=%s provider=%s status=%s", runtime.ID, runtime.Provider, runtime.Status) + + go d.heartbeatLoop(ctx, runtime.ID) + return d.pollLoop(ctx, runtime.ID) +} + +func (d *daemon) registerRuntime(ctx context.Context) (daemonRuntime, error) { + version, err := detectCodexVersion(ctx, d.cfg.CodexPath) + if err != nil { + return daemonRuntime{}, err + } + + req := map[string]any{ + "workspace_id": d.cfg.WorkspaceID, + "daemon_id": d.cfg.DaemonID, + "device_name": d.cfg.DeviceName, + "runtimes": []map[string]string{ + { + "name": d.cfg.RuntimeName, + "type": "codex", + "version": version, + "status": "online", + }, + }, + } + + var resp struct { + Runtimes []daemonRuntime `json:"runtimes"` + } + if err := d.client.postJSON(ctx, "/api/daemon/register", req, &resp); err != nil { + return daemonRuntime{}, fmt.Errorf("register runtime: %w", err) + } + if len(resp.Runtimes) == 0 { + return daemonRuntime{}, fmt.Errorf("register runtime: empty response") + } + return resp.Runtimes[0], nil +} + +func (d *daemon) ensurePaired(ctx context.Context) (string, error) { + version, err := detectCodexVersion(ctx, d.cfg.CodexPath) + if err != nil { + return "", err + } + + session, err := d.client.createPairingSession(ctx, map[string]string{ + "daemon_id": d.cfg.DaemonID, + "device_name": d.cfg.DeviceName, + "runtime_name": d.cfg.RuntimeName, + "runtime_type": "codex", + "runtime_version": version, + }) + if err != nil { + return "", fmt.Errorf("create pairing session: %w", err) + } + if session.LinkURL != nil { + d.logger.Printf("open this link to pair the local Codex runtime: %s", *session.LinkURL) + } else { + d.logger.Printf("pairing session created: %s", session.Token) + } + + for { + select { + case <-ctx.Done(): + return "", ctx.Err() + default: + } + + current, err := d.client.getPairingSession(ctx, session.Token) + if err != nil { + return "", fmt.Errorf("poll pairing session: %w", err) + } + + switch current.Status { + case "approved", "claimed": + if current.WorkspaceID == nil || strings.TrimSpace(*current.WorkspaceID) == "" { + return "", fmt.Errorf("pairing session approved without workspace") + } + if err := savePersistedDaemonConfig(d.cfg.ConfigPath, daemonPersistedConfig{ + WorkspaceID: strings.TrimSpace(*current.WorkspaceID), + }); err != nil { + return "", err + } + if current.Status != "claimed" { + if _, err := d.client.claimPairingSession(ctx, current.Token); err != nil { + return "", fmt.Errorf("claim pairing session: %w", err) + } + } + return strings.TrimSpace(*current.WorkspaceID), nil + case "expired": + return "", fmt.Errorf("pairing session expired before approval") + } + + if err := sleepWithContext(ctx, d.cfg.PollInterval); err != nil { + return "", err + } + } +} + +func (d *daemon) heartbeatLoop(ctx context.Context, runtimeID string) { + ticker := time.NewTicker(d.cfg.HeartbeatInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + err := d.client.postJSON(ctx, "/api/daemon/heartbeat", map[string]string{ + "runtime_id": runtimeID, + }, nil) + if err != nil { + d.logger.Printf("heartbeat failed: %v", err) + } + } + } +} + +func (d *daemon) pollLoop(ctx context.Context, runtimeID string) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + task, err := d.client.claimTask(ctx, runtimeID) + if err != nil { + d.logger.Printf("claim task failed: %v", err) + if err := sleepWithContext(ctx, d.cfg.PollInterval); err != nil { + return err + } + continue + } + if task == nil { + if err := sleepWithContext(ctx, d.cfg.PollInterval); err != nil { + return err + } + continue + } + + d.handleTask(ctx, *task) + } +} + +func (d *daemon) handleTask(ctx context.Context, task daemonTask) { + d.logger.Printf("picked task=%s issue=%s title=%q", task.ID, task.IssueID, task.Context.Issue.Title) + + if err := d.client.startTask(ctx, task.ID); err != nil { + d.logger.Printf("start task %s failed: %v", task.ID, err) + return + } + + _ = d.client.reportProgress(ctx, task.ID, "Launching Codex", 1, 2) + + result, err := d.runTask(ctx, task) + if err != nil { + d.logger.Printf("task %s failed: %v", task.ID, err) + if failErr := d.client.failTask(ctx, task.ID, err.Error()); failErr != nil { + d.logger.Printf("fail task %s callback failed: %v", task.ID, failErr) + } + return + } + + _ = d.client.reportProgress(ctx, task.ID, "Finishing task", 2, 2) + + switch result.Status { + case "blocked": + if err := d.client.failTask(ctx, task.ID, result.Comment); err != nil { + d.logger.Printf("report blocked task %s failed: %v", task.ID, err) + } + default: + if err := d.client.completeTask(ctx, task.ID, result.Comment); err != nil { + d.logger.Printf("complete task %s failed: %v", task.ID, err) + } + } +} + +func (d *daemon) runTask(ctx context.Context, task daemonTask) (codexTaskResult, error) { + workdir, err := resolveTaskWorkdir(d.cfg.DefaultWorkdir, task.Context.Issue.Repository) + if err != nil { + return codexTaskResult{}, err + } + + prompt := buildCodexPrompt(task, workdir) + runCtx, cancel := context.WithTimeout(ctx, d.cfg.CodexTimeout) + defer cancel() + + model := d.cfg.CodexModel + if model == "" { + model = "default" + } + + startedAt := time.Now() + d.logger.Printf( + "starting codex exec task=%s workdir=%s model=%s timeout=%s", + task.ID, + workdir, + model, + d.cfg.CodexTimeout, + ) + + result, err := runCodexExec(runCtx, d.cfg, workdir, prompt) + if err != nil { + d.logger.Printf( + "codex exec failed task=%s duration=%s err=%v", + task.ID, + time.Since(startedAt).Round(time.Millisecond), + err, + ) + if errors.Is(runCtx.Err(), context.DeadlineExceeded) { + return codexTaskResult{}, fmt.Errorf("Codex timed out after %s", d.cfg.CodexTimeout) + } + return codexTaskResult{}, err + } + + d.logger.Printf( + "codex exec finished task=%s duration=%s status=%s", + task.ID, + time.Since(startedAt).Round(time.Millisecond), + result.Status, + ) + return result, nil +} + +func runCodexExec(ctx context.Context, cfg config, workdir, prompt string) (codexTaskResult, error) { + outputFile, err := os.CreateTemp("", "multica-codex-output-*.json") + if err != nil { + return codexTaskResult{}, fmt.Errorf("create codex output file: %w", err) + } + outputPath := outputFile.Name() + outputFile.Close() + defer os.Remove(outputPath) + + schemaFile, err := os.CreateTemp("", "multica-codex-schema-*.json") + if err != nil { + return codexTaskResult{}, fmt.Errorf("create schema file: %w", err) + } + schemaPath := schemaFile.Name() + if _, err := schemaFile.WriteString(codexResultSchema); err != nil { + schemaFile.Close() + return codexTaskResult{}, fmt.Errorf("write schema file: %w", err) + } + schemaFile.Close() + defer os.Remove(schemaPath) + + args := []string{ + "-a", "never", + "exec", + "--skip-git-repo-check", + "--sandbox", "workspace-write", + "-C", workdir, + "--output-schema", schemaPath, + "-o", outputPath, + prompt, + } + if cfg.CodexModel != "" { + args = append([]string{"-m", cfg.CodexModel}, args...) + } + + cmd := exec.CommandContext(ctx, cfg.CodexPath, args...) + var output bytes.Buffer + cmd.Stdout = &output + cmd.Stderr = &output + + if err := cmd.Run(); err != nil { + return codexTaskResult{}, fmt.Errorf("codex exec failed: %w\n%s", err, strings.TrimSpace(output.String())) + } + + data, err := os.ReadFile(outputPath) + if err != nil { + return codexTaskResult{}, fmt.Errorf("read codex result: %w", err) + } + + var result codexTaskResult + if err := json.Unmarshal(data, &result); err != nil { + return codexTaskResult{}, fmt.Errorf("parse codex result: %w", err) + } + if result.Comment == "" { + return codexTaskResult{}, fmt.Errorf("codex returned empty comment") + } + if result.Status == "" { + result.Status = "completed" + } + + return result, nil +} + +func buildCodexPrompt(task daemonTask, workdir string) string { + var b strings.Builder + b.WriteString("You are running as the local Codex runtime for a Multica agent.\n") + b.WriteString("Complete the assigned issue using the local environment.\n") + b.WriteString("Return a concise Markdown comment suitable for posting back to the issue.\n") + b.WriteString("If you cannot complete the task because context, files, or permissions are missing, return status \"blocked\" and explain the blocker in the comment.\n\n") + + fmt.Fprintf(&b, "Working directory: %s\n", workdir) + fmt.Fprintf(&b, "Agent: %s\n", task.Context.Agent.Name) + fmt.Fprintf(&b, "Issue title: %s\n\n", task.Context.Issue.Title) + + if task.Context.Issue.Description != "" { + b.WriteString("Issue description:\n") + b.WriteString(task.Context.Issue.Description) + b.WriteString("\n\n") + } + + if len(task.Context.Issue.AcceptanceCriteria) > 0 { + b.WriteString("Acceptance criteria:\n") + for _, item := range task.Context.Issue.AcceptanceCriteria { + fmt.Fprintf(&b, "- %s\n", item) + } + b.WriteString("\n") + } + + if len(task.Context.Issue.ContextRefs) > 0 { + b.WriteString("Context refs:\n") + for _, item := range task.Context.Issue.ContextRefs { + fmt.Fprintf(&b, "- %s\n", item) + } + b.WriteString("\n") + } + + if repo := task.Context.Issue.Repository; repo != nil { + b.WriteString("Repository context:\n") + if repo.URL != "" { + fmt.Fprintf(&b, "- url: %s\n", repo.URL) + } + if repo.Branch != "" { + fmt.Fprintf(&b, "- branch: %s\n", repo.Branch) + } + if repo.Path != "" { + fmt.Fprintf(&b, "- path: %s\n", repo.Path) + } + b.WriteString("\n") + } + + if task.Context.Agent.Skills != "" { + b.WriteString("Agent skills/instructions:\n") + b.WriteString(task.Context.Agent.Skills) + b.WriteString("\n\n") + } + + b.WriteString("Comment requirements:\n") + b.WriteString("- Lead with the outcome.\n") + b.WriteString("- Mention concrete files or commands if you changed anything.\n") + b.WriteString("- Mention blockers or follow-up actions if relevant.\n") + + return b.String() +} + +func resolveTaskWorkdir(defaultWorkdir string, repo *daemonRepoRef) (string, error) { + base := defaultWorkdir + if repo == nil || strings.TrimSpace(repo.Path) == "" { + return base, nil + } + + path := strings.TrimSpace(repo.Path) + if !filepath.IsAbs(path) { + path = filepath.Join(base, path) + } + path = filepath.Clean(path) + + info, err := os.Stat(path) + if err != nil { + return "", fmt.Errorf("repository path not found: %s", path) + } + if !info.IsDir() { + return "", fmt.Errorf("repository path is not a directory: %s", path) + } + return path, nil +} + +func detectCodexVersion(ctx context.Context, codexPath string) (string, error) { + cmd := exec.CommandContext(ctx, codexPath, "--version") + data, err := cmd.Output() + if err != nil { + return "", fmt.Errorf("detect codex version: %w", err) + } + return strings.TrimSpace(string(data)), nil +} + +func resolveDaemonConfigPath(raw string) (string, error) { + if raw != "" { + return filepath.Abs(raw) + } + + home, err := os.UserHomeDir() + if err != nil { + return "", fmt.Errorf("resolve daemon config path: %w", err) + } + return filepath.Join(home, defaultDaemonConfigPath), nil +} + +func loadPersistedDaemonConfig(path string) (daemonPersistedConfig, error) { + data, err := os.ReadFile(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return daemonPersistedConfig{}, nil + } + return daemonPersistedConfig{}, fmt.Errorf("read daemon config: %w", err) + } + + var cfg daemonPersistedConfig + if err := json.Unmarshal(data, &cfg); err != nil { + return daemonPersistedConfig{}, fmt.Errorf("parse daemon config: %w", err) + } + return cfg, nil +} + +func savePersistedDaemonConfig(path string, cfg daemonPersistedConfig) error { + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + return fmt.Errorf("create daemon config directory: %w", err) + } + data, err := json.MarshalIndent(cfg, "", " ") + if err != nil { + return fmt.Errorf("encode daemon config: %w", err) + } + if err := os.WriteFile(path, append(data, '\n'), 0o600); err != nil { + return fmt.Errorf("write daemon config: %w", err) + } + return nil +} + +func normalizeServerBaseURL(raw string) (string, error) { + u, err := url.Parse(strings.TrimSpace(raw)) + if err != nil { + return "", fmt.Errorf("invalid MULTICA_SERVER_URL: %w", err) + } + switch u.Scheme { + case "ws": + u.Scheme = "http" + case "wss": + u.Scheme = "https" + case "http", "https": + default: + return "", fmt.Errorf("MULTICA_SERVER_URL must use ws, wss, http, or https") + } + if u.Path == "/ws" { + u.Path = "" + } + u.RawPath = "" + u.RawQuery = "" + u.Fragment = "" + return strings.TrimRight(u.String(), "/"), nil +} + +func durationFromEnv(key string, fallback time.Duration) (time.Duration, error) { + value := strings.TrimSpace(os.Getenv(key)) + if value == "" { + return fallback, nil + } + d, err := time.ParseDuration(value) + if err != nil { + return 0, fmt.Errorf("%s: invalid duration %q: %w", key, value, err) + } + return d, nil +} + +func envOrDefault(key, fallback string) string { + value := strings.TrimSpace(os.Getenv(key)) + if value == "" { + return fallback + } + return value +} + +func sleepWithContext(ctx context.Context, d time.Duration) error { + timer := time.NewTimer(d) + defer timer.Stop() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil + } +} + +func (c *daemonClient) claimTask(ctx context.Context, runtimeID string) (*daemonTask, error) { + var resp struct { + Task *daemonTask `json:"task"` + } + if err := c.postJSON(ctx, fmt.Sprintf("/api/daemon/runtimes/%s/tasks/claim", runtimeID), map[string]any{}, &resp); err != nil { + return nil, err + } + return resp.Task, nil +} + +func (c *daemonClient) createPairingSession(ctx context.Context, req map[string]string) (daemonPairingSession, error) { + var resp daemonPairingSession + if err := c.postJSON(ctx, "/api/daemon/pairing-sessions", req, &resp); err != nil { + return daemonPairingSession{}, err + } + return resp, nil +} + +func (c *daemonClient) getPairingSession(ctx context.Context, token string) (daemonPairingSession, error) { + var resp daemonPairingSession + if err := c.getJSON(ctx, fmt.Sprintf("/api/daemon/pairing-sessions/%s", url.PathEscape(token)), &resp); err != nil { + return daemonPairingSession{}, err + } + return resp, nil +} + +func (c *daemonClient) claimPairingSession(ctx context.Context, token string) (daemonPairingSession, error) { + var resp daemonPairingSession + if err := c.postJSON(ctx, fmt.Sprintf("/api/daemon/pairing-sessions/%s/claim", url.PathEscape(token)), map[string]any{}, &resp); err != nil { + return daemonPairingSession{}, err + } + return resp, nil +} + +func (c *daemonClient) startTask(ctx context.Context, taskID string) error { + return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/start", taskID), map[string]any{}, nil) +} + +func (c *daemonClient) reportProgress(ctx context.Context, taskID, summary string, step, total int) error { + return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/progress", taskID), map[string]any{ + "summary": summary, + "step": step, + "total": total, + }, nil) +} + +func (c *daemonClient) completeTask(ctx context.Context, taskID, output string) error { + return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/complete", taskID), map[string]any{ + "output": output, + }, nil) +} + +func (c *daemonClient) failTask(ctx context.Context, taskID, errMsg string) error { + return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/fail", taskID), map[string]any{ + "error": errMsg, + }, nil) +} + +func (c *daemonClient) postJSON(ctx context.Context, path string, reqBody any, respBody any) error { + var body io.Reader + if reqBody != nil { + data, err := json.Marshal(reqBody) + if err != nil { + return err + } + body = bytes.NewReader(data) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+path, body) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + resp, err := c.client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + return fmt.Errorf("%s %s returned %d: %s", http.MethodPost, path, resp.StatusCode, strings.TrimSpace(string(data))) + } + if respBody == nil { + io.Copy(io.Discard, resp.Body) + return nil + } + return json.NewDecoder(resp.Body).Decode(respBody) +} + +func (c *daemonClient) getJSON(ctx context.Context, path string, respBody any) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+path, nil) + if err != nil { + return err + } + + resp, err := c.client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + return fmt.Errorf("%s %s returned %d: %s", http.MethodGet, path, resp.StatusCode, strings.TrimSpace(string(data))) + } + if respBody == nil { + io.Copy(io.Discard, resp.Body) + return nil + } + return json.NewDecoder(resp.Body).Decode(respBody) +} + +const codexResultSchema = `{ + "type": "object", + "properties": { + "status": { + "type": "string", + "enum": ["completed", "blocked"] + }, + "comment": { + "type": "string" + } + }, + "required": ["status", "comment"], + "additionalProperties": false +}` diff --git a/server/cmd/daemon/daemon_test.go b/server/cmd/daemon/daemon_test.go new file mode 100644 index 00000000..a31be2ff --- /dev/null +++ b/server/cmd/daemon/daemon_test.go @@ -0,0 +1,63 @@ +package main + +import ( + "os" + "path/filepath" + "strings" + "testing" +) + +func TestNormalizeServerBaseURL(t *testing.T) { + t.Parallel() + + got, err := normalizeServerBaseURL("ws://localhost:8080/ws") + if err != nil { + t.Fatalf("normalizeServerBaseURL returned error: %v", err) + } + if got != "http://localhost:8080" { + t.Fatalf("expected http://localhost:8080, got %s", got) + } +} + +func TestResolveTaskWorkdirUsesRepoPathWhenPresent(t *testing.T) { + t.Parallel() + + root := t.TempDir() + repoPath := filepath.Join(root, "repo") + if err := os.Mkdir(repoPath, 0o755); err != nil { + t.Fatalf("mkdir repo: %v", err) + } + + got, err := resolveTaskWorkdir(root, &daemonRepoRef{Path: "repo"}) + if err != nil { + t.Fatalf("resolveTaskWorkdir returned error: %v", err) + } + if got != repoPath { + t.Fatalf("expected %s, got %s", repoPath, got) + } +} + +func TestBuildCodexPromptIncludesIssueAndSkills(t *testing.T) { + t.Parallel() + + prompt := buildCodexPrompt(daemonTask{ + Context: daemonTaskContext{ + Issue: daemonIssueContext{ + Title: "Fix failing test", + Description: "Investigate and fix the test failure.", + AcceptanceCriteria: []string{"tests pass"}, + ContextRefs: []string{"log snippet"}, + }, + Agent: daemonAgentContext{ + Name: "Local Codex", + Skills: "Be concise.", + }, + }, + }, "/tmp/work") + + for _, want := range []string{"Fix failing test", "Investigate and fix the test failure.", "tests pass", "log snippet", "Be concise."} { + if !strings.Contains(prompt, want) { + t.Fatalf("prompt missing %q", want) + } + } +} diff --git a/server/cmd/daemon/main.go b/server/cmd/daemon/main.go index ded25335..11d9fd93 100644 --- a/server/cmd/daemon/main.go +++ b/server/cmd/daemon/main.go @@ -1,7 +1,8 @@ package main import ( - "fmt" + "context" + "errors" "log" "os" "os/signal" @@ -9,24 +10,18 @@ import ( ) func main() { - serverURL := os.Getenv("MULTICA_SERVER_URL") - if serverURL == "" { - port := os.Getenv("PORT") - if port == "" { - port = "8080" - } - serverURL = "ws://localhost:" + port + "/ws" + cfg, err := loadConfig() + if err != nil { + log.Fatal(err) } - fmt.Println("Multica Daemon starting...") - fmt.Printf("Connecting to server: %s\n", serverURL) + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() - // TODO: Implement daemon connection, heartbeat, and task runner - log.Println("Daemon is running. Press Ctrl+C to stop.") + logger := log.New(os.Stdout, "multica-daemon: ", log.LstdFlags) + d := newDaemon(cfg, logger) - quit := make(chan os.Signal, 1) - signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) - <-quit - - log.Println("Daemon stopped") + if err := d.run(ctx); err != nil && !errors.Is(err, context.Canceled) { + logger.Fatal(err) + } } diff --git a/server/cmd/seed/main.go b/server/cmd/seed/main.go index 376bb074..fd9ea7f7 100644 --- a/server/cmd/seed/main.go +++ b/server/cmd/seed/main.go @@ -114,10 +114,37 @@ func main() { continue } err = pool.QueryRow(ctx, ` - INSERT INTO agent (workspace_id, name, description, runtime_mode, status, owner_id, skills, tools, triggers) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8::jsonb, $9::jsonb) + INSERT INTO agent_runtime (workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at) + VALUES ( + $1, + NULL, + $2, + $3, + $4, + $5, + $6, + '{"seed":true}'::jsonb, + CASE WHEN $5 = 'online' THEN now() ELSE NULL END + ) RETURNING id - `, workspaceID, a.name, a.description, a.runtimeMode, a.status, userID, a.skills, a.tools, a.triggers).Scan(&agentID) + `, + workspaceID, + a.name+" Runtime", + a.runtimeMode, + map[string]string{"cloud": "multica_agent", "local": "seed_local"}[a.runtimeMode], + map[bool]string{true: "offline", false: "online"}[a.status == "offline"], + map[string]string{"cloud": "Seeded cloud runtime", "local": "Seeded local runtime"}[a.runtimeMode], + ).Scan(&agentID) + if err != nil { + log.Printf("Failed to create runtime for agent %s: %v", a.name, err) + continue + } + runtimeID := agentID + err = pool.QueryRow(ctx, ` + INSERT INTO agent (workspace_id, name, description, runtime_mode, runtime_id, status, owner_id, skills, tools, triggers) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::jsonb, $10::jsonb) + RETURNING id + `, workspaceID, a.name, a.description, a.runtimeMode, runtimeID, a.status, userID, a.skills, a.tools, a.triggers).Scan(&agentID) if err != nil { log.Printf("Failed to create agent %s: %v", a.name, err) continue diff --git a/server/cmd/server/router.go b/server/cmd/server/router.go index 078f31f5..4f9e84d3 100644 --- a/server/cmd/server/router.go +++ b/server/cmd/server/router.go @@ -52,7 +52,7 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub) chi.Router { r.Use(chimw.RequestID) r.Use(cors.Handler(cors.Options{ AllowedOrigins: allowedOrigins(), - AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"}, + AllowedMethods: []string{"GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"}, AllowedHeaders: []string{"Accept", "Authorization", "Content-Type", "X-Workspace-ID"}, AllowCredentials: true, MaxAge: 300, @@ -74,14 +74,16 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub) chi.Router { // Daemon API routes (no user auth; daemon auth deferred to later) r.Route("/api/daemon", func(r chi.Router) { + r.Post("/pairing-sessions", h.CreateDaemonPairingSession) + r.Get("/pairing-sessions/{token}", h.GetDaemonPairingSession) + r.Post("/pairing-sessions/{token}/claim", h.ClaimDaemonPairingSession) + r.Post("/register", h.DaemonRegister) r.Post("/heartbeat", h.DaemonHeartbeat) - // Task claiming (daemon polls for work) - r.Post("/agents/{agentId}/tasks/claim", h.ClaimTask) - r.Get("/agents/{agentId}/tasks/pending", h.ListPendingTasks) + r.Post("/runtimes/{runtimeId}/tasks/claim", h.ClaimTaskByRuntime) + r.Get("/runtimes/{runtimeId}/tasks/pending", h.ListPendingTasksByRuntime) - // Task lifecycle (daemon reports status) r.Post("/tasks/{taskId}/start", h.StartTask) r.Post("/tasks/{taskId}/progress", h.ReportTaskProgress) r.Post("/tasks/{taskId}/complete", h.CompleteTask) @@ -127,6 +129,12 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub) chi.Router { }) }) + r.Route("/api/runtimes", func(r chi.Router) { + r.Get("/", h.ListAgentRuntimes) + }) + + r.Post("/api/daemon/pairing-sessions/{token}/approve", h.ApproveDaemonPairingSession) + // Inbox r.Route("/api/inbox", func(r chi.Router) { r.Get("/", h.ListInbox) diff --git a/server/internal/handler/agent.go b/server/internal/handler/agent.go index b4c99458..39235b3b 100644 --- a/server/internal/handler/agent.go +++ b/server/internal/handler/agent.go @@ -12,6 +12,7 @@ import ( type AgentResponse struct { ID string `json:"id"` WorkspaceID string `json:"workspace_id"` + RuntimeID string `json:"runtime_id"` Name string `json:"name"` Description string `json:"description"` AvatarURL *string `json:"avatar_url"` @@ -56,6 +57,7 @@ func agentToResponse(a db.Agent) AgentResponse { return AgentResponse{ ID: uuidToString(a.ID), WorkspaceID: uuidToString(a.WorkspaceID), + RuntimeID: uuidToString(a.RuntimeID), Name: a.Name, Description: a.Description, AvatarURL: textToPtr(a.AvatarUrl), @@ -76,6 +78,7 @@ func agentToResponse(a db.Agent) AgentResponse { type AgentTaskResponse struct { ID string `json:"id"` AgentID string `json:"agent_id"` + RuntimeID string `json:"runtime_id"` IssueID string `json:"issue_id"` Status string `json:"status"` Priority int32 `json:"priority"` @@ -100,6 +103,7 @@ func taskToResponse(t db.AgentTaskQueue) AgentTaskResponse { return AgentTaskResponse{ ID: uuidToString(t.ID), AgentID: uuidToString(t.AgentID), + RuntimeID: uuidToString(t.RuntimeID), IssueID: uuidToString(t.IssueID), Status: t.Status, Priority: t.Priority, @@ -146,7 +150,7 @@ type CreateAgentRequest struct { Name string `json:"name"` Description string `json:"description"` AvatarURL *string `json:"avatar_url"` - RuntimeMode string `json:"runtime_mode"` + RuntimeID string `json:"runtime_id"` RuntimeConfig any `json:"runtime_config"` Visibility string `json:"visibility"` MaxConcurrentTasks int32 `json:"max_concurrent_tasks"` @@ -176,8 +180,9 @@ func (h *Handler) CreateAgent(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusBadRequest, "name is required") return } - if req.RuntimeMode == "" { - req.RuntimeMode = "local" + if req.RuntimeID == "" { + writeError(w, http.StatusBadRequest, "runtime_id is required") + return } if req.Visibility == "" { req.Visibility = "workspace" @@ -186,6 +191,15 @@ func (h *Handler) CreateAgent(w http.ResponseWriter, r *http.Request) { req.MaxConcurrentTasks = 1 } + runtime, err := h.Queries.GetAgentRuntimeForWorkspace(r.Context(), db.GetAgentRuntimeForWorkspaceParams{ + ID: parseUUID(req.RuntimeID), + WorkspaceID: parseUUID(workspaceID), + }) + if err != nil { + writeError(w, http.StatusBadRequest, "invalid runtime_id") + return + } + rc, _ := json.Marshal(req.RuntimeConfig) if req.RuntimeConfig == nil { rc = []byte("{}") @@ -206,8 +220,9 @@ func (h *Handler) CreateAgent(w http.ResponseWriter, r *http.Request) { Name: req.Name, Description: req.Description, AvatarUrl: ptrToText(req.AvatarURL), - RuntimeMode: req.RuntimeMode, + RuntimeMode: runtime.RuntimeMode, RuntimeConfig: rc, + RuntimeID: runtime.ID, Visibility: req.Visibility, MaxConcurrentTasks: req.MaxConcurrentTasks, OwnerID: parseUUID(ownerID), @@ -220,6 +235,11 @@ func (h *Handler) CreateAgent(w http.ResponseWriter, r *http.Request) { return } + if runtime.Status == "online" { + h.TaskService.ReconcileAgentStatus(r.Context(), agent.ID) + agent, _ = h.Queries.GetAgent(r.Context(), agent.ID) + } + writeJSON(w, http.StatusCreated, agentToResponse(agent)) } @@ -227,6 +247,7 @@ type UpdateAgentRequest struct { Name *string `json:"name"` Description *string `json:"description"` AvatarURL *string `json:"avatar_url"` + RuntimeID *string `json:"runtime_id"` RuntimeConfig any `json:"runtime_config"` Visibility *string `json:"visibility"` Status *string `json:"status"` @@ -268,6 +289,18 @@ func (h *Handler) UpdateAgent(w http.ResponseWriter, r *http.Request) { rc, _ := json.Marshal(req.RuntimeConfig) params.RuntimeConfig = rc } + if req.RuntimeID != nil { + runtime, err := h.Queries.GetAgentRuntimeForWorkspace(r.Context(), db.GetAgentRuntimeForWorkspaceParams{ + ID: parseUUID(*req.RuntimeID), + WorkspaceID: agent.WorkspaceID, + }) + if err != nil { + writeError(w, http.StatusBadRequest, "invalid runtime_id") + return + } + params.RuntimeID = runtime.ID + params.RuntimeMode = pgtype.Text{String: runtime.RuntimeMode, Valid: true} + } if req.Visibility != nil { params.Visibility = pgtype.Text{String: *req.Visibility, Valid: true} } diff --git a/server/internal/handler/daemon.go b/server/internal/handler/daemon.go index 9fb96764..e02f2923 100644 --- a/server/internal/handler/daemon.go +++ b/server/internal/handler/daemon.go @@ -2,7 +2,9 @@ package handler import ( "encoding/json" + "fmt" "net/http" + "strings" "github.com/go-chi/chi/v5" db "github.com/multica-ai/multica/server/pkg/db/generated" @@ -13,9 +15,11 @@ import ( // --------------------------------------------------------------------------- type DaemonRegisterRequest struct { - DaemonID string `json:"daemon_id"` - AgentID string `json:"agent_id"` - Runtimes []struct { + WorkspaceID string `json:"workspace_id"` + DaemonID string `json:"daemon_id"` + DeviceName string `json:"device_name"` + Runtimes []struct { + Name string `json:"name"` Type string `json:"type"` Version string `json:"version"` Status string `json:"status"` @@ -29,36 +33,68 @@ func (h *Handler) DaemonRegister(w http.ResponseWriter, r *http.Request) { return } - if req.DaemonID == "" || req.AgentID == "" { - writeError(w, http.StatusBadRequest, "daemon_id and agent_id are required") + if req.DaemonID == "" { + writeError(w, http.StatusBadRequest, "daemon_id is required") + return + } + if req.WorkspaceID == "" { + writeError(w, http.StatusBadRequest, "workspace_id is required") + return + } + if len(req.Runtimes) == 0 { + writeError(w, http.StatusBadRequest, "at least one runtime is required") return } - runtimeInfo, _ := json.Marshal(req.Runtimes) + resp := make([]AgentRuntimeResponse, 0, len(req.Runtimes)) + for _, runtime := range req.Runtimes { + provider := strings.TrimSpace(runtime.Type) + if provider == "" { + provider = "unknown" + } + name := strings.TrimSpace(runtime.Name) + if name == "" { + name = provider + if req.DeviceName != "" { + name = fmt.Sprintf("%s (%s)", provider, req.DeviceName) + } + } + deviceInfo := strings.TrimSpace(req.DeviceName) + if runtime.Version != "" && deviceInfo != "" { + deviceInfo = fmt.Sprintf("%s · %s", deviceInfo, runtime.Version) + } else if runtime.Version != "" { + deviceInfo = runtime.Version + } + status := "online" + if runtime.Status == "offline" { + status = "offline" + } + metadata, _ := json.Marshal(map[string]any{ + "version": runtime.Version, + }) - conn, err := h.Queries.UpsertDaemonConnection(r.Context(), db.UpsertDaemonConnectionParams{ - AgentID: parseUUID(req.AgentID), - DaemonID: req.DaemonID, - RuntimeInfo: runtimeInfo, - }) - if err != nil { - writeError(w, http.StatusInternalServerError, "failed to register daemon: "+err.Error()) - return + registered, err := h.Queries.UpsertAgentRuntime(r.Context(), db.UpsertAgentRuntimeParams{ + WorkspaceID: parseUUID(req.WorkspaceID), + DaemonID: strToText(req.DaemonID), + Name: name, + RuntimeMode: "local", + Provider: provider, + Status: status, + DeviceInfo: deviceInfo, + Metadata: metadata, + }) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to register runtime: "+err.Error()) + return + } + resp = append(resp, runtimeToResponse(registered)) } - // Reconcile agent status (set to idle if no running tasks) - h.TaskService.ReconcileAgentStatus(r.Context(), parseUUID(req.AgentID)) - - writeJSON(w, http.StatusOK, map[string]any{ - "connection_id": uuidToString(conn.ID), - "status": conn.Status, - }) + writeJSON(w, http.StatusOK, map[string]any{"runtimes": resp}) } type DaemonHeartbeatRequest struct { - DaemonID string `json:"daemon_id"` - AgentID string `json:"agent_id"` - CurrentTasks int `json:"current_tasks"` + RuntimeID string `json:"runtime_id"` } func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) { @@ -68,10 +104,12 @@ func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) { return } - err := h.Queries.UpdateDaemonHeartbeat(r.Context(), db.UpdateDaemonHeartbeatParams{ - DaemonID: req.DaemonID, - AgentID: parseUUID(req.AgentID), - }) + if req.RuntimeID == "" { + writeError(w, http.StatusBadRequest, "runtime_id is required") + return + } + + _, err := h.Queries.UpdateAgentRuntimeHeartbeat(r.Context(), parseUUID(req.RuntimeID)) if err != nil { writeError(w, http.StatusInternalServerError, "heartbeat failed") return @@ -80,15 +118,11 @@ func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) } -// --------------------------------------------------------------------------- -// Task Lifecycle (called by daemon) -// --------------------------------------------------------------------------- +// ClaimTaskByRuntime atomically claims the next queued task for a runtime. +func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) { + runtimeID := chi.URLParam(r, "runtimeId") -// ClaimTask atomically claims the next queued task for an agent. -func (h *Handler) ClaimTask(w http.ResponseWriter, r *http.Request) { - agentID := chi.URLParam(r, "agentId") - - task, err := h.TaskService.ClaimTask(r.Context(), parseUUID(agentID)) + task, err := h.TaskService.ClaimTaskForRuntime(r.Context(), parseUUID(runtimeID)) if err != nil { writeError(w, http.StatusInternalServerError, "failed to claim task: "+err.Error()) return @@ -102,11 +136,11 @@ func (h *Handler) ClaimTask(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, map[string]any{"task": taskToResponse(*task)}) } -// ListPendingTasks returns queued/dispatched tasks for an agent. -func (h *Handler) ListPendingTasks(w http.ResponseWriter, r *http.Request) { - agentID := chi.URLParam(r, "agentId") +// ListPendingTasksByRuntime returns queued/dispatched tasks for a runtime. +func (h *Handler) ListPendingTasksByRuntime(w http.ResponseWriter, r *http.Request) { + runtimeID := chi.URLParam(r, "runtimeId") - tasks, err := h.Queries.ListPendingTasks(r.Context(), parseUUID(agentID)) + tasks, err := h.Queries.ListPendingTasksByRuntime(r.Context(), parseUUID(runtimeID)) if err != nil { writeError(w, http.StatusInternalServerError, "failed to list pending tasks") return @@ -120,6 +154,10 @@ func (h *Handler) ListPendingTasks(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, resp) } +// --------------------------------------------------------------------------- +// Task Lifecycle (called by daemon) +// --------------------------------------------------------------------------- + // StartTask marks a dispatched task as running. func (h *Handler) StartTask(w http.ResponseWriter, r *http.Request) { taskID := chi.URLParam(r, "taskId") diff --git a/server/internal/handler/daemon_pairing.go b/server/internal/handler/daemon_pairing.go new file mode 100644 index 00000000..9cf7747f --- /dev/null +++ b/server/internal/handler/daemon_pairing.go @@ -0,0 +1,386 @@ +package handler + +import ( + "context" + "crypto/rand" + "encoding/hex" + "encoding/json" + "fmt" + "net/http" + "net/url" + "os" + "strings" + "time" + + "github.com/go-chi/chi/v5" + "github.com/jackc/pgx/v5/pgtype" +) + +const daemonPairingTTL = 10 * time.Minute + +type daemonPairingSessionRecord struct { + Token string + DaemonID string + DeviceName string + RuntimeName string + RuntimeType string + RuntimeVersion string + WorkspaceID pgtype.UUID + ApprovedBy pgtype.UUID + Status string + ApprovedAt pgtype.Timestamptz + ClaimedAt pgtype.Timestamptz + ExpiresAt pgtype.Timestamptz + CreatedAt pgtype.Timestamptz + UpdatedAt pgtype.Timestamptz +} + +type DaemonPairingSessionResponse struct { + Token string `json:"token"` + DaemonID string `json:"daemon_id"` + DeviceName string `json:"device_name"` + RuntimeName string `json:"runtime_name"` + RuntimeType string `json:"runtime_type"` + RuntimeVersion string `json:"runtime_version"` + WorkspaceID *string `json:"workspace_id"` + Status string `json:"status"` + ApprovedAt *string `json:"approved_at"` + ClaimedAt *string `json:"claimed_at"` + ExpiresAt string `json:"expires_at"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` + LinkURL *string `json:"link_url,omitempty"` +} + +type CreateDaemonPairingSessionRequest struct { + DaemonID string `json:"daemon_id"` + DeviceName string `json:"device_name"` + RuntimeName string `json:"runtime_name"` + RuntimeType string `json:"runtime_type"` + RuntimeVersion string `json:"runtime_version"` +} + +type ApproveDaemonPairingSessionRequest struct { + WorkspaceID string `json:"workspace_id"` +} + +func daemonAppBaseURL() string { + for _, key := range []string{"MULTICA_APP_URL", "FRONTEND_ORIGIN"} { + if value := strings.TrimSpace(os.Getenv(key)); value != "" { + return strings.TrimRight(value, "/") + } + } + return "http://localhost:3000" +} + +func daemonPairingLinkURL(token string) string { + base := daemonAppBaseURL() + return base + "/pair/local?token=" + url.QueryEscape(token) +} + +func daemonPairingSessionToResponse(rec daemonPairingSessionRecord, includeLink bool) DaemonPairingSessionResponse { + resp := DaemonPairingSessionResponse{ + Token: rec.Token, + DaemonID: rec.DaemonID, + DeviceName: rec.DeviceName, + RuntimeName: rec.RuntimeName, + RuntimeType: rec.RuntimeType, + RuntimeVersion: rec.RuntimeVersion, + WorkspaceID: uuidToPtr(rec.WorkspaceID), + Status: rec.Status, + ApprovedAt: timestampToPtr(rec.ApprovedAt), + ClaimedAt: timestampToPtr(rec.ClaimedAt), + ExpiresAt: timestampToString(rec.ExpiresAt), + CreatedAt: timestampToString(rec.CreatedAt), + UpdatedAt: timestampToString(rec.UpdatedAt), + } + if includeLink { + link := daemonPairingLinkURL(rec.Token) + resp.LinkURL = &link + } + return resp +} + +func randomDaemonPairingToken() (string, error) { + bytes := make([]byte, 16) + if _, err := rand.Read(bytes); err != nil { + return "", err + } + return hex.EncodeToString(bytes), nil +} + +func (h *Handler) getDaemonPairingSession(ctx context.Context, token string) (daemonPairingSessionRecord, error) { + if h.DB == nil { + return daemonPairingSessionRecord{}, fmt.Errorf("database executor is not configured") + } + + var rec daemonPairingSessionRecord + err := h.DB.QueryRow(ctx, ` + SELECT + token, + daemon_id, + device_name, + runtime_name, + runtime_type, + runtime_version, + workspace_id, + approved_by, + status, + approved_at, + claimed_at, + expires_at, + created_at, + updated_at + FROM daemon_pairing_session + WHERE token = $1 + `, token).Scan( + &rec.Token, + &rec.DaemonID, + &rec.DeviceName, + &rec.RuntimeName, + &rec.RuntimeType, + &rec.RuntimeVersion, + &rec.WorkspaceID, + &rec.ApprovedBy, + &rec.Status, + &rec.ApprovedAt, + &rec.ClaimedAt, + &rec.ExpiresAt, + &rec.CreatedAt, + &rec.UpdatedAt, + ) + if err != nil { + return daemonPairingSessionRecord{}, err + } + + if rec.Status == "pending" && rec.ExpiresAt.Valid && rec.ExpiresAt.Time.Before(time.Now()) { + if _, err := h.DB.Exec(ctx, ` + UPDATE daemon_pairing_session + SET status = 'expired', updated_at = now() + WHERE token = $1 AND status = 'pending' + `, token); err == nil { + rec.Status = "expired" + rec.UpdatedAt = pgtype.Timestamptz{Time: time.Now(), Valid: true} + } + } + + return rec, nil +} + +func (h *Handler) CreateDaemonPairingSession(w http.ResponseWriter, r *http.Request) { + if h.DB == nil { + writeError(w, http.StatusInternalServerError, "database executor is not configured") + return + } + + var req CreateDaemonPairingSessionRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid request body") + return + } + + req.DaemonID = strings.TrimSpace(req.DaemonID) + req.DeviceName = strings.TrimSpace(req.DeviceName) + req.RuntimeName = strings.TrimSpace(req.RuntimeName) + req.RuntimeType = strings.TrimSpace(req.RuntimeType) + req.RuntimeVersion = strings.TrimSpace(req.RuntimeVersion) + + if req.DaemonID == "" { + writeError(w, http.StatusBadRequest, "daemon_id is required") + return + } + if req.DeviceName == "" { + writeError(w, http.StatusBadRequest, "device_name is required") + return + } + if req.RuntimeName == "" { + writeError(w, http.StatusBadRequest, "runtime_name is required") + return + } + if req.RuntimeType == "" { + writeError(w, http.StatusBadRequest, "runtime_type is required") + return + } + + token, err := randomDaemonPairingToken() + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to create pairing token") + return + } + + expiresAt := time.Now().Add(daemonPairingTTL) + var rec daemonPairingSessionRecord + err = h.DB.QueryRow(r.Context(), ` + INSERT INTO daemon_pairing_session ( + token, + daemon_id, + device_name, + runtime_name, + runtime_type, + runtime_version, + expires_at + ) VALUES ($1, $2, $3, $4, $5, $6, $7) + RETURNING + token, + daemon_id, + device_name, + runtime_name, + runtime_type, + runtime_version, + workspace_id, + approved_by, + status, + approved_at, + claimed_at, + expires_at, + created_at, + updated_at + `, + token, + req.DaemonID, + req.DeviceName, + req.RuntimeName, + req.RuntimeType, + req.RuntimeVersion, + expiresAt, + ).Scan( + &rec.Token, + &rec.DaemonID, + &rec.DeviceName, + &rec.RuntimeName, + &rec.RuntimeType, + &rec.RuntimeVersion, + &rec.WorkspaceID, + &rec.ApprovedBy, + &rec.Status, + &rec.ApprovedAt, + &rec.ClaimedAt, + &rec.ExpiresAt, + &rec.CreatedAt, + &rec.UpdatedAt, + ) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to create pairing session") + return + } + + writeJSON(w, http.StatusCreated, daemonPairingSessionToResponse(rec, true)) +} + +func (h *Handler) GetDaemonPairingSession(w http.ResponseWriter, r *http.Request) { + token := chi.URLParam(r, "token") + rec, err := h.getDaemonPairingSession(r.Context(), token) + if err != nil { + writeError(w, http.StatusNotFound, "pairing session not found") + return + } + writeJSON(w, http.StatusOK, daemonPairingSessionToResponse(rec, true)) +} + +func (h *Handler) ApproveDaemonPairingSession(w http.ResponseWriter, r *http.Request) { + token := chi.URLParam(r, "token") + rec, err := h.getDaemonPairingSession(r.Context(), token) + if err != nil { + writeError(w, http.StatusNotFound, "pairing session not found") + return + } + if rec.Status == "expired" { + writeError(w, http.StatusBadRequest, "pairing session expired") + return + } + if rec.Status == "claimed" { + writeError(w, http.StatusBadRequest, "pairing session already claimed") + return + } + if rec.Status == "approved" { + writeJSON(w, http.StatusOK, daemonPairingSessionToResponse(rec, true)) + return + } + + var req ApproveDaemonPairingSessionRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid request body") + return + } + if req.WorkspaceID == "" { + writeError(w, http.StatusBadRequest, "workspace_id is required") + return + } + + userID, ok := requireUserID(w, r) + if !ok { + return + } + if _, ok := h.requireWorkspaceMember(w, r, req.WorkspaceID, "workspace not found"); !ok { + return + } + + if h.DB == nil { + writeError(w, http.StatusInternalServerError, "database executor is not configured") + return + } + + if _, err := h.DB.Exec(r.Context(), ` + UPDATE daemon_pairing_session + SET + workspace_id = $2, + approved_by = $3, + status = 'approved', + approved_at = now(), + updated_at = now() + WHERE token = $1 AND status = 'pending' + `, token, parseUUID(req.WorkspaceID), parseUUID(userID)); err != nil { + writeError(w, http.StatusInternalServerError, "failed to approve pairing session") + return + } + + rec, err = h.getDaemonPairingSession(r.Context(), token) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to reload pairing session") + return + } + + writeJSON(w, http.StatusOK, daemonPairingSessionToResponse(rec, true)) +} + +func (h *Handler) ClaimDaemonPairingSession(w http.ResponseWriter, r *http.Request) { + token := chi.URLParam(r, "token") + rec, err := h.getDaemonPairingSession(r.Context(), token) + if err != nil { + writeError(w, http.StatusNotFound, "pairing session not found") + return + } + if rec.Status == "claimed" { + writeJSON(w, http.StatusOK, daemonPairingSessionToResponse(rec, true)) + return + } + if rec.Status != "approved" { + writeError(w, http.StatusBadRequest, "pairing session is not approved") + return + } + + if h.DB == nil { + writeError(w, http.StatusInternalServerError, "database executor is not configured") + return + } + + if _, err := h.DB.Exec(r.Context(), ` + UPDATE daemon_pairing_session + SET + status = 'claimed', + claimed_at = now(), + updated_at = now() + WHERE token = $1 AND status = 'approved' + `, token); err != nil { + writeError(w, http.StatusInternalServerError, "failed to claim pairing session") + return + } + + rec, err = h.getDaemonPairingSession(r.Context(), token) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to reload pairing session") + return + } + + writeJSON(w, http.StatusOK, daemonPairingSessionToResponse(rec, true)) +} diff --git a/server/internal/handler/handler.go b/server/internal/handler/handler.go index d5845828..dc31d0b5 100644 --- a/server/internal/handler/handler.go +++ b/server/internal/handler/handler.go @@ -20,16 +20,29 @@ type txStarter interface { Begin(ctx context.Context) (pgx.Tx, error) } +type dbExecutor interface { + Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error) + Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) + QueryRow(ctx context.Context, sql string, args ...any) pgx.Row +} + type Handler struct { Queries *db.Queries + DB dbExecutor TxStarter txStarter Hub *realtime.Hub TaskService *service.TaskService } func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub) *Handler { + var executor dbExecutor + if candidate, ok := txStarter.(dbExecutor); ok { + executor = candidate + } + return &Handler{ Queries: queries, + DB: executor, TxStarter: txStarter, Hub: hub, TaskService: service.NewTaskService(queries, hub), diff --git a/server/internal/handler/issue.go b/server/internal/handler/issue.go index 88b348fb..f0f431a8 100644 --- a/server/internal/handler/issue.go +++ b/server/internal/handler/issue.go @@ -1,6 +1,7 @@ package handler import ( + "context" "encoding/json" "io" "net/http" @@ -14,24 +15,30 @@ import ( // IssueResponse is the JSON response for an issue. type IssueResponse struct { - ID string `json:"id"` - WorkspaceID string `json:"workspace_id"` - Title string `json:"title"` - Description *string `json:"description"` - Status string `json:"status"` - Priority string `json:"priority"` - AssigneeType *string `json:"assignee_type"` - AssigneeID *string `json:"assignee_id"` - CreatorType string `json:"creator_type"` - CreatorID string `json:"creator_id"` - ParentIssueID *string `json:"parent_issue_id"` - AcceptanceCriteria []any `json:"acceptance_criteria"` - ContextRefs []any `json:"context_refs"` - Repository any `json:"repository"` - Position float64 `json:"position"` - DueDate *string `json:"due_date"` - CreatedAt string `json:"created_at"` - UpdatedAt string `json:"updated_at"` + ID string `json:"id"` + WorkspaceID string `json:"workspace_id"` + Title string `json:"title"` + Description *string `json:"description"` + Status string `json:"status"` + Priority string `json:"priority"` + AssigneeType *string `json:"assignee_type"` + AssigneeID *string `json:"assignee_id"` + CreatorType string `json:"creator_type"` + CreatorID string `json:"creator_id"` + ParentIssueID *string `json:"parent_issue_id"` + AcceptanceCriteria []any `json:"acceptance_criteria"` + ContextRefs []any `json:"context_refs"` + Repository any `json:"repository"` + Position float64 `json:"position"` + DueDate *string `json:"due_date"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` +} + +type agentTriggerSnapshot struct { + Type string `json:"type"` + Enabled bool `json:"enabled"` + Config map[string]any `json:"config"` } func issueToResponse(i db.Issue) IssueResponse { @@ -258,8 +265,8 @@ func (h *Handler) CreateIssue(w http.ResponseWriter, r *http.Request) { h.broadcast("inbox:new", map[string]any{"item": inboxToResponse(inboxItem)}) } - // If assigned to an agent, enqueue a task with context - if issue.AssigneeType.String == "agent" { + // Only ready issues in todo are enqueued for agents. + if h.shouldEnqueueAgentTask(r.Context(), issue) { h.TaskService.EnqueueTaskForIssue(r.Context(), issue) } } @@ -283,12 +290,12 @@ type UpdateIssueRequest struct { func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) { id := chi.URLParam(r, "id") - current, ok := h.loadIssueForUser(w, r, id) + prevIssue, ok := h.loadIssueForUser(w, r, id) if !ok { return } - // Read body as raw bytes so we can detect which fields were explicitly sent + // Read body as raw bytes so we can detect which fields were explicitly sent. bodyBytes, err := io.ReadAll(r.Body) if err != nil { writeError(w, http.StatusBadRequest, "failed to read request body") @@ -307,10 +314,10 @@ func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) { // Pre-fill nullable fields (bare sqlc.narg) with current values params := db.UpdateIssueParams{ - ID: current.ID, - AssigneeType: current.AssigneeType, - AssigneeID: current.AssigneeID, - DueDate: current.DueDate, + ID: prevIssue.ID, + AssigneeType: prevIssue.AssigneeType, + AssigneeID: prevIssue.AssigneeID, + DueDate: prevIssue.DueDate, } // COALESCE fields — only set when explicitly provided @@ -379,16 +386,21 @@ func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) { resp := issueToResponse(issue) h.broadcast("issue:updated", map[string]any{"issue": resp}) - // If assignee changed, handle agent task queue - if req.AssigneeType != nil || req.AssigneeID != nil { - // Cancel any existing agent tasks for this issue + assigneeChanged := (req.AssigneeType != nil || req.AssigneeID != nil) && + (prevIssue.AssigneeType.String != issue.AssigneeType.String || uuidToString(prevIssue.AssigneeID) != uuidToString(issue.AssigneeID)) + statusChanged := req.Status != nil && prevIssue.Status != issue.Status + + // If assignee or readiness status changed, reconcile the task queue. + if assigneeChanged || statusChanged { h.TaskService.CancelTasksForIssue(r.Context(), issue.ID) - // If newly assigned to an agent, enqueue a task with context - if issue.AssigneeType.Valid && issue.AssigneeType.String == "agent" && issue.AssigneeID.Valid { + if h.shouldEnqueueAgentTask(r.Context(), issue) { h.TaskService.EnqueueTaskForIssue(r.Context(), issue) } + } + // If assignee changed, create a notification for the new assignee. + if assigneeChanged { // Create inbox notification for new assignee if issue.AssigneeType.Valid && issue.AssigneeID.Valid { inboxItem, err := h.Queries.CreateInboxItem(r.Context(), db.CreateInboxItemParams{ @@ -427,6 +439,34 @@ func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, resp) } +func (h *Handler) shouldEnqueueAgentTask(ctx context.Context, issue db.Issue) bool { + if issue.Status != "todo" { + return false + } + if !issue.AssigneeType.Valid || issue.AssigneeType.String != "agent" || !issue.AssigneeID.Valid { + return false + } + + agent, err := h.Queries.GetAgent(ctx, issue.AssigneeID) + if err != nil || !agent.RuntimeID.Valid { + return false + } + if agent.Triggers == nil || len(agent.Triggers) == 0 { + return true + } + + var triggers []agentTriggerSnapshot + if err := json.Unmarshal(agent.Triggers, &triggers); err != nil { + return false + } + for _, trigger := range triggers { + if trigger.Type == "on_assign" && trigger.Enabled { + return true + } + } + return false +} + func (h *Handler) DeleteIssue(w http.ResponseWriter, r *http.Request) { id := chi.URLParam(r, "id") if _, ok := h.loadIssueForUser(w, r, id); !ok { diff --git a/server/internal/handler/runtime.go b/server/internal/handler/runtime.go new file mode 100644 index 00000000..48d4dcef --- /dev/null +++ b/server/internal/handler/runtime.go @@ -0,0 +1,68 @@ +package handler + +import ( + "encoding/json" + "net/http" + + db "github.com/multica-ai/multica/server/pkg/db/generated" +) + +type AgentRuntimeResponse struct { + ID string `json:"id"` + WorkspaceID string `json:"workspace_id"` + DaemonID *string `json:"daemon_id"` + Name string `json:"name"` + RuntimeMode string `json:"runtime_mode"` + Provider string `json:"provider"` + Status string `json:"status"` + DeviceInfo string `json:"device_info"` + Metadata any `json:"metadata"` + LastSeenAt *string `json:"last_seen_at"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` +} + +func runtimeToResponse(rt db.AgentRuntime) AgentRuntimeResponse { + var metadata any + if rt.Metadata != nil { + json.Unmarshal(rt.Metadata, &metadata) + } + if metadata == nil { + metadata = map[string]any{} + } + + return AgentRuntimeResponse{ + ID: uuidToString(rt.ID), + WorkspaceID: uuidToString(rt.WorkspaceID), + DaemonID: textToPtr(rt.DaemonID), + Name: rt.Name, + RuntimeMode: rt.RuntimeMode, + Provider: rt.Provider, + Status: rt.Status, + DeviceInfo: rt.DeviceInfo, + Metadata: metadata, + LastSeenAt: timestampToPtr(rt.LastSeenAt), + CreatedAt: timestampToString(rt.CreatedAt), + UpdatedAt: timestampToString(rt.UpdatedAt), + } +} + +func (h *Handler) ListAgentRuntimes(w http.ResponseWriter, r *http.Request) { + workspaceID := resolveWorkspaceID(r) + if _, ok := h.requireWorkspaceMember(w, r, workspaceID, "workspace not found"); !ok { + return + } + + runtimes, err := h.Queries.ListAgentRuntimes(r.Context(), parseUUID(workspaceID)) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to list runtimes") + return + } + + resp := make([]AgentRuntimeResponse, len(runtimes)) + for i, rt := range runtimes { + resp[i] = runtimeToResponse(rt) + } + + writeJSON(w, http.StatusOK, resp) +} diff --git a/server/internal/service/task.go b/server/internal/service/task.go index 992935c7..df04849c 100644 --- a/server/internal/service/task.go +++ b/server/internal/service/task.go @@ -8,9 +8,9 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" - db "github.com/multica-ai/multica/server/pkg/db/generated" "github.com/multica-ai/multica/server/internal/realtime" "github.com/multica-ai/multica/server/internal/util" + db "github.com/multica-ai/multica/server/pkg/db/generated" "github.com/multica-ai/multica/server/pkg/protocol" ) @@ -29,14 +29,28 @@ func (s *TaskService) EnqueueTaskForIssue(ctx context.Context, issue db.Issue) ( return db.AgentTaskQueue{}, fmt.Errorf("issue has no assignee") } - snapshot := buildContextSnapshot(issue) + agent, err := s.Queries.GetAgent(ctx, issue.AssigneeID) + if err != nil { + return db.AgentTaskQueue{}, fmt.Errorf("load agent: %w", err) + } + if !agent.RuntimeID.Valid { + return db.AgentTaskQueue{}, fmt.Errorf("agent has no runtime") + } + + runtime, err := s.Queries.GetAgentRuntime(ctx, agent.RuntimeID) + if err != nil { + return db.AgentTaskQueue{}, fmt.Errorf("load runtime: %w", err) + } + + snapshot := buildContextSnapshot(issue, agent, runtime) contextJSON, _ := json.Marshal(snapshot) task, err := s.Queries.CreateAgentTaskWithContext(ctx, db.CreateAgentTaskWithContextParams{ - AgentID: issue.AssigneeID, - IssueID: issue.ID, - Priority: priorityToInt(issue.Priority), - Context: contextJSON, + AgentID: issue.AssigneeID, + RuntimeID: agent.RuntimeID, + IssueID: issue.ID, + Priority: priorityToInt(issue.Priority), + Context: contextJSON, }) if err != nil { return db.AgentTaskQueue{}, fmt.Errorf("create task: %w", err) @@ -83,6 +97,34 @@ func (s *TaskService) ClaimTask(ctx context.Context, agentID pgtype.UUID) (*db.A return &task, nil } +// ClaimTaskForRuntime claims the next runnable task for a runtime while +// still respecting each agent's max_concurrent_tasks limit. +func (s *TaskService) ClaimTaskForRuntime(ctx context.Context, runtimeID pgtype.UUID) (*db.AgentTaskQueue, error) { + tasks, err := s.Queries.ListPendingTasksByRuntime(ctx, runtimeID) + if err != nil { + return nil, fmt.Errorf("list pending tasks: %w", err) + } + + triedAgents := map[string]struct{}{} + for _, candidate := range tasks { + agentKey := util.UUIDToString(candidate.AgentID) + if _, seen := triedAgents[agentKey]; seen { + continue + } + triedAgents[agentKey] = struct{}{} + + task, err := s.ClaimTask(ctx, candidate.AgentID) + if err != nil { + return nil, err + } + if task != nil && task.RuntimeID == runtimeID { + return task, nil + } + } + + return nil, nil +} + // StartTask transitions a dispatched task to running and syncs issue status. func (s *TaskService) StartTask(ctx context.Context, taskID pgtype.UUID) (*db.AgentTaskQueue, error) { task, err := s.Queries.StartAgentTask(ctx, taskID) @@ -91,10 +133,13 @@ func (s *TaskService) StartTask(ctx context.Context, taskID pgtype.UUID) (*db.Ag } // Sync issue → in_progress - s.Queries.UpdateIssueStatus(ctx, db.UpdateIssueStatusParams{ + issue, err := s.Queries.UpdateIssueStatus(ctx, db.UpdateIssueStatusParams{ ID: task.IssueID, Status: "in_progress", }) + if err == nil { + s.broadcastIssueUpdated(issue) + } return &task, nil } @@ -110,10 +155,23 @@ func (s *TaskService) CompleteTask(ctx context.Context, taskID pgtype.UUID, resu } // Sync issue → in_review - s.Queries.UpdateIssueStatus(ctx, db.UpdateIssueStatusParams{ + issue, issueErr := s.Queries.UpdateIssueStatus(ctx, db.UpdateIssueStatusParams{ ID: task.IssueID, Status: "in_review", }) + if issueErr == nil { + s.broadcastIssueUpdated(issue) + } + + var payload protocol.TaskCompletedPayload + if err := json.Unmarshal(result, &payload); err == nil { + if payload.Output != "" { + s.createAgentComment(ctx, task.IssueID, task.AgentID, payload.Output, "comment") + } + } + if issueErr == nil { + s.createInboxForIssueCreator(ctx, issue, "review_requested", "attention", "Review requested: "+issue.Title, "") + } // Reconcile agent status s.ReconcileAgentStatus(ctx, task.AgentID) @@ -135,10 +193,19 @@ func (s *TaskService) FailTask(ctx context.Context, taskID pgtype.UUID, errMsg s } // Sync issue → blocked - s.Queries.UpdateIssueStatus(ctx, db.UpdateIssueStatusParams{ + issue, issueErr := s.Queries.UpdateIssueStatus(ctx, db.UpdateIssueStatusParams{ ID: task.IssueID, Status: "blocked", }) + if issueErr == nil { + s.broadcastIssueUpdated(issue) + } + if errMsg != "" { + s.createAgentComment(ctx, task.IssueID, task.AgentID, errMsg, "system") + } + if issueErr == nil { + s.createInboxForIssueCreator(ctx, issue, "agent_blocked", "action_required", "Agent blocked: "+issue.Title, errMsg) + } // Reconcile agent status s.ReconcileAgentStatus(ctx, task.AgentID) @@ -183,7 +250,7 @@ func (s *TaskService) updateAgentStatus(ctx context.Context, agentID pgtype.UUID s.broadcast(protocol.EventAgentStatus, map[string]any{"agent": agentToMap(agent)}) } -func buildContextSnapshot(issue db.Issue) protocol.TaskDispatchPayload { +func buildContextSnapshot(issue db.Issue, agent db.Agent, runtime db.AgentRuntime) map[string]any { var ac []string if issue.AcceptanceCriteria != nil { json.Unmarshal(issue.AcceptanceCriteria, &ac) @@ -198,13 +265,38 @@ func buildContextSnapshot(issue db.Issue) protocol.TaskDispatchPayload { json.Unmarshal(issue.Repository, repo) } - return protocol.TaskDispatchPayload{ - IssueID: util.UUIDToString(issue.ID), - Title: issue.Title, - Description: issue.Description.String, - AcceptanceCriteria: ac, - ContextRefs: cr, - Repository: repo, + var tools any + if agent.Tools != nil { + json.Unmarshal(agent.Tools, &tools) + } + var metadata any + if runtime.Metadata != nil { + json.Unmarshal(runtime.Metadata, &metadata) + } + + return map[string]any{ + "issue": map[string]any{ + "id": util.UUIDToString(issue.ID), + "title": issue.Title, + "description": issue.Description.String, + "acceptance_criteria": ac, + "context_refs": cr, + "repository": repo, + }, + "agent": map[string]any{ + "id": util.UUIDToString(agent.ID), + "name": agent.Name, + "skills": agent.Skills, + "tools": tools, + }, + "runtime": map[string]any{ + "id": util.UUIDToString(runtime.ID), + "name": runtime.Name, + "runtime_mode": runtime.RuntimeMode, + "provider": runtime.Provider, + "device_info": runtime.DeviceInfo, + "metadata": metadata, + }, } } @@ -224,11 +316,15 @@ func priorityToInt(p string) int32 { } func (s *TaskService) broadcastTaskDispatch(task db.AgentTaskQueue) { - var payload protocol.TaskDispatchPayload + var payload map[string]any if task.Context != nil { json.Unmarshal(task.Context, &payload) } - payload.TaskID = util.UUIDToString(task.ID) + if payload == nil { + payload = map[string]any{} + } + payload["task_id"] = util.UUIDToString(task.ID) + payload["runtime_id"] = util.UUIDToString(task.RuntimeID) s.broadcast(protocol.EventTaskDispatch, payload) } @@ -253,6 +349,108 @@ func (s *TaskService) broadcast(eventType string, payload any) { s.Hub.Broadcast(data) } +func (s *TaskService) broadcastIssueUpdated(issue db.Issue) { + s.broadcast(protocol.EventIssueUpdated, map[string]any{ + "issue": issueToMap(issue), + }) +} + +func (s *TaskService) createAgentComment(ctx context.Context, issueID, agentID pgtype.UUID, content, commentType string) { + if content == "" { + return + } + s.Queries.CreateComment(ctx, db.CreateCommentParams{ + IssueID: issueID, + AuthorType: "agent", + AuthorID: agentID, + Content: content, + Type: commentType, + }) +} + +func (s *TaskService) createInboxForIssueCreator(ctx context.Context, issue db.Issue, itemType, severity, title, body string) { + if issue.CreatorType != "member" { + return + } + item, err := s.Queries.CreateInboxItem(ctx, db.CreateInboxItemParams{ + WorkspaceID: issue.WorkspaceID, + RecipientType: "member", + RecipientID: issue.CreatorID, + Type: itemType, + Severity: severity, + IssueID: issue.ID, + Title: title, + Body: util.PtrToText(&body), + }) + if err != nil { + return + } + s.broadcast(protocol.EventInboxNew, map[string]any{ + "item": inboxToMap(item), + }) +} + +func issueToMap(issue db.Issue) map[string]any { + var ac []any + if issue.AcceptanceCriteria != nil { + json.Unmarshal(issue.AcceptanceCriteria, &ac) + } + if ac == nil { + ac = []any{} + } + + var cr []any + if issue.ContextRefs != nil { + json.Unmarshal(issue.ContextRefs, &cr) + } + if cr == nil { + cr = []any{} + } + + var repo any + if issue.Repository != nil { + json.Unmarshal(issue.Repository, &repo) + } + + return map[string]any{ + "id": util.UUIDToString(issue.ID), + "workspace_id": util.UUIDToString(issue.WorkspaceID), + "title": issue.Title, + "description": util.TextToPtr(issue.Description), + "status": issue.Status, + "priority": issue.Priority, + "assignee_type": util.TextToPtr(issue.AssigneeType), + "assignee_id": util.UUIDToPtr(issue.AssigneeID), + "creator_type": issue.CreatorType, + "creator_id": util.UUIDToString(issue.CreatorID), + "parent_issue_id": util.UUIDToPtr(issue.ParentIssueID), + "acceptance_criteria": ac, + "context_refs": cr, + "repository": repo, + "position": issue.Position, + "due_date": util.TimestampToPtr(issue.DueDate), + "created_at": util.TimestampToString(issue.CreatedAt), + "updated_at": util.TimestampToString(issue.UpdatedAt), + } +} + +func inboxToMap(item db.InboxItem) map[string]any { + return map[string]any{ + "id": util.UUIDToString(item.ID), + "workspace_id": util.UUIDToString(item.WorkspaceID), + "recipient_type": item.RecipientType, + "recipient_id": util.UUIDToString(item.RecipientID), + "type": item.Type, + "severity": item.Severity, + "issue_id": util.UUIDToPtr(item.IssueID), + "title": item.Title, + "body": util.TextToPtr(item.Body), + "read": item.Read, + "archived": item.Archived, + "created_at": util.TimestampToString(item.CreatedAt), + } +} + // agentToMap builds a simple map for broadcasting agent status updates. func agentToMap(a db.Agent) map[string]any { var rc any @@ -270,6 +468,7 @@ func agentToMap(a db.Agent) map[string]any { return map[string]any{ "id": util.UUIDToString(a.ID), "workspace_id": util.UUIDToString(a.WorkspaceID), + "runtime_id": util.UUIDToString(a.RuntimeID), "name": a.Name, "description": a.Description, "avatar_url": util.TextToPtr(a.AvatarUrl), diff --git a/server/migrations/004_agent_runtime_loop.down.sql b/server/migrations/004_agent_runtime_loop.down.sql new file mode 100644 index 00000000..76a1aa94 --- /dev/null +++ b/server/migrations/004_agent_runtime_loop.down.sql @@ -0,0 +1,13 @@ +DROP INDEX IF EXISTS idx_agent_task_queue_runtime_pending; +DROP INDEX IF EXISTS idx_agent_runtime_status; +DROP INDEX IF EXISTS idx_agent_runtime_workspace; + +ALTER TABLE agent_task_queue + DROP CONSTRAINT IF EXISTS agent_task_queue_runtime_id_fkey, + DROP COLUMN IF EXISTS runtime_id; + +ALTER TABLE agent + DROP CONSTRAINT IF EXISTS agent_runtime_id_fkey, + DROP COLUMN IF EXISTS runtime_id; + +DROP TABLE IF EXISTS agent_runtime; diff --git a/server/migrations/004_agent_runtime_loop.up.sql b/server/migrations/004_agent_runtime_loop.up.sql new file mode 100644 index 00000000..10da436c --- /dev/null +++ b/server/migrations/004_agent_runtime_loop.up.sql @@ -0,0 +1,92 @@ +CREATE TABLE agent_runtime ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + workspace_id UUID NOT NULL REFERENCES workspace(id) ON DELETE CASCADE, + daemon_id TEXT, + name TEXT NOT NULL, + runtime_mode TEXT NOT NULL CHECK (runtime_mode IN ('local', 'cloud')), + provider TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'offline' CHECK (status IN ('online', 'offline')), + device_info TEXT NOT NULL DEFAULT '', + metadata JSONB NOT NULL DEFAULT '{}', + last_seen_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE (workspace_id, daemon_id, provider) +); + +ALTER TABLE agent + ADD COLUMN runtime_id UUID; + +INSERT INTO agent_runtime ( + workspace_id, + daemon_id, + name, + runtime_mode, + provider, + status, + device_info, + metadata, + last_seen_at, + created_at, + updated_at +) +SELECT + a.workspace_id, + NULL, + COALESCE(NULLIF(a.runtime_config->>'runtime_name', ''), a.name || ' Runtime'), + a.runtime_mode, + COALESCE( + NULLIF(a.runtime_config->>'provider', ''), + CASE + WHEN a.runtime_mode = 'cloud' THEN 'multica_agent' + ELSE 'legacy_local' + END + ), + CASE + WHEN a.status = 'offline' THEN 'offline' + ELSE 'online' + END, + COALESCE( + NULLIF(a.runtime_config->>'runtime_name', ''), + CASE + WHEN a.runtime_mode = 'cloud' THEN 'Cloud runtime' + ELSE 'Local runtime' + END + ), + jsonb_build_object('migrated_agent_id', a.id::text), + CASE + WHEN a.status = 'offline' THEN NULL + ELSE a.updated_at + END, + a.created_at, + a.updated_at +FROM agent a; + +UPDATE agent a +SET runtime_id = ar.id +FROM agent_runtime ar +WHERE ar.metadata->>'migrated_agent_id' = a.id::text; + +ALTER TABLE agent + ALTER COLUMN runtime_id SET NOT NULL, + ADD CONSTRAINT agent_runtime_id_fkey + FOREIGN KEY (runtime_id) REFERENCES agent_runtime(id) ON DELETE RESTRICT; + +ALTER TABLE agent_task_queue + ADD COLUMN runtime_id UUID; + +UPDATE agent_task_queue atq +SET runtime_id = a.runtime_id +FROM agent a +WHERE a.id = atq.agent_id; + +ALTER TABLE agent_task_queue + ALTER COLUMN runtime_id SET NOT NULL, + ADD CONSTRAINT agent_task_queue_runtime_id_fkey + FOREIGN KEY (runtime_id) REFERENCES agent_runtime(id) ON DELETE CASCADE; + +CREATE INDEX idx_agent_runtime_workspace ON agent_runtime(workspace_id); +CREATE INDEX idx_agent_runtime_status ON agent_runtime(workspace_id, status); +CREATE INDEX idx_agent_task_queue_runtime_pending + ON agent_task_queue(runtime_id, priority DESC, created_at ASC) + WHERE status IN ('queued', 'dispatched'); diff --git a/server/migrations/005_daemon_pairing.down.sql b/server/migrations/005_daemon_pairing.down.sql new file mode 100644 index 00000000..25e28eb7 --- /dev/null +++ b/server/migrations/005_daemon_pairing.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS daemon_pairing_session; diff --git a/server/migrations/005_daemon_pairing.up.sql b/server/migrations/005_daemon_pairing.up.sql new file mode 100644 index 00000000..8ae69808 --- /dev/null +++ b/server/migrations/005_daemon_pairing.up.sql @@ -0,0 +1,20 @@ +CREATE TABLE daemon_pairing_session ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + token TEXT NOT NULL UNIQUE, + daemon_id TEXT NOT NULL, + device_name TEXT NOT NULL, + runtime_name TEXT NOT NULL, + runtime_type TEXT NOT NULL, + runtime_version TEXT NOT NULL DEFAULT '', + workspace_id UUID REFERENCES workspace(id) ON DELETE CASCADE, + approved_by UUID REFERENCES "user"(id) ON DELETE SET NULL, + status TEXT NOT NULL DEFAULT 'pending' CHECK (status IN ('pending', 'approved', 'claimed', 'expired')), + approved_at TIMESTAMPTZ, + claimed_at TIMESTAMPTZ, + expires_at TIMESTAMPTZ NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX idx_daemon_pairing_session_token ON daemon_pairing_session (token); +CREATE INDEX idx_daemon_pairing_session_status_expires ON daemon_pairing_session (status, expires_at); diff --git a/server/pkg/db/generated/agent.sql.go b/server/pkg/db/generated/agent.sql.go index 8b87bce9..81b20b66 100644 --- a/server/pkg/db/generated/agent.sql.go +++ b/server/pkg/db/generated/agent.sql.go @@ -32,7 +32,7 @@ WHERE id = ( LIMIT 1 FOR UPDATE SKIP LOCKED ) -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id ` func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (AgentTaskQueue, error) { @@ -51,6 +51,7 @@ func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (Agen &i.Error, &i.CreatedAt, &i.Context, + &i.RuntimeID, ) return i, err } @@ -59,7 +60,7 @@ const completeAgentTask = `-- name: CompleteAgentTask :one UPDATE agent_task_queue SET status = 'completed', completed_at = now(), result = $2 WHERE id = $1 AND status = 'running' -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id ` type CompleteAgentTaskParams struct { @@ -83,6 +84,7 @@ func (q *Queries) CompleteAgentTask(ctx context.Context, arg CompleteAgentTaskPa &i.Error, &i.CreatedAt, &i.Context, + &i.RuntimeID, ) return i, err } @@ -102,10 +104,10 @@ func (q *Queries) CountRunningTasks(ctx context.Context, agentID pgtype.UUID) (i const createAgent = `-- name: CreateAgent :one INSERT INTO agent ( workspace_id, name, description, avatar_url, runtime_mode, - runtime_config, visibility, max_concurrent_tasks, owner_id, + runtime_config, runtime_id, visibility, max_concurrent_tasks, owner_id, skills, tools, triggers -) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) -RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, skills, tools, triggers +) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) +RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, skills, tools, triggers, runtime_id ` type CreateAgentParams struct { @@ -115,6 +117,7 @@ type CreateAgentParams struct { AvatarUrl pgtype.Text `json:"avatar_url"` RuntimeMode string `json:"runtime_mode"` RuntimeConfig []byte `json:"runtime_config"` + RuntimeID pgtype.UUID `json:"runtime_id"` Visibility string `json:"visibility"` MaxConcurrentTasks int32 `json:"max_concurrent_tasks"` OwnerID pgtype.UUID `json:"owner_id"` @@ -131,6 +134,7 @@ func (q *Queries) CreateAgent(ctx context.Context, arg CreateAgentParams) (Agent arg.AvatarUrl, arg.RuntimeMode, arg.RuntimeConfig, + arg.RuntimeID, arg.Visibility, arg.MaxConcurrentTasks, arg.OwnerID, @@ -156,24 +160,31 @@ func (q *Queries) CreateAgent(ctx context.Context, arg CreateAgentParams) (Agent &i.Skills, &i.Tools, &i.Triggers, + &i.RuntimeID, ) return i, err } const createAgentTask = `-- name: CreateAgentTask :one -INSERT INTO agent_task_queue (agent_id, issue_id, status, priority) -VALUES ($1, $2, 'queued', $3) -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context +INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority) +VALUES ($1, $2, $3, 'queued', $4) +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id ` type CreateAgentTaskParams struct { - AgentID pgtype.UUID `json:"agent_id"` - IssueID pgtype.UUID `json:"issue_id"` - Priority int32 `json:"priority"` + AgentID pgtype.UUID `json:"agent_id"` + RuntimeID pgtype.UUID `json:"runtime_id"` + IssueID pgtype.UUID `json:"issue_id"` + Priority int32 `json:"priority"` } func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams) (AgentTaskQueue, error) { - row := q.db.QueryRow(ctx, createAgentTask, arg.AgentID, arg.IssueID, arg.Priority) + row := q.db.QueryRow(ctx, createAgentTask, + arg.AgentID, + arg.RuntimeID, + arg.IssueID, + arg.Priority, + ) var i AgentTaskQueue err := row.Scan( &i.ID, @@ -188,26 +199,29 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams &i.Error, &i.CreatedAt, &i.Context, + &i.RuntimeID, ) return i, err } const createAgentTaskWithContext = `-- name: CreateAgentTaskWithContext :one -INSERT INTO agent_task_queue (agent_id, issue_id, status, priority, context) -VALUES ($1, $2, 'queued', $3, $4) -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context +INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, context) +VALUES ($1, $2, $3, 'queued', $4, $5) +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id ` type CreateAgentTaskWithContextParams struct { - AgentID pgtype.UUID `json:"agent_id"` - IssueID pgtype.UUID `json:"issue_id"` - Priority int32 `json:"priority"` - Context []byte `json:"context"` + AgentID pgtype.UUID `json:"agent_id"` + RuntimeID pgtype.UUID `json:"runtime_id"` + IssueID pgtype.UUID `json:"issue_id"` + Priority int32 `json:"priority"` + Context []byte `json:"context"` } func (q *Queries) CreateAgentTaskWithContext(ctx context.Context, arg CreateAgentTaskWithContextParams) (AgentTaskQueue, error) { row := q.db.QueryRow(ctx, createAgentTaskWithContext, arg.AgentID, + arg.RuntimeID, arg.IssueID, arg.Priority, arg.Context, @@ -226,6 +240,7 @@ func (q *Queries) CreateAgentTaskWithContext(ctx context.Context, arg CreateAgen &i.Error, &i.CreatedAt, &i.Context, + &i.RuntimeID, ) return i, err } @@ -239,22 +254,11 @@ func (q *Queries) DeleteAgent(ctx context.Context, id pgtype.UUID) error { return err } -const disconnectDaemon = `-- name: DisconnectDaemon :exec -UPDATE daemon_connection -SET status = 'disconnected', updated_at = now() -WHERE daemon_id = $1 -` - -func (q *Queries) DisconnectDaemon(ctx context.Context, daemonID string) error { - _, err := q.db.Exec(ctx, disconnectDaemon, daemonID) - return err -} - const failAgentTask = `-- name: FailAgentTask :one UPDATE agent_task_queue SET status = 'failed', completed_at = now(), error = $2 WHERE id = $1 AND status = 'running' -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id ` type FailAgentTaskParams struct { @@ -278,12 +282,13 @@ func (q *Queries) FailAgentTask(ctx context.Context, arg FailAgentTaskParams) (A &i.Error, &i.CreatedAt, &i.Context, + &i.RuntimeID, ) return i, err } const getAgent = `-- name: GetAgent :one -SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, skills, tools, triggers FROM agent +SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, skills, tools, triggers, runtime_id FROM agent WHERE id = $1 ` @@ -307,12 +312,13 @@ func (q *Queries) GetAgent(ctx context.Context, id pgtype.UUID) (Agent, error) { &i.Skills, &i.Tools, &i.Triggers, + &i.RuntimeID, ) return i, err } const getAgentTask = `-- name: GetAgentTask :one -SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context FROM agent_task_queue +SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id FROM agent_task_queue WHERE id = $1 ` @@ -332,12 +338,13 @@ func (q *Queries) GetAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQu &i.Error, &i.CreatedAt, &i.Context, + &i.RuntimeID, ) return i, err } const listAgentTasks = `-- name: ListAgentTasks :many -SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context FROM agent_task_queue +SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id FROM agent_task_queue WHERE agent_id = $1 ORDER BY created_at DESC ` @@ -364,6 +371,7 @@ func (q *Queries) ListAgentTasks(ctx context.Context, agentID pgtype.UUID) ([]Ag &i.Error, &i.CreatedAt, &i.Context, + &i.RuntimeID, ); err != nil { return nil, err } @@ -376,7 +384,7 @@ func (q *Queries) ListAgentTasks(ctx context.Context, agentID pgtype.UUID) ([]Ag } const listAgents = `-- name: ListAgents :many -SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, skills, tools, triggers FROM agent +SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, skills, tools, triggers, runtime_id FROM agent WHERE workspace_id = $1 ORDER BY created_at ASC ` @@ -407,6 +415,7 @@ func (q *Queries) ListAgents(ctx context.Context, workspaceID pgtype.UUID) ([]Ag &i.Skills, &i.Tools, &i.Triggers, + &i.RuntimeID, ); err != nil { return nil, err } @@ -418,14 +427,14 @@ func (q *Queries) ListAgents(ctx context.Context, workspaceID pgtype.UUID) ([]Ag return items, nil } -const listPendingTasks = `-- name: ListPendingTasks :many -SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context FROM agent_task_queue -WHERE agent_id = $1 AND status IN ('queued', 'dispatched') +const listPendingTasksByRuntime = `-- name: ListPendingTasksByRuntime :many +SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id FROM agent_task_queue +WHERE runtime_id = $1 AND status IN ('queued', 'dispatched') ORDER BY priority DESC, created_at ASC ` -func (q *Queries) ListPendingTasks(ctx context.Context, agentID pgtype.UUID) ([]AgentTaskQueue, error) { - rows, err := q.db.Query(ctx, listPendingTasks, agentID) +func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtype.UUID) ([]AgentTaskQueue, error) { + rows, err := q.db.Query(ctx, listPendingTasksByRuntime, runtimeID) if err != nil { return nil, err } @@ -446,6 +455,7 @@ func (q *Queries) ListPendingTasks(ctx context.Context, agentID pgtype.UUID) ([] &i.Error, &i.CreatedAt, &i.Context, + &i.RuntimeID, ); err != nil { return nil, err } @@ -461,7 +471,7 @@ const startAgentTask = `-- name: StartAgentTask :one UPDATE agent_task_queue SET status = 'running', started_at = now() WHERE id = $1 AND status = 'dispatched' -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id ` func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) { @@ -480,6 +490,7 @@ func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTask &i.Error, &i.CreatedAt, &i.Context, + &i.RuntimeID, ) return i, err } @@ -490,15 +501,17 @@ UPDATE agent SET description = COALESCE($3, description), avatar_url = COALESCE($4, avatar_url), runtime_config = COALESCE($5, runtime_config), - visibility = COALESCE($6, visibility), - status = COALESCE($7, status), - max_concurrent_tasks = COALESCE($8, max_concurrent_tasks), - skills = COALESCE($9, skills), - tools = COALESCE($10, tools), - triggers = COALESCE($11, triggers), + runtime_mode = COALESCE($6, runtime_mode), + runtime_id = COALESCE($7, runtime_id), + visibility = COALESCE($8, visibility), + status = COALESCE($9, status), + max_concurrent_tasks = COALESCE($10, max_concurrent_tasks), + skills = COALESCE($11, skills), + tools = COALESCE($12, tools), + triggers = COALESCE($13, triggers), updated_at = now() WHERE id = $1 -RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, skills, tools, triggers +RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, skills, tools, triggers, runtime_id ` type UpdateAgentParams struct { @@ -507,6 +520,8 @@ type UpdateAgentParams struct { Description pgtype.Text `json:"description"` AvatarUrl pgtype.Text `json:"avatar_url"` RuntimeConfig []byte `json:"runtime_config"` + RuntimeMode pgtype.Text `json:"runtime_mode"` + RuntimeID pgtype.UUID `json:"runtime_id"` Visibility pgtype.Text `json:"visibility"` Status pgtype.Text `json:"status"` MaxConcurrentTasks pgtype.Int4 `json:"max_concurrent_tasks"` @@ -522,6 +537,8 @@ func (q *Queries) UpdateAgent(ctx context.Context, arg UpdateAgentParams) (Agent arg.Description, arg.AvatarUrl, arg.RuntimeConfig, + arg.RuntimeMode, + arg.RuntimeID, arg.Visibility, arg.Status, arg.MaxConcurrentTasks, @@ -547,6 +564,7 @@ func (q *Queries) UpdateAgent(ctx context.Context, arg UpdateAgentParams) (Agent &i.Skills, &i.Tools, &i.Triggers, + &i.RuntimeID, ) return i, err } @@ -554,7 +572,7 @@ func (q *Queries) UpdateAgent(ctx context.Context, arg UpdateAgentParams) (Agent const updateAgentStatus = `-- name: UpdateAgentStatus :one UPDATE agent SET status = $2, updated_at = now() WHERE id = $1 -RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, skills, tools, triggers +RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, skills, tools, triggers, runtime_id ` type UpdateAgentStatusParams struct { @@ -582,52 +600,7 @@ func (q *Queries) UpdateAgentStatus(ctx context.Context, arg UpdateAgentStatusPa &i.Skills, &i.Tools, &i.Triggers, - ) - return i, err -} - -const updateDaemonHeartbeat = `-- name: UpdateDaemonHeartbeat :exec -UPDATE daemon_connection -SET last_heartbeat_at = now(), updated_at = now() -WHERE daemon_id = $1 AND agent_id = $2 -` - -type UpdateDaemonHeartbeatParams struct { - DaemonID string `json:"daemon_id"` - AgentID pgtype.UUID `json:"agent_id"` -} - -func (q *Queries) UpdateDaemonHeartbeat(ctx context.Context, arg UpdateDaemonHeartbeatParams) error { - _, err := q.db.Exec(ctx, updateDaemonHeartbeat, arg.DaemonID, arg.AgentID) - return err -} - -const upsertDaemonConnection = `-- name: UpsertDaemonConnection :one -INSERT INTO daemon_connection (agent_id, daemon_id, status, last_heartbeat_at, runtime_info) -VALUES ($1, $2, 'connected', now(), $3) -ON CONFLICT ON CONSTRAINT uq_daemon_agent -DO UPDATE SET status = 'connected', last_heartbeat_at = now(), runtime_info = $3, updated_at = now() -RETURNING id, agent_id, daemon_id, status, last_heartbeat_at, runtime_info, created_at, updated_at -` - -type UpsertDaemonConnectionParams struct { - AgentID pgtype.UUID `json:"agent_id"` - DaemonID string `json:"daemon_id"` - RuntimeInfo []byte `json:"runtime_info"` -} - -func (q *Queries) UpsertDaemonConnection(ctx context.Context, arg UpsertDaemonConnectionParams) (DaemonConnection, error) { - row := q.db.QueryRow(ctx, upsertDaemonConnection, arg.AgentID, arg.DaemonID, arg.RuntimeInfo) - var i DaemonConnection - err := row.Scan( - &i.ID, - &i.AgentID, - &i.DaemonID, - &i.Status, - &i.LastHeartbeatAt, - &i.RuntimeInfo, - &i.CreatedAt, - &i.UpdatedAt, + &i.RuntimeID, ) return i, err } diff --git a/server/pkg/db/generated/models.go b/server/pkg/db/generated/models.go index 654d14f3..7ee875b1 100644 --- a/server/pkg/db/generated/models.go +++ b/server/pkg/db/generated/models.go @@ -36,6 +36,22 @@ type Agent struct { Skills string `json:"skills"` Tools []byte `json:"tools"` Triggers []byte `json:"triggers"` + RuntimeID pgtype.UUID `json:"runtime_id"` +} + +type AgentRuntime struct { + ID pgtype.UUID `json:"id"` + WorkspaceID pgtype.UUID `json:"workspace_id"` + DaemonID pgtype.Text `json:"daemon_id"` + Name string `json:"name"` + RuntimeMode string `json:"runtime_mode"` + Provider string `json:"provider"` + Status string `json:"status"` + DeviceInfo string `json:"device_info"` + Metadata []byte `json:"metadata"` + LastSeenAt pgtype.Timestamptz `json:"last_seen_at"` + CreatedAt pgtype.Timestamptz `json:"created_at"` + UpdatedAt pgtype.Timestamptz `json:"updated_at"` } type AgentTaskQueue struct { @@ -51,6 +67,7 @@ type AgentTaskQueue struct { Error pgtype.Text `json:"error"` CreatedAt pgtype.Timestamptz `json:"created_at"` Context []byte `json:"context"` + RuntimeID pgtype.UUID `json:"runtime_id"` } type Comment struct { diff --git a/server/pkg/db/generated/runtime.sql.go b/server/pkg/db/generated/runtime.sql.go new file mode 100644 index 00000000..230a8eae --- /dev/null +++ b/server/pkg/db/generated/runtime.sql.go @@ -0,0 +1,197 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.30.0 +// source: runtime.sql + +package db + +import ( + "context" + + "github.com/jackc/pgx/v5/pgtype" +) + +const getAgentRuntime = `-- name: GetAgentRuntime :one +SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at FROM agent_runtime +WHERE id = $1 +` + +func (q *Queries) GetAgentRuntime(ctx context.Context, id pgtype.UUID) (AgentRuntime, error) { + row := q.db.QueryRow(ctx, getAgentRuntime, id) + var i AgentRuntime + err := row.Scan( + &i.ID, + &i.WorkspaceID, + &i.DaemonID, + &i.Name, + &i.RuntimeMode, + &i.Provider, + &i.Status, + &i.DeviceInfo, + &i.Metadata, + &i.LastSeenAt, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + +const getAgentRuntimeForWorkspace = `-- name: GetAgentRuntimeForWorkspace :one +SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at FROM agent_runtime +WHERE id = $1 AND workspace_id = $2 +` + +type GetAgentRuntimeForWorkspaceParams struct { + ID pgtype.UUID `json:"id"` + WorkspaceID pgtype.UUID `json:"workspace_id"` +} + +func (q *Queries) GetAgentRuntimeForWorkspace(ctx context.Context, arg GetAgentRuntimeForWorkspaceParams) (AgentRuntime, error) { + row := q.db.QueryRow(ctx, getAgentRuntimeForWorkspace, arg.ID, arg.WorkspaceID) + var i AgentRuntime + err := row.Scan( + &i.ID, + &i.WorkspaceID, + &i.DaemonID, + &i.Name, + &i.RuntimeMode, + &i.Provider, + &i.Status, + &i.DeviceInfo, + &i.Metadata, + &i.LastSeenAt, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + +const listAgentRuntimes = `-- name: ListAgentRuntimes :many +SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at FROM agent_runtime +WHERE workspace_id = $1 +ORDER BY created_at ASC +` + +func (q *Queries) ListAgentRuntimes(ctx context.Context, workspaceID pgtype.UUID) ([]AgentRuntime, error) { + rows, err := q.db.Query(ctx, listAgentRuntimes, workspaceID) + if err != nil { + return nil, err + } + defer rows.Close() + items := []AgentRuntime{} + for rows.Next() { + var i AgentRuntime + if err := rows.Scan( + &i.ID, + &i.WorkspaceID, + &i.DaemonID, + &i.Name, + &i.RuntimeMode, + &i.Provider, + &i.Status, + &i.DeviceInfo, + &i.Metadata, + &i.LastSeenAt, + &i.CreatedAt, + &i.UpdatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const updateAgentRuntimeHeartbeat = `-- name: UpdateAgentRuntimeHeartbeat :one +UPDATE agent_runtime +SET status = 'online', last_seen_at = now(), updated_at = now() +WHERE id = $1 +RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at +` + +func (q *Queries) UpdateAgentRuntimeHeartbeat(ctx context.Context, id pgtype.UUID) (AgentRuntime, error) { + row := q.db.QueryRow(ctx, updateAgentRuntimeHeartbeat, id) + var i AgentRuntime + err := row.Scan( + &i.ID, + &i.WorkspaceID, + &i.DaemonID, + &i.Name, + &i.RuntimeMode, + &i.Provider, + &i.Status, + &i.DeviceInfo, + &i.Metadata, + &i.LastSeenAt, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + +const upsertAgentRuntime = `-- name: UpsertAgentRuntime :one +INSERT INTO agent_runtime ( + workspace_id, + daemon_id, + name, + runtime_mode, + provider, + status, + device_info, + metadata, + last_seen_at +) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, now()) +ON CONFLICT (workspace_id, daemon_id, provider) +DO UPDATE SET + name = EXCLUDED.name, + runtime_mode = EXCLUDED.runtime_mode, + status = EXCLUDED.status, + device_info = EXCLUDED.device_info, + metadata = EXCLUDED.metadata, + last_seen_at = now(), + updated_at = now() +RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at +` + +type UpsertAgentRuntimeParams struct { + WorkspaceID pgtype.UUID `json:"workspace_id"` + DaemonID pgtype.Text `json:"daemon_id"` + Name string `json:"name"` + RuntimeMode string `json:"runtime_mode"` + Provider string `json:"provider"` + Status string `json:"status"` + DeviceInfo string `json:"device_info"` + Metadata []byte `json:"metadata"` +} + +func (q *Queries) UpsertAgentRuntime(ctx context.Context, arg UpsertAgentRuntimeParams) (AgentRuntime, error) { + row := q.db.QueryRow(ctx, upsertAgentRuntime, + arg.WorkspaceID, + arg.DaemonID, + arg.Name, + arg.RuntimeMode, + arg.Provider, + arg.Status, + arg.DeviceInfo, + arg.Metadata, + ) + var i AgentRuntime + err := row.Scan( + &i.ID, + &i.WorkspaceID, + &i.DaemonID, + &i.Name, + &i.RuntimeMode, + &i.Provider, + &i.Status, + &i.DeviceInfo, + &i.Metadata, + &i.LastSeenAt, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} diff --git a/server/pkg/db/queries/agent.sql b/server/pkg/db/queries/agent.sql index cbceaf45..c69b9cbf 100644 --- a/server/pkg/db/queries/agent.sql +++ b/server/pkg/db/queries/agent.sql @@ -10,9 +10,9 @@ WHERE id = $1; -- name: CreateAgent :one INSERT INTO agent ( workspace_id, name, description, avatar_url, runtime_mode, - runtime_config, visibility, max_concurrent_tasks, owner_id, + runtime_config, runtime_id, visibility, max_concurrent_tasks, owner_id, skills, tools, triggers -) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) +) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) RETURNING *; -- name: UpdateAgent :one @@ -21,6 +21,8 @@ UPDATE agent SET description = COALESCE(sqlc.narg('description'), description), avatar_url = COALESCE(sqlc.narg('avatar_url'), avatar_url), runtime_config = COALESCE(sqlc.narg('runtime_config'), runtime_config), + runtime_mode = COALESCE(sqlc.narg('runtime_mode'), runtime_mode), + runtime_id = COALESCE(sqlc.narg('runtime_id'), runtime_id), visibility = COALESCE(sqlc.narg('visibility'), visibility), status = COALESCE(sqlc.narg('status'), status), max_concurrent_tasks = COALESCE(sqlc.narg('max_concurrent_tasks'), max_concurrent_tasks), @@ -40,8 +42,8 @@ WHERE agent_id = $1 ORDER BY created_at DESC; -- name: CreateAgentTask :one -INSERT INTO agent_task_queue (agent_id, issue_id, status, priority) -VALUES ($1, $2, 'queued', $3) +INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority) +VALUES ($1, $2, $3, 'queued', $4) RETURNING *; -- name: CancelAgentTasksByIssue :exec @@ -54,8 +56,8 @@ SELECT * FROM agent_task_queue WHERE id = $1; -- name: CreateAgentTaskWithContext :one -INSERT INTO agent_task_queue (agent_id, issue_id, status, priority, context) -VALUES ($1, $2, 'queued', $3, $4) +INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, context) +VALUES ($1, $2, $3, 'queued', $4, $5) RETURNING *; -- name: ClaimAgentTask :one @@ -92,29 +94,12 @@ RETURNING *; SELECT count(*) FROM agent_task_queue WHERE agent_id = $1 AND status IN ('dispatched', 'running'); --- name: ListPendingTasks :many +-- name: ListPendingTasksByRuntime :many SELECT * FROM agent_task_queue -WHERE agent_id = $1 AND status IN ('queued', 'dispatched') +WHERE runtime_id = $1 AND status IN ('queued', 'dispatched') ORDER BY priority DESC, created_at ASC; -- name: UpdateAgentStatus :one UPDATE agent SET status = $2, updated_at = now() WHERE id = $1 RETURNING *; - --- name: UpsertDaemonConnection :one -INSERT INTO daemon_connection (agent_id, daemon_id, status, last_heartbeat_at, runtime_info) -VALUES ($1, $2, 'connected', now(), $3) -ON CONFLICT ON CONSTRAINT uq_daemon_agent -DO UPDATE SET status = 'connected', last_heartbeat_at = now(), runtime_info = $3, updated_at = now() -RETURNING *; - --- name: UpdateDaemonHeartbeat :exec -UPDATE daemon_connection -SET last_heartbeat_at = now(), updated_at = now() -WHERE daemon_id = $1 AND agent_id = $2; - --- name: DisconnectDaemon :exec -UPDATE daemon_connection -SET status = 'disconnected', updated_at = now() -WHERE daemon_id = $1; diff --git a/server/pkg/db/queries/runtime.sql b/server/pkg/db/queries/runtime.sql new file mode 100644 index 00000000..ff8615bb --- /dev/null +++ b/server/pkg/db/queries/runtime.sql @@ -0,0 +1,41 @@ +-- name: ListAgentRuntimes :many +SELECT * FROM agent_runtime +WHERE workspace_id = $1 +ORDER BY created_at ASC; + +-- name: GetAgentRuntime :one +SELECT * FROM agent_runtime +WHERE id = $1; + +-- name: GetAgentRuntimeForWorkspace :one +SELECT * FROM agent_runtime +WHERE id = $1 AND workspace_id = $2; + +-- name: UpsertAgentRuntime :one +INSERT INTO agent_runtime ( + workspace_id, + daemon_id, + name, + runtime_mode, + provider, + status, + device_info, + metadata, + last_seen_at +) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, now()) +ON CONFLICT (workspace_id, daemon_id, provider) +DO UPDATE SET + name = EXCLUDED.name, + runtime_mode = EXCLUDED.runtime_mode, + status = EXCLUDED.status, + device_info = EXCLUDED.device_info, + metadata = EXCLUDED.metadata, + last_seen_at = now(), + updated_at = now() +RETURNING *; + +-- name: UpdateAgentRuntimeHeartbeat :one +UPDATE agent_runtime +SET status = 'online', last_seen_at = now(), updated_at = now() +WHERE id = $1 +RETURNING *; From 20f50226644aeac9ed97532bf05e78200a87e801 Mon Sep 17 00:00:00 2001 From: Jiayuan Zhang Date: Tue, 24 Mar 2026 12:07:20 +0800 Subject: [PATCH 3/3] fix(web): wrap search params pages in suspense --- apps/web/app/(auth)/login/page.tsx | 12 ++++++++++-- apps/web/app/pair/local/page.tsx | 12 ++++++++++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/apps/web/app/(auth)/login/page.tsx b/apps/web/app/(auth)/login/page.tsx index 222891e8..3d76dd87 100644 --- a/apps/web/app/(auth)/login/page.tsx +++ b/apps/web/app/(auth)/login/page.tsx @@ -1,10 +1,10 @@ "use client"; -import { useState } from "react"; +import { Suspense, useState } from "react"; import { useSearchParams } from "next/navigation"; import { useAuth } from "../../../lib/auth-context"; -export default function LoginPage() { +function LoginPageContent() { const { login, isLoading } = useAuth(); const searchParams = useSearchParams(); const [email, setEmail] = useState(""); @@ -65,3 +65,11 @@ export default function LoginPage() { ); } + +export default function LoginPage() { + return ( + + + + ); +} diff --git a/apps/web/app/pair/local/page.tsx b/apps/web/app/pair/local/page.tsx index 79234b5b..023496d9 100644 --- a/apps/web/app/pair/local/page.tsx +++ b/apps/web/app/pair/local/page.tsx @@ -1,7 +1,7 @@ "use client"; import Link from "next/link"; -import { useEffect, useMemo, useState } from "react"; +import { Suspense, useEffect, useMemo, useState } from "react"; import { useSearchParams } from "next/navigation"; import type { DaemonPairingSession } from "@multica/types"; import { api } from "../../../lib/api"; @@ -16,7 +16,7 @@ function formatExpiresAt(value: string) { }); } -export default function LocalDaemonPairPage() { +function LocalDaemonPairPageContent() { const searchParams = useSearchParams(); const token = searchParams.get("token") ?? ""; const { user, workspaces, workspace, isLoading } = useAuth(); @@ -153,3 +153,11 @@ export default function LocalDaemonPairPage() { ); } + +export default function LocalDaemonPairPage() { + return ( + + + + ); +}