線程池_初步認識_01
- 一、定義
- 管理一組工作線程。
- 二、好處
- 1. 降低資源消耗。通過重複利用已建立的線程降低線程建立和銷毀造成的消耗,比如記憶體;
- 2. 提高響應速度。任務到達時,可以不需要等到線程建立就能執行;
- 3. 提高線程的可管理性。通過線程池,實作對線程的統一配置設定,調優和監控;比如可以避免無線建立線程引起OutOfMemoryError。
- 三、實作原理
- 當向線程池送出一個任務之後,線程池是如何處理這個任務的呢?
線程池_初步認識_01 - 上圖就是線程池的主要處理流程;
- ThreadPoolExecutor 執行execute()方法的四種情況(流程):
- 1、如果目前運作的線程少于核心線程,則建立新線程來執行任務;(需要注意的是,此操作需要擷取全局鎖)
- 2、如果運作的線程等于或對于核心線程,則将任務加入工作隊列中;
- 3、如果工作隊列已滿,則建立新的線程(非核心線程)來處理任務;(需要擷取全局鎖)
- 4、如果建立線程超出最大線程數,則任務被拒絕,調用飽和政策;(RejectedExecutionHandler.rejectedExecution())
- 總結: 如此設計的原因?
- 為了在執行execute()方法時,盡可能地避免擷取全局鎖;因為擷取全局鎖是一個嚴重可伸縮瓶頸。
- 期望: 每次任務送出的時候,都是目前運作的線程數大于等于核心線程數,此時不用擷取全局鎖,即加入工作隊列。
- 當向線程池送出一個任務之後,線程池是如何處理這個任務的呢?
- 四、通過ThreadPoolExecutor建立線程池
- public ThreadPoolExecutor(
- int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler defaultHandler)
- 1. corePoolSize
- 線程池的基本大小;如果要執行的任務數小于線程池基本大小,送出一個任務到線程池後,就會建立一個線程即便有空閑線程;否則,不建立,等待。
- 2. maximumPoolSize
- 線程池最大線程數。如果線程池中的線程數大于核心線程數并且隊列滿了,且線程數小于最大線程數,則會建立新的線程。
- 如果maximumPoolSize與corePoolSize相等,即是固定大小線程池。
- 如果使用無界隊列,該參數無效;
- 3. keepAliveTime
- 空閑線程存活時間。
- 預設情況下,當線程池中的線程數大于corePoolSize時,keepAliveTime才會起作用;當線程池中的線程空閑時,如果空閑時間等于keepAliveTime,線程會被銷毀,直到線程數等于核心線程數,避免浪費記憶體和句柄資源。
- 當ThreadPoolExecutor的allowCoreThreadTimeOut變量設定為true時,核心線程逾時後也會被回收。
- 如果任務多,并且每個任務的執行時間較短,增大時間,提高線程的使用率。
- 4. unit
- 時間機關
- 5. BlockingQueue<Runnable> workQueue
- 任務隊列,用于儲存等待執行的任務的阻塞隊列。包括以下幾種:
- 1.ArrayBlockingQueue:一個基于數組結構的有界阻塞隊列,FIFO(先進先出)原則對元素進行排序;
- 2.LinkedBlockingQueue:一個基于連結清單結構的阻塞隊列,也是先進先出,一般情況下吞吐量高于ArrayBlockingQueue 。 FixedThreadPool()和SingleThreadExecutor()使用此隊列。
- 3.SynchronousQueue:一個不存儲元素的阻塞隊列。上一個線程執行移除操作後,之後的插入操作才能執行,否則會一直處于阻塞狀态。吞吐量高于LinkedBlockingQueue。CachedThreadPool()使用此隊列。
- 4.PriorityBlockingQueue:一個具有優先級的無限阻塞隊列。
- 任務隊列,用于儲存等待執行的任務的阻塞隊列。包括以下幾種:
- 6. ThreadFactory threadFactory
- 線程池建立線程使用的工廠;
- 使用線程池建立線程的時候可以給予更有意義的名稱,便于定位問題。
- 7. RejectedExecutionHandler defaultHandler
- 線程池對拒絕任務的處理政策;發生在隊列和線程池都滿了的時候。預設是AbortPolicy,表示無法處理新任務時抛出異常。
- 1.AbortPolicy : 直接抛出異常;
- 2.CallerRunsPolicy: 隻用調用者所線上程來運作任務;
- 3.DiscardOldestPolicy:丢棄隊列裡最近的一個任務,并執行目前任務;
- 4.DiscardPolicy:不處理,不丢棄;
- 線程池對拒絕任務的處理政策;發生在隊列和線程池都滿了的時候。預設是AbortPolicy,表示無法處理新任務時抛出異常。
- 自定義拒絕政策實作RejectedExecutionHandler接口,實作需要的場景。比如持久化拒絕的任務、記錄日志等等。
- public ThreadPoolExecutor(
- 五、送出任務到線程池的方法
- (一)、 通過execute
- executorService.execute(newRunnable() {
- @Override
- public void run() {
- System.out.println("我要開始運作了");
- }
- });
- 送出不需要傳回值的任務,無法判斷任務是否被線程池執行成功。
- executorService.execute(newRunnable() {
- (二)、 通過submit
- 1.通過送出Runnable
- Future future = executorService.submit(()->{});
- 2.通過送出Callable
- Future future = executorService.submit(() -> {
- return null;
- Future future = executorService.submit(() -> {
- 3. Runnable 和 Callable 的差別
- 1. 非常相似,這兩個接口都表示可以由線程或ExecutorService同時執行的任務。
- 2. 不同之處是兩個接口内部執行的方法不同。
- Runnable
- public interface Runnable {
- public void run();
- public interface Runnable {
- Callable
- public interface Callable{
- public Object call() throws Exception;
- public interface Callable{
- Runnable
- - 可以看出,call()方法是有傳回值的,而且可以引發異常。run沒有傳回值,也不能引發異常(除非未經檢查的異常-RuntimeException的子類)。
- 4.如果執行的任務需要傳回結果,使用Callable。
- 1.通過送出Runnable
- (三)、兩種執行方法的差別
- 1. execute 隻能送出Runnable類型的任務;submit 除了可以送出Runnable類型的任務外,還可以送出Callable類型。
- 2. execute 直接抛出任務執行的異常;submit 會捕獲,可以傳回一個Future類型的對象,通過這個Future對象可以判斷任務是否執行成功,通過get方法擷取傳回值,get()方法會阻塞目前線程直到任務完成,使用get(long timeout,TimeUnit unit)方法會阻塞目前線程一段時間後立即傳回,這時候任務可能沒有執行完。
- 3. submit 的頂級接口是ExecutorService;execute 的頂級接口是 Executor;
- (一)、 通過execute
- 六、擷取傳回結果
- (一)、 兩種 invokeAny() 和 invokeAll()
- (二)、invokeAny
- 1. 當任意一個任務得到結果後,會調用interrupt方法将其他的任務中斷;
- 2. 部分任務失敗,會使用第一個成功的任務傳回的結果;
- 3. 任務全部失敗了,抛出Execption,invokeAny 方法将抛出ExecutionException。
- (三)、invokeAll 傳回所有任務的執行結果,該方法的執行效果也是阻塞執行的,要把所有的結果都取回時再繼續向下執行。
- 七、關閉線程池
- 1、shutdown()
- 1. 将線程池的狀态設定成ShutDown;
- 2. 中斷所有沒有正在執行任務的線程。在終止前允許執行以前送出的任務;
- 2、shutdownNow()
- 1. 将線程池的狀态設定成Stop;
- 2. 阻止等待任務的啟動并試圖停止目前正在執行的任務,并傳回等待執行任務的清單;不允許執行以前送出的任務。
- 1、2 兩個方法的原理:
- 周遊線程池中的工作線程,然後逐個調用線程的interrupt方法來中斷線程,是以無法響應中斷的任務可能永遠無法終止。
- 調用1、2方法後,isShutdown 方法傳回true。當所有任務都已關閉,才表示線程池關閉成功,調用isTerminaed方法傳回true。
- 3、awaitTermination()
- 1. 接收timeout和TimeUnit兩個參數,用于設定逾時時間及機關。當等待超過設定時間時,會監測ExecutorService是否已經
- 關閉,若關閉則傳回true,否則傳回false。一般情況下會和shutdown方法組合使用。
- 在實際使用過程中, 使用shutdown()關閉,回收資源。如果有必要,可以在其後執行shutdownNow(),取消所有遺留的任務。
- 1、shutdown()
- 八、配置線程池
- 角度:
- 1. 任務的性質: CPU密集型任務、IO密集型任務和混合型任務。
- 2. 任務的優先級: 高、中與低;
- 3. 任務的執行時間: 長、短;
- 4. 任務的依賴性: 是否依賴其他系統資源,如資料庫連接配接。
- 選擇:
- 性質不同的任務可以用不同規模的線程池分開處理;
- CPU 密集型任務應配置盡可能小的線程,如cpu數+1 的線程池;
- IO密集型任務線程并不是一直在執行,則應配置極可能多的線程;
- 優先級不同的任務可以使用優先級隊列PriorityBlockingQueue來處理,它可以讓優先級高的任務先執行;
- 如果一直有優先級高的任務送出到隊列裡,那麼優先級低的任務可能永遠不能執行。
- 依賴資料庫連接配接的任務,因為線程送出SQL後需要等待資料庫傳回結果,等待的時間越長,則CPU空閑時間就越長,那麼線程數應該設定的越大,這樣才能更好低利用CPU。
- 建議:
- 有界隊列;有界隊列可以增加系統的穩定性和預警能力。
- 展現:
- 任務線程池的隊列和線程池滿了,不斷抛出抛棄任務的異常,通過排查是資料庫出現問題,導緻執行sql的速度變慢,由于背景任務線程池裡的任務都是需要向資料庫插入資料的和查詢,是以導緻線程池裡的工作線程全部阻塞,任務積壓線上程池裡。如果是無界隊列,線程池中的隊列越來越多,可能撐爆記憶體,導緻系統不可用。
- 角度:
- 八、例子
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- Set<Callable<String>> callables = newHashSet<Callable<String>>();
- callables.add(() -> {
- return "Task1";
- }});
- callables.add(() -> {
-
-
- return "Task2";
-
- List<Future<String>> futures =executorService.invokeAll(callables);
- for(Future<String> future : futures){
- System.out.println("future.get = " + future.get());
- for(Future<String> future : futures){
- executorService.shutdown();
- while (!service.awaitTermination(1, TimeUnit.SECONDS)) {
- System.out.println("線程池沒有關閉");
- Set<Callable<String>> callables = newHashSet<Callable<String>>();
- ExecutorService executorService = Executors.newSingleThreadExecutor();