[追加] verify_merge + escape hatch (force_unlock, manual_complete)

This commit is contained in:
林 駿甫 (Shunsuke Hayashi) 2026-04-10 08:11:29 +09:00
parent e6ec2cab8d
commit a965bd3b62
2 changed files with 443 additions and 19 deletions

View file

@ -210,6 +210,28 @@ enum GateCommand {
Pr { task_id: String, number: u64 },
/// Record merge verification
Merge { task_id: String, sha: String },
/// Verify merge state using GitHub metadata
VerifyMerge {
task_id: String,
#[arg(long)]
repo: String,
},
/// Force-release an active lock
ForceUnlock {
task_id: String,
#[arg(long)]
reason: String,
#[arg(long)]
operator: String,
},
/// Mark a task complete without merge verification
ManualComplete {
task_id: String,
#[arg(long)]
reason: String,
#[arg(long)]
operator: String,
},
/// List active locks
Locks,
/// Show DAG levels
@ -1350,6 +1372,46 @@ fn handle_gate_command(
println!("merge recorded: {} -> {}", task.id, sha);
}
}),
GateCommand::VerifyMerge { task_id, repo } => protocol
.verify_merge(&task_id, &repo, actor, &node)
.map(|task| {
if matches!(format, OutputFormat::Json) {
println!("{}", serde_json::to_string_pretty(&task).unwrap());
} else {
let sha = task
.github_evidence
.as_ref()
.and_then(|evidence| evidence.merge_commit_sha.as_deref())
.unwrap_or("unknown");
println!("merge verified: {} -> {}", task.id, sha);
}
}),
GateCommand::ForceUnlock {
task_id,
reason,
operator,
} => protocol
.force_unlock(&task_id, &reason, &operator)
.map(|task| {
if matches!(format, OutputFormat::Json) {
println!("{}", serde_json::to_string_pretty(&task).unwrap());
} else {
println!("lock released: {} by {}", task.id, operator);
}
}),
GateCommand::ManualComplete {
task_id,
reason,
operator,
} => protocol
.manual_complete(&task_id, &reason, &operator)
.map(|task| {
if matches!(format, OutputFormat::Json) {
println!("{}", serde_json::to_string_pretty(&task).unwrap());
} else {
println!("task completed manually: {} by {}", task.id, operator);
}
}),
GateCommand::Locks => protocol.locks().map(|locks| {
if matches!(format, OutputFormat::Json) {
println!("{}", serde_json::to_string_pretty(&locks).unwrap());

View file

@ -15,10 +15,12 @@ use std::collections::{HashMap, VecDeque};
use std::fs;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::{Duration, Instant};
const MAX_CONTEXT_TOKENS: usize = 4_000;
const FILE_SNIPPET_LINE_LIMIT: usize = 30;
const ANNOUNCE_CHAR_LIMIT: usize = 180;
#[derive(Debug, Clone)]
pub struct DeterministicExecutionProtocol {
@ -370,36 +372,165 @@ impl DeterministicExecutionProtocol {
actor: &str,
node: &str,
) -> ProtocolResult<ExecutionTask> {
if merge_commit_sha.len() != 40 || !merge_commit_sha.chars().all(|c| c.is_ascii_hexdigit())
{
if !is_valid_merge_sha(merge_commit_sha) {
return Err(ProtocolError::gate_rejected(
"merge sha must be a 40-char hex string",
));
}
let merged =
self.update_task(task_id, actor, node, TaskEventType::MergeVerified, |task| {
let mut evidence = task.github_evidence.clone().ok_or_else(|| {
ProtocolError::gate_rejected("pull request must be recorded before merge")
})?;
evidence.pr_state = GitHubPrState::Merged;
evidence.merge_commit_sha = Some(merge_commit_sha.to_string());
evidence.merged_at = Some(Utc::now());
evidence.review_decision = Some(ReviewDecision::Approved);
evidence.issue_state = GitHubIssueState::Closed;
evidence.issue_closed_by_pr = true;
task.github_evidence = Some(evidence);
task.current_state = TaskState::Merged;
Ok(serde_json::json!({ "merge_commit_sha": merge_commit_sha }))
})?;
self.apply_verified_merge(task_id, merge_commit_sha, Some(Utc::now()), actor, node)
}
pub fn verify_merge(
&self,
task_id: &str,
repo: &str,
actor: &str,
node: &str,
) -> ProtocolResult<ExecutionTask> {
let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?;
let task = snapshot
.get_task(task_id)
.ok_or_else(|| ProtocolError::input(format!("unknown task: {task_id}")))?;
let evidence = task.github_evidence.as_ref().ok_or_else(|| {
ProtocolError::gate_rejected("pull request must be recorded before merge")
})?;
match fetch_merge_verification(repo, evidence.pr_number) {
Ok(Some(verification)) => {
if verification.state != "MERGED" {
return Err(ProtocolError::gate_rejected(format!(
"pull request #{} is not merged",
evidence.pr_number
)));
}
let merge_commit_sha = verification
.merge_commit
.as_ref()
.map(|commit| commit.oid.as_str())
.filter(|oid| is_valid_merge_sha(oid))
.ok_or_else(|| {
ProtocolError::gate_rejected(
"merged pull request is missing a valid merge commit sha",
)
})?;
self.apply_verified_merge(
task_id,
merge_commit_sha,
verification.merged_at,
actor,
node,
)
}
Ok(None) => {
let merge_commit_sha = evidence
.merge_commit_sha
.as_deref()
.filter(|sha| is_valid_merge_sha(sha))
.ok_or_else(|| {
ProtocolError::gate_rejected(
"gh CLI unavailable and no valid recorded merge sha found",
)
})?;
self.apply_verified_merge(
task_id,
merge_commit_sha,
evidence.merged_at,
actor,
node,
)
}
Err(err) => Err(ProtocolError::from(err)),
}
}
pub fn force_unlock(
&self,
task_id: &str,
reason: &str,
operator: &str,
) -> ProtocolResult<ExecutionTask> {
let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?;
let had_lock = snapshot
.get_task(task_id)
.ok_or_else(|| ProtocolError::input(format!("unknown task: {task_id}")))?
.lock
.is_some();
self.lock_manager
.release_lock(task_id)
.map_err(ProtocolError::from)?;
self.unblock_dependents_after_merge(task_id, actor, node)?;
let refreshed = self.snapshot_store.load().map_err(ProtocolError::from)?;
self.append_event(
task_id,
TaskEventType::AuditRecorded,
operator,
"manual",
refreshed.version + 1,
serde_json::json!({
"action": "force_unlock",
"reason": reason,
"operator": operator,
"had_lock": had_lock
}),
)?;
announce_escape_hatch("force_unlock", task_id, operator, reason);
let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?;
Ok(snapshot.get_task(task_id).cloned().unwrap_or(merged))
snapshot
.get_task(task_id)
.cloned()
.ok_or_else(|| ProtocolError::input(format!("unknown task: {task_id}")))
}
pub fn manual_complete(
&self,
task_id: &str,
reason: &str,
operator: &str,
) -> ProtocolResult<ExecutionTask> {
self.lock_manager
.release_lock(task_id)
.map_err(ProtocolError::from)?;
let task = self.update_task(
task_id,
operator,
"manual",
TaskEventType::StateTransition,
|task| {
let from = task.current_state;
task.current_state = TaskState::Done;
Ok(serde_json::json!({
"from": from,
"to": TaskState::Done,
"reason": reason,
"operator": operator,
"manual": true
}))
},
)?;
let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?;
self.append_event(
task_id,
TaskEventType::AuditRecorded,
operator,
"manual",
snapshot.version + 1,
serde_json::json!({
"action": "manual_complete",
"reason": reason,
"operator": operator
}),
)?;
announce_escape_hatch("manual_complete", task_id, operator, reason);
let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?;
Ok(snapshot.get_task(task_id).cloned().unwrap_or(task))
}
pub fn locks(&self) -> ProtocolResult<HashMap<String, crate::store::FileLockEntry>> {
@ -758,6 +889,39 @@ impl DeterministicExecutionProtocol {
Ok(())
}
fn apply_verified_merge(
&self,
task_id: &str,
merge_commit_sha: &str,
merged_at: Option<chrono::DateTime<Utc>>,
actor: &str,
node: &str,
) -> ProtocolResult<ExecutionTask> {
let merged =
self.update_task(task_id, actor, node, TaskEventType::MergeVerified, |task| {
let mut evidence = task.github_evidence.clone().ok_or_else(|| {
ProtocolError::gate_rejected("pull request must be recorded before merge")
})?;
evidence.pr_state = GitHubPrState::Merged;
evidence.merge_commit_sha = Some(merge_commit_sha.to_string());
evidence.merged_at = Some(merged_at.unwrap_or_else(Utc::now));
evidence.review_decision = Some(ReviewDecision::Approved);
evidence.issue_state = GitHubIssueState::Closed;
evidence.issue_closed_by_pr = true;
task.github_evidence = Some(evidence);
task.current_state = TaskState::Merged;
Ok(serde_json::json!({ "merge_commit_sha": merge_commit_sha }))
})?;
self.lock_manager
.release_lock(task_id)
.map_err(ProtocolError::from)?;
self.unblock_dependents_after_merge(task_id, actor, node)?;
let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?;
Ok(snapshot.get_task(task_id).cloned().unwrap_or(merged))
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@ -840,6 +1004,20 @@ impl ProtocolError {
pub type ProtocolResult<T> = std::result::Result<T, ProtocolError>;
#[derive(Debug, Deserialize)]
struct GhPrViewResponse {
#[serde(rename = "mergeCommit")]
merge_commit: Option<GhMergeCommit>,
state: String,
#[serde(rename = "mergedAt")]
merged_at: Option<chrono::DateTime<Utc>>,
}
#[derive(Debug, Deserialize)]
struct GhMergeCommit {
oid: String,
}
fn dependencies_satisfied(task: &ExecutionTask, snapshot: &TasksSnapshot) -> bool {
task.dependencies.iter().all(|dep_id| {
snapshot
@ -905,6 +1083,61 @@ fn estimate_tokens(content: &str) -> usize {
}
}
fn is_valid_merge_sha(sha: &str) -> bool {
sha.len() == 40 && sha.chars().all(|c| c.is_ascii_hexdigit())
}
fn fetch_merge_verification(repo: &str, pr_number: u64) -> Result<Option<GhPrViewResponse>, Error> {
let output = match Command::new("gh")
.args([
"pr",
"view",
&pr_number.to_string(),
"--repo",
repo,
"--json",
"mergeCommit,state,mergedAt",
])
.output()
{
Ok(output) => output,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(err) => return Err(Error::Io(err)),
};
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
let detail = if stderr.is_empty() {
format!("gh pr view exited with status {}", output.status)
} else {
stderr
};
return Err(Error::Tool(detail));
}
Ok(Some(serde_json::from_slice(&output.stdout)?))
}
fn announce_escape_hatch(action: &str, task_id: &str, operator: &str, reason: &str) {
let mut message = format!("{action} {task_id} by {operator}: {reason}");
if message.chars().count() > ANNOUNCE_CHAR_LIMIT {
message = format!(
"{}...",
message
.chars()
.take(ANNOUNCE_CHAR_LIMIT - 3)
.collect::<String>()
);
}
let _ = Command::new(format!(
"{}/bin/announce",
std::env::var("HOME").unwrap_or_else(|_| "/Users/shunsukehayashi".to_string())
))
.args(["--mode", "macbook", &message])
.output();
}
fn read_file_snippet(path: &Path, max_lines: usize) -> Result<String, Error> {
let file = fs::File::open(path)?;
let reader = BufReader::new(file);
@ -1082,6 +1315,11 @@ mod tests {
LOCK.get_or_init(|| Mutex::new(()))
}
fn path_env_lock() -> &'static Mutex<()> {
static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
LOCK.get_or_init(|| Mutex::new(()))
}
fn fixture() -> (TempDir, DeterministicExecutionProtocol) {
let tmp = TempDir::new().unwrap();
let protocol =
@ -1731,4 +1969,128 @@ mod tests {
assert!(matches!(err, ProtocolError::GateRejected(_)));
}
#[test]
fn verify_merge_with_valid_sha_passes_via_fallback() {
let _guard = path_env_lock().lock().unwrap();
let original_path = std::env::var_os("PATH");
std::env::set_var("PATH", "");
let (_tmp, protocol) = fixture();
protocol
.register(
RegisterTaskRequest {
issue: 1,
task_id: "phase-a".into(),
title: "Phase A".into(),
dependencies: vec![],
soft_dependencies: vec![],
priority: 0,
completion_mode: CompletionMode::GithubPr,
},
"codex",
"macbook",
)
.unwrap();
protocol
.record_pr("phase-a", 12, "codex", "macbook")
.unwrap();
let mut snapshot = protocol.snapshot_store.load().unwrap();
let version = snapshot.version;
let task = snapshot.get_task_mut("phase-a").unwrap();
let evidence = task.github_evidence.as_mut().unwrap();
evidence.merge_commit_sha = Some("0123456789abcdef0123456789abcdef01234567".into());
protocol.snapshot_store.save(&snapshot, version).unwrap();
let merged = protocol
.verify_merge("phase-a", "owner/repo", "codex", "macbook")
.unwrap();
assert_eq!(merged.current_state, TaskState::Merged);
assert_eq!(
merged
.github_evidence
.as_ref()
.and_then(|evidence| evidence.merge_commit_sha.as_deref()),
Some("0123456789abcdef0123456789abcdef01234567")
);
if let Some(path) = original_path {
std::env::set_var("PATH", path);
} else {
std::env::remove_var("PATH");
}
}
#[test]
fn force_unlock_releases_lock() {
let (_tmp, protocol) = fixture();
protocol
.register(
RegisterTaskRequest {
issue: 1,
task_id: "phase-a".into(),
title: "Phase A".into(),
dependencies: vec![],
soft_dependencies: vec![],
priority: 0,
completion_mode: CompletionMode::GithubPr,
},
"codex",
"macbook",
)
.unwrap();
protocol
.record_impact(
"phase-a",
ImpactInput {
risk_level: ImpactRiskLevel::Low,
affected_symbols: 1,
depth1: vec!["force_unlock".into()],
analyzed_commit: None,
input_hash: None,
approve: false,
},
"codex",
"macbook",
)
.unwrap();
protocol
.assign("phase-a", "codex", "macbook", &[String::from("src/a.rs")])
.unwrap();
let task = protocol
.force_unlock("phase-a", "operator override", "maintainer")
.unwrap();
assert!(task.lock.is_none());
assert!(protocol.locks().unwrap().is_empty());
}
#[test]
fn manual_complete_sets_state_to_done() {
let (_tmp, protocol) = fixture();
protocol
.register(
RegisterTaskRequest {
issue: 1,
task_id: "phase-a".into(),
title: "Phase A".into(),
dependencies: vec![],
soft_dependencies: vec![],
priority: 0,
completion_mode: CompletionMode::GithubPr,
},
"codex",
"macbook",
)
.unwrap();
let task = protocol
.manual_complete("phase-a", "manual verification", "maintainer")
.unwrap();
assert_eq!(task.current_state, TaskState::Done);
}
}