一、線程池的使用
1.1 線程池的建立
我們可以通過ThreadPoolExecutor來建立一個線程池。
new ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
建立一個線程池時需要輸入幾個參數,如下。
- corePoolSize(線程池的基本大小):當送出一個任務到線程池時,線程池會建立 一個線程來執行任務,即使其他空閑的基本線程能夠執行新任務也會建立線程,等到需要 執行的任務數大于線程池基本大小時就不再建立。如果調用了線程池的 prestartAllCoreThreads()方法,線程池會提前建立并啟動所有基本線程。
- runnableTaskQueue(任務隊列):用于儲存等待執行的任務的阻塞隊列。可以選 擇以下幾個阻塞隊列。
ArrayBlockingQueue:是一個基于數組結構的有界阻塞隊列,此隊列按FIFO(先進 先出)原則對元素進行排序。 LinkedBlockingQueue:一個基于連結清單結構的阻塞隊列,此隊列按FIFO排序元素, 吞吐量通常要高于ArrayBlockingQueue。靜态工廠方法Executors.newFixedThreadPool() 使用了 這個隊列。 SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線 程調用移除操作, 否則插入操作一直處于阻塞狀态,吞吐量通常要高于Linked- BlockingQueue,靜态工廠方法 Executors.newCachedThreadPool使用了這個隊列。 PriorityBlockingQueue:一個具有優先級的無限阻塞隊列。
- maximumPoolSize(線程池最大數量):線程池允許建立的最大線程數。如果隊列滿了,并且已建立的線程數小于最大線程數,則線程池會再建立新的線程執行任務。值得注意的是,如果使用了無界的任務隊列這個參數就沒什麼效果。
- ThreadFactory:用于設定建立線程的工廠,可以通過線程工廠給每個建立出來的 線程設定更有意義的名字。使用開源架構guava提供的ThreadFactoryBuilder可以快速給線 程池裡的線程設定有意義的名字,代碼如下。
new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
- RejectedExecutionHandler(飽和政策):當隊列和線程池都滿了,說明線程池處 于飽和狀态,那麼必須采取一種政策處理送出的新任務。這個政策預設情況下是 AbortPolicy,表示無法處理新任務時抛出異常。在JDK 1.5中Java線程池架構提供了以下4 種政策。
當然,也可以根據應用場景需要來實作RejectedExecutionHandler接口自定義政策。如 記錄日志或持久化存儲不能處理的任務。AbortPolicy:直接抛出異常。 CallerRunsPolicy:隻用調用者所線上程來運作任務。 DiscardOldestPolicy:丢棄隊列裡最近的一個任務,并執行目前任務。 DiscardPolicy:不處理,丢棄掉。
- keepAliveTime(線程活動保持時間):線程池的工作線程空閑後,保持存活的時間。是以,如果任務很多,并且每個任務執行的時間比較短,可以調大時間,提高線程的 使用率。
- TimeUnit(線程活動保持時間的機關):可選的機關有天(DAYS)、小時(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微秒 (MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微秒)。
1.2 建立一個核心線程數10,最大線程數20,線程池的工作線程空閑後存活的時間3000 ms,工作隊列是ArrayBlockingQueue,飽和政策是AbortPolicy的線程池。
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
10, 20, 3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(3),
new ThreadPoolExecutor.AbortPolicy());
1.3 向線程池送出任務
可以使用兩個方法向線程池送出任務,分别為
execute()
和
submit()
方法。
1.3.1 execute()方法
execute()方法用于送出不需要傳回值的任務,是以無法判斷任務是否被線程池執行成功。
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
10, 20, 3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(3),
new ThreadPoolExecutor.AbortPolicy());
threadPoolExecutor.execute(() -> System.out.println("線程執行了"));
threadPoolExecutor.shutdown();
1.3.2 submit()
submit()方法用于送出需要傳回值的任務。線程池會傳回一個future類型的對象,通過 這個future對象可以判斷任務是否執行成功,并且可以通過future的get()方法來擷取傳回值,get()方法會阻塞目前線程直到任務完成,而使用get(long timeout,TimeUnit unit)方 法則會阻塞目前線程一段時間後立即傳回,這時候有可能任務沒有執行完。

在這裡插入代碼片
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
10, 20, 3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(3),
new ThreadPoolExecutor.AbortPolicy());
Future<String> thread1Result = threadPoolExecutor.submit(new Thread1());
System.out.println(thread1Result.get());//輸出 hello ...
threadPoolExecutor.shutdown();
}
public static class Thread1 implements Callable<String> {
@Override
public String call(){
return "hello ...";
}
}
二、合理地配置線程池
要想合理地配置線程池,就必須首先分析任務特性,可以從以下幾個角度來分析。
- 任務的性質:CPU密集型任務、IO密集型任務和混合型任務。 ·
- 任務的優先級:高、中和低。 ·
- 任務的執行時間:長、中和短。
- 任務的依賴性:是否依賴其他系統資源,如資料庫連接配接。
性質不同的任務可以用不同規模的線程池分開處理。
- CPU密集型任務應配置盡可能小的線程,如配置cpu個數+1個線程的線程池。
- 由于IO密集型任務線程并不是一直在執行任務, 則應配置盡可能多的線程,如兩倍的cpu個數。
- 混合型的任務,如果可以拆分,将其拆分成一個 CPU密集型任務和一個IO密集型任務,隻要這兩個任務執行的時間相差不是太大,那麼分 解後執行的吞吐量将高于串行執行的吞吐量。如果這兩個任務執行時間相差太大,則沒必 要進行分解。可以通過
方法獲得目前裝置的CPU 個數。Runtime.getRuntime().availableProcessors()
優先級不同的任務可以使用優先級隊列
PriorityBlockingQueue
來處理。它可以讓優先級高的任務先執行。
注意 如果一直有優先級高的任務送出到隊列裡,那麼優先級低的任務可能永遠 不能執行。
執行時間不同的任務可以交給不同規模的線程池來處理,或者可以使用優先級隊列, 讓執行時間短的任務先執行。
依賴資料庫連接配接池的任務,因為線程送出SQL後需要等待資料庫傳回結果,等待的時間越長,則CPU空閑時間就越長,那麼線程數應該設定得越大,這樣才能更好地利用 CPU。
建議使用有界隊列。有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一 點兒,比如幾千。
三、監控線程池
如果在系統中大量使用線程池,則有必要對線程池進行監控,友善在出現問題時,可 以根據線程池的使用狀況快速定位問題。可以通過線程池提供的參數進行監控,在監控線 程池的時候可以使用以下屬性。
taskCount:線程池需要執行的任務數量。
completedTaskCount:線程池在運作過程中已完成的任務數量,小于或等于 taskCount。
largestPoolSize:線程池裡曾經建立過的最大線程數量。通過這個資料可以知道線程 池是否曾經滿過。
如該數值等于線程池的最大大小,則表示線程池曾經滿過。
getPoolSize:線程池的線程數量。如果線程池不銷毀的話,線程池裡的線程不會自 動銷毀,是以這個大小隻增不減。
getActiveCount:擷取活動的線程數。
通過擴充線程池進行監控。可以通過繼承線程池來自定義線程池,重寫線程池的 beforeExecute、afterExecute和terminated方法,也可以在任務執行前、執行後和線程池關 閉前執行一些代碼來進行監控。例如,監控任務的平均執行時間、最大執行時間和最小執行時間等。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author : pengweiwei
* @date : 2020/2/2 8:19 下午
*/
public class MyThreadPool extends ThreadPoolExecutor{
public MyThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
//可以列印一些線程池的資訊
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
//可以列印一些線程池的資訊
super.afterExecute(r, t);
}
@Override
protected void terminated() {
//可以列印一些線程池的資訊
super.terminated();
}
}