文章目录
- 阻塞IO与非阻塞IO的区别
-
- 同步、异步与阻塞、非阻塞
- 阻塞与非阻塞
-
- 什么是同步阻塞IO
- 什么是伪异步阻塞IO
- 什么是同步非阻塞IO
-
- NIO服务器的缺点
- 什么是异步非阻塞IO
- NIO客户端与服务端
-
- 选择器key
- NIO服务端
- NIO客户端
- Netty快速入门
-
- 什么是netty?
- netty的应用场景
- 为什么会有netty
- Netty案例实现
-
- 导入jar
- Netty服务端
- Netty客户端
- channelClosed
- channelDisconnected
阻塞IO与非阻塞IO的区别
同步、异步与阻塞、非阻塞
NIO有两个重要的组合:通道和缓冲区,通道是传输数据,缓冲区来存放数据。
IO流又称为BIO,Blocking IO,同步阻塞IO,
NIO(1.7) 同步、非阻塞IO
AIO(1.7后)异步、非阻塞IO
阻塞与非阻塞
什么是同步阻塞IO
原本的IO网络编程,服务器端没有收到客户端任何数据的时候就一直等待。serverSocket.accept()
这种就成为服务器的同步阻塞IO
什么是伪异步阻塞IO
如何解决阻塞问题?使用多线程(线程池),这种称为伪异步阻塞IO,请求来了交给多线程去处理,伪异步阻塞没有真正解决阻塞IO的核心问题,在接收客户端数据仍然是在阻塞状态,只不过是接收到数据后放到线程去处理。
多线程–伪异步
什么是同步非阻塞IO
NIO出现同步非阻塞IO,NIO的真正用途在网络通信。
NIO中有一个东西叫选择器,客户端不会直接与服务器端建立通道,而是和选择器建立通道,客户端可以注册多条通道到选择器,任何数据在选择器中已经准备好了,选择器会通知给服务器端,服务端采用循环去监听选择器中的事件KEY(轮询监听),这种就称为同步非阻塞IO
选择器:管理通道,在服务器创建选择器(选择器只有一个,不可能在客户端创建)。
NIO服务器的缺点
需要轮询监听选择器,如果选择器为空轮询时,会一直循环最终导致CPU100%
什么是异步非阻塞IO
AIO,不需要启动额外的IO线程,被动回调,
NIO客户端与服务端
选择器key
OP_CONNECT:可连接
OP_ACCEPT:可接受连接,服务器通道注册到选择器使用这个
OP_READ :可读
OP_WRITE:可写
NIO服务端
写起来有些复杂
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
public class NIOServer {
static int PORT = 8080;
public static void main(String[] args) throws IOException {
System.out.println("服务器端开启");
// 创建服务器端通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 设置成异步
serverChannel.configureBlocking(false);
// 绑定连接
serverChannel.bind(new InetSocketAddress(PORT));
// 获取选择器
Selector selector = Selector.open();
// 将通道注册到选择器,并且监听已接收到的数据
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
// 轮询获取已准备就绪的事件
while (selector.select() > 0) {
// 获取当前选择器有注册已监听到的事件
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
//获取准备就绪事件
SelectionKey skey = it.next();
// 判断事件准备就绪
if (skey.isAcceptable()) {
// 若可接受连接,获取客户连接
SocketChannel socketChannel = serverChannel.accept();
// 设置为异步模式
socketChannel.configureBlocking(false);
// 将该通道注册到服务器上
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (skey.isReadable()) {
// 若就绪,获取就绪状态的通道
SocketChannel socketChannel = (SocketChannel) skey.channel();
// 读取数据
ByteBuffer buff = ByteBuffer.allocate(NIOClient.BUFFER_SIZE);
int len = 0;
while ((len = socketChannel.read(buff)) > 0) {
buff.flip();
System.out.println(new String(buff.array(), 0, len));
buff.clear();
}
}
it.remove();
}
}
}
}
NIO客户端
public class NIOClient {
static String REMOTE_ADDRESS = "127.0.0.1";
static int BUFFER_SIZE = 1024;
public static void main(String[] args) throws IOException {
System.out.println("客户端启动");
// 创建Socket通道
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress(REMOTE_ADDRESS, NIOServer.PORT));
// 切换为异步非阻塞 (1.7
sChannel.configureBlocking(false);
// 指定缓冲区
ByteBuffer buff = ByteBuffer.allocate(BUFFER_SIZE);
// 存放数
buff.put(new Date().toString().getBytes());
// 切换读取模式
buff.flip();
// 通过通道传输
sChannel.write(buff);
// 收尾
buff.clear();
sChannel.close();
}
}
Netty快速入门
什么是netty?
netty是一个基于java NIO类库的异步通讯框架,他的架构特点是:异步非阻塞,基于事件驱动,高性能,高可靠,和高可定制性。
netty的应用场景
分布式框架中dubbo、zookeeper、RocketMQ底层RPC通讯使用的就是netty
游戏开发中,底层使用netty进行通讯
为什么会有netty
解决传统IO非阻塞问题
对NIO进行封装,简化代码
采用事件驱动
容错机制,可连接重试
Netty案例实现
导入jar
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.3.0.Final</version>
</dependency>
Netty服务端
AIO异步非阻塞特性,也就是说接收客户端数据时,创建单独的线程进行处理,互不影响
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class ServerHandler extends SimpleChannelHandler {
// 通道关闭时触发
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
super.channelClosed(ctx, e);
System.out.println("channelClosed");
}
// 必须建立连接,关闭通道时进行触发
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
super.channelDisconnected(ctx, e);
System.out.println("channelDisconnected");
}
// 接收端出现异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
super.exceptionCaught(ctx, e);
System.out.println("exceptionCaught");
}
// 接收客户端数据,创建单独的线程进行处理,互不影响
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
super.messageReceived(ctx, e);
System.out.println("messageReceived");
System.out.println("服务器端获取客户端发来参数:" + e.getMessage());
// 服务器回复信息
ctx.getChannel().write("你好~");
}
}
public class NettyServer {
static int PORT = 8080;
public static void main(String[] args) {
// 1.创建服务对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 2.创建2个线程池 一个监听端口号,一个nio监听
ExecutorService boos = Executors.newCachedThreadPool();
ExecutorService wook = Executors.newCachedThreadPool();
// 3.将线程池放入到工程
serverBootstrap.setFactory(new NioServerSocketChannelFactory(boos, wook));
// 4.设置管道工程
serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
// 设置管道
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
// 传输数据的时候直接为string类型
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// 设置事件监听类
pipeline.addLast("serverHandler", new ServerHandler());
return pipeline;
}
});
// 5.绑定端口号
serverBootstrap.bind(new InetSocketAddress(PORT));
System.out.println("NettyServer启动");
while (true){
Thread.sleep(1000);
System.out.println("每隔1秒打印");
}
}
}
Netty客户端
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class ClientHandler extends SimpleChannelHandler{
// 通道关闭时触发
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
super.channelClosed(ctx, e);
System.out.println("channelClosed");
}
// 必须建立连接了,关闭通道时进行触发
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
super.channelDisconnected(ctx, e);
System.out.println("channelDisconnected");
}
// 接收端出现异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
super.exceptionCaught(ctx, e);
System.out.println("exceptionCaught");
}
// 接收客户端数据,创建单独的线程进行处理,互不影响
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
super.messageReceived(ctx, e);
System.out.println("messageReceived");
System.out.println("服务器端向客户端回复内容:" + e.getMessage());
}
}
public class NettyClient {
static String REMOTE_ADDRESS = "127.0.0.1";
public static void main(String[] args) {
// 1.创建客户端对象
ClientBootstrap clientBootstrap = new ClientBootstrap();
// 2.创建2个线程池 一个监听端口号,一个nio监听
ExecutorService boos = Executors.newCachedThreadPool();
ExecutorService wook = Executors.newCachedThreadPool();
// 3.将线程池放入到工程
clientBootstrap.setFactory(new NioClientSocketChannelFactory(boos, wook));
// 4.设置管道工程
clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
// 设置管道
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
// 传输数据的时候直接为string类型
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// 设置事件监听类
pipeline.addLast("clientHandler", new ClientHandler());
return pipeline;
}
});
// 5.连接服务器
ChannelFuture connect = clientBootstrap.connect(new InetSocketAddress(REMOTE_ADDRESS,
NettyServer.PORT));
// 获取通道
Channel channel = connect.getChannel();
System.out.println("客户端start");
Scanner scanner = new Scanner(System.in);
while (true){
System.out.println("请输入内容");
channel.write(scanner.next());
}
}
}
channelClosed
只要是关闭连接,就调用channelClosed方法
channelDisconnected
调用这个channelDisconnected方法前提是已经建立了连接,然后再关闭连接,如果还没建立连接关闭是不会调用这个方法的