天天看點

JDK7 AIO初體驗

關于AIO的概念了解

Io的兩個重要步驟:發起IO請求,和實際的IO操作。在unix網絡程式設計的定義裡異步和非異步概念的差別就是實際的IO操作是否阻塞。如果不是就是異步,如果是就是同步。

而阻塞和非阻塞的差別在于發起IO請求的時候是否會阻塞,如果會就是阻塞,不會就是非阻塞。

本人了解能力有限,想了個例子來輔助自己了解:

小明想要買一本《深入java虛拟機》的書,以下幾個場景可以來了解這幾種io模式:

1. 如果小明每天都去書店問售貨員說有沒有這本書,如果沒有就回去繼續等待,等下次再過來文。(阻塞)

2. 如果小明告訴售貨員想買一本《深入java虛拟機》的書,那麼就在家裡等着做其他事情去了,如果書到了售貨員就通知小明,小明再自己過去取。

3. 如果小明告售貨員想買一本《深入java虛拟機》的書,然後告訴售貨員到了幫他送到某某地方去,就做其他事情去了。小明就不管了,等書到了,售貨員就幫他送到那個地方了。

售貨員可以認為是作業系統的一個服務,而小明是一個使用者程序。

可以看出2,3的效率明顯要比1高。但是1最簡單,而2,3需要一些協作。充分證明了團隊合作的力量。

JDK7 AIO初體驗

AsynchronousChannel

:支援異步通道,包括服務端

AsynchronousServerSocketChannel

AsynchronousSocketChannel

等實作。

CompletionHandler

:使用者處理器。定義了一個使用者處理就緒事件的接口,由使用者自己實作,異步io的資料就緒後回調該處理器消費或處理資料。

AsynchronousChannelGroup

:一個用于資源共享的異步通道集合。處理IO事件和配置設定給

CompletionHandler

以一個簡單監聽服務端為例,基本過程是:

1. 啟動一個服務端通道

2. 定義一個事件處理器,使用者事件完成的時候處理,如消費資料。

3. 向系統注冊一個感興趣的事件,如接受資料,并把事件完成的處理器傳遞給系統。

4. 都已經交待完畢,可以隻管繼續做自己的事情了,作業系統在完成事件後通過其他的線程會自動調用處理器完成事件處理。

以下用一個例子來簡單實作,一個服務端和用戶端。服務端監聽用戶端的消息,并列印出來。

AIOServer.java

package io.aio;  

    import java.io.IOException;  
    import java.net.InetSocketAddress;  
    import java.nio.ByteBuffer;  
    import java.nio.channels.AsynchronousServerSocketChannel;  
    import java.nio.channels.AsynchronousSocketChannel;  
    import java.nio.channels.CompletionHandler;  
    import java.util.concurrent.ExecutionException;  
    import java.util.concurrent.Future;  
    import java.util.concurrent.TimeUnit;  
    import java.util.concurrent.TimeoutException;  

    /** 
     *  
     * @author noname 
     */  
    public class AIOServer {  
        public final static int PORT = ;  
        private AsynchronousServerSocketChannel server;  

        public AIOServer() throws IOException {  
            server = AsynchronousServerSocketChannel.open().bind(  
                    new InetSocketAddress(PORT));  
        }  

        public void startWithFuture() throws InterruptedException,  
                ExecutionException, TimeoutException {  
            System.out.println("Server listen on " + PORT);  
            Future<AsynchronousSocketChannel> future = server.accept();  
            AsynchronousSocketChannel socket = future.get();  
            ByteBuffer readBuf = ByteBuffer.allocate();  
            readBuf.clear();  
            socket.read(readBuf).get(, TimeUnit.SECONDS);  
            readBuf.flip();  
            System.out.printf("received message:" + new String(readBuf.array()));  
            System.out.println(Thread.currentThread().getName());  

        }  

        public void startWithCompletionHandler() throws InterruptedException,  
                ExecutionException, TimeoutException {  
            System.out.println("Server listen on " + PORT);  
            //注冊事件和事件完成後的處理器  
            server.accept(null,  
                    new CompletionHandler<AsynchronousSocketChannel, Object>() {  
                        final ByteBuffer buffer = ByteBuffer.allocate();  

                        public void completed(AsynchronousSocketChannel result,  
                                Object attachment) {  
                            System.out.println(Thread.currentThread().getName());  
                            System.out.println("start");  
                            try {  
                                buffer.clear();  
                                result.read(buffer).get(, TimeUnit.SECONDS);  
                                buffer.flip();  
                                System.out.println("received message: "  
                                        + new String(buffer.array()));  
                            } catch (InterruptedException | ExecutionException e) {  
                                System.out.println(e.toString());  
                            } catch (TimeoutException e) {  
                                e.printStackTrace();  
                            } finally {  

                                try {  
                                    result.close();  
                                    server.accept(null, this);  
                                } catch (Exception e) {  
                                    System.out.println(e.toString());  
                                }  
                            }  

                            System.out.println("end");  
                        }  

                        @Override  
                        public void failed(Throwable exc, Object attachment) {  
                            System.out.println("failed: " + exc);  
                        }  
                    });  
            // 主線程繼續自己的行為  
            while (true) {  
                System.out.println("main thread");  
                Thread.sleep();  
            }  

        }  

        public static void main(String args[]) throws Exception {  
            new AIOServer().startWithCompletionHandler();  
        }  
    }  
           

AIOClient.java

package io.aio;  

    import java.net.InetSocketAddress;  
    import java.nio.ByteBuffer;  
    import java.nio.channels.AsynchronousSocketChannel;  

    public class AIOClient {  

        public static void main(String... args) throws Exception {  
            AsynchronousSocketChannel client = AsynchronousSocketChannel.open();  
            client.connect(new InetSocketAddress("localhost", ));  
            client.write(ByteBuffer.wrap("test".getBytes())).get();  
        }  
    }  
           

服務端寫了兩種處理實作方式,

startWithCompletionHandler

是通過

Handler

來處理,

startWithFuture

是通過

Future

方式來處理。

startWithCompletionHandler

方法裡可以看到調用

accepte()

完成異步注冊後,線程就可以繼續自己的處理了,完全不被這個io所中斷。

從以上來看AIO的代碼簡單了很多,至少比NIO的代碼實作簡單很多。