From 254174515257ab9ac19c483fa6b30f16f406d3fb Mon Sep 17 00:00:00 2001 From: yushen Date: Mon, 2 Feb 2026 14:10:00 +0800 Subject: [PATCH 1/3] feat(sdk): add RPC request method and types to GatewayClient Add request() method that handles full request/response lifecycle with auto-generated requestId, timeout handling, and pending request cleanup on disconnect. Also add GetAgentMessagesParams and GetAgentMessagesResult type definitions. Co-Authored-By: Claude Opus 4.5 --- packages/sdk/src/actions/index.ts | 2 + packages/sdk/src/actions/rpc.ts | 19 ++++++++++ packages/sdk/src/client.ts | 61 +++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+) diff --git a/packages/sdk/src/actions/index.ts b/packages/sdk/src/actions/index.ts index 8b49ef75..d60637ee 100644 --- a/packages/sdk/src/actions/index.ts +++ b/packages/sdk/src/actions/index.ts @@ -14,6 +14,8 @@ export { type ResponseErrorPayload, isResponseSuccess, isResponseError, + type GetAgentMessagesParams, + type GetAgentMessagesResult, } from "./rpc.js"; export { StreamAction, type StreamPayload } from "./stream.js"; diff --git a/packages/sdk/src/actions/rpc.ts b/packages/sdk/src/actions/rpc.ts index 15344de2..0cdc3200 100644 --- a/packages/sdk/src/actions/rpc.ts +++ b/packages/sdk/src/actions/rpc.ts @@ -5,6 +5,8 @@ export const ResponseAction = "response" as const; /** 请求帧 payload */ export interface RequestPayload { + /** 请求 ID,由客户端生成,服务端原样回传到 ResponsePayload.requestId */ + requestId: string; /** 调用的方法名 */ method: string; /** 方法参数 */ @@ -53,3 +55,20 @@ export function isResponseError( ): response is ResponseErrorPayload { return response.ok === false; } + +// ============ RPC Method Types ============ + +/** getAgentMessages - request params */ +export interface GetAgentMessagesParams { + agentId: string; + offset?: number; + limit?: number; +} + +/** getAgentMessages - response payload */ +export interface GetAgentMessagesResult { + messages: unknown[]; + total: number; + offset: number; + limit: number; +} diff --git a/packages/sdk/src/client.ts b/packages/sdk/src/client.ts index 4a126a2d..05849715 100644 --- a/packages/sdk/src/client.ts +++ b/packages/sdk/src/client.ts @@ -11,6 +11,19 @@ import type { DeviceType, } from "./types.js"; import { GatewayEvents } from "./types.js"; +import { + RequestAction, + ResponseAction, + type RequestPayload, + type ResponsePayload, + isResponseSuccess, +} from "./actions/rpc.js"; + +interface PendingRequest { + resolve: (value: T) => void; + reject: (reason: Error) => void; + timer: ReturnType; +} interface ResolvedOptions { url: string; @@ -26,6 +39,7 @@ export class GatewayClient { private options: ResolvedOptions; private callbacks: GatewayClientCallbacks = {}; private _state: ConnectionState = "disconnected"; + private pendingRequests = new Map(); constructor(options: GatewayClientOptions) { if (!options.deviceId) { @@ -149,6 +163,32 @@ export class GatewayClient { }); } + /** Send an RPC request and wait for the response */ + request( + to: string, + method: string, + params?: unknown, + timeout = 10_000, + ): Promise { + return new Promise((resolve, reject) => { + if (!this.socket || !this.isRegistered) { + reject(new Error("Not registered")); + return; + } + + const requestId = this.generateMessageId(); + const timer = setTimeout(() => { + this.pendingRequests.delete(requestId); + reject(new Error(`RPC request timed out: ${method}`)); + }, timeout); + + this.pendingRequests.set(requestId, { resolve: resolve as (v: unknown) => void, reject, timer }); + + const payload: RequestPayload = { requestId, method, params }; + this.send(to, RequestAction, payload); + }); + } + /** 注册连接回调 */ onConnect(callback: (socketId: string) => void): this { this.callbacks.onConnect = callback; @@ -219,6 +259,12 @@ export class GatewayClient { this.socket.on("disconnect", (reason: string) => { this.setState("disconnected"); + // Reject all pending RPC requests + for (const [id, pending] of this.pendingRequests) { + clearTimeout(pending.timer); + pending.reject(new Error("Disconnected")); + } + this.pendingRequests.clear(); this.callbacks.onDisconnect?.(reason); }); @@ -235,6 +281,21 @@ export class GatewayClient { ); this.socket.on(GatewayEvents.RECEIVE, (message: RoutedMessage) => { + // Intercept RPC responses and resolve pending requests + if (message.action === ResponseAction) { + const response = message.payload as ResponsePayload; + const pending = this.pendingRequests.get(response.requestId); + if (pending) { + this.pendingRequests.delete(response.requestId); + clearTimeout(pending.timer); + if (isResponseSuccess(response)) { + pending.resolve(response.payload); + } else { + pending.reject(new Error(`RPC error [${response.error.code}]: ${response.error.message}`)); + } + return; + } + } this.callbacks.onMessage?.(message); }); From 1e7d5f66ec862b1418cc843ca10001958f9bc3ca Mon Sep 17 00:00:00 2001 From: yushen Date: Mon, 2 Feb 2026 14:10:09 +0800 Subject: [PATCH 2/3] feat(hub): add RPC dispatcher with structured error handling Add RpcDispatcher and RpcError for method dispatch with typed error codes (METHOD_NOT_FOUND, INVALID_PARAMS, AGENT_NOT_FOUND). Implement getAgentMessages handler that checks session files on disk, supporting both active and closed agents. Wire up RPC request/response flow in Hub. Co-Authored-By: Claude Opus 4.5 --- src/hub/hub.ts | 49 +++++++++++++++++++++- src/hub/rpc/dispatcher.ts | 32 ++++++++++++++ src/hub/rpc/handlers/get-agent-messages.ts | 34 +++++++++++++++ src/hub/rpc/index.ts | 2 + 4 files changed, 115 insertions(+), 2 deletions(-) create mode 100644 src/hub/rpc/dispatcher.ts create mode 100644 src/hub/rpc/handlers/get-agent-messages.ts create mode 100644 src/hub/rpc/index.ts diff --git a/src/hub/hub.ts b/src/hub/hub.ts index 19a61b1b..5765cef1 100644 --- a/src/hub/hub.ts +++ b/src/hub/hub.ts @@ -1,12 +1,22 @@ -import type { HubOptions } from "./types.js"; -import { GatewayClient, type ConnectionState } from "@multica/sdk"; +import { + GatewayClient, + type ConnectionState, + RequestAction, + ResponseAction, + type RequestPayload, + type ResponseSuccessPayload, + type ResponseErrorPayload, +} from "@multica/sdk"; import { AsyncAgent } from "../agent/async-agent.js"; import { getHubId } from "./hub-identity.js"; import { loadAgentRecords, addAgentRecord, removeAgentRecord } from "./agent-store.js"; +import { RpcDispatcher, RpcError } from "./rpc/dispatcher.js"; +import { createGetAgentMessagesHandler } from "./rpc/handlers/get-agent-messages.js"; export class Hub { private readonly agents = new Map(); private readonly agentSenders = new Map(); + private readonly rpc: RpcDispatcher; private client: GatewayClient; url: string; readonly path: string; @@ -21,6 +31,10 @@ export class Hub { this.url = url; this.path = path ?? "/ws"; this.hubId = getHubId(); + + this.rpc = new RpcDispatcher(); + this.rpc.register("getAgentMessages", createGetAgentMessagesHandler()); + this.client = this.createClient(this.url); this.client.connect(); this.restoreAgents(); @@ -61,6 +75,15 @@ export class Hub { client.onMessage((msg) => { console.log(`[Hub] Received message: id=${msg.id} from=${msg.from} to=${msg.to} action=${msg.action} payload=${JSON.stringify(msg.payload)}`); + + // RPC request + if (msg.action === RequestAction) { + const payload = msg.payload as RequestPayload; + void this.handleRpc(msg.from, payload); + return; + } + + // Regular chat message const payload = msg.payload as { agentId?: string; content?: string } | undefined; const agentId = payload?.agentId; const content = payload?.content; @@ -131,6 +154,28 @@ export class Hub { } } + /** Handle RPC request and send response back via Gateway */ + private async handleRpc(from: string, request: RequestPayload): Promise { + const { requestId, method } = request; + try { + const result = await this.rpc.dispatch(method, request.params); + this.client.send(from, ResponseAction, { + requestId, + ok: true, + payload: result, + }); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + const code = err instanceof RpcError ? err.code : "RPC_ERROR"; + console.error(`[Hub] RPC error: method=${method} code=${code} error=${message}`); + this.client.send(from, ResponseAction, { + requestId, + ok: false, + error: { code, message }, + }); + } + } + getAgent(id: string): AsyncAgent | undefined { return this.agents.get(id); } diff --git a/src/hub/rpc/dispatcher.ts b/src/hub/rpc/dispatcher.ts new file mode 100644 index 00000000..1484568d --- /dev/null +++ b/src/hub/rpc/dispatcher.ts @@ -0,0 +1,32 @@ +export type RpcHandler = (params: unknown) => unknown | Promise; + +export class RpcError extends Error { + constructor( + public readonly code: string, + message: string, + ) { + super(message); + this.name = "RpcError"; + } +} + +export class RpcDispatcher { + private readonly handlers = new Map(); + + /** Register an RPC method handler */ + register(method: string, handler: RpcHandler): void { + if (this.handlers.has(method)) { + throw new Error(`RPC method already registered: ${method}`); + } + this.handlers.set(method, handler); + } + + /** Dispatch an RPC request to its handler */ + async dispatch(method: string, params: unknown): Promise { + const handler = this.handlers.get(method); + if (!handler) { + throw new RpcError("METHOD_NOT_FOUND", `Unknown RPC method: ${method}`); + } + return handler(params); + } +} diff --git a/src/hub/rpc/handlers/get-agent-messages.ts b/src/hub/rpc/handlers/get-agent-messages.ts new file mode 100644 index 00000000..15060fff --- /dev/null +++ b/src/hub/rpc/handlers/get-agent-messages.ts @@ -0,0 +1,34 @@ +import { existsSync } from "fs"; +import { SessionManager } from "../../../agent/session/session-manager.js"; +import { resolveSessionPath } from "../../../agent/session/storage.js"; +import { RpcError, type RpcHandler } from "../dispatcher.js"; + +interface GetAgentMessagesParams { + agentId: string; + offset?: number; + limit?: number; +} + +export function createGetAgentMessagesHandler(): RpcHandler { + return (params: unknown) => { + if (!params || typeof params !== "object") { + throw new RpcError("INVALID_PARAMS", "params must be an object"); + } + const { agentId, offset = 0, limit = 50 } = params as GetAgentMessagesParams; + if (!agentId) { + throw new RpcError("INVALID_PARAMS", "Missing required param: agentId"); + } + + const sessionPath = resolveSessionPath(agentId); + if (!existsSync(sessionPath)) { + throw new RpcError("AGENT_NOT_FOUND", `No session found for agent: ${agentId}`); + } + + const session = new SessionManager({ sessionId: agentId }); + const allMessages = session.loadMessages(); + const total = allMessages.length; + const sliced = allMessages.slice(offset, offset + limit); + + return { messages: sliced, total, offset, limit }; + }; +} diff --git a/src/hub/rpc/index.ts b/src/hub/rpc/index.ts new file mode 100644 index 00000000..c482999a --- /dev/null +++ b/src/hub/rpc/index.ts @@ -0,0 +1,2 @@ +export { RpcDispatcher, RpcError, type RpcHandler } from "./dispatcher.js"; +export { createGetAgentMessagesHandler } from "./handlers/get-agent-messages.js"; From 812710c06af6d3b4bde7ec9e0db7bddcf825f449 Mon Sep 17 00:00:00 2001 From: yushen Date: Mon, 2 Feb 2026 14:10:17 +0800 Subject: [PATCH 3/3] docs(hub): add RPC protocol documentation Document the full RPC flow, message format, error codes, SDK usage, available methods (getAgentMessages), and guide for adding new methods. Co-Authored-By: Claude Opus 4.5 --- docs/rpc.md | 235 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 235 insertions(+) create mode 100644 docs/rpc.md diff --git a/docs/rpc.md b/docs/rpc.md new file mode 100644 index 00000000..535a8078 --- /dev/null +++ b/docs/rpc.md @@ -0,0 +1,235 @@ +# Hub RPC Protocol + +The Hub exposes an RPC (Remote Procedure Call) interface over the Gateway WebSocket transport. Clients can invoke methods on the Hub and receive structured responses, all routed through the same Gateway message layer used for regular chat. + +## Architecture Overview + +``` +Client (SDK) Gateway (WebSocket) Hub + | | | + |-- send(RequestAction) ------->|-- route to Hub ----------->| + | | |-- dispatch(method, params) + | | |-- handler executes + |<-- receive(ResponseAction) ---|<-- route to Client --------| + | | | +``` + +1. The **Client** calls `client.request(hubDeviceId, method, params)`. +2. The SDK generates a `requestId` (UUIDv7), wraps it into a `RequestPayload`, and sends a message with `action = "request"` to the Hub via the Gateway. +3. The **Gateway** routes the message to the Hub's socket (standard device-to-device routing). +4. The **Hub** detects `action === "request"` in its `onMessage` handler and delegates to `RpcDispatcher.dispatch()`. +5. The dispatcher looks up the registered handler for the given `method` and invokes it. +6. The Hub sends back a message with `action = "response"` containing either a success or error payload, addressed to the original sender. +7. The **Client SDK** intercepts incoming `"response"` messages in its `RECEIVE` listener, matches by `requestId`, and resolves (or rejects) the corresponding `Promise`. + +## Message Format + +All RPC messages use the standard `RoutedMessage` envelope: + +```ts +interface RoutedMessage { + id: string; // UUIDv7 message ID + uid: string | null; + from: string; // sender deviceId + to: string; // recipient deviceId + action: string; // "request" or "response" + payload: T; +} +``` + +### Request Payload + +```ts +interface RequestPayload { + requestId: string; // UUIDv7, generated by the SDK + method: string; // RPC method name + params?: T; // method-specific parameters +} +``` + +### Response Payload (Success) + +```ts +interface ResponseSuccessPayload { + requestId: string; // matches the request + ok: true; + payload: T; // method-specific result +} +``` + +### Response Payload (Error) + +```ts +interface ResponseErrorPayload { + requestId: string; // matches the request + ok: false; + error: { + code: string; // machine-readable error code + message: string; // human-readable description + }; +} +``` + +## Error Codes + +| Code | Description | +|---|---| +| `METHOD_NOT_FOUND` | The requested RPC method does not exist. | +| `INVALID_PARAMS` | Missing or malformed parameters. | +| `AGENT_NOT_FOUND` | No session file found for the given agent ID. | +| `RPC_ERROR` | Catch-all for unexpected errors. | + +## Client SDK Usage + +The `GatewayClient` provides a `request()` method that handles the full request/response lifecycle: + +```ts +request( + to: string, // target deviceId (Hub's deviceId) + method: string, // RPC method name + params?: unknown, // method parameters + timeout?: number, // timeout in ms (default: 10000) +): Promise +``` + +The method: +- Generates a `requestId` internally. +- Sends a `RequestPayload` via the Gateway. +- Returns a `Promise` that resolves with the response payload on success, or rejects with an `Error` on failure or timeout. +- Automatically cleans up pending requests on disconnect. + +### Example + +```ts +import { GatewayClient, type GetAgentMessagesResult } from "@multica/sdk"; + +const client = new GatewayClient({ + url: "http://localhost:3000", + deviceId: "my-client", + deviceType: "client", +}); + +client.connect(); + +client.onRegistered(async () => { + try { + const result = await client.request( + "hub-device-id", + "getAgentMessages", + { agentId: "019abc12-...", offset: 0, limit: 20 }, + ); + console.log(`Total: ${result.total}, returned: ${result.messages.length}`); + } catch (err) { + console.error("RPC failed:", err.message); + } +}); +``` + +## Available RPC Methods + +### `getAgentMessages` + +Retrieves the message history for a given agent session. Works for both active and closed agents as long as the session file exists on disk. + +**Parameters:** + +```ts +interface GetAgentMessagesParams { + agentId: string; // required - the agent/session ID + offset?: number; // starting index (default: 0) + limit?: number; // max messages to return (default: 50) +} +``` + +**Response:** + +```ts +interface GetAgentMessagesResult { + messages: AgentMessage[]; // array of messages + total: number; // total message count in the session + offset: number; // the offset used + limit: number; // the limit used +} +``` + +Each `AgentMessage` in the array is one of: + +- **UserMessage** (`role: "user"`) - User input (text or multimodal content). +- **AssistantMessage** (`role: "assistant"`) - LLM response, may contain `TextContent`, `ThinkingContent`, or `ToolCall` blocks. Includes `usage` (token counts and costs), `model`, `provider`, and `stopReason`. +- **ToolResultMessage** (`role: "toolResult"`) - Result of a tool invocation, with `toolCallId`, `toolName`, `content`, and `isError`. + +**Example request:** + +```ts +const result = await client.request( + hubDeviceId, + "getAgentMessages", + { agentId: "019abc12-3def-7000-8000-000000000001", offset: 0, limit: 10 }, +); +``` + +**Example success response payload:** + +```json +{ + "requestId": "019abc12-...", + "ok": true, + "payload": { + "messages": [ + { "role": "user", "content": "Hello", "timestamp": 1700000000000 }, + { + "role": "assistant", + "content": [{ "type": "text", "text": "Hi! How can I help?" }], + "model": "claude-sonnet-4-20250514", + "provider": "anthropic", + "usage": { "input": 10, "output": 15, "totalTokens": 25 }, + "stopReason": "end_turn", + "timestamp": 1700000001000 + } + ], + "total": 42, + "offset": 0, + "limit": 10 + } +} +``` + +**Example error response payload:** + +```json +{ + "requestId": "019abc12-...", + "ok": false, + "error": { + "code": "AGENT_NOT_FOUND", + "message": "No session found for agent: 019abc12-bad-id" + } +} +``` + +## Adding New RPC Methods + +1. Create a handler file in `src/hub/rpc/handlers/`: + +```ts +// src/hub/rpc/handlers/my-method.ts +import { RpcError, type RpcHandler } from "../dispatcher.js"; + +export function createMyMethodHandler(): RpcHandler { + return (params: unknown) => { + if (!params || typeof params !== "object") { + throw new RpcError("INVALID_PARAMS", "params must be an object"); + } + // ... validate and handle + return { /* result */ }; + }; +} +``` + +2. Register it in `src/hub/hub.ts` constructor: + +```ts +this.rpc.register("myMethod", createMyMethodHandler()); +``` + +3. (Optional) Add typed params/result interfaces in `packages/sdk/src/actions/rpc.ts` and export them from `packages/sdk/src/actions/index.ts` for client-side type safety.