From 70cd9067763dc49ca51c2e15535a1b98dc01ab6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9E=97=20=E9=A7=BF=E7=94=AB=20=28Shunsuke=20Hayashi=29?= Date: Fri, 10 Apr 2026 07:29:03 +0900 Subject: [PATCH] =?UTF-8?q?[=E8=BF=BD=E5=8A=A0]=20=E3=83=89=E3=83=AA?= =?UTF-8?q?=E3=83=BC=E3=83=9F=E3=83=B3=E3=82=B0:=20event=20log=20=E2=86=92?= =?UTF-8?q?=20=E5=AD=A6=E3=81=B3=E6=8A=BD=E5=87=BA=20(#59)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 1 + crates/miyabi-cli/Cargo.toml | 3 + crates/miyabi-cli/src/main.rs | 145 ++++++++++++ crates/miyabi-core/src/dream.rs | 335 ++++++++++++++++++++++++++ crates/miyabi-core/src/lib.rs | 2 + crates/miyabi-core/src/protocol.rs | 364 ++++++++++++++++++++++++++++- crates/miyabi-core/src/store.rs | 50 ++++ 7 files changed, 897 insertions(+), 3 deletions(-) create mode 100644 crates/miyabi-core/src/dream.rs diff --git a/Cargo.lock b/Cargo.lock index 00b6179..fc320e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1350,6 +1350,7 @@ name = "miyabi-cli" version = "0.1.0" dependencies = [ "anyhow", + "chrono", "clap", "crossterm 0.29.0", "miyabi-core", diff --git a/crates/miyabi-cli/Cargo.toml b/crates/miyabi-cli/Cargo.toml index 9f75826..40bbebc 100644 --- a/crates/miyabi-cli/Cargo.toml +++ b/crates/miyabi-cli/Cargo.toml @@ -34,6 +34,9 @@ anyhow = { workspace = true } # Serialization serde_json = { workspace = true } +# Utilities +chrono = { workspace = true } + # Logging tracing = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/crates/miyabi-cli/src/main.rs b/crates/miyabi-cli/src/main.rs index e0ddce2..a4dab59 100644 --- a/crates/miyabi-cli/src/main.rs +++ b/crates/miyabi-cli/src/main.rs @@ -1,5 +1,6 @@ //! Miyabi CLI - Main entry point +use chrono::Duration as ChronoDuration; use clap::{Parser, Subcommand, ValueEnum}; use miyabi_core::{FeatureFlagManager, RulesLoader}; use std::collections::HashMap; @@ -201,6 +202,8 @@ enum GateCommand { }, /// Record branch creation Branch { task_id: String, name: String }, + /// Attach task context for execution + Attach { task_id: String }, /// Record PR creation Pr { task_id: String, number: u64 }, /// Record merge verification @@ -211,6 +214,15 @@ enum GateCommand { Dag, /// Show dispatchable tasks Dispatchable, + /// Analyze recent event logs and extract learnings + Dream { + /// Analyze only recent events, e.g. 24h, 30m, 7d + #[arg(long)] + since: Option, + /// Persist High learnings into docs/learnings/ + #[arg(long)] + auto: bool, + }, } /// Collab canvas subcommands — wraps the collab CLI at ~/.local/bin/collab @@ -1154,6 +1166,7 @@ fn handle_gate_command( store_path: &std::path::Path, command: GateCommand, ) -> anyhow::Result { + use miyabi_core::dream::write_high_learnings; use miyabi_core::protocol::{ DeterministicExecutionProtocol, ImpactInput, ProtocolError, RegisterTaskRequest, StatusReport, @@ -1284,6 +1297,28 @@ fn handle_gate_command( println!("branch recorded: {} -> {}", task.id, name); } }), + GateCommand::Attach { task_id } => { + protocol + .attach_context(&task_id, actor, &node) + .map(|attachments| { + if matches!(format, OutputFormat::Json) { + println!("{}", serde_json::to_string_pretty(&attachments).unwrap()); + } else if attachments.is_empty() { + println!("no context attachments: {}", task_id); + } else { + println!("context attachments: {}", task_id); + for attachment in attachments { + println!( + "--- [{}] {} ({} tokens)", + attachment.attachment_type, + attachment.source, + attachment.token_estimate + ); + println!("{}", attachment.content); + } + } + }) + } GateCommand::Pr { task_id, number } => protocol .record_pr(&task_id, number, actor, &node) .map(|task| { @@ -1333,6 +1368,40 @@ fn handle_gate_command( } } }), + GateCommand::Dream { since, auto } => { + let since = since + .as_deref() + .map(parse_gate_since) + .transpose() + .map_err(|error: anyhow::Error| ProtocolError::input(error.to_string()))?; + protocol.dream(since, actor, &node).and_then(|report| { + if matches!(format, OutputFormat::Json) { + println!("{}", serde_json::to_string_pretty(&report).unwrap()); + } else { + print_dream_report(&report); + } + + if auto { + let written = write_high_learnings( + &report, + &std::env::current_dir() + .map_err(|error| ProtocolError::input(error.to_string()))? + .join("docs") + .join("learnings"), + ) + .map_err(ProtocolError::Internal)?; + if !matches!(format, OutputFormat::Json) { + if written.is_empty() { + println!("high learnings: none"); + } else { + println!("high learnings written: {}", written.len()); + } + } + } + + Ok(()) + }) + } }; Ok(match result { @@ -1356,6 +1425,82 @@ fn handle_gate_command( }) } +fn parse_gate_since(input: &str) -> anyhow::Result { + let trimmed = input.trim(); + if trimmed.len() < 2 { + return Err(anyhow::anyhow!("invalid --since value: {trimmed}")); + } + + let (number, unit) = trimmed.split_at(trimmed.len() - 1); + let value: i64 = number + .parse() + .map_err(|_| anyhow::anyhow!("invalid --since value: {trimmed}"))?; + + match unit { + "s" => Ok(ChronoDuration::seconds(value)), + "m" => Ok(ChronoDuration::minutes(value)), + "h" => Ok(ChronoDuration::hours(value)), + "d" => Ok(ChronoDuration::days(value)), + _ => Err(anyhow::anyhow!( + "unsupported --since unit: {unit} (use s, m, h, d)" + )), + } +} + +fn print_dream_report(report: &miyabi_core::DreamReport) { + println!("events processed: {}", report.events_processed); + + if report.patterns.gate_rejections.is_empty() { + println!("gate rejections: none"); + } else { + println!("gate rejections:"); + let mut gates: Vec<_> = report.patterns.gate_rejections.iter().collect(); + gates.sort_by(|left, right| left.0.cmp(right.0)); + for (gate, count) in gates { + println!(" {} -> {}", gate, count); + } + } + + if report.patterns.lock_conflicts.is_empty() { + println!("lock conflicts: none"); + } else { + println!("lock conflicts:"); + let mut files: Vec<_> = report.patterns.lock_conflicts.iter().collect(); + files.sort_by(|left, right| left.0.cmp(right.0)); + for (file, count) in files { + println!(" {} -> {}", file, count); + } + } + + if report.patterns.completion_times.is_empty() { + println!("completion times: none"); + } else { + println!("completion times:"); + for (task_id, duration) in &report.patterns.completion_times { + println!(" {} -> {}s", task_id, duration.as_secs()); + } + } + + if report.learnings.is_empty() { + println!("learnings: none"); + } else { + println!("learnings:"); + for learning in &report.learnings { + println!( + " [{:?}] {}{}", + learning.importance, + learning.title, + learning + .related_task + .as_deref() + .map(|task| format!(" ({task})")) + .unwrap_or_default() + ); + println!(" {}", learning.content); + } + } +} + fn emit_gate_error(format: &OutputFormat, kind: &str, message: &str) { if matches!(format, OutputFormat::Json) { println!( diff --git a/crates/miyabi-core/src/dream.rs b/crates/miyabi-core/src/dream.rs new file mode 100644 index 0000000..bd3ef3d --- /dev/null +++ b/crates/miyabi-core/src/dream.rs @@ -0,0 +1,335 @@ +//! Dreaming over deterministic task events to extract patterns and learnings. + +use crate::error::Result; +use crate::store::{EventStore, TaskEvent, TaskEventType}; +use chrono::{Duration as ChronoDuration, Utc}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::fs; +use std::path::{Path, PathBuf}; +use std::time::Duration; + +const DREAM_TASK_ID: &str = "__dream__"; +const LONG_COMPLETION_SECS: u64 = 60 * 60; +const VERY_LONG_COMPLETION_SECS: u64 = 4 * 60 * 60; +const FREQUENT_PATTERN_THRESHOLD: usize = 2; +const HIGH_PATTERN_THRESHOLD: usize = 3; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct DreamReport { + pub patterns: DreamPatterns, + pub learnings: Vec, + pub events_processed: usize, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] +pub struct DreamPatterns { + pub gate_rejections: HashMap, + pub lock_conflicts: HashMap, + pub completion_times: Vec<(String, Duration)>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct Learning { + pub title: String, + pub importance: Importance, + pub content: String, + pub related_task: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum Importance { + High, + Medium, + Low, +} + +pub fn dream(event_store: &EventStore, since: Option) -> Result { + let events = collect_events(event_store, since)?; + Ok(analyze_events(&events)) +} + +pub fn write_high_learnings(report: &DreamReport, directory: &Path) -> Result> { + fs::create_dir_all(directory)?; + + let mut written = Vec::new(); + for (index, learning) in report + .learnings + .iter() + .filter(|learning| learning.importance == Importance::High) + .enumerate() + { + let timestamp = Utc::now().format("%Y%m%d-%H%M%S"); + let filename = format!("{timestamp}-{index:02}-{}.md", slugify(&learning.title)); + let path = directory.join(filename); + let tmp_path = path.with_extension("md.tmp"); + let content = format!( + "# {}\n\n- importance: {:?}\n- related_task: {}\n\n{}\n", + learning.title, + learning.importance, + learning.related_task.as_deref().unwrap_or("none"), + learning.content + ); + fs::write(&tmp_path, content)?; + fs::rename(&tmp_path, &path)?; + written.push(path); + } + + Ok(written) +} + +pub fn analyze_events(events: &[TaskEvent]) -> DreamReport { + let mut patterns = DreamPatterns::default(); + let mut task_started_at: HashMap<&str, chrono::DateTime> = HashMap::new(); + + for event in events { + match event.event_type { + TaskEventType::DagChanged => { + if event.task_id != DREAM_TASK_ID { + task_started_at + .entry(event.task_id.as_str()) + .or_insert(event.ts); + } + } + TaskEventType::GateRejected => { + if let Some(gate_name) = event.payload.get("gate").and_then(|value| value.as_str()) + { + *patterns + .gate_rejections + .entry(gate_name.to_string()) + .or_default() += 1; + } + if let Some(files) = event + .payload + .get("files") + .and_then(|value| value.as_array()) + { + for file in files.iter().filter_map(|value| value.as_str()) { + *patterns.lock_conflicts.entry(file.to_string()).or_default() += 1; + } + } + } + TaskEventType::MergeVerified => { + if let Some(started_at) = task_started_at.get(event.task_id.as_str()) { + let elapsed = event.ts.signed_duration_since(*started_at); + if let Ok(duration) = elapsed.to_std() { + patterns + .completion_times + .push((event.task_id.clone(), duration)); + } + } + } + _ => {} + } + } + + patterns + .completion_times + .sort_by(|left, right| left.0.cmp(&right.0)); + let learnings = extract_learnings(&patterns); + + DreamReport { + patterns, + learnings, + events_processed: events.len(), + } +} + +fn collect_events( + event_store: &EventStore, + since: Option, +) -> Result> { + let events = event_store.replay(None)?; + if let Some(since) = since { + let cutoff = Utc::now() - since; + return Ok(events + .into_iter() + .filter(|event| event.ts >= cutoff) + .collect()); + } + + let start_index = events + .iter() + .rposition(|event| event.event_type == TaskEventType::DreamRecorded) + .map_or(0, |index| index + 1); + Ok(events.into_iter().skip(start_index).collect()) +} + +fn extract_learnings(patterns: &DreamPatterns) -> Vec { + let mut learnings = Vec::new(); + + let mut gate_entries: Vec<_> = patterns.gate_rejections.iter().collect(); + gate_entries.sort_by(|left, right| left.0.cmp(right.0)); + for (gate_name, count) in gate_entries { + if *count >= FREQUENT_PATTERN_THRESHOLD { + learnings.push(Learning { + title: format!("{gate_name} の拒否が多発"), + importance: if *count >= HIGH_PATTERN_THRESHOLD { + Importance::High + } else { + Importance::Medium + }, + content: format!( + "{gate_name} の拒否が {count} 回発生しています。手順書の改善が必要です。" + ), + related_task: None, + }); + } + } + + let mut lock_entries: Vec<_> = patterns.lock_conflicts.iter().collect(); + lock_entries.sort_by(|left, right| left.0.cmp(right.0)); + for (file, count) in lock_entries { + learnings.push(Learning { + title: format!("ロック競合が発生: {file}"), + importance: if *count >= HIGH_PATTERN_THRESHOLD { + Importance::High + } else { + Importance::Medium + }, + content: format!( + "{file} でロック競合が {count} 回発生しました。ファイル分割を検討してください。" + ), + related_task: None, + }); + } + + for (task_id, duration) in &patterns.completion_times { + if duration.as_secs() >= LONG_COMPLETION_SECS { + learnings.push(Learning { + title: format!("完了時間が長い: {task_id}"), + importance: if duration.as_secs() >= VERY_LONG_COMPLETION_SECS { + Importance::High + } else { + Importance::Medium + }, + content: format!( + "{task_id} は完了まで {} 秒かかりました。見積もり精度の改善が必要です。", + duration.as_secs() + ), + related_task: Some(task_id.clone()), + }); + } + } + + learnings +} + +fn slugify(title: &str) -> String { + let slug: String = title + .chars() + .map(|ch| if ch.is_ascii_alphanumeric() { ch } else { '-' }) + .collect(); + let slug = slug.trim_matches('-').to_ascii_lowercase(); + if slug.is_empty() { + "learning".to_string() + } else { + slug + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::store::TaskEvent; + use chrono::{Duration as ChronoDuration, TimeZone}; + use serde_json::json; + + fn event( + event_type: TaskEventType, + task_id: &str, + ts_offset_sec: i64, + payload: serde_json::Value, + ) -> TaskEvent { + let base = Utc.with_ymd_and_hms(2026, 4, 10, 0, 0, 0).unwrap(); + TaskEvent { + id: format!("{task_id}-{ts_offset_sec}-{:?}", event_type), + ts: base + ChronoDuration::seconds(ts_offset_sec), + event_type, + task_id: task_id.to_string(), + agent: "test".to_string(), + node: "test".to_string(), + payload, + version: 1, + } + } + + #[test] + fn dream_with_mix_of_events_produces_correct_patterns() { + let events = vec![ + event(TaskEventType::DagChanged, "task-a", 0, json!({})), + event( + TaskEventType::GateRejected, + "task-a", + 60, + json!({"gate": "GATE 3", "files": ["src/shared.rs"]}), + ), + event( + TaskEventType::GateRejected, + "task-b", + 120, + json!({"gate": "GATE 4", "files": ["src/shared.rs", "src/other.rs"]}), + ), + event(TaskEventType::MergeVerified, "task-a", 7200, json!({})), + ]; + + let report = analyze_events(&events); + + assert_eq!(report.events_processed, 4); + assert_eq!(report.patterns.gate_rejections.get("GATE 3"), Some(&1)); + assert_eq!(report.patterns.gate_rejections.get("GATE 4"), Some(&1)); + assert_eq!( + report.patterns.lock_conflicts.get("src/shared.rs"), + Some(&2) + ); + assert_eq!(report.patterns.lock_conflicts.get("src/other.rs"), Some(&1)); + assert_eq!(report.patterns.completion_times.len(), 1); + assert_eq!(report.patterns.completion_times[0].0, "task-a"); + assert_eq!( + report.patterns.completion_times[0].1, + Duration::from_secs(7200) + ); + } + + #[test] + fn learning_extraction_from_gate_rejected_events() { + let events = vec![ + event( + TaskEventType::GateRejected, + "task-a", + 0, + json!({"gate": "GATE 3"}), + ), + event( + TaskEventType::GateRejected, + "task-b", + 10, + json!({"gate": "GATE 3"}), + ), + event( + TaskEventType::GateRejected, + "task-c", + 20, + json!({"gate": "GATE 3"}), + ), + ]; + + let report = analyze_events(&events); + assert!(report.learnings.iter().any(|learning| { + learning.title.contains("GATE 3") + && learning.importance == Importance::High + && learning.content.contains("手順書の改善が必要") + })); + } + + #[test] + fn empty_events_produce_empty_report() { + let report = analyze_events(&[]); + assert_eq!(report.events_processed, 0); + assert!(report.patterns.gate_rejections.is_empty()); + assert!(report.patterns.lock_conflicts.is_empty()); + assert!(report.patterns.completion_times.is_empty()); + assert!(report.learnings.is_empty()); + } +} diff --git a/crates/miyabi-core/src/lib.rs b/crates/miyabi-core/src/lib.rs index 703431b..9ffc7ae 100644 --- a/crates/miyabi-core/src/lib.rs +++ b/crates/miyabi-core/src/lib.rs @@ -8,6 +8,7 @@ pub mod cache; pub mod config; pub mod conversation; pub mod dag; +pub mod dream; pub mod error; pub mod error_policy; pub mod feature_flags; @@ -98,6 +99,7 @@ pub mod openclaw; pub use dag::{ DAGError, Task as DAGTask, TaskGraph, TaskGraphBuilder, TaskId, TaskLevel, TaskNode, }; +pub use dream::{DreamPatterns, DreamReport, Importance, Learning}; pub use openclaw::{AgentInfo, OpenClawClient, OpenClawError, OpenClawResult}; pub use orchestration::{ create_orchestrator, Orchestrator, OrchestratorTask, ParallelConfig, ParallelResult, TaskResult, diff --git a/crates/miyabi-core/src/protocol.rs b/crates/miyabi-core/src/protocol.rs index 18b2a75..903bb78 100644 --- a/crates/miyabi-core/src/protocol.rs +++ b/crates/miyabi-core/src/protocol.rs @@ -1,19 +1,25 @@ //! Deterministic execution protocol entry point. +use crate::dream::DreamReport; use crate::error::Error; use crate::gate::{evaluate_gate, validate_branch_name, Gate, GateContext, GateReport}; use crate::lock::{FileLockManager, LeaseConfig, LockConflict}; use crate::store::{ - CompletionMode, EventStore, ExecutionTask, GitHubEvidence, GitHubIssueState, GitHubPrState, - HumanApproval, ImpactRiskLevel, ReviewDecision, SnapshotStore, TaskEvent, TaskEventType, - TaskImpact, TaskState, TasksSnapshot, + CompletionMode, ContextAttachment, EventStore, ExecutionTask, GitHubEvidence, GitHubIssueState, + GitHubPrState, HumanApproval, ImpactRiskLevel, ReviewDecision, SnapshotStore, TaskEvent, + TaskEventType, TaskImpact, TaskState, TasksSnapshot, }; use chrono::Utc; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, VecDeque}; +use std::fs; +use std::io::{BufRead, BufReader}; use std::path::{Path, PathBuf}; use std::time::{Duration, Instant}; +const MAX_CONTEXT_TOKENS: usize = 4_000; +const FILE_SNIPPET_LINE_LIMIT: usize = 30; + #[derive(Debug, Clone)] pub struct DeterministicExecutionProtocol { event_store: EventStore, @@ -132,6 +138,7 @@ impl DeterministicExecutionProtocol { } let mut task = ExecutionTask::new(&request.task_id, request.title); + task.issue_number = request.issue; task.current_state = TaskState::Pending; task.dependencies = request.dependencies; task.soft_dependencies = request.soft_dependencies; @@ -222,6 +229,21 @@ impl DeterministicExecutionProtocol { .has_conflict(files) .map_err(ProtocolError::from)?; if conflict.conflicting { + self.append_event( + task_id, + TaskEventType::GateRejected, + agent, + node, + snapshot.version + 1, + serde_json::json!({ + "gate": Gate::Gate4.label(), + "detail": format!( + "lock conflict held by {}", + conflict.held_by.as_deref().unwrap_or("unknown") + ), + "files": files, + }), + )?; return Err(ProtocolError::gate_rejected(format!( "lock conflict held by {}", conflict.held_by.unwrap_or_else(|| "unknown".to_string()) @@ -231,6 +253,7 @@ impl DeterministicExecutionProtocol { self.lock_manager .acquire_lock(task_id, agent, node, files) .map_err(ProtocolError::from)?; + self.attach_context(task_id, agent, node)?; let task = self.transition_task(task_id, TaskState::Implementing, agent, node)?; Ok(AssignmentResult { @@ -389,6 +412,43 @@ impl DeterministicExecutionProtocol { Ok(compute_dag(&snapshot)) } + pub fn dream( + &self, + since: Option, + actor: &str, + node: &str, + ) -> ProtocolResult { + let report = crate::dream::dream(&self.event_store, since).map_err(ProtocolError::from)?; + let version = self + .snapshot_store + .load() + .map_err(ProtocolError::from)? + .version; + self.append_event( + "__dream__", + TaskEventType::DreamRecorded, + actor, + node, + version, + serde_json::json!({ + "events_processed": report.events_processed, + "gate_rejections": report.patterns.gate_rejections.len(), + "lock_conflicts": report.patterns.lock_conflicts.len(), + "learnings": report.learnings.len(), + }), + )?; + Ok(report) + } + + pub fn attach_context( + &self, + task_id: &str, + actor: &str, + node: &str, + ) -> ProtocolResult> { + self.attach_context_with_limit(task_id, actor, node, MAX_CONTEXT_TOKENS) + } + pub fn dispatchable(&self) -> ProtocolResult { let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?; let tasks = snapshot @@ -413,6 +473,33 @@ impl DeterministicExecutionProtocol { Ok(DispatchableReport { tasks }) } + fn attach_context_with_limit( + &self, + task_id: &str, + actor: &str, + node: &str, + max_context_tokens: usize, + ) -> ProtocolResult> { + let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?; + let task = snapshot + .get_task(task_id) + .cloned() + .ok_or_else(|| ProtocolError::input(format!("unknown task: {task_id}")))?; + let attachments = self.build_context_attachments(&task, max_context_tokens)?; + let payload = serde_json::to_value(&attachments).map_err(Error::from)?; + self.update_task( + task_id, + actor, + node, + TaskEventType::ContextAttached, + |task| { + task.context_attachments = attachments.clone(); + Ok(payload.clone()) + }, + )?; + Ok(attachments) + } + fn transition_task( &self, task_id: &str, @@ -514,6 +601,87 @@ impl DeterministicExecutionProtocol { ) } + fn build_context_attachments( + &self, + task: &ExecutionTask, + max_context_tokens: usize, + ) -> ProtocolResult> { + let mut attachments = Vec::new(); + let mut remaining_tokens = max_context_tokens; + + if task.issue_number > 0 { + push_attachment( + &mut attachments, + &mut remaining_tokens, + "issue", + &format!("github://issue/{}", task.issue_number), + &format!("Issue #{}", task.issue_number), + ); + } + + if let Some(impact) = &task.impact { + push_attachment( + &mut attachments, + &mut remaining_tokens, + "impact", + &format!("dtp://impact/{}", task.id), + &format!( + "risk_level: {:?}\naffected_symbols: {}", + impact.risk_level, impact.affected_symbols + ), + ); + } + + if let Some(lock) = &task.lock { + for file in &lock.affected_files { + if remaining_tokens == 0 { + break; + } + let source_path = self.resolve_attachment_path(file); + if !source_path.exists() { + continue; + } + let content = read_file_snippet(&source_path, FILE_SNIPPET_LINE_LIMIT) + .map_err(ProtocolError::from)?; + push_attachment( + &mut attachments, + &mut remaining_tokens, + "file_snippet", + &source_path.display().to_string(), + &content, + ); + } + } + + Ok(attachments) + } + + fn resolve_attachment_path(&self, source: &str) -> PathBuf { + let path = PathBuf::from(source); + if path.is_absolute() { + return path; + } + + let store_relative = self + .snapshot_store + .path() + .parent() + .map(|base| base.join(path)) + .unwrap_or_else(|| PathBuf::from(source)); + if store_relative.exists() { + return store_relative; + } + + let cwd_relative = std::env::current_dir() + .map(|cwd| cwd.join(source)) + .unwrap_or_else(|_| PathBuf::from(source)); + if cwd_relative.exists() { + return cwd_relative; + } + + store_relative + } + fn unblock_dependents_after_merge( &self, task_id: &str, @@ -660,6 +828,73 @@ fn dependencies_satisfied(task: &ExecutionTask, snapshot: &TasksSnapshot) -> boo }) } +fn push_attachment( + attachments: &mut Vec, + remaining_tokens: &mut usize, + attachment_type: &str, + source: &str, + content: &str, +) { + if *remaining_tokens == 0 { + return; + } + + let truncated_content = truncate_to_token_budget(content, *remaining_tokens); + let token_estimate = estimate_tokens(&truncated_content); + if token_estimate == 0 { + return; + } + + *remaining_tokens = remaining_tokens.saturating_sub(token_estimate); + attachments.push(ContextAttachment { + attachment_type: attachment_type.to_string(), + source: source.to_string(), + content: truncated_content, + token_estimate, + }); +} + +fn truncate_to_token_budget(content: &str, max_tokens: usize) -> String { + if max_tokens == 0 { + return String::new(); + } + + let max_chars = max_tokens.saturating_mul(4); + let content_chars = content.chars().count(); + if content_chars <= max_chars { + return content.to_string(); + } + + let truncated: String = content.chars().take(max_chars).collect(); + if max_chars >= 3 { + format!( + "{}...", + truncated.chars().take(max_chars - 3).collect::() + ) + } else { + truncated + } +} + +fn estimate_tokens(content: &str) -> usize { + let char_count = content.chars().count(); + if char_count == 0 { + 0 + } else { + char_count.div_ceil(4) + } +} + +fn read_file_snippet(path: &Path, max_lines: usize) -> Result { + let file = fs::File::open(path)?; + let reader = BufReader::new(file); + let mut lines = Vec::new(); + for line in reader.lines().take(max_lines) { + lines.push(line?); + } + Ok(lines.join("\n")) +} + fn recompute_dependents(snapshot: &mut TasksSnapshot) { let mut dependents: HashMap> = HashMap::new(); for task in &snapshot.tasks { @@ -733,6 +968,7 @@ fn compute_dag(snapshot: &TasksSnapshot) -> DagReport { #[cfg(test)] mod tests { use super::*; + use std::fs; use tempfile::TempDir; fn fixture() -> (TempDir, DeterministicExecutionProtocol) { @@ -919,6 +1155,128 @@ mod tests { assert!(protocol.locks().unwrap().is_empty()); } + #[test] + fn attach_context_collects_issue_impact_and_file_snippets() { + let (tmp, protocol) = fixture(); + let src_dir = tmp.path().join("src"); + fs::create_dir_all(&src_dir).unwrap(); + let file_path = src_dir.join("lib.rs"); + let file_content = (1..=35) + .map(|line| format!("line {line}")) + .collect::>() + .join("\n"); + fs::write(&file_path, file_content).unwrap(); + + protocol + .register( + RegisterTaskRequest { + issue: 42, + task_id: "phase-a".into(), + title: "Phase A".into(), + dependencies: vec![], + soft_dependencies: vec![], + priority: 0, + completion_mode: CompletionMode::GithubPr, + }, + "codex", + "macbook", + ) + .unwrap(); + protocol + .record_impact( + "phase-a", + ImpactInput { + risk_level: ImpactRiskLevel::Low, + affected_symbols: 7, + depth1: vec!["attach_context".into()], + analyzed_commit: None, + input_hash: None, + approve: false, + }, + "codex", + "macbook", + ) + .unwrap(); + protocol + .assign("phase-a", "codex", "macbook", &[String::from("src/lib.rs")]) + .unwrap(); + + let attachments = protocol + .attach_context("phase-a", "codex", "macbook") + .unwrap(); + + assert_eq!(attachments.len(), 3); + assert_eq!(attachments[0].attachment_type, "issue"); + assert_eq!(attachments[0].content, "Issue #42"); + assert_eq!(attachments[1].attachment_type, "impact"); + assert!(attachments[1].content.contains("affected_symbols: 7")); + assert_eq!(attachments[2].attachment_type, "file_snippet"); + assert!(attachments[2].content.contains("line 1")); + assert!(attachments[2].content.contains("line 30")); + assert!(!attachments[2].content.contains("line 31")); + + let task = match protocol.status(Some("phase-a")).unwrap() { + StatusReport::Task(task) => task, + StatusReport::Snapshot(_) => panic!("expected task status"), + }; + assert_eq!(task.issue_number, 42); + assert_eq!(task.context_attachments.len(), 3); + } + + #[test] + fn attach_context_trims_to_token_budget() { + let (tmp, protocol) = fixture(); + let src_dir = tmp.path().join("src"); + fs::create_dir_all(&src_dir).unwrap(); + fs::write( + src_dir.join("big.rs"), + "abcdefghijklmnopqrstuvwxyz".repeat(40), + ) + .unwrap(); + + protocol + .register( + RegisterTaskRequest { + issue: 9, + task_id: "phase-a".into(), + title: "Phase A".into(), + dependencies: vec![], + soft_dependencies: vec![], + priority: 0, + completion_mode: CompletionMode::GithubPr, + }, + "codex", + "macbook", + ) + .unwrap(); + protocol + .record_impact( + "phase-a", + ImpactInput { + risk_level: ImpactRiskLevel::Low, + affected_symbols: 99, + depth1: vec!["attach_context".into()], + analyzed_commit: None, + input_hash: None, + approve: false, + }, + "codex", + "macbook", + ) + .unwrap(); + protocol + .assign("phase-a", "codex", "macbook", &[String::from("src/big.rs")]) + .unwrap(); + + let attachments = protocol + .attach_context_with_limit("phase-a", "codex", "macbook", 8) + .unwrap(); + + let total_tokens: usize = attachments.iter().map(|item| item.token_estimate).sum(); + assert!(total_tokens <= 8); + assert!(!attachments.is_empty()); + } + #[test] fn assign_rejects_high_risk_without_human_approval() { let (_tmp, protocol) = fixture(); diff --git a/crates/miyabi-core/src/store.rs b/crates/miyabi-core/src/store.rs index 275f681..b999d21 100644 --- a/crates/miyabi-core/src/store.rs +++ b/crates/miyabi-core/src/store.rs @@ -106,10 +106,20 @@ pub struct TaskLockSnapshot { pub affected_files: Vec, } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ContextAttachment { + pub attachment_type: String, + pub source: String, + pub content: String, + pub token_estimate: usize, +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ExecutionTask { pub id: String, pub title: String, + #[serde(default)] + pub issue_number: u64, pub current_state: TaskState, pub dependencies: Vec, pub dependents: Vec, @@ -120,6 +130,8 @@ pub struct ExecutionTask { pub github_evidence: Option, pub completion_mode: CompletionMode, pub human_approval: Option, + #[serde(default)] + pub context_attachments: Vec, pub priority: u32, pub created_at: DateTime, pub updated_at: DateTime, @@ -131,6 +143,7 @@ impl ExecutionTask { Self { id: id.into(), title: title.into(), + issue_number: 0, current_state: TaskState::Draft, dependencies: Vec::new(), dependents: Vec::new(), @@ -141,6 +154,7 @@ impl ExecutionTask { github_evidence: None, completion_mode: CompletionMode::GithubPr, human_approval: None, + context_attachments: Vec::new(), priority: 0, created_at: now, updated_at: now, @@ -164,7 +178,9 @@ pub enum TaskEventType { BranchCreated, PrCreated, MergeVerified, + ContextAttached, AuditRecorded, + DreamRecorded, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] @@ -587,6 +603,7 @@ impl LegacyTask { ExecutionTask { id: self.id, title: self.title, + issue_number: 0, current_state: legacy_state(&self.state), dependencies: self.dependencies, dependents: self.dependents, @@ -597,6 +614,7 @@ impl LegacyTask { github_evidence, completion_mode: CompletionMode::GithubPr, human_approval: None, + context_attachments: Vec::new(), priority: 0, created_at, updated_at, @@ -771,4 +789,36 @@ mod tests { assert!(task.lock.is_some()); assert!(rebuilt.file_locks.contains_key("src/lib.rs")); } + + #[test] + fn snapshot_load_defaults_missing_context_attachments() { + let tmp = TempDir::new().unwrap(); + let snapshot_store = SnapshotStore::new( + tmp.path().join("tasks.snapshot.json"), + tmp.path().join(".tasks.lock"), + ); + + let mut task_value = serde_json::to_value(sample_task("task-a")).unwrap(); + let task_object = task_value.as_object_mut().unwrap(); + task_object.remove("context_attachments"); + task_object.remove("issue_number"); + + let raw = serde_json::json!({ + "version": 1, + "generated_at": Utc::now(), + "generated_from_event_id": null, + "tasks": [task_value], + "file_locks": {} + }); + fs::write( + snapshot_store.path(), + serde_json::to_vec_pretty(&raw).unwrap(), + ) + .unwrap(); + + let snapshot = snapshot_store.load().unwrap(); + assert_eq!(snapshot.tasks.len(), 1); + assert_eq!(snapshot.tasks[0].issue_number, 0); + assert!(snapshot.tasks[0].context_attachments.is_empty()); + } }