在Linux上leveldb的安裝和使用中我們寫了這麼一段測試代碼,内容以及輸出結果如下:
#include <iostream>
#include <string>
#include <assert.h>
#include "leveldb/db.h"
using namespace std;
int main(void)
{
leveldb::DB *db;
leveldb::Options options;
options.create_if_missing = true;
// open
leveldb::Status status = leveldb::DB::Open(options,"/tmp/testdb", &db);
assert(status.ok());
string key = "name";
string value = "chenqi";
// write
status = db->Put(leveldb::WriteOptions(), key, value);
assert(status.ok());
// read
status = db->Get(leveldb::ReadOptions(), key, &value);
assert(status.ok());
cout<<value<<endl;
// delete
status = db->Delete(leveldb::WriteOptions(), key);
assert(status.ok());
status = db->Get(leveldb::ReadOptions(),key, &value);
if(!status.ok()) {
cerr<<key<<" "<<status.ToString()<<endl;
} else {
cout<<key<<"==="<<value<<endl;
}
// close
delete db;
return 0;
}
chenqi
name NotFound:
Leveldb的讀資料入口為db檔案夾下db_impl.cc檔案中的DBImpl::Get函數,首先擷取目前的版本号,然後依次在三個資料源memtable,immutable table,和sst表中進行查找,傳回之前再判斷一下是否需要啟動Compact任務.
Status DBImpl::Get(const ReadOptions& options,
const Slice& key,
std::string* value) {
Status s;
MutexLock l(&mutex_);
SequenceNumber snapshot;
if (options.snapshot != NULL) {
snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
} else {
//擷取版本号
snapshot = versions_->LastSequence();
}
//三個查找源,memtable,immutable,sstable
MemTable* mem = mem_;
MemTable* imm = imm_;
Version* current = versions_->current();
mem->Ref();
if (imm != NULL) imm->Ref();
current->Ref();
bool have_stat_update = false;
Version::GetStats stats;
// Unlock while reading from files and memtables
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) { //在memtable中查找
// Done
} else if (imm != NULL && imm->Get(lkey, value, &s)) {//在imutable中查找
// Done
} else {
s = current->Get(options, lkey, value, &stats); //在磁盤檔案中查找,目前Version
have_stat_update = true;
}
mutex_.Lock();
}
//判斷是否需要排程Compact
if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != NULL) imm->Unref();
current->Unref();
return s;
}
首先,按照leveldb代碼的慣例線上鎖,然後生成一個SequenceNumber作為标記, 後續不管線程會不會被切出去, 結果都要相當于在這個時間點瞬間完成,memtable、immemtable以及Version都由于采用了引用計數, 是以要Ref().快照建立完了, 接下來的操作隻會有單純的讀, 可以把鎖暫時釋放.查詢的順序是先找memtable, 再immemtable, 最後是SSTable.這裡調用了db檔案夾下dbformat.cc中的LookupKey::LookupKey, 源碼内容如下:
LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) {
size_t usize = user_key.size();
size_t needed = usize + 13; // A conservative estimate
char* dst;
if (needed <= sizeof(space_)) {
dst = space_;
} else {
dst = new char[needed];
}
start_ = dst;
dst = EncodeVarint32(dst, usize + 8);
kstart_ = dst;
memcpy(dst, user_key.data(), usize);
dst += usize;
EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
dst += 8;
end_ = dst;
}
這個類主要的功能是把輸入的key轉換成用于查詢的key. 比如key是"Sherry", 實際在資料庫中的表達可能會是"6Sherry", 6是長度. 這樣比對key是否相等時速度會更快.LookupKey格式 = 長度 + key + SequenceNumber + type,這裡用了兩個tricks:
1.在棧上配置設定一個200長度的數組, 如果運作時發現長度不夠用再從堆上new一個, 可以極大避免記憶體配置設定
2.黑科技函數"EncodeVarint32", 一般key的長度不可能用滿32bit. 大量很短的Key卻要用32bit來描述長度無疑是很浪費的. 這個函數讓小數值用更少的空間, 代價是最糟要多花一位元組(8bit).EncodeVarint32的代碼出現在util檔案夾下的coding.cc檔案裡,源碼内容如下:
char* EncodeVarint32(char* dst, uint32_t v) {
// Operate on characters as unsigneds
unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
static const int B = 128;
if (v < (1<<7)) {
*(ptr++) = v;
} else if (v < (1<<14)) {
*(ptr++) = v | B;
*(ptr++) = v>>7;
} else if (v < (1<<21)) {
*(ptr++) = v | B;
*(ptr++) = (v>>7) | B;
*(ptr++) = v>>14;
} else if (v < (1<<28)) {
*(ptr++) = v | B;
*(ptr++) = (v>>7) | B;
*(ptr++) = (v>>14) | B;
*(ptr++) = v>>21;
} else {
*(ptr++) = v | B;
*(ptr++) = (v>>7) | B;
*(ptr++) = (v>>14) | B;
*(ptr++) = (v>>21) | B;
*(ptr++) = v>>28;
}
return reinterpret_cast<char*>(ptr);
}
現在回到DBImpl::Get函數,memtable和immutable table都是通過記憶體中的skiplist進行的,磁盤檔案的查找是通過db檔案夾下version_set.cc中Version::Get來進行的.源碼内容如下:
Status Version::Get(const ReadOptions& options,
const LookupKey& k,
std::string* value,
GetStats* stats) {
Slice ikey = k.internal_key();
Slice user_key = k.user_key();
const Comparator* ucmp = vset_->icmp_.user_comparator();
Status s;
stats->seek_file = NULL;
stats->seek_file_level = -1;
FileMetaData* last_file_read = NULL;
int last_file_read_level = -1;
// We can search level-by-level since entries never hop across
// levels. Therefore we are guaranteed that if we find data
// in an smaller level, later levels are irrelevant.
//查找使用者提供的key可能在的檔案,通過各個level的檔案的最小值,最大值來判斷
//按層查找
std::vector<FileMetaData*> tmp;
FileMetaData* tmp2;
for (int level = 0; level < config::kNumLevels; level++) {
size_t num_files = files_[level].size();
if (num_files == 0) continue;
// Get the list of files to search in this level
FileMetaData* const* files = &files_[level][0];
if (level == 0) {
// Level-0 files may overlap each other. Find all files that
// overlap user_key and process them in order from newest to oldest.
tmp.reserve(num_files);
for (uint32_t i = 0; i < num_files; i++) {
FileMetaData* f = files[i];
if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
tmp.push_back(f);
}
}
if (tmp.empty()) continue;
std::sort(tmp.begin(), tmp.end(), NewestFirst);
files = &tmp[0];
num_files = tmp.size();
} else {
// Binary search to find earliest index whose largest key >= ikey.
//查找使用者提供的key可能在的檔案
uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
if (index >= num_files) {
files = NULL;
num_files = 0;
} else {
tmp2 = files[index];
if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) {
// All of "tmp2" is past any data for user_key
files = NULL;
num_files = 0;
} else {
files = &tmp2;
num_files = 1;
}
}
}
for (uint32_t i = 0; i < num_files; ++i) {
if (last_file_read != NULL && stats->seek_file == NULL) {
// We have had more than one seek for this read. Charge the 1st file.
stats->seek_file = last_file_read;
stats->seek_file_level = last_file_read_level;
}
FileMetaData* f = files[i];
last_file_read = f;
last_file_read_level = level;
Saver saver;
saver.state = kNotFound;
saver.ucmp = ucmp;
saver.user_key = user_key;
saver.value = value;
//在table_cache中查找key對應的value
s = vset_->table_cache_->Get(options, f->number, f->file_size,
ikey, &saver, SaveValue);
if (!s.ok()) {
return s;
}
switch (saver.state) {
case kNotFound:
break; // Keep searching in other files
case kFound:
return s;
case kDeleted:
s = Status::NotFound(Slice()); // Use empty error message for speed
return s;
case kCorrupt:
s = Status::Corruption("corrupted key for ", user_key);
return s;
}
}
}
return Status::NotFound(Slice()); // Use an empty error message for speed
}
FindFile源碼内容如下:
int FindFile(const InternalKeyComparator& icmp,
const std::vector<FileMetaData*>& files,
const Slice& key) {
uint32_t left = 0;
uint32_t right = files.size();
while (left < right) {
uint32_t mid = (left + right) / 2;
const FileMetaData* f = files[mid];
if (icmp.InternalKeyComparator::Compare(f->largest.Encode(), key) < 0) {
// Key at "mid.largest" is < "target". Therefore all
// files at or before "mid" are uninteresting.
left = mid + 1;
} else {
// Key at "mid.largest" is >= "target". Therefore all files
// after "mid" are uninteresting.
right = mid;
}
}
return right;
}
Version::Get函數首先查找key可能存在的sst表,然後調用table_cache->Get進行查找。即對SSTable的查詢就是對table_cache_的查詢, 這個cache是不可取消的, 解決了什麼問題呢?
LevelDB的資料庫"檔案"是一個檔案夾, 裡面包含大量的檔案. 這是把複雜度甩鍋給作業系統的做法, 但很多系統資源是有限的. 比如, file handle(檔案句柄). 一個程式如果開了1W個file handle會浪費大量資源. 這裡做個LRU cache, 隻有常用的SSTable才會開一個活躍的file handle.
另外就是索引的問題. LSMT是沒有主索引的, 隻有在各個SSTable内有微縮版索引. 是以, 最最優的情況下也需要2次硬碟讀寫. 第一張SSTable就存着key, 先讀微型索引, 然後二分法找到具體位置, 再讀value.
TableCache把熱點SSTable的微型索引預先放在記憶體裡, 這樣隻要1次硬碟讀取就能取到key. 這個優化對于LSMT的資料庫來說尤為重要, 因為很可能會不止查詢一張SSTable. 情況會劣化非常快.
總結, TableCache既承擔管理資源(file handle)的作用, 又加速索引的讀取.
TableCache的實作在db檔案夾下table_cache.cc中,源碼内容如下:
Status TableCache::Get(const ReadOptions& options,
uint64_t file_number,
uint64_t file_size,
const Slice& k,
void* arg,
void (*saver)(void*, const Slice&, const Slice&)) {
Cache::Handle* handle = NULL;
Status s = FindTable(file_number, file_size, &handle);//查找table,沒有則建立table結構并插入table_cache
if (s.ok()) {
Table* t = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;
s = t->InternalGet(options, k, arg, saver); //在table中查找
cache_->Release(handle);
}
return s;
}
該函數流程很簡單,先從table_cache中擷取Table結構,沒有則建立Table結構加入table_cache,然後調用Table::InternalGet在具體的sst表中查找.需要注意的是,在util檔案夾下檢視cache.cc可以看到
virtual Handle* Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value)) {
const uint32_t hash = HashSlice(key);
return shard_[Shard(hash)].Insert(key, hash, value, charge, deleter);
}
virtual Handle* Lookup(const Slice& key) {
const uint32_t hash = HashSlice(key);
return shard_[Shard(hash)].Lookup(key, hash);
}
這個hash table做了兩遍hash, 先把key分片一遍, 然後再扔給真正的hash table cache(有鎖)去lookup.這麼做的邏輯是可以減少鎖的使用率和提升并發.
進一步檢視table_cache.cc中的Table::InternalGet函數:
Status Table::InternalGet(const ReadOptions& options, const Slice& k,
void* arg,
void (*saver)(void*, const Slice&, const Slice&)) {
Status s;
Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator);
iiter->Seek(k);//在索引中找,是否存在某個塊可能包含這個key
if (iiter->Valid()) {
Slice handle_value = iiter->value();
FilterBlockReader* filter = rep_->filter;
BlockHandle handle;
if (filter != NULL &&
handle.DecodeFrom(&handle_value).ok() &&
!filter->KeyMayMatch(handle.offset(), k)) {
// Not found
} else {
//在具體的block中找
Iterator* block_iter = BlockReader(this, options, iiter->value());
block_iter->Seek(k);
if (block_iter->Valid()) {
(*saver)(arg, block_iter->key(), block_iter->value());
}
s = block_iter->status();
delete block_iter;
}
}
if (s.ok()) {
s = iiter->status();
}
delete iiter;
return s;
}
Table::Get函數先在table的indexblock中查找該key所處的block,然後利用Bloom Filter來過濾,最後在具體的block中查找。在查找過程中使用了Iterator機制。
總體來說,leveldb中Get操作的流程可以用下圖來說明:
參考文獻:
1.http://blog.csdn.net/joeyon1985/article/details/47154249
2.http://masutangu.com/2017/06/leveldb_1/
3.https://zhuanlan.zhihu.com/jimderestaurant?topic=LevelDB