multica/server/internal/handler/agent.go
Jiayuan ce2b263ea5 feat(daemon): reuse workdir across tasks on same agent+issue pair
Previously each task created a fresh workdir via execenv.Prepare(), even
when resuming work on the same (agent, issue). This caused the agent's
session context to be out of sync with a blank code state.

Now the server returns prior_work_dir in the claim response, and the
daemon tries execenv.Reuse() first — which wraps the existing directory,
detects git worktree state, and refreshes context files. Falls back to
Prepare() if the prior workdir no longer exists. Workdirs are no longer
cleaned up after task completion so they remain available for reuse.
2026-03-29 18:40:29 +08:00

444 lines
14 KiB
Go

package handler
import (
"encoding/json"
"log/slog"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/logger"
"github.com/multica-ai/multica/server/internal/service"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
type AgentResponse struct {
ID string `json:"id"`
WorkspaceID string `json:"workspace_id"`
RuntimeID string `json:"runtime_id"`
Name string `json:"name"`
Description string `json:"description"`
Instructions string `json:"instructions"`
AvatarURL *string `json:"avatar_url"`
RuntimeMode string `json:"runtime_mode"`
RuntimeConfig any `json:"runtime_config"`
Visibility string `json:"visibility"`
Status string `json:"status"`
MaxConcurrentTasks int32 `json:"max_concurrent_tasks"`
OwnerID *string `json:"owner_id"`
Skills []SkillResponse `json:"skills"`
Tools any `json:"tools"`
Triggers any `json:"triggers"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
}
func agentToResponse(a db.Agent) AgentResponse {
var rc any
if a.RuntimeConfig != nil {
json.Unmarshal(a.RuntimeConfig, &rc)
}
if rc == nil {
rc = map[string]any{}
}
var tools any
if a.Tools != nil {
json.Unmarshal(a.Tools, &tools)
}
if tools == nil {
tools = []any{}
}
var triggers any
if a.Triggers != nil {
json.Unmarshal(a.Triggers, &triggers)
}
if triggers == nil {
triggers = []any{}
}
return AgentResponse{
ID: uuidToString(a.ID),
WorkspaceID: uuidToString(a.WorkspaceID),
RuntimeID: uuidToString(a.RuntimeID),
Name: a.Name,
Description: a.Description,
Instructions: a.Instructions,
AvatarURL: textToPtr(a.AvatarUrl),
RuntimeMode: a.RuntimeMode,
RuntimeConfig: rc,
Visibility: a.Visibility,
Status: a.Status,
MaxConcurrentTasks: a.MaxConcurrentTasks,
OwnerID: uuidToPtr(a.OwnerID),
Skills: []SkillResponse{},
Tools: tools,
Triggers: triggers,
CreatedAt: timestampToString(a.CreatedAt),
UpdatedAt: timestampToString(a.UpdatedAt),
}
}
type AgentTaskResponse struct {
ID string `json:"id"`
AgentID string `json:"agent_id"`
RuntimeID string `json:"runtime_id"`
IssueID string `json:"issue_id"`
Status string `json:"status"`
Priority int32 `json:"priority"`
DispatchedAt *string `json:"dispatched_at"`
StartedAt *string `json:"started_at"`
CompletedAt *string `json:"completed_at"`
Result any `json:"result"`
Error *string `json:"error"`
Agent *TaskAgentData `json:"agent,omitempty"`
CreatedAt string `json:"created_at"`
PriorSessionID string `json:"prior_session_id,omitempty"` // session ID from a previous task on same issue
PriorWorkDir string `json:"prior_work_dir,omitempty"` // work_dir from a previous task on same issue
}
// TaskAgentData holds agent info included in claim responses so the daemon
// can set up the execution environment (branch naming, skill files, instructions).
type TaskAgentData struct {
ID string `json:"id"`
Name string `json:"name"`
Instructions string `json:"instructions"`
Skills []service.AgentSkillData `json:"skills,omitempty"`
}
func taskToResponse(t db.AgentTaskQueue) AgentTaskResponse {
var result any
if t.Result != nil {
json.Unmarshal(t.Result, &result)
}
return AgentTaskResponse{
ID: uuidToString(t.ID),
AgentID: uuidToString(t.AgentID),
RuntimeID: uuidToString(t.RuntimeID),
IssueID: uuidToString(t.IssueID),
Status: t.Status,
Priority: t.Priority,
DispatchedAt: timestampToPtr(t.DispatchedAt),
StartedAt: timestampToPtr(t.StartedAt),
CompletedAt: timestampToPtr(t.CompletedAt),
Result: result,
Error: textToPtr(t.Error),
CreatedAt: timestampToString(t.CreatedAt),
}
}
func (h *Handler) ListAgents(w http.ResponseWriter, r *http.Request) {
workspaceID := resolveWorkspaceID(r)
member, ok := h.requireWorkspaceMember(w, r, workspaceID, "workspace not found")
if !ok {
return
}
agents, err := h.Queries.ListAgents(r.Context(), parseUUID(workspaceID))
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list agents")
return
}
userID := requestUserID(r)
isAdmin := roleAllowed(member.Role, "owner", "admin")
// Batch-load skills for all agents to avoid N+1.
skillRows, err := h.Queries.ListAgentSkillsByWorkspace(r.Context(), parseUUID(workspaceID))
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to load agent skills")
return
}
skillMap := map[string][]SkillResponse{}
for _, row := range skillRows {
agentID := uuidToString(row.AgentID)
skillMap[agentID] = append(skillMap[agentID], SkillResponse{
ID: uuidToString(row.ID),
Name: row.Name,
Description: row.Description,
})
}
// Filter private agents: only visible to owner_id or workspace admin
var visible []AgentResponse
for _, a := range agents {
if a.Visibility == "private" && !isAdmin && uuidToString(a.OwnerID) != userID {
continue
}
resp := agentToResponse(a)
if skills, ok := skillMap[resp.ID]; ok {
resp.Skills = skills
}
visible = append(visible, resp)
}
if visible == nil {
visible = []AgentResponse{}
}
writeJSON(w, http.StatusOK, visible)
}
func (h *Handler) GetAgent(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
agent, ok := h.loadAgentForUser(w, r, id)
if !ok {
return
}
resp := agentToResponse(agent)
skills, err := h.Queries.ListAgentSkills(r.Context(), agent.ID)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to load agent skills")
return
}
if len(skills) > 0 {
resp.Skills = make([]SkillResponse, len(skills))
for i, s := range skills {
resp.Skills[i] = skillToResponse(s)
}
}
writeJSON(w, http.StatusOK, resp)
}
type CreateAgentRequest struct {
Name string `json:"name"`
Description string `json:"description"`
Instructions string `json:"instructions"`
AvatarURL *string `json:"avatar_url"`
RuntimeID string `json:"runtime_id"`
RuntimeConfig any `json:"runtime_config"`
Visibility string `json:"visibility"`
MaxConcurrentTasks int32 `json:"max_concurrent_tasks"`
Tools any `json:"tools"`
Triggers any `json:"triggers"`
}
func (h *Handler) CreateAgent(w http.ResponseWriter, r *http.Request) {
workspaceID := resolveWorkspaceID(r)
if _, ok := h.requireWorkspaceRole(w, r, workspaceID, "workspace not found", "owner", "admin"); !ok {
return
}
var req CreateAgentRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
ownerID, ok := requireUserID(w, r)
if !ok {
return
}
if req.Name == "" {
writeError(w, http.StatusBadRequest, "name is required")
return
}
if req.RuntimeID == "" {
writeError(w, http.StatusBadRequest, "runtime_id is required")
return
}
if req.Visibility == "" {
req.Visibility = "workspace"
}
if req.MaxConcurrentTasks == 0 {
req.MaxConcurrentTasks = 1
}
runtime, err := h.Queries.GetAgentRuntimeForWorkspace(r.Context(), db.GetAgentRuntimeForWorkspaceParams{
ID: parseUUID(req.RuntimeID),
WorkspaceID: parseUUID(workspaceID),
})
if err != nil {
writeError(w, http.StatusBadRequest, "invalid runtime_id")
return
}
rc, _ := json.Marshal(req.RuntimeConfig)
if req.RuntimeConfig == nil {
rc = []byte("{}")
}
tools, _ := json.Marshal(req.Tools)
if req.Tools == nil {
tools = []byte("[]")
}
triggers, _ := json.Marshal(req.Triggers)
if req.Triggers == nil {
triggers = []byte("[]")
}
agent, err := h.Queries.CreateAgent(r.Context(), db.CreateAgentParams{
WorkspaceID: parseUUID(workspaceID),
Name: req.Name,
Description: req.Description,
Instructions: req.Instructions,
AvatarUrl: ptrToText(req.AvatarURL),
RuntimeMode: runtime.RuntimeMode,
RuntimeConfig: rc,
RuntimeID: runtime.ID,
Visibility: req.Visibility,
MaxConcurrentTasks: req.MaxConcurrentTasks,
OwnerID: parseUUID(ownerID),
Tools: tools,
Triggers: triggers,
})
if err != nil {
slog.Warn("create agent failed", append(logger.RequestAttrs(r), "error", err, "workspace_id", workspaceID)...)
writeError(w, http.StatusInternalServerError, "failed to create agent: "+err.Error())
return
}
slog.Info("agent created", append(logger.RequestAttrs(r), "agent_id", uuidToString(agent.ID), "name", agent.Name, "workspace_id", workspaceID)...)
if runtime.Status == "online" {
h.TaskService.ReconcileAgentStatus(r.Context(), agent.ID)
agent, _ = h.Queries.GetAgent(r.Context(), agent.ID)
}
resp := agentToResponse(agent)
h.publish(protocol.EventAgentCreated, workspaceID, "member", ownerID, map[string]any{"agent": resp})
writeJSON(w, http.StatusCreated, resp)
}
type UpdateAgentRequest struct {
Name *string `json:"name"`
Description *string `json:"description"`
Instructions *string `json:"instructions"`
AvatarURL *string `json:"avatar_url"`
RuntimeID *string `json:"runtime_id"`
RuntimeConfig any `json:"runtime_config"`
Visibility *string `json:"visibility"`
Status *string `json:"status"`
MaxConcurrentTasks *int32 `json:"max_concurrent_tasks"`
Tools any `json:"tools"`
Triggers any `json:"triggers"`
}
func (h *Handler) UpdateAgent(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
agent, ok := h.loadAgentForUser(w, r, id)
if !ok {
return
}
if _, ok := h.requireWorkspaceRole(w, r, uuidToString(agent.WorkspaceID), "agent not found", "owner", "admin"); !ok {
return
}
var req UpdateAgentRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
params := db.UpdateAgentParams{
ID: parseUUID(id),
}
if req.Name != nil {
params.Name = pgtype.Text{String: *req.Name, Valid: true}
}
if req.Description != nil {
params.Description = pgtype.Text{String: *req.Description, Valid: true}
}
if req.Instructions != nil {
params.Instructions = pgtype.Text{String: *req.Instructions, Valid: true}
}
if req.AvatarURL != nil {
params.AvatarUrl = pgtype.Text{String: *req.AvatarURL, Valid: true}
}
if req.RuntimeConfig != nil {
rc, _ := json.Marshal(req.RuntimeConfig)
params.RuntimeConfig = rc
}
if req.RuntimeID != nil {
runtime, err := h.Queries.GetAgentRuntimeForWorkspace(r.Context(), db.GetAgentRuntimeForWorkspaceParams{
ID: parseUUID(*req.RuntimeID),
WorkspaceID: agent.WorkspaceID,
})
if err != nil {
writeError(w, http.StatusBadRequest, "invalid runtime_id")
return
}
params.RuntimeID = runtime.ID
params.RuntimeMode = pgtype.Text{String: runtime.RuntimeMode, Valid: true}
}
if req.Visibility != nil {
params.Visibility = pgtype.Text{String: *req.Visibility, Valid: true}
}
if req.Status != nil {
params.Status = pgtype.Text{String: *req.Status, Valid: true}
}
if req.MaxConcurrentTasks != nil {
params.MaxConcurrentTasks = pgtype.Int4{Int32: *req.MaxConcurrentTasks, Valid: true}
}
if req.Tools != nil {
tools, _ := json.Marshal(req.Tools)
params.Tools = tools
}
if req.Triggers != nil {
triggers, _ := json.Marshal(req.Triggers)
params.Triggers = triggers
}
agent, err := h.Queries.UpdateAgent(r.Context(), params)
if err != nil {
slog.Warn("update agent failed", append(logger.RequestAttrs(r), "error", err, "agent_id", id)...)
writeError(w, http.StatusInternalServerError, "failed to update agent: "+err.Error())
return
}
resp := agentToResponse(agent)
slog.Info("agent updated", append(logger.RequestAttrs(r), "agent_id", id, "workspace_id", uuidToString(agent.WorkspaceID))...)
userID := requestUserID(r)
h.publish(protocol.EventAgentStatus, uuidToString(agent.WorkspaceID), "member", userID, map[string]any{"agent": resp})
writeJSON(w, http.StatusOK, resp)
}
func (h *Handler) DeleteAgent(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
agent, ok := h.loadAgentForUser(w, r, id)
if !ok {
return
}
wsID := uuidToString(agent.WorkspaceID)
// Require owner or admin role
if _, ok := h.requireWorkspaceRole(w, r, wsID, "agent not found", "owner", "admin"); !ok {
return
}
err := h.Queries.DeleteAgent(r.Context(), parseUUID(id))
if err != nil {
slog.Warn("delete agent failed", append(logger.RequestAttrs(r), "error", err, "agent_id", id)...)
writeError(w, http.StatusInternalServerError, "failed to delete agent")
return
}
slog.Info("agent deleted", append(logger.RequestAttrs(r), "agent_id", id, "workspace_id", wsID)...)
userID := requestUserID(r)
h.publish(protocol.EventAgentDeleted, wsID, "member", userID, map[string]any{"agent_id": id, "workspace_id": wsID})
w.WriteHeader(http.StatusNoContent)
}
func (h *Handler) ListAgentTasks(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
if _, ok := h.loadAgentForUser(w, r, id); !ok {
return
}
tasks, err := h.Queries.ListAgentTasks(r.Context(), parseUUID(id))
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list agent tasks")
return
}
resp := make([]AgentTaskResponse, len(tasks))
for i, t := range tasks {
resp[i] = taskToResponse(t)
}
writeJSON(w, http.StatusOK, resp)
}