package handler import ( "encoding/json" "fmt" "log/slog" "net/http" "strconv" "strings" "github.com/go-chi/chi/v5" "github.com/jackc/pgx/v5/pgtype" db "github.com/multica-ai/multica/server/pkg/db/generated" "github.com/multica-ai/multica/server/pkg/protocol" "github.com/multica-ai/multica/server/pkg/redact" ) // --------------------------------------------------------------------------- // Daemon Registration & Heartbeat // --------------------------------------------------------------------------- type DaemonRegisterRequest struct { WorkspaceID string `json:"workspace_id"` DaemonID string `json:"daemon_id"` DeviceName string `json:"device_name"` CLIVersion string `json:"cli_version"` // multica CLI version Runtimes []struct { Name string `json:"name"` Type string `json:"type"` Version string `json:"version"` // agent CLI version (claude/codex) Status string `json:"status"` } `json:"runtimes"` } func (h *Handler) DaemonRegister(w http.ResponseWriter, r *http.Request) { var req DaemonRegisterRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "invalid request body") return } req.WorkspaceID = strings.TrimSpace(req.WorkspaceID) req.DaemonID = strings.TrimSpace(req.DaemonID) req.DeviceName = strings.TrimSpace(req.DeviceName) if req.DaemonID == "" { writeError(w, http.StatusBadRequest, "daemon_id is required") return } if req.WorkspaceID == "" { writeError(w, http.StatusBadRequest, "workspace_id is required") return } if len(req.Runtimes) == 0 { writeError(w, http.StatusBadRequest, "at least one runtime is required") return } // Verify the caller is a member of the target workspace. if _, ok := h.requireWorkspaceMember(w, r, req.WorkspaceID, "workspace not found"); !ok { return } ws, err := h.Queries.GetWorkspace(r.Context(), parseUUID(req.WorkspaceID)) if err != nil { writeError(w, http.StatusNotFound, "workspace not found") return } resp := make([]AgentRuntimeResponse, 0, len(req.Runtimes)) for _, runtime := range req.Runtimes { provider := strings.TrimSpace(runtime.Type) if provider == "" { provider = "unknown" } name := strings.TrimSpace(runtime.Name) if name == "" { name = provider if req.DeviceName != "" { name = fmt.Sprintf("%s (%s)", provider, req.DeviceName) } } deviceInfo := strings.TrimSpace(req.DeviceName) if runtime.Version != "" && deviceInfo != "" { deviceInfo = fmt.Sprintf("%s · %s", deviceInfo, runtime.Version) } else if runtime.Version != "" { deviceInfo = runtime.Version } status := "online" if runtime.Status == "offline" { status = "offline" } metadata, _ := json.Marshal(map[string]any{ "version": runtime.Version, "cli_version": req.CLIVersion, }) registered, err := h.Queries.UpsertAgentRuntime(r.Context(), db.UpsertAgentRuntimeParams{ WorkspaceID: parseUUID(req.WorkspaceID), DaemonID: strToText(req.DaemonID), Name: name, RuntimeMode: "local", Provider: provider, Status: status, DeviceInfo: deviceInfo, Metadata: metadata, }) if err != nil { writeError(w, http.StatusInternalServerError, "failed to register runtime: "+err.Error()) return } resp = append(resp, runtimeToResponse(registered)) } slog.Info("daemon registered", "workspace_id", req.WorkspaceID, "daemon_id", req.DaemonID, "runtimes_count", len(resp)) h.publish(protocol.EventDaemonRegister, req.WorkspaceID, "system", "", map[string]any{ "runtimes": resp, }) // Include workspace repos so the daemon can cache them locally. var repos []RepoData if ws.Repos != nil { json.Unmarshal(ws.Repos, &repos) } if repos == nil { repos = []RepoData{} } writeJSON(w, http.StatusOK, map[string]any{"runtimes": resp, "repos": repos}) } // DaemonDeregister marks runtimes as offline when the daemon shuts down. func (h *Handler) DaemonDeregister(w http.ResponseWriter, r *http.Request) { var req struct { RuntimeIDs []string `json:"runtime_ids"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "invalid request body") return } if len(req.RuntimeIDs) == 0 { writeError(w, http.StatusBadRequest, "runtime_ids is required") return } // Track affected workspaces for WS notifications. affectedWorkspaces := make(map[string]bool) for _, rid := range req.RuntimeIDs { // Look up the runtime to find its workspace. rt, err := h.Queries.GetAgentRuntime(r.Context(), parseUUID(rid)) if err != nil { slog.Warn("deregister: runtime not found", "runtime_id", rid, "error", err) continue } if err := h.Queries.SetAgentRuntimeOffline(r.Context(), parseUUID(rid)); err != nil { slog.Warn("deregister: failed to set offline", "runtime_id", rid, "error", err) continue } affectedWorkspaces[uuidToString(rt.WorkspaceID)] = true } // Notify frontend clients so they re-fetch runtime list. for wsID := range affectedWorkspaces { h.publish(protocol.EventDaemonRegister, wsID, "system", "", map[string]any{ "action": "deregister", }) } slog.Info("daemon deregistered", "runtime_ids", req.RuntimeIDs) writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) } type DaemonHeartbeatRequest struct { RuntimeID string `json:"runtime_id"` } func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) { var req DaemonHeartbeatRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "invalid request body") return } if req.RuntimeID == "" { writeError(w, http.StatusBadRequest, "runtime_id is required") return } _, err := h.Queries.UpdateAgentRuntimeHeartbeat(r.Context(), parseUUID(req.RuntimeID)) if err != nil { writeError(w, http.StatusInternalServerError, "heartbeat failed") return } slog.Debug("daemon heartbeat", "runtime_id", req.RuntimeID) resp := map[string]any{"status": "ok"} // Check for pending ping requests for this runtime. if pending := h.PingStore.PopPending(req.RuntimeID); pending != nil { resp["pending_ping"] = map[string]string{"id": pending.ID} } // Check for pending update requests for this runtime. if pending := h.UpdateStore.PopPending(req.RuntimeID); pending != nil { resp["pending_update"] = map[string]string{ "id": pending.ID, "target_version": pending.TargetVersion, } } writeJSON(w, http.StatusOK, resp) } // ClaimTaskByRuntime atomically claims the next queued task for a runtime. // The response includes the agent's name and skills, fetched fresh from the DB. func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) { runtimeID := chi.URLParam(r, "runtimeId") task, err := h.TaskService.ClaimTaskForRuntime(r.Context(), parseUUID(runtimeID)) if err != nil { writeError(w, http.StatusInternalServerError, "failed to claim task: "+err.Error()) return } if task == nil { slog.Debug("no task to claim", "runtime_id", runtimeID) writeJSON(w, http.StatusOK, map[string]any{"task": nil}) return } // Build response with fresh agent data (name + skills). resp := taskToResponse(*task) if agent, err := h.Queries.GetAgent(r.Context(), task.AgentID); err == nil { skills := h.TaskService.LoadAgentSkills(r.Context(), task.AgentID) resp.Agent = &TaskAgentData{ ID: uuidToString(agent.ID), Name: agent.Name, Instructions: agent.Instructions, Skills: skills, } } // Include workspace ID and repos so the daemon can set up worktrees. if issue, err := h.Queries.GetIssue(r.Context(), task.IssueID); err == nil { resp.WorkspaceID = uuidToString(issue.WorkspaceID) if ws, err := h.Queries.GetWorkspace(r.Context(), issue.WorkspaceID); err == nil && ws.Repos != nil { var repos []RepoData if json.Unmarshal(ws.Repos, &repos) == nil && len(repos) > 0 { resp.Repos = repos } } } // Look up the prior session for this (agent, issue) pair so the daemon // can resume the Claude Code conversation context. if prior, err := h.Queries.GetLastTaskSession(r.Context(), db.GetLastTaskSessionParams{ AgentID: task.AgentID, IssueID: task.IssueID, }); err == nil && prior.SessionID.Valid { resp.PriorSessionID = prior.SessionID.String if prior.WorkDir.Valid { resp.PriorWorkDir = prior.WorkDir.String } } slog.Info("task claimed by runtime", "task_id", uuidToString(task.ID), "runtime_id", runtimeID, "agent_id", uuidToString(task.AgentID), "prior_session", resp.PriorSessionID) writeJSON(w, http.StatusOK, map[string]any{"task": resp}) } // ListPendingTasksByRuntime returns queued/dispatched tasks for a runtime. func (h *Handler) ListPendingTasksByRuntime(w http.ResponseWriter, r *http.Request) { runtimeID := chi.URLParam(r, "runtimeId") tasks, err := h.Queries.ListPendingTasksByRuntime(r.Context(), parseUUID(runtimeID)) if err != nil { writeError(w, http.StatusInternalServerError, "failed to list pending tasks") return } resp := make([]AgentTaskResponse, len(tasks)) for i, t := range tasks { resp[i] = taskToResponse(t) } writeJSON(w, http.StatusOK, resp) } // --------------------------------------------------------------------------- // Task Lifecycle (called by daemon) // --------------------------------------------------------------------------- // StartTask marks a dispatched task as running. func (h *Handler) StartTask(w http.ResponseWriter, r *http.Request) { taskID := chi.URLParam(r, "taskId") task, err := h.TaskService.StartTask(r.Context(), parseUUID(taskID)) if err != nil { slog.Warn("start task failed", "task_id", taskID, "error", err) writeError(w, http.StatusBadRequest, err.Error()) return } slog.Info("task started", "task_id", taskID, "agent_id", uuidToString(task.AgentID)) writeJSON(w, http.StatusOK, taskToResponse(*task)) } // ReportTaskProgress broadcasts a progress update. type TaskProgressRequest struct { Summary string `json:"summary"` Step int `json:"step"` Total int `json:"total"` } func (h *Handler) ReportTaskProgress(w http.ResponseWriter, r *http.Request) { taskID := chi.URLParam(r, "taskId") var req TaskProgressRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "invalid request body") return } // Look up task to get workspace ID via the associated issue. workspaceID := "" task, err := h.Queries.GetAgentTask(r.Context(), parseUUID(taskID)) if err == nil { if issue, err := h.Queries.GetIssue(r.Context(), task.IssueID); err == nil { workspaceID = uuidToString(issue.WorkspaceID) } } h.TaskService.ReportProgress(r.Context(), taskID, workspaceID, req.Summary, req.Step, req.Total) writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) } // CompleteTask marks a running task as completed. type TaskCompleteRequest struct { PRURL string `json:"pr_url"` Output string `json:"output"` SessionID string `json:"session_id"` // Claude session ID for future resumption WorkDir string `json:"work_dir"` // working directory used during execution Usage []TaskUsagePayload `json:"usage,omitempty"` } // TaskUsagePayload is the per-model token usage reported by the daemon. type TaskUsagePayload struct { Provider string `json:"provider"` Model string `json:"model"` InputTokens int64 `json:"input_tokens"` OutputTokens int64 `json:"output_tokens"` CacheReadTokens int64 `json:"cache_read_tokens"` CacheWriteTokens int64 `json:"cache_write_tokens"` } func (h *Handler) CompleteTask(w http.ResponseWriter, r *http.Request) { taskID := chi.URLParam(r, "taskId") var req TaskCompleteRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "invalid request body") return } result, _ := json.Marshal(req) task, err := h.TaskService.CompleteTask(r.Context(), parseUUID(taskID), result, req.SessionID, req.WorkDir) if err != nil { slog.Warn("complete task failed", "task_id", taskID, "error", err) writeError(w, http.StatusBadRequest, err.Error()) return } // Store per-task token usage (best-effort, don't fail the request). for _, u := range req.Usage { if err := h.Queries.UpsertTaskUsage(r.Context(), db.UpsertTaskUsageParams{ TaskID: parseUUID(taskID), Provider: u.Provider, Model: u.Model, InputTokens: u.InputTokens, OutputTokens: u.OutputTokens, CacheReadTokens: u.CacheReadTokens, CacheWriteTokens: u.CacheWriteTokens, }); err != nil { slog.Warn("upsert task usage failed", "task_id", taskID, "model", u.Model, "error", err) } } slog.Info("task completed", "task_id", taskID, "agent_id", uuidToString(task.AgentID)) writeJSON(w, http.StatusOK, taskToResponse(*task)) } // GetTaskStatus returns the current status of a task. // Used by the daemon to check whether a task was cancelled mid-execution. func (h *Handler) GetTaskStatus(w http.ResponseWriter, r *http.Request) { taskID := chi.URLParam(r, "taskId") task, err := h.Queries.GetAgentTask(r.Context(), parseUUID(taskID)) if err != nil { writeError(w, http.StatusNotFound, "task not found") return } writeJSON(w, http.StatusOK, map[string]string{"status": task.Status}) } // FailTask marks a running task as failed. type TaskFailRequest struct { Error string `json:"error"` } func (h *Handler) FailTask(w http.ResponseWriter, r *http.Request) { taskID := chi.URLParam(r, "taskId") var req TaskFailRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "invalid request body") return } task, err := h.TaskService.FailTask(r.Context(), parseUUID(taskID), req.Error) if err != nil { slog.Warn("fail task failed", "task_id", taskID, "error", err) writeError(w, http.StatusBadRequest, err.Error()) return } slog.Info("task failed", "task_id", taskID, "agent_id", uuidToString(task.AgentID), "task_error", req.Error) writeJSON(w, http.StatusOK, taskToResponse(*task)) } // --------------------------------------------------------------------------- // Task Messages (live agent output) // --------------------------------------------------------------------------- type TaskMessageRequest struct { Seq int `json:"seq"` Type string `json:"type"` Tool string `json:"tool,omitempty"` Content string `json:"content,omitempty"` Input map[string]any `json:"input,omitempty"` Output string `json:"output,omitempty"` } type TaskMessageBatchRequest struct { Messages []TaskMessageRequest `json:"messages"` } // ReportTaskMessages receives a batch of agent execution messages from the daemon. func (h *Handler) ReportTaskMessages(w http.ResponseWriter, r *http.Request) { taskID := chi.URLParam(r, "taskId") var req TaskMessageBatchRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "invalid request body") return } if len(req.Messages) == 0 { writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) return } task, err := h.Queries.GetAgentTask(r.Context(), parseUUID(taskID)) if err != nil { writeError(w, http.StatusNotFound, "task not found") return } workspaceID := "" if issue, err := h.Queries.GetIssue(r.Context(), task.IssueID); err == nil { workspaceID = uuidToString(issue.WorkspaceID) } for _, msg := range req.Messages { // Redact sensitive information before persisting or broadcasting. msg.Content = redact.Text(msg.Content) msg.Output = redact.Text(msg.Output) msg.Input = redact.InputMap(msg.Input) var inputJSON []byte if msg.Input != nil { inputJSON, _ = json.Marshal(msg.Input) } h.Queries.CreateTaskMessage(r.Context(), db.CreateTaskMessageParams{ TaskID: parseUUID(taskID), Seq: int32(msg.Seq), Type: msg.Type, Tool: pgtype.Text{String: msg.Tool, Valid: msg.Tool != ""}, Content: pgtype.Text{String: msg.Content, Valid: msg.Content != ""}, Input: inputJSON, Output: pgtype.Text{String: msg.Output, Valid: msg.Output != ""}, }) if workspaceID != "" { h.publish(protocol.EventTaskMessage, workspaceID, "system", "", protocol.TaskMessagePayload{ TaskID: taskID, IssueID: uuidToString(task.IssueID), Seq: msg.Seq, Type: msg.Type, Tool: msg.Tool, Content: msg.Content, Input: msg.Input, Output: msg.Output, }) } } writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) } // ListTaskMessages returns the persisted messages for a task (for catch-up after reconnect). func (h *Handler) ListTaskMessages(w http.ResponseWriter, r *http.Request) { taskID := chi.URLParam(r, "taskId") task, err := h.Queries.GetAgentTask(r.Context(), parseUUID(taskID)) if err != nil { writeError(w, http.StatusNotFound, "task not found") return } var messages []db.TaskMessage if sinceStr := r.URL.Query().Get("since"); sinceStr != "" { sinceSeq, parseErr := strconv.Atoi(sinceStr) if parseErr != nil { writeError(w, http.StatusBadRequest, "invalid since parameter") return } messages, err = h.Queries.ListTaskMessagesSince(r.Context(), db.ListTaskMessagesSinceParams{ TaskID: parseUUID(taskID), Seq: int32(sinceSeq), }) } else { messages, err = h.Queries.ListTaskMessages(r.Context(), parseUUID(taskID)) } if err != nil { writeError(w, http.StatusInternalServerError, "failed to list task messages") return } issueID := uuidToString(task.IssueID) resp := make([]protocol.TaskMessagePayload, len(messages)) for i, m := range messages { var input map[string]any if m.Input != nil { json.Unmarshal(m.Input, &input) } resp[i] = protocol.TaskMessagePayload{ TaskID: taskID, IssueID: issueID, Seq: int(m.Seq), Type: m.Type, Tool: m.Tool.String, Content: m.Content.String, Input: input, Output: m.Output.String, } } writeJSON(w, http.StatusOK, resp) } // GetActiveTaskForIssue returns the currently running task for an issue, if any. func (h *Handler) GetActiveTaskForIssue(w http.ResponseWriter, r *http.Request) { issueID := chi.URLParam(r, "id") tasks, err := h.Queries.ListActiveTasksByIssue(r.Context(), parseUUID(issueID)) if err != nil || len(tasks) == 0 { writeJSON(w, http.StatusOK, map[string]any{"task": nil}) return } writeJSON(w, http.StatusOK, map[string]any{"task": taskToResponse(tasks[0])}) } // CancelTask cancels a running or queued task by ID. func (h *Handler) CancelTask(w http.ResponseWriter, r *http.Request) { taskID := chi.URLParam(r, "taskId") task, err := h.TaskService.CancelTask(r.Context(), parseUUID(taskID)) if err != nil { slog.Warn("cancel task failed", "task_id", taskID, "error", err) writeError(w, http.StatusBadRequest, err.Error()) return } slog.Info("task cancelled by user", "task_id", taskID, "issue_id", uuidToString(task.IssueID)) writeJSON(w, http.StatusOK, taskToResponse(*task)) } // ListTasksByIssue returns all tasks (any status) for an issue — used for execution history. func (h *Handler) ListTasksByIssue(w http.ResponseWriter, r *http.Request) { issueID := chi.URLParam(r, "id") tasks, err := h.Queries.ListTasksByIssue(r.Context(), parseUUID(issueID)) if err != nil { writeError(w, http.StatusInternalServerError, "failed to list tasks") return } resp := make([]AgentTaskResponse, len(tasks)) for i, t := range tasks { resp[i] = taskToResponse(t) } writeJSON(w, http.StatusOK, resp) }