[追加] ドリーミング: event log → 学び抽出 (#59)

This commit is contained in:
林 駿甫 (Shunsuke Hayashi) 2026-04-10 07:29:03 +09:00
parent 36eec31fa1
commit 70cd906776
7 changed files with 897 additions and 3 deletions

1
Cargo.lock generated
View file

@ -1350,6 +1350,7 @@ name = "miyabi-cli"
version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"clap",
"crossterm 0.29.0",
"miyabi-core",

View file

@ -34,6 +34,9 @@ anyhow = { workspace = true }
# Serialization
serde_json = { workspace = true }
# Utilities
chrono = { workspace = true }
# Logging
tracing = { workspace = true }
tracing-subscriber = { workspace = true }

View file

@ -1,5 +1,6 @@
//! Miyabi CLI - Main entry point
use chrono::Duration as ChronoDuration;
use clap::{Parser, Subcommand, ValueEnum};
use miyabi_core::{FeatureFlagManager, RulesLoader};
use std::collections::HashMap;
@ -201,6 +202,8 @@ enum GateCommand {
},
/// Record branch creation
Branch { task_id: String, name: String },
/// Attach task context for execution
Attach { task_id: String },
/// Record PR creation
Pr { task_id: String, number: u64 },
/// Record merge verification
@ -211,6 +214,15 @@ enum GateCommand {
Dag,
/// Show dispatchable tasks
Dispatchable,
/// Analyze recent event logs and extract learnings
Dream {
/// Analyze only recent events, e.g. 24h, 30m, 7d
#[arg(long)]
since: Option<String>,
/// Persist High learnings into docs/learnings/
#[arg(long)]
auto: bool,
},
}
/// Collab canvas subcommands — wraps the collab CLI at ~/.local/bin/collab
@ -1154,6 +1166,7 @@ fn handle_gate_command(
store_path: &std::path::Path,
command: GateCommand,
) -> anyhow::Result<i32> {
use miyabi_core::dream::write_high_learnings;
use miyabi_core::protocol::{
DeterministicExecutionProtocol, ImpactInput, ProtocolError, RegisterTaskRequest,
StatusReport,
@ -1284,6 +1297,28 @@ fn handle_gate_command(
println!("branch recorded: {} -> {}", task.id, name);
}
}),
GateCommand::Attach { task_id } => {
protocol
.attach_context(&task_id, actor, &node)
.map(|attachments| {
if matches!(format, OutputFormat::Json) {
println!("{}", serde_json::to_string_pretty(&attachments).unwrap());
} else if attachments.is_empty() {
println!("no context attachments: {}", task_id);
} else {
println!("context attachments: {}", task_id);
for attachment in attachments {
println!(
"--- [{}] {} ({} tokens)",
attachment.attachment_type,
attachment.source,
attachment.token_estimate
);
println!("{}", attachment.content);
}
}
})
}
GateCommand::Pr { task_id, number } => protocol
.record_pr(&task_id, number, actor, &node)
.map(|task| {
@ -1333,6 +1368,40 @@ fn handle_gate_command(
}
}
}),
GateCommand::Dream { since, auto } => {
let since = since
.as_deref()
.map(parse_gate_since)
.transpose()
.map_err(|error: anyhow::Error| ProtocolError::input(error.to_string()))?;
protocol.dream(since, actor, &node).and_then(|report| {
if matches!(format, OutputFormat::Json) {
println!("{}", serde_json::to_string_pretty(&report).unwrap());
} else {
print_dream_report(&report);
}
if auto {
let written = write_high_learnings(
&report,
&std::env::current_dir()
.map_err(|error| ProtocolError::input(error.to_string()))?
.join("docs")
.join("learnings"),
)
.map_err(ProtocolError::Internal)?;
if !matches!(format, OutputFormat::Json) {
if written.is_empty() {
println!("high learnings: none");
} else {
println!("high learnings written: {}", written.len());
}
}
}
Ok(())
})
}
};
Ok(match result {
@ -1356,6 +1425,82 @@ fn handle_gate_command(
})
}
fn parse_gate_since(input: &str) -> anyhow::Result<ChronoDuration> {
let trimmed = input.trim();
if trimmed.len() < 2 {
return Err(anyhow::anyhow!("invalid --since value: {trimmed}"));
}
let (number, unit) = trimmed.split_at(trimmed.len() - 1);
let value: i64 = number
.parse()
.map_err(|_| anyhow::anyhow!("invalid --since value: {trimmed}"))?;
match unit {
"s" => Ok(ChronoDuration::seconds(value)),
"m" => Ok(ChronoDuration::minutes(value)),
"h" => Ok(ChronoDuration::hours(value)),
"d" => Ok(ChronoDuration::days(value)),
_ => Err(anyhow::anyhow!(
"unsupported --since unit: {unit} (use s, m, h, d)"
)),
}
}
fn print_dream_report(report: &miyabi_core::DreamReport) {
println!("events processed: {}", report.events_processed);
if report.patterns.gate_rejections.is_empty() {
println!("gate rejections: none");
} else {
println!("gate rejections:");
let mut gates: Vec<_> = report.patterns.gate_rejections.iter().collect();
gates.sort_by(|left, right| left.0.cmp(right.0));
for (gate, count) in gates {
println!(" {} -> {}", gate, count);
}
}
if report.patterns.lock_conflicts.is_empty() {
println!("lock conflicts: none");
} else {
println!("lock conflicts:");
let mut files: Vec<_> = report.patterns.lock_conflicts.iter().collect();
files.sort_by(|left, right| left.0.cmp(right.0));
for (file, count) in files {
println!(" {} -> {}", file, count);
}
}
if report.patterns.completion_times.is_empty() {
println!("completion times: none");
} else {
println!("completion times:");
for (task_id, duration) in &report.patterns.completion_times {
println!(" {} -> {}s", task_id, duration.as_secs());
}
}
if report.learnings.is_empty() {
println!("learnings: none");
} else {
println!("learnings:");
for learning in &report.learnings {
println!(
" [{:?}] {}{}",
learning.importance,
learning.title,
learning
.related_task
.as_deref()
.map(|task| format!(" ({task})"))
.unwrap_or_default()
);
println!(" {}", learning.content);
}
}
}
fn emit_gate_error(format: &OutputFormat, kind: &str, message: &str) {
if matches!(format, OutputFormat::Json) {
println!(

View file

@ -0,0 +1,335 @@
//! Dreaming over deterministic task events to extract patterns and learnings.
use crate::error::Result;
use crate::store::{EventStore, TaskEvent, TaskEventType};
use chrono::{Duration as ChronoDuration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::time::Duration;
const DREAM_TASK_ID: &str = "__dream__";
const LONG_COMPLETION_SECS: u64 = 60 * 60;
const VERY_LONG_COMPLETION_SECS: u64 = 4 * 60 * 60;
const FREQUENT_PATTERN_THRESHOLD: usize = 2;
const HIGH_PATTERN_THRESHOLD: usize = 3;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DreamReport {
pub patterns: DreamPatterns,
pub learnings: Vec<Learning>,
pub events_processed: usize,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct DreamPatterns {
pub gate_rejections: HashMap<String, usize>,
pub lock_conflicts: HashMap<String, usize>,
pub completion_times: Vec<(String, Duration)>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Learning {
pub title: String,
pub importance: Importance,
pub content: String,
pub related_task: Option<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Importance {
High,
Medium,
Low,
}
pub fn dream(event_store: &EventStore, since: Option<ChronoDuration>) -> Result<DreamReport> {
let events = collect_events(event_store, since)?;
Ok(analyze_events(&events))
}
pub fn write_high_learnings(report: &DreamReport, directory: &Path) -> Result<Vec<PathBuf>> {
fs::create_dir_all(directory)?;
let mut written = Vec::new();
for (index, learning) in report
.learnings
.iter()
.filter(|learning| learning.importance == Importance::High)
.enumerate()
{
let timestamp = Utc::now().format("%Y%m%d-%H%M%S");
let filename = format!("{timestamp}-{index:02}-{}.md", slugify(&learning.title));
let path = directory.join(filename);
let tmp_path = path.with_extension("md.tmp");
let content = format!(
"# {}\n\n- importance: {:?}\n- related_task: {}\n\n{}\n",
learning.title,
learning.importance,
learning.related_task.as_deref().unwrap_or("none"),
learning.content
);
fs::write(&tmp_path, content)?;
fs::rename(&tmp_path, &path)?;
written.push(path);
}
Ok(written)
}
pub fn analyze_events(events: &[TaskEvent]) -> DreamReport {
let mut patterns = DreamPatterns::default();
let mut task_started_at: HashMap<&str, chrono::DateTime<Utc>> = HashMap::new();
for event in events {
match event.event_type {
TaskEventType::DagChanged => {
if event.task_id != DREAM_TASK_ID {
task_started_at
.entry(event.task_id.as_str())
.or_insert(event.ts);
}
}
TaskEventType::GateRejected => {
if let Some(gate_name) = event.payload.get("gate").and_then(|value| value.as_str())
{
*patterns
.gate_rejections
.entry(gate_name.to_string())
.or_default() += 1;
}
if let Some(files) = event
.payload
.get("files")
.and_then(|value| value.as_array())
{
for file in files.iter().filter_map(|value| value.as_str()) {
*patterns.lock_conflicts.entry(file.to_string()).or_default() += 1;
}
}
}
TaskEventType::MergeVerified => {
if let Some(started_at) = task_started_at.get(event.task_id.as_str()) {
let elapsed = event.ts.signed_duration_since(*started_at);
if let Ok(duration) = elapsed.to_std() {
patterns
.completion_times
.push((event.task_id.clone(), duration));
}
}
}
_ => {}
}
}
patterns
.completion_times
.sort_by(|left, right| left.0.cmp(&right.0));
let learnings = extract_learnings(&patterns);
DreamReport {
patterns,
learnings,
events_processed: events.len(),
}
}
fn collect_events(
event_store: &EventStore,
since: Option<ChronoDuration>,
) -> Result<Vec<TaskEvent>> {
let events = event_store.replay(None)?;
if let Some(since) = since {
let cutoff = Utc::now() - since;
return Ok(events
.into_iter()
.filter(|event| event.ts >= cutoff)
.collect());
}
let start_index = events
.iter()
.rposition(|event| event.event_type == TaskEventType::DreamRecorded)
.map_or(0, |index| index + 1);
Ok(events.into_iter().skip(start_index).collect())
}
fn extract_learnings(patterns: &DreamPatterns) -> Vec<Learning> {
let mut learnings = Vec::new();
let mut gate_entries: Vec<_> = patterns.gate_rejections.iter().collect();
gate_entries.sort_by(|left, right| left.0.cmp(right.0));
for (gate_name, count) in gate_entries {
if *count >= FREQUENT_PATTERN_THRESHOLD {
learnings.push(Learning {
title: format!("{gate_name} の拒否が多発"),
importance: if *count >= HIGH_PATTERN_THRESHOLD {
Importance::High
} else {
Importance::Medium
},
content: format!(
"{gate_name} の拒否が {count} 回発生しています。手順書の改善が必要です。"
),
related_task: None,
});
}
}
let mut lock_entries: Vec<_> = patterns.lock_conflicts.iter().collect();
lock_entries.sort_by(|left, right| left.0.cmp(right.0));
for (file, count) in lock_entries {
learnings.push(Learning {
title: format!("ロック競合が発生: {file}"),
importance: if *count >= HIGH_PATTERN_THRESHOLD {
Importance::High
} else {
Importance::Medium
},
content: format!(
"{file} でロック競合が {count} 回発生しました。ファイル分割を検討してください。"
),
related_task: None,
});
}
for (task_id, duration) in &patterns.completion_times {
if duration.as_secs() >= LONG_COMPLETION_SECS {
learnings.push(Learning {
title: format!("完了時間が長い: {task_id}"),
importance: if duration.as_secs() >= VERY_LONG_COMPLETION_SECS {
Importance::High
} else {
Importance::Medium
},
content: format!(
"{task_id} は完了まで {} 秒かかりました。見積もり精度の改善が必要です。",
duration.as_secs()
),
related_task: Some(task_id.clone()),
});
}
}
learnings
}
fn slugify(title: &str) -> String {
let slug: String = title
.chars()
.map(|ch| if ch.is_ascii_alphanumeric() { ch } else { '-' })
.collect();
let slug = slug.trim_matches('-').to_ascii_lowercase();
if slug.is_empty() {
"learning".to_string()
} else {
slug
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::store::TaskEvent;
use chrono::{Duration as ChronoDuration, TimeZone};
use serde_json::json;
fn event(
event_type: TaskEventType,
task_id: &str,
ts_offset_sec: i64,
payload: serde_json::Value,
) -> TaskEvent {
let base = Utc.with_ymd_and_hms(2026, 4, 10, 0, 0, 0).unwrap();
TaskEvent {
id: format!("{task_id}-{ts_offset_sec}-{:?}", event_type),
ts: base + ChronoDuration::seconds(ts_offset_sec),
event_type,
task_id: task_id.to_string(),
agent: "test".to_string(),
node: "test".to_string(),
payload,
version: 1,
}
}
#[test]
fn dream_with_mix_of_events_produces_correct_patterns() {
let events = vec![
event(TaskEventType::DagChanged, "task-a", 0, json!({})),
event(
TaskEventType::GateRejected,
"task-a",
60,
json!({"gate": "GATE 3", "files": ["src/shared.rs"]}),
),
event(
TaskEventType::GateRejected,
"task-b",
120,
json!({"gate": "GATE 4", "files": ["src/shared.rs", "src/other.rs"]}),
),
event(TaskEventType::MergeVerified, "task-a", 7200, json!({})),
];
let report = analyze_events(&events);
assert_eq!(report.events_processed, 4);
assert_eq!(report.patterns.gate_rejections.get("GATE 3"), Some(&1));
assert_eq!(report.patterns.gate_rejections.get("GATE 4"), Some(&1));
assert_eq!(
report.patterns.lock_conflicts.get("src/shared.rs"),
Some(&2)
);
assert_eq!(report.patterns.lock_conflicts.get("src/other.rs"), Some(&1));
assert_eq!(report.patterns.completion_times.len(), 1);
assert_eq!(report.patterns.completion_times[0].0, "task-a");
assert_eq!(
report.patterns.completion_times[0].1,
Duration::from_secs(7200)
);
}
#[test]
fn learning_extraction_from_gate_rejected_events() {
let events = vec![
event(
TaskEventType::GateRejected,
"task-a",
0,
json!({"gate": "GATE 3"}),
),
event(
TaskEventType::GateRejected,
"task-b",
10,
json!({"gate": "GATE 3"}),
),
event(
TaskEventType::GateRejected,
"task-c",
20,
json!({"gate": "GATE 3"}),
),
];
let report = analyze_events(&events);
assert!(report.learnings.iter().any(|learning| {
learning.title.contains("GATE 3")
&& learning.importance == Importance::High
&& learning.content.contains("手順書の改善が必要")
}));
}
#[test]
fn empty_events_produce_empty_report() {
let report = analyze_events(&[]);
assert_eq!(report.events_processed, 0);
assert!(report.patterns.gate_rejections.is_empty());
assert!(report.patterns.lock_conflicts.is_empty());
assert!(report.patterns.completion_times.is_empty());
assert!(report.learnings.is_empty());
}
}

View file

@ -8,6 +8,7 @@ pub mod cache;
pub mod config;
pub mod conversation;
pub mod dag;
pub mod dream;
pub mod error;
pub mod error_policy;
pub mod feature_flags;
@ -98,6 +99,7 @@ pub mod openclaw;
pub use dag::{
DAGError, Task as DAGTask, TaskGraph, TaskGraphBuilder, TaskId, TaskLevel, TaskNode,
};
pub use dream::{DreamPatterns, DreamReport, Importance, Learning};
pub use openclaw::{AgentInfo, OpenClawClient, OpenClawError, OpenClawResult};
pub use orchestration::{
create_orchestrator, Orchestrator, OrchestratorTask, ParallelConfig, ParallelResult, TaskResult,

View file

@ -1,19 +1,25 @@
//! Deterministic execution protocol entry point.
use crate::dream::DreamReport;
use crate::error::Error;
use crate::gate::{evaluate_gate, validate_branch_name, Gate, GateContext, GateReport};
use crate::lock::{FileLockManager, LeaseConfig, LockConflict};
use crate::store::{
CompletionMode, EventStore, ExecutionTask, GitHubEvidence, GitHubIssueState, GitHubPrState,
HumanApproval, ImpactRiskLevel, ReviewDecision, SnapshotStore, TaskEvent, TaskEventType,
TaskImpact, TaskState, TasksSnapshot,
CompletionMode, ContextAttachment, EventStore, ExecutionTask, GitHubEvidence, GitHubIssueState,
GitHubPrState, HumanApproval, ImpactRiskLevel, ReviewDecision, SnapshotStore, TaskEvent,
TaskEventType, TaskImpact, TaskState, TasksSnapshot,
};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::fs;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
const MAX_CONTEXT_TOKENS: usize = 4_000;
const FILE_SNIPPET_LINE_LIMIT: usize = 30;
#[derive(Debug, Clone)]
pub struct DeterministicExecutionProtocol {
event_store: EventStore,
@ -132,6 +138,7 @@ impl DeterministicExecutionProtocol {
}
let mut task = ExecutionTask::new(&request.task_id, request.title);
task.issue_number = request.issue;
task.current_state = TaskState::Pending;
task.dependencies = request.dependencies;
task.soft_dependencies = request.soft_dependencies;
@ -222,6 +229,21 @@ impl DeterministicExecutionProtocol {
.has_conflict(files)
.map_err(ProtocolError::from)?;
if conflict.conflicting {
self.append_event(
task_id,
TaskEventType::GateRejected,
agent,
node,
snapshot.version + 1,
serde_json::json!({
"gate": Gate::Gate4.label(),
"detail": format!(
"lock conflict held by {}",
conflict.held_by.as_deref().unwrap_or("unknown")
),
"files": files,
}),
)?;
return Err(ProtocolError::gate_rejected(format!(
"lock conflict held by {}",
conflict.held_by.unwrap_or_else(|| "unknown".to_string())
@ -231,6 +253,7 @@ impl DeterministicExecutionProtocol {
self.lock_manager
.acquire_lock(task_id, agent, node, files)
.map_err(ProtocolError::from)?;
self.attach_context(task_id, agent, node)?;
let task = self.transition_task(task_id, TaskState::Implementing, agent, node)?;
Ok(AssignmentResult {
@ -389,6 +412,43 @@ impl DeterministicExecutionProtocol {
Ok(compute_dag(&snapshot))
}
pub fn dream(
&self,
since: Option<chrono::Duration>,
actor: &str,
node: &str,
) -> ProtocolResult<DreamReport> {
let report = crate::dream::dream(&self.event_store, since).map_err(ProtocolError::from)?;
let version = self
.snapshot_store
.load()
.map_err(ProtocolError::from)?
.version;
self.append_event(
"__dream__",
TaskEventType::DreamRecorded,
actor,
node,
version,
serde_json::json!({
"events_processed": report.events_processed,
"gate_rejections": report.patterns.gate_rejections.len(),
"lock_conflicts": report.patterns.lock_conflicts.len(),
"learnings": report.learnings.len(),
}),
)?;
Ok(report)
}
pub fn attach_context(
&self,
task_id: &str,
actor: &str,
node: &str,
) -> ProtocolResult<Vec<ContextAttachment>> {
self.attach_context_with_limit(task_id, actor, node, MAX_CONTEXT_TOKENS)
}
pub fn dispatchable(&self) -> ProtocolResult<DispatchableReport> {
let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?;
let tasks = snapshot
@ -413,6 +473,33 @@ impl DeterministicExecutionProtocol {
Ok(DispatchableReport { tasks })
}
fn attach_context_with_limit(
&self,
task_id: &str,
actor: &str,
node: &str,
max_context_tokens: usize,
) -> ProtocolResult<Vec<ContextAttachment>> {
let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?;
let task = snapshot
.get_task(task_id)
.cloned()
.ok_or_else(|| ProtocolError::input(format!("unknown task: {task_id}")))?;
let attachments = self.build_context_attachments(&task, max_context_tokens)?;
let payload = serde_json::to_value(&attachments).map_err(Error::from)?;
self.update_task(
task_id,
actor,
node,
TaskEventType::ContextAttached,
|task| {
task.context_attachments = attachments.clone();
Ok(payload.clone())
},
)?;
Ok(attachments)
}
fn transition_task(
&self,
task_id: &str,
@ -514,6 +601,87 @@ impl DeterministicExecutionProtocol {
)
}
fn build_context_attachments(
&self,
task: &ExecutionTask,
max_context_tokens: usize,
) -> ProtocolResult<Vec<ContextAttachment>> {
let mut attachments = Vec::new();
let mut remaining_tokens = max_context_tokens;
if task.issue_number > 0 {
push_attachment(
&mut attachments,
&mut remaining_tokens,
"issue",
&format!("github://issue/{}", task.issue_number),
&format!("Issue #{}", task.issue_number),
);
}
if let Some(impact) = &task.impact {
push_attachment(
&mut attachments,
&mut remaining_tokens,
"impact",
&format!("dtp://impact/{}", task.id),
&format!(
"risk_level: {:?}\naffected_symbols: {}",
impact.risk_level, impact.affected_symbols
),
);
}
if let Some(lock) = &task.lock {
for file in &lock.affected_files {
if remaining_tokens == 0 {
break;
}
let source_path = self.resolve_attachment_path(file);
if !source_path.exists() {
continue;
}
let content = read_file_snippet(&source_path, FILE_SNIPPET_LINE_LIMIT)
.map_err(ProtocolError::from)?;
push_attachment(
&mut attachments,
&mut remaining_tokens,
"file_snippet",
&source_path.display().to_string(),
&content,
);
}
}
Ok(attachments)
}
fn resolve_attachment_path(&self, source: &str) -> PathBuf {
let path = PathBuf::from(source);
if path.is_absolute() {
return path;
}
let store_relative = self
.snapshot_store
.path()
.parent()
.map(|base| base.join(path))
.unwrap_or_else(|| PathBuf::from(source));
if store_relative.exists() {
return store_relative;
}
let cwd_relative = std::env::current_dir()
.map(|cwd| cwd.join(source))
.unwrap_or_else(|_| PathBuf::from(source));
if cwd_relative.exists() {
return cwd_relative;
}
store_relative
}
fn unblock_dependents_after_merge(
&self,
task_id: &str,
@ -660,6 +828,73 @@ fn dependencies_satisfied(task: &ExecutionTask, snapshot: &TasksSnapshot) -> boo
})
}
fn push_attachment(
attachments: &mut Vec<ContextAttachment>,
remaining_tokens: &mut usize,
attachment_type: &str,
source: &str,
content: &str,
) {
if *remaining_tokens == 0 {
return;
}
let truncated_content = truncate_to_token_budget(content, *remaining_tokens);
let token_estimate = estimate_tokens(&truncated_content);
if token_estimate == 0 {
return;
}
*remaining_tokens = remaining_tokens.saturating_sub(token_estimate);
attachments.push(ContextAttachment {
attachment_type: attachment_type.to_string(),
source: source.to_string(),
content: truncated_content,
token_estimate,
});
}
fn truncate_to_token_budget(content: &str, max_tokens: usize) -> String {
if max_tokens == 0 {
return String::new();
}
let max_chars = max_tokens.saturating_mul(4);
let content_chars = content.chars().count();
if content_chars <= max_chars {
return content.to_string();
}
let truncated: String = content.chars().take(max_chars).collect();
if max_chars >= 3 {
format!(
"{}...",
truncated.chars().take(max_chars - 3).collect::<String>()
)
} else {
truncated
}
}
fn estimate_tokens(content: &str) -> usize {
let char_count = content.chars().count();
if char_count == 0 {
0
} else {
char_count.div_ceil(4)
}
}
fn read_file_snippet(path: &Path, max_lines: usize) -> Result<String, Error> {
let file = fs::File::open(path)?;
let reader = BufReader::new(file);
let mut lines = Vec::new();
for line in reader.lines().take(max_lines) {
lines.push(line?);
}
Ok(lines.join("\n"))
}
fn recompute_dependents(snapshot: &mut TasksSnapshot) {
let mut dependents: HashMap<String, Vec<String>> = HashMap::new();
for task in &snapshot.tasks {
@ -733,6 +968,7 @@ fn compute_dag(snapshot: &TasksSnapshot) -> DagReport {
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use tempfile::TempDir;
fn fixture() -> (TempDir, DeterministicExecutionProtocol) {
@ -919,6 +1155,128 @@ mod tests {
assert!(protocol.locks().unwrap().is_empty());
}
#[test]
fn attach_context_collects_issue_impact_and_file_snippets() {
let (tmp, protocol) = fixture();
let src_dir = tmp.path().join("src");
fs::create_dir_all(&src_dir).unwrap();
let file_path = src_dir.join("lib.rs");
let file_content = (1..=35)
.map(|line| format!("line {line}"))
.collect::<Vec<_>>()
.join("\n");
fs::write(&file_path, file_content).unwrap();
protocol
.register(
RegisterTaskRequest {
issue: 42,
task_id: "phase-a".into(),
title: "Phase A".into(),
dependencies: vec![],
soft_dependencies: vec![],
priority: 0,
completion_mode: CompletionMode::GithubPr,
},
"codex",
"macbook",
)
.unwrap();
protocol
.record_impact(
"phase-a",
ImpactInput {
risk_level: ImpactRiskLevel::Low,
affected_symbols: 7,
depth1: vec!["attach_context".into()],
analyzed_commit: None,
input_hash: None,
approve: false,
},
"codex",
"macbook",
)
.unwrap();
protocol
.assign("phase-a", "codex", "macbook", &[String::from("src/lib.rs")])
.unwrap();
let attachments = protocol
.attach_context("phase-a", "codex", "macbook")
.unwrap();
assert_eq!(attachments.len(), 3);
assert_eq!(attachments[0].attachment_type, "issue");
assert_eq!(attachments[0].content, "Issue #42");
assert_eq!(attachments[1].attachment_type, "impact");
assert!(attachments[1].content.contains("affected_symbols: 7"));
assert_eq!(attachments[2].attachment_type, "file_snippet");
assert!(attachments[2].content.contains("line 1"));
assert!(attachments[2].content.contains("line 30"));
assert!(!attachments[2].content.contains("line 31"));
let task = match protocol.status(Some("phase-a")).unwrap() {
StatusReport::Task(task) => task,
StatusReport::Snapshot(_) => panic!("expected task status"),
};
assert_eq!(task.issue_number, 42);
assert_eq!(task.context_attachments.len(), 3);
}
#[test]
fn attach_context_trims_to_token_budget() {
let (tmp, protocol) = fixture();
let src_dir = tmp.path().join("src");
fs::create_dir_all(&src_dir).unwrap();
fs::write(
src_dir.join("big.rs"),
"abcdefghijklmnopqrstuvwxyz".repeat(40),
)
.unwrap();
protocol
.register(
RegisterTaskRequest {
issue: 9,
task_id: "phase-a".into(),
title: "Phase A".into(),
dependencies: vec![],
soft_dependencies: vec![],
priority: 0,
completion_mode: CompletionMode::GithubPr,
},
"codex",
"macbook",
)
.unwrap();
protocol
.record_impact(
"phase-a",
ImpactInput {
risk_level: ImpactRiskLevel::Low,
affected_symbols: 99,
depth1: vec!["attach_context".into()],
analyzed_commit: None,
input_hash: None,
approve: false,
},
"codex",
"macbook",
)
.unwrap();
protocol
.assign("phase-a", "codex", "macbook", &[String::from("src/big.rs")])
.unwrap();
let attachments = protocol
.attach_context_with_limit("phase-a", "codex", "macbook", 8)
.unwrap();
let total_tokens: usize = attachments.iter().map(|item| item.token_estimate).sum();
assert!(total_tokens <= 8);
assert!(!attachments.is_empty());
}
#[test]
fn assign_rejects_high_risk_without_human_approval() {
let (_tmp, protocol) = fixture();

View file

@ -106,10 +106,20 @@ pub struct TaskLockSnapshot {
pub affected_files: Vec<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ContextAttachment {
pub attachment_type: String,
pub source: String,
pub content: String,
pub token_estimate: usize,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ExecutionTask {
pub id: String,
pub title: String,
#[serde(default)]
pub issue_number: u64,
pub current_state: TaskState,
pub dependencies: Vec<String>,
pub dependents: Vec<String>,
@ -120,6 +130,8 @@ pub struct ExecutionTask {
pub github_evidence: Option<GitHubEvidence>,
pub completion_mode: CompletionMode,
pub human_approval: Option<HumanApproval>,
#[serde(default)]
pub context_attachments: Vec<ContextAttachment>,
pub priority: u32,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
@ -131,6 +143,7 @@ impl ExecutionTask {
Self {
id: id.into(),
title: title.into(),
issue_number: 0,
current_state: TaskState::Draft,
dependencies: Vec::new(),
dependents: Vec::new(),
@ -141,6 +154,7 @@ impl ExecutionTask {
github_evidence: None,
completion_mode: CompletionMode::GithubPr,
human_approval: None,
context_attachments: Vec::new(),
priority: 0,
created_at: now,
updated_at: now,
@ -164,7 +178,9 @@ pub enum TaskEventType {
BranchCreated,
PrCreated,
MergeVerified,
ContextAttached,
AuditRecorded,
DreamRecorded,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
@ -587,6 +603,7 @@ impl LegacyTask {
ExecutionTask {
id: self.id,
title: self.title,
issue_number: 0,
current_state: legacy_state(&self.state),
dependencies: self.dependencies,
dependents: self.dependents,
@ -597,6 +614,7 @@ impl LegacyTask {
github_evidence,
completion_mode: CompletionMode::GithubPr,
human_approval: None,
context_attachments: Vec::new(),
priority: 0,
created_at,
updated_at,
@ -771,4 +789,36 @@ mod tests {
assert!(task.lock.is_some());
assert!(rebuilt.file_locks.contains_key("src/lib.rs"));
}
#[test]
fn snapshot_load_defaults_missing_context_attachments() {
let tmp = TempDir::new().unwrap();
let snapshot_store = SnapshotStore::new(
tmp.path().join("tasks.snapshot.json"),
tmp.path().join(".tasks.lock"),
);
let mut task_value = serde_json::to_value(sample_task("task-a")).unwrap();
let task_object = task_value.as_object_mut().unwrap();
task_object.remove("context_attachments");
task_object.remove("issue_number");
let raw = serde_json::json!({
"version": 1,
"generated_at": Utc::now(),
"generated_from_event_id": null,
"tasks": [task_value],
"file_locks": {}
});
fs::write(
snapshot_store.path(),
serde_json::to_vec_pretty(&raw).unwrap(),
)
.unwrap();
let snapshot = snapshot_store.load().unwrap();
assert_eq!(snapshot.tasks.len(), 1);
assert_eq!(snapshot.tasks[0].issue_number, 0);
assert!(snapshot.tasks[0].context_attachments.is_empty());
}
}