Introduction
In RocksDB, every write first goes to the WAL and then to the MemTable. This post analyzes how these two steps are implemented.
First, note that in RocksDB the WAL is written serially, in order, by a single thread at a time, whereas the MemTable can be written concurrently by multiple threads.
RocksDB 5.5 introduced the option enable_pipelined_write, whose purpose is to pipeline the WAL and MemTable writes:
once a thread finishes its WAL write, the other writes waiting in the WAL write queue can start their own WAL writes, while the current thread
moves on to writing the MemTable. In this way the WAL writes and MemTable writes of different Writers execute concurrently.
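Before diving into the real implementation, the pipelining idea can be boiled down to a minimal sketch. This is illustrative code, not RocksDB's (the class and member names here are invented): stage 1 (the WAL append) is serialized, and releasing it before stage 2 (the memtable insert) is exactly what lets the next writer's WAL append overlap with our memtable insert.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical sketch of enable_pipelined_write's essence. Stage 1 (WAL) is
// serialized by a mutex; as soon as a writer leaves stage 1 the next queued
// writer may enter it, while the first writer proceeds to stage 2 (memtable),
// which runs concurrently.
class PipelinedWriter {
 public:
  void Write(int value) {
    {
      // Stage 1: WAL append, strictly one writer at a time.
      std::lock_guard<std::mutex> lk(wal_mutex_);
      wal_.push_back(value);
    }  // Dropping the lock here is the "pipeline" step: the next writer can
       // start its WAL append while we are still inserting into the memtable.

    // Stage 2: memtable insert, allowed to run concurrently.
    memtable_inserts_.fetch_add(1, std::memory_order_relaxed);
  }

  size_t wal_entries() const { return wal_.size(); }
  int memtable_inserts() const { return memtable_inserts_.load(); }

 private:
  std::mutex wal_mutex_;
  std::vector<int> wal_;
  std::atomic<int> memtable_inserts_{0};
};
```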
Implementation
Here we only analyze the pipelined path; the core function is DBImpl::PipelinedWriteImpl.
- Each DB (DBImpl) owns a write_thread_ (class WriteThread).
- Each call to Write first writes the WAL. A WriteThread::Writer object is constructed and joined into a group (by calling JoinBatchGroup):

WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                      disable_memtable);
write_thread_.JoinBatchGroup(&w);
- Now let's look at JoinBatchGroup. This function links every incoming WAL write into a group. If the current Writer
object is the leader (for example, the first one to enter), it returns immediately; otherwise it waits until its state is updated to one of the expected values.

void WriteThread::JoinBatchGroup(Writer* w) {
  ...................................
  bool linked_as_leader = LinkOne(w, &newest_writer_);
  if (linked_as_leader) {
    SetState(w, STATE_GROUP_LEADER);
  }

  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);

  if (!linked_as_leader) {
    /**
     * Wait until:
     * 1) An existing leader pick us as the new leader when it finishes
     * 2) An existing leader pick us as its follower and
     *    2.1) finishes the memtable writes on our behalf
     *    2.2) Or tell us to finish the memtable writes in parallel
     * 3) (pipelined write) An existing leader pick us as its follower and
     *    finish book-keeping and WAL write for us, enqueue us as pending
     *    memtable writer, and
     *    3.1) we become memtable writer group leader, or
     *    3.2) an existing memtable writer group leader tell us to finish
     *         memtable writes in parallel.
     */
    AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
                   STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
               &jbg_ctx);
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
  }
}
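The blocking in AwaitState and the wakeups in SetState are the backbone of the whole state machine. RocksDB's real AwaitState uses an adaptive spin-then-park scheme for performance; the sketch below shows only the condition-variable core, with illustrative state bits, to make the wait/notify contract concrete.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

// Illustrative state bits (values are arbitrary; only the bitmask test matters).
constexpr uint32_t STATE_INIT = 1;
constexpr uint32_t STATE_GROUP_LEADER = 2;
constexpr uint32_t STATE_COMPLETED = 4;

struct Writer {
  std::mutex mu;
  std::condition_variable cv;
  uint32_t state = STATE_INIT;
};

// Publish a new state and wake the owning thread if it is blocked.
void SetState(Writer* w, uint32_t new_state) {
  std::lock_guard<std::mutex> lk(w->mu);
  w->state = new_state;
  w->cv.notify_one();
}

// Block until the writer's state matches one of the bits in goal_mask.
uint32_t AwaitState(Writer* w, uint32_t goal_mask) {
  std::unique_lock<std::mutex> lk(w->mu);
  w->cv.wait(lk, [&] { return (w->state & goal_mask) != 0; });
  return w->state;
}
```

A follower thread calls AwaitState with the mask of every state it could legally wake up in (leader, parallel memtable writer, or completed), and whoever advances it calls SetState once.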
- Next, the LinkOne function, which prepends the current Writer object to the group. Since
writes arrive concurrently, newest_writer_ (which holds the most recently added writer) must be updated atomically.

bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  assert(w->state == STATE_INIT);
  Writer* writers = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    w->link_older = writers;
    if (newest_writer->compare_exchange_weak(writers, w)) {
      return (writers == nullptr);
    }
  }
}
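The CAS loop above is a classic lock-free list prepend, and its return value is what elects the leader. The following self-contained sketch (with a simplified Writer; the real one carries the batch, state, and so on) isolates just that logic so it can be exercised directly:

```cpp
#include <atomic>
#include <cassert>

// Simplified Writer: only the list link matters for this sketch.
struct Writer {
  Writer* link_older = nullptr;
};

// Prepend w to the list headed by *newest_writer.
// Returns true iff w is the leader, i.e. the list was empty before the push.
bool LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
  Writer* writers = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    w->link_older = writers;
    // On failure, compare_exchange_weak reloads `writers` with the current
    // head, so the loop simply retries against the freshly observed head.
    if (newest_writer->compare_exchange_weak(writers, w)) {
      return writers == nullptr;
    }
  }
}
```

Note that only the head pointer is contended; link_older is written before the CAS publishes the node, so readers that later traverse from the head always see a fully linked node.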
- After returning from JoinBatchGroup, if the current Writer object is the leader, it links all the writes behind it
into a WriteGroup (by calling EnterAsBatchGroupLeader) and starts writing the WAL. Note that non-leader writes proceed
directly to the MemTable write: their WAL writes are batched up (grouped) and performed by the leader they belong to, as we will see below.

size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
                                            WriteGroup* write_group) {
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);
  assert(write_group != nullptr);
  ................................................
  Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);

  // This is safe regardless of any db mutex status of the caller. Previous
  // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
  // (they emptied the list and then we added ourself as leader) or had to
  // explicitly wake us up (the list was non-empty when we added ourself,
  // so we have already received our MarkJoined).
  CreateMissingNewerLinks(newest_writer);

  // Tricky. Iteration start (leader) is exclusive and finish
  // (newest_writer) is inclusive. Iteration goes from old to new.
  Writer* w = leader;
  while (w != newest_writer) {
    w = w->link_newer;
    .........................................
    w->write_group = write_group;
    size += batch_size;
    write_group->last_writer = w;
    write_group->size++;
  }
  ..............................
}
- Notice that the traversal walks link_newer. The effect is to take a snapshot of the writers currently queued behind the
leader before writing the WAL (via the CreateMissingNewerLinks function).

void WriteThread::CreateMissingNewerLinks(Writer* head) {
  while (true) {
    Writer* next = head->link_older;
    if (next == nullptr || next->link_newer != nullptr) {
      assert(next == nullptr || next->link_newer == head);
      break;
    }
    next->link_newer = head;
    head = next;
  }
}
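To see why this amounts to a snapshot: writers are pushed with only link_older set, and CreateMissingNewerLinks fills in the reverse link_newer pointers from the head captured by the leader back to the leader itself. Writers that arrive later still prepend to newest_writer_, but the leader's old-to-new walk never reaches them. A runnable demo, using a simplified Writer struct and an invented CollectGroup helper:

```cpp
#include <cassert>
#include <vector>

// Simplified Writer; callers must initialize both links.
struct Writer {
  int id;
  Writer* link_older;
  Writer* link_newer;
};

// Fill in the missing link_newer pointers, stopping at an already-linked node.
void CreateMissingNewerLinks(Writer* head) {
  while (true) {
    Writer* next = head->link_older;
    if (next == nullptr || next->link_newer != nullptr) break;
    next->link_newer = head;
    head = next;
  }
}

// Walk from the leader (oldest) to the snapshotted head via link_newer.
std::vector<int> CollectGroup(Writer* leader, Writer* newest) {
  std::vector<int> ids{leader->id};
  for (Writer* w = leader; w != newest;) {
    w = w->link_newer;
    ids.push_back(w->id);
  }
  return ids;
}
```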
- Once the above is done, the WAL write begins; the whole write_group is merged into a single WriteBatch (via the MergeBatch function) and written out.

if (w.ShouldWriteToWAL()) {
  ...............................
  w.status = WriteToWAL(wal_write_group, log_writer, log_used,
                        need_log_sync, need_log_dir_sync, current_sequence);
}
- After the leader has written itself and its followers to the WAL, it needs to write the MemTable. The Writers that are still
blocked at this point fall into two cases. The first case: writers whose batches the current leader has already written to the WAL; these (including the leader itself) need to be linked onto the memtable writer list. The second case: writers that have not yet been written to the WAL; these must elect a new leader and continue writing the WAL.

void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
                                         Status status) {
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;
  assert(leader->link_older == nullptr);
  .....................................
  if (enable_pipelined_write_) {
    // Notify writers don't write to memtable to exit.
    ......................................
    // Link the remaining of the group to memtable writer list.
    if (write_group.size > 0) {
      if (LinkGroup(write_group, &newest_memtable_writer_)) {
        // The leader can now be different from current writer.
        SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER);
      }
    }
    // Reset newest_writer_ and wake up the next leader.
    Writer* newest_writer = last_writer;
    if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) {
      Writer* next_leader = newest_writer;
      while (next_leader->link_older != last_writer) {
        next_leader = next_leader->link_older;
        assert(next_leader != nullptr);
      }
      next_leader->link_older = nullptr;
      SetState(next_leader, STATE_GROUP_LEADER);
    }
    AwaitState(leader, STATE_MEMTABLE_WRITER_LEADER |
                        STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
               &eabgl_ctx);
  } else {
    .....................................
  }
}
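The subtle part of the second case is the leadership handoff around newest_writer_. The sketch below isolates it (simplified Writer, invented function name): try to swing the head back to nullptr on the assumption that our group's last_writer is still the queue head; if the CAS fails, new writers arrived in the meantime, so walk back from the observed head to the writer directly behind last_writer, which is the oldest newcomer, and promote it.

```cpp
#include <atomic>
#include <cassert>

// Simplified Writer: only the list link matters for this sketch.
struct Writer {
  Writer* link_older = nullptr;
};

// Returns the next leader, or nullptr if no writer queued after last_writer.
Writer* PickNextLeader(std::atomic<Writer*>* newest_writer,
                       Writer* last_writer) {
  Writer* expected = last_writer;
  if (newest_writer->compare_exchange_strong(expected, nullptr)) {
    return nullptr;  // queue is empty again; nothing to hand off
  }
  // `expected` now holds the real head; the oldest newcomer sits directly
  // after last_writer and becomes the next leader.
  Writer* next_leader = expected;
  while (next_leader->link_older != last_writer) {
    next_leader = next_leader->link_older;
  }
  next_leader->link_older = nullptr;  // detach the finished group
  return next_leader;
}
```

In RocksDB the promotion is completed by a SetState(next_leader, STATE_GROUP_LEADER) call, which wakes the chosen writer out of its AwaitState.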
- Next, the MemTable write. The logic mirrors the WAL write: if this writer is the leader, it again builds a group (WriteGroup), traverses the writers that need to write the MemTable, and adds them to the group (EnterAsMemTableWriter); it then records the group's degree of parallelism and sets the corresponding states (LaunchParallelMemTableWriters). Note that each SetState wakes up a previously blocked Writer.

void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) {
  assert(write_group != nullptr);
  write_group->running.store(write_group->size);
  for (auto w : *write_group) {
    SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
  }
}
- Note that when building the memtable group we do not need to build link_newer again: it was already built during the WAL phase, so reusing the group built there guarantees that it contains only writes that have already reached the WAL.

void WriteThread::EnterAsMemTableWriter(Writer* leader,
                                        WriteGroup* write_group) {
  ....................................
  if (!allow_concurrent_memtable_write_ || !leader->batch->HasMerge()) {
    ....................................................
  }

  write_group->last_writer = last_writer;
  write_group->last_sequence =
      last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) - 1;
}
- Finally, the MemTable write itself. All the Writers that were blocked during the WAL phase now enter the logic below, which means
the MemTable is written concurrently.

if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
  .........................
  w.status = WriteBatchInternal::InsertInto(
      &w, w.sequence, &column_family_memtables, &flush_scheduler_,
      write_options.ignore_missing_column_families, 0 /*log_number*/, this,
      true /*concurrent_memtable_writes*/);
  if (write_thread_.CompleteParallelMemTableWriter(&w)) {
    MemTableInsertStatusCheck(w.status);
    versions_->SetLastSequence(w.write_group->last_sequence);
    write_thread_.ExitAsMemTableWriter(&w, *w.write_group);
  }
}
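Note how CompleteParallelMemTableWriter returns true for exactly one writer, which then does the exit work on behalf of the group. This works off the `running` counter that LaunchParallelMemTableWriters initialized to the group size. A minimal sketch of that protocol (illustrative names, not RocksDB's types):

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// `running` starts at the group size; every writer decrements it after its
// memtable insert, and exactly the writer that drops it to zero returns true
// and performs the exit work (ExitAsMemTableWriter in RocksDB).
struct WriteGroup {
  std::atomic<int> running{0};
};

// Returns true for exactly one caller: the last writer to finish.
bool CompleteOne(WriteGroup* group) {
  return group->running.fetch_sub(1, std::memory_order_acq_rel) == 1;
}
```

fetch_sub returns the value before the decrement, so the caller that observes 1 is by definition the last one out, no matter how the threads interleave.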
- Lastly, once every Writer in the group has written the MemTable, ExitAsMemTableWriter performs the cleanup: if there is a new
memtable writer list to process, it wakes the corresponding Writer, and it marks the Writers that have finished as completed.

void WriteThread::ExitAsMemTableWriter(Writer* /*self*/,
                                       WriteGroup& write_group) {
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;

  Writer* newest_writer = last_writer;
  if (!newest_memtable_writer_.compare_exchange_strong(newest_writer,
                                                       nullptr)) {
    CreateMissingNewerLinks(newest_writer);
    Writer* next_leader = last_writer->link_newer;
    assert(next_leader != nullptr);
    next_leader->link_older = nullptr;
    SetState(next_leader, STATE_MEMTABLE_WRITER_LEADER);
  }
  Writer* w = leader;
  while (true) {
    if (!write_group.status.ok()) {
      w->status = write_group.status;
    }
    Writer* next = w->link_newer;
    if (w != leader) {
      SetState(w, STATE_COMPLETED);
    }
    if (w == last_writer) {
      break;
    }
    w = next;
  }
  // Note that leader has to exit last, since it owns the write group.
  SetState(leader, STATE_COMPLETED);
}
Summary
As we have seen, in RocksDB the WAL is always written serially while the MemTable can be written by multiple threads concurrently. This means that once the system comes under enough load,
the WAL write is bound to become the bottleneck.