NestJS Logo
NestJS 中文文档
v10.0.0
  • 介绍
  • 快速上手
  • 控制器
  • 提供者
  • 模块
  • 中间件
  • 异常过滤器
  • 管道
  • 守卫
  • 拦截器
  • 自定义装饰器
  • 自定义提供者
  • 异步提供者
  • 动态模块
  • 依赖注入作用域
  • 循环依赖
  • 模块引用
  • 懒加载模块
  • 执行上下文
  • 生命周期事件
  • 发现服务
  • 跨平台无关性
  • 测试
迁移指南
API 参考
官方课程
  1. 文档
  2. 微服务架构
  3. 自定义传输器

gRPC
异常过滤器

自定义传输器(Custom transporters)

Nest 提供了多种传输器,并且还为开发者提供了 API,可以构建全新的自定义传输层策略。 传输器让你能够通过可插拔的通信层和极其简单的应用层消息协议(message protocol),在网络中连接各个组件。你可以阅读完整文章。

提示

使用 Nest 构建微服务并不一定要用 @nestjs/microservices 包。例如,如果你需要与外部服务(比如用其他语言编写的微服务)通信,可能并不需要 @nestjs/microservices 提供的全部功能。 > 实际上,如果你不需要用装饰器(如 @EventPattern 或 @MessagePattern)来声明式地定义订阅者(Subscriber),那么只需运行一个独立应用(Standalone Application),并手动维护连接/订阅通道(Channel),对于大多数场景来说就足够了,并且会带来更高的灵活性。

通过自定义传输器,你可以集成任何消息系统/协议(比如 Google Cloud Pub/Sub、Amazon Kinesis 等),或者扩展现有传输器,在其基础上增加额外功能(例如为 MQTT 增加服务质量(QoS))。

提示

如果你想深入了解 Nest 微服务的工作原理,以及如何扩展现有传输器的能力,推荐阅读 NestJS Microservices in Action 和 Advanced NestJS Microservices 系列文章。

创建自定义策略

首先,我们来定义一个代表自定义传输器(Transporter)的类。

import { CustomTransportStrategy, Server } from '@nestjs/microservices'

class GoogleCloudPubSubServer
  extends Server
  implements CustomTransportStrategy
{
  /**
   * 当你调用 "app.listen()" 时会触发。
   */
  listen(callback: () => void) {
    callback()
  }

  /**
   * 应用关闭时触发。
   */
  close() {}

  /**
   * 如果你不希望传输器用户能够注册事件监听器,可以忽略此方法。大多数自定义实现都不需要。
   */
  on(event: string, callback: Function) {
    throw new Error('Method not implemented.')
  }

  /**
   * 如果你不希望传输器用户能够获取底层原生服务器,可以忽略此方法。大多数自定义实现都不需要。
   */
  unwrap<T = never>(): T {
    throw new Error('Method not implemented.')
  }
}
注意

本章节不会实现一个完整功能的 Google Cloud Pub/Sub 服务器,因为这需要深入传输器的具体技术细节。

在上面的示例中,我们声明了 GoogleCloudPubSubServer 类,并实现了 CustomTransportStrategy(自定义传输策略) 接口所要求的 listen() 和 close() 方法。 此外,我们的类继承自 @nestjs/microservices 包中的 Server(服务器) 类,该类提供了一些实用方法,例如 Nest 运行时用于注册消息处理器的方法。你也可以选择扩展现有传输策略对应的服务器类,例如 ServerRedis,以增强其功能。 按照惯例,我们为类名添加了 "Server" 后缀,因为它负责订阅消息/事件(如有需要,也负责响应这些消息/事件)。

有了这些准备后,我们就可以像下面这样,使用自定义策略来替代内置传输器:

const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  AppModule,
  {
    strategy: new GoogleCloudPubSubServer(),
  }
)

本质上,我们不再传递包含 transport 和 options 属性的常规传输器配置对象,而是传递一个名为 strategy 的属性,其值为自定义传输器类的实例。

回到我们的 GoogleCloudPubSubServer 类,在实际应用中,我们通常会在 listen() 方法中建立与消息代理/外部服务的连接,并注册订阅者/监听特定通道(随后在 close() 方法中移除订阅并关闭连接)。 但由于这需要深入理解 Nest 微服务之间的通信方式,我们建议你阅读这篇系列文章。 在本章节中,我们将重点介绍 Server 类所提供的能力,以及如何利用这些能力构建自定义策略。

例如,假设在应用的某处定义了如下消息处理器:

@MessagePattern('echo')
echo(@Payload() data: object) {
  return data
}

该消息处理器会被 Nest 运行时自动注册。通过 Server 类,你可以查看已注册的所有消息(和事件)处理器,还可以访问并执行分配给它们的实际方法。 为了验证这一点,我们可以在 listen() 方法中,在调用 callback 之前添加一个简单的 console.log:

listen(callback: () => void) {
  console.log(this.messageHandlers)
  callback()
}

当应用重启后,你会在终端看到如下日志:

Map { 'echo' => [AsyncFunction] { isEventHandler: false } }
提示

如果我们使用了 @EventPattern 装饰器,你会看到类似的输出,但 isEventHandler 属性会被设置为 true。

如你所见,messageHandlers 属性是一个包含所有消息(和事件)处理器的 Map 集合,消息模式作为键。 现在,你可以通过键(例如 "echo")获取对应的消息处理器引用:

async listen(callback: () => void) {
  const echoHandler = this.messageHandlers.get('echo')
  console.log(await echoHandler('Hello world!'))
  callback()
}

当我们调用 echoHandler 并传入任意字符串参数(此处为 "Hello world!")时,你会在控制台看到:

Hello world!

这说明我们的消息处理器已被正确执行。

当你在拦截器中使用 CustomTransportStrategy(自定义传输策略) 时,处理器会被包装为 RxJS 流。这意味着你需要订阅(subscribe)它们,才能执行流的底层逻辑(例如在拦截器执行后继续进入控制器逻辑)。

下面是一个示例:

async listen(callback: () => void) {
  const echoHandler = this.messageHandlers.get('echo')
  const streamOrResult = await echoHandler('Hello World')
  if (isObservable(streamOrResult)) {
    streamOrResult.subscribe()
  }
  callback()
}

客户端代理(Client proxy)

正如我们在第一节中提到的,你并不一定需要使用 @nestjs/microservices 包来创建微服务,但如果你决定这样做,并且需要集成自定义策略(custom strategy),你还需要提供一个「客户端」类。

提示

需要注意的是,实现一个与所有 @nestjs/microservices 功能(例如流式处理)兼容的完整客户端类,需要对框架所用的通信技术有较深入的理解。想要了解更多内容,可以参考这篇文章。

要与外部服务通信、发送和发布消息(或事件),你可以选择使用特定库的 SDK 包,或者实现一个继承自 ClientProxy 的自定义客户端类,如下所示:

import { ClientProxy, ReadPacket, WritePacket } from '@nestjs/microservices'

class GoogleCloudPubSubClient extends ClientProxy {
  async connect(): Promise<any> {}
  async close() {}
  async dispatchEvent(packet: ReadPacket<any>): Promise<any> {}
  publish(
    packet: ReadPacket<any>,
    callback: (packet: WritePacket<any>) => void
  ): Function {}
  unwrap<T = never>(): T {
    throw new Error('Method not implemented.')
  }
}
注意

请注意,本章不会实现一个完整的 Google Cloud Pub/Sub 客户端,因为这需要深入探讨具体的传输层技术细节。

如你所见,ClientProxy 类要求我们实现多个方法,用于建立和关闭连接,以及发布消息(publish)和事件(dispatchEvent)。 需要注意的是,如果你不需要支持请求-响应(request-response)通信风格,可以将 publish() 方法留空。同样地,如果你不需要支持基于事件的通信(event-based communication),可以省略 dispatchEvent() 方法。

为了观察这些方法的执行时机和内容,我们可以像下面这样添加多个 console.log 调用:

class GoogleCloudPubSubClient extends ClientProxy {
  async connect(): Promise<any> {
    console.log('connect')
  }

  async close() {
    console.log('close')
  }

  async dispatchEvent(packet: ReadPacket<any>): Promise<any> {
    return console.log('event to dispatch: ', packet)
  }

  publish(
    packet: ReadPacket<any>,
    callback: (packet: WritePacket<any>) => void
  ): Function {
    console.log('message:', packet)

    // 在实际应用中,应该由响应方返回数据后执行 `callback` 回调。
    // 这里我们仅做模拟(延迟 5 秒),假设响应已返回,并传递原始 `data`。
    // 在 WritePacket 中,`isDisposed` 布尔值表示响应方不再期望收到更多数据。如果未发送或为 `false`,则只会将数据发送到可观察流。
    setTimeout(
      () =>
        callback({
          response: packet.data,
          isDisposed: true,
        }),
      5000
    )

    return () => console.log('teardown')
  }

  unwrap<T = never>(): T {
    throw new Error('Method not implemented.')
  }
}

有了上述实现后,我们可以创建 GoogleCloudPubSubClient 类的实例,并运行 send() 方法(你可能在前面的章节见过),并订阅返回的可观察流(observable stream)。

const googlePubSubClient = new GoogleCloudPubSubClient()
googlePubSubClient
  .send('pattern', 'Hello world!')
  .subscribe((response) => console.log(response))

此时,你应该会在终端看到如下输出:

connect
message: { pattern: 'pattern', data: 'Hello world!' }
Hello world! // <-- 5 秒后输出

为了测试我们的 "teardown" 方法(即 publish() 方法返回的函数)是否被正确执行,我们可以为流添加一个超时操作符(timeout operator),将超时时间设置为 2 秒,以确保它会在 setTimeout 调用 callback 之前抛出异常。

import { timeout } from 'rxjs/operators'

const googlePubSubClient = new GoogleCloudPubSubClient()
googlePubSubClient
  .send('pattern', 'Hello world!')
  .pipe(timeout(2000))
  .subscribe(
    (response) => console.log(response),
    (error) => console.error(error.message)
  )

应用 timeout 操作符后,你的终端输出应如下所示:

connect
message: { pattern: 'pattern', data: 'Hello world!' }
teardown // <-- teardown
Timeout has occurred

如果你想分发事件(而不是发送消息),可以使用 emit() 方法:

googlePubSubClient.emit('event', 'Hello world!')

此时,控制台输出如下:

connect
event to dispatch:  { pattern: 'event', data: 'Hello world!' }

消息序列化(Message serialization)

如果你需要在客户端响应的序列化过程中添加自定义逻辑,可以自定义一个继承自 ClientProxy 类或其子类的类。若要修改成功请求的响应内容,可以重写 serializeResponse 方法;若要处理通过该客户端的所有错误,则可以重写 serializeError 方法。要使用这个自定义类,只需通过 customClass 属性将该类本身传递给 ClientsModule.register() 方法即可。下面是一个自定义 ClientProxy 的示例,它会将每个错误序列化为 RpcException(RPC 异常)。

error-handling.proxy.ts
import { ClientTcp, RpcException } from '@nestjs/microservices'

class ErrorHandlingProxy extends ClientTCP {
  serializeError(err: Error) {
    return new RpcException(err)
  }
}

然后在 ClientsModule 中这样使用:

app.module.ts
@Module({
  imports: [
    ClientsModule.register([{
      name: 'CustomProxy',
      customClass: ErrorHandlingProxy,
    }]),
  ]
})
export class AppModule
提示

这里传递给 customClass 的是类本身,而不是类的实例。Nest 会在底层自动为你创建实例,并且会将传递给 options 属性的任何选项传递给新的 ClientProxy。