天天看點

線程池剖析(一)

我們借用JDK自帶的線程池ThreadPoolExecutor講解線程池思想,打算分三個主題來講:

  1. corePoolSize、maximumPoolSize與workQueue
  2. ThreadFactory
  3. RejectedExecutionHandler

本文先講第一個主題。

1、corePoolSize、maximumPoolSize與workQueue

線程池的運作過程:

線程池剖析(一)

為了友善測試的時候,了解線程的調用情況,是以改寫了ThreadFactory,線程名稱字首為THREAD-POOL-,按序号遞增,這樣我們就能清楚看到線程調用的是核心線程還是拓展線程。由于JDK線程池是先建立核心線程,後面有需要再建立拓展線程,是以index <= corePoolSize的都是為核心線程,其他的則為拓展線程。

public class AsyncThreadFactory implements ThreadFactory {

    private final AtomicInteger index = new AtomicInteger(1);

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, String.format("THREAD-POOL-%s", index.getAndIncrement()));
    }
}
           

測試代碼:

public class AsyncUtilMain {

    /**
     * 核心線程數
     */
    private static final int CORE_POOL_SIZE = 10;
    /**
     * 最大線程數
     */
    private static final int MAX_POOL_SIZE = 100;
    /**
     * 阻塞隊列長度
     */
    private static final int QUEUE_SIZE = 10;
    /**
     * 需求線程數
     */
    private static final int REQUIREMENT_POOL_SIZE = 10;


    public static void main(String[] args) {
        
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, 60, TimeUnit.MINUTES, new LinkedBlockingQueue<>(QUEUE_SIZE),
                new AsyncThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

        for (int index = 0; index < REQUIREMENT_POOL_SIZE; index++) {
            int value = index;
            threadPoolExecutor.submit(() -> System.out.println(Thread.currentThread().getName() + ":" + value));
        }

        threadPoolExecutor.shutdown();
    }
}
           

根據上述流程圖,線程池的運作情況會有以下四種:

1、corePoolSize >= 需求線程數

10個核心線程數, 90個可拓展線程數, 阻塞隊列數10個

需求線程數為10個, 是以所有的線程均可被核心線程數調用

是以運作結果應該為1~10号線程(核心線程)完成整個過程

運作結果:

THREAD-POOL-2:1

THREAD-POOL-4:3

THREAD-POOL-3:2

THREAD-POOL-1:0

THREAD-POOL-5:4

THREAD-POOL-6:5

THREAD-POOL-7:6

THREAD-POOL-8:7

THREAD-POOL-9:8

THREAD-POOL-10:9

2、corePoolSize + workQueueSize >= 需求線程數

5個核心線程數, 5個可拓展線程數, 阻塞隊列長度為5

需求線程數為10個, 是以5個核心線程會馬上消費5個需求線程,剩下的5個會在阻塞隊列中等待5個核心線程消費。

是以運作結果應該為1~5号線程(核心線程)完成整個過程

配置代碼:

/**
     * 核心線程數
     */
    private static final int CORE_POOL_SIZE = 5;
    /**
     * 最大線程數
     */
    private static final int MAX_POOL_SIZE = 10;
    /**
     * 阻塞隊列長度
     */
    private static final int QUEUE_SIZE = 5;
    /**
     * 需求線程數
     */
    private static final int REQUIREMENT_POOL_SIZE = 10;
           

運作結果:

THREAD-POOL-3:2

THREAD-POOL-4:3

THREAD-POOL-1:0

THREAD-POOL-2:1

THREAD-POOL-1:7

THREAD-POOL-4:6

THREAD-POOL-5:4

THREAD-POOL-3:5

THREAD-POOL-1:9

THREAD-POOL-2:8

3、corePoolSize + workQueueSize < 需求線程數

5個核心線程數, 5個可拓展線程數, 阻塞隊列長度為3

需求線程數為10個, 是以5個核心線程會馬上消費5個需求線程,剩下的3個會在阻塞由于阻塞隊列溢出,那麼剩下的兩個需求線程将會觸發可拓展線程,是以實際運作的線程應該為1~5号線程(核心線程)和 6 ~ 7号線程(可拓展線程)

配置代碼:

/**
     * 核心線程數
     */
    private static final int CORE_POOL_SIZE = 5;
    /**
     * 最大線程數
     */
    private static final int MAX_POOL_SIZE = 10;
    /**
     * 阻塞隊列長度
     */
    private static final int QUEUE_SIZE = 3;
    /**
     * 需求線程數
     */
    private static final int REQUIREMENT_POOL_SIZE = 10;
           

運作結果:

THREAD-POOL-1:0

THREAD-POOL-6:8

THREAD-POOL-1:5

THREAD-POOL-6:6

THREAD-POOL-5:4

THREAD-POOL-2:1

THREAD-POOL-4:3

THREAD-POOL-7:9

THREAD-POOL-3:2

THREAD-POOL-1:7

4、maximumPoolSize+ workQueueSize + < 需求線程數

這種情況是最大線程數和阻塞隊列都不足以滿足需求線程數,那麼這時就會觸發線程池配置的拒絕政策:

配置代碼:

/**
     * 核心線程數
     */
    private static final int CORE_POOL_SIZE = 5;
    /**
     * 最大線程數
     */
    private static final int MAX_POOL_SIZE = 10;
    /**
     * 阻塞隊列長度
     */
    private static final int QUEUE_SIZE = 3;
    /**
     * 需求線程數
     */
    private static final int REQUIREMENT_POOL_SIZE = 15;
           

運作結果:

THREAD-POOL-10:12

THREAD-POOL-5:4

THREAD-POOL-4:3

Exception in thread “main” THREAD-POOL-5:5

THREAD-POOL-6:8

THREAD-POOL-7:9

THREAD-POOL-9:11

THREAD-POOL-8:10

THREAD-POOL-2:1

THREAD-POOL-10:7

THREAD-POOL-4:6

THREAD-POOL-1:0

THREAD-POOL-3:2

java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 10, active threads = 8, queued tasks = 1, completed tasks = 2]

at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)

at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)

at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)

at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)

at cn.gaaidou.web.server.socket.AsyncUtilMain.main(AsyncUtilMain.java:40)

由于代碼配置的拒絕政策是ThreadPoolExecutor.AbortPolicy,是以線程池直接抛出異常。