此線程池所依賴的線程類,請參看《一個Windows C++的線程類實作》:
http://blog.csdn.net/huyiyang2010/archive/2010/08/10/5801597.aspx
ThreadPoolExecutor.h
#ifndef __THREAD_POOL_EXECUTOR__
#define __THREAD_POOL_EXECUTOR__
#include "Thread.h"
#include <set>
#include <list>
#include <windows.h>
/**
 * A thread pool that runs Runnable tasks on CWorker threads.
 *
 * Tasks are kept in a FIFO list (m_Tasks). The pool never deletes the
 * Runnable objects handed to Execute(); it only calls Run() on them.
 */
class CThreadPoolExecutor
{
public:
    CThreadPoolExecutor(void);
    ~CThreadPoolExecutor(void);

    /**
     * Initializes the pool and creates minThreads worker threads.
     *
     * @param minThreads       number of workers created up front
     * @param maxThreads       upper bound on the number of workers
     * @param maxPendingTasks  capacity of the pending-task list
     * @return true on success, false when the arguments are rejected
     *         (for example minThreads == 0).
     */
    bool Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTasks);

    /**
     * Submits a task.
     *
     * - If the task list is not full, the task is appended to it; returns true.
     * - If the task list is full but fewer than the maximum number of threads
     *   exist, a new thread is created to execute this task; returns true.
     * - If the task list is full and the maximum number of threads has been
     *   reached, the task is discarded; returns false.
     */
    bool Execute(Runnable * pRunnable);

    /**
     * Shuts the pool down: stops accepting tasks, waits until the task list
     * is empty, sets the minimum thread count to 0, waits until no worker is
     * left in the pool, then joins and deletes the retired workers.
     */
    void Terminate();

    /** Returns the number of threads currently in the pool. */
    unsigned int GetThreadPoolSize();

private:
    // Non-copyable: the object owns two CRITICAL_SECTIONs and raw CWorker
    // pointers. Declared but intentionally not defined (pre-C++11 idiom).
    CThreadPoolExecutor(const CThreadPoolExecutor &);
    CThreadPoolExecutor & operator=(const CThreadPoolExecutor &);

    /** Pops the next task from the task list; returns NULL if the list is empty. */
    Runnable * GetTask();

    // NOTE(review): no definition of this function is visible alongside the
    // class - confirm whether it is still needed.
    static unsigned int WINAPI StaticThreadFunc(void * arg);

    /** Worker thread: runs an optional first task, then polls the pool for more. */
    class CWorker : public CThread
    {
    public:
        CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask = NULL);
        ~CWorker();
        void Run();

    private:
        CThreadPoolExecutor * m_pThreadPool;  // owning pool
        Runnable * m_pFirstTask;              // task to run before polling the list, may be NULL
        volatile bool m_bRun;                 // loop flag of Run()
    };

    typedef std::set<CWorker *> ThreadPool;
    typedef std::list<Runnable *> Tasks;
    typedef Tasks::iterator TasksItr;
    typedef ThreadPool::iterator ThreadPoolItr;

    ThreadPool m_ThreadPool;                  // workers that are part of the pool
    ThreadPool m_TrashThread;                 // retired workers waiting to be joined and deleted
    Tasks m_Tasks;                            // pending tasks, FIFO
    CRITICAL_SECTION m_csTasksLock;           // guards m_Tasks
    CRITICAL_SECTION m_csThreadPoolLock;      // guards m_ThreadPool and m_TrashThread
    volatile bool m_bRun;
    volatile bool m_bEnableInsertTask;        // Execute() rejects tasks while false
    volatile unsigned int m_minThreads;
    volatile unsigned int m_maxThreads;
    volatile unsigned int m_maxPendingTasks;
};
#endif
ThreadPoolExecutor.cpp
#include "ThreadPoolExecutor.h"
/**
 * Creates a worker bound to pThreadPool.
 *
 * @param pThreadPool  pool whose task list this worker will poll
 * @param pFirstTask   optional task to run before polling the list
 */
CThreadPoolExecutor::CWorker::CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask) :
    m_pThreadPool(pThreadPool),
    m_pFirstTask(pFirstTask),
    m_bRun(true)
{
}
/** Nothing to release: the worker owns neither its pool nor its tasks. */
CThreadPoolExecutor::CWorker::~CWorker()
{
}
/**
執行任務的工作線程。
目前沒有任務時,
如果目前線程數量大于最小線程數量,減少線程,
否則,執行清理程式,将線程類給釋放掉
**/
void CThreadPoolExecutor::CWorker::Run()
Runnable * pTask = NULL;
while(m_bRun)
if(NULL == m_pFirstTask)
{
pTask = m_pThreadPool->GetTask();
}
else
pTask = m_pFirstTask;
m_pFirstTask = NULL;
if(NULL == pTask)
EnterCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));
if(m_pThreadPool->GetThreadPoolSize() > m_pThreadPool->m_minThreads)
{
ThreadPoolItr itr = m_pThreadPool->m_ThreadPool.find(this);
if(itr != m_pThreadPool->m_ThreadPool.end())
{
m_pThreadPool->m_ThreadPool.erase(itr);
m_pThreadPool->m_TrashThread.insert(this);
}
m_bRun = false;
}
else
ThreadPoolItr itr = m_pThreadPool->m_TrashThread.begin();
while(itr != m_pThreadPool->m_TrashThread.end())
(*itr)->Join();
delete (*itr);
m_pThreadPool->m_TrashThread.erase(itr);
itr = m_pThreadPool->m_TrashThread.begin();
LeaveCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));
continue;
pTask->Run();
pTask = NULL;
}
/////////////////////////////////////////////////////////////////////////////////////////////
/**
 * Creates an idle pool. No threads are started and Execute() rejects tasks
 * until Init() has been called.
 */
CThreadPoolExecutor::CThreadPoolExecutor(void) :
    m_bRun(false),
    m_bEnableInsertTask(false),
    m_minThreads(0),        // limits start at a defined value instead of garbage
    m_maxThreads(0),
    m_maxPendingTasks(0)
{
    InitializeCriticalSection(&m_csTasksLock);
    InitializeCriticalSection(&m_csThreadPoolLock);
}
/** Shuts the pool down via Terminate(), then releases both critical sections. */
CThreadPoolExecutor::~CThreadPoolExecutor(void)
{
    Terminate();
    DeleteCriticalSection(&m_csTasksLock);
    DeleteCriticalSection(&m_csThreadPoolLock);
}
bool CThreadPoolExecutor::Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTasks)
if(minThreads == 0)
return false;
if(maxThreads < minThreads)
m_minThreads = minThreads;
m_maxThreads = maxThreads;
m_maxPendingTasks = maxPendingTasks;
unsigned int i = m_ThreadPool.size();
for(; i<minThreads; i++)
//建立線程
CWorker * pWorker = new CWorker(this);
if(NULL == pWorker)
return false;
EnterCriticalSection(&m_csThreadPoolLock);
m_ThreadPool.insert(pWorker);
LeaveCriticalSection(&m_csThreadPoolLock);
pWorker->Start();
m_bRun = true;
m_bEnableInsertTask = true;
return true;
/**
 * Submits a task to the pool.
 *
 * The task is queued when the pending list has room. When the list is full,
 * a new worker is created to run the task directly, provided the pool has
 * fewer than m_maxThreads workers. Otherwise the task is rejected.
 *
 * @param pRunnable  task to run; not deleted by the pool
 * @return true if the task was accepted, false if it was rejected (pool not
 *         accepting tasks, NULL task, or list and pool both full).
 */
bool CThreadPoolExecutor::Execute(Runnable * pRunnable)
{
    if(!m_bEnableInsertTask || NULL == pRunnable)
    {
        return false;
    }

    // Check capacity and enqueue under the same lock so the check cannot
    // race with workers popping tasks or with other callers of Execute().
    bool bQueued = false;
    EnterCriticalSection(&m_csTasksLock);
    if(m_Tasks.size() < m_maxPendingTasks)
    {
        m_Tasks.push_back(pRunnable);
        bQueued = true;
    }
    LeaveCriticalSection(&m_csTasksLock);
    if(bQueued)
    {
        return true;
    }

    // List is full: grow the pool if allowed. The size check and the insert
    // happen under one lock so two callers cannot both exceed m_maxThreads.
    CWorker * pWorker = NULL;
    EnterCriticalSection(&m_csThreadPoolLock);
    try
    {
        if(m_ThreadPool.size() < m_maxThreads)
        {
            pWorker = new CWorker(this, pRunnable);
            if(NULL != pWorker)
            {
                m_ThreadPool.insert(pWorker);
            }
        }
    }
    catch(...)
    {
        // Never leave the critical section held when allocation fails.
        LeaveCriticalSection(&m_csThreadPoolLock);
        throw;
    }
    LeaveCriticalSection(&m_csThreadPoolLock);

    if(NULL == pWorker)
    {
        return false;
    }
    pWorker->Start();
    return true;
}
/**
 * Removes and returns the oldest pending task.
 *
 * @return the task at the front of the list, or NULL when the list is empty.
 */
Runnable * CThreadPoolExecutor::GetTask()
{
    Runnable * pNext = NULL;

    EnterCriticalSection(&m_csTasksLock);
    if(!m_Tasks.empty())
    {
        pNext = m_Tasks.front();
        m_Tasks.pop_front();
    }
    LeaveCriticalSection(&m_csTasksLock);

    return pNext;
}
unsigned int CThreadPoolExecutor::GetThreadPoolSize()
return m_ThreadPool.size();
void CThreadPoolExecutor::Terminate()
m_bEnableInsertTask = false;
while(m_Tasks.size() > 0)
Sleep(1);
m_bRun = false;
m_minThreads = 0;
m_maxThreads = 0;
m_maxPendingTasks = 0;
while(m_ThreadPool.size() > 0)
EnterCriticalSection(&m_csThreadPoolLock);
ThreadPoolItr itr = m_TrashThread.begin();
while(itr != m_TrashThread.end())
(*itr)->Join();
delete (*itr);
m_TrashThread.erase(itr);
itr = m_TrashThread.begin();
LeaveCriticalSection(&m_csThreadPoolLock);
用法:
#include "Thread.h"
#include "ThreadPoolExecutor.h"
/** Sample task: prints a greeting each time it is run. */
class R : public Runnable
{
public:
    ~R()
    {
    }

    void Run()
    {
        // "\n" is the newline escape; the original "/n" printed a literal "/n".
        printf("Hello World\n");
    }
};
int _tmain(int argc, _TCHAR* argv[])
CThreadPoolExecutor * pExecutor = new CThreadPoolExecutor();
pExecutor->Init(1, 10, 50);
R r;
for(int i=0;i<100;i++)
while(!pExecutor->Execute(&r))
{
}
pExecutor->Terminate();
delete pExecutor;
getchar();
return 0;
}
測試結果:
機器:
Intel(R) Core(TM)2 Duo CPU
E8400 @ 3.00GHz
2G記憶體
對于100個任務并且每個任務包含10000000個循環,任務中無等待:
單線程執行耗時:2281時間片
單線程池執行耗時:2219時間片
2個線程的線程池耗時:1156時間片
5個線程的線程池耗時:1166時間片
10個線程的線程池耗時:1157時間片
100個線程的線程池耗時:1177時間片
from: