天天看點

c++11實作一個線程池子Task類:threadPool類:logger類 & std::thread::id轉換為string的函數:Test:

對象池模式在軟體中廣泛使用,例如,線程池,連接配接池,記憶體池等。Boost庫中的pool實作了記憶體池,thread_group實作了簡單的線程池。

以下實作的線程池與boost無關,隻是提供一種思路。

c++11實作一個線程池子Task類:threadPool類:logger類 & std::thread::id轉換為string的函數:Test:

Task類:

對應要執行的任務Task::run, 任務入參Task::mParam.

class Task {
public:
	Task(int val=0):
		mParam(val)
	{

	}
	virtual ~Task() {

	}
	void setParam(int val) {
		mParam = val;
	}
	void run() {
		thread::id threadId = std::this_thread::get_id();
		cout << "param = " << mParam <<  " ThreadId = " << threadId << endl;
		string data = "param = " + std::to_string(mParam)+ " ThreadId = "+ getThreadIdOfString(threadId) +"\n";
		myLogger.logData(data);
	}
private:
	int mParam;
};
           

threadPool類:

成員變量vector<shared_ptr<thread>> threadCache儲存構造函數中開辟的線程;構造函數中開辟線程,各個線程的入口函數都是threadFunc,如果任務隊列為空則線程bolck,等待任務。

std::queue<Task> mTasksQueue儲存外界傳入的要執行的task.

std::condition_variable mCondVar用于當任務隊列mTasksQueue不為空時,喚醒一個線程,從隊列頭取出一個task執行。

stopAllThreads()用于停止所有線程;

joinThreads()用于等待所有線程結束,在析構函數中被調用。

Note:

threadFunc函數中的線程喚醒條件while (mTasksQueue.empty() && !mReceStopOrder), 不加mReceStopOrder的判斷,易導緻stopAllThreads()可能不起作用(調用stopAllThreads函數時,taskQueue已經為空)。

//c++實作線程池:
class threadPool {
public:
	threadPool(int maxThreadsNum=1):
		mMaxThreadsNum(maxThreadsNum){
		mReceStopOrder = false;
		//creat threads:
		for (int i = 0; i < mMaxThreadsNum; ++i) {
			shared_ptr<thread> th = std::make_shared<thread>(&threadPool::threadFunc, this);//Ok
			//shared_ptr<thread> th = std::make_shared<thread>(std::bind(&threadPool::threadFunc, this)); //Ok
			//shared_ptr<thread> th = std::make_shared<thread>(std::bind(&threadPool::threadFunc, *this));//build error
			
			//newThread.detach();
			threadCache.push_back(th);
		}
	}

	void joinThreads() {
		for (auto& it : threadCache) {
			it->join();
		}
	}
	virtual ~threadPool() {
		joinThreads();//等待所有線程結束
	}
	void pushTask(Task task) {

		std::lock_guard<std::mutex> lk(mMux);
		mTasksQueue.push(task);
		mCondVar.notify_one();
	}
	//構造函數中開辟的多個線程的入口
	 void threadFunc() {
		while (true)
		{
			//終止函數,停止線程
			if (mReceStopOrder) {
				myLogger.logData("break---");
				break;
			}
			std::unique_lock<std::mutex> lk(mMux);
			while (mTasksQueue.empty() && !mReceStopOrder)
			{
				mCondVar.wait(lk);
			}
			if (mReceStopOrder) {
				myLogger.logData("break");
				break;
			}
			//get a task and execute it:
			Task task = mTasksQueue.front();
			mTasksQueue.pop();
			lk.unlock();//顯式地解鎖.
			task.run();
		}

		thread::id threadId = std::this_thread::get_id();
		string data = "ThreadId "+ getThreadIdOfString(threadId) + " exit \n";
		myLogger.logData(data);
	}
	void stopAllThreads() {
		myLogger.logData("stopAllThreads was called!");
		mReceStopOrder = true;
		mCondVar.notify_all();//可能有的線程處于wait狀态
	}

private:
	unsigned int mMaxThreadsNum;
	std::queue<Task> mTasksQueue; //緩存各個任務
	vector<shared_ptr<thread>> threadCache; //儲存開辟的各個線程
	std::mutex mMux;
	std::condition_variable mCondVar;//當任務隊列mTasksQueue不為空時,喚醒一個線程,從隊列頭取出一個task執行。
	std::atomic<bool> mReceStopOrder;//用于控制線程的停止
};
           

logger類 & std::thread::id轉換為string的函數:

log資料到檔案,如果用螢幕列印的話,由于多線程,容易出現穿插列印。

std::string getThreadIdOfString(const std::thread::id& id)
{
	std::stringstream ss;//#include<sstream>
	ss << id;
	return ss.str();
}


class logger {
public:
	logger(string path = "") :filePath(path) {
		ofs.open(filePath, ios::out);
	}

	virtual ~logger() {
		ofs.close();
	}

	void logData(string a) {
		std::lock_guard<mutex> lkguard(mMut);
		ofs << a << endl;
	}
private:
	mutex mMut;
	string filePath = "";
	ofstream ofs;
};
logger myLogger("C:\\Users\\xxx\\Desktop\\goodVideo\\1.txt");//全局變量
           

Test:

#include <iostream>
#include <stack>
#include <vector>
#include <list>
#include <unordered_map>
#include <queue>
#include <functional>
#include <mutex>
#include <atomic>
#include <thread>
#include <string>  
#include <fstream> 
#include <sstream>
#include <memory>

int main() {

	threadPool myThreadPools(5);
	for (int i = 0; i < 5; i++) {
		Task myTempTask(i);
		myThreadPools.pushTask(myTempTask);
		std::this_thread::sleep_for(std::chrono::milliseconds(1));
	}
	

	std::this_thread::sleep_for(std::chrono::seconds(2));

	myThreadPools.stopAllThreads();

}
           
c++11實作一個線程池子Task類:threadPool類:logger類 &amp; std::thread::id轉換為string的函數:Test:

Related:

https://blog.csdn.net/FlushHip/article/details/81902188  -- thread_id

https://blog.csdn.net/FlushHip/article/details/81902188 -- good idea and implemented with linux api

繼續閱讀