1) Local login (no authentication required)
[root@hadoop102 ~]# kadmin.local
Authenticating as principal root/admin@HADOOP.COM with password.
kadmin.local:
2) Remote login (requires principal authentication; authenticate with the admin principal created earlier)
[root@hadoop103 ~]# kadmin
Authenticating as principal admin/admin@HADOOP.COM with password.
Password for admin/admin@HADOOP.COM:
kadmin:
To exit, type: exit
Kerberos offers two ways to authenticate: entering the principal's password, or using a keytab key file. The two methods cannot be used at the same time for a single authentication.
1) Authenticate a principal with kinit
[root@hadoop102 ~]# kinit donglin/donglin
Password for donglin/donglin@HADOOP.COM:
2) View the authentication credentials
[root@hadoop102 ~]# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: donglin/donglin@HADOOP.COM

Valid starting       Expires              Service principal
10/27/2019 18:23:57  10/28/2019 18:23:57  krbtgt/HADOOP.COM@HADOOP.COM
        renew until 11/03/2019 18:23:57
1) Generate a keytab file for the principal donglin/donglin and write it to /root/donglin.keytab
[root@hadoop102 ~]# kadmin.local -q "xst -k /root/donglin.keytab donglin/donglin@HADOOP.COM"
2) Authenticate with the keytab
[root@hadoop102 ~]# kinit -kt /root/donglin.keytab donglin/donglin
3) View the authentication credentials
[root@hadoop102 ~]# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: donglin/donglin@HADOOP.COM

Valid starting     Expires            Service principal
08/27/19 15:41:28  08/28/19 15:41:28  krbtgt/HADOOP.COM@HADOOP.COM
        renew until 08/27/19 15:41:28
Kerberos encryption types: aes128-cts, des3-hmac-sha1, arcfour-hmac
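In scripts it is convenient to combine the two checks: only re-authenticate when the current ticket cache is missing or expired. A minimal sketch, assuming the donglin.keytab generated above:
#!/bin/bash
# klist -s exits non-zero when there is no valid ticket in the cache,
# so kinit -kt only runs when a fresh ticket is actually needed.
if ! klist -s; then
    kinit -kt /root/donglin.keytab donglin/donglin@HADOOP.COM
fi
klist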
List the principals in the database
kadmin.local -q "list_principals"
After Kerberos is enabled, communication between systems (e.g. Flume to Kafka) and between users and systems (e.g. a user accessing HDFS) must first pass security authentication before any communication can take place. Therefore, once Kerberos is enabled, every script used in the data warehouse must add an extra authentication step in order to keep working.
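The effect is easy to see interactively. A hedged example, reusing the donglin keytab from above (the exact error text varies by version, but unauthenticated access fails with a GSS/Kerberos error):
# Destroy any existing ticket, then try to access HDFS without credentials.
kdestroy
hadoop fs -ls /        # fails, typically with "GSSException: No valid credentials provided"
# Authenticate with the keytab and retry.
kinit -kt /root/donglin.keytab donglin/donglin
hadoop fs -ls /        # succeeds once a valid ticket exists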
After Kerberos security authentication is enabled, everyday access to services (for example accessing HDFS or consuming a Kafka topic) must be preceded by authentication.
1) Create a user principal/instance in the Kerberos database
[root@hadoop102 ~]# kadmin.local -q "addprinc hive/hive@HADOOP.COM"
2) Authenticate as the user
[root@hadoop102 ~]# kinit hive/hive@HADOOP.COM
3) Access HDFS
[root@hadoop102 ~]# hadoop fs -ls /
Found 4 items
drwxr-xr-x   - hive hive             0 2019-10-02 01:29 /origin_data
drwxrwxrwt   - hdfs supergroup       0 2019-10-03 00:20 /tmp
drwxr-xr-x   - hdfs supergroup       0 2019-10-02 01:35 /user
drwxr-xr-x   - hive hive             0 2019-10-02 01:38 /warehouse
4) Hive query
[root@hadoop102 ~]# hive
WARNING: Use "yarn jar" to launch YARN applications.
Logging initialized using configuration in jar:file:/opt/cloudera/parcels/CDH-6.2.1-1.cdh6.2.1.p0.1425774/jars/hive-common-2.1.1-cdh6.2.1.jar!/hive-log4j2.properties Async: false
WARNING: Hive CLI is deprecated and migration to Beeline is recommended.
hive>
5) Consume a Kafka topic
(1) Modify the Kafka configuration
① In the Kafka configuration items, search for "security.inter.broker.protocol" and set it to SASL_PLAINTEXT.
② In the Kafka configuration items, search for "ssl.client.auth" and set it to none.
(2) Create the jaas.conf file
[root@hadoop102 hive]# vim /var/lib/hive/jaas.conf
The file content is as follows:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true;
};
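This JAAS entry relies on the ticket cache, so kinit must have been run in the same session. If the client should authenticate on its own (for example inside a long-running process), the standard Krb5LoginModule options can point at a keytab instead. A sketch, assuming the hive keytab that is generated later in this section:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/var/lib/hive/hive.keytab"
principal="hive/hive@HADOOP.COM";
};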
(3) Create the consumer.properties file
[root@hadoop102 conf]# vim /etc/kafka/conf/consumer.properties
The file content is as follows:
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
(4) Declare the path of the jaas.conf file
[root@hadoop102 conf]# export KAFKA_OPTS="-Djava.security.auth.login.config=/var/lib/hive/jaas.conf"
(5) Consume Kafka topic data with kafka-console-consumer
[root@hadoop102 ~]# kafka-console-consumer --bootstrap-server hadoop102:9092 --topic topic_start --from-beginning --consumer.config /etc/kafka/conf/consumer.properties
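The producer side works the same way. A hedged sketch for pushing a few test records into topic_start with kafka-console-producer, assuming a producer.properties file that you create yourself with the same two SASL settings, and that KAFKA_OPTS is still exported as above:
# /etc/kafka/conf/producer.properties (assumed file, same settings as the consumer):
#   security.protocol=SASL_PLAINTEXT
#   sasl.kerberos.service.name=kafka
[root@hadoop102 ~]# kafka-console-producer --broker-list hadoop102:9092 --topic topic_start --producer.config /etc/kafka/conf/producer.properties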
6) HDFS Web UI authentication in the browser
After CDH is configured for Kerberos, the following situation occurs: the Web UI on port 9870 can be opened, but directories and files cannot be browsed, because the local environment has not been authenticated. The steps below set up local authentication.
Note: because of browser limitations, Firefox is used here; other browsers such as Chrome or IE will run into problems.
(1) Download Firefox
(2) Configure the browser
① Open Firefox and enter about:config in the address bar to open the settings page.
② Search for "network.negotiate-auth.trusted-uris" and set its value to your server host names.
③ Search for "network.auth.use-sspi" and double-click to set the value to false.
(3) Install kfw
① Install the provided kfw-4.1-amd64.msi.
② Copy the contents of the cluster's /etc/krb5.conf into C:\ProgramData\MIT\Kerberos5\krb5.ini and remove the path-related settings. The result looks like this:
[logging]

[libdefaults]
default_realm = HADOOP.COM
dns_lookup_realm = false
dns_lookup_kdc = false
ticket_lifetime = 24h
renew_lifetime = 7d
forwardable = true
udp_preference_limit = 1

[realms]
HADOOP.COM = {
kdc = hadoop102
admin_server = hadoop102
}

[domain_realm]
③ Open MIT Kerberos and enter the principal name and password.
(4) Test
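Besides the browser, SPNEGO access can also be checked from any cluster node that already holds a valid ticket. A hedged sketch, assuming WebHDFS is enabled and the NameNode Web UI is on hadoop102:9870:
# A ticket must already exist in the cache (for example from the kinit hive/hive@HADOOP.COM above).
# curl negotiates with that ticket; a JSON FileStatuses listing of / means SPNEGO works.
[root@hadoop102 ~]# curl --negotiate -u : "http://hadoop102:9870/webhdfs/v1/?op=LISTSTATUS"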
1) Log-collection Flume configuration
The log-collection Flume sends data to Kafka, so this Flume acts as a Kafka producer and needs the Kafka client security authentication described above. No manual configuration is required here: after Kerberos is enabled, CM configures it automatically.
2) Kafka-consuming Flume configuration
The Kafka-consuming Flume transfers data from Kafka to HDFS, so it acts as a Kafka consumer and also needs the Kafka client security authentication (again no manual steps; CM configures it automatically). In addition, it needs HDFS client security authentication, which must be configured manually.
(1) Generate a keytab file for the hive user
There are two ways to authenticate a user: entering a password or using a keytab key file. Here the keytab key file is used.
[root@hadoop102 hive]# kadmin.local -q "xst -k /var/lib/hive/hive.keytab hive/hive@HADOOP.COM"
(2) Add read permission
chmod +r /var/lib/hive/hive.keytab
(3) Distribute the keytab file
xsync /var/lib/hive/hive.keytab
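Before touching the Flume configuration, it is worth confirming on each agent host that the distributed keytab is readable and actually works for the hive principal. A minimal sketch:
# List the entries stored in the keytab (principal and key versions).
klist -kt /var/lib/hive/hive.keytab
# Authenticate non-interactively with the keytab and try an HDFS operation.
kinit -kt /var/lib/hive/hive.keytab hive/hive@HADOOP.COM
hadoop fs -ls /origin_data
Note that by default kadmin's xst/ktadd randomizes the principal's key, so password logins for hive/hive performed earlier may stop working; kadmin.local supports -norandkey if that matters.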
(4) Modify the Flume agent configuration file
## Components
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_start

## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.topics=topic_event

## channel1
a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=10000

## channel2
a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=10000

## sink1
a1.sinks.k1.type = hdfs
#a1.sinks.k1.hdfs.proxyUser=hive
a1.sinks.k1.hdfs.kerberosPrincipal=hive/hive@HADOOP.COM
a1.sinks.k1.hdfs.kerberosKeytab=/var/lib/hive/hive.keytab
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second

## sink2
a1.sinks.k2.type = hdfs
#a1.sinks.k2.hdfs.proxyUser=hive
a1.sinks.k2.hdfs.kerberosPrincipal=hive/hive@HADOOP.COM
a1.sinks.k2.hdfs.kerberosKeytab=/var/lib/hive/hive.keytab
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = second

## Avoid producing large numbers of small files
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0

## Output file type (compressed stream, lzop codec)
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop

## Wiring
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
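If you want to try this agent outside of CM (for example to debug on a single host), a hedged sketch of a standalone launch follows; the configuration file path and conf directory are hypothetical, and because CM is not injecting the Kafka security settings in that case, they are added explicitly through the Kafka source's kafka.consumer.* passthrough properties together with the JAAS file from earlier:
# Assumed extra lines in the same agent configuration when running outside CM:
#   a1.sources.r1.kafka.consumer.security.protocol = SASL_PLAINTEXT
#   a1.sources.r1.kafka.consumer.sasl.kerberos.service.name = kafka
#   a1.sources.r2.kafka.consumer.security.protocol = SASL_PLAINTEXT
#   a1.sources.r2.kafka.consumer.sasl.kerberos.service.name = kafka
flume-ng agent \
  --name a1 \
  --conf /etc/flume-ng/conf \
  --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf \
  -Djava.security.auth.login.config=/var/lib/hive/jaas.conf \
  -Dflume.root.logger=INFO,console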
3) ODS layer
Edit the ODS data loading script:
[root@hadoop102 bin]# vim ods_db.sh
The content is as follows:
#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/hive

# Define a variable for easy modification
APP=gmall

# Use the date passed in as an argument; if no date is given, use the day before today
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

echo "===Log date: $do_date==="
sql="
load data inpath '/origin_data/gmall/log/topic_start/$do_date' into table "$APP".ods_start_log partition(dt='$do_date');
"
beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/hadoop102@HADOOP.COM" -n hive -e "$sql"
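All of the layer scripts below follow the same calling convention: an optional date argument, defaulting to the day before today. A hedged usage example (the crontab schedule and log path are illustrative only):
# Load yesterday's data (no argument):
ods_db.sh
# Backfill a specific day:
ods_db.sh 2019-10-02
# Hypothetical daily schedule in crontab, keeping the output for troubleshooting:
# 30 1 * * * /root/bin/ods_db.sh >> /tmp/ods_db.log 2>&1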
4) DWD layer
Edit the DWD data loading script:
[root@hadoop102 bin]# vim dwd_start_log.sh
The content is as follows:
#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/hive

# Define a variable for easy modification
APP=gmall

# Use the date passed in as an argument; if no date is given, use the day before today
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table "$APP".dwd_start_log
PARTITION (dt='$do_date')
select
    get_json_object(line,'$.mid') mid_id,
    get_json_object(line,'$.uid') user_id,
    get_json_object(line,'$.vc') version_code,
    get_json_object(line,'$.vn') version_name,
    get_json_object(line,'$.l') lang,
    get_json_object(line,'$.sr') source,
    get_json_object(line,'$.os') os,
    get_json_object(line,'$.ar') area,
    get_json_object(line,'$.md') model,
    get_json_object(line,'$.ba') brand,
    get_json_object(line,'$.sv') sdk_version,
    get_json_object(line,'$.g') gmail,
    get_json_object(line,'$.hw') height_width,
    get_json_object(line,'$.t') app_time,
    get_json_object(line,'$.nw') network,
    get_json_object(line,'$.ln') lng,
    get_json_object(line,'$.la') lat,
    get_json_object(line,'$.entry') entry,
    get_json_object(line,'$.open_ad_type') open_ad_type,
    get_json_object(line,'$.action') action,
    get_json_object(line,'$.loading_time') loading_time,
    get_json_object(line,'$.detail') detail,
    get_json_object(line,'$.extend1') extend1
from "$APP".ods_start_log
where dt='$do_date';
"
beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/hadoop102@HADOOP.COM" -n hive -e "$sql"
5) DWS layer
Edit the DWS data loading script:
[root@hadoop102 bin]$ vim dws_log.sh
The content is as follows:
#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/hive

# Define a variable for easy modification
APP=gmall

# Use the date passed in as an argument; if no date is given, use the day before today
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table "$APP".dws_uv_detail_day
partition(dt='$do_date')
select
    mid_id,
    concat_ws('|', collect_set(user_id)) user_id,
    concat_ws('|', collect_set(version_code)) version_code,
    concat_ws('|', collect_set(version_name)) version_name,
    concat_ws('|', collect_set(lang)) lang,
    concat_ws('|', collect_set(source)) source,
    concat_ws('|', collect_set(os)) os,
    concat_ws('|', collect_set(area)) area,
    concat_ws('|', collect_set(model)) model,
    concat_ws('|', collect_set(brand)) brand,
    concat_ws('|', collect_set(sdk_version)) sdk_version,
    concat_ws('|', collect_set(gmail)) gmail,
    concat_ws('|', collect_set(height_width)) height_width,
    concat_ws('|', collect_set(app_time)) app_time,
    concat_ws('|', collect_set(network)) network,
    concat_ws('|', collect_set(lng)) lng,
    concat_ws('|', collect_set(lat)) lat
from "$APP".dwd_start_log
where dt='$do_date'
group by mid_id;
"
beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/hadoop102@HADOOP.COM" -n hive -e "$sql"
6) ADS layer
Edit the ADS data loading script:
[root@hadoop102 bin]# vim ads_uv_log.sh
The content is as follows:
#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/hive

# Define a variable for easy modification
APP=gmall

# Use the date passed in as an argument; if no date is given, use the day before today
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
set hive.exec.dynamic.partition.mode=nonstrict;

insert into table "$APP".ads_uv_count
select
    '$do_date' dt,
    daycount.ct
from
(
    select
        '$do_date' dt,
        count(*) ct
    from "$APP".dws_uv_detail_day
    where dt='$do_date'
)daycount;
"
beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/hadoop102@HADOOP.COM" -n hive -e "$sql"
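With all four layer scripts in place, one day of the user-behavior pipeline can be replayed end to end. A minimal sketch, invoking the scripts above in dependency order for an assumed date:
# Replay the log pipeline for a single day, layer by layer.
do_date=2019-10-02
ods_db.sh "$do_date"          # load raw logs from HDFS into ODS
dwd_start_log.sh "$do_date"   # parse JSON fields into DWD
dws_log.sh "$do_date"         # aggregate per-device detail into DWS
ads_uv_log.sh "$do_date"      # compute daily UV into ADS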
1) Sqoop import
Edit the Sqoop import script:
[root@hadoop102 bin]# vim sqoop_import.sh
The content is as follows:
#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/hive
#export HADOOP_USER_NAME=hive

db_date=$2
echo $db_date
db_name=gmall

import_data() {
sqoop import \
--connect jdbc:mysql://hadoop102:3306/$db_name \
--username root \
--password Yy8266603@ \
--target-dir /origin_data/$db_name/db/$1/$db_date \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by "\t" \
--query "$2"' and $CONDITIONS;'
}

import_sku_info(){
  import_data "sku_info" "select id, spu_id, price, sku_name, sku_desc, weight, tm_id, category3_id, create_time from sku_info where 1=1"
}

import_user_info(){
  import_data "user_info" "select id, name, birthday, gender, email, user_level, create_time from user_info where 1=1"
}

import_base_category1(){
  import_data "base_category1" "select id, name from base_category1 where 1=1"
}

import_base_category2(){
  import_data "base_category2" "select id, name, category1_id from base_category2 where 1=1"
}

import_base_category3(){
  import_data "base_category3" "select id, name, category2_id from base_category3 where 1=1"
}

import_order_detail(){
  import_data "order_detail" "select od.id, order_id, user_id, sku_id, sku_name, order_price, sku_num, o.create_time from order_info o, order_detail od where o.id=od.order_id and DATE_FORMAT(create_time,'%Y-%m-%d')='$db_date'"
}

import_payment_info(){
  import_data "payment_info" "select id, out_trade_no, order_id, user_id, alipay_trade_no, total_amount, subject, payment_type, payment_time from payment_info where DATE_FORMAT(payment_time,'%Y-%m-%d')='$db_date'"
}

import_order_info(){
  import_data "order_info" "select id, total_amount, order_status, user_id, payment_way, out_trade_no, create_time, operate_time from order_info where (DATE_FORMAT(create_time,'%Y-%m-%d')='$db_date' or DATE_FORMAT(operate_time,'%Y-%m-%d')='$db_date')"
}

case $1 in
  "base_category1")
     import_base_category1
;;
  "base_category2")
     import_base_category2
;;
  "base_category3")
     import_base_category3
;;
  "order_info")
     import_order_info
;;
  "order_detail")
     import_order_detail
;;
  "sku_info")
     import_sku_info
;;
  "user_info")
     import_user_info
;;
  "payment_info")
     import_payment_info
;;
  "all")
     import_base_category1
     import_base_category2
     import_base_category3
     import_order_info
     import_order_detail
     import_sku_info
     import_user_info
     import_payment_info
;;
esac
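A hedged usage example; the first argument selects a table (or all), the second is the business date:
# Import every table for 2019-10-02:
sqoop_import.sh all 2019-10-02
# Re-import a single table for the same day:
sqoop_import.sh order_info 2019-10-02
Note that the MySQL password is embedded in the script; Sqoop also accepts --password-file if you prefer to keep it out of the command line.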
2) ODS layer
Edit the ODS loading script:
[root@hadoop102 bin]# vim ods_db.sh
The content is as follows:
#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/hive

APP=gmall

# Use the date passed in as an argument; if no date is given, use the day before today
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
load data inpath '/origin_data/$APP/db/order_info/$do_date' OVERWRITE into table "$APP".ods_order_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/order_detail/$do_date' OVERWRITE into table "$APP".ods_order_detail partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/sku_info/$do_date' OVERWRITE into table "$APP".ods_sku_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/user_info/$do_date' OVERWRITE into table "$APP".ods_user_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/payment_info/$do_date' OVERWRITE into table "$APP".ods_payment_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_category1/$do_date' OVERWRITE into table "$APP".ods_base_category1 partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_category2/$do_date' OVERWRITE into table "$APP".ods_base_category2 partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_category3/$do_date' OVERWRITE into table "$APP".ods_base_category3 partition(dt='$do_date');
"
beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/hadoop102@HADOOP.COM" -n hive -e "$sql"
3) DWD layer
Edit the DWD loading script:
[root@hadoop102 bin]# vim dwd_db.sh
The content is as follows:
#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/hive

# Define a variable for easy modification
APP=gmall

# Use the date passed in as an argument; if no date is given, use the day before today
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table "$APP".dwd_order_info partition(dt)
select * from "$APP".ods_order_info
where dt='$do_date' and id is not null;

insert overwrite table "$APP".dwd_order_detail partition(dt)
select * from "$APP".ods_order_detail
where dt='$do_date' and id is not null;

insert overwrite table "$APP".dwd_user_info partition(dt)
select * from "$APP".ods_user_info
where dt='$do_date' and id is not null;

insert overwrite table "$APP".dwd_payment_info partition(dt)
select * from "$APP".ods_payment_info
where dt='$do_date' and id is not null;

insert overwrite table "$APP".dwd_sku_info partition(dt)
select
    sku.id,
    sku.spu_id,
    sku.price,
    sku.sku_name,
    sku.sku_desc,
    sku.weight,
    sku.tm_id,
    sku.category3_id,
    c2.id category2_id,
    c1.id category1_id,
    c3.name category3_name,
    c2.name category2_name,
    c1.name category1_name,
    sku.create_time,
    sku.dt
from "$APP".ods_sku_info sku
join "$APP".ods_base_category3 c3 on sku.category3_id=c3.id
join "$APP".ods_base_category2 c2 on c3.category2_id=c2.id
join "$APP".ods_base_category1 c1 on c2.category1_id=c1.id
where sku.dt='$do_date' and c2.dt='$do_date' and c3.dt='$do_date' and c1.dt='$do_date'
and sku.id is not null;
"
beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/hadoop102@HADOOP.COM" -n hive -e "$sql"
4) DWS layer
Edit the DWS loading script:
[root@hadoop102 bin]# vim dws_db_wide.sh
The content is as follows:
#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/hive

# Define a variable for easy modification
APP=gmall

# Use the date passed in as an argument; if no date is given, use the day before today
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
with
tmp_order as
(
    select
        user_id,
        count(*) order_count,
        sum(oi.total_amount) order_amount
    from "$APP".dwd_order_info oi
    where date_format(oi.create_time,'yyyy-MM-dd')='$do_date'
    group by user_id
),
tmp_payment as
(
    select
        user_id,
        sum(pi.total_amount) payment_amount,
        count(*) payment_count
    from "$APP".dwd_payment_info pi
    where date_format(pi.payment_time,'yyyy-MM-dd')='$do_date'
    group by user_id
)
insert overwrite table "$APP".dws_user_action partition(dt='$do_date')
select
    user_actions.user_id,
    sum(user_actions.order_count),
    sum(user_actions.order_amount),
    sum(user_actions.payment_count),
    sum(user_actions.payment_amount)
from
(
    select
        user_id,
        order_count,
        order_amount,
        0 payment_count,
        0 payment_amount
    from tmp_order
    union all
    select
        user_id,
        0 order_count,
        0 order_amount,
        payment_count,
        payment_amount
    from tmp_payment
) user_actions
group by user_id;
"
beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/hadoop102@HADOOP.COM" -n hive -e "$sql"
5) ADS layer
Edit the ADS loading script:
[root@hadoop102 bin]# vim ads_db_gmv.sh
The content is as follows:
#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/hive

# Define a variable for easy modification
APP=gmall

# Use the date passed in as an argument; if no date is given, use the day before today
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
insert into table "$APP".ads_gmv_sum_day
select
    '$do_date' dt,
    sum(order_count) gmv_count,
    sum(order_amount) gmv_amount,
    sum(payment_amount) payment_amount
from "$APP".dws_user_action
where dt ='$do_date'
group by dt;
"
beeline -u "jdbc:hive2://hadoop102:10000/;principal=hive/hadoop102@HADOOP.COM" -n hive -e "$sql"
6) Sqoop export
Edit the Sqoop export script:
[root@hadoop102 bin]# vim sqoop_export.sh
The content is as follows:
#!/bin/bash
kinit -kt /var/lib/hive/hive.keytab hive/hive

db_name=gmall

export_data() {
sqoop export \
--connect "jdbc:mysql://hadoop102:3306/${db_name}?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password Yy8266603@ \
--table $1 \
--num-mappers 1 \
--export-dir /warehouse/$db_name/ads/$1 \
--input-fields-terminated-by "\t" \
--update-mode allowinsert \
--update-key $2 \
--input-null-string '\\N' \
--input-null-non-string '\\N'
}

case $1 in
  "ads_uv_count")
     export_data "ads_uv_count" "dt"
;;
  "ads_user_action_convert_day")
     export_data "ads_user_action_convert_day" "dt"
;;
  "ads_gmv_sum_day")
     export_data "ads_gmv_sum_day" "dt"
;;
  "all")
     export_data "ads_uv_count" "dt"
     export_data "ads_user_action_convert_day" "dt"
     export_data "ads_gmv_sum_day" "dt"
;;
esac
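Putting the business-data side together, one day can be processed end to end by chaining the scripts above. A minimal sketch for an assumed date:
# Replay the business-data pipeline for one day, in dependency order.
do_date=2019-10-02
sqoop_import.sh all "$do_date"   # MySQL -> HDFS
ods_db.sh "$do_date"             # HDFS  -> ODS tables
dwd_db.sh "$do_date"             # ODS   -> DWD
dws_db_wide.sh "$do_date"        # DWD   -> DWS wide table
ads_db_gmv.sh "$do_date"         # DWS   -> ADS GMV summary
sqoop_export.sh all              # ADS   -> MySQL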