天天看點

MapReduce--反向索引

文檔倒排算法簡介

Inverted Index(反向索引)是目前幾乎所有支援全文檢索的搜尋引擎都要依賴的一個資料結構。基于索引結構,給出一個詞(term),能取得含有這個term的文檔清單(the list of documents)

Web Search中的問題主要分為三部分:

  1. crawling(gathering web content) ,網頁爬蟲,收集資料
  2. indexing(construction of the inverted index) ,根據大量資料建構反向索引結構
  3. retrieval(ranking documents given a query),根據一個搜尋單詞進行索引并對結果進行排序,比如可以根據詞頻多少來排

    crawling和indexing都是離線的,retrieval是線上、實時的。

    此處有個問題,索引結構會如何進行存儲呢?

    給定一個單詞,如何快速得到結果呢?

    一般可以采用兩種存儲方式,一種是hash連結清單,還有一種則是B(B+)樹。

    說起存儲我就想到我同學面試時被問到一個問題:a~z26個單詞如何存儲能快速索引?

    (⊙﹏⊙)b居然是26叉樹,好喪心病狂啊!

基本的反向索引結構

MapReduce--反向索引

實驗任務

請實作課堂上介紹的“帶詞頻屬性的文檔倒排算法”。

在統計詞語的反向索引時,除了要輸出帶詞頻屬性的反向索引,還請計算每個詞語的“平均

提及次數”(定義見下)并輸出。

“平均提及次數”在這裡定義為:

平均提及次數= 詞語在全部文檔中出現的頻數總和/ 包含該詞語的文檔數

假如文檔集中有四本小說:A、B、C、D。詞語“江湖”在文檔A 中出現了100 次,在文檔B

中出現了200 次,在文檔C 中出現了300 次,在文檔D 中沒有出現。則詞語“江湖”在該文

檔集中的“平均提及次數”為(100 + 200 + 300) / 3 = 200。

輸出格式

對于每個詞語,輸出兩個鍵值對,兩個鍵值對的格式如下:

[詞語] \TAB 詞語1:詞頻, 詞語2:詞頻, 詞語3:詞頻, …, 詞語100:詞頻

[詞語] \TAB 平均提及次數

下圖展示了輸出檔案的一個片段(圖中内容僅為格式示例):

MapReduce--反向索引

設計

反向索引可以看做是wordcount的拓展,它需要統計一個單詞在多個檔案中出現的次數,那麼它的Mapper和Reducer該如何設計呢?

很自然地我們會想到

Mapper:

  • 對于檔案file中任一word,
  • Key = word, Value = fileName + 1.

Reduer:

  • 對于輸入Key, Iterable(Text) Values,
  • 統計Values中每個Value,記錄出現的fileName以及頻數.

這裡有個問題,它需要假定對于一個相同的Key,Mapper給出的輸出

design trick: value-to-key conversion

Value到Key的轉換

比如說對于原來的(term, (docid, tf))可以将value中的docid放到key中進而得到

新的鍵值對((term, docid), tf)。

這樣具有相同key值的鍵值對數目就降低啦!

關于Mapper裡邊的代碼我遇到兩個問題:

1.用空格 ” “來對line做分詞,在最後的輸出結果裡邊會出現空白單詞,很奇怪,雖然我在輸出的時候加了token為” “或”\t”的時候都不輸出,但是最後結果裡邊還是有空白單詞,匪夷所思诶。

2.Mapper的輸出如果采用((term:docid), tf)的形式,使用“:”來分隔term和docid,那麼在Combiner裡邊如果我使用”:”來分隔key(也就是下邊錯誤的Mapper方式),那麼得到的String個數有時候長度居然<2,是以我現在使用”->”來進行分隔。

public static class InverseIndexMapper extends Mapper<LongWritable, Text, Text, Text>{

        @Override
        protected void map(LongWritable key, Text value,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            String line = value.toString();
            String[] strTokens = line.split(" ");
            FileSplit inputSplit = (FileSplit)context.getInputSplit();
            Path path = inputSplit.getPath();
            String pathStr = path.toString();
            int index = pathStr.lastIndexOf("/");
            String strFileName = pathStr.substring(index + ); 
            for (String token : strTokens){
                if (token != " " && token != "\t"){
                    context.write(new Text(token + "->" + strFileName), new Text("1"));
                }
            }
        }
    }
           

Combiner的使用

為了減少Mapper的輸出,進而降低Mapper到Reducer的傳輸開銷以及存儲開銷,使用Combiner是個好方法,相當于是在每個Mapper結束之後先進行一次Reducer将結果彙總一下。

這裡是将相同文檔的相同term的詞頻統計一下。

我看到過有這樣一種處理方法,它為了Reducer友善處理,是以将Mapper的輸出從<

public static class InverseIndexCombiner extends Reducer<Text, Text, Text, Text>{

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            String strKey = key.toString();
            String[] tokens = strKey.split("->");
            int freq = ;
            for (Text value : values){
                freq++;
            }
            context.write(new Text(tokens[]), new Text(tokens[] + "->" + freq));
        }
    }
           

正确的如下

public static class InverseIndexCombiner extends Reducer<Text, Text, Text, Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            int freq = ;
            for (Text value : values){
                freq++;
            }
            context.write(key, new Text("" + freq));
        }
    }
           

Partitioner的設計

因為value-to-key conversion,Mapper的輸出中key變為了(term, docid)。如果采用預設的Partitioner,那麼具有相同term,不同docid的項很可能會被劃分到不同的Reducer,這與初衷是違背的啊,是以需要自定義一個Partitioner,用key中的term作為劃分的依據!

這裡有個小問題,如果我采用Combiner中的錯誤方式,将Combiner的輸出重新變化為了(term, (docid, tf)),那麼是否還需要自定義Partitioner了呢?

答案是需要的,看來Partitioner的判斷依據不是Combiner的輸出啦!

public static class InverseIndexPartitioner extends HashPartitioner<Text, Text>{

        @Override
        public int getPartition(Text key, Text value, int numReduceTasks) {
            // TODO Auto-generated method stub
            String strKey = key.toString();
            String[] tokens = strKey.split("->");
            return super.getPartition(new Text(tokens[]), value, numReduceTasks);
        }

    }
           

Reducer的設計

根據Mapper傳來的輸出( (term, docid), tf),這裡需要進行的處理便是将具有相同term的鍵值對聚集在一起,并重組成( term , (docid1:tf1, docid2:tf2, …) )的輸出形式。

  • 采用靜态變量strWord來記錄上一次reduce過程中的term ;
  • 采用靜态變量map記錄靜态變量strWord對應的docid:tf對;
  • 處理reduce過程時,首先将key分割出term以及fileName,
  • 判斷term是否與strWord相等,
  • 如果相等,首先累計額values,得到docid,tf對之後加入map;
  • 否則将strWord,map輸出,并清空map,strWord指派為term,處理目前docid,tf,并加入map;
  • 因為最後一次reduce過程不可能将它自己的資料輸出,是以需要重載cleanup函數在裡邊進行輸出
  • 還有一點需要注意,String的相等判斷用“==”是不行的哦,如果用了“==”而不是“equals”,會出現什麼後果呢?
public static class InverseIndexReducer extends Reducer<Text, Text, Text, Text>{

        static Map<String, Integer> map = new HashMap<String, Integer>();
        static String strWord = null;
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
                String[] tokens = key.toString().split("->");
                if (strWord == null){
                    strWord = tokens[];
                } 
                if (strWord.equals(tokens[])){
                    String strFileName = tokens[];
                    int freq = ;
                    for (Text value : values){
                        freq += Integer.parseInt(value.toString());
                    }
                    map.put(strFileName, freq);
                } else {
                    String strNewValue = "";
                    double aveFreq = ;
                    for (Map.Entry<String, Integer> entry : map.entrySet()){
                        strNewValue += entry.getKey() + ":" + entry.getValue() + ",";
                        aveFreq += (double)entry.getValue();
                    }
                    aveFreq /= (double)map.size();
                    Text newKey = new Text(strWord);
                    map.clear();
                    context.write(newKey, new Text(strNewValue));
                    context.write(newKey, new Text("" + aveFreq));

                    strWord = tokens[];
                    String strFileName = tokens[];
                    int freq = ;
                    for (Text value : values){
                        freq += Integer.parseInt(value.toString());
                    }   
                    map.put(strFileName, freq);
                }

        }
        @Override
        protected void cleanup(Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub

            String strNewValue = "";
            double aveFreq = ;
            for (Map.Entry<String, Integer> entry : map.entrySet()){
                strNewValue += entry.getKey() + ":" + entry.getValue() + ",";
                aveFreq += (double)entry.getValue();
            }
            aveFreq /= (double)map.size();
            Text newKey = new Text(strWord);
            map.clear();
            context.write(newKey, new Text(strNewValue));
            context.write(newKey, new Text("" + aveFreq));

            super.cleanup(context);
        }


    }
           

main函數

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
        Job job = new Job(conf, "InverseIndex");
        job.setJarByClass(InverseIndex.class);

        job.setNumReduceTasks();

        job.setMapperClass(InverseIndexMapper.class);
        job.setCombinerClass(InverseIndexCombiner.class);
        job.setPartitionerClass(InverseIndexPartitioner.class);
        job.setReducerClass(InverseIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[]));
        FileOutputFormat.setOutputPath(job, new Path(args[]));
        job.waitForCompletion(true);
    }
           

運作結果

MapReduce--反向索引
MapReduce--反向索引

繼續閱讀