Skip to main content

Kafka

Kafka是一个开源的、分布式的流式处理平台,具有三个关键功能:

  1. 发布和订阅记录流,类似于消息队列或企业消息系统。
  2. 以容错耐用的方式存储记录流。
  3. 在记录发生时处理记录流。

Kafka项目旨在提供一个统一、高吞吐量、低延迟的平台,用于处理实时数据流。 它与Apache Storm和Spark等实时流数据分析工具集成非常好。

安装

要开始构建基于Kafka的微服务,首先安装所需的包:

npm i --save kafkajs

概述

与其他Nest微服务传输层实现一样,您可以使用options对象的transport属性选择Kafka传输机制, 同时还可以使用可选的options属性,如下所示:

main.ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
}
}
});
tip

Transport枚举从@nestjs/microservices包中导入。

选项

options属性是特定于所选传输器的。Kafka传输器公开了下面描述的属性。

  • client:客户端配置选项(在此处阅读更多)
  • consumer:消费者配置选项(在此处阅读更多)
  • run:运行配置选项(在此处阅读更多)
  • subscribe:订阅配置选项(在此处阅读更多)
  • producer:生产者配置选项(在此处阅读更多)
  • send:发送配置选项(在此处阅读更多)
  • producerOnlyMode:特性标志,跳过消费者组注册,仅充当生产者(boolean
  • postfixId:更改clientId值的后缀(string

客户端

在Kafka中与其他微服务传输器相比有一个小差异。我们使用ClientKafka类而不是ClientProxy类。

像其他微服务传输器一样, 您有几种方法可以创建ClientKafka实例。

一种创建实例的方法是使用ClientsModule。 要使用ClientsModule创建客户端实例,导入它并使用register()方法传递一个选项对象, 其中包含与createMicroservice()方法中显示的相同属性,以及一个用作注入令牌的name属性。 在此处阅读有关ClientsModule的更多信息。

@Module({
imports: [
ClientsModule.register([
{
name: 'HERO_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer'
}
}
},
]),
]
...
})

也可以使用其他创建客户端的选项(ClientProxyFactory@Client())。 您可以在此处阅读有关它们的信息。

使用@Client()装饰器如下:

@Client({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer'
}
}
})
client: ClientKafka;

消息模式

Kafka微服务消息模式利用两个主题来处理请求和回复通道。 ClientKafka#send()方法通过将关联的关联ID、 回复主题和回复分区与请求消息相关联, 发送带有返回地址的消息。 这要求在发送消息之前, 必须将ClientKafka实例订阅到回复主题并分配至少一个分区。

随后,您需要为每个运行的Nest应用程序至少有一个回复主题分区。 例如,如果您运行了4个Nest应用程序,但回复主题只有3个分区,那么在尝试发送消息时, 其中1个Nest应用程序将出错。

当新的ClientKafka实例启动时,它们加入消费者组并订阅其各自的主题。 此过程触发了分配给消费者组消费者的主题分区的重新平衡。

通常,使用循环分区器来分配主题分区,该分区器将主题分区分配给按消费者名称排序的消费者集合, 这些名称在应用程序启动时随机设置。 然而,当新的消费者加入消费者组时,新的消费者可以被放置在消费者集合中的任何位置。 这会导致在重新平衡时,将现有消费者定位在新的消费者之后, 从而将分配给不同分区的现有消费者失去重新平衡之前发送的请求的响应消息。

为防止ClientKafka消费者失去响应消息,使用了Nest特定的内置自定义分区器。 此自定义分区器将分区分配给按启动时设置的高分辨率时间戳(process.hrtime())排序的消费者集合。

消息响应订阅

note

此部分仅在使用请求-响应 消息样式(使用@MessagePattern装饰器和ClientKafka#send方法)时相关。 对响应主题进行订阅对于基于事件 的通信(@EventPattern装饰器和ClientKafka#emit方法)不是必要的。

ClientKafka类提供了subscribeToResponseOf()方法。 subscribeToResponseOf()方法以请求的主题名称作为参数,将派生的回复主题名称添加到回复主题的集合中。 当实现消息模式时,需要使用此方法。

heroes.controller.ts
onModuleInit() {
this.client.subscribeToResponseOf('hero.kill.dragon');
}

如果ClientKafka实例是异步创建的, 则必须在调用connect()方法之前调用subscribeToResponseOf()方法。

heroes.controller.ts
async onModuleInit() {
this.client.subscribeToResponseOf('hero.kill.dragon');
await this.client.connect();
}

传入消息

Nest将Kafka传入消息解析为具有keyvalueheaders属性的对象, 这些属性的值为Buffer类型。然后,Nest通过将缓冲区转换为字符串来解析这些值。 如果字符串是"类似对象",Nest将尝试将字符串解析为JSON。然后将value传递给其关联的处理程序。

传出消息

在发布事件或发送消息时,Nest会在序列化过程之后发送Kafka传出消息。 这发生在传递给ClientKafka#emit()send()方法的参数上, 或者在从@MessagePattern方法返回的值上。 这个序列化过程通过使用JSON.stringify()toString() 原型方法对非字符串或缓冲区的对象进行字符串化来实现。

heroes.controller.ts
@Controller()
export class HeroesController {
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage): any {
const dragonId = message.dragonId;
const items = [
{ id: 1, name: 'Mythical Sword' },
{ id: 2, name: 'Key to Dungeon' },
];
return items;
}
}
tip

@Payload()@nestjs/microservices包中导入。

传出消息还可以通过传递带有keyvalue属性的对象来进行键控。 为了满足协同分区的要求, 对消息进行键控是重要的。

heroes.controller.ts
@Controller()
export class HeroesController {
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage): any {
const realm = 'Nest';
const heroId = message.heroId;
const dragonId = message.dragonId;

const items = [
{ id: 1, name: 'Mythical Sword' },
{ id: 2, name: 'Key to Dungeon' },
];

return {
headers: {
realm
},
key: heroId,
value: items
}
}
}

此格式传递的消息还可以包含在headers哈希属性中设置的自定义标头。 headers哈希属性的值必须是stringBuffer类型。

heroes.controller.ts
@Controller()
export class HeroesController {
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage): any {
const realm = 'Nest';
const heroId = message.heroId;
const dragonId = message.dragonId;

const items = [
{ id: 1, name: 'Mythical Sword' },
{ id: 2, name: 'Key to Dungeon' },
];

return {
headers: {
kafka_nestRealm: realm
},
key: heroId,
value: items
}
}
}

基于事件

虽然请求-响应方法在服务之间交换消息时是理想的, 但在消息样式是基于事件的情况下(这在Kafka中是理想的), 即使不等待响应,也可以发布事件时它并不适用于请求-响应的开销。 在这种情况下,您不希望为维护两个主题所需的请求-响应引入开销。

请查看这两个部分,了解更多信息: 概述:基于事件概述:发布事件

上下文

在更复杂的场景中,您可能希望访问有关传入请求的更多信息。 当使用Kafka传输器时,可以访问KafkaContext对象。

// heroes.controller.ts
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
console.log(`Topic: ${context.getTopic()}`);
}
tip

@Payload()@Ctx()KafkaContext@nestjs/microservices包中导入。

要访问原始的Kafka传入消息对象,请使用KafkaContext对象的getMessage()方法,如下所示:

heroes.controller.ts
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
const originalMessage = context.getMessage();
const partition = context.getPartition();
const { headers, timestamp } = originalMessage;
}

其中IncomingMessage接口具有以下特性:

interface IncomingMessage {
topic: string;
partition: number;
timestamp: string;
size: number;
attributes: number;
offset: string;
key: any;
value: any;
headers: Record<string, any>;
}

如果处理每个接收到的消息的时间较长,您应该考虑使用心跳回调。 要检索心跳函数,请使用KafkaContextgetHeartbeat()方法, 如下所示:

heroes.controller.ts
@MessagePattern('hero.kill.dragon')
async killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
const heartbeat = context.getHeartbeat();

// 执行一些慢处理
await doWorkPart1();

// 发送心跳以不超过会话超时
await heartbeat();

// 再次执行一些慢处理
await doWorkPart2();
}

命名约定

Kafka微服务组件在client.clientIdconsumer.groupId选项上附加各自角色的描述, 以防止Nest微服务客户端和服务器组件之间的冲突。 默认情况下,ClientKafka组件在这两个选项上附加-client, 而ServerKafka组件在这两个选项上附加-server。 请注意下面提供的值如何以这种方式进行转换(如注释中所示)。

main.ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero', // hero-server
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer' // hero-consumer-server
},
}
});

对于客户端:

heroes.controller.ts
@Client({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero', // hero-client
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer' // hero-consumer-client
}
}
})
client: ClientKafka;
note

可以通过在自定义提供程序中扩展ClientKafkaKafkaServer并重写构造函数来自定义Kafka客户端和消费者的命名约定。

由于Kafka微服务消息模式利用两个主题来处理请求和回复通道,应从请求主题派生出回复模式。 默认情况下,回复主题的名称是请求主题名称与.reply附加在一起的组合。

heroes.controller.ts
onModuleInit() {
this.client.subscribeToResponseOf('hero.get'); // hero.get.reply
}
note

可以通过扩展ClientKafka并重写getResponsePatternName方法来自定义Kafka回复主题的命名约定。

可重试的异常

与其他传输器类似,所有未处理的异常都会自动包装成RpcException并转换为“用户友好”的格式。 但是,在某些情况下,您可能希望绕过此机制,让异常被kafkajs驱动程序消耗。 在处理消息时引发异常会指示kafkajs重试它(重新传递它), 这意味着即使触发了消息(或事件)处理程序,偏移量也不会提交到Kafka。

warning

对于事件处理程序(基于事件的通信),默认情况下,所有未处理的异常都被视为可重试异常

为此,可以使用一个专用的类,称为KafkaRetriableException,如下所示:

throw new KafkaRetriableException('...');
tip

KafkaRetriableException类从@nestjs/microservices包中导入。

提交偏移量

在使用Kafka时,提交偏移量是至关重要的。默认情况下,消息将在特定时间后自动提交。 有关更多信息,请参阅KafkaJS文档ClientKafka提供了手动提交偏移量的方法,其工作方式类似于 原生的KafkaJS实现

@EventPattern('user.created')
async handleUserCreated(@Payload() data: IncomingMessage, @Ctx() context: KafkaContext) {
// 业务逻辑

const { offset } = context.getMessage();
const partition = context.getPartition();
const topic = context.getTopic();
await this.client.commitOffsets([{ topic, partition, offset }]);
}

要禁用消息的自动提交,请在run配置中设置autoCommit: false

main.ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
run: {
autoCommit: false
}
}
});