天天看點

Windows下一個比較完美的線程池實作

1.  前言

線程池不是一個新鮮的東西,網上能找到很多原理、實作,甚至很多庫都提供了實作,比如微軟的 ATL::CThreadPool, Vista後提供的CreateThreadpoolWork, boost 中提供的 thread_pool, CSDN、CodeProject 等網站上很多人已經實作的類。但這些庫往往隻支援啟動任務,而不能很好地停止任務(相信很多人都會和我一樣有這個需求),于是我在FTL中寫了一個個人認為還比較完美的線程池。

2.  功能

本線程池提供了如下功能:

1.       能根據任務個數和目前線程的多少在最小/最大線程個數之間自動調整(Vista後的系統有 SetThreadpoolThreadMaximum 等函數有類似功能);

2.      能友善的對任一任務進行取消操作,無論該任務是等待運作狀态還是正在運作狀态都支援(相比較而言,WaitForThreadpoolWorkCallbacks 函數隻能取消尚未運作的任務);

3.      能對整個線程池進行安全的暫停、繼續、停止處理

4.      支援回調方式的回報通知

5.      使用模版方式實作,能友善的進行參數傳遞

6.      在加入任務時可以設定優先級(目前尚不支援動态調整)

7.      使用的是微軟的基本API,能支援WinXP、Vista、Win7等各種作業系統(CreateThreadpoolWork 等隻能在Vista後才能使用)

3.  UML圖示和簡單說明

其UML圖比較簡單,主要的隻有三個類:

CFThreadPool – 線程池的管理類,負責整個線程池的管理工作,直接使用即可。

CFJobBase – Job的基類,如果想實作自己的Job,必須從這個類繼承,并實作其中的Run/ OnCancelJob 等函數。

IFThreadPoolCallBack– 可選的回調實作,可以通知調用段 Job 的啟動、停止、取消、進度通知、錯誤等各種狀态的改變。

Windows下一個比較完美的線程池實作

4.  實作說明

以下部分簡單介紹了一些比較重要的代碼實作,具體請參見示例代碼和其中的注釋部分。

4.1.     Job容器

本線程池類中有兩種Job的容器,分别是等待運作的Jobs和目前正在運作的Jobs。因為有不同的需求,其定義分别如下(兩種容器類型的選擇,其理由已經寫得比較詳細,大家自行分析即可)

//! 儲存等待Job的資訊,由于有優先級的問題,而且從最前面開始取任務,是以儲存成set

//! 保證優先級高、JobIndex小(同優先級時FIFO) 的Job在最前面

typedeftypenameUnreferenceLess<CFJobBase<T>* >JobBaseUnreferenceLess;

typedefstd::set<CFJobBase<T>*,JobBaseUnreferenceLess > WaitingJobContainer;

WaitingJobContainer        m_WaitingJobs;    //!等待運作的Job

//! 儲存運作Job的資訊,由于會頻繁加入、删除,且需要按照JobIndex查找删除,是以儲存成map

typedefstd::map<LONG,CFJobBase<T>* >   DoingJobContainer;

DoingJobContainer      m_DoingJobs;  //! 正在運作的Job

4.2.     Job優先級

代碼中通過operator < 方法比較CFJobBase<T>::m_nJobPriority和 JobIndex,在SubmitJob時保證其在 set 中的順序來保證 優先級高的Job先執行,相同優先級的Job采用FIFO方式。

4.3.     Job對暫停、停止的支援

為了支援暫停、停止,Job的子類必須在工作循環中調用父類提供 GetJobWaitType方法,并判斷其傳回值。如果傳回值為ftwtStop 表示使用者請求了停止,需要進行必要的清除工作。GetJobWaitType的實作請參見代碼(本質是等待m_hEventJobStop, m_pThreadPool->m_hEventStop, m_pThreadPool->m_hEventContinue這三個手動重置事件之一)

4.4.     線程池對Job的控制

線程池中主要有以下的一些函數,因為意義根據名字很容易猜出,就不再詳細介紹,詳見示例。

Start(LONGnMinNumThreads, LONG nMaxNumThreads);

StopAndWait(DWORDdwTimeOut = FTL_MAX_THREAD_DEADLINE_CHECK);

ClearUndoWork();

SubmitJob(CFJobBase<T>*pJob, LONG* pOutJobIndex); //加入Job,會傳回Job在Pool中的唯一索引,可以通過CancelJob 取消。

Pause/Resume/Stop// 對整個線程池請求暫停、繼續、停止的操作。注意:需要Job子類的配合才能達成目标。

4.5.     資源的處理

CFJobBase子類需要 Initialize、Run、Finalize、OnCancelJob 這幾個虛函數。

當Job運作的時候,其邏輯為 if(Initialize){ Run -> Finalize; },需要在 Finalize 中釋放資源;

當Job沒有運作的時候就被取消或Pool停止,則會調用 OnCancelJob,需要在其中釋放資源。

5.  示例程式

編寫了簡單的MFC示例程式,其界面如圖所示:

因為UI不是重點,而且本人也比較懶,是以沒有弄很好的UI出來,所有的運作資訊請參見VisualStudio中的“輸出”視窗。

Start 按鈕啟動線程池,示例中設定的最小、最大線程個數是 2-4(即會根據加入的Job自動在 2-4 個線程之間自動調整)。

       三個AddJob按鈕分别是增加 高、普通、低優先級的Job,加入後可以在日志中檢視其運作的順序。

       兩個CancelJob按鈕分别是從前面、後面的JobIndex取消Job(實際上支援任和有效的JobIndex)。

Windows下一個比較完美的線程池實作

6.  特别說明

6.1.     IFThreadPoolCallBack 回調的同步

Pool 調用 IFThreadPoolCallBack 接口的各個方法時,為了性能上的考慮,沒有加鎖進行同步。使用時如果有需要,最好自行同步。

6.2.     單任務的暫停、繼續 VS Pool的暫停、繼續

在開發過程中,考慮過是提供單任務的暫停、繼續 還是 整個Pool 的暫停、繼續,考慮到目前的需求,暫時隻支援整個Pool的暫停,免得過分複雜,如有需要,大家可以自行參考CancelJob實作。

6.3.     可能存在的Bug和解決方法

在開發的過程中,對各個部分都進行了詳細的分析、考慮和測試,應該沒有較大的Bug。目前隻想到一種在極端情況下,可能會造成的Bug,特提出以免各位踩雷。

描述:因為是多線程的代碼,是以在極端情況下,可能出現SubmitJob函數尚未傳回,對應的Job就執行完畢(或 Initialize失敗直接傳回)的情況。此時如果調用端采用了将 *pOutJobIndex儲存起來,在 OnJobEnd 中清除的邏輯,可能會因為 OnJobEnd 找不到對應的JobIndex 而出現邏輯錯誤。

解決方法:在調用 SubmitJob 的代碼和OnJobEnd 的回調代碼中,使用相同的鎖機制保證即可(這個也應該由調用者來保證

繼續閱讀