1.ExecutorService接口繼承于Executor接口(線程池最頂層父類),抽象類AbstractExecutorService實作了ExecutorService接口,最常用的線程池類ThreadPoolExecutor就是繼承于AbstractExecutorService抽象類。
2.接口ExecutorService 中有很多工具方法,ExecutorService接口中的方法有些與 Future 和Callable有關,是以 Future與Callable 的使用是學習 ExecutorService 接口中方法的基礎。
接口ExecutorService的完整實作與繼承結構圖

ExecutorService接口的全部API
ExecutorService接口的invokeAny()和 invokeAll()
1.方法 invokeAny()和 invokeAll()具有阻塞特性。
2.方法 invokeAny()是取得第一個完成任務的結果值,當第一個任務執行完後,會調
interrupt()方法将其他任務中斷,是以在這些任務中可以結合
if (Thread.currentThread(). islnterrupted() ==true)代碼來決定其餘任務是否繼續運作。
3.方法 invokeAll () 等全部線程任務執行完畢後,取得全部完成任務的結果值。
invokeAny()方法
invokeAny()執行後有2種情況:
(1) 無 if (Thread.currentThread().islnterrupted ())代碼:已經獲得第1個運作的結果值後,其他線程繼續運作。
(2)有 if (Thread.currentThread().islnterrupted())代碼:已經獲得第1個運作的結果值後,其他線程如果使用 throw new InterruptedException()代碼則這些線程中斷,雖然 throw 抛出了異常,但在 main 線程中并不能捕獲異常。如果想捕獲異常,則需要在 Callable中使用 try-catch 顯式進行捕獲。
源碼
//傳回值不是List<Future<T>> 直接傳回了Callable的傳回值,不需要通過Future擷取
//參數是Collection,也就是集合,泛型是extends Callable的類
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
invokeAny()執行的任務中,存在部分任務抛異常。
invokeAny()執行的任務中,如果存在部分任務抛異常,會放棄該任務,從其他任務中選取最先傳回的值。
示例
public class ExecutorServiceDemo {
//無異常任務
public static class MyCallableA implements Callable {
String name;
long sleepTime;
public MyCallableA(String name,long sleepTime) {
super();
this.name = name;
this.sleepTime = sleepTime ;
}
@Override
public String call() throws Exception {
Thread.sleep(sleepTime);
return name + "執行完畢!";
}
}
//異常任務
public static class MyCallableB implements Callable{
String name;
long sleepTime;
public MyCallableB(String name,long sleepTime) {
super();
this.name = name;
this.sleepTime = sleepTime ;
}
@Override
public String call() throws Exception {
Thread.sleep(sleepTime);
Integer.parseInt("a");
return name + "執行完畢!";
}
}
public static void main(String[] args) {
try {
ExecutorService es = Executors.newCachedThreadPool();
List list = new ArrayList<Callable>();
//正常任務,不抛異常
list.add(new MyCallableA("任務1",2000));
//抛異常任務,該任務先執行完
list.add(new MyCallableB("任務2",1000));
//MyCallableB任務抛異常,隻能擷取MyCallableA的值
Object result = es.invokeAny(list);
System.out.println(result);
System.out.println("結束!");
} catch (Exception e) {
e.printStackTrace();
}
}
}
任務1執行完畢!
結束!
由實驗可知,MyCallableB抛異常之後,invokeAny把關注點放在了其他任務上,并不影響,main主線程也不受影響,正常執行,invokeAny有堵塞效果,在等待任務傳回值傳回時,後面代碼無法執行。
invokeAny()執行的任務中,存在全部任務抛異常。
當一個任務抛異常後,invokeAny會從其他任務中擷取最先傳回的值,如果再有任務抛異常,會再次從現在剩餘的任務中擷取最先傳回的值,直到所有的任務都抛異常,沒有其他任務,此時invokeAny會将任務的異常在主線程抛出。
public class ExecutorServiceDemo {
//異常任務
public static class MyCallableA implements Callable {
String name;
long sleepTime;
public MyCallableA(String name,long sleepTime) {
super();
this.name = name;
this.sleepTime = sleepTime ;
}
@Override
public String call() throws Exception {
Thread.sleep(sleepTime);
Integer.parseInt(name);
return name + "執行完畢!";
}
}
//異常任務
public static class MyCallableB implements Callable{
String name;
long sleepTime;
public MyCallableB(String name,long sleepTime) {
super();
this.name = name;
this.sleepTime = sleepTime ;
}
@Override
public String call() throws Exception {
Thread.sleep(sleepTime);
Integer.parseInt(name);
return name + "執行完畢!";
}
}
public static void main(String[] args) {
try {
ExecutorService es = Executors.newCachedThreadPool();
List list = new ArrayList<Callable>();
//抛異常任務,該任務後執行完
list.add(new MyCallableA("任務1",2000));
//抛異常任務,該任務先執行完
list.add(new MyCallableB("任務2",1000));
//兩個任務都抛異常,擷取最後抛異常的任務的錯誤資訊
Object result = es.invokeAny(list);
System.out.println(result);
System.out.println("結束!");
} catch (Exception e) {
e.printStackTrace();
}
}
}
com.minxl.demo.multiThread.ExecutorServiceDemo
java.util.concurrent.ExecutionException: java.lang.NumberFormatException: For input string: "任務1"
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at java.util.concurrent.AbstractExecutorService.doInvokeAny(AbstractExecutorService.java:193)
at java.util.concurrent.AbstractExecutorService.invokeAny(AbstractExecutorService.java:215)
at com.minxl.demo.multiThread.ExecutorServiceDemo.main(ExecutorServiceDemo.java:63)
Caused by: java.lang.NumberFormatException: For input string: "任務1"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at com.minxl.demo.multiThread.ExecutorServiceDemo$MyCallableA.call(ExecutorServiceDemo.java:29)
at com.minxl.demo.multiThread.ExecutorServiceDemo$MyCallableA.call(ExecutorServiceDemo.java:15)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
由實驗可知,MyCallableB抛異常,invokeAny隻能擷取MyCallableA的值,後來MyCallableA也抛異常,最終将MyCallableA的異常在主線程抛出,中斷主線程的代碼執行。
方法 invokeAny(CollectionTasks,timeout,timeUnit)逾時的測試
方法<T> T invokeAny(Collection< ? extends Callable> tasks, long timeout, TimeUnit unit) 主要作用就是在指定時間内取得第1個先執行完的任務的結果值,取到之後中斷其他任務線程(需要配合if(Thread.currentThread().isInterrupted() == true)代碼)。如果沒有取到傳回值,逾時了,主線程會報異常TimeoutException,然後中斷所有任務線程(需要配合if(Thread.currentThread().isInterrupted() == true)代碼。
示例
public class ExecutorServiceDemo {
//任務
public static class MyCallableA implements Callable {
String name;
long sleepTime;
public MyCallableA(String name,long sleepTime) {
super();
this.name = name;
this.sleepTime = sleepTime ;
}
@Override
public String call() throws Exception {
while(1==1){
if(Thread.currentThread().isInterrupted() == true){
System.out.println(name+"被中斷!");
break;
}
}
return name + "執行完畢!";
}
}
//任務
public static class MyCallableB implements Callable{
String name;
long sleepTime;
public MyCallableB(String name,long sleepTime) {
super();
this.name = name;
this.sleepTime = sleepTime ;
}
@Override
public String call() throws Exception {
while(1==1){
if(Thread.currentThread().isInterrupted() == true){
System.out.println(name+"被中斷!");
break;
}
}
return name + "執行完畢!";
}
}
public static void main(String[] args) {
try {
ExecutorService es = Executors.newCachedThreadPool();
List list = new ArrayList<Callable>();
//不會傳回值
list.add(new MyCallableA("任務1",2000));
//不會傳回值
list.add(new MyCallableB("任務2",1000));
//兩個任務都不會傳回值,main抛異常,invokeAny中斷所有任務
Object result = es.invokeAny(list,2, TimeUnit.SECONDS);
System.out.println(result);
System.out.println("結束!");
} catch (Exception e) {
e.printStackTrace();
}
}
}
任務2被中斷!
任務1被中斷!
java.util.concurrent.TimeoutException
at java.util.concurrent.AbstractExecutorService.doInvokeAny(AbstractExecutorService.java:184)
at java.util.concurrent.AbstractExecutorService.invokeAny(AbstractExecutorService.java:225)
at com.minxl.demo.multiThread.ExecutorServiceDemo.main(ExecutorServiceDemo.java:72)
invokeAll()方法
1.方法 invokeAll()傳回所有任務的執行結果,并且此方法的效果是阻塞執行的
,要把所有的結果都取回再繼續向下運作。
2.invokeAll方法傳回值放在List<Future<T*>>中,list存放任務的順序與放入任務的順序一緻,等所有任務都結束才會傳回List<Future<T*>>
3.子任務(Callable任務)抛異常時,隻有調用Future.get()方法,才能抛出,進而被主線程捕獲。
源碼
//傳回值時list,裡面存放Future對象,傳回值還得從Future裡get()方法擷取。
//參數還是Collection集合,且是Callable實作類的集合
List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1) throws InterruptedException;
List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException;
方法 invokeAll(Collection tasks)中存在任務線程異常
public class ExecutorServiceDemo {
//無異常任務
public static class MyCallableA implements Callable {
String name;
long sleepTime;
public MyCallableA(String name,long sleepTime) {
super();
this.name = name;
this.sleepTime = sleepTime ;
}
@Override
public String call() throws Exception {
Thread.sleep(sleepTime);
System.out.println(name+"執行完,準備傳回值");
return name + "執行完畢!";
}
}
//異常任務
public static class MyCallableB implements Callable{
String name;
long sleepTime;
public MyCallableB(String name,long sleepTime) {
super();
this.name = name;
this.sleepTime = sleepTime ;
}
@Override
public String call() throws Exception {
Thread.sleep(sleepTime);
Integer.parseInt(name);
System.out.println(name+"執行完,準備傳回值");
return name + "執行完畢!";
}
}
public static void main(String[] args) {
try {
ExecutorService es = Executors.newCachedThreadPool();
List list = new ArrayList<Callable>();
//後執行完,無異常
list.add(new MyCallableA("任務1",2000));
//先執行完,抛異常
list.add(new MyCallableB("任務2",1000));
//執行任務
List<Future> result = es.invokeAll(list);
for(Future future : result){
System.out.println(future.get());
}
System.out.println("結束!");
} catch (Exception e) {
e.printStackTrace();
}
}
}
任務1執行完,準備傳回值
任務1執行完畢!
java.util.concurrent.ExecutionException: java.lang.NumberFormatException: For input string: "任務2"
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
at com.example.demo.ExecutorServiceDemo.main(ExecutorServiceDemo.java:58)
Caused by: java.lang.NumberFormatException: For input string: "任務2"
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.base/java.lang.Integer.parseInt(Integer.java:652)
at java.base/java.lang.Integer.parseInt(Integer.java:770)
at com.example.demo.ExecutorServiceDemo$MyCallableB.call(ExecutorServiceDemo.java:41)
at com.example.demo.ExecutorServiceDemo$MyCallableB.call(ExecutorServiceDemo.java:28)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
任務2先執行完,等待任務1執行完,按順序放入結果List<Future*>中,周遊擷取值時先正确擷取任務1傳回值,再擷取任務2傳回值抛異常,中斷main線程。
方法 invokeAll(Collection tasks,long timeout,TimeUnit unit)
方法 invokeAll(Collection tasks,long timeout,TimeUnit unit)的作用是如果任務在指定的時間内沒有完成,則出現異常:CancellationException。
1.如果前面的任務沒有執行完,在Future.get()時會抛異常CancellationException,代碼不在往下執行。
2.如果後面的任務沒有執行完,在Future.get()時前面規定時間内,傳回值的會正常擷取,後面沒執行完的會抛異常CancellationException,代碼不在往下執行。
示例:先快後慢
public class ExecutorServiceDemo {
//任務
public static class MyCallableA implements Callable {
String name;
long sleepTime;
public MyCallableA(String name,long sleepTime) {
super();
this.name = name;
this.sleepTime = sleepTime ;
}
@Override
public String call() throws Exception {
Thread.sleep(sleepTime);
System.out.println(name+"執行完,準備傳回值");
return name + "執行完畢!";
}
}
//任務
public static class MyCallableB implements Callable{
String name;
long sleepTime;
public MyCallableB(String name,long sleepTime) {
super();
this.name = name;
this.sleepTime = sleepTime ;
}
@Override
public String call() throws Exception {
Thread.sleep(sleepTime);
// Integer.parseInt(name);
System.out.println(name+"執行完,準備傳回值");
return name + "執行完畢!";
}
}
public static void main(String[] args) {
try {
ExecutorService es = Executors.newCachedThreadPool();
List list = new ArrayList<Callable>();
//先執行完
list.add(new MyCallableA("任務1",900));
//規定時間沒執行完,被中斷
list.add(new MyCallableB("任務2",1500));
//執行任務
List<Future> result = es.invokeAll(list,1,TimeUnit.SECONDS);
for(Future future : result){
System.out.println(future.get());
}
System.out.println("結束!");
} catch (Exception e) {
e.printStackTrace();
}
}
}
任務1執行完,準備傳回值
任務1執行完畢!
java.util.concurrent.CancellationException
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:121)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
at com.example.demo.ExecutorServiceDemo.main(ExecutorServiceDemo.java:58)
上面例子中,任務1在規定時間内執行完,傳回值,任務2沒執行完被中斷,主線程調用時抛異常CancellationException。
示例:先慢後快
public class ExecutorServiceDemo {
//任務
public static class MyCallableA implements Callable {
String name;
long sleepTime;
public MyCallableA(String name,long sleepTime) {
super();
this.name = name;
this.sleepTime = sleepTime ;
}
@Override
public String call() throws Exception {
Thread.sleep(sleepTime);
System.out.println(name+"執行完,準備傳回值");
return name + "執行完畢!";
}
}
//任務
public static class MyCallableB implements Callable{
String name;
long sleepTime;
public MyCallableB(String name,long sleepTime) {
super();
this.name = name;
this.sleepTime = sleepTime ;
}
@Override
public String call() throws Exception {
Thread.sleep(sleepTime);
// Integer.parseInt(name);
System.out.println(name+"執行完,準備傳回值");
return name + "執行完畢!";
}
}
public static void main(String[] args) {
try {
ExecutorService es = Executors.newCachedThreadPool();
List list = new ArrayList<Callable>();
//後執行完,被中斷
list.add(new MyCallableA("任務1",1500));
//先執行完
list.add(new MyCallableB("任務2",900));
//執行任務
List<Future> result = es.invokeAll(list,1,TimeUnit.SECONDS);
for(Future future : result){
System.out.println(future.get());
}
System.out.println("結束!");
} catch (Exception e) {
e.printStackTrace();
}
}
}
任務2執行完,準備傳回值
java.util.concurrent.CancellationException
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:121)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
at com.example.demo.ExecutorServiceDemo.main(ExecutorServiceDemo.java:58)
任務2先執行完,任務1規定時間沒執行完,但是Future.get()先擷取任務1的值,則直接抛異常CancellationException,後面代碼無法執行,任務2的值沒傳回。
總結
接口 ExecutorService 中的方法都以便攜的方式去建立線程池,使用兩個主要的方法invokeAny()和 invokeAll()來取得第一個首先執行完任務的結果值,以及全部任務的結果值。