文章目錄
-
- Callable 接口
-
- Runnable 與 Callable
- JDK 的 Future 實作
- Future 模式
-
- Future 接口
- FutureTask 類
- `ExecutorService` 執行一個 Callable任務
- Future 使用
-
- 執行一個任務
- 執行多個任務
- Guava 的實作
-
- 使用 ListenableFuture 送出任務
- 使用 FutureTask 送出任務
- 送出一組任務
- 其他功能(具體見 Guava 文章)
- 參考
Callable 接口
Runnable 與 Callable
- 兩者都是自定義 Task 可以實作的接口,差別在于
有傳回值,且線上程執行過程中可以抛出異常Callable
- 重寫 各自的方法不同:
方法,Runnable.run()
方法Callable.call()
public interface Runnable {
public abstract void run();
}
public interface Callable<V> {
V call() throws Exception; // 有傳回值,可以抛出異常
}
JDK 的 Future 實作
- Future 模式
- 一段程式送出了一個請求,期望得到一個答複,但是如果這個請求的處理是漫長的。傳統的單線程,調用函數是同步的
- Future 模式下,調用函數是異步的,而原先等待傳回的時間,在主調用函數中,可以處理其他事務
- JDK 對于Future專門做了封裝,主要是
Callable、Future、FutureTask
Future 模式
Future 接口
-
:取消任務boolean cancel(boolean mayInterruptIfRunning)
-
:是否已經取消boolean isCancelled()
-
:是否已經完成boolean isDone()
-
:擷取傳回結果V get()
-
:擷取傳回結果,可以設定逾時時間V get(long timeout, TimeUnit unit)
FutureTask 類
- 同時實作
、Runnable
接口Future
- 是以它既可以作為 Runnable 被線程執行,又可以作為 Future 得到Callable的傳回值
ExecutorService
執行一個 Callable任務
ExecutorService
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
Future 使用
實作 Callable 接口的自定義任務
class InCallableTask implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(1000L);
return "Hello SAM !! " + Thread.currentThread().getName();
}
}
class InCallableTaskLongTime implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(10000L);
return "Hello SAM, Long Time Task : " + Thread.currentThread().getName();
}
}
執行一個任務
- 一般用法,送出 Callable,使用 Future 接收
void test1() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(1);
// 送出一個 Callable
Future<String> future = executor.submit(new InCallableTask());
System.out.println(future.isDone());
// 會一直阻塞, 直到任務完成,結果傳回
System.out.println(future.get());
}
- 使用
實作FutureTask
void test2() throws ExecutionException, InterruptedException {
// Callable 任務使用 FutureTask 封裝
FutureTask<String> future = new FutureTask<>(new InCallableTask());
ExecutorService executor = Executors.newFixedThreadPool(1);
// 送出 FutureTask , 因為FutureTask 實作了 Runnable, Future 接口
executor.submit(future);
// 會一直阻塞, 直到任務完成,結果傳回
System.out.println("傳回的資料: " + future.get());
executor.shutdown();
}
執行多個任務
- 一般用法:多次送出,任務誰先完成就先傳回
void test3() throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(1);
// 一般使用需要建構多個 FutureTask 對象
FutureTask<String> future = new FutureTask<>(new InCallableTask());
FutureTask<String> futureLongTime = new FutureTask<>(new InCallableTaskLongTime());
// 送出兩個任務
executor.submit(future);
executor.submit(futureLongTime);
System.out.println("幹其他事情去吧..........");
System.out.println(future.get());
System.out.println(futureLongTime.get());
executor.shutdown();
}
-
、invokeAll
方法實作invokeAny
-
:任務都完成後一起傳回,會等待最慢的那個任務完成後一起傳回invokeAll
-
:傳回最先完成的那個任務,未執行的任務都會被取消invokeAny
void test4() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(1);
List<Callable<String>> list = new ArrayList<>(2);
list.add(new InCallableTask());
list.add(new InCallableTaskLongTime());
// 需要所有的任務完成後才傳回
// 兩個任務都完成後一起傳回,會等待最慢的那個任務完成後一起傳回
List<Future<String>> futures = executor.invokeAll(list);
for (Future<String> future : futures) {
System.out.println(future.get());
}
// 傳回最先完成的那個任務,未執行的任務都會被取消
String result = executor.invokeAny(list);
System.out.println(result);
}
- 使用
送出一組任務,先完成的先傳回ExecutorCompletionService
void test5() throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(1);
// 建立 ExecutorCompletionService 類,内部使用 LinkedBlockingQueue 存放 Future,可以手動指定任務個數
ExecutorCompletionService<String> ecs = new ExecutorCompletionService<>(executor);
// 任務清單
ArrayList<Callable<String>> tasks = Lists.newArrayList(new InCallableTask(), new InCallableTaskLongTime());
List<Future<String>> futures = new ArrayList<>();
// 送出一組任務
for (Callable<String> callable : tasks) {
Future<String> future = ecs.submit(callable);
// 這邊可以
futures.add(future);
}
// 擷取任務結果: 内部使用 BlockingQueue 存放 Future, 先完成的先傳回
for (int i = 0; i < tasks.size(); i++) {
System.out.println(ecs.take().get());
}
// 這邊應該在 finally 中處理
for (Future<String> future : futures) {
future.cancel(true);
}
executor.shutdown();
}
Guava 的實作
使用 ListenableFuture 送出任務
-
方法可以添加回調、監控、異常捕獲等ListenableFuture
-
新增了很多有用的方法,如 關閉線程池MoreExecutors
- 回調函數
不會阻塞addCallback
public void test1() throws ExecutionException, InterruptedException {
// 自定義線程池名稱,使用 ThreadFactory 的構造方法
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("guava-ListenableFuture-%s").build();
ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);
// 将 ExecutorService 轉為 ListeningExecutorService
ListeningExecutorService service = MoreExecutors.listeningDecorator(executorService);
// 送出任務
ListenableFuture<String> listenableFuture = service.submit(new InCallableTask());
// 可以注冊一個回調, 在回調函數中對異步處理的結果進行處理
// 這個方法與 JDK 中的 future.get() 不同,不會 callback() 函數不阻塞目前線程
Futures.addCallback(listenableFuture, new FutureCallback<String>() {
// we want this handler to run immediately after we push the big red button!
@Override
public void onSuccess(String result) {
System.out.println("擷取的結果 " + result);
}
@Override
public void onFailure(Throwable thrown) {
thrown.printStackTrace();
}
});
// 上面任務的異常,不會影響主線程後續的執行
System.out.println("這邊會先執行? main task done.....");
MoreExecutors.shutdownAndAwaitTermination(service, 60, TimeUnit.SECONDS);
}
使用 FutureTask 送出任務
針對JDK 的 FutureTask 轉換而來的, Guava 提供
ListenableFutureTask.create(Callable<V>)
和
ListenableFutureTask.create(Runnable, V)
public void test2() {
// 自定義線程池名稱,使用 ThreadFactory 的構造方法
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("guava-ListenableFutureTask-%s").build();
ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory);
// 将 ThreadPoolExecutor 轉為 ListeningExecutorService
ListeningExecutorService service = MoreExecutors.listeningDecorator(executorService);
// 建立 ListenableFutureTask
ListenableFutureTask<String> listenableFutureTask = ListenableFutureTask.create(new InCallableTask());
// 送出任務
service.submit(listenableFutureTask);
Futures.addCallback(listenableFutureTask, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println("擷取的結果: " + result);
}
@Override
public void onFailure(Throwable thrown) {
thrown.printStackTrace();
}
});
System.out.println("這邊會先執行? main task done.....");
MoreExecutors.shutdownAndAwaitTermination(service, 60, TimeUnit.SECONDS);
}
送出一組任務
-
:如果一個任務發生異常, 這邊會抛出異常Futures.allAsList(futures)
-
:傳回一個ListenableFuture ,該Future的結果包含所有成功的Future,按照原來的順序,當其中之一Failed 或者 cancel,則用null替代Futures.successfulAsList
- 以上兩個方法都是阻塞的
public void test3() throws ExecutionException, InterruptedException {
// 自定義線程池名稱,使用 ThreadFactory 的構造方法
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("guava-ListenableFutureTask-%s").build();
ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory);
// 将 ThreadPoolExecutor 轉為 ListeningExecutorService
ListeningExecutorService service = MoreExecutors.listeningDecorator(executorService);
// 任務清單
ArrayList<Callable<String>> tasks = Lists.newArrayList(new InCallableTask(), new InCallableTaskLongTime());
List<ListenableFuture<String>> futures = Lists.newArrayList();
// 送出任務
for (Callable<String> task : tasks) {
futures.add(service.submit(task));
}
// 傳回一個ListenableFuture ,該Future的結果包含所有成功的Future,按照原來的順序,當其中之一Failed或者cancel,則用null替代
ListenableFuture<List<String>> listListenableFuture = Futures.successfulAsList(futures);
// 堵塞,直到所有任務完成,然後一起傳回
// [Hello SAM!! guava-ListenableFutureTask-0, Hello SAM, long time task guava-ListenableFutureTask-0]
// 如果一個任務發生異常: [null, Hello SAM, long time task guava-ListenableFutureTask-0]
System.out.println(listListenableFuture.get());
// allAsList與 successfulAsList的差別
// 如果一個任務發生異常, 這邊會抛出異常,
ListenableFuture<List<String>> listListenableFuture1 = Futures.allAsList(futures);
System.out.println(listListenableFuture1.get());
System.out.println("這邊不會先執行 main task done.....");
// 一定要記得關閉
MoreExecutors.shutdownAndAwaitTermination(service, 60, TimeUnit.SECONDS);
}
其他功能(具體見 Guava 文章)
- 增加異常捕獲等
public void test4() throws ExecutionException, InterruptedException {
// 自定義線程池名稱,使用 ThreadFactory 的構造方法
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("guava-ListenableFutureTask-%s").build();
ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory);
// 将 ThreadPoolExecutor 轉為 ListeningExecutorService
ListeningExecutorService service = MoreExecutors.listeningDecorator(executorService);
final InCallableTask inCallableTask = new InCallableTask();
ListenableFuture<String> listenableFuture = service.submit(inCallableTask);
Futures.catching(listenableFuture, RuntimeException.class, new Function<RuntimeException, String>() {
@Override
public String apply(RuntimeException input) {
// 這邊可以做一些其他事情,,如重新調用,各種異常處理
return "你好,異常被我吃了";
}
});
Futures.catchingAsync(listenableFuture, RuntimeException.class, new AsyncFunction<RuntimeException, String>() {
@Override
public ListenableFuture<String> apply(RuntimeException input) throws Exception {
return listenableFuture;
}
});
System.out.println(listenableFuture.get());
}
參考
- 源碼位址