From 4914f1d5ddf1cb71090c0b050286c739cd6791fc Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Sun, 29 Mar 2026 17:42:50 +0800 Subject: [PATCH] feat(realtime): route personal events to target user only Inbox events (new, read, archived, batch) are now sent via SendToUser instead of broadcasting to the entire workspace room. Adds a new Hub.SendToUser method. Also guards task broadcasts against deleted issues to prevent global event leaks. Co-Authored-By: Claude Opus 4.6 (1M context) --- server/cmd/server/listeners.go | 95 +++++++++++++++++++++++++++++++-- server/internal/realtime/hub.go | 54 +++++++++++++++++++ server/internal/service/task.go | 6 +++ 3 files changed, 152 insertions(+), 3 deletions(-) diff --git a/server/cmd/server/listeners.go b/server/cmd/server/listeners.go index 4bee8856..3410e5b6 100644 --- a/server/cmd/server/listeners.go +++ b/server/cmd/server/listeners.go @@ -2,17 +2,105 @@ package main import ( "encoding/json" + "fmt" "log/slog" + "strings" "github.com/multica-ai/multica/server/internal/events" + "github.com/multica-ai/multica/server/internal/handler" "github.com/multica-ai/multica/server/internal/realtime" + "github.com/multica-ai/multica/server/pkg/protocol" ) // registerListeners wires up event bus listeners for WS broadcasting. -// Uses SubscribeAll to automatically broadcast ALL events to WebSocket clients, -// eliminating the need to maintain a manual event type list. +// Personal events (inbox, invites) are sent only to the target user via +// SendToUser. All other events are broadcast to the workspace room. func registerListeners(bus *events.Bus, hub *realtime.Hub) { + // Personal events should NOT be broadcast to the whole workspace. + personalEvents := map[string]bool{ + protocol.EventInboxNew: true, + protocol.EventInboxRead: true, + protocol.EventInboxArchived: true, + protocol.EventInboxBatchRead: true, + protocol.EventInboxBatchArchived: true, + } + + // Helper: marshal event and send to a specific user. + sendToRecipient := func(hub *realtime.Hub, e events.Event, recipientID string) { + if recipientID == "" { + return + } + data, err := json.Marshal(map[string]any{"type": e.Type, "payload": e.Payload}) + if err != nil { + return + } + hub.SendToUser(recipientID, data) + } + + // inbox:new — extract recipient from nested item + bus.Subscribe(protocol.EventInboxNew, func(e events.Event) { + payload, ok := e.Payload.(map[string]any) + if !ok { + return + } + item, ok := payload["item"].(map[string]any) + if !ok { + return + } + recipientID, _ := item["recipient_id"].(string) + sendToRecipient(hub, e, recipientID) + }) + + // inbox:read, inbox:archived, inbox:batch-read, inbox:batch-archived + // — extract recipient from top-level payload + for _, eventType := range []string{ + protocol.EventInboxRead, protocol.EventInboxArchived, + protocol.EventInboxBatchRead, protocol.EventInboxBatchArchived, + } { + bus.Subscribe(eventType, func(e events.Event) { + payload, ok := e.Payload.(map[string]any) + if !ok { + return + } + recipientID, _ := payload["recipient_id"].(string) + sendToRecipient(hub, e, recipientID) + }) + } + + // member:added — also send to the invited user so they discover the new workspace. + // Pass excludeWorkspace so clients already in the target room (reached via + // BroadcastToWorkspace in SubscribeAll) don't receive the event twice. + bus.Subscribe(protocol.EventMemberAdded, func(e events.Event) { + payload, ok := e.Payload.(map[string]any) + if !ok { + return + } + var userID string + switch m := payload["member"].(type) { + case handler.MemberWithUserResponse: + userID = m.UserID + case map[string]any: + userID, _ = m["user_id"].(string) + default: + slog.Warn("member:added: unexpected member payload type", "type", fmt.Sprintf("%T", payload["member"])) + } + if userID == "" { + return + } + data, err := json.Marshal(map[string]any{"type": e.Type, "payload": e.Payload}) + if err != nil { + return + } + hub.SendToUser(userID, data, e.WorkspaceID) + }) + + // SubscribeAll handles workspace-broadcast for non-personal events. bus.SubscribeAll(func(e events.Event) { + // Skip personal events — they are handled by type-specific listeners above. + if personalEvents[e.Type] { + return + } + msg := map[string]any{ "type": e.Type, "payload": e.Payload, @@ -24,8 +112,9 @@ func registerListeners(bus *events.Bus, hub *realtime.Hub) { } if e.WorkspaceID != "" { hub.BroadcastToWorkspace(e.WorkspaceID, data) - } else { + } else if strings.HasPrefix(e.Type, "daemon:") { hub.Broadcast(data) } + // Otherwise drop — no global broadcast for non-daemon events without a workspace. }) } diff --git a/server/internal/realtime/hub.go b/server/internal/realtime/hub.go index 09ab4294..3098b7b1 100644 --- a/server/internal/realtime/hub.go +++ b/server/internal/realtime/hub.go @@ -155,6 +155,60 @@ func (h *Hub) BroadcastToWorkspace(workspaceID string, message []byte) { } } +// SendToUser sends a message to all connections belonging to a specific user, +// regardless of which workspace room they are in. Connections in excludeWorkspace +// are skipped (they already receive the message via BroadcastToWorkspace). +func (h *Hub) SendToUser(userID string, message []byte, excludeWorkspace ...string) { + exclude := "" + if len(excludeWorkspace) > 0 { + exclude = excludeWorkspace[0] + } + + h.mu.RLock() + type target struct { + client *Client + workspaceID string + } + var targets []target + for wsID, clients := range h.rooms { + if wsID == exclude { + continue + } + for client := range clients { + if client.userID == userID { + targets = append(targets, target{client, wsID}) + } + } + } + h.mu.RUnlock() + + var slow []target + for _, t := range targets { + select { + case t.client.send <- message: + default: + slow = append(slow, t) + } + } + + // Remove slow clients under write lock (same pattern as BroadcastToWorkspace) + if len(slow) > 0 { + h.mu.Lock() + for _, t := range slow { + if room, ok := h.rooms[t.workspaceID]; ok { + if _, exists := room[t.client]; exists { + delete(room, t.client) + close(t.client.send) + if len(room) == 0 { + delete(h.rooms, t.workspaceID) + } + } + } + } + h.mu.Unlock() + } +} + // Broadcast sends a message to all connected clients (used for daemon events). func (h *Hub) Broadcast(message []byte) { h.broadcast <- message diff --git a/server/internal/service/task.go b/server/internal/service/task.go index 7fe74d6d..314c54f9 100644 --- a/server/internal/service/task.go +++ b/server/internal/service/task.go @@ -332,6 +332,9 @@ func (s *TaskService) broadcastTaskDispatch(ctx context.Context, task db.AgentTa if issue, err := s.Queries.GetIssue(ctx, task.IssueID); err == nil { workspaceID = util.UUIDToString(issue.WorkspaceID) } + if workspaceID == "" { + return // Issue deleted; skip broadcast to avoid global leak + } s.Bus.Publish(events.Event{ Type: protocol.EventTaskDispatch, WorkspaceID: workspaceID, @@ -346,6 +349,9 @@ func (s *TaskService) broadcastTaskEvent(ctx context.Context, eventType string, if issue, err := s.Queries.GetIssue(ctx, task.IssueID); err == nil { workspaceID = util.UUIDToString(issue.WorkspaceID) } + if workspaceID == "" { + return // Issue deleted; skip broadcast to avoid global leak + } s.Bus.Publish(events.Event{ Type: eventType, WorkspaceID: workspaceID,