NestJS Logo
NestJS 中文文档
v10.0.0
  • 介绍
  • 快速上手
  • 控制器
  • 提供者
  • 模块
  • 中间件
  • 异常过滤器
  • 管道
  • 守卫
  • 拦截器
  • 自定义装饰器
  • 自定义提供者
  • 异步提供者
  • 动态模块
  • 依赖注入作用域
  • 循环依赖
  • 模块引用
  • 懒加载模块
  • 执行上下文
  • 生命周期事件
  • 发现服务
  • 跨平台无关性
  • 测试
迁移指南
API 参考
官方课程
  1. 文档
  2. 微服务架构
  3. gRPC

Kafka
自定义传输器

gRPC

gRPC 是一个现代化、开源且高性能的远程过程调用(RPC,Remote Procedure Call)框架,可以在任何环境中运行。它能够高效地连接数据中心内外的服务,并且支持可插拔的负载均衡、链路追踪、健康检查和身份验证等功能。

与许多 RPC 系统类似,gRPC 基于服务(Service)的概念,将服务定义为一组可远程调用的函数(方法)。对于每个方法,你需要定义其参数和返回类型。服务、参数和返回类型都在 .proto 文件中通过 Google 开源的、与语言无关的 Protocol Buffers(协议缓冲) 机制进行定义。

在 gRPC 传输层(Transport Layer)中,Nest 使用 .proto 文件动态绑定客户端和服务端,从而简化远程过程调用的实现,并自动对结构化数据进行序列化和反序列化。

安装

要开始构建基于 gRPC 的微服务,首先需要安装以下依赖包:

npm install @grpc/grpc-js @grpc/proto-loader

概述

与其他 Nest 微服务传输层实现类似,你可以通过传递给 createMicroservice() 方法的选项对象中的 transport 属性来选择 gRPC 传输机制。下面的示例将演示如何搭建一个 hero 服务。options 属性用于提供该服务的元数据,其各项属性将在下文详细说明。

main.ts
import { NestFactory } from '@nestjs/core'
import { type MicroserviceOptions, Transport } from '@nestjs/microservices'
import { join } from 'node:path'

const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  AppModule,
  {
    transport: Transport.GRPC,
    options: {
      package: 'hero',
      protoPath: join(__dirname, 'hero/hero.proto'),
    },
  }
)

在 nest-cli.json 文件中,我们添加了 assets 属性,用于分发非 TypeScript 文件,同时通过 watchAssets 属性开启对所有非 TypeScript 资源的监听。在本例中,我们希望 .proto 文件能够被自动复制到 dist 目录。

{
  "compilerOptions": {
    "assets": ["**/*.proto"],
    "watchAssets": true
  }
}

选项

gRPC 传输器选项对象包含以下属性。

属性描述
packageProtobuf 包名(需与 .proto 文件中的 package 设置一致)。必填
protoPath.proto 文件的绝对路径(或相对于根目录的相对路径)。必填
url连接地址。格式为 ip 地址/dns 名称:端口(例如 '0.0.0.0:50051',用于 Docker 服务器),用于指定传输器建立连接的地址和端口。可选,默认值为 'localhost:5000'
protoLoader用于加载 .proto 文件的 NPM 包名称。可选,默认值为 '@grpc/proto-loader'
loader@grpc/proto-loader 的选项。用于详细控制 .proto 文件的加载行为。可选。详细信息请参见这里
credentials服务器凭证,可选。详细说明请见这里

gRPC 服务示例

下面我们来定义一个名为 HeroesService 的示例 gRPC 服务。在上文的 options 对象中,protoPath 属性指定了 .proto 定义文件 hero.proto 的路径。hero.proto 文件采用 Protocol Buffers(协议缓冲) 语法进行结构定义。示例如下:

// hero/hero.proto
syntax = "proto3";

package hero;

service HeroesService {
  rpc FindOne (HeroById) returns (Hero) {}
}

message HeroById {
  int32 id = 1;
}

message Hero {
  int32 id = 1;
  string name = 2;
}

我们的 HeroesService 暴露了一个 FindOne() 方法。该方法期望接收一个类型为 HeroById 的输入参数,并返回一个 Hero 消息(Protocol Buffers 使用 message 元素来定义参数类型和返回类型)。

接下来,我们需要实现该服务。要定义一个满足上述定义的处理器(handler),可以在控制器中使用 @GrpcMethod() 装饰器,如下所示。该装饰器为方法声明为 gRPC 服务方法提供了必要的元数据。

提示

之前微服务章节介绍的 @MessagePattern() 装饰器(详细说明)在基于 gRPC 的微服务中不再使用。对于 gRPC 微服务,@GrpcMethod() 装饰器起到了同样的作用。

heroes.controller.ts
import { GrpcMethod } from '@nestjs/microservices'
import { ServerUnaryCall } from 'grpc'

@Controller()
export class HeroesController {
  @GrpcMethod('HeroesService', 'FindOne')
  findOne(
    data: HeroById,
    metadata: Metadata,
    call: ServerUnaryCall<any, any>
  ): Hero {
    const items = [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ]
    return items.find(({ id }) => id === data.id)
  }
}

如上所示,装饰器接收两个参数。第一个参数是服务名称(如 'HeroesService'),对应于 hero.proto 文件中的 HeroesService 服务定义。第二个参数(字符串 'FindOne')对应于 HeroesService 中定义的 FindOne() rpc 方法。

findOne() 处理器方法接收三个参数:调用方传递的 data,用于存储 gRPC 请求元数据的 metadata,以及 call,可用于获取 GrpcCall 对象的属性(如 sendMetadata,用于向客户端发送元数据)。

@GrpcMethod() 装饰器的两个参数都是可选的。如果省略第二个参数(如 'FindOne'),Nest 会自动将 .proto 文件中的 rpc 方法与处理器关联,关联规则是将处理器方法名转换为大驼峰命名(UpperCamelCase),例如 findOne 处理器会自动关联到 FindOne rpc 方法。如下所示:

heroes.controller.ts
@Controller()
export class HeroesController {
  @GrpcMethod('HeroesService')
  findOne(
    data: HeroById,
    metadata: Metadata,
    call: ServerUnaryCall<any, any>
  ): Hero {
    const items = [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ]
    return items.find(({ id }) => id === data.id)
  }
}

你也可以省略 @GrpcMethod() 的第一个参数。在这种情况下,Nest 会根据处理器所在类的名称自动将其与 proto 定义文件中的服务定义关联。例如,下面的代码中,HeroesService 类会将其处理器方法与 hero.proto 文件中名称为 HeroesService 的服务定义自动关联。

heroes.controller.ts
@Controller()
export class HeroesService {
  @GrpcMethod()
  findOne(
    data: HeroById,
    metadata: Metadata,
    call: ServerUnaryCall<any, any>
  ): Hero {
    const items = [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ]
    return items.find(({ id }) => id === data.id)
  }
}

客户端(Client)

Nest 应用可以作为 gRPC 客户端,消费在 .proto 文件中定义的服务。你可以通过 ClientGrpc 对象访问远程服务。获取 ClientGrpc 对象有多种方式。

推荐的方式是导入 ClientsModule。使用 register() 方法,将在 .proto 文件中定义的一组服务绑定到一个注入令牌(injection token),并进行相关配置。name 属性即为注入令牌。对于 gRPC 服务,需设置 transport: Transport.GRPC。options 属性是一个对象,其属性与上文描述一致。

imports: [
  ClientsModule.register([
    {
      name: 'HERO_PACKAGE',
      transport: Transport.GRPC,
      options: {
        package: 'hero',
        protoPath: join(__dirname, 'hero/hero.proto'),
      },
    },
  ]),
]
提示

register() 方法接收一个对象数组。要注册多个包,只需传入多个注册对象即可。

注册完成后,我们可以通过 @Inject() 注入已配置的 ClientGrpc 对象。然后,使用 ClientGrpc 对象的 getService() 方法获取服务实例,如下所示。

@Injectable()
export class AppService implements OnModuleInit {
  private heroesService: HeroesService

  constructor(@Inject('HERO_PACKAGE') private client: ClientGrpc) {}

  onModuleInit() {
    this.heroesService = this.client.getService<HeroesService>('HeroesService')
  }

  getHero(): Observable<string> {
    return this.heroesService.findOne({ id: 1 })
  }
}
警告

如果未在 proto 加载器配置中(microservice transporter 配置的 options.loader.keepcase)将 keepCase 选项设置为 true,gRPC 客户端不会发送名称中包含下划线 _ 的字段。

请注意,与其他微服务传输方式相比,这里有一个小区别。我们不是使用 ClientProxy 类,而是使用 ClientGrpc 类,它提供了 getService() 方法。getService() 泛型方法接收服务名称作为参数,并返回其实例(如果可用)。

另外,你也可以使用 @Client() 装饰器来实例化 ClientGrpc 对象,如下所示:

@Injectable()
export class AppService implements OnModuleInit {
  @Client({
    transport: Transport.GRPC,
    options: {
      package: 'hero',
      protoPath: join(__dirname, 'hero/hero.proto'),
    },
  })
  client: ClientGrpc

  private heroesService: HeroesService

  onModuleInit() {
    this.heroesService = this.client.getService<HeroesService>('HeroesService')
  }

  getHero(): Observable<string> {
    return this.heroesService.findOne({ id: 1 })
  }
}

最后,对于更复杂的场景,我们可以使用 ClientProxyFactory 类注入动态配置的客户端,具体用法见这里。

无论采用哪种方式,最终我们都能获得 HeroesService 代理对象的引用,该对象暴露了 .proto 文件中定义的所有方法。现在,当我们访问这个代理对象(即 heroesService)时,gRPC 系统会自动序列化请求、转发到远程系统、返回响应并反序列化响应。由于 gRPC 屏蔽了这些网络通信细节,heroesService 的使用方式与本地提供者无异。

注意,所有服务方法都采用小驼峰命名法(lower camel case)(以符合语言的自然约定)。例如,虽然 .proto 文件 HeroesService 的定义中包含 FindOne() 函数,但 heroesService 实例会提供 findOne() 方法。

interface HeroesService {
  findOne(data: { id: number }): Observable<any>
}

消息处理器(message handler)同样可以返回一个 Observable,此时结果值会在流结束前持续发出。

heroes.controller.ts
@Get()
call(): Observable<any> {
  return this.heroesService.findOne({ id: 1 })
}

如需随请求发送 gRPC 元数据(metadata),可以传递第二个参数,如下所示:

import { Metadata } from 'grpc'

call(): Observable<any> {
  const metadata = new Metadata()
  metadata.add('Set-Cookie', 'yummy_cookie=choco')

  return this.heroesService.findOne({ id: 1 }, metadata)
}

请注意,这需要我们前面定义的 HeroesService 接口进行相应更新。

示例

一个可用的完整示例可在这里查看。

gRPC 反射机制(Reflection)

gRPC 服务器反射规范(gRPC Server Reflection Specification) 是一项标准,允许 gRPC 客户端请求服务器所暴露 API 的详细信息,类似于为 RESTful API 公开 OpenAPI 文档。这将极大地方便开发者使用如 grpc-ui 或 postman 等调试工具进行接口调试。

要为你的服务器添加 gRPC 反射机制支持,首先需要安装相关实现包:

npm install @grpc/reflection

然后,可以通过在 gRPC 服务器选项中的 onLoadPackageDefinition 钩子(hook)集成反射服务,示例如下:

main.ts
import { ReflectionService } from '@grpc/reflection'

const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  AppModule,
  {
    options: {
      onLoadPackageDefinition: (pkg, server) => {
        new ReflectionService(pkg).addToServer(server)
      },
    },
  }
)

现在,服务器将能够响应基于反射规范的 API 信息请求。

gRPC 流式通信(Streaming)

gRPC 本身支持长期保持的实时连接,通常被称为 流(streams)。流在诸如聊天、实时观测或分块数据传输等场景中非常有用。更多详细信息可参考官方文档这里。

Nest 支持两种方式实现 gRPC 流式处理器:

  • 使用 RxJS 的 Subject + Observable 处理器:可以直接在控制器方法内部编写响应逻辑,或将其传递给 Subject/Observable 的消费者。
  • 纯 gRPC 调用流处理器:适用于将流传递给某个执行器,由其处理后续的分发,符合 Node 标准的 Duplex 流处理器。

流式通信示例

我们来定义一个新的 gRPC 示例服务,名为 HelloService。hello.proto 文件采用 Protocol Buffers(协议缓冲) 进行结构定义。文件内容如下:

// hello/hello.proto
syntax = "proto3";

package hello;

service HelloService {
  rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
  rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
}

message HelloRequest {
  string greeting = 1;
}

message HelloResponse {
  string reply = 1;
}
提示

LotsOfGreetings 方法可以直接通过 @GrpcMethod 装饰器(如上方示例所示)实现,因为返回的流可以发出多个值。

基于上述 .proto 文件,我们定义 HelloService 接口如下:

interface HelloService {
  bidiHello(upstream: Observable<HelloRequest>): Observable<HelloResponse>
  lotsOfGreetings(upstream: Observable<HelloRequest>): Observable<HelloResponse>
}

interface HelloRequest {
  greeting: string
}

interface HelloResponse {
  reply: string
}
提示

该 proto 接口可通过 ts-proto 包自动生成,详细用法见这里。

Subject 策略

@GrpcStreamMethod() 装饰器会将函数参数作为 RxJS 的 Observable(可观察对象)传递。因此,我们可以接收并处理多条消息。

@GrpcStreamMethod()
bidiHello(messages: Observable<any>, metadata: Metadata, call: ServerDuplexStream<any, any>): Observable<any> {
  const subject = new Subject()

  const onNext = message => {
    console.log(message)
    subject.next({
      reply: 'Hello, world!'
    })
  }
  const onComplete = () => subject.complete()
  messages.subscribe({
    next: onNext,
    complete: onComplete,
  })

  return subject.asObservable()
}
警告

要支持使用 @GrpcStreamMethod() 装饰器实现全双工(full-duplex)交互,控制器方法必须返回一个 RxJS Observable(可观察对象)。

提示

Metadata 和 ServerUnaryCall 类/接口是从 grpc 包中导入的。

根据服务定义(在 .proto 文件中),BidiHello 方法应当以流的方式向服务端发送请求。为了从客户端向流发送多条异步消息,我们可以利用 RxJS 的 ReplaySubject 类。

const helloService = this.client.getService<HelloService>('HelloService')
const helloRequest$ = new ReplaySubject<HelloRequest>()

helloRequest$.next({ greeting: 'Hello (1)!' })
helloRequest$.next({ greeting: 'Hello (2)!' })
helloRequest$.complete()

return helloService.bidiHello(helloRequest$)

在上面的示例中,我们向流中写入了两条消息(通过 next() 方法),并通过 complete() 方法通知服务端我们已完成数据发送。

调用流处理器(Call stream handler)

当方法的返回值被定义为 stream 时,@GrpcStreamCall() 装饰器会将函数参数提供为 grpc.ServerDuplexStream,它支持标准方法,例如 .on('data', callback)、.write(message) 或 .cancel()。关于可用方法的完整文档,请参见此处。

另外,当方法的返回值不是 stream 时,@GrpcStreamCall() 装饰器会分别提供两个函数参数,分别是 grpc.ServerReadableStream(详细内容见此处)和 callback。

我们先来实现支持全双工交互的 BidiHello 方法。

@GrpcStreamCall()
bidiHello(requestStream: any) {
  requestStream.on('data', message => {
    console.log(message)
    requestStream.write({
      reply: 'Hello, world!'
    })
  })
}
提示

该装饰器不要求必须有特定的返回参数。通常期望像处理其他标准流类型一样处理该流。

在上面的示例中,我们使用 write() 方法将对象写入响应流。传递给 .on() 方法的回调函数会在服务每次收到新的数据块时被调用。

接下来我们来实现 LotsOfGreetings 方法。

@GrpcStreamCall()
lotsOfGreetings(requestStream: any, callback: (err: unknown, value: HelloResponse) => void) {
  requestStream.on('data', message => {
    console.log(message)
  })
  requestStream.on('end', () => callback(null, { reply: 'Hello, world!' }))
}

在这里,我们使用 callback 函数在 requestStream 处理完成后发送响应。

健康检查(Health checks)

在 Kubernetes 等编排器(orchestrator)中运行 gRPC 应用时,通常需要了解应用是否正在运行且处于健康状态。gRPC 健康检查规范 是一个标准,允许 gRPC 客户端暴露其健康状态,以便编排器能够据此做出相应操作。

要添加 gRPC 健康检查支持,首先安装 grpc-node 包:

npm install grpc-health-check

然后,可以通过在 gRPC 服务器选项中的 onLoadPackageDefinition 钩子(hook)将其集成到 gRPC 服务中。请注意,protoPath 需要同时包含健康检查和 hero 包。

main.ts
import { HealthImplementation, protoPath as healthCheckProtoPath } from 'grpc-health-check'

const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  options: {
    protoPath: [
      healthCheckProtoPath,
      protoPath: join(__dirname, 'hero/hero.proto'),
    ],
    onLoadPackageDefinition: (pkg, server) => {
      const healthImpl = new HealthImplementation({
        '': 'UNKNOWN',
      })

      healthImpl.addToServer(server)
      healthImpl.setStatus('', 'SERVING')
    },
  },
})
提示

gRPC health probe 是一个在容器化环境中测试 gRPC 健康检查非常实用的命令行工具(CLI)。

gRPC 元数据(Metadata)

gRPC 元数据是关于某个远程过程调用(RPC)的信息,以键值对列表的形式存在。键为字符串,值通常为字符串,也可以是二进制数据。元数据对于 gRPC 本身是透明的 —— 它允许客户端在调用时向服务器传递相关信息,服务器也可以向客户端传递信息。元数据常用于包含身份验证令牌、请求标识符、监控标签,以及诸如数据集记录数等数据相关信息。

要在 @GrpcMethod() 处理器中读取元数据,可以使用第二个参数(metadata),其类型为 Metadata(从 grpc 包导入)。

如果需要从处理器中返回元数据,可以使用 ServerUnaryCall#sendMetadata() 方法(处理器的第三个参数)。

heroes.controller.ts
@Controller()
export class HeroesService {
  @GrpcMethod()
  findOne(
    data: HeroById,
    metadata: Metadata,
    call: ServerUnaryCall<any, any>
  ): Hero {
    const serverMetadata = new Metadata()
    const items = [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ]

    serverMetadata.add('Set-Cookie', 'yummy_cookie=choco')
    call.sendMetadata(serverMetadata)

    return items.find(({ id }) => id === data.id)
  }
}

同样地,要在使用 @GrpcStreamMethod() 装饰器的处理器(详见 Subject 策略)中读取元数据,也可以通过第二个参数(metadata),其类型为 Metadata(从 grpc 包导入)。

如果需要从处理器中返回元数据,可以使用 ServerDuplexStream#sendMetadata() 方法(处理器的第三个参数)。

如果需要在调用流处理器(即使用 @GrpcStreamCall() 装饰器的处理器)中读取元数据,可以监听 requestStream 对象上的 metadata 事件,示例如下:

requestStream.on('metadata', (metadata: Metadata) => {
  const meta = metadata.get('X-Meta')
})