線程池_02_Executor架構
- 一、Executor 的兩級排程
- 背景知識
- 在HotSpot Vm 的線程模型中,Java 線程被一對一映射為本地作業系統線程。Java 線程啟動時會建立一個本地作業系統線程;當該Java線程終止時,這個作業系統線程也會被回收。
- Executor
- 在上層,Java多線程程式通常把應用分解為若幹個任務,然後使用使用者級的排程器(Executor架構)将這些任務映射為固定數量的線程;換句話說就是應用程式通過Executor架構控制上層的排程。
- 在底層,作業系統核心将這些線程映射到硬體處理器上。換句話說,下層的排程由作業系統核心控制,下層的排程不受應用程式的控制。
- 背景知識
- 二、Executor 的組成
- 1、任務;
- 包括被執行任務需要實作的接口:Runbable或Callable 接口
- 2、任務的執行;
- 包括任務執行機制的核心接口Executor,以及繼承自Executor 的ExecutorService接口。Executor 架構有兩個關鍵類實作了ExecutorService 接口
- 3、異步計算的結果;
- 包括接口Future 和 實作Future接口的FutureTask類。
- 1、任務;
- 三、Executor 使用
- 1、主線程首先建立Runnable或Callable的任務對象A;
- 2、然後把對象A直接交給ExecutorService通過Execute或Submit方法執行;
- 3、如果使用的是submit方法,會傳回一個實作Future接口的對象,主線程可以執行FutureTask.get()方法來等待任務執行完成。也可以執行FutureTask.cancle()來取消任務。
- 四、Executor 主要的類與接口的簡介
- 1、Executor
- 接口,是Executor架構的基礎,它将任務的送出與任務的執行分離開來。
- 2、ThreadPoolExecutor
- 線程池的核心實作類,用來執行被送出的任務
- 3、ScheduledThreadPoolExecutor(定時任務類,這裡不讨論)
- 實作類,可以在給定的延遲後運作指令,或者定期執行指令。
- 4、Future接口和實作Future接口的FutureTask類,代表異步結算的結果
- 5、Runnable 和Callable接口的實作類
- 1、Executor
- 五、Executor 主要的類與接口的詳解
- ThreadPoolExecutor
- 四個核心參數:
- 1、corePool 核心線程池的大小;
- 2、maximumPool 最大線程池的大小;
- 3、BlockingQueue 用來暫時儲存任務的工作隊列;
- 4、RejectedExecutionHandler 當ThreadPoolExecutor 可以關閉或ThreadExecutor 已經飽和時(達到了最大線程池大小并且工作隊列已滿),execute()方法将要調用的Handler。
- ThreadPoolExecutor通常使用工廠類Executors來建立,Executors可以建立3種類型的ThreadPoolExecutor: FixedThreadPool、SingleThreadExecutor、CachedThreadPool;
- 1、FixedThreadPool ,建立固定線程數的API。
線程池_02_Executor架構 - 建立的方法:
線程池_02_Executor架構 - FixedThreadPool 的核心線程和最大線程數是一樣的;線程的空閑存活時間為0,則多餘的空閑線程會被立即終止。
- 執行execute(),流程如下:
- 1、如果目前運作的線程數少于corePoolSize,則建立新線程來執行任務;
- 2、如果目前運作的線程數等于corePoolSize,将任務加入LinkedBlockingQueue;
- 3、線程執行完1中的任務後,會在循環中反複從LinkedBlockingQueue擷取任務來執行。
- 使用無界隊列LinkedBlockingQueue,影響:
- 1、如果目前運作的線程數等于corePoolSize,新任務将在無界隊列中等待,是以線程池中的線程數不會超過corePoolSize;
- 2、maximumPoolSize 與 keepAliveTime 會是無效的;
- 3、運作中的FixedThreadPool 不會拒絕任務(因為工作隊列可以無限增大);
- 建立的方法:
- 2、SingleThreadExecutor,使用單個工作線程的Executor。
-
線程池_02_Executor架構 - 于corePoolSize與maximumPoolSize 設為1,其它參數與FixedThreadPool一樣。
- 執行execute(),流程如下:
- 1、如果線程池中無運作的線程,則建立一個新線程來執行任務;
- 2、目前線程池中有一個運作的線程,将任務加入LinkedBlockingQueue;
- 使用無界隊列LinkedBlockingQueue,與FixedThreadPool一樣的結果。
-
- 3、CachedThreadPool,根據需要建立新線程的線程池。
-
線程池_02_Executor架構 - corePoolSize 為0,maximumPoolSize被設定為Integer.MAX_VALUE,即是maximumPoolSize無界的。keepAliveTime 設為60秒,空閑存活60秒後被終止。
- CachedThreadPool 使用沒有容量的SynchronousQueue作為線程池的工作隊列,但是maximumPoolSize是無界的。這意味着,如果主線程送出任務的速度高于maximumPool中線程處理任務的速度時,CachedThreadPool會不斷建立新線程。極端情況下,CachedThreadPool會因為建立過多線程而耗盡CPU和記憶體資源。
-
- 總結:
- FixedThreadPool 與 SingleThreadExecutor 允許的請求隊列長度為Integer.MAX_VALUE,可能會堆積大量的請求,進而引起OOM異常
- CachedThreadPool => 允許建立的線程數為Integer.MAX_VALUE,可能會建立大量的線程,進而引起OOM異常
- 1、FixedThreadPool ,建立固定線程數的API。
- 四個核心參數:
- FutureTask
- 使用:
- 1、由調用線程直接執行FutureTask.run(); ---- 實作了Runnable接口
- 2、通過ExecutorService.submit()方法傳回一個FutureTask; ---- 實作了Future 接口
- 實作原理
- 基于AbstractQueuedSynchronizer(簡稱AQS)。AQS是一個同步架構,它提供通用機制來原子性管理同步狀态、阻塞和喚醒線程,以及維護被阻塞線程的隊列。
- 基于AQS實作的同步器包含兩種類型:
- 1、至少一個acquire操作。這個操作阻塞調用線程,除非/直到AQS的狀态允許這個線程繼續執行。
- 在FutureTask 類中get()和get(long timeout, TimeUnit unit)兩個方法為acquire操作;
- 2、至少一個release操作。這個操作改變AQS的狀态,改變後的狀态可允許一個或多個阻塞線程被解除阻塞。
- 在FutureTask 類中run()和cancel()兩個方法為acquire操作;
- 1、至少一個acquire操作。這個操作阻塞調用線程,除非/直到AQS的狀态允許這個線程繼續執行。
- 使用:
- ThreadPoolExecutor