队列(Queues)是一种强大的架构模式,常用于提升应用的可扩展性与性能表现。借助队列,你可以优雅地应对以下几类典型问题:
Nest 提供了两个官方包用于队列集成:
@nestjs/bullmq:用于集成 BullMQ,一个基于现代 TypeScript 构建的队列系统,功能丰富、开发活跃。@nestjs/bull:用于集成经典的 Bull 队列库,目前处于维护模式,主要接收 bug 修复。这两个包都是在各自底层库的基础上由 Nest 团队开发和维护的抽象封装。虽然 Bull 仍然稳定可靠,适用于许多场景,但若你希望使用更新的 API 设计、更强的功能与长期支持,推荐优先考虑 BullMQ。
无论是 Bull 还是 BullMQ,底层都依赖 Redis 进行作业持久化,因此你需要在系统中预先安装并运行 Redis 服务。得益于 Redis 的分布式特性,你的队列架构可以轻松横跨多个节点和平台。例如,你可以在一个服务节点上运行队列的生产者(Producer)、消费者(Consumer)及事件监听器,也可以在网络中的其他 Node.js 节点上运行更多的这些角色,实现任务处理的水平扩展。
本章将详细介绍 @nestjs/bullmq 与 @nestjs/bull 的使用方法与核心概念。建议你同时参考 BullMQ 官方文档和 Bull 参考手册,以便更深入地理解其机制与能力。
要在 NestJS 中使用 BullMQ,首先需要安装相关依赖:
npm install @nestjs/bullmq bullmq安装完成后,可以在应用的根模块中导入 BullModule:
import { Module } from '@nestjs/common'
import { BullModule } from '@nestjs/bullmq'
@Module({
imports: [
BullModule.forRoot({
connection: {
host: 'localhost',
port: 6379,
},
}),
],
})
export class AppModule {}上述代码中,forRoot() 方法用于注册全局配置,这些配置将默认应用于模块中声明的所有队列(除非单独覆盖)。常见的配置项包括:
connection: ConnectionOptions
Redis 连接配置。详情请参考官方连接指南。prefix: string
所有队列键的统一前缀,用于区分不同应用中的队列。defaultJobOptions: JobOpts
每个新作业的默认配置选项。可用于设置重试次数、延迟时间等,详见 JobOpts 文档。settings: AdvancedSettings
队列的高级行为设置,通常无需更改。详见 AdvancedSettings 文档。extraOptions
模块初始化时的额外选项。详见手动注册部分。所有配置项均为可选,提供了对队列行为的精细控制。这些选项会被直接传入 BullMQ 的 Queue 构造函数。更多可用参数请参考 QueueOptions 文档。
要声明一个队列,可以使用 BullModule.registerQueue() 方法:
BullModule.registerQueue({
name: 'audio',
})你可以在 registerQueue() 中同时传入多个配置对象,以注册多个队列。
该方法会注册并实例化一个队列。所有注册的队列实例会在连接到相同 Redis 实例的模块和进程之间共享。 队列名称不仅用于注入时的令牌标识,还会作为装饰器参数,用于将消费者类和事件监听器绑定到对应队列。
此外,你可以为某个队列单独设置连接参数或覆盖全局配置,例如:
BullModule.registerQueue({
name: 'audio',
connection: {
port: 6380,
},
})BullMQ 支持作业之间的父子依赖关系,允许构建作业树,从而实现复杂的执行流程(称为 Flow)。 例如:
BullModule.registerFlowProducer({
name: 'flowProducerName',
})有关作业流的更多细节,请参考官方文档。
由于作业会被持久化到 Redis 中,每当某个队列被重新实例化(如应用重启),系统会自动尝试处理上一次遗留的作业。
每个队列可以拥有多个生产者、消费者和监听器。
默认情况下,消费者会按照先进先出(FIFO)顺序处理作业;但 BullMQ 也支持后进先出(LIFO)以及基于优先级的调度策略。 关于消费者和调度顺序的详细说明,请参见消费者章节。
当你的应用需要连接多个 Redis 实例来管理不同的队列时,可以使用命名配置(Named Configurations)。这种方式允许你为每个 Redis 实例注册一个具名配置,并在队列定义中通过对应的键进行引用。
例如,假设部分队列需要连接非默认的 Redis 实例,你可以这样为该实例注册一个配置:
BullModule.forRoot('alternative-config', {
connection: {
port: 6381,
},
})上述示例中,'alternative-config' 是你自定义的配置键(可以是任意字符串),用于标识这套 Redis 连接配置。
完成配置注册后,即可在调用 registerQueue() 时通过 configKey 引用该命名配置:
BullModule.registerQueue({
configKey: 'alternative-config',
name: 'video',
})作业生产者的职责是向队列中投递作业(job)。在 NestJS 中,生产者通常是服务(Service)类的实例,也就是 Nest 的提供者(Provider)。
要发送作业,你需要先在服务中注入目标队列:
import { Injectable } from '@nestjs/common'
import { Queue } from 'bullmq'
import { InjectQueue } from '@nestjs/bullmq'
@Injectable()
export class AudioService {
constructor(@InjectQueue('audio') private audioQueue: Queue) {}
}@InjectQueue() 装饰器通过队列名称注入队列实例。该名称应与你在
registerQueue() 中指定的名称一致,例如 'audio'。
接下来,即可使用队列的 add() 方法添加作业。每个作业都是一个可序列化的 JavaScript 对象(因为作业数据会存储在 Redis 中)。你可以根据具体业务需求自由设计该对象的结构。
同时,作业还需指定一个作业名称。这个名称可以用于为特定类型的作业创建专属的消费者。
const job = await this.audioQueue.add('transcode', {
foo: 'bar',
})在添加任务(Job)时,你可以通过 Queue.add() 方法的第三个参数传入一个选项对象,以控制该任务的行为。以下是常用的任务选项说明:
priority: number — 任务优先级。数值越小优先级越高,范围从 1(最高)到 MAX_INT(最低)。启用优先级排序会增加系统开销,建议谨慎使用。delay: number — 延迟执行。指定在添加任务后等待多少毫秒再执行。为确保延迟准确性,请确保服务端与客户端的系统时间同步。attempts: number — 最大尝试次数。设置任务在失败后可重试的次数。repeat: RepeatOpts — 任务重复。可使用 Cron 表达式定义重复执行规则。详见 RepeatOpts。backoff: number | BackoffOpts — 失败重试的退避策略(Backoff)。详见 BackoffOpts。lifo: boolean — 后进先出(LIFO)策略。为 true 时,任务将添加到队列末尾(即后添加的任务先执行)。默认为 false(先进先出 FIFO)。jobId: string | number — 自定义任务 ID。默认情况下系统会自动生成唯一 ID。若需自定义,请确保唯一性。若添加了重复 ID 的任务,将不会入队。removeOnComplete: boolean | number — 任务完成后的清理策略。设为 true 表示任务成功后自动移除;若为数字,则表示最多保留指定数量的已完成任务。默认行为是不自动移除。removeOnFail: boolean | number — 任务失败后的清理策略。行为与 removeOnComplete 类似,用于控制失败任务的保留数量。stackTraceLimit: number — 堆栈跟踪行数限制。用于限制任务失败时记录的堆栈深度。通过设置 delay 选项,让任务延后 3 秒开始执行:
const job = await this.audioQueue.add(
'transcode',
{ foo: 'bar' },
{ delay: 3000 } // 延迟 3 秒执行
)将 lifo 设为 true,让任务加入队列尾部,后添加的任务优先执行:
const job = await this.audioQueue.add(
'transcode',
{ foo: 'bar' },
{ lifo: true }
)使用 priority 属性定义任务优先级:
const job = await this.audioQueue.add(
'transcode',
{ foo: 'bar' },
{ priority: 2 }
)完整配置项请参考官方 API 文档:
消费者是一个类,用于定义如何处理添加到队列中的任务,或监听队列相关事件,亦或两者兼具。可通过 @Processor() 装饰器将一个类标记为特定队列的消费者:
import { Processor } from '@nestjs/bullmq'
@Processor('audio')
export class AudioConsumer {}消费者类必须以「提供者」的形式进行注册,@nestjs/bullmq
才能正确识别并启用它。
装饰器中的字符串参数(如 'audio')指定了该类关联的队列名称。
要处理任务,可让消费者类继承自 WorkerHost 并实现 process 方法:
import { Processor, WorkerHost } from '@nestjs/bullmq'
import { Job } from 'bullmq'
@Processor('audio')
export class AudioConsumer extends WorkerHost {
async process(job: Job<any, any, string>): Promise<any> {
let progress = 0
for (let i = 0; i < 100; i++) {
await doSomething(job.data)
progress += 1
await job.updateProgress(progress)
}
return {}
}
}当 worker 空闲且队列中有待处理的任务时,updateProgress 方法将被调用。该方法接收一个 Job 实例作为参数,返回值将存储在任务结果中,供后续监听任务完成事件时访问。
Job 对象还提供了一系列方法用于与任务状态进行交互,例如上述代码中调用的 updateProgress() 用于更新任务进度。完整 API 可参考 Job 类文档。
在早期的 Bull 中,可以通过为 @Process() 装饰器指定 name,让特定方法仅处理具有该名称的任务,例如:
@Process('transcode')
async transcode(job: Job<unknown>) { ... }该特性在 BullMQ 中已被移除,不再支持此方式,具体原因详见下方说明。
为了避免行为混淆,BullMQ 取消了「按任务名称绑定处理方法」的支持。如今,如需根据任务名称执行不同逻辑,推荐在 process() 方法中使用 switch 语句进行分发:
import { Processor, WorkerHost } from '@nestjs/bullmq'
import { Job } from 'bullmq'
@Processor('audio')
export class AudioConsumer extends WorkerHost {
async process(job: Job<any, any, string>): Promise<any> {
switch (job.name) {
case 'transcode': {
let progress = 0
for (let i = 0; i < 100; i++) {
await doSomething(job.data)
progress += 1
await job.updateProgress(progress)
}
return {}
}
case 'concatenate': {
await doSomeLogic2()
break
}
default:
throw new Error(`Unknown job type: ${job.name}`)
}
}
}更多信息可参考官方文档:Named Processor(具名处理器)。
当消费者类被设置为请求作用域(关于注入作用域的详细说明可参考依赖注入作用域),每当有新的作业(job)被调度执行,框架都会为其创建该类的全新实例。在作业处理完成后,该实例将被自动回收。
@Processor({
name: 'audio',
scope: Scope.REQUEST,
})由于此类消费者是「按作业实例化」的,因此你可以直接在构造函数中通过标准方式注入 JOB_REF 令牌,从而获取当前作业的引用。
import { Inject } from '@nestjs/common'
import { JOB_REF } from '@nestjs/bullmq'
import { Job } from 'bullmq'
constructor(@Inject(JOB_REF) jobRef: Job) {
console.log(jobRef)
}当队列或作业的状态发生变化时,BullMQ 会触发一系列事件。你可以通过装饰器对这些事件进行监听:
@OnWorkerEvent(event):监听 Worker 端的事件(即处理作业的执行器);@OnQueueEvent(event):监听 Queue 端的事件(即任务在队列中的生命周期变化)。Worker 事件只能在消费者类中定义(即带有 @Processor() 装饰器的类)。要监听某个事件,只需使用 @OnWorkerEvent() 装饰器并传入事件名称。例如,以下代码展示了如何监听 audio 队列中作业进入 active 状态时的事件:
import { Processor, Process, OnWorkerEvent } from '@nestjs/bullmq'
import { Job } from 'bullmq'
@Processor('audio')
export class AudioConsumer {
@OnWorkerEvent('active')
onActive(job: Job) {
console.log(
`正在处理作业 ${job.id},类型为 ${job.name},数据为 ${job.data}...`
)
}
// ...
}完整的 Worker 事件列表及其参数说明,详见 WorkerListener 官方文档。
Queue 事件监听器用于监听队列中的任务状态变化(如排队、完成、失败等),并不依赖 Worker。
你需要使用 @QueueEventsListener(queueName) 装饰器标记监听器类,并继承 QueueEventsHost 基类。事件处理方法则使用 @OnQueueEvent(event) 装饰器定义。例如,以下代码用于监听 audio 队列中作业进入 active 状态的事件:
import {
QueueEventsHost,
QueueEventsListener,
OnQueueEvent,
} from '@nestjs/bullmq'
@QueueEventsListener('audio')
export class AudioEventsListener extends QueueEventsHost {
@OnQueueEvent('active')
onActive(job: { jobId: string; prev?: string }) {
console.log(`正在处理作业 ${job.jobId}...`)
}
// ...
}Queue 事件监听器必须作为提供者注册,@nestjs/bullmq
才能正确识别并绑定事件。
完整的事件类型和参数说明请参阅 QueueEventsListener 官方文档。
Queue 实例提供了一套管理队列的 API,可用于执行诸如暂停队列、恢复处理、统计任务状态等操作。完整的 API 文档可参考 BullMQ Queue API 文档。
这些方法可以直接在 Queue 对象上调用。以下是暂停和恢复队列的示例:
调用 pause() 方法可暂停队列。被暂停后,队列将不再接收新任务,但正在执行中的任务会继续处理直至完成:
await audioQueue.pause()要恢复队列,使用 resume() 方法:
await audioQueue.resume()任务处理器(Job handler)可以运行在独立的子进程中(fork 模式),详见官方文档。这种模式具有以下优势:
示例配置如下:
import { Module } from '@nestjs/common'
import { BullModule } from '@nestjs/bullmq'
import { join } from 'node:path'
@Module({
imports: [
BullModule.registerQueue({
name: 'audio',
processors: [join(__dirname, 'processor.js')],
}),
],
})
export class AppModule {}由于任务处理函数运行在独立进程中,Nest 的依赖注入机制(IoC 容器)将无法生效。因此,你需要自行创建或引入处理函数中所需的所有依赖,包括服务实例、数据库连接等。
在某些场景下,你可能希望通过异步方式来提供 bullmq 的配置项,而不是直接使用静态对象。这时,可以使用 forRootAsync() 方法。它支持多种异步配置的写法。
类似地,如果你希望为队列配置异步选项,也可以使用 registerQueueAsync() 方法。
最常见的方式是通过工厂函数(factory function)返回配置对象:
BullModule.forRootAsync({
useFactory: () => ({
connection: {
host: 'localhost',
port: 6379,
},
}),
})这种方式与其他异步提供者的用法一致 —— 工厂函数可以是 async 的,并可通过 inject 注入依赖项:
BullModule.forRootAsync({
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
connection: {
host: configService.get('QUEUE_HOST'),
port: configService.get('QUEUE_PORT'),
},
}),
inject: [ConfigService],
})你也可以使用 useClass 来封装配置逻辑:
BullModule.forRootAsync({
useClass: BullConfigService,
})这种写法会在 BullModule 内部实例化 BullConfigService,并通过调用其 createSharedConfiguration() 方法来获取配置对象。需要注意,BullConfigService 必须实现 SharedBullConfigurationFactory 接口:
@Injectable()
class BullConfigService implements SharedBullConfigurationFactory {
createSharedConfiguration(): BullModuleOptions {
return {
connection: {
host: 'localhost',
port: 6379,
},
}
}
}如果你希望避免在 BullModule 内部重新创建实例,而是复用已有模块中的提供者,可以使用 useExisting:
BullModule.forRootAsync({
imports: [ConfigModule],
useExisting: ConfigService,
})useExisting 与 useClass 的差异在于:它不会新建实例,而是引用已导入模块中现有的 ConfigService 实例。
与根模块配置类似,队列也可以使用异步方式进行注册,方法是使用 registerQueueAsync()。请注意:队列名称(name)应放在工厂函数外部:
BullModule.registerQueueAsync({
name: 'audio',
useFactory: () => ({
redis: {
host: 'localhost',
port: 6379,
},
}),
})在默认配置下,BullModule 会在 onModuleInit 生命周期钩子中自动注册 BullMQ 的各类组件,包括队列、处理器和事件监听器。但在某些特定场景中,你可能希望自行控制注册流程。此时,可以通过启用 manualRegistration 选项来关闭自动注册功能,示例如下:
BullModule.forRoot({
extraOptions: {
manualRegistration: true,
},
})启用该选项后,BullMQ 组件将不会自动注册。你需要手动注入 BullRegistrar 并在适当的生命周期钩子(如 onModuleInit 或 onApplicationBootstrap)中显式调用其 register() 方法:
import { Injectable, OnModuleInit } from '@nestjs/common'
import { BullRegistrar } from '@nestjs/bullmq'
@Injectable()
export class AudioService implements OnModuleInit {
constructor(private bullRegistrar: BullRegistrar) {}
onModuleInit() {
if (yourConditionHere) {
this.bullRegistrar.register()
}
}
}需要注意的是,只有在调用了 register() 方法之后,BullMQ 的队列与处理器等组件才会真正生效。否则,任务将不会被处理。
如果你决定使用 BullMQ,请跳过本节及后续所有与 Bull 相关的内容。
在 NestJS 中使用 Bull,首先需要安装相关依赖:
npm install @nestjs/bull bull安装完成后,在根模块 AppModule 中引入并配置 BullModule:
import { Module } from '@nestjs/common'
import { BullModule } from '@nestjs/bull'
@Module({
imports: [
BullModule.forRoot({
redis: {
host: 'localhost',
port: 6379,
},
}),
],
})
export class AppModule {}forRoot() 方法用于全局注册 Bull 的配置,该配置将应用于所有未单独指定设置的队列。主要支持以下选项:
limiter: RateLimiter — 控制任务处理速率的限流配置。详见 RateLimiter。redis: RedisOpts — Redis 连接参数配置。详见 RedisOpts。prefix: string — 队列键名的统一前缀。defaultJobOptions: JobOpts — 新任务的默认参数配置,详见 JobOpts 文档。
⚠️ 注意:如果通过 FlowProducer 创建任务,此设置将不会生效,详见 bullmq#1034。settings: AdvancedSettings — 队列的高级设置项,通常无需更改,详见 AdvancedSettings 文档。以上配置项均为可选,能够提供对队列行为的细粒度控制。这些选项最终将传递给原生 Bull 的 Queue 构造函数。更完整的配置说明请参考官方文档。
若要注册一个队列,可使用 BullModule.registerQueue() 方法:
BullModule.registerQueue({
name: 'audio',
})你可以同时传入多个配置对象来注册多个队列,例如:BullModule.registerQueue({ name: 'audio' }, { name: 'video' })
registerQueue() 方法用于注册一个或多个具体队列。被注册的队列在同一 Redis 数据库中(凭据一致)将被所有模块和进程共享。
每个队列需通过唯一的 name 属性进行标识,该名称用于:
你还可以为某个特定队列覆盖全局配置,例如:
BullModule.registerQueue({
name: 'audio',
redis: {
port: 6380,
},
})由于 Bull 会将任务持久化至 Redis,当某个队列被重新实例化(例如应用重启)时,将自动处理上一次未完成的任务。
每个队列可以拥有多个生产者(Producer)、消费者(Consumer) 和监听器(Listener)。消费者会按照特定顺序(默认 FIFO,也支持 LIFO 或基于优先级)处理任务,详细内容见消费者章节。
如果你的队列需要连接多个 Redis 实例,可以使用一种称为**命名配置(named configurations)**的技术。该特性允许你以指定的键注册多个配置,然后可以在队列选项中引用这些配置。
例如,假设你的应用中有一个额外的 Redis 实例(除了默认实例之外),并且有部分队列需要使用这个实例,你可以按如下方式注册其配置:
BullModule.forRoot('alternative-config', {
redis: {
port: 6381,
},
})在上面的示例中,'alternative-config' 只是一个配置键(可以是任意字符串)。
完成上述配置后,你现在可以在 registerQueue() 的选项对象中通过 configKey 指定该配置:
BullModule.registerQueue({
configKey: 'alternative-config',
name: 'video',
})任务生产者(Job Producers)负责将任务添加到队列中。通常,Nest 应用中的服务类(即提供者)会扮演生产者的角色。
要向某个队列添加任务,首先需要在服务中通过依赖注入的方式引入该队列实例:
import { Injectable } from '@nestjs/common'
import { Queue } from 'bull'
import { InjectQueue } from '@nestjs/bull'
@Injectable()
export class AudioService {
constructor(@InjectQueue('audio') private audioQueue: Queue) {}
}@InjectQueue() 装饰器根据传入的队列名称(即在 registerQueue()
中注册的名称,如 'audio')注入对应的队列实例。
注入队列后,就可以使用队列实例的 add() 方法向队列中添加任务。任务数据应以可序列化的 JavaScript 对象形式传入(因为底层会存储在 Redis 中)。你可以根据业务需求自由定义任务对象的结构:
const job = await this.audioQueue.add({
foo: 'bar',
})你可以为任务指定一个名称,从而在消费者端更有针对性地处理不同类型的任务:
const job = await this.audioQueue.add('transcode', {
foo: 'bar',
})如果使用了命名任务,必须为队列中每个任务名称提供相应的处理器。否则,当队列尝试处理该任务时会抛出错误。有关消费者如何处理命名任务的更多信息,请参见消费者章节。
在向队列添加任务时,你可以为该任务附加额外的配置选项。这些选项通过 Queue.add() 方法的第二个参数传入,格式为一个对象,用于控制任务的执行行为。
this.audioQueue.add(taskData, jobOptions)常用的任务选项包括:
priority: number
任务的优先级,数值越小优先级越高,取值范围为 1(最高)到 MAX_INT(最低)。注意:启用优先级会引入一定的性能开销,应权衡使用。
delay: number
延迟执行任务的时间(单位:毫秒)。为确保延迟的准确性,请保持服务器与客户端时间同步。
attempts: number
任务失败后的最大重试次数,超过该次数将视为最终失败。
repeat: RepeatOpts
设置任务周期性重复执行,例如基于 cron 表达式。详见 RepeatOpts 文档。
backoff: number | BackoffOpts
设置任务失败后的退避策略,用于控制重试间隔。详见 BackoffOpts 文档。
lifo: boolean
是否启用「后进先出」模式(LIFO)。若为 true,任务将插入队列末端。
timeout: number
指定任务的超时时间(单位:毫秒)。若超时未完成,将被视为失败。
jobId: number | string
自定义任务 ID。 默认情况下,Bull 会自动生成唯一 ID,但你也可以手动指定。请确保自定义 ID 的唯一性,否则任务不会被添加。
removeOnComplete: boolean | number
控制任务在成功完成后是否自动移除。
true,任务完成后会被删除;removeOnFail: boolean | number
控制任务在所有重试失败后是否自动移除。
stackTraceLimit: number
设置任务失败时记录的堆栈追踪(stack trace)最大行数。
const job = await this.audioQueue.add(
{ foo: 'bar' },
{ delay: 3000 } // 延迟 3 秒后执行
)const job = await this.audioQueue.add({ foo: 'bar' }, { lifo: true })const job = await this.audioQueue.add(
{ foo: 'bar' },
{ priority: 2 } // 优先级为 2(高于默认)
)消费者是一个用于处理队列任务或监听队列事件的类。你可以通过 @Processor() 装饰器将其声明为消费者类:
import { Processor } from '@nestjs/bull'
@Processor('audio')
export class AudioConsumer {}要让 @nestjs/bull 正确识别消费者类,必须将其注册为一个提供者。
装饰器中的字符串参数(如 'audio')表示该消费者所监听的队列名称。
在消费者类中,可以通过为方法添加 @Process() 装饰器来定义具体的任务处理逻辑:
import { Processor, Process } from '@nestjs/bull'
import { Job } from 'bull'
@Processor('audio')
export class AudioConsumer {
@Process()
async transcode(job: Job<unknown>) {
let progress = 0
for (let i = 0; i < 100; i++) {
await doSomething(job.data)
progress += 1
await job.progress(progress)
}
return {}
}
}当 worker 空闲且队列中有待处理任务时,@Process() 装饰的方法(如上例中的 transcode())将自动被调用。该方法接收一个 Job 对象作为参数,并可返回任意值,该返回值会被存储在任务结果中,供后续任务完成事件使用。
Job 对象提供了多种方法用于任务状态交互。例如,上述代码中调用了 progress() 来更新任务进度。更多可用 API 请参考 Job 文档。
你还可以为 @Process() 装饰器传入特定的任务名称,使其仅处理具备该名称的任务:
@Process('transcode')
async transcode(job: Job<unknown>) {
// 仅处理 name 为 'transcode' 的任务
}在同一个消费者类中,可以通过多个 @Process() 装饰器,分别定义不同类型任务的处理逻辑。使用命名任务时,请确保每个任务名称都对应一个明确的处理方法。
如果为同一个队列定义了多个消费者类,则 @Process({ concurrency: 1 }) 中的并发数限制将不会生效。实际的最小并发数等于消费者类的数量。即便每个处理器方法监听的是不同名称的任务,这一规则仍然适用。
如果将消费者类定义为请求作用域(关于作用域的详细说明请参见依赖注入作用域),那么每个任务在执行时都会创建该类的独立实例,任务完成后实例将被自动销毁。
声明请求作用域的方式如下:
@Processor({
name: 'audio',
scope: Scope.REQUEST,
})由于每个任务都拥有自己的消费者实例,因此可以使用构造函数注入任务本身。通过注入 JOB_REF 令牌,即可访问当前任务对象:
import { Inject } from '@nestjs/common'
import { JOB_REF } from '@nestjs/bull'
import { Job } from 'bull'
constructor(@Inject(JOB_REF) jobRef: Job) {
console.log(jobRef)
}当队列或任务状态发生变化时,Bull 会生成一系列有用的事件。Nest 提供了一组装饰器,用于订阅这些标准核心事件。这些装饰器由 @nestjs/bull 包导出。
事件监听器必须声明在消费者类中(即,被 @Processor() 装饰器标记的类)。要监听某个事件,可以使用下表中的装饰器之一,在类中声明对应的事件处理方法。例如,若要监听 audio 队列中任务进入 active 状态时触发的事件,可以参考如下写法:
import { Processor, Process, OnQueueActive } from '@nestjs/bull'
import { Job } from 'bull'
@Processor('audio')
export class AudioConsumer {
@OnQueueActive()
onActive(job: Job) {
console.log(
`Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
)
}
...由于 Bull 运行在分布式(多节点)环境中,因此引入了事件本地性(event locality)的概念。该概念指出,事件既可能完全在单个进程内被触发,也可能在多个进程共享的队列上被触发。本地事件(local event) 指的是在本地进程的队列上执行操作或状态变更时产生的事件。换句话说,当事件生产者和消费者都在同一个进程中时,所有队列上的事件都是本地事件。
当队列被多个进程共享时,就会出现全局事件(global event) 的可能性。若希望某个进程中的监听器能够接收到由另一个进程触发的事件通知,则必须注册为全局事件。
事件处理器会在其对应事件被触发时被调用。处理器的调用签名见下表,能够访问与事件相关的信息。下文将讨论本地事件和全局事件处理器签名的一个关键区别。
| 本地事件监听器 | 全局事件监听器 | 处理方法签名 / 触发时机 |
|---|---|---|
@OnQueueError() | @OnGlobalQueueError() | handler(error: Error) - 发生错误时触发。error 包含触发的错误信息。 |
@OnQueueWaiting() | @OnGlobalQueueWaiting() | handler(jobId: number | string) - 当有作业(Job)等待被空闲的工作进程处理时触发。jobId 为进入此状态的作业 ID。 |
@OnQueueActive() | @OnGlobalQueueActive() | handler(job: Job) - 任务 job 已开始处理。 |
@OnQueueStalled() | @OnGlobalQueueStalled() | handler(job: Job) - 任务 job 被标记为阻塞(stalled)。这对于调试崩溃或事件循环暂停的作业工作进程非常有用。 |
@OnQueueProgress() | @OnGlobalQueueProgress() | handler(job: Job, progress: number) - 任务 job 的进度已更新为 progress。 |
@OnQueueCompleted() | @OnGlobalQueueCompleted() | handler(job: Job, result: any) 任务 job 已成功完成,结果为 result。 |
@OnQueueFailed() | @OnGlobalQueueFailed() | handler(job: Job, err: Error) 任务 job 失败,原因是 err。 |
@OnQueuePaused() | @OnGlobalQueuePaused() | handler() 队列已被暂停。 |
@OnQueueResumed() | @OnGlobalQueueResumed() | handler(job: Job) 队列已恢复。 |
@OnQueueCleaned() | @OnGlobalQueueCleaned() | handler(jobs: Job[], type: string) 旧任务已从队列中清理。jobs 是被清理的任务数组,type 表示被清理的任务类型。 |
@OnQueueDrained() | @OnGlobalQueueDrained() | handler() 当队列已处理完所有等待中的任务时触发(即使仍有一些延迟任务尚未处理)。 |
@OnQueueRemoved() | @OnGlobalQueueRemoved() | handler(job: Job) 任务 job 已被成功移除。 |
当监听全局事件时,方法签名与本地事件监听略有不同。具体来说,任何在本地版本中接收 job 对象的方法签名,在全局版本中会接收一个 jobId(number 类型)。如果你需要获取实际的 job 对象,可以使用 Queue#getJob 方法。由于该方法是异步的,因此事件处理函数应声明为 async。例如:
@OnGlobalQueueCompleted()
async onGlobalCompleted(jobId: number, result: any) {
const job = await this.immediateQueue.getJob(jobId)
console.log('(Global) on completed: job ', job.id, ' -> result: ', result)
}要访问 Queue 对象(以便调用 getJob()
方法),你需要将其依赖注入到当前类中。同时,该队列必须已经在你注入它的模块(Module)中完成注册。
除了这些特定的事件监听装饰器外,你还可以结合使用通用的 @OnQueueEvent() 装饰器和 BullQueueEvents 或 BullQueueGlobalEvents 枚举来监听事件。你可以在这里阅读更多关于事件的内容。
队列(Queue)提供了 API,允许你执行诸如暂停和恢复、获取不同状态下任务数量等管理操作。完整的队列 API 可参考这里。你可以直接在 Queue 对象上调用这些方法,下方以暂停/恢复为例进行演示。
通过调用 pause() 方法可以暂停队列。被暂停的队列不会处理新的任务,但当前正在处理的任务会继续执行直到完成。
await audioQueue.pause()要恢复已暂停的队列,可以使用 resume() 方法,如下所示:
await audioQueue.resume()任务处理器(Job handler)也可以在独立(fork)进程中运行(参考来源)。这样做有以下几个优点:
import { Module } from '@nestjs/common'
import { BullModule } from '@nestjs/bull'
import { join } from 'node:path'
@Module({
imports: [
BullModule.registerQueue({
name: 'audio',
processors: [join(__dirname, 'processor.js')],
}),
],
})
export class AppModule {}请注意,由于你的函数会在独立进程中执行,依赖注入机制(以及 IoC 容器)将不可用。这意味着你的处理器函数需要自行包含(或创建)所有所需的外部依赖实例。
import { Job, DoneCallback } from 'bull'
export default function (job: Job, cb: DoneCallback) {
console.log(`[${process.pid}] ${JSON.stringify(job.data)}`)
cb(null, 'It works')
}有时你可能希望以异步方式(而非静态方式)传递 bull 选项。此时,可以使用 forRootAsync() 方法,该方法提供了多种处理异步配置的方式。
其中一种方式是使用工厂函数:
BullModule.forRootAsync({
useFactory: () => ({
redis: {
host: 'localhost',
port: 6379,
},
}),
})我们的工厂函数行为与其他异步提供者类似(例如,它可以是 async,并且能够通过 inject 注入依赖)。
BullModule.forRootAsync({
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
redis: {
host: configService.get('QUEUE_HOST'),
port: configService.get('QUEUE_PORT'),
},
}),
inject: [ConfigService],
})另外,你也可以使用 useClass 语法:
BullModule.forRootAsync({
useClass: BullConfigService,
})上述写法会在 BullModule 内部实例化 BullConfigService,并通过调用 createSharedConfiguration() 方法来提供配置对象。需要注意的是,这意味着 BullConfigService 必须实现 SharedBullConfigurationFactory 接口,如下所示:
@Injectable()
class BullConfigService implements SharedBullConfigurationFactory {
createSharedConfiguration(): BullModuleOptions {
return {
redis: {
host: 'localhost',
port: 6379,
},
}
}
}如果你希望避免在 BullModule 内部创建 BullConfigService 实例,而是复用其他模块中已导入的提供者,可以使用 useExisting 语法。
BullModule.forRootAsync({
imports: [ConfigModule],
useExisting: ConfigService,
})这种写法与 useClass 类似,但有一个关键区别 —— BullModule 会查找已导入的模块,复用现有的 ConfigService,而不是新建一个实例。
同样地,如果你想以异步方式传递队列选项,可以使用 registerQueueAsync() 方法,只需注意将 name 属性放在工厂函数外部。
BullModule.registerQueueAsync({
name: 'audio',
useFactory: () => ({
redis: {
host: 'localhost',
port: 6379,
},
}),
})你可以在这里查看完整示例。