![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI0gTMx81dsQWZ4lmZf1GLlpXazVmcvwFciV2dsQXYtJ3bm9CX9s2RkBnVHFmb1clWvB3MaVnRtp1XlBXe0xCMy81dvRWYoNHLwEzX5xCMx8FesU2cfdGLwMzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cGcq5CNxUTM5EDZkJWZ4QjZ1gTMzYzXwIzNwcTM3AzLclDMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.jpg)
前言
原以為線程池還挺簡單的(平時常用,也分析過原理),這次是想自己動手寫一個線程池來更加深入的了解它;但在動手寫的過程中落地到細節時發現并沒想的那麼容易。結合源碼對比後确實不得不佩服
DougLea
。
我覺得大部分人直接去看
java.util.concurrent.ThreadPoolExecutor
的源碼時都是看一個大概,因為其中涉及到了許多細節處理,還有部分
AQS
的内容,是以想要理清楚具體細節并不是那麼容易。
與其挨個分析源碼不如自己實作一個簡版,當然簡版并不意味着功能缺失,需要保證核心邏輯一緻。
是以也是本篇文章的目的:
自己動手寫一個五髒俱全的線程池,同時會了解到線程池的工作原理,以及如何在工作中合理的利用線程池。
再開始之前建議對線程池不是很熟悉的朋友看看這幾篇:
這裡我截取了部分内容,也許可以埋個伏筆(坑)。
具體請看這兩個連結。
- 如何優雅的使用和了解線程池
- 線程池中你不容錯過的一些細節
由于篇幅限制,本次可能會分為上下兩篇。
建立線程池
現在進入正題,建立了一個
CustomThreadPool
類,它的工作原理如下:
簡單來說就是往線程池裡邊丢任務,丢的任務會緩沖到隊列裡;線程池裡存儲的其實就是一個個的
Thread
,他們會一直不停的從剛才緩沖的隊列裡擷取任務執行。
流程還是挺簡單。
先來看看我們這個自創的線程池的效果如何吧:
初始化了一個核心為3、最大線程數為5、隊列大小為 4 的線程池。
先往其中丢了 10 個任務,由于阻塞隊列的大小為 4 ,最大線程數為 5 ,是以由于隊列裡緩沖不了最終會建立 5 個線程(上限)。
過段時間沒有任務送出後(
sleep
)則會自動縮容到三個線程(保證不會小于核心線程數)。
構造函數
來看看具體是如何實作的。
下面則是這個線程池的構造函數:
會有以下幾個核心參數:
-
最小線程數,等效于 miniSize
中的核心線程數。ThreadPool
-
最大線程數。maxSize
-
線程保活時間。keepAliveTime
-
阻塞隊列。workQueue
-
通知接口。notify
大緻上都和
ThreadPool
中的參數相同,并且作用也是類似的。
需要注意的是其中初始化了一個
workers
成員變量:
-
/**
-
* 存放線程池
-
*/
-
private volatile Set<Worker> workers;
-
public CustomThreadPool(int miniSize, int maxSize, long keepAliveTime,
-
TimeUnit unit, BlockingQueue<Runnable> workQueue, Notify notify) {
-
workers = new ConcurrentHashSet<>();
-
}
workers
是最終存放線程池中運作的線程,在
j.u.c
源碼中是一個
HashSet
是以對他所有的操作都是需要加鎖。
我這裡為了簡便起見就自己定義了一個線程安全的
Set
稱為
ConcurrentHashSet
。
其實原理也非常簡單,和
HashSet
類似也是借助于
HashMap
來存放資料,利用其
key
不可重複的特性來實作
set
,隻是這裡的
HashMap
是用并發安全的
ConcurrentHashMap
來實作的。
這樣就能保證對它的寫入、删除都是線程安全的。
不過由于
ConcurrentHashMap
的
size()
函數并不準确,是以我這裡單獨利用了一個
AtomicInteger
來統計容器大小。
建立核心線程
往線程池中丢一個任務的時候其實要做的事情還蠻多的,最重要的事情莫過于建立線程存放到線程池中了。
當然我們不能無限制的建立線程,不然拿線程池來就沒任何意義了。于是
miniSize maxSize
這兩個參數就有了它的意義。
但這兩個參數再哪一步的時候才起到作用呢?這就是首先需要明确的。
從這個流程圖可以看出第一步是需要判斷是否大于核心線程數,如果沒有則建立。
結合代碼可以發現在執行任務的時候會判斷是否大于核心線程數,進而建立線程。
worker.startTask()
執行任務部分放到後面分析。
這裡的
miniSize
由于會在多線程場景下使用,是以也用
volatile
關鍵字來保證可見性。
隊列緩沖
結合上面的流程圖,第二步自然是要判斷隊列是否可以存放任務(是否已滿)。
優先會往隊列裡存放。
上至封頂
一旦寫入失敗則會判斷目前線程池的大小是否大于最大線程數,如果沒有則繼續建立線程執行。
不然則執行會嘗試阻塞寫入隊列(
j.u.c
會在這裡執行拒絕政策)
以上的步驟和剛才那張流程圖是一樣的,這樣大家是否有看出什麼坑嘛?
時刻小心
從上面流程圖的這兩步可以看出會直接建立新的線程。
這個過程相對于中間直接寫入阻塞隊列的開銷是非常大的,主要有以下兩個原因:
- 建立線程會加鎖,雖說最終用的是 ConcurrentHashMap 的寫入函數,但依然存在加鎖的可能。
- 會建立新的線程,建立線程還需要調用作業系統的 API 開銷較大。
是以理想情況下我們應該避免這兩步,盡量讓丢入線程池中的任務進入阻塞隊列中。
執行任務
任務是添加進來了,那是如何執行的?
在建立任務的時候提到過
worker.startTask()
函數:
-
/**
-
* 添加任務,需要加鎖
-
* @param runnable 任務
-
*/
-
private void addWorker(Runnable runnable) {
-
Worker worker = new Worker(runnable, true);
-
worker.startTask();
-
workers.add(worker);
-
}
也就是在建立線程執行任務的時候會建立
Worker
對象,利用它的
startTask()
方法來執行任務。
是以先來看看
Worker
對象是長啥樣的:
其實他本身也是一個線程,将接收到需要執行的任務存放到成員變量
task
處。
而其中最為關鍵的則是執行任務
worker.startTask()
這一步驟。
-
public void startTask() {
-
thread.start();
-
}
其實就是運作了
worker
線程自己,下面來看
run
方法。
- 第一步是将建立線程時傳過來的任務執行(
),接着會一直不停的從隊列裡擷取任務執行,直到擷取不到新任務了。task.run
- 任務執行完畢後将内置的計數器 -1 ,友善後面任務全部執行完畢進行通知。
- worker 線程擷取不到任務後退出,需要将自己從線程池中釋放掉(
)。workers.remove(this)
從隊列裡擷取任務
其實
getTask
也是非常關鍵的一個方法,它封裝了從隊列中擷取任務,同時對不需要保活的線程進行回收。
很明顯,核心作用就是從隊列裡擷取任務;但有兩個地方需要注意:
- 當線程數超過核心線程數時,在擷取任務的時候需要通過保活時間從隊列裡擷取任務;一旦擷取不到任務則隊列肯定是空的,這樣傳回
之後在上文的 null
中就會退出這個線程;進而達到了回收線程的目的,也就是我們之前示範的效果run()
- 這裡需要加鎖,加鎖的原因是這裡肯定會出現并發情況,不加鎖會導緻
條件多次執行,進而導緻線程被全部回收完畢。workers.size()>miniSize
關閉線程池
最後來談談線程關閉的事;
還是以剛才那段測試代碼為例,如果送出任務後我們沒有關閉線程,會發現即便是任務執行完畢後程式也不會退出。
從剛才的源碼裡其實也很容易看出來,不退出的原因是
Worker
線程一定還會一直阻塞在
task=workQueue.take();
處,即便是線程縮容了也不會小于核心線程數。
通過堆棧也能證明:
恰好剩下三個線程阻塞于此處。
而關閉線程通常又有以下兩種:
- 立即關閉:執行關閉方法後不管現線上程池的運作狀況,直接一刀切全部停掉,這樣會導緻任務丢失。
- 不接受新的任務,同時等待現有任務執行完畢後退出線程池。
立即關閉
我們先來看第一種
立即關閉
:
1. /**
2. * 立即關閉線程池,會造成任務丢失
3. */
4. public void shutDownNow() {
5. isShutDown.set(true);
6. tryClose(false);
7. }
8.
9. /**
10. * 關閉線程池
11. *
12. * @param isTry true 嘗試關閉 --> 會等待所有任務執行完畢
13. * false 立即關閉線程池--> 任務有丢失的可能
14. */
15. private void tryClose(boolean isTry) {
16. if (!isTry) {
17. closeAllTask();
18. } else {
19. if (isShutDown.get() && totalTask.get() == 0) {
20. closeAllTask();
21. }
22. }
23.
24. }
25.
26. /**
27. * 關閉所有任務
28. */
29. private void closeAllTask() {
30. for (Worker worker : workers) {
31. //LOGGER.info("開始關閉");
32. worker.close();
33. }
34. }
35.
36. public void close() {
37. thread.interrupt();
38. }
很容易看出,最終就是周遊線程池裡所有的
worker
線程挨個執行他們的中斷函數。
我們來測試一下:
可以發現後面丢進去的三個任務其實是沒有被執行的。
完事後關閉
而正常關閉則不一樣:
1. /**
2. * 任務執行完畢後關閉線程池
3. */
4. public void shutdown() {
5. isShutDown.set(true);
6. tryClose(true);
7. }
他會在這裡多了一個判斷,需要所有任務都執行完畢之後才會去中斷線程。
同時線上程需要回收時都會嘗試關閉線程:
來看看實際效果:
回收線程
上文或多或少提到了線程回收的事情,其實總結就是以下兩點:
- 一旦執行了
方法都會将線程池的狀态置為關閉狀态,這樣隻要 shutdown/shutdownNow
線程嘗試從隊列裡擷取任務時就會直接傳回空,導緻 worker
線程被回收。worker
- 一旦線程池大小超過了核心線程數就會使用保活時間來從隊列裡擷取任務,是以一旦擷取不到傳回
時就會觸發回收。null
但如果我們的隊列足夠大,導緻線程數都不會超過核心線程數,這樣是不會觸發回收的。
比如這裡我将隊列大小調為 10 ,這樣任務就會累計在隊列裡,不會建立五個
worker
線程。
是以一直都是
Thread-1~3
這三個線程在反複排程任務。
總結
本次實作了線程池裡大部分核心功能,我相信隻要看完并動手敲一遍一定會對線程池有不一樣的了解。
結合目前的内容來總結下:
- 線程池、隊列大小要設計的合理,盡量的讓任務從隊列中擷取執行。
- 慎用
方法關閉線程池,會導緻任務丢失(除非業務允許)。shutdownNow()
- 如果任務多,線程執行時間短可以調大
值,使得線程盡量不被回收進而可以複用線程。keepalive
同時下次會分享一些線程池的新特性,如:
- 執行帶有傳回值的線程。
- 異常處理怎麼辦?
- 所有任務執行完怎麼通知我?
本文所有源碼:
https://github.com/crossoverJie/JCSprout/blob/master/src/main/java/com/crossoverjie/concurrent/CustomThreadPool.java
你的點贊與分享是對我最大的支援