Compaction Code Walkthrough
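This walkthrough of compaction is split into the following parts:
1. Management of the compaction threads
2. Selection of the candidate sstables for compaction
3. Merging of the sstables during compaction
Parts 1 and 3 are fairly fixed and probably need no optimization; it is enough to understand them.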
1 Management of the compaction threads
1.1 Starting compaction
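After Cassandra starts, a ColumnFamilyStore (cfs) is created at the end of opening each table. The cfs constructor builds the object for whichever compaction strategy is configured. Let us look at the constructors of the two concrete compaction strategies.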
public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String,String> options)
{
super(cfs, options);
this.estimatedRemainingTasks = 0;
String optionValue = options.get(MIN_SSTABLE_SIZE_KEY);
minSSTableSize = (null != optionValue) ? Long.parseLong(optionValue): DEFAULT_MIN_SSTABLE_SIZE;
cfs.setMaximumCompactionThreshold(cfs.metadata.getMaxCompactionThreshold());
cfs.setMinimumCompactionThreshold(cfs.metadata.getMinCompactionThreshold());
}
public LeveledCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
{
super(cfs, options);
int configuredMaxSSTableSize = 5;
if (options != null)
{
String value = options.containsKey(SSTABLE_SIZE_OPTION)? options.get(SSTABLE_SIZE_OPTION): null;
if (null != value)
{
try
{
configuredMaxSSTableSize = Integer.parseInt(value);
}
catch (NumberFormatException ex)
{
logger.warn(String.format("%s is not a parsable int (base10) for %s using default value",
value, SSTABLE_SIZE_OPTION));
}
}
}
maxSSTableSizeInMB = configuredMaxSSTableSize;
cfs.getDataTracker().subscribe(this);
logger.debug("{} subscribed to the data tracker.",this);
manifest = LeveledManifest.create(cfs, this.maxSSTableSizeInMB);
logger.debug("Created {}", manifest);
// override min/max for this strategy
cfs.setMaximumCompactionThreshold(Integer.MAX_VALUE);
cfs.setMinimumCompactionThreshold(1);
}
The key part is the base-class constructor below. It is simple enough that it needs little explanation.
protected AbstractCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
{
assert cfs != null;
this.cfs = cfs;
this.options = options;
// start compactions in five minutes (if no flushes have occurred by then to do so)
Runnable runnable = new Runnable()
{
public void run()
{
if (CompactionManager.instance.getActiveCompactions()== 0)
{
CompactionManager.instance.submitBackground(AbstractCompactionStrategy.this.cfs);
}
}
};
StorageService.optionalTasks.schedule(runnable, 5 * 60, TimeUnit.SECONDS); // optionalTasks is a scheduled thread pool
}
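From these three snippets we can see that, once Cassandra starts, every compaction strategy schedules a background compaction check. If no flush has triggered a compaction within five minutes, the scheduled check starts one.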
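The delayed start can be reproduced outside Cassandra with a plain ScheduledExecutorService. The sketch below is only an illustration: the class DelayedCompactionStart, its two counters, and the 50 ms delay are hypothetical stand-ins for CompactionManager.getActiveCompactions(), submitBackground(), and the five-minute delay.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the delayed start; none of these names are Cassandra's. */
final class DelayedCompactionStart
{
    // Plays the role of CompactionManager.instance.getActiveCompactions().
    final AtomicInteger activeCompactions = new AtomicInteger();
    // Counts how many background checks the delayed task submitted.
    final AtomicInteger submitted = new AtomicInteger();

    /** Schedules a one-shot check that submits work only if nothing is compacting when it fires. */
    void scheduleInitialCheck(ScheduledExecutorService scheduler, long delay, TimeUnit unit)
    {
        scheduler.schedule(() -> {
            if (activeCompactions.get() == 0)
                submitted.incrementAndGet();
        }, delay, unit);
    }

    /** Runs one scenario and returns the number of submitted checks. */
    static int run(int alreadyActive)
    {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        DelayedCompactionStart start = new DelayedCompactionStart();
        start.activeCompactions.set(alreadyActive);
        start.scheduleInitialCheck(scheduler, 50, TimeUnit.MILLISECONDS);
        // By default, delayed tasks that are already scheduled still run after shutdown().
        scheduler.shutdown();
        try
        {
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return start.submitted.get();
    }

    public static void main(String[] args)
    {
        System.out.println("idle node submits: " + run(0));
        System.out.println("busy node submits: " + run(2));
    }
}
```

The check is a no-op when a compaction is already running, which matches the guard in the constructor above.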
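CompactionManager manages the threads that run compactions. Compaction is generally single-threaded (per column family), and the manager deals with synchronization, locks, and the thread pool.
The main code is shown below: CompactionManager.instance.submitBackground(AbstractCompactionStrategy.this.cfs);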
/**
* Call this whenever a compaction might be needed on the given columnfamily.
* It's okay to over-call (within reason) since the compactions are single-threaded,
* and if a call is unnecessary, it will just be no-oped in the bucketing phase.
*/
public Future<Integer> submitBackground(final ColumnFamilyStore cfs)
{
logger.debug("Scheduling a background task check for {}.{} with {}",
new Object[] {cfs.table.name,
cfs.columnFamily,
cfs.getCompactionStrategy().getClass().getSimpleName()});
Callable<Integer> callable = new Callable<Integer>()
{
public Integer call() throws IOException
{
compactionLock.readLock().lock();
try
{
logger.debug("Checking {}.{}", cfs.table.name, cfs.columnFamily); // log after we get the lock so we can see delays from that if any
if (!cfs.isValid())
{
logger.debug("Aborting compaction for dropped CF");
return 0;
}
boolean taskExecuted = false;
AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
List<AbstractCompactionTask> tasks = strategy.getBackgroundTasks(getDefaultGcBefore(cfs)); // selects the candidate sstables for compaction; not explained here
logger.debug("{} minor compaction tasks available", tasks.size());
for (AbstractCompactionTask task : tasks)
{
if (!task.markSSTablesForCompaction()) // marking step
{
logger.debug("Skipping {}; sstables are busy", task);
continue;
}
taskExecuted = true;
try
{
task.execute(executor); // the main merge step
}
finally
{
task.unmarkSSTables();
}
}
// newly created sstables might have made other compactions eligible
if (taskExecuted)
submitBackground(cfs);
}
finally
{
compactionLock.readLock().unlock();
}
return 0;
}
};
return executor.submit(callable);
}
As the code above shows, the compaction process is single-threaded.
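The "check, run, resubmit" loop is easier to see in isolation. The following sketch assumes a single-threaded executor and reduces the work to a counter. BackgroundLoop, pendingTasks, and checksNeeded are hypothetical names, and the read lock only mirrors the shape of the original code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical model of submitBackground(): one worker thread, resubmit while work was done. */
final class BackgroundLoop
{
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final ReentrantReadWriteLock compactionLock = new ReentrantReadWriteLock();
    private final AtomicInteger pendingTasks;               // stands in for getBackgroundTasks()
    private final CountDownLatch idle = new CountDownLatch(1);
    final AtomicInteger checks = new AtomicInteger();        // how many times the callable ran

    BackgroundLoop(int pending)
    {
        this.pendingTasks = new AtomicInteger(pending);
    }

    Future<Integer> submitBackground()
    {
        return executor.submit(() -> {
            compactionLock.readLock().lock();
            try
            {
                checks.incrementAndGet();
                boolean taskExecuted = false;
                if (pendingTasks.get() > 0)
                {
                    pendingTasks.decrementAndGet();          // "execute" one task
                    taskExecuted = true;
                }
                // New output might have made other work eligible, so check again.
                if (taskExecuted)
                    submitBackground();
                else
                    idle.countDown();                        // an unnecessary call is just a no-op
            }
            finally
            {
                compactionLock.readLock().unlock();
            }
            return 0;
        });
    }

    /** Runs the loop until it goes idle and returns the number of checks performed. */
    static int checksNeeded(int pending)
    {
        BackgroundLoop loop = new BackgroundLoop(pending);
        loop.submitBackground();
        try
        {
            loop.idle.await(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        loop.executor.shutdown();
        return loop.checks.get();
    }

    public static void main(String[] args)
    {
        System.out.println(checksNeeded(3));
    }
}
```

Because the executor has a single worker, the resubmitted check only runs after the current one returns, so over-calling costs little beyond an extra pass through the queue.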
2 Selection of the candidate sstables for compaction
2.1 SizeTieredCompactionStrategy
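Because this strategy has min/maxCompactionThreshold, a group of similar-sized sstables does not form a task when it contains fewer than min sstables, and a single task never contains more than max sstables.
The most important part of this strategy is how it finds all the lists of similar-sized sstables.
See the code below: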
/*
* Group files of similar size into buckets.
*/
static <T> List<List<T>> getBuckets(Collection<Pair<T, Long>> files, long minSSTableSize)
{
    // Sort the list in order to get deterministic results during the grouping below
    List<Pair<T, Long>> sortedFiles = new ArrayList<Pair<T, Long>>(files);
    Collections.sort(sortedFiles, new Comparator<Pair<T, Long>>()
    {
        public int compare(Pair<T, Long> p1, Pair<T, Long> p2)
        {
            return p1.right.compareTo(p2.right);
        }
    });
    Map<Long, List<T>> buckets = new HashMap<Long, List<T>>();
    outer:
    for (Pair<T, Long> pair : sortedFiles)
    {
        long size = pair.right;
        // look for a bucket containing similar-sized files:
        // group in the same bucket if it's w/in 50% of the average for this bucket,
        // or this file and the bucket are all considered "small" (less than `minSSTableSize`)
        // Reader's note: the files are already sorted, so why scan every bucket? It looks
        // inefficient, but this is not where the time goes.
        for (Entry<Long, List<T>> entry : buckets.entrySet())
        {
            List<T> bucket = entry.getValue();
            long oldAverageSize = entry.getKey();
            // the key test: decides whether the sizes are similar
            if ((size > (oldAverageSize / 2) && size < (3 * oldAverageSize) / 2)
                || (size < minSSTableSize && oldAverageSize < minSSTableSize))
            {
                // remove and re-add under the new average size
                buckets.remove(oldAverageSize);
                long totalSize = bucket.size() * oldAverageSize;
                long newAverageSize = (totalSize + size) / (bucket.size() + 1);
                bucket.add(pair.left);
                buckets.put(newAverageSize, bucket);
                continue outer;
            }
        }
        // no similar bucket found; put it in a new one
        ArrayList<T> bucket = new ArrayList<T>();
        bucket.add(pair.left);
        buckets.put(size, bucket);
    }
    return new ArrayList<List<T>>(buckets.values());
}
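The most important job of the code above is to find the lists of similar-sized sstables. The result is wrapped in the data structure List<List<SSTableReader>> buckets. At most one task is built per list, and new tasks are built only after all existing tasks have finished executing.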
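To see the bucketing on concrete numbers, here is a reduced sketch that works directly on file sizes. SizeBuckets and its compactable() helper are hypothetical. The similarity test is the one from getBuckets, while the min/max handling is only an assumption about how the thresholds described earlier are applied.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, size-only version of the bucketing step. */
final class SizeBuckets
{
    /** Groups sizes into buckets keyed by the running average size of each bucket. */
    static List<List<Long>> getBuckets(List<Long> sizes, long minSSTableSize)
    {
        List<Long> sorted = new ArrayList<>(sizes);
        Collections.sort(sorted);
        // LinkedHashMap (unlike the HashMap in the original) keeps the output order predictable.
        Map<Long, List<Long>> buckets = new LinkedHashMap<>();
        outer:
        for (long size : sorted)
        {
            for (Map.Entry<Long, List<Long>> entry : buckets.entrySet())
            {
                long average = entry.getKey();
                List<Long> bucket = entry.getValue();
                boolean similar = size > average / 2 && size < 3 * average / 2;
                boolean bothSmall = size < minSSTableSize && average < minSSTableSize;
                if (similar || bothSmall)
                {
                    // Re-key the bucket under its new average; safe because we leave the loop at once.
                    buckets.remove(average);
                    long total = bucket.size() * average;
                    bucket.add(size);
                    buckets.put((total + size) / bucket.size(), bucket);
                    continue outer;
                }
            }
            List<Long> bucket = new ArrayList<>();
            bucket.add(size);
            buckets.put(size, bucket);
        }
        return new ArrayList<>(buckets.values());
    }

    /** Assumed threshold handling: skip buckets below min, cap each task at max entries. */
    static List<List<Long>> compactable(List<List<Long>> buckets, int minThreshold, int maxThreshold)
    {
        List<List<Long>> tasks = new ArrayList<>();
        for (List<Long> bucket : buckets)
        {
            if (bucket.size() < minThreshold)
                continue;
            tasks.add(new ArrayList<>(bucket.subList(0, Math.min(bucket.size(), maxThreshold))));
        }
        return tasks;
    }

    public static void main(String[] args)
    {
        List<List<Long>> buckets = getBuckets(Arrays.asList(10L, 12L, 11L, 100L, 110L, 1000L), 5);
        System.out.println(buckets);                    // [[10, 11, 12], [100, 110], [1000]]
        System.out.println(compactable(buckets, 2, 2)); // [[10, 11], [100, 110]]
    }
}
```

The lone 1000-byte file stays in its own bucket and produces no task until enough files of a similar size appear.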
2.2 LeveledCompactionStrategy
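Unlike the previous strategy, this one has no thresholds. All sstables have a similar size and, most importantly, the key ranges of the sstables within one level do not overlap (L0 excepted). Take the general case first (everything except L0). When sstables are compacted from Li into Li+1, the level Li is chosen by scanning from the highest level down to the lowest, as described below. Within Li, the sstable is picked using lastCompactedKeys[i], through the test (sstable.first.compareTo(lastCompactedKeys[level]) > 0). The sstables from Li+1 that take part in the compaction are picked by:
return overlapping(sstable, generations[(level + 1)]);
So only one sstable from Li takes part, together with every sstable in Li+1 whose key range overlaps it. An L0-to-L1 compaction differs in that all L0 sstables with overlapping keys must take part as well (although this is not strictly necessary).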
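The selection rule can be tried out on integer key ranges. Everything in this sketch is hypothetical (the Range type, the sample levels, and in particular the fallback to the first sstable when no sstable starts after lastCompactedKey). It only shows the shape of "one sstable from Li plus everything it overlaps in Li+1".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of picking one sstable in Li and its overlapping sstables in Li+1. */
final class LevelOverlap
{
    /** An sstable reduced to a name and its inclusive first/last key. */
    static final class Range
    {
        final String name;
        final int first;
        final int last;

        Range(String name, int first, int last)
        {
            this.name = name;
            this.first = first;
            this.last = last;
        }

        boolean overlaps(Range other)
        {
            return first <= other.last && other.first <= last;
        }
    }

    /** First sstable whose first key is past lastCompactedKey; falls back to the first one (assumption). */
    static Range nextCandidate(List<Range> levelSortedByFirstKey, int lastCompactedKey)
    {
        for (Range r : levelSortedByFirstKey)
            if (r.first > lastCompactedKey)
                return r;
        return levelSortedByFirstKey.get(0);
    }

    /** Names of all sstables in the next level whose key range overlaps the candidate. */
    static List<String> overlapping(Range candidate, List<Range> nextLevel)
    {
        List<String> names = new ArrayList<>();
        for (Range r : nextLevel)
            if (r.overlaps(candidate))
                names.add(r.name);
        return names;
    }

    /** Returns the picked L1 sstable followed by the L2 sstables it drags into the compaction. */
    static List<String> demo(int lastCompactedKey)
    {
        List<Range> l1 = Arrays.asList(new Range("a", 0, 9), new Range("b", 10, 19), new Range("c", 20, 29));
        List<Range> l2 = Arrays.asList(new Range("x", 0, 4), new Range("y", 5, 12),
                                       new Range("z", 13, 25), new Range("w", 26, 40));
        Range picked = nextCandidate(l1, lastCompactedKey);
        List<String> result = new ArrayList<>();
        result.add(picked.name);
        result.addAll(overlapping(picked, l2));
        return result;
    }

    public static void main(String[] args)
    {
        System.out.println(demo(9));  // [b, y, z]
        System.out.println(demo(25)); // [a, x, y]
    }
}
```

Remembering the last compacted key makes successive compactions move through the key space of a level instead of repeatedly rewriting the same range.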
public synchronized Collection<SSTableReader> getCompactionCandidates()
{
    // LevelDB gives each level a score of how much data it contains vs its ideal amount, and
    // compacts the level with the highest score. But this falls apart spectacularly once you
    // get behind. Consider this set of levels:
    // L0: 988 [ideal: 4]
    // L1: 117 [ideal: 10]
    // L2: 12 [ideal: 100]
    //
    // The problem is that L0 has a much higher score (almost 250) than L1 (11), so what we'll
    // do is compact a batch of MAX_COMPACTING_L0 sstables with all 117 L1 sstables, and put the
    // result (say, 120 sstables) in L1. Then we'll compact the next batch of MAX_COMPACTING_L0,
    // and so forth. So we spend most of our i/o rewriting the L1 data with each batch.
    //
    // If we could just do *all* L0 a single time with L1, that would be ideal. But we can't
    // -- see the javadoc for MAX_COMPACTING_L0.
    //
    // LevelDB's way around this is to simply block writes if L0 compaction falls behind.
    // We don't have that luxury.
    //
    // So instead, we force compacting higher levels first. This may not minimize the number
    // of reads done as quickly in the short term, but it minimizes the i/o needed to compact
    // optimally which gives us a long term win.
    for (int i = generations.length - 1; i >= 0; i--)
    {
        List<SSTableReader> sstables = generations[i];
        if (sstables.isEmpty())
            continue; // mostly this just avoids polluting the debug log with zero scores
        double score = (double) SSTableReader.getTotalBytes(sstables) / (double) maxBytesForLevel(i);
        logger.debug("Compaction score for level {} is {}", i, score);
        // L0 gets a special case that if we don't have anything more important to do,
        // we'll go ahead and compact even just one sstable
        if (score > 1.001 || i == 0)
        {
            Collection<SSTableReader> candidates = getCandidatesFor(i);
            if (logger.isDebugEnabled())
                logger.debug("Compaction candidates for L{} are {}", i, toString(candidates));
            return candidates;
        }
    }
    return Collections.emptyList();
}
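The code above decides which level compaction starts from. The comments in the code explain it clearly, so it is not repeated here.
This strategy builds only one task at a time.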
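The level choice can be checked against the numbers in the code comment. This sketch assumes that all sstables have the same size, so the byte ratio becomes a ratio of sstable counts, with the ideal counts 4, 10, 100 taken from that comment. LevelScores and pickLevel are hypothetical names.

```java
/** Hypothetical count-based version of the level scoring loop. */
final class LevelScores
{
    /** Ideal sstable count per level, following the comment: 4 for L0, then 10, 100, ... */
    static long idealCount(int level)
    {
        if (level == 0)
            return 4;
        long ideal = 1;
        for (int i = 0; i < level; i++)
            ideal *= 10;
        return ideal;
    }

    /** Scans from the highest level down and returns the first level worth compacting, or -1. */
    static int pickLevel(int[] sstableCounts)
    {
        for (int i = sstableCounts.length - 1; i >= 0; i--)
        {
            if (sstableCounts[i] == 0)
                continue;
            double score = (double) sstableCounts[i] / (double) idealCount(i);
            // L0 is compacted even below its ideal size when no higher level needs work.
            if (score > 1.001 || i == 0)
                return i;
        }
        return -1;
    }

    public static void main(String[] args)
    {
        System.out.println(pickLevel(new int[] { 988, 117, 12 })); // 1
        System.out.println(pickLevel(new int[] { 988, 5, 12 }));   // 0
        System.out.println(pickLevel(new int[] { 0, 0, 0 }));      // -1
    }
}
```

With the counts from the comment, L0 has by far the highest score (247), yet L1 is picked because the scan starts at the top and L1 (11.7) already exceeds the threshold.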
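3 Merging the sstables to compact
This section relies heavily on generic (template) classes and inheritance, which makes the merge process hard to follow. At a minimum you should understand the overall flow.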
/**
* For internal use and testing only. The rest of the system should go through the submit* methods,
* which are properly serialized.
* Caller is in charge of marking/unmarking the sstables as compacting.
*/
public int execute(CompactionExecutorStatsCollector collector) throws IOException
{
// The collection of sstables passed may be empty (but not null); even if
// it is not empty, it may compact down to nothing if all rows are deleted.
assert sstables != null;
Set<SSTableReader> toCompact = new HashSet<SSTableReader>(sstables);
if (!isCompactionInteresting(toCompact))
return 0;
if (compactionFileLocation == null)
compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
if (compactionFileLocation == null && partialCompactionsAcceptable())
{
// If the compaction file path is null that means we have no space left for this compaction.
// Try again w/o the largest one.
while (compactionFileLocation == null && toCompact.size() > 1)
{
logger.warn("insufficient space to compact all requested files " + StringUtils.join(toCompact, ", "));
// Note that we have removed files that are still marked as compacting.
// This suboptimal but ok since the caller will unmark all the sstables at the end.
toCompact.remove(cfs.getMaxSizeFile(toCompact));
compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
}
}
if (compactionFileLocation == null)
{
logger.warn("insufficient space to compact; aborting compaction");
return 0;
}
if (DatabaseDescriptor.isSnapshotBeforeCompaction())
cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-" + "compact-" + cfs.columnFamily);
// sanity check: all sstables must belong to the same cfs
for (SSTableReader sstable : toCompact)
assert sstable.descriptor.cfname.equals(cfs.columnFamily);
CompactionController controller = new CompactionController(cfs, toCompact, gcBefore, isUserDefined);
// new sstables from flush can be added during a compaction, but only the compaction can remove them,
// so in our single-threaded compaction world this is a valid way of determining if we're compacting
// all the sstables (that existed when we started)
logger.info("Compacting {}", toCompact);
long startTime = System.currentTimeMillis();
long totalkeysWritten = 0;
long estimatedTotalKeys = Math.max(DatabaseDescriptor.getIndexInterval(), SSTableReader.getApproximateKeyCount(toCompact));
long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(toCompact) / cfs.getCompactionStrategy().getMaxSSTableSize());
long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
if (logger.isDebugEnabled())
logger.debug("Expected bloom filter size : " + keysPerSSTable);
AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
? new ParallelCompactionIterable(compactionType, toCompact, controller)
: new CompactionIterable(compactionType, toCompact, controller); // look at the single-threaded case first
CloseableIterator<AbstractCompactedRow> iter = ci.iterator();//ManyToOne<in=sstablescanner out=AbstractCompactedRow>
Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());
Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
// we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
// replace the old entries. Track entries to preheat here until then.
Map<SSTableReader, Map<DecoratedKey, Long>> cachedKeyMap = new HashMap<SSTableReader, Map<DecoratedKey, Long>>();
Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();
if (collector != null)
collector.beginCompaction(ci);
try
{
if (!nni.hasNext())
{
// don't mark compacted in the finally block, since if there _is_ nondeleted data,
// we need to sync it (via closeAndOpen) first, so there is no period during which
// a crash could cause data loss.
cfs.markCompacted(toCompact, compactionType);
return 0;
}
SSTableWriter writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
writers.add(writer);
while (nni.hasNext())
{
AbstractCompactedRow row = nni.next();
if (row.isEmpty())
continue;
long position = writer.append(row);
totalkeysWritten++;
if (DatabaseDescriptor.getPreheatKeyCache())
{
for (SSTableReader sstable : toCompact)
{
if (sstable.getCachedPosition(row.key, false) != null)
{
cachedKeys.put(row.key, position);
break;
}
}
}
if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer))
{
SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact));
cachedKeyMap.put(toIndex, cachedKeys);
sstables.add(toIndex);
if (nni.hasNext())
{
writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
writers.add(writer);
cachedKeys = new HashMap<DecoratedKey, Long>();
}
}
}
}
catch (Exception e)
{
for (SSTableWriter writer : writers)
writer.abort();
throw FBUtilities.unchecked(e);
}
finally
{
iter.close();
if (collector != null)
collector.finishCompaction(ci);
}
cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);
// TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
for (Entry<SSTableReader, Map<DecoratedKey, Long>> ssTableReaderMapEntry : cachedKeyMap.entrySet())
{
SSTableReader key = ssTableReaderMapEntry.getKey();
for (Entry<DecoratedKey, Long> entry : ssTableReaderMapEntry.getValue().entrySet())
key.cacheKey(entry.getKey(), entry.getValue());
}
long dTime = System.currentTimeMillis() - startTime;
long startsize = SSTable.getTotalBytes(toCompact);
long endsize = SSTable.getTotalBytes(sstables);
double ratio = (double)endsize / (double)startsize;
StringBuilder builder = new StringBuilder();
builder.append("[");
for (SSTableReader reader : sstables)
builder.append(reader.getFilename()).append(",");
builder.append("]");
double mbps = dTime > 0 ? (double)endsize/(1024*1024)/((double)dTime/1000) : 0;
logger.info(String.format("Compacted to %s. %,d to %,d (~%d%% of original) bytes for %,d keys at %fMB/s. Time: %,dms.",
builder.toString(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, mbps, dTime));
logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
return toCompact.size();
}
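This code comes from CompactionTask. As described in section 2, a task mainly holds the sstables that are to be compacted. Three points in this code matter most: 1. choosing the directory; 2. the merge process as a whole; 3. writing the merged result.
The directory chosen is the data directory with the most free space. This is simple, so it is not covered separately.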
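The directory choice and the "retry without the largest file" loop from execute() can be sketched with plain numbers. CompactionLocation is hypothetical, free space is passed in as a map rather than read from disk, and taking the expected output size to be the sum of the input sizes is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of choosing a compaction directory. */
final class CompactionLocation
{
    /** Returns the directory with the most free space, or null if even that one is too small. */
    static String pickDirectory(Map<String, Long> freeBytesByDir, long expectedBytes)
    {
        String best = null;
        long bestFree = -1;
        for (Map.Entry<String, Long> e : freeBytesByDir.entrySet())
        {
            if (e.getValue() > bestFree)
            {
                best = e.getKey();
                bestFree = e.getValue();
            }
        }
        return bestFree >= expectedBytes ? best : null;
    }

    /** Assumed size estimate: the compacted output is as large as all inputs together. */
    static long expectedSize(List<Long> fileSizes)
    {
        long total = 0;
        for (long size : fileSizes)
            total += size;
        return total;
    }

    /** Drops the largest file until the rest fits somewhere, always keeping at least one file. */
    static List<Long> shrinkToFit(List<Long> fileSizes, Map<String, Long> freeBytesByDir)
    {
        List<Long> toCompact = new ArrayList<>(fileSizes);
        while (pickDirectory(freeBytesByDir, expectedSize(toCompact)) == null && toCompact.size() > 1)
            toCompact.remove(Collections.max(toCompact));
        return toCompact;
    }

    public static void main(String[] args)
    {
        Map<String, Long> dirs = new HashMap<>();
        dirs.put("/d1", 100L);
        dirs.put("/d2", 300L);
        System.out.println(pickDirectory(dirs, 250L));
        System.out.println(shrinkToFit(Arrays.asList(200L, 150L, 50L), dirs));
    }
}
```

If even a single remaining file does not fit, pickDirectory still returns null, which corresponds to the "insufficient space to compact; aborting compaction" branch.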
3.1 The merge process
The actual merge happens in the following lines:
AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
? new ParallelCompactionIterable(compactionType, toCompact, controller)
: new CompactionIterable(compactionType, toCompact, controller); // look at the single-threaded case first
CloseableIterator<AbstractCompactedRow> iter = ci.iterator(); // ManyToOne<in=SSTableScanner, out=AbstractCompactedRow>
Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull()); // the actual merge
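These lines are hard to understand: there are only three statements, yet they are the entire merge process.
To follow them you need to look at CompactionIterable.java, Reducer, MergeIterator, ManyToOne, SSTableReader, and SSTableScanner together, and you need to know how the parent and child classes relate to each other.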
public static <T> Iterator<T> filter(final Iterator<T> unfiltered, final Predicate<? super T> predicate) {
    checkNotNull(unfiltered);
    checkNotNull(predicate);
    return new AbstractIterator<T>() {
        @Override protected T computeNext() {
            while (unfiltered.hasNext()) {
                T element = unfiltered.next();
                if (predicate.apply(element)) {
                    return element;
                }
            }
            endOfData();
            return null;
        }
    };
}
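To understand this code you first need to know the com.google.common.collect package from Google's Guava library, which bundles many useful collection classes; AbstractIterator is one of them. iter is a ManyToOne, which extends MergeIterator, which in turn extends AbstractIterator. Reading the source shows that unfiltered.hasNext() calls computeNext(), and because of the inheritance chain the implementation that runs is the subclass's, namely ManyToOne.computeNext(). With that, everything falls into place.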
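The hasNext()-calls-computeNext() mechanism can be shown without Guava. LookaheadIterator below is a hypothetical, much reduced imitation of AbstractIterator, written only to make the dispatch visible; it is not Guava's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical miniature of an AbstractIterator-style base class. */
abstract class LookaheadIterator<T> implements Iterator<T>
{
    private T next;
    private boolean ready;
    private boolean done;

    /** Subclasses return the next element, or call endOfData() when there is none. */
    protected abstract T computeNext();

    protected final T endOfData()
    {
        done = true;
        return null;
    }

    @Override
    public final boolean hasNext()
    {
        if (done)
            return false;
        if (!ready)
        {
            next = computeNext(); // dynamic dispatch: the subclass implementation runs here
            ready = !done;
        }
        return ready;
    }

    @Override
    public final T next()
    {
        if (!hasNext())
            throw new NoSuchElementException();
        ready = false;
        T result = next;
        next = null;
        return result;
    }
}

final class NonNullFilter
{
    /** Wraps an iterator and skips null elements, like Iterators.filter(iter, Predicates.notNull()). */
    static <T> Iterator<T> notNull(final Iterator<T> unfiltered)
    {
        return new LookaheadIterator<T>()
        {
            @Override
            protected T computeNext()
            {
                while (unfiltered.hasNext())
                {
                    T element = unfiltered.next();
                    if (element != null)
                        return element;
                }
                return endOfData();
            }
        };
    }

    static <T> List<T> toList(Iterator<T> it)
    {
        List<T> out = new ArrayList<>();
        while (it.hasNext())
            out.add(it.next());
        return out;
    }

    public static void main(String[] args)
    {
        System.out.println(toList(notNull(Arrays.asList("a", null, "b", null).iterator()))); // [a, b]
    }
}
```

The caller only ever sees hasNext() and next(); all the real work sits in computeNext(), which is why the merge logic ends up in the subclass.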
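Roughly, all the SSTableScanners are put into a PriorityQueue (ordered by the key of each scanner's current row). Scanners are then taken from the queue and their current rows are merged (CF.add()) for as long as the key stays the same. The scanners that were taken out are then added back to the queue, each now positioned on its next row.
This continues until the queue is empty, at which point the merge is complete. The key code is below: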
protected final Out consume()
{
    reducer.onKeyChange();
    Candidate<In> candidate = queue.peek();
    if (candidate == null)
        return endOfData();
    do
    {
        candidate = queue.poll();
        candidates.push(candidate);
        reducer.reduce(candidate.item); // merge
    }
    // the merge loop; note the loop condition: compareTo uses the comparator, i.e. the row key
    while (queue.peek() != null && queue.peek().compareTo(candidate) == 0);
    return reducer.getReduced();
}
/** Advance and re-enqueue all items we consumed in the last iteration. */
// A candidate is an SSTableScanner, which parses an sstable row by row; candidate.advance(),
// as the name suggests, moves on to the next row.
protected final void advance()
{
    // Re-add to the priority queue; ordering follows the candidate's comparator, and only
    // candidates that still have rows are queued.
    Candidate<In> candidate;
    while ((candidate = candidates.pollFirst()) != null)
        // Once advance() returns false the sstable is exhausted and fully merged, so the
        // comparator must order candidates by the key of their current (head) row.
        if (candidate.advance())
            queue.add(candidate);
}
}
In the code above a candidate is an SSTableScanner, and candidate.advance() moves the scanner's iterator to the next row.
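The same consume/advance cycle can be written as a small standalone k-way merge. In this sketch each source is a sorted list of keys, and the "reduce" step simply counts how many sources contained the key. KWayMerge and Cursor are hypothetical stand-ins for ManyToOne and its candidates, and counting replaces the real row merge.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical k-way merge over sorted key lists, following the consume/advance pattern. */
final class KWayMerge
{
    /** A position inside one sorted source; plays the role of a scanner-backed candidate. */
    static final class Cursor implements Comparable<Cursor>
    {
        private final Iterator<String> rows;
        String current;

        Cursor(Iterator<String> rows)
        {
            this.rows = rows;
        }

        /** Moves to the next row; returns false once the source is exhausted. */
        boolean advance()
        {
            if (!rows.hasNext())
                return false;
            current = rows.next();
            return true;
        }

        @Override
        public int compareTo(Cursor other)
        {
            return current.compareTo(other.current);
        }
    }

    /** Returns each key once, in sorted order, mapped to the number of sources that held it. */
    static Map<String, Integer> merge(List<List<String>> sources)
    {
        PriorityQueue<Cursor> queue = new PriorityQueue<>();
        for (List<String> source : sources)
        {
            Cursor cursor = new Cursor(source.iterator());
            if (cursor.advance())
                queue.add(cursor);
        }
        Map<String, Integer> merged = new LinkedHashMap<>();
        Deque<Cursor> consumed = new ArrayDeque<>();
        while (!queue.isEmpty())
        {
            // consume(): pull every cursor whose current key equals the smallest key
            Cursor candidate;
            int versions = 0;
            do
            {
                candidate = queue.poll();
                consumed.push(candidate);
                versions++; // stand-in for reducer.reduce(candidate.item)
            }
            while (queue.peek() != null && queue.peek().compareTo(candidate) == 0);
            merged.put(candidate.current, versions);
            // advance(): move the consumed cursors forward and re-enqueue the live ones
            Cursor c;
            while ((c = consumed.pollFirst()) != null)
                if (c.advance())
                    queue.add(c);
        }
        return merged;
    }

    public static void main(String[] args)
    {
        System.out.println(merge(Arrays.asList(
            Arrays.asList("a", "c", "e"),
            Arrays.asList("b", "c"),
            Arrays.asList("a", "c", "d")))); // {a=2, b=1, c=3, d=1, e=1}
    }
}
```

Each source is read exactly once and strictly forward, which is what lets compaction stream through sstables without loading them fully into memory.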
3.2 flush
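With SizeTieredCompactionStrategy the merge produces a single sstable, and there is no limit on sstable size while writing. LeveledCompactionStrategy does limit sstable size: when the sstable being written exceeds the limit, a new sstable is created and writing continues there.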
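The rollover follows the write loop in execute(): close the current writer on the last row or when the size threshold is reached, and open a new one only if more rows remain. The sketch below models rows as byte counts. SegmentedWriter is hypothetical, and using Long.MAX_VALUE to represent "no limit" for the size-tiered case is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical model of writing merged rows into one or more output segments. */
final class SegmentedWriter
{
    /** Splits the row stream into segments; a segment is closed once it exceeds maxSegmentBytes. */
    static List<List<Integer>> write(List<Integer> rowSizes, long maxSegmentBytes)
    {
        List<List<Integer>> segments = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long written = 0;
        Iterator<Integer> rows = rowSizes.iterator();
        while (rows.hasNext())
        {
            int row = rows.next();
            current.add(row);
            written += row;
            // Same shape as: !nni.hasNext() || newSSTableSegmentThresholdReached(writer)
            if (!rows.hasNext() || written > maxSegmentBytes)
            {
                segments.add(current);
                if (rows.hasNext())
                {
                    current = new ArrayList<>();
                    written = 0;
                }
            }
        }
        return segments;
    }

    public static void main(String[] args)
    {
        List<Integer> rows = Arrays.asList(40, 40, 40, 40, 40);
        System.out.println(write(rows, 100));            // leveled-style: [[40, 40, 40], [40, 40]]
        System.out.println(write(rows, Long.MAX_VALUE)); // size-tiered-style: [[40, 40, 40, 40, 40]]
    }
}
```

Because the check runs after a row is appended, a segment can overshoot the limit by at most one row, and no empty trailing segment is ever created.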
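4 Summary
A few of the more important classes:
CompactionManager is the manager of the whole compaction process. It ties the other classes together so that they jointly carry out compaction, and it also manages the individual compaction tasks.
CompactionStrategy, of which there are really two implementations, is the entity behind the compaction thread and provides the methods that create tasks.
CompactionTask also comes in two kinds. It holds the sstables that need compaction and provides the methods for the merge-and-write process; it is where compaction is actually executed.
CompactedRow comes in two different kinds; it can be understood as the data structure that stores a row after merging.
To understand the compaction process fully, you also need to understand SSTable, SSTableReader, SSTableScanner, and SSTableWriter.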