From bfe9498def067a0ba425cedff744240ae3427a4d Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Sat, 28 Mar 2026 19:33:20 +0800 Subject: [PATCH] feat(notifications): replace hardcoded inbox notifications with subscriber-driven model Replace inbox_listeners.go with a subscriber-driven notification system: - Add issue_subscriber table with auto-subscribe on create/assign/comment - New subscriber_listeners.go: maintains subscriber data on domain events - New notification_listeners.go: notifySubscribers (fanout to all subscribers minus actor) and notifyDirect (targeted, punches through unsubscribe) - Subscriber API: list/subscribe/unsubscribe endpoints - Frontend: subscribers section in issue detail sidebar with real-time sync - Frontend: inbox notification grouping by (issue_id, type, actor_id) - Remove createInboxForIssueCreator from task.go (unified through event bus) - 21 new Go tests, all passing Co-Authored-By: Claude Opus 4.6 (1M context) --- apps/web/app/(dashboard)/inbox/page.tsx | 30 +- .../app/(dashboard)/issues/[id]/page.test.tsx | 3 + .../issues/components/issue-detail.tsx | 105 +++- apps/web/shared/api/client.ts | 14 + apps/web/shared/types/events.ts | 17 +- apps/web/shared/types/index.ts | 1 + apps/web/shared/types/subscriber.ts | 7 + server/cmd/server/inbox_listeners.go | 377 ------------ server/cmd/server/listeners.go | 2 + server/cmd/server/main.go | 3 +- server/cmd/server/notification_listeners.go | 458 +++++++++++++++ .../cmd/server/notification_listeners_test.go | 552 ++++++++++++++++++ server/cmd/server/router.go | 3 + server/cmd/server/subscriber_listeners.go | 116 ++++ .../cmd/server/subscriber_listeners_test.go | 377 ++++++++++++ server/internal/handler/subscriber.go | 110 ++++ server/internal/handler/subscriber_test.go | 208 +++++++ server/internal/service/task.go | 59 +- .../migrations/015_issue_subscriber.down.sql | 1 + server/migrations/015_issue_subscriber.up.sql | 11 + .../016_backfill_subscribers.down.sql | 1 + .../016_backfill_subscribers.up.sql | 12 + server/pkg/db/generated/models.go | 8 + server/pkg/db/generated/subscriber.sql.go | 103 ++++ server/pkg/db/queries/subscriber.sql | 19 + server/pkg/protocol/events.go | 4 + 26 files changed, 2144 insertions(+), 457 deletions(-) create mode 100644 apps/web/shared/types/subscriber.ts delete mode 100644 server/cmd/server/inbox_listeners.go create mode 100644 server/cmd/server/notification_listeners.go create mode 100644 server/cmd/server/notification_listeners_test.go create mode 100644 server/cmd/server/subscriber_listeners.go create mode 100644 server/cmd/server/subscriber_listeners_test.go create mode 100644 server/internal/handler/subscriber.go create mode 100644 server/internal/handler/subscriber_test.go create mode 100644 server/migrations/015_issue_subscriber.down.sql create mode 100644 server/migrations/015_issue_subscriber.up.sql create mode 100644 server/migrations/016_backfill_subscribers.down.sql create mode 100644 server/migrations/016_backfill_subscribers.up.sql create mode 100644 server/pkg/db/generated/subscriber.sql.go create mode 100644 server/pkg/db/queries/subscriber.sql diff --git a/apps/web/app/(dashboard)/inbox/page.tsx b/apps/web/app/(dashboard)/inbox/page.tsx index fc28a08f..41adf23f 100644 --- a/apps/web/app/(dashboard)/inbox/page.tsx +++ b/apps/web/app/(dashboard)/inbox/page.tsx @@ -131,15 +131,31 @@ export default function InboxPage() { id: "multica_inbox_layout", }); - // Sort: severity first, then newest first + // Group by (issue_id, type, actor_id) and take the latest from each group const items = useMemo(() => { - return [...storeItems] - .filter((i) => !i.archived) - .sort( - (a, b) => - severityOrder[a.severity] - severityOrder[b.severity] || - new Date(b.created_at).getTime() - new Date(a.created_at).getTime() + const active = storeItems.filter((i) => !i.archived); + const groups = new Map(); + active.forEach((item) => { + const key = `${item.issue_id ?? "none"}|${item.type}|${item.actor_id ?? "none"}`; + const group = groups.get(key) ?? []; + group.push(item); + groups.set(key, group); + }); + + const merged: InboxItem[] = []; + groups.forEach((group) => { + const sorted = group.sort( + (a, b) => new Date(b.created_at).getTime() - new Date(a.created_at).getTime() ); + const latest = sorted[0]; + if (latest) merged.push(latest); + }); + + return merged.sort( + (a, b) => + severityOrder[a.severity] - severityOrder[b.severity] || + new Date(b.created_at).getTime() - new Date(a.created_at).getTime() + ); }, [storeItems]); const selected = items.find((i) => i.id === selectedId) ?? null; diff --git a/apps/web/app/(dashboard)/issues/[id]/page.test.tsx b/apps/web/app/(dashboard)/issues/[id]/page.test.tsx index 36a2dba3..af2b57fd 100644 --- a/apps/web/app/(dashboard)/issues/[id]/page.test.tsx +++ b/apps/web/app/(dashboard)/issues/[id]/page.test.tsx @@ -124,6 +124,9 @@ vi.mock("@/shared/api", () => ({ deleteComment: (...args: any[]) => mockDeleteComment(...args), deleteIssue: (...args: any[]) => mockDeleteIssue(...args), updateIssue: (...args: any[]) => mockUpdateIssue(...args), + listIssueSubscribers: vi.fn().mockResolvedValue([]), + subscribeToIssue: vi.fn().mockResolvedValue(undefined), + unsubscribeFromIssue: vi.fn().mockResolvedValue(undefined), }, })); diff --git a/apps/web/features/issues/components/issue-detail.tsx b/apps/web/features/issues/components/issue-detail.tsx index a205d5c0..5bb8b606 100644 --- a/apps/web/features/issues/components/issue-detail.tsx +++ b/apps/web/features/issues/components/issue-detail.tsx @@ -8,7 +8,6 @@ import { ArrowUp, Bot, Calendar, - ChevronDown, ChevronLeft, ChevronRight, Link2, @@ -55,7 +54,7 @@ import { TooltipContent, } from "@/components/ui/tooltip"; import { ActorAvatar } from "@/components/common/actor-avatar"; -import type { Issue, Comment, UpdateIssueRequest, IssueStatus, IssuePriority } from "@/shared/types"; +import type { Issue, Comment, IssueSubscriber, UpdateIssueRequest, IssueStatus, IssuePriority } from "@/shared/types"; import { ALL_STATUSES, STATUS_CONFIG, PRIORITY_ORDER, PRIORITY_CONFIG } from "@/features/issues/config"; import { StatusIcon, PriorityIcon, DueDatePicker } from "@/features/issues/components"; import { api } from "@/shared/api"; @@ -63,7 +62,7 @@ import { useAuthStore } from "@/features/auth"; import { useWorkspaceStore, useActorName } from "@/features/workspace"; import { useWSEvent } from "@/features/realtime"; import { useIssueStore } from "@/features/issues"; -import type { CommentCreatedPayload, CommentUpdatedPayload, CommentDeletedPayload } from "@/shared/types"; +import type { CommentCreatedPayload, CommentUpdatedPayload, CommentDeletedPayload, SubscriberAddedPayload, SubscriberRemovedPayload } from "@/shared/types"; // --------------------------------------------------------------------------- // Helpers @@ -144,6 +143,7 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) { const [sidebarOpen, setSidebarOpen] = useState(true); const [issue, setIssue] = useState(null); const [comments, setComments] = useState([]); + const [subscribers, setSubscribers] = useState([]); const [loading, setLoading] = useState(true); const [commentEmpty, setCommentEmpty] = useState(true); const commentEditorRef = useRef(null); @@ -154,6 +154,8 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) { const [editingTitle, setEditingTitle] = useState(false); const [titleDraft, setTitleDraft] = useState(""); const [deleteDialogOpen, setDeleteDialogOpen] = useState(false); + const [propertiesOpen, setPropertiesOpen] = useState(true); + const [detailsOpen, setDetailsOpen] = useState(true); // Watch the global issue store for real-time updates from other users/agents const storeIssue = useIssueStore((s) => s.issues.find((i) => i.id === id)); @@ -167,11 +169,13 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) { useEffect(() => { setIssue(null); setComments([]); + setSubscribers([]); setLoading(true); - Promise.all([api.getIssue(id), api.listComments(id)]) - .then(([iss, cmts]) => { + Promise.all([api.getIssue(id), api.listComments(id), api.listIssueSubscribers(id)]) + .then(([iss, cmts, subs]) => { setIssue(iss); setComments(cmts); + setSubscribers(subs); }) .catch(console.error) .finally(() => setLoading(false)); @@ -257,6 +261,29 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) { } }; + // Subscriber state + const isSubscribed = subscribers.some( + (s) => s.user_type === "member" && s.user_id === user?.id + ); + + const handleToggleSubscribe = async () => { + if (!user || !issue) return; + try { + if (isSubscribed) { + await api.unsubscribeFromIssue(id); + setSubscribers((prev) => prev.filter((s) => s.user_id !== user.id)); + } else { + await api.subscribeToIssue(id); + setSubscribers((prev) => [ + ...prev, + { issue_id: id, user_type: "member" as const, user_id: user.id, reason: "manual" as const, created_at: new Date().toISOString() }, + ]); + } + } catch { + // silently fail + } + }; + // Real-time comment updates useWSEvent( "comment:created", @@ -292,6 +319,34 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) { }, [id]), ); + // Real-time subscriber updates + useWSEvent( + "subscriber:added", + useCallback((payload: unknown) => { + const p = payload as SubscriberAddedPayload; + if (p.issue_id !== id) return; + setSubscribers((prev) => { + if (prev.some((s) => s.user_id === p.user_id)) return prev; + return [...prev, { + issue_id: p.issue_id, + user_type: p.user_type as "member" | "agent", + user_id: p.user_id, + reason: p.reason as IssueSubscriber["reason"], + created_at: new Date().toISOString(), + }]; + }); + }, [id]), + ); + + useWSEvent( + "subscriber:removed", + useCallback((payload: unknown) => { + const p = payload as SubscriberRemovedPayload; + if (p.issue_id !== id) return; + setSubscribers((prev) => prev.filter((s) => s.user_id !== p.user_id)); + }, [id]), + ); + if (loading) { return (
@@ -742,14 +797,14 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) { {/* Properties section */}
-
+ {propertiesOpen &&
{/* Status */} @@ -855,20 +910,20 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) { onUpdate={handleUpdateField} /> -
+
}
{/* Details section */}
-
+ {detailsOpen &&
{shortDate(issue.updated_at)} +
} +
+ + {/* Subscribers section */} +
+

Subscribers

+
+ {subscribers.map((sub) => ( +
+ + {getActorName(sub.user_type, sub.user_id)} + ({sub.reason}) +
+ ))}
+
diff --git a/apps/web/shared/api/client.ts b/apps/web/shared/api/client.ts index e28cf9d5..8420177a 100644 --- a/apps/web/shared/api/client.ts +++ b/apps/web/shared/api/client.ts @@ -15,6 +15,7 @@ import type { DaemonPairingSession, ApproveDaemonPairingSessionRequest, InboxItem, + IssueSubscriber, Comment, Workspace, WorkspaceRepo, @@ -200,6 +201,19 @@ export class ApiClient { await this.fetch(`/api/comments/${commentId}`, { method: "DELETE" }); } + // Subscribers + async listIssueSubscribers(issueId: string): Promise { + return this.fetch(`/api/issues/${issueId}/subscribers`); + } + + async subscribeToIssue(issueId: string): Promise { + await this.fetch(`/api/issues/${issueId}/subscribe`, { method: "POST" }); + } + + async unsubscribeFromIssue(issueId: string): Promise { + await this.fetch(`/api/issues/${issueId}/unsubscribe`, { method: "POST" }); + } + // Agents async listAgents(params?: { workspace_id?: string }): Promise { const search = new URLSearchParams(); diff --git a/apps/web/shared/types/events.ts b/apps/web/shared/types/events.ts index 51d1b8ad..a0626e2d 100644 --- a/apps/web/shared/types/events.ts +++ b/apps/web/shared/types/events.ts @@ -33,7 +33,9 @@ export type WSEventType = | "daemon:register" | "skill:created" | "skill:updated" - | "skill:deleted"; + | "skill:deleted" + | "subscriber:added" + | "subscriber:removed"; export interface WSMessage { type: WSEventType; @@ -124,3 +126,16 @@ export interface MemberRemovedPayload { user_id: string; workspace_id: string; } + +export interface SubscriberAddedPayload { + issue_id: string; + user_type: string; + user_id: string; + reason: string; +} + +export interface SubscriberRemovedPayload { + issue_id: string; + user_type: string; + user_id: string; +} diff --git a/apps/web/shared/types/index.ts b/apps/web/shared/types/index.ts index c3b1d72a..61e97fb4 100644 --- a/apps/web/shared/types/index.ts +++ b/apps/web/shared/types/index.ts @@ -24,6 +24,7 @@ export type { export type { Workspace, WorkspaceRepo, Member, MemberRole, User, MemberWithUser } from "./workspace"; export type { InboxItem, InboxSeverity, InboxItemType } from "./inbox"; export type { Comment, CommentType, CommentAuthorType } from "./comment"; +export type { IssueSubscriber } from "./subscriber"; export type { DaemonPairingSession, DaemonPairingSessionStatus, ApproveDaemonPairingSessionRequest } from "./daemon"; export type * from "./events"; export type * from "./api"; diff --git a/apps/web/shared/types/subscriber.ts b/apps/web/shared/types/subscriber.ts new file mode 100644 index 00000000..09ab40ea --- /dev/null +++ b/apps/web/shared/types/subscriber.ts @@ -0,0 +1,7 @@ +export interface IssueSubscriber { + issue_id: string; + user_type: "member" | "agent"; + user_id: string; + reason: "creator" | "assignee" | "commenter" | "mentioned" | "manual"; + created_at: string; +} diff --git a/server/cmd/server/inbox_listeners.go b/server/cmd/server/inbox_listeners.go deleted file mode 100644 index 7c8a2dce..00000000 --- a/server/cmd/server/inbox_listeners.go +++ /dev/null @@ -1,377 +0,0 @@ -package main - -import ( - "context" - "log/slog" - "regexp" - - "github.com/multica-ai/multica/server/internal/events" - "github.com/multica-ai/multica/server/internal/handler" - "github.com/multica-ai/multica/server/internal/util" - db "github.com/multica-ai/multica/server/pkg/db/generated" - "github.com/multica-ai/multica/server/pkg/protocol" -) - -// mention represents a parsed @mention from markdown content. -type mention struct { - Type string // "member" or "agent" - ID string // user_id or agent_id -} - -// mentionRe matches [@Label](mention://type/id) in markdown. -var mentionRe = regexp.MustCompile(`\[@[^\]]*\]\(mention://(member|agent)/([0-9a-fA-F-]+)\)`) - -// parseMentions extracts mentions from markdown content. -func parseMentions(content string) []mention { - matches := mentionRe.FindAllStringSubmatch(content, -1) - seen := make(map[string]bool) - var result []mention - for _, m := range matches { - key := m[1] + ":" + m[2] - if seen[key] { - continue - } - seen[key] = true - result = append(result, mention{Type: m[1], ID: m[2]}) - } - return result -} - -// notifyMentionedMembers creates inbox items for each @mentioned member, -// excluding the actor and any IDs in the skip set. -func notifyMentionedMembers( - bus *events.Bus, - queries *db.Queries, - e events.Event, - mentions []mention, - issueID string, - issueTitle string, - issueStatus string, - title string, - skip map[string]bool, -) { - for _, m := range mentions { - if m.Type != "member" { - continue - } - if m.ID == e.ActorID || skip[m.ID] { - continue - } - item, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{ - WorkspaceID: parseUUID(e.WorkspaceID), - RecipientType: "member", - RecipientID: parseUUID(m.ID), - Type: "mentioned", - Severity: "info", - IssueID: parseUUID(issueID), - Title: title, - ActorType: util.StrToText(e.ActorType), - ActorID: parseUUID(e.ActorID), - }) - if err != nil { - slog.Error("mention inbox creation failed", "mentioned_id", m.ID, "error", err) - continue - } - resp := inboxItemToResponse(item) - resp["issue_status"] = issueStatus - bus.Publish(events.Event{ - Type: protocol.EventInboxNew, - WorkspaceID: e.WorkspaceID, - ActorType: e.ActorType, - ActorID: e.ActorID, - Payload: map[string]any{"item": resp}, - }) - } -} - -// registerInboxListeners wires up event bus listeners that create inbox -// notifications. This replaces the inline CreateInboxItem calls that were -// previously scattered across issue and comment handlers. -func registerInboxListeners(bus *events.Bus, queries *db.Queries) { - // issue:created — notify assignee about new assignment + @mentions in description - bus.Subscribe(protocol.EventIssueCreated, func(e events.Event) { - payload, ok := e.Payload.(map[string]any) - if !ok { - return - } - issue, ok := payload["issue"].(handler.IssueResponse) - if !ok { - return - } - - // Track who already got notified to avoid duplicates - skip := map[string]bool{e.ActorID: true} - - // Notify assignee - if issue.AssigneeType != nil && issue.AssigneeID != nil { - skip[*issue.AssigneeID] = true - item, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{ - WorkspaceID: parseUUID(issue.WorkspaceID), - RecipientType: *issue.AssigneeType, - RecipientID: parseUUID(*issue.AssigneeID), - Type: "issue_assigned", - Severity: "action_required", - IssueID: parseUUID(issue.ID), - Title: "New issue assigned: " + issue.Title, - Body: util.PtrToText(issue.Description), - ActorType: util.StrToText(e.ActorType), - ActorID: parseUUID(e.ActorID), - }) - if err != nil { - slog.Error("inbox item creation failed", "event", "issue:created", "error", err) - } else { - resp := inboxItemToResponse(item) - resp["issue_status"] = issue.Status - bus.Publish(events.Event{ - Type: protocol.EventInboxNew, - WorkspaceID: e.WorkspaceID, - ActorType: e.ActorType, - ActorID: e.ActorID, - Payload: map[string]any{"item": resp}, - }) - } - } - - // Notify @mentions in description - if issue.Description != nil && *issue.Description != "" { - mentions := parseMentions(*issue.Description) - notifyMentionedMembers(bus, queries, e, mentions, issue.ID, issue.Title, issue.Status, - "Mentioned in: "+issue.Title, skip) - } - }) - - // issue:updated — notify on assignee change and status change - bus.Subscribe(protocol.EventIssueUpdated, func(e events.Event) { - payload, ok := e.Payload.(map[string]any) - if !ok { - return - } - issue, ok := payload["issue"].(handler.IssueResponse) - if !ok { - return - } - assigneeChanged, _ := payload["assignee_changed"].(bool) - statusChanged, _ := payload["status_changed"].(bool) - descriptionChanged, _ := payload["description_changed"].(bool) - prevAssigneeType, _ := payload["prev_assignee_type"].(*string) - prevAssigneeID, _ := payload["prev_assignee_id"].(*string) - prevDescription, _ := payload["prev_description"].(*string) - creatorType, _ := payload["creator_type"].(string) - creatorID, _ := payload["creator_id"].(string) - - actorID := e.ActorID // the user who made the change - - if assigneeChanged { - // Notify old assignee about unassignment - if prevAssigneeType != nil && prevAssigneeID != nil && - *prevAssigneeType == "member" && *prevAssigneeID != actorID { - oldItem, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{ - WorkspaceID: parseUUID(e.WorkspaceID), - RecipientType: "member", - RecipientID: parseUUID(*prevAssigneeID), - Type: "status_change", - Severity: "info", - IssueID: parseUUID(issue.ID), - Title: "Unassigned from: " + issue.Title, - ActorType: util.StrToText(e.ActorType), - ActorID: parseUUID(e.ActorID), - }) - if err == nil { - oldResp := inboxItemToResponse(oldItem) - oldResp["issue_status"] = issue.Status - bus.Publish(events.Event{ - Type: protocol.EventInboxNew, - WorkspaceID: e.WorkspaceID, - ActorType: e.ActorType, - ActorID: actorID, - Payload: map[string]any{"item": oldResp}, - }) - } - } - - // Notify new assignee about assignment - if issue.AssigneeType != nil && issue.AssigneeID != nil { - newItem, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{ - WorkspaceID: parseUUID(e.WorkspaceID), - RecipientType: *issue.AssigneeType, - RecipientID: parseUUID(*issue.AssigneeID), - Type: "issue_assigned", - Severity: "action_required", - IssueID: parseUUID(issue.ID), - Title: "Assigned to you: " + issue.Title, - ActorType: util.StrToText(e.ActorType), - ActorID: parseUUID(e.ActorID), - }) - if err == nil { - newResp := inboxItemToResponse(newItem) - newResp["issue_status"] = issue.Status - bus.Publish(events.Event{ - Type: protocol.EventInboxNew, - WorkspaceID: e.WorkspaceID, - ActorType: e.ActorType, - ActorID: actorID, - Payload: map[string]any{"item": newResp}, - }) - } - } - } - - if statusChanged { - // Notify assignee about status change - if issue.AssigneeType != nil && issue.AssigneeID != nil { - aItem, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{ - WorkspaceID: parseUUID(e.WorkspaceID), - RecipientType: *issue.AssigneeType, - RecipientID: parseUUID(*issue.AssigneeID), - Type: "status_change", - Severity: "info", - IssueID: parseUUID(issue.ID), - Title: issue.Title + " moved to " + issue.Status, - ActorType: util.StrToText(e.ActorType), - ActorID: parseUUID(e.ActorID), - }) - if err == nil { - aResp := inboxItemToResponse(aItem) - aResp["issue_status"] = issue.Status - bus.Publish(events.Event{ - Type: protocol.EventInboxNew, - WorkspaceID: e.WorkspaceID, - ActorType: e.ActorType, - ActorID: actorID, - Payload: map[string]any{"item": aResp}, - }) - } - } - - // Notify creator about status change (if creator is member and != the person making change) - if creatorType == "member" && creatorID != actorID { - // Don't double-notify if creator is also the assignee - isAlsoAssignee := prevAssigneeID != nil && *prevAssigneeID == creatorID - if !isAlsoAssignee { - cItem, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{ - WorkspaceID: parseUUID(e.WorkspaceID), - RecipientType: "member", - RecipientID: parseUUID(creatorID), - Type: "status_change", - Severity: "info", - IssueID: parseUUID(issue.ID), - Title: "Status changed: " + issue.Title, - ActorType: util.StrToText(e.ActorType), - ActorID: parseUUID(e.ActorID), - }) - if err == nil { - cResp := inboxItemToResponse(cItem) - cResp["issue_status"] = issue.Status - bus.Publish(events.Event{ - Type: protocol.EventInboxNew, - WorkspaceID: e.WorkspaceID, - ActorType: e.ActorType, - ActorID: actorID, - Payload: map[string]any{"item": cResp}, - }) - } - } - } - } - - // Notify NEW @mentions in description (only mentions that weren't in previous description) - if descriptionChanged && issue.Description != nil { - newMentions := parseMentions(*issue.Description) - if len(newMentions) > 0 { - // Build set of previously mentioned IDs - prevMentioned := map[string]bool{} - if prevDescription != nil { - for _, m := range parseMentions(*prevDescription) { - prevMentioned[m.Type+":"+m.ID] = true - } - } - // Filter to only new mentions - var added []mention - for _, m := range newMentions { - if !prevMentioned[m.Type+":"+m.ID] { - added = append(added, m) - } - } - skip := map[string]bool{actorID: true} - notifyMentionedMembers(bus, queries, e, added, issue.ID, issue.Title, issue.Status, - "Mentioned in: "+issue.Title, skip) - } - } - }) - - // comment:created — notify issue assignee + @mentions in comment - bus.Subscribe(protocol.EventCommentCreated, func(e events.Event) { - payload, ok := e.Payload.(map[string]any) - if !ok { - return - } - comment, ok := payload["comment"].(handler.CommentResponse) - if !ok { - return - } - issueTitle, _ := payload["issue_title"].(string) - issueAssigneeType, _ := payload["issue_assignee_type"].(*string) - issueAssigneeID, _ := payload["issue_assignee_id"].(*string) - issueStatus, _ := payload["issue_status"].(string) - - // Track who already got notified - skip := map[string]bool{e.ActorID: true} - - // Notify assignee (if member and not the commenter) - if issueAssigneeType != nil && issueAssigneeID != nil && - *issueAssigneeType == "member" && *issueAssigneeID != e.ActorID { - skip[*issueAssigneeID] = true - item, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{ - WorkspaceID: parseUUID(e.WorkspaceID), - RecipientType: "member", - RecipientID: parseUUID(*issueAssigneeID), - Type: "mentioned", - Severity: "info", - IssueID: parseUUID(comment.IssueID), - Title: "New comment on: " + issueTitle, - Body: util.StrToText(comment.Content), - ActorType: util.StrToText(e.ActorType), - ActorID: parseUUID(e.ActorID), - }) - if err != nil { - slog.Error("inbox item creation failed", "event", "comment:created", "error", err) - } else { - commentResp := inboxItemToResponse(item) - commentResp["issue_status"] = issueStatus - bus.Publish(events.Event{ - Type: protocol.EventInboxNew, - WorkspaceID: e.WorkspaceID, - ActorType: e.ActorType, - ActorID: e.ActorID, - Payload: map[string]any{"item": commentResp}, - }) - } - } - - // Notify @mentions in comment content - mentions := parseMentions(comment.Content) - notifyMentionedMembers(bus, queries, e, mentions, comment.IssueID, issueTitle, issueStatus, - "Mentioned in comment: "+issueTitle, skip) - }) -} - -// inboxItemToResponse converts a db.InboxItem into a map suitable for -// JSON-serializable event payloads (mirrors handler.inboxToResponse fields). -func inboxItemToResponse(item db.InboxItem) map[string]any { - return map[string]any{ - "id": util.UUIDToString(item.ID), - "workspace_id": util.UUIDToString(item.WorkspaceID), - "recipient_type": item.RecipientType, - "recipient_id": util.UUIDToString(item.RecipientID), - "type": item.Type, - "severity": item.Severity, - "issue_id": util.UUIDToPtr(item.IssueID), - "title": item.Title, - "body": util.TextToPtr(item.Body), - "read": item.Read, - "archived": item.Archived, - "created_at": util.TimestampToString(item.CreatedAt), - "actor_type": util.TextToPtr(item.ActorType), - "actor_id": util.UUIDToPtr(item.ActorID), - } -} diff --git a/server/cmd/server/listeners.go b/server/cmd/server/listeners.go index 3b5bc316..cddc60b5 100644 --- a/server/cmd/server/listeners.go +++ b/server/cmd/server/listeners.go @@ -35,6 +35,8 @@ func registerListeners(bus *events.Bus, hub *realtime.Hub) { protocol.EventMemberAdded, protocol.EventMemberUpdated, protocol.EventMemberRemoved, + protocol.EventSubscriberAdded, + protocol.EventSubscriberRemoved, } for _, et := range allEvents { diff --git a/server/cmd/server/main.go b/server/cmd/server/main.go index 5ed6d901..74fead71 100644 --- a/server/cmd/server/main.go +++ b/server/cmd/server/main.go @@ -51,7 +51,8 @@ func main() { registerListeners(bus, hub) queries := db.New(pool) - registerInboxListeners(bus, queries) + registerSubscriberListeners(bus, queries) + registerNotificationListeners(bus, queries) r := NewRouter(pool, hub, bus) diff --git a/server/cmd/server/notification_listeners.go b/server/cmd/server/notification_listeners.go new file mode 100644 index 00000000..59f01f9d --- /dev/null +++ b/server/cmd/server/notification_listeners.go @@ -0,0 +1,458 @@ +package main + +import ( + "context" + "log/slog" + "regexp" + + "github.com/multica-ai/multica/server/internal/events" + "github.com/multica-ai/multica/server/internal/handler" + "github.com/multica-ai/multica/server/internal/util" + db "github.com/multica-ai/multica/server/pkg/db/generated" + "github.com/multica-ai/multica/server/pkg/protocol" +) + +// mention represents a parsed @mention from markdown content. +type mention struct { + Type string // "member" or "agent" + ID string // user_id or agent_id +} + +// mentionRe matches [@Label](mention://type/id) in markdown. +var mentionRe = regexp.MustCompile(`\[@[^\]]*\]\(mention://(member|agent)/([0-9a-fA-F-]+)\)`) + +// parseMentions extracts mentions from markdown content. +func parseMentions(content string) []mention { + matches := mentionRe.FindAllStringSubmatch(content, -1) + seen := make(map[string]bool) + var result []mention + for _, m := range matches { + key := m[1] + ":" + m[2] + if seen[key] { + continue + } + seen[key] = true + result = append(result, mention{Type: m[1], ID: m[2]}) + } + return result +} + +// notifySubscribers queries the subscriber table for an issue, excludes the +// actor and any extra IDs, and creates inbox items for each remaining member +// subscriber. Publishes an inbox:new event for each notification. +func notifySubscribers( + ctx context.Context, + queries *db.Queries, + bus *events.Bus, + issueID string, + workspaceID string, + e events.Event, + exclude map[string]bool, + notifType string, + severity string, + title string, + body string, +) { + subs, err := queries.ListIssueSubscribers(ctx, parseUUID(issueID)) + if err != nil { + slog.Error("failed to list subscribers for notification", + "issue_id", issueID, "error", err) + return + } + + for _, sub := range subs { + // Only notify member-type subscribers (not agents) + if sub.UserType != "member" { + continue + } + + subID := util.UUIDToString(sub.UserID) + + // Skip the actor + if subID == e.ActorID { + continue + } + + // Skip any extra excluded IDs + if exclude[subID] { + continue + } + + item, err := queries.CreateInboxItem(ctx, db.CreateInboxItemParams{ + WorkspaceID: parseUUID(workspaceID), + RecipientType: "member", + RecipientID: sub.UserID, + Type: notifType, + Severity: severity, + IssueID: parseUUID(issueID), + Title: title, + Body: util.StrToText(body), + ActorType: util.StrToText(e.ActorType), + ActorID: parseUUID(e.ActorID), + }) + if err != nil { + slog.Error("subscriber notification creation failed", + "subscriber_id", subID, "type", notifType, "error", err) + continue + } + + resp := inboxItemToResponse(item) + bus.Publish(events.Event{ + Type: protocol.EventInboxNew, + WorkspaceID: workspaceID, + ActorType: e.ActorType, + ActorID: e.ActorID, + Payload: map[string]any{"item": resp}, + }) + } +} + +// notifyDirect creates an inbox item for a specific recipient. Skips if the +// recipient is the actor. Publishes an inbox:new event on success. +func notifyDirect( + ctx context.Context, + queries *db.Queries, + bus *events.Bus, + recipientType string, + recipientID string, + workspaceID string, + e events.Event, + issueID string, + notifType string, + severity string, + title string, + body string, +) { + // Skip if recipient is the actor + if recipientID == e.ActorID { + return + } + + item, err := queries.CreateInboxItem(ctx, db.CreateInboxItemParams{ + WorkspaceID: parseUUID(workspaceID), + RecipientType: recipientType, + RecipientID: parseUUID(recipientID), + Type: notifType, + Severity: severity, + IssueID: parseUUID(issueID), + Title: title, + Body: util.StrToText(body), + ActorType: util.StrToText(e.ActorType), + ActorID: parseUUID(e.ActorID), + }) + if err != nil { + slog.Error("direct notification creation failed", + "recipient_id", recipientID, "type", notifType, "error", err) + return + } + + resp := inboxItemToResponse(item) + bus.Publish(events.Event{ + Type: protocol.EventInboxNew, + WorkspaceID: workspaceID, + ActorType: e.ActorType, + ActorID: e.ActorID, + Payload: map[string]any{"item": resp}, + }) +} + +// notifyMentionedMembers creates inbox items for each @mentioned member, +// excluding the actor and any IDs in the skip set. +func notifyMentionedMembers( + bus *events.Bus, + queries *db.Queries, + e events.Event, + mentions []mention, + issueID string, + issueTitle string, + issueStatus string, + title string, + skip map[string]bool, +) { + for _, m := range mentions { + if m.Type != "member" { + continue + } + if m.ID == e.ActorID || skip[m.ID] { + continue + } + item, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{ + WorkspaceID: parseUUID(e.WorkspaceID), + RecipientType: "member", + RecipientID: parseUUID(m.ID), + Type: "mentioned", + Severity: "info", + IssueID: parseUUID(issueID), + Title: title, + ActorType: util.StrToText(e.ActorType), + ActorID: parseUUID(e.ActorID), + }) + if err != nil { + slog.Error("mention inbox creation failed", "mentioned_id", m.ID, "error", err) + continue + } + resp := inboxItemToResponse(item) + resp["issue_status"] = issueStatus + bus.Publish(events.Event{ + Type: protocol.EventInboxNew, + WorkspaceID: e.WorkspaceID, + ActorType: e.ActorType, + ActorID: e.ActorID, + Payload: map[string]any{"item": resp}, + }) + } +} + +// registerNotificationListeners wires up event bus listeners that create inbox +// notifications using the subscriber table. This replaces the old hardcoded +// notification logic from inbox_listeners.go. +func registerNotificationListeners(bus *events.Bus, queries *db.Queries) { + ctx := context.Background() + + // issue:created — Direct notification to assignee if assignee != actor + bus.Subscribe(protocol.EventIssueCreated, func(e events.Event) { + payload, ok := e.Payload.(map[string]any) + if !ok { + return + } + issue, ok := payload["issue"].(handler.IssueResponse) + if !ok { + return + } + + // Track who already got notified to avoid duplicates + skip := map[string]bool{e.ActorID: true} + + // Direct notification to assignee + if issue.AssigneeType != nil && issue.AssigneeID != nil { + skip[*issue.AssigneeID] = true + notifyDirect(ctx, queries, bus, + *issue.AssigneeType, *issue.AssigneeID, + issue.WorkspaceID, e, issue.ID, + "issue_assigned", "action_required", + "New issue assigned: "+issue.Title, + "", + ) + } + + // Notify @mentions in description + if issue.Description != nil && *issue.Description != "" { + mentions := parseMentions(*issue.Description) + notifyMentionedMembers(bus, queries, e, mentions, issue.ID, issue.Title, issue.Status, + "Mentioned in: "+issue.Title, skip) + } + }) + + // issue:updated — handle assignee changes and status changes + bus.Subscribe(protocol.EventIssueUpdated, func(e events.Event) { + payload, ok := e.Payload.(map[string]any) + if !ok { + return + } + issue, ok := payload["issue"].(handler.IssueResponse) + if !ok { + return + } + assigneeChanged, _ := payload["assignee_changed"].(bool) + statusChanged, _ := payload["status_changed"].(bool) + descriptionChanged, _ := payload["description_changed"].(bool) + prevAssigneeType, _ := payload["prev_assignee_type"].(*string) + prevAssigneeID, _ := payload["prev_assignee_id"].(*string) + prevDescription, _ := payload["prev_description"].(*string) + + if assigneeChanged { + // Direct: notify new assignee about assignment + if issue.AssigneeType != nil && issue.AssigneeID != nil { + notifyDirect(ctx, queries, bus, + *issue.AssigneeType, *issue.AssigneeID, + e.WorkspaceID, e, issue.ID, + "issue_assigned", "action_required", + "Assigned to you: "+issue.Title, + "", + ) + } + + // Direct: notify old assignee about unassignment + if prevAssigneeType != nil && prevAssigneeID != nil && *prevAssigneeType == "member" { + notifyDirect(ctx, queries, bus, + "member", *prevAssigneeID, + e.WorkspaceID, e, issue.ID, + "unassigned", "info", + "Unassigned from: "+issue.Title, + "", + ) + } + + // Subscriber: notify remaining subscribers about assignee change, + // excluding actor, old assignee, and new assignee + exclude := map[string]bool{} + if prevAssigneeID != nil { + exclude[*prevAssigneeID] = true + } + if issue.AssigneeID != nil { + exclude[*issue.AssigneeID] = true + } + notifySubscribers(ctx, queries, bus, issue.ID, e.WorkspaceID, e, + exclude, "assignee_changed", "info", + "Assignee changed: "+issue.Title, "") + } + + if statusChanged { + // Subscriber: notify all subscribers except actor + notifySubscribers(ctx, queries, bus, issue.ID, e.WorkspaceID, e, + nil, "status_changed", "info", + issue.Title+" moved to "+issue.Status, "") + } + + // Notify NEW @mentions in description + if descriptionChanged && issue.Description != nil { + newMentions := parseMentions(*issue.Description) + if len(newMentions) > 0 { + prevMentioned := map[string]bool{} + if prevDescription != nil { + for _, m := range parseMentions(*prevDescription) { + prevMentioned[m.Type+":"+m.ID] = true + } + } + var added []mention + for _, m := range newMentions { + if !prevMentioned[m.Type+":"+m.ID] { + added = append(added, m) + } + } + skip := map[string]bool{e.ActorID: true} + notifyMentionedMembers(bus, queries, e, added, issue.ID, issue.Title, issue.Status, + "Mentioned in: "+issue.Title, skip) + } + } + }) + + // comment:created — notify all subscribers except the commenter + bus.Subscribe(protocol.EventCommentCreated, func(e events.Event) { + payload, ok := e.Payload.(map[string]any) + if !ok { + return + } + + // The comment payload can come as handler.CommentResponse from the + // HTTP handler, or as map[string]any from the agent comment path in + // task.go. Handle both. + var issueID, commentContent string + switch c := payload["comment"].(type) { + case handler.CommentResponse: + issueID = c.IssueID + commentContent = c.Content + case map[string]any: + issueID, _ = c["issue_id"].(string) + commentContent, _ = c["content"].(string) + default: + return + } + + issueTitle, _ := payload["issue_title"].(string) + + notifySubscribers(ctx, queries, bus, issueID, e.WorkspaceID, e, + nil, "new_comment", "info", + "New comment on: "+issueTitle, commentContent) + + // Notify @mentions in comment content + mentions := parseMentions(commentContent) + if len(mentions) > 0 { + issueStatus, _ := payload["issue_status"].(string) + skip := map[string]bool{e.ActorID: true} + notifyMentionedMembers(bus, queries, e, mentions, issueID, issueTitle, issueStatus, + "Mentioned in comment: "+issueTitle, skip) + } + }) + + // task:completed — notify all subscribers except the agent + bus.Subscribe(protocol.EventTaskCompleted, func(e events.Event) { + payload, ok := e.Payload.(map[string]any) + if !ok { + return + } + agentID, _ := payload["agent_id"].(string) + issueID, _ := payload["issue_id"].(string) + if issueID == "" { + return + } + + // Look up issue to get the title + issue, err := queries.GetIssue(ctx, parseUUID(issueID)) + if err != nil { + slog.Error("task:completed notification: failed to get issue", "issue_id", issueID, "error", err) + return + } + + // Use the agent ID as an exclusion (since the agent did the work) + exclude := map[string]bool{} + if agentID != "" { + exclude[agentID] = true + } + + notifySubscribers(ctx, queries, bus, issueID, e.WorkspaceID, + events.Event{ + Type: e.Type, + WorkspaceID: e.WorkspaceID, + ActorType: "agent", + ActorID: agentID, + }, + exclude, "task_completed", "attention", + "Task completed: "+issue.Title, "") + }) + + // task:failed — notify all subscribers except the agent + bus.Subscribe(protocol.EventTaskFailed, func(e events.Event) { + payload, ok := e.Payload.(map[string]any) + if !ok { + return + } + agentID, _ := payload["agent_id"].(string) + issueID, _ := payload["issue_id"].(string) + if issueID == "" { + return + } + + issue, err := queries.GetIssue(ctx, parseUUID(issueID)) + if err != nil { + slog.Error("task:failed notification: failed to get issue", "issue_id", issueID, "error", err) + return + } + + exclude := map[string]bool{} + if agentID != "" { + exclude[agentID] = true + } + + notifySubscribers(ctx, queries, bus, issueID, e.WorkspaceID, + events.Event{ + Type: e.Type, + WorkspaceID: e.WorkspaceID, + ActorType: "agent", + ActorID: agentID, + }, + exclude, "task_failed", "action_required", + "Task failed: "+issue.Title, "") + }) +} + +// inboxItemToResponse converts a db.InboxItem into a map suitable for +// JSON-serializable event payloads (mirrors handler.inboxToResponse fields). +func inboxItemToResponse(item db.InboxItem) map[string]any { + return map[string]any{ + "id": util.UUIDToString(item.ID), + "workspace_id": util.UUIDToString(item.WorkspaceID), + "recipient_type": item.RecipientType, + "recipient_id": util.UUIDToString(item.RecipientID), + "type": item.Type, + "severity": item.Severity, + "issue_id": util.UUIDToPtr(item.IssueID), + "title": item.Title, + "body": util.TextToPtr(item.Body), + "read": item.Read, + "archived": item.Archived, + "created_at": util.TimestampToString(item.CreatedAt), + "actor_type": util.TextToPtr(item.ActorType), + "actor_id": util.UUIDToPtr(item.ActorID), + } +} diff --git a/server/cmd/server/notification_listeners_test.go b/server/cmd/server/notification_listeners_test.go new file mode 100644 index 00000000..aa2a2a4c --- /dev/null +++ b/server/cmd/server/notification_listeners_test.go @@ -0,0 +1,552 @@ +package main + +import ( + "context" + "testing" + + "github.com/multica-ai/multica/server/internal/events" + "github.com/multica-ai/multica/server/internal/handler" + "github.com/multica-ai/multica/server/internal/util" + db "github.com/multica-ai/multica/server/pkg/db/generated" + "github.com/multica-ai/multica/server/pkg/protocol" +) + +// notificationTest helpers — reuse the integration test fixtures from TestMain +// (testPool, testUserID, testWorkspaceID are set in integration_test.go). + +// inboxItemsForRecipient returns all non-archived inbox items for a given recipient. +func inboxItemsForRecipient(t *testing.T, queries *db.Queries, recipientID string) []db.ListInboxItemsRow { + t.Helper() + items, err := queries.ListInboxItems(context.Background(), db.ListInboxItemsParams{ + RecipientType: "member", + RecipientID: util.ParseUUID(recipientID), + Limit: 100, + Offset: 0, + }) + if err != nil { + t.Fatalf("ListInboxItems: %v", err) + } + return items +} + +// cleanupInboxForIssue deletes all inbox items related to a given issue. +func cleanupInboxForIssue(t *testing.T, issueID string) { + t.Helper() + testPool.Exec(context.Background(), `DELETE FROM inbox_item WHERE issue_id = $1`, issueID) +} + +// addTestSubscriber manually inserts a subscriber for an issue. +func addTestSubscriber(t *testing.T, issueID, userType, userID, reason string) { + t.Helper() + _, err := testPool.Exec(context.Background(), ` + INSERT INTO issue_subscriber (issue_id, user_type, user_id, reason) + VALUES ($1, $2, $3, $4) + ON CONFLICT (issue_id, user_type, user_id) DO NOTHING + `, issueID, userType, userID, reason) + if err != nil { + t.Fatalf("addTestSubscriber: %v", err) + } +} + +// newNotificationBus creates a bus with subscriber + notification listeners registered. +func newNotificationBus(t *testing.T, queries *db.Queries) *events.Bus { + t.Helper() + bus := events.New() + registerSubscriberListeners(bus, queries) + registerNotificationListeners(bus, queries) + return bus +} + +// TestNotification_IssueCreated_AssigneeNotified verifies that when an issue is +// created with an assignee different from the creator, the assignee receives an +// "issue_assigned" inbox notification and the creator receives nothing. +func TestNotification_IssueCreated_AssigneeNotified(t *testing.T) { + queries := db.New(testPool) + bus := newNotificationBus(t, queries) + + assigneeEmail := "notif-assignee-created@multica.ai" + assigneeID := createTestUser(t, assigneeEmail) + t.Cleanup(func() { cleanupTestUser(t, assigneeEmail) }) + + issueID := createTestIssue(t, testWorkspaceID, testUserID) + t.Cleanup(func() { + cleanupInboxForIssue(t, issueID) + cleanupTestIssue(t, issueID) + }) + + // Track inbox:new events + var inboxEvents []events.Event + bus.Subscribe(protocol.EventInboxNew, func(e events.Event) { + inboxEvents = append(inboxEvents, e) + }) + + assigneeType := "member" + bus.Publish(events.Event{ + Type: protocol.EventIssueCreated, + WorkspaceID: testWorkspaceID, + ActorType: "member", + ActorID: testUserID, + Payload: map[string]any{ + "issue": handler.IssueResponse{ + ID: issueID, + WorkspaceID: testWorkspaceID, + Title: "notif test issue", + Status: "todo", + Priority: "medium", + CreatorType: "member", + CreatorID: testUserID, + AssigneeType: &assigneeType, + AssigneeID: &assigneeID, + }, + }, + }) + + // Assignee should have an inbox item + items := inboxItemsForRecipient(t, queries, assigneeID) + if len(items) != 1 { + t.Fatalf("expected 1 inbox item for assignee, got %d", len(items)) + } + if items[0].Type != "issue_assigned" { + t.Fatalf("expected type 'issue_assigned', got %q", items[0].Type) + } + if items[0].Severity != "action_required" { + t.Fatalf("expected severity 'action_required', got %q", items[0].Severity) + } + + // Creator (actor) should NOT have any inbox items + creatorItems := inboxItemsForRecipient(t, queries, testUserID) + if len(creatorItems) != 0 { + t.Fatalf("expected 0 inbox items for creator, got %d", len(creatorItems)) + } + + // At least one inbox:new event should have been published + if len(inboxEvents) < 1 { + t.Fatal("expected at least 1 inbox:new event") + } +} + +// TestNotification_IssueCreated_SelfAssign verifies that when the creator +// assigns the issue to themselves, no notification is generated. +func TestNotification_IssueCreated_SelfAssign(t *testing.T) { + queries := db.New(testPool) + bus := newNotificationBus(t, queries) + + issueID := createTestIssue(t, testWorkspaceID, testUserID) + t.Cleanup(func() { + cleanupInboxForIssue(t, issueID) + cleanupTestIssue(t, issueID) + }) + + var inboxEvents []events.Event + bus.Subscribe(protocol.EventInboxNew, func(e events.Event) { + inboxEvents = append(inboxEvents, e) + }) + + assigneeType := "member" + assigneeID := testUserID // self-assign + bus.Publish(events.Event{ + Type: protocol.EventIssueCreated, + WorkspaceID: testWorkspaceID, + ActorType: "member", + ActorID: testUserID, + Payload: map[string]any{ + "issue": handler.IssueResponse{ + ID: issueID, + WorkspaceID: testWorkspaceID, + Title: "self-assign issue", + Status: "todo", + Priority: "medium", + CreatorType: "member", + CreatorID: testUserID, + AssigneeType: &assigneeType, + AssigneeID: &assigneeID, + }, + }, + }) + + items := inboxItemsForRecipient(t, queries, testUserID) + if len(items) != 0 { + t.Fatalf("expected 0 inbox items for self-assign, got %d", len(items)) + } + if len(inboxEvents) != 0 { + t.Fatalf("expected 0 inbox:new events for self-assign, got %d", len(inboxEvents)) + } +} + +// TestNotification_IssueCreated_NoAssignee verifies that when an issue is +// created without an assignee, no notifications are generated. +func TestNotification_IssueCreated_NoAssignee(t *testing.T) { + queries := db.New(testPool) + bus := newNotificationBus(t, queries) + + issueID := createTestIssue(t, testWorkspaceID, testUserID) + t.Cleanup(func() { + cleanupInboxForIssue(t, issueID) + cleanupTestIssue(t, issueID) + }) + + var inboxEvents []events.Event + bus.Subscribe(protocol.EventInboxNew, func(e events.Event) { + inboxEvents = append(inboxEvents, e) + }) + + bus.Publish(events.Event{ + Type: protocol.EventIssueCreated, + WorkspaceID: testWorkspaceID, + ActorType: "member", + ActorID: testUserID, + Payload: map[string]any{ + "issue": handler.IssueResponse{ + ID: issueID, + WorkspaceID: testWorkspaceID, + Title: "no assignee issue", + Status: "todo", + Priority: "medium", + CreatorType: "member", + CreatorID: testUserID, + }, + }, + }) + + items := inboxItemsForRecipient(t, queries, testUserID) + if len(items) != 0 { + t.Fatalf("expected 0 inbox items for no-assignee issue, got %d", len(items)) + } + if len(inboxEvents) != 0 { + t.Fatalf("expected 0 inbox:new events, got %d", len(inboxEvents)) + } +} + +// TestNotification_StatusChanged verifies that all subscribers except the actor +// receive a "status_changed" notification when an issue status changes. +func TestNotification_StatusChanged(t *testing.T) { + queries := db.New(testPool) + bus := newNotificationBus(t, queries) + + // Create two extra users as subscribers + sub1Email := "notif-sub1-status@multica.ai" + sub1ID := createTestUser(t, sub1Email) + t.Cleanup(func() { cleanupTestUser(t, sub1Email) }) + + sub2Email := "notif-sub2-status@multica.ai" + sub2ID := createTestUser(t, sub2Email) + t.Cleanup(func() { cleanupTestUser(t, sub2Email) }) + + issueID := createTestIssue(t, testWorkspaceID, testUserID) + t.Cleanup(func() { + cleanupInboxForIssue(t, issueID) + cleanupTestIssue(t, issueID) + }) + + // Manually add subscribers before the event fires + addTestSubscriber(t, issueID, "member", testUserID, "creator") + addTestSubscriber(t, issueID, "member", sub1ID, "assignee") + addTestSubscriber(t, issueID, "member", sub2ID, "commenter") + + bus.Publish(events.Event{ + Type: protocol.EventIssueUpdated, + WorkspaceID: testWorkspaceID, + ActorType: "member", + ActorID: testUserID, // actor is the creator + Payload: map[string]any{ + "issue": handler.IssueResponse{ + ID: issueID, + WorkspaceID: testWorkspaceID, + Title: "status test issue", + Status: "in_progress", + Priority: "medium", + CreatorType: "member", + CreatorID: testUserID, + }, + "assignee_changed": false, + "status_changed": true, + }, + }) + + // Actor (testUserID) should NOT get a notification + actorItems := inboxItemsForRecipient(t, queries, testUserID) + if len(actorItems) != 0 { + t.Fatalf("expected 0 inbox items for actor, got %d", len(actorItems)) + } + + // sub1 should get a status_changed notification + sub1Items := inboxItemsForRecipient(t, queries, sub1ID) + if len(sub1Items) != 1 { + t.Fatalf("expected 1 inbox item for sub1, got %d", len(sub1Items)) + } + if sub1Items[0].Type != "status_changed" { + t.Fatalf("expected type 'status_changed', got %q", sub1Items[0].Type) + } + if sub1Items[0].Severity != "info" { + t.Fatalf("expected severity 'info', got %q", sub1Items[0].Severity) + } + + // sub2 should also get a status_changed notification + sub2Items := inboxItemsForRecipient(t, queries, sub2ID) + if len(sub2Items) != 1 { + t.Fatalf("expected 1 inbox item for sub2, got %d", len(sub2Items)) + } + if sub2Items[0].Type != "status_changed" { + t.Fatalf("expected type 'status_changed', got %q", sub2Items[0].Type) + } +} + +// TestNotification_CommentCreated verifies that all subscribers except the +// commenter receive a "new_comment" notification. +func TestNotification_CommentCreated(t *testing.T) { + queries := db.New(testPool) + bus := newNotificationBus(t, queries) + + commenterEmail := "notif-commenter@multica.ai" + commenterID := createTestUser(t, commenterEmail) + t.Cleanup(func() { cleanupTestUser(t, commenterEmail) }) + + sub1Email := "notif-sub1-comment@multica.ai" + sub1ID := createTestUser(t, sub1Email) + t.Cleanup(func() { cleanupTestUser(t, sub1Email) }) + + issueID := createTestIssue(t, testWorkspaceID, testUserID) + t.Cleanup(func() { + cleanupInboxForIssue(t, issueID) + cleanupTestIssue(t, issueID) + }) + + // Pre-add subscribers: creator and sub1. The commenter will also be added + // by subscriber_listeners when the event fires. + addTestSubscriber(t, issueID, "member", testUserID, "creator") + addTestSubscriber(t, issueID, "member", sub1ID, "assignee") + + bus.Publish(events.Event{ + Type: protocol.EventCommentCreated, + WorkspaceID: testWorkspaceID, + ActorType: "member", + ActorID: commenterID, // commenter is the actor + Payload: map[string]any{ + "comment": handler.CommentResponse{ + ID: "00000000-0000-0000-0000-000000000000", + IssueID: issueID, + AuthorType: "member", + AuthorID: commenterID, + Content: "test comment content", + Type: "comment", + }, + "issue_title": "comment test issue", + "issue_status": "todo", + }, + }) + + // Creator should get a new_comment notification + creatorItems := inboxItemsForRecipient(t, queries, testUserID) + if len(creatorItems) != 1 { + t.Fatalf("expected 1 inbox item for creator, got %d", len(creatorItems)) + } + if creatorItems[0].Type != "new_comment" { + t.Fatalf("expected type 'new_comment', got %q", creatorItems[0].Type) + } + if creatorItems[0].Severity != "info" { + t.Fatalf("expected severity 'info', got %q", creatorItems[0].Severity) + } + + // sub1 should also get a new_comment notification + sub1Items := inboxItemsForRecipient(t, queries, sub1ID) + if len(sub1Items) != 1 { + t.Fatalf("expected 1 inbox item for sub1, got %d", len(sub1Items)) + } + if sub1Items[0].Type != "new_comment" { + t.Fatalf("expected type 'new_comment', got %q", sub1Items[0].Type) + } + + // Commenter (actor) should NOT get a notification + commenterItems := inboxItemsForRecipient(t, queries, commenterID) + if len(commenterItems) != 0 { + t.Fatalf("expected 0 inbox items for commenter, got %d", len(commenterItems)) + } +} + +// TestNotification_AssigneeChanged verifies the full assignee change flow: +// - New assignee gets "issue_assigned" (Direct) +// - Old assignee gets "unassigned" (Direct) +// - Other subscribers get "assignee_changed" (Subscriber), excluding actor + old + new +// - Actor gets nothing +func TestNotification_AssigneeChanged(t *testing.T) { + queries := db.New(testPool) + bus := newNotificationBus(t, queries) + + oldAssigneeEmail := "notif-old-assignee@multica.ai" + oldAssigneeID := createTestUser(t, oldAssigneeEmail) + t.Cleanup(func() { cleanupTestUser(t, oldAssigneeEmail) }) + + newAssigneeEmail := "notif-new-assignee@multica.ai" + newAssigneeID := createTestUser(t, newAssigneeEmail) + t.Cleanup(func() { cleanupTestUser(t, newAssigneeEmail) }) + + bystanderEmail := "notif-bystander@multica.ai" + bystanderID := createTestUser(t, bystanderEmail) + t.Cleanup(func() { cleanupTestUser(t, bystanderEmail) }) + + issueID := createTestIssue(t, testWorkspaceID, testUserID) + t.Cleanup(func() { + cleanupInboxForIssue(t, issueID) + cleanupTestIssue(t, issueID) + }) + + // Pre-add subscribers: creator, old assignee, bystander + addTestSubscriber(t, issueID, "member", testUserID, "creator") + addTestSubscriber(t, issueID, "member", oldAssigneeID, "assignee") + addTestSubscriber(t, issueID, "member", bystanderID, "commenter") + + newAssigneeType := "member" + oldAssigneeType := "member" + bus.Publish(events.Event{ + Type: protocol.EventIssueUpdated, + WorkspaceID: testWorkspaceID, + ActorType: "member", + ActorID: testUserID, // actor is the creator + Payload: map[string]any{ + "issue": handler.IssueResponse{ + ID: issueID, + WorkspaceID: testWorkspaceID, + Title: "assignee change issue", + Status: "todo", + Priority: "medium", + CreatorType: "member", + CreatorID: testUserID, + AssigneeType: &newAssigneeType, + AssigneeID: &newAssigneeID, + }, + "assignee_changed": true, + "status_changed": false, + "prev_assignee_type": &oldAssigneeType, + "prev_assignee_id": &oldAssigneeID, + }, + }) + + // New assignee should get "issue_assigned" + newItems := inboxItemsForRecipient(t, queries, newAssigneeID) + if len(newItems) != 1 { + t.Fatalf("expected 1 inbox item for new assignee, got %d", len(newItems)) + } + if newItems[0].Type != "issue_assigned" { + t.Fatalf("expected type 'issue_assigned', got %q", newItems[0].Type) + } + if newItems[0].Severity != "action_required" { + t.Fatalf("expected severity 'action_required', got %q", newItems[0].Severity) + } + + // Old assignee should get "unassigned" + oldItems := inboxItemsForRecipient(t, queries, oldAssigneeID) + if len(oldItems) != 1 { + t.Fatalf("expected 1 inbox item for old assignee, got %d", len(oldItems)) + } + if oldItems[0].Type != "unassigned" { + t.Fatalf("expected type 'unassigned', got %q", oldItems[0].Type) + } + if oldItems[0].Severity != "info" { + t.Fatalf("expected severity 'info', got %q", oldItems[0].Severity) + } + + // Bystander should get "assignee_changed" + bystanderItems := inboxItemsForRecipient(t, queries, bystanderID) + if len(bystanderItems) != 1 { + t.Fatalf("expected 1 inbox item for bystander, got %d", len(bystanderItems)) + } + if bystanderItems[0].Type != "assignee_changed" { + t.Fatalf("expected type 'assignee_changed', got %q", bystanderItems[0].Type) + } + if bystanderItems[0].Severity != "info" { + t.Fatalf("expected severity 'info', got %q", bystanderItems[0].Severity) + } + + // Actor (testUserID / creator) should NOT get any notification + actorItems := inboxItemsForRecipient(t, queries, testUserID) + if len(actorItems) != 0 { + t.Fatalf("expected 0 inbox items for actor, got %d", len(actorItems)) + } +} + +// TestNotification_TaskCompleted verifies that subscribers get a "task_completed" +// notification when a task completes, excluding the agent. +func TestNotification_TaskCompleted(t *testing.T) { + queries := db.New(testPool) + bus := newNotificationBus(t, queries) + + issueID := createTestIssue(t, testWorkspaceID, testUserID) + t.Cleanup(func() { + cleanupInboxForIssue(t, issueID) + cleanupTestIssue(t, issueID) + }) + + // The agent ID (acting as system actor) + agentID := "00000000-0000-0000-0000-aaaaaaaaaaaa" + + // Pre-add subscribers: creator and the agent + addTestSubscriber(t, issueID, "member", testUserID, "creator") + addTestSubscriber(t, issueID, "agent", agentID, "assignee") + + bus.Publish(events.Event{ + Type: protocol.EventTaskCompleted, + WorkspaceID: testWorkspaceID, + ActorType: "system", + ActorID: "", + Payload: map[string]any{ + "task_id": "00000000-0000-0000-0000-bbbbbbbbbbbb", + "agent_id": agentID, + "issue_id": issueID, + "status": "completed", + }, + }) + + // Creator should get a task_completed notification + creatorItems := inboxItemsForRecipient(t, queries, testUserID) + if len(creatorItems) != 1 { + t.Fatalf("expected 1 inbox item for creator, got %d", len(creatorItems)) + } + if creatorItems[0].Type != "task_completed" { + t.Fatalf("expected type 'task_completed', got %q", creatorItems[0].Type) + } + if creatorItems[0].Severity != "attention" { + t.Fatalf("expected severity 'attention', got %q", creatorItems[0].Severity) + } +} + +// TestNotification_TaskFailed verifies that subscribers get a "task_failed" +// notification when a task fails, excluding the agent. +func TestNotification_TaskFailed(t *testing.T) { + queries := db.New(testPool) + bus := newNotificationBus(t, queries) + + issueID := createTestIssue(t, testWorkspaceID, testUserID) + t.Cleanup(func() { + cleanupInboxForIssue(t, issueID) + cleanupTestIssue(t, issueID) + }) + + agentID := "00000000-0000-0000-0000-aaaaaaaaaaaa" + + addTestSubscriber(t, issueID, "member", testUserID, "creator") + addTestSubscriber(t, issueID, "agent", agentID, "assignee") + + bus.Publish(events.Event{ + Type: protocol.EventTaskFailed, + WorkspaceID: testWorkspaceID, + ActorType: "system", + ActorID: "", + Payload: map[string]any{ + "task_id": "00000000-0000-0000-0000-bbbbbbbbbbbb", + "agent_id": agentID, + "issue_id": issueID, + "status": "failed", + }, + }) + + creatorItems := inboxItemsForRecipient(t, queries, testUserID) + if len(creatorItems) != 1 { + t.Fatalf("expected 1 inbox item for creator, got %d", len(creatorItems)) + } + if creatorItems[0].Type != "task_failed" { + t.Fatalf("expected type 'task_failed', got %q", creatorItems[0].Type) + } + if creatorItems[0].Severity != "action_required" { + t.Fatalf("expected severity 'action_required', got %q", creatorItems[0].Severity) + } +} diff --git a/server/cmd/server/router.go b/server/cmd/server/router.go index a0638b2b..d5401a9c 100644 --- a/server/cmd/server/router.go +++ b/server/cmd/server/router.go @@ -117,6 +117,9 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route r.Delete("/", h.DeleteIssue) r.Post("/comments", h.CreateComment) r.Get("/comments", h.ListComments) + r.Get("/subscribers", h.ListIssueSubscribers) + r.Post("/subscribe", h.SubscribeToIssue) + r.Post("/unsubscribe", h.UnsubscribeFromIssue) }) }) diff --git a/server/cmd/server/subscriber_listeners.go b/server/cmd/server/subscriber_listeners.go new file mode 100644 index 00000000..14261e94 --- /dev/null +++ b/server/cmd/server/subscriber_listeners.go @@ -0,0 +1,116 @@ +package main + +import ( + "context" + "log/slog" + + "github.com/multica-ai/multica/server/internal/events" + "github.com/multica-ai/multica/server/internal/handler" + db "github.com/multica-ai/multica/server/pkg/db/generated" + "github.com/multica-ai/multica/server/pkg/protocol" +) + +// registerSubscriberListeners wires up event bus listeners that auto-subscribe +// relevant users to issues. This ensures creators, assignees, and commenters +// are automatically tracked as issue subscribers. +func registerSubscriberListeners(bus *events.Bus, queries *db.Queries) { + // issue:created — subscribe creator + assignee (if different) + bus.Subscribe(protocol.EventIssueCreated, func(e events.Event) { + payload, ok := e.Payload.(map[string]any) + if !ok { + return + } + issue, ok := payload["issue"].(handler.IssueResponse) + if !ok { + return + } + + // Subscribe the creator + addSubscriber(bus, queries, e.WorkspaceID, issue.ID, issue.CreatorType, issue.CreatorID, "creator") + + // Subscribe the assignee if exists and different from creator + if issue.AssigneeType != nil && issue.AssigneeID != nil && + !(*issue.AssigneeType == issue.CreatorType && *issue.AssigneeID == issue.CreatorID) { + addSubscriber(bus, queries, e.WorkspaceID, issue.ID, *issue.AssigneeType, *issue.AssigneeID, "assignee") + } + }) + + // issue:updated — subscribe new assignee if assignee changed + bus.Subscribe(protocol.EventIssueUpdated, func(e events.Event) { + payload, ok := e.Payload.(map[string]any) + if !ok { + return + } + issue, ok := payload["issue"].(handler.IssueResponse) + if !ok { + return + } + assigneeChanged, _ := payload["assignee_changed"].(bool) + if !assigneeChanged { + return + } + + if issue.AssigneeType != nil && issue.AssigneeID != nil { + addSubscriber(bus, queries, e.WorkspaceID, issue.ID, *issue.AssigneeType, *issue.AssigneeID, "assignee") + } + }) + + // comment:created — subscribe the commenter + bus.Subscribe(protocol.EventCommentCreated, func(e events.Event) { + payload, ok := e.Payload.(map[string]any) + if !ok { + return + } + + // Comments created via handler use CommentResponse; agent comments from task.go use map[string]any + var issueID, authorType, authorID string + if comment, ok := payload["comment"].(handler.CommentResponse); ok { + issueID = comment.IssueID + authorType = comment.AuthorType + authorID = comment.AuthorID + } else if commentMap, ok := payload["comment"].(map[string]any); ok { + issueID, _ = commentMap["issue_id"].(string) + authorType, _ = commentMap["author_type"].(string) + authorID, _ = commentMap["author_id"].(string) + } else { + return + } + if issueID == "" || authorID == "" { + return + } + + addSubscriber(bus, queries, e.WorkspaceID, issueID, authorType, authorID, "commenter") + }) +} + +// addSubscriber adds a user as an issue subscriber and publishes a +// subscriber:added event for real-time frontend sync. +func addSubscriber(bus *events.Bus, queries *db.Queries, workspaceID, issueID, userType, userID, reason string) { + err := queries.AddIssueSubscriber(context.Background(), db.AddIssueSubscriberParams{ + IssueID: parseUUID(issueID), + UserType: userType, + UserID: parseUUID(userID), + Reason: reason, + }) + if err != nil { + slog.Error("failed to add issue subscriber", + "issue_id", issueID, + "user_type", userType, + "user_id", userID, + "reason", reason, + "error", err, + ) + return + } + + bus.Publish(events.Event{ + Type: protocol.EventSubscriberAdded, + WorkspaceID: workspaceID, + Payload: map[string]any{ + "issue_id": issueID, + "user_type": userType, + "user_id": userID, + "reason": reason, + }, + }) +} diff --git a/server/cmd/server/subscriber_listeners_test.go b/server/cmd/server/subscriber_listeners_test.go new file mode 100644 index 00000000..1af8e2a1 --- /dev/null +++ b/server/cmd/server/subscriber_listeners_test.go @@ -0,0 +1,377 @@ +package main + +import ( + "context" + "testing" + + "github.com/jackc/pgx/v5/pgtype" + "github.com/multica-ai/multica/server/internal/events" + "github.com/multica-ai/multica/server/internal/handler" + "github.com/multica-ai/multica/server/internal/util" + db "github.com/multica-ai/multica/server/pkg/db/generated" + "github.com/multica-ai/multica/server/pkg/protocol" +) + +// subscriberTest helpers — reuse the integration test fixtures from TestMain +// (testPool, testUserID, testWorkspaceID are set in integration_test.go). + +// createTestIssue inserts a minimal issue and returns its UUID string. +func createTestIssue(t *testing.T, workspaceID, creatorID string) string { + t.Helper() + ctx := context.Background() + var issueID string + err := testPool.QueryRow(ctx, ` + INSERT INTO issue (workspace_id, title, status, priority, creator_type, creator_id, position) + VALUES ($1, 'subscriber test issue', 'todo', 'medium', 'member', $2, 0) + RETURNING id + `, workspaceID, creatorID).Scan(&issueID) + if err != nil { + t.Fatalf("createTestIssue: %v", err) + } + return issueID +} + +// createTestUser inserts a user with the given email and returns the UUID string. +func createTestUser(t *testing.T, email string) string { + t.Helper() + ctx := context.Background() + var userID string + err := testPool.QueryRow(ctx, ` + INSERT INTO "user" (name, email) + VALUES ($1, $2) + RETURNING id + `, "Subscriber Test User", email).Scan(&userID) + if err != nil { + t.Fatalf("createTestUser: %v", err) + } + return userID +} + +func cleanupTestIssue(t *testing.T, issueID string) { + t.Helper() + testPool.Exec(context.Background(), `DELETE FROM issue WHERE id = $1`, issueID) +} + +func cleanupTestUser(t *testing.T, email string) { + t.Helper() + testPool.Exec(context.Background(), `DELETE FROM "user" WHERE email = $1`, email) +} + +func isSubscribed(t *testing.T, queries *db.Queries, issueID, userType, userID string) bool { + t.Helper() + subscribed, err := queries.IsIssueSubscriber(context.Background(), db.IsIssueSubscriberParams{ + IssueID: util.ParseUUID(issueID), + UserType: userType, + UserID: util.ParseUUID(userID), + }) + if err != nil { + t.Fatalf("IsIssueSubscriber: %v", err) + } + return subscribed +} + +func subscriberCount(t *testing.T, queries *db.Queries, issueID string) int { + t.Helper() + subs, err := queries.ListIssueSubscribers(context.Background(), util.ParseUUID(issueID)) + if err != nil { + t.Fatalf("ListIssueSubscribers: %v", err) + } + return len(subs) +} + +func TestSubscriberIssueCreated_CreatorSubscribed(t *testing.T) { + queries := db.New(testPool) + bus := events.New() + registerSubscriberListeners(bus, queries) + + issueID := createTestIssue(t, testWorkspaceID, testUserID) + t.Cleanup(func() { cleanupTestIssue(t, issueID) }) + + // Publish issue:created event with no assignee + bus.Publish(events.Event{ + Type: protocol.EventIssueCreated, + WorkspaceID: testWorkspaceID, + ActorType: "member", + ActorID: testUserID, + Payload: map[string]any{ + "issue": handler.IssueResponse{ + ID: issueID, + WorkspaceID: testWorkspaceID, + Title: "test issue", + Status: "todo", + Priority: "medium", + CreatorType: "member", + CreatorID: testUserID, + }, + }, + }) + + if !isSubscribed(t, queries, issueID, "member", testUserID) { + t.Fatal("expected creator to be subscribed after issue:created") + } + if count := subscriberCount(t, queries, issueID); count != 1 { + t.Fatalf("expected 1 subscriber, got %d", count) + } +} + +func TestSubscriberIssueCreated_CreatorAndAssignee(t *testing.T) { + queries := db.New(testPool) + bus := events.New() + registerSubscriberListeners(bus, queries) + + assigneeEmail := "subscriber-assignee-test@multica.ai" + assigneeID := createTestUser(t, assigneeEmail) + t.Cleanup(func() { cleanupTestUser(t, assigneeEmail) }) + + issueID := createTestIssue(t, testWorkspaceID, testUserID) + t.Cleanup(func() { cleanupTestIssue(t, issueID) }) + + assigneeType := "member" + bus.Publish(events.Event{ + Type: protocol.EventIssueCreated, + WorkspaceID: testWorkspaceID, + ActorType: "member", + ActorID: testUserID, + Payload: map[string]any{ + "issue": handler.IssueResponse{ + ID: issueID, + WorkspaceID: testWorkspaceID, + Title: "test issue", + Status: "todo", + Priority: "medium", + CreatorType: "member", + CreatorID: testUserID, + AssigneeType: &assigneeType, + AssigneeID: &assigneeID, + }, + }, + }) + + if !isSubscribed(t, queries, issueID, "member", testUserID) { + t.Fatal("expected creator to be subscribed") + } + if !isSubscribed(t, queries, issueID, "member", assigneeID) { + t.Fatal("expected assignee to be subscribed") + } + if count := subscriberCount(t, queries, issueID); count != 2 { + t.Fatalf("expected 2 subscribers, got %d", count) + } +} + +func TestSubscriberIssueCreated_SelfAssign(t *testing.T) { + queries := db.New(testPool) + bus := events.New() + registerSubscriberListeners(bus, queries) + + issueID := createTestIssue(t, testWorkspaceID, testUserID) + t.Cleanup(func() { cleanupTestIssue(t, issueID) }) + + // Creator is also the assignee (self-assign) + assigneeType := "member" + assigneeID := testUserID + bus.Publish(events.Event{ + Type: protocol.EventIssueCreated, + WorkspaceID: testWorkspaceID, + ActorType: "member", + ActorID: testUserID, + Payload: map[string]any{ + "issue": handler.IssueResponse{ + ID: issueID, + WorkspaceID: testWorkspaceID, + Title: "test issue", + Status: "todo", + Priority: "medium", + CreatorType: "member", + CreatorID: testUserID, + AssigneeType: &assigneeType, + AssigneeID: &assigneeID, + }, + }, + }) + + // Should only have 1 subscriber record (ON CONFLICT DO NOTHING handles idempotency) + if count := subscriberCount(t, queries, issueID); count != 1 { + t.Fatalf("expected 1 subscriber for self-assign, got %d", count) + } + if !isSubscribed(t, queries, issueID, "member", testUserID) { + t.Fatal("expected creator/assignee to be subscribed") + } +} + +func TestSubscriberIssueUpdated_AssigneeChanged(t *testing.T) { + queries := db.New(testPool) + bus := events.New() + registerSubscriberListeners(bus, queries) + + assigneeEmail := "subscriber-new-assignee-test@multica.ai" + assigneeID := createTestUser(t, assigneeEmail) + t.Cleanup(func() { cleanupTestUser(t, assigneeEmail) }) + + issueID := createTestIssue(t, testWorkspaceID, testUserID) + t.Cleanup(func() { cleanupTestIssue(t, issueID) }) + + assigneeType := "member" + bus.Publish(events.Event{ + Type: protocol.EventIssueUpdated, + WorkspaceID: testWorkspaceID, + ActorType: "member", + ActorID: testUserID, + Payload: map[string]any{ + "issue": handler.IssueResponse{ + ID: issueID, + WorkspaceID: testWorkspaceID, + Title: "test issue", + Status: "todo", + Priority: "medium", + CreatorType: "member", + CreatorID: testUserID, + AssigneeType: &assigneeType, + AssigneeID: &assigneeID, + }, + "assignee_changed": true, + }, + }) + + if !isSubscribed(t, queries, issueID, "member", assigneeID) { + t.Fatal("expected new assignee to be subscribed after assignee change") + } +} + +func TestSubscriberIssueUpdated_NoAssigneeChange(t *testing.T) { + queries := db.New(testPool) + bus := events.New() + registerSubscriberListeners(bus, queries) + + issueID := createTestIssue(t, testWorkspaceID, testUserID) + t.Cleanup(func() { cleanupTestIssue(t, issueID) }) + + // Publish issue:updated without assignee_changed flag + bus.Publish(events.Event{ + Type: protocol.EventIssueUpdated, + WorkspaceID: testWorkspaceID, + ActorType: "member", + ActorID: testUserID, + Payload: map[string]any{ + "issue": handler.IssueResponse{ + ID: issueID, + WorkspaceID: testWorkspaceID, + Title: "test issue", + Status: "in_progress", + Priority: "medium", + CreatorType: "member", + CreatorID: testUserID, + }, + "assignee_changed": false, + "status_changed": true, + }, + }) + + // No subscriber should have been added + if count := subscriberCount(t, queries, issueID); count != 0 { + t.Fatalf("expected 0 subscribers when assignee not changed, got %d", count) + } +} + +func TestSubscriberCommentCreated_CommenterSubscribed(t *testing.T) { + queries := db.New(testPool) + bus := events.New() + registerSubscriberListeners(bus, queries) + + commenterEmail := "subscriber-commenter-test@multica.ai" + commenterID := createTestUser(t, commenterEmail) + t.Cleanup(func() { cleanupTestUser(t, commenterEmail) }) + + issueID := createTestIssue(t, testWorkspaceID, testUserID) + t.Cleanup(func() { cleanupTestIssue(t, issueID) }) + + bus.Publish(events.Event{ + Type: protocol.EventCommentCreated, + WorkspaceID: testWorkspaceID, + ActorType: "member", + ActorID: commenterID, + Payload: map[string]any{ + "comment": handler.CommentResponse{ + ID: "00000000-0000-0000-0000-000000000000", + IssueID: issueID, + AuthorType: "member", + AuthorID: commenterID, + Content: "test comment", + Type: "comment", + }, + }, + }) + + if !isSubscribed(t, queries, issueID, "member", commenterID) { + t.Fatal("expected commenter to be subscribed after comment:created") + } +} + +func TestSubscriberAddedEventPublished(t *testing.T) { + queries := db.New(testPool) + bus := events.New() + registerSubscriberListeners(bus, queries) + + issueID := createTestIssue(t, testWorkspaceID, testUserID) + t.Cleanup(func() { cleanupTestIssue(t, issueID) }) + + // Track subscriber:added events + var subscriberEvents []events.Event + bus.Subscribe(protocol.EventSubscriberAdded, func(e events.Event) { + subscriberEvents = append(subscriberEvents, e) + }) + + bus.Publish(events.Event{ + Type: protocol.EventIssueCreated, + WorkspaceID: testWorkspaceID, + ActorType: "member", + ActorID: testUserID, + Payload: map[string]any{ + "issue": handler.IssueResponse{ + ID: issueID, + WorkspaceID: testWorkspaceID, + Title: "test issue", + Status: "todo", + Priority: "medium", + CreatorType: "member", + CreatorID: testUserID, + }, + }, + }) + + if len(subscriberEvents) != 1 { + t.Fatalf("expected 1 subscriber:added event, got %d", len(subscriberEvents)) + } + evt := subscriberEvents[0] + if evt.WorkspaceID != testWorkspaceID { + t.Fatalf("expected workspace_id %s, got %s", testWorkspaceID, evt.WorkspaceID) + } + payload, ok := evt.Payload.(map[string]any) + if !ok { + t.Fatal("expected map[string]any payload") + } + if payload["issue_id"] != issueID { + t.Fatalf("expected issue_id %s, got %v", issueID, payload["issue_id"]) + } + if payload["user_id"] != testUserID { + t.Fatalf("expected user_id %s, got %v", testUserID, payload["user_id"]) + } +} + +// Verify parseUUID is consistent — pgtype.UUID from our local helper should match util.ParseUUID +func TestParseUUIDConsistency(t *testing.T) { + uuid := "550e8400-e29b-41d4-a716-446655440000" + local := parseUUID(uuid) + utilResult := util.ParseUUID(uuid) + if local != utilResult { + t.Fatalf("parseUUID inconsistency: local=%v, util=%v", local, utilResult) + } + if !local.Valid { + t.Fatal("expected valid UUID") + } + + // Empty string should produce invalid UUID + empty := parseUUID("") + if empty != (pgtype.UUID{}) { + t.Fatalf("expected zero UUID for empty string, got %v", empty) + } +} diff --git a/server/internal/handler/subscriber.go b/server/internal/handler/subscriber.go new file mode 100644 index 00000000..78f4e18b --- /dev/null +++ b/server/internal/handler/subscriber.go @@ -0,0 +1,110 @@ +package handler + +import ( + "net/http" + + "github.com/go-chi/chi/v5" + db "github.com/multica-ai/multica/server/pkg/db/generated" + "github.com/multica-ai/multica/server/pkg/protocol" +) + +// SubscriberResponse is the JSON shape returned for each issue subscriber. +type SubscriberResponse struct { + IssueID string `json:"issue_id"` + UserType string `json:"user_type"` + UserID string `json:"user_id"` + Reason string `json:"reason"` + CreatedAt string `json:"created_at"` +} + +func subscriberToResponse(s db.IssueSubscriber) SubscriberResponse { + return SubscriberResponse{ + IssueID: uuidToString(s.IssueID), + UserType: s.UserType, + UserID: uuidToString(s.UserID), + Reason: s.Reason, + CreatedAt: timestampToString(s.CreatedAt), + } +} + +// ListIssueSubscribers returns all subscribers for an issue. +func (h *Handler) ListIssueSubscribers(w http.ResponseWriter, r *http.Request) { + issueID := chi.URLParam(r, "id") + issue, ok := h.loadIssueForUser(w, r, issueID) + if !ok { + return + } + + subscribers, err := h.Queries.ListIssueSubscribers(r.Context(), issue.ID) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to list subscribers") + return + } + + resp := make([]SubscriberResponse, len(subscribers)) + for i, s := range subscribers { + resp[i] = subscriberToResponse(s) + } + + writeJSON(w, http.StatusOK, resp) +} + +// SubscribeToIssue subscribes the current user to an issue with reason "manual". +func (h *Handler) SubscribeToIssue(w http.ResponseWriter, r *http.Request) { + issueID := chi.URLParam(r, "id") + issue, ok := h.loadIssueForUser(w, r, issueID) + if !ok { + return + } + + userID := requestUserID(r) + + err := h.Queries.AddIssueSubscriber(r.Context(), db.AddIssueSubscriberParams{ + IssueID: issue.ID, + UserType: "member", + UserID: parseUUID(userID), + Reason: "manual", + }) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to subscribe") + return + } + + workspaceID := uuidToString(issue.WorkspaceID) + h.publish(protocol.EventSubscriberAdded, workspaceID, "member", userID, map[string]any{ + "issue_id": issueID, + "user_id": userID, + "reason": "manual", + }) + + writeJSON(w, http.StatusOK, map[string]bool{"subscribed": true}) +} + +// UnsubscribeFromIssue removes the current user's subscription from an issue. +func (h *Handler) UnsubscribeFromIssue(w http.ResponseWriter, r *http.Request) { + issueID := chi.URLParam(r, "id") + issue, ok := h.loadIssueForUser(w, r, issueID) + if !ok { + return + } + + userID := requestUserID(r) + + err := h.Queries.RemoveIssueSubscriber(r.Context(), db.RemoveIssueSubscriberParams{ + IssueID: issue.ID, + UserType: "member", + UserID: parseUUID(userID), + }) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to unsubscribe") + return + } + + workspaceID := uuidToString(issue.WorkspaceID) + h.publish(protocol.EventSubscriberRemoved, workspaceID, "member", userID, map[string]any{ + "issue_id": issueID, + "user_id": userID, + }) + + writeJSON(w, http.StatusOK, map[string]bool{"subscribed": false}) +} diff --git a/server/internal/handler/subscriber_test.go b/server/internal/handler/subscriber_test.go new file mode 100644 index 00000000..bcc9fa55 --- /dev/null +++ b/server/internal/handler/subscriber_test.go @@ -0,0 +1,208 @@ +package handler + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + db "github.com/multica-ai/multica/server/pkg/db/generated" +) + +func TestSubscriberAPI(t *testing.T) { + ctx := context.Background() + + // Helper: create an issue for subscriber tests + createIssue := func(t *testing.T) string { + t.Helper() + w := httptest.NewRecorder() + req := newRequest("POST", "/api/issues?workspace_id="+testWorkspaceID, map[string]any{ + "title": "Subscriber test issue", + }) + testHandler.CreateIssue(w, req) + if w.Code != http.StatusCreated { + t.Fatalf("CreateIssue: expected 201, got %d: %s", w.Code, w.Body.String()) + } + var issue IssueResponse + json.NewDecoder(w.Body).Decode(&issue) + return issue.ID + } + + // Helper: delete an issue + deleteIssue := func(t *testing.T, issueID string) { + t.Helper() + w := httptest.NewRecorder() + req := newRequest("DELETE", "/api/issues/"+issueID, nil) + req = withURLParam(req, "id", issueID) + testHandler.DeleteIssue(w, req) + } + + t.Run("Subscribe", func(t *testing.T) { + issueID := createIssue(t) + defer deleteIssue(t, issueID) + + w := httptest.NewRecorder() + req := newRequest("POST", "/api/issues/"+issueID+"/subscribe", nil) + req = withURLParam(req, "id", issueID) + testHandler.SubscribeToIssue(w, req) + if w.Code != http.StatusOK { + t.Fatalf("SubscribeToIssue: expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var resp map[string]bool + json.NewDecoder(w.Body).Decode(&resp) + if !resp["subscribed"] { + t.Fatal("SubscribeToIssue: expected subscribed=true") + } + + // Verify in DB + subscribed, err := testHandler.Queries.IsIssueSubscriber(ctx, db.IsIssueSubscriberParams{ + IssueID: parseUUID(issueID), + UserType: "member", + UserID: parseUUID(testUserID), + }) + if err != nil { + t.Fatalf("IsIssueSubscriber: %v", err) + } + if !subscribed { + t.Fatal("expected user to be subscribed in DB") + } + }) + + t.Run("SubscribeIdempotent", func(t *testing.T) { + issueID := createIssue(t) + defer deleteIssue(t, issueID) + + // Subscribe first time + w := httptest.NewRecorder() + req := newRequest("POST", "/api/issues/"+issueID+"/subscribe", nil) + req = withURLParam(req, "id", issueID) + testHandler.SubscribeToIssue(w, req) + if w.Code != http.StatusOK { + t.Fatalf("SubscribeToIssue (1st): expected 200, got %d: %s", w.Code, w.Body.String()) + } + + // Subscribe second time — should also succeed + w = httptest.NewRecorder() + req = newRequest("POST", "/api/issues/"+issueID+"/subscribe", nil) + req = withURLParam(req, "id", issueID) + testHandler.SubscribeToIssue(w, req) + if w.Code != http.StatusOK { + t.Fatalf("SubscribeToIssue (2nd): expected 200, got %d: %s", w.Code, w.Body.String()) + } + }) + + t.Run("ListSubscribers", func(t *testing.T) { + issueID := createIssue(t) + defer deleteIssue(t, issueID) + + // Subscribe first + w := httptest.NewRecorder() + req := newRequest("POST", "/api/issues/"+issueID+"/subscribe", nil) + req = withURLParam(req, "id", issueID) + testHandler.SubscribeToIssue(w, req) + if w.Code != http.StatusOK { + t.Fatalf("SubscribeToIssue: expected 200, got %d: %s", w.Code, w.Body.String()) + } + + // List + w = httptest.NewRecorder() + req = newRequest("GET", "/api/issues/"+issueID+"/subscribers", nil) + req = withURLParam(req, "id", issueID) + testHandler.ListIssueSubscribers(w, req) + if w.Code != http.StatusOK { + t.Fatalf("ListIssueSubscribers: expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var subscribers []SubscriberResponse + json.NewDecoder(w.Body).Decode(&subscribers) + if len(subscribers) == 0 { + t.Fatal("ListIssueSubscribers: expected at least 1 subscriber") + } + found := false + for _, s := range subscribers { + if s.UserID == testUserID && s.UserType == "member" && s.Reason == "manual" { + found = true + break + } + } + if !found { + t.Fatalf("ListIssueSubscribers: expected to find test user subscriber, got %+v", subscribers) + } + }) + + t.Run("Unsubscribe", func(t *testing.T) { + issueID := createIssue(t) + defer deleteIssue(t, issueID) + + // Subscribe first + w := httptest.NewRecorder() + req := newRequest("POST", "/api/issues/"+issueID+"/subscribe", nil) + req = withURLParam(req, "id", issueID) + testHandler.SubscribeToIssue(w, req) + if w.Code != http.StatusOK { + t.Fatalf("SubscribeToIssue: expected 200, got %d: %s", w.Code, w.Body.String()) + } + + // Unsubscribe + w = httptest.NewRecorder() + req = newRequest("POST", "/api/issues/"+issueID+"/unsubscribe", nil) + req = withURLParam(req, "id", issueID) + testHandler.UnsubscribeFromIssue(w, req) + if w.Code != http.StatusOK { + t.Fatalf("UnsubscribeFromIssue: expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var resp map[string]bool + json.NewDecoder(w.Body).Decode(&resp) + if resp["subscribed"] { + t.Fatal("UnsubscribeFromIssue: expected subscribed=false") + } + + // Verify in DB + subscribed, err := testHandler.Queries.IsIssueSubscriber(ctx, db.IsIssueSubscriberParams{ + IssueID: parseUUID(issueID), + UserType: "member", + UserID: parseUUID(testUserID), + }) + if err != nil { + t.Fatalf("IsIssueSubscriber: %v", err) + } + if subscribed { + t.Fatal("expected user to NOT be subscribed in DB") + } + }) + + t.Run("ListAfterUnsubscribe", func(t *testing.T) { + issueID := createIssue(t) + defer deleteIssue(t, issueID) + + // Subscribe + w := httptest.NewRecorder() + req := newRequest("POST", "/api/issues/"+issueID+"/subscribe", nil) + req = withURLParam(req, "id", issueID) + testHandler.SubscribeToIssue(w, req) + + // Unsubscribe + w = httptest.NewRecorder() + req = newRequest("POST", "/api/issues/"+issueID+"/unsubscribe", nil) + req = withURLParam(req, "id", issueID) + testHandler.UnsubscribeFromIssue(w, req) + + // List should be empty + w = httptest.NewRecorder() + req = newRequest("GET", "/api/issues/"+issueID+"/subscribers", nil) + req = withURLParam(req, "id", issueID) + testHandler.ListIssueSubscribers(w, req) + if w.Code != http.StatusOK { + t.Fatalf("ListIssueSubscribers: expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var subscribers []SubscriberResponse + json.NewDecoder(w.Body).Decode(&subscribers) + if len(subscribers) != 0 { + t.Fatalf("ListIssueSubscribers: expected 0 subscribers after unsubscribe, got %d", len(subscribers)) + } + }) +} diff --git a/server/internal/service/task.go b/server/internal/service/task.go index 65091d42..7fe74d6d 100644 --- a/server/internal/service/task.go +++ b/server/internal/service/task.go @@ -176,10 +176,6 @@ func (s *TaskService) CompleteTask(ctx context.Context, taskID pgtype.UUID, resu } } - if issue, err := s.Queries.GetIssue(ctx, task.IssueID); err == nil { - s.createInboxForIssueCreator(ctx, issue, task.AgentID, "task_completed", "attention", "Task completed: "+issue.Title, "") - } - // Reconcile agent status s.ReconcileAgentStatus(ctx, task.AgentID) @@ -218,10 +214,6 @@ func (s *TaskService) FailTask(ctx context.Context, taskID pgtype.UUID, errMsg s if errMsg != "" { s.createAgentComment(ctx, task.IssueID, task.AgentID, errMsg, "system") } - if issue, err := s.Queries.GetIssue(ctx, task.IssueID); err == nil { - s.createInboxForIssueCreator(ctx, issue, task.AgentID, "task_failed", "action_required", "Task failed: "+issue.Title, errMsg) - } - // Reconcile agent status s.ReconcileAgentStatus(ctx, task.AgentID) @@ -412,40 +404,12 @@ func (s *TaskService) createAgentComment(ctx context.Context, issueID, agentID p "type": comment.Type, "created_at": comment.CreatedAt.Time.Format("2006-01-02T15:04:05Z"), }, + "issue_title": issue.Title, + "issue_status": issue.Status, }, }) } -func (s *TaskService) createInboxForIssueCreator(ctx context.Context, issue db.Issue, agentID pgtype.UUID, itemType, severity, title, body string) { - if issue.CreatorType != "member" { - return - } - item, err := s.Queries.CreateInboxItem(ctx, db.CreateInboxItemParams{ - WorkspaceID: issue.WorkspaceID, - RecipientType: "member", - RecipientID: issue.CreatorID, - Type: itemType, - Severity: severity, - IssueID: issue.ID, - Title: title, - Body: util.PtrToText(&body), - ActorType: util.StrToText("agent"), - ActorID: agentID, - }) - if err != nil { - return - } - resp := inboxToMap(item) - resp["issue_status"] = issue.Status - s.Bus.Publish(events.Event{ - Type: protocol.EventInboxNew, - WorkspaceID: util.UUIDToString(issue.WorkspaceID), - ActorType: "agent", - ActorID: util.UUIDToString(agentID), - Payload: map[string]any{"item": resp}, - }) -} - func issueToMap(issue db.Issue) map[string]any { return map[string]any{ "id": util.UUIDToString(issue.ID), @@ -466,25 +430,6 @@ func issueToMap(issue db.Issue) map[string]any { } } -func inboxToMap(item db.InboxItem) map[string]any { - return map[string]any{ - "id": util.UUIDToString(item.ID), - "workspace_id": util.UUIDToString(item.WorkspaceID), - "recipient_type": item.RecipientType, - "recipient_id": util.UUIDToString(item.RecipientID), - "type": item.Type, - "severity": item.Severity, - "issue_id": util.UUIDToPtr(item.IssueID), - "title": item.Title, - "body": util.TextToPtr(item.Body), - "read": item.Read, - "archived": item.Archived, - "created_at": util.TimestampToString(item.CreatedAt), - "actor_type": util.TextToPtr(item.ActorType), - "actor_id": util.UUIDToPtr(item.ActorID), - } -} - // agentToMap builds a simple map for broadcasting agent status updates. func agentToMap(a db.Agent) map[string]any { var rc any diff --git a/server/migrations/015_issue_subscriber.down.sql b/server/migrations/015_issue_subscriber.down.sql new file mode 100644 index 00000000..84d835bb --- /dev/null +++ b/server/migrations/015_issue_subscriber.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS issue_subscriber; diff --git a/server/migrations/015_issue_subscriber.up.sql b/server/migrations/015_issue_subscriber.up.sql new file mode 100644 index 00000000..f263976b --- /dev/null +++ b/server/migrations/015_issue_subscriber.up.sql @@ -0,0 +1,11 @@ +-- Issue subscribers: tracks who is subscribed to notifications for an issue +CREATE TABLE issue_subscriber ( + issue_id UUID NOT NULL REFERENCES issue(id) ON DELETE CASCADE, + user_type TEXT NOT NULL CHECK (user_type IN ('member', 'agent')), + user_id UUID NOT NULL, + reason TEXT NOT NULL CHECK (reason IN ('creator', 'assignee', 'commenter', 'mentioned', 'manual')), + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (issue_id, user_type, user_id) +); + +CREATE INDEX idx_issue_subscriber_user ON issue_subscriber(user_type, user_id); diff --git a/server/migrations/016_backfill_subscribers.down.sql b/server/migrations/016_backfill_subscribers.down.sql new file mode 100644 index 00000000..d9621651 --- /dev/null +++ b/server/migrations/016_backfill_subscribers.down.sql @@ -0,0 +1 @@ +-- No-op: cannot distinguish backfilled from organic subscribers diff --git a/server/migrations/016_backfill_subscribers.up.sql b/server/migrations/016_backfill_subscribers.up.sql new file mode 100644 index 00000000..97271a29 --- /dev/null +++ b/server/migrations/016_backfill_subscribers.up.sql @@ -0,0 +1,12 @@ +-- Backfill creators as subscribers +INSERT INTO issue_subscriber (issue_id, user_type, user_id, reason) +SELECT id, creator_type, creator_id, 'creator' +FROM issue +ON CONFLICT DO NOTHING; + +-- Backfill assignees as subscribers +INSERT INTO issue_subscriber (issue_id, user_type, user_id, reason) +SELECT id, assignee_type, assignee_id, 'assignee' +FROM issue +WHERE assignee_type IS NOT NULL AND assignee_id IS NOT NULL +ON CONFLICT DO NOTHING; diff --git a/server/pkg/db/generated/models.go b/server/pkg/db/generated/models.go index ff125cdd..14fd5d05 100644 --- a/server/pkg/db/generated/models.go +++ b/server/pkg/db/generated/models.go @@ -166,6 +166,14 @@ type IssueLabel struct { Color string `json:"color"` } +type IssueSubscriber struct { + IssueID pgtype.UUID `json:"issue_id"` + UserType string `json:"user_type"` + UserID pgtype.UUID `json:"user_id"` + Reason string `json:"reason"` + CreatedAt pgtype.Timestamptz `json:"created_at"` +} + type IssueToLabel struct { IssueID pgtype.UUID `json:"issue_id"` LabelID pgtype.UUID `json:"label_id"` diff --git a/server/pkg/db/generated/subscriber.sql.go b/server/pkg/db/generated/subscriber.sql.go new file mode 100644 index 00000000..7cae383f --- /dev/null +++ b/server/pkg/db/generated/subscriber.sql.go @@ -0,0 +1,103 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.30.0 +// source: subscriber.sql + +package db + +import ( + "context" + + "github.com/jackc/pgx/v5/pgtype" +) + +const addIssueSubscriber = `-- name: AddIssueSubscriber :exec +INSERT INTO issue_subscriber (issue_id, user_type, user_id, reason) +VALUES ($1, $2, $3, $4) +ON CONFLICT (issue_id, user_type, user_id) DO NOTHING +` + +type AddIssueSubscriberParams struct { + IssueID pgtype.UUID `json:"issue_id"` + UserType string `json:"user_type"` + UserID pgtype.UUID `json:"user_id"` + Reason string `json:"reason"` +} + +func (q *Queries) AddIssueSubscriber(ctx context.Context, arg AddIssueSubscriberParams) error { + _, err := q.db.Exec(ctx, addIssueSubscriber, + arg.IssueID, + arg.UserType, + arg.UserID, + arg.Reason, + ) + return err +} + +const isIssueSubscriber = `-- name: IsIssueSubscriber :one +SELECT EXISTS( + SELECT 1 FROM issue_subscriber + WHERE issue_id = $1 AND user_type = $2 AND user_id = $3 +) AS subscribed +` + +type IsIssueSubscriberParams struct { + IssueID pgtype.UUID `json:"issue_id"` + UserType string `json:"user_type"` + UserID pgtype.UUID `json:"user_id"` +} + +func (q *Queries) IsIssueSubscriber(ctx context.Context, arg IsIssueSubscriberParams) (bool, error) { + row := q.db.QueryRow(ctx, isIssueSubscriber, arg.IssueID, arg.UserType, arg.UserID) + var subscribed bool + err := row.Scan(&subscribed) + return subscribed, err +} + +const listIssueSubscribers = `-- name: ListIssueSubscribers :many +SELECT issue_id, user_type, user_id, reason, created_at FROM issue_subscriber +WHERE issue_id = $1 +ORDER BY created_at +` + +func (q *Queries) ListIssueSubscribers(ctx context.Context, issueID pgtype.UUID) ([]IssueSubscriber, error) { + rows, err := q.db.Query(ctx, listIssueSubscribers, issueID) + if err != nil { + return nil, err + } + defer rows.Close() + items := []IssueSubscriber{} + for rows.Next() { + var i IssueSubscriber + if err := rows.Scan( + &i.IssueID, + &i.UserType, + &i.UserID, + &i.Reason, + &i.CreatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const removeIssueSubscriber = `-- name: RemoveIssueSubscriber :exec +DELETE FROM issue_subscriber +WHERE issue_id = $1 AND user_type = $2 AND user_id = $3 +` + +type RemoveIssueSubscriberParams struct { + IssueID pgtype.UUID `json:"issue_id"` + UserType string `json:"user_type"` + UserID pgtype.UUID `json:"user_id"` +} + +func (q *Queries) RemoveIssueSubscriber(ctx context.Context, arg RemoveIssueSubscriberParams) error { + _, err := q.db.Exec(ctx, removeIssueSubscriber, arg.IssueID, arg.UserType, arg.UserID) + return err +} diff --git a/server/pkg/db/queries/subscriber.sql b/server/pkg/db/queries/subscriber.sql new file mode 100644 index 00000000..b741b576 --- /dev/null +++ b/server/pkg/db/queries/subscriber.sql @@ -0,0 +1,19 @@ +-- name: AddIssueSubscriber :exec +INSERT INTO issue_subscriber (issue_id, user_type, user_id, reason) +VALUES ($1, $2, $3, $4) +ON CONFLICT (issue_id, user_type, user_id) DO NOTHING; + +-- name: RemoveIssueSubscriber :exec +DELETE FROM issue_subscriber +WHERE issue_id = $1 AND user_type = $2 AND user_id = $3; + +-- name: ListIssueSubscribers :many +SELECT * FROM issue_subscriber +WHERE issue_id = $1 +ORDER BY created_at; + +-- name: IsIssueSubscriber :one +SELECT EXISTS( + SELECT 1 FROM issue_subscriber + WHERE issue_id = $1 AND user_type = $2 AND user_id = $3 +) AS subscribed; diff --git a/server/pkg/protocol/events.go b/server/pkg/protocol/events.go index 31e40188..137151d7 100644 --- a/server/pkg/protocol/events.go +++ b/server/pkg/protocol/events.go @@ -39,6 +39,10 @@ const ( EventMemberUpdated = "member:updated" EventMemberRemoved = "member:removed" + // Subscriber events + EventSubscriberAdded = "subscriber:added" + EventSubscriberRemoved = "subscriber:removed" + // Daemon events EventDaemonHeartbeat = "daemon:heartbeat" EventDaemonRegister = "daemon:register"