DB打开流程
使用leveldb的第一步是调用open接口,打开或者重启一个db,得到一个DB*,后续对db的操作通过DB*进行
static Status Open(const Options& options, const std::string& name,
DB** dbptr);
整个open的过程分为以下几步:
1、如果是重启db,在恢复数据库
2、如果是打开新的sb,则创建WAL和memtable
3、保存当前Manifest文件
4、删除多余文件,尝试进行compaction
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
*dbptr = nullptr;
DBImpl* impl = new DBImpl(options, dbname);
impl->mutex_.Lock();
VersionEdit edit;
// Recover handles create_if_missing, error_if_exists
bool save_manifest = false;
// 1、如果是重启db,在恢复数据库
Status s = impl->Recover(&edit, &save_manifest);
if (s.ok() && impl->mem_ == nullptr) {
// Create new log and a corresponding memtable.
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile;
// 2、如果是新打开的db, 创建WAL和memtable
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
&lfile);
if (s.ok()) {
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
impl->mem_ = new MemTable(impl->internal_comparator_);
impl->mem_->Ref();
}
}
// 3、保存当前Manifest文件
if (s.ok() && save_manifest) {
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
edit.SetLogNumber(impl->logfile_number_);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
// 4、删除多余文件,尝试进行compaction
if (s.ok()) {
impl->RemoveObsoleteFiles();
impl->MaybeScheduleCompaction();
}
impl->mutex_.Unlock();
if (s.ok()) {
assert(impl->mem_ != nullptr);
*dbptr = impl;
} else {
delete impl;
}
return s;
}
恢复DB
1、首先检查是否是新打开的DB,如果是,则从调用NewDB创建新的DB
2、如果是重启DB,那么先根据Manifest文件,读取Manifest中每次版本的更改,恢复出当前的版本
3、从WAL中恢复memtable
Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
...
if (!env_->FileExists(CurrentFileName(dbname_))) {
if (options_.create_if_missing) {
Log(options_.info_log, "Creating DB %s since it was missing.",
dbname_.c_str());
// 1、原DB不存在,则是新打开的DB
s = NewDB();
if (!s.ok()) {
return s;
}
}
...
// 2、如果是重启DB,那么先根据Manifest文件,读取Manifest中每次版本的更改,恢复出当前的版本
s = versions_->Recover(save_manifest);
...
// 3.从WAL中恢复memtable
std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
&max_sequence);
if (!s.ok()) {
return s;
}
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(logs[i]);
}
return Status::OK();
}
写流程
KV写入有3个接口:
// Set the database entry for "key" to "value". Returns OK on success,
// and a non-OK status on error.
// Note: consider setting options.sync = true.
// 单个KV写入
virtual Status Put(const WriteOptions& options, const Slice& key,
const Slice& value) = 0;
// Remove the database entry (if any) for "key". Returns OK on
// success, and a non-OK status on error. It is not an error if "key"
// did not exist in the database.
// Note: consider setting options.sync = true.
// 删除K
virtual Status Delete(const WriteOptions& options, const Slice& key) = 0;
// Apply the specified updates to the database.
// Returns OK on success, non-OK on failure.
// Note: consider setting options.sync = true.
// 批量写入
virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0;
这三个接口底层都是调用批量写入的接口,能够保证批量写入时原子性的。
写流程如下:
1、合并批量请求,等待请求开始或者,或者被合并到其他请求中执行完成
2、轮到自己处理时,检查是否可写,写前预处理
3、合并写请求
4、写WAL
5、写memtable
6、唤醒合并处理的写请求,最后唤醒还没有开始处理的第一个请求
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer w(&mutex_);
w.batch = updates;
w.sync = options.sync;
w.done = false;
// 1、合并批量请求,等待请求开始或者,或者被合并到其他请求中执行完成
MutexLock l(&mutex_);
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
if (w.done) {
return w.status;
}
// May temporarily unlock and wait.
// 2、轮到自己处理时,检查是否可写,写前预处理
Status status = MakeRoomForWrite(updates == nullptr);
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
// 3、合并写请求
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
{
mutex_.Unlock();
// 4、写WAL
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
// 5、写memtable
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}
if (write_batch == tmp_batch_) tmp_batch_->Clear();
versions_->SetLastSequence(last_sequence);
}
// 6、唤醒合并处理的写请求,最后唤醒还没有开始处理的第一个请求
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}
// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
return status;
}
其中,写前预处理的步骤如下:
1、判断是否需要停写
2、如果memtable写满,则转化为immemtable,等待后台compaction
3、创建新的WAL
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld();
assert(!writers_.empty());
bool allow_delay = !force;
Status s;
while (true) {
if (!bg_error_.ok()) {
// Yield previous error
s = bg_error_;
break;
} else if (allow_delay && versions_->NumLevelFiles(0) >=
config::kL0_SlowdownWritesTrigger) {
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each
// individual write by 1ms to reduce latency variance. Also,
// this delay hands over some CPU to the compaction thread in
// case it is sharing the same core as the writer.
// level0有太多的文件,暂停1ms再去写
mutex_.Unlock();
env_->SleepForMicroseconds(1000);
allow_delay = false; // Do not delay a single write more than once
mutex_.Lock();
} else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
// There is room in current memtable
// memtale空间够
break;
} else if (imm_ != nullptr) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
// imemtable还没来得及compaction,等待其compaction完成
Log(options_.info_log, "Current memtable full; waiting...\n");
background_work_finished_signal_.Wait();
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
// There are too many level-0 files.
Log(options_.info_log, "Too many L0 files; waiting...\n");
// level0文件太多,等待后台compaction
background_work_finished_signal_.Wait();
} else {
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = nullptr;
// 创建新的WAL
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
break;
}
delete log_;
delete logfile_;
logfile_ = lfile;
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
imm_ = mem_;
// memtable转变为imemtable,同时创建新的memtable
has_imm_.store(true, std::memory_order_release);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
MaybeScheduleCompaction();
}
}
return s;
}
读流程
与读有关的接口:
// If the database contains an entry for "key" store the
// corresponding value in *value and return OK.
//
// If there is no entry for "key" leave *value unchanged and return
// a status for which Status::IsNotFound() returns true.
//
// May return some other Status on an error.
virtual Status Get(const ReadOptions& options, const Slice& key,
std::string* value) = 0;
先从简单的Get KV,Get接口可以读指定版本的value,如果不指定,则读最新版本的value,Get分为三步:
1、从memtable中查找
2、从imemtable中查找
3、从sstable中查找
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
MutexLock l(&mutex_);
// 指定版本号
SequenceNumber snapshot;
if (options.snapshot != nullptr) {
snapshot =
static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
} else {
snapshot = versions_->LastSequence();
}
// 给各个组件ref+1,防止在查找过程中因为compaction被释放掉
MemTable* mem = mem_;
MemTable* imm = imm_;
Version* current = versions_->current();
mem->Ref();
if (imm != nullptr) imm->Ref();
current->Ref();
bool have_stat_update = false;
Version::GetStats stats;
// Unlock while reading from files and memtables
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
// 1、从memtable中查找
if (mem->Get(lkey, value, &s)) {
// Done
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
// 2、从imemtable中查找
// Done
} else {
// 3、从sstable中查找
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}
// 在查找过程中,左sstable的信息统计,更新信息之后,可能需要做一次compaction
if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
return s;
}
主要看下第二步:从sstable中查找KV,遍历每层level,对于level0,因为sstable有重叠,需要遍历每个sstable,对于level>0层,由于sstable没有重叠且有序排列,通过二分查找找到包含key的sstable,然后在查找该sstable:
void Version::ForEachOverlapping(Slice user_key, Slice internal_key, void* arg,
bool (*func)(void*, int, FileMetaData*)) {
const Comparator* ucmp = vset_->icmp_.user_comparator();
// Search level-0 in order from newest to oldest.
std::vector<FileMetaData*> tmp;
tmp.reserve(files_[0].size());
for (uint32_t i = 0; i < files_[0].size(); i++) {
FileMetaData* f = files_[0][i];
if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
tmp.push_back(f);
}
}
if (!tmp.empty()) {
std::sort(tmp.begin(), tmp.end(), NewestFirst);
for (uint32_t i = 0; i < tmp.size(); i++) {
if (!(*func)(arg, 0, tmp[i])) {
return;
}
}
}
// Search other levels.
for (int level = 1; level < config::kNumLevels; level++) {
size_t num_files = files_[level].size();
if (num_files == 0) continue;
// Binary search to find earliest index whose largest key >= internal_key.
uint32_t index = FindFile(vset_->icmp_, files_[level], internal_key);
if (index < num_files) {
FileMetaData* f = files_[level][index];
if (ucmp->Compare(user_key, f->smallest.user_key()) < 0) {
// All of "f" is past any data for user_key
} else {
if (!(*func)(arg, level, f)) {
return;
}
}
}
}
}
快照
获取和释放快照接口如下:
// Return a handle to the current DB state. Iterators created with
// this handle will all observe a stable snapshot of the current DB
// state. The caller must call ReleaseSnapshot(result) when the
// snapshot is no longer needed.
virtual const Snapshot* GetSnapshot() = 0;
// Release a previously acquired snapshot. The caller must not
// use "snapshot" after this call.
virtual void ReleaseSnapshot(const Snapshot* snapshot) = 0;
快照在内部用链表管理,获取和释放快照就是在这个链表中添加或者删除节点,本质上是取快照时刻的最大seq,这样,通过这个快照查询数据时,只要KV对中的seq<=这个快照的seq,就是这个快照的KV对
同时还保证在快照释放前,即使sstable该KV对已经可以merge到更高seq的KV对中,但是由于快照的存在,不会被merge,防止该快照读不到。
遍历DB
遍历接口,返回一个Iterator,通过这个迭代器遍历DB
// Return a heap-allocated iterator over the contents of the database.
// The result of NewIterator() is initially invalid (caller must
// call one of the Seek methods on the iterator before using it).
//
// Caller should delete the iterator when it is no longer needed.
// The returned iterator should be deleted before this db is deleted.
virtual Iterator* NewIterator(const ReadOptions& options) = 0;
迭代整个DB,需要迭代memtable、imemtable和所有的sstable:
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
uint32_t seed;
Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
return NewDBIterator(this, user_comparator(), iter,
(options.snapshot != nullptr
? static_cast<const SnapshotImpl*>(options.snapshot)
->sequence_number()
: latest_snapshot),
seed);
}
这里的两个迭代器NewInternalIterator用来迭代内部的memtable、imemtable和所有的sstable,NewDBIterator是对NewInternalIterator的封装。
NewInternalIterator会把内部的memtable、imemtable和所有的sstable的迭代器都收集起来,最后形成NewMergingIterator来对不同组件之间的key进行merge排序,对于sstable来说,level0层的每个sstable都需要创建迭代器,对于key不重叠的其他level的sstable,每一层创建一个NewConcatenatingIterator即可。
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
SequenceNumber* latest_snapshot,
uint32_t* seed) {
mutex_.Lock();
*latest_snapshot = versions_->LastSequence();
// Collect together all needed child iterators
std::vector<Iterator*> list;
list.push_back(mem_->NewIterator());
mem_->Ref();
if (imm_ != nullptr) {
list.push_back(imm_->NewIterator());
imm_->Ref();
}
versions_->current()->AddIterators(options, &list);
Iterator* internal_iter =
NewMergingIterator(&internal_comparator_, &list[0], list.size());
versions_->current()->Ref();
IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current());
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
*seed = ++seed_;
mutex_.Unlock();
return internal_iter;
}
NewConcatenatingIterator是个两层迭代器,第一层迭代每层sstable,第二层迭代sstable的KV:
Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
int level) const {
return NewTwoLevelIterator(
new LevelFileNumIterator(vset_->icmp_, &files_[level]), &GetFileIterator,
vset_->table_cache_, options);
}
MergingIterator是多个迭代器的组合,每次迭代时,会便面内部所有的迭代器,然后选择最小的key,与归并排序的过程很类似:
void Seek(const Slice& target) override {
for (int i = 0; i < n_; i++) {
// 每个迭代器都Seek
children_[i].Seek(target);
}
// 找到最小值
FindSmallest();
direction_ = kForward;
}
void Next() override {
assert(Valid());
// Ensure that all children are positioned after key().
// If we are moving in the forward direction, it is already
// true for all of the non-current_ children since current_ is
// the smallest child and key() == current_->key(). Otherwise,
// we explicitly position the non-current_ children.
if (direction_ != kForward) {
for (int i = 0; i < n_; i++) {
IteratorWrapper* child = &children_[i];
if (child != current_) {
child->Seek(key());
if (child->Valid() &&
comparator_->Compare(key(), child->key()) == 0) {
child->Next();
}
}
}
direction_ = kForward;
}
current_->Next();
FindSmallest();
}
NewDBIterator是对MergingIterator的封装,但是要处理被删除的key,这些key存在于memtable和sstable中,这些key在scan时不应该被显示,所以当MergingIterator遍历到这种key时,NewDBIterator解析出key的类型,并且对key相等且swq小于该key的key进行跳过。