天天看點

線程池工廠Executors程式設計的藝術

  Executors是一個線程池的工廠類,提供各種有用的線程池的建立,使用得當,将會使我們并發程式設計變得簡單!今天就來聊聊這個工廠類的藝術吧!

  Executors隻是Executor架構的主要成員元件之一,為java的異步任務排程執行提供了重要的入口!

  在說Executors之前,還需要說一下另一個Executor架構的重要成員,ThreadPoolExecutor。

  ThreadPoolExecutor 實作了ExecutorService接口,提供了線程池的排程功能!成為純種池技術的基石!

ThreadPoolExecutor 的主要構造函數:

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }      

  咱們此處不是要去分析其源碼,隻是想看一下怎樣使用他!

  從構造方法可以看出 ThreadPoolExecutor 的主要參數 7 個,在其注釋上也有說明功能,咱們翻譯下每個參數的功能:

corePoolSize: 線程池核心線程數(平時保留的線程數),使用時機: 在初始時刻,每次請求進來都會建立一個線程直到達到該size
    maximumPoolSize: 線程池最大線程數,使用時機: 當workQueue都放不下時,啟動新線程,直到最大線程數,此時到達線程池的極限
    keepAliveTime/unit: 超出corePoolSize數量的線程的保留時間,unit為時間機關
    workQueue: 阻塞隊列,當核心線程數達到或者超出後,會先嘗試将任務放入該隊列由各線程自行消費;  
        ArrayBlockingQueue: 構造函數一定要傳大小
        LinkedBlockingQueue: 構造函數不傳大小會預設為65536(Integer.MAX_VALUE ),當大量請求任務時,容易造成 記憶體耗盡。
        SynchronousQueue: 同步隊列,一個沒有存儲空間的阻塞隊列 ,将任務同步傳遞給工作線程。
        PriorityBlockingQueue: 優先隊列
    threadFactory:線程工廠,用于線程需要建立時,調用其newThread()生産新線程使用
    handler: 飽和政策,當隊列已放不下任務,且建立的線程已達到 maximum 後,則不能再處理任務,直接将任務交給飽和政策
        AbortPolicy: 直接抛棄(預設)
        CallerRunsPolicy: 用調用者的線程執行任務
        DiscardOldestPolicy: 抛棄隊列中最久的任務
        DiscardPolicy: 抛棄目前任務      

  是以,雖然參數有點多,但是其實仔細閱讀以上的注釋,基本就能很好地使用線程池了,不過咱們還可以通過一個流程圖來說明這一切,對于喜歡看圖說話的同學來說是件好事!

線程池工廠Executors程式設計的藝術

  好了,ThreadPoolExecutor 介紹完後,什麼感覺呢? 說是簡單,其實還是挺麻煩的,畢竟你還要去了解各種隊列的公優缺點,去了解線程池範圍怎樣設定合理。是以,是時候讓 Executors 工廠出場了!

   作為一個工廠類,Executors 簡化了各種參數,隻忘文生義即可明白其意思!Executors 主要提供4種類型的線程池!

1. FixedThreadPool 建立一個指定數量的線程池

  可控制線程最大并發數,超出的線程會在隊列中等待。

  其實作方式如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }      

  使用固定線程數的worker和無界隊列儲存多餘的task,是以建立的線程數不會超過corePoolSize,是以maxPoolSize是一個無效參數,是以keepAliveTime是一個無效參數,是以不會調用RejectedExecutionHandler.rejectedExecution()方法,即無拒絕政策可用;進而在外部表現來看,就是固定線程,在執行無限的隊列!

2. SingleThreadExecutor 建立一個單線程化的線程池

  它隻會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。

  其實作方式如下:

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }      

  與 FixedThreadPool 原理類似,不過這裡隻會使用一個線程在運作。使用一個線程可以保證取任務時的順序一緻性,進而表現出先到先得的效果!在要求有執行順序要求的場景,剛好派上用場!

3. CachedThreadPool, 建立一個可緩存的無限線程池

  如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則建立線程。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }      

  使用SyncronousQueue作為線程池的工作隊列,這是特殊的隊列,它是個沒容量的阻塞隊列(至于為什麼要用這個特性,據說是性能比 LinkedBlockingQueue 更高,詳解他),每個插入操作必須有一個線程對應的移除操作,反之一樣;當送出一個cached任務時,執行SyncronousQueue.offer(x),如果有等待的空閑線程,則比對成功,exec立即傳回;如果沒有比對,則會建立一個新線程執行任務;任務執行完後使用poll()等待keep即60秒,一直嘗試從隊列中擷取任務,逾時後自動釋放線程;

  使用這種線程池把握好節奏,因為看起來線程池動不動就會建立系列線程池,而且動不動就會釋放線程,完全是不可控的,不可監控的。

4. ScheduledThreadPool 建立一個定長線程池

  支援定時及周期性任務執行。

  其實作如下:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }      

  看起來,這個在參數方面沒有太多的解說,隻是依賴于 ScheduledThreadPoolExecutor 的實作了!是以,我們來看一下 ScheduledThreadPoolExecutor是怎麼實作的?

public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor    {}
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }      

  ok, 如上兩個,就夠了!ScheduledThreadPoolExecutor 也是繼承了 ThreadPoolExecutor, 其構造方法也是依賴于父類,隻是将 隊列實作改為 DelayedWorkQueue,進而實作延時運作或者定期執行的效果!

  而相比于 Timer 來說,ScheduledThreadPool會有更好的執行效果,因為Timer隻會有一個線程來執行任務,會有局限性。而且 ScheduledThreadPool 提供其他很多的可操作方法!

  Executors 的出現,使線程池的使用難度大大降低,使開發更友善,在合适的場合使用相應的工廠方法,定能讓開發事半功倍!

不要害怕今日的苦,你要相信明天,更苦!

繼續閱讀