diff --git a/docs/design/auto-memory-refresh.md b/docs/design/auto-memory-refresh.md new file mode 100644 index 00000000..5e2f5929 --- /dev/null +++ b/docs/design/auto-memory-refresh.md @@ -0,0 +1,416 @@ +# Auto Memory Refresh 实现方案 + +## 概述 + +在上下文压缩(compaction)发生之前,自动触发一个特殊的 agent turn,让 agent 分析即将被删除的消息,提取关键信息并写入 memory 文件,防止重要上下文丢失。 + +## OpenClaw 的实现分析 + +### 核心机制 + +OpenClaw 采用 **Pre-compaction Memory Flush** 策略: + +``` +Session 运行中,token 累积 + ↓ +totalTokens >= (contextWindow - reserveTokens - softThreshold)? + ↓ YES +触发 Memory Flush Turn(特殊的 agent 对话轮次) + ↓ +Agent 分析会话,将重要信息保存到 memory/YYYY-MM-DD.md + ↓ +然后才执行 Compaction(删除旧消息) +``` + +### 关键设计点 + +1. **Soft Threshold(软阈值)** + - 默认值:`4000 tokens` + - 触发条件:`totalTokens >= contextWindow - reserveTokens - softThreshold` + - 在真正达到 compaction 阈值之前就触发 memory flush + +2. **Memory Flush Prompt** + ``` + Pre-compaction memory flush. Store durable memories now + (use memory/YYYY-MM-DD.md; create memory/ if needed). + If nothing to store, reply with [SILENT]. + ``` + +3. **防重复机制** + - 使用 `memoryFlushCompactionCount` 追踪 + - 确保每次 compaction 周期只触发一次 flush + +4. **Memory 文件结构** + ``` + ~/.super-multica/agent-profiles// + ├── memory.md # 主 memory 文件 + └── memory/ + ├── 2024-01-15.md # 日期分片 + ├── 2024-01-16.md + └── topics/ + └── project-x.md # 主题分片 + ``` + +--- + +## Super Multica 实现方案 + +### Phase 1: 核心实现 + +#### 1.1 新增配置项 + +**文件:** `src/agent/session/session-manager.ts` + +```typescript +export type SessionManagerOptions = { + // ... existing options ... + + // Memory Flush 配置 + /** 是否启用自动 memory flush(默认:true) */ + enableMemoryFlush?: boolean | undefined; + /** Memory flush 软阈值(在 compaction 前多少 tokens 触发),默认 4000 */ + memoryFlushSoftTokens?: number | undefined; +}; +``` + +#### 1.2 新增 Memory Flush 模块 + +**文件:** `src/agent/memory/memory-flush.ts` + +```typescript +/** Memory flush 配置 */ +export type MemoryFlushSettings = { + /** 软阈值(tokens),在达到 compaction 阈值前多少 tokens 触发 */ + softThresholdTokens: number; + /** Memory flush 使用的系统 prompt */ + systemPrompt: string; + /** Memory flush 使用的用户 prompt */ + userPrompt: string; +}; + +export const DEFAULT_MEMORY_FLUSH_SETTINGS: MemoryFlushSettings = { + softThresholdTokens: 4000, + systemPrompt: `You are in a pre-compaction memory flush turn. The session is approaching context limit and old messages will be deleted soon. + +Your task: Review the conversation and extract any important information that should be preserved in long-term memory. Focus on: +- User preferences and settings +- Key decisions made +- Important technical details or solutions +- Project-specific knowledge +- Anything the user would want remembered in future sessions + +Use the memory_write tool to save important information. If there's nothing worth saving, respond with [SILENT].`, + + userPrompt: `[SYSTEM] Pre-compaction memory flush triggered. Please review recent conversation and save any important information to memory before context compression occurs.`, +}; + +/** 检查是否应该触发 memory flush */ +export function shouldRunMemoryFlush(params: { + currentTokens: number; + contextWindowTokens: number; + reserveTokens: number; + softThresholdTokens: number; + lastMemoryFlushCompactionCount?: number; + currentCompactionCount: number; +}): boolean { + const { + currentTokens, + contextWindowTokens, + reserveTokens, + softThresholdTokens, + lastMemoryFlushCompactionCount, + currentCompactionCount, + } = params; + + // 如果当前 compaction 周期已经 flush 过,不再触发 + if (lastMemoryFlushCompactionCount === currentCompactionCount) { + return false; + } + + // 计算 flush 阈值 + const flushThreshold = contextWindowTokens - reserveTokens - softThresholdTokens; + + return currentTokens >= flushThreshold; +} +``` + +#### 1.3 扩展 SessionEntry 类型 + +**文件:** `src/agent/session/types.ts` + +```typescript +export type SessionMeta = { + // ... existing fields ... + + /** 上次 memory flush 的时间戳 */ + memoryFlushAt?: number; + /** 上次 memory flush 时的 compaction 计数 */ + memoryFlushCompactionCount?: number; + /** Compaction 次数 */ + compactionCount?: number; +}; +``` + +#### 1.4 修改 Agent Runner + +**文件:** `src/agent/runner.ts` + +```typescript +// 在 maybeCompact 之前检查并执行 memory flush +private async maybeCompact() { + const messages = this.agent.state.messages.slice(); + + // Phase 0: Check if memory flush is needed + if (this.enableMemoryFlush) { + const shouldFlush = shouldRunMemoryFlush({ + currentTokens: this.estimateCurrentTokens(messages), + contextWindowTokens: this.contextWindowGuard.tokens, + reserveTokens: this.reserveTokens, + softThresholdTokens: this.memoryFlushSoftTokens, + lastMemoryFlushCompactionCount: this.session.getMeta()?.memoryFlushCompactionCount, + currentCompactionCount: this.session.getCompactionCount(), + }); + + if (shouldFlush) { + await this.runMemoryFlush(); + } + } + + // 继续原有的 compaction 逻辑... + if (!this.session.needsCompaction(messages)) return; + // ... +} + +private async runMemoryFlush() { + this.emitMulticaEvent({ type: "memory_flush_start" }); + + try { + // 创建一个临时的 agent turn 来执行 memory flush + const flushResult = await this.executeMemoryFlushTurn(); + + // 更新 session metadata + this.session.saveMeta({ + ...this.session.getMeta(), + memoryFlushAt: Date.now(), + memoryFlushCompactionCount: this.session.getCompactionCount(), + }); + + this.emitMulticaEvent({ + type: "memory_flush_end", + saved: flushResult.memoriesSaved, + }); + } catch (error) { + console.error("[Agent] Memory flush failed:", error); + // Memory flush 失败不阻塞 compaction + } +} + +private async executeMemoryFlushTurn(): Promise<{ memoriesSaved: number }> { + // 使用特殊的 system prompt 和 user prompt + // 让 agent 分析当前会话并保存重要信息 + // 只允许使用 memory_write 工具 + + const originalSystemPrompt = this.agent.state.systemPrompt; + const originalTools = this.agent.state.tools; + + try { + // 临时切换到 memory flush 模式 + this.agent.setSystemPrompt(this.memoryFlushSettings.systemPrompt); + this.agent.setTools([this.memoryWriteTool]); // 只允许 memory_write + + // 执行一个 agent turn + await this.agent.run(this.memoryFlushSettings.userPrompt); + + // 统计保存了多少 memory + return { memoriesSaved: this.countMemoryWriteCalls() }; + } finally { + // 恢复原始设置 + this.agent.setSystemPrompt(originalSystemPrompt); + this.agent.setTools(originalTools); + } +} +``` + +#### 1.5 新增 Memory Flush Events + +**文件:** `src/agent/events.ts` + +```typescript +/** Memory flush 开始事件 */ +export type MemoryFlushStartEvent = { + type: "memory_flush_start"; +}; + +/** Memory flush 结束事件 */ +export type MemoryFlushEndEvent = { + type: "memory_flush_end"; + /** 保存的 memory 条目数 */ + saved: number; +}; + +/** Union of all Multica-specific events */ +export type MulticaEvent = + | CompactionStartEvent + | CompactionEndEvent + | MemoryFlushStartEvent + | MemoryFlushEndEvent; +``` + +--- + +### Phase 2: Memory 文件管理 + +#### 2.1 Memory 文件结构 + +``` +~/.super-multica/agent-profiles// +├── identity.md # 身份设定(已有) +├── memory.md # 主 memory 文件(已有) +├── memory/ # Memory 分片目录(新增) +│ ├── 2024-01-15.md # 日期分片 +│ ├── 2024-01-16.md +│ └── ... +└── sessions/ # Session 记录(已有) +``` + +#### 2.2 Memory Write Tool 增强 + +**文件:** `src/agent/tools/memory/memory-write.ts` + +```typescript +// 支持写入日期分片 +export function resolveMemoryPath( + profileDir: string, + targetFile?: string +): string { + if (!targetFile) { + // 默认写入今天的日期分片 + const today = new Date().toISOString().split('T')[0]; // YYYY-MM-DD + const memoryDir = path.join(profileDir, 'memory'); + ensureDirSync(memoryDir); + return path.join(memoryDir, `${today}.md`); + } + + // 支持指定文件 + if (targetFile.startsWith('memory/')) { + return path.join(profileDir, targetFile); + } + + return path.join(profileDir, 'memory.md'); +} +``` + +--- + +### Phase 3: 前端集成 + +#### 3.1 SDK 事件类型 + +**文件:** `packages/sdk/src/types.ts` + +```typescript +export type LocalChatEvent = + | MessageStartEvent + | MessageUpdateEvent + | MessageEndEvent + | ToolExecutionStartEvent + | ToolExecutionEndEvent + | CompactionStartEvent + | CompactionEndEvent + | MemoryFlushStartEvent // 新增 + | MemoryFlushEndEvent; // 新增 +``` + +#### 3.2 Zustand Store + +**文件:** `packages/store/src/agent-store.ts` + +```typescript +interface AgentState { + // ... existing state ... + + /** 是否正在进行 memory flush */ + memoryFlushing: boolean; + /** 上次 memory flush 信息 */ + lastMemoryFlush: { + timestamp: number; + saved: number; + } | null; +} +``` + +#### 3.3 Desktop UI 提示 + +在 compaction 提示之前显示 "正在保存重要记忆..." + +--- + +## 实现顺序 + +1. **Step 1**: 新增 `memory-flush.ts` 模块,定义类型和判断逻辑 +2. **Step 2**: 扩展 `SessionMeta` 类型,添加 flush 相关字段 +3. **Step 3**: 新增 `MemoryFlushStartEvent` 和 `MemoryFlushEndEvent` 事件 +4. **Step 4**: 修改 `Agent` 类,添加 `runMemoryFlush` 方法 +5. **Step 5**: 修改 `maybeCompact` 流程,在 compaction 前检查并执行 flush +6. **Step 6**: 增强 `memory_write` tool,支持日期分片 +7. **Step 7**: SDK 和 Store 集成 +8. **Step 8**: Desktop UI 提示 + +--- + +## 配置项汇总 + +| 配置项 | 类型 | 默认值 | 说明 | +|--------|------|--------|------| +| `enableMemoryFlush` | boolean | true | 是否启用自动 memory flush | +| `memoryFlushSoftTokens` | number | 4000 | 在 compaction 阈值前多少 tokens 触发 | + +--- + +## 与现有功能的关系 + +``` +Token 累积 + ↓ +达到 Memory Flush 阈值? ──────────────────────┐ + ↓ YES │ +Memory Flush Turn(新功能) │ + ├─ Agent 分析会话 │ + ├─ 调用 memory_write 保存重要信息 │ + └─ 更新 memoryFlushCompactionCount │ + ↓ │ +达到 Compaction 阈值? ←───────────────────────┘ + ↓ YES ↓ NO +┌─────────────────────────────┐ │ +│ Tool Result Pruning(已实现)│ │ +│ Soft-trim / Hard-clear │ │ +└─────────────────────────────┘ │ + ↓ │ +Message Compaction │ + ├─ 删除旧消息 │ + └─ 或生成摘要 │ + ↓ │ +继续会话 ←──────────────────────────────────┘ +``` + +--- + +## 风险和注意事项 + +1. **Token 消耗**: Memory flush turn 本身会消耗 tokens,需要控制 prompt 长度 +2. **循环触发**: 需要 `memoryFlushCompactionCount` 防止重复触发 +3. **Tool 限制**: Flush turn 应该只允许 `memory_write`,防止执行其他操作 +4. **超时处理**: Flush turn 需要有超时机制,不能阻塞太久 +5. **静默响应**: 如果没有需要保存的内容,agent 应该返回 `[SILENT]` 跳过 + +--- + +## 测试计划 + +1. **单元测试**: `shouldRunMemoryFlush` 判断逻辑 +2. **集成测试**: Memory flush turn 执行流程 +3. **E2E 测试**: 完整的 flush → compaction 流程 +4. **边界测试**: + - 连续多次 compaction 只触发一次 flush + - Flush 失败不阻塞 compaction + - Agent 返回 [SILENT] 时正常跳过 diff --git a/docs/design/cron-job-tool.md b/docs/design/cron-job-tool.md new file mode 100644 index 00000000..a219b586 --- /dev/null +++ b/docs/design/cron-job-tool.md @@ -0,0 +1,823 @@ +# Cron Job Tool 实现方案 + +## 概述 + +Cron Job Tool 允许 Agent 创建定时任务,在指定时间或周期性地执行操作。这对于提醒、定期检查、自动化工作流等场景非常有用。 + +## OpenClaw 实现分析 + +### 核心架构 + +``` +┌─────────────────────────────────────────────────────────────┐ +│ CronService │ +├─────────────────────────────────────────────────────────────┤ +│ start() → 加载 jobs, 计算下次运行时间, 启动 timer │ +│ add() → 创建任务, 计算 schedule, 持久化 │ +│ update() → 修改任务, 重新计算 schedule │ +│ remove() → 删除任务 │ +│ run() → 立即执行任务 │ +│ list() → 列出所有任务 │ +└─────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ Timer Loop │ +├─────────────────────────────────────────────────────────────┤ +│ armTimer(nextWakeAtMs) │ +│ ↓ │ +│ onTimer() → runDueJobs() → executeJob() │ +│ ↓ │ +│ Update state, re-arm timer │ +└─────────────────────────────────────────────────────────────┘ +``` + +### Job 类型 + +**Schedule 类型:** +1. **at** - 一次性任务(指定时间戳) +2. **every** - 固定间隔(如每 30 分钟) +3. **cron** - 标准 cron 表达式(5 字段 + 可选时区) + +**Session Target:** +1. **main** - 注入到主会话(作为系统事件) +2. **isolated** - 在独立会话中运行 agent turn + +**Payload 类型:** +1. **systemEvent** - 注入文本到主会话 +2. **agentTurn** - 在独立会话中执行 agent(可指定 model/thinking) + +### 存储结构 + +``` +~/.openclaw/cron/ +├── jobs.json # 所有任务定义 +├── jobs.json.bak # 备份 +└── runs/ + ├── .jsonl # 任务1的运行历史 + └── .jsonl # 任务2的运行历史 +``` + +--- + +## Super Multica 实现方案 + +### Phase 1: 核心数据结构 + +#### 1.1 Job 类型定义 + +**文件:** `src/cron/types.ts` + +```typescript +import type { v7 as uuidv7 } from "uuid"; + +/** Cron 任务调度类型 */ +export type CronSchedule = + | { kind: "at"; atMs: number } // 一次性(时间戳) + | { kind: "every"; everyMs: number; anchorMs?: number } // 固定间隔 + | { kind: "cron"; expr: string; tz?: string }; // Cron 表达式 + +/** 任务执行目标 */ +export type CronSessionTarget = "main" | "isolated"; + +/** 唤醒模式 */ +export type CronWakeMode = "next-heartbeat" | "now"; + +/** 任务载荷 */ +export type CronPayload = + | { + kind: "system-event"; + text: string; // 注入到主会话的文本 + } + | { + kind: "agent-turn"; + message: string; // Agent 执行的 prompt + model?: string; // 可选 model override + thinkingLevel?: string; // 可选 thinking level + timeoutSeconds?: number; // 超时时间 + }; + +/** 任务运行状态 */ +export type CronJobState = { + nextRunAtMs?: number; // 下次运行时间 + runningAtMs?: number; // 正在运行的时间戳(锁) + lastRunAtMs?: number; // 上次运行时间 + lastStatus?: "ok" | "error" | "skipped"; + lastError?: string; + lastDurationMs?: number; +}; + +/** Cron 任务定义 */ +export type CronJob = { + id: string; // UUID + name: string; // 用户友好名称 + description?: string; // 描述 + enabled: boolean; // 是否启用 + deleteAfterRun?: boolean; // 运行后自动删除(一次性任务) + createdAtMs: number; + updatedAtMs: number; + schedule: CronSchedule; + sessionTarget: CronSessionTarget; + wakeMode: CronWakeMode; + payload: CronPayload; + state: CronJobState; +}; + +/** 创建任务的输入 */ +export type CronJobInput = Omit; + +/** 运行日志条目 */ +export type CronRunLogEntry = { + ts: number; + jobId: string; + action: "run" | "skip" | "error"; + status: "ok" | "error" | "skipped"; + error?: string; + summary?: string; + durationMs?: number; + nextRunAtMs?: number; +}; +``` + +#### 1.2 存储层 + +**文件:** `src/cron/store.ts` + +```typescript +import { existsSync, mkdirSync, readFileSync, writeFileSync } from "fs"; +import path from "path"; +import type { CronJob, CronRunLogEntry } from "./types.js"; + +const DEFAULT_CRON_DIR = path.join( + process.env["HOME"] ?? ".", + ".super-multica", + "cron" +); + +export class CronStore { + private readonly jobsPath: string; + private readonly runsDir: string; + private jobs: Map = new Map(); + + constructor(baseDir: string = DEFAULT_CRON_DIR) { + this.jobsPath = path.join(baseDir, "jobs.json"); + this.runsDir = path.join(baseDir, "runs"); + this.ensureDirs(); + } + + private ensureDirs() { + const dir = path.dirname(this.jobsPath); + if (!existsSync(dir)) mkdirSync(dir, { recursive: true }); + if (!existsSync(this.runsDir)) mkdirSync(this.runsDir, { recursive: true }); + } + + load(): CronJob[] { + if (!existsSync(this.jobsPath)) return []; + const data = JSON.parse(readFileSync(this.jobsPath, "utf-8")); + this.jobs = new Map(data.jobs.map((j: CronJob) => [j.id, j])); + return Array.from(this.jobs.values()); + } + + save() { + const jobs = Array.from(this.jobs.values()); + // Backup first + if (existsSync(this.jobsPath)) { + writeFileSync(this.jobsPath + ".bak", readFileSync(this.jobsPath)); + } + writeFileSync(this.jobsPath, JSON.stringify({ jobs }, null, 2)); + } + + get(id: string): CronJob | undefined { + return this.jobs.get(id); + } + + set(job: CronJob) { + this.jobs.set(job.id, job); + this.save(); + } + + delete(id: string): boolean { + const deleted = this.jobs.delete(id); + if (deleted) this.save(); + return deleted; + } + + list(filter?: { enabled?: boolean }): CronJob[] { + let jobs = Array.from(this.jobs.values()); + if (filter?.enabled !== undefined) { + jobs = jobs.filter((j) => j.enabled === filter.enabled); + } + return jobs; + } + + // Run log methods + appendRunLog(jobId: string, entry: CronRunLogEntry) { + const logPath = path.join(this.runsDir, `${jobId}.jsonl`); + const line = JSON.stringify(entry) + "\n"; + writeFileSync(logPath, line, { flag: "a" }); + } + + getRunLogs(jobId: string, limit = 50): CronRunLogEntry[] { + const logPath = path.join(this.runsDir, `${jobId}.jsonl`); + if (!existsSync(logPath)) return []; + const lines = readFileSync(logPath, "utf-8").trim().split("\n"); + return lines.slice(-limit).map((l) => JSON.parse(l)); + } +} +``` + +--- + +### Phase 2: Cron Service + +**文件:** `src/cron/service.ts` + +```typescript +import { v7 as uuidv7 } from "uuid"; +import Croner from "croner"; +import type { CronJob, CronJobInput, CronSchedule } from "./types.js"; +import { CronStore } from "./store.js"; + +export class CronService { + private store: CronStore; + private timer: NodeJS.Timeout | null = null; + private running = false; + + constructor(store?: CronStore) { + this.store = store ?? new CronStore(); + } + + /** 启动服务 */ + async start() { + if (this.running) return; + this.running = true; + this.store.load(); + this.recomputeAllSchedules(); + this.armTimer(); + } + + /** 停止服务 */ + stop() { + this.running = false; + if (this.timer) { + clearTimeout(this.timer); + this.timer = null; + } + } + + /** 获取服务状态 */ + status() { + const jobs = this.store.list({ enabled: true }); + const nextWake = Math.min( + ...jobs.map((j) => j.state.nextRunAtMs ?? Infinity) + ); + return { + running: this.running, + jobCount: jobs.length, + nextWakeAtMs: nextWake === Infinity ? null : nextWake, + }; + } + + /** 列出任务 */ + list(filter?: { enabled?: boolean }) { + return this.store.list(filter); + } + + /** 添加任务 */ + add(input: CronJobInput): CronJob { + const now = Date.now(); + const job: CronJob = { + ...input, + id: uuidv7(), + createdAtMs: now, + updatedAtMs: now, + state: {}, + }; + this.computeNextRun(job); + this.store.set(job); + this.armTimer(); + return job; + } + + /** 更新任务 */ + update(id: string, patch: Partial): CronJob | null { + const job = this.store.get(id); + if (!job) return null; + + Object.assign(job, patch, { updatedAtMs: Date.now() }); + if (patch.schedule) { + this.computeNextRun(job); + } + this.store.set(job); + this.armTimer(); + return job; + } + + /** 删除任务 */ + remove(id: string): boolean { + return this.store.delete(id); + } + + /** 立即运行任务 */ + async run(id: string, force = false): Promise<{ ok: boolean; reason?: string }> { + const job = this.store.get(id); + if (!job) return { ok: false, reason: "Job not found" }; + if (!job.enabled && !force) return { ok: false, reason: "Job disabled" }; + + await this.executeJob(job); + return { ok: true }; + } + + /** 获取运行日志 */ + getRunLogs(id: string) { + return this.store.getRunLogs(id); + } + + // === Private Methods === + + private computeNextRun(job: CronJob) { + const now = Date.now(); + let nextMs: number; + + switch (job.schedule.kind) { + case "at": + nextMs = job.schedule.atMs; + break; + case "every": + const anchor = job.schedule.anchorMs ?? now; + const interval = job.schedule.everyMs; + const elapsed = now - anchor; + const periods = Math.ceil(elapsed / interval); + nextMs = anchor + periods * interval; + break; + case "cron": + const cron = Croner(job.schedule.expr, { + timezone: job.schedule.tz, + }); + const next = cron.nextRun(); + nextMs = next ? next.getTime() : now + 86400000; // fallback 1 day + break; + } + + job.state.nextRunAtMs = nextMs; + } + + private recomputeAllSchedules() { + for (const job of this.store.list({ enabled: true })) { + this.computeNextRun(job); + this.store.set(job); + } + } + + private armTimer() { + if (this.timer) { + clearTimeout(this.timer); + this.timer = null; + } + + const jobs = this.store.list({ enabled: true }); + const nextWake = Math.min( + ...jobs.map((j) => j.state.nextRunAtMs ?? Infinity) + ); + + if (nextWake === Infinity) return; + + const delay = Math.max(0, nextWake - Date.now()); + this.timer = setTimeout(() => this.onTimer(), delay); + } + + private async onTimer() { + const now = Date.now(); + const dueJobs = this.store + .list({ enabled: true }) + .filter((j) => (j.state.nextRunAtMs ?? Infinity) <= now); + + for (const job of dueJobs) { + await this.executeJob(job); + } + + this.armTimer(); + } + + private async executeJob(job: CronJob) { + const startMs = Date.now(); + job.state.runningAtMs = startMs; + this.store.set(job); + + try { + // TODO: 实际执行逻辑 + // - systemEvent: 注入到主会话 + // - agentTurn: 在独立会话中运行 agent + console.log(`[Cron] Executing job: ${job.name} (${job.id})`); + + // 模拟执行 + await new Promise((r) => setTimeout(r, 100)); + + // 更新状态 + job.state.lastRunAtMs = startMs; + job.state.lastStatus = "ok"; + job.state.lastDurationMs = Date.now() - startMs; + job.state.runningAtMs = undefined; + + // 一次性任务处理 + if (job.schedule.kind === "at") { + if (job.deleteAfterRun) { + this.store.delete(job.id); + } else { + job.enabled = false; + } + } else { + this.computeNextRun(job); + } + + this.store.set(job); + this.store.appendRunLog(job.id, { + ts: startMs, + jobId: job.id, + action: "run", + status: "ok", + durationMs: job.state.lastDurationMs, + nextRunAtMs: job.state.nextRunAtMs, + }); + } catch (error) { + job.state.lastRunAtMs = startMs; + job.state.lastStatus = "error"; + job.state.lastError = String(error); + job.state.lastDurationMs = Date.now() - startMs; + job.state.runningAtMs = undefined; + this.computeNextRun(job); + this.store.set(job); + this.store.appendRunLog(job.id, { + ts: startMs, + jobId: job.id, + action: "error", + status: "error", + error: String(error), + durationMs: job.state.lastDurationMs, + }); + } + } +} +``` + +--- + +### Phase 3: Agent Tool + +**文件:** `src/agent/tools/cron/cron-tool.ts` + +```typescript +import type { Tool } from "@mariozechner/pi-agent-core"; +import { CronService } from "../../../cron/service.js"; + +let cronService: CronService | null = null; + +export function getCronService(): CronService { + if (!cronService) { + cronService = new CronService(); + cronService.start(); + } + return cronService; +} + +export const cronTool: Tool = { + name: "cron", + description: `Create, manage, and execute scheduled tasks (cron jobs). + +## Actions + +### list +List all cron jobs. +\`\`\`json +{ "action": "list", "enabled": true } +\`\`\` + +### add +Create a new cron job. +\`\`\`json +{ + "action": "add", + "name": "Daily reminder", + "schedule": { "kind": "cron", "expr": "0 9 * * *", "tz": "Asia/Shanghai" }, + "sessionTarget": "main", + "wakeMode": "now", + "payload": { "kind": "system-event", "text": "Check your todos!" } +} +\`\`\` + +Schedule types: +- \`{ "kind": "at", "atMs": 1704067200000 }\` - One-time at timestamp +- \`{ "kind": "every", "everyMs": 3600000 }\` - Every hour +- \`{ "kind": "cron", "expr": "0 9 * * *", "tz": "Asia/Shanghai" }\` - Cron expression + +### update +Update an existing job. +\`\`\`json +{ "action": "update", "jobId": "xxx", "enabled": false } +\`\`\` + +### remove +Delete a job. +\`\`\`json +{ "action": "remove", "jobId": "xxx" } +\`\`\` + +### run +Execute a job immediately. +\`\`\`json +{ "action": "run", "jobId": "xxx", "force": true } +\`\`\` + +### status +Get cron service status. +\`\`\`json +{ "action": "status" } +\`\`\` +`, + + parameters: { + type: "object", + properties: { + action: { + type: "string", + enum: ["list", "add", "update", "remove", "run", "status"], + description: "The action to perform", + }, + // list + enabled: { type: "boolean", description: "Filter by enabled status" }, + // add + name: { type: "string", description: "Job name" }, + description: { type: "string", description: "Job description" }, + schedule: { type: "object", description: "Schedule configuration" }, + sessionTarget: { + type: "string", + enum: ["main", "isolated"], + description: "Where to run the job", + }, + wakeMode: { + type: "string", + enum: ["next-heartbeat", "now"], + description: "When to wake after job", + }, + payload: { type: "object", description: "Job payload" }, + deleteAfterRun: { type: "boolean", description: "Delete after one-time run" }, + // update/remove/run + jobId: { type: "string", description: "Job ID" }, + // run + force: { type: "boolean", description: "Force run even if disabled" }, + }, + required: ["action"], + }, + + execute: async (params: Record) => { + const service = getCronService(); + const action = params["action"] as string; + + switch (action) { + case "status": + return JSON.stringify(service.status(), null, 2); + + case "list": + const jobs = service.list({ + enabled: params["enabled"] as boolean | undefined, + }); + return JSON.stringify(jobs, null, 2); + + case "add": + const newJob = service.add({ + name: params["name"] as string, + description: params["description"] as string | undefined, + enabled: true, + deleteAfterRun: params["deleteAfterRun"] as boolean | undefined, + schedule: params["schedule"] as any, + sessionTarget: (params["sessionTarget"] as any) ?? "main", + wakeMode: (params["wakeMode"] as any) ?? "next-heartbeat", + payload: params["payload"] as any, + }); + return `Created job: ${newJob.name} (${newJob.id})\nNext run: ${new Date(newJob.state.nextRunAtMs!).toISOString()}`; + + case "update": + const updated = service.update(params["jobId"] as string, params as any); + if (!updated) return "Job not found"; + return `Updated job: ${updated.name}`; + + case "remove": + const removed = service.remove(params["jobId"] as string); + return removed ? "Job removed" : "Job not found"; + + case "run": + const result = await service.run( + params["jobId"] as string, + params["force"] as boolean + ); + return result.ok ? "Job executed" : `Failed: ${result.reason}`; + + default: + return `Unknown action: ${action}`; + } + }, +}; +``` + +--- + +### Phase 4: CLI 命令 + +**文件:** `src/agent/cli/commands/cron.ts` + +```typescript +import { Command } from "commander"; +import { getCronService } from "../../tools/cron/cron-tool.js"; + +export function registerCronCommands(program: Command) { + const cron = program.command("cron").description("Manage cron jobs"); + + cron + .command("status") + .description("Show cron service status") + .action(() => { + const service = getCronService(); + const status = service.status(); + console.log("Cron Service Status:"); + console.log(` Running: ${status.running}`); + console.log(` Jobs: ${status.jobCount}`); + if (status.nextWakeAtMs) { + console.log(` Next wake: ${new Date(status.nextWakeAtMs).toISOString()}`); + } + }); + + cron + .command("list") + .description("List all cron jobs") + .option("--enabled", "Show only enabled jobs") + .option("--disabled", "Show only disabled jobs") + .action((opts) => { + const service = getCronService(); + const enabled = opts.enabled ? true : opts.disabled ? false : undefined; + const jobs = service.list({ enabled }); + + if (jobs.length === 0) { + console.log("No cron jobs found."); + return; + } + + for (const job of jobs) { + console.log(`\n${job.enabled ? "✓" : "✗"} ${job.name} (${job.id})`); + console.log(` Schedule: ${formatSchedule(job.schedule)}`); + console.log(` Target: ${job.sessionTarget}`); + if (job.state.nextRunAtMs) { + console.log(` Next run: ${new Date(job.state.nextRunAtMs).toISOString()}`); + } + if (job.state.lastStatus) { + console.log(` Last run: ${job.state.lastStatus} (${job.state.lastDurationMs}ms)`); + } + } + }); + + cron + .command("add") + .description("Add a new cron job") + .requiredOption("-n, --name ", "Job name") + .option("--at