[追加] Sprint 5: Issue自動作成 + git同期 + Telegram + Heartbeat + Obsidian検索 (#77-81)

This commit is contained in:
林 駿甫 (Shunsuke Hayashi) 2026-04-10 09:48:49 +09:00
parent f5fe5f36cf
commit 17ad33a96a
3 changed files with 581 additions and 26 deletions

View file

@ -213,6 +213,8 @@ enum GateCommand {
Branch { task_id: String, name: String },
/// Attach task context for execution
Attach { task_id: String },
/// Force-refresh task context attachments
Refresh { task_id: String },
/// Record PR creation
Pr { task_id: String, number: u64 },
/// Record merge verification
@ -263,6 +265,12 @@ enum GateCommand {
#[arg(long)]
auto: bool,
},
/// Renew active lock heartbeats
Heartbeat {
/// Renew all implementing task leases
#[arg(long)]
all: bool,
},
}
#[derive(Debug)]
@ -1405,6 +1413,28 @@ fn handle_gate_command(
}
})
}
GateCommand::Refresh { task_id } => {
protocol
.refresh_context(&task_id, actor, &node)
.map(|attachments| {
if matches!(format, OutputFormat::Json) {
println!("{}", serde_json::to_string_pretty(&attachments).unwrap());
} else if attachments.is_empty() {
println!("no context attachments: {}", task_id);
} else {
println!("context refreshed: {}", task_id);
for attachment in attachments {
println!(
"--- [{}] {} ({} tokens)",
attachment.attachment_type,
attachment.source,
attachment.token_estimate
);
println!("{}", attachment.content);
}
}
})
}
GateCommand::Pr { task_id, number } => protocol
.record_pr(&task_id, number, actor, &node)
.map(|task| {
@ -1541,6 +1571,29 @@ fn handle_gate_command(
Ok(())
})
}
GateCommand::Heartbeat { all } => {
if !all {
Err(ProtocolError::input("heartbeat currently requires --all"))
} else {
protocol.heartbeat_all().map(|renewed| {
if matches!(format, OutputFormat::Json) {
println!(
"{}",
serde_json::to_string_pretty(&serde_json::json!({
"renewed": renewed,
"count": renewed.len(),
}))
.unwrap()
);
} else {
println!("renewed leases: {}", renewed.len());
for task_id in renewed {
println!(" {}", task_id);
}
}
})
}
}
};
Ok(match result {

View file

@ -11,6 +11,7 @@ use crate::store::{
};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::{HashMap, VecDeque};
use std::fs;
use std::io::{BufRead, BufReader};
@ -21,6 +22,7 @@ use std::time::{Duration, Instant};
const MAX_CONTEXT_TOKENS: usize = 4_000;
const FILE_SNIPPET_LINE_LIMIT: usize = 30;
const ANNOUNCE_CHAR_LIMIT: usize = 180;
const MAX_ATTACHMENT_AGE_HOURS: i64 = 24;
#[derive(Debug, Clone)]
pub struct DeterministicExecutionProtocol {
@ -128,9 +130,6 @@ impl DeterministicExecutionProtocol {
actor: &str,
node: &str,
) -> ProtocolResult<ExecutionTask> {
if request.issue == 0 {
return Err(ProtocolError::input("issue number must be greater than 0"));
}
let mut snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?;
if snapshot.get_task(&request.task_id).is_some() {
return Err(ProtocolError::input(format!(
@ -140,7 +139,11 @@ impl DeterministicExecutionProtocol {
}
let mut task = ExecutionTask::new(&request.task_id, request.title);
task.issue_number = request.issue;
task.issue_number = if request.issue > 0 {
request.issue
} else {
auto_create_issue(&task.title).unwrap_or(0)
};
task.current_state = TaskState::Pending;
task.dependencies = request.dependencies;
task.soft_dependencies = request.soft_dependencies;
@ -538,6 +541,30 @@ impl DeterministicExecutionProtocol {
Ok(snapshot.file_locks)
}
pub fn heartbeat_all(&self) -> ProtocolResult<Vec<String>> {
let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?;
let mut renewed = Vec::new();
for task in snapshot
.tasks
.iter()
.filter(|task| task.current_state == TaskState::Implementing && task.lock.is_some())
{
let Some(lock) = task.lock.as_ref() else {
continue;
};
let Some((agent, node)) = parse_lock_owner(&lock.locked_by) else {
continue;
};
self.lock_manager
.renew_lease(&task.id, agent, node)
.map_err(ProtocolError::from)?;
renewed.push(task.id.clone());
}
Ok(renewed)
}
pub fn dag(&self) -> ProtocolResult<DagReport> {
let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?;
Ok(compute_dag(&snapshot))
@ -583,6 +610,19 @@ impl DeterministicExecutionProtocol {
self.attach_context_with_limit(task_id, actor, node, MAX_CONTEXT_TOKENS)
}
pub fn refresh_context(
&self,
task_id: &str,
actor: &str,
node: &str,
) -> ProtocolResult<Vec<ContextAttachment>> {
self.update_task(task_id, actor, node, TaskEventType::AuditRecorded, |task| {
task.context_attachments.clear();
Ok(serde_json::json!({ "action": "refresh_context" }))
})?;
self.attach_context(task_id, actor, node)
}
pub fn dispatchable(&self) -> ProtocolResult<DispatchableReport> {
let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?;
let tasks = snapshot
@ -619,6 +659,17 @@ impl DeterministicExecutionProtocol {
.get_task(task_id)
.cloned()
.ok_or_else(|| ProtocolError::input(format!("unknown task: {task_id}")))?;
let fresh_attachments = fresh_attachments(&task.context_attachments);
let total_tokens: usize = fresh_attachments
.iter()
.map(|item| item.token_estimate)
.sum();
if !fresh_attachments.is_empty()
&& fresh_attachments.len() == task.context_attachments.len()
&& total_tokens <= max_context_tokens
{
return Ok(fresh_attachments);
}
let attachments = self.build_context_attachments(&task, max_context_tokens)?;
let payload = serde_json::to_value(&attachments).map_err(Error::from)?;
self.update_task(
@ -704,6 +755,9 @@ impl DeterministicExecutionProtocol {
version,
})
.map_err(ProtocolError::from)?;
if event_type == TaskEventType::GateRejected {
notify(&format!("gate rejected: {task_id}"));
}
Ok(())
}
@ -918,6 +972,8 @@ impl DeterministicExecutionProtocol {
.release_lock(task_id)
.map_err(ProtocolError::from)?;
self.unblock_dependents_after_merge(task_id, actor, node)?;
sync_tasks_json(self.snapshot_store.path());
notify(&format!("merged: {task_id}"));
let snapshot = self.snapshot_store.load().map_err(ProtocolError::from)?;
Ok(snapshot.get_task(task_id).cloned().unwrap_or(merged))
@ -1049,9 +1105,21 @@ fn push_attachment(
source: source.to_string(),
content: truncated_content,
token_estimate,
attached_at: Utc::now(),
});
}
fn fresh_attachments(attachments: &[ContextAttachment]) -> Vec<ContextAttachment> {
let now = Utc::now();
attachments
.iter()
.filter(|attachment| {
attachment.attached_at >= now - chrono::Duration::hours(MAX_ATTACHMENT_AGE_HOURS)
})
.cloned()
.collect()
}
fn truncate_to_token_budget(content: &str, max_tokens: usize) -> String {
if max_tokens == 0 {
return String::new();
@ -1088,7 +1156,7 @@ fn is_valid_merge_sha(sha: &str) -> bool {
}
fn fetch_merge_verification(repo: &str, pr_number: u64) -> Result<Option<GhPrViewResponse>, Error> {
let output = match Command::new("gh")
let output = match gh_command()
.args([
"pr",
"view",
@ -1148,6 +1216,156 @@ fn read_file_snippet(path: &Path, max_lines: usize) -> Result<String, Error> {
Ok(lines.join("\n"))
}
fn auto_create_issue(title: &str) -> Option<u64> {
let repo = github_repository_slug()?;
let output = gh_command()
.args([
"issue",
"create",
"--repo",
repo.as_str(),
"--title",
title,
"--json",
"number",
])
.output()
.ok()?;
if !output.status.success() {
eprintln!("warning: failed to auto-create GitHub issue");
return None;
}
let payload: Value = serde_json::from_slice(&output.stdout).ok()?;
payload.get("number").and_then(Value::as_u64)
}
fn gh_command() -> Command {
Command::new(std::env::var("MIYABI_GH_BIN").unwrap_or_else(|_| "gh".to_string()))
}
fn github_repository_slug() -> Option<String> {
std::env::var("GITHUB_REPOSITORY")
.ok()
.filter(|value| !value.trim().is_empty())
.or_else(|| {
std::env::var("REPOSITORY")
.ok()
.filter(|value| !value.trim().is_empty())
})
.or_else(git_origin_github_remote)
}
fn git_origin_github_remote() -> Option<String> {
let output = Command::new("git")
.args(["remote", "get-url", "origin"])
.output()
.ok()?;
if !output.status.success() {
return None;
}
parse_github_remote(&String::from_utf8_lossy(&output.stdout))
}
fn parse_github_remote(remote: &str) -> Option<String> {
let trimmed = remote.trim();
let slug = if let Some(rest) = trimmed.strip_prefix("git@github.com:") {
rest
} else if let Some(index) = trimmed.find("github.com/") {
&trimmed[(index + "github.com/".len())..]
} else {
return None;
};
let slug = slug.trim_end_matches(".git").trim_matches('/');
if slug.is_empty() {
None
} else {
Some(slug.to_string())
}
}
fn parse_lock_owner(locked_by: &str) -> Option<(&str, &str)> {
locked_by.split_once('@')
}
fn sync_tasks_json(store_path: &Path) {
if cfg!(test) {
return;
}
let relative = std::env::current_dir()
.ok()
.and_then(|cwd| store_path.strip_prefix(&cwd).ok().map(Path::to_path_buf))
.unwrap_or_else(|| store_path.to_path_buf());
let add_status = Command::new("git").arg("add").arg(&relative).status();
match add_status {
Ok(status) if status.success() => {}
_ => {
eprintln!(
"warning: failed to stage {} for git sync",
relative.display()
);
return;
}
}
let commit_status = Command::new("git")
.args(["commit", "-m", "[自動] tasks.json sync"])
.status();
match commit_status {
Ok(status) if status.success() => {}
_ => {
eprintln!("warning: failed to commit tasks.json sync");
return;
}
}
let push_status = Command::new("git").arg("push").status();
if !matches!(push_status, Ok(status) if status.success()) {
eprintln!("warning: failed to push tasks.json sync");
}
}
fn notify(message: &str) {
if cfg!(test) {
return;
}
let Some(token) = std::env::var("TELEGRAM_BOT_TOKEN")
.ok()
.filter(|value| !value.trim().is_empty())
else {
return;
};
let Some(chat_id) = std::env::var("TELEGRAM_CHAT_ID")
.ok()
.filter(|value| !value.trim().is_empty())
else {
return;
};
let url = format!("https://api.telegram.org/bot{token}/sendMessage");
let status = Command::new("curl")
.args([
"-sS",
"-X",
"POST",
url.as_str(),
"-d",
&format!("chat_id={chat_id}"),
"-d",
&format!("text={message}"),
])
.status();
let Ok(status) = status else {
eprintln!("warning: telegram notification could not start curl");
return;
};
if !status.success() {
eprintln!("warning: telegram notification failed");
}
}
fn obsidian_vault_path() -> Option<PathBuf> {
std::env::var("OBSIDIAN_VAULT_PATH")
.ok()
@ -1306,7 +1524,9 @@ fn compute_dag(snapshot: &TasksSnapshot) -> DagReport {
#[cfg(test)]
mod tests {
use super::*;
use std::ffi::OsString;
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::sync::{Mutex, OnceLock};
use tempfile::TempDir;
@ -1315,7 +1535,7 @@ mod tests {
LOCK.get_or_init(|| Mutex::new(()))
}
fn path_env_lock() -> &'static Mutex<()> {
fn gh_env_lock() -> &'static Mutex<()> {
static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
LOCK.get_or_init(|| Mutex::new(()))
}
@ -1327,6 +1547,37 @@ mod tests {
(tmp, protocol)
}
struct EnvVarGuard {
key: &'static str,
original: Option<OsString>,
}
impl EnvVarGuard {
fn set(key: &'static str, value: impl AsRef<std::ffi::OsStr>) -> Self {
let original = std::env::var_os(key);
std::env::set_var(key, value);
Self { key, original }
}
}
impl Drop for EnvVarGuard {
fn drop(&mut self) {
if let Some(value) = &self.original {
std::env::set_var(self.key, value);
} else {
std::env::remove_var(self.key);
}
}
}
fn install_mock_command(dir: &Path, name: &str, script: &str) {
let path = dir.join(name);
fs::write(&path, script).unwrap();
let mut permissions = fs::metadata(&path).unwrap().permissions();
permissions.set_mode(0o755);
fs::set_permissions(&path, permissions).unwrap();
}
#[test]
fn protocol_stops_at_first_failed_gate_and_records_events() {
let (_tmp, protocol) = fixture();
@ -1408,9 +1659,16 @@ mod tests {
}
#[test]
fn register_rejects_issue_zero() {
let (_tmp, protocol) = fixture();
let err = protocol
fn register_with_issue_zero_continues_without_issue_when_gh_fails() {
let _guard = gh_env_lock().lock().unwrap_or_else(|err| err.into_inner());
let (tmp, protocol) = fixture();
let bin_dir = tmp.path().join("bin");
fs::create_dir_all(&bin_dir).unwrap();
let gh_path = bin_dir.join("gh");
install_mock_command(&bin_dir, "gh", "#!/bin/sh\nexit 1\n");
let _gh_guard = EnvVarGuard::set("MIYABI_GH_BIN", &gh_path);
let task = protocol
.register(
RegisterTaskRequest {
issue: 0,
@ -1424,12 +1682,39 @@ mod tests {
"codex",
"macbook",
)
.unwrap_err();
.unwrap();
assert!(matches!(err, ProtocolError::Input(_)));
assert!(err
.to_string()
.contains("issue number must be greater than 0"));
assert_eq!(task.issue_number, 0);
}
#[test]
fn register_with_issue_zero_uses_auto_created_issue_number() {
let _guard = gh_env_lock().lock().unwrap_or_else(|err| err.into_inner());
let (tmp, protocol) = fixture();
let bin_dir = tmp.path().join("bin");
fs::create_dir_all(&bin_dir).unwrap();
let gh_path = bin_dir.join("gh");
install_mock_command(&bin_dir, "gh", "#!/bin/sh\nprintf '{\"number\": 78}'\n");
let _gh_guard = EnvVarGuard::set("MIYABI_GH_BIN", &gh_path);
let _repo_guard = EnvVarGuard::set("GITHUB_REPOSITORY", "owner/repo");
let task = protocol
.register(
RegisterTaskRequest {
issue: 0,
task_id: "issue-auto".into(),
title: "Auto Issue".into(),
dependencies: vec![],
soft_dependencies: vec![],
priority: 0,
completion_mode: CompletionMode::GithubPr,
},
"codex",
"macbook",
)
.unwrap();
assert_eq!(task.issue_number, 78);
}
#[test]
@ -1506,7 +1791,9 @@ mod tests {
#[test]
fn attach_context_collects_issue_impact_and_file_snippets() {
let _guard = obsidian_env_lock().lock().unwrap();
let _guard = obsidian_env_lock()
.lock()
.unwrap_or_else(|err| err.into_inner());
std::env::remove_var("OBSIDIAN_VAULT_PATH");
let (tmp, protocol) = fixture();
let src_dir = tmp.path().join("src");
@ -1584,7 +1871,9 @@ mod tests {
#[test]
fn attach_context_trims_to_token_budget() {
let _guard = obsidian_env_lock().lock().unwrap();
let _guard = obsidian_env_lock()
.lock()
.unwrap_or_else(|err| err.into_inner());
std::env::remove_var("OBSIDIAN_VAULT_PATH");
let (tmp, protocol) = fixture();
let src_dir = tmp.path().join("src");
@ -1640,7 +1929,9 @@ mod tests {
#[test]
fn attach_context_with_obsidian_vault_finds_matching_notes() {
let _guard = obsidian_env_lock().lock().unwrap();
let _guard = obsidian_env_lock()
.lock()
.unwrap_or_else(|err| err.into_inner());
let (tmp, protocol) = fixture();
let src_dir = tmp.path().join("src");
let vault_dir = tmp.path().join("vault");
@ -1704,6 +1995,128 @@ mod tests {
}));
}
#[test]
fn attach_context_rebuilds_stale_attachments() {
let _guard = obsidian_env_lock()
.lock()
.unwrap_or_else(|err| err.into_inner());
std::env::remove_var("OBSIDIAN_VAULT_PATH");
let (tmp, protocol) = fixture();
let src_dir = tmp.path().join("src");
fs::create_dir_all(&src_dir).unwrap();
fs::write(src_dir.join("lib.rs"), "line 1\nline 2\n").unwrap();
protocol
.register(
RegisterTaskRequest {
issue: 7,
task_id: "phase-a".into(),
title: "Phase A".into(),
dependencies: vec![],
soft_dependencies: vec![],
priority: 0,
completion_mode: CompletionMode::GithubPr,
},
"codex",
"macbook",
)
.unwrap();
protocol
.record_impact(
"phase-a",
ImpactInput {
risk_level: ImpactRiskLevel::Low,
affected_symbols: 1,
depth1: vec!["attach_context".into()],
analyzed_commit: None,
input_hash: None,
approve: false,
},
"codex",
"macbook",
)
.unwrap();
protocol
.assign("phase-a", "codex", "macbook", &[String::from("src/lib.rs")])
.unwrap();
let mut snapshot = protocol.snapshot_store.load().unwrap();
let version = snapshot.version;
let task = snapshot.get_task_mut("phase-a").unwrap();
for attachment in &mut task.context_attachments {
attachment.attached_at = Utc::now() - chrono::Duration::hours(25);
}
protocol.snapshot_store.save(&snapshot, version).unwrap();
let attachments = protocol
.attach_context("phase-a", "codex", "macbook")
.unwrap();
assert!(!attachments.is_empty());
assert!(attachments.iter().all(|attachment| {
attachment.attached_at >= Utc::now() - chrono::Duration::hours(24)
}));
}
#[test]
fn refresh_context_replaces_existing_attachments() {
let _guard = obsidian_env_lock()
.lock()
.unwrap_or_else(|err| err.into_inner());
std::env::remove_var("OBSIDIAN_VAULT_PATH");
let (tmp, protocol) = fixture();
let src_dir = tmp.path().join("src");
fs::create_dir_all(&src_dir).unwrap();
fs::write(src_dir.join("lib.rs"), "line 1\nline 2\n").unwrap();
protocol
.register(
RegisterTaskRequest {
issue: 11,
task_id: "phase-a".into(),
title: "Phase A".into(),
dependencies: vec![],
soft_dependencies: vec![],
priority: 0,
completion_mode: CompletionMode::GithubPr,
},
"codex",
"macbook",
)
.unwrap();
protocol
.record_impact(
"phase-a",
ImpactInput {
risk_level: ImpactRiskLevel::Low,
affected_symbols: 1,
depth1: vec!["refresh_context".into()],
analyzed_commit: None,
input_hash: None,
approve: false,
},
"codex",
"macbook",
)
.unwrap();
protocol
.assign("phase-a", "codex", "macbook", &[String::from("src/lib.rs")])
.unwrap();
let first = protocol
.attach_context("phase-a", "codex", "macbook")
.unwrap();
let refreshed = protocol
.refresh_context("phase-a", "codex", "macbook")
.unwrap();
assert_eq!(first.len(), refreshed.len());
assert!(refreshed
.iter()
.zip(first.iter())
.all(|(left, right)| left.attached_at >= right.attached_at));
}
#[test]
fn assign_rejects_high_risk_without_human_approval() {
let (_tmp, protocol) = fixture();
@ -1972,9 +2385,8 @@ mod tests {
#[test]
fn verify_merge_with_valid_sha_passes_via_fallback() {
let _guard = path_env_lock().lock().unwrap();
let original_path = std::env::var_os("PATH");
std::env::set_var("PATH", "");
let _guard = gh_env_lock().lock().unwrap_or_else(|err| err.into_inner());
let _gh_guard = EnvVarGuard::set("MIYABI_GH_BIN", "__missing_gh__");
let (_tmp, protocol) = fixture();
protocol
@ -2015,12 +2427,6 @@ mod tests {
.and_then(|evidence| evidence.merge_commit_sha.as_deref()),
Some("0123456789abcdef0123456789abcdef01234567")
);
if let Some(path) = original_path {
std::env::set_var("PATH", path);
} else {
std::env::remove_var("PATH");
}
}
#[test]
@ -2093,4 +2499,57 @@ mod tests {
assert_eq!(task.current_state, TaskState::Done);
}
#[test]
fn heartbeat_all_renews_implementing_task_leases() {
let (_tmp, protocol) = fixture();
protocol
.register(
RegisterTaskRequest {
issue: 1,
task_id: "phase-a".into(),
title: "Phase A".into(),
dependencies: vec![],
soft_dependencies: vec![],
priority: 0,
completion_mode: CompletionMode::GithubPr,
},
"codex",
"macbook",
)
.unwrap();
protocol
.record_impact(
"phase-a",
ImpactInput {
risk_level: ImpactRiskLevel::Low,
affected_symbols: 1,
depth1: vec!["heartbeat".into()],
analyzed_commit: None,
input_hash: None,
approve: false,
},
"codex",
"macbook",
)
.unwrap();
protocol
.assign("phase-a", "codex", "macbook", &[String::from("src/a.rs")])
.unwrap();
let before_heartbeat = match protocol.status(Some("phase-a")).unwrap() {
StatusReport::Task(task) => task.lock.unwrap().last_heartbeat,
StatusReport::Snapshot(_) => panic!("expected task"),
};
std::thread::sleep(std::time::Duration::from_millis(5));
let renewed = protocol.heartbeat_all().unwrap();
assert_eq!(renewed, vec!["phase-a".to_string()]);
let after_heartbeat = match protocol.status(Some("phase-a")).unwrap() {
StatusReport::Task(task) => task.lock.unwrap().last_heartbeat,
StatusReport::Snapshot(_) => panic!("expected task"),
};
assert!(after_heartbeat > before_heartbeat);
}
}

View file

@ -112,6 +112,8 @@ pub struct ContextAttachment {
pub source: String,
pub content: String,
pub token_estimate: usize,
#[serde(default = "default_attachment_attached_at")]
pub attached_at: DateTime<Utc>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@ -137,6 +139,10 @@ pub struct ExecutionTask {
pub updated_at: DateTime<Utc>,
}
fn default_attachment_attached_at() -> DateTime<Utc> {
Utc::now()
}
impl ExecutionTask {
pub fn new(id: impl Into<String>, title: impl Into<String>) -> Self {
let now = Utc::now();
@ -821,4 +827,41 @@ mod tests {
assert_eq!(snapshot.tasks[0].issue_number, 0);
assert!(snapshot.tasks[0].context_attachments.is_empty());
}
#[test]
fn snapshot_load_defaults_missing_attachment_timestamp() {
let tmp = TempDir::new().unwrap();
let snapshot_store = SnapshotStore::new(
tmp.path().join("tasks.snapshot.json"),
tmp.path().join(".tasks.lock"),
);
let mut task_value = serde_json::to_value(sample_task("task-a")).unwrap();
let task_object = task_value.as_object_mut().unwrap();
task_object.insert(
"context_attachments".into(),
serde_json::json!([{
"attachment_type": "issue",
"source": "github://issue/1",
"content": "Issue #1",
"token_estimate": 2
}]),
);
let raw = serde_json::json!({
"version": 1,
"generated_at": Utc::now(),
"generated_from_event_id": null,
"tasks": [task_value],
"file_locks": {}
});
fs::write(
snapshot_store.path(),
serde_json::to_vec_pretty(&raw).unwrap(),
)
.unwrap();
let snapshot = snapshot_store.load().unwrap();
assert_eq!(snapshot.tasks[0].context_attachments.len(), 1);
}
}