天天看點

Java之并發程式設計(二)二、Java記憶體模型(重要)三、Java線程池(重要)四、Java 線程池最佳實踐

二、Java記憶體模型(重要)

1. CPU緩存模型

1.1 CPU緩存

CPU緩存是為了解決 CPU處理速度和記憶體處理速度不對等的問題。(類比:緩存如Redis是為了解決程式處理速度和通路正常關系型資料庫速度不對等的問題)

記憶體緩存是為了解決 記憶體處理速度和外存(硬碟)處理速度不對等的問題

Java之并發程式設計(二)二、Java記憶體模型(重要)三、Java線程池(重要)四、Java 線程池最佳實踐

1.2 記憶體緩存不一緻問題

CPU 為了解決記憶體緩存不一緻性問題可以通過制定緩存一緻協定(比如 MESI 協定)或者其他手段來解決。

這個緩存一緻性協定指的是在 CPU高速緩存與主記憶體互動的時候需要遵守的原則和規範。

作業系統通過記憶體模型(Memory Model) 定義一系列規範來解決這個問題。

2. 指令重排序

2.1 概念

為了提升執行速度/性能,計算機在執行程式代碼的時候,會對指令進行重排序。

指令簡單來說就是系統在執行代碼的時候并不一定是按照你寫的代碼的順序依次執行。

2.2 分類

(1)編譯器優化重排

概念:編譯器(包括 JVM、JIT 編譯器等)在不改變單線程程式語義的前提下,重新安排語句的執行順序。

編譯器禁止重排序方式:通過禁止特定類型的編譯器重排序的方式來禁止重排序。

(2)處理器級别指令重排序

處理器級别指令禁止重排序方式:通過插入記憶體屏障(Memory Barrier,或有時叫做記憶體栅欄,Memory Fence)的方式來禁止特定類型的處理器重排序。

&& 記憶體屏障是一種 CPU 指令,用來禁止處理器指令發生重排序(像屏障一樣),進而保障指令執行的有序性。另外,為了達到屏障的效果,它也會使處理器寫入、讀取值之前,将将主記憶體的值寫入高效緩存,清空無效隊列,進而保障變量的可見性。

<1> 指令重排序

現代處理器采用了指令級并行技術(Instruction-Level Parallelism,ILP)來将多條指令重疊執行。如果不存在資料依賴性,處理器可以改變語句對應機器指令的執行順序。

<2> 記憶體系統重排

記憶體系統也會有“重排序”,但又不是真正意義上的重排序。在 JMM 裡表現為主存和本地記憶體的内容可能不一緻,進而導緻程式在多線程下執行可能出現問題。

2.3 java源代碼重排及問題

(1)java源代碼重排過程

Java 源代碼會經曆 編譯器優化重排 —> 指令并行重排 —> 記憶體系統重排 的過程,最終才變成作業系統可執行的指令序列。

(2)指令重排問題

指令重排序可以保證串行語義一緻,但是沒有義務保證多線程間的語義也一緻 ,是以在多線程下,指令重排序可能會導緻一些問題。

3. JMM記憶體模型圖

Java之并發程式設計(二)二、Java記憶體模型(重要)三、Java線程池(重要)四、Java 線程池最佳實踐

4. 主記憶體與本地記憶體(工作記憶體)

(1)主記憶體

所有線程建立的執行個體對象都存放在主記憶體中,不管該執行個體對象是成員變量還是方法中的本地變量(也稱局部變量)

(2)本地記憶體

每個線程都有一個私有的本地記憶體來存儲共享變量的副本,并且,每個線程隻能通路自己的本地記憶體,無法通路其他線程的本地記憶體。本地記憶體是 JMM 抽象出來的一個概念,存儲了主記憶體中的共享變量副本。

5. JMM8種同步操作

(1)鎖定(lock)

作用于主記憶體中的變量,将他标記為一個線程獨享變量。

(2)解鎖(unlock)

作用于主記憶體中的變量,解除變量的鎖定狀态,被解除鎖定狀态的變量才能被其他線程鎖定。

(3)read(讀取)

作用于主記憶體的變量,它把一個變量的值從主記憶體傳輸到線程的工作記憶體中,以便随後的 load 動作使用。

(4)load(載入)

把 read 操作從主記憶體中得到的變量值放入工作記憶體的變量的副本中。

(5)use(使用)

把工作記憶體中的一個變量的值傳給執行引擎,每當虛拟機遇到一個使用到變量的指令時都會使用該指令。

(6)assign(指派)

作用于工作記憶體的變量,它把一個從執行引擎接收到的值賦給工作記憶體的變量,每當虛拟機遇到一個給變量指派的位元組碼指令時執行這個操作。

(7)store(存儲)

作用于工作記憶體的變量,它把工作記憶體中一個變量的值傳送到主記憶體中,以便随後的 write 操作使用。

(8)write(寫入)

作用于主記憶體的變量,它把 store 操作從工作記憶體中得到的變量的值放入主記憶體的變量中。

6.happens-before原則

6.1 為什麼要happens-before原則?

(1)happens-before 原則的誕生是為了程式員和編譯器、處理器之間的平衡。

(2)程式員追求的是易于了解和程式設計的強記憶體模型,遵守既定規則編碼即可。編譯器和處理器追求的是較少限制的弱記憶體模型,讓它們盡己所能地去優化性能,讓性能最大化。

6.2 happens-before原則設計思想

(1)為了對編譯器和處理器的限制盡可能少,隻要不改變程式的執行結果(單線程程式和正确執行的多線程程式),編譯器和處理器怎麼進行重排序優化都行。

(2)對于會改變程式執行結果的重排序,JMM 要求編譯器和處理器必須禁止這種重排序。

6.3 happens-before常見規則

(1)程式順序規則

一個線程内,按照代碼順序,書寫在前面的操作 happens-before 于書寫在後面的操作;

(2)解鎖規則

解鎖 happens-before 于加鎖;

(3)volatile 變量規則

對一個 volatile 變量的寫操作 happens-before 于後面對這個 volatile 變量的讀操作。

說白了就是對 volatile 變量的寫操作的結果對于發生于其後的任何操作都是可見的。

(4)傳遞規則

如果 A happens-before B,且 B happens-before C,那麼 A happens-before C;

(5)線程啟動規則

Thread 對象的 start()方法 happens-before 于此線程的每一個動作。

&& 如果兩個操作不滿足上述任意一個 happens-before 規則,那麼這兩個操作就沒有順序的保障,JVM 可以對這兩個操作進行重排序。

6.4 happens-before 和 JMM 什麼關系?

Java之并發程式設計(二)二、Java記憶體模型(重要)三、Java線程池(重要)四、Java 線程池最佳實踐

7. 并發程式設計三個重要特性

7.1 原子性

(1)一次操作或者多次操作,要麼所有的操作全部都得到執行并且不會受到任何因素的幹擾而中斷,要麼都不執行。

(2)Java 中,可以借助synchronized 、各種 Lock 以及各種原子類實作原子性。

(3)synchronized 和各種 Lock 可以保證任一時刻隻有一個線程通路該代碼塊,是以可以保障原子性。各種原子類是利用CAS(compare and swap)操作(可能也會用到 volatile或者final關鍵字)來保證原子操作。

7.2 可見性

(1)當一個線程對共享變量進行了修改,那麼另外的線程都是立即可以看到修改後的最新值。

(2)Java 中,可以借助synchronized 、volatile 以及各種 Lock 實作可見性。

(3)果我們将變量聲明為 volatile ,這就訓示 JVM,這個變量是共享且不穩定的,每次使用它都到主存中進行讀取。

7.3 有序性

(1)由于指令重排序問題,代碼的執行順序未必就是編寫代碼時候的順序。

(2)在 Java 中,volatile 關鍵字可以禁止指令進行重排序優化。

8. JMM記憶體模型總結

Java之并發程式設計(二)二、Java記憶體模型(重要)三、Java線程池(重要)四、Java 線程池最佳實踐

三、Java線程池(重要)

1.概念

提前建立好多個線程,放入線程池中,使用時直接擷取,使用完放回池中

1.線程池的作用

(1)提高響應速度:

減少了建立新線程的時間

(1)降低資源消耗:

重複利用線程池中線程,不需要每次建立

(1)便于線程管理:

可以進行線程的統一配置設定、調優和監控

2.Executor 架構

2.1 概念

(1)

工具類、線程池的工廠類架構,用于建立并傳回不同類型的線程池

(2)

Executor 架構不僅包括了線程池的管理,還提供了線程工廠、隊列以及拒絕政策等,Executor 架構讓并發程式設計變得更加簡單。

2.2 三大組成

2.2.1 任務(Runnable /Callable)

執行任務需要實作的 Runnable 接口 或 Callable接口。Runnable 接口或 Callable 接口 實作類都

可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 執行。

2.2.2 任務的執行(Executor)

包括任務執行機制的核心接口 Executor ,以及繼承自 Executor 接口的 ExecutorService 接口。ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 這兩個關鍵類實作了 ExecutorService 接口。

Java之并發程式設計(二)二、Java記憶體模型(重要)三、Java線程池(重要)四、Java 線程池最佳實踐

2.2.3 異步計算的結果(Future)

(1)Future 接口以及 Future 接口的實作類 FutureTask 類都可以代表異步計算的結果。

(2)當我們把 Runnable接口 或 Callable 接口 的實作類送出給 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 執行,調用 submit() 方法時會傳回一個 FutureTask 對象。

2.3 Executor 架構的使用

如圖:

Java之并發程式設計(二)二、Java記憶體模型(重要)三、Java線程池(重要)四、Java 線程池最佳實踐

2.3.1 主線程首先要建立實作Runnable或者Callable接口的任務對象。

2.3.2 把建立完成的實作 Runnable/Callable接口的對象直接交給ExcutorService執行。

ExecutorService.execute(Runnable command))或者也可以把 Runnable 對象或Callable 對象送出給 ExecutorService 執行(ExecutorService.submit(Runnable task)或 ExecutorService.submit(Callable task))

2.3.3 如果執行 ExecutorService.submit(…),ExecutorService 将傳回一個實作Future接口的對象。

由于 FutureTask 實作了 Runnable,我們也可以建立 FutureTask,然後直接交給 ExecutorService 執行。

2.3.4 最後,主線程可以執行 FutureTask.get()方法來等待任務執行完成。

主線程也可以執行 FutureTask.cancel(boolean mayInterruptIfRunning)來取消此任務的執行

3.ThreadPoolExecutor 類(重要)

線程池實作類 ThreadPoolExecutor 是 Executor 架構最核心的類。

3.1 ThreadPoolExecutor 類分析

ThreadPoolExecutor 類中提供的四個構造方法。我們來看最長的那個,其餘三個都是在這個構造方法的基礎上産生(其他幾個構造方法說白點都是給定某些預設參數的構造方法比如預設制定拒絕政策是什麼)。

/**
     * 用給定的初始參數建立一個新的ThreadPoolExecutor。
     */
    public ThreadPoolExecutor(int corePoolSize,//線程池的核心線程數量
                              int maximumPoolSize,//線程池的最大線程數
                              long keepAliveTime,//當線程數大于核心線程數時,多餘的空閑線程存活的最長時間
                              TimeUnit unit,//時間機關
                              BlockingQueue<Runnable> workQueue,//任務隊列,用來儲存等待執行任務的隊列
                              ThreadFactory threadFactory,//線程工廠,用來建立線程,一般預設即可
                              RejectedExecutionHandler handler//拒絕政策,當送出的任務過多而不能及時處理時,我們可以定制政策來處理任務
                               ) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
           

3.2 ThreadPoolExecutor3個最重要的參數

3.2.1 corePoolSize:核心線程池大小

最小可以同時運作的線程數量。

3.2.2 maximumPoolSize :最大線程數

當隊列中存放的任務達到隊列容量的時候,目前可以同時運作的線程數量變為最大線程數。

3.2.3 workQueue: 阻塞隊列

(1)當新任務來的時候會先判斷目前運作的線程數量是否達到核心線程數,如果達到的話,新任務就會被存放在隊列中。

(2)用來存放待執行的任務,均為線程安全,如果隊列滿了,而任務還再持續進入,則會建立新的線程。

3.3 ThreadPoolExecutor 其他常見參數

3.3.1 keepAliveTime:最長存活時間

超出核心線程數之外的線程,沒有新任務時最多保持多長時間後會終止

3.3.2 unit : keepAliveTime 參數的時間機關

3.3.3 threadFactory :executor 建立新線程的時候會用到。

用來生産線程執行任務的線程工廠,預設正常優先級、非守護線程

3.3.4 handler :飽和(拒絕)政策。

如果目前同時運作的線程數量達到最大線程數量并且隊列也已經被放滿了任務時,ThreadPoolTaskExecutor 定義如下政策(如:ThreadPoolExecutor.AbortPolicy):

(1)AbortPolicy

抛出 RejectedExecutionException異常來拒絕新任務的處理。

(1)CallerRunsPolicy

重試送出目前的任務,即再次調用運作該任務的execute()方法。

适用于:承受一定延遲并且要求任何一個任務請求都要被執行

(1)DiscardPolicy

不處理新任務,直接丢棄掉

(1)DiscardOldestPolicy

丢棄最早的未處理的任務請求。

&& 如圖為常見參數圖:

Java之并發程式設計(二)二、Java記憶體模型(重要)三、Java線程池(重要)四、Java 線程池最佳實踐

3.4 建立線程池方式

推薦使用 ThreadPoolExecutor 構造函數建立線程池

3.4.1 為什麼推薦ThreadPoolExecutor

首先,如果不使用線程池,有可能會造成系統建立大量同類線程而導緻消耗完記憶體或者“過度切換”的問題。

(1)導緻OOM

通過 ThreadPoolExecutor 構造函數的方式,這樣的處理方式讓寫的同學更加明确線程池的運作規則,規避資源耗盡的風險。

說白了就是:使用有界隊列,控制線程建立數量。

如通過Executors 去建立線程池,傳回線程池對象弊端如下:

<1> FixedThreadPool 和 SingleThreadExecutor : 允許請求的隊列長度為 Integer.MAX_VALUE,可能堆積大量的請求,進而導緻 OOM。

<2> CachedThreadPool 和 ScheduledThreadPool : 允許建立的線程數量為 Integer.MAX_VALUE ,可能會建立大量線程,進而導緻 OOM。

(2)根據具體業務去動态配置相關參數及政策等

<1> 實際使用中需要根據自己機器的性能、業務場景來手動配置線程池的參數比如核心線程數、使用的任務隊列、飽和政策等等。

<2> 我們應該顯示地給我們的線程池命名,這樣有助于我們定位問題。

3.4.2 推薦的建立線程池方式

(1)通過ThreadPoolExecutor構造函數實作(推薦)

Java之并發程式設計(二)二、Java記憶體模型(重要)三、Java線程池(重要)四、Java 線程池最佳實踐

(2)通過 Executor 架構的工具類 Executors 來實作我們可以建立三種類型的 ThreadPoolExecutor:FixedThreadPool、SingleThreadExecutor、CachedThreadPool

Java之并發程式設計(二)二、Java記憶體模型(重要)三、Java線程池(重要)四、Java 線程池最佳實踐

4.ThreadPoolExecutor 原理分析

4.1 ThreadPoolExecutor的execute(worker)方法

該方法送出一個任務到線程池中去

Java之并發程式設計(二)二、Java記憶體模型(重要)三、Java線程池(重要)四、Java 線程池最佳實踐
// 存放線程池的運作狀态 (runState) 和線程池内有效線程的數量 (workerCount)
   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    private static int workerCountOf(int c) {
        return c & CAPACITY;
    }
    //任務隊列
    private final BlockingQueue<Runnable> workQueue;

    public void execute(Runnable command) {
        // 如果任務為null,則抛出異常。
        if (command == null)
            throw new NullPointerException();
        // ctl 中儲存的線程池目前的一些狀态資訊
        int c = ctl.get();

        //  下面會涉及到 3 步 操作
        // 1.首先判斷目前線程池中執行的任務數量是否小于 corePoolSize
        // 如果小于的話,通過addWorker(command, true)建立一個線程,并将任務(command)添加到該線程中;然後,啟動該線程進而執行任務。
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2.如果目前執行的任務數量大于等于 corePoolSize 的時候就會走到這裡
        // 通過 isRunning 方法判斷線程池狀态,線程池處于 RUNNING 狀态并且隊列可以加入任務,該任務才會被加入進去
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次擷取線程池狀态,如果線程池狀态不是 RUNNING 狀态就需要從任務隊列中移除任務,并嘗試判斷線程是否全部執行完畢。同時執行拒絕政策。
            if (!isRunning(recheck) && remove(command))
                reject(command);
                // 如果目前線程池為空就新建立一個線程并執行。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //3. 通過addWorker(command, false)建立一個線程,并将任務(command)添加到該線程中;然後,啟動該線程進而執行任務。
        //如果addWorker(command, false)執行失敗,則通過reject()執行相應的拒絕政策的内容。
        else if (!addWorker(command, false))
            reject(command);
    }

           

4.2 addWorker

該方法主要用來建立新的工作線程,如果傳回 true 說明建立和啟動工作線程成功,否則的話傳回的就是 false。

// 全局鎖,并發操作必備
    private final ReentrantLock mainLock = new ReentrantLock();
    // 跟蹤線程池的最大大小,隻有在持有全局鎖mainLock的前提下才能通路此集合
    private int largestPoolSize;
    // 工作線程集合,存放線程池中所有的(活躍的)工作線程,隻有在持有全局鎖mainLock的前提下才能通路此集合
    private final HashSet<Worker> workers = new HashSet<>();
    //擷取線程池狀态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //判斷線程池的狀态是否為 Running
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }


    /**
     * 添加新的工作線程到線程池
     * @param firstTask 要執行
     * @param core參數為true的話表示使用線程池的基本大小,為false使用線程池最大大小
     * @return 添加成功就傳回true否則傳回false
     */
   private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            //這兩句用來擷取線程池的狀态
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
               //擷取線程池中工作的線程的數量
                int wc = workerCountOf(c);
                // core參數為false的話表明隊列也滿了,線程池大小變為 maximumPoolSize
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
               //原子操作将workcount的數量加1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 如果線程的狀态改變了就再次執行上述操作
                c = ctl.get();
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 标記工作線程是否啟動成功
        boolean workerStarted = false;
        // 标記工作線程是否建立成功
        boolean workerAdded = false;
        Worker w = null;
        try {

            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
              // 加鎖
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                   //擷取線程池狀态
                    int rs = runStateOf(ctl.get());
                   //rs < SHUTDOWN 如果線程池狀态依然為RUNNING,并且線程的狀态是存活的話,就會将工作線程添加到工作線程集合中
                  //(rs=SHUTDOWN && firstTask == null)如果線程池狀态小于STOP,也就是RUNNING或者SHUTDOWN狀态下,同時傳入的任務執行個體firstTask為null,則需要添加到工作線程集合和啟動新的Worker
                   // firstTask == null證明隻建立線程而不執行任務
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                       //更新目前工作線程的最大容量
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                      // 工作線程是否啟動成功
                        workerAdded = true;
                    }
                } finally {
                    // 釋放鎖
                    mainLock.unlock();
                }
                 如果成功添加工作線程,則調用Worker内部的線程執行個體t的Thread#start()方法啟動真實的線程執行個體
                if (workerAdded) {
                    t.start();
                  /// 标記線程啟動成功
                    workerStarted = true;
                }
            }
        } finally {
           // 線程啟動失敗,需要從工作線程中移除對應的Worker
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

           

4.3 示例代碼:Runnable+ThreadPoolExecutor,如下:

我們在代碼中模拟了 10 個任務,我們配置的核心線程數為 5 、等待隊列容量為 100 ,是以每次隻可能存在 5 個任務同時執行,剩下的 5 個任務會被放到等待隊列中去。目前的 5 個任務中如果有任務被執行完了,線程池就會去拿新的任務執行。

4.3.1 MyRunnable.java

首先建立一個 Runnable 接口的實作類(當然也可以是 Callable 接口,我們上面也說了兩者的差別。)

import java.util.Date;

/**
 * 這是一個簡單的Runnable類,需要大約5秒鐘來執行其任務。
 * @author shuang.kou
 */
public class MyRunnable implements Runnable {

    private String command;

    public MyRunnable(String s) {
        this.command = s;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
        processCommand();
        System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
    }

    private void processCommand() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return this.command;
    }
}

           

4.3.2 ThreadPoolExecutorDemo.java

編寫測試程式,我們這裡以阿裡巴巴推薦的使用 ThreadPoolExecutor 構造函數自定義參數的方式來建立線程池。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorDemo {

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;
    public static void main(String[] args) {

        //使用阿裡巴巴推薦的建立線程池的方式
        //通過ThreadPoolExecutor構造函數自定義參數建立
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 10; i++) {
            //建立WorkerThread對象(WorkerThread類實作了Runnable 接口)
            Runnable worker = new MyRunnable("" + i);
            //執行Runnable
            executor.execute(worker);
        }
        //終止線程池
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Finished all threads");
    }
}

           

上面的代碼指定了:

corePoolSize: 核心線程數為 5。

maximumPoolSize : 最大線程數 10

keepAliveTime : 等待時間為 1L。

unit: 等待時間的機關為 TimeUnit.SECONDS。

workQueue: 任務隊列為 ArrayBlockingQueue,并且容量為 100;

handler: 飽和政策為 CallerRunsPolicy。

四、Java 線程池最佳實踐

1.使用 ThreadPoolExecutor 的構造函數聲明線程池。

原因上面已經寫過,點選下面跳轉:

3.4.1 為什麼推薦ThreadPoolExecutor

2.監測線程池運作狀态

2.1 你可以通過一些手段來檢測線程池的運作狀态比如 SpringBoot 中的 Actuator 元件。

2.2 你也可以利用 ThreadPoolExecutor 的相關 API 做一個簡陋的監控

如下API:

printThreadPoolStatus()會每隔一秒列印出線程池的線程數、活躍線程數、完成的任務數、以及隊列中的任務數。

/**
     * 列印線程池的狀态
     * @param threadPool 線程池對象
     */
    public static void printThreadPoolStatus(ThreadPoolExecutor threadPool) {
        ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, createThreadFactory("print-images/thread-pool-status", false));
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            log.info("=========================");
            log.info("ThreadPool Size: [{}]", threadPool.getPoolSize());
            log.info("Active Threads: {}", threadPool.getActiveCount());
            log.info("Number of Tasks : {}", threadPool.getCompletedTaskCount());
            log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());
            log.info("=========================");
        }, 0, 1, TimeUnit.SECONDS);
    }

           

3.建議不同類别的業務使用不同的線程池

一般建議是不同的業務使用不同的線程池,配置線程池的時候根據目前業務的情況對目前線程池進行配置,因為不同的業務的并發以及對資源的使用情況都不同,重心優化系統性能瓶頸相關的業務。

4.别忘記給線程池命名

給線程池裡的線程命名通常有下面兩種方式:

4.1 利用 guava 的 ThreadFactoryBuilder

ThreadFactory threadFactory = new ThreadFactoryBuilder()
                        .setNameFormat(threadNamePrefix + "-%d")
                        .setDaemon(true).build();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory)

           

4.2 自己實作 ThreadFactor

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * 線程工廠,它設定線程名稱,有利于我們定位問題。
 */
public final class NamingThreadFactory implements ThreadFactory {

    private final AtomicInteger threadNum = new AtomicInteger();
    private final ThreadFactory delegate;
    private final String name;

    /**
     * 建立一個帶名字的線程池生産工廠
     */
    public NamingThreadFactory(ThreadFactory delegate, String name) {
        this.delegate = delegate;
        this.name = name; // TODO consider uniquifying this
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = delegate.newThread(r);
        t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
        return t;
    }

}

           

5.正确配置線程池參數

5.1 線程池設定

5.1.1 過大過小引發的問題

(1)過小

如果同一時間有大量任務/請求需要處理,可能會導緻大量的請求/任務在任務隊列中排隊等待執行,甚至會出現任務隊列滿了之後任務/請求無法處理的情況,或者大量任務堆積在任務隊列導緻 OOM。這樣很明顯是有問題的,CPU 根本沒有得到充分利用。

(2)過大

如果我們設定線程數量太大,大量線程可能會同時在争取 CPU 資源,這樣會導緻大量的上下文切換,進而增加線程的執行時間,影響了整體執行效率。

5.1.2 線程池大小設定公式

(1) 标準公式

最佳線程數 = N(CPU 核心數)∗(1+WT(線程等待時間)/ST(線程計算時間)),其中 WT(線程等待時間)=線程運作總時間 - ST(線程計算時間)。

&& 我們可以通過 JDK 自帶的工具 VisualVM 來檢視 WT/ST 比例

(2) 簡單适用的公式

<1> 先判斷是CPU 密集任務還是 IO 密集任務

CPU 密集型:

簡單了解就是利用 CPU 計算能力的任務比如你在記憶體中對大量資料進行排序。

IO 密集型:

涉及到網絡讀取,檔案讀取這類都是 IO 密集型,這類任務的特點是 CPU 計算耗費時間相比于等待 IO 操作完成的時間來說很少,大部分時間都花在了等待 IO 操作完成上。

<2> 然後選擇對應的公式

CPU 密集型任務: N+1

I/O 密集型任務: 2N

5.2 ThreadPoolExecutor3個最重要的參數

上面已經寫過,點選下面跳轉:

3.2 ThreadPoolExecutor3個最重要的參數

上一篇跳轉—Java之并發程式設計(一)                 下一篇跳轉—Java之并發程式設計(三)

本篇文章主要參考連結如下:

參考連結1-King說Java

參考連結2-JavaGuide

持續更新中…

随心所往,看見未來。Follow your heart,see light!

歡迎點贊、關注、留言,一起學習、交流!