feat(agent): add per-task session persistence for Claude Code resumption

Store the Claude Code session ID and working directory when a task
completes. On the next task for the same (agent, issue) pair, look up
the prior session and pass --resume <session_id> to Claude Code so
the agent retains conversation context across multiple tasks on the
same issue.

Changes:
- Migration 020: add session_id and work_dir columns to agent_task_queue
- CompleteAgentTask stores session_id and work_dir on completion
- GetLastTaskSession query retrieves prior session for (agent, issue)
- ClaimTaskByRuntime handler populates prior_session_id in response
- Daemon passes ResumeSessionID through to Claude backend Execute()
- Claude backend adds --resume flag when ResumeSessionID is set
This commit is contained in:
Jiayuan 2026-03-29 16:53:28 +08:00
parent 42f72371bd
commit ffda18c809
13 changed files with 147 additions and 49 deletions

View file

@ -83,11 +83,17 @@ func (c *Client) ReportProgress(ctx context.Context, taskID, summary string, ste
}, nil)
}
func (c *Client) CompleteTask(ctx context.Context, taskID, output, branchName string) error {
func (c *Client) CompleteTask(ctx context.Context, taskID, output, branchName, sessionID, workDir string) error {
body := map[string]any{"output": output}
if branchName != "" {
body["branch_name"] = branchName
}
if sessionID != "" {
body["session_id"] = sessionID
}
if workDir != "" {
body["work_dir"] = workDir
}
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/tasks/%s/complete", taskID), body, nil)
}

View file

@ -564,7 +564,7 @@ func (d *Daemon) handleTask(ctx context.Context, task Task) {
}
default:
d.logger.Info("task completed", "task_id", task.ID, "status", result.Status)
if err := d.client.CompleteTask(ctx, task.ID, result.Comment, result.BranchName); err != nil {
if err := d.client.CompleteTask(ctx, task.ID, result.Comment, result.BranchName, result.SessionID, result.WorkDir); err != nil {
d.logger.Error("complete task failed", "task_id", task.ID, "error", err)
}
}
@ -632,12 +632,13 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string) (TaskR
return TaskResult{}, fmt.Errorf("create agent backend: %w", err)
}
d.logger.Info("starting agent", "provider", provider, "task_id", task.ID, "workdir", env.WorkDir, "branch", env.BranchName, "env_type", env.Type, "model", entry.Model, "timeout", d.cfg.AgentTimeout.String())
d.logger.Info("starting agent", "provider", provider, "task_id", task.ID, "workdir", env.WorkDir, "branch", env.BranchName, "env_type", env.Type, "model", entry.Model, "timeout", d.cfg.AgentTimeout.String(), "resume_session", task.PriorSessionID)
session, err := backend.Execute(ctx, prompt, agent.ExecOptions{
Cwd: env.WorkDir,
Model: entry.Model,
Timeout: d.cfg.AgentTimeout,
Cwd: env.WorkDir,
Model: entry.Model,
Timeout: d.cfg.AgentTimeout,
ResumeSessionID: task.PriorSessionID,
})
if err != nil {
return TaskResult{}, err
@ -667,6 +668,8 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string) (TaskR
Comment: result.Output,
BranchName: env.BranchName,
EnvType: string(env.Type),
SessionID: result.SessionID,
WorkDir: env.WorkDir,
}, nil
case "timeout":
return TaskResult{}, fmt.Errorf("%s timed out after %s", provider, d.cfg.AgentTimeout)

View file

@ -17,11 +17,12 @@ type Runtime struct {
// Task represents a claimed task from the server.
// Agent data (name, skills) is populated by the claim endpoint.
type Task struct {
ID string `json:"id"`
AgentID string `json:"agent_id"`
RuntimeID string `json:"runtime_id"`
IssueID string `json:"issue_id"`
Agent *AgentData `json:"agent,omitempty"`
ID string `json:"id"`
AgentID string `json:"agent_id"`
RuntimeID string `json:"runtime_id"`
IssueID string `json:"issue_id"`
Agent *AgentData `json:"agent,omitempty"`
PriorSessionID string `json:"prior_session_id,omitempty"` // Claude session ID from a previous task on this issue
}
// AgentData holds agent details returned by the claim endpoint.
@ -50,4 +51,6 @@ type TaskResult struct {
Comment string `json:"comment"`
BranchName string `json:"branch_name,omitempty"`
EnvType string `json:"env_type,omitempty"`
SessionID string `json:"session_id,omitempty"` // Claude session ID for future resumption
WorkDir string `json:"work_dir,omitempty"` // working directory used during execution
}

View file

@ -80,19 +80,20 @@ func agentToResponse(a db.Agent) AgentResponse {
}
type AgentTaskResponse struct {
ID string `json:"id"`
AgentID string `json:"agent_id"`
RuntimeID string `json:"runtime_id"`
IssueID string `json:"issue_id"`
Status string `json:"status"`
Priority int32 `json:"priority"`
DispatchedAt *string `json:"dispatched_at"`
StartedAt *string `json:"started_at"`
CompletedAt *string `json:"completed_at"`
Result any `json:"result"`
Error *string `json:"error"`
Agent *TaskAgentData `json:"agent,omitempty"`
CreatedAt string `json:"created_at"`
ID string `json:"id"`
AgentID string `json:"agent_id"`
RuntimeID string `json:"runtime_id"`
IssueID string `json:"issue_id"`
Status string `json:"status"`
Priority int32 `json:"priority"`
DispatchedAt *string `json:"dispatched_at"`
StartedAt *string `json:"started_at"`
CompletedAt *string `json:"completed_at"`
Result any `json:"result"`
Error *string `json:"error"`
Agent *TaskAgentData `json:"agent,omitempty"`
CreatedAt string `json:"created_at"`
PriorSessionID string `json:"prior_session_id,omitempty"` // session ID from a previous task on same issue
}
// TaskAgentData holds agent info included in claim responses so the daemon

View file

@ -216,7 +216,16 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) {
}
}
slog.Info("task claimed by runtime", "task_id", uuidToString(task.ID), "runtime_id", runtimeID, "agent_id", uuidToString(task.AgentID))
// Look up the prior session for this (agent, issue) pair so the daemon
// can resume the Claude Code conversation context.
if prior, err := h.Queries.GetLastTaskSession(r.Context(), db.GetLastTaskSessionParams{
AgentID: task.AgentID,
IssueID: task.IssueID,
}); err == nil && prior.SessionID.Valid {
resp.PriorSessionID = prior.SessionID.String
}
slog.Info("task claimed by runtime", "task_id", uuidToString(task.ID), "runtime_id", runtimeID, "agent_id", uuidToString(task.AgentID), "prior_session", resp.PriorSessionID)
writeJSON(w, http.StatusOK, map[string]any{"task": resp})
}
@ -288,8 +297,10 @@ func (h *Handler) ReportTaskProgress(w http.ResponseWriter, r *http.Request) {
// CompleteTask marks a running task as completed.
type TaskCompleteRequest struct {
PRURL string `json:"pr_url"`
Output string `json:"output"`
PRURL string `json:"pr_url"`
Output string `json:"output"`
SessionID string `json:"session_id"` // Claude session ID for future resumption
WorkDir string `json:"work_dir"` // working directory used during execution
}
func (h *Handler) CompleteTask(w http.ResponseWriter, r *http.Request) {
@ -302,7 +313,7 @@ func (h *Handler) CompleteTask(w http.ResponseWriter, r *http.Request) {
}
result, _ := json.Marshal(req)
task, err := h.TaskService.CompleteTask(r.Context(), parseUUID(taskID), result)
task, err := h.TaskService.CompleteTask(r.Context(), parseUUID(taskID), result, req.SessionID, req.WorkDir)
if err != nil {
slog.Warn("complete task failed", "task_id", taskID, "error", err)
writeError(w, http.StatusBadRequest, err.Error())

View file

@ -144,10 +144,12 @@ func (s *TaskService) StartTask(ctx context.Context, taskID pgtype.UUID) (*db.Ag
// CompleteTask marks a task as completed.
// Issue status is NOT changed here — the agent manages it via the CLI.
func (s *TaskService) CompleteTask(ctx context.Context, taskID pgtype.UUID, result []byte) (*db.AgentTaskQueue, error) {
func (s *TaskService) CompleteTask(ctx context.Context, taskID pgtype.UUID, result []byte, sessionID, workDir string) (*db.AgentTaskQueue, error) {
task, err := s.Queries.CompleteAgentTask(ctx, db.CompleteAgentTaskParams{
ID: taskID,
Result: result,
ID: taskID,
Result: result,
SessionID: pgtype.Text{String: sessionID, Valid: sessionID != ""},
WorkDir: pgtype.Text{String: workDir, Valid: workDir != ""},
})
if err != nil {
// Log the current task state to help debug why the update matched no rows.

View file

@ -0,0 +1,2 @@
ALTER TABLE agent_task_queue DROP COLUMN IF EXISTS session_id;
ALTER TABLE agent_task_queue DROP COLUMN IF EXISTS work_dir;

View file

@ -0,0 +1,7 @@
-- Add session persistence columns to agent_task_queue.
-- session_id: the Claude Code session ID returned after execution.
-- work_dir: the working directory used during execution.
-- These enable resuming the same Claude Code session across multiple tasks
-- for the same (agent, issue) pair via --resume <session_id>.
ALTER TABLE agent_task_queue ADD COLUMN session_id TEXT;
ALTER TABLE agent_task_queue ADD COLUMN work_dir TEXT;

View file

@ -20,11 +20,12 @@ type Backend interface {
// ExecOptions configures a single execution.
type ExecOptions struct {
Cwd string
Model string
SystemPrompt string
MaxTurns int
Timeout time.Duration
Cwd string
Model string
SystemPrompt string
MaxTurns int
Timeout time.Duration
ResumeSessionID string // if non-empty, resume a previous Claude Code session
}
// Session represents a running agent execution.

View file

@ -47,6 +47,9 @@ func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
if opts.SystemPrompt != "" {
args = append(args, "--append-system-prompt", opts.SystemPrompt)
}
if opts.ResumeSessionID != "" {
args = append(args, "--resume", opts.ResumeSessionID)
}
args = append(args, "-p", prompt)
cmd := exec.CommandContext(runCtx, execPath, args...)

View file

@ -32,7 +32,7 @@ WHERE id = (
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir
`
func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (AgentTaskQueue, error) {
@ -52,24 +52,33 @@ func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (Agen
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
)
return i, err
}
const completeAgentTask = `-- name: CompleteAgentTask :one
UPDATE agent_task_queue
SET status = 'completed', completed_at = now(), result = $2
SET status = 'completed', completed_at = now(), result = $2, session_id = $3, work_dir = $4
WHERE id = $1 AND status = 'running'
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir
`
type CompleteAgentTaskParams struct {
ID pgtype.UUID `json:"id"`
Result []byte `json:"result"`
ID pgtype.UUID `json:"id"`
Result []byte `json:"result"`
SessionID pgtype.Text `json:"session_id"`
WorkDir pgtype.Text `json:"work_dir"`
}
func (q *Queries) CompleteAgentTask(ctx context.Context, arg CompleteAgentTaskParams) (AgentTaskQueue, error) {
row := q.db.QueryRow(ctx, completeAgentTask, arg.ID, arg.Result)
row := q.db.QueryRow(ctx, completeAgentTask,
arg.ID,
arg.Result,
arg.SessionID,
arg.WorkDir,
)
var i AgentTaskQueue
err := row.Scan(
&i.ID,
@ -85,6 +94,8 @@ func (q *Queries) CompleteAgentTask(ctx context.Context, arg CompleteAgentTaskPa
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
)
return i, err
}
@ -165,7 +176,7 @@ func (q *Queries) CreateAgent(ctx context.Context, arg CreateAgentParams) (Agent
const createAgentTask = `-- name: CreateAgentTask :one
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority)
VALUES ($1, $2, $3, 'queued', $4)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir
`
type CreateAgentTaskParams struct {
@ -197,6 +208,8 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
)
return i, err
}
@ -214,7 +227,7 @@ const failAgentTask = `-- name: FailAgentTask :one
UPDATE agent_task_queue
SET status = 'failed', completed_at = now(), error = $2
WHERE id = $1 AND status = 'running'
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir
`
type FailAgentTaskParams struct {
@ -239,6 +252,8 @@ func (q *Queries) FailAgentTask(ctx context.Context, arg FailAgentTaskParams) (A
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
)
return i, err
}
@ -273,7 +288,7 @@ func (q *Queries) GetAgent(ctx context.Context, id pgtype.UUID) (Agent, error) {
}
const getAgentTask = `-- name: GetAgentTask :one
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir FROM agent_task_queue
WHERE id = $1
`
@ -294,12 +309,40 @@ func (q *Queries) GetAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQu
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
)
return i, err
}
const getLastTaskSession = `-- name: GetLastTaskSession :one
SELECT session_id, work_dir FROM agent_task_queue
WHERE agent_id = $1 AND issue_id = $2 AND status = 'completed' AND session_id IS NOT NULL
ORDER BY completed_at DESC
LIMIT 1
`
type GetLastTaskSessionParams struct {
AgentID pgtype.UUID `json:"agent_id"`
IssueID pgtype.UUID `json:"issue_id"`
}
type GetLastTaskSessionRow struct {
SessionID pgtype.Text `json:"session_id"`
WorkDir pgtype.Text `json:"work_dir"`
}
// Returns the session_id and work_dir from the most recent completed task
// for a given (agent_id, issue_id) pair, used for session resumption.
func (q *Queries) GetLastTaskSession(ctx context.Context, arg GetLastTaskSessionParams) (GetLastTaskSessionRow, error) {
row := q.db.QueryRow(ctx, getLastTaskSession, arg.AgentID, arg.IssueID)
var i GetLastTaskSessionRow
err := row.Scan(&i.SessionID, &i.WorkDir)
return i, err
}
const listAgentTasks = `-- name: ListAgentTasks :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir FROM agent_task_queue
WHERE agent_id = $1
ORDER BY created_at DESC
`
@ -327,6 +370,8 @@ func (q *Queries) ListAgentTasks(ctx context.Context, agentID pgtype.UUID) ([]Ag
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
); err != nil {
return nil, err
}
@ -382,7 +427,7 @@ func (q *Queries) ListAgents(ctx context.Context, workspaceID pgtype.UUID) ([]Ag
}
const listPendingTasksByRuntime = `-- name: ListPendingTasksByRuntime :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir FROM agent_task_queue
WHERE runtime_id = $1 AND status IN ('queued', 'dispatched')
ORDER BY priority DESC, created_at ASC
`
@ -410,6 +455,8 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
); err != nil {
return nil, err
}
@ -425,7 +472,7 @@ const startAgentTask = `-- name: StartAgentTask :one
UPDATE agent_task_queue
SET status = 'running', started_at = now()
WHERE id = $1 AND status = 'dispatched'
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir
`
func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
@ -445,6 +492,8 @@ func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTask
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
)
return i, err
}

View file

@ -73,6 +73,8 @@ type AgentTaskQueue struct {
CreatedAt pgtype.Timestamptz `json:"created_at"`
Context []byte `json:"context"`
RuntimeID pgtype.UUID `json:"runtime_id"`
SessionID pgtype.Text `json:"session_id"`
WorkDir pgtype.Text `json:"work_dir"`
}
type Comment struct {

View file

@ -74,10 +74,18 @@ RETURNING *;
-- name: CompleteAgentTask :one
UPDATE agent_task_queue
SET status = 'completed', completed_at = now(), result = $2
SET status = 'completed', completed_at = now(), result = $2, session_id = $3, work_dir = $4
WHERE id = $1 AND status = 'running'
RETURNING *;
-- name: GetLastTaskSession :one
-- Returns the session_id and work_dir from the most recent completed task
-- for a given (agent_id, issue_id) pair, used for session resumption.
SELECT session_id, work_dir FROM agent_task_queue
WHERE agent_id = $1 AND issue_id = $2 AND status = 'completed' AND session_id IS NOT NULL
ORDER BY completed_at DESC
LIMIT 1;
-- name: FailAgentTask :one
UPDATE agent_task_queue
SET status = 'failed', completed_at = now(), error = $2