天天看點

【并發進階】全面解析線程池原理

 大家好,我是Coder哥,今天我們來聊聊線程池。

  關于線程池使用方法的文章太多了,這裡就不多啰嗦了,今天我們來聊細節,我知道大家對于如何使用線程池肯定比我熟悉,但是線程池建立流程的幾個關鍵節點的政策你知道嗎?比如第一次啟動時候,核心線程數的建立是建立滿還是先複用?空閑線程是如何釋放的?等,具體有如下幾個問題:

  1. 核心線程的建立政策是什麼?比如核心線程數設定為5,我先送出一個任務,執行完後又送出一個任務,這個時候線程池裡面有幾個核心線程?是2個還是1個呢?
  2. 當隊列滿的時候送出任務,會建立最大核心線程數相關線程,那麼這個線程是從隊列裡面取還是直接用這個剛送出的任務呢?
  3. 線程池的拒絕時機是什麼?
  4. 線程池是如何實作線程的重複利用的?
  5. 空閑線程是如何釋放的?
  6. 核心線程是否能夠回收?
良心提醒:涉及到源碼,可能需要時間比較長,對于以上幾個問題如果你都比較清楚,就不用往下看了,避免浪費時間。如果暫時沒時間可以先收藏哦

線程池概述

我們先簡單的複習一下線程池參數及流程

線程池的6個參數

【并發進階】全面解析線程池原理

  首先,我們先來看下線程池中各個參數的含義,如表所示線程池主要有 6 個參數,其中第 3 個參數由

keepAliveTime + 時間機關

組成。

corePoolSize

是核心線程數,也就是常駐線程池的線程數量,與它對應的是

maximumPoolSize

表示線程池最大線程數量,當我們的任務特别多而 corePoolSize 核心線程數無法滿足需求并且任務隊列存放滿的時候,就會向線程池中增加線程,以便應對任務突增的情況。

線程的建立流程

【并發進階】全面解析線程池原理

  如上圖所示,當送出任務後,線程池首先會檢查目前線程數,如果此時線程數小于核心線程數,則建立線程并執行任務,随着任務的不斷增加,線程數會逐漸增加并達到核心線程數,此時如果仍有任務被不斷送出,就會被放入 workQueue 任務隊列中,等待核心線程執行完目前任務後重新從 workQueue 中提取正在等待被執行的任務。

此時,如果我們的任務特别多,達到了workQueue的容量上限,線程池就會繼續建立非核心線程來執行任務也就是maximumPoolSize控制的最大線程數,假設任務仍然不斷送出,線程池會繼續建立非核心線程來執行任務,當線程數達到maximumPoolSize規定的上限時,線程池就會拒絕這些任務,也就是執行線程的拒絕政策。

線程的整體建立流程如上面的描述,但是建立的細節我們需要結合源碼來分析了。

線程池核心分析

線程池組成及功能

【并發進階】全面解析線程池原理

如圖,我們思考一下,如果我們自己實作線程池會怎麼實作,顧名思義,線程池就是存放線程的池子,為了達到複用、資源控制等效果,首先我們需要一個線程的生産工廠,其次我們需要一個管理線程的地方來管理線程的建立、執行、複用、銷毀、拒絕等功能,然後還要個能排隊的任務隊列來排隊等待執行任務,最後我們還需要實作不同的拒絕政策來處理超出線程城池自身處理能力的任務

  • ThreadFactory: 線程的生産工廠,通過方法

    Thread newThread(Runnable r)

    來建立線程。
  • ThreadPoolExecutor: 線程池的實作類用來管理線程池的建立、執行、複用、銷毀、拒絕等功能。
  • BlockingQueueworkQueue: 存放任務的隊列。
  • **RejectedExecutionHandler: ** 拒絕政策接口,當線程及任務滿的時候,拒絕任務送出的政策。
  • **Worker: **複用的工作線程,存放到

    ThreadPoolExecutor

    中的

    HashSet<Worker>

    裡面。

帶着上面的基本概念,我們來結合源碼深入的分析一下

1. 啟動源碼分析

public static void main(String[] args) throws InterruptedException {
    //建立線程池 核心線程數為2,最大線程數為3的線程池
    ThreadPoolExecutor executorService = new ThreadPoolExecutor(
            2,    // corePoolSize
            3,    // maximumPoolSize
            60L,  // keepAliveTime
            TimeUnit.SECONDS, //機關
            new LinkedBlockingDeque<>(3), // workQueue
            Executors.defaultThreadFactory()); // threadFactory
    //線程池送出一個任務并執行
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            System.out.println("線程池建立線程");
        }
    });
}
           

複制

我們建立一個參數如代碼所示的線程池,并送出一個

Runnable

任務,接下來我們看一下submit對應的源碼:

public Future<?> submit(Runnable task) {
    RunnableFuture<Void> ftask = new FutureTask<T>(task, null);
    execute(ftask);
    return ftask;
}
// 執行線程的方法
public void execute(Runnable command) {
    // 目前線程池線程的數量
    int c = ctl.get();
   // 1. 擷取目前線程池的線程數,如果小于核心線程數,建立核心線程 `addWorker(command, true)`
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
    }
   // 2. 如果目前線程池是運作狀态并且工作線程數量大于等于核心線程數,把任務添加到隊列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
       // 4. 如果目前線程池沒有在運作(線程池調用shutdown()方法了),直接清除任務并執行拒絕政策
        if (! isRunning(recheck) && remove(command))
            reject(command);
       // 5. 否則如果線程池在運作并且工作線程數量為0,則建立非核心線程(比如 corePoolSize設定為0的時候,會走這裡直接建立非核心線程)
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
   /* 3. 否則這裡分兩種情況,
      第一(isRunning(c)==false): 那麼addWorker() 會傳回 false 直接執行拒絕政策
      第二 (workQueue.offer()==false): 那麼說明隊列滿了,這個時候 addWorker() 會傳回 true
   */
    else if (!addWorker(command, false))
        reject(command);
}
           

複制

我們看一下上面的代碼步驟,正常流程建立

1、2、3

  1. 如果小于核心線程數,建立核心線程

    addWorker(command, true)

  2. 如果核心線程數滿了,就把任務添加到隊列
  3. 如果隊列滿了,才會建立非核心線程。

基于這個正常的流程,我們來回顧一下開篇提到的

問題1,問題2

問題1:

由代碼可知,隻要工作線程數小于核心線程數,不管工作線程是否有空,都會直接建立工作線程并直接任務。

問題2:

當隊列滿的時候,會直接

addWorker(command,false)

建立非核心線程并把任務傳給它直接執行。

我們再來看一下

問題3

拒絕政策觸發的時機,我們結合流程

1、2、3

1、2、4

來看看。

流程1、2、3

:這個就是當工作線程數最大并且隊列滿的時候,添加線程會失敗,觸發拒絕政策

reject(command)

流程1、2、3或4

:如果線程池在運作的時候,突然有人調用線程池的shutdown方法了,這個時候

isRunning(c)==false

就會觸發拒絕政策,或者在進入

4

之前shutdown了,那麼會進入

3

這時

addWorker(command, false)會傳回false

,也會觸發拒絕政策。

拒絕政策觸發時機小結:
  1. 線程池滿了,任務隊列滿了會觸發拒絕政策。
  2. 當線程池在調用shutdown方法的時候,如果繼續添加任務也會觸發拒絕政策。

2. Worker線程複用源碼分析

在上面啟動源碼的時候我們分析了,如果任務多了把任務放到

BlockingQueue

隊列裡面,其實複用就是工作線程一直從隊列裡面取任務,然後執行。我們來看一下

addWorker()

方法的源碼

private boolean addWorker(Runnable firstTask, boolean core) {
    ...
    boolean workerStarted = false;
    boolean workerAdded = false;
    ThreadPoolExecutor.Worker w = null;
    try {
       // 1. 建立Worker 線程,這裡注意點:在Worker裡面的線程是通過我們最開始傳入的線程工廠建立的
        w = new Worker(firstTask);
        final Thread t = w.thread;
       ... //這裡省略其他邏輯
        workers.add(w);
       workerAdded = true;
       if (workerAdded) {
          // 2. 啟動線程, 這裡的start() 就是線程啟動,我們看Worker類裡面把this傳遞給thread了
          // 是以這裡start()其實調用的是 Worker裡面的 run() 方法。
          t.start();
          workerStarted = true;
        }
       ...
    return workerStarted;
}

class Worker extends AbstractQueuedSynchronizer implements Runnable
    // Worker類的構造方法, 并且Worker是 實作的 Runnable接口
    Worker(Runnable firstTask) {
        this.firstTask = firstTask;
        // 這裡其實 <=> this.thread = new Thread(this);
        this.thread = getThreadFactory().newThread(this);
    }
   // 3. 接着上面第2步 start() 方法過來
   public void run() {
      runWorker(this);
  }
   final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
       // 4. 拿到真正的任務
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
           // 5. 這裡如果task為null, 就從 隊列裡面循環擷取,這裡就是通過 getTask() 來擷取的
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // 這裡相應線程中斷,或者線程停止
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    try {
                       // 6. 啟動任務
                        task.run();
                    } catch (Throwable ex) {
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.unlock();
                }
            }
           // 工作線程中斷或者異常跳出,會觸發Worker線程回收工作
            completedAbruptly = false;
        } finally {
           // 釋放Worker線程,具體源碼可自行查閱
            processWorkerExit(w, completedAbruptly);
        }
    }
}
           

複制

我們看了上面的代碼,省去了大部分的非主流程的代碼,我們可以看到,建立Worker線程的時候, 線程工廠會把Worder 自身傳遞給

Thread()

, 這樣在第

2

步的時候,

t.start()

啟動線程就會異步觸發 Worker中的

run()

方法,在

runWorker()

方法中有個

while循環

一直的從隊列中擷取任務并運作,這裡其實就是線程池中線程在重複的取任務執行任務,如此循環往複

這個流程也解釋了最開始的

問題4

已經清晰了。

對于這部分代碼t.start() 啟動為啥調用 run()方法可以看一下我之前的文章。

3. 空閑線程是如何釋放的?

至此線程池建立,複用,拒絕都聊過,最後我們來看一下非工作線程是怎麼釋放的呢,在Worker線程複用源碼分析中我們看到,

runWorker()

方法中有個while循環在一直循環的擷取任務執行任務,當跳出循環的時候,會執行

processWorkerExit(w, completedAbruptly)

方法,這個方法就是釋放線程的,但是跳出循環的時機是什麼呢?

上面代碼我們可以看到,當Worker線程被中斷或者狀态為STOP的時候會跳出,這兩個是線程池調用shutdown方法的時候觸發的,咱這裡先不考慮這個情況,除了這種情況還有個條件在

while((task = getTask()) != null)

中,就是說擷取不到

getTask()

的時候也會跳出循環釋放線程,那麼我們來看看

getTask()

的源碼

private Runnable getTask() {
   //逾時标志
    boolean timedOut = false;
   // 循環
    for (;;) {
        int c = ctl.get();
        //1. 線程池狀态為 SHUTDOWN并且隊列為空或者STOP的時候會傳回 null, 釋放目前線程
        if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
           ...
            return null;
        }
    // 2. 擷取目前線程池中線程的數量
        int wc = workerCountOf(c);

        // 3. 當允許核心線程逾時或者 目前線程數量大于核心線程數時 timed = true
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    // 4. timed == true 并且 timeOut == true 并且 隊列任務為空且線程池線程存在的情況下,傳回null, 釋放目前線程
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
           // Worker 線程數減一操作
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
           // 5. 這裡是關鍵:timed==true 意思是線程數大于核心線程數的時候,從隊列裡面取值并加了個 keepAliveTime 逾時時間,如果超過這個時間還沒取到任務,就timedOut=true, 然後再次循環的時候,上面第4步的if 條件就滿足了,就會return null, 然後就會釋放線程了。
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
           

複制

其實到這一步我們可以看出,釋放線程的原理是,上面代碼的

第5步

,也就是timed==true 意思是線程數大于核心線程數的時候,從隊列裡面取值并加了個 keepAliveTime 逾時時間,如果超過這個時間還沒取到任務,就timedOut=true, 然後再次循環的時候,上面第4步的if 條件就滿足了,就會return null, 然後就會釋放線程了。

這裡還有個點是,allowCoreThreadTimeOut這個參數來控制是否能釋放核心線程數,預設是 false,可以通過

allowCoreThreadTimeOut(boolean value)

方法來設定值,如果設定為

true

,意味着上面代碼的

第3步

不會判斷目前線程數>核心線程數這個條件,也就是說,線程池中隻要有線程就可以釋放。

至此對于最開始的

問題5,問題6

已經清晰了。

最後

 感謝各位能看到最後,希望本篇的内容對你有幫助,有什麼意見或者建議可以留言一起讨論,看到後第一時間回複,也希望大家能給個贊,你的贊就是我寫文章的動力,再次感謝。