使用下面的数据画出流程图,要求,a-n为一个分区,o-z为一个分区
文件test1.txt的内容如下
hello welcome to china hello java hello welcome to home文件test2.txt内容如下
you are the best and i like you can you help me提示,谁应该作为K2
数据格式说明: 每一行中的第16到第19个字符,是年份。如下面的1901 每一行中的第88个字符表示温度的正与负 。如-,+ 每一行中的第89到第92个字符表示温度。如果是9999,表示是无效温度 第93个字符如果是0,1,4,5,9表示有效温度002902907099999 1901 010…01N9 -0039 1+99999102231ADDGF108991999999999999999999 002902907099999 1901 010…01N9 +0000 1+99999101821ADDGF104991999999999999999999 ………………………………………
**数据源:**在Phase02-03-Mapreduce里的datasource目录下的ncdc1里
要求:2000年以前的数据存到一个文件里,2000年以后的数据存储到一个文件里 >=2000
代码:
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.util.Iterator; public class TemperatureDriver { public static void main(String[] args) throws Exception { //获取配置对象 Configuration conf = new Configuration(); //获取Job实例 Job job = Job.getInstance(conf,"temptrue"); //设定驱动类型 job.setJarByClass(TemperatureDriver.class); //设置job要执行的Mapper类型和Reducer类型 job.setMapperClass(TempMapper.class); job.setReducerClass(TempReducer.class); //设置k3,v3的泛型 job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); //设置自定义的分区器 job.setPartitionerClass(TempPartitioner.class); //修改reduceTask的数量 job.setNumReduceTasks(2); //设置要统计的输入路径以及要输出的位置路径, 借助main方法的形参的第一个元素表示要统计的路径 FileInputFormat.setInputPaths(job,new Path("...\\datasource\\ncdc1")); Path path = new Path("D:/temp"); FileSystem fs = FileSystem.get(conf); if(fs.exists(path)){ fs.delete(path,true); } FileOutputFormat.setOutputPath(job,path);//借助main方法的形参的第二个元素表示输出位置 //提交作业,等待完成 boolean flag = job.waitForCompletion(true); System.exit(flag?0:1); } public static class TempPartitioner extends Partitioner<IntWritable,IntWritable>{ public int getPartition(IntWritable key, IntWritable value, int numPartitions) { if(key.get()<2000){ return 0; } return 1; } } /** *0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999 *数据格式说明: * 每一行中的第16到第19个字符,是年份。如下面的1901 * 每一行中的第88个字符表示温度的正与负 。如-,+ * 每一行中的第89到第92个字符表示温度。如果是9999,表示是无效温度 * 第93个字符如果是0,1,4,5,9表示有效温度 * * * K1泛型: 行偏移量 LongWritable * V1泛型: 行记录 Text * K2泛型: 年份 IntWritable * V2泛型: 温度 IntWritable * */ public static class TempMapper extends Mapper<LongWritable, Text, IntWritable,IntWritable> { //定义k2,v2对象 IntWritable k2 = new IntWritable(); IntWritable v2 = new IntWritable(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //将value转成字符串类型 String line = value.toString(); //获取年份 int year = Integer.parseInt(line.substring(15,19)); //获取温度的正负符号 String symbol = line.substring(87,88); //获取温度, 有可能最高温度是负数的。比如这一年这种的最高温度是-2. int temp; if("+".equals(symbol)){ temp = Integer.parseInt(line.substring(88,92)); }else{ temp = Integer.parseInt(line.substring(87,92)); } //获取有效温度符号 String sym = line.substring(92,93); if(Math.abs(temp)!=9999&&sym.matches("[01459]")){ //将年份和温度封装到k2,和v2里 k2.set(year); v2.set(temp); //写出去 context.write(k2,v2); } } } /** * * k2: 年份 泛型IntWritable * v2: 温度 泛型IntWritable * k3: 年份 泛型IntWritable * v3: 最高温度 泛型IntWritable * * * 在进入reduce函数之前,同一年的所有温度被汇集到了一起。变成了List<v2> * */ public static class TempReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{ IntWritable v3 = new IntWritable(); @Override protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //values就是同一年的所有的温度,只需要找到这些温度的最大值即可 int max = -10000; Iterator<IntWritable> it = values.iterator(); while(it.hasNext()){ int current = it.next().get(); if(max<current){ max = current; } } //循环结束后,max一定是最高温度,封装到v3里. v3.set(max); //k3就是k2,写出去即可 context.write(key,v3); } } } public static class TemperaturePartitioner extends Partitioner<IntWritable,IntWritable>{ /** * * @param key 需要分区的key,这里是k2 * @param value key对应的value * @param numPartitions 分区总数 * @return key的分区号 * * 注意: 在进入环形缓冲区时,就会被调用,调用后,<K2,V2>这一个键值对就已经被标记为时属于哪一个分区的了 */ public int getPartition(IntWritable key, IntWritable value, int numPartitions) { if(key.get()<2000){ return 0; } return 1; } } }1):使用mr程序统计每年入职的人数。
最终结果格式如下: 年份:1980 人数:xxx 年份:1981 人数:xxx import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.util.Iterator; /** * @Description 使用mr程序统计每年入职的人数。 */ public class Exercise01 { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf,"emp_count"); //只要不是在集群上使用yarn运行,可以不写 job.setJarByClass(Exercise01.class); job.setMapperClass(EmpMapper.class); job.setReducerClass(EmpReducer.class); job.setMapOutputKeyClass(IntWritable.class); //job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job,new Path("D:\\data\\Emp.txt")); FileSystem fs = FileSystem.get(conf); Path dstPath = new Path("D:/empoutput"); if(fs.exists(dstPath)){ fs.delete(dstPath,true); } FileOutputFormat.setOutputPath(job,dstPath); System.exit(job.waitForCompletion(true)?0:1); } public static class EmpMapper extends Mapper<LongWritable, Text, IntWritable,IntWritable>{ //提成 成员变量(属性),节省内存空间 IntWritable k2 = new IntWritable(); IntWritable v2 = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] split = line.split(","); //获取年份 int year = Integer.parseInt(split[4].substring(0,4)); k2.set(year); //写出去 context.write(k2,v2); } } public static class EmpReducer extends Reducer<IntWritable,IntWritable, Text,Text> { Text k3 = new Text(); Text v3 = new Text(); @Override protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { Iterator<IntWritable> iterator = values.iterator(); int sum = 0;//计数器 while(iterator.hasNext()){ sum+=iterator.next().get(); } k3.set("年份:"+key.get()); v3.set("人数:"+sum); //写出去 context.write(k3,v3); } } }