1.FlowBean
import org
.apache
.hadoop
.io
.WritableComparable
;
import java
.io
.DataInput
;
import java
.io
.DataOutput
;
import java
.io
.IOException
;
/**
 * Hadoop key type holding three traffic counters: upstream, downstream and
 * their total.
 *
 * <p>The natural ordering ({@link #compareTo}) is by {@code sumFlow} in
 * <em>descending</em> order, so sorting a collection of beans puts the largest
 * totals first. Note that this ordering looks only at {@code sumFlow}, whereas
 * {@link #equals} compares all three counters; two beans can therefore compare
 * as 0 without being equal.
 *
 * <p>Instances are mutable and are serialized as three consecutive longs in
 * the order upFlow, downFlow, sumFlow.
 */
public class FlowBean implements WritableComparable<FlowBean> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    /**
     * Creates a bean with all counters set to zero. Declared explicitly so the
     * class stays default-constructible even if other constructors are added
     * later.
     */
    public FlowBean() {
    }

    /**
     * Renders the counters as {@code upFlow<TAB>downFlow<TAB>sumFlow}.
     */
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * Orders beans by {@code sumFlow}, largest first.
     *
     * @param o the bean to compare against
     * @return a negative value if this bean has the larger total, a positive
     *         value if {@code o} has the larger total, 0 if the totals match
     */
    @Override
    public int compareTo(FlowBean o) {
        // Arguments are deliberately swapped to obtain descending order.
        return Long.compare(o.sumFlow, this.sumFlow);
    }

    /**
     * Two beans are equal when all three counters match.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof FlowBean)) {
            return false;
        }
        FlowBean other = (FlowBean) obj;
        return upFlow == other.upFlow
                && downFlow == other.downFlow
                && sumFlow == other.sumFlow;
    }

    /**
     * Value-based hash over the three counters. Unlike the inherited identity
     * hash, the result depends only on field values, so it is the same in
     * every JVM for the same counters.
     */
    @Override
    public int hashCode() {
        int result = (int) (upFlow ^ (upFlow >>> 32));
        result = 31 * result + (int) (downFlow ^ (downFlow >>> 32));
        result = 31 * result + (int) (sumFlow ^ (sumFlow >>> 32));
        return result;
    }

    /**
     * Writes the counters as three longs: upFlow, downFlow, sumFlow.
     *
     * @param out sink to serialize into
     * @throws IOException if the underlying output fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Reads the counters back in the same order {@link #write} emitted them.
     *
     * @param in source to deserialize from
     * @throws IOException if the underlying input fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }
}
2.FlowGroupingComparator
import org
.apache
.hadoop
.io
.WritableComparable
;
import org
.apache
.hadoop
.io
.WritableComparator
;
/**
 * Grouping comparator that reports every pair of keys as equal.
 *
 * <p>Because {@link #compare} always returns 0, any two {@code FlowBean} keys
 * are treated as belonging to the same group by whatever uses this comparator.
 */
public class FlowGroupingComparator extends WritableComparator {

    /** Result returned for every comparison: "same group". */
    private static final int SAME_GROUP = 0;

    /**
     * Registers {@code FlowBean} as the key class and asks the parent
     * comparator to create key instances (second argument {@code true}).
     */
    protected FlowGroupingComparator() {
        super(FlowBean.class, true);
    }

    /**
     * Treats all keys as equal regardless of their contents.
     *
     * @param a first key (ignored)
     * @param b second key (ignored)
     * @return always 0
     */
    @Override
    @SuppressWarnings("rawtypes") // raw types are dictated by the overridden signature
    public int compare(WritableComparable a, WritableComparable b) {
        return SAME_GROUP;
    }
}
3.FlowMapper
import org
.apache
.hadoop
.io
.LongWritable
;
import org
.apache
.hadoop
.io
.Text
;
import org
.apache
.hadoop
.mapreduce
.Mapper
;
import java
.io
.IOException
;
/**
 * Parses tab-separated lines into a {@code FlowBean} key and a {@code Text}
 * value.
 *
 * <p>Each input line is expected to contain at least four tab-separated
 * fields: a label (emitted as the value) followed by three numeric fields that
 * populate the bean's upFlow, downFlow and sumFlow. Emitting the bean as the
 * key lets the framework sort records by the bean's natural ordering.
 */
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    /** Minimum number of tab-separated fields a usable record must have. */
    private static final int REQUIRED_FIELDS = 4;

    // Reused across map() calls to avoid allocating per record.
    private final FlowBean k = new FlowBean();
    private final Text v = new Text();

    /**
     * Converts one input line into a (FlowBean, Text) pair.
     *
     * <p>Lines with fewer than four fields (for example blank lines) are
     * skipped and counted under the {@code FlowMapper/MALFORMED_RECORDS}
     * counter instead of failing the task with an
     * {@code ArrayIndexOutOfBoundsException}. Extra trailing fields are
     * ignored, as before.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one line of input text
     * @param context sink for the emitted pair and for counters
     * @throws IOException           if writing the output fails
     * @throws InterruptedException  if the task is interrupted while writing
     * @throws NumberFormatException if one of the three numeric fields is not
     *                               a valid long
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        if (fields.length < REQUIRED_FIELDS) {
            context.getCounter("FlowMapper", "MALFORMED_RECORDS").increment(1);
            return;
        }

        v.set(fields[0]);
        k.setUpFlow(Long.parseLong(fields[1]));
        k.setDownFlow(Long.parseLong(fields[2]));
        k.setSumFlow(Long.parseLong(fields[3]));
        context.write(k, v);
    }
}
4.FlowReducer
import org
.apache
.hadoop
.io
.Text
;
import org
.apache
.hadoop
.mapreduce
.Reducer
;
import java
.io
.IOException
;
import java
.util
.Iterator
;
/**
 * Emits at most the first ten (value, key) pairs of each reduce group,
 * swapping them so the {@code Text} becomes the output key and the
 * {@code FlowBean} the output value.
 */
public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    /** Maximum number of records written per reduce group. */
    private static final int TOP_N = 10;

    /**
     * Writes up to {@link #TOP_N} records from the group, in iteration order.
     *
     * <p>The loop stops early when the group holds fewer than ten values; the
     * previous unconditional {@code next()} calls threw
     * {@code NoSuchElementException} in that case.
     *
     * @param key     key of the current group
     * @param values  values belonging to the group
     * @param context sink for the emitted pairs
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Iterator<Text> iterator = values.iterator();
        for (int emitted = 0; emitted < TOP_N && iterator.hasNext(); emitted++) {
            // Advance the iterator BEFORE handing `key` to write(): the
            // original evaluated next() first as well, and the job appears to
            // rely on the framework updating `key` as the iterator advances
            // (NOTE(review): confirm against the Hadoop Reducer documentation).
            Text label = iterator.next();
            context.write(label, key);
        }
    }
}
5.FlowDriver
import org
.apache
.hadoop
.conf
.Configuration
;
import org
.apache
.hadoop
.fs
.FileSystem
;
import org
.apache
.hadoop
.fs
.Path
;
import org
.apache
.hadoop
.io
.Text
;
import org
.apache
.hadoop
.mapreduce
.Job
;
import org
.apache
.hadoop
.mapreduce
.lib
.input
.FileInputFormat
;
import org
.apache
.hadoop
.mapreduce
.lib
.output
.FileOutputFormat
;
import java
.io
.IOException
;
import java
.net
.URI
;
/**
 * Configures and launches the job that wires together {@code FlowMapper},
 * {@code FlowGroupingComparator} and {@code FlowReducer}.
 */
public class FlowDriver {

    /** Input used when no paths are given on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "hdfs://gjh:9000/1702240034/output/part-r-00000";

    /** Output directory used when no paths are given on the command line. */
    private static final String DEFAULT_OUTPUT_PATH =
            "hdfs://gjh:9000/1702240034/output_top10";

    /**
     * Runs the job and exits the JVM with status 0 on success, 1 on failure.
     *
     * <p>When exactly two arguments are supplied they are used as the input
     * and output paths; with any other argument count the built-in defaults
     * are used, which matches the previous hard-coded behaviour. An existing
     * output directory is deleted recursively before the job starts.
     *
     * @param args optional {@code <input path> <output path>}
     * @throws Exception if filesystem access or job submission fails
     */
    public static void main(String[] args) throws Exception {
        boolean pathsSupplied = args != null && args.length == 2;
        Path inputPath = new Path(pathsSupplied ? args[0] : DEFAULT_INPUT_PATH);
        Path outputPath = new Path(pathsSupplied ? args[1] : DEFAULT_OUTPUT_PATH);

        Configuration conf = new Configuration();

        // Resolve the filesystem from the output path, since that is the path
        // being checked and deleted. The handle is intentionally not closed,
        // as in the original; FileSystem.get may return a shared instance.
        FileSystem fs = FileSystem.get(outputPath.toUri(), conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowDriver.class);

        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        job.setGroupingComparatorClass(FlowGroupingComparator.class);

        // The reducer caps output per reduce group, so a global "top N" is
        // only correct when all records reach one reducer. Pin it explicitly
        // rather than relying on the cluster's default.
        job.setNumReduceTasks(1);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
6.
转载请注明原文地址:https://blackberry.8miu.com/read-31762.html