MapReduce Exercise 12: Top 10 by Traffic Usage

    1.FlowBean

    import org.apache.hadoop.io.WritableComparable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class FlowBean implements WritableComparable<FlowBean> {

        private long upFlow;
        private long downFlow;
        private long sumFlow;

        @Override
        public String toString() {
            return upFlow + "\t" + downFlow + "\t" + sumFlow;
        }

        public long getUpFlow() { return upFlow; }

        public void setUpFlow(long upFlow) { this.upFlow = upFlow; }

        public long getDownFlow() { return downFlow; }

        public void setDownFlow(long downFlow) { this.downFlow = downFlow; }

        public long getSumFlow() { return sumFlow; }

        public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }

        @Override
        public int compareTo(FlowBean o) {
            // Reversed comparison: sort keys in descending order of total flow
            return Long.compare(o.sumFlow, this.sumFlow);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // Fields must be read in the same order they were written
            this.upFlow = in.readLong();
            this.downFlow = in.readLong();
            this.sumFlow = in.readLong();
        }
    }
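
    To check that write() and readFields() stay in sync, the bean can be round-tripped through Hadoop's in-memory buffers. A minimal sketch (the flow values are invented for illustration):

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    public class FlowBeanRoundTrip {
        public static void main(String[] args) throws Exception {
            FlowBean before = new FlowBean();
            before.setUpFlow(2481);      // hypothetical values
            before.setDownFlow(24681);
            before.setSumFlow(27162);

            // Serialize into a byte buffer, then deserialize into a fresh bean
            DataOutputBuffer out = new DataOutputBuffer();
            before.write(out);
            DataInputBuffer in = new DataInputBuffer();
            in.reset(out.getData(), out.getLength());
            FlowBean after = new FlowBean();
            after.readFields(in);

            System.out.println(after);   // expect: 2481  24681  27162
        }
    }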

    2.FlowGroupingComparator

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    public class FlowGroupingComparator extends WritableComparator {

        protected FlowGroupingComparator() {
            // true: instantiate FlowBean keys so compare() receives real objects
            super(FlowBean.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // Always return 0: every key is "equal", so all records
            // fall into a single reduce group
            return 0;
        }
    }
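
    During the reduce phase, Hadoop consults the grouping comparator to decide where one reduce() call ends and the next begins. Declaring every pair of keys equal therefore funnels all records into one reduce() call, whose value iterator arrives already sorted in descending order of sumFlow by FlowBean.compareTo(). A sketch of the effect (placed in the same package as FlowGroupingComparator, since its constructor is protected):

    import org.apache.hadoop.io.WritableComparator;

    public class GroupingDemo {
        public static void main(String[] args) {
            FlowBean a = new FlowBean();
            a.setSumFlow(100);
            FlowBean b = new FlowBean();
            b.setSumFlow(200);

            WritableComparator grouping = new FlowGroupingComparator();
            // Different totals, yet the grouping comparator treats them as equal,
            // so both records would land in the same reduce group
            System.out.println(grouping.compare(a, b));  // 0
            // The sort order still distinguishes them: b (larger total) sorts first
            System.out.println(a.compareTo(b));          // positive
        }
    }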

    3.FlowMapper

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

        private FlowBean k = new FlowBean();
        private Text v = new Text();

        // Ctrl+O in IntelliJ IDEA generates the overridden method
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line on "\t": phone, upFlow, downFlow, sumFlow
            String[] fields = value.toString().split("\t");
            v.set(fields[0]);
            k.setUpFlow(Long.parseLong(fields[1]));
            k.setDownFlow(Long.parseLong(fields[2]));
            k.setSumFlow(Long.parseLong(fields[3]));
            // Emit the FlowBean as key so the shuffle sorts by total flow
            context.write(k, v);
        }
    }
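
    The mapper assumes each line is the tab-separated output of the earlier flow-summing job: phone number, up flow, down flow, total flow. A small stand-alone sketch of the parse (the record is invented for illustration):

    public class MapperInputDemo {
        public static void main(String[] args) {
            // Hypothetical input record, not from the real dataset
            String line = "13736230513\t2481\t24681\t27162";
            String[] fields = line.split("\t");
            System.out.println("phone = " + fields[0]);  // -> Text value
            System.out.println("up    = " + fields[1]);  // -> FlowBean.upFlow
            System.out.println("down  = " + fields[2]);  // -> FlowBean.downFlow
            System.out.println("sum   = " + fields[3]);  // -> FlowBean.sumFlow
        }
    }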

    4.FlowReducer

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;
    import java.util.Iterator;

    // Input k,v: <FlowBean, Text>; output k,v: <Text, FlowBean>
    public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

        @Override
        protected void reduce(FlowBean key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Get the iterator over the single, globally sorted group
            Iterator<Text> iterator = values.iterator();
            // Take the first ten records; hasNext() guards against
            // inputs with fewer than ten rows
            for (int i = 0; i < 10 && iterator.hasNext(); i++) {
                context.write(iterator.next(), key);
            }
        }
    }
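
    Writing `key` inside the loop works because Hadoop reuses a single key object and refreshes its contents each time the value iterator advances; after iterator.next(), `key` holds the flow figures that belong to that phone number. The logic is equivalent to this plain-Java sketch over a pre-sorted list (hypothetical row type, for intuition only):

    import java.util.Iterator;
    import java.util.List;

    public class TopTenSketch {
        // rows pre-sorted by descending total flow, as the shuffle guarantees
        static void printTopTen(List<String[]> sortedRows) {
            Iterator<String[]> it = sortedRows.iterator();
            for (int i = 0; i < 10 && it.hasNext(); i++) {
                String[] row = it.next();  // row[0] = phone, row[1] = flow columns
                System.out.println(row[0] + "\t" + row[1]);
            }
        }
    }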

    5.FlowDriver

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.net.URI;

    public class FlowDriver {

        public static void main(String[] args) throws Exception {
            // 1. Get the configuration and create the Job instance
            String inPath = "hdfs://gjh:9000/1702240034/output/part-r-00000";
            String outPath = "hdfs://gjh:9000/1702240034/output_top10";
            Configuration conf = new Configuration();

            // Delete the output directory if it already exists,
            // otherwise the job fails on startup
            FileSystem fs = FileSystem.get(new URI(inPath), conf);
            if (fs.exists(new Path(outPath))) {
                fs.delete(new Path(outPath), true);
            }

            Job job = Job.getInstance(conf);

            // Locate the jar that contains this driver class
            job.setJarByClass(FlowDriver.class);

            // Set the Mapper, Reducer, and grouping comparator for this job
            job.setMapperClass(FlowMapper.class);
            job.setReducerClass(FlowReducer.class);
            job.setGroupingComparatorClass(FlowGroupingComparator.class);

            // Mapper output key/value types
            job.setMapOutputKeyClass(FlowBean.class);
            job.setMapOutputValueClass(Text.class);

            // Final output key/value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);

            // Input and output paths
            FileInputFormat.setInputPaths(job, new Path(inPath));
            FileOutputFormat.setOutputPath(job, new Path(outPath));

            // Submit the job (configuration plus jar) and wait for completion
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }
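
    Hard-coding the HDFS paths ties the driver to one cluster. A common variant (a sketch, not the original code) takes them from the command line via GenericOptionsParser, which also honors generic Hadoop flags such as -D:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class FlowDriverArgs {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Strip generic Hadoop options and keep the user's arguments
            String[] rest = new GenericOptionsParser(conf, args).getRemainingArgs();
            String inPath = rest[0];   // input path from the command line
            String outPath = rest[1];  // output path from the command line
            // ... the remaining job setup is identical to FlowDriver above
        }
    }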
