multica/server/internal/handler/agent.go
Naiyuan Qing 9ede795c5b feat(api): strict workspace isolation + agent parity fixes
Enforce workspace isolation at every layer:

- Router: move RequireWorkspaceMember middleware to group level so ALL
  workspace-scoped routes (issues, agents, skills, runtimes, inbox,
  comments) require workspace context
- SQL: add GetXxxInWorkspace queries that filter by workspace_id,
  eliminating cross-workspace data access at the query level
- Handlers: loadXForUser functions use workspace-scoped queries,
  no fallback to unscoped queries
- Migration 025: add workspace_id column to comment table with backfill
- ListComments: add workspace_id filter for defense-in-depth

Fix daemon workspace mapping:
- Server returns workspace_id in task claim response (from issue)
- Daemon uses task.WorkspaceID directly instead of unreliable
  workspaceIDForRuntime() local map lookup
- Remove workspaceIDForRuntime function

Fix agent/human parity:
- Comment update/delete: use resolveActor for isAuthor check so agents
  can edit/delete their own comments
- Event attribution: replace hardcoded "member" with resolveActor in
  agent, skill, and subscriber publish calls

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 16:49:13 +08:00

453 lines
14 KiB
Go

package handler
import (
"encoding/json"
"log/slog"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/logger"
"github.com/multica-ai/multica/server/internal/service"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
type AgentResponse struct {
ID string `json:"id"`
WorkspaceID string `json:"workspace_id"`
RuntimeID string `json:"runtime_id"`
Name string `json:"name"`
Description string `json:"description"`
Instructions string `json:"instructions"`
AvatarURL *string `json:"avatar_url"`
RuntimeMode string `json:"runtime_mode"`
RuntimeConfig any `json:"runtime_config"`
Visibility string `json:"visibility"`
Status string `json:"status"`
MaxConcurrentTasks int32 `json:"max_concurrent_tasks"`
OwnerID *string `json:"owner_id"`
Skills []SkillResponse `json:"skills"`
Tools any `json:"tools"`
Triggers any `json:"triggers"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
}
func agentToResponse(a db.Agent) AgentResponse {
var rc any
if a.RuntimeConfig != nil {
json.Unmarshal(a.RuntimeConfig, &rc)
}
if rc == nil {
rc = map[string]any{}
}
var tools any
if a.Tools != nil {
json.Unmarshal(a.Tools, &tools)
}
if tools == nil {
tools = []any{}
}
var triggers any
if a.Triggers != nil {
json.Unmarshal(a.Triggers, &triggers)
}
if triggers == nil {
triggers = []any{}
}
return AgentResponse{
ID: uuidToString(a.ID),
WorkspaceID: uuidToString(a.WorkspaceID),
RuntimeID: uuidToString(a.RuntimeID),
Name: a.Name,
Description: a.Description,
Instructions: a.Instructions,
AvatarURL: textToPtr(a.AvatarUrl),
RuntimeMode: a.RuntimeMode,
RuntimeConfig: rc,
Visibility: a.Visibility,
Status: a.Status,
MaxConcurrentTasks: a.MaxConcurrentTasks,
OwnerID: uuidToPtr(a.OwnerID),
Skills: []SkillResponse{},
Tools: tools,
Triggers: triggers,
CreatedAt: timestampToString(a.CreatedAt),
UpdatedAt: timestampToString(a.UpdatedAt),
}
}
// RepoData holds repository information included in claim responses so the
// daemon can set up worktrees for each workspace repo.
type RepoData struct {
URL string `json:"url"`
Description string `json:"description"`
}
type AgentTaskResponse struct {
ID string `json:"id"`
AgentID string `json:"agent_id"`
RuntimeID string `json:"runtime_id"`
IssueID string `json:"issue_id"`
WorkspaceID string `json:"workspace_id"`
Status string `json:"status"`
Priority int32 `json:"priority"`
DispatchedAt *string `json:"dispatched_at"`
StartedAt *string `json:"started_at"`
CompletedAt *string `json:"completed_at"`
Result any `json:"result"`
Error *string `json:"error"`
Agent *TaskAgentData `json:"agent,omitempty"`
Repos []RepoData `json:"repos,omitempty"`
CreatedAt string `json:"created_at"`
PriorSessionID string `json:"prior_session_id,omitempty"` // session ID from a previous task on same issue
PriorWorkDir string `json:"prior_work_dir,omitempty"` // work_dir from a previous task on same issue
}
// TaskAgentData holds agent info included in claim responses so the daemon
// can set up the execution environment (branch naming, skill files, instructions).
type TaskAgentData struct {
ID string `json:"id"`
Name string `json:"name"`
Instructions string `json:"instructions"`
Skills []service.AgentSkillData `json:"skills,omitempty"`
}
func taskToResponse(t db.AgentTaskQueue) AgentTaskResponse {
var result any
if t.Result != nil {
json.Unmarshal(t.Result, &result)
}
return AgentTaskResponse{
ID: uuidToString(t.ID),
AgentID: uuidToString(t.AgentID),
RuntimeID: uuidToString(t.RuntimeID),
IssueID: uuidToString(t.IssueID),
Status: t.Status,
Priority: t.Priority,
DispatchedAt: timestampToPtr(t.DispatchedAt),
StartedAt: timestampToPtr(t.StartedAt),
CompletedAt: timestampToPtr(t.CompletedAt),
Result: result,
Error: textToPtr(t.Error),
CreatedAt: timestampToString(t.CreatedAt),
}
}
func (h *Handler) ListAgents(w http.ResponseWriter, r *http.Request) {
workspaceID := resolveWorkspaceID(r)
member, ok := h.workspaceMember(w, r, workspaceID)
if !ok {
return
}
agents, err := h.Queries.ListAgents(r.Context(), parseUUID(workspaceID))
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list agents")
return
}
userID := requestUserID(r)
isAdmin := roleAllowed(member.Role, "owner", "admin")
// Batch-load skills for all agents to avoid N+1.
skillRows, err := h.Queries.ListAgentSkillsByWorkspace(r.Context(), parseUUID(workspaceID))
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to load agent skills")
return
}
skillMap := map[string][]SkillResponse{}
for _, row := range skillRows {
agentID := uuidToString(row.AgentID)
skillMap[agentID] = append(skillMap[agentID], SkillResponse{
ID: uuidToString(row.ID),
Name: row.Name,
Description: row.Description,
})
}
// Filter private agents: only visible to owner_id or workspace admin
var visible []AgentResponse
for _, a := range agents {
if a.Visibility == "private" && !isAdmin && uuidToString(a.OwnerID) != userID {
continue
}
resp := agentToResponse(a)
if skills, ok := skillMap[resp.ID]; ok {
resp.Skills = skills
}
visible = append(visible, resp)
}
if visible == nil {
visible = []AgentResponse{}
}
writeJSON(w, http.StatusOK, visible)
}
func (h *Handler) GetAgent(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
agent, ok := h.loadAgentForUser(w, r, id)
if !ok {
return
}
resp := agentToResponse(agent)
skills, err := h.Queries.ListAgentSkills(r.Context(), agent.ID)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to load agent skills")
return
}
if len(skills) > 0 {
resp.Skills = make([]SkillResponse, len(skills))
for i, s := range skills {
resp.Skills[i] = skillToResponse(s)
}
}
writeJSON(w, http.StatusOK, resp)
}
type CreateAgentRequest struct {
Name string `json:"name"`
Description string `json:"description"`
Instructions string `json:"instructions"`
AvatarURL *string `json:"avatar_url"`
RuntimeID string `json:"runtime_id"`
RuntimeConfig any `json:"runtime_config"`
Visibility string `json:"visibility"`
MaxConcurrentTasks int32 `json:"max_concurrent_tasks"`
Tools any `json:"tools"`
Triggers any `json:"triggers"`
}
func (h *Handler) CreateAgent(w http.ResponseWriter, r *http.Request) {
workspaceID := resolveWorkspaceID(r)
var req CreateAgentRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
ownerID, ok := requireUserID(w, r)
if !ok {
return
}
if req.Name == "" {
writeError(w, http.StatusBadRequest, "name is required")
return
}
if req.RuntimeID == "" {
writeError(w, http.StatusBadRequest, "runtime_id is required")
return
}
if req.Visibility == "" {
req.Visibility = "workspace"
}
if req.MaxConcurrentTasks == 0 {
req.MaxConcurrentTasks = 6
}
runtime, err := h.Queries.GetAgentRuntimeForWorkspace(r.Context(), db.GetAgentRuntimeForWorkspaceParams{
ID: parseUUID(req.RuntimeID),
WorkspaceID: parseUUID(workspaceID),
})
if err != nil {
writeError(w, http.StatusBadRequest, "invalid runtime_id")
return
}
rc, _ := json.Marshal(req.RuntimeConfig)
if req.RuntimeConfig == nil {
rc = []byte("{}")
}
tools, _ := json.Marshal(req.Tools)
if req.Tools == nil {
tools = []byte("[]")
}
triggers, _ := json.Marshal(req.Triggers)
if req.Triggers == nil {
triggers = []byte("[]")
}
agent, err := h.Queries.CreateAgent(r.Context(), db.CreateAgentParams{
WorkspaceID: parseUUID(workspaceID),
Name: req.Name,
Description: req.Description,
Instructions: req.Instructions,
AvatarUrl: ptrToText(req.AvatarURL),
RuntimeMode: runtime.RuntimeMode,
RuntimeConfig: rc,
RuntimeID: runtime.ID,
Visibility: req.Visibility,
MaxConcurrentTasks: req.MaxConcurrentTasks,
OwnerID: parseUUID(ownerID),
Tools: tools,
Triggers: triggers,
})
if err != nil {
slog.Warn("create agent failed", append(logger.RequestAttrs(r), "error", err, "workspace_id", workspaceID)...)
writeError(w, http.StatusInternalServerError, "failed to create agent: "+err.Error())
return
}
slog.Info("agent created", append(logger.RequestAttrs(r), "agent_id", uuidToString(agent.ID), "name", agent.Name, "workspace_id", workspaceID)...)
if runtime.Status == "online" {
h.TaskService.ReconcileAgentStatus(r.Context(), agent.ID)
agent, _ = h.Queries.GetAgent(r.Context(), agent.ID)
}
resp := agentToResponse(agent)
actorType, actorID := h.resolveActor(r, ownerID, workspaceID)
h.publish(protocol.EventAgentCreated, workspaceID, actorType, actorID, map[string]any{"agent": resp})
writeJSON(w, http.StatusCreated, resp)
}
type UpdateAgentRequest struct {
Name *string `json:"name"`
Description *string `json:"description"`
Instructions *string `json:"instructions"`
AvatarURL *string `json:"avatar_url"`
RuntimeID *string `json:"runtime_id"`
RuntimeConfig any `json:"runtime_config"`
Visibility *string `json:"visibility"`
Status *string `json:"status"`
MaxConcurrentTasks *int32 `json:"max_concurrent_tasks"`
Tools any `json:"tools"`
Triggers any `json:"triggers"`
}
func (h *Handler) UpdateAgent(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
agent, ok := h.loadAgentForUser(w, r, id)
if !ok {
return
}
if _, ok := h.requireWorkspaceRole(w, r, uuidToString(agent.WorkspaceID), "agent not found", "owner", "admin"); !ok {
return
}
var req UpdateAgentRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
params := db.UpdateAgentParams{
ID: parseUUID(id),
}
if req.Name != nil {
params.Name = pgtype.Text{String: *req.Name, Valid: true}
}
if req.Description != nil {
params.Description = pgtype.Text{String: *req.Description, Valid: true}
}
if req.Instructions != nil {
params.Instructions = pgtype.Text{String: *req.Instructions, Valid: true}
}
if req.AvatarURL != nil {
params.AvatarUrl = pgtype.Text{String: *req.AvatarURL, Valid: true}
}
if req.RuntimeConfig != nil {
rc, _ := json.Marshal(req.RuntimeConfig)
params.RuntimeConfig = rc
}
if req.RuntimeID != nil {
runtime, err := h.Queries.GetAgentRuntimeForWorkspace(r.Context(), db.GetAgentRuntimeForWorkspaceParams{
ID: parseUUID(*req.RuntimeID),
WorkspaceID: agent.WorkspaceID,
})
if err != nil {
writeError(w, http.StatusBadRequest, "invalid runtime_id")
return
}
params.RuntimeID = runtime.ID
params.RuntimeMode = pgtype.Text{String: runtime.RuntimeMode, Valid: true}
}
if req.Visibility != nil {
params.Visibility = pgtype.Text{String: *req.Visibility, Valid: true}
}
if req.Status != nil {
params.Status = pgtype.Text{String: *req.Status, Valid: true}
}
if req.MaxConcurrentTasks != nil {
params.MaxConcurrentTasks = pgtype.Int4{Int32: *req.MaxConcurrentTasks, Valid: true}
}
if req.Tools != nil {
tools, _ := json.Marshal(req.Tools)
params.Tools = tools
}
if req.Triggers != nil {
triggers, _ := json.Marshal(req.Triggers)
params.Triggers = triggers
}
agent, err := h.Queries.UpdateAgent(r.Context(), params)
if err != nil {
slog.Warn("update agent failed", append(logger.RequestAttrs(r), "error", err, "agent_id", id)...)
writeError(w, http.StatusInternalServerError, "failed to update agent: "+err.Error())
return
}
resp := agentToResponse(agent)
slog.Info("agent updated", append(logger.RequestAttrs(r), "agent_id", id, "workspace_id", uuidToString(agent.WorkspaceID))...)
userID := requestUserID(r)
actorType, actorID := h.resolveActor(r, userID, uuidToString(agent.WorkspaceID))
h.publish(protocol.EventAgentStatus, uuidToString(agent.WorkspaceID), actorType, actorID, map[string]any{"agent": resp})
writeJSON(w, http.StatusOK, resp)
}
func (h *Handler) DeleteAgent(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
agent, ok := h.loadAgentForUser(w, r, id)
if !ok {
return
}
wsID := uuidToString(agent.WorkspaceID)
// Require owner or admin role
if _, ok := h.requireWorkspaceRole(w, r, wsID, "agent not found", "owner", "admin"); !ok {
return
}
err := h.Queries.DeleteAgent(r.Context(), parseUUID(id))
if err != nil {
slog.Warn("delete agent failed", append(logger.RequestAttrs(r), "error", err, "agent_id", id)...)
writeError(w, http.StatusInternalServerError, "failed to delete agent")
return
}
slog.Info("agent deleted", append(logger.RequestAttrs(r), "agent_id", id, "workspace_id", wsID)...)
userID := requestUserID(r)
actorType, actorID := h.resolveActor(r, userID, wsID)
h.publish(protocol.EventAgentDeleted, wsID, actorType, actorID, map[string]any{"agent_id": id, "workspace_id": wsID})
w.WriteHeader(http.StatusNoContent)
}
func (h *Handler) ListAgentTasks(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
if _, ok := h.loadAgentForUser(w, r, id); !ok {
return
}
tasks, err := h.Queries.ListAgentTasks(r.Context(), parseUUID(id))
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list agent tasks")
return
}
resp := make([]AgentTaskResponse, len(tasks))
for i, t := range tasks {
resp[i] = taskToResponse(t)
}
writeJSON(w, http.StatusOK, resp)
}