multica/server/internal/handler/handler.go
Jiayuan 903fbee55d feat(runtimes): add Runtimes tab with usage tracking and connection test
Add a new "Runtimes" sidebar tab to manage local agent runtimes with three
main capabilities: runtime status overview, token usage tracking (reading
Claude Code and Codex CLI local JSONL logs via daemon), and an interactive
connection test that sends a ping through the daemon to verify end-to-end
agent CLI connectivity.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-26 18:28:36 +08:00

231 lines
6.5 KiB
Go

package handler
import (
"context"
"encoding/json"
"errors"
"net/http"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgtype"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/realtime"
"github.com/multica-ai/multica/server/internal/service"
"github.com/multica-ai/multica/server/internal/util"
)
type txStarter interface {
Begin(ctx context.Context) (pgx.Tx, error)
}
type dbExecutor interface {
Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
}
type Handler struct {
Queries *db.Queries
DB dbExecutor
TxStarter txStarter
Hub *realtime.Hub
Bus *events.Bus
TaskService *service.TaskService
EmailService *service.EmailService
PingStore *PingStore
}
func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *events.Bus, emailService *service.EmailService) *Handler {
var executor dbExecutor
if candidate, ok := txStarter.(dbExecutor); ok {
executor = candidate
}
return &Handler{
Queries: queries,
DB: executor,
TxStarter: txStarter,
Hub: hub,
Bus: bus,
TaskService: service.NewTaskService(queries, hub, bus),
EmailService: emailService,
PingStore: NewPingStore(),
}
}
func writeJSON(w http.ResponseWriter, status int, v any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
json.NewEncoder(w).Encode(v)
}
func writeError(w http.ResponseWriter, status int, msg string) {
writeJSON(w, status, map[string]string{"error": msg})
}
// Thin wrappers around util functions (preserve existing handler code unchanged).
func parseUUID(s string) pgtype.UUID { return util.ParseUUID(s) }
func uuidToString(u pgtype.UUID) string { return util.UUIDToString(u) }
func textToPtr(t pgtype.Text) *string { return util.TextToPtr(t) }
func ptrToText(s *string) pgtype.Text { return util.PtrToText(s) }
func strToText(s string) pgtype.Text { return util.StrToText(s) }
func timestampToString(t pgtype.Timestamptz) string { return util.TimestampToString(t) }
func timestampToPtr(t pgtype.Timestamptz) *string { return util.TimestampToPtr(t) }
func uuidToPtr(u pgtype.UUID) *string { return util.UUIDToPtr(u) }
// publish sends a domain event through the event bus.
func (h *Handler) publish(eventType, workspaceID, actorType, actorID string, payload any) {
h.Bus.Publish(events.Event{
Type: eventType,
WorkspaceID: workspaceID,
ActorType: actorType,
ActorID: actorID,
Payload: payload,
})
}
func isNotFound(err error) bool {
return errors.Is(err, pgx.ErrNoRows)
}
func isUniqueViolation(err error) bool {
var pgErr *pgconn.PgError
return errors.As(err, &pgErr) && pgErr.Code == "23505"
}
func requestUserID(r *http.Request) string {
return r.Header.Get("X-User-ID")
}
func requireUserID(w http.ResponseWriter, r *http.Request) (string, bool) {
userID := requestUserID(r)
if userID == "" {
writeError(w, http.StatusUnauthorized, "user not authenticated")
return "", false
}
return userID, true
}
func resolveWorkspaceID(r *http.Request) string {
workspaceID := r.URL.Query().Get("workspace_id")
if workspaceID != "" {
return workspaceID
}
return r.Header.Get("X-Workspace-ID")
}
func roleAllowed(role string, roles ...string) bool {
for _, candidate := range roles {
if role == candidate {
return true
}
}
return false
}
func countOwners(members []db.Member) int {
owners := 0
for _, member := range members {
if member.Role == "owner" {
owners++
}
}
return owners
}
func (h *Handler) getWorkspaceMember(ctx context.Context, userID, workspaceID string) (db.Member, error) {
return h.Queries.GetMemberByUserAndWorkspace(ctx, db.GetMemberByUserAndWorkspaceParams{
UserID: parseUUID(userID),
WorkspaceID: parseUUID(workspaceID),
})
}
func (h *Handler) requireWorkspaceMember(w http.ResponseWriter, r *http.Request, workspaceID, notFoundMsg string) (db.Member, bool) {
if workspaceID == "" {
writeError(w, http.StatusBadRequest, "workspace_id is required")
return db.Member{}, false
}
userID, ok := requireUserID(w, r)
if !ok {
return db.Member{}, false
}
member, err := h.getWorkspaceMember(r.Context(), userID, workspaceID)
if err != nil {
writeError(w, http.StatusNotFound, notFoundMsg)
return db.Member{}, false
}
return member, true
}
func (h *Handler) requireWorkspaceRole(w http.ResponseWriter, r *http.Request, workspaceID, notFoundMsg string, roles ...string) (db.Member, bool) {
member, ok := h.requireWorkspaceMember(w, r, workspaceID, notFoundMsg)
if !ok {
return db.Member{}, false
}
if !roleAllowed(member.Role, roles...) {
writeError(w, http.StatusForbidden, "insufficient permissions")
return db.Member{}, false
}
return member, true
}
func (h *Handler) loadIssueForUser(w http.ResponseWriter, r *http.Request, issueID string) (db.Issue, bool) {
if _, ok := requireUserID(w, r); !ok {
return db.Issue{}, false
}
issue, err := h.Queries.GetIssue(r.Context(), parseUUID(issueID))
if err != nil {
writeError(w, http.StatusNotFound, "issue not found")
return db.Issue{}, false
}
if _, ok := h.requireWorkspaceMember(w, r, uuidToString(issue.WorkspaceID), "issue not found"); !ok {
return db.Issue{}, false
}
return issue, true
}
func (h *Handler) loadAgentForUser(w http.ResponseWriter, r *http.Request, agentID string) (db.Agent, bool) {
if _, ok := requireUserID(w, r); !ok {
return db.Agent{}, false
}
agent, err := h.Queries.GetAgent(r.Context(), parseUUID(agentID))
if err != nil {
writeError(w, http.StatusNotFound, "agent not found")
return db.Agent{}, false
}
if _, ok := h.requireWorkspaceMember(w, r, uuidToString(agent.WorkspaceID), "agent not found"); !ok {
return db.Agent{}, false
}
return agent, true
}
func (h *Handler) loadInboxItemForUser(w http.ResponseWriter, r *http.Request, itemID string) (db.InboxItem, bool) {
userID, ok := requireUserID(w, r)
if !ok {
return db.InboxItem{}, false
}
item, err := h.Queries.GetInboxItem(r.Context(), parseUUID(itemID))
if err != nil {
writeError(w, http.StatusNotFound, "inbox item not found")
return db.InboxItem{}, false
}
if item.RecipientType != "member" || uuidToString(item.RecipientID) != userID {
writeError(w, http.StatusNotFound, "inbox item not found")
return db.InboxItem{}, false
}
return item, true
}