天天看點

Java多線程之ThreadPoolExecutor實作原理和源碼分析(五)

章節概覽、

  • Java多線程之章節概覽

1、概述

線程池的顧名思義,就是線程的一個集合。需要用到線程,從集合裡面取出即可。這樣設計主要的作用是優化線程的建立和銷毀而造成的資源浪費的情況。Java中的線程池的實作主要是JUC下面的ThreadPoolExecutor類完成的。下面我們做的源碼分析都是基于ThreadPoolExecutor類進行分析。

2、線程池實作類圖UML

Java多線程之ThreadPoolExecutor實作原理和源碼分析(五)

從類繼承圖可以看到,ThreadPoolExecutor 繼承 AbstractExecutorService 抽象類。而AbstractExecutorService 實作了ExecutorService接口。ExecutorService 接口又繼承了Executor接口。下面分析下這幾個接口。

2.1、 核心接口分析

2.1.1、 Executor接口源碼分析

public interface Executor {
// 執行一個任務。任務都被封裝成Runnable的實作
    void execute(Runnable command);
}
           

2.1.2 ExecutorService接口源碼分析

public interface ExecutorService extends Executor {
	
// 啟動有序的關閉,之前送出的任務将會被執行,但不會接受新的任務。
    void shutdown();

// 嘗試停止所有正在執行的任務,停止等待處理的任務,病傳回任務清單
    List<Runnable> shutdownNow();

// 判斷線程池是否已經關閉
    boolean isShutdown();

// 如果關閉後所有任務都已完成。 但是前提是必須先執行:shutdown 或者 shutdownNow
    boolean isTerminated();

// 在開啟shutdown之後,阻止所有的任務知道執行完成
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
        
// 送出任務,帶傳回結果的
    <T> Future<T> submit(Callable<T> task);

// 送出任務,封裝傳回結果為T
    <T> Future<T> submit(Runnable task, T result);

 // 送出一個普通任務,傳回結果任意
    Future<?> submit(Runnable task);

// 執行一批任務,傳回結果為 List<Future<T>>
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
           
2.2 核心内部類分析

從繼承圖來看,其具有5個核心的内部類。其中4内部類對應的是拒絕政策。Worker是核心的執行代碼。下面我們看下拒絕政策類的結構以及政策的運用場景

2.2.1、 RejectedExecutionHandler 接口

public interface RejectedExecutionHandler {
// 拒絕執行政策
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
           

2.2.2、 AbortPolicy 政策

Java線程池預設的阻塞政策,不執行此任務,而且直接抛出一個運作時異常。

public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
        // 直接抛出異常,描述前線程的基本資訊
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
           

2.2.3、DiscardPolicy政策

空方法,不做任何處理

public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
        
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
           

2.2.4、DiscardOldestPolicy 政策

從隊列裡面抛棄一個最老的任務,并再次execute 此task

public static class DiscardOldestPolicy implements RejectedExecutionHandler {

        public DiscardOldestPolicy() { }
       
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
            	// 從隊列裡面取出最老的一個任務
                e.getQueue().poll();
                // 手動調用execute方法執行,将任務添加到隊列中
                e.execute(r);
            }
        }
    }
           

2.2.5、CallerRunsPolicy 政策

public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

      // 如果目前線程池沒有關閉,則調用線程的run方法
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
           

3、ThreadPoolExecutor構造函數核心成員變量分析

3.1、構造函數詳解
public class ThreadPoolExecutor extends AbstractExecutorService {
	public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
}
           

構造函數參數說明:

  • corePoolSize

    線程池中的核心線程數,空閑時候線程也不會回收,除非把allowCoreThreadTimeOut設定為 true,這時核心線程才會被回收。

  • maximumPoolSize

    線程池中可以建立的最大線程數,限定為2^29-1。

  • keepAliveTime

    當線程池中建立的線程超過了核心線程數的時候,在沒有新任務加入的等待時間。

  • unit

    keepAliveTime的時間機關,可以是納秒,微秒,毫秒,秒,分鐘,小時,天。

  • workQueue

    存放任務的隊列,隻有當線程數 > 核心線程數,才會把其他的任務放入queue,一般常用的是queue就是ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue。

  • threadFactory

    建立線程的工廠類。

  • handler

    當queue滿了和線程數達到最大限制,對于繼續到達的任務采取的政策。預設采取AbortPolicy , 也就是拒絕政策,直接抛出異常

3.2、核心成員變量分析

線程池中設計非常巧妙的一個地方是把線程池的狀态和運作的線程數量用一個int類型進行存儲。這樣一來可以保持線程池狀态和線程池活躍線程數量的一緻性。因為AtomicInteger是線程安全的。

  1. workerCount:線程池中目前活動的線程數量,占據ctl的低29位;
  2. runState:線程池運作狀态,占據ctl的高3位,有RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五種狀态

為了将線程池的狀态和線程池中的工作線程的數量放到一個int裡面進行管理。他們利用了二進制資料進行位運算。其中int類型有4個位元組,一個位元組8位。總共有32位。其中高的3位表示線程的狀态。低29位代表線程的數量。

其中32位中,高三位代表的是狀态:

  • 111 > RUNNING
  • 000 > SHUTDOWN
  • 001 > STOP
  • 010 > TIDYING
  • 110 > TERMINATED

低29位代表線程的數量。是以最大的線程數為 2^29 -1 = 536870911

// 記錄線程池狀态和線程數量(總共32位,前三位表示線程池狀态,後29位表示線程數量),保證線程安全性
	private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// int 位元組32位,COUNT_BITS代表的是29位
    private static final int COUNT_BITS = Integer.SIZE - 3;
// 線程的最大容量: 000 11111111111111111111111111111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// 運作狀态: 111 00000000000000000000000000000
    private static final int RUNNING    = -1 << COUNT_BITS;
// 關閉狀态: 000 00000000000000000000000000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 停止狀态: 001 00000000000000000000000000000
    private static final int STOP       =  1 << COUNT_BITS;
// 整理狀态: 010 00000000000000000000000000000
    private static final int TIDYING    =  2 << COUNT_BITS;
// 終止狀态: 011 00000000000000000000000000000
    private static final int TERMINATED =  3 << COUNT_BITS;

/**
* 是按位取反的意思,CAPACITY表示的是高位的3個0,和低位的29個1,而~CAPACITY則表示高位的3個1,2低位的9個0,
* 然後再與入參c執行按位與操作,即高3位保持原樣,低29位全部設定為0,也就擷取了線程池的運作狀态runState
*/
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    
/**
* 傳回目前線程的數量。其中c代表線程池的狀态,即是高三位。:
* 而CAPACITY 代表的是線程的容量,即000 11111111111111111111111111111
* c & CAPACITY ,隻有當都為1的時候,才為真,這樣直接舍棄高位
*/
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    
/**
* 傳入的rs表示線程池運作狀态runState,其是高3位有值,低29位全部為0的int,
* 而wc則代表線程池中有效線程的數量workerCount,其為高3位全部為0,而低29位有值得int,
* 将runState和workerCount做或操作|處理,即用runState的高3位,workerCount的低29位填充的數字,而預設傳入的
*/
    private static int ctlOf(int rs, int wc) { return rs | wc; }
           

線程池的狀态轉換:

// 調用了shutdown()方法 
RUNNING -> SHUTDOWN 

// 調用了shutdownNow() 
(RUNNING 或 SHUTDOWN) -> STOP 

// 當隊列和線程池為空 
SHUTDOWN -> TIDYING 

// 當線程池為空 
STOP -> TIDYING 

// 當terminated()鈎子方法執行完成 
TIDYING -> TERMINATED 
           

4、執行流程核心源碼分析

4.1、程式入口:execute 方法
/*
 * 我們以execute 方法作為程式的入口開始分析
 */
public void execute(Runnable command) {
// 判斷目前任務是否為null,如果為null,直接抛出異常
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         * 有以下3個步驟
         *
         * 1.如果少于corePoolSize的線程在運作,那麼試着啟動一個新線程,其中用給定指令作為first task。
         * 這會調用addWorker去原子性得檢查runState和workerCoune,是以可以防止錯誤報警,在錯誤報警不應該時通過傳回false來添加線程
         * 2.如果任務被成功排隊,我們任然應該第二次檢查是否添加一個新線程(因為可能存在在最後一次檢查後挂掉的情況)
         * 或者在進入這個方法期間線程池shutdown。是以我們再次檢查狀态,如果已關閉和有必要則退出隊列,或者如果沒有的話就開始一個新的線程。
         * 3.如果我們無法将task入隊,那麼我們試圖添加新線程。如果失敗,那麼知道我們shutdown或者是飽和的并拒絕task。
         */
		
      // 擷取ctl的初始值。其初始值是:rs | wc,即狀态位和線程數量高低位互補  
        int c = ctl.get();
        // 擷取目前線程的數量,初始化的時候數量為0,和目前 corePoolSize 比較
        if (workerCountOf(c) < corePoolSize) {
       	    // 如果條件成立,調用addWorker(command, true)
       	    // 源碼分析請看:4.2、boolean addWorker(Runnable firstTask, boolean core)
       	    // 從addWorker源碼分析有得出,隻要目前的workerCountOf(c) < corePoolSize 條件成立,就會往線程池裡面加入一個線程
            // 目前加入的線程會被初始化到Worker中,通過firstTask進行設定
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 目前線程池的數量達到corePoolSize的時候
        // 驗證目前線程池是否處于運作狀态。如果處于運作狀态。将目前的任務添加到任務隊列中。
        // offer方法添加一個元素并傳回為true。如果隊列已滿,這傳回false
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 添加任務到隊列成功以後,在此判斷目前線程池是否運作狀态
            // 如果線程池沒有處于運作狀态,則從隊列中移除目前任務,同時執行拒絕政策
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 由于存在核心線程的過期政策,可能這個時候目前線程池中的線程已經都過期清理了
            // 是以這裡進一步的進行檢測,擷取目前線程的個數。如果線程個數為0的話,則建立一個線程worker
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 如果目前線程池運作正常,且添加任務到隊列失敗。這時重新啟動一個worker線程去執行
        // 此時線程池的最多的線程數量,wc < maximumPoolSize
        // 嘗試直接添一個新的worker線程。如果添加失敗,執行拒絕政策
        else if (!addWorker(command, false))
            reject(command);
    }
           
4.2、 boolean addWorker(Runnable firstTask, boolean core)

addWorker 中的兩個方法參數,firstTask代表目前需要執行的任務。

core的含義有如下:

  1. 如果滿足workerCountOf© < corePoolSize ,則為true
  2. 如果不滿足 workerCountOf© < corePoolSize 且添加任務workQueue.offer(command) 失敗。這時候傳入的為false。
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
           // 擷取目前ctl的最新值
            int c = ctl.get();
            // 擷取目前線程池的運作狀态
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            // 判斷目前線程是否是否已經結束。檢查目前隊列是否為null
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                // 擷取目前線程的數量
                int wc = workerCountOf(c);
                // 判斷目前線程的數量是否超過最大值
 				// 這裡從core的細節上面已經說明。如果為true 則使用 corePoolSize,反之使用 maximumPoolSize
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
               // 通過CAS設定目前的workerCount:ctl.compareAndSet(expect, expect + 1);
               // 目前的線程數量 + 1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
           // 初始化Worker對象,傳入第一個需要執行的任務
           // Woker類的分析,請看: 4.3、Woker類源碼分析
            w = new Worker(firstTask);
            // 擷取 worker對象内部封裝的thread線程
            final Thread t = w.thread;
            if (t != null) {
                // 同步代碼塊,保證目前線程池狀态的一緻性。因為workers是共享變量
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
          
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 檢查目前線程是否被啟動。
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 将目前線程加入到線程池中
                        workers.add(w);
                        int s = workers.size();
                        // 重置目前線程池擁有的線程個數
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 添加成功,啟動線程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 添加失敗,做失敗清理
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
           
4.3、Woker類源碼分析
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;
        // 完成的任務數量
        volatile long completedTasks;
        // 構造函數
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            // 初始化成員變量 firstTask
            this.firstTask = firstTask;
            // 初始化目前線程,通過線程工場。具體源碼請參考:4.4、ThreadFactory 源碼分析
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            // 執行目前傳入的任務,具體實作,請參考:4.5、void runWorker(Worker w) 源碼分析
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

           
4.4、ThreadFactory 源碼分析
static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
	
       // 建立線程
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            // 設定目前線程為非背景線程
            if (t.isDaemon())
                t.setDaemon(false);
            // 設定目前線程的優先級為正常優先級
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
           
4.5、void runWorker(Worker w) 源碼分析
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // 當線程池中的線程數量未達到corePoolSize大小的時候,每次都會先建立一個Worker對象,把目前的任務複制給firstTask
        // 直到目前的線程池中的線程數量和corePoolSize大小相等,每次新加任務都會存入到任務隊列中
        Runnable task = w.firstTask;
        // 置空目前的firstTask,主要是為了友善GC
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
           // 目前的firstTask != null 
           // getTask 擷取的任務也不為空,getTask方法詳解,請參考:4.6、Runnable getTask() 源碼分析
            while (task != null || (task = getTask()) != null) {
                // 擷取目前鎖資源
                w.lock();
                // 如果池正在停止,請確定線程被中斷;
				// 如果沒有,請確定線程不被中斷。 這個
				// 需要在第二種情況下重新檢查才能處理
				// shutdown在清除中斷時正在比賽
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                    
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 執行目前的任務
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 置空目前的 task,友善GC
                    task = null;
                    // 目前的完成任務++
                    w.completedTasks++;
                    // 釋放目前的鎖
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
        	// 當判斷條件 task != null || (task = getTask()) != null 不成立的時候,删除目前Woker
            processWorkerExit(w, completedAbruptly);
        }
    }
           
4.6、Runnable getTask() 源碼分析
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 判斷目前的任務隊列是否為null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            // 擷取目前的線程數量
            int wc = workerCountOf(c);

            // Are workers subject to culling?
           
            // 如果目前的 allowCoreThreadTimeOut 設定為 ture,或者 wc > corePoolSize 的情況 為 ture
            // 目前線程隊列已滿,才會出現 wc > corePoolSize的
            // 通過這段設定,用于判斷核心線程空閑時,是否需要清理
            // 其次當線程數高于核心線程數時,是否需要清理線程
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize 的情況

            // 判斷目前的線程數是否大于最大的線程數
            // wc > 1 或者 workQueue為空
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                // 減少目前的線程數量,通過CAS
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 根據  timed判斷,通過哪種方式擷取目前的任務
                Runnable r = timed ?
                    // timed 為false,說明目前線程池線程數量超過了核心數量
                    //  workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 表示等待 keepAliveTime 的時間之後
                    // 沒有任務的話,直接傳回 null。而這個傳回為null,直接影響目前線程是否被回收的前提條件。
                    // 線程循環條件:while (task != null || (task = getTask()) != null) 如果 傳回 task = null。則直接跳出循環
                    // 通過 processWorkerExit(w, completedAbruptly) 進行線程的回收
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    // 移除并傳回隊列的頭部元素,如果隊列為空,則阻塞
                    workQueue.take();
                // 如果 r != null。 則傳回
                if (r != null)
                    return r;
                // 如果擷取失敗,設定目前timeOut為逾時,接着循環
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
           

ThreadPoolExecutor 的核心理财源碼已經分析完了

5、總結

  1. 當往線程池中添加任務的時候,每次添加一個任務都回去新增一個線程。直到不滿足 wc < corePoolSize
  2. 目前線程池的大小已經達到了corePoolSize的時候,每次添加任務會被存放到阻塞任務隊列中。等待執行
  3. 等等待任務隊列也滿的時候,且添加失敗。此時在來新的任務,就會接着增加線程的個數,直到滿足:wc >= maximumPoolSize ,添加線程失敗執行拒絕政策。
  4. 線程池中,把線程的狀态和數量通過int類型進行維護,高三位表示狀态,低29位表示線程數量。這樣可以保證線程的狀态和數量的一緻性

繼續閱讀