天天看點

Guava學習之ListenableFuture接口添加回調函數建立應用避免嵌套 FutureCheckedFuture

本文是對 Guava 中 ListenableFuture 的學習介紹。歡迎加入學習項目: LearningGuava。

以下參考: 官方文檔

處理并發是一個很困難的問題,但是我們可以通過使用功能強大的抽象來簡化這個工作。為了簡化這個問題,Guava 提供了 ListenableFuture,它繼承了 JDK 中的

Future

接口。

我們強烈建議:在你的代碼中,使用

ListenableFuture

來替代

Future

,因為

* 很多

Future

相關的方法需要它。

* 一開始就使用

ListenableFuture

會省事很多。

* 這樣工具方法提供者就不需要針對

Future

ListenableFuture

都提供方法。

接口

Future

代表了異步執行的結果:一個可能還沒有産生結果的執行過程。

Future

可以正在被執行,但是會保證傳回一個結果。

ListenableFuture

可以使你注冊回調函數,使得在結果計算完成的時候可以回調你的函數。如果結果已經算好,那麼将會立即回調。這個簡單的功能使得可以完成很多

Future

支援不了的操作。

ListenableFuture

添加的基本函數是

addListener(Runnable, Executor)

。通過這個函數,當

Future

中的結果執行完成時,傳入的

Runnable

會在傳入的

Executor

中執行。

添加回調函數

使用者偏向于使用

Futures.addCallback(ListenableFuture<V>, FutureCallback<V>, Executor)

, 或者當需要注冊輕量級的回調的時候,可以使用預設為

MoreExecutors.directExecutor()

的版本。

FutureCallback<V>

實作了兩個方法:

*

onSuccess(V)

:當 future 執行成功時候的反應。

*

onFailure(Throwable)

:當 future 執行失敗時候的反應。

建立

與 JDK 中 通過

ExecutorService.submit(Callable)

來初始化一個異步的任務相似,Guava 提供了一個

ListeningExecutorService

接口,這個接口可以傳回一個

ListenableFuture

ExecutorService

隻是傳回一個普通的

Future

)。如果需要将一個

ExecutorService

轉換為

ListeningExecutorService

,可以使用

MoreExecutors.listeningDecorator(ExecutorService)

。一個使用示例如下:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool());
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});
           

如果你想從一個基于

FutureTask

的 API 轉換過來,Guava 提供了

ListenableFutureTask.create(Callable<V>)

ListenableFutureTask.create(Runnable, V)

。和 JDK 不一樣,

ListenableFutureTask

并不意味着可以直接擴充。

如果你更喜歡可以設定 future 值的抽象,而不是實作一個方法來計算結果,那麼可以考慮直接擴充

AbstractFuture<V>

或者

SettableFuture

如果你一定要将一個基于

Future

的 API 轉換為基于

ListenableFuture

的話,你不得不采用寫死的方式

JdkFutureAdapters.listenInPoolThread(Future)

來實作從

Future

ListenableFuture

的轉換。是以,盡可能地使用

ListenableFuture

應用

使用

ListenableFuture

一個最重要的原因就是:可以基于他實作負責的異步執行鍊。如下所示:

ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
AsyncFunction<RowKey, QueryResult> queryFunction =
  new AsyncFunction<RowKey, QueryResult>() {
    public ListenableFuture<QueryResult> apply(RowKey rowKey) {
      return dataService.read(rowKey);
    }
  };
ListenableFuture<QueryResult> queryFuture =
    Futures.transformAsync(rowKeyFuture, queryFunction, queryExecutor);
           

很多不能被

Future

支援的方法可以通過

ListenableFuture

被高效地支援。不同的操作可能被不同的執行器執行,而且一個

ListenableFuture

可以有多個響應操作。

ListenableFuture

有多個後續操作的時候,這樣的操作稱為:“扇出”。當它依賴多個輸入 future 同時完成時,稱作“扇入”。可以參考

Futures.allAsList

的實作。

方法 描述 參考

transformAsync(ListenableFuture<A>, AsyncFunction<A, B>, Executor)

傳回新的

ListenableFuture

,它是給定

AsyncFunction

結合的結果

transformAsync(ListenableFuture<A>, AsyncFunction<A, B>)

transform(ListenableFuture<A>, Function<A, B>, Executor)

傳回新的

ListenableFuture

,它是給定

Function

結合的結果

transform(ListenableFuture<A>, Function<A, B>)

allAsList(Iterable<ListenableFuture<V>>)

傳回一個

ListenableFuture

,它的值是一個輸入 futures 的值的按序清單,任何一個 future 的失敗都會導緻最後結果的失敗

allAsList(ListenableFuture<V>...)

successfulAsList(Iterable<ListenableFuture<V>>)

傳回一個

ListenableFuture

,它的值是一個輸入 futures 的成功執行值的按序清單,對于取消或者失敗的任務,對應的值是

null

successfulAsList(ListenableFuture<V>...)

AsyncFunction<A, B>

提供了一個方法:

ListenableFuture<B> apply(A input)

。可以被用來異步轉換一個值。

List<ListenableFuture<QueryResult>> queries;
// The queries go to all different data centers, but we want to wait until they're all done or failed.

ListenableFuture<List<QueryResult>> successfulQueries = Futures.successfulAsList(queries);

Futures.addCallback(successfulQueries, callbackOnSuccessfulQueries);
           

避免嵌套 Future

在使用通用接口傳回

Future

的代碼中,很有可能會嵌套

Future

。例如:

executorService.submit(new Callable<ListenableFuture<Foo>() {
  @Override
  public ListenableFuture<Foo> call() {
    return otherExecutorService.submit(otherCallable);
  }
});
           

上述代碼将會傳回:

ListenableFuture<ListenableFuture<Foo>>

。這樣的代碼是不正确的,因為外層 future 的取消操作不能傳遞到内層的 future。此外,一個常犯的錯誤是:使用

get()

或者 listener 來檢測其它 future 的失敗。為了避免這樣的情況,Guava 所有處理 future 的方法(以及一些來自 JDK 的代碼)具有安全解決嵌套的版本。

CheckedFuture

Guava 也提供

CheckedFuture<V, X extends Exception>

接口。

CheckedFuture

是這樣的一個

ListenableFuture

:具有多個可以抛出受保護異常的

get

方法。這使得建立一個執行邏輯可能抛出異常的 future 變得容易。使用

Futures.makeChecked(ListenableFuture<V>, Function<Exception, X>)

可以将

ListenableFuture

轉換為

CheckedFuture