ExecutorService使用
- 寫在前面
- 一、線程基礎 Thread
- 一、基本方法介紹
- 1.1、shutdown()
- 1.2、shutdownNow()
- 1.3、awaitTermination(n, TimeUnit)
- 1.4、submit()
- 1.5、invokeAll
- 1.6、invokeAny
- 1.7、isTerminated()
- 二、操作使用
- 2.1、選擇合适的線程池
- 2.2、newFixedThreadPool
- 2.3、newWorkStealingPool
- 2.4、newSingleThreadExecutor
- 2.5、newCachedThreadPool
- 2.6、newScheduledThreadPool
- 2.7、ThreadPoolExecutor
- 三、使用
寫在前面
學習多線程,有幾個很關鍵的點,要提前預習
-
幾個關鍵字
volatile
synchronized
final
static
- Thread基本方法
多線程程式設計方式是由 ExecutorService 發起操作的,其實操作的對象,還是基于線程,關于線程/job/task的顆粒度,以及這裡ExecutorService中線程池的概念,都是要加強學習的
這裡先對 ExecutorService 的幾個方法,簡介一下
一、線程基礎 Thread
參照這裡Thread的基本知識介紹。
一、基本方法介紹
1.1、shutdown()
- 不能接受新的submit
- 并沒有任何的interrupt操作,會等待線程池中所有線程(執行中的以及排隊的)執行完畢
可以了解為是個辨別性質的方法,辨別這程式有意願在此刻終止線程池的後續操作。
1.2、shutdownNow()
- 會嘗試interrupt線程池中正在執行的線程
- 等待執行的線程也會被取消
- 但是并不能保證一定能成功的interrupt線程池中的線程。
- 會傳回并未終止的線程清單List
shutdownNow()方法比shutdown()強硬了很多,不僅取消了排隊的線程而且确實嘗試終止目前正在執行的線程。
1.3、awaitTermination(n, TimeUnit)
- 該方法傳回值為boolean類型
- 方法的兩個參數規定了方法的阻塞時間,在阻塞時間内除非所有線程都執行完畢才會提前傳回true
- 如果到了規定的時間,線程池中的線程并沒有全部結束傳回false
- InterruptedException 這個異常也會導緻方法的終止
利用這個阻塞方法的特性,我們可以優雅的關閉線程池中的任務。
Java中在使用Executors線程池時,有時場景需要主線程等各子線程都運作完畢後再執行。這時候就需要用到ExecutorService接口中的awaitTermination方法.
比如應用場景為線程池的有效執行時間為20S,20S之後不管子任務有沒有執行完畢,都要關閉線程池。代碼如下:
ExecutorService es = Executors.newFixedThreadPool(10);
es.execute(new Thread());//執行子線程任務
try {
es.shutdown();
if(!es.awaitTermination(20,TimeUnit.SECONDS)){//20S
System.out.println(" 到達指定時間,還有線程沒執行完,不再等待,關閉線程池!");
es.shutdownNow();
}
} catch (Throwable e) {
// TODO Auto-generated catch block
es.shutdownNow();
e.printStackTrace();
}
與shutdown()方法結合使用時,尤其要注意的是shutdown()方法必須要在awaitTermination()方法之前調用,該方法才會生效。否則會造成死鎖。
1.4、submit()
重載了三個submit方法,接收入參Runnable,Callable
1.5、invokeAll
任務的批量送出invokeAll(),兩個重載
1.6、invokeAny
方法invokeAny,invokeAll都具有阻塞性。
invokeAny取得第一個方法的傳回值,當第一個任務結束後,會調用interrupt方法中斷其它任務。
invokeAll等線程任務執行完畢後,取得全部任務的結果值。
1.7、isTerminated()
若關閉後所有任務都已完成,則傳回true。注意除非首先調用shutdown或shutdownNow,否則isTerminated永不為true。傳回:若關閉後所有任務都已完成,則傳回true。
二、操作使用
2.1、選擇合适的線程池
有如下常見的幾種線程池的建構,區分場景
newFixedThreadPool ----->
newWorkStealingPool -----> 使用場景:
newSingleThreadExecutor -----> 使用場景:
newCachedThreadPool -----> 使用場景:
newScheduledThreadPool ----->
new ThreadPoolExecutor(…) -----> 使用場景:自适應
2.2、newFixedThreadPool
定義:定長線程池,可控制線程最大并發數,超出的線程會在隊列中等待
使用場景:任務量比較固定但耗時長的任務
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2.3、newWorkStealingPool
定義:工作竊取線程池,建立一個擁有多個任務隊列(以便減少連接配接數)的線程池
使用場景:會建立一個含有足夠多線程的線程池,來維持相應的并行級别,它會通過工作竊取的方式,使得多核的 CPU 不會閑置,總會有活着的線程讓 CPU 去運作。
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
newWorkStealingPool雖是1.8才有的,但 ForkJoinPool 是1.7就有的對象。
2.4、newSingleThreadExecutor
定義:單線程池,隻會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。
使用場景:多個任務順序執行(FIFO,優先級)
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
裝飾者模式
2.5、newCachedThreadPool
- 定義:緩沖線程池,一個可根據需要建立新線程的線程池,如果現有線程沒有可用的,則建立一個新線程并添加到池中,如果有被使用完但是還沒銷毀的線程,就複用該線程。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。是以,長時間保持空閑的線程池不會使用任何資源。
- 使用場景:任務量大但耗時少的任務
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
2.6、newScheduledThreadPool
定義:排程線程池
使用場景:定時以及周期性執行任務
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
2.7、ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}