kafka,sparkstreaming作为实时流数据处理的常用组件,在大数据面试中也是高频考点,秋招的同学可以找一个涉及kafka和spark相关的项目,结合项目来加深对kafka,spark实时处理数据的理解。
从这几个方面来谈: 生产者producer,消费者consumer,主题或者叫队列topic,消费者组consumer group,节点broker,分区partition ,消息偏移量offset。 kafka是一个分布式消息队列,kafka对消息保存时根据topic进行归类,发送者是producer,消费者是consumer,kafka集群有多个kafka实例,每个实例叫broker。producer,consumer都可理解为是broker的客户端。
消费者组:kafka提供可扩展且具有容错机制的消费者机制,组内有多个消费者,它们有一个共同的id,组内所有消费者协调在一起订阅主题的所有分区,每个分区只能由消费者组内的一个消费者来消费。broker:一个kafka服务器就是一个broker。一个集群由多个broker组成,一个broker可以容纳多个topic。partition:一个topic内有多个partition,每个partition都是一个有序队列,partition内的每条消息都会有一个有序的id,即为offset,kafka只能保证一个partition内的消息有序的发给一个消费者。offset:消息在partition内的偏移量,方便定位消息的位置。传统的消息系统有两种消息模型:点对点模型,发布/订阅模型。传统的消息队列模型的缺陷在于消息一旦被消费,就会从队列中删除,且只能被下游的一个消费者消费,这种模型伸缩性差,因为下游的多个消费者要抢队列中的消息。
发布/订阅模型:允许消息被多个consumer消费,订阅者必须订阅所有主题的所有分区,伸缩性差。而消费者组的引入解决了上述两种模式下伸缩性差的问题,当消费者组订阅了多个主题后,组内的实例不要求订阅主题的所有分区,它只会消费部分分区的消息,而消费者组之间互不影响,完美解决伸缩性差的问题。
消费者组机制同时实现了点对点模型和发布/订阅模型,如果所有实例属于一个消费者组,就是消息队列模型,如果所有消费者属于不同的消费者组,且订阅了相同主题,就是发布/订阅模型。
0.10版本已不支持receiver模式,只有direct模式,offset默认存到kafka的consumer_offset中去,使用了key-value的形式
异步提交效率更高,但可能提交不成功,而且你不知道有没有成功。 同步提交会阻塞程序,效率较低。
在kafka0.10开始,kafka与sparksrteaming结合支持新增分区检测 分区检测应该在每次job生成获取kafkaRDD,来给kafkaRDD确定分区数并且赋值offset范围时做的。
Sparkstreaming直连 kafka源码中的调用关系: KafkaUtils.createDirectStream–>new DirectKafkaInputStream–>里面的compute()方法–>生成KafkaRDD。 kafka源码中DirectKafkaInputDStream的compute方法中可以看到,在latestOffsets里面可以获取所有分区信息,得到新的partition,并且将新的partition加到currentOffsets变量里,实现了kafka动态分区检测。 而在低版本中只能根据currentOffsets信息来获取最大offset,没有去感知新的分区,可在每次生成kafkaRDD时,手动将currentOffsets修改为最新值(往里面新增partition数,以及对应的offsets) 查看kafka特定topic的详情的命令:bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test 可用命令行或者kafka api增加分区数
在kafka0.11版本可指定生产者幂等性,enable.idempotence=true;底层原理是让kafka自动做消息重复的去重,用空间换时间,在broker端多保留一些字段,当生产者发送了具有相同字段的消息后,broker会知道,然后把重复的过滤掉,但是只能保证分区内幂等。 如要保证多个分区幂等,还需设置生产者端的参数(事物id),在一次事物内id是相同的,为pid,为了解决生产者重试引起的乱序和重复问题,kafka增加了pid+seq,生产者的每一个记录都有一个单调递增的seq,broker上每个topic也会维护一个pid+seq的映射。当记录来了时,broker会先检查记录再保存数据,如果第一条消息的seq比broker维护的序列化(lastseq)大1,则保存数据,否则不保存。
最佳方案:将kafka数据转化为rdd,在窗口操作前在driver端使用transform函数获得到offset信息,然后当foreachRDD这个逻辑处理无误后在输出操作foreachRDD里完成offset提交。 即将消费逻辑和提交offset放到try catch里作为一个事务处理,若在消费完成后,提交offset之前,机器挂了,对事物进行回滚,下次会从原来的offset消费,且不会造成数据丢失或重复。
其他方案:往前调,把重复的数据删了或者在生产者做消息幂等,当机器挂了,即使会出现重复消费,在下游(mysql,redis,hive)做数据去重,去重手段:分组,并按照id开窗取第一个值。
假如kafka.ack机制设成了-1,在生产者将数据传到broker上,broker的某个分区已经有数据了,但是在给生产者响应已经收到数据时由于网络延迟,响应未成功。所以生产者会默认再重发消息retries,造成数据重复。
解决方案:利用kafka0.11版本生产者的幂等性,保证生产者发的消息不重复且不丢,实现幂等性的关键的服务端可区分重复消息且进行过滤。过滤重复消息有两点:唯一标识,如支付请求中订单号就是唯一标识,还需要记录下处理过的请求,当收到新的请求时,用它和处理记录进行比较。在使用幂等性时,必须开启retries=true,ack=all。
kafka生产者实现幂等性原理: kafka给生产者一个唯一的id(pid),pid和序列号和消息捆绑在一起,然后发送给Broker,序列号单调递增,当消息的序列号比pid中最后提交的消息正好大1时,broker才会接受消息,如果不是,broker认定是生产者重新发消息。一般设置max.in.flight.requests.per.connection=1,保证有一个发送不成功就阻塞,保证了生产者端的有序性。
kafka幂等性只能保证分区幂等性,如果要保证所有分区幂等,需要kafka的事物. kafka的事物分为:生产者事物;消费者和生产者事物 一般来说,消费者消费消息的级别是read_uncommited,这有可能读到事物失败的数据,产生脏读,所以为了防止此情况,一般开启事物时,将消费者事物隔离级别设为read_committed。 开启生产者事物时一般只需要配置事物id,保证事物id的取值必须是唯一的,同一时刻只能有一个事物id存在,其他的被关闭。
事物开启步骤:
初始化事务开启事物,事物提交终止事务其中将开启事物与事物提交放到try{}代码块中,如果其中出现错误,用catch{}来终止事物 消费者和生产者事物流程:
初始化事物
while(true){
开启事物控制 try{
迭代数据,进行业务处理
提交消费者的偏移量
提交事物 }catch{
终止事物 } }
也就是系统没有存储该消费者的消费分区的记录信息,默认消费者首次消费策略是:latest 注意是首次消费策略,第一次。
关键点:ISR机制的过程,还有高水位,高版本的Leader Epoch kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。完全同步需要所有副本都复制完,这条消息才会被提交,这种方式影响性能,吞吐率。在异步复制方式下,数据只要写入leader,就认为消息提交了,这种在leader挂了会丢失数据,kafka使用ISR机制很好的均衡了确保数据不丢失以及吞吐量。
ISR机制:leader会维护一个与其基本保持同步的副本列表,该列表就是ISR,每个partition都会有一个ISR,如果一个follower的LEO比leader落后太多,或者超过一定时间为发起复制请求,则将此副本从ISR中移除。
0.11版本以前kafka使用高水位机制保证数据的同步即副本的高水位之前的数据都是与leader同步过的, LEO:log end offset标识的是每个分区中消息要写入的位置,每个分区的每个副本都有自己的LEO,采用高水位进行主从同步的问题:丢数据;或数据不一致。
丢数据:A还没来得及使水位线与B相同(即A的水位低于B),此时A发生了重启,重启后A自然会对数据进行截断,A截断完之后B挂了,此时A成为了主,数据丢失
数据不一致:A的水位是0,里边一个记录m0,B的水位是1,里边的数据是m0,m1,此时A,B都挂了,然后A先重启,A是主机,此时A写入一个记录m3,A的水位变成了1。此时B重启,发现A,B水位一样,认为A,B同步完成,造成数据不一样。
在0.11版本之后引入了Leader Epoch(版本),不用高水位作为数据截断的依据,任意一个Leader都有一个Leader Epoch,每个消息都带有Leader Epoch号,在每个分区中都会建新的Leader Epoch Sequence文件,在其中存储序列号以及该Epoch中生成的消息的start offset,并将它缓存再每一个副本,也缓存在broker的内存中。 主机和副本的Leader Epoch可以不一样,恢复时按照不同情况恢复即可,保证最后主机和副本同步。
2个线程,主线程负责产生消息,然后通过拦截器,序列化器,分区器,作用之后缓存到累加器。 sender线程负责将累加器中的消息发送到kafka中
在每一个线程中新建一个kafkaConsumer, 单线程创建kafkaConsumer,多个处理线程处理消息,难点在于是否考虑消息顺序性,offset提交方式。
数据可靠性:RocketMQ支持异步实时刷盘,同步刷盘,同步Replication,异步Replicaton;kafka异步刷盘,异步Replication. RocketMQ的同步刷盘在单机可靠性上比kafka更高,不会因为操作系统crash,导致数据丢失。且同步Replication比kafka异步Replication更可靠。数据完全无单点。另外由于kafka是异步Replication,主备切换时有数据丢失问题
性能:单机情况下kafka更高;rocketMQ是基于java,若producer缓存很多消息,GC是个大问题,且若宕机消息可能丢失
单机支持的队列数:kafka超过64个队列时load会发生飙高的情况;而rocketMQ单机最高支持5万个队列,load不会发生明显影响。队列多可创建更多的topic。
消费失败重试:kafka消费失败不支持重试;RocketMQ支持定时重试,每次重试时间间隔顺延。
严格的消息顺序:kafka一个broker挂了就会发生消息乱序,RocketMQ支持严格的消息顺序,一个挂机消息会发送失败,但不会乱序。
定时消息:kafka不支持定时消息;RocketMQ支持毫秒级延时时间。
分布式事物:kafka不支持分布式事物消息;RocketMQ支持
手动提交过程是在spark的driver端完成的,driver端有个主线程和循环线程,提交过程是在循环线程完成的。 通过kafkaUtils.createDirectStream得到的DStream的类型是DirectKafkaInputStream;kafkaRDD本身是私有的,它的父接口(父trait)是HasOffsetRanges,DirectKafkaInputStream是由RDD组成,所以DirectKafkaInputStream里边包裹了kafkaRDD;而DirectKafkaInputStream的父接口(父trait)是CanCommitOffsets 所以手动提交kafka的offset是通过第一个提交的DStream得到的,每个job都有一个自己的offset,所以要放到循环线程里去执行,所以提交offset的线程是在循环里。 维护offset的目的:持久化 何时用起你维护的offset:application重启的时候,即driver重启的时候 若手动维护offset到第三方工具需要两头堵 kafka.foreachRDD( //将提交offset的代码放到DStream对象的接收函数里,那么未来在调度线程里,这个函数每个job都有机会调用一次,伴随着提交offset rdd=>{ val ranges:Array[offsetRange]=rdd.asInstanceOf[HasOffsetRanges].offsetRanges } //driver端就这样拿到了offset //闭包,通过kafkaUtils得到的第一个DStream向上转型,提交offset kafka.asInstanceOf[CanCommitOffsets].commitAsync(ranges)//异步提交,回调它会有延迟
用kafka的seek修正,通过seek修正去找最早的offset或者最晚的offset。
若获取offset失败,先获取分区信息: val topicp=consumer.assignment().toSet;
再暂停消费:consumer.pause(topicp);
最后seek修正:consumer.seekToBeginning(topicp) 或者consumer.seekToEnd(topicp) )