feat: [日本語] DTP Phase A 基盤追加
This commit is contained in:
parent
1bfc0f9643
commit
986d907e5b
8 changed files with 1485 additions and 12 deletions
11
Cargo.lock
generated
11
Cargo.lock
generated
|
|
@ -616,6 +616,16 @@ dependencies = [
|
|||
"percent-encoding",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fs2"
|
||||
version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures"
|
||||
version = "0.3.31"
|
||||
|
|
@ -1359,6 +1369,7 @@ dependencies = [
|
|||
"async-trait",
|
||||
"chrono",
|
||||
"dirs",
|
||||
"fs2",
|
||||
"futures",
|
||||
"git2",
|
||||
"glob",
|
||||
|
|
|
|||
|
|
@ -63,3 +63,4 @@ uuid = { version = "1", features = ["v4", "serde"] }
|
|||
once_cell = "1"
|
||||
glob = "0.3"
|
||||
regex = "1"
|
||||
fs2 = "0.4"
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ futures = { workspace = true }
|
|||
async-trait = { workspace = true }
|
||||
glob = { workspace = true }
|
||||
regex = { workspace = true }
|
||||
fs2 = { workspace = true }
|
||||
git2 = "0.19"
|
||||
serde_yaml = "0.9"
|
||||
dirs = "5"
|
||||
|
|
|
|||
274
crates/miyabi-core/src/gate.rs
Normal file
274
crates/miyabi-core/src/gate.rs
Normal file
|
|
@ -0,0 +1,274 @@
|
|||
//! Deterministic gate evaluation for task execution.
|
||||
|
||||
use crate::lock::LockConflict;
|
||||
use crate::store::{
|
||||
CompletionMode, ExecutionTask, GitHubIssueState, GitHubPrState, ReviewDecision, TaskState,
|
||||
TasksSnapshot,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum Gate {
|
||||
Gate0,
|
||||
Gate1,
|
||||
Gate2,
|
||||
Gate3,
|
||||
Gate4,
|
||||
Gate5,
|
||||
Gate6,
|
||||
Gate7,
|
||||
Gate8,
|
||||
}
|
||||
|
||||
impl Gate {
|
||||
pub fn label(self) -> &'static str {
|
||||
match self {
|
||||
Gate::Gate0 => "gate_0",
|
||||
Gate::Gate1 => "gate_1",
|
||||
Gate::Gate2 => "gate_2",
|
||||
Gate::Gate3 => "gate_3",
|
||||
Gate::Gate4 => "gate_4",
|
||||
Gate::Gate5 => "gate_5",
|
||||
Gate::Gate6 => "gate_6",
|
||||
Gate::Gate7 => "gate_7",
|
||||
Gate::Gate8 => "gate_8",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct GateContext {
|
||||
pub lock_conflict: Option<LockConflict>,
|
||||
}
|
||||
|
||||
impl Default for GateContext {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
lock_conflict: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct GateReport {
|
||||
pub gate: Gate,
|
||||
pub success: bool,
|
||||
pub detail: String,
|
||||
pub duration: Duration,
|
||||
}
|
||||
|
||||
pub fn evaluate_gate(
|
||||
gate: Gate,
|
||||
task: &ExecutionTask,
|
||||
snapshot: &TasksSnapshot,
|
||||
context: &GateContext,
|
||||
) -> GateReport {
|
||||
let start = Instant::now();
|
||||
let (success, detail) = match gate {
|
||||
Gate::Gate0 => (true, "task declared".to_string()),
|
||||
Gate::Gate1 => (
|
||||
matches!(task.current_state, TaskState::Draft | TaskState::Pending),
|
||||
format!("state is {:?}", task.current_state),
|
||||
),
|
||||
Gate::Gate2 => {
|
||||
let blocked_by: Vec<String> = task
|
||||
.dependencies
|
||||
.iter()
|
||||
.filter_map(|dependency| {
|
||||
snapshot
|
||||
.get_task(dependency)
|
||||
.filter(|dep| {
|
||||
!matches!(dep.current_state, TaskState::Done | TaskState::Merged)
|
||||
})
|
||||
.map(|_| dependency.clone())
|
||||
})
|
||||
.collect();
|
||||
(
|
||||
blocked_by.is_empty(),
|
||||
if blocked_by.is_empty() {
|
||||
"all hard dependencies resolved".to_string()
|
||||
} else {
|
||||
format!("blocked by dependencies: {}", blocked_by.join(", "))
|
||||
},
|
||||
)
|
||||
}
|
||||
Gate::Gate3 => {
|
||||
let has_impact = task.impact.is_some();
|
||||
let approval_ok = task.human_approval.as_ref().map_or(true, |approval| {
|
||||
!approval.required || approval.approved_by.is_some()
|
||||
});
|
||||
(
|
||||
has_impact && approval_ok,
|
||||
if !has_impact {
|
||||
"missing impact analysis".to_string()
|
||||
} else if !approval_ok {
|
||||
"human approval required".to_string()
|
||||
} else {
|
||||
"impact and approval satisfied".to_string()
|
||||
},
|
||||
)
|
||||
}
|
||||
Gate::Gate4 => {
|
||||
let conflict = context
|
||||
.lock_conflict
|
||||
.as_ref()
|
||||
.filter(|conflict| conflict.conflicting);
|
||||
(
|
||||
conflict.is_none(),
|
||||
conflict
|
||||
.map(|conflict| {
|
||||
format!(
|
||||
"lock conflict held by {}",
|
||||
conflict.held_by.as_deref().unwrap_or("unknown")
|
||||
)
|
||||
})
|
||||
.unwrap_or_else(|| "lock window available".to_string()),
|
||||
)
|
||||
}
|
||||
Gate::Gate5 => (
|
||||
task.branch_name.is_some(),
|
||||
task.branch_name
|
||||
.clone()
|
||||
.unwrap_or_else(|| "missing branch_name".to_string()),
|
||||
),
|
||||
Gate::Gate6 => {
|
||||
let ok = task.github_evidence.as_ref().map_or(false, |evidence| {
|
||||
evidence.pr_number > 0
|
||||
&& !evidence.pr_head_ref.is_empty()
|
||||
&& matches!(
|
||||
evidence.review_decision,
|
||||
Some(ReviewDecision::Approved | ReviewDecision::ReviewRequired)
|
||||
)
|
||||
});
|
||||
(
|
||||
ok,
|
||||
if ok {
|
||||
"pull request evidence present".to_string()
|
||||
} else {
|
||||
"missing verified pull request evidence".to_string()
|
||||
},
|
||||
)
|
||||
}
|
||||
Gate::Gate7 => {
|
||||
let ok = task.github_evidence.as_ref().map_or(false, |evidence| {
|
||||
evidence.pr_state == GitHubPrState::Merged && evidence.merge_commit_sha.is_some()
|
||||
});
|
||||
(
|
||||
ok,
|
||||
if ok {
|
||||
"merge verified".to_string()
|
||||
} else {
|
||||
"merge not verified".to_string()
|
||||
},
|
||||
)
|
||||
}
|
||||
Gate::Gate8 => {
|
||||
let ok = match task.completion_mode {
|
||||
CompletionMode::GithubPr => {
|
||||
task.github_evidence.as_ref().map_or(false, |evidence| {
|
||||
evidence.issue_state == GitHubIssueState::Closed
|
||||
})
|
||||
}
|
||||
CompletionMode::Manual | CompletionMode::ExternalOp => true,
|
||||
};
|
||||
(
|
||||
ok,
|
||||
if ok {
|
||||
"completion evidence satisfied".to_string()
|
||||
} else {
|
||||
"issue still open".to_string()
|
||||
},
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
GateReport {
|
||||
gate,
|
||||
success,
|
||||
detail,
|
||||
duration: start.elapsed(),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::store::{
|
||||
CompletionMode, ExecutionTask, GitHubEvidence, ImpactRiskLevel, ReviewDecision, TaskImpact,
|
||||
TaskState, TasksSnapshot,
|
||||
};
|
||||
use chrono::Utc;
|
||||
|
||||
fn task(id: &str) -> ExecutionTask {
|
||||
let mut task = ExecutionTask::new(id, "gate test");
|
||||
task.current_state = TaskState::Pending;
|
||||
task
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn gate_2_blocks_on_incomplete_dependency() {
|
||||
let mut pending = task("child");
|
||||
pending.dependencies.push("parent".into());
|
||||
|
||||
let mut snapshot = TasksSnapshot::default();
|
||||
snapshot.upsert_task(pending.clone());
|
||||
let mut parent = task("parent");
|
||||
parent.current_state = TaskState::Implementing;
|
||||
snapshot.upsert_task(parent);
|
||||
|
||||
let report = evaluate_gate(Gate::Gate2, &pending, &snapshot, &GateContext::default());
|
||||
assert!(!report.success);
|
||||
assert!(report.detail.contains("parent"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn gate_3_requires_impact_and_human_approval_when_flagged() {
|
||||
let mut snapshot = TasksSnapshot::default();
|
||||
let mut gated = task("phase-a");
|
||||
gated.impact = Some(TaskImpact {
|
||||
risk_level: ImpactRiskLevel::Low,
|
||||
affected_symbols: 1,
|
||||
depth1: vec!["create_orchestrator".into()],
|
||||
analyzed_at: Utc::now(),
|
||||
analyzed_commit: None,
|
||||
input_hash: None,
|
||||
});
|
||||
gated.human_approval = Some(crate::store::HumanApproval {
|
||||
required: true,
|
||||
approved_by: None,
|
||||
approved_at: None,
|
||||
reason: Some("touches orchestration".into()),
|
||||
});
|
||||
snapshot.upsert_task(gated.clone());
|
||||
|
||||
let report = evaluate_gate(Gate::Gate3, &gated, &snapshot, &GateContext::default());
|
||||
assert!(!report.success);
|
||||
assert_eq!(report.detail, "human approval required");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn merge_and_close_gates_pass_with_verified_evidence() {
|
||||
let mut task = task("phase-a");
|
||||
task.current_state = TaskState::Reviewing;
|
||||
task.completion_mode = CompletionMode::GithubPr;
|
||||
task.branch_name = Some("feature/phase-a".into());
|
||||
task.github_evidence = Some(GitHubEvidence {
|
||||
pr_number: 12,
|
||||
pr_head_ref: "feature/phase-a".into(),
|
||||
pr_state: GitHubPrState::Merged,
|
||||
merge_commit_sha: Some("0123456789abcdef0123456789abcdef01234567".into()),
|
||||
merged_at: Some(Utc::now()),
|
||||
review_decision: Some(ReviewDecision::Approved),
|
||||
issue_state: GitHubIssueState::Closed,
|
||||
issue_closed_by_pr: true,
|
||||
});
|
||||
let mut snapshot = TasksSnapshot::default();
|
||||
snapshot.upsert_task(task.clone());
|
||||
|
||||
assert!(evaluate_gate(Gate::Gate6, &task, &snapshot, &GateContext::default()).success);
|
||||
assert!(evaluate_gate(Gate::Gate7, &task, &snapshot, &GateContext::default()).success);
|
||||
assert!(evaluate_gate(Gate::Gate8, &task, &snapshot, &GateContext::default()).success);
|
||||
}
|
||||
}
|
||||
|
|
@ -11,18 +11,22 @@ pub mod dag;
|
|||
pub mod error;
|
||||
pub mod error_policy;
|
||||
pub mod feature_flags;
|
||||
pub mod gate;
|
||||
pub mod git;
|
||||
pub mod github;
|
||||
pub mod github_tools;
|
||||
pub mod hooks;
|
||||
pub mod lock;
|
||||
pub mod logger;
|
||||
pub mod mcp;
|
||||
pub mod orchestration;
|
||||
pub mod plugin;
|
||||
pub mod protocol;
|
||||
pub mod retry;
|
||||
pub mod streaming;
|
||||
pub mod rules;
|
||||
pub mod session;
|
||||
pub mod store;
|
||||
pub mod streaming;
|
||||
pub mod token;
|
||||
pub mod tool;
|
||||
pub mod tools;
|
||||
|
|
@ -47,14 +51,14 @@ pub use anthropic::{
|
|||
Tool as ApiTool, // Anthropic API tool definition format
|
||||
Usage,
|
||||
};
|
||||
pub use config::{ApiConfig, Config, SessionConfig, ToolConfig, UiConfig};
|
||||
pub use conversation::{
|
||||
Conversation, ConversationError, ConversationManager, ConversationMessage, ConversationMetadata,
|
||||
};
|
||||
pub use cache::{
|
||||
create_api_cache, create_llm_cache, ApiCache, ApiCacheKey, CacheEntry, CacheStats, LLMCache,
|
||||
LLMCacheKey, TTLCache,
|
||||
};
|
||||
pub use config::{ApiConfig, Config, SessionConfig, ToolConfig, UiConfig};
|
||||
pub use conversation::{
|
||||
Conversation, ConversationError, ConversationManager, ConversationMessage, ConversationMetadata,
|
||||
};
|
||||
pub use error::Error;
|
||||
pub use error_policy::{CircuitBreaker, CircuitState, FallbackStrategy};
|
||||
pub use feature_flags::{FeatureFlag, FeatureFlagManager};
|
||||
|
|
@ -63,8 +67,8 @@ pub use git::{
|
|||
has_uncommitted_changes, is_in_git_repo, is_valid_repository,
|
||||
};
|
||||
pub use github::{
|
||||
Comment, CreateIssueRequest, CreatePullRequestRequest, GitHubClient, Issue, Label,
|
||||
PullRequest, User,
|
||||
Comment, CreateIssueRequest, CreatePullRequestRequest, GitHubClient, Issue, Label, PullRequest,
|
||||
User,
|
||||
};
|
||||
pub use github_tools::{
|
||||
create_github_tool_registry, AddCommentTool, AddLabelsTool, CreateIssueTool,
|
||||
|
|
@ -72,6 +76,9 @@ pub use github_tools::{
|
|||
};
|
||||
pub use hooks::{Hook, HookAction, HookContext, HookEvent, HookManager, HookResult, HooksConfig};
|
||||
pub use logger::{init_logger, init_logger_with_config, LogFormat, LogLevel, LoggerConfig};
|
||||
pub use mcp::{
|
||||
McpConfig, McpError, McpManager, McpRequest, McpResponse, McpServer, McpServerConfig, McpTool,
|
||||
};
|
||||
pub use plugin::{Plugin, PluginContext, PluginManager, PluginMetadata, PluginResult, PluginState};
|
||||
pub use retry::{retry_with_backoff, RetryConfig as BackoffRetryConfig};
|
||||
pub use rules::{AgentPreferences, MiyabiRules, Rule, RulesError, RulesLoader};
|
||||
|
|
@ -87,17 +94,14 @@ pub use workflow::{
|
|||
FailurePolicy, StepCondition, StepResult, StepStatus, Workflow, WorkflowContext,
|
||||
WorkflowManager, WorkflowResult, WorkflowStatus, WorkflowStep,
|
||||
};
|
||||
pub use mcp::{
|
||||
McpConfig, McpError, McpManager, McpRequest, McpResponse, McpServer, McpServerConfig, McpTool,
|
||||
};
|
||||
pub mod openclaw;
|
||||
pub use openclaw::{AgentInfo, OpenClawClient, OpenClawError, OpenClawResult};
|
||||
pub use dag::{
|
||||
DAGError, Task as DAGTask, TaskGraph, TaskGraphBuilder, TaskId, TaskLevel, TaskNode,
|
||||
};
|
||||
pub use openclaw::{AgentInfo, OpenClawClient, OpenClawError, OpenClawResult};
|
||||
pub use orchestration::{
|
||||
create_orchestrator, Orchestrator, OrchestratorTask, ParallelConfig, ParallelResult, TaskResult,
|
||||
};
|
||||
pub use streaming::{
|
||||
create_streaming_agent, AgentStreamEvent, StreamingAgent, StreamingConfig, StreamProcessor,
|
||||
create_streaming_agent, AgentStreamEvent, StreamProcessor, StreamingAgent, StreamingConfig,
|
||||
};
|
||||
|
|
|
|||
422
crates/miyabi-core/src/lock.rs
Normal file
422
crates/miyabi-core/src/lock.rs
Normal file
|
|
@ -0,0 +1,422 @@
|
|||
//! File lock manager for deterministic task execution.
|
||||
|
||||
use crate::error::{Error, Result};
|
||||
use crate::store::{
|
||||
lease_expiry, EventStore, FileLockEntry, SnapshotStore, TaskEvent, TaskEventType,
|
||||
};
|
||||
use chrono::{DateTime, Duration, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct LeaseConfig {
|
||||
pub lease_duration_sec: u64,
|
||||
pub heartbeat_interval_sec: u64,
|
||||
pub stale_after_missed_heartbeats: u64,
|
||||
}
|
||||
|
||||
impl Default for LeaseConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
lease_duration_sec: 300,
|
||||
heartbeat_interval_sec: 60,
|
||||
stale_after_missed_heartbeats: 2,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct LockConflict {
|
||||
pub conflicting: bool,
|
||||
pub held_by: Option<String>,
|
||||
pub task_id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct LeaseSweep {
|
||||
pub released: Vec<String>,
|
||||
pub active: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FileLockManager {
|
||||
event_store: EventStore,
|
||||
snapshot_store: SnapshotStore,
|
||||
config: LeaseConfig,
|
||||
}
|
||||
|
||||
impl FileLockManager {
|
||||
pub fn new(
|
||||
event_store: EventStore,
|
||||
snapshot_store: SnapshotStore,
|
||||
config: LeaseConfig,
|
||||
) -> Self {
|
||||
Self {
|
||||
event_store,
|
||||
snapshot_store,
|
||||
config,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn acquire_lock(
|
||||
&self,
|
||||
task_id: &str,
|
||||
agent: &str,
|
||||
node: &str,
|
||||
files: &[String],
|
||||
) -> Result<()> {
|
||||
let mut snapshot = self.snapshot_store.load()?;
|
||||
self.purge_stale_from_snapshot(&mut snapshot, Utc::now());
|
||||
|
||||
if snapshot.get_task(task_id).is_none() {
|
||||
return Err(Error::Validation(format!("unknown task: {task_id}")));
|
||||
}
|
||||
|
||||
let conflict = find_conflict(&snapshot.file_locks, task_id, files);
|
||||
if let Some(entry) = conflict {
|
||||
return Err(Error::Validation(format!(
|
||||
"lock conflict: {} held by {}@{}",
|
||||
entry.task_id, entry.agent, entry.node
|
||||
)));
|
||||
}
|
||||
|
||||
let now = Utc::now();
|
||||
let expires_at = self.compute_expires_at(now);
|
||||
{
|
||||
let task = snapshot
|
||||
.get_task_mut(task_id)
|
||||
.ok_or_else(|| Error::Validation(format!("unknown task: {task_id}")))?;
|
||||
task.lock = Some(crate::store::TaskLockSnapshot {
|
||||
locked_by: format!("{agent}@{node}"),
|
||||
locked_at: now,
|
||||
lease_duration_sec: self.config.lease_duration_sec,
|
||||
last_heartbeat: now,
|
||||
affected_files: files.to_vec(),
|
||||
});
|
||||
task.updated_at = now;
|
||||
}
|
||||
|
||||
for file in files {
|
||||
snapshot.file_locks.insert(
|
||||
file.clone(),
|
||||
FileLockEntry {
|
||||
task_id: task_id.to_string(),
|
||||
agent: agent.to_string(),
|
||||
node: node.to_string(),
|
||||
expires_at,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
let expected_version = snapshot.version;
|
||||
self.snapshot_store.save(&snapshot, expected_version)?;
|
||||
self.event_store.append(&TaskEvent {
|
||||
id: format!("{task_id}-lock-acquired-{}", now.timestamp_millis()),
|
||||
ts: now,
|
||||
event_type: TaskEventType::LockAcquired,
|
||||
task_id: task_id.to_string(),
|
||||
agent: agent.to_string(),
|
||||
node: node.to_string(),
|
||||
payload: serde_json::json!({
|
||||
"files": files,
|
||||
"expires_at": expires_at,
|
||||
"lease_duration_sec": self.config.lease_duration_sec
|
||||
}),
|
||||
version: expected_version + 1,
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn renew_lease(&self, task_id: &str, agent: &str, node: &str) -> Result<()> {
|
||||
let mut snapshot = self.snapshot_store.load()?;
|
||||
let now = Utc::now();
|
||||
let expires_at = self.compute_expires_at(now);
|
||||
let expected_version = snapshot.version;
|
||||
let owner = format!("{agent}@{node}");
|
||||
let files = {
|
||||
let task = snapshot
|
||||
.get_task_mut(task_id)
|
||||
.ok_or_else(|| Error::Validation(format!("unknown task: {task_id}")))?;
|
||||
let lock = task
|
||||
.lock
|
||||
.as_mut()
|
||||
.ok_or_else(|| Error::Validation(format!("task {task_id} has no lock")))?;
|
||||
|
||||
if lock.locked_by != owner {
|
||||
return Err(Error::PermissionDenied(format!(
|
||||
"lock owned by {}, not {owner}",
|
||||
lock.locked_by
|
||||
)));
|
||||
}
|
||||
|
||||
lock.last_heartbeat = now;
|
||||
task.updated_at = now;
|
||||
lock.affected_files.clone()
|
||||
};
|
||||
|
||||
for file in &files {
|
||||
if let Some(entry) = snapshot.file_locks.get_mut(file) {
|
||||
entry.expires_at = expires_at;
|
||||
}
|
||||
}
|
||||
|
||||
self.snapshot_store.save(&snapshot, expected_version)?;
|
||||
self.event_store.append(&TaskEvent {
|
||||
id: format!("{task_id}-lock-heartbeat-{}", now.timestamp_millis()),
|
||||
ts: now,
|
||||
event_type: TaskEventType::LockHeartbeat,
|
||||
task_id: task_id.to_string(),
|
||||
agent: agent.to_string(),
|
||||
node: node.to_string(),
|
||||
payload: serde_json::json!({ "expires_at": expires_at }),
|
||||
version: expected_version + 1,
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn release_lock(&self, task_id: &str) -> Result<()> {
|
||||
let mut snapshot = self.snapshot_store.load()?;
|
||||
let now = Utc::now();
|
||||
let expected_version = snapshot.version;
|
||||
let Some(files) = ({
|
||||
let task = snapshot
|
||||
.get_task_mut(task_id)
|
||||
.ok_or_else(|| Error::Validation(format!("unknown task: {task_id}")))?;
|
||||
|
||||
let files = task.lock.take().map(|lock| lock.affected_files);
|
||||
task.updated_at = now;
|
||||
files
|
||||
}) else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
for file in files {
|
||||
snapshot.file_locks.remove(&file);
|
||||
}
|
||||
|
||||
self.snapshot_store.save(&snapshot, expected_version)?;
|
||||
self.event_store.append(&TaskEvent {
|
||||
id: format!("{task_id}-lock-released-{}", now.timestamp_millis()),
|
||||
ts: now,
|
||||
event_type: TaskEventType::LockReleased,
|
||||
task_id: task_id.to_string(),
|
||||
agent: "system".into(),
|
||||
node: "system".into(),
|
||||
payload: serde_json::json!({}),
|
||||
version: expected_version + 1,
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn release_expired_leases(&self) -> Result<LeaseSweep> {
|
||||
let mut snapshot = self.snapshot_store.load()?;
|
||||
let now = Utc::now();
|
||||
let mut released = Vec::new();
|
||||
let mut active = Vec::new();
|
||||
|
||||
for task in snapshot.tasks.iter_mut() {
|
||||
let is_expired = task
|
||||
.lock
|
||||
.as_ref()
|
||||
.map(|lock| self.is_expired(lock.last_heartbeat, now))
|
||||
.unwrap_or(false);
|
||||
|
||||
if is_expired {
|
||||
if let Some(lock) = task.lock.take() {
|
||||
for file in lock.affected_files {
|
||||
snapshot.file_locks.remove(&file);
|
||||
}
|
||||
released.push(task.id.clone());
|
||||
}
|
||||
task.updated_at = now;
|
||||
} else if task.lock.is_some() {
|
||||
active.push(task.id.clone());
|
||||
}
|
||||
}
|
||||
|
||||
if !released.is_empty() {
|
||||
let expected_version = snapshot.version;
|
||||
self.snapshot_store.save(&snapshot, expected_version)?;
|
||||
for task_id in &released {
|
||||
self.event_store.append(&TaskEvent {
|
||||
id: format!("{task_id}-lock-released-expired-{}", now.timestamp_millis()),
|
||||
ts: now,
|
||||
event_type: TaskEventType::LockReleased,
|
||||
task_id: task_id.clone(),
|
||||
agent: "system".into(),
|
||||
node: "system".into(),
|
||||
payload: serde_json::json!({ "forced": true, "reason": "lease_expired" }),
|
||||
version: expected_version + 1,
|
||||
})?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(LeaseSweep { released, active })
|
||||
}
|
||||
|
||||
pub fn has_conflict(&self, files: &[String]) -> Result<LockConflict> {
|
||||
let mut snapshot = self.snapshot_store.load()?;
|
||||
self.purge_stale_from_snapshot(&mut snapshot, Utc::now());
|
||||
|
||||
if let Some(entry) = find_conflict(&snapshot.file_locks, "", files) {
|
||||
return Ok(LockConflict {
|
||||
conflicting: true,
|
||||
held_by: Some(format!("{}@{}", entry.agent, entry.node)),
|
||||
task_id: Some(entry.task_id.clone()),
|
||||
});
|
||||
}
|
||||
|
||||
Ok(LockConflict {
|
||||
conflicting: false,
|
||||
held_by: None,
|
||||
task_id: None,
|
||||
})
|
||||
}
|
||||
|
||||
fn purge_stale_from_snapshot(
|
||||
&self,
|
||||
snapshot: &mut crate::store::TasksSnapshot,
|
||||
now: DateTime<Utc>,
|
||||
) {
|
||||
let stale_task_ids: Vec<String> = snapshot
|
||||
.tasks
|
||||
.iter()
|
||||
.filter_map(|task| {
|
||||
task.lock.as_ref().and_then(|lock| {
|
||||
if self.is_expired(lock.last_heartbeat, now) {
|
||||
Some(task.id.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
for task_id in stale_task_ids {
|
||||
if let Some(task) = snapshot.get_task_mut(&task_id) {
|
||||
if let Some(lock) = task.lock.take() {
|
||||
for file in lock.affected_files {
|
||||
snapshot.file_locks.remove(&file);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn is_expired(&self, last_heartbeat: DateTime<Utc>, now: DateTime<Utc>) -> bool {
|
||||
let max_age = self.config.lease_duration_sec as i64
|
||||
+ (self.config.heartbeat_interval_sec * self.config.stale_after_missed_heartbeats)
|
||||
as i64;
|
||||
now > last_heartbeat + Duration::seconds(max_age)
|
||||
}
|
||||
|
||||
fn compute_expires_at(&self, now: DateTime<Utc>) -> DateTime<Utc> {
|
||||
lease_expiry(
|
||||
now + Duration::seconds(
|
||||
(self.config.heartbeat_interval_sec * self.config.stale_after_missed_heartbeats)
|
||||
as i64,
|
||||
),
|
||||
self.config.lease_duration_sec,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn find_conflict<'a>(
|
||||
file_locks: &'a std::collections::HashMap<String, FileLockEntry>,
|
||||
task_id: &str,
|
||||
files: &[String],
|
||||
) -> Option<&'a FileLockEntry> {
|
||||
files.iter().find_map(|file| {
|
||||
file_locks.get(file).and_then(|entry| {
|
||||
if task_id.is_empty() || entry.task_id != task_id {
|
||||
Some(entry)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::store::{CompletionMode, ExecutionTask};
|
||||
use tempfile::TempDir;
|
||||
|
||||
fn fixture() -> (TempDir, EventStore, SnapshotStore, FileLockManager) {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let event_store = EventStore::new(tmp.path().join("events.jsonl"));
|
||||
let snapshot_store = SnapshotStore::new(
|
||||
tmp.path().join("tasks.snapshot.json"),
|
||||
tmp.path().join(".tasks.lock"),
|
||||
);
|
||||
let manager = FileLockManager::new(
|
||||
event_store.clone(),
|
||||
snapshot_store.clone(),
|
||||
LeaseConfig {
|
||||
lease_duration_sec: 1,
|
||||
heartbeat_interval_sec: 1,
|
||||
stale_after_missed_heartbeats: 0,
|
||||
},
|
||||
);
|
||||
(tmp, event_store, snapshot_store, manager)
|
||||
}
|
||||
|
||||
fn seed_task(snapshot_store: &SnapshotStore, id: &str) {
|
||||
let mut snapshot = snapshot_store.load().unwrap();
|
||||
let mut task = ExecutionTask::new(id, "Phase A");
|
||||
task.completion_mode = CompletionMode::GithubPr;
|
||||
snapshot.upsert_task(task);
|
||||
let version = snapshot.version;
|
||||
snapshot_store.save(&snapshot, version).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acquires_detects_conflict_and_releases() {
|
||||
let (_tmp, _events, snapshot_store, manager) = fixture();
|
||||
seed_task(&snapshot_store, "task-a");
|
||||
seed_task(&snapshot_store, "task-b");
|
||||
|
||||
manager
|
||||
.acquire_lock("task-a", "codex", "macbook", &[String::from("src/lib.rs")])
|
||||
.unwrap();
|
||||
let conflict = manager.has_conflict(&[String::from("src/lib.rs")]).unwrap();
|
||||
assert!(conflict.conflicting);
|
||||
|
||||
let err = manager
|
||||
.acquire_lock("task-b", "codex", "macbook", &[String::from("src/lib.rs")])
|
||||
.unwrap_err();
|
||||
assert!(matches!(err, Error::Validation(_)));
|
||||
|
||||
manager.release_lock("task-a").unwrap();
|
||||
let conflict = manager.has_conflict(&[String::from("src/lib.rs")]).unwrap();
|
||||
assert!(!conflict.conflicting);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn renews_and_expires_leases() {
|
||||
let (_tmp, _events, snapshot_store, manager) = fixture();
|
||||
seed_task(&snapshot_store, "task-a");
|
||||
|
||||
manager
|
||||
.acquire_lock("task-a", "codex", "macbook", &[String::from("src/lib.rs")])
|
||||
.unwrap();
|
||||
manager.renew_lease("task-a", "codex", "macbook").unwrap();
|
||||
|
||||
{
|
||||
let mut snapshot = snapshot_store.load().unwrap();
|
||||
let task = snapshot.get_task_mut("task-a").unwrap();
|
||||
let lock = task.lock.as_mut().unwrap();
|
||||
lock.last_heartbeat = Utc::now() - Duration::seconds(5);
|
||||
let version = snapshot.version;
|
||||
snapshot_store.save(&snapshot, version).unwrap();
|
||||
}
|
||||
|
||||
let sweep = manager.release_expired_leases().unwrap();
|
||||
assert_eq!(sweep.released, vec!["task-a".to_string()]);
|
||||
let snapshot = snapshot_store.load().unwrap();
|
||||
assert!(snapshot.get_task("task-a").unwrap().lock.is_none());
|
||||
}
|
||||
}
|
||||
176
crates/miyabi-core/src/protocol.rs
Normal file
176
crates/miyabi-core/src/protocol.rs
Normal file
|
|
@ -0,0 +1,176 @@
|
|||
//! Deterministic execution protocol entry point.
|
||||
|
||||
use crate::error::{Error, Result};
|
||||
use crate::gate::{evaluate_gate, Gate, GateContext, GateReport};
|
||||
use crate::lock::FileLockManager;
|
||||
use crate::store::{EventStore, SnapshotStore, TaskEvent, TaskEventType};
|
||||
use chrono::Utc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DeterministicExecutionProtocol {
|
||||
event_store: EventStore,
|
||||
snapshot_store: SnapshotStore,
|
||||
lock_manager: FileLockManager,
|
||||
}
|
||||
|
||||
impl DeterministicExecutionProtocol {
|
||||
pub fn new(
|
||||
event_store: EventStore,
|
||||
snapshot_store: SnapshotStore,
|
||||
lock_manager: FileLockManager,
|
||||
) -> Self {
|
||||
Self {
|
||||
event_store,
|
||||
snapshot_store,
|
||||
lock_manager,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run(
|
||||
&self,
|
||||
task_id: &str,
|
||||
gates: &[Gate],
|
||||
actor: &str,
|
||||
node: &str,
|
||||
) -> Result<ProtocolReport> {
|
||||
let start = Instant::now();
|
||||
let mut steps = Vec::new();
|
||||
let mut success = true;
|
||||
|
||||
for gate in gates {
|
||||
let snapshot = self.snapshot_store.load()?;
|
||||
let task = snapshot
|
||||
.get_task(task_id)
|
||||
.ok_or_else(|| Error::Validation(format!("unknown task: {task_id}")))?;
|
||||
|
||||
let context = if matches!(gate, Gate::Gate4) {
|
||||
let files = task
|
||||
.lock
|
||||
.as_ref()
|
||||
.map(|lock| lock.affected_files.clone())
|
||||
.unwrap_or_default();
|
||||
GateContext {
|
||||
lock_conflict: Some(self.lock_manager.has_conflict(&files)?),
|
||||
}
|
||||
} else {
|
||||
GateContext::default()
|
||||
};
|
||||
|
||||
let report = evaluate_gate(*gate, task, &snapshot, &context);
|
||||
self.record_gate(task_id, *gate, &report, actor, node, snapshot.version + 1)?;
|
||||
success &= report.success;
|
||||
steps.push(report);
|
||||
|
||||
if !success {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(ProtocolReport {
|
||||
task_id: task_id.to_string(),
|
||||
steps,
|
||||
total_duration: start.elapsed(),
|
||||
success,
|
||||
})
|
||||
}
|
||||
|
||||
fn record_gate(
|
||||
&self,
|
||||
task_id: &str,
|
||||
gate: Gate,
|
||||
report: &GateReport,
|
||||
actor: &str,
|
||||
node: &str,
|
||||
version: u64,
|
||||
) -> Result<()> {
|
||||
let event_type = if report.success {
|
||||
TaskEventType::GatePassed
|
||||
} else {
|
||||
TaskEventType::GateRejected
|
||||
};
|
||||
self.event_store.append(&TaskEvent {
|
||||
id: format!(
|
||||
"{task_id}-{}-{}",
|
||||
gate.label(),
|
||||
Utc::now().timestamp_millis()
|
||||
),
|
||||
ts: Utc::now(),
|
||||
event_type,
|
||||
task_id: task_id.to_string(),
|
||||
agent: actor.to_string(),
|
||||
node: node.to_string(),
|
||||
payload: serde_json::json!({
|
||||
"gate": gate.label(),
|
||||
"detail": report.detail,
|
||||
"duration_ms": report.duration.as_millis(),
|
||||
}),
|
||||
version,
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct ProtocolReport {
|
||||
pub task_id: String,
|
||||
pub steps: Vec<GateReport>,
|
||||
pub total_duration: Duration,
|
||||
pub success: bool,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::lock::LeaseConfig;
|
||||
use crate::store::{ExecutionTask, SnapshotStore, TasksSnapshot};
|
||||
use tempfile::TempDir;
|
||||
|
||||
#[test]
|
||||
fn protocol_stops_at_first_failed_gate_and_records_events() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let event_store = EventStore::new(tmp.path().join("events.jsonl"));
|
||||
let snapshot_store = SnapshotStore::new(
|
||||
tmp.path().join("tasks.snapshot.json"),
|
||||
tmp.path().join(".tasks.lock"),
|
||||
);
|
||||
let lock_manager = FileLockManager::new(
|
||||
event_store.clone(),
|
||||
snapshot_store.clone(),
|
||||
LeaseConfig::default(),
|
||||
);
|
||||
let protocol = DeterministicExecutionProtocol::new(
|
||||
event_store.clone(),
|
||||
snapshot_store.clone(),
|
||||
lock_manager,
|
||||
);
|
||||
|
||||
let mut snapshot = TasksSnapshot::default();
|
||||
let mut task = ExecutionTask::new("phase-a", "Phase A");
|
||||
task.current_state = crate::store::TaskState::Pending;
|
||||
task.dependencies.push("phase-0".into());
|
||||
snapshot.upsert_task(task);
|
||||
snapshot.upsert_task(ExecutionTask::new("phase-0", "Phase 0"));
|
||||
snapshot_store.save(&snapshot, 0).unwrap();
|
||||
|
||||
let report = protocol
|
||||
.run(
|
||||
"phase-a",
|
||||
&[Gate::Gate1, Gate::Gate2, Gate::Gate3],
|
||||
"codex",
|
||||
"macbook",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert!(!report.success);
|
||||
assert_eq!(report.steps.len(), 2);
|
||||
assert!(report.steps[0].success);
|
||||
assert!(!report.steps[1].success);
|
||||
|
||||
let events = event_store.replay_for_task("phase-a").unwrap();
|
||||
assert_eq!(events.len(), 2);
|
||||
assert_eq!(events[0].event_type, TaskEventType::GatePassed);
|
||||
assert_eq!(events[1].event_type, TaskEventType::GateRejected);
|
||||
}
|
||||
}
|
||||
584
crates/miyabi-core/src/store.rs
Normal file
584
crates/miyabi-core/src/store.rs
Normal file
|
|
@ -0,0 +1,584 @@
|
|||
//! Deterministic task protocol storage primitives.
|
||||
|
||||
use crate::error::{Error, Result};
|
||||
use chrono::{DateTime, Duration, Utc};
|
||||
use fs2::FileExt;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{BufRead, BufReader, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum TaskState {
|
||||
Draft,
|
||||
Pending,
|
||||
Analyzing,
|
||||
Implementing,
|
||||
Reviewing,
|
||||
Merged,
|
||||
Deploying,
|
||||
Done,
|
||||
Blocked,
|
||||
Failed,
|
||||
Cancelled,
|
||||
AwaitingGithubSync,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum CompletionMode {
|
||||
GithubPr,
|
||||
Manual,
|
||||
ExternalOp,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
|
||||
pub enum ImpactRiskLevel {
|
||||
Low,
|
||||
Medium,
|
||||
High,
|
||||
Critical,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct TaskImpact {
|
||||
pub risk_level: ImpactRiskLevel,
|
||||
pub affected_symbols: usize,
|
||||
pub depth1: Vec<String>,
|
||||
pub analyzed_at: DateTime<Utc>,
|
||||
pub analyzed_commit: Option<String>,
|
||||
pub input_hash: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct GitHubEvidence {
|
||||
pub pr_number: u64,
|
||||
pub pr_head_ref: String,
|
||||
pub pr_state: GitHubPrState,
|
||||
pub merge_commit_sha: Option<String>,
|
||||
pub merged_at: Option<DateTime<Utc>>,
|
||||
pub review_decision: Option<ReviewDecision>,
|
||||
pub issue_state: GitHubIssueState,
|
||||
pub issue_closed_by_pr: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
|
||||
pub enum GitHubPrState {
|
||||
Open,
|
||||
Merged,
|
||||
Closed,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
|
||||
pub enum ReviewDecision {
|
||||
Approved,
|
||||
ChangesRequested,
|
||||
ReviewRequired,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
|
||||
pub enum GitHubIssueState {
|
||||
Open,
|
||||
Closed,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct HumanApproval {
|
||||
pub required: bool,
|
||||
pub approved_by: Option<String>,
|
||||
pub approved_at: Option<DateTime<Utc>>,
|
||||
pub reason: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct TaskLockSnapshot {
|
||||
pub locked_by: String,
|
||||
pub locked_at: DateTime<Utc>,
|
||||
pub lease_duration_sec: u64,
|
||||
pub last_heartbeat: DateTime<Utc>,
|
||||
pub affected_files: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct ExecutionTask {
|
||||
pub id: String,
|
||||
pub title: String,
|
||||
pub current_state: TaskState,
|
||||
pub dependencies: Vec<String>,
|
||||
pub dependents: Vec<String>,
|
||||
pub soft_dependencies: Vec<String>,
|
||||
pub lock: Option<TaskLockSnapshot>,
|
||||
pub impact: Option<TaskImpact>,
|
||||
pub branch_name: Option<String>,
|
||||
pub github_evidence: Option<GitHubEvidence>,
|
||||
pub completion_mode: CompletionMode,
|
||||
pub human_approval: Option<HumanApproval>,
|
||||
pub priority: u32,
|
||||
pub created_at: DateTime<Utc>,
|
||||
pub updated_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl ExecutionTask {
|
||||
pub fn new(id: impl Into<String>, title: impl Into<String>) -> Self {
|
||||
let now = Utc::now();
|
||||
Self {
|
||||
id: id.into(),
|
||||
title: title.into(),
|
||||
current_state: TaskState::Draft,
|
||||
dependencies: Vec::new(),
|
||||
dependents: Vec::new(),
|
||||
soft_dependencies: Vec::new(),
|
||||
lock: None,
|
||||
impact: None,
|
||||
branch_name: None,
|
||||
github_evidence: None,
|
||||
completion_mode: CompletionMode::GithubPr,
|
||||
human_approval: None,
|
||||
priority: 0,
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum TaskEventType {
|
||||
StateTransition,
|
||||
LockAcquired,
|
||||
LockReleased,
|
||||
LockHeartbeat,
|
||||
DagChanged,
|
||||
GithubSynced,
|
||||
GatePassed,
|
||||
GateRejected,
|
||||
HumanApproved,
|
||||
ImpactRecorded,
|
||||
BranchCreated,
|
||||
PrCreated,
|
||||
MergeVerified,
|
||||
AuditRecorded,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct TaskEvent {
|
||||
pub id: String,
|
||||
pub ts: DateTime<Utc>,
|
||||
#[serde(rename = "type")]
|
||||
pub event_type: TaskEventType,
|
||||
pub task_id: String,
|
||||
pub agent: String,
|
||||
pub node: String,
|
||||
pub payload: Value,
|
||||
pub version: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct FileLockEntry {
|
||||
pub task_id: String,
|
||||
pub agent: String,
|
||||
pub node: String,
|
||||
pub expires_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct TasksSnapshot {
|
||||
pub version: u64,
|
||||
pub generated_at: DateTime<Utc>,
|
||||
pub generated_from_event_id: Option<String>,
|
||||
pub tasks: Vec<ExecutionTask>,
|
||||
pub file_locks: HashMap<String, FileLockEntry>,
|
||||
}
|
||||
|
||||
impl Default for TasksSnapshot {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
version: 0,
|
||||
generated_at: Utc::now(),
|
||||
generated_from_event_id: None,
|
||||
tasks: Vec::new(),
|
||||
file_locks: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TasksSnapshot {
|
||||
pub fn get_task(&self, task_id: &str) -> Option<&ExecutionTask> {
|
||||
self.tasks.iter().find(|task| task.id == task_id)
|
||||
}
|
||||
|
||||
pub fn get_task_mut(&mut self, task_id: &str) -> Option<&mut ExecutionTask> {
|
||||
self.tasks.iter_mut().find(|task| task.id == task_id)
|
||||
}
|
||||
|
||||
pub fn upsert_task(&mut self, task: ExecutionTask) {
|
||||
if let Some(existing) = self.get_task_mut(&task.id) {
|
||||
*existing = task;
|
||||
} else {
|
||||
self.tasks.push(task);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct EventStore {
|
||||
file_path: PathBuf,
|
||||
}
|
||||
|
||||
impl EventStore {
|
||||
pub fn new(path: impl Into<PathBuf>) -> Self {
|
||||
Self {
|
||||
file_path: path.into(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn path(&self) -> &Path {
|
||||
&self.file_path
|
||||
}
|
||||
|
||||
pub fn append(&self, event: &TaskEvent) -> Result<()> {
|
||||
ensure_parent_dir(&self.file_path)?;
|
||||
let mut file = OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(&self.file_path)?;
|
||||
serde_json::to_writer(&mut file, event)?;
|
||||
writeln!(file)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn replay(&self, since_id: Option<&str>) -> Result<Vec<TaskEvent>> {
|
||||
if !self.file_path.exists() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let reader = BufReader::new(File::open(&self.file_path)?);
|
||||
let mut collecting = since_id.is_none();
|
||||
let mut events = Vec::new();
|
||||
|
||||
for line in reader.lines() {
|
||||
let line = line?;
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
let event: TaskEvent = serde_json::from_str(&line)?;
|
||||
if !collecting {
|
||||
if Some(event.id.as_str()) == since_id {
|
||||
collecting = true;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
events.push(event);
|
||||
}
|
||||
|
||||
Ok(events)
|
||||
}
|
||||
|
||||
pub fn replay_for_task(&self, task_id: &str) -> Result<Vec<TaskEvent>> {
|
||||
Ok(self
|
||||
.replay(None)?
|
||||
.into_iter()
|
||||
.filter(|event| event.task_id == task_id)
|
||||
.collect())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SnapshotStore {
|
||||
file_path: PathBuf,
|
||||
lock_file_path: PathBuf,
|
||||
}
|
||||
|
||||
impl SnapshotStore {
|
||||
pub fn new(file_path: impl Into<PathBuf>, lock_file_path: impl Into<PathBuf>) -> Self {
|
||||
Self {
|
||||
file_path: file_path.into(),
|
||||
lock_file_path: lock_file_path.into(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn path(&self) -> &Path {
|
||||
&self.file_path
|
||||
}
|
||||
|
||||
pub fn load(&self) -> Result<TasksSnapshot> {
|
||||
if !self.file_path.exists() {
|
||||
return Ok(TasksSnapshot::default());
|
||||
}
|
||||
let raw = fs::read_to_string(&self.file_path)?;
|
||||
Ok(serde_json::from_str(&raw)?)
|
||||
}
|
||||
|
||||
pub fn save(&self, snapshot: &TasksSnapshot, expected_version: u64) -> Result<()> {
|
||||
ensure_parent_dir(&self.file_path)?;
|
||||
ensure_parent_dir(&self.lock_file_path)?;
|
||||
|
||||
let lock_file = OpenOptions::new()
|
||||
.create(true)
|
||||
.read(true)
|
||||
.write(true)
|
||||
.truncate(false)
|
||||
.open(&self.lock_file_path)?;
|
||||
lock_file.lock_exclusive()?;
|
||||
|
||||
let result = (|| -> Result<()> {
|
||||
let current = self.load()?;
|
||||
if current.version != expected_version {
|
||||
return Err(Error::Validation(format!(
|
||||
"CAS conflict: expected v{expected_version}, got v{}",
|
||||
current.version
|
||||
)));
|
||||
}
|
||||
|
||||
let mut next = snapshot.clone();
|
||||
next.version = expected_version + 1;
|
||||
next.generated_at = Utc::now();
|
||||
|
||||
let tmp_path = self.file_path.with_extension("json.tmp");
|
||||
fs::write(&tmp_path, serde_json::to_vec_pretty(&next)?)?;
|
||||
fs::rename(&tmp_path, &self.file_path)?;
|
||||
Ok(())
|
||||
})();
|
||||
|
||||
let unlock_result = lock_file.unlock();
|
||||
result?;
|
||||
unlock_result?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn rebuild(&self, event_store: &EventStore) -> Result<TasksSnapshot> {
|
||||
let mut snapshot = self.load()?;
|
||||
|
||||
for event in event_store.replay(None)? {
|
||||
snapshot.generated_from_event_id = Some(event.id.clone());
|
||||
apply_event(&mut snapshot, &event)?;
|
||||
}
|
||||
|
||||
Ok(snapshot)
|
||||
}
|
||||
}
|
||||
|
||||
fn apply_event(snapshot: &mut TasksSnapshot, event: &TaskEvent) -> Result<()> {
|
||||
match event.event_type {
|
||||
TaskEventType::StateTransition => {
|
||||
let to = event
|
||||
.payload
|
||||
.get("to")
|
||||
.ok_or_else(|| Error::Validation("state_transition missing payload.to".into()))?;
|
||||
let state: TaskState = serde_json::from_value(to.clone())?;
|
||||
let task = snapshot
|
||||
.get_task_mut(&event.task_id)
|
||||
.ok_or_else(|| Error::Validation(format!("unknown task: {}", event.task_id)))?;
|
||||
task.current_state = state;
|
||||
task.updated_at = event.ts;
|
||||
}
|
||||
TaskEventType::LockAcquired => {
|
||||
let files: Vec<String> = serde_json::from_value(
|
||||
event
|
||||
.payload
|
||||
.get("files")
|
||||
.cloned()
|
||||
.ok_or_else(|| Error::Validation("lock_acquired missing files".into()))?,
|
||||
)?;
|
||||
let expires_at: DateTime<Utc> =
|
||||
serde_json::from_value(event.payload.get("expires_at").cloned().ok_or_else(
|
||||
|| Error::Validation("lock_acquired missing expires_at".into()),
|
||||
)?)?;
|
||||
let lease_duration_sec = event
|
||||
.payload
|
||||
.get("lease_duration_sec")
|
||||
.and_then(|v| v.as_u64())
|
||||
.unwrap_or(300);
|
||||
|
||||
let task = snapshot
|
||||
.get_task_mut(&event.task_id)
|
||||
.ok_or_else(|| Error::Validation(format!("unknown task: {}", event.task_id)))?;
|
||||
task.lock = Some(TaskLockSnapshot {
|
||||
locked_by: format!("{}@{}", event.agent, event.node),
|
||||
locked_at: event.ts,
|
||||
lease_duration_sec,
|
||||
last_heartbeat: event.ts,
|
||||
affected_files: files.clone(),
|
||||
});
|
||||
task.updated_at = event.ts;
|
||||
|
||||
for file in files {
|
||||
snapshot.file_locks.insert(
|
||||
file,
|
||||
FileLockEntry {
|
||||
task_id: event.task_id.clone(),
|
||||
agent: event.agent.clone(),
|
||||
node: event.node.clone(),
|
||||
expires_at,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
TaskEventType::LockHeartbeat => {
|
||||
let expires_at: DateTime<Utc> =
|
||||
serde_json::from_value(event.payload.get("expires_at").cloned().ok_or_else(
|
||||
|| Error::Validation("lock_heartbeat missing expires_at".into()),
|
||||
)?)?;
|
||||
let affected_files = {
|
||||
let task = snapshot
|
||||
.get_task_mut(&event.task_id)
|
||||
.ok_or_else(|| Error::Validation(format!("unknown task: {}", event.task_id)))?;
|
||||
let affected_files = if let Some(lock) = &mut task.lock {
|
||||
lock.last_heartbeat = event.ts;
|
||||
lock.affected_files.clone()
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
task.updated_at = event.ts;
|
||||
affected_files
|
||||
};
|
||||
|
||||
for file in affected_files {
|
||||
if let Some(entry) = snapshot.file_locks.get_mut(&file) {
|
||||
entry.expires_at = expires_at;
|
||||
}
|
||||
}
|
||||
}
|
||||
TaskEventType::LockReleased => {
|
||||
if let Some(files_to_remove) = {
|
||||
if let Some(task) = snapshot.get_task_mut(&event.task_id) {
|
||||
let files = task.lock.take().map(|lock| lock.affected_files);
|
||||
task.updated_at = event.ts;
|
||||
files
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} {
|
||||
for file in files_to_remove {
|
||||
snapshot.file_locks.remove(&file);
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
snapshot.version = event.version;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn ensure_parent_dir(path: &Path) -> Result<()> {
|
||||
if let Some(parent) = path.parent() {
|
||||
fs::create_dir_all(parent)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn lease_expiry(last_heartbeat: DateTime<Utc>, lease_duration_sec: u64) -> DateTime<Utc> {
|
||||
last_heartbeat + Duration::seconds(lease_duration_sec as i64)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use tempfile::TempDir;
|
||||
|
||||
fn sample_task(id: &str) -> ExecutionTask {
|
||||
ExecutionTask::new(id, format!("Task {id}"))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_store_appends_and_filters_by_task() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let store = EventStore::new(tmp.path().join("events.jsonl"));
|
||||
|
||||
let event_a = TaskEvent {
|
||||
id: "1".into(),
|
||||
ts: Utc::now(),
|
||||
event_type: TaskEventType::GatePassed,
|
||||
task_id: "task-a".into(),
|
||||
agent: "codex".into(),
|
||||
node: "macbook".into(),
|
||||
payload: serde_json::json!({"gate": "gate_0"}),
|
||||
version: 1,
|
||||
};
|
||||
let event_b = TaskEvent {
|
||||
id: "2".into(),
|
||||
task_id: "task-b".into(),
|
||||
..event_a.clone()
|
||||
};
|
||||
|
||||
store.append(&event_a).unwrap();
|
||||
store.append(&event_b).unwrap();
|
||||
|
||||
let filtered = store.replay_for_task("task-a").unwrap();
|
||||
assert_eq!(filtered, vec![event_a]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn snapshot_store_enforces_cas() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let store = SnapshotStore::new(
|
||||
tmp.path().join("tasks.snapshot.json"),
|
||||
tmp.path().join(".tasks.lock"),
|
||||
);
|
||||
|
||||
let mut snapshot = TasksSnapshot::default();
|
||||
snapshot.upsert_task(sample_task("task-a"));
|
||||
store.save(&snapshot, 0).unwrap();
|
||||
|
||||
let err = store.save(&snapshot, 0).unwrap_err();
|
||||
assert!(matches!(err, Error::Validation(_)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn snapshot_rebuilds_from_lock_and_transition_events() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let event_store = EventStore::new(tmp.path().join("events.jsonl"));
|
||||
let snapshot_store = SnapshotStore::new(
|
||||
tmp.path().join("tasks.snapshot.json"),
|
||||
tmp.path().join(".tasks.lock"),
|
||||
);
|
||||
|
||||
let mut snapshot = TasksSnapshot::default();
|
||||
snapshot.upsert_task(sample_task("task-a"));
|
||||
snapshot_store.save(&snapshot, 0).unwrap();
|
||||
|
||||
let base_ts = Utc::now();
|
||||
event_store
|
||||
.append(&TaskEvent {
|
||||
id: "1".into(),
|
||||
ts: base_ts,
|
||||
event_type: TaskEventType::StateTransition,
|
||||
task_id: "task-a".into(),
|
||||
agent: "codex".into(),
|
||||
node: "macbook".into(),
|
||||
payload: serde_json::json!({"to": "implementing"}),
|
||||
version: 1,
|
||||
})
|
||||
.unwrap();
|
||||
event_store
|
||||
.append(&TaskEvent {
|
||||
id: "2".into(),
|
||||
ts: base_ts + Duration::seconds(1),
|
||||
event_type: TaskEventType::LockAcquired,
|
||||
task_id: "task-a".into(),
|
||||
agent: "codex".into(),
|
||||
node: "macbook".into(),
|
||||
payload: serde_json::json!({
|
||||
"files": ["src/lib.rs"],
|
||||
"expires_at": base_ts + Duration::seconds(301),
|
||||
"lease_duration_sec": 300
|
||||
}),
|
||||
version: 2,
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let rebuilt = snapshot_store.rebuild(&event_store).unwrap();
|
||||
let task = rebuilt.get_task("task-a").unwrap();
|
||||
assert_eq!(task.current_state, TaskState::Implementing);
|
||||
assert!(task.lock.is_some());
|
||||
assert!(rebuilt.file_locks.contains_key("src/lib.rs"));
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue