From 348133b63d1730ef73afe256702ad0402f3109bf Mon Sep 17 00:00:00 2001 From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com> Date: Tue, 7 Apr 2026 18:08:35 +0800 Subject: [PATCH] merge: resolve conflicts with main (open_only pagination) - Resolve issues/store.ts: keep client-only store, port pagination strategy (open_only + closed page) to core/issues/queries.ts - Resolve issues-page.tsx, batch-action-toolbar.tsx: keep TQ mutations - Auto-merge agents/page.tsx trigger null fix Co-Authored-By: Claude Opus 4.6 (1M context) --- .env.example | 1 + apps/web/app/(auth)/login/page.tsx | 58 +- apps/web/app/(dashboard)/agents/page.tsx | 21 +- apps/web/app/auth/callback/page.tsx | 90 +++ apps/web/core/issues/queries.ts | 15 +- apps/web/features/auth/store.ts | 10 + apps/web/features/landing/i18n/en.ts | 29 + apps/web/features/landing/i18n/zh.ts | 29 + apps/web/shared/api/client.ts | 8 + apps/web/shared/types/agent.ts | 2 +- apps/web/shared/types/api.ts | 1 + server/cmd/server/router.go | 1 + server/internal/daemon/config.go | 11 +- server/internal/daemon/daemon.go | 8 + .../internal/daemon/execenv/runtime_config.go | 21 +- server/internal/handler/auth.go | 162 ++++- server/internal/handler/comment.go | 16 +- server/internal/handler/issue.go | 62 +- server/internal/middleware/cloudfront.go | 2 +- server/pkg/agent/agent.go | 10 +- server/pkg/agent/openclaw.go | 313 ++++++++++ server/pkg/agent/openclaw_test.go | 574 ++++++++++++++++++ server/pkg/db/generated/issue.sql.go | 81 +++ server/pkg/db/queries/issue.sql | 15 + 24 files changed, 1489 insertions(+), 51 deletions(-) create mode 100644 apps/web/app/auth/callback/page.tsx create mode 100644 server/pkg/agent/openclaw.go create mode 100644 server/pkg/agent/openclaw_test.go diff --git a/.env.example b/.env.example index bfa38ae7..8a98d2d8 100644 --- a/.env.example +++ b/.env.example @@ -29,6 +29,7 @@ RESEND_FROM_EMAIL=noreply@multica.ai GOOGLE_CLIENT_ID= GOOGLE_CLIENT_SECRET= GOOGLE_REDIRECT_URI=http://localhost:3000/auth/callback +NEXT_PUBLIC_GOOGLE_CLIENT_ID= # S3 / CloudFront S3_BUCKET= diff --git a/apps/web/app/(auth)/login/page.tsx b/apps/web/app/(auth)/login/page.tsx index 74d60858..34194933 100644 --- a/apps/web/app/(auth)/login/page.tsx +++ b/apps/web/app/(auth)/login/page.tsx @@ -282,6 +282,22 @@ function LoginPageContent() { ); } + const googleClientId = process.env.NEXT_PUBLIC_GOOGLE_CLIENT_ID; + + const handleGoogleLogin = () => { + if (!googleClientId) return; + const redirectUri = `${window.location.origin}/auth/callback`; + const params = new URLSearchParams({ + client_id: googleClientId, + redirect_uri: redirectUri, + response_type: "code", + scope: "openid email profile", + access_type: "offline", + prompt: "select_account", + }); + window.location.href = `https://accounts.google.com/o/oauth2/v2/auth?${params}`; + }; + return (
@@ -307,7 +323,7 @@ function LoginPageContent() { )} - + + {googleClientId && ( + <> +
+
+ +
+
+ or +
+
+ + + )}
diff --git a/apps/web/app/(dashboard)/agents/page.tsx b/apps/web/app/(dashboard)/agents/page.tsx index 24ad9e44..bd3c6ddd 100644 --- a/apps/web/app/(dashboard)/agents/page.tsx +++ b/apps/web/app/(dashboard)/agents/page.tsx @@ -923,7 +923,13 @@ function TriggersTab({
- {triggers.map((trigger) => ( + {triggers.map((trigger) => { + const scheduledConfig = (trigger.config ?? {}) as { + cron?: string; + timezone?: string; + }; + + return (
@@ -986,10 +992,10 @@ function TriggersTab({ updateTriggerConfig(trigger.id, { - ...trigger.config, + ...scheduledConfig, cron: e.target.value, }) } @@ -1003,10 +1009,10 @@ function TriggersTab({ updateTriggerConfig(trigger.id, { - ...trigger.config, + ...scheduledConfig, timezone: e.target.value, }) } @@ -1017,7 +1023,8 @@ function TriggersTab({
)}
- ))} + ); + })}
diff --git a/apps/web/app/auth/callback/page.tsx b/apps/web/app/auth/callback/page.tsx new file mode 100644 index 00000000..660f9fd6 --- /dev/null +++ b/apps/web/app/auth/callback/page.tsx @@ -0,0 +1,90 @@ +"use client"; + +import { Suspense, useEffect, useState } from "react"; +import { useSearchParams, useRouter } from "next/navigation"; +import { useAuthStore } from "@/features/auth"; +import { useWorkspaceStore } from "@/features/workspace"; +import { api } from "@/shared/api"; +import { + Card, + CardHeader, + CardTitle, + CardDescription, + CardContent, +} from "@/components/ui/card"; +import { Loader2 } from "lucide-react"; + +function CallbackContent() { + const router = useRouter(); + const searchParams = useSearchParams(); + const loginWithGoogle = useAuthStore((s) => s.loginWithGoogle); + const hydrateWorkspace = useWorkspaceStore((s) => s.hydrateWorkspace); + const [error, setError] = useState(""); + + useEffect(() => { + const code = searchParams.get("code"); + if (!code) { + setError("Missing authorization code"); + return; + } + + const errorParam = searchParams.get("error"); + if (errorParam) { + setError(errorParam === "access_denied" ? "Access denied" : errorParam); + return; + } + + const redirectUri = `${window.location.origin}/auth/callback`; + + loginWithGoogle(code, redirectUri) + .then(async () => { + const wsList = await api.listWorkspaces(); + const lastWsId = localStorage.getItem("multica_workspace_id"); + await hydrateWorkspace(wsList, lastWsId); + router.push("/issues"); + }) + .catch((err) => { + setError(err instanceof Error ? err.message : "Login failed"); + }); + }, [searchParams, loginWithGoogle, hydrateWorkspace, router]); + + if (error) { + return ( +
+ + + Login Failed + {error} + + + + Back to login + + + +
+ ); + } + + return ( +
+ + + Signing in... + Please wait while we complete your login + + + + + +
+ ); +} + +export default function CallbackPage() { + return ( + + + + ); +} diff --git a/apps/web/core/issues/queries.ts b/apps/web/core/issues/queries.ts index bdf50d03..2d30d5a3 100644 --- a/apps/web/core/issues/queries.ts +++ b/apps/web/core/issues/queries.ts @@ -12,15 +12,28 @@ export const issueKeys = { ["issues", "subscribers", issueId] as const, }; +const CLOSED_PAGE_SIZE = 50; + /** * CACHE SHAPE NOTE: The raw cache stores ListIssuesResponse ({ issues, total }), * but `select` transforms it to Issue[] for consumers. Mutations and ws-updaters * must use setQueryData(...) — NOT setQueryData. + * + * Fetches all open issues + first page of closed issues (matching main's pagination strategy). */ export function issueListOptions(wsId: string) { return queryOptions({ queryKey: issueKeys.list(wsId), - queryFn: () => api.listIssues({ limit: 200 }), + queryFn: async () => { + const [openRes, closedRes] = await Promise.all([ + api.listIssues({ open_only: true }), + api.listIssues({ status: "done", limit: CLOSED_PAGE_SIZE, offset: 0 }), + ]); + return { + issues: [...openRes.issues, ...closedRes.issues], + total: openRes.total + closedRes.total, + }; + }, select: (data) => data.issues, }); } diff --git a/apps/web/features/auth/store.ts b/apps/web/features/auth/store.ts index 0f6ce7be..12f85513 100644 --- a/apps/web/features/auth/store.ts +++ b/apps/web/features/auth/store.ts @@ -12,6 +12,7 @@ interface AuthState { initialize: () => Promise; sendCode: (email: string) => Promise; verifyCode: (email: string, code: string) => Promise; + loginWithGoogle: (code: string, redirectUri: string) => Promise; logout: () => void; setUser: (user: User) => void; } @@ -53,6 +54,15 @@ export const useAuthStore = create((set) => ({ return user; }, + loginWithGoogle: async (code: string, redirectUri: string) => { + const { token, user } = await api.googleLogin(code, redirectUri); + localStorage.setItem("multica_token", token); + api.setToken(token); + setLoggedInCookie(); + set({ user }); + return user; + }, + logout: () => { localStorage.removeItem("multica_token"); api.setToken(null); diff --git a/apps/web/features/landing/i18n/en.ts b/apps/web/features/landing/i18n/en.ts index 5ca5c6a2..233edf6a 100644 --- a/apps/web/features/landing/i18n/en.ts +++ b/apps/web/features/landing/i18n/en.ts @@ -272,6 +272,35 @@ export const en: LandingDict = { title: "Changelog", subtitle: "New updates and improvements to Multica.", entries: [ + { + version: "0.1.8", + date: "2026-04-07", + title: "OAuth, OpenClaw & Issue Loading", + changes: [ + "Google OAuth login", + "OpenClaw runtime support for running agents on OpenClaw infrastructure", + "Redesigned agent live card — always sticky with manual expand/collapse toggle", + "Load all open issues without pagination limit; closed issues paginate on scroll", + "JWT and CloudFront cookie expiration extended from 72 hours to 30 days", + "Remember last selected workspace after re-login", + "Daemon ensures multica CLI is on PATH in agent task environment", + "PR template and CLI install guide for agent-driven setup", + ], + }, + { + version: "0.1.7", + date: "2026-04-05", + title: "Comment Pagination & CLI Polish", + changes: [ + "Comment list pagination in both the API and CLI", + "Inbox archive now dismisses all items for the same issue at once", + "CLI help output overhauled to match gh CLI style with examples", + "Attachments use UUIDv7 as S3 key and auto-link on issue/comment creation", + "@mention assigned agents on done or cancelled issues", + "Reply @mention inheritance skips when the reply only mentions members", + "Worktree setup preserves existing .env.worktree variables", + ], + }, { version: "0.1.6", date: "2026-04-03", diff --git a/apps/web/features/landing/i18n/zh.ts b/apps/web/features/landing/i18n/zh.ts index 9b463178..d7ec3d01 100644 --- a/apps/web/features/landing/i18n/zh.ts +++ b/apps/web/features/landing/i18n/zh.ts @@ -272,6 +272,35 @@ export const zh: LandingDict = { title: "\u66f4\u65b0\u65e5\u5fd7", subtitle: "Multica \u7684\u6700\u65b0\u66f4\u65b0\u548c\u6539\u8fdb\u3002", entries: [ + { + version: "0.1.8", + date: "2026-04-07", + title: "OAuth、OpenClaw 与 Issue 加载优化", + changes: [ + "支持 Google OAuth 登录", + "新增 OpenClaw 运行时,支持在 OpenClaw 基础设施上运行 Agent", + "Agent 实时卡片重新设计——始终吸顶,支持手动展开/收起", + "打开的 Issue 不再分页限制全量加载,已关闭的 Issue 滚动分页", + "JWT 和 CloudFront Cookie 有效期从 72 小时延长至 30 天", + "重新登录后记住上次选择的工作区", + "守护进程确保 Agent 任务环境中 multica CLI 在 PATH 上", + "新增 PR 模板和面向 Agent 的 CLI 安装指南", + ], + }, + { + version: "0.1.7", + date: "2026-04-05", + title: "评论分页与 CLI 优化", + changes: [ + "评论列表支持分页,API 和 CLI 均已适配", + "收件箱归档操作现在一次性归档同一 Issue 的所有通知", + "CLI 帮助输出重新设计,匹配 gh CLI 风格并增加示例", + "附件使用 UUIDv7 作为 S3 key,创建 Issue/评论时自动关联附件", + "支持在已完成或已取消的 Issue 上 @提及已分配的 Agent", + "回复仅 @提及成员时跳过父级提及继承逻辑", + "Worktree 环境配置保留已有的 .env.worktree 变量", + ], + }, { version: "0.1.6", date: "2026-04-03", diff --git a/apps/web/shared/api/client.ts b/apps/web/shared/api/client.ts index f7323a16..2c3a4207 100644 --- a/apps/web/shared/api/client.ts +++ b/apps/web/shared/api/client.ts @@ -144,6 +144,13 @@ export class ApiClient { }); } + async googleLogin(code: string, redirectUri: string): Promise { + return this.fetch("/auth/google", { + method: "POST", + body: JSON.stringify({ code, redirect_uri: redirectUri }), + }); + } + async getMe(): Promise { return this.fetch("/api/me"); } @@ -165,6 +172,7 @@ export class ApiClient { if (params?.status) search.set("status", params.status); if (params?.priority) search.set("priority", params.priority); if (params?.assignee_id) search.set("assignee_id", params.assignee_id); + if (params?.open_only) search.set("open_only", "true"); return this.fetch(`/api/issues?${search}`); } diff --git a/apps/web/shared/types/agent.ts b/apps/web/shared/types/agent.ts index 9f596ab4..b478d101 100644 --- a/apps/web/shared/types/agent.ts +++ b/apps/web/shared/types/agent.ts @@ -36,7 +36,7 @@ export interface AgentTrigger { id: string; type: AgentTriggerType; enabled: boolean; - config: Record; + config: Record | null; } export interface AgentTask { diff --git a/apps/web/shared/types/api.ts b/apps/web/shared/types/api.ts index 882750bc..39e4d712 100644 --- a/apps/web/shared/types/api.ts +++ b/apps/web/shared/types/api.ts @@ -32,6 +32,7 @@ export interface ListIssuesParams { status?: IssueStatus; priority?: IssuePriority; assignee_id?: string; + open_only?: boolean; } export interface ListIssuesResponse { diff --git a/server/cmd/server/router.go b/server/cmd/server/router.go index 477b9ae0..a7500007 100644 --- a/server/cmd/server/router.go +++ b/server/cmd/server/router.go @@ -82,6 +82,7 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route // Auth (public) r.Post("/auth/send-code", h.SendCode) r.Post("/auth/verify-code", h.VerifyCode) + r.Post("/auth/google", h.GoogleLogin) // Daemon API routes (all require a valid token) r.Route("/api/daemon", func(r chi.Router) { diff --git a/server/internal/daemon/config.go b/server/internal/daemon/config.go index bb41e30e..896c7f17 100644 --- a/server/internal/daemon/config.go +++ b/server/internal/daemon/config.go @@ -30,7 +30,7 @@ type Config struct { RuntimeName string CLIVersion string // multica CLI version (e.g. "0.1.13") Profile string // profile name (empty = default) - Agents map[string]AgentEntry // "claude" -> entry, "codex" -> entry, "opencode" -> entry + Agents map[string]AgentEntry // "claude" -> entry, "codex" -> entry, "opencode" -> entry, "openclaw" -> entry WorkspacesRoot string // base path for execution envs (default: ~/multica_workspaces) KeepEnvAfterTask bool // preserve env after task for debugging HealthPort int // local HTTP port for health checks (default: 19514) @@ -92,8 +92,15 @@ func LoadConfig(overrides Overrides) (Config, error) { Model: strings.TrimSpace(os.Getenv("MULTICA_OPENCODE_MODEL")), } } + openclawPath := envOrDefault("MULTICA_OPENCLAW_PATH", "openclaw") + if _, err := exec.LookPath(openclawPath); err == nil { + agents["openclaw"] = AgentEntry{ + Path: openclawPath, + Model: strings.TrimSpace(os.Getenv("MULTICA_OPENCLAW_MODEL")), + } + } if len(agents) == 0 { - return Config{}, fmt.Errorf("no agent CLI found: install claude, codex, or opencode and ensure it is on PATH") + return Config{}, fmt.Errorf("no agent CLI found: install claude, codex, opencode, or openclaw and ensure it is on PATH") } // Host info diff --git a/server/internal/daemon/daemon.go b/server/internal/daemon/daemon.go index 9d1b494a..b264d04c 100644 --- a/server/internal/daemon/daemon.go +++ b/server/internal/daemon/daemon.go @@ -921,6 +921,14 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, taskLo "MULTICA_AGENT_ID": task.AgentID, "MULTICA_TASK_ID": task.ID, } + // Ensure the multica CLI is on PATH inside the agent's environment. + // Some runtimes (e.g. Codex) run in an isolated sandbox that may not + // inherit the daemon's PATH. Prepend the directory of the running + // multica binary so that `multica` commands in the agent always resolve. + if selfBin, err := os.Executable(); err == nil { + binDir := filepath.Dir(selfBin) + agentEnv["PATH"] = binDir + string(os.PathListSeparator) + os.Getenv("PATH") + } // Point Codex to the per-task CODEX_HOME so it discovers skills natively // without polluting the system ~/.codex/skills/. if env.CodexHome != "" { diff --git a/server/internal/daemon/execenv/runtime_config.go b/server/internal/daemon/execenv/runtime_config.go index e9e8f9ce..e3b15dd6 100644 --- a/server/internal/daemon/execenv/runtime_config.go +++ b/server/internal/daemon/execenv/runtime_config.go @@ -13,13 +13,14 @@ import ( // For Claude: writes {workDir}/CLAUDE.md (skills discovered natively from .claude/skills/) // For Codex: writes {workDir}/AGENTS.md (skills discovered natively via CODEX_HOME) // For OpenCode: writes {workDir}/AGENTS.md (skills discovered natively from .config/opencode/skills/) +// For OpenClaw: writes {workDir}/AGENTS.md (skills discovered natively from .openclaw/skills/) func InjectRuntimeConfig(workDir, provider string, ctx TaskContextForEnv) error { content := buildMetaSkillContent(provider, ctx) switch provider { case "claude": return os.WriteFile(filepath.Join(workDir, "CLAUDE.md"), []byte(content), 0o644) - case "codex", "opencode": + case "codex", "opencode", "openclaw": return os.WriteFile(filepath.Join(workDir, "AGENTS.md"), []byte(content), 0o644) default: // Unknown provider — skip config injection, prompt-only mode. @@ -49,13 +50,18 @@ func buildMetaSkillContent(provider string, ctx TaskContextForEnv) string { b.WriteString("- `multica issue list [--status X] [--priority X] [--assignee X] --output json` — List issues in workspace\n") b.WriteString("- `multica issue comment list [--limit N] [--offset N] [--since ] --output json` — List comments on an issue (supports pagination; includes id, parent_id for threading)\n") b.WriteString("- `multica workspace get --output json` — Get workspace details and context\n") + b.WriteString("- `multica workspace members [workspace-id] --output json` — List workspace members (user IDs, names, roles)\n") b.WriteString("- `multica agent list --output json` — List agents in workspace\n") + b.WriteString("- `multica repo checkout ` — Check out a repository into the working directory (creates a git worktree with a dedicated branch)\n") b.WriteString("- `multica issue runs --output json` — List all execution runs for an issue (status, timestamps, errors)\n") b.WriteString("- `multica issue run-messages [--since ] --output json` — List messages for a specific execution run (supports incremental fetch)\n") b.WriteString("- `multica attachment download [-o ]` — Download an attachment file locally by ID\n\n") b.WriteString("### Write\n") + b.WriteString("- `multica issue create --title \"...\" [--description \"...\"] [--priority X] [--assignee X] [--parent ] [--status X]` — Create a new issue\n") + b.WriteString("- `multica issue assign --to ` — Assign an issue to a member or agent by name (use --unassign to remove assignee)\n") b.WriteString("- `multica issue comment add --content \"...\" [--parent ]` — Post a comment (use --parent to reply to a specific comment)\n") + b.WriteString("- `multica issue comment delete ` — Delete a comment\n") b.WriteString("- `multica issue status ` — Update issue status (todo, in_progress, in_review, done, blocked)\n") b.WriteString("- `multica issue update [--title X] [--description X] [--priority X]` — Update issue fields\n\n") @@ -99,13 +105,16 @@ func buildMetaSkillContent(provider string, ctx TaskContextForEnv) string { b.WriteString(" a. Run `multica repo checkout ` to check out the appropriate repository\n") b.WriteString(" b. `cd` into the checked-out directory\n") b.WriteString(" c. Implement the changes and commit\n") + b.WriteString(" d. Push the branch to the remote\n") + b.WriteString(" e. Create a pull request (decide the target branch based on the repo's conventions)\n") + fmt.Fprintf(&b, " f. Post the PR link as a comment: `multica issue comment add %s --content \"PR: \"`\n", ctx.IssueID) } else { b.WriteString(" a. Create a new branch\n") b.WriteString(" b. Implement the changes and commit\n") + b.WriteString(" c. Push the branch to the remote\n") + b.WriteString(" d. Create a pull request (decide the target branch based on the repo's conventions)\n") + fmt.Fprintf(&b, " e. Post the PR link as a comment: `multica issue comment add %s --content \"PR: \"`\n", ctx.IssueID) } - b.WriteString(" c. Push the branch to the remote\n") - b.WriteString(" d. Create a pull request (decide the target branch based on the repo's conventions)\n") - fmt.Fprintf(&b, " e. Post the PR link as a comment: `multica issue comment add %s --content \"PR: \"`\n", ctx.IssueID) b.WriteString("5. If the task does not require code (e.g. research, documentation), post your findings as a comment\n") fmt.Fprintf(&b, "6. Run `multica issue status %s in_review`\n", ctx.IssueID) fmt.Fprintf(&b, "7. If blocked, run `multica issue status %s blocked` and post a comment explaining why\n\n", ctx.IssueID) @@ -117,8 +126,8 @@ func buildMetaSkillContent(provider string, ctx TaskContextForEnv) string { case "claude": // Claude discovers skills natively from .claude/skills/ — just list names. b.WriteString("You have the following skills installed (discovered automatically):\n\n") - case "codex", "opencode": - // Codex and OpenCode discover skills natively from their respective paths — just list names. + case "codex", "opencode", "openclaw": + // Codex, OpenCode, and OpenClaw discover skills natively from their respective paths — just list names. b.WriteString("You have the following skills installed (discovered automatically):\n\n") default: b.WriteString("Detailed skill instructions are in `.agent_context/skills/`. Each subdirectory contains a `SKILL.md`.\n\n") diff --git a/server/internal/handler/auth.go b/server/internal/handler/auth.go index 5339190c..7fc99b96 100644 --- a/server/internal/handler/auth.go +++ b/server/internal/handler/auth.go @@ -7,8 +7,10 @@ import ( "encoding/binary" "encoding/json" "fmt" + "io" "log/slog" "net/http" + "net/url" "os" "strings" "time" @@ -175,7 +177,7 @@ func (h *Handler) issueJWT(user db.User) (string, error) { "sub": uuidToString(user.ID), "email": user.Email, "name": user.Name, - "exp": time.Now().Add(72 * time.Hour).Unix(), + "exp": time.Now().Add(30 * 24 * time.Hour).Unix(), "iat": time.Now().Unix(), }) return token.SignedString(auth.JWTSecret()) @@ -302,7 +304,7 @@ func (h *Handler) VerifyCode(w http.ResponseWriter, r *http.Request) { // Set CloudFront signed cookies for CDN access. if h.CFSigner != nil { - for _, cookie := range h.CFSigner.SignedCookies(time.Now().Add(72 * time.Hour)) { + for _, cookie := range h.CFSigner.SignedCookies(time.Now().Add(30 * 24 * time.Hour)) { http.SetCookie(w, cookie) } } @@ -334,6 +336,162 @@ type UpdateMeRequest struct { AvatarURL *string `json:"avatar_url"` } +type GoogleLoginRequest struct { + Code string `json:"code"` + RedirectURI string `json:"redirect_uri"` +} + +type googleTokenResponse struct { + AccessToken string `json:"access_token"` + IDToken string `json:"id_token"` + TokenType string `json:"token_type"` +} + +type googleUserInfo struct { + Email string `json:"email"` + Name string `json:"name"` + Picture string `json:"picture"` +} + +func (h *Handler) GoogleLogin(w http.ResponseWriter, r *http.Request) { + var req GoogleLoginRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid request body") + return + } + + if req.Code == "" { + writeError(w, http.StatusBadRequest, "code is required") + return + } + + clientID := os.Getenv("GOOGLE_CLIENT_ID") + clientSecret := os.Getenv("GOOGLE_CLIENT_SECRET") + if clientID == "" || clientSecret == "" { + writeError(w, http.StatusServiceUnavailable, "Google login is not configured") + return + } + + redirectURI := req.RedirectURI + if redirectURI == "" { + redirectURI = os.Getenv("GOOGLE_REDIRECT_URI") + } + + // Exchange authorization code for tokens. + tokenResp, err := http.PostForm("https://oauth2.googleapis.com/token", url.Values{ + "code": {req.Code}, + "client_id": {clientID}, + "client_secret": {clientSecret}, + "redirect_uri": {redirectURI}, + "grant_type": {"authorization_code"}, + }) + if err != nil { + slog.Error("google oauth token exchange failed", "error", err) + writeError(w, http.StatusBadGateway, "failed to exchange code with Google") + return + } + defer tokenResp.Body.Close() + + tokenBody, err := io.ReadAll(tokenResp.Body) + if err != nil { + writeError(w, http.StatusBadGateway, "failed to read Google token response") + return + } + + if tokenResp.StatusCode != http.StatusOK { + slog.Error("google oauth token exchange returned error", "status", tokenResp.StatusCode, "body", string(tokenBody)) + writeError(w, http.StatusBadRequest, "failed to exchange code with Google") + return + } + + var gToken googleTokenResponse + if err := json.Unmarshal(tokenBody, &gToken); err != nil { + writeError(w, http.StatusBadGateway, "failed to parse Google token response") + return + } + + // Fetch user info from Google. + userInfoReq, _ := http.NewRequestWithContext(r.Context(), http.MethodGet, "https://www.googleapis.com/oauth2/v2/userinfo", nil) + userInfoReq.Header.Set("Authorization", "Bearer "+gToken.AccessToken) + + userInfoResp, err := http.DefaultClient.Do(userInfoReq) + if err != nil { + slog.Error("google userinfo fetch failed", "error", err) + writeError(w, http.StatusBadGateway, "failed to fetch user info from Google") + return + } + defer userInfoResp.Body.Close() + + var gUser googleUserInfo + if err := json.NewDecoder(userInfoResp.Body).Decode(&gUser); err != nil { + writeError(w, http.StatusBadGateway, "failed to parse Google user info") + return + } + + if gUser.Email == "" { + writeError(w, http.StatusBadRequest, "Google account has no email") + return + } + + email := strings.ToLower(strings.TrimSpace(gUser.Email)) + + user, err := h.findOrCreateUser(r.Context(), email) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to create user") + return + } + + // Update name and avatar from Google profile if the user was just created + // (default name is email prefix) or has no avatar yet. + needsUpdate := false + newName := user.Name + newAvatar := user.AvatarUrl + + if gUser.Name != "" && user.Name == strings.Split(email, "@")[0] { + newName = gUser.Name + needsUpdate = true + } + if gUser.Picture != "" && !user.AvatarUrl.Valid { + newAvatar = pgtype.Text{String: gUser.Picture, Valid: true} + needsUpdate = true + } + + if needsUpdate { + updated, err := h.Queries.UpdateUser(r.Context(), db.UpdateUserParams{ + ID: user.ID, + Name: newName, + AvatarUrl: newAvatar, + }) + if err == nil { + user = updated + } + } + + if err := h.ensureUserWorkspace(r.Context(), user); err != nil { + writeError(w, http.StatusInternalServerError, "failed to provision workspace") + return + } + + tokenString, err := h.issueJWT(user) + if err != nil { + slog.Warn("google login failed", append(logger.RequestAttrs(r), "error", err, "email", email)...) + writeError(w, http.StatusInternalServerError, "failed to generate token") + return + } + + if h.CFSigner != nil { + for _, cookie := range h.CFSigner.SignedCookies(time.Now().Add(72 * time.Hour)) { + http.SetCookie(w, cookie) + } + } + + slog.Info("user logged in via google", append(logger.RequestAttrs(r), "user_id", uuidToString(user.ID), "email", user.Email)...) + writeJSON(w, http.StatusOK, LoginResponse{ + Token: tokenString, + User: userToResponse(user), + }) +} + func (h *Handler) UpdateMe(w http.ResponseWriter, r *http.Request) { userID, ok := requireUserID(w, r) if !ok { diff --git a/server/internal/handler/comment.go b/server/internal/handler/comment.go index 4902f646..e2f2f7fb 100644 --- a/server/internal/handler/comment.go +++ b/server/internal/handler/comment.go @@ -357,9 +357,8 @@ func (h *Handler) isReplyToMemberThread(parent *db.Comment, content string, issu // re-triggered by subsequent replies in the same thread — unless the reply // explicitly @mentions only non-agent entities (members, issues), which // signals the user is talking to other people and not the agent. -// Skips self-mentions, agents that are already the issue's assignee (handled -// by on_comment), agents with on_mention trigger disabled, and private agents -// mentioned by non-owner members (only the agent owner or workspace +// Skips self-mentions, agents with on_mention trigger disabled, and private +// agents mentioned by non-owner members (only the agent owner or workspace // admin/owner can mention a private agent). // Note: no status gate here — @mention is an explicit action and should work // even on done/cancelled issues (the agent can reopen the issue if needed). @@ -404,17 +403,6 @@ func (h *Handler) enqueueMentionedAgentTasks(ctx context.Context, issue db.Issue continue } agentUUID := parseUUID(m.ID) - // Prevent duplicate: skip if this agent is the issue's assignee - // (already handled by the on_comment trigger above) — but only - // when the issue is in a non-terminal status where on_comment - // will actually fire. For done/cancelled issues on_comment is - // suppressed, so an explicit @mention must still go through. - isAssignee := issue.AssigneeType.Valid && issue.AssigneeType.String == "agent" && - issue.AssigneeID.Valid && uuidToString(issue.AssigneeID) == m.ID - isTerminal := issue.Status == "done" || issue.Status == "cancelled" - if isAssignee && !isTerminal { - continue - } // Load the agent to check visibility, archive status, and trigger config. agent, err := h.Queries.GetAgent(ctx, agentUUID) if err != nil || !agent.RuntimeID.Valid || agent.ArchivedAt.Valid { diff --git a/server/internal/handler/issue.go b/server/internal/handler/issue.go index 0259bb21..389bbede 100644 --- a/server/internal/handler/issue.go +++ b/server/internal/handler/issue.go @@ -83,6 +83,42 @@ func (h *Handler) ListIssues(w http.ResponseWriter, r *http.Request) { ctx := r.Context() workspaceID := resolveWorkspaceID(r) + wsUUID := parseUUID(workspaceID) + + // Parse optional filter params + var priorityFilter pgtype.Text + if p := r.URL.Query().Get("priority"); p != "" { + priorityFilter = pgtype.Text{String: p, Valid: true} + } + var assigneeFilter pgtype.UUID + if a := r.URL.Query().Get("assignee_id"); a != "" { + assigneeFilter = parseUUID(a) + } + + // open_only=true returns all non-done/cancelled issues (no limit). + if r.URL.Query().Get("open_only") == "true" { + issues, err := h.Queries.ListOpenIssues(ctx, db.ListOpenIssuesParams{ + WorkspaceID: wsUUID, + Priority: priorityFilter, + AssigneeID: assigneeFilter, + }) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to list issues") + return + } + + prefix := h.getIssuePrefix(ctx, wsUUID) + resp := make([]IssueResponse, len(issues)) + for i, issue := range issues { + resp[i] = issueToResponse(issue, prefix) + } + + writeJSON(w, http.StatusOK, map[string]any{ + "issues": resp, + "total": len(resp), + }) + return + } limit := 100 offset := 0 @@ -97,22 +133,13 @@ func (h *Handler) ListIssues(w http.ResponseWriter, r *http.Request) { } } - // Parse optional filter params var statusFilter pgtype.Text if s := r.URL.Query().Get("status"); s != "" { statusFilter = pgtype.Text{String: s, Valid: true} } - var priorityFilter pgtype.Text - if p := r.URL.Query().Get("priority"); p != "" { - priorityFilter = pgtype.Text{String: p, Valid: true} - } - var assigneeFilter pgtype.UUID - if a := r.URL.Query().Get("assignee_id"); a != "" { - assigneeFilter = parseUUID(a) - } issues, err := h.Queries.ListIssues(ctx, db.ListIssuesParams{ - WorkspaceID: parseUUID(workspaceID), + WorkspaceID: wsUUID, Limit: int32(limit), Offset: int32(offset), Status: statusFilter, @@ -124,7 +151,18 @@ func (h *Handler) ListIssues(w http.ResponseWriter, r *http.Request) { return } - prefix := h.getIssuePrefix(ctx, parseUUID(workspaceID)) + // Get the true total count for pagination awareness. + total, err := h.Queries.CountIssues(ctx, db.CountIssuesParams{ + WorkspaceID: wsUUID, + Status: statusFilter, + Priority: priorityFilter, + AssigneeID: assigneeFilter, + }) + if err != nil { + total = int64(len(issues)) + } + + prefix := h.getIssuePrefix(ctx, wsUUID) resp := make([]IssueResponse, len(issues)) for i, issue := range issues { resp[i] = issueToResponse(issue, prefix) @@ -132,7 +170,7 @@ func (h *Handler) ListIssues(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, map[string]any{ "issues": resp, - "total": len(resp), + "total": total, }) } diff --git a/server/internal/middleware/cloudfront.go b/server/internal/middleware/cloudfront.go index ab749998..b6a27d75 100644 --- a/server/internal/middleware/cloudfront.go +++ b/server/internal/middleware/cloudfront.go @@ -18,7 +18,7 @@ func RefreshCloudFrontCookies(signer *auth.CloudFrontSigner) func(http.Handler) } return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if _, err := r.Cookie("CloudFront-Policy"); err != nil { - for _, cookie := range signer.SignedCookies(time.Now().Add(72 * time.Hour)) { + for _, cookie := range signer.SignedCookies(time.Now().Add(30 * 24 * time.Hour)) { http.SetCookie(w, cookie) } } diff --git a/server/pkg/agent/agent.go b/server/pkg/agent/agent.go index 383d6fe1..5636617d 100644 --- a/server/pkg/agent/agent.go +++ b/server/pkg/agent/agent.go @@ -1,5 +1,5 @@ // Package agent provides a unified interface for executing prompts via -// coding agents (Claude Code, Codex, OpenCode). It mirrors the happy-cli AgentBackend +// coding agents (Claude Code, Codex, OpenCode, OpenClaw). It mirrors the happy-cli AgentBackend // pattern, translated to idiomatic Go. package agent @@ -73,13 +73,13 @@ type Result struct { // Config configures a Backend instance. type Config struct { - ExecutablePath string // path to CLI binary (claude, codex, or opencode) + ExecutablePath string // path to CLI binary (claude, codex, opencode, or openclaw) Env map[string]string // extra environment variables Logger *slog.Logger } // New creates a Backend for the given agent type. -// Supported types: "claude", "codex", "opencode". +// Supported types: "claude", "codex", "opencode", "openclaw". func New(agentType string, cfg Config) (Backend, error) { if cfg.Logger == nil { cfg.Logger = slog.Default() @@ -92,8 +92,10 @@ func New(agentType string, cfg Config) (Backend, error) { return &codexBackend{cfg: cfg}, nil case "opencode": return &opencodeBackend{cfg: cfg}, nil + case "openclaw": + return &openclawBackend{cfg: cfg}, nil default: - return nil, fmt.Errorf("unknown agent type: %q (supported: claude, codex, opencode)", agentType) + return nil, fmt.Errorf("unknown agent type: %q (supported: claude, codex, opencode, openclaw)", agentType) } } diff --git a/server/pkg/agent/openclaw.go b/server/pkg/agent/openclaw.go new file mode 100644 index 00000000..96c240d8 --- /dev/null +++ b/server/pkg/agent/openclaw.go @@ -0,0 +1,313 @@ +package agent + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "io" + "os/exec" + "strings" + "time" +) + +// openclawBackend implements Backend by spawning `openclaw agent -p +// --output-format stream-json --yes` and reading streaming NDJSON events from +// stdout — similar to the opencode backend. +type openclawBackend struct { + cfg Config +} + +func (b *openclawBackend) Execute(ctx context.Context, prompt string, opts ExecOptions) (*Session, error) { + execPath := b.cfg.ExecutablePath + if execPath == "" { + execPath = "openclaw" + } + if _, err := exec.LookPath(execPath); err != nil { + return nil, fmt.Errorf("openclaw executable not found at %q: %w", execPath, err) + } + + timeout := opts.Timeout + if timeout == 0 { + timeout = 20 * time.Minute + } + runCtx, cancel := context.WithTimeout(ctx, timeout) + + args := []string{"agent", "--output-format", "stream-json", "--yes"} + if opts.Model != "" { + args = append(args, "--model", opts.Model) + } + if opts.SystemPrompt != "" { + args = append(args, "--system-prompt", opts.SystemPrompt) + } + if opts.MaxTurns > 0 { + args = append(args, "--max-turns", fmt.Sprintf("%d", opts.MaxTurns)) + } + if opts.ResumeSessionID != "" { + args = append(args, "--session", opts.ResumeSessionID) + } + args = append(args, "-p", prompt) + + cmd := exec.CommandContext(runCtx, execPath, args...) + if opts.Cwd != "" { + cmd.Dir = opts.Cwd + } + cmd.Env = buildEnv(b.cfg.Env) + + stdout, err := cmd.StdoutPipe() + if err != nil { + cancel() + return nil, fmt.Errorf("openclaw stdout pipe: %w", err) + } + cmd.Stderr = newLogWriter(b.cfg.Logger, "[openclaw:stderr] ") + + if err := cmd.Start(); err != nil { + cancel() + return nil, fmt.Errorf("start openclaw: %w", err) + } + + b.cfg.Logger.Info("openclaw started", "pid", cmd.Process.Pid, "cwd", opts.Cwd, "model", opts.Model) + + msgCh := make(chan Message, 256) + resCh := make(chan Result, 1) + + go func() { + defer cancel() + defer close(msgCh) + defer close(resCh) + + startTime := time.Now() + scanResult := b.processEvents(stdout, msgCh) + + // Wait for process exit. + exitErr := cmd.Wait() + duration := time.Since(startTime) + + if runCtx.Err() == context.DeadlineExceeded { + scanResult.status = "timeout" + scanResult.errMsg = fmt.Sprintf("openclaw timed out after %s", timeout) + } else if runCtx.Err() == context.Canceled { + scanResult.status = "aborted" + scanResult.errMsg = "execution cancelled" + } else if exitErr != nil && scanResult.status == "completed" { + scanResult.status = "failed" + scanResult.errMsg = fmt.Sprintf("openclaw exited with error: %v", exitErr) + } + + b.cfg.Logger.Info("openclaw finished", "pid", cmd.Process.Pid, "status", scanResult.status, "duration", duration.Round(time.Millisecond).String()) + + resCh <- Result{ + Status: scanResult.status, + Output: scanResult.output, + Error: scanResult.errMsg, + DurationMs: duration.Milliseconds(), + SessionID: scanResult.sessionID, + } + }() + + return &Session{Messages: msgCh, Result: resCh}, nil +} + +// ── Event handlers ── + +// openclawEventResult holds accumulated state from processing the event stream. +type openclawEventResult struct { + status string + errMsg string + output string + sessionID string +} + +// processEvents reads NDJSON lines from r, dispatches events to ch, and returns +// the accumulated result. +func (b *openclawBackend) processEvents(r io.Reader, ch chan<- Message) openclawEventResult { + var output strings.Builder + var sessionID string + finalStatus := "completed" + var finalError string + + scanner := bufio.NewScanner(r) + scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024) + + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + + var event openclawEvent + if err := json.Unmarshal([]byte(line), &event); err != nil { + continue + } + + if event.SessionID != "" { + sessionID = event.SessionID + } + + switch event.Type { + case "text": + b.handleOCTextEvent(event, ch, &output) + case "thinking": + b.handleOCThinkingEvent(event, ch) + case "tool_call": + b.handleOCToolCallEvent(event, ch) + case "error": + // NOTE: error events unconditionally set finalStatus to "failed" and + // it stays sticky — subsequent text or result events won't revert it. + // This is intentional: once an error fires, the session is considered + // failed regardless of later events. + b.handleOCErrorEvent(event, ch, &finalStatus, &finalError) + case "step_start": + trySend(ch, Message{Type: MessageStatus, Status: "running"}) + case "step_end": + // Captures final session ID from step_end if present. + case "result": + // The result event only updates status on explicit failure. A + // "completed" result is a no-op because finalStatus defaults to + // "completed". Any unrecognized status (e.g. "partial") is also + // treated as success — update this if OpenClaw adds new statuses. + if event.Data != nil { + if s, ok := event.Data["status"].(string); ok && s != "" { + if s == "error" || s == "failed" { + finalStatus = "failed" + if msg, ok := event.Data["error"].(string); ok { + finalError = msg + } + } + } + } + } + } + + // Check for scanner errors (e.g. broken pipe, read errors). + if scanErr := scanner.Err(); scanErr != nil { + b.cfg.Logger.Warn("openclaw stdout scanner error", "error", scanErr) + if finalStatus == "completed" { + finalStatus = "failed" + finalError = fmt.Sprintf("stdout read error: %v", scanErr) + } + } + + return openclawEventResult{ + status: finalStatus, + errMsg: finalError, + output: output.String(), + sessionID: sessionID, + } +} + +func (b *openclawBackend) handleOCTextEvent(event openclawEvent, ch chan<- Message, output *strings.Builder) { + text := openclawExtractText(event.Data) + if text != "" { + output.WriteString(text) + trySend(ch, Message{Type: MessageText, Content: text}) + } +} + +func (b *openclawBackend) handleOCThinkingEvent(event openclawEvent, ch chan<- Message) { + text := openclawExtractText(event.Data) + if text != "" { + trySend(ch, Message{Type: MessageThinking, Content: text}) + } +} + +// handleOCToolCallEvent processes "tool_call" events from OpenClaw. A single +// tool_call event may contain both the call and result when the tool has +// completed (status == "completed"). +func (b *openclawBackend) handleOCToolCallEvent(event openclawEvent, ch chan<- Message) { + if event.Data == nil { + return + } + + name, _ := event.Data["name"].(string) + callID, _ := event.Data["callId"].(string) + + // Extract input. + var input map[string]any + if raw, ok := event.Data["input"]; ok { + if m, ok := raw.(map[string]any); ok { + input = m + } + } + + // Emit the tool-use message. + trySend(ch, Message{ + Type: MessageToolUse, + Tool: name, + CallID: callID, + Input: input, + }) + + // If the tool has completed, also emit a tool-result message. + status, _ := event.Data["status"].(string) + if status == "completed" { + outputStr := extractToolOutput(event.Data["output"]) + trySend(ch, Message{ + Type: MessageToolResult, + Tool: name, + CallID: callID, + Output: outputStr, + }) + } +} + +func (b *openclawBackend) handleOCErrorEvent(event openclawEvent, ch chan<- Message, finalStatus, finalError *string) { + errMsg := "" + if event.Data != nil { + if msg, ok := event.Data["message"].(string); ok { + errMsg = msg + } + if errMsg == "" { + if code, ok := event.Data["code"].(string); ok { + errMsg = code + } + } + } + if errMsg == "" { + errMsg = "unknown openclaw error" + } + + b.cfg.Logger.Warn("openclaw error event", "error", errMsg) + trySend(ch, Message{Type: MessageError, Content: errMsg}) + + *finalStatus = "failed" + *finalError = errMsg +} + +// openclawExtractText extracts text content from an openclaw event data map. +// Supports both flat {"text": "..."} and nested {"content": {"text": "..."}} layouts. +func openclawExtractText(data map[string]any) string { + if data == nil { + return "" + } + // Try "text" field directly. + if text, ok := data["text"].(string); ok { + return text + } + // Try nested "content.text". + if content, ok := data["content"].(map[string]any); ok { + if text, ok := content["text"].(string); ok { + return text + } + } + return "" +} + +// ── JSON types for `openclaw agent --output-format stream-json` stdout events ── + +// openclawEvent represents a single NDJSON line from OpenClaw's stream-json output. +// +// Event types: +// +// "step_start" — agent step begins +// "text" — text output from agent +// "thinking" — model reasoning/thinking +// "tool_call" — tool invocation with call and result +// "error" — error from openclaw +// "step_end" — agent step completes +// "result" — final result with status +type openclawEvent struct { + Type string `json:"type"` + SessionID string `json:"sessionId,omitempty"` + Data map[string]any `json:"data,omitempty"` +} diff --git a/server/pkg/agent/openclaw_test.go b/server/pkg/agent/openclaw_test.go new file mode 100644 index 00000000..3e3a6c38 --- /dev/null +++ b/server/pkg/agent/openclaw_test.go @@ -0,0 +1,574 @@ +package agent + +import ( + "log/slog" + "strings" + "testing" +) + +func TestNewReturnsOpenclawBackend(t *testing.T) { + t.Parallel() + b, err := New("openclaw", Config{ExecutablePath: "/nonexistent/openclaw"}) + if err != nil { + t.Fatalf("New(openclaw) error: %v", err) + } + if _, ok := b.(*openclawBackend); !ok { + t.Fatalf("expected *openclawBackend, got %T", b) + } +} + +// ── Text event tests ── + +func TestOpenclawHandleTextEvent(t *testing.T) { + t.Parallel() + + b := &openclawBackend{} + ch := make(chan Message, 10) + var output strings.Builder + + event := openclawEvent{ + Type: "text", + SessionID: "ses_abc", + Data: map[string]any{"text": "Hello from openclaw"}, + } + + b.handleOCTextEvent(event, ch, &output) + + if output.String() != "Hello from openclaw" { + t.Errorf("output: got %q, want %q", output.String(), "Hello from openclaw") + } + msg := <-ch + if msg.Type != MessageText { + t.Errorf("type: got %v, want MessageText", msg.Type) + } + if msg.Content != "Hello from openclaw" { + t.Errorf("content: got %q, want %q", msg.Content, "Hello from openclaw") + } +} + +func TestOpenclawHandleTextEventEmpty(t *testing.T) { + t.Parallel() + + b := &openclawBackend{} + ch := make(chan Message, 10) + var output strings.Builder + + event := openclawEvent{ + Type: "text", + Data: map[string]any{"text": ""}, + } + + b.handleOCTextEvent(event, ch, &output) + + if output.String() != "" { + t.Errorf("expected empty output, got %q", output.String()) + } + if len(ch) != 0 { + t.Errorf("expected no messages, got %d", len(ch)) + } +} + +func TestOpenclawHandleTextEventNilData(t *testing.T) { + t.Parallel() + + b := &openclawBackend{} + ch := make(chan Message, 10) + var output strings.Builder + + event := openclawEvent{Type: "text"} + + b.handleOCTextEvent(event, ch, &output) + + if output.String() != "" { + t.Errorf("expected empty output, got %q", output.String()) + } + if len(ch) != 0 { + t.Errorf("expected no messages, got %d", len(ch)) + } +} + +// ── Thinking event tests ── + +func TestOpenclawHandleThinkingEvent(t *testing.T) { + t.Parallel() + + b := &openclawBackend{} + ch := make(chan Message, 10) + + event := openclawEvent{ + Type: "thinking", + Data: map[string]any{"text": "Let me think about this..."}, + } + + b.handleOCThinkingEvent(event, ch) + + if len(ch) != 1 { + t.Fatalf("expected 1 message, got %d", len(ch)) + } + msg := <-ch + if msg.Type != MessageThinking { + t.Errorf("type: got %v, want MessageThinking", msg.Type) + } + if msg.Content != "Let me think about this..." { + t.Errorf("content: got %q", msg.Content) + } +} + +// ── Tool call event tests ── + +func TestOpenclawHandleToolCallCompleted(t *testing.T) { + t.Parallel() + + b := &openclawBackend{} + ch := make(chan Message, 10) + + event := openclawEvent{ + Type: "tool_call", + Data: map[string]any{ + "name": "bash", + "callId": "call_123", + "input": map[string]any{"command": "pwd"}, + "status": "completed", + "output": "/tmp/project\n", + }, + } + + b.handleOCToolCallEvent(event, ch) + + // Should emit both tool-use and tool-result. + if len(ch) != 2 { + t.Fatalf("expected 2 messages, got %d", len(ch)) + } + + // First: tool-use + msg := <-ch + if msg.Type != MessageToolUse { + t.Errorf("type: got %v, want MessageToolUse", msg.Type) + } + if msg.Tool != "bash" { + t.Errorf("tool: got %q, want %q", msg.Tool, "bash") + } + if msg.CallID != "call_123" { + t.Errorf("callID: got %q, want %q", msg.CallID, "call_123") + } + if cmd, ok := msg.Input["command"].(string); !ok || cmd != "pwd" { + t.Errorf("input.command: got %v", msg.Input["command"]) + } + + // Second: tool-result + msg = <-ch + if msg.Type != MessageToolResult { + t.Errorf("type: got %v, want MessageToolResult", msg.Type) + } + if msg.Output != "/tmp/project\n" { + t.Errorf("output: got %q", msg.Output) + } +} + +func TestOpenclawHandleToolCallPending(t *testing.T) { + t.Parallel() + + b := &openclawBackend{} + ch := make(chan Message, 10) + + event := openclawEvent{ + Type: "tool_call", + Data: map[string]any{ + "name": "read", + "callId": "call_456", + "input": map[string]any{"filePath": "/tmp/test.go"}, + "status": "pending", + }, + } + + b.handleOCToolCallEvent(event, ch) + + if len(ch) != 1 { + t.Fatalf("expected 1 message for pending tool, got %d", len(ch)) + } + msg := <-ch + if msg.Type != MessageToolUse { + t.Errorf("type: got %v, want MessageToolUse", msg.Type) + } +} + +func TestOpenclawHandleToolCallNilData(t *testing.T) { + t.Parallel() + + b := &openclawBackend{} + ch := make(chan Message, 10) + + event := openclawEvent{Type: "tool_call"} + + b.handleOCToolCallEvent(event, ch) + + if len(ch) != 0 { + t.Errorf("expected no messages for nil data, got %d", len(ch)) + } +} + +// ── Error event tests ── + +func TestOpenclawHandleErrorEvent(t *testing.T) { + t.Parallel() + + b := &openclawBackend{cfg: Config{Logger: slog.Default()}} + ch := make(chan Message, 10) + status := "completed" + errMsg := "" + + event := openclawEvent{ + Type: "error", + SessionID: "ses_abc", + Data: map[string]any{"message": "Model not found: bad/model"}, + } + + b.handleOCErrorEvent(event, ch, &status, &errMsg) + + if status != "failed" { + t.Errorf("status: got %q, want %q", status, "failed") + } + if errMsg != "Model not found: bad/model" { + t.Errorf("error: got %q", errMsg) + } + msg := <-ch + if msg.Type != MessageError { + t.Errorf("type: got %v, want MessageError", msg.Type) + } +} + +func TestOpenclawHandleErrorEventCodeOnly(t *testing.T) { + t.Parallel() + + b := &openclawBackend{cfg: Config{Logger: slog.Default()}} + ch := make(chan Message, 10) + status := "completed" + errMsg := "" + + event := openclawEvent{ + Type: "error", + Data: map[string]any{"code": "RateLimitError"}, + } + + b.handleOCErrorEvent(event, ch, &status, &errMsg) + + if errMsg != "RateLimitError" { + t.Errorf("error: got %q, want %q", errMsg, "RateLimitError") + } +} + +func TestOpenclawHandleErrorEventNilData(t *testing.T) { + t.Parallel() + + b := &openclawBackend{cfg: Config{Logger: slog.Default()}} + ch := make(chan Message, 10) + status := "completed" + errMsg := "" + + event := openclawEvent{Type: "error"} + + b.handleOCErrorEvent(event, ch, &status, &errMsg) + + if errMsg != "unknown openclaw error" { + t.Errorf("error: got %q, want %q", errMsg, "unknown openclaw error") + } +} + +// ── Integration-level tests: processEvents ── + +func TestOpenclawProcessEventsHappyPath(t *testing.T) { + t.Parallel() + + b := &openclawBackend{cfg: Config{Logger: slog.Default()}} + ch := make(chan Message, 256) + + // Simulate a successful run: step_start → text → tool_call → text → step_end + lines := strings.Join([]string{ + `{"type":"step_start","sessionId":"ses_happy"}`, + `{"type":"text","sessionId":"ses_happy","data":{"text":"Analyzing..."}}`, + `{"type":"tool_call","sessionId":"ses_happy","data":{"name":"bash","callId":"call_1","input":{"command":"ls"},"status":"completed","output":"file.go\n"}}`, + `{"type":"text","sessionId":"ses_happy","data":{"text":" Done."}}`, + `{"type":"step_end","sessionId":"ses_happy"}`, + }, "\n") + + result := b.processEvents(strings.NewReader(lines), ch) + + if result.status != "completed" { + t.Errorf("status: got %q, want %q", result.status, "completed") + } + if result.sessionID != "ses_happy" { + t.Errorf("sessionID: got %q, want %q", result.sessionID, "ses_happy") + } + if result.output != "Analyzing... Done." { + t.Errorf("output: got %q, want %q", result.output, "Analyzing... Done.") + } + if result.errMsg != "" { + t.Errorf("errMsg: got %q, want empty", result.errMsg) + } + + // Drain and verify messages. + close(ch) + var msgs []Message + for m := range ch { + msgs = append(msgs, m) + } + + // Expected: status(running), text, tool-use, tool-result, text = 5 messages + if len(msgs) != 5 { + t.Fatalf("expected 5 messages, got %d: %+v", len(msgs), msgs) + } + if msgs[0].Type != MessageStatus || msgs[0].Status != "running" { + t.Errorf("msg[0]: got %+v, want status=running", msgs[0]) + } + if msgs[1].Type != MessageText || msgs[1].Content != "Analyzing..." { + t.Errorf("msg[1]: got %+v", msgs[1]) + } + if msgs[2].Type != MessageToolUse || msgs[2].Tool != "bash" { + t.Errorf("msg[2]: got %+v, want tool-use(bash)", msgs[2]) + } + if msgs[3].Type != MessageToolResult || msgs[3].Output != "file.go\n" { + t.Errorf("msg[3]: got %+v, want tool-result", msgs[3]) + } + if msgs[4].Type != MessageText || msgs[4].Content != " Done." { + t.Errorf("msg[4]: got %+v", msgs[4]) + } +} + +func TestOpenclawProcessEventsErrorCausesFailedStatus(t *testing.T) { + t.Parallel() + + b := &openclawBackend{cfg: Config{Logger: slog.Default()}} + ch := make(chan Message, 256) + + lines := strings.Join([]string{ + `{"type":"step_start","sessionId":"ses_err"}`, + `{"type":"error","sessionId":"ses_err","data":{"message":"Model not found: bad/model"}}`, + `{"type":"step_end","sessionId":"ses_err"}`, + }, "\n") + + result := b.processEvents(strings.NewReader(lines), ch) + + if result.status != "failed" { + t.Errorf("status: got %q, want %q", result.status, "failed") + } + if result.errMsg != "Model not found: bad/model" { + t.Errorf("errMsg: got %q", result.errMsg) + } + if result.sessionID != "ses_err" { + t.Errorf("sessionID: got %q, want %q", result.sessionID, "ses_err") + } + + close(ch) + var errorMsgs int + for m := range ch { + if m.Type == MessageError { + errorMsgs++ + } + } + if errorMsgs != 1 { + t.Errorf("expected 1 error message, got %d", errorMsgs) + } +} + +func TestOpenclawProcessEventsSessionIDExtracted(t *testing.T) { + t.Parallel() + + b := &openclawBackend{cfg: Config{Logger: slog.Default()}} + ch := make(chan Message, 256) + + lines := strings.Join([]string{ + `{"type":"step_start","sessionId":"ses_first"}`, + `{"type":"text","sessionId":"ses_updated","data":{"text":"hi"}}`, + }, "\n") + + result := b.processEvents(strings.NewReader(lines), ch) + + if result.sessionID != "ses_updated" { + t.Errorf("sessionID: got %q, want %q (should use last seen)", result.sessionID, "ses_updated") + } + + close(ch) +} + +func TestOpenclawProcessEventsScannerError(t *testing.T) { + t.Parallel() + + b := &openclawBackend{cfg: Config{Logger: slog.Default()}} + ch := make(chan Message, 256) + + result := b.processEvents(&ioErrReader{ + data: `{"type":"text","sessionId":"ses_scan","data":{"text":"before error"}}` + "\n", + }, ch) + + if result.status != "failed" { + t.Errorf("status: got %q, want %q", result.status, "failed") + } + if !strings.Contains(result.errMsg, "stdout read error") { + t.Errorf("errMsg: got %q, want it to contain 'stdout read error'", result.errMsg) + } + if result.output != "before error" { + t.Errorf("output: got %q, want %q", result.output, "before error") + } + + close(ch) +} + +func TestOpenclawProcessEventsEmptyLines(t *testing.T) { + t.Parallel() + + b := &openclawBackend{cfg: Config{Logger: slog.Default()}} + ch := make(chan Message, 256) + + lines := strings.Join([]string{ + "", + " ", + "not json at all", + `{"type":"text","sessionId":"ses_ok","data":{"text":"valid"}}`, + "", + }, "\n") + + result := b.processEvents(strings.NewReader(lines), ch) + + if result.status != "completed" { + t.Errorf("status: got %q, want %q", result.status, "completed") + } + if result.output != "valid" { + t.Errorf("output: got %q, want %q", result.output, "valid") + } + if result.sessionID != "ses_ok" { + t.Errorf("sessionID: got %q, want %q", result.sessionID, "ses_ok") + } + + close(ch) + var msgs []Message + for m := range ch { + msgs = append(msgs, m) + } + if len(msgs) != 1 || msgs[0].Type != MessageText { + t.Errorf("expected 1 text message, got %d: %+v", len(msgs), msgs) + } +} + +func TestOpenclawProcessEventsErrorDoesNotRevertToCompleted(t *testing.T) { + t.Parallel() + + b := &openclawBackend{cfg: Config{Logger: slog.Default()}} + ch := make(chan Message, 256) + + lines := strings.Join([]string{ + `{"type":"error","sessionId":"ses_x","data":{"message":"RateLimitError"}}`, + `{"type":"text","sessionId":"ses_x","data":{"text":"recovered?"}}`, + }, "\n") + + result := b.processEvents(strings.NewReader(lines), ch) + + if result.status != "failed" { + t.Errorf("status: got %q, want %q (error should stick)", result.status, "failed") + } + if result.errMsg != "RateLimitError" { + t.Errorf("errMsg: got %q, want %q", result.errMsg, "RateLimitError") + } + + close(ch) +} + +func TestOpenclawProcessEventsResultEvent(t *testing.T) { + t.Parallel() + + b := &openclawBackend{cfg: Config{Logger: slog.Default()}} + ch := make(chan Message, 256) + + lines := strings.Join([]string{ + `{"type":"text","sessionId":"ses_r","data":{"text":"Done"}}`, + `{"type":"result","sessionId":"ses_r","data":{"status":"completed"}}`, + }, "\n") + + result := b.processEvents(strings.NewReader(lines), ch) + + if result.status != "completed" { + t.Errorf("status: got %q, want %q", result.status, "completed") + } + if result.output != "Done" { + t.Errorf("output: got %q, want %q", result.output, "Done") + } + + close(ch) +} + +func TestOpenclawProcessEventsResultErrorStatus(t *testing.T) { + t.Parallel() + + b := &openclawBackend{cfg: Config{Logger: slog.Default()}} + ch := make(chan Message, 256) + + lines := strings.Join([]string{ + `{"type":"result","sessionId":"ses_rf","data":{"status":"error","error":"out of tokens"}}`, + }, "\n") + + result := b.processEvents(strings.NewReader(lines), ch) + + if result.status != "failed" { + t.Errorf("status: got %q, want %q", result.status, "failed") + } + if result.errMsg != "out of tokens" { + t.Errorf("errMsg: got %q, want %q", result.errMsg, "out of tokens") + } + + close(ch) +} + +// ── openclawExtractText tests ── + +func TestExtractEventTextDirect(t *testing.T) { + t.Parallel() + data := map[string]any{"text": "hello"} + if got := openclawExtractText(data); got != "hello" { + t.Errorf("got %q, want %q", got, "hello") + } +} + +func TestExtractEventTextNested(t *testing.T) { + t.Parallel() + data := map[string]any{ + "content": map[string]any{"text": "nested hello"}, + } + if got := openclawExtractText(data); got != "nested hello" { + t.Errorf("got %q, want %q", got, "nested hello") + } +} + +func TestExtractEventTextNil(t *testing.T) { + t.Parallel() + if got := openclawExtractText(nil); got != "" { + t.Errorf("got %q, want empty", got) + } +} + +// ── Thinking event with nested content ── + +func TestOpenclawHandleThinkingEventNestedContent(t *testing.T) { + t.Parallel() + + b := &openclawBackend{} + ch := make(chan Message, 10) + + event := openclawEvent{ + Type: "thinking", + Data: map[string]any{ + "content": map[string]any{"text": "Nested thinking"}, + }, + } + + b.handleOCThinkingEvent(event, ch) + + if len(ch) != 1 { + t.Fatalf("expected 1 message, got %d", len(ch)) + } + msg := <-ch + if msg.Type != MessageThinking { + t.Errorf("type: got %v, want MessageThinking", msg.Type) + } + if msg.Content != "Nested thinking" { + t.Errorf("content: got %q, want %q", msg.Content, "Nested thinking") + } +} diff --git a/server/pkg/db/generated/issue.sql.go b/server/pkg/db/generated/issue.sql.go index f899eb6e..97ec6788 100644 --- a/server/pkg/db/generated/issue.sql.go +++ b/server/pkg/db/generated/issue.sql.go @@ -11,6 +11,33 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) +const countIssues = `-- name: CountIssues :one +SELECT count(*) FROM issue +WHERE workspace_id = $1 + AND ($2::text IS NULL OR status = $2) + AND ($3::text IS NULL OR priority = $3) + AND ($4::uuid IS NULL OR assignee_id = $4) +` + +type CountIssuesParams struct { + WorkspaceID pgtype.UUID `json:"workspace_id"` + Status pgtype.Text `json:"status"` + Priority pgtype.Text `json:"priority"` + AssigneeID pgtype.UUID `json:"assignee_id"` +} + +func (q *Queries) CountIssues(ctx context.Context, arg CountIssuesParams) (int64, error) { + row := q.db.QueryRow(ctx, countIssues, + arg.WorkspaceID, + arg.Status, + arg.Priority, + arg.AssigneeID, + ) + var count int64 + err := row.Scan(&count) + return count, err +} + const createIssue = `-- name: CreateIssue :one INSERT INTO issue ( workspace_id, title, description, status, priority, @@ -254,6 +281,60 @@ func (q *Queries) ListIssues(ctx context.Context, arg ListIssuesParams) ([]Issue return items, nil } +const listOpenIssues = `-- name: ListOpenIssues :many +SELECT id, workspace_id, title, description, status, priority, assignee_type, assignee_id, creator_type, creator_id, parent_issue_id, acceptance_criteria, context_refs, position, due_date, created_at, updated_at, number FROM issue +WHERE workspace_id = $1 + AND status NOT IN ('done', 'cancelled') + AND ($2::text IS NULL OR priority = $2) + AND ($3::uuid IS NULL OR assignee_id = $3) +ORDER BY position ASC, created_at DESC +` + +type ListOpenIssuesParams struct { + WorkspaceID pgtype.UUID `json:"workspace_id"` + Priority pgtype.Text `json:"priority"` + AssigneeID pgtype.UUID `json:"assignee_id"` +} + +func (q *Queries) ListOpenIssues(ctx context.Context, arg ListOpenIssuesParams) ([]Issue, error) { + rows, err := q.db.Query(ctx, listOpenIssues, arg.WorkspaceID, arg.Priority, arg.AssigneeID) + if err != nil { + return nil, err + } + defer rows.Close() + items := []Issue{} + for rows.Next() { + var i Issue + if err := rows.Scan( + &i.ID, + &i.WorkspaceID, + &i.Title, + &i.Description, + &i.Status, + &i.Priority, + &i.AssigneeType, + &i.AssigneeID, + &i.CreatorType, + &i.CreatorID, + &i.ParentIssueID, + &i.AcceptanceCriteria, + &i.ContextRefs, + &i.Position, + &i.DueDate, + &i.CreatedAt, + &i.UpdatedAt, + &i.Number, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const updateIssue = `-- name: UpdateIssue :one UPDATE issue SET title = COALESCE($2, title), diff --git a/server/pkg/db/queries/issue.sql b/server/pkg/db/queries/issue.sql index edc229c3..c8821ffb 100644 --- a/server/pkg/db/queries/issue.sql +++ b/server/pkg/db/queries/issue.sql @@ -51,3 +51,18 @@ RETURNING *; -- name: DeleteIssue :exec DELETE FROM issue WHERE id = $1; + +-- name: ListOpenIssues :many +SELECT * FROM issue +WHERE workspace_id = $1 + AND status NOT IN ('done', 'cancelled') + AND (sqlc.narg('priority')::text IS NULL OR priority = sqlc.narg('priority')) + AND (sqlc.narg('assignee_id')::uuid IS NULL OR assignee_id = sqlc.narg('assignee_id')) +ORDER BY position ASC, created_at DESC; + +-- name: CountIssues :one +SELECT count(*) FROM issue +WHERE workspace_id = $1 + AND (sqlc.narg('status')::text IS NULL OR status = sqlc.narg('status')) + AND (sqlc.narg('priority')::text IS NULL OR priority = sqlc.narg('priority')) + AND (sqlc.narg('assignee_id')::uuid IS NULL OR assignee_id = sqlc.narg('assignee_id'));