diff --git a/apps/web/app/(dashboard)/issues/[id]/page.test.tsx b/apps/web/app/(dashboard)/issues/[id]/page.test.tsx index af2b57fd..32cbbbac 100644 --- a/apps/web/app/(dashboard)/issues/[id]/page.test.tsx +++ b/apps/web/app/(dashboard)/issues/[id]/page.test.tsx @@ -2,7 +2,7 @@ import { Suspense, forwardRef, useRef, useState, useImperativeHandle } from "rea import { describe, it, expect, vi, beforeEach } from "vitest"; import { render, screen, waitFor, act, fireEvent } from "@testing-library/react"; import userEvent from "@testing-library/user-event"; -import type { Issue, Comment } from "@/shared/types"; +import type { Issue, Comment, TimelineEntry } from "@/shared/types"; // Mock next/navigation vi.mock("next/navigation", () => ({ @@ -108,7 +108,7 @@ vi.mock("@/components/markdown", () => ({ // Mock api const mockGetIssue = vi.hoisted(() => vi.fn()); -const mockListComments = vi.hoisted(() => vi.fn()); +const mockListTimeline = vi.hoisted(() => vi.fn()); const mockCreateComment = vi.hoisted(() => vi.fn()); const mockUpdateComment = vi.hoisted(() => vi.fn()); const mockDeleteComment = vi.hoisted(() => vi.fn()); @@ -118,7 +118,8 @@ const mockUpdateIssue = vi.hoisted(() => vi.fn()); vi.mock("@/shared/api", () => ({ api: { getIssue: (...args: any[]) => mockGetIssue(...args), - listComments: (...args: any[]) => mockListComments(...args), + listTimeline: (...args: any[]) => mockListTimeline(...args), + listComments: vi.fn().mockResolvedValue([]), createComment: (...args: any[]) => mockCreateComment(...args), updateComment: (...args: any[]) => mockUpdateComment(...args), deleteComment: (...args: any[]) => mockDeleteComment(...args), @@ -148,26 +149,28 @@ const mockIssue: Issue = { updated_at: "2026-01-20T00:00:00Z", }; -const mockComments: Comment[] = [ +const mockTimeline: TimelineEntry[] = [ { - id: "comment-1", - issue_id: "issue-1", - content: "Started working on this", type: "comment", - author_type: "member", - author_id: "user-1", + id: "comment-1", + actor_type: "member", + actor_id: "user-1", + content: "Started working on this", + parent_id: null, created_at: "2026-01-16T00:00:00Z", updated_at: "2026-01-16T00:00:00Z", + comment_type: "comment", }, { - id: "comment-2", - issue_id: "issue-1", - content: "I can help with this", type: "comment", - author_type: "agent", - author_id: "agent-1", + id: "comment-2", + actor_type: "agent", + actor_id: "agent-1", + content: "I can help with this", + parent_id: null, created_at: "2026-01-17T00:00:00Z", updated_at: "2026-01-17T00:00:00Z", + comment_type: "comment", }, ]; @@ -193,7 +196,7 @@ describe("IssueDetailPage", () => { it("renders issue details after loading", async () => { mockGetIssue.mockResolvedValueOnce(mockIssue); - mockListComments.mockResolvedValueOnce(mockComments); + mockListTimeline.mockResolvedValueOnce(mockTimeline); await renderPage(); await waitFor(() => { @@ -209,7 +212,7 @@ describe("IssueDetailPage", () => { it("renders issue properties sidebar", async () => { mockGetIssue.mockResolvedValueOnce(mockIssue); - mockListComments.mockResolvedValueOnce(mockComments); + mockListTimeline.mockResolvedValueOnce(mockTimeline); await renderPage(); await waitFor(() => { @@ -222,7 +225,7 @@ describe("IssueDetailPage", () => { it("renders comments", async () => { mockGetIssue.mockResolvedValueOnce(mockIssue); - mockListComments.mockResolvedValueOnce(mockComments); + mockListTimeline.mockResolvedValueOnce(mockTimeline); await renderPage(); await waitFor(() => { @@ -232,12 +235,12 @@ describe("IssueDetailPage", () => { }); expect(screen.getByText("I can help with this")).toBeInTheDocument(); - expect(screen.getByText("Activity")).toBeInTheDocument(); + expect(screen.getAllByText("Activity").length).toBeGreaterThanOrEqual(1); }); it("shows 'Issue not found' for missing issue", async () => { mockGetIssue.mockRejectedValueOnce(new Error("Not found")); - mockListComments.mockRejectedValueOnce(new Error("Not found")); + mockListTimeline.mockRejectedValueOnce(new Error("Not found")); await renderPage("nonexistent-id"); await waitFor(() => { @@ -247,7 +250,7 @@ describe("IssueDetailPage", () => { it("submits a new comment", async () => { mockGetIssue.mockResolvedValueOnce(mockIssue); - mockListComments.mockResolvedValueOnce(mockComments); + mockListTimeline.mockResolvedValueOnce(mockTimeline); const newComment: Comment = { id: "comment-3", @@ -256,6 +259,7 @@ describe("IssueDetailPage", () => { type: "comment", author_type: "member", author_id: "user-1", + parent_id: null, created_at: "2026-01-18T00:00:00Z", updated_at: "2026-01-18T00:00:00Z", }; @@ -301,7 +305,7 @@ describe("IssueDetailPage", () => { it("renders breadcrumb navigation", async () => { mockGetIssue.mockResolvedValueOnce(mockIssue); - mockListComments.mockResolvedValueOnce(mockComments); + mockListTimeline.mockResolvedValueOnce(mockTimeline); await renderPage(); await waitFor(() => { diff --git a/apps/web/features/issues/components/issue-detail.tsx b/apps/web/features/issues/components/issue-detail.tsx index 418c1a17..7ca9c12e 100644 --- a/apps/web/features/issues/components/issue-detail.tsx +++ b/apps/web/features/issues/components/issue-detail.tsx @@ -10,7 +10,9 @@ import { Calendar, ChevronLeft, ChevronRight, + Circle, Link2, + MessageSquare, MoreHorizontal, PanelRight, Pencil, @@ -59,7 +61,7 @@ import { Checkbox } from "@/components/ui/checkbox"; import { Command, CommandInput, CommandList, CommandEmpty, CommandGroup, CommandItem } from "@/components/ui/command"; import { Avatar, AvatarFallback, AvatarGroup, AvatarGroupCount } from "@/components/ui/avatar"; import { ActorAvatar } from "@/components/common/actor-avatar"; -import type { Issue, Comment, IssueSubscriber, UpdateIssueRequest, IssueStatus, IssuePriority } from "@/shared/types"; +import type { Issue, Comment, IssueSubscriber, UpdateIssueRequest, IssueStatus, IssuePriority, TimelineEntry } from "@/shared/types"; import { ALL_STATUSES, STATUS_CONFIG, PRIORITY_ORDER, PRIORITY_CONFIG } from "@/features/issues/config"; import { StatusIcon, PriorityIcon, DueDatePicker } from "@/features/issues/components"; import { api } from "@/shared/api"; @@ -67,7 +69,7 @@ import { useAuthStore } from "@/features/auth"; import { useWorkspaceStore, useActorName } from "@/features/workspace"; import { useWSEvent } from "@/features/realtime"; import { useIssueStore } from "@/features/issues"; -import type { CommentCreatedPayload, CommentUpdatedPayload, CommentDeletedPayload, SubscriberAddedPayload, SubscriberRemovedPayload } from "@/shared/types"; +import type { CommentCreatedPayload, CommentUpdatedPayload, CommentDeletedPayload, SubscriberAddedPayload, SubscriberRemovedPayload, ActivityCreatedPayload } from "@/shared/types"; // --------------------------------------------------------------------------- // Helpers @@ -92,6 +94,40 @@ function shortDate(date: string | null): string { }); } +function formatActivity(entry: TimelineEntry): string { + const details = (entry.details ?? {}) as Record; + switch (entry.action) { + case "created": + return "created this issue"; + case "status_changed": + return `changed status from ${details.from ?? "?"} to ${details.to ?? "?"}`; + case "assignee_changed": + return "changed assignee"; + case "description_updated": + return "updated the description"; + case "task_completed": + return "completed the task"; + case "task_failed": + return "task failed"; + default: + return entry.action ?? ""; + } +} + +function commentToTimelineEntry(c: Comment): TimelineEntry { + return { + type: "comment", + id: c.id, + actor_type: c.author_type, + actor_id: c.author_id, + content: c.content, + parent_id: c.parent_id, + created_at: c.created_at, + updated_at: c.updated_at, + comment_type: c.type, + }; +} + // --------------------------------------------------------------------------- // Property row // --------------------------------------------------------------------------- @@ -147,11 +183,12 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) { const sidebarRef = usePanelRef(); const [sidebarOpen, setSidebarOpen] = useState(true); const [issue, setIssue] = useState(null); - const [comments, setComments] = useState([]); + const [timeline, setTimeline] = useState([]); const [subscribers, setSubscribers] = useState([]); const [loading, setLoading] = useState(true); const [commentEmpty, setCommentEmpty] = useState(true); const commentEditorRef = useRef(null); + const replyEditorRef = useRef(null); const [submitting, setSubmitting] = useState(false); const [deleting, setDeleting] = useState(false); const [editingCommentId, setEditingCommentId] = useState(null); @@ -161,6 +198,9 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) { const [deleteDialogOpen, setDeleteDialogOpen] = useState(false); const [propertiesOpen, setPropertiesOpen] = useState(true); const [detailsOpen, setDetailsOpen] = useState(true); + const [filter, setFilter] = useState<"all" | "comments" | "activity">("all"); + const [replyingTo, setReplyingTo] = useState(null); + const [replyEmpty, setReplyEmpty] = useState(true); // Watch the global issue store for real-time updates from other users/agents const storeIssue = useIssueStore((s) => s.issues.find((i) => i.id === id)); @@ -173,13 +213,13 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) { useEffect(() => { setIssue(null); - setComments([]); + setTimeline([]); setSubscribers([]); setLoading(true); - Promise.all([api.getIssue(id), api.listComments(id), api.listIssueSubscribers(id)]) - .then(([iss, cmts, subs]) => { + Promise.all([api.getIssue(id), api.listTimeline(id), api.listIssueSubscribers(id)]) + .then(([iss, entries, subs]) => { setIssue(iss); - setComments(cmts); + setTimeline(entries); setSubscribers(subs); }) .catch(console.error) @@ -190,31 +230,46 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) { const content = commentEditorRef.current?.getMarkdown()?.trim(); if (!content || submitting || !user) return; const tempId = "temp-" + Date.now(); - const tempComment: Comment = { - id: tempId, - issue_id: id, - author_type: "member", - author_id: user.id, - content, + const tempEntry: TimelineEntry = { type: "comment", + id: tempId, + actor_type: "member", + actor_id: user.id, + content, + parent_id: null, created_at: new Date().toISOString(), updated_at: new Date().toISOString(), + comment_type: "comment", }; - setComments((prev) => [...prev, tempComment]); + setTimeline((prev) => [...prev, tempEntry]); commentEditorRef.current?.clearContent(); setCommentEmpty(true); setSubmitting(true); try { const comment = await api.createComment(id, content); - setComments((prev) => prev.map((c) => (c.id === tempId ? comment : c))); + setTimeline((prev) => prev.map((e) => (e.id === tempId ? commentToTimelineEntry(comment) : e))); } catch { - setComments((prev) => prev.filter((c) => c.id !== tempId)); + setTimeline((prev) => prev.filter((e) => e.id !== tempId)); toast.error("Failed to send comment"); } finally { setSubmitting(false); } }; + const handleSubmitReply = async (parentId: string) => { + const md = replyEditorRef.current?.getMarkdown()?.trim(); + if (!md || !user) return; + try { + const comment = await api.createComment(id, md, "comment", parentId); + setTimeline((prev) => [...prev, commentToTimelineEntry(comment)]); + replyEditorRef.current?.clearContent(); + setReplyingTo(null); + setReplyEmpty(true); + } catch { + toast.error("Failed to send reply"); + } + }; + const handleUpdateField = useCallback( (updates: Partial) => { if (!issue) return; @@ -241,16 +296,16 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) { } }; - const startEditComment = (c: Comment) => { - setEditingCommentId(c.id); - setEditContent(c.content); + const startEditComment = (entry: TimelineEntry) => { + setEditingCommentId(entry.id); + setEditContent(entry.content ?? ""); }; const handleSaveEditComment = async () => { if (!editingCommentId || !editContent.trim()) return; try { const updated = await api.updateComment(editingCommentId, editContent.trim()); - setComments((prev) => prev.map((c) => (c.id === updated.id ? updated : c))); + setTimeline((prev) => prev.map((e) => (e.id === updated.id ? commentToTimelineEntry(updated) : e))); setEditingCommentId(null); } catch { toast.error("Failed to update comment"); @@ -260,7 +315,7 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) { const handleDeleteComment = async (commentId: string) => { try { await api.deleteComment(commentId); - setComments((prev) => prev.filter((c) => c.id !== commentId)); + setTimeline((prev) => prev.filter((e) => e.id !== commentId)); } catch { toast.error("Failed to delete comment"); } @@ -302,9 +357,9 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) { if (comment.issue_id !== id) return; // Skip own comments — already added locally via API response if (comment.author_type === "member" && comment.author_id === user?.id) return; - setComments((prev) => { - if (prev.some((c) => c.id === comment.id)) return prev; - return [...prev, comment]; + setTimeline((prev) => { + if (prev.some((e) => e.id === comment.id)) return prev; + return [...prev, commentToTimelineEntry(comment)]; }); }, [id, user?.id]), ); @@ -314,7 +369,7 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) { useCallback((payload: unknown) => { const { comment } = payload as CommentUpdatedPayload; if (comment.issue_id === id) { - setComments((prev) => prev.map((c) => (c.id === comment.id ? comment : c))); + setTimeline((prev) => prev.map((e) => (e.id === comment.id ? commentToTimelineEntry(comment) : e))); } }, [id]), ); @@ -324,11 +379,25 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) { useCallback((payload: unknown) => { const { comment_id, issue_id } = payload as CommentDeletedPayload; if (issue_id === id) { - setComments((prev) => prev.filter((c) => c.id !== comment_id)); + setTimeline((prev) => prev.filter((e) => e.id !== comment_id)); } }, [id]), ); + useWSEvent( + "activity:created", + useCallback((payload: unknown) => { + const p = payload as ActivityCreatedPayload; + if (p.issue_id !== id) return; + const entry = p.entry; + if (!entry || !entry.id) return; + setTimeline((prev) => { + if (prev.some((e) => e.id === entry.id)) return prev; + return [...prev, entry]; + }); + }, [id]), + ); + // Real-time subscriber updates useWSEvent( "subscriber:added", @@ -674,7 +743,14 @@ export function IssueDetail({ issueId, onDelete }: IssueDetailProps) { {/* Activity / Comments */}
-

Activity

+
+

Activity

+
+ + + +
+
+ {/* Timeline entries */}
- {comments.map((comment) => { - const isOwn = comment.author_type === "member" && comment.author_id === user?.id; - return ( -
-
- - - {getActorName(comment.author_type, comment.author_id)} - - - - {timeAgo(comment.created_at)} - - } + {(() => { + // Separate top-level entries from replies + const topLevel = timeline.filter((e) => e.type === "activity" || !e.parent_id); + const repliesByParent = new Map(); + for (const e of timeline) { + if (e.type === "comment" && e.parent_id) { + const list = repliesByParent.get(e.parent_id) ?? []; + list.push(e); + repliesByParent.set(e.parent_id, list); + } + } + + // Apply filter + const filtered = topLevel.filter((e) => { + if (filter === "all") return true; + if (filter === "comments") return e.type === "comment"; + if (filter === "activity") return e.type === "activity"; + return true; + }); + + return filtered.map((entry) => { + if (entry.type === "activity") { + return ( +
+
+ +
+ {getActorName(entry.actor_type, entry.actor_id)} + {formatActivity(entry)} + + + {timeAgo(entry.created_at)} + + } + /> + + {new Date(entry.created_at).toLocaleString()} + + +
+ ); + } + + // Comment entry + const replies = repliesByParent.get(entry.id) ?? []; + const isOwn = entry.actor_type === "member" && entry.actor_id === user?.id; + return ( +
+
+ - - {new Date(comment.created_at).toLocaleString()} - - - {isOwn && ( + + {getActorName(entry.actor_type, entry.actor_id)} + + + + {timeAgo(entry.created_at)} + + } + /> + + {new Date(entry.created_at).toLocaleString()} + +
startEditComment(comment)} + onClick={() => setReplyingTo(replyingTo === entry.id ? null : entry.id)} className="text-muted-foreground hover:text-foreground" > - + } /> - Edit + Reply - - handleDeleteComment(comment.id)} - className="text-muted-foreground hover:text-destructive" - > - - - } + {isOwn && ( + <> + + startEditComment(entry)} + className="text-muted-foreground hover:text-foreground" + > + + + } + /> + Edit + + + handleDeleteComment(entry.id)} + className="text-muted-foreground hover:text-destructive" + > + + + } + /> + Delete + + + )} +
+
+ {editingCommentId === entry.id ? ( +
{ e.preventDefault(); handleSaveEditComment(); }} className="mt-2 pl-9.5"> + setEditContent(e.target.value)} + aria-label="Edit comment" + className="w-full text-sm bg-transparent border-b outline-none" + onKeyDown={(e) => { if (e.key === "Escape") setEditingCommentId(null); }} + /> +
+ ) : ( +
+ {entry.content ?? ""} +
+ )} + + {/* Replies */} + {replies.length > 0 && ( +
+ {replies.map((reply) => { + const isReplyOwn = reply.actor_type === "member" && reply.actor_id === user?.id; + return ( +
+
+ + + {getActorName(reply.actor_type, reply.actor_id)} + + + {timeAgo(reply.created_at)} + + {isReplyOwn && ( +
+ + startEditComment(reply)} + className="text-muted-foreground hover:text-foreground" + > + + + } + /> + Edit + + + handleDeleteComment(reply.id)} + className="text-muted-foreground hover:text-destructive" + > + + + } + /> + Delete + +
+ )} +
+ {editingCommentId === reply.id ? ( +
{ e.preventDefault(); handleSaveEditComment(); }} className="mt-1 pl-7.5"> + setEditContent(e.target.value)} + aria-label="Edit comment" + className="w-full text-sm bg-transparent border-b outline-none" + onKeyDown={(e) => { if (e.key === "Escape") setEditingCommentId(null); }} + /> +
+ ) : ( +
+ {reply.content ?? ""} +
+ )} +
+ ); + })} +
+ )} + + {/* Reply input */} + {replyingTo === entry.id && ( +
+
+ setReplyEmpty(!md.trim())} + onSubmit={() => handleSubmitReply(entry.id)} + debounceMs={100} /> - Delete - +
+
+ + +
)}
- {editingCommentId === comment.id ? ( -
{ e.preventDefault(); handleSaveEditComment(); }} className="mt-2 pl-9.5"> - setEditContent(e.target.value)} - aria-label="Edit comment" - className="w-full text-sm bg-transparent border-b outline-none" - onKeyDown={(e) => { if (e.key === "Escape") setEditingCommentId(null); }} - /> -
- ) : ( -
- {comment.content} -
- )} -
- ); - })} + ); + }); + })()}
{/* Comment input */} diff --git a/apps/web/shared/api/client.ts b/apps/web/shared/api/client.ts index b9b34dc4..80b915fa 100644 --- a/apps/web/shared/api/client.ts +++ b/apps/web/shared/api/client.ts @@ -30,6 +30,7 @@ import type { CreatePersonalAccessTokenResponse, RuntimeUsage, RuntimePing, + TimelineEntry, } from "@/shared/types"; import { type Logger, noopLogger } from "@/shared/logger"; @@ -183,13 +184,21 @@ export class ApiClient { return this.fetch(`/api/issues/${issueId}/comments`); } - async createComment(issueId: string, content: string, type?: string): Promise { + async createComment(issueId: string, content: string, type?: string, parentId?: string): Promise { return this.fetch(`/api/issues/${issueId}/comments`, { method: "POST", - body: JSON.stringify({ content, type: type ?? "comment" }), + body: JSON.stringify({ + content, + type: type ?? "comment", + ...(parentId ? { parent_id: parentId } : {}), + }), }); } + async listTimeline(issueId: string): Promise { + return this.fetch(`/api/issues/${issueId}/timeline`); + } + async updateComment(commentId: string, content: string): Promise { return this.fetch(`/api/comments/${commentId}`, { method: "PUT", diff --git a/apps/web/shared/types/activity.ts b/apps/web/shared/types/activity.ts new file mode 100644 index 00000000..335fce8a --- /dev/null +++ b/apps/web/shared/types/activity.ts @@ -0,0 +1,15 @@ +export interface TimelineEntry { + type: "activity" | "comment"; + id: string; + actor_type: string; + actor_id: string; + created_at: string; + // Activity fields + action?: string; + details?: Record; + // Comment fields + content?: string; + parent_id?: string | null; + updated_at?: string; + comment_type?: string; +} diff --git a/apps/web/shared/types/comment.ts b/apps/web/shared/types/comment.ts index 8f5944fd..8ce17d77 100644 --- a/apps/web/shared/types/comment.ts +++ b/apps/web/shared/types/comment.ts @@ -9,6 +9,7 @@ export interface Comment { author_id: string; content: string; type: CommentType; + parent_id: string | null; created_at: string; updated_at: string; } diff --git a/apps/web/shared/types/events.ts b/apps/web/shared/types/events.ts index a0626e2d..954eac26 100644 --- a/apps/web/shared/types/events.ts +++ b/apps/web/shared/types/events.ts @@ -2,6 +2,7 @@ import type { Issue } from "./issue"; import type { Agent } from "./agent"; import type { InboxItem } from "./inbox"; import type { Comment } from "./comment"; +import type { TimelineEntry } from "./activity"; import type { Workspace, MemberWithUser } from "./workspace"; // WebSocket event types (matching Go server protocol/events.go) @@ -35,7 +36,8 @@ export type WSEventType = | "skill:updated" | "skill:deleted" | "subscriber:added" - | "subscriber:removed"; + | "subscriber:removed" + | "activity:created"; export interface WSMessage { type: WSEventType; @@ -139,3 +141,8 @@ export interface SubscriberRemovedPayload { user_type: string; user_id: string; } + +export interface ActivityCreatedPayload { + issue_id: string; + entry: TimelineEntry; +} diff --git a/apps/web/shared/types/index.ts b/apps/web/shared/types/index.ts index 61e97fb4..9ea24796 100644 --- a/apps/web/shared/types/index.ts +++ b/apps/web/shared/types/index.ts @@ -24,6 +24,7 @@ export type { export type { Workspace, WorkspaceRepo, Member, MemberRole, User, MemberWithUser } from "./workspace"; export type { InboxItem, InboxSeverity, InboxItemType } from "./inbox"; export type { Comment, CommentType, CommentAuthorType } from "./comment"; +export type { TimelineEntry } from "./activity"; export type { IssueSubscriber } from "./subscriber"; export type { DaemonPairingSession, DaemonPairingSessionStatus, ApproveDaemonPairingSessionRequest } from "./daemon"; export type * from "./events"; diff --git a/server/cmd/server/activity_listeners.go b/server/cmd/server/activity_listeners.go new file mode 100644 index 00000000..8a05da40 --- /dev/null +++ b/server/cmd/server/activity_listeners.go @@ -0,0 +1,207 @@ +package main + +import ( + "context" + "encoding/json" + "log/slog" + + "github.com/multica-ai/multica/server/internal/events" + "github.com/multica-ai/multica/server/internal/handler" + "github.com/multica-ai/multica/server/internal/util" + db "github.com/multica-ai/multica/server/pkg/db/generated" + "github.com/multica-ai/multica/server/pkg/protocol" +) + +// registerActivityListeners wires up event bus listeners that record activity +// entries in the activity_log table. Each listener creates one or more activity +// records depending on what changed, then publishes an activity:created event +// for WS broadcasting. +func registerActivityListeners(bus *events.Bus, queries *db.Queries) { + ctx := context.Background() + + // issue:created — record "created" activity + bus.Subscribe(protocol.EventIssueCreated, func(e events.Event) { + payload, ok := e.Payload.(map[string]any) + if !ok { + return + } + issue, ok := payload["issue"].(handler.IssueResponse) + if !ok { + return + } + + activity, err := queries.CreateActivity(ctx, db.CreateActivityParams{ + WorkspaceID: parseUUID(issue.WorkspaceID), + IssueID: parseUUID(issue.ID), + ActorType: util.StrToText(e.ActorType), + ActorID: parseUUID(e.ActorID), + Action: "created", + Details: []byte("{}"), + }) + if err != nil { + slog.Error("activity: failed to record issue created", + "issue_id", issue.ID, "error", err) + return + } + + publishActivityEvent(bus, e, activity) + }) + + // issue:updated — record specific changes as separate activities + bus.Subscribe(protocol.EventIssueUpdated, func(e events.Event) { + payload, ok := e.Payload.(map[string]any) + if !ok { + return + } + issue, ok := payload["issue"].(handler.IssueResponse) + if !ok { + return + } + + statusChanged, _ := payload["status_changed"].(bool) + assigneeChanged, _ := payload["assignee_changed"].(bool) + descriptionChanged, _ := payload["description_changed"].(bool) + + if statusChanged { + prevStatus, _ := payload["prev_status"].(string) + details, _ := json.Marshal(map[string]string{ + "from": prevStatus, + "to": issue.Status, + }) + activity, err := queries.CreateActivity(ctx, db.CreateActivityParams{ + WorkspaceID: parseUUID(issue.WorkspaceID), + IssueID: parseUUID(issue.ID), + ActorType: util.StrToText(e.ActorType), + ActorID: parseUUID(e.ActorID), + Action: "status_changed", + Details: details, + }) + if err != nil { + slog.Error("activity: failed to record status change", + "issue_id", issue.ID, "error", err) + } else { + publishActivityEvent(bus, e, activity) + } + } + + if assigneeChanged { + prevAssigneeType, _ := payload["prev_assignee_type"].(*string) + prevAssigneeID, _ := payload["prev_assignee_id"].(*string) + + detailsMap := map[string]string{} + if prevAssigneeType != nil { + detailsMap["from_type"] = *prevAssigneeType + } + if prevAssigneeID != nil { + detailsMap["from_id"] = *prevAssigneeID + } + if issue.AssigneeType != nil { + detailsMap["to_type"] = *issue.AssigneeType + } + if issue.AssigneeID != nil { + detailsMap["to_id"] = *issue.AssigneeID + } + + details, _ := json.Marshal(detailsMap) + activity, err := queries.CreateActivity(ctx, db.CreateActivityParams{ + WorkspaceID: parseUUID(issue.WorkspaceID), + IssueID: parseUUID(issue.ID), + ActorType: util.StrToText(e.ActorType), + ActorID: parseUUID(e.ActorID), + Action: "assignee_changed", + Details: details, + }) + if err != nil { + slog.Error("activity: failed to record assignee change", + "issue_id", issue.ID, "error", err) + } else { + publishActivityEvent(bus, e, activity) + } + } + + if descriptionChanged { + activity, err := queries.CreateActivity(ctx, db.CreateActivityParams{ + WorkspaceID: parseUUID(issue.WorkspaceID), + IssueID: parseUUID(issue.ID), + ActorType: util.StrToText(e.ActorType), + ActorID: parseUUID(e.ActorID), + Action: "description_updated", + Details: []byte("{}"), + }) + if err != nil { + slog.Error("activity: failed to record description change", + "issue_id", issue.ID, "error", err) + } else { + publishActivityEvent(bus, e, activity) + } + } + }) + + // task:completed — record "task_completed" activity + bus.Subscribe(protocol.EventTaskCompleted, func(e events.Event) { + handleTaskActivity(ctx, bus, queries, e, "task_completed") + }) + + // task:failed — record "task_failed" activity + bus.Subscribe(protocol.EventTaskFailed, func(e events.Event) { + handleTaskActivity(ctx, bus, queries, e, "task_failed") + }) +} + +// handleTaskActivity records an activity for task:completed or task:failed events. +func handleTaskActivity(ctx context.Context, bus *events.Bus, queries *db.Queries, e events.Event, action string) { + payload, ok := e.Payload.(map[string]any) + if !ok { + return + } + agentID, _ := payload["agent_id"].(string) + issueID, _ := payload["issue_id"].(string) + if issueID == "" { + return + } + + // Look up issue to get workspace_id + issue, err := queries.GetIssue(ctx, parseUUID(issueID)) + if err != nil { + slog.Error("activity: failed to get issue for task event", + "issue_id", issueID, "action", action, "error", err) + return + } + + activity, err := queries.CreateActivity(ctx, db.CreateActivityParams{ + WorkspaceID: issue.WorkspaceID, + IssueID: parseUUID(issueID), + ActorType: util.StrToText("agent"), + ActorID: parseUUID(agentID), + Action: action, + Details: []byte("{}"), + }) + if err != nil { + slog.Error("activity: failed to record task activity", + "issue_id", issueID, "action", action, "error", err) + return + } + + publishActivityEvent(bus, e, activity) +} + +// publishActivityEvent sends an activity:created event for WS broadcasting. +func publishActivityEvent(bus *events.Bus, original events.Event, activity db.ActivityLog) { + bus.Publish(events.Event{ + Type: protocol.EventActivityCreated, + WorkspaceID: original.WorkspaceID, + ActorType: original.ActorType, + ActorID: original.ActorID, + Payload: map[string]any{ + "activity": map[string]any{ + "id": util.UUIDToString(activity.ID), + "issue_id": util.UUIDToString(activity.IssueID), + "actor_type": util.TextToPtr(activity.ActorType), + "actor_id": util.UUIDToString(activity.ActorID), + "action": activity.Action, + "details": json.RawMessage(activity.Details), + "created_at": util.TimestampToString(activity.CreatedAt), + }, + }, + }) +} diff --git a/server/cmd/server/activity_listeners_test.go b/server/cmd/server/activity_listeners_test.go new file mode 100644 index 00000000..3935793a --- /dev/null +++ b/server/cmd/server/activity_listeners_test.go @@ -0,0 +1,295 @@ +package main + +import ( + "context" + "encoding/json" + "testing" + + "github.com/multica-ai/multica/server/internal/events" + "github.com/multica-ai/multica/server/internal/handler" + "github.com/multica-ai/multica/server/internal/util" + db "github.com/multica-ai/multica/server/pkg/db/generated" + "github.com/multica-ai/multica/server/pkg/protocol" +) + +// listActivitiesForIssue is a test helper that fetches all activity_log records for an issue. +func listActivitiesForIssue(t *testing.T, queries *db.Queries, issueID string) []db.ActivityLog { + t.Helper() + activities, err := queries.ListActivities(context.Background(), db.ListActivitiesParams{ + IssueID: util.ParseUUID(issueID), + Limit: 100, + Offset: 0, + }) + if err != nil { + t.Fatalf("ListActivities: %v", err) + } + return activities +} + +func cleanupActivities(t *testing.T, issueID string) { + t.Helper() + testPool.Exec(context.Background(), `DELETE FROM activity_log WHERE issue_id = $1`, issueID) +} + +func TestActivityIssueCreated(t *testing.T) { + queries := db.New(testPool) + bus := events.New() + registerActivityListeners(bus, queries) + + issueID := createTestIssue(t, testWorkspaceID, testUserID) + t.Cleanup(func() { + cleanupActivities(t, issueID) + cleanupTestIssue(t, issueID) + }) + + bus.Publish(events.Event{ + Type: protocol.EventIssueCreated, + WorkspaceID: testWorkspaceID, + ActorType: "member", + ActorID: testUserID, + Payload: map[string]any{ + "issue": handler.IssueResponse{ + ID: issueID, + WorkspaceID: testWorkspaceID, + Title: "activity test issue", + Status: "todo", + Priority: "medium", + CreatorType: "member", + CreatorID: testUserID, + }, + }, + }) + + activities := listActivitiesForIssue(t, queries, issueID) + if len(activities) != 1 { + t.Fatalf("expected 1 activity, got %d", len(activities)) + } + if activities[0].Action != "created" { + t.Fatalf("expected action 'created', got %q", activities[0].Action) + } + if util.UUIDToString(activities[0].ActorID) != testUserID { + t.Fatalf("expected actor_id %s, got %s", testUserID, util.UUIDToString(activities[0].ActorID)) + } +} + +func TestActivityIssueUpdated_StatusChanged(t *testing.T) { + queries := db.New(testPool) + bus := events.New() + registerActivityListeners(bus, queries) + + issueID := createTestIssue(t, testWorkspaceID, testUserID) + t.Cleanup(func() { + cleanupActivities(t, issueID) + cleanupTestIssue(t, issueID) + }) + + bus.Publish(events.Event{ + Type: protocol.EventIssueUpdated, + WorkspaceID: testWorkspaceID, + ActorType: "member", + ActorID: testUserID, + Payload: map[string]any{ + "issue": handler.IssueResponse{ + ID: issueID, + WorkspaceID: testWorkspaceID, + Title: "activity test issue", + Status: "in_progress", + Priority: "medium", + CreatorType: "member", + CreatorID: testUserID, + }, + "status_changed": true, + "prev_status": "todo", + }, + }) + + activities := listActivitiesForIssue(t, queries, issueID) + if len(activities) != 1 { + t.Fatalf("expected 1 activity, got %d", len(activities)) + } + if activities[0].Action != "status_changed" { + t.Fatalf("expected action 'status_changed', got %q", activities[0].Action) + } + + var details map[string]string + if err := json.Unmarshal(activities[0].Details, &details); err != nil { + t.Fatalf("failed to unmarshal details: %v", err) + } + if details["from"] != "todo" { + t.Fatalf("expected from 'todo', got %q", details["from"]) + } + if details["to"] != "in_progress" { + t.Fatalf("expected to 'in_progress', got %q", details["to"]) + } +} + +func TestActivityIssueUpdated_AssigneeChanged(t *testing.T) { + queries := db.New(testPool) + bus := events.New() + registerActivityListeners(bus, queries) + + assigneeEmail := "activity-assignee-test@multica.ai" + assigneeID := createTestUser(t, assigneeEmail) + t.Cleanup(func() { cleanupTestUser(t, assigneeEmail) }) + + issueID := createTestIssue(t, testWorkspaceID, testUserID) + t.Cleanup(func() { + cleanupActivities(t, issueID) + cleanupTestIssue(t, issueID) + }) + + assigneeType := "member" + bus.Publish(events.Event{ + Type: protocol.EventIssueUpdated, + WorkspaceID: testWorkspaceID, + ActorType: "member", + ActorID: testUserID, + Payload: map[string]any{ + "issue": handler.IssueResponse{ + ID: issueID, + WorkspaceID: testWorkspaceID, + Title: "activity test issue", + Status: "todo", + Priority: "medium", + CreatorType: "member", + CreatorID: testUserID, + AssigneeType: &assigneeType, + AssigneeID: &assigneeID, + }, + "assignee_changed": true, + "prev_assignee_type": (*string)(nil), + "prev_assignee_id": (*string)(nil), + }, + }) + + activities := listActivitiesForIssue(t, queries, issueID) + if len(activities) != 1 { + t.Fatalf("expected 1 activity, got %d", len(activities)) + } + if activities[0].Action != "assignee_changed" { + t.Fatalf("expected action 'assignee_changed', got %q", activities[0].Action) + } + + var details map[string]string + if err := json.Unmarshal(activities[0].Details, &details); err != nil { + t.Fatalf("failed to unmarshal details: %v", err) + } + if details["to_type"] != "member" { + t.Fatalf("expected to_type 'member', got %q", details["to_type"]) + } + if details["to_id"] != assigneeID { + t.Fatalf("expected to_id %q, got %q", assigneeID, details["to_id"]) + } +} + +func TestActivityIssueUpdated_NoChangeFlags(t *testing.T) { + queries := db.New(testPool) + bus := events.New() + registerActivityListeners(bus, queries) + + issueID := createTestIssue(t, testWorkspaceID, testUserID) + t.Cleanup(func() { + cleanupActivities(t, issueID) + cleanupTestIssue(t, issueID) + }) + + // Publish issue:updated with no change flags set + bus.Publish(events.Event{ + Type: protocol.EventIssueUpdated, + WorkspaceID: testWorkspaceID, + ActorType: "member", + ActorID: testUserID, + Payload: map[string]any{ + "issue": handler.IssueResponse{ + ID: issueID, + WorkspaceID: testWorkspaceID, + Title: "activity test issue", + Status: "todo", + Priority: "medium", + CreatorType: "member", + CreatorID: testUserID, + }, + "assignee_changed": false, + "status_changed": false, + "description_changed": false, + }, + }) + + activities := listActivitiesForIssue(t, queries, issueID) + if len(activities) != 0 { + t.Fatalf("expected 0 activities when no change flags, got %d", len(activities)) + } +} + +func TestActivityTaskCompleted(t *testing.T) { + queries := db.New(testPool) + bus := events.New() + registerActivityListeners(bus, queries) + + issueID := createTestIssue(t, testWorkspaceID, testUserID) + t.Cleanup(func() { + cleanupActivities(t, issueID) + cleanupTestIssue(t, issueID) + }) + + agentID := testUserID // reuse as a stand-in for agent ID + + bus.Publish(events.Event{ + Type: protocol.EventTaskCompleted, + WorkspaceID: testWorkspaceID, + ActorType: "system", + ActorID: "", + Payload: map[string]any{ + "task_id": "00000000-0000-0000-0000-000000000001", + "agent_id": agentID, + "issue_id": issueID, + "status": "completed", + }, + }) + + activities := listActivitiesForIssue(t, queries, issueID) + if len(activities) != 1 { + t.Fatalf("expected 1 activity, got %d", len(activities)) + } + if activities[0].Action != "task_completed" { + t.Fatalf("expected action 'task_completed', got %q", activities[0].Action) + } + if util.UUIDToString(activities[0].ActorID) != agentID { + t.Fatalf("expected actor_id %s, got %s", agentID, util.UUIDToString(activities[0].ActorID)) + } +} + +func TestActivityTaskFailed(t *testing.T) { + queries := db.New(testPool) + bus := events.New() + registerActivityListeners(bus, queries) + + issueID := createTestIssue(t, testWorkspaceID, testUserID) + t.Cleanup(func() { + cleanupActivities(t, issueID) + cleanupTestIssue(t, issueID) + }) + + agentID := testUserID + + bus.Publish(events.Event{ + Type: protocol.EventTaskFailed, + WorkspaceID: testWorkspaceID, + ActorType: "system", + ActorID: "", + Payload: map[string]any{ + "task_id": "00000000-0000-0000-0000-000000000002", + "agent_id": agentID, + "issue_id": issueID, + "status": "failed", + }, + }) + + activities := listActivitiesForIssue(t, queries, issueID) + if len(activities) != 1 { + t.Fatalf("expected 1 activity, got %d", len(activities)) + } + if activities[0].Action != "task_failed" { + t.Fatalf("expected action 'task_failed', got %q", activities[0].Action) + } +} diff --git a/server/cmd/server/listeners.go b/server/cmd/server/listeners.go index cddc60b5..9aeb0ca2 100644 --- a/server/cmd/server/listeners.go +++ b/server/cmd/server/listeners.go @@ -37,6 +37,7 @@ func registerListeners(bus *events.Bus, hub *realtime.Hub) { protocol.EventMemberRemoved, protocol.EventSubscriberAdded, protocol.EventSubscriberRemoved, + protocol.EventActivityCreated, } for _, et := range allEvents { diff --git a/server/cmd/server/main.go b/server/cmd/server/main.go index 18e69a58..7f5bf1f2 100644 --- a/server/cmd/server/main.go +++ b/server/cmd/server/main.go @@ -55,6 +55,7 @@ func main() { // The notification listener queries the subscriber table to determine recipients, // so subscribers must be written first within the same synchronous event dispatch. registerSubscriberListeners(bus, queries) + registerActivityListeners(bus, queries) registerNotificationListeners(bus, queries) r := NewRouter(pool, hub, bus) diff --git a/server/cmd/server/router.go b/server/cmd/server/router.go index d5401a9c..42350116 100644 --- a/server/cmd/server/router.go +++ b/server/cmd/server/router.go @@ -117,6 +117,7 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route r.Delete("/", h.DeleteIssue) r.Post("/comments", h.CreateComment) r.Get("/comments", h.ListComments) + r.Get("/timeline", h.ListTimeline) r.Get("/subscribers", h.ListIssueSubscribers) r.Post("/subscribe", h.SubscribeToIssue) r.Post("/unsubscribe", h.UnsubscribeFromIssue) diff --git a/server/internal/handler/activity.go b/server/internal/handler/activity.go new file mode 100644 index 00000000..b7648834 --- /dev/null +++ b/server/internal/handler/activity.go @@ -0,0 +1,100 @@ +package handler + +import ( + "encoding/json" + "net/http" + "sort" + + "github.com/go-chi/chi/v5" + db "github.com/multica-ai/multica/server/pkg/db/generated" +) + +// TimelineEntry represents a single entry in the issue timeline, which can be +// either an activity log record or a comment. +type TimelineEntry struct { + Type string `json:"type"` // "activity" or "comment" + ID string `json:"id"` + + ActorType string `json:"actor_type"` + ActorID string `json:"actor_id"` + CreatedAt string `json:"created_at"` + + // Activity-only fields + Action *string `json:"action,omitempty"` + Details json.RawMessage `json:"details,omitempty"` + + // Comment-only fields + Content *string `json:"content,omitempty"` + ParentID *string `json:"parent_id,omitempty"` + UpdatedAt *string `json:"updated_at,omitempty"` + CommentType *string `json:"comment_type,omitempty"` +} + +// ListTimeline returns a merged, chronologically-sorted timeline of activities +// and comments for a given issue. +func (h *Handler) ListTimeline(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + issue, ok := h.loadIssueForUser(w, r, id) + if !ok { + return + } + + activities, err := h.Queries.ListActivities(r.Context(), db.ListActivitiesParams{ + IssueID: issue.ID, + Limit: 200, + Offset: 0, + }) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to list activities") + return + } + + comments, err := h.Queries.ListComments(r.Context(), issue.ID) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to list comments") + return + } + + timeline := make([]TimelineEntry, 0, len(activities)+len(comments)) + + for _, a := range activities { + action := a.Action + actorType := "" + if a.ActorType.Valid { + actorType = a.ActorType.String + } + timeline = append(timeline, TimelineEntry{ + Type: "activity", + ID: uuidToString(a.ID), + ActorType: actorType, + ActorID: uuidToString(a.ActorID), + Action: &action, + Details: a.Details, + CreatedAt: timestampToString(a.CreatedAt), + }) + } + + for _, c := range comments { + content := c.Content + commentType := c.Type + updatedAt := timestampToString(c.UpdatedAt) + timeline = append(timeline, TimelineEntry{ + Type: "comment", + ID: uuidToString(c.ID), + ActorType: c.AuthorType, + ActorID: uuidToString(c.AuthorID), + Content: &content, + CommentType: &commentType, + ParentID: uuidToPtr(c.ParentID), + CreatedAt: timestampToString(c.CreatedAt), + UpdatedAt: &updatedAt, + }) + } + + // Sort chronologically (ascending by created_at) + sort.Slice(timeline, func(i, j int) bool { + return timeline[i].CreatedAt < timeline[j].CreatedAt + }) + + writeJSON(w, http.StatusOK, timeline) +} diff --git a/server/internal/handler/activity_test.go b/server/internal/handler/activity_test.go new file mode 100644 index 00000000..f2091181 --- /dev/null +++ b/server/internal/handler/activity_test.go @@ -0,0 +1,275 @@ +package handler + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + db "github.com/multica-ai/multica/server/pkg/db/generated" +) + +func TestListTimeline_MergedAndSorted(t *testing.T) { + ctx := context.Background() + + // Create an issue + w := httptest.NewRecorder() + req := newRequest("POST", "/api/issues?workspace_id="+testWorkspaceID, map[string]any{ + "title": "Timeline test issue", + "status": "todo", + }) + testHandler.CreateIssue(w, req) + if w.Code != http.StatusCreated { + t.Fatalf("CreateIssue: expected 201, got %d: %s", w.Code, w.Body.String()) + } + var issue IssueResponse + json.NewDecoder(w.Body).Decode(&issue) + issueID := issue.ID + + t.Cleanup(func() { + testPool.Exec(ctx, `DELETE FROM activity_log WHERE issue_id = $1`, issueID) + testPool.Exec(ctx, `DELETE FROM comment WHERE issue_id = $1`, issueID) + testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) + }) + + // Create an activity record directly in DB + _, err := testHandler.Queries.CreateActivity(ctx, db.CreateActivityParams{ + WorkspaceID: parseUUID(testWorkspaceID), + IssueID: parseUUID(issueID), + ActorType: strToText("member"), + ActorID: parseUUID(testUserID), + Action: "created", + Details: []byte("{}"), + }) + if err != nil { + t.Fatalf("CreateActivity: %v", err) + } + + // Create a comment + w = httptest.NewRecorder() + req = newRequest("POST", "/api/issues/"+issueID+"/comments", map[string]any{ + "content": "Timeline test comment", + }) + req = withURLParam(req, "id", issueID) + testHandler.CreateComment(w, req) + if w.Code != http.StatusCreated { + t.Fatalf("CreateComment: expected 201, got %d: %s", w.Code, w.Body.String()) + } + + // Fetch timeline + w = httptest.NewRecorder() + req = newRequest("GET", "/api/issues/"+issueID+"/timeline", nil) + req = withURLParam(req, "id", issueID) + testHandler.ListTimeline(w, req) + if w.Code != http.StatusOK { + t.Fatalf("ListTimeline: expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var timeline []TimelineEntry + json.NewDecoder(w.Body).Decode(&timeline) + if len(timeline) != 2 { + t.Fatalf("expected 2 timeline entries, got %d", len(timeline)) + } + + // First entry should be the activity (created earlier) + if timeline[0].Type != "activity" { + t.Fatalf("expected first entry type 'activity', got %q", timeline[0].Type) + } + if *timeline[0].Action != "created" { + t.Fatalf("expected action 'created', got %q", *timeline[0].Action) + } + + // Second entry should be the comment + if timeline[1].Type != "comment" { + t.Fatalf("expected second entry type 'comment', got %q", timeline[1].Type) + } + if *timeline[1].Content != "Timeline test comment" { + t.Fatalf("expected comment content 'Timeline test comment', got %q", *timeline[1].Content) + } +} + +func TestListTimeline_ChronologicalOrder(t *testing.T) { + ctx := context.Background() + + // Create an issue + w := httptest.NewRecorder() + req := newRequest("POST", "/api/issues?workspace_id="+testWorkspaceID, map[string]any{ + "title": "Timeline order test issue", + "status": "todo", + }) + testHandler.CreateIssue(w, req) + var issue IssueResponse + json.NewDecoder(w.Body).Decode(&issue) + issueID := issue.ID + + t.Cleanup(func() { + testPool.Exec(ctx, `DELETE FROM activity_log WHERE issue_id = $1`, issueID) + testPool.Exec(ctx, `DELETE FROM comment WHERE issue_id = $1`, issueID) + testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) + }) + + // Create comment first + w = httptest.NewRecorder() + req = newRequest("POST", "/api/issues/"+issueID+"/comments", map[string]any{ + "content": "First comment", + }) + req = withURLParam(req, "id", issueID) + testHandler.CreateComment(w, req) + + // Then create an activity after the comment + _, err := testHandler.Queries.CreateActivity(ctx, db.CreateActivityParams{ + WorkspaceID: parseUUID(testWorkspaceID), + IssueID: parseUUID(issueID), + ActorType: strToText("member"), + ActorID: parseUUID(testUserID), + Action: "status_changed", + Details: []byte(`{"from":"todo","to":"in_progress"}`), + }) + if err != nil { + t.Fatalf("CreateActivity: %v", err) + } + + // Fetch timeline + w = httptest.NewRecorder() + req = newRequest("GET", "/api/issues/"+issueID+"/timeline", nil) + req = withURLParam(req, "id", issueID) + testHandler.ListTimeline(w, req) + + var timeline []TimelineEntry + json.NewDecoder(w.Body).Decode(&timeline) + if len(timeline) != 2 { + t.Fatalf("expected 2 entries, got %d", len(timeline)) + } + + // Entries should be in chronological order + if timeline[0].CreatedAt > timeline[1].CreatedAt { + t.Fatalf("timeline not in chronological order: %s > %s", timeline[0].CreatedAt, timeline[1].CreatedAt) + } +} + +func TestCreateComment_WithParentID(t *testing.T) { + ctx := context.Background() + + // Create an issue + w := httptest.NewRecorder() + req := newRequest("POST", "/api/issues?workspace_id="+testWorkspaceID, map[string]any{ + "title": "Reply test issue", + }) + testHandler.CreateIssue(w, req) + var issue IssueResponse + json.NewDecoder(w.Body).Decode(&issue) + issueID := issue.ID + + t.Cleanup(func() { + testPool.Exec(ctx, `DELETE FROM comment WHERE issue_id = $1`, issueID) + testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) + }) + + // Create parent comment + w = httptest.NewRecorder() + req = newRequest("POST", "/api/issues/"+issueID+"/comments", map[string]any{ + "content": "Parent comment", + }) + req = withURLParam(req, "id", issueID) + testHandler.CreateComment(w, req) + if w.Code != http.StatusCreated { + t.Fatalf("CreateComment (parent): expected 201, got %d: %s", w.Code, w.Body.String()) + } + var parentComment CommentResponse + json.NewDecoder(w.Body).Decode(&parentComment) + + // Create reply with parent_id + w = httptest.NewRecorder() + req = newRequest("POST", "/api/issues/"+issueID+"/comments", map[string]any{ + "content": "Reply to parent", + "parent_id": parentComment.ID, + }) + req = withURLParam(req, "id", issueID) + testHandler.CreateComment(w, req) + if w.Code != http.StatusCreated { + t.Fatalf("CreateComment (reply): expected 201, got %d: %s", w.Code, w.Body.String()) + } + var replyComment CommentResponse + json.NewDecoder(w.Body).Decode(&replyComment) + + if replyComment.ParentID == nil { + t.Fatal("expected reply to have parent_id set") + } + if *replyComment.ParentID != parentComment.ID { + t.Fatalf("expected parent_id %q, got %q", parentComment.ID, *replyComment.ParentID) + } + + // Verify parent comment has no parent_id + if parentComment.ParentID != nil { + t.Fatalf("expected parent comment to have nil parent_id, got %q", *parentComment.ParentID) + } +} + +func TestCommentWithParentID_AppearsInTimeline(t *testing.T) { + ctx := context.Background() + + // Create an issue + w := httptest.NewRecorder() + req := newRequest("POST", "/api/issues?workspace_id="+testWorkspaceID, map[string]any{ + "title": "Timeline reply test", + }) + testHandler.CreateIssue(w, req) + var issue IssueResponse + json.NewDecoder(w.Body).Decode(&issue) + issueID := issue.ID + + t.Cleanup(func() { + testPool.Exec(ctx, `DELETE FROM activity_log WHERE issue_id = $1`, issueID) + testPool.Exec(ctx, `DELETE FROM comment WHERE issue_id = $1`, issueID) + testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID) + }) + + // Create parent comment + w = httptest.NewRecorder() + req = newRequest("POST", "/api/issues/"+issueID+"/comments", map[string]any{ + "content": "Parent in timeline", + }) + req = withURLParam(req, "id", issueID) + testHandler.CreateComment(w, req) + var parent CommentResponse + json.NewDecoder(w.Body).Decode(&parent) + + // Create reply + w = httptest.NewRecorder() + req = newRequest("POST", "/api/issues/"+issueID+"/comments", map[string]any{ + "content": "Reply in timeline", + "parent_id": parent.ID, + }) + req = withURLParam(req, "id", issueID) + testHandler.CreateComment(w, req) + + // Fetch timeline + w = httptest.NewRecorder() + req = newRequest("GET", "/api/issues/"+issueID+"/timeline", nil) + req = withURLParam(req, "id", issueID) + testHandler.ListTimeline(w, req) + if w.Code != http.StatusOK { + t.Fatalf("ListTimeline: expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var timeline []TimelineEntry + json.NewDecoder(w.Body).Decode(&timeline) + if len(timeline) != 2 { + t.Fatalf("expected 2 timeline entries, got %d", len(timeline)) + } + + // Find the reply entry + var found bool + for _, entry := range timeline { + if entry.Type == "comment" && entry.ParentID != nil && *entry.ParentID == parent.ID { + found = true + if *entry.Content != "Reply in timeline" { + t.Fatalf("expected reply content 'Reply in timeline', got %q", *entry.Content) + } + } + } + if !found { + t.Fatal("expected to find reply with parent_id in timeline") + } +} diff --git a/server/internal/handler/comment.go b/server/internal/handler/comment.go index ea40c200..99d45113 100644 --- a/server/internal/handler/comment.go +++ b/server/internal/handler/comment.go @@ -6,20 +6,22 @@ import ( "net/http" "github.com/go-chi/chi/v5" + "github.com/jackc/pgx/v5/pgtype" "github.com/multica-ai/multica/server/internal/logger" db "github.com/multica-ai/multica/server/pkg/db/generated" "github.com/multica-ai/multica/server/pkg/protocol" ) type CommentResponse struct { - ID string `json:"id"` - IssueID string `json:"issue_id"` - AuthorType string `json:"author_type"` - AuthorID string `json:"author_id"` - Content string `json:"content"` - Type string `json:"type"` - CreatedAt string `json:"created_at"` - UpdatedAt string `json:"updated_at"` + ID string `json:"id"` + IssueID string `json:"issue_id"` + AuthorType string `json:"author_type"` + AuthorID string `json:"author_id"` + Content string `json:"content"` + Type string `json:"type"` + ParentID *string `json:"parent_id"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` } func commentToResponse(c db.Comment) CommentResponse { @@ -30,6 +32,7 @@ func commentToResponse(c db.Comment) CommentResponse { AuthorID: uuidToString(c.AuthorID), Content: c.Content, Type: c.Type, + ParentID: uuidToPtr(c.ParentID), CreatedAt: timestampToString(c.CreatedAt), UpdatedAt: timestampToString(c.UpdatedAt), } @@ -57,8 +60,9 @@ func (h *Handler) ListComments(w http.ResponseWriter, r *http.Request) { } type CreateCommentRequest struct { - Content string `json:"content"` - Type string `json:"type"` + Content string `json:"content"` + Type string `json:"type"` + ParentID *string `json:"parent_id"` } func (h *Handler) CreateComment(w http.ResponseWriter, r *http.Request) { @@ -87,12 +91,18 @@ func (h *Handler) CreateComment(w http.ResponseWriter, r *http.Request) { req.Type = "comment" } + var parentID pgtype.UUID + if req.ParentID != nil { + parentID = parseUUID(*req.ParentID) + } + comment, err := h.Queries.CreateComment(r.Context(), db.CreateCommentParams{ IssueID: issue.ID, AuthorType: "member", AuthorID: parseUUID(userID), Content: req.Content, Type: req.Type, + ParentID: parentID, }) if err != nil { slog.Warn("create comment failed", append(logger.RequestAttrs(r), "error", err, "issue_id", issueID)...) diff --git a/server/migrations/017_comment_parent_id.down.sql b/server/migrations/017_comment_parent_id.down.sql new file mode 100644 index 00000000..3f63f1a9 --- /dev/null +++ b/server/migrations/017_comment_parent_id.down.sql @@ -0,0 +1 @@ +ALTER TABLE comment DROP COLUMN parent_id; diff --git a/server/migrations/017_comment_parent_id.up.sql b/server/migrations/017_comment_parent_id.up.sql new file mode 100644 index 00000000..c9b1e87c --- /dev/null +++ b/server/migrations/017_comment_parent_id.up.sql @@ -0,0 +1 @@ +ALTER TABLE comment ADD COLUMN parent_id UUID REFERENCES comment(id) ON DELETE SET NULL; diff --git a/server/pkg/db/generated/comment.sql.go b/server/pkg/db/generated/comment.sql.go index 9675f23d..0217aee3 100644 --- a/server/pkg/db/generated/comment.sql.go +++ b/server/pkg/db/generated/comment.sql.go @@ -12,9 +12,9 @@ import ( ) const createComment = `-- name: CreateComment :one -INSERT INTO comment (issue_id, author_type, author_id, content, type) -VALUES ($1, $2, $3, $4, $5) -RETURNING id, issue_id, author_type, author_id, content, type, created_at, updated_at +INSERT INTO comment (issue_id, author_type, author_id, content, type, parent_id) +VALUES ($1, $2, $3, $4, $5, $6) +RETURNING id, issue_id, author_type, author_id, content, type, created_at, updated_at, parent_id ` type CreateCommentParams struct { @@ -23,6 +23,7 @@ type CreateCommentParams struct { AuthorID pgtype.UUID `json:"author_id"` Content string `json:"content"` Type string `json:"type"` + ParentID pgtype.UUID `json:"parent_id"` } func (q *Queries) CreateComment(ctx context.Context, arg CreateCommentParams) (Comment, error) { @@ -32,6 +33,7 @@ func (q *Queries) CreateComment(ctx context.Context, arg CreateCommentParams) (C arg.AuthorID, arg.Content, arg.Type, + arg.ParentID, ) var i Comment err := row.Scan( @@ -43,6 +45,7 @@ func (q *Queries) CreateComment(ctx context.Context, arg CreateCommentParams) (C &i.Type, &i.CreatedAt, &i.UpdatedAt, + &i.ParentID, ) return i, err } @@ -57,7 +60,7 @@ func (q *Queries) DeleteComment(ctx context.Context, id pgtype.UUID) error { } const getComment = `-- name: GetComment :one -SELECT id, issue_id, author_type, author_id, content, type, created_at, updated_at FROM comment +SELECT id, issue_id, author_type, author_id, content, type, created_at, updated_at, parent_id FROM comment WHERE id = $1 ` @@ -73,12 +76,13 @@ func (q *Queries) GetComment(ctx context.Context, id pgtype.UUID) (Comment, erro &i.Type, &i.CreatedAt, &i.UpdatedAt, + &i.ParentID, ) return i, err } const listComments = `-- name: ListComments :many -SELECT id, issue_id, author_type, author_id, content, type, created_at, updated_at FROM comment +SELECT id, issue_id, author_type, author_id, content, type, created_at, updated_at, parent_id FROM comment WHERE issue_id = $1 ORDER BY created_at ASC ` @@ -101,6 +105,7 @@ func (q *Queries) ListComments(ctx context.Context, issueID pgtype.UUID) ([]Comm &i.Type, &i.CreatedAt, &i.UpdatedAt, + &i.ParentID, ); err != nil { return nil, err } @@ -117,7 +122,7 @@ UPDATE comment SET content = $2, updated_at = now() WHERE id = $1 -RETURNING id, issue_id, author_type, author_id, content, type, created_at, updated_at +RETURNING id, issue_id, author_type, author_id, content, type, created_at, updated_at, parent_id ` type UpdateCommentParams struct { @@ -137,6 +142,7 @@ func (q *Queries) UpdateComment(ctx context.Context, arg UpdateCommentParams) (C &i.Type, &i.CreatedAt, &i.UpdatedAt, + &i.ParentID, ) return i, err } diff --git a/server/pkg/db/generated/models.go b/server/pkg/db/generated/models.go index 14fd5d05..5e5d9758 100644 --- a/server/pkg/db/generated/models.go +++ b/server/pkg/db/generated/models.go @@ -84,6 +84,7 @@ type Comment struct { Type string `json:"type"` CreatedAt pgtype.Timestamptz `json:"created_at"` UpdatedAt pgtype.Timestamptz `json:"updated_at"` + ParentID pgtype.UUID `json:"parent_id"` } type DaemonConnection struct { diff --git a/server/pkg/db/queries/comment.sql b/server/pkg/db/queries/comment.sql index 7666de4f..4648ad02 100644 --- a/server/pkg/db/queries/comment.sql +++ b/server/pkg/db/queries/comment.sql @@ -8,8 +8,8 @@ SELECT * FROM comment WHERE id = $1; -- name: CreateComment :one -INSERT INTO comment (issue_id, author_type, author_id, content, type) -VALUES ($1, $2, $3, $4, $5) +INSERT INTO comment (issue_id, author_type, author_id, content, type, parent_id) +VALUES ($1, $2, $3, $4, $5, sqlc.narg(parent_id)) RETURNING *; -- name: UpdateComment :one diff --git a/server/pkg/protocol/events.go b/server/pkg/protocol/events.go index 137151d7..6029bf2a 100644 --- a/server/pkg/protocol/events.go +++ b/server/pkg/protocol/events.go @@ -43,6 +43,9 @@ const ( EventSubscriberAdded = "subscriber:added" EventSubscriberRemoved = "subscriber:removed" + // Activity events + EventActivityCreated = "activity:created" + // Daemon events EventDaemonHeartbeat = "daemon:heartbeat" EventDaemonRegister = "daemon:register"