天天看點

Java多線程之線程池ThreadPool詳解1. 線程池簡介2. 線程池特點3. 線程池核心參數4. 線程池工具類5. 線程池工作原理6. 不建議使用Executors建立線程池7. 實作驗證

Java多線程之線程池ThreadPool詳解

  • 1. 線程池簡介
  • 2. 線程池特點
  • 3. 線程池核心參數
    • 3.1 核心參數
    • 3.2 拒絕政策
  • 4. 線程池工具類
    • 4.1 四種線程池建立方式
  • 5. 線程池工作原理
  • 6. 不建議使用Executors建立線程池
  • 7. 實作驗證

1. 線程池簡介

線程池(英語: thread pool):一種線程使用模式。線程過多會帶來排程開銷,進而影響緩存局部性和整體性能。而線程池維護着多個線程,等待着監督管理者配置設定可并發執行的任務。這避免了在處理短時間任務時建立與銷毀線程的代價。線程池不僅能夠保證核心的充分利用,還能防止過分排程。

2. 線程池特點

線程池的優勢: 線程池做的工作隻要是控制運作的線程數量,處理過程中将任務放入隊列,然後線上程建立後啟動這些任務,如果線程數量超過了最大數量,超出數量的線程排隊等候,等其他線程執行完畢,再從隊列中取出任務來執行。

它的主要特點為:

1.降低資源消耗: 通過重複利用已建立的線程降低線程建立和銷毀造成的銷耗。

2.提高響應速度: 當任務到達時,任務可以不需要等待線程建立就能立即執行。

3.提高線程的可管理性: 線程是稀缺資源,如果無限制的建立,不僅會銷耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的配置設定,調優和監控。

4.Java 中的線程池是通過 Executor 架構實作的,該架構中用到了Executor, Executors,ExecutorService, ThreadPoolExecutor 這幾個類。

Java多線程之線程池ThreadPool詳解1. 線程池簡介2. 線程池特點3. 線程池核心參數4. 線程池工具類5. 線程池工作原理6. 不建議使用Executors建立線程池7. 實作驗證

3. 線程池核心參數

3.1 核心參數

• corePoolSize 線程池的核心線程數

• maximumPoolSize 能容納的最大線程數

• keepAliveTime 空閑線程存活時間

• unit 存活的時間機關

• workQueue 存放送出但未執行任務的隊列

• threadFactory 建立線程的工廠類

• handler 等待隊列滿後的拒絕政策

線程池中,有三個重要的參數,決定影響了拒絕政策:

corePoolSize - 核心線程數,也即最小的線程數。

workQueue - 阻塞隊列 。

maximumPoolSize -最大線程數。

當送出任務數大于 corePoolSize 的時候,會優先将任務放到 workQueue 阻塞隊列中。當阻塞隊列飽和後,會擴充線程池中線程數,直到達到maximumPoolSize 最大線程數配置。此時,再多餘的任務,則會觸發線程池的拒絕政策了。總結起來,也就是一句話, 當送出的任務數大于(workQueue.size()+maximumPoolSize ),就會觸發線程池的拒絕政策。

3.2 拒絕政策

CallerRunsPolicy: 當觸發拒絕政策,隻要線程池沒有關閉的話,則使用調用線程直接運作任務。一般并發比較小,性能要求不高,不允許失敗。但是,由于調用者自己運作任務,如果任務送出速度過快,可能導緻程式阻塞,性能效率上必然的損失較大。

AbortPolicy: 丢棄任務,并抛出拒絕執行 RejectedExecutionException 異常資訊。線程池預設的拒絕政策。必須處理好抛出的異常,否則會打斷目前的執行流程,影響後續的任務執行。

DiscardPolicy: 直接丢棄,其他啥都沒有。

DiscardOldestPolicy: 當觸發拒絕政策,隻要線程池沒有關閉的話,丢棄阻塞隊列 workQueue 中最老的一個任務,并将新任務加入。

4. 線程池工具類

4.1 四種線程池建立方式

Executors 是一個 Java 中的工具類。提供工廠方法來建立不同類型的線程池。Executors 的建立線程池的方法,建立出來的線程池都實作了ExecutorService 接口。常用方法有以下幾個:

1.newFiexedThreadPool(int Threads):建立固定數目線程的線程池。

2.newCachedThreadPool():建立一個可緩存的線程池,調用 execute将重用以前構造的線程(如果線程可用)。如果沒有可用的線程,則建立一個新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。

3.newSingleThreadExecutor() 建立一個單線程化的 Executor。

4.newScheduledThreadPool(int corePoolSize) 建立一個支援定時及周期性的任務執行的線程池,多數情況下可用來替代 Timer 類。

5. 線程池工作原理

Java多線程之線程池ThreadPool詳解1. 線程池簡介2. 線程池特點3. 線程池核心參數4. 線程池工具類5. 線程池工作原理6. 不建議使用Executors建立線程池7. 實作驗證

1.在建立了線程池後,線程池中的線程數為零。

2.當調用 execute()方法添加一個請求任務時,線程池會做出如下判斷:

2.1 如果正在運作的線程數量小于 corePoolSize,那麼馬上建立線程運作這個任務;

2.2 如果正在運作的線程數量大于或等于 corePoolSize,那麼将這個任務放入隊列;

2.3 如果這個時候隊列滿了且正在運作的線程數量還小于maximumPoolSize,那麼還是要建立非核心線程立刻運作這個任務; 2.4 如果隊列滿了且正在運作的線程數量大于或等于maximumPoolSize,那麼線程池會啟動飽和拒絕政策來執行。

4.當一個線程完成任務時,它會從隊列中取下一個任務來執行

5.當一個線程無事可做超過一定的時間(keepAliveTime)時,線程會判斷:

4.1 如果目前運作的線程數大于 corePoolSize,那麼這個線程就被停掉。

4.2是以線程池的所有任務完成後,它最終會收縮到 corePoolSize 的大小。

6. 不建議使用Executors建立線程池

不建議大家使用Executors這個類來建立線程池呢,阿裡開發手冊這樣定義:

【強制】線程池不允許使用 Executors 去建立,而是通過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學更加明确線程池的運作規則,規避資源耗盡的風險。說明: Executors 傳回的線程池對象的弊端如下:

1) FixedThreadPool 和 SingleThreadPool:

允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,進而導緻 OOM。

2) CachedThreadPool 和 ScheduledThreadPool:

允許的建立線程數量為 Integer.MAX_VALUE, 可能會建立大量的線程,進而導緻 OOM。

項目中建立多線程時,使用常見的三種線程池建立方式,單一、可變、定長都有一定問題,原因是 FixedThreadPool 和 SingleThreadExecutor 底層都是用LinkedBlockingQueue 實作的,這個隊列最大長度為 Integer.MAX_VALUE,容易導緻 OOM。

是以實際生産一般自己通過 ThreadPoolExecutor 的 7 個參數,自定義線程池。

7. 實作驗證

package com.zrj.unit.juc;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.junit.Test;

import java.util.concurrent.*;

/**
 * 線程池
 * 注意:ThreadFactoryBuilder這裡需要運用guava包,自定義線程名稱
 *
 * @author zrj
 * @since 2021/8/20
 **/
public class ThreadPoolExecutorTest {
    // 定義售票視窗數
    private static int saleWindows = 10;
    // 自定義線程名稱
    private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("thread-%d").build();
    // 定義線程池
    private static ExecutorService pool = new ThreadPoolExecutor(30, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
    // 倒計時鎖存器
    private static CountDownLatch latch = new CountDownLatch(saleWindows);

    /**
     * 模拟搶票
     */
    @Test
    public void saleTicket() {
        try {
            for (int i = 0; i < saleWindows; i++) {
                pool.execute(() -> {
                    try {
                        String threadName = Thread.currentThread().getName();
                        System.out.println(threadName + "視窗開始售票");
                        Thread.sleep(1000);
                        System.out.println(threadName + "視窗售票完成");
                    } catch (InterruptedException e) {
                        System.out.println("售票異常:" + e);
                    } finally {
                        latch.countDown();
                    }
                });
            }
        } catch (Exception e) {
            System.out.println("系統異常:" + e);
        } finally {
            pool.shutdown();
        }

        // 等待所有線程到達放行
        try {
            latch.await();
        } catch (Exception e) {
            System.out.println("系統異常," + e);
        }
        System.out.println("所有線程執行完成,繼續執行主線程");
    }

}

           

繼續閱讀