天天看點

一個簡單的linux線程池

線程池: 簡單地說,線程池 就是預先建立好一批線程,友善、快速地處理收到的業務。比起傳統的到來一個任務,即時建立一個線程來處理,節省了線程的建立和回收的開銷,響應更快,效率更高。

在linux中,使用的是posix線程庫,首先介紹幾個常用的函數:

1 線程的建立和取消函數

pthread_create

建立線程

pthread_join

合并線程

pthread_cancel

取消線程

2 線程同步函數

pthread_mutex_lock

pthread_mutex_unlock

pthread_cond_signal

pthread_cond_wait

關于函數的詳細說明,參考man手冊

線程池的實作:

線程池的實作主要分為三部分,線程的建立、添加任務到線程池中、工作線程從任務隊列中取出任務進行處理。

主要有兩個類來實作,CTask,CThreadPool

/**
執行任務的類,設定任務資料并執行
**/
class CTask  
{  
protected:  
 string m_strTaskName;  //任務的名稱  
 void* m_ptrData;       //要執行的任務的具體資料  
public:  
 CTask(){}  
 CTask(string taskName)  
 {  
  this->m_strTaskName = taskName;  
  m_ptrData = NULL;  
 }  
 virtual int Run()= 0;  
 void SetData(void* data);    //設定任務資料  
};  
           

任務類是個虛類,所有的任務要從CTask類中繼承 ,實作run接口,run接口中需要實作的就是具體解析任務的邏輯。m_ptrData是指向任務資料的指針,可以是簡單資料類型,也可以是自定義的複雜資料類型。

線程池類

/**
線程池
**/
class CThreadPool  
{  
private:  
 vector<CTask*> m_vecTaskList;         //任務清單  
 int m_iThreadNum;                            //線程池中啟動的線程數             
 static vector<pthread_t> m_vecIdleThread;   //目前空閑的線程集合  
 static vector<pthread_t> m_vecBusyThread;   //目前正在執行的線程集合  
 static pthread_mutex_t m_pthreadMutex;    //線程同步鎖  
 static pthread_cond_t m_pthreadCond;    //線程同步的條件變量  
protected:  
 static void* ThreadFunc(void * threadData); //新線程的線程函數  
 static int MoveToIdle(pthread_t tid);   //線程執行結束後,把自己放入到空閑線程中  
 static int MoveToBusy(pthread_t tid);   //移入到忙碌線程中去  
 int Create();          //建立所有的線程  
public:  
 CThreadPool(int threadNum);  
 int AddTask(CTask *task);      //把任務添加到線程池中  
 int StopAll();  
};  
           

當線程池對象建立後,啟動一批線程,并把所有的線程放入空閑清單中,當有任務到達時,某一個線程取出任務并進行處理。

線程之間的同步用線程鎖和條件變量。

這個類的對外接口有兩個:

AddTask函數把任務添加到線程池的任務清單中,并通知線程進行處理。當任務到到時,把任務放入m_vecTaskList任務清單中,并用pthread_cond_signal喚醒一個線程進行處理。

StopAll函數停止所有的線程

************************************************  
  
代碼:  
  
××××××××××××××××××××CThread.h  
  
   
  
#ifndef __CTHREAD  
#define __CTHREAD  
#include <vector>  
#include <string>  
#include <pthread.h>  
  
using namespace std;  
  
/** 
執行任務的類,設定任務資料并執行 
**/  
class CTask  
{  
protected:  
 string m_strTaskName;  //任務的名稱  
 void* m_ptrData;       //要執行的任務的具體資料  
public:  
 CTask(){}  
 CTask(string taskName)  
 {  
  this->m_strTaskName = taskName;  
  m_ptrData = NULL;  
 }  
 virtual int Run()= 0;  
 void SetData(void* data);    //設定任務資料  
};  
  
/** 
線程池 
**/  
class CThreadPool  
{  
private:  
 vector<CTask*> m_vecTaskList;         //任務清單  
 int m_iThreadNum;                            //線程池中啟動的線程數             
 static vector<pthread_t> m_vecIdleThread;   //目前空閑的線程集合  
 static vector<pthread_t> m_vecBusyThread;   //目前正在執行的線程集合  
 static pthread_mutex_t m_pthreadMutex;    //線程同步鎖  
 static pthread_cond_t m_pthreadCond;    //線程同步的條件變量  
protected:  
 static void* ThreadFunc(void * threadData); //新線程的線程函數  
 static int MoveToIdle(pthread_t tid);   //線程執行結束後,把自己放入到空閑線程中  
 static int MoveToBusy(pthread_t tid);   //移入到忙碌線程中去  
 int Create();          //建立所有的線程  
public:  
 CThreadPool(int threadNum);  
 int AddTask(CTask *task);      //把任務添加到線程池中  
 int StopAll();  
};  
  
#endif  
  
   
  
   
  
   
  
類的實作為:  
  
××××××××××××××××××××CThread.cpp  
  
   
  
#include "CThread.h"  
#include <string>  
#include <iostream>  
  
using namespace std;  
  
void CTask::SetData(void * data)  
{  
 m_ptrData = data;  
}  
  
vector<pthread_t> CThreadPool::m_vecBusyThread;  
vector<pthread_t> CThreadPool::m_vecIdleThread;  
pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;  
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;  
  
CThreadPool::CThreadPool(int threadNum)  
{  
 this->m_iThreadNum = threadNum;  
 Create();  
}  
int CThreadPool::MoveToIdle(pthread_t tid)  
{  
 vector<pthread_t>::iterator busyIter = m_vecBusyThread.begin();  
 while(busyIter != m_vecBusyThread.end())  
 {  
  if(tid == *busyIter)  
  {  
   break;  
  }  
  busyIter++;  
 }  
 m_vecBusyThread.erase(busyIter);  
 m_vecIdleThread.push_back(tid);  
 return 0;  
}  
  
int CThreadPool::MoveToBusy(pthread_t tid)  
{  
 vector<pthread_t>::iterator idleIter = m_vecIdleThread.begin();  
 while(idleIter != m_vecIdleThread.end())  
 {  
  if(tid == *idleIter)  
  {  
   break;  
  }  
  idleIter++;  
 }  
 m_vecIdleThread.erase(idleIter);  
 m_vecBusyThread.push_back(tid);  
 return 0;  
}  
void* CThreadPool::ThreadFunc(void * threadData)  
{  
 pthread_t tid = pthread_self();  
 while(1)  
 {  
  pthread_mutex_lock(&m_pthreadMutex);  
  pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex);  
  cout << "tid:" << tid << " run" << endl;  
  //get task  
  vector<CTask*>* taskList = (vector<CTask*>*)threadData;  
  vector<CTask*>::iterator iter = taskList->begin();  
  while(iter != taskList->end())  
  {  
     
   MoveToBusy(tid);  
   break;  
  }  
  CTask* task = *iter;  
  taskList->erase(iter);  
  pthread_mutex_unlock(&m_pthreadMutex);  
  cout << "idel thread number:" << CThreadPool::m_vecIdleThread.size() << endl;  
  cout << "busy thread number:" << CThreadPool::m_vecBusyThread.size() << endl;  
  //cout << "task to be run:" << taskList->size() << endl;  
  task->Run();  
    
  //cout << "CThread::thread work" << endl;  
  cout << "tid:" << tid << " idle" << endl;  
    
 }  
 return (void*)0;  
}  
  
int CThreadPool::AddTask(CTask *task)  
{  
 this->m_vecTaskList.push_back(task);  
 pthread_cond_signal(&m_pthreadCond);  
 return 0;  
}  
int CThreadPool::Create()  
{  
 for(int i = 0; i < m_iThreadNum;i++)  
 {  
  pthread_t tid = 0;  
  pthread_create(&tid,NULL,ThreadFunc,&m_vecTaskList);  
  m_vecIdleThread.push_back(tid);  
 }  
 return 0;  
}  
  
int CThreadPool::StopAll()  
{  
 vector<pthread_t>::iterator iter = m_vecIdleThread.begin();  
 while(iter != m_vecIdleThread.end())  
 {  
  pthread_cancel(*iter);  
  pthread_join(*iter,NULL);  
  iter++;  
 }  
  
 iter = m_vecBusyThread.begin();  
 while(iter != m_vecBusyThread.end())  
 {  
  pthread_cancel(*iter);  
  pthread_join(*iter,NULL);  
  iter++;  
 }  
   
 return 0;  
}  
  
簡單示例:  
  
××××××××test.cpp  
  
#include "CThread.h"  
#include <iostream>  
  
using namespace std;  
  
class CWorkTask: public CTask  
{  
public:  
 CWorkTask()  
 {}  
 int Run()  
 {  
  cout << (char*)this->m_ptrData << endl;  
  sleep(10);  
  return 0;  
 }  
};  
int main()  
{  
 CWorkTask taskObj;  
 char szTmp[] = "this is the first thread running,haha success";  
 taskObj.SetData((void*)szTmp);  
 CThreadPool threadPool(10);  
 for(int i = 0;i < 11;i++)  
 {  
  threadPool.AddTask(&taskObj);  
 }  
 while(1)  
 {  
  sleep(120);  
 }  
 return 0;  
}  
           

繼續閱讀