天天看點

ExecutorService 線程池簡介

1. Executors 建立線程池的方法

      newFixedThreadPool() 方法,該方法傳回一個固定數量的線程池,該方法的線程數始終不變,當有一個任務送出時,若線程池中空閑,則立即執行,若沒有,則會被暫緩在一個任務隊列中等待有空閑的線程去執行。适用場景:可用于Web服務瞬時削峰,但需注意長時間持續高峰情況造成的隊列阻塞。

      newSingleThreadExecutor() 方法, 建立-一個線程的線程池,若空閑則執行,若沒有空閑線程則暫緩在任務列隊中。适用場景:快速處理大量耗時較短的任務,如Netty的NIO接受請求時,可使用CachedThreadPool。

      newCachedThreadPool() 方法, 傳回一個可根據實際情況調整線程個數的線程池,不限制最大線程數量,若用空閑的線程則執行任務,若無任務則不建立線程。并且每一個空閑線程會在60秒後自動回收。

      newScheduledThreadPool() 方法,該方法傳回一個SchededExecutorService對象,但該線程池可以指定線程的數量。

檢視源碼可知,上面的 四種方法其實都是執行個體化的

ThreadPoolExecutor 對象
           

2.自定義線程池

既然我們都知道,上面的四種方法實際上就是 new ThreadPoolExecutor() 來的,是以當上面的四種方法滿足不了我們的需求的時候,我們可以自定義線程池

ExecutorService pool = new ThreadPoolExecutor(
        1,  // 同時執行的線程數
        2,  // 最大線程數
        0L,  // 線程存活時間
         TimeUnit.SECONDS,  // 線程存活時間機關
         new ArrayBlockingQueue<>(3),  // 用什麼隊列
         new MyThreadFactory(), // 線程工廠可以自定義也可以不寫
         new MyPolicy() // 拒絕政策,就是線程池滿了存不進去的時候怎麼辦,
                        // 這個一般在使用有界隊列的時候才會有這種問題
);
           
ExecutorService 線程池簡介

自定義線程池的關鍵在于用什麼隊列

A.有界隊列,比如說  ArrayBlockingQueue

        在使用有界隊列時,若有新的任務需要執行,如果線程池實際線程數小于corePoolsize,則優先建立線程,若大于corePoolsize, 則會将任務加入隊列,若隊列已滿,則在總線程數不大于maximumPoolsize的前提下,建立新的線程,若線程數大于maximumPoolSize,則執行拒絕政策。或其他自定義方式。

B.無界隊列,比如說 LinkedBlockingQueue

       與有界隊列相比,除非系統資源耗盡,否則無界的任務隊列不存在任務入隊失敗的情況。當有新任務到來,系統的線程數小于corePoolsize時,則建立線程執行任務。當達到corePoolsize後,就不會繼續增加。若後續仍有新的任務加入,而有沒有空閑的線程資源,則任務直接進入隊列等待。若任務建立和處理的速度差異很大,無界隊列會保持快速增長,直到耗盡系統記憶體。

C. 拒絕政策,jdk自帶四種,也可以自定義,實作  RejectedExecutionHandler 接口,然後傳入既可

  • AbortPolicy:直接抛出異常,系統正常工作
  • CallerRunsPolicy:隻要線程池未關閉,該政策直接在調用者線程中,運作目前被丢棄的任務。
  • DiscardOldestPolicy:丢棄最老的一個請求,嘗試再次送出目前任務。
  • DiscardPolicy:丢棄無法處理的任務,不給予任何處理。

D.ThreadFactory 工廠,可以自定義,實作  ThreadFactory 接口,重寫  newThread 方法即可

代碼示例:

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class Main2 {

    public static void main(String[] args) {

        ExecutorService pool = new ThreadPoolExecutor(
                1,  // 同時執行的線程數
                2,  // 最大線程數
                0L,  // 線程存活時間
                TimeUnit.SECONDS,  // 線程存活時間機關
                new ArrayBlockingQueue<>(3),  // 用什麼隊列
                new MyThreadFactory(), // 線程工廠可以自定義也可以不寫
                new MyPolicy() // 拒絕政策,就是線程池滿了存不進去的時候怎麼辦,
                // 這個一般在使用有界隊列的時候才會有這種問題
        );

        Task t1 = new Task("1", "zhangzq");
        Task t2 = new Task("2", "licm");
        Task t3 = new Task("3", "zhangjy");
        Task t4 = new Task("4", "youy");
        Task t5 = new Task("5", "zhuyf");
        Task t6 = new Task("6", "lixw");

        pool.execute(t1);
        pool.execute(t2);
        pool.execute(t3);
        pool.execute(t4);
        pool.execute(t5);
        pool.execute(t6);

        pool.shutdown();
    }

}
class Task implements Runnable{

    private String id;
    private String name;

    public Task(String id, String name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public String toString() {
        return "Task{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                '}';
    }

    @Override
    public void run() {
        try {
            System.out.println( this.toString() );
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class MyThreadFactory implements ThreadFactory {

    private final AtomicInteger num = new AtomicInteger(1);

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r ,"zzq-Thread-"+ num.getAndIncrement() );
        System.out.println( t.getName() + " has been create... " );
        return t;
    }
}

class MyPolicy implements RejectedExecutionHandler{

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println( "該線程添加失敗..."+r.toString() );
    }
}
           
ExecutorService 線程池簡介

繼續閱讀