diff --git a/README.md b/README.md index 1608af8f..1d607deb 100644 --- a/README.md +++ b/README.md @@ -113,11 +113,3 @@ Web/Mobile Clients | Channel system | [docs/channels/README.md](./docs/channels/README.md) | | Channel media handling | [docs/channels/media-handling.md](./docs/channels/media-handling.md) | | Desktop login integration | [docs/auth/desktop-integration.md](./docs/auth/desktop-integration.md) | - -**Design Proposals** (not yet implemented) - -| Topic | Link | -|-------|------| -| Auto memory refresh | [docs/design/auto-memory-refresh.md](./docs/design/auto-memory-refresh.md) | -| Cron job tool | [docs/design/cron-job-tool.md](./docs/design/cron-job-tool.md) | -| Dashboard design | [docs/plans/2026-02-12-dashboard-design.md](./docs/plans/2026-02-12-dashboard-design.md) | diff --git a/docs/design/auto-memory-refresh.md b/docs/design/auto-memory-refresh.md deleted file mode 100644 index 5e2f5929..00000000 --- a/docs/design/auto-memory-refresh.md +++ /dev/null @@ -1,416 +0,0 @@ -# Auto Memory Refresh 实现方案 - -## 概述 - -在上下文压缩(compaction)发生之前,自动触发一个特殊的 agent turn,让 agent 分析即将被删除的消息,提取关键信息并写入 memory 文件,防止重要上下文丢失。 - -## OpenClaw 的实现分析 - -### 核心机制 - -OpenClaw 采用 **Pre-compaction Memory Flush** 策略: - -``` -Session 运行中,token 累积 - ↓ -totalTokens >= (contextWindow - reserveTokens - softThreshold)? - ↓ YES -触发 Memory Flush Turn(特殊的 agent 对话轮次) - ↓ -Agent 分析会话,将重要信息保存到 memory/YYYY-MM-DD.md - ↓ -然后才执行 Compaction(删除旧消息) -``` - -### 关键设计点 - -1. **Soft Threshold(软阈值)** - - 默认值:`4000 tokens` - - 触发条件:`totalTokens >= contextWindow - reserveTokens - softThreshold` - - 在真正达到 compaction 阈值之前就触发 memory flush - -2. **Memory Flush Prompt** - ``` - Pre-compaction memory flush. Store durable memories now - (use memory/YYYY-MM-DD.md; create memory/ if needed). - If nothing to store, reply with [SILENT]. - ``` - -3. **防重复机制** - - 使用 `memoryFlushCompactionCount` 追踪 - - 确保每次 compaction 周期只触发一次 flush - -4. **Memory 文件结构** - ``` - ~/.super-multica/agent-profiles// - ├── memory.md # 主 memory 文件 - └── memory/ - ├── 2024-01-15.md # 日期分片 - ├── 2024-01-16.md - └── topics/ - └── project-x.md # 主题分片 - ``` - ---- - -## Super Multica 实现方案 - -### Phase 1: 核心实现 - -#### 1.1 新增配置项 - -**文件:** `src/agent/session/session-manager.ts` - -```typescript -export type SessionManagerOptions = { - // ... existing options ... - - // Memory Flush 配置 - /** 是否启用自动 memory flush(默认:true) */ - enableMemoryFlush?: boolean | undefined; - /** Memory flush 软阈值(在 compaction 前多少 tokens 触发),默认 4000 */ - memoryFlushSoftTokens?: number | undefined; -}; -``` - -#### 1.2 新增 Memory Flush 模块 - -**文件:** `src/agent/memory/memory-flush.ts` - -```typescript -/** Memory flush 配置 */ -export type MemoryFlushSettings = { - /** 软阈值(tokens),在达到 compaction 阈值前多少 tokens 触发 */ - softThresholdTokens: number; - /** Memory flush 使用的系统 prompt */ - systemPrompt: string; - /** Memory flush 使用的用户 prompt */ - userPrompt: string; -}; - -export const DEFAULT_MEMORY_FLUSH_SETTINGS: MemoryFlushSettings = { - softThresholdTokens: 4000, - systemPrompt: `You are in a pre-compaction memory flush turn. The session is approaching context limit and old messages will be deleted soon. - -Your task: Review the conversation and extract any important information that should be preserved in long-term memory. Focus on: -- User preferences and settings -- Key decisions made -- Important technical details or solutions -- Project-specific knowledge -- Anything the user would want remembered in future sessions - -Use the memory_write tool to save important information. If there's nothing worth saving, respond with [SILENT].`, - - userPrompt: `[SYSTEM] Pre-compaction memory flush triggered. Please review recent conversation and save any important information to memory before context compression occurs.`, -}; - -/** 检查是否应该触发 memory flush */ -export function shouldRunMemoryFlush(params: { - currentTokens: number; - contextWindowTokens: number; - reserveTokens: number; - softThresholdTokens: number; - lastMemoryFlushCompactionCount?: number; - currentCompactionCount: number; -}): boolean { - const { - currentTokens, - contextWindowTokens, - reserveTokens, - softThresholdTokens, - lastMemoryFlushCompactionCount, - currentCompactionCount, - } = params; - - // 如果当前 compaction 周期已经 flush 过,不再触发 - if (lastMemoryFlushCompactionCount === currentCompactionCount) { - return false; - } - - // 计算 flush 阈值 - const flushThreshold = contextWindowTokens - reserveTokens - softThresholdTokens; - - return currentTokens >= flushThreshold; -} -``` - -#### 1.3 扩展 SessionEntry 类型 - -**文件:** `src/agent/session/types.ts` - -```typescript -export type SessionMeta = { - // ... existing fields ... - - /** 上次 memory flush 的时间戳 */ - memoryFlushAt?: number; - /** 上次 memory flush 时的 compaction 计数 */ - memoryFlushCompactionCount?: number; - /** Compaction 次数 */ - compactionCount?: number; -}; -``` - -#### 1.4 修改 Agent Runner - -**文件:** `src/agent/runner.ts` - -```typescript -// 在 maybeCompact 之前检查并执行 memory flush -private async maybeCompact() { - const messages = this.agent.state.messages.slice(); - - // Phase 0: Check if memory flush is needed - if (this.enableMemoryFlush) { - const shouldFlush = shouldRunMemoryFlush({ - currentTokens: this.estimateCurrentTokens(messages), - contextWindowTokens: this.contextWindowGuard.tokens, - reserveTokens: this.reserveTokens, - softThresholdTokens: this.memoryFlushSoftTokens, - lastMemoryFlushCompactionCount: this.session.getMeta()?.memoryFlushCompactionCount, - currentCompactionCount: this.session.getCompactionCount(), - }); - - if (shouldFlush) { - await this.runMemoryFlush(); - } - } - - // 继续原有的 compaction 逻辑... - if (!this.session.needsCompaction(messages)) return; - // ... -} - -private async runMemoryFlush() { - this.emitMulticaEvent({ type: "memory_flush_start" }); - - try { - // 创建一个临时的 agent turn 来执行 memory flush - const flushResult = await this.executeMemoryFlushTurn(); - - // 更新 session metadata - this.session.saveMeta({ - ...this.session.getMeta(), - memoryFlushAt: Date.now(), - memoryFlushCompactionCount: this.session.getCompactionCount(), - }); - - this.emitMulticaEvent({ - type: "memory_flush_end", - saved: flushResult.memoriesSaved, - }); - } catch (error) { - console.error("[Agent] Memory flush failed:", error); - // Memory flush 失败不阻塞 compaction - } -} - -private async executeMemoryFlushTurn(): Promise<{ memoriesSaved: number }> { - // 使用特殊的 system prompt 和 user prompt - // 让 agent 分析当前会话并保存重要信息 - // 只允许使用 memory_write 工具 - - const originalSystemPrompt = this.agent.state.systemPrompt; - const originalTools = this.agent.state.tools; - - try { - // 临时切换到 memory flush 模式 - this.agent.setSystemPrompt(this.memoryFlushSettings.systemPrompt); - this.agent.setTools([this.memoryWriteTool]); // 只允许 memory_write - - // 执行一个 agent turn - await this.agent.run(this.memoryFlushSettings.userPrompt); - - // 统计保存了多少 memory - return { memoriesSaved: this.countMemoryWriteCalls() }; - } finally { - // 恢复原始设置 - this.agent.setSystemPrompt(originalSystemPrompt); - this.agent.setTools(originalTools); - } -} -``` - -#### 1.5 新增 Memory Flush Events - -**文件:** `src/agent/events.ts` - -```typescript -/** Memory flush 开始事件 */ -export type MemoryFlushStartEvent = { - type: "memory_flush_start"; -}; - -/** Memory flush 结束事件 */ -export type MemoryFlushEndEvent = { - type: "memory_flush_end"; - /** 保存的 memory 条目数 */ - saved: number; -}; - -/** Union of all Multica-specific events */ -export type MulticaEvent = - | CompactionStartEvent - | CompactionEndEvent - | MemoryFlushStartEvent - | MemoryFlushEndEvent; -``` - ---- - -### Phase 2: Memory 文件管理 - -#### 2.1 Memory 文件结构 - -``` -~/.super-multica/agent-profiles// -├── identity.md # 身份设定(已有) -├── memory.md # 主 memory 文件(已有) -├── memory/ # Memory 分片目录(新增) -│ ├── 2024-01-15.md # 日期分片 -│ ├── 2024-01-16.md -│ └── ... -└── sessions/ # Session 记录(已有) -``` - -#### 2.2 Memory Write Tool 增强 - -**文件:** `src/agent/tools/memory/memory-write.ts` - -```typescript -// 支持写入日期分片 -export function resolveMemoryPath( - profileDir: string, - targetFile?: string -): string { - if (!targetFile) { - // 默认写入今天的日期分片 - const today = new Date().toISOString().split('T')[0]; // YYYY-MM-DD - const memoryDir = path.join(profileDir, 'memory'); - ensureDirSync(memoryDir); - return path.join(memoryDir, `${today}.md`); - } - - // 支持指定文件 - if (targetFile.startsWith('memory/')) { - return path.join(profileDir, targetFile); - } - - return path.join(profileDir, 'memory.md'); -} -``` - ---- - -### Phase 3: 前端集成 - -#### 3.1 SDK 事件类型 - -**文件:** `packages/sdk/src/types.ts` - -```typescript -export type LocalChatEvent = - | MessageStartEvent - | MessageUpdateEvent - | MessageEndEvent - | ToolExecutionStartEvent - | ToolExecutionEndEvent - | CompactionStartEvent - | CompactionEndEvent - | MemoryFlushStartEvent // 新增 - | MemoryFlushEndEvent; // 新增 -``` - -#### 3.2 Zustand Store - -**文件:** `packages/store/src/agent-store.ts` - -```typescript -interface AgentState { - // ... existing state ... - - /** 是否正在进行 memory flush */ - memoryFlushing: boolean; - /** 上次 memory flush 信息 */ - lastMemoryFlush: { - timestamp: number; - saved: number; - } | null; -} -``` - -#### 3.3 Desktop UI 提示 - -在 compaction 提示之前显示 "正在保存重要记忆..." - ---- - -## 实现顺序 - -1. **Step 1**: 新增 `memory-flush.ts` 模块,定义类型和判断逻辑 -2. **Step 2**: 扩展 `SessionMeta` 类型,添加 flush 相关字段 -3. **Step 3**: 新增 `MemoryFlushStartEvent` 和 `MemoryFlushEndEvent` 事件 -4. **Step 4**: 修改 `Agent` 类,添加 `runMemoryFlush` 方法 -5. **Step 5**: 修改 `maybeCompact` 流程,在 compaction 前检查并执行 flush -6. **Step 6**: 增强 `memory_write` tool,支持日期分片 -7. **Step 7**: SDK 和 Store 集成 -8. **Step 8**: Desktop UI 提示 - ---- - -## 配置项汇总 - -| 配置项 | 类型 | 默认值 | 说明 | -|--------|------|--------|------| -| `enableMemoryFlush` | boolean | true | 是否启用自动 memory flush | -| `memoryFlushSoftTokens` | number | 4000 | 在 compaction 阈值前多少 tokens 触发 | - ---- - -## 与现有功能的关系 - -``` -Token 累积 - ↓ -达到 Memory Flush 阈值? ──────────────────────┐ - ↓ YES │ -Memory Flush Turn(新功能) │ - ├─ Agent 分析会话 │ - ├─ 调用 memory_write 保存重要信息 │ - └─ 更新 memoryFlushCompactionCount │ - ↓ │ -达到 Compaction 阈值? ←───────────────────────┘ - ↓ YES ↓ NO -┌─────────────────────────────┐ │ -│ Tool Result Pruning(已实现)│ │ -│ Soft-trim / Hard-clear │ │ -└─────────────────────────────┘ │ - ↓ │ -Message Compaction │ - ├─ 删除旧消息 │ - └─ 或生成摘要 │ - ↓ │ -继续会话 ←──────────────────────────────────┘ -``` - ---- - -## 风险和注意事项 - -1. **Token 消耗**: Memory flush turn 本身会消耗 tokens,需要控制 prompt 长度 -2. **循环触发**: 需要 `memoryFlushCompactionCount` 防止重复触发 -3. **Tool 限制**: Flush turn 应该只允许 `memory_write`,防止执行其他操作 -4. **超时处理**: Flush turn 需要有超时机制,不能阻塞太久 -5. **静默响应**: 如果没有需要保存的内容,agent 应该返回 `[SILENT]` 跳过 - ---- - -## 测试计划 - -1. **单元测试**: `shouldRunMemoryFlush` 判断逻辑 -2. **集成测试**: Memory flush turn 执行流程 -3. **E2E 测试**: 完整的 flush → compaction 流程 -4. **边界测试**: - - 连续多次 compaction 只触发一次 flush - - Flush 失败不阻塞 compaction - - Agent 返回 [SILENT] 时正常跳过 diff --git a/docs/design/cron-job-tool.md b/docs/design/cron-job-tool.md deleted file mode 100644 index a219b586..00000000 --- a/docs/design/cron-job-tool.md +++ /dev/null @@ -1,823 +0,0 @@ -# Cron Job Tool 实现方案 - -## 概述 - -Cron Job Tool 允许 Agent 创建定时任务,在指定时间或周期性地执行操作。这对于提醒、定期检查、自动化工作流等场景非常有用。 - -## OpenClaw 实现分析 - -### 核心架构 - -``` -┌─────────────────────────────────────────────────────────────┐ -│ CronService │ -├─────────────────────────────────────────────────────────────┤ -│ start() → 加载 jobs, 计算下次运行时间, 启动 timer │ -│ add() → 创建任务, 计算 schedule, 持久化 │ -│ update() → 修改任务, 重新计算 schedule │ -│ remove() → 删除任务 │ -│ run() → 立即执行任务 │ -│ list() → 列出所有任务 │ -└─────────────────────────────────────────────────────────────┘ - │ - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ Timer Loop │ -├─────────────────────────────────────────────────────────────┤ -│ armTimer(nextWakeAtMs) │ -│ ↓ │ -│ onTimer() → runDueJobs() → executeJob() │ -│ ↓ │ -│ Update state, re-arm timer │ -└─────────────────────────────────────────────────────────────┘ -``` - -### Job 类型 - -**Schedule 类型:** -1. **at** - 一次性任务(指定时间戳) -2. **every** - 固定间隔(如每 30 分钟) -3. **cron** - 标准 cron 表达式(5 字段 + 可选时区) - -**Session Target:** -1. **main** - 注入到主会话(作为系统事件) -2. **isolated** - 在独立会话中运行 agent turn - -**Payload 类型:** -1. **systemEvent** - 注入文本到主会话 -2. **agentTurn** - 在独立会话中执行 agent(可指定 model/thinking) - -### 存储结构 - -``` -~/.openclaw/cron/ -├── jobs.json # 所有任务定义 -├── jobs.json.bak # 备份 -└── runs/ - ├── .jsonl # 任务1的运行历史 - └── .jsonl # 任务2的运行历史 -``` - ---- - -## Super Multica 实现方案 - -### Phase 1: 核心数据结构 - -#### 1.1 Job 类型定义 - -**文件:** `src/cron/types.ts` - -```typescript -import type { v7 as uuidv7 } from "uuid"; - -/** Cron 任务调度类型 */ -export type CronSchedule = - | { kind: "at"; atMs: number } // 一次性(时间戳) - | { kind: "every"; everyMs: number; anchorMs?: number } // 固定间隔 - | { kind: "cron"; expr: string; tz?: string }; // Cron 表达式 - -/** 任务执行目标 */ -export type CronSessionTarget = "main" | "isolated"; - -/** 唤醒模式 */ -export type CronWakeMode = "next-heartbeat" | "now"; - -/** 任务载荷 */ -export type CronPayload = - | { - kind: "system-event"; - text: string; // 注入到主会话的文本 - } - | { - kind: "agent-turn"; - message: string; // Agent 执行的 prompt - model?: string; // 可选 model override - thinkingLevel?: string; // 可选 thinking level - timeoutSeconds?: number; // 超时时间 - }; - -/** 任务运行状态 */ -export type CronJobState = { - nextRunAtMs?: number; // 下次运行时间 - runningAtMs?: number; // 正在运行的时间戳(锁) - lastRunAtMs?: number; // 上次运行时间 - lastStatus?: "ok" | "error" | "skipped"; - lastError?: string; - lastDurationMs?: number; -}; - -/** Cron 任务定义 */ -export type CronJob = { - id: string; // UUID - name: string; // 用户友好名称 - description?: string; // 描述 - enabled: boolean; // 是否启用 - deleteAfterRun?: boolean; // 运行后自动删除(一次性任务) - createdAtMs: number; - updatedAtMs: number; - schedule: CronSchedule; - sessionTarget: CronSessionTarget; - wakeMode: CronWakeMode; - payload: CronPayload; - state: CronJobState; -}; - -/** 创建任务的输入 */ -export type CronJobInput = Omit; - -/** 运行日志条目 */ -export type CronRunLogEntry = { - ts: number; - jobId: string; - action: "run" | "skip" | "error"; - status: "ok" | "error" | "skipped"; - error?: string; - summary?: string; - durationMs?: number; - nextRunAtMs?: number; -}; -``` - -#### 1.2 存储层 - -**文件:** `src/cron/store.ts` - -```typescript -import { existsSync, mkdirSync, readFileSync, writeFileSync } from "fs"; -import path from "path"; -import type { CronJob, CronRunLogEntry } from "./types.js"; - -const DEFAULT_CRON_DIR = path.join( - process.env["HOME"] ?? ".", - ".super-multica", - "cron" -); - -export class CronStore { - private readonly jobsPath: string; - private readonly runsDir: string; - private jobs: Map = new Map(); - - constructor(baseDir: string = DEFAULT_CRON_DIR) { - this.jobsPath = path.join(baseDir, "jobs.json"); - this.runsDir = path.join(baseDir, "runs"); - this.ensureDirs(); - } - - private ensureDirs() { - const dir = path.dirname(this.jobsPath); - if (!existsSync(dir)) mkdirSync(dir, { recursive: true }); - if (!existsSync(this.runsDir)) mkdirSync(this.runsDir, { recursive: true }); - } - - load(): CronJob[] { - if (!existsSync(this.jobsPath)) return []; - const data = JSON.parse(readFileSync(this.jobsPath, "utf-8")); - this.jobs = new Map(data.jobs.map((j: CronJob) => [j.id, j])); - return Array.from(this.jobs.values()); - } - - save() { - const jobs = Array.from(this.jobs.values()); - // Backup first - if (existsSync(this.jobsPath)) { - writeFileSync(this.jobsPath + ".bak", readFileSync(this.jobsPath)); - } - writeFileSync(this.jobsPath, JSON.stringify({ jobs }, null, 2)); - } - - get(id: string): CronJob | undefined { - return this.jobs.get(id); - } - - set(job: CronJob) { - this.jobs.set(job.id, job); - this.save(); - } - - delete(id: string): boolean { - const deleted = this.jobs.delete(id); - if (deleted) this.save(); - return deleted; - } - - list(filter?: { enabled?: boolean }): CronJob[] { - let jobs = Array.from(this.jobs.values()); - if (filter?.enabled !== undefined) { - jobs = jobs.filter((j) => j.enabled === filter.enabled); - } - return jobs; - } - - // Run log methods - appendRunLog(jobId: string, entry: CronRunLogEntry) { - const logPath = path.join(this.runsDir, `${jobId}.jsonl`); - const line = JSON.stringify(entry) + "\n"; - writeFileSync(logPath, line, { flag: "a" }); - } - - getRunLogs(jobId: string, limit = 50): CronRunLogEntry[] { - const logPath = path.join(this.runsDir, `${jobId}.jsonl`); - if (!existsSync(logPath)) return []; - const lines = readFileSync(logPath, "utf-8").trim().split("\n"); - return lines.slice(-limit).map((l) => JSON.parse(l)); - } -} -``` - ---- - -### Phase 2: Cron Service - -**文件:** `src/cron/service.ts` - -```typescript -import { v7 as uuidv7 } from "uuid"; -import Croner from "croner"; -import type { CronJob, CronJobInput, CronSchedule } from "./types.js"; -import { CronStore } from "./store.js"; - -export class CronService { - private store: CronStore; - private timer: NodeJS.Timeout | null = null; - private running = false; - - constructor(store?: CronStore) { - this.store = store ?? new CronStore(); - } - - /** 启动服务 */ - async start() { - if (this.running) return; - this.running = true; - this.store.load(); - this.recomputeAllSchedules(); - this.armTimer(); - } - - /** 停止服务 */ - stop() { - this.running = false; - if (this.timer) { - clearTimeout(this.timer); - this.timer = null; - } - } - - /** 获取服务状态 */ - status() { - const jobs = this.store.list({ enabled: true }); - const nextWake = Math.min( - ...jobs.map((j) => j.state.nextRunAtMs ?? Infinity) - ); - return { - running: this.running, - jobCount: jobs.length, - nextWakeAtMs: nextWake === Infinity ? null : nextWake, - }; - } - - /** 列出任务 */ - list(filter?: { enabled?: boolean }) { - return this.store.list(filter); - } - - /** 添加任务 */ - add(input: CronJobInput): CronJob { - const now = Date.now(); - const job: CronJob = { - ...input, - id: uuidv7(), - createdAtMs: now, - updatedAtMs: now, - state: {}, - }; - this.computeNextRun(job); - this.store.set(job); - this.armTimer(); - return job; - } - - /** 更新任务 */ - update(id: string, patch: Partial): CronJob | null { - const job = this.store.get(id); - if (!job) return null; - - Object.assign(job, patch, { updatedAtMs: Date.now() }); - if (patch.schedule) { - this.computeNextRun(job); - } - this.store.set(job); - this.armTimer(); - return job; - } - - /** 删除任务 */ - remove(id: string): boolean { - return this.store.delete(id); - } - - /** 立即运行任务 */ - async run(id: string, force = false): Promise<{ ok: boolean; reason?: string }> { - const job = this.store.get(id); - if (!job) return { ok: false, reason: "Job not found" }; - if (!job.enabled && !force) return { ok: false, reason: "Job disabled" }; - - await this.executeJob(job); - return { ok: true }; - } - - /** 获取运行日志 */ - getRunLogs(id: string) { - return this.store.getRunLogs(id); - } - - // === Private Methods === - - private computeNextRun(job: CronJob) { - const now = Date.now(); - let nextMs: number; - - switch (job.schedule.kind) { - case "at": - nextMs = job.schedule.atMs; - break; - case "every": - const anchor = job.schedule.anchorMs ?? now; - const interval = job.schedule.everyMs; - const elapsed = now - anchor; - const periods = Math.ceil(elapsed / interval); - nextMs = anchor + periods * interval; - break; - case "cron": - const cron = Croner(job.schedule.expr, { - timezone: job.schedule.tz, - }); - const next = cron.nextRun(); - nextMs = next ? next.getTime() : now + 86400000; // fallback 1 day - break; - } - - job.state.nextRunAtMs = nextMs; - } - - private recomputeAllSchedules() { - for (const job of this.store.list({ enabled: true })) { - this.computeNextRun(job); - this.store.set(job); - } - } - - private armTimer() { - if (this.timer) { - clearTimeout(this.timer); - this.timer = null; - } - - const jobs = this.store.list({ enabled: true }); - const nextWake = Math.min( - ...jobs.map((j) => j.state.nextRunAtMs ?? Infinity) - ); - - if (nextWake === Infinity) return; - - const delay = Math.max(0, nextWake - Date.now()); - this.timer = setTimeout(() => this.onTimer(), delay); - } - - private async onTimer() { - const now = Date.now(); - const dueJobs = this.store - .list({ enabled: true }) - .filter((j) => (j.state.nextRunAtMs ?? Infinity) <= now); - - for (const job of dueJobs) { - await this.executeJob(job); - } - - this.armTimer(); - } - - private async executeJob(job: CronJob) { - const startMs = Date.now(); - job.state.runningAtMs = startMs; - this.store.set(job); - - try { - // TODO: 实际执行逻辑 - // - systemEvent: 注入到主会话 - // - agentTurn: 在独立会话中运行 agent - console.log(`[Cron] Executing job: ${job.name} (${job.id})`); - - // 模拟执行 - await new Promise((r) => setTimeout(r, 100)); - - // 更新状态 - job.state.lastRunAtMs = startMs; - job.state.lastStatus = "ok"; - job.state.lastDurationMs = Date.now() - startMs; - job.state.runningAtMs = undefined; - - // 一次性任务处理 - if (job.schedule.kind === "at") { - if (job.deleteAfterRun) { - this.store.delete(job.id); - } else { - job.enabled = false; - } - } else { - this.computeNextRun(job); - } - - this.store.set(job); - this.store.appendRunLog(job.id, { - ts: startMs, - jobId: job.id, - action: "run", - status: "ok", - durationMs: job.state.lastDurationMs, - nextRunAtMs: job.state.nextRunAtMs, - }); - } catch (error) { - job.state.lastRunAtMs = startMs; - job.state.lastStatus = "error"; - job.state.lastError = String(error); - job.state.lastDurationMs = Date.now() - startMs; - job.state.runningAtMs = undefined; - this.computeNextRun(job); - this.store.set(job); - this.store.appendRunLog(job.id, { - ts: startMs, - jobId: job.id, - action: "error", - status: "error", - error: String(error), - durationMs: job.state.lastDurationMs, - }); - } - } -} -``` - ---- - -### Phase 3: Agent Tool - -**文件:** `src/agent/tools/cron/cron-tool.ts` - -```typescript -import type { Tool } from "@mariozechner/pi-agent-core"; -import { CronService } from "../../../cron/service.js"; - -let cronService: CronService | null = null; - -export function getCronService(): CronService { - if (!cronService) { - cronService = new CronService(); - cronService.start(); - } - return cronService; -} - -export const cronTool: Tool = { - name: "cron", - description: `Create, manage, and execute scheduled tasks (cron jobs). - -## Actions - -### list -List all cron jobs. -\`\`\`json -{ "action": "list", "enabled": true } -\`\`\` - -### add -Create a new cron job. -\`\`\`json -{ - "action": "add", - "name": "Daily reminder", - "schedule": { "kind": "cron", "expr": "0 9 * * *", "tz": "Asia/Shanghai" }, - "sessionTarget": "main", - "wakeMode": "now", - "payload": { "kind": "system-event", "text": "Check your todos!" } -} -\`\`\` - -Schedule types: -- \`{ "kind": "at", "atMs": 1704067200000 }\` - One-time at timestamp -- \`{ "kind": "every", "everyMs": 3600000 }\` - Every hour -- \`{ "kind": "cron", "expr": "0 9 * * *", "tz": "Asia/Shanghai" }\` - Cron expression - -### update -Update an existing job. -\`\`\`json -{ "action": "update", "jobId": "xxx", "enabled": false } -\`\`\` - -### remove -Delete a job. -\`\`\`json -{ "action": "remove", "jobId": "xxx" } -\`\`\` - -### run -Execute a job immediately. -\`\`\`json -{ "action": "run", "jobId": "xxx", "force": true } -\`\`\` - -### status -Get cron service status. -\`\`\`json -{ "action": "status" } -\`\`\` -`, - - parameters: { - type: "object", - properties: { - action: { - type: "string", - enum: ["list", "add", "update", "remove", "run", "status"], - description: "The action to perform", - }, - // list - enabled: { type: "boolean", description: "Filter by enabled status" }, - // add - name: { type: "string", description: "Job name" }, - description: { type: "string", description: "Job description" }, - schedule: { type: "object", description: "Schedule configuration" }, - sessionTarget: { - type: "string", - enum: ["main", "isolated"], - description: "Where to run the job", - }, - wakeMode: { - type: "string", - enum: ["next-heartbeat", "now"], - description: "When to wake after job", - }, - payload: { type: "object", description: "Job payload" }, - deleteAfterRun: { type: "boolean", description: "Delete after one-time run" }, - // update/remove/run - jobId: { type: "string", description: "Job ID" }, - // run - force: { type: "boolean", description: "Force run even if disabled" }, - }, - required: ["action"], - }, - - execute: async (params: Record) => { - const service = getCronService(); - const action = params["action"] as string; - - switch (action) { - case "status": - return JSON.stringify(service.status(), null, 2); - - case "list": - const jobs = service.list({ - enabled: params["enabled"] as boolean | undefined, - }); - return JSON.stringify(jobs, null, 2); - - case "add": - const newJob = service.add({ - name: params["name"] as string, - description: params["description"] as string | undefined, - enabled: true, - deleteAfterRun: params["deleteAfterRun"] as boolean | undefined, - schedule: params["schedule"] as any, - sessionTarget: (params["sessionTarget"] as any) ?? "main", - wakeMode: (params["wakeMode"] as any) ?? "next-heartbeat", - payload: params["payload"] as any, - }); - return `Created job: ${newJob.name} (${newJob.id})\nNext run: ${new Date(newJob.state.nextRunAtMs!).toISOString()}`; - - case "update": - const updated = service.update(params["jobId"] as string, params as any); - if (!updated) return "Job not found"; - return `Updated job: ${updated.name}`; - - case "remove": - const removed = service.remove(params["jobId"] as string); - return removed ? "Job removed" : "Job not found"; - - case "run": - const result = await service.run( - params["jobId"] as string, - params["force"] as boolean - ); - return result.ok ? "Job executed" : `Failed: ${result.reason}`; - - default: - return `Unknown action: ${action}`; - } - }, -}; -``` - ---- - -### Phase 4: CLI 命令 - -**文件:** `src/agent/cli/commands/cron.ts` - -```typescript -import { Command } from "commander"; -import { getCronService } from "../../tools/cron/cron-tool.js"; - -export function registerCronCommands(program: Command) { - const cron = program.command("cron").description("Manage cron jobs"); - - cron - .command("status") - .description("Show cron service status") - .action(() => { - const service = getCronService(); - const status = service.status(); - console.log("Cron Service Status:"); - console.log(` Running: ${status.running}`); - console.log(` Jobs: ${status.jobCount}`); - if (status.nextWakeAtMs) { - console.log(` Next wake: ${new Date(status.nextWakeAtMs).toISOString()}`); - } - }); - - cron - .command("list") - .description("List all cron jobs") - .option("--enabled", "Show only enabled jobs") - .option("--disabled", "Show only disabled jobs") - .action((opts) => { - const service = getCronService(); - const enabled = opts.enabled ? true : opts.disabled ? false : undefined; - const jobs = service.list({ enabled }); - - if (jobs.length === 0) { - console.log("No cron jobs found."); - return; - } - - for (const job of jobs) { - console.log(`\n${job.enabled ? "✓" : "✗"} ${job.name} (${job.id})`); - console.log(` Schedule: ${formatSchedule(job.schedule)}`); - console.log(` Target: ${job.sessionTarget}`); - if (job.state.nextRunAtMs) { - console.log(` Next run: ${new Date(job.state.nextRunAtMs).toISOString()}`); - } - if (job.state.lastStatus) { - console.log(` Last run: ${job.state.lastStatus} (${job.state.lastDurationMs}ms)`); - } - } - }); - - cron - .command("add") - .description("Add a new cron job") - .requiredOption("-n, --name ", "Job name") - .option("--at