天天看點

細說JUC的線程池架構

線程池技術主要來自于`java.util.concurrent`包(俗稱**JUC**),該包是JDK1.5以後引進來的,主要是完成高并發,多線程的一個工具包。

線程池主要解決了線程的排程,維護,建立等問題,它在提高了線程的使用率的同時還提高了性能。

前言

線程的建立是需要JVM和OS(作業系統)互相配合的,一次的建立要花費許多的資源。

1.首先,JVM要為該線程配置設定堆棧和初始化大量記憶體塊,棧記憶體至少是1MB。

2.其次便是要進行系統的調用,在OS中建立和注冊本地的線程。

在Java的高并發場景下頻繁的建立和銷毀線程,一方面是記憶體塊的頻繁配置設定和回收,另一方面是作業系統頻繁注冊線程和銷毀,記憶體資源使用率不高的同時,也增加了時間的成本,這是非常低效的。我們要做的是線上程執行完使用者代碼邏輯塊後,儲存該線程,等待下一次使用者代碼邏輯塊來到時,繼續去運用該線程去完成這個任務,這樣不僅減少了線程頻繁的建立和銷毀,同時提高了性能。具體的實作便是線程池技術。

JUC線程池架構

線程池技術主要來自于

java.util.concurrent

包(俗稱JUC),該包是JDK1.5以後引進來的,主要是完成高并發,多線程的一個工具包。

在JUC中,有關線程池的類和接口大緻如圖下所示:

細說JUC的線程池架構

接下來讓我們一個一個解析每個接口和類吧。

Exector接口

我們從源碼來看Exector

public interface Executor {
    void execute(Runnable command);
}
           

我們可以看到Exector隻有一個接口方法便是execute()方法,該方法是定義是線程在未來的某個時間執行給定的目标執行任務,也就是說線程池中的線程執行目标任務的方法。

ExecutorService接口

該接口繼承了Executor是以它也繼承了execute()方法,同時它本身也擴充了一些重要的接口方法,我們通過源碼看一下幾個比較常用的方法。

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);
}
           
  • shutdown()接口方法的定義是關閉線程池,它與shutdownNow()方法不同的點在于,它不會中止正在執行的線程,它也會把未完成的目标任務完成了,此時線程池的狀态未SHUTDOWN,執行回調函數後關閉線程池。
  • shutdownNow()接口方法的定義也是關閉線程池,它與shutdown()方法不同的點在于,它會中止正在執行的線程,清空已送出但未執行的目标任務,傳回已完成的目标任務,同時線程池的狀态為STOP,執行回調函數後關閉線程池。
  • isShutdown()接口方法的定義是判斷目前的線程池狀态是否是SHUTDOWN狀态,是傳回true,不是傳回false。
  • isTerminated()接口方法的定義是判斷目前線程池狀态是否是TERMINATED狀态,也就是判斷目前線程池是否已經關閉,不是傳回flase,是傳回true。
  • submit()接口方法的定義與execute()方法類似,也是送出目标任務給線程池,線程池中的線程在适合的時機去執行該目标任務,它與execute()方法不同的點在兩個:一方面是submit()方法的形參可以有是Callable類型的,也可以是Runnable類型的,而execute()方法僅能接收Runnable類型的,另一方面是submit()方法的傳回值類型是Future,這意味着,我們可以擷取到目标任務的執行結果,以及任務的是否執行、是否取消等情況,而execute()方法的傳回值是void類型,這也表示我們擷取不到目标任務的執行情況等資訊。

AbstractExecutorService抽象類

正如第一張圖顯示的:AbstractExecutorService抽象類繼承了ExecutorService接口,這意味着AbstractExecutorService抽象類擁有着父類ExecutorService接口的所有接口方法,同時因為ExecutorService接口又繼承了Executor接口,是以也擁有Executor接口的接口方法。

不僅如此AbstractExecutorService抽象類實作了除shutdown()、shutdownNow()、execute()、isShutdown()、isTerminated()以外的方法,這裡我們主要檢視一下submit()方法的實作,同時也可以加深我們對execute()和submit()的關系與差別。

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
           
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
           

我們來解讀一下源碼:

  1. 首先判斷傳進來的Runnable類型的對象是否為空,如果為空的話便抛出一個空指針異常。
  2. 若不為空,則将目前的Runnable類型對象傳入newTaskFor方法,我們走進newTaskFor方法可以發現,其将Runnable類型對象修飾成了一個FutureTask類型的對象,FutureTask是實作了RunnableFuture接口的實作類,是以可以将其賦予給ftask。
  3. 随後,調用了execute方法,将ftask作為目标任務,傳入線程池,等待線程池排程線程執行這個任務,最後再傳回ftask,便于調用線程監控目标任務的執行情況和執行結果。

從源碼分析我們可以得知,submit()方法本質上還是調用了executor()方法,隻不過将Runnable類型的對象修飾成了FutureTask類型,讓其擁有監控執行任務的能力而已。

有關Callable接口和FutureTask實作類以及RunnableFuture接口的詳細資訊可以查閱筆者另一篇随筆: https://www.cnblogs.com/qzlzzz/p

ThreadPoolExecutor線程池實作類

ThreadPoolExecutor繼承了AbstractExecutorService抽象類,是以也就擁有了AbstractExecutorService抽象類繼承的實作了的接口方法和AbstractExecutorService抽象類所繼承的未實作的接口方法。

在此前提下,ThreadPoolExecutor不僅實作了AbstractExecutorService抽象類未實作的接口方法,同時其内部真正的實作了一個線程池,且實作了線程池的排程,管理,維護,空閑線程的存活時間,預設的線程工廠,和阻塞隊列,核心線程數,最大線程數,淘汰政策等功能。我們也可得知ThreadPoolExecutor是線程池技術的核心、重要類。"由于本随筆僅說JUC的線程池架構,是以不多描述線程池的實作等其核心功能"

這裡我們着眼于ThreadPoolExecutor對execute()方法的實作,首先我們來看其源碼:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
           

其實在源碼中已經有很詳細的注釋解析了,甚至不追及都可以懂這段代碼的作用,我想這就是好的程式員的一種展現,筆者在追尋源碼的路上時候也慢慢體會到尤雨溪大佬為什麼那麼強調:想要水準提升,必須有好的英語。

接着我們來大緻說一下源碼:

1. 首先會判斷傳入進來的Runnable類型的對象是否為空,如果為空則抛出一個空指針異常。

2. 随後擷取ctl的值,ctl是原子類,在定義時它的初始值是 -536870912,擷取到值後賦給c變量,c變量傳入workerCountOf()方法,在方法的内部進行了或運算,以此來擷取線程池的線程數,如果線程池的線程數比定義線程池時所設定的核心線程數要少的話,不管線程池裡的線程是否空閑,都會建立一個線程。

3. 判斷為true的話,進入到嵌套if()中的addWorker()方法。

這裡我們再來探尋一下addWorker()方法的源碼:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (;;) {
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();

                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.getState() != Thread.State.NEW)
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        workerAdded = true;
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
           

在addWorker()方法裡面我們可以看到充斥着大量線程池在SHUTDOWN和STOP狀态時,線程池該怎樣去運作,當線程池中的線程數達到核心線程數,線程池又如何去做,以及如何選取空餘的線程去執行目标任務或者在阻塞隊列中的目标任務等排程,建立功能。

在如此長的一段代碼中我們關注這幾行:

//第一段代碼
            w = new Worker(firstTask);
            final Thread t = w.thread;
           

以及

//第二段代碼
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
           

首先是第一段代碼,我們可以看到它将目标任務Runnable類型的對象修飾成了Worker類型,我們翻看一下Worker類:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable{
        //省略其他代碼
  
        Runnable firstTask;
        
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        
        public void run() {
            runWorker(this);
        }

        //省略其他代碼
}
           

從Worker的構造方法中我們可以知道其将目标任務,傳給了自己的執行個體屬性,同時由于自己本身是Runnable的實作類,是以可以以自己本身作為參數傳入到線程工廠的構造線程方法中,而自己本身實作的run()方法中又調用了runWorker()方法,runWorker()方法的參數又是目前Worker的執行個體本身,如果讀者有意深入的話,會發現runWorker()方法體中有一段是task.run()去執行目标任務,其餘的代碼則是回調函數的調用。

也就是說線程工廠建立的線程,如果啟動該線程去執行的話,是執行Worker類中的run()方法,也就會去執行run()方法中的runWorker()方法。

然後我們繼續來看第一段代碼,其使用句柄w擷取到thread,賦予給了Thread類型的t變量。第一段代碼結束,再到第二段代碼中使用了t.start()來啟動這個線程去執行目标任務,再将這個任務的工作狀态設為ture。

至此,兩段代碼探讨結束。

4. 最後回到execute()方法中,繼續走下去便是一些線程池拒絕政策的判斷,在這裡就不過多叙述了。

ScheduledExecutorSerive接口

從關系圖可以得知ScheduledExecutorService接口繼承了ExecutorService接口,這說明ScheduledExecutorService接口擁有着ExecutorService接口的接口方法,同時除了ExecutorService的送出、執行、判斷線程池狀态等的接口方法之外,ScheduledExecutorService接口還拓展了一些接口方法。

這裡我們從接口定義中來解讀ScheduledExecutorService究竟增加了那些功能。

public interface ScheduledExecutorService extends ExecutorService {

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}
           
  • 可以看出schedule()有兩種重載方法,差別在于第一種方法接收的形參是Runnable類型的,第二種方法接收的形參是Callable類型的。其作用都是前一次執行結束到下一次執行開始的時間是delay,機關是unit。
  • scheduleAtFixedRate()接口方法的定義是首次執行目标任務的時間延遲initialDelay,兩個目标任務開始執行最小間隔時間是delay,其機關都是unit。
  • scheduleWithFixedDelay()接口方法的定義與schedule()方法類似,隻不過是首次執行的時間延遲initialDelay,機關是unit。
注意上面的方法都是周期的,也就是說會周期地執行目标任務。

ScheduledThreadPoolExecutor線程池實作類

ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,同時實作了ScheduledExecutorSerive接口,這意味着ScheduledThreadPoolExecuotr不僅擁有了ThreadPoolExecutor實作的線程池,同ScheduledExecutorService接口繼承的接口方法也無需其實作,因為ThreadPoolExecutor和AbstractExecutorService已經幫其實作了。在此基礎上,ScheduledThreadPoolExecutor實作了ScheduledExecutorService接口拓展的方法,這使得ScheduledThreadPoolExecutor成為一個執行"延時"和"周期性"任務的可排程線程池。

至此,JUC線程池架構也逐漸清晰了起來,Exector接口定義了最重要的execute方法,ExecutorService 則拓展了送出和執行的方法,也擴充了監控線程池的方法、AbstractExecutorService抽象類則負責實作了ExecutorService 接口拓展的方法,因為ThreadPoolExecutor類内部實作了線程池,是以監控線程池的方法和execute方法等重要的方法自然也交給了其實作。最後的ScheduledThreadPoolExecuto類其實也是在整個完整的線程池技術上,拓展了線程池的一些功能。

結尾

一定要吃早餐,學習的過程需注意自己的身體才行。