feat(hub): wire heartbeat controls into hub and cron wake mode

This commit is contained in:
Jiang Bohan 2026-02-06 16:42:14 +08:00
parent ffa5355a95
commit 8c87c50281
6 changed files with 168 additions and 17 deletions

View file

@ -44,6 +44,10 @@ async function executeSystemEvent(job: CronJob): Promise<ExecutionResult> {
const hub = getHub();
const payload = job.payload as { kind: "system-event"; text: string };
const text = payload.text.trim();
if (!text) {
return { error: "system-event payload requires non-empty text" };
}
// Get the list of active agents
const agentIds = hub.listAgents();
@ -54,25 +58,29 @@ async function executeSystemEvent(job: CronJob): Promise<ExecutionResult> {
// For now, inject into the first (main) agent
// TODO: Support targeting specific agent by ID
const agentId = agentIds[0]!;
const agent = hub.getAgent(agentId);
if (!agent || agent.closed) {
return { error: `Agent ${agentId} not found or closed` };
const cronMessage = `[CRON] ${job.name}: ${text}`;
hub.enqueueSystemEvent(cronMessage, { agentId });
if (job.wakeMode === "now") {
const result = await hub.runHeartbeatOnce({ reason: `cron:${job.id}` });
if (result.status === "failed") {
return { error: result.reason };
}
if (result.status === "skipped") {
return {
summary: `Enqueued cron event for agent ${agentId.slice(0, 8)} (wake skipped: ${result.reason})`,
};
}
return {
summary: `Enqueued cron event and triggered immediate heartbeat for agent ${agentId.slice(0, 8)}`,
};
}
// Format the cron message with metadata
const cronMessage = `[CRON] ${job.name}: ${payload.text}`;
try {
// Write to agent (non-blocking, will be processed in queue)
agent.write(cronMessage);
// Wait for the agent to process the message
await agent.waitForIdle();
return { summary: `Injected message into agent ${agentId.slice(0, 8)}` };
} catch (err) {
return { error: err instanceof Error ? err.message : String(err) };
}
hub.requestHeartbeatNow({ reason: `cron:${job.id}` });
return {
summary: `Enqueued cron event for agent ${agentId.slice(0, 8)} (wakeMode: next-heartbeat)`,
};
}
/**

View file

@ -22,6 +22,9 @@ import { createListAgentsHandler } from "./rpc/handlers/list-agents.js";
import { createCreateAgentHandler } from "./rpc/handlers/create-agent.js";
import { createDeleteAgentHandler } from "./rpc/handlers/delete-agent.js";
import { createUpdateGatewayHandler } from "./rpc/handlers/update-gateway.js";
import { createGetLastHeartbeatHandler } from "./rpc/handlers/get-last-heartbeat.js";
import { createSetHeartbeatsHandler } from "./rpc/handlers/set-heartbeats.js";
import { createWakeHeartbeatHandler } from "./rpc/handlers/wake-heartbeat.js";
import { DeviceStore, type DeviceMeta } from "./device-store.js";
import { createVerifyHandler } from "./rpc/handlers/verify.js";
import { ExecApprovalManager } from "./exec-approval-manager.js";
@ -31,6 +34,18 @@ import { addAllowlistEntry, recordAllowlistUse, matchAllowlist } from "../agent/
import type { ExecApprovalCallback, ExecApprovalConfig, ApprovalResult, ExecApprovalRequest } from "../agent/tools/exec-approval-types.js";
import { readProfileConfig, writeProfileConfig } from "../agent/profile/storage.js";
import { getCronService, shutdownCronService, executeCronJob } from "../cron/index.js";
import {
getLastHeartbeatEvent,
onHeartbeatEvent,
requestHeartbeatNow,
runHeartbeatOnce,
setHeartbeatsEnabled,
startHeartbeatRunner,
type HeartbeatEventPayload,
type HeartbeatRunResult,
type HeartbeatRunner,
} from "../heartbeat/index.js";
import { enqueueSystemEvent } from "../heartbeat/system-events.js";
export class Hub {
private readonly agents = new Map<string, AsyncAgent>();
@ -40,6 +55,9 @@ export class Hub {
private readonly localApprovalHandlers = new Map<string, (payload: ExecApprovalRequest) => void>();
private readonly rpc: RpcDispatcher;
private readonly approvalManager: ExecApprovalManager;
private readonly heartbeatListeners = new Set<(event: HeartbeatEventPayload) => void>();
private heartbeatRunner: HeartbeatRunner | null = null;
private heartbeatUnsubscribe: (() => void) | null = null;
private client: GatewayClient;
readonly deviceStore: DeviceStore;
private _onConfirmDevice: ((deviceId: string, agentId: string, meta?: DeviceMeta) => Promise<boolean>) | null = null;
@ -77,6 +95,9 @@ export class Hub {
this.rpc.register("createAgent", createCreateAgentHandler(this));
this.rpc.register("deleteAgent", createDeleteAgentHandler(this));
this.rpc.register("updateGateway", createUpdateGatewayHandler(this));
this.rpc.register("last-heartbeat", createGetLastHeartbeatHandler(this));
this.rpc.register("set-heartbeats", createSetHeartbeatsHandler(this));
this.rpc.register("wake-heartbeat", createWakeHeartbeatHandler(this));
// Initialize exec approval manager
this.approvalManager = new ExecApprovalManager((agentId, payload) => {
@ -103,6 +124,7 @@ export class Hub {
// Initialize and start cron service
this.initCronService();
this.initHeartbeatService();
this.client = this.createClient(this.url);
this.client.connect();
@ -119,6 +141,32 @@ export class Hub {
console.log("[Hub] Cron service initialized");
}
/** Initialize heartbeat runner + event fanout. */
private initHeartbeatService(): void {
this.heartbeatRunner = startHeartbeatRunner({
getAgent: () => this.getDefaultAgent(),
logger: console,
});
this.heartbeatUnsubscribe = onHeartbeatEvent((event) => {
for (const listener of this.heartbeatListeners) {
try {
listener(event);
} catch {
// Keep fanout resilient against listener errors.
}
}
});
console.log("[Hub] Heartbeat service initialized");
}
private getDefaultAgent(): AsyncAgent | null {
const first = this.listAgents()[0];
if (!first) return null;
return this.getAgent(first) ?? null;
}
/** Restore agents from persistent storage */
private restoreAgents(): void {
const records = loadAgentRecords();
@ -279,6 +327,7 @@ export class Hub {
// Internally consume agent output (AgentEvent stream + error Messages)
void this.consumeAgent(agent);
this.heartbeatRunner?.updateConfig();
console.log(`Agent created: ${agent.sessionId}`);
return agent;
@ -507,6 +556,50 @@ export class Hub {
.map(([id]) => id);
}
/** Subscribe heartbeat state updates. Returns unsubscribe callback. */
onHeartbeatEvent(callback: (event: HeartbeatEventPayload) => void): () => void {
this.heartbeatListeners.add(callback);
return () => {
this.heartbeatListeners.delete(callback);
};
}
/** Get latest heartbeat event payload. */
getLastHeartbeat(): HeartbeatEventPayload | null {
return getLastHeartbeatEvent();
}
/** Enable/disable heartbeat runner globally. */
setHeartbeatsEnabled(enabled: boolean): void {
setHeartbeatsEnabled(enabled);
this.heartbeatRunner?.updateConfig();
}
/** Enqueue a heartbeat wake request. */
requestHeartbeatNow(opts?: { reason?: string }): void {
requestHeartbeatNow(opts);
}
/** Run heartbeat immediately using the current default agent. */
async runHeartbeatOnce(opts?: { reason?: string }): Promise<HeartbeatRunResult> {
if (opts?.reason) {
return runHeartbeatOnce({
agent: this.getDefaultAgent(),
reason: opts.reason,
});
}
return runHeartbeatOnce({
agent: this.getDefaultAgent(),
});
}
/** Enqueue a system event for a specific agent or the default agent. */
enqueueSystemEvent(text: string, opts?: { agentId?: string }): void {
const agentId = opts?.agentId ?? this.listAgents()[0];
if (!agentId) return;
enqueueSystemEvent(text, { sessionKey: agentId });
}
closeAgent(id: string): boolean {
const agent = this.agents.get(id);
if (!agent) return false;
@ -518,12 +611,18 @@ export class Hub {
this.agentStreamCounters.delete(id);
this.localApprovalHandlers.delete(id);
removeAgentRecord(id);
this.heartbeatRunner?.updateConfig();
return true;
}
shutdown(): void {
// Stop cron service
shutdownCronService();
this.heartbeatRunner?.stop();
this.heartbeatRunner = null;
this.heartbeatUnsubscribe?.();
this.heartbeatUnsubscribe = null;
this.heartbeatListeners.clear();
// Finalize subagent registry before closing agents
shutdownSubagentRegistry();

View file

@ -0,0 +1,9 @@
import type { RpcHandler } from "../dispatcher.js";
interface HubLike {
getLastHeartbeat(): unknown;
}
export function createGetLastHeartbeatHandler(hub: HubLike): RpcHandler {
return () => hub.getLastHeartbeat();
}

View file

@ -0,0 +1,18 @@
import type { RpcHandler } from "../dispatcher.js";
import { RpcError } from "../dispatcher.js";
interface HubLike {
setHeartbeatsEnabled(enabled: boolean): void;
}
export function createSetHeartbeatsHandler(hub: HubLike): RpcHandler {
return (params) => {
const enabled = (params as { enabled?: unknown } | undefined)?.enabled;
if (typeof enabled !== "boolean") {
throw new RpcError("INVALID_REQUEST", "enabled (boolean) is required");
}
hub.setHeartbeatsEnabled(enabled);
return { ok: true, enabled };
};
}

View file

@ -0,0 +1,14 @@
import type { RpcHandler } from "../dispatcher.js";
interface HubLike {
requestHeartbeatNow(opts?: { reason?: string }): void;
}
export function createWakeHeartbeatHandler(hub: HubLike): RpcHandler {
return (params) => {
const reasonRaw = (params as { reason?: unknown } | undefined)?.reason;
const reason = typeof reasonRaw === "string" ? reasonRaw.trim() : "";
hub.requestHeartbeatNow({ reason: reason || "manual" });
return { ok: true };
};
}

View file

@ -5,3 +5,6 @@ export { createListAgentsHandler } from "./handlers/list-agents.js";
export { createCreateAgentHandler } from "./handlers/create-agent.js";
export { createDeleteAgentHandler } from "./handlers/delete-agent.js";
export { createUpdateGatewayHandler } from "./handlers/update-gateway.js";
export { createGetLastHeartbeatHandler } from "./handlers/get-last-heartbeat.js";
export { createSetHeartbeatsHandler } from "./handlers/set-heartbeats.js";
export { createWakeHeartbeatHandler } from "./handlers/wake-heartbeat.js";