大家好,我是老三,很高興又和大家見面,最近降溫,大家注意保暖。
這節分享Java線程池,接下來我們一步步把線程池扒個底朝天。
引言:老三取錢
有一個程式員,他的名字叫老三。
老三兜裡沒有錢,匆匆銀行業務辦。
這天起了一大早,銀行姐姐說早安。
老三一看櫃台空,卡裡五毛都取完。
老三這天起的晚,營業視窗都排滿。
隻好進入排隊區,摸出手機等空閑。

老三睡到上三杆,視窗排隊都爆滿。
經理一看開新口,排隊同志趕緊辦。
這天業務太火爆,櫃台排隊都用完。
老三一看急上火,經理你說怎麼辦。
經理揮手一笑間,這種場面已見慣。四種辦法來處理,你猜我會怎麼辦。
- 小小銀行不堪負,陳舊系統已癱瘓。
- 我們廟小對不起,誰叫你來找誰辦。
- 看你情況特别急,來去隊裡加個塞。
- 今天實在沒辦法,不行你看改一天。
對,沒錯,其實這個流程就和JDK線程池
ThreadPoolExecutor
的工作流程類似,先賣個關子,後面結合線程池工作流程,保證你會豁然開朗。
實戰:線程池管理資料處理線程
光說不練假把式,show you code,我們來一個結合業務場景的線程池實戰。——很多同學面試的時候,線程池原理背的滾瓜爛熟,一問項目中怎麼用的,歇菜。看完這個例子,趕緊琢磨琢磨,項目裡有什麼地方能套用的。
應用場景
應用場景非常簡單,我們的項目是一個稽核類的系統,每年到了核算的時候,需要向第三方的核算系統提供資料,以供核算。
這裡存在一個問題,由于曆史原因,核算系統提供的接口隻支援單條推送,但是實際的資料量是三十萬條,如果一條條推送,那麼起碼得一個星期。
是以就考慮使用多線程的方式來推送資料,那麼,線程通過什麼管理呢?線程池。
為什麼要用線程池管理線程呢?當然是為了線程複用。
思路也很簡單,開啟若幹個線程,每個線程從資料庫中讀取取(start,count]區間未推送的資料進行推送。
具體代碼實作
我把這個場景提取了出來,主要代碼:
代碼比較長,是以用了carbon美化,代碼看不清,沒關系,可運作的代碼我都上傳到了遠端倉庫,倉庫位址: https://gitee.com/fighter3/thread-demo.git ,這個例子比較簡單,沒有用過線程池的同學可以考慮你有沒有什麼資料處理、清洗的場景可以套用,不妨借鑒、演繹一下。
本文主題是線程池,是以我們重點關注線程池的代碼:
線程池構造
//核心線程數:設定為作業系統CPU數乘以2
private static final Integer CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
//最大線程數:設定為和核心線程數相同
private static final Integer MAXIMUM_POOl_SIZE = CORE_POOL_SIZE;
//建立線程池
ThreadPoolExecutor pool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOl_SIZE * 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
線程池直接采用ThreadPoolExecutor構造:
- 核心線程數設定為CPU數×2
- 因為需要分段資料,是以最大線程數設定為和核心線程數一樣
- 阻塞隊列使用
LinkedBlockingQueue
- 拒絕政策使用預設
線程池送出任務
//送出線程,用資料起始位置辨別線程
Future<Integer> future = pool.submit(new PushDataTask(start, LIMIT, start));
- 因為需要傳回值,是以使用
送出任務,如果使用submit()
送出任務,沒有傳回值。execute()
代碼不負責,可以done下來跑一跑。
那麼,線程池具體是怎麼工作的呢?我們接着往下看。
原理:線程池實作原理
線程池工作流程
構造方法
我們在構造線程池的時候,使用了
ThreadPoolExecutor
的構造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
先來看看幾個參數的含義:
-
: 核心線程數corePoolSize
-
:允許的最大線程數(核心線程數+非核心線程數)maximumPoolSize
-
:線程池任務隊列用來儲存等待執行的任務的阻塞隊列,常見阻塞隊列有:workQueue
-
-
:一個基于數組結構的有界阻塞隊列ArrayBlockingQueue
-
:基于連結清單結構的阻塞隊列LinkedBlockingQueue
-
:不存儲元素的阻塞隊列SynchronousQueue
-
:具有優先級的無限阻塞隊列PriorityBlockingQueue
-
-
: 線程池飽和拒絕政策JDK線程池架構提供了四種政策:handler
-
-
:直接抛出異常,預設政策。AbortPolicy
-
:用調用者所線上程來運作任務。CallerRunsPolicy
-
:丢棄任務隊列裡最老的任務DiscardOldestPolicy
-
:不處理,丢棄目前任務DiscardPolicy
-
- 也可以根據自己的應用場景,實作
接口來自定義政策。RejectedExecutionHandler
上面四個是和線程池工作流程息息相關的參數,我們再來看看剩下三個參數。
-
:非核心線程閑置下來最多存活的時間keepAliveTime
-
:線程池中非核心線程保持存活的時間unit
-
:建立一個新線程時使用的工廠,可以用來設定線程名等threadFactory
知道了幾個參數,那麼這幾個參數是怎麼應用的呢?
以
execute()
方法送出任務為例,我們來看線程池的工作流程:
向線程池送出任務的時候:
- 如果目前運作的線程少于
,則建立新線程來執行任務核心線程數corePoolSize
- 如果運作的線程等于或多于
,則将任務加入核心線程數corePoolSize
任務隊列workQueue
- 如果
已滿,建立新的線程來處理任務任務隊列workQueue
- 如果建立新線程使目前總線程數超過
,任務将被拒絕,最大線程數maximumPoolSize
執行線程池拒絕政策handler
結合一下我們開頭的生活事例,是不是就對上了:
線程池工作源碼分析
上面的流程分析,讓我們直覺地了解了線程池的工作原理,我們再來通過源碼看看細節。
送出線程(execute)
線程池執行任務的方法如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//擷取目前線程池的狀态+線程個數變量的組合值
int c = ctl.get();
//1.如果正在運作線程數少于核心線程數
if (workerCountOf(c) < corePoolSize) {
//開啟新線程運作
if (addWorker(command, true))
return;
c = ctl.get();
}
//2. 判斷線程池是否處于運作狀态,是則添加任務到阻塞隊列
if (isRunning(c) && workQueue.offer(command)) {
//二次檢查
int recheck = ctl.get();
//如果目前線程池不是運作狀态,則從隊列中移除任務,并執行拒絕政策
if (! isRunning(recheck) && remove(command))
reject(command);
//如若目前線程池為空,則添加一個新線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//最後嘗試添加線程,如若添加失敗,執行拒絕政策
else if (!addWorker(command, false))
reject(command);
}
我們來看一下
execute()
的詳細流程圖:
新增線程 (addWorker)
在
execute
方法代碼裡,有個關鍵的方法
private boolean addWorker(Runnable firstTask, boolean core)
,這個方法主要完成兩部分工作:
增加線程數
、
添加任務,并執行
。
- 我們先來看第一部分增加線程數:
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 1.檢查隊列是否隻在必要時為空(判斷線程狀态,且隊列不為空)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//2.循環CAS增加線程個數
for (;;) {
int wc = workerCountOf(c);
//2.1 如果線程個數超限則傳回 false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//2.2 CAS方式增加線程個數,同時隻有一個線程成功,成功跳出循環
if (compareAndIncrementWorkerCount(c))
break retry;
//2.3 CAS失敗,看線程池狀态是否變化,變化則跳到外層,嘗試重新擷取線程池狀态,否則内層重新CAS
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
//3. 到這說明CAS成功了
boolean workerStarted = false;
boolean workerAdded = false;
- 接着來看第二部分添加任務,并執行
Worker w = null;
try {
//4.建立worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//4.1、加獨占鎖 ,為了實作workers同步,因為可能多個線程調用了線程池的excute方法
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//4.2、重新檢查線程池狀态,以避免在擷取鎖前調用了shutdown接口
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//4.3添加任務
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//4.4、添加成功之後啟動任務
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
我們來看一下整體的流程:
執行線程(runWorker)
使用者線程送出到線程池之後,由
Worker
執行,
Worker
是線程池内部一個繼承
AQS
、實作
Runnable
接口的自定義類,它是具體承載任務的對象。
先看一下它的構造方法:
Worker(Runnable firstTask) {
setState(-1); // 在調用runWorker之前禁止中斷
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); //建立一個線程
}
- 在構造函數内 首先設定 state=-1,現了簡單不可重入獨占鎖,state=0表示鎖未被擷取狀态,state=1表示鎖已被擷取狀态,設定狀态大小為-1,是為了避免線程在運作runWorker()方法之前被中斷
- firstTask記錄該工作線程的第一個任務
- thread是具體執行任務的線程
它的
run
方法直接調用
runWorker
,真正地執行線程就是在我們的
runWorker
方法裡:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允許中斷
boolean completedAbruptly = true;
try {
//擷取目前任務,從隊列中擷取任務
while (task != null || (task = getTask()) != null) {
w.lock();
…………
try {
//執行任務前做一些類似統計之類的事情
beforeExecute(wt, task);
Throwable thrown = null;
try {
//執行任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 執行任務完畢後幹一些些事情
afterExecute(task, thrown);
}
} finally {
task = null;
// 統計目前Worker 完成了多少個任務
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//執行清理工作
processWorkerExit(w, completedAbruptly);
}
}
代碼看着多,其實砍掉枝蔓,最核心的點就是
task.run()
讓線程跑起來。
擷取任務(getTask)
我們在上面的執行任務
runWorker
裡看到,這麼一句
while (task != null || (task = getTask()) != null)
,執行的任務是要麼目前傳入的
firstTask
,或者還可以通過
getTask()
擷取,這個
getTask
的核心目的就是從隊列中擷取任務。
private Runnable getTask() {
//poll()方法是否逾時
boolean timedOut = false;
//循環擷取
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 1.線程池未終止,且隊列為空,傳回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//工作線程數
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//2.判斷工作線程數是否超過最大線程數 && 逾時判斷 && 工作線程數大于0或隊列為空
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//從任務隊列中擷取線程
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//擷取成功
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
總結一下,Worker執行任務的模型如下[8]:
小結
到這,了解了
execute
和
worker
的一些流程,可以說其實
ThreadPoolExecutor
的實作就是一個生産消費模型。
當使用者添加任務到線程池時相當于生産者生産元素,
workers
線程工作集中的線程直接執行任務或者從任務隊列裡面擷取任務時則相當于消費者消費元素。
線程池生命周期
線程池狀态表示
ThreadPoolExecutor
裡定義了一些狀态,同時利用高低位的方式,讓
ctl
這個參數能夠儲存狀态,又能儲存線程數量,非常巧妙![6]
//記錄線程池狀态和線程數量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//29
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 線程池狀态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
高3位表示狀态,低29位記錄線程數量:
線程池狀态流轉
線程池一共定義了五種狀态,來看看這些狀态是怎麼流轉的[6]:
- RUNNING:運作狀态,接受新的任務并且處理隊列中的任務。
- SHUTDOWN:關閉狀态(調用了 shutdown 方法)。不接受新任務,,但是要處理隊列中的任務。
- STOP:停止狀态(調用了 shutdownNow 方法)。不接受新任務,也不處理隊列中的任務,并且要中斷正在處理的任務。
- TIDYING:所有的任務都已終止了,workerCount 為 0,線程池進入該狀态後會調terminated() 方法進入 TERMINATED 狀态。
- TERMINATED:終止狀态,terminated() 方法調用結束後的狀态。
應用:打造健壯的線程池
合理地配置線程池
關于線程池的構造,我們需要注意兩個配置,線程池的大小和任務隊列。
線程池大小
關于線程池的大小,并沒有一個需要嚴格遵守的“金規鐵律”,按照任務性質,大概可以分為
CPU密集型任務
IO密集型任務
混合型任務
- CPU密集型任務:CPU密集型任務應配置盡可能小的線程,如配置Ncpu+1個線程的線程池。
- IO密集型任務:IO密集型任務線程并不是一直在執行任務,則應配置盡可能多的線程,如2*Ncpu。
- 混合型任務:混合型任務可以按需拆分成CPU密集型任務和IO密集型任務。
當然,這個隻是建議,實際上具體怎麼配置,還要結合
事前評估和測試
事中監控
來确定一個大緻的線程線程池大小。線程池大小也可以不用寫死,使用動态配置的方式,以便調整。
任務隊列
任務隊列一般建議使用有界隊列,無界隊列可能會出現隊列裡任務無限堆積,導緻記憶體溢出的異常。
線程池監控
[1]如果在系統中大量使用線程池,則有必要對線程池進行監控,友善在出現問題時,可以根據線程池的使用狀況快速定位問題。
可以通過線程池提供的參數和方法來監控線程池:
- getActiveCount() :線程池中正在執行任務的線程數量
- getCompletedTaskCount() :線程池已完成的任務數量,該值小于等于 taskCount
- getCorePoolSize() :線程池的核心線程數量
- getLargestPoolSize():線程池曾經建立過的最大線程數量。通過這個資料可以知道線程池是否滿過,也就是達到了 maximumPoolSize
- getMaximumPoolSize():線程池的最大線程數量
- getPoolSize() :線程池目前的線程數量
- getTaskCount() :線程池已經執行的和未執行的任務總數
還可以通過擴充線程池來進行監控:
- 通過繼承線程池來自定義線程池,重寫線程池的beforeExecute、afterExecute和terminated方法,
- 也可以在任務執行前、執行後和線程池關閉前執行一些代碼來進行監控。例如,監控任務的平均執行時間、最大執行時間和最小執行時間等。
End
這篇文章從一個生活場景入手,一步步從實戰到原理來深入了解線程池。
但是你發現沒有,我們平時常說的所謂
四種線程池
在文章裡沒有提及——當然是因為篇幅原因,下篇就安排線程池建立工具類
Executors
線程池也是面試的重點戰區,面試又會問到哪些問題呢?
這些内容,都已經在路上。
點贊
關注
不迷路,下篇見!