feat: [日本語] DTP Phase B CLI 追加
This commit is contained in:
parent
986d907e5b
commit
273c416344
4 changed files with 1171 additions and 64 deletions
|
|
@ -1,6 +1,6 @@
|
|||
//! Miyabi CLI - Main entry point
|
||||
|
||||
use clap::{Parser, Subcommand};
|
||||
use clap::{Parser, Subcommand, ValueEnum};
|
||||
use miyabi_core::{FeatureFlagManager, RulesLoader};
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
|
|
@ -95,6 +95,18 @@ enum Commands {
|
|||
#[arg(long)]
|
||||
system: Option<String>,
|
||||
},
|
||||
/// Deterministic Task Protocol gate controls
|
||||
Gate {
|
||||
/// Output format
|
||||
#[arg(long, value_enum, default_value_t = OutputFormat::Text)]
|
||||
format: OutputFormat,
|
||||
/// Path to the task ledger JSON file
|
||||
#[arg(long, default_value = "project_memory/tasks.json")]
|
||||
store_path: PathBuf,
|
||||
/// Gate subcommand
|
||||
#[command(subcommand)]
|
||||
command: GateCommand,
|
||||
},
|
||||
/// OpenClaw integration - control OpenClaw agents
|
||||
Openclaw {
|
||||
/// OpenClaw subcommand
|
||||
|
|
@ -109,6 +121,105 @@ enum Commands {
|
|||
},
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, ValueEnum)]
|
||||
enum OutputFormat {
|
||||
Text,
|
||||
Json,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, ValueEnum)]
|
||||
enum CompletionModeArg {
|
||||
GithubPr,
|
||||
Manual,
|
||||
ExternalOp,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, ValueEnum)]
|
||||
enum ImpactRiskArg {
|
||||
Low,
|
||||
Medium,
|
||||
High,
|
||||
Critical,
|
||||
}
|
||||
|
||||
#[derive(Subcommand)]
|
||||
enum GateCommand {
|
||||
/// Register a task in the execution ledger
|
||||
Register {
|
||||
/// GitHub issue number
|
||||
#[arg(long, default_value_t = 0)]
|
||||
issue: u64,
|
||||
/// Task title
|
||||
#[arg(long)]
|
||||
title: String,
|
||||
/// Explicit task ID (defaults to issue-N or slugified title)
|
||||
#[arg(long)]
|
||||
task_id: Option<String>,
|
||||
/// Hard dependencies (comma separated)
|
||||
#[arg(long, value_delimiter = ',')]
|
||||
dependencies: Vec<String>,
|
||||
/// Soft dependencies (comma separated)
|
||||
#[arg(long, value_delimiter = ',')]
|
||||
soft_dependencies: Vec<String>,
|
||||
/// Priority score
|
||||
#[arg(long, default_value_t = 0)]
|
||||
priority: u32,
|
||||
/// Completion mode
|
||||
#[arg(long, value_enum, default_value_t = CompletionModeArg::GithubPr)]
|
||||
completion_mode: CompletionModeArg,
|
||||
},
|
||||
/// Show status for one task or the whole ledger
|
||||
Status {
|
||||
/// Optional task ID
|
||||
task_id: Option<String>,
|
||||
},
|
||||
/// Assign a task and acquire file locks
|
||||
Assign {
|
||||
task_id: String,
|
||||
#[arg(long)]
|
||||
agent: String,
|
||||
#[arg(long)]
|
||||
node: String,
|
||||
#[arg(long, value_delimiter = ',', num_args = 1..)]
|
||||
files: Vec<String>,
|
||||
},
|
||||
/// Record impact analysis
|
||||
Impact {
|
||||
task_id: String,
|
||||
#[arg(long, value_enum)]
|
||||
risk: ImpactRiskArg,
|
||||
#[arg(long)]
|
||||
symbols: usize,
|
||||
#[arg(long, value_delimiter = ',')]
|
||||
depth1: Vec<String>,
|
||||
#[arg(long)]
|
||||
analyzed_commit: Option<String>,
|
||||
#[arg(long)]
|
||||
input_hash: Option<String>,
|
||||
},
|
||||
/// Record branch creation
|
||||
Branch {
|
||||
task_id: String,
|
||||
name: String,
|
||||
},
|
||||
/// Record PR creation
|
||||
Pr {
|
||||
task_id: String,
|
||||
number: u64,
|
||||
},
|
||||
/// Record merge verification
|
||||
Merge {
|
||||
task_id: String,
|
||||
sha: String,
|
||||
},
|
||||
/// List active locks
|
||||
Locks,
|
||||
/// Show DAG levels
|
||||
Dag,
|
||||
/// Show dispatchable tasks
|
||||
Dispatchable,
|
||||
}
|
||||
|
||||
/// Collab canvas subcommands — wraps the collab CLI at ~/.local/bin/collab
|
||||
#[derive(Subcommand)]
|
||||
enum CollabCommand {
|
||||
|
|
@ -662,6 +773,14 @@ async fn main() -> anyhow::Result<()> {
|
|||
}
|
||||
}
|
||||
}
|
||||
Some(Commands::Gate {
|
||||
format,
|
||||
store_path,
|
||||
command,
|
||||
}) => {
|
||||
let code = handle_gate_command(&format, &store_path, command)?;
|
||||
std::process::exit(code);
|
||||
}
|
||||
Some(Commands::Openclaw { command }) => {
|
||||
use miyabi_core::openclaw::{OpenClawClient, OpenClawResult};
|
||||
use std::env;
|
||||
|
|
@ -991,6 +1110,250 @@ async fn main() -> anyhow::Result<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_gate_command(
|
||||
format: &OutputFormat,
|
||||
store_path: &PathBuf,
|
||||
command: GateCommand,
|
||||
) -> anyhow::Result<i32> {
|
||||
use miyabi_core::protocol::{
|
||||
DeterministicExecutionProtocol, ImpactInput, ProtocolError, RegisterTaskRequest,
|
||||
StatusReport,
|
||||
};
|
||||
use miyabi_core::store::{CompletionMode, ImpactRiskLevel};
|
||||
|
||||
let protocol = DeterministicExecutionProtocol::from_store_path(store_path.clone());
|
||||
let actor = "miyabi-cli";
|
||||
let node = std::env::var("HOSTNAME")
|
||||
.ok()
|
||||
.filter(|value| !value.trim().is_empty())
|
||||
.unwrap_or_else(|| "local".to_string());
|
||||
|
||||
let result = match command {
|
||||
GateCommand::Register {
|
||||
issue,
|
||||
title,
|
||||
task_id,
|
||||
dependencies,
|
||||
soft_dependencies,
|
||||
priority,
|
||||
completion_mode,
|
||||
} => {
|
||||
let task_id = task_id.unwrap_or_else(|| derive_task_id(issue, &title));
|
||||
protocol
|
||||
.register(
|
||||
RegisterTaskRequest {
|
||||
task_id,
|
||||
title,
|
||||
dependencies,
|
||||
soft_dependencies,
|
||||
priority,
|
||||
completion_mode: match completion_mode {
|
||||
CompletionModeArg::GithubPr => CompletionMode::GithubPr,
|
||||
CompletionModeArg::Manual => CompletionMode::Manual,
|
||||
CompletionModeArg::ExternalOp => CompletionMode::ExternalOp,
|
||||
},
|
||||
},
|
||||
actor,
|
||||
&node,
|
||||
)
|
||||
.map(|task| {
|
||||
if matches!(format, OutputFormat::Json) {
|
||||
println!("{}", serde_json::to_string_pretty(&task).unwrap());
|
||||
} else {
|
||||
println!("registered: {} ({})", task.id, task.title);
|
||||
}
|
||||
})
|
||||
}
|
||||
GateCommand::Status { task_id } => protocol.status(task_id.as_deref()).map(|status| {
|
||||
match status {
|
||||
StatusReport::Task(task) => {
|
||||
if matches!(format, OutputFormat::Json) {
|
||||
println!("{}", serde_json::to_string_pretty(&task).unwrap());
|
||||
} else {
|
||||
println!("{}: {:?} - {}", task.id, task.current_state, task.title);
|
||||
}
|
||||
}
|
||||
StatusReport::Snapshot(snapshot) => {
|
||||
if matches!(format, OutputFormat::Json) {
|
||||
println!("{}", serde_json::to_string_pretty(&snapshot).unwrap());
|
||||
} else {
|
||||
println!("tasks: {}", snapshot.tasks.len());
|
||||
for task in snapshot.tasks {
|
||||
println!(" {} [{:?}] {}", task.id, task.current_state, task.title);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
GateCommand::Assign {
|
||||
task_id,
|
||||
agent,
|
||||
node: agent_node,
|
||||
files,
|
||||
} => protocol
|
||||
.assign(&task_id, &agent, &agent_node, &files)
|
||||
.map(|result| {
|
||||
if matches!(format, OutputFormat::Json) {
|
||||
println!("{}", serde_json::to_string_pretty(&result).unwrap());
|
||||
} else {
|
||||
println!(
|
||||
"assigned: {} -> {}@{}",
|
||||
result.task.id, agent, agent_node
|
||||
);
|
||||
}
|
||||
}),
|
||||
GateCommand::Impact {
|
||||
task_id,
|
||||
risk,
|
||||
symbols,
|
||||
depth1,
|
||||
analyzed_commit,
|
||||
input_hash,
|
||||
} => protocol
|
||||
.record_impact(
|
||||
&task_id,
|
||||
ImpactInput {
|
||||
risk_level: match risk {
|
||||
ImpactRiskArg::Low => ImpactRiskLevel::Low,
|
||||
ImpactRiskArg::Medium => ImpactRiskLevel::Medium,
|
||||
ImpactRiskArg::High => ImpactRiskLevel::High,
|
||||
ImpactRiskArg::Critical => ImpactRiskLevel::Critical,
|
||||
},
|
||||
affected_symbols: symbols,
|
||||
depth1,
|
||||
analyzed_commit,
|
||||
input_hash,
|
||||
},
|
||||
actor,
|
||||
&node,
|
||||
)
|
||||
.map(|task| {
|
||||
if matches!(format, OutputFormat::Json) {
|
||||
println!("{}", serde_json::to_string_pretty(&task).unwrap());
|
||||
} else {
|
||||
println!("impact recorded: {}", task.id);
|
||||
}
|
||||
}),
|
||||
GateCommand::Branch { task_id, name } => protocol
|
||||
.record_branch(&task_id, &name, actor, &node)
|
||||
.map(|task| {
|
||||
if matches!(format, OutputFormat::Json) {
|
||||
println!("{}", serde_json::to_string_pretty(&task).unwrap());
|
||||
} else {
|
||||
println!("branch recorded: {} -> {}", task.id, name);
|
||||
}
|
||||
}),
|
||||
GateCommand::Pr { task_id, number } => protocol
|
||||
.record_pr(&task_id, number, actor, &node)
|
||||
.map(|task| {
|
||||
if matches!(format, OutputFormat::Json) {
|
||||
println!("{}", serde_json::to_string_pretty(&task).unwrap());
|
||||
} else {
|
||||
println!("pr recorded: {} -> #{}", task.id, number);
|
||||
}
|
||||
}),
|
||||
GateCommand::Merge { task_id, sha } => protocol
|
||||
.record_merge(&task_id, &sha, actor, &node)
|
||||
.map(|task| {
|
||||
if matches!(format, OutputFormat::Json) {
|
||||
println!("{}", serde_json::to_string_pretty(&task).unwrap());
|
||||
} else {
|
||||
println!("merge recorded: {} -> {}", task.id, sha);
|
||||
}
|
||||
}),
|
||||
GateCommand::Locks => protocol.locks().map(|locks| {
|
||||
if matches!(format, OutputFormat::Json) {
|
||||
println!("{}", serde_json::to_string_pretty(&locks).unwrap());
|
||||
} else if locks.is_empty() {
|
||||
println!("no active locks");
|
||||
} else {
|
||||
for (file, lock) in locks {
|
||||
println!("{} -> {}@{}", file, lock.agent, lock.node);
|
||||
}
|
||||
}
|
||||
}),
|
||||
GateCommand::Dag => protocol.dag().map(|report| {
|
||||
if matches!(format, OutputFormat::Json) {
|
||||
println!("{}", serde_json::to_string_pretty(&report).unwrap());
|
||||
} else {
|
||||
for (index, level) in report.levels.iter().enumerate() {
|
||||
println!("level {}: {}", index, level.join(", "));
|
||||
}
|
||||
}
|
||||
}),
|
||||
GateCommand::Dispatchable => protocol.dispatchable().map(|report| {
|
||||
if matches!(format, OutputFormat::Json) {
|
||||
println!("{}", serde_json::to_string_pretty(&report).unwrap());
|
||||
} else if report.tasks.is_empty() {
|
||||
println!("no dispatchable tasks");
|
||||
} else {
|
||||
for task in report.tasks {
|
||||
println!("{} [{}] {}", task.id, task.priority, task.title);
|
||||
}
|
||||
}
|
||||
}),
|
||||
};
|
||||
|
||||
Ok(match result {
|
||||
Ok(()) => 0,
|
||||
Err(ProtocolError::GateRejected(message)) => {
|
||||
emit_gate_error(format, "gate_rejected", &message);
|
||||
1
|
||||
}
|
||||
Err(ProtocolError::Input(message)) => {
|
||||
emit_gate_error(format, "input_error", &message);
|
||||
2
|
||||
}
|
||||
Err(ProtocolError::Internal(error)) => {
|
||||
emit_gate_error(format, "internal_error", &error.to_string());
|
||||
1
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn emit_gate_error(format: &OutputFormat, kind: &str, message: &str) {
|
||||
if matches!(format, OutputFormat::Json) {
|
||||
println!(
|
||||
"{}",
|
||||
serde_json::to_string_pretty(&serde_json::json!({
|
||||
"error": kind,
|
||||
"message": message,
|
||||
}))
|
||||
.unwrap()
|
||||
);
|
||||
} else {
|
||||
eprintln!("{}: {}", kind, message);
|
||||
}
|
||||
}
|
||||
|
||||
fn derive_task_id(issue: u64, title: &str) -> String {
|
||||
if issue > 0 {
|
||||
return format!("issue-{issue}");
|
||||
}
|
||||
|
||||
let slug: String = title
|
||||
.chars()
|
||||
.map(|ch| {
|
||||
if ch.is_ascii_alphanumeric() {
|
||||
ch.to_ascii_lowercase()
|
||||
} else {
|
||||
'-'
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
let slug = slug
|
||||
.split('-')
|
||||
.filter(|part| !part.is_empty())
|
||||
.collect::<Vec<_>>()
|
||||
.join("-");
|
||||
|
||||
if slug.is_empty() {
|
||||
"task".to_string()
|
||||
} else {
|
||||
slug
|
||||
}
|
||||
}
|
||||
|
||||
fn truncate_str(s: &str, max_len: usize) -> String {
|
||||
if s.len() > max_len {
|
||||
format!("{}...", &s[..max_len.saturating_sub(3)])
|
||||
|
|
|
|||
|
|
@ -24,14 +24,14 @@ impl Default for LeaseConfig {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct LockConflict {
|
||||
pub conflicting: bool,
|
||||
pub held_by: Option<String>,
|
||||
pub task_id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct LeaseSweep {
|
||||
pub released: Vec<String>,
|
||||
pub active: Vec<String>,
|
||||
|
|
|
|||
|
|
@ -1,11 +1,17 @@
|
|||
//! Deterministic execution protocol entry point.
|
||||
|
||||
use crate::error::{Error, Result};
|
||||
use crate::error::Error;
|
||||
use crate::gate::{evaluate_gate, Gate, GateContext, GateReport};
|
||||
use crate::lock::FileLockManager;
|
||||
use crate::store::{EventStore, SnapshotStore, TaskEvent, TaskEventType};
|
||||
use crate::lock::{FileLockManager, LeaseConfig, LockConflict};
|
||||
use crate::store::{
|
||||
CompletionMode, EventStore, ExecutionTask, GitHubEvidence, GitHubIssueState, GitHubPrState,
|
||||
ImpactRiskLevel, ReviewDecision, SnapshotStore, TaskEvent, TaskEventType, TaskImpact,
|
||||
TaskState, TasksSnapshot,
|
||||
};
|
||||
use chrono::Utc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
|
|
@ -28,22 +34,38 @@ impl DeterministicExecutionProtocol {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn from_store_path(store_path: impl Into<PathBuf>) -> Self {
|
||||
let store_path = store_path.into();
|
||||
let parent = store_path
|
||||
.parent()
|
||||
.map(Path::to_path_buf)
|
||||
.unwrap_or_else(|| PathBuf::from("."));
|
||||
let event_store = EventStore::new(parent.join("task-events.jsonl"));
|
||||
let snapshot_store = SnapshotStore::new(store_path, parent.join(".tasks.lock"));
|
||||
let lock_manager = FileLockManager::new(
|
||||
event_store.clone(),
|
||||
snapshot_store.clone(),
|
||||
LeaseConfig::default(),
|
||||
);
|
||||
Self::new(event_store, snapshot_store, lock_manager)
|
||||
}
|
||||
|
||||
pub fn run(
|
||||
&self,
|
||||
task_id: &str,
|
||||
gates: &[Gate],
|
||||
actor: &str,
|
||||
node: &str,
|
||||
) -> Result<ProtocolReport> {
|
||||
) -> ProtocolResult<ProtocolReport> {
|
||||
let start = Instant::now();
|
||||
let mut steps = Vec::new();
|
||||
let mut success = true;
|
||||
|
||||
for gate in gates {
|
||||
let snapshot = self.snapshot_store.load()?;
|
||||
let task = snapshot
|
||||
.get_task(task_id)
|
||||
.ok_or_else(|| Error::Validation(format!("unknown task: {task_id}")))?;
|
||||
let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?;
|
||||
let task = snapshot.get_task(task_id).ok_or_else(|| {
|
||||
ProtocolError::input(format!("unknown task: {task_id}"))
|
||||
})?;
|
||||
|
||||
let context = if matches!(gate, Gate::Gate4) {
|
||||
let files = task
|
||||
|
|
@ -52,7 +74,11 @@ impl DeterministicExecutionProtocol {
|
|||
.map(|lock| lock.affected_files.clone())
|
||||
.unwrap_or_default();
|
||||
GateContext {
|
||||
lock_conflict: Some(self.lock_manager.has_conflict(&files)?),
|
||||
lock_conflict: Some(
|
||||
self.lock_manager
|
||||
.has_conflict(&files)
|
||||
.map_err(ProtocolError::from)?,
|
||||
),
|
||||
}
|
||||
} else {
|
||||
GateContext::default()
|
||||
|
|
@ -68,14 +94,316 @@ impl DeterministicExecutionProtocol {
|
|||
}
|
||||
}
|
||||
|
||||
Ok(ProtocolReport {
|
||||
let report = ProtocolReport {
|
||||
task_id: task_id.to_string(),
|
||||
steps,
|
||||
total_duration: start.elapsed(),
|
||||
success,
|
||||
};
|
||||
|
||||
if report.success {
|
||||
Ok(report)
|
||||
} else {
|
||||
Err(ProtocolError::gate_rejected(
|
||||
report
|
||||
.steps
|
||||
.last()
|
||||
.map(|step| step.detail.clone())
|
||||
.unwrap_or_else(|| "gate rejected".to_string()),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn register(
|
||||
&self,
|
||||
request: RegisterTaskRequest,
|
||||
actor: &str,
|
||||
node: &str,
|
||||
) -> ProtocolResult<ExecutionTask> {
|
||||
let mut snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?;
|
||||
if snapshot.get_task(&request.task_id).is_some() {
|
||||
return Err(ProtocolError::input(format!(
|
||||
"task already exists: {}",
|
||||
request.task_id
|
||||
)));
|
||||
}
|
||||
|
||||
let mut task = ExecutionTask::new(&request.task_id, request.title);
|
||||
task.current_state = TaskState::Pending;
|
||||
task.dependencies = request.dependencies;
|
||||
task.soft_dependencies = request.soft_dependencies;
|
||||
task.priority = request.priority;
|
||||
task.completion_mode = request.completion_mode;
|
||||
task.updated_at = Utc::now();
|
||||
snapshot.upsert_task(task.clone());
|
||||
recompute_dependents(&mut snapshot);
|
||||
let version = snapshot.version;
|
||||
self.snapshot_store
|
||||
.save(&snapshot, version)
|
||||
.map_err(ProtocolError::from)?;
|
||||
|
||||
self.append_event(
|
||||
&task.id,
|
||||
TaskEventType::DagChanged,
|
||||
actor,
|
||||
node,
|
||||
version + 1,
|
||||
serde_json::json!({
|
||||
"title": task.title,
|
||||
"dependencies": task.dependencies,
|
||||
"soft_dependencies": task.soft_dependencies,
|
||||
}),
|
||||
)?;
|
||||
|
||||
Ok(task)
|
||||
}
|
||||
|
||||
pub fn status(&self, task_id: Option<&str>) -> ProtocolResult<StatusReport> {
|
||||
let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?;
|
||||
if let Some(task_id) = task_id {
|
||||
let task = snapshot
|
||||
.get_task(task_id)
|
||||
.cloned()
|
||||
.ok_or_else(|| ProtocolError::input(format!("unknown task: {task_id}")))?;
|
||||
Ok(StatusReport::Task(task))
|
||||
} else {
|
||||
Ok(StatusReport::Snapshot(snapshot))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn assign(
|
||||
&self,
|
||||
task_id: &str,
|
||||
agent: &str,
|
||||
node: &str,
|
||||
files: &[String],
|
||||
) -> ProtocolResult<AssignmentResult> {
|
||||
self.run(task_id, &[Gate::Gate2], agent, node)?;
|
||||
let conflict = self
|
||||
.lock_manager
|
||||
.has_conflict(files)
|
||||
.map_err(ProtocolError::from)?;
|
||||
if conflict.conflicting {
|
||||
return Err(ProtocolError::gate_rejected(format!(
|
||||
"lock conflict held by {}",
|
||||
conflict.held_by.unwrap_or_else(|| "unknown".to_string())
|
||||
)));
|
||||
}
|
||||
|
||||
self.lock_manager
|
||||
.acquire_lock(task_id, agent, node, files)
|
||||
.map_err(ProtocolError::from)?;
|
||||
|
||||
let task = self.transition_task(task_id, TaskState::Implementing, agent, node)?;
|
||||
Ok(AssignmentResult {
|
||||
task,
|
||||
lock_conflict: LockConflict {
|
||||
conflicting: false,
|
||||
held_by: None,
|
||||
task_id: None,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
pub fn record_impact(
|
||||
&self,
|
||||
task_id: &str,
|
||||
impact: ImpactInput,
|
||||
actor: &str,
|
||||
node: &str,
|
||||
) -> ProtocolResult<ExecutionTask> {
|
||||
let payload_risk = impact.risk_level;
|
||||
let payload_symbols = impact.affected_symbols;
|
||||
let depth1 = impact.depth1.clone();
|
||||
let analyzed_commit = impact.analyzed_commit.clone();
|
||||
let input_hash = impact.input_hash.clone();
|
||||
self.update_task(task_id, actor, node, TaskEventType::ImpactRecorded, |task| {
|
||||
task.impact = Some(TaskImpact {
|
||||
risk_level: impact.risk_level,
|
||||
affected_symbols: impact.affected_symbols,
|
||||
depth1: depth1.clone(),
|
||||
analyzed_at: Utc::now(),
|
||||
analyzed_commit: analyzed_commit.clone(),
|
||||
input_hash: input_hash.clone(),
|
||||
});
|
||||
Ok(serde_json::json!({
|
||||
"risk_level": payload_risk,
|
||||
"affected_symbols": payload_symbols
|
||||
}))
|
||||
})
|
||||
}
|
||||
|
||||
pub fn record_branch(
|
||||
&self,
|
||||
task_id: &str,
|
||||
branch_name: &str,
|
||||
actor: &str,
|
||||
node: &str,
|
||||
) -> ProtocolResult<ExecutionTask> {
|
||||
if branch_name.trim().is_empty() {
|
||||
return Err(ProtocolError::input("branch name must not be empty"));
|
||||
}
|
||||
self.update_task(task_id, actor, node, TaskEventType::BranchCreated, |task| {
|
||||
task.branch_name = Some(branch_name.to_string());
|
||||
Ok(serde_json::json!({ "branch_name": branch_name }))
|
||||
})
|
||||
}
|
||||
|
||||
pub fn record_pr(
|
||||
&self,
|
||||
task_id: &str,
|
||||
pr_number: u64,
|
||||
actor: &str,
|
||||
node: &str,
|
||||
) -> ProtocolResult<ExecutionTask> {
|
||||
if pr_number == 0 {
|
||||
return Err(ProtocolError::input("pr number must be greater than 0"));
|
||||
}
|
||||
self.update_task(task_id, actor, node, TaskEventType::PrCreated, |task| {
|
||||
let branch_name = task.branch_name.clone().unwrap_or_default();
|
||||
task.github_evidence = Some(GitHubEvidence {
|
||||
pr_number,
|
||||
pr_head_ref: branch_name.clone(),
|
||||
pr_state: GitHubPrState::Open,
|
||||
merge_commit_sha: None,
|
||||
merged_at: None,
|
||||
review_decision: Some(ReviewDecision::ReviewRequired),
|
||||
issue_state: GitHubIssueState::Open,
|
||||
issue_closed_by_pr: false,
|
||||
});
|
||||
task.current_state = TaskState::Reviewing;
|
||||
Ok(serde_json::json!({ "pr_number": pr_number, "head_ref": branch_name }))
|
||||
})
|
||||
}
|
||||
|
||||
pub fn record_merge(
|
||||
&self,
|
||||
task_id: &str,
|
||||
merge_commit_sha: &str,
|
||||
actor: &str,
|
||||
node: &str,
|
||||
) -> ProtocolResult<ExecutionTask> {
|
||||
if merge_commit_sha.len() != 40 || !merge_commit_sha.chars().all(|c| c.is_ascii_hexdigit()) {
|
||||
return Err(ProtocolError::input("merge sha must be a 40-char hex string"));
|
||||
}
|
||||
|
||||
self.update_task(task_id, actor, node, TaskEventType::MergeVerified, |task| {
|
||||
let mut evidence = task.github_evidence.clone().ok_or_else(|| {
|
||||
ProtocolError::gate_rejected("pull request must be recorded before merge")
|
||||
})?;
|
||||
evidence.pr_state = GitHubPrState::Merged;
|
||||
evidence.merge_commit_sha = Some(merge_commit_sha.to_string());
|
||||
evidence.merged_at = Some(Utc::now());
|
||||
evidence.review_decision = Some(ReviewDecision::Approved);
|
||||
evidence.issue_state = GitHubIssueState::Closed;
|
||||
evidence.issue_closed_by_pr = true;
|
||||
task.github_evidence = Some(evidence);
|
||||
task.current_state = TaskState::Merged;
|
||||
Ok(serde_json::json!({ "merge_commit_sha": merge_commit_sha }))
|
||||
})
|
||||
}
|
||||
|
||||
pub fn locks(&self) -> ProtocolResult<HashMap<String, crate::store::FileLockEntry>> {
|
||||
let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?;
|
||||
Ok(snapshot.file_locks)
|
||||
}
|
||||
|
||||
pub fn dag(&self) -> ProtocolResult<DagReport> {
|
||||
let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?;
|
||||
Ok(compute_dag(&snapshot))
|
||||
}
|
||||
|
||||
pub fn dispatchable(&self) -> ProtocolResult<DispatchableReport> {
|
||||
let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?;
|
||||
let tasks = snapshot
|
||||
.tasks
|
||||
.iter()
|
||||
.filter(|task| matches!(task.current_state, TaskState::Pending | TaskState::Blocked))
|
||||
.filter(|task| dependencies_satisfied(task, &snapshot))
|
||||
.filter(|task| {
|
||||
let files = task
|
||||
.lock
|
||||
.as_ref()
|
||||
.map(|lock| lock.affected_files.clone())
|
||||
.unwrap_or_default();
|
||||
!self
|
||||
.lock_manager
|
||||
.has_conflict(&files)
|
||||
.map(|conflict| conflict.conflicting)
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.cloned()
|
||||
.collect();
|
||||
Ok(DispatchableReport { tasks })
|
||||
}
|
||||
|
||||
fn transition_task(
|
||||
&self,
|
||||
task_id: &str,
|
||||
to: TaskState,
|
||||
actor: &str,
|
||||
node: &str,
|
||||
) -> ProtocolResult<ExecutionTask> {
|
||||
self.update_task(task_id, actor, node, TaskEventType::StateTransition, |task| {
|
||||
task.current_state = to;
|
||||
Ok(serde_json::json!({ "to": to }))
|
||||
})
|
||||
}
|
||||
|
||||
fn update_task<F>(
|
||||
&self,
|
||||
task_id: &str,
|
||||
actor: &str,
|
||||
node: &str,
|
||||
event_type: TaskEventType,
|
||||
mut update: F,
|
||||
) -> ProtocolResult<ExecutionTask>
|
||||
where
|
||||
F: FnMut(&mut ExecutionTask) -> ProtocolResult<serde_json::Value>,
|
||||
{
|
||||
let mut snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?;
|
||||
let task = snapshot
|
||||
.get_task_mut(task_id)
|
||||
.ok_or_else(|| ProtocolError::input(format!("unknown task: {task_id}")))?;
|
||||
let payload = update(task)?;
|
||||
task.updated_at = Utc::now();
|
||||
let updated = task.clone();
|
||||
let version = snapshot.version;
|
||||
self.snapshot_store
|
||||
.save(&snapshot, version)
|
||||
.map_err(ProtocolError::from)?;
|
||||
self.append_event(task_id, event_type, actor, node, version + 1, payload)?;
|
||||
Ok(updated)
|
||||
}
|
||||
|
||||
fn append_event(
|
||||
&self,
|
||||
task_id: &str,
|
||||
event_type: TaskEventType,
|
||||
actor: &str,
|
||||
node: &str,
|
||||
version: u64,
|
||||
payload: serde_json::Value,
|
||||
) -> ProtocolResult<()> {
|
||||
self.event_store
|
||||
.append(&TaskEvent {
|
||||
id: format!(
|
||||
"{task_id}-{:?}-{}",
|
||||
event_type,
|
||||
Utc::now().timestamp_millis()
|
||||
),
|
||||
ts: Utc::now(),
|
||||
event_type,
|
||||
task_id: task_id.to_string(),
|
||||
agent: actor.to_string(),
|
||||
node: node.to_string(),
|
||||
payload,
|
||||
version,
|
||||
})
|
||||
.map_err(ProtocolError::from)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn record_gate(
|
||||
&self,
|
||||
task_id: &str,
|
||||
|
|
@ -84,31 +412,24 @@ impl DeterministicExecutionProtocol {
|
|||
actor: &str,
|
||||
node: &str,
|
||||
version: u64,
|
||||
) -> Result<()> {
|
||||
) -> ProtocolResult<()> {
|
||||
let event_type = if report.success {
|
||||
TaskEventType::GatePassed
|
||||
} else {
|
||||
TaskEventType::GateRejected
|
||||
};
|
||||
self.event_store.append(&TaskEvent {
|
||||
id: format!(
|
||||
"{task_id}-{}-{}",
|
||||
gate.label(),
|
||||
Utc::now().timestamp_millis()
|
||||
),
|
||||
ts: Utc::now(),
|
||||
self.append_event(
|
||||
task_id,
|
||||
event_type,
|
||||
task_id: task_id.to_string(),
|
||||
agent: actor.to_string(),
|
||||
node: node.to_string(),
|
||||
payload: serde_json::json!({
|
||||
actor,
|
||||
node,
|
||||
version,
|
||||
serde_json::json!({
|
||||
"gate": gate.label(),
|
||||
"detail": report.detail,
|
||||
"duration_ms": report.duration.as_millis(),
|
||||
}),
|
||||
version,
|
||||
})?;
|
||||
Ok(())
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -120,57 +441,298 @@ pub struct ProtocolReport {
|
|||
pub success: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct RegisterTaskRequest {
|
||||
pub task_id: String,
|
||||
pub title: String,
|
||||
pub dependencies: Vec<String>,
|
||||
pub soft_dependencies: Vec<String>,
|
||||
pub priority: u32,
|
||||
pub completion_mode: CompletionMode,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct AssignmentResult {
|
||||
pub task: ExecutionTask,
|
||||
pub lock_conflict: LockConflict,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct ImpactInput {
|
||||
pub risk_level: ImpactRiskLevel,
|
||||
pub affected_symbols: usize,
|
||||
pub depth1: Vec<String>,
|
||||
pub analyzed_commit: Option<String>,
|
||||
pub input_hash: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum StatusReport {
|
||||
Task(ExecutionTask),
|
||||
Snapshot(TasksSnapshot),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct DagReport {
|
||||
pub levels: Vec<Vec<String>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct DispatchableReport {
|
||||
pub tasks: Vec<ExecutionTask>,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ProtocolError {
|
||||
#[error("gate rejected: {0}")]
|
||||
GateRejected(String),
|
||||
#[error("input error: {0}")]
|
||||
Input(String),
|
||||
#[error("{0}")]
|
||||
Internal(#[from] Error),
|
||||
}
|
||||
|
||||
impl ProtocolError {
|
||||
pub fn gate_rejected(msg: impl Into<String>) -> Self {
|
||||
Self::GateRejected(msg.into())
|
||||
}
|
||||
|
||||
pub fn input(msg: impl Into<String>) -> Self {
|
||||
Self::Input(msg.into())
|
||||
}
|
||||
}
|
||||
|
||||
pub type ProtocolResult<T> = std::result::Result<T, ProtocolError>;
|
||||
|
||||
fn dependencies_satisfied(task: &ExecutionTask, snapshot: &TasksSnapshot) -> bool {
|
||||
task.dependencies.iter().all(|dep_id| {
|
||||
snapshot.get_task(dep_id).is_some_and(|dep| {
|
||||
matches!(dep.current_state, TaskState::Done | TaskState::Merged)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
fn recompute_dependents(snapshot: &mut TasksSnapshot) {
|
||||
let mut dependents: HashMap<String, Vec<String>> = HashMap::new();
|
||||
for task in &snapshot.tasks {
|
||||
for dependency in &task.dependencies {
|
||||
dependents
|
||||
.entry(dependency.clone())
|
||||
.or_default()
|
||||
.push(task.id.clone());
|
||||
}
|
||||
}
|
||||
|
||||
for task in &mut snapshot.tasks {
|
||||
task.dependents = dependents.remove(&task.id).unwrap_or_default();
|
||||
}
|
||||
}
|
||||
|
||||
fn compute_dag(snapshot: &TasksSnapshot) -> DagReport {
|
||||
let mut indegree: HashMap<String, usize> = snapshot
|
||||
.tasks
|
||||
.iter()
|
||||
.map(|task| (task.id.clone(), task.dependencies.len()))
|
||||
.collect();
|
||||
let mut queue: VecDeque<String> = snapshot
|
||||
.tasks
|
||||
.iter()
|
||||
.filter(|task| task.dependencies.is_empty())
|
||||
.map(|task| task.id.clone())
|
||||
.collect();
|
||||
let mut levels = Vec::new();
|
||||
let task_map: HashMap<&str, &ExecutionTask> =
|
||||
snapshot.tasks.iter().map(|task| (task.id.as_str(), task)).collect();
|
||||
|
||||
while !queue.is_empty() {
|
||||
let mut next = VecDeque::new();
|
||||
let mut level = Vec::new();
|
||||
|
||||
while let Some(task_id) = queue.pop_front() {
|
||||
level.push(task_id.clone());
|
||||
for dependent in snapshot
|
||||
.tasks
|
||||
.iter()
|
||||
.filter(|task| task.dependencies.contains(&task_id))
|
||||
{
|
||||
if let Some(entry) = indegree.get_mut(&dependent.id) {
|
||||
*entry = entry.saturating_sub(1);
|
||||
if *entry == 0 {
|
||||
next.push_back(dependent.id.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
levels.push(level);
|
||||
queue = next;
|
||||
}
|
||||
|
||||
for task in &snapshot.tasks {
|
||||
if task_map.contains_key(task.id.as_str())
|
||||
&& !levels.iter().flatten().any(|task_id| task_id == &task.id)
|
||||
{
|
||||
levels.push(vec![task.id.clone()]);
|
||||
}
|
||||
}
|
||||
|
||||
DagReport { levels }
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::lock::LeaseConfig;
|
||||
use crate::store::{ExecutionTask, SnapshotStore, TasksSnapshot};
|
||||
use tempfile::TempDir;
|
||||
|
||||
fn fixture() -> (TempDir, DeterministicExecutionProtocol) {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let protocol =
|
||||
DeterministicExecutionProtocol::from_store_path(tmp.path().join("tasks.json"));
|
||||
(tmp, protocol)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn protocol_stops_at_first_failed_gate_and_records_events() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let event_store = EventStore::new(tmp.path().join("events.jsonl"));
|
||||
let snapshot_store = SnapshotStore::new(
|
||||
tmp.path().join("tasks.snapshot.json"),
|
||||
tmp.path().join(".tasks.lock"),
|
||||
);
|
||||
let lock_manager = FileLockManager::new(
|
||||
event_store.clone(),
|
||||
snapshot_store.clone(),
|
||||
LeaseConfig::default(),
|
||||
);
|
||||
let protocol = DeterministicExecutionProtocol::new(
|
||||
event_store.clone(),
|
||||
snapshot_store.clone(),
|
||||
lock_manager,
|
||||
);
|
||||
let (_tmp, protocol) = fixture();
|
||||
protocol
|
||||
.register(
|
||||
RegisterTaskRequest {
|
||||
task_id: "phase-0".into(),
|
||||
title: "Phase 0".into(),
|
||||
dependencies: vec![],
|
||||
soft_dependencies: vec![],
|
||||
priority: 0,
|
||||
completion_mode: CompletionMode::GithubPr,
|
||||
},
|
||||
"codex",
|
||||
"macbook",
|
||||
)
|
||||
.unwrap();
|
||||
protocol
|
||||
.register(
|
||||
RegisterTaskRequest {
|
||||
task_id: "phase-a".into(),
|
||||
title: "Phase A".into(),
|
||||
dependencies: vec!["phase-0".into()],
|
||||
soft_dependencies: vec![],
|
||||
priority: 0,
|
||||
completion_mode: CompletionMode::GithubPr,
|
||||
},
|
||||
"codex",
|
||||
"macbook",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let mut snapshot = TasksSnapshot::default();
|
||||
let mut task = ExecutionTask::new("phase-a", "Phase A");
|
||||
task.current_state = crate::store::TaskState::Pending;
|
||||
task.dependencies.push("phase-0".into());
|
||||
snapshot.upsert_task(task);
|
||||
snapshot.upsert_task(ExecutionTask::new("phase-0", "Phase 0"));
|
||||
snapshot_store.save(&snapshot, 0).unwrap();
|
||||
|
||||
let report = protocol
|
||||
let err = protocol
|
||||
.run(
|
||||
"phase-a",
|
||||
&[Gate::Gate1, Gate::Gate2, Gate::Gate3],
|
||||
"codex",
|
||||
"macbook",
|
||||
)
|
||||
.unwrap_err();
|
||||
assert!(matches!(err, ProtocolError::GateRejected(_)));
|
||||
|
||||
let events = protocol.event_store.replay_for_task("phase-a").unwrap();
|
||||
assert_eq!(events.len(), 3);
|
||||
assert_eq!(events[1].event_type, TaskEventType::GatePassed);
|
||||
assert_eq!(events[2].event_type, TaskEventType::GateRejected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn register_status_and_dispatchable_flow() {
|
||||
let (_tmp, protocol) = fixture();
|
||||
protocol
|
||||
.register(
|
||||
RegisterTaskRequest {
|
||||
task_id: "phase-a".into(),
|
||||
title: "Phase A".into(),
|
||||
dependencies: vec![],
|
||||
soft_dependencies: vec![],
|
||||
priority: 10,
|
||||
completion_mode: CompletionMode::GithubPr,
|
||||
},
|
||||
"codex",
|
||||
"macbook",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert!(!report.success);
|
||||
assert_eq!(report.steps.len(), 2);
|
||||
assert!(report.steps[0].success);
|
||||
assert!(!report.steps[1].success);
|
||||
let status = protocol.status(Some("phase-a")).unwrap();
|
||||
match status {
|
||||
StatusReport::Task(task) => assert_eq!(task.title, "Phase A"),
|
||||
StatusReport::Snapshot(_) => panic!("expected task status"),
|
||||
}
|
||||
|
||||
let events = event_store.replay_for_task("phase-a").unwrap();
|
||||
assert_eq!(events.len(), 2);
|
||||
assert_eq!(events[0].event_type, TaskEventType::GatePassed);
|
||||
assert_eq!(events[1].event_type, TaskEventType::GateRejected);
|
||||
let dispatchable = protocol.dispatchable().unwrap();
|
||||
assert_eq!(dispatchable.tasks.len(), 1);
|
||||
assert_eq!(dispatchable.tasks[0].id, "phase-a");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn assign_branch_pr_merge_updates_task() {
|
||||
let (_tmp, protocol) = fixture();
|
||||
protocol
|
||||
.register(
|
||||
RegisterTaskRequest {
|
||||
task_id: "phase-a".into(),
|
||||
title: "Phase A".into(),
|
||||
dependencies: vec![],
|
||||
soft_dependencies: vec![],
|
||||
priority: 0,
|
||||
completion_mode: CompletionMode::GithubPr,
|
||||
},
|
||||
"codex",
|
||||
"macbook",
|
||||
)
|
||||
.unwrap();
|
||||
protocol
|
||||
.record_impact(
|
||||
"phase-a",
|
||||
ImpactInput {
|
||||
risk_level: ImpactRiskLevel::Low,
|
||||
affected_symbols: 1,
|
||||
depth1: vec!["main".into()],
|
||||
analyzed_commit: None,
|
||||
input_hash: None,
|
||||
},
|
||||
"codex",
|
||||
"macbook",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let assigned = protocol
|
||||
.assign(
|
||||
"phase-a",
|
||||
"codex",
|
||||
"macbook",
|
||||
&[String::from("crates/miyabi-core/src/main.rs")],
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(assigned.task.current_state, TaskState::Implementing);
|
||||
|
||||
protocol
|
||||
.record_branch("phase-a", "feature/phase-a", "codex", "macbook")
|
||||
.unwrap();
|
||||
protocol.record_pr("phase-a", 12, "codex", "macbook").unwrap();
|
||||
let merged = protocol
|
||||
.record_merge(
|
||||
"phase-a",
|
||||
"0123456789abcdef0123456789abcdef01234567",
|
||||
"codex",
|
||||
"macbook",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(merged.current_state, TaskState::Merged);
|
||||
assert_eq!(
|
||||
merged
|
||||
.github_evidence
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.merge_commit_sha
|
||||
.as_deref(),
|
||||
Some("0123456789abcdef0123456789abcdef01234567")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -225,6 +225,11 @@ impl TasksSnapshot {
|
|||
self.tasks.push(task);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove_task(&mut self, task_id: &str) -> Option<ExecutionTask> {
|
||||
let index = self.tasks.iter().position(|task| task.id == task_id)?;
|
||||
Some(self.tasks.remove(index))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
|
|
@ -313,7 +318,13 @@ impl SnapshotStore {
|
|||
return Ok(TasksSnapshot::default());
|
||||
}
|
||||
let raw = fs::read_to_string(&self.file_path)?;
|
||||
Ok(serde_json::from_str(&raw)?)
|
||||
match serde_json::from_str(&raw) {
|
||||
Ok(snapshot) => Ok(snapshot),
|
||||
Err(_) => {
|
||||
let legacy: LegacyTasksFile = serde_json::from_str(&raw)?;
|
||||
Ok(legacy.into_snapshot())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn save(&self, snapshot: &TasksSnapshot, expected_version: u64) -> Result<()> {
|
||||
|
|
@ -475,6 +486,177 @@ fn ensure_parent_dir(path: &Path) -> Result<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
struct LegacyTasksFile {
|
||||
#[serde(default)]
|
||||
version: u64,
|
||||
#[serde(default)]
|
||||
tasks: Vec<LegacyTask>,
|
||||
}
|
||||
|
||||
impl LegacyTasksFile {
|
||||
fn into_snapshot(self) -> TasksSnapshot {
|
||||
let mut snapshot = TasksSnapshot {
|
||||
version: self.version,
|
||||
generated_at: Utc::now(),
|
||||
generated_from_event_id: None,
|
||||
tasks: self.tasks.into_iter().map(LegacyTask::into_execution_task).collect(),
|
||||
file_locks: HashMap::new(),
|
||||
};
|
||||
|
||||
for task in &snapshot.tasks {
|
||||
if let Some(lock) = &task.lock {
|
||||
let owner_parts: Vec<&str> = lock.locked_by.split('@').collect();
|
||||
let agent = owner_parts.first().copied().unwrap_or("unknown").to_string();
|
||||
let node = owner_parts.get(1).copied().unwrap_or("unknown").to_string();
|
||||
let expires_at = lease_expiry(lock.last_heartbeat, lock.lease_duration_sec);
|
||||
|
||||
for file in &lock.affected_files {
|
||||
snapshot.file_locks.insert(
|
||||
file.clone(),
|
||||
FileLockEntry {
|
||||
task_id: task.id.clone(),
|
||||
agent: agent.clone(),
|
||||
node: node.clone(),
|
||||
expires_at,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
snapshot
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
struct LegacyTask {
|
||||
id: String,
|
||||
title: String,
|
||||
#[serde(default)]
|
||||
state: String,
|
||||
#[serde(default)]
|
||||
dependencies: Vec<String>,
|
||||
#[serde(default)]
|
||||
dependents: Vec<String>,
|
||||
#[serde(default)]
|
||||
soft_dependencies: Vec<String>,
|
||||
#[serde(default)]
|
||||
lock: Option<LegacyLock>,
|
||||
#[serde(default)]
|
||||
impact: Option<LegacyImpact>,
|
||||
#[serde(default)]
|
||||
branch_name: Option<String>,
|
||||
#[serde(default)]
|
||||
pr_number: Option<u64>,
|
||||
#[serde(default)]
|
||||
merge_commit: Option<String>,
|
||||
#[serde(default)]
|
||||
created_at: Option<DateTime<Utc>>,
|
||||
#[serde(default)]
|
||||
updated_at: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
impl LegacyTask {
|
||||
fn into_execution_task(self) -> ExecutionTask {
|
||||
let created_at = self.created_at.unwrap_or_else(Utc::now);
|
||||
let updated_at = self.updated_at.unwrap_or(created_at);
|
||||
let github_evidence = self.pr_number.map(|pr_number| GitHubEvidence {
|
||||
pr_number,
|
||||
pr_head_ref: self.branch_name.clone().unwrap_or_default(),
|
||||
pr_state: if self.merge_commit.is_some() {
|
||||
GitHubPrState::Merged
|
||||
} else {
|
||||
GitHubPrState::Open
|
||||
},
|
||||
merge_commit_sha: self.merge_commit,
|
||||
merged_at: None,
|
||||
review_decision: None,
|
||||
issue_state: GitHubIssueState::Open,
|
||||
issue_closed_by_pr: false,
|
||||
});
|
||||
|
||||
ExecutionTask {
|
||||
id: self.id,
|
||||
title: self.title,
|
||||
current_state: legacy_state(&self.state),
|
||||
dependencies: self.dependencies,
|
||||
dependents: self.dependents,
|
||||
soft_dependencies: self.soft_dependencies,
|
||||
lock: self.lock.map(LegacyLock::into_snapshot),
|
||||
impact: self.impact.map(LegacyImpact::into_snapshot),
|
||||
branch_name: self.branch_name,
|
||||
github_evidence,
|
||||
completion_mode: CompletionMode::GithubPr,
|
||||
human_approval: None,
|
||||
priority: 0,
|
||||
created_at,
|
||||
updated_at,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
struct LegacyLock {
|
||||
locked_by: String,
|
||||
locked_at: DateTime<Utc>,
|
||||
ttl_secs: u64,
|
||||
#[serde(default)]
|
||||
affected_files: Vec<String>,
|
||||
}
|
||||
|
||||
impl LegacyLock {
|
||||
fn into_snapshot(self) -> TaskLockSnapshot {
|
||||
TaskLockSnapshot {
|
||||
locked_by: self.locked_by,
|
||||
locked_at: self.locked_at,
|
||||
lease_duration_sec: self.ttl_secs,
|
||||
last_heartbeat: self.locked_at,
|
||||
affected_files: self.affected_files,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
struct LegacyImpact {
|
||||
risk_level: ImpactRiskLevel,
|
||||
affected_symbols: usize,
|
||||
#[serde(default)]
|
||||
depth1: Vec<String>,
|
||||
analyzed_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl LegacyImpact {
|
||||
fn into_snapshot(self) -> TaskImpact {
|
||||
TaskImpact {
|
||||
risk_level: self.risk_level,
|
||||
affected_symbols: self.affected_symbols,
|
||||
depth1: self.depth1,
|
||||
analyzed_at: self.analyzed_at,
|
||||
analyzed_commit: None,
|
||||
input_hash: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn legacy_state(state: &str) -> TaskState {
|
||||
match state {
|
||||
"draft" => TaskState::Draft,
|
||||
"pending" => TaskState::Pending,
|
||||
"analyzing" => TaskState::Analyzing,
|
||||
"implementing" => TaskState::Implementing,
|
||||
"reviewing" => TaskState::Reviewing,
|
||||
"merged" => TaskState::Merged,
|
||||
"deploying" => TaskState::Deploying,
|
||||
"done" => TaskState::Done,
|
||||
"blocked" => TaskState::Blocked,
|
||||
"failed" => TaskState::Failed,
|
||||
"cancelled" => TaskState::Cancelled,
|
||||
"awaiting_github_sync" => TaskState::AwaitingGithubSync,
|
||||
_ => TaskState::Draft,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn lease_expiry(last_heartbeat: DateTime<Utc>, lease_duration_sec: u64) -> DateTime<Utc> {
|
||||
last_heartbeat + Duration::seconds(lease_duration_sec as i64)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue