又又又踩坑了
生産有個對賬系統,每天需要從管道端下載下傳對賬檔案,然後開始日終對賬。這個系統已經運作了很久,前兩天突然收到短信預警,沒有擷取管道端對賬檔案。
ps:對賬系統詳細實作方式:對賬系統設計與實作
本以為又是管道端搞事情,上去一排查才發現,所有下載下傳任務都被阻塞了。再進一步排查源碼,才發現自己一直用錯了線程池某個方法。
由于線程建立比較昂貴,正式項目中我們都會使用線程池執行異步任務。線程池,使用池化技術儲存線程對象,使用的時候直接取出來,用完歸還以便使用。
雖然線程池的使用非常方法非常簡單,但是越簡單,越容易踩坑。細數一下,這些年來因為線程池導緻生産事故也有好幾起。
是以今天,小黑哥就針對線程池的話題,給大家示範一下怎麼使用線程池才會踩坑。
希望大家看完,可以完美避開這些坑~
先贊後看,養成習慣。微信搜尋「程式通事」,關注就完事了!
慎用 Executors 元件
Java 從 JDK1.5 開始提供線程池的實作類,我們隻需要在構造函數内傳入相關參數,就可以建立一個線程池。
不過線程池的構造函數可以說非常複雜,就算最簡單的那個構造函數,也需要傳入 5 個參數。這對于新手來說,非常不友善哇。
也許 JDK 開發者也考慮到這個問題,是以非常貼心給我們提供一個工具類
Executors
,用來快捷建立建立線程池。
雖然這個工具類使用真的非常友善,可以少寫很多代碼,但是小黑哥還是建議生産系統還是老老實實手動建立線程池,慎用
Executors
,尤其是工具類中兩個方法
Executors#newFixedThreadPool
與
Executors#newCachedThreadPool
。
如果你圖了友善使用上述方法建立了線程池,那就是一顆定時炸彈,說不準那一天生産系統就會💥。
我們來看兩個🌰,看下這個這兩個方法會有什麼問題。
假設我們有個應用有個批量接口,每次請求将會下載下傳 100w 個檔案,這裡我們使用
Executors#newFixedThreadPool
批量下載下傳。
下面方法中,我們随機休眠,模拟真實下載下傳耗時。
為了快速複現問題,調整 JVM 參數為
-Xmx128m -Xms128m
private ExecutorService threadPool = Executors.newFixedThreadPool(10);
/**
* 批量下載下傳對賬檔案
*
* @return
*/
@RequestMapping("/batchDownload")
public String batchDownload() {
// 模拟下載下傳 100w 個檔案
for (int i = 0; i < 1000000; i++) {
threadPool.execute(() -> {
// 随機休眠,模拟下載下傳耗時
Random random = new Random();
try {
TimeUnit.SECONDS.sleep(random.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
return "process";
}
程式運作之後,多請求幾次這個批量下載下傳方法,程式很快就會 OOM 。
檢視
Executors#newFixedThreadPool
源碼,我們可以看到這個方法建立了一個預設的
LinkedBlockingQueue
當做任務隊列。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
這個問題槽點就在于
LinkedBlockingQueue
,這個隊列的預設構造方法如下:
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
建立
LinkedBlockingQueue
隊列時,如果我們不指定隊列數量,預設數量上限為
Integer.MAX_VALUE
。這麼大的數量,我們簡直可以當做無界隊列了。
上面我們使用
newFixedThreadPool
,我們僅使用了固定數量的線程下載下傳。如果線程都在執行任務,線程池将會任務加入任務隊列中。
如果線程池執行任務過慢,任務将會一直堆積在隊列中。由于我們隊列可以認為是無界的,可以無限制添加任務,這就導緻記憶體占用越來越高,直到 OOM 爆倉。
ps:線程池基本工作原理
下面我們将上面的例子稍微修改一下,使用
newCachedThreadPool
建立線程池。
程式運作之後,多請求幾次這個批量下載下傳方法,程式很快就會 OOM ,不過這次報錯資訊與之前資訊與之前不同。
從報錯資訊來看,這次 OOM 的主要原因是因為無法再建立新的線程。
這次看下一下
newCachedThreadPool
方法的源碼,可以看到這個方法将會建立最大線程數為
Integer.MAX_VALUE
的的線程池。
由于這個線程池使用
SynchronousQueue
隊列,這個隊列比較特殊,沒辦法存儲任務。是以預設情況下,線程池隻要接到一個任務,就會建立一個線程。
一旦線程池收到大量任務,就會建立大量線程。Java 中的線程是會占用一定的記憶體空間 ,是以建立大量的線程是必然會導緻 OOM。
複用線程池
由于線程池的構造方法比較複雜,而
Executors
建立的線程池比較坑,是以我們有個項目中自己封裝了一個線程池工具類。
工具類代碼如下:
public static ThreadPoolExecutor getThreadPool() {
// 為了快速複現問題,故将線程池 核心線程數與最大線程數設定為 100
return new ThreadPoolExecutor(100, 100, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(200));
}
項目代碼中這樣使用這個工具類:
@RequestMapping("/batchDownload")
public String batchDownload() {
ExecutorService threadPool = ThreadPoolUtils.getThreadPool();
// 模拟下載下傳 100w 個檔案
for (int i = 0; i < 100; i++) {
threadPool.execute(() -> {
// 随機休眠,模拟下載下傳耗時
Random random = new Random();
try {
TimeUnit.SECONDS.sleep(random.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
return "process";
}
使用 WRK 工具對這個接口同時發起多個請求,很快應用就會抛出 OOM。
每次請求都會建立一個新的線程池執行任務,如果短時間内有大量的請求,就會建立很多的線程池,間接導緻建立很多線程。進而導緻記憶體占盡,發生 OOM 問題。
這個問題修複辦法很簡單,要麼工具類生成一個單例線程池,要麼項目代碼中複用建立出來的線程池。
Spring 異步任務
上面代碼中我們都是自己建立一個線程池執行異步任務,這樣還是比較麻煩。在 Spring 中, 我們可以在方法上使用 Spring 注解 @Async,然後執行異步任務。
代碼如下:
@Async
public void async() throws InterruptedException {
log.info("async process");
Random random = new Random();
TimeUnit.SECONDS.sleep(random.nextInt(100));
}
不過使用 Spring 異步任務,我們需要自定義線程池,不然大量請求下,還是有可能發生 OOM 問題。
這是原因主要是 Spring 異步任務預設使用 Spring 内部線程池
SimpleAsyncTaskExecutor
這個線程池比較坑爹,不會複用線程。也就是說來一個請求,将會建立一個線程。
是以如果需要使用異步任務,一定要使用自定義線程池替換預設線程池。
如果使用 XML 配置,我們可以增加如下配置:
<task:executor id="myexecutor" pool-size="5" />
<task:annotation-driven executor="myexecutor"/>
如果使用注解配置,我們需要設定一個 Bean:
@Bean(name = "threadPoolTaskExecutor")
public Executor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor=new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setThreadNamePrefix("test-%d");
// 其他設定
return new ThreadPoolTaskExecutor();
}
然後使用注解時指定線程池名稱:
@Async("threadPoolTaskExecutor")
public void xx() {
// 業務邏輯
}
如果是 SpringBoot 項目,從本人測試情況來看,預設将會建立核心線程數為 8,最大線程數為
Integer.MAX_VALUE
,隊列數也為
Integer.MAX_VALUE
線程池。
ps:以下代碼基于 Spring-Boot 2.1.6-RELEASE,暫不确定 Spring-Boot 1.x 版本是否也是這種政策,熟悉的同學的,也可以留言指出一下。
雖然上面的線程池不用擔心建立過多線程的問題,不是還是有可能隊列任務過多,導緻 OOM 的問題。是以還是建議使用自定義線程池嗎,或者在配置檔案修改預設配置,例如:
spring.task.execution.pool.core-size=10
spring.task.execution.pool.max-size=20
spring.task.execution.pool.queue-capacity=200
Spring 相關踩坑案例: Spring 定時任務突然不執行
線程池方法使用不當
最後再來說下文章開頭的我踩到的這個坑,這個問題主要是因為了解錯這個方法。
錯誤代碼如下:
// 建立線程池
ExecutorService threadPool = ...
List<Callable<String>> tasks = new ArrayList<>();
// 批量建立任務
for (int i = 0; i < 100; i++) {
tasks.add(() -> {
Random random = new Random();
try {
TimeUnit.SECONDS.sleep(random.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "success";
});
}
// 執行所有任務
List<Future<String>> futures = threadPool.invokeAll(tasks);
// 擷取結果
for (Future<String> future : futures) {
try {
future.get();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
上面代碼中,使用
invokeAll
執行所有任務。由于這個方法傳回值為
List<Future<T>>
,我誤以為這個方法如
submit
一樣,異步執行,不會阻塞主線程。
實際上從源碼上,這個方法實際上逐個調用
Future#get
擷取任務結果,而這個方法會同步阻塞主線程。
一旦某個任務被永久阻塞,比如 Socket 網絡連接配接位置逾時時間,導緻任務一直阻塞在網絡連接配接,間接導緻這個方法一直被阻塞,進而影響後續方法執行。
如果需要使用
invokeAll
方法,最好使用其另外一個重載方法,設定逾時時間。
總結
今天文章通過幾個例子,給大家展示了一下線程池使用過程一些坑。為了快速複現問題,上面的示例代碼還是比較極端,實際中可能并不會這麼用。
不過即使這樣,我們千萬不要抱着僥幸的心理,認為這些任務很快就會執行結束。我們在生産上碰到好幾次事故,正常的情況執行都很快。但是偶爾外部程式抽瘋,傳回時間變長,就可能導緻系統中存在大量任務,導緻 OOM。
最後總結一下幾個線程池幾個最佳實踐:
第一,生産系統慎用
Executors
類提供的便捷方法,我們需要自己根據自己的業務場景,配置合理的線程數,任務隊列,拒絕政策,線程回收政策等等,并且一定記得自定義線程池的命名方式,以便于後期排查問題。
第二,線程池不要重複建立,每次都建立一個線程池可能比不用線程池還要糟糕。如果使用其他同學建立的線程池工具類,最好還是看一下實作方式,防止自己誤用。
第三,一定不要按照自己的片面了解去使用 API 方法,如果把握不準,一定要去看下方法上注釋以及相關源碼。
歡迎關注我的公衆号:程式通事,獲得日常幹貨推送。如果您對我的專題内容感興趣,也可以關注我的部落格:studyidea.cn