推薦結合leveldb-handbook 來看源碼
compaction分為兩類:
- minor compaction
- major compaction
minor compaction 是將記憶體中不可變的 memtable(immutable memtable,程式中的 imm_)裡的所有資料持久化成第 0 層的一個磁碟檔案。
major compaction 則是把第 i 層選出的檔案與第 i+1 層中鍵範圍重疊的檔案合併,產生新的第 i+1 層檔案。有三個條件會觸發 major compaction:
- 當0層檔案數超過預定的上限(預設為4個);
- 當第 i 層(i ≥ 1)檔案的總大小超過 10^i MB;
- 當某個檔案無效讀取的次數過多
leveldb 發起背景壓縮的函數是:MaybeScheduleCompaction
// Schedules a background compaction task (DBImpl::BGWork) on env_ when there
// is work to do and nothing forbids it. Requires mutex_ to be held.
//
// Nothing is scheduled when any of these holds:
//   - a background compaction is already scheduled;
//   - the DB is shutting down;
//   - a background error has been recorded;
//   - there is no immutable memtable, no manual compaction request, and the
//     current version reports that no compaction is needed.
void DBImpl::MaybeScheduleCompaction() {
  mutex_.AssertHeld();

  if (background_compaction_scheduled_) {
    return;  // A compaction task is already pending.
  }
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;  // The DB is being destroyed; do not start new work.
  }
  if (!bg_error_.ok()) {
    return;  // A background error was recorded; make no further changes.
  }

  // NeedsCompaction() is only consulted when neither an immutable memtable
  // nor a manual compaction request is pending.
  const bool has_work = (imm_ != nullptr) || (manual_compaction_ != nullptr) ||
                        versions_->NeedsCompaction();
  if (!has_work) {
    return;
  }

  background_compaction_scheduled_ = true;  // Mark a compaction as pending.
  env_->Schedule(&DBImpl::BGWork, this);
}
被排程的背景任務:BGWork 會轉呼叫 BackgroundCall
void DBImpl::BGWork(void* db) {
reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}
// Body of a scheduled background task. Holds mutex_ for its whole duration.
// It runs one BackgroundCompaction() step unless the DB is shutting down or
// a background error has been recorded. It then clears the "scheduled" flag,
// schedules another compaction if one is still needed, and wakes every thread
// waiting on background_work_finished_signal_.
void DBImpl::BackgroundCall() {
  MutexLock lock(&mutex_);
  // The scheduler sets this flag before the task is scheduled.
  assert(background_compaction_scheduled_);

  const bool closing = shutting_down_.load(std::memory_order_acquire);
  if (!closing && bg_error_.ok()) {
    BackgroundCompaction();
  }

  background_compaction_scheduled_ = false;  // This compaction step is over.

  // The step above may have left too many files in some level, so check
  // whether another compaction should be scheduled.
  MaybeScheduleCompaction();
  background_work_finished_signal_.SignalAll();
}
背景壓縮:BackgroundCompaction
// Performs one unit of background compaction work. Requires mutex_ to be held.
//
// Priority order:
//   1. If an immutable memtable exists, flush it to level 0 and return.
//   2. Otherwise, if a manual compaction is pending, compact (part of) the
//      requested range at the requested level.
//   3. Otherwise, let the version set pick a compaction.
//
// A non-manual compaction for which IsTrivialMove() is true is applied by
// moving its single input file to the next level in the manifest, without
// rewriting any data. Every other compaction goes through DoCompactionWork().
// Failures are recorded with RecordBackgroundError(). For a manual compaction,
// manual_compaction_ is cleared on exit. The request is then either marked
// done or narrowed to the range that still remains.
void DBImpl::BackgroundCompaction() {
  mutex_.AssertHeld();

  if (imm_ != nullptr) {
    CompactMemTable();  // Flush the immutable memtable to level 0.
    return;
  }

  Compaction* c;
  bool is_manual = (manual_compaction_ != nullptr);
  InternalKey manual_end;
  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    // Use the version set to find the files at m->level that are touched by
    // the range [m->begin, m->end].
    c = versions_->CompactRange(m->level, m->begin, m->end);
    m->done = (c == nullptr);
    if (c != nullptr) {
      // Largest key of the last input file at the compaction level: this
      // round of the manual compaction stops there.
      manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
    }
    Log(options_.info_log,
        "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
        m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
        (m->end ? m->end->DebugString().c_str() : "(end)"),
        (m->done ? "(end)" : manual_end.DebugString().c_str()));
  } else {
    // Automatic compaction: let the version set choose the level and inputs.
    c = versions_->PickCompaction();
  }

  Status status;
  if (c == nullptr) {
    // Nothing to do
  } else if (!is_manual && c->IsTrivialMove()) {
    // Trivial move: re-link the single input file into the next level by
    // editing the manifest; no data is rewritten.
    assert(c->num_input_files(0) == 1);
    FileMetaData* f = c->input(0, 0);
    c->edit()->RemoveFile(c->level(), f->number);  // Drop from this level...
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
                       f->largest);  // ...and add to the next one.
    status = versions_->LogAndApply(c->edit(), &mutex_);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    VersionSet::LevelSummaryStorage tmp;
    // Both numbers are passed as unsigned long long, so the matching
    // conversion is %llu rather than the signed %lld.
    Log(options_.info_log, "Moved #%llu to level-%d %llu bytes %s: %s\n",
        static_cast<unsigned long long>(f->number), c->level() + 1,
        static_cast<unsigned long long>(f->file_size),
        status.ToString().c_str(), versions_->LevelSummary(&tmp));
  } else {
    // General case: merge the inputs into new files at level + 1.
    CompactionState* compact = new CompactionState(c);
    status = DoCompactionWork(compact);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    CleanupCompaction(compact);
    c->ReleaseInputs();
    RemoveObsoleteFiles();
  }
  delete c;

  if (status.ok()) {
    // Done
  } else if (shutting_down_.load(std::memory_order_acquire)) {
    // Ignore compaction errors found while the DB is shutting down.
  } else {
    Log(options_.info_log, "Compaction error: %s", status.ToString().c_str());
  }

  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    if (!status.ok()) {
      m->done = true;  // Give up on the request after an error.
    }
    if (!m->done) {
      // Only part of the requested range was compacted; update *m so the
      // next round resumes from manual_end.
      m->tmp_storage = manual_end;
      m->begin = &m->tmp_storage;
    }
    manual_compaction_ = nullptr;
  }
}
對指定的層進行壓縮:DoCompactionWork
// Merges the input files of compact->compaction (inputs 0 are at level(),
// inputs 1 at level() + 1) into new output tables, dropping obsolete entries.
// The result is installed at level() + 1 through InstallCompactionResults().
//
// Must be called with mutex_ held. The mutex is released while the merge
// runs and re-acquired before the stats are updated, the results are
// installed, and the function returns. Returns the first error encountered.
// If the DB starts shutting down during the merge, the result is
// IOError("Deleting DB during compaction").
Status DBImpl::DoCompactionWork(CompactionState* compact) {
  const uint64_t start_micros = env_->NowMicros();
  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions

  // Log: number of input files at the compaction level, that level, number of
  // input files at the next level, and the next level.
  Log(options_.info_log, "Compacting %d@%d + %d@%d files",
      compact->compaction->num_input_files(0), compact->compaction->level(),
      compact->compaction->num_input_files(1),
      compact->compaction->level() + 1);

  assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
  assert(compact->builder == nullptr);
  assert(compact->outfile == nullptr);

  // The drop rules below only apply to entries whose sequence number is
  // <= smallest_snapshot: the oldest live snapshot if any exist, otherwise
  // the last sequence number.
  if (snapshots_.empty()) {
    compact->smallest_snapshot = versions_->LastSequence();
  } else {
    compact->smallest_snapshot = snapshots_.oldest()->sequence_number();
  }

  Iterator* input = versions_->MakeInputIterator(compact->compaction);

  // Release the mutex for the duration of the merge.
  mutex_.Unlock();

  input->SeekToFirst();
  Status status;
  ParsedInternalKey ikey;
  std::string current_user_key;
  bool has_current_user_key = false;
  SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
  while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
    // Give priority to flushing the immutable memtable, so writers waiting
    // in MakeRoomForWrite() are not held up behind this compaction.
    if (has_imm_.load(std::memory_order_relaxed)) {
      const uint64_t imm_start = env_->NowMicros();
      mutex_.Lock();
      if (imm_ != nullptr) {
        CompactMemTable();  // Flush the immutable memtable to level 0.
        // Wake up MakeRoomForWrite() if necessary.
        background_work_finished_signal_.SignalAll();
      }
      mutex_.Unlock();
      // Time spent flushing is excluded from this compaction's stats.
      imm_micros += (env_->NowMicros() - imm_start);
    }

    Slice key = input->key();
    // If the compaction says the current output should end before `key`
    // (its criterion is not shown here), close the output file first.
    if (compact->compaction->ShouldStopBefore(key) &&
        compact->builder != nullptr) {
      status = FinishCompactionOutputFile(compact, input);
      if (!status.ok()) {
        break;
      }
    }

    // Handle key/value, add to state, etc.
    bool drop = false;
    if (!ParseInternalKey(key, &ikey)) {
      // Do not hide error keys
      current_user_key.clear();
      has_current_user_key = false;
      last_sequence_for_key = kMaxSequenceNumber;
    } else {
      if (!has_current_user_key ||
          user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
              0) {
        // First occurrence of this user key
        current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
        has_current_user_key = true;
        last_sequence_for_key = kMaxSequenceNumber;
      }

      if (last_sequence_for_key <= compact->smallest_snapshot) {
        // The entry seen just before this one for the same user key is newer
        // and has a sequence <= smallest_snapshot, so it hides this entry
        // from every snapshot. Drop it.
        drop = true;  // (A)
      } else if (ikey.type == kTypeDeletion &&
                 ikey.sequence <= compact->smallest_snapshot &&
                 compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
        // This deletion marker can be dropped because:
        // (1) no level beyond the output level holds this user key (this is
        //     what IsBaseLevelForKey() checks), so there is nothing older
        //     for the marker to hide;
        // (2) entries for this key in lower-numbered levels have larger
        //     sequence numbers and are unaffected;
        // (3) older entries for this key among this compaction's inputs have
        //     smaller sequence numbers and will be dropped in later
        //     iterations of this loop by rule (A).
        drop = true;
      }

      last_sequence_for_key = ikey.sequence;
    }

    if (!drop) {
      // Open output file if necessary
      if (compact->builder == nullptr) {
        status = OpenCompactionOutputFile(compact);
        if (!status.ok()) {
          break;
        }
      }
      // The first key written to this output file is its smallest key.
      if (compact->builder->NumEntries() == 0) {
        compact->current_output()->smallest.DecodeFrom(key);
      }
      // The input iterator is expected to yield keys in sorted order, so the
      // most recent key is the file's largest so far.
      compact->current_output()->largest.DecodeFrom(key);
      compact->builder->Add(key, input->value());

      // Close the output file once it reaches the maximum output size.
      if (compact->builder->FileSize() >=
          compact->compaction->MaxOutputFileSize()) {
        status = FinishCompactionOutputFile(compact, input);
        if (!status.ok()) {
          break;
        }
      }
    }

    input->Next();
  }

  // A shutdown interrupted the merge: report it as an error.
  if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
    status = Status::IOError("Deleting DB during compaction");
  }
  // Close the last, partially filled output file.
  if (status.ok() && compact->builder != nullptr) {
    status = FinishCompactionOutputFile(compact, input);
  }
  if (status.ok()) {
    status = input->status();
  }
  delete input;
  input = nullptr;

  CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros - imm_micros;
  for (int which = 0; which < 2; which++) {  // Both levels in this compaction.
    for (int i = 0; i < compact->compaction->num_input_files(which); i++) {  // Each input file.
      stats.bytes_read += compact->compaction->input(which, i)->file_size;
    }
  }
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    stats.bytes_written += compact->outputs[i].file_size;
  }

  mutex_.Lock();
  stats_[compact->compaction->level() + 1].Add(stats);

  if (status.ok()) {
    status = InstallCompactionResults(compact);  // Place outputs at level + 1.
  }
  if (!status.ok()) {
    RecordBackgroundError(status);
  }
  VersionSet::LevelSummaryStorage tmp;
  Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));
  return status;
}
建立壓縮輸出檔案及其 TableBuilder(OpenCompactionOutputFile),以及在輸出檔案寫完後收尾、同步並驗證該檔案(FinishCompactionOutputFile)
// Creates a new output table file for the compaction in progress.
//
// Acquires mutex_ itself, so the caller must not already hold it
// (DoCompactionWork calls this with the mutex released). Under the lock it
// allocates a file number, registers it in pending_outputs_, and appends an
// Output entry (with empty smallest/largest keys) to compact->outputs. It
// then creates the file and, on success, a TableBuilder writing to it.
// Returns the status of NewWritableFile.
Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
  assert(compact != nullptr);
  assert(compact->builder == nullptr);

  mutex_.Lock();
  const uint64_t file_number = versions_->NewFileNumber();
  // NOTE(review): presumably this keeps the in-progress file from being
  // treated as obsolete; confirm against the readers of pending_outputs_.
  pending_outputs_.insert(file_number);
  CompactionState::Output pending;
  pending.number = file_number;
  pending.smallest.Clear();
  pending.largest.Clear();
  compact->outputs.push_back(pending);
  mutex_.Unlock();

  // Make the output file
  const std::string fname = TableFileName(dbname_, file_number);
  Status s = env_->NewWritableFile(fname, &compact->outfile);
  if (!s.ok()) {
    return s;
  }
  compact->builder = new TableBuilder(options_, compact->outfile);
  return s;
}
// Finishes the current compaction output file.
//
// Steps:
//   1. Complete the table being built. If the input iterator has reported an
//      error, abandon the table instead.
//   2. Record the table's size in the current Output entry and add it to
//      compact->total_bytes.
//   3. Sync and close the file.
//   4. If the table holds at least one entry, open it through table_cache_
//      to verify that it is readable.
//
// Returns the first error from the input iterator, the table builder, the
// file sync/close, or the verification. Whatever the status, on return
// compact->builder and compact->outfile have been deleted and set to nullptr.
Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
                                          Iterator* input) {
  assert(compact != nullptr);
  assert(compact->outfile != nullptr);
  assert(compact->builder != nullptr);

  const uint64_t output_number = compact->current_output()->number;
  assert(output_number != 0);

  // Check for iterator errors: a table built from failing input is abandoned.
  Status s = input->status();
  const uint64_t current_entries = compact->builder->NumEntries();
  if (s.ok()) {
    s = compact->builder->Finish();
  } else {
    compact->builder->Abandon();
  }
  const uint64_t current_bytes = compact->builder->FileSize();
  compact->current_output()->file_size = current_bytes;
  compact->total_bytes += current_bytes;
  delete compact->builder;
  compact->builder = nullptr;

  // Finish and check for file errors
  if (s.ok()) {
    s = compact->outfile->Sync();
  }
  if (s.ok()) {
    s = compact->outfile->Close();
  }
  delete compact->outfile;
  compact->outfile = nullptr;

  if (s.ok() && current_entries > 0) {
    // Verify that the table is usable
    Iterator* iter =
        table_cache_->NewIterator(ReadOptions(), output_number, current_bytes);
    s = iter->status();
    delete iter;
    if (s.ok()) {
      // The counters are uint64_t and are passed as unsigned long long, so
      // they are printed with the unsigned conversion %llu.
      Log(options_.info_log, "Generated table #%llu@%d: %llu keys, %llu bytes",
          static_cast<unsigned long long>(output_number),
          compact->compaction->level(),
          static_cast<unsigned long long>(current_entries),
          static_cast<unsigned long long>(current_bytes));
    }
  }
  return s;
}
將壓縮結果寫入新版本(InstallCompactionResults):在版本中移除輸入檔案,並在下一層(level + 1)加入新產生的輸出檔案
// Installs the outcome of a finished compaction into the version set.
// Requires mutex_ to be held.
//
// Records the input-file deletions in the compaction's edit via
// AddInputDeletions(), adds every output file at level() + 1, and applies
// the edit with LogAndApply(). Returns the status of LogAndApply().
Status DBImpl::InstallCompactionResults(CompactionState* compact) {
  mutex_.AssertHeld();
  auto* const c = compact->compaction;
  Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
      c->num_input_files(0), c->level(), c->num_input_files(1), c->level() + 1,
      static_cast<long long>(compact->total_bytes));

  // Add compaction outputs
  auto* const edit = c->edit();
  c->AddInputDeletions(edit);

  const int output_level = c->level() + 1;
  for (const auto& out : compact->outputs) {
    edit->AddFile(output_level, out.number, out.file_size, out.smallest,
                  out.largest);
  }
  return versions_->LogAndApply(edit, &mutex_);
}