跳到主要内容

gRPC

gRPC 是一个现代、开源、高性能的 RPC 框架, 可以在任何环境中运行。 它可以有效地连接数据中心内外的服务,并支持可插拔的负载均衡、跟踪、健康检查和身份验证。

与许多 RPC 系统一样,gRPC 基于在术语上定义服务的概念,这些服务可以远程调用。 对于每个方法,您定义参数和返回类型。 服务、参数和返回类型是使用 Google 的开源语言中立协议缓冲区 机制在 .proto 文件中定义的。

使用 gRPC 传输器,Nest 使用 .proto 文件动态绑定客户端和服务器, 以便轻松实现远程过程调用,并自动对结构化数据进行序列化和反序列化。

安装

要开始构建基于 gRPC 的微服务,请首先安装所需的包:

npm i --save @grpc/grpc-js @grpc/proto-loader

概述

与其他 Nest 微服务传输层实现一样, 您可以使用传递给 createMicroservice() 方法的 options 对象的 transport 属性选择 gRPC 传输机制。 在下面的示例中,我们将设置一个英雄服务。 options 属性提供了关于该服务的元数据;其属性如下所述

main.ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.GRPC,
options: {
package: 'hero',
protoPath: join(__dirname, 'hero/hero.proto'),
},
});
提示

join() 函数是从 path 包中导入的; Transport 枚举是从 @nestjs/microservices 包中导入的。

nest-cli.json 文件中,我们添加了 assets 属性, 该属性允许我们分发非 TypeScript 文件, 以及 watchAssets 用于打开对所有非 TypeScript 资源的监视。 在我们的情况下,我们希望将 .proto 文件自动复制到 dist 文件夹。

{
"compilerOptions": {
"assets": ["**/*.proto"],
"watchAssets": true
}
}

选项

gRPC 传输器选项对象公开了以下描述的属性。

  • package:Protobuf 包名称(与 .proto 文件中的 package 设置相匹配)。必需。
  • protoPath.proto 文件的绝对路径(或相对于根目录)。必需。
  • url:连接 URL。字符串的格式为 ip address/dns:port(例如,'localhost:50051'),定义传输器在哪个地址/端口上建立连接。可选。默认为 'localhost:5000'
  • protoLoader:用于加载 .proto 文件的 NPM 包名称。可选。默认为 '@grpc/proto-loader'
  • loader@grpc/proto-loader 的选项。这些选项可以详细控制 .proto 文件的行为。可选。有关更多详细信息,请参阅这里
  • credentials:服务器凭据。可选。了解更多

示例 gRPC 服务

让我们定义一个名为 HeroesService 的示例 gRPC 服务。 在上述选项对象中,protoPath 属性设置了一个路径, 指向 .proto 定义文件 hero.protohero.proto 文件使用 Protocol Buffers 进行结构化。 以下是它的样子:

// hero/hero.proto
syntax = "proto3";

package hero;

service HeroesService {
rpc FindOne (HeroById) returns (Hero) {}
}

message HeroById {
int32 id = 1;
}

message Hero {
int32 id = 1;
string name = 2;
}

我们的 HeroesService 公开了一个 FindOne() 方法。 该方法期望一个类型为 HeroById 的输入参数, 并返回一个 Hero 消息(Protocol Buffers 使用消息元素来定义参数类型和返回类型)。

接下来,我们需要实现这个服务。 为了定义一个满足这一定义的处理程序,我们在控制器中使用 @GrpcMethod() 装饰器, 如下所示。该装饰器提供将方法声明为 gRPC 服务方法所需的元数据。

备注

前面的微服务章节中介绍的 @MessagePattern() 装饰器(了解更多) 不适用于基于 gRPC 的微服务。 @GrpcMethod() 装饰器有效地取代了基于 gRPC 的微服务。

heroes.controller.ts
@Controller()
export class HeroesController {
@GrpcMethod('HeroesService', 'FindOne')
findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
const items = [
{ id: 1, name: 'John' },
{ id: 2, name: 'Doe' },
];
return items.find(({ id }) => id === data.id);
}
}
提示

@GrpcMethod() 装饰器是从 @nestjs/microservices 包中导入的, MetadataServerUnaryCall 是从 grpc 包中导入的。

上述装饰器接受两个参数。第一个是服务名称(例如,'HeroesService'), 对应于 hero.proto 文件中的 HeroesService 服务定义。 第二个参数(字符串 'FindOne')对应于在 hero.proto 文件中的 HeroesService 中定义的 FindOne() rpc 方法。

findOne() 处理程序方法接受三个参数: 来自调用者的data、存储 gRPC 请求元数据的 metadata, 以及用于获取 GrpcCall 对象属性的 call。 (例如用于将元数据发送到客户端的 sendMetadata

这两个 @GrpcMethod() 装饰器的参数都是可选的。 如果没有第二个参数调用(例如 'FindOne'), Nest 将根据将处理程序名称转换为大驼峰形式来自动将 .proto 文件的 rpc 方法与处理程序关联起来(例如, findOne 处理程序将与 FindOne rpc 调用定义相关联)。 以下是示例:

heroes.controller.ts
@Controller()
export class HeroesController {
@GrpcMethod('HeroesService')
findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
const items = [
{ id: 1, name: 'John' },
{ id: 2, name: 'Doe' },
];
return items.find(({ id }) => id === data.id);
}
}

你还可以省略第一个 @GrpcMethod() 装饰器的参数。 在这种情况下,Nest 将根据处理程序定义的类名自动将处理程序与 proto 定义文件中的服务定义相关联。 例如,以下代码中,HeroesService 类将其处理程序方法与 hero.proto 文件中的 HeroesService 服务定义相关联:

heroes.controller.ts
@Controller()
export class HeroesService {
@GrpcMethod()
findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
const items = [
{ id: 1, name: 'John' },
{ id: 2, name: 'Doe' },
];
return items.find(({ id }) => id === data.id);
}
}

客户端

Nest 应用程序可以作为 gRPC 客户端,消耗在 .proto 文件中定义的服务。 您可以通过 ClientGrpc 对象访问远程服务。 可以通过几种方式获得 ClientGrpc 对象。

首选的技术是导入 ClientsModule。 使用 register() 方法将在 .proto 文件中定义的一组服务绑定到一个注入令牌, 并配置服务。name 属性是注入令牌。 对于 gRPC 服务,请使用 transport: Transport.GRPCoptions 属性是具有上述相同属性的对象。

imports: [
ClientsModule.register([
{
name: 'HERO_PACKAGE',
transport: Transport.GRPC,
options: {
package: 'hero',
protoPath: join(__dirname, 'hero/hero.proto'),
},
},
]),
];
备注

register() 方法接受一个对象数组。通过提供一个以逗号分隔的注册对象列表,可以注册多个包。

注册后,我们可以使用 @Inject() 注入配置的 ClientGrpc 对象。 然后,我们使用 ClientGrpc 对象的 getService() 方法检索服务实例, 如下所示。

@Injectable()
export class AppService implements OnModuleInit {
private heroesService: HeroesService;

constructor(@Inject('HERO_PACKAGE') private client: ClientGrpc) {}

onModuleInit() {
this.heroesService = this.client.getService<HeroesService>('HeroesService');
}

getHero(): Observable<string> {
return this.heroesService.findOne({ id: 1 });
}
}
注意

gRPC 客户端不会发送字段名称中包含下划线 _ 的字段, 除非在 proto 加载器配置(options.loader.keepcase 在微服务传输器配置中) 中设置 keepCase 选项为 true

请注意,与其他微服务传输方法使用的技术相比,这里有一个小差异。 我们不使用 ClientProxy 类, 而是使用提供了 getService() 方法的 ClientGrpc 类。 getService() 泛型方法接受服务名称作为参数,并返回其实例(如果可用)。

或者,您可以使用 @Client() 装饰器来实例化 ClientGrpc 对象,如下所示:

@Injectable()
export class AppService implements OnModuleInit {
@Client({
transport: Transport.GRPC,
options: {
package: 'hero',
protoPath: join(__dirname, 'hero/hero.proto'),
},
})
client: ClientGrpc;

private heroesService: HeroesService;

onModuleInit() {
this.heroesService = this.client.getService<HeroesService>('HeroesService');
}

getHero(): Observable<string> {
return this.heroesService.findOne({ id: 1 });
}
}

最后,对于更复杂的场景,我们可以使用 ClientProxyFactory 类注入一个动态配置的客户端, 详情请参阅这里

无论哪种方式,我们最终都获得了对我们的 HeroesService 代理对象的引用, 该对象公开了与 .proto 文件中定义的一组方法相同的方法。 现在,当我们访问此代理对象(即 heroesService)时, gRPC 系统将自动序列化请求,将其转发到远程系统,返回响应并反序列化响应。 由于 gRPC 为我们屏蔽了这些网络通信细节, heroesService 看起来和操作起来就像是一个本地提供者。

请注意,所有服务方法都是小驼峰式的(为了遵循语言的自然约定)。 因此,例如,虽然我们的 .proto 文件 HeroesService 定义包含了 FindOne() 函数, 但 heroesService 实例将提供 findOne() 方法。

interface HeroesService {
findOne(data: { id: number }): Observable<any>;
}

消息处理程序还可以返回 Observable,在这种情况下,结果值将在流完成之前发出。

heroes.controller.ts
@Get()
call(): Observable<any> {
return this.heroesService.findOne({ id: 1 });
}

要发送 gRPC 元数据(与请求一起),可以传递第二个参数,如下所示:

call(): Observable<any> {
const metadata = new Metadata();
metadata.add('Set-Cookie', 'yummy_cookie=choco');

return this.heroesService.findOne({ id: 1 }, metadata);
}
提示

Metadata 类是从 grpc 包中导入的。

请注意,这将要求更新我们前面定义的 HeroesService 接口。

示例

一个可运行的示例可在这里找到。

gRPC 流

gRPC 本身支持长期的实时连接,传统上称为流。 流对于聊天、观测或分块数据传输等情况非常有用。 在官方文档中可以找到更多详细信息。

Nest 支持两种方式的 GRPC 流处理:

  1. RxJS Subject + Observable 处理程序: 可以用于直接在控制器方法中编写响应,或传递给 Subject/Observable 消费者。
  2. 纯 GRPC 调用流处理程序: 可以用于传递给一些执行器,该执行器将处理 Node 标准的双工流(Duplex)处理程序的其余部分。

流式示例

让我们定义一个名为 HelloService 的新示例 gRPC 服务。 hello.proto 文件使用协议缓冲区进行结构化。以下是其内容:

// hello/hello.proto
syntax = "proto3";

package hello;

service HelloService {
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
}

message HelloRequest {
string greeting = 1;
}

message HelloResponse {
string reply = 1;
}
备注

LotsOfGreetings 方法可以简单地使用 @GrpcMethod 装饰器实现(如上面的示例),因为返回的流可以发射多个值。

基于此 .proto 文件,让我们定义 HelloService 接口:

interface HelloService {
bidiHello(upstream: Observable<HelloRequest>): Observable<HelloResponse>;
lotsOfGreetings(
upstream: Observable<HelloRequest>,
): Observable<HelloResponse>;
}

interface HelloRequest {
greeting: string;
}

interface HelloResponse {
reply: string;
}
备注

proto 接口可以通过 ts-proto 包自动生成,详情请参阅这里

Subject 策略

@GrpcStreamMethod() 装饰器将函数参数提供为 RxJS Observable。 因此,我们可以接收和处理多个消息。

@GrpcStreamMethod()
bidiHello(messages: Observable<any>, metadata: Metadata, call: ServerDuplexStream<any, any>): Observable<any> {
const subject = new Subject();

const onNext = message => {
console.log(message);
subject.next({
reply: 'Hello, world!'
});
};
const onComplete = () => subject.complete();
messages.subscribe({
next: onNext,
complete: onComplete,
});

return subject.asObservable();
}
注意

为了支持 @GrpcStreamMethod() 装饰器的全双工交互,控制器方法必须返回 RxJS Observable。

提示

MetadataServerUnaryCall 类/接口是从 grpc 包中导入的。

根据服务定义(在 .proto 文件中),BidiHello 方法应将请求流式传输到服务。 要从客户端向流发送多个异步消息,我们利用了 RxJS 的 ReplaySubject 类。

const helloService = this.client.getService<HelloService>('HelloService');
const helloRequest$ = new ReplaySubject<HelloRequest>();

helloRequest$.next({ greeting: 'Hello (1)!' });
helloRequest$.next({ greeting: 'Hello (2)!' });
helloRequest$.complete();

return helloService.bidiHello(helloRequest$);

在上面的示例中,我们向流写入了两条消息(next() 调用), 并通知服务我们已经完成了数据的发送(complete() 调用)。

调用流处理程序

当方法返回值定义为流时,@GrpcStreamCall() 装饰器将函数参数提供为 grpc.ServerDuplexStream, 该对象支持标准方法,如 .on('data', callback).write(message).cancel()。 有关可用方法的完整文档,请参阅这里

或者,当方法返回值不是stream时,@GrpcStreamCall() 装饰器提供两个函数参数, 分别是 grpc.ServerReadableStream更多信息请查看这里) 和callback

让我们从实现应该支持全双工交互的 BidiHello 开始。

@GrpcStreamCall()
bidiHello(requestStream: any) {
requestStream.on('data', message => {
console.log(message);
requestStream.write({
reply: 'Hello, world!'
});
});
}
备注

此装饰器不需要提供任何特定的返回参数。预期将处理流,类似于任何其他标准流类型。

在上面的示例中,我们使用 write() 方法向响应流写入对象。 传递给 .on() 方法的第二个参数的回调将在我们的服务接收到新数据块时每次调用。

让我们实现 LotsOfGreetings 方法。

@GrpcStreamCall()
lotsOfGreetings(requestStream: any, callback: (err: unknown, value: HelloResponse) => void) {
requestStream.on('data', message => {
console.log(message);
});
requestStream.on('end', () => callback(null, { reply: 'Hello, world!' }));
}

在这里,我们使用callback在处理 requestStream 完成后发送响应。

gRPC 元数据

元数据是关于特定 RPC 调用的信息,以键值对的形式呈现, 其中键是字符串,值通常是字符串,但也可以是二进制数据。 元数据对于 gRPC 本身来说是不透明的 - 它允许客户端提供与调用到服务器和反之亦然相关的信息。 元数据可能包括身份验证令牌、请求标识符、用于监控目的的标签以及数据信息, 例如数据集中的记录数。

要在 @GrpcMethod() 处理程序中读取元数据,请使用第二个参数(metadata), 其类型为 Metadata(从 grpc 包中导入)。

要从处理程序发送回元数据,请使用 ServerUnaryCall#sendMetadata() 方法(第三个处理程序参数)。

heroes.controller.ts
@Controller()
export class HeroesService {
@GrpcMethod()
findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
const serverMetadata = new Metadata();
const items = [
{ id: 1, name: 'John' },
{ id: 2, name: 'Doe' },
];

serverMetadata.add('Set-Cookie', 'yummy_cookie=choco');
call.sendMetadata(serverMetadata);

return items.find(({ id }) => id === data.id);
}
}

同样,要在使用 @GrpcStreamMethod() 装饰的处理程序(subject 策略) 中读取元数据,请使用第二个参数(metadata), 其类型为 Metadata(从 grpc 包中导入)。

要从处理程序发送回元数据,请使用 ServerDuplexStream#sendMetadata() 方法(第三个处理程序参数)。

要从调用流处理程序 中读取元数据(使用 @GrpcStreamCall() 装饰的处理程序), 请监听 requestStream 引用上的 metadata 事件,如下所示:

requestStream.on('metadata', (metadata: Metadata) => {
const meta = metadata.get('X-Meta');
});