天天看點

白話講懂java線程池的使用和源碼

線程池是一個稀缺資源,如果被無限的建立,會消耗系統資源,産生競争死鎖,降低系統效率。對于每個線程執行時間很少的這種場景就很适合讓線程重用。否則可能出現花在系統建立銷毀線程上的時間比線程真正執行的事件還長,消耗的資源還多的情況。

應用場景:

  • 單個任務處理時間比較短
  • 需要處理的任務數量比較大

線程池的優勢:

  1. 重用線程,減少線程建立銷毀的系統消耗,提高性能。
  2. 提高響應速度(不用建立銷毀)
  3. 提高線程的管理性,使線程可以統一的配置設定調休監控

線程池的建立

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
           

任務送出

1、public void execute()  //送出任務無傳回值

2、public Future<?> submit()  //任務執行完成後有傳回值

參數解釋

corePoolSize

線程池中的核心線程數,當送出一個任務時,線程池建立一個新線程執行任務,直到目前線程數等于corePoolSize;如果目前線程數為corePoolSize,繼續送出的任務被儲存到阻塞隊列中,等待被執行;如果執行了線程池的prestartAllCoreThreads()方法,線程池會提前建立并啟動所有核心線程。

maximumPoolSize

線程池中允許的最大線程數。如果目前阻塞隊列滿了,且繼續送出任務,則建立新的線程執行任務,前提是目前線程數小于maximumPoolSize;

keepAliveTime

線程池維護線程所允許的空閑時間。當線程池中的線程數量大于corePoolSize的時候,如果這時沒有新的任務送出,核心線程外的線程不會立即銷毀,而是會等待,直到等待的時間超過了keepAliveTime;

unit

keepAliveTime的機關;

workQueue

用來儲存等待被執行的任務的阻塞隊列,且任務必須實作Runable接口,在JDK中提供了如下阻塞隊列:

1、ArrayBlockingQueue:基于數組結構的有界阻塞隊列,按FIFO排序任務;

2、LinkedBlockingQuene:基于連結清單結構的阻塞隊列,按FIFO排序任務,吞吐量通常要高于ArrayBlockingQuene;

3、SynchronousQuene:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀态,吞吐量通常要高于LinkedBlockingQuene;

4、priorityBlockingQuene:具有優先級的無界阻塞隊列; 

線程池結構:

白話講懂java線程池的使用和源碼

例如我們定義了一個, 這段代碼會建立一個核心5個核心線程,最多10個線程,blocking queue最多放5個。

1. 剛開始的五個線程每次送出會将建立核心線程然後用核心線程來執行他們。

2.第6-10個線程在被送出進來會将他們放到blocking queue裡面去。

3.當blockingqueue也滿了之後會在繼續建立非核心線程來執行。

4.當非核心線程也滿了這時候回開始執行拒絕政策maximumPoolSize - corePoolSize = 非核心線程最大數。

三種阻塞隊列:

    BlockingQueue<Runnable> workQueue = null;

    workQueue = new ArrayBlockingQueue<>(5);//基于數組的先進先出隊列,有界

    workQueue = new LinkedBlockingQueue<>();//基于連結清單的先進先出隊列,無界

    workQueue = new SynchronousQueue<>();//無緩沖的等待隊列,無界

四種拒絕政策:

    RejectedExecutionHandler rejected = null;

    rejected = new ThreadPoolExecutor.AbortPolicy();//預設,隊列滿了丢任務抛出異常

    rejected = new ThreadPoolExecutor.DiscardPolicy();//隊列滿了丢任務不異常

    rejected = new ThreadPoolExecutor.DiscardOldestPolicy();//将最早進入隊列的任務删,之後再嘗試加入隊列

    rejected = new ThreadPoolExecutor.CallerRunsPolicy();//如果添加到線程池失敗,那麼主線程會自己去執行該任務

    自定義拒絕政策,如果以上的幾種拒絕政策都不滿足條件,則可以實作RejectedExceptionHandler接口,并将自己的邏輯寫在rejectedExcepiton方法内。

五種線程池:

    ExecutorService threadPool = null;

    threadPool = Executors.newCachedThreadPool();//有緩沖的線程池,線程數 JVM 控制

    threadPool = Executors.newFixedThreadPool(3);//固定大小的線程池

    threadPool = Executors.newScheduledThreadPool(2);

    threadPool = Executors.newSingleThreadExecutor();//單線程的線程池,隻有一個線程在工作

    threadPool = new ThreadPoolExecutor();//預設線程池,可控制參數比較多   

源代碼分析:

1. 首先了解一下,解源碼中的方法,

workerCountOf(c)
runStateOf(c)
           

如下,代碼在源碼中通過一個int型4位元組一共32位,來存儲線程數量,和線程池狀态兩個資訊。32位的左邊3位來存儲狀态,右邊29位來存儲線程數量。通過CAPACITY的與或運算來快速擷取線程數量和線程狀态。注釋中有例子,

public class Test {
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //c  ======>  01000000000000000011111000000000
    //          & 11100000000000000000000000000000
    //          = 01000000000000000000000000000000 
    //     equals 01000000000000000000000000000000 -> TIDYING
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        System.out.println(COUNT_BITS);
        System.out.println(Integer.toBinaryString(CAPACITY));   //00011111111111111111111111111111
        System.out.println(Integer.toBinaryString(~CAPACITY));  //11100000000000000000000000000000
        System.out.println(Integer.toBinaryString(RUNNING));    //11100000000000000000000000000000
        System.out.println(Integer.toBinaryString(SHUTDOWN));   //00000000000000000000000000000000
        System.out.println(Integer.toBinaryString(STOP));       //00100000000000000000000000000000
        System.out.println(Integer.toBinaryString(TIDYING));    //01000000000000000000000000000000
        System.out.println(Integer.toBinaryString(TERMINATED)); //01100000000000000000000000000000
    }
           

2. 源碼分析

第一步:源碼入口: ThreadPoolExecutor.execute(Runnable command)

1. 如果目前線程數小于核心線程池數,直接建立核心線程。

2. 如果目前線程數大于等于核心線程池數,但是Queue裡可以加入線程。

3.直接啟動非核心線程執行。

4.啟動非核心線程失敗,執行拒絕政策。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
           

第二步:進入第一步分支1. -- addWorker(command, true)

注意,該方法中用到大量的自旋,如下,其原因是該方法中有用到CAS操作。compareAndIncrementWorkerCount(c),因為cas操作時操作完的結果值與預期結果不一緻時需要重新操作。就是在緩存一緻性協定中的I(Invalid)值,因為在大量并發的操作中不能保證自己的操作一定能成功,是以需要重試。

for (;;) {
...
}
           

1. 目前線程狀态必須是running。

2.重新檢查目前線程池中線程數。如果大于coresize或maxcoresize退出。

3.目前線程池的線程計數器+1.

4.new 一個 new Worker(firstTask)。

5.加鎖,添加啟動Worker過程中不能多個線程同時執行。

6. workers.add(w)。

7.啟動worker的run方法。

...
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
   firstTask == null &&
   ! workQueue.isEmpty())) //1
return false;
...
if (wc >= CAPACITY ||
	wc >= (core ? corePoolSize : maximumPoolSize))//2
	return false;
if (compareAndIncrementWorkerCount(c))//3
...
w = new Worker(firstTask);//4
final Thread t = w.thread;
...
final ReentrantLock mainLock = this.mainLock;//5
mainLock.lock();
...
	workers.add(w);//6
...
if (workerAdded) {
	t.start();//7
...
           
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
           

第三步,進入第二步方法7 -- t.start().

在第二步的代碼中,對t進行了如下指派:

w = new Worker(firstTask);
final Thread t = w.thread;
           

在Worker的構造方法中又有如下thread指派,this.firstTask = firstTask; 這裡的firstTask就是第一步中的command,我們自己定義的線程。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable
Worker(Runnable firstTask) {
    setState(-1);
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this); 
    //w.thread的指派是new Thread(this)其中this就是Worker本身.
}
           

是以t.start(),調用的就是Worker的run方法:

public void run() {
    runWorker(this);
}
           

進入runWorker(this)方法,該方法有以下步驟:

1.  取出使用者自己定義的任務 Runnable task = w.firstTask;

2. 判斷該任務是否為空。如果不為空則取出該任務。

3..如果任務為空,則調用getTasks取出一個任務。

4. 調用該任務的run方法,及開始執行使用者自己用的線程。

Thread wt = Thread.currentThread();
Runnable task = w.firstTask;//1
...
try {
while (task != null || (task = getTask()) != null) {//2,3
...
task.run();//4
...
           

第四步,getTask().

1. check queue 是否為 空。如果為空傳回null。

2.從queue中取出一個元素,如果在keepAliveTime事件内能夠取出則用poll如果不能就用take一直阻塞等着取。

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();
           
...
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //1
	decrementWorkerCount();
	return null;
}
...
try {
Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS):workQueue.take(); //2
...