天天看点

闭关修炼(十三)Netty入门阻塞IO与非阻塞IO的区别NIO客户端与服务端Netty快速入门Netty案例实现

文章目录

  • 阻塞IO与非阻塞IO的区别
    • 同步、异步与阻塞、非阻塞
    • 阻塞与非阻塞
      • 什么是同步阻塞IO
      • 什么是伪异步阻塞IO
      • 什么是同步非阻塞IO
        • NIO服务器的缺点
      • 什么是异步非阻塞IO
  • NIO客户端与服务端
    • 选择器key
    • NIO服务端
    • NIO客户端
  • Netty快速入门
    • 什么是netty?
    • netty的应用场景
    • 为什么会有netty
  • Netty案例实现
    • 导入jar
    • Netty服务端
    • Netty客户端
    • channelClosed
    • channelDisconnected

阻塞IO与非阻塞IO的区别

同步、异步与阻塞、非阻塞

NIO有两个重要的组合:通道和缓冲区,通道是传输数据,缓冲区来存放数据。

IO流又称为BIO,Blocking IO,同步阻塞IO,

NIO(1.7) 同步、非阻塞IO

AIO(1.7后)异步、非阻塞IO

阻塞与非阻塞

什么是同步阻塞IO

原本的IO网络编程,服务器端没有收到客户端任何数据的时候就一直等待。serverSocket.accept()

这种就成为服务器的同步阻塞IO

什么是伪异步阻塞IO

如何解决阻塞问题?使用多线程(线程池),这种称为伪异步阻塞IO,请求来了交给多线程去处理,伪异步阻塞没有真正解决阻塞IO的核心问题,在接收客户端数据仍然是在阻塞状态,只不过是接收到数据后放到线程去处理。

多线程–伪异步

什么是同步非阻塞IO

NIO出现同步非阻塞IO,NIO的真正用途在网络通信。

NIO中有一个东西叫选择器,客户端不会直接与服务器端建立通道,而是和选择器建立通道,客户端可以注册多条通道到选择器,任何数据在选择器中已经准备好了,选择器会通知给服务器端,服务端采用循环去监听选择器中的事件KEY(轮询监听),这种就称为同步非阻塞IO

选择器:管理通道,在服务器创建选择器(选择器只有一个,不可能在客户端创建)。

NIO服务器的缺点

需要轮询监听选择器,如果选择器为空轮询时,会一直循环最终导致CPU100%

什么是异步非阻塞IO

AIO,不需要启动额外的IO线程,被动回调,

NIO客户端与服务端

选择器key

OP_CONNECT:可连接

OP_ACCEPT:可接受连接,服务器通道注册到选择器使用这个

OP_READ :可读

OP_WRITE:可写

NIO服务端

写起来有些复杂

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

public class NIOServer {
    static int PORT = 8080;

    public static void main(String[] args) throws IOException {
        System.out.println("服务器端开启");
        // 创建服务器端通道
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 设置成异步
        serverChannel.configureBlocking(false);
        // 绑定连接
        serverChannel.bind(new InetSocketAddress(PORT));
        // 获取选择器
        Selector selector = Selector.open();
        // 将通道注册到选择器,并且监听已接收到的数据
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        // 轮询获取已准备就绪的事件
        while (selector.select() > 0) {
            // 获取当前选择器有注册已监听到的事件
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                //获取准备就绪事件
                SelectionKey skey = it.next();
                // 判断事件准备就绪
                if (skey.isAcceptable()) {
                    // 若可接受连接,获取客户连接
                    SocketChannel socketChannel = serverChannel.accept();
                    // 设置为异步模式
                    socketChannel.configureBlocking(false);
                    // 将该通道注册到服务器上
                    socketChannel.register(selector, SelectionKey.OP_READ);

                } else if (skey.isReadable()) {
                    // 若就绪,获取就绪状态的通道
                    SocketChannel socketChannel = (SocketChannel) skey.channel();
                    // 读取数据
                    ByteBuffer buff = ByteBuffer.allocate(NIOClient.BUFFER_SIZE);
                    int len = 0;
                    while ((len = socketChannel.read(buff)) > 0) {
                        buff.flip();
                        System.out.println(new String(buff.array(), 0, len));
                        buff.clear();
                    }
                }
                it.remove();
            }
        }
    }
}
           

NIO客户端

public class NIOClient {
    static String REMOTE_ADDRESS = "127.0.0.1";
    static int BUFFER_SIZE = 1024;

    public static void main(String[] args) throws IOException {
        System.out.println("客户端启动");
        // 创建Socket通道
        SocketChannel sChannel = SocketChannel.open(new InetSocketAddress(REMOTE_ADDRESS, NIOServer.PORT));
        // 切换为异步非阻塞 (1.7
        sChannel.configureBlocking(false);
        // 指定缓冲区
        ByteBuffer buff = ByteBuffer.allocate(BUFFER_SIZE);
        // 存放数
        buff.put(new Date().toString().getBytes());
        // 切换读取模式
        buff.flip();
        // 通过通道传输
        sChannel.write(buff);
        // 收尾
        buff.clear();
        sChannel.close();

    }
}
           

Netty快速入门

什么是netty?

netty是一个基于java NIO类库的异步通讯框架,他的架构特点是:异步非阻塞,基于事件驱动,高性能,高可靠,和高可定制性。

netty的应用场景

分布式框架中dubbo、zookeeper、RocketMQ底层RPC通讯使用的就是netty

游戏开发中,底层使用netty进行通讯

为什么会有netty

解决传统IO非阻塞问题

对NIO进行封装,简化代码

采用事件驱动

容错机制,可连接重试

Netty案例实现

导入jar

<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty</artifactId>
            <version>3.3.0.Final</version>
        </dependency>
           

Netty服务端

AIO异步非阻塞特性,也就是说接收客户端数据时,创建单独的线程进行处理,互不影响

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


class ServerHandler extends SimpleChannelHandler {
    // 通道关闭时触发
    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelClosed(ctx, e);
        System.out.println("channelClosed");

    }

    // 必须建立连接,关闭通道时进行触发
    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelDisconnected(ctx, e);
        System.out.println("channelDisconnected");

    }

    // 接收端出现异常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        super.exceptionCaught(ctx, e);
        System.out.println("exceptionCaught");
    }

    // 接收客户端数据,创建单独的线程进行处理,互不影响
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        super.messageReceived(ctx, e);
        System.out.println("messageReceived");
        System.out.println("服务器端获取客户端发来参数:" + e.getMessage());
        // 服务器回复信息
        ctx.getChannel().write("你好~");
    }
}

public class NettyServer {
    static int PORT = 8080;
    public static void main(String[] args) {
        // 1.创建服务对象
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // 2.创建2个线程池 一个监听端口号,一个nio监听
        ExecutorService boos = Executors.newCachedThreadPool();
        ExecutorService wook = Executors.newCachedThreadPool();
        // 3.将线程池放入到工程
        serverBootstrap.setFactory(new NioServerSocketChannelFactory(boos, wook));
        // 4.设置管道工程
        serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            // 设置管道
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                // 传输数据的时候直接为string类型
                pipeline.addLast("decoder", new StringDecoder());
                pipeline.addLast("encoder", new StringEncoder());
                // 设置事件监听类
                pipeline.addLast("serverHandler", new ServerHandler());
                return pipeline;
            }
        });
        // 5.绑定端口号
        serverBootstrap.bind(new InetSocketAddress(PORT));
        System.out.println("NettyServer启动");
        while (true){
            Thread.sleep(1000);
            System.out.println("每隔1秒打印");
        }
    }
}

           

Netty客户端

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ClientHandler extends SimpleChannelHandler{
    // 通道关闭时触发
    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelClosed(ctx, e);
        System.out.println("channelClosed");

    }

    // 必须建立连接了,关闭通道时进行触发
    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelDisconnected(ctx, e);
        System.out.println("channelDisconnected");

    }

    // 接收端出现异常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        super.exceptionCaught(ctx, e);
        System.out.println("exceptionCaught");
    }

    // 接收客户端数据,创建单独的线程进行处理,互不影响
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        super.messageReceived(ctx, e);
        System.out.println("messageReceived");
        System.out.println("服务器端向客户端回复内容:" + e.getMessage());
    }
}
public class NettyClient {
    static String REMOTE_ADDRESS = "127.0.0.1";

    public static void main(String[] args) {
        // 1.创建客户端对象
        ClientBootstrap clientBootstrap = new ClientBootstrap();
        // 2.创建2个线程池 一个监听端口号,一个nio监听
        ExecutorService boos = Executors.newCachedThreadPool();
        ExecutorService wook = Executors.newCachedThreadPool();
        // 3.将线程池放入到工程
        clientBootstrap.setFactory(new NioClientSocketChannelFactory(boos, wook));
        // 4.设置管道工程
        clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            // 设置管道
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                // 传输数据的时候直接为string类型
                pipeline.addLast("decoder", new StringDecoder());
                pipeline.addLast("encoder", new StringEncoder());
                // 设置事件监听类
                pipeline.addLast("clientHandler", new ClientHandler());
                return pipeline;
            }
        });
        // 5.连接服务器
        ChannelFuture connect = clientBootstrap.connect(new InetSocketAddress(REMOTE_ADDRESS,
                NettyServer.PORT));
        // 获取通道
        Channel channel = connect.getChannel();
        System.out.println("客户端start");
        Scanner scanner = new Scanner(System.in);
        while (true){
            System.out.println("请输入内容");
            channel.write(scanner.next());
        }
    }


}

           

channelClosed

只要是关闭连接,就调用channelClosed方法

channelDisconnected

调用这个channelDisconnected方法前提是已经建立了连接,然后再关闭连接,如果还没建立连接关闭是不会调用这个方法的