package handler import ( "encoding/json" "fmt" "net/http" "strings" "github.com/go-chi/chi/v5" db "github.com/multica-ai/multica/server/pkg/db/generated" "github.com/multica-ai/multica/server/pkg/protocol" ) // --------------------------------------------------------------------------- // Daemon Registration & Heartbeat // --------------------------------------------------------------------------- type DaemonRegisterRequest struct { WorkspaceID string `json:"workspace_id"` DaemonID string `json:"daemon_id"` DeviceName string `json:"device_name"` Runtimes []struct { Name string `json:"name"` Type string `json:"type"` Version string `json:"version"` Status string `json:"status"` } `json:"runtimes"` } func (h *Handler) DaemonRegister(w http.ResponseWriter, r *http.Request) { var req DaemonRegisterRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "invalid request body") return } req.WorkspaceID = strings.TrimSpace(req.WorkspaceID) req.DaemonID = strings.TrimSpace(req.DaemonID) req.DeviceName = strings.TrimSpace(req.DeviceName) if req.DaemonID == "" { writeError(w, http.StatusBadRequest, "daemon_id is required") return } if req.WorkspaceID == "" { writeError(w, http.StatusBadRequest, "workspace_id is required") return } if len(req.Runtimes) == 0 { writeError(w, http.StatusBadRequest, "at least one runtime is required") return } if _, err := h.Queries.GetWorkspace(r.Context(), parseUUID(req.WorkspaceID)); err != nil { writeError(w, http.StatusNotFound, "workspace not found") return } resp := make([]AgentRuntimeResponse, 0, len(req.Runtimes)) for _, runtime := range req.Runtimes { provider := strings.TrimSpace(runtime.Type) if provider == "" { provider = "unknown" } name := strings.TrimSpace(runtime.Name) if name == "" { name = provider if req.DeviceName != "" { name = fmt.Sprintf("%s (%s)", provider, req.DeviceName) } } deviceInfo := strings.TrimSpace(req.DeviceName) if runtime.Version != "" && deviceInfo != "" { deviceInfo = fmt.Sprintf("%s ยท %s", deviceInfo, runtime.Version) } else if runtime.Version != "" { deviceInfo = runtime.Version } status := "online" if runtime.Status == "offline" { status = "offline" } metadata, _ := json.Marshal(map[string]any{ "version": runtime.Version, }) registered, err := h.Queries.UpsertAgentRuntime(r.Context(), db.UpsertAgentRuntimeParams{ WorkspaceID: parseUUID(req.WorkspaceID), DaemonID: strToText(req.DaemonID), Name: name, RuntimeMode: "local", Provider: provider, Status: status, DeviceInfo: deviceInfo, Metadata: metadata, }) if err != nil { writeError(w, http.StatusInternalServerError, "failed to register runtime: "+err.Error()) return } resp = append(resp, runtimeToResponse(registered)) } h.publish(protocol.EventDaemonRegister, req.WorkspaceID, "system", "", map[string]any{ "runtimes": resp, }) writeJSON(w, http.StatusOK, map[string]any{"runtimes": resp}) } type DaemonHeartbeatRequest struct { RuntimeID string `json:"runtime_id"` } func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) { var req DaemonHeartbeatRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "invalid request body") return } if req.RuntimeID == "" { writeError(w, http.StatusBadRequest, "runtime_id is required") return } _, err := h.Queries.UpdateAgentRuntimeHeartbeat(r.Context(), parseUUID(req.RuntimeID)) if err != nil { writeError(w, http.StatusInternalServerError, "heartbeat failed") return } writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) } // ClaimTaskByRuntime atomically claims the next queued task for a runtime. func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) { runtimeID := chi.URLParam(r, "runtimeId") task, err := h.TaskService.ClaimTaskForRuntime(r.Context(), parseUUID(runtimeID)) if err != nil { writeError(w, http.StatusInternalServerError, "failed to claim task: "+err.Error()) return } if task == nil { writeJSON(w, http.StatusOK, map[string]any{"task": nil}) return } writeJSON(w, http.StatusOK, map[string]any{"task": taskToResponse(*task)}) } // ListPendingTasksByRuntime returns queued/dispatched tasks for a runtime. func (h *Handler) ListPendingTasksByRuntime(w http.ResponseWriter, r *http.Request) { runtimeID := chi.URLParam(r, "runtimeId") tasks, err := h.Queries.ListPendingTasksByRuntime(r.Context(), parseUUID(runtimeID)) if err != nil { writeError(w, http.StatusInternalServerError, "failed to list pending tasks") return } resp := make([]AgentTaskResponse, len(tasks)) for i, t := range tasks { resp[i] = taskToResponse(t) } writeJSON(w, http.StatusOK, resp) } // --------------------------------------------------------------------------- // Task Lifecycle (called by daemon) // --------------------------------------------------------------------------- // StartTask marks a dispatched task as running. func (h *Handler) StartTask(w http.ResponseWriter, r *http.Request) { taskID := chi.URLParam(r, "taskId") task, err := h.TaskService.StartTask(r.Context(), parseUUID(taskID)) if err != nil { writeError(w, http.StatusBadRequest, err.Error()) return } writeJSON(w, http.StatusOK, taskToResponse(*task)) } // ReportTaskProgress broadcasts a progress update. type TaskProgressRequest struct { Summary string `json:"summary"` Step int `json:"step"` Total int `json:"total"` } func (h *Handler) ReportTaskProgress(w http.ResponseWriter, r *http.Request) { taskID := chi.URLParam(r, "taskId") var req TaskProgressRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "invalid request body") return } // Look up task to get workspace ID via the associated issue. workspaceID := "" task, err := h.Queries.GetAgentTask(r.Context(), parseUUID(taskID)) if err == nil { if issue, err := h.Queries.GetIssue(r.Context(), task.IssueID); err == nil { workspaceID = uuidToString(issue.WorkspaceID) } } h.TaskService.ReportProgress(r.Context(), taskID, workspaceID, req.Summary, req.Step, req.Total) writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) } // CompleteTask marks a running task as completed. type TaskCompleteRequest struct { PRURL string `json:"pr_url"` Output string `json:"output"` } func (h *Handler) CompleteTask(w http.ResponseWriter, r *http.Request) { taskID := chi.URLParam(r, "taskId") var req TaskCompleteRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "invalid request body") return } result, _ := json.Marshal(req) task, err := h.TaskService.CompleteTask(r.Context(), parseUUID(taskID), result) if err != nil { writeError(w, http.StatusBadRequest, err.Error()) return } writeJSON(w, http.StatusOK, taskToResponse(*task)) } // FailTask marks a running task as failed. type TaskFailRequest struct { Error string `json:"error"` } func (h *Handler) FailTask(w http.ResponseWriter, r *http.Request) { taskID := chi.URLParam(r, "taskId") var req TaskFailRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "invalid request body") return } task, err := h.TaskService.FailTask(r.Context(), parseUUID(taskID), req.Error) if err != nil { writeError(w, http.StatusBadRequest, err.Error()) return } writeJSON(w, http.StatusOK, taskToResponse(*task)) }