天天看點

JDK多線程基礎(7):有傳回的線程Callable接口以及 Future 模式

文章目錄

    • Callable 接口
      • Runnable 與 Callable
      • JDK 的 Future 實作
    • Future 模式
      • Future 接口
      • FutureTask 類
      • `ExecutorService` 執行一個 Callable任務
      • Future 使用
        • 執行一個任務
        • 執行多個任務
    • Guava 的實作
      • 使用 ListenableFuture 送出任務
      • 使用 FutureTask 送出任務
      • 送出一組任務
      • 其他功能(具體見 Guava 文章)
    • 參考

Callable 接口

Runnable 與 Callable

  1. 兩者都是自定義 Task 可以實作的接口,差別在于

    Callable

    有傳回值,且線上程執行過程中可以抛出異常
  2. 重寫 各自的方法不同:

    Runnable.run()

    方法,

    Callable.call()

    方法
public interface Runnable {
    public abstract void run();
}

public interface Callable<V> {
    V call() throws Exception; // 有傳回值,可以抛出異常
}
           

JDK 的 Future 實作

  1. Future 模式
  • 一段程式送出了一個請求,期望得到一個答複,但是如果這個請求的處理是漫長的。傳統的單線程,調用函數是同步的
  • Future 模式下,調用函數是異步的,而原先等待傳回的時間,在主調用函數中,可以處理其他事務
  1. JDK 對于Future專門做了封裝,主要是

    Callable、Future、FutureTask

JDK多線程基礎(7):有傳回的線程Callable接口以及 Future 模式

Future 模式

Future 接口

  1. boolean cancel(boolean mayInterruptIfRunning)

    :取消任務
  2. boolean isCancelled()

    :是否已經取消
  3. boolean isDone()

    :是否已經完成
  4. V get()

    :擷取傳回結果
  5. V get(long timeout, TimeUnit unit)

    :擷取傳回結果,可以設定逾時時間

FutureTask 類

  1. 同時實作

    Runnable

    Future

    接口
  2. 是以它既可以作為 Runnable 被線程執行,又可以作為 Future 得到Callable的傳回值

ExecutorService

執行一個 Callable任務

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
           

Future 使用

實作 Callable 接口的自定義任務

class InCallableTask implements Callable<String> {
    @Override
    public String call() throws Exception {
        Thread.sleep(1000L);
        return "Hello SAM !! " + Thread.currentThread().getName();
    }
}

class InCallableTaskLongTime implements Callable<String> {
    @Override
    public String call() throws Exception {
        Thread.sleep(10000L);
        return "Hello SAM, Long Time Task : " + Thread.currentThread().getName();
    }
}
           

執行一個任務

  1. 一般用法,送出 Callable,使用 Future 接收
void test1() throws ExecutionException, InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    // 送出一個 Callable
    Future<String> future = executor.submit(new InCallableTask());

    System.out.println(future.isDone());

    // 會一直阻塞, 直到任務完成,結果傳回
    System.out.println(future.get());
}
           
  1. 使用

    FutureTask

    實作
void test2() throws ExecutionException, InterruptedException {
    // Callable 任務使用 FutureTask 封裝
    FutureTask<String> future = new FutureTask<>(new InCallableTask());
    ExecutorService executor = Executors.newFixedThreadPool(1);
    // 送出 FutureTask , 因為FutureTask 實作了 Runnable, Future 接口
    executor.submit(future);

    // 會一直阻塞, 直到任務完成,結果傳回
    System.out.println("傳回的資料: " + future.get());

    executor.shutdown();
}
           

執行多個任務

  1. 一般用法:多次送出,任務誰先完成就先傳回
void test3() throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(1);

    // 一般使用需要建構多個 FutureTask 對象
    FutureTask<String> future = new FutureTask<>(new InCallableTask());
    FutureTask<String> futureLongTime = new FutureTask<>(new InCallableTaskLongTime());

    // 送出兩個任務
    executor.submit(future);
    executor.submit(futureLongTime);

    System.out.println("幹其他事情去吧..........");

    System.out.println(future.get());
    System.out.println(futureLongTime.get());

    executor.shutdown();
}
           
  1. invokeAll

    invokeAny

    方法實作
  • invokeAll

    :任務都完成後一起傳回,會等待最慢的那個任務完成後一起傳回
  • invokeAny

    :傳回最先完成的那個任務,未執行的任務都會被取消
void test4() throws ExecutionException, InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(1);

    List<Callable<String>> list = new ArrayList<>(2);
    list.add(new InCallableTask());
    list.add(new InCallableTaskLongTime());

    // 需要所有的任務完成後才傳回
    // 兩個任務都完成後一起傳回,會等待最慢的那個任務完成後一起傳回
    List<Future<String>> futures = executor.invokeAll(list);
    for (Future<String> future : futures) {
        System.out.println(future.get());
    }

    // 傳回最先完成的那個任務,未執行的任務都會被取消
    String result = executor.invokeAny(list);
    System.out.println(result);
}
           
  1. 使用

    ExecutorCompletionService

    送出一組任務,先完成的先傳回
void test5() throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    // 建立 ExecutorCompletionService 類,内部使用 LinkedBlockingQueue 存放 Future,可以手動指定任務個數
    ExecutorCompletionService<String> ecs = new ExecutorCompletionService<>(executor);

    // 任務清單
    ArrayList<Callable<String>> tasks = Lists.newArrayList(new InCallableTask(), new InCallableTaskLongTime());

    List<Future<String>> futures = new ArrayList<>();

    // 送出一組任務
    for (Callable<String> callable : tasks) {
        Future<String> future = ecs.submit(callable);
        // 這邊可以
        futures.add(future);
    }

    // 擷取任務結果: 内部使用 BlockingQueue 存放 Future, 先完成的先傳回
    for (int i = 0; i < tasks.size(); i++) {
        System.out.println(ecs.take().get());
    }

    // 這邊應該在 finally 中處理
    for (Future<String> future : futures) {
        future.cancel(true);
    }

    executor.shutdown();
}
           

Guava 的實作

使用 ListenableFuture 送出任務

  1. ListenableFuture

    方法可以添加回調、監控、異常捕獲等
  2. MoreExecutors

    新增了很多有用的方法,如 關閉線程池
  3. 回調函數

    addCallback

    不會阻塞
public void test1() throws ExecutionException, InterruptedException {

    // 自定義線程池名稱,使用 ThreadFactory 的構造方法
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("guava-ListenableFuture-%s").build();
    ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);

    // 将 ExecutorService 轉為 ListeningExecutorService
    ListeningExecutorService service = MoreExecutors.listeningDecorator(executorService);
    // 送出任務
    ListenableFuture<String> listenableFuture = service.submit(new InCallableTask());

    // 可以注冊一個回調, 在回調函數中對異步處理的結果進行處理
    // 這個方法與 JDK 中的 future.get() 不同,不會 callback() 函數不阻塞目前線程
    Futures.addCallback(listenableFuture, new FutureCallback<String>() {
        // we want this handler to run immediately after we push the big red button!
        @Override
        public void onSuccess(String result) {
            System.out.println("擷取的結果 " + result);
        }

        @Override
        public void onFailure(Throwable thrown) {
            thrown.printStackTrace();
        }
    });

    // 上面任務的異常,不會影響主線程後續的執行
    System.out.println("這邊會先執行? main task done.....");

    MoreExecutors.shutdownAndAwaitTermination(service, 60, TimeUnit.SECONDS);
}
           

使用 FutureTask 送出任務

針對JDK 的 FutureTask 轉換而來的, Guava 提供

ListenableFutureTask.create(Callable<V>)

ListenableFutureTask.create(Runnable, V)

public void test2() {
    // 自定義線程池名稱,使用 ThreadFactory 的構造方法
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("guava-ListenableFutureTask-%s").build();
    ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory);

    // 将 ThreadPoolExecutor 轉為 ListeningExecutorService
    ListeningExecutorService service = MoreExecutors.listeningDecorator(executorService);

    // 建立 ListenableFutureTask
    ListenableFutureTask<String> listenableFutureTask = ListenableFutureTask.create(new InCallableTask());
    // 送出任務
    service.submit(listenableFutureTask);

    Futures.addCallback(listenableFutureTask, new FutureCallback<String>() {
        @Override
        public void onSuccess(String result) {
            System.out.println("擷取的結果: " + result);
        }

        @Override
        public void onFailure(Throwable thrown) {
            thrown.printStackTrace();
        }
    });

    System.out.println("這邊會先執行? main task done.....");

    MoreExecutors.shutdownAndAwaitTermination(service, 60, TimeUnit.SECONDS);
}
           

送出一組任務

  1. Futures.allAsList(futures)

    :如果一個任務發生異常, 這邊會抛出異常
  2. Futures.successfulAsList

    :傳回一個ListenableFuture ,該Future的結果包含所有成功的Future,按照原來的順序,當其中之一Failed 或者 cancel,則用null替代
  3. 以上兩個方法都是阻塞的
public void test3() throws ExecutionException, InterruptedException {
    // 自定義線程池名稱,使用 ThreadFactory 的構造方法
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("guava-ListenableFutureTask-%s").build();
    ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory);

    // 将 ThreadPoolExecutor 轉為 ListeningExecutorService
    ListeningExecutorService service = MoreExecutors.listeningDecorator(executorService);


    // 任務清單
    ArrayList<Callable<String>> tasks = Lists.newArrayList(new InCallableTask(), new InCallableTaskLongTime());

    List<ListenableFuture<String>> futures = Lists.newArrayList();

    // 送出任務
    for (Callable<String> task : tasks) {
        futures.add(service.submit(task));
    }
    // 傳回一個ListenableFuture ,該Future的結果包含所有成功的Future,按照原來的順序,當其中之一Failed或者cancel,則用null替代
    ListenableFuture<List<String>> listListenableFuture = Futures.successfulAsList(futures);

    // 堵塞,直到所有任務完成,然後一起傳回
    // [Hello SAM!!  guava-ListenableFutureTask-0, Hello SAM, long time task guava-ListenableFutureTask-0]
    // 如果一個任務發生異常: [null, Hello SAM, long time task guava-ListenableFutureTask-0]
    System.out.println(listListenableFuture.get());

    // allAsList與 successfulAsList的差別
    // 如果一個任務發生異常, 這邊會抛出異常,
    ListenableFuture<List<String>> listListenableFuture1 = Futures.allAsList(futures);
    System.out.println(listListenableFuture1.get());

    System.out.println("這邊不會先執行 main task done.....");

    // 一定要記得關閉
    MoreExecutors.shutdownAndAwaitTermination(service, 60, TimeUnit.SECONDS);
}
           

其他功能(具體見 Guava 文章)

  1. 增加異常捕獲等
public void test4() throws ExecutionException, InterruptedException {
    // 自定義線程池名稱,使用 ThreadFactory 的構造方法
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("guava-ListenableFutureTask-%s").build();
    ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory);

    // 将 ThreadPoolExecutor 轉為 ListeningExecutorService
    ListeningExecutorService service = MoreExecutors.listeningDecorator(executorService);

    final InCallableTask inCallableTask = new InCallableTask();
    ListenableFuture<String> listenableFuture = service.submit(inCallableTask);

    Futures.catching(listenableFuture, RuntimeException.class, new Function<RuntimeException, String>() {
        @Override
        public String apply(RuntimeException input) {
            // 這邊可以做一些其他事情,,如重新調用,各種異常處理
            return "你好,異常被我吃了";
        }
    });

    Futures.catchingAsync(listenableFuture, RuntimeException.class, new AsyncFunction<RuntimeException, String>() {
        @Override
        public ListenableFuture<String> apply(RuntimeException input) throws Exception {
            return listenableFuture;
        }
    });

    System.out.println(listenableFuture.get());
}
           

參考

  1. 源碼位址
JDK多線程基礎(7):有傳回的線程Callable接口以及 Future 模式

繼續閱讀