線程池: 簡單地說,線程池 就是預先建立好一批線程,友善、快速地處理收到的業務。比起傳統的到來一個任務,即時建立一個線程來處理,節省了線程的建立和回收的開銷,響應更快,效率更高。
在linux中,使用的是posix線程庫,首先介紹幾個常用的函數:
1 線程的建立和取消函數
pthread_create
建立線程
pthread_join
合并線程
pthread_cancel
取消線程
2 線程同步函數
pthread_mutex_lock
pthread_mutex_unlock
pthread_cond_signal
pthread_cond_wait
關于函數的詳細說明,參考man手冊
線程池的實作:
線程池的實作主要分為三部分,線程的建立、添加任務到線程池中、工作線程從任務隊列中取出任務進行處理。
主要有兩個類來實作,CTask,CThreadPool
/**
執行任務的類,設定任務資料并執行
**/
class CTask
{
protected:
string m_strTaskName; //任務的名稱
void* m_ptrData; //要執行的任務的具體資料
public:
CTask(){}
CTask(string taskName)
{
this->m_strTaskName = taskName;
m_ptrData = NULL;
}
virtual int Run()= 0;
void SetData(void* data); //設定任務資料
};
任務類是個虛類,所有的任務要從CTask類中繼承 ,實作run接口,run接口中需要實作的就是具體解析任務的邏輯。m_ptrData是指向任務資料的指針,可以是簡單資料類型,也可以是自定義的複雜資料類型。
線程池類
/**
線程池
**/
class CThreadPool
{
private:
vector<CTask*> m_vecTaskList; //任務清單
int m_iThreadNum; //線程池中啟動的線程數
static vector<pthread_t> m_vecIdleThread; //目前空閑的線程集合
static vector<pthread_t> m_vecBusyThread; //目前正在執行的線程集合
static pthread_mutex_t m_pthreadMutex; //線程同步鎖
static pthread_cond_t m_pthreadCond; //線程同步的條件變量
protected:
static void* ThreadFunc(void * threadData); //新線程的線程函數
static int MoveToIdle(pthread_t tid); //線程執行結束後,把自己放入到空閑線程中
static int MoveToBusy(pthread_t tid); //移入到忙碌線程中去
int Create(); //建立所有的線程
public:
CThreadPool(int threadNum);
int AddTask(CTask *task); //把任務添加到線程池中
int StopAll();
};
當線程池對象建立後,啟動一批線程,并把所有的線程放入空閑清單中,當有任務到達時,某一個線程取出任務并進行處理。
線程之間的同步用線程鎖和條件變量。
這個類的對外接口有兩個:
AddTask函數把任務添加到線程池的任務清單中,并通知線程進行處理。當任務到到時,把任務放入m_vecTaskList任務清單中,并用pthread_cond_signal喚醒一個線程進行處理。
StopAll函數停止所有的線程
************************************************
代碼:
××××××××××××××××××××CThread.h
#ifndef __CTHREAD
#define __CTHREAD
#include <vector>
#include <string>
#include <pthread.h>
using namespace std;
/**
執行任務的類,設定任務資料并執行
**/
class CTask
{
protected:
string m_strTaskName; //任務的名稱
void* m_ptrData; //要執行的任務的具體資料
public:
CTask(){}
CTask(string taskName)
{
this->m_strTaskName = taskName;
m_ptrData = NULL;
}
virtual int Run()= 0;
void SetData(void* data); //設定任務資料
};
/**
線程池
**/
class CThreadPool
{
private:
vector<CTask*> m_vecTaskList; //任務清單
int m_iThreadNum; //線程池中啟動的線程數
static vector<pthread_t> m_vecIdleThread; //目前空閑的線程集合
static vector<pthread_t> m_vecBusyThread; //目前正在執行的線程集合
static pthread_mutex_t m_pthreadMutex; //線程同步鎖
static pthread_cond_t m_pthreadCond; //線程同步的條件變量
protected:
static void* ThreadFunc(void * threadData); //新線程的線程函數
static int MoveToIdle(pthread_t tid); //線程執行結束後,把自己放入到空閑線程中
static int MoveToBusy(pthread_t tid); //移入到忙碌線程中去
int Create(); //建立所有的線程
public:
CThreadPool(int threadNum);
int AddTask(CTask *task); //把任務添加到線程池中
int StopAll();
};
#endif
類的實作為:
××××××××××××××××××××CThread.cpp
#include "CThread.h"
#include <string>
#include <iostream>
using namespace std;
void CTask::SetData(void * data)
{
m_ptrData = data;
}
vector<pthread_t> CThreadPool::m_vecBusyThread;
vector<pthread_t> CThreadPool::m_vecIdleThread;
pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;
CThreadPool::CThreadPool(int threadNum)
{
this->m_iThreadNum = threadNum;
Create();
}
int CThreadPool::MoveToIdle(pthread_t tid)
{
vector<pthread_t>::iterator busyIter = m_vecBusyThread.begin();
while(busyIter != m_vecBusyThread.end())
{
if(tid == *busyIter)
{
break;
}
busyIter++;
}
m_vecBusyThread.erase(busyIter);
m_vecIdleThread.push_back(tid);
return 0;
}
int CThreadPool::MoveToBusy(pthread_t tid)
{
vector<pthread_t>::iterator idleIter = m_vecIdleThread.begin();
while(idleIter != m_vecIdleThread.end())
{
if(tid == *idleIter)
{
break;
}
idleIter++;
}
m_vecIdleThread.erase(idleIter);
m_vecBusyThread.push_back(tid);
return 0;
}
void* CThreadPool::ThreadFunc(void * threadData)
{
pthread_t tid = pthread_self();
while(1)
{
pthread_mutex_lock(&m_pthreadMutex);
pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex);
cout << "tid:" << tid << " run" << endl;
//get task
vector<CTask*>* taskList = (vector<CTask*>*)threadData;
vector<CTask*>::iterator iter = taskList->begin();
while(iter != taskList->end())
{
MoveToBusy(tid);
break;
}
CTask* task = *iter;
taskList->erase(iter);
pthread_mutex_unlock(&m_pthreadMutex);
cout << "idel thread number:" << CThreadPool::m_vecIdleThread.size() << endl;
cout << "busy thread number:" << CThreadPool::m_vecBusyThread.size() << endl;
//cout << "task to be run:" << taskList->size() << endl;
task->Run();
//cout << "CThread::thread work" << endl;
cout << "tid:" << tid << " idle" << endl;
}
return (void*)0;
}
int CThreadPool::AddTask(CTask *task)
{
this->m_vecTaskList.push_back(task);
pthread_cond_signal(&m_pthreadCond);
return 0;
}
int CThreadPool::Create()
{
for(int i = 0; i < m_iThreadNum;i++)
{
pthread_t tid = 0;
pthread_create(&tid,NULL,ThreadFunc,&m_vecTaskList);
m_vecIdleThread.push_back(tid);
}
return 0;
}
int CThreadPool::StopAll()
{
vector<pthread_t>::iterator iter = m_vecIdleThread.begin();
while(iter != m_vecIdleThread.end())
{
pthread_cancel(*iter);
pthread_join(*iter,NULL);
iter++;
}
iter = m_vecBusyThread.begin();
while(iter != m_vecBusyThread.end())
{
pthread_cancel(*iter);
pthread_join(*iter,NULL);
iter++;
}
return 0;
}
簡單示例:
××××××××test.cpp
#include "CThread.h"
#include <iostream>
using namespace std;
class CWorkTask: public CTask
{
public:
CWorkTask()
{}
int Run()
{
cout << (char*)this->m_ptrData << endl;
sleep(10);
return 0;
}
};
int main()
{
CWorkTask taskObj;
char szTmp[] = "this is the first thread running,haha success";
taskObj.SetData((void*)szTmp);
CThreadPool threadPool(10);
for(int i = 0;i < 11;i++)
{
threadPool.AddTask(&taskObj);
}
while(1)
{
sleep(120);
}
return 0;
}