前言
c++11雖然加入了線程庫thread,然而 c++ 對于多線程的支援還是比較低級,稍微進階一點的用法都需要自己去實作。比如備受期待的網絡庫至今标準庫裡還沒有支援,常用acl或asio替代。鴻蒙OpenHarmony源碼中的網絡棧子產品部分,也是十分漂亮的實作,值得學習研究。
c++的ThreadPool實作,網上有很多個版本,文章的末尾就有兩種不同的實作。然而經過對比發現,還是OpenHarmony源碼的實作最優雅。代碼簡練,且直覺易懂。寫的真漂亮!隻是使用起來稍麻煩些,比如不支援lambda的寫法。後續可基于此改造,使其支援lambda函數的調用。
關于線程池
簡單來說就是有一堆已經建立好的線程(最大數目一定),初始時他們都處于空閑狀态。當有新的任務進來,從線程池中取出一個空閑的線程處理任務然後當任務處理完成之後,該線程被重新放回到線程池中,供其他的任務使用。當線程池中的線程都在處理任務時,就沒有空閑線程供使用,此時,若有新的任務産生,隻能等待線程池中有線程結束任務空閑才能執行。
線程池優點
線程本來就是可重用的資源,不需要每次使用時都進行初始化。是以可以采用有限的線程個數處理無限的任務。既可以提高速度和效率,又降低線程頻繁建立的開銷。比如要異步幹的活,就沒必要等待。丢到線程池裡處理,結果在回調中處理。頻繁執行的異步任務,若每次都建立線程勢必造成不小的開銷。
源碼位置
OpenHarmony,智能終端裝置作業系統的架構和平台
該網絡子產品的github位址:communication_netstack: 網絡協定棧
harmonyos\communication_netstack-master\utils\common_utils\include\thread_pool.h
網絡協定棧子產品作為電話子系統可裁剪部件,主要分為HTTP和socket子產品。
網絡協定棧子產品的源碼結構:
/foundation/communication/netstack
├─figures # 架構圖
├─frameworks # API實作
│ └─js # JS API實作
│ ├─builtin # 小型系統JS API實作
│ └─napi # 标準系統JS API實作
│ ├─http # http API
│ ├─socket # socket API
│ └─websocket # websocket API
├─interfaces # JS 接口定義
├─test # 測試
└─utils # 工具
圖 socket接口架構圖

ThreadPool源碼
/*
* Copyright (c) 2022 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef NETSTACK_THREAD_POOL
#define NETSTACK_THREAD_POOL
#include <atomic>
#include <condition_variable>
#include <queue>
#include <thread>
#include <vector>
namespace OHOS::NetStack {
template <typename Task, const size_t DEFAULT_THREAD_NUM, const size_t MAX_THREAD_NUM> class ThreadPool {
public:
/**
* disallow default constructor
*/
ThreadPool() = delete;
/**
* disallow copy and move
*/
ThreadPool(const ThreadPool &) = delete;
/**
* disallow copy and move
*/
ThreadPool &operator=(const ThreadPool &) = delete;
/**
* disallow copy and move
*/
ThreadPool(ThreadPool &&) = delete;
/**
* disallow copy and move
*/
ThreadPool &operator=(ThreadPool &&) = delete;
/**
* make DEFAULT_THREAD_NUM threads
* @param timeout if timeout and runningThreadNum_ < DEFAULT_THREAD_NUM, the running thread should be terminated
*/
explicit ThreadPool(uint32_t timeout) : timeout_(timeout), idleThreadNum_(0), needRun_(true)
{
for (int i = 0; i < DEFAULT_THREAD_NUM; ++i) {
std::thread([this] { RunTask(); }).detach();
}
}
/**
* if ~ThreadPool, terminate all thread
*/
~ThreadPool()
{
// set needRun_ = false, and notify all the thread to wake and terminate
needRun_ = false;
while (runningNum_ > 0) {
needRunCondition_.notify_all();
}
}
/**
* push it to taskQueue_ and notify a thread to run it
* @param task new task to Execute
*/
void Push(const Task &task)
{
PushTask(task);
if (runningNum_ < MAX_THREAD_NUM && idleThreadNum_ == 0) {
std::thread([this] { RunTask(); }).detach();
}
needRunCondition_.notify_all();
}
private:
bool IsQueueEmpty()
{
std::lock_guard<std::mutex> guard(mutex_);
return taskQueue_.empty();
}
bool GetTask(Task &task)
{
std::lock_guard<std::mutex> guard(mutex_);
// if taskQueue_ is empty, means timeout
if (taskQueue_.empty()) {
return false;
}
// if run to this line, means that taskQueue_ is not empty
task = taskQueue_.top();
taskQueue_.pop();
return true;
}
void PushTask(const Task &task)
{
std::lock_guard<std::mutex> guard(mutex_);
taskQueue_.push(task);
}
class NumWrapper {
public:
NumWrapper() = delete;
explicit NumWrapper(std::atomic<uint32_t> &num) : num_(num)
{
++num_;
}
~NumWrapper()
{
--num_;
}
private:
std::atomic<uint32_t> &num_;
};
void Sleep()
{
std::mutex needRunMutex;
std::unique_lock<std::mutex> lock(needRunMutex);
/**
* if the thread is waiting, it is idle
* if wake up, this thread is not idle:
* 1 this thread should return
* 2 this thread should run task
* 3 this thread should go to next loop
*/
NumWrapper idleWrapper(idleThreadNum_);
(void)idleWrapper;
needRunCondition_.wait_for(lock, std::chrono::seconds(timeout_),
[this] { return !needRun_ || !IsQueueEmpty(); });
}
void RunTask()
{
NumWrapper runningWrapper(runningNum_);
(void)runningWrapper;
while (needRun_) {
Task task;
if (GetTask(task)) {
task.Execute();
continue;
}
Sleep();
if (!needRun_) {
return;
}
if (GetTask(task)) {
task.Execute();
continue;
}
if (runningNum_ > DEFAULT_THREAD_NUM) {
return;
}
}
}
private:
/**
* other thread put a task to the taskQueue_
*/
std::mutex mutex_;
std::priority_queue<Task> taskQueue_;
/**
* 1 terminate the thread if it is idle for timeout_ seconds
* 2 wait for the thread started util timeout_
* 3 wait for the thread notified util timeout_
* 4 wait for the thread terminated util timeout_
*/
uint32_t timeout_;
/**
* if idleThreadNum_ is zero, make a new thread
*/
std::atomic<uint32_t> idleThreadNum_;
/**
* when ThreadPool object is deleted, wait until runningNum_ is zero.
*/
std::atomic<uint32_t> runningNum_;
/**
* when ThreadPool object is deleted, set needRun_ to false, mean that all thread should be terminated
*/
std::atomic_bool needRun_;
std::condition_variable needRunCondition_;
};
} // namespace OHOS::NetStack
#endif /* NETSTACK_THREAD_POOL */
源碼賞析
從這份源碼裡,可以看到queue是如何安全的被使用的。之前部落客有篇文章,記錄了多線程下使用queue造成的崩潰問題。
通過華為鴻蒙源碼的學習研究,可以發現queue的安全使用方式top和pop以及empty的判斷都是使用了 std::lock_guard互斥量原子操作的保護。也證明了部落客上篇文章分析中提到的,類似隊列這種操作,要確定在一個原子操作内完成,不可被打斷。試想一個線程剛好pop,另外一個線程卻剛要執行top會怎樣?那樣邏輯就錯了。
這份源碼的實作,沒有使用一些較難了解的文法,基本上就是使用線程+優先級隊列實作的。提前建立指定數目的線程,每次取一個任務并執行。任務隊列負責存放線程需要處理的任務,工作線程負責從任務隊列中取出和運作任務,可以看成是一個生産者和多個消費者的模型。
ThreadPool使用
以下是該版本thread_pool的簡單使用示例,可以看到使用稍微麻煩了些。必須定義格式如下的task類,必須實作operator<和Execute()方法。
需要注意的是,若有多個同一個實作的task執行個體放入thread_pool,Execute()方法内的邏輯可是在多線程環境下的,需注意多線程下變量通路的保護。如同以下示例,同一個task類的多個執行個體放入了thread_pool,不加std::lock_guard列印出的顯示是亂的。
#include "doctest.h"
DOCTEST_MAKE_STD_HEADERS_CLEAN_FROM_WARNINGS_ON_WALL_BEGIN
#include <stdexcept>
DOCTEST_MAKE_STD_HEADERS_CLEAN_FROM_WARNINGS_ON_WALL_END
//#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
//#define DOCTEST_CONFIG_DISABLE
#include <string>
#include <iostream>
#include "thread_pool.h"
//
// Created by Administrator on 2022/8/10.
//
class Task {
public:
Task() = default;
explicit Task(std::string context){
mContext = context;
}
bool operator<(const Task &e) const{
return priority_ < e.priority_;
}
void Execute(){
std::lock_guard<std::mutex> guard(mutex_);
std::cout << "task is execute,name is:"<<mContext<<std::endl;
}
public:
uint32_t priority_;
private:
std::string mContext;
static std::mutex mutex_;
};
#define DEFAULT_THREAD_NUM 3
#define MAX_THREAD_NUM 6
#define TIME_OUT 500
std::mutex Task::mutex_;
static int myTest(){
static OHOS_NetStack::ThreadPool<Task, DEFAULT_THREAD_NUM, MAX_THREAD_NUM> threadPool_(TIME_OUT);
Task task1("name_1");
Task task2("name_2");
Task task3("name_3");
Task task4("name_4");
threadPool_.Push(task1);
threadPool_.Push(task2);
threadPool_.Push(task3);
threadPool_.Push(task4);
//system("pause");
return 0;
}
TEST_CASE("threadPool simple use example, test by doctest unit tool") {
myTest();
}