天天看點

【高并發】深度解析線程池中那些重要的頂層接口和抽象類

大家好,我是冰河~~

在上一篇《

高并發之——不得不說的線程池與ThreadPoolExecutor類淺析

》一文中,從整體上介紹了Java的線程池。如果細細品味線程池的底層源碼實作,你會發現整個線程池體系的設計是非常優雅的!這些代碼的設計值得我們去細細品味和研究,從中學習優雅代碼的設計規範,形成自己的設計思想,為我所用!哈哈,說多了,接下來,我們就來看看線程池中那些非常重要的接口和抽象類,深度分析下線程池中是如何将抽象這一思想運用的淋漓盡緻的!

通過對線程池中接口和抽象類的分析,你會發現,整個線程池設計的是如此的優雅和強大,從線程池的代碼設計中,我們學到的不隻是代碼而已!!

題外話:膜拜Java大神Doug Lea,Java中的并發包正是這位老爺子寫的,他是這個世界上對Java影響力最大的一個人。

一、接口和抽象類總覽

說起線程池中提供的重要的接口和抽象類,基本上就是如下圖所示的接口和類。

【高并發】深度解析線程池中那些重要的頂層接口和抽象類

接口與類的簡單說明:

  • Executor接口:這個接口也是整個線程池中最頂層的接口,提供了一個無傳回值的送出任務的方法。
  • ExecutorService接口:派生自Executor接口,擴充了很過功能,例如關閉線程池,送出任務并傳回結果資料、喚醒線程池中的任務等。
  • AbstractExecutorService抽象類:派生自ExecutorService接口,實作了幾個非常實作的方法,供子類進行調用。
  • ScheduledExecutorService定時任務接口,派生自ExecutorService接口,擁有ExecutorService接口定義的全部方法,并擴充了定時任務相關的方法。

接下來,我們就分别從源碼角度來看下這些接口和抽象類從頂層設計上提供了哪些功能。

二、Executor接口

Executor接口的源碼如下所示。

public interface Executor {
    //送出運作任務,參數為Runnable接口對象,無傳回值
    void execute(Runnable command);
}           

從源碼可以看出,Executor接口非常簡單,隻提供了一個無傳回值的送出任務的execute(Runnable)方法。

由于這個接口過于簡單,我們無法得知線程池的執行結果資料,如果我們不再使用線程池,也無法通過Executor接口來關閉線程池。此時,我們就需要ExecutorService接口的支援了。

三、ExecutorService接口

ExecutorService接口是非定時任務類線程池的核心接口,通過ExecutorService接口能夠向線程池中送出任務(支援有傳回結果和無傳回結果兩種方式)、關閉線程池、喚醒線程池中的任務等。ExecutorService接口的源碼如下所示。

package java.util.concurrent;
import java.util.List;
import java.util.Collection;
public interface ExecutorService extends Executor {

    //關閉線程池,線程池中不再接受新送出的任務,但是之前送出的任務繼續運作,直到完成
    void shutdown();
    
    //關閉線程池,線程池中不再接受新送出的任務,會嘗試停止線程池中正在執行的任務。
    List<Runnable> shutdownNow();
    
    //判斷線程池是否已經關閉
    boolean isShutdown();
    
    //判斷線程池中的所有任務是否結束,隻有在調用shutdown或者shutdownNow方法之後調用此方法才會傳回true。
    boolean isTerminated();

    //等待線程池中的所有任務執行結束,并設定逾時時間
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    
    //送出一個Callable接口類型的任務,傳回一個Future類型的結果
    <T> Future<T> submit(Callable<T> task);
    
    //送出一個Callable接口類型的任務,并且給定一個泛型類型的接收結果資料參數,傳回一個Future類型的結果
    <T> Future<T> submit(Runnable task, T result);

    //送出一個Runnable接口類型的任務,傳回一個Future類型的結果
    Future<?> submit(Runnable task);

    //批量送出任務并獲得他們的future,Task清單與Future清單一一對應
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    
    //批量送出任務并獲得他們的future,并限定處理所有任務的時間
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit) throws InterruptedException;
    
    //批量送出任務并獲得一個已經成功執行的任務的結果
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException; 
    
    //批量送出任務并獲得一個已經成功執行的任務的結果,并限定處理任務的時間
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}           

關于ExecutorService接口中每個方法的含義,直接上述接口源碼中的注釋即可,這些接口方法都比較簡單,我就不一一重複列舉描述了。這個接口也是我們在使用非定時任務類的線程池中最常使用的接口。

四、AbstractExecutorService抽象類

AbstractExecutorService類是一個抽象類,派生自ExecutorService接口,在其基礎上實作了幾個比較實用的方法,提供給子類進行調用。我們還是來看下AbstractExecutorService類的源碼。

注意:大家可以到java.util.concurrent包下檢視完整的AbstractExecutorService類的源碼,這裡,我将AbstractExecutorService源碼進行拆解,詳解每個方法的作用。

  • newTaskFor方法
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}           

RunnableFuture類用于擷取執行結果,在實際使用時,我們經常使用的是它的子類FutureTask,newTaskFor方法的作用就是将任務封裝成FutureTask對象,後續将FutureTask對象送出到線程池。

  • doInvokeAny方法
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                          boolean timed, long nanos)
    throws InterruptedException, ExecutionException, TimeoutException {
    //送出的任務為空,抛出空指針異常
    if (tasks == null)
        throw new NullPointerException();
    //記錄待執行的任務的剩餘數量
    int ntasks = tasks.size();
    //任務集合中的資料為空,抛出非法參數異常
    if (ntasks == 0)
        throw new IllegalArgumentException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
    //以目前執行個體對象作為參數建構ExecutorCompletionService對象
    // ExecutorCompletionService負責執行任務,後面調用用poll傳回第一個執行結果
    ExecutorCompletionService<T> ecs =
        new ExecutorCompletionService<T>(this);

    try {
        // 記錄可能抛出的執行異常
        ExecutionException ee = null;
        // 初始化逾時時間
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Iterator<? extends Callable<T>> it = tasks.iterator();
    
        //送出任務,并将傳回的結果資料添加到futures集合中
        //送出一個任務主要是確定在進入循環之前開始一個任務
        futures.add(ecs.submit(it.next()));
        --ntasks;
        //記錄正在執行的任務數量
        int active = 1;

        for (;;) {
            //從完成任務的BlockingQueue隊列中擷取并移除下一個将要完成的任務的結果。
            //如果BlockingQueue隊列中中的資料為空,則傳回null
            //這裡的poll()方法是非阻塞方法
            Future<T> f = ecs.poll();
            //擷取的結果為空
            if (f == null) {
                //集合中仍有未執行的任務數量
                if (ntasks > 0) {
                    //未執行的任務數量減1
                    --ntasks;
                    //送出完成并将結果添加到futures集合中
                    futures.add(ecs.submit(it.next()));
                    //正在執行的任務數量加•1
                    ++active;
                }
                //所有任務執行完成,并且傳回了結果資料,則退出循環
                //之是以處理active為0的情況,是因為poll()方法是非阻塞方法,可能導緻未傳回結果時active為0
                else if (active == 0)
                    break;
                //如果timed為true,則執行擷取結果資料時設定逾時時間,也就是逾時擷取結果表示
                else if (timed) {    
                    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                    if (f == null)
                        throw new TimeoutException();
                    nanos = deadline - System.nanoTime();
                }
                //沒有設定逾時,并且所有任務都被送出了,則一直阻塞,直到傳回一個執行結果
                else
                    f = ecs.take();
            }
            //擷取到執行結果,則将正在執行的任務減1,從Future中擷取結果并傳回
            if (f != null) {
                --active;
                try {
                    return f.get();
                } catch (ExecutionException eex) {
                    ee = eex;
                } catch (RuntimeException rex) {
                    ee = new ExecutionException(rex);
                }
            }
        }

        if (ee == null)
            ee = new ExecutionException();
        throw ee;

    } finally {
        //如果從所有執行的任務中擷取到一個結果資料,則取消所有執行的任務,不再向下執行
        for (int i = 0, size = futures.size(); i < size; i++)
            futures.get(i).cancel(true);
    }
}           

這個方法是批量執行線程池的任務,最終傳回一個結果資料的核心方法,通過源代碼的分析,我們可以發現,這個方法隻要擷取到一個結果資料,就會取消線程池中所有運作的任務,并将結果資料傳回。這就好比是很多要進入一個居民小區一樣,隻要有一個人有門禁卡,門衛就不再檢查其他人是否有門禁卡,直接放行。

在上述代碼中,我們看到送出任務使用的ExecutorCompletionService對象的submit方法,我們再來看下ExecutorCompletionService類中的submit方法,如下所示。

public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture(f));
    return f;
}

public Future<V> submit(Runnable task, V result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task, result);
    executor.execute(new QueueingFuture(f));
    return f;
}           

可以看到,ExecutorCompletionService類中的submit方法本質上調用的還是Executor接口的execute方法。

  • invokeAny方法
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException {
    try {
        return doInvokeAny(tasks, false, 0);
    } catch (TimeoutException cannotHappen) {
        assert false;
        return null;
    }
}

public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                       long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    return doInvokeAny(tasks, true, unit.toNanos(timeout));
}           

這兩個invokeAny方法本質上都是在調用doInvokeAny方法,線上程池中送出多個任務,隻要傳回一個結果資料即可。

直接看上面的代碼,大家可能有點暈。這裡,我舉一個例子,我們在使用線程池的時候,可能會啟動多個線程去執行各自的任務,比如線程A負責task_a,線程B負責task_b,這樣可以大規模提升系統處理任務的速度。如果我們希望其中一個線程執行完成傳回結果資料時立即傳回,而不需要再讓其他線程繼續執行任務。此時,就可以使用invokeAny方法。

  • invokeAll方法
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    //辨別所有任務是否完成
    boolean done = false;
    try {
        //周遊所有任務
        for (Callable<T> t : tasks) {
            将每個任務封裝成RunnableFuture對象送出任務
            RunnableFuture<T> f = newTaskFor(t);
            //将結果資料添加到futures集合中
            futures.add(f);
            //執行任務
            execute(f);
        }
        //周遊結果資料集合
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            //任務沒有完成
            if (!f.isDone()) {
                try {
                    //阻塞等待任務完成并傳回結果
                    f.get();
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        //任務完成(不管是正常結束還是異常完成)
        done = true;
        //傳回結果資料集合
        return futures;
    } finally {
        //如果發生中斷異常InterruptedException 則取消已經送出的任務
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                     long timeout, TimeUnit unit)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks)
            futures.add(newTaskFor(t));

        final long deadline = System.nanoTime() + nanos;
        final int size = futures.size();

        for (int i = 0; i < size; i++) {
            execute((Runnable)futures.get(i));
            // 在添加執行任務時逾時判斷,如果逾時則立刻傳回futures集合
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L)
                return futures;
        }
         // 周遊所有任務
        for (int i = 0; i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                //對結果進行判斷時進行逾時判斷
                if (nanos <= 0L)
                    return futures;
                try {
                    f.get(nanos, TimeUnit.NANOSECONDS);
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                } catch (TimeoutException toe) {
                    return futures;
                }
                //重置任務的逾時時間
                nanos = deadline - System.nanoTime();
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}           

invokeAll方法同樣實作了無逾時時間設定和有逾時時間設定的邏輯。

無逾時時間設定的invokeAll方法總體邏輯為:将所有任務封裝成RunnableFuture對象,調用execute方法執行任務,将傳回的結果資料添加到futures集合,之後對futures集合進行周遊判斷,檢測任務是否完成,如果沒有完成,則調用get方法阻塞任務,直到傳回結果資料,此時會忽略異常。最終在finally代碼塊中對所有任務是否完成的辨別進行判斷,如果存在未完成的任務,則取消已經送出的任務。

有逾時設定的invokeAll方法總體邏輯與無逾時時間設定的invokeAll方法總體邏輯基本相同,隻是在兩個地方添加了逾時的邏輯判斷。一個是在添加執行任務時進行逾時判斷,如果逾時,則立刻傳回futures集合;另一個是每次對結果資料進行判斷時添加了逾時處理邏輯。

invokeAll方法中本質上還是調用Executor接口的execute方法來送出任務。

  • submit方法

submit方法的邏輯比較簡單,就是将任務封裝成RunnableFuture對象并送出,執行任務後傳回Future結果資料。如下所示。

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}           

從源碼中可以看出submit方法送出任務時,本質上還是調用的Executor接口的execute方法。

綜上所述,在非定時任務類的線程池中送出任務時,本質上都是調用的Executor接口的execute方法。至于調用的是哪個具體實作類的execute方法,我們在後面的文章中深入分析。

五、ScheduledExecutorService接口

ScheduledExecutorService接口派生自ExecutorService接口,繼承了ExecutorService接口的所有功能,并提供了定時處理任務的能力,ScheduledExecutorService接口的源代碼比較簡單,如下所示。

package java.util.concurrent;

public interface ScheduledExecutorService extends ExecutorService {

    //延時delay時間來執行command任務,隻執行一次
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    //延時delay時間來執行callable任務,隻執行一次
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    //延時initialDelay時間首次執行command任務,之後每隔period時間執行一次
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
                                                  
    //延時initialDelay時間首次執行command任務,之後每延時delay時間執行一次
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}           

至此,我們分析了線程池體系中重要的頂層接口和抽象類。

通過對這些頂層接口和抽象類的分析,我們需要從中感悟并體會軟體開發中的抽象思維,深入了解抽象思維在具體編碼中的實作,最終,形成自己的程式設計思維,運用到實際的項目中,這也是我們能夠從源碼中所能學到的衆多細節之一。這也是進階或資深工程師和架構師必須了解源碼細節的原因之一。

好了,今天就到這兒吧,我是冰河,我們下期見~~