什麼是線程池
線程池是一種用于管理和複用線程的機制,可以有效地管理線程的建立、執行和銷毀。使用線程池可以減少線程建立和銷毀的開銷,并能夠提供對并發任務的排程和執行的控制。
Java提供了ExecutorService接口和一些實作類來實作線程池。以下是幾個常用的線程池實作類:
- ThreadPoolExecutor:ThreadPoolExecutor是一個靈活的線程池實作,可以通過構造函數參數來指定線程池的核心線程數、最大線程數、線程空閑時間等參數。
- Executors.newFixedThreadPool(int nThreads):newFixedThreadPool方法傳回一個固定大小的線程池,其中線程數固定為指定的數量。
- Executors.newCachedThreadPool():newCachedThreadPool方法傳回一個根據需要建立線程的線程池。線程池會根據任務的數量自動調整線程的數量。
- Executors.newSingleThreadExecutor():newSingleThreadExecutor方法傳回一個隻有一個線程的線程池,用于按順序執行任務。
這些線程池實作類都實作了ExecutorService接口,提供了一些常用的方法來管理線程池和執行任務,例如submit用于送出任務,shutdown用于關閉線程池等。
使用線程池的主要優點是:
- 重用線程:線程池會複用已建立的線程,避免了頻繁建立和銷毀線程的開銷。
- 控制并發數:可以限制線程池中線程的數量,控制并發執行的任務數,避免資源耗盡。
- 提供任務排程和執行控制:可以送出任務并由線程池負責排程和執行任務,可以設定任務的優先級、逾時等屬性。
使用線程池可以提高多線程應用程式的性能和穩定性,并且能夠更好地管理線程資源。
線程池的使用示例
這裡就舉一個使用Executors.newFixedThreadPool()建立線程池的例子
java複制代碼package org.example;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) {
// 建立固定大小為3的線程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 送出任務給線程池
for (int i = 0; i < 10; i++) {
Runnable task = new MyTask(i);
executorService.execute(task);
}
// 關閉線程池
executorService.shutdown();
}
}
class MyTask implements Runnable {
private int taskId;
public MyTask(int taskId) {
this.taskId = taskId;
}
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + "---" + "Task " + taskId + " is running.");
// 執行任務的代碼
}
}
可以發現使用都是這三個線程
線程池的實作思路
線程池的實作原理基于以下幾個關鍵元件:
- 任務隊列(Task Queue):用于存儲送出給線程池的任務。當線程池中的線程空閑時,會從任務隊列中擷取任務進行執行。
- 線程池管理器(ThreadPool Manager):負責線程池的建立、初始化、銷毀等操作,以及對線程池中的線程進行管理。
- 工作線程(Worker Threads):線程池中的線程,用于執行送出的任務。線程池會建立一定數量的工作線程,并将任務配置設定給它們執行。
- 飽和政策(Saturation Policy):用于決定當任務隊列已滿且無法繼續接受新任務時,線程池應該采取的政策。常見的飽和政策有直接送出、丢棄、丢棄最舊任務和調用者運作等。
線程池的工作流程如下:
- 建立線程池:通過線程池的工廠方法或自定義方式建立線程池,并指定線程池的參數,如核心線程數、最大線程數、線程空閑時間等。
- 初始化線程:線程池在建立後,會初始化一定數量的核心線程,這些線程處于等待任務的狀态。
- 送出任務:應用程式通過調用線程池的任務送出方法(如submit()或execute())将任務送出給線程池。
- 任務隊列:線程池維護一個任務隊列,用于存儲已送出但尚未執行的任務。
- 任務排程和執行:線程池根據任務排程政策從任務隊列 中選擇任務,并将任務配置設定給合适的線程執行。任務可以在核心線程、非核心線程或等待隊列中等待執行。
- 線程複用:線程池會複用已建立的線程來執行多個任務,避免了線程的頻繁建立和銷毀開銷。執行完一個任務後,線程會被重新配置設定給下一個任務。
- 動态調整線程數量:線程池根據目前的工作負載情況和線程池參數,動态調整線程的數量。當任務量增加時,可以建立新的線程來處理;當任務量減少時,可以銷毀多餘的線程。
- 線程逾時:對于一些線程池實作,當線程在一定時間内沒有任務可執行時,超過指定的線程空閑時間,線程可能會被終止和移除。
- 錯誤處理:線程池可以提供錯誤處理機制,當任務抛出異常時,可以進行相應的處理,如記錄日志或通知應用程式。
- 關閉線程池:當應用程式不再需要線程池時,應調用線程池的關閉方法來關閉線程池。關閉線程池會停止接受新的任務,并等待已送出的任務執行完成。可以選擇使用不同的關閉方法,如shutdown()方法來等待任務執行完成,或shutdownNow()方法來立即停止執行中的任務并關閉線程池。
線程池的實作原理可以提高線程的使用率、減少線程建立和銷毀的開銷,并提供對并發任務的管理和排程。通過合理配置線程池的參數,可以控制并發線程的數量,避免資源耗盡,并提高系統的性能和穩定性。
源碼分析
構造方法源碼
arduino複制代碼public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
ini複制代碼public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
這裡比較簡單就是初始化一些初始值,需要注意的是這裡使用到了阻塞隊列,主要用于存儲任務使用
execute源碼分析
scss複制代碼public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//這裡需要注意這個ctl,高三位辨別目前線程池的狀态
//剩下的是線程數workerCountOf其實就是一個位運算,取得線程數
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//目前工作的線程比核心線程少則建立一個線程
//這裡可以看出線程池中的線程并不是線程池初始化的時候建立的
//而是使用懶加載的一種方式建立線程
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
//2.核心池已滿,但任務隊列未滿,添加到隊列中
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
//再次檢查線程池的狀态
reject(command);
else if (workerCountOf(recheck) == 0)
//說明線程已經被銷毀了需要建立線程
addWorker(null, false);
}
//3.核心池已滿,隊列已滿,試着建立一個新線程
else if (!addWorker(command, false))
//這都失敗了則可以直接拒絕任務了
reject(command);
}
線程池的狀态
這裡我們可以看一下線程池的狀态
arduino複制代碼private static final int COUNT_BITS = Integer.SIZE - 3; // 32 -3 = 29
//這個就表示最大線程數
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
//可以發現狀态都是左移29位,也就印證了上文說了高三位存儲的是線程池的狀态
//接收任務并且執行隊列中的任務
private static final int RUNNING = -1 << COUNT_BITS;
//不接收任務但是會執行隊列中的任務
private static final int SHUTDOWN = 0 << COUNT_BITS;
//不接收任務不執行隊列中的任務且會中斷執行中的任務
private static final int STOP = 1 << COUNT_BITS;
//所有的任務都已經結束,線程數量為 0,處于該狀态的線程池即将調用 terminated()方法
private static final int TIDYING = 2 << COUNT_BITS;
//terminated()方法執行完成
private static final int TERMINATED = 3 << COUNT_BITS;
狀态變化流程圖:
addWorker源碼分析
ini複制代碼
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
//擷取目前狀态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//狀态是STOP TIDYING TERMINATED會直接傳回false
//如果是SHUTDOWN則會判斷firstTask是否是null,并且任務隊列總還有任務的時候
是可以允許繼續添加線程的(應為要等任務全部執行完成)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//工作線程數
int wc = workerCountOf(c);
//判斷工作線城市是否已經大于等于預設的最大線程數
//或者是判斷是否大于核心線程數或者我們設定的最大線程數
(這裡是根據建立類型判斷取值的也就是core)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
//上面的條件都通過了則會通過CAS修改線程數如果成功了則會跳出循環
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
//沒有成功的話繼續重試
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//工作線程開始的标志
boolean workerStarted = false;
//工作線程添加完成的标志
boolean workerAdded = false;
Worker w = null;
try {
//新建立一個Worker
w = new Worker(firstTask);
//擷取對應的線程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//這裡需要加鎖
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//如果是運作狀态或者說是SHUTDOWN狀态但是firstTask == null是可以添加線程的
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//添加到線程池中
workers.add(w);
int s = workers.size();
//如果集合中的工作線程數大于最大線程數,
這個最大線程數表示線程池曾經出現過的最大線程數
if (s > largestPoolSize)
largestPoolSize = s;
//這裡就表示添加成功了
workerAdded = true;
}
} finally {
//解鎖
mainLock.unlock();
}
if (workerAdded) {
//最後啟動線程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//如果失敗了需要移除對應的worker并且數量遞減
addWorkerFailed(w);
}
return workerStarted;
}
這裡我們可以看一下Worker的資料結構
arduino複制代碼private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
//真正工作的線程
final Thread thread;
//需要執行的任務
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
......
}
可以發現Worker繼承了AQS并且實作了Runnable,在構造方法中會将對應state置為-1并且會使用ThreadFactory建立一個新的線程,需要注意的是傳入的Runnable就是this,意思就是一旦線程start則會執行Worker中的run方法,上文中我們也知道添加成功之後會直線線程start,也就是說會走到run方法執行runWorker對應的源碼如下:
ini複制代碼final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//這個循環直到任務隊列中的任務全部執行完了才會停止(getTask擷取任務隊列中的任務)
while (task != null || (task = getTask()) != null) {
w.lock();
//線程池狀态時STOP的時候需要終止任務
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//這裡預設是沒有實作的,在一些特定的場景中我們可以自己繼承
ThreadpoolExecutor 自己重寫
beforeExecute(wt, task);
Throwable thrown = null;
try {
//這裡還是執行任務了
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//這裡預設也是沒有實作
afterExecute(task, thrown);
}
} finally {
task = null;
//最後将完成任務書加一
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//執行到這裡說明任務隊列中也沒有可以執行的任務了是以這裡會
将入參 worker 從數組 workers 裡删除掉;
processWorkerExit(w, completedAbruptly);
}
}
注:為什麼沒有使用可重入鎖ReentrantLock,而是使用AQS,為的就是實作不可重入的特性去反應線程現在的執行狀态
最後再看一下getTask方法
getTask
java複制代碼private Runnable getTask() {
//逾時标志
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//如果目前的狀态
//1. 線程池狀态為 shutdown,且workQueue 為空
(反映了 shutdown 狀态的線程池還是要執行 workQueue 中剩餘的任務的)
//2. 線程池狀态為 stop(shutdownNow()會導緻變成 STOP)(此時不用考慮 workQueue的情況)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
//傳回null之後runWorker會回收Worker,這個線程就會被釋放掉
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// timed 變量用于判斷是否需要進行逾時控制
// allowCoreThreadTimeOut 預設是 false,也就是核心線程不允許進行逾時
// wc > corePoolSize,表示目前線程池中的線程數量大于核心線程數量
// 對于超過核心線程數量的這些線程,需要進行逾時控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//線程數量超過 maximumPoolSize
//timed && timedOut 如果為 true,表示目前操作需要進行逾時控制
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
//此時也是worker數量減一然後傳回null 外層調用方法會釋放worker
return null;
continue;
}
try {
//據timed來判斷,如果為true,則通過阻塞隊列poll方法進行逾時控制,
如果在 keepaliveTime 時間内沒有擷取到任務,則傳回null
否則通過 take 方法阻塞式擷取隊列中的任務
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
//如果拿到了則傳回任務繼續執行
return r;
//拿不到标志timedOut位true. 下一次循環會傳回null
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
可以發現這個方法中有兩個重要的方法就是workQueue.poll()和workQueue.take(),他們分别表示逾時等待還是說無限等待
workQueue.poll()
ini複制代碼public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {//這裡表示任務數量
//第一次進來肯定不是0
//第二次進來就為0是以會直接傳回null
//當然如果第二次循環這個時候任務數量不是0則不會走到這個循環
//而是傳回任務執行任務
if (nanos <= 0)
return null;
//這個時候就等待對應的時間,等待完成之後nanos=0
nanos = notEmpty.awaitNanos(nanos);
}
//任務出隊
x = dequeue();
//任務數量遞減
c = count.getAndDecrement();
if (c > 1)
//如果此時還有任務則通知(這個時候等待的線程會收到通知并喚醒線程)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
可以發現這個方法就是等待一定的時間如果一直沒有任務則目前線程會直接被銷毀掉
workQueue.take()
ini複制代碼public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
這裡比較簡單,就是隻要目前的任務數量是0就會等待,直到調用signal方法喚醒目前線程
整體邏輯流程圖
作者:Potato_洋芋
連結:https://juejin.cn/post/7243725147109916731