Tips:
Mr切片是逻辑切分,HDFS的分块是物理切分split与block是一对一的关系map阶段所有的排序都是针对key进行排序,不会针对value流程分析:
TextInputFormat读取文件,并调用getSplits()函数对文件进行逻辑分片,一个split对应一个blockRecordReader读取一个split,调用一次map函数,并将结果输出到环形缓冲区缓冲区内部对结果进行分区(partition),分区规则是key的hashcode对reducer个数进行取模当缓冲区存储达到80%时,进行一次溢写(spil),在磁盘生成溢写文件,溢写时,每个分区内部按key进行快速排序(sort),如果设置了combiner,此时会对相同key的value进行聚合所有计算完成后,对多个溢写文件按分区进行合并(merge),并对每个分区内按key进行归并排序同时维护一份索引文件,记录各个分区的偏移量,map阶段的结果是分区内有序的MapTask并行度决定了Map任务处理的并发度,从而影响到整个job的运行效率 思想: 移动计算比移动数据划算!!! 思考: map任务是越多越好吗?哪些因素影响map任务的个数? 不是,如果一个文件仅比128M大一点点,也会当作一个split split的个数
MapTask并行度的决定机制
一个split对应一个map任务,一个split对应一个blockblocksize=splitsize=128M为什么splitsize等于128M?
思考: A文件300M,B文件100M,请问会有几个split?
split的计算是按文件逐个划分的,即先计算A文件3个split,再计算B文件1个split,而不是将A、B文件相加再除以128
切片机制源码
/** * Generate the list of files and make them into FileSplits. * @param job the job context * @throws IOException * 翻译:生成文件的列表,并将他们装入FileSplits */ public List<InputSplit> getSplits(JobContext job) throws IOException { StopWatch sw = new StopWatch().start(); //getFormatMinSplitSize() 1 getMinSplitSize(job) 0或1 //minsize 1 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); //maxsize Long类型的最大值 long maxSize = getMaxSplitSize(job); // generate splits //Split列表 List<InputSplit> splits = new ArrayList<InputSplit>(); //获取job中文件的状态信息 List<FileStatus> files = listStatus(job); //遍历每个文件,由此可见split的生成是按单个文件来计算的 for (FileStatus file: files) { Path path = file.getPath(); //获取文件长度 long length = file.getLen(); //判断文件是否为空 if (length != 0) { BlockLocation[] blkLocations; //获取文件block的位置信息 if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { FileSystem fs = path.getFileSystem(job.getConfiguration()); blkLocations = fs.getFileBlockLocations(file, 0, length); } //判断文件是否可切分 if (isSplitable(job, path)) { //获取blocksize和splitsize long blockSize = file.getBlockSize(); //Math.max(minSize, Math.min(maxSize, blockSize)); long splitSize = computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining = length; //SPLIT_SLOP=1.1,如果文件大小/splitsize>1.1,则切分一个split while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); bytesRemaining -= splitSize; } //如果文件大小/splitsize>1.1,则单独作为一个split if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } } else { // not splitable 不可切分文件作为一个split splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts())); } } else { //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } // Save the number of input files for metrics/loadgen job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS)); } return splits; }大数据学习+V:yp2595809239