天天看點

萬字探索線程池:優化并發任務的利器

作者:程式猿凱撒

前言

在Java中,使用線程來開發支援多任務并行的程式是非常友善的。但是,在實際應用中不建議大家直接“new”一個線程去處理任務,因為線程會消耗CPU資源,當在一個程序中建立大量的線程時,不僅不會提升程式的性能,反而會影響任務的執行效率。同時,線程的頻繁建立和銷毀,會因為配置設定記憶體和回收記憶體而占用CPU資源,進而影響性能。為了解決這些問題,Java引入了線程池技術。線程池實際上運用的是一種池化技術,所謂池化技術就是提前建立好大量的“資源”儲存在某個容器中,在需要使用時,可以直接從該容器中擷取對應的資源進行處理,用完之後回收以便下次繼續使用。

線程池的建立

線程過多會帶來額外的開銷,其中包括建立銷毀線程的開銷、排程線程的開銷等等,同時也降低了計算機的整體性能。線程池維護多個線程,等待監督管理者配置設定可并發執行的任務。這種做法,一方面避免了處理任務時建立銷毀線程開銷的代價,另一方面避免了線程數量膨脹導緻的過分排程問題,保證了對核心的充分利用。

Executors建立線程池的方法

  1. newFixedThreadPool(int nThreads): 固定大小線程池,特點是線程數固定,使用無界隊列,适用于任務數量不均勻的場景、對記憶體壓力不敏感但系統負載比較敏感的場景;
java複制代碼public static ExecutorService newFixedThreadPool(int nThreads) {  
    return new ThreadPoolExecutor(nThreads, nThreads,  
                                  0L, TimeUnit.MILLISECONDS,  
                                  new LinkedBlockingQueue<Runnable>());  
}
           
  1. newCachedThreadPool(): Cached 線程池,特點是不限制線程數,适用于要求低延遲的短期任務場景;
java複制代碼public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
                                  60L, TimeUnit.SECONDS,  
                                  new SynchronousQueue<Runnable>());  
}
           
  1. newSingleThreadExecutor(): 單線程線程池,就是一個線程的固定線程池,适用于需要異步執行但需要保證任務順序的場景;
java複制代碼public static ExecutorService newSingleThreadExecutor() {  
    return new FinalizableDelegatedExecutorService  
        (new ThreadPoolExecutor(1, 1,  
                                0L, TimeUnit.MILLISECONDS,  
                                new LinkedBlockingQueue<Runnable>()));  
}
           
  1. newScheduledThreadPool(int corePoolSize): Scheduled 線程池,适用于定期執行任務場景,支援按固定頻率定期執行和按固定延時定期執行兩種方式;
java複制代碼public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {  
    return new ScheduledThreadPoolExecutor(corePoolSize);  
}

// ScheduledThreadPoolExecutor
public ScheduledThreadPoolExecutor(int corePoolSize) {  
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,  
        new DelayedWorkQueue());  
}
           
  1. newWorkStealingPool(): 工作竊取線程池,使用的是 ForkJoinPool,是固定并行度的多任務隊列,适合任務執行時長不均勻的場景。
java複制代碼public static ExecutorService newWorkStealingPool() {  
    return new ForkJoinPool  
        (Runtime.getRuntime().availableProcessors(),  
        ForkJoinPool.defaultForkJoinWorkerThreadFactory,  
        null, true);  
}
           

這些方法的差別在于線程池的類型和行為:

  • 固定線程池、緩存線程池和單線程池都屬于ThreadPoolExecutor的不同配置方式,提供了不同的線程池特性和行為。
  • 定時任務線程池是基于ScheduledThreadPoolExecutor實作的(父類也是ThreadPoolExecutor),可以執行定時或周期性任務。

為什麼大公司都不推薦Executors建立線程池

在大公司中,通常會避免直接使用 Executors 類中提供的簡單工廠方法來建立線程池,而更傾向于使用 ThreadPoolExecutor 類進行手動配置和建立線程池。這是因為 Executors 類提供的工廠方法在某些情況下可能會導緻線程池的不合理配置,進而影響系統的性能和穩定性。

以下是一些原因,為什麼大公司不直接使用 Executors 建立線程池:

  1. 隐式的配置:Executors 類提供的工廠方法會隐藏線程池的一些關鍵配置參數,如核心線程數、最大線程數和任務隊列類型等。這可能導緻使用者無法直接設定這些參數,進而無法針對具體的應用場景進行優化。
  2. 任務隊列選擇:不同的應用場景可能需要不同類型的任務隊列,例如有界隊列或無界隊列。Executors 類的工廠方法通常使用無界隊列,這可能會導緻任務積壓,造成記憶體溢出等問題。
  3. 拒絕政策限制:Executors 類提供的工廠方法通常使用預設的拒絕政策,例如抛出異常或直接丢棄任務。在實際的生産環境中,需要根據業務需求選擇合适的拒絕政策,如将任務放入隊列、執行特定的補償邏輯等。
  4. 線程池飽和政策:Executors 類提供的工廠方法可能采用一種飽和政策,即當任務隊列已滿并且線程數達到最大線程數時,會建立新的線程來處理任務。這可能導緻系統資源耗盡,造成性能下降或系統崩潰。

通過直接使用 ThreadPoolExecutor 類,開發人員可以手動配置線程池的各項參數,包括核心線程數、最大線程數、任務隊列類型、拒絕政策等,以适應具體的業務需求和系統負載情況。這樣可以更加精細地控制線程池的行為,提高系統的性能、穩定性和可調試性。

線程池原理

ThreadPoolExecutor的實作

ThreadPoolExecutor實作細節

ThreadPoolExecutor 是 Java 中提供的一個實作了 ExecutorService 接口的線程池實作類,它提供了更豐富的配置選項和靈活性。以下是 ThreadPoolExecutor 的具體實作細節:

java複制代碼public class ThreadPoolExecutor extends AbstractExecutorService {
    // 核心線程數,線程池中一直保持活動的線程數量
    private volatile int corePoolSize;
    
    // 最大線程數,線程池中允許的最大線程數量
    private volatile int maximumPoolSize;
    
    // 線程空閑時間,當線程池中的線程數超過核心線程數時,空閑線程的最大存活時間
    private volatile long keepAliveTime;
    
    // 時間機關,用于指定 keepAliveTime 的機關
    private final TimeUnit unit;
    
    // 任務隊列,用于存儲尚未被執行的任務
    private final BlockingQueue<Runnable> workQueue;
    
    // 線程工廠,用于建立新的線程
    private final ThreadFactory threadFactory;
    
    // 拒絕政策,用于處理無法送出的任務
    private final RejectedExecutionHandler handler;
    
    // 線程池中的目前線程數量及狀态
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
    // 線程池的執行方法
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { // 如果核心線程數未滿,則建立新的核心線程執行任務
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { // 嘗試将任務添加到隊列中
            int recheck = ctl.get();
            if (!isRunning(recheck) && remove(command)) // 如果線程池狀态發生變化,則嘗試從隊列中移除任務
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) // 如果任務無法放入隊列,則嘗試建立新的非核心線程執行任務
            reject(command);
    }
}

           

ThreadPoolExecutor 類具有核心線程數、最大線程數、任務隊列等屬性,還包含了用于管理線程池狀态和執行任務的方法。它使用 AtomicInteger 對線程池中的線程數量進行原子更新,根據目前線程池狀态和任務情況,動态地建立新線程、放入隊列或拒絕任務。

任務隊列

Java中的線程池架構提供了幾種常見的任務隊列類型,每種類型都有不同的特點和适用場景。

隊列 特點 适用場景
直接送出隊列(SynchronousQueue) 這是一個沒有存儲能力的隊列,它要求線程池立即處理任務,如果線程池的線程都在忙碌,那麼新任務将會被拒絕 适用于需要立即執行任務的場景,通常用于限制線程池的最大并發數。
無界隊列(LinkedBlockingQueue) 這是一個沒有固定容量限制的隊列,可以無限制地添加新任務。當線程池中的線程都在忙碌時,新任務将會被放入隊列等待執行 适用于任務量較大、任務執行時間較長的場景,可以保證盡可能多的任務被接受和處理。
有界隊列(ArrayBlockingQueue) 這是一個具有固定容量的隊列,可以指定隊列的最大容量。當線程池中的線程都在忙碌時,新任務将會被放入隊列等待執行。如果隊列已滿,新任務将會被拒絕 适用于控制線程池的最大并發數和任務隊列的容量,可以避免任務送出過快導緻記憶體溢出。
優先級隊列(PriorityBlockingQueue) 這是一個基于優先級的隊列,可以根據任務的優先級順序來執行任務。具有較高優先級的任務将會被優先執行 适用于根據任務優先級來排程任務執行的場景,可以實作任務的有序執行。

需要根據任務的特性、并發需求、資源限制等因素進行權衡和選擇,以獲得最佳的性能和效果。

拒絕政策

線程池架構提供了四種常見的拒絕政策,用于處理當線程池無法接受新任務時的情況。這些拒絕政策分别是:

拒絕政策 特點 适用場景
AbortPolicy(預設) 該政策是預設的拒絕政策。當線程池無法接受新任務時,會直接抛出 RejectedExecutionException 異常,阻止新任務的送出。 适用于在任務送出被拒絕時,立即通知調用者并停止系統的運作。
CallerRunsPolicy 當線程池無法接受新任務時,該政策将使用調用線程來執行被拒絕的任務,即在調用線程中執行任務的 run() 方法 适用于要求任務不被丢棄且可以在調用線程中執行的場景。這可能會降低系統的吞吐量,但可以提供一種适應性,避免任務丢失。
DiscardPolicy 當線程池無法接受新任務時,該政策會默默地丢棄被拒絕的任務,不做任何處理 适用于對任務丢失不敏感的場景,不需要通知或記錄被丢棄的任務。
DiscardOldestPolicy 當線程池無法接受新任務時,該政策會丢棄隊列中最舊的任務,然後嘗試重新送出被拒絕的任務 适用于對最舊任務優先級較低的場景,可以在任務被丢棄的同時,盡量保留較新的任務。

線程池執行流程

向線程送出任務時可以使用 execute 和 submit,差別就是 submit 可以傳回一個 future 對象,通過 future 對象可以了解任務執行情況,可以取消任務的執行,還可擷取執行結果或執行異常。submit 最終也是通過 execute 執行的。

向線程池送出任務時的執行順序如下圖所示。

萬字探索線程池:優化并發任務的利器
  • 向線程池送出任務時,會首先判斷線程池中的線程數是否大于設定的核心線程數,如果不大于,就建立一個核心線程來執行任務。
  • 如果大于核心線程數,就會判斷緩沖隊列是否滿了,如果沒有滿,則放入隊列,等待線程空閑時執行任務。
  • 如果隊列已經滿了,則判斷是否達到了線程池設定的最大線程數,如果沒有達到,就建立新線程來執行任務。
  • 如果已經達到了最大線程數,則執行指定的拒絕政策。

線程池狀态

萬字探索線程池:優化并發任務的利器

線程池有5種狀态,狀态說明如下:

  • RUNNING,運作狀态,可以接收新的任務并處理,可以處理阻塞隊列中的任務。
  • SHUTDOWN,關閉狀态,不接收新的任務,但是可以繼續處理阻塞隊列中的任務。
  • STOP,停止狀态,不接收新的任務,不處理阻塞隊列中的任務,同時會中斷正在處理的任務。
  • TIDYING,過渡狀态,該狀态意味着所有的任務都執行完了,并且線程池中已經沒有有效的工作線程。該狀态下會調用terminated()方法進入TERMINATED狀态。
  • TERMINATED,終止狀态,terminated()方法調用完成以後的狀态。

線程池設定

線程池線程數設定多少比較合适

《Java并發程式設計實戰》一書給了推薦設定:

萬字探索線程池:優化并發任務的利器
  • Ncpu表示CPU的數量,可以通過Runtime.getRuntime().availableProcessors()獲得。
  • Ucpu表示期望的CPU的使用率。
  • W/C表示等待時間與計算時間的比例。
  • Nthreads表示線程數量的計算公式。

假設CPU使用率是100%,那麼Nthreads=Ncpu×(1+W/C),也就意味着W/C的值越大,那麼線程數量越多,反之線程數量越少。 我們還要看目前線程池中要執行的任務是屬于I/O密集型還是CPU密集型。

  • I/O密集型:就是線程頻繁需要和磁盤或者遠端網絡通信,這種場景中磁盤的耗時和網絡通信的耗時較大,意味着線程處于阻塞期間,不會占用CPU資源,是以線程數量設定超過CPU核心數并不會造成問題。
  • CPU密集型:就是對CPU的使用率較高的場景,比如循環、遞歸、邏輯運算等,這種情況下線程數量設定越少,就越能減少CPU的上下文頻繁切換。

有一種建議如下,其中N表示CPU的核心數量。

  • CPU密集型,線程池大小設定為N+1。
  • IO密集型,線程池大小設定為2N+1。

之是以需要+1,是因為這樣設定以後,線程在某個時刻發生一個頁錯誤或者因為其他原因暫停時,剛好有一個額外的線程可以確定CPU周期不會中斷。

這些設定隻是一些建議和參考,并不是絕對的規則。實際的設定應該根據具體的應用場景、任務特性和系統資源來進行調整和優化。通過測試和監控,觀察線程池的性能名額,并根據實際情況進行适當的調整,以達到最佳的性能和資源利用效果。

如何動态設定線程池參數

設定線程池線程數

當需要動态調整線程池大小時,可以按照以下順序來設定最小線程數和最大線程數:

  1. 調大線程池:
  2. 首先,增加最大線程數,以便線程池可以容納更多的線程。
  3. 然後,增加最小線程數,以確定在任務到達時能夠立即處理。
  4. 調小線程池:
  5. 首先,降低最小線程數,以允許空閑線程在一段時間後被回收。
  6. 然後,降低最大線程數,以限制線程池的最大容量。
java複制代碼// 建立線程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

// 送出任務到線程池中
executor.execute(() -> {
    // 任務邏輯
});

// 調大線程池
executor.setMaximumPoolSize(15);
executor.setCorePoolSize(8);

// 調小線程池
executor.setCorePoolSize(5);
executor.setMaximumPoolSize(8);

           

在上述示例中,首先通過 setMaximumPoolSize() 方法增加最大線程數,然後通過 setCorePoolSize() 方法增加最小線程數,以調大線程池。而在調小線程池時,先通過 setCorePoolSize() 方法降低最小線程數,再通過 setMaximumPoolSize() 方法降低最大線程數。

這樣的設定順序可以確定線程池能夠根據任務負載的變化進行适當調整,并在需要時及時建立或回收線程,以優化性能和資源利用。

設定隊列大小

當遇到請求數過多但是隊列設定太小時,想要動态調整隊列大小怎麼做呢?在ThreadPoolExecutor中并沒有提供隊列長度修改方法。以LinkedBlockingQueue為例,它的成員變量capacity是被final修飾的,隻能在構造方法中初始化,是以也沒辦法動态設定capacity的大小。 我們可以把LinkedBlockingQueue複制一份,然後提供一個修改setCapacity的方法。

萬字探索線程池:優化并發任務的利器

線程池參數傳遞

線上程池中傳遞參數,可以通過以下幾種方式實作:

使用任務的構造函數或方法

可以在建立任務對象時,将參數作為構造函數或方法的參數傳遞進去。任務執行時可以直接使用這些參數。例如:

java複制代碼public class MyTask implements Runnable {
    private final int parameter;

    public MyTask(int parameter) {
        this.parameter = parameter;
    }

    @Override
    public void run() {
        // 使用參數執行任務邏輯
        // ...
    }
}

// 建立線程池
ExecutorService executor = Executors.newFixedThreadPool(5);

// 送出任務并傳遞參數
int parameterValue = 123;
executor.execute(new MyTask(parameterValue));

           

使用 ThreadLocal

ThreadLocal 可以讓每個線程都持有一個獨立的變量副本。可以将參數設定到 ThreadLocal 中,在任務執行時從 ThreadLocal 中擷取參數。這樣可以實作線程隔離的參數傳遞。例如:

java複制代碼// 建立線程池
ExecutorService executor = Executors.newFixedThreadPool(5);

// 定義 ThreadLocal 對象
ThreadLocal<Integer> parameter = new ThreadLocal<>();

// 設定參數
int parameterValue = 123;
parameter.set(parameterValue);

// 送出任務并使用 ThreadLocal 傳遞參數
executor.execute(() -> {
    // 使用參數執行任務邏輯
    // 可以通過 parameter.get() 擷取參數值
    int value = parameter.get();
    // ...
});
           

transmittable-thread-local傳遞

使用線程池和ThreadLocal共用可能會存在資料不一緻的情況。這是因為線程池會複用線程對象,Threadlocal為每個線程維護了獨立的變量副本,這就會導緻目前線程會取到其他線程設定的ThreadLocal的值。

另外,如果需要線上程池中共享資料,而不會受到線程複用和資料不一緻的影響,可以考慮使用 ThreadLocal 的替代方案,如使用 InheritableThreadLocal 或者在任務中顯式傳遞參數來共享資料。

阿裡巴巴開源的 TransmittableThreadLocal(TTL)。TransmittableThreadLocal類繼承并加強了 JDK 内置的InheritableThreadLocal類,在使用線程池等會池化複用線程的執行元件情況下,提供ThreadLocal值的傳遞功能,解決異步執行時上下文傳遞的問題。

作者:scoop

連結:https://juejin.cn/post/7241184271318237245

繼續閱讀