RabbitMQ 是一个开源且轻量级的消息代理(message broker),支持多种消息协议。它可以以分布式和联邦式架构进行部署,以满足高扩展性和高可用性的需求。此外,RabbitMQ 是全球应用最广泛的消息代理,无论是初创公司还是大型企业都在使用。
要开始构建基于 RabbitMQ 的微服务,首先需要安装以下依赖包:
npm install amqplib amqp-connection-manager要使用 RabbitMQ 作为传输层(Transport Layer),请将如下 options 对象传递给 createMicroservice() 方法:
import { Transport } from '@nestjs/microservices'
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false,
},
},
}
)options 属性是针对所选传输器特有的配置。RabbitMQ 传输器支持以下属性:
| 选项 | 说明 |
|---|---|
urls | 一个包含多个连接 URL 的数组,按顺序依次尝试连接 |
queue | 服务器监听的队列名称 |
prefetchCount | 设置通道的预取(prefetch)消息数量 |
isGlobalPrefetchCount | 启用每个通道的全局预取设置 |
noAck | 如果为 false,则启用手动确认(acknowledgment)模式 |
consumerTag | 服务器用于区分消费者消息投递的名称;该名称在通道上不能重复。通常建议省略此项,由服务器自动生成随机名称并在响应中返回。消费者标签标识符(Consumer Tag Identifier),详细说明 |
queueOptions | 队列的其他配置选项,详细说明 |
socketOptions | 连接的其他 socket 配置选项,详细说明 |
headers | 每条消息都携带的自定义头部(headers) |
replyQueue | 生产者使用的回复队列,默认为 amq.rabbitmq.reply-to |
persistent | 如果为真(truthy),消息将在代理(broker)重启后依然保留,前提是队列本身也支持持久化 |
noAssert | 为 false 时,消费前不会断言(assert)队列 |
wildcards | 仅当你希望使用主题交换机(Topic Exchange)将消息路由到队列时设置为 true。启用后,可以在消息和事件模式中使用通配符(*,#) |
exchange | 交换机(exchange)名称。当 "wildcards" 设置为 true 时,默认为队列名称 |
exchangeType | 交换机类型,默认为 topic。可选值包括 direct、fanout、topic 和 headers |
routingKey | 主题交换机的附加路由键(routing key) |
maxConnectionAttempts | 最大连接尝试次数,仅适用于消费者配置。-1 表示无限重试 |
与其他微服务传输器类似,你有多种方式可以创建 RabbitMQ 客户端代理(ClientProxy)实例。
其中一种方式是使用 ClientsModule。要通过 ClientsModule 创建客户端实例,需要导入该模块,并使用 register() 方法传入一个选项对象。该对象的属性与上文 createMicroservice() 方法中展示的属性一致,并且还需要包含一个 name 属性作为注入令牌(Injection Token)。你可以在这里查看更多关于 ClientsModule 的内容。
@Module({
imports: [
ClientsModule.register([
{
name: 'MATH_SERVICE',
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false
},
},
},
]),
]
...
})除此之外,你还可以选择其他方式来创建客户端,例如 ClientProxyFactory 或 @Client() 装饰器。相关内容可在此查阅。
在更复杂的场景中,你可能需要访问有关传入请求的更多信息。当使用 RabbitMQ 传输层时,可以访问 RmqContext 对象。
import { RmqContext, Payload, Ctx } from '@nestjs/microservices'
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(`Pattern: ${context.getPattern()}`)
}要访问原始的 RabbitMQ 消息(包含 properties、fields 和 content),可以使用 RmqContext 对象的 getMessage() 方法,如下所示:
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(context.getMessage())
}如果需要获取 RabbitMQ 通道(channel) 的引用,可以使用 RmqContext 对象的 getChannelRef 方法,如下所示:
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(context.getChannelRef())
}为了确保消息不会丢失,RabbitMQ 支持消息确认机制。消费者会向 RabbitMQ 发送一个确认(acknowledgement),告知 RabbitMQ 某条消息已被接收和处理,RabbitMQ 可以安全地将其删除。如果消费者在未发送确认的情况下终止(如其通道关闭、连接关闭或 TCP 连接丢失),RabbitMQ 会认为该消息尚未被完整处理,并会将其重新入队。
要启用手动确认模式,需要将 noAck 属性设置为 false:
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
noAck: false,
queueOptions: {
durable: false
},
},当开启手动消费者确认后,必须由工作进程(worker)主动发送确认,告知任务已完成。
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
const channel = context.getChannelRef()
const originalMsg = context.getMessage()
channel.ack(originalMsg)
}如需配置消息选项,可以使用 RmqRecordBuilder 类(注意:事件驱动流程同样适用)。例如,要设置 headers 和 priority 属性,可以使用 setOptions 方法,如下所示:
import { RmqRecordBuilder } from '@nestjs/microservices'
const message = ':cat:'
const record = new RmqRecordBuilder(message)
.setOptions({
headers: {
['x-version']: '1.0.0',
},
priority: 3,
})
.build()
this.client.send('replace-emoji', record).subscribe(...)你也可以在服务端通过访问 RmqContext 读取这些值,示例如下:
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: RmqContext): string {
const { properties: { headers } } = context.getMessage()
return headers['x-version'] === '1.0.0' ? '🐱' : '🐈'
}要实时获取连接状态以及底层驱动实例的状态更新,可以订阅 status 流。该流会根据所选驱动提供特定的状态更新。以 RMQ 驱动为例,status 流会发出 connected(已连接)和 disconnected(已断开)事件。
import { RmqStatus } from '@nestjs/microservices'
this.client.status.subscribe((status: RmqStatus) => {
console.log(status)
})同样地,你也可以订阅服务端的 status 流,以便接收服务端状态的通知。
const server = app.connectMicroservice<MicroserviceOptions>(...)
server.status.subscribe((status: RmqStatus) => {
console.log(status)
})在某些场景下,你可能希望监听微服务内部发出的事件。例如,可以监听 error(错误)事件,在发生错误时触发额外操作。为此,可以使用 on() 方法,如下所示:
this.client.on('error', (err) => {
console.error(err)
})同样地,也可以监听服务端的内部事件:
import type { RmqEvents } from '@nestjs/microservices'
server.on<RmqEvents>('error', (err) => {
console.error(err)
})在更高级的用例中,你可能需要访问底层驱动实例。这在需要手动关闭连接或使用驱动特有方法等场景下非常有用。但请注意,在大多数情况下,你无需直接访问驱动。
要实现这一点,可以使用 unwrap() 方法,该方法会返回底层驱动实例。泛型类型参数应指定你期望的驱动实例类型。
const managerRef =
this.client.unwrap<import('amqp-connection-manager').AmqpConnectionManager>()同样地,你也可以访问服务端的底层驱动实例:
const managerRef =
server.unwrap<import('amqp-connection-manager').AmqpConnectionManager>()RabbitMQ 支持在路由键中使用通配符,从而实现灵活的消息路由。# 通配符可以匹配零个或多个单词,而 * 通配符只能匹配一个单词。
例如,路由键 cats.# 可以匹配 cats、cats.meow 和 cats.meow.purr。路由键 cats.* 可以匹配 cats.meow,但不能匹配 cats.meow.purr。
要在 RabbitMQ 微服务中启用通配符支持,需要在配置对象中将 wildcards 选项设置为 true:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
wildcards: true,
},
}
)配置完成后,你可以在订阅事件或消息时使用通配符路由键。例如,监听路由键为 cats.# 的消息,可以使用如下代码:
@MessagePattern('cats.#')
getCats(@Payload() data: { message: string }, @Ctx() context: RmqContext) {
console.log(`收到路由键为: ${context.getPattern()} 的消息`)
return {
message: '来自 cats 服务的问候!',
}
}要发送带有特定路由键的消息,可以使用 ClientProxy 实例的 send() 方法:
this.client.send('cats.meow', { message: 'Meow!' }).subscribe((response) => {
console.log(response)
})