gRPC 是一个现代化、开源且高性能的远程过程调用(RPC,Remote Procedure Call)框架,可以在任何环境中运行。它能够高效地连接数据中心内外的服务,并且支持可插拔的负载均衡、链路追踪、健康检查和身份验证等功能。
与许多 RPC 系统类似,gRPC 基于服务(Service)的概念,将服务定义为一组可远程调用的函数(方法)。对于每个方法,你需要定义其参数和返回类型。服务、参数和返回类型都在 .proto 文件中通过 Google 开源的、与语言无关的 Protocol Buffers(协议缓冲) 机制进行定义。
在 gRPC 传输层(Transport Layer)中,Nest 使用 .proto 文件动态绑定客户端和服务端,从而简化远程过程调用的实现,并自动对结构化数据进行序列化和反序列化。
要开始构建基于 gRPC 的微服务,首先需要安装以下依赖包:
npm install @grpc/grpc-js @grpc/proto-loader与其他 Nest 微服务传输层实现类似,你可以通过传递给 createMicroservice() 方法的选项对象中的 transport 属性来选择 gRPC 传输机制。下面的示例将演示如何搭建一个 hero 服务。options 属性用于提供该服务的元数据,其各项属性将在下文详细说明。
import { NestFactory } from '@nestjs/core'
import { type MicroserviceOptions, Transport } from '@nestjs/microservices'
import { join } from 'node:path'
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.GRPC,
options: {
package: 'hero',
protoPath: join(__dirname, 'hero/hero.proto'),
},
}
)在 nest-cli.json 文件中,我们添加了 assets 属性,用于分发非 TypeScript 文件,同时通过 watchAssets 属性开启对所有非 TypeScript 资源的监听。在本例中,我们希望 .proto 文件能够被自动复制到 dist 目录。
{
"compilerOptions": {
"assets": ["**/*.proto"],
"watchAssets": true
}
}gRPC 传输器选项对象包含以下属性。
| 属性 | 描述 |
|---|---|
package | Protobuf 包名(需与 .proto 文件中的 package 设置一致)。必填 |
protoPath | .proto 文件的绝对路径(或相对于根目录的相对路径)。必填 |
url | 连接地址。格式为 ip 地址/dns 名称:端口(例如 '0.0.0.0:50051',用于 Docker 服务器),用于指定传输器建立连接的地址和端口。可选,默认值为 'localhost:5000' |
protoLoader | 用于加载 .proto 文件的 NPM 包名称。可选,默认值为 '@grpc/proto-loader' |
loader | @grpc/proto-loader 的选项。用于详细控制 .proto 文件的加载行为。可选。详细信息请参见这里 |
credentials | 服务器凭证,可选。详细说明请见这里 |
下面我们来定义一个名为 HeroesService 的示例 gRPC 服务。在上文的 options 对象中,protoPath 属性指定了 .proto 定义文件 hero.proto 的路径。hero.proto 文件采用 Protocol Buffers(协议缓冲) 语法进行结构定义。示例如下:
// hero/hero.proto
syntax = "proto3";
package hero;
service HeroesService {
rpc FindOne (HeroById) returns (Hero) {}
}
message HeroById {
int32 id = 1;
}
message Hero {
int32 id = 1;
string name = 2;
}我们的 HeroesService 暴露了一个 FindOne() 方法。该方法期望接收一个类型为 HeroById 的输入参数,并返回一个 Hero 消息(Protocol Buffers 使用 message 元素来定义参数类型和返回类型)。
接下来,我们需要实现该服务。要定义一个满足上述定义的处理器(handler),可以在控制器中使用 @GrpcMethod() 装饰器,如下所示。该装饰器为方法声明为 gRPC 服务方法提供了必要的元数据。
之前微服务章节介绍的 @MessagePattern()
装饰器(详细说明)在基于 gRPC
的微服务中不再使用。对于 gRPC 微服务,@GrpcMethod() 装饰器起到了同样的作用。
import { GrpcMethod } from '@nestjs/microservices'
import { ServerUnaryCall } from 'grpc'
@Controller()
export class HeroesController {
@GrpcMethod('HeroesService', 'FindOne')
findOne(
data: HeroById,
metadata: Metadata,
call: ServerUnaryCall<any, any>
): Hero {
const items = [
{ id: 1, name: 'John' },
{ id: 2, name: 'Doe' },
]
return items.find(({ id }) => id === data.id)
}
}如上所示,装饰器接收两个参数。第一个参数是服务名称(如 'HeroesService'),对应于 hero.proto 文件中的 HeroesService 服务定义。第二个参数(字符串 'FindOne')对应于 HeroesService 中定义的 FindOne() rpc 方法。
findOne() 处理器方法接收三个参数:调用方传递的 data,用于存储 gRPC 请求元数据的 metadata,以及 call,可用于获取 GrpcCall 对象的属性(如 sendMetadata,用于向客户端发送元数据)。
@GrpcMethod() 装饰器的两个参数都是可选的。如果省略第二个参数(如 'FindOne'),Nest 会自动将 .proto 文件中的 rpc 方法与处理器关联,关联规则是将处理器方法名转换为大驼峰命名(UpperCamelCase),例如 findOne 处理器会自动关联到 FindOne rpc 方法。如下所示:
@Controller()
export class HeroesController {
@GrpcMethod('HeroesService')
findOne(
data: HeroById,
metadata: Metadata,
call: ServerUnaryCall<any, any>
): Hero {
const items = [
{ id: 1, name: 'John' },
{ id: 2, name: 'Doe' },
]
return items.find(({ id }) => id === data.id)
}
}你也可以省略 @GrpcMethod() 的第一个参数。在这种情况下,Nest 会根据处理器所在类的名称自动将其与 proto 定义文件中的服务定义关联。例如,下面的代码中,HeroesService 类会将其处理器方法与 hero.proto 文件中名称为 HeroesService 的服务定义自动关联。
@Controller()
export class HeroesService {
@GrpcMethod()
findOne(
data: HeroById,
metadata: Metadata,
call: ServerUnaryCall<any, any>
): Hero {
const items = [
{ id: 1, name: 'John' },
{ id: 2, name: 'Doe' },
]
return items.find(({ id }) => id === data.id)
}
}Nest 应用可以作为 gRPC 客户端,消费在 .proto 文件中定义的服务。你可以通过 ClientGrpc 对象访问远程服务。获取 ClientGrpc 对象有多种方式。
推荐的方式是导入 ClientsModule。使用 register() 方法,将在 .proto 文件中定义的一组服务绑定到一个注入令牌(injection token),并进行相关配置。name 属性即为注入令牌。对于 gRPC 服务,需设置 transport: Transport.GRPC。options 属性是一个对象,其属性与上文描述一致。
imports: [
ClientsModule.register([
{
name: 'HERO_PACKAGE',
transport: Transport.GRPC,
options: {
package: 'hero',
protoPath: join(__dirname, 'hero/hero.proto'),
},
},
]),
]register() 方法接收一个对象数组。要注册多个包,只需传入多个注册对象即可。
注册完成后,我们可以通过 @Inject() 注入已配置的 ClientGrpc 对象。然后,使用 ClientGrpc 对象的 getService() 方法获取服务实例,如下所示。
@Injectable()
export class AppService implements OnModuleInit {
private heroesService: HeroesService
constructor(@Inject('HERO_PACKAGE') private client: ClientGrpc) {}
onModuleInit() {
this.heroesService = this.client.getService<HeroesService>('HeroesService')
}
getHero(): Observable<string> {
return this.heroesService.findOne({ id: 1 })
}
}如果未在 proto 加载器配置中(microservice transporter 配置的
options.loader.keepcase)将 keepCase 选项设置为 true,gRPC
客户端不会发送名称中包含下划线 _ 的字段。
请注意,与其他微服务传输方式相比,这里有一个小区别。我们不是使用 ClientProxy 类,而是使用 ClientGrpc 类,它提供了 getService() 方法。getService() 泛型方法接收服务名称作为参数,并返回其实例(如果可用)。
另外,你也可以使用 @Client() 装饰器来实例化 ClientGrpc 对象,如下所示:
@Injectable()
export class AppService implements OnModuleInit {
@Client({
transport: Transport.GRPC,
options: {
package: 'hero',
protoPath: join(__dirname, 'hero/hero.proto'),
},
})
client: ClientGrpc
private heroesService: HeroesService
onModuleInit() {
this.heroesService = this.client.getService<HeroesService>('HeroesService')
}
getHero(): Observable<string> {
return this.heroesService.findOne({ id: 1 })
}
}最后,对于更复杂的场景,我们可以使用 ClientProxyFactory 类注入动态配置的客户端,具体用法见这里。
无论采用哪种方式,最终我们都能获得 HeroesService 代理对象的引用,该对象暴露了 .proto 文件中定义的所有方法。现在,当我们访问这个代理对象(即 heroesService)时,gRPC 系统会自动序列化请求、转发到远程系统、返回响应并反序列化响应。由于 gRPC 屏蔽了这些网络通信细节,heroesService 的使用方式与本地提供者无异。
注意,所有服务方法都采用小驼峰命名法(lower camel case)(以符合语言的自然约定)。例如,虽然 .proto 文件 HeroesService 的定义中包含 FindOne() 函数,但 heroesService 实例会提供 findOne() 方法。
interface HeroesService {
findOne(data: { id: number }): Observable<any>
}消息处理器(message handler)同样可以返回一个 Observable,此时结果值会在流结束前持续发出。
@Get()
call(): Observable<any> {
return this.heroesService.findOne({ id: 1 })
}如需随请求发送 gRPC 元数据(metadata),可以传递第二个参数,如下所示:
import { Metadata } from 'grpc'
call(): Observable<any> {
const metadata = new Metadata()
metadata.add('Set-Cookie', 'yummy_cookie=choco')
return this.heroesService.findOne({ id: 1 }, metadata)
}请注意,这需要我们前面定义的 HeroesService 接口进行相应更新。
一个可用的完整示例可在这里查看。
gRPC 服务器反射规范(gRPC Server Reflection Specification) 是一项标准,允许 gRPC 客户端请求服务器所暴露 API 的详细信息,类似于为 RESTful API 公开 OpenAPI 文档。这将极大地方便开发者使用如 grpc-ui 或 postman 等调试工具进行接口调试。
要为你的服务器添加 gRPC 反射机制支持,首先需要安装相关实现包:
npm install @grpc/reflection然后,可以通过在 gRPC 服务器选项中的 onLoadPackageDefinition 钩子(hook)集成反射服务,示例如下:
import { ReflectionService } from '@grpc/reflection'
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
options: {
onLoadPackageDefinition: (pkg, server) => {
new ReflectionService(pkg).addToServer(server)
},
},
}
)现在,服务器将能够响应基于反射规范的 API 信息请求。
gRPC 本身支持长期保持的实时连接,通常被称为 流(streams)。流在诸如聊天、实时观测或分块数据传输等场景中非常有用。更多详细信息可参考官方文档这里。
Nest 支持两种方式实现 gRPC 流式处理器:
Subject + Observable 处理器:可以直接在控制器方法内部编写响应逻辑,或将其传递给 Subject/Observable 的消费者。Duplex 流处理器。我们来定义一个新的 gRPC 示例服务,名为 HelloService。hello.proto 文件采用 Protocol Buffers(协议缓冲) 进行结构定义。文件内容如下:
// hello/hello.proto
syntax = "proto3";
package hello;
service HelloService {
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
}
message HelloRequest {
string greeting = 1;
}
message HelloResponse {
string reply = 1;
}LotsOfGreetings 方法可以直接通过 @GrpcMethod
装饰器(如上方示例所示)实现,因为返回的流可以发出多个值。
基于上述 .proto 文件,我们定义 HelloService 接口如下:
interface HelloService {
bidiHello(upstream: Observable<HelloRequest>): Observable<HelloResponse>
lotsOfGreetings(upstream: Observable<HelloRequest>): Observable<HelloResponse>
}
interface HelloRequest {
greeting: string
}
interface HelloResponse {
reply: string
}@GrpcStreamMethod() 装饰器会将函数参数作为 RxJS 的 Observable(可观察对象)传递。因此,我们可以接收并处理多条消息。
@GrpcStreamMethod()
bidiHello(messages: Observable<any>, metadata: Metadata, call: ServerDuplexStream<any, any>): Observable<any> {
const subject = new Subject()
const onNext = message => {
console.log(message)
subject.next({
reply: 'Hello, world!'
})
}
const onComplete = () => subject.complete()
messages.subscribe({
next: onNext,
complete: onComplete,
})
return subject.asObservable()
}要支持使用 @GrpcStreamMethod()
装饰器实现全双工(full-duplex)交互,控制器方法必须返回一个 RxJS
Observable(可观察对象)。
Metadata 和 ServerUnaryCall 类/接口是从 grpc 包中导入的。
根据服务定义(在 .proto 文件中),BidiHello 方法应当以流的方式向服务端发送请求。为了从客户端向流发送多条异步消息,我们可以利用 RxJS 的 ReplaySubject 类。
const helloService = this.client.getService<HelloService>('HelloService')
const helloRequest$ = new ReplaySubject<HelloRequest>()
helloRequest$.next({ greeting: 'Hello (1)!' })
helloRequest$.next({ greeting: 'Hello (2)!' })
helloRequest$.complete()
return helloService.bidiHello(helloRequest$)在上面的示例中,我们向流中写入了两条消息(通过 next() 方法),并通过 complete() 方法通知服务端我们已完成数据发送。
当方法的返回值被定义为 stream 时,@GrpcStreamCall() 装饰器会将函数参数提供为 grpc.ServerDuplexStream,它支持标准方法,例如 .on('data', callback)、.write(message) 或 .cancel()。关于可用方法的完整文档,请参见此处。
另外,当方法的返回值不是 stream 时,@GrpcStreamCall() 装饰器会分别提供两个函数参数,分别是 grpc.ServerReadableStream(详细内容见此处)和 callback。
我们先来实现支持全双工交互的 BidiHello 方法。
@GrpcStreamCall()
bidiHello(requestStream: any) {
requestStream.on('data', message => {
console.log(message)
requestStream.write({
reply: 'Hello, world!'
})
})
}该装饰器不要求必须有特定的返回参数。通常期望像处理其他标准流类型一样处理该流。
在上面的示例中,我们使用 write() 方法将对象写入响应流。传递给 .on() 方法的回调函数会在服务每次收到新的数据块时被调用。
接下来我们来实现 LotsOfGreetings 方法。
@GrpcStreamCall()
lotsOfGreetings(requestStream: any, callback: (err: unknown, value: HelloResponse) => void) {
requestStream.on('data', message => {
console.log(message)
})
requestStream.on('end', () => callback(null, { reply: 'Hello, world!' }))
}在这里,我们使用 callback 函数在 requestStream 处理完成后发送响应。
在 Kubernetes 等编排器(orchestrator)中运行 gRPC 应用时,通常需要了解应用是否正在运行且处于健康状态。gRPC 健康检查规范 是一个标准,允许 gRPC 客户端暴露其健康状态,以便编排器能够据此做出相应操作。
要添加 gRPC 健康检查支持,首先安装 grpc-node 包:
npm install grpc-health-check然后,可以通过在 gRPC 服务器选项中的 onLoadPackageDefinition 钩子(hook)将其集成到 gRPC 服务中。请注意,protoPath 需要同时包含健康检查和 hero 包。
import { HealthImplementation, protoPath as healthCheckProtoPath } from 'grpc-health-check'
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
options: {
protoPath: [
healthCheckProtoPath,
protoPath: join(__dirname, 'hero/hero.proto'),
],
onLoadPackageDefinition: (pkg, server) => {
const healthImpl = new HealthImplementation({
'': 'UNKNOWN',
})
healthImpl.addToServer(server)
healthImpl.setStatus('', 'SERVING')
},
},
})gRPC health probe 是一个在容器化环境中测试 gRPC 健康检查非常实用的命令行工具(CLI)。
gRPC 元数据是关于某个远程过程调用(RPC)的信息,以键值对列表的形式存在。键为字符串,值通常为字符串,也可以是二进制数据。元数据对于 gRPC 本身是透明的 —— 它允许客户端在调用时向服务器传递相关信息,服务器也可以向客户端传递信息。元数据常用于包含身份验证令牌、请求标识符、监控标签,以及诸如数据集记录数等数据相关信息。
要在 @GrpcMethod() 处理器中读取元数据,可以使用第二个参数(metadata),其类型为 Metadata(从 grpc 包导入)。
如果需要从处理器中返回元数据,可以使用 ServerUnaryCall#sendMetadata() 方法(处理器的第三个参数)。
@Controller()
export class HeroesService {
@GrpcMethod()
findOne(
data: HeroById,
metadata: Metadata,
call: ServerUnaryCall<any, any>
): Hero {
const serverMetadata = new Metadata()
const items = [
{ id: 1, name: 'John' },
{ id: 2, name: 'Doe' },
]
serverMetadata.add('Set-Cookie', 'yummy_cookie=choco')
call.sendMetadata(serverMetadata)
return items.find(({ id }) => id === data.id)
}
}同样地,要在使用 @GrpcStreamMethod() 装饰器的处理器(详见 Subject 策略)中读取元数据,也可以通过第二个参数(metadata),其类型为 Metadata(从 grpc 包导入)。
如果需要从处理器中返回元数据,可以使用 ServerDuplexStream#sendMetadata() 方法(处理器的第三个参数)。
如果需要在调用流处理器(即使用 @GrpcStreamCall() 装饰器的处理器)中读取元数据,可以监听 requestStream 对象上的 metadata 事件,示例如下:
requestStream.on('metadata', (metadata: Metadata) => {
const meta = metadata.get('X-Meta')
})