優勢
- 建構一個新的線程是有一定代價的, 因為涉及與作業系統的互動。如果程式中建立了大量的生命期很短的線程,應該使用線程池
- 減少并發線程的數目
Executors
執行器( Executors) 類有許多靜态工廠方法用來建構線程池
方法 | 描述 |
newCachedThreadPool | 必要時建立新線程;空閑線程會被保留 60 秒 |
newFixedThreadPool | 該池包含固定數量的線程;空閑線程會一直被保留 |
newSingleThreadExecutor | 隻有一個線程的 “ 池”, 該線程順序執行每一個送出的任務(類似于 Swing 事件配置設定線程) |
newScheduledThreadPool | 用于預定執行而建構的固定線程池, 替代 java.util.Timer |
newSingleThreadScheduledExecutor | 用于預定執行而建構的單線程 “ 池” |
newWorkStealingPool (1.8新增) | 建立一個線程池,該線程池維護足夠的線程以支援給定的并行級别,并且可以使用多個隊列來減少争用。并行級别對應于主動參與或可用于進行任務處理的最大線程數。線程的實際數量可以動态地增長和收縮。工作竊取池不能保證執行送出的任務的順序。 |
重點介紹下列三種
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- newCachedThreadPool 方法建構了一個線程池,初始線程數0,對于每個任務,如果有空閑線程可用,立即讓它執行任務,如果沒有可用的空閑線程,則建立一個新線程。60秒後回收不使用的線程。
- newFixedThreadPool 方法建構一個具有固定大小的線程池。 如果送出的任務數多于空閑的線程數,那麼把得不到服務的任務放置到隊列中。當其他任務完成以後再運作它們。線程使用完立即回收。
- newSingleThreadExecutor 是一個退化了大小為 1 的線程池: 由一個線程執行送出的任務,一個接着一個。
這3個方法傳回實作了ExecutorService 接口的ThreadPoolExecutor 類的對象。再将一個 Runnable 對象或 Callable 對象送出給 ExecutorService得到Future
ExecutorService service = Executors.newFixedThreadPool(10);
Callable<Integer> callable = new Callable<Integer>(){
@Override
public Integer call() throws Exception {
return 1;
}
};
Future<Integer> future = service.submit(callable);
int i = 0;
try {
i = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println(i);
ExecutorService
- Future<?> submit(Runnable task) 使用這樣一個對象來調用 isDone、 cancel 或 isCancelled。但是, get 方法在完成的時候隻是簡單地傳回 null。
- Future<T> submit(Runnable task, T result) Future 的 get 方法在完成的時候傳回指定的 result 對象。
- Future<T> submit(Callable<T> task) 傳回的 Future 對象将在計算結果準備好的時候得到它。
- void shutdown() 關閉服務, 會先完成已經送出的任務而不再接收新的任務。
ThreadPoolExecutor
使用ThreadPoolExecutor對象構造線程池。
我們觀察Executors的newCachedThreadPool源碼,内部執行個體化了一個ThreadPoolExecutor對象。
觀察ThreadPoolExecutor構造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
從結構關系上分析ThreadPoolExecutor繼承了AbstractExecutorService抽象類,并實作了ExecutorService接口
從構造參數上分析
- corePoolSize: corePoolSize線程池中要保持的數量,即使它們是空閑的,除非設定了allowCoreThreadTimeOut屬性為true
- maximumPoolSize:最大允許在池中允許的最大線程數。
- keepAliveTime:當線程的數量大于核心時,這是空閑線程在終止之前等待新任務的最大時間。(超過核心線程數的線程如果長時間獲得不到新任務就會終止掉)
- unit : keepAliveTime的機關 如TimeUnit.SECONDS 代表秒
- workQueue:工作隊列(實作BlockingQueue接口的阻塞隊列)用于在執行任務之前儲存任務的隊列。此隊列隻保留execute方法送出的Runnable任務。常使用的阻塞隊列LinkedBlockingQueue、SynchronousQueue、ArrayBlockingQueue。使用SynchronousQueue隊清單示,沒有緩存隊列在存儲多餘的線程。
線程的排隊政策與阻塞隊列相關:
LinkedBlockingQueue:當任務數大于核心線程數時,新任務添加到隊列中,當隊列容量達到最大時,将建立新線程,直到達到最大線程數maximumPoolSize。(基于連結清單實作的一個阻塞隊列,預設容器大小Integer.MAX_VALUE。)
SynchronousQueue: 當任務數大于核心線程數時,直接建立新的線程直到達到最大線程數maximumPoolSize。(一個沒有内部容量的阻塞隊列,隊列中每個元素的插入操作必須等待另一個線程執行相應的移除操作。可以了解為沒有緩存)
ArrayBlockingQueue : 同LinkedBlockingQueue。(基于數組實作的一個阻塞隊列,需要指定容器大小。)
- threadFactory:建立新線程時要使用的工廠。
- handler : 處理程式在執行被阻塞時使用的處理程式,因為達到了線程邊界和隊列容量。
handler有以下四種政策:
ThreadPoolExecutor.AbortPolicy():丢棄任務并抛出RejectedExecutionException異常。 (ThreadPoolExecutor預設)
ThreadPoolExecutor.CallerRunsPolicy():由調用線程處理該任務 。
ThreadPoolExecutor.DiscardPolicy():也是丢棄任務,但是不抛出異常。
ThreadPoolExecutor.DiscardOldestPolicy():丢棄隊列最前面的任務,然後重新嘗試執行任務(重複此過程)
小結
使用ThreadPoolExecutor構造方法去生成線程池更加靈活,我們可以靈活指定核心線程數,最大線程數,工作隊列及空閑線程儲存時間。
構造一個核心線程數為5,最大線程數為10,緩存任務數為5,線程逾時為2秒的線程池。
ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 10, 2000, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(5));