- Coalescing queue: use HasPendingTaskForIssue (queued/dispatched only) instead of HasActiveTaskForIssue so comments during a running task enqueue exactly one follow-up task that picks up all new comments. - Stale task cleanup: runtime sweeper now fails orphaned tasks when their runtime goes offline (daemon crash/network partition). - Cancel-aware daemon: handleTask checks task status after execution and discards results if the task was cancelled mid-run (e.g. reassign). - Terminal issue guard: ClaimTaskForRuntime auto-cancels pending tasks for done/cancelled issues instead of executing them. - Race condition safety net: unique partial index ensures at most one pending task per issue at the DB level.
279 lines
7.1 KiB
Go
279 lines
7.1 KiB
Go
// Code generated by sqlc. DO NOT EDIT.
|
|
// versions:
|
|
// sqlc v1.30.0
|
|
// source: runtime.sql
|
|
|
|
package db
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/jackc/pgx/v5/pgtype"
|
|
)
|
|
|
|
const failTasksForOfflineRuntimes = `-- name: FailTasksForOfflineRuntimes :many
|
|
UPDATE agent_task_queue
|
|
SET status = 'failed', completed_at = now(), error = 'runtime went offline'
|
|
WHERE status IN ('dispatched', 'running')
|
|
AND runtime_id IN (
|
|
SELECT id FROM agent_runtime WHERE status = 'offline'
|
|
)
|
|
RETURNING id, agent_id, issue_id
|
|
`
|
|
|
|
type FailTasksForOfflineRuntimesRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
IssueID pgtype.UUID `json:"issue_id"`
|
|
}
|
|
|
|
// Marks dispatched/running tasks as failed when their runtime is offline.
|
|
// This cleans up orphaned tasks after a daemon crash or network partition.
|
|
func (q *Queries) FailTasksForOfflineRuntimes(ctx context.Context) ([]FailTasksForOfflineRuntimesRow, error) {
|
|
rows, err := q.db.Query(ctx, failTasksForOfflineRuntimes)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []FailTasksForOfflineRuntimesRow{}
|
|
for rows.Next() {
|
|
var i FailTasksForOfflineRuntimesRow
|
|
if err := rows.Scan(&i.ID, &i.AgentID, &i.IssueID); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const getAgentRuntime = `-- name: GetAgentRuntime :one
|
|
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at FROM agent_runtime
|
|
WHERE id = $1
|
|
`
|
|
|
|
func (q *Queries) GetAgentRuntime(ctx context.Context, id pgtype.UUID) (AgentRuntime, error) {
|
|
row := q.db.QueryRow(ctx, getAgentRuntime, id)
|
|
var i AgentRuntime
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.DaemonID,
|
|
&i.Name,
|
|
&i.RuntimeMode,
|
|
&i.Provider,
|
|
&i.Status,
|
|
&i.DeviceInfo,
|
|
&i.Metadata,
|
|
&i.LastSeenAt,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const getAgentRuntimeForWorkspace = `-- name: GetAgentRuntimeForWorkspace :one
|
|
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at FROM agent_runtime
|
|
WHERE id = $1 AND workspace_id = $2
|
|
`
|
|
|
|
type GetAgentRuntimeForWorkspaceParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
}
|
|
|
|
func (q *Queries) GetAgentRuntimeForWorkspace(ctx context.Context, arg GetAgentRuntimeForWorkspaceParams) (AgentRuntime, error) {
|
|
row := q.db.QueryRow(ctx, getAgentRuntimeForWorkspace, arg.ID, arg.WorkspaceID)
|
|
var i AgentRuntime
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.DaemonID,
|
|
&i.Name,
|
|
&i.RuntimeMode,
|
|
&i.Provider,
|
|
&i.Status,
|
|
&i.DeviceInfo,
|
|
&i.Metadata,
|
|
&i.LastSeenAt,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const listAgentRuntimes = `-- name: ListAgentRuntimes :many
|
|
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at FROM agent_runtime
|
|
WHERE workspace_id = $1
|
|
ORDER BY created_at ASC
|
|
`
|
|
|
|
func (q *Queries) ListAgentRuntimes(ctx context.Context, workspaceID pgtype.UUID) ([]AgentRuntime, error) {
|
|
rows, err := q.db.Query(ctx, listAgentRuntimes, workspaceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentRuntime{}
|
|
for rows.Next() {
|
|
var i AgentRuntime
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.DaemonID,
|
|
&i.Name,
|
|
&i.RuntimeMode,
|
|
&i.Provider,
|
|
&i.Status,
|
|
&i.DeviceInfo,
|
|
&i.Metadata,
|
|
&i.LastSeenAt,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const markStaleRuntimesOffline = `-- name: MarkStaleRuntimesOffline :many
|
|
UPDATE agent_runtime
|
|
SET status = 'offline', updated_at = now()
|
|
WHERE status = 'online'
|
|
AND last_seen_at < now() - make_interval(secs => $1::double precision)
|
|
RETURNING id, workspace_id
|
|
`
|
|
|
|
type MarkStaleRuntimesOfflineRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
}
|
|
|
|
func (q *Queries) MarkStaleRuntimesOffline(ctx context.Context, staleSeconds float64) ([]MarkStaleRuntimesOfflineRow, error) {
|
|
rows, err := q.db.Query(ctx, markStaleRuntimesOffline, staleSeconds)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []MarkStaleRuntimesOfflineRow{}
|
|
for rows.Next() {
|
|
var i MarkStaleRuntimesOfflineRow
|
|
if err := rows.Scan(&i.ID, &i.WorkspaceID); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const setAgentRuntimeOffline = `-- name: SetAgentRuntimeOffline :exec
|
|
UPDATE agent_runtime
|
|
SET status = 'offline', updated_at = now()
|
|
WHERE id = $1
|
|
`
|
|
|
|
func (q *Queries) SetAgentRuntimeOffline(ctx context.Context, id pgtype.UUID) error {
|
|
_, err := q.db.Exec(ctx, setAgentRuntimeOffline, id)
|
|
return err
|
|
}
|
|
|
|
const updateAgentRuntimeHeartbeat = `-- name: UpdateAgentRuntimeHeartbeat :one
|
|
UPDATE agent_runtime
|
|
SET status = 'online', last_seen_at = now(), updated_at = now()
|
|
WHERE id = $1
|
|
RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at
|
|
`
|
|
|
|
func (q *Queries) UpdateAgentRuntimeHeartbeat(ctx context.Context, id pgtype.UUID) (AgentRuntime, error) {
|
|
row := q.db.QueryRow(ctx, updateAgentRuntimeHeartbeat, id)
|
|
var i AgentRuntime
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.DaemonID,
|
|
&i.Name,
|
|
&i.RuntimeMode,
|
|
&i.Provider,
|
|
&i.Status,
|
|
&i.DeviceInfo,
|
|
&i.Metadata,
|
|
&i.LastSeenAt,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const upsertAgentRuntime = `-- name: UpsertAgentRuntime :one
|
|
INSERT INTO agent_runtime (
|
|
workspace_id,
|
|
daemon_id,
|
|
name,
|
|
runtime_mode,
|
|
provider,
|
|
status,
|
|
device_info,
|
|
metadata,
|
|
last_seen_at
|
|
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, now())
|
|
ON CONFLICT (workspace_id, daemon_id, provider)
|
|
DO UPDATE SET
|
|
name = EXCLUDED.name,
|
|
runtime_mode = EXCLUDED.runtime_mode,
|
|
status = EXCLUDED.status,
|
|
device_info = EXCLUDED.device_info,
|
|
metadata = EXCLUDED.metadata,
|
|
last_seen_at = now(),
|
|
updated_at = now()
|
|
RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at
|
|
`
|
|
|
|
type UpsertAgentRuntimeParams struct {
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
DaemonID pgtype.Text `json:"daemon_id"`
|
|
Name string `json:"name"`
|
|
RuntimeMode string `json:"runtime_mode"`
|
|
Provider string `json:"provider"`
|
|
Status string `json:"status"`
|
|
DeviceInfo string `json:"device_info"`
|
|
Metadata []byte `json:"metadata"`
|
|
}
|
|
|
|
func (q *Queries) UpsertAgentRuntime(ctx context.Context, arg UpsertAgentRuntimeParams) (AgentRuntime, error) {
|
|
row := q.db.QueryRow(ctx, upsertAgentRuntime,
|
|
arg.WorkspaceID,
|
|
arg.DaemonID,
|
|
arg.Name,
|
|
arg.RuntimeMode,
|
|
arg.Provider,
|
|
arg.Status,
|
|
arg.DeviceInfo,
|
|
arg.Metadata,
|
|
)
|
|
var i AgentRuntime
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.DaemonID,
|
|
&i.Name,
|
|
&i.RuntimeMode,
|
|
&i.Provider,
|
|
&i.Status,
|
|
&i.DeviceInfo,
|
|
&i.Metadata,
|
|
&i.LastSeenAt,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
)
|
|
return i, err
|
|
}
|