NestJS Logo
NestJS 中文文档
v10.0.0
  • 介绍
  • 快速上手
  • 控制器
  • 提供者
  • 模块
  • 中间件
  • 异常过滤器
  • 管道
  • 守卫
  • 拦截器
  • 自定义装饰器
  • 自定义提供者
  • 异步提供者
  • 动态模块
  • 依赖注入作用域
  • 循环依赖
  • 模块引用
  • 懒加载模块
  • 执行上下文
  • 生命周期事件
  • 发现服务
  • 跨平台无关性
  • 测试
迁移指南
API 参考
官方课程
  1. 文档
  2. 微服务架构
  3. RabbitMQ

NATS
Kafka

RabbitMQ(消息代理)

RabbitMQ 是一个开源且轻量级的消息代理(message broker),支持多种消息协议。它可以以分布式和联邦式架构进行部署,以满足高扩展性和高可用性的需求。此外,RabbitMQ 是全球应用最广泛的消息代理,无论是初创公司还是大型企业都在使用。

安装

要开始构建基于 RabbitMQ 的微服务,首先需要安装以下依赖包:

npm install amqplib amqp-connection-manager

概述

要使用 RabbitMQ 作为传输层(Transport Layer),请将如下 options 对象传递给 createMicroservice() 方法:

main.ts
import { Transport } from '@nestjs/microservices'

const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  AppModule,
  {
    transport: Transport.RMQ,
    options: {
      urls: ['amqp://localhost:5672'],
      queue: 'cats_queue',
      queueOptions: {
        durable: false,
      },
    },
  }
)

选项

options 属性是针对所选传输器特有的配置。RabbitMQ 传输器支持以下属性:

选项说明
urls一个包含多个连接 URL 的数组,按顺序依次尝试连接
queue服务器监听的队列名称
prefetchCount设置通道的预取(prefetch)消息数量
isGlobalPrefetchCount启用每个通道的全局预取设置
noAck如果为 false,则启用手动确认(acknowledgment)模式
consumerTag服务器用于区分消费者消息投递的名称;该名称在通道上不能重复。通常建议省略此项,由服务器自动生成随机名称并在响应中返回。消费者标签标识符(Consumer Tag Identifier),详细说明
queueOptions队列的其他配置选项,详细说明
socketOptions连接的其他 socket 配置选项,详细说明
headers每条消息都携带的自定义头部(headers)
replyQueue生产者使用的回复队列,默认为 amq.rabbitmq.reply-to
persistent如果为真(truthy),消息将在代理(broker)重启后依然保留,前提是队列本身也支持持久化
noAssert为 false 时,消费前不会断言(assert)队列
wildcards仅当你希望使用主题交换机(Topic Exchange)将消息路由到队列时设置为 true。启用后,可以在消息和事件模式中使用通配符(*,#)
exchange交换机(exchange)名称。当 "wildcards" 设置为 true 时,默认为队列名称
exchangeType交换机类型,默认为 topic。可选值包括 direct、fanout、topic 和 headers
routingKey主题交换机的附加路由键(routing key)
maxConnectionAttempts最大连接尝试次数,仅适用于消费者配置。-1 表示无限重试

客户端(Client)

与其他微服务传输器类似,你有多种方式可以创建 RabbitMQ 客户端代理(ClientProxy)实例。

其中一种方式是使用 ClientsModule。要通过 ClientsModule 创建客户端实例,需要导入该模块,并使用 register() 方法传入一个选项对象。该对象的属性与上文 createMicroservice() 方法中展示的属性一致,并且还需要包含一个 name 属性作为注入令牌(Injection Token)。你可以在这里查看更多关于 ClientsModule 的内容。

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'MATH_SERVICE',
        transport: Transport.RMQ,
        options: {
          urls: ['amqp://localhost:5672'],
          queue: 'cats_queue',
          queueOptions: {
            durable: false
          },
        },
      },
    ]),
  ]
  ...
})

除此之外,你还可以选择其他方式来创建客户端,例如 ClientProxyFactory 或 @Client() 装饰器。相关内容可在此查阅。

上下文

在更复杂的场景中,你可能需要访问有关传入请求的更多信息。当使用 RabbitMQ 传输层时,可以访问 RmqContext 对象。

import { RmqContext, Payload, Ctx } from '@nestjs/microservices'

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(`Pattern: ${context.getPattern()}`)
}

要访问原始的 RabbitMQ 消息(包含 properties、fields 和 content),可以使用 RmqContext 对象的 getMessage() 方法,如下所示:

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getMessage())
}

如果需要获取 RabbitMQ 通道(channel) 的引用,可以使用 RmqContext 对象的 getChannelRef 方法,如下所示:

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getChannelRef())
}

消息确认(Message acknowledgement)

为了确保消息不会丢失,RabbitMQ 支持消息确认机制。消费者会向 RabbitMQ 发送一个确认(acknowledgement),告知 RabbitMQ 某条消息已被接收和处理,RabbitMQ 可以安全地将其删除。如果消费者在未发送确认的情况下终止(如其通道关闭、连接关闭或 TCP 连接丢失),RabbitMQ 会认为该消息尚未被完整处理,并会将其重新入队。

要启用手动确认模式,需要将 noAck 属性设置为 false:

options: {
  urls: ['amqp://localhost:5672'],
  queue: 'cats_queue',
  noAck: false,
  queueOptions: {
    durable: false
  },
},

当开启手动消费者确认后,必须由工作进程(worker)主动发送确认,告知任务已完成。

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  const channel = context.getChannelRef()
  const originalMsg = context.getMessage()

  channel.ack(originalMsg)
}

消息构建器(Record builders)

如需配置消息选项,可以使用 RmqRecordBuilder 类(注意:事件驱动流程同样适用)。例如,要设置 headers 和 priority 属性,可以使用 setOptions 方法,如下所示:

import { RmqRecordBuilder } from '@nestjs/microservices'

const message = ':cat:'
const record = new RmqRecordBuilder(message)
  .setOptions({
    headers: {
      ['x-version']: '1.0.0',
    },
    priority: 3,
  })
  .build()

this.client.send('replace-emoji', record).subscribe(...)

你也可以在服务端通过访问 RmqContext 读取这些值,示例如下:

@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: RmqContext): string {
  const { properties: { headers } } = context.getMessage()
  return headers['x-version'] === '1.0.0' ? '🐱' : '🐈'
}

实例状态更新

要实时获取连接状态以及底层驱动实例的状态更新,可以订阅 status 流。该流会根据所选驱动提供特定的状态更新。以 RMQ 驱动为例,status 流会发出 connected(已连接)和 disconnected(已断开)事件。

import { RmqStatus } from '@nestjs/microservices'

this.client.status.subscribe((status: RmqStatus) => {
  console.log(status)
})

同样地,你也可以订阅服务端的 status 流,以便接收服务端状态的通知。

const server = app.connectMicroservice<MicroserviceOptions>(...)
server.status.subscribe((status: RmqStatus) => {
  console.log(status)
})

监听 RabbitMQ 事件

在某些场景下,你可能希望监听微服务内部发出的事件。例如,可以监听 error(错误)事件,在发生错误时触发额外操作。为此,可以使用 on() 方法,如下所示:

this.client.on('error', (err) => {
  console.error(err)
})

同样地,也可以监听服务端的内部事件:

import type { RmqEvents } from '@nestjs/microservices'

server.on<RmqEvents>('error', (err) => {
  console.error(err)
})

底层驱动访问

在更高级的用例中,你可能需要访问底层驱动实例。这在需要手动关闭连接或使用驱动特有方法等场景下非常有用。但请注意,在大多数情况下,你无需直接访问驱动。

要实现这一点,可以使用 unwrap() 方法,该方法会返回底层驱动实例。泛型类型参数应指定你期望的驱动实例类型。

const managerRef =
  this.client.unwrap<import('amqp-connection-manager').AmqpConnectionManager>()

同样地,你也可以访问服务端的底层驱动实例:

const managerRef =
  server.unwrap<import('amqp-connection-manager').AmqpConnectionManager>()

通配符

RabbitMQ 支持在路由键中使用通配符,从而实现灵活的消息路由。# 通配符可以匹配零个或多个单词,而 * 通配符只能匹配一个单词。

例如,路由键 cats.# 可以匹配 cats、cats.meow 和 cats.meow.purr。路由键 cats.* 可以匹配 cats.meow,但不能匹配 cats.meow.purr。

要在 RabbitMQ 微服务中启用通配符支持,需要在配置对象中将 wildcards 选项设置为 true:

const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  AppModule,
  {
    transport: Transport.RMQ,
    options: {
      urls: ['amqp://localhost:5672'],
      queue: 'cats_queue',
      wildcards: true,
    },
  }
)

配置完成后,你可以在订阅事件或消息时使用通配符路由键。例如,监听路由键为 cats.# 的消息,可以使用如下代码:

@MessagePattern('cats.#')
getCats(@Payload() data: { message: string }, @Ctx() context: RmqContext) {
  console.log(`收到路由键为: ${context.getPattern()} 的消息`)

  return {
    message: '来自 cats 服务的问候!',
  }
}

要发送带有特定路由键的消息,可以使用 ClientProxy 实例的 send() 方法:

this.client.send('cats.meow', { message: 'Meow!' }).subscribe((response) => {
  console.log(response)
})