對象池模式在軟體中廣泛使用,例如,線程池,連接配接池,記憶體池等。Boost庫中的pool實作了記憶體池,thread_group實作了簡單的線程池。
以下實作的線程池與boost無關,隻是提供一種思路。

Task類:
對應要執行的任務Task::run, 任務入參Task::mParam.
class Task {
public:
Task(int val=0):
mParam(val)
{
}
virtual ~Task() {
}
void setParam(int val) {
mParam = val;
}
void run() {
thread::id threadId = std::this_thread::get_id();
cout << "param = " << mParam << " ThreadId = " << threadId << endl;
string data = "param = " + std::to_string(mParam)+ " ThreadId = "+ getThreadIdOfString(threadId) +"\n";
myLogger.logData(data);
}
private:
int mParam;
};
threadPool類:
成員變量vector<shared_ptr<thread>> threadCache儲存構造函數中開辟的線程;構造函數中開辟線程,各個線程的入口函數都是threadFunc,如果任務隊列為空則線程bolck,等待任務。
std::queue<Task> mTasksQueue儲存外界傳入的要執行的task.
std::condition_variable mCondVar用于當任務隊列mTasksQueue不為空時,喚醒一個線程,從隊列頭取出一個task執行。
stopAllThreads()用于停止所有線程;
joinThreads()用于等待所有線程結束,在析構函數中被調用。
Note:
threadFunc函數中的線程喚醒條件while (mTasksQueue.empty() && !mReceStopOrder), 不加mReceStopOrder的判斷,易導緻stopAllThreads()可能不起作用(調用stopAllThreads函數時,taskQueue已經為空)。
//c++實作線程池:
class threadPool {
public:
threadPool(int maxThreadsNum=1):
mMaxThreadsNum(maxThreadsNum){
mReceStopOrder = false;
//creat threads:
for (int i = 0; i < mMaxThreadsNum; ++i) {
shared_ptr<thread> th = std::make_shared<thread>(&threadPool::threadFunc, this);//Ok
//shared_ptr<thread> th = std::make_shared<thread>(std::bind(&threadPool::threadFunc, this)); //Ok
//shared_ptr<thread> th = std::make_shared<thread>(std::bind(&threadPool::threadFunc, *this));//build error
//newThread.detach();
threadCache.push_back(th);
}
}
void joinThreads() {
for (auto& it : threadCache) {
it->join();
}
}
virtual ~threadPool() {
joinThreads();//等待所有線程結束
}
void pushTask(Task task) {
std::lock_guard<std::mutex> lk(mMux);
mTasksQueue.push(task);
mCondVar.notify_one();
}
//構造函數中開辟的多個線程的入口
void threadFunc() {
while (true)
{
//終止函數,停止線程
if (mReceStopOrder) {
myLogger.logData("break---");
break;
}
std::unique_lock<std::mutex> lk(mMux);
while (mTasksQueue.empty() && !mReceStopOrder)
{
mCondVar.wait(lk);
}
if (mReceStopOrder) {
myLogger.logData("break");
break;
}
//get a task and execute it:
Task task = mTasksQueue.front();
mTasksQueue.pop();
lk.unlock();//顯式地解鎖.
task.run();
}
thread::id threadId = std::this_thread::get_id();
string data = "ThreadId "+ getThreadIdOfString(threadId) + " exit \n";
myLogger.logData(data);
}
void stopAllThreads() {
myLogger.logData("stopAllThreads was called!");
mReceStopOrder = true;
mCondVar.notify_all();//可能有的線程處于wait狀态
}
private:
unsigned int mMaxThreadsNum;
std::queue<Task> mTasksQueue; //緩存各個任務
vector<shared_ptr<thread>> threadCache; //儲存開辟的各個線程
std::mutex mMux;
std::condition_variable mCondVar;//當任務隊列mTasksQueue不為空時,喚醒一個線程,從隊列頭取出一個task執行。
std::atomic<bool> mReceStopOrder;//用于控制線程的停止
};
logger類 & std::thread::id轉換為string的函數:
log資料到檔案,如果用螢幕列印的話,由于多線程,容易出現穿插列印。
std::string getThreadIdOfString(const std::thread::id& id)
{
std::stringstream ss;//#include<sstream>
ss << id;
return ss.str();
}
class logger {
public:
logger(string path = "") :filePath(path) {
ofs.open(filePath, ios::out);
}
virtual ~logger() {
ofs.close();
}
void logData(string a) {
std::lock_guard<mutex> lkguard(mMut);
ofs << a << endl;
}
private:
mutex mMut;
string filePath = "";
ofstream ofs;
};
logger myLogger("C:\\Users\\xxx\\Desktop\\goodVideo\\1.txt");//全局變量
Test:
#include <iostream>
#include <stack>
#include <vector>
#include <list>
#include <unordered_map>
#include <queue>
#include <functional>
#include <mutex>
#include <atomic>
#include <thread>
#include <string>
#include <fstream>
#include <sstream>
#include <memory>
int main() {
threadPool myThreadPools(5);
for (int i = 0; i < 5; i++) {
Task myTempTask(i);
myThreadPools.pushTask(myTempTask);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
std::this_thread::sleep_for(std::chrono::seconds(2));
myThreadPools.stopAllThreads();
}
Related:
https://blog.csdn.net/FlushHip/article/details/81902188 -- thread_id
https://blog.csdn.net/FlushHip/article/details/81902188 -- good idea and implemented with linux api