天天看點

ThreadPoolExecutor核心實作原理和源碼解析<二>

Worker類實作了Runnable,同時也擴充了AbstractQueuedSynchronizer,說明Worker本身不隻是一個可執行的任務,還可以實作鎖的功能。其主要作用是執行隊列中的任務,并負責管理工作線程和維護一些統計名額,如已完成的任務數量等等;同時Worker通過擴充AbstractQueuedSynchronizer來簡化任務執行時擷取鎖與釋放鎖的操作。Worker中的鎖主要是為了防止中斷在運作任務中的工作線程,中斷僅用于喚醒在等待從workQueue中擷取任務的線程。

如何防止被中斷?

worker實作了一個簡單的不可重入互斥鎖,工作線程執行任務時,首先會進行加鎖,如果主線程想要中斷目前工作線程,需要先擷取鎖,否則無法中斷。當工作線程執行完任務則會釋放鎖,并調用getTask從workQueue擷取任務繼續執行。由此可知,隻有在等待從workQueue中擷取任務(調用getTask期間)時才能中斷。工作線程接收到中斷資訊,并不會立即就會停止,而是會檢查workQueue是否為空,不為空則還是會繼續擷取任務執行,隻有隊列為空才會被停止。是以中斷是為了停止空閑線程,也就是那些從任務隊列擷取任務被阻塞(任務隊列為空)的線程。後續會詳細分析整個過程。

為什麼Worker被設計為不可重入?

這就需要知道那些操作可能會發生中斷工作線程的操作。目前主要有以下幾個:

setCorePoolSize();

setMaximumPoolSize();

setKeppAliveTime();

allowCoreThreadTimeOut();

shutdown();

tryTerminate();

如果鎖可以重入,調用諸如setCorePoolSize等線程池控制方法時可以再次擷取鎖,那麼可能會導緻調用線程池控制方法期間中斷正在運作的工作線程。jdk不希望在調用像setCorePoolSize這樣的池控制方法時重新擷取鎖。

Worker源碼如下:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * 該類實際絕不會被序列化,提供serialVersionUID主要為了屏蔽javac warning
         */
        private static final long serialVersionUID = ;

        /** 運作在Worker對象中的線程 */
        final Thread thread;
        /** 要運作的初始任務,可能為null */
        Runnable firstTask;
        /** 每個線程的任務計數器,使用volatile保證可見性 */
        volatile long completedTasks;

        /**
         * 使用指定的初始任務和ThreadFactory中的線程對象建立一個Worker
         */
        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** 将主運作循環委托給外部的runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() == ;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(, )) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState();
            return true;
        }

        public void lock()        { acquire(); }
        public boolean tryLock()  { return tryAcquire(); }
        public void unlock()      { release(); }
        public boolean isLocked() { return isHeldExclusively(); }
    }                

核心函數 runWorker

 runWorker流程圖:

ThreadPoolExecutor核心實作原理和源碼解析<二>

runWorker會不斷從工作隊清單中取任務并執行;同時runWorker也會管理線程的中斷狀态,源碼如下:

final void runWorker(Worker w) {
        Runnable task = w.firstTask;
        w.firstTask = null;
        boolean completedAbruptly = true;//是否“突然完成”,非正常完成
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                clearInterruptsForTaskRun();
                try {
                    beforeExecute(w.thread, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
           

主要步驟:

1 從初始任務開始執行,如果firstTask 為null,隻要線程池在運作,調用getTask從隊列中取任務來執行。如果getTask傳回null,則worker可能由于線程池狀态調整或參數動态調整導緻退出。若外部代碼中抛出異常導緻worker退出,completedAbruptly将為true,則在processWorkerExit将建立新的worker替代。

2 執行任務前,對worker加鎖,已防止在任務運作時,線程池中其他操作中斷目前worker。調用clearInterruptsForTaskRun管理線程中斷狀态,首先看看源碼:

private void clearInterruptsForTaskRun() {
        if (runStateLessThan(ctl.get(), STOP) &&
            Thread.interrupted() &&
            runStateAtLeast(ctl.get(), STOP))
            Thread.currentThread().interrupt();
    }
           

這個方法調用非常重要,當線程池狀态小于STOP,調用Thread.interrupted(),如果getTask期間設定了worker的中斷狀态,則傳回true,同時Thread.interrupted()将清除中斷狀态,即再次調用将傳回false;再次檢查線程池狀态,如果狀态大于或等于STOP,則需要調用Thread.currentThread().interrupt()恢複線程的中斷狀态。是以,該方法有兩個作用:

<一>:當線程池仍然在運作時,若其他操作中斷了worker,則該操作将清除中斷狀态

<二>:清除中斷狀态後,再次檢查線程池狀态,如果狀态大于或等于STOP,此時需要恢複線程的中斷狀态,這樣在下次調用getTask将傳回null,worker将正常退出。

3 每個任務執行前,調用beforeExecute,beforeExecute可能抛出異常,該情況下抛出的異常會導緻任務未執行worker就死亡,沒有使用catch處理,會向上抛跳出循環,且completedAbruptly==true。

4 beforeExecute正常完成則開始運作任務,并收集其抛出的任何異常以發送到afterExecute,這裡将分别處理分别處理RuntimeException,Error和任意Throwables,由于不能在Runnable.run内重新抛出Throwables,是以将Throwable包裝為Error(到線程的UncaughtExceptionHandler中處理)向上抛。任何向上抛的異常都将導緻線程死亡,completedAbruptly仍然為true。

5 任務執行完成後,調用afterExecute,該方法同樣可能抛出異常,并導緻線程死亡。

擷取任務

runWorker運作期間,将不斷調用getTask()從任務隊列中取任務來執行。

getTask方法流程圖如下:

ThreadPoolExecutor核心實作原理和源碼解析&lt;二&gt;

源碼如下:

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        /**
         * 外層循環
         * 用于檢查線程池狀态和工作隊列是否為空
         */
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 調用了shutdownNow()或調用了shutdown()且workQueue為空,傳回true
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            boolean timed;      // Are workers subject to culling?
            /**
              * 内層循環
              * 用于檢測工作線程數量和擷取task的逾時狀态
              */
            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;

                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
           

任務隊列為空時,getTask()會根據目前線程池的配置執行阻塞或定時等待任務,當發生以下條件時,将傳回null:

1 工作線程的數量超過maximumPoolSize

2 線程池已經停止

3 線程池調用了shutdown且任務隊列為空

4 工作線程等待一個任務逾時,且allowCoreThreadTimeOut || workerCount > corePoolSize傳回true。

工作線程退出

runWorker中,當getTask傳回null或抛出異常,将進入processWorkerExit處理工作線程的退出。

processWorkerExit方法流程圖如下:

ThreadPoolExecutor核心實作原理和源碼解析&lt;二&gt;

下面看看源碼:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
         /**
           * 如果是突然終止,工作線程數減1
           * 如果不是突然終止,在getTask()中已經減1
           */
        if (completedAbruptly)
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();//鎖定線程池
        try {
            completedTaskCount += w.completedTasks;//彙總完成的任務數量
            workers.remove(w);//移除工作線程
        } finally {
            mainLock.unlock();
        }

        tryTerminate();//嘗試終止線程池

        int c = ctl.get();
        //狀态是running、shutdown,即tryTerminate()沒有成功終止線程池
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ?  : corePoolSize;
                //任務隊列中仍然有任務未執行,需至少保證有一個工作線程
                if (min ==  && ! workQueue.isEmpty())
                    min = ;
                /**
                  * allowCoreThreadTimeOut為false則需要保證線程池中至少有corePoolSize數量的工作線程
                  */
                if (workerCountOf(c) >= min)
                    return; 
            }
            //添加一個沒有firstTask的工作線程
            addWorker(null, false);
        }
    }
           

processWorkerExit隻會在工作線程中被調用,主要用于清理和記錄一個即将死亡的線程,該方法可能會終止線程池。這裡不再詳細tryTerminate和addWorker的實作,關于tryTerminate和addWorker的分析參見ThreadPoolExecutor核心實作原理和源碼解析<一>

歡迎指出本文有誤的地方,轉載請注明原文出處。

繼續閱讀