一、段合并過程總論
IndexWriter中與段合并有關的成員變量有:
- HashSet<SegmentInfo> mergingSegments = new HashSet<SegmentInfo>(); //儲存正在合并的段,以防止合并期間再次選中被合并。
- MergePolicy mergePolicy = new LogByteSizeMergePolicy(this);//合并政策,也即選取哪些段來進行合并。
- MergeScheduler mergeScheduler = new ConcurrentMergeScheduler();//段合并器,背後有一個線程負責合并。
- LinkedList<MergePolicy.OneMerge> pendingMerges = new LinkedList<MergePolicy.OneMerge>();//等待被合并的任務
- Set<MergePolicy.OneMerge> runningMerges = new HashSet<MergePolicy.OneMerge>();//正在被合并的任務
和段合并有關的一些參數有:
- mergeFactor:當大小幾乎相當的段的數量達到此值的時候,開始合并。
- minMergeSize:所有大小小于此值的段,都被認為是大小幾乎相當,一同參與合并。
- maxMergeSize:當一個段的大小大于此值的時候,就不再參與合并。
- maxMergeDocs:當一個段包含的文檔數大于此值的時候,就不再參與合并。
段合并一般發生在添加完一篇文檔的時候,當一篇文檔添加完後,發現記憶體已經達到使用者設定的ramBufferSize,則寫入檔案系統,形成一個新的段。新段的加入可能造成差不多大小的段的個數達到mergeFactor,進而開始了合并的過程。
合并過程最重要的是兩部分:
- 一個是選擇哪些段應該參與合并,這一步由MergePolicy來決定。
- 一個是将選擇出的段合并成新段的過程,這一步由MergeScheduler來執行。段的合并也主要包括:
- 對正向資訊的合并,如存儲域,詞向量,标準化因子等。
- 對反向資訊的合并,如詞典,倒排表。
在總論中,我們重點描述合并政策對段的選擇以及反向資訊的合并。
1.1、合并政策對段的選擇
在LogMergePolicy中,選擇可以合并的段的基本邏輯是這樣的:
- 選擇的可以合并的段都是在硬碟上的,不再存在記憶體中的段,也不是像早期的版本一樣每添加一個Document就生成一個段,然後進行記憶體中的段合并,然後再合并到硬碟中。
- 由于從記憶體中flush到硬碟上是按照設定的記憶體大小來DocumentsWriter.ramBufferSize觸發的,是以每個剛flush到硬碟上的段大小差不多,當然不排除中途改變記憶體設定,接下來的算法可以解決這個問題。
- 合并的過程是盡量按照合并幾乎相同大小的段這一原則,隻有大小相當的mergeFacetor個段出現的時候,才合并成一個新的段。
- 在硬碟上的段基本應該是大段在前,小段在後,因為大段總是由小段合并而成的,當小段湊夠mergeFactor個的時候,就合并成一個大段,小段就被删除了,然後新來的一定是新的小段。
- 比如mergeFactor=3,開始來的段大小為10M,當湊夠3個10M的時候,0.cfs, 1.cfs, 2.cfs則合并成一個新的段3.cfs,大小為30M,然後再來4.cfs, 5.cfs, 6.cfs,合并成7.cfs,大小為30M,然後再來8.cfs, 9.cfs, a.cfs合并成b.cfs, 大小為30M,這時候又湊夠了3個30M的,合并成90M的c.cfs,然後又來d.cfs, e.cfs, f.cfs合并成10.cfs,大小為30M,然後11.cfs大小為10M,這時候硬碟上的段為:c.cfs(90M) 10.cfs(30M),11.cfs(10M)。
是以LogMergePolicy對合并段的選擇過程如下:
- 将所有的段按照生成的順序,将段的大小以mergeFactor為底取對數,放入數組中,作為選擇的标準。
- 從頭開始,選擇一個值最大的段,然後将此段的值減去0.75(LEVEL_LOG_SPAN) ,之間的段被認為是大小差不多的段,屬于同一階梯,此處稱為第一階梯。
- 然後從後向前尋找第一個屬于第一階梯的段,從start到此段之間的段都被認為是屬于這一階梯的。也包括之間生成較早但大小較小的段,因為考慮到以下幾點:
- 防止較早生成的段由于人工flush或者人工調整ramBufferSize,因而很小,卻破壞了基本從大到小的規則。
- 如果運作較長時間後,緻使段的大小參差不齊,很難合并相同大小的段。
- 也防止一個段由于較小,而不斷的都有大的段生成進而始終不能參與合并。
- 第一階梯總共4個段,小于mergeFactor因而不合并,接着start=end進而選擇下一階梯。
- 從start開始,選擇一個值最大的段,然後将此段的值減去0.75(LEVEL_LOG_SPAN) ,之間的段被認為屬于同一階梯,此處稱為第二階梯。
- 然後從後向前尋找第一個屬于第二階梯的段,從start到此段之間的段都被認為是屬于這一階梯的。
- 第二階梯總共4個段,小于mergeFactor因而不合并,接着start=end進而選擇下一階梯。
- 從start開始,選擇一個值最大的段,然後将此段的值減去0.75(LEVEL_LOG_SPAN) ,之間的段被認為屬于同一階梯,此處稱為第三階梯。
- 由于最大的段減去0.75後為負的,因而從start到此段之間的段都被認為是屬于這一階梯的。
- 第三階梯總共5個段,等于mergeFactor,因而進行合并。
- 第三階梯的五個段合并成一個較大的段。
- 然後從頭開始,依然先考察第一階梯,仍然是4個段,不合并。
- 然後是第二階梯,因為有了新生成的段,并且大小足夠屬于第二階梯,進而第二階梯有5個段,可以合并。
- 第二階段的五個段合并成一個較大的段。
- 然後從頭開始,考察第一階梯,因為有了新生成的段,并且大小足夠屬于第一階梯,進而第一階梯有5個段,可以合并。
- 第一階梯的五個段合并成一個大的段。
1.2、反向資訊的合并
反向資訊的合并包括兩部分:
- 對字典的合并,詞典中的Term是按照字典順序排序的,需要對詞典中的Term進行重新排序
- 對于相同的Term,對包含此Term的文檔号清單進行合并,需要對文檔号重新編号。
對詞典的合并需要找出兩個段中相同的詞,Lucene是通過一個稱為match的SegmentMergeInfo類型的數組以及稱為queue的 SegmentMergeQueue實作的,SegmentMergeQueue是繼承于 PriorityQueue<SegmentMergeInfo>,是一個優先級隊列,是按照字典順序排序的。 SegmentMergeInfo儲存要合并的段的詞典及倒排表資訊,在SegmentMergeQueue中用來排序的key是它代表的段中的第一個 Term。
我們來舉一個例子來說明合并詞典的過程,以便後面解析代碼的時候能夠很好的了解:
- 假設要合并五個段,每個段包含的Term也是按照字典順序排序的,如下圖所示。
- 首先把五個段全部放入優先級隊列中,段在其中也是按照第一個Term的字典順序排序的,如下圖。
- 從優先級隊列中彈出第一個Term("a")相同的段到match數組中,如下圖。
- 合并這些段的第一個Term("a")的倒排表,并把此Term和它的倒排表一同加入新生成的段中。
- 對于match數組中的每個段取下一個Term
- 将match數組中還有Term的段重新放入優先級隊列中,這些段也是按照第一個Term的字典順序排序。
- 從優先級隊列中彈出第一個Term("b")相同的段到match數組中。
- 合并這些段的第一個Term("b")的倒排表,并把此Term和它的倒排表一同加入新生成的段中。
- 對于match數組中的每個段取下一個Term
- 将match數組中還有Term的段重新放入優先級隊列中,這些段也是按照第一個Term的字典順序排序。
- 從優先級隊列中彈出第一個Term("c")相同的段到match數組中。
- 合并這些段的第一個Term("c")的倒排表,并把此Term和它的倒排表一同加入新生成的段中。
- 對于match數組中的每個段取下一個Term
- 将match數組中還有Term的段重新放入優先級隊列中,這些段也是按照第一個Term的字典順序排序。
- 從優先級隊列中彈出第一個Term("d")相同的段到match數組中。
- 合并這些段的第一個Term("d")的倒排表,并把此Term和它的倒排表一同加入新生成的段中。
- 對于match數組中的每個段取下一個Term
- 将match數組中還有Term的段重新放入優先級隊列中,這些段也是按照第一個Term的字典順序排序。
- 從優先級隊列中彈出第一個Term("e")相同的段到match數組中。
- 合并這些段的第一個Term("e")的倒排表,并把此Term和它的倒排表一同加入新生成的段中。
- 對于match數組中的每個段取下一個Term
- 将match數組中還有Term的段重新放入優先級隊列中,這些段也是按照第一個Term的字典順序排序。
- 從優先級隊列中彈出第一個Term("f")相同的段到match數組中。
- 合并這些段的第一個Term("f")的倒排表,并把此Term和它的倒排表一同加入新生成的段中。
- 對于match數組中的每個段取下一個Term
- 合并完畢。
二、段合并的詳細過程
2.1、将緩存寫入新的段
IndexWriter在添加文檔的時候調用函數addDocument(Document doc, Analyzer analyzer),包含如下步驟:
- doFlush = docWriter.addDocument(doc, analyzer);//DocumentsWriter添加文檔,最後傳回是否進行向硬碟寫入
- return state.doFlushAfter || timeToFlushDeletes();//這取決于timeToFlushDeletes
timeToFlushDeletes傳回return (bufferIsFull || deletesFull()) && setFlushPending(),而在Lucene索引過程分析(2)的DocumentsWriter的緩存管理部分提到,當numBytesUsed+deletesRAMUsed > ramBufferSize的時候bufferIsFull設為true,也即當使用的記憶體大于ramBufferSize的時候,則由記憶體向硬碟寫入。ramBufferSize可以用IndexWriter.setRAMBufferSizeMB(double mb)設定。
- if (doFlush) flush(true, false, false);//如果記憶體中緩存滿了,則寫入硬碟
- if (doFlush(flushDocStores, flushDeletes) && triggerMerge) maybeMerge();//doFlush将緩存寫入硬碟,此過程在Lucene索引過程分析(4)中關閉IndexWriter一節已經描述。
當緩存寫入硬碟,形成了新的段後,就有可能觸發一次段合并,是以調用maybeMerge()
IndexWriter.maybeMerge() --> maybeMerge(false); --> maybeMerge(1, optimize); --> updatePendingMerges(maxNumSegmentsOptimize, optimize); --> mergeScheduler.merge(this); |
IndexWriter.updatePendingMerges(int maxNumSegmentsOptimize, boolean optimize)主要負責找到可以合并的段,并生産段合并任務對象,并向段合并器注冊這個任務。
ConcurrentMergeScheduler.merge(IndexWriter)主要負責進行段的合并。
2.2、選擇合并段,生成合并任務
IndexWriter.updatePendingMerges(int maxNumSegmentsOptimize, boolean optimize)主要包括兩部分:
- 選擇能夠合并段:MergePolicy.MergeSpecification spec = mergePolicy.findMerges(segmentInfos);
- 向段合并器注冊合并任務,将任務加到pendingMerges中:
- for(int i=0;i<spec.merges.size();i++)
- registerMerge(spec.merges.get(i));
- for(int i=0;i<spec.merges.size();i++)
2.2.1、用合并政策選擇合并段
預設的段合并政策是LogByteSizeMergePolicy,其選擇合并段由LogMergePolicy.findMerges(SegmentInfos infos) 完成,包含以下過程:
(1) 生成levels數組,每個段一項。然後根據每個段的大小,計算每個項的值,levels[i]和段的大小的關系為Math.log(size)/Math.log(mergeFactor),代碼如下:
final int numSegments = infos.size(); float[] levels = new float[numSegments]; final float norm = (float) Math.log(mergeFactor); for(int i=0;i<numSegments;i++) { final SegmentInfo info = infos.info(i); long size = size(info); levels[i] = (float) Math.log(size)/norm; } |
(2) 由于段基本是按照由大到小排列的,而且合并段應該大小差不多的段中進行。我們把大小差不多的段稱為屬于同一階梯,因而此處從第一個段開始找屬于相同階梯的段,如果屬于此階梯的段數量達到mergeFactor個,則生成合并任務,否則繼續向後尋找下一階梯。
//計算最低階梯值,所有小于此值的都屬于最低階梯 final float levelFloor = (float) (Math.log(minMergeSize)/norm); MergeSpecification spec = null; int start = 0; while(start < numSegments) { //找到levels數組的最大值,也即目前階梯中的峰值 float maxLevel = levels[start]; for(int i=1+start;i<numSegments;i++) { final float level = levels[i]; if (level > maxLevel) maxLevel = level; } //計算出此階梯的谷值,也即最大值減去0.75,之間的都屬于此階梯。如果峰值小于最低階梯值,則所有此階梯的段都屬于最低階梯。如果峰值大于最低階梯值,谷值小于最低階梯值,則設定谷值為最低階梯值,以保證所有小于最低階梯值的段都屬于最低階梯。 float levelBottom; if (maxLevel < levelFloor) levelBottom = -1.0F; else { levelBottom = (float) (maxLevel - LEVEL_LOG_SPAN); if (levelBottom < levelFloor && maxLevel >= levelFloor) levelBottom = levelFloor; } float levelBottom = (float) (maxLevel - LEVEL_LOG_SPAN); //從最後一個段向左找,當然段越來越大,找到第一個大于此階梯的谷值的段,從start的段開始,一直到upto這個段,都屬于此階梯了。盡管upto 左面也有的段由于記憶體設定原因,雖形成較早,但是沒有足夠大,也作為可合并的一員考慮在内了,将被并入一個大的段,進而保證了基本上左大右小的關系。從 upto這個段向右都是比此階梯小的多的段,應該屬于下一階梯。 int upto = numSegments-1; while(upto >= start) { if (levels[upto] >= levelBottom) { break; } upto--; } //從start段開始,數mergeFactor個段,如果不超過upto段,說明此階梯已經足夠mergeFactor個了,可以合 并了。當然如果此階梯包含太多要合并的段,也是每mergeFactor個段進行一次合并,然後再依次數mergeFactor段進行合并,直到此階梯的 段合并完畢。 int end = start + mergeFactor; while(end <= 1+upto) { boolean anyTooLarge = false; for(int i=start;i<end;i++) { final SegmentInfo info = infos.info(i); //如果一個段的大小超過maxMergeSize或者一個段包含的文檔數量超過maxMergeDocs則不再合并。 anyTooLarge |= (size(info) >= maxMergeSize || sizeDocs(info) >= maxMergeDocs); } if (!anyTooLarge) { if (spec == null) spec = new MergeSpecification(); //如果确認要合并,則從start到end生成一個段合并任務OneMerge. spec.add(new OneMerge(infos.range(start, end), useCompoundFile)); } //剛剛合并的是從start到end共mergeFactor和段,此階梯還有更多的段,則再依次數mergeFactor個段。 start = end; end = start + mergeFactor; } //從start到upto是此階梯的所有的段,已經選擇完畢,下面選擇更小的下一個階梯的段 start = 1+upto; } |
選擇的結果儲存在MergeSpecification中,結構如下:
spec MergePolicy$MergeSpecification (id=25) merges ArrayList<E> (id=28) elementData Object[10] (id=39) [0] MergePolicy$OneMerge (id=42) aborted false error null increfDone false info null isExternal false maxNumSegmentsOptimize 0 mergeDocStores false mergeGen 0 optimize false readers null readersClone null registerDone false segments SegmentInfos (id=50) capacityIncrement 0 counter 0 elementCount 3 elementData Object[10] (id=54) [0] SegmentInfo (id=62) delCount 0 delGen -1 diagnostics HashMap<K,V> (id=67) dir SimpleFSDirectory (id=69) docCount 1062 docStoreIsCompoundFile false docStoreOffset 0 docStoreSegment "_0" files ArrayList<E> (id=73) hasProx true hasSingleNormFile true isCompoundFile 1 name "_0" normGen null preLockless false sizeInBytes 15336467 [1] SegmentInfo (id=64) delCount 0 delGen -1 diagnostics HashMap<K,V> (id=79) dir SimpleFSDirectory (id=69) docCount 1068 docStoreIsCompoundFile false docStoreOffset 1062 docStoreSegment "_0" files ArrayList<E> (id=80) hasProx true hasSingleNormFile true isCompoundFile 1 name "_1" normGen null preLockless false sizeInBytes 15420953 [2] SegmentInfo (id=65) delCount 0 delGen -1 diagnostics HashMap<K,V> (id=86) dir SimpleFSDirectory (id=69) docCount 1068 docStoreIsCompoundFile false docStoreOffset 2130 docStoreSegment "_0" files ArrayList<E> (id=88) hasProx true hasSingleNormFile true isCompoundFile 1 name "_2" normGen null preLockless false sizeInBytes 15420953 generation 0 lastGeneration 0 modCount 1 pendingSegnOutput null userData Collections$EmptyMap (id=57) version 1267460515437 useCompoundFile true modCount 1 size 1 |
2.2.2、注冊段合并任務
注冊段合并任務由IndexWriter.registerMerge(MergePolicy.OneMerge merge)完成:
(1) 如果選擇出的段正在被合并,或者不存在,則退出。
final int count = merge.segments.size(); boolean isExternal = false; for(int i=0;i<count;i++) { final SegmentInfo info = merge.segments.info(i); if (mergingSegments.contains(info)) return false; if (segmentInfos.indexOf(info) == -1) return false; if (info.dir != directory) isExternal = true; } |
(2) 将合并任務加入pendingMerges:pendingMerges.add(merge);
(3) 将要合并的段放入mergingSegments以防正在合并又被選為合并段。
for(int i=0;i<count;i++) mergingSegments.add(merge.segments.info(i)); |
2.3、段合并器進行段合并
段合并器預設為ConcurrentMergeScheduler,段的合并工作由ConcurrentMergeScheduler.merge(IndexWriter) 完成,它包含while(true)的循環,在循環中不斷做以下事情:
- 得到下一個合并任務:MergePolicy.OneMerge merge = writer.getNextMerge();
- 初始化合并任務:writer.mergeInit(merge);
- 将删除文檔寫入硬碟:applyDeletes();
- 是否合并存儲域:mergeDocStores = false。按照Lucene的索引檔案格式(2)中段的中繼資料資訊(segments_N)中提到 的,IndexWriter.flush(boolean triggerMerge, boolean flushDocStores, boolean flushDeletes)中第二個參數flushDocStores會影響到是否單獨或是共享存儲。其實最終影響的是 DocumentsWriter.closeDocStore()。每當flushDocStores為false時,closeDocStore不被調 用,說明下次添加到索引檔案中的域和詞向量資訊是同此次共享一個段的。直到flushDocStores為true的時候,closeDocStore被 調用,進而下次添加到索引檔案中的域和詞向量資訊将被儲存在一個新的段中,不同此次共享一個段。如2.1節中說的那樣,在addDocument中,如果 記憶體中緩存滿了,則寫入硬碟,調用的是flush(true, false, false),也即所有的存儲域都存儲在共享的域中(_0.fdt),因而不需要合并存儲域。
- 生成新的段:merge.info = new SegmentInfo(newSegmentName(),…)
- 将新的段加入mergingSegments
- 如果已經有足夠多的段合并線程,則等待while (mergeThreadCount() >= maxThreadCount) wait();
- 生成新的段合并線程:
- merger = getMergeThread(writer, merge);
- mergeThreads.add(merger);
- 啟動段合并線程:merger.start();
段合并線程的類型為MergeThread,MergeThread.run()包含while(truy)循環,在循環中做以下事情:
- 合并目前的任務:doMerge(merge);
- 得到下一個段合并任務:merge = writer.getNextMerge();
ConcurrentMergeScheduler.doMerge(OneMerge) 最終調用IndexWriter.merge(OneMerge) ,主要做以下事情:
- 初始化合并任務:mergeInit(merge);
- 進行合并:mergeMiddle(merge);
- 完成合并任務:mergeFinish(merge);
- 從mergingSegments中移除被合并的段和合并新生成的段:
- for(int i=0;i<end;i++) mergingSegments.remove(sourceSegments.info(i));
- mergingSegments.remove(merge.info);
- 從runningMerges中移除此合并任務:runningMerges.remove(merge);
- 從mergingSegments中移除被合并的段和合并新生成的段:
IndexWriter.mergeMiddle(OneMerge)主要做以下幾件事情:
- 生成用于合并段的對象SegmentMerger merger = new SegmentMerger(this, mergedName, merge);
- 打開Reader指向要合并的段:
merge.readers = new SegmentReader[numSegments]; merge.readersClone = new SegmentReader[numSegments]; for (int i = 0; i < numSegments; i++) { final SegmentInfo info = sourceSegments.info(i); // Hold onto the "live" reader; we will use this to // commit merged deletes SegmentReader reader = merge.readers[i] = readerPool.get(info, merge.mergeDocStores,MERGE_READ_BUFFER_SIZE,-1); // We clone the segment readers because other // deletes may come in while we're merging so we // need readers that will not change SegmentReader clone = merge.readersClone[i] = (SegmentReader) reader.clone(true); merger.add(clone); } |
- 進行段合并:mergedDocCount = merge.info.docCount = merger.merge(merge.mergeDocStores);
- 合并生成的段生成為cfs:merger.createCompoundFile(compoundFileName);
SegmentMerger.merge(boolean) 包含以下幾部分:
- 合并域:mergeFields()
- 合并詞典和倒排表:mergeTerms();
- 合并标準化因子:mergeNorms();
- 合并詞向量:mergeVectors();
下面依次分析者幾部分。
2.3.1、合并存儲域
合并存儲域主要包含兩部分:一部分是合并fnm資訊,也即域中繼資料資訊,一部分是合并fdt,fdx資訊,也即域資料資訊。
(1) 合并fnm資訊
- 首先生成新的域中繼資料資訊:fieldInfos = new FieldInfos();
- 依次用reader讀取每個合并段的域中繼資料資訊,加入上述對象
for (IndexReader reader : readers) { SegmentReader segmentReader = (SegmentReader) reader; FieldInfos readerFieldInfos = segmentReader.fieldInfos(); int numReaderFieldInfos = readerFieldInfos.size(); for (int j = 0; j < numReaderFieldInfos; j++) { FieldInfo fi = readerFieldInfos.fieldInfo(j); //在通常情況下,所有的段中的文檔都包含相同的域,比如添加文檔的時候,每篇文檔都包 含"title","description","author","time"等,不會為某一篇文檔添加或減少與其他文檔不同的域。但也不排除特殊情況 下有特殊的文檔有特殊的域。因而此處的add是無則添加,有則更新。 fieldInfos.add(fi.name, fi.isIndexed, fi.storeTermVector, fi.storePositionWithTermVector, fi.storeOffsetWithTermVector, !reader.hasNorms(fi.name), fi.storePayloads, fi.omitTermFreqAndPositions); } } |
- 将域中繼資料資訊fnm寫入檔案:fieldInfos.write(directory, segment + ".fnm");
(2) 合并段資料資訊fdt, fdx
在合并段的資料資訊的時候,有兩種情況:
- 情況一:通常情況,要合并的段和新生成段包含的域的名稱,順序都是一樣的,這樣就可以把要合并的段的fdt資訊直接拷貝到新生成段的最後,以提高合并效率。
- 情況二:要合并的段包含特殊的文檔,其包含的域多于或者少于新生成段的域,這樣就不能夠直接拷貝,而是一篇文檔一篇文檔的添加。這樣合并效率大大降低,因而不鼓勵添加文檔的時候,不同的文檔使用不同的域。
具體過程如下:
- 首先檢查要合并的各個段,其包含域的名稱,順序是否同新生成段的一緻,也即是否屬于第一種情況:setMatchingSegmentReaders();
private void setMatchingSegmentReaders() { int numReaders = readers.size(); matchingSegmentReaders = new SegmentReader[numReaders]; //周遊所有的要合并的段 for (int i = 0; i < numReaders; i++) { IndexReader reader = readers.get(i); if (reader instanceof SegmentReader) { SegmentReader segmentReader = (SegmentReader) reader; boolean same = true; FieldInfos segmentFieldInfos = segmentReader.fieldInfos(); int numFieldInfos = segmentFieldInfos.size(); //依次比較要合并的段和新生成的段的段名,順序是否一緻。 for (int j = 0; same && j < numFieldInfos; j++) { same = fieldInfos.fieldName(j).equals(segmentFieldInfos.fieldName(j)); } //最後生成matchingSegmentReaders數組,如果此數組的第i項不是null,則說明第i個段同新生成的段名稱,順序完全一緻,可以采取情況一得方式。如果此數組的第i項是null,則說明第i個段包含特殊的域,則采取情況二的方式。 if (same) { matchingSegmentReaders[i] = segmentReader; } } } } |
- 生成存儲域的寫對象:FieldsWriter fieldsWriter = new FieldsWriter(directory, segment, fieldInfos);
- 依次周遊所有的要合并的段,按照上述兩種情況,使用不同政策進行合并
int idx = 0; for (IndexReader reader : readers) { final SegmentReader matchingSegmentReader = matchingSegmentReaders[idx++]; FieldsReader matchingFieldsReader = null; //如果matchingSegmentReader!=null,表示此段屬于情況一,得到matchingFieldsReader if (matchingSegmentReader != null) { final FieldsReader fieldsReader = matchingSegmentReader.getFieldsReader(); if (fieldsReader != null && fieldsReader.canReadRawDocs()) { matchingFieldsReader = fieldsReader; } } //根據此段是否包含删除的文檔采取不同的政策 if (reader.hasDeletions()) { docCount += copyFieldsWithDeletions(fieldsWriter, reader, matchingFieldsReader); } else { docCount += copyFieldsNoDeletions(fieldsWriter,reader, matchingFieldsReader); } } |
- 合并包含删除文檔的段
private int copyFieldsWithDeletions(final FieldsWriter fieldsWriter, final IndexReader reader, final FieldsReader matchingFieldsReader) throws IOException, MergeAbortedException, CorruptIndexException { int docCount = 0; final int maxDoc = reader.maxDoc(); //matchingFieldsReader!=null,說明此段屬于情況一, 則可以直接拷貝。 if (matchingFieldsReader != null) { for (int j = 0; j < maxDoc;) { if (reader.isDeleted(j)) { // 如果文檔被删除,則跳過此文檔。 ++j; continue; } int start = j, numDocs = 0; do { j++; numDocs++; if (j >= maxDoc) break; if (reader.isDeleted(j)) { j++; break; } } while(numDocs < MAX_RAW_MERGE_DOCS); //從要合并的段中從第start篇文檔開始,依次讀取numDocs篇文檔的文檔長度到rawDocLengths中。 IndexInput stream = matchingFieldsReader.rawDocs(rawDocLengths, start, numDocs); //用fieldsStream.copyBytes(…)直接将fdt資訊從要合并的段拷貝到新生成的段,然後将上面讀出的rawDocLengths轉換成為每篇文檔在fdt中的偏移量,寫入fdx檔案。 fieldsWriter.addRawDocuments(stream, rawDocLengths, numDocs); docCount += numDocs; checkAbort.work(300 * numDocs); } } else { //matchingFieldsReader==null,說明此段屬于情況二,必須每篇文檔依次添加。 for (int j = 0; j < maxDoc; j++) { if (reader.isDeleted(j)) { // 如果文檔被删除,則跳過此文檔。 continue; } //同addDocument的過程中一樣,重新将文檔添加一遍。 Document doc = reader.document(j); fieldsWriter.addDocument(doc); docCount++; checkAbort.work(300); } } return docCount; } |
- 合并不包含删除文檔的段:除了跳過删除的文檔的部分,同上述過程一樣。
- 關閉存儲域的寫對象:fieldsWriter.close();
2.3.2、合并标準化因子
合并标準化因子的過程比較簡單,基本就是對每一個域,用指向合并段的reader讀出标準化因子,然後再寫入新生成的段。
private void mergeNorms() throws IOException { byte[] normBuffer = null; IndexOutput output = null; try { int numFieldInfos = fieldInfos.size(); //對于每一個域 for (int i = 0; i < numFieldInfos; i++) { FieldInfo fi = fieldInfos.fieldInfo(i); if (fi.isIndexed && !fi.omitNorms) { if (output == null) { //指向新生成的段的nrm檔案的寫入流 output = directory.createOutput(segment + "." + IndexFileNames.NORMS_EXTENSION); //寫nrm檔案頭 output.writeBytes(NORMS_HEADER,NORMS_HEADER.length); } //對于每一個合并段的reader for ( IndexReader reader : readers) { int maxDoc = reader.maxDoc(); if (normBuffer == null || normBuffer.length < maxDoc) { // the buffer is too small for the current segment normBuffer = new byte[maxDoc]; } //讀出此段的nrm資訊。 reader.norms(fi.name, normBuffer, 0); if (!reader.hasDeletions()) { //如果沒有文檔被删除則寫入新生成的段。 output.writeBytes(normBuffer, maxDoc); } else { //如果有文檔删除則跳過删除的文檔寫入新生成的段。 for (int k = 0; k < maxDoc; k++) { if (!reader.isDeleted(k)) { output.writeByte(normBuffer[k]); } } } checkAbort.work(maxDoc); } } } } finally { if (output != null) { output.close(); } } } |
2.3.3、合并詞向量
合并詞向量的過程同合并存儲域的過程非常相似,也包括兩種情況:
- 情況一:通常情況,要合并的段和新生成段包含的域的名稱,順序都是一樣的,這樣就可以把要合并的段的詞向量資訊直接拷貝到新生成段的最後,以提高合并效率。
- 情況二:要合并的段包含特殊的文檔,其包含的域多于或者少于新生成段的域,這樣就不能夠直接拷貝,而是一篇文檔一篇文檔的添加。這樣合并效率大大降低,因而不鼓勵添加文檔的時候,不同的文檔使用不同的域。
具體過程如下:
- 生成詞向量的寫對象:TermVectorsWriter termVectorsWriter = new TermVectorsWriter(directory, segment, fieldInfos);
- 依次周遊所有的要合并的段,按照上述兩種情況,使用不同政策進行合并
int idx = 0; for (final IndexReader reader : readers) { final SegmentReader matchingSegmentReader = matchingSegmentReaders[idx++]; TermVectorsReader matchingVectorsReader = null; //如果matchingSegmentReader!=null,表示此段屬于情況一,得到matchingFieldsReader if (matchingSegmentReader != null) { TermVectorsReader vectorsReader = matchingSegmentReader.getTermVectorsReaderOrig(); if (vectorsReader != null && vectorsReader.canReadRawDocs()) { matchingVectorsReader = vectorsReader; } } //根據此段是否包含删除的文檔采取不同的政策 if (reader.hasDeletions()) { copyVectorsWithDeletions(termVectorsWriter, matchingVectorsReader, reader); } else { copyVectorsNoDeletions(termVectorsWriter, matchingVectorsReader, reader); } } |
- 合并包含删除文檔的段
private void copyVectorsWithDeletions(final TermVectorsWriter termVectorsWriter, final TermVectorsReader matchingVectorsReader, final IndexReader reader) throws IOException, MergeAbortedException { final int maxDoc = reader.maxDoc(); //matchingFieldsReader!=null,說明此段屬于情況一, 則可以直接拷貝。 if (matchingVectorsReader != null) { for (int docNum = 0; docNum < maxDoc;) { if (reader.isDeleted(docNum)) { // 如果文檔被删除,則跳過此文檔。 ++docNum; continue; } int start = docNum, numDocs = 0; do { docNum++; numDocs++; if (docNum >= maxDoc) break; if (reader.isDeleted(docNum)) { docNum++; break; } } while(numDocs < MAX_RAW_MERGE_DOCS); //從要合并的段中從第start篇文檔開始,依次讀取numDocs篇文檔的tvd到rawDocLengths中,tvf到rawDocLengths2。 matchingVectorsReader.rawDocs(rawDocLengths, rawDocLengths2, start, numDocs); //用tvd.copyBytes(…)直接将tvd資訊從要合并的段拷貝到新生成的段,然後将上面讀出的rawDocLengths轉 換成為每篇文檔在tvd檔案中的偏移量,寫入tvx檔案。用tvf.copyBytes(…)直接将tvf資訊從要合并的段拷貝到新生成的段,然後将上面 讀出的rawDocLengths2轉換成為每篇文檔在tvf檔案中的偏移量,寫入tvx檔案。 termVectorsWriter.addRawDocuments(matchingVectorsReader, rawDocLengths, rawDocLengths2, numDocs); checkAbort.work(300 * numDocs); } } else { //matchingFieldsReader==null,說明此段屬于情況二,必須每篇文檔依次添加。 for (int docNum = 0; docNum < maxDoc; docNum++) { if (reader.isDeleted(docNum)) { // 如果文檔被删除,則跳過此文檔。 continue; } //同addDocument的過程中一樣,重新将文檔添加一遍。 TermFreqVector[] vectors = reader.getTermFreqVectors(docNum); termVectorsWriter.addAllDocVectors(vectors); checkAbort.work(300); } } } |
- 合并不包含删除文檔的段:除了跳過删除的文檔的部分,同上述過程一樣。
- 關閉詞向量的寫對象:termVectorsWriter.close();
2.3.4、合并詞典和倒排表
以上都是合并正向資訊,相對過程比較清晰。而合并詞典和倒排表就不這麼簡單了,因為在詞典中,Lucene要求按照字典順序排序,在倒排表中,文檔号要按照從小到大順序排序排序,在每個段中,文檔号都是從零開始編号的。
是以反向資訊的合并包括兩部分:
- 對字典的合并,需要對詞典中的Term進行重新排序
- 對于相同的Term,對包含此Term的文檔号清單進行合并,需要對文檔号重新編号。
後者相對簡單,假設如果第一個段的編号是0~N,第二個段的編号是0~M,當兩個段合并成一個段的時候,第一個段的編号依然是0~N,第二個段的編号變成N~N+M就可以了,也即增加一個偏移量(前一個段的文檔個數)。
對詞典的合并需要找出兩個段中相同的詞,Lucene是通過一個稱為match的SegmentMergeInfo類型的數組以及稱為queue的 SegmentMergeQueue實作的,SegmentMergeQueue是繼承于 PriorityQueue<SegmentMergeInfo>,是一個優先級隊列,是按照字典順序排序的。 SegmentMergeInfo儲存要合并的段的詞典及倒排表資訊,在SegmentMergeQueue中用來排序的key是它代表的段中的第一個 Term。
在總論部分,舉了一個例子表明詞典和倒排表合并的過程。
下面讓我們深入代碼看一看具體的實作:
(1) 生成優先級隊列,并将所有的段都加入優先級隊列。
//在Lucene索引過程分析(4)中提到過,FormatPostingsFieldsConsumer 是用來寫入倒排表資訊的。 //FormatPostingsFieldsWriter.addField(FieldInfo field)用于添加索引域資訊,其傳回FormatPostingsTermsConsumer用于添加詞資訊。 //FormatPostingsTermsConsumer.addTerm(char[] text, int start)用于添加詞資訊,其傳回FormatPostingsDocsConsumer用于添加freq資訊 //FormatPostingsDocsConsumer.addDoc(int docID, int termDocFreq)用于添加freq資訊,其傳回FormatPostingsPositionsConsumer用于添加prox資訊 //FormatPostingsPositionsConsumer.addPosition(int position, byte[] payload, int payloadOffset, int payloadLength)用于添加prox資訊 FormatPostingsFieldsConsumer consumer = new FormatPostingsFieldsWriter(state, fieldInfos); //優先級隊列 queue = new SegmentMergeQueue(readers.size()); //對于每一個段 final int readerCount = readers.size(); for (int i = 0; i < readerCount; i++) { IndexReader reader = readers.get(i); TermEnum termEnum = reader.terms(); //生成SegmentMergeInfo對象,termEnum就是此段的詞典及倒排表。 SegmentMergeInfo smi = new SegmentMergeInfo(base, termEnum, reader); //base就是下一個段的文檔号偏移量,等于此段的文檔數目。 base += reader.numDocs(); if (smi.next()) //得到段的第一個Term queue.add(smi); //将此段放入優先級隊列。 else smi.close(); } |
(2) 生成match數組
SegmentMergeInfo[] match = new SegmentMergeInfo[readers.size()];
(3) 合并詞典
//如果隊列不為空,則合并尚未結束 while (queue.size() > 0) { int matchSize = 0; //取出優先級隊列的第一個段,放到match數組中 match[matchSize++] = queue.pop(); Term term = match[0].term; SegmentMergeInfo top = queue.top(); //如果優先級隊列的最頂端和已經彈出的match中的段的第一個Term相同,則全部彈出。 while (top != null && term.compareTo(top.term) == 0) { match[matchSize++] = queue.pop(); top = queue.top(); } if (currentField != term.field) { currentField = term.field; if (termsConsumer != null) termsConsumer.finish(); final FieldInfo fieldInfo = fieldInfos.fieldInfo(currentField); //FormatPostingsFieldsWriter.addField(FieldInfo field)用于添加索引域資訊,其傳回FormatPostingsTermsConsumer用于添加詞資訊。 termsConsumer = consumer.addField(fieldInfo); omitTermFreqAndPositions = fieldInfo.omitTermFreqAndPositions; } //合并match數組中的所有的段的第一個Term的倒排表資訊,并寫入新生成的段。 int df = appendPostings(termsConsumer, match, matchSize); checkAbort.work(df/3.0); while (matchSize > 0) { SegmentMergeInfo smi = match[—matchSize]; //如果match中的段還有下一個Term,則放回優先級隊列,進行下一輪的循環。 if (smi.next()) queue.add(smi); else smi.close(); } } |
(4) 合并倒排表
private final int appendPostings(final FormatPostingsTermsConsumer termsConsumer, SegmentMergeInfo[] smis, int n) throws CorruptIndexException, IOException { //FormatPostingsTermsConsumer.addTerm(char[] text, int start)用于添加詞資訊,其傳回FormatPostingsDocsConsumer用于添加freq資訊 //将match數組中段的第一個Term添加到新生成的段中。 final FormatPostingsDocsConsumer docConsumer = termsConsumer.addTerm(smis[0].term.text); int df = 0; for (int i = 0; i < n; i++) { SegmentMergeInfo smi = smis[i]; //得到要合并的段的位置資訊(prox) TermPositions postings = smi.getPositions(); //此段的文檔号偏移量 int base = smi.base; //在要合并的段中找到Term的倒排表位置。 postings.seek(smi.termEnum); //不斷得到下一篇文檔号 while (postings.next()) { df++; int doc = postings.doc(); //文檔号都要加上偏移量 doc += base; //得到詞頻資訊(frq) final int freq = postings.freq(); //FormatPostingsDocsConsumer.addDoc(int docID, int termDocFreq)用于添加freq資訊,其傳回FormatPostingsPositionsConsumer用于添加prox資訊 final FormatPostingsPositionsConsumer posConsumer = docConsumer.addDoc(doc, freq); //如果位置資訊需要儲存 if (!omitTermFreqAndPositions) { for (int j = 0; j < freq; j++) { //得到位置資訊(prox)以及payload資訊 final int position = postings.nextPosition(); final int payloadLength = postings.getPayloadLength(); if (payloadLength > 0) { if (payloadBuffer == null || payloadBuffer.length < payloadLength) payloadBuffer = new byte[payloadLength]; postings.getPayload(payloadBuffer, 0); } //FormatPostingsPositionsConsumer.addPosition(int position, byte[] payload, int payloadOffset, int payloadLength)用于添加prox資訊 posConsumer.addPosition(position, payloadBuffer, 0, payloadLength); } posConsumer.finish(); } } } docConsumer.finish(); return df; } |