MapReduce Exercise 10: Map-Side Table Join

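    In a map-side join the small product table is loaded into memory by every map task (here via the distributed cache) and joined against the order table while the orders are being read, so no shuffle or reduce phase is needed. The classes below assume two single-space-separated input files on HDFS; the records shown next are invented purely to illustrate the layout the code expects:

    order.txt (order_id p_id amount):
    1001 01 1
    1002 02 2
    1003 03 3

    pd.txt (p_id pname):
    01 xiaomi
    02 huawei
    03 apple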

    1.TableBean

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    public class TableBean implements Writable {

        private String order_id; // order id
        private String p_id;     // product id
        private int amount;      // product quantity
        private String pname;    // product name
        private String flag;     // marker telling which table the record came from

        public TableBean() {
            super();
        }

        public TableBean(String order_id, String p_id, int amount, String pname, String flag) {
            super();
            this.order_id = order_id;
            this.p_id = p_id;
            this.amount = amount;
            this.pname = pname;
            this.flag = flag;
        }

        public String getFlag() { return flag; }
        public void setFlag(String flag) { this.flag = flag; }

        public String getOrder_id() { return order_id; }
        public void setOrder_id(String order_id) { this.order_id = order_id; }

        public String getP_id() { return p_id; }
        public void setP_id(String p_id) { this.p_id = p_id; }

        public int getAmount() { return amount; }
        public void setAmount(int amount) { this.amount = amount; }

        public String getPname() { return pname; }
        public void setPname(String pname) { this.pname = pname; }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(order_id);
            out.writeUTF(p_id);
            out.writeInt(amount);
            out.writeUTF(pname);
            out.writeUTF(flag);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.order_id = in.readUTF();
            this.p_id = in.readUTF();
            this.amount = in.readInt();
            this.pname = in.readUTF();
            this.flag = in.readUTF();
        }

        @Override
        public String toString() {
            return order_id + "\t" + pname + "\t" + amount + "\t";
        }
    }
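    TableBean and TableReducer (next section) implement the reduce-side variant of the join; the post does not show the mapper that feeds them. The sketch below is not from the original post: TableMapper is a name introduced here, it assumes the file layout shown above, and it tells the two tables apart by file name (order* vs. pd*), which is also an assumption. What matters for TableReducer is that every record is keyed by p_id and that order records carry the flag "0".

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    // Sketch of a mapper feeding TableReducer (not part of the original post).
    // It keys every record by p_id so that orders and the matching product meet in one reduce call.
    public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

        private String fileName;                 // name of the file this split comes from
        private Text k = new Text();
        private TableBean bean = new TableBean();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            fileName = split.getPath().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(" ");
            if (fileName.startsWith("order")) {   // order table: order_id p_id amount (assumed file name)
                bean.setOrder_id(fields[0]);
                bean.setP_id(fields[1]);
                bean.setAmount(Integer.parseInt(fields[2]));
                bean.setPname("");
                bean.setFlag("0");                // "0" = order record, as checked in TableReducer
                k.set(fields[1]);
            } else {                              // product table: p_id pname
                bean.setOrder_id("");
                bean.setP_id(fields[0]);
                bean.setAmount(0);
                bean.setPname(fields[1]);
                bean.setFlag("1");                // anything other than "0" = product record
                k.set(fields[0]);
            }
            context.write(k, bean);
        }
    }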

    2.TableReducer

    import java.io.IOException;
    import java.util.ArrayList;

    import org.apache.commons.beanutils.BeanUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<TableBean> values, Context context)
                throws IOException, InterruptedException {

            // 1 collection that stores the order records for this product id
            ArrayList<TableBean> orderBeans = new ArrayList<>();
            // 2 bean that holds the matching product record
            TableBean pdBean = new TableBean();

            for (TableBean bean : values) {
                if ("0".equals(bean.getFlag())) { // order table
                    // copy each incoming order record into the collection
                    TableBean orderBean = new TableBean();
                    try {
                        BeanUtils.copyProperties(orderBean, bean);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    orderBeans.add(orderBean);
                } else { // product table
                    try {
                        // keep the incoming product record in memory
                        BeanUtils.copyProperties(pdBean, bean);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }

            // join the data
            for (TableBean bean : orderBeans) {
                // fill in the product name field
                bean.setPname(pdBean.getPname());
                // write out
                context.write(bean, NullWritable.get());
            }
        }
    }
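    For completeness, a minimal driver for that reduce-side variant might look like the sketch below. It is not part of the original post; TableDriver is an assumed class name, the mapper is the TableMapper sketch above, and the input/output paths are taken from the command line rather than the HDFS paths used in section 4.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    // Minimal driver sketch for the reduce-side join (TableMapper + TableReducer).
    public class TableDriver {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration());
            job.setJarByClass(TableDriver.class);

            job.setMapperClass(TableMapper.class);
            job.setReducerClass(TableReducer.class);

            // map output: p_id -> TableBean
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(TableBean.class);

            // final output: the joined TableBean (written via its toString), no value
            job.setOutputKeyClass(TableBean.class);
            job.setOutputValueClass(NullWritable.class);

            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }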

    3.DistributedCacheMapper

    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        private Map<String, String> pdMap = new HashMap<>();
        private Text k = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // read the cached product file
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(new FileInputStream("pd.txt"), "utf-8"));
            String line;
            while (StringUtils.isNotEmpty(line = reader.readLine())) {
                // split the line: p_id pname
                String[] fields = line.split(" ");
                // cache the mapping p_id -> pname
                pdMap.put(fields[0], fields[1]);
            }
            // close the stream
            reader.close();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // 1 read one order line
            String line = value.toString();
            // 2 split the line: order_id p_id amount
            String[] fields = line.split(" ");
            // 3 get the product id
            String pId = fields[1];
            // 4 look up the product name
            String pdName = pdMap.get(pId);
            // 5 concatenate and emit
            k.set(line + " " + pdName);
            context.write(k, NullWritable.get());
        }
    }
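    The setup() method above opens pd.txt by its bare file name, which works because the distributed cache symlinks the file into the task's working directory. If you would rather not hard-code the name, the localized name can be derived from the job context instead. The replacement setup() below is a sketch, not from the original post, and additionally needs java.net.URI and org.apache.hadoop.fs.Path imports:

    // Alternative setup() for DistributedCacheMapper (sketch): derive the cached file's
    // name from the job context instead of hard-coding "pd.txt". It still relies on the
    // distributed cache symlinking the file into the task's working directory.
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] cacheFiles = context.getCacheFiles();                      // URIs registered via job.addCacheFile(...)
        String localName = new Path(cacheFiles[0].getPath()).getName();  // e.g. "pd.txt"
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream(localName), "utf-8"));
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            String[] fields = line.split(" ");
            pdMap.put(fields[0], fields[1]);
        }
        reader.close();
    }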

    4.DistributedCacheDriver

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class DistributedCacheDriver {

        public static void main(String[] args) throws Exception {

            // 1 get the configuration and the job instance; delete the output dir if it already exists
            String int_path = "hdfs://gjh:9000/1702240034/table";
            String out_path = "hdfs://gjh:9000/1702240034/output_maptable";
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI(int_path), conf);
            if (fs.exists(new Path(out_path))) {
                fs.delete(new Path(out_path), true);
            }
            Job job = Job.getInstance(conf);

            // 2 set the jar to ship with the job
            job.setJarByClass(DistributedCacheDriver.class);

            // 3 wire up the mapper
            job.setMapperClass(DistributedCacheMapper.class);

            // 4 set the final output key/value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            // 5 set the input and output paths
            FileInputFormat.setInputPaths(job, new Path(int_path));
            FileOutputFormat.setOutputPath(job, new Path(out_path));

            // 6 add the product table to the distributed cache
            job.addCacheFile(new URI("hdfs://gjh:9000/1702240034/table/pd.txt"));

            // 7 a map-side join needs no reduce phase, so set the number of reduce tasks to 0
            job.setNumReduceTasks(0);

            // 8 submit and wait for completion
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }
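    To run the job, package the classes into a jar and submit it with hadoop jar; the jar name below is only an example. Note that with the paths used in this driver the cached pd.txt sits inside the input directory, so its lines also pass through the mapper and appear in the output; keeping the order file(s) in their own input directory avoids that.

    hadoop jar mapjoin.jar DistributedCacheDriver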

