Nest 提供了多种传输器,并且还为开发者提供了 API,可以构建全新的自定义传输层策略。 传输器让你能够通过可插拔的通信层和极其简单的应用层消息协议(message protocol),在网络中连接各个组件。你可以阅读完整文章。
使用 Nest 构建微服务并不一定要用 @nestjs/microservices
包。例如,如果你需要与外部服务(比如用其他语言编写的微服务)通信,可能并不需要
@nestjs/microservices 提供的全部功能。 > 实际上,如果你不需要用装饰器(如
@EventPattern 或
@MessagePattern)来声明式地定义订阅者(Subscriber),那么只需运行一个独立应用(Standalone
Application),并手动维护连接/订阅通道(Channel),对于大多数场景来说就足够了,并且会带来更高的灵活性。
通过自定义传输器,你可以集成任何消息系统/协议(比如 Google Cloud Pub/Sub、Amazon Kinesis 等),或者扩展现有传输器,在其基础上增加额外功能(例如为 MQTT 增加服务质量(QoS))。
如果你想深入了解 Nest 微服务的工作原理,以及如何扩展现有传输器的能力,推荐阅读 NestJS Microservices in Action 和 Advanced NestJS Microservices 系列文章。
首先,我们来定义一个代表自定义传输器(Transporter)的类。
import { CustomTransportStrategy, Server } from '@nestjs/microservices'
class GoogleCloudPubSubServer
extends Server
implements CustomTransportStrategy
{
/**
* 当你调用 "app.listen()" 时会触发。
*/
listen(callback: () => void) {
callback()
}
/**
* 应用关闭时触发。
*/
close() {}
/**
* 如果你不希望传输器用户能够注册事件监听器,可以忽略此方法。大多数自定义实现都不需要。
*/
on(event: string, callback: Function) {
throw new Error('Method not implemented.')
}
/**
* 如果你不希望传输器用户能够获取底层原生服务器,可以忽略此方法。大多数自定义实现都不需要。
*/
unwrap<T = never>(): T {
throw new Error('Method not implemented.')
}
}本章节不会实现一个完整功能的 Google Cloud Pub/Sub 服务器,因为这需要深入传输器的具体技术细节。
在上面的示例中,我们声明了 GoogleCloudPubSubServer 类,并实现了 CustomTransportStrategy(自定义传输策略) 接口所要求的 listen() 和 close() 方法。
此外,我们的类继承自 @nestjs/microservices 包中的 Server(服务器) 类,该类提供了一些实用方法,例如 Nest 运行时用于注册消息处理器的方法。你也可以选择扩展现有传输策略对应的服务器类,例如 ServerRedis,以增强其功能。
按照惯例,我们为类名添加了 "Server" 后缀,因为它负责订阅消息/事件(如有需要,也负责响应这些消息/事件)。
有了这些准备后,我们就可以像下面这样,使用自定义策略来替代内置传输器:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
strategy: new GoogleCloudPubSubServer(),
}
)本质上,我们不再传递包含 transport 和 options 属性的常规传输器配置对象,而是传递一个名为 strategy 的属性,其值为自定义传输器类的实例。
回到我们的 GoogleCloudPubSubServer 类,在实际应用中,我们通常会在 listen() 方法中建立与消息代理/外部服务的连接,并注册订阅者/监听特定通道(随后在 close() 方法中移除订阅并关闭连接)。
但由于这需要深入理解 Nest 微服务之间的通信方式,我们建议你阅读这篇系列文章。
在本章节中,我们将重点介绍 Server 类所提供的能力,以及如何利用这些能力构建自定义策略。
例如,假设在应用的某处定义了如下消息处理器:
@MessagePattern('echo')
echo(@Payload() data: object) {
return data
}该消息处理器会被 Nest 运行时自动注册。通过 Server 类,你可以查看已注册的所有消息(和事件)处理器,还可以访问并执行分配给它们的实际方法。
为了验证这一点,我们可以在 listen() 方法中,在调用 callback 之前添加一个简单的 console.log:
listen(callback: () => void) {
console.log(this.messageHandlers)
callback()
}当应用重启后,你会在终端看到如下日志:
Map { 'echo' => [AsyncFunction] { isEventHandler: false } }如果我们使用了 @EventPattern 装饰器,你会看到类似的输出,但 isEventHandler
属性会被设置为 true。
如你所见,messageHandlers 属性是一个包含所有消息(和事件)处理器的 Map 集合,消息模式作为键。
现在,你可以通过键(例如 "echo")获取对应的消息处理器引用:
async listen(callback: () => void) {
const echoHandler = this.messageHandlers.get('echo')
console.log(await echoHandler('Hello world!'))
callback()
}当我们调用 echoHandler 并传入任意字符串参数(此处为 "Hello world!")时,你会在控制台看到:
Hello world!这说明我们的消息处理器已被正确执行。
当你在拦截器中使用 CustomTransportStrategy(自定义传输策略) 时,处理器会被包装为 RxJS 流。这意味着你需要订阅(subscribe)它们,才能执行流的底层逻辑(例如在拦截器执行后继续进入控制器逻辑)。
下面是一个示例:
async listen(callback: () => void) {
const echoHandler = this.messageHandlers.get('echo')
const streamOrResult = await echoHandler('Hello World')
if (isObservable(streamOrResult)) {
streamOrResult.subscribe()
}
callback()
}正如我们在第一节中提到的,你并不一定需要使用 @nestjs/microservices 包来创建微服务,但如果你决定这样做,并且需要集成自定义策略(custom strategy),你还需要提供一个「客户端」类。
需要注意的是,实现一个与所有 @nestjs/microservices
功能(例如流式处理)兼容的完整客户端类,需要对框架所用的通信技术有较深入的理解。想要了解更多内容,可以参考这篇文章。
要与外部服务通信、发送和发布消息(或事件),你可以选择使用特定库的 SDK 包,或者实现一个继承自 ClientProxy 的自定义客户端类,如下所示:
import { ClientProxy, ReadPacket, WritePacket } from '@nestjs/microservices'
class GoogleCloudPubSubClient extends ClientProxy {
async connect(): Promise<any> {}
async close() {}
async dispatchEvent(packet: ReadPacket<any>): Promise<any> {}
publish(
packet: ReadPacket<any>,
callback: (packet: WritePacket<any>) => void
): Function {}
unwrap<T = never>(): T {
throw new Error('Method not implemented.')
}
}请注意,本章不会实现一个完整的 Google Cloud Pub/Sub 客户端,因为这需要深入探讨具体的传输层技术细节。
如你所见,ClientProxy 类要求我们实现多个方法,用于建立和关闭连接,以及发布消息(publish)和事件(dispatchEvent)。
需要注意的是,如果你不需要支持请求-响应(request-response)通信风格,可以将 publish() 方法留空。同样地,如果你不需要支持基于事件的通信(event-based communication),可以省略 dispatchEvent() 方法。
为了观察这些方法的执行时机和内容,我们可以像下面这样添加多个 console.log 调用:
class GoogleCloudPubSubClient extends ClientProxy {
async connect(): Promise<any> {
console.log('connect')
}
async close() {
console.log('close')
}
async dispatchEvent(packet: ReadPacket<any>): Promise<any> {
return console.log('event to dispatch: ', packet)
}
publish(
packet: ReadPacket<any>,
callback: (packet: WritePacket<any>) => void
): Function {
console.log('message:', packet)
// 在实际应用中,应该由响应方返回数据后执行 `callback` 回调。
// 这里我们仅做模拟(延迟 5 秒),假设响应已返回,并传递原始 `data`。
// 在 WritePacket 中,`isDisposed` 布尔值表示响应方不再期望收到更多数据。如果未发送或为 `false`,则只会将数据发送到可观察流。
setTimeout(
() =>
callback({
response: packet.data,
isDisposed: true,
}),
5000
)
return () => console.log('teardown')
}
unwrap<T = never>(): T {
throw new Error('Method not implemented.')
}
}有了上述实现后,我们可以创建 GoogleCloudPubSubClient 类的实例,并运行 send() 方法(你可能在前面的章节见过),并订阅返回的可观察流(observable stream)。
const googlePubSubClient = new GoogleCloudPubSubClient()
googlePubSubClient
.send('pattern', 'Hello world!')
.subscribe((response) => console.log(response))此时,你应该会在终端看到如下输出:
connect
message: { pattern: 'pattern', data: 'Hello world!' }
Hello world! // <-- 5 秒后输出为了测试我们的 "teardown" 方法(即 publish() 方法返回的函数)是否被正确执行,我们可以为流添加一个超时操作符(timeout operator),将超时时间设置为 2 秒,以确保它会在 setTimeout 调用 callback 之前抛出异常。
import { timeout } from 'rxjs/operators'
const googlePubSubClient = new GoogleCloudPubSubClient()
googlePubSubClient
.send('pattern', 'Hello world!')
.pipe(timeout(2000))
.subscribe(
(response) => console.log(response),
(error) => console.error(error.message)
)应用 timeout 操作符后,你的终端输出应如下所示:
connect
message: { pattern: 'pattern', data: 'Hello world!' }
teardown // <-- teardown
Timeout has occurred如果你想分发事件(而不是发送消息),可以使用 emit() 方法:
googlePubSubClient.emit('event', 'Hello world!')此时,控制台输出如下:
connect
event to dispatch: { pattern: 'event', data: 'Hello world!' }如果你需要在客户端响应的序列化过程中添加自定义逻辑,可以自定义一个继承自 ClientProxy 类或其子类的类。若要修改成功请求的响应内容,可以重写 serializeResponse 方法;若要处理通过该客户端的所有错误,则可以重写 serializeError 方法。要使用这个自定义类,只需通过 customClass 属性将该类本身传递给 ClientsModule.register() 方法即可。下面是一个自定义 ClientProxy 的示例,它会将每个错误序列化为 RpcException(RPC 异常)。
import { ClientTcp, RpcException } from '@nestjs/microservices'
class ErrorHandlingProxy extends ClientTCP {
serializeError(err: Error) {
return new RpcException(err)
}
}然后在 ClientsModule 中这样使用:
@Module({
imports: [
ClientsModule.register([{
name: 'CustomProxy',
customClass: ErrorHandlingProxy,
}]),
]
})
export class AppModule这里传递给 customClass 的是类本身,而不是类的实例。Nest
会在底层自动为你创建实例,并且会将传递给 options 属性的任何选项传递给新的
ClientProxy。