天天看點

Rocksdb 的優秀代碼(三)-- 工業級 線程池實作分享

文章目錄

  • ​​前言​​
  • ​​1. Rocksdb線程池概覽​​
  • ​​2. Rocksdb 線程池實作​​
  • ​​2.1 基本資料結構​​
  • ​​2.2 線程池建立​​
  • ​​2.3 線程池 排程線程執行​​
  • ​​2.4 線程池銷毀線程​​
  • ​​2.5 線程池優先級排程​​
  • ​​2.6 動态調整線程池 線程數目上限​​
  • ​​3. 總結​​

前言

Rocksdb 作為一個第三方庫的形态嵌入到各個存儲系統之中存儲中繼資料,當rocksdb被使用的時候其内部會自啟動一些線程,随着需要處理的使用者資料越來越多,為了保證性能,rocksdb會讓這一些線程也會不斷增加。而在分布式存儲場景,往往一個機器節點會有很多rocksdb執行個體(64個執行個體,每一個執行個體都會有compaction/flush線程),這個時候在Rocksdb内部使用合理的線程管理方式會節省系統CPU排程資源。

是以Rocksdb自實作的​

​Thread Pool​

​就是為了更好得管理Rocksdb内部線程,除了一些基本的線程排程之外,還會有可控制的線程優先級的排程,因為大多數場景Rocksdb讓Flush線程的優先級高于Compaction線程,而有的場景則需要Compaction的優先級高于Flush,為了更快速的compaction清理掉舊資料。

接下來簡單看一下Rocksdb 線程池的基本實作,本人已經将該線程池代碼摘出來單獨維護,可作為一個獨立線程池去排程。

​​https://github.com/BaronStack/ThreadPool​​

線程池存在的目的 正如上面Rocksdb使用線程池的目的一樣, 能夠更加友善得管理我們應用中的線程,包括但不限于:線程建立,線程資源限制,線程優先級排程,線程銷毀 等。

1. Rocksdb線程池概覽

Rocksdb 實作的線程池支援的特性:

  • 建立/銷毀線程
  • 動态增加、減少線程池線程數目上限(線程池數目需要設定上限,因為Compaction/Flush占用的資源也不能無限增加,需根據實際的Rocksdb 寫入量來動态增加)
  • 支援動态調整 線程CPU 和 I/O優先級(為了暴露足夠的接口給使用者,來讓使用者選擇兩個功能排程的優先順序)

2. Rocksdb 線程池實作

2.1 基本資料結構

// 線程池核心的資料結構
struct Impl {
  private:
  bool low_io_priority_;  // I/O 優先級
  bool low_cpu_priority_; // CPU 優先級
  Env::Priority priority_; // 線程優先級
  Env*         env_;       // 擷取目前線程池的環境變量

  int total_threads_limit_; // 線程池線程總數
  std::atomic_uint queue_len_;  // 目前線程池中執行線程的排隊長度
  bool exit_all_threads_; // 清理線程池時會排程所有未執行的線程
  bool wait_for_jobs_to_complete_; // 等待所有線程池的線程執行完畢

  // Entry per Schedule()/Submit() call
  struct BGItem {
    void* tag = nullptr;
    std::function<void()> function; // 執行函數
    std::function<void()> unschedFunction; // 不執行函數
  };

  using BGQueue = std::deque<BGItem>;
  BGQueue       queue_; // deque 儲存線程池中排程的線程相關的資訊:線程函數、函數參數

  std::mutex               mu_;
  std::condition_variable  bgsignal_; // 條件變量,喚醒正在睡眠的線程
  std::vector<port::Thread> bgthreads_; // 儲存需要排程的線程
}      

線程池類:

class ThreadPoolImpl : public ThreadPool {
 private:
   std::unique_ptr<Impl>   impl_;// 線程池核心資料結構
};      

2.2 線程池建立

Rocksdb維護了一個Env 類,這個類再同一個程序中的多個rocksdb執行個體之間是能夠共享的。是以Rocksdb将這個類作為線程池的入口,進而讓Flush/Compaction 這樣的線程排程過程中,多個db可以隻使用同一個線程池。

Rocksdb實作了多個環境變量:​

​HdfsEnv​

​​,​

​PosixEnv​

​​等,友善Rocksdb的檔案操作/線程操作 接口在不同的環境平台下進行擴充,當然如果使用者變更了新的平台,隻需要支援​

​Env​

​基類的接口,就能擴充到使用者的新平台。

​Env​

​​預設執行個體是​

​PosixEnv​

​,為了保證多db執行個體間共享同一個環境變量,PosixEnv僅維護一個單例。

// 建立Env,初始化幾個類的單例
// 這裡注意調用的順序,先調用ThreadLocalPtr執行個體的初始化,再調用PosixEnv的
// 這樣在Env析構的時候能夠反方向析構,進而保證ThreadLocal的資訊最後一個被清理
Env* Env::Default() {
  ThreadLocalPtr::InitSingletons(); // Threadlocal 執行個體資料,用來通路目前db執行個體運作的線程狀态資訊
  CompressionContextCache::InitSingleton();
  INIT_SYNC_POINT_SINGLETONS();
  static PosixEnv default_env; // 建立posix env 
  return &default_env;
}      

緊接着通過 PosixEnv的構造函數建立線程池

// 根據Env設定的線程優先級,為每一個優先級建立一個線程池(友善優先級線程池的排程)
// 建立多個線程池: enum Priority { BOTTOM, LOW, HIGH, USER, TOTAL };
std::vector<ThreadPoolImpl> thread_pools_;

PosixEnv::PosixEnv()
    : checkedDiskForMmap_(false),
      forceMmapOff_(false),
      page_size_(getpagesize()),
      thread_pools_(Priority::TOTAL),
      allow_non_owner_access_(true) {
  ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
  // 根據優先級建立線程池,預設建立四個線程池,但一般隻會用到兩個(LOW,HIGH)
  for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
    thread_pools_[pool_id].SetThreadPriority(
        static_cast<Env::Priority>(pool_id));
    // This allows later initializing the thread-local-env of each thread.
    thread_pools_[pool_id].SetHostEnv(this);
  }
  thread_status_updater_ = CreateThreadStatusUpdater();
}      

2.3 線程池 排程線程執行

線程池排程棧如下:從入口到具體的線程函數的執行

Env::Schedule() // Env對外接口
   PosixEnv::Schedule()
      ThreadPoolImpl::Schedule() // 線程池的排程入口
         ThreadPoolImpl::Impl::Submit() // 将線程函數、參數、線程回收函數封裝,添加到待排程隊列queue_
            ThreadPoolImpl::Impl::StartBGThreads() 
               ThreadPoolImpl::Impl::BGThreadWrapper() // 更新目前執行的線程狀态并啟動一個排程隊列中的線程
                  ThreadPoolImpl::Impl::BGThread()// 從待排程隊列queue_中排程線程
                     func() // 執行線程函數      

Env的執行個體調用​

​Schedule​

​接口,接收待排程的線程執行函數,參數,所屬優先級線程池,以及線程銷毀函數及其參數。

virtual void Schedule(void (*function)(void* arg), void* arg,
                        Priority pri = LOW, void* tag = nullptr,
                        void (*unschedFunction)(void* arg) = nullptr) = 0;      

後續會執行到​

​ThreadPoolImpl::Impl::Submit()​

void ThreadPoolImpl::Impl::Submit(std::function<void()>&& schedule,
  std::function<void()>&& unschedule, void* tag) {

  // 後續需要更新目前線程池的線程排程隊列,需要保證更新過程的原子性
  std::lock_guard<std::mutex> lock(mu_);
  
  // 需要銷毀線程池了,不接受新的線程加入
  if (exit_all_threads_) {
    return;
  }

  // 啟動線程
  StartBGThreads();

  // 更新線程函數相關的資訊 到線程排程隊列尾部(雙端隊列)
  queue_.push_back(BGItem());
  // 更新
  auto& item = queue_.back();
  item.tag = tag;
  item.function = std::move(schedule);
  item.unschedFunction = std::move(unschedule);

  queue_len_.store(static_cast<unsigned int>(queue_.size()),
    std::memory_order_relaxed);

  // 如果正在執行的線程沒有超過線程池線程數限制,則喚醒一個正在休眠的線程
  if (!HasExcessiveThread()) {
    // Wake up at least one waiting thread.
    bgsignal_.notify_one();
  } else { // 。。。這個邏輯不太懂,超過限制之後 不應該就不喚醒了嗎?
    // Need to wake up all threads to make sure the one woken
    // up is not the one to terminate.
    WakeUpAllThreads();
  }
}      

後續的執行就是按照以上調用棧進行的,從線程排程隊列頭部取線程函執行。

2.4 線程池銷毀線程

線程池的銷毀也就是Env變量的析構函數,db被destory或者close,則會進入該邏輯,Env的預設環境變量是PosixEnv,即Env的子類。則會先調用PosixEnv 的析構函數,其中線程池相關的清理邏輯:

整體的調用棧如下:

~PosixEnv()
  ThreadPoolImpl::JoinAllThreads() 
    ThreadPoolImpl::Impl::JoinThreads()      

在析構函數中調用相關的線程清理工作:

~PosixEnv() override {
  // 通過Posix startthread 的接口排程的線程函數并發執行完畢
  for (const auto tid : threads_to_join_) {
    pthread_join(tid, nullptr);
  }
  // 讓不同優先級線程池中待執行線程執行完
  for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
    thread_pools_[pool_id].JoinAllThreads();
  }
  
  // 放置Posix析構過程中不應該thread_status_updater_ ,防止一些子線程更新線程狀态出錯
  // Delete the thread_status_updater_ only when the current Env is not
  // Env::Default().  This is to avoid the free-after-use error when
  // Env::Default() is destructed while some other child threads are
  // still trying to update thread status.
  if (this != Env::Default()) {
    delete thread_status_updater_;
  }
}      

其中​

​JoinAllThreads​

​函數用來喚醒所有子線程的執行,并設定标記防止接收新的線程

void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) {

  std::unique_lock<std::mutex> lock(mu_);
  assert(!exit_all_threads_);

  wait_for_jobs_to_complete_ = wait_for_jobs_to_complete;
  // 原子(加鎖)方式更新如下變量,用作在submit函數中屏蔽接收新的線程
  exit_all_threads_ = true;
  // prevent threads from being recreated right after they're joined, in case
  // the user is concurrently submitting jobs.
  // 重置線程池的線程上限,防止使用者并發調用submit添加待排程線程
  total_threads_limit_ = 0;

  lock.unlock();

  bgsignal_.notify_all(); //喚醒所有等待在bgsignal_的線程

  for (auto& th : bgthreads_) {// join 執行,直到執行完。
    th.join();
  }

  bgthreads_.clear();

  exit_all_threads_ = false;
  wait_for_jobs_to_complete_ = false;
}      

2.5 線程池優先級排程

之前說過Rocksdb線程池支援 使用者針對不同LOW/HIGH 線程池的I/O或者CPU的優先級設定。

比如 設定LOW線程池具有更低的I/O優先級和CPU優先級

target_->LowerThreadPoolIOPriority(Env::Priority::LOW);
target_->LowerThreadPoolCPUPriority(Env::Priority::LOW);      

具體底層的設定方式是針對之前提到的線程資料結構中的兩個參數​

​Impl::low_io_priority_​

​​和​

​Impl::low_c pu_priority_​

​​進行置位​

​true​

​​。在​

​ThreadPoolImpl::Impl::BGThread​

​​排程函數執行之前,會通過系統調用​

​setpriority​

​​和​

​syscall(SYS_ioprio_set,,,)​

​設定目前線程的I/O和CPU優先級。

void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
  bool low_io_priority = false;
  bool low_cpu_priority = false;

  while (true) {
    // Wait until there is an item that is ready to run
    std::unique_lock<std::mutex> lock(mu_);
    ...

    bool decrease_io_priority = (low_io_priority != low_io_priority_);
    bool decrease_cpu_priority = (low_cpu_priority != low_cpu_priority_);
    lock.unlock();

#ifdef OS_LINUX
    // Linux 系統支援 設定CPU優先級
    if (decrease_cpu_priority) {
      setpriority(
          PRIO_PROCESS,
          // Current thread.
          0,
          // Lowest priority possible.
          19);
      low_cpu_priority = true;
    }

    
    if (decrease_io_priority) {
#define IOPRIO_CLASS_SHIFT (13)
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
      // Put schedule into IOPRIO_CLASS_IDLE class (lowest)
      // These system calls only have an effect when used in conjunction
      // with an I/O scheduler that supports I/O priorities. As at
      // kernel 2.6.17 the only such scheduler is the Completely
      // Fair Queuing (CFQ) I/O scheduler.
      // To change scheduler:
      //  echo cfq > /sys/block/<device_name>/queue/schedule
      // Tunables to consider:
      //  /sys/block/<device_name>/queue/slice_idle
      //  /sys/block/<device_name>/queue/slice_sync
      // 設定I/O優先級
      syscall(SYS_ioprio_set, 1,  // IOPRIO_WHO_PROCESS
              0,                  // current thread
              IOPRIO_PRIO_VALUE(3, 0));
      low_io_priority = true;
    }
#else
    // 非Linux系統的話就不做任何處理了,僅僅保證變量被使用而已,防止編譯warning 
    (void)decrease_io_priority;  // avoid 'unused variable' error
    (void)decrease_cpu_priority;
#endif
    func();
  }
}      

2.6 動态調整線程池 線程數目上限

支援動态調整線程池可排程的線程數目上限,這個能夠限制線程池資源的占用,主要用作Rocksdb 中調整Flush和Compaction的各自所處的HIGH和LOW線程池中的線程數目上限。能夠根據db的工作負載,動态增加或者減少線程池中可排程的線程數目。

void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {
  impl_->SetBackgroundThreadsInternal(num, false);
}

void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num,
  bool allow_reduce) {
  std::unique_lock<std::mutex> lock(mu_);
  // 如果線程池已經要被銷毀了,就不用增加線程池的排程線程數目上限了
  if (exit_all_threads_) {
    lock.unlock();
    return;
  }
  
  // 增加線程數目或者減少線程數目
  // 喚醒休眠的線程并排程背景線程繼續執行。
  if (num > total_threads_limit_ ||
      (num < total_threads_limit_ && allow_reduce)) {
    total_threads_limit_ = std::max(0, num);
    WakeUpAllThreads();
    StartBGThreads();
  }
}      

3. 總結

繼續閱讀