Big Data with CDH Data Warehouse (8) | Building the User Behavior Data Warehouse


    Table of Contents

    Generating User Behavior Logs
    Importing Log Data into the Warehouse
        Log Collection Flume Configuration
        Flume Interceptors
        Kafka Consumer Flume Configuration
        Generating Mock Logs
    ODS Layer
        Creating the Database
        Creating the Startup Log Table ods_start_log
        ODS Layer Data Load Script
    DWD Layer Startup Table Data Parsing
        Creating the Startup Table
        DWD Layer Startup Table Data Load Script
    DWS Layer (Requirement: Daily Active Users)
        Daily Active Device Detail
        DWS Layer Data Load Script
    ADS Layer (Requirement: Daily Active Users)
        Active Device Count
        ADS Layer Data Load Script

    Generating User Behavior Logs

    Installation package

    Link: https://pan.baidu.com/s/17RE9B2SSDsHNbSQysOU8hg  Extraction code: rpyk

    1) Upload log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar to the /opt/module directory on hadoop102.
    2) Distribute log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar to hadoop103:

    [root@hadoop102 module]# xsync log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar

    3) Create the script lg.sh in the /root/bin directory:

    [root@hadoop102 bin]$ vim lg.sh

    4) Write the following content in the script:

    #!/bin/bash
    for i in hadoop102 hadoop103
    do
        ssh $i "source /etc/profile ; java -classpath /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.atguigu.appclient.AppMain $1 $2 >/opt/module/test.log &"
    done

    5) Make the script executable:

    [root@hadoop102 bin]$ chmod +x lg.sh

    6) Run the script:

    [root@hadoop102 module]$ lg.sh
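    A quick sanity check that the generator actually started on both hosts is sketched below: it looks for the Java process, the redirected output file, and the generated log files under /tmp/logs (the directory the Flume agents tail in the next section). Paths follow the script above; adjust if your layout differs.

    for i in hadoop102 hadoop103
    do
        echo "===== $i ====="
        # the generator process started by lg.sh
        ssh $i "ps -ef | grep log-collector | grep -v grep"
        # stdout of the generator, as redirected by lg.sh
        ssh $i "ls -l /opt/module/test.log"
        # generated app-yyyy-mm-dd.log files
        ssh $i "ls -l /tmp/logs/"
    done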

    Importing Log Data into the Warehouse

    Log Collection Flume Configuration

    1) Cluster planning

    Service                 | hadoop102 | hadoop103 | hadoop104
    Flume (log collection)  | Flume     | Flume     |

    2) Flume configuration analysis: Flume tails the log files directly; the log files are named app-yyyy-mm-dd.log.
    3) The detailed Flume configuration is as follows:
    (1) In the CM management page, click Flume.
    (2) Click Instances.

    (3) Click the Agent entry for hadoop102.

    (4) Click Configuration.
    (5) Enter the detailed configuration for the Flume Agent.

    The content is as follows:

    a1.sources=r1
    a1.channels=c1 c2
    a1.sinks=k1 k2

    # configure source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
    a1.sources.r1.fileHeader = true
    a1.sources.r1.channels = c1 c2

    # interceptor
    a1.sources.r1.interceptors = i1 i2
    a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogETLInterceptor$Builder
    a1.sources.r1.interceptors.i2.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder

    # selector
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = topic
    a1.sources.r1.selector.mapping.topic_start = c1
    a1.sources.r1.selector.mapping.topic_event = c2

    # configure channels
    a1.channels.c1.type = memory
    a1.channels.c1.capacity=10000
    a1.channels.c1.byteCapacityBufferPercentage=20
    a1.channels.c2.type = memory
    a1.channels.c2.capacity=10000
    a1.channels.c2.byteCapacityBufferPercentage=20

    # configure sinks
    # start-sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = topic_start
    a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.sinks.k1.kafka.flumeBatchSize = 2000
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.channel = c1

    # event-sink
    a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k2.kafka.topic = topic_event
    a1.sinks.k2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.sinks.k2.kafka.flumeBatchSize = 2000
    a1.sinks.k2.kafka.producer.acks = 1
    a1.sinks.k2.channel = c2

    Note: com.atguigu.flume.interceptor.LogETLInterceptor and com.atguigu.flume.interceptor.LogTypeInterceptor are the fully qualified class names of the custom interceptors; change them to match your own interceptor classes.
    (6) Repeat the same steps on hadoop103.
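    The two Kafka sinks above publish to topic_start and topic_event. If those topics do not already exist (and automatic topic creation is disabled on your Kafka brokers), create them before starting the agents. A minimal sketch using the kafka-topics command is shown below; the partition and replication-factor values are illustrative only, and on newer Kafka versions --bootstrap-server hadoop102:9092 replaces --zookeeper.

    kafka-topics --zookeeper hadoop102:2181 --create --topic topic_start --partitions 1 --replication-factor 1
    kafka-topics --zookeeper hadoop102:2181 --create --topic topic_event --partitions 1 --replication-factor 1
    # confirm the topics exist
    kafka-topics --zookeeper hadoop102:2181 --list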

    Flume Interceptors

    1) As the root user, place flume-interceptor-1.0-SNAPSHOT.jar into the /opt/cloudera/parcels/CDH/lib/flume-ng/lib directory on hadoop102.
    2) Distribute the jar to hadoop103:

    [root@hadoop102 lib]$ xsync flume-interceptor-1.0-SNAPSHOT.jar
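    To confirm the jar is in place and actually contains the interceptor classes referenced in the agent configuration, something like the following can be run (assuming a JDK, and therefore the jar tool, is available on both hosts):

    for i in hadoop102 hadoop103
    do
        echo "===== $i ====="
        ssh $i "ls -l /opt/cloudera/parcels/CDH/lib/flume-ng/lib/flume-interceptor-1.0-SNAPSHOT.jar"
        # LogETLInterceptor and LogTypeInterceptor must be inside the jar
        ssh $i "jar tf /opt/cloudera/parcels/CDH/lib/flume-ng/lib/flume-interceptor-1.0-SNAPSHOT.jar | grep Interceptor"
    done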

    Kafka Consumer Flume Configuration

    1) Cluster planning

    Service                 | hadoop102 | hadoop103 | hadoop104
    Flume (Kafka consumer)  |           |           | Flume

    2) Flume configuration analysis
    3) The detailed Flume configuration is as follows:
    (1) In the CM management page, open the Flume configuration on hadoop104 and find the agent name:

    a1

    Then fill in the following content in the configuration file (kafka-hdfs):

    ## components
    a1.sources=r1 r2
    a1.channels=c1 c2
    a1.sinks=k1 k2

    ## source1
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize = 5000
    a1.sources.r1.batchDurationMillis = 2000
    a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.sources.r1.kafka.topics=topic_start

    ## source2
    a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r2.batchSize = 5000
    a1.sources.r2.batchDurationMillis = 2000
    a1.sources.r2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.sources.r2.kafka.topics=topic_event

    ## channel1
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=100000
    a1.channels.c1.transactionCapacity=10000

    ## channel2
    a1.channels.c2.type=memory
    a1.channels.c2.capacity=100000
    a1.channels.c2.transactionCapacity=10000

    ## sink1
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.proxyUser=hive
    a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
    a1.sinks.k1.hdfs.filePrefix = logstart-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = second

    ## sink2
    a1.sinks.k2.type = hdfs
    a1.sinks.k2.hdfs.proxyUser=hive
    a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
    a1.sinks.k2.hdfs.filePrefix = logevent-
    a1.sinks.k2.hdfs.round = true
    a1.sinks.k2.hdfs.roundValue = 10
    a1.sinks.k2.hdfs.roundUnit = second

    ## avoid generating large numbers of small files
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 134217728
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k2.hdfs.rollInterval = 10
    a1.sinks.k2.hdfs.rollSize = 134217728
    a1.sinks.k2.hdfs.rollCount = 0

    ## output file type: compressed stream (LZO)
    a1.sinks.k1.hdfs.fileType = CompressedStream
    a1.sinks.k2.hdfs.fileType = CompressedStream
    a1.sinks.k1.hdfs.codeC = lzop
    a1.sinks.k2.hdfs.codeC = lzop

    ## wire sources and sinks to channels
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel= c1
    a1.sources.r2.channels = c2
    a1.sinks.k2.channel= c2

    Generating Mock Logs

    1) Make sure that Flume, Kafka, and the other required services are running.

    2) Create the /origin_data path in HDFS and change its owner to hive:

    sudo -u hdfs hadoop fs -mkdir /origin_data
    sudo -u hdfs hadoop fs -chown hive:hive /origin_data

    3) Run the log generation script:

    lg.sh
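    After the generator has run, it is worth confirming that data is flowing end to end. The sketch below reads a few messages from topic_start and lists the dated directories that the hadoop104 agent should be writing under /origin_data; host names and paths follow the configuration above.

    # peek at the startup-log topic (press Ctrl+C to stop)
    kafka-console-consumer --bootstrap-server hadoop102:9092 --topic topic_start --from-beginning

    # the HDFS sinks should be creating dated directories here
    hadoop fs -ls /origin_data/gmall/log/topic_start/
    hadoop fs -ls /origin_data/gmall/log/topic_event/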

    ODS Layer

    The operational data store (raw data) layer: it stores the original data as-is, loading the raw logs and data directly without any transformation.

    Creating the Database

    1) Create the data warehouse directory and change its owner:

    sudo -u hdfs hadoop fs -mkdir /warehouse
    sudo -u hdfs hadoop fs -chown hive:hive /warehouse

    2) Modify the Hive configuration.

    3) Start the Hive client (note: it must be started as the hive user):

    sudo -u hive hive

    4) Create the gmall database:

    hive (default)> create database gmall;

    Note: if the database already exists and contains data, force-drop it with: drop database gmall cascade;
    5) Switch to the gmall database:

    hive (default)> use gmall;

    Creating the Startup Log Table ods_start_log

    hive (gmall)>
    drop table if exists ods_start_log;
    CREATE EXTERNAL TABLE ods_start_log (`line` string)
    PARTITIONED BY (`dt` string)
    STORED AS
        INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
        OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION '/warehouse/gmall/ods/ods_start_log';

    For more on LZO compression in Hive, see: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LZO
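    Note that large LZO files are only splittable by MapReduce once an LZO index has been built alongside them. A minimal sketch of indexing one loaded partition with the hadoop-lzo DistributedLzoIndexer is shown below; the jar path is an assumption based on a typical CDH GPL Extras parcel layout, so adjust it to your installation.

    # build .index files next to the LZO data of one partition (jar path is an example)
    hadoop jar /opt/cloudera/parcels/GPLEXTRAS/lib/hadoop/lib/hadoop-lzo.jar \
        com.hadoop.compression.lzo.DistributedLzoIndexer \
        /warehouse/gmall/ods/ods_start_log/dt=2019-09-03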

    ODS Layer Data Load Script

    1) Create the script in the /root/bin directory on hadoop102:

    [root@hadoop102 bin]$ vim ods_log.sh

    Write the following content in the script:

    #!/bin/bash

    # define the database name in a variable for easy modification
    APP=gmall

    # if a date argument is given, use it; otherwise default to the day before today
    if [ -n "$1" ] ; then
        do_date=$1
    else
        do_date=`date -d "-1 day" +%F`
    fi

    echo "=== log date: $do_date ==="

    sql="
    load data inpath '/origin_data/gmall/log/topic_start/$do_date' into table "$APP".ods_start_log partition(dt='$do_date');
    "

    beeline -u "jdbc:hive2://hadoop102:10000/" -n hive -e "$sql"

    Note 1: [ -n value ] tests whether the value is non-empty – non-empty returns true, empty returns false.
    Note 2: to see how the date command is used, run [root@hadoop102 ~]$ date --help (a short demonstration follows step 3 below).
    2) Make the script executable:

    [root@hadoop102 bin]$ chmod 777 ods_log.sh

    3) Script usage:

    [root@hadoop102 module]$ ods_log.sh 2019-09-03
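    As noted above, the script falls back to yesterday's date when no argument is given; the lines below demonstrate that default on its own and then check that the partition really was loaded (a minimal sketch against the gmall database and the 2019-09-03 run above).

    # the default date used by the script: yesterday in yyyy-MM-dd format
    date -d "-1 day" +%F

    # confirm the partition exists and contains rows
    beeline -u "jdbc:hive2://hadoop102:10000/" -n hive \
        -e "show partitions gmall.ods_start_log; select count(*) from gmall.ods_start_log where dt='2019-09-03';"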

    DWD Layer Startup Table Data Parsing

    Creating the Startup Table

    1) Table creation statement

    drop table if exists dwd_start_log;
    CREATE EXTERNAL TABLE dwd_start_log(
        `mid_id` string,
        `user_id` string,
        `version_code` string,
        `version_name` string,
        `lang` string,
        `source` string,
        `os` string,
        `area` string,
        `model` string,
        `brand` string,
        `sdk_version` string,
        `gmail` string,
        `height_width` string,
        `app_time` string,
        `network` string,
        `lng` string,
        `lat` string,
        `entry` string,
        `open_ad_type` string,
        `action` string,
        `loading_time` string,
        `detail` string,
        `extend1` string
    )
    PARTITIONED BY (dt string)
    location '/warehouse/gmall/dwd/dwd_start_log/';

    DWD Layer Startup Table Data Load Script

    1) Create the script in the /root/bin directory on hadoop102:

    [root@hadoop102 bin]$ vim dwd_start_log.sh

    Write the following content in the script:

    #!/bin/bash

    # define the database name in a variable for easy modification
    APP=gmall

    # if a date argument is given, use it; otherwise default to the day before today
    if [ -n "$1" ] ; then
        do_date=$1
    else
        do_date=`date -d "-1 day" +%F`
    fi

    sql="
    set hive.exec.dynamic.partition.mode=nonstrict;

    insert overwrite table "$APP".dwd_start_log
    PARTITION (dt='$do_date')
    select
        get_json_object(line,'$.mid') mid_id,
        get_json_object(line,'$.uid') user_id,
        get_json_object(line,'$.vc') version_code,
        get_json_object(line,'$.vn') version_name,
        get_json_object(line,'$.l') lang,
        get_json_object(line,'$.sr') source,
        get_json_object(line,'$.os') os,
        get_json_object(line,'$.ar') area,
        get_json_object(line,'$.md') model,
        get_json_object(line,'$.ba') brand,
        get_json_object(line,'$.sv') sdk_version,
        get_json_object(line,'$.g') gmail,
        get_json_object(line,'$.hw') height_width,
        get_json_object(line,'$.t') app_time,
        get_json_object(line,'$.nw') network,
        get_json_object(line,'$.ln') lng,
        get_json_object(line,'$.la') lat,
        get_json_object(line,'$.entry') entry,
        get_json_object(line,'$.open_ad_type') open_ad_type,
        get_json_object(line,'$.action') action,
        get_json_object(line,'$.loading_time') loading_time,
        get_json_object(line,'$.detail') detail,
        get_json_object(line,'$.extend1') extend1
    from "$APP".ods_start_log
    where dt='$do_date';
    "

    beeline -u "jdbc:hive2://hadoop102:10000/" -n hive -e "$sql"

    2) Make the script executable:

    [root@hadoop102 bin]$ chmod 777 dwd_start_log.sh

    3) Script usage:

    [root@hadoop102 module]$ dwd_start_log.sh 2019-09-03
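    The script above relies entirely on Hive's get_json_object to pull individual fields out of the raw JSON line. Below is a minimal standalone example of what it returns, run through beeline the same way the scripts do; the JSON literal and field values are made up purely for illustration.

    beeline -u "jdbc:hive2://hadoop102:10000/" -n hive \
        -e "select get_json_object('{\"mid\":\"m100\",\"uid\":\"u200\"}','\$.mid') as mid_id,
                   get_json_object('{\"mid\":\"m100\",\"uid\":\"u200\"}','\$.uid') as user_id;"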

    DWS Layer (Requirement: Daily Active Users)

    Goal: produce per-device detail records for devices active during the current day, week, and month.

    Daily Active Device Detail

    1) Table creation statement

    drop table if exists dws_uv_detail_day;
    create external table dws_uv_detail_day
    (
        `mid_id` string COMMENT 'unique device ID',
        `user_id` string COMMENT 'user ID',
        `version_code` string COMMENT 'app version code',
        `version_name` string COMMENT 'app version name',
        `lang` string COMMENT 'system language',
        `source` string COMMENT 'channel ID',
        `os` string COMMENT 'Android OS version',
        `area` string COMMENT 'region',
        `model` string COMMENT 'phone model',
        `brand` string COMMENT 'phone brand',
        `sdk_version` string COMMENT 'sdkVersion',
        `gmail` string COMMENT 'gmail',
        `height_width` string COMMENT 'screen height and width',
        `app_time` string COMMENT 'time the client log was produced',
        `network` string COMMENT 'network type',
        `lng` string COMMENT 'longitude',
        `lat` string COMMENT 'latitude'
    )
    partitioned by (dt string)
    stored as parquet
    location '/warehouse/gmall/dws/dws_uv_detail_day';

    DWS Layer Data Load Script

    1) Create the script in the /root/bin directory on hadoop102:

    [root@hadoop102 bin]$ vim dws_log.sh

    Write the following content in the script:

    #!/bin/bash

    # define the database name in a variable for easy modification
    APP=gmall

    # if a date argument is given, use it; otherwise default to the day before today
    if [ -n "$1" ] ; then
        do_date=$1
    else
        do_date=`date -d "-1 day" +%F`
    fi

    sql="
    set hive.exec.dynamic.partition.mode=nonstrict;

    insert overwrite table "$APP".dws_uv_detail_day
    partition(dt='$do_date')
    select
        mid_id,
        concat_ws('|', collect_set(user_id)) user_id,
        concat_ws('|', collect_set(version_code)) version_code,
        concat_ws('|', collect_set(version_name)) version_name,
        concat_ws('|', collect_set(lang)) lang,
        concat_ws('|', collect_set(source)) source,
        concat_ws('|', collect_set(os)) os,
        concat_ws('|', collect_set(area)) area,
        concat_ws('|', collect_set(model)) model,
        concat_ws('|', collect_set(brand)) brand,
        concat_ws('|', collect_set(sdk_version)) sdk_version,
        concat_ws('|', collect_set(gmail)) gmail,
        concat_ws('|', collect_set(height_width)) height_width,
        concat_ws('|', collect_set(app_time)) app_time,
        concat_ws('|', collect_set(network)) network,
        concat_ws('|', collect_set(lng)) lng,
        concat_ws('|', collect_set(lat)) lat
    from "$APP".dwd_start_log
    where dt='$do_date'
    group by mid_id;
    "

    beeline -u "jdbc:hive2://hadoop102:10000/" -n hive -e "$sql"

    2) Make the script executable:

    [root@hadoop102 bin]$ chmod 777 dws_log.sh

    3) Script usage:

    [root@hadoop102 module]$ dws_log.sh 2019-09-03
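    Because the load groups by mid_id, a device that started the app several times in one day collapses to a single row, with the other columns deduplicated by collect_set and joined by concat_ws('|', ...). The small check below shows that effect on one column, assuming the 2019-09-03 DWD partition loaded earlier is present:

    beeline -u "jdbc:hive2://hadoop102:10000/" -n hive \
        -e "select mid_id, concat_ws('|', collect_set(network)) network, count(*) starts
            from gmall.dwd_start_log
            where dt='2019-09-03'
            group by mid_id
            limit 5;"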

    ADS Layer (Requirement: Daily Active Users)

    Goal: the number of devices active on the current day.

    Active Device Count

    1) Table creation statement

    drop table if exists ads_uv_count;
    create external table ads_uv_count(
        `dt` string COMMENT 'statistics date',
        `day_count` bigint COMMENT 'number of active users that day'
    ) COMMENT 'active device count'
    row format delimited fields terminated by '\t'
    location '/warehouse/gmall/ads/ads_uv_count/';

    ADS Layer Data Load Script

    1) Create the script in the /root/bin directory on hadoop102:

    [root@hadoop102 bin]$ vim ads_uv_log.sh

    Write the following content in the script:

    #!/bin/bash

    # define the database name in a variable for easy modification
    APP=gmall

    # if a date argument is given, use it; otherwise default to the day before today
    if [ -n "$1" ] ; then
        do_date=$1
    else
        do_date=`date -d "-1 day" +%F`
    fi

    sql="
    set hive.exec.dynamic.partition.mode=nonstrict;

    insert into table "$APP".ads_uv_count
    select
        '$do_date' dt,
        daycount.ct
    from
    (
        select
            '$do_date' dt,
            count(*) ct
        from "$APP".dws_uv_detail_day
        where dt='$do_date'
    ) daycount;
    "

    beeline -u "jdbc:hive2://hadoop102:10000/" -n hive -e "$sql"

    2) Make the script executable:

    [root@hadoop102 bin]$ chmod 777 ads_uv_log.sh

    3) Script usage:

    [root@hadoop102 module]$ ads_uv_log.sh 2019-09-03
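    Finally, the daily active count can be read straight back from the ADS table to confirm the whole pipeline produced a result (same beeline connection the scripts use; assumes the 2019-09-03 run above):

    beeline -u "jdbc:hive2://hadoop102:10000/" -n hive \
        -e "select * from gmall.ads_uv_count where dt='2019-09-03';"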