天天看点

java io bio nio aio 详解一、 Reactor and Proactor 二、BIO、NIO、AIO

BIO、NIO、AIO的区别:

BIO就是基于Thread per Request的传统server/client实现模式,

NIO通常采用Reactor模式,

AIO通常采用Proactor模式,

AIO简化了程序的编写,stream的读取和写入都有OS来完成,不需要像NIO那样子遍历Selector。Windows基于IOCP实现AIO,Linux只有eppoll模拟实现了AIO。

Java7之前的JDK只支持NIO和BIO,从7开始支持AIO。

4种通信方式:TCP/IP+BIO, TCP/IP+NIO, UDP/IP+BIO, UDP/IP+NIO。

一、 Reactor and Proactor

IO读写时,多路复用机制都会依赖对一个事件多路分离器,负责把源事件的IO 事件分离出来,分别到相应的read/write事件分离器。涉及到事件分离器的两种模式分别就是 Reactor和Proactor,Reactor是基于同步IO的,Proactor是基于异步IO的。

关于同步和异步IO

Io的两个重要步骤:发起IO请求,和实际的IO操作。在unix网络编程的定义里异步和非异步概念的区别就是实际的IO操作是否阻塞。如果不是就是异步,如果是就是同步。

而阻塞和非阻塞的区别在于发起IO请求的时候是否会阻塞,如果会就是阻塞,不会就是非阻塞。

本人理解能力有限,想了个例子来辅助自己理解:

小明想要买一本<深入java虚拟机>的书,以下几个场景可以来理解这几种io模式:

1. 如果小明每天都去书店问售货员说有没有这本书,如果没有就回去继续等待,等下次再过来文。(阻塞)

2. 如果小明告诉售货员想买一本<深入java虚拟机>的书,那么就在家里等着做其他事情去了,如果书到了售货员就通知小明,小明再自己过去取。

3. 如果小明告售货员想买一本<深入java虚拟机>的书,然后告诉售货员到了帮他送到某某地方去,就做其他事情去了。小明就不管了,等书到了,售货员就帮他送到那个地方了。

售货员可以认为是操作系统的一个服务,而小明是一个用户进程。不知道是否有误,如果有误请大家拍砖指出,谢谢。

可以看出2,3的效率明显要比1高。但是1最简单,而2,3需要一些协作。充分证明了团队合作的力量。

在Reactor模式中,事件分离者等待某个事件或者可应用或个操作的状态发生(比如文件描述符可读写,或者是socket可读写),事件分离者就把这个事件传给事先注册的事件处理函数或者回调函数,由后者来做实际的读写操作。

在Proactor模式中,事件处理者(或者代由事件分离者发起)直接发起一个异步读写操作(相当于请求),而实际的工作是由操作系统来完成的。发起时,需要提供的参数包括用于存放读到数据的缓存区,读的数据大小,或者用于存放外发数据的缓存区,以及这个请求完后的回调函数等信息。事件分离者得知了这个请求,它默默等待这个请求的完成,然后转发完成事件给相应的事件处理者或者回调。举例来说,在Windows上事件处理者投递了一个异步IO操作(称有 overlapped的技术),事件分离者等IOCompletion事件完成. 这种异步模式的典型实现是基于操作系统底层异步API的,所以我们可称之为“系统级别”的或者“真正意义上”的异步,因为具体的读写是由操作系统代劳的。

举个例子,将有助于理解Reactor与Proactor二者的差异,以读操作为例(类操作类似)。

在Reactor中实现读:

- 注册读就绪事件和相应的事件处理器

- 事件分离器等待事件

- 事件到来,激活分离器,分离器调用事件对应的处理器。

- 事件处理器完成实际的读操作,处理读到的数据,注册新的事件,然后返还控制权。

与如下Proactor(真异步)中的读过程比较:

- 处理器发起异步读操作(注意:操作系统必须支持异步IO)。在这种情况下,处理器无视IO就绪事件,它关注的是完成事件。

- 事件分离器等待操作完成事件

- 在分离器等待过程中,操作系统利用并行的内核线程执行实际的读操作,并将结果数据存入用户自定义缓冲区,最后通知事件分离器读操作完成。

- 事件分离器呼唤处理器。

- 事件处理器处理用户自定义缓冲区中的数据,然后启动一个新的异步操作,并将控制权返回事件分离器。

可以看出,两个模式的相同点,都是对某个IO事件的事件通知(即告诉某个模块,这个IO操作可以进行或已经完成)。在结构

上,两者也有相同点:demultiplexor负责提交IO操作(异步)、查询设备是否可操作(同步),然后当条件满足时,就回调handler;

不同点在于,异步情况下(Proactor),当回调handler时,表示IO操作已经完成;同步情况下(Reactor),回调handler时,表示

IO设备可以进行某个操作(can read or can write),handler这个时候开始提交操作。

二、BIO、NIO、AIO

NIO通常采用Reactor模式,AIO通常采用Proactor模式。AIO简化了程序的编写,stream的读取和写入都有OS来完成,不需要像NIO那样子遍历Selector。Windows基于IOCP实现AIO,Linux只有eppoll模拟实现了AIO。

Java7之前的JDK只支持NIO和BIO,从7开始支持AIO。

4种通信方式:TCP/IP+BIO, TCP/IP+NIO, UDP/IP+BIO, UDP/IP+NIO。

(1)TCP/IP+BIO

Socket和ServerSocket实现,ServerSocket实现Server端端口监听,Socket用于建立网络IO连接。不适用于处理多个请求 1.生成Socket会消耗过多的本地资源。2. Socket连接的建立一般比较慢。

BIO情况下,能支持的连接数有限,一般都采取accept获取Socket以后采用一个thread来处理,one connection one thread。无论连接是否有真正数据请求,都需要独占一个thread。

    可以通过设立Socket池来一定程度上解决问题,但是使用池需要注意的问题是:1. 竞争等待比较多。 2. 需要控制好超时时间。Socket和ServerSocket实现,ServerSocket实现Server端端口监听,Socket用于建立网络IO连接。

服务器,使用ServerSocket监听指定的端口,端口可以随意指定(由于1024以下的端口通常属于保留端口,在一些操作系统中不可以随意使用,所以建议使用大于1024的端口),等待客户连接请求,客户连接后,会话产生;在完成会话后,关闭连接。

[java]  view plain copy

  1. package test;    
  2. import java.io.BufferedReader;    
  3. import java.io.IOException;    
  4. import java.io.InputStreamReader;    
  5. import java.net.ServerSocket;    
  6. import java.net.Socket;    
  7. public class Server {    
  8.     private Socket socket;    
  9.     private ServerSocket ss;    
  10.     public Server() throws IOException {    
  11.         ss = new ServerSocket(7777);    
  12.         while (true) {    
  13.             socket = ss.accept();    
  14.             BufferedReader br = new BufferedReader(new InputStreamReader(socket    
  15.                     .getInputStream()));    
  16.             System.out.println("you input is : " + br.readLine());    
  17.         }    
  18.     }    
  19.     public static void main(String[] args) {    
  20.         try {    
  21.             new Server();    
  22.         } catch (IOException e) {    
  23.             e.printStackTrace();    
  24.         }    
  25.     }    
  26. }    

客户端,使用Socket对网络上某一个服务器的某一个端口发出连接请求,一旦连接成功,打开会话;会话完成后,关闭Socket。客户端不需要指定打开的端口,通常临时的、动态的分配一个1024以上的端口。 [java]  view plain copy

  1. package test;    
  2. import java.io.BufferedRear;    
  3. import java.io.IOException;    
  4. import java.io.InputStreamReader;    
  5. import java.io.PrintWriter;    
  6. import java.net.Socket;    
  7. import java.net.UnknownHostException;    
  8. ublic class Client {    
  9.    Socket client;    
  10.    PrintWriter pw;    
  11.    public Client() throws UnknownHostException, IOException {    
  12.        client=new Socket("Socket服务器IP",7777);    
  13.        pw=new PrintWriter(client.getOutputStream());    
  14.        BufferedReader br=new BufferedReader(new InputStreamReader(System.in));    
  15.        pw.write(br.readLine());    
  16.        pw.close();    
  17.        br.close();    
  18.    }    
  19.    public static void main(String[] args) {    
  20.        try {    
  21.            new Client();    
  22.        } catch (UnknownHostException e) {    
  23.            e.printStackTrace();    
  24.        } catch (IOException e) {    
  25.            e.printStackTrace();    
  26.     }    
  27.    }    

不适用于处理多个请求 1.生成Socket会消耗过多的本地资源。2. Socket连接的建立一般比较慢。

BIO情况下,能支持的连接数有限,一般都采取accept获取Socket以后采用一个thread来处理,one connection one thread。无论连接是否有真正数据请求,都需要独占一个thread。

可以通过设立Socket池来一定程度上解决问题, [java]  view plain copy

  1. import java.io.BufferedReader;  
  2. import java.io.IOException;  
  3. import java.io.InputStreamReader;  
  4. import java.io.PrintWriter;  
  5. import java.net.ServerSocket;  
  6. import java.net.Socket;  
  7. import java.util.concurrent.ExecutorService;  
  8. import java.util.concurrent.Executors;  
  9. public class BIOPoolServer {  
  10. ExecutorService pool = null;  
  11.     public  BIOPoolServer(){  
  12.     try {  
  13. ServerSocket server = new ServerSocket(29001);  
  14. pool = Executors.newFixedThreadPool(1);  
  15. while(true){  
  16. pool.execute(new Handler(server.accept()));  
  17. }  
  18. } catch (IOException e) {  
  19. e.printStackTrace();  
  20. }finally{  
  21. pool.shutdown();  
  22. }  
  23.     }  
  24.     class Handler implements Runnable{  
  25.     Socket socket;  
  26.     public Handler(Socket socket){  
  27.     this.socket = socket;  
  28.     }  
  29. public void run() {  
  30. try {  
  31. BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));  
  32. PrintWriter out = new PrintWriter(socket.getOutputStream(),true);  
  33.                 String msg = in.readLine();  
  34.                 System.out.println("The client send the msg : "+msg);  
  35.     out.println("The server has received!");  
  36. } catch (IOException e) {  
  37. e.printStackTrace();  
  38. }  
  39. }  
  40.     }  
  41.     public static void main(String[] args) {  
  42. new BIOPoolServer();  
  43. }  
  44. }  

但是使用池需要注意的问题是:1. 竞争等待比较多。 2. 需要控制好超时时间。

(2)TCP/IP+NIO

使用Channel(SocketChannel和ServerSocketChannel)和Selector。

Server端通常由一个thread来监听connect事件,另外多个thread来监听读写事件。这样做的好处是这些连接只有在真是请求的时候才会创建thread来处理,one request one thread。这种方式在server端需要支持大量连接但这些连接同时发送请求的峰值不会很多的时候十分有效。

[java]  view plain copy

  1. server端:  
  2. import java.io.IOException;  
  3. import java.net.InetSocketAddress;  
  4. import java.nio.ByteBuffer;  
  5. import java.nio.IntBuffer;  
  6. import java.nio.channels.SelectionKey;  
  7. import java.nio.channels.Selector;  
  8. import java.nio.channels.ServerSocketChannel;  
  9. import java.nio.channels.SocketChannel;  
  10. import java.nio.channels.spi.SelectorProvider;  
  11. import java.util.Iterator;  
  12. import java.util.Set;  
  13. public class NIOServer {  
  14. ServerSocketChannel channel = null;  
  15.    public NIOServer(){  
  16.   try {  
  17.   openChannel();  
  18.   waitForConnection();  
  19. } catch (IOException e) {  
  20. e.printStackTrace();  
  21. }  
  22.    }  
  23.    private void openChannel() throws IOException{  
  24. channel = ServerSocketChannel.open();  
  25. //绑定监听端口  
  26. channel.socket().bind(new InetSocketAddress(29000));  
  27. //设置为非阻塞形式  
  28. channel.configureBlocking(false);  
  29.   }  
  30.    private void waitForConnection() throws IOException{    
  31.   Selector acceptSelector = SelectorProvider.provider().openSelector();  
  32.   channel.register(acceptSelector, SelectionKey.OP_ACCEPT);  
  33.   int keyAdded = 0;  
  34.   while((keyAdded=acceptSelector.select())>0){  
  35. // 某客户已经准备好可以进行I/O操作了,获取其ready键集合  
  36.   Set readKeys = acceptSelector.selectedKeys();  
  37.   Iterator iter = readKeys.iterator();  
  38.   while(iter.hasNext()){  
  39.   SelectionKey sk = (SelectionKey)iter.next();  
  40.   iter.remove();  
  41.   if(sk.isAcceptable()){  
  42.   ServerSocketChannel server = (ServerSocketChannel) sk.channel();  
  43.   SocketChannel socket = server.accept();  
  44.   ByteBuffer _buffer = ByteBuffer.allocate(8);    
  45.   IntBuffer _intBuffer = _buffer.asIntBuffer();   
  46.    _buffer.clear();    
  47.    socket.read(_buffer);  
  48.        int result = _intBuffer.get(0) + _intBuffer.get(1);    
  49.        _buffer.flip();    
  50.        _buffer.clear();    
  51.        _intBuffer.put(0, result);    
  52.        socket.write(_buffer);  
  53.   }  
  54.   }  
  55.   }  
  56.    }  
  57.    public static void main(String[] args) {  
  58.  new NIOServer();  
  59. }  
  60. }  
  61. client端:  
  62. import java.io.IOException;  
  63. import java.net.InetSocketAddress;  
  64. import java.nio.ByteBuffer;  
  65. import java.nio.IntBuffer;  
  66. import java.nio.channels.SocketChannel;  
  67. public class NIOClient {  
  68.  public void start(int first, int second){  
  69. SocketChannel channel = null;  
  70. try {  
  71. InetSocketAddress socketAddress = new InetSocketAddress("localhost", 29000);   
  72. channel = SocketChannel.open(socketAddress);  
  73. channel.configureBlocking(false);  
  74. ByteBuffer _buffer = ByteBuffer.allocate(8);   
  75. IntBuffer _intBuffer = _buffer.asIntBuffer();  
  76. _buffer.clear();    
  77. _intBuffer.put(0, first);    
  78. _intBuffer.put(1, second);    
  79. channel.write(_buffer);   
  80. System.out.println("发送加法请求 " + first + "+" + second);  
  81. _buffer.clear();    
  82.     channel.read(_buffer);   
  83.     int result = _intBuffer.get(0);   
  84.     System.out.println("运算结果:"+result);  
  85. } catch (IOException e) {  
  86. e.printStackTrace();  
  87. }finally {    
  88.         if (channel != null) {    
  89.             try {    
  90.             channel.close();    
  91.             } catch (IOException e) {    
  92.             }    
  93.         }    
  94.     }      
  95.  }  
  96.  public static void main(String[] args) {  
  97. new NIOClient().start(3, 23);  
  98. }  
  99. }  

使用Channel(SocketChannel和ServerSocketChannel)和Selector。

Server端通常由一个thread来监听connect事件,另外多个thread来监听读写事件。这样做的好处是这些连接只有在真是请求的时候才会创建thread来处理,one request one thread。这种方式在server端需要支持大量连接但这些连接同时发送请求的峰值不会很多的时候十分有效。

(3)UDP/IP+BIO

DatagramSocket和DatagramPacket。DatagramSocket负责监听端口以及读写数据,DatagramPacket作为数据流对象进行传输。

UDP/IP是无连接的,无法进行双向通信,除非双方都成为UDP Server。

(4)UDP/IP+NIO

通过DatagramChannel和ByteBuffer实现。DatagramChannel负责端口监听及读写。ByteBuffer负责数据流传输。

如果要将消息发送到多台机器,如果为每个目标机器都建立一个连接的话,会有很大的网络流量压力。这时候可以使用基于UDP/IP的Multicast协议传输,Java中可以通过MulticastSocket和DatagramPacket来实现。

Multicast一般多用于多台机器的状态同步,比如JGroups。SRM, URGCP都是Multicast的实现方式。eBay就采用SRM来实现将数据从主数据库同步到各个搜索节点机器。