基于hadoop的mapreduce實作反向索引
反向索引(英語:Inverted index),也常被稱為反向索引、置入檔案或反向檔案,是一種索引方法,被用來存儲在全文搜尋下某個單詞在一個文檔或者一組文檔中的存儲位置的映射。它是文檔檢索系統中最常用的資料結構。通過反向索引,可以根據單詞快速擷取包含這個單詞的文檔清單。反向索引主要由兩個部分組成:“單詞詞典”和“倒排檔案”。
反向索引有兩種不同的反向索引形式:一條記錄的水準反向索引(或者反向檔案索引)包含每個引用單詞的文檔的清單。一個單詞的水準反向索引(或者完全反向索引)又包含每個單詞在一個文檔中的位置。後者的形式提供了更多的相容性(比如短語搜尋),但是需要更多的時間和空間來建立。現代搜尋引擎的索引[3]都是基于反向索引。相比“簽名檔案”、“字尾樹”等索引結構,“反向索引”是實作單詞到文檔映射關系的最佳實作方式和最有效的索引結構。
實作目标:
輸入一組文檔:
file1 | xiao lin lin |
file2 | yu yu xiao lin |
file3 | xiao lin xiao lin |
輸出結果:
lin | file1:2;file2:1;file3:2 |
xiao | file3:2;file2:1;file1:1 |
yu | file2:2 |
實作代碼:
package com.ds.demo;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class InvertedIndex {
public static class InvertedIndexMapper extends
Mapper<Object, Text, Text, Text>{
private Text keyInfo = new Text();
private Text valueInfo = new Text();
private FileSplit split;
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException{
//擷取文檔
split = (FileSplit)context.getInputSplit();
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()){
keyInfo.set(itr.nextToken() + ":" + split.getPath().toString());
valueInfo.set("1");
context.write(keyInfo, valueInfo);
}
}
}
public static class InvertedIndexCombiner extends
Reducer<Text, Text, Text, Text>{
private Text info= new Text();
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException{
int sum = 0;
for(Text value : values){
sum += Integer.parseInt(value.toString());
}
int splitIndex = key.toString().indexOf(":");
info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
key.set(key.toString().substring(0, splitIndex));
context.write(key, info);
}
}
public static class InvertedIndexReducer extends
Reducer<Text, Text, Text, Text>{
private Text result = new Text();
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException{
String fileList = new String();
for(Text value : values){
fileList += value.toString() + ";";
}
result.set(fileList);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "192.168.9.201:9001");
String[] ars=new String[]{"B","InvertedOut"};
String[] otherArgs = new GenericOptionsParser(conf, ars).getRemainingArgs();
if(otherArgs.length != 2){
System.err.println("Usage: invertedindex <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "InvertedIndex");
job.setJarByClass(InvertedIndex.class);
job.setMapperClass(InvertedIndexMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setCombinerClass(InvertedIndexCombiner.class);
job.setReducerClass(InvertedIndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
經過Map
key | value |
xiao:file1 | 1 |
lin:file1 | 1 |
lin:file1 | 1 |
...... |
經過shuffle
xiao:file1 | [1] |
lin:file1 | [1,1] |
yu:file2 | [1,1] |
.... |
經過reduce
xiao | file1:1 |
lin | file1:2 |
yu | file2:2 |
xiao | file2:1 |
lin | file2:1 |
.... |
再次shuffle
xiao | [file1:1,file2:1.file3:2] |
lin | [file1:2,file2:1,file3:2] |
yu | [file2:2] |
再次reduce 得到結果