天天看點

elasticsearch merge 源碼

從2.x版本就開始lucene預設采用的合并政策就已經是TieredMergePolicy了。是以今天有時間了解并看了一下TieredMergePolicy.findMerges方法的實作。

// 正在mergin的segmentinfo
Collection<SegmentCommitInfo> merging = writer.getMergingSegments();
Collection<SegmentCommitInfo> toBeMerged = new HashSet<>();

// 把SegmentCommitInfo放到集合中并且對其繼續降序排序
List<SegmentCommitInfo> infosSorted = new ArrayList<>(infos.asList());
Collections.sort(infosSorted, new SegmentByteSizeDescending(writer));
           

接着把統計集合中的segments總大小(這個大小不包含删除),并且計算出最小的大小minSegmentBytes

long totIndexBytes = ;
    long minSegmentBytes = Long.MAX_VALUE;
    for(SegmentCommitInfo info : infosSorted) {
      final long segBytes = size(info, writer);
      // 代碼省略....
      minSegmentBytes = Math.min(segBytes, minSegmentBytes);
      // Accum total byte size
      totIndexBytes += segBytes;
    }
           

計算出集合中的總大小和最小的兩個值之後,把超過maxMergedSegmentBytes/2.0大小的segment給排除掉,并且減去相應的大小(infosSorted已經排好序,由大到小,循環完之後會記錄(tooBigCount )前多少個是超出大小範圍的)

int tooBigCount = ;
    while (tooBigCount < infosSorted.size()) {
      long segBytes = size(infosSorted.get(tooBigCount), writer);
      if (segBytes < maxMergedSegmentBytes/) {
        break;
      }
      totIndexBytes -= segBytes;
      tooBigCount++;
    }
    // 預設值為2mb
    minSegmentBytes = floorSize(minSegmentBytes);
           

計算最小的segment大小是否比floorSegmentBytes還小,如果小于則取floorSegmentBytes作為minSegmentBytes,這裡設定floorSize個人猜想是因為大部分的segment大小都是比較小的,可能還會有10+kb的,由于minSegmentBytes這個值在下面會參與計算allowedSegCountInt,如果minSegmentBytes太小會導緻相對長時間無法滿足merge數量要求。而導緻沒法進行merge。

long levelSize = minSegmentBytes;
    long bytesLeft = totIndexBytes;
    double allowedSegCount = ;
    while(true) {
      final double segCountLevel = bytesLeft / (double) levelSize;
      if (segCountLevel < segsPerTier) {
        allowedSegCount += Math.ceil(segCountLevel);
        break;
      }
      allowedSegCount += segsPerTier;
      bytesLeft -= segsPerTier * levelSize;
      levelSize *= maxMergeAtOnce;
    }
    int allowedSegCountInt = (int) allowedSegCount;
           

最重要的算法代碼,通過minSegmentBytes,segsPerTier,maxMergeAtOnce計算出allowedSegCountInt,這個值可以認為是在這次merge中應該需要有多少個segment才會進行merge。

在下面的while循環中尋找符合條件的SegmentCommitInfo,排除正在merging以及被目前上下文加入可merge的

for(int idx = tooBigCount; idx<infosSorted.size(); idx++) {
        final SegmentCommitInfo info = infosSorted.get(idx);
        if (merging.contains(info)) {
          mergingBytes += size(info, writer);
        } else if (!toBeMerged.contains(info)) {
          eligible.add(info);
        }
      }
           

注意tooBigCount變量,這個變量是上面的代碼排除大小大于maxMergedSegmentBytes/2.0得出的,這裡有個技巧是直接取的符合條件的SegmentCommitInfo

得到eligible符合條件的SegmentCommitInfo集合,但此時不一定會進行merge,還需要判斷eligible.size是否大于allowedSegCountInt

if (eligible.size() > allowedSegCountInt) {
....
for(int startIdx = 0;startIdx <= eligible.size()-maxMergeAtOnce; startIdx++) {
....
for(int idx = startIdx;idx<eligible.size() && candidate.size() < maxMergeAtOnce;idx++) {
           

通過這個雙循環得出allowedSegCountInt-1個組合,組合類似

[a1, b2, c3, d4, e5, f6, g7, h8, i9, j10]
[b2, c3, d4, e5, f6, g7, h8, i9, j10, k11]
[c3, d4, e5, f6, g7, h8, i9, j10, k11, l12]
[d4, e5, f6, g7, h8, i9, j10, k11, l12, m13]
[e5, f6, g7, h8, i9, j10, k11, l12, m13, n14]
[f6, g7, h8, i9, j10, k11, l12, m13, n14, o15]
           

在裡面循環中會判斷組合大小是否大于maxMergedSegmentBytes,如果大于則跳過這個segment,這個大小盡量接近maxMergedSegmentBytes

final List<SegmentCommitInfo> candidate = new ArrayList<>();
    boolean hitTooLarge = false;
    for(int idx = startIdx;idx<eligible.size() && candidate.size() < maxMergeAtOnce;idx++) {
        final SegmentCommitInfo info = eligible.get(idx);
        final long segBytes = size(info, writer);

        if (totAfterMergeBytes + segBytes > maxMergedSegmentBytes) {
              // 命中太大的,跳過
              hitTooLarge = true;
              continue;
         }
         candidate.add(info);
         totAfterMergeBytes += segBytes;
    }

final MergeScore score = score(candidate, hitTooLarge, mergingBytes, writer);
if ((bestScore == null || score.getScore() < bestScore.getScore()) && (!hitTooLarge || !maxMergeIsRunning)) {
            best = candidate;
            bestScore = score;
            bestTooLarge = hitTooLarge;
            bestMergeBytes = totAfterMergeBytes;
          }

    if (best != null) {
          if (spec == null) {
            spec = new MergeSpecification();
          }
          final OneMerge merge = new OneMerge(best);
          spec.add(merge);
          for(SegmentCommitInfo info : merge.segments) {
            toBeMerged.add(info);
          }
        }
           

計算目前這組的分值,判斷是否是第一個組合或者分數小于上一個組合的分數,最終加入到toBeMerged集合中,接着又來一次while循環執行以上操作。找到最優的組合之後(在TieredMergePolicy類注釋有說明更優),将MergeSpecification對象傳回。

總結:

index.merge.policy.floor_segment

index.merge.policy.max_merge_at_once

index.merge.policy.segments_per_tier

以上參數會影響計算allowedSegCountInt值,這個值會影響merge的頻率。

繼續閱讀