1. Executors 建立線程池的方法
newFixedThreadPool() 方法,該方法傳回一個固定數量的線程池,該方法的線程數始終不變,當有一個任務送出時,若線程池中空閑,則立即執行,若沒有,則會被暫緩在一個任務隊列中等待有空閑的線程去執行。适用場景:可用于Web服務瞬時削峰,但需注意長時間持續高峰情況造成的隊列阻塞。
newSingleThreadExecutor() 方法, 建立-一個線程的線程池,若空閑則執行,若沒有空閑線程則暫緩在任務列隊中。适用場景:快速處理大量耗時較短的任務,如Netty的NIO接受請求時,可使用CachedThreadPool。
newCachedThreadPool() 方法, 傳回一個可根據實際情況調整線程個數的線程池,不限制最大線程數量,若用空閑的線程則執行任務,若無任務則不建立線程。并且每一個空閑線程會在60秒後自動回收。
newScheduledThreadPool() 方法,該方法傳回一個SchededExecutorService對象,但該線程池可以指定線程的數量。
檢視源碼可知,上面的 四種方法其實都是執行個體化的
ThreadPoolExecutor 對象
2.自定義線程池
既然我們都知道,上面的四種方法實際上就是 new ThreadPoolExecutor() 來的,是以當上面的四種方法滿足不了我們的需求的時候,我們可以自定義線程池
ExecutorService pool = new ThreadPoolExecutor(
1, // 同時執行的線程數
2, // 最大線程數
0L, // 線程存活時間
TimeUnit.SECONDS, // 線程存活時間機關
new ArrayBlockingQueue<>(3), // 用什麼隊列
new MyThreadFactory(), // 線程工廠可以自定義也可以不寫
new MyPolicy() // 拒絕政策,就是線程池滿了存不進去的時候怎麼辦,
// 這個一般在使用有界隊列的時候才會有這種問題
);
自定義線程池的關鍵在于用什麼隊列
A.有界隊列,比如說 ArrayBlockingQueue
在使用有界隊列時,若有新的任務需要執行,如果線程池實際線程數小于corePoolsize,則優先建立線程,若大于corePoolsize, 則會将任務加入隊列,若隊列已滿,則在總線程數不大于maximumPoolsize的前提下,建立新的線程,若線程數大于maximumPoolSize,則執行拒絕政策。或其他自定義方式。
B.無界隊列,比如說 LinkedBlockingQueue
與有界隊列相比,除非系統資源耗盡,否則無界的任務隊列不存在任務入隊失敗的情況。當有新任務到來,系統的線程數小于corePoolsize時,則建立線程執行任務。當達到corePoolsize後,就不會繼續增加。若後續仍有新的任務加入,而有沒有空閑的線程資源,則任務直接進入隊列等待。若任務建立和處理的速度差異很大,無界隊列會保持快速增長,直到耗盡系統記憶體。
C. 拒絕政策,jdk自帶四種,也可以自定義,實作 RejectedExecutionHandler 接口,然後傳入既可
- AbortPolicy:直接抛出異常,系統正常工作
- CallerRunsPolicy:隻要線程池未關閉,該政策直接在調用者線程中,運作目前被丢棄的任務。
- DiscardOldestPolicy:丢棄最老的一個請求,嘗試再次送出目前任務。
- DiscardPolicy:丢棄無法處理的任務,不給予任何處理。
D.ThreadFactory 工廠,可以自定義,實作 ThreadFactory 接口,重寫 newThread 方法即可
代碼示例:
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class Main2 {
public static void main(String[] args) {
ExecutorService pool = new ThreadPoolExecutor(
1, // 同時執行的線程數
2, // 最大線程數
0L, // 線程存活時間
TimeUnit.SECONDS, // 線程存活時間機關
new ArrayBlockingQueue<>(3), // 用什麼隊列
new MyThreadFactory(), // 線程工廠可以自定義也可以不寫
new MyPolicy() // 拒絕政策,就是線程池滿了存不進去的時候怎麼辦,
// 這個一般在使用有界隊列的時候才會有這種問題
);
Task t1 = new Task("1", "zhangzq");
Task t2 = new Task("2", "licm");
Task t3 = new Task("3", "zhangjy");
Task t4 = new Task("4", "youy");
Task t5 = new Task("5", "zhuyf");
Task t6 = new Task("6", "lixw");
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
pool.execute(t6);
pool.shutdown();
}
}
class Task implements Runnable{
private String id;
private String name;
public Task(String id, String name) {
this.id = id;
this.name = name;
}
@Override
public String toString() {
return "Task{" +
"id='" + id + '\'' +
", name='" + name + '\'' +
'}';
}
@Override
public void run() {
try {
System.out.println( this.toString() );
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class MyThreadFactory implements ThreadFactory {
private final AtomicInteger num = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r ,"zzq-Thread-"+ num.getAndIncrement() );
System.out.println( t.getName() + " has been create... " );
return t;
}
}
class MyPolicy implements RejectedExecutionHandler{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println( "該線程添加失敗..."+r.toString() );
}
}