天天看點

Java并發程式設計:線程池1. 概述2. Executor架構3. 線程池的使用4. 線程池的實作原理5. ThreadPoolExecutor線程複用原理6. 案例分析

1. 概述

1.1 線程池的特點

  • 降低資源消耗。通過重複利用已建立的線程降低線程建立和銷毀造成的消耗。(線程複用)
  • 提高響應速度。當任務到達時,任務可以不需要等待線程建立就能立即執行。
  • 提高線程的可管理性。線程是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的配置設定、調優和監控。(控制最大并發數)

1.2 異步回調

  • 同步和異步通常用來形容一次方法調用。
  • 同步方法調用一旦開始,調用者必須等到方法調用傳回後,才能繼續後續的行為。
  • 異步方法調用一旦開始,方法調用就會立即傳回,調用者就可以繼續後續的操作。而異步方法通常會在另一個線程中“真實”地執行。整個過程,不會阻礙調用者的工作。對于調用者來說,異步調用似乎是一瞬間完成的。如果異步調用需要傳回結果,那麼當這個異步調用真實完成時,會通知調用者。
package com.java.forkjoinpool;

import java.util.concurrent.CompletableFuture;

/**
 * @author rrqstart
 * @Description 異步回調
 * public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {//......}
 */
public class CompletableFutureTest {
    public static void main(String[] args) throws Exception {
        //異步調用,無傳回值,開啟一個新線程來執行任務
        //public static CompletableFuture<Void> runAsync(Runnable runnable)
        CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName() + ":無傳回值。");
        });
        Void v = completableFuture1.get();
        System.out.println(v); //null

        System.out.println("----------------------------------------");

        //異步回調,有傳回值,開啟一個新線程來執行任務
        //public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + ":有傳回值。");
//            int i = 10 / 0;
            return 1024;
        });
        Integer i = completableFuture2.whenComplete((t, u) -> {
            System.out.println("t = " + t);
            System.out.println("u = " + u);
        }).exceptionally(f -> {
            System.out.println("exception : " + f.getMessage());
            return 404;
        }).get();
        System.out.println(i);
    }
}
           
//int i = 10 / 0;被注釋掉時程式的輸出結果:
ForkJoinPool.commonPool-worker-9:無傳回值。
null
----------------------------------------
ForkJoinPool.commonPool-worker-9:有傳回值。
t = 1024
u = null
1024
           
//int i = 10 / 0;沒有被注釋掉時程式的輸出結果:
ForkJoinPool.commonPool-worker-9:無傳回值。
null
----------------------------------------
ForkJoinPool.commonPool-worker-9:有傳回值。
t = null
u = java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
exception : java.lang.ArithmeticException: / by zero
404
           

2. Executor架構

2.1 Executor架構簡介

  • 在 Java 中,使用線程來異步的執行任務。Java 的線程既是工作單元,也是執行機制。從 JDK1.5 開始,把工作單元與執行機制分離開來,工作單元包括 Runnable 和 Callable,執行機制由 Executor 架構提供。
  • Executor 架構的兩級排程模型:
    • 在 HotSpot VM 的線程模型中,Java 線程被一對一映射為本地作業系統的線程。當 Java 線程啟動時會建立一個本地作業系統線程;當該 Java 線程終止時,這個作業系統線程也會被回收。作業系統會排程所有線程并将它們配置設定給可用的CPU。
    • 在上層,Java 多線程程式通常把應用分解為若幹個任務,然後使用使用者級的排程器(Executor 架構)将這些任務映射為固定數量的線程;在底層,作業系統核心(OS Kernel)将這些線程映射到硬體處理器上。
  • Executor 架構的結構:
    • 任務:包括被執行任務需要實作的接口 Runnable 和 Callable。
    • 任務的執行:包括任務執行機制的核心接口 Executor,以及繼承自 Executor 的 ExecutorService 接口。
    • 異步計算的結果:包括接口 Future 和實作 Future 的 FutureTask 類。
Java并發程式設計:線程池1. 概述2. Executor架構3. 線程池的使用4. 線程池的實作原理5. ThreadPoolExecutor線程複用原理6. 案例分析

2.2 Executors源碼分析

package java.util.concurrent;
//Since:JDK1.5
public class Executors {
	//1.建立一個隻有一個線程的線程池:一個任務一個任務的執行,适用于需要保證順序的執行各個任務。
	public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
    }
    
    //2.建立一個固定線程數的可重用的線程池:執行長期任務性能好,适合嚴格限制線程數的場景,如:負載比較重的伺服器。
    public static ExecutorService newFixedThreadPool(int nThreads) {
    	//使用無界隊列LinkedBlockingQueue(隊列容量為:Integer.MAX_VALUE)作為線程池的工作隊列
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }
    
    //3.建立一個線程數可伸縮的線程池:适用于執行很多短期異步任務,線程池根據需要建立新線程,但在先前建構的線程可用時将重新開機它們。可擴容!
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }
	/**
	 * ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor。
	 * 表示在給定的延遲之後執行任務,或者定期執行任務。
	 * ScheduledThreadPoolExecutor的功能與Timer類似,但其功能更強大、更靈活。
	 * Timer對應的是單個背景線程,ScheduledThreadPoolExecutor可以對應1個或多個背景線程。
	 */
	//4.建立固定數量線程的線程池:适合多個背景線程執行周期任務,同時為了滿足資源管理的需求而限制背景線程數量的場景。
	public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
	
	//5.建立單個線程的線程池:适合需要單個背景線程執行周期任務,同時需要保證順序的執行各個任務的應用場景。
	public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
    }
}
           

2.3 Callable與FutureTask

2.3.1 Callable

Callable接口的源碼(since JDK1.5)

package java.util.concurrent;

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}
           

Runnable接口的源碼(since JDK1.0)

package java.lang;

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}
           

Thread類的部分源碼(since JDK1.0)

package java.lang;

public class Thread implements Runnable {
	
    public synchronized void start() {
        //A zero status value corresponds to state "NEW".
        if (threadStatus != 0)
            throw new IllegalThreadStateException();

        group.add(this);

        boolean started = false;
        try {
            start0();
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                //do nothing.
            }
        }
    }

    private native void start0();
    
    @Override
    public void run() {
        if (target != null) {
            target.run();
        }
    }
}
           

2.3.2 FutureTask

  • Future接口和FutureTask類代表異步計算的結果。
  • 可以把FutureTask交給Executor執行,也可以通過ExecutorService.submit(…)方法傳回一個FutureTask,然後執行FutureTask.get()方法或FutureTask.canael(…)方法。也可以單獨使用FutureTask:當一個線程需要等待另一個線程把某個任務執行完後它才能繼續執行,此時可以使用FutureTask。
  • FutureTask是基于AbstractQueuedSynchronizer(AQS)實作的。
  • 基于AQS實作的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch、FutureTask。

FutureTask類的部分源碼(since JDK1.5)

package java.util.concurrent;

//RunnableFuture<V>接口的父接口:Future<V>、Runnable
//FutureTask<V>類實作的接口:RunnableFuture<V>、Future<V>、Runnable
public class FutureTask<V> implements RunnableFuture<V> {
	//構造器1
	public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW; //ensure visibility of callable
    }
    
    //構造器2
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW; //ensure visibility of callable
    }
	
	/**
	 * 根據run()方法被執行的時機,FutureTask可處于三種狀态:
	 *(1)未啟動:建立了FutureTask但還未執行run()方法時FutureTask處于該狀态。
	 *(2)已啟動:run()方法被執行時。
	 *(3)已完成:run()方法執行完後正常結束,或被cancel()方法取消,或run()方法抛出異常而異常結束時。
	 */
	public void run() { //...... }
	
	/**
	 * 當FutureTask處于未啟動或已啟動狀态時,執行get()方法将導緻調用線程阻塞。
	 * 當FutureTask處于已完成狀态時,執行get()方法将導緻調用線程立即傳回結果或抛出異常。
	 */
	public V get() throws InterruptedException, ExecutionException { //...... }
	
	/**
	 * 當FutureTask處于未啟動狀态時,執行cancel(...)方法将導緻此任務永遠不會被執行。
	 * 當FutureTask處于已啟動狀态時,執行cancel(true)方法将以中斷執行此任務線程的方式來試圖停止任務。
	 * 當FutureTask處于已啟動狀态時,執行cancel(false)方法将不會對正在執行此任務的線程産生影響。
	 * 當FutureTask處于已完成狀态時,執行cancel(...)方法将傳回false。
	 */
	public boolean cancel(boolean mayInterruptIfRunning) { //...... }
}
           

2.3.3 案例分析

package com.java.thread;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

/**
 * @author rrqstart
 * 建立線程的方式:
 * * (1)繼承Thread類
 * * (2)實作Runnable接口:無泛型、重寫run方法、無傳回值、不可抛異常
 * * (3)實作Callable接口:有泛型、重寫call方法、有傳回值、可抛異常
 * <p>
 * FutureTask可用于異步擷取執行結果或取消執行任務的場景。通過get()方法可以異步擷取執行結果。
 * 不論FutureTask調用多少次run()或者call()方法,它都能確定隻執行一次Runable或Callable任務。
 * 是以,FutureTask非常适合用于耗時高并發的計算,另外可以通過cancel()方法取消執行任務。
 */
public class ThreadWithCallable {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyThread thread = new MyThread();

        FutureTask<Integer> futureTask = new FutureTask<Integer>(thread);

        new Thread(futureTask, "A").start(); //在這個線程的任務耗時約2秒

        //如果再添加一個線程,結果還是列印一條結果
        new Thread(futureTask, "B").start();

        //主線程可以完成自己的任務後,去擷取其他線程的結果
        System.out.println(Thread.currentThread().getName() + "-->計算完成");

        //這個get方法,可能會産生阻塞,應該放在代碼的最後,或者使用異步通信來處理
        System.out.println(futureTask.get());

        /*
         * 執行結果:
         *      main-->計算完成
         *      ------ come in call------
         *      1024
         */
    }
}

//泛型帶什麼類型,call方法就傳回什麼類型
class MyThread implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        System.out.println("------ come in call------");
        TimeUnit.SECONDS.sleep(2);
        return 1024;
    }
}
           

3. 線程池的使用

3.1 線程池的建立

3.1.1 ThreadPoolExecutor源碼分析

package java.util.concurrent;
//Since:JDK1.5
public class ThreadPoolExecutor extends AbstractExecutorService {
	// runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    
	//The context to be used when executing the finalizer, or null.
    private final AccessControlContext acc;
    
    private volatile int corePoolSize; //核心線程池的大小

    private volatile int maximumPoolSize; //最大線程池的大小

    private final BlockingQueue<Runnable> workQueue; //用來暫存任務的工作隊列

    private volatile long keepAliveTime;
    
    private volatile ThreadFactory threadFactory;

    private volatile RejectedExecutionHandler handler;

	//預設的拒絕政策:AbortPolicy
    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

	//預定義的4種拒絕政策的靜态内部類
    public static class CallerRunsPolicy implements RejectedExecutionHandler {......}
    public static class AbortPolicy implements RejectedExecutionHandler {......}
    public static class DiscardPolicy implements RejectedExecutionHandler {......}
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {......}

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
}
           

3.1.2 7大參數

  • corePoolSize

    :線程池中的常駐核心線程數,是線程池的基本大小。當送出一個任務到線程池時,線程池會建立一個線程來執行任務,即使其他空閑的基本線程能夠執行新任務也會建立線程,等到需要執行的任務數大于線程池的基本大小時就不再建立。如果調用了線程池的

    prestartAllCoreThreads()

    方法,線程池會提前建立并啟動所有基本線程。
    • 核心線程:線程池中有兩類線程,核心線程和非核心線程。核心線程預設情況下會一直存在于線程池中,即使這個核心線程什麼都不幹(鐵飯碗),而非核心線程如果長時間的閑置,就會被銷毀(臨時工)。
  • maximumPoolSize

    :線程池中能夠容納同時執行的最大線程數,也就是線程池允許建立的最大線程數,此值必須大于等于1。
    • 該值等于核心線程數量 + 非核心線程數量。
  • keepAliveTime

    :非核心線程閑置逾時時長。目前池中線程數量超過corePoolSize時,當空閑時間達到keepAliveTime時,多餘的線程會被銷毀,直到隻剩下corePoolSize個線程為止。
    • 非核心線程如果處于閑置狀态超過該值,就會被銷毀。如果設定allowCoreThreadTimeOut(true),則也會作用于核心線程。
  • unit

    :keepAliveTime的機關。
  • workQueue

    :任務隊列,該阻塞隊列中維護着被送出但等待執行的Runnable任務對象。
  • threadFactory

    :建立線程的工廠 ,用于批量建立線程,統一在建立線程時設定一些參數,如是否是守護線程、線程的優先級等。如果不指定,會建立一個預設的線程工廠。
  • handler

    :拒絕政策。表示當隊列和線程池都滿了,即線程池處于飽和狀态時,或者線程池已經關閉了的時候,如何來拒絕請求執行的任務的政策。

3.1.3 4大拒絕政策

  • 等待隊列已經滿了,再也塞不下新任務了,同時,線程池中的最大線程數也達到了,無法繼續為新任務服務。此時就需要拒絕政策機制合理的處理這個問題。
  • AbortPolicy:預設的拒絕政策。直接抛出RejectedExecutionException異常阻止系統正常運作。
  • CallerRunsPolicy:“調用者運作”的一種調節機制,該政策既不會抛棄任務,也不會抛出異常,而是将某些任務回退到調用者,進而降低新任務的流量。
  • DiscardOldestPolicy:抛棄隊列中等待最久的任務,然後把目前任務加入隊列中嘗試再次送出目前任務。
  • DiscardPolicy:該政策默默地丢棄無法處理的任務,不予任何處理也不抛出異常,如果允許任務丢失,這是最好的一種政策。

3.2 向線程池送出任務

  • public void execute(Runnable command)

    :用于送出不需要傳回值的任務,是以該方法無法判斷任務是否被線程池執行成功。
  • public Future<?> submit(Runnable task)

    :用于送出需要傳回值的任務,線程池會傳回一個Future類型的對象,通過這個對象可以判斷任務是否執行成功,并且可以通過這個對象的get()方法來擷取傳回值,get()方法會阻塞目前線程直到任務完成,而使用get(long timeout, TimeUnit unit)方法會阻塞目前線程一段時間後立即傳回,這時候可能任務沒有執行完。
  • public <T> Future<T> submit(Runnable task, T result)

  • public <T> Future<T> submit(Callable<T> task)

3.3 關閉線程池

  • public void shutdown()

    :周遊線程池中的工作線程,然後逐個調用線程的interrupt()方法來中斷線程,是以無法響應中斷的任務可能永遠無法終止。shutdown()方法隻是将線程池的狀态設定成SHUTDOWN,然後中斷所有沒有正在執行任務的線程。
  • public List<Runnable> shutdownNow()

    :周遊線程池中的工作線程,然後逐個調用線程的interrupt()方法來中斷線程,是以無法響應中斷的任務可能永遠無法終止。shutdownNow()方法首先将線程池的狀态設定成STOP,然後嘗試停止所有的正在執行或暫停任務的線程,并傳回等待執行任務的清單。
  • public boolean isShutdown()

    :隻要調用了shutdown()或shutdownNow()中的任意一個,isShutdown()方法就傳回true。
  • public boolean isTerminated()

    :當所有任務都已關閉後,才表示線程池關閉成功,此時調用isTerminated()方法會傳回true。

3.4 合理地配置線程池

  • CPU密集型任務:

    maximumPoolSize = Ncpu + 1

    (配置盡可能小的線程池)
  • IO密集型任務:

    maximumPoolSize = Ncpu * Ucpu * (1 + W / C)

    maximumPoolSize = 2 * Ncpu

    (由于IO密集型任務的線程并不是一直在執行任務,應配置盡可能大的線程池)
    • Ncpu = CPU的數量
    • Ucpu = 目标CPU的使用率(0 ≤ Ucpu ≤ 1)
    • W / C = 等待時間與計算時間的比率
擷取目前裝置的邏輯處理器個數Ncpu:
  • int count = Runtime.getRuntime().availableProcessors();

3.5 線程池的狀态

線程池本身有一個排程線程,這個線程就是用于管理布控整個線程池裡的各種任務和事務,例如建立線程、銷毀線程、任務隊列管理、線程隊列管理等等,故線程池也有自己的狀态。ThreadPoolExecutor 類中使用了一些 private static final int 型常量來表示線程池的狀态。

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
           
  • RUNNING:線程池建立後處于 RUNNING 狀态。
  • SHUTDOWN:調用 shutdown() 方法後處于 SHUTDOWN 狀态,線程池不能接受新的任務,清除一些空閑 worker,會等待阻塞隊列的任務完成。
  • STOP:調用 shutdownNow() 方法後處于 STOP 狀态,線程池不能接受新的任務,中斷所有線程,阻塞隊列中沒有被執行的任務全部丢棄。此時,poolsize=0,阻塞隊列的 size=0。
  • TIDYING:當所有的任務已終止,ctl 記錄的”任務數量”為 0,線程池會變為 TIDYING 狀态。接着會執行 terminated() 函數。
    • ThreadPoolExecutor 中有一個控制狀态的屬性叫 ctl,它是一個 AtomicInteger 類型的變量。線程池狀态就是通過 AtomicInteger 類型的成員變量 ctl 來擷取的。
    • 擷取的 ctl 值傳入 runStateOf() 方法,與 ~CAPACITY 位與運算(CAPACITY 是低 29 位全 1 的 int 變量)。
    • ~CAPACITY 在這裡相當于掩碼,用來擷取 ctl 的高 3 位,表示線程池狀态;而另外的低 29 位用于表示工作線程數。
  • TERMINATED:線程池處在 TIDYING 狀态時,執行完 terminated() 方法之後,就會由 TIDYING → TERMINATED, 線程池被設定為 TERMINATED 狀态。

4. 線程池的實作原理

Java并發程式設計:線程池1. 概述2. Executor架構3. 線程池的使用4. 線程池的實作原理5. ThreadPoolExecutor線程複用原理6. 案例分析
Java并發程式設計:線程池1. 概述2. Executor架構3. 線程池的使用4. 線程池的實作原理5. ThreadPoolExecutor線程複用原理6. 案例分析
Java并發程式設計:線程池1. 概述2. Executor架構3. 線程池的使用4. 線程池的實作原理5. ThreadPoolExecutor線程複用原理6. 案例分析
  • ThreadPoolExecutor 之是以采取上述步驟,是為了在執行 execute() 方法時,盡可能地避免擷取全局鎖(2.1和2.3兩步在執行時需要擷取全局鎖)。當正在運作地線程數大于等于 corePoolSize 以後,幾乎所有的 execute() 方法調用都會執行步驟2.2,而步驟2.2無需擷取全局鎖。

5. ThreadPoolExecutor線程複用原理

  • 一個線程在建立的時候會指定一個線程任務,當執行完這個線程任務之後,線程自動銷毀。但是線程池卻可以複用線程,即一個線程執行完線程任務後不銷毀,繼續執行另外的線程任務。那麼,線程池是如何做到線程複用的呢?ThreadPoolExecutor 在建立線程時,會将線程封裝成工作線程 Worker,并放入工作線程組中,然後這個 Worker 反複從阻塞隊列中拿任務去執行。

6. 案例分析

package com.java.threadpool;

import java.util.concurrent.*;

/**
 * @author rrqstart
 * Executors:3大方法
 * ThreadPoolExecutor:7大參數
 * RejectedExecutionHandler:4大拒絕政策
 */
public class ThreadPoolTest {
    public static void main(String[] args) {
        /*
         * 線程池不允許使用 Executors 去建立!!!
         * 而是通過 ThreadPoolExecutor 的方式建立線程池。這樣的處理方式更加明确線程池的運作規則,規避資源耗盡的風險。
         * Executors 傳回的線程池對象的弊端如下:
         * (1) FixedThreadPool 和 SingleThreadPool:允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,進而導緻 OOM。
         * (2) CachedThreadPool:允許的建立線程數量為 Integer.MAX_VALUE,可能會建立大量的線程,進而導緻 OOM。
         */
        //1.建立線程池
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                3L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        //new LinkedBlockingQueue<Runnable>() 等價于 new LinkedBlockingQueue<Runnable>(Integer.MAX_VALUE)

        try {
            for (int i = 1; i <= 10; i++) {
                //2.向線程池送出任務
                threadPool.execute(() -> {
                	System.out.println(Thread.currentThread().getName() + " --> 辦理業務!");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //3. 關閉線程池
            threadPool.shutdown();
        }
    }
}
           
//采取中止政策AbortPolicy的執行結果:
pool-1-thread-1 --> 辦理業務!
pool-1-thread-5 --> 辦理業務!
pool-1-thread-4 --> 辦理業務!
pool-1-thread-3 --> 辦理業務!
pool-1-thread-2 --> 辦理業務!
pool-1-thread-4 --> 辦理業務!
pool-1-thread-5 --> 辦理業務!
pool-1-thread-1 --> 辦理業務!
java.util.concurrent.RejectedExecutionException: Task com.java.threadpool.ThreadPoolTest$$Lambda$1/1023892928@4dd8dc3 rejected from java.util.concurrent.ThreadPoolExecutor@6d03e736[Running, pool size = 5, active threads = 4, queued tasks = 0, completed tasks = 4]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at com.java.threadpool.ThreadPoolTest.main(ThreadPoolTest.java:32)

Process finished with exit code 0
           
//采取調用者運作政策CallerRunsPolicy的執行結果:
pool-1-thread-1 --> 辦理業務!
pool-1-thread-5 --> 辦理業務!
pool-1-thread-1 --> 辦理業務!
pool-1-thread-4 --> 辦理業務!
pool-1-thread-3 --> 辦理業務!
pool-1-thread-2 --> 辦理業務!
main --> 辦理業務!
pool-1-thread-1 --> 辦理業務!
pool-1-thread-5 --> 辦理業務!
pool-1-thread-1 --> 辦理業務!
           
//采取丢棄政策DiscardPolicy的執行結果:
pool-1-thread-2 --> 辦理業務!
pool-1-thread-5 --> 辦理業務!
pool-1-thread-3 --> 辦理業務!
pool-1-thread-3 --> 辦理業務!
pool-1-thread-4 --> 辦理業務!
pool-1-thread-1 --> 辦理業務!
pool-1-thread-5 --> 辦理業務!
pool-1-thread-2 --> 辦理業務!