merge: resolve conflicts with main

This commit is contained in:
Jiang Bohan 2026-04-01 13:12:23 +08:00
commit 4780540bd2
121 changed files with 6937 additions and 1556 deletions

View file

@ -25,11 +25,12 @@ type TimelineEntry struct {
Details json.RawMessage `json:"details,omitempty"`
// Comment-only fields
Content *string `json:"content,omitempty"`
ParentID *string `json:"parent_id,omitempty"`
UpdatedAt *string `json:"updated_at,omitempty"`
CommentType *string `json:"comment_type,omitempty"`
Reactions []ReactionResponse `json:"reactions,omitempty"`
Content *string `json:"content,omitempty"`
ParentID *string `json:"parent_id,omitempty"`
UpdatedAt *string `json:"updated_at,omitempty"`
CommentType *string `json:"comment_type,omitempty"`
Reactions []ReactionResponse `json:"reactions,omitempty"`
Attachments []AttachmentResponse `json:"attachments,omitempty"`
}
// ListTimeline returns a merged, chronologically-sorted timeline of activities
@ -79,20 +80,22 @@ func (h *Handler) ListTimeline(w http.ResponseWriter, r *http.Request) {
})
}
// Fetch reactions for all comments in one batch.
// Fetch reactions and attachments for all comments in one batch.
commentIDs := make([]pgtype.UUID, len(comments))
for i, c := range comments {
commentIDs[i] = c.ID
}
grouped := h.groupReactions(r, commentIDs)
groupedAtt := h.groupAttachments(r, commentIDs)
for _, c := range comments {
content := c.Content
commentType := c.Type
updatedAt := timestampToString(c.UpdatedAt)
cid := uuidToString(c.ID)
timeline = append(timeline, TimelineEntry{
Type: "comment",
ID: uuidToString(c.ID),
ID: cid,
ActorType: c.AuthorType,
ActorID: uuidToString(c.AuthorID),
Content: &content,
@ -100,7 +103,8 @@ func (h *Handler) ListTimeline(w http.ResponseWriter, r *http.Request) {
ParentID: uuidToPtr(c.ParentID),
CreatedAt: timestampToString(c.CreatedAt),
UpdatedAt: &updatedAt,
Reactions: grouped[uuidToString(c.ID)],
Reactions: grouped[cid],
Attachments: groupedAtt[cid],
})
}

View file

@ -328,24 +328,23 @@ type UpdateAgentRequest struct {
}
// canManageAgent checks whether the current user can update or delete an agent.
// Workspace-visible agents require owner/admin role. Private agents additionally
// require the user to be the agent's owner (or a workspace owner/admin).
// Workspace-visible agents can be managed by any workspace member.
// Private agents can only be managed by their owner or workspace owner/admin.
func (h *Handler) canManageAgent(w http.ResponseWriter, r *http.Request, agent db.Agent) bool {
wsID := uuidToString(agent.WorkspaceID)
member, ok := h.requireWorkspaceRole(w, r, wsID, "agent not found", "owner", "admin", "member")
if !ok {
return false
}
if agent.Visibility != "private" {
return true
}
isAdmin := roleAllowed(member.Role, "owner", "admin")
isAgentOwner := uuidToString(agent.OwnerID) == requestUserID(r)
if agent.Visibility == "private" && !isAdmin && !isAgentOwner {
if !isAdmin && !isAgentOwner {
writeError(w, http.StatusForbidden, "only the agent owner can manage this private agent")
return false
}
if agent.Visibility != "private" && !isAdmin && !isAgentOwner {
writeError(w, http.StatusForbidden, "insufficient permissions")
return false
}
return true
}

View file

@ -300,6 +300,13 @@ func (h *Handler) VerifyCode(w http.ResponseWriter, r *http.Request) {
return
}
// Set CloudFront signed cookies for CDN access.
if h.CFSigner != nil {
for _, cookie := range h.CFSigner.SignedCookies(time.Now().Add(72 * time.Hour)) {
http.SetCookie(w, cookie)
}
}
slog.Info("user logged in", append(logger.RequestAttrs(r), "user_id", uuidToString(user.ID), "email", user.Email)...)
writeJSON(w, http.StatusOK, LoginResponse{
Token: tokenString,

View file

@ -1,6 +1,7 @@
package handler
import (
"context"
"encoding/json"
"log/slog"
"net/http"
@ -8,38 +9,44 @@ import (
"github.com/go-chi/chi/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/logger"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
type CommentResponse struct {
ID string `json:"id"`
IssueID string `json:"issue_id"`
AuthorType string `json:"author_type"`
AuthorID string `json:"author_id"`
Content string `json:"content"`
Type string `json:"type"`
ParentID *string `json:"parent_id"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
Reactions []ReactionResponse `json:"reactions"`
ID string `json:"id"`
IssueID string `json:"issue_id"`
AuthorType string `json:"author_type"`
AuthorID string `json:"author_id"`
Content string `json:"content"`
Type string `json:"type"`
ParentID *string `json:"parent_id"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
Reactions []ReactionResponse `json:"reactions"`
Attachments []AttachmentResponse `json:"attachments"`
}
func commentToResponse(c db.Comment, reactions []ReactionResponse) CommentResponse {
func commentToResponse(c db.Comment, reactions []ReactionResponse, attachments []AttachmentResponse) CommentResponse {
if reactions == nil {
reactions = []ReactionResponse{}
}
if attachments == nil {
attachments = []AttachmentResponse{}
}
return CommentResponse{
ID: uuidToString(c.ID),
IssueID: uuidToString(c.IssueID),
AuthorType: c.AuthorType,
AuthorID: uuidToString(c.AuthorID),
Content: c.Content,
Type: c.Type,
ParentID: uuidToPtr(c.ParentID),
CreatedAt: timestampToString(c.CreatedAt),
UpdatedAt: timestampToString(c.UpdatedAt),
Reactions: reactions,
ID: uuidToString(c.ID),
IssueID: uuidToString(c.IssueID),
AuthorType: c.AuthorType,
AuthorID: uuidToString(c.AuthorID),
Content: c.Content,
Type: c.Type,
ParentID: uuidToPtr(c.ParentID),
CreatedAt: timestampToString(c.CreatedAt),
UpdatedAt: timestampToString(c.UpdatedAt),
Reactions: reactions,
Attachments: attachments,
}
}
@ -64,19 +71,22 @@ func (h *Handler) ListComments(w http.ResponseWriter, r *http.Request) {
commentIDs[i] = c.ID
}
grouped := h.groupReactions(r, commentIDs)
groupedAtt := h.groupAttachments(r, commentIDs)
resp := make([]CommentResponse, len(comments))
for i, c := range comments {
resp[i] = commentToResponse(c, grouped[uuidToString(c.ID)])
cid := uuidToString(c.ID)
resp[i] = commentToResponse(c, grouped[cid], groupedAtt[cid])
}
writeJSON(w, http.StatusOK, resp)
}
type CreateCommentRequest struct {
Content string `json:"content"`
Type string `json:"type"`
ParentID *string `json:"parent_id"`
Content string `json:"content"`
Type string `json:"type"`
ParentID *string `json:"parent_id"`
AttachmentIDs []string `json:"attachment_ids"`
}
func (h *Handler) CreateComment(w http.ResponseWriter, r *http.Request) {
@ -133,7 +143,12 @@ func (h *Handler) CreateComment(w http.ResponseWriter, r *http.Request) {
return
}
resp := commentToResponse(comment, nil)
// Link uploaded attachments to this comment.
if len(req.AttachmentIDs) > 0 {
h.linkAttachmentsByIDs(r.Context(), comment.ID, issue.ID, req.AttachmentIDs)
}
resp := commentToResponse(comment, nil, nil)
slog.Info("comment created", append(logger.RequestAttrs(r), "comment_id", uuidToString(comment.ID), "issue_id", issueID)...)
h.publish(protocol.EventCommentCreated, uuidToString(issue.WorkspaceID), authorType, authorID, map[string]any{
"comment": resp,
@ -145,7 +160,10 @@ func (h *Handler) CreateComment(w http.ResponseWriter, r *http.Request) {
// If the issue is assigned to an agent with on_comment trigger, enqueue a new task.
// Skip when the comment comes from the assigned agent itself to avoid loops.
if authorType == "member" && h.shouldEnqueueOnComment(r.Context(), issue) {
// Also skip when the comment @mentions others but not the assignee agent —
// the user is talking to someone else, not requesting work from the assignee.
if authorType == "member" && h.shouldEnqueueOnComment(r.Context(), issue) &&
!h.commentMentionsOthersButNotAssignee(comment.Content, issue) {
// Resolve thread root: if the comment is a reply, agent should reply
// to the thread root (matching frontend behavior where all replies
// in a thread share the same top-level parent).
@ -158,9 +176,82 @@ func (h *Handler) CreateComment(w http.ResponseWriter, r *http.Request) {
}
}
// Trigger @mentioned agents: parse agent mentions and enqueue tasks for each.
h.enqueueMentionedAgentTasks(r.Context(), issue, comment, authorType, authorID)
writeJSON(w, http.StatusCreated, resp)
}
// commentMentionsOthersButNotAssignee returns true if the comment @mentions
// anyone but does NOT @mention the issue's assignee agent. This is used to
// suppress the on_comment trigger when the user is directing their comment at
// someone else (e.g. sharing results with a colleague, asking another agent).
func (h *Handler) commentMentionsOthersButNotAssignee(content string, issue db.Issue) bool {
mentions := util.ParseMentions(content)
if len(mentions) == 0 {
return false // No mentions — normal on_comment behavior
}
if !issue.AssigneeID.Valid {
return true // No assignee — mentions target others
}
assigneeID := uuidToString(issue.AssigneeID)
for _, m := range mentions {
if m.ID == assigneeID {
return false // Assignee is mentioned — allow trigger
}
}
return true // Others mentioned but not assignee — suppress trigger
}
// enqueueMentionedAgentTasks parses @agent mentions from comment content and
// enqueues a task for each mentioned agent. Skips self-mentions, agents that
// are already the issue's assignee (handled by on_comment), and agents with
// on_mention trigger disabled.
func (h *Handler) enqueueMentionedAgentTasks(ctx context.Context, issue db.Issue, comment db.Comment, authorType, authorID string) {
// Don't trigger on terminal statuses.
if issue.Status == "done" || issue.Status == "cancelled" {
return
}
mentions := util.ParseMentions(comment.Content)
for _, m := range mentions {
if m.Type != "agent" {
continue
}
// Prevent self-trigger: skip if the comment author is this agent.
if authorType == "agent" && authorID == m.ID {
continue
}
agentUUID := parseUUID(m.ID)
// Prevent duplicate: skip if this agent is the issue's assignee
// (already handled by the on_comment trigger above).
if issue.AssigneeType.Valid && issue.AssigneeType.String == "agent" &&
issue.AssigneeID.Valid && uuidToString(issue.AssigneeID) == m.ID {
continue
}
// Check if the agent has on_mention trigger enabled.
if !h.isAgentMentionTriggerEnabled(ctx, agentUUID) {
continue
}
// Dedup: skip if this agent already has a pending task for this issue.
hasPending, err := h.Queries.HasPendingTaskForIssueAndAgent(ctx, db.HasPendingTaskForIssueAndAgentParams{
IssueID: issue.ID,
AgentID: agentUUID,
})
if err != nil || hasPending {
continue
}
// Resolve thread root for reply threading.
replyTo := comment.ID
if comment.ParentID.Valid {
replyTo = comment.ParentID
}
if _, err := h.TaskService.EnqueueTaskForMention(ctx, issue, agentUUID, replyTo); err != nil {
slog.Warn("enqueue mention agent task failed", "issue_id", uuidToString(issue.ID), "agent_id", m.ID, "error", err)
}
}
}
func (h *Handler) UpdateComment(w http.ResponseWriter, r *http.Request) {
commentId := chi.URLParam(r, "commentId")
@ -215,9 +306,11 @@ func (h *Handler) UpdateComment(w http.ResponseWriter, r *http.Request) {
return
}
// Fetch reactions for the updated comment.
// Fetch reactions and attachments for the updated comment.
grouped := h.groupReactions(r, []pgtype.UUID{comment.ID})
resp := commentToResponse(comment, grouped[uuidToString(comment.ID)])
groupedAtt := h.groupAttachments(r, []pgtype.UUID{comment.ID})
cid := uuidToString(comment.ID)
resp := commentToResponse(comment, grouped[cid], groupedAtt[cid])
slog.Info("comment updated", append(logger.RequestAttrs(r), "comment_id", commentId)...)
h.publish(protocol.EventCommentUpdated, workspaceID, actorType, actorID, map[string]any{"comment": resp})
writeJSON(w, http.StatusOK, resp)
@ -255,11 +348,16 @@ func (h *Handler) DeleteComment(w http.ResponseWriter, r *http.Request) {
return
}
// Collect attachment URLs before CASCADE delete removes them.
attachmentURLs, _ := h.Queries.ListAttachmentURLsByCommentID(r.Context(), parseUUID(commentId))
if err := h.Queries.DeleteComment(r.Context(), parseUUID(commentId)); err != nil {
slog.Warn("delete comment failed", append(logger.RequestAttrs(r), "error", err, "comment_id", commentId)...)
writeError(w, http.StatusInternalServerError, "failed to delete comment")
return
}
h.deleteS3Objects(r.Context(), attachmentURLs)
slog.Info("comment deleted", append(logger.RequestAttrs(r), "comment_id", commentId, "issue_id", uuidToString(comment.IssueID))...)
h.publish(protocol.EventCommentDeleted, workspaceID, actorType, actorID, map[string]any{
"comment_id": commentId,

View file

@ -53,6 +53,12 @@ func (h *Handler) DaemonRegister(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusBadRequest, "at least one runtime is required")
return
}
// Verify the caller is a member of the target workspace.
if _, ok := h.requireWorkspaceMember(w, r, req.WorkspaceID, "workspace not found"); !ok {
return
}
ws, err := h.Queries.GetWorkspace(r.Context(), parseUUID(req.WorkspaceID))
if err != nil {
writeError(w, http.StatusNotFound, "workspace not found")
@ -471,12 +477,20 @@ func (h *Handler) ReportTaskMessages(w http.ResponseWriter, r *http.Request) {
func (h *Handler) ListTaskMessages(w http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "taskId")
task, err := h.Queries.GetAgentTask(r.Context(), parseUUID(taskID))
if err != nil {
writeError(w, http.StatusNotFound, "task not found")
return
}
messages, err := h.Queries.ListTaskMessages(r.Context(), parseUUID(taskID))
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list task messages")
return
}
issueID := uuidToString(task.IssueID)
resp := make([]protocol.TaskMessagePayload, len(messages))
for i, m := range messages {
var input map[string]any
@ -485,6 +499,7 @@ func (h *Handler) ListTaskMessages(w http.ResponseWriter, r *http.Request) {
}
resp[i] = protocol.TaskMessagePayload{
TaskID: taskID,
IssueID: issueID,
Seq: int(m.Seq),
Type: m.Type,
Tool: m.Tool.String,

View file

@ -1,386 +0,0 @@
package handler
import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"net/url"
"os"
"strings"
"time"
"github.com/go-chi/chi/v5"
"github.com/jackc/pgx/v5/pgtype"
)
const daemonPairingTTL = 10 * time.Minute
type daemonPairingSessionRecord struct {
Token string
DaemonID string
DeviceName string
RuntimeName string
RuntimeType string
RuntimeVersion string
WorkspaceID pgtype.UUID
ApprovedBy pgtype.UUID
Status string
ApprovedAt pgtype.Timestamptz
ClaimedAt pgtype.Timestamptz
ExpiresAt pgtype.Timestamptz
CreatedAt pgtype.Timestamptz
UpdatedAt pgtype.Timestamptz
}
type DaemonPairingSessionResponse struct {
Token string `json:"token"`
DaemonID string `json:"daemon_id"`
DeviceName string `json:"device_name"`
RuntimeName string `json:"runtime_name"`
RuntimeType string `json:"runtime_type"`
RuntimeVersion string `json:"runtime_version"`
WorkspaceID *string `json:"workspace_id"`
Status string `json:"status"`
ApprovedAt *string `json:"approved_at"`
ClaimedAt *string `json:"claimed_at"`
ExpiresAt string `json:"expires_at"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
LinkURL *string `json:"link_url,omitempty"`
}
type CreateDaemonPairingSessionRequest struct {
DaemonID string `json:"daemon_id"`
DeviceName string `json:"device_name"`
RuntimeName string `json:"runtime_name"`
RuntimeType string `json:"runtime_type"`
RuntimeVersion string `json:"runtime_version"`
}
type ApproveDaemonPairingSessionRequest struct {
WorkspaceID string `json:"workspace_id"`
}
func daemonAppBaseURL() string {
for _, key := range []string{"MULTICA_APP_URL", "FRONTEND_ORIGIN"} {
if value := strings.TrimSpace(os.Getenv(key)); value != "" {
return strings.TrimRight(value, "/")
}
}
return "http://localhost:3000"
}
func daemonPairingLinkURL(token string) string {
base := daemonAppBaseURL()
return base + "/pair/local?token=" + url.QueryEscape(token)
}
func daemonPairingSessionToResponse(rec daemonPairingSessionRecord, includeLink bool) DaemonPairingSessionResponse {
resp := DaemonPairingSessionResponse{
Token: rec.Token,
DaemonID: rec.DaemonID,
DeviceName: rec.DeviceName,
RuntimeName: rec.RuntimeName,
RuntimeType: rec.RuntimeType,
RuntimeVersion: rec.RuntimeVersion,
WorkspaceID: uuidToPtr(rec.WorkspaceID),
Status: rec.Status,
ApprovedAt: timestampToPtr(rec.ApprovedAt),
ClaimedAt: timestampToPtr(rec.ClaimedAt),
ExpiresAt: timestampToString(rec.ExpiresAt),
CreatedAt: timestampToString(rec.CreatedAt),
UpdatedAt: timestampToString(rec.UpdatedAt),
}
if includeLink {
link := daemonPairingLinkURL(rec.Token)
resp.LinkURL = &link
}
return resp
}
func randomDaemonPairingToken() (string, error) {
bytes := make([]byte, 16)
if _, err := rand.Read(bytes); err != nil {
return "", err
}
return hex.EncodeToString(bytes), nil
}
func (h *Handler) getDaemonPairingSession(ctx context.Context, token string) (daemonPairingSessionRecord, error) {
if h.DB == nil {
return daemonPairingSessionRecord{}, fmt.Errorf("database executor is not configured")
}
var rec daemonPairingSessionRecord
err := h.DB.QueryRow(ctx, `
SELECT
token,
daemon_id,
device_name,
runtime_name,
runtime_type,
runtime_version,
workspace_id,
approved_by,
status,
approved_at,
claimed_at,
expires_at,
created_at,
updated_at
FROM daemon_pairing_session
WHERE token = $1
`, token).Scan(
&rec.Token,
&rec.DaemonID,
&rec.DeviceName,
&rec.RuntimeName,
&rec.RuntimeType,
&rec.RuntimeVersion,
&rec.WorkspaceID,
&rec.ApprovedBy,
&rec.Status,
&rec.ApprovedAt,
&rec.ClaimedAt,
&rec.ExpiresAt,
&rec.CreatedAt,
&rec.UpdatedAt,
)
if err != nil {
return daemonPairingSessionRecord{}, err
}
if rec.Status == "pending" && rec.ExpiresAt.Valid && rec.ExpiresAt.Time.Before(time.Now()) {
if _, err := h.DB.Exec(ctx, `
UPDATE daemon_pairing_session
SET status = 'expired', updated_at = now()
WHERE token = $1 AND status = 'pending'
`, token); err == nil {
rec.Status = "expired"
rec.UpdatedAt = pgtype.Timestamptz{Time: time.Now(), Valid: true}
}
}
return rec, nil
}
func (h *Handler) CreateDaemonPairingSession(w http.ResponseWriter, r *http.Request) {
if h.DB == nil {
writeError(w, http.StatusInternalServerError, "database executor is not configured")
return
}
var req CreateDaemonPairingSessionRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
req.DaemonID = strings.TrimSpace(req.DaemonID)
req.DeviceName = strings.TrimSpace(req.DeviceName)
req.RuntimeName = strings.TrimSpace(req.RuntimeName)
req.RuntimeType = strings.TrimSpace(req.RuntimeType)
req.RuntimeVersion = strings.TrimSpace(req.RuntimeVersion)
if req.DaemonID == "" {
writeError(w, http.StatusBadRequest, "daemon_id is required")
return
}
if req.DeviceName == "" {
writeError(w, http.StatusBadRequest, "device_name is required")
return
}
if req.RuntimeName == "" {
writeError(w, http.StatusBadRequest, "runtime_name is required")
return
}
if req.RuntimeType == "" {
writeError(w, http.StatusBadRequest, "runtime_type is required")
return
}
token, err := randomDaemonPairingToken()
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to create pairing token")
return
}
expiresAt := time.Now().Add(daemonPairingTTL)
var rec daemonPairingSessionRecord
err = h.DB.QueryRow(r.Context(), `
INSERT INTO daemon_pairing_session (
token,
daemon_id,
device_name,
runtime_name,
runtime_type,
runtime_version,
expires_at
) VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING
token,
daemon_id,
device_name,
runtime_name,
runtime_type,
runtime_version,
workspace_id,
approved_by,
status,
approved_at,
claimed_at,
expires_at,
created_at,
updated_at
`,
token,
req.DaemonID,
req.DeviceName,
req.RuntimeName,
req.RuntimeType,
req.RuntimeVersion,
expiresAt,
).Scan(
&rec.Token,
&rec.DaemonID,
&rec.DeviceName,
&rec.RuntimeName,
&rec.RuntimeType,
&rec.RuntimeVersion,
&rec.WorkspaceID,
&rec.ApprovedBy,
&rec.Status,
&rec.ApprovedAt,
&rec.ClaimedAt,
&rec.ExpiresAt,
&rec.CreatedAt,
&rec.UpdatedAt,
)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to create pairing session")
return
}
writeJSON(w, http.StatusCreated, daemonPairingSessionToResponse(rec, true))
}
func (h *Handler) GetDaemonPairingSession(w http.ResponseWriter, r *http.Request) {
token := chi.URLParam(r, "token")
rec, err := h.getDaemonPairingSession(r.Context(), token)
if err != nil {
writeError(w, http.StatusNotFound, "pairing session not found")
return
}
writeJSON(w, http.StatusOK, daemonPairingSessionToResponse(rec, true))
}
func (h *Handler) ApproveDaemonPairingSession(w http.ResponseWriter, r *http.Request) {
token := chi.URLParam(r, "token")
rec, err := h.getDaemonPairingSession(r.Context(), token)
if err != nil {
writeError(w, http.StatusNotFound, "pairing session not found")
return
}
if rec.Status == "expired" {
writeError(w, http.StatusBadRequest, "pairing session expired")
return
}
if rec.Status == "claimed" {
writeError(w, http.StatusBadRequest, "pairing session already claimed")
return
}
if rec.Status == "approved" {
writeJSON(w, http.StatusOK, daemonPairingSessionToResponse(rec, true))
return
}
var req ApproveDaemonPairingSessionRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
if req.WorkspaceID == "" {
writeError(w, http.StatusBadRequest, "workspace_id is required")
return
}
userID, ok := requireUserID(w, r)
if !ok {
return
}
if _, ok := h.requireWorkspaceMember(w, r, req.WorkspaceID, "workspace not found"); !ok {
return
}
if h.DB == nil {
writeError(w, http.StatusInternalServerError, "database executor is not configured")
return
}
if _, err := h.DB.Exec(r.Context(), `
UPDATE daemon_pairing_session
SET
workspace_id = $2,
approved_by = $3,
status = 'approved',
approved_at = now(),
updated_at = now()
WHERE token = $1 AND status = 'pending'
`, token, parseUUID(req.WorkspaceID), parseUUID(userID)); err != nil {
writeError(w, http.StatusInternalServerError, "failed to approve pairing session")
return
}
rec, err = h.getDaemonPairingSession(r.Context(), token)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to reload pairing session")
return
}
writeJSON(w, http.StatusOK, daemonPairingSessionToResponse(rec, true))
}
func (h *Handler) ClaimDaemonPairingSession(w http.ResponseWriter, r *http.Request) {
token := chi.URLParam(r, "token")
rec, err := h.getDaemonPairingSession(r.Context(), token)
if err != nil {
writeError(w, http.StatusNotFound, "pairing session not found")
return
}
if rec.Status == "claimed" {
writeJSON(w, http.StatusOK, daemonPairingSessionToResponse(rec, true))
return
}
if rec.Status != "approved" {
writeError(w, http.StatusBadRequest, "pairing session is not approved")
return
}
if h.DB == nil {
writeError(w, http.StatusInternalServerError, "database executor is not configured")
return
}
if _, err := h.DB.Exec(r.Context(), `
UPDATE daemon_pairing_session
SET
status = 'claimed',
claimed_at = now(),
updated_at = now()
WHERE token = $1 AND status = 'approved'
`, token); err != nil {
writeError(w, http.StatusInternalServerError, "failed to claim pairing session")
return
}
rec, err = h.getDaemonPairingSession(r.Context(), token)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to reload pairing session")
return
}
writeJSON(w, http.StatusOK, daemonPairingSessionToResponse(rec, true))
}

View file

@ -0,0 +1,338 @@
package handler
import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"io"
"log/slog"
"net/http"
"path"
"strings"
"time"
"github.com/go-chi/chi/v5"
"github.com/jackc/pgx/v5/pgtype"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
const maxUploadSize = 10 << 20 // 10 MB
// Allowed MIME type prefixes and exact types for uploads.
var allowedContentTypes = map[string]bool{
"image/png": true,
"image/jpeg": true,
"image/gif": true,
"image/webp": true,
"image/svg+xml": true,
"application/pdf": true,
"text/plain": true,
"text/csv": true,
"application/json": true,
"video/mp4": true,
"video/webm": true,
"audio/mpeg": true,
"audio/wav": true,
"application/zip": true,
}
func isContentTypeAllowed(ct string) bool {
// Normalize: take only the media type, strip parameters like charset.
ct = strings.TrimSpace(strings.SplitN(ct, ";", 2)[0])
ct = strings.ToLower(ct)
return allowedContentTypes[ct]
}
// ---------------------------------------------------------------------------
// Response types
// ---------------------------------------------------------------------------
type AttachmentResponse struct {
ID string `json:"id"`
WorkspaceID string `json:"workspace_id"`
IssueID *string `json:"issue_id"`
CommentID *string `json:"comment_id"`
UploaderType string `json:"uploader_type"`
UploaderID string `json:"uploader_id"`
Filename string `json:"filename"`
URL string `json:"url"`
DownloadURL string `json:"download_url"`
ContentType string `json:"content_type"`
SizeBytes int64 `json:"size_bytes"`
CreatedAt string `json:"created_at"`
}
func (h *Handler) attachmentToResponse(a db.Attachment) AttachmentResponse {
resp := AttachmentResponse{
ID: uuidToString(a.ID),
WorkspaceID: uuidToString(a.WorkspaceID),
UploaderType: a.UploaderType,
UploaderID: uuidToString(a.UploaderID),
Filename: a.Filename,
URL: a.Url,
DownloadURL: a.Url,
ContentType: a.ContentType,
SizeBytes: a.SizeBytes,
CreatedAt: a.CreatedAt.Time.Format("2006-01-02T15:04:05Z07:00"),
}
if h.CFSigner != nil {
resp.DownloadURL = h.CFSigner.SignedURL(a.Url, time.Now().Add(5*time.Minute))
}
if a.IssueID.Valid {
s := uuidToString(a.IssueID)
resp.IssueID = &s
}
if a.CommentID.Valid {
s := uuidToString(a.CommentID)
resp.CommentID = &s
}
return resp
}
// groupAttachments loads attachments for multiple comments and groups them by comment ID.
func (h *Handler) groupAttachments(r *http.Request, commentIDs []pgtype.UUID) map[string][]AttachmentResponse {
if len(commentIDs) == 0 {
return nil
}
attachments, err := h.Queries.ListAttachmentsByCommentIDs(r.Context(), commentIDs)
if err != nil {
slog.Error("failed to load attachments for comments", "error", err)
return nil
}
grouped := make(map[string][]AttachmentResponse, len(commentIDs))
for _, a := range attachments {
cid := uuidToString(a.CommentID)
grouped[cid] = append(grouped[cid], h.attachmentToResponse(a))
}
return grouped
}
// ---------------------------------------------------------------------------
// UploadFile — POST /api/upload-file
// ---------------------------------------------------------------------------
func (h *Handler) UploadFile(w http.ResponseWriter, r *http.Request) {
if h.Storage == nil {
writeError(w, http.StatusServiceUnavailable, "file upload not configured")
return
}
userID, ok := requireUserID(w, r)
if !ok {
return
}
workspaceID := resolveWorkspaceID(r)
r.Body = http.MaxBytesReader(w, r.Body, maxUploadSize)
if err := r.ParseMultipartForm(maxUploadSize); err != nil {
writeError(w, http.StatusBadRequest, "file too large or invalid multipart form")
return
}
defer r.MultipartForm.RemoveAll()
file, header, err := r.FormFile("file")
if err != nil {
writeError(w, http.StatusBadRequest, fmt.Sprintf("missing file field: %v", err))
return
}
defer file.Close()
// Sniff actual content type from file bytes instead of trusting the client header.
buf := make([]byte, 512)
n, err := file.Read(buf)
if err != nil && err != io.EOF {
writeError(w, http.StatusBadRequest, "failed to read file")
return
}
contentType := http.DetectContentType(buf[:n])
if !isContentTypeAllowed(contentType) {
writeError(w, http.StatusBadRequest, fmt.Sprintf("file type not allowed: %s", contentType))
return
}
// Seek back so the full file is uploaded.
if _, err := file.Seek(0, io.SeekStart); err != nil {
writeError(w, http.StatusInternalServerError, "failed to read file")
return
}
data, err := io.ReadAll(file)
if err != nil {
writeError(w, http.StatusBadRequest, "failed to read file")
return
}
b := make([]byte, 16)
if _, err := rand.Read(b); err != nil {
slog.Error("failed to generate file key", "error", err)
writeError(w, http.StatusInternalServerError, "internal error")
return
}
key := hex.EncodeToString(b) + path.Ext(header.Filename)
link, err := h.Storage.Upload(r.Context(), key, data, contentType, header.Filename)
if err != nil {
slog.Error("file upload failed", "error", err)
writeError(w, http.StatusInternalServerError, "upload failed")
return
}
// If workspace context is available, create an attachment record.
if workspaceID != "" {
uploaderType, uploaderID := h.resolveActor(r, userID, workspaceID)
params := db.CreateAttachmentParams{
WorkspaceID: parseUUID(workspaceID),
UploaderType: uploaderType,
UploaderID: parseUUID(uploaderID),
Filename: header.Filename,
Url: link,
ContentType: contentType,
SizeBytes: int64(len(data)),
}
// Optional issue_id / comment_id from form fields
if issueID := r.FormValue("issue_id"); issueID != "" {
params.IssueID = parseUUID(issueID)
}
if commentID := r.FormValue("comment_id"); commentID != "" {
params.CommentID = parseUUID(commentID)
}
att, err := h.Queries.CreateAttachment(r.Context(), params)
if err != nil {
slog.Error("failed to create attachment record", "error", err)
// S3 upload succeeded but DB record failed — still return the link
// so the file is usable. Log the error for investigation.
} else {
writeJSON(w, http.StatusOK, h.attachmentToResponse(att))
return
}
}
// Fallback response (no workspace context, e.g. avatar upload)
writeJSON(w, http.StatusOK, map[string]string{
"filename": header.Filename,
"link": link,
})
}
// ---------------------------------------------------------------------------
// ListAttachments — GET /api/issues/{id}/attachments
// ---------------------------------------------------------------------------
func (h *Handler) ListAttachments(w http.ResponseWriter, r *http.Request) {
issueID := chi.URLParam(r, "id")
issue, ok := h.loadIssueForUser(w, r, issueID)
if !ok {
return
}
attachments, err := h.Queries.ListAttachmentsByIssue(r.Context(), db.ListAttachmentsByIssueParams{
IssueID: issue.ID,
WorkspaceID: issue.WorkspaceID,
})
if err != nil {
slog.Error("failed to list attachments", "error", err)
writeError(w, http.StatusInternalServerError, "failed to list attachments")
return
}
resp := make([]AttachmentResponse, len(attachments))
for i, a := range attachments {
resp[i] = h.attachmentToResponse(a)
}
writeJSON(w, http.StatusOK, resp)
}
// ---------------------------------------------------------------------------
// DeleteAttachment — DELETE /api/attachments/{id}
// ---------------------------------------------------------------------------
func (h *Handler) DeleteAttachment(w http.ResponseWriter, r *http.Request) {
attachmentID := chi.URLParam(r, "id")
workspaceID := resolveWorkspaceID(r)
if workspaceID == "" {
writeError(w, http.StatusBadRequest, "workspace_id is required")
return
}
userID, ok := requireUserID(w, r)
if !ok {
return
}
att, err := h.Queries.GetAttachment(r.Context(), db.GetAttachmentParams{
ID: parseUUID(attachmentID),
WorkspaceID: parseUUID(workspaceID),
})
if err != nil {
writeError(w, http.StatusNotFound, "attachment not found")
return
}
// Only the uploader (or workspace admin) can delete
uploaderID := uuidToString(att.UploaderID)
isUploader := att.UploaderType == "member" && uploaderID == userID
member, hasMember := ctxMember(r.Context())
isAdmin := hasMember && (member.Role == "admin" || member.Role == "owner")
if !isUploader && !isAdmin {
writeError(w, http.StatusForbidden, "not authorized to delete this attachment")
return
}
if err := h.Queries.DeleteAttachment(r.Context(), db.DeleteAttachmentParams{
ID: att.ID,
WorkspaceID: att.WorkspaceID,
}); err != nil {
slog.Error("failed to delete attachment", "error", err)
writeError(w, http.StatusInternalServerError, "failed to delete attachment")
return
}
h.deleteS3Object(r.Context(), att.Url)
w.WriteHeader(http.StatusNoContent)
}
// ---------------------------------------------------------------------------
// Attachment linking
// ---------------------------------------------------------------------------
// linkAttachmentsByIDs links the given attachment IDs to a comment.
// Only updates attachments that belong to the same issue and have no comment_id yet.
func (h *Handler) linkAttachmentsByIDs(ctx context.Context, commentID, issueID pgtype.UUID, ids []string) {
uuids := make([]pgtype.UUID, len(ids))
for i, id := range ids {
uuids[i] = parseUUID(id)
}
if err := h.Queries.LinkAttachmentsToComment(ctx, db.LinkAttachmentsToCommentParams{
CommentID: commentID,
IssueID: issueID,
Column3: uuids,
}); err != nil {
slog.Error("failed to link attachments to comment", "error", err)
}
}
// deleteS3Object removes a single file from S3 by its CDN URL.
func (h *Handler) deleteS3Object(ctx context.Context, url string) {
if h.Storage == nil || url == "" {
return
}
h.Storage.Delete(ctx, h.Storage.KeyFromURL(url))
}
// deleteS3Objects removes multiple files from S3 by their CDN URLs.
func (h *Handler) deleteS3Objects(ctx context.Context, urls []string) {
if h.Storage == nil || len(urls) == 0 {
return
}
keys := make([]string, len(urls))
for i, u := range urls {
keys[i] = h.Storage.KeyFromURL(u)
}
h.Storage.DeleteKeys(ctx, keys)
}

View file

@ -12,10 +12,12 @@ import (
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgtype"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/internal/auth"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/middleware"
"github.com/multica-ai/multica/server/internal/realtime"
"github.com/multica-ai/multica/server/internal/service"
"github.com/multica-ai/multica/server/internal/storage"
"github.com/multica-ai/multica/server/internal/util"
)
@ -38,9 +40,11 @@ type Handler struct {
TaskService *service.TaskService
EmailService *service.EmailService
PingStore *PingStore
Storage *storage.S3Storage
CFSigner *auth.CloudFrontSigner
}
func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *events.Bus, emailService *service.EmailService) *Handler {
func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *events.Bus, emailService *service.EmailService, s3 *storage.S3Storage, cfSigner *auth.CloudFrontSigner) *Handler {
var executor dbExecutor
if candidate, ok := txStarter.(dbExecutor); ok {
executor = candidate
@ -55,6 +59,8 @@ func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *event
TaskService: service.NewTaskService(queries, hub, bus),
EmailService: emailService,
PingStore: NewPingStore(),
Storage: s3,
CFSigner: cfSigner,
}
}

View file

@ -53,7 +53,7 @@ func TestMain(m *testing.M) {
go hub.Run()
bus := events.New()
emailSvc := service.NewEmailService()
testHandler = New(queries, pool, hub, bus, emailSvc)
testHandler = New(queries, pool, hub, bus, emailSvc, nil, nil)
testPool = pool
testUserID, testWorkspaceID, err = setupHandlerTestFixture(ctx, pool)
@ -729,6 +729,7 @@ func TestDaemonRegisterMissingWorkspaceReturns404(t *testing.T) {
"runtimes":[{"name":"Local Codex","type":"codex","version":"1.0.0","status":"online"}]
}`))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-User-ID", testUserID)
testHandler.DaemonRegister(w, req)
if w.Code != http.StatusNotFound {

View file

@ -5,7 +5,6 @@ import (
"encoding/json"
"log/slog"
"net/http"
"strconv"
"github.com/go-chi/chi/v5"
"github.com/jackc/pgx/v5/pgtype"
@ -93,25 +92,10 @@ func (h *Handler) ListInbox(w http.ResponseWriter, r *http.Request) {
}
workspaceID := r.Header.Get("X-Workspace-ID")
limit := 50
offset := 0
if l := r.URL.Query().Get("limit"); l != "" {
if v, err := strconv.Atoi(l); err == nil {
limit = v
}
}
if o := r.URL.Query().Get("offset"); o != "" {
if v, err := strconv.Atoi(o); err == nil {
offset = v
}
}
items, err := h.Queries.ListInboxItems(r.Context(), db.ListInboxItemsParams{
WorkspaceID: parseUUID(workspaceID),
RecipientType: "member",
RecipientID: parseUUID(userID),
Limit: int32(limit),
Offset: int32(offset),
})
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list inbox")

View file

@ -515,6 +515,30 @@ func (h *Handler) isAgentTriggerEnabled(ctx context.Context, issue db.Issue, tri
return false
}
// isAgentMentionTriggerEnabled checks if a specific agent has the on_mention
// trigger enabled. Unlike isAgentTriggerEnabled, this takes an explicit agent
// ID rather than deriving it from the issue assignee.
func (h *Handler) isAgentMentionTriggerEnabled(ctx context.Context, agentID pgtype.UUID) bool {
agent, err := h.Queries.GetAgent(ctx, agentID)
if err != nil || !agent.RuntimeID.Valid {
return false
}
if agent.Triggers == nil || len(agent.Triggers) == 0 {
return true // No config = all triggers enabled by default
}
var triggers []agentTriggerSnapshot
if err := json.Unmarshal(agent.Triggers, &triggers); err != nil {
return false
}
for _, trigger := range triggers {
if trigger.Type == "on_mention" {
return trigger.Enabled
}
}
return true // on_mention not configured = enabled by default
}
func (h *Handler) DeleteIssue(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
issue, ok := h.loadIssueForUser(w, r, id)
@ -524,12 +548,16 @@ func (h *Handler) DeleteIssue(w http.ResponseWriter, r *http.Request) {
h.TaskService.CancelTasksForIssue(r.Context(), issue.ID)
// Collect all attachment URLs (issue-level + comment-level) before CASCADE delete.
attachmentURLs, _ := h.Queries.ListAttachmentURLsByIssueOrComments(r.Context(), issue.ID)
err := h.Queries.DeleteIssue(r.Context(), parseUUID(id))
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to delete issue")
return
}
h.deleteS3Objects(r.Context(), attachmentURLs)
userID := requestUserID(r)
actorType, actorID := h.resolveActor(r, userID, uuidToString(issue.WorkspaceID))
h.publish(protocol.EventIssueDeleted, uuidToString(issue.WorkspaceID), actorType, actorID, map[string]any{"issue_id": id})