天天看點

Java-concurrent之ExecutorService

為了能夠更好地控制多線程,JDK提供了一套Executor架構,幫助開發人員有效地進行線程控制,其本質就是一個線程池。

1. 概述

  1. JDK5之後把工作單元和執行機制區分開了,工作單元包括

    Runnable

    Callable<T>

    ,而執行機制則由

    Executor

    架構提供。
  2. Executor

    架構為線程的啟動、執行和關閉提供了便利,底層使用線程池實作。
  3. 使用

    Executor

    架構管理線程的好處在于簡化管理、提高效率,還能避免this逃逸問題——是指不完整的對象被線程調用。
  4. Executor

    架構使用了兩級排程模型進行線程的排程。
    1. 在上層,Java多線程程式通常把應用分解為多個任務,然後使用使用者排程架構

      Executor

      将這些任務映射為固定數量的線程;
    2. 在底層,作業系統核心将這些線程映射到硬體處理器上。

2. 相關類型

主要類以及相關的繼承體系如下:

Executor
    ExecutorService
        AbstractExecutorService
            ThreadPoolExecutor
            ForkJoinPool 【work-stealing模式】
        ScheduledExecutorService
            ScheduledThreadPoolExecutor
           
  1. 這些成員都位于

    java.util.concurrent

    package中,屬于JDK并發包中的核心類。
  2. 其中

    ThreadPoolExecutor

    表示一個線程池,提供了大量配置參數來覆寫盡可能廣的需求。(可以看到工具類

    Executors

    中建構的各類線程池時,其底層都是使用的

    ThreadPoolExecutor

    或其子類。)
  3. 官方提供的工具類

    Executors

    所建立的各類線程池基本已經能夠滿足絕大部分需求。

3.

ThreadPoolExecutor

在平時的開發中,我們最常接觸到的應該就是ThreadPoolExecutor了。

3.1 構造函數

作為

ExecutorService

接口的實作類,Executor架構的最核心的類就是它了,而工具類

Executors

中提供的各類簡化線程池建立的方法,其底層實作大部分都是使用的

ThreadPoolExecutor

或其子類來完成的。是以了解

ThreadPoolExecutor

對我們熟練掌握Java多線程程式設計有着非常重要的意義。

ThreadPoolExecutor

提供了多個構造函數,這裡我們僅列舉最具有代表性的一個:

public ThreadPoolExecutor(int corePoolSize, // 線程池中的核心線程數量,可以了解為最小線程數
				  int maximumPoolSize, // 最大線程數量
				  long keepAliveTime, // 線程允許空閑的最長時間
				  TimeUnit unit, // keepAliveTime參數的機關
				  BlockingQueue<Runnable> workQueue, // 已送出但還未配置設定到執行線程的任務構成的隊列
				  ThreadFactory threadFactory, // 線程工廠執行個體,用途不言自明
				  RejectedExecutionHandler handler) { ... } // 在因線程數量和任務隊列都達到上限而導緻新加入的任務無處可去時,所采取的拒絕政策; JDK預設提供了四種政策。
           

通過以上的注釋,相信讀者應該能夠比較清楚地了解

ThreadPoolExecutor

中幾個重要參數的含義。

3.2 阻塞隊列

BlockingQueue<T>

ThreadPoolExecutor

構造函數中傳入的

BlockingQueue<T>

執行個體,其主要作用是用來存儲已被送出但未能馬上配置設定到線程去執行的任務。

通過搜尋接口的繼承鍊,我們發現JDK提供了如下這四種隊列實作類:

3.2.1

ArrayBlockingQueue

有界的任務隊列。有界正是該隊列最大的特點;該類的執行個體在建構時,必須傳入一個代表該隊列最大容量的參數。

如果使用該隊列執行個體作為任務隊列,隻有在其裝載的任務滿載時,才有可能将線程數提升到corePoolSize之上;換而言之就是除非系統非常繁忙,否則将確定核心線程數維持在corePoolSize。

關于使用該隊列作為任務隊列的線程池執行邏輯,請參閱下面的貼圖。

3.2.2

LinkedBlockingQueue

無界的任務隊列。該隊列的特點就是除非系統資源耗盡,否則無界任務隊列将不會出現任務入隊失敗的情況。

按照下面貼圖中的執行邏輯,當使用此隊列作為任務隊列時,線程池中線程的數量将一直保持在corePoolSize;如果任務建立和處理的速度差異很大,無界隊列将會保持快速增長,直到耗盡系統資源。

3.2.3

PriorityBlockingQueue

帶有執行優先級的任務隊列。這是一個特殊的無界隊列,前面兩種隊列都是按照FIFO算法來處理任務的,但該隊列可以根據任務自身額優先級順序先後執行,這就確定了緊急任務優先執行,進而保證了品質。

3.2.4

SynchronousQueue

直接送出的隊列。這個隊列非常特殊,其并沒有容量,每一次插入操作必須等待一個相應的删除操作; 反之,每一個删除操作都要等待對應的插入操作。

如果使用本執行個體來作為任務隊列,送出的任務将不會被真實地儲存,而總是被直接送出給線程去執行;如果沒有空閑線程,将嘗試建立新的線程,如果線程數量已經達到最大值,将執行拒絕政策。

是以選擇使用本類作為任務隊列時,通常需要設定很大的

maximumPoolSize

值,否則拒絕政策很可能頻繁執行。

3.3 線程工廠

ThreadFactory

講完了

BlockingQueue<T>

,我們接着來看看第二個參數,也就是用于建立線程的線程工廠。

ThreadPoolExecutor

預設使用

Executors.defaultThreadFactory()

來作為線程工廠。這其中一大問題是線程名稱過于泛化,導緻問題排查時舉步維艱(pool-1-thread-1的線程名格式,我們基本無法從中得到任何有用的資訊)。

通過提供自定義的

ThreadFactory

我們将可以得到很多好處——設定更佳人性化的線程名來追蹤問題,跟蹤線程狀态,優化線程池以滿足業務場景,擷取更詳盡的線程堆棧資訊以排錯。

3.4 拒絕政策

RejectedExecutionHandler

作為前文我們提到的

ThreadPoolExecutor

構造函數中的最後一個參數類型。它代表了在任務數量超過了系統實際承載能力時,該如何處理? 即當線程數量達到上限,同時任務隊列中無法塞入更多的新任務時,我們需要一套政策來處理這類情況。

JDK預設提供了四種拒絕政策:

  1. AbortPolicy 直接抛出異常,阻止系統正常工作。
  2. CallerRunsPolicy 隻要線程池未關閉,将直接在調用者線程中運作目前被丢棄的任務。
  3. DiscardOldestPolicy 丢棄最老的那個請求(也就是下一個将要被執行的任務),并嘗試再次送出目前任務。
  4. DiscardPolicy 默默丢棄無法處理的任務,不進行任何操作。如果允許任務丢失,這應該是最好的一種方案了!

以上這四種政策都是作為

ThreadPoolExecutor

内部類而存在的。

3.5 任務排程邏輯

ThreadPoolExecutor.execute() 方法展現出來的任務排程邏輯如下

Java-concurrent之ExecutorService
3.6

ThreadPoolExecutor

擴充

ThreadPoolExecutor

提供了

beforeExecute()

afterExecute()

terminated()

三個接口來允許外界對線程池進行控制。

  1. 以上這三個方法都是

    protected

    通路級别,并且都是空實作;擺明了是交給擴充子類去實作的。
  2. 其中前兩個方法被回調于

    ThreadPoolExecutor.Worker.runWorker()

    方法中。

    ThreadPoolExecutor

    中的工作線程正是

    ThreadPoolExecutor.Worker

    執行個體,

    Worker.runWorker()

    會同時被多個線程通路,是以

    beforeExecute()

    afterExecute()

    也将被多線程通路,這一點需要記住。
@Test
public void customExtend() throws Exception {
	ExecutorService es = new MyThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, CollUtil
			.<Runnable> newBlockingQueue(1, true));

	for (int i = 0; i < 5; i++) {
		MyTask myTask = new MyTask("LQ-" + i);
		es.execute(myTask);
		ThreadUtil.safeSleep(10);
	}

	es.shutdown();
	es.awaitTermination(1000, TimeUnit.MILLISECONDS);
}

private static class MyTask implements Runnable {

	private String name;

	public MyTask(String name) {
		this.name = name;
	}

	@Override
	public void run() {
		Console.log("current task is {}, current thread is {} 正在執行...", name, Thread
				.currentThread().getName());
		ThreadUtil.safeSleep(100);
	}

}

private static class MyThreadPoolExecutor extends ThreadPoolExecutor {
	public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
			TimeUnit unit, BlockingQueue<Runnable> workQueue) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
	}

	@Override
	protected void afterExecute(Runnable r, Throwable t) {
		Console.log("current task is {}, 執行結束", ((MyTask) r).name);
	}

	@Override
	protected void beforeExecute(Thread t, Runnable r) {
		Console.log("current task is {}, current thread is {} 準備執行", ((MyTask) r).name, t
				.getName());
	}

	@Override
	protected void terminated() {
		Console.log("線程池退出");
	}
}

// -------------------------------------- 輸出如下
// 可以看到線程之間并不是連續的,比如 任務 LQ-0的執行
// 執行完成的先後順序不等于送出的順序,即使是同樣的任務。
current task is LQ-0, current thread is pool-1-thread-1 準備執行
current task is LQ-1, current thread is pool-1-thread-2 準備執行
current task is LQ-1, current thread is pool-1-thread-2 正在執行...
current task is LQ-0, current thread is pool-1-thread-1 正在執行...
current task is LQ-2, current thread is pool-1-thread-3 準備執行
current task is LQ-2, current thread is pool-1-thread-3 正在執行...
current task is LQ-3, current thread is pool-1-thread-4 準備執行
current task is LQ-3, current thread is pool-1-thread-4 正在執行...
current task is LQ-4, current thread is pool-1-thread-5 準備執行
current task is LQ-4, current thread is pool-1-thread-5 正在執行...
current task is LQ-1, 執行結束
current task is LQ-2, 執行結束
current task is LQ-3, 執行結束
current task is LQ-0, 執行結束
current task is LQ-4, 執行結束
線程池退出

                

4. 工具類

Executors

中提供的幾種線程池

說完了核心,我們再來看看JDK提供的工具類

Executors

中提供的幾種線程池。

4.1 CachedThreadPool

工具類

Executors

提供的

newCachedThreadPool

,其底層正是使用了該

SynchronousQueue

隊列來暫存任務。

使用工具類

Executors.newCachedThreadPool()

建立的線程池,其maximumPoolSize值為Integer.MAX_VALUE,這一點在選擇線程池執行業務邏輯時千萬要警醒。

按照上面對該隊列的探讨,可以得出結論:該線程池适用于執行很多的短期異步任務的小程式,或者負載比較輕的伺服器。

4.2 FixedThreadPool

工具類

Executors

提供的

newCachedThreadPool

,其底層正是使用了該

LinkedBlockingQueue

隊列來暫存任務。

按照上文對該隊列的探讨,可以得出結論:該線程池适用于為了滿足管理資源的需求,而需要限制目前線程數量的應用場景,它适用于負載比較重的伺服器。沒有空閑線程時, 新的任務将被暫存在一個任務隊列中。

4.3 SingleThreadExecutor

工具類

Executors

提供的

newFixedThreadPool

,其底層正是使用了該

LinkedBlockingQueue

隊列來暫存任務,是以

按照上文對該隊列的探讨,可以得出結論:該線程池适用于需要保證順序地執行各個任務,并且在任意時間點不會有多個線程在活動的場景。

4.4 ScheduledThreadPoolExecutor

該線程池屬于和

ThreadPoolExecutor

同一層次的

ExecutorService

實作類。該線程池支援延遲任務,以及定時及周期性的任務執行;多數情況下可用來替代Timer類。

ScheduledThreadPoolExecutor适用于需要在多個背景線程執行周期任務,同時為了滿足資源管理需求需要限制背景線程數量的應用場景。

4.5 ForkJoinPool

該線程池也是屬于和

ThreadPoolExecutor

同一層次的

ExecutorService

實作類。

該線程池的工作模式類似于

ThreadPoolExecutor

,但是使用了work-stealing模式,其會為線程池中的每個線程建立一個隊列,進而用work-stealing(任務竊取)算法使得線程可以從其他線程隊列裡竊取任務來執行。這樣就可以避免一部分線程無所事事,而另外一部分卻是過度負載。

5. Links

  1. 《億級流量網站架構核心技術》 P244
  2. 《Java高并發程式設計》 P95
  3. Executor簡介