天天看点

elasticsearch merge 源码

从2.x版本就开始lucene默认采用的合并策略就已经是TieredMergePolicy了。所以今天有时间了解并看了一下TieredMergePolicy.findMerges方法的实现。

// 正在mergin的segmentinfo
Collection<SegmentCommitInfo> merging = writer.getMergingSegments();
Collection<SegmentCommitInfo> toBeMerged = new HashSet<>();

// 把SegmentCommitInfo放到集合中并且对其继续降序排序
List<SegmentCommitInfo> infosSorted = new ArrayList<>(infos.asList());
Collections.sort(infosSorted, new SegmentByteSizeDescending(writer));
           

接着把统计集合中的segments总大小(这个大小不包含删除),并且计算出最小的大小minSegmentBytes

long totIndexBytes = ;
    long minSegmentBytes = Long.MAX_VALUE;
    for(SegmentCommitInfo info : infosSorted) {
      final long segBytes = size(info, writer);
      // 代码省略....
      minSegmentBytes = Math.min(segBytes, minSegmentBytes);
      // Accum total byte size
      totIndexBytes += segBytes;
    }
           

计算出集合中的总大小和最小的两个值之后,把超过maxMergedSegmentBytes/2.0大小的segment给排除掉,并且减去相应的大小(infosSorted已经排好序,由大到小,循环完之后会记录(tooBigCount )前多少个是超出大小范围的)

int tooBigCount = ;
    while (tooBigCount < infosSorted.size()) {
      long segBytes = size(infosSorted.get(tooBigCount), writer);
      if (segBytes < maxMergedSegmentBytes/) {
        break;
      }
      totIndexBytes -= segBytes;
      tooBigCount++;
    }
    // 默认值为2mb
    minSegmentBytes = floorSize(minSegmentBytes);
           

计算最小的segment大小是否比floorSegmentBytes还小,如果小于则取floorSegmentBytes作为minSegmentBytes,这里设置floorSize个人猜想是因为大部分的segment大小都是比较小的,可能还会有10+kb的,由于minSegmentBytes这个值在下面会参与计算allowedSegCountInt,如果minSegmentBytes太小会导致相对长时间无法满足merge数量要求。而导致没法进行merge。

long levelSize = minSegmentBytes;
    long bytesLeft = totIndexBytes;
    double allowedSegCount = ;
    while(true) {
      final double segCountLevel = bytesLeft / (double) levelSize;
      if (segCountLevel < segsPerTier) {
        allowedSegCount += Math.ceil(segCountLevel);
        break;
      }
      allowedSegCount += segsPerTier;
      bytesLeft -= segsPerTier * levelSize;
      levelSize *= maxMergeAtOnce;
    }
    int allowedSegCountInt = (int) allowedSegCount;
           

最重要的算法代码,通过minSegmentBytes,segsPerTier,maxMergeAtOnce计算出allowedSegCountInt,这个值可以认为是在这次merge中应该需要有多少个segment才会进行merge。

在下面的while循环中寻找符合条件的SegmentCommitInfo,排除正在merging以及被当前上下文加入可merge的

for(int idx = tooBigCount; idx<infosSorted.size(); idx++) {
        final SegmentCommitInfo info = infosSorted.get(idx);
        if (merging.contains(info)) {
          mergingBytes += size(info, writer);
        } else if (!toBeMerged.contains(info)) {
          eligible.add(info);
        }
      }
           

注意tooBigCount变量,这个变量是上面的代码排除大小大于maxMergedSegmentBytes/2.0得出的,这里有个技巧是直接取的符合条件的SegmentCommitInfo

得到eligible符合条件的SegmentCommitInfo集合,但此时不一定会进行merge,还需要判断eligible.size是否大于allowedSegCountInt

if (eligible.size() > allowedSegCountInt) {
....
for(int startIdx = 0;startIdx <= eligible.size()-maxMergeAtOnce; startIdx++) {
....
for(int idx = startIdx;idx<eligible.size() && candidate.size() < maxMergeAtOnce;idx++) {
           

通过这个双循环得出allowedSegCountInt-1个组合,组合类似

[a1, b2, c3, d4, e5, f6, g7, h8, i9, j10]
[b2, c3, d4, e5, f6, g7, h8, i9, j10, k11]
[c3, d4, e5, f6, g7, h8, i9, j10, k11, l12]
[d4, e5, f6, g7, h8, i9, j10, k11, l12, m13]
[e5, f6, g7, h8, i9, j10, k11, l12, m13, n14]
[f6, g7, h8, i9, j10, k11, l12, m13, n14, o15]
           

在里面循环中会判断组合大小是否大于maxMergedSegmentBytes,如果大于则跳过这个segment,这个大小尽量接近maxMergedSegmentBytes

final List<SegmentCommitInfo> candidate = new ArrayList<>();
    boolean hitTooLarge = false;
    for(int idx = startIdx;idx<eligible.size() && candidate.size() < maxMergeAtOnce;idx++) {
        final SegmentCommitInfo info = eligible.get(idx);
        final long segBytes = size(info, writer);

        if (totAfterMergeBytes + segBytes > maxMergedSegmentBytes) {
              // 命中太大的,跳过
              hitTooLarge = true;
              continue;
         }
         candidate.add(info);
         totAfterMergeBytes += segBytes;
    }

final MergeScore score = score(candidate, hitTooLarge, mergingBytes, writer);
if ((bestScore == null || score.getScore() < bestScore.getScore()) && (!hitTooLarge || !maxMergeIsRunning)) {
            best = candidate;
            bestScore = score;
            bestTooLarge = hitTooLarge;
            bestMergeBytes = totAfterMergeBytes;
          }

    if (best != null) {
          if (spec == null) {
            spec = new MergeSpecification();
          }
          final OneMerge merge = new OneMerge(best);
          spec.add(merge);
          for(SegmentCommitInfo info : merge.segments) {
            toBeMerged.add(info);
          }
        }
           

计算当前这组的分值,判断是否是第一个组合或者分数小于上一个组合的分数,最终加入到toBeMerged集合中,接着又来一次while循环执行以上操作。找到最优的组合之后(在TieredMergePolicy类注释有说明更优),将MergeSpecification对象返回。

总结:

index.merge.policy.floor_segment

index.merge.policy.max_merge_at_once

index.merge.policy.segments_per_tier

以上参数会影响计算allowedSegCountInt值,这个值会影响merge的频率。

继续阅读