From de355cace37366cae40aa5464c21413694c1d1ea Mon Sep 17 00:00:00 2001 From: Jiang Bohan Date: Thu, 5 Feb 2026 17:46:57 +0800 Subject: [PATCH] feat(cron): add cron job scheduling module Implements a timer-based cron job system: - types.ts: Job types (at, every, cron schedules) - schedule.ts: Next run computation using croner - store.ts: Persistent JSON storage with JSONL run logs - service.ts: CronService with timer management - execute.ts: Job execution (system-event, agent-turn) Based on OpenClaw's implementation (MIT License). Co-Authored-By: Claude Opus 4.5 --- src/cron/execute.ts | 139 ++++++++++++++ src/cron/index.ts | 39 ++++ src/cron/schedule.ts | 187 +++++++++++++++++++ src/cron/service.ts | 432 +++++++++++++++++++++++++++++++++++++++++++ src/cron/store.ts | 216 ++++++++++++++++++++++ src/cron/types.ts | 116 ++++++++++++ 6 files changed, 1129 insertions(+) create mode 100644 src/cron/execute.ts create mode 100644 src/cron/index.ts create mode 100644 src/cron/schedule.ts create mode 100644 src/cron/service.ts create mode 100644 src/cron/store.ts create mode 100644 src/cron/types.ts diff --git a/src/cron/execute.ts b/src/cron/execute.ts new file mode 100644 index 00000000..23b1526e --- /dev/null +++ b/src/cron/execute.ts @@ -0,0 +1,139 @@ +/** + * Cron Job Execution + * + * Handles the actual execution of cron job payloads. + * Based on OpenClaw's implementation (MIT License) + */ + +import type { CronJob } from "./types.js"; +import { getHub, isHubInitialized } from "../hub/hub-singleton.js"; + +/** Execution result */ +export type ExecutionResult = { + summary?: string; + error?: string; +}; + +/** + * Execute a cron job payload. + * + * For system-event: Injects text into the main session + * For agent-turn: Creates an isolated agent turn + */ +export async function executeCronJob(job: CronJob): Promise { + const { payload } = job; + + switch (payload.kind) { + case "system-event": + return executeSystemEvent(job); + case "agent-turn": + return executeAgentTurn(job); + default: + return { error: `Unknown payload kind: ${(payload as { kind: string }).kind}` }; + } +} + +/** + * Execute a system-event payload. + * Injects the text into the main session as a system message. + */ +async function executeSystemEvent(job: CronJob): Promise { + if (!isHubInitialized()) { + return { error: "Hub not available" }; + } + const hub = getHub(); + + const payload = job.payload as { kind: "system-event"; text: string }; + + // Get the list of active agents + const agentIds = hub.listAgents(); + if (agentIds.length === 0) { + return { error: "No active agents" }; + } + + // For now, inject into the first (main) agent + // TODO: Support targeting specific agent by ID + const agentId = agentIds[0]!; + const agent = hub.getAgent(agentId); + if (!agent || agent.closed) { + return { error: `Agent ${agentId} not found or closed` }; + } + + // Format the cron message with metadata + const cronMessage = `[CRON] ${job.name}: ${payload.text}`; + + try { + // Write to agent (non-blocking, will be processed in queue) + agent.write(cronMessage); + + // Wait for the agent to process the message + await agent.waitForIdle(); + + return { summary: `Injected message into agent ${agentId.slice(0, 8)}` }; + } catch (err) { + return { error: err instanceof Error ? err.message : String(err) }; + } +} + +/** + * Execute an agent-turn payload. + * Creates an isolated subagent to run the task. + */ +async function executeAgentTurn(job: CronJob): Promise { + if (!isHubInitialized()) { + return { error: "Hub not available" }; + } + const hub = getHub(); + + const payload = job.payload as { + kind: "agent-turn"; + message: string; + model?: string; + thinkingLevel?: string; + timeoutSeconds?: number; + }; + + // Generate a unique session ID for this isolated run + const sessionId = `cron-${job.id}-${Date.now()}`; + + try { + // Create isolated subagent + // TODO: Support model/thinkingLevel override + const agent = hub.createSubagent(sessionId, { + profileId: "default", + }); + + // Set up timeout if specified + const timeoutMs = (payload.timeoutSeconds ?? 300) * 1000; // default 5 minutes + let timeoutHandle: NodeJS.Timeout | undefined; + + const timeoutPromise = new Promise((_, reject) => { + timeoutHandle = setTimeout(() => { + reject(new Error(`Cron job timed out after ${payload.timeoutSeconds}s`)); + }, timeoutMs); + }); + + // Execute the agent turn + const executePromise = (async (): Promise => { + const cronMessage = `[CRON Job: ${job.name}]\n\n${payload.message}`; + agent.write(cronMessage); + await agent.waitForIdle(); + return { summary: `Completed agent turn in isolated session ${sessionId.slice(0, 16)}` }; + })(); + + // Race between execution and timeout + const result = await Promise.race([executePromise, timeoutPromise]); + + // Clear timeout + if (timeoutHandle) { + clearTimeout(timeoutHandle); + } + + // Close the subagent + agent.close(); + + return result; + } catch (err) { + return { error: err instanceof Error ? err.message : String(err) }; + } +} diff --git a/src/cron/index.ts b/src/cron/index.ts new file mode 100644 index 00000000..21d4b659 --- /dev/null +++ b/src/cron/index.ts @@ -0,0 +1,39 @@ +/** + * Cron Module + * + * Provides scheduled task functionality for Super Multica. + */ + +export type { + CronSchedule, + CronSessionTarget, + CronWakeMode, + CronPayload, + CronJobState, + CronJob, + CronJobInput, + CronJobPatch, + CronRunLogEntry, + CronConfig, +} from "./types.js"; + +export { + computeNextRunAtMs, + isValidCronExpr, + parseTimeInput, + parseIntervalInput, + formatSchedule, + formatDuration, +} from "./schedule.js"; + +export { CronStore } from "./store.js"; + +export { + CronService, + getCronService, + shutdownCronService, + type CronJobExecutor, + type CronServiceStatus, +} from "./service.js"; + +export { executeCronJob, type ExecutionResult } from "./execute.js"; diff --git a/src/cron/schedule.ts b/src/cron/schedule.ts new file mode 100644 index 00000000..e06d02af --- /dev/null +++ b/src/cron/schedule.ts @@ -0,0 +1,187 @@ +/** + * Cron Schedule Computation + * + * Based on OpenClaw's implementation (MIT License) + */ + +import { Cron } from "croner"; +import type { CronSchedule } from "./types.js"; + +/** + * Compute the next run time for a schedule. + * + * @param schedule - The schedule configuration + * @param nowMs - Current time in milliseconds (default: Date.now()) + * @returns Next run time in ms, or undefined if no future run + */ +export function computeNextRunAtMs( + schedule: CronSchedule, + nowMs: number = Date.now(), +): number | undefined { + switch (schedule.kind) { + case "at": + // One-shot: return the timestamp if it's in the future + return schedule.atMs > nowMs ? schedule.atMs : undefined; + + case "every": { + // Fixed interval: compute next occurrence + const everyMs = Math.max(1, Math.floor(schedule.everyMs)); + const anchor = Math.max(0, Math.floor(schedule.anchorMs ?? nowMs)); + + if (nowMs < anchor) return anchor; + + const elapsed = nowMs - anchor; + const steps = Math.max(1, Math.floor((elapsed + everyMs - 1) / everyMs)); + return anchor + steps * everyMs; + } + + case "cron": { + // Cron expression: use croner to compute next run + const expr = schedule.expr.trim(); + if (!expr) return undefined; + + try { + const tz = schedule.tz?.trim(); + const cron = tz ? new Cron(expr, { timezone: tz }) : new Cron(expr); + const next = cron.nextRun(new Date(nowMs)); + return next ? next.getTime() : undefined; + } catch (error) { + console.error(`[Cron] Invalid cron expression: ${expr}`, error); + return undefined; + } + } + } +} + +/** + * Validate a cron expression. + * + * @param expr - Cron expression (5-field) + * @param tz - Optional timezone + * @returns true if valid, false otherwise + */ +export function isValidCronExpr(expr: string, tz?: string): boolean { + try { + const timezone = tz?.trim(); + if (timezone) { + new Cron(expr.trim(), { timezone }); + } else { + new Cron(expr.trim()); + } + return true; + } catch { + return false; + } +} + +/** + * Parse a human-readable time string into milliseconds. + * + * Supports: + * - Relative: "10s", "5m", "2h", "1d" + * - ISO 8601: "2024-01-15T09:00:00Z" + * - Unix timestamp (if numeric) + * + * @param input - Time string + * @param nowMs - Current time for relative calculations + * @returns Timestamp in ms, or undefined if invalid + */ +export function parseTimeInput(input: string, nowMs: number = Date.now()): number | undefined { + const trimmed = input.trim(); + + // Check for relative time (e.g., "10m", "2h") + const relativeMatch = trimmed.match(/^(\d+(?:\.\d+)?)\s*([smhd])$/i); + if (relativeMatch) { + const [, numStr, unit] = relativeMatch; + const num = parseFloat(numStr!); + const multipliers: Record = { + s: 1000, + m: 60 * 1000, + h: 60 * 60 * 1000, + d: 24 * 60 * 60 * 1000, + }; + const ms = multipliers[unit!.toLowerCase()]; + if (ms !== undefined) { + return nowMs + num * ms; + } + } + + // Check for numeric (unix timestamp in ms or seconds) + if (/^\d+$/.test(trimmed)) { + const num = parseInt(trimmed, 10); + // If it looks like seconds (before year 2100), convert to ms + if (num < 4102444800) { + return num * 1000; + } + return num; + } + + // Try ISO 8601 date parsing + const date = new Date(trimmed); + if (!isNaN(date.getTime())) { + return date.getTime(); + } + + return undefined; +} + +/** + * Parse an interval string into milliseconds. + * + * Supports: "30s", "5m", "2h", "1d", or raw milliseconds + * + * @param input - Interval string + * @returns Interval in ms, or undefined if invalid + */ +export function parseIntervalInput(input: string): number | undefined { + const trimmed = input.trim(); + + // Check for duration format (e.g., "30m", "2h") + const match = trimmed.match(/^(\d+(?:\.\d+)?)\s*([smhd])$/i); + if (match) { + const [, numStr, unit] = match; + const num = parseFloat(numStr!); + const multipliers: Record = { + s: 1000, + m: 60 * 1000, + h: 60 * 60 * 1000, + d: 24 * 60 * 60 * 1000, + }; + const ms = multipliers[unit!.toLowerCase()]; + if (ms !== undefined) { + return num * ms; + } + } + + // Check for raw milliseconds + if (/^\d+$/.test(trimmed)) { + return parseInt(trimmed, 10); + } + + return undefined; +} + +/** + * Format a schedule for display. + */ +export function formatSchedule(schedule: CronSchedule): string { + switch (schedule.kind) { + case "at": + return `at ${new Date(schedule.atMs).toISOString()}`; + case "every": + return `every ${formatDuration(schedule.everyMs)}`; + case "cron": + return `cron "${schedule.expr}"${schedule.tz ? ` (${schedule.tz})` : ""}`; + } +} + +/** + * Format milliseconds as human-readable duration. + */ +export function formatDuration(ms: number): string { + if (ms < 1000) return `${ms}ms`; + if (ms < 60 * 1000) return `${Math.round(ms / 1000)}s`; + if (ms < 60 * 60 * 1000) return `${Math.round(ms / (60 * 1000))}m`; + if (ms < 24 * 60 * 60 * 1000) return `${Math.round(ms / (60 * 60 * 1000))}h`; + return `${Math.round(ms / (24 * 60 * 60 * 1000))}d`; +} diff --git a/src/cron/service.ts b/src/cron/service.ts new file mode 100644 index 00000000..ff5c911e --- /dev/null +++ b/src/cron/service.ts @@ -0,0 +1,432 @@ +/** + * Cron Service + * + * Manages scheduled jobs with timer-based execution. + * Based on OpenClaw's implementation (MIT License) + */ + +import { v7 as uuidv7 } from "uuid"; +import type { + CronJob, + CronJobInput, + CronJobPatch, + CronJobState, + CronRunLogEntry, + CronConfig, +} from "./types.js"; +import { CronStore } from "./store.js"; +import { computeNextRunAtMs } from "./schedule.js"; + +/** Callback for job execution */ +export type CronJobExecutor = (job: CronJob) => Promise<{ summary?: string; error?: string }>; + +/** Service status */ +export type CronServiceStatus = { + running: boolean; + enabled: boolean; + storePath: string; + jobCount: number; + enabledJobCount: number; + nextWakeAtMs: number | null; +}; + +/** Default stuck job timeout (2 hours) */ +const STUCK_JOB_TIMEOUT_MS = 2 * 60 * 60 * 1000; + +export class CronService { + private readonly store: CronStore; + private readonly config: CronConfig; + private timer: NodeJS.Timeout | null = null; + private running = false; + private executor: CronJobExecutor | null = null; + + constructor(config: CronConfig = {}) { + this.config = { + enabled: config.enabled ?? true, + maxConcurrentRuns: config.maxConcurrentRuns ?? 1, + ...config, + }; + this.store = new CronStore(config.storePath); + } + + /** + * Set the job executor callback. + * This is called when a job needs to be executed. + */ + setExecutor(executor: CronJobExecutor): void { + this.executor = executor; + } + + /** + * Start the cron service. + * Loads jobs from disk, computes schedules, and starts the timer. + */ + async start(): Promise { + if (this.running) return; + if (!this.config.enabled) { + console.log("[CronService] Cron is disabled by config"); + return; + } + + this.running = true; + console.log("[CronService] Starting..."); + + // Load jobs and compute next run times + const jobs = this.store.load(); + console.log(`[CronService] Loaded ${jobs.length} jobs`); + + // Recompute all schedules + this.recomputeAllSchedules(); + + // Clear any stuck jobs (running for > 2 hours) + this.clearStuckJobs(); + + // Arm timer for next job + this.armTimer(); + + console.log("[CronService] Started"); + } + + /** + * Stop the cron service. + */ + stop(): void { + if (!this.running) return; + + this.running = false; + if (this.timer) { + clearTimeout(this.timer); + this.timer = null; + } + + console.log("[CronService] Stopped"); + } + + /** + * Get service status. + */ + status(): CronServiceStatus { + const allJobs = this.store.list(); + const enabledJobs = this.store.list({ enabled: true }); + + const nextWake = enabledJobs.reduce((min, job) => { + const next = job.state.nextRunAtMs; + return next !== undefined && next < min ? next : min; + }, Infinity); + + return { + running: this.running, + enabled: this.config.enabled ?? true, + storePath: this.store.getStorePath(), + jobCount: allJobs.length, + enabledJobCount: enabledJobs.length, + nextWakeAtMs: nextWake === Infinity ? null : nextWake, + }; + } + + /** + * List jobs with optional filter. + */ + list(filter?: { enabled?: boolean }): CronJob[] { + return this.store.list(filter); + } + + /** + * Get a job by ID. + */ + get(id: string): CronJob | undefined { + return this.store.get(id); + } + + /** + * Add a new job. + */ + add(input: CronJobInput): CronJob { + const now = Date.now(); + const job: CronJob = { + ...input, + id: uuidv7(), + createdAtMs: now, + updatedAtMs: now, + state: {}, + }; + + // Compute initial next run time + this.computeNextRun(job); + + this.store.set(job); + console.log(`[CronService] Added job: ${job.name} (${job.id}), next run: ${job.state.nextRunAtMs ? new Date(job.state.nextRunAtMs).toISOString() : "none"}`); + + // Re-arm timer in case this job runs sooner + if (this.running) { + this.armTimer(); + } + + return job; + } + + /** + * Update an existing job. + */ + update(id: string, patch: CronJobPatch): CronJob | null { + const job = this.store.get(id); + if (!job) return null; + + // Apply patch + Object.assign(job, patch, { updatedAtMs: Date.now() }); + + // Recompute schedule if changed + if (patch.schedule || patch.enabled !== undefined) { + this.computeNextRun(job); + } + + this.store.set(job); + console.log(`[CronService] Updated job: ${job.name} (${job.id})`); + + // Re-arm timer + if (this.running) { + this.armTimer(); + } + + return job; + } + + /** + * Remove a job. + */ + remove(id: string): boolean { + const job = this.store.get(id); + if (!job) return false; + + const deleted = this.store.delete(id); + if (deleted) { + console.log(`[CronService] Removed job: ${job.name} (${id})`); + } + + return deleted; + } + + /** + * Run a job immediately. + * + * @param id - Job ID + * @param force - Run even if disabled + */ + async run(id: string, force = false): Promise<{ ok: boolean; reason?: string }> { + const job = this.store.get(id); + if (!job) { + return { ok: false, reason: "Job not found" }; + } + + if (!job.enabled && !force) { + return { ok: false, reason: "Job is disabled" }; + } + + if (job.state.runningAtMs) { + return { ok: false, reason: "Job is already running" }; + } + + await this.executeJob(job); + return { ok: true }; + } + + /** + * Get run logs for a job. + */ + getRunLogs(id: string, limit?: number): CronRunLogEntry[] { + return this.store.getRunLogs(id, limit); + } + + // === Private Methods === + + /** + * Compute next run time for a job. + */ + private computeNextRun(job: CronJob): void { + if (!job.enabled) { + job.state.nextRunAtMs = undefined; + return; + } + + const now = Date.now(); + const nextMs = computeNextRunAtMs(job.schedule, now); + job.state.nextRunAtMs = nextMs; + } + + /** + * Recompute schedules for all enabled jobs. + */ + private recomputeAllSchedules(): void { + for (const job of this.store.list({ enabled: true })) { + this.computeNextRun(job); + this.store.set(job); + } + } + + /** + * Clear stuck jobs (running for too long). + */ + private clearStuckJobs(): void { + const now = Date.now(); + for (const job of this.store.list()) { + if (job.state.runningAtMs && now - job.state.runningAtMs > STUCK_JOB_TIMEOUT_MS) { + console.warn(`[CronService] Clearing stuck job: ${job.name} (${job.id})`); + job.state.runningAtMs = undefined; + job.state.lastStatus = "error"; + job.state.lastError = "Job was stuck (running > 2 hours)"; + this.store.set(job); + } + } + } + + /** + * Arm the timer for the next due job. + */ + private armTimer(): void { + if (!this.running) return; + + // Clear existing timer + if (this.timer) { + clearTimeout(this.timer); + this.timer = null; + } + + // Find next wake time + const enabledJobs = this.store.list({ enabled: true }); + const nextWake = enabledJobs.reduce((min, job) => { + const next = job.state.nextRunAtMs; + return next !== undefined && next < min ? next : min; + }, Infinity); + + if (nextWake === Infinity) { + // No jobs to run + return; + } + + const delay = Math.max(0, nextWake - Date.now()); + this.timer = setTimeout(() => this.onTimer(), delay); + } + + /** + * Timer callback: run all due jobs. + */ + private async onTimer(): Promise { + if (!this.running) return; + + const now = Date.now(); + const dueJobs = this.store + .list({ enabled: true }) + .filter((j) => { + const next = j.state.nextRunAtMs; + return next !== undefined && next <= now && !j.state.runningAtMs; + }); + + for (const job of dueJobs) { + try { + await this.executeJob(job); + } catch (error) { + console.error(`[CronService] Error executing job ${job.id}:`, error); + } + } + + // Re-arm timer for next batch + this.armTimer(); + } + + /** + * Execute a single job. + */ + private async executeJob(job: CronJob): Promise { + const startMs = Date.now(); + console.log(`[CronService] Executing job: ${job.name} (${job.id})`); + + // Mark as running + job.state.runningAtMs = startMs; + this.store.set(job); + + let status: "ok" | "error" = "ok"; + let error: string | undefined; + let summary: string | undefined; + + try { + if (this.executor) { + const result = await this.executor(job); + summary = result.summary; + if (result.error) { + status = "error"; + error = result.error; + } + } else { + // No executor set, just log + console.log(`[CronService] Job ${job.id} payload:`, job.payload); + } + } catch (err) { + status = "error"; + error = err instanceof Error ? err.message : String(err); + console.error(`[CronService] Job ${job.id} failed:`, err); + } + + const durationMs = Date.now() - startMs; + + // Update job state + job.state.runningAtMs = undefined; + job.state.lastRunAtMs = startMs; + job.state.lastStatus = status; + job.state.lastError = error; + job.state.lastDurationMs = durationMs; + + // Handle one-shot jobs + if (job.schedule.kind === "at") { + if (status === "ok" && job.deleteAfterRun) { + this.store.delete(job.id); + console.log(`[CronService] Deleted one-shot job: ${job.name} (${job.id})`); + } else { + job.enabled = false; + job.state.nextRunAtMs = undefined; + this.store.set(job); + } + } else { + // Compute next run for recurring jobs + this.computeNextRun(job); + this.store.set(job); + } + + // Append run log + this.store.appendRunLog(job.id, { + ts: startMs, + jobId: job.id, + action: status === "ok" ? "run" : "error", + status, + error, + summary, + durationMs, + nextRunAtMs: job.state.nextRunAtMs, + }); + + console.log(`[CronService] Job ${job.id} completed: ${status} (${durationMs}ms)`); + } +} + +// === Singleton === + +let cronServiceInstance: CronService | null = null; + +/** + * Get or create the singleton CronService instance. + */ +export function getCronService(config?: CronConfig): CronService { + if (!cronServiceInstance) { + cronServiceInstance = new CronService(config); + } + return cronServiceInstance; +} + +/** + * Shutdown the singleton CronService. + */ +export function shutdownCronService(): void { + if (cronServiceInstance) { + cronServiceInstance.stop(); + cronServiceInstance = null; + } +} diff --git a/src/cron/store.ts b/src/cron/store.ts new file mode 100644 index 00000000..fe4ca903 --- /dev/null +++ b/src/cron/store.ts @@ -0,0 +1,216 @@ +/** + * Cron Job Storage + * + * Persists jobs to JSON file and run logs to JSONL files. + * Based on OpenClaw's implementation (MIT License) + */ + +import { existsSync, mkdirSync, readFileSync, writeFileSync, appendFileSync } from "fs"; +import path from "path"; +import type { CronJob, CronRunLogEntry } from "./types.js"; + +/** Default cron storage directory */ +const DEFAULT_CRON_DIR = path.join( + process.env["HOME"] ?? ".", + ".super-multica", + "cron", +); + +/** Store data structure */ +type StoreData = { + version: number; + jobs: CronJob[]; +}; + +const STORE_VERSION = 1; + +export class CronStore { + private readonly jobsPath: string; + private readonly runsDir: string; + private jobs: Map = new Map(); + private loaded = false; + + constructor(baseDir: string = DEFAULT_CRON_DIR) { + this.jobsPath = path.join(baseDir, "jobs.json"); + this.runsDir = path.join(baseDir, "runs"); + } + + /** Ensure directories exist */ + private ensureDirs() { + const dir = path.dirname(this.jobsPath); + if (!existsSync(dir)) { + mkdirSync(dir, { recursive: true }); + } + if (!existsSync(this.runsDir)) { + mkdirSync(this.runsDir, { recursive: true }); + } + } + + /** Load jobs from disk */ + load(): CronJob[] { + this.ensureDirs(); + + if (!existsSync(this.jobsPath)) { + this.jobs = new Map(); + this.loaded = true; + return []; + } + + try { + const raw = readFileSync(this.jobsPath, "utf-8"); + const data: StoreData = JSON.parse(raw); + + // Validate version + if (data.version !== STORE_VERSION) { + console.warn(`[CronStore] Store version mismatch: ${data.version} vs ${STORE_VERSION}`); + } + + this.jobs = new Map(data.jobs.map((j) => [j.id, j])); + this.loaded = true; + return Array.from(this.jobs.values()); + } catch (error) { + console.error("[CronStore] Failed to load jobs:", error); + this.jobs = new Map(); + this.loaded = true; + return []; + } + } + + /** Save jobs to disk */ + save(): void { + this.ensureDirs(); + + const data: StoreData = { + version: STORE_VERSION, + jobs: Array.from(this.jobs.values()), + }; + + // Write to temp file first, then rename (atomic) + const tmpPath = this.jobsPath + ".tmp"; + const bakPath = this.jobsPath + ".bak"; + + try { + writeFileSync(tmpPath, JSON.stringify(data, null, 2), "utf-8"); + + // Backup existing file + if (existsSync(this.jobsPath)) { + writeFileSync(bakPath, readFileSync(this.jobsPath)); + } + + // Rename temp to actual (atomic on most filesystems) + const fs = require("fs"); + fs.renameSync(tmpPath, this.jobsPath); + } catch (error) { + console.error("[CronStore] Failed to save jobs:", error); + throw error; + } + } + + /** Ensure store is loaded */ + private ensureLoaded() { + if (!this.loaded) { + this.load(); + } + } + + /** Get a job by ID */ + get(id: string): CronJob | undefined { + this.ensureLoaded(); + return this.jobs.get(id); + } + + /** Set (create or update) a job */ + set(job: CronJob): void { + this.ensureLoaded(); + this.jobs.set(job.id, job); + this.save(); + } + + /** Delete a job by ID */ + delete(id: string): boolean { + this.ensureLoaded(); + const deleted = this.jobs.delete(id); + if (deleted) { + this.save(); + } + return deleted; + } + + /** List all jobs, optionally filtered */ + list(filter?: { enabled?: boolean }): CronJob[] { + this.ensureLoaded(); + let jobs = Array.from(this.jobs.values()); + + if (filter?.enabled !== undefined) { + jobs = jobs.filter((j) => j.enabled === filter.enabled); + } + + // Sort by next run time + jobs.sort((a, b) => { + const aNext = a.state.nextRunAtMs ?? Infinity; + const bNext = b.state.nextRunAtMs ?? Infinity; + return aNext - bNext; + }); + + return jobs; + } + + /** Get job count */ + count(filter?: { enabled?: boolean }): number { + return this.list(filter).length; + } + + // === Run Log Methods === + + /** Append a run log entry */ + appendRunLog(jobId: string, entry: CronRunLogEntry): void { + this.ensureDirs(); + const logPath = path.join(this.runsDir, `${jobId}.jsonl`); + const line = JSON.stringify(entry) + "\n"; + appendFileSync(logPath, line, "utf-8"); + } + + /** Get run logs for a job */ + getRunLogs(jobId: string, limit = 50): CronRunLogEntry[] { + const logPath = path.join(this.runsDir, `${jobId}.jsonl`); + + if (!existsSync(logPath)) { + return []; + } + + try { + const content = readFileSync(logPath, "utf-8").trim(); + if (!content) return []; + + const lines = content.split("\n"); + const entries = lines + .slice(-limit) + .map((line) => { + try { + return JSON.parse(line) as CronRunLogEntry; + } catch { + return null; + } + }) + .filter((e): e is CronRunLogEntry => e !== null); + + return entries; + } catch (error) { + console.error(`[CronStore] Failed to read run logs for ${jobId}:`, error); + return []; + } + } + + /** Clear run logs for a job */ + clearRunLogs(jobId: string): void { + const logPath = path.join(this.runsDir, `${jobId}.jsonl`); + if (existsSync(logPath)) { + writeFileSync(logPath, "", "utf-8"); + } + } + + /** Get the store path (for status display) */ + getStorePath(): string { + return this.jobsPath; + } +} diff --git a/src/cron/types.ts b/src/cron/types.ts new file mode 100644 index 00000000..72867f1b --- /dev/null +++ b/src/cron/types.ts @@ -0,0 +1,116 @@ +/** + * Cron Job Types + * + * Based on OpenClaw's implementation (MIT License) + */ + +/** Cron schedule: one-shot, interval, or cron expression */ +export type CronSchedule = + | { kind: "at"; atMs: number } + | { kind: "every"; everyMs: number; anchorMs?: number } + | { kind: "cron"; expr: string; tz?: string }; + +/** Where to run the job */ +export type CronSessionTarget = "main" | "isolated"; + +/** When to wake after job execution */ +export type CronWakeMode = "next-heartbeat" | "now"; + +/** Job payload: what to execute */ +export type CronPayload = + | { + kind: "system-event"; + /** Text to inject into main session */ + text: string; + } + | { + kind: "agent-turn"; + /** Message/prompt for the agent */ + message: string; + /** Optional model override (e.g., "anthropic/claude-3-opus") */ + model?: string; + /** Optional thinking level override */ + thinkingLevel?: string; + /** Timeout in seconds */ + timeoutSeconds?: number; + }; + +/** Runtime state of a job */ +export type CronJobState = { + /** Next scheduled run (ms since epoch) */ + nextRunAtMs?: number | undefined; + /** Currently running (lock marker, ms since epoch) */ + runningAtMs?: number | undefined; + /** Last completed run (ms since epoch) */ + lastRunAtMs?: number | undefined; + /** Last run status */ + lastStatus?: "ok" | "error" | "skipped" | undefined; + /** Last error message */ + lastError?: string | undefined; + /** Last run duration in ms */ + lastDurationMs?: number | undefined; +}; + +/** Cron job definition */ +export type CronJob = { + /** Unique identifier (UUIDv7) */ + id: string; + /** User-friendly name */ + name: string; + /** Optional description */ + description?: string; + /** Whether the job is enabled */ + enabled: boolean; + /** Delete after successful one-shot run */ + deleteAfterRun?: boolean; + /** Creation timestamp (ms) */ + createdAtMs: number; + /** Last update timestamp (ms) */ + updatedAtMs: number; + /** When to run */ + schedule: CronSchedule; + /** Where to run (main session or isolated) */ + sessionTarget: CronSessionTarget; + /** Wake mode after execution */ + wakeMode: CronWakeMode; + /** What to execute */ + payload: CronPayload; + /** Runtime state */ + state: CronJobState; +}; + +/** Input for creating a new job (without auto-generated fields) */ +export type CronJobInput = Omit; + +/** Input for updating an existing job */ +export type CronJobPatch = Partial>; + +/** Run log entry */ +export type CronRunLogEntry = { + /** Timestamp (ms) */ + ts: number; + /** Job ID */ + jobId: string; + /** Action taken */ + action: "run" | "skip" | "error"; + /** Result status */ + status: "ok" | "error" | "skipped"; + /** Error message if failed */ + error?: string | undefined; + /** Summary of execution (for agent-turn) */ + summary?: string | undefined; + /** Duration in ms */ + durationMs?: number | undefined; + /** Next scheduled run */ + nextRunAtMs?: number | undefined; +}; + +/** Cron service configuration */ +export type CronConfig = { + /** Whether cron is enabled (default: true) */ + enabled?: boolean; + /** Custom store path */ + storePath?: string; + /** Max concurrent job runs (default: 1) */ + maxConcurrentRuns?: number; +};