Main added execenv.Reuse() for workdir reuse across tasks on the same issue. Our branch removed Type/BranchName/gitRoot from Environment (repos are now checked out on demand). Resolution: keep Reuse() but simplify it to work with the new Environment struct (no workspace type tracking). Keep the "reused" log field from main, drop removed fields.
386 lines
12 KiB
Go
386 lines
12 KiB
Go
package handler
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"net/http"
|
|
"strings"
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
|
"github.com/multica-ai/multica/server/pkg/protocol"
|
|
)
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Daemon Registration & Heartbeat
|
|
// ---------------------------------------------------------------------------
|
|
|
|
type DaemonRegisterRequest struct {
|
|
WorkspaceID string `json:"workspace_id"`
|
|
DaemonID string `json:"daemon_id"`
|
|
DeviceName string `json:"device_name"`
|
|
Runtimes []struct {
|
|
Name string `json:"name"`
|
|
Type string `json:"type"`
|
|
Version string `json:"version"`
|
|
Status string `json:"status"`
|
|
} `json:"runtimes"`
|
|
}
|
|
|
|
func (h *Handler) DaemonRegister(w http.ResponseWriter, r *http.Request) {
|
|
var req DaemonRegisterRequest
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid request body")
|
|
return
|
|
}
|
|
|
|
req.WorkspaceID = strings.TrimSpace(req.WorkspaceID)
|
|
req.DaemonID = strings.TrimSpace(req.DaemonID)
|
|
req.DeviceName = strings.TrimSpace(req.DeviceName)
|
|
|
|
if req.DaemonID == "" {
|
|
writeError(w, http.StatusBadRequest, "daemon_id is required")
|
|
return
|
|
}
|
|
if req.WorkspaceID == "" {
|
|
writeError(w, http.StatusBadRequest, "workspace_id is required")
|
|
return
|
|
}
|
|
if len(req.Runtimes) == 0 {
|
|
writeError(w, http.StatusBadRequest, "at least one runtime is required")
|
|
return
|
|
}
|
|
ws, err := h.Queries.GetWorkspace(r.Context(), parseUUID(req.WorkspaceID))
|
|
if err != nil {
|
|
writeError(w, http.StatusNotFound, "workspace not found")
|
|
return
|
|
}
|
|
|
|
resp := make([]AgentRuntimeResponse, 0, len(req.Runtimes))
|
|
for _, runtime := range req.Runtimes {
|
|
provider := strings.TrimSpace(runtime.Type)
|
|
if provider == "" {
|
|
provider = "unknown"
|
|
}
|
|
name := strings.TrimSpace(runtime.Name)
|
|
if name == "" {
|
|
name = provider
|
|
if req.DeviceName != "" {
|
|
name = fmt.Sprintf("%s (%s)", provider, req.DeviceName)
|
|
}
|
|
}
|
|
deviceInfo := strings.TrimSpace(req.DeviceName)
|
|
if runtime.Version != "" && deviceInfo != "" {
|
|
deviceInfo = fmt.Sprintf("%s · %s", deviceInfo, runtime.Version)
|
|
} else if runtime.Version != "" {
|
|
deviceInfo = runtime.Version
|
|
}
|
|
status := "online"
|
|
if runtime.Status == "offline" {
|
|
status = "offline"
|
|
}
|
|
metadata, _ := json.Marshal(map[string]any{
|
|
"version": runtime.Version,
|
|
})
|
|
|
|
registered, err := h.Queries.UpsertAgentRuntime(r.Context(), db.UpsertAgentRuntimeParams{
|
|
WorkspaceID: parseUUID(req.WorkspaceID),
|
|
DaemonID: strToText(req.DaemonID),
|
|
Name: name,
|
|
RuntimeMode: "local",
|
|
Provider: provider,
|
|
Status: status,
|
|
DeviceInfo: deviceInfo,
|
|
Metadata: metadata,
|
|
})
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to register runtime: "+err.Error())
|
|
return
|
|
}
|
|
resp = append(resp, runtimeToResponse(registered))
|
|
}
|
|
|
|
slog.Info("daemon registered", "workspace_id", req.WorkspaceID, "daemon_id", req.DaemonID, "runtimes_count", len(resp))
|
|
|
|
h.publish(protocol.EventDaemonRegister, req.WorkspaceID, "system", "", map[string]any{
|
|
"runtimes": resp,
|
|
})
|
|
|
|
// Include workspace repos so the daemon can cache them locally.
|
|
var repos []RepoData
|
|
if ws.Repos != nil {
|
|
json.Unmarshal(ws.Repos, &repos)
|
|
}
|
|
if repos == nil {
|
|
repos = []RepoData{}
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, map[string]any{"runtimes": resp, "repos": repos})
|
|
}
|
|
|
|
// DaemonDeregister marks runtimes as offline when the daemon shuts down.
|
|
func (h *Handler) DaemonDeregister(w http.ResponseWriter, r *http.Request) {
|
|
var req struct {
|
|
RuntimeIDs []string `json:"runtime_ids"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid request body")
|
|
return
|
|
}
|
|
|
|
if len(req.RuntimeIDs) == 0 {
|
|
writeError(w, http.StatusBadRequest, "runtime_ids is required")
|
|
return
|
|
}
|
|
|
|
// Track affected workspaces for WS notifications.
|
|
affectedWorkspaces := make(map[string]bool)
|
|
|
|
for _, rid := range req.RuntimeIDs {
|
|
// Look up the runtime to find its workspace.
|
|
rt, err := h.Queries.GetAgentRuntime(r.Context(), parseUUID(rid))
|
|
if err != nil {
|
|
slog.Warn("deregister: runtime not found", "runtime_id", rid, "error", err)
|
|
continue
|
|
}
|
|
|
|
if err := h.Queries.SetAgentRuntimeOffline(r.Context(), parseUUID(rid)); err != nil {
|
|
slog.Warn("deregister: failed to set offline", "runtime_id", rid, "error", err)
|
|
continue
|
|
}
|
|
|
|
affectedWorkspaces[uuidToString(rt.WorkspaceID)] = true
|
|
}
|
|
|
|
// Notify frontend clients so they re-fetch runtime list.
|
|
for wsID := range affectedWorkspaces {
|
|
h.publish(protocol.EventDaemonRegister, wsID, "system", "", map[string]any{
|
|
"action": "deregister",
|
|
})
|
|
}
|
|
|
|
slog.Info("daemon deregistered", "runtime_ids", req.RuntimeIDs)
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
|
}
|
|
|
|
type DaemonHeartbeatRequest struct {
|
|
RuntimeID string `json:"runtime_id"`
|
|
}
|
|
|
|
func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
|
|
var req DaemonHeartbeatRequest
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid request body")
|
|
return
|
|
}
|
|
|
|
if req.RuntimeID == "" {
|
|
writeError(w, http.StatusBadRequest, "runtime_id is required")
|
|
return
|
|
}
|
|
|
|
_, err := h.Queries.UpdateAgentRuntimeHeartbeat(r.Context(), parseUUID(req.RuntimeID))
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "heartbeat failed")
|
|
return
|
|
}
|
|
|
|
slog.Debug("daemon heartbeat", "runtime_id", req.RuntimeID)
|
|
|
|
resp := map[string]any{"status": "ok"}
|
|
|
|
// Check for pending ping requests for this runtime.
|
|
if pending := h.PingStore.PopPending(req.RuntimeID); pending != nil {
|
|
resp["pending_ping"] = map[string]string{"id": pending.ID}
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, resp)
|
|
}
|
|
|
|
// ClaimTaskByRuntime atomically claims the next queued task for a runtime.
|
|
// The response includes the agent's name and skills, fetched fresh from the DB.
|
|
func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) {
|
|
runtimeID := chi.URLParam(r, "runtimeId")
|
|
|
|
task, err := h.TaskService.ClaimTaskForRuntime(r.Context(), parseUUID(runtimeID))
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to claim task: "+err.Error())
|
|
return
|
|
}
|
|
|
|
if task == nil {
|
|
slog.Debug("no task to claim", "runtime_id", runtimeID)
|
|
writeJSON(w, http.StatusOK, map[string]any{"task": nil})
|
|
return
|
|
}
|
|
|
|
// Build response with fresh agent data (name + skills).
|
|
resp := taskToResponse(*task)
|
|
if agent, err := h.Queries.GetAgent(r.Context(), task.AgentID); err == nil {
|
|
skills := h.TaskService.LoadAgentSkills(r.Context(), task.AgentID)
|
|
resp.Agent = &TaskAgentData{
|
|
ID: uuidToString(agent.ID),
|
|
Name: agent.Name,
|
|
Instructions: agent.Instructions,
|
|
Skills: skills,
|
|
}
|
|
}
|
|
|
|
// Include workspace repos so the daemon can set up worktrees.
|
|
if issue, err := h.Queries.GetIssue(r.Context(), task.IssueID); err == nil {
|
|
if ws, err := h.Queries.GetWorkspace(r.Context(), issue.WorkspaceID); err == nil && ws.Repos != nil {
|
|
var repos []RepoData
|
|
if json.Unmarshal(ws.Repos, &repos) == nil && len(repos) > 0 {
|
|
resp.Repos = repos
|
|
}
|
|
}
|
|
}
|
|
|
|
// Look up the prior session for this (agent, issue) pair so the daemon
|
|
// can resume the Claude Code conversation context.
|
|
if prior, err := h.Queries.GetLastTaskSession(r.Context(), db.GetLastTaskSessionParams{
|
|
AgentID: task.AgentID,
|
|
IssueID: task.IssueID,
|
|
}); err == nil && prior.SessionID.Valid {
|
|
resp.PriorSessionID = prior.SessionID.String
|
|
if prior.WorkDir.Valid {
|
|
resp.PriorWorkDir = prior.WorkDir.String
|
|
}
|
|
}
|
|
|
|
slog.Info("task claimed by runtime", "task_id", uuidToString(task.ID), "runtime_id", runtimeID, "agent_id", uuidToString(task.AgentID), "prior_session", resp.PriorSessionID)
|
|
writeJSON(w, http.StatusOK, map[string]any{"task": resp})
|
|
}
|
|
|
|
// ListPendingTasksByRuntime returns queued/dispatched tasks for a runtime.
|
|
func (h *Handler) ListPendingTasksByRuntime(w http.ResponseWriter, r *http.Request) {
|
|
runtimeID := chi.URLParam(r, "runtimeId")
|
|
|
|
tasks, err := h.Queries.ListPendingTasksByRuntime(r.Context(), parseUUID(runtimeID))
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to list pending tasks")
|
|
return
|
|
}
|
|
|
|
resp := make([]AgentTaskResponse, len(tasks))
|
|
for i, t := range tasks {
|
|
resp[i] = taskToResponse(t)
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, resp)
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Task Lifecycle (called by daemon)
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// StartTask marks a dispatched task as running.
|
|
func (h *Handler) StartTask(w http.ResponseWriter, r *http.Request) {
|
|
taskID := chi.URLParam(r, "taskId")
|
|
|
|
task, err := h.TaskService.StartTask(r.Context(), parseUUID(taskID))
|
|
if err != nil {
|
|
slog.Warn("start task failed", "task_id", taskID, "error", err)
|
|
writeError(w, http.StatusBadRequest, err.Error())
|
|
return
|
|
}
|
|
|
|
slog.Info("task started", "task_id", taskID, "agent_id", uuidToString(task.AgentID))
|
|
writeJSON(w, http.StatusOK, taskToResponse(*task))
|
|
}
|
|
|
|
// ReportTaskProgress broadcasts a progress update.
|
|
type TaskProgressRequest struct {
|
|
Summary string `json:"summary"`
|
|
Step int `json:"step"`
|
|
Total int `json:"total"`
|
|
}
|
|
|
|
func (h *Handler) ReportTaskProgress(w http.ResponseWriter, r *http.Request) {
|
|
taskID := chi.URLParam(r, "taskId")
|
|
|
|
var req TaskProgressRequest
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid request body")
|
|
return
|
|
}
|
|
|
|
// Look up task to get workspace ID via the associated issue.
|
|
workspaceID := ""
|
|
task, err := h.Queries.GetAgentTask(r.Context(), parseUUID(taskID))
|
|
if err == nil {
|
|
if issue, err := h.Queries.GetIssue(r.Context(), task.IssueID); err == nil {
|
|
workspaceID = uuidToString(issue.WorkspaceID)
|
|
}
|
|
}
|
|
|
|
h.TaskService.ReportProgress(r.Context(), taskID, workspaceID, req.Summary, req.Step, req.Total)
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
|
}
|
|
|
|
// CompleteTask marks a running task as completed.
|
|
type TaskCompleteRequest struct {
|
|
PRURL string `json:"pr_url"`
|
|
Output string `json:"output"`
|
|
SessionID string `json:"session_id"` // Claude session ID for future resumption
|
|
WorkDir string `json:"work_dir"` // working directory used during execution
|
|
}
|
|
|
|
func (h *Handler) CompleteTask(w http.ResponseWriter, r *http.Request) {
|
|
taskID := chi.URLParam(r, "taskId")
|
|
|
|
var req TaskCompleteRequest
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid request body")
|
|
return
|
|
}
|
|
|
|
result, _ := json.Marshal(req)
|
|
task, err := h.TaskService.CompleteTask(r.Context(), parseUUID(taskID), result, req.SessionID, req.WorkDir)
|
|
if err != nil {
|
|
slog.Warn("complete task failed", "task_id", taskID, "error", err)
|
|
writeError(w, http.StatusBadRequest, err.Error())
|
|
return
|
|
}
|
|
|
|
slog.Info("task completed", "task_id", taskID, "agent_id", uuidToString(task.AgentID))
|
|
writeJSON(w, http.StatusOK, taskToResponse(*task))
|
|
}
|
|
|
|
// GetTaskStatus returns the current status of a task.
|
|
// Used by the daemon to check whether a task was cancelled mid-execution.
|
|
func (h *Handler) GetTaskStatus(w http.ResponseWriter, r *http.Request) {
|
|
taskID := chi.URLParam(r, "taskId")
|
|
task, err := h.Queries.GetAgentTask(r.Context(), parseUUID(taskID))
|
|
if err != nil {
|
|
writeError(w, http.StatusNotFound, "task not found")
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": task.Status})
|
|
}
|
|
|
|
// FailTask marks a running task as failed.
|
|
type TaskFailRequest struct {
|
|
Error string `json:"error"`
|
|
}
|
|
|
|
func (h *Handler) FailTask(w http.ResponseWriter, r *http.Request) {
|
|
taskID := chi.URLParam(r, "taskId")
|
|
|
|
var req TaskFailRequest
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid request body")
|
|
return
|
|
}
|
|
|
|
task, err := h.TaskService.FailTask(r.Context(), parseUUID(taskID), req.Error)
|
|
if err != nil {
|
|
slog.Warn("fail task failed", "task_id", taskID, "error", err)
|
|
writeError(w, http.StatusBadRequest, err.Error())
|
|
return
|
|
}
|
|
|
|
slog.Info("task failed", "task_id", taskID, "agent_id", uuidToString(task.AgentID), "task_error", req.Error)
|
|
writeJSON(w, http.StatusOK, taskToResponse(*task))
|
|
}
|