feat(cron): add cron job scheduling module
Implements a timer-based cron job system: - types.ts: Job types (at, every, cron schedules) - schedule.ts: Next run computation using croner - store.ts: Persistent JSON storage with JSONL run logs - service.ts: CronService with timer management - execute.ts: Job execution (system-event, agent-turn) Based on OpenClaw's implementation (MIT License). Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
00b31e23f5
commit
de355cace3
6 changed files with 1129 additions and 0 deletions
139
src/cron/execute.ts
Normal file
139
src/cron/execute.ts
Normal file
|
|
@ -0,0 +1,139 @@
|
|||
/**
|
||||
* Cron Job Execution
|
||||
*
|
||||
* Handles the actual execution of cron job payloads.
|
||||
* Based on OpenClaw's implementation (MIT License)
|
||||
*/
|
||||
|
||||
import type { CronJob } from "./types.js";
|
||||
import { getHub, isHubInitialized } from "../hub/hub-singleton.js";
|
||||
|
||||
/** Execution result */
|
||||
export type ExecutionResult = {
|
||||
summary?: string;
|
||||
error?: string;
|
||||
};
|
||||
|
||||
/**
|
||||
* Execute a cron job payload.
|
||||
*
|
||||
* For system-event: Injects text into the main session
|
||||
* For agent-turn: Creates an isolated agent turn
|
||||
*/
|
||||
export async function executeCronJob(job: CronJob): Promise<ExecutionResult> {
|
||||
const { payload } = job;
|
||||
|
||||
switch (payload.kind) {
|
||||
case "system-event":
|
||||
return executeSystemEvent(job);
|
||||
case "agent-turn":
|
||||
return executeAgentTurn(job);
|
||||
default:
|
||||
return { error: `Unknown payload kind: ${(payload as { kind: string }).kind}` };
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a system-event payload.
|
||||
* Injects the text into the main session as a system message.
|
||||
*/
|
||||
async function executeSystemEvent(job: CronJob): Promise<ExecutionResult> {
|
||||
if (!isHubInitialized()) {
|
||||
return { error: "Hub not available" };
|
||||
}
|
||||
const hub = getHub();
|
||||
|
||||
const payload = job.payload as { kind: "system-event"; text: string };
|
||||
|
||||
// Get the list of active agents
|
||||
const agentIds = hub.listAgents();
|
||||
if (agentIds.length === 0) {
|
||||
return { error: "No active agents" };
|
||||
}
|
||||
|
||||
// For now, inject into the first (main) agent
|
||||
// TODO: Support targeting specific agent by ID
|
||||
const agentId = agentIds[0]!;
|
||||
const agent = hub.getAgent(agentId);
|
||||
if (!agent || agent.closed) {
|
||||
return { error: `Agent ${agentId} not found or closed` };
|
||||
}
|
||||
|
||||
// Format the cron message with metadata
|
||||
const cronMessage = `[CRON] ${job.name}: ${payload.text}`;
|
||||
|
||||
try {
|
||||
// Write to agent (non-blocking, will be processed in queue)
|
||||
agent.write(cronMessage);
|
||||
|
||||
// Wait for the agent to process the message
|
||||
await agent.waitForIdle();
|
||||
|
||||
return { summary: `Injected message into agent ${agentId.slice(0, 8)}` };
|
||||
} catch (err) {
|
||||
return { error: err instanceof Error ? err.message : String(err) };
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute an agent-turn payload.
|
||||
* Creates an isolated subagent to run the task.
|
||||
*/
|
||||
async function executeAgentTurn(job: CronJob): Promise<ExecutionResult> {
|
||||
if (!isHubInitialized()) {
|
||||
return { error: "Hub not available" };
|
||||
}
|
||||
const hub = getHub();
|
||||
|
||||
const payload = job.payload as {
|
||||
kind: "agent-turn";
|
||||
message: string;
|
||||
model?: string;
|
||||
thinkingLevel?: string;
|
||||
timeoutSeconds?: number;
|
||||
};
|
||||
|
||||
// Generate a unique session ID for this isolated run
|
||||
const sessionId = `cron-${job.id}-${Date.now()}`;
|
||||
|
||||
try {
|
||||
// Create isolated subagent
|
||||
// TODO: Support model/thinkingLevel override
|
||||
const agent = hub.createSubagent(sessionId, {
|
||||
profileId: "default",
|
||||
});
|
||||
|
||||
// Set up timeout if specified
|
||||
const timeoutMs = (payload.timeoutSeconds ?? 300) * 1000; // default 5 minutes
|
||||
let timeoutHandle: NodeJS.Timeout | undefined;
|
||||
|
||||
const timeoutPromise = new Promise<ExecutionResult>((_, reject) => {
|
||||
timeoutHandle = setTimeout(() => {
|
||||
reject(new Error(`Cron job timed out after ${payload.timeoutSeconds}s`));
|
||||
}, timeoutMs);
|
||||
});
|
||||
|
||||
// Execute the agent turn
|
||||
const executePromise = (async (): Promise<ExecutionResult> => {
|
||||
const cronMessage = `[CRON Job: ${job.name}]\n\n${payload.message}`;
|
||||
agent.write(cronMessage);
|
||||
await agent.waitForIdle();
|
||||
return { summary: `Completed agent turn in isolated session ${sessionId.slice(0, 16)}` };
|
||||
})();
|
||||
|
||||
// Race between execution and timeout
|
||||
const result = await Promise.race([executePromise, timeoutPromise]);
|
||||
|
||||
// Clear timeout
|
||||
if (timeoutHandle) {
|
||||
clearTimeout(timeoutHandle);
|
||||
}
|
||||
|
||||
// Close the subagent
|
||||
agent.close();
|
||||
|
||||
return result;
|
||||
} catch (err) {
|
||||
return { error: err instanceof Error ? err.message : String(err) };
|
||||
}
|
||||
}
|
||||
39
src/cron/index.ts
Normal file
39
src/cron/index.ts
Normal file
|
|
@ -0,0 +1,39 @@
|
|||
/**
|
||||
* Cron Module
|
||||
*
|
||||
* Provides scheduled task functionality for Super Multica.
|
||||
*/
|
||||
|
||||
export type {
|
||||
CronSchedule,
|
||||
CronSessionTarget,
|
||||
CronWakeMode,
|
||||
CronPayload,
|
||||
CronJobState,
|
||||
CronJob,
|
||||
CronJobInput,
|
||||
CronJobPatch,
|
||||
CronRunLogEntry,
|
||||
CronConfig,
|
||||
} from "./types.js";
|
||||
|
||||
export {
|
||||
computeNextRunAtMs,
|
||||
isValidCronExpr,
|
||||
parseTimeInput,
|
||||
parseIntervalInput,
|
||||
formatSchedule,
|
||||
formatDuration,
|
||||
} from "./schedule.js";
|
||||
|
||||
export { CronStore } from "./store.js";
|
||||
|
||||
export {
|
||||
CronService,
|
||||
getCronService,
|
||||
shutdownCronService,
|
||||
type CronJobExecutor,
|
||||
type CronServiceStatus,
|
||||
} from "./service.js";
|
||||
|
||||
export { executeCronJob, type ExecutionResult } from "./execute.js";
|
||||
187
src/cron/schedule.ts
Normal file
187
src/cron/schedule.ts
Normal file
|
|
@ -0,0 +1,187 @@
|
|||
/**
|
||||
* Cron Schedule Computation
|
||||
*
|
||||
* Based on OpenClaw's implementation (MIT License)
|
||||
*/
|
||||
|
||||
import { Cron } from "croner";
|
||||
import type { CronSchedule } from "./types.js";
|
||||
|
||||
/**
|
||||
* Compute the next run time for a schedule.
|
||||
*
|
||||
* @param schedule - The schedule configuration
|
||||
* @param nowMs - Current time in milliseconds (default: Date.now())
|
||||
* @returns Next run time in ms, or undefined if no future run
|
||||
*/
|
||||
export function computeNextRunAtMs(
|
||||
schedule: CronSchedule,
|
||||
nowMs: number = Date.now(),
|
||||
): number | undefined {
|
||||
switch (schedule.kind) {
|
||||
case "at":
|
||||
// One-shot: return the timestamp if it's in the future
|
||||
return schedule.atMs > nowMs ? schedule.atMs : undefined;
|
||||
|
||||
case "every": {
|
||||
// Fixed interval: compute next occurrence
|
||||
const everyMs = Math.max(1, Math.floor(schedule.everyMs));
|
||||
const anchor = Math.max(0, Math.floor(schedule.anchorMs ?? nowMs));
|
||||
|
||||
if (nowMs < anchor) return anchor;
|
||||
|
||||
const elapsed = nowMs - anchor;
|
||||
const steps = Math.max(1, Math.floor((elapsed + everyMs - 1) / everyMs));
|
||||
return anchor + steps * everyMs;
|
||||
}
|
||||
|
||||
case "cron": {
|
||||
// Cron expression: use croner to compute next run
|
||||
const expr = schedule.expr.trim();
|
||||
if (!expr) return undefined;
|
||||
|
||||
try {
|
||||
const tz = schedule.tz?.trim();
|
||||
const cron = tz ? new Cron(expr, { timezone: tz }) : new Cron(expr);
|
||||
const next = cron.nextRun(new Date(nowMs));
|
||||
return next ? next.getTime() : undefined;
|
||||
} catch (error) {
|
||||
console.error(`[Cron] Invalid cron expression: ${expr}`, error);
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate a cron expression.
|
||||
*
|
||||
* @param expr - Cron expression (5-field)
|
||||
* @param tz - Optional timezone
|
||||
* @returns true if valid, false otherwise
|
||||
*/
|
||||
export function isValidCronExpr(expr: string, tz?: string): boolean {
|
||||
try {
|
||||
const timezone = tz?.trim();
|
||||
if (timezone) {
|
||||
new Cron(expr.trim(), { timezone });
|
||||
} else {
|
||||
new Cron(expr.trim());
|
||||
}
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse a human-readable time string into milliseconds.
|
||||
*
|
||||
* Supports:
|
||||
* - Relative: "10s", "5m", "2h", "1d"
|
||||
* - ISO 8601: "2024-01-15T09:00:00Z"
|
||||
* - Unix timestamp (if numeric)
|
||||
*
|
||||
* @param input - Time string
|
||||
* @param nowMs - Current time for relative calculations
|
||||
* @returns Timestamp in ms, or undefined if invalid
|
||||
*/
|
||||
export function parseTimeInput(input: string, nowMs: number = Date.now()): number | undefined {
|
||||
const trimmed = input.trim();
|
||||
|
||||
// Check for relative time (e.g., "10m", "2h")
|
||||
const relativeMatch = trimmed.match(/^(\d+(?:\.\d+)?)\s*([smhd])$/i);
|
||||
if (relativeMatch) {
|
||||
const [, numStr, unit] = relativeMatch;
|
||||
const num = parseFloat(numStr!);
|
||||
const multipliers: Record<string, number> = {
|
||||
s: 1000,
|
||||
m: 60 * 1000,
|
||||
h: 60 * 60 * 1000,
|
||||
d: 24 * 60 * 60 * 1000,
|
||||
};
|
||||
const ms = multipliers[unit!.toLowerCase()];
|
||||
if (ms !== undefined) {
|
||||
return nowMs + num * ms;
|
||||
}
|
||||
}
|
||||
|
||||
// Check for numeric (unix timestamp in ms or seconds)
|
||||
if (/^\d+$/.test(trimmed)) {
|
||||
const num = parseInt(trimmed, 10);
|
||||
// If it looks like seconds (before year 2100), convert to ms
|
||||
if (num < 4102444800) {
|
||||
return num * 1000;
|
||||
}
|
||||
return num;
|
||||
}
|
||||
|
||||
// Try ISO 8601 date parsing
|
||||
const date = new Date(trimmed);
|
||||
if (!isNaN(date.getTime())) {
|
||||
return date.getTime();
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse an interval string into milliseconds.
|
||||
*
|
||||
* Supports: "30s", "5m", "2h", "1d", or raw milliseconds
|
||||
*
|
||||
* @param input - Interval string
|
||||
* @returns Interval in ms, or undefined if invalid
|
||||
*/
|
||||
export function parseIntervalInput(input: string): number | undefined {
|
||||
const trimmed = input.trim();
|
||||
|
||||
// Check for duration format (e.g., "30m", "2h")
|
||||
const match = trimmed.match(/^(\d+(?:\.\d+)?)\s*([smhd])$/i);
|
||||
if (match) {
|
||||
const [, numStr, unit] = match;
|
||||
const num = parseFloat(numStr!);
|
||||
const multipliers: Record<string, number> = {
|
||||
s: 1000,
|
||||
m: 60 * 1000,
|
||||
h: 60 * 60 * 1000,
|
||||
d: 24 * 60 * 60 * 1000,
|
||||
};
|
||||
const ms = multipliers[unit!.toLowerCase()];
|
||||
if (ms !== undefined) {
|
||||
return num * ms;
|
||||
}
|
||||
}
|
||||
|
||||
// Check for raw milliseconds
|
||||
if (/^\d+$/.test(trimmed)) {
|
||||
return parseInt(trimmed, 10);
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Format a schedule for display.
|
||||
*/
|
||||
export function formatSchedule(schedule: CronSchedule): string {
|
||||
switch (schedule.kind) {
|
||||
case "at":
|
||||
return `at ${new Date(schedule.atMs).toISOString()}`;
|
||||
case "every":
|
||||
return `every ${formatDuration(schedule.everyMs)}`;
|
||||
case "cron":
|
||||
return `cron "${schedule.expr}"${schedule.tz ? ` (${schedule.tz})` : ""}`;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Format milliseconds as human-readable duration.
|
||||
*/
|
||||
export function formatDuration(ms: number): string {
|
||||
if (ms < 1000) return `${ms}ms`;
|
||||
if (ms < 60 * 1000) return `${Math.round(ms / 1000)}s`;
|
||||
if (ms < 60 * 60 * 1000) return `${Math.round(ms / (60 * 1000))}m`;
|
||||
if (ms < 24 * 60 * 60 * 1000) return `${Math.round(ms / (60 * 60 * 1000))}h`;
|
||||
return `${Math.round(ms / (24 * 60 * 60 * 1000))}d`;
|
||||
}
|
||||
432
src/cron/service.ts
Normal file
432
src/cron/service.ts
Normal file
|
|
@ -0,0 +1,432 @@
|
|||
/**
|
||||
* Cron Service
|
||||
*
|
||||
* Manages scheduled jobs with timer-based execution.
|
||||
* Based on OpenClaw's implementation (MIT License)
|
||||
*/
|
||||
|
||||
import { v7 as uuidv7 } from "uuid";
|
||||
import type {
|
||||
CronJob,
|
||||
CronJobInput,
|
||||
CronJobPatch,
|
||||
CronJobState,
|
||||
CronRunLogEntry,
|
||||
CronConfig,
|
||||
} from "./types.js";
|
||||
import { CronStore } from "./store.js";
|
||||
import { computeNextRunAtMs } from "./schedule.js";
|
||||
|
||||
/** Callback for job execution */
|
||||
export type CronJobExecutor = (job: CronJob) => Promise<{ summary?: string; error?: string }>;
|
||||
|
||||
/** Service status */
|
||||
export type CronServiceStatus = {
|
||||
running: boolean;
|
||||
enabled: boolean;
|
||||
storePath: string;
|
||||
jobCount: number;
|
||||
enabledJobCount: number;
|
||||
nextWakeAtMs: number | null;
|
||||
};
|
||||
|
||||
/** Default stuck job timeout (2 hours) */
|
||||
const STUCK_JOB_TIMEOUT_MS = 2 * 60 * 60 * 1000;
|
||||
|
||||
export class CronService {
|
||||
private readonly store: CronStore;
|
||||
private readonly config: CronConfig;
|
||||
private timer: NodeJS.Timeout | null = null;
|
||||
private running = false;
|
||||
private executor: CronJobExecutor | null = null;
|
||||
|
||||
constructor(config: CronConfig = {}) {
|
||||
this.config = {
|
||||
enabled: config.enabled ?? true,
|
||||
maxConcurrentRuns: config.maxConcurrentRuns ?? 1,
|
||||
...config,
|
||||
};
|
||||
this.store = new CronStore(config.storePath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the job executor callback.
|
||||
* This is called when a job needs to be executed.
|
||||
*/
|
||||
setExecutor(executor: CronJobExecutor): void {
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the cron service.
|
||||
* Loads jobs from disk, computes schedules, and starts the timer.
|
||||
*/
|
||||
async start(): Promise<void> {
|
||||
if (this.running) return;
|
||||
if (!this.config.enabled) {
|
||||
console.log("[CronService] Cron is disabled by config");
|
||||
return;
|
||||
}
|
||||
|
||||
this.running = true;
|
||||
console.log("[CronService] Starting...");
|
||||
|
||||
// Load jobs and compute next run times
|
||||
const jobs = this.store.load();
|
||||
console.log(`[CronService] Loaded ${jobs.length} jobs`);
|
||||
|
||||
// Recompute all schedules
|
||||
this.recomputeAllSchedules();
|
||||
|
||||
// Clear any stuck jobs (running for > 2 hours)
|
||||
this.clearStuckJobs();
|
||||
|
||||
// Arm timer for next job
|
||||
this.armTimer();
|
||||
|
||||
console.log("[CronService] Started");
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the cron service.
|
||||
*/
|
||||
stop(): void {
|
||||
if (!this.running) return;
|
||||
|
||||
this.running = false;
|
||||
if (this.timer) {
|
||||
clearTimeout(this.timer);
|
||||
this.timer = null;
|
||||
}
|
||||
|
||||
console.log("[CronService] Stopped");
|
||||
}
|
||||
|
||||
/**
|
||||
* Get service status.
|
||||
*/
|
||||
status(): CronServiceStatus {
|
||||
const allJobs = this.store.list();
|
||||
const enabledJobs = this.store.list({ enabled: true });
|
||||
|
||||
const nextWake = enabledJobs.reduce((min, job) => {
|
||||
const next = job.state.nextRunAtMs;
|
||||
return next !== undefined && next < min ? next : min;
|
||||
}, Infinity);
|
||||
|
||||
return {
|
||||
running: this.running,
|
||||
enabled: this.config.enabled ?? true,
|
||||
storePath: this.store.getStorePath(),
|
||||
jobCount: allJobs.length,
|
||||
enabledJobCount: enabledJobs.length,
|
||||
nextWakeAtMs: nextWake === Infinity ? null : nextWake,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* List jobs with optional filter.
|
||||
*/
|
||||
list(filter?: { enabled?: boolean }): CronJob[] {
|
||||
return this.store.list(filter);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a job by ID.
|
||||
*/
|
||||
get(id: string): CronJob | undefined {
|
||||
return this.store.get(id);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new job.
|
||||
*/
|
||||
add(input: CronJobInput): CronJob {
|
||||
const now = Date.now();
|
||||
const job: CronJob = {
|
||||
...input,
|
||||
id: uuidv7(),
|
||||
createdAtMs: now,
|
||||
updatedAtMs: now,
|
||||
state: {},
|
||||
};
|
||||
|
||||
// Compute initial next run time
|
||||
this.computeNextRun(job);
|
||||
|
||||
this.store.set(job);
|
||||
console.log(`[CronService] Added job: ${job.name} (${job.id}), next run: ${job.state.nextRunAtMs ? new Date(job.state.nextRunAtMs).toISOString() : "none"}`);
|
||||
|
||||
// Re-arm timer in case this job runs sooner
|
||||
if (this.running) {
|
||||
this.armTimer();
|
||||
}
|
||||
|
||||
return job;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update an existing job.
|
||||
*/
|
||||
update(id: string, patch: CronJobPatch): CronJob | null {
|
||||
const job = this.store.get(id);
|
||||
if (!job) return null;
|
||||
|
||||
// Apply patch
|
||||
Object.assign(job, patch, { updatedAtMs: Date.now() });
|
||||
|
||||
// Recompute schedule if changed
|
||||
if (patch.schedule || patch.enabled !== undefined) {
|
||||
this.computeNextRun(job);
|
||||
}
|
||||
|
||||
this.store.set(job);
|
||||
console.log(`[CronService] Updated job: ${job.name} (${job.id})`);
|
||||
|
||||
// Re-arm timer
|
||||
if (this.running) {
|
||||
this.armTimer();
|
||||
}
|
||||
|
||||
return job;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a job.
|
||||
*/
|
||||
remove(id: string): boolean {
|
||||
const job = this.store.get(id);
|
||||
if (!job) return false;
|
||||
|
||||
const deleted = this.store.delete(id);
|
||||
if (deleted) {
|
||||
console.log(`[CronService] Removed job: ${job.name} (${id})`);
|
||||
}
|
||||
|
||||
return deleted;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a job immediately.
|
||||
*
|
||||
* @param id - Job ID
|
||||
* @param force - Run even if disabled
|
||||
*/
|
||||
async run(id: string, force = false): Promise<{ ok: boolean; reason?: string }> {
|
||||
const job = this.store.get(id);
|
||||
if (!job) {
|
||||
return { ok: false, reason: "Job not found" };
|
||||
}
|
||||
|
||||
if (!job.enabled && !force) {
|
||||
return { ok: false, reason: "Job is disabled" };
|
||||
}
|
||||
|
||||
if (job.state.runningAtMs) {
|
||||
return { ok: false, reason: "Job is already running" };
|
||||
}
|
||||
|
||||
await this.executeJob(job);
|
||||
return { ok: true };
|
||||
}
|
||||
|
||||
/**
|
||||
* Get run logs for a job.
|
||||
*/
|
||||
getRunLogs(id: string, limit?: number): CronRunLogEntry[] {
|
||||
return this.store.getRunLogs(id, limit);
|
||||
}
|
||||
|
||||
// === Private Methods ===
|
||||
|
||||
/**
|
||||
* Compute next run time for a job.
|
||||
*/
|
||||
private computeNextRun(job: CronJob): void {
|
||||
if (!job.enabled) {
|
||||
job.state.nextRunAtMs = undefined;
|
||||
return;
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
const nextMs = computeNextRunAtMs(job.schedule, now);
|
||||
job.state.nextRunAtMs = nextMs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Recompute schedules for all enabled jobs.
|
||||
*/
|
||||
private recomputeAllSchedules(): void {
|
||||
for (const job of this.store.list({ enabled: true })) {
|
||||
this.computeNextRun(job);
|
||||
this.store.set(job);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear stuck jobs (running for too long).
|
||||
*/
|
||||
private clearStuckJobs(): void {
|
||||
const now = Date.now();
|
||||
for (const job of this.store.list()) {
|
||||
if (job.state.runningAtMs && now - job.state.runningAtMs > STUCK_JOB_TIMEOUT_MS) {
|
||||
console.warn(`[CronService] Clearing stuck job: ${job.name} (${job.id})`);
|
||||
job.state.runningAtMs = undefined;
|
||||
job.state.lastStatus = "error";
|
||||
job.state.lastError = "Job was stuck (running > 2 hours)";
|
||||
this.store.set(job);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Arm the timer for the next due job.
|
||||
*/
|
||||
private armTimer(): void {
|
||||
if (!this.running) return;
|
||||
|
||||
// Clear existing timer
|
||||
if (this.timer) {
|
||||
clearTimeout(this.timer);
|
||||
this.timer = null;
|
||||
}
|
||||
|
||||
// Find next wake time
|
||||
const enabledJobs = this.store.list({ enabled: true });
|
||||
const nextWake = enabledJobs.reduce((min, job) => {
|
||||
const next = job.state.nextRunAtMs;
|
||||
return next !== undefined && next < min ? next : min;
|
||||
}, Infinity);
|
||||
|
||||
if (nextWake === Infinity) {
|
||||
// No jobs to run
|
||||
return;
|
||||
}
|
||||
|
||||
const delay = Math.max(0, nextWake - Date.now());
|
||||
this.timer = setTimeout(() => this.onTimer(), delay);
|
||||
}
|
||||
|
||||
/**
|
||||
* Timer callback: run all due jobs.
|
||||
*/
|
||||
private async onTimer(): Promise<void> {
|
||||
if (!this.running) return;
|
||||
|
||||
const now = Date.now();
|
||||
const dueJobs = this.store
|
||||
.list({ enabled: true })
|
||||
.filter((j) => {
|
||||
const next = j.state.nextRunAtMs;
|
||||
return next !== undefined && next <= now && !j.state.runningAtMs;
|
||||
});
|
||||
|
||||
for (const job of dueJobs) {
|
||||
try {
|
||||
await this.executeJob(job);
|
||||
} catch (error) {
|
||||
console.error(`[CronService] Error executing job ${job.id}:`, error);
|
||||
}
|
||||
}
|
||||
|
||||
// Re-arm timer for next batch
|
||||
this.armTimer();
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a single job.
|
||||
*/
|
||||
private async executeJob(job: CronJob): Promise<void> {
|
||||
const startMs = Date.now();
|
||||
console.log(`[CronService] Executing job: ${job.name} (${job.id})`);
|
||||
|
||||
// Mark as running
|
||||
job.state.runningAtMs = startMs;
|
||||
this.store.set(job);
|
||||
|
||||
let status: "ok" | "error" = "ok";
|
||||
let error: string | undefined;
|
||||
let summary: string | undefined;
|
||||
|
||||
try {
|
||||
if (this.executor) {
|
||||
const result = await this.executor(job);
|
||||
summary = result.summary;
|
||||
if (result.error) {
|
||||
status = "error";
|
||||
error = result.error;
|
||||
}
|
||||
} else {
|
||||
// No executor set, just log
|
||||
console.log(`[CronService] Job ${job.id} payload:`, job.payload);
|
||||
}
|
||||
} catch (err) {
|
||||
status = "error";
|
||||
error = err instanceof Error ? err.message : String(err);
|
||||
console.error(`[CronService] Job ${job.id} failed:`, err);
|
||||
}
|
||||
|
||||
const durationMs = Date.now() - startMs;
|
||||
|
||||
// Update job state
|
||||
job.state.runningAtMs = undefined;
|
||||
job.state.lastRunAtMs = startMs;
|
||||
job.state.lastStatus = status;
|
||||
job.state.lastError = error;
|
||||
job.state.lastDurationMs = durationMs;
|
||||
|
||||
// Handle one-shot jobs
|
||||
if (job.schedule.kind === "at") {
|
||||
if (status === "ok" && job.deleteAfterRun) {
|
||||
this.store.delete(job.id);
|
||||
console.log(`[CronService] Deleted one-shot job: ${job.name} (${job.id})`);
|
||||
} else {
|
||||
job.enabled = false;
|
||||
job.state.nextRunAtMs = undefined;
|
||||
this.store.set(job);
|
||||
}
|
||||
} else {
|
||||
// Compute next run for recurring jobs
|
||||
this.computeNextRun(job);
|
||||
this.store.set(job);
|
||||
}
|
||||
|
||||
// Append run log
|
||||
this.store.appendRunLog(job.id, {
|
||||
ts: startMs,
|
||||
jobId: job.id,
|
||||
action: status === "ok" ? "run" : "error",
|
||||
status,
|
||||
error,
|
||||
summary,
|
||||
durationMs,
|
||||
nextRunAtMs: job.state.nextRunAtMs,
|
||||
});
|
||||
|
||||
console.log(`[CronService] Job ${job.id} completed: ${status} (${durationMs}ms)`);
|
||||
}
|
||||
}
|
||||
|
||||
// === Singleton ===
|
||||
|
||||
let cronServiceInstance: CronService | null = null;
|
||||
|
||||
/**
|
||||
* Get or create the singleton CronService instance.
|
||||
*/
|
||||
export function getCronService(config?: CronConfig): CronService {
|
||||
if (!cronServiceInstance) {
|
||||
cronServiceInstance = new CronService(config);
|
||||
}
|
||||
return cronServiceInstance;
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown the singleton CronService.
|
||||
*/
|
||||
export function shutdownCronService(): void {
|
||||
if (cronServiceInstance) {
|
||||
cronServiceInstance.stop();
|
||||
cronServiceInstance = null;
|
||||
}
|
||||
}
|
||||
216
src/cron/store.ts
Normal file
216
src/cron/store.ts
Normal file
|
|
@ -0,0 +1,216 @@
|
|||
/**
|
||||
* Cron Job Storage
|
||||
*
|
||||
* Persists jobs to JSON file and run logs to JSONL files.
|
||||
* Based on OpenClaw's implementation (MIT License)
|
||||
*/
|
||||
|
||||
import { existsSync, mkdirSync, readFileSync, writeFileSync, appendFileSync } from "fs";
|
||||
import path from "path";
|
||||
import type { CronJob, CronRunLogEntry } from "./types.js";
|
||||
|
||||
/** Default cron storage directory */
|
||||
const DEFAULT_CRON_DIR = path.join(
|
||||
process.env["HOME"] ?? ".",
|
||||
".super-multica",
|
||||
"cron",
|
||||
);
|
||||
|
||||
/** Store data structure */
|
||||
type StoreData = {
|
||||
version: number;
|
||||
jobs: CronJob[];
|
||||
};
|
||||
|
||||
const STORE_VERSION = 1;
|
||||
|
||||
export class CronStore {
|
||||
private readonly jobsPath: string;
|
||||
private readonly runsDir: string;
|
||||
private jobs: Map<string, CronJob> = new Map();
|
||||
private loaded = false;
|
||||
|
||||
constructor(baseDir: string = DEFAULT_CRON_DIR) {
|
||||
this.jobsPath = path.join(baseDir, "jobs.json");
|
||||
this.runsDir = path.join(baseDir, "runs");
|
||||
}
|
||||
|
||||
/** Ensure directories exist */
|
||||
private ensureDirs() {
|
||||
const dir = path.dirname(this.jobsPath);
|
||||
if (!existsSync(dir)) {
|
||||
mkdirSync(dir, { recursive: true });
|
||||
}
|
||||
if (!existsSync(this.runsDir)) {
|
||||
mkdirSync(this.runsDir, { recursive: true });
|
||||
}
|
||||
}
|
||||
|
||||
/** Load jobs from disk */
|
||||
load(): CronJob[] {
|
||||
this.ensureDirs();
|
||||
|
||||
if (!existsSync(this.jobsPath)) {
|
||||
this.jobs = new Map();
|
||||
this.loaded = true;
|
||||
return [];
|
||||
}
|
||||
|
||||
try {
|
||||
const raw = readFileSync(this.jobsPath, "utf-8");
|
||||
const data: StoreData = JSON.parse(raw);
|
||||
|
||||
// Validate version
|
||||
if (data.version !== STORE_VERSION) {
|
||||
console.warn(`[CronStore] Store version mismatch: ${data.version} vs ${STORE_VERSION}`);
|
||||
}
|
||||
|
||||
this.jobs = new Map(data.jobs.map((j) => [j.id, j]));
|
||||
this.loaded = true;
|
||||
return Array.from(this.jobs.values());
|
||||
} catch (error) {
|
||||
console.error("[CronStore] Failed to load jobs:", error);
|
||||
this.jobs = new Map();
|
||||
this.loaded = true;
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
/** Save jobs to disk */
|
||||
save(): void {
|
||||
this.ensureDirs();
|
||||
|
||||
const data: StoreData = {
|
||||
version: STORE_VERSION,
|
||||
jobs: Array.from(this.jobs.values()),
|
||||
};
|
||||
|
||||
// Write to temp file first, then rename (atomic)
|
||||
const tmpPath = this.jobsPath + ".tmp";
|
||||
const bakPath = this.jobsPath + ".bak";
|
||||
|
||||
try {
|
||||
writeFileSync(tmpPath, JSON.stringify(data, null, 2), "utf-8");
|
||||
|
||||
// Backup existing file
|
||||
if (existsSync(this.jobsPath)) {
|
||||
writeFileSync(bakPath, readFileSync(this.jobsPath));
|
||||
}
|
||||
|
||||
// Rename temp to actual (atomic on most filesystems)
|
||||
const fs = require("fs");
|
||||
fs.renameSync(tmpPath, this.jobsPath);
|
||||
} catch (error) {
|
||||
console.error("[CronStore] Failed to save jobs:", error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/** Ensure store is loaded */
|
||||
private ensureLoaded() {
|
||||
if (!this.loaded) {
|
||||
this.load();
|
||||
}
|
||||
}
|
||||
|
||||
/** Get a job by ID */
|
||||
get(id: string): CronJob | undefined {
|
||||
this.ensureLoaded();
|
||||
return this.jobs.get(id);
|
||||
}
|
||||
|
||||
/** Set (create or update) a job */
|
||||
set(job: CronJob): void {
|
||||
this.ensureLoaded();
|
||||
this.jobs.set(job.id, job);
|
||||
this.save();
|
||||
}
|
||||
|
||||
/** Delete a job by ID */
|
||||
delete(id: string): boolean {
|
||||
this.ensureLoaded();
|
||||
const deleted = this.jobs.delete(id);
|
||||
if (deleted) {
|
||||
this.save();
|
||||
}
|
||||
return deleted;
|
||||
}
|
||||
|
||||
/** List all jobs, optionally filtered */
|
||||
list(filter?: { enabled?: boolean }): CronJob[] {
|
||||
this.ensureLoaded();
|
||||
let jobs = Array.from(this.jobs.values());
|
||||
|
||||
if (filter?.enabled !== undefined) {
|
||||
jobs = jobs.filter((j) => j.enabled === filter.enabled);
|
||||
}
|
||||
|
||||
// Sort by next run time
|
||||
jobs.sort((a, b) => {
|
||||
const aNext = a.state.nextRunAtMs ?? Infinity;
|
||||
const bNext = b.state.nextRunAtMs ?? Infinity;
|
||||
return aNext - bNext;
|
||||
});
|
||||
|
||||
return jobs;
|
||||
}
|
||||
|
||||
/** Get job count */
|
||||
count(filter?: { enabled?: boolean }): number {
|
||||
return this.list(filter).length;
|
||||
}
|
||||
|
||||
// === Run Log Methods ===
|
||||
|
||||
/** Append a run log entry */
|
||||
appendRunLog(jobId: string, entry: CronRunLogEntry): void {
|
||||
this.ensureDirs();
|
||||
const logPath = path.join(this.runsDir, `${jobId}.jsonl`);
|
||||
const line = JSON.stringify(entry) + "\n";
|
||||
appendFileSync(logPath, line, "utf-8");
|
||||
}
|
||||
|
||||
/** Get run logs for a job */
|
||||
getRunLogs(jobId: string, limit = 50): CronRunLogEntry[] {
|
||||
const logPath = path.join(this.runsDir, `${jobId}.jsonl`);
|
||||
|
||||
if (!existsSync(logPath)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
try {
|
||||
const content = readFileSync(logPath, "utf-8").trim();
|
||||
if (!content) return [];
|
||||
|
||||
const lines = content.split("\n");
|
||||
const entries = lines
|
||||
.slice(-limit)
|
||||
.map((line) => {
|
||||
try {
|
||||
return JSON.parse(line) as CronRunLogEntry;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
})
|
||||
.filter((e): e is CronRunLogEntry => e !== null);
|
||||
|
||||
return entries;
|
||||
} catch (error) {
|
||||
console.error(`[CronStore] Failed to read run logs for ${jobId}:`, error);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
/** Clear run logs for a job */
|
||||
clearRunLogs(jobId: string): void {
|
||||
const logPath = path.join(this.runsDir, `${jobId}.jsonl`);
|
||||
if (existsSync(logPath)) {
|
||||
writeFileSync(logPath, "", "utf-8");
|
||||
}
|
||||
}
|
||||
|
||||
/** Get the store path (for status display) */
|
||||
getStorePath(): string {
|
||||
return this.jobsPath;
|
||||
}
|
||||
}
|
||||
116
src/cron/types.ts
Normal file
116
src/cron/types.ts
Normal file
|
|
@ -0,0 +1,116 @@
|
|||
/**
|
||||
* Cron Job Types
|
||||
*
|
||||
* Based on OpenClaw's implementation (MIT License)
|
||||
*/
|
||||
|
||||
/** Cron schedule: one-shot, interval, or cron expression */
|
||||
export type CronSchedule =
|
||||
| { kind: "at"; atMs: number }
|
||||
| { kind: "every"; everyMs: number; anchorMs?: number }
|
||||
| { kind: "cron"; expr: string; tz?: string };
|
||||
|
||||
/** Where to run the job */
|
||||
export type CronSessionTarget = "main" | "isolated";
|
||||
|
||||
/** When to wake after job execution */
|
||||
export type CronWakeMode = "next-heartbeat" | "now";
|
||||
|
||||
/** Job payload: what to execute */
|
||||
export type CronPayload =
|
||||
| {
|
||||
kind: "system-event";
|
||||
/** Text to inject into main session */
|
||||
text: string;
|
||||
}
|
||||
| {
|
||||
kind: "agent-turn";
|
||||
/** Message/prompt for the agent */
|
||||
message: string;
|
||||
/** Optional model override (e.g., "anthropic/claude-3-opus") */
|
||||
model?: string;
|
||||
/** Optional thinking level override */
|
||||
thinkingLevel?: string;
|
||||
/** Timeout in seconds */
|
||||
timeoutSeconds?: number;
|
||||
};
|
||||
|
||||
/** Runtime state of a job */
|
||||
export type CronJobState = {
|
||||
/** Next scheduled run (ms since epoch) */
|
||||
nextRunAtMs?: number | undefined;
|
||||
/** Currently running (lock marker, ms since epoch) */
|
||||
runningAtMs?: number | undefined;
|
||||
/** Last completed run (ms since epoch) */
|
||||
lastRunAtMs?: number | undefined;
|
||||
/** Last run status */
|
||||
lastStatus?: "ok" | "error" | "skipped" | undefined;
|
||||
/** Last error message */
|
||||
lastError?: string | undefined;
|
||||
/** Last run duration in ms */
|
||||
lastDurationMs?: number | undefined;
|
||||
};
|
||||
|
||||
/** Cron job definition */
|
||||
export type CronJob = {
|
||||
/** Unique identifier (UUIDv7) */
|
||||
id: string;
|
||||
/** User-friendly name */
|
||||
name: string;
|
||||
/** Optional description */
|
||||
description?: string;
|
||||
/** Whether the job is enabled */
|
||||
enabled: boolean;
|
||||
/** Delete after successful one-shot run */
|
||||
deleteAfterRun?: boolean;
|
||||
/** Creation timestamp (ms) */
|
||||
createdAtMs: number;
|
||||
/** Last update timestamp (ms) */
|
||||
updatedAtMs: number;
|
||||
/** When to run */
|
||||
schedule: CronSchedule;
|
||||
/** Where to run (main session or isolated) */
|
||||
sessionTarget: CronSessionTarget;
|
||||
/** Wake mode after execution */
|
||||
wakeMode: CronWakeMode;
|
||||
/** What to execute */
|
||||
payload: CronPayload;
|
||||
/** Runtime state */
|
||||
state: CronJobState;
|
||||
};
|
||||
|
||||
/** Input for creating a new job (without auto-generated fields) */
|
||||
export type CronJobInput = Omit<CronJob, "id" | "createdAtMs" | "updatedAtMs" | "state">;
|
||||
|
||||
/** Input for updating an existing job */
|
||||
export type CronJobPatch = Partial<Omit<CronJob, "id" | "createdAtMs" | "updatedAtMs">>;
|
||||
|
||||
/** Run log entry */
|
||||
export type CronRunLogEntry = {
|
||||
/** Timestamp (ms) */
|
||||
ts: number;
|
||||
/** Job ID */
|
||||
jobId: string;
|
||||
/** Action taken */
|
||||
action: "run" | "skip" | "error";
|
||||
/** Result status */
|
||||
status: "ok" | "error" | "skipped";
|
||||
/** Error message if failed */
|
||||
error?: string | undefined;
|
||||
/** Summary of execution (for agent-turn) */
|
||||
summary?: string | undefined;
|
||||
/** Duration in ms */
|
||||
durationMs?: number | undefined;
|
||||
/** Next scheduled run */
|
||||
nextRunAtMs?: number | undefined;
|
||||
};
|
||||
|
||||
/** Cron service configuration */
|
||||
export type CronConfig = {
|
||||
/** Whether cron is enabled (default: true) */
|
||||
enabled?: boolean;
|
||||
/** Custom store path */
|
||||
storePath?: string;
|
||||
/** Max concurrent job runs (default: 1) */
|
||||
maxConcurrentRuns?: number;
|
||||
};
|
||||
Loading…
Add table
Add a link
Reference in a new issue