文檔倒排算法簡介
Inverted Index(反向索引)是目前幾乎所有支援全文檢索的搜尋引擎都要依賴的一個資料結構。基于索引結構,給出一個詞(term),能取得含有這個term的文檔清單(the list of documents)
Web Search中的問題主要分為三部分:
- crawling(gathering web content) ,網頁爬蟲,收集資料
- indexing(construction of the inverted index) ,根據大量資料建構反向索引結構
-
retrieval(ranking documents given a query),根據一個搜尋單詞進行索引并對結果進行排序,比如可以根據詞頻多少來排
crawling和indexing都是離線的,retrieval是線上、實時的。
此處有個問題,索引結構會如何進行存儲呢?
給定一個單詞,如何快速得到結果呢?
一般可以采用兩種存儲方式,一種是hash連結清單,還有一種則是B(B+)樹。
說起存儲我就想到我同學面試時被問到一個問題:a~z26個單詞如何存儲能快速索引?
(⊙﹏⊙)b居然是26叉樹,好喪心病狂啊!
基本的反向索引結構
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyN1IDOzATMxIzMwgDM1EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
實驗任務
請實作課堂上介紹的“帶詞頻屬性的文檔倒排算法”。
在統計詞語的反向索引時,除了要輸出帶詞頻屬性的反向索引,還請計算每個詞語的“平均
提及次數”(定義見下)并輸出。
“平均提及次數”在這裡定義為:
平均提及次數= 詞語在全部文檔中出現的頻數總和/ 包含該詞語的文檔數
假如文檔集中有四本小說:A、B、C、D。詞語“江湖”在文檔A 中出現了100 次,在文檔B
中出現了200 次,在文檔C 中出現了300 次,在文檔D 中沒有出現。則詞語“江湖”在該文
檔集中的“平均提及次數”為(100 + 200 + 300) / 3 = 200。
輸出格式
對于每個詞語,輸出兩個鍵值對,兩個鍵值對的格式如下:
[詞語] \TAB 詞語1:詞頻, 詞語2:詞頻, 詞語3:詞頻, …, 詞語100:詞頻
[詞語] \TAB 平均提及次數
下圖展示了輸出檔案的一個片段(圖中内容僅為格式示例):
設計
反向索引可以看做是wordcount的拓展,它需要統計一個單詞在多個檔案中出現的次數,那麼它的Mapper和Reducer該如何設計呢?
很自然地我們會想到
Mapper:
- 對于檔案file中任一word,
- Key = word, Value = fileName + 1.
Reduer:
- 對于輸入Key, Iterable(Text) Values,
- 統計Values中每個Value,記錄出現的fileName以及頻數.
這裡有個問題,它需要假定對于一個相同的Key,Mapper給出的輸出
design trick: value-to-key conversion
Value到Key的轉換
比如說對于原來的(term, (docid, tf))可以将value中的docid放到key中進而得到
新的鍵值對((term, docid), tf)。
這樣具有相同key值的鍵值對數目就降低啦!
關于Mapper裡邊的代碼我遇到兩個問題:
1.用空格 ” “來對line做分詞,在最後的輸出結果裡邊會出現空白單詞,很奇怪,雖然我在輸出的時候加了token為” “或”\t”的時候都不輸出,但是最後結果裡邊還是有空白單詞,匪夷所思诶。
2.Mapper的輸出如果采用((term:docid), tf)的形式,使用“:”來分隔term和docid,那麼在Combiner裡邊如果我使用”:”來分隔key(也就是下邊錯誤的Mapper方式),那麼得到的String個數有時候長度居然<2,是以我現在使用”->”來進行分隔。
public static class InverseIndexMapper extends Mapper<LongWritable, Text, Text, Text>{
@Override
protected void map(LongWritable key, Text value,
Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
String line = value.toString();
String[] strTokens = line.split(" ");
FileSplit inputSplit = (FileSplit)context.getInputSplit();
Path path = inputSplit.getPath();
String pathStr = path.toString();
int index = pathStr.lastIndexOf("/");
String strFileName = pathStr.substring(index + );
for (String token : strTokens){
if (token != " " && token != "\t"){
context.write(new Text(token + "->" + strFileName), new Text("1"));
}
}
}
}
Combiner的使用
為了減少Mapper的輸出,進而降低Mapper到Reducer的傳輸開銷以及存儲開銷,使用Combiner是個好方法,相當于是在每個Mapper結束之後先進行一次Reducer将結果彙總一下。
這裡是将相同文檔的相同term的詞頻統計一下。
我看到過有這樣一種處理方法,它為了Reducer友善處理,是以将Mapper的輸出從<
public static class InverseIndexCombiner extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values,
Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
String strKey = key.toString();
String[] tokens = strKey.split("->");
int freq = ;
for (Text value : values){
freq++;
}
context.write(new Text(tokens[]), new Text(tokens[] + "->" + freq));
}
}
正确的如下
public static class InverseIndexCombiner extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values,
Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
int freq = ;
for (Text value : values){
freq++;
}
context.write(key, new Text("" + freq));
}
}
Partitioner的設計
因為value-to-key conversion,Mapper的輸出中key變為了(term, docid)。如果采用預設的Partitioner,那麼具有相同term,不同docid的項很可能會被劃分到不同的Reducer,這與初衷是違背的啊,是以需要自定義一個Partitioner,用key中的term作為劃分的依據!
這裡有個小問題,如果我采用Combiner中的錯誤方式,将Combiner的輸出重新變化為了(term, (docid, tf)),那麼是否還需要自定義Partitioner了呢?
答案是需要的,看來Partitioner的判斷依據不是Combiner的輸出啦!
public static class InverseIndexPartitioner extends HashPartitioner<Text, Text>{
@Override
public int getPartition(Text key, Text value, int numReduceTasks) {
// TODO Auto-generated method stub
String strKey = key.toString();
String[] tokens = strKey.split("->");
return super.getPartition(new Text(tokens[]), value, numReduceTasks);
}
}
Reducer的設計
根據Mapper傳來的輸出( (term, docid), tf),這裡需要進行的處理便是将具有相同term的鍵值對聚集在一起,并重組成( term , (docid1:tf1, docid2:tf2, …) )的輸出形式。
- 采用靜态變量strWord來記錄上一次reduce過程中的term ;
- 采用靜态變量map記錄靜态變量strWord對應的docid:tf對;
- 處理reduce過程時,首先将key分割出term以及fileName,
- 判斷term是否與strWord相等,
- 如果相等,首先累計額values,得到docid,tf對之後加入map;
- 否則将strWord,map輸出,并清空map,strWord指派為term,處理目前docid,tf,并加入map;
- 因為最後一次reduce過程不可能将它自己的資料輸出,是以需要重載cleanup函數在裡邊進行輸出
- 還有一點需要注意,String的相等判斷用“==”是不行的哦,如果用了“==”而不是“equals”,會出現什麼後果呢?
public static class InverseIndexReducer extends Reducer<Text, Text, Text, Text>{
static Map<String, Integer> map = new HashMap<String, Integer>();
static String strWord = null;
@Override
protected void reduce(Text key, Iterable<Text> values,
Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
String[] tokens = key.toString().split("->");
if (strWord == null){
strWord = tokens[];
}
if (strWord.equals(tokens[])){
String strFileName = tokens[];
int freq = ;
for (Text value : values){
freq += Integer.parseInt(value.toString());
}
map.put(strFileName, freq);
} else {
String strNewValue = "";
double aveFreq = ;
for (Map.Entry<String, Integer> entry : map.entrySet()){
strNewValue += entry.getKey() + ":" + entry.getValue() + ",";
aveFreq += (double)entry.getValue();
}
aveFreq /= (double)map.size();
Text newKey = new Text(strWord);
map.clear();
context.write(newKey, new Text(strNewValue));
context.write(newKey, new Text("" + aveFreq));
strWord = tokens[];
String strFileName = tokens[];
int freq = ;
for (Text value : values){
freq += Integer.parseInt(value.toString());
}
map.put(strFileName, freq);
}
}
@Override
protected void cleanup(Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
String strNewValue = "";
double aveFreq = ;
for (Map.Entry<String, Integer> entry : map.entrySet()){
strNewValue += entry.getKey() + ":" + entry.getValue() + ",";
aveFreq += (double)entry.getValue();
}
aveFreq /= (double)map.size();
Text newKey = new Text(strWord);
map.clear();
context.write(newKey, new Text(strNewValue));
context.write(newKey, new Text("" + aveFreq));
super.cleanup(context);
}
}
main函數
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
Job job = new Job(conf, "InverseIndex");
job.setJarByClass(InverseIndex.class);
job.setNumReduceTasks();
job.setMapperClass(InverseIndexMapper.class);
job.setCombinerClass(InverseIndexCombiner.class);
job.setPartitionerClass(InverseIndexPartitioner.class);
job.setReducerClass(InverseIndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[]));
FileOutputFormat.setOutputPath(job, new Path(args[]));
job.waitForCompletion(true);
}