feat(realtime): route personal events to target user only
Inbox events (new, read, archived, batch) are now sent via SendToUser instead of broadcasting to the entire workspace room. Adds a new Hub.SendToUser method. Also guards task broadcasts against deleted issues to prevent global event leaks. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
4126073229
commit
4914f1d5dd
3 changed files with 152 additions and 3 deletions
|
|
@ -2,17 +2,105 @@ package main
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"strings"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/events"
|
||||
"github.com/multica-ai/multica/server/internal/handler"
|
||||
"github.com/multica-ai/multica/server/internal/realtime"
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
|
||||
// registerListeners wires up event bus listeners for WS broadcasting.
|
||||
// Uses SubscribeAll to automatically broadcast ALL events to WebSocket clients,
|
||||
// eliminating the need to maintain a manual event type list.
|
||||
// Personal events (inbox, invites) are sent only to the target user via
|
||||
// SendToUser. All other events are broadcast to the workspace room.
|
||||
func registerListeners(bus *events.Bus, hub *realtime.Hub) {
|
||||
// Personal events should NOT be broadcast to the whole workspace.
|
||||
personalEvents := map[string]bool{
|
||||
protocol.EventInboxNew: true,
|
||||
protocol.EventInboxRead: true,
|
||||
protocol.EventInboxArchived: true,
|
||||
protocol.EventInboxBatchRead: true,
|
||||
protocol.EventInboxBatchArchived: true,
|
||||
}
|
||||
|
||||
// Helper: marshal event and send to a specific user.
|
||||
sendToRecipient := func(hub *realtime.Hub, e events.Event, recipientID string) {
|
||||
if recipientID == "" {
|
||||
return
|
||||
}
|
||||
data, err := json.Marshal(map[string]any{"type": e.Type, "payload": e.Payload})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
hub.SendToUser(recipientID, data)
|
||||
}
|
||||
|
||||
// inbox:new — extract recipient from nested item
|
||||
bus.Subscribe(protocol.EventInboxNew, func(e events.Event) {
|
||||
payload, ok := e.Payload.(map[string]any)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
item, ok := payload["item"].(map[string]any)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
recipientID, _ := item["recipient_id"].(string)
|
||||
sendToRecipient(hub, e, recipientID)
|
||||
})
|
||||
|
||||
// inbox:read, inbox:archived, inbox:batch-read, inbox:batch-archived
|
||||
// — extract recipient from top-level payload
|
||||
for _, eventType := range []string{
|
||||
protocol.EventInboxRead, protocol.EventInboxArchived,
|
||||
protocol.EventInboxBatchRead, protocol.EventInboxBatchArchived,
|
||||
} {
|
||||
bus.Subscribe(eventType, func(e events.Event) {
|
||||
payload, ok := e.Payload.(map[string]any)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
recipientID, _ := payload["recipient_id"].(string)
|
||||
sendToRecipient(hub, e, recipientID)
|
||||
})
|
||||
}
|
||||
|
||||
// member:added — also send to the invited user so they discover the new workspace.
|
||||
// Pass excludeWorkspace so clients already in the target room (reached via
|
||||
// BroadcastToWorkspace in SubscribeAll) don't receive the event twice.
|
||||
bus.Subscribe(protocol.EventMemberAdded, func(e events.Event) {
|
||||
payload, ok := e.Payload.(map[string]any)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
var userID string
|
||||
switch m := payload["member"].(type) {
|
||||
case handler.MemberWithUserResponse:
|
||||
userID = m.UserID
|
||||
case map[string]any:
|
||||
userID, _ = m["user_id"].(string)
|
||||
default:
|
||||
slog.Warn("member:added: unexpected member payload type", "type", fmt.Sprintf("%T", payload["member"]))
|
||||
}
|
||||
if userID == "" {
|
||||
return
|
||||
}
|
||||
data, err := json.Marshal(map[string]any{"type": e.Type, "payload": e.Payload})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
hub.SendToUser(userID, data, e.WorkspaceID)
|
||||
})
|
||||
|
||||
// SubscribeAll handles workspace-broadcast for non-personal events.
|
||||
bus.SubscribeAll(func(e events.Event) {
|
||||
// Skip personal events — they are handled by type-specific listeners above.
|
||||
if personalEvents[e.Type] {
|
||||
return
|
||||
}
|
||||
|
||||
msg := map[string]any{
|
||||
"type": e.Type,
|
||||
"payload": e.Payload,
|
||||
|
|
@ -24,8 +112,9 @@ func registerListeners(bus *events.Bus, hub *realtime.Hub) {
|
|||
}
|
||||
if e.WorkspaceID != "" {
|
||||
hub.BroadcastToWorkspace(e.WorkspaceID, data)
|
||||
} else {
|
||||
} else if strings.HasPrefix(e.Type, "daemon:") {
|
||||
hub.Broadcast(data)
|
||||
}
|
||||
// Otherwise drop — no global broadcast for non-daemon events without a workspace.
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -155,6 +155,60 @@ func (h *Hub) BroadcastToWorkspace(workspaceID string, message []byte) {
|
|||
}
|
||||
}
|
||||
|
||||
// SendToUser sends a message to all connections belonging to a specific user,
|
||||
// regardless of which workspace room they are in. Connections in excludeWorkspace
|
||||
// are skipped (they already receive the message via BroadcastToWorkspace).
|
||||
func (h *Hub) SendToUser(userID string, message []byte, excludeWorkspace ...string) {
|
||||
exclude := ""
|
||||
if len(excludeWorkspace) > 0 {
|
||||
exclude = excludeWorkspace[0]
|
||||
}
|
||||
|
||||
h.mu.RLock()
|
||||
type target struct {
|
||||
client *Client
|
||||
workspaceID string
|
||||
}
|
||||
var targets []target
|
||||
for wsID, clients := range h.rooms {
|
||||
if wsID == exclude {
|
||||
continue
|
||||
}
|
||||
for client := range clients {
|
||||
if client.userID == userID {
|
||||
targets = append(targets, target{client, wsID})
|
||||
}
|
||||
}
|
||||
}
|
||||
h.mu.RUnlock()
|
||||
|
||||
var slow []target
|
||||
for _, t := range targets {
|
||||
select {
|
||||
case t.client.send <- message:
|
||||
default:
|
||||
slow = append(slow, t)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove slow clients under write lock (same pattern as BroadcastToWorkspace)
|
||||
if len(slow) > 0 {
|
||||
h.mu.Lock()
|
||||
for _, t := range slow {
|
||||
if room, ok := h.rooms[t.workspaceID]; ok {
|
||||
if _, exists := room[t.client]; exists {
|
||||
delete(room, t.client)
|
||||
close(t.client.send)
|
||||
if len(room) == 0 {
|
||||
delete(h.rooms, t.workspaceID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
h.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast sends a message to all connected clients (used for daemon events).
|
||||
func (h *Hub) Broadcast(message []byte) {
|
||||
h.broadcast <- message
|
||||
|
|
|
|||
|
|
@ -332,6 +332,9 @@ func (s *TaskService) broadcastTaskDispatch(ctx context.Context, task db.AgentTa
|
|||
if issue, err := s.Queries.GetIssue(ctx, task.IssueID); err == nil {
|
||||
workspaceID = util.UUIDToString(issue.WorkspaceID)
|
||||
}
|
||||
if workspaceID == "" {
|
||||
return // Issue deleted; skip broadcast to avoid global leak
|
||||
}
|
||||
s.Bus.Publish(events.Event{
|
||||
Type: protocol.EventTaskDispatch,
|
||||
WorkspaceID: workspaceID,
|
||||
|
|
@ -346,6 +349,9 @@ func (s *TaskService) broadcastTaskEvent(ctx context.Context, eventType string,
|
|||
if issue, err := s.Queries.GetIssue(ctx, task.IssueID); err == nil {
|
||||
workspaceID = util.UUIDToString(issue.WorkspaceID)
|
||||
}
|
||||
if workspaceID == "" {
|
||||
return // Issue deleted; skip broadcast to avoid global leak
|
||||
}
|
||||
s.Bus.Publish(events.Event{
|
||||
Type: eventType,
|
||||
WorkspaceID: workspaceID,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue