当你刚开始学习后端开发时,可能会接触到这样的应用架构:
在一个典型的 CRUD(创建、读取、更新和删除)应用中,数据的处理流程是这样的:
这种架构就像一个万能工具,无论是查询数据还是修改数据,都走同一套流程。对于小型应用来说,这样做完全够用,简单直接。
随着应用规模的增长,你可能会遇到以下问题:
这时候,CQRS(Command and Query Responsibility Segregation,命令查询职责分离)就派上用场了。
CQRS 的基本思想很简单:把「读」和「写」分开处理。
这样做有什么好处呢?
为了在 NestJS 中实现 CQRS 模式,Nest 提供了一个专门的 CQRS 模块。接下来,我们将详细介绍如何使用这个模块来构建更加清晰、可扩展的应用架构。
首先,安装所需依赖包:
npm install @nestjs/cqrs安装完成后,在应用的根模块(通常是 AppModule)中导入 CqrsModule.forRoot():
import { Module } from '@nestjs/common'
import { CqrsModule } from '@nestjs/cqrs'
@Module({
imports: [CqrsModule.forRoot()],
})
export class AppModule {}CqrsModule 支持一个可选的配置对象,其可用属性如下表所示:
| 属性 | 说明 | 默认值 |
|---|---|---|
commandPublisher | 负责将命令(Command)分发给相应处理器的发布器。 | DefaultCommandPubSub |
eventPublisher | 用于发布事件(Event)的发布器,支持事件广播或后续处理。 | DefaultPubSub |
queryPublisher | 用于发布查询(Query)并触发数据检索操作的发布器。 | DefaultQueryPubSub |
unhandledExceptionPublisher | 负责处理未捕获异常(Unhandled Exception)的发布器,以确保异常被追踪或记录。 | DefaultUnhandledExceptionPubSub |
eventIdProvider | 提供唯一事件 ID 的服务,可通过生成或从事件实例中提取 ID。 | DefaultEventIdProvider |
rethrowUnhandled | 控制在处理未捕获异常后是否重新抛出该异常,便于调试和错误管理。 | false |
在 CQRS 中,命令(Command)代表一个「写操作」的意图。简单来说,命令就是告诉系统「我要做什么」的指令。
与传统的数据操作不同,命令关注的是业务行为,而不是数据本身。比如:
当你发送一个命令时,系统的处理流程是这样的:
CommandBus 负责找到对应的处理器。让我们通过一个游戏中的「屠龙」功能来理解命令模式。
@Injectable()
export class HeroesGameService {
constructor(private commandBus: CommandBus) {}
async killDragon(heroId: string, killDragonDto: KillDragonDto) {
const command = new KillDragonCommand(heroId, killDragonDto.dragonId)
return this.commandBus.execute(command)
}
}在上面的服务中,当用户要进行屠龙操作时:
KillDragonCommand 命令对象,包含英雄 ID 和龙的 ID。CommandBus.execute() 方法发送这个命令。export class KillDragonCommand extends Command<{
actionId: string // 指定命令处理器应该返回什么格式的数据
}> {
constructor(
public readonly heroId: string,
public readonly dragonId: string
) {
super()
}
}命令类就像一个「任务单」,包含了完成任务所需的所有信息:
heroId:哪个英雄要执行任务。dragonId:要击杀哪条龙。这里有几个要点:
关于 Command 基类:
Command<ReturnType> 可以指定命令执行后的返回类型。actionId 的对象。commandBus.execute() 的返回类型。Command 类是可选的,只有当你需要指定返回类型时才需要这样做。关于 CommandBus:
CommandBus 就像一个「任务分发中心」。execute() 方法返回一个 Promise,包含处理结果。有了命令类,我们还需要一个「处理器」来真正执行这个命令:
@CommandHandler(KillDragonCommand)
export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
constructor(private repository: HeroesRepository) {}
async execute(command: KillDragonCommand) {
const { heroId, dragonId } = command
const hero = this.repository.findOneById(+heroId)
hero.killEnemy(dragonId)
await this.repository.persist(hero)
// 必须返回符合命令定义的数据格式(包含 actionId 的对象)
return {
actionId: crypto.randomUUID(), // 生成唯一ID,让调用方知道操作已完成
}
}
}命令处理器就是真正「干活」的地方。让我们分解一下这个处理器:
装饰器和接口:
@CommandHandler(KillDragonCommand) 告诉 NestJS「这个类负责处理 KillDragonCommand」。ICommandHandler<KillDragonCommand> 确保我们实现了正确的接口。执行逻辑:
killEnemy() 方法(业务逻辑)。类型安全:
别忘了将处理器注册到模块中,这样 NestJS 才能找到并使用它:
@Module({
providers: [
HeroesGameService, // 发送命令的服务
KillDragonHandler, // 处理命令的处理器
HeroesRepository, // 数据访问层
],
// ... 其他配置
})
export class HeroesModule {}通过这个例子,你可以看到命令模式的几个优势:
KillDragonCommand 比直接调用数据库操作更能表达业务意图。在 CQRS 中,查询(Query)负责处理所有的「读操作」。与命令不同,查询不会改变系统状态,它们只是获取数据。
查询关注的是数据需求,而不是业务行为。比如:
让我们继续用游戏的例子,看看如何查询英雄的信息。
export class GetHeroQuery extends Query<Hero> {
constructor(public readonly heroId: string) {}
}查询类就像一个「数据请求单」,包含了获取数据所需的条件:
heroId:我们想要获取哪个英雄的信息。关于 Query 基类:
Query<ReturnType> 可以指定查询的返回类型。Hero 对象。queryBus.execute() 的返回类型为 Promise<Hero>。有了查询类,我们需要一个处理器来实际获取数据:
@QueryHandler(GetHeroQuery)
export class GetHeroHandler implements IQueryHandler<GetHeroQuery> {
constructor(private repository: HeroesRepository) {}
async execute(query: GetHeroQuery) {
return this.repository.findOneById(query.heroId)
}
}查询处理器负责实际获取数据。让我们分析一下:
装饰器和接口:
@QueryHandler(GetHeroQuery) 告诉 NestJS「这个类负责处理 GetHeroQuery」。IQueryHandler<GetHeroQuery> 确保我们实现了正确的接口。执行逻辑:
注意查询处理器通常很简单,因为它们只负责获取数据,不包含复杂的业务逻辑。
providers: [GetHeroHandler]现在可以在服务中使用这个查询:
const hero = await this.queryBus.execute(new GetHeroQuery(heroId))
// TypeScript 自动推断 hero 的类型为 Hero让我们对比一下查询和命令的区别:
| 特性 | 命令 | 查询 |
|---|---|---|
| 目的 | 改变状态 | 获取数据 |
| 副作用 | 有(修改数据) | 无(只读) |
| 返回值 | 通常返回操作结果 | 返回请求的数据 |
| 复杂度 | 可能包含复杂业务逻辑 | 通常比较简单 |
| 性能考虑 | 重点在准确性和一致性 | 重点在速度和效率 |
在 CQRS 中,事件(Event) 表示「已经发生的事情」。当系统状态发生变化时,我们发布事件来通知其他部分。
事件和命令的区别:
比如:
事件就像系统的「广播电台」,当重要的事情发生时,它会告诉所有感兴趣的部分:
让我们继续游戏的例子,看看当英雄杀死巨龙时会发生什么。
export class HeroKilledDragonEvent {
constructor(
public readonly heroId: string,
public readonly dragonId: string
) {
super()
}
}事件类非常简单,它只是一个数据容器,记录了「发生了什么」以及相关的信息:
heroId:哪个英雄参与了这个事件。dragonId:哪条龙被杀死了。注意事件类通常:
HeroKilledDragonEvent 而不是 KillDragonEvent。readonly。当业务操作完成时,我们需要发布相应的事件:
export class Hero extends AggregateRoot {
constructor(private id: string) {
super()
}
killEnemy(enemyId: string) {
// 1. 执行业务逻辑(比如减少HP、增加经验等)
// ... 业务逻辑代码 ...
// 2. 发布事件,告诉系统"英雄杀死了巨龙"
this.apply(new HeroKilledDragonEvent(this.id, enemyId))
}
}在这个模型中:
AggregateRoot:这个基类提供了事件发布的能力。apply() 方法:用来发布事件,告诉系统发生了什么。但是,模型本身不知道如何将事件发送到系统的其他部分。我们需要在命令处理器中把事件发布器关联到模型上。
@CommandHandler(KillDragonCommand)
export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
constructor(
private repository: HeroesRepository,
private publisher: EventPublisher // 注入事件发布器
) {}
async execute(command: KillDragonCommand) {
const { heroId, dragonId } = command
// 1. 获取英雄对象
const hero = await this.repository.findOneById(+heroId)
// 2. 将事件发布能力「装」到英雄对象上
const heroWithEventPublisher = this.publisher.mergeObjectContext(hero)
// 3. 执行业务操作(这时会发布事件)
heroWithEventPublisher.killEnemy(dragonId)
// 4. 提交所有待发布的事件
heroWithEventPublisher.commit()
// 5. 保存到数据库
await this.repository.persist(heroWithEventPublisher)
}
}这里的关键步骤:
EventPublisher:让命令处理器有发布事件的能力。mergeObjectContext():把事件发布能力「装」到英雄对象上。commit():手动发布所有待处理的事件。方式一:自动提交
如果你不想每次都手动调用 commit(),可以设置自动提交:
export class Hero extends AggregateRoot {
constructor(private id: string) {
super()
this.autoCommit = true // 设置自动提交
}
}方式二:类级别的事件发布 如果你想让整个类都具备事件发布能力:
const HeroModel = this.publisher.mergeClassContext(Hero)
const hero = new HeroModel('id') // 这个实例自动具备事件发布能力方式三:手动发布事件
你也可以直接使用 EventBus 发布事件:
this.eventBus.publish(new HeroKilledDragonEvent(heroId, dragonId))现在我们需要创建处理器来响应这个事件:
@EventsHandler(HeroKilledDragonEvent)
export class HeroKilledDragonHandler
implements IEventHandler<HeroKilledDragonEvent>
{
constructor(
private repository: HeroesRepository,
private notificationService: NotificationService
) {}
handle(event: HeroKilledDragonEvent) {
console.log(`英雄 ${event.heroId} 杀死了巨龙 ${event.dragonId}!`)
// 可能的后续操作:
// 1. 发送通知给其他玩家
this.notificationService.sendToAll(`英雄击败了巨龙!`)
// 2. 更新排行榜
// this.leaderboardService.updateScore(event.heroId)
// 3. 记录到审计日志
// this.auditService.log('DRAGON_KILLED', event)
}
}事件处理器的特点:
providers: [HeroKilledDragonHandler]异常处理 :事件处理器中的错误不会自动被捕获,需要手动处理(try/catch)。
HTTP 响应:事件处理器无法向客户端发送 HTTP 响应。如需推送信息,请使用 WebSocket 或 SSE。
请求上下文:事件处理器脱离了原始的 HTTP 请求上下文。
补偿机制:如果事件处理失败,考虑使用 Saga 模式进行补偿。
通过这个例子,你可以看到事件模式的几个优势:
Saga(或称「编排器」)是一种设计模式,用于管理应用中的复杂工作流。它通过监听事件并触发新命令来协调各个流程。例如,一个 Saga 可以监听 UserRegisteredEvent 事件,在用户成功注册后触发一个发送欢迎邮件的命令。
Saga 是一项非常强大的功能。单个 Saga 可以监听一个或多个事件。借助 RxJS 库,可以对事件流进行过滤、映射、分支和合并,从而创建复杂的工作流。每个 Saga 都返回一个产生命令实例的 Observable。随后,CommandBus 会异步分发这些命令。
下面,我们来创建一个 Saga,它将监听 HeroKilledDragonEvent 事件,并分发 DropAncientItemCommand 命令。
@Injectable()
export class HeroesGameSagas {
@Saga()
dragonKilled = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(HeroKilledDragonEvent),
map((event) => new DropAncientItemCommand(event.heroId, fakeItemID))
)
}
}ofType 操作符和 @Saga() 装饰器均由 @nestjs/cqrs 包导出。@Saga() 装饰器用于将一个方法标记为 Saga。events$ 参数是包含所有事件的 Observable 流。ofType 操作符用于根据指定的事件类型过滤事件流,而 map 操作符则将事件映射为新的命令实例。
在本例中,Saga 将 HeroKilledDragonEvent 事件映射为 DropAncientItemCommand 命令,CommandBus 随后会自动分发该命令。
与查询、命令和事件处理器一样,HeroesGameSagas 也需要在模块中注册为提供者:
...
providers: [HeroesGameSagas]
...事件处理器是异步执行的,因此必须始终妥善处理异常,以防止应用进入不一致的状态。如果异常未被处理,EventBus 会创建一个 UnhandledExceptionInfo 对象,并将其发布到 UnhandledExceptionBus 这个 Observable 流中。之后,你便可以订阅此流来处理未捕获的异常。
private destroy$ = new Subject<void>()
constructor(private unhandledExceptionsBus: UnhandledExceptionBus) {
this.unhandledExceptionsBus
.pipe(takeUntil(this.destroy$))
.subscribe((exceptionInfo) => {
// 在此处处理异常
// 例如:发送到外部服务、终止进程或发布新事件
})
}
onModuleDestroy() {
this.destroy$.next()
this.destroy$.complete()
}如果需要筛选特定异常,可以使用 ofType 操作符,示例如下:
this.unhandledExceptionsBus
.pipe(
takeUntil(this.destroy$),
UnhandledExceptionBus.ofType(TransactionNotAllowedException)
)
.subscribe((exceptionInfo) => {
// 在此处处理异常
})其中,TransactionNotAllowedException 是我们希望筛选的异常类型。
UnhandledExceptionInfo 对象包含以下属性:
export interface UnhandledExceptionInfo<
Cause = IEvent | ICommand,
Exception = any,
> {
/**
* 抛出的异常。
*/
exception: Exception
/**
* 异常的原因(事件或命令的引用)。
*/
cause: Cause
}CommandBus、QueryBus 和 EventBus 都是可观察对象。这意味着我们可以订阅整个事件流,例如处理所有事件。比如,我们可以将所有事件记录到控制台,或者保存到事件存储中。
private destroy$ = new Subject<void>()
constructor(private eventBus: EventBus) {
this.eventBus
.pipe(takeUntil(this.destroy$))
.subscribe((event) => {
// 将事件保存到数据库
})
}
onModuleDestroy() {
this.destroy$.next()
this.destroy$.complete()
}对于来自其他编程语言背景的开发者而言,可能会对 Nest 的一个特性感到意外:在 Nest 应用中,绝大部分模块都是单例,并在所有请求之间共享。这包括数据库连接池、包含全局状态的单例服务等。需要强调的是,Node.js 并不采用为每个请求分配独立线程的请求/响应模型。因此,在 Nest 应用中使用单例实例是完全安全的。
不过,在某些特殊场景下,可能希望处理器具有基于请求的生命周期。例如在 GraphQL 应用中进行每个请求的缓存、请求追踪或多租户等场景。你可以在这里了解如何控制作用域。
在 CQRS 模式中,CommandBus、QueryBus 和 EventBus 都是单例,这给请求作用域的提供者带来了额外的复杂性。不过,@nestjs/cqrs 包巧妙地解决了这个问题:它会为每个待处理的命令、查询或事件自动创建请求作用域的处理器实例,从而极大地简化了开发过程。
要让处理器成为请求作用域,你可以:
@CommandHandler、@QueryHandler 或 @EventsHandler 装饰器中将其作用域设置为 REQUEST,如下所示。@CommandHandler(KillDragonCommand, {
scope: Scope.REQUEST,
})
export class KillDragonHandler {
// 具体实现
}如果你想在任何请求作用域的提供者中注入请求载荷,可以使用 @Inject(REQUEST) 装饰器。不过,在 CQRS 场景下,请求载荷的具体内容取决于上下文 —— 它可能是 HTTP 请求、定时任务,或是任何触发命令的操作。
该请求载荷必须是一个继承自 AsyncContext(由 @nestjs/cqrs 包提供)的类实例。AsyncContext 扮演着请求上下文的角色,它允许你在整个请求生命周期内传递和访问数据。
import { AsyncContext } from '@nestjs/cqrs'
export class MyRequest extends AsyncContext {
constructor(public readonly user: User) {
super()
}
}在执行命令时,将自定义的请求上下文作为第二个参数传递给 CommandBus#execute 方法:
const myRequest = new MyRequest(user)
await this.commandBus.execute(
new KillDragonCommand(heroId, killDragonDto.dragonId),
myRequest
)这样,MyRequest 实例就会作为 REQUEST 提供者,注入到对应的处理器中:
@CommandHandler(KillDragonCommand, {
scope: Scope.REQUEST,
})
export class KillDragonHandler {
constructor(
@Inject(REQUEST) private request: MyRequest // 注入请求上下文
) {}
// 处理器具体实现
}对于查询(Query)同样适用:
const myRequest = new MyRequest(user)
const hero = await this.queryBus.execute(new GetHeroQuery(heroId), myRequest)在查询处理器中:
@QueryHandler(GetHeroQuery, {
scope: Scope.REQUEST,
})
export class GetHeroHandler {
constructor(
@Inject(REQUEST) private request: MyRequest // 注入请求上下文
) {}
// 处理器具体实现
}对于事件(Event),虽然你可以将请求提供者传递给 EventBus#publish,但这种做法较少见。更常见的方式是使用 EventPublisher,将请求提供者合并到模型对象中:
const hero = this.publisher.mergeObjectContext(
await this.repository.findOneById(+heroId),
this.request // 在此注入请求上下文
)订阅这些事件的请求作用域事件处理器将能够访问到请求提供者。
Saga 负责管理长生命周期的业务流程,因此它始终是单例的。不过,你仍然可以从事件对象中提取出请求上下文:
@Saga()
dragonKilled = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(HeroKilledDragonEvent),
map((event) => {
const request = AsyncContext.of(event) // 获取请求上下文
const command = new DropAncientItemCommand(event.heroId, fakeItemID)
AsyncContext.merge(request, command) // 将请求上下文合并到命令中
return command
}),
)
}或者,你也可以使用 request.attachTo(command) 方法将请求上下文附加到命令对象上。
完整的示例代码,请参见这个 GitHub 仓库。