multica/server/internal/service/task.go
LinYushen 09764c5f51
feat(agent): replace hard delete with archive/restore (#346)
* feat(agent): replace hard delete with archive/restore

Replace agent deletion with soft archive pattern. Archived agents
are preserved in the database with all historical references intact
but cannot be assigned, mentioned, or trigger tasks.

Backend:
- Add archived_at/archived_by columns to agent table (migration 031)
- Replace DELETE /api/agents/{id} with POST /api/agents/{id}/archive
- Add POST /api/agents/{id}/restore endpoint
- ListAgents excludes archived by default (?include_archived=true to include)
- Skip archived agents in task triggers (on_assign, on_comment, on_mention)
- Block assignment to archived agents
- Cancel pending tasks on archive
- New events: agent:archived, agent:restored (replacing agent:deleted)

Frontend:
- Agent type includes archived_at/archived_by fields
- Mention autocomplete and assignee picker filter out archived agents
- Agent list shows archived agents with muted styling
- Agent detail shows archive banner with restore button
- Delete button replaced with Archive button and updated confirmation dialog
- API client: archiveAgent/restoreAgent replace deleteAgent

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(agent): self-review fixes for archive feature

- Fix: workspace store now fetches agents with include_archived=true
  so archived agents are actually visible in the frontend (the archived
  UI was dead code before — ListAgents excludes archived by default)
- Fix: add error logging for CancelAgentTasksByAgent in ArchiveAgent
- Fix: add idempotency guards — return 409 Conflict when archiving
  an already-archived agent or restoring a non-archived agent
- Fix: revert unnecessary extra GetAgent query in ReconcileAgentStatus
  (archived agents won't have running tasks after CancelAgentTasksByAgent)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 17:33:52 +08:00

563 lines
19 KiB
Go

package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"strconv"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/mention"
"github.com/multica-ai/multica/server/internal/realtime"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
"github.com/multica-ai/multica/server/pkg/redact"
)
// TaskService bundles the dependencies used by the agent task operations in
// this file: enqueueing, claiming, starting, completing, failing and
// cancelling tasks, plus the event broadcasts that accompany them.
type TaskService struct {
// Queries is the generated database accessor used for every agent, task,
// issue, workspace and comment query issued by the service.
Queries *db.Queries
// Hub is the realtime hub handle. NOTE(review): it is stored by the
// constructor but not referenced by any method visible in this file —
// confirm whether other code relies on it.
Hub *realtime.Hub
// Bus is the event bus on which task, agent-status, issue and comment
// events are published.
Bus *events.Bus
}
// NewTaskService returns a TaskService wired to the given database queries,
// realtime hub and event bus. The arguments are stored as-is without
// validation.
func NewTaskService(q *db.Queries, hub *realtime.Hub, bus *events.Bus) *TaskService {
	svc := new(TaskService)
	svc.Queries = q
	svc.Hub = hub
	svc.Bus = bus
	return svc
}
// EnqueueTaskForIssue queues a task for the agent assigned to issue.
//
// No context snapshot is stored with the task; the agent fetches whatever it
// needs at runtime via the multica CLI. triggerCommentID is optional: only the
// first value is used and any further values are ignored.
//
// A zero-value task and an error are returned when the issue has no assignee,
// the agent cannot be loaded, the agent is archived, the agent has no runtime,
// or the task row cannot be created.
func (s *TaskService) EnqueueTaskForIssue(ctx context.Context, issue db.Issue, triggerCommentID ...pgtype.UUID) (db.AgentTaskQueue, error) {
	var none db.AgentTaskQueue
	issueID := util.UUIDToString(issue.ID)

	if !issue.AssigneeID.Valid {
		slog.Error("task enqueue failed", "issue_id", issueID, "error", "issue has no assignee")
		return none, errors.New("issue has no assignee")
	}

	assignee, err := s.Queries.GetAgent(ctx, issue.AssigneeID)
	switch {
	case err != nil:
		slog.Error("task enqueue failed", "issue_id", issueID, "error", err)
		return none, fmt.Errorf("load agent: %w", err)
	case assignee.ArchivedAt.Valid:
		// Archived agents must not pick up new work; this is expected, hence Debug.
		slog.Debug("task enqueue skipped: agent is archived", "issue_id", issueID, "agent_id", util.UUIDToString(assignee.ID))
		return none, errors.New("agent is archived")
	case !assignee.RuntimeID.Valid:
		slog.Error("task enqueue failed", "issue_id", issueID, "error", "agent has no runtime")
		return none, errors.New("agent has no runtime")
	}

	params := db.CreateAgentTaskParams{
		AgentID:   issue.AssigneeID,
		RuntimeID: assignee.RuntimeID,
		IssueID:   issue.ID,
		Priority:  priorityToInt(issue.Priority),
	}
	// Without a trigger comment the field keeps its zero (invalid) UUID.
	if len(triggerCommentID) > 0 {
		params.TriggerCommentID = triggerCommentID[0]
	}

	queued, err := s.Queries.CreateAgentTask(ctx, params)
	if err != nil {
		slog.Error("task enqueue failed", "issue_id", issueID, "error", err)
		return none, fmt.Errorf("create task: %w", err)
	}

	slog.Info("task enqueued", "task_id", util.UUIDToString(queued.ID), "issue_id", issueID, "agent_id", util.UUIDToString(issue.AssigneeID))
	return queued, nil
}
// EnqueueTaskForMention queues a task on issue for an explicitly named agent.
//
// It differs from EnqueueTaskForIssue in that the agent comes from agentID
// rather than from the issue's assignee, and the trigger comment ID is
// required rather than optional.
//
// A zero-value task and an error are returned when the agent cannot be loaded,
// is archived, has no runtime, or the task row cannot be created.
func (s *TaskService) EnqueueTaskForMention(ctx context.Context, issue db.Issue, agentID pgtype.UUID, triggerCommentID pgtype.UUID) (db.AgentTaskQueue, error) {
	var none db.AgentTaskQueue
	issueKey := util.UUIDToString(issue.ID)
	agentKey := util.UUIDToString(agentID)

	mentioned, err := s.Queries.GetAgent(ctx, agentID)
	switch {
	case err != nil:
		slog.Error("mention task enqueue failed: agent not found", "issue_id", issueKey, "agent_id", agentKey, "error", err)
		return none, fmt.Errorf("load agent: %w", err)
	case mentioned.ArchivedAt.Valid:
		// Mentions of archived agents are ignored by design, hence Debug.
		slog.Debug("mention task enqueue skipped: agent is archived", "issue_id", issueKey, "agent_id", agentKey)
		return none, errors.New("agent is archived")
	case !mentioned.RuntimeID.Valid:
		slog.Error("mention task enqueue failed: agent has no runtime", "issue_id", issueKey, "agent_id", agentKey)
		return none, errors.New("agent has no runtime")
	}

	params := db.CreateAgentTaskParams{
		AgentID:          agentID,
		RuntimeID:        mentioned.RuntimeID,
		IssueID:          issue.ID,
		Priority:         priorityToInt(issue.Priority),
		TriggerCommentID: triggerCommentID,
	}
	queued, err := s.Queries.CreateAgentTask(ctx, params)
	if err != nil {
		slog.Error("mention task enqueue failed", "issue_id", issueKey, "agent_id", agentKey, "error", err)
		return none, fmt.Errorf("create task: %w", err)
	}

	slog.Info("mention task enqueued", "task_id", util.UUIDToString(queued.ID), "issue_id", issueKey, "agent_id", agentKey)
	return queued, nil
}
// CancelTasksForIssue cancels the tasks attached to an issue by delegating to
// the CancelAgentTasksByIssue query; the query's error is returned unchanged.
//
// Unlike CancelTask, this method neither reconciles agent status nor publishes
// a cancellation event. NOTE(review): which task states count as "active" is
// decided by the SQL query, which is not visible here — confirm there.
func (s *TaskService) CancelTasksForIssue(ctx context.Context, issueID pgtype.UUID) error {
return s.Queries.CancelAgentTasksByIssue(ctx, issueID)
}
// CancelTask cancels the task identified by taskID and returns the updated
// row.
//
// On success the owning agent's status is reconciled against its remaining
// running tasks and a task-cancelled event is published so frontends can
// update immediately. A query failure is returned wrapped as "cancel task".
func (s *TaskService) CancelTask(ctx context.Context, taskID pgtype.UUID) (*db.AgentTaskQueue, error) {
	cancelled, err := s.Queries.CancelAgentTask(ctx, taskID)
	if err != nil {
		return nil, fmt.Errorf("cancel task: %w", err)
	}

	slog.Info("task cancelled",
		"task_id", util.UUIDToString(cancelled.ID),
		"issue_id", util.UUIDToString(cancelled.IssueID),
	)

	// Recompute the agent's status from its remaining running tasks.
	s.ReconcileAgentStatus(ctx, cancelled.AgentID)

	// Tell subscribers about the cancellation so live task cards are cleared.
	s.broadcastTaskEvent(ctx, protocol.EventTaskCancelled, cancelled)

	return &cancelled, nil
}
// ClaimTask claims the next queued task for agentID, provided the agent is
// below its max_concurrent_tasks limit.
//
// It returns (nil, nil) when the agent has no spare capacity or when no task
// is available. On a successful claim the agent's status is set to "working"
// and a dispatch event is published. The capacity check and the claim itself
// are issued as separate queries.
func (s *TaskService) ClaimTask(ctx context.Context, agentID pgtype.UUID) (*db.AgentTaskQueue, error) {
	agentKey := util.UUIDToString(agentID)

	agent, err := s.Queries.GetAgent(ctx, agentID)
	if err != nil {
		return nil, fmt.Errorf("agent not found: %w", err)
	}

	running, err := s.Queries.CountRunningTasks(ctx, agentID)
	if err != nil {
		return nil, fmt.Errorf("count running tasks: %w", err)
	}
	if limit := int64(agent.MaxConcurrentTasks); running >= limit {
		// Agent is saturated; not an error.
		slog.Debug("task claim: no capacity", "agent_id", agentKey, "running", running, "max", agent.MaxConcurrentTasks)
		return nil, nil
	}

	claimed, err := s.Queries.ClaimAgentTask(ctx, agentID)
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		// Nothing queued for this agent; not an error.
		slog.Debug("task claim: no tasks available", "agent_id", agentKey)
		return nil, nil
	case err != nil:
		return nil, fmt.Errorf("claim task: %w", err)
	}

	slog.Info("task claimed", "task_id", util.UUIDToString(claimed.ID), "agent_id", agentKey)

	// The agent now has work in flight; reflect that and notify subscribers.
	s.updateAgentStatus(ctx, agentID, "working")
	s.broadcastTaskDispatch(ctx, claimed)

	return &claimed, nil
}
// ClaimTaskForRuntime claims the next runnable task for runtimeID while still
// honouring each agent's max_concurrent_tasks limit.
//
// It lists the runtime's pending tasks and, for each distinct agent in list
// order, tries ClaimTask once. The first claimed task whose RuntimeID equals
// runtimeID is returned. It returns (nil, nil) when nothing could be claimed,
// and the first ClaimTask error otherwise.
//
// ClaimTask picks the agent's next task regardless of runtime, so it can claim
// a task bound to a different runtime. Such a task has already been claimed
// and broadcast by ClaimTask and cannot be handed to this caller; it is logged
// as a warning rather than dropped silently.
func (s *TaskService) ClaimTaskForRuntime(ctx context.Context, runtimeID pgtype.UUID) (*db.AgentTaskQueue, error) {
	tasks, err := s.Queries.ListPendingTasksByRuntime(ctx, runtimeID)
	if err != nil {
		return nil, fmt.Errorf("list pending tasks: %w", err)
	}

	// Each agent is attempted at most once per call: a second attempt would
	// hit the same capacity limit or claim a second task for the same poll.
	triedAgents := make(map[pgtype.UUID]struct{}, len(tasks))
	for _, candidate := range tasks {
		if _, seen := triedAgents[candidate.AgentID]; seen {
			continue
		}
		triedAgents[candidate.AgentID] = struct{}{}

		task, err := s.ClaimTask(ctx, candidate.AgentID)
		if err != nil {
			return nil, err
		}
		if task == nil {
			continue // No capacity or nothing queued for this agent.
		}
		if task.RuntimeID == runtimeID {
			return task, nil
		}

		slog.Warn("task claimed for a different runtime; not returned to requester",
			"task_id", util.UUIDToString(task.ID),
			"agent_id", util.UUIDToString(task.AgentID),
			"task_runtime_id", util.UUIDToString(task.RuntimeID),
			"requested_runtime_id", util.UUIDToString(runtimeID),
		)
	}
	return nil, nil
}
// StartTask marks the task identified by taskID as started and returns the
// updated row. A query failure is returned wrapped as "start task".
//
// The issue's status is deliberately left alone: the agent manages it via the
// CLI.
func (s *TaskService) StartTask(ctx context.Context, taskID pgtype.UUID) (*db.AgentTaskQueue, error) {
	started, err := s.Queries.StartAgentTask(ctx, taskID)
	if err != nil {
		return nil, fmt.Errorf("start task: %w", err)
	}

	slog.Info("task started",
		"task_id", util.UUIDToString(started.ID),
		"issue_id", util.UUIDToString(started.IssueID),
	)
	return &started, nil
}
// CompleteTask marks the task identified by taskID as completed, storing
// result together with the optional sessionID and workDir (empty strings are
// stored as NULL). It returns the updated row.
//
// The issue's status is deliberately left alone: the agent manages it via the
// CLI.
//
// For tasks without a trigger comment (assignment-triggered), the "output"
// field decoded from result is redacted and posted as an agent comment.
// Comment-triggered tasks reply themselves via the CLI, so posting here would
// duplicate the reply. A non-empty result that cannot be decoded is logged and
// no comment is posted; the task still counts as completed.
//
// On success the agent's status is reconciled and a task-completed event is
// published. A query failure is returned wrapped as "complete task".
func (s *TaskService) CompleteTask(ctx context.Context, taskID pgtype.UUID, result []byte, sessionID, workDir string) (*db.AgentTaskQueue, error) {
	task, err := s.Queries.CompleteAgentTask(ctx, db.CompleteAgentTaskParams{
		ID:        taskID,
		Result:    result,
		SessionID: pgtype.Text{String: sessionID, Valid: sessionID != ""},
		WorkDir:   pgtype.Text{String: workDir, Valid: workDir != ""},
	})
	if err != nil {
		// Log the current task state to help debug why the update matched no
		// rows. The underlying error is included because the failure may also
		// be unrelated to the task's state.
		if existing, lookupErr := s.Queries.GetAgentTask(ctx, taskID); lookupErr == nil {
			slog.Warn("complete task failed: task not in running state",
				"task_id", util.UUIDToString(taskID),
				"current_status", existing.Status,
				"issue_id", util.UUIDToString(existing.IssueID),
				"agent_id", util.UUIDToString(existing.AgentID),
				"error", err,
			)
		} else {
			slog.Warn("complete task failed: task not found",
				"task_id", util.UUIDToString(taskID),
				"lookup_error", lookupErr,
				"error", err,
			)
		}
		return nil, fmt.Errorf("complete task: %w", err)
	}

	slog.Info("task completed", "task_id", util.UUIDToString(task.ID), "issue_id", util.UUIDToString(task.IssueID))

	// Post agent output as a comment, but only for assignment-triggered tasks.
	// An empty result carries no output, so there is nothing to decode.
	if !task.TriggerCommentID.Valid && len(result) > 0 {
		var payload protocol.TaskCompletedPayload
		if err := json.Unmarshal(result, &payload); err != nil {
			slog.Warn("task result could not be decoded; agent output not posted",
				"task_id", util.UUIDToString(task.ID),
				"issue_id", util.UUIDToString(task.IssueID),
				"error", err,
			)
		} else if payload.Output != "" {
			s.createAgentComment(ctx, task.IssueID, task.AgentID, redact.Text(payload.Output), "comment", task.TriggerCommentID)
		}
	}

	// Recompute the agent's status from its remaining running tasks.
	s.ReconcileAgentStatus(ctx, task.AgentID)

	s.broadcastTaskEvent(ctx, protocol.EventTaskCompleted, task)

	return &task, nil
}
// FailTask records the task identified by taskID as failed with errMsg and
// returns the updated row.
//
// The issue's status is deliberately left alone: the agent manages it via the
// CLI.
//
// A non-empty errMsg is redacted and posted on the issue as a "system" comment
// from the agent, threaded under the task's trigger comment when there is one.
// The agent's status is then reconciled and a task-failed event is published.
// A query failure is returned wrapped as "fail task".
func (s *TaskService) FailTask(ctx context.Context, taskID pgtype.UUID, errMsg string) (*db.AgentTaskQueue, error) {
	params := db.FailAgentTaskParams{
		ID:    taskID,
		Error: pgtype.Text{String: errMsg, Valid: true},
	}
	failed, err := s.Queries.FailAgentTask(ctx, params)
	if err != nil {
		// Look the task up so the log shows what state it was actually in.
		current, lookupErr := s.Queries.GetAgentTask(ctx, taskID)
		if lookupErr != nil {
			slog.Warn("fail task failed: task not found",
				"task_id", util.UUIDToString(taskID),
				"lookup_error", lookupErr,
			)
		} else {
			slog.Warn("fail task failed: task not in dispatched/running state",
				"task_id", util.UUIDToString(taskID),
				"current_status", current.Status,
				"issue_id", util.UUIDToString(current.IssueID),
				"agent_id", util.UUIDToString(current.AgentID),
			)
		}
		return nil, fmt.Errorf("fail task: %w", err)
	}

	slog.Warn("task failed",
		"task_id", util.UUIDToString(failed.ID),
		"issue_id", util.UUIDToString(failed.IssueID),
		"error", errMsg,
	)

	if errMsg != "" {
		s.createAgentComment(ctx, failed.IssueID, failed.AgentID, redact.Text(errMsg), "system", failed.TriggerCommentID)
	}

	// Recompute the agent's status, then notify subscribers of the failure.
	s.ReconcileAgentStatus(ctx, failed.AgentID)
	s.broadcastTaskEvent(ctx, protocol.EventTaskFailed, failed)

	return &failed, nil
}
// ReportProgress publishes a task-progress event for taskID on the event bus,
// scoped to workspaceID and attributed to the "system" actor. summary, step
// and total are forwarded verbatim in the payload. ctx is currently unused.
func (s *TaskService) ReportProgress(ctx context.Context, taskID string, workspaceID string, summary string, step, total int) {
	progress := protocol.TaskProgressPayload{
		TaskID:  taskID,
		Summary: summary,
		Step:    step,
		Total:   total,
	}

	evt := events.Event{
		Type:        protocol.EventTaskProgress,
		WorkspaceID: workspaceID,
		ActorType:   "system",
		ActorID:     "",
		Payload:     progress,
	}
	s.Bus.Publish(evt)
}
// ReconcileAgentStatus derives the agent's status from its running-task count
// and stores it: "working" when at least one task is running, "idle"
// otherwise.
//
// The method is best-effort and returns nothing. If the count query fails the
// status is left untouched and the failure is logged, so that a stale status
// can be traced back to its cause.
func (s *TaskService) ReconcileAgentStatus(ctx context.Context, agentID pgtype.UUID) {
	running, err := s.Queries.CountRunningTasks(ctx, agentID)
	if err != nil {
		slog.Warn("agent status reconcile failed: count running tasks",
			"agent_id", util.UUIDToString(agentID),
			"error", err,
		)
		return
	}

	newStatus := "idle"
	if running > 0 {
		newStatus = "working"
	}

	slog.Debug("agent status reconciled", "agent_id", util.UUIDToString(agentID), "status", newStatus, "running_tasks", running)
	s.updateAgentStatus(ctx, agentID, newStatus)
}
// updateAgentStatus stores status on the agent and, on success, publishes an
// agent-status event carrying the updated agent to the agent's workspace.
//
// The method is best-effort and returns nothing. If the update fails no event
// is published and the failure is logged instead of being dropped silently.
func (s *TaskService) updateAgentStatus(ctx context.Context, agentID pgtype.UUID, status string) {
	agent, err := s.Queries.UpdateAgentStatus(ctx, db.UpdateAgentStatusParams{
		ID:     agentID,
		Status: status,
	})
	if err != nil {
		slog.Warn("agent status update failed",
			"agent_id", util.UUIDToString(agentID),
			"status", status,
			"error", err,
		)
		return
	}

	s.Bus.Publish(events.Event{
		Type:        protocol.EventAgentStatus,
		WorkspaceID: util.UUIDToString(agent.WorkspaceID),
		ActorType:   "system",
		ActorID:     "",
		Payload:     map[string]any{"agent": agentToMap(agent)},
	})
}
// LoadAgentSkills returns the agent's skills, each with its supporting files,
// for use in task execution.
//
// It returns nil when the agent has no skills or when the skill list cannot be
// loaded. Loading is best-effort: a failure to list one skill's files keeps
// the skill itself in the result. Both kinds of failure are logged so that
// missing skills or files can be diagnosed.
func (s *TaskService) LoadAgentSkills(ctx context.Context, agentID pgtype.UUID) []AgentSkillData {
	skills, err := s.Queries.ListAgentSkills(ctx, agentID)
	if err != nil {
		slog.Warn("load agent skills failed",
			"agent_id", util.UUIDToString(agentID),
			"error", err,
		)
		return nil
	}
	if len(skills) == 0 {
		return nil
	}

	result := make([]AgentSkillData, 0, len(skills))
	for _, sk := range skills {
		data := AgentSkillData{Name: sk.Name, Content: sk.Content}

		files, err := s.Queries.ListSkillFiles(ctx, sk.ID)
		if err != nil {
			// Keep the skill; only its supporting files are unavailable.
			slog.Warn("load skill files failed",
				"agent_id", util.UUIDToString(agentID),
				"skill_name", sk.Name,
				"error", err,
			)
		}
		for _, f := range files {
			data.Files = append(data.Files, AgentSkillFileData{Path: f.Path, Content: f.Content})
		}

		result = append(result, data)
	}
	return result
}
// AgentSkillData is the JSON shape of one agent skill as returned by
// LoadAgentSkills for task execution responses.
type AgentSkillData struct {
// Name is the skill's name, copied from the stored skill.
Name string `json:"name"`
// Content is the skill's main content, copied from the stored skill.
Content string `json:"content"`
// Files holds the skill's supporting files; the field is omitted from the
// JSON output when there are none.
Files []AgentSkillFileData `json:"files,omitempty"`
}
// AgentSkillFileData is the JSON shape of one supporting file within a skill.
type AgentSkillFileData struct {
// Path is the file's path as stored with the skill. NOTE(review): whether
// it is relative to a skill root is not visible here — confirm.
Path string `json:"path"`
// Content is the file's content.
Content string `json:"content"`
}
// priorityToInt maps an issue priority label to the numeric priority stored
// on a task: "urgent" is 4, "high" is 3, "medium" is 2 and "low" is 1. Any
// other value, including the empty string, maps to 0. Matching is exact and
// case-sensitive.
func priorityToInt(p string) int32 {
	var rank int32 // Zero covers unknown and empty labels.
	switch p {
	case "low":
		rank = 1
	case "medium":
		rank = 2
	case "high":
		rank = 3
	case "urgent":
		rank = 4
	}
	return rank
}
// broadcastTaskDispatch publishes a task-dispatch event for task to the
// workspace that owns the task's issue.
//
// The payload starts from the task's stored context, when it decodes as a JSON
// object, and always carries "task_id" and "runtime_id". A stored context that
// cannot be decoded is logged and ignored, so the event is still sent with the
// two IDs. Nothing is published when the issue cannot be loaded, to avoid an
// event without a workspace scope.
func (s *TaskService) broadcastTaskDispatch(ctx context.Context, task db.AgentTaskQueue) {
	var payload map[string]any
	if task.Context != nil {
		if err := json.Unmarshal(task.Context, &payload); err != nil {
			slog.Warn("task dispatch: stored context could not be decoded; broadcasting without it",
				"task_id", util.UUIDToString(task.ID),
				"error", err,
			)
			payload = nil // Discard anything partially decoded.
		}
	}
	if payload == nil {
		payload = map[string]any{}
	}
	payload["task_id"] = util.UUIDToString(task.ID)
	payload["runtime_id"] = util.UUIDToString(task.RuntimeID)

	workspaceID := ""
	if issue, err := s.Queries.GetIssue(ctx, task.IssueID); err == nil {
		workspaceID = util.UUIDToString(issue.WorkspaceID)
	}
	if workspaceID == "" {
		return // Issue deleted; skip broadcast to avoid global leak
	}

	s.Bus.Publish(events.Event{
		Type:        protocol.EventTaskDispatch,
		WorkspaceID: workspaceID,
		ActorType:   "system",
		ActorID:     "",
		Payload:     payload,
	})
}
// broadcastTaskEvent publishes a task lifecycle event of eventType to the
// workspace that owns the task's issue. The payload carries the task, agent
// and issue IDs plus the task's current status.
//
// Nothing is published when the issue cannot be loaded or yields an empty
// workspace ID, so an event is never sent without a workspace scope.
func (s *TaskService) broadcastTaskEvent(ctx context.Context, eventType string, task db.AgentTaskQueue) {
	issue, err := s.Queries.GetIssue(ctx, task.IssueID)
	if err != nil {
		return // Issue deleted; skip broadcast to avoid global leak
	}
	workspaceID := util.UUIDToString(issue.WorkspaceID)
	if workspaceID == "" {
		return
	}

	details := map[string]any{
		"task_id":  util.UUIDToString(task.ID),
		"agent_id": util.UUIDToString(task.AgentID),
		"issue_id": util.UUIDToString(task.IssueID),
		"status":   task.Status,
	}
	s.Bus.Publish(events.Event{
		Type:        eventType,
		WorkspaceID: workspaceID,
		ActorType:   "system",
		ActorID:     "",
		Payload:     details,
	})
}
// broadcastIssueUpdated publishes an issue-updated event for issue to its
// workspace, attributed to the "system" actor. The payload is the issue
// rendered by issueToMap using the workspace's issue prefix.
func (s *TaskService) broadcastIssueUpdated(issue db.Issue) {
	rendered := issueToMap(issue, s.getIssuePrefix(issue.WorkspaceID))

	evt := events.Event{
		Type:        protocol.EventIssueUpdated,
		WorkspaceID: util.UUIDToString(issue.WorkspaceID),
		ActorType:   "system",
		ActorID:     "",
		Payload:     map[string]any{"issue": rendered},
	}
	s.Bus.Publish(evt)
}
// getIssuePrefix returns the issue prefix configured on the workspace, or an
// empty string when the workspace cannot be loaded. The lookup runs with a
// background context because the method takes none from its caller.
func (s *TaskService) getIssuePrefix(workspaceID pgtype.UUID) string {
	if ws, err := s.Queries.GetWorkspace(context.Background(), workspaceID); err == nil {
		return ws.IssuePrefix
	}
	return ""
}
// createAgentComment posts content on the issue as a comment authored by the
// agent and publishes a comment-created event for it.
//
// commentType is stored as the comment's type (callers in this file pass
// "comment" or "system"); parentID threads the comment under another comment
// when valid. Bare issue identifiers in content (e.g. MUL-117) are expanded
// into mention links before the comment is stored.
//
// The method is best-effort and returns nothing. Empty content is ignored.
// Failures to load the issue or to create the comment are logged instead of
// being dropped silently, and no event is published in either case.
func (s *TaskService) createAgentComment(ctx context.Context, issueID, agentID pgtype.UUID, content, commentType string, parentID pgtype.UUID) {
	if content == "" {
		return
	}

	// Look up issue to get workspace ID for mention expansion and broadcasting.
	issue, err := s.Queries.GetIssue(ctx, issueID)
	if err != nil {
		slog.Warn("agent comment skipped: issue lookup failed",
			"issue_id", util.UUIDToString(issueID),
			"agent_id", util.UUIDToString(agentID),
			"error", err,
		)
		return
	}

	// Expand bare issue identifiers (e.g. MUL-117) into mention links.
	content = mention.ExpandIssueIdentifiers(ctx, s.Queries, issue.WorkspaceID, content)

	comment, err := s.Queries.CreateComment(ctx, db.CreateCommentParams{
		IssueID:    issueID,
		AuthorType: "agent",
		AuthorID:   agentID,
		Content:    content,
		Type:       commentType,
		ParentID:   parentID,
	})
	if err != nil {
		slog.Error("agent comment create failed",
			"issue_id", util.UUIDToString(issueID),
			"agent_id", util.UUIDToString(agentID),
			"error", err,
		)
		return
	}

	s.Bus.Publish(events.Event{
		Type:        protocol.EventCommentCreated,
		WorkspaceID: util.UUIDToString(issue.WorkspaceID),
		ActorType:   "agent",
		ActorID:     util.UUIDToString(agentID),
		Payload: map[string]any{
			"comment": map[string]any{
				"id":          util.UUIDToString(comment.ID),
				"issue_id":    util.UUIDToString(comment.IssueID),
				"author_type": comment.AuthorType,
				"author_id":   util.UUIDToString(comment.AuthorID),
				"content":     comment.Content,
				"type":        comment.Type,
				"parent_id":   util.UUIDToPtr(comment.ParentID),
				// The layout ends in a literal "Z", so the time must be
				// converted to UTC first or a non-UTC value would be mislabelled.
				"created_at": comment.CreatedAt.Time.UTC().Format("2006-01-02T15:04:05Z"),
			},
			"issue_title":  issue.Title,
			"issue_status": issue.Status,
		},
	})
}
// issueToMap renders an issue as the map used in issue event payloads.
//
// The "identifier" entry is issuePrefix, a hyphen and the issue number. IDs
// are rendered as strings and optional columns through the util pointer
// helpers.
func issueToMap(issue db.Issue, issuePrefix string) map[string]any {
	identifier := issuePrefix + "-" + strconv.Itoa(int(issue.Number))

	rendered := map[string]any{
		"id":              util.UUIDToString(issue.ID),
		"workspace_id":    util.UUIDToString(issue.WorkspaceID),
		"number":          issue.Number,
		"identifier":      identifier,
		"title":           issue.Title,
		"description":     util.TextToPtr(issue.Description),
		"status":          issue.Status,
		"priority":        issue.Priority,
		"assignee_type":   util.TextToPtr(issue.AssigneeType),
		"assignee_id":     util.UUIDToPtr(issue.AssigneeID),
		"creator_type":    issue.CreatorType,
		"creator_id":      util.UUIDToString(issue.CreatorID),
		"parent_issue_id": util.UUIDToPtr(issue.ParentIssueID),
		"position":        issue.Position,
		"due_date":        util.TimestampToPtr(issue.DueDate),
		"created_at":      util.TimestampToString(issue.CreatedAt),
		"updated_at":      util.TimestampToString(issue.UpdatedAt),
	}
	return rendered
}
// agentToMap renders an agent as the map used in agent-status event payloads.
//
// The runtime_config, tools and triggers columns are decoded from their stored
// JSON on a best-effort basis: a nil or undecodable column is rendered as nil.
// "skills" is always an empty list in this representation.
func agentToMap(a db.Agent) map[string]any {
	// decode turns a stored JSON column into a generic value, yielding nil for
	// a nil column or for content that does not decode.
	decode := func(raw []byte) any {
		var v any
		if raw != nil {
			// Best-effort: on failure v keeps its nil zero value.
			_ = json.Unmarshal(raw, &v)
		}
		return v
	}

	return map[string]any{
		"id":                   util.UUIDToString(a.ID),
		"workspace_id":         util.UUIDToString(a.WorkspaceID),
		"runtime_id":           util.UUIDToString(a.RuntimeID),
		"name":                 a.Name,
		"description":          a.Description,
		"avatar_url":           util.TextToPtr(a.AvatarUrl),
		"runtime_mode":         a.RuntimeMode,
		"runtime_config":       decode(a.RuntimeConfig),
		"visibility":           a.Visibility,
		"status":               a.Status,
		"max_concurrent_tasks": a.MaxConcurrentTasks,
		"owner_id":             util.UUIDToPtr(a.OwnerID),
		"skills":               []any{},
		"tools":                decode(a.Tools),
		"triggers":             decode(a.Triggers),
		"created_at":           util.TimestampToString(a.CreatedAt),
		"updated_at":           util.TimestampToString(a.UpdatedAt),
		"archived_at":          util.TimestampToPtr(a.ArchivedAt),
		"archived_by":          util.UUIDToPtr(a.ArchivedBy),
	}
}