mapreduce 练习9 表合并reduce端

    科技2022-07-14  123

    1.TableBean

    import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; public class TableBean implements Writable { private String order_id; // 订单id private String p_id; // 产品id private int amount; // 产品数量 private String pname; // 产品名称 private String flag;// 表的标记 public TableBean() { super(); } public TableBean(String order_id, String p_id, int amount, String pname, String flag) { super(); this.order_id = order_id; this.p_id = p_id; this.amount = amount; this.pname = pname; this.flag = flag; } public String getFlag() { return flag; } public void setFlag(String flag) { this.flag = flag; } public String getOrder_id() { return order_id; } public void setOrder_id(String order_id) { this.order_id = order_id; } public String getP_id() { return p_id; } public void setP_id(String p_id) { this.p_id = p_id; } public int getAmount() { return amount; } public void setAmount(int amount) { this.amount = amount; } public String getPname() { return pname; } public void setPname(String pname) { this.pname = pname; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(order_id); out.writeUTF(p_id); out.writeInt(amount); out.writeUTF(pname); out.writeUTF(flag); } @Override public void readFields(DataInput in) throws IOException { this.order_id = in.readUTF(); this.p_id = in.readUTF(); this.amount = in.readInt(); this.pname = in.readUTF(); this.flag = in.readUTF(); } @Override public String toString() { return order_id + "\t" + pname + "\t" + amount + "\t" ; } }

    2.TableReducer

    import java.io.IOException; import java.util.ArrayList; import org.apache.commons.beanutils.BeanUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> { @Override protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException { // 1准备存储订单的集合 ArrayList<TableBean> orderBeans = new ArrayList<>(); // 2 准备bean对象 TableBean pdBean = new TableBean(); for (TableBean bean : values) { if ("0".equals(bean.getFlag())) {// 订单表 // 拷贝传递过来的每条订单数据到集合中 TableBean orderBean = new TableBean(); try { BeanUtils.copyProperties(orderBean, bean); } catch (Exception e) { e.printStackTrace(); } orderBeans.add(orderBean); } else {// 产品表 try { // 拷贝传递过来的产品表到内存中 BeanUtils.copyProperties(pdBean, bean); } catch (Exception e) { e.printStackTrace(); } } } //数据拼接 for (TableBean bean : orderBeans){ //更新产品名称字段 bean.setPname(pdBean.getPname()); //写出 context.write(bean,NullWritable.get()); } } }

    3.TableMapper

    import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean>{ TableBean bean = new TableBean(); Text k = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取输入文件类型 FileSplit split = (FileSplit) context.getInputSplit(); String name = split.getPath().getName(); // 2 获取输入数据 String line = value.toString(); // 3 不同文件分别处理 if (name.startsWith("order.txt")) {// 订单表处理 // 3.1 切割 String[] fields = line.split(" "); // 3.2 封装bean对象 bean.setOrder_id(fields[0]); bean.setP_id(fields[1]); bean.setAmount(Integer.parseInt(fields[2])); bean.setPname(""); bean.setFlag("0"); k.set(fields[1]); }else {// 产品表处理 // 3.3 切割 String[] fields = line.split(" "); // 3.4 封装bean对象 bean.setP_id(fields[0]); bean.setPname(fields[1]); bean.setFlag("1"); bean.setAmount(0); bean.setOrder_id(""); k.set(fields[0]); } // 4 写出 context.write(k, bean); } }

    4.TableDriver

    import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.net.URI; public class TableDriver { public static void main(String[] args) throws Exception { // 1 获取配置信息,或者job对象实例 String int_path = "hdfs://gjh:9000/1702240034/table"; String out_path = "hdfs://gjh:9000/1702240034/output_reducetable"; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI(int_path), conf); if (fs.exists(new Path(out_path))) { fs.delete(new Path(out_path), true); } Job job = Job.getInstance(conf); // 2 指定本程序的jar包所在的本地路径 job.setJarByClass(TableDriver.class); // 3 指定本业务job要使用的mapper/Reducer业务类 job.setMapperClass(TableMapper.class); job.setReducerClass(TableReducer.class); // 4 指定mapper输出数据的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(TableBean.class); // 5 指定最终输出的数据的kv类型 job.setOutputKeyClass(TableBean.class); job.setOutputValueClass(NullWritable.class); // 6 指定job的输入原始文件所在目录 FileInputFormat.setInputPaths(job, new Path(int_path)); FileOutputFormat.setOutputPath(job, new Path(out_path)); // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }

    5.

    Processed: 0.015, SQL: 8