Apache Hadoop Day5

    科技2022-07-10  108

    Apache Hadoop Day5

    MapReduce Shuffle

    定义

    MapReduce 中,mapper 阶段处理的数据如何传递给 reducer 阶段,是 MapReduce 框架中 最关键的一个流程,这个流程就叫 Shuffle。总体来说shuffle核心流程主要包括以下几个方面:数据分区、排序、局部聚合/Combiner、缓冲区、溢写、抓取/Fetch、归并排序等。

    常见问题

    1、MapReduce能否实现全局排序?

    默认情况下MapReduce是无法实现全局有序的,因为底层MapReduce使用的是HashPartitioner实现,仅仅只能保证数据分区内部的数据以key的自然顺序排列,因此无法实现全局有序。但是可以有一下思路完成全局排序:

    设置NumReduceTask的个数为1,这样会导致所有的数据落入到同一个分区,即可实现全排序,但是只适用于小批量数据集自定义分区策略让数据按照区间分区,不按照hash策略,此时只需要保证区间之间有序即可实现全局有序。但是这种做法会出现区间数据分布不均匀,导致计算过程中出现数据倾斜。使用Hadoop提供的TotalOrderPartitioner,先对目标进行采样,然后推算出分区区间。

    参考:https://blog.csdn.net/lalaguozhe/article/details/9211919

    2、如何干预MapReduce的分区策略?

    一般来说在实际的开发中,很少去干预分区策略,因为基于大数据首先要考虑的是数据的均匀分布,防止数据倾斜。因此Hash的散列往往是最佳的选择,如果需要覆盖原有分区,可以调用:

    job.setPartitionerClass(分区实现类信息即可) public class CustomHashPartitioner<K, V> extends Partitioner<K, V> { public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }

    3、如何解决在MapReduce计算过程中的数据倾斜问题(面试热点问题)?

    场景:统计亚洲国家人口,以中国、日本为例。自然会使用国家作为key、公民信息作为value。在进行MapReduce计算的时候,中国的公民因为国籍都是China自然会落入到一个分区中。这样就出现数据严重倾斜。

    4、Map、Reduce并行度是靠什么决定的?

    Map端并行度是通过计算任务切片决定的,Reduce端是通过job.setNumReduceTask(n)

    5、MapReduce调优策略

    1)避免小文件计算,线下合并成大文件之后,在进行MapReduce分析或者CombineTextInputFormat。

    2)调整环状缓冲区的参数,减少Map任务的IO操作,不能无限制调大,还要考虑到系统GC问题。

    3)开启Map压缩,将溢写文件压缩成GZIP格式,减少ReduceShuffle过程中的网络带宽占用,消耗CPU为代价

    //开启解压缩,必须真实的环境下运行 conf.setBoolean("mapreduce.map.output.compress",true); conf.setClass("mapreduce.map.output.compress.codec", GzipCodec.class, CompressionCodec.class);

    4)如果条件允许,我们可以开启Map端预处理机制,提前在Map端执行Reduce逻辑,进行局部计算,这样既可极大提升计算性能,但是这种优化手段并不是所有场景都适合。例如:求均值,这种场景就不适合在Map端执行Reduce逻辑。

    1)Combiner/Reduce的输入和输出类型必须一致,也就是说预计算逻辑不可以更改Map端输出类型/Reduce端输入类型。

    2)不可以改变原有的业务逻辑,比如 求平均值,虽然类型兼容但是业务计算不正确。

    优点:减少Key数量,节省排序所占用内存空间,极大削减了在ReduceShuffle时候的数据下载量,节省带宽。

    5)适当的调整NodeManager管理的资源数

    yarn.nodemanager.resource.memory-mb=32G yarn.nodemanager.resource.cpu-vcores = 16

    或者开启硬件资源监测

    yarn.nodemanager.resource.detect-hardware-capabilities=true

    6)如果顺序执行多个小任务,我们可以考虑使用JVM重用机制,可以使用一个JVM顺序执行多个任务,无需重启新的jvm。

    mapreduce.job.jvm.numtasks=2

    Hadoop集群

    Hadoop HA构建

    概述

    NameNode HA构建 存储、ResourceManager HA构建 计算

    准备工作

    安装三台CentOS操作系统(完成JDK、SSH免密码认证、IP主机名映射、关闭防火墙等工作)

    主机和服务启动映射表

    主机服务CentOSANameNode、zkfc、DataNode、JournalNode、Zookeeper、NodeManagerCentOSBNameNode、zkfc、DataNode、JournalNode、Zookeeper、NodeManager、ResourceManagerCentOSCDataNode、JournalNode、Zookeeper、NodeManager、ResourceManager

    主机信息

    主机名IP信息CentOSA192.168.80.142CentOSB192.168.80.143CentOSC192.168.80.144

    JDK安装和配置

    [root@CentOSX ~]# rpm -ivh jdk-8u171-linux-x64.rpm [root@CentOSX ~]# vi .bashrc JAVA_HOME=/usr/java/latest PATH=$PATH:$JAVA_HOME/bin CLASSPATH=. export JAVA_HOME export CLASSPATH export PATH [root@CentOSX ~]# source .bashrc

    IP主机名映射

    [root@CentOSX ~]# vi /etc/hosts 192.168.80.142 CentOSA 192.168.80.143 CentOSB 192.168.80.144 CentOSC

    关闭防火墙

    [root@CentOSX ~]# systemctl stop firewalld [root@CentOSX ~]# systemctl disable firewalld Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service. Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service. [root@CentOSX ~]# firewall-cmd --state not running

    SSH免密码认证

    [root@CentOSX ~]# ssh-keygen -t rsa [root@CentOSX ~]# ssh-copy-id CentOSA [root@CentOSX ~]# ssh-copy-id CentOSB [root@CentOSX ~]# ssh-copy-id CentOSC

    Zookeeper

    [root@CentOSX ~]# tar -zxf zookeeper-3.4.6.tar.gz -C /usr/ [root@CentOSX ~]# mkdir /root/zkdata [root@CentOSA ~]# echo 1 >> /root/zkdata/myid [root@CentOSB ~]# echo 2 >> /root/zkdata/myid [root@CentOSC ~]# echo 3 >> /root/zkdata/myid [root@CentOSX ~]# touch /usr/zookeeper-3.4.6/conf/zoo.cfg [root@CentOSX ~]# vi /usr/zookeeper-3.4.6/conf/zoo.cfg tickTime=2000 dataDir=/root/zkdata clientPort=2181 initLimit=5 syncLimit=2 server.1=CentOSA:2887:3887 server.2=CentOSB:2887:3887 server.3=CentOSC:2887:3887 [root@CentOSX ~]# /usr/zookeeper-3.4.6/bin/zkServer.sh start zoo.cfg [root@CentOSX ~]# /usr/zookeeper-3.4.6/bin/zkServer.sh status zoo.cfg JMX enabled by default Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg Mode: `follower|leader` [root@CentOSX ~]# jps 5879 `QuorumPeerMain` 7423 Jps

    搭建Hadoop 集群(HDFS)

    解压并配置HADOOP_HOME

    [root@CentOSX ~]# tar -zxf hadoop-2.9.2.tar.gz -C /usr/ [root@CentOSX ~]# vi .bashrc HADOOP_HOME=/usr/hadoop-2.9.2 JAVA_HOME=/usr/java/latest PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin CLASSPATH=. export JAVA_HOME export CLASSPATH export PATH export HADOOP_HOME [root@CentOSX ~]# source .bashrc

    配置core-site.xml

    <!--配置Namenode服务ID--> <property> <name>fs.defaultFS</name> <value>hdfs://mycluster</value> </property> <property> <name>hadoop.tmp.dir</name> <value>/usr/hadoop-2.9.2/hadoop-${user.name}</value> </property> <property> <name>fs.trash.interval</name> <value>30</value> </property> <!--配置机架脚本--> <property> <name>net.topology.script.file.name</name> <value>/usr/hadoop-2.9.2/etc/hadoop/rack.sh</value> </property> <!--配置ZK服务信息--> <property> <name>ha.zookeeper.quorum</name> <value>CentOSA:2181,CentOSB:2181,CentOSC:2181</value> </property> <!--配置SSH秘钥位置--> <property> <name>dfs.ha.fencing.methods</name> <value>sshfence</value> </property> <property> <name>dfs.ha.fencing.ssh.private-key-files</name> <value>/root/.ssh/id_rsa</value> </property>

    配置机架脚本

    [root@CentOSX ~]# touch /usr/hadoop-2.9.2/etc/hadoop/rack.sh [root@CentOSX ~]# chmod u+x /usr/hadoop-2.9.2/etc/hadoop/rack.sh [root@CentOSX ~]# vi /usr/hadoop-2.9.2/etc/hadoop/rack.sh while [ $# -gt 0 ] ; do nodeArg=$1 exec</usr/hadoop-2.9.2/etc/hadoop/topology.data result="" while read line ; do ar=( $line ) if [ "${ar[0]}" = "$nodeArg" ] ; then result="${ar[1]}" fi done shift if [ -z "$result" ] ; then echo -n "/default-rack" else echo -n "$result " fi done [root@CentOSX ~]# touch /usr/hadoop-2.9.2/etc/hadoop/topology.data [root@CentOSX ~]# vi /usr/hadoop-2.9.2/etc/hadoop/topology.data 192.168.80.142 /rack01 192.168.80.143 /rack01 192.168.80.144 /rack03

    配置hdfs-site.xml

    <property> <name>dfs.replication</name> <value>3</value> </property> <!--开启自动故障转移--> <property> <name>dfs.ha.automatic-failover.enabled</name> <value>true</value> </property> <!--解释core-site.xml内容--> <property> <name>dfs.nameservices</name> <value>mycluster</value> </property> <property> <name>dfs.ha.namenodes.mycluster</name> <value>nn1,nn2</value> </property> <property> <name>dfs.namenode.rpc-address.mycluster.nn1</name> <value>CentOSA:9000</value> </property> <property> <name>dfs.namenode.rpc-address.mycluster.nn2</name> <value>CentOSB:9000</value> </property> <!--配置日志服务器的信息--> <property> <name>dfs.namenode.shared.edits.dir</name> <value>qjournal://CentOSA:8485;CentOSB:8485;CentOSC:8485/mycluster</value> </property> <!--实现故障转切换的实现类--> <property> <name>dfs.client.failover.proxy.provider.mycluster</name> <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value> </property>

    配置slaves

    CentOSA CentOSB CentOSC

    这里因为是CentOS7,需要我们额外安装一个小插件,psmic否则NameNode无法实现故障自动切换。

    [root@CentOSX ~]# yum install -y psmisc

    启动HDFS(集群初始化启动)

    [root@CentOSX ~]# hadoop-daemon.sh start journalnode (等待10s钟) [root@CentOSA ~]# hdfs namenode -format [root@CentOSA ~]# hadoop-daemon.sh start namenode [root@CentOSB ~]# hdfs namenode -bootstrapStandby [root@CentOSB ~]# hadoop-daemon.sh start namenode #注册Namenode信息到zookeeper中,只需要在CentOSA或者B上任意一台执行一下指令 [root@CentOSA|B ~]# hdfs zkfc -formatZK [root@CentOSA ~]# hadoop-daemon.sh start zkfc [root@CentOSB ~]# hadoop-daemon.sh start zkfc [root@CentOSX ~]# hadoop-daemon.sh start datanode

    查看机架信息

    [root@CentOSC ~]# hdfs dfsadmin -printTopology Rack: /rack01 192.168.73.131:50010 (CentOSA) 192.168.73.132:50010 (CentOSB) Rack: /rack03 192.168.73.133:50010 (CentOSC)

    Resource Manager搭建

    yarn-site.xml

    <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <name>yarn.resourcemanager.ha.enabled</name> <value>true</value> </property> <property> <name>yarn.resourcemanager.cluster-id</name> <value>cluster</value> </property> <property> <name>yarn.resourcemanager.ha.rm-ids</name> <value>rm1,rm2</value> </property> <property> <name>yarn.resourcemanager.hostname.rm1</name> <value>CentOSB</value> </property> <property> <name>yarn.resourcemanager.hostname.rm2</name> <value>CentOSC</value> </property> <property> <name>yarn.resourcemanager.webapp.address.rm1</name> <value>CentOSB:8088</value> </property> <property> <name>yarn.resourcemanager.webapp.address.rm2</name> <value>CentOSC:8088</value> </property> <property> <name>yarn.resourcemanager.zk-address</name> <value>CentOSA:2181,CentOSB:2181,CentOSC:2181</value> </property> <!--关闭物理内存检查--> <property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>false</value> </property> <!--关闭虚拟内存检查--> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property>

    mapred-site.xml

    <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property>

    启动关闭Yarn服务

    [root@CentOSB ~]# yarn-daemon.sh start|stop resourcemanager [root@CentOSC ~]# yarn-daemon.sh start|stop resourcemanager [root@CentOSX ~]# yarn-daemon.sh start|stop nodemanger

    如果集群中某个namenode宕机,需要删除dfs目录下的name文件夹

    Processed: 0.016, SQL: 8