kafka是一个分布式消息队列。具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。
kafka对外使用topic的概念,生产者往topic里写消息,消费者从读消息。为了做到水平扩展,一个topic实际是由多个partition组成的,遇到瓶颈时,可以通过增加partition的数量来进行横向扩容。单个parition内是保证消息有序。
在kafka中,消息以topic进行分类,生产者与消费者都是面向topic进行生产和消费。但是topic只是在逻辑上存在,实际的物理存储是以partition表现。生产者将生产好数据传输给对应的topic,topic将数据进行存储,等待消费者来消费。那消费者是怎么消费这些数据的,很显然消费者不可能是一个一个数据来进行消费,那么在批量消费数据时,消费者是怎么来确定自己消费到那里。我们需要先知道数据是怎么进行存储的。
数据是怎么怎么存储的呢,假设我们向一个topic生产以下数据,
java offset1 hello offset2 scala offset3kafka 会对每个数据建立的一个offset,每次用户在拉取的时候,都会记录上次拉取的offset,在下次拉取数据直接从上次拉取的offset的下一个开始。kafka会默认创建一个_consumer_offsettopic ,这里会记录我们拉取了到了那里。我们下面在具体看,数据在物理上具体的存储。
我们在kafka配置文件中配置了log.dirs。我们创建一个topic,就会在该目录下创建topic名称+分区号这样的文件夹,我们有几个分区,就会创建几个这个的文件夹。进入这个文件夹,我们会看到这几个文件
# 为数据根据offset段建立的索引文件 这个offset段是多大 # 在配置文件中 log.index.interval.bytes 默认大小为4k # log.index.interval.bytes 最大值为10M 00000000000000000000.index # 具体存储数据的目录 00000000000000000000.log #为数据根据时间建立的索引文件 00000000000000000000.timeindex leader-epoch-checkpoint由于kakfa主要被用于实时计算,如果当前数据存储的数据量过大,虽然建立了索引,但是找数据的时间依然会变的很久。因此kafka中有这样一个概念segment。当log文件中的数据过大时,就会进行一个滚动,会再次创建与上面类似的三个文件,那这个文件什么时候进行滚动,我们也可以在配置文件中进行配置。通过这个参数log.segment.bytes来配置。每三个这样的文件就是一个segment,新的segment 的文件名不是按照第一文件名基础上+1,而是下面这样
#名称是上个segment的最后一个offset+1 #假设上个segment最后一个offset是100,新的文件名称就是 00000000000000000101.index 00000000000000000101.log 00000000000000000101.timeindexkafka 的文件结构,一个kafka可以有多个topic
topic -- partition-0 -- log -- segment -- index文件 log文件 timeindex文件 -- segment -- index文件 log文件 timeindex文件 -- segment -- index文件 log文件 timeindex文件 -- partition-1 -- log -- partition-2 -- log现在我们知道kafka的数据是怎么存储,我们接下来分区从生产者和消费者来讨论数据的流动。
我们知道在kafka集群中一个topic会有多个分区,那为什么要有多个分区。
实现分布式存储 每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;提高并行度 多个消费者可以同时消费一个topic的数据。既然有多个分区,那么生产者生成数据后是怎么确定数据到哪一个分区中。
在我们指明partition的值时,就将值直接发送到对应分区没有指明partition,但是指明了key值时,就将key的hashcode % partition总数量 的值最为数据要发送到那个分区当没有指明partition或者key值时,第一次发送时会产生一个随机数,使用随机数 % 分区数 来确定数据发往那个分区,这里我们可以指定批次, 每次发送数据按照批次来发送,之后的批次不会在发往之前第一个分区中,在剩余的分区随机发。 可能会造成个别分区数据积累过多。【新版本】当没有指明partition或者key值时,第一次发送时会产生一个随机数,使用随机数 % 分区数 来确定数据发往那个分区,下一次发送时,在之前随机数的基础上 +1在%分区数。实现轮询的效果。【旧版本】现在我们知道数据要发送到那个分区,但是我们的数据发送到分区内,我们是怎么保证数据能够完整传输到分区内。
首先我们知道为了避免某个节点宕机,kafka提供了副本机制,每个分区的数据,都会在其他节点进行了备份,生成副本。根据zookeper这几个副本会产生一个leader和多个follower,leader与生产者和消费者进行对接。
当生产者向topic发送数据后,topic成功接收后会向生产者发送一个确认收到的信息,表明数据已经成功接收。那么topic向生产者发送消息的时机有下面几种时机:这种时机在kafka中叫做ack,通过配置ack的值来确定发送确认消息的时机。
当ack配置为0时,生产者给topic发送数据后,topic中的分区leader还没有接受完数据,就向生产者发送确认消息。此时如果该leader所在服务器宕机,还没有将完整的数据落盘,就会造成数据的缺失,但是这种情况下,延迟是最低的。
当ack配置为1时,分区leader已经成功落盘,但是其他的副本还没有将数据进行同步,此时leader所在服务器宕机,其他复本中也没有数据,如果此时选举出新的leader,其数据肯定是不完整的。
当ack配置为-1时,分区leader成功落盘,数据也成功同步,但是向生产者发送确认消息时,leader所在服务器宕机,生产者迟迟没有收到确认消息,继续向新的leader发送数据,此时就会造成数据重复。
这里还有一个问题,副本数据同步后发送消息,是所有副本同步后发送,还是副本同步一半就发送消息。在kafka中,选择的是所有副本都同步后发送消息。因为副本存在选举机制,如果副本只同步一半后,就发送确认消息,此时leader挂掉,选举了没有同步完成的副本当做leader,就会出现问题。所以在kafka中使用的是全部同步后发送确认消息。
此时又会出现一个问题,如果有一个副本,因为网络和其他原因,一直不能同步完成,那么由于ack机制,leader如果要一直等他完成再去发送确认消息,就会影响其实时性。
此时kafka提供了一个ISR机制。ISR就是和leader保持同步的follower集合,当ISR中的follower完成数据的同步之后,leader就会给producer发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的leader。那leader或者是follower故障后里面数据应该怎么去同步。
首先要了解俩个概念
LEO 每个副本的最后一个offsetHW 所有副本中最小的LEO # 假设现在一个leader,俩个follower,leader中的数据已经落盘 follower正在同步leader的数据, # 下面的数字代表offset leader 0 1 2 3 4 5 6 7 8 9 10 11 LEO=11 follower 0 1 2 3 4 5 6 7 LEO=7 follower 0 1 2 3 4 5 6 7 8 9 LEO=9 此时HW为7假设现在leader故障
就要选举新的leader,那么新的leader怎么选举,我们知道ISR中是和leader保持同步的follower集合,怎么算是与leader保持同步,kafka中定义ISR中存放所有LEO大于HW的follower。新的leader就会从ISR 集合中进行选举,选举之后,因为之前的leader挂掉,数据很有可能小于leader,所以每个follower就会把高于HW的部分切割掉。例如,上面的案例假设leader挂掉,选举第一个follower为leader,此时就要将第二个follower中高于HW的删除后,HW为7;
假设现在follower故障
follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。
Exactly Once语义
我们将ack级别设置为-1,能够保证数据不会丢失,但是可能会出现重复,这种情况就是At Least Once(至少一条)。
将ack级别设置为0,不会出现数据重复,但是会出现数据丢失,这种情况就是At Most Once(最多一条)
但是上面的情况不是我们想要的效果,有没有一种语义可以实现数据有且仅有一条。在kafka中有这样一种语义
Exactly Once = At Least Once + 幂等性。
At Least Once 我们可以使用将ack设置为-1来实现
幂等性 在kafka中producer在发送数据时,将producerid也发送过去,通过这样的方式保证数据只有一条。我们在kafka中要开启幂等性,需要配置producer的参数enable.idompotence来开启幂等性。这里还会有一个问题,就是在producer重启后就会产生一个新的producerid。这样就无法保证的数据的一致性。
consumer采用pull(拉)模式从broker中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。
我们知道为了提高并行度,在kafka中有消费者组的概念,一个消费者组内有多个消费者,一个消费者组去消费一个topic,消费者去消费topic的partition的数据。在kafka中有俩种分配策略。
roundrobin 轮询消费着轮流消费每个分区中的数据。
range主要分为俩步
先大概评估每个消费者消费几个分区,分区数量/消费者的个数。
了解前多少个消费者多消费一个分区,分区数量/消费者的个数
# 假设我们现在有7个分区,三个消费者, 7/3 = 2 7%3=1 此时前三个分区发给一个消费者,后四个分区平均发给剩余俩个消费者。我们现在知道了消费者是从那个分区中拉取数据,当分区中的数据太多时,消费这该怎么知道自己拉取了多少数据?
由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。consumer默认将offset保存在Kafka一个内置的topic中,该topic为_consumer_offset。
我们已经了解了kafka中的生产者,消费者与topic 的数据交互,那为什么kafka可以使用在实时性高的应用场景。
多个分区,并发读写。
Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写,省去了大量磁头寻址的时间。
zookeeper在kafka的作用 kafka的监听broker的状态,选举leader都依赖zookeeper leader选举流程: 1、在启动集群的时候broker回启动,在broker启动的过程中会选出一个座位controller 2、broker启动之后会在zookeeper上进行注册[/brokers/ids] 3、controller会监听broker注册的目录[/brokers/ids] 4、broker上保存的partition,partition也会在zookeeper上注册[/brokers/topics/topic名称/partitions/分区号/state] 5、一旦broker宕机,controller能够监测到 6、宕机的broker可能有partition的leader,此时选举出新leader 7、contoller更新leader所在的主机[/brokers/topics/topic名称/partitions/分区号/state]
在之前我们知道,kafka在数据一致性上可以实现Exactly Once,但是当时有一个问题,当生产者因为一些原因,断开链接,就无法在保持数据的一致性,当producer再次连接上,会产生不同的producerId。因为我们的topic中是有多个分区,为了能够实现跨分区的事务,就引入了一个Transaction ID。现在将producerID和TransactionID绑定到一起,当producerId重启后通过正在进行的Transaction ID获得原来的producerId。