feat(server): trigger agents via @mention in comments
When a user @mentions an agent in any issue's comment, the system now enqueues a task for that agent. The agent reads the issue context and replies to the triggering comment thread. Changes: - Add shared util.ParseMentions for mention parsing (used by both comment handler and notification listeners) - Add EnqueueTaskForMention to TaskService for explicit agent targeting - Add on_mention trigger type support in agent trigger config - Add HasPendingTaskForIssueAndAgent SQL query for per-agent dedup - Add enqueueMentionedAgentTasks in CreateComment handler Safety: prevents self-trigger (agent mentioning itself), dedup with assignee on_comment trigger, terminal issue status check, and per-agent pending task dedup.
This commit is contained in:
parent
35f77de9cc
commit
3646ec5a53
7 changed files with 167 additions and 15 deletions
|
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"regexp"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/events"
|
||||
"github.com/multica-ai/multica/server/internal/handler"
|
||||
|
|
@ -13,15 +12,12 @@ import (
|
|||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
|
||||
// mention represents a parsed @mention from markdown content.
|
||||
// mention represents a parsed @mention from markdown content (local alias).
|
||||
type mention struct {
|
||||
Type string // "member" or "agent"
|
||||
ID string // user_id or agent_id
|
||||
}
|
||||
|
||||
// mentionRe matches [@Label](mention://type/id) in markdown.
|
||||
var mentionRe = regexp.MustCompile(`\[@[^\]]*\]\(mention://(member|agent)/([0-9a-fA-F-]+)\)`)
|
||||
|
||||
// statusLabels maps DB status values to human-readable labels for notifications.
|
||||
var statusLabels = map[string]string{
|
||||
"backlog": "Backlog",
|
||||
|
|
@ -59,17 +55,12 @@ func priorityLabel(p string) string {
|
|||
var emptyDetails = []byte("{}")
|
||||
|
||||
// parseMentions extracts mentions from markdown content.
|
||||
// Delegates to the shared util.ParseMentions and converts to the local type.
|
||||
func parseMentions(content string) []mention {
|
||||
matches := mentionRe.FindAllStringSubmatch(content, -1)
|
||||
seen := make(map[string]bool)
|
||||
var result []mention
|
||||
for _, m := range matches {
|
||||
key := m[1] + ":" + m[2]
|
||||
if seen[key] {
|
||||
continue
|
||||
}
|
||||
seen[key] = true
|
||||
result = append(result, mention{Type: m[1], ID: m[2]})
|
||||
parsed := util.ParseMentions(content)
|
||||
result := make([]mention, len(parsed))
|
||||
for i, m := range parsed {
|
||||
result[i] = mention{Type: m.Type, ID: m.ID}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
|
|
@ -8,6 +9,7 @@ import (
|
|||
"github.com/go-chi/chi/v5"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/multica-ai/multica/server/internal/logger"
|
||||
"github.com/multica-ai/multica/server/internal/util"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
|
|
@ -158,9 +160,61 @@ func (h *Handler) CreateComment(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
// Trigger @mentioned agents: parse agent mentions and enqueue tasks for each.
|
||||
h.enqueueMentionedAgentTasks(r.Context(), issue, comment, authorType, authorID)
|
||||
|
||||
writeJSON(w, http.StatusCreated, resp)
|
||||
}
|
||||
|
||||
// enqueueMentionedAgentTasks parses @agent mentions from comment content and
|
||||
// enqueues a task for each mentioned agent. Skips self-mentions, agents that
|
||||
// are already the issue's assignee (handled by on_comment), and agents with
|
||||
// on_mention trigger disabled.
|
||||
func (h *Handler) enqueueMentionedAgentTasks(ctx context.Context, issue db.Issue, comment db.Comment, authorType, authorID string) {
|
||||
// Don't trigger on terminal statuses.
|
||||
if issue.Status == "done" || issue.Status == "cancelled" {
|
||||
return
|
||||
}
|
||||
|
||||
mentions := util.ParseMentions(comment.Content)
|
||||
for _, m := range mentions {
|
||||
if m.Type != "agent" {
|
||||
continue
|
||||
}
|
||||
// Prevent self-trigger: skip if the comment author is this agent.
|
||||
if authorType == "agent" && authorID == m.ID {
|
||||
continue
|
||||
}
|
||||
agentUUID := parseUUID(m.ID)
|
||||
// Prevent duplicate: skip if this agent is the issue's assignee
|
||||
// (already handled by the on_comment trigger above).
|
||||
if issue.AssigneeType.Valid && issue.AssigneeType.String == "agent" &&
|
||||
issue.AssigneeID.Valid && uuidToString(issue.AssigneeID) == m.ID {
|
||||
continue
|
||||
}
|
||||
// Check if the agent has on_mention trigger enabled.
|
||||
if !h.isAgentMentionTriggerEnabled(ctx, agentUUID) {
|
||||
continue
|
||||
}
|
||||
// Dedup: skip if this agent already has a pending task for this issue.
|
||||
hasPending, err := h.Queries.HasPendingTaskForIssueAndAgent(ctx, db.HasPendingTaskForIssueAndAgentParams{
|
||||
IssueID: issue.ID,
|
||||
AgentID: agentUUID,
|
||||
})
|
||||
if err != nil || hasPending {
|
||||
continue
|
||||
}
|
||||
// Resolve thread root for reply threading.
|
||||
replyTo := comment.ID
|
||||
if comment.ParentID.Valid {
|
||||
replyTo = comment.ParentID
|
||||
}
|
||||
if _, err := h.TaskService.EnqueueTaskForMention(ctx, issue, agentUUID, replyTo); err != nil {
|
||||
slog.Warn("enqueue mention agent task failed", "issue_id", uuidToString(issue.ID), "agent_id", m.ID, "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Handler) UpdateComment(w http.ResponseWriter, r *http.Request) {
|
||||
commentId := chi.URLParam(r, "commentId")
|
||||
|
||||
|
|
|
|||
|
|
@ -515,6 +515,30 @@ func (h *Handler) isAgentTriggerEnabled(ctx context.Context, issue db.Issue, tri
|
|||
return false
|
||||
}
|
||||
|
||||
// isAgentMentionTriggerEnabled checks if a specific agent has the on_mention
|
||||
// trigger enabled. Unlike isAgentTriggerEnabled, this takes an explicit agent
|
||||
// ID rather than deriving it from the issue assignee.
|
||||
func (h *Handler) isAgentMentionTriggerEnabled(ctx context.Context, agentID pgtype.UUID) bool {
|
||||
agent, err := h.Queries.GetAgent(ctx, agentID)
|
||||
if err != nil || !agent.RuntimeID.Valid {
|
||||
return false
|
||||
}
|
||||
if agent.Triggers == nil || len(agent.Triggers) == 0 {
|
||||
return true // No config = all triggers enabled by default
|
||||
}
|
||||
|
||||
var triggers []agentTriggerSnapshot
|
||||
if err := json.Unmarshal(agent.Triggers, &triggers); err != nil {
|
||||
return false
|
||||
}
|
||||
for _, trigger := range triggers {
|
||||
if trigger.Type == "on_mention" {
|
||||
return trigger.Enabled
|
||||
}
|
||||
}
|
||||
return true // on_mention not configured = enabled by default
|
||||
}
|
||||
|
||||
func (h *Handler) DeleteIssue(w http.ResponseWriter, r *http.Request) {
|
||||
id := chi.URLParam(r, "id")
|
||||
issue, ok := h.loadIssueForUser(w, r, id)
|
||||
|
|
|
|||
|
|
@ -69,6 +69,36 @@ func (s *TaskService) EnqueueTaskForIssue(ctx context.Context, issue db.Issue, t
|
|||
return task, nil
|
||||
}
|
||||
|
||||
// EnqueueTaskForMention creates a queued task for a mentioned agent on an issue.
|
||||
// Unlike EnqueueTaskForIssue, this takes an explicit agent ID rather than
|
||||
// deriving it from the issue assignee.
|
||||
func (s *TaskService) EnqueueTaskForMention(ctx context.Context, issue db.Issue, agentID pgtype.UUID, triggerCommentID pgtype.UUID) (db.AgentTaskQueue, error) {
|
||||
agent, err := s.Queries.GetAgent(ctx, agentID)
|
||||
if err != nil {
|
||||
slog.Error("mention task enqueue failed: agent not found", "issue_id", util.UUIDToString(issue.ID), "agent_id", util.UUIDToString(agentID), "error", err)
|
||||
return db.AgentTaskQueue{}, fmt.Errorf("load agent: %w", err)
|
||||
}
|
||||
if !agent.RuntimeID.Valid {
|
||||
slog.Error("mention task enqueue failed: agent has no runtime", "issue_id", util.UUIDToString(issue.ID), "agent_id", util.UUIDToString(agentID))
|
||||
return db.AgentTaskQueue{}, fmt.Errorf("agent has no runtime")
|
||||
}
|
||||
|
||||
task, err := s.Queries.CreateAgentTask(ctx, db.CreateAgentTaskParams{
|
||||
AgentID: agentID,
|
||||
RuntimeID: agent.RuntimeID,
|
||||
IssueID: issue.ID,
|
||||
Priority: priorityToInt(issue.Priority),
|
||||
TriggerCommentID: triggerCommentID,
|
||||
})
|
||||
if err != nil {
|
||||
slog.Error("mention task enqueue failed", "issue_id", util.UUIDToString(issue.ID), "agent_id", util.UUIDToString(agentID), "error", err)
|
||||
return db.AgentTaskQueue{}, fmt.Errorf("create task: %w", err)
|
||||
}
|
||||
|
||||
slog.Info("mention task enqueued", "task_id", util.UUIDToString(task.ID), "issue_id", util.UUIDToString(issue.ID), "agent_id", util.UUIDToString(agentID))
|
||||
return task, nil
|
||||
}
|
||||
|
||||
// CancelTasksForIssue cancels all active tasks for an issue.
|
||||
func (s *TaskService) CancelTasksForIssue(ctx context.Context, issueID pgtype.UUID) error {
|
||||
return s.Queries.CancelAgentTasksByIssue(ctx, issueID)
|
||||
|
|
|
|||
28
server/internal/util/mention.go
Normal file
28
server/internal/util/mention.go
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
package util
|
||||
|
||||
import "regexp"
|
||||
|
||||
// Mention represents a parsed @mention from markdown content.
|
||||
type Mention struct {
|
||||
Type string // "member" or "agent"
|
||||
ID string // user_id or agent_id
|
||||
}
|
||||
|
||||
// MentionRe matches [@Label](mention://type/id) in markdown.
|
||||
var MentionRe = regexp.MustCompile(`\[@[^\]]*\]\(mention://(member|agent)/([0-9a-fA-F-]+)\)`)
|
||||
|
||||
// ParseMentions extracts deduplicated mentions from markdown content.
|
||||
func ParseMentions(content string) []Mention {
|
||||
matches := MentionRe.FindAllStringSubmatch(content, -1)
|
||||
seen := make(map[string]bool)
|
||||
var result []Mention
|
||||
for _, m := range matches {
|
||||
key := m[1] + ":" + m[2]
|
||||
if seen[key] {
|
||||
continue
|
||||
}
|
||||
seen[key] = true
|
||||
result = append(result, Mention{Type: m[1], ID: m[2]})
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
|
@ -458,6 +458,25 @@ func (q *Queries) HasPendingTaskForIssue(ctx context.Context, issueID pgtype.UUI
|
|||
return has_pending, err
|
||||
}
|
||||
|
||||
const hasPendingTaskForIssueAndAgent = `-- name: HasPendingTaskForIssueAndAgent :one
|
||||
SELECT count(*) > 0 AS has_pending FROM agent_task_queue
|
||||
WHERE issue_id = $1 AND agent_id = $2 AND status IN ('queued', 'dispatched')
|
||||
`
|
||||
|
||||
type HasPendingTaskForIssueAndAgentParams struct {
|
||||
IssueID pgtype.UUID `json:"issue_id"`
|
||||
AgentID pgtype.UUID `json:"agent_id"`
|
||||
}
|
||||
|
||||
// Returns true if a specific agent already has a queued or dispatched task
|
||||
// for the given issue. Used by @mention trigger dedup.
|
||||
func (q *Queries) HasPendingTaskForIssueAndAgent(ctx context.Context, arg HasPendingTaskForIssueAndAgentParams) (bool, error) {
|
||||
row := q.db.QueryRow(ctx, hasPendingTaskForIssueAndAgent, arg.IssueID, arg.AgentID)
|
||||
var has_pending bool
|
||||
err := row.Scan(&has_pending)
|
||||
return has_pending, err
|
||||
}
|
||||
|
||||
const listActiveTasksByIssue = `-- name: ListActiveTasksByIssue :many
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id FROM agent_task_queue
|
||||
WHERE issue_id = $1 AND status IN ('dispatched', 'running')
|
||||
|
|
|
|||
|
|
@ -124,6 +124,12 @@ WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running');
|
|||
SELECT count(*) > 0 AS has_pending FROM agent_task_queue
|
||||
WHERE issue_id = $1 AND status IN ('queued', 'dispatched');
|
||||
|
||||
-- name: HasPendingTaskForIssueAndAgent :one
|
||||
-- Returns true if a specific agent already has a queued or dispatched task
|
||||
-- for the given issue. Used by @mention trigger dedup.
|
||||
SELECT count(*) > 0 AS has_pending FROM agent_task_queue
|
||||
WHERE issue_id = $1 AND agent_id = $2 AND status IN ('queued', 'dispatched');
|
||||
|
||||
-- name: ListPendingTasksByRuntime :many
|
||||
SELECT * FROM agent_task_queue
|
||||
WHERE runtime_id = $1 AND status IN ('queued', 'dispatched')
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue