diff --git a/apps/desktop/electron/electron-env.d.ts b/apps/desktop/electron/electron-env.d.ts index a626f0e1..4baf763e 100644 --- a/apps/desktop/electron/electron-env.d.ts +++ b/apps/desktop/electron/electron-env.d.ts @@ -190,12 +190,17 @@ interface ElectronAPI { saveApiKey: (providerId: string, apiKey: string) => Promise<{ ok: boolean; error?: string }> importOAuth: (providerId: string) => Promise<{ ok: boolean; expiresAt?: number; error?: string }> } - cron: { - list: () => Promise - toggle: (jobId: string) => Promise<{ ok: boolean }> - remove: (jobId: string) => Promise<{ ok: boolean }> - } - localChat: { + cron: { + list: () => Promise + toggle: (jobId: string) => Promise<{ ok: boolean }> + remove: (jobId: string) => Promise<{ ok: boolean }> + } + heartbeat: { + last: () => Promise + setEnabled: (enabled: boolean) => Promise<{ ok: boolean; enabled?: boolean; error?: string }> + wake: (reason?: string) => Promise<{ ok: boolean; result?: unknown; error?: string }> + } + localChat: { subscribe: (agentId: string) => Promise<{ ok?: boolean; error?: string; alreadySubscribed?: boolean }> unsubscribe: (agentId: string) => Promise<{ ok: boolean }> getHistory: (agentId: string, options?: { offset?: number; limit?: number }) => Promise<{ messages: unknown[]; total: number; offset: number; limit: number }> diff --git a/apps/desktop/electron/ipc/heartbeat.ts b/apps/desktop/electron/ipc/heartbeat.ts new file mode 100644 index 00000000..9a3791f5 --- /dev/null +++ b/apps/desktop/electron/ipc/heartbeat.ts @@ -0,0 +1,39 @@ +/** + * Heartbeat IPC handlers for Electron main process. + */ +import { ipcMain } from "electron"; +import { getCurrentHub } from "./hub.js"; + +export function registerHeartbeatIpcHandlers(): void { + ipcMain.handle("heartbeat:last", async () => { + const hub = getCurrentHub(); + if (!hub) return null; + return hub.getLastHeartbeat(); + }); + + ipcMain.handle("heartbeat:setEnabled", async (_event, enabled: boolean) => { + const hub = getCurrentHub(); + if (!hub) { + return { ok: false, error: "Hub not initialized" }; + } + if (typeof enabled !== "boolean") { + return { ok: false, error: "enabled must be boolean" }; + } + + hub.setHeartbeatsEnabled(enabled); + return { ok: true, enabled }; + }); + + ipcMain.handle("heartbeat:wake", async (_event, reason?: string) => { + const hub = getCurrentHub(); + if (!hub) { + return { ok: false, error: "Hub not initialized" }; + } + + const result = await hub.runHeartbeatOnce({ + reason: typeof reason === "string" ? reason.trim() || "manual" : "manual", + }); + + return { ok: result.status !== "failed", result }; + }); +} diff --git a/apps/desktop/electron/ipc/index.ts b/apps/desktop/electron/ipc/index.ts index fc9335ee..a528e74a 100644 --- a/apps/desktop/electron/ipc/index.ts +++ b/apps/desktop/electron/ipc/index.ts @@ -7,6 +7,7 @@ export { registerHubIpcHandlers, cleanupHub, initializeHub, setupDeviceConfirmat export { registerProfileIpcHandlers } from './profile.js' export { registerProviderIpcHandlers } from './provider.js' export { registerCronIpcHandlers } from './cron.js' +export { registerHeartbeatIpcHandlers } from './heartbeat.js' import { registerAgentIpcHandlers, cleanupAgent } from './agent.js' import { registerSkillsIpcHandlers } from './skills.js' @@ -14,6 +15,7 @@ import { registerHubIpcHandlers, cleanupHub, initializeHub } from './hub.js' import { registerProfileIpcHandlers } from './profile.js' import { registerProviderIpcHandlers } from './provider.js' import { registerCronIpcHandlers } from './cron.js' +import { registerHeartbeatIpcHandlers } from './heartbeat.js' /** * Register all IPC handlers. @@ -26,6 +28,7 @@ export function registerAllIpcHandlers(): void { registerProfileIpcHandlers() registerProviderIpcHandlers() registerCronIpcHandlers() + registerHeartbeatIpcHandlers() } /** diff --git a/apps/desktop/electron/preload.ts b/apps/desktop/electron/preload.ts index 1ea1a64c..05313545 100644 --- a/apps/desktop/electron/preload.ts +++ b/apps/desktop/electron/preload.ts @@ -202,11 +202,17 @@ const electronAPI = { }, // Cron jobs management - cron: { - list: () => ipcRenderer.invoke('cron:list'), - toggle: (jobId: string) => ipcRenderer.invoke('cron:toggle', jobId), - remove: (jobId: string) => ipcRenderer.invoke('cron:remove', jobId), - }, + cron: { + list: () => ipcRenderer.invoke('cron:list'), + toggle: (jobId: string) => ipcRenderer.invoke('cron:toggle', jobId), + remove: (jobId: string) => ipcRenderer.invoke('cron:remove', jobId), + }, + + heartbeat: { + last: () => ipcRenderer.invoke('heartbeat:last'), + setEnabled: (enabled: boolean) => ipcRenderer.invoke('heartbeat:setEnabled', enabled), + wake: (reason?: string) => ipcRenderer.invoke('heartbeat:wake', reason), + }, // Local chat (direct IPC, no Gateway required) localChat: { diff --git a/apps/desktop/src/hooks/use-heartbeat.ts b/apps/desktop/src/hooks/use-heartbeat.ts new file mode 100644 index 00000000..0bb4e3c6 --- /dev/null +++ b/apps/desktop/src/hooks/use-heartbeat.ts @@ -0,0 +1,70 @@ +import { useCallback, useEffect, useState } from "react"; + +export type HeartbeatEvent = { + ts: number; + status: "sent" | "ok-empty" | "ok-token" | "skipped" | "failed"; + preview?: string; + durationMs?: number; + reason?: string; +}; + +export function useHeartbeat() { + const [enabled, setEnabled] = useState(true); + const [lastEvent, setLastEvent] = useState(null); + const [loading, setLoading] = useState(false); + const [error, setError] = useState(null); + + const refresh = useCallback(async () => { + try { + setLoading(true); + setError(null); + const event = (await window.electronAPI.heartbeat.last()) as HeartbeatEvent | null; + setLastEvent(event); + } catch (err) { + setError(err instanceof Error ? err.message : String(err)); + } finally { + setLoading(false); + } + }, []); + + useEffect(() => { + void refresh(); + const timer = setInterval(() => { + void refresh(); + }, 15000); + return () => clearInterval(timer); + }, [refresh]); + + const toggleEnabled = useCallback(async () => { + const next = !enabled; + const result = await window.electronAPI.heartbeat.setEnabled(next); + if (result.ok) { + setEnabled(next); + } else { + setError(result.error ?? "Failed to update heartbeat setting"); + } + }, [enabled]); + + const wakeNow = useCallback(async () => { + setLoading(true); + try { + const result = await window.electronAPI.heartbeat.wake("manual"); + if (!result.ok) { + setError(result.error ?? "Failed to run heartbeat"); + } + await refresh(); + } finally { + setLoading(false); + } + }, [refresh]); + + return { + enabled, + lastEvent, + loading, + error, + refresh, + toggleEnabled, + wakeNow, + }; +} diff --git a/apps/desktop/src/pages/home.tsx b/apps/desktop/src/pages/home.tsx index 9019a3ae..0fc8417c 100644 --- a/apps/desktop/src/pages/home.tsx +++ b/apps/desktop/src/pages/home.tsx @@ -1,9 +1,9 @@ -import { useState, useEffect, useRef } from 'react' -import { useNavigate } from 'react-router-dom' -import { Button } from '@multica/ui/components/ui/button' -import { HugeiconsIcon } from '@hugeicons/react' -import { - Comment01Icon, +import { useState, useEffect, useRef } from 'react' +import { useNavigate } from 'react-router-dom' +import { Button } from '@multica/ui/components/ui/button' +import { HugeiconsIcon } from '@hugeicons/react' +import { + Comment01Icon, LinkSquare01Icon, Loading03Icon, AlertCircleIcon, @@ -15,17 +15,17 @@ import { import { ConnectionQRCode } from '../components/qr-code' import { DeviceList } from '../components/device-list' import { AgentSettingsDialog } from '../components/agent-settings-dialog' -import { ApiKeyDialog } from '../components/api-key-dialog' -import { OAuthDialog } from '../components/oauth-dialog' -import { useHub } from '../hooks/use-hub' -import { useProvider } from '../hooks/use-provider' - -export default function HomePage() { - const navigate = useNavigate() - const { hubInfo, agents, loading, error } = useHub() - const { providers, current, setProvider, refresh, loading: providerLoading } = useProvider() - const [settingsOpen, setSettingsOpen] = useState(false) - const [agentName, setAgentName] = useState() +import { ApiKeyDialog } from '../components/api-key-dialog' +import { OAuthDialog } from '../components/oauth-dialog' +import { useHub } from '../hooks/use-hub' +import { useProvider } from '../hooks/use-provider' + +export default function HomePage() { + const navigate = useNavigate() + const { hubInfo, agents, loading, error } = useHub() + const { providers, current, setProvider, refresh, loading: providerLoading } = useProvider() + const [settingsOpen, setSettingsOpen] = useState(false) + const [agentName, setAgentName] = useState() const [providerDropdownOpen, setProviderDropdownOpen] = useState(false) const [switching, setSwitching] = useState(false) const [apiKeyDialogOpen, setApiKeyDialogOpen] = useState(false) @@ -186,8 +186,8 @@ export default function HomePage() {

{agentName || 'Unnamed Agent'}

- {/* Provider Selector */} -
+ {/* Provider Selector */} +

LLM Provider

@@ -270,10 +270,10 @@ export default function HomePage() {
)} - - - {/* Stats Grid */} -
+
+ + {/* Stats Grid */} +

Gateway diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts index db22b843..c1eb9a5e 100644 --- a/src/agent/async-agent.ts +++ b/src/agent/async-agent.ts @@ -15,6 +15,7 @@ export class AsyncAgent { private readonly channel = new Channel(); private _closed = false; private queue: Promise = Promise.resolve(); + private pendingWrites = 0; private closeCallbacks: Array<() => void> = []; readonly sessionId: string; @@ -38,6 +39,7 @@ export class AsyncAgent { /** Write message to agent (non-blocking, serialized queue) */ write(content: string): void { if (this._closed) throw new Error("Agent is closed"); + this.pendingWrites += 1; this.queue = this.queue .then(async () => { @@ -54,6 +56,9 @@ export class AsyncAgent { .catch((err) => { const message = err instanceof Error ? err.message : String(err); this.channel.send({ id: uuidv7(), content: `[error] ${message}` }); + }) + .finally(() => { + this.pendingWrites = Math.max(0, this.pendingWrites - 1); }); } @@ -170,6 +175,34 @@ export class AsyncAgent { return this.agent.getProfileId(); } + /** + * Get profile directory path, if profile is enabled. + */ + getProfileDir(): string | undefined { + return this.agent.getProfileDir(); + } + + /** + * Get heartbeat configuration from profile config. + */ + getHeartbeatConfig(): + | { + enabled?: boolean | undefined; + every?: string | undefined; + prompt?: string | undefined; + ackMaxChars?: number | undefined; + } + | undefined { + return this.agent.getHeartbeatConfig(); + } + + /** + * Number of queued/in-flight writes. + */ + getPendingWrites(): number { + return this.pendingWrites; + } + /** * Get agent display name from profile config. */ diff --git a/src/agent/profile/index.ts b/src/agent/profile/index.ts index cf7423e0..43a67d51 100644 --- a/src/agent/profile/index.ts +++ b/src/agent/profile/index.ts @@ -50,6 +50,7 @@ export function createAgentProfile( profile.user = DEFAULT_TEMPLATES.user; profile.workspace = DEFAULT_TEMPLATES.workspace; profile.memory = DEFAULT_TEMPLATES.memory; + profile.heartbeat = DEFAULT_TEMPLATES.heartbeat; // 保存到文件 saveProfile(profile, { baseDir }); @@ -150,6 +151,7 @@ export class ProfileManager { user: profile.user, workspace: profile.workspace, memory: profile.memory, + heartbeat: profile.heartbeat, config: profile.config, }, profileDir: this.getProfileDir(), @@ -168,6 +170,19 @@ export class ProfileManager { return profile?.config; } + /** Get heartbeat configuration from profile config */ + getHeartbeatConfig(): + | { + enabled?: boolean | undefined; + every?: string | undefined; + prompt?: string | undefined; + ackMaxChars?: number | undefined; + } + | undefined { + const profile = this.getProfile(); + return profile?.config?.heartbeat; + } + /** 更新 tools 配置 */ updateToolsConfig(toolsConfig: ToolsConfig): void { const profile = this.getOrCreateProfile(false); diff --git a/src/agent/profile/storage.ts b/src/agent/profile/storage.ts index 29d4c07e..9429aae1 100644 --- a/src/agent/profile/storage.ts +++ b/src/agent/profile/storage.ts @@ -95,13 +95,14 @@ export function loadProfile(profileId: string, options?: StorageOptions): AgentP user: readProfileFile(profileId, PROFILE_FILES.user, options), workspace: readProfileFile(profileId, PROFILE_FILES.workspace, options), memory: readProfileFile(profileId, PROFILE_FILES.memory, options), + heartbeat: readProfileFile(profileId, PROFILE_FILES.heartbeat, options), config: readProfileConfig(profileId, options), }; } /** 保存 AgentProfile(只写入非空字段) */ export function saveProfile(profile: AgentProfile, options?: StorageOptions): void { - const { id, soul, user, workspace, memory, config } = profile; + const { id, soul, user, workspace, memory, heartbeat, config } = profile; if (soul !== undefined) { writeProfileFile(id, PROFILE_FILES.soul, soul, options); @@ -115,6 +116,9 @@ export function saveProfile(profile: AgentProfile, options?: StorageOptions): vo if (memory !== undefined) { writeProfileFile(id, PROFILE_FILES.memory, memory, options); } + if (heartbeat !== undefined) { + writeProfileFile(id, PROFILE_FILES.heartbeat, heartbeat, options); + } if (config !== undefined) { writeProfileConfig(id, config, options); } diff --git a/src/agent/profile/templates.ts b/src/agent/profile/templates.ts index ffbb2d59..ca27f085 100644 --- a/src/agent/profile/templates.ts +++ b/src/agent/profile/templates.ts @@ -72,6 +72,7 @@ Your profile directory contains these files (use \`edit\` or \`write\` to update | \`user.md\` | About your human | As you learn about them | | \`workspace.md\` | This file — your rules | When you discover better conventions | | \`memory.md\` | Long-term knowledge | Regularly — capture what matters | +| \`heartbeat.md\` | Background check instructions | When heartbeat behavior should change | ## Every Session @@ -89,6 +90,7 @@ You wake up fresh each session. These files are your continuity: - **Long-term:** \`MEMORY.md\` — your curated memories, lessons learned - **Daily notes:** \`memory/YYYY-MM-DD.md\` — raw logs of what happened (optional) +- **Heartbeat:** \`heartbeat.md\` — periodic check loop instructions Capture what matters. Decisions, context, things to remember. @@ -101,6 +103,7 @@ Capture what matters. Decisions, context, things to remember. - \`memory.md\` — Your learnings: decisions made, lessons learned, important context - \`workspace.md\` — Your rules: conventions, workflows, how you should operate - \`soul.md\` — Your identity: only change if user wants to reshape who you are +- \`heartbeat.md\` — Periodic background checks and alert rules **Rules:** - **DO NOT** say "I'll remember that" without ACTUALLY calling \`edit\` or \`write\` on a file @@ -148,5 +151,11 @@ _(Persistent knowledge will be stored here. Update this as you learn.)_ ## Lessons Learned ## Important Context +`, + + heartbeat: `# heartbeat.md + +# Keep this file empty (or with only comments) to skip heartbeat API calls. +# Add tasks below when you want the agent to check something periodically. `, } as const; diff --git a/src/agent/profile/types.ts b/src/agent/profile/types.ts index 44e3f066..a250cf91 100644 --- a/src/agent/profile/types.ts +++ b/src/agent/profile/types.ts @@ -11,6 +11,7 @@ export const PROFILE_FILES = { user: "user.md", workspace: "workspace.md", memory: "memory.md", + heartbeat: "heartbeat.md", config: "config.json", } as const; @@ -42,6 +43,17 @@ export interface ProfileConfig { reasoningMode?: "off" | "on" | "stream" | undefined; /** Exec approval configuration (security level, ask mode, allowlist) */ execApproval?: ExecApprovalConfig | undefined; + /** Heartbeat configuration */ + heartbeat?: { + /** Global heartbeat enable switch */ + enabled?: boolean | undefined; + /** Interval, e.g. "30m", "1h" */ + every?: string | undefined; + /** Optional prompt override */ + prompt?: string | undefined; + /** Max chars after HEARTBEAT_OK to still treat as ack */ + ackMaxChars?: number | undefined; + } | undefined; } /** Agent Profile configuration */ @@ -56,6 +68,8 @@ export interface AgentProfile { workspace?: string | undefined; /** Persistent memory - long-term knowledge base */ memory?: string | undefined; + /** Periodic heartbeat instructions */ + heartbeat?: string | undefined; /** Profile configuration (from config.json) */ config?: ProfileConfig | undefined; } diff --git a/src/agent/runner.ts b/src/agent/runner.ts index 65fd528b..d364f0fa 100644 --- a/src/agent/runner.ts +++ b/src/agent/runner.ts @@ -595,6 +595,27 @@ export class Agent { return this.profile?.getProfile()?.id; } + /** + * Get profile directory path, if profile is enabled. + */ + getProfileDir(): string | undefined { + return this.profile?.getProfileDir(); + } + + /** + * Get heartbeat configuration from profile config. + */ + getHeartbeatConfig(): + | { + enabled?: boolean | undefined; + every?: string | undefined; + prompt?: string | undefined; + ackMaxChars?: number | undefined; + } + | undefined { + return this.profile?.getHeartbeatConfig(); + } + /** * Get agent display name from profile config. */ @@ -770,6 +791,7 @@ export class Agent { user: profile.user, workspace: profile.workspace, memory: profile.memory, + heartbeat: profile.heartbeat, config: profile.config, }, profileDir: this.profile!.getProfileDir(), diff --git a/src/agent/system-prompt/builder.ts b/src/agent/system-prompt/builder.ts index 619b05f8..a8578616 100644 --- a/src/agent/system-prompt/builder.ts +++ b/src/agent/system-prompt/builder.ts @@ -10,6 +10,7 @@ import type { SystemPromptReport, } from "./types.js"; import { + buildHeartbeatSection, buildConditionalToolSections, buildExtraPromptSection, buildIdentitySection, @@ -58,6 +59,7 @@ export function buildSystemPromptWithReport(options: SystemPromptOptions): { { name: "user", lines: buildUserSection(profile, mode) }, { name: "workspace", lines: buildWorkspaceSection(profile, mode, profileDir) }, { name: "memory", lines: buildMemoryFileSection(profile, mode) }, + { name: "heartbeat", lines: buildHeartbeatSection(profile, mode) }, { name: "safety", lines: buildSafetySection(includeSafety) }, { name: "tooling", lines: buildToolingSummary(tools, mode) }, { name: "tool-call-style", lines: buildToolCallStyleSection(mode) }, diff --git a/src/agent/system-prompt/sections.ts b/src/agent/system-prompt/sections.ts index 6a9a10ac..51b80f40 100644 --- a/src/agent/system-prompt/sections.ts +++ b/src/agent/system-prompt/sections.ts @@ -6,6 +6,7 @@ import { SAFETY_CONSTITUTION } from "./constitution.js"; import { formatRuntimeLine } from "./runtime-info.js"; +import { resolveHeartbeatPrompt } from "../../heartbeat/heartbeat-text.js"; import type { ProfileContent, RuntimeInfo, @@ -97,13 +98,14 @@ export function buildWorkspaceSection( "## Profile", "", `Your profile directory: \`${profileDir}\``, - "Use this as the base path for profile files (soul.md, user.md, memory.md, memory/*.md).", + "Use this as the base path for profile files (soul.md, user.md, memory.md, heartbeat.md, memory/*.md).", "", "Profile files:", "- `soul.md` — Your identity and values", "- `user.md` — Information about your user", "- `workspace.md` — Guidelines and conventions (below)", "- `memory.md` — Persistent knowledge", + "- `heartbeat.md` — Background heartbeat loop instructions", "", ); } @@ -128,6 +130,26 @@ export function buildMemoryFileSection( return []; } +/** + * Heartbeat section — full mode only. + * Keeps heartbeat protocol explicit in the agent instructions. + */ +export function buildHeartbeatSection( + profile: ProfileContent | undefined, + mode: SystemPromptMode, +): string[] { + if (mode !== "full") return []; + const prompt = resolveHeartbeatPrompt(profile?.config?.heartbeat?.prompt); + return [ + "## Heartbeats", + `Heartbeat prompt: ${prompt}`, + 'If you receive a heartbeat poll (a user message matching the heartbeat prompt above), and there is nothing that needs attention, reply exactly:', + "HEARTBEAT_OK", + 'If something needs attention, do NOT include "HEARTBEAT_OK"; reply with the alert text instead.', + "", + ]; +} + /** * Safety constitution — always included. */ diff --git a/src/agent/system-prompt/types.ts b/src/agent/system-prompt/types.ts index ae9d313e..2dc2b18e 100644 --- a/src/agent/system-prompt/types.ts +++ b/src/agent/system-prompt/types.ts @@ -53,6 +53,7 @@ export interface ProfileContent { user?: string | undefined; workspace?: string | undefined; memory?: string | undefined; + heartbeat?: string | undefined; config?: ProfileConfig | undefined; } diff --git a/src/cron/execute.ts b/src/cron/execute.ts index 23b1526e..3511b544 100644 --- a/src/cron/execute.ts +++ b/src/cron/execute.ts @@ -44,6 +44,10 @@ async function executeSystemEvent(job: CronJob): Promise { const hub = getHub(); const payload = job.payload as { kind: "system-event"; text: string }; + const text = payload.text.trim(); + if (!text) { + return { error: "system-event payload requires non-empty text" }; + } // Get the list of active agents const agentIds = hub.listAgents(); @@ -54,25 +58,29 @@ async function executeSystemEvent(job: CronJob): Promise { // For now, inject into the first (main) agent // TODO: Support targeting specific agent by ID const agentId = agentIds[0]!; - const agent = hub.getAgent(agentId); - if (!agent || agent.closed) { - return { error: `Agent ${agentId} not found or closed` }; + const cronMessage = `[CRON] ${job.name}: ${text}`; + + hub.enqueueSystemEvent(cronMessage, { agentId }); + + if (job.wakeMode === "now") { + const result = await hub.runHeartbeatOnce({ reason: `cron:${job.id}` }); + if (result.status === "failed") { + return { error: result.reason }; + } + if (result.status === "skipped") { + return { + summary: `Enqueued cron event for agent ${agentId.slice(0, 8)} (wake skipped: ${result.reason})`, + }; + } + return { + summary: `Enqueued cron event and triggered immediate heartbeat for agent ${agentId.slice(0, 8)}`, + }; } - // Format the cron message with metadata - const cronMessage = `[CRON] ${job.name}: ${payload.text}`; - - try { - // Write to agent (non-blocking, will be processed in queue) - agent.write(cronMessage); - - // Wait for the agent to process the message - await agent.waitForIdle(); - - return { summary: `Injected message into agent ${agentId.slice(0, 8)}` }; - } catch (err) { - return { error: err instanceof Error ? err.message : String(err) }; - } + hub.requestHeartbeatNow({ reason: `cron:${job.id}` }); + return { + summary: `Enqueued cron event for agent ${agentId.slice(0, 8)} (wakeMode: next-heartbeat)`, + }; } /** diff --git a/src/heartbeat/heartbeat-events.ts b/src/heartbeat/heartbeat-events.ts new file mode 100644 index 00000000..8b64ceea --- /dev/null +++ b/src/heartbeat/heartbeat-events.ts @@ -0,0 +1,50 @@ +export type HeartbeatIndicatorType = "ok" | "alert" | "error"; + +export type HeartbeatEventPayload = { + ts: number; + status: "sent" | "ok-empty" | "ok-token" | "skipped" | "failed"; + preview?: string; + durationMs?: number; + reason?: string; + indicatorType?: HeartbeatIndicatorType; +}; + +export function resolveIndicatorType( + status: HeartbeatEventPayload["status"], +): HeartbeatIndicatorType | undefined { + switch (status) { + case "ok-empty": + case "ok-token": + return "ok"; + case "sent": + return "alert"; + case "failed": + return "error"; + case "skipped": + return undefined; + } +} + +let lastHeartbeat: HeartbeatEventPayload | null = null; +const listeners = new Set<(evt: HeartbeatEventPayload) => void>(); + +export function emitHeartbeatEvent(evt: Omit): void { + const enriched: HeartbeatEventPayload = { ts: Date.now(), ...evt }; + lastHeartbeat = enriched; + for (const listener of listeners) { + try { + listener(enriched); + } catch { + // Ignore listener errors so heartbeat flow stays robust. + } + } +} + +export function onHeartbeatEvent(listener: (evt: HeartbeatEventPayload) => void): () => void { + listeners.add(listener); + return () => listeners.delete(listener); +} + +export function getLastHeartbeatEvent(): HeartbeatEventPayload | null { + return lastHeartbeat; +} diff --git a/src/heartbeat/heartbeat-text.test.ts b/src/heartbeat/heartbeat-text.test.ts new file mode 100644 index 00000000..2fb60708 --- /dev/null +++ b/src/heartbeat/heartbeat-text.test.ts @@ -0,0 +1,31 @@ +import { describe, expect, it } from "vitest"; +import { + HEARTBEAT_TOKEN, + isHeartbeatContentEffectivelyEmpty, + stripHeartbeatToken, +} from "./heartbeat-text.js"; + +describe("heartbeat-text", () => { + it("treats comment-only heartbeat files as empty", () => { + expect(isHeartbeatContentEffectivelyEmpty("# title\n\n- [ ]\n")).toBe(true); + expect(isHeartbeatContentEffectivelyEmpty("\n# note\n")).toBe(true); + expect(isHeartbeatContentEffectivelyEmpty("check disk health")).toBe(false); + }); + + it("strips plain token responses", () => { + const result = stripHeartbeatToken(HEARTBEAT_TOKEN, { mode: "heartbeat" }); + expect(result.shouldSkip).toBe(true); + expect(result.text).toBe(""); + }); + + it("keeps substantial content around token in heartbeat mode", () => { + const longTail = "Potential issue detected: disk usage is 92% on /Users"; + const result = stripHeartbeatToken(`${HEARTBEAT_TOKEN} ${longTail}`, { + mode: "heartbeat", + maxAckChars: 10, + }); + + expect(result.shouldSkip).toBe(false); + expect(result.text).toContain("disk usage"); + }); +}); diff --git a/src/heartbeat/heartbeat-text.ts b/src/heartbeat/heartbeat-text.ts new file mode 100644 index 00000000..a957ca2f --- /dev/null +++ b/src/heartbeat/heartbeat-text.ts @@ -0,0 +1,117 @@ +export const HEARTBEAT_TOKEN = "HEARTBEAT_OK"; + +export const HEARTBEAT_PROMPT = + "Read heartbeat.md if it exists (workspace context). Follow it strictly. Do not infer or repeat old tasks from prior chats. If nothing needs attention, reply HEARTBEAT_OK."; + +export const DEFAULT_HEARTBEAT_EVERY = "30m"; +export const DEFAULT_HEARTBEAT_ACK_MAX_CHARS = 300; + +export function isHeartbeatContentEffectivelyEmpty( + content: string | undefined | null, +): boolean { + if (content === undefined || content === null || typeof content !== "string") { + return false; + } + + const lines = content.split("\n"); + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) continue; + if (/^#+(\s|$)/.test(trimmed)) continue; + if (/^[-*+]\s*(\[[\sXx]?\]\s*)?$/.test(trimmed)) continue; + return false; + } + + return true; +} + +export function resolveHeartbeatPrompt(raw?: string): string { + const trimmed = typeof raw === "string" ? raw.trim() : ""; + return trimmed || HEARTBEAT_PROMPT; +} + +export type StripHeartbeatMode = "heartbeat" | "message"; + +function stripTokenAtEdges(raw: string): { text: string; didStrip: boolean } { + let text = raw.trim(); + if (!text) return { text: "", didStrip: false }; + if (!text.includes(HEARTBEAT_TOKEN)) return { text, didStrip: false }; + + let didStrip = false; + let changed = true; + while (changed) { + changed = false; + const next = text.trim(); + if (next.startsWith(HEARTBEAT_TOKEN)) { + text = next.slice(HEARTBEAT_TOKEN.length).trimStart(); + didStrip = true; + changed = true; + continue; + } + if (next.endsWith(HEARTBEAT_TOKEN)) { + text = next.slice(0, Math.max(0, next.length - HEARTBEAT_TOKEN.length)).trimEnd(); + didStrip = true; + changed = true; + } + } + + return { + text: text.replace(/\s+/g, " ").trim(), + didStrip, + }; +} + +export function stripHeartbeatToken( + raw?: string, + opts: { mode?: StripHeartbeatMode; maxAckChars?: number } = {}, +): { shouldSkip: boolean; text: string; didStrip: boolean } { + if (!raw) return { shouldSkip: true, text: "", didStrip: false }; + + const trimmed = raw.trim(); + if (!trimmed) return { shouldSkip: true, text: "", didStrip: false }; + + const mode = opts.mode ?? "message"; + const maxAckCharsRaw = opts.maxAckChars; + const maxAckChars = Math.max( + 0, + typeof maxAckCharsRaw === "number" && Number.isFinite(maxAckCharsRaw) + ? maxAckCharsRaw + : DEFAULT_HEARTBEAT_ACK_MAX_CHARS, + ); + + const stripMarkup = (text: string) => + text + .replace(/<[^>]*>/g, " ") + .replace(/ /gi, " ") + .replace(/^[*`~_]+/, "") + .replace(/[*`~_]+$/, ""); + + const normalized = stripMarkup(trimmed); + const hasToken = + trimmed.includes(HEARTBEAT_TOKEN) || normalized.includes(HEARTBEAT_TOKEN); + if (!hasToken) { + return { shouldSkip: false, text: trimmed, didStrip: false }; + } + + const strippedOriginal = stripTokenAtEdges(trimmed); + const strippedNormalized = stripTokenAtEdges(normalized); + const picked = + strippedOriginal.didStrip && strippedOriginal.text + ? strippedOriginal + : strippedNormalized; + + if (!picked.didStrip) { + return { shouldSkip: false, text: trimmed, didStrip: false }; + } + + if (!picked.text) { + return { shouldSkip: true, text: "", didStrip: true }; + } + + const rest = picked.text.trim(); + if (mode === "heartbeat" && rest.length <= maxAckChars) { + return { shouldSkip: true, text: "", didStrip: true }; + } + + return { shouldSkip: false, text: rest, didStrip: true }; +} diff --git a/src/heartbeat/heartbeat-wake.test.ts b/src/heartbeat/heartbeat-wake.test.ts new file mode 100644 index 00000000..d74e5b24 --- /dev/null +++ b/src/heartbeat/heartbeat-wake.test.ts @@ -0,0 +1,47 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { + hasPendingHeartbeatWake, + requestHeartbeatNow, + setHeartbeatWakeHandler, +} from "./heartbeat-wake.js"; + +describe("heartbeat-wake", () => { + afterEach(() => { + setHeartbeatWakeHandler(null); + vi.useRealTimers(); + }); + + it("coalesces multiple wake requests into one run", async () => { + vi.useFakeTimers(); + const handler = vi.fn(async () => ({ status: "ran" as const, durationMs: 1 })); + + setHeartbeatWakeHandler(handler); + requestHeartbeatNow({ reason: "a" }); + requestHeartbeatNow({ reason: "b" }); + requestHeartbeatNow({ reason: "c" }); + + expect(hasPendingHeartbeatWake()).toBe(true); + + await vi.advanceTimersByTimeAsync(300); + + expect(handler).toHaveBeenCalledTimes(1); + }); + + it("retries when requests are in flight", async () => { + vi.useFakeTimers(); + + const handler = vi + .fn() + .mockResolvedValueOnce({ status: "skipped" as const, reason: "requests-in-flight" }) + .mockResolvedValueOnce({ status: "ran" as const, durationMs: 3 }); + + setHeartbeatWakeHandler(handler); + requestHeartbeatNow({ reason: "retry-case" }); + + await vi.advanceTimersByTimeAsync(300); + expect(handler).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(1100); + expect(handler).toHaveBeenCalledTimes(2); + }); +}); diff --git a/src/heartbeat/heartbeat-wake.ts b/src/heartbeat/heartbeat-wake.ts new file mode 100644 index 00000000..22592fbc --- /dev/null +++ b/src/heartbeat/heartbeat-wake.ts @@ -0,0 +1,73 @@ +export type HeartbeatRunResult = + | { status: "ran"; durationMs: number } + | { status: "skipped"; reason: string } + | { status: "failed"; reason: string }; + +export type HeartbeatWakeHandler = (opts: { + reason?: string; +}) => Promise; + +let handler: HeartbeatWakeHandler | null = null; +let pendingReason: string | null = null; +let scheduled = false; +let running = false; +let timer: NodeJS.Timeout | null = null; + +const DEFAULT_COALESCE_MS = 250; +const DEFAULT_RETRY_MS = 1000; + +function schedule(coalesceMs: number): void { + if (timer) return; + timer = setTimeout(async () => { + timer = null; + scheduled = false; + const active = handler; + if (!active) return; + + if (running) { + scheduled = true; + schedule(coalesceMs); + return; + } + + const reason = pendingReason; + pendingReason = null; + running = true; + try { + const result = reason ? await active({ reason }) : await active({}); + if (result.status === "skipped" && result.reason === "requests-in-flight") { + pendingReason = reason ?? "retry"; + schedule(DEFAULT_RETRY_MS); + } + } catch { + pendingReason = reason ?? "retry"; + schedule(DEFAULT_RETRY_MS); + } finally { + running = false; + if (pendingReason || scheduled) { + schedule(coalesceMs); + } + } + }, coalesceMs); + timer.unref?.(); +} + +export function setHeartbeatWakeHandler(next: HeartbeatWakeHandler | null): void { + handler = next; + if (handler && pendingReason) { + schedule(DEFAULT_COALESCE_MS); + } +} + +export function requestHeartbeatNow(opts?: { reason?: string; coalesceMs?: number }): void { + pendingReason = opts?.reason ?? pendingReason ?? "requested"; + schedule(opts?.coalesceMs ?? DEFAULT_COALESCE_MS); +} + +export function hasHeartbeatWakeHandler(): boolean { + return handler !== null; +} + +export function hasPendingHeartbeatWake(): boolean { + return pendingReason !== null || Boolean(timer) || scheduled; +} diff --git a/src/heartbeat/index.ts b/src/heartbeat/index.ts new file mode 100644 index 00000000..a01b8b13 --- /dev/null +++ b/src/heartbeat/index.ts @@ -0,0 +1,45 @@ +export { + emitHeartbeatEvent, + getLastHeartbeatEvent, + onHeartbeatEvent, + resolveIndicatorType, + type HeartbeatEventPayload, + type HeartbeatIndicatorType, +} from "./heartbeat-events.js"; + +export { + hasHeartbeatWakeHandler, + hasPendingHeartbeatWake, + requestHeartbeatNow, + setHeartbeatWakeHandler, + type HeartbeatRunResult, + type HeartbeatWakeHandler, +} from "./heartbeat-wake.js"; + +export { + DEFAULT_HEARTBEAT_ACK_MAX_CHARS, + DEFAULT_HEARTBEAT_EVERY, + HEARTBEAT_PROMPT, + HEARTBEAT_TOKEN, + isHeartbeatContentEffectivelyEmpty, + resolveHeartbeatPrompt, + stripHeartbeatToken, + type StripHeartbeatMode, +} from "./heartbeat-text.js"; + +export { + drainSystemEvents, + enqueueSystemEvent, + hasSystemEvents, + peekSystemEvents, + resetSystemEventsForTest, + type SystemEvent, +} from "./system-events.js"; + +export { + runHeartbeatOnce, + setHeartbeatsEnabled, + startHeartbeatRunner, + type HeartbeatConfig, + type HeartbeatRunner, +} from "./runner.js"; diff --git a/src/heartbeat/runner.test.ts b/src/heartbeat/runner.test.ts new file mode 100644 index 00000000..4a77c8fd --- /dev/null +++ b/src/heartbeat/runner.test.ts @@ -0,0 +1,74 @@ +import os from "node:os"; +import path from "node:path"; +import { mkdtemp, rm, writeFile } from "node:fs/promises"; +import { afterEach, describe, expect, it } from "vitest"; +import { runHeartbeatOnce, setHeartbeatsEnabled } from "./runner.js"; + +type StubAgent = { + closed: boolean; + sessionId: string; + ensureInitialized: () => Promise; + getMessages: () => Array; + write: (content: string) => void; + waitForIdle: () => Promise; + getHeartbeatConfig: () => { prompt?: string; ackMaxChars?: number; enabled?: boolean }; + getPendingWrites: () => number; + getProfileDir: () => string | undefined; +}; + +function createStubAgent(opts?: { + profileDir?: string; + replyText?: string; + heartbeatEnabled?: boolean; +}): StubAgent { + const messages: Array = []; + const replyText = opts?.replyText ?? "HEARTBEAT_OK"; + + return { + closed: false, + sessionId: "test-session", + ensureInitialized: async () => {}, + getMessages: () => messages, + write: (content: string) => { + messages.push({ role: "user", content }); + messages.push({ role: "assistant", content: [{ type: "text", text: replyText }] }); + }, + waitForIdle: async () => {}, + getHeartbeatConfig: () => + typeof opts?.heartbeatEnabled === "boolean" + ? { enabled: opts.heartbeatEnabled } + : {}, + getPendingWrites: () => 0, + getProfileDir: () => opts?.profileDir, + }; +} + +describe("heartbeat runner", () => { + afterEach(() => { + setHeartbeatsEnabled(true); + }); + + it("skips when no agent is available", async () => { + const result = await runHeartbeatOnce({ agent: null }); + expect(result).toEqual({ status: "skipped", reason: "disabled" }); + }); + + it("skips when heartbeat file is effectively empty", async () => { + const dir = await mkdtemp(path.join(os.tmpdir(), "heartbeat-test-")); + try { + await writeFile(path.join(dir, "heartbeat.md"), "# keep empty\n", "utf-8"); + const agent = createStubAgent({ profileDir: dir }); + const result = await runHeartbeatOnce({ agent: agent as any }); + expect(result).toEqual({ status: "skipped", reason: "empty-heartbeat-file" }); + } finally { + await rm(dir, { recursive: true, force: true }); + } + }); + + it("runs and returns ran for heartbeat acknowledgements", async () => { + const agent = createStubAgent({ replyText: "HEARTBEAT_OK" }); + const result = await runHeartbeatOnce({ agent: agent as any, reason: "manual" }); + + expect(result.status).toBe("ran"); + }); +}); diff --git a/src/heartbeat/runner.ts b/src/heartbeat/runner.ts new file mode 100644 index 00000000..281adf9b --- /dev/null +++ b/src/heartbeat/runner.ts @@ -0,0 +1,321 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import type { AsyncAgent } from "../agent/async-agent.js"; +import { + DEFAULT_HEARTBEAT_ACK_MAX_CHARS, + DEFAULT_HEARTBEAT_EVERY, + isHeartbeatContentEffectivelyEmpty, + resolveHeartbeatPrompt, + stripHeartbeatToken, +} from "./heartbeat-text.js"; +import { + emitHeartbeatEvent, + resolveIndicatorType, + type HeartbeatEventPayload, +} from "./heartbeat-events.js"; +import { + setHeartbeatWakeHandler, + requestHeartbeatNow, + type HeartbeatRunResult, + type HeartbeatWakeHandler, +} from "./heartbeat-wake.js"; +import { drainSystemEvents } from "./system-events.js"; + +export type HeartbeatConfig = { + enabled?: boolean; + every?: string; + prompt?: string; + ackMaxChars?: number; +}; + +export type HeartbeatRunner = { + stop: () => void; + updateConfig: () => void; +}; + +type RunnerDeps = { + getAgent: () => AsyncAgent | null; + nowMs?: () => number; + logger?: Pick; +}; + +const HEARTBEAT_FILENAME = "heartbeat.md"; +const DEFAULT_INTERVAL_MS = 30 * 60 * 1000; +let heartbeatsEnabled = true; + +function resolveDurationMs(raw: string | undefined): number | null { + if (!raw) return DEFAULT_INTERVAL_MS; + const trimmed = raw.trim(); + if (!trimmed) return DEFAULT_INTERVAL_MS; + + const match = trimmed.match(/^(\d+(?:\.\d+)?)\s*([smhd])$/i); + if (match) { + const num = Number.parseFloat(match[1]!); + const unit = match[2]!.toLowerCase(); + const unitMs: Record = { + s: 1000, + m: 60 * 1000, + h: 60 * 60 * 1000, + d: 24 * 60 * 60 * 1000, + }; + const ms = unitMs[unit]; + if (!Number.isFinite(num) || !ms) return null; + const value = Math.floor(num * ms); + return value > 0 ? value : null; + } + + if (/^\d+$/.test(trimmed)) { + const value = Number.parseInt(trimmed, 10); + return value > 0 ? value : null; + } + + return null; +} + +function extractMessageText(message: AgentMessage | undefined): string { + if (!message) return ""; + const raw = (message as { content?: unknown }).content; + if (typeof raw === "string") return raw; + if (!Array.isArray(raw)) return ""; + + const parts: string[] = []; + for (const block of raw) { + if (!block || typeof block !== "object") continue; + const text = (block as { text?: unknown }).text; + if (typeof text === "string" && text.trim()) { + parts.push(text); + } + } + return parts.join("\n").trim(); +} + +function getHeartbeatConfig(agent: AsyncAgent | null): HeartbeatConfig { + const cfg = agent?.getHeartbeatConfig(); + if (!cfg) return {}; + + const out: HeartbeatConfig = {}; + if (typeof cfg.enabled === "boolean") out.enabled = cfg.enabled; + if (typeof cfg.every === "string") out.every = cfg.every; + if (typeof cfg.prompt === "string") out.prompt = cfg.prompt; + if (typeof cfg.ackMaxChars === "number" && Number.isFinite(cfg.ackMaxChars)) { + out.ackMaxChars = cfg.ackMaxChars; + } + return out; +} + +function resolveHeartbeatIntervalMs(agent: AsyncAgent | null): number { + const cfg = getHeartbeatConfig(agent); + return resolveDurationMs(cfg.every ?? DEFAULT_HEARTBEAT_EVERY) ?? DEFAULT_INTERVAL_MS; +} + +function resolveSessionKey(agent: AsyncAgent): string { + return agent.sessionId; +} + +async function isHeartbeatFileEmpty(agent: AsyncAgent): Promise { + const profileDir = agent.getProfileDir(); + if (!profileDir) return false; + const heartbeatPath = path.join(profileDir, HEARTBEAT_FILENAME); + + try { + const content = await fs.readFile(heartbeatPath, "utf-8"); + return isHeartbeatContentEffectivelyEmpty(content); + } catch { + return false; + } +} + +export function setHeartbeatsEnabled(enabled: boolean): void { + heartbeatsEnabled = enabled; +} + +export async function runHeartbeatOnce(opts: { + agent: AsyncAgent | null; + reason?: string; + nowMs?: () => number; +}): Promise { + const startedAt = opts.nowMs?.() ?? Date.now(); + const agent = opts.agent; + + if (!heartbeatsEnabled) { + return { status: "skipped", reason: "disabled" }; + } + + if (!agent || agent.closed) { + return { status: "skipped", reason: "disabled" }; + } + + const cfg = getHeartbeatConfig(agent); + if (cfg.enabled === false) { + return { status: "skipped", reason: "disabled" }; + } + + if (agent.getPendingWrites() > 0) { + return { status: "skipped", reason: "requests-in-flight" }; + } + + try { + const isExecEvent = opts.reason === "exec-event"; + if (!isExecEvent && (await isHeartbeatFileEmpty(agent))) { + emitHeartbeatEvent({ + status: "skipped", + reason: "empty-heartbeat-file", + durationMs: Date.now() - startedAt, + }); + return { status: "skipped", reason: "empty-heartbeat-file" }; + } + + await agent.ensureInitialized(); + const beforeMessages = agent.getMessages(); + const sessionKey = resolveSessionKey(agent); + const pendingEvents = drainSystemEvents(sessionKey); + + const basePrompt = resolveHeartbeatPrompt(cfg.prompt); + const prompt = pendingEvents.length + ? `${basePrompt}\n\nSystem events:\n${pendingEvents.map((line) => `- ${line}`).join("\n")}` + : basePrompt; + + agent.write(prompt); + await agent.waitForIdle(); + + const afterMessages = agent.getMessages(); + const appended = afterMessages.slice(beforeMessages.length); + const assistant = [...appended] + .reverse() + .find((msg) => msg.role === "assistant"); + const text = extractMessageText(assistant); + + if (!text.trim()) { + const okEmptyEvent: Omit = { + status: "ok-empty", + durationMs: Date.now() - startedAt, + }; + if (opts.reason) okEmptyEvent.reason = opts.reason; + const indicator = resolveIndicatorType("ok-empty"); + if (indicator) okEmptyEvent.indicatorType = indicator; + emitHeartbeatEvent(okEmptyEvent); + return { status: "ran", durationMs: Date.now() - startedAt }; + } + + const stripped = stripHeartbeatToken(text, { + mode: "heartbeat", + maxAckChars: cfg.ackMaxChars ?? DEFAULT_HEARTBEAT_ACK_MAX_CHARS, + }); + + if (stripped.shouldSkip) { + const okTokenEvent: Omit = { + status: "ok-token", + durationMs: Date.now() - startedAt, + }; + if (opts.reason) okTokenEvent.reason = opts.reason; + const indicator = resolveIndicatorType("ok-token"); + if (indicator) okTokenEvent.indicatorType = indicator; + emitHeartbeatEvent(okTokenEvent); + return { status: "ran", durationMs: Date.now() - startedAt }; + } + + const sentEvent: Omit = { + status: "sent", + preview: stripped.text.slice(0, 200), + durationMs: Date.now() - startedAt, + }; + if (opts.reason) sentEvent.reason = opts.reason; + const sentIndicator = resolveIndicatorType("sent"); + if (sentIndicator) sentEvent.indicatorType = sentIndicator; + emitHeartbeatEvent(sentEvent); + return { status: "ran", durationMs: Date.now() - startedAt }; + } catch (error) { + const reason = error instanceof Error ? error.message : String(error); + const failedEvent: Omit = { + status: "failed", + reason, + durationMs: Date.now() - startedAt, + }; + const failedIndicator = resolveIndicatorType("failed"); + if (failedIndicator) failedEvent.indicatorType = failedIndicator; + emitHeartbeatEvent(failedEvent); + return { status: "failed", reason }; + } +} + +export function startHeartbeatRunner(deps: RunnerDeps): HeartbeatRunner { + const logger = deps.logger ?? console; + const nowMs = deps.nowMs ?? (() => Date.now()); + let timer: NodeJS.Timeout | null = null; + let stopped = false; + let intervalMs = resolveHeartbeatIntervalMs(deps.getAgent()); + let nextDueAtMs = nowMs() + intervalMs; + + const clearTimer = () => { + if (timer) { + clearTimeout(timer); + timer = null; + } + }; + + const scheduleNext = () => { + if (stopped) return; + clearTimer(); + + const delay = Math.max(0, nextDueAtMs - nowMs()); + timer = setTimeout(() => { + requestHeartbeatNow({ reason: "interval", coalesceMs: 0 }); + }, delay); + timer.unref?.(); + }; + + const run: HeartbeatWakeHandler = async (params) => { + const reason = params.reason; + const agent = deps.getAgent(); + + if (reason === "interval") { + const now = nowMs(); + if (now < nextDueAtMs) { + return { status: "skipped", reason: "not-due" }; + } + } + + const result = await runHeartbeatOnce( + reason + ? { + agent, + reason, + nowMs, + } + : { + agent, + nowMs, + }, + ); + + const activeAgent = deps.getAgent(); + intervalMs = resolveHeartbeatIntervalMs(activeAgent); + nextDueAtMs = nowMs() + intervalMs; + scheduleNext(); + + return result; + }; + + setHeartbeatWakeHandler(run); + scheduleNext(); + logger.info?.("[Heartbeat] runner started"); + + return { + stop: () => { + if (stopped) return; + stopped = true; + clearTimer(); + setHeartbeatWakeHandler(null); + logger.info?.("[Heartbeat] runner stopped"); + }, + updateConfig: () => { + const agent = deps.getAgent(); + intervalMs = resolveHeartbeatIntervalMs(agent); + nextDueAtMs = nowMs() + intervalMs; + scheduleNext(); + }, + }; +} + +export type { HeartbeatEventPayload }; diff --git a/src/heartbeat/system-events.ts b/src/heartbeat/system-events.ts new file mode 100644 index 00000000..7927fd9e --- /dev/null +++ b/src/heartbeat/system-events.ts @@ -0,0 +1,51 @@ +export type SystemEvent = { text: string; ts: number }; + +const MAX_EVENTS = 20; +const queues = new Map(); + +function normalizeSessionKey(key: string | undefined): string { + const trimmed = typeof key === "string" ? key.trim() : ""; + if (!trimmed) { + throw new Error("system events require a sessionKey"); + } + return trimmed; +} + +export function enqueueSystemEvent(text: string, opts: { sessionKey: string }): void { + const sessionKey = normalizeSessionKey(opts.sessionKey); + const cleaned = text.trim(); + if (!cleaned) return; + + const list = queues.get(sessionKey) ?? []; + const previous = list[list.length - 1]; + if (previous?.text === cleaned) { + return; + } + + list.push({ text: cleaned, ts: Date.now() }); + if (list.length > MAX_EVENTS) { + list.splice(0, list.length - MAX_EVENTS); + } + queues.set(sessionKey, list); +} + +export function drainSystemEvents(sessionKey: string): string[] { + const key = normalizeSessionKey(sessionKey); + const list = queues.get(key) ?? []; + queues.delete(key); + return list.map((entry) => entry.text); +} + +export function peekSystemEvents(sessionKey: string): string[] { + const key = normalizeSessionKey(sessionKey); + return (queues.get(key) ?? []).map((entry) => entry.text); +} + +export function hasSystemEvents(sessionKey: string): boolean { + const key = normalizeSessionKey(sessionKey); + return (queues.get(key)?.length ?? 0) > 0; +} + +export function resetSystemEventsForTest(): void { + queues.clear(); +} diff --git a/src/hub/heartbeat-filter.test.ts b/src/hub/heartbeat-filter.test.ts new file mode 100644 index 00000000..a273a2af --- /dev/null +++ b/src/hub/heartbeat-filter.test.ts @@ -0,0 +1,66 @@ +import { describe, expect, it } from "vitest"; +import { + extractAssistantEventText, + isHeartbeatAckEvent, +} from "./heartbeat-filter.js"; + +describe("heartbeat-filter", () => { + it("extracts text from string content", () => { + const event = { + message: { + content: " HEARTBEAT_OK ", + }, + }; + expect(extractAssistantEventText(event)).toBe("HEARTBEAT_OK"); + }); + + it("extracts text from content blocks", () => { + const event = { + message: { + content: [ + { type: "text", text: "line 1" }, + { type: "thinking", thinking: "hidden" }, + { type: "text", text: "line 2" }, + ], + }, + }; + expect(extractAssistantEventText(event)).toBe("line 1 line 2"); + }); + + it("treats pure heartbeat token as ack", () => { + const event = { + message: { + content: [{ type: "text", text: "HEARTBEAT_OK" }], + }, + }; + expect(isHeartbeatAckEvent(event)).toBe(true); + }); + + it("treats marked-up heartbeat token as ack", () => { + const event = { + message: { + content: [{ type: "text", text: "**HEARTBEAT_OK**" }], + }, + }; + expect(isHeartbeatAckEvent(event)).toBe(true); + }); + + it("does not suppress real alert text", () => { + const event = { + message: { + content: [{ type: "text", text: "Reminder: go downstairs now." }], + }, + }; + expect(isHeartbeatAckEvent(event)).toBe(false); + }); + + it("does not suppress token plus extra content", () => { + const event = { + message: { + content: [{ type: "text", text: "HEARTBEAT_OK Reminder: check inbox." }], + }, + }; + expect(isHeartbeatAckEvent(event)).toBe(false); + }); +}); + diff --git a/src/hub/heartbeat-filter.ts b/src/hub/heartbeat-filter.ts new file mode 100644 index 00000000..9d16c781 --- /dev/null +++ b/src/hub/heartbeat-filter.ts @@ -0,0 +1,43 @@ +import { stripHeartbeatToken } from "../heartbeat/index.js"; + +function collapseWhitespace(value: string): string { + return value.replace(/\s+/g, " ").trim(); +} + +/** + * Extract assistant text from an agent stream event. + * Supports both string and rich content-array message shapes. + */ +export function extractAssistantEventText(event: unknown): string { + if (!event || typeof event !== "object") return ""; + const message = (event as { message?: unknown }).message; + if (!message || typeof message !== "object") return ""; + const content = (message as { content?: unknown }).content; + + if (typeof content === "string") { + return collapseWhitespace(content); + } + + if (!Array.isArray(content)) return ""; + const parts: string[] = []; + for (const block of content) { + if (!block || typeof block !== "object") continue; + const text = (block as { text?: unknown }).text; + if (typeof text === "string" && text.trim()) { + parts.push(text); + } + } + return collapseWhitespace(parts.join("\n")); +} + +/** + * True only for pure heartbeat ACK payloads (e.g. "HEARTBEAT_OK"). + * Messages that include any extra text are not suppressed. + */ +export function isHeartbeatAckEvent(event: unknown): boolean { + const text = extractAssistantEventText(event); + if (!text) return false; + const stripped = stripHeartbeatToken(text, { mode: "message" }); + return stripped.shouldSkip && stripped.didStrip; +} + diff --git a/src/hub/hub.ts b/src/hub/hub.ts index a83e4289..3941f515 100644 --- a/src/hub/hub.ts +++ b/src/hub/hub.ts @@ -22,6 +22,9 @@ import { createListAgentsHandler } from "./rpc/handlers/list-agents.js"; import { createCreateAgentHandler } from "./rpc/handlers/create-agent.js"; import { createDeleteAgentHandler } from "./rpc/handlers/delete-agent.js"; import { createUpdateGatewayHandler } from "./rpc/handlers/update-gateway.js"; +import { createGetLastHeartbeatHandler } from "./rpc/handlers/get-last-heartbeat.js"; +import { createSetHeartbeatsHandler } from "./rpc/handlers/set-heartbeats.js"; +import { createWakeHeartbeatHandler } from "./rpc/handlers/wake-heartbeat.js"; import { DeviceStore, type DeviceMeta } from "./device-store.js"; import { createVerifyHandler } from "./rpc/handlers/verify.js"; import { ExecApprovalManager } from "./exec-approval-manager.js"; @@ -31,15 +34,33 @@ import { addAllowlistEntry, recordAllowlistUse, matchAllowlist } from "../agent/ import type { ExecApprovalCallback, ExecApprovalConfig, ApprovalResult, ExecApprovalRequest } from "../agent/tools/exec-approval-types.js"; import { readProfileConfig, writeProfileConfig } from "../agent/profile/storage.js"; import { getCronService, shutdownCronService, executeCronJob } from "../cron/index.js"; +import { + getLastHeartbeatEvent, + onHeartbeatEvent, + requestHeartbeatNow, + runHeartbeatOnce, + setHeartbeatsEnabled, + startHeartbeatRunner, + type HeartbeatEventPayload, + type HeartbeatRunResult, + type HeartbeatRunner, +} from "../heartbeat/index.js"; +import { enqueueSystemEvent } from "../heartbeat/system-events.js"; +import { isHeartbeatAckEvent } from "./heartbeat-filter.js"; export class Hub { private readonly agents = new Map(); private readonly agentSenders = new Map(); private readonly agentStreamIds = new Map(); private readonly agentStreamCounters = new Map(); + private readonly pendingAssistantStarts = new Map(); + private readonly suppressedStreamAgents = new Set(); private readonly localApprovalHandlers = new Map void>(); private readonly rpc: RpcDispatcher; private readonly approvalManager: ExecApprovalManager; + private readonly heartbeatListeners = new Set<(event: HeartbeatEventPayload) => void>(); + private heartbeatRunner: HeartbeatRunner | null = null; + private heartbeatUnsubscribe: (() => void) | null = null; private client: GatewayClient; readonly deviceStore: DeviceStore; private _onConfirmDevice: ((deviceId: string, agentId: string, meta?: DeviceMeta) => Promise) | null = null; @@ -77,6 +98,9 @@ export class Hub { this.rpc.register("createAgent", createCreateAgentHandler(this)); this.rpc.register("deleteAgent", createDeleteAgentHandler(this)); this.rpc.register("updateGateway", createUpdateGatewayHandler(this)); + this.rpc.register("last-heartbeat", createGetLastHeartbeatHandler(this)); + this.rpc.register("set-heartbeats", createSetHeartbeatsHandler(this)); + this.rpc.register("wake-heartbeat", createWakeHeartbeatHandler(this)); // Initialize exec approval manager this.approvalManager = new ExecApprovalManager((agentId, payload) => { @@ -103,6 +127,7 @@ export class Hub { // Initialize and start cron service this.initCronService(); + this.initHeartbeatService(); this.client = this.createClient(this.url); this.client.connect(); @@ -119,6 +144,32 @@ export class Hub { console.log("[Hub] Cron service initialized"); } + /** Initialize heartbeat runner + event fanout. */ + private initHeartbeatService(): void { + this.heartbeatRunner = startHeartbeatRunner({ + getAgent: () => this.getDefaultAgent(), + logger: console, + }); + + this.heartbeatUnsubscribe = onHeartbeatEvent((event) => { + for (const listener of this.heartbeatListeners) { + try { + listener(event); + } catch { + // Keep fanout resilient against listener errors. + } + } + }); + + console.log("[Hub] Heartbeat service initialized"); + } + + private getDefaultAgent(): AsyncAgent | null { + const first = this.listAgents()[0]; + if (!first) return null; + return this.getAgent(first) ?? null; + } + /** Restore agents from persistent storage */ private restoreAgents(): void { const records = loadAgentRecords(); @@ -279,6 +330,7 @@ export class Hub { // Internally consume agent output (AgentEvent stream + error Messages) void this.consumeAgent(agent); + this.heartbeatRunner?.updateConfig(); console.log(`Agent created: ${agent.sessionId}`); return agent; @@ -313,6 +365,14 @@ export class Hub { this.agentStreamIds.delete(agentId); } + private clearPendingAssistantStarts(agentId: string): void { + for (const [streamId, pending] of this.pendingAssistantStarts) { + if (pending.agentId === agentId) { + this.pendingAssistantStarts.delete(streamId); + } + } + } + /** Internally read agent output and send via Gateway */ private async consumeAgent(agent: AsyncAgent): Promise { for await (const item of agent.read()) { @@ -327,6 +387,20 @@ export class Hub { content: item.content, }); } else { + const suppressForAgent = this.suppressedStreamAgents.has(agent.sessionId); + + // Suppress all user-visible stream events during silent heartbeat runs. + if (suppressForAgent) { + if (item.type === "message_start") { + this.beginStream(agent.sessionId, item); + } else if (item.type === "message_end") { + const streamId = this.getActiveStreamId(agent.sessionId, item); + this.pendingAssistantStarts.delete(streamId); + this.endStream(agent.sessionId); + } + continue; + } + // Compaction events: forward with synthetic streamId (no stream tracking) const isCompactionEvent = item.type === "compaction_start" || item.type === "compaction_end"; @@ -348,18 +422,55 @@ export class Hub { || item.type === "tool_execution_end"; if (!shouldForward) continue; - if (item.type === "message_start") { - this.beginStream(agent.sessionId, item); + const isAssistantMessageEvent = + item.type === "message_start" || item.type === "message_update" || item.type === "message_end"; + + // Delay assistant message_start forwarding until we see content. + // This lets us suppress pure HEARTBEAT_OK acknowledgements end-to-end. + if (isAssistantMessageEvent && isAssistantMessage) { + if (item.type === "message_start") { + const streamId = this.beginStream(agent.sessionId, item); + this.pendingAssistantStarts.set(streamId, { agentId: agent.sessionId, event: item }); + continue; + } + + const streamId = this.getActiveStreamId(agent.sessionId, item); + const isHeartbeatAck = isHeartbeatAckEvent(item); + if (isHeartbeatAck) { + if (item.type === "message_end") { + this.pendingAssistantStarts.delete(streamId); + this.endStream(agent.sessionId); + } + continue; + } + + const pendingStart = this.pendingAssistantStarts.get(streamId); + if (pendingStart) { + this.client.send(targetDeviceId, StreamAction, { + streamId, + agentId: agent.sessionId, + event: pendingStart.event, + }); + this.pendingAssistantStarts.delete(streamId); + } + + this.client.send(targetDeviceId, StreamAction, { + streamId, + agentId: agent.sessionId, + event: item, + }); + if (item.type === "message_end") { + this.endStream(agent.sessionId); + } + continue; } + const streamId = this.getActiveStreamId(agent.sessionId, item); this.client.send(targetDeviceId, StreamAction, { streamId, agentId: agent.sessionId, event: item, }); - if (item.type === "message_end") { - this.endStream(agent.sessionId); - } } } } @@ -507,6 +618,63 @@ export class Hub { .map(([id]) => id); } + /** Subscribe heartbeat state updates. Returns unsubscribe callback. */ + onHeartbeatEvent(callback: (event: HeartbeatEventPayload) => void): () => void { + this.heartbeatListeners.add(callback); + return () => { + this.heartbeatListeners.delete(callback); + }; + } + + /** Get latest heartbeat event payload. */ + getLastHeartbeat(): HeartbeatEventPayload | null { + return getLastHeartbeatEvent(); + } + + /** Enable/disable heartbeat runner globally. */ + setHeartbeatsEnabled(enabled: boolean): void { + setHeartbeatsEnabled(enabled); + this.heartbeatRunner?.updateConfig(); + } + + /** Enqueue a heartbeat wake request. */ + requestHeartbeatNow(opts?: { reason?: string }): void { + requestHeartbeatNow(opts); + } + + /** Run heartbeat immediately using the current default agent. */ + async runHeartbeatOnce(opts?: { reason?: string }): Promise { + const agent = this.getDefaultAgent(); + const reason = opts?.reason; + const shouldSuppressStreams = reason === "manual"; + if (shouldSuppressStreams && agent) { + this.suppressedStreamAgents.add(agent.sessionId); + } + + try { + if (reason) { + return runHeartbeatOnce({ + agent, + reason, + }); + } + return runHeartbeatOnce({ + agent, + }); + } finally { + if (shouldSuppressStreams && agent) { + this.suppressedStreamAgents.delete(agent.sessionId); + } + } + } + + /** Enqueue a system event for a specific agent or the default agent. */ + enqueueSystemEvent(text: string, opts?: { agentId?: string }): void { + const agentId = opts?.agentId ?? this.listAgents()[0]; + if (!agentId) return; + enqueueSystemEvent(text, { sessionKey: agentId }); + } + closeAgent(id: string): boolean { const agent = this.agents.get(id); if (!agent) return false; @@ -516,14 +684,22 @@ export class Hub { this.agentSenders.delete(id); this.agentStreamIds.delete(id); this.agentStreamCounters.delete(id); + this.clearPendingAssistantStarts(id); + this.suppressedStreamAgents.delete(id); this.localApprovalHandlers.delete(id); removeAgentRecord(id); + this.heartbeatRunner?.updateConfig(); return true; } shutdown(): void { // Stop cron service shutdownCronService(); + this.heartbeatRunner?.stop(); + this.heartbeatRunner = null; + this.heartbeatUnsubscribe?.(); + this.heartbeatUnsubscribe = null; + this.heartbeatListeners.clear(); // Finalize subagent registry before closing agents shutdownSubagentRegistry(); @@ -534,6 +710,8 @@ export class Hub { this.agentSenders.delete(id); this.agentStreamIds.delete(id); this.agentStreamCounters.delete(id); + this.clearPendingAssistantStarts(id); + this.suppressedStreamAgents.delete(id); this.localApprovalHandlers.delete(id); } this.client.disconnect(); diff --git a/src/hub/rpc/handlers/get-last-heartbeat.ts b/src/hub/rpc/handlers/get-last-heartbeat.ts new file mode 100644 index 00000000..5c6def87 --- /dev/null +++ b/src/hub/rpc/handlers/get-last-heartbeat.ts @@ -0,0 +1,9 @@ +import type { RpcHandler } from "../dispatcher.js"; + +interface HubLike { + getLastHeartbeat(): unknown; +} + +export function createGetLastHeartbeatHandler(hub: HubLike): RpcHandler { + return () => hub.getLastHeartbeat(); +} diff --git a/src/hub/rpc/handlers/set-heartbeats.ts b/src/hub/rpc/handlers/set-heartbeats.ts new file mode 100644 index 00000000..1defd64d --- /dev/null +++ b/src/hub/rpc/handlers/set-heartbeats.ts @@ -0,0 +1,18 @@ +import type { RpcHandler } from "../dispatcher.js"; +import { RpcError } from "../dispatcher.js"; + +interface HubLike { + setHeartbeatsEnabled(enabled: boolean): void; +} + +export function createSetHeartbeatsHandler(hub: HubLike): RpcHandler { + return (params) => { + const enabled = (params as { enabled?: unknown } | undefined)?.enabled; + if (typeof enabled !== "boolean") { + throw new RpcError("INVALID_REQUEST", "enabled (boolean) is required"); + } + + hub.setHeartbeatsEnabled(enabled); + return { ok: true, enabled }; + }; +} diff --git a/src/hub/rpc/handlers/wake-heartbeat.ts b/src/hub/rpc/handlers/wake-heartbeat.ts new file mode 100644 index 00000000..cb3e9a65 --- /dev/null +++ b/src/hub/rpc/handlers/wake-heartbeat.ts @@ -0,0 +1,14 @@ +import type { RpcHandler } from "../dispatcher.js"; + +interface HubLike { + requestHeartbeatNow(opts?: { reason?: string }): void; +} + +export function createWakeHeartbeatHandler(hub: HubLike): RpcHandler { + return (params) => { + const reasonRaw = (params as { reason?: unknown } | undefined)?.reason; + const reason = typeof reasonRaw === "string" ? reasonRaw.trim() : ""; + hub.requestHeartbeatNow({ reason: reason || "manual" }); + return { ok: true }; + }; +} diff --git a/src/hub/rpc/index.ts b/src/hub/rpc/index.ts index 7bd88565..4560d976 100644 --- a/src/hub/rpc/index.ts +++ b/src/hub/rpc/index.ts @@ -5,3 +5,6 @@ export { createListAgentsHandler } from "./handlers/list-agents.js"; export { createCreateAgentHandler } from "./handlers/create-agent.js"; export { createDeleteAgentHandler } from "./handlers/delete-agent.js"; export { createUpdateGatewayHandler } from "./handlers/update-gateway.js"; +export { createGetLastHeartbeatHandler } from "./handlers/get-last-heartbeat.js"; +export { createSetHeartbeatsHandler } from "./handlers/set-heartbeats.js"; +export { createWakeHeartbeatHandler } from "./handlers/wake-heartbeat.js"; diff --git a/src/index.ts b/src/index.ts index 4ecfa6ca..961b5b46 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,3 +2,4 @@ export * from "./agent/index.js"; export * from "./gateway/index.js"; export * from "./client/index.js"; export * from "./shared/index.js"; +export * from "./heartbeat/index.js";