天天看點

【多線程】說說線程池

前言

線程池内部是多個線程的集合,在建立初期,線程池會建立出多個空閑的線程,當有一個任務需要執行時,線程池會選擇出一個線程去執行它,執行結束後,該線程不會被銷毀,而是可以繼續複用。

使用線程池可以大大減少線程頻繁建立與銷毀的開銷,降低了系統資源的消耗。當任務來臨時,直接複用之前的線程,而不是先建立,提高了系統的響應速度。此外,線程池可以控制最大的并發數,避免資源的過度消耗。

簡單執行個體

先給出一個線程池的簡單例子:

package com.xue.testThreadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 4; i++) {
            int finalI = i;
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "正在執行任務" + finalI);
                }
            });
        }
        threadPool.shutdown();
    }
}      

輸出如下:

【多線程】說說線程池

可見,2個線程總共執行了4個任務,線程得到了複用。

線程池的核心參數

這些核心參數位于ThreadPoolExecutor的構造方法中:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)      
  • corePoolSize               核心線程數,或者說常駐線程數,線程池中最少線程數
  • maximumPoolSize      最大線程數
  • keepAliveTime             空閑線程的存活時間,線程池中目前線程數大于corePoolSize時,那些空閑時間達到keepAliveTime的空閑線程,它們将會被銷毀掉
  • TimeUnit                       keepAliveTime的時間機關
  • workQueue                   任務隊列,存放未被執行的任務
  • threadFactory               建立線程的工廠
  • handler                          拒絕政策,目前線程數≥最大線程數且任務隊列滿的時候,對後續任務的拒絕方式

線程池的種類

不同的線程池有不同的适用場景,本質上都是在Executors類中執行個體化一個ThreadPoolExecutor對象,隻是傳入的參數不一樣罷了。

線程池的種類有以下幾種:

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }      

建立一個固定大小的線程池,即核心線程數等于最大線程數,每個線程的存活時間和線程池的壽命一緻,線程池滿負荷運作時,多餘的任務會加入到無界的阻塞隊列中,newFixedThreadPool可以很好的控制線程的并發量。

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }      

建立一個可以無限擴大的線程池,當任務來臨時,有空閑線程就去執行,否則立即建立一個線程。當線程的空閑時間超過1分鐘時,銷毀該線程。适用于執行任務較少且需要快速執行的場景,即短期異步任務。

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }      

建立一個大小為1的線程池,用于順序執行任務。

newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }      

建立一個初始大小為corePoolSize的線程池,線程池的存活時間沒有限制,newScheduledThreadPool中的schedule方法用于延時執行任務,scheduleAtFixedRate用于周期性地執行任務。

 線程池執行任務的流程

  • 當線程池中線程數小于corePoolSize時,新送出任務将建立一個新線程執行任務,即使此時線程池中存在空閑線程。
  • 當線程池中線程數達到corePoolSize時,新送出任務将被放入workQueue中,等待線程池中任務排程執行 。
  • 當workQueue已滿,且maximumPoolSize > corePoolSize時,新送出任務會建立新線程執行任務。
  • 當workQueue已滿,且送出任務數超過maximumPoolSize,任務由RejectedExecutionHandler處理。
  • 當線程池中線程數超過corePoolSize,且超過這部分的空閑時間達到keepAliveTime時,回收這些線程。
  • 當設定allowCoreThreadTimeOut(true)時,線程池中corePoolSize範圍内的線程空閑時間達到keepAliveTime也将回收。

使用更加直覺的流程圖來描述:

【多線程】說說線程池

注:此章節參考​​通俗易懂,各常用線程池執行的-流程圖​​

工作隊列

工作隊列用來存儲送出的任務,工作隊列一般使用的都是阻塞隊列。阻塞隊列可以保證任務隊列中沒有任務時阻塞擷取任務的線程,使得線程進入wait狀态,釋放cpu資源。當隊列中有任務時才喚醒對應線程從隊列中取出消息進行執行。

阻塞隊列一般由以下幾種:

LinkedBlockingQueue  

由單連結清單實作的無界阻塞隊列,遵循FIFO。注意這裡的無界是因為其記錄隊列大小的資料類型是int,那麼隊列長度的最大值就是恐怖的Integer.MAX_VALUE,這個值已經很大了,是以可以将之稱為無界隊列。不過該隊列也提供了有參構造函數,可以手動指定其隊列大小,否則使用預設的int最大值。

LinkedBlockingQueue隻能從head取元素,從tail添加元素。添加元素和擷取元素都有獨立的鎖,也就是說它是讀寫分離的,讀寫操作可以并行執行。LinkedBlockingQueue采用可重入鎖(ReentrantLock)來保證在并發情況下的線程安全。

當線程數目達到corePoolSize時,後續的任務會直接加入到LinkedBlockingQueue中,在不指定其隊列大小的情況下,該隊列永遠也不會滿,可能記憶體滿了,隊列都不會滿,此時maximumPoolSize和拒絕政策将不會有任何意義。

ArrayBlockingQueue

由數組實作的有界阻塞隊列,同樣遵循FIFO,必須制定隊列大小。使用全局獨占鎖的方式,使得在同一時間隻有一個線程能執行入隊或出隊操作,相比于LinkedBlockingQueue,ArrayBlockingQueue鎖的力度很大。

SynchronousQueue

是一個沒有容量的隊列,當然也可以稱為單元素隊列。會将任務直接傳遞給消費者,添加任務時,必須等待前一個被添加的任務被消費掉,即take動作等待put動作,put動作等待take動作,put與take是循環往複的。

如果線程拒絕執行該隊列中的任務,或者說沒有線程來執行。那麼舊任務無法被執行,新任務也無法被添加,線程池将陷入一種尴尬的境地。是以,該隊列一般需要maximumPoolSize為Integer.MAX_VALUE,有一個任務到來,就立馬新起一個線程執行,newCachedThreadPool就是使用的這種組合。

關于這些阻塞隊列的源碼解析,可能需要另開篇幅。

線程工廠

先看一下,ThreadPoolExecutor構造方法中預設使用的線程工廠

【多線程】說說線程池
static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }      

defaultThreadFactory對于線程的命名方式為“pool-”+pool的自增序号+"-thread-"+線程的自增序号,這也印證了在簡單執行個體的章節中,輸出Thread.getCurrentThread.getName()是“pool-1-thread-1”的樣式

預設線程工廠給線程的取名沒有太多的意義,在實際開發中,我們一般會給線程取個比較有識别度的名稱,友善出現問題時的排查。

拒絕政策

如果當工作隊列已滿,且線程數目達到maximumPoolSize後,依然有任務到來,那麼此時線程池就會采取拒絕政策。

ThreadPoolExecutor中提供了4種拒絕政策。

AbortPolicy

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();   

     public static class AbortPolicy implements RejectedExecutionHandler {
 
            public AbortPolicy() { }

            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
            }
    }      

這是線程池的預設拒絕政策,直接會丢棄任務并抛出RejectedExecutionException異常。

DiscardPolicy

public static class DiscardPolicy implements RejectedExecutionHandler {

        public DiscardPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }      

丢棄後續送出的任務,但不抛出異常。建議在一些無關緊要的場景中使用此拒絕政策,否則無法及時發現系統的異常狀态。

DiscardOldestPolicy

public static class DiscardOldestPolicy implements RejectedExecutionHandler {

        public DiscardOldestPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }      

從源碼中可以看到,此拒絕政策會丢棄隊列頭部的任務,然後将後續送出的任務加入隊列中。

CallerRunsPolicy

public static class CallerRunsPolicy implements RejectedExecutionHandler {

        public CallerRunsPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }      

由調用線程執行該任務,即送出任務的線程,一般是主線程。

如何配置最大線程數

CPU密集型任務

CPU密集指的是需要進行大量的運算,一般沒有什麼阻塞。

盡量使用較小的線程池,大小一般為CPU核心數+1。因為CPU密集型任務使得CPU使用率很高,若開過多的線程數,會造成CPU過度切換。

IO密集型任務

IO密集指的是需要進行大量的IO,阻塞十分嚴重,可以挂起被阻塞的線程,開啟新的線程幹别的事情。

總結

繼續閱讀