multica/server/internal/handler/daemon.go
Jiayuan ce2b263ea5 feat(daemon): reuse workdir across tasks on same agent+issue pair
Previously each task created a fresh workdir via execenv.Prepare(), even
when resuming work on the same (agent, issue). This caused the agent's
session context to be out of sync with a blank code state.

Now the server returns prior_work_dir in the claim response, and the
daemon tries execenv.Reuse() first — which wraps the existing directory,
detects git worktree state, and refreshes context files. Falls back to
Prepare() if the prior workdir no longer exists. Workdirs are no longer
cleaned up after task completion so they remain available for reuse.
2026-03-29 18:40:29 +08:00

366 lines
12 KiB
Go

package handler
import (
"encoding/json"
"fmt"
"log/slog"
"net/http"
"strings"
"github.com/go-chi/chi/v5"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// ---------------------------------------------------------------------------
// Daemon Registration & Heartbeat
// ---------------------------------------------------------------------------
// DaemonRegisterRequest is the JSON body decoded by DaemonRegister. It
// identifies the daemon and lists the agent runtimes it offers.
type DaemonRegisterRequest struct {
// WorkspaceID is required; DaemonRegister trims it and rejects an empty value.
WorkspaceID string `json:"workspace_id"`
// DaemonID is required; DaemonRegister trims it and rejects an empty value.
DaemonID string `json:"daemon_id"`
// DeviceName is optional. When set it is used in the default runtime name
// and in the stored device info.
DeviceName string `json:"device_name"`
// Runtimes must contain at least one entry; each entry is upserted as an
// agent runtime.
Runtimes []struct {
// Name is the display name; when blank DaemonRegister derives one from
// Type and DeviceName.
Name string `json:"name"`
// Type is stored as the runtime's provider; a blank value becomes "unknown".
Type string `json:"type"`
// Version is recorded in the runtime metadata and appended to the device info.
Version string `json:"version"`
// Status is honored only when it is "offline"; any other value is
// registered as "online".
Status string `json:"status"`
} `json:"runtimes"`
}
// DaemonRegister handles a daemon registration request. It validates the
// body, checks that the workspace can be loaded, upserts one agent runtime
// per reported runtime, publishes an EventDaemonRegister event, and replies
// with the registered runtimes.
//
// Responses:
//   - 400 for a malformed body or a missing daemon_id, workspace_id, or
//     runtimes list
//   - 404 when the workspace lookup fails
//   - 500 when an upsert fails (this handler does not undo upserts that
//     already succeeded earlier in the same request)
//   - 200 with {"runtimes": [...]} on success
func (h *Handler) DaemonRegister(w http.ResponseWriter, r *http.Request) {
	var req DaemonRegisterRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&req); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}

	// Normalize identifiers once so every later check sees trimmed values.
	req.WorkspaceID = strings.TrimSpace(req.WorkspaceID)
	req.DaemonID = strings.TrimSpace(req.DaemonID)
	req.DeviceName = strings.TrimSpace(req.DeviceName)

	switch {
	case req.DaemonID == "":
		writeError(w, http.StatusBadRequest, "daemon_id is required")
		return
	case req.WorkspaceID == "":
		writeError(w, http.StatusBadRequest, "workspace_id is required")
		return
	case len(req.Runtimes) == 0:
		writeError(w, http.StatusBadRequest, "at least one runtime is required")
		return
	}

	ctx := r.Context()
	workspaceUUID := parseUUID(req.WorkspaceID)
	if _, lookupErr := h.Queries.GetWorkspace(ctx, workspaceUUID); lookupErr != nil {
		writeError(w, http.StatusNotFound, "workspace not found")
		return
	}

	runtimes := make([]AgentRuntimeResponse, 0, len(req.Runtimes))
	for _, rt := range req.Runtimes {
		provider := strings.TrimSpace(rt.Type)
		if provider == "" {
			provider = "unknown"
		}

		// Fall back to a name derived from the provider (and device, if known).
		name := strings.TrimSpace(rt.Name)
		if name == "" {
			if req.DeviceName == "" {
				name = provider
			} else {
				name = fmt.Sprintf("%s (%s)", provider, req.DeviceName)
			}
		}

		// req.DeviceName was trimmed above, so it can be used directly.
		deviceInfo := req.DeviceName
		switch {
		case rt.Version != "" && deviceInfo != "":
			deviceInfo = fmt.Sprintf("%s · %s", deviceInfo, rt.Version)
		case rt.Version != "":
			deviceInfo = rt.Version
		}

		// Only an explicit "offline" is honored; everything else is online.
		status := rt.Status
		if status != "offline" {
			status = "online"
		}

		// The marshal error is ignored: the map holds a single string value.
		metadata, _ := json.Marshal(map[string]any{
			"version": rt.Version,
		})

		params := db.UpsertAgentRuntimeParams{
			WorkspaceID: workspaceUUID,
			DaemonID:    strToText(req.DaemonID),
			Name:        name,
			RuntimeMode: "local",
			Provider:    provider,
			Status:      status,
			DeviceInfo:  deviceInfo,
			Metadata:    metadata,
		}
		row, upsertErr := h.Queries.UpsertAgentRuntime(ctx, params)
		if upsertErr != nil {
			writeError(w, http.StatusInternalServerError, "failed to register runtime: "+upsertErr.Error())
			return
		}
		runtimes = append(runtimes, runtimeToResponse(row))
	}

	slog.Info("daemon registered", "workspace_id", req.WorkspaceID, "daemon_id", req.DaemonID, "runtimes_count", len(runtimes))

	h.publish(protocol.EventDaemonRegister, req.WorkspaceID, "system", "", map[string]any{
		"runtimes": runtimes,
	})

	writeJSON(w, http.StatusOK, map[string]any{"runtimes": runtimes})
}
// DaemonDeregister marks the listed runtimes as offline when a daemon shuts
// down, then publishes one EventDaemonRegister event (action "deregister")
// per affected workspace.
//
// Runtimes that cannot be looked up or updated are logged and skipped; they
// do not fail the request. Responds 400 for a malformed body or an empty
// runtime_ids list, otherwise 200 with {"status": "ok"}.
func (h *Handler) DaemonDeregister(w http.ResponseWriter, r *http.Request) {
	var body struct {
		RuntimeIDs []string `json:"runtime_ids"`
	}
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}
	if len(body.RuntimeIDs) == 0 {
		writeError(w, http.StatusBadRequest, "runtime_ids is required")
		return
	}

	ctx := r.Context()

	// Set of workspace IDs whose runtime list changed.
	touched := make(map[string]struct{})
	for _, id := range body.RuntimeIDs {
		runtimeUUID := parseUUID(id)

		// The runtime row is needed to learn which workspace it belongs to.
		rt, err := h.Queries.GetAgentRuntime(ctx, runtimeUUID)
		if err != nil {
			slog.Warn("deregister: runtime not found", "runtime_id", id, "error", err)
			continue
		}
		if err := h.Queries.SetAgentRuntimeOffline(ctx, runtimeUUID); err != nil {
			slog.Warn("deregister: failed to set offline", "runtime_id", id, "error", err)
			continue
		}
		touched[uuidToString(rt.WorkspaceID)] = struct{}{}
	}

	// Tell frontend clients to re-fetch the runtime list.
	for workspaceID := range touched {
		h.publish(protocol.EventDaemonRegister, workspaceID, "system", "", map[string]any{
			"action": "deregister",
		})
	}

	slog.Info("daemon deregistered", "runtime_ids", body.RuntimeIDs)
	writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}
// DaemonHeartbeatRequest is the JSON body decoded by DaemonHeartbeat.
type DaemonHeartbeatRequest struct {
// RuntimeID identifies the runtime sending the heartbeat; an empty value
// is rejected by DaemonHeartbeat.
RuntimeID string `json:"runtime_id"`
}
// DaemonHeartbeat records a heartbeat for a runtime and, when a ping request
// is waiting for that runtime, includes it in the reply.
//
// Responds 400 for a malformed body or a missing runtime_id, 500 when the
// heartbeat update fails, and otherwise 200 with {"status": "ok"} plus an
// optional "pending_ping": {"id": ...} entry.
func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
	var req DaemonHeartbeatRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&req); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}
	if req.RuntimeID == "" {
		writeError(w, http.StatusBadRequest, "runtime_id is required")
		return
	}

	if _, err := h.Queries.UpdateAgentRuntimeHeartbeat(r.Context(), parseUUID(req.RuntimeID)); err != nil {
		writeError(w, http.StatusInternalServerError, "heartbeat failed")
		return
	}
	slog.Debug("daemon heartbeat", "runtime_id", req.RuntimeID)

	payload := map[string]any{"status": "ok"}

	// Hand over a pending ping request for this runtime, if there is one.
	pending := h.PingStore.PopPending(req.RuntimeID)
	if pending != nil {
		payload["pending_ping"] = map[string]string{"id": pending.ID}
	}

	writeJSON(w, http.StatusOK, payload)
}
// ClaimTaskByRuntime claims the next queued task for the runtime named by the
// "runtimeId" URL parameter, via TaskService.ClaimTaskForRuntime.
//
// The response carries the task plus, when available:
//   - the agent's name, instructions and skills, loaded at claim time;
//   - the prior session ID for the same (agent, issue) pair, and the prior
//     work dir recorded with that session, so the daemon can resume earlier
//     work instead of starting from scratch.
//
// Responds 500 when claiming fails, 200 with {"task": null} when nothing is
// queued, and 200 with {"task": ...} otherwise. Agent and prior-session
// enrichment is best-effort and never fails the claim.
func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) {
	runtimeID := chi.URLParam(r, "runtimeId")
	ctx := r.Context()

	task, err := h.TaskService.ClaimTaskForRuntime(ctx, parseUUID(runtimeID))
	if err != nil {
		writeError(w, http.StatusInternalServerError, "failed to claim task: "+err.Error())
		return
	}
	if task == nil {
		slog.Debug("no task to claim", "runtime_id", runtimeID)
		writeJSON(w, http.StatusOK, map[string]any{"task": nil})
		return
	}

	// Build response with fresh agent data (name + instructions + skills).
	resp := taskToResponse(*task)
	agent, err := h.Queries.GetAgent(ctx, task.AgentID)
	if err != nil {
		// The task is already claimed, so still return it; log the failure
		// because the daemon will receive the task without agent data.
		slog.Warn("claim task: failed to load agent",
			"task_id", uuidToString(task.ID),
			"agent_id", uuidToString(task.AgentID),
			"error", err)
	} else {
		resp.Agent = &TaskAgentData{
			ID:           uuidToString(agent.ID),
			Name:         agent.Name,
			Instructions: agent.Instructions,
			Skills:       h.TaskService.LoadAgentSkills(ctx, task.AgentID),
		}
	}

	// Look up the prior session for this (agent, issue) pair so the daemon
	// can resume the Claude Code conversation context. A lookup error is
	// not logged: this handler treats it the same as "no prior session".
	prior, err := h.Queries.GetLastTaskSession(ctx, db.GetLastTaskSessionParams{
		AgentID: task.AgentID,
		IssueID: task.IssueID,
	})
	if err == nil && prior.SessionID.Valid {
		resp.PriorSessionID = prior.SessionID.String
		// The work dir is only offered together with a session to resume.
		if prior.WorkDir.Valid {
			resp.PriorWorkDir = prior.WorkDir.String
		}
	}

	slog.Info("task claimed by runtime",
		"task_id", uuidToString(task.ID),
		"runtime_id", runtimeID,
		"agent_id", uuidToString(task.AgentID),
		"prior_session", resp.PriorSessionID,
		"prior_work_dir", resp.PriorWorkDir)
	writeJSON(w, http.StatusOK, map[string]any{"task": resp})
}
// ListPendingTasksByRuntime responds with the pending (queued/dispatched)
// tasks of the runtime named by the "runtimeId" URL parameter, as a JSON
// array. Responds 500 when the query fails.
func (h *Handler) ListPendingTasksByRuntime(w http.ResponseWriter, r *http.Request) {
	id := chi.URLParam(r, "runtimeId")

	pending, err := h.Queries.ListPendingTasksByRuntime(r.Context(), parseUUID(id))
	if err != nil {
		writeError(w, http.StatusInternalServerError, "failed to list pending tasks")
		return
	}

	// Non-nil even when empty, so the body is always a JSON array.
	out := make([]AgentTaskResponse, 0, len(pending))
	for _, task := range pending {
		out = append(out, taskToResponse(task))
	}
	writeJSON(w, http.StatusOK, out)
}
// ---------------------------------------------------------------------------
// Task Lifecycle (called by daemon)
// ---------------------------------------------------------------------------
// StartTask asks the task service to mark the task named by the "taskId" URL
// parameter as running. Responds 400 with the service's error message on
// failure, otherwise 200 with the updated task.
func (h *Handler) StartTask(w http.ResponseWriter, r *http.Request) {
	id := chi.URLParam(r, "taskId")

	started, startErr := h.TaskService.StartTask(r.Context(), parseUUID(id))
	if startErr != nil {
		slog.Warn("start task failed", "task_id", id, "error", startErr)
		writeError(w, http.StatusBadRequest, startErr.Error())
		return
	}

	slog.Info("task started", "task_id", id, "agent_id", uuidToString(started.AgentID))
	writeJSON(w, http.StatusOK, taskToResponse(*started))
}
// TaskProgressRequest is the JSON body decoded by ReportTaskProgress. Its
// fields are forwarded unchanged to TaskService.ReportProgress.
type TaskProgressRequest struct {
// Summary is the progress text to report.
Summary string `json:"summary"`
// Step and Total are presumably the current step number and the total
// number of steps — TODO confirm against TaskService.ReportProgress.
Step int `json:"step"`
Total int `json:"total"`
}
// ReportTaskProgress forwards a progress update for the task named by the
// "taskId" URL parameter to TaskService.ReportProgress.
//
// The workspace ID is resolved through the task's issue. Resolution is
// best-effort: when the task or issue cannot be loaded the failure is logged
// and the update is still forwarded with an empty workspace ID.
//
// Responds 400 for a malformed body, otherwise 200 with {"status": "ok"}.
func (h *Handler) ReportTaskProgress(w http.ResponseWriter, r *http.Request) {
	taskID := chi.URLParam(r, "taskId")

	var req TaskProgressRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}

	ctx := r.Context()

	// Look up task to get workspace ID via the associated issue. Failures
	// are logged rather than returned so a progress report never errors out.
	workspaceID := ""
	if task, err := h.Queries.GetAgentTask(ctx, parseUUID(taskID)); err != nil {
		slog.Warn("report progress: task lookup failed", "task_id", taskID, "error", err)
	} else if issue, err := h.Queries.GetIssue(ctx, task.IssueID); err != nil {
		slog.Warn("report progress: issue lookup failed", "task_id", taskID, "issue_id", uuidToString(task.IssueID), "error", err)
	} else {
		workspaceID = uuidToString(issue.WorkspaceID)
	}

	h.TaskService.ReportProgress(ctx, taskID, workspaceID, req.Summary, req.Step, req.Total)
	writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}
// TaskCompleteRequest is the JSON body decoded by CompleteTask. The whole
// request is re-marshaled and passed to TaskService.CompleteTask as the task
// result; SessionID and WorkDir are additionally passed as separate arguments.
type TaskCompleteRequest struct {
// PRURL is presumably the URL of a pull request produced by the task —
// TODO confirm with the daemon side.
PRURL string `json:"pr_url"`
// Output is the task's output as reported by the daemon.
Output string `json:"output"`
SessionID string `json:"session_id"` // Claude session ID for future resumption
WorkDir string `json:"work_dir"` // working directory used during execution
}
// CompleteTask asks the task service to mark the task named by the "taskId"
// URL parameter as completed. The decoded request is stored as the task
// result, and its session ID and work dir are passed along separately.
//
// Responds 400 for a malformed body or when the service rejects the
// completion (with the service's error message), otherwise 200 with the
// updated task.
func (h *Handler) CompleteTask(w http.ResponseWriter, r *http.Request) {
	id := chi.URLParam(r, "taskId")

	var body TaskCompleteRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}

	// The marshal error is ignored: the request struct holds only strings.
	payload, _ := json.Marshal(body)

	done, completeErr := h.TaskService.CompleteTask(r.Context(), parseUUID(id), payload, body.SessionID, body.WorkDir)
	if completeErr != nil {
		slog.Warn("complete task failed", "task_id", id, "error", completeErr)
		writeError(w, http.StatusBadRequest, completeErr.Error())
		return
	}

	slog.Info("task completed", "task_id", id, "agent_id", uuidToString(done.AgentID))
	writeJSON(w, http.StatusOK, taskToResponse(*done))
}
// GetTaskStatus responds with {"status": ...} for the task named by the
// "taskId" URL parameter. The daemon uses it to check whether a task was
// cancelled mid-execution. Any lookup error is reported as 404.
func (h *Handler) GetTaskStatus(w http.ResponseWriter, r *http.Request) {
	id := parseUUID(chi.URLParam(r, "taskId"))

	found, lookupErr := h.Queries.GetAgentTask(r.Context(), id)
	if lookupErr != nil {
		writeError(w, http.StatusNotFound, "task not found")
		return
	}

	body := map[string]string{"status": found.Status}
	writeJSON(w, http.StatusOK, body)
}
// TaskFailRequest is the JSON body decoded by FailTask.
type TaskFailRequest struct {
// Error is the failure message; FailTask passes it to TaskService.FailTask
// and includes it in its log entry.
Error string `json:"error"`
}
// FailTask asks the task service to mark the task named by the "taskId" URL
// parameter as failed, recording the error message from the request body.
//
// Responds 400 for a malformed body or when the service rejects the change
// (with the service's error message), otherwise 200 with the updated task.
func (h *Handler) FailTask(w http.ResponseWriter, r *http.Request) {
	id := chi.URLParam(r, "taskId")

	var body TaskFailRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}

	failed, failErr := h.TaskService.FailTask(r.Context(), parseUUID(id), body.Error)
	if failErr != nil {
		slog.Warn("fail task failed", "task_id", id, "error", failErr)
		writeError(w, http.StatusBadRequest, failErr.Error())
		return
	}

	slog.Info("task failed", "task_id", id, "agent_id", uuidToString(failed.AgentID), "task_error", body.Error)
	writeJSON(w, http.StatusOK, taskToResponse(*failed))
}