Merge pull request #207 from multica-ai/feature/ws-self-event-filtering

feat(realtime): WS self-event filtering, issue-detail refactor, sync gap fixes
This commit is contained in:
Naiyuan Qing 2026-03-31 13:19:39 +08:00 committed by GitHub
commit 5517136d73
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 731 additions and 421 deletions

View file

@ -217,11 +217,13 @@ export default function InboxPage() {
const handleSelect = async (item: InboxItem) => {
setSelectedId(item.id);
if (!item.read) {
useInboxStore.getState().markRead(item.id);
try {
await api.markInboxRead(item.id);
useInboxStore.getState().markRead(item.id);
} catch {
// silent — selection still works even if mark-read fails
// Rollback: refetch to get server truth
useInboxStore.getState().fetch();
toast.error("Failed to mark as read");
}
}
};

View file

@ -86,13 +86,16 @@ const stableStoreIssues = vi.hoisted(() => [
},
]);
vi.mock("@/features/issues", () => ({
useIssueStore: (selector: (s: any) => any) =>
selector({ issues: stableStoreIssues }),
useIssueStore: Object.assign(
(selector: (s: any) => any) => selector({ issues: stableStoreIssues }),
{ getState: () => ({ issues: stableStoreIssues, addIssue: vi.fn(), updateIssue: vi.fn(), removeIssue: vi.fn() }) },
),
}));
// Mock ws-context
vi.mock("@/features/realtime", () => ({
useWSEvent: () => {},
useWSReconnect: () => {},
}));
// Mock calendar (react-day-picker needs browser APIs)
@ -270,8 +273,9 @@ describe("IssueDetailPage", () => {
});
it("shows 'Issue not found' for missing issue", async () => {
mockGetIssue.mockRejectedValueOnce(new Error("Not found"));
mockListTimeline.mockRejectedValueOnce(new Error("Not found"));
// issue-detail fetches getIssue, useIssueReactions also fetches getIssue
mockGetIssue.mockRejectedValue(new Error("Not found"));
mockListTimeline.mockRejectedValue(new Error("Not found"));
await renderPage("nonexistent-id");
await waitFor(() => {

View file

@ -48,7 +48,8 @@ vi.mock("@/features/workspace", () => ({
// Mock WebSocket context
vi.mock("@/features/realtime", () => ({
useWSEvent: vi.fn(),
useWS: () => ({ subscribe: vi.fn(() => () => {}) }),
useWSReconnect: vi.fn(),
useWS: () => ({ subscribe: vi.fn(() => () => {}), onReconnect: vi.fn(() => () => {}) }),
WSProvider: ({ children }: { children: React.ReactNode }) => children,
}));

View file

@ -1,6 +1,6 @@
"use client";
import { useCallback } from "react";
import { useCallback, memo } from "react";
import Link from "next/link";
import { useSortable } from "@dnd-kit/sortable";
import { CSS } from "@dnd-kit/utilities";
@ -35,7 +35,7 @@ function PickerWrapper({ children }: { children: React.ReactNode }) {
);
}
export function BoardCardContent({
export const BoardCardContent = memo(function BoardCardContent({
issue,
editable = false,
}: {
@ -47,12 +47,14 @@ export function BoardCardContent({
const handleUpdate = useCallback(
(updates: Partial<UpdateIssueRequest>) => {
const prev = { ...issue };
useIssueStore.getState().updateIssue(issue.id, updates);
api.updateIssue(issue.id, updates).catch(() => {
useIssueStore.getState().updateIssue(issue.id, prev);
toast.error("Failed to update issue");
});
},
[issue.id]
[issue],
);
const showPriority = storeProperties.priority;
@ -163,9 +165,9 @@ export function BoardCardContent({
)}
</div>
);
}
});
export function DraggableBoardCard({ issue }: { issue: Issue }) {
export const DraggableBoardCard = memo(function DraggableBoardCard({ issue }: { issue: Issue }) {
const {
attributes,
listeners,
@ -199,4 +201,4 @@ export function DraggableBoardCard({ issue }: { issue: Issue }) {
</Link>
</div>
);
}
});

View file

@ -1,6 +1,6 @@
"use client";
import { useState, useEffect, useCallback, useRef } from "react";
import { useState, useEffect, useCallback, useRef, memo } from "react";
import { useDefaultLayout, usePanelRef } from "react-resizable-panels";
import Link from "next/link";
import { useRouter } from "next/navigation";
@ -55,7 +55,7 @@ import { Checkbox } from "@/components/ui/checkbox";
import { Command, CommandInput, CommandList, CommandEmpty, CommandGroup, CommandItem } from "@/components/ui/command";
import { Avatar, AvatarFallback, AvatarGroup, AvatarGroupCount } from "@/components/ui/avatar";
import { ActorAvatar } from "@/components/common/actor-avatar";
import type { Issue, IssueReaction, Comment, IssueSubscriber, UpdateIssueRequest, IssueStatus, IssuePriority, TimelineEntry } from "@/shared/types";
import type { UpdateIssueRequest, IssueStatus, IssuePriority, TimelineEntry } from "@/shared/types";
import { ALL_STATUSES, STATUS_CONFIG, PRIORITY_ORDER, PRIORITY_CONFIG } from "@/features/issues/config";
import { StatusIcon, PriorityIcon, DueDatePicker } from "@/features/issues/components";
import { CommentCard } from "./comment-card";
@ -64,9 +64,10 @@ import { AgentLiveCard, TaskRunHistory } from "./agent-live-card";
import { api } from "@/shared/api";
import { useAuthStore } from "@/features/auth";
import { useWorkspaceStore, useActorName } from "@/features/workspace";
import { useWSEvent } from "@/features/realtime";
import { useIssueStore } from "@/features/issues";
import type { CommentCreatedPayload, CommentUpdatedPayload, CommentDeletedPayload, SubscriberAddedPayload, SubscriberRemovedPayload, ActivityCreatedPayload, ReactionAddedPayload, ReactionRemovedPayload, IssueReactionAddedPayload, IssueReactionRemovedPayload } from "@/shared/types";
import { useIssueTimeline } from "@/features/issues/hooks/use-issue-timeline";
import { useIssueReactions } from "@/features/issues/hooks/use-issue-reactions";
import { useIssueSubscribers } from "@/features/issues/hooks/use-issue-subscribers";
import { ReactionBar } from "@/components/common/reaction-bar";
import { timeAgo } from "@/shared/utils";
@ -126,20 +127,6 @@ function formatActivity(
}
}
function commentToTimelineEntry(c: Comment): TimelineEntry {
return {
type: "comment",
id: c.id,
actor_type: c.author_type,
actor_id: c.author_id,
content: c.content,
parent_id: c.parent_id,
created_at: c.created_at,
updated_at: c.updated_at,
comment_type: c.type,
reactions: c.reactions ?? [],
};
}
// ---------------------------------------------------------------------------
// Property row
@ -197,12 +184,6 @@ export function IssueDetail({ issueId, onDelete, defaultSidebarOpen = true, layo
});
const sidebarRef = usePanelRef();
const [sidebarOpen, setSidebarOpen] = useState(defaultSidebarOpen);
const [issue, setIssue] = useState<Issue | null>(null);
const [issueReactions, setIssueReactions] = useState<IssueReaction[]>([]);
const [timeline, setTimeline] = useState<TimelineEntry[]>([]);
const [subscribers, setSubscribers] = useState<IssueSubscriber[]>([]);
const [loading, setLoading] = useState(true);
const [submitting, setSubmitting] = useState(false);
const [deleting, setDeleting] = useState(false);
const [titleDraft, setTitleDraft] = useState("");
const titleFocusedRef = useRef(false);
@ -210,123 +191,58 @@ export function IssueDetail({ issueId, onDelete, defaultSidebarOpen = true, layo
const [propertiesOpen, setPropertiesOpen] = useState(true);
const [detailsOpen, setDetailsOpen] = useState(true);
// Watch the global issue store for real-time updates from other users/agents
const storeIssue = useIssueStore((s) => s.issues.find((i) => i.id === id));
const wasLoadedRef = useRef(false);
// Single source of truth: read issue directly from global store
const issue = useIssueStore((s) => s.issues.find((i) => i.id === id)) ?? null;
const [issueLoading, setIssueLoading] = useState(!issue);
// If issue isn't in the store yet, fetch and upsert it
useEffect(() => {
if (storeIssue) {
wasLoadedRef.current = true;
setIssue(storeIssue);
if (!titleFocusedRef.current) {
setTitleDraft(storeIssue.title);
}
} else if (wasLoadedRef.current && !loading) {
// Issue was in the store but is now gone (deleted by another user)
setIssue(null);
if (issue) {
setIssueLoading(false);
return;
}
}, [storeIssue, loading]);
useEffect(() => {
wasLoadedRef.current = false;
setIssue(null);
setTitleDraft("");
setIssueReactions([]);
setTimeline([]);
setSubscribers([]);
setLoading(true);
Promise.all([api.getIssue(id), api.listTimeline(id), api.listIssueSubscribers(id)])
.then(([iss, entries, subs]) => {
setIssue(iss);
setIssueReactions(iss.reactions ?? []);
setTitleDraft(iss.title);
setTimeline(entries);
setSubscribers(subs);
setIssueLoading(true);
api
.getIssue(id)
.then((iss) => {
useIssueStore.getState().addIssue(iss);
})
.catch(console.error)
.finally(() => setLoading(false));
}, [id]);
.finally(() => setIssueLoading(false));
}, [id, !!issue]);
const handleSubmitComment = async (content: string) => {
if (!content.trim() || submitting || !user) return;
const tempId = "temp-" + Date.now();
const tempEntry: TimelineEntry = {
type: "comment",
id: tempId,
actor_type: "member",
actor_id: user.id,
content,
parent_id: null,
created_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
comment_type: "comment",
};
setTimeline((prev) => [...prev, tempEntry]);
setSubmitting(true);
try {
const comment = await api.createComment(id, content);
setTimeline((prev) => prev.map((e) => (e.id === tempId ? commentToTimelineEntry(comment) : e)));
} catch {
setTimeline((prev) => prev.filter((e) => e.id !== tempId));
toast.error("Failed to send comment");
} finally {
setSubmitting(false);
// Sync titleDraft when issue title changes (from WS or other views)
useEffect(() => {
if (issue && !titleFocusedRef.current) {
setTitleDraft(issue.title);
}
};
}, [issue?.title]);
const handleSubmitReply = async (parentId: string, content: string) => {
if (!content.trim() || !user) return;
try {
const comment = await api.createComment(id, content, "comment", parentId);
setTimeline((prev) => {
if (prev.some((e) => e.id === comment.id)) return prev;
return [...prev, commentToTimelineEntry(comment)];
});
} catch {
toast.error("Failed to send reply");
}
};
// Custom hooks — encapsulate timeline, reactions, subscribers
const {
timeline, submitting, submitComment, submitReply,
editComment, deleteComment, toggleReaction: handleToggleReaction,
} = useIssueTimeline(id, user?.id);
const handleEditComment = async (commentId: string, content: string) => {
try {
const updated = await api.updateComment(commentId, content);
setTimeline((prev) => prev.map((e) => (e.id === updated.id ? commentToTimelineEntry(updated) : e)));
} catch {
toast.error("Failed to update comment");
}
};
const {
reactions: issueReactions,
toggleReaction: handleToggleIssueReaction,
} = useIssueReactions(id, user?.id);
const handleDeleteComment = async (commentId: string) => {
try {
await api.deleteComment(commentId);
setTimeline((prev) => {
const idsToRemove = new Set<string>([commentId]);
// Recursively collect all descendant IDs
let added = true;
while (added) {
added = false;
for (const e of prev) {
if (e.parent_id && idsToRemove.has(e.parent_id) && !idsToRemove.has(e.id)) {
idsToRemove.add(e.id);
added = true;
}
}
}
return prev.filter((e) => !idsToRemove.has(e.id));
});
} catch {
toast.error("Failed to delete comment");
}
};
const {
subscribers, isSubscribed, toggleSubscribe: handleToggleSubscribe, toggleSubscriber,
} = useIssueSubscribers(id, user?.id);
const loading = issueLoading;
// Issue field updates — write directly to the global store (single source of truth)
const handleUpdateField = useCallback(
(updates: Partial<UpdateIssueRequest>) => {
if (!issue) return;
const prev = issue;
setIssue((curr) => (curr ? ({ ...curr, ...updates } as Issue) : curr));
const prev = { ...issue };
useIssueStore.getState().updateIssue(id, updates);
api.updateIssue(id, updates).catch(() => {
setIssue(prev);
useIssueStore.getState().updateIssue(id, prev);
toast.error("Failed to update issue");
});
},
@ -337,6 +253,7 @@ export function IssueDetail({ issueId, onDelete, defaultSidebarOpen = true, layo
setDeleting(true);
try {
await api.deleteIssue(issue!.id);
useIssueStore.getState().removeIssue(issue!.id);
toast.success("Issue deleted");
if (onDelete) onDelete();
else router.push("/issues");
@ -346,275 +263,6 @@ export function IssueDetail({ issueId, onDelete, defaultSidebarOpen = true, layo
}
};
// Subscriber state
const isSubscribed = subscribers.some(
(s) => s.user_type === "member" && s.user_id === user?.id
);
const toggleSubscriber = async (userId: string, userType: "member" | "agent", currentlySubscribed: boolean) => {
if (!issue) return;
try {
if (currentlySubscribed) {
await api.unsubscribeFromIssue(id, userId, userType);
setSubscribers((prev) => prev.filter((s) => !(s.user_id === userId && s.user_type === userType)));
} else {
await api.subscribeToIssue(id, userId, userType);
setSubscribers((prev) => {
// Deduplicate: WS event may have already added this subscriber
if (prev.some((s) => s.user_id === userId && s.user_type === userType)) return prev;
return [...prev, { issue_id: id, user_type: userType, user_id: userId, reason: "manual" as const, created_at: new Date().toISOString() }];
});
}
} catch {
toast.error("Failed to update subscriber");
}
};
const handleToggleSubscribe = () => {
if (user) toggleSubscriber(user.id, "member", isSubscribed);
};
// Real-time comment updates
useWSEvent(
"comment:created",
useCallback((payload: unknown) => {
const { comment } = payload as CommentCreatedPayload;
if (comment.issue_id !== id) return;
// Skip own comments — already added locally via API response
if (comment.author_type === "member" && comment.author_id === user?.id) return;
setTimeline((prev) => {
if (prev.some((e) => e.id === comment.id)) return prev;
return [...prev, commentToTimelineEntry(comment)];
});
}, [id, user?.id]),
);
useWSEvent(
"comment:updated",
useCallback((payload: unknown) => {
const { comment } = payload as CommentUpdatedPayload;
if (comment.issue_id === id) {
setTimeline((prev) => prev.map((e) => (e.id === comment.id ? commentToTimelineEntry(comment) : e)));
}
}, [id]),
);
useWSEvent(
"comment:deleted",
useCallback((payload: unknown) => {
const { comment_id, issue_id } = payload as CommentDeletedPayload;
if (issue_id === id) {
setTimeline((prev) => {
const idsToRemove = new Set<string>([comment_id]);
let added = true;
while (added) {
added = false;
for (const e of prev) {
if (e.parent_id && idsToRemove.has(e.parent_id) && !idsToRemove.has(e.id)) {
idsToRemove.add(e.id);
added = true;
}
}
}
return prev.filter((e) => !idsToRemove.has(e.id));
});
}
}, [id]),
);
useWSEvent(
"activity:created",
useCallback((payload: unknown) => {
const p = payload as ActivityCreatedPayload;
if (p.issue_id !== id) return;
const entry = p.entry;
if (!entry || !entry.id) return;
setTimeline((prev) => {
if (prev.some((e) => e.id === entry.id)) return prev;
return [...prev, entry];
});
}, [id]),
);
// Real-time reaction updates
useWSEvent(
"reaction:added",
useCallback((payload: unknown) => {
const { reaction, issue_id } = payload as ReactionAddedPayload;
if (issue_id !== id) return;
// Skip own reactions — already added optimistically
if (reaction.actor_type === "member" && reaction.actor_id === user?.id) return;
setTimeline((prev) => prev.map((e) => {
if (e.id !== reaction.comment_id) return e;
const existing = e.reactions ?? [];
if (existing.some((r) => r.id === reaction.id)) return e;
return { ...e, reactions: [...existing, reaction] };
}));
}, [id, user?.id]),
);
useWSEvent(
"reaction:removed",
useCallback((payload: unknown) => {
const p = payload as ReactionRemovedPayload;
if (p.issue_id !== id) return;
// Skip own removals — already removed optimistically
if (p.actor_type === "member" && p.actor_id === user?.id) return;
setTimeline((prev) => prev.map((e) => {
if (e.id !== p.comment_id) return e;
return {
...e,
reactions: (e.reactions ?? []).filter(
(r) => !(r.emoji === p.emoji && r.actor_type === p.actor_type && r.actor_id === p.actor_id),
),
};
}));
}, [id, user?.id]),
);
// Real-time issue reaction updates
useWSEvent(
"issue_reaction:added",
useCallback((payload: unknown) => {
const { reaction, issue_id } = payload as IssueReactionAddedPayload;
if (issue_id !== id) return;
if (reaction.actor_type === "member" && reaction.actor_id === user?.id) return;
setIssueReactions((prev) => {
if (prev.some((r) => r.id === reaction.id)) return prev;
return [...prev, reaction];
});
}, [id, user?.id]),
);
useWSEvent(
"issue_reaction:removed",
useCallback((payload: unknown) => {
const p = payload as IssueReactionRemovedPayload;
if (p.issue_id !== id) return;
if (p.actor_type === "member" && p.actor_id === user?.id) return;
setIssueReactions((prev) =>
prev.filter((r) => !(r.emoji === p.emoji && r.actor_type === p.actor_type && r.actor_id === p.actor_id)),
);
}, [id, user?.id]),
);
const handleToggleIssueReaction = async (emoji: string) => {
if (!user) return;
const existing = issueReactions.find(
(r) => r.emoji === emoji && r.actor_type === "member" && r.actor_id === user.id,
);
if (existing) {
setIssueReactions((prev) => prev.filter((r) => r.id !== existing.id));
try {
await api.removeIssueReaction(id, emoji);
} catch {
setIssueReactions((prev) => [...prev, existing]);
toast.error("Failed to remove reaction");
}
} else {
const temp = {
id: `temp-${Date.now()}`,
issue_id: id,
actor_type: "member",
actor_id: user.id,
emoji,
created_at: new Date().toISOString(),
};
setIssueReactions((prev) => [...prev, temp]);
try {
const reaction = await api.addIssueReaction(id, emoji);
setIssueReactions((prev) => prev.map((r) => (r.id === temp.id ? reaction : r)));
} catch {
setIssueReactions((prev) => prev.filter((r) => r.id !== temp.id));
toast.error("Failed to add reaction");
}
}
};
const handleToggleReaction = async (commentId: string, emoji: string) => {
if (!user) return;
const entry = timeline.find((e) => e.id === commentId);
const existing = (entry?.reactions ?? []).find(
(r) => r.emoji === emoji && r.actor_type === "member" && r.actor_id === user.id,
);
if (existing) {
// Optimistic remove
setTimeline((prev) => prev.map((e) => {
if (e.id !== commentId) return e;
return { ...e, reactions: (e.reactions ?? []).filter((r) => r.id !== existing.id) };
}));
try {
await api.removeReaction(commentId, emoji);
} catch {
// Rollback
setTimeline((prev) => prev.map((e) => {
if (e.id !== commentId) return e;
return { ...e, reactions: [...(e.reactions ?? []), existing] };
}));
toast.error("Failed to remove reaction");
}
} else {
// Optimistic add
const tempReaction = {
id: `temp-${Date.now()}`,
comment_id: commentId,
actor_type: "member",
actor_id: user.id,
emoji,
created_at: new Date().toISOString(),
};
setTimeline((prev) => prev.map((e) => {
if (e.id !== commentId) return e;
return { ...e, reactions: [...(e.reactions ?? []), tempReaction] };
}));
try {
const reaction = await api.addReaction(commentId, emoji);
setTimeline((prev) => prev.map((e) => {
if (e.id !== commentId) return e;
return {
...e,
reactions: (e.reactions ?? []).map((r) => (r.id === tempReaction.id ? reaction : r)),
};
}));
} catch {
// Rollback
setTimeline((prev) => prev.map((e) => {
if (e.id !== commentId) return e;
return { ...e, reactions: (e.reactions ?? []).filter((r) => r.id !== tempReaction.id) };
}));
toast.error("Failed to add reaction");
}
}
};
// Real-time subscriber updates
useWSEvent(
"subscriber:added",
useCallback((payload: unknown) => {
const p = payload as SubscriberAddedPayload;
if (p.issue_id !== id) return;
setSubscribers((prev) => {
if (prev.some((s) => s.user_id === p.user_id && s.user_type === p.user_type)) return prev;
return [...prev, {
issue_id: p.issue_id,
user_type: p.user_type as "member" | "agent",
user_id: p.user_id,
reason: p.reason as IssueSubscriber["reason"],
created_at: new Date().toISOString(),
}];
});
}, [id]),
);
useWSEvent(
"subscriber:removed",
useCallback((payload: unknown) => {
const p = payload as SubscriberRemovedPayload;
if (p.issue_id !== id) return;
setSubscribers((prev) => prev.filter((s) => !(s.user_id === p.user_id && s.user_type === p.user_type)));
}, [id]),
);
if (loading) {
return (
<div className="flex flex-1 min-h-0 items-center justify-center text-sm text-muted-foreground">
@ -922,6 +570,7 @@ export function IssueDetail({ issueId, onDelete, defaultSidebarOpen = true, layo
/>
<RichTextEditor
key={id}
defaultValue={issue.description || ""}
placeholder="Add description..."
onUpdate={(md) => handleUpdateField({ description: md || undefined })}
@ -1095,9 +744,9 @@ export function IssueDetail({ issueId, onDelete, defaultSidebarOpen = true, layo
entry={entry}
allReplies={repliesByParent}
currentUserId={user?.id}
onReply={handleSubmitReply}
onEdit={handleEditComment}
onDelete={handleDeleteComment}
onReply={submitReply}
onEdit={editComment}
onDelete={deleteComment}
onToggleReaction={handleToggleReaction}
/>
);
@ -1154,7 +803,7 @@ export function IssueDetail({ issueId, onDelete, defaultSidebarOpen = true, layo
{/* Bottom comment input — no avatar, full width */}
<div className="mt-4">
<CommentInput onSubmit={handleSubmitComment} />
<CommentInput onSubmit={submitComment} />
</div>
</div>
</div>

View file

@ -1,5 +1,6 @@
"use client";
import { memo } from "react";
import Link from "next/link";
import type { Issue } from "@/shared/types";
import { ActorAvatar } from "@/components/common/actor-avatar";
@ -13,7 +14,7 @@ function formatDate(date: string): string {
});
}
export function ListRow({ issue }: { issue: Issue }) {
export const ListRow = memo(function ListRow({ issue }: { issue: Issue }) {
const selected = useIssueSelectionStore((s) => s.selectedIds.has(issue.id));
const toggle = useIssueSelectionStore((s) => s.toggle);
@ -60,4 +61,4 @@ export function ListRow({ issue }: { issue: Issue }) {
</Link>
</div>
);
}
});

View file

@ -0,0 +1,3 @@
export { useIssueTimeline } from "./use-issue-timeline";
export { useIssueReactions } from "./use-issue-reactions";
export { useIssueSubscribers } from "./use-issue-subscribers";

View file

@ -0,0 +1,109 @@
"use client";
import { useState, useEffect, useCallback } from "react";
import type { IssueReaction } from "@/shared/types";
import type {
IssueReactionAddedPayload,
IssueReactionRemovedPayload,
} from "@/shared/types";
import { api } from "@/shared/api";
import { useWSEvent, useWSReconnect } from "@/features/realtime";
import { toast } from "sonner";
export function useIssueReactions(issueId: string, userId?: string) {
const [reactions, setReactions] = useState<IssueReaction[]>([]);
const [loading, setLoading] = useState(true);
// Initial fetch
useEffect(() => {
setReactions([]);
setLoading(true);
api
.getIssue(issueId)
.then((iss) => setReactions(iss.reactions ?? []))
.catch(console.error)
.finally(() => setLoading(false));
}, [issueId]);
// Reconnect recovery
useWSReconnect(
useCallback(() => {
api.getIssue(issueId).then((iss) => setReactions(iss.reactions ?? [])).catch(console.error);
}, [issueId]),
);
// --- WS event handlers ---
useWSEvent(
"issue_reaction:added",
useCallback(
(payload: unknown) => {
const { reaction, issue_id } = payload as IssueReactionAddedPayload;
if (issue_id !== issueId) return;
if (reaction.actor_type === "member" && reaction.actor_id === userId) return;
setReactions((prev) => {
if (prev.some((r) => r.id === reaction.id)) return prev;
return [...prev, reaction];
});
},
[issueId, userId],
),
);
useWSEvent(
"issue_reaction:removed",
useCallback(
(payload: unknown) => {
const p = payload as IssueReactionRemovedPayload;
if (p.issue_id !== issueId) return;
if (p.actor_type === "member" && p.actor_id === userId) return;
setReactions((prev) =>
prev.filter(
(r) => !(r.emoji === p.emoji && r.actor_type === p.actor_type && r.actor_id === p.actor_id),
),
);
},
[issueId, userId],
),
);
// --- Mutation ---
const toggleReaction = useCallback(
async (emoji: string) => {
if (!userId) return;
const existing = reactions.find(
(r) => r.emoji === emoji && r.actor_type === "member" && r.actor_id === userId,
);
if (existing) {
setReactions((prev) => prev.filter((r) => r.id !== existing.id));
try {
await api.removeIssueReaction(issueId, emoji);
} catch {
setReactions((prev) => [...prev, existing]);
toast.error("Failed to remove reaction");
}
} else {
const temp: IssueReaction = {
id: `temp-${Date.now()}`,
issue_id: issueId,
actor_type: "member",
actor_id: userId,
emoji,
created_at: new Date().toISOString(),
};
setReactions((prev) => [...prev, temp]);
try {
const reaction = await api.addIssueReaction(issueId, emoji);
setReactions((prev) => prev.map((r) => (r.id === temp.id ? reaction : r)));
} catch {
setReactions((prev) => prev.filter((r) => r.id !== temp.id));
toast.error("Failed to add reaction");
}
}
},
[issueId, userId, reactions],
);
return { reactions, loading, toggleReaction };
}

View file

@ -0,0 +1,128 @@
"use client";
import { useState, useEffect, useCallback } from "react";
import type { IssueSubscriber } from "@/shared/types";
import type {
SubscriberAddedPayload,
SubscriberRemovedPayload,
} from "@/shared/types";
import { api } from "@/shared/api";
import { useWSEvent, useWSReconnect } from "@/features/realtime";
import { toast } from "sonner";
export function useIssueSubscribers(issueId: string, userId?: string) {
const [subscribers, setSubscribers] = useState<IssueSubscriber[]>([]);
const [loading, setLoading] = useState(true);
// Initial fetch
useEffect(() => {
setSubscribers([]);
setLoading(true);
api
.listIssueSubscribers(issueId)
.then((subs) => setSubscribers(subs))
.catch(console.error)
.finally(() => setLoading(false));
}, [issueId]);
// Reconnect recovery
useWSReconnect(
useCallback(() => {
api.listIssueSubscribers(issueId).then(setSubscribers).catch(console.error);
}, [issueId]),
);
// --- WS event handlers ---
useWSEvent(
"subscriber:added",
useCallback(
(payload: unknown) => {
const p = payload as SubscriberAddedPayload;
if (p.issue_id !== issueId) return;
setSubscribers((prev) => {
if (prev.some((s) => s.user_id === p.user_id && s.user_type === p.user_type)) return prev;
return [
...prev,
{
issue_id: p.issue_id,
user_type: p.user_type as "member" | "agent",
user_id: p.user_id,
reason: p.reason as IssueSubscriber["reason"],
created_at: new Date().toISOString(),
},
];
});
},
[issueId],
),
);
useWSEvent(
"subscriber:removed",
useCallback(
(payload: unknown) => {
const p = payload as SubscriberRemovedPayload;
if (p.issue_id !== issueId) return;
setSubscribers((prev) =>
prev.filter((s) => !(s.user_id === p.user_id && s.user_type === p.user_type)),
);
},
[issueId],
),
);
// --- Mutations ---
const isSubscribed = subscribers.some(
(s) => s.user_type === "member" && s.user_id === userId,
);
const toggleSubscriber = useCallback(
async (subUserId: string, userType: "member" | "agent", currentlySubscribed: boolean) => {
if (currentlySubscribed) {
// Optimistic remove + rollback on error
const removed = subscribers.find(
(s) => s.user_id === subUserId && s.user_type === userType,
);
setSubscribers((prev) =>
prev.filter((s) => !(s.user_id === subUserId && s.user_type === userType)),
);
try {
await api.unsubscribeFromIssue(issueId, subUserId, userType);
} catch {
if (removed) setSubscribers((prev) => [...prev, removed]);
toast.error("Failed to update subscriber");
}
} else {
// Optimistic add
const tempSub: IssueSubscriber = {
issue_id: issueId,
user_type: userType,
user_id: subUserId,
reason: "manual" as const,
created_at: new Date().toISOString(),
};
setSubscribers((prev) => {
if (prev.some((s) => s.user_id === subUserId && s.user_type === userType)) return prev;
return [...prev, tempSub];
});
try {
await api.subscribeToIssue(issueId, subUserId, userType);
} catch {
setSubscribers((prev) =>
prev.filter((s) => !(s.user_id === subUserId && s.user_type === userType && s.reason === "manual")),
);
toast.error("Failed to update subscriber");
}
}
},
[issueId, subscribers],
);
const toggleSubscribe = useCallback(() => {
if (userId) toggleSubscriber(userId, "member", isSubscribed);
}, [userId, isSubscribed, toggleSubscriber]);
return { subscribers, loading, isSubscribed, toggleSubscribe, toggleSubscriber };
}

View file

@ -0,0 +1,371 @@
"use client";
import { useState, useEffect, useCallback } from "react";
import type { Comment, TimelineEntry } from "@/shared/types";
import type {
CommentCreatedPayload,
CommentUpdatedPayload,
CommentDeletedPayload,
ActivityCreatedPayload,
ReactionAddedPayload,
ReactionRemovedPayload,
} from "@/shared/types";
import { api } from "@/shared/api";
import { useWSEvent, useWSReconnect } from "@/features/realtime";
import { toast } from "sonner";
function commentToTimelineEntry(c: Comment): TimelineEntry {
return {
type: "comment",
id: c.id,
actor_type: c.author_type,
actor_id: c.author_id,
content: c.content,
parent_id: c.parent_id,
created_at: c.created_at,
updated_at: c.updated_at,
comment_type: c.type,
reactions: c.reactions ?? [],
};
}
export function useIssueTimeline(issueId: string, userId?: string) {
const [timeline, setTimeline] = useState<TimelineEntry[]>([]);
const [submitting, setSubmitting] = useState(false);
const [loading, setLoading] = useState(true);
// Initial fetch + reset on id change
useEffect(() => {
setTimeline([]);
setLoading(true);
api
.listTimeline(issueId)
.then((entries) => setTimeline(entries))
.catch(console.error)
.finally(() => setLoading(false));
}, [issueId]);
// Reconnect recovery
useWSReconnect(
useCallback(() => {
api.listTimeline(issueId).then(setTimeline).catch(console.error);
}, [issueId]),
);
// --- WS event handlers ---
useWSEvent(
"comment:created",
useCallback(
(payload: unknown) => {
const { comment } = payload as CommentCreatedPayload;
if (comment.issue_id !== issueId) return;
if (comment.author_type === "member" && comment.author_id === userId) return;
setTimeline((prev) => {
if (prev.some((e) => e.id === comment.id)) return prev;
return [...prev, commentToTimelineEntry(comment)];
});
},
[issueId, userId],
),
);
useWSEvent(
"comment:updated",
useCallback(
(payload: unknown) => {
const { comment } = payload as CommentUpdatedPayload;
if (comment.issue_id === issueId) {
setTimeline((prev) =>
prev.map((e) => (e.id === comment.id ? commentToTimelineEntry(comment) : e)),
);
}
},
[issueId],
),
);
useWSEvent(
"comment:deleted",
useCallback(
(payload: unknown) => {
const { comment_id, issue_id } = payload as CommentDeletedPayload;
if (issue_id === issueId) {
setTimeline((prev) => {
const idsToRemove = new Set<string>([comment_id]);
let added = true;
while (added) {
added = false;
for (const e of prev) {
if (e.parent_id && idsToRemove.has(e.parent_id) && !idsToRemove.has(e.id)) {
idsToRemove.add(e.id);
added = true;
}
}
}
return prev.filter((e) => !idsToRemove.has(e.id));
});
}
},
[issueId],
),
);
useWSEvent(
"activity:created",
useCallback(
(payload: unknown) => {
const p = payload as ActivityCreatedPayload;
if (p.issue_id !== issueId) return;
const entry = p.entry;
if (!entry || !entry.id) return;
setTimeline((prev) => {
if (prev.some((e) => e.id === entry.id)) return prev;
return [...prev, entry];
});
},
[issueId],
),
);
useWSEvent(
"reaction:added",
useCallback(
(payload: unknown) => {
const { reaction, issue_id } = payload as ReactionAddedPayload;
if (issue_id !== issueId) return;
if (reaction.actor_type === "member" && reaction.actor_id === userId) return;
setTimeline((prev) =>
prev.map((e) => {
if (e.id !== reaction.comment_id) return e;
const existing = e.reactions ?? [];
if (existing.some((r) => r.id === reaction.id)) return e;
return { ...e, reactions: [...existing, reaction] };
}),
);
},
[issueId, userId],
),
);
useWSEvent(
"reaction:removed",
useCallback(
(payload: unknown) => {
const p = payload as ReactionRemovedPayload;
if (p.issue_id !== issueId) return;
if (p.actor_type === "member" && p.actor_id === userId) return;
setTimeline((prev) =>
prev.map((e) => {
if (e.id !== p.comment_id) return e;
return {
...e,
reactions: (e.reactions ?? []).filter(
(r) => !(r.emoji === p.emoji && r.actor_type === p.actor_type && r.actor_id === p.actor_id),
),
};
}),
);
},
[issueId, userId],
),
);
// --- Mutation functions ---
const submitComment = useCallback(
async (content: string) => {
if (!content.trim() || submitting || !userId) return;
const tempId = "temp-" + Date.now();
const tempEntry: TimelineEntry = {
type: "comment",
id: tempId,
actor_type: "member",
actor_id: userId,
content,
parent_id: null,
created_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
comment_type: "comment",
};
setTimeline((prev) => [...prev, tempEntry]);
setSubmitting(true);
try {
const comment = await api.createComment(issueId, content);
setTimeline((prev) =>
prev.map((e) => (e.id === tempId ? commentToTimelineEntry(comment) : e)),
);
} catch {
setTimeline((prev) => prev.filter((e) => e.id !== tempId));
toast.error("Failed to send comment");
} finally {
setSubmitting(false);
}
},
[issueId, userId, submitting],
);
const submitReply = useCallback(
async (parentId: string, content: string) => {
if (!content.trim() || !userId) return;
const tempId = "temp-" + Date.now();
const tempEntry: TimelineEntry = {
type: "comment",
id: tempId,
actor_type: "member",
actor_id: userId,
content,
parent_id: parentId,
created_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
comment_type: "comment",
};
setTimeline((prev) => [...prev, tempEntry]);
try {
const comment = await api.createComment(issueId, content, "comment", parentId);
setTimeline((prev) =>
prev.map((e) => (e.id === tempId ? commentToTimelineEntry(comment) : e)),
);
} catch {
setTimeline((prev) => prev.filter((e) => e.id !== tempId));
toast.error("Failed to send reply");
}
},
[issueId, userId],
);
const editComment = useCallback(
async (commentId: string, content: string) => {
// Optimistic: update content immediately
let prevContent: string | undefined;
setTimeline((prev) =>
prev.map((e) => {
if (e.id !== commentId) return e;
prevContent = e.content;
return { ...e, content, updated_at: new Date().toISOString() };
}),
);
try {
const updated = await api.updateComment(commentId, content);
setTimeline((prev) =>
prev.map((e) => (e.id === updated.id ? commentToTimelineEntry(updated) : e)),
);
} catch {
// Rollback
if (prevContent !== undefined) {
setTimeline((prev) =>
prev.map((e) => (e.id === commentId ? { ...e, content: prevContent! } : e)),
);
}
toast.error("Failed to update comment");
}
},
[],
);
const deleteComment = useCallback(
async (commentId: string) => {
// Capture entries for rollback
let removedEntries: TimelineEntry[] = [];
setTimeline((prev) => {
const idsToRemove = new Set<string>([commentId]);
let added = true;
while (added) {
added = false;
for (const e of prev) {
if (e.parent_id && idsToRemove.has(e.parent_id) && !idsToRemove.has(e.id)) {
idsToRemove.add(e.id);
added = true;
}
}
}
removedEntries = prev.filter((e) => idsToRemove.has(e.id));
return prev.filter((e) => !idsToRemove.has(e.id));
});
try {
await api.deleteComment(commentId);
} catch {
// Rollback: re-add removed entries
setTimeline((prev) => [...prev, ...removedEntries]);
toast.error("Failed to delete comment");
}
},
[],
);
const toggleReaction = useCallback(
async (commentId: string, emoji: string) => {
if (!userId) return;
const entry = timeline.find((e) => e.id === commentId);
const existing = (entry?.reactions ?? []).find(
(r) => r.emoji === emoji && r.actor_type === "member" && r.actor_id === userId,
);
if (existing) {
setTimeline((prev) =>
prev.map((e) => {
if (e.id !== commentId) return e;
return { ...e, reactions: (e.reactions ?? []).filter((r) => r.id !== existing.id) };
}),
);
try {
await api.removeReaction(commentId, emoji);
} catch {
setTimeline((prev) =>
prev.map((e) => {
if (e.id !== commentId) return e;
return { ...e, reactions: [...(e.reactions ?? []), existing] };
}),
);
toast.error("Failed to remove reaction");
}
} else {
const tempReaction = {
id: `temp-${Date.now()}`,
comment_id: commentId,
actor_type: "member",
actor_id: userId,
emoji,
created_at: new Date().toISOString(),
};
setTimeline((prev) =>
prev.map((e) => {
if (e.id !== commentId) return e;
return { ...e, reactions: [...(e.reactions ?? []), tempReaction] };
}),
);
try {
const reaction = await api.addReaction(commentId, emoji);
setTimeline((prev) =>
prev.map((e) => {
if (e.id !== commentId) return e;
return {
...e,
reactions: (e.reactions ?? []).map((r) => (r.id === tempReaction.id ? reaction : r)),
};
}),
);
} catch {
setTimeline((prev) =>
prev.map((e) => {
if (e.id !== commentId) return e;
return { ...e, reactions: (e.reactions ?? []).filter((r) => r.id !== tempReaction.id) };
}),
);
toast.error("Failed to add reaction");
}
}
},
[userId, timeline],
);
return {
timeline,
loading,
submitting,
submitComment,
submitReply,
editComment,
deleteComment,
toggleReaction,
};
}

View file

@ -18,3 +18,16 @@ export function useWSEvent(event: WSEventType, handler: EventHandler) {
return unsub;
}, [event, handler, subscribe]);
}
/**
* Hook that registers a callback to run on WebSocket reconnection.
* Useful for refetching component-local data after a network interruption.
*/
export function useWSReconnect(callback: () => void) {
const { onReconnect } = useWS();
useEffect(() => {
const unsub = onReconnect(callback);
return unsub;
}, [callback, onReconnect]);
}

View file

@ -1,2 +1,2 @@
export { WSProvider, useWS } from "./provider";
export { useWSEvent } from "./hooks";
export { useWSEvent, useWSReconnect } from "./hooks";

View file

@ -22,6 +22,7 @@ type EventHandler = (payload: unknown) => void;
interface WSContextValue {
subscribe: (event: WSEventType, handler: EventHandler) => () => void;
onReconnect: (callback: () => void) => () => void;
}
const WSContext = createContext<WSContextValue | null>(null);
@ -63,8 +64,17 @@ export function WSProvider({ children }: { children: ReactNode }) {
[],
);
const onReconnectCb = useCallback(
(callback: () => void) => {
const ws = wsRef.current;
if (!ws) return () => {};
return ws.onReconnect(callback);
},
[],
);
return (
<WSContext.Provider value={{ subscribe }}>
<WSContext.Provider value={{ subscribe, onReconnect: onReconnectCb }}>
{children}
</WSContext.Provider>
);

View file

@ -13,6 +13,7 @@ import type {
MemberAddedPayload,
WorkspaceDeletedPayload,
MemberRemovedPayload,
IssueUpdatedPayload,
} from "@/shared/types";
const logger = createLogger("realtime-sync");
@ -68,12 +69,25 @@ export function useRealtimeSync(ws: WSClient | null) {
};
const unsubAny = ws.onAny((msg) => {
const myUserId = useAuthStore.getState().user?.id;
if (msg.actor_id && msg.actor_id === myUserId) {
logger.debug("skipping self-event", msg.type);
return;
}
const prefix = msg.type.split(":")[0] ?? "";
const refresh = refreshMap[prefix];
if (refresh) debouncedRefresh(prefix, refresh);
});
// --- Side-effect handlers (toast, navigation, self-check) ---
// --- Side-effect handlers (toast, navigation, cross-store sync) ---
// Keep inbox issue_status in sync when issues change
const unsubIssueUpdated = ws.on("issue:updated", (p) => {
const { issue } = p as IssueUpdatedPayload;
if (issue?.id && issue?.status) {
useInboxStore.getState().updateIssueStatus(issue.id, issue.status);
}
});
const unsubWsDeleted = ws.on("workspace:deleted", (p) => {
const { workspace_id } = p as WorkspaceDeletedPayload;
@ -108,6 +122,7 @@ export function useRealtimeSync(ws: WSClient | null) {
return () => {
unsubAny();
unsubIssueUpdated();
unsubWsDeleted();
unsubMemberRemoved();
unsubMemberAdded();

View file

@ -47,6 +47,7 @@ export type WSEventType =
export interface WSMessage<T = unknown> {
type: WSEventType;
payload: T;
actor_id?: string;
}
export interface IssueCreatedPayload {

View file

@ -30,7 +30,7 @@ func registerListeners(bus *events.Bus, hub *realtime.Hub) {
if recipientID == "" {
return
}
data, err := json.Marshal(map[string]any{"type": e.Type, "payload": e.Payload})
data, err := json.Marshal(map[string]any{"type": e.Type, "payload": e.Payload, "actor_id": e.ActorID})
if err != nil {
return
}
@ -87,7 +87,7 @@ func registerListeners(bus *events.Bus, hub *realtime.Hub) {
if userID == "" {
return
}
data, err := json.Marshal(map[string]any{"type": e.Type, "payload": e.Payload})
data, err := json.Marshal(map[string]any{"type": e.Type, "payload": e.Payload, "actor_id": e.ActorID})
if err != nil {
return
}
@ -102,8 +102,9 @@ func registerListeners(bus *events.Bus, hub *realtime.Hub) {
}
msg := map[string]any{
"type": e.Type,
"payload": e.Payload,
"type": e.Type,
"payload": e.Payload,
"actor_id": e.ActorID,
}
data, err := json.Marshal(msg)
if err != nil {