前言:
目前網上的 c++線程池
資源多是使用老版本或者使用系統接口實作,使用c++ 11新特性的不多,最近研究了一下,實作一個簡單版本,可實作任意任意參數函數的調用以及獲得傳回值。
0 前置知識
首先介紹一下用到的c++新特性
- 可變參數模闆:利用這一特性實作任意參數的傳遞
- bind函數,lambda表達式: 用于将帶參數的函數封裝為不帶形參和無傳回值的函數,統一接口
- forward: 完美轉發,防止在函數封裝綁定時改變形參的原始屬性(引用,常量等屬性)
- shared_ptr, unique_ptr:智能指針,程式結束自動析構,不用手動管理資源,省心省力
- thread:c++11 引入的多線程标準庫,完美跨平台
- future:期物,用于子線程結束後擷取結果
- package_task: 異步任務包裝模闆,可以包裝函數用于其它線程.有點類似與function
- function: 函數包裝模闆庫,可以了解為将不同類型但形參和傳回值相同的函數統一的接口
- queue,vecort: 向量,隊列
- mutex: c++ 11引入的互斥鎖對象
- condition_variable: c++ 11引入的條件變量,用于控制線程阻塞
- atmoic:原子變量,++,--,+=,-=這些操作時原子類型的,防止讀取寫于入失敗
1 理論知識
問題0:線程運作完函數後自動就被系統回收了,怎麼才能實作複用呢
答:剛開始我也是比較疑惑,以為有個什麼狀态方法可以調用,線上程結束被銷毀前阻塞住,進而接取下一個任務,實作複用,其實并非如此,線程池實作的原理是,讓線程執行一個死循環任務,當任務隊列為空時,就讓他阻塞防止資源浪費,當有任務時,解除阻塞,讓線程向下執行,當執行完目前函數後,又會再次運作到死循環的的上方,繼續向下執行,進而周而複始的不斷接任務--完成任務--接任務的循環,這裡可以設定一個變量來控制,當想銷毀線程池的時候,讓死循環不再成立,當該線程執行完目前函數後,退出循環,進而銷毀線程,思路很精妙
問題1:傳入的函數多種多樣,怎麼能實作一個統一調用的模式呢
答:用過c++多線程的就應該知道,我們在建立線程時,需要給thread傳遞函數位址和參數,但是我們的任務參數是多種多樣的,數量不一,這時候,我們就需要使用可變參數模闆将函數經過兩次封裝,封裝為統一格式,第一次封裝,封裝為不含有形參的函數,即參數綁定,但此時是有傳回值的,第二次封裝,将函數的傳回值也去除,這樣我們就能使用void()這種統一的形式去調用了。第一次封裝我們使用bind()函數将多個參數的函數封裝為沒有形參的package_task對象,為什麼呢,因為package_task對象可以通過get_future得到future對象,然後future對象可以通過get方法擷取傳回值,這樣我們第二步,就能直接把傳回值也去掉了。
說了這麼多,有點繞,對于沒怎麼使用過新特性的同學來說,可能雲霧缭繞,其實真正想明白這兩個問題,線程池的理論問題就解決了
2代碼實作
總共包含5個檔案,兩個頭檔案,3個源檔案,
2.1 任務隊列頭檔案和實作
這兩個檔案是實作任務隊列的,其實很簡單,兩個方法,一個放入任務,一個取出任務,放入任務就放我們封裝後的
/** Created by Jiale on 2022/3/14 10:19.
* Decryption: 任務隊列頭檔案
**/
#ifndef THREADPOOL_TASKQUEUE_H
#define THREADPOOL_TASKQUEUE_H
#include <queue>
#include <functional>
#include <mutex>
#include <future>
#include <iostream>
class TaskQueue {
public:
using Task = std::function<void()>; // 任務類
template<typename F, typename ...Args>
auto addTask(F &f, Args &&...args) -> std::future<decltype(f(args...))>; // 添加任務
Task takeTask(); // 取任務
bool empty() {return taskQueue.empty();}
private:
std::mutex taskQueueMutex; // 任務隊列互斥鎖
std::queue<Task> taskQueue; // 任務隊列
};
template <typename F, typename ...Args> // 可變參數模闆,模闆必須在頭檔案定義
auto TaskQueue::addTask(F &f, Args &&...args)-> std::future<decltype(f(args...))> {
using RetType = decltype(f(args...)); // 擷取函數傳回值類型
// 将函數封裝為無形參的類型 std::bind(f, std::forward<Args>(args)...):将參數與函數名綁定
// packaged_task<RetType()>(std::bind(f, std::forward<Args>(args)...)); 将綁定參數後的函數封裝為隻有傳回值沒有形參的任務對象,這樣就能使用get_future得到future對象,然後future對象可以通過get方法擷取傳回值了
// std::make_shared<std::packaged_task<RetType()>>(std::bind(f, std::forward<Args>(args)...)); 生成智能指針,離開作用域自動析構
auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(f, std::forward<Args>(args)...));
std::lock_guard<std::mutex> lockGuard(taskQueueMutex); // 插入時上鎖,防止多個線程同時插入
// 将函數封裝為無傳回無形參類型,通過lamdba表達式,調用封裝後的函數,注意,此時傳回一個無形參無傳回值的函數對象
taskQueue.emplace([task]{(*task)();});
return task->get_future();
}
#endif //THREADPOOL_TASKQUEUE_H
/** Created by Jiale on 2022/3/14 10:19.
* Decryption: 任務隊列源檔案
**/
#include "include/TaskQueue.h"
/**
* 從任務隊列中取任務
* @return 取出的任務
*/
TaskQueue::Task TaskQueue::takeTask() {
Task task;
std::lock_guard<std::mutex> lockGuard(taskQueueMutex); // 上鎖
if (!taskQueue.empty()) {
task = std::move(taskQueue.front()); // 取出任務
taskQueue.pop(); // 将任務從隊列中删除
return task;
}
return nullptr;
}
可以看出,代碼不多,就是一個簡單的放入任務,取出任務,但是如果沒接觸過這種寫法的時候還是比較難想的,我把那句難了解的代碼拆成三部分
2.2 線程池代碼實作
/** Created by Jiale on 2022/3/14 10:42.
* Decryption: 線程池頭檔案
**/
#ifndef THREADPOOL_THREADPOOL_H
#define THREADPOOL_THREADPOOL_H
#include <atomic>
#include <thread>
#include <condition_variable>
#include "TaskQueue.h"
class ThreadPool {
std::atomic<int> threadNum{}; // 最小線程數
std::atomic<int> busyThreadNum; // 忙線程數
std::condition_variable notEmptyCondVar; // 判斷任務隊列是否非空
std::mutex threadPoolMutex; // 線程池互斥鎖
bool shutdown; // 線程池是否啟動
std::unique_ptr<TaskQueue> taskQueue; // 任務隊列
std::vector<std::shared_ptr<std::thread>> threadVec; // 線程池
public:
explicit ThreadPool(int threadNum = 5); // 建立線程池
~ThreadPool(); // 銷毀線程池
template <typename F, typename ...Args>
auto commit(F &f, Args &&...args) -> decltype(taskQueue->addTask(f, std::forward<Args>(args)...)); // 送出一個任務
void worker();
};
template <typename F, typename ...Args> // 可變參數模闆
auto ThreadPool::commit(F &f, Args &&...args) -> decltype(taskQueue->addTask(f, std::forward<Args>(args)...)){
// 這個目的就是把接收的參數直接轉發給我們上面寫的addTask方法,這樣,就可以對使用者隐藏TaskQueue的細節,隻向使用者暴露ThreadPool就行
auto ret = taskQueue->addTask(f, std::forward<Args>(args)...);
notEmptyCondVar.notify_one();
return ret;
}
#endif //THREADPOOL_THREADPOOL_H
/** Created by Jiale on 2022/3/14 10:42.
* Decryption:線程池源檔案
**/
#include "include/ThreadPool.h"
ThreadPool::ThreadPool(int threadNum) : taskQueue(std::make_unique<TaskQueue>()), shutdown(false), busyThreadNum(0) {
this->threadNum.store(threadNum);
for (int i = 0; i < this->threadNum; ++i) {
threadVec.push_back(std::make_shared<std::thread>(&ThreadPool::worker, this)); // 建立線程
threadVec.back()->detach(); // 建立線程後detach,與主線程脫離
}
}
ThreadPool::~ThreadPool() {
shutdown = true; // 等待線程執行完,就不在去隊列取任務
}
void ThreadPool::worker() {
while (!shutdown) {
std::unique_lock<std::mutex> uniqueLock(threadPoolMutex);
notEmptyCondVar.wait(uniqueLock, [this] { return !taskQueue->empty() || shutdown; }); // 任務隊列為空,阻塞在此,當任務隊列不是空或者線程池關閉時,向下執行
auto currTask = std::move(taskQueue->takeTask()); // 取出任務
uniqueLock.unlock();
++busyThreadNum;
currTask(); // 執行任務
--busyThreadNum;
}
}
2.3 測試
線程池設計好了,我們進行測試,如果我們開5個子線程,處理20個任務,那麼,應該有5個線程ID,且是5個線程并發執行的,我們在測試函數裡睡眠2秒,那麼,總的時間應該是8秒執行完
#include <iostream>
#include <thread>
#include <future>
#include "ThreadPool.h"
using namespace std;
mutex mut;
int func(int x) {
auto now = time(nullptr);
auto dateTime = localtime(&now);
mut.lock(); // 為了防止列印錯亂,我們在這裡加鎖
cout << "任務編号:" << x <<" 執行線程ID: " << this_thread::get_id() << " 目前時間: " << dateTime->tm_min << ":" << dateTime->tm_sec << endl;
mut.unlock();
this_thread::sleep_for(2s);
return x;
}
int main() {
ThreadPool threadPool;
for (int i = 0; i < 20; ++i) auto ret = threadPool.commit(func, i);
this_thread::sleep_for(20s); // 主線程等待,因為現在子線程是脫離狀态,如果主線程關閉,則看不到列印
}
2.4 測試結果
可以看到我們的線程是并發執行的,總共用時從44分20秒,到44分26秒,總共6秒,加上我們最後一次列印沒有停留2秒,總共是8秒,每次列印的線程号也相同,可以看出,我們實作了線程的複用
總結
這隻是多線程的一個簡單實作,很多東西沒有考慮到,比如任務逾時,任務優先級等,當然,我們會了簡單的之後就能慢慢摸索更複雜的功能。感謝閱讀