1.输入数据
gjh@gjh:~/date$ cat index1.txt
MapReduce is sample gjh
gjh@gjh:~/date$ cat index2.txt
MapReduce is powerful is sample
gjh@gjh:~/date$ cat index3.txt
Hello MapReduce hello world gjh
2.输出
3.代码
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * MapReduce job that builds an inverted index: for every whitespace-separated
 * token in the input files it reports the files containing that token together
 * with the number of occurrences in each file.
 *
 * <p>Data flow:
 * <ul>
 *   <li>mapper: {@code (token, "fileName:1")} for every token occurrence;</li>
 *   <li>combiner: {@code (token, "fileName:partialCount")}, one record per file;</li>
 *   <li>reducer: {@code (token, "fileA:n ,fileB:m ,")}, one line per token.</li>
 * </ul>
 *
 * <p>The key is the bare token at every stage. The combiner therefore never
 * changes the key and consumes the same value format it produces, so the result
 * is the same whether the framework applies it zero, one or several times.
 */
public class InversedIndex {

    /** Separates the file name from its count inside a posting, e.g. {@code index1.txt:2}. */
    private static final char POSTING_SEPARATOR = ':';

    /**
     * Text appended after every posting in a final output line. The trailing
     * occurrence is kept on purpose so the output format stays what existing
     * consumers of this job already read.
     */
    private static final String POSTING_DELIMITER = " ,";

    /**
     * Sums the counts of {@code fileName:count} postings per file name.
     *
     * @param postings values of one token, each formatted as {@code fileName:count}
     * @return file name to total count, ordered by file name so output is deterministic
     * @throws IOException if a posting has no separator or a non-numeric count
     */
    private static Map<String, Integer> sumByFile(Iterable<Text> postings) throws IOException {
        Map<String, Integer> totals = new TreeMap<String, Integer>();
        for (Text posting : postings) {
            String raw = posting.toString();
            // Split on the LAST separator: the count never contains one.
            int sep = raw.lastIndexOf(POSTING_SEPARATOR);
            if (sep < 0) {
                throw new IOException("Malformed posting (expected fileName:count): " + raw);
            }
            String fileName = raw.substring(0, sep);
            int count;
            try {
                count = Integer.parseInt(raw.substring(sep + 1));
            } catch (NumberFormatException e) {
                throw new IOException("Malformed posting count in: " + raw, e);
            }
            Integer previous = totals.get(fileName);
            totals.put(fileName, previous == null ? count : previous + count);
        }
        return totals;
    }

    /**
     * Emits {@code (token, "fileName:1")} for every token of every input line.
     */
    public static class InversedIndexMapper extends Mapper<Object, Text, Text, Text> {
        private final Text outKey = new Text();
        private final Text outVal = new Text();

        /**
         * Prepares the posting value once per task: it depends only on the
         * input split, not on the individual record.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            outVal.set(split.getPath().getName() + POSTING_SEPARATOR + "1");
        }

        /**
         * Tokenizes one input line and writes one posting per token.
         *
         * @param key     input key supplied by the input format; not used
         * @param value   one line of input text
         * @param context task context used to emit the postings
         * @throws IOException          if writing a record fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokens = new StringTokenizer(value.toString());
            while (tokens.hasMoreTokens()) {
                outKey.set(tokens.nextToken());
                context.write(outKey, outVal);
            }
        }
    }

    /**
     * Pre-aggregates postings on the map side: for each token, collapses all
     * postings of the same file into a single {@code fileName:count} record.
     * Input and output share one format and the key is passed through untouched.
     */
    public static class InversedIndexCombiner extends Reducer<Text, Text, Text, Text> {
        private final Text outVal = new Text();

        /**
         * @param key     the token
         * @param values  postings of that token, each {@code fileName:count}
         * @param context task context used to emit the merged postings
         * @throws IOException          if a posting is malformed or writing fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Map.Entry<String, Integer> entry : sumByFile(values).entrySet()) {
                outVal.set(entry.getKey() + POSTING_SEPARATOR + entry.getValue());
                context.write(key, outVal);
            }
        }
    }

    /**
     * Produces the final index line of a token: every file that contains it,
     * with the total count per file, e.g. {@code index1.txt:1 ,index2.txt:1 ,}.
     * Counts are summed here as well, so the result does not depend on whether
     * the combiner ran.
     */
    public static class InversedIndexReducer extends Reducer<Text, Text, Text, Text> {
        private final Text outVal = new Text();

        /**
         * @param key     the token
         * @param values  postings of that token, each {@code fileName:count}
         * @param context task context used to emit the index line
         * @throws IOException          if a posting is malformed or writing fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder line = new StringBuilder();
            for (Map.Entry<String, Integer> entry : sumByFile(values).entrySet()) {
                line.append(entry.getKey())
                    .append(POSTING_SEPARATOR)
                    .append(entry.getValue())
                    .append(POSTING_DELIMITER);
            }
            outVal.set(line.toString());
            context.write(key, outVal);
        }
    }

    /**
     * Configures and runs the job, then exits with status 0 on success and 1
     * on failure. An existing output directory is deleted first because the
     * job cannot write into a directory that already exists.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the file system cannot be reached or the job fails to run
     */
    public static void main(String[] args) throws Exception {
        String inPath = "hdfs://gjh:9000/1702240034/index";
        String outPath = "hdfs://gjh:9000/1702240034/output_index";

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI(inPath), conf);
        Path output = new Path(outPath);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }

        Job job = Job.getInstance(conf, "index inversed");
        job.setJarByClass(InversedIndex.class);
        job.setMapperClass(InversedIndexMapper.class);
        job.setCombinerClass(InversedIndexCombiner.class);
        job.setReducerClass(InversedIndexReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(3);
        FileInputFormat.addInputPath(job, new Path(inPath));
        FileOutputFormat.setOutputPath(job, output);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
转载请注明原文地址:https://blackberry.8miu.com/read-31767.html