天天看点

【JAVA核心知识】6.3: JAVA AIO1 AIO是什么2 AIO的异步方式3 示例

AIO

  • 1 AIO是什么
  • 2 AIO的异步方式
    • 2.1 未来式
    • 2.2 回调式
  • 3 示例
    • 3.1 文件读取
    • 3.2 Socket

1 AIO是什么

AIO是NIO的改进与增强,在JDK1.7正式被集入nio包中。AIO采用异步IO模型,关于异步IO模型,在6.1: JAVA IO基础中已经提过:异步IO模型将数据的IO请求和实际获取到IO数据的结果分开,用户只需发起IO操作请求,并在数据准备完成时使用即可,而无需关注获取数据经历了哪些过程。

AIO又被称为NIO2.0,也用到了Buffer以及Channel的概念,这两个概念在【JAVA核心知识】6.2: JAVA NIO已经描述过,下文直接使用,不再进行过多的描述。想了解的朋友可以通过JAVA NIO进行了解。

2 AIO的异步方式

AIO的异步方式分为两种:一种是使用Future类达到异步的未来式,一种是类似于AJax采用回调函数实现异步的回调式。

2.1 未来式

未来式通过concurrent包下的Future类达到异步的目的。提起Future类,自然就会联想到线程池,Future类用来获取Submit给线程池的Callable任务的返回结果。以通过AsynchronousFileChannel.open()获取异步文件通道的实例为例。可以看到open方法会调用另外一个重载的open方法:

public static AsynchronousFileChannel open(Path file,
                                               Set<? extends OpenOption> options,
                                               ExecutorService executor,
                                               FileAttribute<?>... attrs)
        throws IOException
    {
        FileSystemProvider provider = file.getFileSystem().provider();
        return provider.newAsynchronousFileChannel(file, options, executor, attrs);
    }
           

第一个参数Path file表示着文件路径,即数据源。

第二个参数Set<? extends OpenOption> options定义此次打开文件要进行什么操作,默认StandardOpenOption.READ即读操作,如果执行未定义的操作会报错java.nio.channels.Non***ableChannelException。

注意第三个参数ExecutorService executor,线程池服务。由此就可以知道未来式异步是通过线程池加上Callable,Future来实现的。

最后一个参数FileAttribute表示如果要创建一个文件,那么要赋予这个文件什么属性,如只读。

2.2 回调式

回调式类似于Ajax进行异步请求的机制,在进行IO操作之前分别定义成功的回调函数和失败的回调函数,AIO会根据IO操作的结果在结束后执行相应的回调函数来实现异步。同样以文件通道AsynchronousFileChannel为例,其回调式read方法被定义为:

public abstract <A> void read(ByteBuffer dst,
                                  long position,
                                  A attachment,
                                  CompletionHandler<Integer,? super A> handler);
           

第一个参数是数据缓冲区buffer.

第二个参数是FileChannel特有的,标识着从文件的哪个位置开始读取

第三个参数是附件:通过这个参数可以传递对象到回调函数中。

第四个参数是回调函数的回调逻辑,AIO的回调机制通过实现CompletionHandler接口来实现,CompletionHandler接口分别定义了对应着成功回调函数的completed方法与对应着异常回调函数的failed方法。

3 示例

3.1 文件读取

文件读取的示例,分别展示了未来式和回调式的read操作:

package io;

import java.io.File;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Future;

public class AioFileDemo {

    public static void main(String[] args) {
        File file = new File("source/fileReaderTxtFile.txt");
        try {
            // open一个异步通道  
            // 第一个参数Path代表文件地址,
            // 第二个参数OpenOption表示对文件进行的操作,默认为Read,可以定义多个,如果执行未定义的操作会报异常:java.nio.channels.Non***ableChannelException
            AsynchronousFileChannel afc = AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ);
            // 定义个Buffer
            ByteBuffer bBuf = ByteBuffer.allocate(1024);
            // 执行读操作,Future表示
            Future<Integer> readFu = afc.read(bBuf, 0); // 线程池
            // 等待一下,因为是异步的,如果不等readFu.isDone()就是false,直接跑下去了
            Thread.sleep(1000);
            // 执行完毕的话
            if (readFu.isDone()) {
                // 打印出读了几个字节
                System.out.println("未来式读取的Buffer长度:" + readFu.get());
                // 打印Buffer的内内容
                bBuf.flip();
                System.out.println("下面是未来式读取到的内容:");
                while (bBuf.hasRemaining()) {
                    System.out.print((char)bBuf.get());
                }
                System.out.println();
            }
            // 回调式read
            String str = "我是附件呀!";
            bBuf.clear();
            // 三个参数,分别式要读取的目标Buffer,文件读取位置,回调函数附件,回调函数
            afc.read(bBuf,  0, str, new CompletionHandler<Integer, String>() {

                @Override
                public void completed(Integer result, String attachment) {
                    System.out.println("回调式成功了输出的:成功了" + attachment);
                }

                @Override
                public void failed(Throwable exc, String attachment) {
                    System.out.println("失败了" + attachment);
                }

            });
            // 等待read完成,不然bBuf就是空:
            // 这就体现出AIO的好处了,执行一个Read指令,接下来流程就直接下去做自己的事情了,而不是阻塞着,这里没有其它工作要做,就Sleep了
            Thread.sleep(1000);
            bBuf.flip();
            // 打印一下读到的
            System.out.println("下面是回调式读取到的内容:");
            while (bBuf.hasRemaining()) {
                System.out.print((char)bBuf.get());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}
           

文件内容:

mytest
           

输入结果:

未来式读取的Buffer长度:6
下面是未来式读取到的内容:
mytest
回调式成功了输出的:成功了我是附件呀!
下面是回调式读取到的内容:
mytest
           

3.2 Socket

Server端:

分别展示了未来式和回调式的read操作以及未来式的accept操作

package io;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Arrays;
import java.util.concurrent.Future;

public class AioServer {

    public static void main(String[] args) {
        try {
            AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
            server.bind(new InetSocketAddress("localhost", 8080));
            // Future的方式获得连接请求,获得的返回时连接通道
            System.out.println("服务端未来式接收连接请求开始...");
            Future<AsynchronousSocketChannel> sf = server.accept();
            AsynchronousSocketChannel asc = sf.get();
            System.out.println("服务端未来式成功接收连接请求...");
            ByteBuffer getMsg = ByteBuffer.allocate(1024);
            System.out.println("服务端未来式接收消息开始");
            // Future的方式进行读,获得的返回时读取到的Buffer长度
            Future<Integer> readFu = asc.read(getMsg);
            // get是一个阻塞等待过程,这么做是为了等待连接完毕,避免未连接完成时主线程就关闭了
            readFu.get();
            // 打印出读到的内容
            System.out.println("服务端未来式成功接收到消息");
            System.out.println(new String(new String(Arrays.copyOf(getMsg.array(), getMsg.remaining()))));

            System.out.println("服务端回调式接收消息开始");
            // 用回调的方式读一下 - 参数和文件类型是一样的
            getMsg.clear();
            asc.read(getMsg, getMsg, new CompletionHandler<Integer, ByteBuffer>() {

                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    System.out.println("服务端回调式成功接收到消息");
                    System.out.println(new String(attachment.array()));
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    System.out.println("客户端连接失败");

                }
            });
            // 要等待一下,不然主线程直接关闭了:这就体现出AIO的好处了,执行一个Read指令,接下来流程就直接下去做自己的事情了,而不是阻塞着
            Thread.sleep(10000);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}
           

Client端:

分别展示了未来式和回调式的write操作以及未来式的connect操作

package io;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.Future;

public class AioClient {

    public static void main(String[] args) {
        try {
            // 打开一个客户端通道
            AsynchronousSocketChannel asc = AsynchronousSocketChannel.open();
            // 未来式的获取连接
            System.out.println("客户端未来式请求连接开始...");
            Future<Void> ft = asc.connect(new InetSocketAddress("localhost", 8080));
            // get并不能获得消息,执行get只是等待连接完毕,避免未连接完成时主线程就关闭了
            ft.get();
            System.out.println("客户端未来式请求连接成功");
            // 写个消息给服务端
            System.out.println("客户端未来式发送消息开始...");
            ByteBuffer sendMsg = ByteBuffer.wrap("CLIENT SEND MSG1".getBytes());
            // get的用处也是为了让消息写完再进行下一步,如果上一条消息没读完,就继续写会报错java.nio.channels.WritePendingException
            asc.write(sendMsg).get();
            System.out.println("客户端未来式发送消息结束");
            // 再写个消息给服务端
            sendMsg = ByteBuffer.wrap("CLIENT SEND MSG2".getBytes());
            // 回调的方式Write
            String flag = "一号机";
            System.out.println("客户端回调式发送消息开始...");
            asc.write(sendMsg, flag, new CompletionHandler<Integer, String>() {

                @Override
                public void completed(Integer result, String attachment) {
                    System.out.println(attachment + "发送成功");
                }

                @Override
                public void failed(Throwable exc, String attachment) {
                    System.out.println(attachment + "发送失败");

                }
            });
            // 和Server端一样要等待一下,不然主线程直接关闭了
            Thread.sleep(10000);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}
           

Server端运行结果:

服务端未来式接收连接请求开始...
服务端未来式成功接收连接请求...
服务端未来式接收消息开始
服务端未来式成功接收到消息
CLIENT SEND MSG1
           

Client端运行结果:

客户端未来式请求连接开始...
客户端未来式请求连接成功
客户端未来式发送消息开始...
客户端未来式发送消息结束
客户端回调式发送消息开始...
一号机发送成功
           

PS:

【JAVA核心知识】系列导航 [持续更新中…]

上篇导航:6.2: JAVA NIO

下篇导航:7: 从源码看ArrayList

欢迎关注…

参考资料:

java中的AIO