feat: add event bus, WS workspace isolation, and global store migration

- Add internal event bus (server/internal/events/) with synchronous
  pub/sub and panic isolation per listener
- Upgrade WebSocket Hub to workspace-scoped rooms with JWT auth
  and membership verification on connect
- Add 10 new WS event types (comment CRUD, inbox read/archive,
  agent create/delete, workspace/member events)
- Refactor all handlers and TaskService to publish events via Bus
  instead of direct Hub.Broadcast calls
- Add WS broadcast listener that routes events to correct workspace
- Frontend: WSClient sends token + workspace_id on connect with
  auto-reconnect refetch
- Frontend: centralized useRealtimeSync hook dispatches all WS
  events to global Zustand stores
- Migrate issues and inbox pages from local useState to global
  useIssueStore/useInboxStore
- Make store addIssue/addItem idempotent to prevent duplicates
- Remove dead packages/hooks/src/use-realtime.ts
- Add feature tracking files for 4 planned features

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Naiyuan Qing 2026-03-25 10:08:27 +08:00
parent 0ce25597d6
commit 9127e543d5
30 changed files with 1144 additions and 219 deletions

6
_features/_index.json Normal file
View file

@ -0,0 +1,6 @@
[
{ "id": "infra-event-bus-ws", "status": "done", "name": "Infrastructure: Event Bus + WS Isolation + Global Store" },
{ "id": "issue-board-polish", "status": "designing", "name": "Issue Board & Detail Polish" },
{ "id": "workspace-permissions", "status": "designing", "name": "Workspace & Permissions" },
{ "id": "inbox-notifications", "status": "designing", "name": "Inbox & Notifications" }
]

View file

@ -0,0 +1,80 @@
{
"id": "inbox-notifications",
"name": "Inbox & Notifications",
"status": "designing",
"createdAt": "2026-03-25",
"completedAt": null,
"description": "Complete inbox notification system: sidebar unread badge, notification triggers for all key actions, archive UI, issue navigation from notifications, and real-time sync across tabs.",
"currentState": "Inbox page exists with two-column layout. 5 notification triggers implemented (issue assign, reassign, status change, task complete, task fail). No sidebar badge. No archive button. No link to issue. Mark read/archive don't broadcast WS events. SDK return types wrong for mark read/archive.",
"decisions": [
"Inbox uses useInboxStore from global store (not page-local useState)",
"Sidebar badge reads unread count from store, updated by WS events",
"Backend adds GET /api/inbox/unread-count endpoint using existing CountUnreadInbox SQL query",
"Mark read and archive broadcast WS events (inbox:read, inbox:archived) for cross-tab sync",
"New notification triggers: comment on assigned issue (notify assignee), status change notifies creator too, unassign notifies old assignee",
"SDK markInboxRead and archiveInbox return Promise<InboxItem> not Promise<void>",
"Inbox detail shows 'View Issue' link when issue_id is present"
],
"tasks": [
{
"task": "Backend: Add GET /api/inbox/unread-count endpoint",
"done": false,
"scope": "New handler using existing CountUnreadInbox query. Returns { count: number }. Requires auth + workspace membership. Route added to router.go."
},
{
"task": "Backend: Broadcast WS events for mark read and archive",
"done": false,
"scope": "MarkInboxRead broadcasts inbox:read event with { item_id, recipient_id }. ArchiveInboxItem broadcasts inbox:archived with { item_id, recipient_id }. Events added to protocol/events.go."
},
{
"task": "Backend: Add notification trigger for comment on assigned issue",
"done": false,
"scope": "When comment created, if issue has assignee and commenter is not the assignee, create inbox item type 'mentioned' severity 'info' for assignee. Implemented via event bus listener."
},
{
"task": "Backend: Status change notifies creator in addition to assignee",
"done": false,
"scope": "When issue status changes, create inbox item for creator (if creator != the person making the change). Existing assignee notification stays."
},
{
"task": "Backend: Unassign notifies old assignee",
"done": false,
"scope": "When issue assignee changes from A to B (or to null), create inbox item for old assignee A with type 'status_change' and title 'Unassigned from: {issue title}'."
},
{
"task": "SDK: Fix markInboxRead and archiveInbox return types",
"done": false,
"scope": "Change markInboxRead from Promise<void> to Promise<InboxItem>. Change archiveInbox from Promise<void> to Promise<InboxItem>. Update type imports."
},
{
"task": "Frontend: Add WS event types for inbox:read and inbox:archived",
"done": false,
"scope": "Add inbox:read and inbox:archived to WSEventType in packages/types/src/events.ts. Add payload types."
},
{
"task": "Frontend: Sidebar unread badge",
"done": false,
"scope": "Sidebar Inbox nav item shows unread count badge (shadcn Badge variant). Initial count from /api/inbox/unread-count on mount. Incremented on inbox:new, decremented on inbox:read/inbox:archived WS events."
},
{
"task": "Frontend: Inbox detail 'View Issue' navigation",
"done": false,
"scope": "When selected item has issue_id, show 'View Issue' button/link in InboxDetail that navigates to /issues/{issue_id}. Use shadcn Button variant='outline'."
},
{
"task": "Frontend: Archive button in inbox detail",
"done": false,
"scope": "Archive button next to Mark Read in InboxDetail. Calls api.archiveInbox(id). On success, removes item from store. Shows toast confirmation."
},
{
"task": "Frontend: Inbox real-time sync for read/archive",
"done": false,
"scope": "useRealtimeSync handles inbox:read (mark item read in store) and inbox:archived (remove item from store). Badge count updates automatically from store."
},
{
"task": "Frontend: Inbox loading/empty states with shadcn",
"done": false,
"scope": "Initial load shows Skeleton. Empty inbox shows centered illustration/text 'All caught up'. Error shows toast. Consistent with other pages."
}
]
}

View file

@ -0,0 +1,78 @@
{
"id": "infra-event-bus-ws",
"name": "Infrastructure: Event Bus + WS Isolation + Global Store",
"status": "done",
"createdAt": "2026-03-25",
"completedAt": "2026-03-25",
"description": "Foundation layer: internal event bus to decouple handlers from side-effects, WebSocket workspace isolation to fix multi-tenancy data leakage, and frontend global Zustand store with centralized WS sync.",
"currentState": "All tasks complete. Backend: Event Bus (server/internal/events/bus.go) with pub/sub + panic isolation, Hub upgraded to workspace-scoped rooms with JWT auth, all 11 handler broadcast calls replaced with Bus.Publish, 10 new event types added, inbox/workspace/agent handlers now emit events. Frontend: WSClient sends token+workspace_id, useRealtimeSync hook centralizes WS→store sync with reconnect refetch, issues/inbox pages migrated to global stores, dead use-realtime.ts removed. Go build + typecheck both pass.",
"decisions": [
"Event bus is in-process Go pub/sub (not external MQ), synchronous execution with recover isolation",
"Hub upgraded to room-based: map[workspaceID]map[*Client]bool, BroadcastToWorkspace replaces Broadcast",
"WS auth via query param ?token=xxx&workspace_id=yyy, parsed in HandleWebSocket before upgrade",
"Client struct gains userID + workspaceID fields, set during WS handshake",
"Frontend uses packages/store (useIssueStore, useInboxStore, useAgentStore) as single source of truth",
"useRealtimeSync() called once inside WSProvider, handles all WS event -> store updates",
"WS reconnect triggers refetch of issues + inbox + agents to recover missed events",
"Comment events stay page-local on issue detail page (not in global store)",
"Inbox creation stays in handlers and TaskService for now (complex business logic), will extract to bus listeners later",
"Broadcast() kept for daemon events (no workspace scope), BroadcastToWorkspace() for all user-facing events"
],
"tasks": [
{
"task": "Backend: Create internal event bus (server/internal/events/bus.go)",
"done": true,
"scope": "Bus struct with Publish/Subscribe, Event type with Type/WorkspaceID/ActorType/ActorID/Payload. Synchronous dispatch with recover per listener. Unit test for pub/sub + panic isolation."
},
{
"task": "Backend: Register event listeners for WS broadcast",
"done": true,
"scope": "Listener that receives any event and calls Hub.BroadcastToWorkspace. Covers: issue CRUD, comment CRUD, agent status, inbox new/read/archive, task lifecycle events."
},
{
"task": "Backend: Register event listeners for inbox creation",
"done": false,
"scope": "Deferred: inbox creation stays in handlers/TaskService for now. Will extract to bus listeners when adding new notification triggers (inbox-notifications feature)."
},
{
"task": "Backend: Refactor handlers to publish events instead of direct broadcast/inbox",
"done": true,
"scope": "All handlers (issue, comment, agent, inbox, workspace, daemon) emit events via bus.Publish. Remove direct h.broadcast() calls. Task service also emits events via Bus.Publish."
},
{
"task": "Backend: Upgrade Hub to workspace-scoped rooms",
"done": true,
"scope": "Hub.rooms map, Client has userID+workspaceID, BroadcastToWorkspace method. HandleWebSocket validates JWT from ?token query param before upgrade. Reject unauthenticated connections."
},
{
"task": "Backend: Add missing WS event types to protocol",
"done": true,
"scope": "Added: EventCommentCreated/Updated/Deleted, EventInboxRead, EventInboxArchived, EventAgentCreated, EventAgentDeleted, EventWorkspaceUpdated, EventMemberAdded, EventMemberRemoved to both protocol/events.go and packages/types/src/events.ts."
},
{
"task": "Frontend: WSClient sends token on connect",
"done": true,
"scope": "WSClient.connect() builds URL with ?token=xxx&workspace_id=yyy. setAuth() method sets credentials. WSProvider reads token from localStorage, workspace from store. Reconnects when workspace changes."
},
{
"task": "Frontend: Implement useRealtimeSync() hook",
"done": true,
"scope": "Called inside WSProvider. Subscribes to issue/inbox/agent WS events → dispatches to global stores. onReconnect refetches issues+inbox+agents. Comment events excluded (page-local)."
},
{
"task": "Frontend: Migrate issues page from useState to useIssueStore",
"done": true,
"scope": "Issues page reads from useIssueStore. Filters applied locally via useMemo. Initial fetch populates store. WS event handlers removed (handled by useRealtimeSync). Drag-drop uses store for optimistic updates."
},
{
"task": "Frontend: Migrate inbox page from useState to useInboxStore",
"done": true,
"scope": "Inbox page reads from useInboxStore. Sorting applied locally via useMemo. WS handler removed. markRead updates store directly."
},
{
"task": "Frontend: Clean up dead store code",
"done": true,
"scope": "Removed packages/hooks/src/use-realtime.ts. Updated packages/hooks/src/index.ts. No duplicate WS subscriptions remain in pages."
}
]
}

View file

@ -0,0 +1,102 @@
{
"id": "issue-board-polish",
"name": "Issue Board & Detail Polish",
"status": "designing",
"createdAt": "2026-03-25",
"completedAt": null,
"description": "Fix drag-drop data consistency bugs, complete issue detail page interactions, and polish board/list views to Linear MVP quality with consistent shadcn UI.",
"currentState": "Board view has 5 columns (missing blocked), drag-drop has 3 data consistency bugs, issue detail title/description not editable, create dialog missing assignee/due date fields, comments lack optimistic updates and author-only edit/delete.",
"decisions": [
"Board shows 6 columns: backlog, todo, in_progress, in_review, done, blocked. Cancelled issues visible via filter only.",
"Drag-drop revert respects current filter params",
"WS issue events check against current filter before adding to view",
"AbortController used for filter change requests to prevent race conditions",
"Issue title: click-to-edit inline input, blur/Enter saves, Escape cancels",
"Issue description: click-to-edit textarea, blur saves",
"Comment creation uses optimistic update (show immediately, confirm on API response)",
"Comment edit/delete only by author or workspace admin (backend enforced)",
"All loading/empty/error states use shadcn Skeleton and consistent patterns"
],
"tasks": [
{
"task": "Fix: Board drag-drop revert respects active filters",
"done": false,
"scope": "When drag-drop API call fails, revert calls listIssues with current filterStatus and filterPriority params instead of bare { limit: 200 }."
},
{
"task": "Fix: WS issue events respect current filter",
"done": false,
"scope": "issue:created handler checks if new issue matches filterStatus/filterPriority before adding. issue:updated handler removes issue from view if it no longer matches filter."
},
{
"task": "Fix: Filter change race condition with AbortController",
"done": false,
"scope": "useEffect for filter changes creates AbortController, passes signal to fetch, aborts previous request on new filter change. Stale responses ignored."
},
{
"task": "Fix: Board add blocked column, fix empty column drop target",
"done": false,
"scope": "visibleStatuses includes 'blocked'. All columns have min-h-[200px] for reliable drop target. Cancelled issues not shown as column but accessible via filter."
},
{
"task": "Fix: Board card click vs drag conflict",
"done": false,
"scope": "When isDragging is true, Link inside card has pointer-events-none to prevent navigation during drag."
},
{
"task": "Fix: List view status group order matches STATUS_ORDER",
"done": false,
"scope": "List view groups issues using STATUS_ORDER constant instead of hardcoded different order."
},
{
"task": "Create Issue dialog: add Assignee and Due Date fields",
"done": false,
"scope": "Dialog includes AssigneePicker and date picker (Calendar popover). Fields passed to api.createIssue(). Existing StatusPicker and PriorityPicker remain."
},
{
"task": "Issue detail: inline editable title",
"done": false,
"scope": "Title renders as h1. Click transforms to Input. Blur or Enter calls api.updateIssue with optimistic update. Escape reverts. Empty title rejected."
},
{
"task": "Issue detail: inline editable description",
"done": false,
"scope": "Description renders as paragraph. Click transforms to Textarea. Blur saves with optimistic update. Empty description allowed (clears field)."
},
{
"task": "Issue detail: listen to issue:updated WS event",
"done": false,
"scope": "Detail page subscribes to issue:updated. When event.issue.id matches current issue, merges update into local state (unless user is actively editing that field)."
},
{
"task": "Issue detail: acceptance criteria and context refs always addable",
"done": false,
"scope": "Show 'Add acceptance criteria' and 'Add context reference' buttons even when arrays are empty. Click reveals input field."
},
{
"task": "Comment: optimistic create",
"done": false,
"scope": "On submit, immediately append comment with temp ID and muted styling. On API success, replace temp with real. On API error, remove temp and show toast."
},
{
"task": "Comment: backend author-only edit/delete",
"done": false,
"scope": "UpdateComment and DeleteComment in comment.go load comment first, verify author_id matches request user OR user is workspace admin. Return 403 otherwise."
},
{
"task": "Comment: hover timestamp shows full date tooltip",
"done": false,
"scope": "Relative time ('2h ago') wrapped in shadcn Tooltip showing full ISO-formatted date on hover."
},
{
"task": "Issue delete: cancel running tasks first",
"done": false,
"scope": "DeleteIssue handler calls TaskService.CancelTasksForIssue before deleting issue to prevent FK constraint errors."
},
{
"task": "UI: consistent loading/empty/error states across issues pages",
"done": false,
"scope": "Board empty columns show 'No issues' muted text. List empty groups hidden. Initial load uses Skeleton. Error shows toast + retry. Filter with no results shows 'No matching issues' with clear filter link."
}
]
}

View file

@ -0,0 +1,59 @@
{
"id": "workspace-permissions",
"name": "Workspace & Permissions",
"status": "designing",
"createdAt": "2026-03-25",
"completedAt": null,
"description": "Complete workspace management with proper permission enforcement, member invitation flow, and consistent settings UI using shadcn components.",
"currentState": "Workspace CRUD works. Member add requires pre-existing user (no invite flow). DeleteAgent has no workspace check. Comment edit/delete has no author check. Settings page uses raw textarea for context field. Workspace switch doesn't refetch all data.",
"decisions": [
"Auth stays simple: email-only login, auto-create user, 72h JWT, no refresh token for MVP",
"Member invite: if user doesn't exist, backend auto-creates user record with email-only, they become member immediately",
"3 roles (owner/admin/member) sufficient for MVP, no custom permissions table",
"Owner: full control. Admin: manage members + agents + settings. Member: CRUD issues + comments.",
"All permission checks centralized in handler helpers, enforced at API level",
"Workspace switch triggers full data refresh: disconnect WS, clear stores, reconnect with new workspace_id, refetch all"
],
"tasks": [
{
"task": "Backend: Fix DeleteAgent workspace + role check",
"done": false,
"scope": "DeleteAgent calls loadAgentForUser (workspace membership check) before deletion. Also calls CancelAgentTasksByIssue for all agent's assigned issues. Requires owner or admin role."
},
{
"task": "Backend: Fix ListAgentTasks workspace check",
"done": false,
"scope": "ListAgentTasks verifies agent belongs to user's workspace via loadAgentForUser before returning tasks."
},
{
"task": "Backend: Member invite auto-creates user if not found",
"done": false,
"scope": "CreateMember handler: if GetUserByEmail returns not found, call CreateUser(email, '') to create stub user, then proceed to add as member. Return 201 with member data."
},
{
"task": "Backend: Agent visibility filtering",
"done": false,
"scope": "ListAgents filters private agents: only visible to agent owner_id or workspace owner/admin. Other members only see workspace-visible agents."
},
{
"task": "Frontend: Settings page use shadcn components consistently",
"done": false,
"scope": "Replace raw textarea with shadcn Textarea for context field. All inputs use shadcn Input. Form validation: workspace name required, show inline errors. All buttons use shadcn Button with loading state."
},
{
"task": "Frontend: Workspace switcher error handling and feedback",
"done": false,
"scope": "Create workspace shows error toast on failure (including slug collision). Workspace list sorted alphabetically. Current workspace highlighted with check icon."
},
{
"task": "Frontend: Workspace switch triggers full data refresh",
"done": false,
"scope": "switchWorkspace action: disconnects WS, clears issue/inbox/agent stores, sets new workspace_id on API client, reconnects WS with new workspace, refetches all data."
},
{
"task": "Frontend: Member management UX improvements",
"done": false,
"scope": "Add member shows success/error toast. Email validation before submit. 'Already a member' error shown inline. Remove member confirmation uses AlertDialog. All operations show loading state on button."
}
]
}

View file

@ -1,6 +1,7 @@
"use client";
import { useState, useEffect, useCallback } from "react";
import { useState, useEffect, useMemo } from "react";
import { useInboxStore } from "@multica/store";
import {
AlertCircle,
Bot,
@ -10,10 +11,9 @@ import {
MessageSquare,
ArrowRightLeft,
} from "lucide-react";
import type { InboxItem, InboxItemType, InboxSeverity, InboxNewPayload } from "@multica/types";
import type { InboxItem, InboxItemType, InboxSeverity } from "@multica/types";
import { Button } from "@/components/ui/button";
import { api } from "@/shared/api";
import { useWSEvent } from "@/features/realtime";
// ---------------------------------------------------------------------------
// Helpers
@ -151,43 +151,39 @@ function InboxDetail({
// ---------------------------------------------------------------------------
export default function InboxPage() {
const [items, setItems] = useState<InboxItem[]>([]);
const [selectedId, setSelectedId] = useState<string>("");
const [loading, setLoading] = useState(true);
// Read from global store (updated by useRealtimeSync)
const storeItems = useInboxStore((s) => s.items);
// Sort: severity first, then newest first
const items = useMemo(() => {
return [...storeItems]
.filter((i) => !i.archived)
.sort(
(a, b) =>
severityOrder[a.severity] - severityOrder[b.severity] ||
new Date(b.created_at).getTime() - new Date(a.created_at).getTime()
);
}, [storeItems]);
// Initial fetch → populate store
useEffect(() => {
api
.listInbox()
.then((data) => {
const sorted = [...data].sort(
(a, b) =>
severityOrder[a.severity] - severityOrder[b.severity] ||
new Date(b.created_at).getTime() - new Date(a.created_at).getTime()
);
setItems(sorted);
if (sorted.length > 0) setSelectedId(sorted[0]!.id);
useInboxStore.getState().setItems(data);
if (data.length > 0) setSelectedId(data[0]!.id);
})
.catch(console.error)
.finally(() => setLoading(false));
}, []);
useWSEvent(
"inbox:new",
useCallback((payload: unknown) => {
const { item } = payload as InboxNewPayload;
setItems((prev) => {
if (prev.some((i) => i.id === item.id)) return prev;
return [item, ...prev];
});
}, []),
);
const handleMarkRead = async (id: string) => {
try {
await api.markInboxRead(id);
setItems((prev) =>
prev.map((i) => (i.id === id ? { ...i, read: true } : i))
);
useInboxStore.getState().markRead(id);
} catch (err) {
console.error("Failed to mark read:", err);
}

View file

@ -1,6 +1,7 @@
"use client";
import { useState, useCallback, useEffect } from "react";
import { useState, useCallback, useEffect, useMemo } from "react";
import { useIssueStore } from "@multica/store";
import Link from "next/link";
import {
Columns3,
@ -45,8 +46,6 @@ import { ActorAvatar } from "@/components/common/actor-avatar";
import { StatusIcon, PriorityIcon } from "@/features/issues/components";
import { api } from "@/shared/api";
import { useActorName } from "@/features/workspace";
import { useWSEvent } from "@/features/realtime";
import type { IssueCreatedPayload, IssueUpdatedPayload, IssueDeletedPayload } from "@multica/types";
function formatDate(date: string): string {
return new Date(date).toLocaleDateString("en-US", {
@ -446,78 +445,53 @@ type ViewMode = "board" | "list";
export default function IssuesPage() {
const [view, setView] = useState<ViewMode>("board");
const [issues, setIssues] = useState<Issue[]>([]);
const [loading, setLoading] = useState(true);
const [filterStatus, setFilterStatus] = useState<IssueStatus | "">("");
const [filterPriority, setFilterPriority] = useState<IssuePriority | "">("");
// Read from global store (updated by useRealtimeSync)
const allIssues = useIssueStore((s) => s.issues);
// Apply local filters
const issues = useMemo(() => {
return allIssues.filter((issue) => {
if (filterStatus && issue.status !== filterStatus) return false;
if (filterPriority && issue.priority !== filterPriority) return false;
return true;
});
}, [allIssues, filterStatus, filterPriority]);
// Initial fetch → populate store
useEffect(() => {
setLoading(true);
api
.listIssues({
limit: 200,
...(filterStatus ? { status: filterStatus } : {}),
...(filterPriority ? { priority: filterPriority } : {}),
})
.listIssues({ limit: 200 })
.then((res) => {
setIssues(res.issues);
useIssueStore.getState().setIssues(res.issues);
})
.catch(console.error)
.finally(() => setLoading(false));
}, [filterStatus, filterPriority]);
// Real-time updates
useWSEvent(
"issue:created",
useCallback((payload: unknown) => {
const { issue } = payload as IssueCreatedPayload;
setIssues((prev) => {
if (prev.some((i) => i.id === issue.id)) return prev;
return [...prev, issue];
});
}, []),
);
useWSEvent(
"issue:updated",
useCallback((payload: unknown) => {
const { issue } = payload as IssueUpdatedPayload;
setIssues((prev) => prev.map((i) => (i.id === issue.id ? issue : i)));
}, []),
);
useWSEvent(
"issue:deleted",
useCallback((payload: unknown) => {
const { issue_id } = payload as IssueDeletedPayload;
setIssues((prev) => prev.filter((i) => i.id !== issue_id));
}, []),
);
}, []);
const handleMoveIssue = useCallback(
(issueId: string, newStatus: IssueStatus) => {
// Optimistic update
setIssues((prev) =>
prev.map((issue) =>
issue.id === issueId ? { ...issue, status: newStatus } : issue
)
);
// Optimistic update in store
useIssueStore.getState().updateIssue(issueId, { status: newStatus });
// Persist to API
api.updateIssue(issueId, { status: newStatus }).catch((err) => {
console.error("Failed to update issue:", err);
// Revert on error
api.listIssues({ limit: 200 }).then((res) => setIssues(res.issues));
// Revert on error by refetching
api.listIssues({ limit: 200 }).then((res) => {
useIssueStore.getState().setIssues(res.issues);
});
});
},
[]
);
const handleIssueCreated = useCallback((issue: Issue) => {
setIssues((prev) => {
if (prev.some((i) => i.id === issue.id)) return prev;
return [...prev, issue];
});
useIssueStore.getState().addIssue(issue);
}, []);
if (loading) {

View file

@ -11,6 +11,8 @@ import {
import { WSClient } from "@multica/sdk";
import type { WSEventType } from "@multica/types";
import { useAuthStore } from "@/features/auth";
import { useWorkspaceStore } from "@/features/workspace";
import { useRealtimeSync } from "./use-realtime-sync";
const WS_URL = process.env.NEXT_PUBLIC_WS_URL ?? "ws://localhost:8080/ws";
@ -24,12 +26,17 @@ const WSContext = createContext<WSContextValue | null>(null);
export function WSProvider({ children }: { children: ReactNode }) {
const user = useAuthStore((s) => s.user);
const workspace = useWorkspaceStore((s) => s.workspace);
const wsRef = useRef<WSClient | null>(null);
useEffect(() => {
if (!user) return;
if (!user || !workspace) return;
const token = localStorage.getItem("multica_token");
if (!token) return;
const ws = new WSClient(WS_URL);
ws.setAuth(token, workspace.id);
wsRef.current = ws;
ws.connect();
@ -37,7 +44,10 @@ export function WSProvider({ children }: { children: ReactNode }) {
ws.disconnect();
wsRef.current = null;
};
}, [user]);
}, [user, workspace]);
// Centralized WS → store sync
useRealtimeSync(wsRef.current);
const subscribe = useCallback(
(event: WSEventType, handler: EventHandler) => {

View file

@ -0,0 +1,115 @@
"use client";
import { useEffect } from "react";
import type { WSClient } from "@multica/sdk";
import { useIssueStore, useInboxStore, useAgentStore } from "@multica/store";
import { useWorkspaceStore } from "@/features/workspace";
import { api } from "@/shared/api";
import type {
IssueCreatedPayload,
IssueUpdatedPayload,
IssueDeletedPayload,
AgentStatusPayload,
AgentCreatedPayload,
InboxNewPayload,
InboxReadPayload,
InboxArchivedPayload,
} from "@multica/types";
/**
* Centralized WS store sync. Called once from WSProvider.
* Subscribes to all global WS events and dispatches to Zustand stores.
* Comment events are NOT handled here they stay per-page on issue detail.
*/
export function useRealtimeSync(ws: WSClient | null) {
// Issue events → useIssueStore
useEffect(() => {
if (!ws) return;
const unsubs = [
ws.on("issue:created", (p) => {
const { issue } = p as IssueCreatedPayload;
useIssueStore.getState().addIssue(issue);
}),
ws.on("issue:updated", (p) => {
const { issue } = p as IssueUpdatedPayload;
useIssueStore.getState().updateIssue(issue.id, issue);
}),
ws.on("issue:deleted", (p) => {
const { issue_id } = p as IssueDeletedPayload;
useIssueStore.getState().removeIssue(issue_id);
}),
];
return () => unsubs.forEach((u) => u());
}, [ws]);
// Inbox events → useInboxStore
useEffect(() => {
if (!ws) return;
const unsubs = [
ws.on("inbox:new", (p) => {
const { item } = p as InboxNewPayload;
useInboxStore.getState().addItem(item);
}),
ws.on("inbox:read", (p) => {
const { item_id } = p as InboxReadPayload;
useInboxStore.getState().markRead(item_id);
}),
ws.on("inbox:archived", (p) => {
const { item_id } = p as InboxArchivedPayload;
useInboxStore.getState().archive(item_id);
}),
];
return () => unsubs.forEach((u) => u());
}, [ws]);
// Agent events → useAgentStore / workspace refresh
useEffect(() => {
if (!ws) return;
const unsubs = [
ws.on("agent:status", (p) => {
const { agent } = p as AgentStatusPayload;
useAgentStore.getState().updateAgent(agent.id, agent);
}),
ws.on("agent:created", (p) => {
const { agent } = p as AgentCreatedPayload;
const agents = useAgentStore.getState().agents;
if (!agents.find((a) => a.id === agent.id)) {
useAgentStore.getState().setAgents([...agents, agent]);
}
}),
ws.on("agent:deleted", () => {
// Refresh agents list since we don't have removeAgent in store
useWorkspaceStore.getState().refreshAgents();
}),
];
return () => unsubs.forEach((u) => u());
}, [ws]);
// Reconnect → refetch all data to recover missed events
useEffect(() => {
if (!ws) return;
const unsub = ws.onReconnect(async () => {
try {
const [issuesRes, inboxItems, agents] = await Promise.all([
api.listIssues({ limit: 200 }),
api.listInbox(),
api.listAgents(),
]);
useIssueStore.getState().setIssues(issuesRes.issues);
useInboxStore.getState().setItems(inboxItems);
useAgentStore.getState().setAgents(agents);
} catch {
// Silently fail; next reconnect will retry
}
});
return unsub;
}, [ws]);
}

View file

@ -1,4 +1,3 @@
export { useIssues } from "./use-issues.js";
export { useAgents } from "./use-agents.js";
export { useInbox } from "./use-inbox.js";
export { useRealtime } from "./use-realtime.js";

View file

@ -1,37 +0,0 @@
import { useEffect } from "react";
import type { WSClient } from "@multica/sdk";
import { useIssueStore } from "@multica/store";
import { useInboxStore } from "@multica/store";
import type { IssueCreatedPayload, IssueUpdatedPayload, IssueDeletedPayload, InboxNewPayload } from "@multica/types";
export function useRealtime(ws: WSClient) {
const { addIssue, updateIssue, removeIssue } = useIssueStore();
const { addItem } = useInboxStore();
useEffect(() => {
const unsubscribers = [
ws.on("issue:created", (payload) => {
const { issue } = payload as IssueCreatedPayload;
addIssue(issue);
}),
ws.on("issue:updated", (payload) => {
const { issue } = payload as IssueUpdatedPayload;
updateIssue(issue.id, issue);
}),
ws.on("issue:deleted", (payload) => {
const { issue_id } = payload as IssueDeletedPayload;
removeIssue(issue_id);
}),
ws.on("inbox:new", (payload) => {
const { item } = payload as InboxNewPayload;
addItem(item);
}),
];
return () => {
for (const unsub of unsubscribers) {
unsub();
}
};
}, [ws, addIssue, updateIssue, removeIssue, addItem]);
}

View file

@ -4,19 +4,43 @@ type EventHandler = (payload: unknown) => void;
export class WSClient {
private ws: WebSocket | null = null;
private url: string;
private baseUrl: string;
private token: string | null = null;
private workspaceId: string | null = null;
private handlers = new Map<WSEventType, Set<EventHandler>>();
private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
private hasConnectedBefore = false;
private onReconnectCallbacks = new Set<() => void>();
constructor(url: string) {
this.url = url;
this.baseUrl = url;
}
setAuth(token: string, workspaceId: string) {
this.token = token;
this.workspaceId = workspaceId;
}
connect() {
this.ws = new WebSocket(this.url);
const url = new URL(this.baseUrl);
if (this.token) url.searchParams.set("token", this.token);
if (this.workspaceId)
url.searchParams.set("workspace_id", this.workspaceId);
this.ws = new WebSocket(url.toString());
this.ws.onopen = () => {
console.log("[ws] connected");
if (this.hasConnectedBefore) {
for (const cb of this.onReconnectCallbacks) {
try {
cb();
} catch {
// ignore reconnect callback errors
}
}
}
this.hasConnectedBefore = true;
};
this.ws.onmessage = (event) => {
@ -42,9 +66,11 @@ export class WSClient {
disconnect() {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
this.ws?.close();
this.ws = null;
this.hasConnectedBefore = false;
}
on(event: WSEventType, handler: EventHandler) {
@ -57,6 +83,13 @@ export class WSClient {
};
}
onReconnect(callback: () => void) {
this.onReconnectCallbacks.add(callback);
return () => {
this.onReconnectCallbacks.delete(callback);
};
}
send(message: WSMessage) {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(message));

View file

@ -13,7 +13,12 @@ interface InboxState {
export const useInboxStore = create<InboxState>((set, get) => ({
items: [],
setItems: (items) => set({ items }),
addItem: (item) => set((s) => ({ items: [item, ...s.items] })),
addItem: (item) =>
set((s) => ({
items: s.items.some((i) => i.id === item.id)
? s.items
: [item, ...s.items],
})),
markRead: (id) =>
set((s) => ({
items: s.items.map((i) => (i.id === id ? { ...i, read: true } : i)),

View file

@ -15,7 +15,12 @@ export const useIssueStore = create<IssueState>((set) => ({
issues: [],
activeIssueId: null,
setIssues: (issues) => set({ issues }),
addIssue: (issue) => set((s) => ({ issues: [...s.issues, issue] })),
addIssue: (issue) =>
set((s) => ({
issues: s.issues.some((i) => i.id === issue.id)
? s.issues
: [...s.issues, issue],
})),
updateIssue: (id, updates) =>
set((s) => ({
issues: s.issues.map((i) => (i.id === id ? { ...i, ...updates } : i)),

View file

@ -2,6 +2,7 @@ import type { Issue } from "./issue.js";
import type { Agent } from "./agent.js";
import type { InboxItem } from "./inbox.js";
import type { Comment } from "./comment.js";
import type { Workspace, MemberWithUser } from "./workspace.js";
// WebSocket event types (matching Go server protocol/events.go)
export type WSEventType =
@ -12,11 +13,18 @@ export type WSEventType =
| "comment:updated"
| "comment:deleted"
| "agent:status"
| "agent:created"
| "agent:deleted"
| "task:dispatch"
| "task:progress"
| "task:completed"
| "task:failed"
| "inbox:new"
| "inbox:read"
| "inbox:archived"
| "workspace:updated"
| "member:added"
| "member:removed"
| "daemon:heartbeat"
| "daemon:register";
@ -41,10 +49,29 @@ export interface AgentStatusPayload {
agent: Agent;
}
export interface AgentCreatedPayload {
agent: Agent;
}
export interface AgentDeletedPayload {
agent_id: string;
workspace_id: string;
}
export interface InboxNewPayload {
item: InboxItem;
}
export interface InboxReadPayload {
item_id: string;
recipient_id: string;
}
export interface InboxArchivedPayload {
item_id: string;
recipient_id: string;
}
export interface CommentCreatedPayload {
comment: Comment;
}
@ -57,3 +84,18 @@ export interface CommentDeletedPayload {
comment_id: string;
issue_id: string;
}
export interface WorkspaceUpdatedPayload {
workspace: Workspace;
}
export interface MemberAddedPayload {
member: MemberWithUser;
workspace_id: string;
}
export interface MemberRemovedPayload {
member_id: string;
user_id: string;
workspace_id: string;
}

View file

@ -0,0 +1,55 @@
package main
import (
"encoding/json"
"log"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/realtime"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// registerListeners wires up event bus listeners for WS broadcasting.
func registerListeners(bus *events.Bus, hub *realtime.Hub) {
allEvents := []string{
protocol.EventIssueCreated,
protocol.EventIssueUpdated,
protocol.EventIssueDeleted,
protocol.EventCommentCreated,
protocol.EventCommentUpdated,
protocol.EventCommentDeleted,
protocol.EventAgentStatus,
protocol.EventAgentCreated,
protocol.EventAgentDeleted,
protocol.EventTaskDispatch,
protocol.EventTaskProgress,
protocol.EventTaskCompleted,
protocol.EventTaskFailed,
protocol.EventInboxNew,
protocol.EventInboxRead,
protocol.EventInboxArchived,
protocol.EventWorkspaceUpdated,
protocol.EventMemberAdded,
protocol.EventMemberRemoved,
}
for _, et := range allEvents {
eventType := et
bus.Subscribe(eventType, func(e events.Event) {
msg := map[string]any{
"type": eventType,
"payload": e.Payload,
}
data, err := json.Marshal(msg)
if err != nil {
log.Printf("[listeners] failed to marshal %s event: %v", eventType, err)
return
}
if e.WorkspaceID != "" {
hub.BroadcastToWorkspace(e.WorkspaceID, data)
} else {
hub.Broadcast(data)
}
})
}
}

View file

@ -11,6 +11,7 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/realtime"
)
@ -38,10 +39,12 @@ func main() {
}
log.Println("Connected to database")
bus := events.New()
hub := realtime.NewHub()
go hub.Run()
registerListeners(bus, hub)
r := NewRouter(pool, hub)
r := NewRouter(pool, hub, bus)
srv := &http.Server{
Addr: ":" + port,

View file

@ -1,6 +1,7 @@
package main
import (
"context"
"net/http"
"os"
"strings"
@ -8,8 +9,10 @@ import (
"github.com/go-chi/chi/v5"
chimw "github.com/go-chi/chi/v5/middleware"
"github.com/go-chi/cors"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/handler"
"github.com/multica-ai/multica/server/internal/middleware"
"github.com/multica-ai/multica/server/internal/realtime"
@ -40,9 +43,9 @@ func allowedOrigins() []string {
}
// NewRouter creates the fully-configured Chi router with all middleware and routes.
func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub) chi.Router {
func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Router {
queries := db.New(pool)
h := handler.New(queries, pool, hub)
h := handler.New(queries, pool, hub, bus)
r := chi.NewRouter()
@ -65,8 +68,9 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub) chi.Router {
})
// WebSocket
mc := &membershipChecker{queries: queries}
r.Get("/ws", func(w http.ResponseWriter, r *http.Request) {
realtime.HandleWebSocket(hub, w, r)
realtime.HandleWebSocket(hub, mc, w, r)
})
// Auth (public)
@ -164,3 +168,24 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub) chi.Router {
return r
}
// membershipChecker implements realtime.MembershipChecker using database queries.
type membershipChecker struct {
queries *db.Queries
}
func (mc *membershipChecker) IsMember(ctx context.Context, userID, workspaceID string) bool {
_, err := mc.queries.GetMemberByUserAndWorkspace(ctx, db.GetMemberByUserAndWorkspaceParams{
UserID: parseUUID(userID),
WorkspaceID: parseUUID(workspaceID),
})
return err == nil
}
func parseUUID(s string) pgtype.UUID {
var u pgtype.UUID
if err := u.Scan(s); err != nil {
return pgtype.UUID{}
}
return u
}

View file

@ -0,0 +1,59 @@
package events
import (
"log"
"sync"
)
// Event represents a domain event published by handlers or services.
type Event struct {
Type string // e.g. "issue:created", "inbox:new"
WorkspaceID string // routes to correct Hub room
ActorType string // "member", "agent", or "system"
ActorID string
Payload any // JSON-serializable, same shape as current WS payloads
}
// Handler is a function that processes an event.
type Handler func(Event)
// Bus is an in-process synchronous pub/sub event bus.
type Bus struct {
mu sync.RWMutex
listeners map[string][]Handler
}
// New creates a new event bus.
func New() *Bus {
return &Bus{
listeners: make(map[string][]Handler),
}
}
// Subscribe registers a handler for a given event type.
// Handlers are called synchronously in registration order.
func (b *Bus) Subscribe(eventType string, h Handler) {
b.mu.Lock()
defer b.mu.Unlock()
b.listeners[eventType] = append(b.listeners[eventType], h)
}
// Publish dispatches an event to all registered handlers for that event type.
// Each handler is called synchronously. Panics in individual handlers are
// recovered so one failing handler does not prevent others from executing.
func (b *Bus) Publish(e Event) {
b.mu.RLock()
handlers := b.listeners[e.Type]
b.mu.RUnlock()
for _, h := range handlers {
func() {
defer func() {
if r := recover(); r != nil {
log.Printf("[event-bus] panic in listener for %q: %v", e.Type, r)
}
}()
h(e)
}()
}
}

View file

@ -0,0 +1,90 @@
package events
import (
"sync/atomic"
"testing"
)
func TestPublishDeliversToSubscribers(t *testing.T) {
bus := New()
var count int32
bus.Subscribe("test:event", func(e Event) {
atomic.AddInt32(&count, 1)
})
bus.Subscribe("test:event", func(e Event) {
atomic.AddInt32(&count, 1)
})
bus.Publish(Event{Type: "test:event", Payload: "hello"})
if count != 2 {
t.Errorf("expected 2 handlers called, got %d", count)
}
}
func TestPublishOnlyMatchingType(t *testing.T) {
bus := New()
var called bool
bus.Subscribe("type:a", func(e Event) {
called = true
})
bus.Publish(Event{Type: "type:b"})
if called {
t.Error("handler for type:a should not be called for type:b event")
}
}
func TestPublishNoSubscribersIsNoop(t *testing.T) {
bus := New()
// Should not panic
bus.Publish(Event{Type: "no:listeners"})
}
func TestPanicInHandlerDoesNotBreakOthers(t *testing.T) {
bus := New()
var secondCalled bool
bus.Subscribe("test:panic", func(e Event) {
panic("handler panic")
})
bus.Subscribe("test:panic", func(e Event) {
secondCalled = true
})
bus.Publish(Event{Type: "test:panic"})
if !secondCalled {
t.Error("second handler should still be called after first panics")
}
}
func TestEventFieldsPassedThrough(t *testing.T) {
bus := New()
var received Event
bus.Subscribe("test:fields", func(e Event) {
received = e
})
bus.Publish(Event{
Type: "test:fields",
WorkspaceID: "ws-123",
ActorType: "member",
ActorID: "user-456",
Payload: map[string]string{"key": "value"},
})
if received.WorkspaceID != "ws-123" {
t.Errorf("expected WorkspaceID ws-123, got %s", received.WorkspaceID)
}
if received.ActorType != "member" {
t.Errorf("expected ActorType member, got %s", received.ActorType)
}
if received.ActorID != "user-456" {
t.Errorf("expected ActorID user-456, got %s", received.ActorID)
}
}

View file

@ -10,6 +10,7 @@ import (
"github.com/go-chi/chi/v5"
"github.com/jackc/pgx/v5/pgtype"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
type AgentResponse struct {
@ -246,7 +247,9 @@ func (h *Handler) CreateAgent(w http.ResponseWriter, r *http.Request) {
// Best-effort: create an initialization issue assigned to the new agent.
h.createAgentInitIssue(r.Context(), agent, parseUUID(ownerID))
writeJSON(w, http.StatusCreated, agentToResponse(agent))
resp := agentToResponse(agent)
h.publish(protocol.EventAgentCreated, workspaceID, "member", ownerID, map[string]any{"agent": resp})
writeJSON(w, http.StatusCreated, resp)
}
// createAgentInitIssue creates an initialization issue assigned to a newly created agent.
@ -283,7 +286,7 @@ func (h *Handler) createAgentInitIssue(ctx context.Context, agent db.Agent, crea
return
}
h.broadcast("issue:created", map[string]any{"issue": issueToResponse(issue)})
h.publish(protocol.EventIssueCreated, uuidToString(agent.WorkspaceID), "system", "", map[string]any{"issue": issueToResponse(issue)})
// Enqueue the task directly — we know the agent is assigned and status is "todo".
if _, err := h.TaskService.EnqueueTaskForIssue(ctx, issue); err != nil {
@ -377,17 +380,27 @@ func (h *Handler) UpdateAgent(w http.ResponseWriter, r *http.Request) {
}
resp := agentToResponse(agent)
h.broadcast("agent:status", map[string]any{"agent": resp})
userID := requestUserID(r)
h.publish(protocol.EventAgentStatus, uuidToString(agent.WorkspaceID), "member", userID, map[string]any{"agent": resp})
writeJSON(w, http.StatusOK, resp)
}
func (h *Handler) DeleteAgent(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
agent, ok := h.loadAgentForUser(w, r, id)
if !ok {
return
}
wsID := uuidToString(agent.WorkspaceID)
err := h.Queries.DeleteAgent(r.Context(), parseUUID(id))
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to delete agent")
return
}
userID := requestUserID(r)
h.publish(protocol.EventAgentDeleted, wsID, "member", userID, map[string]any{"agent_id": id, "workspace_id": wsID})
w.WriteHeader(http.StatusNoContent)
}

View file

@ -6,6 +6,7 @@ import (
"github.com/go-chi/chi/v5"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
type CommentResponse struct {
@ -97,7 +98,7 @@ func (h *Handler) CreateComment(w http.ResponseWriter, r *http.Request) {
}
resp := commentToResponse(comment)
h.broadcast("comment:created", map[string]any{"comment": resp})
h.publish(protocol.EventCommentCreated, uuidToString(issue.WorkspaceID), "member", userID, map[string]any{"comment": resp})
writeJSON(w, http.StatusCreated, resp)
}
@ -126,7 +127,12 @@ func (h *Handler) UpdateComment(w http.ResponseWriter, r *http.Request) {
}
resp := commentToResponse(comment)
h.broadcast("comment:updated", map[string]any{"comment": resp})
userID := requestUserID(r)
workspaceID := ""
if issue, err := h.Queries.GetIssue(r.Context(), comment.IssueID); err == nil {
workspaceID = uuidToString(issue.WorkspaceID)
}
h.publish(protocol.EventCommentUpdated, workspaceID, "member", userID, map[string]any{"comment": resp})
writeJSON(w, http.StatusOK, resp)
}
@ -145,7 +151,12 @@ func (h *Handler) DeleteComment(w http.ResponseWriter, r *http.Request) {
return
}
h.broadcast("comment:deleted", map[string]any{
userID := requestUserID(r)
workspaceID := ""
if issue, err := h.Queries.GetIssue(r.Context(), comment.IssueID); err == nil {
workspaceID = uuidToString(issue.WorkspaceID)
}
h.publish(protocol.EventCommentDeleted, workspaceID, "member", userID, map[string]any{
"comment_id": commentId,
"issue_id": uuidToString(comment.IssueID),
})

View file

@ -195,7 +195,16 @@ func (h *Handler) ReportTaskProgress(w http.ResponseWriter, r *http.Request) {
return
}
h.TaskService.ReportProgress(taskID, req.Summary, req.Step, req.Total)
// Look up task to get workspace ID via the associated issue.
workspaceID := ""
task, err := h.Queries.GetAgentTask(r.Context(), parseUUID(taskID))
if err == nil {
if issue, err := h.Queries.GetIssue(r.Context(), task.IssueID); err == nil {
workspaceID = uuidToString(issue.WorkspaceID)
}
}
h.TaskService.ReportProgress(r.Context(), taskID, workspaceID, req.Summary, req.Step, req.Total)
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}

View file

@ -4,13 +4,13 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgtype"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/realtime"
"github.com/multica-ai/multica/server/internal/service"
"github.com/multica-ai/multica/server/internal/util"
@ -31,10 +31,11 @@ type Handler struct {
DB dbExecutor
TxStarter txStarter
Hub *realtime.Hub
Bus *events.Bus
TaskService *service.TaskService
}
func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub) *Handler {
func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *events.Bus) *Handler {
var executor dbExecutor
if candidate, ok := txStarter.(dbExecutor); ok {
executor = candidate
@ -45,7 +46,8 @@ func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub) *Handler {
DB: executor,
TxStarter: txStarter,
Hub: hub,
TaskService: service.NewTaskService(queries, hub),
Bus: bus,
TaskService: service.NewTaskService(queries, hub, bus),
}
}
@ -69,18 +71,15 @@ func timestampToString(t pgtype.Timestamptz) string { return util.TimestampToStr
func timestampToPtr(t pgtype.Timestamptz) *string { return util.TimestampToPtr(t) }
func uuidToPtr(u pgtype.UUID) *string { return util.UUIDToPtr(u) }
// broadcast sends a WebSocket event to all connected clients.
func (h *Handler) broadcast(eventType string, payload any) {
msg := map[string]any{
"type": eventType,
"payload": payload,
}
data, err := json.Marshal(msg)
if err != nil {
fmt.Printf("broadcast marshal error: %v\n", err)
return
}
h.Hub.Broadcast(data)
// publish sends a domain event through the event bus.
func (h *Handler) publish(eventType, workspaceID, actorType, actorID string, payload any) {
h.Bus.Publish(events.Event{
Type: eventType,
WorkspaceID: workspaceID,
ActorType: actorType,
ActorID: actorID,
Payload: payload,
})
}
func isNotFound(err error) bool {

View file

@ -6,6 +6,7 @@ import (
"github.com/go-chi/chi/v5"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
type InboxItemResponse struct {
@ -88,6 +89,14 @@ func (h *Handler) MarkInboxRead(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusInternalServerError, "failed to mark read")
return
}
userID := requestUserID(r)
workspaceID := uuidToString(item.WorkspaceID)
h.publish(protocol.EventInboxRead, workspaceID, "member", userID, map[string]any{
"item_id": uuidToString(item.ID),
"recipient_id": uuidToString(item.RecipientID),
})
writeJSON(w, http.StatusOK, inboxToResponse(item))
}
@ -101,5 +110,13 @@ func (h *Handler) ArchiveInboxItem(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusInternalServerError, "failed to archive")
return
}
userID := requestUserID(r)
workspaceID := uuidToString(item.WorkspaceID)
h.publish(protocol.EventInboxArchived, workspaceID, "member", userID, map[string]any{
"item_id": uuidToString(item.ID),
"recipient_id": uuidToString(item.RecipientID),
})
writeJSON(w, http.StatusOK, inboxToResponse(item))
}

View file

@ -11,6 +11,7 @@ import (
"github.com/go-chi/chi/v5"
"github.com/jackc/pgx/v5/pgtype"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// IssueResponse is the JSON response for an issue.
@ -233,7 +234,7 @@ func (h *Handler) CreateIssue(w http.ResponseWriter, r *http.Request) {
}
resp := issueToResponse(issue)
h.broadcast("issue:created", map[string]any{"issue": resp})
h.publish(protocol.EventIssueCreated, workspaceID, "member", creatorID, map[string]any{"issue": resp})
// Create inbox notification for assignee
if issue.AssigneeType.Valid && issue.AssigneeID.Valid {
@ -248,7 +249,7 @@ func (h *Handler) CreateIssue(w http.ResponseWriter, r *http.Request) {
Body: ptrToText(req.Description),
})
if err == nil {
h.broadcast("inbox:new", map[string]any{"item": inboxToResponse(inboxItem)})
h.publish(protocol.EventInboxNew, workspaceID, "member", creatorID, map[string]any{"item": inboxToResponse(inboxItem)})
}
// Only ready issues in todo are enqueued for agents.
@ -279,6 +280,8 @@ func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) {
if !ok {
return
}
userID := requestUserID(r)
workspaceID := uuidToString(prevIssue.WorkspaceID)
// Read body as raw bytes so we can detect which fields were explicitly sent.
bodyBytes, err := io.ReadAll(r.Body)
@ -365,7 +368,7 @@ func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) {
}
resp := issueToResponse(issue)
h.broadcast("issue:updated", map[string]any{"issue": resp})
h.publish(protocol.EventIssueUpdated, workspaceID, "member", userID, map[string]any{"issue": resp})
assigneeChanged := (req.AssigneeType != nil || req.AssigneeID != nil) &&
(prevIssue.AssigneeType.String != issue.AssigneeType.String || uuidToString(prevIssue.AssigneeID) != uuidToString(issue.AssigneeID))
@ -394,7 +397,7 @@ func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) {
Title: "Assigned to you: " + issue.Title,
})
if err == nil {
h.broadcast("inbox:new", map[string]any{"item": inboxToResponse(inboxItem)})
h.publish(protocol.EventInboxNew, workspaceID, "member", userID, map[string]any{"item": inboxToResponse(inboxItem)})
}
}
}
@ -412,7 +415,7 @@ func (h *Handler) UpdateIssue(w http.ResponseWriter, r *http.Request) {
Title: issue.Title + " moved to " + *req.Status,
})
if err == nil {
h.broadcast("inbox:new", map[string]any{"item": inboxToResponse(inboxItem)})
h.publish(protocol.EventInboxNew, workspaceID, "member", userID, map[string]any{"item": inboxToResponse(inboxItem)})
}
}
}
@ -450,7 +453,8 @@ func (h *Handler) shouldEnqueueAgentTask(ctx context.Context, issue db.Issue) bo
func (h *Handler) DeleteIssue(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
if _, ok := h.loadIssueForUser(w, r, id); !ok {
issue, ok := h.loadIssueForUser(w, r, id)
if !ok {
return
}
@ -460,6 +464,7 @@ func (h *Handler) DeleteIssue(w http.ResponseWriter, r *http.Request) {
return
}
h.broadcast("issue:deleted", map[string]any{"issue_id": id})
userID := requestUserID(r)
h.publish(protocol.EventIssueDeleted, uuidToString(issue.WorkspaceID), "member", userID, map[string]any{"issue_id": id})
w.WriteHeader(http.StatusNoContent)
}

View file

@ -8,6 +8,7 @@ import (
"github.com/go-chi/chi/v5"
"github.com/jackc/pgx/v5/pgtype"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
type WorkspaceResponse struct {
@ -207,6 +208,9 @@ func (h *Handler) UpdateWorkspace(w http.ResponseWriter, r *http.Request) {
return
}
userID := requestUserID(r)
h.publish(protocol.EventWorkspaceUpdated, id, "member", userID, map[string]any{"workspace": workspaceToResponse(ws)})
writeJSON(w, http.StatusOK, workspaceToResponse(ws))
}
@ -355,6 +359,9 @@ func (h *Handler) CreateMember(w http.ResponseWriter, r *http.Request) {
return
}
userID := requestUserID(r)
h.publish(protocol.EventMemberAdded, workspaceID, "member", userID, map[string]any{"member": memberWithUserResponse(member, user)})
writeJSON(w, http.StatusCreated, memberWithUserResponse(member, user))
}
@ -463,6 +470,13 @@ func (h *Handler) DeleteMember(w http.ResponseWriter, r *http.Request) {
return
}
userID := requestUserID(r)
h.publish(protocol.EventMemberRemoved, workspaceID, "member", userID, map[string]any{
"member_id": uuidToString(target.ID),
"workspace_id": workspaceID,
"user_id": uuidToString(target.UserID),
})
w.WriteHeader(http.StatusNoContent)
}
@ -490,6 +504,13 @@ func (h *Handler) LeaveWorkspace(w http.ResponseWriter, r *http.Request) {
return
}
userID := requestUserID(r)
h.publish(protocol.EventMemberRemoved, workspaceID, "member", userID, map[string]any{
"member_id": uuidToString(member.ID),
"workspace_id": workspaceID,
"user_id": uuidToString(member.UserID),
})
w.WriteHeader(http.StatusNoContent)
}

View file

@ -1,13 +1,22 @@
package realtime
import (
"context"
"log"
"net/http"
"strings"
"sync"
"github.com/golang-jwt/jwt/v5"
"github.com/gorilla/websocket"
"github.com/multica-ai/multica/server/internal/auth"
)
// MembershipChecker verifies a user belongs to a workspace.
type MembershipChecker interface {
IsMember(ctx context.Context, userID, workspaceID string) bool
}
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
// TODO: Restrict origins in production
@ -15,17 +24,19 @@ var upgrader = websocket.Upgrader{
},
}
// Client represents a single WebSocket connection.
// Client represents a single WebSocket connection with identity.
type Client struct {
hub *Hub
conn *websocket.Conn
send chan []byte
hub *Hub
conn *websocket.Conn
send chan []byte
userID string
workspaceID string
}
// Hub manages WebSocket connections and broadcasts.
// Hub manages WebSocket connections organized by workspace rooms.
type Hub struct {
clients map[*Client]bool
broadcast chan []byte
rooms map[string]map[*Client]bool // workspaceID -> clients
broadcast chan []byte // global broadcast (daemon events)
register chan *Client
unregister chan *Client
mu sync.RWMutex
@ -34,7 +45,7 @@ type Hub struct {
// NewHub creates a new Hub instance.
func NewHub() *Hub {
return &Hub{
clients: make(map[*Client]bool),
rooms: make(map[string]map[*Client]bool),
broadcast: make(chan []byte),
register: make(chan *Client),
unregister: make(chan *Client),
@ -47,27 +58,48 @@ func (h *Hub) Run() {
select {
case client := <-h.register:
h.mu.Lock()
h.clients[client] = true
room := client.workspaceID
if h.rooms[room] == nil {
h.rooms[room] = make(map[*Client]bool)
}
h.rooms[room][client] = true
total := 0
for _, r := range h.rooms {
total += len(r)
}
h.mu.Unlock()
log.Printf("Client connected. Total: %d", len(h.clients))
log.Printf("Client connected (workspace=%s). Total: %d", room, total)
case client := <-h.unregister:
h.mu.Lock()
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
room := client.workspaceID
if clients, ok := h.rooms[room]; ok {
if _, exists := clients[client]; exists {
delete(clients, client)
close(client.send)
if len(clients) == 0 {
delete(h.rooms, room)
}
}
}
total := 0
for _, r := range h.rooms {
total += len(r)
}
h.mu.Unlock()
log.Printf("Client disconnected. Total: %d", len(h.clients))
log.Printf("Client disconnected (workspace=%s). Total: %d", room, total)
case message := <-h.broadcast:
// Global broadcast for daemon events (no workspace filtering)
h.mu.RLock()
for client := range h.clients {
select {
case client.send <- message:
default:
close(client.send)
delete(h.clients, client)
for _, clients := range h.rooms {
for client := range clients {
select {
case client.send <- message:
default:
close(client.send)
delete(clients, client)
}
}
}
h.mu.RUnlock()
@ -75,13 +107,83 @@ func (h *Hub) Run() {
}
}
// Broadcast sends a message to all connected clients.
// BroadcastToWorkspace sends a message only to clients in the given workspace.
func (h *Hub) BroadcastToWorkspace(workspaceID string, message []byte) {
h.mu.RLock()
clients := h.rooms[workspaceID]
var slow []*Client
for client := range clients {
select {
case client.send <- message:
default:
slow = append(slow, client)
}
}
h.mu.RUnlock()
// Remove slow clients under write lock
if len(slow) > 0 {
h.mu.Lock()
for _, client := range slow {
if room, ok := h.rooms[workspaceID]; ok {
if _, exists := room[client]; exists {
delete(room, client)
close(client.send)
if len(room) == 0 {
delete(h.rooms, workspaceID)
}
}
}
}
h.mu.Unlock()
}
}
// Broadcast sends a message to all connected clients (used for daemon events).
func (h *Hub) Broadcast(message []byte) {
h.broadcast <- message
}
// HandleWebSocket upgrades an HTTP connection to WebSocket.
func HandleWebSocket(hub *Hub, w http.ResponseWriter, r *http.Request) {
// HandleWebSocket upgrades an HTTP connection to WebSocket with JWT auth.
func HandleWebSocket(hub *Hub, mc MembershipChecker, w http.ResponseWriter, r *http.Request) {
tokenStr := r.URL.Query().Get("token")
workspaceID := r.URL.Query().Get("workspace_id")
if tokenStr == "" || workspaceID == "" {
http.Error(w, `{"error":"token and workspace_id required"}`, http.StatusUnauthorized)
return
}
// Validate JWT
token, err := jwt.Parse(tokenStr, func(token *jwt.Token) (any, error) {
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, jwt.ErrSignatureInvalid
}
return auth.JWTSecret(), nil
})
if err != nil || !token.Valid {
http.Error(w, `{"error":"invalid token"}`, http.StatusUnauthorized)
return
}
claims, ok := token.Claims.(jwt.MapClaims)
if !ok {
http.Error(w, `{"error":"invalid claims"}`, http.StatusUnauthorized)
return
}
userID, ok := claims["sub"].(string)
if !ok || strings.TrimSpace(userID) == "" {
http.Error(w, `{"error":"invalid claims"}`, http.StatusUnauthorized)
return
}
// Verify user is a member of the workspace
if !mc.IsMember(r.Context(), userID, workspaceID) {
http.Error(w, `{"error":"not a member of this workspace"}`, http.StatusForbidden)
return
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("WebSocket upgrade error: %v", err)
@ -89,9 +191,11 @@ func HandleWebSocket(hub *Hub, w http.ResponseWriter, r *http.Request) {
}
client := &Client{
hub: hub,
conn: conn,
send: make(chan []byte, 256),
hub: hub,
conn: conn,
send: make(chan []byte, 256),
userID: userID,
workspaceID: workspaceID,
}
hub.register <- client
@ -113,8 +217,8 @@ func (c *Client) readPump() {
}
break
}
// TODO: Route messages to appropriate handlers
log.Printf("Received message: %s", message)
// TODO: Route inbound messages to appropriate handlers
log.Printf("Received message from user=%s workspace=%s: %s", c.userID, c.workspaceID, message)
}
}

View file

@ -8,6 +8,7 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/realtime"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
@ -17,10 +18,11 @@ import (
type TaskService struct {
Queries *db.Queries
Hub *realtime.Hub
Bus *events.Bus
}
func NewTaskService(q *db.Queries, hub *realtime.Hub) *TaskService {
return &TaskService{Queries: q, Hub: hub}
func NewTaskService(q *db.Queries, hub *realtime.Hub, bus *events.Bus) *TaskService {
return &TaskService{Queries: q, Hub: hub, Bus: bus}
}
// EnqueueTaskForIssue creates a task with a context snapshot of the issue.
@ -98,7 +100,7 @@ func (s *TaskService) ClaimTask(ctx context.Context, agentID pgtype.UUID) (*db.A
s.updateAgentStatus(ctx, agentID, "working")
// Broadcast task:dispatch
s.broadcastTaskDispatch(task)
s.broadcastTaskDispatch(ctx, task)
return &task, nil
}
@ -184,7 +186,7 @@ func (s *TaskService) CompleteTask(ctx context.Context, taskID pgtype.UUID, resu
s.ReconcileAgentStatus(ctx, task.AgentID)
// Broadcast
s.broadcastTaskEvent(protocol.EventTaskCompleted, task)
s.broadcastTaskEvent(ctx, protocol.EventTaskCompleted, task)
return &task, nil
}
@ -218,18 +220,24 @@ func (s *TaskService) FailTask(ctx context.Context, taskID pgtype.UUID, errMsg s
s.ReconcileAgentStatus(ctx, task.AgentID)
// Broadcast
s.broadcastTaskEvent(protocol.EventTaskFailed, task)
s.broadcastTaskEvent(ctx, protocol.EventTaskFailed, task)
return &task, nil
}
// ReportProgress broadcasts a progress update via WebSocket.
func (s *TaskService) ReportProgress(taskID string, summary string, step, total int) {
s.broadcast(protocol.EventTaskProgress, protocol.TaskProgressPayload{
TaskID: taskID,
Summary: summary,
Step: step,
Total: total,
// ReportProgress broadcasts a progress update via the event bus.
func (s *TaskService) ReportProgress(ctx context.Context, taskID string, workspaceID string, summary string, step, total int) {
s.Bus.Publish(events.Event{
Type: protocol.EventTaskProgress,
WorkspaceID: workspaceID,
ActorType: "system",
ActorID: "",
Payload: protocol.TaskProgressPayload{
TaskID: taskID,
Summary: summary,
Step: step,
Total: total,
},
})
}
@ -254,7 +262,13 @@ func (s *TaskService) updateAgentStatus(ctx context.Context, agentID pgtype.UUID
if err != nil {
return
}
s.broadcast(protocol.EventAgentStatus, map[string]any{"agent": agentToMap(agent)})
s.Bus.Publish(events.Event{
Type: protocol.EventAgentStatus,
WorkspaceID: util.UUIDToString(agent.WorkspaceID),
ActorType: "system",
ActorID: "",
Payload: map[string]any{"agent": agentToMap(agent)},
})
}
func buildContextSnapshot(issue db.Issue, agent db.Agent, runtime db.AgentRuntime, workspaceContext string) map[string]any {
@ -319,7 +333,7 @@ func priorityToInt(p string) int32 {
}
}
func (s *TaskService) broadcastTaskDispatch(task db.AgentTaskQueue) {
func (s *TaskService) broadcastTaskDispatch(ctx context.Context, task db.AgentTaskQueue) {
var payload map[string]any
if task.Context != nil {
json.Unmarshal(task.Context, &payload)
@ -329,33 +343,46 @@ func (s *TaskService) broadcastTaskDispatch(task db.AgentTaskQueue) {
}
payload["task_id"] = util.UUIDToString(task.ID)
payload["runtime_id"] = util.UUIDToString(task.RuntimeID)
s.broadcast(protocol.EventTaskDispatch, payload)
}
func (s *TaskService) broadcastTaskEvent(eventType string, task db.AgentTaskQueue) {
s.broadcast(eventType, map[string]any{
"task_id": util.UUIDToString(task.ID),
"agent_id": util.UUIDToString(task.AgentID),
"issue_id": util.UUIDToString(task.IssueID),
"status": task.Status,
workspaceID := ""
if issue, err := s.Queries.GetIssue(ctx, task.IssueID); err == nil {
workspaceID = util.UUIDToString(issue.WorkspaceID)
}
s.Bus.Publish(events.Event{
Type: protocol.EventTaskDispatch,
WorkspaceID: workspaceID,
ActorType: "system",
ActorID: "",
Payload: payload,
})
}
func (s *TaskService) broadcast(eventType string, payload any) {
msg := map[string]any{
"type": eventType,
"payload": payload,
func (s *TaskService) broadcastTaskEvent(ctx context.Context, eventType string, task db.AgentTaskQueue) {
workspaceID := ""
if issue, err := s.Queries.GetIssue(ctx, task.IssueID); err == nil {
workspaceID = util.UUIDToString(issue.WorkspaceID)
}
data, err := json.Marshal(msg)
if err != nil {
return
}
s.Hub.Broadcast(data)
s.Bus.Publish(events.Event{
Type: eventType,
WorkspaceID: workspaceID,
ActorType: "system",
ActorID: "",
Payload: map[string]any{
"task_id": util.UUIDToString(task.ID),
"agent_id": util.UUIDToString(task.AgentID),
"issue_id": util.UUIDToString(task.IssueID),
"status": task.Status,
},
})
}
func (s *TaskService) broadcastIssueUpdated(issue db.Issue) {
s.broadcast(protocol.EventIssueUpdated, map[string]any{
"issue": issueToMap(issue),
s.Bus.Publish(events.Event{
Type: protocol.EventIssueUpdated,
WorkspaceID: util.UUIDToString(issue.WorkspaceID),
ActorType: "system",
ActorID: "",
Payload: map[string]any{"issue": issueToMap(issue)},
})
}
@ -389,8 +416,12 @@ func (s *TaskService) createInboxForIssueCreator(ctx context.Context, issue db.I
if err != nil {
return
}
s.broadcast(protocol.EventInboxNew, map[string]any{
"item": inboxToMap(item),
s.Bus.Publish(events.Event{
Type: protocol.EventInboxNew,
WorkspaceID: util.UUIDToString(issue.WorkspaceID),
ActorType: "system",
ActorID: "",
Payload: map[string]any{"item": inboxToMap(item)},
})
}

View file

@ -7,8 +7,15 @@ const (
EventIssueUpdated = "issue:updated"
EventIssueDeleted = "issue:deleted"
// Comment events
EventCommentCreated = "comment:created"
EventCommentUpdated = "comment:updated"
EventCommentDeleted = "comment:deleted"
// Agent events
EventAgentStatus = "agent:status"
EventAgentStatus = "agent:status"
EventAgentCreated = "agent:created"
EventAgentDeleted = "agent:deleted"
// Task events (server <-> daemon)
EventTaskDispatch = "task:dispatch"
@ -17,7 +24,16 @@ const (
EventTaskFailed = "task:failed"
// Inbox events
EventInboxNew = "inbox:new"
EventInboxNew = "inbox:new"
EventInboxRead = "inbox:read"
EventInboxArchived = "inbox:archived"
// Workspace events
EventWorkspaceUpdated = "workspace:updated"
// Member events
EventMemberAdded = "member:added"
EventMemberRemoved = "member:removed"
// Daemon events
EventDaemonHeartbeat = "daemon:heartbeat"