DB打開流程
使用leveldb的第一步是調用open接口,打開或者重新開機一個db,得到一個DB*,後續對db的操作通過DB*進行
static Status Open(const Options& options, const std::string& name,
DB** dbptr);
整個open的過程分為以下幾步:
1、如果是重新開機db,在恢複資料庫
2、如果是打開新的sb,則建立WAL和memtable
3、儲存目前Manifest檔案
4、删除多餘檔案,嘗試進行compaction
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
*dbptr = nullptr;
DBImpl* impl = new DBImpl(options, dbname);
impl->mutex_.Lock();
VersionEdit edit;
// Recover handles create_if_missing, error_if_exists
bool save_manifest = false;
// 1、如果是重新開機db,在恢複資料庫
Status s = impl->Recover(&edit, &save_manifest);
if (s.ok() && impl->mem_ == nullptr) {
// Create new log and a corresponding memtable.
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile;
// 2、如果是新打開的db, 建立WAL和memtable
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
&lfile);
if (s.ok()) {
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
impl->mem_ = new MemTable(impl->internal_comparator_);
impl->mem_->Ref();
}
}
// 3、儲存目前Manifest檔案
if (s.ok() && save_manifest) {
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
edit.SetLogNumber(impl->logfile_number_);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
// 4、删除多餘檔案,嘗試進行compaction
if (s.ok()) {
impl->RemoveObsoleteFiles();
impl->MaybeScheduleCompaction();
}
impl->mutex_.Unlock();
if (s.ok()) {
assert(impl->mem_ != nullptr);
*dbptr = impl;
} else {
delete impl;
}
return s;
}
恢複DB
1、首先檢查是否是新打開的DB,如果是,則從調用NewDB建立新的DB
2、如果是重新開機DB,那麼先根據Manifest檔案,讀取Manifest中每次版本的更改,恢複出目前的版本
3、從WAL中恢複memtable
Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
...
if (!env_->FileExists(CurrentFileName(dbname_))) {
if (options_.create_if_missing) {
Log(options_.info_log, "Creating DB %s since it was missing.",
dbname_.c_str());
// 1、原DB不存在,則是新打開的DB
s = NewDB();
if (!s.ok()) {
return s;
}
}
...
// 2、如果是重新開機DB,那麼先根據Manifest檔案,讀取Manifest中每次版本的更改,恢複出目前的版本
s = versions_->Recover(save_manifest);
...
// 3.從WAL中恢複memtable
std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
&max_sequence);
if (!s.ok()) {
return s;
}
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(logs[i]);
}
return Status::OK();
}
寫流程
KV寫入有3個接口:
// Set the database entry for "key" to "value". Returns OK on success,
// and a non-OK status on error.
// Note: consider setting options.sync = true.
// 單個KV寫入
virtual Status Put(const WriteOptions& options, const Slice& key,
const Slice& value) = 0;
// Remove the database entry (if any) for "key". Returns OK on
// success, and a non-OK status on error. It is not an error if "key"
// did not exist in the database.
// Note: consider setting options.sync = true.
// 删除K
virtual Status Delete(const WriteOptions& options, const Slice& key) = 0;
// Apply the specified updates to the database.
// Returns OK on success, non-OK on failure.
// Note: consider setting options.sync = true.
// 批量寫入
virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0;
這三個接口底層都是調用批量寫入的接口,能夠保證批量寫入時原子性的。
寫流程如下:
1、合并批量請求,等待請求開始或者,或者被合并到其他請求中執行完成
2、輪到自己處理時,檢查是否可寫,寫前預處理
3、合并寫請求
4、寫WAL
5、寫memtable
6、喚醒合并處理的寫請求,最後喚醒還沒有開始處理的第一個請求
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer w(&mutex_);
w.batch = updates;
w.sync = options.sync;
w.done = false;
// 1、合并批量請求,等待請求開始或者,或者被合并到其他請求中執行完成
MutexLock l(&mutex_);
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
if (w.done) {
return w.status;
}
// May temporarily unlock and wait.
// 2、輪到自己處理時,檢查是否可寫,寫前預處理
Status status = MakeRoomForWrite(updates == nullptr);
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
// 3、合并寫請求
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
{
mutex_.Unlock();
// 4、寫WAL
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
// 5、寫memtable
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}
if (write_batch == tmp_batch_) tmp_batch_->Clear();
versions_->SetLastSequence(last_sequence);
}
// 6、喚醒合并處理的寫請求,最後喚醒還沒有開始處理的第一個請求
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}
// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
return status;
}
其中,寫前預處理的步驟如下:
1、判斷是否需要停寫
2、如果memtable寫滿,則轉化為immemtable,等待背景compaction
3、建立新的WAL
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld();
assert(!writers_.empty());
bool allow_delay = !force;
Status s;
while (true) {
if (!bg_error_.ok()) {
// Yield previous error
s = bg_error_;
break;
} else if (allow_delay && versions_->NumLevelFiles(0) >=
config::kL0_SlowdownWritesTrigger) {
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each
// individual write by 1ms to reduce latency variance. Also,
// this delay hands over some CPU to the compaction thread in
// case it is sharing the same core as the writer.
// level0有太多的檔案,暫停1ms再去寫
mutex_.Unlock();
env_->SleepForMicroseconds(1000);
allow_delay = false; // Do not delay a single write more than once
mutex_.Lock();
} else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
// There is room in current memtable
// memtale空間夠
break;
} else if (imm_ != nullptr) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
// imemtable還沒來得及compaction,等待其compaction完成
Log(options_.info_log, "Current memtable full; waiting...\n");
background_work_finished_signal_.Wait();
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
// There are too many level-0 files.
Log(options_.info_log, "Too many L0 files; waiting...\n");
// level0檔案太多,等待背景compaction
background_work_finished_signal_.Wait();
} else {
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = nullptr;
// 建立新的WAL
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
break;
}
delete log_;
delete logfile_;
logfile_ = lfile;
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
imm_ = mem_;
// memtable轉變為imemtable,同時建立新的memtable
has_imm_.store(true, std::memory_order_release);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
MaybeScheduleCompaction();
}
}
return s;
}
讀流程
與讀有關的接口:
// If the database contains an entry for "key" store the
// corresponding value in *value and return OK.
//
// If there is no entry for "key" leave *value unchanged and return
// a status for which Status::IsNotFound() returns true.
//
// May return some other Status on an error.
virtual Status Get(const ReadOptions& options, const Slice& key,
std::string* value) = 0;
先從簡單的Get KV,Get接口可以讀指定版本的value,如果不指定,則讀最新版本的value,Get分為三步:
1、從memtable中查找
2、從imemtable中查找
3、從sstable中查找
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
MutexLock l(&mutex_);
// 指定版本号
SequenceNumber snapshot;
if (options.snapshot != nullptr) {
snapshot =
static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
} else {
snapshot = versions_->LastSequence();
}
// 給各個元件ref+1,防止在查找過程中因為compaction被釋放掉
MemTable* mem = mem_;
MemTable* imm = imm_;
Version* current = versions_->current();
mem->Ref();
if (imm != nullptr) imm->Ref();
current->Ref();
bool have_stat_update = false;
Version::GetStats stats;
// Unlock while reading from files and memtables
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
// 1、從memtable中查找
if (mem->Get(lkey, value, &s)) {
// Done
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
// 2、從imemtable中查找
// Done
} else {
// 3、從sstable中查找
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}
// 在查找過程中,左sstable的資訊統計,更新資訊之後,可能需要做一次compaction
if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
return s;
}
主要看下第二步:從sstable中查找KV,周遊每層level,對于level0,因為sstable有重疊,需要周遊每個sstable,對于level>0層,由于sstable沒有重疊且有序排列,通過二分查找找到包含key的sstable,然後在查找該sstable:
void Version::ForEachOverlapping(Slice user_key, Slice internal_key, void* arg,
bool (*func)(void*, int, FileMetaData*)) {
const Comparator* ucmp = vset_->icmp_.user_comparator();
// Search level-0 in order from newest to oldest.
std::vector<FileMetaData*> tmp;
tmp.reserve(files_[0].size());
for (uint32_t i = 0; i < files_[0].size(); i++) {
FileMetaData* f = files_[0][i];
if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
tmp.push_back(f);
}
}
if (!tmp.empty()) {
std::sort(tmp.begin(), tmp.end(), NewestFirst);
for (uint32_t i = 0; i < tmp.size(); i++) {
if (!(*func)(arg, 0, tmp[i])) {
return;
}
}
}
// Search other levels.
for (int level = 1; level < config::kNumLevels; level++) {
size_t num_files = files_[level].size();
if (num_files == 0) continue;
// Binary search to find earliest index whose largest key >= internal_key.
uint32_t index = FindFile(vset_->icmp_, files_[level], internal_key);
if (index < num_files) {
FileMetaData* f = files_[level][index];
if (ucmp->Compare(user_key, f->smallest.user_key()) < 0) {
// All of "f" is past any data for user_key
} else {
if (!(*func)(arg, level, f)) {
return;
}
}
}
}
}
快照
擷取和釋放快照接口如下:
// Return a handle to the current DB state. Iterators created with
// this handle will all observe a stable snapshot of the current DB
// state. The caller must call ReleaseSnapshot(result) when the
// snapshot is no longer needed.
virtual const Snapshot* GetSnapshot() = 0;
// Release a previously acquired snapshot. The caller must not
// use "snapshot" after this call.
virtual void ReleaseSnapshot(const Snapshot* snapshot) = 0;
快照在内部用連結清單管理,擷取和釋放快照就是在這個連結清單中添加或者删除節點,本質上是取快照時刻的最大seq,這樣,通過這個快照查詢資料時,隻要KV對中的seq<=這個快照的seq,就是這個快照的KV對
同時還保證在快照釋放前,即使sstable該KV對已經可以merge到更高seq的KV對中,但是由于快照的存在,不會被merge,防止該快照讀不到。
周遊DB
周遊接口,傳回一個Iterator,通過這個疊代器周遊DB
// Return a heap-allocated iterator over the contents of the database.
// The result of NewIterator() is initially invalid (caller must
// call one of the Seek methods on the iterator before using it).
//
// Caller should delete the iterator when it is no longer needed.
// The returned iterator should be deleted before this db is deleted.
virtual Iterator* NewIterator(const ReadOptions& options) = 0;
疊代整個DB,需要疊代memtable、imemtable和所有的sstable:
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
uint32_t seed;
Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
return NewDBIterator(this, user_comparator(), iter,
(options.snapshot != nullptr
? static_cast<const SnapshotImpl*>(options.snapshot)
->sequence_number()
: latest_snapshot),
seed);
}
這裡的兩個疊代器NewInternalIterator用來疊代内部的memtable、imemtable和所有的sstable,NewDBIterator是對NewInternalIterator的封裝。
NewInternalIterator會把内部的memtable、imemtable和所有的sstable的疊代器都收集起來,最後形成NewMergingIterator來對不同元件之間的key進行merge排序,對于sstable來說,level0層的每個sstable都需要建立疊代器,對于key不重疊的其他level的sstable,每一層建立一個NewConcatenatingIterator即可。
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
SequenceNumber* latest_snapshot,
uint32_t* seed) {
mutex_.Lock();
*latest_snapshot = versions_->LastSequence();
// Collect together all needed child iterators
std::vector<Iterator*> list;
list.push_back(mem_->NewIterator());
mem_->Ref();
if (imm_ != nullptr) {
list.push_back(imm_->NewIterator());
imm_->Ref();
}
versions_->current()->AddIterators(options, &list);
Iterator* internal_iter =
NewMergingIterator(&internal_comparator_, &list[0], list.size());
versions_->current()->Ref();
IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current());
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
*seed = ++seed_;
mutex_.Unlock();
return internal_iter;
}
NewConcatenatingIterator是個兩層疊代器,第一層疊代每層sstable,第二層疊代sstable的KV:
Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
int level) const {
return NewTwoLevelIterator(
new LevelFileNumIterator(vset_->icmp_, &files_[level]), &GetFileIterator,
vset_->table_cache_, options);
}
MergingIterator是多個疊代器的組合,每次疊代時,會便面内部所有的疊代器,然後選擇最小的key,與歸并排序的過程很類似:
void Seek(const Slice& target) override {
for (int i = 0; i < n_; i++) {
// 每個疊代器都Seek
children_[i].Seek(target);
}
// 找到最小值
FindSmallest();
direction_ = kForward;
}
void Next() override {
assert(Valid());
// Ensure that all children are positioned after key().
// If we are moving in the forward direction, it is already
// true for all of the non-current_ children since current_ is
// the smallest child and key() == current_->key(). Otherwise,
// we explicitly position the non-current_ children.
if (direction_ != kForward) {
for (int i = 0; i < n_; i++) {
IteratorWrapper* child = &children_[i];
if (child != current_) {
child->Seek(key());
if (child->Valid() &&
comparator_->Compare(key(), child->key()) == 0) {
child->Next();
}
}
}
direction_ = kForward;
}
current_->Next();
FindSmallest();
}
NewDBIterator是對MergingIterator的封裝,但是要處理被删除的key,這些key存在于memtable和sstable中,這些key在scan時不應該被顯示,是以當MergingIterator周遊到這種key時,NewDBIterator解析出key的類型,并且對key相等且swq小于該key的key進行跳過。