1.TableBean
import java
.io
.DataInput
;
import java
.io
.DataOutput
;
import java
.io
.IOException
;
import org
.apache
.hadoop
.io
.Writable
;
public class TableBean implements Writable {
private String order_id
;
private String p_id
;
private int amount
;
private String pname
;
private String flag
;
public TableBean() {
super();
}
public TableBean(String order_id
, String p_id
, int amount
, String pname
, String flag
) {
super();
this.order_id
= order_id
;
this.p_id
= p_id
;
this.amount
= amount
;
this.pname
= pname
;
this.flag
= flag
;
}
public String
getFlag() {
return flag
;
}
public void setFlag(String flag
) {
this.flag
= flag
;
}
public String
getOrder_id() {
return order_id
;
}
public void setOrder_id(String order_id
) {
this.order_id
= order_id
;
}
public String
getP_id() {
return p_id
;
}
public void setP_id(String p_id
) {
this.p_id
= p_id
;
}
public int getAmount() {
return amount
;
}
public void setAmount(int amount
) {
this.amount
= amount
;
}
public String
getPname() {
return pname
;
}
public void setPname(String pname
) {
this.pname
= pname
;
}
@Override
public void write(DataOutput out
) throws IOException
{
out
.writeUTF(order_id
);
out
.writeUTF(p_id
);
out
.writeInt(amount
);
out
.writeUTF(pname
);
out
.writeUTF(flag
);
}
@Override
public void readFields(DataInput in
) throws IOException
{
this.order_id
= in
.readUTF();
this.p_id
= in
.readUTF();
this.amount
= in
.readInt();
this.pname
= in
.readUTF();
this.flag
= in
.readUTF();
}
@Override
public String
toString() {
return order_id
+ "\t" + pname
+ "\t" + amount
+ "\t" ;
}
}
2.TableReducer
import java
.io
.IOException
;
import java
.util
.ArrayList
;
import org
.apache
.commons
.beanutils
.BeanUtils
;
import org
.apache
.hadoop
.io
.NullWritable
;
import org
.apache
.hadoop
.io
.Text
;
import org
.apache
.hadoop
.mapreduce
.Reducer
;
public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
@Override
protected void reduce(Text key
, Iterable
<TableBean> values
, Context context
) throws IOException
, InterruptedException
{
ArrayList
<TableBean> orderBeans
= new ArrayList<>();
TableBean pdBean
= new TableBean();
for (TableBean bean
: values
) {
if ("0".equals(bean
.getFlag())) {
TableBean orderBean
= new TableBean();
try {
BeanUtils
.copyProperties(orderBean
, bean
);
} catch (Exception e
) {
e
.printStackTrace();
}
orderBeans
.add(orderBean
);
} else {
try {
BeanUtils
.copyProperties(pdBean
, bean
);
} catch (Exception e
) {
e
.printStackTrace();
}
}
}
for (TableBean bean
: orderBeans
){
bean
.setPname(pdBean
.getPname());
context
.write(bean
,NullWritable
.get());
}
}
}
3.TableMapper
import java
.io
.IOException
;
import org
.apache
.hadoop
.io
.LongWritable
;
import org
.apache
.hadoop
.io
.Text
;
import org
.apache
.hadoop
.mapreduce
.Mapper
;
import org
.apache
.hadoop
.mapreduce
.lib
.input
.FileSplit
;
public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean>{
TableBean bean
= new TableBean();
Text k
= new Text();
@Override
protected void map(LongWritable key
, Text value
, Context context
)
throws IOException
, InterruptedException
{
FileSplit split
= (FileSplit
) context
.getInputSplit();
String name
= split
.getPath().getName();
String line
= value
.toString();
if (name
.startsWith("order.txt")) {
String
[] fields
= line
.split(" ");
bean
.setOrder_id(fields
[0]);
bean
.setP_id(fields
[1]);
bean
.setAmount(Integer
.parseInt(fields
[2]));
bean
.setPname("");
bean
.setFlag("0");
k
.set(fields
[1]);
}else {
String
[] fields
= line
.split(" ");
bean
.setP_id(fields
[0]);
bean
.setPname(fields
[1]);
bean
.setFlag("1");
bean
.setAmount(0);
bean
.setOrder_id("");
k
.set(fields
[0]);
}
context
.write(k
, bean
);
}
}
4.TableDriver
import org
.apache
.hadoop
.conf
.Configuration
;
import org
.apache
.hadoop
.fs
.FileSystem
;
import org
.apache
.hadoop
.fs
.Path
;
import org
.apache
.hadoop
.io
.NullWritable
;
import org
.apache
.hadoop
.io
.Text
;
import org
.apache
.hadoop
.mapreduce
.Job
;
import org
.apache
.hadoop
.mapreduce
.lib
.input
.FileInputFormat
;
import org
.apache
.hadoop
.mapreduce
.lib
.output
.FileOutputFormat
;
import java
.net
.URI
;
public class TableDriver {
public static void main(String
[] args
) throws Exception
{
String int_path
= "hdfs://gjh:9000/1702240034/table";
String out_path
= "hdfs://gjh:9000/1702240034/output_reducetable";
Configuration conf
= new Configuration();
FileSystem fs
= FileSystem
.get(new URI(int_path
), conf
);
if (fs
.exists(new Path(out_path
))) {
fs
.delete(new Path(out_path
), true);
}
Job job
= Job
.getInstance(conf
);
job
.setJarByClass(TableDriver
.class);
job
.setMapperClass(TableMapper
.class);
job
.setReducerClass(TableReducer
.class);
job
.setMapOutputKeyClass(Text
.class);
job
.setMapOutputValueClass(TableBean
.class);
job
.setOutputKeyClass(TableBean
.class);
job
.setOutputValueClass(NullWritable
.class);
FileInputFormat
.setInputPaths(job
, new Path(int_path
));
FileOutputFormat
.setOutputPath(job
, new Path(out_path
));
boolean result
= job
.waitForCompletion(true);
System
.exit(result
? 0 : 1);
}
}
5.
转载请注明原文地址:https://blackberry.8miu.com/read-7720.html