MapReduce Case Study: Serialization and Traffic Statistics


    Serialization

    Serialization converts an in-memory object into a byte sequence (or another transfer format) so that it can be written to disk or sent over the network. Deserialization is the reverse: it turns a received byte sequence back into an in-memory object. MapReduce serializes keys and values with Hadoop's own compact Writable framework by default (Java's built-in serialization carries too much overhead, and Avro is available as an alternative); a custom key or value type therefore implements the Writable interface.
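
    To make the two directions concrete, here is a minimal round-trip sketch. It is not part of the original walkthrough: the class name RoundTripDemo is invented for illustration, and it assumes the FlowBean class defined later in this article.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class RoundTripDemo {
        public static void main(String[] args) throws IOException {
            FlowBean original = new FlowBean();
            original.set(2153L, 6185L);

            // Serialization: object -> byte sequence.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));

            // Deserialization: byte sequence -> object.
            FlowBean restored = new FlowBean();
            restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

            // Prints the same three fields the original held.
            System.out.println(restored);
        }
    }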

    Requirement

    For each user, identified by phone number, compute the total upload traffic and the total download traffic.

    Input file

    Each line is tab-separated and holds four columns: a record ID, a phone number, the upload traffic, and the download traffic.

    1   12344567895   2153   6185
    2   13344567895   2253   6285
    3   12344567895   2353   6385
    4   13344567895   2453   6485
    5   14344567895   2553   6585
    6   14344567895   2653   6685

    Bean class

    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class FlowBean implements Writable {

        private Long upFlow;
        private Long downFlow;
        private Long sumFlow;

        public FlowBean() {
        }

        public Long getUpFlow() {
            return upFlow;
        }

        public void setUpFlow(Long upFlow) {
            this.upFlow = upFlow;
        }

        public Long getDownFlow() {
            return downFlow;
        }

        public void setDownFlow(Long downFlow) {
            this.downFlow = downFlow;
        }

        public Long getSumFlow() {
            return sumFlow;
        }

        public void setSumFlow(Long sumFlow) {
            this.sumFlow = sumFlow;
        }

        public void set(Long upFlow, Long downFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = upFlow + downFlow;
        }

        @Override
        public String toString() {
            return "FlowBean=[\tupFlow=" + upFlow + "\tdownFlow=" + downFlow + "\tsumFlow=" + sumFlow + "]";
        }

        /**
         * Serialization: write each field to the output stream.
         */
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeLong(upFlow);
            dataOutput.writeLong(downFlow);
            dataOutput.writeLong(sumFlow);
        }

        /**
         * Deserialization: read the fields back in the same order they were written.
         */
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            upFlow = dataInput.readLong();
            downFlow = dataInput.readLong();
            sumFlow = dataInput.readLong();
        }
    }
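
    Two details are worth calling out. First, readFields must read the fields in exactly the order write wrote them (upFlow, downFlow, sumFlow); a mismatched order silently scrambles the values rather than throwing an error. Second, Writable is enough here because FlowBean only travels as a value; if it were used as a map output key, it would have to implement WritableComparable so the shuffle phase could sort it.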

    Mapper class

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        private Text phone = new Text();
        private FlowBean flowBean = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] datas = value.toString().split("\t");
            phone.set(datas[1]);
            long upFlow = Long.parseLong(datas[datas.length - 2]);
            long downFlow = Long.parseLong(datas[datas.length - 1]);
            flowBean.set(upFlow, downFlow);
            context.write(phone, flowBean);
        }
    }
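
    Note the pattern of allocating phone and flowBean once as fields and refilling them with set() on every call: map runs once per input line, so reusing the two objects avoids creating a fresh pair per record. Indexing from the end of the array (datas.length - 2 and datas.length - 1) keeps the mapper working even if extra columns were ever inserted between the phone number and the two traffic columns.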

    Reducer class

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        private FlowBean sumFlow = new FlowBean();

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            long sumUpFlow = 0;
            long sumDownFlow = 0;
            for (FlowBean value : values) {
                sumUpFlow += value.getUpFlow();
                sumDownFlow += value.getDownFlow();
            }
            sumFlow.set(sumUpFlow, sumDownFlow);
            context.write(key, sumFlow);
        }
    }
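
    A classic pitfall hides in this loop: Hadoop reuses a single FlowBean instance for every element of values, so storing references to value in a list would leave you with N references to the last record. Accumulating the primitive sums inside the loop, as done here, sidesteps the problem.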

    Driver class

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class FlowDriver {

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Job job = Job.getInstance(new Configuration());

            job.setJarByClass(FlowDriver.class);
            job.setMapperClass(FlowMapper.class);
            // Register the reducer; if this is omitted, the identity reducer runs and the
            // per-record map output is written out without being aggregated.
            job.setReducerClass(FlowReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);

            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }
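
    Assuming the three classes are packaged into a jar (the jar name flow.jar and the HDFS paths below are placeholders, not from the original post), the job can be submitted with:

    hadoop jar flow.jar FlowDriver /flow/input /flow/output

    FileOutputFormat refuses to start if the output directory already exists, so delete it or pick a fresh path between runs.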

    Output

    12344567895 FlowBean=[ upFlow=4506 downFlow=12570 sumFlow=17076]
    13344567895 FlowBean=[ upFlow=4706 downFlow=12770 sumFlow=17476]
    14344567895 FlowBean=[ upFlow=5206 downFlow=13270 sumFlow=18476]

    Each phone number appears exactly once: the reducer has summed the upload and download traffic across that number's records, and sumFlow holds the combined total.