Java通過Executors提供四種線程池,分别為:
newCachedThreadPool 建立一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則建立線程
newFixedThreadPool:數量固定的線程池,核心線程,當線程處于空閑時,并不會回收,除非線程池被關閉
當所有線程處于活動狀态時,新的任務都會處于等待狀态,直到有線程空閑
建立一個定長線程池,可控制線程最大并發數,超出的線程會在隊列中等待
newScheduledThreadPool建立一個定長線程池,支援定時和周期性任務執行
newSingleThreadExecutor建立一個單線程化的線程池,它隻會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO,LIFO,優先級)執行,所有任務都在同一個線程執行,不存在同步問題。
handler
線程池的飽和政策,當阻塞隊列滿了,且沒有空閑的工作線程,如果繼續送出任務,必須采取一種政策處理該任務,線程池提供了4種政策:
1、AbortPolicy:直接抛出異常,預設政策;
2、CallerRunsPolicy:用調用者所在的線程來執行任務;
3、DiscardOldestPolicy:丢棄阻塞隊列中靠最前的任務,并執行目前任務;
4、DiscardPolicy:直接丢棄任務;
當然也可以根據應用場景實作RejectedExecutionHandler接口,自定義飽和政策,如記錄日志或持久化存儲不能處理的任務。
workQueue
用來儲存等待被執行的任務的阻塞隊列,且任務必須實作Runable接口,在JDK中提供了如下阻塞隊列:
1、ArrayBlockingQueue:基于數組結構的有界阻塞隊列,按FIFO排序任務;
2、LinkedBlockingQuene:基于連結清單結構的阻塞隊列,按FIFO排序任務,吞吐量通常要高于ArrayBlockingQuene;
3、SynchronousQuene:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀态,吞吐量通常要高于LinkedBlockingQuene;
4、priorityBlockingQuene:具有優先級的無界阻塞隊列;
ThreadPoolExecutor:是線程工廠類,利用工廠模式
java線程池底層其實是個數組,在利用隊列實作的,編寫自己的線程池:
public class MyThreadPool{
private ArrayList<MyThead> threads;
private ArrayBlockingQueue<Runnable> taskQueue;
private int threadNum;
private int workThreadNum;
private final ReentrantLock mainLock = new ReentrantLock();
public MyThreadPool(int initPoolNum) {
threadNum = initPoolNum;
threads = new ArrayList<>(initPoolNum);
//任務隊列初始化為線程池線程數的四倍
taskQueue = new ArrayBlockingQueue<>(initPoolNum*4);
threadNum = initPoolNum;
workThreadNum = 0;
}
public void execute(Runnable runnable) {
try {
mainLock.lock();
//線程池未滿,每加入一個任務則開啟一個線程
if(workThreadNum < threadNum) {
MyThead myThead = new MyThead(runnable);
myThead.start();
threads.add(myThead);
workThreadNum++;
}
//線程池已滿,放入任務隊列,等待有空閑線程時執行
else {
//隊列已滿,無法添加時,拒絕任務
if(!taskQueue.offer(runnable)) {
rejectTask();
}
}
} finally {
mainLock.unlock();
}
}
private void rejectTask() {
System.out.println("任務隊列已滿,無法繼續添加,請擴大您的初始化線程池!");
}
public static void main(String[] args) {
MyThreadPool myThreadPool = new MyThreadPool(5);
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"執行中");
}
};
for (int i = 0; i < 20; i++) {
myThreadPool.execute(task);
}
}
class MyThead extends Thread{
private Runnable task;
public MyThead(Runnable runnable) {
this.task = runnable;
}
@Override
public void run() {
//該線程一直啟動着,不斷從任務隊列取出任務執行
while (true) {
//如果初始化任務不為空,則執行初始化任務
if(task != null) {
task.run();
task = null;
}
//否則去任務隊列取任務并執行
else {
Runnable queueTask = taskQueue.poll();
if(queueTask != null)
queueTask.run();
}
}
}
}
}