天天看點

ThreadPoolExecutor生命周期執行流程

ThreadPoolExecutor

  • 生命周期
  • 執行流程
    • 線程池建立
    • 線程的送出
    • 核心方法
      • addWorker
      • ThreadPoolExecutor.Worker
      • runWorker
      • getTask

參考于 ThreadPoolExecutor

生命周期

狀态 介紹
RUNNING 允許接收新任務,處理隊列中的任務
SHUTDOWN 不接收新的任務,處理完隊列中的任務
STOP 不接收新的任務,不處理隊列中的任務,嘗試中斷正在執行任務的線程
TIDYING 所有任務被終止了,工作線程數workCount也被設為0,線程的狀态也被設為TIDYING,并開始調用鈎子函數terminated()
TERMINATED 鈎子函數terminated()執行完畢

生命周期轉換:(不可逆)

ThreadPoolExecutor生命周期執行流程

執行流程

ThreadPoolExecutor生命周期執行流程

線程池建立

/**
     * @param corePoolSize 核心線程數。即使空閑,也要保留在池中的線​​程數,除非設定了{@code allowCoreThreadTimeOut}
     * @param maximumPoolSize 線程池最大容量大小
     * @param keepAliveTime 當線程數大于核心線程數時,多餘的空閑線程将在終止之前等待新任務的最長時間
     * @param unit keepAliveTime時間機關
     * @param workQueue 用于在任務暫挂之前用于保留任務的隊列。該隊列将僅儲存由{@code execute}方法送出的{@code Runnable}任務。
     * @param threadFactory 線程工廠
     * @param handler 因為達到了線程界限和隊列容量而在執行被阻塞時使用的處理程式
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
           

線程的送出

  1. workerCount < corePoolSize,新送出任務将建立一個新Worker執行任務,即使此時線程池中存在空閑線程;
  2. 當線程池達到corePoolSize時,新送出任務将被放入workQueue中,等待線程池中任務排程執行;
  3. 當workQueue已滿,且maximumPoolSize > corePoolSize時,新送出任務會建立新線程執行任務;
  4. 當送出任務數超過maximumPoolSize時,新送出任務由RejectedExecutionHandler處理;
  5. 當線程池中超過corePoolSize線程或者allowCoreThreadTimeOut為true,空閑時間達到keepAliveTime時,關閉空閑線程;
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
            
        //ctl存儲worker個數和線程池狀态
        int c = ctl.get();
        
        //如果worker個數小于核心線程數
        if (workerCountOf(c) < corePoolSize) {
        	//建立worker并添加到workers中
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果線程池正在運作中且加入workQueue成功
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //二次校驗線程池狀态
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
           
Worker負責處理處理循環處理Runnable

核心方法

addWorker

先了解一下《retry文法》

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        //死循環
        for (;;) {
            int c = ctl.get();
            //擷取線程池運作狀态
            int rs = runStateOf(c);

			//判斷目前線程池的狀态是不是已經SHUTDOWN,如果SHUTDOWN了拒絕線程加入
            //(rs!=SHUTDOWN || firstTask!=null || workQueue.isEmpty())
            //如果rs不為SHUTDOWN,狀态是STOP、TIDYING或TERMINATED,是以此時要拒絕請求
            //如果狀态為SHUTDOWN,而傳入一個不為null的線程,那麼需要拒絕
            //如果狀态為SHUTDOWN,同時隊列中已經沒任務了,那麼拒絕掉
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
			//死循環
            for (;;) {
            	//擷取worker個數
                int wc = workerCountOf(c);
                //worker個數超過容量,則傳回失敗
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //線程個數增加成功,則執行循環體以下的代碼
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //重新check線程池狀态,如果狀态改變了,則重新進入第一個循環
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
           

ThreadPoolExecutor.Worker

Worker繼承《AQS》,實作Runnable接口

Worker

的構造方法:

Worker(Runnable firstTask) {
      setState(-1); // inhibit interrupts until runWorker
      this.firstTask = firstTask;
      this.thread = getThreadFactory().newThread(this); //thread運作時執行的是Worker的run方法
	}
           

Worker

run

方法:

public void run() {
      runWorker(this);
	}
           

runWorker

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
        	//先執行firstTask,之後循環getTask擷取Task
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                    	//運作task
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
           

getTask

擷取Task
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
			//釋放線程worker,當線程池中超過corePoolSize線程或者allowCoreThreadTimeOut為true,空閑時間達到keepAliveTime時,關閉空閑線程
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
            	//可能阻塞
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }