MapReduce:WordCount实战

    科技2023-09-22  73

    map:负责“分” reduce:负责“合”

    MapReduce编程规范–wordcount为例

    一、Map阶段2个步骤 1.设置InputFormat类,将数据切分为Key-Value(K1和V1)对,输入到第二步 inputformat的子类是textinputformat,去读取原文件,将每一行数据变成键值对:定义k1(数据偏移量),v1(数据) 2自定义Map逻辑,将第一步的结果转换成另外的Key-Value (K2和V2)对,输出结果 (1)自定义一个Mapper类,重写map方法 (2)在map方法中将k1和v1转化成k2和v2 eg:拆分单词,以逗号为切割,进行统计(可认为1是固定值) 二、Shuffle阶段4个步骤:分区、排序、规约、分组 3.对输出的Key-Value对进行分区 4.对不同分区的数据按照相同的 Key 排序 5.(可选)对分组过的数据初步规约,降低数据的网络拷贝。规约相当于将reduce阶段的事情提前在map阶段做了。 6.对数据进行分组,相同Key 的Value放入一个集合中 三、Reduce阶段2个步骤 7.对多个Map任务的结果进行排序以及合并,编写Reduce函数实现自己的逻辑,对输入的Key-Value进行处理,转为新的Key-Value (K3和V3)输出 (1)自定义一个Reducer类,重写reduce方法。 (2)在reduce方法中要将新的k2和v2转为k3和v3 8.设置OutputFormat处理并保存Reduce输出的Key-Value数据

    WordCount

    一、数据格式准备 1.创建新文件

    cd /export/servers vim wordcount.txt

    2.将文件中放入以下内容

    hello,world,hadoop,hive,sqoop,hadoop flume,hello,kitty,tom,jerry,word

    3.上传到HDFS

    hdfs dfs -mkdir /wordcount/ hdfs dfs -put wordcount.txt /wordcount/

    二、WordCoutMapper

    package com.insupr.mapreduce.wc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /* Mapper的范型 KEYIN:K1的类型 行偏移量 LongWritable VALUE:V1的类型 一行的文本数据 Text KEYOUT:K2的类型 每个单词 Text VALUEOUT:V2的类型 固定值 LongWritable */ //LongWritable封装的是Long。Text封装的是String public class WordCountMapper extends Mapper<LongWritable,Text,Text,LongWritable>{ @Override //key是k1,value是v1,context表示mapreduce上下文对象 protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException { //1:对每一行数据进行字符串拆分 String line = value.toString(); String[] split = line.split(s:","); //2:遍历数组,获取每一个单词 for (String word : split){ //word 1 context.write(new Text(word),new LongWritable(1)); } /* Text test = new Text(); String line = value.toString(); String[] split = line.split(s:","); //2:遍历数组,获取每一个单词 for (String word : split){ //word 1 text.set(word); context.write(text,new LongWritable(1)); */ } }

    三、WordCoutReducer

    package com.inspur.mapreduce.wc; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** *自定义我们的reduce逻辑 *所有的key都是我们的单词,所有的values都是我们单词出现的次数 *@param key *@param values *@param context *@ithrousIOException *@throws InterruptedException */ /* KEYIN:k2 Text 每个单词 VALUEIN:v2 LongWritable 集合中泛型的类型 KEYOUT:k3 Text 每个单词 VALUEOUT:v3 LongWritable 每个单词出现的次数 */ public class WordCountReducer extends Reducer<Text, LongWritable,Text, LongWritable> { /* reduce的作用是将K2、V2转换为K3、V3 key : K2 values:集合 context: MapReduce的上下文对象 */ @Override protected void reduce(Text key,Iterable LongWritable〉 values,Context context) throws IOException,InterruptedException { long count =0; //1.遍历values集合 for (LongWritable value : values){ //2将集合中的值相加 count += value.get(); } //3.将k3和v3写入上下文中 context.write(key,new LongWritable(count)); } }

    四、JobMain

    package com.inspur.mapreduce.wc; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextOutputFormat; import org.apache.hadoop.io.Text; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class JobMain extends Configured implements Tool{ @Override //需重写implements方法 是run方法 public int run(String[] strings) throws Exception { //创建一个任务对象 //不能new一个configuration,因为是同一对象,但可以从父类中继承 Job.getInstance(super.getConf(),"mapreduce_wordcount"); //打包放在集群运行时需要做一个配置(指定主类的calss对象) job.setJarByClass(JobMain.class); //对任务对象进行设置 //1.读取源文件-设置读取源文件的类 job.setInputFromatClass(TextInputFormat,class); TextInputFormat.addInputPath(job,new Path("hadfs://node01:8020/wordcount") //2.设置mapper类 job.setMapperClass(WordCountMapper.class); //设置map阶段的输出类型:k2v2的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutPutValueClass(LongWritable.calss); //3shuffle阶段默认分区排序规约分组,因此代码省略 //4reduce阶段设置Reducer类 job.setReducerClass(WordCountReducer.class) ; //5设置Reduce阶段的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //设置输出类,设置文件输出的路径 job.setOutputFormatClass(TextOutputFormat.class) ; TextOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/wordcount_out")); boolean b = job.waitForCompletion(true) ; //一旦任务提交,让人物进入等待完成状态,最终返回的是run return b?0:1; return 0; } public static void main(String[]args){ Configuration configuration = new Configuration() //configuration对象是配置对象,有配置信息就放在配置对象里面 //启动一个任务 int run = ToolRunner.run(configuration,new JobMain(),args)//0是执行成功 System.exit(run) } }

    三、运行过程 打开xml配置文件 设置jar 1.将module打成jar包 打完包之后多一个target目录 2.把jar包上传到集群上 3.运行:

    hadoop jar jar包名称 主类的全路径名(包名+类名) eg:hadoop jar mapreduce.jar com.inspur.mapreducework.JobMain

    4.补充:如果文件重复了需要删除: hdfs dfs -rm -r /wordcount_out

    Processed: 0.020, SQL: 8