this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
**下面這些對建立 非常重要,在後面使用線程池的過程中你一定會用到!是以,務必拿着小本本記清楚。**
`ThreadPoolExecutor`**3個最重要的參數**:
- `corePoolSize`: 核心線程數線程數定義了最小可以同時運作的線程數量。
- `maximumPoolSize`: 當隊列中存放的任務達到隊列容量的時候,目前可以同時運作的線程數量變為最大線程數。
- `workQueue`: 當新任務來的時候會先判斷目前運作的線程數量是否達到核心線程數,如果達到的話,信任就會被存放在隊列中。
`ThreadPoolExecutor`其他常見參數:
1. `keepAliveTime`:當線程池中的線程數量大于`corePoolSize`的時候,如果這時沒有新的任務送出,核心線程外的線程不會立即銷毀,而是會等待,直到等待的時間超過了`keepAliveTime`才會
被回收銷毀;
2. `unit`:`keepAliveTime`參數的時間機關。
3. `threadFactory`:`executor`建立新線程的時候會用到。
4. `handler`:飽和政策。關于飽和政策下面單獨介紹一下。
下面這張圖可以加深你對線程池中各個參數的互相關系的了解
![](https://upload-images.jianshu.io/upload_images/25222111-d5cf73f9805476d9.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
`ThreadPoolExecutor`**飽和政策定義**:
如果目前同時運作的線程數量達到最大線程數量并且隊列也已經被放滿了任時,
`ThreadPoolTaskExecutor`定義一些政策:
- `ThreadPoolExecutor.AbortPolicy`:抛出`RejectedExecutionException`來拒絕新任務的處理。
- `ThreadPoolExecutor.CallerRunsPolicy`:調用執行自己的線程運作任務,也就是直接在調用`execute`方法的線程中運作( `run` )被拒絕的任務,如果執行程式已關閉,則會丢棄該任務。是以這種政策會降低對于新任務送出速度,影響程式的整體性能。另外,這個政策喜歡增加隊列容量。如果您的應用程式可以承受此延遲并且你不能任務丢棄任何一個任務請求的話,你可以選擇這個政策。
- `ThreadPoolExecutor.DiscardPolicy`: 不處理新任務,直接丢棄掉。
- `ThreadPoolExecutor.DiscardOldestPolicy`: 此政策将丢棄最早的未處理的任務請求。
舉個例子:
>Spring 通過`ThreadPoolTaskExecutor`或者我們直接通過`ThreadPoolExecutor`的構造函數建立線程池的時候,當我們不指定`RejectedExecutionHandler`飽和政策的話來配置線程池的時候預設使用的是`ThreadPoolExecutor.AbortPolicy`。在預設情況下,
`ThreadPoolExecutor`将抛出`RejectedExecutionException`來拒絕新來的任務 ,這代表你将丢失對這個任務的處理。 對于可伸縮的應用程式,建議使用
`ThreadPoolExecutor.CallerRunsPolicy`。當最大池被填滿時,此政策為我們提供可伸縮隊列。(這個直接檢視`ThreadPoolExecutor`的構造函數源碼就可以看出,比較簡單的原因,這裡就不貼代碼了。)
## 2.推薦使用 ThreadPoolExecutor 構造函數建立線程池
**為什麼呢?**
>使用線程池的好處是減少在建立和銷毀線程上所消耗的時間以及系統資源開銷,解決資源不足的問題。如果不使用線程池,有可能會造成系統建立大量同類線程而導緻消耗完記憶體或者“過度切換”的問題。
**強制線程池不允許使用 Executors 去建立,而是通過ThreadPoolExecutor構造函數的方式,這樣的處理方式讓寫的同學更加明确線程池的運作規則,規避資源耗盡的風險**
>Executors 傳回線程池對象的弊端如下:
> - `FixedThreadPool`和`SingleThreadExecutor`: 允許請求的隊列長度為`Integer.MAX_VALUE`,可能堆積大量的請求,進而導緻OOM。
> - **CachedThreadPool**和**ScheduledThreadPool**: 允許建立的線程數量為Integer.MAX_VALUE ,可能會建立大量線程,進而導緻 OOM。
**方式一:通過`ThreadPoolExecutor`構造函數實作(推薦)**
![](https://upload-images.jianshu.io/upload_images/25222111-48b1ae3c4d8b8a7b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
**方式二:通過 Executor 架構的工具類 Executors 來實作**
我們可以建立三種類型的 ThreadPoolExecutor:
- **FixedThreadPool**
- **SingleThreadExecutor**
- **CachedThreadPool**
對應 Executors 工具類中的方法如圖所示:
![](https://upload-images.jianshu.io/upload_images/25222111-3586e2861b98e52f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
# 四、 (重要)ThreadPoolExecutor 使用示例
我們上面講解了`Executor`架構以及`ThreadPoolExecutor`類,下面讓我們實戰一下,來通過寫一個`ThreadPoolExecutor`的小Demo來回顧上面的内容。
## 1.示例代碼:Runnable+ ThreadPoolExecutor
首先建立一個`Runnable`接口的實作類(當然也可以是`Callable`接口,我們上面也說了兩者的差別。)
`MyRunnable.java`
```java
import java.util.Date;
/**
* 這是一個簡單的Runnable類,需要大約5秒鐘來執行其任務。
* @author shuang.kou
*/
public class MyRunnable implements Runnable {
private String command;
public MyRunnable(String s) {
this.command = s;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Start. Time = "
+ new Date());
processCommand();
System.out.println(Thread.currentThread().getName() + " End. Time = "
+ new Date());
}
private void processCommand() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return this.command;
}
}
編寫測試程式,我們這裡以阿裡巴巴推薦的使用
ThreadPoolExecutor
構造函數自定義參數的方式來建立線程池。
ThreadPoolExecutorDemo.java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorDemo {
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final Long KEEP_ALIVE_TIME = 1L;
public static void main(String[] args) {
//使用阿裡巴巴推薦的建立線程池的方式
//通過ThreadPoolExecutor構造函數自定義參數建立
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 10; i++) {
//建立WorkerThread對象(WorkerThread類實作了Runnable 接口)
Runnable worker = new MyRunnable("" + i);
//執行Runnable executor.execute(worker);
}
//終止線程池
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");
}
}
可以看到我們上面的代碼指定了:
1.
corePoolSize
: 核心線程數為 5。
2.
maximumPoolSize
:最大線程數 10
3.
keepAliveTime
: 等待時間為 1L。
4.
unit
: 等待時間的機關為
TimeUnit.SECONDS
。
5.
workQueue
:任務隊列為
ArrayBlockingQueue
,并且容量為 100;
6.
handler
:飽和政策為
CallerRunsPolicy
。
Output:
pool-1-thread-3 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-5 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-2 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-1 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-4 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-3 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-4 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-5 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-2 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-5 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-4 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-3 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-2 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-4 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-5 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-3 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-2 End. Time = Sun Apr 12 11:14:47 CST 2020
2.線程池原理分析
承接上一節,我們通過代碼輸出結果可以看出:線程首先會先執行 5 個任務,然後這些任務有任務被執行完的話,就會去拿新的任務執行。 大家可以先通過上面講解的内容,分析一下到底是咋回事?(自己獨立思考一會)
現在,我們就分析上面的輸出内容來簡單分析一下線程池原理。
**為了搞懂線程池的原理,我們需要首先分析一下
execute
方法。**在上一節中的Demo中我們使用
executor.execute(worker)
來送出一個任務到線程池中去,這個方法非常重要,下面我們來看看它的源碼:
// 存放線程池的運作狀态 (runState) 和線程池内有效線程的數量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int workerCountOf(int c) {
return c & CAPACITY;
}
//任務隊列
private final BlockingQueue<Runnable> workQueue;
public void execute(Runnable command) {
// 如果任務為null,則抛出異常。
if (command == null)
throw new NullPointerException();
// ctl 中儲存的線程池目前的一些狀态資訊
int c = ctl.get();
// 下面會涉及到 3 步 操作
// 1.首先判斷目前線程池中之行的任務數量是否小于 corePoolSize
// 如果小于的話,通過addWorker(command, true)建立一個線程,并将任務(command)添 加到該線程中;然後,啟動該線程進而執行任務。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.如果目前之行的任務數量大于等于 corePoolSize 的時候就會走到這裡
// 通過 isRunning 方法判斷線程池狀态,線程池處于RUNNING狀态才會被并且隊列可以加入任務,該任務才會被加入進去
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次擷取線程池狀态,如果線程池狀态不是RUNNING狀态就需要從任務隊列中移除任務,并嘗試判斷線程是否全部執行完畢。同時執行拒絕政策。
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果目前線程池為空就新建立一個線程并執行。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3. 通過addWorker(command, false)建立一個線程,并将任務(command)添加到該線程中;然後,啟動該線程進而執行任務。
//如果addWorker(command, false)執行失敗,則通過reject()執行相應的拒絕政策的内容。
else if (!addWorker(command, false))
reject(command);
}
通過下圖可以更好的對上面這 3 步做一個展示
addWorker
這個方法主要用來建立新的工作線程,如果傳回true說明建立和啟動工作線程成功,否則的話傳回的就是false。
// 全局鎖,并發操作必備
private final ReentrantLock mainLock = new ReentrantLock();
// 跟蹤線程池的最大大小,隻有在持有全局鎖mainLock的前提下才能通路此集合
private int largestPoolSize;
// 工作線程集合,存放線程池中所有的(活躍的)工作線程,隻有在持有全局鎖mainLock的前提下才 能通路此集合
private final HashSet<Worker> workers = new HashSet<>();
//擷取線程池狀态
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
//判斷線程池的狀态是否為
Running private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 添加新的工作線程到線程池
* @param firstTask 要執行
* @param core參數為true的話表示使用線程池的基本大小,為false使用線程池最大大小
* @return 添加成功就傳回true否則傳回false
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//這兩句用來擷取線程池的狀态
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//擷取線程池中線程的數量
int wc = workerCountOf(c);
// core參數為true的話表明隊列也滿了,線程池大小變為 maximumPoolSize
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//原子操作将workcount的數量加1
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果線程的狀态改變了就再次執行上述操作
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 标記工作線程是否啟動成功
boolean workerStarted = false;
// 标記工作線程是否建立成功
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//擷取線程池狀态
int rs = runStateOf(ctl.get());
//rs < SHUTDOWN 如果線程池狀态依然為RUNNING,并且線程的狀态是存活的話,就會将工作線程添加到工作線程集合中
//(rs=SHUTDOWN && firstTask == null)如果線程池狀态小于STOP,也就是 RUNNING或者SHUTDOWN狀态下,同時傳入的任務執行個體firstTask為null,則需要添加到工作線程集合和啟 動新的Worker
// firstTask == null證明隻建立線程而不執行任務
if (rs < SHUTDOWN || (
rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
//更新目前工作線程的最大容量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 工作線程是否啟動成功
workerAdded = true;
}
} finally {
// 釋放鎖
mainLock.unlock();
}
如果成功添加工作線程,則調用Worker内部的線程執行個體t的Thread#start()方 法啟動真實的線程執行個體
if (workerAdded) {
t.start();
/// 标記線程啟動成功
workerStarted = true;
}
}
} finally {
// 線程啟動失敗,需要從工作線程中移除對應的Worker
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
現在,讓我們在回到上一節我們寫的Demo, 現在應該是不是很容易就可以搞懂它的原理了呢?
沒搞懂的話,也沒關系,可以看看我的分析:
我們在代碼中模拟了 10 個任務,我們配置的核心線程數為 5 、等待隊列容量為 100 ,是以每次隻可能存在 5 個任務同時執行,剩下的 5 個任務會被放到等待隊列中去。目前的5個任務中如果有任務被執行完了,線程池就會去拿新的任務執行。
3.幾個常見的對比
1)Runnable vs Callable
Runnable
自Java 1.0以來一直存在,但
Callable
僅在 Java1.5中引入,目的就是為了來處理
Runnable
不支援的用例。
Runnable
接口不會傳回結果或抛出檢查異常,但是
Callable
接口可以。是以,如果任務不需要傳回結果或抛出異常推薦使用
Runnable
接口,這樣代碼看起來會更加簡潔。
工具類
Executors
可以實作
Runnable
對象和
Callable
對象之間的互相轉換。(
Executors.callable
(
Runnable task
)或
Executors.callable
(
Runnable task
,
Object resule
) )。
Runnable.java
@FunctionalInterface
public interface Runnable {
/**
* 被線程執行,沒有傳回值也無法抛出異常
*/
public abstract void run();
}
Callable.java
@FunctionalInterface
public interface Callable<V> {
/**
* 計算結果,或在無法這樣做時抛出異常。
* @return 計算得出的結果
* @throws 如果無法計算結果,則抛出異常
*/
V call() throws Exception;
}
2) execute() vs submit()
1.
execute()
方法用于送出不需要傳回值的任務,是以無法判斷任務是否被線程池執行成功與否;
2.
submit()
方法用于送出需要傳回值的任務。線程池會傳回一個
Future
類型的對象,通過這個
Future
對象可以判斷任務是否執行成功,并且可以通過
Future
的
get()
方法來擷取傳回值,
get()
方法會阻塞目前線程直到任務完成,而使用
get(long timeout,TimeUnit unit)
方法則會阻塞目前線程一段時間後立即傳回,這時候有可能任務沒有執行完。
我們以
AbstractExecutorService
接口中的一個 submit 方法為例子來看看源代碼:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask); return ftask;
}
上面方法調用的
newTaskFor
方法傳回了一個
FutureTask
對象。
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
我們再來看看
execute()
方法:
public void execute(Runnable command) {
...
}
3)shutdown() VS shutdownNow()
-
:關閉線程池,線程池的狀态變為shutdown()
。線程池不再接受新任務了,但是隊列裡的任務得執行完畢。SHUTDOWN
-
:關閉線程池,線程的狀态變為shutdownNow()
。線程池會終止目前正在運作的任務,并停止處理排隊的任務并傳回正在等待執行的List。STOP
4)isTerminated() VS isShutdown()
-
當調用isShutDown
方法後傳回為 true。shutdown()
-
當調用isTerminated
方法後,并且所有送出的任務完成後傳回為 trueshutdown()
4.加餐: Callable + ThreadPoolExecutor 示例代碼
MyCallable.java
import java.util.concurrent.Callable;
public class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(1000);
//傳回執行目前 Callable 的線程名字
return Thread.currentThread().getName();
}
}
CallableDemo.java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CallableDemo {
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final Long KEEP_ALIVE_TIME = 1L;
public static void main(String[] args) {
//使用阿裡巴巴推薦的建立線程池的方式
//通過ThreadPoolExecutor構造函數自定義參數建立
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy());
List<Future<String>> futureList = new ArrayList<>();
Callable<String> callable = new MyCallable();
for (int i = 0; i < 10; i++) {
//送出任務到線程池
Future<String> future = executor.submit(callable);
//将傳回值 future 添加到 list,我們可以通過 future 獲得 執行 Callable 得到 的傳回值
futureList.add(future);
}
for (Future<String> fut : futureList) {
try {
System.out.println(new Date() + "::" + fut.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
//關閉線程池
executor.shutdown();
}
}
Output:
Wed Nov 13 13:40:41 CST 2019::pool-1-thread-1
Wed Nov 13 13:40:42 CST 2019::pool-1-thread-2
Wed Nov 13 13:40:42 CST 2019::pool-1-thread-3
Wed Nov 13 13:40:42 CST 2019::pool-1-thread-4
Wed Nov 13 13:40:42 CST 2019::pool-1-thread-5
Wed Nov 13 13:40:42 CST 2019::pool-1-thread-3
Wed Nov 13 13:40:43 CST 2019::pool-1-thread-2
Wed Nov 13 13:40:43 CST 2019::pool-1-thread-1
Wed Nov 13 13:40:43 CST 2019::pool-1-thread-4
Wed Nov 13 13:40:43 CST 2019::pool-1-thread-5
五、幾種常見的線程池詳解
1.FixedThreadPool
1)介紹
FixedThreadPool
被稱為可重用固定線程數的線程池。通過Executors類中的相關源代碼來看一下相關實作:
/**
* 建立一個可重用固定數量線程的線程池
*/
public static ExecutorService newFixedThreadPool(int nThreads,
ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
另外還有一個
FixedThreadPool
的實作方法,和上面的類似,是以這裡不多做闡述:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
從上面源代碼可以看出新建立的
FixedThreadPool
的
corePoolSize
和
maximumPoolSize
都被設定為
nThreads
,這個
nThreads
參數是我們使用的時候自己傳遞的。
2)執行任務過程介紹
FixedThreadPool
的
execute()
方法運作示意圖
上圖說明:
- 如果目前運作的線程數小于 corePoolSize, 如果再來新任務的話,就建立新的線程來執行任務;
- 目前運作的線程數等于 corePoolSize 後, 如果再來新任務的話,會将任務加入
;LinkedBlockingQueue
- 線程池中的線程執行完 手頭的任務後,會在循環中反複從
中擷取任務來執行;LinkedBlockingQueue
3)為什麼不推薦使用 FixedThreadPool ?
- 當線程池中的線程數達到
後,新任務将在無界隊列中等待,是以線程池中的線程數不會超過corePoolSize
;corePoolSize
- 由于使用無界隊列時
将是一個無效參數,因為不可能存在任務隊列滿的情況。是以,通過建立maximumPoolSize
的源碼可以看出建立的FixedThreadPool
的FixedThreadPool
和corePoolSize
被設定為同一個值。maximumPoolSize