天天看點

Guava并發(2)——ListenableFuture\FutureCallback\SettableFuture\Futures

ListenableFuture類

  • jdk5之後有了Future這種異步執行的結構
ExecutorService executor = Executors.newCachedThreadPool();
   Future<Integer> future = executor.submit(new Callable<Integer>(){
                                public Integer call() throws Exception{
                                   return service.getCount();
} });
//Retrieve the value of computation
Integer count = future.get();
           
  • ListenableFuture對Future進行了擴充,允許注冊一個回調函數,task執行完後自動調用。
  • 擷取ListableFuture對象。

正如我們擷取Future對象要通過ExecutorService.submit(Callable)來擷取一樣,我們可以這樣建立ListenableFuture對象:

?

executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(NUM_THREADS));

//包裝Executors建立的線程池

ListenableFuture<String> listenableFuture = executorService.submit(

new

Callable<String>()...);

//擷取ListableFuture對象

listenableFuture.addListener(

new

Runnable() {

@Override

public

void

run() {

methodToRunOnFutureTaskCompletion();

}

}, executorService);

//注冊回調函數

FutureCallback類

  • FutureCallback定義了onSuccess和onFailure方法,onSuccess方法會接收一個Future對象,這樣我們就可以擷取Future的結果。
  • 首先需要一個FutureCallback實作類。

?

public

class

FutureCallbackImpl

implements

FutureCallback<String> {

private

StringBuilder builder =

new

StringBuilder();

@Override

public

void

onSuccess(String result) {

builder.append(result).append(

" successfully"

);

}

@Override

public

void

onFailure(Throwable t) {

builder.append(t.toString());

}

public

String getCallbackResult() {

return

builder.toString();

}

}

使用執行個體: ?

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

ListenableFuture<String> futureTask = executorService.submit(

new

Callable<String>() {

//建立ListenaleFuture對象

@Override

public

String call()

throws

Exception {

return

"Task completed"

;

}

});

FutureCallbackImpl callback =

new

FutureCallbackImpl();

Futures.addCallback(futureTask, callback);

//添加回調

callback.getCallbackResult();

//擷取結果

如果CallBack是一個耗時操作,你應該選擇另一個注冊CallBack: ?

Futures.addCallback(futureTask,callback,executorService);

//提供另一個線程池來執行性回調

SettableFuture類:

SettableFuture可以用來設定要傳回得值:

?

SettableFuture<String> sf = SettableFuture.create();

//Set a value to return

sf.set(

"Success"

);

//Or set a failure Exception

sf.setException(someException);

AsyncFunction:

  • 該接口與函數式程式設計密切相關, 類似Function, 但apply方法會轉換成一個ListenableFuture封裝的範型對象。

?

public

class

AsyncFuntionSample

implements

AsyncFunction<Long, String> {

private

ConcurrentMap<Long, String> map = Maps.newConcurrentMap();

private

ListeningExecutorService listeningExecutorService;

@Override

public

ListenableFuture<String> apply(

final

Long input)

throws

Exception {

if

(map.containsKey(input)) {

SettableFuture<String> listenableFuture = SettableFuture.create();

//建構一個SettableFuture

listenableFuture.set(map.get(input));

return

listenableFuture;

}

else

{

return

listeningExecutorService.submit(

new

Callable<String>() {

@Override

public

String call()

throws

Exception {

String retrieved =

//compute to get the data;

map.putIfAbsent(input, retrieved);

return

retrieved;

}

});

}

}

}

FutureFallback類:

  • FutureFallback用于異常恢複的備份。

?

public

class

FutureFallbackImpl

implements

FutureFallback<String> {

@Override

public

ListenableFuture<String> create(Throwable t)

throws

Exception {

if

(t

instanceof

FileNotFoundException) {

SettableFuture<String> settableFuture = SettableFuture.create();

settableFuture.set(

"Not Found"

);

return

settableFuture;

}

throw

new

Exception(t);

}

}

Futures類:

  • Futures類是有關Future執行個體的一個工具類。

異步轉換:

?

ListenableFuture<Person> lf = Futures.transform(ListenableFuture<String> f,AsyncFunction<String,Person> af);

使用FutureFallbacks:

?

1

ListenableFuture<String> lf = Futures.withFallback(ListenableFuture<String> f,FutureFallback<String> fb);

RateLimiter:

  • RateLimiter限制通路每秒通路資源的線程數。有點類似信号量Semaphore。

?

RateLimiter limiter = RateLimiter.create(

4.0

);

//每秒不超過4個任務被送出

?

limiter.acquire(); 

//請求RateLimiter, 超過permits會被阻塞

executor.submit(runnable);

//送出任務

也有非阻塞式地嘗試: ?

If(limiter.tryAcquire()){

//未請求到limiter則立即傳回false

doSomething();

}

else

{

doSomethingElse();

}

不吝指正。