天天看點

高并發之——從源碼角度深度解析線程池是如何實作優雅退出的

在【高并發專題】中,我們從源碼角度深度分析了線程池中那些重要的接口和抽象類、深度解析了線程池是如何建立的,ThreadPoolExecutor類有哪些屬性和内部類,以及它們對線程池的重要作用。深度分析了線程池的整體核心流程,以及如何拆解Worker線程的執行代碼,深度解析Worker線程的執行流程。

注意:以上内容大家可以在【高并發專題】中進行查閱。

高并發之——不得不說的線程池與ThreadPoolExecutor類淺析 高并發之——P8級别架構師帶你深度解析線程池中那些重要的頂層接口和抽象類 高并發之——建立線程池居然有這麼多方式... 高并發之——ThreadPoolExecutor類居然是這樣保證線程池正确運作的... 高并發之——通過ThreadPoolExecutor類的源碼深度解析線程池執行任務的核心流程 高并發之——通過源碼深度分析線程池中Worker線程的執行流程 本文,我們就來從源碼角度深度解析線程池是如何優雅的退出程式的。首先,我們來看下ThreadPoolExecutor類中的shutdown()方法。

shutdown()方法

當使用線程池的時候,調用了shutdown()方法後,線程池就不會再接受新的執行任務了。但是在調用shutdown()方法之前放入任務隊列中的任務還是要執行的。此方法是非阻塞方法,調用後會立即傳回,并不會等待任務隊列中的任務全部執行完畢後再傳回。我們看下shutdown()方法的源代碼,如下所示。

public void shutdown() {
    //擷取線程池的全局鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //檢查是否有關閉線程池的權限
        checkShutdownAccess();
        //将目前線程池的狀态設定為SHUTDOWN
        advanceRunState(SHUTDOWN);
        //中斷Worker線程
        interruptIdleWorkers();
        //為ScheduledThreadPoolExecutor調用鈎子函數
        onShutdown(); // hook for 
    } finally {
        //釋放線程池的全局鎖
        mainLock.unlock();
    }
    //嘗試将狀态變為TERMINATED
    tryTerminate();
}      

總體來說,shutdown()方法的代碼比較簡單,首先檢查了是否有權限來關閉線程池,如果有權限,則再次檢測是否有中斷工作線程的權限,如果沒有權限,則會抛出SecurityException異常,代碼如下所示。

//檢查是否有關閉線程池的權限
checkShutdownAccess();
//将目前線程池的狀态設定為SHUTDOWN
advanceRunState(SHUTDOWN);
//中斷Worker線程
interruptIdleWorkers();      

其中,checkShutdownAccess()方法的實作代碼如下所示。

private void checkShutdownAccess() {
    SecurityManager security = System.getSecurityManager();
    if (security != null) {
        security.checkPermission(shutdownPerm);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                security.checkAccess(w.thread);
        } finally {
            mainLock.unlock();
        }
    }
}      

對于checkShutdownAccess()方法的代碼了解起來比較簡單,就是檢測是否具有關閉線程池的權限,期間使用了線程池的全局鎖。

接下來,我們看advanceRunState(int)方法的源代碼,如下所示。

private void advanceRunState(int targetState) {
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}      

advanceRunState(int)方法的整體邏輯就是:判斷目前線程池的狀态是否為指定的狀态,在shutdown()方法中傳遞的狀态是SHUTDOWN,如果是SHUTDOWN,則直接傳回;如果不是SHUTDOWN,則将目前線程池的狀态設定為SHUTDOWN。

接下來,我們看看showdown()方法調用的interruptIdleWorkers()方法,如下所示。

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}      

可以看到,interruptIdleWorkers()方法調用的是interruptIdleWorkers(boolean)方法,繼續看interruptIdleWorkers(boolean)方法的源代碼,如下所示。

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}      

上述代碼的總體邏輯為:擷取線程池的全局鎖,循環所有的工作線程,檢測線程是否被中斷,如果沒有被中斷,并且Worker線程獲得了鎖,則執行線程的中斷方法,并釋放線程擷取到的鎖。此時如果onlyOne參數為true,則退出循環。否則,循環所有的工作線程,執行相同的操作。最終,釋放線程池的全局鎖。

接下來,我們看下shutdownNow()方法。

shutdownNow()方法

如果調用了線程池的shutdownNow()方法,則線程池不會再接受新的執行任務,也會将任務隊列中存在的任務丢棄,正在執行的Worker線程也會被立即中斷,同時,方法會立刻傳回,此方法存在一個傳回值,也就是目前任務隊列中被丢棄的任務清單。

shutdownNow()方法的源代碼如下所示。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //檢查是否有關閉權限
        checkShutdownAccess();
        //設定線程池的狀态為STOP
        advanceRunState(STOP);
        //中斷所有的Worker線程
        interruptWorkers();
        //将任務隊列中的任務移動到tasks集合中
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    /嘗試将狀态變為TERMINATED
    tryTerminate();
    //傳回tasks集合
    return tasks;
}      

shutdownNow()方法的源代碼的總體邏輯與shutdown()方法基本相同,隻是shutdownNow()方法将線程池的狀态設定為STOP,中斷所有的Worker線程,并且将任務隊列中的所有任務移動到tasks集合中并傳回。

可以看到,shutdownNow()方法中斷所有的線程時,調用了interruptWorkers()方法,接下來,我們就看下interruptWorkers()方法的源代碼,如下所示。

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}      

interruptWorkers()方法的邏輯比較簡單,就是獲得線程池的全局鎖,循環所有的工作線程,依次中斷線程,最後釋放線程池的全局鎖。

在interruptWorkers()方法的内部,實際上調用的是Worker類的interruptIfStarted()方法來中斷線程,我們看下Worker類的interruptIfStarted()方法的源代碼,如下所示。

void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}      

發現其本質上調用的還是Thread類的interrupt()方法來中斷線程。

awaitTermination(long, TimeUnit)方法

當線程池調用了awaitTermination(long, TimeUnit)方法後,會阻塞調用者所在的線程,直到線程池的狀态修改為TERMINATED才傳回,或者達到了逾時時間傳回。接下來,我們看下awaitTermination(long, TimeUnit)方法的源代碼,如下所示。

public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    //擷取距離逾時時間剩餘的時長
    long nanos = unit.toNanos(timeout);
    //擷取Worker線程的的全局鎖
    final ReentrantLock mainLock = this.mainLock;
    //加鎖
    mainLock.lock();
    try {
        for (;;) {
            //目前線程池狀态為TERMINATED狀态,會傳回true
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            //達到逾時時間,已逾時,則傳回false
            if (nanos <= 0)
                return false;
            //重置距離逾時時間的剩餘時長
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        //釋放鎖
        mainLock.unlock();
    }
}      

上述代碼的總體邏輯為:首先擷取Worker線程的獨占鎖,後在循環判斷目前線程池是否已經是TERMINATED狀态,如果是則直接傳回true,否則檢測是否已經逾時,如果已經逾時,則傳回false。如果未逾時,則重置距離逾時時間的剩餘時長。接下來,進入下一輪循環,再次檢測目前線程池是否已經是TERMINATED狀态,如果是則直接傳回true,否則檢測是否已經逾時,如果已經逾時,則傳回false。如果未逾時,則重置距離逾時時間的剩餘時長。以此循環,直到線程池的狀态變為TERMINATED或者已經逾時。