1. 前言
線程池不是一個新鮮的東西,網上能找到很多原理、實作,甚至很多庫都提供了實作,比如微軟的 ATL::CThreadPool, Vista後提供的CreateThreadpoolWork, boost 中提供的 thread_pool, CSDN、CodeProject 等網站上很多人已經實作的類。但這些庫往往隻支援啟動任務,而不能很好地停止任務(相信很多人都會和我一樣有這個需求),于是我在FTL中寫了一個個人認為還比較完美的線程池。
2. 功能
本線程池提供了如下功能:
1. 能根據任務個數和目前線程的多少在最小/最大線程個數之間自動調整(Vista後的系統有 SetThreadpoolThreadMaximum 等函數有類似功能);
2. 能友善的對任一任務進行取消操作,無論該任務是等待運作狀态還是正在運作狀态都支援(相比較而言,WaitForThreadpoolWorkCallbacks 函數隻能取消尚未運作的任務);
3. 能對整個線程池進行安全的暫停、繼續、停止處理
4. 支援回調方式的回報通知
5. 使用模版方式實作,能友善的進行參數傳遞
6. 在加入任務時可以設定優先級(目前尚不支援動态調整)
7. 使用的是微軟的基本API,能支援WinXP、Vista、Win7等各種作業系統(CreateThreadpoolWork 等隻能在Vista後才能使用)
3. UML圖示和簡單說明
其UML圖比較簡單,主要的隻有三個類:
CFThreadPool – 線程池的管理類,負責整個線程池的管理工作,直接使用即可。
CFJobBase – Job的基類,如果想實作自己的Job,必須從這個類繼承,并實作其中的Run/ OnCancelJob 等函數。
IFThreadPoolCallBack– 可選的回調實作,可以通知調用段 Job 的啟動、停止、取消、進度通知、錯誤等各種狀态的改變。
4. 實作說明
以下部分簡單介紹了一些比較重要的代碼實作,具體請參見示例代碼和其中的注釋部分。
4.1. Job容器
本線程池類中有兩種Job的容器,分别是等待運作的Jobs和目前正在運作的Jobs。因為有不同的需求,其定義分别如下(兩種容器類型的選擇,其理由已經寫得比較詳細,大家自行分析即可)
//! 儲存等待Job的資訊,由于有優先級的問題,而且從最前面開始取任務,是以儲存成set
//! 保證優先級高、JobIndex小(同優先級時FIFO) 的Job在最前面
typedeftypenameUnreferenceLess<CFJobBase<T>* >JobBaseUnreferenceLess;
typedefstd::set<CFJobBase<T>*,JobBaseUnreferenceLess > WaitingJobContainer;
WaitingJobContainer m_WaitingJobs; //!等待運作的Job
//! 儲存運作Job的資訊,由于會頻繁加入、删除,且需要按照JobIndex查找删除,是以儲存成map
typedefstd::map<LONG,CFJobBase<T>* > DoingJobContainer;
DoingJobContainer m_DoingJobs; //! 正在運作的Job
4.2. Job優先級
代碼中通過operator < 方法比較CFJobBase<T>::m_nJobPriority和 JobIndex,在SubmitJob時保證其在 set 中的順序來保證 優先級高的Job先執行,相同優先級的Job采用FIFO方式。
4.3. Job對暫停、停止的支援
為了支援暫停、停止,Job的子類必須在工作循環中調用父類提供 GetJobWaitType方法,并判斷其傳回值。如果傳回值為ftwtStop 表示使用者請求了停止,需要進行必要的清除工作。GetJobWaitType的實作請參見代碼(本質是等待m_hEventJobStop, m_pThreadPool->m_hEventStop, m_pThreadPool->m_hEventContinue這三個手動重置事件之一)
4.4. 線程池對Job的控制
線程池中主要有以下的一些函數,因為意義根據名字很容易猜出,就不再詳細介紹,詳見示例。
Start(LONGnMinNumThreads, LONG nMaxNumThreads);
StopAndWait(DWORDdwTimeOut = FTL_MAX_THREAD_DEADLINE_CHECK);
ClearUndoWork();
SubmitJob(CFJobBase<T>*pJob, LONG* pOutJobIndex); //加入Job,會傳回Job在Pool中的唯一索引,可以通過CancelJob 取消。
Pause/Resume/Stop// 對整個線程池請求暫停、繼續、停止的操作。注意:需要Job子類的配合才能達成目标。
4.5. 資源的處理
CFJobBase子類需要 Initialize、Run、Finalize、OnCancelJob 這幾個虛函數。
當Job運作的時候,其邏輯為 if(Initialize){ Run -> Finalize; },需要在 Finalize 中釋放資源;
當Job沒有運作的時候就被取消或Pool停止,則會調用 OnCancelJob,需要在其中釋放資源。
5. 示例程式
編寫了簡單的MFC示例程式,其界面如圖所示:
因為UI不是重點,而且本人也比較懶,是以沒有弄很好的UI出來,所有的運作資訊請參見VisualStudio中的“輸出”視窗。
Start 按鈕啟動線程池,示例中設定的最小、最大線程個數是 2-4(即會根據加入的Job自動在 2-4 個線程之間自動調整)。
三個AddJob按鈕分别是增加 高、普通、低優先級的Job,加入後可以在日志中檢視其運作的順序。
兩個CancelJob按鈕分别是從前面、後面的JobIndex取消Job(實際上支援任和有效的JobIndex)。
6. 特别說明
6.1. IFThreadPoolCallBack 回調的同步
Pool 調用 IFThreadPoolCallBack 接口的各個方法時,為了性能上的考慮,沒有加鎖進行同步。使用時如果有需要,最好自行同步。
6.2. 單任務的暫停、繼續 VS Pool的暫停、繼續
在開發過程中,考慮過是提供單任務的暫停、繼續 還是 整個Pool 的暫停、繼續,考慮到目前的需求,暫時隻支援整個Pool的暫停,免得過分複雜,如有需要,大家可以自行參考CancelJob實作。
6.3. 可能存在的Bug和解決方法
在開發的過程中,對各個部分都進行了詳細的分析、考慮和測試,應該沒有較大的Bug。目前隻想到一種在極端情況下,可能會造成的Bug,特提出以免各位踩雷。
描述:因為是多線程的代碼,是以在極端情況下,可能出現SubmitJob函數尚未傳回,對應的Job就執行完畢(或 Initialize失敗直接傳回)的情況。此時如果調用端采用了将 *pOutJobIndex儲存起來,在 OnJobEnd 中清除的邏輯,可能會因為 OnJobEnd 找不到對應的JobIndex 而出現邏輯錯誤。
解決方法:在調用 SubmitJob 的代碼和OnJobEnd 的回調代碼中,使用相同的鎖機制保證即可(這個也應該由調用者來保證