1. Input data
gjh@gjh:~/date$ cat fridents.txt
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
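Each line has the form person:friend1,friend2,...; the first line, for example, says that A's friends are B, C, D, F, E, and O. The goal is to compute the common friends of every pair of users, which the code below does with two chained MapReduce jobs: the first inverts the relation, mapping each friend to all the people whose lists contain them; the second enumerates pairs of those people and collects each pair's shared friends.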
2. Output of the first job
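For this input the first job writes one line per friend: the key is the friend, and the tab-separated value lists everyone whose friend list contains that person. The order of names within a value depends on the shuffle, and each value keeps the reducer's trailing comma, but the contents are:

A	B,C,D,F,G,H,I,K,O,
B	A,E,F,J,
C	A,B,E,F,G,H,K,
D	A,C,E,F,G,H,K,L,
E	A,B,D,F,G,H,L,M,
F	A,C,D,G,L,M,
G	M,
H	O,
I	C,O,
J	O,
K	B,
L	D,E,
M	E,F,
O	A,F,H,I,J,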
3. Output of the second job
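The second job turns that into one line per pair of users, with the pair as the key and their common friends, space-separated, as the value. A few representative lines (using the canonical pair ordering produced by the sort added in FriendsTwoMapper below):

A-B	C E
A-C	D F
A-D	E F
B-C	A
C-D	A F

and so on for every pair of users that shares at least one friend.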
4. FriendsOneMapper
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FriendsOneMapper extends Mapper<LongWritable, Text, Text, Text> {

    Text k = new Text();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // Each input line looks like "A:B,C,D,F,E,O".
        String line = value.toString();
        String[] oneSplit = line.split(":");        // [0] = person, [1] = friend list
        String[] twoSplit = oneSplit[1].split(",");
        // Invert the relation: emit (friend, person) so the reducer can
        // collect everyone who has a given friend.
        for (String string : twoSplit) {
            k.set(string);
            v.set(oneSplit[0]);
            context.write(k, v);
        }
    }
}
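To see what the mapper does to a single record, here is a minimal standalone sketch of the same split-and-invert logic (plain Java, no Hadoop; the class name and hardcoded sample line are just for illustration):

public class FriendsOneMapperSketch {
    public static void main(String[] args) {
        String line = "A:B,C,D,F,E,O";               // one input record
        String[] oneSplit = line.split(":");          // [0] = "A", [1] = "B,C,D,F,E,O"
        String[] twoSplit = oneSplit[1].split(",");
        // The real mapper calls context.write(friend, person);
        // here we just print the pairs it would emit.
        for (String friend : twoSplit) {
            System.out.println(friend + "\t" + oneSplit[0]);
        }
    }
}

Running it prints B A, C A, D A, F A, E A, O A as tab-separated lines: the inverted friend-to-person relation that the first reducer groups on.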
5. FriendsOneReducer
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FriendsOneReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values,
            Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // key is one friend; values are all the people whose lists contain that friend.
        StringBuffer kstr = new StringBuffer();
        for (Text text : values) {
            kstr.append(text.toString() + ",");
        }
        context.write(key, new Text(kstr.toString()));
    }
}
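Appending "," inside the loop leaves a trailing comma on every list. That is harmless here, since String.split(",") in the second job discards trailing empty tokens, but if you want clean output, java.util.StringJoiner only puts the separator between elements. A small standalone sketch (sample values hardcoded):

import java.util.StringJoiner;

public class JoinSketch {
    public static void main(String[] args) {
        StringJoiner kstr = new StringJoiner(",");
        for (String s : new String[] { "B", "C", "D" }) {
            kstr.add(s);
        }
        System.out.println(kstr);   // prints B,C,D -- no trailing comma
    }
}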
6. FriendsOneDirver
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FriendsOneDirver {

    public static void main(String[] args) throws Exception {
        String int_path = "hdfs://gjh:9000/1702240034/fridents.txt";
        String out_path = "hdfs://gjh:9000/1702240034/output_comfridents1";
        Configuration conf = new Configuration();
        // Delete a stale output directory so reruns do not fail.
        FileSystem fs = FileSystem.get(new URI(int_path), conf);
        if (fs.exists(new Path(out_path))) {
            fs.delete(new Path(out_path), true);
        }
        Job job = Job.getInstance(conf);
        job.setJarByClass(FriendsOneDirver.class);
        job.setMapperClass(FriendsOneMapper.class);
        job.setReducerClass(FriendsOneReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(int_path));
        FileOutputFormat.setOutputPath(job, new Path(out_path));
        job.waitForCompletion(true);
    }
}
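One small optional refinement, not in the original code: propagate the job result as the process exit status, so a calling script can detect failure. The last line of main() would become:

// exit 0 on success, 1 if the job failed
System.exit(job.waitForCompletion(true) ? 0 : 1);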
7. FriendsTwoMapper
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FriendsTwoMapper extends Mapper<LongWritable, Text, Text, Text> {

    Text k = new Text();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // Each input line is a first-job output record: "friend\tperson,person,...".
        String line = value.toString();
        String[] oneSplit = line.split("\t");
        String[] twoSplit = oneSplit[1].split(",");
        // Sort the people first so every pair is emitted in one canonical
        // order (always "B-C", never "C-B"); otherwise the same pair could
        // be scattered across two different keys.
        Arrays.sort(twoSplit);
        // Emit (person1-person2, friend) for every pair that shares this friend.
        for (int i = 0; i < twoSplit.length; i++) {
            for (int j = i + 1; j < twoSplit.length; j++) {
                k.set(twoSplit[i] + "-" + twoSplit[j]);
                v.set(oneSplit[0]);
                context.write(k, v);
            }
        }
    }
}
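A quick standalone check of the pair loop (plain Java; the hardcoded record is a trimmed version of the first job's line for friend E):

import java.util.Arrays;

public class FriendsTwoMapperSketch {
    public static void main(String[] args) {
        String line = "E\tA,B,D,F";                   // friend E and four people who have E
        String[] oneSplit = line.split("\t");
        String[] twoSplit = oneSplit[1].split(",");
        Arrays.sort(twoSplit);                        // canonical pair order
        for (int i = 0; i < twoSplit.length; i++) {
            for (int j = i + 1; j < twoSplit.length; j++) {
                // every pair of people who share friend E
                System.out.println(twoSplit[i] + "-" + twoSplit[j] + "\t" + oneSplit[0]);
            }
        }
    }
}

It prints the six pairs A-B, A-D, A-F, B-D, B-F, D-F, each tagged with E: for each such pair, E is one of their common friends.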
8. FriendsTwoReducer
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FriendsTwoReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values,
            Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // key is a pair "X-Y"; values are all the friends X and Y share.
        StringBuffer valuesBuffer = new StringBuffer();
        for (Text text : values) {
            valuesBuffer.append(text.toString() + " ");
        }
        Text v = new Text();
        v.set(valuesBuffer.toString());
        context.write(key, v);
    }
}
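For example, the key A-B arrives here with the values C and E (the first job's lines for C and for E both contain A and B), so the job writes A-B followed by "C E ": the common friends of A and B, space-separated, with a trailing space from the append.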
9. FriendsTwoDirver
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FriendsTwoDirver {

    public static void main(String[] args) throws Exception {
        // The second job consumes the first job's output file directly.
        String int_path = "hdfs://gjh:9000/1702240034/output_comfridents1/part-r-00000";
        String out_path = "hdfs://gjh:9000/1702240034/output_comfridents2";
        Configuration conf = new Configuration();
        // Delete a stale output directory so reruns do not fail.
        FileSystem fs = FileSystem.get(new URI(int_path), conf);
        if (fs.exists(new Path(out_path))) {
            fs.delete(new Path(out_path), true);
        }
        Job job = Job.getInstance(conf);
        job.setJarByClass(FriendsTwoDirver.class);
        job.setMapperClass(FriendsTwoMapper.class);
        job.setReducerClass(FriendsTwoReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(int_path));
        FileOutputFormat.setOutputPath(job, new Path(out_path));
        job.waitForCompletion(true);
    }
}
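The two drivers have to run in this order, because FriendsTwoDirver reads the part-r-00000 file that FriendsOneDirver produces. If a single entry point is preferred, a minimal sketch (hypothetical class, simply reusing the two mains) would be:

public class FriendsDriver {
    public static void main(String[] args) throws Exception {
        // Each main blocks in waitForCompletion, so stage 2 starts only
        // after stage 1 has written its output.
        FriendsOneDirver.main(args);   // friend -> people who have that friend
        FriendsTwoDirver.main(args);   // person pair -> common friends
    }
}

For robustness you would also check each job's completion status rather than ignoring waitForCompletion's boolean result.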