MapReduce 练习13:找出博客共同好友

    科技2024-06-25  66

    1.输入数据

    gjh@gjh:~/date$ cat fridents.txt
    A:B,C,D,F,E,O
    B:A,C,E,K
    C:F,A,D,I
    D:A,E,F,L
    E:B,C,D,M,L
    F:A,B,C,D,E,O,M
    G:A,C,D,E,F
    H:A,C,D,E,O
    I:A,O
    J:B,O
    K:A,C,D1
    L:D,E,F
    M:E,F,G
    O:A,H,I,J

    2.第一次输出

    3.第二次输出

    4.FriendsOneMapper

    import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class FriendsOneMapper extends Mapper<LongWritable, Text, Text, Text>{ Text k = new Text(); Text v = new Text(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { //1.获取一行 //A:B,C,D,F,E,O String line = value.toString(); //2.先以冒号切割 String[] oneSplit = line.split(":"); //3.将第一次切割的第二部分以逗号切割 String[] twoSplit = oneSplit[1].split(","); //4.将第一次切割的第一部分。和第二次切割的每一部分写出去 //第二次切割的每一部分为key 第一次切割的部分为value for (String string : twoSplit) { k.set(string); v.set(oneSplit[0]); context.write(k, v); } } }

    5.FriendsOneReducer

    import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class FriendsOneReducer extends Reducer<Text, Text, Text, Text>{ @Override protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { StringBuffer kstr = new StringBuffer(); for (Text text : values) { kstr.append(text.toString()+","); } context.write(key, new Text(kstr.toString())); } }

    6.FriendsOneDirver

    import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FriendsOneDirver { public static void main(String[] args) throws Exception, ClassNotFoundException, InterruptedException { //1.获取job // 1 获取配置信息,或者job对象实例 String int_path = "hdfs://gjh:9000/1702240034/fridents.txt"; String out_path = "hdfs://gjh:9000/1702240034/output_comfridents1"; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI(int_path), conf); if (fs.exists(new Path(out_path))) { fs.delete(new Path(out_path), true); } Job job = Job.getInstance(conf); //2.设置jar file job.setJarByClass(FriendsOneDirver.class); //3.设置map class job.setMapperClass(FriendsOneMapper.class); //4.设置reducer class job.setReducerClass(FriendsOneReducer.class); //5.设置map的输出 job.setMapOutputKeyClass(Text.class); //6设置reduce 的输出 job.setMapOutputValueClass(Text.class); //7.设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path(int_path)); FileOutputFormat.setOutputPath(job, new Path(out_path)); //8.提交任务 job.waitForCompletion(true); } }

    7.FriendsTwoMapper

    import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class FriendsTwoMapper extends Mapper<LongWritable, Text, Text, Text> { Text k = new Text(); Text v = new Text(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { // 1.获取一行 String line = value.toString(); // 2.切割 String[] oneSplit = line.split("\t"); String[] twoSplit = oneSplit[1].split(","); // 3.输出 for (int i = 0; i < twoSplit.length; i++) { for (int j = i + 1; j < twoSplit.length; j++) { k.set(twoSplit[i] + "-" + twoSplit[j]); v.set(oneSplit[0]); context.write(k, v); } } } }

    8.FriendsTwoReducer

    import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class FriendsTwoReducer extends Reducer<Text, Text, Text, Text> { @Override protected void reduce(Text key , Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { StringBuffer valuesBuffer = new StringBuffer(); for (Text text : values) { valuesBuffer.append(text.toString()+" "); } Text v = new Text(); v.set(valuesBuffer.toString()); context.write(key, v); } }

    9.FriendsTwoDirver

    import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FriendsTwoDirver { public static void main(String[] args) throws Exception, ClassNotFoundException, InterruptedException { //1.获取job // 1 获取配置信息,或者job对象实例 String int_path = "hdfs://gjh:9000/1702240034/output_comfridents1/part-r-00000"; String out_path = "hdfs://gjh:9000/1702240034/output_comfridents2"; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI(int_path), conf); if (fs.exists(new Path(out_path))) { fs.delete(new Path(out_path), true); } Job job = Job.getInstance(conf); //2.设置jar file job.setJarByClass(FriendsTwoDirver.class); //3.设置map class job.setMapperClass(FriendsTwoMapper.class); //4.设置reducer class job.setReducerClass(FriendsTwoReducer.class); //5.设置map的输出 job.setMapOutputKeyClass(Text.class); //6设置reduce 的输出 job.setMapOutputValueClass(Text.class); //7.设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path(int_path)); FileOutputFormat.setOutputPath(job, new Path(out_path)); //8.提交任务 job.waitForCompletion(true); } }
    Processed: 0.019, SQL: 8