天天看點

【高并發】通過ThreadPoolExecutor類的源碼深度解析線程池執行任務的核心流程

大家好,我是冰河~~

ThreadPoolExecutor是Java線程池中最核心的類之一,它能夠保證線程池按照正常的業務邏輯執行任務,并通過原子方式更新線程池每個階段的狀态。

ThreadPoolExecutor類中存在一個workers工作線程集合,使用者可以向線程池中添加需要執行的任務,workers集合中的工作線程可以直接執行任務,或者從任務隊列中擷取任務後執行。ThreadPoolExecutor類中提供了整個線程池從建立到執行任務,再到消亡的整個流程方法。本文,就結合ThreadPoolExecutor類的源碼深度分析線程池執行任務的整體流程。

在ThreadPoolExecutor類中,線程池的邏輯主要展現在execute(Runnable)方法,addWorker(Runnable, boolean)方法,addWorkerFailed(Worker)方法和拒絕政策上,接下來,我們就深入分析這幾個核心方法。

execute(Runnable)方法

execute(Runnable)方法的作用是送出Runnable類型的任務到線程池中。我們先看下execute(Runnable)方法的源碼,如下所示。

public void execute(Runnable command) {
    //如果送出的任務為空,則抛出空指針異常
    if (command == null)
        throw new NullPointerException();
    //擷取線程池的狀态和線程池中線程的數量
    int c = ctl.get();
    //線程池中的線程數量小于corePoolSize的值
    if (workerCountOf(c) < corePoolSize) {
        //重新開啟線程執行任務
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //如果線程池處于RUNNING狀态,則将任務添加到阻塞隊列中
    if (isRunning(c) && workQueue.offer(command)) {
        //再次擷取線程池的狀态和線程池中線程的數量,用于二次檢查
        int recheck = ctl.get();
        //如果線程池沒有未處于RUNNING狀态,從隊列中删除任務
        if (! isRunning(recheck) && remove(command))
            //執行拒絕政策
            reject(command);
        //如果線程池為空,則向線程池中添加一個線程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //任務隊列已滿,則新增worker線程,如果新增線程失敗,則執行拒絕政策
    else if (!addWorker(command, false))
        reject(command);
}           

整個任務的執行流程,我們可以簡化成下圖所示。

【高并發】通過ThreadPoolExecutor類的源碼深度解析線程池執行任務的核心流程

接下來,我們拆解execute(Runnable)方法,具體分析execute(Runnable)方法的執行邏輯。

(1)線程池中的線程數是否小于corePoolSize核心線程數,如果小于corePoolSize核心線程數,則向workers工作線程集合中添加一個核心線程執行任務。代碼如下所示。

//線程池中的線程數量小于corePoolSize的值
if (workerCountOf(c) < corePoolSize) {
    //重新開啟線程執行任務
    if (addWorker(command, true))
        return;
    c = ctl.get();
}           

(2)如果線程池中的線程數量大于corePoolSize核心線程數,則判斷目前線程池是否處于RUNNING狀态,如果處于RUNNING狀态,則添加任務到待執行的任務隊列中。注意:這裡向任務隊列添加任務時,需要判斷線程池是否處于RUNNING狀态,隻有線程池處于RUNNING狀态時,才能向任務隊列添加新任務。否則,會執行拒絕政策。代碼如下所示。

if (isRunning(c) && workQueue.offer(command))            

(3)向任務隊列中添加任務成功,由于其他線程可能會修改線程池的狀态,是以這裡需要對線程池進行二次檢查,如果目前線程池的狀态不再是RUNNING狀态,則需要将添加的任務從任務隊列中移除,執行後續的拒絕政策。如果目前線程池仍然處于RUNNING狀态,則判斷線程池是否為空,如果線程池中不存在任何線程,則建立一個線程添加到線程池中,如下所示。

//再次擷取線程池的狀态和線程池中線程的數量,用于二次檢查
int recheck = ctl.get();
//如果線程池沒有未處于RUNNING狀态,從隊列中删除任務
if (! isRunning(recheck) && remove(command))
    //執行拒絕政策
    reject(command);
//如果線程池為空,則向線程池中添加一個線程
else if (workerCountOf(recheck) == 0)
    addWorker(null, false);           

(4)如果在步驟(3)中向任務隊列中添加任務失敗,則嘗試開啟新的線程執行任務。此時,如果線程池中的線程數量已經大于線程池中的最大線程數maximumPoolSize,則不能再啟動新線程。此時,表示線程池中的任務隊列已滿,并且線程池中的線程已滿,需要執行拒絕政策,代碼如下所示。

//任務隊列已滿,則新增worker線程,如果新增線程失敗,則執行拒絕政策
else if (!addWorker(command, false))
    reject(command);           

這裡,我們将execute(Runnable)方法拆解,結合流程圖來了解線程池中任務的執行流程就比較簡單了。可以這麼說,execute(Runnable)方法的邏輯基本上就是一般線程池的執行邏輯,了解了execute(Runnable)方法,就基本了解了線程池的執行邏輯。

注意:有關ScheduledThreadPoolExecutor類和ForkJoinPool類執行線程池的邏輯,在【高并發專題】系列文章中的後文中會詳細說明,了解了這些類的執行邏輯,就基本全面掌握了線程池的執行流程。

在分析execute(Runnable)方法的源碼時,我們發現execute(Runnable)方法中多處調用了addWorker(Runnable, boolean)方法,接下來,我們就一起分析下addWorker(Runnable, boolean)方法的邏輯。

addWorker(Runnable, boolean)方法

總體上,addWorker(Runnable, boolean)方法可以分為三部分,第一部分是使用CAS安全的向線程池中添加工作線程;第二部分是建立新的工作線程;第三部分則是将任務通過安全的并發方式添加到workers中,并啟動工作線程執行任務。

接下來,我們看下addWorker(Runnable, boolean)方法的源碼,如下所示。

private boolean addWorker(Runnable firstTask, boolean core) {
    //标記重試的辨別
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 檢查隊列是否在某些特定的條件下為空
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        //下面循環的主要作用為通過CAS方式增加線程的個數
        for (;;) {
            //擷取線程池中的線程數量
            int wc = workerCountOf(c);
            //如果線程池中的線程數量超出限制,直接傳回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //通過CAS方式向線程池新增線程數量
            if (compareAndIncrementWorkerCount(c))
                //通過CAS方式保證隻有一個線程執行成功,跳出最外層循環
                break retry;
            //重新擷取ctl的值
            c = ctl.get();  
            //如果CAS操作失敗了,則需要在内循環中重新嘗試通過CAS新增線程數量
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    
    //跳出最外層for循環,說明通過CAS新增線程數量成功
    //此時建立新的工作線程
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //将執行的任務封裝成worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            //獨占鎖,保證操作workers時的同步
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //此處需要重新檢查線程池狀态
                //原因是在獲得鎖之前可能其他的線程改變了線程池的狀态
                int rs = runStateOf(ctl.get());
                
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    //向worker中添加新任務
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //将是否添加了新任務的辨別設定為true
                    workerAdded = true;
                }
            } finally {
                //釋放獨占鎖
                mainLock.unlock();
            }
            //添加新任成功,則啟動線程執行任務
            if (workerAdded) {
                t.start();
                //将任務是否已經啟動的辨別設定為true
                workerStarted = true;
            }
        }
    } finally {
        //如果任務未啟動或啟動失敗,則調用addWorkerFailed(Worker)方法
        if (! workerStarted)
            addWorkerFailed(w);
    }
    //傳回是否啟動任務的辨別
    return workerStarted;
}           

乍一看,addWorker(Runnable, boolean)方法還蠻長的,這裡,我們還是将addWorker(Runnable, boolean)方法進行拆解。

(1)檢查任務隊列是否在某些特定的條件下為空,代碼如下所示。

// 檢查隊列是否在某些特定的條件下為空
if (rs >= SHUTDOWN &&
    ! (rs == SHUTDOWN &&
       firstTask == null &&
       ! workQueue.isEmpty()))
    return false;           

(2)在通過步驟(1)的校驗後,則進入内層for循環,在内層for循環中通過CAS來增加線程池中的線程數量,如果CAS操作成功,則直接退出雙重for循環。如果CAS操作失敗,則檢視目前線程池的狀态是否發生了變化,如果線程池的狀态發生了變化,則通過continue關鍵字重新通過外層for循環校驗任務隊列,檢驗通過再次執行内層for循環的CAS操作。如果線程池的狀态沒有發生變化,此時上一次CAS操作失敗了,則繼續嘗試CAS操作。代碼如下所示。

for (;;) {
    //擷取線程池中的線程數量
    int wc = workerCountOf(c);
    //如果線程池中的線程數量超出限制,直接傳回false
    if (wc >= CAPACITY ||
        wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
    //通過CAS方式向線程池新增線程數量
    if (compareAndIncrementWorkerCount(c))
        //通過CAS方式保證隻有一個線程執行成功,跳出最外層循環
        break retry;
    //重新擷取ctl的值
    c = ctl.get();  
    //如果CAS操作失敗了,則需要在内循環中重新嘗試通過CAS新增線程數量
    if (runStateOf(c) != rs)
        continue retry;
}           

(3)CAS操作成功後,表示向線程池中成功添加了工作線程,此時,還沒有線程去執行任務。使用全局的獨占鎖mainLock來将新增的工作線程Worker對象安全的添加到workers中。

總體邏輯就是:建立新的Worker對象,并擷取Worker對象中的執行線程,如果線程不為空,則擷取獨占鎖,擷取鎖成功後,再次檢查線線程的狀态,這是避免在擷取獨占鎖之前其他線程修改了線程池的狀态,或者關閉了線程池。如果線程池關閉,則需要釋放鎖。否則将新增加的線程添加到工作集合中,釋放鎖并啟動線程執行任務。将是否啟動線程的辨別設定為true。最後,判斷線程是否啟動,如果沒有啟動,則調用addWorkerFailed(Worker)方法。最終傳回線程是否起送的辨別。

//跳出最外層for循環,說明通過CAS新增線程數量成功
//此時建立新的工作線程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    //将執行的任務封裝成worker
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        //獨占鎖,保證操作workers時的同步
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //此處需要重新檢查線程池狀态
            //原因是在獲得鎖之前可能其他的線程改變了線程池的狀态
            int rs = runStateOf(ctl.get());
            
            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive())
                    throw new IllegalThreadStateException();
                //向worker中添加新任務
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                //将是否添加了新任務的辨別設定為true
                workerAdded = true;
            }
        } finally {
            //釋放獨占鎖
            mainLock.unlock();
        }
        //添加新任成功,則啟動線程執行任務
        if (workerAdded) {
            t.start();
            //将任務是否已經啟動的辨別設定為true
            workerStarted = true;
        }
    }
} finally {
    //如果任務未啟動或啟動失敗,則調用addWorkerFailed(Worker)方法
    if (! workerStarted)
        addWorkerFailed(w);
}
//傳回是否啟動任務的辨別
return workerStarted;           

addWorkerFailed(Worker)方法

在addWorker(Runnable, boolean)方法中,如果添加工作線程失敗或者工作線程啟動失敗時,則會調用addWorkerFailed(Worker)方法,下面我們就來看看addWorkerFailed(Worker)方法的實作,如下所示。

private void addWorkerFailed(Worker w) {
    //擷取獨占鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //如果Worker任務不為空
        if (w != null)
            //将任務從workers集合中移除
            workers.remove(w);
        //通過CAS将任務數量減1
        decrementWorkerCount();
        tryTerminate();
    } finally {
        //釋放鎖
        mainLock.unlock();
    }
}           

addWorkerFailed(Worker)方法的邏輯就比較簡單了,擷取獨占鎖,将任務從workers中移除,并且通過CAS将任務的數量減1,最後釋放鎖。

拒絕政策

我們在分析execute(Runnable)方法時,線程池會在适當的時候調用reject(Runnable)方法來執行相應的拒絕政策,我們看下reject(Runnable)方法的實作,如下所示。

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}           

通過代碼,我們發現調用的是handler的rejectedExecution方法,handler又是個什麼鬼,我們繼續跟進代碼,如下所示。

private volatile RejectedExecutionHandler handler;           

再看看RejectedExecutionHandler是個啥類型,如下所示。

package java.util.concurrent;

public interface RejectedExecutionHandler {

    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}           

可以發現RejectedExecutionHandler是個接口,定義了一個rejectedExecution(Runnable, ThreadPoolExecutor)方法。既然RejectedExecutionHandler是個接口,那我們就看看有哪些類實作了RejectedExecutionHandler接口。

【高并發】通過ThreadPoolExecutor類的源碼深度解析線程池執行任務的核心流程

看到這裡,我們發現RejectedExecutionHandler接口的實作類正是線程池預設提供的四種拒絕政策的實作類。

至于reject(Runnable)方法中具體會執行哪個類的拒絕政策,是根據建立線程池時傳遞的參數決定的。如果沒有傳遞拒絕政策,則預設會執行AbortPolicy類的拒絕政策。否則會執行傳遞的類的拒絕政策。

在建立線程池時,除了能夠傳遞JDK預設提供的拒絕政策外,還可以傳遞自定義的拒絕政策。如果想使用自定義的拒絕政策,則隻需要實作RejectedExecutionHandler接口,并重寫rejectedExecution(Runnable, ThreadPoolExecutor)方法即可。例如,下面的代碼。

public class CustomPolicy implements RejectedExecutionHandler {

    public CustomPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            System.out.println("使用調用者所在的線程來執行任務")
            r.run();
        }
    }
}           

使用如下方式建立線程池。

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                       60L, TimeUnit.SECONDS,
                       new SynchronousQueue<Runnable>(),
                       Executors.defaultThreadFactory(),
               new CustomPolicy());           

至此,線程池執行任務的整體核心邏輯分析結束。

好了,今天就到這兒吧,我是冰河,我們下期見~~