2020.10.7 mapreduce框架的demo测试:WordCount(词频统计) 逻辑实现

    科技2024-04-20  10

    先总结MapReduce的编程规范

    Map阶段2个步骤

    设置InputFormat类,将数据切分为Key-Value(K1与V1)对,输入到第二布自定义Map逻辑,将第一步的结果转变为另外的Key-Value(K2与V2)对,输出结果

    Shuffle阶段4个步骤

    对输出的Key-Value进行分区对不同分区的数据按照相同的key排序(可选)对分组过的数据初步规约,降低数据的网络拷贝对数据进行分组,相同的Key的Value放入一个集合中

    Reduce阶段的2个步骤

    对多个Map任务的结果进行排序以及合并(K2与V2),编写Reduce函数实现自己的逻辑,对输入的Key-Value进行处理,转为新的Key-Value(K3与V3)输出设置OutputFormat处理并保存Reduce输出的Key-Value数据

    逻辑代码:

    Main类:

    package com.fyg.bigdata; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.VIntWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; import java.io.IOException; public class WordCountMain { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 设置map和reduce // 类名.class就是返回了一个对象实例,每个类在JVM中都存在一个实例与之对应,用于存储类的初始化信息 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); job.setJarByClass(WordCountMain.class); job.setJobName("Word Count test"); // 设置map的输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 设置reduce的输出格式 job.setOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 设置格式化类 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); TextInputFormat.setInputPaths(job, new Path("file:///E:/data/mapreduce/input")); // outputfile 不能存在!因为不允许重写一个已知的文件 TextOutputFormat.setOutputPath(job, new Path("file:///E:/data/mapreduce/output")); // 提交作业 job.waitForCompletion(true); } }

    Mapper类:

    package com.fyg.bigdata; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.io.LongWritable; import java.io.IOException; //LongWritable就是long型,同理IntWritable就是int public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> { @Override // 每读一行就会调用一次map方法 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words=value.toString().split(","); for(String word:words) { context.write(new Text(word),new IntWritable(1)); } } }

    Reduce类:

    package com.fyg.bigdata; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Reducer; import java.awt.*; import java.io.IOException; public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum=0; // sum is numbers of occurrences of words for(IntWritable i:values) { sum+=i.get(); } context.write(key,new IntWritable(sum)); } }

    细节梳理:

    每一个split分配一个mapTask并行实例处理

    由FileInputFormat实现类的getSplits()方法实现切片

    默认切片大小就是Block块大小(默认块大小128M,是因为使用的是2.X版本的hadoop,1.X版本则是64m)

    在FileInputFormat中,计算切片大小的逻辑代码为: Math.max(minSize, Math.min(maxSize, blockSize));

    minsize:默认值:1 配置参数: mapreduce.input.fileinputformat.split.minsize maxsize:默认值:Long.MAXValue 配置参数:mapreduce.input.fileinputformat.split.maxsize

    关于job.setJarclass,一个解释的比较好的coder的文章我留在这儿供参考:

    https://blog.csdn.net/joeyon1985/article/details/41316841?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522160213748919725222412332%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=160213748919725222412332&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_ecpm_v3~pc_rank_v2-1-41316841.first_rank_ecpm_v3_pc_rank_v2&utm_term=setjarbyclass%E4%BD%9C%E7%94%A8&spm=1018.2118.3001.4187

    我在此也补充一下hadoop权威指南上的介绍:

    job对象指定作业执行规范。我们可以把它来控制整个作业的运行。我们在hadoop集群上运行这个作业时,要把代码打包成一个JAR文件(hadoop在集群上会发布这个文件)。不必明确指定JAR文件的名称,在job对象的setJarByClass()方法中传递一个类即可,hadoop会利用这个类来查找包含它jar文件,进而找到相关的jar文件。

    Processed: 0.063, SQL: 9