【大数据开发】MapReduce——统计每年的最高温度(自定义分区器)、统计每年入职的人数

    科技2025-06-11  19

    第一题:简述MapReduce的核心思想(重点理解)
    一句话:移动计算而非移动数据,分而治之。 原因:因为移动数据,会占用大量的网络带宽,而网络带宽本来就很稀缺,传输时间与分析时间的比例大大提高了, 效率非常低。 反过来,因为计算程序的字节数量不是很大(100M足够大了),所以将计算程序移动到有数据的机器节点上,利用他们的cpu进行运算,一是比移动数据要大大节省了网络带宽和时间,二是可以并发运算,效率翻倍。这样,整个作业的时间只取决于分析时间。 MapReduce的一个完整程序包含两个部分,一部分是N个MapTask并行运算(程序分发到有数据的节点上运行),互不相干,效率翻倍,任务产生的数据临时存储到本地磁盘中。另一部分是N个ReduceTask并行运算,互不相干,需要从MapTask产生的数据中fetch自己要处理的数据,进行合并统计等,将最终分析结果存储到HDFS中。
    第二题:简述MapReduce的阶段分类和数据扭转
    #第一阶段:Map阶段 整个map阶段,通常会有N个maptask,并行运算,互不干扰,各自统计各自的数据,这个数据通常是一个块文件(原始数据)。统计结果会临时保存到本地磁盘上 原始数据经过处理会转成键值对<K1,V1>,然后作为map方法的输入数据,进入map方法,经过map方法的处理,再次以键值对<k2,v2>的形式输出。 #第二阶段:Reduce阶段 reduce阶段,也会有N个ReduceTask,也是并发执行,互不干扰,处理的数据是要fetch Map阶段产生的临时数据,然后把这些临时数据进行汇总,汇总结果通常保存到HDFS上(也可以保存到本地磁盘)。 map阶段的输出数据,作为reduce阶段的输入数据,以<k2,list<V2>>的形式进入reduce方法,经过reduce方法处理后,再次以键值对<K3,V3>的形式输出。
    第三题:简述MapReduce的编程模型
    - 自定义Mapper类型,继承Mapper父类,定义泛型,重写map方法 - 自定义Reducer类型,继承Reducer父类,定义泛型,重新reduce方法 - 自定义Driver类型,获取job对象,设置各种信息,进行job提交
    第四题:完成wordcount入门案例

    使用下面的数据画出流程图,要求,a-n为一个分区,o-z为一个分区

    文件test1.txt的内容如下

    hello welcome to china hello java hello welcome to home

    文件test2.txt内容如下

    you are the best and i like you can you help me
    第五题:统计每年的最高温度。

    提示,谁应该作为K2

    数据格式说明: 每一行中的第16到第19个字符,是年份。如下面的1901 每一行中的第88个字符表示温度的正与负 。如-,+ 每一行中的第89到第92个字符表示温度。如果是9999,表示是无效温度 第93个字符如果是0,1,4,5,9表示有效温度

    002902907099999 1901 010…01N9 -0039 1+99999102231ADDGF108991999999999999999999 002902907099999 1901 010…01N9 +0000 1+99999101821ADDGF104991999999999999999999 ………………………………………

    **数据源:**在Phase02-03-Mapreduce里的datasource目录下的ncdc1里

    要求:2000年以前的数据存到一个文件里,2000年以后的数据存储到一个文件里 >=2000

    代码:

    import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.util.Iterator; public class TemperatureDriver { public static void main(String[] args) throws Exception { //获取配置对象 Configuration conf = new Configuration(); //获取Job实例 Job job = Job.getInstance(conf,"temptrue"); //设定驱动类型 job.setJarByClass(TemperatureDriver.class); //设置job要执行的Mapper类型和Reducer类型 job.setMapperClass(TempMapper.class); job.setReducerClass(TempReducer.class); //设置k3,v3的泛型 job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); //设置自定义的分区器 job.setPartitionerClass(TempPartitioner.class); //修改reduceTask的数量 job.setNumReduceTasks(2); //设置要统计的输入路径以及要输出的位置路径, 借助main方法的形参的第一个元素表示要统计的路径 FileInputFormat.setInputPaths(job,new Path("...\\datasource\\ncdc1")); Path path = new Path("D:/temp"); FileSystem fs = FileSystem.get(conf); if(fs.exists(path)){ fs.delete(path,true); } FileOutputFormat.setOutputPath(job,path);//借助main方法的形参的第二个元素表示输出位置 //提交作业,等待完成 boolean flag = job.waitForCompletion(true); System.exit(flag?0:1); } public static class TempPartitioner extends Partitioner<IntWritable,IntWritable>{ public int getPartition(IntWritable key, IntWritable value, int numPartitions) { if(key.get()<2000){ return 0; } return 1; } } /** *0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999 *数据格式说明: * 每一行中的第16到第19个字符,是年份。如下面的1901 * 每一行中的第88个字符表示温度的正与负 。如-,+ * 每一行中的第89到第92个字符表示温度。如果是9999,表示是无效温度 * 第93个字符如果是0,1,4,5,9表示有效温度 * * * K1泛型: 行偏移量 LongWritable * V1泛型: 行记录 Text * K2泛型: 年份 IntWritable * V2泛型: 温度 IntWritable * */ public static class TempMapper extends Mapper<LongWritable, Text, IntWritable,IntWritable> { //定义k2,v2对象 IntWritable k2 = new IntWritable(); IntWritable v2 = new IntWritable(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //将value转成字符串类型 String line = value.toString(); //获取年份 int year = Integer.parseInt(line.substring(15,19)); //获取温度的正负符号 String symbol = line.substring(87,88); //获取温度, 有可能最高温度是负数的。比如这一年这种的最高温度是-2. int temp; if("+".equals(symbol)){ temp = Integer.parseInt(line.substring(88,92)); }else{ temp = Integer.parseInt(line.substring(87,92)); } //获取有效温度符号 String sym = line.substring(92,93); if(Math.abs(temp)!=9999&&sym.matches("[01459]")){ //将年份和温度封装到k2,和v2里 k2.set(year); v2.set(temp); //写出去 context.write(k2,v2); } } } /** * * k2: 年份 泛型IntWritable * v2: 温度 泛型IntWritable * k3: 年份 泛型IntWritable * v3: 最高温度 泛型IntWritable * * * 在进入reduce函数之前,同一年的所有温度被汇集到了一起。变成了List<v2> * */ public static class TempReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{ IntWritable v3 = new IntWritable(); @Override protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //values就是同一年的所有的温度,只需要找到这些温度的最大值即可 int max = -10000; Iterator<IntWritable> it = values.iterator(); while(it.hasNext()){ int current = it.next().get(); if(max<current){ max = current; } } //循环结束后,max一定是最高温度,封装到v3里. v3.set(max); //k3就是k2,写出去即可 context.write(key,v3); } } } public static class TemperaturePartitioner extends Partitioner<IntWritable,IntWritable>{ /** * * @param key 需要分区的key,这里是k2 * @param value key对应的value * @param numPartitions 分区总数 * @return key的分区号 * * 注意: 在进入环形缓冲区时,就会被调用,调用后,<K2,V2>这一个键值对就已经被标记为时属于哪一个分区的了 */ public int getPartition(IntWritable key, IntWritable value, int numPartitions) { if(key.get()<2000){ return 0; } return 1; } } }
    第六题:数据如下:
    empno ename job mgr hiredate sal comm deptno 7369,SMITH,CLERK,7902,1980-12-17,800,null,20 7499,ALLEN,SALESMAN,7698,1981-02-20,1600,300,30 7521,WARD,SALESMAN,7698,1981-02-22,1250,500,30 7566,JONES,MANAGER,7839,1981-04-02,2975,null,20 7654,MARTIN,SALESMAN,7698,1981-09-28,1250,1400,30 7698,BLAKE,MANAGER,7839,1981-05-01,2850,null,30 7782,CLARK,MANAGER,7839,1981-06-09,2450,null,10 7788,SCOTT,ANALYST,7566,1987-04-19,3000,null,20 7839,KING,PRESIDENT,null,1981-11-17,5000,null,10 7844,TURNER,SALESMAN,7698,1981-09-08,1500,0,30 7876,ADAMS,CLERK,7788,1987-05-23,1100,null,20 7900,JAMES,CLERK,7698,1981-12-03,950,null,30 7902,FORD,ANALYST,7566,1981-12-02,3000,null,20 7934,MILLER,CLERK,7782,1982-01-23,1300,null,10

    1):使用mr程序统计每年入职的人数。

    最终结果格式如下: 年份:1980 人数:xxx 年份:1981 人数:xxx import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.util.Iterator; /** * @Description 使用mr程序统计每年入职的人数。 */ public class Exercise01 { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf,"emp_count"); //只要不是在集群上使用yarn运行,可以不写 job.setJarByClass(Exercise01.class); job.setMapperClass(EmpMapper.class); job.setReducerClass(EmpReducer.class); job.setMapOutputKeyClass(IntWritable.class); //job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job,new Path("D:\\data\\Emp.txt")); FileSystem fs = FileSystem.get(conf); Path dstPath = new Path("D:/empoutput"); if(fs.exists(dstPath)){ fs.delete(dstPath,true); } FileOutputFormat.setOutputPath(job,dstPath); System.exit(job.waitForCompletion(true)?0:1); } public static class EmpMapper extends Mapper<LongWritable, Text, IntWritable,IntWritable>{ //提成 成员变量(属性),节省内存空间 IntWritable k2 = new IntWritable(); IntWritable v2 = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] split = line.split(","); //获取年份 int year = Integer.parseInt(split[4].substring(0,4)); k2.set(year); //写出去 context.write(k2,v2); } } public static class EmpReducer extends Reducer<IntWritable,IntWritable, Text,Text> { Text k3 = new Text(); Text v3 = new Text(); @Override protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { Iterator<IntWritable> iterator = values.iterator(); int sum = 0;//计数器 while(iterator.hasNext()){ sum+=iterator.next().get(); } k3.set("年份:"+key.get()); v3.set("人数:"+sum); //写出去 context.write(k3,v3); } } }
    Processed: 0.294, SQL: 8