天天看點

線程池

jdk1.7

線程池的原理:

将需要執行的任務放到一個阻塞隊列(當隊列滿時,再往裡放元素就會被阻塞直到隊列不滿,或者,當隊列空時,再從裡拿元素會被阻塞直到隊列不空)中,然後由線程池建立worker線程,從隊列裡拿出任務并執行。

線程池的實作類ThreadPoolExecutor 初始化參數如下:

public ThreadPoolExecutor(int corePoolSize,

                              int maximumPoolSize,

                              long keepAliveTime,

                              TimeUnit unit,

                              BlockingQueue<Runnable> workQueue,

                              ThreadFactory threadFactory,

                              RejectedExecutionHandler handler)

corePoolSize:池中維持的最少的線程數,盡管這些線程是空閑的,類似低水位線,如果線程數超過了這個數,有空閑線程(所謂的空閑線程就是卡在getTask中沒有獲得任務的線程)等到keepAliveTime時間後還是沒有擷取到任務就會退出了,最後剩到corePoolSize個為止,這corePoolSize個core線程會不斷去取任務,如果沒有任務就在getTask中卡住。當然這些core線程也是可以有逾時時間,用allowCoreThreadTimeOut(boolean value)配置,注意是boolean型。

maximumPoolSize:池中維持的最多的線程數,類似高水位線

keepAliveTime:空閑線程的存活時間

workQueue:存放任務的阻塞隊列

threadFactory:建立工作線程的工廠類

handler:當由于隊列滿了或者線程限制導緻execute方法出問題時調用的方法。詳解execute方法中調用的reject方法

主要方法:

public void execute(Runnable command) {

        if (command == null)

            throw new NullPointerException();

        /*

         * Proceed in 3 steps:

         *

         * 1. If fewer than corePoolSize threads are running, try to

         * start a new thread with the given command as its first

         * task.  The call to addWorker atomically checks runState and

         * workerCount, and so prevents false alarms that would add

         * threads when it shouldn't, by returning false.

         * 2. If a task can be successfully queued, then we still need

         * to double-check whether we should have added a thread

         * (because existing ones died since last checking) or that

         * the pool shut down since entry into this method. So we

         * recheck state and if necessary roll back the enqueuing if

         * stopped, or start a new thread if there are none.

         * 3. If we cannot queue task, then we try to add a new

         * thread.  If it fails, we know we are shut down or saturated

         * and so reject the task.

         */

/*    通過workerCountOf擷取目前池中的線程數,如果小于corePoolSize,就建立新的線程,并用其執行這個新加入的command任務

        int c = ctl.get();

        if (workerCountOf(c) < corePoolSize) {

            if (addWorker(command, true))

                return;

            c = ctl.get();

        }

/*檢查線程池是否在運作,如果處于運作狀态(也就是可以用,沒有調用關閉一系列的動作),就把command加入阻塞隊列

        if (isRunning(c) && workQueue.offer(command)) {

            int recheck = ctl.get();

/*如果if成功,由于offer裡有鎖,有可能會卡住,等到它傳回後可能已經過了一段時間,此時極端的情況可能是corePoolSize為0,然後之前的worker線程都完成了任務然後退出,造成該線程池中有那個command任務但無worker線程的情況,是以再檢查一遍線程池是否在運作,如果此時不處于運作狀态(也就是可以用,沒有調用關閉一系列的動作),就把剛加入的command彈出,或者如果線程池處于運作态,但worker線程為0了,要加一個worker線程。

            if (! isRunning(recheck) && remove(command))

                reject(command);

            else if (workerCountOf(recheck) == 0)

                addWorker(null, false);

/*如果if不成功,那有可能是線程池被關了,或者隊列滿了,在這裡調用了addWorker就是想即使沒有加到隊列,但還是建立個worker線程去執行,當然如果線程池關了addWorker會傳回false。是以即使線程池滿了導緻任務無法加入,但還是可以被執行的。

        else if (!addWorker(command, false))

            reject(command);

    }

private boolean addWorker(Runnable firstTask, boolean core) {

        retry:

        for (;;) {

            int c = ctl.get();

            int rs = runStateOf(c);

/*做一些檢查

            // Check if queue empty only if necessary.

            if (rs >= SHUTDOWN &&

                ! (rs == SHUTDOWN &&

                   firstTask == null &&

                   ! workQueue.isEmpty()))

                return false;

            for (;;) {

                int wc = workerCountOf(c);

                if (wc >= CAPACITY ||

                    wc >= (core ? corePoolSize : maximumPoolSize))

                    return false;

/*這裡用cas增加worker線程計數,失敗則在内部for中重試,成功則跳出外部for

                if (compareAndIncrementWorkerCount(c))

                    break retry;

                c = ctl.get();  // Re-read ctl

/*由于前面增加計數有可能會在循環中重試n次,此時會消耗一段時間,有可能線程池的狀态會變,是以要做狀态檢查

                if (runStateOf(c) != rs)

                    continue retry;

                // else CAS failed due to workerCount change; retry inner loop

            }

        boolean workerStarted = false;

        boolean workerAdded = false;

        Worker w = null;

        try {

            final ReentrantLock mainLock = this.mainLock;

/*在這建立新的worker

            w = new Worker(firstTask);

            final Thread t = w.thread;

            if (t != null) {

/*在這加鎖是為了保護workers(hashset),因為可能會用多個線程通路這塊代碼

                mainLock.lock();

                try {

                    // Recheck while holding lock.

                    // Back out on ThreadFactory failure or if

                    // shut down before lock acquired.

                    int c = ctl.get();

                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||

                        (rs == SHUTDOWN && firstTask == null)) {

/*在這檢查一下worker中注冊的線程是不是已經執行了

                        if (t.isAlive()) // precheck that t is startable

                            throw new IllegalThreadStateException();

                        workers.add(w);

                        int s = workers.size();

                        if (s > largestPoolSize)

                            largestPoolSize = s;

                        workerAdded = true;

                    }

                } finally {

                    mainLock.unlock();

                }

                if (workerAdded) {

/*開動worker中的線程

                    t.start();

                    workerStarted = true;

        } finally {

            if (! workerStarted)

                addWorkerFailed(w);

        return workerStarted;

final void runWorker(Worker w) {

        Thread wt = Thread.currentThread();

        Runnable task = w.firstTask;

        w.firstTask = null;

/*在這裡将state置為0,既可以被中斷或者獲得鎖lock(),因為worker初始化是把state置為了-1

        w.unlock(); // allow interrupts

        boolean completedAbruptly = true;

            while (task != null || (task = getTask()) != null) {

/*此處加鎖是為了防止被中斷,interruptIdleWorkers方法回調tryLock(),然後嘗試去中斷線程

                w.lock();

                // If pool is stopping, ensure thread is interrupted;

                // if not, ensure thread is not interrupted.  This

                // requires a recheck in second case to deal with

                // shutdownNow race while clearing interrupt

                if ((runStateAtLeast(ctl.get(), STOP) ||

                     (Thread.interrupted() &&

                      runStateAtLeast(ctl.get(), STOP))) &&

                    !wt.isInterrupted())

                    wt.interrupt();

/*線程池的子類可以覆寫這個方法,如果不覆寫目前是空的

                    beforeExecute(wt, task);

                    Throwable thrown = null;

                    try {

                        task.run();

                    } catch (RuntimeException x) {

                        thrown = x; throw x;

                    } catch (Error x) {

                    } catch (Throwable x) {

                        thrown = x; throw new Error(x);

                    } finally {

                        afterExecute(task, thrown);

                    task = null;

                    w.completedTasks++;

                    w.unlock();

            completedAbruptly = false;

            processWorkerExit(w, completedAbruptly);

private Runnable getTask() {

        boolean timedOut = false; // Did the last poll() time out?

/*此時還是比較關鍵的,因為後面會提到兩個方法一個是shutdown一個是shutdownnow,shutdown方法會會将rs置為SHUTDOWN,這時如果workQueue非空,就不會傳回null,說明worker還是可以獲得任務去做,直到workqueue裡沒有了任務才傳回null,是以shutdown方法是首先在execute裡禁止往workqueue裡加資料,而又會利用現有的worker把任務都執行完。而shutdownnow方法會則是将rs置為STOP,是以在這會傳回null,也就是worker不能再擷取任務了,要退出了,是以shutdownnow方法既是在execute裡禁止往workqueue裡加資料,而且會讓worker不再取得任務而退出。

            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

                decrementWorkerCount();

                return null;

            boolean timed;      // Are workers subject to culling?

/*列出兩種情況需要考慮逾時時間,一種是core線程允許逾時,一種是線程數超出最低水位線

                timed = allowCoreThreadTimeOut || wc > corePoolSize;

/*如果目前worker數沒有超出最高水位線,則開始跳出循環去隊列裡那任務,如果發現超出了最高水位線,或者timedout和timed為true(timed說明需要有worker在空閑keepAliveTime後要退出,timedout為true也就是說擷取任務時逾時了,這個逾時時間是keepAliveTime,就說明這個worker應該退出死掉),這裡就得往下進行去減少一個worker數,然後傳回null,就是傳回沒有任務,那worker就執行完退出了

                if (wc <= maximumPoolSize && ! (timedOut && timed))

                    break;

                if (compareAndDecrementWorkerCount(c))

                    return null;

            try {

/*這裡的poll方法是說隊列裡沒有任務可以等待逾時時間,在沒有就傳回null

這裡的take方法是說隊列裡沒有任務就等待

                Runnable r = timed ?

                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

                    workQueue.take();

                if (r != null)

                    return r;

/*将timedOut設為true,然後繼續在for中循環,timedOut對下一次循環的for是有用的,可以看上面的分析

                timedOut = true;

/*當執行shutdownnow方法時會将中斷标志置位,如果該線程在poll或take除卡住時發現中斷被職位,就會發出InterruptedException異常,進而使其不再等待,然後繼續執行for的代碼,走到       if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) 這行或會退出。

            } catch (InterruptedException retry) {

                timedOut = false;

/*shutdown是關閉所有空閑線程,然後阻止再往隊列裡放任務,但剩下的任務會被剩下的worker都執行完之後worker才會退出

 public void shutdown() {

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

            checkShutdownAccess();

            advanceRunState(SHUTDOWN);

/*中斷空閑的線程,所謂空閑線程就是卡住沒到timeout而且沒有擷取到任務的線程,因為置中斷标記的主要作用是當線程處于類似等待的狀态時能發生異常跳出等待,然後進一步判斷線程池的狀态在做進一步處理,主要展現在gettask中,因為shutdown的狀态加上隊列為空可以使得gettask方法傳回null,使得worker無法獲得任務退出,之是以判斷隊列為空是因為如果不為空該線程也不會卡住無法獲得任務了。

            interruptIdleWorkers();

            onShutdown(); // hook for ScheduledThreadPoolExecutor

            mainLock.unlock();

        tryTerminate();

final void tryTerminate() {

            if (isRunning(c) ||

                runStateAtLeast(c, TIDYING) ||

                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))

            if (workerCountOf(c) != 0) { // Eligible to terminate

                interruptIdleWorkers(ONLY_ONE);

            mainLock.lock();

                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {

                        terminated();

                        ctl.set(ctlOf(TERMINATED, 0));

                        termination.signalAll();

                    return;

            } finally {

                mainLock.unlock();

            // else retry on failed CAS

/*這個awaitTermination一般和shutdown方法一起用,因為shutdown方法是停止空閑的worker,如果有worker還在運作,shutdown不會去幹擾,隻是把線程池狀态置為shutdown,當剩下的worker執行完隊列中的所有任務才退出。

public boolean awaitTermination(long timeout, TimeUnit unit)

        throws InterruptedException {

        long nanos = unit.toNanos(timeout);

                if (runStateAtLeast(ctl.get(), TERMINATED))

                    return true;

                if (nanos <= 0)

                nanos = termination.awaitNanos(nanos);

/*該方法是試圖停止所有worker,如果該worker是空閑的

  public List<Runnable> shutdownNow() {

        List<Runnable> tasks;

            advanceRunState(STOP);

/*中斷所有worker,就是給所有worker中的線程置中斷标記,主要作用是當線程處于類似等待的狀态時能發生異常跳出等待,然後進一步判斷線程池的狀态在做進一步處理,注意展現在gettask方法中,當線程池的狀态為stop就會退出,是的worker無法擷取任務而退出,這個配合上面那句正好是worker退出。

            interruptWorkers();

/*把任務隊列裡的任務放到一個新的arraylist裡并傳回arraylist的引用

            tasks = drainQueue();

        return tasks;

private void processWorkerExit(Worker w, boolean completedAbruptly) {

        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted

            decrementWorkerCount();

            completedTaskCount += w.completedTasks;

            workers.remove(w);

        if (runStateLessThan(c, STOP)) {

            if (!completedAbruptly) {

                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

                if (min == 0 && ! workQueue.isEmpty())

                    min = 1;

                if (workerCountOf(c) >= min)

                    return; // replacement not needed

            addWorker(null, false);

由于建立線程池時配置參數比較多,是以jvm提供了一個工具類executors,裡面有一些靜态方法可以幫助我們建立線程池,如下:

1.newFixedThreadPool:建立固定數量的線程,核心線程數與最大線程數一緻,使用LinkedBlockingQueue存放任務

public static ExecutorService newFixedThreadPool(int nThreads) {

        return new ThreadPoolExecutor(nThreads, nThreads,

                                      0L, TimeUnit.MILLISECONDS,

                                      new LinkedBlockingQueue<Runnable>()); }

2.newSingleThreadExecutor:建立一個線程,核心線程數與最大線程數一緻,使用LinkedBlockingQueue存放任務

public static ExecutorService newSingleThreadExecutor() {

        return new FinalizableDelegatedExecutorService

            (new ThreadPoolExecutor(1, 1,

                                    0L, TimeUnit.MILLISECONDS,

                                    new LinkedBlockingQueue<Runnable>()));

3.newCachedThreadPool:這個方法的特點是core線程為0,最大線程為整數最大值,他使用的是SynchronousQueue

public static ExecutorService newCachedThreadPool() {

        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

                                      60L, TimeUnit.SECONDS,

                                      new SynchronousQueue<Runnable>());

繼續閱讀