Installation Package
Link: https://pan.baidu.com/s/17RE9B2SSDsHNbSQysOU8hg  Extraction code: rpyk
1) Upload log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar to the /opt/module directory on hadoop102
2) Distribute log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar to hadoop103
[root@hadoop102 module]# xsync log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar
3) Create a script named lg.sh in the /root/bin directory
[root@hadoop102 bin]$ vim lg.sh
4) Write the following content into the script
#!/bin/bash
for i in hadoop102 hadoop103
do
    ssh $i "source /etc/profile ; java -classpath /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.atguigu.appclient.AppMain $1 $2 >/opt/module/test.log &"
done

5) Add execute permission to the script
[root@hadoop102 bin]$ chmod +x lg.sh
6) Run the script
[root@hadoop102 module]$ lg.sh
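To confirm the generator actually started, a small check along the following lines can help. This is only a sketch: it assumes the application writes its log files under /tmp/logs, which is the directory the Flume TAILDIR source in the next section tails, and that the main class name AppMain appears in the process list.

#!/bin/bash
# Hypothetical sanity check: look for the generator JVM on both hosts and
# list the newest files under /tmp/logs (the directory Flume will tail).
for i in hadoop102 hadoop103
do
    echo "===== $i ====="
    ssh $i "ps -ef | grep AppMain | grep -v grep"
    ssh $i "ls -lt /tmp/logs/ 2>/dev/null | head -n 5"
done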
1) Cluster planning

Service                  hadoop102    hadoop103    hadoop104
Flume (log collection)   Flume        Flume

2) Flume configuration analysis
Flume reads the log data directly from the log files; the log file names follow the format app-yyyy-mm-dd.log.
3) The specific Flume configuration is as follows:
(1) On the CM management page, click Flume
(2) Click Instances
(3) Click the Agent entry for hadoop102
(4) Click Configuration
(5) Enter the specific configuration for the Flume Agent
The configuration content is as follows:
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2

# interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder

# selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

# configure channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.byteCapacityBufferPercentage = 20

# configure sink
# start-sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic_start
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.flumeBatchSize = 2000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.channel = c1

# event-sink
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = topic_event
a1.sinks.k2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k2.kafka.flumeBatchSize = 2000
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.channel = c2

Note: com.atguigu.flume.interceptor.LogETLInterceptor and com.atguigu.flume.interceptor.LogTypeInterceptor are the fully qualified class names of custom interceptors; change them to match your own custom interceptor classes.
(6) Repeat the same operation on hadoop103.
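The two Kafka sinks above publish to topic_start and topic_event. If those topics do not exist yet, they can be created up front, for example as sketched below. The command name and flags are assumptions that depend on the Kafka version and distribution (kafka-topics in CDH parcels vs. kafka-topics.sh upstream; --bootstrap-server needs Kafka 2.2+), and the partition/replication values are placeholders to adjust for your cluster.

# Create the topics used by the Kafka sinks (hypothetical settings)
kafka-topics --bootstrap-server hadoop102:9092 --create \
    --topic topic_start --partitions 1 --replication-factor 1
kafka-topics --bootstrap-server hadoop102:9092 --create \
    --topic topic_event --partitions 1 --replication-factor 1

# List topics to confirm they exist
kafka-topics --bootstrap-server hadoop102:9092 --list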
1) As the root user, place flume-interceptor-1.0-SNAPSHOT.jar into the /opt/cloudera/parcels/CDH/lib/flume-ng/lib directory on hadoop102.
2) Distribute the interceptor jar to hadoop103
[root@hadoop102 lib]$ xsync flume-interceptor-1.0-SNAPSHOT.jar
1) Cluster planning
Service                  hadoop102    hadoop103    hadoop104
Flume (consume Kafka)                              Flume

2) Flume configuration analysis
3) The specific Flume configuration is as follows:
(1) On the CM management page, open the Flume configuration for hadoop104 and find the agent name
Set the agent name to a1 and write the following content into the configuration file (Kafka to HDFS):
## components
a1.sources = r1 r2
a1.channels = c1 c2
a1.sinks = k1 k2

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = topic_start

## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.topics = topic_event

## channel1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

## channel2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 100000
a1.channels.c2.transactionCapacity = 10000

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.proxyUser = hive
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second

## sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.proxyUser = hive
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = second

## avoid producing large numbers of small files
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0

## write LZO-compressed output files
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop

## wire sources, channels and sinks together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel = c2
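Before relying on the HDFS sinks, it can be worth confirming that events are actually arriving in the two topics. A quick spot check might look like this (a sketch; the console consumer is named kafka-console-consumer in CDH parcels and kafka-console-consumer.sh upstream):

# Hypothetical spot check: read a few records from each topic
kafka-console-consumer --bootstrap-server hadoop102:9092 \
    --topic topic_start --from-beginning --max-messages 5
kafka-console-consumer --bootstrap-server hadoop102:9092 \
    --topic topic_event --from-beginning --max-messages 5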
1) Make sure services such as Flume and Kafka are running properly
2) Create the /origin_data path in HDFS and change its owner to hive
sudo -u hdfs hadoop fs -mkdir /origin_data
sudo -u hdfs hadoop fs -chown hive:hive /origin_data
3) Invoke the log generation script
lg.sh
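After the generator has run for a short while, the consumer Flume agent should have written files under the dated directories configured in the HDFS sinks. A quick check, as a sketch (run it as a user with read access to HDFS; the date directory will match the current day):

# Hypothetical check that data landed in HDFS
sudo -u hdfs hadoop fs -ls /origin_data/gmall/log/topic_start/
sudo -u hdfs hadoop fs -ls /origin_data/gmall/log/topic_event/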
The raw data layer (ODS) stores the original data: the raw logs are loaded directly and kept exactly as they are, without any processing.
1) Create the data warehouse directory and change its owner
sudo -u hdfs hadoop fs -mkdir /warehouse
sudo -u hdfs hadoop fs -chown hive:hive /warehouse
2) Modify the Hive configuration
3) Start the Hive client (note: start it as the hive user)
sudo -u hive hive
4) Create the gmall database
hive (default)> create database gmall;
Note: if the database already exists and contains data and you need to delete it forcibly, run: drop database gmall cascade;
5) Use the gmall database
hive (default)> use gmall;
For details on Hive's LZO compression, see: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LZO
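The load script in the next step writes into a table named ods_start_log, and the DWD script further below reads a single string column called line from it, so that table has to exist before the load runs. Its DDL is not shown above; the following is only a minimal sketch, assuming one raw JSON line per row and the LZO text input format from the hadoop-lzo project mentioned in the link above (the class names and location path are assumptions to adapt to your environment):

#!/bin/bash
# Hypothetical DDL for the ODS start-log table (a sketch, not the original DDL)
sql="
use gmall;
drop table if exists ods_start_log;
create external table ods_start_log (line string)
partitioned by (dt string)
stored as
    inputformat 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
    outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_start_log/';
"
beeline -u "jdbc:hive2://hadoop102:10000/" -n hive -e "$sql"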
1) Create a script in the /root/bin directory on hadoop102
[root@hadoop102 bin]$ vim ods_log.sh
Write the following content into the script:
#!/bin/bash
# Define a variable to make changes easier
APP=gmall

# If a date argument is supplied, use it; otherwise use the day before today
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

echo "=== log date: $do_date ==="
sql="
load data inpath '/origin_data/gmall/log/topic_start/$do_date' into table "$APP".ods_start_log partition(dt='$do_date');
"
beeline -u "jdbc:hive2://hadoop102:10000/" -n hive -e "$sql"

Note 1: [ -n value ] tests whether a variable's value is empty: it returns true if the value is non-empty and false if it is empty.
Note 2: to see how the date command is used, run [root@hadoop102 ~]$ date --help
2) Add execute permission to the script
[root@hadoop102 bin]$ chmod 777 ods_log.sh
3) Script usage
[root@hadoop102 module]$ ods_log.sh 2019-09-03
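To confirm the load worked, the new partition and its row count can be checked, for example (a sketch; the date matches the run above):

# Hypothetical verification of the loaded ODS partition
beeline -u "jdbc:hive2://hadoop102:10000/" -n hive \
    -e "show partitions gmall.ods_start_log; select count(*) from gmall.ods_start_log where dt='2019-09-03';"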
1) Table creation statement

drop table if exists dwd_start_log;
CREATE EXTERNAL TABLE dwd_start_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `entry` string,
    `open_ad_type` string,
    `action` string,
    `loading_time` string,
    `detail` string,
    `extend1` string
)
PARTITIONED BY (dt string)
location '/warehouse/gmall/dwd/dwd_start_log/';

1) Create a script in the /root/bin directory on hadoop102
[root@hadoop102 bin]$ vim dwd_start_log.sh
Write the following content into the script:
#!/bin/bash
# Define a variable to make changes easier
APP=gmall

# If a date argument is supplied, use it; otherwise use the day before today
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table "$APP".dwd_start_log
PARTITION (dt='$do_date')
select
    get_json_object(line,'$.mid') mid_id,
    get_json_object(line,'$.uid') user_id,
    get_json_object(line,'$.vc') version_code,
    get_json_object(line,'$.vn') version_name,
    get_json_object(line,'$.l') lang,
    get_json_object(line,'$.sr') source,
    get_json_object(line,'$.os') os,
    get_json_object(line,'$.ar') area,
    get_json_object(line,'$.md') model,
    get_json_object(line,'$.ba') brand,
    get_json_object(line,'$.sv') sdk_version,
    get_json_object(line,'$.g') gmail,
    get_json_object(line,'$.hw') height_width,
    get_json_object(line,'$.t') app_time,
    get_json_object(line,'$.nw') network,
    get_json_object(line,'$.ln') lng,
    get_json_object(line,'$.la') lat,
    get_json_object(line,'$.entry') entry,
    get_json_object(line,'$.open_ad_type') open_ad_type,
    get_json_object(line,'$.action') action,
    get_json_object(line,'$.loading_time') loading_time,
    get_json_object(line,'$.detail') detail,
    get_json_object(line,'$.extend1') extend1
from "$APP".ods_start_log
where dt='$do_date';
"

beeline -u "jdbc:hive2://hadoop102:10000/" -n hive -e "$sql"

2) Add execute permission to the script
[root@hadoop102 bin]$ chmod 777 dwd_start_log.sh
3) Script usage
[root@hadoop102 module]$ dwd_start_log.sh 2019-09-03
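A quick way to spot-check the parsed DWD rows, as a sketch (the date matches the run above):

# Hypothetical spot check of the parsed start-log fields
beeline -u "jdbc:hive2://hadoop102:10000/" -n hive \
    -e "select mid_id, user_id, app_time from gmall.dwd_start_log where dt='2019-09-03' limit 5;"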
Goal: produce a detail record for every device active during the current day, week and month.
1) Table creation statement
drop table if exists dws_uv_detail_day;
create external table dws_uv_detail_day
(
    `mid_id` string COMMENT 'unique device ID',
    `user_id` string COMMENT 'user ID',
    `version_code` string COMMENT 'app version code',
    `version_name` string COMMENT 'app version name',
    `lang` string COMMENT 'system language',
    `source` string COMMENT 'channel number',
    `os` string COMMENT 'Android system version',
    `area` string COMMENT 'region',
    `model` string COMMENT 'phone model',
    `brand` string COMMENT 'phone brand',
    `sdk_version` string COMMENT 'sdkVersion',
    `gmail` string COMMENT 'gmail',
    `height_width` string COMMENT 'screen height and width',
    `app_time` string COMMENT 'time the client log was produced',
    `network` string COMMENT 'network mode',
    `lng` string COMMENT 'longitude',
    `lat` string COMMENT 'latitude'
)
partitioned by (dt string)
stored as parquet
location '/warehouse/gmall/dws/dws_uv_detail_day';

1) Create a script in the /root/bin directory on hadoop102
[root@hadoop102 bin]$ vim dws_log.sh
Write the following content into the script:
#!/bin/bash
# Define a variable to make changes easier
APP=gmall

# If a date argument is supplied, use it; otherwise use the day before today
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table "$APP".dws_uv_detail_day
partition(dt='$do_date')
select
    mid_id,
    concat_ws('|', collect_set(user_id)) user_id,
    concat_ws('|', collect_set(version_code)) version_code,
    concat_ws('|', collect_set(version_name)) version_name,
    concat_ws('|', collect_set(lang)) lang,
    concat_ws('|', collect_set(source)) source,
    concat_ws('|', collect_set(os)) os,
    concat_ws('|', collect_set(area)) area,
    concat_ws('|', collect_set(model)) model,
    concat_ws('|', collect_set(brand)) brand,
    concat_ws('|', collect_set(sdk_version)) sdk_version,
    concat_ws('|', collect_set(gmail)) gmail,
    concat_ws('|', collect_set(height_width)) height_width,
    concat_ws('|', collect_set(app_time)) app_time,
    concat_ws('|', collect_set(network)) network,
    concat_ws('|', collect_set(lng)) lng,
    concat_ws('|', collect_set(lat)) lat
from "$APP".dwd_start_log
where dt='$do_date'
group by mid_id;
"

beeline -u "jdbc:hive2://hadoop102:10000/" -n hive -e "$sql"

2) Add execute permission to the script
[root@hadoop102 bin]$ chmod 777 dws_log.sh
3) Script usage
[root@hadoop102 module]$ dws_log.sh 2019-09-03
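Since the DWS table keeps one row per device active on a given day, a simple count previews the figure the ADS layer will report (a sketch; the date matches the run above):

# Hypothetical check: one row per device active on the given day
beeline -u "jdbc:hive2://hadoop102:10000/" -n hive \
    -e "select count(*) from gmall.dws_uv_detail_day where dt='2019-09-03';"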
Goal: the number of devices active on the current day.
1) Table creation statement
drop table if exists ads_uv_count;
create external table ads_uv_count(
    `dt` string COMMENT 'statistics date',
    `day_count` bigint COMMENT 'number of active devices that day'
) COMMENT 'active device count'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_uv_count/';

1) Create a script in the /root/bin directory on hadoop102
[root@hadoop102 bin]$ vim ads_uv_log.sh
Write the following content into the script:
#!/bin/bash
# Define a variable to make changes easier
APP=gmall

# If a date argument is supplied, use it; otherwise use the day before today
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
set hive.exec.dynamic.partition.mode=nonstrict;

insert into table "$APP".ads_uv_count
select
    '$do_date' dt,
    daycount.ct
from
(
    select
        '$do_date' dt,
        count(*) ct
    from "$APP".dws_uv_detail_day
    where dt='$do_date'
) daycount;
"

beeline -u "jdbc:hive2://hadoop102:10000/" -n hive -e "$sql"

2) Add execute permission to the script
[root@hadoop102 bin]$ chmod 777 ads_uv_log.sh
3) Script usage
[root@hadoop102 module]$ ads_uv_log.sh 2019-09-03
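Finally, the daily active-device count written by the script can be read back, for example (a sketch):

# Hypothetical final check of the ADS result
beeline -u "jdbc:hive2://hadoop102:10000/" -n hive \
    -e "select * from gmall.ads_uv_count;"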