Merge pull request #103 from multica-ai/feat/heartbeat-mechanism

feat(heartbeat): add heartbeat runner and integrate with hub/cron
This commit is contained in:
Bohan Jiang 2026-02-06 17:46:51 +08:00 committed by GitHub
commit dacf8894e9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
33 changed files with 1452 additions and 58 deletions

View file

@ -190,12 +190,17 @@ interface ElectronAPI {
saveApiKey: (providerId: string, apiKey: string) => Promise<{ ok: boolean; error?: string }>
importOAuth: (providerId: string) => Promise<{ ok: boolean; expiresAt?: number; error?: string }>
}
cron: {
list: () => Promise<unknown[]>
toggle: (jobId: string) => Promise<{ ok: boolean }>
remove: (jobId: string) => Promise<{ ok: boolean }>
}
localChat: {
cron: {
list: () => Promise<unknown[]>
toggle: (jobId: string) => Promise<{ ok: boolean }>
remove: (jobId: string) => Promise<{ ok: boolean }>
}
heartbeat: {
last: () => Promise<unknown>
setEnabled: (enabled: boolean) => Promise<{ ok: boolean; enabled?: boolean; error?: string }>
wake: (reason?: string) => Promise<{ ok: boolean; result?: unknown; error?: string }>
}
localChat: {
subscribe: (agentId: string) => Promise<{ ok?: boolean; error?: string; alreadySubscribed?: boolean }>
unsubscribe: (agentId: string) => Promise<{ ok: boolean }>
getHistory: (agentId: string, options?: { offset?: number; limit?: number }) => Promise<{ messages: unknown[]; total: number; offset: number; limit: number }>

View file

@ -0,0 +1,39 @@
/**
* Heartbeat IPC handlers for Electron main process.
*/
import { ipcMain } from "electron";
import { getCurrentHub } from "./hub.js";
export function registerHeartbeatIpcHandlers(): void {
ipcMain.handle("heartbeat:last", async () => {
const hub = getCurrentHub();
if (!hub) return null;
return hub.getLastHeartbeat();
});
ipcMain.handle("heartbeat:setEnabled", async (_event, enabled: boolean) => {
const hub = getCurrentHub();
if (!hub) {
return { ok: false, error: "Hub not initialized" };
}
if (typeof enabled !== "boolean") {
return { ok: false, error: "enabled must be boolean" };
}
hub.setHeartbeatsEnabled(enabled);
return { ok: true, enabled };
});
ipcMain.handle("heartbeat:wake", async (_event, reason?: string) => {
const hub = getCurrentHub();
if (!hub) {
return { ok: false, error: "Hub not initialized" };
}
const result = await hub.runHeartbeatOnce({
reason: typeof reason === "string" ? reason.trim() || "manual" : "manual",
});
return { ok: result.status !== "failed", result };
});
}

View file

@ -7,6 +7,7 @@ export { registerHubIpcHandlers, cleanupHub, initializeHub, setupDeviceConfirmat
export { registerProfileIpcHandlers } from './profile.js'
export { registerProviderIpcHandlers } from './provider.js'
export { registerCronIpcHandlers } from './cron.js'
export { registerHeartbeatIpcHandlers } from './heartbeat.js'
import { registerAgentIpcHandlers, cleanupAgent } from './agent.js'
import { registerSkillsIpcHandlers } from './skills.js'
@ -14,6 +15,7 @@ import { registerHubIpcHandlers, cleanupHub, initializeHub } from './hub.js'
import { registerProfileIpcHandlers } from './profile.js'
import { registerProviderIpcHandlers } from './provider.js'
import { registerCronIpcHandlers } from './cron.js'
import { registerHeartbeatIpcHandlers } from './heartbeat.js'
/**
* Register all IPC handlers.
@ -26,6 +28,7 @@ export function registerAllIpcHandlers(): void {
registerProfileIpcHandlers()
registerProviderIpcHandlers()
registerCronIpcHandlers()
registerHeartbeatIpcHandlers()
}
/**

View file

@ -202,11 +202,17 @@ const electronAPI = {
},
// Cron jobs management
cron: {
list: () => ipcRenderer.invoke('cron:list'),
toggle: (jobId: string) => ipcRenderer.invoke('cron:toggle', jobId),
remove: (jobId: string) => ipcRenderer.invoke('cron:remove', jobId),
},
cron: {
list: () => ipcRenderer.invoke('cron:list'),
toggle: (jobId: string) => ipcRenderer.invoke('cron:toggle', jobId),
remove: (jobId: string) => ipcRenderer.invoke('cron:remove', jobId),
},
heartbeat: {
last: () => ipcRenderer.invoke('heartbeat:last'),
setEnabled: (enabled: boolean) => ipcRenderer.invoke('heartbeat:setEnabled', enabled),
wake: (reason?: string) => ipcRenderer.invoke('heartbeat:wake', reason),
},
// Local chat (direct IPC, no Gateway required)
localChat: {

View file

@ -0,0 +1,70 @@
import { useCallback, useEffect, useState } from "react";
export type HeartbeatEvent = {
ts: number;
status: "sent" | "ok-empty" | "ok-token" | "skipped" | "failed";
preview?: string;
durationMs?: number;
reason?: string;
};
export function useHeartbeat() {
const [enabled, setEnabled] = useState(true);
const [lastEvent, setLastEvent] = useState<HeartbeatEvent | null>(null);
const [loading, setLoading] = useState(false);
const [error, setError] = useState<string | null>(null);
const refresh = useCallback(async () => {
try {
setLoading(true);
setError(null);
const event = (await window.electronAPI.heartbeat.last()) as HeartbeatEvent | null;
setLastEvent(event);
} catch (err) {
setError(err instanceof Error ? err.message : String(err));
} finally {
setLoading(false);
}
}, []);
useEffect(() => {
void refresh();
const timer = setInterval(() => {
void refresh();
}, 15000);
return () => clearInterval(timer);
}, [refresh]);
const toggleEnabled = useCallback(async () => {
const next = !enabled;
const result = await window.electronAPI.heartbeat.setEnabled(next);
if (result.ok) {
setEnabled(next);
} else {
setError(result.error ?? "Failed to update heartbeat setting");
}
}, [enabled]);
const wakeNow = useCallback(async () => {
setLoading(true);
try {
const result = await window.electronAPI.heartbeat.wake("manual");
if (!result.ok) {
setError(result.error ?? "Failed to run heartbeat");
}
await refresh();
} finally {
setLoading(false);
}
}, [refresh]);
return {
enabled,
lastEvent,
loading,
error,
refresh,
toggleEnabled,
wakeNow,
};
}

View file

@ -1,9 +1,9 @@
import { useState, useEffect, useRef } from 'react'
import { useNavigate } from 'react-router-dom'
import { Button } from '@multica/ui/components/ui/button'
import { HugeiconsIcon } from '@hugeicons/react'
import {
Comment01Icon,
import { useState, useEffect, useRef } from 'react'
import { useNavigate } from 'react-router-dom'
import { Button } from '@multica/ui/components/ui/button'
import { HugeiconsIcon } from '@hugeicons/react'
import {
Comment01Icon,
LinkSquare01Icon,
Loading03Icon,
AlertCircleIcon,
@ -15,17 +15,17 @@ import {
import { ConnectionQRCode } from '../components/qr-code'
import { DeviceList } from '../components/device-list'
import { AgentSettingsDialog } from '../components/agent-settings-dialog'
import { ApiKeyDialog } from '../components/api-key-dialog'
import { OAuthDialog } from '../components/oauth-dialog'
import { useHub } from '../hooks/use-hub'
import { useProvider } from '../hooks/use-provider'
export default function HomePage() {
const navigate = useNavigate()
const { hubInfo, agents, loading, error } = useHub()
const { providers, current, setProvider, refresh, loading: providerLoading } = useProvider()
const [settingsOpen, setSettingsOpen] = useState(false)
const [agentName, setAgentName] = useState<string | undefined>()
import { ApiKeyDialog } from '../components/api-key-dialog'
import { OAuthDialog } from '../components/oauth-dialog'
import { useHub } from '../hooks/use-hub'
import { useProvider } from '../hooks/use-provider'
export default function HomePage() {
const navigate = useNavigate()
const { hubInfo, agents, loading, error } = useHub()
const { providers, current, setProvider, refresh, loading: providerLoading } = useProvider()
const [settingsOpen, setSettingsOpen] = useState(false)
const [agentName, setAgentName] = useState<string | undefined>()
const [providerDropdownOpen, setProviderDropdownOpen] = useState(false)
const [switching, setSwitching] = useState(false)
const [apiKeyDialogOpen, setApiKeyDialogOpen] = useState(false)
@ -186,8 +186,8 @@ export default function HomePage() {
<p className="font-medium">{agentName || 'Unnamed Agent'}</p>
</div>
{/* Provider Selector */}
<div className="p-4 rounded-lg bg-muted/50 border border-border/50 relative" ref={dropdownRef}>
{/* Provider Selector */}
<div className="p-4 rounded-lg bg-muted/50 border border-border/50 relative" ref={dropdownRef}>
<p className="text-xs text-muted-foreground uppercase tracking-wider mb-2">
LLM Provider
</p>
@ -270,10 +270,10 @@ export default function HomePage() {
</div>
</div>
)}
</div>
{/* Stats Grid */}
<div className="grid grid-cols-2 gap-4">
</div>
{/* Stats Grid */}
<div className="grid grid-cols-2 gap-4">
<div className="p-4 rounded-lg bg-muted/50 border border-border/50">
<p className="text-xs text-muted-foreground uppercase tracking-wider mb-1">
Gateway

View file

@ -15,6 +15,7 @@ export class AsyncAgent {
private readonly channel = new Channel<ChannelItem>();
private _closed = false;
private queue: Promise<void> = Promise.resolve();
private pendingWrites = 0;
private closeCallbacks: Array<() => void> = [];
readonly sessionId: string;
@ -38,6 +39,7 @@ export class AsyncAgent {
/** Write message to agent (non-blocking, serialized queue) */
write(content: string): void {
if (this._closed) throw new Error("Agent is closed");
this.pendingWrites += 1;
this.queue = this.queue
.then(async () => {
@ -54,6 +56,9 @@ export class AsyncAgent {
.catch((err) => {
const message = err instanceof Error ? err.message : String(err);
this.channel.send({ id: uuidv7(), content: `[error] ${message}` });
})
.finally(() => {
this.pendingWrites = Math.max(0, this.pendingWrites - 1);
});
}
@ -170,6 +175,34 @@ export class AsyncAgent {
return this.agent.getProfileId();
}
/**
* Get profile directory path, if profile is enabled.
*/
getProfileDir(): string | undefined {
return this.agent.getProfileDir();
}
/**
* Get heartbeat configuration from profile config.
*/
getHeartbeatConfig():
| {
enabled?: boolean | undefined;
every?: string | undefined;
prompt?: string | undefined;
ackMaxChars?: number | undefined;
}
| undefined {
return this.agent.getHeartbeatConfig();
}
/**
* Number of queued/in-flight writes.
*/
getPendingWrites(): number {
return this.pendingWrites;
}
/**
* Get agent display name from profile config.
*/

View file

@ -50,6 +50,7 @@ export function createAgentProfile(
profile.user = DEFAULT_TEMPLATES.user;
profile.workspace = DEFAULT_TEMPLATES.workspace;
profile.memory = DEFAULT_TEMPLATES.memory;
profile.heartbeat = DEFAULT_TEMPLATES.heartbeat;
// 保存到文件
saveProfile(profile, { baseDir });
@ -150,6 +151,7 @@ export class ProfileManager {
user: profile.user,
workspace: profile.workspace,
memory: profile.memory,
heartbeat: profile.heartbeat,
config: profile.config,
},
profileDir: this.getProfileDir(),
@ -168,6 +170,19 @@ export class ProfileManager {
return profile?.config;
}
/** Get heartbeat configuration from profile config */
getHeartbeatConfig():
| {
enabled?: boolean | undefined;
every?: string | undefined;
prompt?: string | undefined;
ackMaxChars?: number | undefined;
}
| undefined {
const profile = this.getProfile();
return profile?.config?.heartbeat;
}
/** 更新 tools 配置 */
updateToolsConfig(toolsConfig: ToolsConfig): void {
const profile = this.getOrCreateProfile(false);

View file

@ -95,13 +95,14 @@ export function loadProfile(profileId: string, options?: StorageOptions): AgentP
user: readProfileFile(profileId, PROFILE_FILES.user, options),
workspace: readProfileFile(profileId, PROFILE_FILES.workspace, options),
memory: readProfileFile(profileId, PROFILE_FILES.memory, options),
heartbeat: readProfileFile(profileId, PROFILE_FILES.heartbeat, options),
config: readProfileConfig(profileId, options),
};
}
/** 保存 AgentProfile只写入非空字段 */
export function saveProfile(profile: AgentProfile, options?: StorageOptions): void {
const { id, soul, user, workspace, memory, config } = profile;
const { id, soul, user, workspace, memory, heartbeat, config } = profile;
if (soul !== undefined) {
writeProfileFile(id, PROFILE_FILES.soul, soul, options);
@ -115,6 +116,9 @@ export function saveProfile(profile: AgentProfile, options?: StorageOptions): vo
if (memory !== undefined) {
writeProfileFile(id, PROFILE_FILES.memory, memory, options);
}
if (heartbeat !== undefined) {
writeProfileFile(id, PROFILE_FILES.heartbeat, heartbeat, options);
}
if (config !== undefined) {
writeProfileConfig(id, config, options);
}

View file

@ -72,6 +72,7 @@ Your profile directory contains these files (use \`edit\` or \`write\` to update
| \`user.md\` | About your human | As you learn about them |
| \`workspace.md\` | This file — your rules | When you discover better conventions |
| \`memory.md\` | Long-term knowledge | Regularly — capture what matters |
| \`heartbeat.md\` | Background check instructions | When heartbeat behavior should change |
## Every Session
@ -89,6 +90,7 @@ You wake up fresh each session. These files are your continuity:
- **Long-term:** \`MEMORY.md\` — your curated memories, lessons learned
- **Daily notes:** \`memory/YYYY-MM-DD.md\` — raw logs of what happened (optional)
- **Heartbeat:** \`heartbeat.md\` — periodic check loop instructions
Capture what matters. Decisions, context, things to remember.
@ -101,6 +103,7 @@ Capture what matters. Decisions, context, things to remember.
- \`memory.md\` — Your learnings: decisions made, lessons learned, important context
- \`workspace.md\` — Your rules: conventions, workflows, how you should operate
- \`soul.md\` — Your identity: only change if user wants to reshape who you are
- \`heartbeat.md\` — Periodic background checks and alert rules
**Rules:**
- **DO NOT** say "I'll remember that" without ACTUALLY calling \`edit\` or \`write\` on a file
@ -148,5 +151,11 @@ _(Persistent knowledge will be stored here. Update this as you learn.)_
## Lessons Learned
## Important Context
`,
heartbeat: `# heartbeat.md
# Keep this file empty (or with only comments) to skip heartbeat API calls.
# Add tasks below when you want the agent to check something periodically.
`,
} as const;

View file

@ -11,6 +11,7 @@ export const PROFILE_FILES = {
user: "user.md",
workspace: "workspace.md",
memory: "memory.md",
heartbeat: "heartbeat.md",
config: "config.json",
} as const;
@ -42,6 +43,17 @@ export interface ProfileConfig {
reasoningMode?: "off" | "on" | "stream" | undefined;
/** Exec approval configuration (security level, ask mode, allowlist) */
execApproval?: ExecApprovalConfig | undefined;
/** Heartbeat configuration */
heartbeat?: {
/** Global heartbeat enable switch */
enabled?: boolean | undefined;
/** Interval, e.g. "30m", "1h" */
every?: string | undefined;
/** Optional prompt override */
prompt?: string | undefined;
/** Max chars after HEARTBEAT_OK to still treat as ack */
ackMaxChars?: number | undefined;
} | undefined;
}
/** Agent Profile configuration */
@ -56,6 +68,8 @@ export interface AgentProfile {
workspace?: string | undefined;
/** Persistent memory - long-term knowledge base */
memory?: string | undefined;
/** Periodic heartbeat instructions */
heartbeat?: string | undefined;
/** Profile configuration (from config.json) */
config?: ProfileConfig | undefined;
}

View file

@ -595,6 +595,27 @@ export class Agent {
return this.profile?.getProfile()?.id;
}
/**
* Get profile directory path, if profile is enabled.
*/
getProfileDir(): string | undefined {
return this.profile?.getProfileDir();
}
/**
* Get heartbeat configuration from profile config.
*/
getHeartbeatConfig():
| {
enabled?: boolean | undefined;
every?: string | undefined;
prompt?: string | undefined;
ackMaxChars?: number | undefined;
}
| undefined {
return this.profile?.getHeartbeatConfig();
}
/**
* Get agent display name from profile config.
*/
@ -770,6 +791,7 @@ export class Agent {
user: profile.user,
workspace: profile.workspace,
memory: profile.memory,
heartbeat: profile.heartbeat,
config: profile.config,
},
profileDir: this.profile!.getProfileDir(),

View file

@ -10,6 +10,7 @@ import type {
SystemPromptReport,
} from "./types.js";
import {
buildHeartbeatSection,
buildConditionalToolSections,
buildExtraPromptSection,
buildIdentitySection,
@ -58,6 +59,7 @@ export function buildSystemPromptWithReport(options: SystemPromptOptions): {
{ name: "user", lines: buildUserSection(profile, mode) },
{ name: "workspace", lines: buildWorkspaceSection(profile, mode, profileDir) },
{ name: "memory", lines: buildMemoryFileSection(profile, mode) },
{ name: "heartbeat", lines: buildHeartbeatSection(profile, mode) },
{ name: "safety", lines: buildSafetySection(includeSafety) },
{ name: "tooling", lines: buildToolingSummary(tools, mode) },
{ name: "tool-call-style", lines: buildToolCallStyleSection(mode) },

View file

@ -6,6 +6,7 @@
import { SAFETY_CONSTITUTION } from "./constitution.js";
import { formatRuntimeLine } from "./runtime-info.js";
import { resolveHeartbeatPrompt } from "../../heartbeat/heartbeat-text.js";
import type {
ProfileContent,
RuntimeInfo,
@ -97,13 +98,14 @@ export function buildWorkspaceSection(
"## Profile",
"",
`Your profile directory: \`${profileDir}\``,
"Use this as the base path for profile files (soul.md, user.md, memory.md, memory/*.md).",
"Use this as the base path for profile files (soul.md, user.md, memory.md, heartbeat.md, memory/*.md).",
"",
"Profile files:",
"- `soul.md` — Your identity and values",
"- `user.md` — Information about your user",
"- `workspace.md` — Guidelines and conventions (below)",
"- `memory.md` — Persistent knowledge",
"- `heartbeat.md` — Background heartbeat loop instructions",
"",
);
}
@ -128,6 +130,26 @@ export function buildMemoryFileSection(
return [];
}
/**
* Heartbeat section full mode only.
* Keeps heartbeat protocol explicit in the agent instructions.
*/
export function buildHeartbeatSection(
profile: ProfileContent | undefined,
mode: SystemPromptMode,
): string[] {
if (mode !== "full") return [];
const prompt = resolveHeartbeatPrompt(profile?.config?.heartbeat?.prompt);
return [
"## Heartbeats",
`Heartbeat prompt: ${prompt}`,
'If you receive a heartbeat poll (a user message matching the heartbeat prompt above), and there is nothing that needs attention, reply exactly:',
"HEARTBEAT_OK",
'If something needs attention, do NOT include "HEARTBEAT_OK"; reply with the alert text instead.',
"",
];
}
/**
* Safety constitution always included.
*/

View file

@ -53,6 +53,7 @@ export interface ProfileContent {
user?: string | undefined;
workspace?: string | undefined;
memory?: string | undefined;
heartbeat?: string | undefined;
config?: ProfileConfig | undefined;
}

View file

@ -44,6 +44,10 @@ async function executeSystemEvent(job: CronJob): Promise<ExecutionResult> {
const hub = getHub();
const payload = job.payload as { kind: "system-event"; text: string };
const text = payload.text.trim();
if (!text) {
return { error: "system-event payload requires non-empty text" };
}
// Get the list of active agents
const agentIds = hub.listAgents();
@ -54,25 +58,29 @@ async function executeSystemEvent(job: CronJob): Promise<ExecutionResult> {
// For now, inject into the first (main) agent
// TODO: Support targeting specific agent by ID
const agentId = agentIds[0]!;
const agent = hub.getAgent(agentId);
if (!agent || agent.closed) {
return { error: `Agent ${agentId} not found or closed` };
const cronMessage = `[CRON] ${job.name}: ${text}`;
hub.enqueueSystemEvent(cronMessage, { agentId });
if (job.wakeMode === "now") {
const result = await hub.runHeartbeatOnce({ reason: `cron:${job.id}` });
if (result.status === "failed") {
return { error: result.reason };
}
if (result.status === "skipped") {
return {
summary: `Enqueued cron event for agent ${agentId.slice(0, 8)} (wake skipped: ${result.reason})`,
};
}
return {
summary: `Enqueued cron event and triggered immediate heartbeat for agent ${agentId.slice(0, 8)}`,
};
}
// Format the cron message with metadata
const cronMessage = `[CRON] ${job.name}: ${payload.text}`;
try {
// Write to agent (non-blocking, will be processed in queue)
agent.write(cronMessage);
// Wait for the agent to process the message
await agent.waitForIdle();
return { summary: `Injected message into agent ${agentId.slice(0, 8)}` };
} catch (err) {
return { error: err instanceof Error ? err.message : String(err) };
}
hub.requestHeartbeatNow({ reason: `cron:${job.id}` });
return {
summary: `Enqueued cron event for agent ${agentId.slice(0, 8)} (wakeMode: next-heartbeat)`,
};
}
/**

View file

@ -0,0 +1,50 @@
export type HeartbeatIndicatorType = "ok" | "alert" | "error";
export type HeartbeatEventPayload = {
ts: number;
status: "sent" | "ok-empty" | "ok-token" | "skipped" | "failed";
preview?: string;
durationMs?: number;
reason?: string;
indicatorType?: HeartbeatIndicatorType;
};
export function resolveIndicatorType(
status: HeartbeatEventPayload["status"],
): HeartbeatIndicatorType | undefined {
switch (status) {
case "ok-empty":
case "ok-token":
return "ok";
case "sent":
return "alert";
case "failed":
return "error";
case "skipped":
return undefined;
}
}
let lastHeartbeat: HeartbeatEventPayload | null = null;
const listeners = new Set<(evt: HeartbeatEventPayload) => void>();
export function emitHeartbeatEvent(evt: Omit<HeartbeatEventPayload, "ts">): void {
const enriched: HeartbeatEventPayload = { ts: Date.now(), ...evt };
lastHeartbeat = enriched;
for (const listener of listeners) {
try {
listener(enriched);
} catch {
// Ignore listener errors so heartbeat flow stays robust.
}
}
}
export function onHeartbeatEvent(listener: (evt: HeartbeatEventPayload) => void): () => void {
listeners.add(listener);
return () => listeners.delete(listener);
}
export function getLastHeartbeatEvent(): HeartbeatEventPayload | null {
return lastHeartbeat;
}

View file

@ -0,0 +1,31 @@
import { describe, expect, it } from "vitest";
import {
HEARTBEAT_TOKEN,
isHeartbeatContentEffectivelyEmpty,
stripHeartbeatToken,
} from "./heartbeat-text.js";
describe("heartbeat-text", () => {
it("treats comment-only heartbeat files as empty", () => {
expect(isHeartbeatContentEffectivelyEmpty("# title\n\n- [ ]\n")).toBe(true);
expect(isHeartbeatContentEffectivelyEmpty("\n# note\n")).toBe(true);
expect(isHeartbeatContentEffectivelyEmpty("check disk health")).toBe(false);
});
it("strips plain token responses", () => {
const result = stripHeartbeatToken(HEARTBEAT_TOKEN, { mode: "heartbeat" });
expect(result.shouldSkip).toBe(true);
expect(result.text).toBe("");
});
it("keeps substantial content around token in heartbeat mode", () => {
const longTail = "Potential issue detected: disk usage is 92% on /Users";
const result = stripHeartbeatToken(`${HEARTBEAT_TOKEN} ${longTail}`, {
mode: "heartbeat",
maxAckChars: 10,
});
expect(result.shouldSkip).toBe(false);
expect(result.text).toContain("disk usage");
});
});

View file

@ -0,0 +1,117 @@
export const HEARTBEAT_TOKEN = "HEARTBEAT_OK";
export const HEARTBEAT_PROMPT =
"Read heartbeat.md if it exists (workspace context). Follow it strictly. Do not infer or repeat old tasks from prior chats. If nothing needs attention, reply HEARTBEAT_OK.";
export const DEFAULT_HEARTBEAT_EVERY = "30m";
export const DEFAULT_HEARTBEAT_ACK_MAX_CHARS = 300;
export function isHeartbeatContentEffectivelyEmpty(
content: string | undefined | null,
): boolean {
if (content === undefined || content === null || typeof content !== "string") {
return false;
}
const lines = content.split("\n");
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed) continue;
if (/^#+(\s|$)/.test(trimmed)) continue;
if (/^[-*+]\s*(\[[\sXx]?\]\s*)?$/.test(trimmed)) continue;
return false;
}
return true;
}
export function resolveHeartbeatPrompt(raw?: string): string {
const trimmed = typeof raw === "string" ? raw.trim() : "";
return trimmed || HEARTBEAT_PROMPT;
}
export type StripHeartbeatMode = "heartbeat" | "message";
function stripTokenAtEdges(raw: string): { text: string; didStrip: boolean } {
let text = raw.trim();
if (!text) return { text: "", didStrip: false };
if (!text.includes(HEARTBEAT_TOKEN)) return { text, didStrip: false };
let didStrip = false;
let changed = true;
while (changed) {
changed = false;
const next = text.trim();
if (next.startsWith(HEARTBEAT_TOKEN)) {
text = next.slice(HEARTBEAT_TOKEN.length).trimStart();
didStrip = true;
changed = true;
continue;
}
if (next.endsWith(HEARTBEAT_TOKEN)) {
text = next.slice(0, Math.max(0, next.length - HEARTBEAT_TOKEN.length)).trimEnd();
didStrip = true;
changed = true;
}
}
return {
text: text.replace(/\s+/g, " ").trim(),
didStrip,
};
}
export function stripHeartbeatToken(
raw?: string,
opts: { mode?: StripHeartbeatMode; maxAckChars?: number } = {},
): { shouldSkip: boolean; text: string; didStrip: boolean } {
if (!raw) return { shouldSkip: true, text: "", didStrip: false };
const trimmed = raw.trim();
if (!trimmed) return { shouldSkip: true, text: "", didStrip: false };
const mode = opts.mode ?? "message";
const maxAckCharsRaw = opts.maxAckChars;
const maxAckChars = Math.max(
0,
typeof maxAckCharsRaw === "number" && Number.isFinite(maxAckCharsRaw)
? maxAckCharsRaw
: DEFAULT_HEARTBEAT_ACK_MAX_CHARS,
);
const stripMarkup = (text: string) =>
text
.replace(/<[^>]*>/g, " ")
.replace(/&nbsp;/gi, " ")
.replace(/^[*`~_]+/, "")
.replace(/[*`~_]+$/, "");
const normalized = stripMarkup(trimmed);
const hasToken =
trimmed.includes(HEARTBEAT_TOKEN) || normalized.includes(HEARTBEAT_TOKEN);
if (!hasToken) {
return { shouldSkip: false, text: trimmed, didStrip: false };
}
const strippedOriginal = stripTokenAtEdges(trimmed);
const strippedNormalized = stripTokenAtEdges(normalized);
const picked =
strippedOriginal.didStrip && strippedOriginal.text
? strippedOriginal
: strippedNormalized;
if (!picked.didStrip) {
return { shouldSkip: false, text: trimmed, didStrip: false };
}
if (!picked.text) {
return { shouldSkip: true, text: "", didStrip: true };
}
const rest = picked.text.trim();
if (mode === "heartbeat" && rest.length <= maxAckChars) {
return { shouldSkip: true, text: "", didStrip: true };
}
return { shouldSkip: false, text: rest, didStrip: true };
}

View file

@ -0,0 +1,47 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import {
hasPendingHeartbeatWake,
requestHeartbeatNow,
setHeartbeatWakeHandler,
} from "./heartbeat-wake.js";
describe("heartbeat-wake", () => {
afterEach(() => {
setHeartbeatWakeHandler(null);
vi.useRealTimers();
});
it("coalesces multiple wake requests into one run", async () => {
vi.useFakeTimers();
const handler = vi.fn(async () => ({ status: "ran" as const, durationMs: 1 }));
setHeartbeatWakeHandler(handler);
requestHeartbeatNow({ reason: "a" });
requestHeartbeatNow({ reason: "b" });
requestHeartbeatNow({ reason: "c" });
expect(hasPendingHeartbeatWake()).toBe(true);
await vi.advanceTimersByTimeAsync(300);
expect(handler).toHaveBeenCalledTimes(1);
});
it("retries when requests are in flight", async () => {
vi.useFakeTimers();
const handler = vi
.fn()
.mockResolvedValueOnce({ status: "skipped" as const, reason: "requests-in-flight" })
.mockResolvedValueOnce({ status: "ran" as const, durationMs: 3 });
setHeartbeatWakeHandler(handler);
requestHeartbeatNow({ reason: "retry-case" });
await vi.advanceTimersByTimeAsync(300);
expect(handler).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(1100);
expect(handler).toHaveBeenCalledTimes(2);
});
});

View file

@ -0,0 +1,73 @@
export type HeartbeatRunResult =
| { status: "ran"; durationMs: number }
| { status: "skipped"; reason: string }
| { status: "failed"; reason: string };
export type HeartbeatWakeHandler = (opts: {
reason?: string;
}) => Promise<HeartbeatRunResult>;
let handler: HeartbeatWakeHandler | null = null;
let pendingReason: string | null = null;
let scheduled = false;
let running = false;
let timer: NodeJS.Timeout | null = null;
const DEFAULT_COALESCE_MS = 250;
const DEFAULT_RETRY_MS = 1000;
function schedule(coalesceMs: number): void {
if (timer) return;
timer = setTimeout(async () => {
timer = null;
scheduled = false;
const active = handler;
if (!active) return;
if (running) {
scheduled = true;
schedule(coalesceMs);
return;
}
const reason = pendingReason;
pendingReason = null;
running = true;
try {
const result = reason ? await active({ reason }) : await active({});
if (result.status === "skipped" && result.reason === "requests-in-flight") {
pendingReason = reason ?? "retry";
schedule(DEFAULT_RETRY_MS);
}
} catch {
pendingReason = reason ?? "retry";
schedule(DEFAULT_RETRY_MS);
} finally {
running = false;
if (pendingReason || scheduled) {
schedule(coalesceMs);
}
}
}, coalesceMs);
timer.unref?.();
}
export function setHeartbeatWakeHandler(next: HeartbeatWakeHandler | null): void {
handler = next;
if (handler && pendingReason) {
schedule(DEFAULT_COALESCE_MS);
}
}
export function requestHeartbeatNow(opts?: { reason?: string; coalesceMs?: number }): void {
pendingReason = opts?.reason ?? pendingReason ?? "requested";
schedule(opts?.coalesceMs ?? DEFAULT_COALESCE_MS);
}
export function hasHeartbeatWakeHandler(): boolean {
return handler !== null;
}
export function hasPendingHeartbeatWake(): boolean {
return pendingReason !== null || Boolean(timer) || scheduled;
}

45
src/heartbeat/index.ts Normal file
View file

@ -0,0 +1,45 @@
export {
emitHeartbeatEvent,
getLastHeartbeatEvent,
onHeartbeatEvent,
resolveIndicatorType,
type HeartbeatEventPayload,
type HeartbeatIndicatorType,
} from "./heartbeat-events.js";
export {
hasHeartbeatWakeHandler,
hasPendingHeartbeatWake,
requestHeartbeatNow,
setHeartbeatWakeHandler,
type HeartbeatRunResult,
type HeartbeatWakeHandler,
} from "./heartbeat-wake.js";
export {
DEFAULT_HEARTBEAT_ACK_MAX_CHARS,
DEFAULT_HEARTBEAT_EVERY,
HEARTBEAT_PROMPT,
HEARTBEAT_TOKEN,
isHeartbeatContentEffectivelyEmpty,
resolveHeartbeatPrompt,
stripHeartbeatToken,
type StripHeartbeatMode,
} from "./heartbeat-text.js";
export {
drainSystemEvents,
enqueueSystemEvent,
hasSystemEvents,
peekSystemEvents,
resetSystemEventsForTest,
type SystemEvent,
} from "./system-events.js";
export {
runHeartbeatOnce,
setHeartbeatsEnabled,
startHeartbeatRunner,
type HeartbeatConfig,
type HeartbeatRunner,
} from "./runner.js";

View file

@ -0,0 +1,74 @@
import os from "node:os";
import path from "node:path";
import { mkdtemp, rm, writeFile } from "node:fs/promises";
import { afterEach, describe, expect, it } from "vitest";
import { runHeartbeatOnce, setHeartbeatsEnabled } from "./runner.js";
type StubAgent = {
closed: boolean;
sessionId: string;
ensureInitialized: () => Promise<void>;
getMessages: () => Array<any>;
write: (content: string) => void;
waitForIdle: () => Promise<void>;
getHeartbeatConfig: () => { prompt?: string; ackMaxChars?: number; enabled?: boolean };
getPendingWrites: () => number;
getProfileDir: () => string | undefined;
};
function createStubAgent(opts?: {
profileDir?: string;
replyText?: string;
heartbeatEnabled?: boolean;
}): StubAgent {
const messages: Array<any> = [];
const replyText = opts?.replyText ?? "HEARTBEAT_OK";
return {
closed: false,
sessionId: "test-session",
ensureInitialized: async () => {},
getMessages: () => messages,
write: (content: string) => {
messages.push({ role: "user", content });
messages.push({ role: "assistant", content: [{ type: "text", text: replyText }] });
},
waitForIdle: async () => {},
getHeartbeatConfig: () =>
typeof opts?.heartbeatEnabled === "boolean"
? { enabled: opts.heartbeatEnabled }
: {},
getPendingWrites: () => 0,
getProfileDir: () => opts?.profileDir,
};
}
describe("heartbeat runner", () => {
afterEach(() => {
setHeartbeatsEnabled(true);
});
it("skips when no agent is available", async () => {
const result = await runHeartbeatOnce({ agent: null });
expect(result).toEqual({ status: "skipped", reason: "disabled" });
});
it("skips when heartbeat file is effectively empty", async () => {
const dir = await mkdtemp(path.join(os.tmpdir(), "heartbeat-test-"));
try {
await writeFile(path.join(dir, "heartbeat.md"), "# keep empty\n", "utf-8");
const agent = createStubAgent({ profileDir: dir });
const result = await runHeartbeatOnce({ agent: agent as any });
expect(result).toEqual({ status: "skipped", reason: "empty-heartbeat-file" });
} finally {
await rm(dir, { recursive: true, force: true });
}
});
it("runs and returns ran for heartbeat acknowledgements", async () => {
const agent = createStubAgent({ replyText: "HEARTBEAT_OK" });
const result = await runHeartbeatOnce({ agent: agent as any, reason: "manual" });
expect(result.status).toBe("ran");
});
});

321
src/heartbeat/runner.ts Normal file
View file

@ -0,0 +1,321 @@
import fs from "node:fs/promises";
import path from "node:path";
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { AsyncAgent } from "../agent/async-agent.js";
import {
DEFAULT_HEARTBEAT_ACK_MAX_CHARS,
DEFAULT_HEARTBEAT_EVERY,
isHeartbeatContentEffectivelyEmpty,
resolveHeartbeatPrompt,
stripHeartbeatToken,
} from "./heartbeat-text.js";
import {
emitHeartbeatEvent,
resolveIndicatorType,
type HeartbeatEventPayload,
} from "./heartbeat-events.js";
import {
setHeartbeatWakeHandler,
requestHeartbeatNow,
type HeartbeatRunResult,
type HeartbeatWakeHandler,
} from "./heartbeat-wake.js";
import { drainSystemEvents } from "./system-events.js";
export type HeartbeatConfig = {
enabled?: boolean;
every?: string;
prompt?: string;
ackMaxChars?: number;
};
export type HeartbeatRunner = {
stop: () => void;
updateConfig: () => void;
};
type RunnerDeps = {
getAgent: () => AsyncAgent | null;
nowMs?: () => number;
logger?: Pick<Console, "info" | "warn" | "error">;
};
const HEARTBEAT_FILENAME = "heartbeat.md";
const DEFAULT_INTERVAL_MS = 30 * 60 * 1000;
let heartbeatsEnabled = true;
function resolveDurationMs(raw: string | undefined): number | null {
if (!raw) return DEFAULT_INTERVAL_MS;
const trimmed = raw.trim();
if (!trimmed) return DEFAULT_INTERVAL_MS;
const match = trimmed.match(/^(\d+(?:\.\d+)?)\s*([smhd])$/i);
if (match) {
const num = Number.parseFloat(match[1]!);
const unit = match[2]!.toLowerCase();
const unitMs: Record<string, number> = {
s: 1000,
m: 60 * 1000,
h: 60 * 60 * 1000,
d: 24 * 60 * 60 * 1000,
};
const ms = unitMs[unit];
if (!Number.isFinite(num) || !ms) return null;
const value = Math.floor(num * ms);
return value > 0 ? value : null;
}
if (/^\d+$/.test(trimmed)) {
const value = Number.parseInt(trimmed, 10);
return value > 0 ? value : null;
}
return null;
}
function extractMessageText(message: AgentMessage | undefined): string {
if (!message) return "";
const raw = (message as { content?: unknown }).content;
if (typeof raw === "string") return raw;
if (!Array.isArray(raw)) return "";
const parts: string[] = [];
for (const block of raw) {
if (!block || typeof block !== "object") continue;
const text = (block as { text?: unknown }).text;
if (typeof text === "string" && text.trim()) {
parts.push(text);
}
}
return parts.join("\n").trim();
}
function getHeartbeatConfig(agent: AsyncAgent | null): HeartbeatConfig {
const cfg = agent?.getHeartbeatConfig();
if (!cfg) return {};
const out: HeartbeatConfig = {};
if (typeof cfg.enabled === "boolean") out.enabled = cfg.enabled;
if (typeof cfg.every === "string") out.every = cfg.every;
if (typeof cfg.prompt === "string") out.prompt = cfg.prompt;
if (typeof cfg.ackMaxChars === "number" && Number.isFinite(cfg.ackMaxChars)) {
out.ackMaxChars = cfg.ackMaxChars;
}
return out;
}
function resolveHeartbeatIntervalMs(agent: AsyncAgent | null): number {
const cfg = getHeartbeatConfig(agent);
return resolveDurationMs(cfg.every ?? DEFAULT_HEARTBEAT_EVERY) ?? DEFAULT_INTERVAL_MS;
}
function resolveSessionKey(agent: AsyncAgent): string {
return agent.sessionId;
}
async function isHeartbeatFileEmpty(agent: AsyncAgent): Promise<boolean> {
const profileDir = agent.getProfileDir();
if (!profileDir) return false;
const heartbeatPath = path.join(profileDir, HEARTBEAT_FILENAME);
try {
const content = await fs.readFile(heartbeatPath, "utf-8");
return isHeartbeatContentEffectivelyEmpty(content);
} catch {
return false;
}
}
export function setHeartbeatsEnabled(enabled: boolean): void {
heartbeatsEnabled = enabled;
}
export async function runHeartbeatOnce(opts: {
agent: AsyncAgent | null;
reason?: string;
nowMs?: () => number;
}): Promise<HeartbeatRunResult> {
const startedAt = opts.nowMs?.() ?? Date.now();
const agent = opts.agent;
if (!heartbeatsEnabled) {
return { status: "skipped", reason: "disabled" };
}
if (!agent || agent.closed) {
return { status: "skipped", reason: "disabled" };
}
const cfg = getHeartbeatConfig(agent);
if (cfg.enabled === false) {
return { status: "skipped", reason: "disabled" };
}
if (agent.getPendingWrites() > 0) {
return { status: "skipped", reason: "requests-in-flight" };
}
try {
const isExecEvent = opts.reason === "exec-event";
if (!isExecEvent && (await isHeartbeatFileEmpty(agent))) {
emitHeartbeatEvent({
status: "skipped",
reason: "empty-heartbeat-file",
durationMs: Date.now() - startedAt,
});
return { status: "skipped", reason: "empty-heartbeat-file" };
}
await agent.ensureInitialized();
const beforeMessages = agent.getMessages();
const sessionKey = resolveSessionKey(agent);
const pendingEvents = drainSystemEvents(sessionKey);
const basePrompt = resolveHeartbeatPrompt(cfg.prompt);
const prompt = pendingEvents.length
? `${basePrompt}\n\nSystem events:\n${pendingEvents.map((line) => `- ${line}`).join("\n")}`
: basePrompt;
agent.write(prompt);
await agent.waitForIdle();
const afterMessages = agent.getMessages();
const appended = afterMessages.slice(beforeMessages.length);
const assistant = [...appended]
.reverse()
.find((msg) => msg.role === "assistant");
const text = extractMessageText(assistant);
if (!text.trim()) {
const okEmptyEvent: Omit<HeartbeatEventPayload, "ts"> = {
status: "ok-empty",
durationMs: Date.now() - startedAt,
};
if (opts.reason) okEmptyEvent.reason = opts.reason;
const indicator = resolveIndicatorType("ok-empty");
if (indicator) okEmptyEvent.indicatorType = indicator;
emitHeartbeatEvent(okEmptyEvent);
return { status: "ran", durationMs: Date.now() - startedAt };
}
const stripped = stripHeartbeatToken(text, {
mode: "heartbeat",
maxAckChars: cfg.ackMaxChars ?? DEFAULT_HEARTBEAT_ACK_MAX_CHARS,
});
if (stripped.shouldSkip) {
const okTokenEvent: Omit<HeartbeatEventPayload, "ts"> = {
status: "ok-token",
durationMs: Date.now() - startedAt,
};
if (opts.reason) okTokenEvent.reason = opts.reason;
const indicator = resolveIndicatorType("ok-token");
if (indicator) okTokenEvent.indicatorType = indicator;
emitHeartbeatEvent(okTokenEvent);
return { status: "ran", durationMs: Date.now() - startedAt };
}
const sentEvent: Omit<HeartbeatEventPayload, "ts"> = {
status: "sent",
preview: stripped.text.slice(0, 200),
durationMs: Date.now() - startedAt,
};
if (opts.reason) sentEvent.reason = opts.reason;
const sentIndicator = resolveIndicatorType("sent");
if (sentIndicator) sentEvent.indicatorType = sentIndicator;
emitHeartbeatEvent(sentEvent);
return { status: "ran", durationMs: Date.now() - startedAt };
} catch (error) {
const reason = error instanceof Error ? error.message : String(error);
const failedEvent: Omit<HeartbeatEventPayload, "ts"> = {
status: "failed",
reason,
durationMs: Date.now() - startedAt,
};
const failedIndicator = resolveIndicatorType("failed");
if (failedIndicator) failedEvent.indicatorType = failedIndicator;
emitHeartbeatEvent(failedEvent);
return { status: "failed", reason };
}
}
export function startHeartbeatRunner(deps: RunnerDeps): HeartbeatRunner {
const logger = deps.logger ?? console;
const nowMs = deps.nowMs ?? (() => Date.now());
let timer: NodeJS.Timeout | null = null;
let stopped = false;
let intervalMs = resolveHeartbeatIntervalMs(deps.getAgent());
let nextDueAtMs = nowMs() + intervalMs;
const clearTimer = () => {
if (timer) {
clearTimeout(timer);
timer = null;
}
};
const scheduleNext = () => {
if (stopped) return;
clearTimer();
const delay = Math.max(0, nextDueAtMs - nowMs());
timer = setTimeout(() => {
requestHeartbeatNow({ reason: "interval", coalesceMs: 0 });
}, delay);
timer.unref?.();
};
const run: HeartbeatWakeHandler = async (params) => {
const reason = params.reason;
const agent = deps.getAgent();
if (reason === "interval") {
const now = nowMs();
if (now < nextDueAtMs) {
return { status: "skipped", reason: "not-due" };
}
}
const result = await runHeartbeatOnce(
reason
? {
agent,
reason,
nowMs,
}
: {
agent,
nowMs,
},
);
const activeAgent = deps.getAgent();
intervalMs = resolveHeartbeatIntervalMs(activeAgent);
nextDueAtMs = nowMs() + intervalMs;
scheduleNext();
return result;
};
setHeartbeatWakeHandler(run);
scheduleNext();
logger.info?.("[Heartbeat] runner started");
return {
stop: () => {
if (stopped) return;
stopped = true;
clearTimer();
setHeartbeatWakeHandler(null);
logger.info?.("[Heartbeat] runner stopped");
},
updateConfig: () => {
const agent = deps.getAgent();
intervalMs = resolveHeartbeatIntervalMs(agent);
nextDueAtMs = nowMs() + intervalMs;
scheduleNext();
},
};
}
export type { HeartbeatEventPayload };

View file

@ -0,0 +1,51 @@
export type SystemEvent = { text: string; ts: number };
const MAX_EVENTS = 20;
const queues = new Map<string, SystemEvent[]>();
function normalizeSessionKey(key: string | undefined): string {
const trimmed = typeof key === "string" ? key.trim() : "";
if (!trimmed) {
throw new Error("system events require a sessionKey");
}
return trimmed;
}
export function enqueueSystemEvent(text: string, opts: { sessionKey: string }): void {
const sessionKey = normalizeSessionKey(opts.sessionKey);
const cleaned = text.trim();
if (!cleaned) return;
const list = queues.get(sessionKey) ?? [];
const previous = list[list.length - 1];
if (previous?.text === cleaned) {
return;
}
list.push({ text: cleaned, ts: Date.now() });
if (list.length > MAX_EVENTS) {
list.splice(0, list.length - MAX_EVENTS);
}
queues.set(sessionKey, list);
}
export function drainSystemEvents(sessionKey: string): string[] {
const key = normalizeSessionKey(sessionKey);
const list = queues.get(key) ?? [];
queues.delete(key);
return list.map((entry) => entry.text);
}
export function peekSystemEvents(sessionKey: string): string[] {
const key = normalizeSessionKey(sessionKey);
return (queues.get(key) ?? []).map((entry) => entry.text);
}
export function hasSystemEvents(sessionKey: string): boolean {
const key = normalizeSessionKey(sessionKey);
return (queues.get(key)?.length ?? 0) > 0;
}
export function resetSystemEventsForTest(): void {
queues.clear();
}

View file

@ -0,0 +1,66 @@
import { describe, expect, it } from "vitest";
import {
extractAssistantEventText,
isHeartbeatAckEvent,
} from "./heartbeat-filter.js";
describe("heartbeat-filter", () => {
it("extracts text from string content", () => {
const event = {
message: {
content: " HEARTBEAT_OK ",
},
};
expect(extractAssistantEventText(event)).toBe("HEARTBEAT_OK");
});
it("extracts text from content blocks", () => {
const event = {
message: {
content: [
{ type: "text", text: "line 1" },
{ type: "thinking", thinking: "hidden" },
{ type: "text", text: "line 2" },
],
},
};
expect(extractAssistantEventText(event)).toBe("line 1 line 2");
});
it("treats pure heartbeat token as ack", () => {
const event = {
message: {
content: [{ type: "text", text: "HEARTBEAT_OK" }],
},
};
expect(isHeartbeatAckEvent(event)).toBe(true);
});
it("treats marked-up heartbeat token as ack", () => {
const event = {
message: {
content: [{ type: "text", text: "**HEARTBEAT_OK**" }],
},
};
expect(isHeartbeatAckEvent(event)).toBe(true);
});
it("does not suppress real alert text", () => {
const event = {
message: {
content: [{ type: "text", text: "Reminder: go downstairs now." }],
},
};
expect(isHeartbeatAckEvent(event)).toBe(false);
});
it("does not suppress token plus extra content", () => {
const event = {
message: {
content: [{ type: "text", text: "HEARTBEAT_OK Reminder: check inbox." }],
},
};
expect(isHeartbeatAckEvent(event)).toBe(false);
});
});

View file

@ -0,0 +1,43 @@
import { stripHeartbeatToken } from "../heartbeat/index.js";
function collapseWhitespace(value: string): string {
return value.replace(/\s+/g, " ").trim();
}
/**
* Extract assistant text from an agent stream event.
* Supports both string and rich content-array message shapes.
*/
export function extractAssistantEventText(event: unknown): string {
if (!event || typeof event !== "object") return "";
const message = (event as { message?: unknown }).message;
if (!message || typeof message !== "object") return "";
const content = (message as { content?: unknown }).content;
if (typeof content === "string") {
return collapseWhitespace(content);
}
if (!Array.isArray(content)) return "";
const parts: string[] = [];
for (const block of content) {
if (!block || typeof block !== "object") continue;
const text = (block as { text?: unknown }).text;
if (typeof text === "string" && text.trim()) {
parts.push(text);
}
}
return collapseWhitespace(parts.join("\n"));
}
/**
* True only for pure heartbeat ACK payloads (e.g. "HEARTBEAT_OK").
* Messages that include any extra text are not suppressed.
*/
export function isHeartbeatAckEvent(event: unknown): boolean {
const text = extractAssistantEventText(event);
if (!text) return false;
const stripped = stripHeartbeatToken(text, { mode: "message" });
return stripped.shouldSkip && stripped.didStrip;
}

View file

@ -22,6 +22,9 @@ import { createListAgentsHandler } from "./rpc/handlers/list-agents.js";
import { createCreateAgentHandler } from "./rpc/handlers/create-agent.js";
import { createDeleteAgentHandler } from "./rpc/handlers/delete-agent.js";
import { createUpdateGatewayHandler } from "./rpc/handlers/update-gateway.js";
import { createGetLastHeartbeatHandler } from "./rpc/handlers/get-last-heartbeat.js";
import { createSetHeartbeatsHandler } from "./rpc/handlers/set-heartbeats.js";
import { createWakeHeartbeatHandler } from "./rpc/handlers/wake-heartbeat.js";
import { DeviceStore, type DeviceMeta } from "./device-store.js";
import { createVerifyHandler } from "./rpc/handlers/verify.js";
import { ExecApprovalManager } from "./exec-approval-manager.js";
@ -31,15 +34,33 @@ import { addAllowlistEntry, recordAllowlistUse, matchAllowlist } from "../agent/
import type { ExecApprovalCallback, ExecApprovalConfig, ApprovalResult, ExecApprovalRequest } from "../agent/tools/exec-approval-types.js";
import { readProfileConfig, writeProfileConfig } from "../agent/profile/storage.js";
import { getCronService, shutdownCronService, executeCronJob } from "../cron/index.js";
import {
getLastHeartbeatEvent,
onHeartbeatEvent,
requestHeartbeatNow,
runHeartbeatOnce,
setHeartbeatsEnabled,
startHeartbeatRunner,
type HeartbeatEventPayload,
type HeartbeatRunResult,
type HeartbeatRunner,
} from "../heartbeat/index.js";
import { enqueueSystemEvent } from "../heartbeat/system-events.js";
import { isHeartbeatAckEvent } from "./heartbeat-filter.js";
export class Hub {
private readonly agents = new Map<string, AsyncAgent>();
private readonly agentSenders = new Map<string, string>();
private readonly agentStreamIds = new Map<string, string>();
private readonly agentStreamCounters = new Map<string, number>();
private readonly pendingAssistantStarts = new Map<string, { agentId: string; event: unknown }>();
private readonly suppressedStreamAgents = new Set<string>();
private readonly localApprovalHandlers = new Map<string, (payload: ExecApprovalRequest) => void>();
private readonly rpc: RpcDispatcher;
private readonly approvalManager: ExecApprovalManager;
private readonly heartbeatListeners = new Set<(event: HeartbeatEventPayload) => void>();
private heartbeatRunner: HeartbeatRunner | null = null;
private heartbeatUnsubscribe: (() => void) | null = null;
private client: GatewayClient;
readonly deviceStore: DeviceStore;
private _onConfirmDevice: ((deviceId: string, agentId: string, meta?: DeviceMeta) => Promise<boolean>) | null = null;
@ -77,6 +98,9 @@ export class Hub {
this.rpc.register("createAgent", createCreateAgentHandler(this));
this.rpc.register("deleteAgent", createDeleteAgentHandler(this));
this.rpc.register("updateGateway", createUpdateGatewayHandler(this));
this.rpc.register("last-heartbeat", createGetLastHeartbeatHandler(this));
this.rpc.register("set-heartbeats", createSetHeartbeatsHandler(this));
this.rpc.register("wake-heartbeat", createWakeHeartbeatHandler(this));
// Initialize exec approval manager
this.approvalManager = new ExecApprovalManager((agentId, payload) => {
@ -103,6 +127,7 @@ export class Hub {
// Initialize and start cron service
this.initCronService();
this.initHeartbeatService();
this.client = this.createClient(this.url);
this.client.connect();
@ -119,6 +144,32 @@ export class Hub {
console.log("[Hub] Cron service initialized");
}
/** Initialize heartbeat runner + event fanout. */
private initHeartbeatService(): void {
this.heartbeatRunner = startHeartbeatRunner({
getAgent: () => this.getDefaultAgent(),
logger: console,
});
this.heartbeatUnsubscribe = onHeartbeatEvent((event) => {
for (const listener of this.heartbeatListeners) {
try {
listener(event);
} catch {
// Keep fanout resilient against listener errors.
}
}
});
console.log("[Hub] Heartbeat service initialized");
}
private getDefaultAgent(): AsyncAgent | null {
const first = this.listAgents()[0];
if (!first) return null;
return this.getAgent(first) ?? null;
}
/** Restore agents from persistent storage */
private restoreAgents(): void {
const records = loadAgentRecords();
@ -279,6 +330,7 @@ export class Hub {
// Internally consume agent output (AgentEvent stream + error Messages)
void this.consumeAgent(agent);
this.heartbeatRunner?.updateConfig();
console.log(`Agent created: ${agent.sessionId}`);
return agent;
@ -313,6 +365,14 @@ export class Hub {
this.agentStreamIds.delete(agentId);
}
private clearPendingAssistantStarts(agentId: string): void {
for (const [streamId, pending] of this.pendingAssistantStarts) {
if (pending.agentId === agentId) {
this.pendingAssistantStarts.delete(streamId);
}
}
}
/** Internally read agent output and send via Gateway */
private async consumeAgent(agent: AsyncAgent): Promise<void> {
for await (const item of agent.read()) {
@ -327,6 +387,20 @@ export class Hub {
content: item.content,
});
} else {
const suppressForAgent = this.suppressedStreamAgents.has(agent.sessionId);
// Suppress all user-visible stream events during silent heartbeat runs.
if (suppressForAgent) {
if (item.type === "message_start") {
this.beginStream(agent.sessionId, item);
} else if (item.type === "message_end") {
const streamId = this.getActiveStreamId(agent.sessionId, item);
this.pendingAssistantStarts.delete(streamId);
this.endStream(agent.sessionId);
}
continue;
}
// Compaction events: forward with synthetic streamId (no stream tracking)
const isCompactionEvent =
item.type === "compaction_start" || item.type === "compaction_end";
@ -348,18 +422,55 @@ export class Hub {
|| item.type === "tool_execution_end";
if (!shouldForward) continue;
if (item.type === "message_start") {
this.beginStream(agent.sessionId, item);
const isAssistantMessageEvent =
item.type === "message_start" || item.type === "message_update" || item.type === "message_end";
// Delay assistant message_start forwarding until we see content.
// This lets us suppress pure HEARTBEAT_OK acknowledgements end-to-end.
if (isAssistantMessageEvent && isAssistantMessage) {
if (item.type === "message_start") {
const streamId = this.beginStream(agent.sessionId, item);
this.pendingAssistantStarts.set(streamId, { agentId: agent.sessionId, event: item });
continue;
}
const streamId = this.getActiveStreamId(agent.sessionId, item);
const isHeartbeatAck = isHeartbeatAckEvent(item);
if (isHeartbeatAck) {
if (item.type === "message_end") {
this.pendingAssistantStarts.delete(streamId);
this.endStream(agent.sessionId);
}
continue;
}
const pendingStart = this.pendingAssistantStarts.get(streamId);
if (pendingStart) {
this.client.send(targetDeviceId, StreamAction, {
streamId,
agentId: agent.sessionId,
event: pendingStart.event,
});
this.pendingAssistantStarts.delete(streamId);
}
this.client.send(targetDeviceId, StreamAction, {
streamId,
agentId: agent.sessionId,
event: item,
});
if (item.type === "message_end") {
this.endStream(agent.sessionId);
}
continue;
}
const streamId = this.getActiveStreamId(agent.sessionId, item);
this.client.send(targetDeviceId, StreamAction, {
streamId,
agentId: agent.sessionId,
event: item,
});
if (item.type === "message_end") {
this.endStream(agent.sessionId);
}
}
}
}
@ -507,6 +618,63 @@ export class Hub {
.map(([id]) => id);
}
/** Subscribe heartbeat state updates. Returns unsubscribe callback. */
onHeartbeatEvent(callback: (event: HeartbeatEventPayload) => void): () => void {
this.heartbeatListeners.add(callback);
return () => {
this.heartbeatListeners.delete(callback);
};
}
/** Get latest heartbeat event payload. */
getLastHeartbeat(): HeartbeatEventPayload | null {
return getLastHeartbeatEvent();
}
/** Enable/disable heartbeat runner globally. */
setHeartbeatsEnabled(enabled: boolean): void {
setHeartbeatsEnabled(enabled);
this.heartbeatRunner?.updateConfig();
}
/** Enqueue a heartbeat wake request. */
requestHeartbeatNow(opts?: { reason?: string }): void {
requestHeartbeatNow(opts);
}
/** Run heartbeat immediately using the current default agent. */
async runHeartbeatOnce(opts?: { reason?: string }): Promise<HeartbeatRunResult> {
const agent = this.getDefaultAgent();
const reason = opts?.reason;
const shouldSuppressStreams = reason === "manual";
if (shouldSuppressStreams && agent) {
this.suppressedStreamAgents.add(agent.sessionId);
}
try {
if (reason) {
return runHeartbeatOnce({
agent,
reason,
});
}
return runHeartbeatOnce({
agent,
});
} finally {
if (shouldSuppressStreams && agent) {
this.suppressedStreamAgents.delete(agent.sessionId);
}
}
}
/** Enqueue a system event for a specific agent or the default agent. */
enqueueSystemEvent(text: string, opts?: { agentId?: string }): void {
const agentId = opts?.agentId ?? this.listAgents()[0];
if (!agentId) return;
enqueueSystemEvent(text, { sessionKey: agentId });
}
closeAgent(id: string): boolean {
const agent = this.agents.get(id);
if (!agent) return false;
@ -516,14 +684,22 @@ export class Hub {
this.agentSenders.delete(id);
this.agentStreamIds.delete(id);
this.agentStreamCounters.delete(id);
this.clearPendingAssistantStarts(id);
this.suppressedStreamAgents.delete(id);
this.localApprovalHandlers.delete(id);
removeAgentRecord(id);
this.heartbeatRunner?.updateConfig();
return true;
}
shutdown(): void {
// Stop cron service
shutdownCronService();
this.heartbeatRunner?.stop();
this.heartbeatRunner = null;
this.heartbeatUnsubscribe?.();
this.heartbeatUnsubscribe = null;
this.heartbeatListeners.clear();
// Finalize subagent registry before closing agents
shutdownSubagentRegistry();
@ -534,6 +710,8 @@ export class Hub {
this.agentSenders.delete(id);
this.agentStreamIds.delete(id);
this.agentStreamCounters.delete(id);
this.clearPendingAssistantStarts(id);
this.suppressedStreamAgents.delete(id);
this.localApprovalHandlers.delete(id);
}
this.client.disconnect();

View file

@ -0,0 +1,9 @@
import type { RpcHandler } from "../dispatcher.js";
interface HubLike {
getLastHeartbeat(): unknown;
}
export function createGetLastHeartbeatHandler(hub: HubLike): RpcHandler {
return () => hub.getLastHeartbeat();
}

View file

@ -0,0 +1,18 @@
import type { RpcHandler } from "../dispatcher.js";
import { RpcError } from "../dispatcher.js";
interface HubLike {
setHeartbeatsEnabled(enabled: boolean): void;
}
export function createSetHeartbeatsHandler(hub: HubLike): RpcHandler {
return (params) => {
const enabled = (params as { enabled?: unknown } | undefined)?.enabled;
if (typeof enabled !== "boolean") {
throw new RpcError("INVALID_REQUEST", "enabled (boolean) is required");
}
hub.setHeartbeatsEnabled(enabled);
return { ok: true, enabled };
};
}

View file

@ -0,0 +1,14 @@
import type { RpcHandler } from "../dispatcher.js";
interface HubLike {
requestHeartbeatNow(opts?: { reason?: string }): void;
}
export function createWakeHeartbeatHandler(hub: HubLike): RpcHandler {
return (params) => {
const reasonRaw = (params as { reason?: unknown } | undefined)?.reason;
const reason = typeof reasonRaw === "string" ? reasonRaw.trim() : "";
hub.requestHeartbeatNow({ reason: reason || "manual" });
return { ok: true };
};
}

View file

@ -5,3 +5,6 @@ export { createListAgentsHandler } from "./handlers/list-agents.js";
export { createCreateAgentHandler } from "./handlers/create-agent.js";
export { createDeleteAgentHandler } from "./handlers/delete-agent.js";
export { createUpdateGatewayHandler } from "./handlers/update-gateway.js";
export { createGetLastHeartbeatHandler } from "./handlers/get-last-heartbeat.js";
export { createSetHeartbeatsHandler } from "./handlers/set-heartbeats.js";
export { createWakeHeartbeatHandler } from "./handlers/wake-heartbeat.js";

View file

@ -2,3 +2,4 @@ export * from "./agent/index.js";
export * from "./gateway/index.js";
export * from "./client/index.js";
export * from "./shared/index.js";
export * from "./heartbeat/index.js";