当一个上传文件时,分块(因为会根据块大小为128MB进行分块)到几个datanode上,在任务调度时,由于yarn里有resourceManager和nodeManager,resourceManager会创建一个ApplicationMaster和Containers(资源池),nodeManager运行ApplicationMaster和Containers调度资源池,该文件会被InputFormat(默认用的是FileInputFormat(TextInputFormat))切成相应的切片数(见3.1.1),每个切片会分配一个map任务进行处理。
splitsize在默认情况下就等于块大小:
要想将splitsize调大,应该配置mapreduce.input.fileinputformat.split.minsize;
要想将splitsize调小,应该配置mapreduce.input.fileinputformat.split.maxsize;
1、首先,splitSize对应了mapTask的数目,shuffle过程中,mapTask匠人输出的K,V对写入环形缓冲区,环形缓冲区80%是用来分区和存储,20%用来计算排序;环形缓冲区默认大小为100MB。
2、在环形缓冲区中,根据partitioner进行分区,分区数据会进行快速排序的字典排序;
3、环形缓冲区满了的时候会溢出到文件中,这些文件会再一次的进行归并排序成一个文件;
4、每一个mapTask溢出的各个文件进行再一次的归并排序,合并成每个分区的大文件;
Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快
缓冲区的大小可以通过参数调整, 参数:io.sort.mb 默认100M
从Mapper的map函数执行完成后到Reducer的reduce函数处理之前的过程统称为shuffle。
MRAppMaster会通知ResourceManager进行申请reduceTask的container空间(cpu+内存+job.xml+job.jar等),reduceTask会下载shuffle过程产生出来的分区大文件,然后进行reduce函数的处理。
1、在MapReduce的shuffle过程中执行了三次排序,分别是: map的溢写阶段:根据分区以及key进行快速排序 map的合并溢写文件:将同一个分区的多个溢写文件进行归并排序,合成大的溢写文件 reduce输入阶段:将同一分区,来自不同map task的数据文件进行归并排序
2、在MapReduce整个过程中,默认是会对输出的KV对按照key进行排序的,而且是使用快速排序。 map输出的排序的,其实也就是上面的溢写过程中的排序。 reduce输出的排序,即reduce处理完数据后,MapReduce内部会自动对输出的KV按照key进行排序
以上排序都是根据KV中的Key进行排序的。所以当我们自定义的类作为Key时,需要实现WritableComparable 接口,也就是实现里面的 compareTo() 方法,用于排序时进行比较。 比较规则如下:
public int compareTo(object other) { this>other 返回1,正序,返回 -1,逆序。 }而在依据key进行排序时,如果key是一个复合对象,即该对象中包含多个成员属性,那么在进行key比较时,就会涉及到多个属性间的比较,而如果compareTo() 方法中,比较条件为两个的话,就称为二次排序。
辅助排序也叫分组排序,是指在reduce前的group过程中根据排序规则进行的分组,因为分组的时候是需要比较KV中key是否相同,如果相同才会归为同一个组,如果不相等,就归为不同的组,所以就涉及到key比较方法了。总的来说其实定义key在什么情况下才相等。这个过程可以自己定义分组的方法,也就是分组排序的实现类。 使用方法: 1、自定义分组类,继承 WritableComparator 2、调用父类的构造方法,创建实例 3、重写父类的 compare方法
例子:
public class OrderGroupCompartor extends WritableComparator { protected OrderGroupCompartor() { super(OrderBean.class, true); } /** * 以orderbean对象中的ID为分组依据。 * 同一ID的认为是同一个group,一个group只会调用一次reduce * * @param a 比较对象1 * @param b 比较对象2 * @return */ @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean aOrderBean = (OrderBean) a; OrderBean bOrderBean = (OrderBean) b; if (aOrderBean.getID() > bOrderBean.getID()) { return 1; } else if (aOrderBean.getID() < bOrderBean.getID()) { return -1; } else { return 0; } } }我们要注意的是,在进行分组时,同一个分组内的key是以第一个进入该分区的KV对中的key为准的。如:
有两个KV对: 1、<[1,裤子],20> 2、<[1,袜子],21> 其中key由id和物品名称组成的,value则是物品价格 假设分组依据是根据key中的id来分组的,那么上面两个KV是属于同一个group,但是实际上这两个KV 的key是不相等的。当1号KV先进入该group,那么就会以1号的key作为该group的key,分组的结果为: <[1,裤子],[20,21]> 如果2号KV先进入,则按照前面的规则,分组结果为: <[1,袜子],[20,21]> 就会有这样的情况的发生,我们要注意利用好这点。那么谁先进入该group的呢?很简单,是按照事先排序的顺序,在前面的自然先进入。这里的排序其实就是前面reduce端的归并排序的结果,而使用的排序依据其实就是key的包装类中compareTo方法,属于普通排序里面的东西。
编写好自定义的分组排序类之后,需要在job中指定好自定义的分组类:
job.setGroupingComparatorClass(OrderGroupCompartor.class);普通排序请看 “MapReduce-统计手机号流量” 二次排序和辅助排序请看 “MapReduce--获取价格最高的商品”
partition
过程:输入的<key,value>对经过map()处理后输出新的<key,value>对,它首先会被存储到环形缓冲区中(字节数组实现)。该环形缓冲区的大小默认为100MB。并且会对每个<key,value>对hash一个partition值,相同partition值为同一个分区。作用:由于map()处理后的数据量可能会非常大,所以如果由一个reduce()处理效率不高,为了解决这个问题可以用分布式的思想,一个reduce()解决不了,就用多个reduce节点。一般来说有几类分区就对应有几个reduce节点,把相同分区交给一个reduce节点处理。设置 环形缓冲区大小:mapred-site.xml中设置mapreduce.task.io.sort.mb的值环形缓冲区溢写的阈值:mapred-site.xml中设置mapreduce.map.sort.spill.percent的值partition:job.setPartitionerClass(cls)
Paste_Image.png
sort
过程:把环形缓冲区中的数据根据partition值和key值两个关键字升序排序。同一partition内的按照key排序。作用:一般来讲mapreduce框架用来做各种排序操作,先在map端排序,减少reduce端排序的负担。设置:job.setSortComparatorClass(cls)Paste_Image.png
combiner 过程:将sort后的数据进行combiner操作作用:可以理解为map端的预reduce操作,在数据量非常大的时候,这样的优化可以节省很多网络带宽和本地磁盘IO流的读写。设置:job.setCombinerClass(cls)Paste_Image.png
compress
过程:压缩combiner输出后的数据作用:减少本地磁盘的读写和减少reduce拷贝map端数据时的网络带宽设置Configuration对象用来解析XML文件,可以用set方法来设置属性值。
Paste_Image.png
寻找要设置的相关属性
compress.png
内置的三种压缩算法
compresssuanfa.png
配置key,value
peizhi.png
spill
过程:将排序后的内存数据spill到本地磁盘中作用:因为数据量非常大,全部存放在内存中不太现实,所以最后还是会存到本地磁盘中merge
过程:因为可能会有几次spill,本身存放数据的out文件和存放数据偏移量索引index文件都会产生多个,把多个这样的文件合并。作用:方便reduce的一次性拷贝。设置:mapsort.png
merge
过程:reduce拷贝map()最终输出的磁盘数据,一个reduce应该拷贝每个map节点的相同partition的数据。作用:因为有多个map节点,拷贝后的数据文件不止一份,先进性合并操作,为后面的排序做准备。sort
过程、作用:这里和map端的一样。group
过程:将排序好的<key,value>对进行分组,分组规则默认的是将相同key的value放在一起。作用:为了reduce()更好的计算相同key值出现的次数。设置:job.setGroupingComparatorClass(cls);
排序比较和分组比较都是通过实现下图中的接口方法
sort.png
比较2.png
其中b1为第一个字节数组,s1为开始的index,l1为b1比较的长度,b2为第二个字节数组,s2为开始的index,l2为b2比较的长度
比较3.png
底层用到了java.util.Comparator.compare(T o1, T o2)函数
比较4.png