Java多線程之線程池ThreadPool詳解
- 1. 線程池簡介
- 2. 線程池特點
- 3. 線程池核心參數
-
- 3.1 核心參數
- 3.2 拒絕政策
- 4. 線程池工具類
-
- 4.1 四種線程池建立方式
- 5. 線程池工作原理
- 6. 不建議使用Executors建立線程池
- 7. 實作驗證
1. 線程池簡介
線程池(英語: thread pool):一種線程使用模式。線程過多會帶來排程開銷,進而影響緩存局部性和整體性能。而線程池維護着多個線程,等待着監督管理者配置設定可并發執行的任務。這避免了在處理短時間任務時建立與銷毀線程的代價。線程池不僅能夠保證核心的充分利用,還能防止過分排程。
2. 線程池特點
線程池的優勢: 線程池做的工作隻要是控制運作的線程數量,處理過程中将任務放入隊列,然後線上程建立後啟動這些任務,如果線程數量超過了最大數量,超出數量的線程排隊等候,等其他線程執行完畢,再從隊列中取出任務來執行。
它的主要特點為:
1.降低資源消耗: 通過重複利用已建立的線程降低線程建立和銷毀造成的銷耗。
2.提高響應速度: 當任務到達時,任務可以不需要等待線程建立就能立即執行。
3.提高線程的可管理性: 線程是稀缺資源,如果無限制的建立,不僅會銷耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的配置設定,調優和監控。
4.Java 中的線程池是通過 Executor 架構實作的,該架構中用到了Executor, Executors,ExecutorService, ThreadPoolExecutor 這幾個類。

3. 線程池核心參數
3.1 核心參數
• corePoolSize 線程池的核心線程數
• maximumPoolSize 能容納的最大線程數
• keepAliveTime 空閑線程存活時間
• unit 存活的時間機關
• workQueue 存放送出但未執行任務的隊列
• threadFactory 建立線程的工廠類
• handler 等待隊列滿後的拒絕政策
線程池中,有三個重要的參數,決定影響了拒絕政策:
corePoolSize - 核心線程數,也即最小的線程數。
workQueue - 阻塞隊列 。
maximumPoolSize -最大線程數。
當送出任務數大于 corePoolSize 的時候,會優先将任務放到 workQueue 阻塞隊列中。當阻塞隊列飽和後,會擴充線程池中線程數,直到達到maximumPoolSize 最大線程數配置。此時,再多餘的任務,則會觸發線程池的拒絕政策了。總結起來,也就是一句話, 當送出的任務數大于(workQueue.size()+maximumPoolSize ),就會觸發線程池的拒絕政策。
3.2 拒絕政策
CallerRunsPolicy: 當觸發拒絕政策,隻要線程池沒有關閉的話,則使用調用線程直接運作任務。一般并發比較小,性能要求不高,不允許失敗。但是,由于調用者自己運作任務,如果任務送出速度過快,可能導緻程式阻塞,性能效率上必然的損失較大。
AbortPolicy: 丢棄任務,并抛出拒絕執行 RejectedExecutionException 異常資訊。線程池預設的拒絕政策。必須處理好抛出的異常,否則會打斷目前的執行流程,影響後續的任務執行。
DiscardPolicy: 直接丢棄,其他啥都沒有。
DiscardOldestPolicy: 當觸發拒絕政策,隻要線程池沒有關閉的話,丢棄阻塞隊列 workQueue 中最老的一個任務,并将新任務加入。
4. 線程池工具類
4.1 四種線程池建立方式
Executors 是一個 Java 中的工具類。提供工廠方法來建立不同類型的線程池。Executors 的建立線程池的方法,建立出來的線程池都實作了ExecutorService 接口。常用方法有以下幾個:
1.newFiexedThreadPool(int Threads):建立固定數目線程的線程池。
2.newCachedThreadPool():建立一個可緩存的線程池,調用 execute将重用以前構造的線程(如果線程可用)。如果沒有可用的線程,則建立一個新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。
3.newSingleThreadExecutor() 建立一個單線程化的 Executor。
4.newScheduledThreadPool(int corePoolSize) 建立一個支援定時及周期性的任務執行的線程池,多數情況下可用來替代 Timer 類。
5. 線程池工作原理
1.在建立了線程池後,線程池中的線程數為零。
2.當調用 execute()方法添加一個請求任務時,線程池會做出如下判斷:
2.1 如果正在運作的線程數量小于 corePoolSize,那麼馬上建立線程運作這個任務;
2.2 如果正在運作的線程數量大于或等于 corePoolSize,那麼将這個任務放入隊列;
2.3 如果這個時候隊列滿了且正在運作的線程數量還小于maximumPoolSize,那麼還是要建立非核心線程立刻運作這個任務; 2.4 如果隊列滿了且正在運作的線程數量大于或等于maximumPoolSize,那麼線程池會啟動飽和拒絕政策來執行。
4.當一個線程完成任務時,它會從隊列中取下一個任務來執行
5.當一個線程無事可做超過一定的時間(keepAliveTime)時,線程會判斷:
4.1 如果目前運作的線程數大于 corePoolSize,那麼這個線程就被停掉。
4.2是以線程池的所有任務完成後,它最終會收縮到 corePoolSize 的大小。
6. 不建議使用Executors建立線程池
不建議大家使用Executors這個類來建立線程池呢,阿裡開發手冊這樣定義:
【強制】線程池不允許使用 Executors 去建立,而是通過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學更加明确線程池的運作規則,規避資源耗盡的風險。說明: Executors 傳回的線程池對象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:
允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,進而導緻 OOM。
2) CachedThreadPool 和 ScheduledThreadPool:
允許的建立線程數量為 Integer.MAX_VALUE, 可能會建立大量的線程,進而導緻 OOM。
項目中建立多線程時,使用常見的三種線程池建立方式,單一、可變、定長都有一定問題,原因是 FixedThreadPool 和 SingleThreadExecutor 底層都是用LinkedBlockingQueue 實作的,這個隊列最大長度為 Integer.MAX_VALUE,容易導緻 OOM。
是以實際生産一般自己通過 ThreadPoolExecutor 的 7 個參數,自定義線程池。
7. 實作驗證
package com.zrj.unit.juc;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.junit.Test;
import java.util.concurrent.*;
/**
* 線程池
* 注意:ThreadFactoryBuilder這裡需要運用guava包,自定義線程名稱
*
* @author zrj
* @since 2021/8/20
**/
public class ThreadPoolExecutorTest {
// 定義售票視窗數
private static int saleWindows = 10;
// 自定義線程名稱
private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("thread-%d").build();
// 定義線程池
private static ExecutorService pool = new ThreadPoolExecutor(30, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
// 倒計時鎖存器
private static CountDownLatch latch = new CountDownLatch(saleWindows);
/**
* 模拟搶票
*/
@Test
public void saleTicket() {
try {
for (int i = 0; i < saleWindows; i++) {
pool.execute(() -> {
try {
String threadName = Thread.currentThread().getName();
System.out.println(threadName + "視窗開始售票");
Thread.sleep(1000);
System.out.println(threadName + "視窗售票完成");
} catch (InterruptedException e) {
System.out.println("售票異常:" + e);
} finally {
latch.countDown();
}
});
}
} catch (Exception e) {
System.out.println("系統異常:" + e);
} finally {
pool.shutdown();
}
// 等待所有線程到達放行
try {
latch.await();
} catch (Exception e) {
System.out.println("系統異常," + e);
}
System.out.println("所有線程執行完成,繼續執行主線程");
}
}