天天看點

線程安全(六)線程池系列文章目錄0.前言1.Executor 與ExecutorService接口2.AbstractExecutorService類3.ThreadPoolExecutor4.常見線程池5.線程池關鍵屬性

系列文章目錄

線程安全(一)java對象頭分析以及鎖狀态

線程安全(二)java中的CAS機制

線程安全(三)實作方法sychronized與ReentrantLock(阻塞同步)

線程安全(四)Java記憶體模型與volatile關鍵字

線程安全(五)線程狀态和線程建立

線程安全(六)線程池

線程安全(七)ThreadLocal和java的四種引用

線程安全(八)Semaphore

線程安全(九)CyclicBarrier

線程安全(十)AQS(AbstractQueuedSynchronizer)

0.前言

為什麼要使用線程池?

直接使用線程會有線程重複的建立和銷毀

優點:

  • 降低記憶體消耗,避免線程重複的建立和銷毀
  • 可以建立核心線程數,可直接使用,不需要再建立
  • 顯性的管理監控線程,防止建立過多

先說結果

線程安全(六)線程池系列文章目錄0.前言1.Executor 與ExecutorService接口2.AbstractExecutorService類3.ThreadPoolExecutor4.常見線程池5.線程池關鍵屬性

再看類圖

線程安全(六)線程池系列文章目錄0.前言1.Executor 與ExecutorService接口2.AbstractExecutorService類3.ThreadPoolExecutor4.常見線程池5.線程池關鍵屬性

1.Executor 與ExecutorService接口

1.1.Executor接口

// 送出任務的辦法
void execute(Runnable command);
           

1.2.ExecutorService接口

// 關閉線程池,已送出的任務繼續執行,不接受繼續送出新任務
    void shutdown();

    // 關閉線程池,嘗試停止正在執行的所有任務,不接受繼續送出新任務
    // 它和前面的方法相比,加了一個單詞“now”,差別在于它會去停止目前正在進行的任務
    List<Runnable> shutdownNow();

    // 線程池是否已關閉
    boolean isShutdown();

    // 如果調用了 shutdown() 或 shutdownNow() 方法後,所有任務結束了,那麼傳回true
    // 這個方法必須在調用shutdown或shutdownNow方法之後調用才會傳回true
    boolean isTerminated();

    // 等待所有任務完成,并設定逾時時間
    // 我們這麼了解,實際應用中是,先調用 shutdown 或 shutdownNow,
    // 然後再調這個方法等待所有的線程真正地完成,傳回值意味着有沒有逾時
    boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;

    // 送出一個 Callable 任務
    <T> Future<T> submit(Callable<T> task);

    // 送出一個 Runnable 任務,第二個參數将會放到 Future 中,作為傳回值,
    // 因為 Runnable 的 run 方法本身并不傳回任何東西
    <T> Future<T> submit(Runnable task, T result);

    // 送出一個 Runnable 任務
    Future<?> submit(Runnable task);

    // 執行所有任務,傳回 Future 類型的一個 list
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;

    // 也是執行所有任務,但是這裡設定了逾時時間
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
            throws InterruptedException;

    // 隻有其中的一個任務結束了,就可以傳回,傳回執行完的那個任務的結果
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;

    // 同上一個方法,隻有其中的一個任務結束了,就可以傳回,傳回執行完的那個任務的結果,
    // 不過這個帶逾時,超過指定的時間,抛出 TimeoutException 異常
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
           

2.AbstractExecutorService類

//  建構RunnableFuture
	protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

   // 送出任務
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        // 1. 将任務包裝成 FutureTask
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

   // 送出任務
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        // 1. 将任務包裝成 FutureTask
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

   // 送出任務
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        // 1. 将任務包裝成 FutureTask
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
           

不難看出,上述隻是實作送出任務,沒有真正開啟線程,真正運作的還是excute方法。

3.ThreadPoolExecutor

3.1.屬性

32位的整數,前3位用于存放線程池狀态,低29位辨別線程數。
// 原子性操作變量,CAS的展現
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 29位
private static final int COUNT_BITS = Integer.SIZE - 3;

// 線程池的最大線程數
111 11111111111111111111111111(29個1)
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState存儲在高階位
// 運作:接受新任務并處理隊列任務 
111 00000000000000000000000000000(111後29個0)
private static final int RUNNING    = -1 << COUNT_BITS;

// 關機:不接受新任務,但處理隊列任務 
000 00000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;

// 停止:不接受新任務,不處理排隊任務,并中斷正在進行的任務 
001 00000000000000000000000000000(1後29個0)
private static final int STOP       =  1 << COUNT_BITS;

// 整理:所有任務已經終止
010 00000000000000000000000000000(10後29個0)
private static final int TIDYING    =  2 << COUNT_BITS;

// terminated() 方法結束後,已完成 
011 00000000000000000000000000000(11後29個0)
private static final int TERMINATED =  3 << COUNT_BITS;
           

3.2.線程狀态方法

// CAPACITY值為:11111111111111111111111111111
// ~CAPACITY值為:11100000000000000000000000000000

// &操作,位運算結果将低29位置0了,而高3位還是保持c的值,得到是runState的值
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 這個方法用于取出workerCount的值 

// &操作,位運算結果将高3位置0了,保留參數c的低29位,得到workerCount的值
private static int workerCountOf(int c)  { return c & CAPACITY; }

// 之前說過 32為2進制數,前三位存的是線程狀态,後29位存的是線程數,這個運算使将他倆合在一起 取rs(runState)的前三位和wc(workerCount)的後29位
private static int ctlOf(int rs, int wc) { return rs | wc; }
// c是否小于s 
private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
// c是否大于等于s
private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }
// c是否小于SHUTDOEN 即是否在run 隻有RUNNING狀态會小于0
private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
// 線程号+1
private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
// 線程号-1
 private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }
// 一個線程突然終止 減少線程号
private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }
           

3.2.相關屬性

// 隊列 用于儲存任務并傳遞給工作線程的隊列
private final BlockingQueue<Runnable> workQueue;
// 鎖
private final ReentrantLock mainLock = new ReentrantLock();
// 包含所有工作線程,隻有持有mainLock的才能進入
private final HashSet<Worker> workers = new HashSet<Worker>();
// 等待條件支援awaittermination
private final Condition termination = mainLock.newCondition();
//最大的池大小
private int largestPoolSize;
//完成任務的計數器。僅在工作線程終止時更新。隻有在mainlock通路
private long completedTaskCount;
// 線程工廠
private volatile ThreadFactory threadFactory;
// 飽和和關閉時使用
private volatile RejectedExecutionHandler handler;
// 線程從隊列中擷取任務的逾時時間,也就是說如果線程空閑超過這個時間就會終止。
private volatile long keepAliveTime;
// 如果為false(預設),即使空閑時,核心線程仍然保持生存狀态。
// 如果為true,核心線程使用keepalivetime逾時等待工作。
private volatile boolean allowCoreThreadTimeOut;
// 核心線程數量
private volatile int corePoolSize;
// 最大線程數。注意,實際最大值是由容量内部限定的。
private volatile int maximumPoolSize;
// 預設拒絕 程式
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
//
private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
// 終結器
private final AccessControlContext acc;
           

3.3.構造方法

// 核心線程數,最大線程數,保持時間,機關,隊列
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

// 核心線程數,最大線程數,保持時間,機關,隊列,線程工廠
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

// 核心線程數,最大線程數,保持時間,機關,隊列, handler
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

// 核心線程數,最大線程數,保持時間,機關,隊列,線程工廠,handler
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
           

3.4. void execute(Runnable command)

//
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * 1.如果少于核心線程數的線程在運作,啟動一個新線程,原子性檢查運作狀态和線程号,
         * 通過傳回false來防止錯誤的建立線程
         * 2. 如果一個任務可以成功地排隊,那麼我們仍然需要仔細檢查是否應該添加一個線程。
         * (自從上次檢查結束後,有的線程有的已經死亡)或者自從進入這個方法後池就關閉了。
         *是以我們重新檢查狀态,如果停止,需要復原入隊,如果沒有, 或啟動一個新線程。
         * 3.如果我們不能對任務進行隊列,那麼我們嘗試添加一個新線程。
         * 如果失敗,我們知道我們被關閉或飽和,是以任務倍拒絕。
         // 擷取線程狀态和線程數
        int c = ctl.get();
        // 活動線程數 小與 核心線程數,直接啟動新的線程,在增加時會原子性檢測
        // 1,小于核心線程建立線程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2.線程池處于運作狀态,并且隊列未滿
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 不是runState 則從workQueue中移除任務并拒絕
            if (! isRunning(recheck) && remove(command))
	            // 采用線程池指定的政策拒絕任務
                reject(command);
            // 線程池處于RUNNING狀态 || 線程池處于非RUNNING狀态但是任務移除失敗 
            else if (workerCountOf(recheck) == 0)
            // 這行代碼是為了SHUTDOWN狀态下沒有活動線程了,但是隊列裡還有任務沒執行這種特殊情況。  
             // 添加一個null任務是因為SHUTDOWN狀态下,線程池不再接受新任務  
                addWorker(null, false);
              // 兩種情況:  
                   // 1.非RUNNING狀态拒絕新的任務  
                   // 2.隊列滿了啟動新的線程失敗(workCount > maximumPoolSize)
        }
        // 如果 workQueue 隊列滿了,/ 以 maximumPoolSize 為界建立新的 worker,
        // 如果失敗,說明目前線程數已經達到 maximumPoolSize,執行拒絕政策
        else if (!addWorker(command, false))
            reject(command);
    }
           

3.5. void shutdown()

// 會将runState置為SHUTDOWN,會終止所有空閑的線程。
public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {   
            checkShutdownAccess();
             // 線程池狀态設為SHUTDOWN,如果已經至少是這個狀态那麼則直接傳回  
            advanceRunState(SHUTDOWN);
            // 注意這裡是中斷所有空閑的線程:runWorker中等待的線程被中斷 → 進入processWorkerExit →  
	        // tryTerminate方法中會保證隊列中剩餘的任務得到執行。
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

private void interruptIdleWorkers(boolean onlyOne) {  
    final ReentrantLock mainLock = this.mainLock;  
    mainLock.lock();  
    try {  
        for (Worker w : workers) {  
            Thread t = w.thread;  
   // w.tryLock能擷取到鎖,說明該線程沒有在運作,因為runWorker中執行任務會先lock,  
          // 是以保證了中斷的肯定是空閑的線程。  
            if (!t.isInterrupted() && w.tryLock()) {  
                try {  
                    t.interrupt();  
                } catch (SecurityException ignore) {  
                } finally {  
                    w.unlock();  
                }  
            }  
            if (onlyOne)  
                break;  
        }  
    } finally {  
        mainLock.unlock();  
    }  
} 
           

3.6. List shutdownNow()

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // STOP狀态:不再接受新任務且不再執行隊列中的任務。
            advanceRunState(STOP);
            // 中斷所有線程
            interruptWorkers();
            // 傳回隊列中還沒有被執行的任務。
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

 void interruptIfStarted() {  
            Thread t;  
			// 初始化時state == -1  
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {  
                try {  
                    t.interrupt();  
                } catch (SecurityException ignore) {  
                }  
            }  
        }
           

3.7. void runWorker(Worker w)

// 第一次啟動會執行初始化傳進來的任務firstTask;
// 然後會從workQueue中取任務執行,如果隊列為空則等待keepAliveTime這麼長時間。
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // Worker的構造函數中抑制了線程中斷setState(-1),是以這裡需要unlock進而允許中斷  
        w.unlock(); // allow interrupts
        // 為true的情況:1.執行任務抛出異常;2.被中斷。
        boolean completedAbruptly = true;
        try {
	        // 如果getTask傳回null那麼getTask中會将workerCount遞減,如果異常了這個遞減操作會在processWorkerExit中處理
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
	                // 任務執行前可以插入一些處理,子類重載該方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
	                    // 執行使用者任務
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
	                    // 和beforeExecute一樣,留給子類去重載
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
	        // 結束線程的一些清理工作
            processWorkerExit(w, completedAbruptly);
        }
    }
           

3.8. boolean addWorker(Runnable firstTask, boolean core)

// 第一個為目前任務,第二個為是否為核心線程,是用corePoolSize ,false用maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 這條語句等價:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||  
            // workQueue.isEmpty())  
            // 滿足下列調價則直接傳回false,線程建立失敗:  
            // rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此時不再接受新的任務,且所有任務執行結束  
            // rs = SHUTDOWN:firtTask != null 此時不再接受任務,但是仍然會執行隊列中的任務  
            // rs = SHUTDOWN:firtTask == null見execute方法的addWorker(null,  
            // false),任務為null && 隊列為空  
            // 最後一種情況也就是說SHUTDONW狀态下,如果隊列不為空還得接着往下執行,為什麼?add一個null任務目的到底是什麼?  
            // 看execute方法隻有workCount==0的時候firstTask才會為null結合這裡的條件就是線程池SHUTDOWN了不再接受新任務  
            // 但是此時隊列不為空,那麼還得建立線程把任務給執行完才行
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
			// 走到這的情形:  
            // 1.線程池狀态為RUNNING  
            // 2.SHUTDOWN狀态,但隊列中還有任務需要執行 
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 原子操作遞增workCount  

                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 如果線程池的狀态發生變化則重試
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
		// wokerCount遞增成功  
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 并發的通路線程池workers對象必須加鎖
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
					// RUNNING狀态 || SHUTDONW狀态下清理隊列中剩餘的任務 
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 啟動新添加的線程,這個線程首先執行firstTask,然後不停的從隊列中取任務執行  
                // 當等待keepAlieTime還沒有任務執行則該線程結束。見runWoker和getTask方法的代碼。
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
	        // 線程啟動失敗,則從wokers中移除w并遞減wokerCount
            if (! workerStarted)
	            // 遞減wokerCount會觸發tryTerminate方法
                addWorkerFailed(w);
        }
        return workerStarted;
    }
           

3.9 invokeAll(Collection<? extends Callable> tasks,long timeout, TimeUnit unit)

批量執行線程任務時設定的的逾時時間(常用于Future,需要拿到所有線程傳回結果的值)

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            final long deadline = System.nanoTime() + nanos;
            final int size = futures.size();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            for (int i = 0; i < size; i++) {
            	// 所有任務執行execute,同步執行
                execute((Runnable)futures.get(i));
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L)
                    return futures;
            }

            for (int i = 0; i < size; i++) {
            	// 判斷每個future是否執行完成
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    if (nanos <= 0L)
                        return futures;
                    try {
                    	// 在限定時間内拿到結果
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    // 根據目前已用時間 更新下個future的限定時間
                    nanos = deadline - System.nanoTime();
                }
            }
            done = true;
            return futures;
        } finally {
           // 隻要非完成的,就會中斷所有線程
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }
           

4.常見線程池

Executors是一個工具類,生成幾種常見的線程池

線程安全(六)線程池系列文章目錄0.前言1.Executor 與ExecutorService接口2.AbstractExecutorService類3.ThreadPoolExecutor4.常見線程池5.線程池關鍵屬性

附:圖檔來源于網絡,侵删

4.1.newFixedThreadPool()

生成一個固定大小的線程池(核心線程數和最大線程數是一樣的,超出核心線程數的新送出的任務都會加入無界隊列)

因為是無界隊列,是以最大線程數其實用不到

建立一個線程池,該線程池重用在共享無界隊列上運作的固定數量的線程。在任何時候,最多 nThreads 個線程将是活動的處理任務。如果在所有線程都處于活動狀态時送出了其他任務,它們将在隊列中等待,直到有線程可用。如果任何線程在關閉之前的執行過程中由于失敗而終止,如果需要執行後續任務,新的線程将取代它。池中的線程将一直存在,直到顯式關閉。
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
           

4.2.newSingleThreadExecutor()

生成隻有一個線程的固定線程池,設定線程數為 1

因為是無界隊列,是以最大線程數其實用不到

建立一個使用單個工作線程在無界隊列上運作的 Executor。 (但請注意,如果該單線程在關閉前的執行過程中因故障而終止,如果需要執行後續任務,則新線程将取代它。)任務保證按順序執行,并且不會有超過一個任務處于活動狀态在任何給定時間。
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
           

4.3.newCachedThreadPool()

生成一個需要的時候就建立新的線程,同時可以複用之前建立的線程(如果這個線程目前沒有任務)的線程池。

SynchronousQueue内部沒有資料的存儲空間,任何入隊的線程都會阻塞,直到有線程來出隊,可以保證如果加入一個資料,必須要remove掉,才能再次計入

建立一個線程池,根據需要建立新線程,但在可用時将重用以前構造的線程。這些池通常會提高執行許多短期異步任務的程式的性能。如果可用,對執行的調用将重用以前構造的線程。如果沒有可用的現有線程,将建立一個新線程并将其添加到池中。六十秒内未使用的線程将被終止并從緩存中删除。是以,保持空閑足夠長時間的池不會消耗任何資源。
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
           

4.4.newScheduledThreadPool()

定期或延時執行

建立一個線程池,可以安排指令在給定延遲後運作,或定期執行
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
           

5.線程池關鍵屬性

5.1.線程池建立參數

  • corePoolSize(核心線程數)
  • maximumPoolSize(最大線程數)
  • workQueue(隊列)
  • keepAliveTime(空閑時間)
  • rejectedExecutionHandler(拒絕政策)

    corePoolSize 到 maximumPoolSize 之間的線程會被回收,當然 corePoolSize 的線程也可以通過設定而得到回收(allowCoreThreadTimeOut(true))。

    workQueue 用于存放任務,添加任務的時候,如果目前線程數超過了 corePoolSize,那麼往該隊列中插入任務,線程池中的線程會負責到隊列中拉取任務。

    keepAliveTime 用于設定空閑時間,如果線程數超出了 corePoolSize,并且有些線程的空閑時間超過了這個值,會執行關閉這些線程的操作

    rejectedExecutionHandler 用于處理當線程池不能執行此任務時的情況。

  • AbortPolicy(預設):丢棄任務并抛出RejectedExecutionException異常。
  • DiscardPolicy:丢棄任務,但是不抛出異常。
  • DiscardOldestPolicy:丢棄隊列最前面的任務,然後重新送出被拒絕的任務。
  • CallerRunsPolicy:由調用線程(送出任務的線程)處理該任務。