一: 線程池定義:
1. 為什麼會出現線程池?
頻繁建立線程是一個比耗時且有一定的資源開銷,故引入了線程池,将線程通過池子的管理,進行重複利用。
2. 如果建立線程:
a ) 通過Executors輔助類建立線程池的執行個體:
它預設提供了4種線程池政策:
newFixedThreadPool(int nThreads) 建立一個固定數量的線程池,它的緩存任務隊列的大小是Int 的最大值,同時拒絕政策采用抛出異常 RejectedExecutionException
Executors 傳回的線程池對象的弊端如下:
FixedThreadPool 和 SingleThreadPool : 允許的請求隊列長度為 Integer.MAX_VALUE ,可能會堆積大量的請求,進而導緻 OOM 。
CachedThreadPool 和 ScheduledThreadPool : 允許的建立線程數量為 Integer.MAX_VALUE ,可能會建立大量的線程,進而導緻 OOM 。
b ) 直接通過調用ThreadPoolExecutor 的構造方法進行建立:
/**
corePoolSize : 線程池線程初始數
maxinumPoolSize : 線程池最大線程數
keepAliveTime : (maxinumPoolSize - corePoolSize) 的線程存活時長
unit : 存活時長的時間機關
workQueue : 任務阻塞隊列
ArrayBlockingQueue:構造函數一定要傳大小
LinkedBlockingQueue:構造函數不傳大小會預設為(Integer.MAX_VALUE ),當大量請求任務時,容易造成 記憶體耗盡。
SynchronousQueue:同步隊列,一個沒有存儲空間的阻塞隊列 ,将任務同步傳遞給工作線程。
PriorityBlockingQueue : 優先隊列
threadFactory : 線程工廠
handler : 拒絕執行政策處理器
拒絕政策有4種:
AbortPolicy: 當任務數超出線程池處理數,就會抛出該異常
DiscardPolicy: 當任務數超出線程池處理數,它會将後續送出的任務進行丢棄
DiscardOldestPolicy: 當任務數超出線程池處理數,它會将阻塞隊列中第一個位置的元素彈出
CallerUnsPolicy: 當任務數超出線程池處理數,它會将任務抛出調用者線程進行執行
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
二:線程池狀态及生命周期:
RUNNING : 表示線程池可以接收新的任務送出,并且還可以正常處理阻塞隊列中的任務。
SHUTDOWN : 不再接收新的任務送出,但還會繼續處理阻塞隊列中的任務。
STOP : 不再接受新的任務送出,同時還會丢棄阻塞隊列中的未處理的任務,并且會立即中止目前進行中的任務
TIDYING : 表示所有任務都執行完畢後(包括阻塞隊列),目前線程池中的活動線程數量降為0,将會調用terminated方法
TERMIMATED : 線程池終止狀态,當terminated() 方法執行完畢後,線程池将會處理該狀态之下。

三:線程池的結構:
3.1 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
該屬性是原子的Integer 類型,可以實作原子操作,Doug Lea 将這個ctl 變量,拆分成二部分,前29位用于記錄工作線程數, 後3位表示線程池的狀态
3.2 線程池中大量的提供了對 AtomicInteger ctl 的狀态原子操作變更 和 工作線程數量的原子變更
/**
* Attempts to CAS-increment the workerCount field of ctl.
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* Attempts to CAS-decrement the workerCount field of ctl.
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* Decrements the workerCount field of ctl. This is called only on
* abrupt termination of a thread (see processWorkerExit). Other
* decrements are performed within getTask.
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
3.3 private final BlockingQueue<Runnable> workQueue;
該隊列用于存放 工作任務 (Runable,Callable)
3.4 private final HashSet<Worker> workers = new HashSet<Worker>();
該HashSet 集合,專門用于存線上程池中正在工作的線程
3.5 private volatile ThreadFactory threadFactory;
用于建立線程池線程的工廠方法,可以自己擴充,友善将線程命名之類
3.6 private volatile RejectedExecutionHandler handler;
用于執行任務過多超載時,執行的拒絕政策
3.7 private volatile int corePoolSize;
線程池核心線程數
3.8 private volatile int maximumPoolSize;
線程池最大的線程數
3.9 private volatile long keepAliveTime;
線程池空餘線程(非核心線程)的存活時間
3.10 private final class Worker extends AbstractQueuedSynchronizer implements Runnable {}
它是線程池中的内部類,主要就是封裝了線程和任務.
四:線程池中一些重要的實作:
1. 當任務送出,會進行判斷
如果線程數小于基本線程數(coreSize), 則直接建立線程,并将線程和任務包裝成worker 對象放入 工作線程集合中
如果線程數大于基本線程數,則放入到任務隊列中
如果任務隊列已滿,則又會建立線程到線程池最大值
如果線程數已經達到最大值,任務隊列已滿,則執行拒絕政策。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException(); int c = ctl.get();
//工作線程數量與線程池核心線程數比較,小于核心數線程數量,則建立線程,并且将建立的工作線程及其綁定的任務的worker 對象放入工作線程集合中,而且同時會調用線程的start方法。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//判斷線程池處于運作狀态,然後将任務放入到工作隊列中去
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次檢查線程池是否處于運作狀态,如果不是運作狀态,則将任務從隊列中移除,并執行拒絕政策
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果工作線程數為0,表示
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
//Work 線程中的run 方法,調用了線程池的runWorker 方法:
/** Delegates main run loop to outer runWorker */
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
public void run() {
runWorker(this);
}
}
// 線上程調用start 方法後,裡面有一個while 循環,其判斷條件是:線上程第一次建立時,會直接執行其任務,如果沒有任務,則從隊列中擷取任務,如果沒有擷取到任務則一直阻塞,直到任務擷取到為止。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 要麼執行送出過來的任務,要麼從工作隊列中擷取任務 getTask(),沒有擷取到任務則阻塞,直到擷取到任務為止
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 移除空閑多餘線程
processWorkerExit(w, completedAbruptly);
}
}
// 在getTask 中的阻塞是通過 workQueue.take() 方法實作,同時在for 循環中,會判斷線程是否逾時,如果逾時,則會将線程打斷
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 如果工作線程數大于核心線程數,且線程存活時間超過設定時間時,傳回true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 工作線程數大于最大線程數或者線程逾時 并且 工作線程大于1 或者 工作隊列任務數為空時,那麼take 方法就會傳回null , 進而把流程交給了 runWorker 方法中的 finally 代碼塊,
// 而finally 代碼塊,會将空閑多餘的線程從工作線程中移除.
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
五:其它相關的資訊:
PS: 在ThreadPoolExecutor 中的submit 方法會吞掉異常資訊,而execute 方法隻能出現部分資訊,最好是将異常進行包裝,如下:
public Runnable wrap(final Runable task, final Exception ex , String threadName) {
return new Runable(){
try {
task.run();
} catch (Exception e) {
ex.printStackTrace(e);
throw e;
}
}
}