1:问题场景: 通过flume同步mysql数据到kafka,再由flink消费,kafka分区功能可以提高消费能里,但是我们必须把一些有相同属性的数据放在同一个partition中。
2:flume对kafkaSink提供两种指定partition的配置
defaultPartitionId – 指定此通道中要发送到的所有事件的Kafka分区ID(整数),除非partitionIdHeader覆盖。默认情况下,如果没有设置此属性,则事件将由Kafka生成器的分词器分发——如果指定,包括按键分发(或者由Kafka .partitioner.class指定的分词器分发)。
partitionIdHeader – 设置后,接收器将从事件头获取使用此属性值命名的字段的值,并将消息发送到主题的指定分区。如果该值表示一个无效的分区,则将抛出一个EventDeliveryException。如果头值存在,则此设置将覆
3:怎么解决自定义分区问题: 我们解决问题1的方法利用了partitionIdHeader 配置,就是想方法给event的头添加上我们希望他存在的partition,做法是创建maven工程,继承flume的Interceptor拦截器类,对event进行处理,具体步骤这篇博客不详细记录,只记录遇到的问题,后续博客补充。
4:flume和mysql关系: 我为了检测结果曾经把mysql数据清空方便做验证,但是导致了我的脚本获取不到数据的更新,这个问题我被困扰了很久,我一开始并不知道与我删除数据有关,结果浪费了很多时间排查,最后的解决方案是把脚本里配置的state缓存文件删除,再删除数据库从新开始处理数据
5:为什么flume推送的数据是重复的 推送到kafka的数据我发现并不是我mysql最新新增的数据,并且因为我这个问题让我思考,flume是怎么做到推送最新数据的?经过查找我发现需要设置一个数字类型的自增id,才能实现推送最新数据,且在保存state的文件里记录着上次消费到的最大数,我曾在网上看到有人说flume的同步是做查询同步与canal拉取mysql日志不同,flume是消耗mysql查询资源的。所以我猜测flume的最新数据推送很可能就是类似mysql做了排序分页查询出来的。感觉在高质量数据同步的情况下并不敢贸然使用。
6:数据延迟场景: 在保证实用同一个id自增逻辑的数据,有多台机器新增到某台mysql机器,那也许到mysql的新增数据可能是 01273,45689,在我的测试中,第一次kafka收到的数据会是01273,第二次收到的数据是56789,所以就漏了一个4,重复了一个7。所以对于精确度要求高的数据不该使用flume计算