天天看點

Java之ThreadPoolExcutor和四種常見的線程池(2)

三、4類常見的線程池

newCachedThreadPool建立一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則建立線程。

newFixedThreadPool 建立一個定長線程池,可控制線程最大并發數,超出的線程會在隊列中等待。

newScheduledThreadPool 建立一個定長線程池,支援定時及周期性任務執行。

newSingleThreadExecutor 建立一個單線程化的線程池,它隻會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。

其實他們都是通過ThreadPoolExcutors建立的

1.CachedThreadPool

由Executors的newCachedThreadPool方法建立,不存在核心線程,隻存在數量不定的非核心線程,而且其數量最大值為Integer.MAX_VALUE。當線程池中的線程都處于活動時(全滿),線程池會建立新的線程來處理新的任務,否則就會利用新的線程來處理新的任務,線程池中的空閑線程都有逾時機制,預設逾時時長為60s,超過60s的空閑線程就會被回收。和FixedThreadPool不同的是,CachedThreadPool的任務隊列其實相當于一個空的集合,這将導緻任何任務都會被執行,因為在這種場景下SynchronousQueue是不能插入任務的,SynchronousQueue是一個特殊的隊列,在很多情況下可以了解為一個無法儲存元素的隊列。從CachedThreadPool的特性看,這類線程比較适合執行大量耗時較小的任務。當整個線程池都處于閑置狀态時,線程池中的線程都會因為逾時而被停止回收,幾乎是不占任何系統資源。實作方式如下:

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}      

2.FixedThreadPool

由Executors的newFixedThreadPool方法建立。它是一種線程數量固定的線程池,當線程處于空閑狀态時,他們并不會被回收,除非線程池被關閉。當所有的線程都處于活動狀态時,新的任務都會處于等待狀态,直到有線程空閑出來。FixedThreadPool隻有核心線程,且該核心線程都不會被回收,這意味着它可以更快地響應外界的請求。jdk實作如下:FixedThreadPool沒有額外線程,隻存在核心線程,而且核心線程沒有逾時機制,而且任務隊列沒有長度的限制。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}      

3.ScheduledThreadPool

通過Executors的newScheduledThreadPool方式建立,核心線程數量是固定的,而非核心線程是沒有限制的,并且當非核心線程閑置時它會被立即回收,ScheduledThreadPool這類線程池主要用于執行定時任務和具有固定時期的重複任務,實作方法如下:

public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), threadFactory);
}      

4.SingleThreadPool

通過Executors的newSingleThreadExecutor方法來建立。這類線程池内部隻有一個核心線程,它確定所有的任務都在同一個線程中按順序執行。SingleThreadExecutor的意義在于統一所有外界任務一個線程中,這使得這些任務之間不需要處理線程同步的問題,實作方式如下:

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}      

四、為什麼《阿裡巴巴Java開發手冊》上要禁止使用Executors來建立線程池

Executors建立出來的線程池使用的全都是無界隊列,而使用無界隊列會帶來很多弊端,最重要的就是,它可以無限儲存任務,是以很有可能造成OOM異常。同時在某些類型的線程池裡面,使用無界隊列還會導緻maxinumPoolSize、keepAliveTime、handler等參數失效

五、我們用代碼測試下

import java.util.*;
import java.lang.*;
import java.io.Serializable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
 
class Rextester
{  
    private static int produceTaskSleepTime = 5;
    private static int consumeTaskSleepTime = 1000;
    private static int taskMaxNumber = 10; //定義最大添加10個線程到線程池中
    public static void main(String args[]) {
        
        //構造一個線程池
        //該政策默默的丢棄無法處理的任務,不予任何處理。
        ThreadPoolExecutor threadPool = getTpe("DiscardPolicy");
        //該政策将丢棄最老的一個請求,也就是即将被執行的任務,并嘗試再次送出目前任務。
        //ThreadPoolExecutor threadPool = getTpe("DiscardOldestPolicy");
        for (int i = 1; i <= taskMaxNumber; i++) {
            try {
                //添加任務到線程池,我們要知道添加的任務是Runnable
                String value = "value is: " + i;
                System.out.println("put:" + value);
                threadPool.execute(new ThreadPoolTask(value));
                //線程休息一段時間,友善我們分析結果
                Thread. sleep(produceTaskSleepTime);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
 
   /**
    * 線程池執行的任務
    */
    public static class ThreadPoolTask implements Runnable {
    //儲存任務所需要的資料
        private String value;
        public ThreadPoolTask(String value) {
            this.value = value;
        }
        public void run() {
            //列印語句
            System. out.println("start------" + value);
            try {
                Thread. sleep(consumeTaskSleepTime);
            } catch (Exception e) {
                e.printStackTrace();
            }
            value = "";
        }
    }
 
   /**
    *初始化線程池 
    */
    public static ThreadPoolExecutor getTpe(String type) {
        if ("DiscardOldestPolicy".equals(type)) {
            return  new ThreadPoolExecutor(2, 4, 3,
                    TimeUnit. SECONDS, new ArrayBlockingQueue<Runnable>(3),
                    new ThreadPoolExecutor.DiscardOldestPolicy()); 
        } else {
            return  new ThreadPoolExecutor(2, 4, 3,
                    TimeUnit. SECONDS, new ArrayBlockingQueue<Runnable>(3),
                    new ThreadPoolExecutor.DiscardPolicy());    
        }
        //DiscardPolicy DiscardOldestPolicy
    }
}      

運作結果:

put:value is: 1
start------value is: 1
put:value is: 2
start------value is: 2
put:value is: 3
put:value is: 4
put:value is: 5
put:value is: 6
start------value is: 6
put:value is: 7
start------value is: 7
put:value is: 8
put:value is: 9
put:value is: 10
start------value is: 3
start------value is: 4
start------value is: 5      

我們把上面的生成線程池的代碼不//

ThreadPoolExecutor threadPool = getTpe("DiscardOldestPolicy");      

運作結果如下

put:value is: 1
start------value is: 1
put:value is: 2
start------value is: 2
put:value is: 3
put:value is: 4
put:value is: 5
put:value is: 6
start------value is: 6
put:value is: 7
start------value is: 7
put:value is: 8
put:value is: 9
put:value is: 10
start------value is: 8
start------value is: 9
start------value is: 10