AIO
- 1 AIO是什么
- 2 AIO的异步方式
-
- 2.1 未来式
- 2.2 回调式
- 3 示例
-
- 3.1 文件读取
- 3.2 Socket
1 AIO是什么
AIO是NIO的改进与增强,在JDK1.7正式被集入nio包中。AIO采用异步IO模型,关于异步IO模型,在6.1: JAVA IO基础中已经提过:异步IO模型将数据的IO请求和实际获取到IO数据的结果分开,用户只需发起IO操作请求,并在数据准备完成时使用即可,而无需关注获取数据经历了哪些过程。
AIO又被称为NIO2.0,也用到了Buffer以及Channel的概念,这两个概念在【JAVA核心知识】6.2: JAVA NIO已经描述过,下文直接使用,不再进行过多的描述。想了解的朋友可以通过JAVA NIO进行了解。
2 AIO的异步方式
AIO的异步方式分为两种:一种是使用Future类达到异步的未来式,一种是类似于AJax采用回调函数实现异步的回调式。
2.1 未来式
未来式通过concurrent包下的Future类达到异步的目的。提起Future类,自然就会联想到线程池,Future类用来获取Submit给线程池的Callable任务的返回结果。以通过AsynchronousFileChannel.open()获取异步文件通道的实例为例。可以看到open方法会调用另外一个重载的open方法:
public static AsynchronousFileChannel open(Path file,
Set<? extends OpenOption> options,
ExecutorService executor,
FileAttribute<?>... attrs)
throws IOException
{
FileSystemProvider provider = file.getFileSystem().provider();
return provider.newAsynchronousFileChannel(file, options, executor, attrs);
}
第一个参数Path file表示着文件路径,即数据源。
第二个参数Set<? extends OpenOption> options定义此次打开文件要进行什么操作,默认StandardOpenOption.READ即读操作,如果执行未定义的操作会报错java.nio.channels.Non***ableChannelException。
注意第三个参数ExecutorService executor,线程池服务。由此就可以知道未来式异步是通过线程池加上Callable,Future来实现的。
最后一个参数FileAttribute表示如果要创建一个文件,那么要赋予这个文件什么属性,如只读。
2.2 回调式
回调式类似于Ajax进行异步请求的机制,在进行IO操作之前分别定义成功的回调函数和失败的回调函数,AIO会根据IO操作的结果在结束后执行相应的回调函数来实现异步。同样以文件通道AsynchronousFileChannel为例,其回调式read方法被定义为:
public abstract <A> void read(ByteBuffer dst,
long position,
A attachment,
CompletionHandler<Integer,? super A> handler);
第一个参数是数据缓冲区buffer.
第二个参数是FileChannel特有的,标识着从文件的哪个位置开始读取
第三个参数是附件:通过这个参数可以传递对象到回调函数中。
第四个参数是回调函数的回调逻辑,AIO的回调机制通过实现CompletionHandler接口来实现,CompletionHandler接口分别定义了对应着成功回调函数的completed方法与对应着异常回调函数的failed方法。
3 示例
3.1 文件读取
文件读取的示例,分别展示了未来式和回调式的read操作:
package io;
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Future;
public class AioFileDemo {
public static void main(String[] args) {
File file = new File("source/fileReaderTxtFile.txt");
try {
// open一个异步通道
// 第一个参数Path代表文件地址,
// 第二个参数OpenOption表示对文件进行的操作,默认为Read,可以定义多个,如果执行未定义的操作会报异常:java.nio.channels.Non***ableChannelException
AsynchronousFileChannel afc = AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ);
// 定义个Buffer
ByteBuffer bBuf = ByteBuffer.allocate(1024);
// 执行读操作,Future表示
Future<Integer> readFu = afc.read(bBuf, 0); // 线程池
// 等待一下,因为是异步的,如果不等readFu.isDone()就是false,直接跑下去了
Thread.sleep(1000);
// 执行完毕的话
if (readFu.isDone()) {
// 打印出读了几个字节
System.out.println("未来式读取的Buffer长度:" + readFu.get());
// 打印Buffer的内内容
bBuf.flip();
System.out.println("下面是未来式读取到的内容:");
while (bBuf.hasRemaining()) {
System.out.print((char)bBuf.get());
}
System.out.println();
}
// 回调式read
String str = "我是附件呀!";
bBuf.clear();
// 三个参数,分别式要读取的目标Buffer,文件读取位置,回调函数附件,回调函数
afc.read(bBuf, 0, str, new CompletionHandler<Integer, String>() {
@Override
public void completed(Integer result, String attachment) {
System.out.println("回调式成功了输出的:成功了" + attachment);
}
@Override
public void failed(Throwable exc, String attachment) {
System.out.println("失败了" + attachment);
}
});
// 等待read完成,不然bBuf就是空:
// 这就体现出AIO的好处了,执行一个Read指令,接下来流程就直接下去做自己的事情了,而不是阻塞着,这里没有其它工作要做,就Sleep了
Thread.sleep(1000);
bBuf.flip();
// 打印一下读到的
System.out.println("下面是回调式读取到的内容:");
while (bBuf.hasRemaining()) {
System.out.print((char)bBuf.get());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
文件内容:
mytest
输入结果:
未来式读取的Buffer长度:6
下面是未来式读取到的内容:
mytest
回调式成功了输出的:成功了我是附件呀!
下面是回调式读取到的内容:
mytest
3.2 Socket
Server端:
分别展示了未来式和回调式的read操作以及未来式的accept操作
package io;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Arrays;
import java.util.concurrent.Future;
public class AioServer {
public static void main(String[] args) {
try {
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
server.bind(new InetSocketAddress("localhost", 8080));
// Future的方式获得连接请求,获得的返回时连接通道
System.out.println("服务端未来式接收连接请求开始...");
Future<AsynchronousSocketChannel> sf = server.accept();
AsynchronousSocketChannel asc = sf.get();
System.out.println("服务端未来式成功接收连接请求...");
ByteBuffer getMsg = ByteBuffer.allocate(1024);
System.out.println("服务端未来式接收消息开始");
// Future的方式进行读,获得的返回时读取到的Buffer长度
Future<Integer> readFu = asc.read(getMsg);
// get是一个阻塞等待过程,这么做是为了等待连接完毕,避免未连接完成时主线程就关闭了
readFu.get();
// 打印出读到的内容
System.out.println("服务端未来式成功接收到消息");
System.out.println(new String(new String(Arrays.copyOf(getMsg.array(), getMsg.remaining()))));
System.out.println("服务端回调式接收消息开始");
// 用回调的方式读一下 - 参数和文件类型是一样的
getMsg.clear();
asc.read(getMsg, getMsg, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("服务端回调式成功接收到消息");
System.out.println(new String(attachment.array()));
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("客户端连接失败");
}
});
// 要等待一下,不然主线程直接关闭了:这就体现出AIO的好处了,执行一个Read指令,接下来流程就直接下去做自己的事情了,而不是阻塞着
Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Client端:
分别展示了未来式和回调式的write操作以及未来式的connect操作
package io;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.Future;
public class AioClient {
public static void main(String[] args) {
try {
// 打开一个客户端通道
AsynchronousSocketChannel asc = AsynchronousSocketChannel.open();
// 未来式的获取连接
System.out.println("客户端未来式请求连接开始...");
Future<Void> ft = asc.connect(new InetSocketAddress("localhost", 8080));
// get并不能获得消息,执行get只是等待连接完毕,避免未连接完成时主线程就关闭了
ft.get();
System.out.println("客户端未来式请求连接成功");
// 写个消息给服务端
System.out.println("客户端未来式发送消息开始...");
ByteBuffer sendMsg = ByteBuffer.wrap("CLIENT SEND MSG1".getBytes());
// get的用处也是为了让消息写完再进行下一步,如果上一条消息没读完,就继续写会报错java.nio.channels.WritePendingException
asc.write(sendMsg).get();
System.out.println("客户端未来式发送消息结束");
// 再写个消息给服务端
sendMsg = ByteBuffer.wrap("CLIENT SEND MSG2".getBytes());
// 回调的方式Write
String flag = "一号机";
System.out.println("客户端回调式发送消息开始...");
asc.write(sendMsg, flag, new CompletionHandler<Integer, String>() {
@Override
public void completed(Integer result, String attachment) {
System.out.println(attachment + "发送成功");
}
@Override
public void failed(Throwable exc, String attachment) {
System.out.println(attachment + "发送失败");
}
});
// 和Server端一样要等待一下,不然主线程直接关闭了
Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Server端运行结果:
服务端未来式接收连接请求开始...
服务端未来式成功接收连接请求...
服务端未来式接收消息开始
服务端未来式成功接收到消息
CLIENT SEND MSG1
Client端运行结果:
客户端未来式请求连接开始...
客户端未来式请求连接成功
客户端未来式发送消息开始...
客户端未来式发送消息结束
客户端回调式发送消息开始...
一号机发送成功
PS:
【JAVA核心知识】系列导航 [持续更新中…]
上篇导航:6.2: JAVA NIO
下篇导航:7: 从源码看ArrayList
欢迎关注…
参考资料:
java中的AIO