
Cassandra LCS Compaction Explained

Cassandra's compaction is driven by a periodic task that the CassandraDaemon class schedules during startup.

The scheduled task registered in CassandraDaemon.setup():

ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(ColumnFamilyStore.getBackgroundCompactionTaskSubmitter(), 5, 1, TimeUnit.MINUTES);

As this code shows, Cassandra waits 5 minutes after startup and then submits a background compaction check every minute thereafter.
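For readers unfamiliar with the scheduling call, here is a minimal standalone sketch of the same pattern (using seconds instead of minutes so the effect is quickly visible). Note that scheduleWithFixedDelay starts the next run a fixed delay after the previous run finishes, so a slow compaction check never overlaps itself:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedDelayDemo
{
    public static void main(String[] args)
    {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // initial delay of 5 units, then 1 unit between the end of one run
        // and the start of the next (fixed delay, not fixed rate)
        scheduler.scheduleWithFixedDelay(
            () -> System.out.println("compaction check at " + System.currentTimeMillis()),
            5, 1, TimeUnit.SECONDS);
    }
}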

public static Runnable getBackgroundCompactionTaskSubmitter()
{
    return new Runnable()
    {
        public void run()
        {
            for (Keyspace keyspace : Keyspace.all())
                for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
                    CompactionManager.instance.submitBackground(cfs);
        }
    };
}

As shown above, the task walks every keyspace and submits a background compaction for each of its tables (column family stores).

/**
 * Call this whenever a compaction might be needed on the given columnfamily.
 * It's okay to over-call (within reason) if a call is unnecessary, it will
 * turn into a no-op in the bucketing/candidate-scan phase.
 */

public List<Future<?>> submitBackground(final ColumnFamilyStore cfs)

if (cfs.isAutoCompactionDisabled()) // has auto-compaction been disabled for this table?
{
    logger.trace("Autocompaction is disabled");
    return Collections.emptyList();
}

 /**
  * If this CF is already being compacted and there are no idle threads, we wait and submit
  * its compaction on a later check, once enough threads are free; otherwise we should submit
  * at least one task, so that no single CF can hog the pool for long (CF starvation).
  **/
int count = compactingCF.count(cfs);
if (count > 0 && executor.getActiveCount() >= executor.getMaximumPoolSize())
{ // already compacting, and no free threads left in the pool, so bail out
    logger.trace("Background compaction is still running for {}.{} ({} remaining). Skipping",
                 cfs.keyspace.getName(), cfs.name, count);
    return Collections.emptyList();
}

logger.trace("Scheduling a background task check for {}.{} with {}",
             cfs.keyspace.getName(),
             cfs.name,
             cfs.getCompactionStrategyManager().getName());

List<Future<?>> futures = new ArrayList<>(1);
Future<?> fut = executor.submitIfRunning(new BackgroundCompactionCandidate(cfs), "background task");
// when nothing is already compacting, submit one compaction task so the CF does not starve
if (!fut.isCancelled())
    futures.add(fut);
else
    compactingCF.remove(cfs);
return futures;           

The submitted BackgroundCompactionCandidate does the actual check in its run() method:

public void run()

try
{
    logger.trace("Checking {}.{}", cfs.keyspace.getName(), cfs.name);
    if (!cfs.isValid()) // if the CF has been dropped, it must not be compacted any more
    {
        logger.trace("Aborting compaction for dropped CF");
        return;
    }
    
    //first obtain the table's current compaction strategy
    CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
    //ask the strategy for the next compaction task; this requires the GC cutoff, i.e. the time before which tombstones become eligible for removal
    AbstractCompactionTask task = strategy.getNextBackgroundTask(getDefaultGcBefore(cfs, FBUtilities.nowInSeconds()));
    if (task == null)
    {
        logger.trace("No tasks available");
        return;
    }
    task.execute(metrics);
}
finally
{
    compactingCF.remove(cfs);
}
submitBackground(cfs);           
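A quick note on the gcBefore parameter: it is the tombstone purge cutoff derived from the table's gc_grace_seconds. A minimal sketch of the relationship, assuming the standard gc_grace_seconds semantics (the helper name here is illustrative, not the exact Cassandra source):

// tombstones written before gcBefore may be purged during compaction;
// gc_grace_seconds keeps them around long enough to reach replicas that were down
static int gcBefore(int nowInSeconds, int gcGraceSeconds)
{
    return nowInSeconds - gcGraceSeconds;
}
// e.g. with the default gc_grace_seconds of 864000 (10 days), a tombstone
// becomes purgeable 10 days after it was written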
/**
 * Return the next background task
 *
 * Returns a task for the compaction strategy that needs it the most (most estimated remaining tasks)
 */

public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)

if (!isEnabled())
    return null;

maybeReload(cfs.metadata);

// split the work into two sets: sstables already repaired and those not yet repaired;
// whichever side has the larger estimated backlog of remaining tasks goes first
if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks())
{
    AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore);
    if (repairedTask != null)
        return repairedTask;
    return unrepaired.getNextBackgroundTask(gcBefore);
}
else
{
    AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore);
    if (unrepairedTask != null)
        return unrepairedTask;
    return repaired.getNextBackgroundTask(gcBefore);
}           
Each strategy here is a LeveledCompactionStrategy, whose getNextBackgroundTask loops until it can claim a candidate:

/*
 * the only difference between background and maximal in LCS is that maximal is still allowed
 * (by explicit user request) even when compaction is disabled.
 */

@SuppressWarnings("resource")

while (true)
{
    OperationType op;
    //ask the manifest for a compaction candidate
    LeveledManifest.CompactionCandidate candidate = manifest.getCompactionCandidates();
    if (candidate == null)
    {  // no regular compaction candidate was found
        // so check instead whether any sstable contains enough droppable tombstones to be worth compacting on its own
        SSTableReader sstable = findDroppableSSTable(gcBefore);
        if (sstable == null)
        {
            logger.trace("No compaction necessary for {}", this);
            return null;
        }
        candidate = new LeveledManifest.CompactionCandidate(Collections.singleton(sstable),
                                                            sstable.getSSTableLevel(),
                                                            getMaxSSTableBytes());
        op = OperationType.TOMBSTONE_COMPACTION;
    }
    else
    {
        op = OperationType.COMPACTION;
    }

    LifecycleTransaction txn = cfs.getTracker().tryModify(candidate.sstables, OperationType.COMPACTION);
    if (txn != null)
    {
        // return a leveled compaction task
        LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, txn, candidate.level, gcBefore, candidate.maxSSTableBytes, false);
        newTask.setCompactionType(op);
        return newTask;
    }
}           
/**
 * @return highest-priority sstables to compact, and level to compact them to
 * If no compactions are necessary, will return null
 */

public synchronized CompactionCandidate getCompactionCandidates()

// during bootstrap we only do size tiering in L0 to make sure
// the streamed files can be placed in their original levels
if (StorageService.instance.isBootstrapMode())
{
    List<SSTableReader> mostInteresting = getSSTablesForSTCS(getLevel(0));
    if (!mostInteresting.isEmpty())
    {
        logger.info("Bootstrapping - doing STCS in L0");
        return new CompactionCandidate(mostInteresting, 0, Long.MAX_VALUE);
    }
    return null;
}
// LevelDB gives each level a score (how much data it holds vs. its ideal amount) and
// compacts the highest-scoring level, but this falls apart badly once compaction gets behind.
// For example, suppose L0 holds 988 sstables (ideal: 4),
// L1 holds 117 (ideal: 10),
// and L2 holds 12 (ideal: 100).
// The problem: since L0's score (225) is far above L1's (11), we would compact up to
// MAX_COMPACTION_SIZE of L0 together with all 117 L1 sstables and write the result back to L1;
// the next L0 compaction then has to merge with ~120 L1 sstables all over again.
// L1 thus gets compacted over and over, causing constant IO aimed squarely at L1.
// With that strategy, once L0 compaction falls behind we would have to block writes.
// So we take a different approach:
// 1. compact the higher levels first, which minimizes IO overall, and
// 2. once L0 falls far enough behind, fall back to size-tiered compaction in L0 (at some cost
//    to reads) so it can catch up with the higher levels' scores.
// This is no cure-all: sustained write pressure will still overwhelm it, but it handles
// occasional bursts of writes very well.
for (int i = generations.length - 1; i > 0; i--)
{
    List<SSTableReader> sstables = getLevel(i);
    if (sstables.isEmpty())
        continue; // mostly this just avoids polluting the debug log with zero scores
    // we want to calculate score excluding compacting ones
    Set<SSTableReader> sstablesInLevel = Sets.newHashSet(sstables);
    Set<SSTableReader> remaining = Sets.difference(sstablesInLevel, cfs.getTracker().getCompacting());
    // score = total bytes of the sstables in this level / maximum bytes allowed for this level
    double score = (double) SSTableReader.getTotalBytes(remaining) / (double)maxBytesForLevel(i, maxSSTableSizeInBytes);
    logger.trace("Compaction score for level {} is {}", i, score);

    if (score > 1.001) // a score above 1 means this level holds more data than its allowed maximum
    {
        // before proceeding with a higher level, let's see if L0 is far enough behind to warrant STCS
        CompactionCandidate l0Compaction = getSTCSInL0CompactionCandidate();
        if (l0Compaction != null) // L0 has fallen too far behind, so run STCS there first
            return l0Compaction;

        // L0 is fine, proceed with this level
        Collection<SSTableReader> candidates = getCandidatesFor(i);
        if (!candidates.isEmpty())
        {
            int nextLevel = getNextLevel(candidates);
            // reset the compaction counters, then check for compaction starvation: any starved sstable
            // that overlaps the candidates and is not already compacting gets pulled in as well; this
            // exists because levels holding very little data might otherwise never be compacted
            candidates = getOverlappingStarvedSSTables(nextLevel, candidates);
            if (logger.isTraceEnabled())
                logger.trace("Compaction candidates for L{} are {}", i, toString(candidates));
            return new CompactionCandidate(candidates, nextLevel, cfs.getCompactionStrategyManager().getMaxSSTableBytes());
        }
        else
        {
            logger.trace("No compaction candidates for L{}", i);
        }
    }
}

// Higher levels are happy, time for a standard, non-STCS L0 compaction
if (getLevel(0).isEmpty())
    return null;
Collection<SSTableReader> candidates = getCandidatesFor(0);
if (candidates.isEmpty())  // no L0 candidates were found, so fall back to an STCS compaction in L0
{
    // Since we don't have any other compactions to do, see if there is a STCS compaction to perform in L0; if
    // there is a long running compaction, we want to make sure that we continue to keep the number of SSTables
    // small in L0.
    return getSTCSInL0CompactionCandidate();
}
return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategyManager().getMaxSSTableBytes());           
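To make the scoring arithmetic above concrete, here is a rough worked sketch. The sizing rule (L0 targets 4 sstables' worth of data; level N > 0 targets 10^N times the sstable size) reflects my reading of LeveledManifest.maxBytesForLevel; treat it as an assumption rather than the exact source:

public class LevelScoreDemo
{
    static long maxBytesForLevel(int level, long maxSSTableSizeInBytes)
    {
        if (level == 0)
            return 4L * maxSSTableSizeInBytes;                 // L0 target: ~4 sstables
        return (long) Math.pow(10, level) * maxSSTableSizeInBytes; // LN target: 10^N sstables
    }

    public static void main(String[] args)
    {
        long sstableSize = 160L * 1024 * 1024; // default 160MB sstables
        // the pathological example from the comment: 988 sstables in L0, 117 in L1, 12 in L2
        long[] sstableCount = {988, 117, 12};
        for (int level = 0; level < sstableCount.length; level++)
        {
            double score = (double) (sstableCount[level] * sstableSize)
                         / maxBytesForLevel(level, sstableSize);
            System.out.printf("L%d score = %.1f%n", level, score);
        }
        // prints roughly: L0 = 247.0, L1 = 11.7, L2 = 0.1 (magnitudes matching the
        // comment above), which is why Cassandra scans from the highest level down
        // and lets L0 fall back to STCS instead of always chasing the highest score.
    }
}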
/**
 * @return highest-priority sstables to compact for the given level.
 * If no compactions are possible (because of concurrent compactions or because some sstables are blacklisted
 * for prior failure), will return an empty list. Never returns null.
 */

private Collection<SSTableReader> getCandidatesFor(int level)

assert !getLevel(level).isEmpty();
logger.trace("Choosing candidates for L{}", level);

final Set<SSTableReader> compacting = cfs.getTracker().getCompacting();

if (level == 0) // L0 has its own special candidate-selection logic
{
    // first, collect the L0 sstables that are already being compacted
    Set<SSTableReader> compactingL0 = getCompacting(0);

    // next, find the smallest and largest partition keys
    // covered by those compacting L0 sstables
    PartitionPosition lastCompactingKey = null;
    PartitionPosition firstCompactingKey = null;
    for (SSTableReader candidate : compactingL0)
    {
        if (firstCompactingKey == null || candidate.first.compareTo(firstCompactingKey) < 0)
            firstCompactingKey = candidate.first;
        if (lastCompactingKey == null || candidate.last.compareTo(lastCompactingKey) > 0)
            lastCompactingKey = candidate.last;
    }

    // L0 is the dumping ground for new sstables which thus may overlap each other.
    //
    // We treat L0 compactions specially:
    // 1a. add sstables to the candidate set until we have at least maxSSTableSizeInMB
    // 1b. prefer choosing older sstables as candidates, to newer ones
    // 1c. any L0 sstables that overlap a candidate, will also become candidates
    // 2. At most MAX_COMPACTING_L0 sstables from L0 will be compacted at once
    // 3. If total candidate size is less than maxSSTableSizeInMB, we won't bother compacting with L1,
    //    and the result of the compaction will stay in L0 instead of being promoted (see promote())
    //
    // Note that we ignore suspect-ness of L1 sstables here, since if an L1 sstable is suspect we're
    // basically screwed, since we expect all or most L0 sstables to overlap with each L1 sstable.
    // So if an L1 sstable is suspect we can't do much besides try anyway and hope for the best.
    Set<SSTableReader> candidates = new HashSet<>();
    Set<SSTableReader> remaining = new HashSet<>();
    //start with every L0 sstable that is not marked suspect
    Iterables.addAll(remaining, Iterables.filter(getLevel(0), Predicates.not(suspectP)));
    // walk the remaining sstables in age order, oldest first
    for (SSTableReader sstable : ageSortedSSTables(remaining))
    {
        // skip sstables that are already candidates
        if (candidates.contains(sstable))
            continue;

        //any remaining sstable that overlaps the current one also becomes a candidate.
        // "Overlap" here means the sstables' [first, last] token ranges intersect;
        // token-range overlap is taken as a proxy for content overlap
        // (a minimal sketch of this range test follows after this method)
        Sets.SetView<SSTableReader> overlappedL0 = Sets.union(Collections.singleton(sstable), overlapping(sstable, remaining));
        if (!Sets.intersection(overlappedL0, compactingL0).isEmpty())
            continue;  // if the sstable plus everything overlapping it intersects the compacting set, skip it
        // overlappedL0 contains no compacting sstables; each one still becomes a candidate
        // only if its token range does not intersect the span currently compacting in L0
        // (the [firstCompactingKey, lastCompactingKey] range computed above)
        for (SSTableReader newCandidate : overlappedL0)
        {
            if (firstCompactingKey == null || lastCompactingKey == null || overlapping(firstCompactingKey.getToken(), lastCompactingKey.getToken(), Arrays.asList(newCandidate)).size() == 0)
                candidates.add(newCandidate);
            remaining.remove(newCandidate); // never reconsider this sstable: it either overlaps
            // the compacting set or has already joined the candidates, so it can safely be
            // dropped from the remaining set
        }

        //once there are more than MAX_COMPACTING_L0 candidates, keep only the oldest MAX_COMPACTING_L0 of them
        if (candidates.size() > MAX_COMPACTING_L0)
        {
            // limit to only the MAX_COMPACTING_L0 oldest candidates
            candidates = new HashSet<>(ageSortedSSTables(candidates).subList(0, MAX_COMPACTING_L0));
            break;
        }
    }
    
    // if the candidates add up to more than one full sstable (maxSSTableSizeInBytes), the L1
    // sstables that overlap them must be brought into the compaction as well;
    // leave everything in L0 if we didn't end up with a full sstable's worth of data
    if (SSTableReader.getTotalBytes(candidates) > maxSSTableSizeInBytes)
    {
        // add sstables from L1 that overlap candidates
        // if the overlapping ones are already busy in a compaction, leave it out.
        // TODO try to find a set of L0 sstables that only overlaps with non-busy L1 sstables
        // the L1 sstables that overlap the candidates' overall token range
        Set<SSTableReader> l1overlapping = overlapping(candidates, getLevel(1));
        // if those overlapping L1 sstables are already being compacted, give up on this L0 compaction
        if (Sets.intersection(l1overlapping, compacting).size() > 0)
            return Collections.emptyList();
        // likewise, if the candidates overlap sstables currently compacting in L0, give up on this L0 compaction
        if (!overlapping(candidates, compactingL0).isEmpty())
            return Collections.emptyList();
        candidates = Sets.union(candidates, l1overlapping);
    }
    if (candidates.size() < 2)
        return Collections.emptyList();
    else
        return candidates;
}

// for non-L0 compactions, pick up where we left off last time
Collections.sort(getLevel(level), SSTableReader.sstableComparator);
int start = 0; // handles case where the prior compaction touched the very last range
for (int i = 0; i < getLevel(level).size(); i++)
{
    SSTableReader sstable = getLevel(level).get(i);
    if (sstable.first.compareTo(lastCompactedKeys[level]) > 0)
    {
        start = i;
        break;
    }
}

// look for a non-suspect keyspace to compact with, starting with where we left off last time,
// and wrapping back to the beginning of the generation if necessary
for (int i = 0; i < getLevel(level).size(); i++)
{
    SSTableReader sstable = getLevel(level).get((start + i) % getLevel(level).size());
    Set<SSTableReader> candidates = Sets.union(Collections.singleton(sstable), overlapping(sstable, getLevel(level + 1)));
    if (Iterables.any(candidates, suspectP))
        continue;
    if (Sets.intersection(candidates, compacting).isEmpty())
        return candidates;
}

// all the sstables were suspect or overlapped with something suspect
return Collections.emptyList();           
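The overlap test used throughout boils down to interval intersection on token ranges. A minimal sketch of that idea, simplified to longs (the real code compares Token/PartitionPosition objects):

// two sstables overlap when their [first, last] token ranges intersect
static boolean overlaps(long first1, long last1, long first2, long last2)
{
    return first1 <= last2 && first2 <= last1;
}
// e.g. an sstable covering tokens [10, 50] overlaps one covering [40, 90],
// but not one covering [60, 90]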

private CompactionCandidate getSTCSInL0CompactionCandidate()

if (!DatabaseDescriptor.getDisableSTCSInL0() && getLevel(0).size() > MAX_COMPACTING_L0)
{
    List<SSTableReader> mostInteresting = getSSTablesForSTCS(getLevel(0));
    if (!mostInteresting.isEmpty())
    {
        logger.debug("L0 is too far behind, performing size-tiering there first");
        return new CompactionCandidate(mostInteresting, 0, Long.MAX_VALUE);
    }
}

return null;           
In other words: if STCS in L0 has not been disabled and L0 holds more than MAX_COMPACTING_L0 sstables, an STCS compaction is started in L0.

STCS groups sstables by size roughly as follows: take the average size of the sstables in a bucket; any sstable whose size falls within about 0.5x below to 1.5x above that average joins the bucket, the average is recomputed, and the process repeats, so that sstables of similar size end up bucketed together.

Among the resulting buckets, the hottest one (the bucket whose sstables are read the most) is returned for compaction, because compacting the most frequently read sstables yields the biggest improvement in read performance.
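Here is a simplified sketch of that bucketing idea. It is not the exact SizeTieredCompactionStrategy.getBuckets implementation (which also has a minSSTableSize floor that lumps all tiny sstables together), just an illustration of the running-average grouping under the default bucketLow = 0.5 and bucketHigh = 1.5:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class BucketingDemo
{
    static List<List<Long>> buckets(List<Long> sizes, double bucketLow, double bucketHigh)
    {
        List<List<Long>> buckets = new ArrayList<>();
        List<Long> sorted = new ArrayList<>(sizes);
        sorted.sort(Comparator.naturalOrder());

        List<Long> current = new ArrayList<>();
        double avg = 0;
        for (long size : sorted)
        {
            // size no longer "close enough" to the bucket average: start a new bucket
            if (!current.isEmpty() && (size < avg * bucketLow || size > avg * bucketHigh))
            {
                buckets.add(current);
                current = new ArrayList<>();
                avg = 0;
            }
            current.add(size);
            avg = avg + (size - avg) / current.size(); // recompute the running average
        }
        buckets.add(current);
        return buckets;
    }

    public static void main(String[] args)
    {
        // sstables of 10, 11, 12, 50, 55 and 400 (MB): three buckets emerge
        System.out.println(buckets(List.of(10L, 11L, 12L, 50L, 55L, 400L), 0.5, 1.5));
        // -> [[10, 11, 12], [50, 55], [400]]
    }
}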

private List<SSTableReader> getSSTablesForSTCS(Collection<SSTableReader> sstables)

Iterable<SSTableReader> candidates = cfs.getTracker().getUncompacting(sstables);
List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(AbstractCompactionStrategy.filterSuspectSSTables(candidates));
List<List<SSTableReader>> buckets = SizeTieredCompactionStrategy.getBuckets(pairs,
                                                                            options.bucketHigh,
                                                                            options.bucketLow,
                                                                            options.minSSTableSize);
return SizeTieredCompactionStrategy.mostInterestingBucket(buckets, 4, 32);