天天看點

MySQL · RocksDB · A Summary of the Write Path Implementation

Introduction

In RocksDB, every write goes to the WAL first and then to the MemTable. This article analyzes how these two steps are implemented.

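The first thing to be clear about is that in RocksDB the WAL is written sequentially and serially by a single thread, whereas the MemTable can be written concurrently by multiple threads.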

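RocksDB 5.5 introduced the option enable_pipelined_write, whose purpose is to pipeline the WAL and MemTable writes. Once a thread finishes its WAL write, the other writes waiting in the WAL write queue can go on to write the WAL while the current thread continues with the MemTable write. In this way, the WAL writes and the MemTable writes of different Writers execute concurrently.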
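The effect of the option can be illustrated with a toy two-stage write path. The sketch below is a hypothetical illustration, not RocksDB code (ToyStore and its methods are invented for this example): each Put appends a record to a log under one mutex, standing in for the serial WAL write, and then updates a map, standing in for the MemTable. With pipelined set to true, the log mutex is released before the map update, so the next writer can append to the log while the previous one is still in the second stage. Because second-stage updates can then finish out of log order, the map keeps the entry with the higher sequence number instead of blindly overwriting. RocksDB's MemTable instead stores every version tagged with its sequence number and uses a concurrent skiplist rather than the single mutex assumed here.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy store (not RocksDB): stage 1 appends to a log ("WAL"),
// stage 2 updates a map ("memtable").
class ToyStore {
 public:
  struct Record {
    uint64_t seq;
    std::string key;
    std::string value;
  };

  explicit ToyStore(bool pipelined) : pipelined_(pipelined) {}

  void Put(const std::string& key, const std::string& value) {
    std::unique_lock<std::mutex> wal_lock(wal_mu_);
    const uint64_t seq = ++last_seq_;         // sequence assigned in log order
    wal_.push_back(Record{seq, key, value});  // stage 1: serial log append
    if (pipelined_) {
      // Pipelined: release the log before stage 2, so the next writer can
      // append to the log while this writer is still updating the map.
      wal_lock.unlock();
    }
    // Stage 2 (assumption: one mutex instead of a concurrent skiplist).
    std::lock_guard<std::mutex> mem_lock(mem_mu_);
    auto it = memtable_.find(key);
    // Stage-2 updates may finish out of log order, so keep the newest seq.
    if (it == memtable_.end() || it->second.first < seq) {
      memtable_[key] = std::make_pair(seq, value);
    }
  }

  std::vector<Record> Wal() const {
    std::lock_guard<std::mutex> lock(wal_mu_);
    return wal_;
  }

  std::map<std::string, std::pair<uint64_t, std::string>> Memtable() const {
    std::lock_guard<std::mutex> lock(mem_mu_);
    return memtable_;
  }

 private:
  const bool pipelined_;
  mutable std::mutex wal_mu_;
  mutable std::mutex mem_mu_;
  uint64_t last_seq_ = 0;
  std::vector<Record> wal_;
  std::map<std::string, std::pair<uint64_t, std::string>> memtable_;
};
```

After several threads call Put concurrently, in either mode every key in the map should hold the value of the last log record written for that key.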

Implementation

Here we only analyze the pipelined implementation; its core function is DBImpl::PipelinedWriteImpl.

  • Each DB (DBImpl) has a write_thread_ (class WriteThread).
  • Each call to Write first writes the WAL: it creates a WriteThread::Writer object and adds it to a group by calling JoinBatchGroup.
    WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                          disable_memtable);
    write_thread_.JoinBatchGroup(&w);
               
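  • Next, JoinBatchGroup. This function adds each WAL write to a group. If the current Writer becomes the leader (for example, the first writer to arrive, which finds the queue empty), the function returns immediately; otherwise the writer waits until it is moved into one of the states listed in the comment below.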

    void WriteThread::JoinBatchGroup(Writer* w) {
    ...................................
    bool linked_as_leader = LinkOne(w, &newest_writer_);
    if (linked_as_leader) {
      SetState(w, STATE_GROUP_LEADER);
    }
    
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);
    
    if (!linked_as_leader) {
      /**
       * Wait until:
       * 1) An existing leader pick us as the new leader when it finishes
       * 2) An existing leader pick us as its follower and
       * 2.1) finishes the memtable writes on our behalf
       * 2.2) Or tell us to finish the memtable writes in parallel
       * 3) (pipelined write) An existing leader pick us as its follower and
       *    finish book-keeping and WAL write for us, enqueue us as pending
       *    memtable writer, and
       * 3.1) we become memtable writer group leader, or
       * 3.2) an existing memtable writer group leader tell us to finish memtable
       *      writes in parallel.
       */
      AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
                        STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
                 &jbg_ctx);
      TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
    }
    }
               
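  • Next, LinkOne. This function adds the current Writer object to the group. Because writes happen concurrently, newest_writer_ (which holds the most recently added writer) must be updated atomically, here with a compare_exchange_weak loop.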

    bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
    assert(newest_writer != nullptr);
    assert(w->state == STATE_INIT);
    Writer* writers = newest_writer->load(std::memory_order_relaxed);
    while (true) {
      w->link_older = writers;
      if (newest_writer->compare_exchange_weak(writers, w)) {
        return (writers == nullptr);
      }
    }
    }
               
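  • After returning from JoinBatchGroup, if the current Writer is the leader, it links all the writers queued behind it into a WriteGroup (by calling EnterAsBatchGroupLeader) and starts writing the WAL. Note that non-leader writers go straight to the MemTable write once they are woken up, because their WAL writes are batched (grouped) and performed by the leader they belong to; we will see the implementation later.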

    size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
                                              WriteGroup* write_group) {
    assert(leader->link_older == nullptr);
    assert(leader->batch != nullptr);
    assert(write_group != nullptr);
    ................................................
    Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);
    
    // This is safe regardless of any db mutex status of the caller. Previous
    // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
    // (they emptied the list and then we added ourself as leader) or had to
    // explicitly wake us up (the list was non-empty when we added ourself,
    // so we have already received our MarkJoined).
    CreateMissingNewerLinks(newest_writer);
    
    // Tricky. Iteration start (leader) is exclusive and finish
    // (newest_writer) is inclusive. Iteration goes from old to new.
    Writer* w = leader;
    while (w != newest_writer) {
      w = w->link_newer;
    .........................................
      w->write_group = write_group;
      size += batch_size;
      write_group->last_writer = w;
      write_group->size++;
    }
    ..............................
    }
               
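  • Note that the traversal follows link_newer. This effectively takes a snapshot of the current leader's writers before the WAL write: CreateMissingNewerLinks walks back from the newest writer loaded at that moment and fills in the missing link_newer pointers, so writers that join afterwards are not part of this group.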

    void WriteThread::CreateMissingNewerLinks(Writer* head) {
    while (true) {
      Writer* next = head->link_older;
      if (next == nullptr || next->link_newer != nullptr) {
        assert(next == nullptr || next->link_newer == head);
        break;
      }
      next->link_newer = head;
      head = next;
    }
    }
               
  • Once this is done, the leader performs the WAL write, in which the whole write_group is eventually merged into a single WriteBatch (by MergeBatch) and written.
    if (w.ShouldWriteToWAL()) {
    ...............................
      w.status = WriteToWAL(wal_write_group, log_writer, log_used,
                            need_log_sync, need_log_dir_sync, current_sequence);
    }
           
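  • Once the leader has written itself and its followers to the WAL, it needs to write the MemTable. The Writers still blocked at this point fall into two cases. The first are those the current leader has already written to the WAL as part of its group; these writers (including the leader itself) must be linked onto the memtable writer list. The second are those not yet written to the WAL; among these, a new leader must be chosen to continue writing the WAL.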

    void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
                                           Status status) {
    Writer* leader = write_group.leader;
    Writer* last_writer = write_group.last_writer;
    assert(leader->link_older == nullptr);
    .....................................
    
    if (enable_pipelined_write_) {
      // Notify writers don't write to memtable to exit.
    ......................................
      // Link the remaining of the group to memtable writer list.
      if (write_group.size > 0) {
        if (LinkGroup(write_group, &newest_memtable_writer_)) {
          // The leader can now be different from current writer.
          SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER);
        }
      }
      // Reset newest_writer_ and wake up the next leader.
      Writer* newest_writer = last_writer;
      if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) {
        Writer* next_leader = newest_writer;
        while (next_leader->link_older != last_writer) {
          next_leader = next_leader->link_older;
          assert(next_leader != nullptr);
        }
        next_leader->link_older = nullptr;
        SetState(next_leader, STATE_GROUP_LEADER);
      }
      AwaitState(leader, STATE_MEMTABLE_WRITER_LEADER |
                             STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
                 &eabgl_ctx);
    } else {
     .....................................
    }
    }
               
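  • Next comes the MemTable write. The logic resembles the WAL write: a leader again creates a group (WriteGroup), walks the writers that need to write the MemTable and adds them to the group (EnterAsMemTableWriter), then sets the number of writers that will run in parallel and sets their state (LaunchParallelMemTableWriters). Note that each SetState call wakes up a Writer that was blocked earlier.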
    void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) {
    assert(write_group != nullptr);
    write_group->running.store(write_group->size);
    for (auto w : *write_group) {
      SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
    }
    }
               
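  • Note that the memtable group is built only from writers on the memtable writer list. Writers are linked onto that list (by LinkGroup) only after their leader has written them to the WAL, so the group contains only operations that are already in the WAL.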
    void WriteThread::EnterAsMemTableWriter(Writer* leader,
                                          WriteGroup* write_group) {
    ....................................
    
    if (!allow_concurrent_memtable_write_ || !leader->batch->HasMerge()) {
     ....................................................
    }
    
    write_group->last_writer = last_writer;
    write_group->last_sequence =
        last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) - 1;
    }
               
  • Finally, the MemTable write itself runs. All the Writers that were blocked during the WAL write now enter the following logic, which means they write the MemTable concurrently.

    if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
    .........................
      w.status = WriteBatchInternal::InsertInto(
          &w, w.sequence, &column_family_memtables, &flush_scheduler_,
          write_options.ignore_missing_column_families, 0 /*log_number*/, this,
          true /*concurrent_memtable_writes*/);
      if (write_thread_.CompleteParallelMemTableWriter(&w)) {
        MemTableInsertStatusCheck(w.status);
        versions_->SetLastSequence(w.write_group->last_sequence);
        write_thread_.ExitAsMemTableWriter(&w, *w.write_group);
      }
    }
               
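  • Finally, once all Writers in the current group have written the MemTable, ExitAsMemTableWriter is called to finish up. If there is a new memtable writer list waiting to be processed, it wakes the corresponding Writer (the next memtable writer leader), and then sets the state of the Writers that have finished to STATE_COMPLETED.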

    void WriteThread::ExitAsMemTableWriter(Writer* /*self*/,
                                         WriteGroup& write_group) {
    Writer* leader = write_group.leader;
    Writer* last_writer = write_group.last_writer;
    
    Writer* newest_writer = last_writer;
    if (!newest_memtable_writer_.compare_exchange_strong(newest_writer,
                                                         nullptr)) {
      CreateMissingNewerLinks(newest_writer);
      Writer* next_leader = last_writer->link_newer;
      assert(next_leader != nullptr);
      next_leader->link_older = nullptr;
      SetState(next_leader, STATE_MEMTABLE_WRITER_LEADER);
    }
    Writer* w = leader;
    while (true) {
      if (!write_group.status.ok()) {
        w->status = write_group.status;
      }
      Writer* next = w->link_newer;
      if (w != leader) {
        SetState(w, STATE_COMPLETED);
      }
      if (w == last_writer) {
        break;
      }
      w = next;
    }
    // Note that leader has to exit last, since it owns the write group.
    SetState(leader, STATE_COMPLETED);
    }
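The linking steps described above (LinkOne, CreateMissingNewerLinks, and the leader's walk in EnterAsBatchGroupLeader) can be reproduced in isolation. The following is a simplified sketch, not RocksDB's code: the Writer struct keeps only the two link fields plus a hypothetical id, and SnapshotGroup is a hypothetical helper that returns the group's ids oldest-first. Size limits, sync flags, and the other checks EnterAsBatchGroupLeader performs are omitted.

```cpp
#include <atomic>
#include <cassert>
#include <vector>

// Simplified stand-in for WriteThread::Writer (hypothetical fields only).
struct Writer {
  int id = 0;
  Writer* link_older = nullptr;  // set by LinkOne when the writer is pushed
  Writer* link_newer = nullptr;  // filled in lazily by CreateMissingNewerLinks
};

// Lock-free push onto a list whose head is the newest writer.
// Returns true if the list was empty, i.e. the caller becomes the leader.
bool LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
  Writer* writers = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    w->link_older = writers;
    // On failure, compare_exchange_weak reloads `writers` with the current head.
    if (newest_writer->compare_exchange_weak(writers, w)) {
      return writers == nullptr;
    }
  }
}

// Walk back from `head` along link_older, adding the reverse (newer) links,
// and stop at the first writer whose link_newer is already set.
void CreateMissingNewerLinks(Writer* head) {
  while (true) {
    Writer* next = head->link_older;
    if (next == nullptr || next->link_newer != nullptr) {
      break;
    }
    next->link_newer = head;
    head = next;
  }
}

// The leader's snapshot: every writer from `leader` up to the head loaded
// here, oldest first. Writers that link after this load are not included.
std::vector<int> SnapshotGroup(Writer* leader,
                               std::atomic<Writer*>* newest_writer) {
  Writer* newest = newest_writer->load(std::memory_order_acquire);
  CreateMissingNewerLinks(newest);
  std::vector<int> ids{leader->id};
  for (Writer* w = leader; w != newest;) {
    w = w->link_newer;
    ids.push_back(w->id);
  }
  return ids;
}
```

The design keeps the contended part of each write down to a single compare-and-swap on the list head; the reverse linking, which costs time proportional to the group size, is done once by the leader and stops early at the first writer whose link_newer is already set.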
               
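In the pipelined branch of ExitAsBatchGroupLeader, leadership passes to the first writer that queued up behind the finished group. A minimal sketch of just that step follows, under simplifying assumptions: Writer keeps only link_older, LinkOne is reduced to its compare-and-swap loop, and PickNextLeader is a hypothetical name. In RocksDB the chosen writer is then woken with SetState(next_leader, STATE_GROUP_LEADER).

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stripped-down writer: only the link to the older writer.
struct Writer {
  Writer* link_older = nullptr;
};

// Lock-free push; returns true if the list was empty (caller is leader).
bool LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
  Writer* writers = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    w->link_older = writers;
    if (newest_writer->compare_exchange_weak(writers, w)) {
      return writers == nullptr;
    }
  }
}

// Called once the group ending at last_writer is done with the WAL.
// Returns the writer that should lead next, or nullptr if none is waiting.
Writer* PickNextLeader(Writer* last_writer,
                       std::atomic<Writer*>* newest_writer) {
  Writer* expected = last_writer;
  // If the group's last writer is still the newest, the queue becomes empty.
  if (newest_writer->compare_exchange_strong(expected, nullptr)) {
    return nullptr;
  }
  // Otherwise `expected` now holds the current newest writer. Walk toward
  // older writers until reaching the one linked right after last_writer.
  Writer* next_leader = expected;
  while (next_leader->link_older != last_writer) {
    next_leader = next_leader->link_older;
    assert(next_leader != nullptr);
  }
  next_leader->link_older = nullptr;  // detach: it is now the oldest writer
  return next_leader;
}
```

The compare_exchange_strong covers the common idle case in one atomic step; the walk is needed only when new writers arrived, because the list is linked from newest to oldest and the next leader is the oldest of the newcomers.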

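The parallel MemTable phase relies on a countdown: LaunchParallelMemTableWriters stores the group size in write_group->running, each writer decrements it after its own insert, and exactly one writer (the last to finish) gets true from CompleteParallelMemTableWriter and performs the exit work. The sketch below models this with std::atomic and a condition variable. MemTableGroup, ToyMemTable, CompleteParallelWriter, ExitGroup, and RunGroup are hypothetical names; RocksDB's AwaitState spins and yields before blocking rather than going straight to a condition variable, and the mutex-protected std::map stands in for the concurrent memtable.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical model of one memtable write group (not RocksDB's classes).
struct MemTableGroup {
  std::atomic<int> running{0};  // writers still inserting
  uint64_t last_sequence = 0;   // highest sequence number in the group
  std::mutex mu;
  std::condition_variable cv;
  bool completed = false;       // stands in for STATE_COMPLETED
};

struct ToyMemTable {
  std::mutex mu;  // assumption: a mutex instead of a concurrent skiplist
  std::map<uint64_t, std::string> rows;
};

// Returns true for exactly one caller: the last writer of the group.
bool CompleteParallelWriter(MemTableGroup* g) {
  if (g->running.fetch_sub(1) > 1) {
    // Not the last one: block until the exit work is done.
    std::unique_lock<std::mutex> lock(g->mu);
    g->cv.wait(lock, [g] { return g->completed; });
    return false;
  }
  return true;
}

// Exit duties run by the last writer: publish the sequence, wake the rest.
void ExitGroup(MemTableGroup* g, std::atomic<uint64_t>* visible_sequence) {
  visible_sequence->store(g->last_sequence);  // like SetLastSequence
  {
    std::lock_guard<std::mutex> lock(g->mu);
    g->completed = true;
  }
  g->cv.notify_all();
}

// Runs a group of n writers with sequences first_seq .. first_seq + n - 1.
// Returns how many writers took the exit path (expected: exactly 1).
int RunGroup(int n, uint64_t first_seq, ToyMemTable* mem,
             std::atomic<uint64_t>* visible) {
  MemTableGroup g;
  g.running.store(n);  // like LaunchParallelMemTableWriters
  g.last_sequence = first_seq + n - 1;
  std::atomic<int> exits{0};
  std::vector<std::thread> threads;
  for (int i = 0; i < n; ++i) {
    threads.emplace_back([&, i] {
      const uint64_t seq = first_seq + i;
      {
        std::lock_guard<std::mutex> lock(mem->mu);
        mem->rows[seq] = "value-" + std::to_string(seq);
      }
      if (CompleteParallelWriter(&g)) {
        exits.fetch_add(1);
        ExitGroup(&g, visible);
      }
    });
  }
  for (auto& t : threads) t.join();
  return exits.load();
}
```

Publishing visible_sequence only on the exit path mirrors the versions_->SetLastSequence call in the excerpt above: the group's sequence numbers become visible only after every writer in the group has finished its insert.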
Summary

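As we can see, in RocksDB the WAL is always written serially, while the MemTable can be written concurrently by multiple threads. This means that once the system is under enough load, the WAL write is bound to become the bottleneck.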
