天天看點

了解Java線程池ThreadPoolExecutor

在Java的線程池的使用會有比較多的地方,有比較多的應用場景,介紹一下Java線程池ThreadPoolExecutor。

線程是一個作業系統概念。作業系統負責這個線程的建立、挂起、運作、阻塞和終結操作。而作業系統建立線程、切換線程狀态、終結線程都要進行CPU排程----這是一個耗費時間和系統資源的事情。

大多數實際場景中是這樣的:處理某一次請求的時間是非常短暫的,但是請求數量是巨大的。這種背景下,如果我們為每一個請求都單獨建立一個線程,那麼實體機的所有資源基本上都被作業系統建立線程、切換線程狀态、銷毀線程這些操作所占用,用于業務請求處理的資源反而減少了。是以最理想的處理方式是,将處理請求的線程數量控制在一個範圍,既保證後續的請求不會等待太長時間,又保證實體機将足夠的資源用于請求處理本身。

1.ThreadPoolExecutor類

二話不多說,來看一下ThreadPoolExecutor類的具體實作源碼。

繼承抽象類AbstractExecutorService,它實作了ExecutorService 接口

public class ThreadPoolExecutor extends AbstractExecutorService{····}           
public abstract class AbstractExecutorService implements ExecutorService {····}           
public interface ExecutorService extends Executor {····}           
public interface Executor {····}           

ThreadPoolExecutor類中提供了四個構造方法:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }           
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }           
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }           
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }           

構造函數中需要傳入的參數包括corePoolSize、maximumPoolSize、keepAliveTime、timeUnit、workQueue、threadFactory和handler。

corePoolSize:線程池主要用于執行任務的是核心線程數量。預設情況下,在建立了線程池後,線程池中的線程數為0,當有任務來之後,就會建立一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中。

非核心線程:設定的大于corePoolSize參數小于maximumPoolSize參數的部分,就是線程池可以臨時建立的“非核心線程”的最大數量。這種情況下如果某個線程沒有運作任何任務,在等待keepAliveTime時間後,這個線程将會被銷毀,直到線程池的線程數量重新達到corePoolSize。

maximumPoolSize:目前線程池允許建立的最大線程數量。那麼如果設定的corePoolSize參數和設定的maximumPoolSize參數一緻時,線程池在任何情況下都不會回收空閑線程。keepAliveTime和timeUnit也就失去了意義。

keepAliveTime:線程沒有任務執行時最多保持多久時間會終止。預設情況下,隻有當線程池中的線程數大于corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大于corePoolSize,即當線程池中的線程數大于corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,線上程池中的線程數不大于corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0。

timeUnit:參數keepAliveTime的時間機關。

workQueue:一個阻塞隊列,用來存儲等待執行的任務。一般來說,這裡的阻塞隊列有以下幾種選擇:ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue。

threadFactory:線程工廠,主要用來建立線程。

handler:表示當拒絕處理任務時的政策。有以下四種取值:

ThreadPoolExecutor.AbortPolicy:丢棄任務并抛出RejectedExecutionException異常

ThreadPoolExecutor.DiscardPolicy:也是丢棄任務,但是不抛出異常

ThreadPoolExecutor.DiscardOldestPolicy:丢棄隊列最前面的任務,然後重新嘗試執行任務(重複此過程)

ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

2.線程池實作原理

(1)任務的執行

先來看一下ThreadPoolExecutor類中其他的一些比較重要成員變量:

private final BlockingQueue<Runnable> workQueue; //任務緩存隊列,用來存放等待執行的任務
private final ReentrantLock mainLock = new ReentrantLock(); //線程池的主要狀态鎖,對線程池狀态(比如線程池大小,runState等)的改變都要使用這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>(); //用來存放工作集
private volatile long  keepAliveTime; //線程存活時間   
private volatile boolean allowCoreThreadTimeOut; //是否允許為核心線程設定存活時間
private volatile int   corePoolSize;  //核心池的大小(即線程池中的線程數目大于這個參數時,送出的任務會被放進任務緩存隊列)
private volatile int   maximumPoolSize;  //線程池最大能容忍的線程數
private volatile int   poolSize; //線程池中目前的線程數
private volatile RejectedExecutionHandler handler; //任務拒絕政策
private volatile ThreadFactory threadFactory; //線程工廠,用來建立線程
private int largestPoolSize; //用來記錄線程池中曾經出現過的最大線程數
private long completedTaskCount; //用來記錄已經執行完畢的任務個數           

在ThreadPoolExecutor類中,最核心的任務送出方法是execute()方法,雖然通過submit也可以送出任務,但是實際上submit方法裡面最終調用的還是execute()方法,是以我們隻需要研究execute()方法的實作原理即可:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }           

·····························································································

<未完待續>