天天看點

c++的ThreadPool,OpenHarmony源碼實作版賞析和使用

前言

c++11雖然加入了線程庫thread,然而 c++ 對于多線程的支援還是比較低級,稍微進階一點的用法都需要自己去實作。比如備受期待的網絡庫至今标準庫裡還沒有支援,常用acl或asio替代。鴻蒙OpenHarmony源碼中的網絡棧子產品部分,也是十分漂亮的實作,值得學習研究。

c++的ThreadPool實作,網上有很多個版本,文章的末尾就有兩種不同的實作。然而經過對比發現,還是OpenHarmony源碼的實作最優雅。代碼簡練,且直覺易懂。寫的真漂亮!隻是使用起來稍麻煩些,比如不支援lambda的寫法。後續可基于此改造,使其支援lambda函數的調用。

關于線程池

簡單來說就是有一堆已經建立好的線程(最大數目一定),初始時他們都處于空閑狀态。當有新的任務進來,從線程池中取出一個空閑的線程處理任務然後當任務處理完成之後,該線程被重新放回到線程池中,供其他的任務使用。當線程池中的線程都在處理任務時,就沒有空閑線程供使用,此時,若有新的任務産生,隻能等待線程池中有線程結束任務空閑才能執行。

線程池優點

線程本來就是可重用的資源,不需要每次使用時都進行初始化。是以可以采用有限的線程個數處理無限的任務。既可以提高速度和效率,又降低線程頻繁建立的開銷。比如要異步幹的活,就沒必要等待。丢到線程池裡處理,結果在回調中處理。頻繁執行的異步任務,若每次都建立線程勢必造成不小的開銷。

源碼位置

​​OpenHarmony,智能終端裝置作業系統的架構和平台​​

該網絡子產品的github位址:​​communication_netstack: 網絡協定棧​​

harmonyos\communication_netstack-master\utils\common_utils\include\thread_pool.h      

網絡協定棧子產品作為電話子系統可裁剪部件,主要分為HTTP和socket子產品。

網絡協定棧子產品的源碼結構: 

/foundation/communication/netstack
├─figures                            # 架構圖
├─frameworks                         # API實作
│  └─js                              # JS API實作
│      ├─builtin                     # 小型系統JS API實作
│      └─napi                        # 标準系統JS API實作
│          ├─http                    # http API
│          ├─socket                  # socket API
│          └─websocket               # websocket API
├─interfaces                         # JS 接口定義
├─test                               # 測試
└─utils                              # 工具      

圖  socket接口架構圖 

c++的ThreadPool,OpenHarmony源碼實作版賞析和使用

ThreadPool源碼

/*
 * Copyright (c) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef NETSTACK_THREAD_POOL
#define NETSTACK_THREAD_POOL

#include <atomic>
#include <condition_variable>
#include <queue>
#include <thread>
#include <vector>

namespace OHOS::NetStack {
template <typename Task, const size_t DEFAULT_THREAD_NUM, const size_t MAX_THREAD_NUM> class ThreadPool {
public:
    /**
     * disallow default constructor
     */
    ThreadPool() = delete;

    /**
     * disallow copy and move
     */
    ThreadPool(const ThreadPool &) = delete;

    /**
     * disallow copy and move
     */
    ThreadPool &operator=(const ThreadPool &) = delete;

    /**
     * disallow copy and move
     */
    ThreadPool(ThreadPool &&) = delete;

    /**
     * disallow copy and move
     */
    ThreadPool &operator=(ThreadPool &&) = delete;

    /**
     * make DEFAULT_THREAD_NUM threads
     * @param timeout if timeout and runningThreadNum_ < DEFAULT_THREAD_NUM, the running thread should be terminated
     */
    explicit ThreadPool(uint32_t timeout) : timeout_(timeout), idleThreadNum_(0), needRun_(true)
    {
        for (int i = 0; i < DEFAULT_THREAD_NUM; ++i) {
            std::thread([this] { RunTask(); }).detach();
        }
    }

    /**
     * if ~ThreadPool, terminate all thread
     */
    ~ThreadPool()
    {
        // set needRun_ = false, and notify all the thread to wake and terminate
        needRun_ = false;
        while (runningNum_ > 0) {
            needRunCondition_.notify_all();
        }
    }

    /**
     * push it to taskQueue_ and notify a thread to run it
     * @param task new task to Execute
     */
    void Push(const Task &task)
    {
        PushTask(task);

        if (runningNum_ < MAX_THREAD_NUM && idleThreadNum_ == 0) {
            std::thread([this] { RunTask(); }).detach();
        }

        needRunCondition_.notify_all();
    }

private:
    bool IsQueueEmpty()
    {
        std::lock_guard<std::mutex> guard(mutex_);
        return taskQueue_.empty();
    }

    bool GetTask(Task &task)
    {
        std::lock_guard<std::mutex> guard(mutex_);

        // if taskQueue_ is empty, means timeout
        if (taskQueue_.empty()) {
            return false;
        }

        // if run to this line, means that taskQueue_ is not empty
        task = taskQueue_.top();
        taskQueue_.pop();
        return true;
    }

    void PushTask(const Task &task)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        taskQueue_.push(task);
    }

    class NumWrapper {
    public:
        NumWrapper() = delete;

        explicit NumWrapper(std::atomic<uint32_t> &num) : num_(num)
        {
            ++num_;
        }

        ~NumWrapper()
        {
            --num_;
        }

    private:
        std::atomic<uint32_t> &num_;
    };

    void Sleep()
    {
        std::mutex needRunMutex;
        std::unique_lock<std::mutex> lock(needRunMutex);

        /**
         * if the thread is waiting, it is idle
         * if wake up, this thread is not idle:
         *     1 this thread should return
         *     2 this thread should run task
         *     3 this thread should go to next loop
         */
        NumWrapper idleWrapper(idleThreadNum_);
        (void)idleWrapper;

        needRunCondition_.wait_for(lock, std::chrono::seconds(timeout_),
                                   [this] { return !needRun_ || !IsQueueEmpty(); });
    }

    void RunTask()
    {
        NumWrapper runningWrapper(runningNum_);
        (void)runningWrapper;

        while (needRun_) {
            Task task;
            if (GetTask(task)) {
                task.Execute();
                continue;
            }

            Sleep();

            if (!needRun_) {
                return;
            }

            if (GetTask(task)) {
                task.Execute();
                continue;
            }

            if (runningNum_ > DEFAULT_THREAD_NUM) {
                return;
            }
        }
    }

private:
    /**
     * other thread put a task to the taskQueue_
     */
    std::mutex mutex_;
    std::priority_queue<Task> taskQueue_;
    /**
     * 1 terminate the thread if it is idle for timeout_ seconds
     * 2 wait for the thread started util timeout_
     * 3 wait for the thread notified util timeout_
     * 4 wait for the thread terminated util timeout_
     */
    uint32_t timeout_;
    /**
     * if idleThreadNum_ is zero, make a new thread
     */
    std::atomic<uint32_t> idleThreadNum_;
    /**
     * when ThreadPool object is deleted, wait until runningNum_ is zero.
     */
    std::atomic<uint32_t> runningNum_;
    /**
     * when ThreadPool object is deleted, set needRun_ to false, mean that all thread should be terminated
     */
    std::atomic_bool needRun_;
    std::condition_variable needRunCondition_;
};
} // namespace OHOS::NetStack
#endif /* NETSTACK_THREAD_POOL */      

源碼賞析

從這份源碼裡,可以看到queue是如何安全的被使用的。之前部落客有篇文章,記錄了多線程下使用queue造成的崩潰問題。

通過華為鴻蒙源碼的學習研究,可以發現queue的安全使用方式top和pop以及empty的判斷都是使用了 std::lock_guard互斥量原子操作的保護。也證明了部落客上篇文章分析中提到的,類似隊列這種操作,要確定在一個原子操作内完成,不可被打斷。試想一個線程剛好pop,另外一個線程卻剛要執行top會怎樣?那樣邏輯就錯了。 

這份源碼的實作,沒有使用一些較難了解的文法,基本上就是使用線程+優先級隊列實作的。提前建立指定數目的線程,每次取一個任務并執行。任務隊列負責存放線程需要處理的任務,工作線程負責從任務隊列中取出和運作任務,可以看成是一個生産者和多個消費者的模型。

ThreadPool使用

以下是該版本thread_pool的簡單使用示例,可以看到使用稍微麻煩了些。必須定義格式如下的task類,必須實作operator<和Execute()方法。

需要注意的是,若有多個同一個實作的task執行個體放入thread_pool,Execute()方法内的邏輯可是在多線程環境下的,需注意多線程下變量通路的保護。如同以下示例,同一個task類的多個執行個體放入了thread_pool,不加std::lock_guard列印出的顯示是亂的。

#include "doctest.h"
DOCTEST_MAKE_STD_HEADERS_CLEAN_FROM_WARNINGS_ON_WALL_BEGIN
#include <stdexcept>
DOCTEST_MAKE_STD_HEADERS_CLEAN_FROM_WARNINGS_ON_WALL_END

//#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
//#define DOCTEST_CONFIG_DISABLE
#include <string>
#include <iostream>
#include "thread_pool.h"
//
// Created by Administrator on 2022/8/10.
//
class Task {
public:
    Task() = default;

    explicit Task(std::string context){
        mContext = context;
    }

    bool operator<(const Task &e) const{
        return priority_ < e.priority_;
    }

    void Execute(){
        std::lock_guard<std::mutex> guard(mutex_);
        std::cout <<  "task is execute,name is:"<<mContext<<std::endl;
    }

public:
    uint32_t priority_;
private:
    std::string mContext;
    static std::mutex mutex_;
};


#define DEFAULT_THREAD_NUM 3
#define MAX_THREAD_NUM 6
#define TIME_OUT 500

std::mutex Task::mutex_;

static int myTest(){
    static OHOS_NetStack::ThreadPool<Task, DEFAULT_THREAD_NUM, MAX_THREAD_NUM> threadPool_(TIME_OUT);

    Task task1("name_1");
    Task task2("name_2");
    Task task3("name_3");
    Task task4("name_4");
    threadPool_.Push(task1);
    threadPool_.Push(task2);
    threadPool_.Push(task3);
    threadPool_.Push(task4);

    //system("pause");

    return 0;
}

TEST_CASE("threadPool simple use example, test by doctest unit tool") {
    myTest();
}      

繼續閱讀