
Cassandra Compaction Code Walkthrough


This walkthrough of compaction is divided into the following parts:

1. Management of compaction threads

2. Selection of candidate sstables for compaction

3. Merging of the sstables during compaction.

Parts 1 and 3 are fairly fixed and should not need tuning; understanding them is enough.

1 Management of compaction threads

1.1 Starting compaction

After Cassandra starts, a cfs (ColumnFamilyStore) is created at the end of opening each table. The cfs constructor builds the corresponding object for the configured compaction strategy. Let's look at the constructors of the two concrete compaction strategies.

public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
    {
        super(cfs, options);
        this.estimatedRemainingTasks = 0;
        String optionValue = options.get(MIN_SSTABLE_SIZE_KEY);
        minSSTableSize = (null != optionValue) ? Long.parseLong(optionValue) : DEFAULT_MIN_SSTABLE_SIZE;
        cfs.setMaximumCompactionThreshold(cfs.metadata.getMaxCompactionThreshold());
        cfs.setMinimumCompactionThreshold(cfs.metadata.getMinCompactionThreshold());
    }
 
 
 
public LeveledCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
    {
        super(cfs, options);
        int configuredMaxSSTableSize = 5;
        if (options != null)
        {
            String value = options.containsKey(SSTABLE_SIZE_OPTION) ? options.get(SSTABLE_SIZE_OPTION) : null;
            if (null != value)
            {
                try
                {
                    configuredMaxSSTableSize = Integer.parseInt(value);
                }
                catch (NumberFormatException ex)
                {
                    logger.warn(String.format("%s is not a parsable int (base10) for %s using default value",
                                              value, SSTABLE_SIZE_OPTION));
                }
            }
        }
        maxSSTableSizeInMB = configuredMaxSSTableSize;

        cfs.getDataTracker().subscribe(this);
        logger.debug("{} subscribed to the data tracker.", this);

        manifest = LeveledManifest.create(cfs, this.maxSSTableSizeInMB);
        logger.debug("Created {}", manifest);
        // override min/max for this strategy
        cfs.setMaximumCompactionThreshold(Integer.MAX_VALUE);
        cfs.setMinimumCompactionThreshold(1);
    }
 
           

The key piece is the base constructor below; it is straightforward, so no further explanation:

protected AbstractCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
    {
        assert cfs != null;
        this.cfs = cfs;
        this.options = options;

        // start compactions in five minutes (if no flushes have occurred by then to do so)
        Runnable runnable = new Runnable()
        {
            public void run()
            {
                if (CompactionManager.instance.getActiveCompactions() == 0)
                {
                    CompactionManager.instance.submitBackground(AbstractCompactionStrategy.this.cfs);
                }
            }
        };
        StorageService.optionalTasks.schedule(runnable, 5 * 60, TimeUnit.SECONDS); // optionalTasks is a scheduled executor
    }
           

From the three snippets above we can see that once Cassandra has started, each compaction strategy schedules a runnable on optionalTasks; if no flush has triggered a compaction within five minutes (i.e. no compaction is already active), that runnable submits a background compaction.

CompactionManager manages the threads of in-progress compactions. Compaction is generally single-threaded (per column family) and involves synchronization, locks, and a thread pool.

Now let's look at the main entry point: CompactionManager.instance.submitBackground(AbstractCompactionStrategy.this.cfs);

/**
    * Call this whenever a compaction might be needed on the given columnfamily.
    * It's okay to over-call (within reason) since the compactions are single-threaded,
    * and if a call is unnecessary, it will just be no-oped in the bucketing phase.
    */
    public Future<Integer> submitBackground(final ColumnFamilyStore cfs)
    {
        logger.debug("Scheduling a background task check for {}.{} with {}",
                     new Object[] {cfs.table.name,
                                   cfs.columnFamily,
                                   cfs.getCompactionStrategy().getClass().getSimpleName()});
        Callable<Integer> callable = new Callable<Integer>()
        {
            public Integer call() throws IOException
            {
                compactionLock.readLock().lock();
                try
                {
                    logger.debug("Checking {}.{}", cfs.table.name, cfs.columnFamily); // log after we get the lock so we can see delays from that if any
                    if (!cfs.isValid())
                    {
                        logger.debug("Aborting compaction for dropped CF");
                        return 0;
                    }

                    boolean taskExecuted = false;
                    AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
                    List<AbstractCompactionTask> tasks = strategy.getBackgroundTasks(getDefaultGcBefore(cfs)); // selects the candidate sstables; covered in section 2
                    logger.debug("{} minor compaction tasks available", tasks.size());
                    for (AbstractCompactionTask task : tasks)
                    {
                        if (!task.markSSTablesForCompaction()) // mark phase
                        {
                            logger.debug("Skipping {}; sstables are busy", task);
                            continue;
                        }

                        taskExecuted = true;
                        try
                        {
                            task.execute(executor); // the main merge phase
                        }
                        finally
                        {
                            task.unmarkSSTables();
                        }
                    }

                    // newly created sstables might have made other compactions eligible
                    if (taskExecuted)
                        submitBackground(cfs);
                }
                finally
                {
                    compactionLock.readLock().unlock();
                }
                return 0;
            }
        };
        return executor.submit(callable);
    }
           

The code above shows that the compaction process is single-threaded: the tasks are executed one after another inside the callable.

2 Selecting candidate sstables for compaction

2.1 SizeTieredCompactionStrategy

Because this strategy has min/maxCompactionThreshold, a group of similar-sized sstables with fewer than min members does not form a task, and a single task contains at most max sstables.

The most important part of this strategy is finding all the groups of similar-sized sstables.

See the code below:

/*
    * Group files of similar size into buckets.
    */
    static <T> List<List<T>> getBuckets(Collection<Pair<T, Long>> files, long minSSTableSize)
    {
        // Sort the list in order to get deterministic results during the grouping below
        List<Pair<T, Long>> sortedFiles = new ArrayList<Pair<T, Long>>(files);
        Collections.sort(sortedFiles, new Comparator<Pair<T, Long>>()
        {
            public int compare(Pair<T, Long> p1, Pair<T, Long> p2)
            {
                return p1.right.compareTo(p2.right);
            }
        });
        Map<Long, List<T>> buckets = new HashMap<Long, List<T>>();
        outer:
        for (Pair<T, Long> pair : sortedFiles)
        {
            long size = pair.right;
            // look for a bucket containing similar-sized files:
            // group in the same bucket if it's w/in 50% of the average for this bucket,
            // or this file and the bucket are all considered "small" (less than `minSSTableSize`)
            for (Entry<Long, List<T>> entry : buckets.entrySet()) // the list is already sorted, so this scan looks inefficient, but this is not where the time goes
            {
                List<T> bucket = entry.getValue();
                long oldAverageSize = entry.getKey();
                if ((size > (oldAverageSize / 2) && size < (3 * oldAverageSize) / 2)
                    || (size < minSSTableSize && oldAverageSize < minSSTableSize)) // the key check: decides whether sizes are "similar"
                {
                    // remove and re-add under new average size
                    buckets.remove(oldAverageSize);
                    long totalSize = bucket.size() * oldAverageSize;
                    long newAverageSize = (totalSize + size) / (bucket.size() + 1);
                    bucket.add(pair.left);
                    buckets.put(newAverageSize, bucket);
                    continue outer;
                }
            }
            // no similar bucket found; put it in a new one
            ArrayList<T> bucket = new ArrayList<T>();
            bucket.add(pair.left);
            buckets.put(size, bucket);
        }
        return new ArrayList<List<T>>(buckets.values());
    }
           

The most important job of the code above is to find the groups of similar-sized sstables. The result is wrapped in the data structure List<List<SSTableReader>> buckets. At most one task is built per bucket, and tasks are only rebuilt after all existing tasks have finished executing. A rough sketch of how buckets could be turned into tasks follows.
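As an illustration (not the actual Cassandra code), the sketch below shows how such buckets might become tasks while honoring min/maxCompactionThreshold; the class and method names here are made up.

import java.util.ArrayList;
import java.util.List;

public class BucketToTaskSketch
{
    /**
     * Minimal sketch, assuming each bucket is a list of similar-sized sstable names.
     * A bucket smaller than minThreshold yields no task; a larger bucket is capped
     * at maxThreshold entries per task.
     */
    static List<List<String>> buildTasks(List<List<String>> buckets, int minThreshold, int maxThreshold)
    {
        List<List<String>> tasks = new ArrayList<List<String>>();
        for (List<String> bucket : buckets)
        {
            if (bucket.size() < minThreshold)
                continue; // not enough similar-sized sstables to be worth compacting
            // cap the task at maxThreshold sstables
            tasks.add(new ArrayList<String>(bucket.subList(0, Math.min(bucket.size(), maxThreshold))));
        }
        return tasks;
    }
}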

2.2 LeveledCompactionStrategy

Unlike the previous strategy, this one has no threshold. All sstables in this strategy have roughly the same size, and most importantly, within a single level the key ranges of the sstables do not overlap (except in L0). Consider the general case first (excluding L0): when compacting sstables from Li into Li+1, the level Li is chosen from the highest level downwards, as described in detail below. The sstable within Li is picked based on lastCompactedKeys[i] (sstable.first.compareTo(lastCompactedKeys[level]) > 0), and the Li+1 sstables that join the compaction are selected with:

return overlapping(sstable, generations[(level + 1)]);

So only one sstable on Li participates, together with every sstable on Li+1 whose key range overlaps it. For L0-to-L1 compactions, the difference is that any other L0 sstables whose keys overlap also join the compaction (although this is not strictly necessary). A minimal sketch of the overlap check is shown after this paragraph.
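To make the overlap idea concrete, here is a self-contained sketch of what overlapping() boils down to, assuming a hypothetical Range stand-in for an sstable's first/last key; this is not the actual LeveledManifest code.

import java.util.ArrayList;
import java.util.List;

public class OverlappingSketch
{
    /** Hypothetical stand-in for an sstable's first/last key, for illustration only. */
    static class Range
    {
        final long first, last;
        Range(long first, long last) { this.first = first; this.last = last; }
    }

    /**
     * Sketch of the idea behind overlapping(sstable, generations[level + 1]):
     * return every sstable in the next level whose key range intersects the chosen one.
     */
    static List<Range> overlapping(Range chosen, List<Range> nextLevel)
    {
        List<Range> result = new ArrayList<Range>();
        for (Range candidate : nextLevel)
        {
            // two ranges [a, b] and [c, d] intersect iff a <= d && c <= b
            if (chosen.first <= candidate.last && candidate.first <= chosen.last)
                result.add(candidate);
        }
        return result;
    }
}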

public synchronized Collection<SSTableReader> getCompactionCandidates()
    {
        // LevelDB gives each level a score of how much data it contains vs its ideal amount, and
        // compacts the level with the highest score. But this falls apart spectacularly once you
        // get behind.  Consider this set of levels:
        // L0: 988 [ideal: 4]
        // L1: 117 [ideal: 10]
        // L2: 12 [ideal: 100]
        //
        // The problem is that L0 has a much higher score (almost 250) than L1 (11), so what we'll
        // do is compact a batch of MAX_COMPACTING_L0 sstables with all 117 L1 sstables, and put the
        // result (say, 120 sstables) in L1. Then we'll compact the next batch of MAX_COMPACTING_L0,
        // and so forth.  So we spend most of our i/o rewriting the L1 data with each batch.
        //
        // If we could just do *all* L0 a single time with L1, that would be ideal.  But we can't
        // -- see the javadoc for MAX_COMPACTING_L0.
        //
        // LevelDB's way around this is to simply block writes if L0 compaction falls behind.
        // We don't have that luxury.
        //
        // So instead, we force compacting higher levels first.  This may not minimize the number
        // of reads done as quickly in the short term, but it minimizes the i/o needed to compact
        // optimally which gives us a long term win.
        for (int i = generations.length - 1; i >= 0; i--)
        {
            List<SSTableReader> sstables = generations[i];
            if (sstables.isEmpty())
                continue; // mostly this just avoids polluting the debug log with zero scores
            double score = (double) SSTableReader.getTotalBytes(sstables) / (double) maxBytesForLevel(i);
            logger.debug("Compaction score for level {} is {}", i, score);

            // L0 gets a special case that if we don't have anything more important to do,
            // we'll go ahead and compact even just one sstable
            if (score > 1.001 || i == 0)
            {
                Collection<SSTableReader> candidates = getCandidatesFor(i);
                if (logger.isDebugEnabled())
                    logger.debug("Compaction candidates for L{} are {}", i, toString(candidates));
                return candidates;
            }
        }

        return Collections.emptyList();
    }
           

The code above picks which level to start compacting from; the comments in it explain the reasoning well, so I won't elaborate.

This strategy builds only one task at a time. For reference, a sketch of the per-level size target used by the score above follows.
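This hedged sketch shows the per-level size target that the score is computed against; with 5 MB sstables it reproduces the ideal counts of 4/10/100 sstables mentioned in the comment above, but treat the exact constants as an assumption rather than the authoritative formula in LeveledManifest.

public class LevelSizeSketch
{
    /**
     * Sketch of the ideal byte count per level, assuming each level is ten times
     * larger than the previous one and L0 targets roughly 4 sstables.
     */
    static long maxBytesForLevel(int level, long maxSSTableSizeInMB)
    {
        if (level == 0)
            return 4L * maxSSTableSizeInMB * 1024L * 1024L; // assumption: L0 targets ~4 sstables
        return (long) Math.pow(10, level) * maxSSTableSizeInMB * 1024L * 1024L;
    }
}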

3 Merging the toCompact sstables

Because this part relies heavily on generics and inheritance, the merge process is quite hard to follow. At minimum you should understand the overall flow.

/**
     * For internal use and testing only.  The rest of the system should go through the submit* methods,
     * which are properly serialized.
     * Caller is in charge of marking/unmarking the sstables as compacting.
     */
    public int execute(CompactionExecutorStatsCollector collector) throws IOException
    {
        // The collection of sstables passed may be empty (but not null); even if
        // it is not empty, it may compact down to nothing if all rows are deleted.
        assert sstables != null;

        Set<SSTableReader> toCompact = new HashSet<SSTableReader>(sstables);
        if (!isCompactionInteresting(toCompact))
            return 0;

        if (compactionFileLocation == null)
            compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));

        if (compactionFileLocation == null && partialCompactionsAcceptable())
        {
            // If the compaction file path is null that means we have no space left for this compaction.
            // Try again w/o the largest one.
            while (compactionFileLocation == null && toCompact.size() > 1)
            {
                logger.warn("insufficient space to compact all requested files " + StringUtils.join(toCompact, ", "));
                // Note that we have removed files that are still marked as compacting.
                // This suboptimal but ok since the caller will unmark all the sstables at the end.
                toCompact.remove(cfs.getMaxSizeFile(toCompact));
                compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
            }
        }
        if (compactionFileLocation == null)
        {
            logger.warn("insufficient space to compact; aborting compaction");
            return 0;
        }

        if (DatabaseDescriptor.isSnapshotBeforeCompaction())
            cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-" + "compact-" + cfs.columnFamily);

        // sanity check: all sstables must belong to the same cfs
        for (SSTableReader sstable : toCompact)
            assert sstable.descriptor.cfname.equals(cfs.columnFamily);

        CompactionController controller = new CompactionController(cfs, toCompact, gcBefore, isUserDefined);//
        // new sstables from flush can be added during a compaction, but only the compaction can remove them,
        // so in our single-threaded compaction world this is a valid way of determining if we're compacting
        // all the sstables (that existed when we started)
        logger.info("Compacting {}", toCompact);

        long startTime = System.currentTimeMillis();
        long totalkeysWritten = 0;

        long estimatedTotalKeys = Math.max(DatabaseDescriptor.getIndexInterval(), SSTableReader.getApproximateKeyCount(toCompact));
        long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(toCompact) / cfs.getCompactionStrategy().getMaxSSTableSize());
        long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
        if (logger.isDebugEnabled())
            logger.debug("Expected bloom filter size : " + keysPerSSTable);

        AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
                                      ? new ParallelCompactionIterable(compactionType, toCompact, controller)
                                      : new CompactionIterable(compactionType, toCompact, controller); // look at the single-threaded case first
        CloseableIterator<AbstractCompactedRow> iter = ci.iterator(); // ManyToOne: in = SSTableScanner, out = AbstractCompactedRow
        Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());
        Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();

        // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
        // replace the old entries.  Track entries to preheat here until then.
        Map<SSTableReader, Map<DecoratedKey, Long>> cachedKeyMap =  new HashMap<SSTableReader, Map<DecoratedKey, Long>>();

        Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
        Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();

        if (collector != null)
            collector.beginCompaction(ci);
        try
        {
            if (!nni.hasNext())
            {
                // don't mark compacted in the finally block, since if there _is_ nondeleted data,
                // we need to sync it (via closeAndOpen) first, so there is no period during which
                // a crash could cause data loss.
                cfs.markCompacted(toCompact, compactionType);
                return 0;
            }

            SSTableWriter writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
            writers.add(writer);
            while (nni.hasNext())
            {
                AbstractCompactedRow row = nni.next();
                if (row.isEmpty())
                    continue;

                long position = writer.append(row);
                totalkeysWritten++;

                if (DatabaseDescriptor.getPreheatKeyCache())
                {
                    for (SSTableReader sstable : toCompact)
                    {
                        if (sstable.getCachedPosition(row.key, false) != null)
                        {
                            cachedKeys.put(row.key, position);
                            break;
                        }
                    }
                }
                if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer))
                {
                    SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact));
                    cachedKeyMap.put(toIndex, cachedKeys);
                    sstables.add(toIndex);
                    if (nni.hasNext())
                    {
                        writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
                        writers.add(writer);
                        cachedKeys = new HashMap<DecoratedKey, Long>();
                    }
                }
            }
        }
        catch (Exception e)
        {
            for (SSTableWriter writer : writers)
                writer.abort();
            throw FBUtilities.unchecked(e);
        }
        finally
        {
            iter.close();
            if (collector != null)
                collector.finishCompaction(ci);
        }

        cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);
        // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
        for (Entry<SSTableReader, Map<DecoratedKey, Long>> ssTableReaderMapEntry : cachedKeyMap.entrySet())
        {
            SSTableReader key = ssTableReaderMapEntry.getKey();
            for (Entry<DecoratedKey, Long> entry : ssTableReaderMapEntry.getValue().entrySet())
               key.cacheKey(entry.getKey(), entry.getValue());
        }

        long dTime = System.currentTimeMillis() - startTime;
        long startsize = SSTable.getTotalBytes(toCompact);
        long endsize = SSTable.getTotalBytes(sstables);
        double ratio = (double)endsize / (double)startsize;

        StringBuilder builder = new StringBuilder();
        builder.append("[");
        for (SSTableReader reader : sstables)
            builder.append(reader.getFilename()).append(",");
        builder.append("]");

        double mbps = dTime > 0 ? (double)endsize/(1024*1024)/((double)dTime/1000) : 0;
        logger.info(String.format("Compacted to %s.  %,d to %,d (~%d%% of original) bytes for %,d keys at %fMB/s.  Time: %,dms.",
                                  builder.toString(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, mbps, dTime));
        logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
        return toCompact.size();
}
           

This code comes from CompactionTask; as described in section 2, the task mainly holds the sstables to compact (toCompact). There are three important points in this code: 1. choosing a directory; 2. the merge process itself; 3. writing out the merged result.

The directory chosen is the data directory with the most remaining space; this is simple, so it is not covered separately (a minimal sketch of the idea is shown below).
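A minimal sketch of that idea, assuming a plain array of data directories; this is not the actual Table.getDataFileLocation() implementation.

import java.io.File;

public class DataDirectorySketch
{
    /**
     * Pick the data directory with the most usable space that can still hold the
     * expected compacted file size, or return null if none can.
     */
    static File pickDirectory(File[] dataDirectories, long expectedCompactedSize)
    {
        File best = null;
        long bestFree = -1;
        for (File dir : dataDirectories)
        {
            long free = dir.getUsableSpace();
            if (free >= expectedCompactedSize && free > bestFree)
            {
                bestFree = free;
                best = dir;
            }
        }
        return best;
    }
}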

3.1 The merge process

The actual merge happens in the following few lines:

AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
                              ? new ParallelCompactionIterable(compactionType, toCompact, controller)
                              : new CompactionIterable(compactionType, toCompact, controller); // look at the single-threaded case first

       CloseableIterator<AbstractCompactedRow> iter = ci.iterator(); // builds a ManyToOne merge iterator: in = SSTableScanner, out = AbstractCompactedRow

       Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull()); // the actual merge

These lines are hard to understand; although there are only three of them, they drive the entire merge process.

Understanding them requires reading CompactionIterable.java, Reducer, MergeIterator, ManyToOne, SSTableReader, and SSTableScanner together, and knowing the parent/child relationships between these classes.

public static <T> Iterator<T> filter(final Iterator<T> unfiltered, final Predicate<? super T> predicate)
{
    checkNotNull(unfiltered);
    checkNotNull(predicate);
    return new AbstractIterator<T>()
    {
        @Override protected T computeNext()
        {
            while (unfiltered.hasNext())
            {
                T element = unfiltered.next();
                if (predicate.apply(element))
                {
                    return element;
                }
            }
            endOfData();
            return null;
        }
    };
}
           

To understand this code, first know the com.google.common.collect package (Guava), which provides many useful collection classes, AbstractIterator among them. iter is a ManyToOne, which extends MergeIterator, which in turn extends AbstractIterator. Reading the source, unfiltered.hasNext() calls computeNext(); because of the parent/child relationship, the subclass implementation runs, i.e. ManyToOne.computeNext(). With that, everything becomes clear. A small stand-alone example of the AbstractIterator pattern follows.
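To see the pattern in isolation, here is a small example using Guava's AbstractIterator: hasNext() internally calls the subclass's computeNext(), and endOfData() signals exhaustion. The countTo method is made up purely for illustration.

import com.google.common.collect.AbstractIterator;
import java.util.Iterator;

public class AbstractIteratorExample
{
    /** Yields the integers 0 .. limit-1 via the computeNext()/endOfData() protocol. */
    static Iterator<Integer> countTo(final int limit)
    {
        return new AbstractIterator<Integer>()
        {
            private int next = 0;

            @Override
            protected Integer computeNext()
            {
                if (next >= limit)
                    return endOfData(); // no more elements: hasNext() will now return false
                return next++;
            }
        };
    }
}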

The overall process: all the SSTableScanners are added to a PriorityQueue (ordered by each scanner's current row key). Scanners are then taken from the queue and their current rows are merged (via CF.add()) until the key changes, after which the consumed scanners are re-added to the queue (now positioned at their next row).

This repeats until the queue is empty, at which point the merge is complete. The key code is below:

protected final Out consume()
        {
            reducer.onKeyChange();
            Candidate<In> candidate = queue.peek();
            if (candidate == null)
                return endOfData();
            do
            {
                candidate = queue.poll();
                candidates.push(candidate);
                reducer.reduce(candidate.item); // merge this row into the reducer
            }
            while (queue.peek() != null && queue.peek().compareTo(candidate) == 0); // keep consuming while the next candidate compares equal, i.e. has the same key
            return reducer.getReduced();
        }

        /** Advance and re-enqueue all items we consumed in the last iteration. */
        // a Candidate wraps an SSTableScanner, which parses the sstable row by row; candidate.advance() moves to the next row
        protected final void advance()
        { // re-add to the priority queue, ordered by the candidate's comparator; only non-exhausted candidates go back
            Candidate<In> candidate;
            while ((candidate = candidates.pollFirst()) != null)
                if (candidate.advance()) // once a scanner is exhausted it is dropped, so ordering is by each scanner's current (head) key
                    queue.add(candidate);
        }
}
           

In the code above, each candidate wraps an SSTableScanner; candidate.advance() moves the scanner's iterator to the next row. A self-contained sketch of this k-way merge idea is shown below.
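This sketch reproduces the MergeIterator/ManyToOne idea with a PriorityQueue; the Candidate class and String keys are simplifications standing in for SSTableScanner and row keys, and the reduce step is collapsed to emitting one value per distinct key.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class ManyToOneMergeSketch
{
    /** Hypothetical candidate: wraps one sorted iterator and its current item. */
    static class Candidate implements Comparable<Candidate>
    {
        final Iterator<String> source;
        String current;
        Candidate(Iterator<String> source) { this.source = source; }
        boolean advance()
        {
            if (!source.hasNext()) return false;
            current = source.next();
            return true;
        }
        public int compareTo(Candidate other) { return current.compareTo(other.current); }
    }

    /** Repeatedly pop all candidates sharing the smallest key, "reduce" them, then advance and re-enqueue. */
    static List<String> merge(List<Iterator<String>> sources)
    {
        PriorityQueue<Candidate> queue = new PriorityQueue<Candidate>();
        for (Iterator<String> s : sources)
        {
            Candidate c = new Candidate(s);
            if (c.advance())
                queue.add(c);
        }
        List<String> out = new ArrayList<String>();
        while (!queue.isEmpty())
        {
            List<Candidate> consumed = new ArrayList<Candidate>();
            Candidate first = queue.poll();
            consumed.add(first);
            // consume every candidate with the same key, like reducer.reduce()
            while (queue.peek() != null && queue.peek().compareTo(first) == 0)
                consumed.add(queue.poll());
            out.add(first.current); // stand-in for reducer.getReduced(): equal keys collapse to one row
            for (Candidate c : consumed)
                if (c.advance())
                    queue.add(c); // re-enqueue at its next row
        }
        return out;
    }
}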

3.2 Flush

For SizeTieredCompactionStrategy the merge produces a single sstable, with no size limit on the output. LeveledCompactionStrategy does limit sstable size: when the sstable being written exceeds the limit, a new sstable is created and writing continues there (see the sketch below).
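A minimal sketch of that rollover, mirroring the newSSTableSegmentThresholdReached() check in CompactionTask.execute above; the row-size representation here is a made-up simplification.

import java.util.ArrayList;
import java.util.List;

public class WriterRolloverSketch
{
    /**
     * Append rows to the current output and, once the written size passes
     * maxSSTableSizeBytes (leveled strategy only), close it and start a new sstable.
     * Each long in rowSizes stands in for one serialized row.
     */
    static List<List<Long>> writeWithRollover(List<Long> rowSizes, long maxSSTableSizeBytes)
    {
        List<List<Long>> sstables = new ArrayList<List<Long>>();
        List<Long> current = new ArrayList<Long>();
        long written = 0;
        for (long rowSize : rowSizes)
        {
            current.add(rowSize);
            written += rowSize;
            if (written >= maxSSTableSizeBytes) // threshold reached: roll over to a new sstable
            {
                sstables.add(current);
                current = new ArrayList<Long>();
                written = 0;
            }
        }
        if (!current.isEmpty())
            sstables.add(current);
        return sstables;
    }
}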

4 Summary

A few key classes:

CompactionManager manages the whole compaction process; it ties the other classes together to carry out compaction, including managing each individual compaction task.

CompactionStrategy (there are two concrete implementations) is the entity driving the compaction scheduling and provides the methods for building tasks.

CompactionTask also comes in two flavors; it carries the sstables that need compaction, provides the merge-and-write methods, and is where compaction actually executes.

CompactedRow likewise has two variants; it can be understood as the data structure that holds a merged row.

To fully understand the compaction process, you also need to understand SSTable, SSTableReader, SSTableScanner, and SSTableWriter.
