天天看點

線程池(ThreadPoolExecutor)源碼分析之如何保證核心線程不被銷毀的

今天看到了别人的一個代碼,為了實作每小時重新開機一下MQ拉取消息,他使用的是Thread.sleep(10006060)方法,然後重新開機MQ。我一看到就非常頭疼啊。。為什麼要使用這種方式而不使用java的線程池呢?于是我就問他,他說當時為了友善。大家都知道Thread.sleep期間是不會釋放共享資源的,會造成死鎖現象。然後我就想Thread.sleep可以在睡覺過程中等待被interrupt中斷,然後繼續工作。那麼線程池是怎麼保證他的核心線程不釋放 而一直等待任務的執行的呢?難道我們一直了解的線程run方法執行完畢線程就銷毀是不正确的?而且還有我們為何通過設定​

​allowCoreThreadTimeOut(true)​

​ 就能使核心線程銷毀的呢?

我們通常都是通過執行execute(Runnable command)方法來向線程池送出一個不需要傳回結果的任務的(如果你需要傳回結果那麼就是 ​

​<T> Future<T> submit(Callable<T> task)​

​方法),懷着一顆探索的心,敲敲翻開了線程池的源碼:

public void execute(Runnable command) {
         /*如果送出的任務為null  抛出空指針異常*/
        if (command == null)
            throw new NullPointerException();
           
        int c = ctl.get();
        /*如果目前的任務數小于等于設定的核心線程大小,那麼調用addWorker直接執行該任務*/
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        /*如果目前的任務數大于設定的核心線程大小,而且目前的線程池狀态時運作狀态,那麼向阻塞隊列中添加任務*/
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        /*如果向隊列中添加失敗,那麼就新開啟一個線程來執行該任務*/
        else if (!addWorker(command, false))
            reject(command);
    }      

有人可能不了解​

​ctl.get()​

​​簡單解釋一下,線程池是通過Integer類型的高3為表述目前線程池的狀态​

​RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED​

​​ 。低29位表示目前線程的運作任務數量。然後通過位運算來計算運作狀态和任務的數量。

解釋一下execute的執行流程

當線程池中的線程數小于corePoolSize 時,新送出的任務直接建立一個線程執行任務(不管是否有空閑線程)

當線程池中的線程數等于corePoolSize 時,新送出的任務将會進入阻塞隊列(workQueue)中,等待線程的排程

當阻塞隊列滿了以後,如果corePoolSize < maximumPoolSize ,則新送出的任務會建立線程執行任務,直至線程數達到maximumPoolSize

當線程數達到maximumPoolSize 時,新送出的任務會由(飽和政策)管理

代碼執行邏輯很簡單,我們需要注意的就是addWorker方法,點進去繼續檢視。

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        //死循環
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            //如果目前線程池的狀态時SHUTDOWN,STOP,TIDYING,TERMINATED并且為SHUTDOWN狀态時任務隊列為空,那麼就傳回false  原因:如果調用了shutdown方法,此時的線程池還會繼續工作并且會在任務隊列中的所有任務執行完成後才會結束線程池。
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            //死循環
            for (;;) {
                int wc = workerCountOf(c);
                //core是在execute方法中傳的參數,true表示 核心線程,false表示最大線程 
                 //CAPACITY  可以了解為Integer的最大值  1左移29位再-1
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //如果增加任務數量成功那麼退出這個循環執行下面的代碼,否則繼續    
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //這行代碼仔細記着  稍後分析
            w = new Worker(firstTask);
            final Thread t = w.thread;
            
            if (t != null) {
                //同步塊 使用内置鎖 鎖住
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                      //再判斷一次目前線程池的狀态  避免在執行過程中線程時被使用者關閉
                    int rs = runStateOf(ctl.get());
                    
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //向正在執行的任務隊列(workers)中添加work    差別一下:workqueue是等待執行的阻塞隊列
                        workers.add(w);
                        int s = workers.size();
                        //記錄曾經并發執行的最大任務個數
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                            //添加任務成功
                        workerAdded = true;
                    }
                } finally {
                    //finally塊釋放内置鎖    
                    mainLock.unlock();
                }
                //如果任務添加成功那麼開始執行任務
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }      

寫到這裡差點忘記了我們當初的目的,我們的目的是檢視線程池是如何保證核心線程不被銷毀的。

看到這裡終于出現了一點眉目,在任務添加成功後,我們發現了​​

​t.start()​

if (workerAdded) {
              t.start();
              workerStarted = true;
         }      

可不要被迷惑,這裡的t是通過​

​work.thread;​

​​ 得到的。這時候我們需要檢視work類中的run方法。

work在ThreadPoolExecutor為一個内部類實作了Runnable接口。隻有一個構造方法

Worker(Runnable firstTask) {
     setState(-1); // inhibit interrupts until runWorker
     this.firstTask = firstTask;
     this.thread = getThreadFactory().newThread(this);
}

/** 重寫了run方法  */
 public void run() {
     runWorker(this);
 }      

通過構造方法我們可以清楚的看到我們送出的任務就是​

​firstTask​

​​,而thread就是目前的work對象。在上面的addWorker方法中調用的​

​t.start()​

​​就會調用這裡的​

​runWorker(this)​

​方法,點進去繼續檢視

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //得到worker對象中我們送出的任務
        Runnable task = w.firstTask;
    
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //如果目前任務為空  那麼就從getTask中獲得任務
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //任務執行前調用的方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //任務結束後調用的方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }      

關鍵代碼就是那個while循環。如果task不為空執行task否則從​

​getTask()​

​​中取任務。在執行完任務後會在finally 塊中設定task = null;

順便介紹一下,在這裡,我們可以看到​​

​beforeExecute(Thread t, Runnable r)​

​​方法和​

​afterExecute(Runnable r, Throwable t)​

​​會在任務的執行前後執行,我們可以通過繼承線程池的方式來重寫這兩個方法,這樣就能夠對任務的執行進行監控啦。

咋一看 好像沒什麼問題。其實我們可以發現如果執行完一個任務 task 設定為 null。就要調用 ​​

​getTask()​

​方法 。 點進去檢視一下。

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        //死循環
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }      

需要仔細關注的是這些代碼

= timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                        if (r != null)
                    return r;      

解釋一下:從阻塞任務隊列中取任務,如果設定了​

​allowCoreThreadTimeOut(true)​

​​ 或者目前運作的任務數大于設定的核心線程數,那麼​

​timed =true​

​​ 。此時将使用​

​workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)​

​​從任務隊列中取任務,而如果沒有設定,那麼使用​

​workQueue.take();​

​​取任務,對于阻塞隊列,​

​poll(long timeout, TimeUnit unit)​

​​ 将會在規定的時間内去任務,如果沒取到就傳回null。​

​take()​

​​會一直阻塞,等待任務的添加。

到此 相信我們都能夠了解為什麼我們的線程池能夠一直等待任務的執行而不被銷毀了,其實也就是進入了阻塞狀态而已。

繼續閱讀