天天看點

Java 中的IO模型

• BIO:JDK1.4之前的阻塞IO

BIO,即为Blocking I/O,阻塞IO,大致流程为

1)服务端建立ServerSocket,以一个端口启动,2)等待客户端建立socket连接,如果没有连接,一直阻塞(等待),3)一个socket建立连接之后,从线程池中去一个线程取处理socket

对于这种模型的总结:

缺点:如果请求量过大,线程池不够用,那么会严重影响性能。

目前tomcat I/O模型默认还是BIO,对于连接不大的, 该模型代码编写简单,只需要关注线程内的连接即可

public class BlockingIOServer {
    public static void main(String[] args) throws IOException {
        int port = ;
        ExecutorService threadPool = Executors.newFixedThreadPool();
        ServerSocket server = new ServerSocket(port);

        while(true){
            Socket client = server.accept();

            //从线程池取线程处理client
            threadPool.execute(()->{
                try{
                    InputStream input = client.getInputStream();

                    //TODO read input
                    String req = null;
                    String res = "response:"+req;

                    //TODO response
                    client.getOutputStream().write(res.getBytes());

                }catch(IOException e){
                    e.printStackTrace();
                }finally {
                    try {
                        client.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

}
           

•NIO:JDK1.4及以后的版本非阻塞IO

即是Non Blocking I/O,非阻塞IO, jdk1.4之后提供了一套专门的api 专门操作非阻塞IO,接口以及类定义在java.nio包

NIO API由四个主要的部分组成:缓冲区(Buffers)、通道(Channels)、选择器(Selector)是其核心组成类。

NIO 的工作大致流程为:

• 1、通道注册一个监听到事件处理器

• 2、有事件发生时,事件处理器会通知相应的通道处理

public class NonBlockingIOServer {

    private  int BLOCK = ;
    private  ByteBuffer sendBuffer = ByteBuffer.allocate(BLOCK);
    private  ByteBuffer receiveBuffer = ByteBuffer.allocate(BLOCK);
    private  Selector selector;


    public NonBlockingIOServer(int port) throws IOException {
        //1.open  ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //2.configureBlocking false
        serverSocketChannel.configureBlocking(false);
        //3.bind port
        serverSocketChannel.socket().bind(new InetSocketAddress(port));

        //4.open  Selector
        selector = Selector.open();
        //5.serverSocketChannel register select
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        System.out.println("Server Start,port:"+port);
    }

    private void accept() throws IOException {
        while (true) {
            // 1.select,block
            selector.select();

            // 2.SelectionKey iterator
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                iterator.remove();
                try {
                    doAccept(selectionKey);
                } catch (IOException e) {
                    selectionKey.cancel();
                    e.printStackTrace();
                }
            }
        }
    }

    private void doAccept(SelectionKey selectionKey)throws IOException{
        if (selectionKey.isAcceptable()) {
            // ServerSocketChannel 的 selectionKey
            ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
            if(null == server){
                return;
            }

            //接受到此通道套接字的连接,block here
            SocketChannel client = server.accept(); 
            // 配置为非阻塞
            client.configureBlocking(false);   

            // 注册读到selector,等待读的selectionKey
            client.register(selector, SelectionKey.OP_READ);
        } else if (selectionKey.isReadable()) {
            // SocketChannel 的 selectionKey
            SocketChannel client = (SocketChannel) selectionKey.channel();

            receiveBuffer.clear();
            int count = client.read(receiveBuffer);    
            if (count > ) {
                String receiveText = new String( receiveBuffer.array(),,count);
                System.out.println(receiveText);
                //注册写到selector,等待读的selectionKey
                SelectionKey key = client.register(selector, SelectionKey.OP_WRITE);
                //这里可以作为设计框架的扩展之处
                key.attach(receiveText);
            }
        } else if (selectionKey.isWritable()) {
            // SocketChannel selectionKey
            SocketChannel client = (SocketChannel) selectionKey.channel();

            //取出read 的 attachment
            String request = (String) selectionKey.attachment();
            String sendText="response--" + request;

            sendBuffer.clear();
            sendBuffer.put(sendText.getBytes());
            sendBuffer.flip();

            //输出到通道
            client.write(sendBuffer);
            System.out.println(sendText);
            client.register(selector, SelectionKey.OP_READ);
        }
    }

    /**
     * [[@param](http://my.oschina.net/u/2303379)](http://my.oschina.net/u/2303379) args
     * [[@throws](http://my.oschina.net/throws)](http://my.oschina.net/throws) IOException
     */
    public static void main(String[] args) throws IOException {
        int port = ;
        NonBlockingIOServer server = new NonBlockingIOServer(port);
        server.accept();
    }
}
           

代码中的主要流程为:

•1、open ServerSocketChannel,configureBlocking false,bind host and port

•2、open Selector

•3、ServerSocketChannel register on Selector

•4、有客户端连接的事件发生,事件处理器通知ServerSocketChannel去处理

对这一模型的总结:

•NIO本身是基于事件驱动思想来完成的

•NIO基于Selector,当有感兴趣的事件发生时,就通知对应的事件处理器去处理事件,如果没有,则不处理。当socket有流可读或可写入socket时,操作系统会相应的通知引用程序进行处理,应用再将流读取到缓冲区或写入操作系统。所以使用一个线程做轮询就可以了

•Buffer,也是NIO的一个新特性,可以块状的读/写数据,效率得到极大的提高。

• JDK1.7之后,AIO异步非阻塞Io

• ◦AIO,即是Asynchronous I/O,异步非阻塞I/O

◦JDK1.7之后,也叫作AIO,工作方式是异步非阻塞

AIO主要工作流程为:

◦客户端发起一个IO调用

◦服务端接受IO之后,异步回调接收成功后的IO,不会阻挡当前主流程,主流程继续接受下一个请求

public class AsynchronousIOServer {
    private static Charset charset = Charset.forName("UTF-8");

    public static void main(String[] args) {
        int port = ;

        int processors = Runtime.getRuntime().availableProcessors();
        ExecutorService threadPool = Executors.newFixedThreadPool(processors);

        try {
            AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(threadPool);
            AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group);
            server.bind(new InetSocketAddress(port));

            doAccept(server);

            group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
            System.out.println("close server");
            System.exit();
        }
    }

    private static void doAccept(AsynchronousServerSocketChannel server) {
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Void attachment) {
                server.accept(null, this);// accept next client connect

                doRead(client, attachment);
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                exc.printStackTrace();
            }
        });

    }

    private static void doRead(AsynchronousSocketChannel client, Void attachment) {
        ByteBuffer buffer = ByteBuffer.allocate();

        client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (result <= ) {
                    try {
                        System.out.println("客户端断线:" + client.getRemoteAddress().toString());
                        attachment = null;
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    return;
                }

                attachment.flip();
                String req = charset.decode(attachment).toString();
                attachment.compact();

                client.read(attachment, attachment, this);// next client read

                /** do service code **/
                System.out.println(req);

                ByteBuffer resBuffer = ByteBuffer.wrap(("response:" + req).getBytes());
                doWrite(client, resBuffer, resBuffer);
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                exc.printStackTrace();
            }

        });
    }

    private static <V> void doWrite(AsynchronousSocketChannel client, ByteBuffer resBuffer, ByteBuffer attachment) {
        client.write(attachment, attachment, new CompletionHandler<Integer, ByteBuffer>() {

            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                // TODO write success

                if (result <= ) {
                    try {
                        System.out.println("客户端断线:" + client.getRemoteAddress().toString());
                        attachment = null;
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                exc.printStackTrace();
            }
        });
    }

}
           

对于这一模型的总结:

• 主要流程为:

◦1、创建一个异步非阻塞服务端

◦2、服务端接受一个请求,异步回调接受成功后的IO请求,然后继续接受下一个请求

◦3、异步回调请求的IO,读取请求数据成功后,异步回调读取后的结果,然后继续读下面的数据,不会阻塞当前IO读

◦4、异步回调的读IO数据,然后同步处理数据,这里可能是计算逻辑,所以这里也是性能的瓶颈之处,如果是计算密集型,AIO模型不适用,处理完成之后,异步写数据到IO请求

特点:

AIO当进行读写操作时,只须直接调用API的read或write方法即可。这两种方法均为异步的。

•对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并异步回调通知应用程序;

对于这三种模型使用场景的一些总结:

• BIO方式适用于连接数量小,连接时间短,计算密集,代码编写直观,程序直观简单易理解,JDK1.4之前。

• NIO方式适用于连接数量大,连接时间短,比如Http服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。

• AIO方式使用于连接数量大,连接时间长,IO密集型,比如聊天服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。