diff --git a/apps/web/app/(dashboard)/_components/app-sidebar.tsx b/apps/web/app/(dashboard)/_components/app-sidebar.tsx index 642360ea..a8d3485e 100644 --- a/apps/web/app/(dashboard)/_components/app-sidebar.tsx +++ b/apps/web/app/(dashboard)/_components/app-sidebar.tsx @@ -64,9 +64,7 @@ export function AppSidebar() { const workspaces = useWorkspaceStore((s) => s.workspaces); const switchWorkspace = useWorkspaceStore((s) => s.switchWorkspace); - const unreadCount = useInboxStore((s) => - s.items.filter((i) => !i.read && !i.archived).length - ); + const unreadCount = useInboxStore((s) => s.unreadCount()); const logout = () => { authLogout(); diff --git a/apps/web/app/(dashboard)/agents/page.tsx b/apps/web/app/(dashboard)/agents/page.tsx index d8f919bf..63762143 100644 --- a/apps/web/app/(dashboard)/agents/page.tsx +++ b/apps/web/app/(dashboard)/agents/page.tsx @@ -1,6 +1,6 @@ "use client"; -import { useState, useEffect, useCallback } from "react"; +import { useState, useEffect } from "react"; import { useDefaultLayout } from "react-resizable-panels"; import { Bot, @@ -65,7 +65,7 @@ import { Label } from "@/components/ui/label"; import { api } from "@/shared/api"; import { useAuthStore } from "@/features/auth"; import { useWorkspaceStore } from "@/features/workspace"; -import { useWSEvent } from "@/features/realtime"; + // --------------------------------------------------------------------------- // Helpers @@ -1153,13 +1153,6 @@ export default function AgentsPage() { } }, [agents, selectedId]); - useWSEvent( - "agent:status", - useCallback(() => { - refreshAgents(); - }, [refreshAgents]), - ); - const handleCreate = async (data: CreateAgentRequest) => { const agent = await api.createAgent(data); await refreshAgents(); diff --git a/apps/web/app/(dashboard)/inbox/page.tsx b/apps/web/app/(dashboard)/inbox/page.tsx index 8c474198..9459ac40 100644 --- a/apps/web/app/(dashboard)/inbox/page.tsx +++ b/apps/web/app/(dashboard)/inbox/page.tsx @@ -1,10 +1,8 @@ "use client"; -import { useMemo } from "react"; import { useSearchParams, useRouter } from "next/navigation"; import { useDefaultLayout } from "react-resizable-panels"; import { useInboxStore } from "@/features/inbox"; -import { useIssueStore } from "@/features/issues"; import { IssueDetail, StatusIcon, PriorityIcon } from "@/features/issues/components"; import { STATUS_CONFIG, PRIORITY_CONFIG } from "@/features/issues/config"; import { useActorName } from "@/features/workspace"; @@ -19,7 +17,7 @@ import { BookCheck, ListChecks, } from "lucide-react"; -import type { InboxItem, InboxItemType, InboxSeverity, IssueStatus, IssuePriority } from "@/shared/types"; +import type { InboxItem, InboxItemType, IssueStatus, IssuePriority } from "@/shared/types"; import { Button } from "@/components/ui/button"; import { ResizablePanelGroup, @@ -34,23 +32,12 @@ import { DropdownMenuItem, DropdownMenuSeparator, } from "@/components/ui/dropdown-menu"; -import { - HoverCard, - HoverCardTrigger, - HoverCardContent, -} from "@/components/ui/hover-card"; import { api } from "@/shared/api"; // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- -const severityOrder: Record = { - action_required: 0, - attention: 1, - info: 2, -}; - const typeLabels: Record = { issue_assigned: "Assigned", unassigned: "Unassigned", @@ -86,29 +73,6 @@ function shortDate(dateStr: string): string { }); } -// --------------------------------------------------------------------------- -// InboxHoverContent — shows issue context on hover -// --------------------------------------------------------------------------- - -function InboxHoverContent({ item }: { item: InboxItem }) { - const issues = useIssueStore((s) => s.issues); - const issue = item.issue_id ? issues.find((i) => i.id === item.issue_id) : null; - - if (!issue) return null; - - return ( -
-
- -

{issue.title}

-
- {issue.description && ( -

{issue.description}

- )} -
- ); -} - // --------------------------------------------------------------------------- // InboxDetailLabel — renders rich subtitle per notification type // --------------------------------------------------------------------------- @@ -159,7 +123,7 @@ function InboxDetailLabel({ item }: { item: InboxItem }) { return Removed due date; } case "new_comment": { - if (item.body) return {item.body}; + if (item.body) return {item.body}; return {typeLabels[item.type]}; } default: @@ -181,52 +145,43 @@ function InboxListItem({ onClick: () => void; }) { return ( - - - } - > - -
-
-
- {!item.read && ( - - )} - - {item.title} - -
- {item.issue_status && ( - + ); } @@ -246,40 +201,13 @@ export default function InboxPage() { } }; - const storeItems = useInboxStore((s) => s.items); + const items = useInboxStore((s) => s.dedupedItems()); const loading = useInboxStore((s) => s.loading); const { defaultLayout, onLayoutChanged } = useDefaultLayout({ id: "multica_inbox_layout", }); - // Group by (issue_id, type, actor_id) and take the latest from each group - const items = useMemo(() => { - const active = storeItems.filter((i) => !i.archived); - const groups = new Map(); - active.forEach((item) => { - const key = `${item.issue_id ?? "none"}|${item.type}|${item.actor_id ?? "none"}`; - const group = groups.get(key) ?? []; - group.push(item); - groups.set(key, group); - }); - - const merged: InboxItem[] = []; - groups.forEach((group) => { - const sorted = group.sort( - (a, b) => new Date(b.created_at).getTime() - new Date(a.created_at).getTime() - ); - const latest = sorted[0]; - if (latest) merged.push(latest); - }); - - return merged.sort( - (a, b) => - severityOrder[a.severity] - severityOrder[b.severity] || - new Date(b.created_at).getTime() - new Date(a.created_at).getTime() - ); - }, [storeItems]); - const selected = items.find((i) => i.id === selectedId) ?? null; const unreadCount = items.filter((i) => !i.read).length; diff --git a/apps/web/features/inbox/store.ts b/apps/web/features/inbox/store.ts index 2fa7b8ab..a4de9b1b 100644 --- a/apps/web/features/inbox/store.ts +++ b/apps/web/features/inbox/store.ts @@ -7,6 +7,41 @@ import { createLogger } from "@/shared/logger"; const logger = createLogger("inbox-store"); +/** + * Deduplicate inbox items by issue_id (one entry per issue, Linear-style), + * keep latest, sort by time DESC. + * Memoized by reference — returns the same array if `items` hasn't changed. + */ +let _prevItems: InboxItem[] = []; +let _prevDeduped: InboxItem[] = []; + +function deduplicateInboxItems(items: InboxItem[]): InboxItem[] { + if (items === _prevItems) return _prevDeduped; + _prevItems = items; + + const active = items.filter((i) => !i.archived); + const groups = new Map(); + active.forEach((item) => { + const key = item.issue_id ?? item.id; + const group = groups.get(key) ?? []; + group.push(item); + groups.set(key, group); + }); + const merged: InboxItem[] = []; + groups.forEach((group) => { + const sorted = group.sort( + (a, b) => + new Date(b.created_at).getTime() - new Date(a.created_at).getTime(), + ); + if (sorted[0]) merged.push(sorted[0]); + }); + _prevDeduped = merged.sort( + (a, b) => + new Date(b.created_at).getTime() - new Date(a.created_at).getTime(), + ); + return _prevDeduped; +} + interface InboxState { items: InboxItem[]; loading: boolean; @@ -19,6 +54,7 @@ interface InboxState { archiveAll: () => void; archiveAllRead: () => void; updateIssueStatus: (issueId: string, status: IssueStatus) => void; + dedupedItems: () => InboxItem[]; unreadCount: () => number; } @@ -28,14 +64,15 @@ export const useInboxStore = create((set, get) => ({ fetch: async () => { logger.debug("fetch start"); - set({ loading: true }); + const isInitialLoad = get().items.length === 0; + if (isInitialLoad) set({ loading: true }); try { const data = await api.listInbox(); logger.info("fetched", data.length, "items"); set({ items: data, loading: false }); } catch (err) { logger.error("fetch failed", err); - set({ loading: false }); + if (isInitialLoad) set({ loading: false }); } }, @@ -74,5 +111,7 @@ export const useInboxStore = create((set, get) => ({ i.issue_id === issueId ? { ...i, issue_status: status } : i ), })), - unreadCount: () => get().items.filter((i) => !i.read && !i.archived).length, + dedupedItems: () => deduplicateInboxItems(get().items), + unreadCount: () => + get().dedupedItems().filter((i) => !i.read).length, })); diff --git a/apps/web/features/issues/components/issue-detail.tsx b/apps/web/features/issues/components/issue-detail.tsx index bb2120cf..f770639c 100644 --- a/apps/web/features/issues/components/issue-detail.tsx +++ b/apps/web/features/issues/components/issue-detail.tsx @@ -7,6 +7,7 @@ import { useRouter } from "next/navigation"; import { Bot, Calendar, + Check, ChevronLeft, ChevronRight, Link2, @@ -37,8 +38,6 @@ import { DropdownMenuSeparator, DropdownMenuGroup, DropdownMenuLabel, - DropdownMenuRadioGroup, - DropdownMenuRadioItem, DropdownMenuSub, DropdownMenuSubTrigger, DropdownMenuSubContent, @@ -112,6 +111,8 @@ function formatActivity( const formatted = new Date(details.to).toLocaleDateString("en-US", { month: "short", day: "numeric" }); return `set due date to ${formatted}`; } + case "title_changed": + return `renamed this issue from "${details.from ?? "?"}" to "${details.to ?? "?"}"`; case "description_updated": return "updated the description"; case "task_completed": @@ -978,14 +979,13 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) { {STATUS_CONFIG[issue.status].label} - handleUpdateField({ status: v as IssueStatus })}> - {ALL_STATUSES.map((s) => ( - - - {STATUS_CONFIG[s].label} - - ))} - + {ALL_STATUSES.map((s) => ( + handleUpdateField({ status: s })}> + + {STATUS_CONFIG[s].label} + {s === issue.status && } + + ))} @@ -998,14 +998,13 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) { {PRIORITY_CONFIG[issue.priority].label} - handleUpdateField({ priority: v as IssuePriority })}> - {PRIORITY_ORDER.map((p) => ( - - - {PRIORITY_CONFIG[p].label} - - ))} - + {PRIORITY_ORDER.map((p) => ( + handleUpdateField({ priority: p })}> + + {PRIORITY_CONFIG[p].label} + {p === issue.priority && } + + ))} diff --git a/apps/web/features/issues/store.ts b/apps/web/features/issues/store.ts index 4daa27b1..32915ea2 100644 --- a/apps/web/features/issues/store.ts +++ b/apps/web/features/issues/store.ts @@ -19,21 +19,22 @@ interface IssueState { setActiveIssue: (id: string | null) => void; } -export const useIssueStore = create((set) => ({ +export const useIssueStore = create((set, get) => ({ issues: [], loading: true, activeIssueId: null, fetch: async () => { logger.debug("fetch start"); - set({ loading: true }); + const isInitialLoad = get().issues.length === 0; + if (isInitialLoad) set({ loading: true }); try { const res = await api.listIssues({ limit: 200 }); logger.info("fetched", res.issues.length, "issues"); set({ issues: res.issues, loading: false }); } catch (err) { logger.error("fetch failed", err); - set({ loading: false }); + if (isInitialLoad) set({ loading: false }); } }, diff --git a/apps/web/features/realtime/use-realtime-sync.ts b/apps/web/features/realtime/use-realtime-sync.ts index 97750cf6..3d6e2162 100644 --- a/apps/web/features/realtime/use-realtime-sync.ts +++ b/apps/web/features/realtime/use-realtime-sync.ts @@ -8,19 +8,10 @@ import { useInboxStore } from "@/features/inbox"; import { useWorkspaceStore } from "@/features/workspace"; import { useAuthStore } from "@/features/auth"; import { createLogger } from "@/shared/logger"; +import { api } from "@/shared/api"; import type { - IssueCreatedPayload, - IssueUpdatedPayload, - IssueDeletedPayload, - AgentStatusPayload, - AgentCreatedPayload, - InboxNewPayload, - InboxReadPayload, - InboxArchivedPayload, - WorkspaceUpdatedPayload, - WorkspaceDeletedPayload, MemberAddedPayload, - MemberUpdatedPayload, + WorkspaceDeletedPayload, MemberRemovedPayload, } from "@/shared/types"; @@ -28,139 +19,99 @@ const logger = createLogger("realtime-sync"); /** * Centralized WS → store sync. Called once from WSProvider. - * Subscribes to all global WS events and dispatches to Zustand stores. - * Comment events are NOT handled here — they stay per-page on issue detail. + * + * Uses the "WS as invalidation signal + refetch" pattern: + * - onAny handler extracts event prefix and calls the matching store refresh + * - Debounce per-prefix prevents rapid-fire refetches (e.g. bulk issue updates) + * - Precise handlers only for side effects (toast, navigation, self-check) + * + * Per-page events (comments, activity, subscribers, daemon) are still handled + * by individual components via useWSEvent — not here. */ export function useRealtimeSync(ws: WSClient | null) { - // Issue events → useIssueStore + // Main sync: onAny → refreshMap with debounce useEffect(() => { if (!ws) return; - const unsubs = [ - ws.on("issue:created", (p) => { - const { issue } = p as IssueCreatedPayload; - useIssueStore.getState().addIssue(issue); - }), - ws.on("issue:updated", (p) => { - const { issue } = p as IssueUpdatedPayload; - useIssueStore.getState().updateIssue(issue.id, issue); - useInboxStore.getState().updateIssueStatus(issue.id, issue.status); - }), - ws.on("issue:deleted", (p) => { - const { issue_id } = p as IssueDeletedPayload; - useIssueStore.getState().removeIssue(issue_id); - }), - ]; + const refreshMap: Record void> = { + issue: () => void useIssueStore.getState().fetch(), + inbox: () => void useInboxStore.getState().fetch(), + agent: () => void useWorkspaceStore.getState().refreshAgents(), + member: () => void useWorkspaceStore.getState().refreshMembers(), + workspace: () => { + // Lightweight: only re-fetch workspace list, don't hydrate everything. + // workspace:deleted is handled by a precise side-effect handler below. + api.listWorkspaces().then((wsList) => { + const current = useWorkspaceStore.getState().workspace; + const updated = current + ? wsList.find((w) => w.id === current.id) + : null; + if (updated) useWorkspaceStore.getState().updateWorkspace(updated); + }).catch((err) => { + logger.error("workspace refresh failed", err); + }); + }, + skill: () => void useWorkspaceStore.getState().refreshSkills(), + }; - return () => unsubs.forEach((u) => u()); - }, [ws]); + const timers = new Map>(); + const debouncedRefresh = (prefix: string, fn: () => void) => { + const existing = timers.get(prefix); + if (existing) clearTimeout(existing); + timers.set( + prefix, + setTimeout(() => { + timers.delete(prefix); + fn(); + }, 100), + ); + }; - // Inbox events → useInboxStore - useEffect(() => { - if (!ws) return; + const unsubAny = ws.onAny((msg) => { + const prefix = msg.type.split(":")[0] ?? ""; + const refresh = refreshMap[prefix]; + if (refresh) debouncedRefresh(prefix, refresh); + }); - const unsubs = [ - ws.on("inbox:new", (p) => { - const { item } = p as InboxNewPayload; - const myUserId = useAuthStore.getState().user?.id; - // Only add if I'm the recipient (WS broadcasts to all workspace members) - if (item.recipient_type === "member" && item.recipient_id === myUserId) { - useInboxStore.getState().addItem(item); - } - }), - ws.on("inbox:read", (p) => { - const { item_id } = p as InboxReadPayload; - useInboxStore.getState().markRead(item_id); - }), - ws.on("inbox:archived", (p) => { - const { item_id } = p as InboxArchivedPayload; - useInboxStore.getState().archive(item_id); - }), - ws.on("inbox:batch-read", () => { - useInboxStore.getState().markAllRead(); - }), - ws.on("inbox:batch-archived", () => { - useInboxStore.getState().fetch(); - }), - ]; + // --- Side-effect handlers (toast, navigation, self-check) --- - return () => unsubs.forEach((u) => u()); - }, [ws]); + const unsubWsDeleted = ws.on("workspace:deleted", (p) => { + const { workspace_id } = p as WorkspaceDeletedPayload; + const currentWs = useWorkspaceStore.getState().workspace; + if (currentWs?.id === workspace_id) { + logger.warn("current workspace deleted, switching"); + toast.info("This workspace was deleted"); + useWorkspaceStore.getState().refreshWorkspaces(); + } + }); - // Agent events → workspace store - useEffect(() => { - if (!ws) return; + const unsubMemberRemoved = ws.on("member:removed", (p) => { + const { user_id } = p as MemberRemovedPayload; + const myUserId = useAuthStore.getState().user?.id; + if (user_id === myUserId) { + logger.warn("removed from workspace, switching"); + toast.info("You were removed from this workspace"); + useWorkspaceStore.getState().refreshWorkspaces(); + } + }); - const unsubs = [ - ws.on("agent:status", (p) => { - const { agent } = p as AgentStatusPayload; - useWorkspaceStore.getState().updateAgent(agent.id, agent); - }), - ws.on("agent:created", (p) => { - const { agent } = p as AgentCreatedPayload; - const agents = useWorkspaceStore.getState().agents; - if (!agents.find((a) => a.id === agent.id)) { - useWorkspaceStore.getState().refreshAgents(); - } - }), - ws.on("agent:deleted", () => { - useWorkspaceStore.getState().refreshAgents(); - }), - ]; + const unsubMemberAdded = ws.on("member:added", (p) => { + const { member } = p as MemberAddedPayload; + const myUserId = useAuthStore.getState().user?.id; + if (member.user_id === myUserId) { + // I was invited to a new workspace — refresh workspace list + useWorkspaceStore.getState().refreshWorkspaces(); + } + }); - return () => unsubs.forEach((u) => u()); - }, [ws]); - - // Workspace + member events → useWorkspaceStore - useEffect(() => { - if (!ws) return; - - const unsubs = [ - ws.on("workspace:updated", (p) => { - const { workspace } = p as WorkspaceUpdatedPayload; - logger.debug("workspace:updated", workspace.name); - useWorkspaceStore.getState().updateWorkspace(workspace); - }), - ws.on("workspace:deleted", (p) => { - const { workspace_id } = p as WorkspaceDeletedPayload; - const currentWs = useWorkspaceStore.getState().workspace; - if (currentWs?.id === workspace_id) { - logger.warn("current workspace deleted, switching"); - toast.info("This workspace was deleted"); - useWorkspaceStore.getState().refreshWorkspaces(); - } - }), - ws.on("member:updated", (p) => { - const payload = p as MemberUpdatedPayload; - logger.debug("member:updated", payload.member.email, payload.member.role); - useWorkspaceStore.getState().refreshMembers(); - }), - ws.on("member:added", (p) => { - const payload = p as MemberAddedPayload; - const myUserId = useAuthStore.getState().user?.id; - logger.debug("member:added", payload.member.email); - if (payload.member.user_id === myUserId) { - // I was invited to a workspace — refresh list so it appears - useWorkspaceStore.getState().refreshWorkspaces(); - } else { - useWorkspaceStore.getState().refreshMembers(); - } - }), - ws.on("member:removed", (p) => { - const payload = p as MemberRemovedPayload; - const myUserId = useAuthStore.getState().user?.id; - logger.debug("member:removed", payload.user_id); - if (payload.user_id === myUserId) { - logger.warn("removed from workspace, switching"); - toast.info("You were removed from this workspace"); - useWorkspaceStore.getState().refreshWorkspaces(); - } else { - useWorkspaceStore.getState().refreshMembers(); - } - }), - ]; - - return () => unsubs.forEach((u) => u()); + return () => { + unsubAny(); + unsubWsDeleted(); + unsubMemberRemoved(); + unsubMemberAdded(); + timers.forEach(clearTimeout); + timers.clear(); + }; }, [ws]); // Reconnect → refetch all data to recover missed events @@ -174,6 +125,8 @@ export function useRealtimeSync(ws: WSClient | null) { useIssueStore.getState().fetch(), useInboxStore.getState().fetch(), useWorkspaceStore.getState().refreshAgents(), + useWorkspaceStore.getState().refreshMembers(), + useWorkspaceStore.getState().refreshSkills(), ]); } catch { // Silently fail; next reconnect will retry diff --git a/apps/web/features/skills/components/skills-page.tsx b/apps/web/features/skills/components/skills-page.tsx index 92f49b83..89b3e8c9 100644 --- a/apps/web/features/skills/components/skills-page.tsx +++ b/apps/web/features/skills/components/skills-page.tsx @@ -1,6 +1,6 @@ "use client"; -import { useState, useEffect, useCallback, useMemo } from "react"; +import { useState, useEffect, useMemo } from "react"; import { useDefaultLayout } from "react-resizable-panels"; import { Sparkles, @@ -33,7 +33,7 @@ import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs"; import { api } from "@/shared/api"; import { useAuthStore } from "@/features/auth"; import { useWorkspaceStore } from "@/features/workspace"; -import { useWSEvent } from "@/features/realtime"; + import { FileTree } from "./file-tree"; import { FileViewer } from "./file-viewer"; @@ -600,14 +600,6 @@ export default function SkillsPage() { } }, [skills, selectedId]); - const handleRefresh = useCallback(() => { - refreshSkills(); - }, [refreshSkills]); - - useWSEvent("skill:created", handleRefresh); - useWSEvent("skill:updated", handleRefresh); - useWSEvent("skill:deleted", handleRefresh); - const handleCreate = async (data: CreateSkillRequest) => { const skill = await api.createSkill(data); upsertSkill(skill); diff --git a/apps/web/shared/api/ws-client.ts b/apps/web/shared/api/ws-client.ts index 95c1dcd9..17282a44 100644 --- a/apps/web/shared/api/ws-client.ts +++ b/apps/web/shared/api/ws-client.ts @@ -12,6 +12,7 @@ export class WSClient { private reconnectTimer: ReturnType | null = null; private hasConnectedBefore = false; private onReconnectCallbacks = new Set<() => void>(); + private anyHandlers = new Set<(msg: WSMessage) => void>(); private logger: Logger; constructor(url: string, options?: { logger?: Logger }) { @@ -54,8 +55,9 @@ export class WSClient { for (const handler of eventHandlers) { handler(msg.payload); } - } else { - this.logger.debug("unhandled event", msg.type); + } + for (const handler of this.anyHandlers) { + handler(msg); } }; @@ -83,6 +85,9 @@ export class WSClient { this.ws = null; } this.hasConnectedBefore = false; + this.handlers.clear(); + this.anyHandlers.clear(); + this.onReconnectCallbacks.clear(); } on(event: WSEventType, handler: EventHandler) { @@ -95,6 +100,13 @@ export class WSClient { }; } + onAny(handler: (msg: WSMessage) => void) { + this.anyHandlers.add(handler); + return () => { + this.anyHandlers.delete(handler); + }; + } + onReconnect(callback: () => void) { this.onReconnectCallbacks.add(callback); return () => { diff --git a/server/cmd/server/activity_listeners.go b/server/cmd/server/activity_listeners.go index fbf5b359..5e8864d5 100644 --- a/server/cmd/server/activity_listeners.go +++ b/server/cmd/server/activity_listeners.go @@ -171,6 +171,28 @@ func registerActivityListeners(bus *events.Bus, queries *db.Queries) { } } + if titleChanged, _ := payload["title_changed"].(bool); titleChanged { + prevTitle, _ := payload["prev_title"].(string) + details, _ := json.Marshal(map[string]string{ + "from": prevTitle, + "to": issue.Title, + }) + activity, err := queries.CreateActivity(ctx, db.CreateActivityParams{ + WorkspaceID: parseUUID(issue.WorkspaceID), + IssueID: parseUUID(issue.ID), + ActorType: util.StrToText(e.ActorType), + ActorID: parseUUID(e.ActorID), + Action: "title_changed", + Details: details, + }) + if err != nil { + slog.Error("activity: failed to record title change", + "issue_id", issue.ID, "error", err) + } else { + publishActivityEvent(bus, e, activity) + } + } + if descriptionChanged { activity, err := queries.CreateActivity(ctx, db.CreateActivityParams{ WorkspaceID: parseUUID(issue.WorkspaceID), diff --git a/server/cmd/server/activity_listeners_test.go b/server/cmd/server/activity_listeners_test.go index 3935793a..aea726de 100644 --- a/server/cmd/server/activity_listeners_test.go +++ b/server/cmd/server/activity_listeners_test.go @@ -221,6 +221,57 @@ func TestActivityIssueUpdated_NoChangeFlags(t *testing.T) { } } +func TestActivityIssueUpdated_TitleChanged(t *testing.T) { + queries := db.New(testPool) + bus := events.New() + registerActivityListeners(bus, queries) + + issueID := createTestIssue(t, testWorkspaceID, testUserID) + t.Cleanup(func() { + cleanupActivities(t, issueID) + cleanupTestIssue(t, issueID) + }) + + bus.Publish(events.Event{ + Type: protocol.EventIssueUpdated, + WorkspaceID: testWorkspaceID, + ActorType: "member", + ActorID: testUserID, + Payload: map[string]any{ + "issue": handler.IssueResponse{ + ID: issueID, + WorkspaceID: testWorkspaceID, + Title: "renamed issue", + Status: "todo", + Priority: "medium", + CreatorType: "member", + CreatorID: testUserID, + }, + "title_changed": true, + "prev_title": "activity test issue", + }, + }) + + activities := listActivitiesForIssue(t, queries, issueID) + if len(activities) != 1 { + t.Fatalf("expected 1 activity, got %d", len(activities)) + } + if activities[0].Action != "title_changed" { + t.Fatalf("expected action 'title_changed', got %q", activities[0].Action) + } + + var details map[string]string + if err := json.Unmarshal(activities[0].Details, &details); err != nil { + t.Fatalf("failed to unmarshal details: %v", err) + } + if details["from"] != "activity test issue" { + t.Fatalf("expected from 'activity test issue', got %q", details["from"]) + } + if details["to"] != "renamed issue" { + t.Fatalf("expected to 'renamed issue', got %q", details["to"]) + } +} + func TestActivityTaskCompleted(t *testing.T) { queries := db.New(testPool) bus := events.New() diff --git a/server/cmd/server/listeners.go b/server/cmd/server/listeners.go index 9aeb0ca2..4bee8856 100644 --- a/server/cmd/server/listeners.go +++ b/server/cmd/server/listeners.go @@ -6,57 +6,26 @@ import ( "github.com/multica-ai/multica/server/internal/events" "github.com/multica-ai/multica/server/internal/realtime" - "github.com/multica-ai/multica/server/pkg/protocol" ) // registerListeners wires up event bus listeners for WS broadcasting. +// Uses SubscribeAll to automatically broadcast ALL events to WebSocket clients, +// eliminating the need to maintain a manual event type list. func registerListeners(bus *events.Bus, hub *realtime.Hub) { - allEvents := []string{ - protocol.EventIssueCreated, - protocol.EventIssueUpdated, - protocol.EventIssueDeleted, - protocol.EventCommentCreated, - protocol.EventCommentUpdated, - protocol.EventCommentDeleted, - protocol.EventAgentStatus, - protocol.EventAgentCreated, - protocol.EventAgentDeleted, - protocol.EventTaskDispatch, - protocol.EventTaskProgress, - protocol.EventTaskCompleted, - protocol.EventTaskFailed, - protocol.EventInboxNew, - protocol.EventInboxRead, - protocol.EventInboxArchived, - protocol.EventInboxBatchRead, - protocol.EventInboxBatchArchived, - protocol.EventWorkspaceUpdated, - protocol.EventWorkspaceDeleted, - protocol.EventMemberAdded, - protocol.EventMemberUpdated, - protocol.EventMemberRemoved, - protocol.EventSubscriberAdded, - protocol.EventSubscriberRemoved, - protocol.EventActivityCreated, - } - - for _, et := range allEvents { - eventType := et - bus.Subscribe(eventType, func(e events.Event) { - msg := map[string]any{ - "type": eventType, - "payload": e.Payload, - } - data, err := json.Marshal(msg) - if err != nil { - slog.Error("failed to marshal event", "event_type", eventType, "error", err) - return - } - if e.WorkspaceID != "" { - hub.BroadcastToWorkspace(e.WorkspaceID, data) - } else { - hub.Broadcast(data) - } - }) - } + bus.SubscribeAll(func(e events.Event) { + msg := map[string]any{ + "type": e.Type, + "payload": e.Payload, + } + data, err := json.Marshal(msg) + if err != nil { + slog.Error("failed to marshal event", "event_type", e.Type, "error", err) + return + } + if e.WorkspaceID != "" { + hub.BroadcastToWorkspace(e.WorkspaceID, data) + } else { + hub.Broadcast(data) + } + }) } diff --git a/server/internal/events/bus.go b/server/internal/events/bus.go index 3ac233f4..cf676adc 100644 --- a/server/internal/events/bus.go +++ b/server/internal/events/bus.go @@ -19,8 +19,9 @@ type Handler func(Event) // Bus is an in-process synchronous pub/sub event bus. type Bus struct { - mu sync.RWMutex - listeners map[string][]Handler + mu sync.RWMutex + listeners map[string][]Handler + globalHandlers []Handler } // New creates a new event bus. @@ -38,12 +39,22 @@ func (b *Bus) Subscribe(eventType string, h Handler) { b.listeners[eventType] = append(b.listeners[eventType], h) } +// SubscribeAll registers a handler that receives ALL events regardless of type. +// Global handlers are called after type-specific handlers. +func (b *Bus) SubscribeAll(h Handler) { + b.mu.Lock() + defer b.mu.Unlock() + b.globalHandlers = append(b.globalHandlers, h) +} + // Publish dispatches an event to all registered handlers for that event type. +// Type-specific handlers run first, then global (SubscribeAll) handlers. // Each handler is called synchronously. Panics in individual handlers are // recovered so one failing handler does not prevent others from executing. func (b *Bus) Publish(e Event) { b.mu.RLock() handlers := b.listeners[e.Type] + globals := b.globalHandlers b.mu.RUnlock() for _, h := range handlers { @@ -56,4 +67,15 @@ func (b *Bus) Publish(e Event) { h(e) }() } + + for _, h := range globals { + func() { + defer func() { + if r := recover(); r != nil { + slog.Error("panic in global event listener", "event_type", e.Type, "recovered", r) + } + }() + h(e) + }() + } } diff --git a/server/internal/events/bus_test.go b/server/internal/events/bus_test.go index c1e84dde..5ccb3f1a 100644 --- a/server/internal/events/bus_test.go +++ b/server/internal/events/bus_test.go @@ -62,6 +62,62 @@ func TestPanicInHandlerDoesNotBreakOthers(t *testing.T) { } } +func TestSubscribeAllReceivesAllEventTypes(t *testing.T) { + bus := New() + + var received []string + bus.SubscribeAll(func(e Event) { + received = append(received, e.Type) + }) + + bus.Publish(Event{Type: "issue:created"}) + bus.Publish(Event{Type: "comment:deleted"}) + bus.Publish(Event{Type: "skill:updated"}) + + if len(received) != 3 { + t.Fatalf("expected 3 events, got %d", len(received)) + } + if received[0] != "issue:created" || received[1] != "comment:deleted" || received[2] != "skill:updated" { + t.Fatalf("unexpected events: %v", received) + } +} + +func TestSubscribeAllCalledAfterTypeSpecific(t *testing.T) { + bus := New() + + var order []string + bus.Subscribe("issue:created", func(e Event) { + order = append(order, "specific") + }) + bus.SubscribeAll(func(e Event) { + order = append(order, "global") + }) + + bus.Publish(Event{Type: "issue:created"}) + + if len(order) != 2 || order[0] != "specific" || order[1] != "global" { + t.Fatalf("expected [specific, global], got %v", order) + } +} + +func TestSubscribeAllPanicRecovery(t *testing.T) { + bus := New() + + var secondCalled bool + bus.SubscribeAll(func(e Event) { + panic("test panic") + }) + bus.SubscribeAll(func(e Event) { + secondCalled = true + }) + + bus.Publish(Event{Type: "test"}) + + if !secondCalled { + t.Fatal("second global handler was not called after first panicked") + } +} + func TestEventFieldsPassedThrough(t *testing.T) { bus := New() var received Event diff --git a/server/internal/handler/issue.go b/server/internal/handler/issue.go index 69c76b0c..08369f5d 100644 --- a/server/internal/handler/issue.go +++ b/server/internal/handler/issue.go @@ -334,6 +334,7 @@ func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) { statusChanged := req.Status != nil && prevIssue.Status != issue.Status priorityChanged := req.Priority != nil && prevIssue.Priority != issue.Priority descriptionChanged := req.Description != nil && textToPtr(prevIssue.Description) != resp.Description + titleChanged := req.Title != nil && prevIssue.Title != issue.Title prevDueDate := timestampToPtr(prevIssue.DueDate) dueDateChanged := prevDueDate != resp.DueDate && (prevDueDate == nil) != (resp.DueDate == nil) || (prevDueDate != nil && resp.DueDate != nil && *prevDueDate != *resp.DueDate) @@ -345,6 +346,8 @@ func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) { "priority_changed": priorityChanged, "due_date_changed": dueDateChanged, "description_changed": descriptionChanged, + "title_changed": titleChanged, + "prev_title": prevIssue.Title, "prev_assignee_type": textToPtr(prevIssue.AssigneeType), "prev_assignee_id": uuidToPtr(prevIssue.AssigneeID), "prev_status": prevIssue.Status, diff --git a/server/internal/handler/skill.go b/server/internal/handler/skill.go index f878e650..96ccb6a2 100644 --- a/server/internal/handler/skill.go +++ b/server/internal/handler/skill.go @@ -14,6 +14,7 @@ import ( "github.com/go-chi/chi/v5" "github.com/jackc/pgx/v5/pgtype" db "github.com/multica-ai/multica/server/pkg/db/generated" + "github.com/multica-ai/multica/server/pkg/protocol" ) // --- Response structs --- @@ -266,7 +267,7 @@ func (h *Handler) CreateSkill(w http.ResponseWriter, r *http.Request) { SkillResponse: skillToResponse(skill), Files: fileResps, } - h.publish("skill:created", workspaceID, "member", creatorID, map[string]any{"skill": resp}) + h.publish(protocol.EventSkillCreated, workspaceID, "member", creatorID, map[string]any{"skill": resp}) writeJSON(w, http.StatusCreated, resp) } @@ -366,7 +367,7 @@ func (h *Handler) UpdateSkill(w http.ResponseWriter, r *http.Request) { SkillResponse: skillToResponse(skill), Files: fileResps, } - h.publish("skill:updated", resolveWorkspaceID(r), "member", requestUserID(r), map[string]any{"skill": resp}) + h.publish(protocol.EventSkillUpdated, resolveWorkspaceID(r), "member", requestUserID(r), map[string]any{"skill": resp}) writeJSON(w, http.StatusOK, resp) } @@ -384,7 +385,7 @@ func (h *Handler) DeleteSkill(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, "failed to delete skill") return } - h.publish("skill:deleted", uuidToString(skill.WorkspaceID), "member", requestUserID(r), map[string]any{"skill_id": id}) + h.publish(protocol.EventSkillDeleted, uuidToString(skill.WorkspaceID), "member", requestUserID(r), map[string]any{"skill_id": id}) w.WriteHeader(http.StatusNoContent) } @@ -854,7 +855,7 @@ func (h *Handler) ImportSkill(w http.ResponseWriter, r *http.Request) { SkillResponse: skillToResponse(skill), Files: fileResps, } - h.publish("skill:created", workspaceID, "member", creatorID, map[string]any{"skill": resp}) + h.publish(protocol.EventSkillCreated, workspaceID, "member", creatorID, map[string]any{"skill": resp}) writeJSON(w, http.StatusCreated, resp) } @@ -1010,6 +1011,6 @@ func (h *Handler) SetAgentSkills(w http.ResponseWriter, r *http.Request) { for i, s := range skills { resp[i] = skillToResponse(s) } - h.publish("agent:status", uuidToString(agent.WorkspaceID), "member", requestUserID(r), map[string]any{"agent_id": uuidToString(agent.ID), "skills": resp}) + h.publish(protocol.EventAgentStatus, uuidToString(agent.WorkspaceID), "member", requestUserID(r), map[string]any{"agent_id": uuidToString(agent.ID), "skills": resp}) writeJSON(w, http.StatusOK, resp) } diff --git a/server/pkg/protocol/events.go b/server/pkg/protocol/events.go index 6029bf2a..3edd5bdb 100644 --- a/server/pkg/protocol/events.go +++ b/server/pkg/protocol/events.go @@ -46,6 +46,11 @@ const ( // Activity events EventActivityCreated = "activity:created" + // Skill events + EventSkillCreated = "skill:created" + EventSkillUpdated = "skill:updated" + EventSkillDeleted = "skill:deleted" + // Daemon events EventDaemonHeartbeat = "daemon:heartbeat" EventDaemonRegister = "daemon:register"