系列文章目錄
線程安全(一)java對象頭分析以及鎖狀态
線程安全(二)java中的CAS機制
線程安全(三)實作方法sychronized與ReentrantLock(阻塞同步)
線程安全(四)Java記憶體模型與volatile關鍵字
線程安全(五)線程狀态和線程建立
線程安全(六)線程池
線程安全(七)ThreadLocal和java的四種引用
線程安全(八)Semaphore
線程安全(九)CyclicBarrier
線程安全(十)AQS(AbstractQueuedSynchronizer)
0.前言
為什麼要使用線程池?
直接使用線程會有線程重複的建立和銷毀
優點:
- 降低記憶體消耗,避免線程重複的建立和銷毀
- 可以建立核心線程數,可直接使用,不需要再建立
- 顯性的管理監控線程,防止建立過多
先說結果
再看類圖
1.Executor 與ExecutorService接口
1.1.Executor接口
// 送出任務的辦法
void execute(Runnable command);
1.2.ExecutorService接口
// 關閉線程池,已送出的任務繼續執行,不接受繼續送出新任務
void shutdown();
// 關閉線程池,嘗試停止正在執行的所有任務,不接受繼續送出新任務
// 它和前面的方法相比,加了一個單詞“now”,差別在于它會去停止目前正在進行的任務
List<Runnable> shutdownNow();
// 線程池是否已關閉
boolean isShutdown();
// 如果調用了 shutdown() 或 shutdownNow() 方法後,所有任務結束了,那麼傳回true
// 這個方法必須在調用shutdown或shutdownNow方法之後調用才會傳回true
boolean isTerminated();
// 等待所有任務完成,并設定逾時時間
// 我們這麼了解,實際應用中是,先調用 shutdown 或 shutdownNow,
// 然後再調這個方法等待所有的線程真正地完成,傳回值意味着有沒有逾時
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 送出一個 Callable 任務
<T> Future<T> submit(Callable<T> task);
// 送出一個 Runnable 任務,第二個參數将會放到 Future 中,作為傳回值,
// 因為 Runnable 的 run 方法本身并不傳回任何東西
<T> Future<T> submit(Runnable task, T result);
// 送出一個 Runnable 任務
Future<?> submit(Runnable task);
// 執行所有任務,傳回 Future 類型的一個 list
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 也是執行所有任務,但是這裡設定了逾時時間
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 隻有其中的一個任務結束了,就可以傳回,傳回執行完的那個任務的結果
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 同上一個方法,隻有其中的一個任務結束了,就可以傳回,傳回執行完的那個任務的結果,
// 不過這個帶逾時,超過指定的時間,抛出 TimeoutException 異常
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
2.AbstractExecutorService類
// 建構RunnableFuture
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
// 送出任務
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 1. 将任務包裝成 FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
// 送出任務
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
// 1. 将任務包裝成 FutureTask
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
// 送出任務
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 1. 将任務包裝成 FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
不難看出,上述隻是實作送出任務,沒有真正開啟線程,真正運作的還是excute方法。
3.ThreadPoolExecutor
3.1.屬性
32位的整數,前3位用于存放線程池狀态,低29位辨別線程數。
// 原子性操作變量,CAS的展現
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 29位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 線程池的最大線程數
111 11111111111111111111111111(29個1)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState存儲在高階位
// 運作:接受新任務并處理隊列任務
111 00000000000000000000000000000(111後29個0)
private static final int RUNNING = -1 << COUNT_BITS;
// 關機:不接受新任務,但處理隊列任務
000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 停止:不接受新任務,不處理排隊任務,并中斷正在進行的任務
001 00000000000000000000000000000(1後29個0)
private static final int STOP = 1 << COUNT_BITS;
// 整理:所有任務已經終止
010 00000000000000000000000000000(10後29個0)
private static final int TIDYING = 2 << COUNT_BITS;
// terminated() 方法結束後,已完成
011 00000000000000000000000000000(11後29個0)
private static final int TERMINATED = 3 << COUNT_BITS;
3.2.線程狀态方法
// CAPACITY值為:11111111111111111111111111111
// ~CAPACITY值為:11100000000000000000000000000000
// &操作,位運算結果将低29位置0了,而高3位還是保持c的值,得到是runState的值
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 這個方法用于取出workerCount的值
// &操作,位運算結果将高3位置0了,保留參數c的低29位,得到workerCount的值
private static int workerCountOf(int c) { return c & CAPACITY; }
// 之前說過 32為2進制數,前三位存的是線程狀态,後29位存的是線程數,這個運算使将他倆合在一起 取rs(runState)的前三位和wc(workerCount)的後29位
private static int ctlOf(int rs, int wc) { return rs | wc; }
// c是否小于s
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// c是否大于等于s
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// c是否小于SHUTDOEN 即是否在run 隻有RUNNING狀态會小于0
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
// 線程号+1
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
// 線程号-1
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
// 一個線程突然終止 減少線程号
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
3.2.相關屬性
// 隊列 用于儲存任務并傳遞給工作線程的隊列
private final BlockingQueue<Runnable> workQueue;
// 鎖
private final ReentrantLock mainLock = new ReentrantLock();
// 包含所有工作線程,隻有持有mainLock的才能進入
private final HashSet<Worker> workers = new HashSet<Worker>();
// 等待條件支援awaittermination
private final Condition termination = mainLock.newCondition();
//最大的池大小
private int largestPoolSize;
//完成任務的計數器。僅在工作線程終止時更新。隻有在mainlock通路
private long completedTaskCount;
// 線程工廠
private volatile ThreadFactory threadFactory;
// 飽和和關閉時使用
private volatile RejectedExecutionHandler handler;
// 線程從隊列中擷取任務的逾時時間,也就是說如果線程空閑超過這個時間就會終止。
private volatile long keepAliveTime;
// 如果為false(預設),即使空閑時,核心線程仍然保持生存狀态。
// 如果為true,核心線程使用keepalivetime逾時等待工作。
private volatile boolean allowCoreThreadTimeOut;
// 核心線程數量
private volatile int corePoolSize;
// 最大線程數。注意,實際最大值是由容量内部限定的。
private volatile int maximumPoolSize;
// 預設拒絕 程式
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
//
private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
// 終結器
private final AccessControlContext acc;
3.3.構造方法
// 核心線程數,最大線程數,保持時間,機關,隊列
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
// 核心線程數,最大線程數,保持時間,機關,隊列,線程工廠
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
// 核心線程數,最大線程數,保持時間,機關,隊列, handler
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
// 核心線程數,最大線程數,保持時間,機關,隊列,線程工廠,handler
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
3.4. void execute(Runnable command)
//
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 1.如果少于核心線程數的線程在運作,啟動一個新線程,原子性檢查運作狀态和線程号,
* 通過傳回false來防止錯誤的建立線程
* 2. 如果一個任務可以成功地排隊,那麼我們仍然需要仔細檢查是否應該添加一個線程。
* (自從上次檢查結束後,有的線程有的已經死亡)或者自從進入這個方法後池就關閉了。
*是以我們重新檢查狀态,如果停止,需要復原入隊,如果沒有, 或啟動一個新線程。
* 3.如果我們不能對任務進行隊列,那麼我們嘗試添加一個新線程。
* 如果失敗,我們知道我們被關閉或飽和,是以任務倍拒絕。
// 擷取線程狀态和線程數
int c = ctl.get();
// 活動線程數 小與 核心線程數,直接啟動新的線程,在增加時會原子性檢測
// 1,小于核心線程建立線程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.線程池處于運作狀态,并且隊列未滿
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 不是runState 則從workQueue中移除任務并拒絕
if (! isRunning(recheck) && remove(command))
// 采用線程池指定的政策拒絕任務
reject(command);
// 線程池處于RUNNING狀态 || 線程池處于非RUNNING狀态但是任務移除失敗
else if (workerCountOf(recheck) == 0)
// 這行代碼是為了SHUTDOWN狀态下沒有活動線程了,但是隊列裡還有任務沒執行這種特殊情況。
// 添加一個null任務是因為SHUTDOWN狀态下,線程池不再接受新任務
addWorker(null, false);
// 兩種情況:
// 1.非RUNNING狀态拒絕新的任務
// 2.隊列滿了啟動新的線程失敗(workCount > maximumPoolSize)
}
// 如果 workQueue 隊列滿了,/ 以 maximumPoolSize 為界建立新的 worker,
// 如果失敗,說明目前線程數已經達到 maximumPoolSize,執行拒絕政策
else if (!addWorker(command, false))
reject(command);
}
3.5. void shutdown()
// 會将runState置為SHUTDOWN,會終止所有空閑的線程。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 線程池狀态設為SHUTDOWN,如果已經至少是這個狀态那麼則直接傳回
advanceRunState(SHUTDOWN);
// 注意這裡是中斷所有空閑的線程:runWorker中等待的線程被中斷 → 進入processWorkerExit →
// tryTerminate方法中會保證隊列中剩餘的任務得到執行。
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// w.tryLock能擷取到鎖,說明該線程沒有在運作,因為runWorker中執行任務會先lock,
// 是以保證了中斷的肯定是空閑的線程。
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
3.6. List shutdownNow()
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// STOP狀态:不再接受新任務且不再執行隊列中的任務。
advanceRunState(STOP);
// 中斷所有線程
interruptWorkers();
// 傳回隊列中還沒有被執行的任務。
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
void interruptIfStarted() {
Thread t;
// 初始化時state == -1
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
3.7. void runWorker(Worker w)
// 第一次啟動會執行初始化傳進來的任務firstTask;
// 然後會從workQueue中取任務執行,如果隊列為空則等待keepAliveTime這麼長時間。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// Worker的構造函數中抑制了線程中斷setState(-1),是以這裡需要unlock進而允許中斷
w.unlock(); // allow interrupts
// 為true的情況:1.執行任務抛出異常;2.被中斷。
boolean completedAbruptly = true;
try {
// 如果getTask傳回null那麼getTask中會将workerCount遞減,如果異常了這個遞減操作會在processWorkerExit中處理
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 任務執行前可以插入一些處理,子類重載該方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執行使用者任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 和beforeExecute一樣,留給子類去重載
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 結束線程的一些清理工作
processWorkerExit(w, completedAbruptly);
}
}
3.8. boolean addWorker(Runnable firstTask, boolean core)
// 第一個為目前任務,第二個為是否為核心線程,是用corePoolSize ,false用maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 這條語句等價:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||
// workQueue.isEmpty())
// 滿足下列調價則直接傳回false,線程建立失敗:
// rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此時不再接受新的任務,且所有任務執行結束
// rs = SHUTDOWN:firtTask != null 此時不再接受任務,但是仍然會執行隊列中的任務
// rs = SHUTDOWN:firtTask == null見execute方法的addWorker(null,
// false),任務為null && 隊列為空
// 最後一種情況也就是說SHUTDONW狀态下,如果隊列不為空還得接着往下執行,為什麼?add一個null任務目的到底是什麼?
// 看execute方法隻有workCount==0的時候firstTask才會為null結合這裡的條件就是線程池SHUTDOWN了不再接受新任務
// 但是此時隊列不為空,那麼還得建立線程把任務給執行完才行
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 走到這的情形:
// 1.線程池狀态為RUNNING
// 2.SHUTDOWN狀态,但隊列中還有任務需要執行
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 原子操作遞增workCount
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 如果線程池的狀态發生變化則重試
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// wokerCount遞增成功
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 并發的通路線程池workers對象必須加鎖
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// RUNNING狀态 || SHUTDONW狀态下清理隊列中剩餘的任務
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 啟動新添加的線程,這個線程首先執行firstTask,然後不停的從隊列中取任務執行
// 當等待keepAlieTime還沒有任務執行則該線程結束。見runWoker和getTask方法的代碼。
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 線程啟動失敗,則從wokers中移除w并遞減wokerCount
if (! workerStarted)
// 遞減wokerCount會觸發tryTerminate方法
addWorkerFailed(w);
}
return workerStarted;
}
3.9 invokeAll(Collection<? extends Callable> tasks,long timeout, TimeUnit unit)
批量執行線程任務時設定的的逾時時間(常用于Future,需要拿到所有線程傳回結果的值)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
for (int i = 0; i < size; i++) {
// 所有任務執行execute,同步執行
execute((Runnable)futures.get(i));
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
return futures;
}
for (int i = 0; i < size; i++) {
// 判斷每個future是否執行完成
Future<T> f = futures.get(i);
if (!f.isDone()) {
if (nanos <= 0L)
return futures;
try {
// 在限定時間内拿到結果
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
// 根據目前已用時間 更新下個future的限定時間
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
// 隻要非完成的,就會中斷所有線程
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
4.常見線程池
Executors是一個工具類,生成幾種常見的線程池
附:圖檔來源于網絡,侵删
4.1.newFixedThreadPool()
生成一個固定大小的線程池(核心線程數和最大線程數是一樣的,超出核心線程數的新送出的任務都會加入無界隊列)
因為是無界隊列,是以最大線程數其實用不到
建立一個線程池,該線程池重用在共享無界隊列上運作的固定數量的線程。在任何時候,最多 nThreads 個線程将是活動的處理任務。如果在所有線程都處于活動狀态時送出了其他任務,它們将在隊列中等待,直到有線程可用。如果任何線程在關閉之前的執行過程中由于失敗而終止,如果需要執行後續任務,新的線程将取代它。池中的線程将一直存在,直到顯式關閉。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
4.2.newSingleThreadExecutor()
生成隻有一個線程的固定線程池,設定線程數為 1
因為是無界隊列,是以最大線程數其實用不到
建立一個使用單個工作線程在無界隊列上運作的 Executor。 (但請注意,如果該單線程在關閉前的執行過程中因故障而終止,如果需要執行後續任務,則新線程将取代它。)任務保證按順序執行,并且不會有超過一個任務處于活動狀态在任何給定時間。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
4.3.newCachedThreadPool()
生成一個需要的時候就建立新的線程,同時可以複用之前建立的線程(如果這個線程目前沒有任務)的線程池。
SynchronousQueue内部沒有資料的存儲空間,任何入隊的線程都會阻塞,直到有線程來出隊,可以保證如果加入一個資料,必須要remove掉,才能再次計入
建立一個線程池,根據需要建立新線程,但在可用時将重用以前構造的線程。這些池通常會提高執行許多短期異步任務的程式的性能。如果可用,對執行的調用将重用以前構造的線程。如果沒有可用的現有線程,将建立一個新線程并将其添加到池中。六十秒内未使用的線程将被終止并從緩存中删除。是以,保持空閑足夠長時間的池不會消耗任何資源。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
4.4.newScheduledThreadPool()
定期或延時執行
建立一個線程池,可以安排指令在給定延遲後運作,或定期執行
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
5.線程池關鍵屬性
5.1.線程池建立參數
- corePoolSize(核心線程數)
- maximumPoolSize(最大線程數)
- workQueue(隊列)
- keepAliveTime(空閑時間)
-
rejectedExecutionHandler(拒絕政策)
corePoolSize 到 maximumPoolSize 之間的線程會被回收,當然 corePoolSize 的線程也可以通過設定而得到回收(allowCoreThreadTimeOut(true))。
workQueue 用于存放任務,添加任務的時候,如果目前線程數超過了 corePoolSize,那麼往該隊列中插入任務,線程池中的線程會負責到隊列中拉取任務。
keepAliveTime 用于設定空閑時間,如果線程數超出了 corePoolSize,并且有些線程的空閑時間超過了這個值,會執行關閉這些線程的操作
rejectedExecutionHandler 用于處理當線程池不能執行此任務時的情況。
- AbortPolicy(預設):丢棄任務并抛出RejectedExecutionException異常。
- DiscardPolicy:丢棄任務,但是不抛出異常。
- DiscardOldestPolicy:丢棄隊列最前面的任務,然後重新送出被拒絕的任務。
- CallerRunsPolicy:由調用線程(送出任務的線程)處理該任務。