Kafka 是一个开源的分布式流式处理平台,具备以下三大核心能力:
Kafka 项目旨在提供一个统一的、高吞吐量、低延迟的平台,用于处理实时数据流。它能够很好地与 Apache Storm 和 Spark 集成,实现实时流数据分析。
要开始构建基于 Kafka 的微服务,首先需要安装相关依赖包:
npm install kafkajs与其他 Nest 微服务传输层实现类似,你可以通过在 createMicroservice() 方法中传入的 options 对象的 transport 属性来选择 Kafka 传输机制,同时可以通过可选的 options 属性进行配置,示例如下:
import { Transport } from '@nestjs/microservices'
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
},
}
)options 属性是针对所选传输层特有的配置项。Kafka 传输层支持以下配置属性:
与其他微服务传输层实现相比,Kafka 有一个小区别。我们不再使用 ClientProxy 类,而是使用 ClientKafkaProxy 类。
与其他微服务传输层类似,你有多种方式来创建 ClientKafkaProxy 实例。
其中一种方式是使用 ClientsModule。要通过 ClientsModule 创建客户端实例,需要导入该模块,并使用 register() 方法传入一个配置对象。该对象的属性与上文 createMicroservice() 方法中展示的属性一致,并且还需要额外指定一个 name 属性作为注入令牌。你可以在这里查看更多关于 ClientsModule 的内容。
@Module({
imports: [
ClientsModule.register([
{
name: 'HERO_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer'
}
}
},
]),
]
...
})你也可以选择其他方式来创建客户端(如 ClientProxyFactory 或 @Client() 装饰器)。相关内容可参考这里。
使用 @Client() 装饰器的方式如下:
@Client({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer'
}
}
})
client: ClientKafkaProxyKafka 微服务的消息模式(Message pattern)会为请求和响应通道分别使用两个主题(topic)。ClientKafkaProxy.send() 方法通过为请求消息关联返回地址、关联 ID、响应主题和响应分区,实现消息的发送。这要求 ClientKafkaProxy 实例在发送消息前,必须已订阅响应主题,并且至少分配到一个分区。
随后,你需要为每个正在运行的 Nest 应用分配至少一个响应主题分区。例如,如果你运行了 4 个 Nest 应用,但响应主题只有 3 个分区,那么其中 1 个 Nest 应用在尝试发送消息时会报错。
当新的 ClientKafkaProxy 实例启动时,它们会加入消费者组,并订阅各自的主题。这个过程会触发消费者组内主题分区的再平衡(rebalance)。
通常,主题分区的分配采用轮询分区器(round robin partitioner),它会根据应用启动时随机设置的消费者名称,对消费者集合进行排序并分配分区。然而,当有新消费者加入消费者组时,新消费者可能会被插入到消费者集合的任意位置。这会导致已有消费者在新消费者之后的位置发生变化,从而被分配到不同的分区。结果是,这些被重新分配分区的消费者会丢失再平衡前已发送请求的响应消息。
为防止 ClientKafkaProxy 的消费者丢失响应消息,Nest 内置了专用的自定义分区器(custom partitioner)。该分区器会根据应用启动时设置的高精度时间戳(process.hrtime()),对消费者集合进行排序并分配分区。
ClientKafkaProxy 类提供了 subscribeToResponseOf() 方法。该方法接收一个请求主题名称作为参数,并将派生出的响应主题名称添加到响应主题集合中。在实现消息模式时,必须调用此方法。
onModuleInit() {
this.client.subscribeToResponseOf('hero.kill.dragon')
}如果 ClientKafkaProxy 实例是异步创建的,则必须在调用 connect() 方法之前调用 subscribeToResponseOf() 方法。
async onModuleInit() {
this.client.subscribeToResponseOf('hero.kill.dragon')
await this.client.connect()
}Nest 会将接收到的 Kafka 消息解析为一个包含 key、value 和 headers 属性的对象,这些属性的值类型均为 Buffer。随后,Nest 会将这些 Buffer 转换为字符串。如果字符串表现为「类对象」格式,Nest 会尝试将其作为 JSON 进行解析。最终,value 会被传递给其对应的处理器。
Nest 在发布事件或发送消息时,会对即将发送的 Kafka 消息进行序列化处理。这一过程会应用于传递给 ClientKafkaProxy 的 emit() 和 send() 方法的参数,或是 @MessagePattern 方法的返回值。序列化时,若对象不是字符串或 Buffer,则会通过 JSON.stringify() 或 toString() 原型方法将其「字符串化」。
import { Payload } from '@nestjs/microservices'
@Controller()
export class HeroesController {
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage): any {
const dragonId = message.dragonId
const items = [
{ id: 1, name: 'Mythical Sword' },
{ id: 2, name: 'Key to Dungeon' },
]
return items
}
}发送的消息也可以通过传递包含 key 和 value 属性的对象来指定消息键。为消息设置键对于满足分区协同(co-partitioning)要求非常重要。
@Controller()
export class HeroesController {
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage): any {
const realm = 'Nest'
const heroId = message.heroId
const dragonId = message.dragonId
const items = [
{ id: 1, name: 'Mythical Sword' },
{ id: 2, name: 'Key to Dungeon' },
]
return {
headers: {
realm,
},
key: heroId,
value: items,
}
}
}此外,以上述格式传递的消息还可以通过 headers 哈希属性设置自定义消息头。消息头哈希属性的值必须为 string 或 Buffer 类型。
@Controller()
export class HeroesController {
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage): any {
const realm = 'Nest'
const heroId = message.heroId
const dragonId = message.dragonId
const items = [
{ id: 1, name: 'Mythical Sword' },
{ id: 2, name: 'Key to Dungeon' },
]
return {
headers: {
kafka_nestRealm: realm,
},
key: heroId,
value: items,
}
}
}虽然请求-响应(request-response)方式非常适合服务之间的消息交换,但当你的消息风格是基于事件(event-based)时(例如 Kafka 场景下),这种方式就不太合适了 —— 此时你只需要发布事件而无需等待响应。在这种情况下,你不希望为维护两个主题(topic)而引入请求-响应所需的额外开销。
你可以参考以下两个章节,进一步了解相关内容:概览:基于事件以及概览:事件发布。
在更复杂的场景下,你可能需要访问有关传入请求的更多信息。当你使用 Kafka 传输器(Kafka transporter)时,可以通过 KafkaContext 对象获取这些信息。
import { type KafkaContext, Payload, Ctx } from '@nestjs/microservices'
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
console.log(`Topic: ${context.getTopic()}`)
}如果你需要访问原始的 Kafka IncomingMessage(传入消息)对象,可以通过 KafkaContext 对象的 getMessage() 方法获取,如下所示:
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
const originalMessage = context.getMessage()
const partition = context.getPartition()
const { headers, timestamp } = originalMessage
}其中,IncomingMessage(传入消息)接口定义如下:
interface IncomingMessage {
topic: string
partition: number
timestamp: string
size: number
attributes: number
offset: string
key: any
value: any
headers: Record<string, any>
}如果你的处理函数每次处理消息的耗时较长,建议使用 heartbeat 回调。你可以通过 KafkaContext 的 getHeartbeat() 方法获取 heartbeat 函数,示例如下:
@MessagePattern('hero.kill.dragon')
async killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
const heartbeat = context.getHeartbeat()
// 执行耗时操作
await doWorkPart1()
// 发送心跳,避免超过 sessionTimeout
await heartbeat()
// 再次执行耗时操作
await doWorkPart2()
}Kafka 微服务组件会在 client.clientId 和 consumer.groupId 选项后追加各自角色的描述,以防止 Nest 微服务客户端和服务端组件之间发生命名冲突。默认情况下,ClientKafkaProxy 组件会在这两个选项后追加 -client,而 ServerKafka 组件则追加 -server。请注意下方示例中,所提供的值会被这样转换(如注释所示)。
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero', // hero-server
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer', // hero-consumer-server
},
},
}
)对于客户端:
@Client({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero', // hero-client
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer' // hero-consumer-client
}
}
})
client: ClientKafkaProxy你可以通过继承自定义提供者中的 ClientKafkaProxy 和
KafkaServer,并重写构造函数,来自定义 Kafka 客户端和消费者的命名规范。
由于 Kafka 微服务消息模式会为请求和响应通道各使用一个主题(Topic),因此响应模式应当基于请求主题生成。默认情况下,响应主题的名称是在请求主题名称后追加 .reply 得到的。
onModuleInit() {
this.client.subscribeToResponseOf('hero.get') // hero.get.reply
}你可以通过继承自定义提供者中的 ClientKafkaProxy,并重写
getResponsePatternName 方法,来自定义 Kafka 响应主题的命名规范。
与其他传输层类似,所有未处理的异常都会被自动包装为 RpcException,并转换为「用户友好」的格式。然而,在某些边界场景下,你可能希望绕过这一机制,让异常直接被 kafkajs 驱动程序处理。当消息处理过程中抛出异常时,会指示 kafkajs 对该消息进行重试(重新投递)。这意味着即使消息(或事件)处理器已被触发,其 offset 也不会被提交到 Kafka。
对于事件处理器(基于事件的通信),所有未处理的异常默认都被视为可重试异常。
为此,你可以使用一个专用的类 KafkaRetriableException,用法如下:
import { KafkaRetriableException } from '@nestjs/microservices'
throw new KafkaRetriableException('...')除了默认的错误处理机制外,你还可以为 Kafka 事件创建自定义异常过滤器(Exception Filter),以便管理重试逻辑。例如,下面的示例演示了如何在达到可配置的最大重试次数后跳过有问题的事件:
import { Catch, ArgumentsHost, Logger } from '@nestjs/common'
import { BaseExceptionFilter } from '@nestjs/core'
import { KafkaContext } from '../ctx-host'
@Catch()
export class KafkaMaxRetryExceptionFilter extends BaseExceptionFilter {
private readonly logger = new Logger(KafkaMaxRetryExceptionFilter.name)
constructor(
private readonly maxRetries: number,
// 超过最大重试次数时执行的可选自定义函数
private readonly skipHandler?: (message: any) => Promise<void>
) {
super()
}
async catch(exception: unknown, host: ArgumentsHost) {
const kafkaContext = host.switchToRpc().getContext<KafkaContext>()
const message = kafkaContext.getMessage()
const currentRetryCount = this.getRetryCountFromContext(kafkaContext)
if (currentRetryCount >= this.maxRetries) {
this.logger.warn(
`已超过最大重试次数(${this.maxRetries}),消息:${JSON.stringify(message)}`
)
if (this.skipHandler) {
try {
await this.skipHandler(message)
} catch (err) {
this.logger.error('skipHandler 执行出错:', err)
}
}
try {
await this.commitOffset(kafkaContext)
} catch (commitError) {
this.logger.error('提交 offset 失败:', commitError)
}
return // 停止异常的进一步传播
}
// 如果重试次数未达到上限,继续执行默认的异常过滤器逻辑
super.catch(exception, host)
}
private getRetryCountFromContext(context: KafkaContext): number {
const headers = context.getMessage().headers || {}
const retryHeader = headers['retryCount'] || headers['retry-count']
return retryHeader ? Number(retryHeader) : 0
}
private async commitOffset(context: KafkaContext): Promise<void> {
const consumer = context.getConsumer && context.getConsumer()
if (!consumer) {
throw new Error('无法从 KafkaContext 获取 Consumer 实例。')
}
const topic = context.getTopic && context.getTopic()
const partition = context.getPartition && context.getPartition()
const message = context.getMessage()
const offset = message.offset
if (!topic || partition === undefined || offset === undefined) {
throw new Error('提交 offset 时 Kafka 消息上下文信息不完整。')
}
await consumer.commitOffsets([
{
topic,
partition,
// 提交 offset 时需提交下一个编号(即当前 offset + 1)
offset: (Number(offset) + 1).toString(),
},
])
}
}该过滤器(Filter)允许你对 Kafka 事件的处理进行最多可配置次数的重试。一旦达到最大重试次数,会触发自定义的 skipHandler(如果有提供),并提交 offset,从而跳过有问题的事件,使后续事件能够继续被正常处理。
你可以通过将该过滤器添加到事件处理器中来集成使用:
@UseFilters(new KafkaMaxRetryExceptionFilter(5))
export class MyEventHandler {
@EventPattern('your-topic')
async handleEvent(@Payload() data: any, @Ctx() context: KafkaContext) {
// 你的事件处理逻辑...
}
}在使用 Kafka 时,提交偏移量(offset)是非常重要的操作。默认情况下,消息会在特定时间后自动提交。更多信息请参阅 KafkaJS 文档。KafkaContext 提供了一种方式,可以访问当前活跃的消费者(consumer),以手动提交偏移量。这里的消费者即 KafkaJS 的 consumer,其行为与原生 KafkaJS 实现一致。
@EventPattern('user.created')
async handleUserCreated(@Payload() data: IncomingMessage, @Ctx() context: KafkaContext) {
// 业务逻辑
const { offset } = context.getMessage()
const partition = context.getPartition()
const topic = context.getTopic()
const consumer = context.getConsumer()
await consumer.commitOffsets([{ topic, partition, offset }])
}如果你希望禁用消息的自动提交,可以在 run 配置中将 autoCommit 设置为 false,如下所示:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
run: {
autoCommit: false,
},
},
}
)要实时获取连接状态以及底层驱动实例的状态更新,可以订阅 status 流。该流会根据所选驱动提供特定的状态更新。以 Kafka 驱动为例,status 流会发出 connected(已连接)、disconnected(已断开连接)、rebalancing(再平衡)、crashed(已崩溃)和 stopped(已停止)等事件。
import type { KafkaStatus } from '@nestjs/microservices'
this.client.status.subscribe((status: KafkaStatus) => {
console.log(status)
})同样,你也可以订阅服务端的 status 流,以接收服务端状态的通知。
const server = app.connectMicroservice<MicroserviceOptions>(...)
server.status.subscribe((status: KafkaStatus) => {
console.log(status)
})对于更高级的使用场景,你可能需要访问底层的生产者(producer)和消费者(consumer)实例。这在需要手动关闭连接或调用驱动专有方法时非常有用。但请注意,在大多数情况下,你无需直接操作驱动。
你可以通过 ClientKafkaProxy 实例暴露的 producer 和 consumer 属性来实现:
const producer = this.client.producer
const consumer = this.client.consumer