refactor(server): extract inbox creation to bus listeners, add agent visibility filtering

- Move all CreateInboxItem calls from handlers to centralized inbox_listeners.go
- Enrich issue:updated payload with change context (assignee_changed, status_changed, prev values)
- Enrich comment:created payload with issue context (assignee info)
- Bus listeners handle: issue assign, unassign, reassign, status change, comment notification
- ListAgents filters private agents: only visible to owner_id or workspace admin
- Zero CreateInboxItem calls remain in handler package

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Naiyuan Qing 2026-03-25 11:24:45 +08:00
parent 19504a217c
commit 759dd741bd
7 changed files with 279 additions and 125 deletions

View file

@ -31,8 +31,8 @@
},
{
"task": "Backend: Register event listeners for inbox creation",
"done": false,
"scope": "Deferred: inbox creation stays in handlers/TaskService for now. Will extract to bus listeners when adding new notification triggers (inbox-notifications feature)."
"done": true,
"scope": "Inbox creation extracted to server/cmd/server/inbox_listeners.go. Listeners for issue:created, issue:updated, comment:created create inbox items and publish inbox:new. Handlers enriched with change context in payloads."
},
{
"task": "Backend: Refactor handlers to publish events instead of direct broadcast/inbox",

View file

@ -33,8 +33,8 @@
},
{
"task": "Backend: Agent visibility filtering",
"done": false,
"scope": "Deferred: all agents are workspace-visible for MVP. Private agent filtering not needed yet."
"done": true,
"scope": "ListAgents filters private agents: only visible to agent owner_id or workspace owner/admin. Regular members only see workspace-visible agents."
},
{
"task": "Frontend: Settings page use shadcn components consistently",

View file

@ -0,0 +1,237 @@
package main
import (
"context"
"log"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/handler"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// registerInboxListeners wires up event bus listeners that create inbox
// notifications. This replaces the inline CreateInboxItem calls that were
// previously scattered across issue and comment handlers.
func registerInboxListeners(bus *events.Bus, queries *db.Queries) {
// issue:created — notify assignee about new assignment
bus.Subscribe(protocol.EventIssueCreated, func(e events.Event) {
payload, ok := e.Payload.(map[string]any)
if !ok {
return
}
issue, ok := payload["issue"].(handler.IssueResponse)
if !ok {
return
}
if issue.AssigneeType == nil || issue.AssigneeID == nil {
return
}
item, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{
WorkspaceID: parseUUID(issue.WorkspaceID),
RecipientType: *issue.AssigneeType,
RecipientID: parseUUID(*issue.AssigneeID),
Type: "issue_assigned",
Severity: "action_required",
IssueID: parseUUID(issue.ID),
Title: "New issue assigned: " + issue.Title,
Body: util.PtrToText(issue.Description),
})
if err != nil {
log.Printf("[inbox-listener] issue:created inbox error: %v", err)
return
}
bus.Publish(events.Event{
Type: protocol.EventInboxNew,
WorkspaceID: e.WorkspaceID,
ActorType: e.ActorType,
ActorID: e.ActorID,
Payload: map[string]any{"item": inboxItemToResponse(item)},
})
})
// issue:updated — notify on assignee change and status change
bus.Subscribe(protocol.EventIssueUpdated, func(e events.Event) {
payload, ok := e.Payload.(map[string]any)
if !ok {
return
}
issue, ok := payload["issue"].(handler.IssueResponse)
if !ok {
return
}
assigneeChanged, _ := payload["assignee_changed"].(bool)
statusChanged, _ := payload["status_changed"].(bool)
prevAssigneeType, _ := payload["prev_assignee_type"].(*string)
prevAssigneeID, _ := payload["prev_assignee_id"].(*string)
creatorType, _ := payload["creator_type"].(string)
creatorID, _ := payload["creator_id"].(string)
actorID := e.ActorID // the user who made the change
if assigneeChanged {
// Notify old assignee about unassignment
if prevAssigneeType != nil && prevAssigneeID != nil &&
*prevAssigneeType == "member" && *prevAssigneeID != actorID {
oldItem, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{
WorkspaceID: parseUUID(e.WorkspaceID),
RecipientType: "member",
RecipientID: parseUUID(*prevAssigneeID),
Type: "status_change",
Severity: "info",
IssueID: parseUUID(issue.ID),
Title: "Unassigned from: " + issue.Title,
})
if err == nil {
bus.Publish(events.Event{
Type: protocol.EventInboxNew,
WorkspaceID: e.WorkspaceID,
ActorType: e.ActorType,
ActorID: actorID,
Payload: map[string]any{"item": inboxItemToResponse(oldItem)},
})
}
}
// Notify new assignee about assignment
if issue.AssigneeType != nil && issue.AssigneeID != nil {
newItem, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{
WorkspaceID: parseUUID(e.WorkspaceID),
RecipientType: *issue.AssigneeType,
RecipientID: parseUUID(*issue.AssigneeID),
Type: "issue_assigned",
Severity: "action_required",
IssueID: parseUUID(issue.ID),
Title: "Assigned to you: " + issue.Title,
})
if err == nil {
bus.Publish(events.Event{
Type: protocol.EventInboxNew,
WorkspaceID: e.WorkspaceID,
ActorType: e.ActorType,
ActorID: actorID,
Payload: map[string]any{"item": inboxItemToResponse(newItem)},
})
}
}
}
if statusChanged {
// Notify assignee about status change
if issue.AssigneeType != nil && issue.AssigneeID != nil {
aItem, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{
WorkspaceID: parseUUID(e.WorkspaceID),
RecipientType: *issue.AssigneeType,
RecipientID: parseUUID(*issue.AssigneeID),
Type: "status_change",
Severity: "info",
IssueID: parseUUID(issue.ID),
Title: issue.Title + " moved to " + issue.Status,
})
if err == nil {
bus.Publish(events.Event{
Type: protocol.EventInboxNew,
WorkspaceID: e.WorkspaceID,
ActorType: e.ActorType,
ActorID: actorID,
Payload: map[string]any{"item": inboxItemToResponse(aItem)},
})
}
}
// Notify creator about status change (if creator is member and != the person making change)
if creatorType == "member" && creatorID != actorID {
// Don't double-notify if creator is also the assignee
isAlsoAssignee := prevAssigneeID != nil && *prevAssigneeID == creatorID
if !isAlsoAssignee {
cItem, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{
WorkspaceID: parseUUID(e.WorkspaceID),
RecipientType: "member",
RecipientID: parseUUID(creatorID),
Type: "status_change",
Severity: "info",
IssueID: parseUUID(issue.ID),
Title: "Status changed: " + issue.Title,
})
if err == nil {
bus.Publish(events.Event{
Type: protocol.EventInboxNew,
WorkspaceID: e.WorkspaceID,
ActorType: e.ActorType,
ActorID: actorID,
Payload: map[string]any{"item": inboxItemToResponse(cItem)},
})
}
}
}
}
})
// comment:created — notify issue assignee about new comment
bus.Subscribe(protocol.EventCommentCreated, func(e events.Event) {
payload, ok := e.Payload.(map[string]any)
if !ok {
return
}
comment, ok := payload["comment"].(handler.CommentResponse)
if !ok {
return
}
issueTitle, _ := payload["issue_title"].(string)
issueAssigneeType, _ := payload["issue_assignee_type"].(*string)
issueAssigneeID, _ := payload["issue_assignee_id"].(*string)
// Only notify if assignee is a member and is not the commenter
if issueAssigneeType == nil || issueAssigneeID == nil {
return
}
if *issueAssigneeType != "member" || *issueAssigneeID == e.ActorID {
return
}
item, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{
WorkspaceID: parseUUID(e.WorkspaceID),
RecipientType: "member",
RecipientID: parseUUID(*issueAssigneeID),
Type: "mentioned",
Severity: "info",
IssueID: parseUUID(comment.IssueID),
Title: "New comment on: " + issueTitle,
Body: util.StrToText(comment.Content),
})
if err != nil {
log.Printf("[inbox-listener] comment:created inbox error: %v", err)
return
}
bus.Publish(events.Event{
Type: protocol.EventInboxNew,
WorkspaceID: e.WorkspaceID,
ActorType: e.ActorType,
ActorID: e.ActorID,
Payload: map[string]any{"item": inboxItemToResponse(item)},
})
})
}
// inboxItemToResponse converts a db.InboxItem into a map suitable for
// JSON-serializable event payloads (mirrors handler.inboxToResponse fields).
func inboxItemToResponse(item db.InboxItem) map[string]any {
return map[string]any{
"id": util.UUIDToString(item.ID),
"workspace_id": util.UUIDToString(item.WorkspaceID),
"recipient_type": item.RecipientType,
"recipient_id": util.UUIDToString(item.RecipientID),
"type": item.Type,
"severity": item.Severity,
"issue_id": util.UUIDToPtr(item.IssueID),
"title": item.Title,
"body": util.TextToPtr(item.Body),
"read": item.Read,
"archived": item.Archived,
"created_at": util.TimestampToString(item.CreatedAt),
}
}

View file

@ -13,6 +13,7 @@ import (
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/realtime"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
func main() {
@ -44,6 +45,9 @@ func main() {
go hub.Run()
registerListeners(bus, hub)
queries := db.New(pool)
registerInboxListeners(bus, queries)
r := NewRouter(pool, hub, bus)
srv := &http.Server{

View file

@ -123,7 +123,8 @@ func taskToResponse(t db.AgentTaskQueue) AgentTaskResponse {
func (h *Handler) ListAgents(w http.ResponseWriter, r *http.Request) {
workspaceID := resolveWorkspaceID(r)
if _, ok := h.requireWorkspaceMember(w, r, workspaceID, "workspace not found"); !ok {
member, ok := h.requireWorkspaceMember(w, r, workspaceID, "workspace not found")
if !ok {
return
}
@ -133,12 +134,22 @@ func (h *Handler) ListAgents(w http.ResponseWriter, r *http.Request) {
return
}
resp := make([]AgentResponse, len(agents))
for i, a := range agents {
resp[i] = agentToResponse(a)
userID := requestUserID(r)
isAdmin := roleAllowed(member.Role, "owner", "admin")
// Filter private agents: only visible to owner_id or workspace admin
var visible []AgentResponse
for _, a := range agents {
if a.Visibility == "private" && !isAdmin && uuidToString(a.OwnerID) != userID {
continue
}
visible = append(visible, agentToResponse(a))
}
if visible == nil {
visible = []AgentResponse{}
}
writeJSON(w, http.StatusOK, resp)
writeJSON(w, http.StatusOK, visible)
}
func (h *Handler) GetAgent(w http.ResponseWriter, r *http.Request) {

View file

@ -98,27 +98,12 @@ func (h *Handler) CreateComment(w http.ResponseWriter, r *http.Request) {
}
resp := commentToResponse(comment)
h.publish(protocol.EventCommentCreated, uuidToString(issue.WorkspaceID), "member", userID, map[string]any{"comment": resp})
// Notify issue assignee about new comment (if assignee is a member and != commenter)
if issue.AssigneeType.Valid && issue.AssigneeID.Valid &&
issue.AssigneeType.String == "member" && uuidToString(issue.AssigneeID) != userID {
body := req.Content
inboxItem, inboxErr := h.Queries.CreateInboxItem(r.Context(), db.CreateInboxItemParams{
WorkspaceID: issue.WorkspaceID,
RecipientType: "member",
RecipientID: issue.AssigneeID,
Type: "mentioned",
Severity: "info",
IssueID: issue.ID,
Title: "New comment on: " + issue.Title,
Body: ptrToText(&body),
})
if inboxErr == nil {
h.publish(protocol.EventInboxNew, uuidToString(issue.WorkspaceID), "member", userID,
map[string]any{"item": inboxToResponse(inboxItem)})
}
}
h.publish(protocol.EventCommentCreated, uuidToString(issue.WorkspaceID), "member", userID, map[string]any{
"comment": resp,
"issue_title": issue.Title,
"issue_assignee_type": textToPtr(issue.AssigneeType),
"issue_assignee_id": uuidToPtr(issue.AssigneeID),
})
writeJSON(w, http.StatusCreated, resp)
}

View file

@ -236,23 +236,8 @@ func (h *Handler) CreateIssue(w http.ResponseWriter, r *http.Request) {
resp := issueToResponse(issue)
h.publish(protocol.EventIssueCreated, workspaceID, "member", creatorID, map[string]any{"issue": resp})
// Create inbox notification for assignee
// Only ready issues in todo are enqueued for agents.
if issue.AssigneeType.Valid && issue.AssigneeID.Valid {
inboxItem, err := h.Queries.CreateInboxItem(r.Context(), db.CreateInboxItemParams{
WorkspaceID: issue.WorkspaceID,
RecipientType: issue.AssigneeType.String,
RecipientID: issue.AssigneeID,
Type: "issue_assigned",
Severity: "action_required",
IssueID: issue.ID,
Title: "New issue assigned: " + issue.Title,
Body: ptrToText(req.Description),
})
if err == nil {
h.publish(protocol.EventInboxNew, workspaceID, "member", creatorID, map[string]any{"item": inboxToResponse(inboxItem)})
}
// Only ready issues in todo are enqueued for agents.
if h.shouldEnqueueAgentTask(r.Context(), issue) {
h.TaskService.EnqueueTaskForIssue(r.Context(), issue)
}
@ -368,12 +353,22 @@ func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) {
}
resp := issueToResponse(issue)
h.publish(protocol.EventIssueUpdated, workspaceID, "member", userID, map[string]any{"issue": resp})
assigneeChanged := (req.AssigneeType != nil || req.AssigneeID != nil) &&
(prevIssue.AssigneeType.String != issue.AssigneeType.String || uuidToString(prevIssue.AssigneeID) != uuidToString(issue.AssigneeID))
statusChanged := req.Status != nil && prevIssue.Status != issue.Status
h.publish(protocol.EventIssueUpdated, workspaceID, "member", userID, map[string]any{
"issue": resp,
"assignee_changed": assigneeChanged,
"status_changed": statusChanged,
"prev_assignee_type": textToPtr(prevIssue.AssigneeType),
"prev_assignee_id": uuidToPtr(prevIssue.AssigneeID),
"prev_status": prevIssue.Status,
"creator_type": prevIssue.CreatorType,
"creator_id": uuidToString(prevIssue.CreatorID),
})
// If assignee or readiness status changed, reconcile the task queue.
if assigneeChanged || statusChanged {
h.TaskService.CancelTasksForIssue(r.Context(), issue.ID)
@ -383,84 +378,6 @@ func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) {
}
}
// If assignee changed, create a notification for the new assignee.
if assigneeChanged {
// Notify old assignee about unassignment
if prevIssue.AssigneeID.Valid && prevIssue.AssigneeType.Valid &&
prevIssue.AssigneeType.String == "member" && uuidToString(prevIssue.AssigneeID) != userID {
oldInbox, oErr := h.Queries.CreateInboxItem(r.Context(), db.CreateInboxItemParams{
WorkspaceID: prevIssue.WorkspaceID,
RecipientType: "member",
RecipientID: prevIssue.AssigneeID,
Type: "status_change",
Severity: "info",
IssueID: prevIssue.ID,
Title: "Unassigned from: " + issue.Title,
Body: ptrToText(nil),
})
if oErr == nil {
h.publish(protocol.EventInboxNew, workspaceID, "member", userID,
map[string]any{"item": inboxToResponse(oldInbox)})
}
}
// Create inbox notification for new assignee
if issue.AssigneeType.Valid && issue.AssigneeID.Valid {
inboxItem, err := h.Queries.CreateInboxItem(r.Context(), db.CreateInboxItemParams{
WorkspaceID: issue.WorkspaceID,
RecipientType: issue.AssigneeType.String,
RecipientID: issue.AssigneeID,
Type: "issue_assigned",
Severity: "action_required",
IssueID: issue.ID,
Title: "Assigned to you: " + issue.Title,
})
if err == nil {
h.publish(protocol.EventInboxNew, workspaceID, "member", userID, map[string]any{"item": inboxToResponse(inboxItem)})
}
}
}
// If status changed, create a notification
if req.Status != nil {
if issue.AssigneeType.Valid && issue.AssigneeID.Valid {
inboxItem, err := h.Queries.CreateInboxItem(r.Context(), db.CreateInboxItemParams{
WorkspaceID: issue.WorkspaceID,
RecipientType: issue.AssigneeType.String,
RecipientID: issue.AssigneeID,
Type: "status_change",
Severity: "info",
IssueID: issue.ID,
Title: issue.Title + " moved to " + *req.Status,
})
if err == nil {
h.publish(protocol.EventInboxNew, workspaceID, "member", userID, map[string]any{"item": inboxToResponse(inboxItem)})
}
}
// Notify creator about status change (if creator is member and != the person making change)
if prevIssue.CreatorType == "member" && uuidToString(prevIssue.CreatorID) != userID {
// Don't double-notify if creator is also the assignee
isAlsoAssignee := prevIssue.AssigneeID.Valid && uuidToString(prevIssue.AssigneeID) == uuidToString(prevIssue.CreatorID)
if !isAlsoAssignee {
creatorInbox, cErr := h.Queries.CreateInboxItem(r.Context(), db.CreateInboxItemParams{
WorkspaceID: prevIssue.WorkspaceID,
RecipientType: "member",
RecipientID: prevIssue.CreatorID,
Type: "status_change",
Severity: "info",
IssueID: prevIssue.ID,
Title: "Status changed: " + issue.Title,
Body: ptrToText(nil),
})
if cErr == nil {
h.publish(protocol.EventInboxNew, workspaceID, "member", userID,
map[string]any{"item": inboxToResponse(creatorInbox)})
}
}
}
}
writeJSON(w, http.StatusOK, resp)
}