天天看點

掌握Java并發程式設計線程池的實作原理

作者:java油膩的程式猿

前沿

Java中的線程池在實際項目中使用場景很多,幾乎索引需要實作異步或者并發執行任務的程式都會使用到線程池,合理的使用線程池能夠帶來以下幾點好處。

  • 降低資源的消耗: 通過出重複利用已建立的線程降低線程建立和銷毀帶來的性能消耗。
  • 提高響應速度: 當任務到達時,任務可以不需要等待線程的建立就能立即執行。
  • 提高線程的可管理性: 線程是稀缺資源,如果無限的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一配置設定、調優和控制。

一、線程池的實作原理

掌握Java并發程式設計線程池的實作原理

二、線程池的核心對象參數說明

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
複制代碼           
線程池的核心對象 說明
corePoolSize 線程池核心線程數量,corePoolSize 是線程池中的一個最小的線程數量,即使這些線程處理空閑狀态,他們也不會被銷毀,除非設定了allowCoreThreadTimeOut。
maximumPoolSize 線程池最大線程數量,線程池允許建立的最大線程數,如果隊列滿了,并且一建立的線程數小于最大線程數,則線程池會繼續建立新的線程來執行任務。
keepAliveTime 線程保持活動的時間,如果任務很多,并且每個任務的執行時間比較短,可以調大時間,提高線程的使用率
workQueue 任務隊列(阻塞隊列),有多種任務隊列,比如:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue等等
ThreadFactory 用于建立線程的工廠,可以通過線程工廠給每個建立出來的線程設定更有意義的名字
RejectedExecutionHandler

飽和拒絕政策,當隊列和線程池都滿了,說明線程池處于飽和狀态,那麼必須采用一種政策處理送出的新任務,預設政策是AbortPolicy,表示無法處理新任務時抛出異常,在JDK 1.5中線程池架構提供4中拒絕政策:

1.AbortPolicy:直接抛異常。

2.CallerRunsPolicy:隻用調用者所線上程來運作任務。

3.DiscardOldestPolicy:丢棄隊列裡最佳的一個任務,并執行目前任務。

4.DiscardPocily:不處理,直接丢棄掉。

三、Java中常用的四種線程池

在Java中使用線程池,可以用ThreadPoolExecutor的構造函數直接建立出線程池執行個體,在Executors類中,為我們提供了常用線程池的建立方法。

接下來我們就來了解常用的四種:

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}
複制代碼           

從構造方法可以看出,它建立了一個固定大小的線程池,每次送出一個任務就建立一個線程,直到線程達到線程池的最大值nThreads。線程池的大小一旦達到最大值後,再有新的任務送出時則放入無界阻塞隊列中,等到有線程空閑時,再從隊列中取出任務繼續執行。 那麼,如何使用newFixedThreadPool呢?我們來舉個例子:

public class ThreadPoolExecutorsChallenge {

    static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();

    public static void main(String[] args) {
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 5; i++) {
            final int index = i;
            fixedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
                        System.out.println("運作時間: " + sdf.format(new Date()) + " " + index);
                        Thread.sleep(2000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}
複制代碼           

運作結果:

運作時間: 23:56:12 1
運作時間: 23:56:12 0
運作時間: 23:56:12 2
運作時間: 23:56:14 3
運作時間: 23:56:14 4
複制代碼           

上面我建立的一個固定大小為3的線程池,然後線上程池送出個5個任務,從結果可以看到,一開始三個任務進來都是立即執行,在送出第4個任務時,由于線程池大小已經到達3并且前3個任務在運作中,是以第4個任務被放進了隊列,等待有空閑的線程時再被執行(前3個任務運作時間是一緻的,後兩個延遲了2秒才被執行)。

這裡僅僅是為了示範效果,正常的話手動建立線程池效果會更好,生産環境線程池不允許Executors建立,使用建議使用ThreadPoolExecutor方式建立,避免資源耗盡的風險。

說明:

Executors傳回的線程池對象弊端如下有:

1) FixedThreadPool和SingleThreadPool:允許的請求隊列長度為Integer.MAX_VALUE,可能對堆積大量的請求,進而導緻OOM。

2) CacheThreadPool和ScheduledThreadPool:允許的建立線程數量為Integer.MAX_VALUE,可能建立大量的線程,進而導緻OOM。

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
複制代碼           

使用newCachedThreadPool線程池:

public class ThreadPoolExecutorsChallenge {
    
    public static void main(String[] args) {
        ExecutorService fixedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            final int index = i;
            fixedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
                        System.out.println("運作時間: " + sdf.format(new Date()) + " " + index);
                        Thread.sleep(2000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}
複制代碼           

運作結果:

運作時間: 23:39:04 3
運作時間: 23:39:04 1
運作時間: 23:39:04 2
運作時間: 23:39:04 0
運作時間: 23:39:04 4
複制代碼           

因為這種線程有新的任務送出,就會建立新的線程(線程池中沒有空閑線程時),不需要等待,是以送出的5個任務的運作時間是一樣的,通過Executors.newCachedThreadPool()建立線程池可能會建立大量的線程,導緻OOM。

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
複制代碼           

從構造方法可以看出,它建立了一個單線程化的線程池,它隻會用唯一的工作線程來執行任務,保證所有任務按照指定順序執行。 那麼,如何使用newSingleThreadExecutor呢?我們來舉個例子:

public class ThreadPoolExecutorsChallenge {

    public static void main(String[] args) {
        ExecutorService fixedThreadPool = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            final int index = i;
            fixedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
                        System.out.println("運作時間: " + sdf.format(new Date()) + " " + index);
                        Thread.sleep(2000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}
複制代碼           

因為該線程池類似于單線程執行,是以先執行完前一個任務後,每隔2秒,再順序執行下一個任務, 運作結果如下:

運作時間: 23:47:05 0
運作時間: 23:47:07 1
運作時間: 23:47:09 2
運作時間: 23:47:11 3
運作時間: 23:47:13 4
複制代碼           

newScheduledThreadPool

這個方法建立了一個固定大小的線程池,支援定時及周期性任務執行。 首先看一下定時執行的例子:

public class ThreadPoolExecutorsChallenge {

    public static void main(String[] args) {
        final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");

        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
        System.out.println("任務送出時間:" + sdf.format(new Date()));
        scheduledThreadPool.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("任務運作時間:" + sdf.format(new Date()));
            }
        }, 3, TimeUnit.SECONDS);
        scheduledThreadPool.shutdown();
    }
}
複制代碼           

使用該線程池的schedule方法,延遲3秒鐘後執行任務,運作結果如下:

任務送出時間:23:54:22
任務運作時間:23:54:25
複制代碼           

同時使用newScheduledThreadPool可以實作周期執行的例子,實作延遲1秒後每個三秒執行一次任務:

public class ThreadPoolExecutorsChallenge {

    public static void main(String[] args) {
        final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
        System.out.println("送出時間: " + sdf.format(new Date()));
        scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("運作時間: " + sdf.format(new Date()));
            }
        }, 1, 3, TimeUnit.SECONDS);
    }
}
複制代碼           

運作結果:

送出時間: 00:03:15
運作時間: 00:03:16
運作時間: 00:03:19
運作時間: 00:03:22
運作時間: 00:03:25
運作時間: 00:03:28
複制代碼           

四、推薦

避免耗盡的風險,推薦建立線程池方式:

//擷取系統處理器個數,作為線程池數量
int nThreads = Runtime.getRuntime().availableProcessors();
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
        .setNameFormat("demo-pool-%d").build();
System.out.println("系統處理器個數:" + nThreads);

ExecutorService pool = new ThreadPoolExecutor(nThreads , 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
複制代碼           

繼續閱讀