文章目錄
- 前言
- Rocksdb寫流程圖
- WAL 原理分析
- 概述
- 檔案格式
- 檢視WAL的工具
- 建立WAL
- 清理WAL
- MANIFEST原理分析
- 概述
- 檢視MANIFEST的工具
- 建立 及 清除 MANIFEST
- 檔案内容
- CcolumnFamily 詳解
- 概述
- API介紹
- 核心資料結構
- 建立以及删除
- MEMTABLE 實作
- 概述
- 實作
- Rocksdb寫入邏輯
- 概述
- 實作
- 總結
- 關于寫的一些參數調優
- 讀流程
- 讀流程圖
- 概述
- memtable 源碼分析
- SST 源碼分析
- 參考資料
前言
Rocksdb作為當下nosql中性能的代表被各個存儲元件(mysql,tikv,pmdk,bluestore)作為存儲引擎底座,其基于LSM tree的核心存儲結構(将随機寫通過資料結構轉化為順序寫)來提供高性能的寫吞吐時保證了讀性能。同時大量的并發性配置來降低compaction的影響。且最近社群也推出了key-value分離存儲的blobdb,在大value場景的寫性能又有了進一步的提升。完善且全面的各種語言的SDK和社群,讓rocskdb迅速占領存儲引擎的核心區域。
是以為了提升存儲引擎的核心開發能力,特此針對rocksdb的核心實作學習研究。
通過近期的學習了解,将最終的成果做一個固化總結,友善後續的持續複習。
Rocksdb寫流程圖
涉及到的幾個核心檔案:
- WAL 儲存目前rocksdb的memtable中的檔案資訊,當memtable --》 immutable memtable中的資料刷到L0之後即之前的會被删除 — 即于DB目錄下的 00012.log
- MANIFEST 儲存目前db的狀态資訊(類似于快照),主要是SST檔案的各個版本資訊(當sst檔案被改動,即會生成對應的versionEdit,并觸發sync寫manifest檔案),用于異常斷電後恢複— 即 MANIFEST-000001 的檔案
- CURRENT 記錄目前最新的manifest檔案編号
- Memtable 常駐于記憶體中,在wal寫之後,接受具體的key-value資料。每個memtable大小以及個數都有指定的參數進行控制,write_buffer_size 表示memtable的大小,max_write_buffer_number表示記憶體中最多可以同時存在多少個memtable的個數
- Immutable memtable ,當memtable被寫滿之後會生成一個新的memtable繼續接受IO,舊的memtable就會變成 immutable memtable ,隻讀的狀态,且開始flush到磁盤的L0。
- SST檔案,核心key-value的存儲檔案。DB目錄下的000023.sst形态。
分析IO過程主要是通過rocksdb的幾個接口:
-
rocksdb::Status status = rocksdb::DB::Open(options, "/tmp/testdb", &db);
-
rocksdb::Status s = db->Get(rocksdb::ReadOptions(), key1, &pinnable_val);
-
詳細的接口可以參考basic operationsrocksdb::Status s = db->Put(rocksdb::WriteOptions(), key2, value);
在介紹詳細的寫流程之前需要先整體得了解流程圖中的各個重要檔案的作用,以及其基本的實作過程。
WAL 原理分析
概述
在RocksDB中每一次資料的更新都會涉及到兩個結構,一個是記憶體中的memtable(後續會重新整理到磁盤成為SST),第二個是WAL(WriteAheadLog)。
WAL主要的功能是當RocksDB異常退出後,能夠恢複出錯前的記憶體中(memtable)資料,是以RocksDB預設是每次使用者寫都會重新整理資料到WAL. 每次當目前WAL對應的記憶體資料(memtable)重新整理到磁盤之後,都會建立一個WAL.
所有的WAL檔案都是儲存在WAL目錄(options.wal_dir),為了保證資料的狀态,所有的WAL檔案的名字都是按照順序的(log_number).
檔案格式
WAL檔案由一堆變長的record組成,而每個record是由kBlockSize(32k)來分組,比如某一個record大于kBlockSize的話,他就會被切分為多個record(通過type來判斷).
+-----+-------------+--+----+----------+------+-- ... ----+
File | r0 | r1 |P | r2 | r3 | r4 | |
+-----+-------------+--+----+----------+------+-- ... ----+
<--- kBlockSize ------>|<-- kBlockSize ------>|
rn = variable size records
P = Padding
record的格式如下:
+---------+-----------+-----------+--- ... ---+
|CRC (4B) | Size (2B) | Type (1B) | Payload |
+---------+-----------+-----------+--- ... ---+
CRC = 32bit hash computed over the payload using CRC
Size = Length of the payload data
Type = Type of record
(kZeroType, kFullType, kFirstType, kLastType, kMiddleType )
The type is used to group a bunch of records together to represent
blocks that are larger than kBlockSize
Payload = Byte stream as long as specified by the payload size
最後是WAL的payload的格式,其中是一批操作的集合,從record中可以看出wal的寫入是一批一批寫入得。
// WriteBatch::rep_ :=
// sequence: fixed64
// count: fixed32
// data: record[count]
// record :=
// kTypeValue varstring varstring
// kTypeDeletion varstring
// kTypeSingleDeletion varstring
// kTypeMerge varstring varstring
// kTypeColumnFamilyValue varint32 varstring varstring
// kTypeColumnFamilyDeletion varint32 varstring varstring
// kTypeColumnFamilySingleDeletion varint32 varstring varstring
// kTypeColumnFamilyMerge varint32 varstring varstring
// kTypeBeginPrepareXID varstring
// kTypeEndPrepareXID
// kTypeCommitXID varstring
// kTypeRollbackXID varstring
// kTypeNoop
// varstring :=
// len: varint32
// data: uint8[len]
上面的格式中可以看到有一個sequence的值,這個值主要用來表示WAL中操作的時序,這裡要注意每次sequence的更新是按照WriteBatch來更新的.
Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, uint64_t* log_used,
bool need_log_sync, bool need_log_dir_sync,
SequenceNumber sequence) {
Status status;
.........................................
WriteBatchInternal::SetSequence(merged_batch, sequence);
檢視WAL的工具
這裡我是在mac上直接安裝的rocksdb(
brew install rocksdb
)的工具來列印的,如果是在标準linux作業系統,編譯好rocksdb代碼之後會有
ldb
工具,兩者是同一個工具
bogon:rocksdb-master baron$ rocksdb_ldb dump_wal --walfile=./000285.log --header
Sequence,Count,ByteSize,Physical Offset,Key(s)
1255,1,110,0,PUT(1) : 0x00000006000000000000013C
以上列印的是一個reocord,且目前record隻有一個操作,如果有多個是一個bactch,那麼也會添加到同一個record之中的。
建立WAL
首先是一個新的DB被打開的時候會建立一個WAL;
Status DB::Open(const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
......................................................................
s = impl->Recover(column_families);
if (s.ok()) {
uint64_t new_log_number = impl->versions_->NewFileNumber();
.............................................
s = NewWritableFile(
impl->immutable_db_options_.env,
LogFileName(impl->immutable_db_options_.wal_dir, new_log_number),
&lfile, opt_env_options);
................................................
第二個情況是當一個
CF(column family)
被重新整理到磁盤之後,也會建立新的WAL,這種情況下建立WAL是用過SwitchMemtable函數. 這個函數主要是用來切換memtable,也就是做flush之前的切換(生成新的memtable,然後把老的重新整理到磁盤)
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
..................................................
{
if (creating_new_log) {
...............................................
} else {
s = NewWritableFile(
env_, LogFileName(immutable_db_options_.wal_dir, new_log_number),
&lfile, opt_env_opt);
}
.................................
}
...............................................
return s;
}
通過上面的兩個函數我們可以看到每次建立WAL都會有一個new_log_number,這個值就是對應的WAL的檔案名字首,可以看到每次生成新的log_number, 基本都會調用NewFileNumber函數.這裡注意如果option設定了recycle_log_file_num的話,是有可能重用老的log_number的。我們先來看下NewFileNumber函數:
uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); }
可以看到函數實作很簡單,就是每次log_number加一,是以一般來說WAL的檔案格式都是類似0000001.log這樣子
清理WAL
WAL的删除隻有當包含在此WAL中的所有的資料都已經被持久化為SST之後(也有可能會延遲删除,因為有時候需要master發送transcation Log到slave來回放). 先來看DBImpl::FIndObsoleteFiles函數,這個函數很長,我們隻關注對應的WAL部分,這裡邏輯很簡單,就是周遊所有的WAL,然後找出log_number小于目前min_log_number的檔案然後加入到對應的結構(log_delete_files).
if (!alive_log_files_.empty() && !logs_.empty()) {
uint64_t min_log_number = job_context->log_number;
size_t num_alive_log_files = alive_log_files_.size();
// find newly obsoleted log files
while (alive_log_files_.begin()->number < min_log_number) {
auto& earliest = *alive_log_files_.begin();
if (immutable_db_options_.recycle_log_file_num >
log_recycle_files.size()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"adding log %" PRIu64 " to recycle list\n",
earliest.number);
log_recycle_files.push_back(earliest.number);
} else {
job_context->log_delete_files.push_back(earliest.number);
}
.....................................................................
}
while (!logs_.empty() && logs_.front().number < min_log_number) {
auto& log = logs_.front();
if (log.getting_synced) {
log_sync_cv_.Wait();
// logs_ could have changed while we were waiting.
continue;
}
logs_to_free_.push_back(log.ReleaseWriter());
{
InstrumentedMutexLock wl(&log_write_mutex_);
logs_.pop_front();
}
}
// Current log cannot be obsolete.
assert(!logs_.empty());
}
這裡可以看到有兩個核心的資料結構alive_log_files和logs_,他們的差別就是前一個表示有寫入的WAL,而後一個則是包括了所有的WAL(比如open一個DB,而沒有寫入資料,此時也會生成WAL).
最終删除WAL的操作是在DBImpl::DeleteObsoleteFileImpl這個函數,而WAL删除不會單獨觸發,而是和temp/sst這類檔案一起被删除的(PurgeObsoleteFiles).
MANIFEST原理分析
概述
在RocksDB中MANIFEST儲存了存儲引擎的内部的一些狀态中繼資料,簡單來說當系統異常重新開機,或者程式異常被退出之後,RocksDB需要有一種機制能夠恢複到一個一緻性的狀态, 而這個一緻性的狀态就是靠MANIFEST來保證的.
MANIFEST在RocksDB中是一個單獨的檔案,而這個檔案所儲存的資料基本是來自于VersionEdit這個結構.
MANIFEST包含了兩個檔案,一個log檔案一個包含最新MANIFEST檔案名的檔案,Manifest的log檔案名是這樣 MANIFEST-(seqnumber),這個seq會一直增長.隻有當 超過了指定的大小之後,MANIFEST會重新整理一個新的檔案,當新的檔案重新整理到磁盤(并且檔案名更新)之後,老的檔案會被删除掉.這裡可以認為每一次MANIFEST的更新都代表一次snapshot.
下面就是MANIFEST的基本檔案組成:
MANIFEST = { CURRENT, MANIFEST-<seq-no>* }
CURRENT = File pointer to the latest manifest log
MANIFEST-<seq no> = Contains snapshot of RocksDB state and subsequent modifications
在RocksDB中任意時間存儲引擎的狀态都會儲存為一個Version(也就是SST的集合),而每次對Version的修改都是一個VersionEdit,而最終這些VersionEdit就是 組成manifest-log檔案的内容.
下面就是MANIFEST的log檔案的基本構成:
version-edit = Any RocksDB state change
version = { version-edit* }
manifest-log-file = { version, version-edit* }
= { version-edit* }
檢視MANIFEST的工具
依舊使用
ldb
工具,不過我用的是mac自動安裝的
rocksdb_ldb
工具
bogon:rocksdb-master baron$ rocksdb_ldb manifest_dump --path=./MANIFEST-000001
--------------- Column family "default" (ID 0) --------------
log number: 13
comparator: <NO COMPARATOR>
--- level 0 --- version# 0 ---
11:80860['
--------------- Column family "__system__" (ID 1) --------------
log number: 24
comparator: RocksDB_SE_v3.10
--- level 0 --- version# 1 ---
25:1094['
next_file_number 27 last_sequence 190 prev_log_number 0 max_column_family 1
建立 及 清除 MANIFEST
整個MANIFEST涉及到三個資料結構分别是VersionEdit/Version/VersionSet,其中前兩個上面已經有介紹,而最後一個VersionSet顧名思義表示一堆Version的集合,其實就是 記錄了各個版本的資訊用來管理整個Version.
class VersionSet {
public:
VersionSet(const std::string& dbname, const ImmutableDBOptions* db_options,
const EnvOptions& env_options, Cache* table_cache,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller);
~VersionSet();
.......................
private:
struct ManifestWriter;
friend class Version;
.................................
// Opened lazily
unique_ptr<log::Writer> descriptor_log_;
// generates a increasing version number for every new version
uint64_t current_version_number_;
// Queue of writers to the manifest file
std::deque<ManifestWriter*> manifest_writers_;
..........................................
這裡最關鍵的兩個資料結構是descriptor_log_和manifest_writers_,前一個表示了目前manifest-log檔案,後一個表示需要寫入到manifest-log檔案中的内容.
下面就是ManifestWriter的結構,可以看到其中包含了一個VersionEdit的數組,這個數組就是即将要寫入到manifest-log檔案中的内容.
// this is used to batch writes to the manifest file
struct VersionSet::ManifestWriter {
Status status;
bool done;
InstrumentedCondVar cv;
ColumnFamilyData* cfd;
const autovector<VersionEdit*>& edit_list;
explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
const autovector<VersionEdit*>& e)
: done(false), cv(mu), cfd(_cfd), edit_list(e) {}
};
然後我們來看RocksDB如何來建立以及寫入檔案,下面所有的代碼都是包含在VersionSet::LogAndApply這個函數中.
首先在每次LogAndApply的時候都會建立一個新的ManifesWriter加入到manifest_writers_隊列中.這裡隻有當之前儲存在隊列中 的所有Writer都寫入完畢之後才會加入到隊列,否則就會等待 (後續會在詳細的寫流程中說明writer的作用)
// queue our request
ManifestWriter w(mu, column_family_data, edit_list);
manifest_writers_.push_back(&w);
while (!w.done && &w != manifest_writers_.front()) {
w.cv.Wait();
}
if (w.done) {
return w.status;
}
接下來就是儲存對應的資料到batch_edits中(manifest_writers_).
autovector<VersionEdit*> batch_edits;
....................................
if (w.edit_list.front()->IsColumnFamilyManipulation()) {
// no group commits for column family add or drop
LogAndApplyCFHelper(w.edit_list.front());
batch_edits.push_back(w.edit_list.front());
} else {
v = new Version(column_family_data, this, current_version_number_++);
........................................................
for (const auto& writer : manifest_writers_) {
if (writer->edit_list.front()->IsColumnFamilyManipulation() ||
writer->cfd->GetID() != column_family_data->GetID()) {
break;
}
last_writer = writer;
for (const auto& edit : writer->edit_list) {
...........................................
batch_edits.push_back(edit);
}
}
builder->SaveTo(v->storage_info());
}
然後就是建立新的manifest-log檔案的邏輯.這裡可以看到要麼是第一次進入,要麼檔案大小大于option對應的值才會建立新的檔案
if (!descriptor_log_ ||
manifest_file_size_ > db_options_->max_manifest_file_size) {
pending_manifest_file_number_ = NewFileNumber();
batch_edits.back()->SetNextFile(next_file_number_.load());
new_descriptor_log = true;
} else {
pending_manifest_file_number_ = manifest_file_number_;
}
if (new_descriptor_log) {
// if we're writing out new snapshot make sure to persist max column family
if (column_family_set_->GetMaxColumnFamily() > 0) {
w.edit_list.front()->SetMaxColumnFamily(
column_family_set_->GetMaxColumnFamily());
}
}
如果需要建立新的manifest-log檔案,則開始構造對應的檔案資訊并建立檔案.
if (new_descriptor_log) {
// create manifest file
ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
pending_manifest_file_number_);
unique_ptr<WritableFile> descriptor_file;
EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_);
s = NewWritableFile(
env_, DescriptorFileName(dbname_, pending_manifest_file_number_),
&descriptor_file, opt_env_opts);
if (s.ok()) {
descriptor_file->SetPreallocationBlockSize(
db_options_->manifest_preallocation_size);
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(descriptor_file), opt_env_opts));
descriptor_log_.reset(
new log::Writer(std::move(file_writer), 0, false));
s = WriteSnapshot(descriptor_log_.get());
}
}
開始寫入對應的VersionEdit的record到檔案(最後我們會來看這個record的構成),這裡看到寫入完成後會調用Sync來重新整理内容到磁盤,等這些操作都做完之後,則會更新Current檔案也就是更新最新的manifest-log檔案名到CURRENT檔案中.
for (auto& e : batch_edits) {
std::string record;
if (!e->EncodeTo(&record)) {
s = Status::Corruption(
"Unable to Encode VersionEdit:" + e->DebugString(true));
break;
}
TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord",
rocksdb_kill_odds * REDUCE_ODDS2);
s = descriptor_log_->AddRecord(record);
if (!s.ok()) {
break;
}
}
if (s.ok()) {
s = SyncManifest(env_, db_options_, descriptor_log_->file());
}
.............................
// If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it.
if (s.ok() && new_descriptor_log) {
s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_,
db_directory);
}
CURRENT檔案更新完畢之後,就可以删除老的mainfest檔案了.
// Append the old mainfest file to the obsolete_manifests_ list to be deleted
// by PurgeObsoleteFiles later.
if (s.ok() && new_descriptor_log) {
obsolete_manifests_.emplace_back(
DescriptorFileName("", manifest_file_number_));
}
最後則是更新manifest_writers_隊列,喚醒之前阻塞的内容.
// wake up all the waiting writers
while (true) {
ManifestWriter* ready = manifest_writers_.front();
manifest_writers_.pop_front();
if (ready != &w) {
ready->status = s;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}
// Notify new head of write queue
if (!manifest_writers_.empty()) {
manifest_writers_.front()->cv.Signal();
}
檔案内容
具體的檔案格式可以參考官方wikiMANIFEST
比如compaction過程中針對某個sst檔案的edit結果會記錄到MANIFEST,格式如下:
+--------------+-------------+--------------+------------+----------------+--------------+----------------+----------------+
| kNewFile4 | level | file number | file size | smallest_key | largest_key | smallest_seqno | largest_seq_no |
+--------------+-------------+--------------+------------+----------------+--------------+----------------+----------------+
|<-- var32 -->|<-- var32 -->|<-- var64 -->|<- var64 ->|<-- String -->|<-- String -->|<-- var64 -->|<-- var64 -->|
+--------------+------------------+---------+------+----------------+--------------------+---------+------------+
| CustomTag1 | Field 1 size n1 | field1 | ... | CustomTag(m) | Field m size n(m) | field(m)| kTerminate |
+--------------+------------------+---------+------+----------------+--------------------+---------+------------+
<-- var32 -->|<-- var32 -->|<- n1 ->| |<-- var32 - ->|<-- var32 -->|<- n(m)->|<- var32 -->|
通過上面的分析我們可以看到最終是通過VersionEdit::EncodeTo來序列化資料,而VersionEdit主要包含了比如log_number/last_sequence_這些字段,這裡還有一個比較關鍵的資訊被序列化了,那就是FileMetaData,也就是SST檔案的元資訊.
struct FileMetaData {
FileDescriptor fd;
InternalKey smallest; // Smallest internal key served by table
InternalKey largest; // Largest internal key served by table
SequenceNumber smallest_seqno; // The smallest seqno in this file
SequenceNumber largest_seqno; // The largest seqno in this file
.........................................
// File size compensated by deletion entry.
// This is updated in Version::UpdateAccumulatedStats() first time when the
// file is created or loaded. After it is updated (!= 0), it is immutable.
uint64_t compensated_file_size;
// These values can mutate, but they can only be read or written from
// single-threaded LogAndApply thread
uint64_t num_entries; // the number of entries.
uint64_t num_deletions; // the number of deletion entries.
uint64_t raw_key_size; // total uncompressed key size.
uint64_t raw_value_size; // total uncompressed value size.
int refs; // Reference count
bool being_compacted; // Is this file undergoing compaction?
bool init_stats_from_file; // true if the data-entry stats of this file
// has initialized from file.
bool marked_for_compaction; // True if client asked us nicely to compact this
// file.
};
CcolumnFamily 詳解
概述
在RocksDB 3.0中加入了Column Family特性,加入這個特性之後,每一個KV對都會關聯一個Column Family,其中預設的Column Family是 “default”. Column Family主要是提供給RocksDB一個邏輯的分區.從實作上來看不同的Column Family共享WAL,而都有自己的Memtable和SST.這就意味着我們可以很 快速已經友善的設定不同的屬性給不同的Column Family以及快速删除對應的Column Family.
API介紹
首先是建立Column Family,這裡注意我們可以通過兩種方式來建立Column Family:
- 在Open DB的時候通過傳遞需要建立的Column Family
- 當DB建立并打開之後, 通過直接的CreateColumnFamily來建立Column Family
以上建立調用的接口如下:
DB::Open(const DBOptions& db_options, const std::string& name, const std::vector<ColumnFamilyDescriptor>& column_families, std::vector<ColumnFamilyHandle*>* handles, DB** dbptr);
DB::CreateColumnFamily(const ColumnFamilyOptions& options, const std::string& column_family_name, ColumnFamilyHandle** handle);
這裡可以看到不管是哪一種方式最終都會傳回一個ColumnFamilyHandle給調用者來使用.
然後就是删除Column Family的方式,這裡很簡單就是傳遞之前建立的ColumnFamilyHandle給RocksDB,然後用以删除.
DropColumnFamily(ColumnFamilyHandle* column_family);
核心資料結構
所有的Column Family都是通過一個叫做ColumnFamilySet的結構來管理的,而每一個Column Family都是一個ColumnFamilyData.
先來看ColumnFamilySet,這裡可以看到它有兩個資料結構來管理Column Family,分别是map(column_family_data_)以及一個雙向連結清單(dummy_cfd_). 其中map用來儲存Column Family名字和對應的id以及ColumnFamilyData的映射. 這裡要注意在RocksDB内部是将沒一個ColumnFamily的名字表示為一個uint32類型的ID(max_column_family_).也就是這個ID是一個簡單的遞增的數值.
class ColumnFamilySet {
public:
// ColumnFamilySet supports iteration
public:
.................................
ColumnFamilyData* CreateColumnFamily(const std::string& name, uint32_t id,
Version* dummy_version,
const ColumnFamilyOptions& options);
iterator begin() { return iterator(dummy_cfd_->next_); }
iterator end() { return iterator(dummy_cfd_); }
...............................
private:
friend class ColumnFamilyData;
// helper function that gets called from cfd destructor
// REQUIRES: DB mutex held
void RemoveColumnFamily(ColumnFamilyData* cfd);
// column_families_ and column_family_data_ need to be protected:
// * when mutating both conditions have to be satisfied:
// 1. DB mutex locked
// 2. thread currently in single-threaded write thread
// * when reading, at least one condition needs to be satisfied:
// 1. DB mutex locked
// 2. accessed from a single-threaded write thread
std::unordered_map<std::string, uint32_t> column_families_;
std::unordered_map<uint32_t, ColumnFamilyData*> column_family_data_;
uint32_t max_column_family_;
ColumnFamilyData* dummy_cfd_;
// We don't hold the refcount here, since default column family always exists
// We are also not responsible for cleaning up default_cfd_cache_. This is
// just a cache that makes common case (accessing default column family)
// faster
ColumnFamilyData* default_cfd_cache_;
..................................
};
然後來看ColumnFamilyData,這個資料結構就是用來表示一個ColumnFamily,儲存了對應的資訊,我們可以看到有ID/name以及目前ColumnFamily對應的所有的version(dummy_versions_). 其中這裡的next_/prev_就是在ColumnFamilySet中用來表示所有ColumnFamily的雙向連結清單.
class ColumnFamilyData {
public:
~ColumnFamilyData();
// thread-safe
uint32_t GetID() const { return id_; }
// thread-safe
const std::string& GetName() const { return name_; }
// Ref() can only be called from a context where the caller can guarantee
// that ColumnFamilyData is alive (while holding a non-zero ref already,
// holding a DB mutex, or as the leader in a write batch group).
void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
// Unref decreases the reference count, but does not handle deletion
// when the count goes to 0. If this method returns true then the
// caller should delete the instance immediately, or later, by calling
// FreeDeadColumnFamilies(). Unref() can only be called while holding
// a DB mutex, or during single-threaded recovery.
bool Unref() {
int old_refs = refs_.fetch_sub(1, std::memory_order_relaxed);
assert(old_refs > 0);
return old_refs == 1;
}
..............................
private:
friend class ColumnFamilySet;
ColumnFamilyData(uint32_t id, const std::string& name,
Version* dummy_versions, Cache* table_cache,
WriteBufferManager* write_buffer_manager,
const ColumnFamilyOptions& options,
const ImmutableDBOptions& db_options,
const EnvOptions& env_options,
ColumnFamilySet* column_family_set);
uint32_t id_;
const std::string name_;
Version* dummy_versions_; // Head of circular doubly-linked list of versions.
Version* current_; // == dummy_versions->prev_
......................................................
// Thread's local copy of SuperVersion pointer
// This needs to be destructed before mutex_
std::unique_ptr<ThreadLocalPtr> local_sv_;
// pointers for a circular linked list. we use it to support iterations over
// all column families that are alive (note: dropped column families can also
// be alive as long as client holds a reference)
ColumnFamilyData* next_;
ColumnFamilyData* prev_;
...................................
ColumnFamilySet* column_family_set_;
..................................
};
然後就是傳回給調用者的ColumnFamilyHandleImpl結構,這個結構主要是封裝了ColumnFamilyData.
// ColumnFamilyHandleImpl is the class that clients use to access different
// column families. It has non-trivial destructor, which gets called when client
// is done using the column family
class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
public:
// create while holding the mutex
ColumnFamilyHandleImpl(
ColumnFamilyData* cfd, DBImpl* db, InstrumentedMutex* mutex);
// destroy without mutex
virtual ~ColumnFamilyHandleImpl();
virtual ColumnFamilyData* cfd() const { return cfd_; }
......................................
private:
ColumnFamilyData* cfd_;
DBImpl* db_;
InstrumentedMutex* mutex_;
};
建立以及删除
從DBImpl::CreateColumnFamilyImpl開始.在這個函數 中首先就是通過調用GetNextColumnFamilyID來得到目前建立的ColumnFamily對應的ID(自增).然後再調用LogAndApply來對ColumnFamily 進行對應的操作.最後再傳回封裝好的ColumnFamilyHandle給調用者.
Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
const std::string& column_family_name,
ColumnFamilyHandle** handle) {
.......................................
{
...................................
VersionEdit edit;
edit.AddColumnFamily(column_family_name);
uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
edit.SetColumnFamily(new_id);
edit.SetLogNumber(logfile_number_);
edit.SetComparatorName(cf_options.comparator->Name());
// LogAndApply will both write the creation in MANIFEST and create
// ColumnFamilyData object
{ // write thread
WriteThread::Writer w;
write_thread_.EnterUnbatched(&w, &mutex_);
// LogAndApply will both write the creation in MANIFEST and create
// ColumnFamilyData object
s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options), &edit,
&mutex_, directories_.GetDbDir(), false,
&cf_options);
write_thread_.ExitUnbatched(&w);
}
if (s.ok()) {
........................................
*handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Created column family [%s] (ID %u)",
column_family_name.c_str(), (unsigned)cfd->GetID());
}
.............................................
} // InstrumentedMutexLock l(&mutex_)
.................................
return s;
}
最終會在LogAndApply調用ColumnFamilySet的CreateColumnFamily函數(通過VersionSet::CreateColumnFamily),這個函數我們可看到主要做了下面三件事情:
- 建立ColumnFamilyData對象
- 将新的建立好的CFD加入到雙向連結清單
- 對應的Map資料結構更新資料
// under a DB mutex AND write thread
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
const std::string& name, uint32_t id, Version* dummy_versions,
const ColumnFamilyOptions& options) {
assert(column_families_.find(name) == column_families_.end());
ColumnFamilyData* new_cfd = new ColumnFamilyData(
id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
*db_options_, env_options_, this);
column_families_.insert({name, id});
column_family_data_.insert({id, new_cfd});
max_column_family_ = std::max(max_column_family_, id);
// add to linked list
new_cfd->next_ = dummy_cfd_;
auto prev = dummy_cfd_->prev_;
new_cfd->prev_ = prev;
prev->next_ = new_cfd;
dummy_cfd_->prev_ = new_cfd;
if (id == 0) {
default_cfd_cache_ = new_cfd;
}
return new_cfd;
}
然後來看如何删除ColumnFamily,這裡所有的删除最終都會調用ColumnFamilySet::RemoveColumnFamily函數,這個函數是是從兩個Map中删除對應的ColumnFamily. 這裡或許我們要問了,為什麼管理的雙向連結清單不需要删除呢。這裡原因是這樣的,由于ColumnFamilyData是通過引用計數管理的,是以隻有當所有的引用計數都清零之後, 才需要真正的函數ColumnFamilyData(也就是會從雙向連結清單中删除資料).
// under a DB mutex AND from a write thread
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
auto cfd_iter = column_family_data_.find(cfd->GetID());
assert(cfd_iter != column_family_data_.end());
column_family_data_.erase(cfd_iter);
column_families_.erase(cfd->GetName());
}
是以我們來看ColumnFamilyData的析構函數.可以看到析構函數中會從雙向連結清單中删除對應的資料,以及處理對應的Version(corrent_).
// DB mutex held
ColumnFamilyData::~ColumnFamilyData() {
assert(refs_.load(std::memory_order_relaxed) == 0);
// remove from linked list
auto prev = prev_;
auto next = next_;
prev->next_ = next;
next->prev_ = prev;
if (!dropped_ && column_family_set_ != nullptr) {
// If it's dropped, it's already removed from column family set
// If column_family_set_ == nullptr, this is dummy CFD and not in
// ColumnFamilySet
column_family_set_->RemoveColumnFamily(this);
}
if (current_ != nullptr) {
current_->Unref();
}
..............................
}
最後我們來看一下在磁盤上ColumnFamily是如何儲存的,首先需要明确的是ColumnFamily是儲存在MANIFEST檔案中的,資訊的儲存比較簡單(之前的文章有介紹), 和MANIFEST中其他的資訊沒什麼差別,是以這裡我們主要來看資料的讀取以及初始化,這裡所有的操作都是包含在VersionSet::Recover中,我們來看這個函數.
函數主要的邏輯就是讀取MANIFEST然後來再來将磁盤上讀取的ColumnFamily的資訊初始化(初始化ColumnFamilySet結構),可以看到這裡相當于将之前的create/drop 的操作全部回放一遍,也就是會調用CreateColumnFamily/DropColumnFamily來将磁盤的資訊初始化到記憶體.
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (!s.ok()) {
break;
}
// Not found means that user didn't supply that column
// family option AND we encountered column family add
// record. Once we encounter column family drop record,
// we will delete the column family from
// column_families_not_found.
bool cf_in_not_found =
column_families_not_found.find(edit.column_family_) !=
column_families_not_found.end();
// in builders means that user supplied that column family
// option AND that we encountered column family add record
bool cf_in_builders =
builders.find(edit.column_family_) != builders.end();
// they can't both be true
assert(!(cf_in_not_found && cf_in_builders));
ColumnFamilyData* cfd = nullptr;
if (edit.is_column_family_add_) {
if (cf_in_builders || cf_in_not_found) {
s = Status::Corruption(
"Manifest adding the same column family twice");
break;
}
auto cf_options = cf_name_to_options.find(edit.column_family_name_);
if (cf_options == cf_name_to_options.end()) {
column_families_not_found.insert(
{edit.column_family_, edit.column_family_name_});
} else {
cfd = CreateColumnFamily(cf_options->second, &edit);
cfd->set_initialized();
builders.insert(
{edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
}
} else if (edit.is_column_family_drop_) {
if (cf_in_builders) {
auto builder = builders.find(edit.column_family_);
assert(builder != builders.end());
delete builder->second;
builders.erase(builder);
cfd = column_family_set_->GetColumnFamily(edit.column_family_);
if (cfd->Unref()) {
delete cfd;
cfd = nullptr;
} else {
// who else can have reference to cfd!?
assert(false);
}
} else if (cf_in_not_found) {
column_families_not_found.erase(edit.column_family_);
} else {
s = Status::Corruption(
"Manifest - dropping non-existing column family");
break;
}
} else if (!cf_in_not_found) {
if (!cf_in_builders) {
s = Status::Corruption(
"Manifest record referencing unknown column family");
break;
}
cfd = column_family_set_->GetColumnFamily(edit.column_family_);
// this should never happen since cf_in_builders is true
assert(cfd != nullptr);
// if it is not column family add or column family drop,
// then it's a file add/delete, which should be forwarded
// to builder
auto builder = builders.find(edit.column_family_);
assert(builder != builders.end());
builder->second->version_builder()->Apply(&edit);
}
if (cfd != nullptr) {
if (edit.has_log_number_) {
if (cfd->GetLogNumber() > edit.log_number_) {
ROCKS_LOG_WARN(
db_options_->info_log,
"MANIFEST corruption detected, but ignored - Log numbers in "
"records NOT monotonically increasing");
} else {
cfd->SetLogNumber(edit.log_number_);
have_log_number = true;
}
}
if (edit.has_comparator_ &&
edit.comparator_ != cfd->user_comparator()->Name()) {
s = Status::InvalidArgument(
cfd->user_comparator()->Name(),
"does not match existing comparator " + edit.comparator_);
break;
}
}
if (edit.has_prev_log_number_) {
previous_log_number = edit.prev_log_number_;
have_prev_log_number = true;
}
if (edit.has_next_file_number_) {
next_file = edit.next_file_number_;
have_next_file = true;
}
if (edit.has_max_column_family_) {
max_column_family = edit.max_column_family_;
}
if (edit.has_last_sequence_) {
last_sequence = edit.last_sequence_;
have_last_sequence = true;
}
}
MEMTABLE 實作
概述
我們知道RocksDB每一次寫入,都是先寫WAL,然後寫Memtable,這次我們就來分析下MemTable的實作。
在RocksDB中,每個ColumnFamily都有自己的Memtable,互不影響.而在RocksDB中Memtable有多種實作(SkipList/HashSkipList/HashLinkList/Vector),具體的差別可以看memtable,我們這次主要來分析預設的實作skiplist(隻有skiplist是可以并發插入的).
實作
首先從建立Memtable開始,Memtable的建立(ColumnFamilyData::CreateNewMemtable)是在建立ColumnFamily(VersionSet::CreateColumnFamily)的時候建立的.這裡就是建立memtable,然後設定到ColumnFamilyData的mem_域中.
MemTable* ColumnFamilyData::ConstructNewMemtable(
const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
write_buffer_manager_, earliest_seq, id_);
}
void ColumnFamilyData::CreateNewMemtable(
const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
if (mem_ != nullptr) {
delete mem_->Unref();
}
SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
mem_->Ref();
}
上面所提及的,RocksDB有多種MemTable的實作,那麼它是如何來做的呢,RocksDB通過memtable_factory來根據使用者的設定來建立不同的memtable.這裡要注意的是核心的memtable實作是在MemTable這個類的table_域中.
MemTable::MemTable:
table_(ioptions.memtable_factory->CreateMemTableRep(
comparator_, &arena_, ioptions.prefix_extractor, ioptions.info_log,
column_family_id)),
class MemTableRepFactory {
public:
virtual ~MemTableRepFactory() {}
virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&,
Allocator*, const SliceTransform*,
Logger* logger) = 0;
virtual MemTableRep* CreateMemTableRep(
const MemTableRep::KeyComparator& key_cmp, Allocator* allocator,
const SliceTransform* slice_transform, Logger* logger,
uint32_t /* column_family_id */) {
return CreateMemTableRep(key_cmp, allocator, slice_transform, logger);
}
........................
然後最後會調用對應的實作的CreateMemTableRep方法,這裡我們就來看SkipList的實作.
MemTableRep* SkipListFactory::CreateMemTableRep(
const MemTableRep::KeyComparator& compare, Allocator* allocator,
const SliceTransform* transform, Logger* /*logger*/) {
return new SkipListRep(compare, allocator, transform, lookahead_);
}
最終就是建立SkipListRep對象,在這個對象裡面會建立SkipList(class InlineSkipList).
class SkipListRep : public MemTableRep {
InlineSkipList<const MemTableRep::KeyComparator&> skip_list_;
...................................
public:
explicit SkipListRep(const MemTableRep::KeyComparator& compare,
Allocator* allocator, const SliceTransform* transform,
const size_t lookahead)
: MemTableRep(allocator),
skip_list_(compare, allocator),
cmp_(compare),
transform_(transform),
lookahead_(lookahead) {}
這裡我們隻需要知道最終所有的memtable資料都是儲存在SkipList中就可以了.
在之前的分析中我們知道Memtable的插入是通過WriteBatch然後周遊ColumnFamily來插入的,而最終則是會調用MemTable::Add這個函數.
bool MemTable::Add(SequenceNumber s, ValueType type,
const Slice& key, /* user key */
const Slice& value, bool allow_concurrent,
MemTablePostProcessInfo* post_process_info) {
bool res = table->InsertKeyConcurrently(handle);
if (UNLIKELY(!res)) {
return res;
}
..............................
}
最終會調用InlineSkipList來對資料進行插入.
template <class Comparator>
bool InlineSkipList<Comparator>::InsertConcurrently(const char* key) {
Node* prev[kMaxPossibleHeight];
Node* next[kMaxPossibleHeight];
Splice splice;
splice.prev_ = prev;
splice.next_ = next;
return Insert<true>(key, &splice, false);
}
看到這裡或許會有疑問了,那就是skiplist裡面隻有key,而RocksDB是一個KV存儲,那麼這個KV是如何存儲的呢,這裡是這樣的,RocksDB會将KV打包成一個key傳遞給SkipList, 對應的KEY的結構是這樣的.
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
// value_size : varint32 of value.size()
// value bytes : char[value.size()]
而資料的格式化就在之前的MemTable::Add中實作的.
uint32_t key_size = static_cast<uint32_t>(key.size());
uint32_t val_size = static_cast<uint32_t>(value.size());
uint32_t internal_key_size = key_size + 8;
const uint32_t encoded_len = VarintLength(internal_key_size) +
internal_key_size + VarintLength(val_size) +
val_size;
char* buf = nullptr;
std::unique_ptr<MemTableRep>& table =
type == kTypeRangeDeletion ? range_del_table_ : table_;
KeyHandle handle = table->Allocate(encoded_len, &buf);
char* p = EncodeVarint32(buf, internal_key_size);
memcpy(p, key.data(), key_size);
Slice key_slice(p, key_size);
p += key_size;
uint64_t packed = PackSequenceAndType(s, type);
EncodeFixed64(p, packed);
p += 8;
p = EncodeVarint32(p, val_size);
memcpy(p, value.data(), val_size);
而對于真正的KEY的解析是在SkipList的Comparator中實作的(compare_).下面的代碼片段可以看到會解析出來真正的key,然後再進行查找以及插入.
bool InlineSkipList<Comparator>::Insert(const char* key, Splice* splice,
bool allow_partial_splice_fix) {
Node* x = reinterpret_cast<Node*>(const_cast<char*>(key)) - 1;
const DecodedKey key_decoded = compare_.decode_key(key);
...............................
}
Rocksdb寫入邏輯
概述
在RocksDB中,每次寫入它都會先寫WAL,然後再寫入MemTable,這次我們就來分析這兩個邏輯具體是如何實作的. 首先需要明确的是在RocksDB中,WAL的寫入是單線程順序串行寫入的,而MemTable則是可以并發多線程寫入的。
而在RocksDB 5.5中引進了一個選項enable_pipelined_write,這個選項的目的就是将WAL和MemTable的寫入pipeline化, 也就是說當一個線程寫完畢WAL之後,此時在WAL的write隊列中等待的其他的write則會開始繼續寫入WAL, 而目前線程将會繼續 寫入MemTable.此時就将不同的Writer的寫入WAL和寫入MemTable并發執行了.
實作
這裡分析pipeline的實作,核心函數就是DBImpl::PipelinedWriteImpl.通過設定參數
enable_pipelined_write = true
來開啟pipeline的寫方式。
- 每一個DB(DBImpl)都有一個write_thread_(class WriteThread).
- 每次調用Write的時候會先寫入WAL, 此時建立一個WriteThread::Writer對象,并将這個對象加入到一個Group中(調用JoinBatchGroup)
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
disable_memtable);
write_thread_.JoinBatchGroup(&w);
- JoinBatchGroup,這個函數主要是用來将所有的寫入WAL加入到一個Group中.這裡可以看到當目前的Writer 對象是leader(比如第一個進入的對象)的時候将會直接傳回,否則将會等待直到更新為對應的狀态.
void WriteThread::JoinBatchGroup(Writer* w) {
...................................
bool linked_as_leader = LinkOne(w, &newest_writer_);
if (linked_as_leader) {
SetState(w, STATE_GROUP_LEADER);
}
TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);
if (!linked_as_leader) {
/**
* Wait util:
* 1) An existing leader pick us as the new leader when it finishes
* 2) An existing leader pick us as its follewer and
* 2.1) finishes the memtable writes on our behalf
* 2.2) Or tell us to finish the memtable writes in pralallel
* 3) (pipelined write) An existing leader pick us as its follower and
* finish book-keeping and WAL write for us, enqueue us as pending
* memtable writer, and
* 3.1) we become memtable writer group leader, or
* 3.2) an existing memtable writer group leader tell us to finish memtable
* writes in parallel.
*/
AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
&jbg_ctx);
TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
}
}
- 然後我們來看LinkOne函數,這個函數主要用來講目前的Writer對象加入到group中,這裡可以看到由于 寫入是并發的是以對應的newest_writer_(儲存最新的寫入對象)需要原子操作來更新.
bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
assert(newest_writer != nullptr);
assert(w->state == STATE_INIT);
Writer* writers = newest_writer->load(std::memory_order_relaxed);
while (true) {
w->link_older = writers;
if (newest_writer->compare_exchange_weak(writers, w)) {
return (writers == nullptr);
}
}
}
- 當從JoinBatchGroup傳回之後,當目前的Writer對象為leader的話,則将會把此leader下的所有的write都 連結到一個WriteGroup中(調用EnterAsBatchGroupLeader函數), 并開始寫入WAL,這裡要注意非leader的write将會直接 進入memtable的寫入,這是因為非leader的write都将會被目前它所從屬的leader來打包(group)寫入,後面我們會看到實作.
size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
WriteGroup* write_group) {
assert(leader->link_older == nullptr);
assert(leader->batch != nullptr);
assert(write_group != nullptr);
................................................
Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);
// This is safe regardless of any db mutex status of the caller. Previous
// calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
// (they emptied the list and then we added ourself as leader) or had to
// explicitly wake us up (the list was non-empty when we added ourself,
// so we have already received our MarkJoined).
CreateMissingNewerLinks(newest_writer);
// Tricky. Iteration start (leader) is exclusive and finish
// (newest_writer) is inclusive. Iteration goes from old to new.
Writer* w = leader;
while (w != newest_writer) {
w = w->link_newer;
.........................................
w->write_group = write_group;
size += batch_size;
write_group->last_writer = w;
write_group->size++;
}
..............................
}
- 這裡注意到周遊是通過link_newer進行的,之是以這樣做是相當于在寫入WAL之前,對于目前leader的Write 做一次snapshot(通過CreateMissingNewerLinks函數).
void WriteThread::CreateMissingNewerLinks(Writer* head) {
while (true) {
Writer* next = head->link_older;
if (next == nullptr || next->link_newer != nullptr) {
assert(next == nullptr || next->link_newer == head);
break;
}
next->link_newer = head;
head = next;
}
}
- 上述操作進行完畢之後,進入寫WAL操作,最終會把這個write_group打包成一個writeBatch(通過MergeBatch函數)進行寫入.
if (w.ShouldWriteToWAL()) {
...............................
w.status = WriteToWAL(wal_write_group, log_writer, log_used,
need_log_sync, need_log_dir_sync, current_sequence);
}
- 當目前的leader将它自己與它的follow寫入之後,此時它将需要寫入memtable,那麼此時之前還阻塞的Writer,分為兩種情況 第一種是已經被目前的leader打包寫入到WAL,這些writer(包括leader自己)需要将他們連結到memtable writer list.還有一種情況,那就是還沒有寫入WAL的,此時這類writer則需要選擇一個leader然後繼續寫入WAL.
void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
Status status) {
Writer* leader = write_group.leader;
Writer* last_writer = write_group.last_writer;
assert(leader->link_older == nullptr);
.....................................
if (enable_pipelined_write_) {
// Notify writers don't write to memtable to exit.
......................................
// Link the ramaining of the group to memtable writer list.
if (write_group.size > 0) {
if (LinkGroup(write_group, &newest_memtable_writer_)) {
// The leader can now be different from current writer.
SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER);
}
}
// Reset newest_writer_ and wake up the next leader.
Writer* newest_writer = last_writer;
if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) {
Writer* next_leader = newest_writer;
while (next_leader->link_older != last_writer) {
next_leader = next_leader->link_older;
assert(next_leader != nullptr);
}
next_leader->link_older = nullptr;
SetState(next_leader, STATE_GROUP_LEADER);
}
AwaitState(leader, STATE_MEMTABLE_WRITER_LEADER |
STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
&eabgl_ctx);
} else {
.....................................
}
}
- 接下來我們來看寫入memtable的操作,這裡邏輯類似寫入WAL,如果是leader的話,則依舊會建立一個group(WriteGroup),然後周遊需要寫入memtable的writer,将他們都加入到group中(EnterAsMemTableWriter),然後則設定并發執行的大小,以及設定對應狀态(LaunchParallelMemTableWriters).這裡注意每次setstate就将會喚醒之前阻塞的Writer.
void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) {
assert(write_group != nullptr);
write_group->running.store(write_group->size);
for (auto w : *write_group) {
SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
}
}
- 這裡要注意,在構造memtable的group的時候,我們不需要建立link_newer,因為之前在寫入WAL的時候,我們已經構造好link_newer,那麼此時我們使用構造好的group也就是表示這個group中包含的都是已經寫入到WAL的操作.
void WriteThread::EnterAsMemTableWriter(Writer* leader,
WriteGroup* write_group) {
....................................
if (!allow_concurrent_memtable_write_ || !leader->batch->HasMerge()) {
....................................................
}
write_group->last_writer = last_writer;
write_group->last_sequence =
last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) - 1;
}
- 最後開始執行寫入MemTable的操作,之前在寫入WAL的時候被阻塞的所有Writer此時都會進入下面這個邏輯,此時也就意味着 并發寫入MemTable.
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
.........................
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*concurrent_memtable_writes*/);
if (write_thread_.CompleteParallelMemTableWriter(&w)) {
MemTableInsertStatusCheck(w.status);
versions_->SetLastSequence(w.write_group->last_sequence);
write_thread_.ExitAsMemTableWriter(&w, *w.write_group);
}
}
- 最後當目前group的所有Writer都寫入MemTable之後,則将會調用ExitAsMemTableWriter來進行收尾工作.如果有新的memtable writer list需要處理,那麼則喚醒對應的Writer,然後設定已經處理完畢的Writer的狀态.
void WriteThread::ExitAsMemTableWriter(Writer* /*self*/,
WriteGroup& write_group) {
Writer* leader = write_group.leader;
Writer* last_writer = write_group.last_writer;
Writer* newest_writer = last_writer;
if (!newest_memtable_writer_.compare_exchange_strong(newest_writer,
nullptr)) {
CreateMissingNewerLinks(newest_writer);
Writer* next_leader = last_writer->link_newer;
assert(next_leader != nullptr);
next_leader->link_older = nullptr;
SetState(next_leader, STATE_MEMTABLE_WRITER_LEADER);
}
Writer* w = leader;
while (true) {
if (!write_group.status.ok()) {
w->status = write_group.status;
}
Writer* next = w->link_newer;
if (w != leader) {
SetState(w, STATE_COMPLETED);
}
if (w == last_writer) {
break;
}
w = next;
}
// Note that leader has to exit last, since it owns the write group.
SetState(leader, STATE_COMPLETED);
}
總結
我們可以看到在RocksDB中,WAL的寫入始終是串行寫入,而MemTable可以多線程并發寫入,也就是說在系統壓力到一定階段的時候, 寫入WAL肯定會成為瓶頸.
關于寫的一些參數調優
通過在rocksdb打開的時候增加自調優的參數設定:
options.OptimizeLevelStyleCompaction();
總體上的一個參數調整是增加memtable的吞吐量:增加了memtable的大小,可以同時存在于記憶體中的memtable檔案的個數,并且适配了L1的容量保持和L0的容量接近,還有一些各層的壓縮算法的配置。大概測試了一下該配置的随機寫吞吐能夠在原有基礎之上提升50-80%。不過該配置肯定對記憶體資源的消耗比較大,是以如果系統資源足夠且是IO密集型業務對性能有較高的要求可以嘗試一下該配置。
options.allow_concurrent_memtable_write=true ;
允許多個writer 對memtable的并發寫入
options.enable_pipelined_write=true ;
開啟pipeline的寫機制,允許memtable和wal并發寫入
詳細的rocksb自優化參數實作邏輯
ColumnFamilyOptions* ColumnFamilyOptions::OptimizeLevelStyleCompaction(
uint64_t memtable_memory_budget) {
write_buffer_size = static_cast<size_t>(memtable_memory_budget / 4);
// merge two memtables when flushing to L0
min_write_buffer_number_to_merge = 2;
// this means we'll use 50% extra memory in the worst case, but will reduce
// write stalls.
max_write_buffer_number = 6;
// start flushing L0->L1 as soon as possible. each file on level0 is
// (memtable_memory_budget / 2). This will flush level 0 when it's bigger than
// memtable_memory_budget.
level0_file_num_compaction_trigger = 2;
// doesn't really matter much, but we don't want to create too many files
target_file_size_base = memtable_memory_budget / 8;
// make Level1 size equal to Level0 size, so that L0->L1 compactions are fast
max_bytes_for_level_base = memtable_memory_budget;
// level style compaction
compaction_style = kCompactionStyleLevel;
// only compress levels >= 2
compression_per_level.resize(num_levels);
for (int i = 0; i < num_levels; ++i) {
if (i < 2) {
compression_per_level[i] = kNoCompression;
} else {
compression_per_level[i] =
LZ4_Supported()
? kLZ4Compression
: (Snappy_Supported() ? kSnappyCompression : kNoCompression);
}
}
return this;
}
讀流程
讀流程圖
概述
簡而言之,在RocksDB中的讀取需要處理的最核心的一個問題就是如何讀取最新的資料,這是由于RocksDB是基于LSM,是以在RocksDB中,對于資料的delete以及update,它并不會立即去執行對應的動作,而隻是插入一條新的資料,而資料的最終更新(last-write-win)以及删除是在compact的時候來做的.
其實最那就是如何讀取到一個資料的最新版本,是以首先我們需要知道在RocksDB中,多個版本的資料是如何儲存的。首先我們需要知道在RocksDB中,資料是儲存在兩個地方,一個是memtable(記憶體),一個是sstable(磁盤),是以RocksDB讀取資料也是依次從這兩個地方讀取.
- memtable.在RocksDB中memtable的預設實作是skiplist,RocksDB會将使用者傳入的key改變為memtable内部的key(user_key+seq+type),然後再加上使用者傳入的value之後,作為一個element加入到skiplist.是以我們讀取的時候需要讀取到最新的那條資料.
- sstable.在RocksDB中,除去level0之外的sstable是保證不會overlap,是以在這些sstable中,隻要get到值,那麼就可以進入下一個level了,而在level0中則需要讀取所有的sstable.
memtable 源碼分析
首先我們知道在RocksDB中,每個version都會一個sequence number,每次寫入都會更新這個sequence number,是以相同key的不同版本就是通過這個seq來确定的,這個sequence相當于一個時間戳,這樣通過sequence我們就可以得到某一個key的最新資料.
通過上面我們知道由于使用者插入或者讀取的時候傳遞進來永遠是隻有user_key,是以在RocksDB内部還會維護一個internal_key,這個internal_key格式如下:
user_key + sequence + type
對應的代碼如下:
InternalKey(const Slice& _user_key, SequenceNumber s, ValueType t) {
AppendInternalKey(&rep_, ParsedInternalKey(_user_key, s, t));
}
void AppendInternalKey(std::string* result, const ParsedInternalKey& key) {
result->append(key.user_key.data(), key.user_key.size());
PutFixed64(result, PackSequenceAndType(key.sequence, key.type));
}
這裡type就是表示目前操作,這裡在memtable中,分為三種種操作,下面value就表示是插入,而merge這次暫時忽略,我們以後會詳細介紹這個操作:
enum ValueType : unsigned char {
kTypeDeletion = 0x0,
kTypeValue = 0x1,
kTypeMerge = 0x2,
........................
}
不同版本的key被插入的時候,在RocksDB内部是如何組織的。在RocksDB中的不同版本的key是按照下面的邏輯進行排序:
increasing user key (according to user-supplied comparator)
decreasing sequence number
decreasing type (though sequence# should be enough to disambiguate)
那麼此時為了讀取最新的那條資料,我們隻需要讀取最大seq的那條資料就可以了.
對應代碼就是InternalKeyComparator這個類,可以看到當key相同時說明是相同key的不同版本,是以開始進行後續的處理:
int InternalKeyComparator::Compare(const ParsedInternalKey& a,
const ParsedInternalKey& b) const {
int r = user_comparator_->Compare(a.user_key, b.user_key);
PERF_COUNTER_ADD(user_key_comparison_count, 1);
if (r == 0) {
if (a.sequence > b.sequence) {
r = -1;
} else if (a.sequence < b.sequence) {
r = +1;
} else if (a.type > b.type) {
r = -1;
} else if (a.type < b.type) {
r = +1;
}
}
return r;
}
這裡InternalKey對于使用者來說是完全透明的,那麼當使用者來查找對應的user_key的時候,RocksDB又是如何來建構對應的internalkey呢,這裡有一個核心的資料結構叫做LookupKey.我們來看這個類的實作:
class LookupKey {
public:
// Initialize *this for looking up user_key at a snapshot with
// the specified sequence number.
LookupKey(const Slice& _user_key, SequenceNumber sequence);
...................................................
private:
// We construct a char array of the form:
// klength varint32 <-- start_
// userkey char[klength] <-- kstart_
// tag uint64
// <-- end_
// The array is a suitable MemTable key.
// The suffix starting with "userkey" can be used as an InternalKey.
const char* start_;
const char* kstart_;
const char* end_;
char space_[200]; // Avoid allocation for short keys
...........................................
};
這裡可以看到每次構造lookupkey的時候,必須得傳入一個seq,那麼這個seq是如何計算的呢,來看代碼:
Status DBImpl::GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* pinnable_val, bool* value_found,
ReadCallback* callback, bool* is_blob_index) {
...........................................
SequenceNumber snapshot;
if (read_options.snapshot != nullptr) {
// Note: In WritePrepared txns this is not necessary but not harmful either.
// Because prep_seq > snapshot => commit_seq > snapshot so if a snapshot is
// specified we should be fine with skipping seq numbers that are greater
// than that.
snapshot =
reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
} else {
.............................................................
snapshot = last_seq_same_as_publish_seq_
? versions_->LastSequence()
: versions_->LastPublishedSequence();
}
.........................................
// First look in the memtable, then in the immutable memtable (if any).
// s is both in/out. When in, s could either be OK or MergeInProgress.
// merge_operands will contain the sequence of merges in the latter case.
LookupKey lkey(key, snapshot);
}
通過上面的代碼我們可以看到每次調用Get的時候,RocksDB都會構造一個LookupKey,這裡我們可以簡單的認為這個seq就是目前的version最後一次寫成功的seq(以後會介紹這裡的publish_seq).
然後上面的代碼最終會調用MemTable::Get,在分析這個函數之前我們先來看一個資料結構Saver,這個資料結構用來儲存查找内容時的上下文.
struct Saver {
Status* status;
const LookupKey* key;
bool* found_final_value; // Is value set correctly? Used by KeyMayExist
bool* merge_in_progress;
std::string* value;
SequenceNumber seq;
const MergeOperator* merge_operator;
// the merge operations encountered;
MergeContext* merge_context;
RangeDelAggregator* range_del_agg;
MemTable* mem;
Logger* logger;
Statistics* statistics;
bool inplace_update_support;
Env* env_;
ReadCallback* callback_;
bool* is_blob_index;
bool CheckCallback(SequenceNumber _seq) {
if (callback_) {
return callback_->IsCommitted(_seq);
}
return true;
}
};
然後我們來看MemTable::Get這個函數,這個函數最核心的步驟就是構造Saver對象,然後調用MemTableRep::Get,這裡注意傳遞給Get的第三個參數是一個回調函數,後面我們會詳細分析這個函數.
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context,
RangeDelAggregator* range_del_agg, SequenceNumber* seq,
const ReadOptions& read_opts, ReadCallback* callback,
bool* is_blob_index) {
...............................................
Saver saver;
saver.status = s;
saver.found_final_value = &found_final_value;
saver.merge_in_progress = &merge_in_progress;
saver.key = &key;
saver.value = value;
saver.seq = kMaxSequenceNumber;
saver.mem = this;
saver.merge_context = merge_context;
saver.range_del_agg = range_del_agg;
saver.merge_operator = moptions_.merge_operator;
saver.logger = moptions_.info_log;
saver.inplace_update_support = moptions_.inplace_update_support;
saver.statistics = moptions_.statistics;
saver.env_ = env_;
saver.callback_ = callback;
saver.is_blob_index = is_blob_index;
table_->Get(key, &saver, SaveValue);
..............................................
}
然後我們來看MemTableRep::Get,首先我們需要知道MemTableRep這個類用來抽象不同的MemTable的實作,也就是說它是一個虛類,然後不同的MemTable實作了它(通過工廠方法模式維護了包括skiplist,vector等多個工廠,用來提供給使用者建立不同的memtable管理方式),這裡我們隻來分析skiplist也就是預設的MemTable實作.
可以通過設定
options.memtable_factory.reset(new rocksdb::VectorRepFactory());
來配置不同的memtable管理方式
void MemTableRep::Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) {
auto iter = GetDynamicPrefixIterator();
for (iter->Seek(k.internal_key(), k.memtable_key().data());
iter->Valid() && callback_func(callback_args, iter->key());
iter->Next()) {
}
}
上面的函數中最核心的是兩個一個是iter->Seek一個是callback_func,我們一個個來,先來分析Seek,可以看到這裡Seek的時候傳遞進去有兩個參數,一個是internal_key,一個是memtable_key,那麼這兩個key分别代表什麼呢,我們再次回到LookupKey這個類,可以看到這裡memtable_key就是(end_-start_),而internal_key就是(end_-kstart_)
class LookupKey {
public:
// Return a key suitable for lookup in a MemTable. memtable-key的構造
Slice memtable_key() const {
return Slice(start_, static_cast<size_t>(end_ - start_));
}
// Return an internal key (suitable for passing to an internal iterator) internal-key的構造
Slice internal_key() const {
return Slice(kstart_, static_cast<size_t>(end_ - kstart_));
}
然後那麼對應的這三個變量又表示什麼呢,我們來看LookupKey的構造函數:
LookupKey::LookupKey(const Slice& _user_key, SequenceNumber s) {
size_t usize = _user_key.size();
size_t needed = usize + 13; // A conservative estimate
char* dst;
if (needed <= sizeof(space_)) {
dst = space_;
} else {
dst = new char[needed];
}
start_ = dst; // _start key
// NOTE: We don't support users keys of more than 2GB :)
dst = EncodeVarint32(dst, static_cast<uint32_t>(usize + 8));
kstart_ = dst; //kstart_ key
memcpy(dst, _user_key.data(), usize);
dst += usize;
EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
dst += 8;
end_ = dst; // end
}
通過上面的構造函數可以看到在LookupKey中會把全部的internal_key(user_key+seq+type)和RocksDB為user_key所添加的内容指針分别儲存起來,也就是memtable_key儲存了内部使用的key,而internal_key儲存了RocksDB為構造在内部key添加的内容.這裡可以看到查找的時候,儲存的type是一個特殊的type,這個type其實是kTypeBlobIndex,也就是是值最大的type.那麼為什麼要這麼做呢,我們在分析之前先來看對應的Seek函數.
// Advance to the first entry with a key >= target
virtual void Seek(const Slice& user_key, const char* memtable_key)
override {
if (memtable_key != nullptr) {
iter_.Seek(memtable_key);
} else {
iter_.Seek(EncodeKey(&tmp_, user_key));
}
}
template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::Seek(const char* target) {
node_ = list_->FindGreaterOrEqual(target);
}
這裡由于上面的memtable_key肯定不為null,那麼就是會調用下面對應的Seek函數,而這個函數最終會調用skiplist的FindGreaterOrEqual函數,這個函數也就是用來定位到大于或者等于memtable_key的位置,此時我們再回憶下一開始介紹的key的排序
(InternalKeyComparator::Compare),也就是當Key相同時,按照seq的降序,如果seq相同則按照type的降序,那麼此時FindGreaterOrEqual就比較好了解了,也就是會傳回小于我們輸入seq的值,而當seq相等的話,則會傳回小于我們的輸入type的值(由于我們傳入的是最大的type,是以也就是會直接傳回值).那麼此時傳回的位置有可能key本身就比我們的輸入key小,并且我們還需要肯根據不同的type來做不同的操作,那麼此時就需要SaveValue回調了.
接下來我們來看對應的callbakc_func(SaveValue)函數,這個函數有兩個參數,第一個參數是之前儲存的Saver對象,第二個則就是我們在skiplist中定位到的位置.這個函數要做的比較簡單,首先就是判斷是否得到的key和我們傳遞進來的key相同,如果不同,則說明查找的key不合法,是以直接傳回.這裡我們着重來看對于插入和删除的處理.
static bool SaveValue(void* arg, const char* entry) {
......................................................
//檢查得到的key和傳入的key是否一緻
if (s->mem->GetInternalKeyComparator().user_comparator()->Equal(
Slice(key_ptr, key_length - 8), s->key->user_key())) {
...........................................................
case kTypeValue: {
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadLock();
}
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
*(s->status) = Status::OK();
if (*(s->merge_in_progress)) {
if (s->value != nullptr) {
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), &v,
merge_context->GetOperands(), s->value, s->logger,
s->statistics, s->env_, nullptr /* result_operand */, true);
}
} else if (s->value != nullptr) {
s->value->assign(v.data(), v.size());
}
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadUnlock();
}
*(s->found_final_value) = true;
if (s->is_blob_index != nullptr) {
*(s->is_blob_index) = (type == kTypeBlobIndex);
}
return false;
}
case kTypeDeletion:
case kTypeSingleDeletion:
case kTypeRangeDeletion: {
if (*(s->merge_in_progress)) {
if (s->value != nullptr) {
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), nullptr,
merge_context->GetOperands(), s->value, s->logger,
s->statistics, s->env_, nullptr /* result_operand */, true);
}
} else {
*(s->status) = Status::NotFound();
}
*(s->found_final_value) = true;
return false;
}
}
}
當查找到對應的值的時候,直接指派然後傳回給使用者(設定found_final_value).這裡可以看到如果是Delete的話,直接傳回NotFound.
SST 源碼分析
當資料不在記憶體中時,讀操作會去到底層SST檔案中讀取資料。
依舊是從DBImpl::GetImpl開始,這個函數隻分析了Memtable相關的代碼,這次我們來看當memtable沒有查找到之後,RocksDB是如何處理的.我們可以看到當MemTable中沒有找到對應的資料之後(包括删除),RocksDB将會進入對應的sst中查找.
if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time);
sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context,
&range_del_agg, value_found, nullptr, nullptr, callback,
is_blob_index);
RecordTick(stats_, MEMTABLE_MISS);
}
從上面的代碼我們可以看到直接從目前的version(sv->current)調用Get方法,是以接下來我們就來詳細看這個函數。 這個函數簡單來說就是根據所需要查找的key,然後選擇對應的檔案,這裡每次會傳回一個檔案(key在sst的key範圍内),然後循環查找.
先來看查找之前的初始化
GetContext get_context(
user_comparator(), merge_operator_, info_log_, db_statistics_,
status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
value, value_found, merge_context, range_del_agg, this->env_, seq,
merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob);
// Pin blocks that we read to hold merge operands
if (merge_operator_) {
pinned_iters_mgr.StartPinning();
}
FilePicker fp(
storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
user_comparator(), internal_comparator());
FdWithKeyRange* f = fp.GetNextFile();
第一個是GetContext結構,這個類隻要是根據傳遞進來的檔案元資訊來查找對應的key.然後是FilePicker,這個類主要是根據傳遞進來的key來選擇對應的檔案.這裡最重要就是GetNextFile這個函數,我們來看這個函數。
這個函數他會周遊所有的level,然後再周遊每個level的所有的檔案,這裡會對level 0的檔案做一個特殊處理,這是因為隻有level0的sst的range不是有序的,是以我們每次查找需要查找所有的檔案,也就是會一個個的周遊.
而在非level0,我們隻需要按照二分查找來得到對應的檔案即可,如果二分查找不存在,那麼我就需要進入下一個level進行查找.
FdWithKeyRange* GetNextFile() {
while (!search_ended_) { // Loops over different levels.
while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
// Loops over all files in current level.
FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_];
hit_file_level_ = curr_level_;
is_hit_file_last_in_level_ =
curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
int cmp_largest = -1;
if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
// Check if key is within a file's range. If search left bound and
// right bound point to the same find, we are sure key falls in
// range.
assert(
curr_level_ == 0 ||
curr_index_in_curr_level_ == start_index_in_curr_level_ ||
user_comparator_->Compare(user_key_,
ExtractUserKey(f->smallest_key)) <= 0);
int cmp_smallest = user_comparator_->Compare(user_key_,
ExtractUserKey(f->smallest_key));
if (cmp_smallest >= 0) {
cmp_largest = user_comparator_->Compare(user_key_,
ExtractUserKey(f->largest_key));
}
// Setup file search bound for the next level based on the
// comparison results
if (curr_level_ > 0) {
file_indexer_->GetNextLevelIndex(curr_level_,
curr_index_in_curr_level_,
cmp_smallest, cmp_largest,
&search_left_bound_,
&search_right_bound_);
}
// Key falls out of current file's range
if (cmp_smallest < 0 || cmp_largest > 0) {
if (curr_level_ == 0) {
++curr_index_in_curr_level_;
continue;
} else {
// Search next level.
break;
}
}
}
returned_file_level_ = curr_level_;
if (curr_level_ > 0 && cmp_largest < 0) {
// No more files to search in this level.
search_ended_ = !PrepareNextLevel();
} else {
++curr_index_in_curr_level_;
}
return f;
}
// Start searching next level.
search_ended_ = !PrepareNextLevel();
}
// Search ended.
return nullptr;
}
這裡RocksDB使用了一個技巧用來加快二分查找的速度,每次更新sst的時候,RocksDB都會調用FileIndexer::UpdateIndex來更新這樣的一個結構,這個結構就是FileIndexer,它主要是用來儲存每一個level和level+1的key範圍的關聯資訊,這樣當我們在level查找的時候,如果沒有查找到資訊,那麼我們将會迅速得到下一個level需要查找的檔案範圍.
每一個key來進行比較總會有三種情況:
- 小于目前sst的smallest.
- 大于目前sst的largest.
- 處于這個範圍.
那麼我們隻需要在初始化索引的時候能夠得到目前的sst在下一個level中的位置,就可以根據上面三種類型來确定下一個level我們需要進行二分查找的檔案範圍.在RocksDB中定義了下面三個值.
// Point to a left most file in a lower level that may contain a key,
// which compares greater than smallest of a FileMetaData (upper level)
int32_t smallest_lb;
// Point to a left most file in a lower level that may contain a key,
// which compares greater than largest of a FileMetaData (upper level)
int32_t largest_lb;
// Point to a right most file in a lower level that may contain a key,
// which compares smaller than smallest of a FileMetaData (upper level)
int32_t smallest_rb;
// Point to a right most file in a lower level that may contain a key,
// which compares smaller than largest of a FileMetaData (upper level)
int32_t largest_rb;
我們通過例子來解釋這三個值.假設有下面兩個level,4個sst.那麼初始化的時候,對應的level1的這個sst對應的四個值分别為. smallest_lb=1;largest_lb=2;smallest_rb=1;largest_rb=2;
level 1: [50 - 60]
level 2: [1 - 40], [45 - 55], [58 - 80]
此時如果我們查找一個key為49,然後第一次比較,也就是key < level1.sst->smallest,那麼我們将會知道我們需要在0和smallest_rb之間來查找,也就是0和1.假設我們查找key是55,也就是 level1.sst->smallest < key < level1.test.largest,此時我們在level2将需要在smallest_rb和largest_rb之間.這裡可以看到其實就是計算一個重合的區間。
來看RocksDB如何根據目前level的比較結果來計算下一個level需要二分查找的檔案範圍:
// During file search, a key is compared against smallest and largest
// from a FileMetaData. It can have 3 possible outcomes:
// (1) key is smaller than smallest, implying it is also smaller than
// larger. Precalculated index based on "smallest < smallest" can
// be used to provide right bound.
// (2) key is in between smallest and largest.
// Precalculated index based on "smallest > greatest" can be used to
// provide left bound.
// Precalculated index based on "largest < smallest" can be used to
// provide right bound.
// (3) key is larger than largest, implying it is also larger than smallest.
// Precalculated index based on "largest > largest" can be used to
// provide left bound.
//
// As a result, we will need to do:
// Compare smallest (<=) and largest keys from upper level file with
// smallest key from lower level to get a right bound.
// Compare smallest (>=) and largest keys from upper level file with
// largest key from lower level to get a left bound.
//
// Example:
// level 1: [50 - 60]
// level 2: [1 - 40], [45 - 55], [58 - 80]
// A key 35, compared to be less than 50, 3rd file on level 2 can be
// skipped according to rule (1). LB = 0, RB = 1.
// A key 53, sits in the middle 50 and 60. 1st file on level 2 can be
// skipped according to rule (2)-a, but the 3rd file cannot be skipped
// because 60 is greater than 58. LB = 1, RB = 2.
// A key 70, compared to be larger than 60. 1st and 2nd file can be skipped
// according to rule (3). LB = 2, RB = 2.
void FileIndexer::GetNextLevelIndex(const size_t level, const size_t file_index,
const int cmp_smallest,
const int cmp_largest, int32_t* left_bound,
int32_t* right_bound) const {
assert(level > 0);
const IndexUnit* index_units = next_level_index_[level].index_units;
const auto& index = index_units[file_index];
if (cmp_smallest < 0) {
*left_bound = (level > 0 && file_index > 0)
? index_units[file_index - 1].largest_lb
: 0;
*right_bound = index.smallest_rb;
} else if (cmp_smallest == 0) {
*left_bound = index.smallest_lb;
*right_bound = index.smallest_rb;
} else if (cmp_smallest > 0 && cmp_largest < 0) {
*left_bound = index.smallest_lb;
*right_bound = index.largest_rb;
} else if (cmp_largest == 0) {
*left_bound = index.largest_lb;
*right_bound = index.largest_rb;
} else if (cmp_largest > 0) {
*left_bound = index.largest_lb;
*right_bound = level_rb_[level + 1];
} else {
assert(false);
}
}
看完上面這些我們繼續來看RocksDB對于檔案的查找.這裡所有對于key的查找都是在table_cache_->Get中.這裡我們暫且略過這個函數的實作,最後我們再來詳細分析這個函數.
while (f != nullptr) {
................................
*status = table_cache_->Get(
read_options, *internal_comparator(), f->fd, ikey, &get_context,
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
fp.IsHitFileLastInLevel()),
fp.GetCurrentLevel());
// TODO: examine the behavior for corrupted key
if (!status->ok()) {
return;
}
.......................
}
當table_cache_->Get傳回之後,我們需要根據get_context來判斷傳回的結果
switch (get_context.State()) {
case GetContext::kNotFound:
// Keep searching in other files
break;
case GetContext::kMerge:
break;
case GetContext::kFound:
if (fp.GetHitFileLevel() == 0) {
RecordTick(db_statistics_, GET_HIT_L0);
} else if (fp.GetHitFileLevel() == 1) {
RecordTick(db_statistics_, GET_HIT_L1);
} else if (fp.GetHitFileLevel() >= 2) {
RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
}
return;
case GetContext::kDeleted:
// Use empty error message for speed
*status = Status::NotFound();
return;
case GetContext::kCorrupt:
*status = Status::Corruption("corrupted key for ", user_key);
return;
case GetContext::kBlobIndex:
ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
*status = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
return;
}
如果沒有發現對應的值則進入下一次檔案查找
f = fp.GetNextFile();
最後我們來詳細分析最核心的函數TableCache::Get,這個函數不僅僅是傳回對應的查找結果,并且還會cache相應的檔案資訊,并且如果row_cache打開,他還會做row cache.這裡row cache就是對目前的所需要查找的key在目前sst中對應的value進行cache.
先來看如果打開了row cache,RocksDB将會如何處理,首先它會計算row cache的key.通過下面的代碼我們可以看到row cache的key就是fd_number+seq_no+user_key.
uint64_t fd_number = fd.GetNumber();
auto user_key = ExtractUserKey(k);
// We use the user key as cache key instead of the internal key,
// otherwise the whole cache would be invalidated every time the
// sequence key increases. However, to support caching snapshot
// reads, we append the sequence number (incremented by 1 to
// distinguish from 0) only in this case.
uint64_t seq_no =
options.snapshot == nullptr ? 0 : 1 + GetInternalKeySeqno(k);
// Compute row cache key.
row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(),
row_cache_id_.size());
AppendVarint64(&row_cache_key, fd_number);
AppendVarint64(&row_cache_key, seq_no);
row_cache_key.TrimAppend(row_cache_key.Size(), user_key.data(),
user_key.size());
然後就是在row cache中進行一次查找.如果有對應的值則直接傳回結果,否則則将會在對應的sst讀取傳遞進來的key.
if (auto row_handle =
ioptions_.row_cache->Lookup(row_cache_key.GetUserKey())) {
Cleanable value_pinner;
auto release_cache_entry_func = [](void* cache_to_clean,
void* cache_handle) {
((Cache*)cache_to_clean)->Release((Cache::Handle*)cache_handle);
};
auto found_row_cache_entry = static_cast<const std::string*>(
ioptions_.row_cache->Value(row_handle));
....................................
done = true;
} else {
// Not found, setting up the replay log.
RecordTick(ioptions_.statistics, ROW_CACHE_MISS);
row_cache_entry = &row_cache_entry_buffer;
}
接下來就是需要在對應的sst檔案讀取對應的key的值,這裡可以看到每一個fd都包含了一個TableReader的結構,這個結構就是用來儲存檔案的内容.而我們的table_cache主要就是緩存這個結構.
Status s;
TableReader* t = fd.table_reader;
Cache::Handle* handle = nullptr;
if (!done && s.ok()) {
if (t == nullptr) {
s = FindTable(env_options_, internal_comparator, fd, &handle,
options.read_tier == kBlockCacheTier /* no_io */,
true /* record_read_stats */, file_read_hist, skip_filters,
level);
if (s.ok()) {
t = GetTableReaderFromHandle(handle);
}
}
..........................
}
上面的代碼會直接調用TableCache::FindTable, 這個函數主要是用來實作對應tablereader的讀取以及row cache.
Status TableCache::FindTable(const EnvOptions& env_options,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& fd, Cache::Handle** handle,
const bool no_io, bool record_read_stats,
HistogramImpl* file_read_hist, bool skip_filters,
int level,
bool prefetch_index_and_filter_in_cache) {
...................................................
if (*handle == nullptr) {
if (no_io) { // Don't do IO and return a not-found status
return Status::Incomplete("Table not found in table_cache, no_io is set");
}
unique_ptr<TableReader> table_reader;
s = GetTableReader(env_options, internal_comparator, fd,
false /* sequential mode */, 0 /* readahead */,
record_read_stats, file_read_hist, &table_reader,
skip_filters, level, prefetch_index_and_filter_in_cache);
if (!s.ok()) {
assert(table_reader == nullptr);
RecordTick(ioptions_.statistics, NO_FILE_ERRORS);
// We do not cache error results so that if the error is transient,
// or somebody repairs the file, we recover automatically.
} else {
s = cache_->Insert(key, table_reader.get(), 1, &DeleteEntry<TableReader>,
handle);
if (s.ok()) {
// Release ownership of table reader.
table_reader.release();
}
}
}
return s;
}
通過上面的代碼可以看到實作很簡單,就是一般的cache邏輯,讀取然後判斷是否存在,不存在則插入到cache. 上面的函數會調用 TableCache::GetTableReader,我們來簡單看下這個函數.
Status TableCache::GetTableReader(
const EnvOptions& env_options,
const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
bool sequential_mode, size_t readahead, bool record_read_stats,
HistogramImpl* file_read_hist, unique_ptr<TableReader>* table_reader,
bool skip_filters, int level, bool prefetch_index_and_filter_in_cache,
bool for_compaction) {
..........................................
if (s.ok()) {
...............................................
s = ioptions_.table_factory->NewTableReader(
TableReaderOptions(ioptions_, env_options, internal_comparator,
skip_filters, level),
std::move(file_reader), fd.GetFileSize(), table_reader,
prefetch_index_and_filter_in_cache);
TEST_SYNC_POINT("TableCache::GetTableReader:0");
}
return s;
}
可以看到最關鍵的調用就是調用ioptions_.table_factory->NewTableReader, 這裡RocksDB會根據我們配置的不同的sst格式來調用不同的reader,而在RocksDB中預設的格式是基于block.
// Create default block based table factory.
extern TableFactory* NewBlockBasedTableFactory(
const BlockBasedTableOptions& table_options = BlockBasedTableOptions());
這裡我們隻需要知道最終緩存的tablereader就是一個BlockBasedTable對象(假設使用了基于block的sst format).
當讀取完畢TableReader之後,RocksDB就需要從sst檔案中get key了,也就是最終的key查找方式是在每個sst format class的Get方法中實作的。
if (s.ok()) {
get_context->SetReplayLog(row_cache_entry); // nullptr if no cache.
s = t->Get(options, k, get_context, skip_filters);
get_context->SetReplayLog(nullptr);
}
和上面一樣,這裡的get也就是對應的sst format的get.
最後如果查找到key,則開始緩存對應的kv到row_cache.
size_t charge =
row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
void* row_ptr = new std::string(std::move(*row_cache_entry));
ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
&DeleteEntry<std::string>);
參考資料
http://mysql.taobao.org/monthly/2018/07/