天天看點

線程池實作原理-2

前言

線程池實作原理-1

addWorker實作

在看addWorker方法之前,我們先看一個例子,了解一下retry的使用

  1. break retry 跳到retry處,且不再進入循環
  2. continue retry 跳到retry處,且再次進入循環
public static void main(String[] args) {
    breakRetry();
    continueRetry();
}

private static void breakRetry() {
    int i = 0;
    retry:
    for (; ; ) {
        System.out.println("start");
        for (; ; ) {
            i++;
            if (i == 4)
                break retry;
        }
    }
    //start 進入外層循環
    //4
    System.out.println(i);
}

private static void continueRetry() {
    int i = 0;
    retry:
    for(;;) {
        System.out.println("start");
        for(;;) {
            i++;
            if (i == 3)
                continue retry;
            System.out.println("end");
            if (i == 4)
                break retry;
        }
    }
    //start 第一次進入外層循環
    //end i=1輸出
    //end i=2輸出
    //start 再次進入外層循環
    //end i=4輸出
    //4 最後輸出
    System.out.println(i);
}           

複制

這裡說一下Runnable 參數的含義

  1. firstTask != null 說明任務被添加了,我們需要啟動一個線程去執行它
  2. fistTask == null 說明我隻想啟動一個線程去消費阻塞隊列中的任務
// core為ture表示是核心線程,否則非核心線程
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        /**
         * 将條件改為如下形式,友善了解
         * rs >= SHUTDOWN && (rs != SHUTDOWN || fistTask != null || workQueue.isEmpty)
         * 1.如果目前線程池的狀态>SHUTDOWN,addWorker傳回false,添加任務失敗
         * 2.如果目前線程池的狀态=SHUTDOWN,分為如下2種情況
         * (1)workQueue為空,fistTask == null 和fistTask != null的任務都不能
         * (2)workQueue不為空,可以添加fistTask != null的任務
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                // 1.是核心線程 >= corePoolSize
                // 2.非核心線程 >= maximumPoolSize
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 成功将線程數+1,跳到retry處,并且不再進入死循環
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 否則重新讀取ctl
            c = ctl.get();  // Re-read ctl
            // 線程狀态發生改變,跳到retry處,并且進入死循環
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // 線程是否啟動的标志位
    boolean workerStarted = false;
    // 線程封裝成Worker對象,是否添加到線程池中的标志位
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                // 1.rs < SHUTDOWN 即 rs = RUNNING
                // 2.rs == SHUTDOWN && firstTask == null
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    // 重新整理了largestPoolSize
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 留心一下這裡,後面會從這裡開始講起
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}           

複制

仔細了解一下這段代碼,其實就能了解,當線程池處于RUNNING 接受新任務,并且處理進入隊列的任務,處于SHUTDOWN 不接受新任務,處理進入隊列的任務,剩餘狀态都不會處理任務,上面代碼中的注釋有詳細解釋

if (rs >= SHUTDOWN &&
    ! (rs == SHUTDOWN &&
       firstTask == null &&
       ! workQueue.isEmpty()))
    return false;           

複制

線程池在執行任務的時候,會把任務對象包裝成一個Worker對象,Worker對象是ThreadPoolExecutor的一個内部類,繼承了AbstractQueuedSynchronizer,實作了一個獨占鎖,status值為0表示未鎖定狀态,status值為1表示鎖定狀态,實作了Runnable接口,在執行run方法的時候,它執行完初始化的firstTask後,還會從workQueue中取出任務執行,這樣就不用建立一個線程執行任務,而是在一個線程中執行了好幾個任務

Worker内部類

// 省略了一部分對鎖的操作,簡單的對AQS的一個實作
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

}           

複制

runWorker實作

當t.start()被執行後,run方法會執行runWorker方法,來看runWorker方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 允許中斷
    w.unlock(); // allow interrupts
    // 辨別線程是不是異常終止的
    boolean completedAbruptly = true;
    try {
        // 先執行初始化的fistTask,執行完成後還會無限循環擷取workQueue裡的任務來執行
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 配合shutdownNow 方法
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 線程開始開始執行之前執行此方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 線程執行後執行
                    afterExecute(task, thrown);
                }
            } finally {
                // 運作過的置為null
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}           

複制

這個方法需要注意的就是

  1. getTask()從阻塞隊列中擷取任務,如果隊列中沒有任務會被阻塞,并不會占用CPU資源
  2. 可以根據業務需要自定義beforeExecute和afterExecute方法

getTask實作

private Runnable getTask() {
    // 标記擷取頭結點并且移除頭結點的方法是否逾時
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 1.rs >= STOP
        // 2.rs == SHUTDOWN && workQueue.isEmpty()
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 用CAS将線程池中的數量-1,直到成功才會退出
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // 1.核心線程允許被銷毀
        // 2.核心線程數 > corePoolSize
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 1.timeOut為true,表示逾時擷取,workQueue沒有任務,說明線程應該被銷毀,但是還是要 && timed
        // 2.wc > maximumPoolSize肯定要删除線程了
        // 3.workQueue為空可以銷毀線程,此時有可能所有線程都被銷毀了
        // 4.workQueue不為空,隻有wc > 1才能被删除
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // timed為true,超過keepAliveTime還是沒有任務,傳回null
            // timed為false,則一直阻塞等待任務
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}           

複制

processWorkerExit實作

線程執行完畢執行的方法

// processWorkerExit在runWorker結束之後被調用
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果是異常終止,或者被中斷,減少workerCount
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // Transitions to TERMINATED state if either (SHUTDOWN and pool
    // and queue empty) or (STOP and pool empty)
    tryTerminate();

    int c = ctl.get();
    // 狀态為RUNNING或者SHUTDOWN
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 目前核心線程已經夠用了,不用再建立
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 增加一個消費的線程
        addWorker(null, false);
    }
}           

複制

shutdown實作

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 檢查能否操作線程
        checkShutdownAccess();
        // 確定狀态 >= SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 中斷所有的空閑線程
        interruptIdleWorkers();
        // ScheduledThreadPoolExecutor會重寫這個方法,做一些其他的運算
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}           

複制

// 中斷空閑線程
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}           

複制

// onlyOne為true則隻中斷一個空閑線程,否則全部中斷
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // 周遊Worker并執行中斷操作,w.tryLock()保證了正在執行的Worker不會被中斷
            // 因為正在運作的Worker再次擷取鎖會失敗
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}           

複制

這裡需要注意的是不會中斷正在運作的線程,因為正在運作的線程w.tryLock()會傳回false

shutdownNow實作

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 確定狀态 >= STOP
        advanceRunState(STOP);
        // 中斷所有線程
        interruptWorkers();
        // 擷取所有沒有執行完成的task
        // 即将阻塞隊列中的任務放到tasks中 
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}           

複制

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}           

複制

// 這個是Worker内部類的方法
void interruptIfStarted() {
    Thread t;
  // state的初始值為-1,運作到runWorker才允許中斷
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}           

複制

shutdownNow會中斷所有的線程,因為和shutdown相比在中斷之前,不用擷取鎖

tryTerminate實作

// 将狀态轉換到TERMINATED
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // 以下幾種狀态不能轉換為TERMINATED
        // 1.RUNNING狀态
        // 2.TIDYING或TERMINATED
        // 3.SHUTDOWN狀态,但是workQueue不為空
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 讓子類去實作,做一些操作
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}           

複制

從上面可看出狀态轉換的條件

  1. SHUTDOWN想轉化為TIDYING,需要workQueue為空,同時workerCount為0
  2. STOP轉化為TIDYING,需要workerCount為0