diff --git a/crates/miyabi-cli/src/main.rs b/crates/miyabi-cli/src/main.rs index fcde201..6b88714 100644 --- a/crates/miyabi-cli/src/main.rs +++ b/crates/miyabi-cli/src/main.rs @@ -1,6 +1,6 @@ //! Miyabi CLI - Main entry point -use clap::{Parser, Subcommand}; +use clap::{Parser, Subcommand, ValueEnum}; use miyabi_core::{FeatureFlagManager, RulesLoader}; use std::collections::HashMap; use std::path::PathBuf; @@ -95,6 +95,18 @@ enum Commands { #[arg(long)] system: Option, }, + /// Deterministic Task Protocol gate controls + Gate { + /// Output format + #[arg(long, value_enum, default_value_t = OutputFormat::Text)] + format: OutputFormat, + /// Path to the task ledger JSON file + #[arg(long, default_value = "project_memory/tasks.json")] + store_path: PathBuf, + /// Gate subcommand + #[command(subcommand)] + command: GateCommand, + }, /// OpenClaw integration - control OpenClaw agents Openclaw { /// OpenClaw subcommand @@ -109,6 +121,105 @@ enum Commands { }, } +#[derive(Clone, Debug, ValueEnum)] +enum OutputFormat { + Text, + Json, +} + +#[derive(Clone, Debug, ValueEnum)] +enum CompletionModeArg { + GithubPr, + Manual, + ExternalOp, +} + +#[derive(Clone, Debug, ValueEnum)] +enum ImpactRiskArg { + Low, + Medium, + High, + Critical, +} + +#[derive(Subcommand)] +enum GateCommand { + /// Register a task in the execution ledger + Register { + /// GitHub issue number + #[arg(long, default_value_t = 0)] + issue: u64, + /// Task title + #[arg(long)] + title: String, + /// Explicit task ID (defaults to issue-N or slugified title) + #[arg(long)] + task_id: Option, + /// Hard dependencies (comma separated) + #[arg(long, value_delimiter = ',')] + dependencies: Vec, + /// Soft dependencies (comma separated) + #[arg(long, value_delimiter = ',')] + soft_dependencies: Vec, + /// Priority score + #[arg(long, default_value_t = 0)] + priority: u32, + /// Completion mode + #[arg(long, value_enum, default_value_t = CompletionModeArg::GithubPr)] + completion_mode: CompletionModeArg, + }, + /// Show status for one task or the whole ledger + Status { + /// Optional task ID + task_id: Option, + }, + /// Assign a task and acquire file locks + Assign { + task_id: String, + #[arg(long)] + agent: String, + #[arg(long)] + node: String, + #[arg(long, value_delimiter = ',', num_args = 1..)] + files: Vec, + }, + /// Record impact analysis + Impact { + task_id: String, + #[arg(long, value_enum)] + risk: ImpactRiskArg, + #[arg(long)] + symbols: usize, + #[arg(long, value_delimiter = ',')] + depth1: Vec, + #[arg(long)] + analyzed_commit: Option, + #[arg(long)] + input_hash: Option, + }, + /// Record branch creation + Branch { + task_id: String, + name: String, + }, + /// Record PR creation + Pr { + task_id: String, + number: u64, + }, + /// Record merge verification + Merge { + task_id: String, + sha: String, + }, + /// List active locks + Locks, + /// Show DAG levels + Dag, + /// Show dispatchable tasks + Dispatchable, +} + /// Collab canvas subcommands — wraps the collab CLI at ~/.local/bin/collab #[derive(Subcommand)] enum CollabCommand { @@ -662,6 +773,14 @@ async fn main() -> anyhow::Result<()> { } } } + Some(Commands::Gate { + format, + store_path, + command, + }) => { + let code = handle_gate_command(&format, &store_path, command)?; + std::process::exit(code); + } Some(Commands::Openclaw { command }) => { use miyabi_core::openclaw::{OpenClawClient, OpenClawResult}; use std::env; @@ -991,6 +1110,250 @@ async fn main() -> anyhow::Result<()> { Ok(()) } +fn handle_gate_command( + format: &OutputFormat, + store_path: &PathBuf, + command: GateCommand, +) -> anyhow::Result { + use miyabi_core::protocol::{ + DeterministicExecutionProtocol, ImpactInput, ProtocolError, RegisterTaskRequest, + StatusReport, + }; + use miyabi_core::store::{CompletionMode, ImpactRiskLevel}; + + let protocol = DeterministicExecutionProtocol::from_store_path(store_path.clone()); + let actor = "miyabi-cli"; + let node = std::env::var("HOSTNAME") + .ok() + .filter(|value| !value.trim().is_empty()) + .unwrap_or_else(|| "local".to_string()); + + let result = match command { + GateCommand::Register { + issue, + title, + task_id, + dependencies, + soft_dependencies, + priority, + completion_mode, + } => { + let task_id = task_id.unwrap_or_else(|| derive_task_id(issue, &title)); + protocol + .register( + RegisterTaskRequest { + task_id, + title, + dependencies, + soft_dependencies, + priority, + completion_mode: match completion_mode { + CompletionModeArg::GithubPr => CompletionMode::GithubPr, + CompletionModeArg::Manual => CompletionMode::Manual, + CompletionModeArg::ExternalOp => CompletionMode::ExternalOp, + }, + }, + actor, + &node, + ) + .map(|task| { + if matches!(format, OutputFormat::Json) { + println!("{}", serde_json::to_string_pretty(&task).unwrap()); + } else { + println!("registered: {} ({})", task.id, task.title); + } + }) + } + GateCommand::Status { task_id } => protocol.status(task_id.as_deref()).map(|status| { + match status { + StatusReport::Task(task) => { + if matches!(format, OutputFormat::Json) { + println!("{}", serde_json::to_string_pretty(&task).unwrap()); + } else { + println!("{}: {:?} - {}", task.id, task.current_state, task.title); + } + } + StatusReport::Snapshot(snapshot) => { + if matches!(format, OutputFormat::Json) { + println!("{}", serde_json::to_string_pretty(&snapshot).unwrap()); + } else { + println!("tasks: {}", snapshot.tasks.len()); + for task in snapshot.tasks { + println!(" {} [{:?}] {}", task.id, task.current_state, task.title); + } + } + } + } + }), + GateCommand::Assign { + task_id, + agent, + node: agent_node, + files, + } => protocol + .assign(&task_id, &agent, &agent_node, &files) + .map(|result| { + if matches!(format, OutputFormat::Json) { + println!("{}", serde_json::to_string_pretty(&result).unwrap()); + } else { + println!( + "assigned: {} -> {}@{}", + result.task.id, agent, agent_node + ); + } + }), + GateCommand::Impact { + task_id, + risk, + symbols, + depth1, + analyzed_commit, + input_hash, + } => protocol + .record_impact( + &task_id, + ImpactInput { + risk_level: match risk { + ImpactRiskArg::Low => ImpactRiskLevel::Low, + ImpactRiskArg::Medium => ImpactRiskLevel::Medium, + ImpactRiskArg::High => ImpactRiskLevel::High, + ImpactRiskArg::Critical => ImpactRiskLevel::Critical, + }, + affected_symbols: symbols, + depth1, + analyzed_commit, + input_hash, + }, + actor, + &node, + ) + .map(|task| { + if matches!(format, OutputFormat::Json) { + println!("{}", serde_json::to_string_pretty(&task).unwrap()); + } else { + println!("impact recorded: {}", task.id); + } + }), + GateCommand::Branch { task_id, name } => protocol + .record_branch(&task_id, &name, actor, &node) + .map(|task| { + if matches!(format, OutputFormat::Json) { + println!("{}", serde_json::to_string_pretty(&task).unwrap()); + } else { + println!("branch recorded: {} -> {}", task.id, name); + } + }), + GateCommand::Pr { task_id, number } => protocol + .record_pr(&task_id, number, actor, &node) + .map(|task| { + if matches!(format, OutputFormat::Json) { + println!("{}", serde_json::to_string_pretty(&task).unwrap()); + } else { + println!("pr recorded: {} -> #{}", task.id, number); + } + }), + GateCommand::Merge { task_id, sha } => protocol + .record_merge(&task_id, &sha, actor, &node) + .map(|task| { + if matches!(format, OutputFormat::Json) { + println!("{}", serde_json::to_string_pretty(&task).unwrap()); + } else { + println!("merge recorded: {} -> {}", task.id, sha); + } + }), + GateCommand::Locks => protocol.locks().map(|locks| { + if matches!(format, OutputFormat::Json) { + println!("{}", serde_json::to_string_pretty(&locks).unwrap()); + } else if locks.is_empty() { + println!("no active locks"); + } else { + for (file, lock) in locks { + println!("{} -> {}@{}", file, lock.agent, lock.node); + } + } + }), + GateCommand::Dag => protocol.dag().map(|report| { + if matches!(format, OutputFormat::Json) { + println!("{}", serde_json::to_string_pretty(&report).unwrap()); + } else { + for (index, level) in report.levels.iter().enumerate() { + println!("level {}: {}", index, level.join(", ")); + } + } + }), + GateCommand::Dispatchable => protocol.dispatchable().map(|report| { + if matches!(format, OutputFormat::Json) { + println!("{}", serde_json::to_string_pretty(&report).unwrap()); + } else if report.tasks.is_empty() { + println!("no dispatchable tasks"); + } else { + for task in report.tasks { + println!("{} [{}] {}", task.id, task.priority, task.title); + } + } + }), + }; + + Ok(match result { + Ok(()) => 0, + Err(ProtocolError::GateRejected(message)) => { + emit_gate_error(format, "gate_rejected", &message); + 1 + } + Err(ProtocolError::Input(message)) => { + emit_gate_error(format, "input_error", &message); + 2 + } + Err(ProtocolError::Internal(error)) => { + emit_gate_error(format, "internal_error", &error.to_string()); + 1 + } + }) +} + +fn emit_gate_error(format: &OutputFormat, kind: &str, message: &str) { + if matches!(format, OutputFormat::Json) { + println!( + "{}", + serde_json::to_string_pretty(&serde_json::json!({ + "error": kind, + "message": message, + })) + .unwrap() + ); + } else { + eprintln!("{}: {}", kind, message); + } +} + +fn derive_task_id(issue: u64, title: &str) -> String { + if issue > 0 { + return format!("issue-{issue}"); + } + + let slug: String = title + .chars() + .map(|ch| { + if ch.is_ascii_alphanumeric() { + ch.to_ascii_lowercase() + } else { + '-' + } + }) + .collect(); + let slug = slug + .split('-') + .filter(|part| !part.is_empty()) + .collect::>() + .join("-"); + + if slug.is_empty() { + "task".to_string() + } else { + slug + } +} + fn truncate_str(s: &str, max_len: usize) -> String { if s.len() > max_len { format!("{}...", &s[..max_len.saturating_sub(3)]) diff --git a/crates/miyabi-core/src/lock.rs b/crates/miyabi-core/src/lock.rs index 24a717a..cb69369 100644 --- a/crates/miyabi-core/src/lock.rs +++ b/crates/miyabi-core/src/lock.rs @@ -24,14 +24,14 @@ impl Default for LeaseConfig { } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct LockConflict { pub conflicting: bool, pub held_by: Option, pub task_id: Option, } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct LeaseSweep { pub released: Vec, pub active: Vec, diff --git a/crates/miyabi-core/src/protocol.rs b/crates/miyabi-core/src/protocol.rs index d73b0a9..972f50f 100644 --- a/crates/miyabi-core/src/protocol.rs +++ b/crates/miyabi-core/src/protocol.rs @@ -1,11 +1,17 @@ //! Deterministic execution protocol entry point. -use crate::error::{Error, Result}; +use crate::error::Error; use crate::gate::{evaluate_gate, Gate, GateContext, GateReport}; -use crate::lock::FileLockManager; -use crate::store::{EventStore, SnapshotStore, TaskEvent, TaskEventType}; +use crate::lock::{FileLockManager, LeaseConfig, LockConflict}; +use crate::store::{ + CompletionMode, EventStore, ExecutionTask, GitHubEvidence, GitHubIssueState, GitHubPrState, + ImpactRiskLevel, ReviewDecision, SnapshotStore, TaskEvent, TaskEventType, TaskImpact, + TaskState, TasksSnapshot, +}; use chrono::Utc; use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, VecDeque}; +use std::path::{Path, PathBuf}; use std::time::{Duration, Instant}; #[derive(Debug, Clone)] @@ -28,22 +34,38 @@ impl DeterministicExecutionProtocol { } } + pub fn from_store_path(store_path: impl Into) -> Self { + let store_path = store_path.into(); + let parent = store_path + .parent() + .map(Path::to_path_buf) + .unwrap_or_else(|| PathBuf::from(".")); + let event_store = EventStore::new(parent.join("task-events.jsonl")); + let snapshot_store = SnapshotStore::new(store_path, parent.join(".tasks.lock")); + let lock_manager = FileLockManager::new( + event_store.clone(), + snapshot_store.clone(), + LeaseConfig::default(), + ); + Self::new(event_store, snapshot_store, lock_manager) + } + pub fn run( &self, task_id: &str, gates: &[Gate], actor: &str, node: &str, - ) -> Result { + ) -> ProtocolResult { let start = Instant::now(); let mut steps = Vec::new(); let mut success = true; for gate in gates { - let snapshot = self.snapshot_store.load()?; - let task = snapshot - .get_task(task_id) - .ok_or_else(|| Error::Validation(format!("unknown task: {task_id}")))?; + let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?; + let task = snapshot.get_task(task_id).ok_or_else(|| { + ProtocolError::input(format!("unknown task: {task_id}")) + })?; let context = if matches!(gate, Gate::Gate4) { let files = task @@ -52,7 +74,11 @@ impl DeterministicExecutionProtocol { .map(|lock| lock.affected_files.clone()) .unwrap_or_default(); GateContext { - lock_conflict: Some(self.lock_manager.has_conflict(&files)?), + lock_conflict: Some( + self.lock_manager + .has_conflict(&files) + .map_err(ProtocolError::from)?, + ), } } else { GateContext::default() @@ -68,14 +94,316 @@ impl DeterministicExecutionProtocol { } } - Ok(ProtocolReport { + let report = ProtocolReport { task_id: task_id.to_string(), steps, total_duration: start.elapsed(), success, + }; + + if report.success { + Ok(report) + } else { + Err(ProtocolError::gate_rejected( + report + .steps + .last() + .map(|step| step.detail.clone()) + .unwrap_or_else(|| "gate rejected".to_string()), + )) + } + } + + pub fn register( + &self, + request: RegisterTaskRequest, + actor: &str, + node: &str, + ) -> ProtocolResult { + let mut snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?; + if snapshot.get_task(&request.task_id).is_some() { + return Err(ProtocolError::input(format!( + "task already exists: {}", + request.task_id + ))); + } + + let mut task = ExecutionTask::new(&request.task_id, request.title); + task.current_state = TaskState::Pending; + task.dependencies = request.dependencies; + task.soft_dependencies = request.soft_dependencies; + task.priority = request.priority; + task.completion_mode = request.completion_mode; + task.updated_at = Utc::now(); + snapshot.upsert_task(task.clone()); + recompute_dependents(&mut snapshot); + let version = snapshot.version; + self.snapshot_store + .save(&snapshot, version) + .map_err(ProtocolError::from)?; + + self.append_event( + &task.id, + TaskEventType::DagChanged, + actor, + node, + version + 1, + serde_json::json!({ + "title": task.title, + "dependencies": task.dependencies, + "soft_dependencies": task.soft_dependencies, + }), + )?; + + Ok(task) + } + + pub fn status(&self, task_id: Option<&str>) -> ProtocolResult { + let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?; + if let Some(task_id) = task_id { + let task = snapshot + .get_task(task_id) + .cloned() + .ok_or_else(|| ProtocolError::input(format!("unknown task: {task_id}")))?; + Ok(StatusReport::Task(task)) + } else { + Ok(StatusReport::Snapshot(snapshot)) + } + } + + pub fn assign( + &self, + task_id: &str, + agent: &str, + node: &str, + files: &[String], + ) -> ProtocolResult { + self.run(task_id, &[Gate::Gate2], agent, node)?; + let conflict = self + .lock_manager + .has_conflict(files) + .map_err(ProtocolError::from)?; + if conflict.conflicting { + return Err(ProtocolError::gate_rejected(format!( + "lock conflict held by {}", + conflict.held_by.unwrap_or_else(|| "unknown".to_string()) + ))); + } + + self.lock_manager + .acquire_lock(task_id, agent, node, files) + .map_err(ProtocolError::from)?; + + let task = self.transition_task(task_id, TaskState::Implementing, agent, node)?; + Ok(AssignmentResult { + task, + lock_conflict: LockConflict { + conflicting: false, + held_by: None, + task_id: None, + }, }) } + pub fn record_impact( + &self, + task_id: &str, + impact: ImpactInput, + actor: &str, + node: &str, + ) -> ProtocolResult { + let payload_risk = impact.risk_level; + let payload_symbols = impact.affected_symbols; + let depth1 = impact.depth1.clone(); + let analyzed_commit = impact.analyzed_commit.clone(); + let input_hash = impact.input_hash.clone(); + self.update_task(task_id, actor, node, TaskEventType::ImpactRecorded, |task| { + task.impact = Some(TaskImpact { + risk_level: impact.risk_level, + affected_symbols: impact.affected_symbols, + depth1: depth1.clone(), + analyzed_at: Utc::now(), + analyzed_commit: analyzed_commit.clone(), + input_hash: input_hash.clone(), + }); + Ok(serde_json::json!({ + "risk_level": payload_risk, + "affected_symbols": payload_symbols + })) + }) + } + + pub fn record_branch( + &self, + task_id: &str, + branch_name: &str, + actor: &str, + node: &str, + ) -> ProtocolResult { + if branch_name.trim().is_empty() { + return Err(ProtocolError::input("branch name must not be empty")); + } + self.update_task(task_id, actor, node, TaskEventType::BranchCreated, |task| { + task.branch_name = Some(branch_name.to_string()); + Ok(serde_json::json!({ "branch_name": branch_name })) + }) + } + + pub fn record_pr( + &self, + task_id: &str, + pr_number: u64, + actor: &str, + node: &str, + ) -> ProtocolResult { + if pr_number == 0 { + return Err(ProtocolError::input("pr number must be greater than 0")); + } + self.update_task(task_id, actor, node, TaskEventType::PrCreated, |task| { + let branch_name = task.branch_name.clone().unwrap_or_default(); + task.github_evidence = Some(GitHubEvidence { + pr_number, + pr_head_ref: branch_name.clone(), + pr_state: GitHubPrState::Open, + merge_commit_sha: None, + merged_at: None, + review_decision: Some(ReviewDecision::ReviewRequired), + issue_state: GitHubIssueState::Open, + issue_closed_by_pr: false, + }); + task.current_state = TaskState::Reviewing; + Ok(serde_json::json!({ "pr_number": pr_number, "head_ref": branch_name })) + }) + } + + pub fn record_merge( + &self, + task_id: &str, + merge_commit_sha: &str, + actor: &str, + node: &str, + ) -> ProtocolResult { + if merge_commit_sha.len() != 40 || !merge_commit_sha.chars().all(|c| c.is_ascii_hexdigit()) { + return Err(ProtocolError::input("merge sha must be a 40-char hex string")); + } + + self.update_task(task_id, actor, node, TaskEventType::MergeVerified, |task| { + let mut evidence = task.github_evidence.clone().ok_or_else(|| { + ProtocolError::gate_rejected("pull request must be recorded before merge") + })?; + evidence.pr_state = GitHubPrState::Merged; + evidence.merge_commit_sha = Some(merge_commit_sha.to_string()); + evidence.merged_at = Some(Utc::now()); + evidence.review_decision = Some(ReviewDecision::Approved); + evidence.issue_state = GitHubIssueState::Closed; + evidence.issue_closed_by_pr = true; + task.github_evidence = Some(evidence); + task.current_state = TaskState::Merged; + Ok(serde_json::json!({ "merge_commit_sha": merge_commit_sha })) + }) + } + + pub fn locks(&self) -> ProtocolResult> { + let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?; + Ok(snapshot.file_locks) + } + + pub fn dag(&self) -> ProtocolResult { + let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?; + Ok(compute_dag(&snapshot)) + } + + pub fn dispatchable(&self) -> ProtocolResult { + let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?; + let tasks = snapshot + .tasks + .iter() + .filter(|task| matches!(task.current_state, TaskState::Pending | TaskState::Blocked)) + .filter(|task| dependencies_satisfied(task, &snapshot)) + .filter(|task| { + let files = task + .lock + .as_ref() + .map(|lock| lock.affected_files.clone()) + .unwrap_or_default(); + !self + .lock_manager + .has_conflict(&files) + .map(|conflict| conflict.conflicting) + .unwrap_or(false) + }) + .cloned() + .collect(); + Ok(DispatchableReport { tasks }) + } + + fn transition_task( + &self, + task_id: &str, + to: TaskState, + actor: &str, + node: &str, + ) -> ProtocolResult { + self.update_task(task_id, actor, node, TaskEventType::StateTransition, |task| { + task.current_state = to; + Ok(serde_json::json!({ "to": to })) + }) + } + + fn update_task( + &self, + task_id: &str, + actor: &str, + node: &str, + event_type: TaskEventType, + mut update: F, + ) -> ProtocolResult + where + F: FnMut(&mut ExecutionTask) -> ProtocolResult, + { + let mut snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?; + let task = snapshot + .get_task_mut(task_id) + .ok_or_else(|| ProtocolError::input(format!("unknown task: {task_id}")))?; + let payload = update(task)?; + task.updated_at = Utc::now(); + let updated = task.clone(); + let version = snapshot.version; + self.snapshot_store + .save(&snapshot, version) + .map_err(ProtocolError::from)?; + self.append_event(task_id, event_type, actor, node, version + 1, payload)?; + Ok(updated) + } + + fn append_event( + &self, + task_id: &str, + event_type: TaskEventType, + actor: &str, + node: &str, + version: u64, + payload: serde_json::Value, + ) -> ProtocolResult<()> { + self.event_store + .append(&TaskEvent { + id: format!( + "{task_id}-{:?}-{}", + event_type, + Utc::now().timestamp_millis() + ), + ts: Utc::now(), + event_type, + task_id: task_id.to_string(), + agent: actor.to_string(), + node: node.to_string(), + payload, + version, + }) + .map_err(ProtocolError::from)?; + Ok(()) + } + fn record_gate( &self, task_id: &str, @@ -84,31 +412,24 @@ impl DeterministicExecutionProtocol { actor: &str, node: &str, version: u64, - ) -> Result<()> { + ) -> ProtocolResult<()> { let event_type = if report.success { TaskEventType::GatePassed } else { TaskEventType::GateRejected }; - self.event_store.append(&TaskEvent { - id: format!( - "{task_id}-{}-{}", - gate.label(), - Utc::now().timestamp_millis() - ), - ts: Utc::now(), + self.append_event( + task_id, event_type, - task_id: task_id.to_string(), - agent: actor.to_string(), - node: node.to_string(), - payload: serde_json::json!({ + actor, + node, + version, + serde_json::json!({ "gate": gate.label(), "detail": report.detail, "duration_ms": report.duration.as_millis(), }), - version, - })?; - Ok(()) + ) } } @@ -120,57 +441,298 @@ pub struct ProtocolReport { pub success: bool, } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct RegisterTaskRequest { + pub task_id: String, + pub title: String, + pub dependencies: Vec, + pub soft_dependencies: Vec, + pub priority: u32, + pub completion_mode: CompletionMode, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct AssignmentResult { + pub task: ExecutionTask, + pub lock_conflict: LockConflict, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ImpactInput { + pub risk_level: ImpactRiskLevel, + pub affected_symbols: usize, + pub depth1: Vec, + pub analyzed_commit: Option, + pub input_hash: Option, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum StatusReport { + Task(ExecutionTask), + Snapshot(TasksSnapshot), +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct DagReport { + pub levels: Vec>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct DispatchableReport { + pub tasks: Vec, +} + +#[derive(Debug, thiserror::Error)] +pub enum ProtocolError { + #[error("gate rejected: {0}")] + GateRejected(String), + #[error("input error: {0}")] + Input(String), + #[error("{0}")] + Internal(#[from] Error), +} + +impl ProtocolError { + pub fn gate_rejected(msg: impl Into) -> Self { + Self::GateRejected(msg.into()) + } + + pub fn input(msg: impl Into) -> Self { + Self::Input(msg.into()) + } +} + +pub type ProtocolResult = std::result::Result; + +fn dependencies_satisfied(task: &ExecutionTask, snapshot: &TasksSnapshot) -> bool { + task.dependencies.iter().all(|dep_id| { + snapshot.get_task(dep_id).is_some_and(|dep| { + matches!(dep.current_state, TaskState::Done | TaskState::Merged) + }) + }) +} + +fn recompute_dependents(snapshot: &mut TasksSnapshot) { + let mut dependents: HashMap> = HashMap::new(); + for task in &snapshot.tasks { + for dependency in &task.dependencies { + dependents + .entry(dependency.clone()) + .or_default() + .push(task.id.clone()); + } + } + + for task in &mut snapshot.tasks { + task.dependents = dependents.remove(&task.id).unwrap_or_default(); + } +} + +fn compute_dag(snapshot: &TasksSnapshot) -> DagReport { + let mut indegree: HashMap = snapshot + .tasks + .iter() + .map(|task| (task.id.clone(), task.dependencies.len())) + .collect(); + let mut queue: VecDeque = snapshot + .tasks + .iter() + .filter(|task| task.dependencies.is_empty()) + .map(|task| task.id.clone()) + .collect(); + let mut levels = Vec::new(); + let task_map: HashMap<&str, &ExecutionTask> = + snapshot.tasks.iter().map(|task| (task.id.as_str(), task)).collect(); + + while !queue.is_empty() { + let mut next = VecDeque::new(); + let mut level = Vec::new(); + + while let Some(task_id) = queue.pop_front() { + level.push(task_id.clone()); + for dependent in snapshot + .tasks + .iter() + .filter(|task| task.dependencies.contains(&task_id)) + { + if let Some(entry) = indegree.get_mut(&dependent.id) { + *entry = entry.saturating_sub(1); + if *entry == 0 { + next.push_back(dependent.id.clone()); + } + } + } + } + + levels.push(level); + queue = next; + } + + for task in &snapshot.tasks { + if task_map.contains_key(task.id.as_str()) + && !levels.iter().flatten().any(|task_id| task_id == &task.id) + { + levels.push(vec![task.id.clone()]); + } + } + + DagReport { levels } +} + #[cfg(test)] mod tests { use super::*; - use crate::lock::LeaseConfig; - use crate::store::{ExecutionTask, SnapshotStore, TasksSnapshot}; use tempfile::TempDir; + fn fixture() -> (TempDir, DeterministicExecutionProtocol) { + let tmp = TempDir::new().unwrap(); + let protocol = + DeterministicExecutionProtocol::from_store_path(tmp.path().join("tasks.json")); + (tmp, protocol) + } + #[test] fn protocol_stops_at_first_failed_gate_and_records_events() { - let tmp = TempDir::new().unwrap(); - let event_store = EventStore::new(tmp.path().join("events.jsonl")); - let snapshot_store = SnapshotStore::new( - tmp.path().join("tasks.snapshot.json"), - tmp.path().join(".tasks.lock"), - ); - let lock_manager = FileLockManager::new( - event_store.clone(), - snapshot_store.clone(), - LeaseConfig::default(), - ); - let protocol = DeterministicExecutionProtocol::new( - event_store.clone(), - snapshot_store.clone(), - lock_manager, - ); + let (_tmp, protocol) = fixture(); + protocol + .register( + RegisterTaskRequest { + task_id: "phase-0".into(), + title: "Phase 0".into(), + dependencies: vec![], + soft_dependencies: vec![], + priority: 0, + completion_mode: CompletionMode::GithubPr, + }, + "codex", + "macbook", + ) + .unwrap(); + protocol + .register( + RegisterTaskRequest { + task_id: "phase-a".into(), + title: "Phase A".into(), + dependencies: vec!["phase-0".into()], + soft_dependencies: vec![], + priority: 0, + completion_mode: CompletionMode::GithubPr, + }, + "codex", + "macbook", + ) + .unwrap(); - let mut snapshot = TasksSnapshot::default(); - let mut task = ExecutionTask::new("phase-a", "Phase A"); - task.current_state = crate::store::TaskState::Pending; - task.dependencies.push("phase-0".into()); - snapshot.upsert_task(task); - snapshot.upsert_task(ExecutionTask::new("phase-0", "Phase 0")); - snapshot_store.save(&snapshot, 0).unwrap(); - - let report = protocol + let err = protocol .run( "phase-a", &[Gate::Gate1, Gate::Gate2, Gate::Gate3], "codex", "macbook", ) + .unwrap_err(); + assert!(matches!(err, ProtocolError::GateRejected(_))); + + let events = protocol.event_store.replay_for_task("phase-a").unwrap(); + assert_eq!(events.len(), 3); + assert_eq!(events[1].event_type, TaskEventType::GatePassed); + assert_eq!(events[2].event_type, TaskEventType::GateRejected); + } + + #[test] + fn register_status_and_dispatchable_flow() { + let (_tmp, protocol) = fixture(); + protocol + .register( + RegisterTaskRequest { + task_id: "phase-a".into(), + title: "Phase A".into(), + dependencies: vec![], + soft_dependencies: vec![], + priority: 10, + completion_mode: CompletionMode::GithubPr, + }, + "codex", + "macbook", + ) .unwrap(); - assert!(!report.success); - assert_eq!(report.steps.len(), 2); - assert!(report.steps[0].success); - assert!(!report.steps[1].success); + let status = protocol.status(Some("phase-a")).unwrap(); + match status { + StatusReport::Task(task) => assert_eq!(task.title, "Phase A"), + StatusReport::Snapshot(_) => panic!("expected task status"), + } - let events = event_store.replay_for_task("phase-a").unwrap(); - assert_eq!(events.len(), 2); - assert_eq!(events[0].event_type, TaskEventType::GatePassed); - assert_eq!(events[1].event_type, TaskEventType::GateRejected); + let dispatchable = protocol.dispatchable().unwrap(); + assert_eq!(dispatchable.tasks.len(), 1); + assert_eq!(dispatchable.tasks[0].id, "phase-a"); + } + + #[test] + fn assign_branch_pr_merge_updates_task() { + let (_tmp, protocol) = fixture(); + protocol + .register( + RegisterTaskRequest { + task_id: "phase-a".into(), + title: "Phase A".into(), + dependencies: vec![], + soft_dependencies: vec![], + priority: 0, + completion_mode: CompletionMode::GithubPr, + }, + "codex", + "macbook", + ) + .unwrap(); + protocol + .record_impact( + "phase-a", + ImpactInput { + risk_level: ImpactRiskLevel::Low, + affected_symbols: 1, + depth1: vec!["main".into()], + analyzed_commit: None, + input_hash: None, + }, + "codex", + "macbook", + ) + .unwrap(); + + let assigned = protocol + .assign( + "phase-a", + "codex", + "macbook", + &[String::from("crates/miyabi-core/src/main.rs")], + ) + .unwrap(); + assert_eq!(assigned.task.current_state, TaskState::Implementing); + + protocol + .record_branch("phase-a", "feature/phase-a", "codex", "macbook") + .unwrap(); + protocol.record_pr("phase-a", 12, "codex", "macbook").unwrap(); + let merged = protocol + .record_merge( + "phase-a", + "0123456789abcdef0123456789abcdef01234567", + "codex", + "macbook", + ) + .unwrap(); + + assert_eq!(merged.current_state, TaskState::Merged); + assert_eq!( + merged + .github_evidence + .as_ref() + .unwrap() + .merge_commit_sha + .as_deref(), + Some("0123456789abcdef0123456789abcdef01234567") + ); } } diff --git a/crates/miyabi-core/src/store.rs b/crates/miyabi-core/src/store.rs index a474b4a..7e3482f 100644 --- a/crates/miyabi-core/src/store.rs +++ b/crates/miyabi-core/src/store.rs @@ -225,6 +225,11 @@ impl TasksSnapshot { self.tasks.push(task); } } + + pub fn remove_task(&mut self, task_id: &str) -> Option { + let index = self.tasks.iter().position(|task| task.id == task_id)?; + Some(self.tasks.remove(index)) + } } #[derive(Debug, Clone)] @@ -313,7 +318,13 @@ impl SnapshotStore { return Ok(TasksSnapshot::default()); } let raw = fs::read_to_string(&self.file_path)?; - Ok(serde_json::from_str(&raw)?) + match serde_json::from_str(&raw) { + Ok(snapshot) => Ok(snapshot), + Err(_) => { + let legacy: LegacyTasksFile = serde_json::from_str(&raw)?; + Ok(legacy.into_snapshot()) + } + } } pub fn save(&self, snapshot: &TasksSnapshot, expected_version: u64) -> Result<()> { @@ -475,6 +486,177 @@ fn ensure_parent_dir(path: &Path) -> Result<()> { Ok(()) } +#[derive(Debug, Clone, Deserialize)] +struct LegacyTasksFile { + #[serde(default)] + version: u64, + #[serde(default)] + tasks: Vec, +} + +impl LegacyTasksFile { + fn into_snapshot(self) -> TasksSnapshot { + let mut snapshot = TasksSnapshot { + version: self.version, + generated_at: Utc::now(), + generated_from_event_id: None, + tasks: self.tasks.into_iter().map(LegacyTask::into_execution_task).collect(), + file_locks: HashMap::new(), + }; + + for task in &snapshot.tasks { + if let Some(lock) = &task.lock { + let owner_parts: Vec<&str> = lock.locked_by.split('@').collect(); + let agent = owner_parts.first().copied().unwrap_or("unknown").to_string(); + let node = owner_parts.get(1).copied().unwrap_or("unknown").to_string(); + let expires_at = lease_expiry(lock.last_heartbeat, lock.lease_duration_sec); + + for file in &lock.affected_files { + snapshot.file_locks.insert( + file.clone(), + FileLockEntry { + task_id: task.id.clone(), + agent: agent.clone(), + node: node.clone(), + expires_at, + }, + ); + } + } + } + + snapshot + } +} + +#[derive(Debug, Clone, Deserialize)] +struct LegacyTask { + id: String, + title: String, + #[serde(default)] + state: String, + #[serde(default)] + dependencies: Vec, + #[serde(default)] + dependents: Vec, + #[serde(default)] + soft_dependencies: Vec, + #[serde(default)] + lock: Option, + #[serde(default)] + impact: Option, + #[serde(default)] + branch_name: Option, + #[serde(default)] + pr_number: Option, + #[serde(default)] + merge_commit: Option, + #[serde(default)] + created_at: Option>, + #[serde(default)] + updated_at: Option>, +} + +impl LegacyTask { + fn into_execution_task(self) -> ExecutionTask { + let created_at = self.created_at.unwrap_or_else(Utc::now); + let updated_at = self.updated_at.unwrap_or(created_at); + let github_evidence = self.pr_number.map(|pr_number| GitHubEvidence { + pr_number, + pr_head_ref: self.branch_name.clone().unwrap_or_default(), + pr_state: if self.merge_commit.is_some() { + GitHubPrState::Merged + } else { + GitHubPrState::Open + }, + merge_commit_sha: self.merge_commit, + merged_at: None, + review_decision: None, + issue_state: GitHubIssueState::Open, + issue_closed_by_pr: false, + }); + + ExecutionTask { + id: self.id, + title: self.title, + current_state: legacy_state(&self.state), + dependencies: self.dependencies, + dependents: self.dependents, + soft_dependencies: self.soft_dependencies, + lock: self.lock.map(LegacyLock::into_snapshot), + impact: self.impact.map(LegacyImpact::into_snapshot), + branch_name: self.branch_name, + github_evidence, + completion_mode: CompletionMode::GithubPr, + human_approval: None, + priority: 0, + created_at, + updated_at, + } + } +} + +#[derive(Debug, Clone, Deserialize)] +struct LegacyLock { + locked_by: String, + locked_at: DateTime, + ttl_secs: u64, + #[serde(default)] + affected_files: Vec, +} + +impl LegacyLock { + fn into_snapshot(self) -> TaskLockSnapshot { + TaskLockSnapshot { + locked_by: self.locked_by, + locked_at: self.locked_at, + lease_duration_sec: self.ttl_secs, + last_heartbeat: self.locked_at, + affected_files: self.affected_files, + } + } +} + +#[derive(Debug, Clone, Deserialize)] +struct LegacyImpact { + risk_level: ImpactRiskLevel, + affected_symbols: usize, + #[serde(default)] + depth1: Vec, + analyzed_at: DateTime, +} + +impl LegacyImpact { + fn into_snapshot(self) -> TaskImpact { + TaskImpact { + risk_level: self.risk_level, + affected_symbols: self.affected_symbols, + depth1: self.depth1, + analyzed_at: self.analyzed_at, + analyzed_commit: None, + input_hash: None, + } + } +} + +fn legacy_state(state: &str) -> TaskState { + match state { + "draft" => TaskState::Draft, + "pending" => TaskState::Pending, + "analyzing" => TaskState::Analyzing, + "implementing" => TaskState::Implementing, + "reviewing" => TaskState::Reviewing, + "merged" => TaskState::Merged, + "deploying" => TaskState::Deploying, + "done" => TaskState::Done, + "blocked" => TaskState::Blocked, + "failed" => TaskState::Failed, + "cancelled" => TaskState::Cancelled, + "awaiting_github_sync" => TaskState::AwaitingGithubSync, + _ => TaskState::Draft, + } +} + pub fn lease_expiry(last_heartbeat: DateTime, lease_duration_sec: u64) -> DateTime { last_heartbeat + Duration::seconds(lease_duration_sec as i64) }