天天看點

線程池_初步認識_01

線程池_初步認識_01

  • 一、定義
    • 管理一組工作線程。
  • 二、好處
    • 1. 降低資源消耗。通過重複利用已建立的線程降低線程建立和銷毀造成的消耗,比如記憶體;
    • 2. 提高響應速度。任務到達時,可以不需要等到線程建立就能執行;
    • 3. 提高線程的可管理性。通過線程池,實作對線程的統一配置設定,調優和監控;比如可以避免無線建立線程引起OutOfMemoryError。
  • 三、實作原理
    • 當向線程池送出一個任務之後,線程池是如何處理這個任務的呢?
      線程池_初步認識_01
      • 上圖就是線程池的主要處理流程;
      • ThreadPoolExecutor 執行execute()方法的四種情況(流程):
        • 1、如果目前運作的線程少于核心線程,則建立新線程來執行任務;(需要注意的是,此操作需要擷取全局鎖)
        • 2、如果運作的線程等于或對于核心線程,則将任務加入工作隊列中;
        • 3、如果工作隊列已滿,則建立新的線程(非核心線程)來處理任務;(需要擷取全局鎖)
        • 4、如果建立線程超出最大線程數,則任務被拒絕,調用飽和政策;(RejectedExecutionHandler.rejectedExecution())
        • 總結: 如此設計的原因?
          • 為了在執行execute()方法時,盡可能地避免擷取全局鎖;因為擷取全局鎖是一個嚴重可伸縮瓶頸。
        • 期望: 每次任務送出的時候,都是目前運作的線程數大于等于核心線程數,此時不用擷取全局鎖,即加入工作隊列。
  • 四、通過ThreadPoolExecutor建立線程池
    • public ThreadPoolExecutor(
      • int corePoolSize,
      • int maximumPoolSize,
      • long keepAliveTime,
      • TimeUnit unit,
      • BlockingQueue<Runnable> workQueue,
      • ThreadFactory threadFactory,
      • RejectedExecutionHandler defaultHandler)
    • 1. corePoolSize
      • 線程池的基本大小;如果要執行的任務數小于線程池基本大小,送出一個任務到線程池後,就會建立一個線程即便有空閑線程;否則,不建立,等待。
    • 2. maximumPoolSize
      • 線程池最大線程數。如果線程池中的線程數大于核心線程數并且隊列滿了,且線程數小于最大線程數,則會建立新的線程。
      • 如果maximumPoolSize與corePoolSize相等,即是固定大小線程池。
      • 如果使用無界隊列,該參數無效;
    • 3. keepAliveTime
      • 空閑線程存活時間。
      • 預設情況下,當線程池中的線程數大于corePoolSize時,keepAliveTime才會起作用;當線程池中的線程空閑時,如果空閑時間等于keepAliveTime,線程會被銷毀,直到線程數等于核心線程數,避免浪費記憶體和句柄資源。
      • 當ThreadPoolExecutor的allowCoreThreadTimeOut變量設定為true時,核心線程逾時後也會被回收。
      • 如果任務多,并且每個任務的執行時間較短,增大時間,提高線程的使用率。
    • 4. unit
      • 時間機關
    • 5. BlockingQueue<Runnable> workQueue
      • 任務隊列,用于儲存等待執行的任務的阻塞隊列。包括以下幾種:
        • 1.ArrayBlockingQueue:一個基于數組結構的有界阻塞隊列,FIFO(先進先出)原則對元素進行排序;
        • 2.LinkedBlockingQueue:一個基于連結清單結構的阻塞隊列,也是先進先出,一般情況下吞吐量高于ArrayBlockingQueue 。 FixedThreadPool()和SingleThreadExecutor()使用此隊列。
        • 3.SynchronousQueue:一個不存儲元素的阻塞隊列。上一個線程執行移除操作後,之後的插入操作才能執行,否則會一直處于阻塞狀态。吞吐量高于LinkedBlockingQueue。CachedThreadPool()使用此隊列。
        • 4.PriorityBlockingQueue:一個具有優先級的無限阻塞隊列。
    • 6. ThreadFactory threadFactory
      • 線程池建立線程使用的工廠;
      • 使用線程池建立線程的時候可以給予更有意義的名稱,便于定位問題。
    • 7. RejectedExecutionHandler defaultHandler
      • 線程池對拒絕任務的處理政策;發生在隊列和線程池都滿了的時候。預設是AbortPolicy,表示無法處理新任務時抛出異常。
        • 1.AbortPolicy : 直接抛出異常;
        • 2.CallerRunsPolicy: 隻用調用者所線上程來運作任務;
        • 3.DiscardOldestPolicy:丢棄隊列裡最近的一個任務,并執行目前任務;
        • 4.DiscardPolicy:不處理,不丢棄;
    • 自定義拒絕政策實作RejectedExecutionHandler接口,實作需要的場景。比如持久化拒絕的任務、記錄日志等等。
  • 五、送出任務到線程池的方法
    • (一)、 通過execute
      • executorService.execute(newRunnable() {
        • @Override
        • public void run() {
          • System.out.println("我要開始運作了");
        • }
      • });
      • 送出不需要傳回值的任務,無法判斷任務是否被線程池執行成功。
    • (二)、 通過submit
      • 1.通過送出Runnable
        • Future future =  executorService.submit(()->{});
      • 2.通過送出Callable
        • Future future =  executorService.submit(() -> {
          • return null;
      • 3.  Runnable 和 Callable 的差別
        • 1. 非常相似,這兩個接口都表示可以由線程或ExecutorService同時執行的任務。
        • 2. 不同之處是兩個接口内部執行的方法不同。
          • Runnable
            • public interface Runnable {
              • public void run();
          • Callable
            • public interface Callable{
              • public Object call() throws Exception;
        • - 可以看出,call()方法是有傳回值的,而且可以引發異常。run沒有傳回值,也不能引發異常(除非未經檢查的異常-RuntimeException的子類)。
      • 4.如果執行的任務需要傳回結果,使用Callable。
    • (三)、兩種執行方法的差別
      • 1. execute 隻能送出Runnable類型的任務;submit 除了可以送出Runnable類型的任務外,還可以送出Callable類型。
      • 2. execute 直接抛出任務執行的異常;submit 會捕獲,可以傳回一個Future類型的對象,通過這個Future對象可以判斷任務是否執行成功,通過get方法擷取傳回值,get()方法會阻塞目前線程直到任務完成,使用get(long timeout,TimeUnit unit)方法會阻塞目前線程一段時間後立即傳回,這時候任務可能沒有執行完。
      • 3. submit 的頂級接口是ExecutorService;execute 的頂級接口是 Executor;
  • 六、擷取傳回結果
    • (一)、 兩種 invokeAny() 和 invokeAll()
    • (二)、invokeAny
      • 1. 當任意一個任務得到結果後,會調用interrupt方法将其他的任務中斷;
      • 2. 部分任務失敗,會使用第一個成功的任務傳回的結果;
      • 3. 任務全部失敗了,抛出Execption,invokeAny 方法将抛出ExecutionException。
    • (三)、invokeAll 傳回所有任務的執行結果,該方法的執行效果也是阻塞執行的,要把所有的結果都取回時再繼續向下執行。
  • 七、關閉線程池
    • 1、shutdown()
      • 1. 将線程池的狀态設定成ShutDown;
      • 2. 中斷所有沒有正在執行任務的線程。在終止前允許執行以前送出的任務;
    • 2、shutdownNow()
      • 1. 将線程池的狀态設定成Stop;
      • 2. 阻止等待任務的啟動并試圖停止目前正在執行的任務,并傳回等待執行任務的清單;不允許執行以前送出的任務。
    • 1、2 兩個方法的原理:
      • 周遊線程池中的工作線程,然後逐個調用線程的interrupt方法來中斷線程,是以無法響應中斷的任務可能永遠無法終止。
      • 調用1、2方法後,isShutdown 方法傳回true。當所有任務都已關閉,才表示線程池關閉成功,調用isTerminaed方法傳回true。
    • 3、awaitTermination()
      • 1. 接收timeout和TimeUnit兩個參數,用于設定逾時時間及機關。當等待超過設定時間時,會監測ExecutorService是否已經
      • 關閉,若關閉則傳回true,否則傳回false。一般情況下會和shutdown方法組合使用。
      • 在實際使用過程中, 使用shutdown()關閉,回收資源。如果有必要,可以在其後執行shutdownNow(),取消所有遺留的任務。
  • 八、配置線程池
    • 角度:
      • 1. 任務的性質: CPU密集型任務、IO密集型任務和混合型任務。
      • 2. 任務的優先級: 高、中與低;
      • 3. 任務的執行時間: 長、短;
      • 4. 任務的依賴性: 是否依賴其他系統資源,如資料庫連接配接。
    • 選擇:
      • 性質不同的任務可以用不同規模的線程池分開處理;
      • CPU 密集型任務應配置盡可能小的線程,如cpu數+1 的線程池;
      • IO密集型任務線程并不是一直在執行,則應配置極可能多的線程;
      • 優先級不同的任務可以使用優先級隊列PriorityBlockingQueue來處理,它可以讓優先級高的任務先執行;
        • 如果一直有優先級高的任務送出到隊列裡,那麼優先級低的任務可能永遠不能執行。
      • 依賴資料庫連接配接的任務,因為線程送出SQL後需要等待資料庫傳回結果,等待的時間越長,則CPU空閑時間就越長,那麼線程數應該設定的越大,這樣才能更好低利用CPU。
    • 建議:
      • 有界隊列;有界隊列可以增加系統的穩定性和預警能力。
      • 展現:
        • 任務線程池的隊列和線程池滿了,不斷抛出抛棄任務的異常,通過排查是資料庫出現問題,導緻執行sql的速度變慢,由于背景任務線程池裡的任務都是需要向資料庫插入資料的和查詢,是以導緻線程池裡的工作線程全部阻塞,任務積壓線上程池裡。如果是無界隊列,線程池中的隊列越來越多,可能撐爆記憶體,導緻系統不可用。
  • 八、例子
    • ExecutorService executorService = Executors.newSingleThreadExecutor();
      • Set<Callable<String>> callables = newHashSet<Callable<String>>();
        • callables.add(() -> {
          • return "Task1";
        • }});
          • return "Task2";
      • List<Future<String>> futures =executorService.invokeAll(callables);
        • for(Future<String> future : futures){
          • System.out.println("future.get = " + future.get());
      • executorService.shutdown();
      • while (!service.awaitTermination(1, TimeUnit.SECONDS)) {
        • System.out.println("線程池沒有關閉");