Flume案例——自定义interceptor处理数据,并使用mutilplexing selector将数据分路存储

    科技2024-04-13  76

    实际需求:

    将多个日志文件中的数据分类处理,采集出不同业务的数据,然后分路存储

    主要知识点:

    自定义interceptor使用multiplexing selector将数据分路存储

    1.模拟日志生成器

    可以写一个shell脚本,模拟生成日志数据,规定日志数据格式:uid,behavior,type,timestamp

    while true do if [ $(($RANDOM % 2)) -eq 0] then echo "u$RANDOM,e1,shop,`date +%s`000" >> test.log else echo "u$RANDOM,e1,food,`date +%s`000" >> test.log fi sleep 0.2 done

    此处仅为测试使用,因此只随机生成shopping和food两个业务的日志数据

    注意:上述脚本若直接在shell中执行,则最好变为unix格式

    2.创建agent的配置文件

    ## 到flume的安装目录下,创建agentconf,即存放agent配置文件的文件夹 ## 后缀名为 conf / properties 都可以 vi multiplexing.conf

    将如下配置信息写入

    a1.sources = r1 ## source名字 a1.channels = c1 c2 ## channel名字,此处有两个channel a1.sinks = k1 k2 ## sink的名字,有两个sink a1.sources.r1.channels = c1 c2 ## r1 连接两个 channel c1和c2 a1.sources.r1.type = TAILDIR ## source的采集方式为 taildir a1.sources.r1.filegroups = g1 ## r1监控文件组g1,监控指定路径下的文件 a1.sources.r1.filegroups.g1 = /root/test.* a1.sources.r1.fileHeader = false ## 不开启 fileheader a1.sources.r1.interceptors = i1 ## source中的event先传给拦截器i1 a1.sources.r1.interceptors.i1.type = Test.MultiplexingInterceptorDemo$MultiplexingInterceptorDemoBuilder ## i1的拦截器为自定义拦截器 a1.sources.r1.interceptors.i1.flagfield = 2 ## i1中获取event中指定的数据 a1.sources.r1.interceptors.i1.timestampfield = 3 a1.sources.r1.selector.type = multiplexing ## 选择器类型为 multiplexing,多路选择器 a1.sources.r1.selector.header = flag ## 指定header中的key为flag a1.sources.r1.selector.mapping.shop = c1 ## 若对应flag的值为 shop,将数据传给c1 a1.sources.r1.selector.mapping.food = c2 ## 若对应flag的值为 food,将数据传给c2 a1.sources.r1.selector.default = c2 ## 默认传给c2 a1.channels.c1.type = memory ## c1和c2的类型都为 memory channel a1.channels.c1.capacity = 2000 ## c1的容量 a1.channels.c1.transactionCapacity = 1000 ## c1的事务容量 a1.channels.c2.type = memory a1.channels.c2.capacity = 2000 a1.channels.c2.transactionCapacity = 1000 a1.sinks.k1.channel = c1 ## k1连接c1,从c1中取数据 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink ## k1的类型,将数据传到kafka a1.sinks.k1.kafka.bootstrap.servers = linux01:9092,linux01:9092,linux01:9092 ## 三台broker地址 a1.sinks.k1.kafka.topic = shop ## 传入kafka的shop中 a1.sinks.k1.kafka.producer.acks = 1 ## 生产者的通知代号,决定吞吐量大小 a1.sinks.k2.channel = c2 ## k2连接c2 a1.sinks.k2.type = hdfs ## k2的类型是 hdfs a1.sinks.k2.hdfs.path = hdfs://linux01:8020/food/%Y-%m-%d/%H ## 数据存放路径 a1.sinks.k2.hdfs.filePrefix = test-log- a1.sinks.k2.hdfs.fileSuffix = .log a1.sinks.k2.hdfs.rollSize = 268435456 a1.sinks.k2.hdfs.rollInterval = 120 a1.sinks.k2.hdfs.rollCount = 0 a1.sinks.k2.hdfs.batchSize = 1000 a1.sinks.k2.hdfs.fileType = DataStream ## 文件类型 a1.sinks.k2.hdfs.useLocalTimeStamp = false

    3.创建一个Java类,继承Interceptor

    此处的拦截器(Interceptor)逻辑是根据具体需求具体实现的,要重写Interceptor中的多个方法

    public class MultiplexingInterceptorDemo implements Interceptor { //创建成员变量,接收传出的 flagfield private Integer flagfield = 0; //使用有参构造给成员变量 flagfield赋值 public MultiplexingInterceptorDemo(Integer flagfield) { this.flagfield = flagfield; } /** * 拦截器构造实例后的初始化操作 */ @Override public void initialize() { } /** * 接收一个 event ,处理后 ,返回一个event * @param event * @return */ @Override public Event intercept(Event event) { //根据 event 中的数据内容,以及参数中指定的标记字段,产生不同的header值 byte[] body = event.getBody(); //将 字节数组转换为 String String line = new String(body); //将result按照逗号分割 String[] split = line.split(","); //因为flagfield是一个Int类型的值,因此会取出数组中对应的值,即会取出shop/food String flag = split[this.flagfield]; //再将flag放入header中,header是一个hashmap,给后续选择器进行使用 event.getHeaders().put("flag",flag); //最后再将 event 返回 return event; } /** * 接收一个存放event 的 List,返回的也是一个 处理后的存放 event 的 List * @param list * @return */ @Override public List<Event> intercept(List<Event> list) { //可以直接调用上述方法 for (Event event : list) { intercept(event); } //由于会将event取出,处理以后再放回list中,所以可以直接返回list return list; } /** * 收尾操作 */ @Override public void close() { } /** * 程序执行开始,首先运行的是MultiplexingInterceptorDemoBuilder * 然后得到 configure ,得到配置信息 * 再 new 一个实例,传到外部类中 */ public static class MultiplexingInterceptorDemoBuilder implements Interceptor.Builder{ //创建成员变量 Integer flagfield = 0; /** * 用户构建一个拦截器实例 * @return */ @Override public Interceptor build() { //得到flagfield后,new一个实例 return new MultiplexingInterceptorDemo(flagfield); } /** * 获取配置参数的入口 * @param context */ @Override public void configure(Context context) { flagfield = context.getInteger("flagfield"); } } }

    上述代码只为理清逻辑,代码本身还不够健壮

    4.启动Kafka、Zookeeper、hdfs

    由于Kafka依赖于zookeeper,因此要先启动zookeeper,可以创建多个脚本文件,同时启动多台机器的zk和Kafka

    5.启动flume agent

    ## 进入到flume安装目录 bin/flume-ng agent -c conf/ -f agentconf/multiplexing.conf -n a1 -Dflume.root.logger=DEBUG,console

    6.得到分流结果

    6.1Kafka中

    使用消费者消费数据

    bin/kafka-console-consumer.sh --bootstrap-server doitedu01:9092,doitedu02:9092,doitedu03:9092 --topic shop

    6.2HDFS中

     

     

     

     

    Processed: 0.009, SQL: 8