毫不夸张的说,有没有掌握hive调优,是判断一个数据工程师是否合格的重要指标 。 hive调优涉及到压缩和存储调优,参数调优,sql的调优,数据倾斜调优,小文件问题的调优等。
结论,一般选择orcfile/parquet + snappy 方式
分区是将表的数据在物理上分成不同的文件夹,以便于在查询时可以精准指定所要读取的分区目录,从来降低读取的数据量。
分桶是将表数据按指定列的hash散列后分在了不同的文件中,将来查询时,hive可以根据分桶结构,快速定位到一行数据所在的分桶文件,从来提高读取效率。
优化前(关系数据库不用考虑会自动优化)
select m.cid,u.id from order m join customer u on( m.cid =u.id )where m.dt='20180808';优化后(where条件在map端执行而不是在reduce端执行)
select m.cid,u.id from (select * from order where dt='20180818') m join customer u on( m.cid =u.id);尽量不要使用union (union 去掉重复的记录)而是使用 union all 然后在用group by 去重
不要使用count (distinct cloumn) ,使用子查询
select count(1) from (select id from tablename group by id) tmp;如果需要根据一个表的字段来约束另为一个表,尽量用in来代替join . in 要比join 快
select id,name from tb1 a join tb2 b on(a.id = b.id); select id,name from tb1 where id in(select id from tb2);消灭子查询内的 group by 、 COUNT(DISTINCT),MAX,MIN。可以减少job的数量。
Common/shuffle/Reduce JOIN 连接发生的阶段,发生在reduce 阶段, 适用于大表 连接 大表(默认的方式) Map join :连接发生在map阶段 , 适用于小表 连接 大表 大表的数据从文件中读取 小表的数据存放在内存中(hive中已经自动进行了优化,自动判断小表,然后进行缓存)
set hive.auto.convert.join=true;SMB join Sort -Merge -Bucket Join 对大表连接大表的优化,用桶表的概念来进行优化。在一个桶内发生笛卡尔积连接(需要是两个桶表进行join)
set hive.auto.convert.sortmerge.join=true; set hive.optimize.bucketmapjoin = true; set hive.optimize.bucketmapjoin.sortedmerge = true; set hive.auto.convert.sortmerge.join.noconditionaltask=true;表现:任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大。 原因:某个reduce的数据输入量远远大于其他reduce数据的输入量
1)group by 如果是在group by中产生了数据倾斜,是否可以讲group by的维度变得更细,如果没法变得更细,就可以在原分组key上添加随机数后分组聚合一次,然后对结果去掉随机数后再分组聚合 在join时,有大量为null的join key,则可以将null转成随机值,避免聚集
2)count(distinct) 情形:某特殊值过多 后果:处理此特殊值的 reduce 耗时;只有一个 reduce 任务 解决方式:count distinct 时,将值为空的情况单独处理,比如可以直接过滤空值的行, 在最后结果中加 1。如果还有其他计算,需要进行 group by,可以先将值为空的记录单独处理,再和其他计算结果进行 union。
3)不同数据类型关联产生数据倾斜 情形:比如用户表中 user_id 字段为 int,log 表中 user_id 字段既有 string 类型也有 int 类型。当按照 user_id 进行两个表的 Join 操作时。 后果:处理此特殊值的 reduce 耗时;只有一个 reduce 任务 默认的 Hash 操作会按 int 型的 id 来进行分配,这样会导致所有 string 类型 id 的记录都分配 到一个 Reducer 中。 解决方式:把数字类型转换成字符串类型 select * from users a left outer join logs b on a.usr_id = cast(b.user_id as string)
4)mapjoin
join的每路输入都比较大,且长尾是热点值导致的,可以对热点值和非热点值分别进行处理,再合并数据 key本身分布不均 可以在key上加随机数,或者增加reduceTask数量
set hive.groupby.skewindata=true; 思想:就是先随机分发并处理,再按照 key group by 来分发处理。 操作:当选项设定为 true,生成的查询计划会有两个 MRJob。 第一个 MRJob 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 GroupBy Key 有可能被分发到不同的Reduce 中,从而达到负载均衡的目的;
第二个 MRJob 再根据预处理的数据结果按照 GroupBy Key 分布到 Reduce 中(这个过程可以保证相同的原始 GroupBy Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。
将为空的 key 转变为字符串加随机数或纯随机数,将因空值而造成倾斜的数据分不到多个 Reducer。 注:对于异常值如果不需要的话,最好是提前在 where 条件里过滤掉,这样可以使计算量大大减少
小文件的产生有三个地方,map输入,map输出,reduce输出,小文件过多也会影响hive的分析效率:
设置map输入的小文件合并
set mapred.max.split.size=256000000; //一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并) set mapred.min.split.size.per.node=100000000; //一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并) set mapred.min.split.size.per.rack=100000000; //执行Map前进行小文件合并 set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;设置map输出和reduce输出进行合并的相关参数:
//设置map端输出进行合并,默认为true set hive.merge.mapfiles = true //设置reduce端输出进行合并,默认为false set hive.merge.mapredfiles = true //设置合并文件的大小 set hive.merge.size.per.task = 256*1000*1000 //当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge。 set hive.merge.smallfiles.avgsize=16000000explain sql 学会查看sql的执行计划,优化业务逻辑 ,减少job的数据量。对调优也非常重要
