天天看點

一文弄懂Java中線程池原理

作者:java小悠

在工作中,我們經常使用線程池,但是你真的了解線程池的原理嗎?同時,線程池工作原理和底層實作原理也是面試經常問的考題,是以,今天我們一起聊聊線程池的原理吧。

為什麼要用線程池

使用線程池主要有以下三個原因:

  1. 降低資源消耗。通過重複利用已建立的線程降低線程建立和銷毀造成的消耗。
  2. 提升響應速度。當任務到達時,任務可以不需要等到線程建立就能立即執行。
  3. 可以對線程做統一管理。線程是稀缺資源,如果無限制地建立,不僅會消耗系統資源,還會降低系統穩定性,使用線程池可以進行統一配置設定、調優和監控。

線程池的原理

Java中的線程池頂層接口是Executor接口,ThreadPoolExecutor是這個接口的實作類。

我們先看看ThreadPoolExecutor類。

ThreadPoolExecutor提供的構造方法

// 七個參數的構造函數
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
複制代碼           

我們先看看這些參數是什麼意思:

  • int corePoolSize:該線程池中核心線程數最大值
核心線程:線程池中有兩類線程,核心線程和非核心線程。核心線程預設情況下會一直存在于線程池中,即使這個核心線程什麼都不幹(鐵飯碗),而非核心線程如果長時間的閑置,就會被銷毀(臨時工)。
  • int maximumPoolSize:該線程池中線程總數最大值 。
該值等于核心線程數量 + 非核心線程數量。
  • long keepAliveTime:非核心線程閑置逾時時長。
非核心線程如果處于閑置狀态超過該值,就會被銷毀。如果設定allowCoreThreadTimeOut(true),則會也作用于核心線程。
  • TimeUnit unit:keepAliveTime的機關。
TimeUnit是一個枚舉類型。
  • BlockingQueue workQueue:阻塞隊列,維護着等待執行的Runnable任務對象。
  • 常用的幾個阻塞隊列:
  • LinkedBlockingQueue:鍊式阻塞隊列,底層資料結構是連結清單,預設大小是Integer.MAX_VALUE,也可以指定大小。
  • ArrayBlockingQueue:數組阻塞隊列,底層資料結構是數組,需要指定隊列的大小。
  • SynchronousQueue:同步隊列,内部容量為0,每個put操作必須等待一個take操作,反之亦然。
  • DelayQueue:延遲隊列,該隊列中的元素隻有當其指定的延遲時間到了,才能夠從隊列中擷取到該元素 。
  • ThreadFactory threadFactory
  • 建立線程的工廠 ,用于批量建立線程,統一在建立線程時設定一些參數,如是否守護線程、線程的優先級等。如果不指定,會建立一個預設的線程工廠。
static class DefaultThreadFactory implements ThreadFactory {
    // 省略屬性
    // 構造函數
    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
        Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
            poolNumber.getAndIncrement() +
            "-thread-";
    }

    // 省略
}
複制代碼           
  • RejectedExecutionHandler handler
  • 拒絕處理政策,線程數量大于最大線程數就會采用拒絕處理政策,四種拒絕處理的政策為 :
  • ThreadPoolExecutor.AbortPolicy:預設拒絕處理政策,丢棄任務并抛出RejectedExecutionException異常。
  • ThreadPoolExecutor.DiscardPolicy:丢棄新來的任務,但是不抛出異常。
  • ThreadPoolExecutor.DiscardOldestPolicy:丢棄隊列頭部(最舊的)的任務,然後重新嘗試執行程式(如果再次失敗,重複此過程)。
  • ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務。

ThreadPoolExecutor的政策

線程池本身有一個排程線程,這個線程就是用于管理布控整個線程池裡的各種任務和事務,例如建立線程、銷毀線程、任務隊列管理、線程隊列管理等等。

故線程池也有自己的狀态。ThreadPoolExecutor類中使用了一些final int常量變量來表示線程池的狀态 ,分别為RUNNING、SHUTDOWN、STOP、TIDYING 、TERMINATED。

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
複制代碼           
  • 線程池建立後處于RUNNING狀态。
  • 調用shutdown()方法後處于SHUTDOWN狀态,線程池不能接受新的任務,清除一些空閑worker,不會等待阻塞隊列的任務完成。
  • 調用shutdownNow()方法後處于STOP狀态,線程池不能接受新的任務,中斷所有線程,阻塞隊列中沒有被執行的任務全部丢棄。此時,poolsize=0,阻塞隊列的size也為0。
  • 當所有的任務已終止,ctl記錄的”任務數量”為0,線程池會變為TIDYING狀态。接着會執行terminated()函數。
  • 線程池處在TIDYING狀态時,執行完terminated()方法之後,就會由 TIDYING -> TERMINATED, 線程池被設定為TERMINATED狀态。

線程池主要的任務處理流程

處理任務的核心方法是execute,我們看看 JDK 1.8 源碼中ThreadPoolExecutor是如何處理線程任務的:

// JDK 1.8 
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();   
    int c = ctl.get();
    // 1.目前線程數小于corePoolSize,則調用addWorker建立核心線程執行任務
    if (workerCountOf(c) < corePoolSize) {
       if (addWorker(command, true))
           return;
       c = ctl.get();
    }
    // 2.如果不小于corePoolSize,則将任務添加到workQueue隊列。
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 2.1 如果isRunning傳回false(狀态檢查),則remove這個任務,然後執行拒絕政策。
        if (! isRunning(recheck) && remove(command))
            reject(command);
            // 2.2 線程池處于running狀态,但是沒有線程,則建立線程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3.如果放入workQueue失敗,則建立非核心線程執行任務,
    // 如果這時建立非核心線程失敗(目前線程總數不小于maximumPoolSize時),就會執行拒絕政策。
    else if (!addWorker(command, false))
         reject(command);
}
複制代碼           

ctl.get()是擷取線程池狀态,用int類型表示。第二步中,入隊前進行了一次isRunning判斷,入隊之後,又進行了一次isRunning判斷。

為什麼要二次檢查線程池的狀态?

在多線程的環境下,線程池的狀态是時刻發生變化的。很有可能剛擷取線程池狀态後線程池狀态就改變了。判斷是否将command加入workqueue是線程池之前的狀态。倘若沒有二次檢查,萬一線程池處于非RUNNING狀态(在多線程環境下很有可能發生),那麼command永遠不會執行。

總結一下處理流程

  1. 線程總數量 < corePoolSize,無論線程是否空閑,都會建立一個核心線程執行任務(讓核心線程數量快速達到corePoolSize,在核心線程數量 < corePoolSize時)。注意,這一步需要獲得全局鎖。
  2. 線程總數量 >= corePoolSize時,新來的線程任務會進入任務隊列中等待,然後空閑的核心線程會依次去緩存隊列中取任務來執行(展現了線程複用)。
  3. 當緩存隊列滿了,說明這個時候任務已經多到爆棚,需要一些“臨時工”來執行這些任務了。于是會建立非核心線程去執行這個任務。注意,這一步需要獲得全局鎖。
  4. 緩存隊列滿了, 且總線程數達到了maximumPoolSize,則會采取上面提到的拒絕政策進行處理。

整個過程如圖所示:

一文弄懂Java中線程池原理

ThreadPoolExecutor如何做到線程複用的?

我們知道,一個線程在建立的時候會指定一個線程任務,當執行完這個線程任務之後,線程自動銷毀。但是線程池卻可以複用線程,即一個線程執行完線程任務後不銷毀,繼續執行另外的線程任務。那麼,線程池如何做到線程複用呢?

原來,ThreadPoolExecutor在建立線程時,會将線程封裝成工作線程worker,并放入工作線程組中,然後這個worker反複從阻塞隊列中拿任務去執行。

這裡的addWorker方法是在上面提到的execute方法裡面調用的,先看看上半部分:

// ThreadPoolExecutor.addWorker方法源碼上半部分
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                // 1.如果core是ture,證明需要建立的線程為核心線程,則先判斷目前線程是否大于核心線程
                // 如果core是false,證明需要建立的是非核心線程,則先判斷目前線程數是否大于總線程數
                // 如果不小于,則傳回false
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
複制代碼           

上半部分主要是判斷線程數量是否超出門檻值,超過了就傳回false。我們繼續看下半部分:

// ThreadPoolExecutor.addWorker方法源碼下半部分
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 1.建立一個worker對象
        w = new Worker(firstTask);
        // 2.執行個體化一個Thread對象
        final Thread t = w.thread;
        if (t != null) {
            // 3.線程池全局鎖
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 4.啟動這個線程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
複制代碼           

建立worker對象,并初始化一個Thread對象,然後啟動這個線程對象。

我們接着看看Worker類,僅展示部分源碼:

// Worker類部分源碼
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    final Thread thread;
    Runnable firstTask;

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
            runWorker(this);
    }
    //其餘代碼略...
}
複制代碼           

Worker類實作了Runnable接口,是以Worker也是一個線程任務。在構造方法中,建立了一個線程,線程的任務就是自己。故addWorker方法調用addWorker方法源碼下半部分中的第4步t.start,會觸發Worker類的run方法被JVM調用。

我們再看看runWorker的邏輯:

// Worker.runWorker方法源代碼
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 1.線程啟動之後,通過unlock方法釋放鎖
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 2.Worker執行firstTask或從workQueue中擷取任務,如果getTask方法不傳回null,循環不退出
        while (task != null || (task = getTask()) != null) {
            // 2.1進行加鎖操作,保證thread不被其他線程中斷(除非線程池被中斷)
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 2.2檢查線程池狀态,倘若線程池處于中斷狀态,目前線程将中斷。 
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 2.3執行beforeExecute 
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 2.4執行任務
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 2.5執行afterExecute方法 
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                // 2.6解鎖操作
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
複制代碼           

首先去執行建立這個worker時就有的任務,當執行完這個任務後,worker的生命周期并沒有結束,在while循環中,worker會不斷地調用getTask方法從阻塞隊列中擷取任務然後調用task.run()執行任務,進而達到複用線程的目的。隻要getTask方法不傳回null,此線程就不會退出。

當然,核心線程池中建立的線程想要拿到阻塞隊列中的任務,先要判斷線程池的狀态,如果STOP或者TERMINATED,傳回null。

最後看看getTask方法的實作:

// Worker.getTask方法源碼
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // 1.allowCoreThreadTimeOut變量預設是false,核心線程即使空閑也不會被銷毀
        // 如果為true,核心線程在keepAliveTime内仍空閑則會被銷毀。 
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // 2.如果運作線程數超過了最大線程數,但是緩存隊列已經空了,這時遞減worker數量。 
     // 如果有設定允許線程逾時或者線程數量超過了核心線程數量,
        // 并且線程在規定時間内均未poll到任務且隊列為空則遞減worker數量
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 3.如果timed為true(想想哪些情況下timed為true),則會調用workQueue的poll方法擷取任務.
            // 逾時時間是keepAliveTime。如果超過keepAliveTime時長,
            // poll傳回了null,上邊提到的while循序就會退出,線程也就執行完了。
            // 如果timed為false(allowCoreThreadTimeOut為false
            // 且wc > corePoolSize為false),則會調用workQueue的take方法阻塞在目前。
            // 隊列中有任務加入時,線程被喚醒,take方法傳回任務,并執行。
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
複制代碼           

核心線程的會一直卡在workQueue.take方法,被阻塞并挂起,不會占用CPU資源,直到拿到Runnable 然後傳回(當然如果allowCoreThreadTimeOut設定為true,那麼核心線程就會去調用poll方法,因為poll可能會傳回null,是以這時候核心線程滿足逾時條件也會被銷毀)。

非核心線程會workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ,如果逾時還沒有拿到,下一次循環判斷compareAndDecrementWorkerCount就會傳回null,Worker對象的run()方法循環體的判斷為null,任務結束,然後線程被系統回收 。

四種常見的線程池

Executors類中提供的幾個靜态方法來建立線程池。

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
複制代碼           

CacheThreadPool的運作流程如下:

  1. 送出任務進線程池。
  2. 因為corePoolSize為0的關系,不建立核心線程,線程池最大為Integer.MAX_VALUE。
  3. 嘗試将任務添加到SynchronousQueue隊列。
  4. 如果SynchronousQueue入列成功,等待被目前運作的線程空閑後拉取執行。如果目前沒有空閑線程,那麼就建立一個非核心線程,然後從SynchronousQueue拉取任務并在目前線程執行。
  5. 如果SynchronousQueue已有任務在等待,入列操作将會阻塞。

當需要執行很多短時間的任務時,CacheThreadPool的線程複用率比較高, 會顯著的提高性能。而且線程60s後會回收,意味着即使沒有任務進來,CacheThreadPool并不會占用很多資源。

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
複制代碼           

核心線程數量和總線程數量相等,都是傳入的參數nThreads,是以隻能建立核心線程,不能建立非核心線程。因為LinkedBlockingQueue的預設大小是Integer.MAX_VALUE,故如果核心線程空閑,則交給核心線程處理;如果核心線程不空閑,則入列等待,直到核心線程空閑。

與CachedThreadPool的差別:

  • 因為 corePoolSize == maximumPoolSize ,是以FixedThreadPool隻會建立核心線程。 而CachedThreadPool因為corePoolSize=0,是以隻會建立非核心線程。
  • 在 getTask() 方法,如果隊列裡沒有任務可取,線程會一直阻塞在 LinkedBlockingQueue.take() ,線程不會被回收。 CachedThreadPool會在60s後收回。
  • 由于線程不會被回收,會一直卡在阻塞,是以沒有任務的情況下, FixedThreadPool占用資源更多。
  • 都幾乎不會觸發拒絕政策,但是原理不同。FixedThreadPool是因為阻塞隊列可以很大(最大為Integer最大值),故幾乎不會觸發拒絕政策;CachedThreadPool是因為線程池很大(最大為Integer最大值),幾乎不會導緻線程數量大于最大線程數,故幾乎不會觸發拒絕政策。

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
複制代碼           

有且僅有一個核心線程( corePoolSize == maximumPoolSize=1),使用了LinkedBlockingQueue(容量很大),是以,不會建立非核心線程。所有任務按照先來先執行的順序執行。如果這個唯一的線程不空閑,那麼新來的任務會存儲在任務隊列裡等待執行。

newScheduledThreadPool

建立一個定長線程池,支援定時及周期性任務執行。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}
複制代碼           

四種常見的線程池基本夠我們使用了,但是《阿裡巴巴開發手冊》不建議我們直接使用Executors類中的線程池,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學需要更加明确線程池的運作規則,規避資源耗盡的風險。

但如果你及團隊本身對線程池非常熟悉,又确定業務規模不會大到資源耗盡的程度(比如線程數量或任務隊列長度可能達到Integer.MAX_VALUE)時,其實是可以使用JDK提供的這幾個接口的,它能讓我們的代碼具有更強的可讀性。

小結

在工作中,很多人因為不了解線程池的實作原理,把線程池配置錯誤,進而導緻各種問題。希望你們閱讀完本文,能夠學會合理的使用線程池。

連結:https://juejin.cn/post/7171030896307339295