feat(hub): add RPC dispatcher with structured error handling

Add RpcDispatcher and RpcError for method dispatch with typed error
codes (METHOD_NOT_FOUND, INVALID_PARAMS, AGENT_NOT_FOUND). Implement
getAgentMessages handler that checks session files on disk, supporting
both active and closed agents. Wire up RPC request/response flow in Hub.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
yushen 2026-02-02 14:10:09 +08:00
parent 2541745152
commit 1e7d5f66ec
4 changed files with 115 additions and 2 deletions

View file

@ -1,12 +1,22 @@
import type { HubOptions } from "./types.js";
import { GatewayClient, type ConnectionState } from "@multica/sdk";
import {
GatewayClient,
type ConnectionState,
RequestAction,
ResponseAction,
type RequestPayload,
type ResponseSuccessPayload,
type ResponseErrorPayload,
} from "@multica/sdk";
import { AsyncAgent } from "../agent/async-agent.js";
import { getHubId } from "./hub-identity.js";
import { loadAgentRecords, addAgentRecord, removeAgentRecord } from "./agent-store.js";
import { RpcDispatcher, RpcError } from "./rpc/dispatcher.js";
import { createGetAgentMessagesHandler } from "./rpc/handlers/get-agent-messages.js";
export class Hub {
private readonly agents = new Map<string, AsyncAgent>();
private readonly agentSenders = new Map<string, string>();
private readonly rpc: RpcDispatcher;
private client: GatewayClient;
url: string;
readonly path: string;
@ -21,6 +31,10 @@ export class Hub {
this.url = url;
this.path = path ?? "/ws";
this.hubId = getHubId();
this.rpc = new RpcDispatcher();
this.rpc.register("getAgentMessages", createGetAgentMessagesHandler());
this.client = this.createClient(this.url);
this.client.connect();
this.restoreAgents();
@ -61,6 +75,15 @@ export class Hub {
client.onMessage((msg) => {
console.log(`[Hub] Received message: id=${msg.id} from=${msg.from} to=${msg.to} action=${msg.action} payload=${JSON.stringify(msg.payload)}`);
// RPC request
if (msg.action === RequestAction) {
const payload = msg.payload as RequestPayload;
void this.handleRpc(msg.from, payload);
return;
}
// Regular chat message
const payload = msg.payload as { agentId?: string; content?: string } | undefined;
const agentId = payload?.agentId;
const content = payload?.content;
@ -131,6 +154,28 @@ export class Hub {
}
}
/** Handle RPC request and send response back via Gateway */
private async handleRpc(from: string, request: RequestPayload): Promise<void> {
const { requestId, method } = request;
try {
const result = await this.rpc.dispatch(method, request.params);
this.client.send<ResponseSuccessPayload>(from, ResponseAction, {
requestId,
ok: true,
payload: result,
});
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
const code = err instanceof RpcError ? err.code : "RPC_ERROR";
console.error(`[Hub] RPC error: method=${method} code=${code} error=${message}`);
this.client.send<ResponseErrorPayload>(from, ResponseAction, {
requestId,
ok: false,
error: { code, message },
});
}
}
getAgent(id: string): AsyncAgent | undefined {
return this.agents.get(id);
}

32
src/hub/rpc/dispatcher.ts Normal file
View file

@ -0,0 +1,32 @@
export type RpcHandler = (params: unknown) => unknown | Promise<unknown>;
export class RpcError extends Error {
constructor(
public readonly code: string,
message: string,
) {
super(message);
this.name = "RpcError";
}
}
export class RpcDispatcher {
private readonly handlers = new Map<string, RpcHandler>();
/** Register an RPC method handler */
register(method: string, handler: RpcHandler): void {
if (this.handlers.has(method)) {
throw new Error(`RPC method already registered: ${method}`);
}
this.handlers.set(method, handler);
}
/** Dispatch an RPC request to its handler */
async dispatch(method: string, params: unknown): Promise<unknown> {
const handler = this.handlers.get(method);
if (!handler) {
throw new RpcError("METHOD_NOT_FOUND", `Unknown RPC method: ${method}`);
}
return handler(params);
}
}

View file

@ -0,0 +1,34 @@
import { existsSync } from "fs";
import { SessionManager } from "../../../agent/session/session-manager.js";
import { resolveSessionPath } from "../../../agent/session/storage.js";
import { RpcError, type RpcHandler } from "../dispatcher.js";
interface GetAgentMessagesParams {
agentId: string;
offset?: number;
limit?: number;
}
export function createGetAgentMessagesHandler(): RpcHandler {
return (params: unknown) => {
if (!params || typeof params !== "object") {
throw new RpcError("INVALID_PARAMS", "params must be an object");
}
const { agentId, offset = 0, limit = 50 } = params as GetAgentMessagesParams;
if (!agentId) {
throw new RpcError("INVALID_PARAMS", "Missing required param: agentId");
}
const sessionPath = resolveSessionPath(agentId);
if (!existsSync(sessionPath)) {
throw new RpcError("AGENT_NOT_FOUND", `No session found for agent: ${agentId}`);
}
const session = new SessionManager({ sessionId: agentId });
const allMessages = session.loadMessages();
const total = allMessages.length;
const sliced = allMessages.slice(offset, offset + limit);
return { messages: sliced, total, offset, limit };
};
}

2
src/hub/rpc/index.ts Normal file
View file

@ -0,0 +1,2 @@
export { RpcDispatcher, RpcError, type RpcHandler } from "./dispatcher.js";
export { createGetAgentMessagesHandler } from "./handlers/get-agent-messages.js";