WebSocket 模块本身与平台无关,因此你可以通过实现 WebSocketAdapter 接口,集成你自己的库(甚至是原生实现)。该接口要求你实现下表中描述的几个方法:
| 方法 | 说明 |
|---|---|
create | 根据传入参数创建一个 socket 实例 |
bindClientConnect | 绑定客户端连接事件 |
bindClientDisconnect | 绑定客户端断开连接事件(可选) |
bindMessageHandlers | 将收到的消息绑定到对应的消息处理器 |
close | 关闭服务器实例 |
socket.io 包被封装在 IoAdapter 类中。如果你希望增强适配器的基础功能,比如你的技术需求要求能够在多个负载均衡的 Web 服务实例之间广播事件,你可以通过继承 IoAdapter 并重写负责实例化新 socket.io 服务器的方法来实现。首先,我们需要安装相关依赖包。
如果你希望在多个负载均衡实例中使用 socket.io,必须在客户端 socket.io
配置中通过设置 transports: ['websocket']
禁用轮询,或者在负载均衡器中启用基于 Cookie 的路由。仅使用 Redis
并不足够。详情请参见这里。
npm install redis socket.io @socket.io/redis-adapter安装完依赖后,我们可以创建一个 RedisIoAdapter 类。
import { IoAdapter } from '@nestjs/platform-socket.io'
import { ServerOptions } from 'socket.io'
import { createAdapter } from '@socket.io/redis-adapter'
import { createClient } from 'redis'
export class RedisIoAdapter extends IoAdapter {
private adapterConstructor: ReturnType<typeof createAdapter>
async connectToRedis(): Promise<void> {
const pubClient = createClient({ url: `redis://localhost:6379` })
const subClient = pubClient.duplicate()
await Promise.all([pubClient.connect(), subClient.connect()])
this.adapterConstructor = createAdapter(pubClient, subClient)
}
createIOServer(port: number, options?: ServerOptions): any {
const server = super.createIOServer(port, options)
server.adapter(this.adapterConstructor)
return server
}
}随后,只需切换为你新创建的 Redis 适配器即可。
const app = await NestFactory.create(AppModule)
const redisIoAdapter = new RedisIoAdapter(app)
await redisIoAdapter.connectToRedis()
app.useWebSocketAdapter(redisIoAdapter)另一个可用的适配器是 WsAdapter,它充当框架与高性能且经过充分测试的 ws 库之间的代理。该适配器与原生浏览器 WebSocket 通信(WebSockets)完全兼容,并且速度远超 socket.io 包。不过,遗憾的是,它开箱即用的功能明显较少。但在某些场景下,你其实并不一定需要这些额外功能。
ws 库不支持命名空间(即由 socket.io 推广的通信通道)。不过,你可以通过在不同路径上挂载多个 ws 服务器来模拟这一特性(示例:@WebSocketGateway({ path: '/users' }))。
要使用 ws,首先需要安装相关依赖包:
npm install @nestjs/platform-ws安装完依赖包后,我们可以切换适配器:
const app = await NestFactory.create(AppModule)
app.useWebSocketAdapter(new WsAdapter(app))WsAdapter 需从 @nestjs/platform-ws 导入。
wsAdapter 设计用于处理 {{ '{' }} event: string, data: any {{ '}' }} 格式的消息。如果你需要接收和处理其他格式的消息,则需要配置消息解析器,将其转换为上述格式。
const wsAdapter = new WsAdapter(app, {
// 用于处理 [event, data] 格式的消息
messageParser: (data) => {
const [event, payload] = JSON.parse(data.toString())
return { event, data: payload }
},
})另外,你也可以在创建适配器后,通过 setMessageParser 方法配置消息解析器。
为了演示,我们将手动集成 ws 库。正如前文所述,该库的适配器已经实现,并通过 @nestjs/platform-ws 包中的 WsAdapter 类对外暴露。下面是一个简化实现的示例:
filename = 'ws-adapter.ts'
import * as WebSocket from 'ws'
import { WebSocketAdapter, INestApplicationContext } from '@nestjs/common'
import { MessageMappingProperties } from '@nestjs/websockets'
import { Observable, fromEvent, EMPTY } from 'rxjs'
import { mergeMap, filter } from 'rxjs/operators'
export class WsAdapter implements WebSocketAdapter {
constructor(private app: INestApplicationContext) {}
create(port: number, options: any = {}): any {
return new WebSocket.Server({ port, ...options })
}
bindClientConnect(server, callback: Function) {
server.on('connection', callback)
}
bindMessageHandlers(
client: WebSocket,
handlers: MessageMappingProperties[],
process: (data: any) => Observable<any>
) {
fromEvent(client, 'message')
.pipe(
mergeMap((data) => this.bindMessageHandler(data, handlers, process)),
filter((result) => result)
)
.subscribe((response) => client.send(JSON.stringify(response)))
}
bindMessageHandler(
buffer,
handlers: MessageMappingProperties[],
process: (data: any) => Observable<any>
): Observable<any> {
const message = JSON.parse(buffer.data)
const messageHandler = handlers.find(
(handler) => handler.message === message.event
)
if (!messageHandler) {
return EMPTY
}
return process(messageHandler.callback(message.data))
}
close(server) {
server.close()
}
}如果你希望充分利用 ws
库,建议直接使用内置的 WsAdapter,而不是自行实现适配器。
接下来,我们可以通过 useWebSocketAdapter() 方法设置自定义适配器:
const app = await NestFactory.create(AppModule)
app.useWebSocketAdapter(new WsAdapter(app))一个使用 WsAdapter 的完整示例可在此处查看。