我們借用JDK自帶的線程池ThreadPoolExecutor講解線程池思想,打算分三個主題來講:
- corePoolSize、maximumPoolSize與workQueue
- ThreadFactory
- RejectedExecutionHandler
本文先講第一個主題。
1、corePoolSize、maximumPoolSize與workQueue
線程池的運作過程:
為了友善測試的時候,了解線程的調用情況,是以改寫了ThreadFactory,線程名稱字首為THREAD-POOL-,按序号遞增,這樣我們就能清楚看到線程調用的是核心線程還是拓展線程。由于JDK線程池是先建立核心線程,後面有需要再建立拓展線程,是以index <= corePoolSize的都是為核心線程,其他的則為拓展線程。
public class AsyncThreadFactory implements ThreadFactory {
private final AtomicInteger index = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("THREAD-POOL-%s", index.getAndIncrement()));
}
}
測試代碼:
public class AsyncUtilMain {
/**
* 核心線程數
*/
private static final int CORE_POOL_SIZE = 10;
/**
* 最大線程數
*/
private static final int MAX_POOL_SIZE = 100;
/**
* 阻塞隊列長度
*/
private static final int QUEUE_SIZE = 10;
/**
* 需求線程數
*/
private static final int REQUIREMENT_POOL_SIZE = 10;
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, 60, TimeUnit.MINUTES, new LinkedBlockingQueue<>(QUEUE_SIZE),
new AsyncThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
for (int index = 0; index < REQUIREMENT_POOL_SIZE; index++) {
int value = index;
threadPoolExecutor.submit(() -> System.out.println(Thread.currentThread().getName() + ":" + value));
}
threadPoolExecutor.shutdown();
}
}
根據上述流程圖,線程池的運作情況會有以下四種:
1、corePoolSize >= 需求線程數
10個核心線程數, 90個可拓展線程數, 阻塞隊列數10個
需求線程數為10個, 是以所有的線程均可被核心線程數調用
是以運作結果應該為1~10号線程(核心線程)完成整個過程
運作結果:
THREAD-POOL-2:1
THREAD-POOL-4:3
THREAD-POOL-3:2
THREAD-POOL-1:0
THREAD-POOL-5:4
THREAD-POOL-6:5
THREAD-POOL-7:6
THREAD-POOL-8:7
THREAD-POOL-9:8
THREAD-POOL-10:9
2、corePoolSize + workQueueSize >= 需求線程數
5個核心線程數, 5個可拓展線程數, 阻塞隊列長度為5
需求線程數為10個, 是以5個核心線程會馬上消費5個需求線程,剩下的5個會在阻塞隊列中等待5個核心線程消費。
是以運作結果應該為1~5号線程(核心線程)完成整個過程
配置代碼:
/**
* 核心線程數
*/
private static final int CORE_POOL_SIZE = 5;
/**
* 最大線程數
*/
private static final int MAX_POOL_SIZE = 10;
/**
* 阻塞隊列長度
*/
private static final int QUEUE_SIZE = 5;
/**
* 需求線程數
*/
private static final int REQUIREMENT_POOL_SIZE = 10;
運作結果:
THREAD-POOL-3:2
THREAD-POOL-4:3
THREAD-POOL-1:0
THREAD-POOL-2:1
THREAD-POOL-1:7
THREAD-POOL-4:6
THREAD-POOL-5:4
THREAD-POOL-3:5
THREAD-POOL-1:9
THREAD-POOL-2:8
3、corePoolSize + workQueueSize < 需求線程數
5個核心線程數, 5個可拓展線程數, 阻塞隊列長度為3
需求線程數為10個, 是以5個核心線程會馬上消費5個需求線程,剩下的3個會在阻塞由于阻塞隊列溢出,那麼剩下的兩個需求線程将會觸發可拓展線程,是以實際運作的線程應該為1~5号線程(核心線程)和 6 ~ 7号線程(可拓展線程)
配置代碼:
/**
* 核心線程數
*/
private static final int CORE_POOL_SIZE = 5;
/**
* 最大線程數
*/
private static final int MAX_POOL_SIZE = 10;
/**
* 阻塞隊列長度
*/
private static final int QUEUE_SIZE = 3;
/**
* 需求線程數
*/
private static final int REQUIREMENT_POOL_SIZE = 10;
運作結果:
THREAD-POOL-1:0
THREAD-POOL-6:8
THREAD-POOL-1:5
THREAD-POOL-6:6
THREAD-POOL-5:4
THREAD-POOL-2:1
THREAD-POOL-4:3
THREAD-POOL-7:9
THREAD-POOL-3:2
THREAD-POOL-1:7
4、maximumPoolSize+ workQueueSize + < 需求線程數
這種情況是最大線程數和阻塞隊列都不足以滿足需求線程數,那麼這時就會觸發線程池配置的拒絕政策:
配置代碼:
/**
* 核心線程數
*/
private static final int CORE_POOL_SIZE = 5;
/**
* 最大線程數
*/
private static final int MAX_POOL_SIZE = 10;
/**
* 阻塞隊列長度
*/
private static final int QUEUE_SIZE = 3;
/**
* 需求線程數
*/
private static final int REQUIREMENT_POOL_SIZE = 15;
運作結果:
THREAD-POOL-10:12
THREAD-POOL-5:4
THREAD-POOL-4:3
Exception in thread “main” THREAD-POOL-5:5
THREAD-POOL-6:8
THREAD-POOL-7:9
THREAD-POOL-9:11
THREAD-POOL-8:10
THREAD-POOL-2:1
THREAD-POOL-10:7
THREAD-POOL-4:6
THREAD-POOL-1:0
THREAD-POOL-3:2
java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 10, active threads = 8, queued tasks = 1, completed tasks = 2]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at cn.gaaidou.web.server.socket.AsyncUtilMain.main(AsyncUtilMain.java:40)
由于代碼配置的拒絕政策是ThreadPoolExecutor.AbortPolicy,是以線程池直接抛出異常。