From a965bd3b625ea86eb3e12257409e512c4fc0e53e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9E=97=20=E9=A7=BF=E7=94=AB=20=28Shunsuke=20Hayashi=29?= Date: Fri, 10 Apr 2026 08:11:29 +0900 Subject: [PATCH] =?UTF-8?q?[=E8=BF=BD=E5=8A=A0]=20verify=5Fmerge=20+=20esc?= =?UTF-8?q?ape=20hatch=20(force=5Funlock,=20manual=5Fcomplete)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/miyabi-cli/src/main.rs | 62 +++++ crates/miyabi-core/src/protocol.rs | 400 +++++++++++++++++++++++++++-- 2 files changed, 443 insertions(+), 19 deletions(-) diff --git a/crates/miyabi-cli/src/main.rs b/crates/miyabi-cli/src/main.rs index dfdbdcc..3ab50ad 100644 --- a/crates/miyabi-cli/src/main.rs +++ b/crates/miyabi-cli/src/main.rs @@ -210,6 +210,28 @@ enum GateCommand { Pr { task_id: String, number: u64 }, /// Record merge verification Merge { task_id: String, sha: String }, + /// Verify merge state using GitHub metadata + VerifyMerge { + task_id: String, + #[arg(long)] + repo: String, + }, + /// Force-release an active lock + ForceUnlock { + task_id: String, + #[arg(long)] + reason: String, + #[arg(long)] + operator: String, + }, + /// Mark a task complete without merge verification + ManualComplete { + task_id: String, + #[arg(long)] + reason: String, + #[arg(long)] + operator: String, + }, /// List active locks Locks, /// Show DAG levels @@ -1350,6 +1372,46 @@ fn handle_gate_command( println!("merge recorded: {} -> {}", task.id, sha); } }), + GateCommand::VerifyMerge { task_id, repo } => protocol + .verify_merge(&task_id, &repo, actor, &node) + .map(|task| { + if matches!(format, OutputFormat::Json) { + println!("{}", serde_json::to_string_pretty(&task).unwrap()); + } else { + let sha = task + .github_evidence + .as_ref() + .and_then(|evidence| evidence.merge_commit_sha.as_deref()) + .unwrap_or("unknown"); + println!("merge verified: {} -> {}", task.id, sha); + } + }), + GateCommand::ForceUnlock { + task_id, + reason, + operator, + } => protocol + .force_unlock(&task_id, &reason, &operator) + .map(|task| { + if matches!(format, OutputFormat::Json) { + println!("{}", serde_json::to_string_pretty(&task).unwrap()); + } else { + println!("lock released: {} by {}", task.id, operator); + } + }), + GateCommand::ManualComplete { + task_id, + reason, + operator, + } => protocol + .manual_complete(&task_id, &reason, &operator) + .map(|task| { + if matches!(format, OutputFormat::Json) { + println!("{}", serde_json::to_string_pretty(&task).unwrap()); + } else { + println!("task completed manually: {} by {}", task.id, operator); + } + }), GateCommand::Locks => protocol.locks().map(|locks| { if matches!(format, OutputFormat::Json) { println!("{}", serde_json::to_string_pretty(&locks).unwrap()); diff --git a/crates/miyabi-core/src/protocol.rs b/crates/miyabi-core/src/protocol.rs index a2fd520..0ab2b23 100644 --- a/crates/miyabi-core/src/protocol.rs +++ b/crates/miyabi-core/src/protocol.rs @@ -15,10 +15,12 @@ use std::collections::{HashMap, VecDeque}; use std::fs; use std::io::{BufRead, BufReader}; use std::path::{Path, PathBuf}; +use std::process::Command; use std::time::{Duration, Instant}; const MAX_CONTEXT_TOKENS: usize = 4_000; const FILE_SNIPPET_LINE_LIMIT: usize = 30; +const ANNOUNCE_CHAR_LIMIT: usize = 180; #[derive(Debug, Clone)] pub struct DeterministicExecutionProtocol { @@ -370,36 +372,165 @@ impl DeterministicExecutionProtocol { actor: &str, node: &str, ) -> ProtocolResult { - if merge_commit_sha.len() != 40 || !merge_commit_sha.chars().all(|c| c.is_ascii_hexdigit()) - { + if !is_valid_merge_sha(merge_commit_sha) { return Err(ProtocolError::gate_rejected( "merge sha must be a 40-char hex string", )); } - let merged = - self.update_task(task_id, actor, node, TaskEventType::MergeVerified, |task| { - let mut evidence = task.github_evidence.clone().ok_or_else(|| { - ProtocolError::gate_rejected("pull request must be recorded before merge") - })?; - evidence.pr_state = GitHubPrState::Merged; - evidence.merge_commit_sha = Some(merge_commit_sha.to_string()); - evidence.merged_at = Some(Utc::now()); - evidence.review_decision = Some(ReviewDecision::Approved); - evidence.issue_state = GitHubIssueState::Closed; - evidence.issue_closed_by_pr = true; - task.github_evidence = Some(evidence); - task.current_state = TaskState::Merged; - Ok(serde_json::json!({ "merge_commit_sha": merge_commit_sha })) - })?; + self.apply_verified_merge(task_id, merge_commit_sha, Some(Utc::now()), actor, node) + } + + pub fn verify_merge( + &self, + task_id: &str, + repo: &str, + actor: &str, + node: &str, + ) -> ProtocolResult { + let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?; + let task = snapshot + .get_task(task_id) + .ok_or_else(|| ProtocolError::input(format!("unknown task: {task_id}")))?; + let evidence = task.github_evidence.as_ref().ok_or_else(|| { + ProtocolError::gate_rejected("pull request must be recorded before merge") + })?; + + match fetch_merge_verification(repo, evidence.pr_number) { + Ok(Some(verification)) => { + if verification.state != "MERGED" { + return Err(ProtocolError::gate_rejected(format!( + "pull request #{} is not merged", + evidence.pr_number + ))); + } + + let merge_commit_sha = verification + .merge_commit + .as_ref() + .map(|commit| commit.oid.as_str()) + .filter(|oid| is_valid_merge_sha(oid)) + .ok_or_else(|| { + ProtocolError::gate_rejected( + "merged pull request is missing a valid merge commit sha", + ) + })?; + + self.apply_verified_merge( + task_id, + merge_commit_sha, + verification.merged_at, + actor, + node, + ) + } + Ok(None) => { + let merge_commit_sha = evidence + .merge_commit_sha + .as_deref() + .filter(|sha| is_valid_merge_sha(sha)) + .ok_or_else(|| { + ProtocolError::gate_rejected( + "gh CLI unavailable and no valid recorded merge sha found", + ) + })?; + self.apply_verified_merge( + task_id, + merge_commit_sha, + evidence.merged_at, + actor, + node, + ) + } + Err(err) => Err(ProtocolError::from(err)), + } + } + + pub fn force_unlock( + &self, + task_id: &str, + reason: &str, + operator: &str, + ) -> ProtocolResult { + let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?; + let had_lock = snapshot + .get_task(task_id) + .ok_or_else(|| ProtocolError::input(format!("unknown task: {task_id}")))? + .lock + .is_some(); self.lock_manager .release_lock(task_id) .map_err(ProtocolError::from)?; - self.unblock_dependents_after_merge(task_id, actor, node)?; + + let refreshed = self.snapshot_store.load().map_err(ProtocolError::from)?; + self.append_event( + task_id, + TaskEventType::AuditRecorded, + operator, + "manual", + refreshed.version + 1, + serde_json::json!({ + "action": "force_unlock", + "reason": reason, + "operator": operator, + "had_lock": had_lock + }), + )?; + announce_escape_hatch("force_unlock", task_id, operator, reason); let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?; - Ok(snapshot.get_task(task_id).cloned().unwrap_or(merged)) + snapshot + .get_task(task_id) + .cloned() + .ok_or_else(|| ProtocolError::input(format!("unknown task: {task_id}"))) + } + + pub fn manual_complete( + &self, + task_id: &str, + reason: &str, + operator: &str, + ) -> ProtocolResult { + self.lock_manager + .release_lock(task_id) + .map_err(ProtocolError::from)?; + + let task = self.update_task( + task_id, + operator, + "manual", + TaskEventType::StateTransition, + |task| { + let from = task.current_state; + task.current_state = TaskState::Done; + Ok(serde_json::json!({ + "from": from, + "to": TaskState::Done, + "reason": reason, + "operator": operator, + "manual": true + })) + }, + )?; + + let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?; + self.append_event( + task_id, + TaskEventType::AuditRecorded, + operator, + "manual", + snapshot.version + 1, + serde_json::json!({ + "action": "manual_complete", + "reason": reason, + "operator": operator + }), + )?; + announce_escape_hatch("manual_complete", task_id, operator, reason); + + let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?; + Ok(snapshot.get_task(task_id).cloned().unwrap_or(task)) } pub fn locks(&self) -> ProtocolResult> { @@ -758,6 +889,39 @@ impl DeterministicExecutionProtocol { Ok(()) } + + fn apply_verified_merge( + &self, + task_id: &str, + merge_commit_sha: &str, + merged_at: Option>, + actor: &str, + node: &str, + ) -> ProtocolResult { + let merged = + self.update_task(task_id, actor, node, TaskEventType::MergeVerified, |task| { + let mut evidence = task.github_evidence.clone().ok_or_else(|| { + ProtocolError::gate_rejected("pull request must be recorded before merge") + })?; + evidence.pr_state = GitHubPrState::Merged; + evidence.merge_commit_sha = Some(merge_commit_sha.to_string()); + evidence.merged_at = Some(merged_at.unwrap_or_else(Utc::now)); + evidence.review_decision = Some(ReviewDecision::Approved); + evidence.issue_state = GitHubIssueState::Closed; + evidence.issue_closed_by_pr = true; + task.github_evidence = Some(evidence); + task.current_state = TaskState::Merged; + Ok(serde_json::json!({ "merge_commit_sha": merge_commit_sha })) + })?; + + self.lock_manager + .release_lock(task_id) + .map_err(ProtocolError::from)?; + self.unblock_dependents_after_merge(task_id, actor, node)?; + + let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?; + Ok(snapshot.get_task(task_id).cloned().unwrap_or(merged)) + } } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -840,6 +1004,20 @@ impl ProtocolError { pub type ProtocolResult = std::result::Result; +#[derive(Debug, Deserialize)] +struct GhPrViewResponse { + #[serde(rename = "mergeCommit")] + merge_commit: Option, + state: String, + #[serde(rename = "mergedAt")] + merged_at: Option>, +} + +#[derive(Debug, Deserialize)] +struct GhMergeCommit { + oid: String, +} + fn dependencies_satisfied(task: &ExecutionTask, snapshot: &TasksSnapshot) -> bool { task.dependencies.iter().all(|dep_id| { snapshot @@ -905,6 +1083,61 @@ fn estimate_tokens(content: &str) -> usize { } } +fn is_valid_merge_sha(sha: &str) -> bool { + sha.len() == 40 && sha.chars().all(|c| c.is_ascii_hexdigit()) +} + +fn fetch_merge_verification(repo: &str, pr_number: u64) -> Result, Error> { + let output = match Command::new("gh") + .args([ + "pr", + "view", + &pr_number.to_string(), + "--repo", + repo, + "--json", + "mergeCommit,state,mergedAt", + ]) + .output() + { + Ok(output) => output, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None), + Err(err) => return Err(Error::Io(err)), + }; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string(); + let detail = if stderr.is_empty() { + format!("gh pr view exited with status {}", output.status) + } else { + stderr + }; + return Err(Error::Tool(detail)); + } + + Ok(Some(serde_json::from_slice(&output.stdout)?)) +} + +fn announce_escape_hatch(action: &str, task_id: &str, operator: &str, reason: &str) { + let mut message = format!("{action} {task_id} by {operator}: {reason}"); + if message.chars().count() > ANNOUNCE_CHAR_LIMIT { + message = format!( + "{}...", + message + .chars() + .take(ANNOUNCE_CHAR_LIMIT - 3) + .collect::() + ); + } + + let _ = Command::new(format!( + "{}/bin/announce", + std::env::var("HOME").unwrap_or_else(|_| "/Users/shunsukehayashi".to_string()) + )) + .args(["--mode", "macbook", &message]) + .output(); +} + fn read_file_snippet(path: &Path, max_lines: usize) -> Result { let file = fs::File::open(path)?; let reader = BufReader::new(file); @@ -1082,6 +1315,11 @@ mod tests { LOCK.get_or_init(|| Mutex::new(())) } + fn path_env_lock() -> &'static Mutex<()> { + static LOCK: OnceLock> = OnceLock::new(); + LOCK.get_or_init(|| Mutex::new(())) + } + fn fixture() -> (TempDir, DeterministicExecutionProtocol) { let tmp = TempDir::new().unwrap(); let protocol = @@ -1731,4 +1969,128 @@ mod tests { assert!(matches!(err, ProtocolError::GateRejected(_))); } + + #[test] + fn verify_merge_with_valid_sha_passes_via_fallback() { + let _guard = path_env_lock().lock().unwrap(); + let original_path = std::env::var_os("PATH"); + std::env::set_var("PATH", ""); + + let (_tmp, protocol) = fixture(); + protocol + .register( + RegisterTaskRequest { + issue: 1, + task_id: "phase-a".into(), + title: "Phase A".into(), + dependencies: vec![], + soft_dependencies: vec![], + priority: 0, + completion_mode: CompletionMode::GithubPr, + }, + "codex", + "macbook", + ) + .unwrap(); + protocol + .record_pr("phase-a", 12, "codex", "macbook") + .unwrap(); + + let mut snapshot = protocol.snapshot_store.load().unwrap(); + let version = snapshot.version; + let task = snapshot.get_task_mut("phase-a").unwrap(); + let evidence = task.github_evidence.as_mut().unwrap(); + evidence.merge_commit_sha = Some("0123456789abcdef0123456789abcdef01234567".into()); + protocol.snapshot_store.save(&snapshot, version).unwrap(); + + let merged = protocol + .verify_merge("phase-a", "owner/repo", "codex", "macbook") + .unwrap(); + + assert_eq!(merged.current_state, TaskState::Merged); + assert_eq!( + merged + .github_evidence + .as_ref() + .and_then(|evidence| evidence.merge_commit_sha.as_deref()), + Some("0123456789abcdef0123456789abcdef01234567") + ); + + if let Some(path) = original_path { + std::env::set_var("PATH", path); + } else { + std::env::remove_var("PATH"); + } + } + + #[test] + fn force_unlock_releases_lock() { + let (_tmp, protocol) = fixture(); + protocol + .register( + RegisterTaskRequest { + issue: 1, + task_id: "phase-a".into(), + title: "Phase A".into(), + dependencies: vec![], + soft_dependencies: vec![], + priority: 0, + completion_mode: CompletionMode::GithubPr, + }, + "codex", + "macbook", + ) + .unwrap(); + protocol + .record_impact( + "phase-a", + ImpactInput { + risk_level: ImpactRiskLevel::Low, + affected_symbols: 1, + depth1: vec!["force_unlock".into()], + analyzed_commit: None, + input_hash: None, + approve: false, + }, + "codex", + "macbook", + ) + .unwrap(); + protocol + .assign("phase-a", "codex", "macbook", &[String::from("src/a.rs")]) + .unwrap(); + + let task = protocol + .force_unlock("phase-a", "operator override", "maintainer") + .unwrap(); + + assert!(task.lock.is_none()); + assert!(protocol.locks().unwrap().is_empty()); + } + + #[test] + fn manual_complete_sets_state_to_done() { + let (_tmp, protocol) = fixture(); + protocol + .register( + RegisterTaskRequest { + issue: 1, + task_id: "phase-a".into(), + title: "Phase A".into(), + dependencies: vec![], + soft_dependencies: vec![], + priority: 0, + completion_mode: CompletionMode::GithubPr, + }, + "codex", + "macbook", + ) + .unwrap(); + + let task = protocol + .manual_complete("phase-a", "manual verification", "maintainer") + .unwrap(); + + assert_eq!(task.current_state, TaskState::Done); + } }