相關文章目錄:
Java線程池ThreadPoolExecutor使用和分析(一)
Java線程池ThreadPoolExecutor使用和分析(二) - execute()原理
Java線程池ThreadPoolExecutor使用和分析(三) - 終止線程池原理
execute()是 java.util.concurrent.Executor接口中唯一的方法,JDK注釋中的描述是“在未來的某一時刻執行指令command”,即向線程池中送出任務,在未來某個時刻執行,送出的任務必須實作Runnable接口,該送出方式不能擷取傳回值。下面是對execute()方法内部原理的分析,分析前先簡單介紹線程池有哪些狀态,在一系列執行過程中涉及線程池狀态相關的判斷。以下分析基于JDK 1.7
以下是本文的目錄大綱:
一、線程池執行流程
二、線程池狀态
三、任務送出内部原理
1、execute() -- 送出任務
2、addWorker() -- 添加worker線程
3、内部類Worker
4、runWorker() -- 執行任務
5、getTask() -- 擷取任務
6、processWorkerExit() -- worker線程退出
若有不正之處請多多諒解,歡迎批評指正、互相讨論。
請尊重作者勞動成果,轉載請标明原文連結:
http://www.cnblogs.com/trust-freedom/p/6681948.html

1、如果線程池中的線程數量少于corePoolSize,就建立新的線程來執行新添加的任務
2、如果線程池中的線程數量大于等于corePoolSize,但隊列workQueue未滿,則将新添加的任務放到workQueue中
3、如果線程池中的線程數量大于等于corePoolSize,且隊列workQueue已滿,但線程池中的線程數量小于maximumPoolSize,則會建立新的線程來處理被添加的任務
4、如果線程池中的線程數量等于了maximumPoolSize,就用RejectedExecutionHandler來執行拒絕政策
其中ctl這個AtomicInteger的功能很強大,其高3位用于維護線程池運作狀态,低29位維護線程池中線程數量
1、RUNNING:-1<<COUNT_BITS,即高3位為1,低29位為0,該狀态的線程池會接收新任務,也會處理在阻塞隊列中等待處理的任務
2、SHUTDOWN:0<<COUNT_BITS,即高3位為0,低29位為0,該狀态的線程池不會再接收新任務,但還會處理已經送出到阻塞隊列中等待處理的任務
3、STOP:1<<COUNT_BITS,即高3位為001,低29位為0,該狀态的線程池不會再接收新任務,不會處理在阻塞隊列中等待的任務,而且還會中斷正在運作的任務
4、TIDYING:2<<COUNT_BITS,即高3位為010,低29位為0,所有任務都被終止了,workerCount為0,為此狀态時還将調用terminated()方法
5、TERMINATED:3<<COUNT_BITS,即高3位為100,低29位為0,terminated()方法調用完成後變成此狀态
這些狀态均由int型表示,大小關系為 RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED,這個順序基本上也是遵循線程池從 運作 到 終止這個過程。
runStateOf(int c) 方法:c & 高3位為1,低29位為0的~CAPACITY,用于擷取高3位儲存的線程池狀态
workerCountOf(int c)方法:c & 高3位為0,低29位為1的CAPACITY,用于擷取低29位的線程數量
ctlOf(int rs, int wc)方法:參數rs表示runState,參數wc表示workerCount,即根據runState和workerCount打包合并成ctl
1、execute() -- 送出任務
execute(Runnable command)
參數:
command 送出執行的任務,不能為空
執行流程:
1、如果線程池目前線程數量少于corePoolSize,則addWorker(command, true)建立新worker線程,如建立成功傳回,如沒建立成功,則執行後續步驟;
addWorker(command, true)失敗的原因可能是:
A、線程池已經shutdown,shutdown的線程池不再接收新任務
B、workerCountOf(c) < corePoolSize 判斷後,由于并發,别的線程先建立了worker線程,導緻workerCount>=corePoolSize
2、如果線程池還在running狀态,将task加入workQueue阻塞隊列中,如果加入成功,進行double-check,如果加入失敗(可能是隊列已滿),則執行後續步驟;
double-check主要目的是判斷剛加入workQueue阻塞隊列的task是否能被執行
A、如果線程池已經不是running狀态了,應該拒絕添加新任務,從workQueue中删除任務
B、如果線程池是運作狀态,或者從workQueue中删除任務失敗(剛好有一個線程執行完畢,并消耗了這個任務),確定還有線程執行任務(隻要有一個就夠了)
3、如果線程池不是running狀态 或者 無法入隊列,嘗試開啟新線程,擴容至maxPoolSize,如果addWork(command, false)失敗了,拒絕目前command
2、addWorker() -- 添加worker線程
addWorker(Runnable firstTask, boolean core)
firstTask: worker線程的初始任務,可以為空
core: true:将corePoolSize作為上限,false:将maximumPoolSize作為上限
addWorker方法有4種傳參的方式:
1、addWorker(command, true)
2、addWorker(command, false)
3、addWorker(null, false)
4、addWorker(null, true)
在execute方法中就使用了前3種,結合這個核心方法進行以下分析
第一個:線程數小于corePoolSize時,放一個需要處理的task進Workers Set。如果Workers Set長度超過corePoolSize,就傳回false
第二個:當隊列被放滿時,就嘗試将這個新來的task直接放入Workers Set,而此時Workers Set的長度限制是maximumPoolSize。如果線程池也滿了的話就傳回false
第三個:放入一個空的task進workers Set,長度限制是maximumPoolSize。這樣一個task為空的worker線上程執行的時候會去任務隊列裡拿任務,這樣就相當于建立了一個新的線程,隻是沒有馬上配置設定任務
第四個:這個方法就是放一個null的task進Workers Set,而且是在小于corePoolSize時,如果此時Set中的數量已經達到corePoolSize那就傳回false,什麼也不幹。實際使用中是在prestartAllCoreThreads()方法,這個方法用來為線程池預先啟動corePoolSize個worker等待從workQueue中擷取任務執行
1、判斷線程池目前是否為可以添加worker線程的狀态,可以則繼續下一步,不可以return false:
A、線程池狀态>shutdown,可能為stop、tidying、terminated,不能添加worker線程
B、線程池狀态==shutdown,firstTask不為空,不能添加worker線程,因為shutdown狀态的線程池不接收新任務
C、線程池狀态==shutdown,firstTask==null,workQueue為空,不能添加worker線程,因為firstTask為空是為了添加一個沒有任務的線程再從workQueue擷取task,而workQueue為空,說明添加無任務線程已經沒有意義
2、線程池目前線程數量是否超過上限(corePoolSize 或 maximumPoolSize),超過了return false,沒超過則對workerCount+1,繼續下一步
3、線上程池的ReentrantLock保證下,向Workers Set中添加新建立的worker執行個體,添加完成後解鎖,并啟動worker線程,如果這一切都成功了,return true,如果添加worker入Set失敗或啟動失敗,調用addWorkerFailed()邏輯
3、内部類Worker
Worker類
Worker類本身既實作了Runnable,又繼承了AbstractQueuedSynchronizer(以下簡稱AQS),是以其既是一個可執行的任務,又可以達到鎖的效果
new Worker()
1、将AQS的state置為-1,在runWoker()前不允許中斷
2、待執行的任務會以參數傳入,并賦予firstTask
3、用Worker這個Runnable建立Thread
之是以Worker自己實作Runnable,并建立Thread,在firstTask外包一層,是因為要通過Worker控制中斷,而firstTask這個工作任務隻是負責執行業務
Worker控制中斷主要有以下幾方面:
1、初始AQS狀态為-1,此時不允許中斷interrupt(),隻有在worker線程啟動了,執行了runWoker(),将state置為0,才能中斷
不允許中斷展現在:
A、shutdown()線程池時,會對每個worker tryLock()上鎖,而Worker類這個AQS的tryAcquire()方法是固定将state從0->1,故初始狀态state==-1時tryLock()失敗,沒發interrupt()
B、shutdownNow()線程池時,不用tryLock()上鎖,但調用worker.interruptIfStarted()終止worker,interruptIfStarted()也有state>0才能interrupt的邏輯
2、為了防止某種情況下,在運作中的worker被中斷,runWorker()每次運作任務時都會lock()上鎖,而shutdown()這類可能會終止worker的操作需要先擷取worker的鎖,這樣就防止了中斷正在運作的線程
Worker實作的AQS為不可重入鎖,為了是在獲得worker鎖的情況下再進入其它一些需要加鎖的方法
Worker和Task的差別:
Worker是線程池中的線程,而Task雖然是runnable,但是并沒有真正執行,隻是被Worker調用了run方法,後面會看到這部分的實作。
4、runWorker() -- 執行任務
runWorker(Worker w)
1、Worker線程啟動後,通過Worker類的run()方法調用runWorker(this)
2、執行任務之前,首先worker.unlock(),将AQS的state置為0,允許中斷目前worker線程
3、開始執行firstTask,調用task.run(),在執行任務前會上鎖wroker.lock(),在執行完任務後會解鎖,為了防止在任務運作時被線程池一些中斷操作中斷
4、在任務執行前後,可以根據業務場景自定義beforeExecute() 和 afterExecute()方法
5、無論在beforeExecute()、task.run()、afterExecute()發生異常上抛,都會導緻worker線程終止,進入processWorkerExit()處理worker退出的流程
6、如正常執行完目前task後,會通過getTask()從阻塞隊列中擷取新任務,當隊列中沒有任務,且擷取任務逾時,那麼目前worker也會進入退出流程
5、getTask() -- 擷取任務
getTask()
1、首先判斷是否可以滿足從workQueue中擷取任務的條件,不滿足return null
A、線程池狀态是否滿足:
(a)shutdown狀态 + workQueue為空 或 stop狀态,都不滿足,因為被shutdown後還是要執行workQueue剩餘的任務,但workQueue也為空,就可以退出了
(b)stop狀态,shutdownNow()操作會使線程池進入stop,此時不接受新任務,中斷正在執行的任務,workQueue中的任務也不執行了,故return null傳回
B、線程數量是否超過maximumPoolSize 或 擷取任務是否逾時
(a)線程數量超過maximumPoolSize可能是線程池在運作時被調用了setMaximumPoolSize()被改變了大小,否則已經addWorker()成功不會超過maximumPoolSize
(b)如果 目前線程數量>corePoolSize,才會檢查是否擷取任務逾時,這也展現了當線程數量達到maximumPoolSize後,如果一直沒有新任務,會逐漸終止worker線程直到corePoolSize
2、如果滿足擷取任務條件,根據是否需要定時擷取調用不同方法:
A、workQueue.poll():如果在keepAliveTime時間内,阻塞隊列還是沒有任務,傳回null
B、workQueue.take():如果阻塞隊列為空,目前線程會被挂起等待;當隊列中有任務加入時,線程被喚醒,take方法傳回任務
3、在阻塞從workQueue中擷取任務時,可以被interrupt()中斷,代碼中捕獲了InterruptedException,重置timedOut為初始值false,再次執行第1步中的判斷,滿足就繼續擷取任務,不滿足return null,會進入worker退出的流程
6、processWorkerExit() -- worker線程退出
processWorkerExit(Worker w, boolean completedAbruptly)
worker: 要結束的worker
completedAbruptly: 是否突然完成(是否因為異常退出)
1、worker數量-1
A、如果是突然終止,說明是task執行時異常情況導緻,即run()方法執行時發生了異常,那麼正在工作的worker線程數量需要-1
B、如果不是突然終止,說明是worker線程沒有task可執行了,不用-1,因為已經在getTask()方法中-1了
2、從Workers Set中移除worker,删除時需要上鎖mainlock
3、tryTerminate():在對線程池有負效益的操作時,都需要“嘗試終止”線程池,大概邏輯:
判斷線程池是否滿足終止的狀态
A、如果狀态滿足,但還有線程池還有線程,嘗試對其發出中斷響應,使其能進入退出流程
B、沒有線程了,更新狀态為tidying->terminated
4、是否需要增加worker線程,如果線程池還沒有完全終止,仍需要保持一定數量的線程
線程池狀态是running 或 shutdown
A、如果目前線程是突然終止的,addWorker()
B、如果目前線程不是突然終止的,但目前線程數量 < 要維護的線程數量,addWorker()
故如果調用線程池shutdown(),直到workQueue為空前,線程池都會維持corePoolSize個線程,然後再逐漸銷毀這corePoolSize個線程
參考資料:
深入分析java線程池的實作原理 - 占小狼
JUC源碼分析-線程池-ThreadPoolExecutor
原文連結:
作者:Trust_FreeDom - 部落格園
部落格首頁:http://www.cnblogs.com/trust-freedom/
歡迎轉載,但請保留作者和本文連結,謝謝!
歡迎在下面的評論區與我交流。
如果覺得寫的不錯,請點選下面的“推薦”按鈕,讓我更有動力寫出更好的文章。