天天看點

接口ExecutorService的方法使用

1.ExecutorService接口繼承于Executor接口(線程池最頂層父類),抽象類AbstractExecutorService實作了ExecutorService接口,最常用的線程池類ThreadPoolExecutor就是繼承于AbstractExecutorService抽象類。

2.接口ExecutorService 中有很多工具方法,ExecutorService接口中的方法有些與 Future 和Callable有關,是以 Future與Callable 的使用是學習 ExecutorService 接口中方法的基礎。

接口ExecutorService的完整實作與繼承結構圖

接口ExecutorService的方法使用

ExecutorService接口的全部API

接口ExecutorService的方法使用

ExecutorService接口的invokeAny()和 invokeAll()

1.方法 invokeAny()和 invokeAll()具有阻塞特性。

2.方法 invokeAny()是取得第一個完成任務的結果值,當第一個任務執行完後,會調

interrupt()方法将其他任務中斷,是以在這些任務中可以結合

if (Thread.currentThread(). islnterrupted() ==true)代碼來決定其餘任務是否繼續運作。

3.方法 invokeAll () 等全部線程任務執行完畢後,取得全部完成任務的結果值。

invokeAny()方法

invokeAny()執行後有2種情況:

(1) 無 if (Thread.currentThread().islnterrupted ())代碼:已經獲得第1個運作的結果值後,其他線程繼續運作。

(2)有 if (Thread.currentThread().islnterrupted())代碼:已經獲得第1個運作的結果值後,其他線程如果使用 throw new InterruptedException()代碼則這些線程中斷,雖然 throw 抛出了異常,但在 main 線程中并不能捕獲異常。如果想捕獲異常,則需要在 Callable中使用 try-catch 顯式進行捕獲。

源碼

//傳回值不是List<Future<T>> 直接傳回了Callable的傳回值,不需要通過Future擷取
//參數是Collection,也就是集合,泛型是extends Callable的類
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
           

invokeAny()執行的任務中,存在部分任務抛異常。

invokeAny()執行的任務中,如果存在部分任務抛異常,會放棄該任務,從其他任務中選取最先傳回的值。

示例

public class ExecutorServiceDemo {
    //無異常任務
    public static class MyCallableA implements Callable {

        String name;
        long sleepTime;

        public MyCallableA(String name,long sleepTime) {
            super();
            this.name = name;
            this.sleepTime = sleepTime ;
        }

        @Override
        public String call() throws Exception {
            Thread.sleep(sleepTime);
            return name + "執行完畢!";
        }
    }

    //異常任務
    public static class MyCallableB implements Callable{

        String name;
        long sleepTime;

        public MyCallableB(String name,long sleepTime) {
            super();
            this.name = name;
            this.sleepTime = sleepTime ;
        }

        @Override
        public String call() throws Exception {
            Thread.sleep(sleepTime);
            Integer.parseInt("a");
            return name + "執行完畢!";
        }
    }

    public static void main(String[] args) {
        try {
            ExecutorService es = Executors.newCachedThreadPool();
            List list = new ArrayList<Callable>();
            //正常任務,不抛異常
            list.add(new MyCallableA("任務1",2000));
            //抛異常任務,該任務先執行完
            list.add(new MyCallableB("任務2",1000));
            //MyCallableB任務抛異常,隻能擷取MyCallableA的值
            Object result = es.invokeAny(list);
            System.out.println(result);
            System.out.println("結束!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

任務1執行完畢!
結束!
           

由實驗可知,MyCallableB抛異常之後,invokeAny把關注點放在了其他任務上,并不影響,main主線程也不受影響,正常執行,invokeAny有堵塞效果,在等待任務傳回值傳回時,後面代碼無法執行。

invokeAny()執行的任務中,存在全部任務抛異常。

當一個任務抛異常後,invokeAny會從其他任務中擷取最先傳回的值,如果再有任務抛異常,會再次從現在剩餘的任務中擷取最先傳回的值,直到所有的任務都抛異常,沒有其他任務,此時invokeAny會将任務的異常在主線程抛出。

public class ExecutorServiceDemo {
    //異常任務
    public static class MyCallableA implements Callable {

        String name;
        long sleepTime;

        public MyCallableA(String name,long sleepTime) {
            super();
            this.name = name;
            this.sleepTime = sleepTime ;
        }

        @Override
        public String call() throws Exception {
            Thread.sleep(sleepTime);
            Integer.parseInt(name);
            return name + "執行完畢!";
        }
    }

    //異常任務
    public static class MyCallableB implements Callable{

        String name;
        long sleepTime;

        public MyCallableB(String name,long sleepTime) {
            super();
            this.name = name;
            this.sleepTime = sleepTime ;
        }

        @Override
        public String call() throws Exception {
            Thread.sleep(sleepTime);
            Integer.parseInt(name);
            return name + "執行完畢!";
        }
    }

    public static void main(String[] args) {
        try {
            ExecutorService es = Executors.newCachedThreadPool();
            List list = new ArrayList<Callable>();
            //抛異常任務,該任務後執行完
            list.add(new MyCallableA("任務1",2000));
            //抛異常任務,該任務先執行完
            list.add(new MyCallableB("任務2",1000));
            //兩個任務都抛異常,擷取最後抛異常的任務的錯誤資訊
            Object result = es.invokeAny(list);
            System.out.println(result);
            System.out.println("結束!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

com.minxl.demo.multiThread.ExecutorServiceDemo
java.util.concurrent.ExecutionException: java.lang.NumberFormatException: For input string: "任務1"
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at java.util.concurrent.AbstractExecutorService.doInvokeAny(AbstractExecutorService.java:193)
	at java.util.concurrent.AbstractExecutorService.invokeAny(AbstractExecutorService.java:215)
	at com.minxl.demo.multiThread.ExecutorServiceDemo.main(ExecutorServiceDemo.java:63)
Caused by: java.lang.NumberFormatException: For input string: "任務1"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.parseInt(Integer.java:615)
	at com.minxl.demo.multiThread.ExecutorServiceDemo$MyCallableA.call(ExecutorServiceDemo.java:29)
	at com.minxl.demo.multiThread.ExecutorServiceDemo$MyCallableA.call(ExecutorServiceDemo.java:15)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
           

由實驗可知,MyCallableB抛異常,invokeAny隻能擷取MyCallableA的值,後來MyCallableA也抛異常,最終将MyCallableA的異常在主線程抛出,中斷主線程的代碼執行。

方法 invokeAny(CollectionTasks,timeout,timeUnit)逾時的測試

方法<T> T invokeAny(Collection< ? extends Callable> tasks, long timeout, TimeUnit unit) 主要作用就是在指定時間内取得第1個先執行完的任務的結果值,取到之後中斷其他任務線程(需要配合if(Thread.currentThread().isInterrupted() == true)代碼)。如果沒有取到傳回值,逾時了,主線程會報異常TimeoutException,然後中斷所有任務線程(需要配合if(Thread.currentThread().isInterrupted() == true)代碼。

示例

public class ExecutorServiceDemo {
    //任務
    public static class MyCallableA implements Callable {

        String name;
        long sleepTime;

        public MyCallableA(String name,long sleepTime) {
            super();
            this.name = name;
            this.sleepTime = sleepTime ;
        }

        @Override
        public String call() throws Exception {
            while(1==1){
                if(Thread.currentThread().isInterrupted() == true){
                    System.out.println(name+"被中斷!");
                    break;
                }
            }
            return name + "執行完畢!";
        }
    }

    //任務
    public static class MyCallableB implements Callable{

        String name;
        long sleepTime;

        public MyCallableB(String name,long sleepTime) {
            super();
            this.name = name;
            this.sleepTime = sleepTime ;
        }

        @Override
        public String call() throws Exception {
            while(1==1){
                if(Thread.currentThread().isInterrupted() == true){
                    System.out.println(name+"被中斷!");
                    break;
                }
            }
            return name + "執行完畢!";
        }
    }

    public static void main(String[] args) {
        try {
            ExecutorService es = Executors.newCachedThreadPool();
            List list = new ArrayList<Callable>();
            //不會傳回值
            list.add(new MyCallableA("任務1",2000));
            //不會傳回值
            list.add(new MyCallableB("任務2",1000));
            //兩個任務都不會傳回值,main抛異常,invokeAny中斷所有任務
            Object result = es.invokeAny(list,2, TimeUnit.SECONDS);
            System.out.println(result);
            System.out.println("結束!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

任務2被中斷!
任務1被中斷!
java.util.concurrent.TimeoutException
	at java.util.concurrent.AbstractExecutorService.doInvokeAny(AbstractExecutorService.java:184)
	at java.util.concurrent.AbstractExecutorService.invokeAny(AbstractExecutorService.java:225)
	at com.minxl.demo.multiThread.ExecutorServiceDemo.main(ExecutorServiceDemo.java:72)
           

invokeAll()方法

1.方法 invokeAll()傳回所有任務的執行結果,并且此方法的效果是阻塞執行的

,要把所有的結果都取回再繼續向下運作。

2.invokeAll方法傳回值放在List<Future<T*>>中,list存放任務的順序與放入任務的順序一緻,等所有任務都結束才會傳回List<Future<T*>>

3.子任務(Callable任務)抛異常時,隻有調用Future.get()方法,才能抛出,進而被主線程捕獲。

源碼

//傳回值時list,裡面存放Future對象,傳回值還得從Future裡get()方法擷取。
//參數還是Collection集合,且是Callable實作類的集合
List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1) throws InterruptedException;

List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException;

           

方法 invokeAll(Collection tasks)中存在任務線程異常

public class ExecutorServiceDemo {
    //無異常任務
    public static class MyCallableA implements Callable {
        String name;
        long sleepTime;

        public MyCallableA(String name,long sleepTime) {
            super();
            this.name = name;
            this.sleepTime = sleepTime ;
        }

        @Override
        public String call() throws Exception {
            Thread.sleep(sleepTime);
            System.out.println(name+"執行完,準備傳回值");
            return name + "執行完畢!";
        }
    }

    //異常任務
    public static class MyCallableB implements Callable{
        String name;
        long sleepTime;

        public MyCallableB(String name,long sleepTime) {
            super();
            this.name = name;
            this.sleepTime = sleepTime ;
        }

        @Override
        public String call() throws Exception {
            Thread.sleep(sleepTime);
            Integer.parseInt(name);
            System.out.println(name+"執行完,準備傳回值");
            return name + "執行完畢!";
        }
    }

    public static void main(String[] args) {
        try {
            ExecutorService es = Executors.newCachedThreadPool();
            List list = new ArrayList<Callable>();
            //後執行完,無異常
            list.add(new MyCallableA("任務1",2000));
            //先執行完,抛異常
            list.add(new MyCallableB("任務2",1000));
            //執行任務
            List<Future> result = es.invokeAll(list);
            for(Future future : result){
                System.out.println(future.get());
            }
            System.out.println("結束!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

任務1執行完,準備傳回值
任務1執行完畢!
java.util.concurrent.ExecutionException: java.lang.NumberFormatException: For input string: "任務2"
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at com.example.demo.ExecutorServiceDemo.main(ExecutorServiceDemo.java:58)
Caused by: java.lang.NumberFormatException: For input string: "任務2"
	at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.base/java.lang.Integer.parseInt(Integer.java:652)
	at java.base/java.lang.Integer.parseInt(Integer.java:770)
	at com.example.demo.ExecutorServiceDemo$MyCallableB.call(ExecutorServiceDemo.java:41)
	at com.example.demo.ExecutorServiceDemo$MyCallableB.call(ExecutorServiceDemo.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
           

任務2先執行完,等待任務1執行完,按順序放入結果List<Future*>中,周遊擷取值時先正确擷取任務1傳回值,再擷取任務2傳回值抛異常,中斷main線程。

方法 invokeAll(Collection tasks,long timeout,TimeUnit unit)

方法 invokeAll(Collection tasks,long timeout,TimeUnit unit)的作用是如果任務在指定的時間内沒有完成,則出現異常:CancellationException。

1.如果前面的任務沒有執行完,在Future.get()時會抛異常CancellationException,代碼不在往下執行。

2.如果後面的任務沒有執行完,在Future.get()時前面規定時間内,傳回值的會正常擷取,後面沒執行完的會抛異常CancellationException,代碼不在往下執行。

示例:先快後慢

public class ExecutorServiceDemo {
    //任務
    public static class MyCallableA implements Callable {
        String name;
        long sleepTime;

        public MyCallableA(String name,long sleepTime) {
            super();
            this.name = name;
            this.sleepTime = sleepTime ;
        }

        @Override
        public String call() throws Exception {
            Thread.sleep(sleepTime);
            System.out.println(name+"執行完,準備傳回值");
            return name + "執行完畢!";
        }
    }

    //任務
    public static class MyCallableB implements Callable{
        String name;
        long sleepTime;

        public MyCallableB(String name,long sleepTime) {
            super();
            this.name = name;
            this.sleepTime = sleepTime ;
        }

        @Override
        public String call() throws Exception {
            Thread.sleep(sleepTime);
           // Integer.parseInt(name);
            System.out.println(name+"執行完,準備傳回值");
            return name + "執行完畢!";
        }
    }

    public static void main(String[] args) {
        try {
            ExecutorService es = Executors.newCachedThreadPool();
            List list = new ArrayList<Callable>();
            //先執行完
            list.add(new MyCallableA("任務1",900));
            //規定時間沒執行完,被中斷
            list.add(new MyCallableB("任務2",1500));
            //執行任務
            List<Future> result = es.invokeAll(list,1,TimeUnit.SECONDS);
            for(Future future : result){
                System.out.println(future.get());
            }
            System.out.println("結束!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

任務1執行完,準備傳回值
任務1執行完畢!
java.util.concurrent.CancellationException
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:121)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at com.example.demo.ExecutorServiceDemo.main(ExecutorServiceDemo.java:58)
           

上面例子中,任務1在規定時間内執行完,傳回值,任務2沒執行完被中斷,主線程調用時抛異常CancellationException。

示例:先慢後快

public class ExecutorServiceDemo {
    //任務
    public static class MyCallableA implements Callable {
        String name;
        long sleepTime;

        public MyCallableA(String name,long sleepTime) {
            super();
            this.name = name;
            this.sleepTime = sleepTime ;
        }

        @Override
        public String call() throws Exception {
            Thread.sleep(sleepTime);
            System.out.println(name+"執行完,準備傳回值");
            return name + "執行完畢!";
        }
    }

    //任務
    public static class MyCallableB implements Callable{
        String name;
        long sleepTime;

        public MyCallableB(String name,long sleepTime) {
            super();
            this.name = name;
            this.sleepTime = sleepTime ;
        }

        @Override
        public String call() throws Exception {
            Thread.sleep(sleepTime);
           // Integer.parseInt(name);
            System.out.println(name+"執行完,準備傳回值");
            return name + "執行完畢!";
        }
    }

    public static void main(String[] args) {
        try {
            ExecutorService es = Executors.newCachedThreadPool();
            List list = new ArrayList<Callable>();
            //後執行完,被中斷
            list.add(new MyCallableA("任務1",1500));
            //先執行完
            list.add(new MyCallableB("任務2",900));
            //執行任務
            List<Future> result = es.invokeAll(list,1,TimeUnit.SECONDS);
            for(Future future : result){
                System.out.println(future.get());
            }
            System.out.println("結束!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
任務2執行完,準備傳回值
java.util.concurrent.CancellationException
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:121)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at com.example.demo.ExecutorServiceDemo.main(ExecutorServiceDemo.java:58)
           

任務2先執行完,任務1規定時間沒執行完,但是Future.get()先擷取任務1的值,則直接抛異常CancellationException,後面代碼無法執行,任務2的值沒傳回。

總結

接口 ExecutorService 中的方法都以便攜的方式去建立線程池,使用兩個主要的方法invokeAny()和 invokeAll()來取得第一個首先執行完任務的結果值,以及全部任務的結果值。