線程池是Java并發程式設計中的一個重要元件,它能夠有效地管理線程的生命周期和執行,進而避免了頻繁建立和銷毀線程的開銷。在本文中,我們将詳細解讀Java線程池的實作源碼。
1. 線程池的基本實作
Java線程池的基本實作是通過ThreadPoolExecutor類來完成的。ThreadPoolExecutor是一個線程池的核心類,它實作了Executor接口并提供了線程池的完整功能。下面是ThreadPoolExecutor類的構造函數:
```
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // 省略代碼 }
}
```
其中,corePoolSize是線程池的核心線程數,maximumPoolSize是線程池的最大線程數,keepAliveTime和unit是非核心線程的存活時間和時間機關,workQueue是任務隊列,threadFactory用于建立線程,handler是拒絕政策,用于處理任務隊列已滿時的任務拒絕。
線程池的執行過程如下:
1. 當任務數量小于核心線程數時,建立核心線程來處理任務。
2. 當任務數量大于核心線程數時,将任務放入任務隊列中。
3. 當任務隊列已滿且線程數小于最大線程數時,建立新的線程來處理任務。
4. 當線程數達到最大線程數時,執行拒絕政策。
下面我們來詳細解讀ThreadPoolExecutor類的實作源碼。
2. 線程池的實作細節
ThreadPoolExecutor類實作了ExecutorService接口,它提供了submit、invokeAll、invokeAny等方法,用于送出任務和管理線程池。下面我們來看一下ThreadPoolExecutor類中的一些重要方法。
2.1 execute方法
execute方法是ThreadPoolExecutor類中最重要的方法之一,用于執行任務。下面是execute方法的源碼:
```
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we add a new thread. This covers both cases when the * queue is full and when the queue is "small" (i.e., when * queuing would normally be allowed but construction has * not yet completed). We don't actually know whether the * task is going to be executed until workerThread * actually handles it, so we compensate for workers that * exit just after checking before blocking by rechecking * runState. */
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) return; c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) reject(command);
else if (workerCountOf(recheck) == 0) addWorker(null, false);
}
else if (! addWorker(command, false)) reject(command);
}
```
可以看到,execute方法中主要做了以下幾件事情:
1. 如果目前線程數小于核心線程數,嘗試通過addWorker方法建立新的線程來處理任務。
2. 如果任務隊列未滿,則将任務加入任務隊列中。
3. 如果任務隊列已滿但線程數小于最大線程數,嘗試通過addWorker方法建立新的線程來處理任務。
4. 如果任務隊列已滿且線程數已達到最大線程數,執行拒絕政策。 其中,addWorker方法用于建立新的線程來處理任務。下面我們來看一下addWorker方法的實作。
2.2 addWorker方法 addWorker方法用于建立新的線程來處理任務,其源碼如下:
```
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c); // Check if queue empty only if necessary.
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false;
for (; ; ) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;
if (compareAndIncrementWorkerCount(c)) break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize) largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted) addWorkerFailed(w);
}
return workerStarted;
}
```
可以看到,addWorker方法主要做了以下幾件事情:
1. 檢查線程池的狀态,如果線程池的狀态為SHUTDOWN或STOP,則傳回false,不再建立新的線程。
2. 如果目前線程數小于最大線程數,則通過compareAndIncrementWorkerCount方法增加線程數,并繼續執行下面的代碼。
3. 建立新的Worker對象,并将其加入到workers集合中。
4. 如果worker添加成功,則啟動新的線程。 其中,Worker類是線程池中最重要的類之一,我們接下來來看一下Worker類的實作。
2.3 Worker類 Worker類是線程池中真正執行任務的類,其源碼如下:
```
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/**
* Thread this worker is running in. Null if factory fails.
*/
final Thread thread;
/**
* Initial task to run. Possibly null.
*/
Runnable firstTask;
/**
* Per-thread task counter
*/
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
*
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/**
* Delegates main run loop to outer runWorker
*/
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() {
acquire(1);
}
public boolean tryLock() {
return tryAcquire(1);
}
public void unlock() {
release(1);
}
public boolean isLocked() {
return isHeldExclusively();
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
```
可以看到,Worker類主要做了以下幾件事情:
1. 實作了Runnable接口,用于執行任務。
2. 實作了AbstractQueuedSynchronizer類,用實作線程的同步,保證同一時間隻有一個線程在執行任務。
3. 每個Worker對象都有一個firstTask屬性,表示Worker執行的第一個任務。
4. 每個Worker對象都有一個completedTasks屬性,表示Worker執行的任務數量。
在Worker類中,最重要的方法是runWorker方法,它是線程池中真正執行任務的方法,其源碼如下:
```
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
```
可以看到,runWorker方法主要做了以下幾件事情:
1. 擷取Worker對象的firstTask屬性,并将其置為null。
1. 允許中斷,執行任務。
1. 如果線程池狀态為STOP或SHUTDOWN,并且目前線程沒有被中斷,則中斷目前線程。
1. 執行任務,并在任務執行前後調用beforeExecute和afterExecute方法。
1. 在任務執行完成後,更新Worker對象的completedTasks屬性,并将Worker對象解鎖。
1. 循環執行任務,直到線程池狀态為STOP或SHUTDOWN。
通過以上源碼解讀,我們可以對Java線程池的實作原理有更加深入的了解。