From 1e7d5f66ec862b1418cc843ca10001958f9bc3ca Mon Sep 17 00:00:00 2001 From: yushen Date: Mon, 2 Feb 2026 14:10:09 +0800 Subject: [PATCH] feat(hub): add RPC dispatcher with structured error handling Add RpcDispatcher and RpcError for method dispatch with typed error codes (METHOD_NOT_FOUND, INVALID_PARAMS, AGENT_NOT_FOUND). Implement getAgentMessages handler that checks session files on disk, supporting both active and closed agents. Wire up RPC request/response flow in Hub. Co-Authored-By: Claude Opus 4.5 --- src/hub/hub.ts | 49 +++++++++++++++++++++- src/hub/rpc/dispatcher.ts | 32 ++++++++++++++ src/hub/rpc/handlers/get-agent-messages.ts | 34 +++++++++++++++ src/hub/rpc/index.ts | 2 + 4 files changed, 115 insertions(+), 2 deletions(-) create mode 100644 src/hub/rpc/dispatcher.ts create mode 100644 src/hub/rpc/handlers/get-agent-messages.ts create mode 100644 src/hub/rpc/index.ts diff --git a/src/hub/hub.ts b/src/hub/hub.ts index 19a61b1b..5765cef1 100644 --- a/src/hub/hub.ts +++ b/src/hub/hub.ts @@ -1,12 +1,22 @@ -import type { HubOptions } from "./types.js"; -import { GatewayClient, type ConnectionState } from "@multica/sdk"; +import { + GatewayClient, + type ConnectionState, + RequestAction, + ResponseAction, + type RequestPayload, + type ResponseSuccessPayload, + type ResponseErrorPayload, +} from "@multica/sdk"; import { AsyncAgent } from "../agent/async-agent.js"; import { getHubId } from "./hub-identity.js"; import { loadAgentRecords, addAgentRecord, removeAgentRecord } from "./agent-store.js"; +import { RpcDispatcher, RpcError } from "./rpc/dispatcher.js"; +import { createGetAgentMessagesHandler } from "./rpc/handlers/get-agent-messages.js"; export class Hub { private readonly agents = new Map(); private readonly agentSenders = new Map(); + private readonly rpc: RpcDispatcher; private client: GatewayClient; url: string; readonly path: string; @@ -21,6 +31,10 @@ export class Hub { this.url = url; this.path = path ?? "/ws"; this.hubId = getHubId(); + + this.rpc = new RpcDispatcher(); + this.rpc.register("getAgentMessages", createGetAgentMessagesHandler()); + this.client = this.createClient(this.url); this.client.connect(); this.restoreAgents(); @@ -61,6 +75,15 @@ export class Hub { client.onMessage((msg) => { console.log(`[Hub] Received message: id=${msg.id} from=${msg.from} to=${msg.to} action=${msg.action} payload=${JSON.stringify(msg.payload)}`); + + // RPC request + if (msg.action === RequestAction) { + const payload = msg.payload as RequestPayload; + void this.handleRpc(msg.from, payload); + return; + } + + // Regular chat message const payload = msg.payload as { agentId?: string; content?: string } | undefined; const agentId = payload?.agentId; const content = payload?.content; @@ -131,6 +154,28 @@ export class Hub { } } + /** Handle RPC request and send response back via Gateway */ + private async handleRpc(from: string, request: RequestPayload): Promise { + const { requestId, method } = request; + try { + const result = await this.rpc.dispatch(method, request.params); + this.client.send(from, ResponseAction, { + requestId, + ok: true, + payload: result, + }); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + const code = err instanceof RpcError ? err.code : "RPC_ERROR"; + console.error(`[Hub] RPC error: method=${method} code=${code} error=${message}`); + this.client.send(from, ResponseAction, { + requestId, + ok: false, + error: { code, message }, + }); + } + } + getAgent(id: string): AsyncAgent | undefined { return this.agents.get(id); } diff --git a/src/hub/rpc/dispatcher.ts b/src/hub/rpc/dispatcher.ts new file mode 100644 index 00000000..1484568d --- /dev/null +++ b/src/hub/rpc/dispatcher.ts @@ -0,0 +1,32 @@ +export type RpcHandler = (params: unknown) => unknown | Promise; + +export class RpcError extends Error { + constructor( + public readonly code: string, + message: string, + ) { + super(message); + this.name = "RpcError"; + } +} + +export class RpcDispatcher { + private readonly handlers = new Map(); + + /** Register an RPC method handler */ + register(method: string, handler: RpcHandler): void { + if (this.handlers.has(method)) { + throw new Error(`RPC method already registered: ${method}`); + } + this.handlers.set(method, handler); + } + + /** Dispatch an RPC request to its handler */ + async dispatch(method: string, params: unknown): Promise { + const handler = this.handlers.get(method); + if (!handler) { + throw new RpcError("METHOD_NOT_FOUND", `Unknown RPC method: ${method}`); + } + return handler(params); + } +} diff --git a/src/hub/rpc/handlers/get-agent-messages.ts b/src/hub/rpc/handlers/get-agent-messages.ts new file mode 100644 index 00000000..15060fff --- /dev/null +++ b/src/hub/rpc/handlers/get-agent-messages.ts @@ -0,0 +1,34 @@ +import { existsSync } from "fs"; +import { SessionManager } from "../../../agent/session/session-manager.js"; +import { resolveSessionPath } from "../../../agent/session/storage.js"; +import { RpcError, type RpcHandler } from "../dispatcher.js"; + +interface GetAgentMessagesParams { + agentId: string; + offset?: number; + limit?: number; +} + +export function createGetAgentMessagesHandler(): RpcHandler { + return (params: unknown) => { + if (!params || typeof params !== "object") { + throw new RpcError("INVALID_PARAMS", "params must be an object"); + } + const { agentId, offset = 0, limit = 50 } = params as GetAgentMessagesParams; + if (!agentId) { + throw new RpcError("INVALID_PARAMS", "Missing required param: agentId"); + } + + const sessionPath = resolveSessionPath(agentId); + if (!existsSync(sessionPath)) { + throw new RpcError("AGENT_NOT_FOUND", `No session found for agent: ${agentId}`); + } + + const session = new SessionManager({ sessionId: agentId }); + const allMessages = session.loadMessages(); + const total = allMessages.length; + const sliced = allMessages.slice(offset, offset + limit); + + return { messages: sliced, total, offset, limit }; + }; +} diff --git a/src/hub/rpc/index.ts b/src/hub/rpc/index.ts new file mode 100644 index 00000000..c482999a --- /dev/null +++ b/src/hub/rpc/index.ts @@ -0,0 +1,2 @@ +export { RpcDispatcher, RpcError, type RpcHandler } from "./dispatcher.js"; +export { createGetAgentMessagesHandler } from "./handlers/get-agent-messages.js";