天天看點

深入剖析線程池ThreadPoolExecutor 原理

一: 線程池定義:

1. 為什麼會出現線程池?

頻繁建立線程是一個比耗時且有一定的資源開銷,故引入了線程池,将線程通過池子的管理,進行重複利用。

2. 如果建立線程:

a ) 通過Executors輔助類建立線程池的執行個體:

它預設提供了4種線程池政策:

newFixedThreadPool(int nThreads) 建立一個固定數量的線程池,它的緩存任務隊列的大小是Int 的最大值,同時拒絕政策采用抛出異常 RejectedExecutionException

Executors 傳回的線程池對象的弊端如下:

FixedThreadPool 和 SingleThreadPool : 允許的請求隊列長度為 Integer.MAX_VALUE ,可能會堆積大量的請求,進而導緻 OOM 。

CachedThreadPool 和 ScheduledThreadPool : 允許的建立線程數量為 Integer.MAX_VALUE ,可能會建立大量的線程,進而導緻 OOM 。

b ) 直接通過調用ThreadPoolExecutor 的構造方法進行建立:

/**
corePoolSize : 線程池線程初始數
maxinumPoolSize : 線程池最大線程數
keepAliveTime : (maxinumPoolSize - corePoolSize) 的線程存活時長
unit : 存活時長的時間機關
workQueue : 任務阻塞隊列
    ArrayBlockingQueue:構造函數一定要傳大小
    LinkedBlockingQueue:構造函數不傳大小會預設為(Integer.MAX_VALUE ),當大量請求任務時,容易造成 記憶體耗盡。
    SynchronousQueue:同步隊列,一個沒有存儲空間的阻塞隊列 ,将任務同步傳遞給工作線程。
   PriorityBlockingQueue : 優先隊列

threadFactory : 線程工廠
handler : 拒絕執行政策處理器
    拒絕政策有4種:
        AbortPolicy: 當任務數超出線程池處理數,就會抛出該異常
        DiscardPolicy: 當任務數超出線程池處理數,它會将後續送出的任務進行丢棄
        DiscardOldestPolicy: 當任務數超出線程池處理數,它會将阻塞隊列中第一個位置的元素彈出
        CallerUnsPolicy:   當任務數超出線程池處理數,它會将任務抛出調用者線程進行執行
*/
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
           

二:線程池狀态及生命周期:

RUNNING : 表示線程池可以接收新的任務送出,并且還可以正常處理阻塞隊列中的任務。

SHUTDOWN : 不再接收新的任務送出,但還會繼續處理阻塞隊列中的任務。

STOP : 不再接受新的任務送出,同時還會丢棄阻塞隊列中的未處理的任務,并且會立即中止目前進行中的任務

TIDYING : 表示所有任務都執行完畢後(包括阻塞隊列),目前線程池中的活動線程數量降為0,将會調用terminated方法

TERMIMATED : 線程池終止狀态,當terminated() 方法執行完畢後,線程池将會處理該狀态之下。

深入剖析線程池ThreadPoolExecutor 原理

三:線程池的結構:

3.1 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

該屬性是原子的Integer 類型,可以實作原子操作,Doug Lea 将這個ctl 變量,拆分成二部分,前29位用于記錄工作線程數, 後3位表示線程池的狀态

3.2 線程池中大量的提供了對 AtomicInteger ctl 的狀态原子操作變更 和 工作線程數量的原子變更

/**
 * Attempts to CAS-increment the workerCount field of ctl.
 */
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

/**
 * Attempts to CAS-decrement the workerCount field of ctl.
 */
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}

/**
 * Decrements the workerCount field of ctl. This is called only on
 * abrupt termination of a thread (see processWorkerExit). Other
 * decrements are performed within getTask.
 */
private void decrementWorkerCount() {
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
           

3.3 private final BlockingQueue<Runnable> workQueue;

該隊列用于存放 工作任務 (Runable,Callable)

3.4 private final HashSet<Worker> workers = new HashSet<Worker>();

該HashSet 集合,專門用于存線上程池中正在工作的線程

3.5 private volatile ThreadFactory threadFactory;

用于建立線程池線程的工廠方法,可以自己擴充,友善将線程命名之類

3.6 private volatile RejectedExecutionHandler handler;

用于執行任務過多超載時,執行的拒絕政策

3.7 private volatile int corePoolSize;

線程池核心線程數

3.8 private volatile int maximumPoolSize;

線程池最大的線程數

3.9 private volatile long keepAliveTime;

線程池空餘線程(非核心線程)的存活時間

3.10 private final class Worker extends AbstractQueuedSynchronizer implements Runnable {}

它是線程池中的内部類,主要就是封裝了線程和任務.

四:線程池中一些重要的實作:

1. 當任務送出,會進行判斷

如果線程數小于基本線程數(coreSize), 則直接建立線程,并将線程和任務包裝成worker 對象放入 工作線程集合中

如果線程數大于基本線程數,則放入到任務隊列中

如果任務隊列已滿,則又會建立線程到線程池最大值

如果線程數已經達到最大值,任務隊列已滿,則執行拒絕政策。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();  int c = ctl.get();
            //工作線程數量與線程池核心線程數比較,小于核心數線程數量,則建立線程,并且将建立的工作線程及其綁定的任務的worker 對象放入工作線程集合中,而且同時會調用線程的start方法。
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //判斷線程池處于運作狀态,然後将任務放入到工作隊列中去
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次檢查線程池是否處于運作狀态,如果不是運作狀态,則将任務從隊列中移除,并執行拒絕政策
            if (! isRunning(recheck) && remove(command))
                reject(command);
             // 如果工作線程數為0,表示   
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
 }
 
 
  private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
   
    //Work 線程中的run 方法,調用了線程池的runWorker 方法:
     /** Delegates main run loop to outer runWorker  */
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
    { 
        public void run() {
            runWorker(this);
        }
     }
    // 線上程調用start 方法後,裡面有一個while 循環,其判斷條件是:線上程第一次建立時,會直接執行其任務,如果沒有任務,則從隊列中擷取任務,如果沒有擷取到任務則一直阻塞,直到任務擷取到為止。
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 要麼執行送出過來的任務,要麼從工作隊列中擷取任務 getTask(),沒有擷取到任務則阻塞,直到擷取到任務為止
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 移除空閑多餘線程
            processWorkerExit(w, completedAbruptly);
        }
    }
    
    // 在getTask 中的阻塞是通過 workQueue.take() 方法實作,同時在for 循環中,會判斷線程是否逾時,如果逾時,則會将線程打斷
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            // 如果工作線程數大于核心線程數,且線程存活時間超過設定時間時,傳回true
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 工作線程數大于最大線程數或者線程逾時 并且 工作線程大于1 或者 工作隊列任務數為空時,那麼take 方法就會傳回null , 進而把流程交給了 runWorker 方法中的 finally 代碼塊,
            // 而finally 代碼塊,會将空閑多餘的線程從工作線程中移除.
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
           

五:其它相關的資訊:

PS: 在ThreadPoolExecutor 中的submit 方法會吞掉異常資訊,而execute 方法隻能出現部分資訊,最好是将異常進行包裝,如下:

public Runnable wrap(final Runable task, final Exception ex , String threadName) {
    return new Runable(){
        try {
            task.run();
        } catch (Exception e) {
            ex.printStackTrace(e);
            throw e;
        }
    }
}
           

繼續閱讀