multica/server/internal/handler/daemon.go
Jiayuan cdc1ac708e feat(daemon): agent-driven repo checkout with bare clone cache
Agents now decide which repo to use based on issue context and check out
repos on demand via `multica repo checkout <url>`. Workspace repos are
cached locally as bare clones for fast worktree creation.

Key changes:
- Add repocache package for bare clone management (clone, fetch, worktree)
- Add `multica repo checkout` CLI command that talks to local daemon
- Add POST /repo/checkout endpoint on daemon health server
- Pass workspace repos metadata through register + task claim responses
- Remove pre-created worktrees from execenv (workdir starts empty)
- Update CLAUDE.md template to instruct agents to use `multica repo checkout`
- Pass MULTICA_DAEMON_PORT, WORKSPACE_ID, AGENT_NAME, TASK_ID env vars to agent
2026-03-29 19:37:48 +08:00

383 lines
12 KiB
Go

package handler
import (
"encoding/json"
"fmt"
"log/slog"
"net/http"
"strings"
"github.com/go-chi/chi/v5"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// ---------------------------------------------------------------------------
// Daemon Registration & Heartbeat
// ---------------------------------------------------------------------------
// DaemonRegisterRequest is the JSON body decoded by DaemonRegister.
type DaemonRegisterRequest struct {
// WorkspaceID names the workspace to register against; required after trimming.
WorkspaceID string `json:"workspace_id"`
// DaemonID identifies the registering daemon; required after trimming.
DaemonID string `json:"daemon_id"`
// DeviceName is optional; it feeds default runtime names and the device info string.
DeviceName string `json:"device_name"`
// Runtimes lists the runtimes to upsert; at least one entry is required.
Runtimes []struct {
// Name is optional; when blank DaemonRegister derives one from Type and DeviceName.
Name string `json:"name"`
// Type is stored as the runtime provider; a blank value becomes "unknown".
Type string `json:"type"`
// Version is written to the runtime metadata and appended to the device info.
Version string `json:"version"`
// Status is honored only when it is "offline"; anything else registers as "online".
Status string `json:"status"`
} `json:"runtimes"`
}
// DaemonRegister upserts one agent runtime per entry in the request body and
// responds with the registered runtimes plus the workspace's repos.
//
// Responses:
//   - 400 when the body is not valid JSON, or when daemon_id, workspace_id, or
//     runtimes is missing (string fields are trimmed before the check).
//   - 404 when the workspace lookup fails.
//   - 500 when an upsert fails; this handler performs no rollback of runtimes
//     upserted earlier in the same request.
//   - 200 with {"runtimes": [...], "repos": [...]} on success. "repos" is
//     always a JSON array, never null.
func (h *Handler) DaemonRegister(w http.ResponseWriter, r *http.Request) {
	var req DaemonRegisterRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}

	req.WorkspaceID = strings.TrimSpace(req.WorkspaceID)
	req.DaemonID = strings.TrimSpace(req.DaemonID)
	req.DeviceName = strings.TrimSpace(req.DeviceName)

	if req.DaemonID == "" {
		writeError(w, http.StatusBadRequest, "daemon_id is required")
		return
	}
	if req.WorkspaceID == "" {
		writeError(w, http.StatusBadRequest, "workspace_id is required")
		return
	}
	if len(req.Runtimes) == 0 {
		writeError(w, http.StatusBadRequest, "at least one runtime is required")
		return
	}

	// Parse once; the same workspace ID is used for the lookup and every upsert.
	workspaceID := parseUUID(req.WorkspaceID)

	ws, err := h.Queries.GetWorkspace(r.Context(), workspaceID)
	if err != nil {
		writeError(w, http.StatusNotFound, "workspace not found")
		return
	}

	resp := make([]AgentRuntimeResponse, 0, len(req.Runtimes))
	for _, rt := range req.Runtimes {
		provider := strings.TrimSpace(rt.Type)
		if provider == "" {
			provider = "unknown"
		}

		// Default the display name to the provider, qualified by the device
		// name when one was supplied.
		name := strings.TrimSpace(rt.Name)
		if name == "" {
			name = provider
			if req.DeviceName != "" {
				name = fmt.Sprintf("%s (%s)", provider, req.DeviceName)
			}
		}

		// req.DeviceName was already trimmed above.
		deviceInfo := req.DeviceName
		if rt.Version != "" && deviceInfo != "" {
			deviceInfo = fmt.Sprintf("%s · %s", deviceInfo, rt.Version)
		} else if rt.Version != "" {
			deviceInfo = rt.Version
		}

		// Only an explicit "offline" is honored; everything else is online.
		status := "online"
		if rt.Status == "offline" {
			status = "offline"
		}

		// Marshalling a map with a single string value cannot fail, so the
		// error is intentionally discarded.
		metadata, _ := json.Marshal(map[string]any{
			"version": rt.Version,
		})

		registered, err := h.Queries.UpsertAgentRuntime(r.Context(), db.UpsertAgentRuntimeParams{
			WorkspaceID: workspaceID,
			DaemonID:    strToText(req.DaemonID),
			Name:        name,
			RuntimeMode: "local",
			Provider:    provider,
			Status:      status,
			DeviceInfo:  deviceInfo,
			Metadata:    metadata,
		})
		if err != nil {
			writeError(w, http.StatusInternalServerError, "failed to register runtime: "+err.Error())
			return
		}
		resp = append(resp, runtimeToResponse(registered))
	}

	slog.Info("daemon registered", "workspace_id", req.WorkspaceID, "daemon_id", req.DaemonID, "runtimes_count", len(resp))

	h.publish(protocol.EventDaemonRegister, req.WorkspaceID, "system", "", map[string]any{
		"runtimes": resp,
	})

	// Include workspace repos so the daemon can cache them locally. A decode
	// failure must not fail registration, but it should be visible in the logs
	// and must not leak a partially decoded list to the daemon.
	var repos []RepoData
	if len(ws.Repos) > 0 {
		if err := json.Unmarshal(ws.Repos, &repos); err != nil {
			slog.Warn("daemon register: failed to decode workspace repos", "workspace_id", req.WorkspaceID, "error", err)
			repos = nil
		}
	}
	// Covers missing repos, a stored JSON null, and the decode-failure path so
	// the response always carries an array.
	if repos == nil {
		repos = []RepoData{}
	}

	writeJSON(w, http.StatusOK, map[string]any{"runtimes": resp, "repos": repos})
}
// DaemonDeregister sets every runtime listed in the request offline, which the
// daemon calls on shutdown.
//
// A runtime that cannot be looked up or updated is logged and skipped rather
// than failing the request. Each workspace that had at least one runtime set
// offline receives an EventDaemonRegister event with action "deregister".
// Responds 400 for an invalid body or an empty runtime_ids list, 200 otherwise.
func (h *Handler) DaemonDeregister(w http.ResponseWriter, r *http.Request) {
	var body struct {
		RuntimeIDs []string `json:"runtime_ids"`
	}
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}
	if len(body.RuntimeIDs) == 0 {
		writeError(w, http.StatusBadRequest, "runtime_ids is required")
		return
	}

	ctx := r.Context()

	// Set of workspace IDs whose runtime list changed.
	touched := make(map[string]struct{})
	for _, id := range body.RuntimeIDs {
		runtimeUUID := parseUUID(id)

		// The runtime row is needed only to learn which workspace to notify.
		runtimeRow, lookupErr := h.Queries.GetAgentRuntime(ctx, runtimeUUID)
		if lookupErr != nil {
			slog.Warn("deregister: runtime not found", "runtime_id", id, "error", lookupErr)
			continue
		}

		if offlineErr := h.Queries.SetAgentRuntimeOffline(ctx, runtimeUUID); offlineErr != nil {
			slog.Warn("deregister: failed to set offline", "runtime_id", id, "error", offlineErr)
			continue
		}

		touched[uuidToString(runtimeRow.WorkspaceID)] = struct{}{}
	}

	// Tell frontend clients to re-fetch their runtime list.
	for workspaceID := range touched {
		h.publish(protocol.EventDaemonRegister, workspaceID, "system", "", map[string]any{
			"action": "deregister",
		})
	}

	slog.Info("daemon deregistered", "runtime_ids", body.RuntimeIDs)
	writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}
// DaemonHeartbeatRequest is the JSON body decoded by DaemonHeartbeat.
type DaemonHeartbeatRequest struct {
// RuntimeID identifies the runtime whose heartbeat is refreshed; required.
RuntimeID string `json:"runtime_id"`
}
// DaemonHeartbeat refreshes the heartbeat of the runtime named in the request
// body and hands back any ping queued for it.
//
// Responds 400 for an invalid body or a missing runtime_id, 500 when the
// heartbeat update fails, and otherwise 200 with {"status": "ok"} plus a
// "pending_ping" object when PingStore has one pending for this runtime.
func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
	var body DaemonHeartbeatRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}

	runtimeID := body.RuntimeID
	if runtimeID == "" {
		writeError(w, http.StatusBadRequest, "runtime_id is required")
		return
	}

	if _, hbErr := h.Queries.UpdateAgentRuntimeHeartbeat(r.Context(), parseUUID(runtimeID)); hbErr != nil {
		writeError(w, http.StatusInternalServerError, "heartbeat failed")
		return
	}
	slog.Debug("daemon heartbeat", "runtime_id", runtimeID)

	payload := map[string]any{"status": "ok"}

	// Piggyback a pending ping request, if any, on the heartbeat response.
	ping := h.PingStore.PopPending(runtimeID)
	if ping != nil {
		payload["pending_ping"] = map[string]string{"id": ping.ID}
	}

	writeJSON(w, http.StatusOK, payload)
}
// ClaimTaskByRuntime asks TaskService to claim the next task for the runtime
// given by the "runtimeId" URL parameter and returns it as {"task": ...}.
//
// When nothing is claimable the response is 200 with {"task": null}. A claimed
// task is enriched on a best-effort basis: each lookup below is skipped
// silently if it fails.
//   - Agent: name, instructions, and skills read fresh from the DB.
//   - Repos: the workspace's repos, included only when they decode to a
//     non-empty list.
//   - PriorSessionID: the last session recorded for this (agent, issue) pair.
func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	runtimeID := chi.URLParam(r, "runtimeId")

	claimed, claimErr := h.TaskService.ClaimTaskForRuntime(ctx, parseUUID(runtimeID))
	switch {
	case claimErr != nil:
		writeError(w, http.StatusInternalServerError, "failed to claim task: "+claimErr.Error())
		return
	case claimed == nil:
		slog.Debug("no task to claim", "runtime_id", runtimeID)
		writeJSON(w, http.StatusOK, map[string]any{"task": nil})
		return
	}

	resp := taskToResponse(*claimed)

	// Attach fresh agent data (name + skills).
	agent, agentErr := h.Queries.GetAgent(ctx, claimed.AgentID)
	if agentErr == nil {
		agentData := TaskAgentData{
			ID:           uuidToString(agent.ID),
			Name:         agent.Name,
			Instructions: agent.Instructions,
			Skills:       h.TaskService.LoadAgentSkills(ctx, claimed.AgentID),
		}
		resp.Agent = &agentData
	}

	// Attach workspace repos so the daemon can set up worktrees. The workspace
	// is reached through the task's issue.
	if issue, issueErr := h.Queries.GetIssue(ctx, claimed.IssueID); issueErr == nil {
		workspace, wsErr := h.Queries.GetWorkspace(ctx, issue.WorkspaceID)
		if wsErr == nil && workspace.Repos != nil {
			var workspaceRepos []RepoData
			decodeErr := json.Unmarshal(workspace.Repos, &workspaceRepos)
			if decodeErr == nil && len(workspaceRepos) > 0 {
				resp.Repos = workspaceRepos
			}
		}
	}

	// Attach the prior session for this (agent, issue) pair so the daemon can
	// resume the Claude Code conversation context.
	lastSession, sessionErr := h.Queries.GetLastTaskSession(ctx, db.GetLastTaskSessionParams{
		AgentID: claimed.AgentID,
		IssueID: claimed.IssueID,
	})
	if sessionErr == nil && lastSession.SessionID.Valid {
		resp.PriorSessionID = lastSession.SessionID.String
	}

	slog.Info("task claimed by runtime", "task_id", uuidToString(claimed.ID), "runtime_id", runtimeID, "agent_id", uuidToString(claimed.AgentID), "prior_session", resp.PriorSessionID)
	writeJSON(w, http.StatusOK, map[string]any{"task": resp})
}
// ListPendingTasksByRuntime responds with the pending tasks stored for the
// runtime given by the "runtimeId" URL parameter, as a JSON array.
// Responds 500 when the query fails.
func (h *Handler) ListPendingTasksByRuntime(w http.ResponseWriter, r *http.Request) {
	runtimeUUID := parseUUID(chi.URLParam(r, "runtimeId"))

	pending, listErr := h.Queries.ListPendingTasksByRuntime(r.Context(), runtimeUUID)
	if listErr != nil {
		writeError(w, http.StatusInternalServerError, "failed to list pending tasks")
		return
	}

	// Non-nil even when empty, so the body encodes as [] rather than null.
	out := make([]AgentTaskResponse, 0, len(pending))
	for _, task := range pending {
		out = append(out, taskToResponse(task))
	}

	writeJSON(w, http.StatusOK, out)
}
// ---------------------------------------------------------------------------
// Task Lifecycle (called by daemon)
// ---------------------------------------------------------------------------
// StartTask asks TaskService to start the task given by the "taskId" URL
// parameter and responds with the updated task.
// A service error is logged and returned to the caller as a 400.
func (h *Handler) StartTask(w http.ResponseWriter, r *http.Request) {
	id := chi.URLParam(r, "taskId")

	started, startErr := h.TaskService.StartTask(r.Context(), parseUUID(id))
	if startErr != nil {
		slog.Warn("start task failed", "task_id", id, "error", startErr)
		writeError(w, http.StatusBadRequest, startErr.Error())
		return
	}

	agentID := uuidToString(started.AgentID)
	slog.Info("task started", "task_id", id, "agent_id", agentID)
	writeJSON(w, http.StatusOK, taskToResponse(*started))
}
// TaskProgressRequest is the JSON body decoded by ReportTaskProgress; all three
// fields are passed through to TaskService.ReportProgress.
type TaskProgressRequest struct {
// Summary is the progress text.
Summary string `json:"summary"`
// Step is presumably the current step number — confirm against TaskService.
Step int `json:"step"`
// Total is presumably the total number of steps — confirm against TaskService.
Total int `json:"total"`
}
// ReportTaskProgress forwards a progress update for the task given by the
// "taskId" URL parameter to TaskService.ReportProgress.
//
// The workspace ID is resolved through the task's issue. If either lookup
// fails, the update is still forwarded with an empty workspace ID. Responds
// 400 for an invalid body and 200 otherwise.
func (h *Handler) ReportTaskProgress(w http.ResponseWriter, r *http.Request) {
	id := chi.URLParam(r, "taskId")

	var progress TaskProgressRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&progress); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}

	ctx := r.Context()

	// Resolve task -> issue -> workspace; stays empty when a lookup fails.
	var workspaceID string
	if task, taskErr := h.Queries.GetAgentTask(ctx, parseUUID(id)); taskErr == nil {
		if issue, issueErr := h.Queries.GetIssue(ctx, task.IssueID); issueErr == nil {
			workspaceID = uuidToString(issue.WorkspaceID)
		}
	}

	h.TaskService.ReportProgress(ctx, id, workspaceID, progress.Summary, progress.Step, progress.Total)
	writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}
// TaskCompleteRequest is the JSON body decoded by CompleteTask. The whole
// struct is re-marshalled and handed to TaskService.CompleteTask as the task
// result; SessionID and WorkDir are additionally passed as separate arguments.
type TaskCompleteRequest struct {
PRURL string `json:"pr_url"`
Output string `json:"output"`
SessionID string `json:"session_id"` // Claude session ID for future resumption
WorkDir string `json:"work_dir"` // working directory used during execution
}
// CompleteTask asks TaskService to complete the task given by the "taskId" URL
// parameter and responds with the updated task.
//
// The decoded request is re-encoded as JSON and passed as the task result,
// with the session ID and work dir passed alongside it. Responds 400 for an
// invalid body or when the service returns an error.
func (h *Handler) CompleteTask(w http.ResponseWriter, r *http.Request) {
	id := chi.URLParam(r, "taskId")

	var completion TaskCompleteRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&completion); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}

	payload, _ := json.Marshal(completion)

	finished, completeErr := h.TaskService.CompleteTask(r.Context(), parseUUID(id), payload, completion.SessionID, completion.WorkDir)
	if completeErr != nil {
		slog.Warn("complete task failed", "task_id", id, "error", completeErr)
		writeError(w, http.StatusBadRequest, completeErr.Error())
		return
	}

	agentID := uuidToString(finished.AgentID)
	slog.Info("task completed", "task_id", id, "agent_id", agentID)
	writeJSON(w, http.StatusOK, taskToResponse(*finished))
}
// GetTaskStatus responds with {"status": ...} for the task given by the
// "taskId" URL parameter, or 404 when the task lookup fails.
// The daemon uses it to check whether a task was cancelled mid-execution.
func (h *Handler) GetTaskStatus(w http.ResponseWriter, r *http.Request) {
	taskUUID := parseUUID(chi.URLParam(r, "taskId"))

	found, lookupErr := h.Queries.GetAgentTask(r.Context(), taskUUID)
	if lookupErr != nil {
		writeError(w, http.StatusNotFound, "task not found")
		return
	}

	body := map[string]string{"status": found.Status}
	writeJSON(w, http.StatusOK, body)
}
// TaskFailRequest is the JSON body decoded by FailTask.
type TaskFailRequest struct {
// Error is the failure message passed to TaskService.FailTask and logged.
Error string `json:"error"`
}
// FailTask asks TaskService to fail the task given by the "taskId" URL
// parameter, using the error message from the request body, and responds with
// the updated task.
// Responds 400 for an invalid body or when the service returns an error.
func (h *Handler) FailTask(w http.ResponseWriter, r *http.Request) {
	id := chi.URLParam(r, "taskId")

	var failure TaskFailRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&failure); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}

	failed, failErr := h.TaskService.FailTask(r.Context(), parseUUID(id), failure.Error)
	if failErr != nil {
		slog.Warn("fail task failed", "task_id", id, "error", failErr)
		writeError(w, http.StatusBadRequest, failErr.Error())
		return
	}

	agentID := uuidToString(failed.AgentID)
	slog.Info("task failed", "task_id", id, "agent_id", agentID, "task_error", failure.Error)
	writeJSON(w, http.StatusOK, taskToResponse(*failed))
}