diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts index db22b843..c1eb9a5e 100644 --- a/src/agent/async-agent.ts +++ b/src/agent/async-agent.ts @@ -15,6 +15,7 @@ export class AsyncAgent { private readonly channel = new Channel(); private _closed = false; private queue: Promise = Promise.resolve(); + private pendingWrites = 0; private closeCallbacks: Array<() => void> = []; readonly sessionId: string; @@ -38,6 +39,7 @@ export class AsyncAgent { /** Write message to agent (non-blocking, serialized queue) */ write(content: string): void { if (this._closed) throw new Error("Agent is closed"); + this.pendingWrites += 1; this.queue = this.queue .then(async () => { @@ -54,6 +56,9 @@ export class AsyncAgent { .catch((err) => { const message = err instanceof Error ? err.message : String(err); this.channel.send({ id: uuidv7(), content: `[error] ${message}` }); + }) + .finally(() => { + this.pendingWrites = Math.max(0, this.pendingWrites - 1); }); } @@ -170,6 +175,34 @@ export class AsyncAgent { return this.agent.getProfileId(); } + /** + * Get profile directory path, if profile is enabled. + */ + getProfileDir(): string | undefined { + return this.agent.getProfileDir(); + } + + /** + * Get heartbeat configuration from profile config. + */ + getHeartbeatConfig(): + | { + enabled?: boolean | undefined; + every?: string | undefined; + prompt?: string | undefined; + ackMaxChars?: number | undefined; + } + | undefined { + return this.agent.getHeartbeatConfig(); + } + + /** + * Number of queued/in-flight writes. + */ + getPendingWrites(): number { + return this.pendingWrites; + } + /** * Get agent display name from profile config. */ diff --git a/src/agent/profile/index.ts b/src/agent/profile/index.ts index cf7423e0..43a67d51 100644 --- a/src/agent/profile/index.ts +++ b/src/agent/profile/index.ts @@ -50,6 +50,7 @@ export function createAgentProfile( profile.user = DEFAULT_TEMPLATES.user; profile.workspace = DEFAULT_TEMPLATES.workspace; profile.memory = DEFAULT_TEMPLATES.memory; + profile.heartbeat = DEFAULT_TEMPLATES.heartbeat; // 保存到文件 saveProfile(profile, { baseDir }); @@ -150,6 +151,7 @@ export class ProfileManager { user: profile.user, workspace: profile.workspace, memory: profile.memory, + heartbeat: profile.heartbeat, config: profile.config, }, profileDir: this.getProfileDir(), @@ -168,6 +170,19 @@ export class ProfileManager { return profile?.config; } + /** Get heartbeat configuration from profile config */ + getHeartbeatConfig(): + | { + enabled?: boolean | undefined; + every?: string | undefined; + prompt?: string | undefined; + ackMaxChars?: number | undefined; + } + | undefined { + const profile = this.getProfile(); + return profile?.config?.heartbeat; + } + /** 更新 tools 配置 */ updateToolsConfig(toolsConfig: ToolsConfig): void { const profile = this.getOrCreateProfile(false); diff --git a/src/agent/profile/storage.ts b/src/agent/profile/storage.ts index 29d4c07e..9429aae1 100644 --- a/src/agent/profile/storage.ts +++ b/src/agent/profile/storage.ts @@ -95,13 +95,14 @@ export function loadProfile(profileId: string, options?: StorageOptions): AgentP user: readProfileFile(profileId, PROFILE_FILES.user, options), workspace: readProfileFile(profileId, PROFILE_FILES.workspace, options), memory: readProfileFile(profileId, PROFILE_FILES.memory, options), + heartbeat: readProfileFile(profileId, PROFILE_FILES.heartbeat, options), config: readProfileConfig(profileId, options), }; } /** 保存 AgentProfile(只写入非空字段) */ export function saveProfile(profile: AgentProfile, options?: StorageOptions): void { - const { id, soul, user, workspace, memory, config } = profile; + const { id, soul, user, workspace, memory, heartbeat, config } = profile; if (soul !== undefined) { writeProfileFile(id, PROFILE_FILES.soul, soul, options); @@ -115,6 +116,9 @@ export function saveProfile(profile: AgentProfile, options?: StorageOptions): vo if (memory !== undefined) { writeProfileFile(id, PROFILE_FILES.memory, memory, options); } + if (heartbeat !== undefined) { + writeProfileFile(id, PROFILE_FILES.heartbeat, heartbeat, options); + } if (config !== undefined) { writeProfileConfig(id, config, options); } diff --git a/src/agent/profile/templates.ts b/src/agent/profile/templates.ts index ffbb2d59..ca27f085 100644 --- a/src/agent/profile/templates.ts +++ b/src/agent/profile/templates.ts @@ -72,6 +72,7 @@ Your profile directory contains these files (use \`edit\` or \`write\` to update | \`user.md\` | About your human | As you learn about them | | \`workspace.md\` | This file — your rules | When you discover better conventions | | \`memory.md\` | Long-term knowledge | Regularly — capture what matters | +| \`heartbeat.md\` | Background check instructions | When heartbeat behavior should change | ## Every Session @@ -89,6 +90,7 @@ You wake up fresh each session. These files are your continuity: - **Long-term:** \`MEMORY.md\` — your curated memories, lessons learned - **Daily notes:** \`memory/YYYY-MM-DD.md\` — raw logs of what happened (optional) +- **Heartbeat:** \`heartbeat.md\` — periodic check loop instructions Capture what matters. Decisions, context, things to remember. @@ -101,6 +103,7 @@ Capture what matters. Decisions, context, things to remember. - \`memory.md\` — Your learnings: decisions made, lessons learned, important context - \`workspace.md\` — Your rules: conventions, workflows, how you should operate - \`soul.md\` — Your identity: only change if user wants to reshape who you are +- \`heartbeat.md\` — Periodic background checks and alert rules **Rules:** - **DO NOT** say "I'll remember that" without ACTUALLY calling \`edit\` or \`write\` on a file @@ -148,5 +151,11 @@ _(Persistent knowledge will be stored here. Update this as you learn.)_ ## Lessons Learned ## Important Context +`, + + heartbeat: `# heartbeat.md + +# Keep this file empty (or with only comments) to skip heartbeat API calls. +# Add tasks below when you want the agent to check something periodically. `, } as const; diff --git a/src/agent/profile/types.ts b/src/agent/profile/types.ts index 44e3f066..a250cf91 100644 --- a/src/agent/profile/types.ts +++ b/src/agent/profile/types.ts @@ -11,6 +11,7 @@ export const PROFILE_FILES = { user: "user.md", workspace: "workspace.md", memory: "memory.md", + heartbeat: "heartbeat.md", config: "config.json", } as const; @@ -42,6 +43,17 @@ export interface ProfileConfig { reasoningMode?: "off" | "on" | "stream" | undefined; /** Exec approval configuration (security level, ask mode, allowlist) */ execApproval?: ExecApprovalConfig | undefined; + /** Heartbeat configuration */ + heartbeat?: { + /** Global heartbeat enable switch */ + enabled?: boolean | undefined; + /** Interval, e.g. "30m", "1h" */ + every?: string | undefined; + /** Optional prompt override */ + prompt?: string | undefined; + /** Max chars after HEARTBEAT_OK to still treat as ack */ + ackMaxChars?: number | undefined; + } | undefined; } /** Agent Profile configuration */ @@ -56,6 +68,8 @@ export interface AgentProfile { workspace?: string | undefined; /** Persistent memory - long-term knowledge base */ memory?: string | undefined; + /** Periodic heartbeat instructions */ + heartbeat?: string | undefined; /** Profile configuration (from config.json) */ config?: ProfileConfig | undefined; } diff --git a/src/agent/runner.ts b/src/agent/runner.ts index 65fd528b..d364f0fa 100644 --- a/src/agent/runner.ts +++ b/src/agent/runner.ts @@ -595,6 +595,27 @@ export class Agent { return this.profile?.getProfile()?.id; } + /** + * Get profile directory path, if profile is enabled. + */ + getProfileDir(): string | undefined { + return this.profile?.getProfileDir(); + } + + /** + * Get heartbeat configuration from profile config. + */ + getHeartbeatConfig(): + | { + enabled?: boolean | undefined; + every?: string | undefined; + prompt?: string | undefined; + ackMaxChars?: number | undefined; + } + | undefined { + return this.profile?.getHeartbeatConfig(); + } + /** * Get agent display name from profile config. */ @@ -770,6 +791,7 @@ export class Agent { user: profile.user, workspace: profile.workspace, memory: profile.memory, + heartbeat: profile.heartbeat, config: profile.config, }, profileDir: this.profile!.getProfileDir(), diff --git a/src/agent/system-prompt/builder.ts b/src/agent/system-prompt/builder.ts index 619b05f8..a8578616 100644 --- a/src/agent/system-prompt/builder.ts +++ b/src/agent/system-prompt/builder.ts @@ -10,6 +10,7 @@ import type { SystemPromptReport, } from "./types.js"; import { + buildHeartbeatSection, buildConditionalToolSections, buildExtraPromptSection, buildIdentitySection, @@ -58,6 +59,7 @@ export function buildSystemPromptWithReport(options: SystemPromptOptions): { { name: "user", lines: buildUserSection(profile, mode) }, { name: "workspace", lines: buildWorkspaceSection(profile, mode, profileDir) }, { name: "memory", lines: buildMemoryFileSection(profile, mode) }, + { name: "heartbeat", lines: buildHeartbeatSection(profile, mode) }, { name: "safety", lines: buildSafetySection(includeSafety) }, { name: "tooling", lines: buildToolingSummary(tools, mode) }, { name: "tool-call-style", lines: buildToolCallStyleSection(mode) }, diff --git a/src/agent/system-prompt/sections.ts b/src/agent/system-prompt/sections.ts index 6a9a10ac..51b80f40 100644 --- a/src/agent/system-prompt/sections.ts +++ b/src/agent/system-prompt/sections.ts @@ -6,6 +6,7 @@ import { SAFETY_CONSTITUTION } from "./constitution.js"; import { formatRuntimeLine } from "./runtime-info.js"; +import { resolveHeartbeatPrompt } from "../../heartbeat/heartbeat-text.js"; import type { ProfileContent, RuntimeInfo, @@ -97,13 +98,14 @@ export function buildWorkspaceSection( "## Profile", "", `Your profile directory: \`${profileDir}\``, - "Use this as the base path for profile files (soul.md, user.md, memory.md, memory/*.md).", + "Use this as the base path for profile files (soul.md, user.md, memory.md, heartbeat.md, memory/*.md).", "", "Profile files:", "- `soul.md` — Your identity and values", "- `user.md` — Information about your user", "- `workspace.md` — Guidelines and conventions (below)", "- `memory.md` — Persistent knowledge", + "- `heartbeat.md` — Background heartbeat loop instructions", "", ); } @@ -128,6 +130,26 @@ export function buildMemoryFileSection( return []; } +/** + * Heartbeat section — full mode only. + * Keeps heartbeat protocol explicit in the agent instructions. + */ +export function buildHeartbeatSection( + profile: ProfileContent | undefined, + mode: SystemPromptMode, +): string[] { + if (mode !== "full") return []; + const prompt = resolveHeartbeatPrompt(profile?.config?.heartbeat?.prompt); + return [ + "## Heartbeats", + `Heartbeat prompt: ${prompt}`, + 'If you receive a heartbeat poll (a user message matching the heartbeat prompt above), and there is nothing that needs attention, reply exactly:', + "HEARTBEAT_OK", + 'If something needs attention, do NOT include "HEARTBEAT_OK"; reply with the alert text instead.', + "", + ]; +} + /** * Safety constitution — always included. */ diff --git a/src/agent/system-prompt/types.ts b/src/agent/system-prompt/types.ts index ae9d313e..2dc2b18e 100644 --- a/src/agent/system-prompt/types.ts +++ b/src/agent/system-prompt/types.ts @@ -53,6 +53,7 @@ export interface ProfileContent { user?: string | undefined; workspace?: string | undefined; memory?: string | undefined; + heartbeat?: string | undefined; config?: ProfileConfig | undefined; } diff --git a/src/heartbeat/heartbeat-events.ts b/src/heartbeat/heartbeat-events.ts new file mode 100644 index 00000000..8b64ceea --- /dev/null +++ b/src/heartbeat/heartbeat-events.ts @@ -0,0 +1,50 @@ +export type HeartbeatIndicatorType = "ok" | "alert" | "error"; + +export type HeartbeatEventPayload = { + ts: number; + status: "sent" | "ok-empty" | "ok-token" | "skipped" | "failed"; + preview?: string; + durationMs?: number; + reason?: string; + indicatorType?: HeartbeatIndicatorType; +}; + +export function resolveIndicatorType( + status: HeartbeatEventPayload["status"], +): HeartbeatIndicatorType | undefined { + switch (status) { + case "ok-empty": + case "ok-token": + return "ok"; + case "sent": + return "alert"; + case "failed": + return "error"; + case "skipped": + return undefined; + } +} + +let lastHeartbeat: HeartbeatEventPayload | null = null; +const listeners = new Set<(evt: HeartbeatEventPayload) => void>(); + +export function emitHeartbeatEvent(evt: Omit): void { + const enriched: HeartbeatEventPayload = { ts: Date.now(), ...evt }; + lastHeartbeat = enriched; + for (const listener of listeners) { + try { + listener(enriched); + } catch { + // Ignore listener errors so heartbeat flow stays robust. + } + } +} + +export function onHeartbeatEvent(listener: (evt: HeartbeatEventPayload) => void): () => void { + listeners.add(listener); + return () => listeners.delete(listener); +} + +export function getLastHeartbeatEvent(): HeartbeatEventPayload | null { + return lastHeartbeat; +} diff --git a/src/heartbeat/heartbeat-text.test.ts b/src/heartbeat/heartbeat-text.test.ts new file mode 100644 index 00000000..2fb60708 --- /dev/null +++ b/src/heartbeat/heartbeat-text.test.ts @@ -0,0 +1,31 @@ +import { describe, expect, it } from "vitest"; +import { + HEARTBEAT_TOKEN, + isHeartbeatContentEffectivelyEmpty, + stripHeartbeatToken, +} from "./heartbeat-text.js"; + +describe("heartbeat-text", () => { + it("treats comment-only heartbeat files as empty", () => { + expect(isHeartbeatContentEffectivelyEmpty("# title\n\n- [ ]\n")).toBe(true); + expect(isHeartbeatContentEffectivelyEmpty("\n# note\n")).toBe(true); + expect(isHeartbeatContentEffectivelyEmpty("check disk health")).toBe(false); + }); + + it("strips plain token responses", () => { + const result = stripHeartbeatToken(HEARTBEAT_TOKEN, { mode: "heartbeat" }); + expect(result.shouldSkip).toBe(true); + expect(result.text).toBe(""); + }); + + it("keeps substantial content around token in heartbeat mode", () => { + const longTail = "Potential issue detected: disk usage is 92% on /Users"; + const result = stripHeartbeatToken(`${HEARTBEAT_TOKEN} ${longTail}`, { + mode: "heartbeat", + maxAckChars: 10, + }); + + expect(result.shouldSkip).toBe(false); + expect(result.text).toContain("disk usage"); + }); +}); diff --git a/src/heartbeat/heartbeat-text.ts b/src/heartbeat/heartbeat-text.ts new file mode 100644 index 00000000..a957ca2f --- /dev/null +++ b/src/heartbeat/heartbeat-text.ts @@ -0,0 +1,117 @@ +export const HEARTBEAT_TOKEN = "HEARTBEAT_OK"; + +export const HEARTBEAT_PROMPT = + "Read heartbeat.md if it exists (workspace context). Follow it strictly. Do not infer or repeat old tasks from prior chats. If nothing needs attention, reply HEARTBEAT_OK."; + +export const DEFAULT_HEARTBEAT_EVERY = "30m"; +export const DEFAULT_HEARTBEAT_ACK_MAX_CHARS = 300; + +export function isHeartbeatContentEffectivelyEmpty( + content: string | undefined | null, +): boolean { + if (content === undefined || content === null || typeof content !== "string") { + return false; + } + + const lines = content.split("\n"); + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) continue; + if (/^#+(\s|$)/.test(trimmed)) continue; + if (/^[-*+]\s*(\[[\sXx]?\]\s*)?$/.test(trimmed)) continue; + return false; + } + + return true; +} + +export function resolveHeartbeatPrompt(raw?: string): string { + const trimmed = typeof raw === "string" ? raw.trim() : ""; + return trimmed || HEARTBEAT_PROMPT; +} + +export type StripHeartbeatMode = "heartbeat" | "message"; + +function stripTokenAtEdges(raw: string): { text: string; didStrip: boolean } { + let text = raw.trim(); + if (!text) return { text: "", didStrip: false }; + if (!text.includes(HEARTBEAT_TOKEN)) return { text, didStrip: false }; + + let didStrip = false; + let changed = true; + while (changed) { + changed = false; + const next = text.trim(); + if (next.startsWith(HEARTBEAT_TOKEN)) { + text = next.slice(HEARTBEAT_TOKEN.length).trimStart(); + didStrip = true; + changed = true; + continue; + } + if (next.endsWith(HEARTBEAT_TOKEN)) { + text = next.slice(0, Math.max(0, next.length - HEARTBEAT_TOKEN.length)).trimEnd(); + didStrip = true; + changed = true; + } + } + + return { + text: text.replace(/\s+/g, " ").trim(), + didStrip, + }; +} + +export function stripHeartbeatToken( + raw?: string, + opts: { mode?: StripHeartbeatMode; maxAckChars?: number } = {}, +): { shouldSkip: boolean; text: string; didStrip: boolean } { + if (!raw) return { shouldSkip: true, text: "", didStrip: false }; + + const trimmed = raw.trim(); + if (!trimmed) return { shouldSkip: true, text: "", didStrip: false }; + + const mode = opts.mode ?? "message"; + const maxAckCharsRaw = opts.maxAckChars; + const maxAckChars = Math.max( + 0, + typeof maxAckCharsRaw === "number" && Number.isFinite(maxAckCharsRaw) + ? maxAckCharsRaw + : DEFAULT_HEARTBEAT_ACK_MAX_CHARS, + ); + + const stripMarkup = (text: string) => + text + .replace(/<[^>]*>/g, " ") + .replace(/ /gi, " ") + .replace(/^[*`~_]+/, "") + .replace(/[*`~_]+$/, ""); + + const normalized = stripMarkup(trimmed); + const hasToken = + trimmed.includes(HEARTBEAT_TOKEN) || normalized.includes(HEARTBEAT_TOKEN); + if (!hasToken) { + return { shouldSkip: false, text: trimmed, didStrip: false }; + } + + const strippedOriginal = stripTokenAtEdges(trimmed); + const strippedNormalized = stripTokenAtEdges(normalized); + const picked = + strippedOriginal.didStrip && strippedOriginal.text + ? strippedOriginal + : strippedNormalized; + + if (!picked.didStrip) { + return { shouldSkip: false, text: trimmed, didStrip: false }; + } + + if (!picked.text) { + return { shouldSkip: true, text: "", didStrip: true }; + } + + const rest = picked.text.trim(); + if (mode === "heartbeat" && rest.length <= maxAckChars) { + return { shouldSkip: true, text: "", didStrip: true }; + } + + return { shouldSkip: false, text: rest, didStrip: true }; +} diff --git a/src/heartbeat/heartbeat-wake.test.ts b/src/heartbeat/heartbeat-wake.test.ts new file mode 100644 index 00000000..d74e5b24 --- /dev/null +++ b/src/heartbeat/heartbeat-wake.test.ts @@ -0,0 +1,47 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { + hasPendingHeartbeatWake, + requestHeartbeatNow, + setHeartbeatWakeHandler, +} from "./heartbeat-wake.js"; + +describe("heartbeat-wake", () => { + afterEach(() => { + setHeartbeatWakeHandler(null); + vi.useRealTimers(); + }); + + it("coalesces multiple wake requests into one run", async () => { + vi.useFakeTimers(); + const handler = vi.fn(async () => ({ status: "ran" as const, durationMs: 1 })); + + setHeartbeatWakeHandler(handler); + requestHeartbeatNow({ reason: "a" }); + requestHeartbeatNow({ reason: "b" }); + requestHeartbeatNow({ reason: "c" }); + + expect(hasPendingHeartbeatWake()).toBe(true); + + await vi.advanceTimersByTimeAsync(300); + + expect(handler).toHaveBeenCalledTimes(1); + }); + + it("retries when requests are in flight", async () => { + vi.useFakeTimers(); + + const handler = vi + .fn() + .mockResolvedValueOnce({ status: "skipped" as const, reason: "requests-in-flight" }) + .mockResolvedValueOnce({ status: "ran" as const, durationMs: 3 }); + + setHeartbeatWakeHandler(handler); + requestHeartbeatNow({ reason: "retry-case" }); + + await vi.advanceTimersByTimeAsync(300); + expect(handler).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(1100); + expect(handler).toHaveBeenCalledTimes(2); + }); +}); diff --git a/src/heartbeat/heartbeat-wake.ts b/src/heartbeat/heartbeat-wake.ts new file mode 100644 index 00000000..22592fbc --- /dev/null +++ b/src/heartbeat/heartbeat-wake.ts @@ -0,0 +1,73 @@ +export type HeartbeatRunResult = + | { status: "ran"; durationMs: number } + | { status: "skipped"; reason: string } + | { status: "failed"; reason: string }; + +export type HeartbeatWakeHandler = (opts: { + reason?: string; +}) => Promise; + +let handler: HeartbeatWakeHandler | null = null; +let pendingReason: string | null = null; +let scheduled = false; +let running = false; +let timer: NodeJS.Timeout | null = null; + +const DEFAULT_COALESCE_MS = 250; +const DEFAULT_RETRY_MS = 1000; + +function schedule(coalesceMs: number): void { + if (timer) return; + timer = setTimeout(async () => { + timer = null; + scheduled = false; + const active = handler; + if (!active) return; + + if (running) { + scheduled = true; + schedule(coalesceMs); + return; + } + + const reason = pendingReason; + pendingReason = null; + running = true; + try { + const result = reason ? await active({ reason }) : await active({}); + if (result.status === "skipped" && result.reason === "requests-in-flight") { + pendingReason = reason ?? "retry"; + schedule(DEFAULT_RETRY_MS); + } + } catch { + pendingReason = reason ?? "retry"; + schedule(DEFAULT_RETRY_MS); + } finally { + running = false; + if (pendingReason || scheduled) { + schedule(coalesceMs); + } + } + }, coalesceMs); + timer.unref?.(); +} + +export function setHeartbeatWakeHandler(next: HeartbeatWakeHandler | null): void { + handler = next; + if (handler && pendingReason) { + schedule(DEFAULT_COALESCE_MS); + } +} + +export function requestHeartbeatNow(opts?: { reason?: string; coalesceMs?: number }): void { + pendingReason = opts?.reason ?? pendingReason ?? "requested"; + schedule(opts?.coalesceMs ?? DEFAULT_COALESCE_MS); +} + +export function hasHeartbeatWakeHandler(): boolean { + return handler !== null; +} + +export function hasPendingHeartbeatWake(): boolean { + return pendingReason !== null || Boolean(timer) || scheduled; +} diff --git a/src/heartbeat/index.ts b/src/heartbeat/index.ts new file mode 100644 index 00000000..a01b8b13 --- /dev/null +++ b/src/heartbeat/index.ts @@ -0,0 +1,45 @@ +export { + emitHeartbeatEvent, + getLastHeartbeatEvent, + onHeartbeatEvent, + resolveIndicatorType, + type HeartbeatEventPayload, + type HeartbeatIndicatorType, +} from "./heartbeat-events.js"; + +export { + hasHeartbeatWakeHandler, + hasPendingHeartbeatWake, + requestHeartbeatNow, + setHeartbeatWakeHandler, + type HeartbeatRunResult, + type HeartbeatWakeHandler, +} from "./heartbeat-wake.js"; + +export { + DEFAULT_HEARTBEAT_ACK_MAX_CHARS, + DEFAULT_HEARTBEAT_EVERY, + HEARTBEAT_PROMPT, + HEARTBEAT_TOKEN, + isHeartbeatContentEffectivelyEmpty, + resolveHeartbeatPrompt, + stripHeartbeatToken, + type StripHeartbeatMode, +} from "./heartbeat-text.js"; + +export { + drainSystemEvents, + enqueueSystemEvent, + hasSystemEvents, + peekSystemEvents, + resetSystemEventsForTest, + type SystemEvent, +} from "./system-events.js"; + +export { + runHeartbeatOnce, + setHeartbeatsEnabled, + startHeartbeatRunner, + type HeartbeatConfig, + type HeartbeatRunner, +} from "./runner.js"; diff --git a/src/heartbeat/runner.test.ts b/src/heartbeat/runner.test.ts new file mode 100644 index 00000000..4a77c8fd --- /dev/null +++ b/src/heartbeat/runner.test.ts @@ -0,0 +1,74 @@ +import os from "node:os"; +import path from "node:path"; +import { mkdtemp, rm, writeFile } from "node:fs/promises"; +import { afterEach, describe, expect, it } from "vitest"; +import { runHeartbeatOnce, setHeartbeatsEnabled } from "./runner.js"; + +type StubAgent = { + closed: boolean; + sessionId: string; + ensureInitialized: () => Promise; + getMessages: () => Array; + write: (content: string) => void; + waitForIdle: () => Promise; + getHeartbeatConfig: () => { prompt?: string; ackMaxChars?: number; enabled?: boolean }; + getPendingWrites: () => number; + getProfileDir: () => string | undefined; +}; + +function createStubAgent(opts?: { + profileDir?: string; + replyText?: string; + heartbeatEnabled?: boolean; +}): StubAgent { + const messages: Array = []; + const replyText = opts?.replyText ?? "HEARTBEAT_OK"; + + return { + closed: false, + sessionId: "test-session", + ensureInitialized: async () => {}, + getMessages: () => messages, + write: (content: string) => { + messages.push({ role: "user", content }); + messages.push({ role: "assistant", content: [{ type: "text", text: replyText }] }); + }, + waitForIdle: async () => {}, + getHeartbeatConfig: () => + typeof opts?.heartbeatEnabled === "boolean" + ? { enabled: opts.heartbeatEnabled } + : {}, + getPendingWrites: () => 0, + getProfileDir: () => opts?.profileDir, + }; +} + +describe("heartbeat runner", () => { + afterEach(() => { + setHeartbeatsEnabled(true); + }); + + it("skips when no agent is available", async () => { + const result = await runHeartbeatOnce({ agent: null }); + expect(result).toEqual({ status: "skipped", reason: "disabled" }); + }); + + it("skips when heartbeat file is effectively empty", async () => { + const dir = await mkdtemp(path.join(os.tmpdir(), "heartbeat-test-")); + try { + await writeFile(path.join(dir, "heartbeat.md"), "# keep empty\n", "utf-8"); + const agent = createStubAgent({ profileDir: dir }); + const result = await runHeartbeatOnce({ agent: agent as any }); + expect(result).toEqual({ status: "skipped", reason: "empty-heartbeat-file" }); + } finally { + await rm(dir, { recursive: true, force: true }); + } + }); + + it("runs and returns ran for heartbeat acknowledgements", async () => { + const agent = createStubAgent({ replyText: "HEARTBEAT_OK" }); + const result = await runHeartbeatOnce({ agent: agent as any, reason: "manual" }); + + expect(result.status).toBe("ran"); + }); +}); diff --git a/src/heartbeat/runner.ts b/src/heartbeat/runner.ts new file mode 100644 index 00000000..281adf9b --- /dev/null +++ b/src/heartbeat/runner.ts @@ -0,0 +1,321 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import type { AsyncAgent } from "../agent/async-agent.js"; +import { + DEFAULT_HEARTBEAT_ACK_MAX_CHARS, + DEFAULT_HEARTBEAT_EVERY, + isHeartbeatContentEffectivelyEmpty, + resolveHeartbeatPrompt, + stripHeartbeatToken, +} from "./heartbeat-text.js"; +import { + emitHeartbeatEvent, + resolveIndicatorType, + type HeartbeatEventPayload, +} from "./heartbeat-events.js"; +import { + setHeartbeatWakeHandler, + requestHeartbeatNow, + type HeartbeatRunResult, + type HeartbeatWakeHandler, +} from "./heartbeat-wake.js"; +import { drainSystemEvents } from "./system-events.js"; + +export type HeartbeatConfig = { + enabled?: boolean; + every?: string; + prompt?: string; + ackMaxChars?: number; +}; + +export type HeartbeatRunner = { + stop: () => void; + updateConfig: () => void; +}; + +type RunnerDeps = { + getAgent: () => AsyncAgent | null; + nowMs?: () => number; + logger?: Pick; +}; + +const HEARTBEAT_FILENAME = "heartbeat.md"; +const DEFAULT_INTERVAL_MS = 30 * 60 * 1000; +let heartbeatsEnabled = true; + +function resolveDurationMs(raw: string | undefined): number | null { + if (!raw) return DEFAULT_INTERVAL_MS; + const trimmed = raw.trim(); + if (!trimmed) return DEFAULT_INTERVAL_MS; + + const match = trimmed.match(/^(\d+(?:\.\d+)?)\s*([smhd])$/i); + if (match) { + const num = Number.parseFloat(match[1]!); + const unit = match[2]!.toLowerCase(); + const unitMs: Record = { + s: 1000, + m: 60 * 1000, + h: 60 * 60 * 1000, + d: 24 * 60 * 60 * 1000, + }; + const ms = unitMs[unit]; + if (!Number.isFinite(num) || !ms) return null; + const value = Math.floor(num * ms); + return value > 0 ? value : null; + } + + if (/^\d+$/.test(trimmed)) { + const value = Number.parseInt(trimmed, 10); + return value > 0 ? value : null; + } + + return null; +} + +function extractMessageText(message: AgentMessage | undefined): string { + if (!message) return ""; + const raw = (message as { content?: unknown }).content; + if (typeof raw === "string") return raw; + if (!Array.isArray(raw)) return ""; + + const parts: string[] = []; + for (const block of raw) { + if (!block || typeof block !== "object") continue; + const text = (block as { text?: unknown }).text; + if (typeof text === "string" && text.trim()) { + parts.push(text); + } + } + return parts.join("\n").trim(); +} + +function getHeartbeatConfig(agent: AsyncAgent | null): HeartbeatConfig { + const cfg = agent?.getHeartbeatConfig(); + if (!cfg) return {}; + + const out: HeartbeatConfig = {}; + if (typeof cfg.enabled === "boolean") out.enabled = cfg.enabled; + if (typeof cfg.every === "string") out.every = cfg.every; + if (typeof cfg.prompt === "string") out.prompt = cfg.prompt; + if (typeof cfg.ackMaxChars === "number" && Number.isFinite(cfg.ackMaxChars)) { + out.ackMaxChars = cfg.ackMaxChars; + } + return out; +} + +function resolveHeartbeatIntervalMs(agent: AsyncAgent | null): number { + const cfg = getHeartbeatConfig(agent); + return resolveDurationMs(cfg.every ?? DEFAULT_HEARTBEAT_EVERY) ?? DEFAULT_INTERVAL_MS; +} + +function resolveSessionKey(agent: AsyncAgent): string { + return agent.sessionId; +} + +async function isHeartbeatFileEmpty(agent: AsyncAgent): Promise { + const profileDir = agent.getProfileDir(); + if (!profileDir) return false; + const heartbeatPath = path.join(profileDir, HEARTBEAT_FILENAME); + + try { + const content = await fs.readFile(heartbeatPath, "utf-8"); + return isHeartbeatContentEffectivelyEmpty(content); + } catch { + return false; + } +} + +export function setHeartbeatsEnabled(enabled: boolean): void { + heartbeatsEnabled = enabled; +} + +export async function runHeartbeatOnce(opts: { + agent: AsyncAgent | null; + reason?: string; + nowMs?: () => number; +}): Promise { + const startedAt = opts.nowMs?.() ?? Date.now(); + const agent = opts.agent; + + if (!heartbeatsEnabled) { + return { status: "skipped", reason: "disabled" }; + } + + if (!agent || agent.closed) { + return { status: "skipped", reason: "disabled" }; + } + + const cfg = getHeartbeatConfig(agent); + if (cfg.enabled === false) { + return { status: "skipped", reason: "disabled" }; + } + + if (agent.getPendingWrites() > 0) { + return { status: "skipped", reason: "requests-in-flight" }; + } + + try { + const isExecEvent = opts.reason === "exec-event"; + if (!isExecEvent && (await isHeartbeatFileEmpty(agent))) { + emitHeartbeatEvent({ + status: "skipped", + reason: "empty-heartbeat-file", + durationMs: Date.now() - startedAt, + }); + return { status: "skipped", reason: "empty-heartbeat-file" }; + } + + await agent.ensureInitialized(); + const beforeMessages = agent.getMessages(); + const sessionKey = resolveSessionKey(agent); + const pendingEvents = drainSystemEvents(sessionKey); + + const basePrompt = resolveHeartbeatPrompt(cfg.prompt); + const prompt = pendingEvents.length + ? `${basePrompt}\n\nSystem events:\n${pendingEvents.map((line) => `- ${line}`).join("\n")}` + : basePrompt; + + agent.write(prompt); + await agent.waitForIdle(); + + const afterMessages = agent.getMessages(); + const appended = afterMessages.slice(beforeMessages.length); + const assistant = [...appended] + .reverse() + .find((msg) => msg.role === "assistant"); + const text = extractMessageText(assistant); + + if (!text.trim()) { + const okEmptyEvent: Omit = { + status: "ok-empty", + durationMs: Date.now() - startedAt, + }; + if (opts.reason) okEmptyEvent.reason = opts.reason; + const indicator = resolveIndicatorType("ok-empty"); + if (indicator) okEmptyEvent.indicatorType = indicator; + emitHeartbeatEvent(okEmptyEvent); + return { status: "ran", durationMs: Date.now() - startedAt }; + } + + const stripped = stripHeartbeatToken(text, { + mode: "heartbeat", + maxAckChars: cfg.ackMaxChars ?? DEFAULT_HEARTBEAT_ACK_MAX_CHARS, + }); + + if (stripped.shouldSkip) { + const okTokenEvent: Omit = { + status: "ok-token", + durationMs: Date.now() - startedAt, + }; + if (opts.reason) okTokenEvent.reason = opts.reason; + const indicator = resolveIndicatorType("ok-token"); + if (indicator) okTokenEvent.indicatorType = indicator; + emitHeartbeatEvent(okTokenEvent); + return { status: "ran", durationMs: Date.now() - startedAt }; + } + + const sentEvent: Omit = { + status: "sent", + preview: stripped.text.slice(0, 200), + durationMs: Date.now() - startedAt, + }; + if (opts.reason) sentEvent.reason = opts.reason; + const sentIndicator = resolveIndicatorType("sent"); + if (sentIndicator) sentEvent.indicatorType = sentIndicator; + emitHeartbeatEvent(sentEvent); + return { status: "ran", durationMs: Date.now() - startedAt }; + } catch (error) { + const reason = error instanceof Error ? error.message : String(error); + const failedEvent: Omit = { + status: "failed", + reason, + durationMs: Date.now() - startedAt, + }; + const failedIndicator = resolveIndicatorType("failed"); + if (failedIndicator) failedEvent.indicatorType = failedIndicator; + emitHeartbeatEvent(failedEvent); + return { status: "failed", reason }; + } +} + +export function startHeartbeatRunner(deps: RunnerDeps): HeartbeatRunner { + const logger = deps.logger ?? console; + const nowMs = deps.nowMs ?? (() => Date.now()); + let timer: NodeJS.Timeout | null = null; + let stopped = false; + let intervalMs = resolveHeartbeatIntervalMs(deps.getAgent()); + let nextDueAtMs = nowMs() + intervalMs; + + const clearTimer = () => { + if (timer) { + clearTimeout(timer); + timer = null; + } + }; + + const scheduleNext = () => { + if (stopped) return; + clearTimer(); + + const delay = Math.max(0, nextDueAtMs - nowMs()); + timer = setTimeout(() => { + requestHeartbeatNow({ reason: "interval", coalesceMs: 0 }); + }, delay); + timer.unref?.(); + }; + + const run: HeartbeatWakeHandler = async (params) => { + const reason = params.reason; + const agent = deps.getAgent(); + + if (reason === "interval") { + const now = nowMs(); + if (now < nextDueAtMs) { + return { status: "skipped", reason: "not-due" }; + } + } + + const result = await runHeartbeatOnce( + reason + ? { + agent, + reason, + nowMs, + } + : { + agent, + nowMs, + }, + ); + + const activeAgent = deps.getAgent(); + intervalMs = resolveHeartbeatIntervalMs(activeAgent); + nextDueAtMs = nowMs() + intervalMs; + scheduleNext(); + + return result; + }; + + setHeartbeatWakeHandler(run); + scheduleNext(); + logger.info?.("[Heartbeat] runner started"); + + return { + stop: () => { + if (stopped) return; + stopped = true; + clearTimer(); + setHeartbeatWakeHandler(null); + logger.info?.("[Heartbeat] runner stopped"); + }, + updateConfig: () => { + const agent = deps.getAgent(); + intervalMs = resolveHeartbeatIntervalMs(agent); + nextDueAtMs = nowMs() + intervalMs; + scheduleNext(); + }, + }; +} + +export type { HeartbeatEventPayload }; diff --git a/src/heartbeat/system-events.ts b/src/heartbeat/system-events.ts new file mode 100644 index 00000000..7927fd9e --- /dev/null +++ b/src/heartbeat/system-events.ts @@ -0,0 +1,51 @@ +export type SystemEvent = { text: string; ts: number }; + +const MAX_EVENTS = 20; +const queues = new Map(); + +function normalizeSessionKey(key: string | undefined): string { + const trimmed = typeof key === "string" ? key.trim() : ""; + if (!trimmed) { + throw new Error("system events require a sessionKey"); + } + return trimmed; +} + +export function enqueueSystemEvent(text: string, opts: { sessionKey: string }): void { + const sessionKey = normalizeSessionKey(opts.sessionKey); + const cleaned = text.trim(); + if (!cleaned) return; + + const list = queues.get(sessionKey) ?? []; + const previous = list[list.length - 1]; + if (previous?.text === cleaned) { + return; + } + + list.push({ text: cleaned, ts: Date.now() }); + if (list.length > MAX_EVENTS) { + list.splice(0, list.length - MAX_EVENTS); + } + queues.set(sessionKey, list); +} + +export function drainSystemEvents(sessionKey: string): string[] { + const key = normalizeSessionKey(sessionKey); + const list = queues.get(key) ?? []; + queues.delete(key); + return list.map((entry) => entry.text); +} + +export function peekSystemEvents(sessionKey: string): string[] { + const key = normalizeSessionKey(sessionKey); + return (queues.get(key) ?? []).map((entry) => entry.text); +} + +export function hasSystemEvents(sessionKey: string): boolean { + const key = normalizeSessionKey(sessionKey); + return (queues.get(key)?.length ?? 0) > 0; +} + +export function resetSystemEventsForTest(): void { + queues.clear(); +} diff --git a/src/index.ts b/src/index.ts index 4ecfa6ca..961b5b46 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,3 +2,4 @@ export * from "./agent/index.js"; export * from "./gateway/index.js"; export * from "./client/index.js"; export * from "./shared/index.js"; +export * from "./heartbeat/index.js";