multica/server/internal/realtime/hub.go
Naiyuan Qing 9127e543d5 feat: add event bus, WS workspace isolation, and global store migration
- Add internal event bus (server/internal/events/) with synchronous
  pub/sub and panic isolation per listener
- Upgrade WebSocket Hub to workspace-scoped rooms with JWT auth
  and membership verification on connect
- Add 10 new WS event types (comment CRUD, inbox read/archive,
  agent create/delete, workspace/member events)
- Refactor all handlers and TaskService to publish events via Bus
  instead of direct Hub.Broadcast calls
- Add WS broadcast listener that routes events to correct workspace
- Frontend: WSClient sends token + workspace_id on connect with
  auto-reconnect refetch
- Frontend: centralized useRealtimeSync hook dispatches all WS
  events to global Zustand stores
- Migrate issues and inbox pages from local useState to global
  useIssueStore/useInboxStore
- Make store addIssue/addItem idempotent to prevent duplicates
- Remove dead packages/hooks/src/use-realtime.ts
- Add feature tracking files for 4 planned features

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-25 10:08:27 +08:00

234 lines
5.5 KiB
Go

package realtime
import (
"context"
"log"
"net/http"
"strings"
"sync"
"github.com/golang-jwt/jwt/v5"
"github.com/gorilla/websocket"
"github.com/multica-ai/multica/server/internal/auth"
)
// MembershipChecker verifies a user belongs to a workspace.
type MembershipChecker interface {
IsMember(ctx context.Context, userID, workspaceID string) bool
}
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
// TODO: Restrict origins in production
return true
},
}
// Client represents a single WebSocket connection with identity.
type Client struct {
hub *Hub
conn *websocket.Conn
send chan []byte
userID string
workspaceID string
}
// Hub manages WebSocket connections organized by workspace rooms.
type Hub struct {
rooms map[string]map[*Client]bool // workspaceID -> clients
broadcast chan []byte // global broadcast (daemon events)
register chan *Client
unregister chan *Client
mu sync.RWMutex
}
// NewHub creates a new Hub instance.
func NewHub() *Hub {
return &Hub{
rooms: make(map[string]map[*Client]bool),
broadcast: make(chan []byte),
register: make(chan *Client),
unregister: make(chan *Client),
}
}
// Run starts the hub event loop.
func (h *Hub) Run() {
for {
select {
case client := <-h.register:
h.mu.Lock()
room := client.workspaceID
if h.rooms[room] == nil {
h.rooms[room] = make(map[*Client]bool)
}
h.rooms[room][client] = true
total := 0
for _, r := range h.rooms {
total += len(r)
}
h.mu.Unlock()
log.Printf("Client connected (workspace=%s). Total: %d", room, total)
case client := <-h.unregister:
h.mu.Lock()
room := client.workspaceID
if clients, ok := h.rooms[room]; ok {
if _, exists := clients[client]; exists {
delete(clients, client)
close(client.send)
if len(clients) == 0 {
delete(h.rooms, room)
}
}
}
total := 0
for _, r := range h.rooms {
total += len(r)
}
h.mu.Unlock()
log.Printf("Client disconnected (workspace=%s). Total: %d", room, total)
case message := <-h.broadcast:
// Global broadcast for daemon events (no workspace filtering)
h.mu.RLock()
for _, clients := range h.rooms {
for client := range clients {
select {
case client.send <- message:
default:
close(client.send)
delete(clients, client)
}
}
}
h.mu.RUnlock()
}
}
}
// BroadcastToWorkspace sends a message only to clients in the given workspace.
func (h *Hub) BroadcastToWorkspace(workspaceID string, message []byte) {
h.mu.RLock()
clients := h.rooms[workspaceID]
var slow []*Client
for client := range clients {
select {
case client.send <- message:
default:
slow = append(slow, client)
}
}
h.mu.RUnlock()
// Remove slow clients under write lock
if len(slow) > 0 {
h.mu.Lock()
for _, client := range slow {
if room, ok := h.rooms[workspaceID]; ok {
if _, exists := room[client]; exists {
delete(room, client)
close(client.send)
if len(room) == 0 {
delete(h.rooms, workspaceID)
}
}
}
}
h.mu.Unlock()
}
}
// Broadcast sends a message to all connected clients (used for daemon events).
func (h *Hub) Broadcast(message []byte) {
h.broadcast <- message
}
// HandleWebSocket upgrades an HTTP connection to WebSocket with JWT auth.
func HandleWebSocket(hub *Hub, mc MembershipChecker, w http.ResponseWriter, r *http.Request) {
tokenStr := r.URL.Query().Get("token")
workspaceID := r.URL.Query().Get("workspace_id")
if tokenStr == "" || workspaceID == "" {
http.Error(w, `{"error":"token and workspace_id required"}`, http.StatusUnauthorized)
return
}
// Validate JWT
token, err := jwt.Parse(tokenStr, func(token *jwt.Token) (any, error) {
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, jwt.ErrSignatureInvalid
}
return auth.JWTSecret(), nil
})
if err != nil || !token.Valid {
http.Error(w, `{"error":"invalid token"}`, http.StatusUnauthorized)
return
}
claims, ok := token.Claims.(jwt.MapClaims)
if !ok {
http.Error(w, `{"error":"invalid claims"}`, http.StatusUnauthorized)
return
}
userID, ok := claims["sub"].(string)
if !ok || strings.TrimSpace(userID) == "" {
http.Error(w, `{"error":"invalid claims"}`, http.StatusUnauthorized)
return
}
// Verify user is a member of the workspace
if !mc.IsMember(r.Context(), userID, workspaceID) {
http.Error(w, `{"error":"not a member of this workspace"}`, http.StatusForbidden)
return
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("WebSocket upgrade error: %v", err)
return
}
client := &Client{
hub: hub,
conn: conn,
send: make(chan []byte, 256),
userID: userID,
workspaceID: workspaceID,
}
hub.register <- client
go client.writePump()
go client.readPump()
}
func (c *Client) readPump() {
defer func() {
c.hub.unregister <- c
c.conn.Close()
}()
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
log.Printf("WebSocket read error: %v", err)
}
break
}
// TODO: Route inbound messages to appropriate handlers
log.Printf("Received message from user=%s workspace=%s: %s", c.userID, c.workspaceID, message)
}
}
func (c *Client) writePump() {
defer c.conn.Close()
for message := range c.send {
if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
log.Printf("WebSocket write error: %v", err)
return
}
}
}