Overview
First, recall that in RocksDB all data is ultimately persisted in SST files, and SSTs are produced by flushing memtables to disk. In this article we analyze when and how RocksDB flushes in-memory data (memtables) to SSTs.
In short, each ColumnFamily in RocksDB has its own memtable. Once a memtable exceeds a fixed size (or the WAL file exceeds its limit), it is marked immutable, and a background thread is launched to flush that immutable memtable to disk as an SST.
Related options
- write_buffer_size: the size limit for each ColumnFamily's memtable
- db_write_buffer_size: the total size limit for all memtables (across all ColumnFamilies)
- max_write_buffer_number: the maximum number of memtables
- min_write_buffer_number_to_merge: the minimum number of memtables that must accumulate before they are flushed (merged into one flush)
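For concreteness, here is a minimal sketch showing where these options live on rocksdb::Options (the values and the database path are illustrative, not recommendations; max_total_wal_size, used by the WAL trigger discussed below, is included as well):

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.write_buffer_size = 64 << 20;          // per-CF memtable limit (64MB)
  options.db_write_buffer_size = 256 << 20;      // limit across all CFs
  options.max_write_buffer_number = 4;           // max memtables per CF
  options.min_write_buffer_number_to_merge = 2;  // memtables merged per flush
  options.max_total_wal_size = 512 << 20;        // WAL size that forces flushes

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/flush_options_demo", &db);
  if (s.ok()) {
    delete db;
  }
  return 0;
}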
When a memtable flush is triggered
RocksDB flushes a memtable to disk under the following conditions.
- A single memtable's size exceeds write_buffer_size.
- The total size of all memtables exceeds db_write_buffer_size.
- The WAL file size exceeds max_total_wal_size.
The reason for the last condition is that when the WAL grows too large it needs to be purged, and before the WAL can be purged all the data it covers must be persisted, which means flushing the corresponding memtables.
Source code
To start, the global DBImpl contains a flush_queue_, a queue holding every ColumnFamily whose memtables are about to be flushed to disk. A ColumnFamily is added to this queue only when it satisfies the flush condition (cfd->imm()->IsFlushPending()).
class DBImpl {
  ................................
  std::deque<ColumnFamilyData*> flush_queue_;
  ...................
};
Next, the implementation of IsFlushPending. This function returns true when at least one memtable needs to be flushed; the MemTableList class holds all the immutable memtables.
bool MemTableList::IsFlushPending() const {
  if ((flush_requested_ && num_flush_not_started_ >= 1) ||
      (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
    assert(imm_flush_needed.load(std::memory_order_relaxed));
    return true;
  }
  return false;
}
The meaning of these member variables is clear from their comments; min_write_buffer_number_to_merge_ is simply the min_write_buffer_number_to_merge option.
// the number of elements that still need flushing
int num_flush_not_started_;
// committing in progress
bool commit_in_progress_;
// Requested a flush of all memtables to storage
bool flush_requested_;
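To make the interplay of the two clauses in IsFlushPending concrete, here is a standalone sketch that mirrors the predicate with free-standing parameters (the free function and the hard-coded inputs are illustrative, not RocksDB code):

#include <cstdio>

bool IsFlushPending(bool flush_requested, int num_flush_not_started,
                    int min_write_buffer_number_to_merge) {
  return (flush_requested && num_flush_not_started >= 1) ||
         (num_flush_not_started >= min_write_buffer_number_to_merge);
}

int main() {
  // With min_write_buffer_number_to_merge = 2, one idle immutable memtable
  // is not flushed on its own...
  printf("%d\n", IsFlushPending(false, 1, 2));  // 0
  // ...unless FlushRequested() has been called (e.g. by a manual Flush).
  printf("%d\n", IsFlushPending(true, 1, 2));   // 1
  // Two pending immutable memtables are flushed without an explicit request.
  printf("%d\n", IsFlushPending(false, 2, 2));  // 1
  return 0;
}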
In SchedulePendingFlush, the ColumnFamily is finally added to the flush queue.
void DBImpl::SchedulePendingFlush(ColumnFamilyData* cfd,
                                  FlushReason flush_reason) {
  if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) {
    AddToFlushQueue(cfd, flush_reason);
    ++unscheduled_flushes_;
  }
}
Flushing a memtable to disk is performed by a background thread whose entry point is BGWorkFlush. It eventually calls BackgroundFlush, whose main job is to find a ColumnFamily in flush_queue_ and flush its memtables to disk.
Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
                               LogBuffer* log_buffer) {
  ................................
  while (!flush_queue_.empty()) {
    // This cfd is already referenced
    auto first_cfd = PopFirstFromFlushQueue();
    if (first_cfd->IsDropped() || !first_cfd->imm()->IsFlushPending()) {
      // can't flush this CF, try next one
      if (first_cfd->Unref()) {
        delete first_cfd;
      }
      continue;
    }
    // found a flush!
    cfd = first_cfd;
    break;
  }

  if (cfd != nullptr) {
    ....................................
    status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
                                       job_context, log_buffer);
    if (cfd->Unref()) {
      delete cfd;
    }
  }
  return status;
}
As we saw above, FlushMemTableToOutputFile is what finally flushes the memtable to disk; we will come back to it at the end.
The flush thread itself is scheduled in MaybeScheduleFlushOrCompaction, where the number of concurrently scheduled flush threads is capped by bg_job_limits.max_flushes.
void DBImpl::MaybeScheduleFlushOrCompaction() {
  ..........................................
  auto bg_job_limits = GetBGJobLimits();
  bool is_flush_pool_empty =
      env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
  while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
         bg_flush_scheduled_ < bg_job_limits.max_flushes) {
    unscheduled_flushes_--;
    bg_flush_scheduled_++;
    env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
  }
  ...........................................
}
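Both the HIGH-priority pool that BGWorkFlush jobs run on and the max_flushes bound are user-configurable. A minimal sketch with illustrative values (that max_flushes is derived from max_background_flushes in this vintage of the code is an assumption; later releases fold it into max_background_jobs):

#include "rocksdb/db.h"
#include "rocksdb/env.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.max_background_flushes = 2;  // bounds bg_flush_scheduled_ above
  // Size the HIGH-priority thread pool that flush jobs are scheduled on.
  options.env->SetBackgroundThreads(2, rocksdb::Env::Priority::HIGH);

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/flush_pool_demo", &db);
  if (s.ok()) {
    delete db;
  }
  return 0;
}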
RocksDB has a SwitchMemtable function, which turns the current memtable into an immutable one and creates a fresh memtable in its place. In other words, every time a memtable is flushed to disk, this function must have been called beforehand. In the implementation, each call to SwitchMemtable is followed by a call to FlushRequested on the corresponding immutable memtable list, which sets its flush_requested_ flag, and by a call to SchedulePendingFlush (shown above) to add the ColumnFamily to flush_queue_. So by walking the call sites of these functions we can see exactly when RocksDB triggers a flush.
SwitchMemtable is called from four places in RocksDB:
- DBImpl::HandleWriteBufferFull
- DBImpl::SwitchWAL
- DBImpl::FlushMemTable
- DBImpl::ScheduleFlushes
Let's analyze these functions one by one.
First, HandleWriteBufferFull. This function handles the case where the memory used by the memtables of all ColumnFamilies exceeds the limit. We can see that it calls SwitchMemtable, adds the chosen cfd to flush_queue_, and finally wakes the background flush thread.
Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
  ...................................
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    ...............................
  }
  if (cfd_picked != nullptr) {
    status = SwitchMemtable(cfd_picked, write_context,
                            FlushReason::kWriteBufferFull);
    if (status.ok()) {
      cfd_picked->imm()->FlushRequested();
      SchedulePendingFlush(cfd_picked, FlushReason::kWriteBufferFull);
      MaybeScheduleFlushOrCompaction();
    }
  }
  return status;
}
This function is called before every WAL write, i.e., the check happens each time we write to the WAL.
Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
                               bool* need_log_sync,
                               WriteContext* write_context) {
  ..........................................
  if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
    // Before a new memtable is added in SwitchMemtable(),
    // write_buffer_manager_->ShouldFlush() will keep returning true. If another
    // thread is writing to another DB with the same write buffer, they may also
    // be flushed. We may end up with flushing much more DBs than needed. It's
    // suboptimal but still correct.
    status = HandleWriteBufferFull(write_context);
  }
  ........................................
}
It calls the write buffer manager's ShouldFlush to decide whether to handle a full buffer. The function is simple: it checks whether the memory used by the memtables has exceeded the limit.
// Should only be called from write thread
bool ShouldFlush() const {
  if (enabled()) {
    if (mutable_memtable_memory_usage() > mutable_limit_) {
      return true;
    }
    if (memory_usage() >= buffer_size_ &&
        mutable_memtable_memory_usage() >= buffer_size_ / 2) {
      // If the memory exceeds the buffer size, we trigger more aggressive
      // flush. But if already more than half memory is being flushed,
      // triggering more flush may not help. We will hold it instead.
      return true;
    }
  }
  return false;
}
mutable_limit_ and buffer_size_ are initialized here; buffer_size_ is the configurable db_write_buffer_size option.
WriteBufferManager::WriteBufferManager(size_t _buffer_size,
                                       std::shared_ptr<Cache> cache)
    : buffer_size_(_buffer_size),
      mutable_limit_(buffer_size_ * 7 / 8),
Now for mutable_memtable_memory_usage and memory_usage. They return, respectively, the total memory used by the write buffer (memory_used_) and the memory still held by mutable memtables (memory_active_). Once a memtable is marked immutable, its memory is about to be freed and no longer counts as active.
// Only valid if enabled()
size_t memory_usage() const {
  return memory_used_.load(std::memory_order_relaxed);
}
size_t mutable_memtable_memory_usage() const {
  return memory_active_.load(std::memory_order_relaxed);
}
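The comment in PreprocessWrite above mentions another DB sharing the same write buffer: a WriteBufferManager can be shared explicitly across DB instances, and db_write_buffer_size is shorthand for creating one internally for a single DB. A sketch with illustrative paths and sizes:

#include <memory>
#include "rocksdb/db.h"
#include "rocksdb/write_buffer_manager.h"

int main() {
  // One manager, charged by the memtables of every DB that uses it.
  auto wbm = std::make_shared<rocksdb::WriteBufferManager>(256 << 20);

  rocksdb::Options options;
  options.create_if_missing = true;
  options.write_buffer_manager = wbm;

  rocksdb::DB *db1 = nullptr, *db2 = nullptr;
  if (!rocksdb::DB::Open(options, "/tmp/wbm_db1", &db1).ok()) return 1;
  if (!rocksdb::DB::Open(options, "/tmp/wbm_db2", &db2).ok()) return 1;

  // Once combined mutable memtable usage crosses the manager's limits,
  // ShouldFlush() returns true and HandleWriteBufferFull picks a victim CF.
  delete db1;
  delete db2;
  return 0;
}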
Next, SwitchWAL. Its flow is basically identical to HandleWriteBufferFull above.
Status DBImpl::SwitchWAL(WriteContext* write_context) {
  ...............................................
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    if (cfd->IsDropped()) {
      continue;
    }
    if (cfd->OldestLogToKeep() <= oldest_alive_log) {
      status = SwitchMemtable(cfd, write_context);
      if (!status.ok()) {
        break;
      }
      cfd->imm()->FlushRequested();
      SchedulePendingFlush(cfd, FlushReason::kWriteBufferManager);
    }
  }
  MaybeScheduleFlushOrCompaction();
  return status;
}
The call site of this function is simple: it checks whether the total WAL size has exceeded the configured limit (max_total_wal_size). Again, the check happens before each WAL write.
Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
                               bool* need_log_sync,
                               WriteContext* write_context) {
  .................................................
  if (UNLIKELY(status.ok() && !single_column_family_mode_ &&
               total_log_size_ > GetMaxTotalWalSize())) {
    status = SwitchWAL(write_context);
  }
Next is FlushMemTable, which forces a memtable flush to disk, for example when the user calls the Flush API directly. The flow is essentially the same as the cases above: SwitchMemtable -> FlushRequested -> MaybeScheduleFlushOrCompaction.
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
                             const FlushOptions& flush_options,
                             FlushReason flush_reason, bool writes_stopped) {
  Status s;
  uint64_t flush_memtable_id = 0;
  {
    .........................................
    // SwitchMemtable() will release and reacquire mutex during execution
    s = SwitchMemtable(cfd, &context);
    flush_memtable_id = cfd->imm()->GetLatestMemTableID();
    if (!writes_stopped) {
      write_thread_.ExitUnbatched(&w);
    }
    cfd->imm()->FlushRequested();
    // schedule flush
    SchedulePendingFlush(cfd, flush_reason);
    MaybeScheduleFlushOrCompaction();
  }
  ...........................
  return s;
}
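For reference, the user-facing path into FlushMemTable is the public Flush API. A minimal sketch (the path is illustrative):

#include <cassert>
#include "rocksdb/db.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/flush_api_demo", &db);
  assert(s.ok());

  s = db->Put(rocksdb::WriteOptions(), "key", "value");
  assert(s.ok());

  rocksdb::FlushOptions fo;
  fo.wait = true;     // block until the memtable has been written out
  s = db->Flush(fo);  // SwitchMemtable -> FlushRequested -> background flush
  assert(s.ok());

  delete db;
  return 0;
}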
Finally, the last case. The biggest difference from the previous three is that they all require the flush thread to be invoked immediately, whereas this one is less urgent: the actual flush can be deferred to some later point.
In this case, each memtable carries a state field called flush_state_, which can take one of three values; the state is updated via UpdateFlushState. As you might guess, these are limits on a single memtable.
enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };

void MemTable::UpdateFlushState() {
  auto state = flush_state_.load(std::memory_order_relaxed);
  if (state == FLUSH_NOT_REQUESTED && ShouldFlushNow()) {
    // ignore CAS failure, because that means somebody else requested
    // a flush
    flush_state_.compare_exchange_strong(state, FLUSH_REQUESTED,
                                         std::memory_order_relaxed,
                                         std::memory_order_relaxed);
  }
}
When is UpdateFlushState called? Simple: on every memtable operation, such as Add or Update.
Once ShouldFlushNow returns true, flush_state_ is set to FLUSH_REQUESTED, meaning this memtable will be flushed.
Now for ShouldFlushNow itself. Its main job is to check whether the current memtable's memory usage exceeds write_buffer_size; if so, it returns true.
bool MemTable::ShouldFlushNow() const {
  size_t write_buffer_size = write_buffer_size_.load(std::memory_order_relaxed);
  const double kAllowOverAllocationRatio = 0.6;

  // If arena still have room for new block allocation, we can safely say it
  // shouldn't flush.
  auto allocated_memory = table_->ApproximateMemoryUsage() +
                          range_del_table_->ApproximateMemoryUsage() +
                          arena_.MemoryAllocatedBytes();

  // if we can still allocate one more block without exceeding the
  // over-allocation ratio, then we should not flush.
  if (allocated_memory + kArenaBlockSize <
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
    return false;
  }

  // if user keeps adding entries that exceeds write_buffer_size, we need to
  // flush earlier even though we still have much available memory left.
  if (allocated_memory >
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
    return true;
  }

  return arena_.AllocatedAndUnused() < kArenaBlockSize / 4;
}
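To see the thresholds in action, here is a standalone arithmetic sketch of the two comparisons above, with made-up numbers (that kArenaBlockSize defaults to write_buffer_size / 8 is an assumption; RocksDB derives it from arena_block_size):

#include <cstddef>
#include <cstdio>

int main() {
  const double kAllowOverAllocationRatio = 0.6;
  const size_t write_buffer_size = 64u << 20;            // 64MB
  const size_t kArenaBlockSize = write_buffer_size / 8;  // 8MB (assumed default)
  size_t allocated_memory = 62u << 20;                   // pretend 62MB in use

  // First check: 62MB + 8MB = 70MB is NOT below 64MB + 0.6 * 8MB = 68.8MB,
  // so we cannot conclude "don't flush" and fall through.
  bool room_for_one_more_block =
      allocated_memory + kArenaBlockSize <
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio;

  // Second check: 62MB is not above 68.8MB either, so the final decision
  // falls to how much of the last arena block is still unused.
  bool clearly_over =
      allocated_memory >
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio;

  printf("room for one more block: %d, clearly over: %d\n",
         room_for_one_more_block, clearly_over);
  return 0;
}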
Now, what happens once flush_state_ has been set? The memtable has a ShouldScheduleFlush function that reports whether the FLUSH_REQUESTED state bit has been set.
bool ShouldScheduleFlush() const {
  return flush_state_.load(std::memory_order_relaxed) == FLUSH_REQUESTED;
}
This function is called from CheckMemtableFull, which transitions memtables whose flush_state_ is FLUSH_REQUESTED to FLUSH_SCHEDULED (meaning they have entered the flush scheduling queue), and then adds the ColumnFamily to that queue.
void CheckMemtableFull() {
  if (flush_scheduler_ != nullptr) {
    auto* cfd = cf_mems_->current();
    assert(cfd != nullptr);
    if (cfd->mem()->ShouldScheduleFlush() &&
        cfd->mem()->MarkFlushScheduled()) {
      // MarkFlushScheduled only returns true if we are the one that
      // should take action, so no need to dedup further
      flush_scheduler_->ScheduleFlush(cfd);
    }
  }
}
MarkFlushScheduled performs the state transition.
bool MarkFlushScheduled() {
  auto before = FLUSH_REQUESTED;
  return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED,
                                              std::memory_order_relaxed,
                                              std::memory_order_relaxed);
}
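Because both UpdateFlushState and MarkFlushScheduled use compare-and-swap, exactly one caller wins each transition. A self-contained sketch of the same state machine (not RocksDB code), showing how the CAS deduplicates scheduling:

#include <atomic>
#include <cstdio>

enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };

std::atomic<FlushStateEnum> flush_state{FLUSH_NOT_REQUESTED};

bool MarkFlushScheduled() {
  auto before = FLUSH_REQUESTED;
  // Succeeds only for the first caller that sees FLUSH_REQUESTED.
  return flush_state.compare_exchange_strong(before, FLUSH_SCHEDULED,
                                             std::memory_order_relaxed,
                                             std::memory_order_relaxed);
}

int main() {
  flush_state.store(FLUSH_REQUESTED, std::memory_order_relaxed);
  printf("first caller schedules: %d\n", MarkFlushScheduled());   // 1
  printf("second caller is deduped: %d\n", MarkFlushScheduled()); // 0
  return 0;
}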
ScheduleFlush is the more important function here: it adds the CF to the flush scheduling queue (FlushScheduler).
void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) {
#ifndef NDEBUG
  std::lock_guard<std::mutex> lock(checking_mutex_);
  assert(checking_set_.count(cfd) == 0);
  checking_set_.insert(cfd);
#endif  // NDEBUG
  cfd->Ref();
// Suppress false positive clang analyzer warnings.
#ifndef __clang_analyzer__
  Node* node = new Node{cfd, head_.load(std::memory_order_relaxed)};
  while (!head_.compare_exchange_strong(
      node->next, node, std::memory_order_relaxed, std::memory_order_relaxed)) {
    // failing CAS updates the first param, so we are already set for
    // retry. TakeNextColumnFamily won't happen until after another
    // inter-thread synchronization, so we don't even need release
    // semantics for this CAS
  }
#endif  // __clang_analyzer__
}
CheckMemtableFull is invoked on the following three write paths (see the sketch after this list):
- Delete operations
- Put operations
- Merge operations
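A quick sketch showing these paths from the user side (path and sizes illustrative): with a deliberately small write_buffer_size, ordinary Put/Delete traffic is enough to drive a memtable through CheckMemtableFull into the FlushScheduler.

#include <string>
#include "rocksdb/db.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  options.write_buffer_size = 1 << 20;  // tiny memtable so writes fill it fast
  if (!rocksdb::DB::Open(options, "/tmp/cmf_demo", &db).ok()) {
    return 1;
  }

  rocksdb::WriteOptions wo;
  for (int i = 0; i < 10000; ++i) {
    // Each Put runs CheckMemtableFull after inserting into the memtable.
    db->Put(wo, "key" + std::to_string(i), std::string(1024, 'v'));
  }
  db->Delete(wo, "key0");  // deletes (and merges) go through the same path

  delete db;
  return 0;
}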
Now let's see how the FlushScheduler gets the flush thread scheduled. PreprocessWrite is called before each WAL write, and it checks whether flush_scheduler_ is non-empty (that is, whether there are full memtables waiting to be flushed to disk).
Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
                               bool* need_log_sync,
                               WriteContext* write_context) {
  ..................................................................
  if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
    status = ScheduleFlushes(write_context);
  }
ScheduleFlushes then iterates over all the memtables that need to be flushed and calls SwitchMemtable on each for the follow-up work. Note that SwitchMemtable in turn also triggers the flush thread.
Status DBImpl::ScheduleFlushes(WriteContext* context) {
  ColumnFamilyData* cfd;
  while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
    auto status = SwitchMemtable(cfd, context, FlushReason::kWriteBufferFull);
    if (cfd->Unref()) {
      delete cfd;
    }
    if (!status.ok()) {
      return status;
    }
  }
  return Status::OK();
}
Flushing the memtable to an SST
In RocksDB the flush itself is implemented by the FlushJob class, and the implementation is fairly straightforward: ultimately WriteLevel0Table is called to write the contents to disk. We won't analyze the SST format here; readers who need the details can consult the RocksDB wiki.
Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
                     FileMetaData* file_meta) {
  ...........................................
  // This will release and re-acquire the mutex.
  Status s = WriteLevel0Table();

  if (s.ok() &&
      (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
    s = Status::ShutdownInProgress(
        "Database shutdown or Column family drop during flush");
  }

  if (!s.ok()) {
    cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
  } else {
    TEST_SYNC_POINT("FlushJob::InstallResults");
    // Replace immutable memtable with the generated Table
    s = cfd_->imm()->InstallMemtableFlushResults(
        cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
        meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
        log_buffer_);
  }
  ........................................................
}
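To close, a user-level way to observe the result that FlushJob installs: the public EventListener hook fires once the new L0 file is in place (the listener name and path below are illustrative):

#include <cstdio>
#include <memory>
#include "rocksdb/db.h"
#include "rocksdb/listener.h"

class FlushLogger : public rocksdb::EventListener {
 public:
  void OnFlushCompleted(rocksdb::DB* /*db*/,
                        const rocksdb::FlushJobInfo& info) override {
    printf("flushed CF %s to L0 file %s\n", info.cf_name.c_str(),
           info.file_path.c_str());
  }
};

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.listeners.push_back(std::make_shared<FlushLogger>());

  rocksdb::DB* db = nullptr;
  if (!rocksdb::DB::Open(options, "/tmp/flush_listener_demo", &db).ok()) {
    return 1;
  }
  db->Put(rocksdb::WriteOptions(), "k", "v");
  db->Flush(rocksdb::FlushOptions());  // fires OnFlushCompleted when done
  delete db;
  return 0;
}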