Merge pull request #54 from multica-ai/fetch-agent-history

feat(hub): add RPC protocol for fetching agent history
This commit is contained in:
LinYushen 2026-02-02 14:10:52 +08:00 committed by GitHub
commit 4d4da4ed43
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 432 additions and 2 deletions

235
docs/rpc.md Normal file
View file

@ -0,0 +1,235 @@
# Hub RPC Protocol
The Hub exposes an RPC (Remote Procedure Call) interface over the Gateway WebSocket transport. Clients can invoke methods on the Hub and receive structured responses, all routed through the same Gateway message layer used for regular chat.
## Architecture Overview
```
Client (SDK) Gateway (WebSocket) Hub
| | |
|-- send(RequestAction) ------->|-- route to Hub ----------->|
| | |-- dispatch(method, params)
| | |-- handler executes
|<-- receive(ResponseAction) ---|<-- route to Client --------|
| | |
```
1. The **Client** calls `client.request(hubDeviceId, method, params)`.
2. The SDK generates a `requestId` (UUIDv7), wraps it into a `RequestPayload`, and sends a message with `action = "request"` to the Hub via the Gateway.
3. The **Gateway** routes the message to the Hub's socket (standard device-to-device routing).
4. The **Hub** detects `action === "request"` in its `onMessage` handler and delegates to `RpcDispatcher.dispatch()`.
5. The dispatcher looks up the registered handler for the given `method` and invokes it.
6. The Hub sends back a message with `action = "response"` containing either a success or error payload, addressed to the original sender.
7. The **Client SDK** intercepts incoming `"response"` messages in its `RECEIVE` listener, matches by `requestId`, and resolves (or rejects) the corresponding `Promise`.
## Message Format
All RPC messages use the standard `RoutedMessage` envelope:
```ts
interface RoutedMessage<T> {
id: string; // UUIDv7 message ID
uid: string | null;
from: string; // sender deviceId
to: string; // recipient deviceId
action: string; // "request" or "response"
payload: T;
}
```
### Request Payload
```ts
interface RequestPayload<T = unknown> {
requestId: string; // UUIDv7, generated by the SDK
method: string; // RPC method name
params?: T; // method-specific parameters
}
```
### Response Payload (Success)
```ts
interface ResponseSuccessPayload<T = unknown> {
requestId: string; // matches the request
ok: true;
payload: T; // method-specific result
}
```
### Response Payload (Error)
```ts
interface ResponseErrorPayload {
requestId: string; // matches the request
ok: false;
error: {
code: string; // machine-readable error code
message: string; // human-readable description
};
}
```
## Error Codes
| Code | Description |
|---|---|
| `METHOD_NOT_FOUND` | The requested RPC method does not exist. |
| `INVALID_PARAMS` | Missing or malformed parameters. |
| `AGENT_NOT_FOUND` | No session file found for the given agent ID. |
| `RPC_ERROR` | Catch-all for unexpected errors. |
## Client SDK Usage
The `GatewayClient` provides a `request()` method that handles the full request/response lifecycle:
```ts
request<T = unknown>(
to: string, // target deviceId (Hub's deviceId)
method: string, // RPC method name
params?: unknown, // method parameters
timeout?: number, // timeout in ms (default: 10000)
): Promise<T>
```
The method:
- Generates a `requestId` internally.
- Sends a `RequestPayload` via the Gateway.
- Returns a `Promise` that resolves with the response payload on success, or rejects with an `Error` on failure or timeout.
- Automatically cleans up pending requests on disconnect.
### Example
```ts
import { GatewayClient, type GetAgentMessagesResult } from "@multica/sdk";
const client = new GatewayClient({
url: "http://localhost:3000",
deviceId: "my-client",
deviceType: "client",
});
client.connect();
client.onRegistered(async () => {
try {
const result = await client.request<GetAgentMessagesResult>(
"hub-device-id",
"getAgentMessages",
{ agentId: "019abc12-...", offset: 0, limit: 20 },
);
console.log(`Total: ${result.total}, returned: ${result.messages.length}`);
} catch (err) {
console.error("RPC failed:", err.message);
}
});
```
## Available RPC Methods
### `getAgentMessages`
Retrieves the message history for a given agent session. Works for both active and closed agents as long as the session file exists on disk.
**Parameters:**
```ts
interface GetAgentMessagesParams {
agentId: string; // required - the agent/session ID
offset?: number; // starting index (default: 0)
limit?: number; // max messages to return (default: 50)
}
```
**Response:**
```ts
interface GetAgentMessagesResult {
messages: AgentMessage[]; // array of messages
total: number; // total message count in the session
offset: number; // the offset used
limit: number; // the limit used
}
```
Each `AgentMessage` in the array is one of:
- **UserMessage** (`role: "user"`) - User input (text or multimodal content).
- **AssistantMessage** (`role: "assistant"`) - LLM response, may contain `TextContent`, `ThinkingContent`, or `ToolCall` blocks. Includes `usage` (token counts and costs), `model`, `provider`, and `stopReason`.
- **ToolResultMessage** (`role: "toolResult"`) - Result of a tool invocation, with `toolCallId`, `toolName`, `content`, and `isError`.
**Example request:**
```ts
const result = await client.request<GetAgentMessagesResult>(
hubDeviceId,
"getAgentMessages",
{ agentId: "019abc12-3def-7000-8000-000000000001", offset: 0, limit: 10 },
);
```
**Example success response payload:**
```json
{
"requestId": "019abc12-...",
"ok": true,
"payload": {
"messages": [
{ "role": "user", "content": "Hello", "timestamp": 1700000000000 },
{
"role": "assistant",
"content": [{ "type": "text", "text": "Hi! How can I help?" }],
"model": "claude-sonnet-4-20250514",
"provider": "anthropic",
"usage": { "input": 10, "output": 15, "totalTokens": 25 },
"stopReason": "end_turn",
"timestamp": 1700000001000
}
],
"total": 42,
"offset": 0,
"limit": 10
}
}
```
**Example error response payload:**
```json
{
"requestId": "019abc12-...",
"ok": false,
"error": {
"code": "AGENT_NOT_FOUND",
"message": "No session found for agent: 019abc12-bad-id"
}
}
```
## Adding New RPC Methods
1. Create a handler file in `src/hub/rpc/handlers/`:
```ts
// src/hub/rpc/handlers/my-method.ts
import { RpcError, type RpcHandler } from "../dispatcher.js";
export function createMyMethodHandler(): RpcHandler {
return (params: unknown) => {
if (!params || typeof params !== "object") {
throw new RpcError("INVALID_PARAMS", "params must be an object");
}
// ... validate and handle
return { /* result */ };
};
}
```
2. Register it in `src/hub/hub.ts` constructor:
```ts
this.rpc.register("myMethod", createMyMethodHandler());
```
3. (Optional) Add typed params/result interfaces in `packages/sdk/src/actions/rpc.ts` and export them from `packages/sdk/src/actions/index.ts` for client-side type safety.

View file

@ -14,6 +14,8 @@ export {
type ResponseErrorPayload,
isResponseSuccess,
isResponseError,
type GetAgentMessagesParams,
type GetAgentMessagesResult,
} from "./rpc.js";
export { StreamAction, type StreamPayload } from "./stream.js";

View file

@ -5,6 +5,8 @@ export const ResponseAction = "response" as const;
/** 请求帧 payload */
export interface RequestPayload<T = unknown> {
/** 请求 ID由客户端生成服务端原样回传到 ResponsePayload.requestId */
requestId: string;
/** 调用的方法名 */
method: string;
/** 方法参数 */
@ -53,3 +55,20 @@ export function isResponseError(
): response is ResponseErrorPayload {
return response.ok === false;
}
// ============ RPC Method Types ============
/** getAgentMessages - request params */
export interface GetAgentMessagesParams {
agentId: string;
offset?: number;
limit?: number;
}
/** getAgentMessages - response payload */
export interface GetAgentMessagesResult {
messages: unknown[];
total: number;
offset: number;
limit: number;
}

View file

@ -11,6 +11,19 @@ import type {
DeviceType,
} from "./types.js";
import { GatewayEvents } from "./types.js";
import {
RequestAction,
ResponseAction,
type RequestPayload,
type ResponsePayload,
isResponseSuccess,
} from "./actions/rpc.js";
interface PendingRequest<T = unknown> {
resolve: (value: T) => void;
reject: (reason: Error) => void;
timer: ReturnType<typeof setTimeout>;
}
interface ResolvedOptions {
url: string;
@ -26,6 +39,7 @@ export class GatewayClient {
private options: ResolvedOptions;
private callbacks: GatewayClientCallbacks = {};
private _state: ConnectionState = "disconnected";
private pendingRequests = new Map<string, PendingRequest>();
constructor(options: GatewayClientOptions) {
if (!options.deviceId) {
@ -149,6 +163,32 @@ export class GatewayClient {
});
}
/** Send an RPC request and wait for the response */
request<T = unknown>(
to: string,
method: string,
params?: unknown,
timeout = 10_000,
): Promise<T> {
return new Promise<T>((resolve, reject) => {
if (!this.socket || !this.isRegistered) {
reject(new Error("Not registered"));
return;
}
const requestId = this.generateMessageId();
const timer = setTimeout(() => {
this.pendingRequests.delete(requestId);
reject(new Error(`RPC request timed out: ${method}`));
}, timeout);
this.pendingRequests.set(requestId, { resolve: resolve as (v: unknown) => void, reject, timer });
const payload: RequestPayload = { requestId, method, params };
this.send(to, RequestAction, payload);
});
}
/** 注册连接回调 */
onConnect(callback: (socketId: string) => void): this {
this.callbacks.onConnect = callback;
@ -219,6 +259,12 @@ export class GatewayClient {
this.socket.on("disconnect", (reason: string) => {
this.setState("disconnected");
// Reject all pending RPC requests
for (const [id, pending] of this.pendingRequests) {
clearTimeout(pending.timer);
pending.reject(new Error("Disconnected"));
}
this.pendingRequests.clear();
this.callbacks.onDisconnect?.(reason);
});
@ -235,6 +281,21 @@ export class GatewayClient {
);
this.socket.on(GatewayEvents.RECEIVE, (message: RoutedMessage) => {
// Intercept RPC responses and resolve pending requests
if (message.action === ResponseAction) {
const response = message.payload as ResponsePayload;
const pending = this.pendingRequests.get(response.requestId);
if (pending) {
this.pendingRequests.delete(response.requestId);
clearTimeout(pending.timer);
if (isResponseSuccess(response)) {
pending.resolve(response.payload);
} else {
pending.reject(new Error(`RPC error [${response.error.code}]: ${response.error.message}`));
}
return;
}
}
this.callbacks.onMessage?.(message);
});

View file

@ -1,12 +1,22 @@
import type { HubOptions } from "./types.js";
import { GatewayClient, type ConnectionState } from "@multica/sdk";
import {
GatewayClient,
type ConnectionState,
RequestAction,
ResponseAction,
type RequestPayload,
type ResponseSuccessPayload,
type ResponseErrorPayload,
} from "@multica/sdk";
import { AsyncAgent } from "../agent/async-agent.js";
import { getHubId } from "./hub-identity.js";
import { loadAgentRecords, addAgentRecord, removeAgentRecord } from "./agent-store.js";
import { RpcDispatcher, RpcError } from "./rpc/dispatcher.js";
import { createGetAgentMessagesHandler } from "./rpc/handlers/get-agent-messages.js";
export class Hub {
private readonly agents = new Map<string, AsyncAgent>();
private readonly agentSenders = new Map<string, string>();
private readonly rpc: RpcDispatcher;
private client: GatewayClient;
url: string;
readonly path: string;
@ -21,6 +31,10 @@ export class Hub {
this.url = url;
this.path = path ?? "/ws";
this.hubId = getHubId();
this.rpc = new RpcDispatcher();
this.rpc.register("getAgentMessages", createGetAgentMessagesHandler());
this.client = this.createClient(this.url);
this.client.connect();
this.restoreAgents();
@ -61,6 +75,15 @@ export class Hub {
client.onMessage((msg) => {
console.log(`[Hub] Received message: id=${msg.id} from=${msg.from} to=${msg.to} action=${msg.action} payload=${JSON.stringify(msg.payload)}`);
// RPC request
if (msg.action === RequestAction) {
const payload = msg.payload as RequestPayload;
void this.handleRpc(msg.from, payload);
return;
}
// Regular chat message
const payload = msg.payload as { agentId?: string; content?: string } | undefined;
const agentId = payload?.agentId;
const content = payload?.content;
@ -131,6 +154,28 @@ export class Hub {
}
}
/** Handle RPC request and send response back via Gateway */
private async handleRpc(from: string, request: RequestPayload): Promise<void> {
const { requestId, method } = request;
try {
const result = await this.rpc.dispatch(method, request.params);
this.client.send<ResponseSuccessPayload>(from, ResponseAction, {
requestId,
ok: true,
payload: result,
});
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
const code = err instanceof RpcError ? err.code : "RPC_ERROR";
console.error(`[Hub] RPC error: method=${method} code=${code} error=${message}`);
this.client.send<ResponseErrorPayload>(from, ResponseAction, {
requestId,
ok: false,
error: { code, message },
});
}
}
getAgent(id: string): AsyncAgent | undefined {
return this.agents.get(id);
}

32
src/hub/rpc/dispatcher.ts Normal file
View file

@ -0,0 +1,32 @@
export type RpcHandler = (params: unknown) => unknown | Promise<unknown>;
export class RpcError extends Error {
constructor(
public readonly code: string,
message: string,
) {
super(message);
this.name = "RpcError";
}
}
export class RpcDispatcher {
private readonly handlers = new Map<string, RpcHandler>();
/** Register an RPC method handler */
register(method: string, handler: RpcHandler): void {
if (this.handlers.has(method)) {
throw new Error(`RPC method already registered: ${method}`);
}
this.handlers.set(method, handler);
}
/** Dispatch an RPC request to its handler */
async dispatch(method: string, params: unknown): Promise<unknown> {
const handler = this.handlers.get(method);
if (!handler) {
throw new RpcError("METHOD_NOT_FOUND", `Unknown RPC method: ${method}`);
}
return handler(params);
}
}

View file

@ -0,0 +1,34 @@
import { existsSync } from "fs";
import { SessionManager } from "../../../agent/session/session-manager.js";
import { resolveSessionPath } from "../../../agent/session/storage.js";
import { RpcError, type RpcHandler } from "../dispatcher.js";
interface GetAgentMessagesParams {
agentId: string;
offset?: number;
limit?: number;
}
export function createGetAgentMessagesHandler(): RpcHandler {
return (params: unknown) => {
if (!params || typeof params !== "object") {
throw new RpcError("INVALID_PARAMS", "params must be an object");
}
const { agentId, offset = 0, limit = 50 } = params as GetAgentMessagesParams;
if (!agentId) {
throw new RpcError("INVALID_PARAMS", "Missing required param: agentId");
}
const sessionPath = resolveSessionPath(agentId);
if (!existsSync(sessionPath)) {
throw new RpcError("AGENT_NOT_FOUND", `No session found for agent: ${agentId}`);
}
const session = new SessionManager({ sessionId: agentId });
const allMessages = session.loadMessages();
const total = allMessages.length;
const sliced = allMessages.slice(offset, offset + limit);
return { messages: sliced, total, offset, limit };
};
}

2
src/hub/rpc/index.ts Normal file
View file

@ -0,0 +1,2 @@
export { RpcDispatcher, RpcError, type RpcHandler } from "./dispatcher.js";
export { createGetAgentMessagesHandler } from "./handlers/get-agent-messages.js";