Implementing a Table Join with MapReduce
Table 1:
id city
1 Beijing
2 Tianjin
3 Hebei
4 Shanxi
5 Inner Mongolia
6 Liaoning
7 Jilin
8 Heilongjiang
Table 2:
id year num
1 2010 1962
1 2011 2019
2 2010 1299
2 2011 1355
4 2011 3574
4 2011 3593
9 2010 2303
9 2011 2347
Requirement
Using MapReduce, join (associate) the two tables on their id field. The expected output:
1 Beijing 2011 2019
1 Beijing 2010 1962
2 Tianjin 2011 1355
2 Tianjin 2010 1299
4 Shanxi 2011 3593
4 Shanxi 2011 3574
Approach:
The map task reads one line at a time from both tables. It can determine which table the current record comes from by its source (the file name), tag the record accordingly, and send it on to reduce. The map output key is the id, and the value is the rest of that id's record.
What reduce receives has the form <key, {value1, value2, value3, ...}>, where key is the id and the values contain both the Table 1 records and the Table 2 records for that id. Using the tags added in map we can tell the two apart and collect them into list1 and list2 respectively. The Cartesian product of the two lists then yields the joined rows for that id; doing this for every id joins the two tables completely. A concrete trace of this data flow follows below.
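For example, for id = 1 (using the "#" and "$" source tags that the code below attaches in map; fields in a value are tab-separated):

map output:    <1, #Beijing>   <1, $2010 1962>   <1, $2011 2019>
reduce input:  <1, {#Beijing, $2010 1962, $2011 2019}>
after split:   list1 = [Beijing]   list2 = [2010 1962, 2011 2019]
join output:   1 Beijing 2010 1962   and   1 Beijing 2011 2019

The two output rows are exactly list1 × list2 keyed by the id.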
Code
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
public class ReduceJoinTest {

    public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Identify which table this record comes from by its file name.
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String name = fileSplit.getPath().getName();
            String line = value.toString();
            if (line == null || line.isEmpty()) return;  // skip blank lines
            String[] split = line.split("\\s+");
            if (name.contains("tb_a")) {
                // Table 1: "id city" -> emit <id, "#city">; "#" marks Table 1
                String id = split[0];
                String city = split[1];
                context.write(new Text(id), new Text("#" + city));
            } else if (name.contains("tb_b")) {
                // Table 2: "id year num" -> emit <id, "$year\tnum">; "$" marks Table 2
                String id = split[0];
                String num1 = split[1];
                String num2 = split[2];
                context.write(new Text(id), new Text("$" + num1 + "\t" + num2));
            }
        }
    }

    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> list1 = new LinkedList<>();  // records from Table 1
            List<String> list2 = new LinkedList<>();  // records from Table 2
            // Separate the tagged values by source table, stripping the tag.
            for (Text text : values) {
                String value = text.toString();
                if (value.startsWith("#")) {
                    list1.add(value.substring(1));
                } else if (value.startsWith("$")) {
                    list2.add(value.substring(1));
                }
            }
            // The Cartesian product of the two lists is the join for this id.
            for (String a : list1) {
                for (String b : list2) {
                    context.write(key, new Text(a + "\t" + b));
                }
            }
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(ReduceJoinTest.class);
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path("L:\\JAVA\\Hadoop\\src\\xyz\\liujiawei\\join\\src\\input"));
        FileOutputFormat.setOutputPath(job, new Path("L:\\JAVA\\Hadoop\\src\\xyz\\liujiawei\\join\\src\\output"));
        System.exit(job.waitForCompletion(true) ? 0 : -1);
    }
}
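A few notes on running this. The input directory must contain the two table files with names containing "tb_a" and "tb_b" (for example tb_a.txt and tb_b.txt; the exact names are an assumption here, since the mapper only checks that the file name contains those substrings). The output directory must not already exist, or Hadoop will refuse to start the job. And because reduce only emits the Cartesian product of two non-empty lists, an id that appears in just one table (3, 5-8 from Table 1, and 9 from Table 2) produces no output at all; this is inner-join semantics, which is why those ids are absent from the expected output above.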