MQTT 是一种开源、轻量级的消息协议,专为低延迟场景优化。该协议通过**发布/订阅(publish/subscribe)**模型,为设备间连接提供了可扩展且高性价比的解决方案。基于 MQTT 的通信系统通常由发布服务器、代理(Broker)以及一个或多个客户端组成。MQTT 专为受限设备和低带宽、高延迟或不稳定网络环境设计。
要开始构建基于 MQTT 的微服务,请先安装所需依赖包:
npm install mqtt要使用 MQTT 传输层,请将如下 options 对象传递给 createMicroservice() 方法:
import { Transport } from '@nestjs/core'
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883',
},
}
)options 对象是针对所选传输层定制的。MQTT 传输层支持的属性详见此处。
与其他微服务传输器类似,你有多种方式可以创建 MQTT ClientProxy 实例,详见官方文档。
其中一种方式是使用 ClientsModule。要通过 ClientsModule 创建客户端实例,需要先导入该模块,并使用 register() 方法传入一个配置对象。该对象的属性与上文 createMicroservice() 方法中展示的属性一致,并且需要额外指定一个 name 属性作为注入令牌。详细内容可参考这里。
@Module({
imports: [
ClientsModule.register([
{
name: 'MATH_SERVICE',
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883',
}
},
]),
]
...
})你也可以选择其他方式来创建客户端,例如使用 ClientProxyFactory 或 @Client() 装饰器。相关内容可在这里查阅。
在更复杂的场景下,你可能需要访问有关传入请求的更多信息。使用 MQTT 传输器时,可以通过 MqttContext 对象获取这些信息。
import { MqttContext, Payload, Ctx } from '@nestjs/microservices'
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: MqttContext) {
console.log(`Topic: ${context.getTopic()}`)
}如果你需要访问原始的 mqtt 数据包(packet),可以通过 MqttContext 对象的 getPacket() 方法实现,如下所示:
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: MqttContext) {
console.log(context.getPacket())
}订阅可以是某个明确的主题(topic),也可以包含通配符。MQTT 协议中有两种通配符:+ 和 #。其中,+ 是单层通配符,表示匹配单个主题层级;而 # 是多层通配符,可以匹配多个主题层级。
@MessagePattern('sensors/+/temperature/+')
getTemperature(@Ctx() context: MqttContext) {
console.log(`Topic: ${context.getTopic()}`)
}通过 @MessagePattern 或 @EventPattern 装饰器创建的任何订阅,默认使用 QoS 0。如果需要更高的 QoS,可以在建立连接时通过 subscribeOptions 全局设置,如下所示:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883',
subscribeOptions: {
qos: 2,
},
},
}
)如果需要针对某个主题设置特定的 QoS,建议实现自定义传输器。
要配置消息选项(如调整 QoS 等级、设置 Retain 或 DUP 标志,或为负载添加额外属性),可以使用 MqttRecordBuilder 类。例如,要将 QoS 设置为 2,可以使用 setQoS 方法,如下所示:
import { MqttRecordBuilder } from '@nestjs/microservices'
const userProperties = { 'x-version': '1.0.0' }
const record = new MqttRecordBuilder(':cat:')
.setProperties({ userProperties })
.setQoS(1)
.build()
client.send('replace-emoji', record).subscribe(...)在服务端,也可以通过访问 MqttContext 读取这些选项。
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: MqttContext): string {
const { properties: { userProperties } } = context.getPacket()
return userProperties['x-version'] === '1.0.0' ? '🐱' : '🐈'
}在某些场景下,如果你希望为多个请求统一配置用户属性,可以将这些选项传递给 ClientProxyFactory。
import { Module } from '@nestjs/common'
import { ClientProxyFactory, Transport } from '@nestjs/microservices'
@Module({
providers: [
{
provide: 'API_v1',
useFactory: () =>
ClientProxyFactory.create({
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1833',
userProperties: { 'x-version': '1.0.0' },
},
}),
},
],
})
export class ApiModule {}要实时获取连接和底层驱动实例的状态更新,可以订阅 status 流。该流会提供所选驱动专属的状态更新。以 MQTT 驱动为例,status 流会发出 connected、disconnected、reconnecting 和 closed 事件。
import { MqttStatus } from '@nestjs/microservices'
this.client.status.subscribe((status: MqttStatus) => {
console.log(status)
})同样,你也可以订阅服务器的 status 流,以接收服务器状态的通知。
const server = app.connectMicroservice<MicroserviceOptions>(...)
server.status.subscribe((status: MqttStatus) => {
console.log(status)
})在某些情况下,你可能希望监听微服务内部发出的事件。例如,可以监听 error 事件,在发生错误时触发额外操作。要实现这一点,可以使用 on() 方法,如下所示:
this.client.on('error', (err) => {
console.error(err)
})同样,你也可以监听服务器的内部事件:
import { MqttEvents } from '@nestjs/microservices'
server.on<MqttEvents>('error', (err) => {
console.error(err)
})对于更高级的用例,你可能需要访问底层驱动实例。这在需要手动关闭连接或调用驱动专有方法等场景下非常有用。但请注意,在大多数情况下,你无需直接访问驱动。
如需访问底层驱动实例,可以使用 unwrap() 方法。该方法会返回底层驱动实例,泛型类型参数应指定你期望的驱动实例类型。
const mqttClient = this.client.unwrap<import('mqtt').MqttClient>()同样,你也可以访问服务器的底层驱动实例:
const mqttClient = server.unwrap<import('mqtt').MqttClient>()