天天看點

Java并發程式設計學習11-任務執行演練任務執行Demo

任務執行Demo

上一篇博文帶大家了解了任務執行和 Executor 架構的基礎知識,本篇将結合這些内容,示範一些不同版本的任務執行Demo,并且每個版本都實作了不同程度的并發性。

以下的示例是要實作浏覽器程式中的頁面渲染功能:将 HTML 頁面繪制到圖像緩存中【為了簡便,假設 HTML 頁面隻包含标簽文本、預定義大小的圖檔和URL】。

Java并發程式設計學習11-任務執行演練任務執行Demo

1. 串行的頁面渲染器

最簡單實作頁面渲染器功能就是對 HTML 文檔進行串行處理。首先繪制文本元素,同時為圖像預留出矩形的占位空間,在處理完第一遍文本後,程式再開始下載下傳圖像,并将它們繪制到相應的占位空間中。

public class SingleThreadRenderer (	
	void renderPage (CharSequence source){
		renderText(source);
		List<ImageData> imageData = new ArrayList<>();
		for (ImageInfo imageInfo : sacanForImageInfo(source))
			imageData.add(imageInfo.downloadImage());
		for (ImageData data : imageData)
			renderImage(data);
	}
}
           

上述圖像下載下傳過程的大部分時間都是在等待 I/O 操作執行完成,在這期間 CPU 幾乎不做任何工作。這種串行執行方法沒有充分地利用 CPU,使用者在看到最終頁面需要等待過長的時間。

這個時候通過将上述串行執行的任務分解為多個獨立的任務并發執行,就能夠獲得更高的 CPU 使用率和響應靈敏度。

2. 攜帶結果的任務

從《任務執行和Executor架構》的那篇博文中,我們知道 Executor 架構使用 Runnable 作為其基本的任務表示形式。但是 Runnable 也有自己的局限性,它不能 傳回一個值 或 抛出一個受檢查的異常。

實際上,許多任務都是存在延遲的計算,比如:

  • 執行資料庫查詢
  • 從網絡上擷取資源
  • 計算某個複雜的功能

對于這些延遲的任務,Callable 其實是個更好的任務表示形式,它的主入口點(即 call)将傳回一個值,并可能抛出一個異常。在

java.util.concurrent.Executors

中包含了一些輔助方法【callable】能将其他類型的任務【Runnable 、java.security.PrivilegedAction 和 java.security.PrivilegedExceptionAction】封裝為一個 Callable。

public interface Callable<V> {
	V call() throws Exception;
}
           
可以使用

Callable<Void>

來表示無傳回值的任務。

從《同步工具類》的那篇博文中,我們知道 Future 表示一個任務的生命周期,它提供了相應的方法來判斷是否已經完成或取消,以及擷取任務的結果和取消任務等。在 Future 的規範中,任務的生命周期隻能前進,不能後退,就像 ExecutorService 的生命周期一樣。當某個任務完成後,它就永遠停留在 “完成” 狀态上。

public interface Future<V> {
	boolean cancel(boolean mayInterruptIfRunning);
	boolean isCancelled();
	boolean isDone();
	V get() throws InterruptedException, ExecutionException, CancellationException;
	// 支援限時的擷取結果
	V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, CancellationException, TimeoutException;
}
           
在 Executor 架構中,已送出但尚未開始的任務可以取消,但對于那些已經開始執行的任務,隻有它們能響應中斷時,才能取消。已經完成的任務可以随便取消,無任何影響。

那麼如何建立一個 Future 來描述任務呢?

  • ExecutorService 中的所有 submit 方法,可以将一個 Runnable 或 Callable 送出給 Executor,并得到一個 Future 用來獲得任務的執行結果或者取消任務。
  • 也可以顯式為一個 Runnable 或 Callable 執行個體化一個 FutureTask,因為 FutureTask 實作了 Runnable,是以可以将它送出給 Executor 來執行【其實 submit 方法也是這麼做的】。

從 Java6 開始,ExecutorService 實作可以改寫 AbstractExecutorService 中的 newTaskFor 方法,進而根據已送出的 Runnable 或 Callable 來控制 Future 的執行個體化過程。

如下代碼清單【AbstractExecutorService 中的 newTaskFor 方法的預設實作、submit 方法實作】:

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
	return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
	return new FutureTask<T>(callable);
}

public Future<?> submit(Runnable task) {
	if (task == null) throw new NullPointerException();
	RunnableFuture<Void> ftask = newTaskFor(task, null);
	execute(ftask);
	return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
	if (task == null) throw new NullPointerException();
	RunnableFuture<T> ftask = newTaskFor(task, result);
	execute(ftask);
	return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
	if (task == null) throw new NullPointerException();
	RunnableFuture<T> ftask = newTaskFor(task);
	execute(ftask);
	return ftask;
}
           

3. 使用 Future 實作頁面渲染器

為了使頁面渲染器實作更高的并發性,首先将渲染過程分解為兩個任務,一個是渲染所有的文本,另一個是下載下傳所有的圖像。

下面我們來看一下如下示例【使用 Future 等待圖像下載下傳】:

public class FutureRenderer {

    private final ExecutorService executor = Executors.newCachedThreadPool();

    void renderPage (CharSequence source){
        final List<ImageInfo> imageInfos = scanForImageInfo(source);
        Callable<List<ImageData>> task = new Callable<List<ImageData>>() {
                public List<ImageData> call() {
                    List<ImageData> result = new ArrayList<ImageData>() ;
                    for (ImageInfo imageInfo : imageInfos)
                        result.add(imageInfo.downloadImage());
                    return result;
                }
            };
        Future<List<ImageData>> future = executor.submit(task);
        renderText(source);
        try {
            List<ImageData> imageData = future.get();
            for (ImageData data : imageData)
                renderImage(data);
        } catch (InterruptedException e){
            //重新設定線程的中斷狀态
            Thread.currentThread().interrupt() ;
            //由于不需要結果,是以取消任務
            future.cancel(true);
        } catch (ExecutionException e) {
            throw launderThrowable(e.getCause());
        }
    }
}
           

上述 FutureRenderer 中建立了一個 Callable 來下載下傳所有的圖像,并将其送出到一個 ExecutorService,這将傳回一個描述任務執行情況的 Future。後面當主任務需要圖像時,通過 Future.get 方法就可以獲得所有下載下傳的圖像,即使還沒下載下傳好,至少下載下傳任務已經開始了。

4. 使用 CompletionService 實作頁面渲染器

在上面的 FutureRenderer 裡 ,我們已經并行地執行了兩個不同類型的任務--下載下傳圖像 與 渲染文本。如果渲染文本的速度遠遠高于下載下傳圖像的速度,那麼程式的最終性能與串行執行時的性能差别不大,反而代碼更加複雜了。其實使用者不必等到所有的圖像都下載下傳好,而是希望每下載下傳完一幅圖像就立即顯示出來。

下載下傳圖像的任務還可以繼續細分,為每一幅圖像的下載下傳都建立一個獨立任務,并線上程池中執行它們,進而将串行的下載下傳過程轉換為并行的過程,這樣也就減少下載下傳所有圖像的總時間。

下面我們來看下如下的示例【使用 CompletionService,使頁面元素在下載下傳完成後立即顯示出來】:

public class CompletionServiceRenderer {

    private final ExecutorService executor;

    CompletionServiceRenderer(ExecutorService executor) {
        this.executor = executor;
    }

    void renderPage(CharSequence source) {

        List<ImageInfo> info = scanForImageInfo(source);

        CompletionService<ImageData> completionService = new ExecutorCompletionService<>(executor);

        for (final ImageInfo imageInfo : info)
            completionService.submit(new Callable<ImageData>() {
                public ImageData call() {
                    return imageInfo.downloadImage();
                }
            });

        renderText(source);

        try {
            for (int t = 0, n = info.size(); t < n; t++) {
                Future<ImageData> f = completionService.take();
                ImageData imageData = f.get();
                renderImage(imageData);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw launderThrowable(e.getCause());
        }
    }
}	
           

下面我們來簡單了解下 CompletionService【完成服務】:

  • CompletionService 将 Executor 和 BlockingQueue 的功能融合在一起。可以将 Callable 任務送出給它來執行,然後使用類似于隊列操作的 take 和 poll 等方法來獲得已完成的結果,而這些結果将會在完成時被封裝為 Future。
  • CompletionService 有個子類實作為 ExecutorCompletionService。它的構造函數中會建立一個 BlockingQueue 來儲存計算結果。當計算完成時,調用 FutureTask 中的 done 方法。當送出某個任務時,該任務将首先包裝為一個 QueueingFuture,這是 FutureTask 的一個子類,它覆寫了父類的 done 方法,并将結果放入 BlockingQueue 中。take 和 poll 方法委托給了 BlockingQueue,這些方法會在出結果之前阻塞。

如下為 JDK 1.8 中 ExecutorCompletionService 裡的 QueueingFuture 實作【其他版本可能有差異,以實際為準】

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

	public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
	
	public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }
	
	// 其他方法省略
}
           
從 ExecutorCompletionService 的構造函數可知,多個 ExecutorCompletionService 可以共享一個 Executor,是以可以建立一個對于特定計算私有,又能共享一個公共 Executor 的 ExecutorCompletionService。

5. 為任務設定時限

下面我們來看下如下的案例:

  • 某個 Web 應用程式從外部的廣告伺服器上擷取廣告資訊,但如果該應用程式在兩秒内得不到響應,将直接顯示預設的廣告,這樣即使無法獲得廣告資訊,也不會降低站點的響應性能。
  • 一個門戶網站從多個資料源并行地擷取資料,但可能隻會在指定的時間内等待資料,如果超出了等待時間,那麼将隻顯示已經獲得的資料。

上述案例都規定了任務要在指定的時間内完成,如果某個任務無法在指定時間内完成,那麼将不再需要它的結果,此時就應當放棄這個任務。

那麼如何給任務設定時限呢?

前面提到的支援時間限制的 Future.get 支援給任務設定時限:當結果可用時,它将立即傳回,如果在指定時限内沒有計算出結果,那麼将抛出 TimeoutException。

如果任務逾時了該如何取消呢?

上述通過支援時間限制的 Future.get 擷取任務結果。如果任務逾時了,它會抛出 TimeoutException,這時可以通過 Future.cancel 來取消任務。

5.1 限時擷取廣告資訊示例

下面我們來看下如下的示例【在指定時間内擷取廣告資訊】:

public class PageAdRenderer {

    private static final Long TIME_BUDGET = 2000000000L;

    private final ExecutorService executor = Executors.newCachedThreadPool();

    private final Ad DEFAULT_AD = new Ad();

    public Page renderPageWithAd() throws InterruptedException {
        long endNanos = System.nanoTime() + TIME_BUDGET;
        Future<Ad> f = executor.submit(new FetchAdTask());
        // 等待廣告的同時顯示頁面
        Page page = renderPageBody();
        Ad ad;
        try {
            // 隻等待指定的時間長度
            long timeLeft = endNanos - System.nanoTime();
            ad = f.get(timeLeft, NANOSECONDS);
        } catch (ExecutionException e) {
            ad = DEFAULT_AD;
        } catch (TimeoutException e) {
            ad = DEFAULT_AD;
            f.cancel(true);
        }
        page.setAd(ad);
        return page;
    }
}
           

上述示例生成的頁面中包括響應使用者請求的内容以及從廣告伺服器上獲得的廣告。它将擷取廣告的任務送出給一個 Executor,然後計算剩餘的文本頁面内容,最後等待廣告資訊,直到超出指定的時間。如果 get 逾時,那麼将取消廣告擷取任務,并使用預設的廣告資訊。

注意:
  • 傳遞給 get 方法的 timeout 參數的計算方法是,将 指定時限 減去 目前時間。這可能會得到負數,但

    java.util.concurrent

    中所有 與時限相關的方法 都将 負數視為零,是以不需要額外的代碼來處理這種情況。
  • Future.cancel 的參數為 true 表示任務線程可以在運作過程中中斷【在後續博文會詳細介紹】。

5.2 旅行預訂門戶網站示例

下面我們來考慮這樣一個旅行預訂門戶網站:

使用者輸入旅行的日期和其他要求,門戶網站擷取并顯示來自多條航線、旅店或汽車租賃公司的報價。在擷取不同公司報價的過程中,可能會調用 Web 服務、通路資料庫、執行一個 EDI 事物或其他機制。在這種情況下,頁面應該隻顯示在指定時間内收到的資訊。對于沒有及時響應的服務提供者,頁面可以忽略它們,或者顯示一個提示資訊。

從一個公司擷取報價的過程與從其他公司獲得報價的過程無關,是以可以将擷取報價的過程當成一個任務,進而使獲得報價的過程能并發執行。

通過上面了解的支援限時的 Future.get ,我們很容易想到如下的擷取報價的邏輯:

建立 n 個擷取報價的任務,并将其送出到一個線程池,同時保留 n 個 Future,并使用限時的 get 方法通過 Future 串行地擷取每一個結果。

雖然上面也可行,但是現在我們有更好的方法,下面來看一下如下示例【使用線程池的 invokeAll 方法】:

public class TravelWebSite {

    private final ExecutorService executor = Executors.newCachedThreadPool();

    public List<TravelQuote> getRankedTravelQuotes(TravelInfo travelInfo, 
                                                   Set<TravelCompany> companies, 
                                                   Comparator<TravelQuote> ranking, 
                                                   long time, TimeUnit unit) throws InterruptedException {
        List<QuoteTask> tasks = new ArrayList<>();
        
        for (TravelCompany company : companies) 
            tasks.add(new QuoteTask(company, travelInfo));
        
        List<Future<TravelQuote>> futures = executor.invokeAll(tasks, time, unit);
        
        List<TravelQuote> quotes = new ArrayList<>(tasks.size());

        Iterator<QuoteTask> taskIterator = tasks.iterator();
        
        for (Future<TravelQuote> future : futures) {
            QuoteTask task = taskIterator.next();
            try {
                quotes.add(future.get());
            } catch (ExecutionException e) {
                quotes.add(task.getFailureQuote(e.getCause()));
            } catch (CancellationException e) {
                quotes.add(task.getTimeoutQuote(e));
            }
        }

        Collections.sort(quotes, ranking);
        return quotes;
    }
}
           
  • invokeAll 方法的參數為一組任務,并傳回一組 Future。這兩個集合有着相同的結構。
  • invokeAll 方法按照任務集合中疊代器的順序将所有的 Future 添加到傳回的集合中,進而使調用者能将各個 Future 與其表示的 Callable 關聯起來。
  • 當所有任務都執行完畢時,或者調用線程被中斷時,又或者超過指定時限時,invokeAll 将傳回。
  • 當超過指定時限後,任何還未完成的任務都會取消。
  • 當 invokeAll 方法傳回後,每個任務要麼正常地完成,要麼被取消,而用戶端代碼可以調用 get 或 isCancelled 來判斷究竟是何種情況。

6. 總結

繼續閱讀