天天看點

關于阿裡規範禁止使用Executors建立線程池的分析

文章目錄

    • 1.阿裡規範
    • 2.Executors主要功能
      • 2.1 newFixedThreadPool
      • 2.2 newSingleThreadExecutor
      • 2.3 newCachedThreadPool
      • 2.4 newSingleThreadScheduledExecutor
    • 3.OOM測試
      • 3.1 FixedThreadPool
      • 3.1 SingleThread
      • 3.3 CachedThreadPool
      • 3.4 ScheduledThreadPool
    • 4.源碼分析
      • 4.1 newFixedThreadPool
      • 4.2 newSingleThreadExecutor
      • 4.3 newCachedThreadPool
      • 4.4 newScheduledThreadPool
    • 5.總結
      • 5.1 建立線程池的正确方式

1.阿裡規範

在阿裡java開發手測中,對Executors有一個專門的規約:

關于阿裡規範禁止使用Executors建立線程池的分析

線程池不允許使用Executors去建立,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明确線程池的運作規則,規避資源耗盡的風險。

注意,這裡的重點是 不允許。而不是不建議。可見該規範 背後都是血淋淋的生産事故。

2.Executors主要功能

打開Executors的官方文檔,其描述為:

Factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes defined in this package. This class supports the following kinds of methods:
Methods that create and return an ExecutorService set up with commonly useful configuration settings.
Methods that create and return a ScheduledExecutorService set up with commonly useful configuration settings.
Methods that create and return a "wrapped" ExecutorService, that disables reconfiguration by making implementation-specific methods inaccessible.
Methods that create and return a ThreadFactory that sets newly created threads to a known state.
Methods that create and return a Callable out of other closure-like forms, so they can be used in execution methods requiring Callable.
           

Executors主要是為 Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, 和 Callable這些類提供的建立工廠方法。這個類主要提供如下幾種方法:

  • 建立并傳回一個具有通用配置的ExecutorService。
  • 建立并傳回一個具有通用配置的ScheduledExecutorService 。
  • 建立并傳回一個包裝的ExecutorService,不能重新對其參數進行配置。
  • 建立并傳回一個ThreadFactory,設定建立線程為指定狀态。
  • 建立并傳回一個閉包形式的callable,以便在執行方法中執行其所需形式的callable。

其常用方法有newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool等。

2.1 newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads)
Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. The threads in the pool will exist until it is explicitly shutdown.
           

Parameters:

nThreads - the number of threads in the pool
           

Returns:

the newly created thread pool
           

Throws:

IllegalArgumentException - if nThreads <= 0
           

建立一個線程池,該線程池重用固定數量的線程在一個共享的無界隊列上操作,在任何時候,大多數線程都在活動處理任務。如果在所有的線程都處于活動狀态時送出了其他的任務,則他們在隊列中等待,直到有一個線程可用為止,如果任何線程在關閉之前的執行過程由于失敗而終止,那麼在需要執行後續任務時,将有一個新的線程替代它。池中的線程将一直存在,直到池顯示關閉。

還有另外一個方法:

public static ExecutorService newFixedThreadPool(int nThreads,
                                                 ThreadFactory threadFactory)
           

與之類似,隻是自行指定了ThreadFactory。

2.2 newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor()
Creates an Executor that uses a single worker thread operating off an unbounded queue. (Note however that if this single thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks.) Tasks are guaranteed to execute sequentially, and no more than one task will be active at any given time. Unlike the otherwise equivalent newFixedThreadPool(1) the returned executor is guaranteed not to be reconfigurable to use additional threads.
           

Returns:

the newly created single-threaded Executor
           

建立一個Executor,該Executor使用單個工作隊列線程操作一個無界隊列。但是請注意,如果這個線程在關閉之前由于執行失敗而終止,那麼在需要之系的後續任務的時候,一個新的線程将取代它。)任務包裝安順序之系,并且在任何時間内活動的任務不超過一個。與newFixedThreadPool(1)不同,傳回的Executor保證不會重新配置以使用其他的線程。

與之類似的還有:

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
           

2.3 newCachedThreadPool

public static ExecutorService newCachedThreadPool()
Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available. These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks. Calls to execute will reuse previously constructed threads if available. If no existing thread is available, a new thread will be created and added to the pool. Threads that have not been used for sixty seconds are terminated and removed from the cache. Thus, a pool that remains idle for long enough will not consume any resources. Note that pools with similar properties but different details (for example, timeout parameters) may be created using ThreadPoolExecutor constructors.
           

Returns:

the newly created thread pool
           

建立一個線程池,根據需要建立的新線程,但是在可用時可以重用之前的構造線程,這些pool通常會提高之系許多短期異步任務的程式的性能。如果可用,對execute的調用将重用之前構造的線程。如果沒有可用的現有線程,将建立一個新線程并添加到pool中。未使用超過60s的線程将被終止之後删除。是以,一個足夠長時間保持空閑的pool将不會消耗任何資源。注意,可以使用ThreadPoolExecccutor構造函數建立具有類似屬性但細節不同的pool。

與此類似的還有:

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
           

2.4 newSingleThreadScheduledExecutor

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically.
           

Parameters:

corePoolSize - the number of threads to keep in the pool, even if they are idle
           

Returns:

a newly created scheduled thread pool
           

Throws:

IllegalArgumentException - if corePoolSize < 0
           

建立一個線程池,該線程池可以在指定的時間延期執行或者定期執行。

與之類似的還有:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize,                                                ThreadFactory threadFactory)
                                                              
public static ScheduledExecutorService newSingleThreadScheduledExecutor()  

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)
           

3.OOM測試

3.1 FixedThreadPool

有如下代碼:

//-Xmx10m -Xms10m
public static void main(String[] args) throws InterruptedException{
		ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(1);

		IntStream.range(0,10000000).forEach((i)->{
				threadPool.execute(() -> {
					byte[] array = new byte[1024*1024*1];
					try {
						TimeUnit.HOURS.sleep(1);
						int length  = array.length;
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				});
		});

		threadPool.shutdown();
		threadPool.awaitTermination(1,TimeUnit.HOURS);
	}
           

執行結果:

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
	at com.dhb.executors.test.FixedThreadPoolOOM.lambda$main$1(FixedThreadPoolOOM.java:18)
	at com.dhb.executors.test.FixedThreadPoolOOM$$Lambda$1/1078694789.accept(Unknown Source)
	at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
	at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:559)
	at com.dhb.executors.test.FixedThreadPoolOOM.main(FixedThreadPoolOOM.java:17)
           

我們可以看到,對于FixedThreadPool,固定線程,但是添加的task将被放到隊列裡面,一段時間之後就會出現OOM異常。

3.1 SingleThread

與之前代碼一樣,我們進行修改:

//-Xmx10m -Xms10m
public static void main(String[] args) throws InterruptedException{
		ExecutorService threadPool =  Executors.newSingleThreadExecutor();

		IntStream.range(0,10000000).forEach((i)->{
			threadPool.execute(() -> {
				byte[] array = new byte[1024*1024*1];
				try {
					TimeUnit.HOURS.sleep(1);
					int length  = array.length;
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			});
		});

		threadPool.shutdown();
		threadPool.awaitTermination(1,TimeUnit.HOURS);
	}
           

同樣會出現OOM。

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
	at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
	at com.dhb.executors.test.SingleThreadOOM.lambda$main$1(SingleThreadOOM.java:15)
	at com.dhb.executors.test.SingleThreadOOM$$Lambda$1/1078694789.accept(Unknown Source)
	at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
	at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:559)
	at com.dhb.executors.test.SingleThreadOOM.main(SingleThreadOOM.java:14)
           

3.3 CachedThreadPool

将線程池修改為cachedthreadPool

//-Xms10m -Xmx10m
public static void main(String[] args) throws InterruptedException{
		ExecutorService threadPool =  Executors.newCachedThreadPool();

		IntStream.range(0,10000000).forEach((i)->{
			threadPool.execute(() -> {
				byte[] array = new byte[1024*1024*1];
				try {
					TimeUnit.HOURS.sleep(1);
					int length  = array.length;
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			});
		});

		threadPool.shutdown();
		threadPool.awaitTermination(1,TimeUnit.HOURS);
	}
           

執行之後同樣OOM:

java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space
	at com.dhb.executors.test.CachedThreadPoolOOM.lambda$null$0(CachedThreadPoolOOM.java:16)
	at com.dhb.executors.test.CachedThreadPoolOOM$$Lambda$2/1149319664.run(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.OutOfMemoryError: Java heap space
           

3.4 ScheduledThreadPool

public static void main(String[] args) {
		ExecutorService threadPool =  Executors.newScheduledThreadPool(1);

		try {
			IntStream.range(0,10000000).forEach((i)->{
				threadPool.execute(() -> {
					byte[] array = new byte[1024*1024*1];
					try {
						TimeUnit.HOURS.sleep(1);
						int length  = array.length;
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				});
			});

			threadPool.shutdown();
			threadPool.awaitTermination(1,TimeUnit.HOURS);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
           

同樣也會出現OOM

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.util.concurrent.Executors.callable(Executors.java:407)
	at java.util.concurrent.FutureTask.<init>(FutureTask.java:152)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.<init>(ScheduledThreadPoolExecutor.java:209)
	at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:532)
	at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
	at com.dhb.executors.test.ScheduledThreadPoolOOM.lambda$main$1(ScheduledThreadPoolOOM.java:15)
           

4.源碼分析

對于這些threadPool,在執行的過程中都不約而同的出現了OOM異常。我們可以看看這些方法的源代碼:

4.1 newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
           

4.2 newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

           

4.3 newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
           

4.4 newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
       public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
           

5.總結

通過源碼可以看到,這四個線程池最底層都是用ThreadPoolExecutor的構造方法。newFixedThreadPool和newSingleThreadExecutor使用的是無界隊列LinkedBlockingQueue。而LinkedBlockingQueue在沒有指定長度的情況下,預設的隊列長度為:

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
           

也就是Integer.MAX_VALUE就是這兩個方法的隊列長度。而方法newCachedThreadPool和ScheduledExecutorService雖然沒有使用LinkedBlockingQueue,但是其線程池的最大線程數是Integer.MAX_VALUE。面對隊列中的資料,這是兩類處理政策,前者是通過加大隊列的緩沖資料的長度來實作,而後者則是讓可用的最大線程數沒有上限。這兩種辦法都不是一個很好解決問題的辦法,在資源有限的情況下,都有可能導緻OOM。

5.1 建立線程池的正确方式

阿裡巴巴的jdk規範讓我們避免使用Executors的預設方法建立線程池。那麼我們可以使用手動的方法來建立,手動指定線程數量和隊列的長度:

private static ExecutorService executor = new ThreadPoolExecutor(10, 10,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue(10));
           

這種情況下,一旦送出的線程超過了目前的可用線程時,就會觸發拒絕政策,抛出java.util.concurrent.RejectedExecutionException,我們可以捕獲異常之後來進行相應的處理。另外,我們還可以使用guava來建立隊列:

private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("test-pool-%d").build();
private static ExecutorService pool = new ThreadPoolExecutor(5, 20,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(100), namedThreadFactory, new
ThreadPoolExecutor.
AbortPolicy());
           

通過guava的ThreadFactoryBuilder就能很好的建立一個線程池。

我的部落格即将同步至騰訊雲+社群,邀請大家一同入駐:https://cloud.tencent.com/developer/support-plan?invite_code=y1se2gvilcle