diff --git a/_features/infra-event-bus-ws.json b/_features/infra-event-bus-ws.json index 54650cd0..9383e8cb 100644 --- a/_features/infra-event-bus-ws.json +++ b/_features/infra-event-bus-ws.json @@ -31,8 +31,8 @@ }, { "task": "Backend: Register event listeners for inbox creation", - "done": false, - "scope": "Deferred: inbox creation stays in handlers/TaskService for now. Will extract to bus listeners when adding new notification triggers (inbox-notifications feature)." + "done": true, + "scope": "Inbox creation extracted to server/cmd/server/inbox_listeners.go. Listeners for issue:created, issue:updated, comment:created create inbox items and publish inbox:new. Handlers enriched with change context in payloads." }, { "task": "Backend: Refactor handlers to publish events instead of direct broadcast/inbox", diff --git a/_features/workspace-permissions.json b/_features/workspace-permissions.json index 037d2781..bf35a8af 100644 --- a/_features/workspace-permissions.json +++ b/_features/workspace-permissions.json @@ -33,8 +33,8 @@ }, { "task": "Backend: Agent visibility filtering", - "done": false, - "scope": "Deferred: all agents are workspace-visible for MVP. Private agent filtering not needed yet." + "done": true, + "scope": "ListAgents filters private agents: only visible to agent owner_id or workspace owner/admin. Regular members only see workspace-visible agents." }, { "task": "Frontend: Settings page use shadcn components consistently", diff --git a/server/cmd/server/inbox_listeners.go b/server/cmd/server/inbox_listeners.go new file mode 100644 index 00000000..26767ec3 --- /dev/null +++ b/server/cmd/server/inbox_listeners.go @@ -0,0 +1,237 @@ +package main + +import ( + "context" + "log" + + "github.com/multica-ai/multica/server/internal/events" + "github.com/multica-ai/multica/server/internal/handler" + "github.com/multica-ai/multica/server/internal/util" + db "github.com/multica-ai/multica/server/pkg/db/generated" + "github.com/multica-ai/multica/server/pkg/protocol" +) + +// registerInboxListeners wires up event bus listeners that create inbox +// notifications. This replaces the inline CreateInboxItem calls that were +// previously scattered across issue and comment handlers. +func registerInboxListeners(bus *events.Bus, queries *db.Queries) { + // issue:created — notify assignee about new assignment + bus.Subscribe(protocol.EventIssueCreated, func(e events.Event) { + payload, ok := e.Payload.(map[string]any) + if !ok { + return + } + issue, ok := payload["issue"].(handler.IssueResponse) + if !ok { + return + } + if issue.AssigneeType == nil || issue.AssigneeID == nil { + return + } + + item, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{ + WorkspaceID: parseUUID(issue.WorkspaceID), + RecipientType: *issue.AssigneeType, + RecipientID: parseUUID(*issue.AssigneeID), + Type: "issue_assigned", + Severity: "action_required", + IssueID: parseUUID(issue.ID), + Title: "New issue assigned: " + issue.Title, + Body: util.PtrToText(issue.Description), + }) + if err != nil { + log.Printf("[inbox-listener] issue:created inbox error: %v", err) + return + } + + bus.Publish(events.Event{ + Type: protocol.EventInboxNew, + WorkspaceID: e.WorkspaceID, + ActorType: e.ActorType, + ActorID: e.ActorID, + Payload: map[string]any{"item": inboxItemToResponse(item)}, + }) + }) + + // issue:updated — notify on assignee change and status change + bus.Subscribe(protocol.EventIssueUpdated, func(e events.Event) { + payload, ok := e.Payload.(map[string]any) + if !ok { + return + } + issue, ok := payload["issue"].(handler.IssueResponse) + if !ok { + return + } + assigneeChanged, _ := payload["assignee_changed"].(bool) + statusChanged, _ := payload["status_changed"].(bool) + prevAssigneeType, _ := payload["prev_assignee_type"].(*string) + prevAssigneeID, _ := payload["prev_assignee_id"].(*string) + creatorType, _ := payload["creator_type"].(string) + creatorID, _ := payload["creator_id"].(string) + + actorID := e.ActorID // the user who made the change + + if assigneeChanged { + // Notify old assignee about unassignment + if prevAssigneeType != nil && prevAssigneeID != nil && + *prevAssigneeType == "member" && *prevAssigneeID != actorID { + oldItem, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{ + WorkspaceID: parseUUID(e.WorkspaceID), + RecipientType: "member", + RecipientID: parseUUID(*prevAssigneeID), + Type: "status_change", + Severity: "info", + IssueID: parseUUID(issue.ID), + Title: "Unassigned from: " + issue.Title, + }) + if err == nil { + bus.Publish(events.Event{ + Type: protocol.EventInboxNew, + WorkspaceID: e.WorkspaceID, + ActorType: e.ActorType, + ActorID: actorID, + Payload: map[string]any{"item": inboxItemToResponse(oldItem)}, + }) + } + } + + // Notify new assignee about assignment + if issue.AssigneeType != nil && issue.AssigneeID != nil { + newItem, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{ + WorkspaceID: parseUUID(e.WorkspaceID), + RecipientType: *issue.AssigneeType, + RecipientID: parseUUID(*issue.AssigneeID), + Type: "issue_assigned", + Severity: "action_required", + IssueID: parseUUID(issue.ID), + Title: "Assigned to you: " + issue.Title, + }) + if err == nil { + bus.Publish(events.Event{ + Type: protocol.EventInboxNew, + WorkspaceID: e.WorkspaceID, + ActorType: e.ActorType, + ActorID: actorID, + Payload: map[string]any{"item": inboxItemToResponse(newItem)}, + }) + } + } + } + + if statusChanged { + // Notify assignee about status change + if issue.AssigneeType != nil && issue.AssigneeID != nil { + aItem, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{ + WorkspaceID: parseUUID(e.WorkspaceID), + RecipientType: *issue.AssigneeType, + RecipientID: parseUUID(*issue.AssigneeID), + Type: "status_change", + Severity: "info", + IssueID: parseUUID(issue.ID), + Title: issue.Title + " moved to " + issue.Status, + }) + if err == nil { + bus.Publish(events.Event{ + Type: protocol.EventInboxNew, + WorkspaceID: e.WorkspaceID, + ActorType: e.ActorType, + ActorID: actorID, + Payload: map[string]any{"item": inboxItemToResponse(aItem)}, + }) + } + } + + // Notify creator about status change (if creator is member and != the person making change) + if creatorType == "member" && creatorID != actorID { + // Don't double-notify if creator is also the assignee + isAlsoAssignee := prevAssigneeID != nil && *prevAssigneeID == creatorID + if !isAlsoAssignee { + cItem, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{ + WorkspaceID: parseUUID(e.WorkspaceID), + RecipientType: "member", + RecipientID: parseUUID(creatorID), + Type: "status_change", + Severity: "info", + IssueID: parseUUID(issue.ID), + Title: "Status changed: " + issue.Title, + }) + if err == nil { + bus.Publish(events.Event{ + Type: protocol.EventInboxNew, + WorkspaceID: e.WorkspaceID, + ActorType: e.ActorType, + ActorID: actorID, + Payload: map[string]any{"item": inboxItemToResponse(cItem)}, + }) + } + } + } + } + }) + + // comment:created — notify issue assignee about new comment + bus.Subscribe(protocol.EventCommentCreated, func(e events.Event) { + payload, ok := e.Payload.(map[string]any) + if !ok { + return + } + comment, ok := payload["comment"].(handler.CommentResponse) + if !ok { + return + } + issueTitle, _ := payload["issue_title"].(string) + issueAssigneeType, _ := payload["issue_assignee_type"].(*string) + issueAssigneeID, _ := payload["issue_assignee_id"].(*string) + + // Only notify if assignee is a member and is not the commenter + if issueAssigneeType == nil || issueAssigneeID == nil { + return + } + if *issueAssigneeType != "member" || *issueAssigneeID == e.ActorID { + return + } + + item, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{ + WorkspaceID: parseUUID(e.WorkspaceID), + RecipientType: "member", + RecipientID: parseUUID(*issueAssigneeID), + Type: "mentioned", + Severity: "info", + IssueID: parseUUID(comment.IssueID), + Title: "New comment on: " + issueTitle, + Body: util.StrToText(comment.Content), + }) + if err != nil { + log.Printf("[inbox-listener] comment:created inbox error: %v", err) + return + } + + bus.Publish(events.Event{ + Type: protocol.EventInboxNew, + WorkspaceID: e.WorkspaceID, + ActorType: e.ActorType, + ActorID: e.ActorID, + Payload: map[string]any{"item": inboxItemToResponse(item)}, + }) + }) +} + +// inboxItemToResponse converts a db.InboxItem into a map suitable for +// JSON-serializable event payloads (mirrors handler.inboxToResponse fields). +func inboxItemToResponse(item db.InboxItem) map[string]any { + return map[string]any{ + "id": util.UUIDToString(item.ID), + "workspace_id": util.UUIDToString(item.WorkspaceID), + "recipient_type": item.RecipientType, + "recipient_id": util.UUIDToString(item.RecipientID), + "type": item.Type, + "severity": item.Severity, + "issue_id": util.UUIDToPtr(item.IssueID), + "title": item.Title, + "body": util.TextToPtr(item.Body), + "read": item.Read, + "archived": item.Archived, + "created_at": util.TimestampToString(item.CreatedAt), + } +} diff --git a/server/cmd/server/main.go b/server/cmd/server/main.go index 3b11ec3c..d47290b1 100644 --- a/server/cmd/server/main.go +++ b/server/cmd/server/main.go @@ -13,6 +13,7 @@ import ( "github.com/multica-ai/multica/server/internal/events" "github.com/multica-ai/multica/server/internal/realtime" + db "github.com/multica-ai/multica/server/pkg/db/generated" ) func main() { @@ -44,6 +45,9 @@ func main() { go hub.Run() registerListeners(bus, hub) + queries := db.New(pool) + registerInboxListeners(bus, queries) + r := NewRouter(pool, hub, bus) srv := &http.Server{ diff --git a/server/internal/handler/agent.go b/server/internal/handler/agent.go index 46861c0f..8af6176f 100644 --- a/server/internal/handler/agent.go +++ b/server/internal/handler/agent.go @@ -123,7 +123,8 @@ func taskToResponse(t db.AgentTaskQueue) AgentTaskResponse { func (h *Handler) ListAgents(w http.ResponseWriter, r *http.Request) { workspaceID := resolveWorkspaceID(r) - if _, ok := h.requireWorkspaceMember(w, r, workspaceID, "workspace not found"); !ok { + member, ok := h.requireWorkspaceMember(w, r, workspaceID, "workspace not found") + if !ok { return } @@ -133,12 +134,22 @@ func (h *Handler) ListAgents(w http.ResponseWriter, r *http.Request) { return } - resp := make([]AgentResponse, len(agents)) - for i, a := range agents { - resp[i] = agentToResponse(a) + userID := requestUserID(r) + isAdmin := roleAllowed(member.Role, "owner", "admin") + + // Filter private agents: only visible to owner_id or workspace admin + var visible []AgentResponse + for _, a := range agents { + if a.Visibility == "private" && !isAdmin && uuidToString(a.OwnerID) != userID { + continue + } + visible = append(visible, agentToResponse(a)) + } + if visible == nil { + visible = []AgentResponse{} } - writeJSON(w, http.StatusOK, resp) + writeJSON(w, http.StatusOK, visible) } func (h *Handler) GetAgent(w http.ResponseWriter, r *http.Request) { diff --git a/server/internal/handler/comment.go b/server/internal/handler/comment.go index 0508c01d..801637d5 100644 --- a/server/internal/handler/comment.go +++ b/server/internal/handler/comment.go @@ -98,27 +98,12 @@ func (h *Handler) CreateComment(w http.ResponseWriter, r *http.Request) { } resp := commentToResponse(comment) - h.publish(protocol.EventCommentCreated, uuidToString(issue.WorkspaceID), "member", userID, map[string]any{"comment": resp}) - - // Notify issue assignee about new comment (if assignee is a member and != commenter) - if issue.AssigneeType.Valid && issue.AssigneeID.Valid && - issue.AssigneeType.String == "member" && uuidToString(issue.AssigneeID) != userID { - body := req.Content - inboxItem, inboxErr := h.Queries.CreateInboxItem(r.Context(), db.CreateInboxItemParams{ - WorkspaceID: issue.WorkspaceID, - RecipientType: "member", - RecipientID: issue.AssigneeID, - Type: "mentioned", - Severity: "info", - IssueID: issue.ID, - Title: "New comment on: " + issue.Title, - Body: ptrToText(&body), - }) - if inboxErr == nil { - h.publish(protocol.EventInboxNew, uuidToString(issue.WorkspaceID), "member", userID, - map[string]any{"item": inboxToResponse(inboxItem)}) - } - } + h.publish(protocol.EventCommentCreated, uuidToString(issue.WorkspaceID), "member", userID, map[string]any{ + "comment": resp, + "issue_title": issue.Title, + "issue_assignee_type": textToPtr(issue.AssigneeType), + "issue_assignee_id": uuidToPtr(issue.AssigneeID), + }) writeJSON(w, http.StatusCreated, resp) } diff --git a/server/internal/handler/issue.go b/server/internal/handler/issue.go index dc3aaf73..db824163 100644 --- a/server/internal/handler/issue.go +++ b/server/internal/handler/issue.go @@ -236,23 +236,8 @@ func (h *Handler) CreateIssue(w http.ResponseWriter, r *http.Request) { resp := issueToResponse(issue) h.publish(protocol.EventIssueCreated, workspaceID, "member", creatorID, map[string]any{"issue": resp}) - // Create inbox notification for assignee + // Only ready issues in todo are enqueued for agents. if issue.AssigneeType.Valid && issue.AssigneeID.Valid { - inboxItem, err := h.Queries.CreateInboxItem(r.Context(), db.CreateInboxItemParams{ - WorkspaceID: issue.WorkspaceID, - RecipientType: issue.AssigneeType.String, - RecipientID: issue.AssigneeID, - Type: "issue_assigned", - Severity: "action_required", - IssueID: issue.ID, - Title: "New issue assigned: " + issue.Title, - Body: ptrToText(req.Description), - }) - if err == nil { - h.publish(protocol.EventInboxNew, workspaceID, "member", creatorID, map[string]any{"item": inboxToResponse(inboxItem)}) - } - - // Only ready issues in todo are enqueued for agents. if h.shouldEnqueueAgentTask(r.Context(), issue) { h.TaskService.EnqueueTaskForIssue(r.Context(), issue) } @@ -368,12 +353,22 @@ func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) { } resp := issueToResponse(issue) - h.publish(protocol.EventIssueUpdated, workspaceID, "member", userID, map[string]any{"issue": resp}) assigneeChanged := (req.AssigneeType != nil || req.AssigneeID != nil) && (prevIssue.AssigneeType.String != issue.AssigneeType.String || uuidToString(prevIssue.AssigneeID) != uuidToString(issue.AssigneeID)) statusChanged := req.Status != nil && prevIssue.Status != issue.Status + h.publish(protocol.EventIssueUpdated, workspaceID, "member", userID, map[string]any{ + "issue": resp, + "assignee_changed": assigneeChanged, + "status_changed": statusChanged, + "prev_assignee_type": textToPtr(prevIssue.AssigneeType), + "prev_assignee_id": uuidToPtr(prevIssue.AssigneeID), + "prev_status": prevIssue.Status, + "creator_type": prevIssue.CreatorType, + "creator_id": uuidToString(prevIssue.CreatorID), + }) + // If assignee or readiness status changed, reconcile the task queue. if assigneeChanged || statusChanged { h.TaskService.CancelTasksForIssue(r.Context(), issue.ID) @@ -383,84 +378,6 @@ func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) { } } - // If assignee changed, create a notification for the new assignee. - if assigneeChanged { - // Notify old assignee about unassignment - if prevIssue.AssigneeID.Valid && prevIssue.AssigneeType.Valid && - prevIssue.AssigneeType.String == "member" && uuidToString(prevIssue.AssigneeID) != userID { - oldInbox, oErr := h.Queries.CreateInboxItem(r.Context(), db.CreateInboxItemParams{ - WorkspaceID: prevIssue.WorkspaceID, - RecipientType: "member", - RecipientID: prevIssue.AssigneeID, - Type: "status_change", - Severity: "info", - IssueID: prevIssue.ID, - Title: "Unassigned from: " + issue.Title, - Body: ptrToText(nil), - }) - if oErr == nil { - h.publish(protocol.EventInboxNew, workspaceID, "member", userID, - map[string]any{"item": inboxToResponse(oldInbox)}) - } - } - - // Create inbox notification for new assignee - if issue.AssigneeType.Valid && issue.AssigneeID.Valid { - inboxItem, err := h.Queries.CreateInboxItem(r.Context(), db.CreateInboxItemParams{ - WorkspaceID: issue.WorkspaceID, - RecipientType: issue.AssigneeType.String, - RecipientID: issue.AssigneeID, - Type: "issue_assigned", - Severity: "action_required", - IssueID: issue.ID, - Title: "Assigned to you: " + issue.Title, - }) - if err == nil { - h.publish(protocol.EventInboxNew, workspaceID, "member", userID, map[string]any{"item": inboxToResponse(inboxItem)}) - } - } - } - - // If status changed, create a notification - if req.Status != nil { - if issue.AssigneeType.Valid && issue.AssigneeID.Valid { - inboxItem, err := h.Queries.CreateInboxItem(r.Context(), db.CreateInboxItemParams{ - WorkspaceID: issue.WorkspaceID, - RecipientType: issue.AssigneeType.String, - RecipientID: issue.AssigneeID, - Type: "status_change", - Severity: "info", - IssueID: issue.ID, - Title: issue.Title + " moved to " + *req.Status, - }) - if err == nil { - h.publish(protocol.EventInboxNew, workspaceID, "member", userID, map[string]any{"item": inboxToResponse(inboxItem)}) - } - } - - // Notify creator about status change (if creator is member and != the person making change) - if prevIssue.CreatorType == "member" && uuidToString(prevIssue.CreatorID) != userID { - // Don't double-notify if creator is also the assignee - isAlsoAssignee := prevIssue.AssigneeID.Valid && uuidToString(prevIssue.AssigneeID) == uuidToString(prevIssue.CreatorID) - if !isAlsoAssignee { - creatorInbox, cErr := h.Queries.CreateInboxItem(r.Context(), db.CreateInboxItemParams{ - WorkspaceID: prevIssue.WorkspaceID, - RecipientType: "member", - RecipientID: prevIssue.CreatorID, - Type: "status_change", - Severity: "info", - IssueID: prevIssue.ID, - Title: "Status changed: " + issue.Title, - Body: ptrToText(nil), - }) - if cErr == nil { - h.publish(protocol.EventInboxNew, workspaceID, "member", userID, - map[string]any{"item": inboxToResponse(creatorInbox)}) - } - } - } - } - writeJSON(w, http.StatusOK, resp) }