From ffda18c809c81e175b9e9f1a511e9f167c4de80f Mon Sep 17 00:00:00 2001 From: Jiayuan Date: Sun, 29 Mar 2026 16:53:28 +0800 Subject: [PATCH] feat(agent): add per-task session persistence for Claude Code resumption Store the Claude Code session ID and working directory when a task completes. On the next task for the same (agent, issue) pair, look up the prior session and pass --resume to Claude Code so the agent retains conversation context across multiple tasks on the same issue. Changes: - Migration 020: add session_id and work_dir columns to agent_task_queue - CompleteAgentTask stores session_id and work_dir on completion - GetLastTaskSession query retrieves prior session for (agent, issue) - ClaimTaskByRuntime handler populates prior_session_id in response - Daemon passes ResumeSessionID through to Claude backend Execute() - Claude backend adds --resume flag when ResumeSessionID is set --- server/internal/daemon/client.go | 8 ++- server/internal/daemon/daemon.go | 13 ++-- server/internal/daemon/types.go | 13 ++-- server/internal/handler/agent.go | 27 ++++---- server/internal/handler/daemon.go | 19 ++++-- server/internal/service/task.go | 8 ++- server/migrations/020_task_session.down.sql | 2 + server/migrations/020_task_session.up.sql | 7 ++ server/pkg/agent/agent.go | 11 ++-- server/pkg/agent/claude.go | 3 + server/pkg/db/generated/agent.sql.go | 73 +++++++++++++++++---- server/pkg/db/generated/models.go | 2 + server/pkg/db/queries/agent.sql | 10 ++- 13 files changed, 147 insertions(+), 49 deletions(-) create mode 100644 server/migrations/020_task_session.down.sql create mode 100644 server/migrations/020_task_session.up.sql diff --git a/server/internal/daemon/client.go b/server/internal/daemon/client.go index 88df73ff..e29a0977 100644 --- a/server/internal/daemon/client.go +++ b/server/internal/daemon/client.go @@ -83,11 +83,17 @@ func (c *Client) ReportProgress(ctx context.Context, taskID, summary string, ste }, nil) } -func (c *Client) CompleteTask(ctx context.Context, taskID, output, branchName string) error { +func (c *Client) CompleteTask(ctx context.Context, taskID, output, branchName, sessionID, workDir string) error { body := map[string]any{"output": output} if branchName != "" { body["branch_name"] = branchName } + if sessionID != "" { + body["session_id"] = sessionID + } + if workDir != "" { + body["work_dir"] = workDir + } return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/complete", taskID), body, nil) } diff --git a/server/internal/daemon/daemon.go b/server/internal/daemon/daemon.go index 752bab58..26a2c218 100644 --- a/server/internal/daemon/daemon.go +++ b/server/internal/daemon/daemon.go @@ -564,7 +564,7 @@ func (d *Daemon) handleTask(ctx context.Context, task Task) { } default: d.logger.Info("task completed", "task_id", task.ID, "status", result.Status) - if err := d.client.CompleteTask(ctx, task.ID, result.Comment, result.BranchName); err != nil { + if err := d.client.CompleteTask(ctx, task.ID, result.Comment, result.BranchName, result.SessionID, result.WorkDir); err != nil { d.logger.Error("complete task failed", "task_id", task.ID, "error", err) } } @@ -632,12 +632,13 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string) (TaskR return TaskResult{}, fmt.Errorf("create agent backend: %w", err) } - d.logger.Info("starting agent", "provider", provider, "task_id", task.ID, "workdir", env.WorkDir, "branch", env.BranchName, "env_type", env.Type, "model", entry.Model, "timeout", d.cfg.AgentTimeout.String()) + d.logger.Info("starting agent", "provider", provider, "task_id", task.ID, "workdir", env.WorkDir, "branch", env.BranchName, "env_type", env.Type, "model", entry.Model, "timeout", d.cfg.AgentTimeout.String(), "resume_session", task.PriorSessionID) session, err := backend.Execute(ctx, prompt, agent.ExecOptions{ - Cwd: env.WorkDir, - Model: entry.Model, - Timeout: d.cfg.AgentTimeout, + Cwd: env.WorkDir, + Model: entry.Model, + Timeout: d.cfg.AgentTimeout, + ResumeSessionID: task.PriorSessionID, }) if err != nil { return TaskResult{}, err @@ -667,6 +668,8 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string) (TaskR Comment: result.Output, BranchName: env.BranchName, EnvType: string(env.Type), + SessionID: result.SessionID, + WorkDir: env.WorkDir, }, nil case "timeout": return TaskResult{}, fmt.Errorf("%s timed out after %s", provider, d.cfg.AgentTimeout) diff --git a/server/internal/daemon/types.go b/server/internal/daemon/types.go index e8430846..f8634d22 100644 --- a/server/internal/daemon/types.go +++ b/server/internal/daemon/types.go @@ -17,11 +17,12 @@ type Runtime struct { // Task represents a claimed task from the server. // Agent data (name, skills) is populated by the claim endpoint. type Task struct { - ID string `json:"id"` - AgentID string `json:"agent_id"` - RuntimeID string `json:"runtime_id"` - IssueID string `json:"issue_id"` - Agent *AgentData `json:"agent,omitempty"` + ID string `json:"id"` + AgentID string `json:"agent_id"` + RuntimeID string `json:"runtime_id"` + IssueID string `json:"issue_id"` + Agent *AgentData `json:"agent,omitempty"` + PriorSessionID string `json:"prior_session_id,omitempty"` // Claude session ID from a previous task on this issue } // AgentData holds agent details returned by the claim endpoint. @@ -50,4 +51,6 @@ type TaskResult struct { Comment string `json:"comment"` BranchName string `json:"branch_name,omitempty"` EnvType string `json:"env_type,omitempty"` + SessionID string `json:"session_id,omitempty"` // Claude session ID for future resumption + WorkDir string `json:"work_dir,omitempty"` // working directory used during execution } diff --git a/server/internal/handler/agent.go b/server/internal/handler/agent.go index d83af976..6a95a267 100644 --- a/server/internal/handler/agent.go +++ b/server/internal/handler/agent.go @@ -80,19 +80,20 @@ func agentToResponse(a db.Agent) AgentResponse { } type AgentTaskResponse struct { - ID string `json:"id"` - AgentID string `json:"agent_id"` - RuntimeID string `json:"runtime_id"` - IssueID string `json:"issue_id"` - Status string `json:"status"` - Priority int32 `json:"priority"` - DispatchedAt *string `json:"dispatched_at"` - StartedAt *string `json:"started_at"` - CompletedAt *string `json:"completed_at"` - Result any `json:"result"` - Error *string `json:"error"` - Agent *TaskAgentData `json:"agent,omitempty"` - CreatedAt string `json:"created_at"` + ID string `json:"id"` + AgentID string `json:"agent_id"` + RuntimeID string `json:"runtime_id"` + IssueID string `json:"issue_id"` + Status string `json:"status"` + Priority int32 `json:"priority"` + DispatchedAt *string `json:"dispatched_at"` + StartedAt *string `json:"started_at"` + CompletedAt *string `json:"completed_at"` + Result any `json:"result"` + Error *string `json:"error"` + Agent *TaskAgentData `json:"agent,omitempty"` + CreatedAt string `json:"created_at"` + PriorSessionID string `json:"prior_session_id,omitempty"` // session ID from a previous task on same issue } // TaskAgentData holds agent info included in claim responses so the daemon diff --git a/server/internal/handler/daemon.go b/server/internal/handler/daemon.go index c040e6d2..c6088a70 100644 --- a/server/internal/handler/daemon.go +++ b/server/internal/handler/daemon.go @@ -216,7 +216,16 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) { } } - slog.Info("task claimed by runtime", "task_id", uuidToString(task.ID), "runtime_id", runtimeID, "agent_id", uuidToString(task.AgentID)) + // Look up the prior session for this (agent, issue) pair so the daemon + // can resume the Claude Code conversation context. + if prior, err := h.Queries.GetLastTaskSession(r.Context(), db.GetLastTaskSessionParams{ + AgentID: task.AgentID, + IssueID: task.IssueID, + }); err == nil && prior.SessionID.Valid { + resp.PriorSessionID = prior.SessionID.String + } + + slog.Info("task claimed by runtime", "task_id", uuidToString(task.ID), "runtime_id", runtimeID, "agent_id", uuidToString(task.AgentID), "prior_session", resp.PriorSessionID) writeJSON(w, http.StatusOK, map[string]any{"task": resp}) } @@ -288,8 +297,10 @@ func (h *Handler) ReportTaskProgress(w http.ResponseWriter, r *http.Request) { // CompleteTask marks a running task as completed. type TaskCompleteRequest struct { - PRURL string `json:"pr_url"` - Output string `json:"output"` + PRURL string `json:"pr_url"` + Output string `json:"output"` + SessionID string `json:"session_id"` // Claude session ID for future resumption + WorkDir string `json:"work_dir"` // working directory used during execution } func (h *Handler) CompleteTask(w http.ResponseWriter, r *http.Request) { @@ -302,7 +313,7 @@ func (h *Handler) CompleteTask(w http.ResponseWriter, r *http.Request) { } result, _ := json.Marshal(req) - task, err := h.TaskService.CompleteTask(r.Context(), parseUUID(taskID), result) + task, err := h.TaskService.CompleteTask(r.Context(), parseUUID(taskID), result, req.SessionID, req.WorkDir) if err != nil { slog.Warn("complete task failed", "task_id", taskID, "error", err) writeError(w, http.StatusBadRequest, err.Error()) diff --git a/server/internal/service/task.go b/server/internal/service/task.go index 7fe74d6d..fae0ae32 100644 --- a/server/internal/service/task.go +++ b/server/internal/service/task.go @@ -144,10 +144,12 @@ func (s *TaskService) StartTask(ctx context.Context, taskID pgtype.UUID) (*db.Ag // CompleteTask marks a task as completed. // Issue status is NOT changed here — the agent manages it via the CLI. -func (s *TaskService) CompleteTask(ctx context.Context, taskID pgtype.UUID, result []byte) (*db.AgentTaskQueue, error) { +func (s *TaskService) CompleteTask(ctx context.Context, taskID pgtype.UUID, result []byte, sessionID, workDir string) (*db.AgentTaskQueue, error) { task, err := s.Queries.CompleteAgentTask(ctx, db.CompleteAgentTaskParams{ - ID: taskID, - Result: result, + ID: taskID, + Result: result, + SessionID: pgtype.Text{String: sessionID, Valid: sessionID != ""}, + WorkDir: pgtype.Text{String: workDir, Valid: workDir != ""}, }) if err != nil { // Log the current task state to help debug why the update matched no rows. diff --git a/server/migrations/020_task_session.down.sql b/server/migrations/020_task_session.down.sql new file mode 100644 index 00000000..5d7911cc --- /dev/null +++ b/server/migrations/020_task_session.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE agent_task_queue DROP COLUMN IF EXISTS session_id; +ALTER TABLE agent_task_queue DROP COLUMN IF EXISTS work_dir; diff --git a/server/migrations/020_task_session.up.sql b/server/migrations/020_task_session.up.sql new file mode 100644 index 00000000..92fa2038 --- /dev/null +++ b/server/migrations/020_task_session.up.sql @@ -0,0 +1,7 @@ +-- Add session persistence columns to agent_task_queue. +-- session_id: the Claude Code session ID returned after execution. +-- work_dir: the working directory used during execution. +-- These enable resuming the same Claude Code session across multiple tasks +-- for the same (agent, issue) pair via --resume . +ALTER TABLE agent_task_queue ADD COLUMN session_id TEXT; +ALTER TABLE agent_task_queue ADD COLUMN work_dir TEXT; diff --git a/server/pkg/agent/agent.go b/server/pkg/agent/agent.go index e2198f40..d19887ed 100644 --- a/server/pkg/agent/agent.go +++ b/server/pkg/agent/agent.go @@ -20,11 +20,12 @@ type Backend interface { // ExecOptions configures a single execution. type ExecOptions struct { - Cwd string - Model string - SystemPrompt string - MaxTurns int - Timeout time.Duration + Cwd string + Model string + SystemPrompt string + MaxTurns int + Timeout time.Duration + ResumeSessionID string // if non-empty, resume a previous Claude Code session } // Session represents a running agent execution. diff --git a/server/pkg/agent/claude.go b/server/pkg/agent/claude.go index 6c83a288..610df39c 100644 --- a/server/pkg/agent/claude.go +++ b/server/pkg/agent/claude.go @@ -47,6 +47,9 @@ func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOpt if opts.SystemPrompt != "" { args = append(args, "--append-system-prompt", opts.SystemPrompt) } + if opts.ResumeSessionID != "" { + args = append(args, "--resume", opts.ResumeSessionID) + } args = append(args, "-p", prompt) cmd := exec.CommandContext(runCtx, execPath, args...) diff --git a/server/pkg/db/generated/agent.sql.go b/server/pkg/db/generated/agent.sql.go index b9f7db53..7e8f4e8b 100644 --- a/server/pkg/db/generated/agent.sql.go +++ b/server/pkg/db/generated/agent.sql.go @@ -32,7 +32,7 @@ WHERE id = ( LIMIT 1 FOR UPDATE SKIP LOCKED ) -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir ` func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (AgentTaskQueue, error) { @@ -52,24 +52,33 @@ func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (Agen &i.CreatedAt, &i.Context, &i.RuntimeID, + &i.SessionID, + &i.WorkDir, ) return i, err } const completeAgentTask = `-- name: CompleteAgentTask :one UPDATE agent_task_queue -SET status = 'completed', completed_at = now(), result = $2 +SET status = 'completed', completed_at = now(), result = $2, session_id = $3, work_dir = $4 WHERE id = $1 AND status = 'running' -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir ` type CompleteAgentTaskParams struct { - ID pgtype.UUID `json:"id"` - Result []byte `json:"result"` + ID pgtype.UUID `json:"id"` + Result []byte `json:"result"` + SessionID pgtype.Text `json:"session_id"` + WorkDir pgtype.Text `json:"work_dir"` } func (q *Queries) CompleteAgentTask(ctx context.Context, arg CompleteAgentTaskParams) (AgentTaskQueue, error) { - row := q.db.QueryRow(ctx, completeAgentTask, arg.ID, arg.Result) + row := q.db.QueryRow(ctx, completeAgentTask, + arg.ID, + arg.Result, + arg.SessionID, + arg.WorkDir, + ) var i AgentTaskQueue err := row.Scan( &i.ID, @@ -85,6 +94,8 @@ func (q *Queries) CompleteAgentTask(ctx context.Context, arg CompleteAgentTaskPa &i.CreatedAt, &i.Context, &i.RuntimeID, + &i.SessionID, + &i.WorkDir, ) return i, err } @@ -165,7 +176,7 @@ func (q *Queries) CreateAgent(ctx context.Context, arg CreateAgentParams) (Agent const createAgentTask = `-- name: CreateAgentTask :one INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority) VALUES ($1, $2, $3, 'queued', $4) -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir ` type CreateAgentTaskParams struct { @@ -197,6 +208,8 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams &i.CreatedAt, &i.Context, &i.RuntimeID, + &i.SessionID, + &i.WorkDir, ) return i, err } @@ -214,7 +227,7 @@ const failAgentTask = `-- name: FailAgentTask :one UPDATE agent_task_queue SET status = 'failed', completed_at = now(), error = $2 WHERE id = $1 AND status = 'running' -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir ` type FailAgentTaskParams struct { @@ -239,6 +252,8 @@ func (q *Queries) FailAgentTask(ctx context.Context, arg FailAgentTaskParams) (A &i.CreatedAt, &i.Context, &i.RuntimeID, + &i.SessionID, + &i.WorkDir, ) return i, err } @@ -273,7 +288,7 @@ func (q *Queries) GetAgent(ctx context.Context, id pgtype.UUID) (Agent, error) { } const getAgentTask = `-- name: GetAgentTask :one -SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id FROM agent_task_queue +SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir FROM agent_task_queue WHERE id = $1 ` @@ -294,12 +309,40 @@ func (q *Queries) GetAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQu &i.CreatedAt, &i.Context, &i.RuntimeID, + &i.SessionID, + &i.WorkDir, ) return i, err } +const getLastTaskSession = `-- name: GetLastTaskSession :one +SELECT session_id, work_dir FROM agent_task_queue +WHERE agent_id = $1 AND issue_id = $2 AND status = 'completed' AND session_id IS NOT NULL +ORDER BY completed_at DESC +LIMIT 1 +` + +type GetLastTaskSessionParams struct { + AgentID pgtype.UUID `json:"agent_id"` + IssueID pgtype.UUID `json:"issue_id"` +} + +type GetLastTaskSessionRow struct { + SessionID pgtype.Text `json:"session_id"` + WorkDir pgtype.Text `json:"work_dir"` +} + +// Returns the session_id and work_dir from the most recent completed task +// for a given (agent_id, issue_id) pair, used for session resumption. +func (q *Queries) GetLastTaskSession(ctx context.Context, arg GetLastTaskSessionParams) (GetLastTaskSessionRow, error) { + row := q.db.QueryRow(ctx, getLastTaskSession, arg.AgentID, arg.IssueID) + var i GetLastTaskSessionRow + err := row.Scan(&i.SessionID, &i.WorkDir) + return i, err +} + const listAgentTasks = `-- name: ListAgentTasks :many -SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id FROM agent_task_queue +SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir FROM agent_task_queue WHERE agent_id = $1 ORDER BY created_at DESC ` @@ -327,6 +370,8 @@ func (q *Queries) ListAgentTasks(ctx context.Context, agentID pgtype.UUID) ([]Ag &i.CreatedAt, &i.Context, &i.RuntimeID, + &i.SessionID, + &i.WorkDir, ); err != nil { return nil, err } @@ -382,7 +427,7 @@ func (q *Queries) ListAgents(ctx context.Context, workspaceID pgtype.UUID) ([]Ag } const listPendingTasksByRuntime = `-- name: ListPendingTasksByRuntime :many -SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id FROM agent_task_queue +SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir FROM agent_task_queue WHERE runtime_id = $1 AND status IN ('queued', 'dispatched') ORDER BY priority DESC, created_at ASC ` @@ -410,6 +455,8 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp &i.CreatedAt, &i.Context, &i.RuntimeID, + &i.SessionID, + &i.WorkDir, ); err != nil { return nil, err } @@ -425,7 +472,7 @@ const startAgentTask = `-- name: StartAgentTask :one UPDATE agent_task_queue SET status = 'running', started_at = now() WHERE id = $1 AND status = 'dispatched' -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir ` func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) { @@ -445,6 +492,8 @@ func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTask &i.CreatedAt, &i.Context, &i.RuntimeID, + &i.SessionID, + &i.WorkDir, ) return i, err } diff --git a/server/pkg/db/generated/models.go b/server/pkg/db/generated/models.go index 546a3bad..a2ab29e8 100644 --- a/server/pkg/db/generated/models.go +++ b/server/pkg/db/generated/models.go @@ -73,6 +73,8 @@ type AgentTaskQueue struct { CreatedAt pgtype.Timestamptz `json:"created_at"` Context []byte `json:"context"` RuntimeID pgtype.UUID `json:"runtime_id"` + SessionID pgtype.Text `json:"session_id"` + WorkDir pgtype.Text `json:"work_dir"` } type Comment struct { diff --git a/server/pkg/db/queries/agent.sql b/server/pkg/db/queries/agent.sql index fd12d39c..489da756 100644 --- a/server/pkg/db/queries/agent.sql +++ b/server/pkg/db/queries/agent.sql @@ -74,10 +74,18 @@ RETURNING *; -- name: CompleteAgentTask :one UPDATE agent_task_queue -SET status = 'completed', completed_at = now(), result = $2 +SET status = 'completed', completed_at = now(), result = $2, session_id = $3, work_dir = $4 WHERE id = $1 AND status = 'running' RETURNING *; +-- name: GetLastTaskSession :one +-- Returns the session_id and work_dir from the most recent completed task +-- for a given (agent_id, issue_id) pair, used for session resumption. +SELECT session_id, work_dir FROM agent_task_queue +WHERE agent_id = $1 AND issue_id = $2 AND status = 'completed' AND session_id IS NOT NULL +ORDER BY completed_at DESC +LIMIT 1; + -- name: FailAgentTask :one UPDATE agent_task_queue SET status = 'failed', completed_at = now(), error = $2