feat(notifications): replace hardcoded inbox notifications with subscriber-driven model

Replace inbox_listeners.go with a subscriber-driven notification system:

- Add issue_subscriber table with auto-subscribe on create/assign/comment
- New subscriber_listeners.go: maintains subscriber data on domain events
- New notification_listeners.go: notifySubscribers (fanout to all subscribers
  minus actor) and notifyDirect (targeted, punches through unsubscribe)
- Subscriber API: list/subscribe/unsubscribe endpoints
- Frontend: subscribers section in issue detail sidebar with real-time sync
- Frontend: inbox notification grouping by (issue_id, type, actor_id)
- Remove createInboxForIssueCreator from task.go (unified through event bus)
- 21 new Go tests, all passing

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Naiyuan Qing 2026-03-28 19:33:20 +08:00
parent 5fc03c61fe
commit bfe9498def
26 changed files with 2144 additions and 457 deletions

View file

@ -131,15 +131,31 @@ export default function InboxPage() {
id: "multica_inbox_layout",
});
// Sort: severity first, then newest first
// Group by (issue_id, type, actor_id) and take the latest from each group
const items = useMemo(() => {
return [...storeItems]
.filter((i) => !i.archived)
.sort(
(a, b) =>
severityOrder[a.severity] - severityOrder[b.severity] ||
new Date(b.created_at).getTime() - new Date(a.created_at).getTime()
const active = storeItems.filter((i) => !i.archived);
const groups = new Map<string, InboxItem[]>();
active.forEach((item) => {
const key = `${item.issue_id ?? "none"}|${item.type}|${item.actor_id ?? "none"}`;
const group = groups.get(key) ?? [];
group.push(item);
groups.set(key, group);
});
const merged: InboxItem[] = [];
groups.forEach((group) => {
const sorted = group.sort(
(a, b) => new Date(b.created_at).getTime() - new Date(a.created_at).getTime()
);
const latest = sorted[0];
if (latest) merged.push(latest);
});
return merged.sort(
(a, b) =>
severityOrder[a.severity] - severityOrder[b.severity] ||
new Date(b.created_at).getTime() - new Date(a.created_at).getTime()
);
}, [storeItems]);
const selected = items.find((i) => i.id === selectedId) ?? null;

View file

@ -124,6 +124,9 @@ vi.mock("@/shared/api", () => ({
deleteComment: (...args: any[]) => mockDeleteComment(...args),
deleteIssue: (...args: any[]) => mockDeleteIssue(...args),
updateIssue: (...args: any[]) => mockUpdateIssue(...args),
listIssueSubscribers: vi.fn().mockResolvedValue([]),
subscribeToIssue: vi.fn().mockResolvedValue(undefined),
unsubscribeFromIssue: vi.fn().mockResolvedValue(undefined),
},
}));

View file

@ -8,7 +8,6 @@ import {
ArrowUp,
Bot,
Calendar,
ChevronDown,
ChevronLeft,
ChevronRight,
Link2,
@ -55,7 +54,7 @@ import {
TooltipContent,
} from "@/components/ui/tooltip";
import { ActorAvatar } from "@/components/common/actor-avatar";
import type { Issue, Comment, UpdateIssueRequest, IssueStatus, IssuePriority } from "@/shared/types";
import type { Issue, Comment, IssueSubscriber, UpdateIssueRequest, IssueStatus, IssuePriority } from "@/shared/types";
import { ALL_STATUSES, STATUS_CONFIG, PRIORITY_ORDER, PRIORITY_CONFIG } from "@/features/issues/config";
import { StatusIcon, PriorityIcon, DueDatePicker } from "@/features/issues/components";
import { api } from "@/shared/api";
@ -63,7 +62,7 @@ import { useAuthStore } from "@/features/auth";
import { useWorkspaceStore, useActorName } from "@/features/workspace";
import { useWSEvent } from "@/features/realtime";
import { useIssueStore } from "@/features/issues";
import type { CommentCreatedPayload, CommentUpdatedPayload, CommentDeletedPayload } from "@/shared/types";
import type { CommentCreatedPayload, CommentUpdatedPayload, CommentDeletedPayload, SubscriberAddedPayload, SubscriberRemovedPayload } from "@/shared/types";
// ---------------------------------------------------------------------------
// Helpers
@ -144,6 +143,7 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) {
const [sidebarOpen, setSidebarOpen] = useState(true);
const [issue, setIssue] = useState<Issue | null>(null);
const [comments, setComments] = useState<Comment[]>([]);
const [subscribers, setSubscribers] = useState<IssueSubscriber[]>([]);
const [loading, setLoading] = useState(true);
const [commentEmpty, setCommentEmpty] = useState(true);
const commentEditorRef = useRef<RichTextEditorRef>(null);
@ -154,6 +154,8 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) {
const [editingTitle, setEditingTitle] = useState(false);
const [titleDraft, setTitleDraft] = useState("");
const [deleteDialogOpen, setDeleteDialogOpen] = useState(false);
const [propertiesOpen, setPropertiesOpen] = useState(true);
const [detailsOpen, setDetailsOpen] = useState(true);
// Watch the global issue store for real-time updates from other users/agents
const storeIssue = useIssueStore((s) => s.issues.find((i) => i.id === id));
@ -167,11 +169,13 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) {
useEffect(() => {
setIssue(null);
setComments([]);
setSubscribers([]);
setLoading(true);
Promise.all([api.getIssue(id), api.listComments(id)])
.then(([iss, cmts]) => {
Promise.all([api.getIssue(id), api.listComments(id), api.listIssueSubscribers(id)])
.then(([iss, cmts, subs]) => {
setIssue(iss);
setComments(cmts);
setSubscribers(subs);
})
.catch(console.error)
.finally(() => setLoading(false));
@ -257,6 +261,29 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) {
}
};
// Subscriber state
const isSubscribed = subscribers.some(
(s) => s.user_type === "member" && s.user_id === user?.id
);
const handleToggleSubscribe = async () => {
if (!user || !issue) return;
try {
if (isSubscribed) {
await api.unsubscribeFromIssue(id);
setSubscribers((prev) => prev.filter((s) => s.user_id !== user.id));
} else {
await api.subscribeToIssue(id);
setSubscribers((prev) => [
...prev,
{ issue_id: id, user_type: "member" as const, user_id: user.id, reason: "manual" as const, created_at: new Date().toISOString() },
]);
}
} catch {
// silently fail
}
};
// Real-time comment updates
useWSEvent(
"comment:created",
@ -292,6 +319,34 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) {
}, [id]),
);
// Real-time subscriber updates
useWSEvent(
"subscriber:added",
useCallback((payload: unknown) => {
const p = payload as SubscriberAddedPayload;
if (p.issue_id !== id) return;
setSubscribers((prev) => {
if (prev.some((s) => s.user_id === p.user_id)) return prev;
return [...prev, {
issue_id: p.issue_id,
user_type: p.user_type as "member" | "agent",
user_id: p.user_id,
reason: p.reason as IssueSubscriber["reason"],
created_at: new Date().toISOString(),
}];
});
}, [id]),
);
useWSEvent(
"subscriber:removed",
useCallback((payload: unknown) => {
const p = payload as SubscriberRemovedPayload;
if (p.issue_id !== id) return;
setSubscribers((prev) => prev.filter((s) => s.user_id !== p.user_id));
}, [id]),
);
if (loading) {
return (
<div className="flex flex-1 min-h-0 items-center justify-center text-sm text-muted-foreground">
@ -742,14 +797,14 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) {
{/* Properties section */}
<div>
<button
className="flex w-full items-center gap-1 text-xs font-medium text-muted-foreground hover:text-foreground transition-colors mb-2"
onClick={() => {/* placeholder for future collapse */}}
className={`flex w-full items-center gap-1 text-xs font-medium transition-colors mb-2 ${propertiesOpen ? "" : "text-muted-foreground hover:text-foreground"}`}
onClick={() => setPropertiesOpen(!propertiesOpen)}
>
<ChevronRight className={`h-3.5 w-3.5 shrink-0 text-muted-foreground transition-transform ${propertiesOpen ? "rotate-90" : ""}`} />
Properties
<ChevronDown className="h-3 w-3" />
</button>
<div className="space-y-0.5">
{propertiesOpen && <div className="space-y-0.5 pl-2">
{/* Status */}
<PropRow label="Status">
<DropdownMenu>
@ -855,20 +910,20 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) {
onUpdate={handleUpdateField}
/>
</PropRow>
</div>
</div>}
</div>
{/* Details section */}
<div>
<button
className="flex w-full items-center gap-1 text-xs font-medium text-muted-foreground hover:text-foreground transition-colors mb-2"
onClick={() => {/* placeholder for future collapse */}}
className={`flex w-full items-center gap-1 text-xs font-medium transition-colors mb-2 ${detailsOpen ? "" : "text-muted-foreground hover:text-foreground"}`}
onClick={() => setDetailsOpen(!detailsOpen)}
>
<ChevronRight className={`h-3.5 w-3.5 shrink-0 text-muted-foreground transition-transform ${detailsOpen ? "rotate-90" : ""}`} />
Details
<ChevronDown className="h-3 w-3" />
</button>
<div className="space-y-0.5">
{detailsOpen && <div className="space-y-0.5 pl-2">
<PropRow label="Created by">
<ActorAvatar
actorType={issue.creator_type}
@ -883,7 +938,29 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) {
<PropRow label="Updated">
<span className="text-muted-foreground">{shortDate(issue.updated_at)}</span>
</PropRow>
</div>}
</div>
{/* Subscribers section */}
<div>
<h4 className="text-xs font-medium text-muted-foreground mb-2">Subscribers</h4>
<div className="space-y-1 pl-2">
{subscribers.map((sub) => (
<div key={sub.user_id} className="flex items-center gap-2 text-sm">
<ActorAvatar actorType={sub.user_type} actorId={sub.user_id} size={18} />
<span className="truncate">{getActorName(sub.user_type, sub.user_id)}</span>
<span className="text-xs text-muted-foreground">({sub.reason})</span>
</div>
))}
</div>
<Button
variant="ghost"
size="sm"
className="mt-1 h-7 text-xs w-full"
onClick={handleToggleSubscribe}
>
{isSubscribed ? "Unsubscribe" : "Subscribe"}
</Button>
</div>
</div>
</div>

View file

@ -15,6 +15,7 @@ import type {
DaemonPairingSession,
ApproveDaemonPairingSessionRequest,
InboxItem,
IssueSubscriber,
Comment,
Workspace,
WorkspaceRepo,
@ -200,6 +201,19 @@ export class ApiClient {
await this.fetch(`/api/comments/${commentId}`, { method: "DELETE" });
}
// Subscribers
async listIssueSubscribers(issueId: string): Promise<IssueSubscriber[]> {
return this.fetch(`/api/issues/${issueId}/subscribers`);
}
async subscribeToIssue(issueId: string): Promise<void> {
await this.fetch(`/api/issues/${issueId}/subscribe`, { method: "POST" });
}
async unsubscribeFromIssue(issueId: string): Promise<void> {
await this.fetch(`/api/issues/${issueId}/unsubscribe`, { method: "POST" });
}
// Agents
async listAgents(params?: { workspace_id?: string }): Promise<Agent[]> {
const search = new URLSearchParams();

View file

@ -33,7 +33,9 @@ export type WSEventType =
| "daemon:register"
| "skill:created"
| "skill:updated"
| "skill:deleted";
| "skill:deleted"
| "subscriber:added"
| "subscriber:removed";
export interface WSMessage<T = unknown> {
type: WSEventType;
@ -124,3 +126,16 @@ export interface MemberRemovedPayload {
user_id: string;
workspace_id: string;
}
export interface SubscriberAddedPayload {
issue_id: string;
user_type: string;
user_id: string;
reason: string;
}
export interface SubscriberRemovedPayload {
issue_id: string;
user_type: string;
user_id: string;
}

View file

@ -24,6 +24,7 @@ export type {
export type { Workspace, WorkspaceRepo, Member, MemberRole, User, MemberWithUser } from "./workspace";
export type { InboxItem, InboxSeverity, InboxItemType } from "./inbox";
export type { Comment, CommentType, CommentAuthorType } from "./comment";
export type { IssueSubscriber } from "./subscriber";
export type { DaemonPairingSession, DaemonPairingSessionStatus, ApproveDaemonPairingSessionRequest } from "./daemon";
export type * from "./events";
export type * from "./api";

View file

@ -0,0 +1,7 @@
export interface IssueSubscriber {
issue_id: string;
user_type: "member" | "agent";
user_id: string;
reason: "creator" | "assignee" | "commenter" | "mentioned" | "manual";
created_at: string;
}

View file

@ -1,377 +0,0 @@
package main
import (
"context"
"log/slog"
"regexp"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/handler"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// mention represents a parsed @mention from markdown content.
type mention struct {
Type string // "member" or "agent"
ID string // user_id or agent_id
}
// mentionRe matches [@Label](mention://type/id) in markdown.
var mentionRe = regexp.MustCompile(`\[@[^\]]*\]\(mention://(member|agent)/([0-9a-fA-F-]+)\)`)
// parseMentions extracts mentions from markdown content.
func parseMentions(content string) []mention {
matches := mentionRe.FindAllStringSubmatch(content, -1)
seen := make(map[string]bool)
var result []mention
for _, m := range matches {
key := m[1] + ":" + m[2]
if seen[key] {
continue
}
seen[key] = true
result = append(result, mention{Type: m[1], ID: m[2]})
}
return result
}
// notifyMentionedMembers creates inbox items for each @mentioned member,
// excluding the actor and any IDs in the skip set.
func notifyMentionedMembers(
bus *events.Bus,
queries *db.Queries,
e events.Event,
mentions []mention,
issueID string,
issueTitle string,
issueStatus string,
title string,
skip map[string]bool,
) {
for _, m := range mentions {
if m.Type != "member" {
continue
}
if m.ID == e.ActorID || skip[m.ID] {
continue
}
item, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{
WorkspaceID: parseUUID(e.WorkspaceID),
RecipientType: "member",
RecipientID: parseUUID(m.ID),
Type: "mentioned",
Severity: "info",
IssueID: parseUUID(issueID),
Title: title,
ActorType: util.StrToText(e.ActorType),
ActorID: parseUUID(e.ActorID),
})
if err != nil {
slog.Error("mention inbox creation failed", "mentioned_id", m.ID, "error", err)
continue
}
resp := inboxItemToResponse(item)
resp["issue_status"] = issueStatus
bus.Publish(events.Event{
Type: protocol.EventInboxNew,
WorkspaceID: e.WorkspaceID,
ActorType: e.ActorType,
ActorID: e.ActorID,
Payload: map[string]any{"item": resp},
})
}
}
// registerInboxListeners wires up event bus listeners that create inbox
// notifications. This replaces the inline CreateInboxItem calls that were
// previously scattered across issue and comment handlers.
func registerInboxListeners(bus *events.Bus, queries *db.Queries) {
// issue:created — notify assignee about new assignment + @mentions in description
bus.Subscribe(protocol.EventIssueCreated, func(e events.Event) {
payload, ok := e.Payload.(map[string]any)
if !ok {
return
}
issue, ok := payload["issue"].(handler.IssueResponse)
if !ok {
return
}
// Track who already got notified to avoid duplicates
skip := map[string]bool{e.ActorID: true}
// Notify assignee
if issue.AssigneeType != nil && issue.AssigneeID != nil {
skip[*issue.AssigneeID] = true
item, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{
WorkspaceID: parseUUID(issue.WorkspaceID),
RecipientType: *issue.AssigneeType,
RecipientID: parseUUID(*issue.AssigneeID),
Type: "issue_assigned",
Severity: "action_required",
IssueID: parseUUID(issue.ID),
Title: "New issue assigned: " + issue.Title,
Body: util.PtrToText(issue.Description),
ActorType: util.StrToText(e.ActorType),
ActorID: parseUUID(e.ActorID),
})
if err != nil {
slog.Error("inbox item creation failed", "event", "issue:created", "error", err)
} else {
resp := inboxItemToResponse(item)
resp["issue_status"] = issue.Status
bus.Publish(events.Event{
Type: protocol.EventInboxNew,
WorkspaceID: e.WorkspaceID,
ActorType: e.ActorType,
ActorID: e.ActorID,
Payload: map[string]any{"item": resp},
})
}
}
// Notify @mentions in description
if issue.Description != nil && *issue.Description != "" {
mentions := parseMentions(*issue.Description)
notifyMentionedMembers(bus, queries, e, mentions, issue.ID, issue.Title, issue.Status,
"Mentioned in: "+issue.Title, skip)
}
})
// issue:updated — notify on assignee change and status change
bus.Subscribe(protocol.EventIssueUpdated, func(e events.Event) {
payload, ok := e.Payload.(map[string]any)
if !ok {
return
}
issue, ok := payload["issue"].(handler.IssueResponse)
if !ok {
return
}
assigneeChanged, _ := payload["assignee_changed"].(bool)
statusChanged, _ := payload["status_changed"].(bool)
descriptionChanged, _ := payload["description_changed"].(bool)
prevAssigneeType, _ := payload["prev_assignee_type"].(*string)
prevAssigneeID, _ := payload["prev_assignee_id"].(*string)
prevDescription, _ := payload["prev_description"].(*string)
creatorType, _ := payload["creator_type"].(string)
creatorID, _ := payload["creator_id"].(string)
actorID := e.ActorID // the user who made the change
if assigneeChanged {
// Notify old assignee about unassignment
if prevAssigneeType != nil && prevAssigneeID != nil &&
*prevAssigneeType == "member" && *prevAssigneeID != actorID {
oldItem, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{
WorkspaceID: parseUUID(e.WorkspaceID),
RecipientType: "member",
RecipientID: parseUUID(*prevAssigneeID),
Type: "status_change",
Severity: "info",
IssueID: parseUUID(issue.ID),
Title: "Unassigned from: " + issue.Title,
ActorType: util.StrToText(e.ActorType),
ActorID: parseUUID(e.ActorID),
})
if err == nil {
oldResp := inboxItemToResponse(oldItem)
oldResp["issue_status"] = issue.Status
bus.Publish(events.Event{
Type: protocol.EventInboxNew,
WorkspaceID: e.WorkspaceID,
ActorType: e.ActorType,
ActorID: actorID,
Payload: map[string]any{"item": oldResp},
})
}
}
// Notify new assignee about assignment
if issue.AssigneeType != nil && issue.AssigneeID != nil {
newItem, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{
WorkspaceID: parseUUID(e.WorkspaceID),
RecipientType: *issue.AssigneeType,
RecipientID: parseUUID(*issue.AssigneeID),
Type: "issue_assigned",
Severity: "action_required",
IssueID: parseUUID(issue.ID),
Title: "Assigned to you: " + issue.Title,
ActorType: util.StrToText(e.ActorType),
ActorID: parseUUID(e.ActorID),
})
if err == nil {
newResp := inboxItemToResponse(newItem)
newResp["issue_status"] = issue.Status
bus.Publish(events.Event{
Type: protocol.EventInboxNew,
WorkspaceID: e.WorkspaceID,
ActorType: e.ActorType,
ActorID: actorID,
Payload: map[string]any{"item": newResp},
})
}
}
}
if statusChanged {
// Notify assignee about status change
if issue.AssigneeType != nil && issue.AssigneeID != nil {
aItem, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{
WorkspaceID: parseUUID(e.WorkspaceID),
RecipientType: *issue.AssigneeType,
RecipientID: parseUUID(*issue.AssigneeID),
Type: "status_change",
Severity: "info",
IssueID: parseUUID(issue.ID),
Title: issue.Title + " moved to " + issue.Status,
ActorType: util.StrToText(e.ActorType),
ActorID: parseUUID(e.ActorID),
})
if err == nil {
aResp := inboxItemToResponse(aItem)
aResp["issue_status"] = issue.Status
bus.Publish(events.Event{
Type: protocol.EventInboxNew,
WorkspaceID: e.WorkspaceID,
ActorType: e.ActorType,
ActorID: actorID,
Payload: map[string]any{"item": aResp},
})
}
}
// Notify creator about status change (if creator is member and != the person making change)
if creatorType == "member" && creatorID != actorID {
// Don't double-notify if creator is also the assignee
isAlsoAssignee := prevAssigneeID != nil && *prevAssigneeID == creatorID
if !isAlsoAssignee {
cItem, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{
WorkspaceID: parseUUID(e.WorkspaceID),
RecipientType: "member",
RecipientID: parseUUID(creatorID),
Type: "status_change",
Severity: "info",
IssueID: parseUUID(issue.ID),
Title: "Status changed: " + issue.Title,
ActorType: util.StrToText(e.ActorType),
ActorID: parseUUID(e.ActorID),
})
if err == nil {
cResp := inboxItemToResponse(cItem)
cResp["issue_status"] = issue.Status
bus.Publish(events.Event{
Type: protocol.EventInboxNew,
WorkspaceID: e.WorkspaceID,
ActorType: e.ActorType,
ActorID: actorID,
Payload: map[string]any{"item": cResp},
})
}
}
}
}
// Notify NEW @mentions in description (only mentions that weren't in previous description)
if descriptionChanged && issue.Description != nil {
newMentions := parseMentions(*issue.Description)
if len(newMentions) > 0 {
// Build set of previously mentioned IDs
prevMentioned := map[string]bool{}
if prevDescription != nil {
for _, m := range parseMentions(*prevDescription) {
prevMentioned[m.Type+":"+m.ID] = true
}
}
// Filter to only new mentions
var added []mention
for _, m := range newMentions {
if !prevMentioned[m.Type+":"+m.ID] {
added = append(added, m)
}
}
skip := map[string]bool{actorID: true}
notifyMentionedMembers(bus, queries, e, added, issue.ID, issue.Title, issue.Status,
"Mentioned in: "+issue.Title, skip)
}
}
})
// comment:created — notify issue assignee + @mentions in comment
bus.Subscribe(protocol.EventCommentCreated, func(e events.Event) {
payload, ok := e.Payload.(map[string]any)
if !ok {
return
}
comment, ok := payload["comment"].(handler.CommentResponse)
if !ok {
return
}
issueTitle, _ := payload["issue_title"].(string)
issueAssigneeType, _ := payload["issue_assignee_type"].(*string)
issueAssigneeID, _ := payload["issue_assignee_id"].(*string)
issueStatus, _ := payload["issue_status"].(string)
// Track who already got notified
skip := map[string]bool{e.ActorID: true}
// Notify assignee (if member and not the commenter)
if issueAssigneeType != nil && issueAssigneeID != nil &&
*issueAssigneeType == "member" && *issueAssigneeID != e.ActorID {
skip[*issueAssigneeID] = true
item, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{
WorkspaceID: parseUUID(e.WorkspaceID),
RecipientType: "member",
RecipientID: parseUUID(*issueAssigneeID),
Type: "mentioned",
Severity: "info",
IssueID: parseUUID(comment.IssueID),
Title: "New comment on: " + issueTitle,
Body: util.StrToText(comment.Content),
ActorType: util.StrToText(e.ActorType),
ActorID: parseUUID(e.ActorID),
})
if err != nil {
slog.Error("inbox item creation failed", "event", "comment:created", "error", err)
} else {
commentResp := inboxItemToResponse(item)
commentResp["issue_status"] = issueStatus
bus.Publish(events.Event{
Type: protocol.EventInboxNew,
WorkspaceID: e.WorkspaceID,
ActorType: e.ActorType,
ActorID: e.ActorID,
Payload: map[string]any{"item": commentResp},
})
}
}
// Notify @mentions in comment content
mentions := parseMentions(comment.Content)
notifyMentionedMembers(bus, queries, e, mentions, comment.IssueID, issueTitle, issueStatus,
"Mentioned in comment: "+issueTitle, skip)
})
}
// inboxItemToResponse converts a db.InboxItem into a map suitable for
// JSON-serializable event payloads (mirrors handler.inboxToResponse fields).
func inboxItemToResponse(item db.InboxItem) map[string]any {
return map[string]any{
"id": util.UUIDToString(item.ID),
"workspace_id": util.UUIDToString(item.WorkspaceID),
"recipient_type": item.RecipientType,
"recipient_id": util.UUIDToString(item.RecipientID),
"type": item.Type,
"severity": item.Severity,
"issue_id": util.UUIDToPtr(item.IssueID),
"title": item.Title,
"body": util.TextToPtr(item.Body),
"read": item.Read,
"archived": item.Archived,
"created_at": util.TimestampToString(item.CreatedAt),
"actor_type": util.TextToPtr(item.ActorType),
"actor_id": util.UUIDToPtr(item.ActorID),
}
}

View file

@ -35,6 +35,8 @@ func registerListeners(bus *events.Bus, hub *realtime.Hub) {
protocol.EventMemberAdded,
protocol.EventMemberUpdated,
protocol.EventMemberRemoved,
protocol.EventSubscriberAdded,
protocol.EventSubscriberRemoved,
}
for _, et := range allEvents {

View file

@ -51,7 +51,8 @@ func main() {
registerListeners(bus, hub)
queries := db.New(pool)
registerInboxListeners(bus, queries)
registerSubscriberListeners(bus, queries)
registerNotificationListeners(bus, queries)
r := NewRouter(pool, hub, bus)

View file

@ -0,0 +1,458 @@
package main
import (
"context"
"log/slog"
"regexp"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/handler"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// mention represents a parsed @mention from markdown content.
type mention struct {
Type string // "member" or "agent"
ID string // user_id or agent_id
}
// mentionRe matches [@Label](mention://type/id) in markdown.
var mentionRe = regexp.MustCompile(`\[@[^\]]*\]\(mention://(member|agent)/([0-9a-fA-F-]+)\)`)
// parseMentions extracts mentions from markdown content.
func parseMentions(content string) []mention {
matches := mentionRe.FindAllStringSubmatch(content, -1)
seen := make(map[string]bool)
var result []mention
for _, m := range matches {
key := m[1] + ":" + m[2]
if seen[key] {
continue
}
seen[key] = true
result = append(result, mention{Type: m[1], ID: m[2]})
}
return result
}
// notifySubscribers queries the subscriber table for an issue, excludes the
// actor and any extra IDs, and creates inbox items for each remaining member
// subscriber. Publishes an inbox:new event for each notification.
func notifySubscribers(
ctx context.Context,
queries *db.Queries,
bus *events.Bus,
issueID string,
workspaceID string,
e events.Event,
exclude map[string]bool,
notifType string,
severity string,
title string,
body string,
) {
subs, err := queries.ListIssueSubscribers(ctx, parseUUID(issueID))
if err != nil {
slog.Error("failed to list subscribers for notification",
"issue_id", issueID, "error", err)
return
}
for _, sub := range subs {
// Only notify member-type subscribers (not agents)
if sub.UserType != "member" {
continue
}
subID := util.UUIDToString(sub.UserID)
// Skip the actor
if subID == e.ActorID {
continue
}
// Skip any extra excluded IDs
if exclude[subID] {
continue
}
item, err := queries.CreateInboxItem(ctx, db.CreateInboxItemParams{
WorkspaceID: parseUUID(workspaceID),
RecipientType: "member",
RecipientID: sub.UserID,
Type: notifType,
Severity: severity,
IssueID: parseUUID(issueID),
Title: title,
Body: util.StrToText(body),
ActorType: util.StrToText(e.ActorType),
ActorID: parseUUID(e.ActorID),
})
if err != nil {
slog.Error("subscriber notification creation failed",
"subscriber_id", subID, "type", notifType, "error", err)
continue
}
resp := inboxItemToResponse(item)
bus.Publish(events.Event{
Type: protocol.EventInboxNew,
WorkspaceID: workspaceID,
ActorType: e.ActorType,
ActorID: e.ActorID,
Payload: map[string]any{"item": resp},
})
}
}
// notifyDirect creates an inbox item for a specific recipient. Skips if the
// recipient is the actor. Publishes an inbox:new event on success.
func notifyDirect(
ctx context.Context,
queries *db.Queries,
bus *events.Bus,
recipientType string,
recipientID string,
workspaceID string,
e events.Event,
issueID string,
notifType string,
severity string,
title string,
body string,
) {
// Skip if recipient is the actor
if recipientID == e.ActorID {
return
}
item, err := queries.CreateInboxItem(ctx, db.CreateInboxItemParams{
WorkspaceID: parseUUID(workspaceID),
RecipientType: recipientType,
RecipientID: parseUUID(recipientID),
Type: notifType,
Severity: severity,
IssueID: parseUUID(issueID),
Title: title,
Body: util.StrToText(body),
ActorType: util.StrToText(e.ActorType),
ActorID: parseUUID(e.ActorID),
})
if err != nil {
slog.Error("direct notification creation failed",
"recipient_id", recipientID, "type", notifType, "error", err)
return
}
resp := inboxItemToResponse(item)
bus.Publish(events.Event{
Type: protocol.EventInboxNew,
WorkspaceID: workspaceID,
ActorType: e.ActorType,
ActorID: e.ActorID,
Payload: map[string]any{"item": resp},
})
}
// notifyMentionedMembers creates inbox items for each @mentioned member,
// excluding the actor and any IDs in the skip set.
func notifyMentionedMembers(
bus *events.Bus,
queries *db.Queries,
e events.Event,
mentions []mention,
issueID string,
issueTitle string,
issueStatus string,
title string,
skip map[string]bool,
) {
for _, m := range mentions {
if m.Type != "member" {
continue
}
if m.ID == e.ActorID || skip[m.ID] {
continue
}
item, err := queries.CreateInboxItem(context.Background(), db.CreateInboxItemParams{
WorkspaceID: parseUUID(e.WorkspaceID),
RecipientType: "member",
RecipientID: parseUUID(m.ID),
Type: "mentioned",
Severity: "info",
IssueID: parseUUID(issueID),
Title: title,
ActorType: util.StrToText(e.ActorType),
ActorID: parseUUID(e.ActorID),
})
if err != nil {
slog.Error("mention inbox creation failed", "mentioned_id", m.ID, "error", err)
continue
}
resp := inboxItemToResponse(item)
resp["issue_status"] = issueStatus
bus.Publish(events.Event{
Type: protocol.EventInboxNew,
WorkspaceID: e.WorkspaceID,
ActorType: e.ActorType,
ActorID: e.ActorID,
Payload: map[string]any{"item": resp},
})
}
}
// registerNotificationListeners wires up event bus listeners that create inbox
// notifications using the subscriber table. This replaces the old hardcoded
// notification logic from inbox_listeners.go.
func registerNotificationListeners(bus *events.Bus, queries *db.Queries) {
ctx := context.Background()
// issue:created — Direct notification to assignee if assignee != actor
bus.Subscribe(protocol.EventIssueCreated, func(e events.Event) {
payload, ok := e.Payload.(map[string]any)
if !ok {
return
}
issue, ok := payload["issue"].(handler.IssueResponse)
if !ok {
return
}
// Track who already got notified to avoid duplicates
skip := map[string]bool{e.ActorID: true}
// Direct notification to assignee
if issue.AssigneeType != nil && issue.AssigneeID != nil {
skip[*issue.AssigneeID] = true
notifyDirect(ctx, queries, bus,
*issue.AssigneeType, *issue.AssigneeID,
issue.WorkspaceID, e, issue.ID,
"issue_assigned", "action_required",
"New issue assigned: "+issue.Title,
"",
)
}
// Notify @mentions in description
if issue.Description != nil && *issue.Description != "" {
mentions := parseMentions(*issue.Description)
notifyMentionedMembers(bus, queries, e, mentions, issue.ID, issue.Title, issue.Status,
"Mentioned in: "+issue.Title, skip)
}
})
// issue:updated — handle assignee changes and status changes
bus.Subscribe(protocol.EventIssueUpdated, func(e events.Event) {
payload, ok := e.Payload.(map[string]any)
if !ok {
return
}
issue, ok := payload["issue"].(handler.IssueResponse)
if !ok {
return
}
assigneeChanged, _ := payload["assignee_changed"].(bool)
statusChanged, _ := payload["status_changed"].(bool)
descriptionChanged, _ := payload["description_changed"].(bool)
prevAssigneeType, _ := payload["prev_assignee_type"].(*string)
prevAssigneeID, _ := payload["prev_assignee_id"].(*string)
prevDescription, _ := payload["prev_description"].(*string)
if assigneeChanged {
// Direct: notify new assignee about assignment
if issue.AssigneeType != nil && issue.AssigneeID != nil {
notifyDirect(ctx, queries, bus,
*issue.AssigneeType, *issue.AssigneeID,
e.WorkspaceID, e, issue.ID,
"issue_assigned", "action_required",
"Assigned to you: "+issue.Title,
"",
)
}
// Direct: notify old assignee about unassignment
if prevAssigneeType != nil && prevAssigneeID != nil && *prevAssigneeType == "member" {
notifyDirect(ctx, queries, bus,
"member", *prevAssigneeID,
e.WorkspaceID, e, issue.ID,
"unassigned", "info",
"Unassigned from: "+issue.Title,
"",
)
}
// Subscriber: notify remaining subscribers about assignee change,
// excluding actor, old assignee, and new assignee
exclude := map[string]bool{}
if prevAssigneeID != nil {
exclude[*prevAssigneeID] = true
}
if issue.AssigneeID != nil {
exclude[*issue.AssigneeID] = true
}
notifySubscribers(ctx, queries, bus, issue.ID, e.WorkspaceID, e,
exclude, "assignee_changed", "info",
"Assignee changed: "+issue.Title, "")
}
if statusChanged {
// Subscriber: notify all subscribers except actor
notifySubscribers(ctx, queries, bus, issue.ID, e.WorkspaceID, e,
nil, "status_changed", "info",
issue.Title+" moved to "+issue.Status, "")
}
// Notify NEW @mentions in description
if descriptionChanged && issue.Description != nil {
newMentions := parseMentions(*issue.Description)
if len(newMentions) > 0 {
prevMentioned := map[string]bool{}
if prevDescription != nil {
for _, m := range parseMentions(*prevDescription) {
prevMentioned[m.Type+":"+m.ID] = true
}
}
var added []mention
for _, m := range newMentions {
if !prevMentioned[m.Type+":"+m.ID] {
added = append(added, m)
}
}
skip := map[string]bool{e.ActorID: true}
notifyMentionedMembers(bus, queries, e, added, issue.ID, issue.Title, issue.Status,
"Mentioned in: "+issue.Title, skip)
}
}
})
// comment:created — notify all subscribers except the commenter
bus.Subscribe(protocol.EventCommentCreated, func(e events.Event) {
payload, ok := e.Payload.(map[string]any)
if !ok {
return
}
// The comment payload can come as handler.CommentResponse from the
// HTTP handler, or as map[string]any from the agent comment path in
// task.go. Handle both.
var issueID, commentContent string
switch c := payload["comment"].(type) {
case handler.CommentResponse:
issueID = c.IssueID
commentContent = c.Content
case map[string]any:
issueID, _ = c["issue_id"].(string)
commentContent, _ = c["content"].(string)
default:
return
}
issueTitle, _ := payload["issue_title"].(string)
notifySubscribers(ctx, queries, bus, issueID, e.WorkspaceID, e,
nil, "new_comment", "info",
"New comment on: "+issueTitle, commentContent)
// Notify @mentions in comment content
mentions := parseMentions(commentContent)
if len(mentions) > 0 {
issueStatus, _ := payload["issue_status"].(string)
skip := map[string]bool{e.ActorID: true}
notifyMentionedMembers(bus, queries, e, mentions, issueID, issueTitle, issueStatus,
"Mentioned in comment: "+issueTitle, skip)
}
})
// task:completed — notify all subscribers except the agent
bus.Subscribe(protocol.EventTaskCompleted, func(e events.Event) {
payload, ok := e.Payload.(map[string]any)
if !ok {
return
}
agentID, _ := payload["agent_id"].(string)
issueID, _ := payload["issue_id"].(string)
if issueID == "" {
return
}
// Look up issue to get the title
issue, err := queries.GetIssue(ctx, parseUUID(issueID))
if err != nil {
slog.Error("task:completed notification: failed to get issue", "issue_id", issueID, "error", err)
return
}
// Use the agent ID as an exclusion (since the agent did the work)
exclude := map[string]bool{}
if agentID != "" {
exclude[agentID] = true
}
notifySubscribers(ctx, queries, bus, issueID, e.WorkspaceID,
events.Event{
Type: e.Type,
WorkspaceID: e.WorkspaceID,
ActorType: "agent",
ActorID: agentID,
},
exclude, "task_completed", "attention",
"Task completed: "+issue.Title, "")
})
// task:failed — notify all subscribers except the agent
bus.Subscribe(protocol.EventTaskFailed, func(e events.Event) {
payload, ok := e.Payload.(map[string]any)
if !ok {
return
}
agentID, _ := payload["agent_id"].(string)
issueID, _ := payload["issue_id"].(string)
if issueID == "" {
return
}
issue, err := queries.GetIssue(ctx, parseUUID(issueID))
if err != nil {
slog.Error("task:failed notification: failed to get issue", "issue_id", issueID, "error", err)
return
}
exclude := map[string]bool{}
if agentID != "" {
exclude[agentID] = true
}
notifySubscribers(ctx, queries, bus, issueID, e.WorkspaceID,
events.Event{
Type: e.Type,
WorkspaceID: e.WorkspaceID,
ActorType: "agent",
ActorID: agentID,
},
exclude, "task_failed", "action_required",
"Task failed: "+issue.Title, "")
})
}
// inboxItemToResponse converts a db.InboxItem into a map suitable for
// JSON-serializable event payloads (mirrors handler.inboxToResponse fields).
func inboxItemToResponse(item db.InboxItem) map[string]any {
return map[string]any{
"id": util.UUIDToString(item.ID),
"workspace_id": util.UUIDToString(item.WorkspaceID),
"recipient_type": item.RecipientType,
"recipient_id": util.UUIDToString(item.RecipientID),
"type": item.Type,
"severity": item.Severity,
"issue_id": util.UUIDToPtr(item.IssueID),
"title": item.Title,
"body": util.TextToPtr(item.Body),
"read": item.Read,
"archived": item.Archived,
"created_at": util.TimestampToString(item.CreatedAt),
"actor_type": util.TextToPtr(item.ActorType),
"actor_id": util.UUIDToPtr(item.ActorID),
}
}

View file

@ -0,0 +1,552 @@
package main
import (
"context"
"testing"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/handler"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// notificationTest helpers — reuse the integration test fixtures from TestMain
// (testPool, testUserID, testWorkspaceID are set in integration_test.go).
// inboxItemsForRecipient returns all non-archived inbox items for a given recipient.
func inboxItemsForRecipient(t *testing.T, queries *db.Queries, recipientID string) []db.ListInboxItemsRow {
t.Helper()
items, err := queries.ListInboxItems(context.Background(), db.ListInboxItemsParams{
RecipientType: "member",
RecipientID: util.ParseUUID(recipientID),
Limit: 100,
Offset: 0,
})
if err != nil {
t.Fatalf("ListInboxItems: %v", err)
}
return items
}
// cleanupInboxForIssue deletes all inbox items related to a given issue.
func cleanupInboxForIssue(t *testing.T, issueID string) {
t.Helper()
testPool.Exec(context.Background(), `DELETE FROM inbox_item WHERE issue_id = $1`, issueID)
}
// addTestSubscriber manually inserts a subscriber for an issue.
func addTestSubscriber(t *testing.T, issueID, userType, userID, reason string) {
t.Helper()
_, err := testPool.Exec(context.Background(), `
INSERT INTO issue_subscriber (issue_id, user_type, user_id, reason)
VALUES ($1, $2, $3, $4)
ON CONFLICT (issue_id, user_type, user_id) DO NOTHING
`, issueID, userType, userID, reason)
if err != nil {
t.Fatalf("addTestSubscriber: %v", err)
}
}
// newNotificationBus creates a bus with subscriber + notification listeners registered.
func newNotificationBus(t *testing.T, queries *db.Queries) *events.Bus {
t.Helper()
bus := events.New()
registerSubscriberListeners(bus, queries)
registerNotificationListeners(bus, queries)
return bus
}
// TestNotification_IssueCreated_AssigneeNotified verifies that when an issue is
// created with an assignee different from the creator, the assignee receives an
// "issue_assigned" inbox notification and the creator receives nothing.
func TestNotification_IssueCreated_AssigneeNotified(t *testing.T) {
queries := db.New(testPool)
bus := newNotificationBus(t, queries)
assigneeEmail := "notif-assignee-created@multica.ai"
assigneeID := createTestUser(t, assigneeEmail)
t.Cleanup(func() { cleanupTestUser(t, assigneeEmail) })
issueID := createTestIssue(t, testWorkspaceID, testUserID)
t.Cleanup(func() {
cleanupInboxForIssue(t, issueID)
cleanupTestIssue(t, issueID)
})
// Track inbox:new events
var inboxEvents []events.Event
bus.Subscribe(protocol.EventInboxNew, func(e events.Event) {
inboxEvents = append(inboxEvents, e)
})
assigneeType := "member"
bus.Publish(events.Event{
Type: protocol.EventIssueCreated,
WorkspaceID: testWorkspaceID,
ActorType: "member",
ActorID: testUserID,
Payload: map[string]any{
"issue": handler.IssueResponse{
ID: issueID,
WorkspaceID: testWorkspaceID,
Title: "notif test issue",
Status: "todo",
Priority: "medium",
CreatorType: "member",
CreatorID: testUserID,
AssigneeType: &assigneeType,
AssigneeID: &assigneeID,
},
},
})
// Assignee should have an inbox item
items := inboxItemsForRecipient(t, queries, assigneeID)
if len(items) != 1 {
t.Fatalf("expected 1 inbox item for assignee, got %d", len(items))
}
if items[0].Type != "issue_assigned" {
t.Fatalf("expected type 'issue_assigned', got %q", items[0].Type)
}
if items[0].Severity != "action_required" {
t.Fatalf("expected severity 'action_required', got %q", items[0].Severity)
}
// Creator (actor) should NOT have any inbox items
creatorItems := inboxItemsForRecipient(t, queries, testUserID)
if len(creatorItems) != 0 {
t.Fatalf("expected 0 inbox items for creator, got %d", len(creatorItems))
}
// At least one inbox:new event should have been published
if len(inboxEvents) < 1 {
t.Fatal("expected at least 1 inbox:new event")
}
}
// TestNotification_IssueCreated_SelfAssign verifies that when the creator
// assigns the issue to themselves, no notification is generated.
func TestNotification_IssueCreated_SelfAssign(t *testing.T) {
queries := db.New(testPool)
bus := newNotificationBus(t, queries)
issueID := createTestIssue(t, testWorkspaceID, testUserID)
t.Cleanup(func() {
cleanupInboxForIssue(t, issueID)
cleanupTestIssue(t, issueID)
})
var inboxEvents []events.Event
bus.Subscribe(protocol.EventInboxNew, func(e events.Event) {
inboxEvents = append(inboxEvents, e)
})
assigneeType := "member"
assigneeID := testUserID // self-assign
bus.Publish(events.Event{
Type: protocol.EventIssueCreated,
WorkspaceID: testWorkspaceID,
ActorType: "member",
ActorID: testUserID,
Payload: map[string]any{
"issue": handler.IssueResponse{
ID: issueID,
WorkspaceID: testWorkspaceID,
Title: "self-assign issue",
Status: "todo",
Priority: "medium",
CreatorType: "member",
CreatorID: testUserID,
AssigneeType: &assigneeType,
AssigneeID: &assigneeID,
},
},
})
items := inboxItemsForRecipient(t, queries, testUserID)
if len(items) != 0 {
t.Fatalf("expected 0 inbox items for self-assign, got %d", len(items))
}
if len(inboxEvents) != 0 {
t.Fatalf("expected 0 inbox:new events for self-assign, got %d", len(inboxEvents))
}
}
// TestNotification_IssueCreated_NoAssignee verifies that when an issue is
// created without an assignee, no notifications are generated.
func TestNotification_IssueCreated_NoAssignee(t *testing.T) {
queries := db.New(testPool)
bus := newNotificationBus(t, queries)
issueID := createTestIssue(t, testWorkspaceID, testUserID)
t.Cleanup(func() {
cleanupInboxForIssue(t, issueID)
cleanupTestIssue(t, issueID)
})
var inboxEvents []events.Event
bus.Subscribe(protocol.EventInboxNew, func(e events.Event) {
inboxEvents = append(inboxEvents, e)
})
bus.Publish(events.Event{
Type: protocol.EventIssueCreated,
WorkspaceID: testWorkspaceID,
ActorType: "member",
ActorID: testUserID,
Payload: map[string]any{
"issue": handler.IssueResponse{
ID: issueID,
WorkspaceID: testWorkspaceID,
Title: "no assignee issue",
Status: "todo",
Priority: "medium",
CreatorType: "member",
CreatorID: testUserID,
},
},
})
items := inboxItemsForRecipient(t, queries, testUserID)
if len(items) != 0 {
t.Fatalf("expected 0 inbox items for no-assignee issue, got %d", len(items))
}
if len(inboxEvents) != 0 {
t.Fatalf("expected 0 inbox:new events, got %d", len(inboxEvents))
}
}
// TestNotification_StatusChanged verifies that all subscribers except the actor
// receive a "status_changed" notification when an issue status changes.
func TestNotification_StatusChanged(t *testing.T) {
queries := db.New(testPool)
bus := newNotificationBus(t, queries)
// Create two extra users as subscribers
sub1Email := "notif-sub1-status@multica.ai"
sub1ID := createTestUser(t, sub1Email)
t.Cleanup(func() { cleanupTestUser(t, sub1Email) })
sub2Email := "notif-sub2-status@multica.ai"
sub2ID := createTestUser(t, sub2Email)
t.Cleanup(func() { cleanupTestUser(t, sub2Email) })
issueID := createTestIssue(t, testWorkspaceID, testUserID)
t.Cleanup(func() {
cleanupInboxForIssue(t, issueID)
cleanupTestIssue(t, issueID)
})
// Manually add subscribers before the event fires
addTestSubscriber(t, issueID, "member", testUserID, "creator")
addTestSubscriber(t, issueID, "member", sub1ID, "assignee")
addTestSubscriber(t, issueID, "member", sub2ID, "commenter")
bus.Publish(events.Event{
Type: protocol.EventIssueUpdated,
WorkspaceID: testWorkspaceID,
ActorType: "member",
ActorID: testUserID, // actor is the creator
Payload: map[string]any{
"issue": handler.IssueResponse{
ID: issueID,
WorkspaceID: testWorkspaceID,
Title: "status test issue",
Status: "in_progress",
Priority: "medium",
CreatorType: "member",
CreatorID: testUserID,
},
"assignee_changed": false,
"status_changed": true,
},
})
// Actor (testUserID) should NOT get a notification
actorItems := inboxItemsForRecipient(t, queries, testUserID)
if len(actorItems) != 0 {
t.Fatalf("expected 0 inbox items for actor, got %d", len(actorItems))
}
// sub1 should get a status_changed notification
sub1Items := inboxItemsForRecipient(t, queries, sub1ID)
if len(sub1Items) != 1 {
t.Fatalf("expected 1 inbox item for sub1, got %d", len(sub1Items))
}
if sub1Items[0].Type != "status_changed" {
t.Fatalf("expected type 'status_changed', got %q", sub1Items[0].Type)
}
if sub1Items[0].Severity != "info" {
t.Fatalf("expected severity 'info', got %q", sub1Items[0].Severity)
}
// sub2 should also get a status_changed notification
sub2Items := inboxItemsForRecipient(t, queries, sub2ID)
if len(sub2Items) != 1 {
t.Fatalf("expected 1 inbox item for sub2, got %d", len(sub2Items))
}
if sub2Items[0].Type != "status_changed" {
t.Fatalf("expected type 'status_changed', got %q", sub2Items[0].Type)
}
}
// TestNotification_CommentCreated verifies that all subscribers except the
// commenter receive a "new_comment" notification.
func TestNotification_CommentCreated(t *testing.T) {
queries := db.New(testPool)
bus := newNotificationBus(t, queries)
commenterEmail := "notif-commenter@multica.ai"
commenterID := createTestUser(t, commenterEmail)
t.Cleanup(func() { cleanupTestUser(t, commenterEmail) })
sub1Email := "notif-sub1-comment@multica.ai"
sub1ID := createTestUser(t, sub1Email)
t.Cleanup(func() { cleanupTestUser(t, sub1Email) })
issueID := createTestIssue(t, testWorkspaceID, testUserID)
t.Cleanup(func() {
cleanupInboxForIssue(t, issueID)
cleanupTestIssue(t, issueID)
})
// Pre-add subscribers: creator and sub1. The commenter will also be added
// by subscriber_listeners when the event fires.
addTestSubscriber(t, issueID, "member", testUserID, "creator")
addTestSubscriber(t, issueID, "member", sub1ID, "assignee")
bus.Publish(events.Event{
Type: protocol.EventCommentCreated,
WorkspaceID: testWorkspaceID,
ActorType: "member",
ActorID: commenterID, // commenter is the actor
Payload: map[string]any{
"comment": handler.CommentResponse{
ID: "00000000-0000-0000-0000-000000000000",
IssueID: issueID,
AuthorType: "member",
AuthorID: commenterID,
Content: "test comment content",
Type: "comment",
},
"issue_title": "comment test issue",
"issue_status": "todo",
},
})
// Creator should get a new_comment notification
creatorItems := inboxItemsForRecipient(t, queries, testUserID)
if len(creatorItems) != 1 {
t.Fatalf("expected 1 inbox item for creator, got %d", len(creatorItems))
}
if creatorItems[0].Type != "new_comment" {
t.Fatalf("expected type 'new_comment', got %q", creatorItems[0].Type)
}
if creatorItems[0].Severity != "info" {
t.Fatalf("expected severity 'info', got %q", creatorItems[0].Severity)
}
// sub1 should also get a new_comment notification
sub1Items := inboxItemsForRecipient(t, queries, sub1ID)
if len(sub1Items) != 1 {
t.Fatalf("expected 1 inbox item for sub1, got %d", len(sub1Items))
}
if sub1Items[0].Type != "new_comment" {
t.Fatalf("expected type 'new_comment', got %q", sub1Items[0].Type)
}
// Commenter (actor) should NOT get a notification
commenterItems := inboxItemsForRecipient(t, queries, commenterID)
if len(commenterItems) != 0 {
t.Fatalf("expected 0 inbox items for commenter, got %d", len(commenterItems))
}
}
// TestNotification_AssigneeChanged verifies the full assignee change flow:
// - New assignee gets "issue_assigned" (Direct)
// - Old assignee gets "unassigned" (Direct)
// - Other subscribers get "assignee_changed" (Subscriber), excluding actor + old + new
// - Actor gets nothing
func TestNotification_AssigneeChanged(t *testing.T) {
queries := db.New(testPool)
bus := newNotificationBus(t, queries)
oldAssigneeEmail := "notif-old-assignee@multica.ai"
oldAssigneeID := createTestUser(t, oldAssigneeEmail)
t.Cleanup(func() { cleanupTestUser(t, oldAssigneeEmail) })
newAssigneeEmail := "notif-new-assignee@multica.ai"
newAssigneeID := createTestUser(t, newAssigneeEmail)
t.Cleanup(func() { cleanupTestUser(t, newAssigneeEmail) })
bystanderEmail := "notif-bystander@multica.ai"
bystanderID := createTestUser(t, bystanderEmail)
t.Cleanup(func() { cleanupTestUser(t, bystanderEmail) })
issueID := createTestIssue(t, testWorkspaceID, testUserID)
t.Cleanup(func() {
cleanupInboxForIssue(t, issueID)
cleanupTestIssue(t, issueID)
})
// Pre-add subscribers: creator, old assignee, bystander
addTestSubscriber(t, issueID, "member", testUserID, "creator")
addTestSubscriber(t, issueID, "member", oldAssigneeID, "assignee")
addTestSubscriber(t, issueID, "member", bystanderID, "commenter")
newAssigneeType := "member"
oldAssigneeType := "member"
bus.Publish(events.Event{
Type: protocol.EventIssueUpdated,
WorkspaceID: testWorkspaceID,
ActorType: "member",
ActorID: testUserID, // actor is the creator
Payload: map[string]any{
"issue": handler.IssueResponse{
ID: issueID,
WorkspaceID: testWorkspaceID,
Title: "assignee change issue",
Status: "todo",
Priority: "medium",
CreatorType: "member",
CreatorID: testUserID,
AssigneeType: &newAssigneeType,
AssigneeID: &newAssigneeID,
},
"assignee_changed": true,
"status_changed": false,
"prev_assignee_type": &oldAssigneeType,
"prev_assignee_id": &oldAssigneeID,
},
})
// New assignee should get "issue_assigned"
newItems := inboxItemsForRecipient(t, queries, newAssigneeID)
if len(newItems) != 1 {
t.Fatalf("expected 1 inbox item for new assignee, got %d", len(newItems))
}
if newItems[0].Type != "issue_assigned" {
t.Fatalf("expected type 'issue_assigned', got %q", newItems[0].Type)
}
if newItems[0].Severity != "action_required" {
t.Fatalf("expected severity 'action_required', got %q", newItems[0].Severity)
}
// Old assignee should get "unassigned"
oldItems := inboxItemsForRecipient(t, queries, oldAssigneeID)
if len(oldItems) != 1 {
t.Fatalf("expected 1 inbox item for old assignee, got %d", len(oldItems))
}
if oldItems[0].Type != "unassigned" {
t.Fatalf("expected type 'unassigned', got %q", oldItems[0].Type)
}
if oldItems[0].Severity != "info" {
t.Fatalf("expected severity 'info', got %q", oldItems[0].Severity)
}
// Bystander should get "assignee_changed"
bystanderItems := inboxItemsForRecipient(t, queries, bystanderID)
if len(bystanderItems) != 1 {
t.Fatalf("expected 1 inbox item for bystander, got %d", len(bystanderItems))
}
if bystanderItems[0].Type != "assignee_changed" {
t.Fatalf("expected type 'assignee_changed', got %q", bystanderItems[0].Type)
}
if bystanderItems[0].Severity != "info" {
t.Fatalf("expected severity 'info', got %q", bystanderItems[0].Severity)
}
// Actor (testUserID / creator) should NOT get any notification
actorItems := inboxItemsForRecipient(t, queries, testUserID)
if len(actorItems) != 0 {
t.Fatalf("expected 0 inbox items for actor, got %d", len(actorItems))
}
}
// TestNotification_TaskCompleted verifies that subscribers get a "task_completed"
// notification when a task completes, excluding the agent.
func TestNotification_TaskCompleted(t *testing.T) {
queries := db.New(testPool)
bus := newNotificationBus(t, queries)
issueID := createTestIssue(t, testWorkspaceID, testUserID)
t.Cleanup(func() {
cleanupInboxForIssue(t, issueID)
cleanupTestIssue(t, issueID)
})
// The agent ID (acting as system actor)
agentID := "00000000-0000-0000-0000-aaaaaaaaaaaa"
// Pre-add subscribers: creator and the agent
addTestSubscriber(t, issueID, "member", testUserID, "creator")
addTestSubscriber(t, issueID, "agent", agentID, "assignee")
bus.Publish(events.Event{
Type: protocol.EventTaskCompleted,
WorkspaceID: testWorkspaceID,
ActorType: "system",
ActorID: "",
Payload: map[string]any{
"task_id": "00000000-0000-0000-0000-bbbbbbbbbbbb",
"agent_id": agentID,
"issue_id": issueID,
"status": "completed",
},
})
// Creator should get a task_completed notification
creatorItems := inboxItemsForRecipient(t, queries, testUserID)
if len(creatorItems) != 1 {
t.Fatalf("expected 1 inbox item for creator, got %d", len(creatorItems))
}
if creatorItems[0].Type != "task_completed" {
t.Fatalf("expected type 'task_completed', got %q", creatorItems[0].Type)
}
if creatorItems[0].Severity != "attention" {
t.Fatalf("expected severity 'attention', got %q", creatorItems[0].Severity)
}
}
// TestNotification_TaskFailed verifies that subscribers get a "task_failed"
// notification when a task fails, excluding the agent.
func TestNotification_TaskFailed(t *testing.T) {
queries := db.New(testPool)
bus := newNotificationBus(t, queries)
issueID := createTestIssue(t, testWorkspaceID, testUserID)
t.Cleanup(func() {
cleanupInboxForIssue(t, issueID)
cleanupTestIssue(t, issueID)
})
agentID := "00000000-0000-0000-0000-aaaaaaaaaaaa"
addTestSubscriber(t, issueID, "member", testUserID, "creator")
addTestSubscriber(t, issueID, "agent", agentID, "assignee")
bus.Publish(events.Event{
Type: protocol.EventTaskFailed,
WorkspaceID: testWorkspaceID,
ActorType: "system",
ActorID: "",
Payload: map[string]any{
"task_id": "00000000-0000-0000-0000-bbbbbbbbbbbb",
"agent_id": agentID,
"issue_id": issueID,
"status": "failed",
},
})
creatorItems := inboxItemsForRecipient(t, queries, testUserID)
if len(creatorItems) != 1 {
t.Fatalf("expected 1 inbox item for creator, got %d", len(creatorItems))
}
if creatorItems[0].Type != "task_failed" {
t.Fatalf("expected type 'task_failed', got %q", creatorItems[0].Type)
}
if creatorItems[0].Severity != "action_required" {
t.Fatalf("expected severity 'action_required', got %q", creatorItems[0].Severity)
}
}

View file

@ -117,6 +117,9 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route
r.Delete("/", h.DeleteIssue)
r.Post("/comments", h.CreateComment)
r.Get("/comments", h.ListComments)
r.Get("/subscribers", h.ListIssueSubscribers)
r.Post("/subscribe", h.SubscribeToIssue)
r.Post("/unsubscribe", h.UnsubscribeFromIssue)
})
})

View file

@ -0,0 +1,116 @@
package main
import (
"context"
"log/slog"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/handler"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// registerSubscriberListeners wires up event bus listeners that auto-subscribe
// relevant users to issues. This ensures creators, assignees, and commenters
// are automatically tracked as issue subscribers.
func registerSubscriberListeners(bus *events.Bus, queries *db.Queries) {
// issue:created — subscribe creator + assignee (if different)
bus.Subscribe(protocol.EventIssueCreated, func(e events.Event) {
payload, ok := e.Payload.(map[string]any)
if !ok {
return
}
issue, ok := payload["issue"].(handler.IssueResponse)
if !ok {
return
}
// Subscribe the creator
addSubscriber(bus, queries, e.WorkspaceID, issue.ID, issue.CreatorType, issue.CreatorID, "creator")
// Subscribe the assignee if exists and different from creator
if issue.AssigneeType != nil && issue.AssigneeID != nil &&
!(*issue.AssigneeType == issue.CreatorType && *issue.AssigneeID == issue.CreatorID) {
addSubscriber(bus, queries, e.WorkspaceID, issue.ID, *issue.AssigneeType, *issue.AssigneeID, "assignee")
}
})
// issue:updated — subscribe new assignee if assignee changed
bus.Subscribe(protocol.EventIssueUpdated, func(e events.Event) {
payload, ok := e.Payload.(map[string]any)
if !ok {
return
}
issue, ok := payload["issue"].(handler.IssueResponse)
if !ok {
return
}
assigneeChanged, _ := payload["assignee_changed"].(bool)
if !assigneeChanged {
return
}
if issue.AssigneeType != nil && issue.AssigneeID != nil {
addSubscriber(bus, queries, e.WorkspaceID, issue.ID, *issue.AssigneeType, *issue.AssigneeID, "assignee")
}
})
// comment:created — subscribe the commenter
bus.Subscribe(protocol.EventCommentCreated, func(e events.Event) {
payload, ok := e.Payload.(map[string]any)
if !ok {
return
}
// Comments created via handler use CommentResponse; agent comments from task.go use map[string]any
var issueID, authorType, authorID string
if comment, ok := payload["comment"].(handler.CommentResponse); ok {
issueID = comment.IssueID
authorType = comment.AuthorType
authorID = comment.AuthorID
} else if commentMap, ok := payload["comment"].(map[string]any); ok {
issueID, _ = commentMap["issue_id"].(string)
authorType, _ = commentMap["author_type"].(string)
authorID, _ = commentMap["author_id"].(string)
} else {
return
}
if issueID == "" || authorID == "" {
return
}
addSubscriber(bus, queries, e.WorkspaceID, issueID, authorType, authorID, "commenter")
})
}
// addSubscriber adds a user as an issue subscriber and publishes a
// subscriber:added event for real-time frontend sync.
func addSubscriber(bus *events.Bus, queries *db.Queries, workspaceID, issueID, userType, userID, reason string) {
err := queries.AddIssueSubscriber(context.Background(), db.AddIssueSubscriberParams{
IssueID: parseUUID(issueID),
UserType: userType,
UserID: parseUUID(userID),
Reason: reason,
})
if err != nil {
slog.Error("failed to add issue subscriber",
"issue_id", issueID,
"user_type", userType,
"user_id", userID,
"reason", reason,
"error", err,
)
return
}
bus.Publish(events.Event{
Type: protocol.EventSubscriberAdded,
WorkspaceID: workspaceID,
Payload: map[string]any{
"issue_id": issueID,
"user_type": userType,
"user_id": userID,
"reason": reason,
},
})
}

View file

@ -0,0 +1,377 @@
package main
import (
"context"
"testing"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/handler"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// subscriberTest helpers — reuse the integration test fixtures from TestMain
// (testPool, testUserID, testWorkspaceID are set in integration_test.go).
// createTestIssue inserts a minimal issue and returns its UUID string.
func createTestIssue(t *testing.T, workspaceID, creatorID string) string {
t.Helper()
ctx := context.Background()
var issueID string
err := testPool.QueryRow(ctx, `
INSERT INTO issue (workspace_id, title, status, priority, creator_type, creator_id, position)
VALUES ($1, 'subscriber test issue', 'todo', 'medium', 'member', $2, 0)
RETURNING id
`, workspaceID, creatorID).Scan(&issueID)
if err != nil {
t.Fatalf("createTestIssue: %v", err)
}
return issueID
}
// createTestUser inserts a user with the given email and returns the UUID string.
func createTestUser(t *testing.T, email string) string {
t.Helper()
ctx := context.Background()
var userID string
err := testPool.QueryRow(ctx, `
INSERT INTO "user" (name, email)
VALUES ($1, $2)
RETURNING id
`, "Subscriber Test User", email).Scan(&userID)
if err != nil {
t.Fatalf("createTestUser: %v", err)
}
return userID
}
func cleanupTestIssue(t *testing.T, issueID string) {
t.Helper()
testPool.Exec(context.Background(), `DELETE FROM issue WHERE id = $1`, issueID)
}
func cleanupTestUser(t *testing.T, email string) {
t.Helper()
testPool.Exec(context.Background(), `DELETE FROM "user" WHERE email = $1`, email)
}
func isSubscribed(t *testing.T, queries *db.Queries, issueID, userType, userID string) bool {
t.Helper()
subscribed, err := queries.IsIssueSubscriber(context.Background(), db.IsIssueSubscriberParams{
IssueID: util.ParseUUID(issueID),
UserType: userType,
UserID: util.ParseUUID(userID),
})
if err != nil {
t.Fatalf("IsIssueSubscriber: %v", err)
}
return subscribed
}
func subscriberCount(t *testing.T, queries *db.Queries, issueID string) int {
t.Helper()
subs, err := queries.ListIssueSubscribers(context.Background(), util.ParseUUID(issueID))
if err != nil {
t.Fatalf("ListIssueSubscribers: %v", err)
}
return len(subs)
}
func TestSubscriberIssueCreated_CreatorSubscribed(t *testing.T) {
queries := db.New(testPool)
bus := events.New()
registerSubscriberListeners(bus, queries)
issueID := createTestIssue(t, testWorkspaceID, testUserID)
t.Cleanup(func() { cleanupTestIssue(t, issueID) })
// Publish issue:created event with no assignee
bus.Publish(events.Event{
Type: protocol.EventIssueCreated,
WorkspaceID: testWorkspaceID,
ActorType: "member",
ActorID: testUserID,
Payload: map[string]any{
"issue": handler.IssueResponse{
ID: issueID,
WorkspaceID: testWorkspaceID,
Title: "test issue",
Status: "todo",
Priority: "medium",
CreatorType: "member",
CreatorID: testUserID,
},
},
})
if !isSubscribed(t, queries, issueID, "member", testUserID) {
t.Fatal("expected creator to be subscribed after issue:created")
}
if count := subscriberCount(t, queries, issueID); count != 1 {
t.Fatalf("expected 1 subscriber, got %d", count)
}
}
func TestSubscriberIssueCreated_CreatorAndAssignee(t *testing.T) {
queries := db.New(testPool)
bus := events.New()
registerSubscriberListeners(bus, queries)
assigneeEmail := "subscriber-assignee-test@multica.ai"
assigneeID := createTestUser(t, assigneeEmail)
t.Cleanup(func() { cleanupTestUser(t, assigneeEmail) })
issueID := createTestIssue(t, testWorkspaceID, testUserID)
t.Cleanup(func() { cleanupTestIssue(t, issueID) })
assigneeType := "member"
bus.Publish(events.Event{
Type: protocol.EventIssueCreated,
WorkspaceID: testWorkspaceID,
ActorType: "member",
ActorID: testUserID,
Payload: map[string]any{
"issue": handler.IssueResponse{
ID: issueID,
WorkspaceID: testWorkspaceID,
Title: "test issue",
Status: "todo",
Priority: "medium",
CreatorType: "member",
CreatorID: testUserID,
AssigneeType: &assigneeType,
AssigneeID: &assigneeID,
},
},
})
if !isSubscribed(t, queries, issueID, "member", testUserID) {
t.Fatal("expected creator to be subscribed")
}
if !isSubscribed(t, queries, issueID, "member", assigneeID) {
t.Fatal("expected assignee to be subscribed")
}
if count := subscriberCount(t, queries, issueID); count != 2 {
t.Fatalf("expected 2 subscribers, got %d", count)
}
}
func TestSubscriberIssueCreated_SelfAssign(t *testing.T) {
queries := db.New(testPool)
bus := events.New()
registerSubscriberListeners(bus, queries)
issueID := createTestIssue(t, testWorkspaceID, testUserID)
t.Cleanup(func() { cleanupTestIssue(t, issueID) })
// Creator is also the assignee (self-assign)
assigneeType := "member"
assigneeID := testUserID
bus.Publish(events.Event{
Type: protocol.EventIssueCreated,
WorkspaceID: testWorkspaceID,
ActorType: "member",
ActorID: testUserID,
Payload: map[string]any{
"issue": handler.IssueResponse{
ID: issueID,
WorkspaceID: testWorkspaceID,
Title: "test issue",
Status: "todo",
Priority: "medium",
CreatorType: "member",
CreatorID: testUserID,
AssigneeType: &assigneeType,
AssigneeID: &assigneeID,
},
},
})
// Should only have 1 subscriber record (ON CONFLICT DO NOTHING handles idempotency)
if count := subscriberCount(t, queries, issueID); count != 1 {
t.Fatalf("expected 1 subscriber for self-assign, got %d", count)
}
if !isSubscribed(t, queries, issueID, "member", testUserID) {
t.Fatal("expected creator/assignee to be subscribed")
}
}
func TestSubscriberIssueUpdated_AssigneeChanged(t *testing.T) {
queries := db.New(testPool)
bus := events.New()
registerSubscriberListeners(bus, queries)
assigneeEmail := "subscriber-new-assignee-test@multica.ai"
assigneeID := createTestUser(t, assigneeEmail)
t.Cleanup(func() { cleanupTestUser(t, assigneeEmail) })
issueID := createTestIssue(t, testWorkspaceID, testUserID)
t.Cleanup(func() { cleanupTestIssue(t, issueID) })
assigneeType := "member"
bus.Publish(events.Event{
Type: protocol.EventIssueUpdated,
WorkspaceID: testWorkspaceID,
ActorType: "member",
ActorID: testUserID,
Payload: map[string]any{
"issue": handler.IssueResponse{
ID: issueID,
WorkspaceID: testWorkspaceID,
Title: "test issue",
Status: "todo",
Priority: "medium",
CreatorType: "member",
CreatorID: testUserID,
AssigneeType: &assigneeType,
AssigneeID: &assigneeID,
},
"assignee_changed": true,
},
})
if !isSubscribed(t, queries, issueID, "member", assigneeID) {
t.Fatal("expected new assignee to be subscribed after assignee change")
}
}
func TestSubscriberIssueUpdated_NoAssigneeChange(t *testing.T) {
queries := db.New(testPool)
bus := events.New()
registerSubscriberListeners(bus, queries)
issueID := createTestIssue(t, testWorkspaceID, testUserID)
t.Cleanup(func() { cleanupTestIssue(t, issueID) })
// Publish issue:updated without assignee_changed flag
bus.Publish(events.Event{
Type: protocol.EventIssueUpdated,
WorkspaceID: testWorkspaceID,
ActorType: "member",
ActorID: testUserID,
Payload: map[string]any{
"issue": handler.IssueResponse{
ID: issueID,
WorkspaceID: testWorkspaceID,
Title: "test issue",
Status: "in_progress",
Priority: "medium",
CreatorType: "member",
CreatorID: testUserID,
},
"assignee_changed": false,
"status_changed": true,
},
})
// No subscriber should have been added
if count := subscriberCount(t, queries, issueID); count != 0 {
t.Fatalf("expected 0 subscribers when assignee not changed, got %d", count)
}
}
func TestSubscriberCommentCreated_CommenterSubscribed(t *testing.T) {
queries := db.New(testPool)
bus := events.New()
registerSubscriberListeners(bus, queries)
commenterEmail := "subscriber-commenter-test@multica.ai"
commenterID := createTestUser(t, commenterEmail)
t.Cleanup(func() { cleanupTestUser(t, commenterEmail) })
issueID := createTestIssue(t, testWorkspaceID, testUserID)
t.Cleanup(func() { cleanupTestIssue(t, issueID) })
bus.Publish(events.Event{
Type: protocol.EventCommentCreated,
WorkspaceID: testWorkspaceID,
ActorType: "member",
ActorID: commenterID,
Payload: map[string]any{
"comment": handler.CommentResponse{
ID: "00000000-0000-0000-0000-000000000000",
IssueID: issueID,
AuthorType: "member",
AuthorID: commenterID,
Content: "test comment",
Type: "comment",
},
},
})
if !isSubscribed(t, queries, issueID, "member", commenterID) {
t.Fatal("expected commenter to be subscribed after comment:created")
}
}
func TestSubscriberAddedEventPublished(t *testing.T) {
queries := db.New(testPool)
bus := events.New()
registerSubscriberListeners(bus, queries)
issueID := createTestIssue(t, testWorkspaceID, testUserID)
t.Cleanup(func() { cleanupTestIssue(t, issueID) })
// Track subscriber:added events
var subscriberEvents []events.Event
bus.Subscribe(protocol.EventSubscriberAdded, func(e events.Event) {
subscriberEvents = append(subscriberEvents, e)
})
bus.Publish(events.Event{
Type: protocol.EventIssueCreated,
WorkspaceID: testWorkspaceID,
ActorType: "member",
ActorID: testUserID,
Payload: map[string]any{
"issue": handler.IssueResponse{
ID: issueID,
WorkspaceID: testWorkspaceID,
Title: "test issue",
Status: "todo",
Priority: "medium",
CreatorType: "member",
CreatorID: testUserID,
},
},
})
if len(subscriberEvents) != 1 {
t.Fatalf("expected 1 subscriber:added event, got %d", len(subscriberEvents))
}
evt := subscriberEvents[0]
if evt.WorkspaceID != testWorkspaceID {
t.Fatalf("expected workspace_id %s, got %s", testWorkspaceID, evt.WorkspaceID)
}
payload, ok := evt.Payload.(map[string]any)
if !ok {
t.Fatal("expected map[string]any payload")
}
if payload["issue_id"] != issueID {
t.Fatalf("expected issue_id %s, got %v", issueID, payload["issue_id"])
}
if payload["user_id"] != testUserID {
t.Fatalf("expected user_id %s, got %v", testUserID, payload["user_id"])
}
}
// Verify parseUUID is consistent — pgtype.UUID from our local helper should match util.ParseUUID
func TestParseUUIDConsistency(t *testing.T) {
uuid := "550e8400-e29b-41d4-a716-446655440000"
local := parseUUID(uuid)
utilResult := util.ParseUUID(uuid)
if local != utilResult {
t.Fatalf("parseUUID inconsistency: local=%v, util=%v", local, utilResult)
}
if !local.Valid {
t.Fatal("expected valid UUID")
}
// Empty string should produce invalid UUID
empty := parseUUID("")
if empty != (pgtype.UUID{}) {
t.Fatalf("expected zero UUID for empty string, got %v", empty)
}
}

View file

@ -0,0 +1,110 @@
package handler
import (
"net/http"
"github.com/go-chi/chi/v5"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// SubscriberResponse is the JSON shape returned for each issue subscriber.
type SubscriberResponse struct {
IssueID string `json:"issue_id"`
UserType string `json:"user_type"`
UserID string `json:"user_id"`
Reason string `json:"reason"`
CreatedAt string `json:"created_at"`
}
func subscriberToResponse(s db.IssueSubscriber) SubscriberResponse {
return SubscriberResponse{
IssueID: uuidToString(s.IssueID),
UserType: s.UserType,
UserID: uuidToString(s.UserID),
Reason: s.Reason,
CreatedAt: timestampToString(s.CreatedAt),
}
}
// ListIssueSubscribers returns all subscribers for an issue.
func (h *Handler) ListIssueSubscribers(w http.ResponseWriter, r *http.Request) {
issueID := chi.URLParam(r, "id")
issue, ok := h.loadIssueForUser(w, r, issueID)
if !ok {
return
}
subscribers, err := h.Queries.ListIssueSubscribers(r.Context(), issue.ID)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list subscribers")
return
}
resp := make([]SubscriberResponse, len(subscribers))
for i, s := range subscribers {
resp[i] = subscriberToResponse(s)
}
writeJSON(w, http.StatusOK, resp)
}
// SubscribeToIssue subscribes the current user to an issue with reason "manual".
func (h *Handler) SubscribeToIssue(w http.ResponseWriter, r *http.Request) {
issueID := chi.URLParam(r, "id")
issue, ok := h.loadIssueForUser(w, r, issueID)
if !ok {
return
}
userID := requestUserID(r)
err := h.Queries.AddIssueSubscriber(r.Context(), db.AddIssueSubscriberParams{
IssueID: issue.ID,
UserType: "member",
UserID: parseUUID(userID),
Reason: "manual",
})
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to subscribe")
return
}
workspaceID := uuidToString(issue.WorkspaceID)
h.publish(protocol.EventSubscriberAdded, workspaceID, "member", userID, map[string]any{
"issue_id": issueID,
"user_id": userID,
"reason": "manual",
})
writeJSON(w, http.StatusOK, map[string]bool{"subscribed": true})
}
// UnsubscribeFromIssue removes the current user's subscription from an issue.
func (h *Handler) UnsubscribeFromIssue(w http.ResponseWriter, r *http.Request) {
issueID := chi.URLParam(r, "id")
issue, ok := h.loadIssueForUser(w, r, issueID)
if !ok {
return
}
userID := requestUserID(r)
err := h.Queries.RemoveIssueSubscriber(r.Context(), db.RemoveIssueSubscriberParams{
IssueID: issue.ID,
UserType: "member",
UserID: parseUUID(userID),
})
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to unsubscribe")
return
}
workspaceID := uuidToString(issue.WorkspaceID)
h.publish(protocol.EventSubscriberRemoved, workspaceID, "member", userID, map[string]any{
"issue_id": issueID,
"user_id": userID,
})
writeJSON(w, http.StatusOK, map[string]bool{"subscribed": false})
}

View file

@ -0,0 +1,208 @@
package handler
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
func TestSubscriberAPI(t *testing.T) {
ctx := context.Background()
// Helper: create an issue for subscriber tests
createIssue := func(t *testing.T) string {
t.Helper()
w := httptest.NewRecorder()
req := newRequest("POST", "/api/issues?workspace_id="+testWorkspaceID, map[string]any{
"title": "Subscriber test issue",
})
testHandler.CreateIssue(w, req)
if w.Code != http.StatusCreated {
t.Fatalf("CreateIssue: expected 201, got %d: %s", w.Code, w.Body.String())
}
var issue IssueResponse
json.NewDecoder(w.Body).Decode(&issue)
return issue.ID
}
// Helper: delete an issue
deleteIssue := func(t *testing.T, issueID string) {
t.Helper()
w := httptest.NewRecorder()
req := newRequest("DELETE", "/api/issues/"+issueID, nil)
req = withURLParam(req, "id", issueID)
testHandler.DeleteIssue(w, req)
}
t.Run("Subscribe", func(t *testing.T) {
issueID := createIssue(t)
defer deleteIssue(t, issueID)
w := httptest.NewRecorder()
req := newRequest("POST", "/api/issues/"+issueID+"/subscribe", nil)
req = withURLParam(req, "id", issueID)
testHandler.SubscribeToIssue(w, req)
if w.Code != http.StatusOK {
t.Fatalf("SubscribeToIssue: expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]bool
json.NewDecoder(w.Body).Decode(&resp)
if !resp["subscribed"] {
t.Fatal("SubscribeToIssue: expected subscribed=true")
}
// Verify in DB
subscribed, err := testHandler.Queries.IsIssueSubscriber(ctx, db.IsIssueSubscriberParams{
IssueID: parseUUID(issueID),
UserType: "member",
UserID: parseUUID(testUserID),
})
if err != nil {
t.Fatalf("IsIssueSubscriber: %v", err)
}
if !subscribed {
t.Fatal("expected user to be subscribed in DB")
}
})
t.Run("SubscribeIdempotent", func(t *testing.T) {
issueID := createIssue(t)
defer deleteIssue(t, issueID)
// Subscribe first time
w := httptest.NewRecorder()
req := newRequest("POST", "/api/issues/"+issueID+"/subscribe", nil)
req = withURLParam(req, "id", issueID)
testHandler.SubscribeToIssue(w, req)
if w.Code != http.StatusOK {
t.Fatalf("SubscribeToIssue (1st): expected 200, got %d: %s", w.Code, w.Body.String())
}
// Subscribe second time — should also succeed
w = httptest.NewRecorder()
req = newRequest("POST", "/api/issues/"+issueID+"/subscribe", nil)
req = withURLParam(req, "id", issueID)
testHandler.SubscribeToIssue(w, req)
if w.Code != http.StatusOK {
t.Fatalf("SubscribeToIssue (2nd): expected 200, got %d: %s", w.Code, w.Body.String())
}
})
t.Run("ListSubscribers", func(t *testing.T) {
issueID := createIssue(t)
defer deleteIssue(t, issueID)
// Subscribe first
w := httptest.NewRecorder()
req := newRequest("POST", "/api/issues/"+issueID+"/subscribe", nil)
req = withURLParam(req, "id", issueID)
testHandler.SubscribeToIssue(w, req)
if w.Code != http.StatusOK {
t.Fatalf("SubscribeToIssue: expected 200, got %d: %s", w.Code, w.Body.String())
}
// List
w = httptest.NewRecorder()
req = newRequest("GET", "/api/issues/"+issueID+"/subscribers", nil)
req = withURLParam(req, "id", issueID)
testHandler.ListIssueSubscribers(w, req)
if w.Code != http.StatusOK {
t.Fatalf("ListIssueSubscribers: expected 200, got %d: %s", w.Code, w.Body.String())
}
var subscribers []SubscriberResponse
json.NewDecoder(w.Body).Decode(&subscribers)
if len(subscribers) == 0 {
t.Fatal("ListIssueSubscribers: expected at least 1 subscriber")
}
found := false
for _, s := range subscribers {
if s.UserID == testUserID && s.UserType == "member" && s.Reason == "manual" {
found = true
break
}
}
if !found {
t.Fatalf("ListIssueSubscribers: expected to find test user subscriber, got %+v", subscribers)
}
})
t.Run("Unsubscribe", func(t *testing.T) {
issueID := createIssue(t)
defer deleteIssue(t, issueID)
// Subscribe first
w := httptest.NewRecorder()
req := newRequest("POST", "/api/issues/"+issueID+"/subscribe", nil)
req = withURLParam(req, "id", issueID)
testHandler.SubscribeToIssue(w, req)
if w.Code != http.StatusOK {
t.Fatalf("SubscribeToIssue: expected 200, got %d: %s", w.Code, w.Body.String())
}
// Unsubscribe
w = httptest.NewRecorder()
req = newRequest("POST", "/api/issues/"+issueID+"/unsubscribe", nil)
req = withURLParam(req, "id", issueID)
testHandler.UnsubscribeFromIssue(w, req)
if w.Code != http.StatusOK {
t.Fatalf("UnsubscribeFromIssue: expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]bool
json.NewDecoder(w.Body).Decode(&resp)
if resp["subscribed"] {
t.Fatal("UnsubscribeFromIssue: expected subscribed=false")
}
// Verify in DB
subscribed, err := testHandler.Queries.IsIssueSubscriber(ctx, db.IsIssueSubscriberParams{
IssueID: parseUUID(issueID),
UserType: "member",
UserID: parseUUID(testUserID),
})
if err != nil {
t.Fatalf("IsIssueSubscriber: %v", err)
}
if subscribed {
t.Fatal("expected user to NOT be subscribed in DB")
}
})
t.Run("ListAfterUnsubscribe", func(t *testing.T) {
issueID := createIssue(t)
defer deleteIssue(t, issueID)
// Subscribe
w := httptest.NewRecorder()
req := newRequest("POST", "/api/issues/"+issueID+"/subscribe", nil)
req = withURLParam(req, "id", issueID)
testHandler.SubscribeToIssue(w, req)
// Unsubscribe
w = httptest.NewRecorder()
req = newRequest("POST", "/api/issues/"+issueID+"/unsubscribe", nil)
req = withURLParam(req, "id", issueID)
testHandler.UnsubscribeFromIssue(w, req)
// List should be empty
w = httptest.NewRecorder()
req = newRequest("GET", "/api/issues/"+issueID+"/subscribers", nil)
req = withURLParam(req, "id", issueID)
testHandler.ListIssueSubscribers(w, req)
if w.Code != http.StatusOK {
t.Fatalf("ListIssueSubscribers: expected 200, got %d: %s", w.Code, w.Body.String())
}
var subscribers []SubscriberResponse
json.NewDecoder(w.Body).Decode(&subscribers)
if len(subscribers) != 0 {
t.Fatalf("ListIssueSubscribers: expected 0 subscribers after unsubscribe, got %d", len(subscribers))
}
})
}

View file

@ -176,10 +176,6 @@ func (s *TaskService) CompleteTask(ctx context.Context, taskID pgtype.UUID, resu
}
}
if issue, err := s.Queries.GetIssue(ctx, task.IssueID); err == nil {
s.createInboxForIssueCreator(ctx, issue, task.AgentID, "task_completed", "attention", "Task completed: "+issue.Title, "")
}
// Reconcile agent status
s.ReconcileAgentStatus(ctx, task.AgentID)
@ -218,10 +214,6 @@ func (s *TaskService) FailTask(ctx context.Context, taskID pgtype.UUID, errMsg s
if errMsg != "" {
s.createAgentComment(ctx, task.IssueID, task.AgentID, errMsg, "system")
}
if issue, err := s.Queries.GetIssue(ctx, task.IssueID); err == nil {
s.createInboxForIssueCreator(ctx, issue, task.AgentID, "task_failed", "action_required", "Task failed: "+issue.Title, errMsg)
}
// Reconcile agent status
s.ReconcileAgentStatus(ctx, task.AgentID)
@ -412,40 +404,12 @@ func (s *TaskService) createAgentComment(ctx context.Context, issueID, agentID p
"type": comment.Type,
"created_at": comment.CreatedAt.Time.Format("2006-01-02T15:04:05Z"),
},
"issue_title": issue.Title,
"issue_status": issue.Status,
},
})
}
func (s *TaskService) createInboxForIssueCreator(ctx context.Context, issue db.Issue, agentID pgtype.UUID, itemType, severity, title, body string) {
if issue.CreatorType != "member" {
return
}
item, err := s.Queries.CreateInboxItem(ctx, db.CreateInboxItemParams{
WorkspaceID: issue.WorkspaceID,
RecipientType: "member",
RecipientID: issue.CreatorID,
Type: itemType,
Severity: severity,
IssueID: issue.ID,
Title: title,
Body: util.PtrToText(&body),
ActorType: util.StrToText("agent"),
ActorID: agentID,
})
if err != nil {
return
}
resp := inboxToMap(item)
resp["issue_status"] = issue.Status
s.Bus.Publish(events.Event{
Type: protocol.EventInboxNew,
WorkspaceID: util.UUIDToString(issue.WorkspaceID),
ActorType: "agent",
ActorID: util.UUIDToString(agentID),
Payload: map[string]any{"item": resp},
})
}
func issueToMap(issue db.Issue) map[string]any {
return map[string]any{
"id": util.UUIDToString(issue.ID),
@ -466,25 +430,6 @@ func issueToMap(issue db.Issue) map[string]any {
}
}
func inboxToMap(item db.InboxItem) map[string]any {
return map[string]any{
"id": util.UUIDToString(item.ID),
"workspace_id": util.UUIDToString(item.WorkspaceID),
"recipient_type": item.RecipientType,
"recipient_id": util.UUIDToString(item.RecipientID),
"type": item.Type,
"severity": item.Severity,
"issue_id": util.UUIDToPtr(item.IssueID),
"title": item.Title,
"body": util.TextToPtr(item.Body),
"read": item.Read,
"archived": item.Archived,
"created_at": util.TimestampToString(item.CreatedAt),
"actor_type": util.TextToPtr(item.ActorType),
"actor_id": util.UUIDToPtr(item.ActorID),
}
}
// agentToMap builds a simple map for broadcasting agent status updates.
func agentToMap(a db.Agent) map[string]any {
var rc any

View file

@ -0,0 +1 @@
DROP TABLE IF EXISTS issue_subscriber;

View file

@ -0,0 +1,11 @@
-- Issue subscribers: tracks who is subscribed to notifications for an issue
CREATE TABLE issue_subscriber (
issue_id UUID NOT NULL REFERENCES issue(id) ON DELETE CASCADE,
user_type TEXT NOT NULL CHECK (user_type IN ('member', 'agent')),
user_id UUID NOT NULL,
reason TEXT NOT NULL CHECK (reason IN ('creator', 'assignee', 'commenter', 'mentioned', 'manual')),
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (issue_id, user_type, user_id)
);
CREATE INDEX idx_issue_subscriber_user ON issue_subscriber(user_type, user_id);

View file

@ -0,0 +1 @@
-- No-op: cannot distinguish backfilled from organic subscribers

View file

@ -0,0 +1,12 @@
-- Backfill creators as subscribers
INSERT INTO issue_subscriber (issue_id, user_type, user_id, reason)
SELECT id, creator_type, creator_id, 'creator'
FROM issue
ON CONFLICT DO NOTHING;
-- Backfill assignees as subscribers
INSERT INTO issue_subscriber (issue_id, user_type, user_id, reason)
SELECT id, assignee_type, assignee_id, 'assignee'
FROM issue
WHERE assignee_type IS NOT NULL AND assignee_id IS NOT NULL
ON CONFLICT DO NOTHING;

View file

@ -166,6 +166,14 @@ type IssueLabel struct {
Color string `json:"color"`
}
type IssueSubscriber struct {
IssueID pgtype.UUID `json:"issue_id"`
UserType string `json:"user_type"`
UserID pgtype.UUID `json:"user_id"`
Reason string `json:"reason"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
}
type IssueToLabel struct {
IssueID pgtype.UUID `json:"issue_id"`
LabelID pgtype.UUID `json:"label_id"`

View file

@ -0,0 +1,103 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.30.0
// source: subscriber.sql
package db
import (
"context"
"github.com/jackc/pgx/v5/pgtype"
)
const addIssueSubscriber = `-- name: AddIssueSubscriber :exec
INSERT INTO issue_subscriber (issue_id, user_type, user_id, reason)
VALUES ($1, $2, $3, $4)
ON CONFLICT (issue_id, user_type, user_id) DO NOTHING
`
type AddIssueSubscriberParams struct {
IssueID pgtype.UUID `json:"issue_id"`
UserType string `json:"user_type"`
UserID pgtype.UUID `json:"user_id"`
Reason string `json:"reason"`
}
func (q *Queries) AddIssueSubscriber(ctx context.Context, arg AddIssueSubscriberParams) error {
_, err := q.db.Exec(ctx, addIssueSubscriber,
arg.IssueID,
arg.UserType,
arg.UserID,
arg.Reason,
)
return err
}
const isIssueSubscriber = `-- name: IsIssueSubscriber :one
SELECT EXISTS(
SELECT 1 FROM issue_subscriber
WHERE issue_id = $1 AND user_type = $2 AND user_id = $3
) AS subscribed
`
type IsIssueSubscriberParams struct {
IssueID pgtype.UUID `json:"issue_id"`
UserType string `json:"user_type"`
UserID pgtype.UUID `json:"user_id"`
}
func (q *Queries) IsIssueSubscriber(ctx context.Context, arg IsIssueSubscriberParams) (bool, error) {
row := q.db.QueryRow(ctx, isIssueSubscriber, arg.IssueID, arg.UserType, arg.UserID)
var subscribed bool
err := row.Scan(&subscribed)
return subscribed, err
}
const listIssueSubscribers = `-- name: ListIssueSubscribers :many
SELECT issue_id, user_type, user_id, reason, created_at FROM issue_subscriber
WHERE issue_id = $1
ORDER BY created_at
`
func (q *Queries) ListIssueSubscribers(ctx context.Context, issueID pgtype.UUID) ([]IssueSubscriber, error) {
rows, err := q.db.Query(ctx, listIssueSubscribers, issueID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []IssueSubscriber{}
for rows.Next() {
var i IssueSubscriber
if err := rows.Scan(
&i.IssueID,
&i.UserType,
&i.UserID,
&i.Reason,
&i.CreatedAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const removeIssueSubscriber = `-- name: RemoveIssueSubscriber :exec
DELETE FROM issue_subscriber
WHERE issue_id = $1 AND user_type = $2 AND user_id = $3
`
type RemoveIssueSubscriberParams struct {
IssueID pgtype.UUID `json:"issue_id"`
UserType string `json:"user_type"`
UserID pgtype.UUID `json:"user_id"`
}
func (q *Queries) RemoveIssueSubscriber(ctx context.Context, arg RemoveIssueSubscriberParams) error {
_, err := q.db.Exec(ctx, removeIssueSubscriber, arg.IssueID, arg.UserType, arg.UserID)
return err
}

View file

@ -0,0 +1,19 @@
-- name: AddIssueSubscriber :exec
INSERT INTO issue_subscriber (issue_id, user_type, user_id, reason)
VALUES ($1, $2, $3, $4)
ON CONFLICT (issue_id, user_type, user_id) DO NOTHING;
-- name: RemoveIssueSubscriber :exec
DELETE FROM issue_subscriber
WHERE issue_id = $1 AND user_type = $2 AND user_id = $3;
-- name: ListIssueSubscribers :many
SELECT * FROM issue_subscriber
WHERE issue_id = $1
ORDER BY created_at;
-- name: IsIssueSubscriber :one
SELECT EXISTS(
SELECT 1 FROM issue_subscriber
WHERE issue_id = $1 AND user_type = $2 AND user_id = $3
) AS subscribed;

View file

@ -39,6 +39,10 @@ const (
EventMemberUpdated = "member:updated"
EventMemberRemoved = "member:removed"
// Subscriber events
EventSubscriberAdded = "subscriber:added"
EventSubscriberRemoved = "subscriber:removed"
// Daemon events
EventDaemonHeartbeat = "daemon:heartbeat"
EventDaemonRegister = "daemon:register"