天天看点

java网络编程(三)----同步非阻塞nio及reactor模型

很多刚接触NIO的人,第一眼看到的就是Java相对晦涩的API,比如:Channel,Selector,Socket什么的;然后就是一坨上百行的代码来演示NIO的服务端Demo,所以这里我们人性化地简单介绍一下。

NIO我们一般认为是New I/O(也是官方的叫法),因为它是相对于老的I/O类库新增的(其实在JDK 1.4中就已经被引入了,但这个名词还会继续用很久,即使它们在现在看来已经是“旧”的了,所以也提示我们在命名时,需要好好考虑),做了很大的改变。但民间跟多人称之为Non-block I/O,即非阻塞I/O,因为这样叫,更能体现它的特点。而下文中的NIO,不是指整个新的I/O库,而是非阻塞I/O。

NIO提供了与传统BIO模型中的Socket和ServerSocket相对应的SocketChannel和ServerSocketChannel两种不同的套接字通道实现。

新增的着两种通道都支持阻塞和非阻塞两种模式。阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反。

对于低负载、低并发的应用程序,可以使用同步阻塞I/O来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用NIO的非阻塞模式来开发。

java nio基础实现

  1. 缓冲区 Buffer

    Buffer是一个对象,包含一些要写入或者读出的数据。

    在NIO库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,也是写入到缓冲区中。任何时候访问NIO中的数据,都是通过缓冲区进行操作。

    缓冲区实际上是一个数组,并提供了对数据结构化访问以及维护读写位置等信息。

    具体的缓存区有这些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他们实现了相同的接口:Buffer。

  2. 通道 Channel

    我们对数据的读取和写入要通过Channel,它就像水管一样,是一个通道。通道不同于流的地方就是通道是双向的,可以用于读、写和同时读写操作。

    底层的操作系统的通道一般都是全双工的,所以全双工的Channel比流能更好的映射底层操作系统的API。

    Channel主要分两大类:

    • SelectableChannel:用户网络读写
    • FileChannel:用于文件操作
    后面代码会涉及的ServerSocketChannel和SocketChannel都是SelectableChannel的子类。
  3. 多路复用器 Selector

    Selector是Java NIO 编程的基础。

    Selector提供选择已经就绪的任务的能力:Selector会不断轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。

    一个Selector可以同时轮询多个Channel,只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。

nio使用

回忆BIO模型,之所以需要多线程,是因为在进行I/O操作的时候,一是没有办法知道到底能不能写、能不能读,只能”傻等”,即使通过各种估算,算出来操作系统没有能力进行读写,也没法在socket.read()和socket.write()函数中返回,这两个函数无法进行有效的中断。所以除了多开线程另起炉灶,没有好的办法利用CPU。

NIO的读写函数可以立刻返回,这就给了我们不开线程利用CPU的最好机会:如果一个连接不能读写(socket.read()返回0或者socket.write()返回0),我们可以把这件事记下来,记录的方式通常是在Selector上注册标记位,然后切换到其它就绪的连接(channel)继续进行读写。

思考一下在Socket网络通讯中有什么事件:

  1. 在服务器端我们需要接收客户端连接,这里就有一个接收“Accept”事件;
  2. 而客户端连接服务器,连接有一个“Connect”事件
  3. 各自进行读操作时,有一个“read”事件
  4. 各自进行写操作时,有一个“write”事件

而上面我们说到连接被抽象为了Channel,这时我们就可以在多路复用器Selector上注册通道和事件。

  1. 在服务器端:ServerSocketChannel.register(Selector, SelectionKey.OP_ACCEPT);

    将ServerSocketChannel注册在Selector并绑定SelectionKey.OP_ACCEPT事件;

  2. 调用多路复用器Selector.select();
  3. 如果channel已经发生了接收客户端的事件,那么就能被多路复用器select到,然后获取到和客户端连接的Socket,再将这个socket的读写事件注册到Selector上。
  4. 同样可以客户端也是将其他事件注册到Selector,然后事件被触发后就可以被select()函数找到
  5. 最后程序不断轮询selector,根据select到的不同事件类型调用对应的handler进行处理。
select()函数调用的是系统底层函数,在Linux 2.6之前是select、poll,2.6之后是epoll,Windows是IOCP。

伪代码如下:

interface ChannelHandler{
      void channelReadable(Channel channel);
      void channelWritable(Channel channel);
   }
   class Channel{
     Socket socket;
     Event event;//读,写或者连接
   }

   //IO线程主循环:
   class IoThread extends Thread{
   public void run(){
   Channel channel;
   //选择就绪的事件和对应的连接
   while(channel=Selector.select()){
      if(channel.event==accept){
          //触发了accept事件的是新连接,
          //我们需要为这个新连接注册读写事件
         registerNewChannelHandler(channel);
      }
      if(channel.event==write){
          //如果可以写,则执行写事件
         getChannelHandler(channel).channelWritable(channel);
      }
      if(channel.event==read){
      //如果可以读,则执行读事件
          getChannelHandler(channel).channelReadable(channel);
      }
    }
   }
   //所有channel的对应事件处理器
   Map<Channel,ChannelHandler> handlerMap;
  }
           

注意,select是阻塞的,无论是通过操作系统的通知(epoll)还是不停的轮询(select,poll),这个函数是阻塞的。所以你可以放心大胆地在一个while(true)里面调用这个函数而不用担心CPU空转。同时我们也可以通过设置周期的方式调用Selector.select(1000)(每一秒select一次)。

selector里面有一个装SelectionKey的集合,记录着各个channel及其感兴趣的事件,详情请点击:

http://blog.csdn.net/u010412719/article/details/52863704

参考了美团点评技术团队的文章:https://tech.meituan.com/nio.html

(没我总结的好啊哈哈哈所以看我的博客就行)

实现nio的完整代码在最后

nio与bio对比

所有的系统I/O都分为两个阶段:等待就绪和操作。举例来说,读函数,分为等待系统可读和真正的读;同理,写函数分为等待网卡可以写和真正的写。

需要说明的是等待就绪的阻塞是不使用CPU的,是在“空等”;而真正的读写操作的阻塞是使用CPU的,真正在”干活”,而且这个过程非常快,属于memory copy,带宽通常在1GB/s级别以上,可以理解为基本不耗时。

下图是几种常见I/O模型的对比:

java网络编程(三)----同步非阻塞nio及reactor模型

java的nio便是第二或者第三种形式:

  • 第二种:在一个while循环里面不断每隔一秒检查selector里面是否有事件被触发Selector.select(1000)
  • 第三种:Selector.select()会检查是是否有就绪事件,没有的话会阻塞直到有就绪事件函数才会返回。

Reactor

有一篇文章写得很好:http://www.blogjava.net/DLevin/archive/2015/09/02/427045.html

你们可以看看,我这里再强调一下重点。

java网络编程(三)----同步非阻塞nio及reactor模型

图解:

1. Handle:在网络编程中,这里一般指Socket Handle,即一个网络连接(Connection,在Java NIO中的Channel)。这个Channel注册到Synchronous Event Demultiplexer中,以监听Handle中发生的事件,对ServerSocketChannnel可以是CONNECT事件,对SocketChannel可以是READ、WRITE、CLOSE事件等。

2. Synchronous Event Demultiplexer:多路复用器,阻塞等待一系列的Handle中的事件到来,如果阻塞等待返回,即表示在返回的Handle中可以不阻塞的执行返回的事件类型。这个模块一般使用操作系统的select来实现。在Java NIO中用Selector来封装,当Selector.select()返回时,可以调用Selector的selectedKeys()方法获取Set,一个SelectionKey表达一个有事件发生的Channel以及该Channel上的事件类型。

3. Initiation Dispatcher:Reactor模式的主要模块,事件调度器,通常被称为reactor,用于管理Event Handler,即EventHandler的容器,用以注册、移除EventHandler等;另外,它还作为Reactor模式的入口调用,在这个模块里面调用Synchronous Event Demultiplexer的select方法以阻塞等待事件返回,当阻塞等待返回时,根据事件发生的Handle将其分发给对应的Event Handler处理,即回调EventHandler中的handle_event()方法。

4. 定义事件处理方法:handle_event(),以供InitiationDispatcher回调使用。

需要注意的是:在java nio中我们注册了建立连接相关的“Accept”“Connect”事件后,再进行读写操作要将读写事件再注册到多路复用器,下次读写事件发生时才会被select到。这就是传说中的多路复用IO。

另外Reactor翻译为“反应”器,名字中”反应“的由来:

“反应”即“倒置”,“控制逆转”

具体事件处理程序不调用反应器,而是由反应器分配一个具体事件处理程序,具体事件处理程序对某个指定的事件发生做出反应;这种控制逆转又称为“好莱坞法则”(不要调用我,让我来调用你)

nio实现代码

一、server端代码

public class Server {
    private static synchronized void start(int port){
        ServerHandle serverHandle = new ServerHandle(port);
        new Thread(serverHandle,"Server").start();
    }
    public static void main(String[] args){
        start();
    }
}
           

server处理类ServerHandle:

class ServerHandle implements Runnable {
    private Selector m_Selector;
    private ServerSocketChannel m_ServerChannel;
    private volatile boolean m_Started;
    public ServerHandle(int vPort) {
        try {
            m_Selector = Selector.open();
            //服务器监听通道
            ServerSocketChannel tServerChannel = ServerSocketChannel.open();
            //如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
            tServerChannel.configureBlocking(false);
            tServerChannel.bind(new InetSocketAddress(vPort), );
            //注册服务器通道到selector,监听ACCEPT事件
            tServerChannel.register(m_Selector, SelectionKey.OP_ACCEPT);
            m_Started = true;
            System.out.println("服务器已启动,端口号:" + vPort);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void stop() {
        m_Started = false;
    }

    @Override
    public void run() {
        while (m_Started) {
            try {
                //无论是否有读写事件发生,selector在1s后被唤醒一次
                m_Selector.select();
                //或者通过下面这种方式:
                //会阻塞直到selector里面有就绪事件
                //m_Selector.select();
                Set<SelectionKey> tKeys = m_Selector.selectedKeys();
                Iterator<SelectionKey> tIterator = tKeys.iterator();
                SelectionKey tKey;
                //轮询
                while (tIterator.hasNext()) {
                    tKey = tIterator.next();
                    tIterator.remove();
                    try {
                        handleInput(tKey);
                    } catch (IOException e) {
                        if (null != tKey) {
                            tKey.cancel();
                            if (null == tKey.channel()) {
                                tKey.channel().close();
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

        }
        //完成后关闭select
        //关闭Selector会自动关闭里面的资源
        if (null != m_Selector) {
            try {
                m_Selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void handleInput(SelectionKey vKey) throws IOException {
        if (vKey.isValid()) {
            if (vKey.isAcceptable()) {
                ServerSocketChannel tServerChannel = (ServerSocketChannel) vKey.channel();
                SocketChannel tSocketChannel = tServerChannel.accept();
                tSocketChannel.configureBlocking(false);
                //拿到新连接注册读事件
                tSocketChannel.register(m_Selector, SelectionKey.OP_READ);
            }
        }
        //读消息
        if (vKey.isReadable()) {
            SocketChannel tSocketChannel = (SocketChannel) vKey.channel();
            //得用buffer
            ByteBuffer tBuffer = ByteBuffer.allocate();
            int tReadBytes = tSocketChannel.read(tBuffer);
            if ( < tReadBytes) {
                tBuffer.flip();
                byte[] tBytes = new byte[tBuffer.remaining()];
                tBuffer.get(tBytes);
                String tExpression = new String(tBytes, "UTF-8");
                System.out.println("服务器收到消息:" + tExpression);
                String tResult = "这句话是服务器发过来的";
                doWrite(tSocketChannel, tResult);
            }
            //读取不到消息关闭资源
            else if ( > tReadBytes) {
               //即使key被cancel了他的channel还在的
                vKey.cancel();
                tSocketChannel.close();
            }
        }
    }

//发送数据
    private void doWrite(SocketChannel vChannel, String vResponse) throws IOException {
        byte[] tBytes = vResponse.getBytes("UTF-8");
        ByteBuffer tWriteBuffer = ByteBuffer.allocate(tBytes.length);
        tWriteBuffer.put(tBytes);
        tWriteBuffer.flip();
        vChannel.write(tWriteBuffer);
    }
}
           

二、客户端代码:

public class Client {
    private static String DEFAULT_HOST = "127.0.0.1";
    private static int DEFAULT_PORT = ;
    private static ClientHandle clientHandle;
    public static void start(){
        start(DEFAULT_HOST,DEFAULT_PORT);
    }
    public static synchronized void start(String ip,int port){
        if(null != clientHandle) clientHandle.stop();
        clientHandle = new ClientHandle(ip,port);
        new Thread(clientHandle,"Server").start();
    }
    //向服务器发送消息
    public static boolean sendMsg(String msg) throws Exception{
        clientHandle.sendMsg(msg);
        return true;
    }
    public static void main(String[] args) throws Exception {
        start();
        while(Client.sendMsg(new Scanner(System.in).nextLine()));
    }
}
           

客户端处理类ServerHandle:

class ClientHandle implements Runnable {
    private String m_Host;
    private int m_Port;
    private Selector m_Selector;
    private SocketChannel m_SocketChannel;
    private volatile boolean m_Started;

    ClientHandle(String vIp, int vPort) {
        m_Host = vIp;
        m_Port = vPort;

        try {
            m_Selector = Selector.open();
            m_SocketChannel = SocketChannel.open();
            m_SocketChannel.configureBlocking(false);
            m_Started = true;
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    void stop() {
        m_Started = false;
    }


    @Override
    public void run() {
        try {
            doConnect();
        } catch (IOException e) {
            System.out.println("客户端连接失败");
        }
        while (m_Started) {
            try {
                m_Selector.select();
                Set<SelectionKey> tKeys = m_Selector.selectedKeys();
                Iterator<SelectionKey> tIterator = tKeys.iterator();
                SelectionKey tKey;
                while (tIterator.hasNext()) {
                    tKey = tIterator.next();
                    tIterator.remove();
                    try {
                        handleInput(tKey);
                    } catch (IOException e) {
                        if (null != tKey) {
                            tKey.cancel();
                            if (null != tKey.channel()) {
                                tKey.channel().close();
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

        }
        //关闭selector会关闭它里面的所有资源
        if (null != m_Selector) {
            try {
                m_Selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

    private void handleInput(SelectionKey vKey) throws IOException {
        if (vKey.isValid()) {
            SocketChannel socketChannel = (SocketChannel) vKey.channel();
            //如果是连接事件
            if (vKey.isConnectable()) {
                if (socketChannel.finishConnect())
                    System.out.println("客户端连接上服务器端");
                else System.exit();
            }
            //读消息
            if (vKey.isReadable()) {
                //创建ByteBuffer,并开辟一个1M的缓冲区
                ByteBuffer buffer = ByteBuffer.allocate();
                //读取请求码流,返回读取到的字节数
                int readBytes = socketChannel.read(buffer);
                //读取到字节,对字节进行编解码
                if ( < readBytes) {
                    //将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
                    buffer.flip();
                    //根据缓冲区可读字节数创建字节数组
                    byte[] bytes = new byte[buffer.remaining()];
                    //将缓冲区可读字节数组复制到新建的数组中
                    buffer.get(bytes);
                    String result = new String(bytes, "UTF-8");
                    System.out.println("客户端收到消息:" + result);
                }
                //链路已经关闭,释放资源
                else if ( > readBytes) {
                    vKey.cancel();
                    socketChannel.close();
                }
            }

        }

    }

    private void doWrite(SocketChannel channel, String request) throws IOException {
        //将消息编码为字节数组
        byte[] bytes = request.getBytes();
        //根据数组容量创建ByteBuffer
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        //将字节数组复制到缓冲区
        writeBuffer.put(bytes);
        //flip操作
        writeBuffer.flip();
        //发送缓冲区的字节数组
        channel.write(writeBuffer);
    }

    private void doConnect() throws IOException {
        //客户端请求连接上服务器,并注册连接事件
        m_SocketChannel.connect(new InetSocketAddress(m_Host, m_Port));
        m_SocketChannel.register(m_Selector, SelectionKey.OP_CONNECT);

    }

    void sendMsg(String msg) throws Exception {
        //发送了数据的话就注册读事件
        m_SocketChannel.register(m_Selector, SelectionKey.OP_READ);
        doWrite(m_SocketChannel, msg);
    }
}