Implement WebSocket Gateway with NestJS and client SDK

- Add NestJS WebSocket Gateway with Socket.IO for real-time communication
- Create client SDK (GatewayClient) supporting both browser and Node.js
- Implement device registration and point-to-point message routing
- Add action types: request/response (RPC), stream (for chat messages)
- Integrate Pino logger for structured logging
- Configure heartbeat detection (pingInterval/pingTimeout)
- Use UUID v7 for time-ordered message IDs

Gateway features:
- Device registration with deviceId and deviceType (client/agent)
- Message routing between devices via Gateway
- HTTP API endpoints (/ping, /broadcast)
- Auto-reconnect support in client SDK

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
yushen 2026-01-28 16:46:51 +08:00
parent b36769f913
commit 7d94b40a11
19 changed files with 2237 additions and 5 deletions

View file

@ -6,6 +6,7 @@
"main": "dist/index.js",
"scripts": {
"dev": "tsx src/index.ts",
"dev:gateway": "tsx --watch src/gateway/main.ts",
"build": "tsc",
"start": "node dist/index.js",
"typecheck": "tsc --noEmit"
@ -16,7 +17,24 @@
"packageManager": "pnpm@10.16.1",
"devDependencies": {
"@types/node": "^25.0.10",
"@types/uuid": "^11.0.0",
"socket.io-client": "^4.8.3",
"tsx": "^4.21.0",
"typescript": "^5.9.3"
},
"dependencies": {
"@nestjs/common": "^11.1.12",
"@nestjs/core": "^11.1.12",
"@nestjs/platform-express": "^11.1.12",
"@nestjs/platform-socket.io": "^11.1.12",
"@nestjs/websockets": "^11.1.12",
"nestjs-pino": "^4.5.0",
"pino": "^10.3.0",
"pino-http": "^11.0.0",
"pino-pretty": "^13.1.3",
"reflect-metadata": "^0.2.2",
"rxjs": "^7.8.2",
"socket.io": "^4.8.3",
"uuid": "^13.0.0"
}
}

1354
pnpm-lock.yaml generated

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,33 @@
import { Controller, Get, Post, Body, Inject } from "@nestjs/common";
import { EventsGateway } from "./events.gateway.js";
@Controller()
export class AppController {
constructor(
@Inject(EventsGateway) private readonly eventsGateway: EventsGateway
) {}
@Get()
getHello(): { message: string; timestamp: string } {
return {
message: "Hello from Gateway!",
timestamp: new Date().toISOString(),
};
}
@Get("ping")
ping(): { ping: string } {
return { ping: "pong" };
}
@Post("broadcast")
broadcast(@Body() body: { text: string }): { success: boolean } {
// 通过 HTTP 接口广播消息给所有 WebSocket 客户端
this.eventsGateway.server.emit("message", {
from: "server",
text: body.text,
timestamp: new Date().toISOString(),
});
return { success: true };
}
}

30
src/gateway/app.module.ts Normal file
View file

@ -0,0 +1,30 @@
import { Module } from "@nestjs/common";
import { LoggerModule } from "nestjs-pino";
import { EventsGateway } from "./events.gateway.js";
import { AppController } from "./app.controller.js";
const isDev = process.env.NODE_ENV !== "production";
@Module({
imports: [
LoggerModule.forRoot({
pinoHttp: isDev
? {
transport: {
target: "pino-pretty",
options: {
colorize: true,
singleLine: true,
},
},
level: process.env.LOG_LEVEL ?? "debug",
}
: {
level: process.env.LOG_LEVEL ?? "info",
},
}),
],
providers: [EventsGateway],
controllers: [AppController],
})
export class AppModule {}

View file

@ -0,0 +1,184 @@
import { Injectable } from "@nestjs/common";
import {
WebSocketGateway,
WebSocketServer,
SubscribeMessage,
MessageBody,
ConnectedSocket,
} from "@nestjs/websockets";
import type {
OnGatewayInit,
OnGatewayConnection,
OnGatewayDisconnect,
} from "@nestjs/websockets";
import type { Server, Socket } from "socket.io";
import { InjectPinoLogger, PinoLogger } from "nestjs-pino";
import {
GatewayEvents,
type RegisterPayload,
type RegisteredResponse,
type RoutedMessage,
type SendErrorResponse,
type PingPayload,
type PongResponse,
type DeviceInfo,
} from "../shared/gateway-sdk/index.js";
@Injectable()
@WebSocketGateway({
path: "/ws",
cors: {
origin: "*",
},
// 心跳检测配置
pingInterval: 25000, // 每 25 秒发送 PING
pingTimeout: 20000, // 20 秒内需响应,否则断开
})
export class EventsGateway
implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
{
constructor(
@InjectPinoLogger(EventsGateway.name)
private readonly logger: PinoLogger
) {}
@WebSocketServer()
server!: Server;
// deviceId -> socketId 映射
private deviceToSocket = new Map<string, string>();
// socketId -> deviceInfo 映射
private socketToDevice = new Map<string, DeviceInfo>();
afterInit(_server: Server): void {
this.logger.info("WebSocket Gateway initialized");
}
handleConnection(client: Socket): void {
this.logger.info({ socketId: client.id }, "Socket connected");
}
handleDisconnect(client: Socket): void {
const deviceInfo = this.socketToDevice.get(client.id);
if (deviceInfo) {
this.logger.info(
{ deviceId: deviceInfo.deviceId, deviceType: deviceInfo.deviceType },
"Device disconnected"
);
this.deviceToSocket.delete(deviceInfo.deviceId);
this.socketToDevice.delete(client.id);
} else {
this.logger.info({ socketId: client.id }, "Socket disconnected");
}
}
@SubscribeMessage(GatewayEvents.REGISTER)
handleRegister(
@MessageBody() data: RegisterPayload,
@ConnectedSocket() client: Socket
): void {
const { deviceId, deviceType, metadata } = data;
// 检查 deviceId 是否已被其他 socket 使用
const existingSocketId = this.deviceToSocket.get(deviceId);
if (existingSocketId && existingSocketId !== client.id) {
this.logger.warn(
{ deviceId, existingSocketId },
"Device already registered by another socket"
);
const response: RegisteredResponse = {
success: false,
deviceId,
error: "Device ID already in use",
};
client.emit(GatewayEvents.REGISTERED, response);
return;
}
// 注册设备
const deviceInfo: DeviceInfo = { deviceId, deviceType, metadata };
this.deviceToSocket.set(deviceId, client.id);
this.socketToDevice.set(client.id, deviceInfo);
this.logger.info({ deviceId, deviceType }, "Device registered");
const response: RegisteredResponse = { success: true, deviceId };
client.emit(GatewayEvents.REGISTERED, response);
}
@SubscribeMessage(GatewayEvents.SEND)
handleSend(
@MessageBody() message: RoutedMessage,
@ConnectedSocket() client: Socket
): void {
const senderDevice = this.socketToDevice.get(client.id);
// 检查发送者是否已注册
if (!senderDevice) {
const error: SendErrorResponse = {
messageId: message.id,
error: "Sender not registered",
code: "NOT_REGISTERED",
};
client.emit(GatewayEvents.SEND_ERROR, error);
return;
}
// 检查消息 from 是否匹配
if (message.from !== senderDevice.deviceId) {
const error: SendErrorResponse = {
messageId: message.id,
error: "Invalid sender ID",
code: "INVALID_MESSAGE",
};
client.emit(GatewayEvents.SEND_ERROR, error);
return;
}
// 查找目标设备
const targetSocketId = this.deviceToSocket.get(message.to);
if (!targetSocketId) {
const error: SendErrorResponse = {
messageId: message.id,
error: `Device ${message.to} not found`,
code: "DEVICE_NOT_FOUND",
};
client.emit(GatewayEvents.SEND_ERROR, error);
return;
}
// 转发消息
this.logger.debug(
{ messageId: message.id, from: message.from, to: message.to, action: message.action },
"Routing message"
);
this.server.to(targetSocketId).emit(GatewayEvents.RECEIVE, message);
}
@SubscribeMessage(GatewayEvents.PING)
handlePing(
@MessageBody() data: PingPayload,
@ConnectedSocket() client: Socket
): PongResponse {
this.logger.debug({ socketId: client.id, data }, "Received ping");
return { event: GatewayEvents.PONG, data: "Hello from Gateway!" };
}
/** 获取所有在线设备(供 HTTP API 使用) */
getOnlineDevices(): DeviceInfo[] {
return Array.from(this.socketToDevice.values());
}
/** 获取指定类型的在线设备 */
getOnlineDevicesByType(type: "client" | "agent"): DeviceInfo[] {
return this.getOnlineDevices().filter((d) => d.deviceType === type);
}
/** 向指定设备发送消息(供 HTTP API 使用) */
sendToDevice(deviceId: string, event: string, data: unknown): boolean {
const socketId = this.deviceToSocket.get(deviceId);
if (!socketId) return false;
this.server.to(socketId).emit(event, data);
return true;
}
}

View file

@ -0,0 +1,8 @@
import { Module } from "@nestjs/common";
import { EventsGateway } from "./events.gateway.js";
@Module({
providers: [EventsGateway],
exports: [EventsGateway],
})
export class GatewayModule {}

View file

@ -1,3 +0,0 @@
export class Gateway {
constructor(public readonly port: number) {}
}

View file

@ -1 +1,3 @@
export * from "./gateway.js";
export * from "./gateway.module.js";
export * from "./events.gateway.js";
export * from "./app.module.js";

21
src/gateway/main.ts Normal file
View file

@ -0,0 +1,21 @@
import "reflect-metadata";
import { NestFactory } from "@nestjs/core";
import { Logger } from "nestjs-pino";
import { AppModule } from "./app.module.js";
async function bootstrap(): Promise<void> {
const app = await NestFactory.create(AppModule, { bufferLogs: true });
app.useLogger(app.get(Logger));
const port = process.env["PORT"] ?? 3000;
await app.listen(port);
const logger = app.get(Logger);
logger.log(`Gateway is running on http://localhost:${port}`);
logger.log(`WebSocket endpoint: ws://localhost:${port}/ws`);
}
bootstrap().catch((err) => {
console.error("Failed to start gateway:", err);
process.exit(1);
});

View file

@ -0,0 +1,72 @@
import {
GatewayClient,
HelloAction,
HelloResponseAction,
type HelloPayload,
type HelloResponsePayload,
} from "../shared/gateway-sdk/index.js";
// 模拟一个 Client
const client = new GatewayClient({
url: "http://localhost:3000",
deviceId: "client-001",
deviceType: "client",
metadata: { name: "Test Client" },
});
// 模拟一个 Agent
const agent = new GatewayClient({
url: "http://localhost:3000",
deviceId: "agent-001",
deviceType: "agent",
metadata: { name: "Test Agent" },
});
// Agent 监听消息
agent
.onStateChange((state) => console.log("[Agent] State:", state))
.onRegistered((deviceId) => {
console.log("[Agent] Registered as:", deviceId);
})
.onMessage((message) => {
console.log("[Agent] Received message:", message);
// 回复消息
if (message.action === HelloAction) {
const payload = message.payload as HelloPayload;
console.log("[Agent] Replying to client...");
agent.send<HelloResponsePayload>(message.from, HelloResponseAction, {
reply: `Hello ${message.from}! I received: "${payload.greeting}"`,
});
}
})
.onSendError((error) => console.error("[Agent] Send error:", error))
.connect();
// Client 监听消息
client
.onStateChange((state) => console.log("[Client] State:", state))
.onRegistered((deviceId) => {
console.log("[Client] Registered as:", deviceId);
// 注册后发送消息给 Agent
setTimeout(() => {
console.log("[Client] Sending message to agent-001...");
client.send<HelloPayload>("agent-001", HelloAction, {
greeting: "Hello Agent!",
});
}, 500);
})
.onMessage((message) => {
console.log("[Client] Received message:", message);
})
.onSendError((error) => console.error("[Client] Send error:", error))
.connect();
// 5秒后断开
setTimeout(() => {
console.log("\nClosing connections...");
client.disconnect();
agent.disconnect();
process.exit(0);
}, 5000);

View file

@ -0,0 +1,14 @@
/** Hello Action - 测试用的问候消息 */
export const HelloAction = "hello" as const;
export const HelloResponseAction = "hello_response" as const;
/** Hello 请求 payload */
export interface HelloPayload {
greeting: string;
}
/** Hello 响应 payload */
export interface HelloResponsePayload {
reply: string;
}

View file

@ -0,0 +1,19 @@
export {
HelloAction,
HelloResponseAction,
type HelloPayload,
type HelloResponsePayload,
} from "./hello.js";
export {
RequestAction,
ResponseAction,
type RequestPayload,
type ResponsePayload,
type ResponseSuccessPayload,
type ResponseErrorPayload,
isResponseSuccess,
isResponseError,
} from "./rpc.js";
export { StreamAction, type StreamPayload } from "./stream.js";

View file

@ -0,0 +1,55 @@
/** RPC Actions - 请求/响应模式 */
export const RequestAction = "request" as const;
export const ResponseAction = "response" as const;
/** 请求帧 payload */
export interface RequestPayload<T = unknown> {
/** 调用的方法名 */
method: string;
/** 方法参数 */
params?: T;
}
/** 响应帧 payload - 成功 */
export interface ResponseSuccessPayload<T = unknown> {
/** 与请求消息 ID 匹配 */
requestId: string;
/** 是否成功 */
ok: true;
/** 返回数据 */
payload: T;
}
/** 响应帧 payload - 失败 */
export interface ResponseErrorPayload {
/** 与请求消息 ID 匹配 */
requestId: string;
/** 是否成功 */
ok: false;
/** 错误信息 */
error: {
code: string;
message: string;
retryable?: boolean;
};
}
/** 响应帧 payload联合类型 */
export type ResponsePayload<T = unknown> =
| ResponseSuccessPayload<T>
| ResponseErrorPayload;
/** 类型守卫:判断响应是否成功 */
export function isResponseSuccess<T>(
response: ResponsePayload<T>
): response is ResponseSuccessPayload<T> {
return response.ok === true;
}
/** 类型守卫:判断响应是否失败 */
export function isResponseError(
response: ResponsePayload
): response is ResponseErrorPayload {
return response.ok === false;
}

View file

@ -0,0 +1,11 @@
/** Stream Action - 流式消息传输 */
export const StreamAction = "stream" as const;
/** 流消息 payload */
export interface StreamPayload<T = unknown> {
/** 流 ID用于关联同一个流的所有消息 */
streamId: string;
/** 数据 */
data: T;
}

View file

@ -0,0 +1,261 @@
import { io, Socket } from "socket.io-client";
import { v7 as uuidv7 } from "uuid";
import type {
GatewayClientOptions,
GatewayClientCallbacks,
ConnectionState,
RoutedMessage,
RegisteredResponse,
SendErrorResponse,
PingPayload,
DeviceType,
} from "./types.js";
import { GatewayEvents } from "./types.js";
interface ResolvedOptions {
url: string;
path: string;
deviceId: string;
deviceType: DeviceType;
metadata: Record<string, unknown> | undefined;
autoReconnect: boolean;
reconnectDelay: number;
}
export class GatewayClient {
private socket: Socket | null = null;
private options: ResolvedOptions;
private callbacks: GatewayClientCallbacks = {};
private _state: ConnectionState = "disconnected";
constructor(options: GatewayClientOptions) {
if (!options.deviceId) {
throw new Error("deviceId is required");
}
this.options = {
url: options.url,
path: options.path ?? "/ws",
deviceId: options.deviceId,
deviceType: options.deviceType,
metadata: options.metadata,
autoReconnect: options.autoReconnect ?? true,
reconnectDelay: options.reconnectDelay ?? 1000,
};
}
/** 当前连接状态 */
get state(): ConnectionState {
return this._state;
}
/** 设备ID */
get deviceId(): string {
return this.options.deviceId;
}
/** 设备类型 */
get deviceType(): DeviceType {
return this.options.deviceType;
}
/** Socket ID连接后可用 */
get socketId(): string | undefined {
return this.socket?.id;
}
/** 是否已连接 */
get isConnected(): boolean {
return this._state === "connected" || this._state === "registered";
}
/** 是否已注册 */
get isRegistered(): boolean {
return this._state === "registered";
}
/** 连接到服务器 */
connect(): this {
if (this.socket) {
return this;
}
this.setState("connecting");
this.socket = io(this.options.url, {
path: this.options.path,
reconnection: this.options.autoReconnect,
reconnectionDelay: this.options.reconnectDelay,
});
this.setupListeners();
return this;
}
/** 断开连接 */
disconnect(): this {
if (this.socket) {
this.socket.disconnect();
this.socket = null;
}
this.setState("disconnected");
return this;
}
/** 发送消息给指定设备 */
send<T = unknown>(
to: string,
action: string,
payload: T,
messageId?: string
): string {
if (!this.socket || !this.isRegistered) {
throw new Error("Not registered");
}
const id = messageId ?? this.generateMessageId();
const message: RoutedMessage<T> = {
id,
uid: null,
from: this.options.deviceId,
to,
action,
payload,
timestamp: new Date().toISOString(),
};
this.socket.emit(GatewayEvents.SEND, message);
return id;
}
/** 发送 ping */
ping(data: PingPayload = {}): Promise<string> {
return new Promise((resolve, reject) => {
if (!this.socket || !this.isConnected) {
reject(new Error("Not connected"));
return;
}
this.socket.emit(
GatewayEvents.PING,
data,
(response: { event: string; data: string }) => {
resolve(response.data);
}
);
});
}
/** 注册连接回调 */
onConnect(callback: (socketId: string) => void): this {
this.callbacks.onConnect = callback;
return this;
}
/** 注册断开回调 */
onDisconnect(callback: (reason: string) => void): this {
this.callbacks.onDisconnect = callback;
return this;
}
/** 注册成功回调 */
onRegistered(callback: (deviceId: string) => void): this {
this.callbacks.onRegistered = callback;
return this;
}
/** 注册消息回调 */
onMessage(callback: (message: RoutedMessage) => void): this {
this.callbacks.onMessage = callback;
return this;
}
/** 注册发送失败回调 */
onSendError(callback: (error: SendErrorResponse) => void): this {
this.callbacks.onSendError = callback;
return this;
}
/** 注册 pong 回调 */
onPong(callback: (data: string) => void): this {
this.callbacks.onPong = callback;
return this;
}
/** 注册错误回调 */
onError(callback: (error: Error) => void): this {
this.callbacks.onError = callback;
return this;
}
/** 注册状态变化回调 */
onStateChange(callback: (state: ConnectionState) => void): this {
this.callbacks.onStateChange = callback;
return this;
}
private setState(state: ConnectionState): void {
if (this._state !== state) {
this._state = state;
this.callbacks.onStateChange?.(state);
}
}
private generateMessageId(): string {
return uuidv7();
}
private register(): void {
if (!this.socket) return;
this.socket.emit(GatewayEvents.REGISTER, {
deviceId: this.options.deviceId,
deviceType: this.options.deviceType,
metadata: this.options.metadata,
});
}
private setupListeners(): void {
if (!this.socket) return;
this.socket.on("connect", () => {
this.setState("connected");
this.callbacks.onConnect?.(this.socket!.id!);
// 连接后自动注册
this.register();
});
this.socket.on("disconnect", (reason: string) => {
this.setState("disconnected");
this.callbacks.onDisconnect?.(reason);
});
this.socket.on(
GatewayEvents.REGISTERED,
(response: RegisteredResponse) => {
if (response.success) {
this.setState("registered");
this.callbacks.onRegistered?.(response.deviceId);
} else {
this.callbacks.onError?.(new Error(response.error ?? "Registration failed"));
}
}
);
this.socket.on(GatewayEvents.RECEIVE, (message: RoutedMessage) => {
this.callbacks.onMessage?.(message);
});
this.socket.on(GatewayEvents.SEND_ERROR, (error: SendErrorResponse) => {
this.callbacks.onSendError?.(error);
});
this.socket.on(GatewayEvents.PONG, (data: string) => {
this.callbacks.onPong?.(data);
});
this.socket.on("connect_error", (error: Error) => {
this.callbacks.onError?.(error);
});
}
}

View file

@ -0,0 +1,18 @@
export { GatewayClient } from "./client.js";
export {
GatewayEvents,
type DeviceType,
type DeviceInfo,
type RegisterPayload,
type RegisteredResponse,
type RoutedMessage,
type SendErrorResponse,
type GatewayClientOptions,
type GatewayClientCallbacks,
type ConnectionState,
type PingPayload,
type PongResponse,
} from "./types.js";
// Actions
export * from "./actions/index.js";

View file

@ -0,0 +1,132 @@
/** WebSocket 事件名称 */
export const GatewayEvents = {
// 系统事件
PING: "ping",
PONG: "pong",
REGISTER: "register",
REGISTERED: "registered",
// 消息路由
SEND: "send",
RECEIVE: "receive",
SEND_ERROR: "send_error",
} as const;
// ============ 设备相关 ============
/** 设备类型 */
export type DeviceType = "client" | "agent";
/** 设备信息 */
export interface DeviceInfo {
deviceId: string;
deviceType: DeviceType;
metadata?: Record<string, unknown> | undefined;
}
/** 注册请求 */
export interface RegisterPayload {
deviceId: string;
deviceType: DeviceType;
metadata?: Record<string, unknown>;
}
/** 注册响应 */
export interface RegisteredResponse {
success: boolean;
deviceId: string;
error?: string;
}
// ============ 消息路由 ============
/** 路由消息 */
export interface RoutedMessage<T = unknown> {
/** 消息唯一ID */
id: string;
/** 用户ID登录后填充 */
uid: string | null;
/** 发送者 deviceId */
from: string;
/** 接收者 deviceId */
to: string;
/** 动作类型 */
action: string;
/** 消息内容 */
payload: T;
/** 时间戳 */
timestamp: string;
}
/** 发送失败响应 */
export interface SendErrorResponse {
messageId: string;
error: string;
code: "DEVICE_NOT_FOUND" | "NOT_REGISTERED" | "INVALID_MESSAGE";
}
// ============ Ping/Pong ============
/** Ping 请求 */
export interface PingPayload {
[key: string]: unknown;
}
/** Ping 响应 */
export interface PongResponse {
event: string;
data: string;
}
// ============ 客户端配置 ============
/** 连接配置 */
export interface GatewayClientOptions {
/** 服务器地址,如 http://localhost:3000 */
url: string;
/** WebSocket 路径,默认 /ws */
path?: string | undefined;
/** 设备ID */
deviceId: string;
/** 设备类型 */
deviceType: DeviceType;
/** 设备元数据 */
metadata?: Record<string, unknown> | undefined;
/** 自动重连,默认 true */
autoReconnect?: boolean | undefined;
/** 重连延迟(毫秒),默认 1000 */
reconnectDelay?: number | undefined;
}
/** 连接状态 */
export type ConnectionState =
| "disconnected"
| "connecting"
| "connected"
| "registered";
/** 事件回调类型 */
export interface GatewayClientCallbacks {
onConnect?: (socketId: string) => void;
onDisconnect?: (reason: string) => void;
onRegistered?: (deviceId: string) => void;
onMessage?: (message: RoutedMessage) => void;
onSendError?: (error: SendErrorResponse) => void;
onPong?: (data: string) => void;
onError?: (error: Error) => void;
onStateChange?: (state: ConnectionState) => void;
}
// ============ 兼容旧API可删除 ============
/** @deprecated 使用 RoutedMessage */
export interface SendMessagePayload {
text: string;
}
/** @deprecated 使用 RoutedMessage */
export interface BroadcastMessage {
from: string;
text: string;
timestamp: string;
}

View file

@ -1 +1,2 @@
export * from "./types.js";
export * from "./gateway-sdk/index.js";

View file

@ -18,7 +18,9 @@
"moduleDetection": "force",
"skipLibCheck": true,
"esModuleInterop": true,
"resolveJsonModule": true
"resolveJsonModule": true,
"experimentalDecorators": true,
"emitDecoratorMetadata": true
},
"include": ["src"],
"exclude": ["node_modules", "dist"]