diff --git a/Cargo.lock b/Cargo.lock index 1d976c5..00b6179 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -616,6 +616,16 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "futures" version = "0.3.31" @@ -1359,6 +1369,7 @@ dependencies = [ "async-trait", "chrono", "dirs", + "fs2", "futures", "git2", "glob", diff --git a/Cargo.toml b/Cargo.toml index c147a04..45a7914 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,3 +63,4 @@ uuid = { version = "1", features = ["v4", "serde"] } once_cell = "1" glob = "0.3" regex = "1" +fs2 = "0.4" diff --git a/crates/miyabi-core/Cargo.toml b/crates/miyabi-core/Cargo.toml index e3103bc..8ce8ad9 100644 --- a/crates/miyabi-core/Cargo.toml +++ b/crates/miyabi-core/Cargo.toml @@ -24,6 +24,7 @@ futures = { workspace = true } async-trait = { workspace = true } glob = { workspace = true } regex = { workspace = true } +fs2 = { workspace = true } git2 = "0.19" serde_yaml = "0.9" dirs = "5" diff --git a/crates/miyabi-core/src/gate.rs b/crates/miyabi-core/src/gate.rs new file mode 100644 index 0000000..1a05cd6 --- /dev/null +++ b/crates/miyabi-core/src/gate.rs @@ -0,0 +1,274 @@ +//! Deterministic gate evaluation for task execution. + +use crate::lock::LockConflict; +use crate::store::{ + CompletionMode, ExecutionTask, GitHubIssueState, GitHubPrState, ReviewDecision, TaskState, + TasksSnapshot, +}; +use serde::{Deserialize, Serialize}; +use std::time::{Duration, Instant}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum Gate { + Gate0, + Gate1, + Gate2, + Gate3, + Gate4, + Gate5, + Gate6, + Gate7, + Gate8, +} + +impl Gate { + pub fn label(self) -> &'static str { + match self { + Gate::Gate0 => "gate_0", + Gate::Gate1 => "gate_1", + Gate::Gate2 => "gate_2", + Gate::Gate3 => "gate_3", + Gate::Gate4 => "gate_4", + Gate::Gate5 => "gate_5", + Gate::Gate6 => "gate_6", + Gate::Gate7 => "gate_7", + Gate::Gate8 => "gate_8", + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct GateContext { + pub lock_conflict: Option, +} + +impl Default for GateContext { + fn default() -> Self { + Self { + lock_conflict: None, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct GateReport { + pub gate: Gate, + pub success: bool, + pub detail: String, + pub duration: Duration, +} + +pub fn evaluate_gate( + gate: Gate, + task: &ExecutionTask, + snapshot: &TasksSnapshot, + context: &GateContext, +) -> GateReport { + let start = Instant::now(); + let (success, detail) = match gate { + Gate::Gate0 => (true, "task declared".to_string()), + Gate::Gate1 => ( + matches!(task.current_state, TaskState::Draft | TaskState::Pending), + format!("state is {:?}", task.current_state), + ), + Gate::Gate2 => { + let blocked_by: Vec = task + .dependencies + .iter() + .filter_map(|dependency| { + snapshot + .get_task(dependency) + .filter(|dep| { + !matches!(dep.current_state, TaskState::Done | TaskState::Merged) + }) + .map(|_| dependency.clone()) + }) + .collect(); + ( + blocked_by.is_empty(), + if blocked_by.is_empty() { + "all hard dependencies resolved".to_string() + } else { + format!("blocked by dependencies: {}", blocked_by.join(", ")) + }, + ) + } + Gate::Gate3 => { + let has_impact = task.impact.is_some(); + let approval_ok = task.human_approval.as_ref().map_or(true, |approval| { + !approval.required || approval.approved_by.is_some() + }); + ( + has_impact && approval_ok, + if !has_impact { + "missing impact analysis".to_string() + } else if !approval_ok { + "human approval required".to_string() + } else { + "impact and approval satisfied".to_string() + }, + ) + } + Gate::Gate4 => { + let conflict = context + .lock_conflict + .as_ref() + .filter(|conflict| conflict.conflicting); + ( + conflict.is_none(), + conflict + .map(|conflict| { + format!( + "lock conflict held by {}", + conflict.held_by.as_deref().unwrap_or("unknown") + ) + }) + .unwrap_or_else(|| "lock window available".to_string()), + ) + } + Gate::Gate5 => ( + task.branch_name.is_some(), + task.branch_name + .clone() + .unwrap_or_else(|| "missing branch_name".to_string()), + ), + Gate::Gate6 => { + let ok = task.github_evidence.as_ref().map_or(false, |evidence| { + evidence.pr_number > 0 + && !evidence.pr_head_ref.is_empty() + && matches!( + evidence.review_decision, + Some(ReviewDecision::Approved | ReviewDecision::ReviewRequired) + ) + }); + ( + ok, + if ok { + "pull request evidence present".to_string() + } else { + "missing verified pull request evidence".to_string() + }, + ) + } + Gate::Gate7 => { + let ok = task.github_evidence.as_ref().map_or(false, |evidence| { + evidence.pr_state == GitHubPrState::Merged && evidence.merge_commit_sha.is_some() + }); + ( + ok, + if ok { + "merge verified".to_string() + } else { + "merge not verified".to_string() + }, + ) + } + Gate::Gate8 => { + let ok = match task.completion_mode { + CompletionMode::GithubPr => { + task.github_evidence.as_ref().map_or(false, |evidence| { + evidence.issue_state == GitHubIssueState::Closed + }) + } + CompletionMode::Manual | CompletionMode::ExternalOp => true, + }; + ( + ok, + if ok { + "completion evidence satisfied".to_string() + } else { + "issue still open".to_string() + }, + ) + } + }; + + GateReport { + gate, + success, + detail, + duration: start.elapsed(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::store::{ + CompletionMode, ExecutionTask, GitHubEvidence, ImpactRiskLevel, ReviewDecision, TaskImpact, + TaskState, TasksSnapshot, + }; + use chrono::Utc; + + fn task(id: &str) -> ExecutionTask { + let mut task = ExecutionTask::new(id, "gate test"); + task.current_state = TaskState::Pending; + task + } + + #[test] + fn gate_2_blocks_on_incomplete_dependency() { + let mut pending = task("child"); + pending.dependencies.push("parent".into()); + + let mut snapshot = TasksSnapshot::default(); + snapshot.upsert_task(pending.clone()); + let mut parent = task("parent"); + parent.current_state = TaskState::Implementing; + snapshot.upsert_task(parent); + + let report = evaluate_gate(Gate::Gate2, &pending, &snapshot, &GateContext::default()); + assert!(!report.success); + assert!(report.detail.contains("parent")); + } + + #[test] + fn gate_3_requires_impact_and_human_approval_when_flagged() { + let mut snapshot = TasksSnapshot::default(); + let mut gated = task("phase-a"); + gated.impact = Some(TaskImpact { + risk_level: ImpactRiskLevel::Low, + affected_symbols: 1, + depth1: vec!["create_orchestrator".into()], + analyzed_at: Utc::now(), + analyzed_commit: None, + input_hash: None, + }); + gated.human_approval = Some(crate::store::HumanApproval { + required: true, + approved_by: None, + approved_at: None, + reason: Some("touches orchestration".into()), + }); + snapshot.upsert_task(gated.clone()); + + let report = evaluate_gate(Gate::Gate3, &gated, &snapshot, &GateContext::default()); + assert!(!report.success); + assert_eq!(report.detail, "human approval required"); + } + + #[test] + fn merge_and_close_gates_pass_with_verified_evidence() { + let mut task = task("phase-a"); + task.current_state = TaskState::Reviewing; + task.completion_mode = CompletionMode::GithubPr; + task.branch_name = Some("feature/phase-a".into()); + task.github_evidence = Some(GitHubEvidence { + pr_number: 12, + pr_head_ref: "feature/phase-a".into(), + pr_state: GitHubPrState::Merged, + merge_commit_sha: Some("0123456789abcdef0123456789abcdef01234567".into()), + merged_at: Some(Utc::now()), + review_decision: Some(ReviewDecision::Approved), + issue_state: GitHubIssueState::Closed, + issue_closed_by_pr: true, + }); + let mut snapshot = TasksSnapshot::default(); + snapshot.upsert_task(task.clone()); + + assert!(evaluate_gate(Gate::Gate6, &task, &snapshot, &GateContext::default()).success); + assert!(evaluate_gate(Gate::Gate7, &task, &snapshot, &GateContext::default()).success); + assert!(evaluate_gate(Gate::Gate8, &task, &snapshot, &GateContext::default()).success); + } +} diff --git a/crates/miyabi-core/src/lib.rs b/crates/miyabi-core/src/lib.rs index bf535b0..703431b 100644 --- a/crates/miyabi-core/src/lib.rs +++ b/crates/miyabi-core/src/lib.rs @@ -11,18 +11,22 @@ pub mod dag; pub mod error; pub mod error_policy; pub mod feature_flags; +pub mod gate; pub mod git; pub mod github; pub mod github_tools; pub mod hooks; +pub mod lock; pub mod logger; pub mod mcp; pub mod orchestration; pub mod plugin; +pub mod protocol; pub mod retry; -pub mod streaming; pub mod rules; pub mod session; +pub mod store; +pub mod streaming; pub mod token; pub mod tool; pub mod tools; @@ -47,14 +51,14 @@ pub use anthropic::{ Tool as ApiTool, // Anthropic API tool definition format Usage, }; -pub use config::{ApiConfig, Config, SessionConfig, ToolConfig, UiConfig}; -pub use conversation::{ - Conversation, ConversationError, ConversationManager, ConversationMessage, ConversationMetadata, -}; pub use cache::{ create_api_cache, create_llm_cache, ApiCache, ApiCacheKey, CacheEntry, CacheStats, LLMCache, LLMCacheKey, TTLCache, }; +pub use config::{ApiConfig, Config, SessionConfig, ToolConfig, UiConfig}; +pub use conversation::{ + Conversation, ConversationError, ConversationManager, ConversationMessage, ConversationMetadata, +}; pub use error::Error; pub use error_policy::{CircuitBreaker, CircuitState, FallbackStrategy}; pub use feature_flags::{FeatureFlag, FeatureFlagManager}; @@ -63,8 +67,8 @@ pub use git::{ has_uncommitted_changes, is_in_git_repo, is_valid_repository, }; pub use github::{ - Comment, CreateIssueRequest, CreatePullRequestRequest, GitHubClient, Issue, Label, - PullRequest, User, + Comment, CreateIssueRequest, CreatePullRequestRequest, GitHubClient, Issue, Label, PullRequest, + User, }; pub use github_tools::{ create_github_tool_registry, AddCommentTool, AddLabelsTool, CreateIssueTool, @@ -72,6 +76,9 @@ pub use github_tools::{ }; pub use hooks::{Hook, HookAction, HookContext, HookEvent, HookManager, HookResult, HooksConfig}; pub use logger::{init_logger, init_logger_with_config, LogFormat, LogLevel, LoggerConfig}; +pub use mcp::{ + McpConfig, McpError, McpManager, McpRequest, McpResponse, McpServer, McpServerConfig, McpTool, +}; pub use plugin::{Plugin, PluginContext, PluginManager, PluginMetadata, PluginResult, PluginState}; pub use retry::{retry_with_backoff, RetryConfig as BackoffRetryConfig}; pub use rules::{AgentPreferences, MiyabiRules, Rule, RulesError, RulesLoader}; @@ -87,17 +94,14 @@ pub use workflow::{ FailurePolicy, StepCondition, StepResult, StepStatus, Workflow, WorkflowContext, WorkflowManager, WorkflowResult, WorkflowStatus, WorkflowStep, }; -pub use mcp::{ - McpConfig, McpError, McpManager, McpRequest, McpResponse, McpServer, McpServerConfig, McpTool, -}; pub mod openclaw; -pub use openclaw::{AgentInfo, OpenClawClient, OpenClawError, OpenClawResult}; pub use dag::{ DAGError, Task as DAGTask, TaskGraph, TaskGraphBuilder, TaskId, TaskLevel, TaskNode, }; +pub use openclaw::{AgentInfo, OpenClawClient, OpenClawError, OpenClawResult}; pub use orchestration::{ create_orchestrator, Orchestrator, OrchestratorTask, ParallelConfig, ParallelResult, TaskResult, }; pub use streaming::{ - create_streaming_agent, AgentStreamEvent, StreamingAgent, StreamingConfig, StreamProcessor, + create_streaming_agent, AgentStreamEvent, StreamProcessor, StreamingAgent, StreamingConfig, }; diff --git a/crates/miyabi-core/src/lock.rs b/crates/miyabi-core/src/lock.rs new file mode 100644 index 0000000..24a717a --- /dev/null +++ b/crates/miyabi-core/src/lock.rs @@ -0,0 +1,422 @@ +//! File lock manager for deterministic task execution. + +use crate::error::{Error, Result}; +use crate::store::{ + lease_expiry, EventStore, FileLockEntry, SnapshotStore, TaskEvent, TaskEventType, +}; +use chrono::{DateTime, Duration, Utc}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct LeaseConfig { + pub lease_duration_sec: u64, + pub heartbeat_interval_sec: u64, + pub stale_after_missed_heartbeats: u64, +} + +impl Default for LeaseConfig { + fn default() -> Self { + Self { + lease_duration_sec: 300, + heartbeat_interval_sec: 60, + stale_after_missed_heartbeats: 2, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LockConflict { + pub conflicting: bool, + pub held_by: Option, + pub task_id: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LeaseSweep { + pub released: Vec, + pub active: Vec, +} + +#[derive(Debug, Clone)] +pub struct FileLockManager { + event_store: EventStore, + snapshot_store: SnapshotStore, + config: LeaseConfig, +} + +impl FileLockManager { + pub fn new( + event_store: EventStore, + snapshot_store: SnapshotStore, + config: LeaseConfig, + ) -> Self { + Self { + event_store, + snapshot_store, + config, + } + } + + pub fn acquire_lock( + &self, + task_id: &str, + agent: &str, + node: &str, + files: &[String], + ) -> Result<()> { + let mut snapshot = self.snapshot_store.load()?; + self.purge_stale_from_snapshot(&mut snapshot, Utc::now()); + + if snapshot.get_task(task_id).is_none() { + return Err(Error::Validation(format!("unknown task: {task_id}"))); + } + + let conflict = find_conflict(&snapshot.file_locks, task_id, files); + if let Some(entry) = conflict { + return Err(Error::Validation(format!( + "lock conflict: {} held by {}@{}", + entry.task_id, entry.agent, entry.node + ))); + } + + let now = Utc::now(); + let expires_at = self.compute_expires_at(now); + { + let task = snapshot + .get_task_mut(task_id) + .ok_or_else(|| Error::Validation(format!("unknown task: {task_id}")))?; + task.lock = Some(crate::store::TaskLockSnapshot { + locked_by: format!("{agent}@{node}"), + locked_at: now, + lease_duration_sec: self.config.lease_duration_sec, + last_heartbeat: now, + affected_files: files.to_vec(), + }); + task.updated_at = now; + } + + for file in files { + snapshot.file_locks.insert( + file.clone(), + FileLockEntry { + task_id: task_id.to_string(), + agent: agent.to_string(), + node: node.to_string(), + expires_at, + }, + ); + } + + let expected_version = snapshot.version; + self.snapshot_store.save(&snapshot, expected_version)?; + self.event_store.append(&TaskEvent { + id: format!("{task_id}-lock-acquired-{}", now.timestamp_millis()), + ts: now, + event_type: TaskEventType::LockAcquired, + task_id: task_id.to_string(), + agent: agent.to_string(), + node: node.to_string(), + payload: serde_json::json!({ + "files": files, + "expires_at": expires_at, + "lease_duration_sec": self.config.lease_duration_sec + }), + version: expected_version + 1, + })?; + + Ok(()) + } + + pub fn renew_lease(&self, task_id: &str, agent: &str, node: &str) -> Result<()> { + let mut snapshot = self.snapshot_store.load()?; + let now = Utc::now(); + let expires_at = self.compute_expires_at(now); + let expected_version = snapshot.version; + let owner = format!("{agent}@{node}"); + let files = { + let task = snapshot + .get_task_mut(task_id) + .ok_or_else(|| Error::Validation(format!("unknown task: {task_id}")))?; + let lock = task + .lock + .as_mut() + .ok_or_else(|| Error::Validation(format!("task {task_id} has no lock")))?; + + if lock.locked_by != owner { + return Err(Error::PermissionDenied(format!( + "lock owned by {}, not {owner}", + lock.locked_by + ))); + } + + lock.last_heartbeat = now; + task.updated_at = now; + lock.affected_files.clone() + }; + + for file in &files { + if let Some(entry) = snapshot.file_locks.get_mut(file) { + entry.expires_at = expires_at; + } + } + + self.snapshot_store.save(&snapshot, expected_version)?; + self.event_store.append(&TaskEvent { + id: format!("{task_id}-lock-heartbeat-{}", now.timestamp_millis()), + ts: now, + event_type: TaskEventType::LockHeartbeat, + task_id: task_id.to_string(), + agent: agent.to_string(), + node: node.to_string(), + payload: serde_json::json!({ "expires_at": expires_at }), + version: expected_version + 1, + })?; + + Ok(()) + } + + pub fn release_lock(&self, task_id: &str) -> Result<()> { + let mut snapshot = self.snapshot_store.load()?; + let now = Utc::now(); + let expected_version = snapshot.version; + let Some(files) = ({ + let task = snapshot + .get_task_mut(task_id) + .ok_or_else(|| Error::Validation(format!("unknown task: {task_id}")))?; + + let files = task.lock.take().map(|lock| lock.affected_files); + task.updated_at = now; + files + }) else { + return Ok(()); + }; + + for file in files { + snapshot.file_locks.remove(&file); + } + + self.snapshot_store.save(&snapshot, expected_version)?; + self.event_store.append(&TaskEvent { + id: format!("{task_id}-lock-released-{}", now.timestamp_millis()), + ts: now, + event_type: TaskEventType::LockReleased, + task_id: task_id.to_string(), + agent: "system".into(), + node: "system".into(), + payload: serde_json::json!({}), + version: expected_version + 1, + })?; + + Ok(()) + } + + pub fn release_expired_leases(&self) -> Result { + let mut snapshot = self.snapshot_store.load()?; + let now = Utc::now(); + let mut released = Vec::new(); + let mut active = Vec::new(); + + for task in snapshot.tasks.iter_mut() { + let is_expired = task + .lock + .as_ref() + .map(|lock| self.is_expired(lock.last_heartbeat, now)) + .unwrap_or(false); + + if is_expired { + if let Some(lock) = task.lock.take() { + for file in lock.affected_files { + snapshot.file_locks.remove(&file); + } + released.push(task.id.clone()); + } + task.updated_at = now; + } else if task.lock.is_some() { + active.push(task.id.clone()); + } + } + + if !released.is_empty() { + let expected_version = snapshot.version; + self.snapshot_store.save(&snapshot, expected_version)?; + for task_id in &released { + self.event_store.append(&TaskEvent { + id: format!("{task_id}-lock-released-expired-{}", now.timestamp_millis()), + ts: now, + event_type: TaskEventType::LockReleased, + task_id: task_id.clone(), + agent: "system".into(), + node: "system".into(), + payload: serde_json::json!({ "forced": true, "reason": "lease_expired" }), + version: expected_version + 1, + })?; + } + } + + Ok(LeaseSweep { released, active }) + } + + pub fn has_conflict(&self, files: &[String]) -> Result { + let mut snapshot = self.snapshot_store.load()?; + self.purge_stale_from_snapshot(&mut snapshot, Utc::now()); + + if let Some(entry) = find_conflict(&snapshot.file_locks, "", files) { + return Ok(LockConflict { + conflicting: true, + held_by: Some(format!("{}@{}", entry.agent, entry.node)), + task_id: Some(entry.task_id.clone()), + }); + } + + Ok(LockConflict { + conflicting: false, + held_by: None, + task_id: None, + }) + } + + fn purge_stale_from_snapshot( + &self, + snapshot: &mut crate::store::TasksSnapshot, + now: DateTime, + ) { + let stale_task_ids: Vec = snapshot + .tasks + .iter() + .filter_map(|task| { + task.lock.as_ref().and_then(|lock| { + if self.is_expired(lock.last_heartbeat, now) { + Some(task.id.clone()) + } else { + None + } + }) + }) + .collect(); + + for task_id in stale_task_ids { + if let Some(task) = snapshot.get_task_mut(&task_id) { + if let Some(lock) = task.lock.take() { + for file in lock.affected_files { + snapshot.file_locks.remove(&file); + } + } + } + } + } + + fn is_expired(&self, last_heartbeat: DateTime, now: DateTime) -> bool { + let max_age = self.config.lease_duration_sec as i64 + + (self.config.heartbeat_interval_sec * self.config.stale_after_missed_heartbeats) + as i64; + now > last_heartbeat + Duration::seconds(max_age) + } + + fn compute_expires_at(&self, now: DateTime) -> DateTime { + lease_expiry( + now + Duration::seconds( + (self.config.heartbeat_interval_sec * self.config.stale_after_missed_heartbeats) + as i64, + ), + self.config.lease_duration_sec, + ) + } +} + +fn find_conflict<'a>( + file_locks: &'a std::collections::HashMap, + task_id: &str, + files: &[String], +) -> Option<&'a FileLockEntry> { + files.iter().find_map(|file| { + file_locks.get(file).and_then(|entry| { + if task_id.is_empty() || entry.task_id != task_id { + Some(entry) + } else { + None + } + }) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::store::{CompletionMode, ExecutionTask}; + use tempfile::TempDir; + + fn fixture() -> (TempDir, EventStore, SnapshotStore, FileLockManager) { + let tmp = TempDir::new().unwrap(); + let event_store = EventStore::new(tmp.path().join("events.jsonl")); + let snapshot_store = SnapshotStore::new( + tmp.path().join("tasks.snapshot.json"), + tmp.path().join(".tasks.lock"), + ); + let manager = FileLockManager::new( + event_store.clone(), + snapshot_store.clone(), + LeaseConfig { + lease_duration_sec: 1, + heartbeat_interval_sec: 1, + stale_after_missed_heartbeats: 0, + }, + ); + (tmp, event_store, snapshot_store, manager) + } + + fn seed_task(snapshot_store: &SnapshotStore, id: &str) { + let mut snapshot = snapshot_store.load().unwrap(); + let mut task = ExecutionTask::new(id, "Phase A"); + task.completion_mode = CompletionMode::GithubPr; + snapshot.upsert_task(task); + let version = snapshot.version; + snapshot_store.save(&snapshot, version).unwrap(); + } + + #[test] + fn acquires_detects_conflict_and_releases() { + let (_tmp, _events, snapshot_store, manager) = fixture(); + seed_task(&snapshot_store, "task-a"); + seed_task(&snapshot_store, "task-b"); + + manager + .acquire_lock("task-a", "codex", "macbook", &[String::from("src/lib.rs")]) + .unwrap(); + let conflict = manager.has_conflict(&[String::from("src/lib.rs")]).unwrap(); + assert!(conflict.conflicting); + + let err = manager + .acquire_lock("task-b", "codex", "macbook", &[String::from("src/lib.rs")]) + .unwrap_err(); + assert!(matches!(err, Error::Validation(_))); + + manager.release_lock("task-a").unwrap(); + let conflict = manager.has_conflict(&[String::from("src/lib.rs")]).unwrap(); + assert!(!conflict.conflicting); + } + + #[test] + fn renews_and_expires_leases() { + let (_tmp, _events, snapshot_store, manager) = fixture(); + seed_task(&snapshot_store, "task-a"); + + manager + .acquire_lock("task-a", "codex", "macbook", &[String::from("src/lib.rs")]) + .unwrap(); + manager.renew_lease("task-a", "codex", "macbook").unwrap(); + + { + let mut snapshot = snapshot_store.load().unwrap(); + let task = snapshot.get_task_mut("task-a").unwrap(); + let lock = task.lock.as_mut().unwrap(); + lock.last_heartbeat = Utc::now() - Duration::seconds(5); + let version = snapshot.version; + snapshot_store.save(&snapshot, version).unwrap(); + } + + let sweep = manager.release_expired_leases().unwrap(); + assert_eq!(sweep.released, vec!["task-a".to_string()]); + let snapshot = snapshot_store.load().unwrap(); + assert!(snapshot.get_task("task-a").unwrap().lock.is_none()); + } +} diff --git a/crates/miyabi-core/src/protocol.rs b/crates/miyabi-core/src/protocol.rs new file mode 100644 index 0000000..d73b0a9 --- /dev/null +++ b/crates/miyabi-core/src/protocol.rs @@ -0,0 +1,176 @@ +//! Deterministic execution protocol entry point. + +use crate::error::{Error, Result}; +use crate::gate::{evaluate_gate, Gate, GateContext, GateReport}; +use crate::lock::FileLockManager; +use crate::store::{EventStore, SnapshotStore, TaskEvent, TaskEventType}; +use chrono::Utc; +use serde::{Deserialize, Serialize}; +use std::time::{Duration, Instant}; + +#[derive(Debug, Clone)] +pub struct DeterministicExecutionProtocol { + event_store: EventStore, + snapshot_store: SnapshotStore, + lock_manager: FileLockManager, +} + +impl DeterministicExecutionProtocol { + pub fn new( + event_store: EventStore, + snapshot_store: SnapshotStore, + lock_manager: FileLockManager, + ) -> Self { + Self { + event_store, + snapshot_store, + lock_manager, + } + } + + pub fn run( + &self, + task_id: &str, + gates: &[Gate], + actor: &str, + node: &str, + ) -> Result { + let start = Instant::now(); + let mut steps = Vec::new(); + let mut success = true; + + for gate in gates { + let snapshot = self.snapshot_store.load()?; + let task = snapshot + .get_task(task_id) + .ok_or_else(|| Error::Validation(format!("unknown task: {task_id}")))?; + + let context = if matches!(gate, Gate::Gate4) { + let files = task + .lock + .as_ref() + .map(|lock| lock.affected_files.clone()) + .unwrap_or_default(); + GateContext { + lock_conflict: Some(self.lock_manager.has_conflict(&files)?), + } + } else { + GateContext::default() + }; + + let report = evaluate_gate(*gate, task, &snapshot, &context); + self.record_gate(task_id, *gate, &report, actor, node, snapshot.version + 1)?; + success &= report.success; + steps.push(report); + + if !success { + break; + } + } + + Ok(ProtocolReport { + task_id: task_id.to_string(), + steps, + total_duration: start.elapsed(), + success, + }) + } + + fn record_gate( + &self, + task_id: &str, + gate: Gate, + report: &GateReport, + actor: &str, + node: &str, + version: u64, + ) -> Result<()> { + let event_type = if report.success { + TaskEventType::GatePassed + } else { + TaskEventType::GateRejected + }; + self.event_store.append(&TaskEvent { + id: format!( + "{task_id}-{}-{}", + gate.label(), + Utc::now().timestamp_millis() + ), + ts: Utc::now(), + event_type, + task_id: task_id.to_string(), + agent: actor.to_string(), + node: node.to_string(), + payload: serde_json::json!({ + "gate": gate.label(), + "detail": report.detail, + "duration_ms": report.duration.as_millis(), + }), + version, + })?; + Ok(()) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ProtocolReport { + pub task_id: String, + pub steps: Vec, + pub total_duration: Duration, + pub success: bool, +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::lock::LeaseConfig; + use crate::store::{ExecutionTask, SnapshotStore, TasksSnapshot}; + use tempfile::TempDir; + + #[test] + fn protocol_stops_at_first_failed_gate_and_records_events() { + let tmp = TempDir::new().unwrap(); + let event_store = EventStore::new(tmp.path().join("events.jsonl")); + let snapshot_store = SnapshotStore::new( + tmp.path().join("tasks.snapshot.json"), + tmp.path().join(".tasks.lock"), + ); + let lock_manager = FileLockManager::new( + event_store.clone(), + snapshot_store.clone(), + LeaseConfig::default(), + ); + let protocol = DeterministicExecutionProtocol::new( + event_store.clone(), + snapshot_store.clone(), + lock_manager, + ); + + let mut snapshot = TasksSnapshot::default(); + let mut task = ExecutionTask::new("phase-a", "Phase A"); + task.current_state = crate::store::TaskState::Pending; + task.dependencies.push("phase-0".into()); + snapshot.upsert_task(task); + snapshot.upsert_task(ExecutionTask::new("phase-0", "Phase 0")); + snapshot_store.save(&snapshot, 0).unwrap(); + + let report = protocol + .run( + "phase-a", + &[Gate::Gate1, Gate::Gate2, Gate::Gate3], + "codex", + "macbook", + ) + .unwrap(); + + assert!(!report.success); + assert_eq!(report.steps.len(), 2); + assert!(report.steps[0].success); + assert!(!report.steps[1].success); + + let events = event_store.replay_for_task("phase-a").unwrap(); + assert_eq!(events.len(), 2); + assert_eq!(events[0].event_type, TaskEventType::GatePassed); + assert_eq!(events[1].event_type, TaskEventType::GateRejected); + } +} diff --git a/crates/miyabi-core/src/store.rs b/crates/miyabi-core/src/store.rs new file mode 100644 index 0000000..a474b4a --- /dev/null +++ b/crates/miyabi-core/src/store.rs @@ -0,0 +1,584 @@ +//! Deterministic task protocol storage primitives. + +use crate::error::{Error, Result}; +use chrono::{DateTime, Duration, Utc}; +use fs2::FileExt; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::collections::HashMap; +use std::fs::{self, File, OpenOptions}; +use std::io::{BufRead, BufReader, Write}; +use std::path::{Path, PathBuf}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum TaskState { + Draft, + Pending, + Analyzing, + Implementing, + Reviewing, + Merged, + Deploying, + Done, + Blocked, + Failed, + Cancelled, + AwaitingGithubSync, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum CompletionMode { + GithubPr, + Manual, + ExternalOp, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub enum ImpactRiskLevel { + Low, + Medium, + High, + Critical, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct TaskImpact { + pub risk_level: ImpactRiskLevel, + pub affected_symbols: usize, + pub depth1: Vec, + pub analyzed_at: DateTime, + pub analyzed_commit: Option, + pub input_hash: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct GitHubEvidence { + pub pr_number: u64, + pub pr_head_ref: String, + pub pr_state: GitHubPrState, + pub merge_commit_sha: Option, + pub merged_at: Option>, + pub review_decision: Option, + pub issue_state: GitHubIssueState, + pub issue_closed_by_pr: bool, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub enum GitHubPrState { + Open, + Merged, + Closed, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub enum ReviewDecision { + Approved, + ChangesRequested, + ReviewRequired, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub enum GitHubIssueState { + Open, + Closed, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct HumanApproval { + pub required: bool, + pub approved_by: Option, + pub approved_at: Option>, + pub reason: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct TaskLockSnapshot { + pub locked_by: String, + pub locked_at: DateTime, + pub lease_duration_sec: u64, + pub last_heartbeat: DateTime, + pub affected_files: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ExecutionTask { + pub id: String, + pub title: String, + pub current_state: TaskState, + pub dependencies: Vec, + pub dependents: Vec, + pub soft_dependencies: Vec, + pub lock: Option, + pub impact: Option, + pub branch_name: Option, + pub github_evidence: Option, + pub completion_mode: CompletionMode, + pub human_approval: Option, + pub priority: u32, + pub created_at: DateTime, + pub updated_at: DateTime, +} + +impl ExecutionTask { + pub fn new(id: impl Into, title: impl Into) -> Self { + let now = Utc::now(); + Self { + id: id.into(), + title: title.into(), + current_state: TaskState::Draft, + dependencies: Vec::new(), + dependents: Vec::new(), + soft_dependencies: Vec::new(), + lock: None, + impact: None, + branch_name: None, + github_evidence: None, + completion_mode: CompletionMode::GithubPr, + human_approval: None, + priority: 0, + created_at: now, + updated_at: now, + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum TaskEventType { + StateTransition, + LockAcquired, + LockReleased, + LockHeartbeat, + DagChanged, + GithubSynced, + GatePassed, + GateRejected, + HumanApproved, + ImpactRecorded, + BranchCreated, + PrCreated, + MergeVerified, + AuditRecorded, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct TaskEvent { + pub id: String, + pub ts: DateTime, + #[serde(rename = "type")] + pub event_type: TaskEventType, + pub task_id: String, + pub agent: String, + pub node: String, + pub payload: Value, + pub version: u64, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct FileLockEntry { + pub task_id: String, + pub agent: String, + pub node: String, + pub expires_at: DateTime, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct TasksSnapshot { + pub version: u64, + pub generated_at: DateTime, + pub generated_from_event_id: Option, + pub tasks: Vec, + pub file_locks: HashMap, +} + +impl Default for TasksSnapshot { + fn default() -> Self { + Self { + version: 0, + generated_at: Utc::now(), + generated_from_event_id: None, + tasks: Vec::new(), + file_locks: HashMap::new(), + } + } +} + +impl TasksSnapshot { + pub fn get_task(&self, task_id: &str) -> Option<&ExecutionTask> { + self.tasks.iter().find(|task| task.id == task_id) + } + + pub fn get_task_mut(&mut self, task_id: &str) -> Option<&mut ExecutionTask> { + self.tasks.iter_mut().find(|task| task.id == task_id) + } + + pub fn upsert_task(&mut self, task: ExecutionTask) { + if let Some(existing) = self.get_task_mut(&task.id) { + *existing = task; + } else { + self.tasks.push(task); + } + } +} + +#[derive(Debug, Clone)] +pub struct EventStore { + file_path: PathBuf, +} + +impl EventStore { + pub fn new(path: impl Into) -> Self { + Self { + file_path: path.into(), + } + } + + pub fn path(&self) -> &Path { + &self.file_path + } + + pub fn append(&self, event: &TaskEvent) -> Result<()> { + ensure_parent_dir(&self.file_path)?; + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(&self.file_path)?; + serde_json::to_writer(&mut file, event)?; + writeln!(file)?; + Ok(()) + } + + pub fn replay(&self, since_id: Option<&str>) -> Result> { + if !self.file_path.exists() { + return Ok(Vec::new()); + } + + let reader = BufReader::new(File::open(&self.file_path)?); + let mut collecting = since_id.is_none(); + let mut events = Vec::new(); + + for line in reader.lines() { + let line = line?; + if line.trim().is_empty() { + continue; + } + let event: TaskEvent = serde_json::from_str(&line)?; + if !collecting { + if Some(event.id.as_str()) == since_id { + collecting = true; + } + continue; + } + events.push(event); + } + + Ok(events) + } + + pub fn replay_for_task(&self, task_id: &str) -> Result> { + Ok(self + .replay(None)? + .into_iter() + .filter(|event| event.task_id == task_id) + .collect()) + } +} + +#[derive(Debug, Clone)] +pub struct SnapshotStore { + file_path: PathBuf, + lock_file_path: PathBuf, +} + +impl SnapshotStore { + pub fn new(file_path: impl Into, lock_file_path: impl Into) -> Self { + Self { + file_path: file_path.into(), + lock_file_path: lock_file_path.into(), + } + } + + pub fn path(&self) -> &Path { + &self.file_path + } + + pub fn load(&self) -> Result { + if !self.file_path.exists() { + return Ok(TasksSnapshot::default()); + } + let raw = fs::read_to_string(&self.file_path)?; + Ok(serde_json::from_str(&raw)?) + } + + pub fn save(&self, snapshot: &TasksSnapshot, expected_version: u64) -> Result<()> { + ensure_parent_dir(&self.file_path)?; + ensure_parent_dir(&self.lock_file_path)?; + + let lock_file = OpenOptions::new() + .create(true) + .read(true) + .write(true) + .truncate(false) + .open(&self.lock_file_path)?; + lock_file.lock_exclusive()?; + + let result = (|| -> Result<()> { + let current = self.load()?; + if current.version != expected_version { + return Err(Error::Validation(format!( + "CAS conflict: expected v{expected_version}, got v{}", + current.version + ))); + } + + let mut next = snapshot.clone(); + next.version = expected_version + 1; + next.generated_at = Utc::now(); + + let tmp_path = self.file_path.with_extension("json.tmp"); + fs::write(&tmp_path, serde_json::to_vec_pretty(&next)?)?; + fs::rename(&tmp_path, &self.file_path)?; + Ok(()) + })(); + + let unlock_result = lock_file.unlock(); + result?; + unlock_result?; + Ok(()) + } + + pub fn rebuild(&self, event_store: &EventStore) -> Result { + let mut snapshot = self.load()?; + + for event in event_store.replay(None)? { + snapshot.generated_from_event_id = Some(event.id.clone()); + apply_event(&mut snapshot, &event)?; + } + + Ok(snapshot) + } +} + +fn apply_event(snapshot: &mut TasksSnapshot, event: &TaskEvent) -> Result<()> { + match event.event_type { + TaskEventType::StateTransition => { + let to = event + .payload + .get("to") + .ok_or_else(|| Error::Validation("state_transition missing payload.to".into()))?; + let state: TaskState = serde_json::from_value(to.clone())?; + let task = snapshot + .get_task_mut(&event.task_id) + .ok_or_else(|| Error::Validation(format!("unknown task: {}", event.task_id)))?; + task.current_state = state; + task.updated_at = event.ts; + } + TaskEventType::LockAcquired => { + let files: Vec = serde_json::from_value( + event + .payload + .get("files") + .cloned() + .ok_or_else(|| Error::Validation("lock_acquired missing files".into()))?, + )?; + let expires_at: DateTime = + serde_json::from_value(event.payload.get("expires_at").cloned().ok_or_else( + || Error::Validation("lock_acquired missing expires_at".into()), + )?)?; + let lease_duration_sec = event + .payload + .get("lease_duration_sec") + .and_then(|v| v.as_u64()) + .unwrap_or(300); + + let task = snapshot + .get_task_mut(&event.task_id) + .ok_or_else(|| Error::Validation(format!("unknown task: {}", event.task_id)))?; + task.lock = Some(TaskLockSnapshot { + locked_by: format!("{}@{}", event.agent, event.node), + locked_at: event.ts, + lease_duration_sec, + last_heartbeat: event.ts, + affected_files: files.clone(), + }); + task.updated_at = event.ts; + + for file in files { + snapshot.file_locks.insert( + file, + FileLockEntry { + task_id: event.task_id.clone(), + agent: event.agent.clone(), + node: event.node.clone(), + expires_at, + }, + ); + } + } + TaskEventType::LockHeartbeat => { + let expires_at: DateTime = + serde_json::from_value(event.payload.get("expires_at").cloned().ok_or_else( + || Error::Validation("lock_heartbeat missing expires_at".into()), + )?)?; + let affected_files = { + let task = snapshot + .get_task_mut(&event.task_id) + .ok_or_else(|| Error::Validation(format!("unknown task: {}", event.task_id)))?; + let affected_files = if let Some(lock) = &mut task.lock { + lock.last_heartbeat = event.ts; + lock.affected_files.clone() + } else { + Vec::new() + }; + task.updated_at = event.ts; + affected_files + }; + + for file in affected_files { + if let Some(entry) = snapshot.file_locks.get_mut(&file) { + entry.expires_at = expires_at; + } + } + } + TaskEventType::LockReleased => { + if let Some(files_to_remove) = { + if let Some(task) = snapshot.get_task_mut(&event.task_id) { + let files = task.lock.take().map(|lock| lock.affected_files); + task.updated_at = event.ts; + files + } else { + None + } + } { + for file in files_to_remove { + snapshot.file_locks.remove(&file); + } + } + } + _ => {} + } + + snapshot.version = event.version; + Ok(()) +} + +fn ensure_parent_dir(path: &Path) -> Result<()> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + } + Ok(()) +} + +pub fn lease_expiry(last_heartbeat: DateTime, lease_duration_sec: u64) -> DateTime { + last_heartbeat + Duration::seconds(lease_duration_sec as i64) +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + fn sample_task(id: &str) -> ExecutionTask { + ExecutionTask::new(id, format!("Task {id}")) + } + + #[test] + fn event_store_appends_and_filters_by_task() { + let tmp = TempDir::new().unwrap(); + let store = EventStore::new(tmp.path().join("events.jsonl")); + + let event_a = TaskEvent { + id: "1".into(), + ts: Utc::now(), + event_type: TaskEventType::GatePassed, + task_id: "task-a".into(), + agent: "codex".into(), + node: "macbook".into(), + payload: serde_json::json!({"gate": "gate_0"}), + version: 1, + }; + let event_b = TaskEvent { + id: "2".into(), + task_id: "task-b".into(), + ..event_a.clone() + }; + + store.append(&event_a).unwrap(); + store.append(&event_b).unwrap(); + + let filtered = store.replay_for_task("task-a").unwrap(); + assert_eq!(filtered, vec![event_a]); + } + + #[test] + fn snapshot_store_enforces_cas() { + let tmp = TempDir::new().unwrap(); + let store = SnapshotStore::new( + tmp.path().join("tasks.snapshot.json"), + tmp.path().join(".tasks.lock"), + ); + + let mut snapshot = TasksSnapshot::default(); + snapshot.upsert_task(sample_task("task-a")); + store.save(&snapshot, 0).unwrap(); + + let err = store.save(&snapshot, 0).unwrap_err(); + assert!(matches!(err, Error::Validation(_))); + } + + #[test] + fn snapshot_rebuilds_from_lock_and_transition_events() { + let tmp = TempDir::new().unwrap(); + let event_store = EventStore::new(tmp.path().join("events.jsonl")); + let snapshot_store = SnapshotStore::new( + tmp.path().join("tasks.snapshot.json"), + tmp.path().join(".tasks.lock"), + ); + + let mut snapshot = TasksSnapshot::default(); + snapshot.upsert_task(sample_task("task-a")); + snapshot_store.save(&snapshot, 0).unwrap(); + + let base_ts = Utc::now(); + event_store + .append(&TaskEvent { + id: "1".into(), + ts: base_ts, + event_type: TaskEventType::StateTransition, + task_id: "task-a".into(), + agent: "codex".into(), + node: "macbook".into(), + payload: serde_json::json!({"to": "implementing"}), + version: 1, + }) + .unwrap(); + event_store + .append(&TaskEvent { + id: "2".into(), + ts: base_ts + Duration::seconds(1), + event_type: TaskEventType::LockAcquired, + task_id: "task-a".into(), + agent: "codex".into(), + node: "macbook".into(), + payload: serde_json::json!({ + "files": ["src/lib.rs"], + "expires_at": base_ts + Duration::seconds(301), + "lease_duration_sec": 300 + }), + version: 2, + }) + .unwrap(); + + let rebuilt = snapshot_store.rebuild(&event_store).unwrap(); + let task = rebuilt.get_task("task-a").unwrap(); + assert_eq!(task.current_state, TaskState::Implementing); + assert!(task.lock.is_some()); + assert!(rebuilt.file_locks.contains_key("src/lib.rs")); + } +}