1.前言
學習hadoop的童鞋,反向索引這個算法還是挺重要的。這是以後展開工作的基礎。首先,我們來認識下什麼是倒拍索引:
反向索引簡單地就是:根據單詞,傳回它在哪個檔案中出現過,而且頻率是多少的結果。這就像百度裡的搜尋,你輸入一個關鍵字,那麼百度引擎就迅速的在它的伺服器裡找到有該關鍵字的檔案,并根據頻率和其他一些政策(如頁面點選投票率)等來給你傳回結果。這個過程中,反向索引就起到很關鍵的作用。
2.分析設計
反向索引涉及幾個過程:Map過程,Combine過程,Reduce過程。下面我們來分析以上的過程。
2.1Map過程
當你把需要處理的文檔上傳到hdfs時,首先預設的TextInputFormat類對輸入的檔案進行處理,得到檔案中每一行的偏移量和這一行内容的鍵值對<偏移量,内容>做為map的輸入。在改寫map函數的時候,我們就需要考慮,怎麼設計key和value的值來适合MapReduce架構,進而得到正确的結果。由于我們要得到單詞,所屬的文檔URL,詞頻,而<key,value>隻有兩個值,那麼就必須得合并其中得兩個資訊了。這裡我們設計key=單詞+URL,value=詞頻。即map得輸出為<單詞+URL,詞頻>,之是以将單詞+URL做為key,時利用MapReduce架構自帶得Map端進行排序。
下面舉個簡單得例子:

圖1 map過程 輸入/輸出
2.2 Combine過程
combine過程将key值相同得value值累加,得到一個單詞在文檔上得詞頻。但是為了把相同得key交給同一個reduce處理,我們需要設計為key=單詞,value=URL+詞頻
圖2 Combin過程 輸入/輸出
2.3Reduce過程
reduce過程其實就是一個合并的過程了,隻需将相同的key值的value值合并成反向索引需要的格式即可。
package reverseIndex;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class InvertedIndex {
public static class InvertedIndexMapper extends Mapper<Object, Text, Text, Text>{
private Text keyInfo=new Text();
private Text valueInfo=new Text();
private FileSplit split;
public void map(Object key,Text value,Context context)throws IOException,InterruptedException {
//獲得<key,value>對所屬的對象
split=(FileSplit)context.getInputSplit();
StringTokenizer itr=new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
//key值有單詞和url組成,如"mapreduce:1.txt"
keyInfo.set(itr.nextToken()+":"+split.getPath().toString());
valueInfo.set("1");
context.write(keyInfo, valueInfo);
}
}
}
public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text>{
private Text info=new Text();
public void reduce(Text key,Iterable<Text> values,Context context)throws IOException,InterruptedException {
//統計詞頻
int sum=0;
for (Text value:values) {
sum+=Integer.parseInt(value.toString());
}
int splitIndex=key.toString().indexOf(":");
//重新設定value值由url和詞頻組成
info.set(key.toString().substring(splitIndex+1)+":"+sum);
//重新設定key值為單詞
key.set(key.toString().substring(0,splitIndex));
context.write(key, info);
}
}
public static class InvertedIndexReduce extends Reducer<Text, Text, Text, Text> {
private Text result=new Text();
public void reduce(Text key,Iterable<Text>values,Context context) throws IOException,InterruptedException{
//生成文檔清單
String fileList=new String();
for (Text value:values) {
fileList+=value.toString()+";";
}
result.set(fileList);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf=new Configuration();
String[] otherArgs=new GenericOptionsParser(conf,args).getRemainingArgs();
if (otherArgs.length!=2) {
System.err.println("Usage:invertedindex<in><out>");
System.exit(2);
}
Job job=new Job(conf,"InvertedIndex");
job.setJarByClass(InvertedIndex.class);
job.setMapperClass(InvertedIndexMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setCombinerClass(InvertedIndexCombiner.class);
job.setReducerClass(InvertedIndexReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
}
View Code