NestJS Logo
NestJS 中文文档
v10.0.0
  • 介绍
  • 快速上手
  • 控制器
  • 提供者
  • 模块
  • 中间件
  • 异常过滤器
  • 管道
  • 守卫
  • 拦截器
  • 自定义装饰器
  • 自定义提供者
  • 异步提供者
  • 动态模块
  • 依赖注入作用域
  • 循环依赖
  • 模块引用
  • 懒加载模块
  • 执行上下文
  • 生命周期事件
  • 发现服务
  • 跨平台无关性
  • 测试
迁移指南
API 参考
官方课程
  1. 文档
  2. 微服务架构
  3. NATS

MQTT
RabbitMQ

NATS

NATS 是一个简单、安全且高性能的开源消息系统,适用于云原生应用、物联网(IoT)消息传递以及微服务架构。NATS 服务器由 Go 编程语言编写,但与服务器交互的客户端库已覆盖数十种主流编程语言。NATS 支持最多一次(At Most Once) 和至少一次(At Least Once) 的消息投递。它可以运行在各种环境中,包括大型服务器、云实例、边缘网关,甚至物联网设备。

安装

要开始构建基于 NATS 的微服务,请首先安装所需的包:

npm install nats

概述

要使用 NATS 作为传输层(Transport Layer),请将如下 options 对象传递给 createMicroservice() 方法:

main.ts
import { Transport } from '@nestjs/microservices'

const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  AppModule,
  {
    transport: Transport.NATS,
    options: {
      servers: ['nats://localhost:4222'],
    },
  }
)

选项

options 对象是针对所选传输层特定的配置。NATS 传输层支持此处描述的属性,以及下列额外属性:

选项说明
queue服务器应订阅的队列(如设置为 undefined 则忽略此配置)。关于 NATS 队列组(queue groups),可阅读下文。
gracefulShutdown是否启用优雅关闭(graceful shutdown)。启用后,服务器会在关闭连接前先取消所有频道的订阅。默认值为 false。
gracePeriod在取消所有频道订阅后,服务器等待的时间(毫秒)。默认值为 10000 毫秒。

客户端

与其他微服务传输层类似,你有多种方式来创建 NATS ClientProxy 实例。

其中一种方式是使用 ClientsModule。要通过 ClientsModule 创建客户端实例,需要导入该模块,并使用 register() 方法传入包含上述与 createMicroservice() 方法相同属性的配置对象,同时还需指定一个 `name

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'MATH_SERVICE',
        transport: Transport.NATS,
        options: {
          servers: ['nats://localhost:4222'],
        }
      },
    ]),
  ]
  ...
})

你也可以选择其他方式来创建客户端(如 ClientProxyFactory 或 @Client() 装饰器)。相关内容可在此查阅。

请求-响应

对于请求-响应(request-response)消息风格(详细说明),NATS 传输器(NATS transporter)并未使用 NATS 内置的请求-回复(Request-Reply) 机制。相反,「请求」会通过 publish() 方法在指定主题(subject)上发布,并带有唯一的回复主题名称,响应方会监听该主题并将响应发送到该回复主题。无论请求方和响应方位于何处,回复主题都会动态地指向请求方。

事件驱动

对于事件驱动(event-based)消息风格(详细说明),NATS 传输器会使用 NATS 内置的发布-订阅(Publish-Subscribe)机制。发布者会在某个主题上发送消息,任何正在监听该主题的活跃订阅者都会收到该消息。订阅者还可以注册通配符主题(wildcard subjects),其工作方式类似于正则表达式。这种一对多的模式有时也被称为「扇出(fan-out)」。

队列组

NATS 提供了一种内置的负载均衡功能,称为分布式队列(distributed queues)。要创建队列订阅,可以如下使用 queue 属性:

main.ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  AppModule,
  {
    transport: Transport.NATS,
    options: {
      servers: ['nats://localhost:4222'],
      queue: 'cats_queue',
    },
  }
)

上下文

在更复杂的场景中,你可能需要访问有关传入请求的更多信息。使用 NATS 传输器时,可以访问 NatsContext 对象。

import { NatsContext, Payload, Ctx } from '@nestjs/microservices'

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: NatsContext) {
  console.log(`Subject: ${context.getSubject()}`)
}

通配符

订阅可以是一个明确的主题(subject),也可以包含通配符。

@MessagePattern('time.us.*')
getDate(@Payload() data: number[], @Ctx() context: NatsContext) {
  console.log(`Subject: ${context.getSubject()}`) // 例如 "time.us.east"
  return new Date().toLocaleTimeString(...)
}

消息记录构建器(Record builders)

要配置消息选项,可以使用 NatsRecordBuilder 类(注意:事件驱动流程同样适用)。例如,若要添加 x-version 请求头(Header),可以使用 setHeaders 方法,如下所示:

import { NatsRecordBuilder } from '@nestjs/microservices'
import * as nats from 'nats'

// 代码中的某处
const headers = nats.headers()
headers.set('x-version', '1.0.0')

const record = new NatsRecordBuilder(':cat:').setHeaders(headers).build()
this.client.send('replace-emoji', record).subscribe(...)

你也可以在服务端通过访问 NatsContext 读取这些请求头,如下所示:

@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: NatsContext): string {
  const headers = context.getHeaders()
  return headers['x-version'] === '1.0.0' ? '🐱' : '🐈'
}

在某些情况下,你可能希望为多个请求统一配置请求头,可以在创建 ClientProxyFactory 时通过 options 传递 headers:

import { Module } from '@nestjs/common'
import { ClientProxyFactory, Transport } from '@nestjs/microservices'

@Module({
  providers: [
    {
      provide: 'API_v1',
      useFactory: () =>
        ClientProxyFactory.create({
          transport: Transport.NATS,
          options: {
            servers: ['nats://localhost:4222'],
            headers: { 'x-version': '1.0.0' },
          },
        }),
    },
  ],
})
export class ApiModule {}

实例状态更新

要实时获取连接状态以及底层驱动实例的状态更新,可以订阅 status 流。该流会根据所选驱动提供特定的状态更新。以 NATS 驱动为例,status 流会发出 connected、disconnected 和 reconnecting 等事件。

import { NatsStatus } from '@nestjs/microservices'

this.client.status.subscribe((status: NatsStatus) => {
  console.log(status)
})

同样地,你也可以订阅服务器的 status 流,以接收关于服务器状态的通知。

const server = app.connectMicroservice<MicroserviceOptions>(...)
server.status.subscribe((status: NatsStatus) => {
  console.log(status)
})

监听 Nats 事件

在某些场景下,你可能希望监听微服务内部发出的事件。例如,可以监听 error 事件,在发生错误时触发额外操作。要实现这一点,可以使用 on() 方法,如下所示:

this.client.on('error', (err) => {
  console.error(err)
})

同样地,也可以监听服务器的内部事件:

import { NatsEvents } from '@nestjs/microservices'

server.on<NatsEvents>('error', (err) => {
  console.error(err)
})

访问底层驱动实例

对于更高级的用例,你可能需要访问底层驱动实例。这在需要手动关闭连接或调用驱动特有方法等场景下非常有用。但请注意,在大多数情况下,你无需直接操作驱动实例。

如需访问底层驱动实例,可以使用 unwrap() 方法。该方法会返回底层驱动实例,泛型类型参数应指定你期望的驱动实例类型。

const natsConnection = this.client.unwrap<import('nats').NatsConnection>()

同样地,也可以访问服务器的底层驱动实例:

const natsConnection = server.unwrap<import('nats').NatsConnection>()