大数据秋招面经之kafka,sparkstreaming系列

    科技2022-07-10  179

    文章目录

    前言1.kafka的架构2.kafka的特点或者作用3.kafka中消费者组是怎么回事?消费者组可不可以订阅多个分区?为什么有消费者组,作用是啥?4.sparkstreaming直连kafka的两种方式(这题面试都问烂了)5.怎么维护offset6.同步提交和异步调提交:7.sparkstreaming端直连kafka时如何知道kafka有几个分区,那如果kafka分区数增加了,sparkstreaming端怎么知道的?8.kafka中的消息加前缀是怎么加的?9.kafka在消费完成后,提交offset之前,机器挂了,如何处理10.kafka的acks与retries也可能会在生产者端造成数据重复,为什么11.kafka的事物:kafka的事物在分布式微服务中有不可替代的地位。12.kafka消费者默认对于未订阅的topic的offset的消费策略:13.kafka主从同步机制14.kafka高吞吐的原因15.kafka生产者客户端有几个线程?16. kafkaConsumer是非线程安全,如何实现多线程消费?17.rocketMQ和kafka的区别:18.kafka手动提交offset底层实现(源码级别):19.将offset保存到了hbase里面去了,假如做了rebalance,或者增加了分区怎么办?或者offset out of range exception(offset过期),对应的offset没了怎么办


    前言

    kafka,sparkstreaming作为实时流数据处理的常用组件,在大数据面试中也是高频考点,秋招的同学可以找一个涉及kafka和spark相关的项目,结合项目来加深对kafka,spark实时处理数据的理解。


    1.kafka的架构

    从这几个方面来谈: 生产者producer,消费者consumer,主题或者叫队列topic,消费者组consumer group,节点broker,分区partition ,消息偏移量offset。 kafka是一个分布式消息队列,kafka对消息保存时根据topic进行归类,发送者是producer,消费者是consumer,kafka集群有多个kafka实例,每个实例叫broker。producer,consumer都可理解为是broker的客户端。

    消费者组:kafka提供可扩展且具有容错机制的消费者机制,组内有多个消费者,它们有一个共同的id,组内所有消费者协调在一起订阅主题的所有分区,每个分区只能由消费者组内的一个消费者来消费。broker:一个kafka服务器就是一个broker。一个集群由多个broker组成,一个broker可以容纳多个topic。partition:一个topic内有多个partition,每个partition都是一个有序队列,partition内的每条消息都会有一个有序的id,即为offset,kafka只能保证一个partition内的消息有序的发给一个消费者。offset:消息在partition内的偏移量,方便定位消息的位置。

    2.kafka的特点或者作用

    解耦:允许你独立的扩展或者修改两边的处理过程,只要确保它们遵守同样的接口约束异步:允许用户不立刻处理队列中的数据,在需要的适合处理它们。缓冲:控制和优化数据流经过系统的速度,解决生产者和消费者消息处理速度不一致的问题。削峰,限流:使用消息队列可使关键组件顶住突发压力,降低处理数据峰值。容错机制:通过topic内有多个分区,分布在不同的broker上实现。可扩展性:分布式系统都具备可扩展能力,因为消息队列解耦了处理过程,所以增大消息入队和处理频率是很容易的。可恢复性:系统中一部分组件挂掉,不会影响整个系统。消息队列降低了进程间耦合程度,,即使一个处理消息的进程挂了,加入队列的消息依然可以在系统恢复后被处理。分区内顺序保证:一个partition内数据有序。

    3.kafka中消费者组是怎么回事?消费者组可不可以订阅多个分区?为什么有消费者组,作用是啥?

    传统的消息系统有两种消息模型:点对点模型,发布/订阅模型。传统的消息队列模型的缺陷在于消息一旦被消费,就会从队列中删除,且只能被下游的一个消费者消费,这种模型伸缩性差,因为下游的多个消费者要抢队列中的消息。

    发布/订阅模型:允许消息被多个consumer消费,订阅者必须订阅所有主题的所有分区,伸缩性差。而消费者组的引入解决了上述两种模式下伸缩性差的问题,当消费者组订阅了多个主题后,组内的实例不要求订阅主题的所有分区,它只会消费部分分区的消息,而消费者组之间互不影响,完美解决伸缩性差的问题。

    消费者组机制同时实现了点对点模型和发布/订阅模型,如果所有实例属于一个消费者组,就是消息队列模型,如果所有消费者属于不同的消费者组,且订阅了相同主题,就是发布/订阅模型。

    4.sparkstreaming直连kafka的两种方式(这题面试都问烂了)

    receiver模式:worker中的Executor里会启动线程去接收数据,每隔一段时间去接收数据(如200毫秒),receiver把接收到的数据封装成一个个block,然后把这些block数据写入该executor的内存中。receiver把接收到的数据块block信息通知给driver,driver端会根据一定的时间间隔(如2秒),把这些block块组织成一个RDD,然后对这些RDD进行处理。每一个block块就是RDD中一个partition,一个partition对应一个task任务。worker端的executor内会另启线程去处理数据。使用kafka的高级api。 receiver模式最大的弊端:接收数据的线程与处理数据的线程不是同一个线程,若处理线程挂了,而接收数据线程一直在接收数据,易挤爆内存,oom。 在这种模式下为了防止数据丢失,要启用高可用机制WAL,让数据零丢失,即开启sparkstreaming的预写入日志WAL,该机制会同步地将接收到的kafka数据写入分布式文件系统HDFS,但开启WAL无疑进一步降低处理性能。direct模式:worker中的Executor里会启动线程去获取数据,然后进行处理,接收数据的线程与处理数据的线程是同一个线程;不会出现oom的问题。还可以配合背压机制,控制数据流的速度。 sparkstreaming的背压机制借鉴了flink的反压机制。反压:数据堆积了,下游来不及处理,向上反馈,上游接收数据的速度慢一点。使用kafka的low api。 基于direct模式的优点: 1.简化并行读取:kafka某个topic的分区数与spark中分区数保持一致,在kafka partition和RDD partition之间有一个一对一的映射关系。效率高 2.高性能:无需开启WAL,只要kafka中做了数据复制,就可以用kafka的副本进行恢复。

    5.怎么维护offset

    低版本自动维护offset到ZK group/topic/partition1/offset1 /partition2/offset2 /partition3/offset3

    0.10版本已不支持receiver模式,只有direct模式,offset默认存到kafka的consumer_offset中去,使用了key-value的形式

    6.同步提交和异步调提交:

    异步提交效率更高,但可能提交不成功,而且你不知道有没有成功。 同步提交会阻塞程序,效率较低。

    7.sparkstreaming端直连kafka时如何知道kafka有几个分区,那如果kafka分区数增加了,sparkstreaming端怎么知道的?

    在kafka0.10开始,kafka与sparksrteaming结合支持新增分区检测 分区检测应该在每次job生成获取kafkaRDD,来给kafkaRDD确定分区数并且赋值offset范围时做的。

    Sparkstreaming直连 kafka源码中的调用关系: KafkaUtils.createDirectStream–>new DirectKafkaInputStream–>里面的compute()方法–>生成KafkaRDD。 kafka源码中DirectKafkaInputDStream的compute方法中可以看到,在latestOffsets里面可以获取所有分区信息,得到新的partition,并且将新的partition加到currentOffsets变量里,实现了kafka动态分区检测。 而在低版本中只能根据currentOffsets信息来获取最大offset,没有去感知新的分区,可在每次生成kafkaRDD时,手动将currentOffsets修改为最新值(往里面新增partition数,以及对应的offsets) 查看kafka特定topic的详情的命令:bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test 可用命令行或者kafka api增加分区数

    8.kafka中的消息加前缀是怎么加的?

    在kafka0.11版本可指定生产者幂等性,enable.idempotence=true;底层原理是让kafka自动做消息重复的去重,用空间换时间,在broker端多保留一些字段,当生产者发送了具有相同字段的消息后,broker会知道,然后把重复的过滤掉,但是只能保证分区内幂等。 如要保证多个分区幂等,还需设置生产者端的参数(事物id),在一次事物内id是相同的,为pid,为了解决生产者重试引起的乱序和重复问题,kafka增加了pid+seq,生产者的每一个记录都有一个单调递增的seq,broker上每个topic也会维护一个pid+seq的映射。当记录来了时,broker会先检查记录再保存数据,如果第一条消息的seq比broker维护的序列化(lastseq)大1,则保存数据,否则不保存。

    9.kafka在消费完成后,提交offset之前,机器挂了,如何处理

    最佳方案:将kafka数据转化为rdd,在窗口操作前在driver端使用transform函数获得到offset信息,然后当foreachRDD这个逻辑处理无误后在输出操作foreachRDD里完成offset提交。 即将消费逻辑和提交offset放到try catch里作为一个事务处理,若在消费完成后,提交offset之前,机器挂了,对事物进行回滚,下次会从原来的offset消费,且不会造成数据丢失或重复。

    其他方案:往前调,把重复的数据删了或者在生产者做消息幂等,当机器挂了,即使会出现重复消费,在下游(mysql,redis,hive)做数据去重,去重手段:分组,并按照id开窗取第一个值。

    10.kafka的acks与retries也可能会在生产者端造成数据重复,为什么

    假如kafka.ack机制设成了-1,在生产者将数据传到broker上,broker的某个分区已经有数据了,但是在给生产者响应已经收到数据时由于网络延迟,响应未成功。所以生产者会默认再重发消息retries,造成数据重复。

    解决方案:利用kafka0.11版本生产者的幂等性,保证生产者发的消息不重复且不丢,实现幂等性的关键的服务端可区分重复消息且进行过滤。过滤重复消息有两点:唯一标识,如支付请求中订单号就是唯一标识,还需要记录下处理过的请求,当收到新的请求时,用它和处理记录进行比较。在使用幂等性时,必须开启retries=true,ack=all。

    kafka生产者实现幂等性原理: kafka给生产者一个唯一的id(pid),pid和序列号和消息捆绑在一起,然后发送给Broker,序列号单调递增,当消息的序列号比pid中最后提交的消息正好大1时,broker才会接受消息,如果不是,broker认定是生产者重新发消息。一般设置max.in.flight.requests.per.connection=1,保证有一个发送不成功就阻塞,保证了生产者端的有序性。

    11.kafka的事物:kafka的事物在分布式微服务中有不可替代的地位。

    kafka幂等性只能保证分区幂等性,如果要保证所有分区幂等,需要kafka的事物. kafka的事物分为:生产者事物;消费者和生产者事物 一般来说,消费者消费消息的级别是read_uncommited,这有可能读到事物失败的数据,产生脏读,所以为了防止此情况,一般开启事物时,将消费者事物隔离级别设为read_committed。 开启生产者事物时一般只需要配置事物id,保证事物id的取值必须是唯一的,同一时刻只能有一个事物id存在,其他的被关闭。

    事物开启步骤:

    初始化事务开启事物,事物提交终止事务

    其中将开启事物与事物提交放到try{}代码块中,如果其中出现错误,用catch{}来终止事物 消费者和生产者事物流程:

    初始化事物

    while(true){

    开启事物控制 try{

    迭代数据,进行业务处理

    提交消费者的偏移量

    提交事物 }catch{

    终止事物 } }

    12.kafka消费者默认对于未订阅的topic的offset的消费策略:

    也就是系统没有存储该消费者的消费分区的记录信息,默认消费者首次消费策略是:latest 注意是首次消费策略,第一次。

    13.kafka主从同步机制

    关键点:ISR机制的过程,还有高水位,高版本的Leader Epoch kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。完全同步需要所有副本都复制完,这条消息才会被提交,这种方式影响性能,吞吐率。在异步复制方式下,数据只要写入leader,就认为消息提交了,这种在leader挂了会丢失数据,kafka使用ISR机制很好的均衡了确保数据不丢失以及吞吐量。

    ISR机制:leader会维护一个与其基本保持同步的副本列表,该列表就是ISR,每个partition都会有一个ISR,如果一个follower的LEO比leader落后太多,或者超过一定时间为发起复制请求,则将此副本从ISR中移除。

    0.11版本以前kafka使用高水位机制保证数据的同步即副本的高水位之前的数据都是与leader同步过的, LEO:log end offset标识的是每个分区中消息要写入的位置,每个分区的每个副本都有自己的LEO,采用高水位进行主从同步的问题:丢数据;或数据不一致。

    丢数据:A还没来得及使水位线与B相同(即A的水位低于B),此时A发生了重启,重启后A自然会对数据进行截断,A截断完之后B挂了,此时A成为了主,数据丢失

    数据不一致:A的水位是0,里边一个记录m0,B的水位是1,里边的数据是m0,m1,此时A,B都挂了,然后A先重启,A是主机,此时A写入一个记录m3,A的水位变成了1。此时B重启,发现A,B水位一样,认为A,B同步完成,造成数据不一样。

    在0.11版本之后引入了Leader Epoch(版本),不用高水位作为数据截断的依据,任意一个Leader都有一个Leader Epoch,每个消息都带有Leader Epoch号,在每个分区中都会建新的Leader Epoch Sequence文件,在其中存储序列号以及该Epoch中生成的消息的start offset,并将它缓存再每一个副本,也缓存在broker的内存中。 主机和副本的Leader Epoch可以不一样,恢复时按照不同情况恢复即可,保证最后主机和副本同步。

    14.kafka高吞吐的原因

    零拷贝通过多分区进行分布式计算顺序写磁盘

    15.kafka生产者客户端有几个线程?

    2个线程,主线程负责产生消息,然后通过拦截器,序列化器,分区器,作用之后缓存到累加器。 sender线程负责将累加器中的消息发送到kafka中

    16. kafkaConsumer是非线程安全,如何实现多线程消费?

    在每一个线程中新建一个kafkaConsumer, 单线程创建kafkaConsumer,多个处理线程处理消息,难点在于是否考虑消息顺序性,offset提交方式。

    17.rocketMQ和kafka的区别:

    数据可靠性:RocketMQ支持异步实时刷盘,同步刷盘,同步Replication,异步Replicaton;kafka异步刷盘,异步Replication. RocketMQ的同步刷盘在单机可靠性上比kafka更高,不会因为操作系统crash,导致数据丢失。且同步Replication比kafka异步Replication更可靠。数据完全无单点。另外由于kafka是异步Replication,主备切换时有数据丢失问题

    性能:单机情况下kafka更高;rocketMQ是基于java,若producer缓存很多消息,GC是个大问题,且若宕机消息可能丢失

    单机支持的队列数:kafka超过64个队列时load会发生飙高的情况;而rocketMQ单机最高支持5万个队列,load不会发生明显影响。队列多可创建更多的topic。

    消费失败重试:kafka消费失败不支持重试;RocketMQ支持定时重试,每次重试时间间隔顺延。

    严格的消息顺序:kafka一个broker挂了就会发生消息乱序,RocketMQ支持严格的消息顺序,一个挂机消息会发送失败,但不会乱序。

    定时消息:kafka不支持定时消息;RocketMQ支持毫秒级延时时间。

    分布式事物:kafka不支持分布式事物消息;RocketMQ支持

    18.kafka手动提交offset底层实现(源码级别):

    手动提交过程是在spark的driver端完成的,driver端有个主线程和循环线程,提交过程是在循环线程完成的。 通过kafkaUtils.createDirectStream得到的DStream的类型是DirectKafkaInputStream;kafkaRDD本身是私有的,它的父接口(父trait)是HasOffsetRanges,DirectKafkaInputStream是由RDD组成,所以DirectKafkaInputStream里边包裹了kafkaRDD;而DirectKafkaInputStream的父接口(父trait)是CanCommitOffsets 所以手动提交kafka的offset是通过第一个提交的DStream得到的,每个job都有一个自己的offset,所以要放到循环线程里去执行,所以提交offset的线程是在循环里。 维护offset的目的:持久化 何时用起你维护的offset:application重启的时候,即driver重启的时候 若手动维护offset到第三方工具需要两头堵 kafka.foreachRDD( //将提交offset的代码放到DStream对象的接收函数里,那么未来在调度线程里,这个函数每个job都有机会调用一次,伴随着提交offset rdd=>{ val ranges:Array[offsetRange]=rdd.asInstanceOf[HasOffsetRanges].offsetRanges } //driver端就这样拿到了offset //闭包,通过kafkaUtils得到的第一个DStream向上转型,提交offset kafka.asInstanceOf[CanCommitOffsets].commitAsync(ranges)//异步提交,回调它会有延迟

    19.将offset保存到了hbase里面去了,假如做了rebalance,或者增加了分区怎么办?或者offset out of range exception(offset过期),对应的offset没了怎么办

    用kafka的seek修正,通过seek修正去找最早的offset或者最晚的offset。

    若获取offset失败,先获取分区信息: val topicp=consumer.assignment().toSet;

    再暂停消费:consumer.pause(topicp);

    最后seek修正:consumer.seekToBeginning(topicp) 或者consumer.seekToEnd(topicp) )


    Processed: 0.195, SQL: 8