Kafka
Kafka是一个开源的、分布式的流式处理平台,具有三个关键功能:
- 发布和订阅记录流,类似于消息队列或企业消息系统。
- 以容错耐用的方式存储记录流。
- 在记录发生时处理记录流。
Kafka项目旨在提供一个统一、高吞吐量、低延迟的平台,用于处理实时数据流。 它与Apache Storm和Spark等实时流数据分析工具集成非常好。
安装
要开始构建基于Kafka的微服务,首先安装所需的包:
npm i --save kafkajs
概述
与其他Nest微服务传输层实现一样,您可以使用options
对象的transport
属性选择Kafka传输机制,
同时还可以使用可选的options
属性,如下所示:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
}
}
});
Transport
枚举从@nestjs/microservices
包中导入。
选项
options
属性是特定于所选传输器的。Kafka传输器公开了下面描述的属性。
client
:客户端配置选项(在此处阅读更多)consumer
:消费者配置选项(在此处阅读更多)run
:运行配置选项(在此处阅读更多)subscribe
:订阅配置选项(在此处阅读更多)producer
:生产者配置选项(在此处阅读更多)send
:发送配置选项(在此处阅读更多)producerOnlyMode
:特性标志,跳过消费者组注册,仅充当生产者(boolean
)postfixId
:更改clientId值的后缀(string
)
客户端
在Kafka中与其他微服务传输器相比有一个小差异。我们使用ClientKafka
类而不是ClientProxy
类。
像其他微服务传输器一样,
您有几种方法可以创建ClientKafka
实例。
一种创建实例的方法是使用ClientsModule
。
要使用ClientsModule
创建客户端实例,导入它并使用register()
方法传递一个选项对象,
其中包含与createMicroservice()
方法中显示的相同属性,以及一个用作注入令牌的name
属性。
在此处阅读有关ClientsModule
的更多信息。
@Module({
imports: [
ClientsModule.register([
{
name: 'HERO_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer'
}
}
},
]),
]
...
})
也可以使用其他创建客户端的选项(ClientProxyFactory
或@Client()
)。
您可以在此处阅读有关它们的信息。
使用@Client()
装饰器如下:
@Client({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer'
}
}
})
client: ClientKafka;
消息模式
Kafka微服务消息模式利用两个主题来处理请求和回复通道。
ClientKafka#send()
方法通过将关联的关联ID、
回复主题和回复分区与请求消息相关联,
发送带有返回地址的消息。
这要求在发送消息之前,
必须将ClientKafka
实例订阅到回复主题并分配至少一个分区。
随后,您需要为每个运行的Nest应用程序至少有一个回复主题分区。 例如,如果您运行了4个Nest应用程序,但回复主题只有3个分区,那么在尝试发送消息时, 其中1个Nest应用程序将出错。
当新的ClientKafka
实例启动时,它们加入消费者组并订阅其各自的主题。
此过程触发了分配给消费者组消费者的主题分区的重新平衡。
通常,使用循环分区器来分配主题分区,该分区器将主题分区分配给按消费者名称排序的消费者集合, 这些名称在应用程序启动时随机设置。 然而,当新的消费者加入消费者组时,新的消费者可以被放置在消费者集合中的任何位置。 这会导致在重新平衡时,将现有消费者定位在新的消费者之后, 从而将分配给不同分区的现有消费者失去重新平衡之前发送的请求的响应消息。
为防止ClientKafka
消费者失去响应消息,使用了Nest特定的内置自定义分区器。
此自定义分区器将分区分配给按启动时设置的高分辨率时间戳(process.hrtime()
)排序的消费者集合。