Redis 传输器实现了发布/订阅(publish/subscribe)消息范式,并利用了 Redis 的 Pub/Sub 功能。发布的消息会被分类到不同的频道(channel)中,发布者无需关心最终会有哪些订阅者(subscriber)接收消息。每个微服务都可以订阅任意数量的频道,并且可以同时订阅多个频道。通过频道交换的消息属于即发即弃(fire-and-forget),也就是说,如果消息发布时没有任何订阅者感兴趣,该消息会被移除且无法恢复。因此,无法保证每条消息或事件至少会被一个服务处理。单条消息可以被多个订阅者订阅(并接收)。
要开始构建基于 Redis 的微服务,首先需要安装所需的依赖包:
npm install ioredis要使用 Redis 传输器,只需将如下选项对象传递给 createMicroservice() 方法:
import { Transport } from '@nestjs/core'
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.REDIS,
options: {
host: 'localhost',
port: 6379,
},
}
)options 属性是针对所选传输层(transporter)特有的配置项。Redis 传输层支持以下属性:
| 选项 | 说明 |
|---|---|
host | 连接地址 |
port | 连接端口 |
retryAttempts | 消息重试的次数(默认值:0) |
retryDelay | 每次消息重试之间的延迟(毫秒)(默认值:0) |
wildcards | 启用 Redis 通配符订阅,指示传输层在底层使用 psubscribe / pmessage。(默认值:false) |
此外,该传输层还支持官方 ioredis 客户端支持的所有属性。
与其他微服务传输器类似,你有多种方式可以创建 Redis ClientProxy 实例,详见官方文档。
其中一种方式是使用 ClientsModule。要通过 ClientsModule 创建客户端实例,需要先导入该模块,并使用 register() 方法传入一个配置对象。该对象的属性与上文 createMicroservice() 方法中所示一致,并且需要额外指定一个 name 属性作为注入令牌(Injection Token)。你可以在这里阅读更多关于 ClientsModule 的内容。
@Module({
imports: [
ClientsModule.register([
{
name: 'MATH_SERVICE',
transport: Transport.REDIS,
options: {
host: 'localhost',
port: 6379,
}
},
]),
]
...
})除此之外,你还可以选择其他方式来创建客户端,例如 ClientProxyFactory 或 @Client() 装饰器。相关内容可参考官方文档。
在更复杂的场景下,你可能需要访问有关传入请求的更多信息。当使用 Redis 传输器时,可以通过 RedisContext 对象获取这些信息。
import { RedisContext, Payload, Ctx } from '@nestjs/microservices'
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RedisContext) {
console.log(`Channel: ${context.getChannel()}`)
}要启用通配符支持,需要将 wildcards 选项设置为 true。这样会指示传输层在底层使用 psubscribe 和 pmessage。
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.REDIS,
options: {
// 其他选项
wildcards: true,
},
})请确保在创建客户端实例时也传递了 wildcards 选项。
启用该选项后,你可以在消息和事件模式中使用通配符。例如,若要订阅所有以 notifications 开头的频道,可以使用如下模式:
@EventPattern('notifications.*')如需实时获取连接和底层驱动实例状态的更新,可以订阅 status 流。该流会提供特定于所选驱动的状态更新。对于 Redis 驱动,status 流会发出 connected、disconnected 和 reconnecting 事件。
import { RedisStatus } from '@nestjs/microservices'
this.client.status.subscribe((status: RedisStatus) => {
console.log(status)
})同样,你也可以订阅服务器的 status 流,以接收关于服务器状态的通知。
const server = app.connectMicroservice<MicroserviceOptions>(...)
server.status.subscribe((status: RedisStatus) => {
console.log(status)
})在某些情况下,你可能希望监听微服务内部触发的事件。例如,你可以监听 error 事件,在发生错误时执行额外的操作。要实现这一点,可以使用 on() 方法,如下所示:
this.client.on('error', (err) => {
console.error(err)
})同样地,你也可以监听服务器内部的事件:
import { RedisEvents } from '@nestjs/microservices'
server.on<RedisEvents>('error', (err) => {
console.error(err)
})对于更高级的用例,你可能需要访问底层驱动实例。这在需要手动关闭连接或使用驱动特有方法等场景下非常有用。但请注意,在大多数情况下,你无需直接操作驱动实例。
要访问底层驱动,可以使用 unwrap() 方法。该方法会返回底层驱动实例。泛型类型参数应指定你期望获得的驱动实例类型。
const [pub, sub] =
this.client.unwrap<[import('ioredis').Redis, import('ioredis').Redis]>()同样地,你也可以访问服务器的底层驱动实例:
const [pub, sub] =
server.unwrap<[import('ioredis').Redis, import('ioredis').Redis]>()需要注意的是,与其他传输器不同,Redis 传输器返回的是一个包含两个 ioredis 实例的元组:第一个用于发布消息,第二个用于订阅消息。