Merge pull request #162 from multica-ai/naiyuan/realtime-sync-refactor

feat(realtime): WS invalidation + refetch pattern
This commit is contained in:
Naiyuan Qing 2026-03-29 13:51:22 +08:00 committed by GitHub
commit 586c3bf470
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 388 additions and 344 deletions

View file

@ -64,9 +64,7 @@ export function AppSidebar() {
const workspaces = useWorkspaceStore((s) => s.workspaces);
const switchWorkspace = useWorkspaceStore((s) => s.switchWorkspace);
const unreadCount = useInboxStore((s) =>
s.items.filter((i) => !i.read && !i.archived).length
);
const unreadCount = useInboxStore((s) => s.unreadCount());
const logout = () => {
authLogout();

View file

@ -1,6 +1,6 @@
"use client";
import { useState, useEffect, useCallback } from "react";
import { useState, useEffect } from "react";
import { useDefaultLayout } from "react-resizable-panels";
import {
Bot,
@ -65,7 +65,7 @@ import { Label } from "@/components/ui/label";
import { api } from "@/shared/api";
import { useAuthStore } from "@/features/auth";
import { useWorkspaceStore } from "@/features/workspace";
import { useWSEvent } from "@/features/realtime";
// ---------------------------------------------------------------------------
// Helpers
@ -1153,13 +1153,6 @@ export default function AgentsPage() {
}
}, [agents, selectedId]);
useWSEvent(
"agent:status",
useCallback(() => {
refreshAgents();
}, [refreshAgents]),
);
const handleCreate = async (data: CreateAgentRequest) => {
const agent = await api.createAgent(data);
await refreshAgents();

View file

@ -1,10 +1,8 @@
"use client";
import { useMemo } from "react";
import { useSearchParams, useRouter } from "next/navigation";
import { useDefaultLayout } from "react-resizable-panels";
import { useInboxStore } from "@/features/inbox";
import { useIssueStore } from "@/features/issues";
import { IssueDetail, StatusIcon, PriorityIcon } from "@/features/issues/components";
import { STATUS_CONFIG, PRIORITY_CONFIG } from "@/features/issues/config";
import { useActorName } from "@/features/workspace";
@ -19,7 +17,7 @@ import {
BookCheck,
ListChecks,
} from "lucide-react";
import type { InboxItem, InboxItemType, InboxSeverity, IssueStatus, IssuePriority } from "@/shared/types";
import type { InboxItem, InboxItemType, IssueStatus, IssuePriority } from "@/shared/types";
import { Button } from "@/components/ui/button";
import {
ResizablePanelGroup,
@ -34,23 +32,12 @@ import {
DropdownMenuItem,
DropdownMenuSeparator,
} from "@/components/ui/dropdown-menu";
import {
HoverCard,
HoverCardTrigger,
HoverCardContent,
} from "@/components/ui/hover-card";
import { api } from "@/shared/api";
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
const severityOrder: Record<InboxSeverity, number> = {
action_required: 0,
attention: 1,
info: 2,
};
const typeLabels: Record<InboxItemType, string> = {
issue_assigned: "Assigned",
unassigned: "Unassigned",
@ -86,29 +73,6 @@ function shortDate(dateStr: string): string {
});
}
// ---------------------------------------------------------------------------
// InboxHoverContent — shows issue context on hover
// ---------------------------------------------------------------------------
function InboxHoverContent({ item }: { item: InboxItem }) {
const issues = useIssueStore((s) => s.issues);
const issue = item.issue_id ? issues.find((i) => i.id === item.issue_id) : null;
if (!issue) return null;
return (
<div className="space-y-1.5">
<div className="flex items-start gap-2">
<StatusIcon status={issue.status} className="mt-0.5 h-3.5 w-3.5 shrink-0" />
<p className="text-sm font-medium line-clamp-2">{issue.title}</p>
</div>
{issue.description && (
<p className="line-clamp-2 text-xs text-muted-foreground">{issue.description}</p>
)}
</div>
);
}
// ---------------------------------------------------------------------------
// InboxDetailLabel — renders rich subtitle per notification type
// ---------------------------------------------------------------------------
@ -159,7 +123,7 @@ function InboxDetailLabel({ item }: { item: InboxItem }) {
return <span>Removed due date</span>;
}
case "new_comment": {
if (item.body) return <span className="truncate">{item.body}</span>;
if (item.body) return <span>{item.body}</span>;
return <span>{typeLabels[item.type]}</span>;
}
default:
@ -181,52 +145,43 @@ function InboxListItem({
onClick: () => void;
}) {
return (
<HoverCard>
<HoverCardTrigger
render={
<button
onClick={onClick}
className={`flex w-full items-center gap-3 px-4 py-2.5 text-left transition-colors ${
isSelected ? "bg-accent" : "hover:bg-accent/50"
}`}
/>
}
>
<ActorAvatar
actorType={item.actor_type ?? item.recipient_type}
actorId={item.actor_id ?? item.recipient_id}
size={28}
/>
<div className="min-w-0 flex-1">
<div className="flex items-center justify-between gap-2">
<div className="flex min-w-0 items-center gap-1.5">
{!item.read && (
<span className="h-1.5 w-1.5 shrink-0 rounded-full bg-brand" />
)}
<span
className={`truncate text-sm ${!item.read ? "font-medium" : "text-muted-foreground"}`}
>
{item.title}
</span>
</div>
{item.issue_status && (
<StatusIcon status={item.issue_status} className="h-3.5 w-3.5 shrink-0" />
<button
onClick={onClick}
className={`flex w-full items-center gap-3 px-4 py-2.5 text-left transition-colors ${
isSelected ? "bg-accent" : "hover:bg-accent/50"
}`}
>
<ActorAvatar
actorType={item.actor_type ?? item.recipient_type}
actorId={item.actor_id ?? item.recipient_id}
size={28}
/>
<div className="min-w-0 flex-1">
<div className="flex items-center justify-between gap-2">
<div className="flex min-w-0 items-center gap-1.5">
{!item.read && (
<span className="h-1.5 w-1.5 shrink-0 rounded-full bg-brand" />
)}
</div>
<div className="mt-0.5 flex items-center justify-between gap-2">
<p className={`min-w-0 truncate text-xs ${item.read ? "text-muted-foreground/60" : "text-muted-foreground"}`}>
<InboxDetailLabel item={item} />
</p>
<span className={`shrink-0 text-xs ${item.read ? "text-muted-foreground/60" : "text-muted-foreground"}`}>
{timeAgo(item.created_at)}
<span
className={`truncate text-sm ${!item.read ? "font-medium" : "text-muted-foreground"}`}
>
{item.title}
</span>
</div>
{item.issue_status && (
<StatusIcon status={item.issue_status} className="h-3.5 w-3.5 shrink-0" />
)}
</div>
</HoverCardTrigger>
<HoverCardContent side="right" align="start" className="w-72">
<InboxHoverContent item={item} />
</HoverCardContent>
</HoverCard>
<div className="mt-0.5 flex items-center justify-between gap-2">
<p className={`min-w-0 overflow-hidden text-ellipsis whitespace-nowrap text-xs ${item.read ? "text-muted-foreground/60" : "text-muted-foreground"}`}>
<InboxDetailLabel item={item} />
</p>
<span className={`shrink-0 text-xs ${item.read ? "text-muted-foreground/60" : "text-muted-foreground"}`}>
{timeAgo(item.created_at)}
</span>
</div>
</div>
</button>
);
}
@ -246,40 +201,13 @@ export default function InboxPage() {
}
};
const storeItems = useInboxStore((s) => s.items);
const items = useInboxStore((s) => s.dedupedItems());
const loading = useInboxStore((s) => s.loading);
const { defaultLayout, onLayoutChanged } = useDefaultLayout({
id: "multica_inbox_layout",
});
// Group by (issue_id, type, actor_id) and take the latest from each group
const items = useMemo(() => {
const active = storeItems.filter((i) => !i.archived);
const groups = new Map<string, InboxItem[]>();
active.forEach((item) => {
const key = `${item.issue_id ?? "none"}|${item.type}|${item.actor_id ?? "none"}`;
const group = groups.get(key) ?? [];
group.push(item);
groups.set(key, group);
});
const merged: InboxItem[] = [];
groups.forEach((group) => {
const sorted = group.sort(
(a, b) => new Date(b.created_at).getTime() - new Date(a.created_at).getTime()
);
const latest = sorted[0];
if (latest) merged.push(latest);
});
return merged.sort(
(a, b) =>
severityOrder[a.severity] - severityOrder[b.severity] ||
new Date(b.created_at).getTime() - new Date(a.created_at).getTime()
);
}, [storeItems]);
const selected = items.find((i) => i.id === selectedId) ?? null;
const unreadCount = items.filter((i) => !i.read).length;

View file

@ -7,6 +7,41 @@ import { createLogger } from "@/shared/logger";
const logger = createLogger("inbox-store");
/**
* Deduplicate inbox items by issue_id (one entry per issue, Linear-style),
* keep latest, sort by time DESC.
* Memoized by reference returns the same array if `items` hasn't changed.
*/
let _prevItems: InboxItem[] = [];
let _prevDeduped: InboxItem[] = [];
function deduplicateInboxItems(items: InboxItem[]): InboxItem[] {
if (items === _prevItems) return _prevDeduped;
_prevItems = items;
const active = items.filter((i) => !i.archived);
const groups = new Map<string, InboxItem[]>();
active.forEach((item) => {
const key = item.issue_id ?? item.id;
const group = groups.get(key) ?? [];
group.push(item);
groups.set(key, group);
});
const merged: InboxItem[] = [];
groups.forEach((group) => {
const sorted = group.sort(
(a, b) =>
new Date(b.created_at).getTime() - new Date(a.created_at).getTime(),
);
if (sorted[0]) merged.push(sorted[0]);
});
_prevDeduped = merged.sort(
(a, b) =>
new Date(b.created_at).getTime() - new Date(a.created_at).getTime(),
);
return _prevDeduped;
}
interface InboxState {
items: InboxItem[];
loading: boolean;
@ -19,6 +54,7 @@ interface InboxState {
archiveAll: () => void;
archiveAllRead: () => void;
updateIssueStatus: (issueId: string, status: IssueStatus) => void;
dedupedItems: () => InboxItem[];
unreadCount: () => number;
}
@ -28,14 +64,15 @@ export const useInboxStore = create<InboxState>((set, get) => ({
fetch: async () => {
logger.debug("fetch start");
set({ loading: true });
const isInitialLoad = get().items.length === 0;
if (isInitialLoad) set({ loading: true });
try {
const data = await api.listInbox();
logger.info("fetched", data.length, "items");
set({ items: data, loading: false });
} catch (err) {
logger.error("fetch failed", err);
set({ loading: false });
if (isInitialLoad) set({ loading: false });
}
},
@ -74,5 +111,7 @@ export const useInboxStore = create<InboxState>((set, get) => ({
i.issue_id === issueId ? { ...i, issue_status: status } : i
),
})),
unreadCount: () => get().items.filter((i) => !i.read && !i.archived).length,
dedupedItems: () => deduplicateInboxItems(get().items),
unreadCount: () =>
get().dedupedItems().filter((i) => !i.read).length,
}));

View file

@ -7,6 +7,7 @@ import { useRouter } from "next/navigation";
import {
Bot,
Calendar,
Check,
ChevronLeft,
ChevronRight,
Link2,
@ -37,8 +38,6 @@ import {
DropdownMenuSeparator,
DropdownMenuGroup,
DropdownMenuLabel,
DropdownMenuRadioGroup,
DropdownMenuRadioItem,
DropdownMenuSub,
DropdownMenuSubTrigger,
DropdownMenuSubContent,
@ -112,6 +111,8 @@ function formatActivity(
const formatted = new Date(details.to).toLocaleDateString("en-US", { month: "short", day: "numeric" });
return `set due date to ${formatted}`;
}
case "title_changed":
return `renamed this issue from "${details.from ?? "?"}" to "${details.to ?? "?"}"`;
case "description_updated":
return "updated the description";
case "task_completed":
@ -978,14 +979,13 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) {
<span className="truncate">{STATUS_CONFIG[issue.status].label}</span>
</DropdownMenuTrigger>
<DropdownMenuContent align="start" className="w-44">
<DropdownMenuRadioGroup value={issue.status} onValueChange={(v) => handleUpdateField({ status: v as IssueStatus })}>
{ALL_STATUSES.map((s) => (
<DropdownMenuRadioItem key={s} value={s}>
<StatusIcon status={s} className="h-3.5 w-3.5" />
{STATUS_CONFIG[s].label}
</DropdownMenuRadioItem>
))}
</DropdownMenuRadioGroup>
{ALL_STATUSES.map((s) => (
<DropdownMenuItem key={s} onClick={() => handleUpdateField({ status: s })}>
<StatusIcon status={s} className="h-3.5 w-3.5" />
{STATUS_CONFIG[s].label}
{s === issue.status && <Check className="ml-auto h-3.5 w-3.5" />}
</DropdownMenuItem>
))}
</DropdownMenuContent>
</DropdownMenu>
</PropRow>
@ -998,14 +998,13 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) {
<span className="truncate">{PRIORITY_CONFIG[issue.priority].label}</span>
</DropdownMenuTrigger>
<DropdownMenuContent align="start" className="w-44">
<DropdownMenuRadioGroup value={issue.priority} onValueChange={(v) => handleUpdateField({ priority: v as IssuePriority })}>
{PRIORITY_ORDER.map((p) => (
<DropdownMenuRadioItem key={p} value={p}>
<PriorityIcon priority={p} />
{PRIORITY_CONFIG[p].label}
</DropdownMenuRadioItem>
))}
</DropdownMenuRadioGroup>
{PRIORITY_ORDER.map((p) => (
<DropdownMenuItem key={p} onClick={() => handleUpdateField({ priority: p })}>
<PriorityIcon priority={p} />
{PRIORITY_CONFIG[p].label}
{p === issue.priority && <Check className="ml-auto h-3.5 w-3.5" />}
</DropdownMenuItem>
))}
</DropdownMenuContent>
</DropdownMenu>
</PropRow>

View file

@ -19,21 +19,22 @@ interface IssueState {
setActiveIssue: (id: string | null) => void;
}
export const useIssueStore = create<IssueState>((set) => ({
export const useIssueStore = create<IssueState>((set, get) => ({
issues: [],
loading: true,
activeIssueId: null,
fetch: async () => {
logger.debug("fetch start");
set({ loading: true });
const isInitialLoad = get().issues.length === 0;
if (isInitialLoad) set({ loading: true });
try {
const res = await api.listIssues({ limit: 200 });
logger.info("fetched", res.issues.length, "issues");
set({ issues: res.issues, loading: false });
} catch (err) {
logger.error("fetch failed", err);
set({ loading: false });
if (isInitialLoad) set({ loading: false });
}
},

View file

@ -8,19 +8,10 @@ import { useInboxStore } from "@/features/inbox";
import { useWorkspaceStore } from "@/features/workspace";
import { useAuthStore } from "@/features/auth";
import { createLogger } from "@/shared/logger";
import { api } from "@/shared/api";
import type {
IssueCreatedPayload,
IssueUpdatedPayload,
IssueDeletedPayload,
AgentStatusPayload,
AgentCreatedPayload,
InboxNewPayload,
InboxReadPayload,
InboxArchivedPayload,
WorkspaceUpdatedPayload,
WorkspaceDeletedPayload,
MemberAddedPayload,
MemberUpdatedPayload,
WorkspaceDeletedPayload,
MemberRemovedPayload,
} from "@/shared/types";
@ -28,139 +19,99 @@ const logger = createLogger("realtime-sync");
/**
* Centralized WS store sync. Called once from WSProvider.
* Subscribes to all global WS events and dispatches to Zustand stores.
* Comment events are NOT handled here they stay per-page on issue detail.
*
* Uses the "WS as invalidation signal + refetch" pattern:
* - onAny handler extracts event prefix and calls the matching store refresh
* - Debounce per-prefix prevents rapid-fire refetches (e.g. bulk issue updates)
* - Precise handlers only for side effects (toast, navigation, self-check)
*
* Per-page events (comments, activity, subscribers, daemon) are still handled
* by individual components via useWSEvent not here.
*/
export function useRealtimeSync(ws: WSClient | null) {
// Issue events → useIssueStore
// Main sync: onAny → refreshMap with debounce
useEffect(() => {
if (!ws) return;
const unsubs = [
ws.on("issue:created", (p) => {
const { issue } = p as IssueCreatedPayload;
useIssueStore.getState().addIssue(issue);
}),
ws.on("issue:updated", (p) => {
const { issue } = p as IssueUpdatedPayload;
useIssueStore.getState().updateIssue(issue.id, issue);
useInboxStore.getState().updateIssueStatus(issue.id, issue.status);
}),
ws.on("issue:deleted", (p) => {
const { issue_id } = p as IssueDeletedPayload;
useIssueStore.getState().removeIssue(issue_id);
}),
];
const refreshMap: Record<string, () => void> = {
issue: () => void useIssueStore.getState().fetch(),
inbox: () => void useInboxStore.getState().fetch(),
agent: () => void useWorkspaceStore.getState().refreshAgents(),
member: () => void useWorkspaceStore.getState().refreshMembers(),
workspace: () => {
// Lightweight: only re-fetch workspace list, don't hydrate everything.
// workspace:deleted is handled by a precise side-effect handler below.
api.listWorkspaces().then((wsList) => {
const current = useWorkspaceStore.getState().workspace;
const updated = current
? wsList.find((w) => w.id === current.id)
: null;
if (updated) useWorkspaceStore.getState().updateWorkspace(updated);
}).catch((err) => {
logger.error("workspace refresh failed", err);
});
},
skill: () => void useWorkspaceStore.getState().refreshSkills(),
};
return () => unsubs.forEach((u) => u());
}, [ws]);
const timers = new Map<string, ReturnType<typeof setTimeout>>();
const debouncedRefresh = (prefix: string, fn: () => void) => {
const existing = timers.get(prefix);
if (existing) clearTimeout(existing);
timers.set(
prefix,
setTimeout(() => {
timers.delete(prefix);
fn();
}, 100),
);
};
// Inbox events → useInboxStore
useEffect(() => {
if (!ws) return;
const unsubAny = ws.onAny((msg) => {
const prefix = msg.type.split(":")[0] ?? "";
const refresh = refreshMap[prefix];
if (refresh) debouncedRefresh(prefix, refresh);
});
const unsubs = [
ws.on("inbox:new", (p) => {
const { item } = p as InboxNewPayload;
const myUserId = useAuthStore.getState().user?.id;
// Only add if I'm the recipient (WS broadcasts to all workspace members)
if (item.recipient_type === "member" && item.recipient_id === myUserId) {
useInboxStore.getState().addItem(item);
}
}),
ws.on("inbox:read", (p) => {
const { item_id } = p as InboxReadPayload;
useInboxStore.getState().markRead(item_id);
}),
ws.on("inbox:archived", (p) => {
const { item_id } = p as InboxArchivedPayload;
useInboxStore.getState().archive(item_id);
}),
ws.on("inbox:batch-read", () => {
useInboxStore.getState().markAllRead();
}),
ws.on("inbox:batch-archived", () => {
useInboxStore.getState().fetch();
}),
];
// --- Side-effect handlers (toast, navigation, self-check) ---
return () => unsubs.forEach((u) => u());
}, [ws]);
const unsubWsDeleted = ws.on("workspace:deleted", (p) => {
const { workspace_id } = p as WorkspaceDeletedPayload;
const currentWs = useWorkspaceStore.getState().workspace;
if (currentWs?.id === workspace_id) {
logger.warn("current workspace deleted, switching");
toast.info("This workspace was deleted");
useWorkspaceStore.getState().refreshWorkspaces();
}
});
// Agent events → workspace store
useEffect(() => {
if (!ws) return;
const unsubMemberRemoved = ws.on("member:removed", (p) => {
const { user_id } = p as MemberRemovedPayload;
const myUserId = useAuthStore.getState().user?.id;
if (user_id === myUserId) {
logger.warn("removed from workspace, switching");
toast.info("You were removed from this workspace");
useWorkspaceStore.getState().refreshWorkspaces();
}
});
const unsubs = [
ws.on("agent:status", (p) => {
const { agent } = p as AgentStatusPayload;
useWorkspaceStore.getState().updateAgent(agent.id, agent);
}),
ws.on("agent:created", (p) => {
const { agent } = p as AgentCreatedPayload;
const agents = useWorkspaceStore.getState().agents;
if (!agents.find((a) => a.id === agent.id)) {
useWorkspaceStore.getState().refreshAgents();
}
}),
ws.on("agent:deleted", () => {
useWorkspaceStore.getState().refreshAgents();
}),
];
const unsubMemberAdded = ws.on("member:added", (p) => {
const { member } = p as MemberAddedPayload;
const myUserId = useAuthStore.getState().user?.id;
if (member.user_id === myUserId) {
// I was invited to a new workspace — refresh workspace list
useWorkspaceStore.getState().refreshWorkspaces();
}
});
return () => unsubs.forEach((u) => u());
}, [ws]);
// Workspace + member events → useWorkspaceStore
useEffect(() => {
if (!ws) return;
const unsubs = [
ws.on("workspace:updated", (p) => {
const { workspace } = p as WorkspaceUpdatedPayload;
logger.debug("workspace:updated", workspace.name);
useWorkspaceStore.getState().updateWorkspace(workspace);
}),
ws.on("workspace:deleted", (p) => {
const { workspace_id } = p as WorkspaceDeletedPayload;
const currentWs = useWorkspaceStore.getState().workspace;
if (currentWs?.id === workspace_id) {
logger.warn("current workspace deleted, switching");
toast.info("This workspace was deleted");
useWorkspaceStore.getState().refreshWorkspaces();
}
}),
ws.on("member:updated", (p) => {
const payload = p as MemberUpdatedPayload;
logger.debug("member:updated", payload.member.email, payload.member.role);
useWorkspaceStore.getState().refreshMembers();
}),
ws.on("member:added", (p) => {
const payload = p as MemberAddedPayload;
const myUserId = useAuthStore.getState().user?.id;
logger.debug("member:added", payload.member.email);
if (payload.member.user_id === myUserId) {
// I was invited to a workspace — refresh list so it appears
useWorkspaceStore.getState().refreshWorkspaces();
} else {
useWorkspaceStore.getState().refreshMembers();
}
}),
ws.on("member:removed", (p) => {
const payload = p as MemberRemovedPayload;
const myUserId = useAuthStore.getState().user?.id;
logger.debug("member:removed", payload.user_id);
if (payload.user_id === myUserId) {
logger.warn("removed from workspace, switching");
toast.info("You were removed from this workspace");
useWorkspaceStore.getState().refreshWorkspaces();
} else {
useWorkspaceStore.getState().refreshMembers();
}
}),
];
return () => unsubs.forEach((u) => u());
return () => {
unsubAny();
unsubWsDeleted();
unsubMemberRemoved();
unsubMemberAdded();
timers.forEach(clearTimeout);
timers.clear();
};
}, [ws]);
// Reconnect → refetch all data to recover missed events
@ -174,6 +125,8 @@ export function useRealtimeSync(ws: WSClient | null) {
useIssueStore.getState().fetch(),
useInboxStore.getState().fetch(),
useWorkspaceStore.getState().refreshAgents(),
useWorkspaceStore.getState().refreshMembers(),
useWorkspaceStore.getState().refreshSkills(),
]);
} catch {
// Silently fail; next reconnect will retry

View file

@ -1,6 +1,6 @@
"use client";
import { useState, useEffect, useCallback, useMemo } from "react";
import { useState, useEffect, useMemo } from "react";
import { useDefaultLayout } from "react-resizable-panels";
import {
Sparkles,
@ -33,7 +33,7 @@ import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs";
import { api } from "@/shared/api";
import { useAuthStore } from "@/features/auth";
import { useWorkspaceStore } from "@/features/workspace";
import { useWSEvent } from "@/features/realtime";
import { FileTree } from "./file-tree";
import { FileViewer } from "./file-viewer";
@ -600,14 +600,6 @@ export default function SkillsPage() {
}
}, [skills, selectedId]);
const handleRefresh = useCallback(() => {
refreshSkills();
}, [refreshSkills]);
useWSEvent("skill:created", handleRefresh);
useWSEvent("skill:updated", handleRefresh);
useWSEvent("skill:deleted", handleRefresh);
const handleCreate = async (data: CreateSkillRequest) => {
const skill = await api.createSkill(data);
upsertSkill(skill);

View file

@ -12,6 +12,7 @@ export class WSClient {
private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
private hasConnectedBefore = false;
private onReconnectCallbacks = new Set<() => void>();
private anyHandlers = new Set<(msg: WSMessage) => void>();
private logger: Logger;
constructor(url: string, options?: { logger?: Logger }) {
@ -54,8 +55,9 @@ export class WSClient {
for (const handler of eventHandlers) {
handler(msg.payload);
}
} else {
this.logger.debug("unhandled event", msg.type);
}
for (const handler of this.anyHandlers) {
handler(msg);
}
};
@ -83,6 +85,9 @@ export class WSClient {
this.ws = null;
}
this.hasConnectedBefore = false;
this.handlers.clear();
this.anyHandlers.clear();
this.onReconnectCallbacks.clear();
}
on(event: WSEventType, handler: EventHandler) {
@ -95,6 +100,13 @@ export class WSClient {
};
}
onAny(handler: (msg: WSMessage) => void) {
this.anyHandlers.add(handler);
return () => {
this.anyHandlers.delete(handler);
};
}
onReconnect(callback: () => void) {
this.onReconnectCallbacks.add(callback);
return () => {

View file

@ -171,6 +171,28 @@ func registerActivityListeners(bus *events.Bus, queries *db.Queries) {
}
}
if titleChanged, _ := payload["title_changed"].(bool); titleChanged {
prevTitle, _ := payload["prev_title"].(string)
details, _ := json.Marshal(map[string]string{
"from": prevTitle,
"to": issue.Title,
})
activity, err := queries.CreateActivity(ctx, db.CreateActivityParams{
WorkspaceID: parseUUID(issue.WorkspaceID),
IssueID: parseUUID(issue.ID),
ActorType: util.StrToText(e.ActorType),
ActorID: parseUUID(e.ActorID),
Action: "title_changed",
Details: details,
})
if err != nil {
slog.Error("activity: failed to record title change",
"issue_id", issue.ID, "error", err)
} else {
publishActivityEvent(bus, e, activity)
}
}
if descriptionChanged {
activity, err := queries.CreateActivity(ctx, db.CreateActivityParams{
WorkspaceID: parseUUID(issue.WorkspaceID),

View file

@ -221,6 +221,57 @@ func TestActivityIssueUpdated_NoChangeFlags(t *testing.T) {
}
}
func TestActivityIssueUpdated_TitleChanged(t *testing.T) {
queries := db.New(testPool)
bus := events.New()
registerActivityListeners(bus, queries)
issueID := createTestIssue(t, testWorkspaceID, testUserID)
t.Cleanup(func() {
cleanupActivities(t, issueID)
cleanupTestIssue(t, issueID)
})
bus.Publish(events.Event{
Type: protocol.EventIssueUpdated,
WorkspaceID: testWorkspaceID,
ActorType: "member",
ActorID: testUserID,
Payload: map[string]any{
"issue": handler.IssueResponse{
ID: issueID,
WorkspaceID: testWorkspaceID,
Title: "renamed issue",
Status: "todo",
Priority: "medium",
CreatorType: "member",
CreatorID: testUserID,
},
"title_changed": true,
"prev_title": "activity test issue",
},
})
activities := listActivitiesForIssue(t, queries, issueID)
if len(activities) != 1 {
t.Fatalf("expected 1 activity, got %d", len(activities))
}
if activities[0].Action != "title_changed" {
t.Fatalf("expected action 'title_changed', got %q", activities[0].Action)
}
var details map[string]string
if err := json.Unmarshal(activities[0].Details, &details); err != nil {
t.Fatalf("failed to unmarshal details: %v", err)
}
if details["from"] != "activity test issue" {
t.Fatalf("expected from 'activity test issue', got %q", details["from"])
}
if details["to"] != "renamed issue" {
t.Fatalf("expected to 'renamed issue', got %q", details["to"])
}
}
func TestActivityTaskCompleted(t *testing.T) {
queries := db.New(testPool)
bus := events.New()

View file

@ -6,57 +6,26 @@ import (
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/realtime"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// registerListeners wires up event bus listeners for WS broadcasting.
// Uses SubscribeAll to automatically broadcast ALL events to WebSocket clients,
// eliminating the need to maintain a manual event type list.
func registerListeners(bus *events.Bus, hub *realtime.Hub) {
allEvents := []string{
protocol.EventIssueCreated,
protocol.EventIssueUpdated,
protocol.EventIssueDeleted,
protocol.EventCommentCreated,
protocol.EventCommentUpdated,
protocol.EventCommentDeleted,
protocol.EventAgentStatus,
protocol.EventAgentCreated,
protocol.EventAgentDeleted,
protocol.EventTaskDispatch,
protocol.EventTaskProgress,
protocol.EventTaskCompleted,
protocol.EventTaskFailed,
protocol.EventInboxNew,
protocol.EventInboxRead,
protocol.EventInboxArchived,
protocol.EventInboxBatchRead,
protocol.EventInboxBatchArchived,
protocol.EventWorkspaceUpdated,
protocol.EventWorkspaceDeleted,
protocol.EventMemberAdded,
protocol.EventMemberUpdated,
protocol.EventMemberRemoved,
protocol.EventSubscriberAdded,
protocol.EventSubscriberRemoved,
protocol.EventActivityCreated,
}
for _, et := range allEvents {
eventType := et
bus.Subscribe(eventType, func(e events.Event) {
msg := map[string]any{
"type": eventType,
"payload": e.Payload,
}
data, err := json.Marshal(msg)
if err != nil {
slog.Error("failed to marshal event", "event_type", eventType, "error", err)
return
}
if e.WorkspaceID != "" {
hub.BroadcastToWorkspace(e.WorkspaceID, data)
} else {
hub.Broadcast(data)
}
})
}
bus.SubscribeAll(func(e events.Event) {
msg := map[string]any{
"type": e.Type,
"payload": e.Payload,
}
data, err := json.Marshal(msg)
if err != nil {
slog.Error("failed to marshal event", "event_type", e.Type, "error", err)
return
}
if e.WorkspaceID != "" {
hub.BroadcastToWorkspace(e.WorkspaceID, data)
} else {
hub.Broadcast(data)
}
})
}

View file

@ -19,8 +19,9 @@ type Handler func(Event)
// Bus is an in-process synchronous pub/sub event bus.
type Bus struct {
mu sync.RWMutex
listeners map[string][]Handler
mu sync.RWMutex
listeners map[string][]Handler
globalHandlers []Handler
}
// New creates a new event bus.
@ -38,12 +39,22 @@ func (b *Bus) Subscribe(eventType string, h Handler) {
b.listeners[eventType] = append(b.listeners[eventType], h)
}
// SubscribeAll registers a handler that receives ALL events regardless of type.
// Global handlers are called after type-specific handlers.
func (b *Bus) SubscribeAll(h Handler) {
b.mu.Lock()
defer b.mu.Unlock()
b.globalHandlers = append(b.globalHandlers, h)
}
// Publish dispatches an event to all registered handlers for that event type.
// Type-specific handlers run first, then global (SubscribeAll) handlers.
// Each handler is called synchronously. Panics in individual handlers are
// recovered so one failing handler does not prevent others from executing.
func (b *Bus) Publish(e Event) {
b.mu.RLock()
handlers := b.listeners[e.Type]
globals := b.globalHandlers
b.mu.RUnlock()
for _, h := range handlers {
@ -56,4 +67,15 @@ func (b *Bus) Publish(e Event) {
h(e)
}()
}
for _, h := range globals {
func() {
defer func() {
if r := recover(); r != nil {
slog.Error("panic in global event listener", "event_type", e.Type, "recovered", r)
}
}()
h(e)
}()
}
}

View file

@ -62,6 +62,62 @@ func TestPanicInHandlerDoesNotBreakOthers(t *testing.T) {
}
}
func TestSubscribeAllReceivesAllEventTypes(t *testing.T) {
bus := New()
var received []string
bus.SubscribeAll(func(e Event) {
received = append(received, e.Type)
})
bus.Publish(Event{Type: "issue:created"})
bus.Publish(Event{Type: "comment:deleted"})
bus.Publish(Event{Type: "skill:updated"})
if len(received) != 3 {
t.Fatalf("expected 3 events, got %d", len(received))
}
if received[0] != "issue:created" || received[1] != "comment:deleted" || received[2] != "skill:updated" {
t.Fatalf("unexpected events: %v", received)
}
}
func TestSubscribeAllCalledAfterTypeSpecific(t *testing.T) {
bus := New()
var order []string
bus.Subscribe("issue:created", func(e Event) {
order = append(order, "specific")
})
bus.SubscribeAll(func(e Event) {
order = append(order, "global")
})
bus.Publish(Event{Type: "issue:created"})
if len(order) != 2 || order[0] != "specific" || order[1] != "global" {
t.Fatalf("expected [specific, global], got %v", order)
}
}
func TestSubscribeAllPanicRecovery(t *testing.T) {
bus := New()
var secondCalled bool
bus.SubscribeAll(func(e Event) {
panic("test panic")
})
bus.SubscribeAll(func(e Event) {
secondCalled = true
})
bus.Publish(Event{Type: "test"})
if !secondCalled {
t.Fatal("second global handler was not called after first panicked")
}
}
func TestEventFieldsPassedThrough(t *testing.T) {
bus := New()
var received Event

View file

@ -334,6 +334,7 @@ func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) {
statusChanged := req.Status != nil && prevIssue.Status != issue.Status
priorityChanged := req.Priority != nil && prevIssue.Priority != issue.Priority
descriptionChanged := req.Description != nil && textToPtr(prevIssue.Description) != resp.Description
titleChanged := req.Title != nil && prevIssue.Title != issue.Title
prevDueDate := timestampToPtr(prevIssue.DueDate)
dueDateChanged := prevDueDate != resp.DueDate && (prevDueDate == nil) != (resp.DueDate == nil) ||
(prevDueDate != nil && resp.DueDate != nil && *prevDueDate != *resp.DueDate)
@ -345,6 +346,8 @@ func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) {
"priority_changed": priorityChanged,
"due_date_changed": dueDateChanged,
"description_changed": descriptionChanged,
"title_changed": titleChanged,
"prev_title": prevIssue.Title,
"prev_assignee_type": textToPtr(prevIssue.AssigneeType),
"prev_assignee_id": uuidToPtr(prevIssue.AssigneeID),
"prev_status": prevIssue.Status,

View file

@ -14,6 +14,7 @@ import (
"github.com/go-chi/chi/v5"
"github.com/jackc/pgx/v5/pgtype"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// --- Response structs ---
@ -266,7 +267,7 @@ func (h *Handler) CreateSkill(w http.ResponseWriter, r *http.Request) {
SkillResponse: skillToResponse(skill),
Files: fileResps,
}
h.publish("skill:created", workspaceID, "member", creatorID, map[string]any{"skill": resp})
h.publish(protocol.EventSkillCreated, workspaceID, "member", creatorID, map[string]any{"skill": resp})
writeJSON(w, http.StatusCreated, resp)
}
@ -366,7 +367,7 @@ func (h *Handler) UpdateSkill(w http.ResponseWriter, r *http.Request) {
SkillResponse: skillToResponse(skill),
Files: fileResps,
}
h.publish("skill:updated", resolveWorkspaceID(r), "member", requestUserID(r), map[string]any{"skill": resp})
h.publish(protocol.EventSkillUpdated, resolveWorkspaceID(r), "member", requestUserID(r), map[string]any{"skill": resp})
writeJSON(w, http.StatusOK, resp)
}
@ -384,7 +385,7 @@ func (h *Handler) DeleteSkill(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusInternalServerError, "failed to delete skill")
return
}
h.publish("skill:deleted", uuidToString(skill.WorkspaceID), "member", requestUserID(r), map[string]any{"skill_id": id})
h.publish(protocol.EventSkillDeleted, uuidToString(skill.WorkspaceID), "member", requestUserID(r), map[string]any{"skill_id": id})
w.WriteHeader(http.StatusNoContent)
}
@ -854,7 +855,7 @@ func (h *Handler) ImportSkill(w http.ResponseWriter, r *http.Request) {
SkillResponse: skillToResponse(skill),
Files: fileResps,
}
h.publish("skill:created", workspaceID, "member", creatorID, map[string]any{"skill": resp})
h.publish(protocol.EventSkillCreated, workspaceID, "member", creatorID, map[string]any{"skill": resp})
writeJSON(w, http.StatusCreated, resp)
}
@ -1010,6 +1011,6 @@ func (h *Handler) SetAgentSkills(w http.ResponseWriter, r *http.Request) {
for i, s := range skills {
resp[i] = skillToResponse(s)
}
h.publish("agent:status", uuidToString(agent.WorkspaceID), "member", requestUserID(r), map[string]any{"agent_id": uuidToString(agent.ID), "skills": resp})
h.publish(protocol.EventAgentStatus, uuidToString(agent.WorkspaceID), "member", requestUserID(r), map[string]any{"agent_id": uuidToString(agent.ID), "skills": resp})
writeJSON(w, http.StatusOK, resp)
}

View file

@ -46,6 +46,11 @@ const (
// Activity events
EventActivityCreated = "activity:created"
// Skill events
EventSkillCreated = "skill:created"
EventSkillUpdated = "skill:updated"
EventSkillDeleted = "skill:deleted"
// Daemon events
EventDaemonHeartbeat = "daemon:heartbeat"
EventDaemonRegister = "daemon:register"