Kafka 基础详解

    科技2024-06-01  87

    文章目录

    定义基础架构 工作流程数据存储小结 生产者分区策略数据可靠性 消费者消费方式分区分配策略offset的维护 高效读写数据分区策略顺序写磁盘pagecache零拷贝 Zookeeper在Kafka中的作用kafka事务

    定义

    kafka是一个分布式消息队列。具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。

    kafka对外使用topic的概念,生产者往topic里写消息,消费者从读消息。为了做到水平扩展,一个topic实际是由多个partition组成的,遇到瓶颈时,可以通过增加partition的数量来进行横向扩容。单个parition内是保证消息有序。

    基础架构

    Producer :消息生产者,就是向kafka broker发消息的客户端;Consumer :消息消费者,向kafka broker取消息的客户端;Consumer Group (CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。Topic : 可以理解为一个队列,生产者和消费者面向的都是一个topic;Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列;Replica:副本,为保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower。leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。follower:每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的leader。

    工作流程

    ​ 在kafka中,消息以topic进行分类,生产者与消费者都是面向topic进行生产和消费。但是topic只是在逻辑上存在,实际的物理存储是以partition表现。生产者将生产好数据传输给对应的topic,topic将数据进行存储,等待消费者来消费。那消费者是怎么消费这些数据的,很显然消费者不可能是一个一个数据来进行消费,那么在批量消费数据时,消费者是怎么来确定自己消费到那里。我们需要先知道数据是怎么进行存储的。

    数据存储

    数据是怎么怎么存储的呢,假设我们向一个topic生产以下数据,

    java offset1 hello offset2 scala offset3

    kafka 会对每个数据建立的一个offset,每次用户在拉取的时候,都会记录上次拉取的offset,在下次拉取数据直接从上次拉取的offset的下一个开始。kafka会默认创建一个_consumer_offsettopic ,这里会记录我们拉取了到了那里。我们下面在具体看,数据在物理上具体的存储。

    我们在kafka配置文件中配置了log.dirs。我们创建一个topic,就会在该目录下创建topic名称+分区号这样的文件夹,我们有几个分区,就会创建几个这个的文件夹。进入这个文件夹,我们会看到这几个文件

    # 为数据根据offset段建立的索引文件 这个offset段是多大 # 在配置文件中 log.index.interval.bytes 默认大小为4k # log.index.interval.bytes 最大值为10M 00000000000000000000.index # 具体存储数据的目录 00000000000000000000.log #为数据根据时间建立的索引文件 00000000000000000000.timeindex leader-epoch-checkpoint

    由于kakfa主要被用于实时计算,如果当前数据存储的数据量过大,虽然建立了索引,但是找数据的时间依然会变的很久。因此kafka中有这样一个概念segment。当log文件中的数据过大时,就会进行一个滚动,会再次创建与上面类似的三个文件,那这个文件什么时候进行滚动,我们也可以在配置文件中进行配置。通过这个参数log.segment.bytes来配置。每三个这样的文件就是一个segment,新的segment 的文件名不是按照第一文件名基础上+1,而是下面这样

    #名称是上个segment的最后一个offset+1 #假设上个segment最后一个offset是100,新的文件名称就是 00000000000000000101.index 00000000000000000101.log 00000000000000000101.timeindex

    小结

    kafka 的文件结构,一个kafka可以有多个topic

    topic -- partition-0 -- log -- segment -- index文件 log文件 timeindex文件 -- segment -- index文件 log文件 timeindex文件 -- segment -- index文件 log文件 timeindex文件 -- partition-1 -- log -- partition-2 -- log

    现在我们知道kafka的数据是怎么存储,我们接下来分区从生产者和消费者来讨论数据的流动。

    生产者

    分区策略

    我们知道在kafka集群中一个topic会有多个分区,那为什么要有多个分区。

    实现分布式存储 每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;提高并行度 多个消费者可以同时消费一个topic的数据。

    既然有多个分区,那么生产者生成数据后是怎么确定数据到哪一个分区中。

    在我们指明partition的值时,就将值直接发送到对应分区没有指明partition,但是指明了key值时,就将key的hashcode % partition总数量 的值最为数据要发送到那个分区当没有指明partition或者key值时,第一次发送时会产生一个随机数,使用随机数 % 分区数 来确定数据发往那个分区,这里我们可以指定批次, 每次发送数据按照批次来发送,之后的批次不会在发往之前第一个分区中,在剩余的分区随机发。 可能会造成个别分区数据积累过多。【新版本】当没有指明partition或者key值时,第一次发送时会产生一个随机数,使用随机数 % 分区数 来确定数据发往那个分区,下一次发送时,在之前随机数的基础上 +1在%分区数。实现轮询的效果。【旧版本】

    现在我们知道数据要发送到那个分区,但是我们的数据发送到分区内,我们是怎么保证数据能够完整传输到分区内。

    数据可靠性

    首先我们知道为了避免某个节点宕机,kafka提供了副本机制,每个分区的数据,都会在其他节点进行了备份,生成副本。根据zookeper这几个副本会产生一个leader和多个follower,leader与生产者和消费者进行对接。

    当生产者向topic发送数据后,topic成功接收后会向生产者发送一个确认收到的信息,表明数据已经成功接收。那么topic向生产者发送消息的时机有下面几种时机:这种时机在kafka中叫做ack,通过配置ack的值来确定发送确认消息的时机。

    当ack配置为0时,生产者给topic发送数据后,topic中的分区leader还没有接受完数据,就向生产者发送确认消息。此时如果该leader所在服务器宕机,还没有将完整的数据落盘,就会造成数据的缺失,但是这种情况下,延迟是最低的。

    当ack配置为1时,分区leader已经成功落盘,但是其他的副本还没有将数据进行同步,此时leader所在服务器宕机,其他复本中也没有数据,如果此时选举出新的leader,其数据肯定是不完整的。

    当ack配置为-1时,分区leader成功落盘,数据也成功同步,但是向生产者发送确认消息时,leader所在服务器宕机,生产者迟迟没有收到确认消息,继续向新的leader发送数据,此时就会造成数据重复。

    这里还有一个问题,副本数据同步后发送消息,是所有副本同步后发送,还是副本同步一半就发送消息。在kafka中,选择的是所有副本都同步后发送消息。因为副本存在选举机制,如果副本只同步一半后,就发送确认消息,此时leader挂掉,选举了没有同步完成的副本当做leader,就会出现问题。所以在kafka中使用的是全部同步后发送确认消息。

    此时又会出现一个问题,如果有一个副本,因为网络和其他原因,一直不能同步完成,那么由于ack机制,leader如果要一直等他完成再去发送确认消息,就会影响其实时性。

    此时kafka提供了一个ISR机制。ISR就是和leader保持同步的follower集合,当ISR中的follower完成数据的同步之后,leader就会给producer发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的leader。那leader或者是follower故障后里面数据应该怎么去同步。

    首先要了解俩个概念

    LEO 每个副本的最后一个offsetHW 所有副本中最小的LEO # 假设现在一个leader,俩个follower,leader中的数据已经落盘 follower正在同步leader的数据, # 下面的数字代表offset leader 0 1 2 3 4 5 6 7 8 9 10 11 LEO=11 follower 0 1 2 3 4 5 6 7 LEO=7 follower 0 1 2 3 4 5 6 7 8 9 LEO=9 此时HW为7

    假设现在leader故障

    就要选举新的leader,那么新的leader怎么选举,我们知道ISR中是和leader保持同步的follower集合,怎么算是与leader保持同步,kafka中定义ISR中存放所有LEO大于HW的follower。新的leader就会从ISR 集合中进行选举,选举之后,因为之前的leader挂掉,数据很有可能小于leader,所以每个follower就会把高于HW的部分切割掉。例如,上面的案例假设leader挂掉,选举第一个follower为leader,此时就要将第二个follower中高于HW的删除后,HW为7;

    假设现在follower故障

    follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。

    Exactly Once语义

    我们将ack级别设置为-1,能够保证数据不会丢失,但是可能会出现重复,这种情况就是At Least Once(至少一条)。

    将ack级别设置为0,不会出现数据重复,但是会出现数据丢失,这种情况就是At Most Once(最多一条)

    但是上面的情况不是我们想要的效果,有没有一种语义可以实现数据有且仅有一条。在kafka中有这样一种语义

    Exactly Once = At Least Once + 幂等性。

    At Least Once 我们可以使用将ack设置为-1来实现

    幂等性 在kafka中producer在发送数据时,将producerid也发送过去,通过这样的方式保证数据只有一条。我们在kafka中要开启幂等性,需要配置producer的参数enable.idompotence来开启幂等性。这里还会有一个问题,就是在producer重启后就会产生一个新的producerid。这样就无法保证的数据的一致性。

    消费者

    消费方式

    consumer采用pull(拉)模式从broker中读取数据。

    push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。

    pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。

    分区分配策略

    我们知道为了提高并行度,在kafka中有消费者组的概念,一个消费者组内有多个消费者,一个消费者组去消费一个topic,消费者去消费topic的partition的数据。在kafka中有俩种分配策略。

    roundrobin 轮询

    消费着轮流消费每个分区中的数据。

    range

    主要分为俩步

    ​ 先大概评估每个消费者消费几个分区,分区数量/消费者的个数。

    ​ 了解前多少个消费者多消费一个分区,分区数量/消费者的个数

    # 假设我们现在有7个分区,三个消费者, 7/3 = 2 7%3=1 此时前三个分区发给一个消费者,后四个分区平均发给剩余俩个消费者。

    我们现在知道了消费者是从那个分区中拉取数据,当分区中的数据太多时,消费这该怎么知道自己拉取了多少数据?

    offset的维护

    由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。consumer默认将offset保存在Kafka一个内置的topic中,该topic为_consumer_offset。

    我们已经了解了kafka中的生产者,消费者与topic 的数据交互,那为什么kafka可以使用在实时性高的应用场景。

    高效读写数据

    分区策略

    多个分区,并发读写。

    顺序写磁盘

    Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写,省去了大量磁头寻址的时间。

    pagecache

    pagecache[是内存区域,数据写入broker之前先写入pagecache中,pagecache的数据flush到磁盘]将单次写磁盘改成批量写磁盘,减少磁盘的交互次数在pagecache中对数据进行地址的排序,减少磁头寻址的时间如果网络够好[producer的生产数据的速度=consumer的消费速度],此时producer刚刚将数据写入pagecache,consumer可以直接从pagecache中将数据拉走,不用与磁盘交互pagecache不在JVM中不会影响GC

    零拷贝

    linux内存划分为两块: 内核区、用户区 正常情况: 1、通过流将数据读取到内核的pagecache中 2、将数据从内核的pagecache中拷贝到用户区,对数据进行操作 3、将数据从用户区拷贝到socketcache中 4、将数据从socketcache中发送到网卡 kafka因为不需要对数据进行额外的操作,所以可以使用零拷贝 零拷贝: 1、通过流将数据读取到内核的pagecache中 2、直接将数从pagecache中发送到网卡

    Zookeeper在Kafka中的作用

    zookeeper在kafka的作用 kafka的监听broker的状态,选举leader都依赖zookeeper leader选举流程: 1、在启动集群的时候broker回启动,在broker启动的过程中会选出一个座位controller 2、broker启动之后会在zookeeper上进行注册[/brokers/ids] 3、controller会监听broker注册的目录[/brokers/ids] 4、broker上保存的partition,partition也会在zookeeper上注册[/brokers/topics/topic名称/partitions/分区号/state] 5、一旦broker宕机,controller能够监测到 6、宕机的broker可能有partition的leader,此时选举出新leader 7、contoller更新leader所在的主机[/brokers/topics/topic名称/partitions/分区号/state]

    kafka事务

    在之前我们知道,kafka在数据一致性上可以实现Exactly Once,但是当时有一个问题,当生产者因为一些原因,断开链接,就无法在保持数据的一致性,当producer再次连接上,会产生不同的producerId。因为我们的topic中是有多个分区,为了能够实现跨分区的事务,就引入了一个Transaction ID。现在将producerID和TransactionID绑定到一起,当producerId重启后通过正在进行的Transaction ID获得原来的producerId。

    Processed: 0.012, SQL: 8