天天看點

Future 和 Callable 組合Future 和 Callable 組合

Future 和 Callable 組合

概述

  • 适用場景:有一個很耗時的傳回值需要計算,并且這個傳回值不是立刻需要的話,那麼就可以使用這個組合( Callable 和 Future ),用另一個線程去計算傳回值,而目前線程在使用這個傳回值之前可以做其它的操作,等到需要這個傳回值時,再通過Future得到該傳回值;
  • Future 類基于多線程的Future模式設計
  • future.get()方法阻塞等待線程傳回結果
  • future.cancel(true)方法實作取消任務
  • future.wait(3)如果線程耗時超過3秒則抛出異常,線程中斷運作
  • 使用CompletionService來維護處理線程不的傳回結果時,主線程總是能夠拿到最先完成的任務的傳回值,而不管它們加入線程池的順序

示例

  • Future 和 Callable ,借用 ExecutorService 執行 Callable ,使用 Future 接收傳回值
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
// Future 和 Callable 的組合
public class FutureAndCallableTest {

    public static void main(String[] args) {

        /**
         * ExecutorService 繼承 Executor ,
         * 目的是為管理Thread對象,進而簡化并發程式設計,
         * Executor     無需顯示的去管理線程的生命周期
         */
        ExecutorService service = Executors.newSingleThreadExecutor();

        /**
         * Future 與 Callable 組合的使用方式
         */
        Future<Integer> future = service.submit(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                Random random = new Random();
                return random.nextInt();
            }
        });

        try {
            Integer integer = future.get();
            Thread.sleep();
            System.out.println(integer);
            service.shutdown(); // 關閉線程池

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

}
           
  • Future 和 Callable 組合,将 Callable 作為 Future 的構造函數的參數,通過 Thread 的構造方法組合 Future 的對象,執行 Thread
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;


public class FutureTaskAndCallableTest {

    public static void main(String[] args) {

        /**
         * Callable 用于産生結果,Future 用于擷取結果
         * Future   可以拿到異步執行任務的傳回值
         * 
         * Callable 有傳回值,可以抛出傳回值異常
         * Runnable 無傳回值,不可抛出傳回值異常
         */
        Callable<Integer> callable = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Random random = new Random();
                int nextInt = random.nextInt();
                return nextInt;
            }
        };

        /**
         * FutureTask   實作  Runnable 
         * 是以可以作為   Thread 構造方法的參數
         * FutureTask實作了兩個接口,Runnable和Future,
         * 是以它既可以作為Runnable被線程執行,
         * 又可以作為Future得到Callable的傳回值
         */
        FutureTask<Integer> future = new FutureTask<Integer>(callable);
        /**
         * 使用場景:
         * 有一個很耗時的傳回值需要計算,并且這個傳回值不是立刻需要的話,那麼就可以使用這個組合( Callable 和 Future ),
         * 用另一個線程去計算傳回值,而目前線程在使用這個傳回值之前可以做其它的操作,
         * 等到需要這個傳回值時,再通過Future得到該傳回值
         */

        /**
         * Future get() 運作時未執行完Callable 中的計算,是否會阻塞等待計算結果?
         * 會阻塞,可設定阻塞時間,抛出   TimeoutException    
         */

        try {
//          for(int i=0 ; i< 5 ; i++){
                new Thread(future).start();
                Thread.sleep(); // 暫停一段時間
                Integer integer = future.get();
                System.out.println(integer);
//          }
            // 多次調用,但Callable 中實際上隻執行了一次,除第一次外,每次都是擷取第一次執行的結果
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        /**
         * Future 設計模式
         */
    }
}
           
  • CompletionService使用
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionServiceAndCallableTest {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService cs = Executors.newSingleThreadExecutor();

        CompletionService<Integer> service = new ExecutorCompletionService<>(cs);

        for(int i =  ; i <  ; i++){
            final int result = i ;
            service.submit(new Callable<Integer>() {

                @Override
                public Integer call() throws Exception {
                    return result;
                }
            });
        }

        for(int i =  ; i <  ; i++){
            Integer integer = service.take().get();
            System.out.println(integer);
        }

        cs.shutdown();
    }

}
           

參考資料

  • Java線程(七):Callable和Future
  • Java多線程詳解之Callable和Future阻塞線程同步傳回結果
  • The j.u.c Synchronizer Framework翻譯
  • 由FutureTask的get方法靠什麼機制來阻塞引發的思考
  • Callable、FutureTask中阻塞逾時傳回的坑點
  • 利用Future異步擷取多線程的傳回結果

繼續閱讀