天天看點

Java中的異步執行Future小結

最近碰到的異步處理的操作比較多,異步就是不等想要的結果傳回執行接下來的其他操作,等異步結果傳回後直接調用已經注冊好的處理方法完成後續操作。異步的思想是非常棒的,相比輪詢的方式而言,異步的實作方式無疑是高效并且優雅的。本文介紹了包括Future,AIO和有點類似于單機版的Map-Reduce的fork/join架構。

Guava ListenableFuture

使用JDK提供的線程池ExcuteService的execute(Runable runable)方法來執行不需要傳回結果的線程任務,而使用submit(Callable callable)方法需要線程任務傳回T類型的執行結果,方法傳回Future對象,使用Future的get方法可以擷取執行結果,而在執行get方法線上程傳回結果之前是阻塞的,jdk這對于想要異步的處理結果沒有提供相應的接口,guava的ListenableFuture接口就是為實作異步的擷取Future中的結果而出現的。

顧名思義,ListenableFuture是可監聽的Future,可以在結果傳回的時候以方法回調的方式實作異步的後續操作。那麼如何擷取ListenableFuture呢,方法有兩種:

1. 将jdk提供的Futhure轉換成ListenableFuture

2. 将ExcutorService線程池轉換成ListeningExcutorService,繼而擷取ListenableFuture

第一種方式我們使用如下擴充卡獲得Future對應的ListenableFuture

ListenableFuture<String> listenableFuture=JdkFutureAdapters.listenInPoolThread(future);
           

第二種方式我們使用一個線程池的修飾類獲得

ListeningExecutorService listeningThreadPool=MoreExecutors.listeningDecorator(threadPool);
ListenableFuture<String> listenableFuture=listeningThreadPool.submit(new Callable<String>() {....}
           

在擷取了ListenableFuture之後,我們同樣有兩種方式異步擷取線程執行結果

1. 添加FutureCallback執行回調方法

2. 為ListenableFuture添加監聽線程

第一種方法使用Futures的addCallback方法實作

Futures.addCallback(listenableFuture,new FutureCallback<String>(){

            @Override
            public void onSuccess(String result) {
                System.out.println(result);
            }

            @Override
            public void onFailure(Throwable t) {
                t.printStackTrace();
            }
        });
           

第二種方法不太推薦,使用listenableFuture.addListener的方法實作。

源碼解讀

我們看到guava的實作方式非常優雅,那麼它是怎麼實作這種異步回調的呢,以JdkFutureAdapters.listenInPoolThread為例,其實他是傳回了一個ListenableFutureAdapter的内部類,它實作了ListenableFuture并且繼承了ForwardingFuture類,然後調用callback的時候會建立一個Runable線程任務,其主要邏輯是使用Future的get方法阻塞擷取執行結果,在結果完成的時候回到callback類的onSuccess方法,如果出現異常則調用onFailure方法。

整個方法的設計使用了擴充卡模式,ListenableFuture是最終使用者需要的接口,ListenableFutureAdapter是擴充卡,ForwordingFuture實作了Future接口,是被适配者。

關于FutureTask

FutureTask是Future的一個實作類,它同時實作了Runable接口,直接使用線程池運作FutureTask,并且擷取阻塞擷取結果,其在異步方面并沒有做出改變。

本節代碼

所有測試代碼擷取可以點選這裡

參考文獻:guava并發,深入學習 FutureTask

Java AIO中的Future和CompleteHandler

Socket通信是Java網絡通信的一種方式,在基礎的阻塞BIO後出現了NIO,NIO采用了多路複用思想,使得一個線程可以監聽多個socket檔案描述符,使得在在一次輪訓的過程中可以檢視多個阻塞IO的狀态,相比BIO每次隻能監聽一個IO狀态,如果這個IO長期處于阻塞狀态,那麼其他IO操作如果準備好也無法執行。

BIO的另一個問題是在于對于每個準備好的IO操作必須配置設定一個線程進行實際的阻塞IO操作,這使得系統的線程數和已準備好的IO操作成線性關系。NIO的優勢在于它可以對多個已準備好的IO阻塞操作做打包操作,做線程數的縮減。

NIO對資料的操作是面向緩沖區的,而BIO是面向資料流的,NIO中面向緩沖區的IO操作是講所有的資料一次讀入到緩沖區内,進而做操作,這相比BIO的面向資料流的IO每次隻能讀入一個或多個位元組的方式,這種方式更加快捷。

NIO本質其實還是采用輪詢的方式去擷取已準備好的IO操作,實際的讀寫IO操作仍然是阻塞的,Java AIO是對NIO的又一次改進,其真正實作了異步的IO操作,包括擷取準備好的操作,讀寫操作都是異步的,其執行都會直接傳回一個Future對象,我們可以使用其get方法阻塞接下來的執行。

Java AIO主要用到的類有:

1. AsynchronousSocketChannel

2. AsynchronousServerSocketChannel

AsynchronousSocketChannel的connet方法為其準備io操作,write和read方法為其實際的IO操作,同樣的AsynchronousServerSocketChannel的accept方法為其準備io操作,讀寫操作是和AsynchronousSocketChannel一樣的。

以上方法都是異步的,也就是說方法會立即傳回,擷取異步執行結果的方法有兩種:

1. 擷取Future結果,使用Future的get方法擷取執行結果

2. 使用CompletionHandler的回調方法在結果傳回的時候擷取結果

以AsynchronousServerSocketChannel為例,其connet方法有兩個重載方式:

1. accept()無參方式傳回一個Future,可以使用get方法擷取連接配接的client。

2. accept(A attach,CompletionHandler

public class AioEchoServer {
    private int PORT = ;
    private String IP = "127.0.0.1";
    private AsynchronousServerSocketChannel server;

    public AioEchoServer() {
        try {
            server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(IP, PORT));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    public void startServer() {
        //attachment參數可以被CompletionHandler接收

        server.accept("attachment", new CompletionHandler<AsynchronousSocketChannel, String>() {
            @Override
            public void completed(AsynchronousSocketChannel client, String attachment) {
                //擷取了對應的用戶端socket
                try {
                    System.out.println("attachment: " + attachment);
                    ByteBuffer byteBuffer = ByteBuffer.allocate();
                    //這裡是非阻塞讀取,需要使用get等待用戶端傳回結果
                    int readResult = client.read(byteBuffer).get();
                    System.out.println("讀入資料量:" + readResult);
                    byteBuffer.flip();
                    System.out.println("Get from client:" + (new String(byteBuffer.array())));
                    int writeResult = client.write(byteBuffer).get();
                    if (writeResult > ) {
                        System.out.println(client.getRemoteAddress() + ": " + "response success! write length: " + writeResult);
                    } else {
                        System.out.println("write length <0");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    server.accept(null,this);
                }
            }

            @Override
            public void failed(Throwable exc, String attachment) {
                exc.printStackTrace();
            }
        });
    }

    public static void main(String[] args) {
        AioEchoServer aioEchoServer = new AioEchoServer();
        aioEchoServer.startServer();
        Scanner scanner = new Scanner(System.in);
        scanner.nextLine();
    }
}
           

用戶端的實作方式

public class AioClient {
    public static void main(String[] args) {
        String IP = "127.0.0.1";
        int PORT = ;
        final AsynchronousSocketChannel client;
        try {
            client = AsynchronousSocketChannel.open();
            SocketAddress serverSocketAddress = new InetSocketAddress(IP, PORT);
            client.connect(serverSocketAddress, "clientAttachment", new CompletionHandler<Void, String>() {
                @Override
                public void completed(Void result, String attachment) {
                    System.out.println(attachment);
                    try {
                        ByteBuffer byteBuffer = ByteBuffer.wrap("hello".getBytes());
                        //wrap後不用byteBuffer.flip(),此時position=0,limit是之前position的位置
                        //read後的postion是寫入的界限,limit=capbility,flip做的事情:limit=poition,position=0.
                        //須阻塞寫入
                        int writeResult = client.write(byteBuffer).get();
                        System.out.println("寫入Byte數:" + writeResult);
                        //須阻塞讀取
                        byteBuffer.clear();
                        int readResult = client.read(byteBuffer).get();
                        byteBuffer.flip();
                        System.out.println("server response: " + (new String(byteBuffer.array())));
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            client.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }

                @Override
                public void failed(Throwable exc, String attachment) {
                    exc.printStackTrace();
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
        Scanner scanner=new Scanner(System.in);
        scanner.nextLine();

    }
}
           

需要注意的有兩點,不論是client還是server,其write和read都是非阻塞的,模拟的情況下我們需要阻塞等待其結果傳回;二是ByteBuffer.wrap方法之後不用flip操作,其傳回的結果已經是flip過的,作者在這裡踩了坑。

單機版Map-Reduce fork/join架構

fork/join架構類似于單機版的Map-Reduce,如果一個計算任務資料量比較大,就将任務分解交給不同的線程去處理,然後最後彙總結果,這種計算模型對于多核處理器來說具有極大的優勢。使用fork/join架構的基本過程如下。

1. 實作一個類繼承自RecursiveTask(或者無傳回值的RecursiveAction)

1. 實作compute方法,将任務拆分成多個自實作的RecursiveTask

2. invokeAll(所有自實作的RecursiveTask);

3. 使用自實作的RecursiveTask的join方法擷取執行結果

4. 彙總傳回結果并傳回

  1. 使用ForkJoinPool的invoke方法調用上一步自定義的計算任務

下例實作了一個計算數組中所有數和的任務

public class CountTask extends RecursiveTask<Integer>{
    private static int THRESHOLD=;
    private int[] integers;
    private int start;
    private int end;

    public CountTask(int[] integers,int start,int end){
        this.integers=integers;
        this.start=start;
        this.end=end;
    }

    @Override
    protected Integer compute() {
        int len=end-start+;
        boolean isOverThreshold=len>THRESHOLD;
        int sum=;
        if(!isOverThreshold){
           for(int i=start;i<=end;i++){
               sum+=integers[i];
           }
        }else{
            int mid=(start+end)/;
            CountTask leftTask=new CountTask(this.integers,start,mid);
            CountTask rightTask=new CountTask(this.integers,mid+,end);
            invokeAll(leftTask,rightTask);
            //fork的作用是将目前任務放到workerThread裡面去做
            //invokeAll是将其中一個放在本線程做,其他的調用fork
            int leftResult=leftTask.join();
            int rightResult=rightTask.join();
            sum=leftResult+rightResult;

        }
        return sum;
    }
}
           
public class ForkjoinTest {
    public static void main(String[] args){
        int[] integers={,,,,,,,,,};
        ForkJoinPool forkJoinPool=new ForkJoinPool();
        ForkJoinTask<Integer> task=new CountTask(integers,,);
        int sum=forkJoinPool.invoke(task);
        System.out.println(sum);
    }
}
           

值得一提的是fork/join架構是jdk8的stream實作的計算模型,如果想要深入了解stream的實作原理,可以參考這篇文章Java8 Stream原理深度解析。

關于源碼

本文所有源碼可以從這裡獲得,本文首發表于我的部落格,歡迎關注!轉載須注明文章出處,作者保留文章所有權。

繼續閱讀