天天看點

Java線程池源碼解讀

作者:我心永恒224077694

線程池是Java并發程式設計中的一個重要元件,它能夠有效地管理線程的生命周期和執行,進而避免了頻繁建立和銷毀線程的開銷。在本文中,我們将詳細解讀Java線程池的實作源碼。

1. 線程池的基本實作

Java線程池的基本實作是通過ThreadPoolExecutor類來完成的。ThreadPoolExecutor是一個線程池的核心類,它實作了Executor接口并提供了線程池的完整功能。下面是ThreadPoolExecutor類的構造函數:

```

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // 省略代碼 }

}

```

其中,corePoolSize是線程池的核心線程數,maximumPoolSize是線程池的最大線程數,keepAliveTime和unit是非核心線程的存活時間和時間機關,workQueue是任務隊列,threadFactory用于建立線程,handler是拒絕政策,用于處理任務隊列已滿時的任務拒絕。

線程池的執行過程如下:

1. 當任務數量小于核心線程數時,建立核心線程來處理任務。

2. 當任務數量大于核心線程數時,将任務放入任務隊列中。

3. 當任務隊列已滿且線程數小于最大線程數時,建立新的線程來處理任務。

4. 當線程數達到最大線程數時,執行拒絕政策。

下面我們來詳細解讀ThreadPoolExecutor類的實作源碼。

2. 線程池的實作細節

ThreadPoolExecutor類實作了ExecutorService接口,它提供了submit、invokeAll、invokeAny等方法,用于送出任務和管理線程池。下面我們來看一下ThreadPoolExecutor類中的一些重要方法。

2.1 execute方法

execute方法是ThreadPoolExecutor類中最重要的方法之一,用于執行任務。下面是execute方法的源碼:

```

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

/*

* Proceed in 3 steps:

*

* 1. If fewer than corePoolSize threads are running, try to

* start a new thread with the given command as its first

* task. The call to addWorker atomically checks runState and

* workerCount, and so prevents false alarms that would add

* threads when it shouldn't, by returning false.

*

* 2. If a task can be successfully queued, then we still need

* to double-check whether we should have added a thread

* (because existing ones died since last checking) or that

* the pool shut down since entry into this method. So we

* recheck state and if necessary roll back the enqueuing if

* stopped, or start a new thread if there are none.

*

* 3. If we cannot queue task, then we add a new thread. This covers both cases when the * queue is full and when the queue is "small" (i.e., when * queuing would normally be allowed but construction has * not yet completed). We don't actually know whether the * task is going to be executed until workerThread * actually handles it, so we compensate for workers that * exit just after checking before blocking by rechecking * runState. */

int c = ctl.get();

if (workerCountOf(c) < corePoolSize) {

if (addWorker(command, true)) return; c = ctl.get();

}

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

if (! isRunning(recheck) && remove(command)) reject(command);

else if (workerCountOf(recheck) == 0) addWorker(null, false);

}

else if (! addWorker(command, false)) reject(command);

}

```

可以看到,execute方法中主要做了以下幾件事情:

1. 如果目前線程數小于核心線程數,嘗試通過addWorker方法建立新的線程來處理任務。

2. 如果任務隊列未滿,則将任務加入任務隊列中。

3. 如果任務隊列已滿但線程數小于最大線程數,嘗試通過addWorker方法建立新的線程來處理任務。

4. 如果任務隊列已滿且線程數已達到最大線程數,執行拒絕政策。 其中,addWorker方法用于建立新的線程來處理任務。下面我們來看一下addWorker方法的實作。

2.2 addWorker方法 addWorker方法用于建立新的線程來處理任務,其源碼如下:

```

private boolean addWorker(Runnable firstTask, boolean core) {

retry:

for (; ; ) {

int c = ctl.get();

int rs = runStateOf(c); // Check if queue empty only if necessary.

if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false;

for (; ; ) {

int wc = workerCountOf(c);

if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;

if (compareAndIncrementWorkerCount(c)) break retry;

c = ctl.get(); // Re-read ctl

if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop

}

}

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

w = new Worker(firstTask);

final Thread t = w.thread;

if (t != null) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

// Recheck while holding lock.

// Back out on ThreadFactory failure or if

// shut down before lock acquired.

int c = ctl.get();

int rs = runStateOf(c);

if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) // precheck that t is startable

throw new IllegalThreadStateException();

workers.add(w);

int s = workers.size();

if (s > largestPoolSize) largestPoolSize = s;

workerAdded = true;

}

} finally {

mainLock.unlock();

}

if (workerAdded) {

t.start();

workerStarted = true;

}

}

} finally {

if (!workerStarted) addWorkerFailed(w);

}

return workerStarted;

}

```

可以看到,addWorker方法主要做了以下幾件事情:

1. 檢查線程池的狀态,如果線程池的狀态為SHUTDOWN或STOP,則傳回false,不再建立新的線程。

2. 如果目前線程數小于最大線程數,則通過compareAndIncrementWorkerCount方法增加線程數,并繼續執行下面的代碼。

3. 建立新的Worker對象,并将其加入到workers集合中。

4. 如果worker添加成功,則啟動新的線程。 其中,Worker類是線程池中最重要的類之一,我們接下來來看一下Worker類的實作。

2.3 Worker類 Worker類是線程池中真正執行任務的類,其源碼如下:

```

private final class Worker

extends AbstractQueuedSynchronizer

implements Runnable {

/**

* This class will never be serialized, but we provide a

* serialVersionUID to suppress a javac warning.

*/

private static final long serialVersionUID = 6138294804551838833L;

/**

* Thread this worker is running in. Null if factory fails.

*/

final Thread thread;

/**

* Initial task to run. Possibly null.

*/

Runnable firstTask;

/**

* Per-thread task counter

*/

volatile long completedTasks;

/**

* Creates with given first task and thread from ThreadFactory.

*

* @param firstTask the first task (null if none)

*/

Worker(Runnable firstTask) {

setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;

this.thread = getThreadFactory().newThread(this);

}

/**

* Delegates main run loop to outer runWorker

*/

public void run() {

runWorker(this);

}

// Lock methods

//

// The value 0 represents the unlocked state.

// The value 1 represents the locked state.

protected boolean isHeldExclusively() {

return getState() != 0;

}

protected boolean tryAcquire(int unused) {

if (compareAndSetState(0, 1)) {

setExclusiveOwnerThread(Thread.currentThread());

return true;

}

return false;

}

protected boolean tryRelease(int unused) {

setExclusiveOwnerThread(null);

setState(0);

return true;

}

public void lock() {

acquire(1);

}

public boolean tryLock() {

return tryAcquire(1);

}

public void unlock() {

release(1);

}

public boolean isLocked() {

return isHeldExclusively();

}

void interruptIfStarted() {

Thread t;

if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {

try {

t.interrupt();

} catch (SecurityException ignore) {

}

}

}

}

```

可以看到,Worker類主要做了以下幾件事情:

1. 實作了Runnable接口,用于執行任務。

2. 實作了AbstractQueuedSynchronizer類,用實作線程的同步,保證同一時間隻有一個線程在執行任務。

3. 每個Worker對象都有一個firstTask屬性,表示Worker執行的第一個任務。

4. 每個Worker對象都有一個completedTasks屬性,表示Worker執行的任務數量。

在Worker類中,最重要的方法是runWorker方法,它是線程池中真正執行任務的方法,其源碼如下:

```

final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock(); // allow interrupts

boolean completedAbruptly = true;

try {

while (task != null || (task = getTask()) != null) {

w.lock();

// If pool is stopping, ensure thread is interrupted;

// if not, ensure thread is not interrupted. This

// requires a recheck in second case to deal with

// shutdownNow race while clearing interrupt

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

wt.interrupt();

try {

beforeExecute(wt, task);

Throwable thrown = null;

try {

task.run();

} catch (RuntimeException x) {

thrown = x;

throw x;

} catch (Error x) {

thrown = x;

throw x;

} catch (Throwable x) {

thrown = x;

throw new Error(x);

} finally {

afterExecute(task, thrown);

}

} finally {

task = null;

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

processWorkerExit(w, completedAbruptly);

}

}

```

可以看到,runWorker方法主要做了以下幾件事情:

1. 擷取Worker對象的firstTask屬性,并将其置為null。

1. 允許中斷,執行任務。

1. 如果線程池狀态為STOP或SHUTDOWN,并且目前線程沒有被中斷,則中斷目前線程。

1. 執行任務,并在任務執行前後調用beforeExecute和afterExecute方法。

1. 在任務執行完成後,更新Worker對象的completedTasks屬性,并将Worker對象解鎖。

1. 循環執行任務,直到線程池狀态為STOP或SHUTDOWN。

通過以上源碼解讀,我們可以對Java線程池的實作原理有更加深入的了解。

繼續閱讀