NestJS Logo
NestJS 中文文档
v10.0.0
  • 介绍
  • 快速上手
  • 控制器
  • 提供者
  • 模块
  • 中间件
  • 异常过滤器
  • 管道
  • 守卫
  • 拦截器
  • 自定义装饰器
  • 自定义提供者
  • 异步提供者
  • 动态模块
  • 依赖注入作用域
  • 循环依赖
  • 模块引用
  • 懒加载模块
  • 执行上下文
  • 生命周期事件
  • 发现服务
  • 跨平台无关性
  • 测试
迁移指南
API 参考
官方课程
  1. 文档
  2. 微服务架构
  3. MQTT

Redis
NATS

MQTT(消息队列遥测传输协议)

MQTT 是一种开源、轻量级的消息协议,专为低延迟场景优化。该协议通过**发布/订阅(publish/subscribe)**模型,为设备间连接提供了可扩展且高性价比的解决方案。基于 MQTT 的通信系统通常由发布服务器、代理(Broker)以及一个或多个客户端组成。MQTT 专为受限设备和低带宽、高延迟或不稳定网络环境设计。

安装

要开始构建基于 MQTT 的微服务,请先安装所需依赖包:

npm install mqtt

概述

要使用 MQTT 传输层,请将如下 options 对象传递给 createMicroservice() 方法:

main.ts
import { Transport } from '@nestjs/core'

const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  AppModule,
  {
    transport: Transport.MQTT,
    options: {
      url: 'mqtt://localhost:1883',
    },
  }
)

配置项

options 对象是针对所选传输层定制的。MQTT 传输层支持的属性详见此处。

客户端(Client)

与其他微服务传输器类似,你有多种方式可以创建 MQTT ClientProxy 实例,详见官方文档。

其中一种方式是使用 ClientsModule。要通过 ClientsModule 创建客户端实例,需要先导入该模块,并使用 register() 方法传入一个配置对象。该对象的属性与上文 createMicroservice() 方法中展示的属性一致,并且需要额外指定一个 name 属性作为注入令牌。详细内容可参考这里。

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'MATH_SERVICE',
        transport: Transport.MQTT,
        options: {
          url: 'mqtt://localhost:1883',
        }
      },
    ]),
  ]
  ...
})

你也可以选择其他方式来创建客户端,例如使用 ClientProxyFactory 或 @Client() 装饰器。相关内容可在这里查阅。

上下文(Context)

在更复杂的场景下,你可能需要访问有关传入请求的更多信息。使用 MQTT 传输器时,可以通过 MqttContext 对象获取这些信息。

import { MqttContext, Payload, Ctx } from '@nestjs/microservices'

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: MqttContext) {
  console.log(`Topic: ${context.getTopic()}`)
}

如果你需要访问原始的 mqtt 数据包(packet),可以通过 MqttContext 对象的 getPacket() 方法实现,如下所示:

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: MqttContext) {
  console.log(context.getPacket())
}

通配符(Wildcards)

订阅可以是某个明确的主题(topic),也可以包含通配符。MQTT 协议中有两种通配符:+ 和 #。其中,+ 是单层通配符,表示匹配单个主题层级;而 # 是多层通配符,可以匹配多个主题层级。

@MessagePattern('sensors/+/temperature/+')
getTemperature(@Ctx() context: MqttContext) {
  console.log(`Topic: ${context.getTopic()}`)
}

服务质量(Quality of Service,QoS)

通过 @MessagePattern 或 @EventPattern 装饰器创建的任何订阅,默认使用 QoS 0。如果需要更高的 QoS,可以在建立连接时通过 subscribeOptions 全局设置,如下所示:

main.ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  AppModule,
  {
    transport: Transport.MQTT,
    options: {
      url: 'mqtt://localhost:1883',
      subscribeOptions: {
        qos: 2,
      },
    },
  }
)

如果需要针对某个主题设置特定的 QoS,建议实现自定义传输器。

消息记录构建器(Record builders)

要配置消息选项(如调整 QoS 等级、设置 Retain 或 DUP 标志,或为负载添加额外属性),可以使用 MqttRecordBuilder 类。例如,要将 QoS 设置为 2,可以使用 setQoS 方法,如下所示:

import { MqttRecordBuilder } from '@nestjs/microservices'

const userProperties = { 'x-version': '1.0.0' }
const record = new MqttRecordBuilder(':cat:')
  .setProperties({ userProperties })
  .setQoS(1)
  .build()
client.send('replace-emoji', record).subscribe(...)

在服务端,也可以通过访问 MqttContext 读取这些选项。

@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: MqttContext): string {
  const { properties: { userProperties } } = context.getPacket()
  return userProperties['x-version'] === '1.0.0' ? '🐱' : '🐈'
}

在某些场景下,如果你希望为多个请求统一配置用户属性,可以将这些选项传递给 ClientProxyFactory。

import { Module } from '@nestjs/common'
import { ClientProxyFactory, Transport } from '@nestjs/microservices'

@Module({
  providers: [
    {
      provide: 'API_v1',
      useFactory: () =>
        ClientProxyFactory.create({
          transport: Transport.MQTT,
          options: {
            url: 'mqtt://localhost:1833',
            userProperties: { 'x-version': '1.0.0' },
          },
        }),
    },
  ],
})
export class ApiModule {}

实例状态更新

要实时获取连接和底层驱动实例的状态更新,可以订阅 status 流。该流会提供所选驱动专属的状态更新。以 MQTT 驱动为例,status 流会发出 connected、disconnected、reconnecting 和 closed 事件。

import { MqttStatus } from '@nestjs/microservices'

this.client.status.subscribe((status: MqttStatus) => {
  console.log(status)
})

同样,你也可以订阅服务器的 status 流,以接收服务器状态的通知。

const server = app.connectMicroservice<MicroserviceOptions>(...)
server.status.subscribe((status: MqttStatus) => {
  console.log(status)
})

监听 MQTT 事件

在某些情况下,你可能希望监听微服务内部发出的事件。例如,可以监听 error 事件,在发生错误时触发额外操作。要实现这一点,可以使用 on() 方法,如下所示:

this.client.on('error', (err) => {
  console.error(err)
})

同样,你也可以监听服务器的内部事件:

import { MqttEvents } from '@nestjs/microservices'

server.on<MqttEvents>('error', (err) => {
  console.error(err)
})

访问底层驱动实例

对于更高级的用例,你可能需要访问底层驱动实例。这在需要手动关闭连接或调用驱动专有方法等场景下非常有用。但请注意,在大多数情况下,你无需直接访问驱动。

如需访问底层驱动实例,可以使用 unwrap() 方法。该方法会返回底层驱动实例,泛型类型参数应指定你期望的驱动实例类型。

const mqttClient = this.client.unwrap<import('mqtt').MqttClient>()

同样,你也可以访问服务器的底层驱动实例:

const mqttClient = server.unwrap<import('mqtt').MqttClient>()