天天看點

Java 線程池源碼詳細解析,程式員必備技能

前言

Java線程池是Java中常用的多線程程式設計工具之一,它可以幫助我們管理線程池中的線程,提高多線程程式的性能和可靠性。本篇技術部落格将深度解析Java線程池的源碼,并通過執行個體代碼進行說明。

大家好,這裡是網際網路技術學堂,留下你的點贊、關注、分享,支援一下吧,謝謝。

Java 線程池源碼詳細解析,程式員必備技能

線程池的基本概念

Java線程池是一個線程池管理器,它可以通過重用已建立的線程來減少線程建立和銷毀的開銷,進而提高多線程程式的性能和可靠性。在Java中,線程池是通過java.util.concurrent包中的ThreadPoolExecutor和ScheduledThreadPoolExecutor兩個類來實作的。

ThreadPoolExecutor類是一個線程池的基本實作類,它可以管理一組線程,包括建立、執行、銷毀線程。ScheduledThreadPoolExecutor類是ThreadPoolExecutor的擴充類,它可以管理一組線程,并且可以安排線程在指定時間執行。

線程池的基本概念包括以下幾個方面:

  1. corePoolSize:線程池中核心線程的數量。
  2. maximumPoolSize:線程池中最大線程的數量。
  3. keepAliveTime:線程池中空閑線程的存活時間。
  4. unit:keepAliveTime的時間機關。
  5. workQueue:用于存放待執行任務的阻塞隊列。
  6. threadFactory:用于建立新線程的工廠。
  7. handler:當任務不能被執行時的處理器。

ThreadPoolExecutor源碼解析

構造方法

ThreadPoolExecutor的構造方法有四種重載方式,其中最常用的是以下構造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,

BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {

// ...

}

這個構造方法有七個參數:

  1. corePoolSize:線程池中核心線程的數量。
  2. maximumPoolSize:線程池中最大線程的數量。
  3. keepAliveTime:線程池中空閑線程的存活時間。
  4. unit:keepAliveTime的時間機關。
  5. workQueue:用于存放待執行任務的阻塞隊列。
  6. threadFactory:用于建立新線程的工廠。
  7. handler:當任務不能被執行時的處理器。

這個構造方法的實作非常簡單,主要是對線程池的各個參數進行了初始化:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
// 檢查參數
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) {
throw new IllegalArgumentException();
}
// 初始化線程池參數
this.corePoolSize = corePoolSize;
this.maximumPoolSize= maximumPoolSize;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.workQueue = workQueue;
this.threadFactory = threadFactory;
this.handler = handler;
}           

execute方法

ThreadPoolExecutor中最重要的方法是execute方法,它用于将任務送出給線程池執行。execute方法有一個Runnable參數,表示要執行的任務。

Java 線程池源碼詳細解析,程式員必備技能
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// 如果目前線程數小于核心線程數,就建立一個線程執行該任務
if (poolSize < corePoolSize) {
addWorker(command, true);
} else {
// 将任務放入工作隊列
if (!workQueue.offer(command)) {
// 如果工作隊列已滿,嘗試建立一個新的線程執行該任務
if (!addWorker(command, false)) {
// 如果建立線程失敗,則使用拒絕政策處理該任務
handler.rejectedExecution(command, this);
}
}
}
}           

execute方法的實作非常簡單,首先會檢查任務是否為空。然後,它會根據目前線程池中的線程數和核心線程數的比較來決定如何執行該任務。如果目前線程數小于核心線程數,就建立一個線程執行該任務;否則,将任務放入工作隊列中。如果工作隊列已滿,就嘗試建立一個新的線程執行該任務。如果建立線程失敗,就使用拒絕政策處理該任務。

addWorker方法

addWorker方法用于向線程池中添加新的工作線程。它有兩個參數:一個Runnable類型的firstTask參數表示要執行的第一個任務,一個boolean類型的core表示是否将新線程作為核心線程。

private boolean addWorker(Runnable firstTask, boolean core) {
// 自旋操作,如果線程池狀态不是 RUNNING,傳回false
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 檢查線程池狀态
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
return false;
}
// 自增工作線程數
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) {
return false;
}
if (compareAndIncrementWorkerCount(c)) {
break retry;
}
c = ctl.get();
if (runStateOf(c) != rs) {
continue retry;
}
}
}
// 建立并啟動新的線程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 再次檢查線程池狀态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs== SHUTDOWN && firstTask == null)) {
if (t.isAlive()) {
throw new IllegalThreadStateException();
}
workers.add(w);
int s = workers.size();
if (s > largestPoolSize) {
largestPoolSize = s;
}
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted) {
addWorkerFailed(w);
}
}
return workerStarted;
}           

addWorker方法中的核心代碼就是建立并啟動新的線程。建立一個新的線程需要建立一個Worker對象,Worker是一個Runnable對象,它用于執行任務。在建立Worker對象時,會通過ThreadFactory建立一個Thread對象。然後,将Worker對象加入到線程池的workers集合中,同時自增工作線程數。最後,通過調用Thread對象的start方法啟動新線程。

Worker類

Worker類是ThreadPoolExecutor中的一個内部類,它實作了Runnable接口。Worker對象用于執行任務。每個Worker對象都綁定了一個Thread對象。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
// 任務
Runnable task;
// 綁定的線程
final Thread thread;
// 線程初始狀态為0
// 表示該線程處于可運作狀态
// RUNNING表示線程正在運作中
// SHUTDOWN表示線程池已關閉,但還未處理完任務
// STOP表示線程池已關閉,并且已處理完任務
// TIDYING表示正在關閉線程池的工作線程,正在進行狀态轉換操作
// TERMINATED表示線程池已關閉,所有工作線程已經銷毀
volatile int state = 0;
Worker(Runnable firstTask) {
// 設定AQS狀态,表示線程可運作
setState(-1);
this.task = firstTask;
// 通過ThreadFactory建立線程
this.thread = getThreadFactory().newThread(this);
}
// 擷取Worker綁定的線程的線程ID
public long getId() {
return thread.getId();
}
public void run() {
// 執行Worker的runWorker方法,執行任務
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() {
acquire(1);
}
public boolean tryLock() {
return tryAcquire(1);
}
public void unlock() {
release(1);
}
public boolean isLocked() {
return isHeldExclusively();
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try t.interrupt();
}
}           

Worker類實作了Runnable接口,它的核心代碼就是執行任務的run方法,具體的任務是儲存在task屬性中的。Worker對象通過實作AQS(AbstractQueuedSynchronizer)實作了鎖定線程的功能,可以通過lock、unlock、tryLock等方法來控制線程的執行狀态。

execute方法

execute方法是ThreadPoolExecutor的核心方法之一,用于向線程池送出任務。當調用execute方法送出任務時,線程池的處理流程如下:

  1. 如果工作線程數小于核心線程數,那麼建立一個新的工作線程來執行任務。
  2. 如果工作線程數已經達到核心線程數,那麼将任務放入阻塞隊列中。
  3. 如果阻塞隊列已滿,那麼建立新的工作線程來執行任務,直到達到最大工作線程數。
  4. 如果工作線程數已經達到最大工作線程數,那麼執行拒絕政策。
Java 線程池源碼詳細解析,程式員必備技能

下面是execute方法的代碼:

public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) {
return;
}
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command)) {
reject(command);
} else if (workerCountOf(recheck) == 0) {
addWorker(null, false);
}
} else if (!addWorker(command, false)) {
reject(command);
}
}           

execute方法接收一個Runnable對象作為參數。在方法中,首先檢查任務是否為空,如果為空,則抛出NullPointerException異常。然後,通過調用ctl.get()方法擷取線程池的狀态和工作線程數,判斷是否需要建立新的工作線程。如果工作線程數小于核心線程數,那麼建立一個新的工作線程來執行任務。

如果工作線程數已經達到核心線程數,那麼将任務放入阻塞隊列中。如果阻塞隊列已滿,那麼建立新的工作線程來執行任務,直到達到最大工作線程數。如果工作線程數已經達到最大工作線程數,那麼執行拒絕政策。

shutdown方法

shutdown方法用于關閉線程池。當調用shutdown方法時,線程池的處理流程如下:

  1. 線程池狀态設定為SHUTDOWN。
  2. 中斷所有空閑線程。
  3. 清空阻塞隊列。

下面是shutdown方法的代碼:

public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
}           

shutdown方法首先擷取主鎖,然後調用checkShutdownAccess方法檢查是否有權限關閉線程池。然後,将線程池狀态設定為SHUTDOWN,并中斷所有空閑線程。最後,調用onShutdown方法。

shutdownNow方法

shutdownNow方法用于立即關閉線程池。當調用shutdownNow方法時,線程池的處理流程如下:

1. 線程池狀态設定為STOP。

2. 中斷所有工作線程。

3. 清空阻塞隊列。

4. 傳回等待執行的任務清單。

下面是shutdownNow方法的代碼:

public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}           

shutdownNow方法首先擷取主鎖,然後調用checkShutdownAccess方法檢查是否有權限關閉線程池。然後,将線程池狀态設定為STOP,并中斷所有工作線程。最後,清空阻塞隊列,傳回等待執行的任務清單,并調用tryTerminate方法。

submit方法

submit方法用于向線程池送出任務,并傳回一個Future對象。當調用submit方法送出任務時,線程池的處理流程與execute方法類似。不同之處在于,submit方法傳回一個Future對象,可以用來擷取任務執行的結果。

下面是submit方法的代碼:

public <T> Future<T> submit(Callable<T> task) {
if (task == null) {
throw new NullPointerException();
}
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) {
throw new NullPointerException();
}
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public Future<?> submit(Runnable task) {
if (task == null) {
throw new NullPointerException();
}
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}           

總結

線程池是一種非常實用的工具,可以幫助開發者管理線程,提高程式的性能和可靠性。Java線程池實作了一系列的線程池接口,提供了多種建立和管理線程池的方式。本文對Java線程池的源碼進行了深度解析,講解了線程池的實作原理和核心方法。

繼續閱讀