天天看點

Java并發計算判斷線程池中的線程是否全部執行完畢

目錄

  • ​​1 java多線程的使用​​
  • ​​2 shutdown和shutdownNow差別源碼解析​​
  • ​​2.1 shutdown案例​​
  • ​​2.2 shutdownNow案例​​
  • ​​2.3 源碼解讀​​
  • ​​2.4 shutdown和shutdownNow總結​​
  • ​​3 判斷線程池中的線程是否全部執行完畢​​
  • ​​3.1 方式一:調用ExecutorService 中的isTerminated方法​​
  • ​​3.2 方式二:調用ExecutorService 中的awaitTermination()方法,等待子線程結束​​
  • ​​3.3 方式三:閉鎖CountDownLatch​​

1 java多線程的使用

2 shutdown和shutdownNow差別源碼解析

1、這兩個都是關閉​​線程池​​的方法,但是大家可能對其作用和原理不是很清楚,不知道線程池是否真的關閉了,或者又重新送出了任務會怎樣?下面我先通過一些案列帶大家看一下他們之間的一些差別

2.1 shutdown案例

@Test
    public void shutdownTest(){

        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2,
                1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2));

        Runnable runnable = () -> {
            try {
                Thread.sleep(1000);
                System.out.println("線程:" + Thread.currentThread().getName() + "正在執行");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        pool.submit(runnable);
        pool.submit(runnable);
        pool.submit(runnable);
        pool.shutdown();
        System.out.println("main線程已經執行到這了");
        System.out.println("線程數:"+pool.getActiveCount()+",隊列裡的任務:"+pool.getQueue().size());
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }      
Java并發計算判斷線程池中的線程是否全部執行完畢

可以看出線程池雖然調用了shutdown方法,這個方法會将線程池的狀态改為SHUTDOWN狀态,不接受新的任務,但是會繼續處理沒有處理完的任務和阻塞隊列中的任務,如果有新的任務過來會抛RejectedExecutionException異常。

下面是我又送出了一個任務,然後不出所料的報RejectedExecutionException異常

Java并發計算判斷線程池中的線程是否全部執行完畢

2.2 shutdownNow案例

可能有人看方法名字就很快地回答出了,立刻關閉線程池,正在執行的任務和阻塞隊列中沒有執行的任務都不會執行了,那你就想錯了,實際上并不是這樣,下面看一個案例否定這種說法。

@Test
    public void shutdownNowTest1(){

        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2,
                1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2));


        Runnable runnable = () -> {
            Integer sum = 0;
            for(Integer i=0;i<10000;i++){
                sum += i;
            }
            System.out.println("sum:"+sum);
        };
        pool.submit(runnable);
        pool.submit(runnable);
        pool.shutdownNow();
        System.out.println("main已經走到這裡了");

        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }      
Java并發計算判斷線程池中的線程是否全部執行完畢

看到了吧這裡main方法都列印了,說明已經已經執行完shutdownNow()方法了,但是為什麼sum還能計算并且能輸出呢,這就能說明正在執行的線程不是立即關閉。

其實shutdownNow()的方法底層循環周遊了工作線程的interrupt()方法,這個方法會Just to set the interrupt flag,然後線程遇到阻塞方法(sleep,join,wait等)就會抛出InterruptedException()異常,案例如下

@Test
    public void shutdownNowTest1(){

        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2,
                1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2));


        Runnable runnable = () -> {
            try {
                Thread.sleep(1000);
                System.out.println("111");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(222);
        };
        pool.submit(runnable);
        pool.submit(runnable);
        pool.shutdownNow();
        System.out.println("main已經走到這裡了");

        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }      
Java并發計算判斷線程池中的線程是否全部執行完畢

2.3 源碼解讀

下面來看一下他們的源碼,有助于大家進一步記憶。

*shutdown源碼*

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        //加鎖,同一時刻隻能由一個線程執行線程池的shutdown()方法
        mainLock.lock();
        try {
            //檢查調用方是否有權限關閉線程池和中斷工作線程
            checkShutdownAccess();
            //将線程池的狀态改為SHUTDOWN
            advanceRunState(SHUTDOWN);
            //中斷所有空閑線程
            interruptIdleWorkers();
            //取消延時任務 ScheduledThreadPoolExecutor
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            //釋放鎖
            mainLock.unlock();
        }
        //嘗試将線程池置為TERMINATED
        tryTerminate();
    }

    //通過 CAS 自旋操作将線程池運作狀态設定為目标值,如果已經大于等于目标值,則不作任何操作
    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            //如果目前運作狀态 >= targetState,則直接break,不作處理
            //如果如果目前運作狀态 < targetState,則通過CAS操作将運作狀态設定為目标值,成功的話會傳回true,失敗則傳回false
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }
    //如果為true,則隻中斷一個空閑線程,如果為false,則中斷所有空閑線程
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                //線程沒有中斷,并且線程想以獨占的方式擷取鎖,擷取到傳回true,表示線程處于空閑狀态,沒擷取到傳回false,表示線程處于工作狀态
                //tryLock()底層調用了tryAcquire(),再底層就是AQS了
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

    //AQS源碼
    protected boolean tryAcquire(int unused) {
        //先通過CAS嘗試将同步狀态(AQS的state屬性)從0修改為1。若直接修改成功了,則将占用鎖的線程設定為目前線程
        if (compareAndSetState(0, 1)) {
             //用來儲存目前占用同步狀态的線程。
             setExclusiveOwnerThread(Thread.currentThread());
             return true;
        }
        return false;
    }

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            //循環關閉正在工作的線程
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //将線程池狀态置為TIDYING狀态
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }


*shutdownNow源碼(上面重複的代碼不再贅述)*
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            //将線程池狀态置為stop
            advanceRunState(STOP);
            //中斷所有線程,包括正在運作的線程
            interruptWorkers();
            //将未執行的任務移入清單中
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //中斷所有線程(interrupt flag = true)
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
    //中斷線程,包括正在執行的線程(interrupt flag = true)
    void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        //調用BlockingQueue的drainTo()方法轉移元素
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }      

上面可以看出線程池調用shutdownNow()方法之後有三個關鍵步驟

  1. 會将線程池置于stop狀态
  2. 然後會周遊正在執行的任務,調用正在執行線程的interrupt()方法
  3. 最後會将阻塞隊列中未執行的任務放到task中傳回。

通過源碼可以看出shutdownNow()方法是有傳回值的,傳回的内容就是阻塞隊列為執行的任務。而且shutdownNow()是不能直接停止目前工作線程(僅僅調用的interrupt()而不是stop()),若想調用shutdownNow()停止工作中的線程,隻有判斷判斷線程的狀态(flag=Thread.currentThread().isInterrupted())看線程是否關閉,或者調用能中斷線程的方法(join,sleep,wait等)。但是不推薦關停正在執行的任務,因為線程的stop()方法都棄用了,官方解釋說stop()會釋放這個線程所持有的所有的鎖對象這會導緻對象處于不一緻狀态。

比如:可能會導緻從一個賬戶向另一個賬戶轉賬的過程中被終止,錢已經轉出,但沒有輸入目标賬戶。

2.4 shutdown和shutdownNow總結

shutdown():

1、同一時刻隻能有一個線程能修改線程池的狀态

2、檢查調用方是否有權限關閉線程池和中斷工作線程

3、通過CAS将線程池狀态置為SHUTDOWN

4、通過AQS關閉所有空閑的線程

5、取消延時隊列中的任務

6、嘗試将線程池狀态置為Terminated

shutdownNow():

1、同一時刻隻能有一個線程能修改線程池的狀态

2、檢查調用方是否有權限關閉線程池和中斷工作線程

3、通過CAS将線程池狀态置為STOP

4、周遊所有正在執行的工作線程,調用其interrupt()方法,修改其中斷狀态辨別為true

5、傳回阻塞隊列中未執行的任務給調用方

6、嘗試将線程池狀态置為Terminated

shutdown()和shutdownNow()方法都可以關閉線程池,但是工作中為了代碼看起來比較優雅一般我們用shutdown()和awaitTermination()結合使用來關閉線程池,如果情況緊急,那麼就用shutdownNow()來關閉線程池。

pool.shutdown();
pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
    @Test
    public void shutdownTest(){

        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2,
                1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2));

        Runnable runnable = () -> {
            try {
                Thread.sleep(1000);
                System.out.println("線程:" + Thread.currentThread().getName() + "正在執行");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        TimeInterval timer = DateUtil.timer();
        pool.submit(runnable);
        pool.submit(runnable);
        pool.submit(runnable);
        pool.shutdown();

        System.out.println("main線程已經執行到這了");
        System.out.println("線程數:"+pool.getActiveCount()+",隊列裡的任務:"+pool.getQueue().size());
        try {
            pool.awaitTermination(5,TimeUnit.SECONDS);
            System.out.println(timer.intervalRestart());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }      
Java并發計算判斷線程池中的線程是否全部執行完畢

3 判斷線程池中的線程是否全部執行完畢

在使用多線程中我們會使用 java.util.concurrent.Executors的線程池,當多個子線程異步執行的時候,調用ExecutorService.shutdown方法,線程池不再接收任何新的任務,但此時線程池并不會立刻退出,直到添加到線程池中的任務都已經執行處理完成,才會退出,我們如何判斷線程池中所有的子線程都已經執行完畢,然後繼續執行後續操作。

3.1 方式一:調用ExecutorService 中的isTerminated方法

調用ExecutorService 中的isTerminated方法。

在調用shutdown方法後我們可以在一個死循環裡面用isTerminated方法判斷是否線程池中的所有線程已經執行完畢

Java并發計算判斷線程池中的線程是否全部執行完畢

3.2 方式二:調用ExecutorService 中的awaitTermination()方法,等待子線程結束

調用ExecutorService 中的awaitTermination()方法,等待子線程結束

Java并發計算判斷線程池中的線程是否全部執行完畢

3.3 方式三:閉鎖CountDownLatch

閉鎖CountDownLatch