From 8c87c502817bd3f4d91a28136fd8d1bd7415bba8 Mon Sep 17 00:00:00 2001 From: Jiang Bohan Date: Fri, 6 Feb 2026 16:42:14 +0800 Subject: [PATCH] feat(hub): wire heartbeat controls into hub and cron wake mode --- src/cron/execute.ts | 42 +++++---- src/hub/hub.ts | 99 ++++++++++++++++++++++ src/hub/rpc/handlers/get-last-heartbeat.ts | 9 ++ src/hub/rpc/handlers/set-heartbeats.ts | 18 ++++ src/hub/rpc/handlers/wake-heartbeat.ts | 14 +++ src/hub/rpc/index.ts | 3 + 6 files changed, 168 insertions(+), 17 deletions(-) create mode 100644 src/hub/rpc/handlers/get-last-heartbeat.ts create mode 100644 src/hub/rpc/handlers/set-heartbeats.ts create mode 100644 src/hub/rpc/handlers/wake-heartbeat.ts diff --git a/src/cron/execute.ts b/src/cron/execute.ts index 23b1526e..3511b544 100644 --- a/src/cron/execute.ts +++ b/src/cron/execute.ts @@ -44,6 +44,10 @@ async function executeSystemEvent(job: CronJob): Promise { const hub = getHub(); const payload = job.payload as { kind: "system-event"; text: string }; + const text = payload.text.trim(); + if (!text) { + return { error: "system-event payload requires non-empty text" }; + } // Get the list of active agents const agentIds = hub.listAgents(); @@ -54,25 +58,29 @@ async function executeSystemEvent(job: CronJob): Promise { // For now, inject into the first (main) agent // TODO: Support targeting specific agent by ID const agentId = agentIds[0]!; - const agent = hub.getAgent(agentId); - if (!agent || agent.closed) { - return { error: `Agent ${agentId} not found or closed` }; + const cronMessage = `[CRON] ${job.name}: ${text}`; + + hub.enqueueSystemEvent(cronMessage, { agentId }); + + if (job.wakeMode === "now") { + const result = await hub.runHeartbeatOnce({ reason: `cron:${job.id}` }); + if (result.status === "failed") { + return { error: result.reason }; + } + if (result.status === "skipped") { + return { + summary: `Enqueued cron event for agent ${agentId.slice(0, 8)} (wake skipped: ${result.reason})`, + }; + } + return { + summary: `Enqueued cron event and triggered immediate heartbeat for agent ${agentId.slice(0, 8)}`, + }; } - // Format the cron message with metadata - const cronMessage = `[CRON] ${job.name}: ${payload.text}`; - - try { - // Write to agent (non-blocking, will be processed in queue) - agent.write(cronMessage); - - // Wait for the agent to process the message - await agent.waitForIdle(); - - return { summary: `Injected message into agent ${agentId.slice(0, 8)}` }; - } catch (err) { - return { error: err instanceof Error ? err.message : String(err) }; - } + hub.requestHeartbeatNow({ reason: `cron:${job.id}` }); + return { + summary: `Enqueued cron event for agent ${agentId.slice(0, 8)} (wakeMode: next-heartbeat)`, + }; } /** diff --git a/src/hub/hub.ts b/src/hub/hub.ts index a83e4289..786476c8 100644 --- a/src/hub/hub.ts +++ b/src/hub/hub.ts @@ -22,6 +22,9 @@ import { createListAgentsHandler } from "./rpc/handlers/list-agents.js"; import { createCreateAgentHandler } from "./rpc/handlers/create-agent.js"; import { createDeleteAgentHandler } from "./rpc/handlers/delete-agent.js"; import { createUpdateGatewayHandler } from "./rpc/handlers/update-gateway.js"; +import { createGetLastHeartbeatHandler } from "./rpc/handlers/get-last-heartbeat.js"; +import { createSetHeartbeatsHandler } from "./rpc/handlers/set-heartbeats.js"; +import { createWakeHeartbeatHandler } from "./rpc/handlers/wake-heartbeat.js"; import { DeviceStore, type DeviceMeta } from "./device-store.js"; import { createVerifyHandler } from "./rpc/handlers/verify.js"; import { ExecApprovalManager } from "./exec-approval-manager.js"; @@ -31,6 +34,18 @@ import { addAllowlistEntry, recordAllowlistUse, matchAllowlist } from "../agent/ import type { ExecApprovalCallback, ExecApprovalConfig, ApprovalResult, ExecApprovalRequest } from "../agent/tools/exec-approval-types.js"; import { readProfileConfig, writeProfileConfig } from "../agent/profile/storage.js"; import { getCronService, shutdownCronService, executeCronJob } from "../cron/index.js"; +import { + getLastHeartbeatEvent, + onHeartbeatEvent, + requestHeartbeatNow, + runHeartbeatOnce, + setHeartbeatsEnabled, + startHeartbeatRunner, + type HeartbeatEventPayload, + type HeartbeatRunResult, + type HeartbeatRunner, +} from "../heartbeat/index.js"; +import { enqueueSystemEvent } from "../heartbeat/system-events.js"; export class Hub { private readonly agents = new Map(); @@ -40,6 +55,9 @@ export class Hub { private readonly localApprovalHandlers = new Map void>(); private readonly rpc: RpcDispatcher; private readonly approvalManager: ExecApprovalManager; + private readonly heartbeatListeners = new Set<(event: HeartbeatEventPayload) => void>(); + private heartbeatRunner: HeartbeatRunner | null = null; + private heartbeatUnsubscribe: (() => void) | null = null; private client: GatewayClient; readonly deviceStore: DeviceStore; private _onConfirmDevice: ((deviceId: string, agentId: string, meta?: DeviceMeta) => Promise) | null = null; @@ -77,6 +95,9 @@ export class Hub { this.rpc.register("createAgent", createCreateAgentHandler(this)); this.rpc.register("deleteAgent", createDeleteAgentHandler(this)); this.rpc.register("updateGateway", createUpdateGatewayHandler(this)); + this.rpc.register("last-heartbeat", createGetLastHeartbeatHandler(this)); + this.rpc.register("set-heartbeats", createSetHeartbeatsHandler(this)); + this.rpc.register("wake-heartbeat", createWakeHeartbeatHandler(this)); // Initialize exec approval manager this.approvalManager = new ExecApprovalManager((agentId, payload) => { @@ -103,6 +124,7 @@ export class Hub { // Initialize and start cron service this.initCronService(); + this.initHeartbeatService(); this.client = this.createClient(this.url); this.client.connect(); @@ -119,6 +141,32 @@ export class Hub { console.log("[Hub] Cron service initialized"); } + /** Initialize heartbeat runner + event fanout. */ + private initHeartbeatService(): void { + this.heartbeatRunner = startHeartbeatRunner({ + getAgent: () => this.getDefaultAgent(), + logger: console, + }); + + this.heartbeatUnsubscribe = onHeartbeatEvent((event) => { + for (const listener of this.heartbeatListeners) { + try { + listener(event); + } catch { + // Keep fanout resilient against listener errors. + } + } + }); + + console.log("[Hub] Heartbeat service initialized"); + } + + private getDefaultAgent(): AsyncAgent | null { + const first = this.listAgents()[0]; + if (!first) return null; + return this.getAgent(first) ?? null; + } + /** Restore agents from persistent storage */ private restoreAgents(): void { const records = loadAgentRecords(); @@ -279,6 +327,7 @@ export class Hub { // Internally consume agent output (AgentEvent stream + error Messages) void this.consumeAgent(agent); + this.heartbeatRunner?.updateConfig(); console.log(`Agent created: ${agent.sessionId}`); return agent; @@ -507,6 +556,50 @@ export class Hub { .map(([id]) => id); } + /** Subscribe heartbeat state updates. Returns unsubscribe callback. */ + onHeartbeatEvent(callback: (event: HeartbeatEventPayload) => void): () => void { + this.heartbeatListeners.add(callback); + return () => { + this.heartbeatListeners.delete(callback); + }; + } + + /** Get latest heartbeat event payload. */ + getLastHeartbeat(): HeartbeatEventPayload | null { + return getLastHeartbeatEvent(); + } + + /** Enable/disable heartbeat runner globally. */ + setHeartbeatsEnabled(enabled: boolean): void { + setHeartbeatsEnabled(enabled); + this.heartbeatRunner?.updateConfig(); + } + + /** Enqueue a heartbeat wake request. */ + requestHeartbeatNow(opts?: { reason?: string }): void { + requestHeartbeatNow(opts); + } + + /** Run heartbeat immediately using the current default agent. */ + async runHeartbeatOnce(opts?: { reason?: string }): Promise { + if (opts?.reason) { + return runHeartbeatOnce({ + agent: this.getDefaultAgent(), + reason: opts.reason, + }); + } + return runHeartbeatOnce({ + agent: this.getDefaultAgent(), + }); + } + + /** Enqueue a system event for a specific agent or the default agent. */ + enqueueSystemEvent(text: string, opts?: { agentId?: string }): void { + const agentId = opts?.agentId ?? this.listAgents()[0]; + if (!agentId) return; + enqueueSystemEvent(text, { sessionKey: agentId }); + } + closeAgent(id: string): boolean { const agent = this.agents.get(id); if (!agent) return false; @@ -518,12 +611,18 @@ export class Hub { this.agentStreamCounters.delete(id); this.localApprovalHandlers.delete(id); removeAgentRecord(id); + this.heartbeatRunner?.updateConfig(); return true; } shutdown(): void { // Stop cron service shutdownCronService(); + this.heartbeatRunner?.stop(); + this.heartbeatRunner = null; + this.heartbeatUnsubscribe?.(); + this.heartbeatUnsubscribe = null; + this.heartbeatListeners.clear(); // Finalize subagent registry before closing agents shutdownSubagentRegistry(); diff --git a/src/hub/rpc/handlers/get-last-heartbeat.ts b/src/hub/rpc/handlers/get-last-heartbeat.ts new file mode 100644 index 00000000..5c6def87 --- /dev/null +++ b/src/hub/rpc/handlers/get-last-heartbeat.ts @@ -0,0 +1,9 @@ +import type { RpcHandler } from "../dispatcher.js"; + +interface HubLike { + getLastHeartbeat(): unknown; +} + +export function createGetLastHeartbeatHandler(hub: HubLike): RpcHandler { + return () => hub.getLastHeartbeat(); +} diff --git a/src/hub/rpc/handlers/set-heartbeats.ts b/src/hub/rpc/handlers/set-heartbeats.ts new file mode 100644 index 00000000..1defd64d --- /dev/null +++ b/src/hub/rpc/handlers/set-heartbeats.ts @@ -0,0 +1,18 @@ +import type { RpcHandler } from "../dispatcher.js"; +import { RpcError } from "../dispatcher.js"; + +interface HubLike { + setHeartbeatsEnabled(enabled: boolean): void; +} + +export function createSetHeartbeatsHandler(hub: HubLike): RpcHandler { + return (params) => { + const enabled = (params as { enabled?: unknown } | undefined)?.enabled; + if (typeof enabled !== "boolean") { + throw new RpcError("INVALID_REQUEST", "enabled (boolean) is required"); + } + + hub.setHeartbeatsEnabled(enabled); + return { ok: true, enabled }; + }; +} diff --git a/src/hub/rpc/handlers/wake-heartbeat.ts b/src/hub/rpc/handlers/wake-heartbeat.ts new file mode 100644 index 00000000..cb3e9a65 --- /dev/null +++ b/src/hub/rpc/handlers/wake-heartbeat.ts @@ -0,0 +1,14 @@ +import type { RpcHandler } from "../dispatcher.js"; + +interface HubLike { + requestHeartbeatNow(opts?: { reason?: string }): void; +} + +export function createWakeHeartbeatHandler(hub: HubLike): RpcHandler { + return (params) => { + const reasonRaw = (params as { reason?: unknown } | undefined)?.reason; + const reason = typeof reasonRaw === "string" ? reasonRaw.trim() : ""; + hub.requestHeartbeatNow({ reason: reason || "manual" }); + return { ok: true }; + }; +} diff --git a/src/hub/rpc/index.ts b/src/hub/rpc/index.ts index 7bd88565..4560d976 100644 --- a/src/hub/rpc/index.ts +++ b/src/hub/rpc/index.ts @@ -5,3 +5,6 @@ export { createListAgentsHandler } from "./handlers/list-agents.js"; export { createCreateAgentHandler } from "./handlers/create-agent.js"; export { createDeleteAgentHandler } from "./handlers/delete-agent.js"; export { createUpdateGatewayHandler } from "./handlers/update-gateway.js"; +export { createGetLastHeartbeatHandler } from "./handlers/get-last-heartbeat.js"; +export { createSetHeartbeatsHandler } from "./handlers/set-heartbeats.js"; +export { createWakeHeartbeatHandler } from "./handlers/wake-heartbeat.js";