天天看點

java線程池,阿裡為什麼不允許使用Executors?帶着問題基礎Java中的線程池最後

帶着問題

  1. 阿裡Java代碼規範為什麼不允許使用Executors快速建立線程池?
  2. 下面的代碼輸出是什麼?
ThreadPoolExecutor executor = new ThreadPoolExecutor(
        1, //corePoolSize
        100, //maximumPoolSize
        100, //keepAliveTime
        TimeUnit.SECONDS, //unit
        new LinkedBlockingDeque<>(100));//workQueue

for (int i = 0; i < 5; i++) {
    final int taskIndex = i;
    executor.execute(() -> {
        System.out.println(taskIndex);
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}           

A) 0 1 2 3 4 5

B) 0~5 順序不一緻輸出5行

C) 0

基礎

什麼是線程池?

線程池可以通過池看出來是一個資源集,任何池的作用都大同小異,主要是用來減少資源建立、初始化的系統開銷。

建立線程很“貴”嗎?

是的。建立線程的代價是昂貴的。

我們都知道系統中的每個程序有自己獨立的記憶體空間,而被稱為輕量級程序的線程也是需要的。

在JVM中預設一個線程需要使用256k~1M(取決于32位還是64位作業系統)的記憶體。(具體的數組我們不深究,因為随着JVM版本的變化這個預設值随時可能發生變更,我們隻需要知道線程是需要占用記憶體的)

除了記憶體還有更多嗎?

許多文章會将上下文切換、CPU排程列入其中,這邊不将線程排程列入是因為睡眠中的線程不會被排程(OS控制),如果不是睡眠中的線程那麼是一定需要被排程的。

但在JVM中除了建立時的記憶體消耗,還會給GC帶來壓力,如果頻繁建立線程那麼相對的GC的時候也需要回收對應的線程。

線程池的機制?

可以看到線程池是一種重複利用線程的技術,線程池的主要機制就是保留一定的線程數在沒有事情做的時候使之睡眠,當有活幹的時候拿一個線程去運作。

這些牽扯到線程池實作的具體政策。

還有哪些常見的池?

  • 線程池
  • 連接配接池(資料庫連接配接、TCP連接配接等)
  • BufferPool
  • ......

Java中的線程池

UML圖(Java 8)

可以看到真正的實作類有

  1. ThreadPoolExecutor (1.5)
  2. ForkJoinPool (1.7)
  3. ScheduledThreadPoolExecutor (1.5)

今天我們主要談談

ThreadPoolExecutor

也是使用率較高的一個實作。

Executors提供的工廠方法

  1. newCachedThreadPool (ThreadPoolExecutor)
建立一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,那麼就會回收部分空閑(60秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴于作業系統(或者說JVM)能夠建立的最大線程大小。
  1. newFixedThreadPool (ThreadPoolExecutor)
建立固定大小的線程池。每次送出一個任務就建立一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那麼線程池會補充一個新線程。
  1. newSingleThreadExecutor (ThreadPoolExecutor)
建立一個單線程的線程池。這個線程池隻有一個線程在工作,也就是相當于單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那麼會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的送出順序執行。
  1. newScheduledThreadPool (ScheduledThreadPoolExecutor)
建立一個大小無限的線程池。此線程池支援定時以及周期性執行任務的需求。
  1. newSingleThreadScheduledExecutor (ScheduledThreadPoolExecutor)
建立一個單線程用于定時以及周期性執行任務的需求。
  1. newWorkStealingPool (1.8 ForkJoinPool)
建立一個工作竊取

可以看到各種不同的工廠方法中使用的線程池實作類最終隻有3個,對應關系如下:

工廠方法 實作類
newCachedThreadPool ThreadPoolExecutor
newFixedThreadPool
newSingleThreadExecutor
newScheduledThreadPool ScheduledThreadPoolExecutor
newSingleThreadScheduledExecutor
newWorkStealingPool ForkJoinPool

首先我們看下

ThreadPoolExecutor

的完全構造函數

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)           
  1. corePoolSize
核心池大小,除非設定了

allowCoreThreadTimeOut

否則哪怕線程超過空閑時間,池中也要最少要保留這個數目的線程。

需要注意的是,corePoolSize所需的線程并不是立即建立的,需要在送出任務之後進行建立,是以如果有大量的緩存線程數可以先送出一個空任務讓線程池将線程先建立出來,進而提升後續的執行效率。

  1. maximumPoolSize
允許的最大線程數。
  1. keepAliveTime
空閑線程空閑存活時間,核心線程需要

allowCoreThreadTimeOut

為true才會退出。
  1. unit

keepAliveTime

配合,設定

keepAliveTime

的機關,如:毫秒、秒。
  1. workQueue
線程池中的任務隊列。上面提到線程池的主要作用是複用線程來處理任務,是以我們需要一個隊列來存放需要執行的任務,在使用池中的線程來處理這些任務,是以我們需要一個任務隊列。
  1. threadFactory
當線程池判斷需要新的線程時通過線程工程建立線程。
  1. handler
執行被阻止時的處理程式,線程池無法處理。這個與任務隊列相關,比如隊列中可以指定隊列大小,如果超過了這個大小該怎麼辦呢?JDK已經為我們考慮到了,并提供了4個預設實作。

下列是JDK中預設攜帶的政策:

  1. AbortPolicy (預設)
抛出

RejectedExecutionException

異常。
  1. CallerRunsPolicy
調用目前線程池所在的線程去執行。
  1. DiscardPolicy
直接丢棄目前任務。
  1. DiscardOldestPolicy
将最舊的任務丢棄,将目前任務添加到隊列。

容易混淆的參數:corePoolSize maximumPoolSize workQueue

任務隊列、核心線程數、最大線程數的邏輯關系

  1. 當線程數小于核心線程數時,建立線程。
  2. 當線程數大于等于核心線程數,且任務隊列未滿時,将任務放入任務隊列。
  3. 當線程數大于等于核心線程數,且任務隊列已滿
    1. 若線程數小于最大線程數,建立線程
    2. 若線程數等于最大線程數,調用拒絕執行處理程式(預設效果為:抛出異常,拒絕任務)

那麼這三個參數推薦如何設定,有最優值嗎?

由于java對于協程的支援不友好,是以會大量依賴于線程池和線程。

進而這個值沒有最優推薦,需要根據業務需求情況來進行設定。

不同的需求類型可以建立多個不同的線程池來執行。

問題1:阿裡開發規範為什麼不允許Executors快速建立線程池?

參考位址: https://github.com/alibaba/p3c

可以看到原因很簡單

workQueue

參數直接 使用了

new LinkedBlockingQueue<Runnable>()

理論上可以無限添加任務到線程池。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>();
}

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1,
    1,
    0L,
    TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
}           

如果送出到線程池的任務由問題,比如 sleep 永久,會造成記憶體洩漏,最終導緻OOM。

同時 阿裡還推薦自定義

threadFactory

設定線程名稱便于以後排查問題。

問題2:下面的代碼輸出是什麼?

應該選C。

雖然最大線程數有100但核心線程數為1,任務隊列由100。

滿足了 '當線程數大于等于核心線程數,且任務隊列未滿時,将任務放入任務隊列。' 這個條件。

是以後續添加的任務都會被堵塞。

最後

關于 ThreadPoolExecutor 的邏輯在實際使用的時候會有點奇怪,因為線程池中的線程并沒有超過最大線程數,有沒有一種可能當任務被堵塞很久的時候建立新的線程池來處理呢?

這邊推薦大家使用 newWorkStealingPool,也就是ForkJoinPool。采取了工作竊取的模式。

後續會跟大家一起聊聊 ForkJoinPool。