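LevelDB stores data according to the LSM (log-structured merge) idea, turning random writes into sequential writes. Each write is first appended to a log file on disk and inserted into memory, after which the write is reported as successful; a background thread later merges the written data with the existing on-disk files to produce new on-disk data. In memory, LevelDB maintains a structure called the memtable, implemented as a skiplist, which holds the most recent writes. Once it grows past a certain size it becomes a read-only immutable memtable. New writes then go to a fresh memtable, while the background thread writes the immutable memtable out to a data file. LevelDB's data files are organized in levels. The default configuration has 7 levels (config::kNumLevels = 7), i.e. level0, level1, …, level6. An immutable memtable is normally dumped into level0. In every level other than level0, the key ranges of the files in that level never overlap one another. When certain conditions are met, files in level i are merged with the overlapping files in level i+1 to produce new level i+1 files. This file merge, together with the dump of the immutable memtable, is called compaction, and in LevelDB it is performed by a single background thread.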
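To make this write path concrete, here is a toy model in C++. It is not LevelDB code: ToyLsm, its members and the byte-count rule are assumptions made for illustration, with std::map standing in for the skiplist and a vector of strings for the log file. Unlike LevelDB, which makes writers wait while an immutable memtable is still pending, this sketch simply keeps writing into the current memtable in that case.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy model of the LSM write path; not LevelDB's implementation.
class ToyLsm {
 public:
  explicit ToyLsm(size_t write_buffer_size) : write_buffer_size_(write_buffer_size) {}

  void Put(const std::string& key, const std::string& value) {
    log_.push_back(key + "=" + value);  // 1. append to the log first
    mem_[key] = value;                  // 2. then insert into the memtable
    mem_bytes_ += key.size() + value.size();
    if (mem_bytes_ >= write_buffer_size_ && imm_ == nullptr) {
      // 3. the full memtable becomes read-only; later writes use a fresh one
      imm_ = std::make_unique<std::map<std::string, std::string>>(std::move(mem_));
      mem_.clear();
      mem_bytes_ = 0;
    }
  }

  // What the background thread would do: dump imm_ as one sorted "file".
  std::vector<std::pair<std::string, std::string>> FlushImmutable() {
    std::vector<std::pair<std::string, std::string>> file;
    if (imm_ != nullptr) {
      file.assign(imm_->begin(), imm_->end());  // std::map iterates in key order
      imm_.reset();
    }
    return file;
  }

  bool HasImmutable() const { return imm_ != nullptr; }
  size_t LogRecords() const { return log_.size(); }

 private:
  size_t write_buffer_size_;
  size_t mem_bytes_ = 0;
  std::vector<std::string> log_;
  std::map<std::string, std::string> mem_;
  std::unique_ptr<std::map<std::string, std::string>> imm_;
};
```

Because the memtable is ordered, the dumped "file" comes out sorted by key, which is the property the real sstable writer relies on.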
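Compaction is triggered under the following conditions:
1. A new immutable memtable has been produced and must be written to a data file.
2. The data in some level has grown too large.
3. Some file has been the target of too many wasted lookups (if a lookup for a key searches file i without finding the key, that lookup counts as a wasted lookup on file i).
4. A manual compaction is requested.
When any of these conditions holds, a compaction is started. The rest of this article walks through the compaction process in detail.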
The entry point for compaction is DBImpl::MaybeScheduleCompaction in db/db_impl.cc. It may be called from many of LevelDB's read and write paths. Its source is:
void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (bg_compaction_scheduled_) {
// Already scheduled
} else if (shutting_down_.Acquire_Load()) {
// DB is being deleted; no more background compactions
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
} else if (imm_ == NULL &&
manual_compaction_ == NULL &&
!versions_->NeedsCompaction()) {
// No work to be done
} else {
bg_compaction_scheduled_ = true;
env_->Schedule(&DBImpl::BGWork, this); // create a background task and schedule it
}
}
Besides checking imm_ and manual_compaction_, it calls VersionSet::NeedsCompaction() (db/version_set.h) to decide whether a compaction task is needed. Its source:
// Returns true iff some level needs a compaction.
bool NeedsCompaction() const {
Version* v = current_;
return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL);
}
compaction_score_ is computed in VersionSet::Finalize in version_set.cc:
void VersionSet::Finalize(Version* v) {
// Precomputed best level for next compaction
int best_level = -1;
double best_score = -1;
for (int level = 0; level < config::kNumLevels-1; level++) {
double score;
if (level == 0) {
// We treat level-0 specially by bounding the number of files
// instead of number of bytes for two reasons:
//
// (1) With larger write-buffer sizes, it is nice not to do too
// many level-0 compactions.
//
// (2) The files in level-0 are merged on every read and
// therefore we wish to avoid too many files when the individual
// file size is small (perhaps because of a small write-buffer
// setting, or very high compression ratios, or lots of
// overwrites/deletions).
score = v->files_[level].size() /
static_cast<double>(config::kL0_CompactionTrigger);
} else {
// Compute the ratio of current size to size limit.
const uint64_t level_bytes = TotalFileSize(v->files_[level]);
score = static_cast<double>(level_bytes) / MaxBytesForLevel(level);
}
if (score > best_score) {
best_level = level;
best_score = score;
}
}
v->compaction_level_ = best_level;
v->compaction_score_ = best_score;
}
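Note that Finalize also precomputes the best level for the next compaction, namely the level with the highest score; a score of 1 or more means that level is over its limit.
Once a compaction is known to be needed, PosixEnv::Schedule in util/env_posix.cc is called to start the compaction work: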
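The scoring rule can be tried out with a small standalone sketch. It assumes LevelDB's default constants (kNumLevels = 7, kL0_CompactionTrigger = 4, and a MaxBytesForLevel limit of 10MB for level 1 that grows tenfold per level); LevelState and PickBestLevel are hypothetical names, not part of LevelDB.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Sketch of VersionSet::Finalize's scoring under assumed default constants.
constexpr int kNumLevels = 7;
constexpr int kL0_CompactionTrigger = 4;

double MaxBytesForLevel(int level) {
  double result = 10.0 * 1048576.0;  // 10MB for level 1
  while (level > 1) {                // x10 for every deeper level
    result *= 10;
    level--;
  }
  return result;
}

struct LevelState {
  int num_files;
  uint64_t total_bytes;
};

// Returns the level with the highest score and stores that score in *best_score.
int PickBestLevel(const std::vector<LevelState>& levels, double* best_score) {
  int best_level = -1;
  *best_score = -1;
  for (int level = 0; level < kNumLevels - 1; level++) {  // last level is never scored
    double score;
    if (level == 0) {
      // level 0 is scored by file count
      score = levels[level].num_files / static_cast<double>(kL0_CompactionTrigger);
    } else {
      // other levels are scored by total bytes
      score = static_cast<double>(levels[level].total_bytes) / MaxBytesForLevel(level);
    }
    if (score > *best_score) {
      best_level = level;
      *best_score = score;
    }
  }
  return best_level;
}
```

For example, two level0 files score 0.5 while 15MB in level1 scores 1.5, so level1 wins; with eight level0 files, level0's score of 2.0 takes priority.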
void PosixEnv::Schedule(void (*function)(void*), void* arg) {
PthreadCall("lock", pthread_mutex_lock(&mu_));
// Start background thread if necessary
if (!started_bgthread_) {
started_bgthread_ = true;
PthreadCall(
"create thread",
pthread_create(&bgthread_, NULL, &PosixEnv::BGThreadWrapper, this));
}
// If the queue is currently empty, the background thread may currently be
// waiting.
if (queue_.empty()) {
PthreadCall("signal", pthread_cond_signal(&bgsignal_));
}
// Add to priority queue
queue_.push_back(BGItem());
queue_.back().function = function;
queue_.back().arg = arg;
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}
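If the background thread has not been started yet, Schedule creates it, with PosixEnv::BGThreadWrapper as its entry point. If the task queue is currently empty, it signals the condition variable, because the background thread may be waiting on it. In every case it then pushes a new BGItem (function plus argument) onto the queue. BGThreadWrapper itself is just the thread's entry function: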
static void* BGThreadWrapper(void* arg) {
reinterpret_cast<PosixEnv*>(arg)->BGThread();
return NULL;
}
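BGThreadWrapper calls PosixEnv::BGThread, which loops forever: it waits until the queue is non-empty, takes the task at the front, releases the lock and runs the task: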
void PosixEnv::BGThread() {
while (true) {
// Wait until there is an item that is ready to run
PthreadCall("lock", pthread_mutex_lock(&mu_));
while (queue_.empty()) {
PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
}
void (*function)(void*) = queue_.front().function;
void* arg = queue_.front().arg;
queue_.pop_front();
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
(*function)(arg);
}
}
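The Schedule/BGThread pair is a classic single-consumer work queue. The following sketch expresses the same pattern with std::thread, std::mutex and std::condition_variable instead of raw pthreads. BackgroundQueue is a hypothetical name, and the stop flag used by the destructor is an addition for the sketch, since PosixEnv's background thread runs forever. It also notifies on every push rather than only when the queue was empty, which is simpler and still correct.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

// Sketch of a lazily started background worker draining a task queue.
class BackgroundQueue {
 public:
  ~BackgroundQueue() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      stop_ = true;
    }
    cv_.notify_one();
    if (thread_.joinable()) thread_.join();  // the worker drains the queue, then exits
  }

  void Schedule(std::function<void()> fn) {
    std::lock_guard<std::mutex> lock(mu_);
    if (!started_) {  // start the thread on first use, like started_bgthread_
      started_ = true;
      thread_ = std::thread([this] { Run(); });
    }
    queue_.push_back(std::move(fn));
    cv_.notify_one();  // wake the worker in case it is waiting
  }

 private:
  void Run() {
    while (true) {
      std::function<void()> fn;
      {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return stop_ || !queue_.empty(); });
        if (queue_.empty()) return;  // stop_ is set and nothing is left to run
        fn = std::move(queue_.front());
        queue_.pop_front();
      }
      fn();  // run the task without holding the lock, as BGThread does
    }
  }

  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> queue_;
  std::thread thread_;
  bool started_ = false;
  bool stop_ = false;
};
```

As in BGThread, the task runs outside the lock, so Schedule can enqueue more work while a long compaction is in progress.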
We have seen how env_->Schedule schedules work; now let us look at DBImpl::BGWork, which performs the actual background task. DBImpl::BGWork is also in db/db_impl.cc.
void DBImpl::BGWork(void* db) {
reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}
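DBImpl::BGWork calls DBImpl::BackgroundCall(). Since a finished compaction may leave some level with too many files, BackgroundCall() then calls MaybeScheduleCompaction() again to check whether another compaction is needed.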
void DBImpl::BackgroundCall() {
MutexLock l(&mutex_);
assert(bg_compaction_scheduled_);
if (shutting_down_.Acquire_Load()) {
// No more background work when shutting down.
} else if (!bg_error_.ok()) {
// No more background work after a background error.
} else {
BackgroundCompaction();
}
bg_compaction_scheduled_ = false;
// Previous compaction may have produced too many files in a level,
// so reschedule another compaction if needed.
MaybeScheduleCompaction();
bg_cv_.SignalAll();
}
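DBImpl::BackgroundCall() calls BackgroundCompaction(), which handles three different kinds of compaction: compacting the immutable memtable, a trivial compaction (moving a file directly to the next level), and a general compaction, which is carried out by DoCompactionWork().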
void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();
if (imm_ != NULL) {
CompactMemTable(); // 1. compact the immutable memtable
return;
}
Compaction* c;
bool is_manual = (manual_compaction_ != NULL); // manual_compaction_ is NULL unless a manual compaction was requested, so is_manual is normally false
InternalKey manual_end;
if (is_manual) { // obtain the manual compaction
ManualCompaction* m = manual_compaction_;
c = versions_->CompactRange(m->level, m->begin, m->end);
m->done = (c == NULL);
if (c != NULL) {
manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
}
Log(options_.info_log,
"Manual compaction at level-%d from %s .. %s; will stop at %s\n",
m->level,
(m->begin ? m->begin->DebugString().c_str() : "(begin)"),
(m->end ? m->end->DebugString().c_str() : "(end)"),
(m->done ? "(end)" : manual_end.DebugString().c_str()));
} else { // obtain an automatic compaction
c = versions_->PickCompaction();
}
Status status;
if (c == NULL) {
// Nothing to do
} else if (!is_manual && c->IsTrivialMove()) { // 2. trivial compaction: just move the file to level + 1
// Move file to next level
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
c->edit()->DeleteFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number),
c->level() + 1,
static_cast<unsigned long long>(f->file_size),
status.ToString().c_str(),
versions_->LevelSummary(&tmp));
} else { // 3. general compaction
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact); // perform the compaction
if (!status.ok()) {
RecordBackgroundError(status);
}
CleanupCompaction(compact);
c->ReleaseInputs(); // release c's reference to the input Version
DeleteObsoleteFiles(); // delete files that are no longer needed
}
delete c;
if (status.ok()) {
// Done
} else if (shutting_down_.Acquire_Load()) {
// Ignore compaction errors found during shutting down
} else {
Log(options_.info_log,
"Compaction error: %s", status.ToString().c_str());
}
if (is_manual) {
ManualCompaction* m = manual_compaction_; // update the state of the manual compaction
if (!status.ok()) {
m->done = true;
}
if (!m->done) {
// We only compacted part of the requested range. Update *m
// to the range that is left to be compacted.
m->tmp_storage = manual_end;
m->begin = &m->tmp_storage;
}
manual_compaction_ = NULL;
}
}
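The first line is mutex_.AssertHeld(). In the default implementation Mutex::AssertHeld is empty, yet it is called from many functions. Its purpose has been explained as follows: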
As you have observed, it does nothing in the default implementation. The function seems to be a placeholder for checking whether a particular thread holds a mutex and optionally aborting if it doesn't. This would be equivalent to the normal asserts we use for variables, but applied to mutexes.
I think the reason it is not implemented yet is that we don't have an equivalent lightweight function to assert whether a thread holds a lock in the pthread_mutex_t used in the default implementation. Some platforms that have this capability could fill in this implementation as part of the porting process. Searching online, I did find an implementation of this function in the Windows port of leveldb. I can see one way to implement it using a wrapper class over pthread_mutex_t and setting some sort of thread id variable to indicate which thread currently holds the mutex, but it would have to be implemented carefully given the race conditions that can arise.
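The wrapper idea from the quote can be sketched in a few lines of C++. CheckedMutex is a hypothetical class, not how LevelDB's port::Mutex is implemented. The owner field is written only by the thread that currently holds the lock, so a thread can only ever read back its own id while it really owns the mutex, which sidesteps the races the quote warns about.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>

// Sketch: a mutex wrapper that remembers its owner so AssertHeld can check it.
class CheckedMutex {
 public:
  void Lock() {
    mu_.lock();
    owner_.store(std::this_thread::get_id());  // record the owner after acquiring
  }
  void Unlock() {
    owner_.store(std::thread::id());  // clear the owner before releasing
    mu_.unlock();
  }
  bool HeldByCurrentThread() const {
    return owner_.load() == std::this_thread::get_id();
  }
  void AssertHeld() const { assert(HeldByCurrentThread()); }  // no-op under NDEBUG

 private:
  std::mutex mu_;
  std::atomic<std::thread::id> owner_{std::thread::id()};
};
```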
Memtable Compaction
A compaction first checks imm_, so that a full memtable is written out to an sstable file on disk promptly. Memtable compaction is done by DBImpl::CompactMemTable():
void DBImpl::CompactMemTable() {
mutex_.AssertHeld();
assert(imm_ != NULL); // imm_ must not be NULL
VersionEdit edit;
Version* base = versions_->current();
base->Ref();
Status s = WriteLevel0Table(imm_, &edit, base); // turn the memtable into an .sst file and record it in edit
base->Unref();
if (s.ok()) {
edit.SetPrevLogNumber(0);
edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
s = versions_->LogAndApply(&edit, &mutex_); // apply the changes recorded in edit to produce a new Version
}
if (s.ok()) {
// Commit to the new state
imm_->Unref();
imm_ = NULL;
has_imm_.Release_Store(NULL);
DeleteObsoleteFiles();
} else {
RecordBackgroundError(s);
}
}
CompactMemTable() mainly calls two functions: WriteLevel0Table() and versions_->LogAndApply().
It first calls WriteLevel0Table(), whose source is:
Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
Version* base) {
mutex_.AssertHeld();
FileMetaData meta;
meta.number = versions_->NewFileNumber(); // file number for the new .sst file
pending_outputs_.insert(meta.number);
Iterator* iter = mem->NewIterator(); // iterates over the memtable contents
Status s;
{
mutex_.Unlock();
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); // build the .sst file and record its metadata in meta
mutex_.Lock();
}
delete iter; // the iterator must be deleted after use
pending_outputs_.erase(meta.number);
int level = 0;
if (s.ok() && meta.file_size > 0) {
const Slice min_user_key = meta.smallest.user_key();
const Slice max_user_key = meta.largest.user_key();
if (base != NULL) {
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); // choose the level for the output file
}
edit->AddFile(level, meta.number, meta.file_size, meta.smallest, meta.largest); // add the new .sst file to that level
}
return s;
}
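WriteLevel0Table() first calls BuildTable() to write all the data in the immutable memtable into a single .sst file, recording the file's information (file number, key range, file size) in meta. Because the memtable is a skiplist, i.e. an ordered structure, keys are written to the .sst file in ascending order. Note that every record in the memtable is written to the sstable, including deletion records; deleted data is physically removed later, during compactions into higher levels. Since each level0 file is the dump of a different memtable, different level0 files may contain records with the same key.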
Status BuildTable(const std::string& dbname,
Env* env,
const Options& options,
TableCache* table_cache,
Iterator* iter,
FileMetaData* meta) {
Status s;
meta->file_size = 0;
iter->SeekToFirst();
std::string fname = TableFileName(dbname, meta->number); // name of the new table file
if (iter->Valid()) {
WritableFile* file;
s = env->NewWritableFile(fname, &file); // create the new table file to write into
if (!s.ok()) {
return s;
}
TableBuilder* builder = new TableBuilder(options, file); // create a TableBuilder
meta->smallest.DecodeFrom(iter->key());
for (; iter->Valid(); iter->Next()) { // add every key/value pair to the builder
Slice key = iter->key();
meta->largest.DecodeFrom(key);
builder->Add(key, iter->value());
}
// Finish and check for builder errors
s = builder->Finish(); // write the remaining blocks, the metaindex and index blocks, and the footer
if (s.ok()) {
meta->file_size = builder->FileSize();
assert(meta->file_size > 0);
}
delete builder;
// Finish and check for file errors
if (s.ok()) {
s = file->Sync(); // flush the file to stable storage
}
if (s.ok()) {
s = file->Close();
}
delete file;
file = NULL;
if (s.ok()) {
// Verify that the table is usable
Iterator* it = table_cache->NewIterator(ReadOptions(),
meta->number,
meta->file_size); // opening the table also loads it into the table cache
s = it->status();
delete it;
}
}
// Check for input iterator errors
if (!iter->status().ok()) {
s = iter->status();
}
if (s.ok() && meta->file_size > 0) {
// Keep it
} else {
env->DeleteFile(fname);
}
return s;
}
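BuildTable uses iter to feed key/value pairs into a TableBuilder, writes and syncs the file, and then opens the new table through the table cache, both to verify that it is usable and to have it ready for later reads.
TableBuilder is implemented in the table directory (table/table_builder.cc). TableBuilder::Add works as follows: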
void TableBuilder::Add(const Slice& key, const Slice& value) {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->num_entries > 0) {
assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
}
if (r->pending_index_entry) { // first key of a new data block: add the index entry for the previous block
assert(r->data_block.empty());
r->options.comparator->FindShortestSeparator(&r->last_key, key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}
// add the key to the filter block
if (r->filter_block != NULL) {
r->filter_block->AddKey(key);
}
// add the entry to the data block
r->last_key.assign(key.data(), key.size());
r->num_entries++;
r->data_block.Add(key, value);
// once the block reaches the configured size (block_size, 4KB by default), write it out and start a new block
const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
if (estimated_block_size >= r->options.block_size) {
Flush();
}
}
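The block-splitting behavior of Add, plus the final flush of a partial block that Finish performs, can be modeled with a short sketch. It simplifies two things, both assumptions of the sketch: the size estimate is just the sum of key and value lengths (the real BlockBuilder::CurrentSizeEstimate also accounts for prefix compression and restart points), and each index key is the block's last key, whereas LevelDB defers the index entry until the next key arrives so that FindShortestSeparator can choose a shorter key between the two blocks. BlockSketch, TableSketch and BuildBlocks are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Simplified model of cutting sorted entries into data blocks with an index.
struct BlockSketch {
  std::vector<std::pair<std::string, std::string>> entries;
};

struct TableSketch {
  std::vector<BlockSketch> blocks;
  std::vector<std::string> index_keys;  // index_keys[i] >= every key in blocks[i]
};

TableSketch BuildBlocks(const std::vector<std::pair<std::string, std::string>>& sorted,
                        size_t block_size) {
  TableSketch t;
  BlockSketch current;
  size_t estimate = 0;
  for (const auto& kv : sorted) {
    current.entries.push_back(kv);
    estimate += kv.first.size() + kv.second.size();
    if (estimate >= block_size) {  // like Add(): flush once the block is big enough
      t.index_keys.push_back(current.entries.back().first);
      t.blocks.push_back(std::move(current));
      current = BlockSketch();
      estimate = 0;
    }
  }
  if (!current.entries.empty()) {  // like Finish(): flush the last, partial block
    t.index_keys.push_back(current.entries.back().first);
    t.blocks.push_back(std::move(current));
  }
  return t;
}
```

A reader can binary-search index_keys for the first entry that is >= the target key and then scan only that one block.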
TableBuilder::WriteBlock, which writes a block to the file, is implemented as follows:
void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
// File format contains a sequence of blocks where each block has:
// block_data: uint8[n]
// type: uint8
// crc: uint32
assert(ok());
Rep* r = rep_;
Slice raw = block->Finish(); // get the serialized block contents
Slice block_contents;
// read the compression option
CompressionType type = r->options.compression;
// TODO(postrelease): Support more compression options: zlib?
switch (type) {
case kNoCompression:
block_contents = raw;
break;
case kSnappyCompression: {
std::string* compressed = &r->compressed_output;
if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
compressed->size() < raw.size() - (raw.size() / 8u)) {
block_contents = *compressed;
} else {
// Snappy not supported, or compressed less than 12.5%, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
}
// after optional compression, write block_data + type + crc32 to the file
WriteRawBlock(block_contents, type, handle);
r->compressed_output.clear();
block->Reset();
}
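The compression decision in WriteBlock boils down to one comparison: the Snappy output is kept only if it saves more than 1/8 (12.5%) of the raw size. The sketch below restates that rule and adds the per-block trailer of 1 type byte plus a 4-byte CRC32 described in the comment at the top of WriteBlock (kBlockTrailerSize = 5 in LevelDB's table/format.h). UseCompressed and StoredBlockSize are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>

// Restatement of WriteBlock's "is compression worth it" rule.
constexpr size_t kBlockTrailerSize = 5;  // 1-byte type + 4-byte crc32

bool UseCompressed(size_t raw_size, size_t compressed_size) {
  return compressed_size < raw_size - (raw_size / 8u);  // must save more than 12.5%
}

// Bytes a block occupies in the file, trailer included.
size_t StoredBlockSize(size_t raw_size, size_t compressed_size) {
  size_t body = UseCompressed(raw_size, compressed_size) ? compressed_size : raw_size;
  return body + kBlockTrailerSize;
}
```

For a 4096-byte block the threshold is 3584 bytes: a compressed size of exactly 3584 is rejected and the block is stored raw.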
TableBuilder::Finish is defined as follows:
Status TableBuilder::Finish() {
Rep* r = rep_;
Flush(); // write out the last data block, which may not be full
assert(!r->closed);
r->closed = true;
BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
// Write filter block
if (ok() && r->filter_block != NULL) {
WriteRawBlock(r->filter_block->Finish(), kNoCompression,
&filter_block_handle);
}
// Write metaindex block
if (ok()) {
BlockBuilder meta_index_block(&r->options);
if (r->filter_block != NULL) {
// Add mapping from "filter.Name" to location of filter data
std::string key = "filter.";
key.append(r->options.filter_policy->Name());
std::string handle_encoding;
filter_block_handle.EncodeTo(&handle_encoding);
meta_index_block.Add(key, handle_encoding);
}
// TODO(postrelease): Add stats and other meta blocks
WriteBlock(&meta_index_block, &metaindex_block_handle);
}
// Write index block
if (ok()) {
if (r->pending_index_entry) {
r->options.comparator->FindShortSuccessor(&r->last_key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}
WriteBlock(&r->index_block, &index_block_handle);
}
// Write footer
if (ok()) {
Footer footer;
footer.set_metaindex_handle(metaindex_block_handle);
footer.set_index_handle(index_block_handle);
std::string footer_encoding;
footer.EncodeTo(&footer_encoding);
r->status = r->file->Append(footer_encoding);
if (r->status.ok()) {
r->offset += footer_encoding.size();
}
}
return r->status;
}
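After Finish() the file consists of the data blocks, then the filter block (only when a filter policy is configured), the metaindex block, the index block and the footer. Blocks are located through BlockHandles (offset, size). The sketch below assumes LevelDB's table format, in which a handle is encoded as two varint64 values (7 bits per byte, the high bit meaning "more bytes follow") and the footer stores the metaindex and index handles padded to a fixed length, followed by a magic number. Only the handle encoding is shown; PutVarint64Sketch and EncodeHandleSketch are hypothetical names modeled on util/coding.h.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Varint64: little-endian groups of 7 bits, high bit set on all but the last byte.
void PutVarint64Sketch(std::string* dst, uint64_t v) {
  while (v >= 128) {
    dst->push_back(static_cast<char>((v & 127) | 128));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

// A block handle is its offset followed by its size, both as varint64.
std::string EncodeHandleSketch(uint64_t offset, uint64_t size) {
  std::string out;
  PutVarint64Sketch(&out, offset);
  PutVarint64Sketch(&out, size);
  return out;
}
```

A block at offset 0 with size 300 encodes as the three bytes 0x00, 0xAC, 0x02, so small handles stay compact in the index block.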
The Flush function called above is:
void TableBuilder::Flush() {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->data_block.empty()) return;
assert(!r->pending_index_entry);
WriteBlock(&r->data_block, &r->pending_handle);
if (ok()) {
r->pending_index_entry = true;
r->status = r->file->Flush();
}
if (r->filter_block != NULL) {
r->filter_block->StartBlock(r->offset);
}
}
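WriteLevel0Table() then calls PickLevelForMemTableOutput() to choose a suitable level for the output file of the memtable compaction, and calls edit->AddFile() to add the new .sst file to that level. So, despite the function's name, the output does not always end up in level0.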
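PickLevelForMemTableOutput's source is not listed in this article. As far as I know it keeps the output in level0 if it overlaps any level0 file; otherwise it pushes the file down, at most to kMaxMemCompactLevel (2 by default), as long as the next level has no overlapping file and the level after that would not overlap more than kMaxGrandParentOverlapBytes. The sketch below models that logic with plain string key ranges; the constants and all names are assumptions.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Simplified sketch of Version::PickLevelForMemTableOutput.
constexpr int kNumLevels = 7;
constexpr int kMaxMemCompactLevel = 2;
constexpr int64_t kMaxGrandParentOverlapBytes = 10 * 2 * 1048576;  // 10 x 2MB

struct FileRange {
  std::string smallest, largest;
  int64_t size;
};
using Levels = std::vector<std::vector<FileRange>>;

bool Overlaps(const FileRange& f, const std::string& lo, const std::string& hi) {
  return !(f.largest < lo || hi < f.smallest);
}

bool OverlapInLevel(const Levels& v, int level, const std::string& lo, const std::string& hi) {
  for (const auto& f : v[level])
    if (Overlaps(f, lo, hi)) return true;
  return false;
}

int64_t OverlapBytes(const std::vector<FileRange>& files, const std::string& lo,
                     const std::string& hi) {
  int64_t sum = 0;
  for (const auto& f : files)
    if (Overlaps(f, lo, hi)) sum += f.size;
  return sum;
}

int PickLevelForMemTableOutput(const Levels& v, const std::string& lo, const std::string& hi) {
  int level = 0;
  if (!OverlapInLevel(v, 0, lo, hi)) {
    // push down while level + 1 has no overlap and level + 2 overlap is bounded
    while (level < kMaxMemCompactLevel) {
      if (OverlapInLevel(v, level + 1, lo, hi)) break;
      if (level + 2 < kNumLevels &&
          OverlapBytes(v[level + 2], lo, hi) > kMaxGrandParentOverlapBytes) break;
      level++;
    }
  }
  return level;
}
```

Pushing a non-overlapping file straight to a deeper level avoids an immediate, and pointless, level0 to level1 compaction of data that overlaps nothing there.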
After WriteLevel0Table() returns, CompactMemTable() calls versions_->LogAndApply() (db/version_set.cc), which builds a new Version from the current Version plus the changes recorded in edit. LogAndApply() is analyzed later in this article.
Trivial Compaction
As noted above, is_manual is false by default, so PickCompaction() is called to choose the level to compact and its input files. When c->IsTrivialMove() holds, the file is simply moved to the next level.
c = versions_->PickCompaction();
Status status;
if (c == NULL) {
// Nothing to do
} else if (!is_manual && c->IsTrivialMove()) {
// Move file to next level
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
c->edit()->DeleteFile(c->level(), f->number); // remove the file from this level
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, // add the same file to the next level
f->smallest, f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_); // apply the changes and create a new Version
}
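VersionSet::PickCompaction() in db/version_set.cc first prepares the inputs for the compaction. Of the triggers listed at the beginning, two are handled here: a level that has grown too large (too many files in level0, too many bytes in the other levels, i.e. compaction_score_ >= 1), and a file whose allowed number of seeks has been used up. Size-triggered compactions take priority.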
Compaction* VersionSet::PickCompaction() {
Compaction* c;
int level;
const bool size_compaction = (current_->compaction_score_ >= 1); // some level is too large
const bool seek_compaction = (current_->file_to_compact_ != NULL); // some file has been seeked too many times
if (size_compaction) { // size-triggered compactions take priority
level = current_->compaction_level_; // level to compact
c = new Compaction(level);
// each level has a compact_pointer_ recording where the last compaction ended, so compactions rotate through the key space
for (size_t i = 0; i < current_->files_[level].size(); i++) { // pick a file from the level to compact
FileMetaData* f = current_->files_[level][i]; // i-th file in this level
if (compact_pointer_[level].empty() || // no pointer yet: any file may be chosen
icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) { // or f's largest key lies past the pointer
c->inputs_[0].push_back(f); // this file becomes the level input
break;
}
}
if (c->inputs_[0].empty()) { // wrap around: start again from the first file in the level
c->inputs_[0].push_back(current_->files_[level][0]);
}
} else if (seek_compaction) { // otherwise handle a seek-triggered compaction
level = current_->file_to_compact_level_;
c = new Compaction(level);
c->inputs_[0].push_back(current_->file_to_compact_); // the file to compact is the level input
} else {
return NULL;
}
c->input_version_ = current_;
c->input_version_->Ref();
// level0 files may overlap one another, so collect every overlapping level0 file and compact them together
if (level == 0) {
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest); // key range of the chosen level inputs
current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
assert(!c->inputs_[0].empty());
}
SetupOtherInputs(c); // pick the overlapping inputs from level + 1
return c;
}
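The compact_pointer_ rotation in PickCompaction can be isolated as follows. The sketch assumes plain user keys instead of internal keys and describes a level by the sorted largest keys of its files; PickInputFile is a hypothetical name. In LevelDB the pointer is afterwards advanced to the largest key of the chosen inputs, so successive size compactions of a level walk through its whole key range instead of hammering the same files.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Round-robin choice of the level input; assumes largest_keys is sorted and non-empty.
size_t PickInputFile(const std::vector<std::string>& largest_keys,
                     const std::string& compact_pointer) {
  for (size_t i = 0; i < largest_keys.size(); i++) {
    // empty pointer: any file qualifies; otherwise the first file ending past it
    if (compact_pointer.empty() || largest_keys[i] > compact_pointer) return i;
  }
  return 0;  // wrap around to the first file of the level
}
```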
Next, BackgroundCompaction checks whether this is a trivial compaction; if it is, the level file only needs to be moved to level + 1:
bool Compaction::IsTrivialMove() const {
return (num_input_files(0) == 1 && // exactly one input file in level
num_input_files(1) == 0 && // no overlapping files in level + 1
TotalFileSize(grandparents_) <= kMaxGrandParentOverlapBytes); // bounded overlap with level + 2 (grandparents); otherwise a later merge would be very expensive
}
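IsTrivialMove reduces to a simple predicate. Here it is with its inputs passed as plain numbers; the constants kTargetFileSize = 2MB and kMaxGrandParentOverlapBytes = 10 * kTargetFileSize are assumed from LevelDB's default configuration, and IsTrivialMoveSketch is a hypothetical name. Note that BackgroundCompaction also skips the trivial move for manual compactions (the !is_manual check).

```cpp
#include <cassert>
#include <cstdint>

// Restatement of Compaction::IsTrivialMove with assumed default constants.
constexpr int64_t kTargetFileSize = 2 * 1048576;
constexpr int64_t kMaxGrandParentOverlapBytes = 10 * kTargetFileSize;

bool IsTrivialMoveSketch(int level_inputs, int next_level_inputs,
                         int64_t grandparent_bytes) {
  return level_inputs == 1 &&                               // exactly one file in level
         next_level_inputs == 0 &&                          // nothing in level + 1 overlaps it
         grandparent_bytes <= kMaxGrandParentOverlapBytes;  // bounded level + 2 overlap
}
```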
Finally, the move itself is performed:
c->edit()->DeleteFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,f->smallest, f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
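General Compaction
A general compaction is performed by DBImpl::DoCompactionWork(). Its argument compact wraps the Compaction obtained from VersionSet::PickCompaction(), just as in the trivial case. Different levels may hold records with the same key but different sequence numbers (seq). The newest data lives in the lower levels, so its seq is always larger than that of a record with the same key in level + 1. Level0 may also hold several records with the same key, again with different seq values, and newer data always has a larger seq. The iterator over the compaction inputs returns records ordered by user_key ascending and, for equal user_keys, by seq descending, so all records for one user_key appear together, newest first. The first (newest) record for a user_key is kept, unless it is a deletion marker that can be dropped as described below. An older record for the same user_key is dropped once a newer record for that key has seq <= smallest_snapshot, because no live snapshot can then see the older one; when snapshots exist, some older versions therefore survive the compaction. A deletion marker is dropped when its seq <= smallest_snapshot and no level deeper than level + 1 can contain that key (IsBaseLevelForKey). This is where deletions are finally applied: the deleted records and their markers are not written to the higher-level files.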
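The drop rules can be checked on a toy input. The following sketch mirrors the decision logic of the loop in DoCompactionWork; Record, CompactRecords and the is_base_level callback (a stand-in for Compaction::IsBaseLevelForKey) are hypothetical, and the records are sorted in place instead of being read through a merging iterator over real sstables.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <functional>
#include <limits>
#include <string>
#include <vector>

// Sketch of DoCompactionWork's drop decisions over a toy record type.
struct Record {
  std::string user_key;
  uint64_t seq;
  bool deletion;
};

std::vector<Record> CompactRecords(std::vector<Record> input, uint64_t smallest_snapshot,
                                   const std::function<bool(const std::string&)>& is_base_level) {
  // internal key order: user_key ascending, then seq descending (newest first)
  std::sort(input.begin(), input.end(), [](const Record& a, const Record& b) {
    if (a.user_key != b.user_key) return a.user_key < b.user_key;
    return a.seq > b.seq;
  });
  std::vector<Record> out;
  std::string current_user_key;
  bool has_current_user_key = false;
  uint64_t last_sequence_for_key = std::numeric_limits<uint64_t>::max();
  for (const Record& r : input) {
    if (!has_current_user_key || r.user_key != current_user_key) {
      current_user_key = r.user_key;  // first record for this user key
      has_current_user_key = true;
      last_sequence_for_key = std::numeric_limits<uint64_t>::max();
    }
    bool drop = false;
    if (last_sequence_for_key <= smallest_snapshot) {
      drop = true;  // hidden by a newer record visible to every snapshot
    } else if (r.deletion && r.seq <= smallest_snapshot && is_base_level(r.user_key)) {
      drop = true;  // the deletion marker has nothing left to delete
    }
    last_sequence_for_key = r.seq;
    if (!drop) out.push_back(r);
  }
  return out;
}
```

With smallest_snapshot = 10, the records ("a",9), ("a",5), ("b",7,deletion), ("c",3) shrink to ("a",9) and ("c",3); with a live snapshot at 6, all four survive, because that snapshot must still read ("a",5) and must not see the later deletion of "b".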
Status DBImpl::DoCompactionWork(CompactionState* compact) {
const uint64_t start_micros = env_->NowMicros();
int64_t imm_micros = 0; // time spent compacting imm_, excluded from the stats below
if (snapshots_.empty()) {
compact->smallest_snapshot = versions_->LastSequence();
} else {
compact->smallest_snapshot = snapshots_.oldest()->number_;
}
mutex_.Unlock();
// create an iterator over all the data to be compacted
Iterator* input = versions_->MakeInputIterator(compact->compaction); // merges all input files of both levels
input->SeekToFirst();
Status status;
ParsedInternalKey ikey;
std::string current_user_key;
bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
if (has_imm_.NoBarrier_Load() != NULL) { // the immutable memtable has the highest priority
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
if (imm_ != NULL) { // if imm_ is set, compact it first
CompactMemTable();
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
}
mutex_.Unlock();
imm_micros += (env_->NowMicros() - imm_start);
}
Slice key = input->key();
if (compact->compaction->ShouldStopBefore(key) && // finish the current output file early so it does not overlap too many level N+2 files
compact->builder != NULL) {
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
}
bool drop = false;
if (!ParseInternalKey(key, &ikey)) {
current_user_key.clear();
has_current_user_key = false;
last_sequence_for_key = kMaxSequenceNumber;
} else {
if (!has_current_user_key || // track the current user_key and its sequence
user_comparator()->Compare(ikey.user_key,
Slice(current_user_key)) != 0) { // records may share a user_key but differ in seq
// first occurrence of this user_key
current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber; // no newer record for this key has been seen yet
}
if (last_sequence_for_key <= compact->smallest_snapshot) { // a newer record for this key exists and is visible to every snapshot
drop = true; // (A) hidden by a newer entry for the same user key
} else if (ikey.type == kTypeDeletion && // deletion marker
ikey.sequence <= compact->smallest_snapshot && // not needed by any live snapshot
compact->compaction->IsBaseLevelForKey(ikey.user_key)) { // no deeper level can contain this key
drop = true; // so the deletion marker itself can be dropped
}
last_sequence_for_key = ikey.sequence; // remember this seq for later records with the same user_key
}
if (!drop) { // keep this record
if (compact->builder == NULL) {
status = OpenCompactionOutputFile(compact); // open a new .sst file for the merged output if needed
if (!status.ok()) {
break;
}
}
if (compact->builder->NumEntries() == 0) {
compact->current_output()->smallest.DecodeFrom(key);
}
compact->current_output()->largest.DecodeFrom(key);
compact->builder->Add(key, input->value()); // write the record to the .sst file
if (compact->builder->FileSize() >=
compact->compaction->MaxOutputFileSize()) { // the output file has reached its maximum size
status = FinishCompactionOutputFile(compact, input); // finish this output file
if (!status.ok()) {
break;
}
}
}
input->Next(); // advance to the next record
}
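if (status.ok() && shutting_down_.Acquire_Load()) {
status = Status::IOError("Deleting DB during compaction");
}
if (status.ok() && compact->builder != NULL) {
status = FinishCompactionOutputFile(compact, input);
}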
if (status.ok()) {
status = input->status();
}
delete input;
input = NULL;
// update the compaction statistics
CompactionStats stats;
stats.micros = env_->NowMicros() - start_micros - imm_micros;
for (int which = 0; which < 2; which++) {
for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
stats.bytes_read += compact->compaction->input(which, i)->file_size;
}
}
for (size_t i = 0; i < compact->outputs.size(); i++) {
stats.bytes_written += compact->outputs[i].file_size;
}
mutex_.Lock();
stats_[compact->compaction->level() + 1].Add(stats);
if (status.ok()) {
status = InstallCompactionResults(compact); // install the compaction results
}
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log,
"compacted to: %s", versions_->LevelSummary(&tmp));
return status;
}
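DoCompactionWork first writes the records that survive into .sst files, keeping their information in compact. It then calls InstallCompactionResults(), which records the changes in the VersionEdit and calls LogAndApply() to produce a new Version: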
Status DBImpl::InstallCompactionResults(CompactionState* compact) {
mutex_.AssertHeld();
Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
compact->compaction->num_input_files(0),
compact->compaction->level(),
compact->compaction->num_input_files(1),
compact->compaction->level() + 1,
static_cast<long long>(compact->total_bytes));
// Add compaction outputs
compact->compaction->AddInputDeletions(compact->compaction->edit());
const int level = compact->compaction->level();
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
compact->compaction->edit()->AddFile(
level + 1,
out.number, out.file_size, out.smallest, out.largest);
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}
LogAndApply()
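In all three kinds of compaction above, once the changes to the current Version have been recorded in a VersionEdit, VersionSet::LogAndApply() is called to apply them and create a new Version. For a level compaction, edit holds the files to delete from level and level + 1 and the files to add.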
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
Version* v = new Version(this); // create a new Version
{
Builder builder(this, current_); // a builder based on the current Version
builder.Apply(edit); // record the files that edit adds and deletes
builder.SaveTo(v); // write current + edit into the new Version v
}
Finalize(v); // compute the compaction score and level of the new Version
std::string new_manifest_file;
Status s;
if (descriptor_log_ == NULL) { // only taken on the first call, when the database is opened
assert(descriptor_file_ == NULL);
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_); // name of the new MANIFEST file
edit->SetNextFile(next_file_number_);
s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
if (s.ok()) {
descriptor_log_ = new log::Writer(descriptor_file_);
s = WriteSnapshot(descriptor_log_); // start the new MANIFEST with a snapshot of the complete current state
}
}
{
mu->Unlock();
if (s.ok()) {
std::string record;
edit->EncodeTo(&record);
s = descriptor_log_->AddRecord(record); // append this change to the MANIFEST
if (s.ok()) {
s = descriptor_file_->Sync();
}
}
if (s.ok() && !new_manifest_file.empty()) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
}
mu->Lock();
}
if (s.ok()) {
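AppendVersion(v); // install v as current_ and append it to the doubly linked list of Versions
log_number_ = edit->log_number_;
prev_log_number_ = edit->prev_log_number_;
} else {
delete v; // the MANIFEST write failed: discard the new Version
if (!new_manifest_file.empty()) {
delete descriptor_log_;
delete descriptor_file_;
descriptor_log_ = NULL;
descriptor_file_ = NULL;
env_->DeleteFile(new_manifest_file);
}
}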
return s;
}
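To be able to restore the database's previous state after a restart, LevelDB records the history of changes in the MANIFEST file. To save space and time, a new MANIFEST starts with a complete record of the database state (WriteSnapshot()), and after that only the changes, i.e. the contents of each VersionEdit, are appended (descriptor_log_->AddRecord()). On recovery, replaying the MANIFEST records one by one restores the last state.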
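Because the MANIFEST is a snapshot followed by deltas, recovery is a fold over its records. The toy model below assumes each record lists only (level, file number) deletions and additions; EditSketch and Replay are hypothetical, and real VersionEdits also carry the log number, next file number, last sequence and compact pointers. Deletions are applied before additions, matching the order used by Builder::Apply.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>
#include <utility>
#include <vector>

// Toy MANIFEST replay: each record is a delta over the previous state.
struct EditSketch {
  std::vector<std::pair<int, uint64_t>> deleted;  // (level, file number)
  std::vector<std::pair<int, uint64_t>> added;    // (level, file number)
};

std::map<int, std::set<uint64_t>> Replay(const std::vector<EditSketch>& manifest) {
  std::map<int, std::set<uint64_t>> levels;
  for (const EditSketch& e : manifest) {
    for (const auto& d : e.deleted) levels[d.first].erase(d.second);
    for (const auto& a : e.added) levels[a.first].insert(a.second);
  }
  return levels;
}
```

For example, a snapshot adding files 1 and 2 to level0, followed by a compaction edit that deletes file 1 from level0 and adds file 3 to level1, replays to level0 = {2} and level1 = {3}.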
VersionSet::LogAndApply first creates a new Version, then calls builder.Apply(edit) to record the numbers of all files to be deleted and added. Its source:
// Apply all of the edits in *edit to the current state.
void Apply(VersionEdit* edit) {
// update each level's compact pointer (where the next compaction of that level starts)
for (size_t i = 0; i < edit->compact_pointers_.size(); i++) {
const int level = edit->compact_pointers_[i].first;
vset_->compact_pointer_[level] =
edit->compact_pointers_[i].second.Encode().ToString();
}
// add every deleted file to levels_[level].deleted_files
const VersionEdit::DeletedFileSet& del = edit->deleted_files_;
for (VersionEdit::DeletedFileSet::const_iterator iter = del.begin();
iter != del.end();++iter) {
const int level = iter->first;
const uint64_t number = iter->second;
levels_[level].deleted_files.insert(number);
}
// add every new file to levels_[level].added_files
for (size_t i = 0; i < edit->new_files_.size(); i++) {
const int level = edit->new_files_[i].first;
FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
f->refs = 1;
f->allowed_seeks = (f->file_size / 16384);
if (f->allowed_seeks < 100) f->allowed_seeks = 100;
levels_[level].deleted_files.erase(f->number);
levels_[level].added_files->insert(f);
}
}
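The allowed_seeks line in Apply is the other half of the seek-triggered compaction listed at the beginning. According to the comments in LevelDB's version_set.cc, one seek costs roughly as much as compacting 40KB of data, and LevelDB conservatively allows about one seek per 16KB before compacting the file. The sketch below restates the budget and how a wasted seek spends it; FileBudget, NewFileBudget and ChargeSeek are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>

// Sketch of a file's seek budget, as set up in Builder::Apply.
struct FileBudget {
  int64_t allowed_seeks;
};

FileBudget NewFileBudget(uint64_t file_size) {
  FileBudget b{static_cast<int64_t>(file_size / 16384)};  // one seek per 16KB
  if (b.allowed_seeks < 100) b.allowed_seeks = 100;       // but never fewer than 100
  return b;
}

// Charge one wasted seek; returns true when the file should become file_to_compact_.
bool ChargeSeek(FileBudget* b) {
  b->allowed_seeks--;
  return b->allowed_seeks <= 0;
}
```

A 2MB file therefore gets 128 seeks, while a 1MB file (64 by the formula) is raised to the floor of 100.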
LogAndApply then calls builder.SaveTo(v) to save the result in the new Version. Its source:
void SaveTo(Version* v) {
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < config::kNumLevels; level++) {
const std::vector<FileMetaData*>& base_files = base_->files_[level]; // this level's .sst files in the current Version
std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin();
std::vector<FileMetaData*>::const_iterator base_end = base_files.end();
const FileSet* added = levels_[level].added_files; // files added to this level
v->files_[level].reserve(base_files.size() + added->size());
for (FileSet::const_iterator added_iter = added->begin();
added_iter != added->end();++added_iter) {
// add the base files that sort before *added_iter (by smallest key) to the new Version
for (std::vector<FileMetaData*>::const_iterator bpos
= std::upper_bound(base_iter, base_end, *added_iter, cmp);
base_iter != bpos;++base_iter) {
MaybeAddFile(v, level, *base_iter);
}
MaybeAddFile(v, level, *added_iter); // then add the new file itself
}
for (; base_iter != base_end; ++base_iter) {
MaybeAddFile(v, level, *base_iter); // finally add the remaining base files
}
}
}
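bpos = std::upper_bound(base_iter, base_end, *added_iter, cmp) returns the first iterator in [base_iter, base_end) that is greater than *added_iter. The comparator cmp is BySmallestKey, which orders files by their smallest internal key and breaks ties by file number. Suppose the existing files' smallest keys are, in order, a, c, d, f, h, and the added files' smallest keys are b, e, g. On the first iteration bpos points to the file starting at c, so base_iter advances over just one element: the file starting at a is added to the new Version, followed by the added file starting at b. For each added file, the base files that sort before it are added first, then the file itself, and the loop continues with the next added file. The result is the order a, b, c, d, e, f, g, h, so every level of the new Version is sorted by smallest key. MaybeAddFile also skips any file recorded in deleted_files, so deleted files never reach the new Version. This yields the complete file list of every level of the new Version.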
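The merge in SaveTo can be reproduced for a single level with standard containers. FileSketch, BySmallestKey (as a free function) and SaveLevel are hypothetical stand-ins that use plain string keys instead of internal keys; the std::upper_bound loop and the deleted-file check follow the structure of SaveTo and MaybeAddFile.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <set>
#include <string>
#include <vector>

// Sketch of Builder::SaveTo for one level.
struct FileSketch {
  uint64_t number;
  std::string smallest;
};

bool BySmallestKey(const FileSketch& a, const FileSketch& b) {
  if (a.smallest != b.smallest) return a.smallest < b.smallest;
  return a.number < b.number;  // break ties by file number
}

std::vector<FileSketch> SaveLevel(const std::vector<FileSketch>& base,  // sorted by BySmallestKey
                                  std::vector<FileSketch> added,
                                  const std::set<uint64_t>& deleted) {
  std::sort(added.begin(), added.end(), BySmallestKey);  // FileSet keeps this order
  std::vector<FileSketch> out;
  auto maybe_add = [&](const FileSketch& f) {
    if (deleted.count(f.number) == 0) out.push_back(f);  // like MaybeAddFile
  };
  auto base_iter = base.begin();
  for (const FileSketch& a : added) {
    auto bpos = std::upper_bound(base_iter, base.end(), a, BySmallestKey);
    for (; base_iter != bpos; ++base_iter) maybe_add(*base_iter);  // base files before a
    maybe_add(a);
  }
  for (; base_iter != base.end(); ++base_iter) maybe_add(*base_iter);  // the rest
  return out;
}
```

Using the example above with the file starting at d deleted, the resulting order is a, b, c, e, f, g, h.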