03.kafka原理
# 01.kafka原理
# 1、kafka特性
# 1、架构图
- broker:Kafka服务器,负责消息存储和转发
- topic:消息类别,Kafka按照topic来分类消息
- partition:topic的分区,一个topic可以包含多个partition,topic消息保存在各个partition上
- offset:消息在日志中的位置,可以理解是消息在partition上的偏移量,也是代表该消息的唯一序号
- Producer:消息生产者
- Consumer:消息消费者
- Consumer Group:消费者分组,每个Consumer必须属于一个group
- Zookeeper:
- 保存着集群broker、topic、partition等meta数据
- 还负责broker故障发现,partition leader选举,负载均衡等功能
# 2、生产消费流程
1)生产消息
生产者创建消息,该消息包含一个key和一个value
生产者将消息发送到Topic
- 如果指定了Partition,消息将被发送到指定的Partition
- 如果没有指定,Kafka会根据key的Hash值或者round-robin策略选择一个Partition
消息到达Broker后,首先会被
写入到Leader副本
的本地logFollower副本从Leader副本中拉取消息,写入自己的本地log
当所有的ISR副本都成功写入消息后,消息可以被消费
2)消费消息
消费者订阅一个或多个Topic,
从ZooKeeper
获取每个Topic的Leader副本信息
消费者向Leader副本发送fetch请求,拉取消息
Leader副本返回已提交的消息给消费者,消费者处理消息
在处理完消息后,消费者会更新自己的offset,表示已经成功消费了哪些消息
3)ZooKeeper作用
在整个过程中,ZooKeeper主要负责管理和协调Broker和消费者
当Broker或者消费者的状态发生变化时(例如Broker宕机,新的消费者加入等)
ZooKeeper会通知相关的Broker和消费者,使它们能够及时地响应这些变化
同时,ZooKeeper还负责存储每个消费者消费的offset
以便在消费者重启时,能够从上次消费的位置继续消费
# 3、路由机制
消费者启动后会发送请求给任意 Broker,Broker从zookeeper获取集群元数据返回给消费者
这样生产者和消费者就可以控制消息被推送到哪些partition
实现的方式可以是随机分配、实现一类随机负载均衡算法,或者指定一些分区算法
kafka Producer 可以将消息在内存中累计到一定数量后作为一个batch发送请求
Batch的数量大小可以通过Producer的参数控制(
如500条、100ms、64KB
)
# 2、kafka存储设计
# 1、Topic(主题)
- 在Kafka中,
主题(Topic)
会被划分为一个或多个分区(Partition)
每个分区在物理上对应一个文件夹
,这个文件夹中包含了多个数据文件(.log文件)和索引文件(.index文件)
.log文件
是存储实际消息数据
的文件,每个.log文件的大小是可以配置的,默认情况下为1GB- 当.log文件达到最大大小时,Kafka会创建一个新的.log文件来存储后续的消息,这样就形成了一个按照时间顺序排列的.log文件序列
- 每个
.log文件的文件名
是由该文件中第一条消息的偏移量(offset)组成的
.index文件
是对应.log文件的索引文件
,用于快速定位消息在.log文件中的位置
- .index文件的文件名和对应的.log文件相同,只是扩展名不同
- 除了.log和.index文件,每个分区的文件夹中还可能包含一个.timeindex文件和一个.txnindex文件
- .timeindex文件是用于根据时间戳来查找消息的索引文件,.txnindex文件则是用于事务索引的
- Kafka的每个分区都由多个.log文件和相应的索引文件组成,这些文件共同提供了消息的存储和索引功能
# 2、partition(分区)
- partition以文件形式存储在文件系统,目录命名规则:
<topic_name>-<partition_id>
- 例如,名为test的topic,其有3个partition,则Kafka数据目录中有3个目录
- test-0, test-1, test-2,分别存储相应partition的数据
.log
文件是存储实际的消息数据
.index
文件是存储对应的偏移量和物理位置
,用于快速定位消息在.log文件中的位置
.timeindex
文件则是存储消息的时间戳和对应的偏移量
,用于按照时间戳查找消息
/test-0/
00000000000000000000.index # .index`文件是存储`对应的偏移量和物理位置
00000000000000000000.log # .log文件是存储实际的消息数据
00000000000000000000.timeindex
/test-1/
00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex
/test-2/
00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex
2
3
4
5
6
7
8
9
10
11
12
# 3、Segment(分割文件)
- 每个Partition的数据并不是存储在一个大的文件中,而是
被分割成多个较小的文件
- 每个
这样的文件就被称为一个Segment
Kafka中,数据被存储在
Topic的Partition中
每个Partition对应一个或
多个数据文件
(这就是Segment)每个Segment数据文件中包含了
多条消息(Message)
(每条消息包含三个属性)1、Offset
- 这是每条消息在Partition中的
唯一标识
,可以看作是消息的序号
- 它是
逻辑上的一个值
,表示这条消息是Partition中的第几条消息
- Offset是连续的,
从0开始
,每写入一条消息,Offset就会增加1 - 消费者在读取消息时,也是根据Offset来读取的,所以Offset的作用就像是消息的ID一样
- 这是每条消息在Partition中的
2、MessageSize:
- 表示消息内容data的大小,单位是字节
3、Data:
- 这是消息的具体内容,也就是要发送的数据
# Partition的数据文件中,每个 segment 存储数据内容就像下面这样
Offset: 0, MessageSize: 5, Data: "Hello"
Offset: 1, MessageSize: 3, Data: "Tom"
Offset: 2, MessageSize: 2, Data: "NB"
2
3
4
# 4、kafka索引原理
Kafka通过将部分消息的偏移量和对应的物理位置存储在索引文件中
实现了快速定位数据的能力,从而提高了数据读取的性能
Kafka数据存储在磁盘文件中,
每个文件都对应一个索引文件
(用于快速定位数据)Kafka数据文件
(.log)是顺序写入
的,每条消息
位置都由一个偏移量(offset)标识
索引文件
(.index)
是存储了部分消息的偏移量
和对应在数据文件中的物理位置Kafka的索引文件是
每隔一段偏移量
(比如每隔4096个字节)才中添加一条记录
查询时,Kafka先在
索引文件
中通过二分查找定位到对应位置
,然后在数据文件中顺序读取数据
- 磁盘顺序读取的性能远高于随机读取,所以Kafka在处理大量数据时能保持高性能
eg: 从下面例子更深入理解索引原理
- 假设我们有一个Kafka Topic,它有一个分区
- 这个分区有三个Segment,每个Segment包含了1000条消息
- 假设每隔100条消息存储一个索引
- 那么第
三个Segment的.index文件可能看起来像这样
# Offset是消息的偏移量,Position是消息在.log文件中的物理位置
# 注意这里的Position是假设的,实际的Position取决于每条消息的大小
Offset: 2000, Position: 0
Offset: 2100, Position: 10000 # 第2100条消息在第三个Segment的.log文件中的第 10000 个字节的位置
Offset: 2200, Position: 20000
Offset: 2300, Position: 30000
Offset: 2400, Position: 40000
Offset: 2500, Position: 50000
Offset: 2600, Position: 60000
Offset: 2700, Position: 70000
Offset: 2800, Position: 80000
Offset: 2900, Position: 90000
2
3
4
5
6
7
8
9
10
11
12
现在,我们想要查找offset为2500的消息
- 第三个Segment的起始offset为2000,结束offset为2999,所以offset为2500的消息应该在第三个Segment
- 打开第三个Segment对应的.index文件,然后使用二分查找法在.index文件中查找offset为2500的条目
- 假设我们的.index文件每隔100条消息存储一个索引,那么Kafka可能会找到offset为2400的索引
- 然后,Kafka会根据这个索引条目中存储的物理位置,定位到.log文件中offset为2400的消息
- 由于.log文件中的消息是按照offset连续存储的,所以Kafka只需要顺序读取100条消息,就可以找到offset为2500的消息
# 3、生产者&消费者
# 1、生产者设计
- 1)负载均衡
- 由于消息topic由多个partition组成,且partition会均衡分布到不同broker上
- producer可以通过随机或者hash等方式,将消息平均发送到多个partition上,以实现负载均衡
- 2)批量发送
- Producer端可以合并多条消息,一次发送批量消息给broker(减少broker存储消息的IO操作次数)
- 但也一定程度上影响了消息的实时性,相当于以时延代价,换取更好的吞吐量
# 2、消费者设计
- 任何Consumer必须属于一个Consumer Group
- 同一Consumer Group中的多个Consumer实例,不同时消费同一个partition,等效于队列模式
Consumer Group 1
的三个Consumer实例分别消费不同的partition
的消息
- 不同Consumer Group的Consumer实例可以同时消费同一个partition,等效于发布订阅模式
- Consumer1和Consumer4,同时消费TopicA-part0的消息(不同Group)
- partition内消息是有序的,Kafka不删除已消费的消息
# 3、队列模式
队列模式,指Kafka保证同一Consumer Group中只有一个Consumer会消费某条消息
1)保证不重复消费
一个partition的数据只会被一个消费者消费
但是一个消费者可以消费多个 partition
对消息分配以partition为单位,而非以条消息作为分配单元
2)Consumer比 partition 多是没有意义的·
- 如果Consumer的数量与partition数量相同,则正好一个Consumer消费一个partition的数据
- 而如果Consumer的数量多于partition的数量时,会有部分Consumer无法消费该Topic下任何一条消息
设计的优势是:
- 每个Consumer不用都跟大量的broker通信,减少通信开销
- 同时也降低了分配难度,实现也更简单
- 可以保证
每个partition里的数据
可以被Consumer有序消费
设计的劣势是:
- 无法保证同一个Consumer Group里的Consumer均匀消费数据
- 且在Consumer实例多于partition个数时导致有些Consumer会饿死
Consumer Rebalance(重平衡)
指发生在消费者的数量变化
(增减消费者)时,进行partition重新分配
假设Topic它有4个Partition(P0,P1,P2,P3),有一个消费者组,开始只有一个消费者C1
初始状态:所有的Partition都分配给C1,所以C1消费P0,P1,P2和P3
现在假设有一个新的消费者C2加入到消费者组中,此时Kafka会触发一次Rebalance
在Rebalance过程中,首先,Kafka会停止所有的消费者的数据读取,然后重新计算Partition到消费者的分配
在这个例子中,Rebalance的结果可能是:C1消费P0和P1,C2消费P2和P3
最后,消费者会根据新的分配结果,开始消费新分配到的Partition
注:
- Rebalance过程中会停止数据的读取,这可能会导致消费的延迟
- 因此,虽然Rebalance可以保证Partition的均匀分配,但是频繁的Rebalance可能会对性能产生影响
# 4、找Partition Leader
生产者和消费者如何确定 Partition Leader 在那个 Broker
Kafka中,生产和消费消息时,并不是随便选择一个Broker发送请求
而是通过一种叫做元数据请求的方式来找到自己需要的Partition Leader
下面以消费者为例说明(生产者也是这个流程)
1)Producer发送元数据请求
- 消费者启动后,会向Kafka集群发送元数据请求(Metadata Request)给任意Broker
2)Broker 返回元数据响应
- Broker收到元数据请求后,会返回元数据响应(Metadata Response)
- 响应中包含了集群中所有Topic和Partition的信息,包括每个Partition的Leader是哪个Broker
3)Producer找到Partition Leader
- 消费者收到元数据响应后,就知道了每个Partition的Leader在哪个Broker
- 然后就可以直接向这个Broker发送数据拉取请求(Fetch Request)
4)Partition Leaer改变
- 如果在消费过程中,某个Partition的Leader发生了变化
- 消费者在发送Fetch Request时会收到一个Leader Not Available的错误
- 此时消费者会再次发送Metadata Request来更新元数据
# 4、Replication 副本
# 1、副本概述
Replication解释
在Kafka中,每个Topic的数据被分割成多个Partition
每个
Partition都可以在多个Broker上进行复制
,这就是Kafka的Replication(副本)设计
副本分为两种:Leader副本和Follower副本
Leader副本
负责处理所有的读写请求
,而Follower副本
则从Leader副本那里复制数据
- 如果Leader副本发生故障,就会从Follower副本中选举出新的Leader
但是一般消费者读写都只会使用 Leader 副本,而Followers仅提供容错性和可靠性
- 一致性:副本中可能没有同步到 Leader 的最新数据
- 负载均衡:一个topic被分成多个partition,每个partition分布在不同broker中可以负载均衡
Replication举例说明
当一个Producer向一个Topic的Partition写入消息时
这个消息首先会被发送到该Partition的Leader副本所在的Broker
Leader副本接收到消息后,将消息写入到本地的日志
Follower副本会从Leader副本那里拉取数据,将消息复制到自己的日志
当所有的Follower副本都成功复制了消息,这个消息就被认为是"已提交"(committed)的
只有已提交的消息才能被Consumer消费
如果Leader副本发生故障,Kafka会从ISRs(副本)中选举出新的Leader
In-Sync-Replica(简称ISR)
在Kafka中,ISR是指那些与Leader副本保持同步的Follower副本
ISR是Kafka保证数据可靠性的一个重要机制
只有当消息被所有ISR中的副本都写入后,这条消息才被认为是"已提交"的(才可以被消费)
例:
Topic一个Partition有3个副本,R1是Leader副本,R2和R3是Follower副本
在开始时,R1,R2和R3都是同步的,所以ISR={R1, R2, R3}。
假设R3出现了问题,不能及时从R1那里拉取数据
此时,R3就会被从ISR中移除,ISR变为{R1, R2}
当有新消息发送时,{R1, R2}写入就算是"已提交”
注:Kafka的Zookeeper维护了每个partition的ISR信息
# 2、副本复制
1)保证消息写入副本
消息所在partition的ISR replicas会定时异步从leader上批量复制数据log
当所有ISR replica都返回ack,告诉leader该消息已经写log成功后
leader认为该消息committed,并告诉Producer生产成功
- 因为leader有超时机制,leader等ISR的follower复制数据,如果一定时间不返回ack
- 可能数据复制进度落后太多,则leader将该follower replica从ISR中剔除
消息committed之后,Consumer才能消费到
2)副本数据同步原理(不是绝对可靠的不丢失数据)
ISR机制下的数据复制,既不是完全的同步复制,也不是单纯的异步复制,这是Kafka高吞吐很重要的机制
同步复制要求所有能工作的follower都复制完,这条消息才会被认为committed,这种复制方式极大的影响了吞吐量
而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经committed
这种情况下如果follower都复制完都落后于leader,而如果leader突然宕机,则会丢失数据
而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐量
follower可以批量的从leader复制数据,数据复制到内存即返回ack
这样极大的提高复制性能,当然数据仍然是有丢失风险的
Kafka本身定位于高性能的MQ,更多注重消息吞吐量
在此基础上结合ISR的机制去尽量保证消息的可靠性,但不是绝对可靠的
# 3、数据可靠性
1)副本复制(Replication)
- Kafka的每个Topic可以被分为多个分区,每个分区可以有多个副本
- 副本中的Leader负责处理所有客户端请求(包括读和写),Follower副本则从Leader副本复制数据
- 如果Leader副本出现问题,就会从Follower副本中选举出新的Leader,以此来保证数据的可用性
2)In-Sync Replicas (ISR)
- ISR是一组和leader副本保持同步的follower副本
- 只有当消息被所有ISR中的副本都写入后,这条消息才可以被消费
- 通过ISR,Kafka可以在副本出现问题时,仍然保证数据的可靠性
3)Acknowledgements(消息确认)
- 生产者在写入消息时,可以等待Broker的确认
- 生产者可以设置等待所有ISR中的副本都确认后再返回,这样可以进一步保证数据的可靠性
4)日志持久化
- Kafka将所有的消息都保存在日志中,并且可以通过配置将日志持久化到磁盘
- 即使在系统故障的情况下,只要磁盘数据没有损坏,消息就不会丢失
5)消息压缩
- Kafka支持消息的压缩,可以有效地减少网络传输和磁盘存储的开销,提高系统的整体性能
# 5、高可用原理
# 1、partition高可用
leader选过程
Kafka的leader选举为了在leader副本发生故障时,选出一个新的leader副本
当某个broker副本被认为无法与zookeeper保持心跳连接时,zookeeper会将该副本从ISR列表中移除
leader选
- 如果被移除的副本是当前的leader副本,那么zookeeper会触发一次leader选举过程
- 在leader选举过程中,zookeeper会从剩余的ISR列表中选择一个新的leader
- 选择的策略默认是选择副本ID最小的那个
- 也可以通过配置kafka的参数(unclean.leader.election.enable)来改变选举策略
选举出新的leader后,zookeeper会更新元数据信息,并通知所有的broker和consumer新的leader信息
之后,所有的读写请求都会被转发到新的leader副本
如果所有replica都宕机了,有两种方式恢复服务
- 1)等ISR任一节点恢复,并选举为leader(不容易丢失消息)
- 2)选择第一个恢复的节点(不一定是ISR中的节点)为leader(容易丢失消息)
- 建议非ISR列表中的副本不要参与 leader选举
- 如果允许非ISR列表中的副本成为leader(即unclean.leader.election.enable=true),可能会导致数据丢失
- 因为非ISR列表中的副本可能没有最新的数据
- 在生产环境中,一般推荐将unclean.leader.election.enable设置为false,以保证数据的可靠性
# 2、broker高可用
broker集群信息由Zookeeper维护,并选
举出一个controller
所有partition的leader选举都由controller决定
controller也负责增删topic以及partition replica的重新分配
controller在Zookeeper上注册watch,一旦有broker宕机,其对应在Zookeeper的临时节点自动被删除
controller对宕机broker上的所有partition重新分配新leader
如果controller宕机,其他broker通过Zookeeper选举出新的controller
然后同样对宕机broker上的所有partition重新分配新leader
# 6、kafka 可靠性
# 1、消息投递可靠性
消息确认
指 producer要求leader partition 收到确认的副本个数
1)acks 配置
如果acks设置数量为0
- 表示producer不会等待broker的响应,acks值为0会得到最大的系统吞吐量
- 但是producer无法知道消息是否发送成功,这样有可能会导致数据丢失
若acks设置为(
request.required.acks = 1
)- 表示producer会在leader partition收到消息时得到broker的一个确认
若设置为-1
- producer会在所有备份的partition收到消息时得到broker的确认
- 这个设置可以得到最高的可靠性保证
- producer会在所有备份的partition收到消息时得到broker的确认
2)事务性消息
- 从Kafka 0.11版本开始,Kafka引入了事务性消息的特性,可以实现Exactly-Once语义
- 生产者可以通过事务机制来发送事务性消息
- 消费者可以通过读取已提交的位移和使用幂等性来实现Exactly-Once语义
# 2、重复消费
1)唯一标识符(Message Key)
- 在生产者发送消息时,可以为每条消息设置一个唯一标识符,通常是一个字符串或者数字
- Kafka保证相同Key的消息会被写入到同一个Partition中
- 这样消费者在消费消息时,可以根据Key来判断是否已经处理过该消息,避免重复消费
2)消费者位移(Consumer Offset)
- Kafka会为每个消费者维护一个位移(Offset),表示消费者已经消费的消息位置
- 消费者可以定期将位移提交给Kafka,Kafka会将位移存储在内部的位移管理器中
- 当消费者重启或者发生故障时,可以根据位移的信息来恢复消费进度
3)消费者组(Consumer Group)
- 多个消费者可以组成一个消费者组,共同消费一个或多个Topic的消息
- Kafka会确保同一个消费者组内的消费者不会重复消费同一条消息
- 每个消息只会被消费者组中的一个消费者消费,这样可以实现负载均衡和水平扩展
4)幂等性和Exactly-Once语义
- Kafka从0.11版本开始引入了幂等性和Exactly-Once语义的特性
- 生产者可以通过设置幂等性参数,确保相同Key的消息只会被写入一次
- 即使发生重试或者重发也不会导致消息的重复
- At most once:消息可能会丢失,但不会重复传输
- At least once:消息不会丢失,但可能重复传输
- Exactly once:消息保证会被传输一次且仅传输一次
出现重复消费场景举例
1)消费者提交位移失败
- 当消费者在消费消息后,未能成功提交位移给Kafka时
2)消费者组重平衡
- 当消费者组发生重平衡时,Kafka会重新分配分区给消费者
- 在重平衡期间,消费者可能会同时消费属于不同分区的消息,从而导致重复消费
3)消费者异常重启
- 当消费者发生异常重启时,可能会导致消费者重新从上次提交的位移处开始消费
4)消费者处理逻辑错误
- 如果消费者在处理消息的过程中发生错误,比如在处理完消息后未能正确提交位移
- 或者消息处理过程中发生异常导致消费者重启,都可能导致消息的重复消费
5)生产者消息重试
- 当生产者发送消息时发生错误,生产者可能会进行消息的重试
- 如果在重试过程中,消息已经被成功写入Kafka
- 但由于生产者未能正确处理重试的逻辑,可能会导致消息的重复写入和消费
# 3、Exactly-Once
- Exactly once:消息保证会被传输一次且仅传输一次
- Exactly-Once语义在Kafka中是通过两个主要机制实现的
- Producer端的幂等性(重复发送消息结果不变)
- 事务性消息处理(实质实现了原子性,要么全部成功要么全部失败)
1)Producer端的幂等性
- Kafka为每个Producer分配了一个唯一的ID,每个Producer发送的消息都会携带这个ID和一个序列号
- Kafka的broker会跟踪这个ID和序列号,以防止消息的重复发送
- 例如,当Producer因网络问题而未收到ack确认而决定重发消息,broker会检查该消息的Producer ID和序列号
- 如果发现已经处理过,则不会再次处理,从而避免了消息的重复
2)事务性消息处理
- Kafka引入了事务的概念,允许Producer将一系列的消息写入和消费者偏移量的提交包含在一个事务中
- 要么这个事务中的所有操作都成功,要么都失败
- 实现Exactly-Once的具体流程如下
Producer初始化一个事务,发送"BeginTransaction"请求给Kafka broker
Producer发送一系列消息到Kafka broker
Producer发送"EndTransaction"请求,标记事务结束
这个请求将消息和消费者偏移量的提交包含在同一个事务中
Kafka broker在收到"EndTransaction"请求后,将事务中的所有消息写入到日志,并更新消费者的偏移量
Consumer在消费消息时,只消费已提交的事务中的消息
,通过读取已提交的位移来确定消费的起始位置
这样,Kafka就实现了Exactly-Once语义
- 即使在网络问题或者Consumer/Producer崩溃的情况下,每条消息也只会被处理一次
- 需要注意的是,这种机制需要Producer和Consumer都支持事务,并且消息处理逻辑需要是幂等的
# 4、事务原理
- Kafka的事务支持确保了Producer发送到多个分区的消息以及Consumer消费的位移的原子性
1)初始化事务
- Producer调用
beginTransaction()
方法开始一个新的事务 - 此时,Kafka会为这个Producer生成一个全局唯一的事务ID
- Producer调用
2)发送消息
- 在事务中,Producer可以发送任意多条消息到一个或多个Topic的分区
- 这些消息在初始时被标记为"未决"状态,意味着它们对Consumer来说是不可见的
3)位移提交
- 在消费消息的过程中,Consumer可以将消费的位移提交到事务中
- 这样可以
确保位移的提交和消息的发送在同一个事务中,要么都成功,要么都失败
4)提交或中止事务
- Producer可以选择提交事务或者中止事务
- 如果事务被提交,那么事务中的所有消息会变为"已提交"状态,对Consumer可见(消费的位移也会被提交)
- 如果事务被中止,那么事务中的所有消息会被丢弃,消费的位移也不会被提交
5)消费事务消息
- 对于消费者来说,只有已提交的事务中的消息才是可见的
- 消费者会跳过未决的消息,只消费已提交的消息
- 同时,消费者会读取已提交的位移,从而确保消费的连续性和一致性
注:
- Kafka的事务需要Kafka版本0.11及以上,且需要Producer和Consumer都支持事务
- 同时,为了保证Exactly-Once语义,应用程序需要保证处理消息的逻辑是幂等的
# 7、ZooKeeper
ZooKeeper 是一个分布式、开放源码的分布式应用程序协调服务,是 Google 的 Chubby 开源实现
分布式应用程序可以基于它实现统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等工作
在基于 Kafka 的分布式消息队列中,ZooKeeper 的作用有
- Broker 注册、Topic 注册
- Producer 和 Consumer 负载均衡
- 维护 Partition 与 Consumer 的关系
- 记录消息消费的进度以及 Consumer 注册
# 1、Broker注册到 ZK
ZooKeeper 是一个共享配置中心,我们可以将一些信息存放入其中,比如 Broker 信息,本质上就是存放一个文件目录
这个配置中心是共享的,分布式系统的各个节点都可以从配置中心访问到相关信息
同时,ZooKeeper 还具有 Watch 机制,一旦注册信息发生变化,比如某个注册的 Broker 下线
ZooKeeper 就会删除与之相关的注册信息,其它节点可以通过 Watch 机制监听到这一变化,进而做出响应
Broker 注册,也就是 Kafka 节点注册,本质上就是在 ZooKeeper 中创建一个专属的目录(又称为节点),其路径为
/brokers
在 Broker 启动的同时,需要到配置中心(ZooKeeper)注册,而
broker.id
作为唯一标识根据它便可在 ZooKeeper 中创建专属节点(目录),其路径为
/brokers/ids/{broker.id}
在专属节点创建好后,Kafka 会将该 Broker 相关的信息存入其中,包括
broker.name
、端口号Broker 在 ZooKeeper 中注册的节点是“临时节点”,一旦 Broker 故障下线,ZooKeeper 就会将该节点删除
同时,可以基于 Watch 机制监听到这一删除事件,进而做出响应(如负载均衡)
# 2、Topic注册到 ZK
在 Kafka 中,所有 Topic 与 Broker 的对应关系都由 ZooKeeper 来维护
在 ZooKeeper 中,通过建立专属的节点来存储这些信息,其路径为
/brokers/topics/{topic_name}
下图中 TopicA 被分为两个Partition(Partition1 和 Partition2)
- Partition1有一个Leader 副本在 broker2 中
- 其余两个副本分别在:broker1 和 broker4
当 Producer Push 的消息写入 Partition(分区)时,作为 Leader 的 Broker(Kafka 节点)会将消息写入自己的分区
同时还会将此消息复制到各个 Follower,实现同步
如果某个 Follower 挂掉,Leader 会再找一个替代并同步消息
如果 Leader 挂了,Follower 们会选举出一个新的 Leader 替代,继续业务,这些都是由 ZooKeeper 完成的
# 3、Consumer Group注册
与 Broker、Topic 注册类似,Consumer Group 注册本质上也是在 ZooKeeper 中创建专属的节点,以记录相关信息
其路径为
/consumers/{group_id}
的一个目录既然是目录,在记录信息时,就可以根据信息的不同,进一步创建子目录(子节点),分别记录不同类别的信息
对于 Consumer Group 而言,有三类信息需要记录,因此,
/consumers/{group_id}
下还有三个子目录ids
:Consumer Group 中有多个 Consumer,ids 用于记录这些 Consumerowners
:记录该 Consumer Group 可消费的 Topic 信息offsets
:记录 owners 中每个 Topic 的所有 Partition 的 Offset
- Consumer 注册
- 原理同 Consumer Group 注册,不过需要注意的是,其节点路径比较特殊
- 需在路径
/ consumers/{group_id}/ids
下创建专属子节点,它是临时的 - 比如,某 Consumer 的临时节点路径为
/ consumers/{group_id}/ids/my_consumer_for_test-1223234-fdfv1233df23
。
- 负载均衡
- 我们知道,对于一条消息,订阅了它的 Consumer Group 中只能有一个 Consumer 消费它
- 那么就存在一个问题:一个 Consumer Group 中有多个 Consumer,如何让它们尽可能均匀地消费订阅的消息呢(也就是负载均衡)
- 这里不讨论实现细节,但要实现负载均衡,实时获取 Consumer 的数量显然是必要的
- 通过 Watch 机制监听
/ consumers/{group_id}/ids
下子节点的事件便可实现
# 4、Producers 负载均衡
为了负载均衡和避免连锁反应,Kafka 中,同一个 Topic 的 Partition 会尽量分散到不同的 Broker 上
而 Producers 则根据指定的 Topic 将消息 Push 到相应的 Partition
那么,如何将消息均衡地 Push 到各个 Partition 呢?这便是 Producers 负载均衡的问题
Producers 启动后同样也要进行注册(依然是创建一个专属的临时节点)
为了负载均衡,Producers 会通过 Watcher 机制监听 Brokers 注册节点的变化
一旦 Brokers 发生变化,如增加、减少,Producers 可以收到通知并更新自己记录的 Broker 列表
此外,基于 ZooKeeper 提供的 Watcher 机制,还可以监听其它在 ZooKeeper 上注册的节点,如 Topic、Consumer 等
Producer 向 Kafka 集群 Push 消息的时候,必须指定 Topic,不过,Partition 却是非必要的
事实上,目前高级的客户端已经不提供指定 Partition 的接口
虽然不提供,但并不代表无须指定 Partition,只是隐藏了细节
通常有两种方式用于指定 Partition
低级接口
- 在指定 Topic 的同时,需指定 Partition 编号(0、1、2……N),消息数据将被 Push 到指定的 Partition 中
- 从负载均衡的角度看,这并不是一种友好的方式
高级接口
- 不支持指定 Partition,隐藏相关细节,内部则采用轮询、对传入 Key 进行 Hash 等策略将消息数据均衡地发送到各个 Partition
- 此外,有一些 Kafka 客户端还支持自定义负载均衡策略。
# 5、Consumer 负载均衡
借助 ZooKeeper 实现负载均衡
在 Consumer 消费消息时,高级别 API 只需指定 Topic 即可,隐藏了负载均衡策略
而低级别的 API 通常需要同时指定 Topic 和 Partition,需要自行实现负载均衡策略
高级别 API 的负载均衡策略需借助 ZooKeeper 实现,具体原理如下
1、对任意一个 Topic 中所有的 Partirtion 进行排序,用数组 PT 记录;
2、某一 Consumer Group 订阅了上述 Topic,对它的所有 Consumer 排序,用数组 CG 记录,第 i 个 Consumer 记为
CG[i]
3、比例系数为
F=size(PT)/size(CG)
,向上取整;4、解除
CG[i]
对原来分配的 Partition 的消费权(i 从 0 开始,最大值为size(CG)-1
);5、将第
i*F
到(i+1)*F-1
个 Partition 分配给 CG[i]。
# 6、记录消费进度 Offset
- Kafka 中,Consumer 采用 Pull 模式消费相应 Partition 中的消息,是一种异步消费模式
- 为了避免因 Consumer 故障、重启、Rebalance 等原因造成重复消费、遗漏消费消息
- 需要记录 Consumer 对 Partition 中消息的消费进度,即偏移量 Offset
- Offset 在 ZooKeeper 中,有一个专属的节点(目录)用于记录 Offset,其路径样式如下
#节点内容就是Offset的值。
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
2
- 需要说明的是,在 Kafka 的最新版本 Kafka 2.0 中,Offset 信息不再记录于 ZooKeeper
- 而是保存于 Kafka 的 Topic 中,路径如下
__consumer_offsets(/brokers/topics/__consumer_offsets)
# 7、记录Partition Consumer关系
- Consumer Group 在 ZooKeeper 上的注册节点为
/consumers/[group_id]
- 而 Consumer Group 中的 Consumer 在 ZooKeeper 上的注册节点为
/consumers/[group_id]
下的子节点 owners,它们共享一个 Group ID - 为了 Consumer 负载均衡,同一个 Group 订阅的 Topic 下的任一 Partition 都只能分配给一个 Consumer
- Partition 与 Consumer 的对应关系也需要在 ZooKeeper 中记录,路径为
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
- 补充:
- 这个路径也是一个临时节点,进行 Rebalance 时会被删除,而后依据新的对应关系重建
- 此外,
[broker_id-partition_id]
是一个消息分区的标识,其内容就是该消息分区消费者的 Consumer ID - 通常采用
hostname:UUID
形式表示
# 8、全程解析
# 1、Producer发布消息
Producer 采用 Push 模式将消息发布到 Kafka Broker,根据负载均衡算法(如轮询、Hash 等)
这些消息将均衡写入到相应 Topic 对应的各个 Partition 中
在存储层面,采用顺序写磁盘(即 Append)模式写入
详细流程如下
1)Producer Push 消息
- 基于负载均衡算法获得目标 Partition 后,Producer 先从 ZooKeeper 的
/brokers/.../state
节点找到该 Partition 的 Leader
- 基于负载均衡算法获得目标 Partition 后,Producer 先从 ZooKeeper 的
2)Producer 将消息发送给该 Leader
3)Leader 将消息写入本地 Log
4)所有 Follower 主动从 Leader Pull 消息,写入本地 Log 后向 Leader 发送 ACK
5)Leader 收到所有 ISR 中所有 Replica 的 ACK 后,更新 HW(High Watermark,最后 Commit 的 Offset),并向 Producer 发送 ACK
6)Producer 接到 ACK,确认发送成功。
# 2、Broker存储消息
- Topic 是逻辑概念,而 Topic 对应的 Partition 则是物理概念,每个 Partition 在存储层面都对应一个文件夹(目录)
- 由于 Partition 并不是最终的存储粒度,该文件夹下还有多个 Segment(消息索引和数据文件,它们是真正的存储文件)
# 3、Consumer消费消息
目前采用的高级 API,Consumer 在消费消息时,只需指定 Topic 即可,API 内部实现负载均衡,并将 Offset 记录到 ZooKeeper 上
Consumer 采用 Pull 模式从 Broker 中读取数据,这是一种异步消费模式,与 Producer 采用的 Push 模式全然不同
Push 模式追求速度,越快越好,当然它取决于 Broker 的性能,而 Pull 模式则是追求自适应能力,Consumer 根据自己的消费能力消费