JDK線程池一瞥
我們首先看一個JDK給我們提供的線程池
ThreadPoolExecutor
的構造函數的參數:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
參數解釋:
- corePoolSize:這個參數你可以了解為線程池當中至少需要 corePoolSize 個線程,初始時線程池當中線程的個數為0,當線程池當中線程的個數小于 corePoolSize 每次送出一個任務都會建立一個線程,并且先執行這個送出的任務,然後再去任務隊列裡面去擷取新的任務,然後再執行。
- maximumPoolSize:這個參數指的是線程池當中能夠允許的最大的線程的數目,當任務隊列滿了之後如果這個時候有新的任務想要加入隊列當中,當發現隊列滿了之後就建立新的線程去執行任務,但是需要滿足最大的線程的個數不能夠超過 maximumPoolSize 。
- keepAliveTime 和 unit:這個主要是用于時間的表示,當隊列當中多長時間沒有資料的時候線程自己退出,前面談到了線程池當中任務過多的時候會超過 corePoolSize ,當線程池閑下來的時候這些多餘的線程就可以退出了。
- workQueue:這個就是用于儲存任務的阻塞隊列。
- threadFactory:這個參數倒不是很重要,線程工廠。
- handler:這個表示拒絕政策,JDK給我們提供了四種政策:
- AbortPolicy:抛出異常。
- DiscardPolicy:放棄這個任務。
- CallerRunPolicy:送出任務的線程執行。
- DiscardOldestPolicy:放棄等待時間最長的任務。
基于上面談到的參數,線程池當中送出任務的流程大緻如下圖所示:
自己動手實作線程池
根據前面的參數分析我們自己實作的線程池需要實作一下功能:
- 能夠送出Runnable的任務和Callable的任務。
- 線程池能夠自己實作動态的擴容和所容,動态調整線程池當中線程的數目,當任務多的時候能夠增加線程的數目,當任務少的時候多出來的線程能夠自動退出。
- 有自己的拒絕政策,當任務隊列滿了,線程數也達到最大的時候,需要拒絕送出的任務。
線程池參數介紹
private AtomicInteger ct = new AtomicInteger(0); // 目前在執行任務的線程個數
private int corePoolSize;
private int maximumPoolSize;
private long keepAliveTime;
private TimeUnit unit;
private BlockingQueue<Runnable> taskQueue;
private RejectPolicy policy;
private ArrayList<Worker> workers = new ArrayList<>();
private volatile boolean isStopped;
private boolean useTimed;
參數解釋如下:
- ct:表示目前線程池當中線程的個數。
- corePoolSize:線程池當中核心線程的個數,意義和上面談到的JDK的線程池意義一緻。
- maximumPoolSize:線程池當中最大的線程個數,意義和上面談到的JDK的線程池意義一緻。
- keepAliveTime 和 unit:和JDK線程池的參數意義一緻。
- taskQueue:任務隊列,用不儲存送出的任務。
- policy:拒絕政策,主要有一下四種政策:
public enum RejectPolicy {
ABORT,
CALLER_RUN,
DISCARD_OLDEST,
DISCARD
}
- workers:用于儲存工作線程。
- isStopped:線程池是否被關閉了。
- useTimed:主要是用于表示是否使用上面的 keepAliveTime 和 unit,如果使用就是在一定的時間内,如果沒有從任務隊列當中擷取到任務,線程就從線程池退出,但是需要保證線程池當中最小的線程個數不小于 corePoolSize 。
實作Runnable
// 下面這個方法是向線程池送出任務
public void execute(Runnable runnable) throws InterruptedException {
checkPoolState();
if (addWorker(runnable, false) // 如果能夠加入新的線程執行任務 加入成功就直接傳回
|| !taskQueue.offer(runnable) // 如果 taskQueue.offer(runnable) 傳回 false 說明送出任務失敗 任務隊列已經滿了
|| addWorker(runnable, true)) // 使用能夠使用的最大的線程數 (maximumPoolSize) 看是否能夠産生新的線程
return;
// 如果任務隊列滿了而且不能夠加入新的線程 則拒絕這個任務
if (!taskQueue.offer(runnable))
reject(runnable);
}
在上面的代碼當中:
- checkPoolState函數是檢查線程池的狀态,當線程池被停下來之後就不能夠在送出任務:
private void checkPoolState() {
if (isStopped) {
// 如果線程池已經停下來了,就不在向任務隊列當中送出任務了
throw new RuntimeException("thread pool has been stopped, so quit submitting task");
}
}
- addWorker函數是往線程池當中送出任務并且産生一個線程,并且這個線程執行的第一個任務就是傳遞的參數。max表示線程的最大數目,max == true 的時候表示使用 maximumPoolSize 否則使用 corePoolSize,當傳回值等于 true 的時候表示執行成功,否則表示執行失敗。
/**
*
* @param runnable 需要被執行的任務
* @param max 是否使用 maximumPoolSize
* @return boolean
*/
public synchronized boolean addWorker(Runnable runnable, boolean max) {
if (ct.get() >= corePoolSize && !max)
return false;
if (ct.get() >= maximumPoolSize && max)
return false;
Worker worker = new Worker(runnable);
workers.add(worker);
Thread thread = new Thread(worker, "ThreadPool-" + "Thread-" + ct.addAndGet(1));
thread.start();
return true;
}
實作Callable
這個函數其實比較簡單,隻需要将傳入的Callable對象封裝成一個FutureTask對象即可,因為FutureTask實作了Callable和Runnable兩個接口,然後将這個結果傳回即可,得到這個對象,再調用對象的 get 方法就能夠得到結果。
public <V> RunnableFuture<V> submit(Callable<V> task) throws InterruptedException {
checkPoolState();
FutureTask<V> futureTask = new FutureTask<>(task);
execute(futureTask);
return futureTask;
}
拒絕政策的實作
根據前面提到的各種政策的具體實作方式,具體的代碼實作如下所示:
private void reject(Runnable runnable) throws InterruptedException {
switch (policy) {
case ABORT:
throw new RuntimeException("task queue is full");
case CALLER_RUN:
runnable.run();
case DISCARD: // 直接放棄這個任務
return;
case DISCARD_OLDEST:
// 放棄等待時間最長的任務 也就是隊列當中的第一個任務
taskQueue.poll();
execute(runnable); // 重新執行這個任務
}
}
線程池關閉實作
一共兩種方式實作線程池關閉:
- 直接關閉線程池,不管任務隊列當中的任務是否被全部執行完成。
- 安全關閉線程池,先等待任務隊列當中所有的任務被執行完成,再關閉線程池,但是在這個過程當中不允許繼續送出任務了,這一點已經在函數 checkPoolState 當中實作了。
// 強制關閉線程池
public synchronized void stop() {
isStopped = true;
for (Worker worker : workers) {
worker.stopWorker();
}
}
public synchronized void shutDown() {
// 先表示關閉線程池 線程就不能再向線程池送出任務
isStopped = true;
// 先等待所有的任務執行完成再關閉線程池
waitForAllTasks();
stop();
}
private void waitForAllTasks() {
// 當線程池當中還有任務的時候 就不退出循環
while (taskQueue.size() > 0) {
Thread.yield();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
工作線程的工作實作
@Override
public void run() {
// 先執行傳遞過來的第一個任務 這裡是一個小的優化 讓線程直接執行第一個任務 不需要
// 放入任務隊列再取出來執行了
firstTask.run();
thisThread = Thread.currentThread();
while (!isStopped) {
try {
// 是否使用時間就在這裡顯示出來了
Runnable task = useTimed ? taskQueue.poll(keepAliveTime, unit) : taskQueue.take();
if (task == null) {
int i;
boolean exit = true;
// 如果目前線程數大于核心線程數 則使用 CAS 去退出 用于保證線上程安全下的退出
// 且保證線程的個數不小于 corePoolSize 下面這段代碼需要仔細分析一下
if (ct.get() > corePoolSize) {
do{
i = ct.get();
if (i <= corePoolSize) {
exit = false;
break;
}
}while (!ct.compareAndSet(i, i - 1));
if (exit) {
return;
}
}
}else {
task.run();
}
} catch (InterruptedException e) {
// do nothing
}
}
}
我們現在來仔細分析一下,線程退出線程池的時候是如何保證線程池當中總的線程數是不小于 corePoolSize 的!首先整體的架構是使用 CAS 進行實作,具體代碼為 do ... while 操作,然後在 while 操作裡面使用 CAS 進行測試替換,如果沒有成功再次擷取 ,當線程池當中核心線程的數目小于等于 corePoolSize 的時候也需要退出循環,因為線程池當中線程的個數不能小于 corePoolSize 。是以使用 break 跳出循環的線程是不會退出線程池的。
線程池實作的BUG
在我們自己實作的線程池當中當線程退出的時候,workers 當中還儲存這指向這個線程的對象,但是當線程退出的時候我們還沒有在 workers 當中删除這個對象,是以這個線程對象不會被垃圾回收器收集掉,但是我們這個隻是一個線程池實作的例子而已,并不用于生産環境,隻是為了幫助大家了解線程池的原理。
完整代碼
package cscore.concurrent.java.threadpoolv2;
import java.util.ArrayList;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadPool {
private AtomicInteger ct = new AtomicInteger(0); // 目前在執行任務的線程個數
private int corePoolSize;
private int maximumPoolSize;
private long keepAliveTime;
private TimeUnit unit;
private BlockingQueue<Runnable> taskQueue;
private RejectPolicy policy;
private ArrayList<Worker> workers = new ArrayList<>();
private volatile boolean isStopped;
private boolean useTimed;
public int getCt() {
return ct.get();
}
public ThreadPool(int corePoolSize, int maximumPoolSize, TimeUnit unit, long keepAliveTime, RejectPolicy policy
, int maxTasks) {
// please add -ea to vm options to make assert keyword enable
assert corePoolSize > 0;
assert maximumPoolSize > 0;
assert keepAliveTime >= 0;
assert maxTasks > 0;
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.unit = unit;
this.policy = policy;
this.keepAliveTime = keepAliveTime;
taskQueue = new ArrayBlockingQueue<Runnable>(maxTasks);
useTimed = keepAliveTime != 0;
}
/**
*
* @param runnable 需要被執行的任務
* @param max 是否使用 maximumPoolSize
* @return boolean
*/
public synchronized boolean addWorker(Runnable runnable, boolean max) {
if (ct.get() >= corePoolSize && !max)
return false;
if (ct.get() >= maximumPoolSize && max)
return false;
Worker worker = new Worker(runnable);
workers.add(worker);
Thread thread = new Thread(worker, "ThreadPool-" + "Thread-" + ct.addAndGet(1));
thread.start();
return true;
}
// 下面這個方法是向線程池送出任務
public void execute(Runnable runnable) throws InterruptedException {
checkPoolState();
if (addWorker(runnable, false) // 如果能夠加入新的線程執行任務 加入成功就直接傳回
|| !taskQueue.offer(runnable) // 如果 taskQueue.offer(runnable) 傳回 false 說明送出任務失敗 任務隊列已經滿了
|| addWorker(runnable, true)) // 使用能夠使用的最大的線程數 (maximumPoolSize) 看是否能夠産生新的線程
return;
// 如果任務隊列滿了而且不能夠加入新的線程 則拒絕這個任務
if (!taskQueue.offer(runnable))
reject(runnable);
}
private void reject(Runnable runnable) throws InterruptedException {
switch (policy) {
case ABORT:
throw new RuntimeException("task queue is full");
case CALLER_RUN:
runnable.run();
case DISCARD:
return;
case DISCARD_OLDEST:
// 放棄等待時間最長的任務
taskQueue.poll();
execute(runnable);
}
}
private void checkPoolState() {
if (isStopped) {
// 如果線程池已經停下來了,就不在向任務隊列當中送出任務了
throw new RuntimeException("thread pool has been stopped, so quit submitting task");
}
}
public <V> RunnableFuture<V> submit(Callable<V> task) throws InterruptedException {
checkPoolState();
FutureTask<V> futureTask = new FutureTask<>(task);
execute(futureTask);
return futureTask;
}
// 強制關閉線程池
public synchronized void stop() {
isStopped = true;
for (Worker worker : workers) {
worker.stopWorker();
}
}
public synchronized void shutDown() {
// 先表示關閉線程池 線程就不能再向線程池送出任務
isStopped = true;
// 先等待所有的任務執行完成再關閉線程池
waitForAllTasks();
stop();
}
private void waitForAllTasks() {
// 當線程池當中還有任務的時候 就不退出循環
while (taskQueue.size() > 0) {
Thread.yield();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Worker implements Runnable {
private Thread thisThread;
private final Runnable firstTask;
private volatile boolean isStopped;
public Worker(Runnable firstTask) {
this.firstTask = firstTask;
}
@Override
public void run() {
// 先執行傳遞過來的第一個任務 這裡是一個小的優化 讓線程直接執行第一個任務 不需要
// 放入任務隊列再取出來執行了
firstTask.run();
thisThread = Thread.currentThread();
while (!isStopped) {
try {
Runnable task = useTimed ? taskQueue.poll(keepAliveTime, unit) : taskQueue.take();
if (task == null) {
int i;
boolean exit = true;
if (ct.get() > corePoolSize) {
do{
i = ct.get();
if (i <= corePoolSize) {
exit = false;
break;
}
}while (!ct.compareAndSet(i, i - 1));
if (exit) {
return;
}
}
}else {
task.run();
}
} catch (InterruptedException e) {
// do nothing
}
}
}
public synchronized void stopWorker() {
if (isStopped) {
throw new RuntimeException("thread has been interrupted");
}
isStopped = true;
thisThread.interrupt();
}
}
}
線程池測試
package cscore.concurrent.java.threadpoolv2;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException, ExecutionException {
var pool = new ThreadPool(2, 5, TimeUnit.SECONDS, 10, RejectPolicy.ABORT, 100000);
for (int i = 0; i < 10; i++) {
RunnableFuture<Integer> submit = pool.submit(() -> {
System.out.println(Thread.currentThread().getName() + " output a");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 0;
});
System.out.println(submit.get());
}
int n = 15;
while (n-- > 0) {
System.out.println("Number Threads = " + pool.getCt());
Thread.sleep(1000);
}
pool.shutDown();
}
}
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-4 output a
Number Threads = 5
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-4 output a
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-4 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-4 output a
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 3
Number Threads = 2
Number Threads = 2
Number Threads = 2
Number Threads = 2