看阿裡巴巴開發手冊并發程式設計這塊有一條:線程池不允許使用 Executors 去建立,而是通過ThreadPoolExecutor的方式,通過源碼分析禁用的原因。
寫在前面
首先感謝大家在蓋樓的間隙閱讀本篇文章,通過閱讀本篇文章你将了解到:
線程池的定義
Executors建立線程池的幾種方式
ThreadPoolExecutor對象
線程池執行任務邏輯和線程池參數的關系
Executors建立傳回ThreadPoolExecutor對象
OOM異常測試
如何定義線程池參數
如果隻想知道原因可以直接拉到總結那。
管理一組工作線程。通過線程池複用線程有以下幾點優點:
減少資源建立 => 減少記憶體開銷,建立線程占用記憶體
降低系統開銷 => 建立線程需要時間,會延遲處理的請求
提高穩定穩定性 => 避免無限建立線程引起的OutOfMemoryError【簡稱OOM】
Executors建立線程池的方式
根據傳回的對象類型建立線程池可以分為三類:
建立傳回ThreadPoolExecutor對象
建立傳回ScheduleThreadPoolExecutor對象
建立傳回ForkJoinPool對象
本文隻讨論建立傳回ThreadPoolExecutor對象
在介紹Executors建立線程池方法前先介紹一下ThreadPoolExecutor,因為這些建立線程池的靜态方法都是傳回ThreadPoolExecutor對象,和我們手動建立ThreadPoolExecutor對象的差別就是我們不需要自己傳構造函數的參數。
ThreadPoolExecutor的構造函數共有四個,但最終調用的都是同一個:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
構造函數參數說明:
corePoolSize => 線程池核心線程數量
maximumPoolSize => 線程池最大數量
keepAliveTime => 空閑線程存活時間
unit => 時間機關
workQueue => 線程池所使用的緩沖隊列
threadFactory => 線程池建立線程使用的工廠
handler => 線程池對拒絕任務的處理政策

執行邏輯說明:
判斷核心線程數是否已滿,核心線程數大小和corePoolSize參數有關,未滿則建立線程執行任務
若核心線程池已滿,判斷隊列是否滿,隊列是否滿和workQueue參數有關,若未滿則加入隊列中
若隊列已滿,判斷線程池是否已滿,線程池是否已滿和maximumPoolSize參數有關,若未滿建立線程執行任務
若線程池已滿,則采用拒絕政策處理無法執執行的任務,拒絕政策和handler參數有關
Executors建立傳回ThreadPoolExecutor對象的方法共有三種:
Executors#newCachedThreadPool => 建立可緩存的線程池
Executors#newSingleThreadExecutor => 建立單線程的線程池
Executors#newFixedThreadPool => 建立固定長度的線程池
Executors#newCachedThreadPool方法
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool是一個根據需要建立新線程的線程池
corePoolSize => 0,核心線程池的數量為0
maximumPoolSize => Integer.MAX_VALUE,可以認為最大線程數是無限的
keepAliveTime => 60L
unit => 秒
workQueue => SynchronousQueue
當一個任務送出時,corePoolSize為0不建立核心線程,SynchronousQueue是一個不存儲元素的隊列,可以了解為隊裡永遠是滿的,是以最終會建立非核心線程來執行任務。對于非核心線程空閑60s時将被回收。
因為Integer.MAX_VALUE非常大,可以認為是可以無限建立線程的,在資源有限的情況下容易引起OOM異常
Executors#newSingleThreadExecutor方法
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
SingleThreadExecutor是單線程線程池,隻有一個核心線程
corePoolSize => 1,核心線程池的數量為1
maximumPoolSize => 1,隻可以建立一個非核心線程
keepAliveTime => 0L
unit => 毫秒
workQueue => LinkedBlockingQueue
當一個任務送出時,首先會建立一個核心線程來執行任務,如果超過核心線程的數量,将會放入隊列中,因為LinkedBlockingQueue是長度為Integer.MAX_VALUE的隊列,可以認為是無界隊列,是以往隊列中可以插入無限多的任務,在資源有限的時候容易引起OOM異常,同時因為無界隊列,maximumPoolSize和keepAliveTime參數将無效,壓根就不會建立非核心線程。
Executors#newFixedThreadPool方法
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool是固定核心線程的線程池,固定核心線程數由使用者傳入
它和SingleThreadExecutor類似,唯一的差別就是核心線程數不同,并且由于使用的是LinkedBlockingQueue,在資源有限的時候容易引起OOM異常
總結:
FixedThreadPool和SingleThreadExecutor => 允許的請求隊列長度為Integer.MAX_VALUE,可能會堆積大量的請求,進而引起OOM異常
CachedThreadPool => 允許建立的線程數為Integer.MAX_VALUE,可能會建立大量的線程,進而引起OOM異常
這就是為什麼禁止使用Executors去建立線程池,而是推薦自己去建立ThreadPoolExecutor的原因
理論上會出現OOM異常,必須測試一波驗證之前的說法:
測試類:TaskTest.java
public class TaskTest {
public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();
int i = 0;
while (true) {
es.submit(new Task(i++));
}
}
}
使用Executors建立的CachedThreadPool,往線程池中無限添加線程在啟動測試類之前先将JVM記憶體調整小一點,不然很容易将電腦跑出問題【别問我為什麼知道,是鐵憨憨甜沒錯了!!!】,在idea裡:Run -> Edit Configurations
- -Xms10M => Java Heap記憶體初始化值
- -Xmx10M => Java Heap記憶體最大值
運作結果:
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "main"
Disconnected from the target VM, address: '127.0.0.1:60416', transport: 'socket'
建立到3w多個線程的時候開始報 OOM 錯誤
另外兩個線程池就不做測試了,測試方法一緻,隻是建立的線程池不一樣
CPU密集型 => 線程池的大小推薦為CPU數量 + 1,CPU數量可以根據Runtime.availableProcessors方法擷取
IO密集型 => CPU數量 * CPU使用率 * (1 + 線程等待時間/線程CPU時間),CPU密集型、IO密集型,看下這篇。
混合型 => 将任務分為CPU密集型和IO密集型,然後分别使用不同的線程池去處理,進而使每個線程池可以根據各自的工作負載來調整
阻塞隊列 => 推薦使用有界隊列,有界隊列有助于避免資源耗盡的情況發生
拒絕政策 => 預設采用的是AbortPolicy拒絕政策,直接在程式中抛出RejectedExecutionException異常【因為是運作時異常,不強制catch】,這種處理方式不夠優雅。線程池 8 大拒絕政策,這篇很全。
處理拒絕政策有以下幾種比較推薦:
在程式中捕獲RejectedExecutionException異常,在捕獲異常中對任務進行處理。針對預設拒絕政策
使用CallerRunsPolicy拒絕政策,該政策會将任務交給調用execute的線程執行【一般為主線程】,此時主線程将在一段時間内不能送出任何任務,進而使工作線程處理正在執行的任務。此時送出的線程将被儲存在TCP隊列中,TCP隊列滿将會影響用戶端,這是一種平緩的性能降低
自定義拒絕政策,隻需要實作RejectedExecutionHandler接口即可
如果任務不是特别重要,使用DiscardPolicy和DiscardOldestPolicy拒絕政策将任務丢棄也是可以的
如果使用Executors的靜态方法建立ThreadPoolExecutor對象,可以通過使用Semaphore對任務的執行進行限流也可以避免出現OOM異常。