feat(heartbeat): add heartbeat core runner and profile integration

This commit is contained in:
Jiang Bohan 2026-02-06 16:41:56 +08:00
parent 91aa433f34
commit ffa5355a95
19 changed files with 934 additions and 2 deletions

View file

@ -15,6 +15,7 @@ export class AsyncAgent {
private readonly channel = new Channel<ChannelItem>();
private _closed = false;
private queue: Promise<void> = Promise.resolve();
private pendingWrites = 0;
private closeCallbacks: Array<() => void> = [];
readonly sessionId: string;
@ -38,6 +39,7 @@ export class AsyncAgent {
/** Write message to agent (non-blocking, serialized queue) */
write(content: string): void {
if (this._closed) throw new Error("Agent is closed");
this.pendingWrites += 1;
this.queue = this.queue
.then(async () => {
@ -54,6 +56,9 @@ export class AsyncAgent {
.catch((err) => {
const message = err instanceof Error ? err.message : String(err);
this.channel.send({ id: uuidv7(), content: `[error] ${message}` });
})
.finally(() => {
this.pendingWrites = Math.max(0, this.pendingWrites - 1);
});
}
@ -170,6 +175,34 @@ export class AsyncAgent {
return this.agent.getProfileId();
}
/**
* Get profile directory path, if profile is enabled.
*/
getProfileDir(): string | undefined {
return this.agent.getProfileDir();
}
/**
* Get heartbeat configuration from profile config.
*/
getHeartbeatConfig():
| {
enabled?: boolean | undefined;
every?: string | undefined;
prompt?: string | undefined;
ackMaxChars?: number | undefined;
}
| undefined {
return this.agent.getHeartbeatConfig();
}
/**
* Number of queued/in-flight writes.
*/
getPendingWrites(): number {
return this.pendingWrites;
}
/**
* Get agent display name from profile config.
*/

View file

@ -50,6 +50,7 @@ export function createAgentProfile(
profile.user = DEFAULT_TEMPLATES.user;
profile.workspace = DEFAULT_TEMPLATES.workspace;
profile.memory = DEFAULT_TEMPLATES.memory;
profile.heartbeat = DEFAULT_TEMPLATES.heartbeat;
// 保存到文件
saveProfile(profile, { baseDir });
@ -150,6 +151,7 @@ export class ProfileManager {
user: profile.user,
workspace: profile.workspace,
memory: profile.memory,
heartbeat: profile.heartbeat,
config: profile.config,
},
profileDir: this.getProfileDir(),
@ -168,6 +170,19 @@ export class ProfileManager {
return profile?.config;
}
/** Get heartbeat configuration from profile config */
getHeartbeatConfig():
| {
enabled?: boolean | undefined;
every?: string | undefined;
prompt?: string | undefined;
ackMaxChars?: number | undefined;
}
| undefined {
const profile = this.getProfile();
return profile?.config?.heartbeat;
}
/** 更新 tools 配置 */
updateToolsConfig(toolsConfig: ToolsConfig): void {
const profile = this.getOrCreateProfile(false);

View file

@ -95,13 +95,14 @@ export function loadProfile(profileId: string, options?: StorageOptions): AgentP
user: readProfileFile(profileId, PROFILE_FILES.user, options),
workspace: readProfileFile(profileId, PROFILE_FILES.workspace, options),
memory: readProfileFile(profileId, PROFILE_FILES.memory, options),
heartbeat: readProfileFile(profileId, PROFILE_FILES.heartbeat, options),
config: readProfileConfig(profileId, options),
};
}
/** 保存 AgentProfile只写入非空字段 */
export function saveProfile(profile: AgentProfile, options?: StorageOptions): void {
const { id, soul, user, workspace, memory, config } = profile;
const { id, soul, user, workspace, memory, heartbeat, config } = profile;
if (soul !== undefined) {
writeProfileFile(id, PROFILE_FILES.soul, soul, options);
@ -115,6 +116,9 @@ export function saveProfile(profile: AgentProfile, options?: StorageOptions): vo
if (memory !== undefined) {
writeProfileFile(id, PROFILE_FILES.memory, memory, options);
}
if (heartbeat !== undefined) {
writeProfileFile(id, PROFILE_FILES.heartbeat, heartbeat, options);
}
if (config !== undefined) {
writeProfileConfig(id, config, options);
}

View file

@ -72,6 +72,7 @@ Your profile directory contains these files (use \`edit\` or \`write\` to update
| \`user.md\` | About your human | As you learn about them |
| \`workspace.md\` | This file — your rules | When you discover better conventions |
| \`memory.md\` | Long-term knowledge | Regularly — capture what matters |
| \`heartbeat.md\` | Background check instructions | When heartbeat behavior should change |
## Every Session
@ -89,6 +90,7 @@ You wake up fresh each session. These files are your continuity:
- **Long-term:** \`MEMORY.md\` — your curated memories, lessons learned
- **Daily notes:** \`memory/YYYY-MM-DD.md\` — raw logs of what happened (optional)
- **Heartbeat:** \`heartbeat.md\` — periodic check loop instructions
Capture what matters. Decisions, context, things to remember.
@ -101,6 +103,7 @@ Capture what matters. Decisions, context, things to remember.
- \`memory.md\` — Your learnings: decisions made, lessons learned, important context
- \`workspace.md\` — Your rules: conventions, workflows, how you should operate
- \`soul.md\` — Your identity: only change if user wants to reshape who you are
- \`heartbeat.md\` — Periodic background checks and alert rules
**Rules:**
- **DO NOT** say "I'll remember that" without ACTUALLY calling \`edit\` or \`write\` on a file
@ -148,5 +151,11 @@ _(Persistent knowledge will be stored here. Update this as you learn.)_
## Lessons Learned
## Important Context
`,
heartbeat: `# heartbeat.md
# Keep this file empty (or with only comments) to skip heartbeat API calls.
# Add tasks below when you want the agent to check something periodically.
`,
} as const;

View file

@ -11,6 +11,7 @@ export const PROFILE_FILES = {
user: "user.md",
workspace: "workspace.md",
memory: "memory.md",
heartbeat: "heartbeat.md",
config: "config.json",
} as const;
@ -42,6 +43,17 @@ export interface ProfileConfig {
reasoningMode?: "off" | "on" | "stream" | undefined;
/** Exec approval configuration (security level, ask mode, allowlist) */
execApproval?: ExecApprovalConfig | undefined;
/** Heartbeat configuration */
heartbeat?: {
/** Global heartbeat enable switch */
enabled?: boolean | undefined;
/** Interval, e.g. "30m", "1h" */
every?: string | undefined;
/** Optional prompt override */
prompt?: string | undefined;
/** Max chars after HEARTBEAT_OK to still treat as ack */
ackMaxChars?: number | undefined;
} | undefined;
}
/** Agent Profile configuration */
@ -56,6 +68,8 @@ export interface AgentProfile {
workspace?: string | undefined;
/** Persistent memory - long-term knowledge base */
memory?: string | undefined;
/** Periodic heartbeat instructions */
heartbeat?: string | undefined;
/** Profile configuration (from config.json) */
config?: ProfileConfig | undefined;
}

View file

@ -595,6 +595,27 @@ export class Agent {
return this.profile?.getProfile()?.id;
}
/**
* Get profile directory path, if profile is enabled.
*/
getProfileDir(): string | undefined {
return this.profile?.getProfileDir();
}
/**
* Get heartbeat configuration from profile config.
*/
getHeartbeatConfig():
| {
enabled?: boolean | undefined;
every?: string | undefined;
prompt?: string | undefined;
ackMaxChars?: number | undefined;
}
| undefined {
return this.profile?.getHeartbeatConfig();
}
/**
* Get agent display name from profile config.
*/
@ -770,6 +791,7 @@ export class Agent {
user: profile.user,
workspace: profile.workspace,
memory: profile.memory,
heartbeat: profile.heartbeat,
config: profile.config,
},
profileDir: this.profile!.getProfileDir(),

View file

@ -10,6 +10,7 @@ import type {
SystemPromptReport,
} from "./types.js";
import {
buildHeartbeatSection,
buildConditionalToolSections,
buildExtraPromptSection,
buildIdentitySection,
@ -58,6 +59,7 @@ export function buildSystemPromptWithReport(options: SystemPromptOptions): {
{ name: "user", lines: buildUserSection(profile, mode) },
{ name: "workspace", lines: buildWorkspaceSection(profile, mode, profileDir) },
{ name: "memory", lines: buildMemoryFileSection(profile, mode) },
{ name: "heartbeat", lines: buildHeartbeatSection(profile, mode) },
{ name: "safety", lines: buildSafetySection(includeSafety) },
{ name: "tooling", lines: buildToolingSummary(tools, mode) },
{ name: "tool-call-style", lines: buildToolCallStyleSection(mode) },

View file

@ -6,6 +6,7 @@
import { SAFETY_CONSTITUTION } from "./constitution.js";
import { formatRuntimeLine } from "./runtime-info.js";
import { resolveHeartbeatPrompt } from "../../heartbeat/heartbeat-text.js";
import type {
ProfileContent,
RuntimeInfo,
@ -97,13 +98,14 @@ export function buildWorkspaceSection(
"## Profile",
"",
`Your profile directory: \`${profileDir}\``,
"Use this as the base path for profile files (soul.md, user.md, memory.md, memory/*.md).",
"Use this as the base path for profile files (soul.md, user.md, memory.md, heartbeat.md, memory/*.md).",
"",
"Profile files:",
"- `soul.md` — Your identity and values",
"- `user.md` — Information about your user",
"- `workspace.md` — Guidelines and conventions (below)",
"- `memory.md` — Persistent knowledge",
"- `heartbeat.md` — Background heartbeat loop instructions",
"",
);
}
@ -128,6 +130,26 @@ export function buildMemoryFileSection(
return [];
}
/**
* Heartbeat section full mode only.
* Keeps heartbeat protocol explicit in the agent instructions.
*/
export function buildHeartbeatSection(
profile: ProfileContent | undefined,
mode: SystemPromptMode,
): string[] {
if (mode !== "full") return [];
const prompt = resolveHeartbeatPrompt(profile?.config?.heartbeat?.prompt);
return [
"## Heartbeats",
`Heartbeat prompt: ${prompt}`,
'If you receive a heartbeat poll (a user message matching the heartbeat prompt above), and there is nothing that needs attention, reply exactly:',
"HEARTBEAT_OK",
'If something needs attention, do NOT include "HEARTBEAT_OK"; reply with the alert text instead.',
"",
];
}
/**
* Safety constitution always included.
*/

View file

@ -53,6 +53,7 @@ export interface ProfileContent {
user?: string | undefined;
workspace?: string | undefined;
memory?: string | undefined;
heartbeat?: string | undefined;
config?: ProfileConfig | undefined;
}

View file

@ -0,0 +1,50 @@
export type HeartbeatIndicatorType = "ok" | "alert" | "error";
export type HeartbeatEventPayload = {
ts: number;
status: "sent" | "ok-empty" | "ok-token" | "skipped" | "failed";
preview?: string;
durationMs?: number;
reason?: string;
indicatorType?: HeartbeatIndicatorType;
};
export function resolveIndicatorType(
status: HeartbeatEventPayload["status"],
): HeartbeatIndicatorType | undefined {
switch (status) {
case "ok-empty":
case "ok-token":
return "ok";
case "sent":
return "alert";
case "failed":
return "error";
case "skipped":
return undefined;
}
}
let lastHeartbeat: HeartbeatEventPayload | null = null;
const listeners = new Set<(evt: HeartbeatEventPayload) => void>();
export function emitHeartbeatEvent(evt: Omit<HeartbeatEventPayload, "ts">): void {
const enriched: HeartbeatEventPayload = { ts: Date.now(), ...evt };
lastHeartbeat = enriched;
for (const listener of listeners) {
try {
listener(enriched);
} catch {
// Ignore listener errors so heartbeat flow stays robust.
}
}
}
export function onHeartbeatEvent(listener: (evt: HeartbeatEventPayload) => void): () => void {
listeners.add(listener);
return () => listeners.delete(listener);
}
export function getLastHeartbeatEvent(): HeartbeatEventPayload | null {
return lastHeartbeat;
}

View file

@ -0,0 +1,31 @@
import { describe, expect, it } from "vitest";
import {
HEARTBEAT_TOKEN,
isHeartbeatContentEffectivelyEmpty,
stripHeartbeatToken,
} from "./heartbeat-text.js";
describe("heartbeat-text", () => {
it("treats comment-only heartbeat files as empty", () => {
expect(isHeartbeatContentEffectivelyEmpty("# title\n\n- [ ]\n")).toBe(true);
expect(isHeartbeatContentEffectivelyEmpty("\n# note\n")).toBe(true);
expect(isHeartbeatContentEffectivelyEmpty("check disk health")).toBe(false);
});
it("strips plain token responses", () => {
const result = stripHeartbeatToken(HEARTBEAT_TOKEN, { mode: "heartbeat" });
expect(result.shouldSkip).toBe(true);
expect(result.text).toBe("");
});
it("keeps substantial content around token in heartbeat mode", () => {
const longTail = "Potential issue detected: disk usage is 92% on /Users";
const result = stripHeartbeatToken(`${HEARTBEAT_TOKEN} ${longTail}`, {
mode: "heartbeat",
maxAckChars: 10,
});
expect(result.shouldSkip).toBe(false);
expect(result.text).toContain("disk usage");
});
});

View file

@ -0,0 +1,117 @@
export const HEARTBEAT_TOKEN = "HEARTBEAT_OK";
export const HEARTBEAT_PROMPT =
"Read heartbeat.md if it exists (workspace context). Follow it strictly. Do not infer or repeat old tasks from prior chats. If nothing needs attention, reply HEARTBEAT_OK.";
export const DEFAULT_HEARTBEAT_EVERY = "30m";
export const DEFAULT_HEARTBEAT_ACK_MAX_CHARS = 300;
export function isHeartbeatContentEffectivelyEmpty(
content: string | undefined | null,
): boolean {
if (content === undefined || content === null || typeof content !== "string") {
return false;
}
const lines = content.split("\n");
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed) continue;
if (/^#+(\s|$)/.test(trimmed)) continue;
if (/^[-*+]\s*(\[[\sXx]?\]\s*)?$/.test(trimmed)) continue;
return false;
}
return true;
}
export function resolveHeartbeatPrompt(raw?: string): string {
const trimmed = typeof raw === "string" ? raw.trim() : "";
return trimmed || HEARTBEAT_PROMPT;
}
export type StripHeartbeatMode = "heartbeat" | "message";
function stripTokenAtEdges(raw: string): { text: string; didStrip: boolean } {
let text = raw.trim();
if (!text) return { text: "", didStrip: false };
if (!text.includes(HEARTBEAT_TOKEN)) return { text, didStrip: false };
let didStrip = false;
let changed = true;
while (changed) {
changed = false;
const next = text.trim();
if (next.startsWith(HEARTBEAT_TOKEN)) {
text = next.slice(HEARTBEAT_TOKEN.length).trimStart();
didStrip = true;
changed = true;
continue;
}
if (next.endsWith(HEARTBEAT_TOKEN)) {
text = next.slice(0, Math.max(0, next.length - HEARTBEAT_TOKEN.length)).trimEnd();
didStrip = true;
changed = true;
}
}
return {
text: text.replace(/\s+/g, " ").trim(),
didStrip,
};
}
export function stripHeartbeatToken(
raw?: string,
opts: { mode?: StripHeartbeatMode; maxAckChars?: number } = {},
): { shouldSkip: boolean; text: string; didStrip: boolean } {
if (!raw) return { shouldSkip: true, text: "", didStrip: false };
const trimmed = raw.trim();
if (!trimmed) return { shouldSkip: true, text: "", didStrip: false };
const mode = opts.mode ?? "message";
const maxAckCharsRaw = opts.maxAckChars;
const maxAckChars = Math.max(
0,
typeof maxAckCharsRaw === "number" && Number.isFinite(maxAckCharsRaw)
? maxAckCharsRaw
: DEFAULT_HEARTBEAT_ACK_MAX_CHARS,
);
const stripMarkup = (text: string) =>
text
.replace(/<[^>]*>/g, " ")
.replace(/&nbsp;/gi, " ")
.replace(/^[*`~_]+/, "")
.replace(/[*`~_]+$/, "");
const normalized = stripMarkup(trimmed);
const hasToken =
trimmed.includes(HEARTBEAT_TOKEN) || normalized.includes(HEARTBEAT_TOKEN);
if (!hasToken) {
return { shouldSkip: false, text: trimmed, didStrip: false };
}
const strippedOriginal = stripTokenAtEdges(trimmed);
const strippedNormalized = stripTokenAtEdges(normalized);
const picked =
strippedOriginal.didStrip && strippedOriginal.text
? strippedOriginal
: strippedNormalized;
if (!picked.didStrip) {
return { shouldSkip: false, text: trimmed, didStrip: false };
}
if (!picked.text) {
return { shouldSkip: true, text: "", didStrip: true };
}
const rest = picked.text.trim();
if (mode === "heartbeat" && rest.length <= maxAckChars) {
return { shouldSkip: true, text: "", didStrip: true };
}
return { shouldSkip: false, text: rest, didStrip: true };
}

View file

@ -0,0 +1,47 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import {
hasPendingHeartbeatWake,
requestHeartbeatNow,
setHeartbeatWakeHandler,
} from "./heartbeat-wake.js";
describe("heartbeat-wake", () => {
afterEach(() => {
setHeartbeatWakeHandler(null);
vi.useRealTimers();
});
it("coalesces multiple wake requests into one run", async () => {
vi.useFakeTimers();
const handler = vi.fn(async () => ({ status: "ran" as const, durationMs: 1 }));
setHeartbeatWakeHandler(handler);
requestHeartbeatNow({ reason: "a" });
requestHeartbeatNow({ reason: "b" });
requestHeartbeatNow({ reason: "c" });
expect(hasPendingHeartbeatWake()).toBe(true);
await vi.advanceTimersByTimeAsync(300);
expect(handler).toHaveBeenCalledTimes(1);
});
it("retries when requests are in flight", async () => {
vi.useFakeTimers();
const handler = vi
.fn()
.mockResolvedValueOnce({ status: "skipped" as const, reason: "requests-in-flight" })
.mockResolvedValueOnce({ status: "ran" as const, durationMs: 3 });
setHeartbeatWakeHandler(handler);
requestHeartbeatNow({ reason: "retry-case" });
await vi.advanceTimersByTimeAsync(300);
expect(handler).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(1100);
expect(handler).toHaveBeenCalledTimes(2);
});
});

View file

@ -0,0 +1,73 @@
export type HeartbeatRunResult =
| { status: "ran"; durationMs: number }
| { status: "skipped"; reason: string }
| { status: "failed"; reason: string };
export type HeartbeatWakeHandler = (opts: {
reason?: string;
}) => Promise<HeartbeatRunResult>;
let handler: HeartbeatWakeHandler | null = null;
let pendingReason: string | null = null;
let scheduled = false;
let running = false;
let timer: NodeJS.Timeout | null = null;
const DEFAULT_COALESCE_MS = 250;
const DEFAULT_RETRY_MS = 1000;
function schedule(coalesceMs: number): void {
if (timer) return;
timer = setTimeout(async () => {
timer = null;
scheduled = false;
const active = handler;
if (!active) return;
if (running) {
scheduled = true;
schedule(coalesceMs);
return;
}
const reason = pendingReason;
pendingReason = null;
running = true;
try {
const result = reason ? await active({ reason }) : await active({});
if (result.status === "skipped" && result.reason === "requests-in-flight") {
pendingReason = reason ?? "retry";
schedule(DEFAULT_RETRY_MS);
}
} catch {
pendingReason = reason ?? "retry";
schedule(DEFAULT_RETRY_MS);
} finally {
running = false;
if (pendingReason || scheduled) {
schedule(coalesceMs);
}
}
}, coalesceMs);
timer.unref?.();
}
export function setHeartbeatWakeHandler(next: HeartbeatWakeHandler | null): void {
handler = next;
if (handler && pendingReason) {
schedule(DEFAULT_COALESCE_MS);
}
}
export function requestHeartbeatNow(opts?: { reason?: string; coalesceMs?: number }): void {
pendingReason = opts?.reason ?? pendingReason ?? "requested";
schedule(opts?.coalesceMs ?? DEFAULT_COALESCE_MS);
}
export function hasHeartbeatWakeHandler(): boolean {
return handler !== null;
}
export function hasPendingHeartbeatWake(): boolean {
return pendingReason !== null || Boolean(timer) || scheduled;
}

45
src/heartbeat/index.ts Normal file
View file

@ -0,0 +1,45 @@
export {
emitHeartbeatEvent,
getLastHeartbeatEvent,
onHeartbeatEvent,
resolveIndicatorType,
type HeartbeatEventPayload,
type HeartbeatIndicatorType,
} from "./heartbeat-events.js";
export {
hasHeartbeatWakeHandler,
hasPendingHeartbeatWake,
requestHeartbeatNow,
setHeartbeatWakeHandler,
type HeartbeatRunResult,
type HeartbeatWakeHandler,
} from "./heartbeat-wake.js";
export {
DEFAULT_HEARTBEAT_ACK_MAX_CHARS,
DEFAULT_HEARTBEAT_EVERY,
HEARTBEAT_PROMPT,
HEARTBEAT_TOKEN,
isHeartbeatContentEffectivelyEmpty,
resolveHeartbeatPrompt,
stripHeartbeatToken,
type StripHeartbeatMode,
} from "./heartbeat-text.js";
export {
drainSystemEvents,
enqueueSystemEvent,
hasSystemEvents,
peekSystemEvents,
resetSystemEventsForTest,
type SystemEvent,
} from "./system-events.js";
export {
runHeartbeatOnce,
setHeartbeatsEnabled,
startHeartbeatRunner,
type HeartbeatConfig,
type HeartbeatRunner,
} from "./runner.js";

View file

@ -0,0 +1,74 @@
import os from "node:os";
import path from "node:path";
import { mkdtemp, rm, writeFile } from "node:fs/promises";
import { afterEach, describe, expect, it } from "vitest";
import { runHeartbeatOnce, setHeartbeatsEnabled } from "./runner.js";
type StubAgent = {
closed: boolean;
sessionId: string;
ensureInitialized: () => Promise<void>;
getMessages: () => Array<any>;
write: (content: string) => void;
waitForIdle: () => Promise<void>;
getHeartbeatConfig: () => { prompt?: string; ackMaxChars?: number; enabled?: boolean };
getPendingWrites: () => number;
getProfileDir: () => string | undefined;
};
function createStubAgent(opts?: {
profileDir?: string;
replyText?: string;
heartbeatEnabled?: boolean;
}): StubAgent {
const messages: Array<any> = [];
const replyText = opts?.replyText ?? "HEARTBEAT_OK";
return {
closed: false,
sessionId: "test-session",
ensureInitialized: async () => {},
getMessages: () => messages,
write: (content: string) => {
messages.push({ role: "user", content });
messages.push({ role: "assistant", content: [{ type: "text", text: replyText }] });
},
waitForIdle: async () => {},
getHeartbeatConfig: () =>
typeof opts?.heartbeatEnabled === "boolean"
? { enabled: opts.heartbeatEnabled }
: {},
getPendingWrites: () => 0,
getProfileDir: () => opts?.profileDir,
};
}
describe("heartbeat runner", () => {
afterEach(() => {
setHeartbeatsEnabled(true);
});
it("skips when no agent is available", async () => {
const result = await runHeartbeatOnce({ agent: null });
expect(result).toEqual({ status: "skipped", reason: "disabled" });
});
it("skips when heartbeat file is effectively empty", async () => {
const dir = await mkdtemp(path.join(os.tmpdir(), "heartbeat-test-"));
try {
await writeFile(path.join(dir, "heartbeat.md"), "# keep empty\n", "utf-8");
const agent = createStubAgent({ profileDir: dir });
const result = await runHeartbeatOnce({ agent: agent as any });
expect(result).toEqual({ status: "skipped", reason: "empty-heartbeat-file" });
} finally {
await rm(dir, { recursive: true, force: true });
}
});
it("runs and returns ran for heartbeat acknowledgements", async () => {
const agent = createStubAgent({ replyText: "HEARTBEAT_OK" });
const result = await runHeartbeatOnce({ agent: agent as any, reason: "manual" });
expect(result.status).toBe("ran");
});
});

321
src/heartbeat/runner.ts Normal file
View file

@ -0,0 +1,321 @@
import fs from "node:fs/promises";
import path from "node:path";
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { AsyncAgent } from "../agent/async-agent.js";
import {
DEFAULT_HEARTBEAT_ACK_MAX_CHARS,
DEFAULT_HEARTBEAT_EVERY,
isHeartbeatContentEffectivelyEmpty,
resolveHeartbeatPrompt,
stripHeartbeatToken,
} from "./heartbeat-text.js";
import {
emitHeartbeatEvent,
resolveIndicatorType,
type HeartbeatEventPayload,
} from "./heartbeat-events.js";
import {
setHeartbeatWakeHandler,
requestHeartbeatNow,
type HeartbeatRunResult,
type HeartbeatWakeHandler,
} from "./heartbeat-wake.js";
import { drainSystemEvents } from "./system-events.js";
export type HeartbeatConfig = {
enabled?: boolean;
every?: string;
prompt?: string;
ackMaxChars?: number;
};
export type HeartbeatRunner = {
stop: () => void;
updateConfig: () => void;
};
type RunnerDeps = {
getAgent: () => AsyncAgent | null;
nowMs?: () => number;
logger?: Pick<Console, "info" | "warn" | "error">;
};
const HEARTBEAT_FILENAME = "heartbeat.md";
const DEFAULT_INTERVAL_MS = 30 * 60 * 1000;
let heartbeatsEnabled = true;
function resolveDurationMs(raw: string | undefined): number | null {
if (!raw) return DEFAULT_INTERVAL_MS;
const trimmed = raw.trim();
if (!trimmed) return DEFAULT_INTERVAL_MS;
const match = trimmed.match(/^(\d+(?:\.\d+)?)\s*([smhd])$/i);
if (match) {
const num = Number.parseFloat(match[1]!);
const unit = match[2]!.toLowerCase();
const unitMs: Record<string, number> = {
s: 1000,
m: 60 * 1000,
h: 60 * 60 * 1000,
d: 24 * 60 * 60 * 1000,
};
const ms = unitMs[unit];
if (!Number.isFinite(num) || !ms) return null;
const value = Math.floor(num * ms);
return value > 0 ? value : null;
}
if (/^\d+$/.test(trimmed)) {
const value = Number.parseInt(trimmed, 10);
return value > 0 ? value : null;
}
return null;
}
function extractMessageText(message: AgentMessage | undefined): string {
if (!message) return "";
const raw = (message as { content?: unknown }).content;
if (typeof raw === "string") return raw;
if (!Array.isArray(raw)) return "";
const parts: string[] = [];
for (const block of raw) {
if (!block || typeof block !== "object") continue;
const text = (block as { text?: unknown }).text;
if (typeof text === "string" && text.trim()) {
parts.push(text);
}
}
return parts.join("\n").trim();
}
function getHeartbeatConfig(agent: AsyncAgent | null): HeartbeatConfig {
const cfg = agent?.getHeartbeatConfig();
if (!cfg) return {};
const out: HeartbeatConfig = {};
if (typeof cfg.enabled === "boolean") out.enabled = cfg.enabled;
if (typeof cfg.every === "string") out.every = cfg.every;
if (typeof cfg.prompt === "string") out.prompt = cfg.prompt;
if (typeof cfg.ackMaxChars === "number" && Number.isFinite(cfg.ackMaxChars)) {
out.ackMaxChars = cfg.ackMaxChars;
}
return out;
}
function resolveHeartbeatIntervalMs(agent: AsyncAgent | null): number {
const cfg = getHeartbeatConfig(agent);
return resolveDurationMs(cfg.every ?? DEFAULT_HEARTBEAT_EVERY) ?? DEFAULT_INTERVAL_MS;
}
function resolveSessionKey(agent: AsyncAgent): string {
return agent.sessionId;
}
async function isHeartbeatFileEmpty(agent: AsyncAgent): Promise<boolean> {
const profileDir = agent.getProfileDir();
if (!profileDir) return false;
const heartbeatPath = path.join(profileDir, HEARTBEAT_FILENAME);
try {
const content = await fs.readFile(heartbeatPath, "utf-8");
return isHeartbeatContentEffectivelyEmpty(content);
} catch {
return false;
}
}
export function setHeartbeatsEnabled(enabled: boolean): void {
heartbeatsEnabled = enabled;
}
export async function runHeartbeatOnce(opts: {
agent: AsyncAgent | null;
reason?: string;
nowMs?: () => number;
}): Promise<HeartbeatRunResult> {
const startedAt = opts.nowMs?.() ?? Date.now();
const agent = opts.agent;
if (!heartbeatsEnabled) {
return { status: "skipped", reason: "disabled" };
}
if (!agent || agent.closed) {
return { status: "skipped", reason: "disabled" };
}
const cfg = getHeartbeatConfig(agent);
if (cfg.enabled === false) {
return { status: "skipped", reason: "disabled" };
}
if (agent.getPendingWrites() > 0) {
return { status: "skipped", reason: "requests-in-flight" };
}
try {
const isExecEvent = opts.reason === "exec-event";
if (!isExecEvent && (await isHeartbeatFileEmpty(agent))) {
emitHeartbeatEvent({
status: "skipped",
reason: "empty-heartbeat-file",
durationMs: Date.now() - startedAt,
});
return { status: "skipped", reason: "empty-heartbeat-file" };
}
await agent.ensureInitialized();
const beforeMessages = agent.getMessages();
const sessionKey = resolveSessionKey(agent);
const pendingEvents = drainSystemEvents(sessionKey);
const basePrompt = resolveHeartbeatPrompt(cfg.prompt);
const prompt = pendingEvents.length
? `${basePrompt}\n\nSystem events:\n${pendingEvents.map((line) => `- ${line}`).join("\n")}`
: basePrompt;
agent.write(prompt);
await agent.waitForIdle();
const afterMessages = agent.getMessages();
const appended = afterMessages.slice(beforeMessages.length);
const assistant = [...appended]
.reverse()
.find((msg) => msg.role === "assistant");
const text = extractMessageText(assistant);
if (!text.trim()) {
const okEmptyEvent: Omit<HeartbeatEventPayload, "ts"> = {
status: "ok-empty",
durationMs: Date.now() - startedAt,
};
if (opts.reason) okEmptyEvent.reason = opts.reason;
const indicator = resolveIndicatorType("ok-empty");
if (indicator) okEmptyEvent.indicatorType = indicator;
emitHeartbeatEvent(okEmptyEvent);
return { status: "ran", durationMs: Date.now() - startedAt };
}
const stripped = stripHeartbeatToken(text, {
mode: "heartbeat",
maxAckChars: cfg.ackMaxChars ?? DEFAULT_HEARTBEAT_ACK_MAX_CHARS,
});
if (stripped.shouldSkip) {
const okTokenEvent: Omit<HeartbeatEventPayload, "ts"> = {
status: "ok-token",
durationMs: Date.now() - startedAt,
};
if (opts.reason) okTokenEvent.reason = opts.reason;
const indicator = resolveIndicatorType("ok-token");
if (indicator) okTokenEvent.indicatorType = indicator;
emitHeartbeatEvent(okTokenEvent);
return { status: "ran", durationMs: Date.now() - startedAt };
}
const sentEvent: Omit<HeartbeatEventPayload, "ts"> = {
status: "sent",
preview: stripped.text.slice(0, 200),
durationMs: Date.now() - startedAt,
};
if (opts.reason) sentEvent.reason = opts.reason;
const sentIndicator = resolveIndicatorType("sent");
if (sentIndicator) sentEvent.indicatorType = sentIndicator;
emitHeartbeatEvent(sentEvent);
return { status: "ran", durationMs: Date.now() - startedAt };
} catch (error) {
const reason = error instanceof Error ? error.message : String(error);
const failedEvent: Omit<HeartbeatEventPayload, "ts"> = {
status: "failed",
reason,
durationMs: Date.now() - startedAt,
};
const failedIndicator = resolveIndicatorType("failed");
if (failedIndicator) failedEvent.indicatorType = failedIndicator;
emitHeartbeatEvent(failedEvent);
return { status: "failed", reason };
}
}
export function startHeartbeatRunner(deps: RunnerDeps): HeartbeatRunner {
const logger = deps.logger ?? console;
const nowMs = deps.nowMs ?? (() => Date.now());
let timer: NodeJS.Timeout | null = null;
let stopped = false;
let intervalMs = resolveHeartbeatIntervalMs(deps.getAgent());
let nextDueAtMs = nowMs() + intervalMs;
const clearTimer = () => {
if (timer) {
clearTimeout(timer);
timer = null;
}
};
const scheduleNext = () => {
if (stopped) return;
clearTimer();
const delay = Math.max(0, nextDueAtMs - nowMs());
timer = setTimeout(() => {
requestHeartbeatNow({ reason: "interval", coalesceMs: 0 });
}, delay);
timer.unref?.();
};
const run: HeartbeatWakeHandler = async (params) => {
const reason = params.reason;
const agent = deps.getAgent();
if (reason === "interval") {
const now = nowMs();
if (now < nextDueAtMs) {
return { status: "skipped", reason: "not-due" };
}
}
const result = await runHeartbeatOnce(
reason
? {
agent,
reason,
nowMs,
}
: {
agent,
nowMs,
},
);
const activeAgent = deps.getAgent();
intervalMs = resolveHeartbeatIntervalMs(activeAgent);
nextDueAtMs = nowMs() + intervalMs;
scheduleNext();
return result;
};
setHeartbeatWakeHandler(run);
scheduleNext();
logger.info?.("[Heartbeat] runner started");
return {
stop: () => {
if (stopped) return;
stopped = true;
clearTimer();
setHeartbeatWakeHandler(null);
logger.info?.("[Heartbeat] runner stopped");
},
updateConfig: () => {
const agent = deps.getAgent();
intervalMs = resolveHeartbeatIntervalMs(agent);
nextDueAtMs = nowMs() + intervalMs;
scheduleNext();
},
};
}
export type { HeartbeatEventPayload };

View file

@ -0,0 +1,51 @@
export type SystemEvent = { text: string; ts: number };
const MAX_EVENTS = 20;
const queues = new Map<string, SystemEvent[]>();
function normalizeSessionKey(key: string | undefined): string {
const trimmed = typeof key === "string" ? key.trim() : "";
if (!trimmed) {
throw new Error("system events require a sessionKey");
}
return trimmed;
}
export function enqueueSystemEvent(text: string, opts: { sessionKey: string }): void {
const sessionKey = normalizeSessionKey(opts.sessionKey);
const cleaned = text.trim();
if (!cleaned) return;
const list = queues.get(sessionKey) ?? [];
const previous = list[list.length - 1];
if (previous?.text === cleaned) {
return;
}
list.push({ text: cleaned, ts: Date.now() });
if (list.length > MAX_EVENTS) {
list.splice(0, list.length - MAX_EVENTS);
}
queues.set(sessionKey, list);
}
export function drainSystemEvents(sessionKey: string): string[] {
const key = normalizeSessionKey(sessionKey);
const list = queues.get(key) ?? [];
queues.delete(key);
return list.map((entry) => entry.text);
}
export function peekSystemEvents(sessionKey: string): string[] {
const key = normalizeSessionKey(sessionKey);
return (queues.get(key) ?? []).map((entry) => entry.text);
}
export function hasSystemEvents(sessionKey: string): boolean {
const key = normalizeSessionKey(sessionKey);
return (queues.get(key)?.length ?? 0) > 0;
}
export function resetSystemEventsForTest(): void {
queues.clear();
}

View file

@ -2,3 +2,4 @@ export * from "./agent/index.js";
export * from "./gateway/index.js";
export * from "./client/index.js";
export * from "./shared/index.js";
export * from "./heartbeat/index.js";