天天看點

Java BIO NIO 與 AIO 分析第三部分之AIOAIO部分AIO與NIO差别AIO NIO與IO多路複用的關系AIO體系結構CompletionHandler 分析AsynchronousFileChannel 分析異步TCP IO操作分析總結

原創文章, 轉載請私信. 訂閱号 tastejava 回複 aio思維導圖擷取思維導圖源檔案

AIO部分

上一篇文章中分析了BIO部分, 接下來分析一下AIO部分, AIO是JDK1.7新增的屬于java.nio包下的IO元件. 還是一樣的思路, 從了解AIO的各個重要元件開始. JAVA的AIO隻提供TCP操作和檔案操作, 沒有提供UDP支援. 主要元件有CompletionHandler, AsynchronousFileChannel, AsynchronousServerSocketChannel, AsynchronousSocketChannel 共四個元件, 各自作用為:

  1. CompletionHandler 包含了異步IO通用回調方法
  2. AsynchronousFileChannel 用于操作檔案
  3. AsynchronousServerSocketChannel 用于操作TCP連接配接, 監聽伺服器端
  4. AsynchronousSocketChannel 用于操作TCP連接配接

AIO與NIO差别

AIO全稱是非阻塞異步IO(Asynchronous IO), 真正的實作了IO操作的異步處理.與NIO的輪詢連接配接是否處于就緒狀态不同, AIO是傳入一個回調方法和緩存參數, 讀取或者發送資料操作交由作業系統實作, 當IO操作完成後, 相應的AsynchronousChannel會回調提供的回調方法, 在回調方法中進行資料擷取以及業務處理.

AIO NIO與IO多路複用的關系

AIO和NIO都屬于IO多路複用的實作方式, AIO與NIO都實作了IO多路複用. AIO屬于proactor模式, NIO屬于reactor模式. 具體什麼是reactor, 什麼是reactor在之後的文章再仔細分析, 這篇文章隻是分析Java提供的AIO本身.

AIO體系結構

Java BIO NIO 與 AIO 分析第三部分之AIOAIO部分AIO與NIO差别AIO NIO與IO多路複用的關系AIO體系結構CompletionHandler 分析AsynchronousFileChannel 分析異步TCP IO操作分析總結

從圖中可以看到AIO體系非常簡潔, 下面我們一個一個元件梳理, 分析怎麼使用.

CompletionHandler 分析

在正式分析檔案或者網絡的異步IO操作前, 首先要熟悉一個很重要的接口, CompletionHandler, 這個接口作用是什麼呢, 一句話概括, CompletionHandler用于消費處理異步IO操作産生的結果資料.源碼原始注釋如下:

A handler for consuming the result of an asynchronous I/O operation.

接口簽名為

這裡的類型參數V和A源碼注釋描述的也十分簡潔:

* @param   <V>     The result type of the I/O operation
 * @param   <A>     The type of the object attached to the I/O operation
           

即V是異步IO操作傳回的結果, A是調用IO操作時額外附加的對象.

CompletionHandler提供的回調方法總共有兩個, 一個是IO操作完成并且成功回調的completed方法, 一個是IO操作失敗後回調的failed方法.詳細源碼和分析如下:

/**
     * Invoked when an operation has completed.
     * 當IO操作成功完成後調用此方法
     *
     * @param   result
     *          The result of the I/O operation.
     * 			異步IO操作産生的結果
     * @param   attachment
     *          The object attached to the I/O operation when it was initiated.
     * 			調用異步IO操作時初始化的附加對象
     */
    void completed(V result, A attachment);

    /**
     * Invoked when an operation fails.
     * 當IO操作失敗時的回調方法
     *
     * @param   exc
     *          The exception to indicate why the I/O operation failed
     * 			IO操作産生的錯誤對象
     * @param   attachment
     *          The object attached to the I/O operation when it was initiated.
     * 			調用異步IO操作時初始化的附加對象
     */
    void failed(Throwable exc, A attachment);
           

了解具體AIO的前置條件是要記住CompletionHandler兩個類型參數, 第一個V是IO操作結果, 第二個A是附加對象, 接下來具體看看AIO的檔案操作和網絡TCP操作.

AsynchronousFileChannel 分析

用AIO讀取檔案, 核心過程是将讀取這個操作實際執行者委托給作業系統, 作業系統将資料讀到傳遞的緩存對象中, 然後回調讀取成功方法.異步IO操作都分為兩種, 一種是傳回一個Future對象, 這種方式的get方法是阻塞的, 雖然用的是異步IO操作, 但是實際還是同步非阻塞IO, 還是reactor模式. 另一種是傳遞一個回調函數, 異步IO操作完成後自動回調傳遞的回調函數, 這一種是真正的異步IO, 是proactor模式.

下面分别用同步非阻塞和異步非阻塞的方式操作檔案, 具體代碼如下:

reactor模式

// 雖然用的是異步IO, 但是還是reactor模式的IO多路複用
@Test
public void testAsynchronousFileChannelWidthFuture() throws IOException, ExecutionException, InterruptedException {
    // 配置設定位元組緩存
    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
    String filePath = getClass().getResource("/") + "/SimpleReadFile.txt";
    Path path = Paths.get(filePath.replaceFirst("file:/", ""));
    // 獲得異步通道執行個體
    AsynchronousFileChannel asynchronousFileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
    Future<Integer> read = asynchronousFileChannel.read(byteBuffer, 0);
    // 判斷future是否完成讀取, 未完成時目前線程可以處理其他業務
    while (!read.isDone()) {
        // 其他業務邏輯
        continue;
    }
    // Future的get方法是阻塞方法, 未完成會阻塞到完成并且傳回結果.
    Integer readCount = read.get();
    byteBuffer.flip();
    CharBuffer charBuffer = Charsets.UTF_8.decode(byteBuffer);
    int limit = charBuffer.limit();
    char[] chars = new char[limit];
    charBuffer.get(chars);
    log.info("讀取了{}個位元組", readCount);
    log.info("{}", String.valueOf(chars));
}
           

proactor模式

// 真正的異步IO即AIO, 使用proactor模式
@Test
public void testAsynchronousFileChannelWidthCompletionHandler() throws IOException, InterruptedException {
    String filePath = getClass().getResource("/") + "/SimpleReadFile.txt";
    Path path = Paths.get(filePath.replaceFirst("file:/", ""));
    AsynchronousFileChannel asynchronousFileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
    asynchronousFileChannel.read(byteBuffer, 0, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
        // 讀取完成後回調方法
        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            attachment.flip();
            CharBuffer charBuffer = Charsets.UTF_8.decode(attachment);
            int limit = charBuffer.limit();
            char[] chars = new char[limit];
            charBuffer.get(chars);
            log.info("讀取了{}個位元組", result);
            log.info("{}", String.valueOf(chars));
        }
        // 讀取過程失敗回調方法
        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            log.info("讀取出錯");
        }
    });
    Thread.sleep(2000);
}
           

異步TCP IO操作分析

AIO隻支援TCP, 不提供UDP支援, 接下來看一下AsynchronousServerSocketChannel和AsynchronousSocketChannel的具體使用.網絡的異步IO操作時, 傳回結果(包括AsynchronousServerSocketChannel的accept, AsynchronousSocketChannel.connect方法傳回結果)都有傳回Future和傳入CompletionHandler回調方法兩種方式, 傳回Future的方法是換了個地方阻塞, 即使用isDone判斷Future是否完成, 也是輪詢的方式, 不做描述, 接下來的示例代碼都是通過回調的方法使用AIO.

首先準備一個工具方法, 作用是将字元串資料放入Buffer, 供網絡傳輸資料使用.

// 将字元串放入指定Buffer對象的工具方法
private ByteBuffer getReadableByteBufferWithData(ByteBuffer byteBuffer, String message) {
    byteBuffer.put(message.getBytes());
    // 切換位元組緩存為讀狀态, 為發送資料做準備
    byteBuffer.flip();
    return byteBuffer;
}
           

AIO網絡操作主要流程如下:

①網絡操作的發送資料流程主要是将資料寫入Buffer, 然後調用AsynchronousSocketChannel的write方法, 并将完成操作的回調方法類執行個體CompletionHandler傳進去.

②網絡操作的接收資料流程主要是調用AsynchronousSocketChannel的read方法, 傳入緩存Buffer對象, 傳入回調方法執行個體CompletionHandler, 當接收資料完成後, read方法會自動回調傳入的回調方法.

可以看到, 不管接收資料還是發送資料, 都要提供回調方法CompletionHandler, 是以我們還要準備一個接收資料完成的回調CompletionHandler類執行個體, 和一個發送資料完成的回調CompletionHandler類執行個體.具體代碼如下:

/**
 * @author GaoZl
 * @date 2019/12/3 9:46
 * 發送資料完成後回調函數
 */
@Slf4j
public class SendCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        // 發送成功, 清空緩存并切換為寫狀态
        attachment.clear();
        log.info("狀态: 發送消息完成");
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        log.info("狀态: 發送消息失敗");
    }
}
           
/**
 * @author GaoZl
 * @date 2019/12/3 9:45
 * 接收資料完成後的回調函數
 */
@Slf4j
public class ReceiveCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
    private AsynchronousSocketChannel asynchronousSocketChannel;

    public ReceiveCompletionHandler(AsynchronousSocketChannel asynchronousSocketChannel) {
        this.asynchronousSocketChannel = asynchronousSocketChannel;
    }

    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        // 切換緩存為讀狀态
        attachment.flip();
        CharBuffer charBuffer = Charsets.UTF_8.decode(attachment);
        // 讀取資料完成, 清空緩存并切換為寫狀态
        attachment.clear();
        // 本條消息資料已經讀取到了 charBuffer
        // 在處理資料同時未完成處理的時候有資料傳入會存在資料丢失的情況, 還需解決
        asynchronousSocketChannel.read(attachment, attachment, this);
        // 資料的業務處理邏輯
        int limit = charBuffer.limit();
        char[] chars = new char[limit];
        charBuffer.get(chars);
        log.info("讀取了{}個位元組", result);
        log.info("接收到消息: {}", String.valueOf(chars));
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        log.info("狀态: 接收資料失敗");
    }
}
           

有了具體的接收資料回調類和發送資料回調類後, 就是具體組合使用了, 下面依舊是從實際代碼和注釋來看一下AIO的具體TCP網絡操作, 代碼如下:

// 伺服器端
@Test
public void testAsynchronousServerSocketChannel() throws IOException, InterruptedException {
    ByteBuffer readBuffer = ByteBuffer.allocateDirect(1024);
    ByteBuffer writeBuffer = ByteBuffer.allocateDirect(1024);
    AsynchronousServerSocketChannel asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
    asynchronousServerSocketChannel.bind(new InetSocketAddress(9999));
    asynchronousServerSocketChannel.accept(writeBuffer, new CompletionHandler<AsynchronousSocketChannel, ByteBuffer>() {
        @Override
        public void completed(AsynchronousSocketChannel result, ByteBuffer attachment) {
            // 發送消息
            // 發送資料完成後回調函數
            SendCompletionHandler sendCompletionHandler = new SendCompletionHandler();
            // 發送資料
            getReadableByteBufferWithData(writeBuffer, "消息來自伺服器, 連接配接已經建立");
            result.write(writeBuffer, writeBuffer, sendCompletionHandler);
            // 接收消息
            ReceiveCompletionHandler readCompletionHandler = new ReceiveCompletionHandler(result);
            result.read(readBuffer, readBuffer, readCompletionHandler);
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            log.info("狀态: 伺服器與用戶端建立連接配接失敗");
        }
    });
    // 主線程阻塞
    Thread.sleep(1000 * 3600);
}
           
// 用戶端
@Test
public void testAsynchronousSocketChannel() throws IOException, InterruptedException {
    ByteBuffer readBuffer = ByteBuffer.allocateDirect(1024);
    ByteBuffer writeBuffer = ByteBuffer.allocateDirect(1024);
    AsynchronousSocketChannel asynchronousSocketChannel = AsynchronousSocketChannel.open();
    asynchronousSocketChannel.connect(new InetSocketAddress("127.0.0.1", 9999), asynchronousSocketChannel,
            new CompletionHandler<Void, AsynchronousSocketChannel>() {
                @Override
                public void completed(Void result, AsynchronousSocketChannel attachment) {
                    // 接收消息
                    ReceiveCompletionHandler readCompletionHandler = new ReceiveCompletionHandler(attachment);
                    attachment.read(readBuffer, readBuffer, readCompletionHandler);
                    // 5秒向用戶端發送一次消息
                    while (true) {
                        try {
                            Thread.sleep(5000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        // 發送資料完成後回調函數
                        SendCompletionHandler sendCompletionHandler = new SendCompletionHandler();
                        getReadableByteBufferWithData(writeBuffer, "消息來自用戶端, 連接配接已經建立");
                        asynchronousSocketChannel.write(writeBuffer, writeBuffer, sendCompletionHandler);
                    }
                }

                @Override
                public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
                    log.info("狀态: 用戶端與伺服器建立連接配接失敗");
                }
            });
    // 主線程阻塞
    Thread.sleep(1000 * 3600);
}
           

總結

AIO和NIO帶來的性能提升是裡程碑式的, 但是API設計比較繁瑣, Buffer的flip設計也不是很好用, 網絡程式設計還是建議netty這種成熟的網絡架構.