天天看點

當面試官問線程池時,你應該知道些什麼?

Java面試中,線程池也算是一個高頻的問題,其實就JDK源碼來看線程池這一塊的實作代碼應該算是寫的清晰易懂的,通過這篇文章,我們就來盤點一下線程池的知識點。

本文基于JDK1.8源碼進行分析

首先看下線程池構造函數:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        //忽略指派與校驗邏輯
    }
      

構造參數比較多,一個一個說下:

  • corePoolSize線程池中的核心線程數
  • maximumPoolSize線程池中的最大線程數
  • keepAliveTime線程池中的線程存活時間(準确來說應該是沒有任務執行時的回收時間,後面會分析)
  • unit時間機關
  • workQueue來不及執行的任務存放的阻塞隊列
  • threadFactory建立woker線程(注意不是我們送出的任務)是進行一些屬性設定,比如線程名,優先級等等,有預設實作。
  • handler 任務拒絕政策,當運作線程數已達到maximumPoolSize,隊列也已經裝滿時會調用該參數拒絕任務,有預設實作。

當我們向線程池送出任務時,通常使用execute方法,接下來就先從該方法開始分析。

在分析execute代碼之前,需要先說明下,我們都知道線程池是維護了一批線程來處理使用者送出的任務,達到線程複用的目的,線程池維護的這批線程被封裝成了Worker。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //JDK8的源碼中,線程池本身的狀态跟worker數量使用同一個變量ctl來維護
        int c = ctl.get();
        //通過位運算得出當然線程池中的worker數量與構造參數corePoolSize進行比較
        if (workerCountOf(c) < corePoolSize) {
            //如果小于corePoolSize,則直接新增一個worker,并把當然使用者送出的任務command作為參數,如果成功則傳回。
            if (addWorker(command, true))
                return;
            //如果失敗,則擷取最新的線程池資料
            c = ctl.get();
        }
        //如果線程池仍在運作,則把任務放到阻塞隊列中等待執行。
        if (isRunning(c) && workQueue.offer(command)) {
            //這裡的recheck思路是為了處理并發問題
            int recheck = ctl.get();
            //當任務成功放入隊列時,如果recheck發現線程池已經不再運作了則從隊列中把任務删除
            if (! isRunning(recheck) && remove(command))
                //删除成功以後,會調用構造參數傳入的拒絕政策。
                reject(command);
             //如果worker的數量為0(此時隊列中可能有任務沒有執行),則建立一個worker(由于此時建立woker的目的是執行隊列中堆積的任務,
             //是以入參沒有執行任務,詳細邏輯後面會詳細分析addWorker方法)。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果前面的新增woker,放入隊列都失敗,則會繼續新增worker,此時線程池的狀态是woker數量達到corePoolSize,阻塞隊列任務已滿
        //隻能基于maximumPoolSize參數建立woker
        else if (!addWorker(command, false))
            //如果基于maximumPoolSize建立woker失敗,此時是線程池中線程數已達到上限,隊列已滿,則調用構造參數中傳入的拒絕政策
            reject(command);
    }
      

源碼裡我增加了很多注釋,需要多讀幾遍才能完全了解,總結一下使用者向線程池送出任務以後,線程池的執行邏輯:

  • 如果目前woker數量小于corePoolSize,則建立一個woker并把目前任務配置設定給該woker線程,成功則傳回。
  • 如果第一步失敗,則嘗試把任務放入阻塞隊列,如果成功則傳回。
  • 如果第二步失敗,則判斷如果目前woker數量小于maximumPoolSize,則建立一個woker并把目前任務配置設定給該woker線程,成功則傳回。
  • 如果第三步失敗,則調用拒絕政策處理該任務。

從execute的源碼可以看出addWorker方法是重中之重,馬上來看下它的實作。

addWorker方法:

private boolean addWorker(Runnable firstTask, boolean core) {
        //這裡有一段基于CAS+死循環實作的關于線程池狀态,線程數量的校驗與更新邏輯就先忽略了,重點看主流程。
        //...

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
             //把指定任務作為參數建立一個worker線程
            w = new Worker(firstTask);
            //這裡是重點,咋一看,一定以為w.thread就是我們傳入的firstTask
            //其實是通過線程池構造函數參數threadFactory生成的woker對象
            //也就是說這個變量t就是代表woker線程。絕對不是使用者送出的線程任務firstTask!!!
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    //加鎖之後仍舊是判斷線程池狀态等一些校驗邏輯。
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                        //把建立的woker線程放入集合儲存,這裡使用的是HashSet
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //然後啟動woker線程
                    //這裡再強調一遍上面說的邏輯,該變量t代表woker線程,也就是會調用woker的run方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                //如果woker啟動失敗,則進行一些善後工作,比如說修改目前woker數量等等
                addWorkerFailed(w);
        }
        return workerStarted;
    }
      

addWorker方法主要做的工作就是建立一個Woker線程,加入到woker集合中,然後啟動該線程,那麼接下來的重點就是Woker類的run方法了。

worker執行方法:

//Woker類實作了Runnable接口
public void run() {
            runWorker(this);
        }

//最終woker執行邏輯走到了這裡
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //task就是Woker構造函數入參指定的任務,即使用者送出的任務
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); 
        boolean completedAbruptly = true;
        try {
            //一般情況下,task都不會為空(特殊情況上面注釋中也說明了),是以會直接進入循環體中
            //這裡getTask方法是要重點說明的,它的實作跟我們構造參數設定存活時間有關
            //我們都知道構造參數設定的時間代表了線程池中的線程,即woker線程的存活時間,如果到期則回收woker線程,這個邏輯的實作就在getTask中。
            //來不及執行的任務,線程池會放入一個阻塞隊列,getTask方法就是去阻塞隊列中取任務,使用者設定的存活時間,就是
            //從這個阻塞隊列中取任務等待的最大時間,如果getTask傳回null,意思就是woker等待了指定時間仍然沒有
            //取到任務,此時就會跳過循環體,進入woker線程的銷毀邏輯。
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //該方法是個空的實作,如果有需要使用者可以自己繼承該類進行實作
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //真正的任務執行邏輯
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //該方法是個空的實作,如果有需要使用者可以自己繼承該類進行實作
                        afterExecute(task, thrown);
                    }
                } finally {
                    //這裡設為null,也就是循環體再執行的時候會調用getTask方法
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //當指定任務執行完成,阻塞隊列中也取不到可執行任務時,會進入這裡,做一些善後工作,比如在corePoolSize跟maximumPoolSize之間的woker會進行回收
            processWorkerExit(w, completedAbruptly);
        }
    }
      

woker線程的執行流程就是首先執行初始化時配置設定給的任務,執行完成以後會嘗試從阻塞隊列中擷取可執行的任務,如果指定時間内仍然沒有任務可以執行,則進入銷毀邏輯。

注:這裡隻會回收corePoolSize與maximumPoolSize直接的那部分woker

了解了整個線程池的運作原理以後,再來看下JDK預設提供的線程池類型就會一目了然了:

public static ExecutorService newFixedThreadPool(int nThreads) {
        //corePoolSize跟maximumPoolSize值一樣,同時傳入一個無界阻塞隊列
        //根據上面分析的woker回收邏輯,該線程池的線程會維持在指定線程數,不會進行回收
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
      
public static ExecutorService newSingleThreadExecutor() {
        //線程池中隻有一個線程進行任務執行,其他的都放入阻塞隊列
        //外面包裝的FinalizableDelegatedExecutorService類實作了finalize方法,在JVM垃圾回收的時候會關閉線程池
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
      
public static ExecutorService newCachedThreadPool() {
        //這個線程池corePoolSize為0,maximumPoolSize為Integer.MAX_VALUE,意思也就是說來一個任務就建立一個woker,回收時間是60s
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
      

最後再說說初始化線程池時線程數的選擇:

  • 如果任務是IO密集型,一般線程數需要設定2倍CPU數以上,以此來盡量利用CPU資源。
  • 如果任務是CPU密集型,一般線程數量隻需要設定CPU數加1即可,更多的線程數也隻能增加上下文切換,不能增加CPU使用率。

上述隻是一個基本思想,如果真的需要精确的控制,還是需要上線以後觀察線程池中線程數量跟隊列的情況來定。