天天看點

google Guava包的ListenableFuture解析接口添加回調(Callbacks)ListenableFuture的建立ApplicationCheckedFuture

原文位址  譯者:羅立樹  校對:方騰飛

并發程式設計是一個難題,但是一個強大而簡單的抽象可以顯著的簡化并發的編寫。出于這樣的考慮,Guava 定義了 ListenableFuture接口并繼承了JDK concurrent包下的Future 接口。

我們強烈地建議你在代碼中多使用ListenableFuture來代替JDK的 Future, 因為:

  • 大多數Futures 方法中需要它。
  • 轉到ListenableFuture 程式設計比較容易。
  • Guava提供的通用公共類封裝了公共的操作方方法,不需要提供Future和ListenableFuture的擴充方法。

接口

傳統JDK中的Future通過異步的方式計算傳回結果:在多線程運算中可能或者可能在沒有結束傳回結果,Future是運作中的多線程的一個引用句柄,確定在服務執行傳回一個Result。

ListenableFuture可以允許你注冊回調方法(callbacks),在運算(多線程執行)完成的時候進行調用,  或者在運算(多線程執行)完成後立即執行。這樣簡單的改進,使得可以明顯的支援更多的操作,這樣的功能在JDK concurrent中的Future是不支援的。

ListenableFuture 中的基礎方法是addListener(Runnable, Executor), 該方法會在多線程運算完的時候,指定的Runnable參數傳入的對象會被指定的Executor執行。

添加回調(Callbacks)

多數使用者喜歡使用 Futures.addCallback(ListenableFuture<V>, FutureCallback<V>, Executor)的方式, 或者 另外一個版本version(譯者注:addCallback(ListenableFuture<V> future,FutureCallback<? super V> callback)),預設是采用 MoreExecutors.sameThreadExecutor()線程池, 為了簡化使用,Callback采用輕量級的設計.  FutureCallback<V> 中實作了兩個方法:

  • onSuccess(V),在Future成功的時候執行,根據Future結果來判斷。
  • onFailure(Throwable), 在Future失敗的時候執行,根據Future結果來判斷。

ListenableFuture的建立

對應JDK中的 ExecutorService.submit(Callable) 送出多線程異步運算的方式,Guava 提供了ListeningExecutorService 接口, 該接口傳回 ListenableFuture 而相應的 ExecutorService 傳回普通的 Future。将 ExecutorService 轉為 ListeningExecutorService,可以使用MoreExecutors.listeningDecorator(ExecutorService)進行裝飾。

01

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(

10

));

02

ListenableFuture explosion = service.submit(

new

Callable() {

03

public

Explosion call() {

04

return

pushBigRedButton();

05

}

06

});

07

Futures.addCallback(explosion, 

new

FutureCallback() {

08

// we want this handler to run immediately after we push the big red button!

09

public

void

onSuccess(Explosion explosion) {

10

walkAwayFrom(explosion);

11

}

12

public

void

onFailure(Throwable thrown) {

13

battleArchNemesis(); 

// escaped the explosion!

14

}

15

});

另外, 假如你是從 FutureTask轉換而來的, Guava 提供ListenableFutureTask.create(Callable<V>) 和ListenableFutureTask.create(Runnable, V). 和 JDK不同的是, ListenableFutureTask 不能随意被繼承(譯者注:ListenableFutureTask中的done方法實作了調用listener的操作)。

假如你喜歡抽象的方式來設定future的值,而不是想實作接口中的方法,可以考慮繼承抽象類AbstractFuture<V> 或者直接使用 SettableFuture 。

假如你必須将其他API提供的Future轉換成 ListenableFuture,你沒有别的方法隻能采用寫死的方式JdkFutureAdapters.listenInPoolThread(Future) 來将 Future 轉換成 ListenableFuture。盡可能地采用修改原生的代碼傳回 ListenableFuture會更好一些。

Application

使用ListenableFuture 最重要的理由是它可以進行一系列的複雜鍊式的異步操作。

1

ListenableFuture rowKeyFuture = indexService.lookUp(query);

2

AsyncFunction<RowKey, QueryResult> queryFunction =

3

new

AsyncFunction<RowKey, QueryResult>() {

4

public

ListenableFuture apply(RowKey rowKey) {

5

return

dataService.read(rowKey);

6

}

7

};

8

ListenableFuture queryFuture = Futures.transform(rowKeyFuture, queryFunction, queryExecutor);

其他更多的操作可以更加有效的支援而JDK中的Future是沒法支援的.

不同的操作可以在不同的Executors中執行,單獨的ListenableFuture 可以有多個操作等待。

當一個操作開始的時候其他的一些操作也會盡快開始執行–“fan-out”–ListenableFuture 能夠滿足這樣的場景:促發所有的回調(callbacks)。反之更簡單的工作是,同樣可以滿足“fan-in”場景,促發ListenableFuture 擷取(get)計算結果,同時其它的Futures也會盡快執行:可以參考 the implementation of Futures.allAsList 。(譯者注:fan-in和fan-out是軟體設計的一個術語,可以參考這裡:http://baike.baidu.com/view/388892.htm#1或者看這裡的解析Design Principles: Fan-In vs Fan-Out,這裡fan-out的實作就是封裝的ListenableFuture通過回調,調用其它代碼片段。fan-in的意義是可以調用其它Future)

方法 描述 參考
transform(ListenableFuture<A>, AsyncFunction<A, B>, Executor)* 傳回一個新的ListenableFuture ,該ListenableFuture 傳回的result是由傳入的AsyncFunction 參數指派到傳入的 ListenableFuture中. transform(ListenableFuture<A>, AsyncFunction<A, B>)
transform(ListenableFuture<A>, Function<A, B>, Executor) 傳回一個新的ListenableFuture ,該ListenableFuture 傳回的result是由傳入的Function 參數指派到傳入的 ListenableFuture中. transform(ListenableFuture<A>, Function<A, B>)
allAsList(Iterable<ListenableFuture<V>>) 傳回一個ListenableFuture ,該ListenableFuture 傳回的result是一個List,List中的值是每個ListenableFuture的傳回值,假如傳入的其中之一fails或者cancel,這個Future fails 或者canceled allAsList(ListenableFuture<V>...)
successfulAsList(Iterable<ListenableFuture<V>>) 傳回一個ListenableFuture ,該Future的結果包含所有成功的Future,按照原來的順序,當其中之一Failed或者cancel,則用null替代 successfulAsList(ListenableFuture<V>...)

AsyncFunction<A, B> 中提供一個方法ListenableFuture<B> apply(A input),它可以被用于異步變換值。

1

List<ListenableFuture> queries;

2

// The queries go to all different data centers, but we want to wait until they're all done or failed.

3

4

ListenableFuture<List> successfulQueries = Futures.successfulAsList(queries);

5

6

Futures.addCallback(successfulQueries, callbackOnSuccessfulQueries);

CheckedFuture

Guava也提供了 CheckedFuture<V, X extends Exception> 接口。CheckedFuture 是一個ListenableFuture ,其中包含了多個版本的get 方法,方法聲明抛出檢查異常.這樣使得建立一個在執行邏輯中可以抛出異常的Future更加容易 。将 ListenableFuture 轉換成CheckedFuture,可以使用 Futures.makeChecked(ListenableFuture<V>, Function<Exception, X>)。

原創文章,轉載請注明: 轉載自并發程式設計網 – ifeve.com本文連結位址: google Guava包的ListenableFuture解析