From 9127e543d5c2a0cdd249b5c508b6df4a2d08ab67 Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Wed, 25 Mar 2026 10:08:27 +0800 Subject: [PATCH] feat: add event bus, WS workspace isolation, and global store migration - Add internal event bus (server/internal/events/) with synchronous pub/sub and panic isolation per listener - Upgrade WebSocket Hub to workspace-scoped rooms with JWT auth and membership verification on connect - Add 10 new WS event types (comment CRUD, inbox read/archive, agent create/delete, workspace/member events) - Refactor all handlers and TaskService to publish events via Bus instead of direct Hub.Broadcast calls - Add WS broadcast listener that routes events to correct workspace - Frontend: WSClient sends token + workspace_id on connect with auto-reconnect refetch - Frontend: centralized useRealtimeSync hook dispatches all WS events to global Zustand stores - Migrate issues and inbox pages from local useState to global useIssueStore/useInboxStore - Make store addIssue/addItem idempotent to prevent duplicates - Remove dead packages/hooks/src/use-realtime.ts - Add feature tracking files for 4 planned features Co-Authored-By: Claude Opus 4.6 (1M context) --- _features/_index.json | 6 + _features/inbox-notifications.json | 80 +++++++++ _features/infra-event-bus-ws.json | 78 +++++++++ _features/issue-board-polish.json | 102 +++++++++++ _features/workspace-permissions.json | 59 +++++++ apps/web/app/(dashboard)/inbox/page.tsx | 46 +++-- apps/web/app/(dashboard)/issues/page.tsx | 76 +++------ apps/web/features/realtime/provider.tsx | 14 +- .../features/realtime/use-realtime-sync.ts | 115 +++++++++++++ packages/hooks/src/index.ts | 1 - packages/hooks/src/use-realtime.ts | 37 ---- packages/sdk/src/ws-client.ts | 39 ++++- packages/store/src/inbox-store.ts | 7 +- packages/store/src/issue-store.ts | 7 +- packages/types/src/events.ts | 42 +++++ server/cmd/server/listeners.go | 55 ++++++ server/cmd/server/main.go | 5 +- server/cmd/server/router.go | 31 +++- server/internal/events/bus.go | 59 +++++++ server/internal/events/bus_test.go | 90 ++++++++++ server/internal/handler/agent.go | 19 ++- server/internal/handler/comment.go | 17 +- server/internal/handler/daemon.go | 11 +- server/internal/handler/handler.go | 29 ++-- server/internal/handler/inbox.go | 17 ++ server/internal/handler/issue.go | 19 ++- server/internal/handler/workspace.go | 21 +++ server/internal/realtime/hub.go | 160 +++++++++++++++--- server/internal/service/task.go | 101 +++++++---- server/pkg/protocol/events.go | 20 ++- 30 files changed, 1144 insertions(+), 219 deletions(-) create mode 100644 _features/_index.json create mode 100644 _features/inbox-notifications.json create mode 100644 _features/infra-event-bus-ws.json create mode 100644 _features/issue-board-polish.json create mode 100644 _features/workspace-permissions.json create mode 100644 apps/web/features/realtime/use-realtime-sync.ts delete mode 100644 packages/hooks/src/use-realtime.ts create mode 100644 server/cmd/server/listeners.go create mode 100644 server/internal/events/bus.go create mode 100644 server/internal/events/bus_test.go diff --git a/_features/_index.json b/_features/_index.json new file mode 100644 index 00000000..97537da5 --- /dev/null +++ b/_features/_index.json @@ -0,0 +1,6 @@ +[ + { "id": "infra-event-bus-ws", "status": "done", "name": "Infrastructure: Event Bus + WS Isolation + Global Store" }, + { "id": "issue-board-polish", "status": "designing", "name": "Issue Board & Detail Polish" }, + { "id": "workspace-permissions", "status": "designing", "name": "Workspace & Permissions" }, + { "id": "inbox-notifications", "status": "designing", "name": "Inbox & Notifications" } +] diff --git a/_features/inbox-notifications.json b/_features/inbox-notifications.json new file mode 100644 index 00000000..bf7c0d11 --- /dev/null +++ b/_features/inbox-notifications.json @@ -0,0 +1,80 @@ +{ + "id": "inbox-notifications", + "name": "Inbox & Notifications", + "status": "designing", + "createdAt": "2026-03-25", + "completedAt": null, + "description": "Complete inbox notification system: sidebar unread badge, notification triggers for all key actions, archive UI, issue navigation from notifications, and real-time sync across tabs.", + "currentState": "Inbox page exists with two-column layout. 5 notification triggers implemented (issue assign, reassign, status change, task complete, task fail). No sidebar badge. No archive button. No link to issue. Mark read/archive don't broadcast WS events. SDK return types wrong for mark read/archive.", + "decisions": [ + "Inbox uses useInboxStore from global store (not page-local useState)", + "Sidebar badge reads unread count from store, updated by WS events", + "Backend adds GET /api/inbox/unread-count endpoint using existing CountUnreadInbox SQL query", + "Mark read and archive broadcast WS events (inbox:read, inbox:archived) for cross-tab sync", + "New notification triggers: comment on assigned issue (notify assignee), status change notifies creator too, unassign notifies old assignee", + "SDK markInboxRead and archiveInbox return Promise not Promise", + "Inbox detail shows 'View Issue' link when issue_id is present" + ], + "tasks": [ + { + "task": "Backend: Add GET /api/inbox/unread-count endpoint", + "done": false, + "scope": "New handler using existing CountUnreadInbox query. Returns { count: number }. Requires auth + workspace membership. Route added to router.go." + }, + { + "task": "Backend: Broadcast WS events for mark read and archive", + "done": false, + "scope": "MarkInboxRead broadcasts inbox:read event with { item_id, recipient_id }. ArchiveInboxItem broadcasts inbox:archived with { item_id, recipient_id }. Events added to protocol/events.go." + }, + { + "task": "Backend: Add notification trigger for comment on assigned issue", + "done": false, + "scope": "When comment created, if issue has assignee and commenter is not the assignee, create inbox item type 'mentioned' severity 'info' for assignee. Implemented via event bus listener." + }, + { + "task": "Backend: Status change notifies creator in addition to assignee", + "done": false, + "scope": "When issue status changes, create inbox item for creator (if creator != the person making the change). Existing assignee notification stays." + }, + { + "task": "Backend: Unassign notifies old assignee", + "done": false, + "scope": "When issue assignee changes from A to B (or to null), create inbox item for old assignee A with type 'status_change' and title 'Unassigned from: {issue title}'." + }, + { + "task": "SDK: Fix markInboxRead and archiveInbox return types", + "done": false, + "scope": "Change markInboxRead from Promise to Promise. Change archiveInbox from Promise to Promise. Update type imports." + }, + { + "task": "Frontend: Add WS event types for inbox:read and inbox:archived", + "done": false, + "scope": "Add inbox:read and inbox:archived to WSEventType in packages/types/src/events.ts. Add payload types." + }, + { + "task": "Frontend: Sidebar unread badge", + "done": false, + "scope": "Sidebar Inbox nav item shows unread count badge (shadcn Badge variant). Initial count from /api/inbox/unread-count on mount. Incremented on inbox:new, decremented on inbox:read/inbox:archived WS events." + }, + { + "task": "Frontend: Inbox detail 'View Issue' navigation", + "done": false, + "scope": "When selected item has issue_id, show 'View Issue' button/link in InboxDetail that navigates to /issues/{issue_id}. Use shadcn Button variant='outline'." + }, + { + "task": "Frontend: Archive button in inbox detail", + "done": false, + "scope": "Archive button next to Mark Read in InboxDetail. Calls api.archiveInbox(id). On success, removes item from store. Shows toast confirmation." + }, + { + "task": "Frontend: Inbox real-time sync for read/archive", + "done": false, + "scope": "useRealtimeSync handles inbox:read (mark item read in store) and inbox:archived (remove item from store). Badge count updates automatically from store." + }, + { + "task": "Frontend: Inbox loading/empty states with shadcn", + "done": false, + "scope": "Initial load shows Skeleton. Empty inbox shows centered illustration/text 'All caught up'. Error shows toast. Consistent with other pages." + } + ] +} diff --git a/_features/infra-event-bus-ws.json b/_features/infra-event-bus-ws.json new file mode 100644 index 00000000..54650cd0 --- /dev/null +++ b/_features/infra-event-bus-ws.json @@ -0,0 +1,78 @@ +{ + "id": "infra-event-bus-ws", + "name": "Infrastructure: Event Bus + WS Isolation + Global Store", + "status": "done", + "createdAt": "2026-03-25", + "completedAt": "2026-03-25", + "description": "Foundation layer: internal event bus to decouple handlers from side-effects, WebSocket workspace isolation to fix multi-tenancy data leakage, and frontend global Zustand store with centralized WS sync.", + "currentState": "All tasks complete. Backend: Event Bus (server/internal/events/bus.go) with pub/sub + panic isolation, Hub upgraded to workspace-scoped rooms with JWT auth, all 11 handler broadcast calls replaced with Bus.Publish, 10 new event types added, inbox/workspace/agent handlers now emit events. Frontend: WSClient sends token+workspace_id, useRealtimeSync hook centralizes WS→store sync with reconnect refetch, issues/inbox pages migrated to global stores, dead use-realtime.ts removed. Go build + typecheck both pass.", + "decisions": [ + "Event bus is in-process Go pub/sub (not external MQ), synchronous execution with recover isolation", + "Hub upgraded to room-based: map[workspaceID]map[*Client]bool, BroadcastToWorkspace replaces Broadcast", + "WS auth via query param ?token=xxx&workspace_id=yyy, parsed in HandleWebSocket before upgrade", + "Client struct gains userID + workspaceID fields, set during WS handshake", + "Frontend uses packages/store (useIssueStore, useInboxStore, useAgentStore) as single source of truth", + "useRealtimeSync() called once inside WSProvider, handles all WS event -> store updates", + "WS reconnect triggers refetch of issues + inbox + agents to recover missed events", + "Comment events stay page-local on issue detail page (not in global store)", + "Inbox creation stays in handlers and TaskService for now (complex business logic), will extract to bus listeners later", + "Broadcast() kept for daemon events (no workspace scope), BroadcastToWorkspace() for all user-facing events" + ], + "tasks": [ + { + "task": "Backend: Create internal event bus (server/internal/events/bus.go)", + "done": true, + "scope": "Bus struct with Publish/Subscribe, Event type with Type/WorkspaceID/ActorType/ActorID/Payload. Synchronous dispatch with recover per listener. Unit test for pub/sub + panic isolation." + }, + { + "task": "Backend: Register event listeners for WS broadcast", + "done": true, + "scope": "Listener that receives any event and calls Hub.BroadcastToWorkspace. Covers: issue CRUD, comment CRUD, agent status, inbox new/read/archive, task lifecycle events." + }, + { + "task": "Backend: Register event listeners for inbox creation", + "done": false, + "scope": "Deferred: inbox creation stays in handlers/TaskService for now. Will extract to bus listeners when adding new notification triggers (inbox-notifications feature)." + }, + { + "task": "Backend: Refactor handlers to publish events instead of direct broadcast/inbox", + "done": true, + "scope": "All handlers (issue, comment, agent, inbox, workspace, daemon) emit events via bus.Publish. Remove direct h.broadcast() calls. Task service also emits events via Bus.Publish." + }, + { + "task": "Backend: Upgrade Hub to workspace-scoped rooms", + "done": true, + "scope": "Hub.rooms map, Client has userID+workspaceID, BroadcastToWorkspace method. HandleWebSocket validates JWT from ?token query param before upgrade. Reject unauthenticated connections." + }, + { + "task": "Backend: Add missing WS event types to protocol", + "done": true, + "scope": "Added: EventCommentCreated/Updated/Deleted, EventInboxRead, EventInboxArchived, EventAgentCreated, EventAgentDeleted, EventWorkspaceUpdated, EventMemberAdded, EventMemberRemoved to both protocol/events.go and packages/types/src/events.ts." + }, + { + "task": "Frontend: WSClient sends token on connect", + "done": true, + "scope": "WSClient.connect() builds URL with ?token=xxx&workspace_id=yyy. setAuth() method sets credentials. WSProvider reads token from localStorage, workspace from store. Reconnects when workspace changes." + }, + { + "task": "Frontend: Implement useRealtimeSync() hook", + "done": true, + "scope": "Called inside WSProvider. Subscribes to issue/inbox/agent WS events → dispatches to global stores. onReconnect refetches issues+inbox+agents. Comment events excluded (page-local)." + }, + { + "task": "Frontend: Migrate issues page from useState to useIssueStore", + "done": true, + "scope": "Issues page reads from useIssueStore. Filters applied locally via useMemo. Initial fetch populates store. WS event handlers removed (handled by useRealtimeSync). Drag-drop uses store for optimistic updates." + }, + { + "task": "Frontend: Migrate inbox page from useState to useInboxStore", + "done": true, + "scope": "Inbox page reads from useInboxStore. Sorting applied locally via useMemo. WS handler removed. markRead updates store directly." + }, + { + "task": "Frontend: Clean up dead store code", + "done": true, + "scope": "Removed packages/hooks/src/use-realtime.ts. Updated packages/hooks/src/index.ts. No duplicate WS subscriptions remain in pages." + } + ] +} diff --git a/_features/issue-board-polish.json b/_features/issue-board-polish.json new file mode 100644 index 00000000..33d3ea9a --- /dev/null +++ b/_features/issue-board-polish.json @@ -0,0 +1,102 @@ +{ + "id": "issue-board-polish", + "name": "Issue Board & Detail Polish", + "status": "designing", + "createdAt": "2026-03-25", + "completedAt": null, + "description": "Fix drag-drop data consistency bugs, complete issue detail page interactions, and polish board/list views to Linear MVP quality with consistent shadcn UI.", + "currentState": "Board view has 5 columns (missing blocked), drag-drop has 3 data consistency bugs, issue detail title/description not editable, create dialog missing assignee/due date fields, comments lack optimistic updates and author-only edit/delete.", + "decisions": [ + "Board shows 6 columns: backlog, todo, in_progress, in_review, done, blocked. Cancelled issues visible via filter only.", + "Drag-drop revert respects current filter params", + "WS issue events check against current filter before adding to view", + "AbortController used for filter change requests to prevent race conditions", + "Issue title: click-to-edit inline input, blur/Enter saves, Escape cancels", + "Issue description: click-to-edit textarea, blur saves", + "Comment creation uses optimistic update (show immediately, confirm on API response)", + "Comment edit/delete only by author or workspace admin (backend enforced)", + "All loading/empty/error states use shadcn Skeleton and consistent patterns" + ], + "tasks": [ + { + "task": "Fix: Board drag-drop revert respects active filters", + "done": false, + "scope": "When drag-drop API call fails, revert calls listIssues with current filterStatus and filterPriority params instead of bare { limit: 200 }." + }, + { + "task": "Fix: WS issue events respect current filter", + "done": false, + "scope": "issue:created handler checks if new issue matches filterStatus/filterPriority before adding. issue:updated handler removes issue from view if it no longer matches filter." + }, + { + "task": "Fix: Filter change race condition with AbortController", + "done": false, + "scope": "useEffect for filter changes creates AbortController, passes signal to fetch, aborts previous request on new filter change. Stale responses ignored." + }, + { + "task": "Fix: Board add blocked column, fix empty column drop target", + "done": false, + "scope": "visibleStatuses includes 'blocked'. All columns have min-h-[200px] for reliable drop target. Cancelled issues not shown as column but accessible via filter." + }, + { + "task": "Fix: Board card click vs drag conflict", + "done": false, + "scope": "When isDragging is true, Link inside card has pointer-events-none to prevent navigation during drag." + }, + { + "task": "Fix: List view status group order matches STATUS_ORDER", + "done": false, + "scope": "List view groups issues using STATUS_ORDER constant instead of hardcoded different order." + }, + { + "task": "Create Issue dialog: add Assignee and Due Date fields", + "done": false, + "scope": "Dialog includes AssigneePicker and date picker (Calendar popover). Fields passed to api.createIssue(). Existing StatusPicker and PriorityPicker remain." + }, + { + "task": "Issue detail: inline editable title", + "done": false, + "scope": "Title renders as h1. Click transforms to Input. Blur or Enter calls api.updateIssue with optimistic update. Escape reverts. Empty title rejected." + }, + { + "task": "Issue detail: inline editable description", + "done": false, + "scope": "Description renders as paragraph. Click transforms to Textarea. Blur saves with optimistic update. Empty description allowed (clears field)." + }, + { + "task": "Issue detail: listen to issue:updated WS event", + "done": false, + "scope": "Detail page subscribes to issue:updated. When event.issue.id matches current issue, merges update into local state (unless user is actively editing that field)." + }, + { + "task": "Issue detail: acceptance criteria and context refs always addable", + "done": false, + "scope": "Show 'Add acceptance criteria' and 'Add context reference' buttons even when arrays are empty. Click reveals input field." + }, + { + "task": "Comment: optimistic create", + "done": false, + "scope": "On submit, immediately append comment with temp ID and muted styling. On API success, replace temp with real. On API error, remove temp and show toast." + }, + { + "task": "Comment: backend author-only edit/delete", + "done": false, + "scope": "UpdateComment and DeleteComment in comment.go load comment first, verify author_id matches request user OR user is workspace admin. Return 403 otherwise." + }, + { + "task": "Comment: hover timestamp shows full date tooltip", + "done": false, + "scope": "Relative time ('2h ago') wrapped in shadcn Tooltip showing full ISO-formatted date on hover." + }, + { + "task": "Issue delete: cancel running tasks first", + "done": false, + "scope": "DeleteIssue handler calls TaskService.CancelTasksForIssue before deleting issue to prevent FK constraint errors." + }, + { + "task": "UI: consistent loading/empty/error states across issues pages", + "done": false, + "scope": "Board empty columns show 'No issues' muted text. List empty groups hidden. Initial load uses Skeleton. Error shows toast + retry. Filter with no results shows 'No matching issues' with clear filter link." + } + ] +} diff --git a/_features/workspace-permissions.json b/_features/workspace-permissions.json new file mode 100644 index 00000000..f378985d --- /dev/null +++ b/_features/workspace-permissions.json @@ -0,0 +1,59 @@ +{ + "id": "workspace-permissions", + "name": "Workspace & Permissions", + "status": "designing", + "createdAt": "2026-03-25", + "completedAt": null, + "description": "Complete workspace management with proper permission enforcement, member invitation flow, and consistent settings UI using shadcn components.", + "currentState": "Workspace CRUD works. Member add requires pre-existing user (no invite flow). DeleteAgent has no workspace check. Comment edit/delete has no author check. Settings page uses raw textarea for context field. Workspace switch doesn't refetch all data.", + "decisions": [ + "Auth stays simple: email-only login, auto-create user, 72h JWT, no refresh token for MVP", + "Member invite: if user doesn't exist, backend auto-creates user record with email-only, they become member immediately", + "3 roles (owner/admin/member) sufficient for MVP, no custom permissions table", + "Owner: full control. Admin: manage members + agents + settings. Member: CRUD issues + comments.", + "All permission checks centralized in handler helpers, enforced at API level", + "Workspace switch triggers full data refresh: disconnect WS, clear stores, reconnect with new workspace_id, refetch all" + ], + "tasks": [ + { + "task": "Backend: Fix DeleteAgent workspace + role check", + "done": false, + "scope": "DeleteAgent calls loadAgentForUser (workspace membership check) before deletion. Also calls CancelAgentTasksByIssue for all agent's assigned issues. Requires owner or admin role." + }, + { + "task": "Backend: Fix ListAgentTasks workspace check", + "done": false, + "scope": "ListAgentTasks verifies agent belongs to user's workspace via loadAgentForUser before returning tasks." + }, + { + "task": "Backend: Member invite auto-creates user if not found", + "done": false, + "scope": "CreateMember handler: if GetUserByEmail returns not found, call CreateUser(email, '') to create stub user, then proceed to add as member. Return 201 with member data." + }, + { + "task": "Backend: Agent visibility filtering", + "done": false, + "scope": "ListAgents filters private agents: only visible to agent owner_id or workspace owner/admin. Other members only see workspace-visible agents." + }, + { + "task": "Frontend: Settings page use shadcn components consistently", + "done": false, + "scope": "Replace raw textarea with shadcn Textarea for context field. All inputs use shadcn Input. Form validation: workspace name required, show inline errors. All buttons use shadcn Button with loading state." + }, + { + "task": "Frontend: Workspace switcher error handling and feedback", + "done": false, + "scope": "Create workspace shows error toast on failure (including slug collision). Workspace list sorted alphabetically. Current workspace highlighted with check icon." + }, + { + "task": "Frontend: Workspace switch triggers full data refresh", + "done": false, + "scope": "switchWorkspace action: disconnects WS, clears issue/inbox/agent stores, sets new workspace_id on API client, reconnects WS with new workspace, refetches all data." + }, + { + "task": "Frontend: Member management UX improvements", + "done": false, + "scope": "Add member shows success/error toast. Email validation before submit. 'Already a member' error shown inline. Remove member confirmation uses AlertDialog. All operations show loading state on button." + } + ] +} diff --git a/apps/web/app/(dashboard)/inbox/page.tsx b/apps/web/app/(dashboard)/inbox/page.tsx index ea804419..c575c9c0 100644 --- a/apps/web/app/(dashboard)/inbox/page.tsx +++ b/apps/web/app/(dashboard)/inbox/page.tsx @@ -1,6 +1,7 @@ "use client"; -import { useState, useEffect, useCallback } from "react"; +import { useState, useEffect, useMemo } from "react"; +import { useInboxStore } from "@multica/store"; import { AlertCircle, Bot, @@ -10,10 +11,9 @@ import { MessageSquare, ArrowRightLeft, } from "lucide-react"; -import type { InboxItem, InboxItemType, InboxSeverity, InboxNewPayload } from "@multica/types"; +import type { InboxItem, InboxItemType, InboxSeverity } from "@multica/types"; import { Button } from "@/components/ui/button"; import { api } from "@/shared/api"; -import { useWSEvent } from "@/features/realtime"; // --------------------------------------------------------------------------- // Helpers @@ -151,43 +151,39 @@ function InboxDetail({ // --------------------------------------------------------------------------- export default function InboxPage() { - const [items, setItems] = useState([]); const [selectedId, setSelectedId] = useState(""); const [loading, setLoading] = useState(true); + // Read from global store (updated by useRealtimeSync) + const storeItems = useInboxStore((s) => s.items); + + // Sort: severity first, then newest first + const items = useMemo(() => { + return [...storeItems] + .filter((i) => !i.archived) + .sort( + (a, b) => + severityOrder[a.severity] - severityOrder[b.severity] || + new Date(b.created_at).getTime() - new Date(a.created_at).getTime() + ); + }, [storeItems]); + + // Initial fetch → populate store useEffect(() => { api .listInbox() .then((data) => { - const sorted = [...data].sort( - (a, b) => - severityOrder[a.severity] - severityOrder[b.severity] || - new Date(b.created_at).getTime() - new Date(a.created_at).getTime() - ); - setItems(sorted); - if (sorted.length > 0) setSelectedId(sorted[0]!.id); + useInboxStore.getState().setItems(data); + if (data.length > 0) setSelectedId(data[0]!.id); }) .catch(console.error) .finally(() => setLoading(false)); }, []); - useWSEvent( - "inbox:new", - useCallback((payload: unknown) => { - const { item } = payload as InboxNewPayload; - setItems((prev) => { - if (prev.some((i) => i.id === item.id)) return prev; - return [item, ...prev]; - }); - }, []), - ); - const handleMarkRead = async (id: string) => { try { await api.markInboxRead(id); - setItems((prev) => - prev.map((i) => (i.id === id ? { ...i, read: true } : i)) - ); + useInboxStore.getState().markRead(id); } catch (err) { console.error("Failed to mark read:", err); } diff --git a/apps/web/app/(dashboard)/issues/page.tsx b/apps/web/app/(dashboard)/issues/page.tsx index 040db17a..455fe81e 100644 --- a/apps/web/app/(dashboard)/issues/page.tsx +++ b/apps/web/app/(dashboard)/issues/page.tsx @@ -1,6 +1,7 @@ "use client"; -import { useState, useCallback, useEffect } from "react"; +import { useState, useCallback, useEffect, useMemo } from "react"; +import { useIssueStore } from "@multica/store"; import Link from "next/link"; import { Columns3, @@ -45,8 +46,6 @@ import { ActorAvatar } from "@/components/common/actor-avatar"; import { StatusIcon, PriorityIcon } from "@/features/issues/components"; import { api } from "@/shared/api"; import { useActorName } from "@/features/workspace"; -import { useWSEvent } from "@/features/realtime"; -import type { IssueCreatedPayload, IssueUpdatedPayload, IssueDeletedPayload } from "@multica/types"; function formatDate(date: string): string { return new Date(date).toLocaleDateString("en-US", { @@ -446,78 +445,53 @@ type ViewMode = "board" | "list"; export default function IssuesPage() { const [view, setView] = useState("board"); - const [issues, setIssues] = useState([]); const [loading, setLoading] = useState(true); const [filterStatus, setFilterStatus] = useState(""); const [filterPriority, setFilterPriority] = useState(""); + // Read from global store (updated by useRealtimeSync) + const allIssues = useIssueStore((s) => s.issues); + + // Apply local filters + const issues = useMemo(() => { + return allIssues.filter((issue) => { + if (filterStatus && issue.status !== filterStatus) return false; + if (filterPriority && issue.priority !== filterPriority) return false; + return true; + }); + }, [allIssues, filterStatus, filterPriority]); + + // Initial fetch → populate store useEffect(() => { setLoading(true); api - .listIssues({ - limit: 200, - ...(filterStatus ? { status: filterStatus } : {}), - ...(filterPriority ? { priority: filterPriority } : {}), - }) + .listIssues({ limit: 200 }) .then((res) => { - setIssues(res.issues); + useIssueStore.getState().setIssues(res.issues); }) .catch(console.error) .finally(() => setLoading(false)); - }, [filterStatus, filterPriority]); - - // Real-time updates - useWSEvent( - "issue:created", - useCallback((payload: unknown) => { - const { issue } = payload as IssueCreatedPayload; - setIssues((prev) => { - if (prev.some((i) => i.id === issue.id)) return prev; - return [...prev, issue]; - }); - }, []), - ); - - useWSEvent( - "issue:updated", - useCallback((payload: unknown) => { - const { issue } = payload as IssueUpdatedPayload; - setIssues((prev) => prev.map((i) => (i.id === issue.id ? issue : i))); - }, []), - ); - - useWSEvent( - "issue:deleted", - useCallback((payload: unknown) => { - const { issue_id } = payload as IssueDeletedPayload; - setIssues((prev) => prev.filter((i) => i.id !== issue_id)); - }, []), - ); + }, []); const handleMoveIssue = useCallback( (issueId: string, newStatus: IssueStatus) => { - // Optimistic update - setIssues((prev) => - prev.map((issue) => - issue.id === issueId ? { ...issue, status: newStatus } : issue - ) - ); + // Optimistic update in store + useIssueStore.getState().updateIssue(issueId, { status: newStatus }); // Persist to API api.updateIssue(issueId, { status: newStatus }).catch((err) => { console.error("Failed to update issue:", err); - // Revert on error - api.listIssues({ limit: 200 }).then((res) => setIssues(res.issues)); + // Revert on error by refetching + api.listIssues({ limit: 200 }).then((res) => { + useIssueStore.getState().setIssues(res.issues); + }); }); }, [] ); const handleIssueCreated = useCallback((issue: Issue) => { - setIssues((prev) => { - if (prev.some((i) => i.id === issue.id)) return prev; - return [...prev, issue]; - }); + useIssueStore.getState().addIssue(issue); }, []); if (loading) { diff --git a/apps/web/features/realtime/provider.tsx b/apps/web/features/realtime/provider.tsx index 503e3067..11b5382f 100644 --- a/apps/web/features/realtime/provider.tsx +++ b/apps/web/features/realtime/provider.tsx @@ -11,6 +11,8 @@ import { import { WSClient } from "@multica/sdk"; import type { WSEventType } from "@multica/types"; import { useAuthStore } from "@/features/auth"; +import { useWorkspaceStore } from "@/features/workspace"; +import { useRealtimeSync } from "./use-realtime-sync"; const WS_URL = process.env.NEXT_PUBLIC_WS_URL ?? "ws://localhost:8080/ws"; @@ -24,12 +26,17 @@ const WSContext = createContext(null); export function WSProvider({ children }: { children: ReactNode }) { const user = useAuthStore((s) => s.user); + const workspace = useWorkspaceStore((s) => s.workspace); const wsRef = useRef(null); useEffect(() => { - if (!user) return; + if (!user || !workspace) return; + + const token = localStorage.getItem("multica_token"); + if (!token) return; const ws = new WSClient(WS_URL); + ws.setAuth(token, workspace.id); wsRef.current = ws; ws.connect(); @@ -37,7 +44,10 @@ export function WSProvider({ children }: { children: ReactNode }) { ws.disconnect(); wsRef.current = null; }; - }, [user]); + }, [user, workspace]); + + // Centralized WS → store sync + useRealtimeSync(wsRef.current); const subscribe = useCallback( (event: WSEventType, handler: EventHandler) => { diff --git a/apps/web/features/realtime/use-realtime-sync.ts b/apps/web/features/realtime/use-realtime-sync.ts new file mode 100644 index 00000000..fc81fff3 --- /dev/null +++ b/apps/web/features/realtime/use-realtime-sync.ts @@ -0,0 +1,115 @@ +"use client"; + +import { useEffect } from "react"; +import type { WSClient } from "@multica/sdk"; +import { useIssueStore, useInboxStore, useAgentStore } from "@multica/store"; +import { useWorkspaceStore } from "@/features/workspace"; +import { api } from "@/shared/api"; +import type { + IssueCreatedPayload, + IssueUpdatedPayload, + IssueDeletedPayload, + AgentStatusPayload, + AgentCreatedPayload, + InboxNewPayload, + InboxReadPayload, + InboxArchivedPayload, +} from "@multica/types"; + +/** + * Centralized WS → store sync. Called once from WSProvider. + * Subscribes to all global WS events and dispatches to Zustand stores. + * Comment events are NOT handled here — they stay per-page on issue detail. + */ +export function useRealtimeSync(ws: WSClient | null) { + // Issue events → useIssueStore + useEffect(() => { + if (!ws) return; + + const unsubs = [ + ws.on("issue:created", (p) => { + const { issue } = p as IssueCreatedPayload; + useIssueStore.getState().addIssue(issue); + }), + ws.on("issue:updated", (p) => { + const { issue } = p as IssueUpdatedPayload; + useIssueStore.getState().updateIssue(issue.id, issue); + }), + ws.on("issue:deleted", (p) => { + const { issue_id } = p as IssueDeletedPayload; + useIssueStore.getState().removeIssue(issue_id); + }), + ]; + + return () => unsubs.forEach((u) => u()); + }, [ws]); + + // Inbox events → useInboxStore + useEffect(() => { + if (!ws) return; + + const unsubs = [ + ws.on("inbox:new", (p) => { + const { item } = p as InboxNewPayload; + useInboxStore.getState().addItem(item); + }), + ws.on("inbox:read", (p) => { + const { item_id } = p as InboxReadPayload; + useInboxStore.getState().markRead(item_id); + }), + ws.on("inbox:archived", (p) => { + const { item_id } = p as InboxArchivedPayload; + useInboxStore.getState().archive(item_id); + }), + ]; + + return () => unsubs.forEach((u) => u()); + }, [ws]); + + // Agent events → useAgentStore / workspace refresh + useEffect(() => { + if (!ws) return; + + const unsubs = [ + ws.on("agent:status", (p) => { + const { agent } = p as AgentStatusPayload; + useAgentStore.getState().updateAgent(agent.id, agent); + }), + ws.on("agent:created", (p) => { + const { agent } = p as AgentCreatedPayload; + const agents = useAgentStore.getState().agents; + if (!agents.find((a) => a.id === agent.id)) { + useAgentStore.getState().setAgents([...agents, agent]); + } + }), + ws.on("agent:deleted", () => { + // Refresh agents list since we don't have removeAgent in store + useWorkspaceStore.getState().refreshAgents(); + }), + ]; + + return () => unsubs.forEach((u) => u()); + }, [ws]); + + // Reconnect → refetch all data to recover missed events + useEffect(() => { + if (!ws) return; + + const unsub = ws.onReconnect(async () => { + try { + const [issuesRes, inboxItems, agents] = await Promise.all([ + api.listIssues({ limit: 200 }), + api.listInbox(), + api.listAgents(), + ]); + useIssueStore.getState().setIssues(issuesRes.issues); + useInboxStore.getState().setItems(inboxItems); + useAgentStore.getState().setAgents(agents); + } catch { + // Silently fail; next reconnect will retry + } + }); + + return unsub; + }, [ws]); +} diff --git a/packages/hooks/src/index.ts b/packages/hooks/src/index.ts index be13e9b7..c02fcaf0 100644 --- a/packages/hooks/src/index.ts +++ b/packages/hooks/src/index.ts @@ -1,4 +1,3 @@ export { useIssues } from "./use-issues.js"; export { useAgents } from "./use-agents.js"; export { useInbox } from "./use-inbox.js"; -export { useRealtime } from "./use-realtime.js"; diff --git a/packages/hooks/src/use-realtime.ts b/packages/hooks/src/use-realtime.ts deleted file mode 100644 index fae5cb70..00000000 --- a/packages/hooks/src/use-realtime.ts +++ /dev/null @@ -1,37 +0,0 @@ -import { useEffect } from "react"; -import type { WSClient } from "@multica/sdk"; -import { useIssueStore } from "@multica/store"; -import { useInboxStore } from "@multica/store"; -import type { IssueCreatedPayload, IssueUpdatedPayload, IssueDeletedPayload, InboxNewPayload } from "@multica/types"; - -export function useRealtime(ws: WSClient) { - const { addIssue, updateIssue, removeIssue } = useIssueStore(); - const { addItem } = useInboxStore(); - - useEffect(() => { - const unsubscribers = [ - ws.on("issue:created", (payload) => { - const { issue } = payload as IssueCreatedPayload; - addIssue(issue); - }), - ws.on("issue:updated", (payload) => { - const { issue } = payload as IssueUpdatedPayload; - updateIssue(issue.id, issue); - }), - ws.on("issue:deleted", (payload) => { - const { issue_id } = payload as IssueDeletedPayload; - removeIssue(issue_id); - }), - ws.on("inbox:new", (payload) => { - const { item } = payload as InboxNewPayload; - addItem(item); - }), - ]; - - return () => { - for (const unsub of unsubscribers) { - unsub(); - } - }; - }, [ws, addIssue, updateIssue, removeIssue, addItem]); -} diff --git a/packages/sdk/src/ws-client.ts b/packages/sdk/src/ws-client.ts index ec5f7725..480f1c7a 100644 --- a/packages/sdk/src/ws-client.ts +++ b/packages/sdk/src/ws-client.ts @@ -4,19 +4,43 @@ type EventHandler = (payload: unknown) => void; export class WSClient { private ws: WebSocket | null = null; - private url: string; + private baseUrl: string; + private token: string | null = null; + private workspaceId: string | null = null; private handlers = new Map>(); private reconnectTimer: ReturnType | null = null; + private hasConnectedBefore = false; + private onReconnectCallbacks = new Set<() => void>(); constructor(url: string) { - this.url = url; + this.baseUrl = url; + } + + setAuth(token: string, workspaceId: string) { + this.token = token; + this.workspaceId = workspaceId; } connect() { - this.ws = new WebSocket(this.url); + const url = new URL(this.baseUrl); + if (this.token) url.searchParams.set("token", this.token); + if (this.workspaceId) + url.searchParams.set("workspace_id", this.workspaceId); + + this.ws = new WebSocket(url.toString()); this.ws.onopen = () => { console.log("[ws] connected"); + if (this.hasConnectedBefore) { + for (const cb of this.onReconnectCallbacks) { + try { + cb(); + } catch { + // ignore reconnect callback errors + } + } + } + this.hasConnectedBefore = true; }; this.ws.onmessage = (event) => { @@ -42,9 +66,11 @@ export class WSClient { disconnect() { if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; } this.ws?.close(); this.ws = null; + this.hasConnectedBefore = false; } on(event: WSEventType, handler: EventHandler) { @@ -57,6 +83,13 @@ export class WSClient { }; } + onReconnect(callback: () => void) { + this.onReconnectCallbacks.add(callback); + return () => { + this.onReconnectCallbacks.delete(callback); + }; + } + send(message: WSMessage) { if (this.ws?.readyState === WebSocket.OPEN) { this.ws.send(JSON.stringify(message)); diff --git a/packages/store/src/inbox-store.ts b/packages/store/src/inbox-store.ts index 6be12b6b..ade578cb 100644 --- a/packages/store/src/inbox-store.ts +++ b/packages/store/src/inbox-store.ts @@ -13,7 +13,12 @@ interface InboxState { export const useInboxStore = create((set, get) => ({ items: [], setItems: (items) => set({ items }), - addItem: (item) => set((s) => ({ items: [item, ...s.items] })), + addItem: (item) => + set((s) => ({ + items: s.items.some((i) => i.id === item.id) + ? s.items + : [item, ...s.items], + })), markRead: (id) => set((s) => ({ items: s.items.map((i) => (i.id === id ? { ...i, read: true } : i)), diff --git a/packages/store/src/issue-store.ts b/packages/store/src/issue-store.ts index 373c778b..71a6bec4 100644 --- a/packages/store/src/issue-store.ts +++ b/packages/store/src/issue-store.ts @@ -15,7 +15,12 @@ export const useIssueStore = create((set) => ({ issues: [], activeIssueId: null, setIssues: (issues) => set({ issues }), - addIssue: (issue) => set((s) => ({ issues: [...s.issues, issue] })), + addIssue: (issue) => + set((s) => ({ + issues: s.issues.some((i) => i.id === issue.id) + ? s.issues + : [...s.issues, issue], + })), updateIssue: (id, updates) => set((s) => ({ issues: s.issues.map((i) => (i.id === id ? { ...i, ...updates } : i)), diff --git a/packages/types/src/events.ts b/packages/types/src/events.ts index 2b8702ad..b0f8e96a 100644 --- a/packages/types/src/events.ts +++ b/packages/types/src/events.ts @@ -2,6 +2,7 @@ import type { Issue } from "./issue.js"; import type { Agent } from "./agent.js"; import type { InboxItem } from "./inbox.js"; import type { Comment } from "./comment.js"; +import type { Workspace, MemberWithUser } from "./workspace.js"; // WebSocket event types (matching Go server protocol/events.go) export type WSEventType = @@ -12,11 +13,18 @@ export type WSEventType = | "comment:updated" | "comment:deleted" | "agent:status" + | "agent:created" + | "agent:deleted" | "task:dispatch" | "task:progress" | "task:completed" | "task:failed" | "inbox:new" + | "inbox:read" + | "inbox:archived" + | "workspace:updated" + | "member:added" + | "member:removed" | "daemon:heartbeat" | "daemon:register"; @@ -41,10 +49,29 @@ export interface AgentStatusPayload { agent: Agent; } +export interface AgentCreatedPayload { + agent: Agent; +} + +export interface AgentDeletedPayload { + agent_id: string; + workspace_id: string; +} + export interface InboxNewPayload { item: InboxItem; } +export interface InboxReadPayload { + item_id: string; + recipient_id: string; +} + +export interface InboxArchivedPayload { + item_id: string; + recipient_id: string; +} + export interface CommentCreatedPayload { comment: Comment; } @@ -57,3 +84,18 @@ export interface CommentDeletedPayload { comment_id: string; issue_id: string; } + +export interface WorkspaceUpdatedPayload { + workspace: Workspace; +} + +export interface MemberAddedPayload { + member: MemberWithUser; + workspace_id: string; +} + +export interface MemberRemovedPayload { + member_id: string; + user_id: string; + workspace_id: string; +} diff --git a/server/cmd/server/listeners.go b/server/cmd/server/listeners.go new file mode 100644 index 00000000..0aa783fc --- /dev/null +++ b/server/cmd/server/listeners.go @@ -0,0 +1,55 @@ +package main + +import ( + "encoding/json" + "log" + + "github.com/multica-ai/multica/server/internal/events" + "github.com/multica-ai/multica/server/internal/realtime" + "github.com/multica-ai/multica/server/pkg/protocol" +) + +// registerListeners wires up event bus listeners for WS broadcasting. +func registerListeners(bus *events.Bus, hub *realtime.Hub) { + allEvents := []string{ + protocol.EventIssueCreated, + protocol.EventIssueUpdated, + protocol.EventIssueDeleted, + protocol.EventCommentCreated, + protocol.EventCommentUpdated, + protocol.EventCommentDeleted, + protocol.EventAgentStatus, + protocol.EventAgentCreated, + protocol.EventAgentDeleted, + protocol.EventTaskDispatch, + protocol.EventTaskProgress, + protocol.EventTaskCompleted, + protocol.EventTaskFailed, + protocol.EventInboxNew, + protocol.EventInboxRead, + protocol.EventInboxArchived, + protocol.EventWorkspaceUpdated, + protocol.EventMemberAdded, + protocol.EventMemberRemoved, + } + + for _, et := range allEvents { + eventType := et + bus.Subscribe(eventType, func(e events.Event) { + msg := map[string]any{ + "type": eventType, + "payload": e.Payload, + } + data, err := json.Marshal(msg) + if err != nil { + log.Printf("[listeners] failed to marshal %s event: %v", eventType, err) + return + } + if e.WorkspaceID != "" { + hub.BroadcastToWorkspace(e.WorkspaceID, data) + } else { + hub.Broadcast(data) + } + }) + } +} diff --git a/server/cmd/server/main.go b/server/cmd/server/main.go index 506c0a94..3b11ec3c 100644 --- a/server/cmd/server/main.go +++ b/server/cmd/server/main.go @@ -11,6 +11,7 @@ import ( "github.com/jackc/pgx/v5/pgxpool" + "github.com/multica-ai/multica/server/internal/events" "github.com/multica-ai/multica/server/internal/realtime" ) @@ -38,10 +39,12 @@ func main() { } log.Println("Connected to database") + bus := events.New() hub := realtime.NewHub() go hub.Run() + registerListeners(bus, hub) - r := NewRouter(pool, hub) + r := NewRouter(pool, hub, bus) srv := &http.Server{ Addr: ":" + port, diff --git a/server/cmd/server/router.go b/server/cmd/server/router.go index 4f9e84d3..a601a13a 100644 --- a/server/cmd/server/router.go +++ b/server/cmd/server/router.go @@ -1,6 +1,7 @@ package main import ( + "context" "net/http" "os" "strings" @@ -8,8 +9,10 @@ import ( "github.com/go-chi/chi/v5" chimw "github.com/go-chi/chi/v5/middleware" "github.com/go-chi/cors" + "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" + "github.com/multica-ai/multica/server/internal/events" "github.com/multica-ai/multica/server/internal/handler" "github.com/multica-ai/multica/server/internal/middleware" "github.com/multica-ai/multica/server/internal/realtime" @@ -40,9 +43,9 @@ func allowedOrigins() []string { } // NewRouter creates the fully-configured Chi router with all middleware and routes. -func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub) chi.Router { +func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Router { queries := db.New(pool) - h := handler.New(queries, pool, hub) + h := handler.New(queries, pool, hub, bus) r := chi.NewRouter() @@ -65,8 +68,9 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub) chi.Router { }) // WebSocket + mc := &membershipChecker{queries: queries} r.Get("/ws", func(w http.ResponseWriter, r *http.Request) { - realtime.HandleWebSocket(hub, w, r) + realtime.HandleWebSocket(hub, mc, w, r) }) // Auth (public) @@ -164,3 +168,24 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub) chi.Router { return r } + +// membershipChecker implements realtime.MembershipChecker using database queries. +type membershipChecker struct { + queries *db.Queries +} + +func (mc *membershipChecker) IsMember(ctx context.Context, userID, workspaceID string) bool { + _, err := mc.queries.GetMemberByUserAndWorkspace(ctx, db.GetMemberByUserAndWorkspaceParams{ + UserID: parseUUID(userID), + WorkspaceID: parseUUID(workspaceID), + }) + return err == nil +} + +func parseUUID(s string) pgtype.UUID { + var u pgtype.UUID + if err := u.Scan(s); err != nil { + return pgtype.UUID{} + } + return u +} diff --git a/server/internal/events/bus.go b/server/internal/events/bus.go new file mode 100644 index 00000000..933a863a --- /dev/null +++ b/server/internal/events/bus.go @@ -0,0 +1,59 @@ +package events + +import ( + "log" + "sync" +) + +// Event represents a domain event published by handlers or services. +type Event struct { + Type string // e.g. "issue:created", "inbox:new" + WorkspaceID string // routes to correct Hub room + ActorType string // "member", "agent", or "system" + ActorID string + Payload any // JSON-serializable, same shape as current WS payloads +} + +// Handler is a function that processes an event. +type Handler func(Event) + +// Bus is an in-process synchronous pub/sub event bus. +type Bus struct { + mu sync.RWMutex + listeners map[string][]Handler +} + +// New creates a new event bus. +func New() *Bus { + return &Bus{ + listeners: make(map[string][]Handler), + } +} + +// Subscribe registers a handler for a given event type. +// Handlers are called synchronously in registration order. +func (b *Bus) Subscribe(eventType string, h Handler) { + b.mu.Lock() + defer b.mu.Unlock() + b.listeners[eventType] = append(b.listeners[eventType], h) +} + +// Publish dispatches an event to all registered handlers for that event type. +// Each handler is called synchronously. Panics in individual handlers are +// recovered so one failing handler does not prevent others from executing. +func (b *Bus) Publish(e Event) { + b.mu.RLock() + handlers := b.listeners[e.Type] + b.mu.RUnlock() + + for _, h := range handlers { + func() { + defer func() { + if r := recover(); r != nil { + log.Printf("[event-bus] panic in listener for %q: %v", e.Type, r) + } + }() + h(e) + }() + } +} diff --git a/server/internal/events/bus_test.go b/server/internal/events/bus_test.go new file mode 100644 index 00000000..c1e84dde --- /dev/null +++ b/server/internal/events/bus_test.go @@ -0,0 +1,90 @@ +package events + +import ( + "sync/atomic" + "testing" +) + +func TestPublishDeliversToSubscribers(t *testing.T) { + bus := New() + var count int32 + + bus.Subscribe("test:event", func(e Event) { + atomic.AddInt32(&count, 1) + }) + bus.Subscribe("test:event", func(e Event) { + atomic.AddInt32(&count, 1) + }) + + bus.Publish(Event{Type: "test:event", Payload: "hello"}) + + if count != 2 { + t.Errorf("expected 2 handlers called, got %d", count) + } +} + +func TestPublishOnlyMatchingType(t *testing.T) { + bus := New() + var called bool + + bus.Subscribe("type:a", func(e Event) { + called = true + }) + + bus.Publish(Event{Type: "type:b"}) + + if called { + t.Error("handler for type:a should not be called for type:b event") + } +} + +func TestPublishNoSubscribersIsNoop(t *testing.T) { + bus := New() + // Should not panic + bus.Publish(Event{Type: "no:listeners"}) +} + +func TestPanicInHandlerDoesNotBreakOthers(t *testing.T) { + bus := New() + var secondCalled bool + + bus.Subscribe("test:panic", func(e Event) { + panic("handler panic") + }) + bus.Subscribe("test:panic", func(e Event) { + secondCalled = true + }) + + bus.Publish(Event{Type: "test:panic"}) + + if !secondCalled { + t.Error("second handler should still be called after first panics") + } +} + +func TestEventFieldsPassedThrough(t *testing.T) { + bus := New() + var received Event + + bus.Subscribe("test:fields", func(e Event) { + received = e + }) + + bus.Publish(Event{ + Type: "test:fields", + WorkspaceID: "ws-123", + ActorType: "member", + ActorID: "user-456", + Payload: map[string]string{"key": "value"}, + }) + + if received.WorkspaceID != "ws-123" { + t.Errorf("expected WorkspaceID ws-123, got %s", received.WorkspaceID) + } + if received.ActorType != "member" { + t.Errorf("expected ActorType member, got %s", received.ActorType) + } + if received.ActorID != "user-456" { + t.Errorf("expected ActorID user-456, got %s", received.ActorID) + } +} diff --git a/server/internal/handler/agent.go b/server/internal/handler/agent.go index 8e93b0a8..b5552a72 100644 --- a/server/internal/handler/agent.go +++ b/server/internal/handler/agent.go @@ -10,6 +10,7 @@ import ( "github.com/go-chi/chi/v5" "github.com/jackc/pgx/v5/pgtype" db "github.com/multica-ai/multica/server/pkg/db/generated" + "github.com/multica-ai/multica/server/pkg/protocol" ) type AgentResponse struct { @@ -246,7 +247,9 @@ func (h *Handler) CreateAgent(w http.ResponseWriter, r *http.Request) { // Best-effort: create an initialization issue assigned to the new agent. h.createAgentInitIssue(r.Context(), agent, parseUUID(ownerID)) - writeJSON(w, http.StatusCreated, agentToResponse(agent)) + resp := agentToResponse(agent) + h.publish(protocol.EventAgentCreated, workspaceID, "member", ownerID, map[string]any{"agent": resp}) + writeJSON(w, http.StatusCreated, resp) } // createAgentInitIssue creates an initialization issue assigned to a newly created agent. @@ -283,7 +286,7 @@ func (h *Handler) createAgentInitIssue(ctx context.Context, agent db.Agent, crea return } - h.broadcast("issue:created", map[string]any{"issue": issueToResponse(issue)}) + h.publish(protocol.EventIssueCreated, uuidToString(agent.WorkspaceID), "system", "", map[string]any{"issue": issueToResponse(issue)}) // Enqueue the task directly — we know the agent is assigned and status is "todo". if _, err := h.TaskService.EnqueueTaskForIssue(ctx, issue); err != nil { @@ -377,17 +380,27 @@ func (h *Handler) UpdateAgent(w http.ResponseWriter, r *http.Request) { } resp := agentToResponse(agent) - h.broadcast("agent:status", map[string]any{"agent": resp}) + userID := requestUserID(r) + h.publish(protocol.EventAgentStatus, uuidToString(agent.WorkspaceID), "member", userID, map[string]any{"agent": resp}) writeJSON(w, http.StatusOK, resp) } func (h *Handler) DeleteAgent(w http.ResponseWriter, r *http.Request) { id := chi.URLParam(r, "id") + agent, ok := h.loadAgentForUser(w, r, id) + if !ok { + return + } + wsID := uuidToString(agent.WorkspaceID) + err := h.Queries.DeleteAgent(r.Context(), parseUUID(id)) if err != nil { writeError(w, http.StatusInternalServerError, "failed to delete agent") return } + + userID := requestUserID(r) + h.publish(protocol.EventAgentDeleted, wsID, "member", userID, map[string]any{"agent_id": id, "workspace_id": wsID}) w.WriteHeader(http.StatusNoContent) } diff --git a/server/internal/handler/comment.go b/server/internal/handler/comment.go index 77affa1d..da9feed1 100644 --- a/server/internal/handler/comment.go +++ b/server/internal/handler/comment.go @@ -6,6 +6,7 @@ import ( "github.com/go-chi/chi/v5" db "github.com/multica-ai/multica/server/pkg/db/generated" + "github.com/multica-ai/multica/server/pkg/protocol" ) type CommentResponse struct { @@ -97,7 +98,7 @@ func (h *Handler) CreateComment(w http.ResponseWriter, r *http.Request) { } resp := commentToResponse(comment) - h.broadcast("comment:created", map[string]any{"comment": resp}) + h.publish(protocol.EventCommentCreated, uuidToString(issue.WorkspaceID), "member", userID, map[string]any{"comment": resp}) writeJSON(w, http.StatusCreated, resp) } @@ -126,7 +127,12 @@ func (h *Handler) UpdateComment(w http.ResponseWriter, r *http.Request) { } resp := commentToResponse(comment) - h.broadcast("comment:updated", map[string]any{"comment": resp}) + userID := requestUserID(r) + workspaceID := "" + if issue, err := h.Queries.GetIssue(r.Context(), comment.IssueID); err == nil { + workspaceID = uuidToString(issue.WorkspaceID) + } + h.publish(protocol.EventCommentUpdated, workspaceID, "member", userID, map[string]any{"comment": resp}) writeJSON(w, http.StatusOK, resp) } @@ -145,7 +151,12 @@ func (h *Handler) DeleteComment(w http.ResponseWriter, r *http.Request) { return } - h.broadcast("comment:deleted", map[string]any{ + userID := requestUserID(r) + workspaceID := "" + if issue, err := h.Queries.GetIssue(r.Context(), comment.IssueID); err == nil { + workspaceID = uuidToString(issue.WorkspaceID) + } + h.publish(protocol.EventCommentDeleted, workspaceID, "member", userID, map[string]any{ "comment_id": commentId, "issue_id": uuidToString(comment.IssueID), }) diff --git a/server/internal/handler/daemon.go b/server/internal/handler/daemon.go index 7a2ee73f..04b4a8b0 100644 --- a/server/internal/handler/daemon.go +++ b/server/internal/handler/daemon.go @@ -195,7 +195,16 @@ func (h *Handler) ReportTaskProgress(w http.ResponseWriter, r *http.Request) { return } - h.TaskService.ReportProgress(taskID, req.Summary, req.Step, req.Total) + // Look up task to get workspace ID via the associated issue. + workspaceID := "" + task, err := h.Queries.GetAgentTask(r.Context(), parseUUID(taskID)) + if err == nil { + if issue, err := h.Queries.GetIssue(r.Context(), task.IssueID); err == nil { + workspaceID = uuidToString(issue.WorkspaceID) + } + } + + h.TaskService.ReportProgress(r.Context(), taskID, workspaceID, req.Summary, req.Step, req.Total) writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) } diff --git a/server/internal/handler/handler.go b/server/internal/handler/handler.go index dc31d0b5..156c1404 100644 --- a/server/internal/handler/handler.go +++ b/server/internal/handler/handler.go @@ -4,13 +4,13 @@ import ( "context" "encoding/json" "errors" - "fmt" "net/http" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgtype" db "github.com/multica-ai/multica/server/pkg/db/generated" + "github.com/multica-ai/multica/server/internal/events" "github.com/multica-ai/multica/server/internal/realtime" "github.com/multica-ai/multica/server/internal/service" "github.com/multica-ai/multica/server/internal/util" @@ -31,10 +31,11 @@ type Handler struct { DB dbExecutor TxStarter txStarter Hub *realtime.Hub + Bus *events.Bus TaskService *service.TaskService } -func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub) *Handler { +func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *events.Bus) *Handler { var executor dbExecutor if candidate, ok := txStarter.(dbExecutor); ok { executor = candidate @@ -45,7 +46,8 @@ func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub) *Handler { DB: executor, TxStarter: txStarter, Hub: hub, - TaskService: service.NewTaskService(queries, hub), + Bus: bus, + TaskService: service.NewTaskService(queries, hub, bus), } } @@ -69,18 +71,15 @@ func timestampToString(t pgtype.Timestamptz) string { return util.TimestampToStr func timestampToPtr(t pgtype.Timestamptz) *string { return util.TimestampToPtr(t) } func uuidToPtr(u pgtype.UUID) *string { return util.UUIDToPtr(u) } -// broadcast sends a WebSocket event to all connected clients. -func (h *Handler) broadcast(eventType string, payload any) { - msg := map[string]any{ - "type": eventType, - "payload": payload, - } - data, err := json.Marshal(msg) - if err != nil { - fmt.Printf("broadcast marshal error: %v\n", err) - return - } - h.Hub.Broadcast(data) +// publish sends a domain event through the event bus. +func (h *Handler) publish(eventType, workspaceID, actorType, actorID string, payload any) { + h.Bus.Publish(events.Event{ + Type: eventType, + WorkspaceID: workspaceID, + ActorType: actorType, + ActorID: actorID, + Payload: payload, + }) } func isNotFound(err error) bool { diff --git a/server/internal/handler/inbox.go b/server/internal/handler/inbox.go index 36921850..d736961c 100644 --- a/server/internal/handler/inbox.go +++ b/server/internal/handler/inbox.go @@ -6,6 +6,7 @@ import ( "github.com/go-chi/chi/v5" db "github.com/multica-ai/multica/server/pkg/db/generated" + "github.com/multica-ai/multica/server/pkg/protocol" ) type InboxItemResponse struct { @@ -88,6 +89,14 @@ func (h *Handler) MarkInboxRead(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, "failed to mark read") return } + + userID := requestUserID(r) + workspaceID := uuidToString(item.WorkspaceID) + h.publish(protocol.EventInboxRead, workspaceID, "member", userID, map[string]any{ + "item_id": uuidToString(item.ID), + "recipient_id": uuidToString(item.RecipientID), + }) + writeJSON(w, http.StatusOK, inboxToResponse(item)) } @@ -101,5 +110,13 @@ func (h *Handler) ArchiveInboxItem(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, "failed to archive") return } + + userID := requestUserID(r) + workspaceID := uuidToString(item.WorkspaceID) + h.publish(protocol.EventInboxArchived, workspaceID, "member", userID, map[string]any{ + "item_id": uuidToString(item.ID), + "recipient_id": uuidToString(item.RecipientID), + }) + writeJSON(w, http.StatusOK, inboxToResponse(item)) } diff --git a/server/internal/handler/issue.go b/server/internal/handler/issue.go index 82b5f3e1..24b5cb78 100644 --- a/server/internal/handler/issue.go +++ b/server/internal/handler/issue.go @@ -11,6 +11,7 @@ import ( "github.com/go-chi/chi/v5" "github.com/jackc/pgx/v5/pgtype" db "github.com/multica-ai/multica/server/pkg/db/generated" + "github.com/multica-ai/multica/server/pkg/protocol" ) // IssueResponse is the JSON response for an issue. @@ -233,7 +234,7 @@ func (h *Handler) CreateIssue(w http.ResponseWriter, r *http.Request) { } resp := issueToResponse(issue) - h.broadcast("issue:created", map[string]any{"issue": resp}) + h.publish(protocol.EventIssueCreated, workspaceID, "member", creatorID, map[string]any{"issue": resp}) // Create inbox notification for assignee if issue.AssigneeType.Valid && issue.AssigneeID.Valid { @@ -248,7 +249,7 @@ func (h *Handler) CreateIssue(w http.ResponseWriter, r *http.Request) { Body: ptrToText(req.Description), }) if err == nil { - h.broadcast("inbox:new", map[string]any{"item": inboxToResponse(inboxItem)}) + h.publish(protocol.EventInboxNew, workspaceID, "member", creatorID, map[string]any{"item": inboxToResponse(inboxItem)}) } // Only ready issues in todo are enqueued for agents. @@ -279,6 +280,8 @@ func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) { if !ok { return } + userID := requestUserID(r) + workspaceID := uuidToString(prevIssue.WorkspaceID) // Read body as raw bytes so we can detect which fields were explicitly sent. bodyBytes, err := io.ReadAll(r.Body) @@ -365,7 +368,7 @@ func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) { } resp := issueToResponse(issue) - h.broadcast("issue:updated", map[string]any{"issue": resp}) + h.publish(protocol.EventIssueUpdated, workspaceID, "member", userID, map[string]any{"issue": resp}) assigneeChanged := (req.AssigneeType != nil || req.AssigneeID != nil) && (prevIssue.AssigneeType.String != issue.AssigneeType.String || uuidToString(prevIssue.AssigneeID) != uuidToString(issue.AssigneeID)) @@ -394,7 +397,7 @@ func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) { Title: "Assigned to you: " + issue.Title, }) if err == nil { - h.broadcast("inbox:new", map[string]any{"item": inboxToResponse(inboxItem)}) + h.publish(protocol.EventInboxNew, workspaceID, "member", userID, map[string]any{"item": inboxToResponse(inboxItem)}) } } } @@ -412,7 +415,7 @@ func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) { Title: issue.Title + " moved to " + *req.Status, }) if err == nil { - h.broadcast("inbox:new", map[string]any{"item": inboxToResponse(inboxItem)}) + h.publish(protocol.EventInboxNew, workspaceID, "member", userID, map[string]any{"item": inboxToResponse(inboxItem)}) } } } @@ -450,7 +453,8 @@ func (h *Handler) shouldEnqueueAgentTask(ctx context.Context, issue db.Issue) bo func (h *Handler) DeleteIssue(w http.ResponseWriter, r *http.Request) { id := chi.URLParam(r, "id") - if _, ok := h.loadIssueForUser(w, r, id); !ok { + issue, ok := h.loadIssueForUser(w, r, id) + if !ok { return } @@ -460,6 +464,7 @@ func (h *Handler) DeleteIssue(w http.ResponseWriter, r *http.Request) { return } - h.broadcast("issue:deleted", map[string]any{"issue_id": id}) + userID := requestUserID(r) + h.publish(protocol.EventIssueDeleted, uuidToString(issue.WorkspaceID), "member", userID, map[string]any{"issue_id": id}) w.WriteHeader(http.StatusNoContent) } diff --git a/server/internal/handler/workspace.go b/server/internal/handler/workspace.go index 9f7a926d..f1aff667 100644 --- a/server/internal/handler/workspace.go +++ b/server/internal/handler/workspace.go @@ -8,6 +8,7 @@ import ( "github.com/go-chi/chi/v5" "github.com/jackc/pgx/v5/pgtype" db "github.com/multica-ai/multica/server/pkg/db/generated" + "github.com/multica-ai/multica/server/pkg/protocol" ) type WorkspaceResponse struct { @@ -207,6 +208,9 @@ func (h *Handler) UpdateWorkspace(w http.ResponseWriter, r *http.Request) { return } + userID := requestUserID(r) + h.publish(protocol.EventWorkspaceUpdated, id, "member", userID, map[string]any{"workspace": workspaceToResponse(ws)}) + writeJSON(w, http.StatusOK, workspaceToResponse(ws)) } @@ -355,6 +359,9 @@ func (h *Handler) CreateMember(w http.ResponseWriter, r *http.Request) { return } + userID := requestUserID(r) + h.publish(protocol.EventMemberAdded, workspaceID, "member", userID, map[string]any{"member": memberWithUserResponse(member, user)}) + writeJSON(w, http.StatusCreated, memberWithUserResponse(member, user)) } @@ -463,6 +470,13 @@ func (h *Handler) DeleteMember(w http.ResponseWriter, r *http.Request) { return } + userID := requestUserID(r) + h.publish(protocol.EventMemberRemoved, workspaceID, "member", userID, map[string]any{ + "member_id": uuidToString(target.ID), + "workspace_id": workspaceID, + "user_id": uuidToString(target.UserID), + }) + w.WriteHeader(http.StatusNoContent) } @@ -490,6 +504,13 @@ func (h *Handler) LeaveWorkspace(w http.ResponseWriter, r *http.Request) { return } + userID := requestUserID(r) + h.publish(protocol.EventMemberRemoved, workspaceID, "member", userID, map[string]any{ + "member_id": uuidToString(member.ID), + "workspace_id": workspaceID, + "user_id": uuidToString(member.UserID), + }) + w.WriteHeader(http.StatusNoContent) } diff --git a/server/internal/realtime/hub.go b/server/internal/realtime/hub.go index 5ca6ab01..2a38fc5a 100644 --- a/server/internal/realtime/hub.go +++ b/server/internal/realtime/hub.go @@ -1,13 +1,22 @@ package realtime import ( + "context" "log" "net/http" + "strings" "sync" + "github.com/golang-jwt/jwt/v5" "github.com/gorilla/websocket" + "github.com/multica-ai/multica/server/internal/auth" ) +// MembershipChecker verifies a user belongs to a workspace. +type MembershipChecker interface { + IsMember(ctx context.Context, userID, workspaceID string) bool +} + var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { // TODO: Restrict origins in production @@ -15,17 +24,19 @@ var upgrader = websocket.Upgrader{ }, } -// Client represents a single WebSocket connection. +// Client represents a single WebSocket connection with identity. type Client struct { - hub *Hub - conn *websocket.Conn - send chan []byte + hub *Hub + conn *websocket.Conn + send chan []byte + userID string + workspaceID string } -// Hub manages WebSocket connections and broadcasts. +// Hub manages WebSocket connections organized by workspace rooms. type Hub struct { - clients map[*Client]bool - broadcast chan []byte + rooms map[string]map[*Client]bool // workspaceID -> clients + broadcast chan []byte // global broadcast (daemon events) register chan *Client unregister chan *Client mu sync.RWMutex @@ -34,7 +45,7 @@ type Hub struct { // NewHub creates a new Hub instance. func NewHub() *Hub { return &Hub{ - clients: make(map[*Client]bool), + rooms: make(map[string]map[*Client]bool), broadcast: make(chan []byte), register: make(chan *Client), unregister: make(chan *Client), @@ -47,27 +58,48 @@ func (h *Hub) Run() { select { case client := <-h.register: h.mu.Lock() - h.clients[client] = true + room := client.workspaceID + if h.rooms[room] == nil { + h.rooms[room] = make(map[*Client]bool) + } + h.rooms[room][client] = true + total := 0 + for _, r := range h.rooms { + total += len(r) + } h.mu.Unlock() - log.Printf("Client connected. Total: %d", len(h.clients)) + log.Printf("Client connected (workspace=%s). Total: %d", room, total) case client := <-h.unregister: h.mu.Lock() - if _, ok := h.clients[client]; ok { - delete(h.clients, client) - close(client.send) + room := client.workspaceID + if clients, ok := h.rooms[room]; ok { + if _, exists := clients[client]; exists { + delete(clients, client) + close(client.send) + if len(clients) == 0 { + delete(h.rooms, room) + } + } + } + total := 0 + for _, r := range h.rooms { + total += len(r) } h.mu.Unlock() - log.Printf("Client disconnected. Total: %d", len(h.clients)) + log.Printf("Client disconnected (workspace=%s). Total: %d", room, total) case message := <-h.broadcast: + // Global broadcast for daemon events (no workspace filtering) h.mu.RLock() - for client := range h.clients { - select { - case client.send <- message: - default: - close(client.send) - delete(h.clients, client) + for _, clients := range h.rooms { + for client := range clients { + select { + case client.send <- message: + default: + close(client.send) + delete(clients, client) + } } } h.mu.RUnlock() @@ -75,13 +107,83 @@ func (h *Hub) Run() { } } -// Broadcast sends a message to all connected clients. +// BroadcastToWorkspace sends a message only to clients in the given workspace. +func (h *Hub) BroadcastToWorkspace(workspaceID string, message []byte) { + h.mu.RLock() + clients := h.rooms[workspaceID] + var slow []*Client + for client := range clients { + select { + case client.send <- message: + default: + slow = append(slow, client) + } + } + h.mu.RUnlock() + + // Remove slow clients under write lock + if len(slow) > 0 { + h.mu.Lock() + for _, client := range slow { + if room, ok := h.rooms[workspaceID]; ok { + if _, exists := room[client]; exists { + delete(room, client) + close(client.send) + if len(room) == 0 { + delete(h.rooms, workspaceID) + } + } + } + } + h.mu.Unlock() + } +} + +// Broadcast sends a message to all connected clients (used for daemon events). func (h *Hub) Broadcast(message []byte) { h.broadcast <- message } -// HandleWebSocket upgrades an HTTP connection to WebSocket. -func HandleWebSocket(hub *Hub, w http.ResponseWriter, r *http.Request) { +// HandleWebSocket upgrades an HTTP connection to WebSocket with JWT auth. +func HandleWebSocket(hub *Hub, mc MembershipChecker, w http.ResponseWriter, r *http.Request) { + tokenStr := r.URL.Query().Get("token") + workspaceID := r.URL.Query().Get("workspace_id") + + if tokenStr == "" || workspaceID == "" { + http.Error(w, `{"error":"token and workspace_id required"}`, http.StatusUnauthorized) + return + } + + // Validate JWT + token, err := jwt.Parse(tokenStr, func(token *jwt.Token) (any, error) { + if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok { + return nil, jwt.ErrSignatureInvalid + } + return auth.JWTSecret(), nil + }) + if err != nil || !token.Valid { + http.Error(w, `{"error":"invalid token"}`, http.StatusUnauthorized) + return + } + + claims, ok := token.Claims.(jwt.MapClaims) + if !ok { + http.Error(w, `{"error":"invalid claims"}`, http.StatusUnauthorized) + return + } + + userID, ok := claims["sub"].(string) + if !ok || strings.TrimSpace(userID) == "" { + http.Error(w, `{"error":"invalid claims"}`, http.StatusUnauthorized) + return + } + + // Verify user is a member of the workspace + if !mc.IsMember(r.Context(), userID, workspaceID) { + http.Error(w, `{"error":"not a member of this workspace"}`, http.StatusForbidden) + return + } + conn, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Printf("WebSocket upgrade error: %v", err) @@ -89,9 +191,11 @@ func HandleWebSocket(hub *Hub, w http.ResponseWriter, r *http.Request) { } client := &Client{ - hub: hub, - conn: conn, - send: make(chan []byte, 256), + hub: hub, + conn: conn, + send: make(chan []byte, 256), + userID: userID, + workspaceID: workspaceID, } hub.register <- client @@ -113,8 +217,8 @@ func (c *Client) readPump() { } break } - // TODO: Route messages to appropriate handlers - log.Printf("Received message: %s", message) + // TODO: Route inbound messages to appropriate handlers + log.Printf("Received message from user=%s workspace=%s: %s", c.userID, c.workspaceID, message) } } diff --git a/server/internal/service/task.go b/server/internal/service/task.go index ee2bbe7c..e5e054a3 100644 --- a/server/internal/service/task.go +++ b/server/internal/service/task.go @@ -8,6 +8,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" + "github.com/multica-ai/multica/server/internal/events" "github.com/multica-ai/multica/server/internal/realtime" "github.com/multica-ai/multica/server/internal/util" db "github.com/multica-ai/multica/server/pkg/db/generated" @@ -17,10 +18,11 @@ import ( type TaskService struct { Queries *db.Queries Hub *realtime.Hub + Bus *events.Bus } -func NewTaskService(q *db.Queries, hub *realtime.Hub) *TaskService { - return &TaskService{Queries: q, Hub: hub} +func NewTaskService(q *db.Queries, hub *realtime.Hub, bus *events.Bus) *TaskService { + return &TaskService{Queries: q, Hub: hub, Bus: bus} } // EnqueueTaskForIssue creates a task with a context snapshot of the issue. @@ -98,7 +100,7 @@ func (s *TaskService) ClaimTask(ctx context.Context, agentID pgtype.UUID) (*db.A s.updateAgentStatus(ctx, agentID, "working") // Broadcast task:dispatch - s.broadcastTaskDispatch(task) + s.broadcastTaskDispatch(ctx, task) return &task, nil } @@ -184,7 +186,7 @@ func (s *TaskService) CompleteTask(ctx context.Context, taskID pgtype.UUID, resu s.ReconcileAgentStatus(ctx, task.AgentID) // Broadcast - s.broadcastTaskEvent(protocol.EventTaskCompleted, task) + s.broadcastTaskEvent(ctx, protocol.EventTaskCompleted, task) return &task, nil } @@ -218,18 +220,24 @@ func (s *TaskService) FailTask(ctx context.Context, taskID pgtype.UUID, errMsg s s.ReconcileAgentStatus(ctx, task.AgentID) // Broadcast - s.broadcastTaskEvent(protocol.EventTaskFailed, task) + s.broadcastTaskEvent(ctx, protocol.EventTaskFailed, task) return &task, nil } -// ReportProgress broadcasts a progress update via WebSocket. -func (s *TaskService) ReportProgress(taskID string, summary string, step, total int) { - s.broadcast(protocol.EventTaskProgress, protocol.TaskProgressPayload{ - TaskID: taskID, - Summary: summary, - Step: step, - Total: total, +// ReportProgress broadcasts a progress update via the event bus. +func (s *TaskService) ReportProgress(ctx context.Context, taskID string, workspaceID string, summary string, step, total int) { + s.Bus.Publish(events.Event{ + Type: protocol.EventTaskProgress, + WorkspaceID: workspaceID, + ActorType: "system", + ActorID: "", + Payload: protocol.TaskProgressPayload{ + TaskID: taskID, + Summary: summary, + Step: step, + Total: total, + }, }) } @@ -254,7 +262,13 @@ func (s *TaskService) updateAgentStatus(ctx context.Context, agentID pgtype.UUID if err != nil { return } - s.broadcast(protocol.EventAgentStatus, map[string]any{"agent": agentToMap(agent)}) + s.Bus.Publish(events.Event{ + Type: protocol.EventAgentStatus, + WorkspaceID: util.UUIDToString(agent.WorkspaceID), + ActorType: "system", + ActorID: "", + Payload: map[string]any{"agent": agentToMap(agent)}, + }) } func buildContextSnapshot(issue db.Issue, agent db.Agent, runtime db.AgentRuntime, workspaceContext string) map[string]any { @@ -319,7 +333,7 @@ func priorityToInt(p string) int32 { } } -func (s *TaskService) broadcastTaskDispatch(task db.AgentTaskQueue) { +func (s *TaskService) broadcastTaskDispatch(ctx context.Context, task db.AgentTaskQueue) { var payload map[string]any if task.Context != nil { json.Unmarshal(task.Context, &payload) @@ -329,33 +343,46 @@ func (s *TaskService) broadcastTaskDispatch(task db.AgentTaskQueue) { } payload["task_id"] = util.UUIDToString(task.ID) payload["runtime_id"] = util.UUIDToString(task.RuntimeID) - s.broadcast(protocol.EventTaskDispatch, payload) -} -func (s *TaskService) broadcastTaskEvent(eventType string, task db.AgentTaskQueue) { - s.broadcast(eventType, map[string]any{ - "task_id": util.UUIDToString(task.ID), - "agent_id": util.UUIDToString(task.AgentID), - "issue_id": util.UUIDToString(task.IssueID), - "status": task.Status, + workspaceID := "" + if issue, err := s.Queries.GetIssue(ctx, task.IssueID); err == nil { + workspaceID = util.UUIDToString(issue.WorkspaceID) + } + s.Bus.Publish(events.Event{ + Type: protocol.EventTaskDispatch, + WorkspaceID: workspaceID, + ActorType: "system", + ActorID: "", + Payload: payload, }) } -func (s *TaskService) broadcast(eventType string, payload any) { - msg := map[string]any{ - "type": eventType, - "payload": payload, +func (s *TaskService) broadcastTaskEvent(ctx context.Context, eventType string, task db.AgentTaskQueue) { + workspaceID := "" + if issue, err := s.Queries.GetIssue(ctx, task.IssueID); err == nil { + workspaceID = util.UUIDToString(issue.WorkspaceID) } - data, err := json.Marshal(msg) - if err != nil { - return - } - s.Hub.Broadcast(data) + s.Bus.Publish(events.Event{ + Type: eventType, + WorkspaceID: workspaceID, + ActorType: "system", + ActorID: "", + Payload: map[string]any{ + "task_id": util.UUIDToString(task.ID), + "agent_id": util.UUIDToString(task.AgentID), + "issue_id": util.UUIDToString(task.IssueID), + "status": task.Status, + }, + }) } func (s *TaskService) broadcastIssueUpdated(issue db.Issue) { - s.broadcast(protocol.EventIssueUpdated, map[string]any{ - "issue": issueToMap(issue), + s.Bus.Publish(events.Event{ + Type: protocol.EventIssueUpdated, + WorkspaceID: util.UUIDToString(issue.WorkspaceID), + ActorType: "system", + ActorID: "", + Payload: map[string]any{"issue": issueToMap(issue)}, }) } @@ -389,8 +416,12 @@ func (s *TaskService) createInboxForIssueCreator(ctx context.Context, issue db.I if err != nil { return } - s.broadcast(protocol.EventInboxNew, map[string]any{ - "item": inboxToMap(item), + s.Bus.Publish(events.Event{ + Type: protocol.EventInboxNew, + WorkspaceID: util.UUIDToString(issue.WorkspaceID), + ActorType: "system", + ActorID: "", + Payload: map[string]any{"item": inboxToMap(item)}, }) } diff --git a/server/pkg/protocol/events.go b/server/pkg/protocol/events.go index 2d683f6a..26e33bba 100644 --- a/server/pkg/protocol/events.go +++ b/server/pkg/protocol/events.go @@ -7,8 +7,15 @@ const ( EventIssueUpdated = "issue:updated" EventIssueDeleted = "issue:deleted" + // Comment events + EventCommentCreated = "comment:created" + EventCommentUpdated = "comment:updated" + EventCommentDeleted = "comment:deleted" + // Agent events - EventAgentStatus = "agent:status" + EventAgentStatus = "agent:status" + EventAgentCreated = "agent:created" + EventAgentDeleted = "agent:deleted" // Task events (server <-> daemon) EventTaskDispatch = "task:dispatch" @@ -17,7 +24,16 @@ const ( EventTaskFailed = "task:failed" // Inbox events - EventInboxNew = "inbox:new" + EventInboxNew = "inbox:new" + EventInboxRead = "inbox:read" + EventInboxArchived = "inbox:archived" + + // Workspace events + EventWorkspaceUpdated = "workspace:updated" + + // Member events + EventMemberAdded = "member:added" + EventMemberRemoved = "member:removed" // Daemon events EventDaemonHeartbeat = "daemon:heartbeat"