javaIO编程.
前言
同步/异步:
同步和异步关注的是消息通信机制,通常用来形容方法的调用,同步调用时需等待方法返回后才能继续运行;异步调用时,调用者可以继续后续的运行,异步方法通常会在另外一个线程中真实地执行,如果调用者需要返回结果,异步方法执行完成时,会通知调用者。(去饭店吃饭/订外卖)
阻塞/非阻塞:
阻塞和非阻塞关注的是对资源的需求,通常用来形容多线程之间的互相影响,比如一个线程占用了一个临界区资源(同时只能被一个线程使用),那么其他需要这个资源的线程则处于等待或者阻塞状态。(食堂打饭排队/自助食堂)
- BIO是同步,阻塞的IO
- NIO是同步,非阻塞的IO
- AIO是异步, 非阻塞的IO
随着java版本的不断升级与迭代,java的IO模型开始得到改变。从原始的BIO,到1.4以后发布的NIO,再到对NIO进行的改进AIO分别对IO模型做了优化,那么性能就自然不用多说了,肯定是依次得到了提升的。
BIO
1.1 线程模型
package com.atChina.chat.demo04;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
public class MyHttp {
public static void main(String[] args) throws IOException {
ServerSocket server = new ServerSocket(9999);
while (true) {
Socket client = server.accept();
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(client.getOutputStream()));
byte[] bts = new byte[20480];
int len = client.getInputStream().read(bts);
String requestInfo = new String(bts, 0, len).trim();
System.out.println(requestInfo);
// 响应体
StringBuilder responseContent = new StringBuilder();
responseContent.append("<html><head><title>你好</title></head><body>hello world!!<body></html>");
// 响应头
StringBuilder response = new StringBuilder();
// HTTP协议版本, 状态代码, 描述
response.append("HTTP/1.1 200 OK\r\n");
response.append("Content-type:text/html;charset=utf-8\r\n");
response.append("Content-Length:").append(responseContent.toString().getBytes().length).append("\r\n");
response.append("\r\n");
response.append(responseContent);
bw.write(response.toString());
bw.flush();
bw.close();
}
}
}
package com.atChina.chat.demo04;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
public class MyHttpBio {
public static void main(String[] args) throws IOException {
ServerSocket server = new ServerSocket(9999);
while (true) {
Socket client = server.accept();
new Thread(new Runnable() {
@Override
public void run() {
try {
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(client.getOutputStream()));
byte[] bts = new byte[20480];
int len = client.getInputStream().read(bts);
String requestInfo = new String(bts, 0, len).trim();
System.out.println(requestInfo);
// 响应体
StringBuilder responseContent = new StringBuilder();
responseContent.append("<html><head><title>你好</title></head><body>hello world!!<body></html>");
// 响应头
StringBuilder response = new StringBuilder();
// HTTP协议版本, 状态代码, 描述
response.append("HTTP/1.1 200 OK\r\n");
response.append("Content-type:text/html;charset=utf-8\r\n");
response.append("Content-Length:").append(responseContent.toString().getBytes().length).append("\r\n");
response.append("\r\n");
response.append(responseContent);
bw.write(response.toString());
bw.flush();
bw.close();
}catch (Exception exception){
}
}
}).start();
}
}
}
BIO 的缺点server.accept() 组塞等待连接。client.getInputStream().read(bts); 在客户端没有数据输入的时候会阻塞等待客户端的传入数据,同步阻塞导致并发能力低下,不能处理高并发的场景。
NIO
2.1 什么是NIO
- 1.在 JDK 1. 4 中 新 加入 了 NIO( New Input/ Output) 类, 引入了一种基于通道和缓冲区的 I/O 方式,它可以使用 Native 函数库直接分配堆外内存,然后通过一个存储在 Java 堆DirectByteBuffer 对象作为这块内存的引用进行操作,避免了在 Java 堆和 Native 堆中来回复制数据。
- 2.NIO 是一种同步非阻塞的 IO 模型,所以也可以叫NON-BLOCKING IO。同步是指线程不断轮询 IO 事件是否就绪,非阻塞是指线程在等待 IO 的时候,可以同时做其他任务。同步的核心就Selector,Selector 代替了线程本身轮询 IO 事件,避免了阻塞同时减少了不必要的线程消耗;非阻塞的核心就是通道和缓冲区,当 IO 事件就绪时,可以通过写到缓冲区,保证 IO 的成功,而无需线程阻塞式地等待。
2.2 NIO的特点
- 1. 由一个专门的线程来处理所有的 IO 事件,并负责分发。
- 2. 事件驱动机制:事件到的时候触发,而不是同步的去监视事件。
- 3. 线程通讯:线程之间通过 wait,notify 等方式通讯。保证每次上下文切换都是有意义的。减少无谓的线程切换(其实单线程也是能够处理,但为了效率,还是建议多线程辅助)。
2.3 NIO的工作模型
public class MyHttpNio1 {
public static void main(String[] args) throws IOException, InterruptedException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);
List<SocketChannel> socketChannelList=new ArrayList<>();
while (true) {
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
socketChannel.configureBlocking(false);
socketChannelList.add(socketChannel);
continue;
}
Iterator<SocketChannel> iterator = socketChannelList.iterator();
while(iterator.hasNext()){
SocketChannel channel = iterator.next();
//5.获取客户端传递过来的数据,并且把数据放在byteBuffer这个缓冲区中
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//返回值:
//正数:表示本次读到的有效字节的个数
//0 :表示本次没有读到有效字节
//-1 :表示读到了末尾
int read = channel.read(byteBuffer);
System.out.println("客户端消息:" +
new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8));
// 响应体
StringBuilder responseContent = new StringBuilder();
responseContent.append("<html><head><title>你好</title></head><body>hello world!!<body></html>");
// 响应头
StringBuilder response = new StringBuilder();
// HTTP协议版本, 状态代码, 描述
response.append("HTTP/1.1 200 OK\r\n");
response.append("Content-type:text/html;charset=utf-8\r\n");
response.append("Content-Length:").append(responseContent.toString().getBytes().length).append("\r\n");
response.append("\r\n");
response.append(responseContent);
//6.给客户端回写数据
channel.write(ByteBuffer.wrap(response.toString().getBytes(StandardCharsets.UTF_8)));
//7.释放资源
channel.close();
iterator.remove();
}
}
}
}
public class MyHttpNio2 {
public static void main(String[] args) throws IOException, InterruptedException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);
Selector selector = Selector.open();
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
while (true) {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
if(key.isAcceptable()) { //处理连接事件
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false); //设置为非阻塞
System.out.println("client:" + socketChannel.getLocalAddress() + " is connect");
socketChannel.register(selector, SelectionKey.OP_READ); //注册客户端读取事件到selector
} else if (key.isReadable()) { //处理读取事件
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
SocketChannel channel = (SocketChannel) key.channel();
channel.read(byteBuffer);
System.out.println("client:" + channel.getLocalAddress() + " send " + new String(byteBuffer.array()));
channel.register(selector, SelectionKey.OP_WRITE); //注册客户端读取事件到selector
} else if(key.isWritable()){
SocketChannel channel = (SocketChannel)key.channel();
//5.获取客户端传递过来的数据,并且把数据放在byteBuffer这个缓冲区中
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//返回值:
//正数:表示本次读到的有效字节的个数
//0 :表示本次没有读到有效字节
//-1 :表示读到了末尾
int read = channel.read(byteBuffer);
System.out.println("客户端消息:" +
new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8));
// 响应体
StringBuilder responseContent = new StringBuilder();
responseContent.append("<html><head><title>你好</title></head><body>hello world!!<body></html>");
// 响应头
StringBuilder response = new StringBuilder();
// HTTP协议版本, 状态代码, 描述
response.append("HTTP/1.1 200 OK\r\n");
response.append("Content-type:text/html;charset=utf-8\r\n");
response.append("Content-Length:").append(responseContent.toString().getBytes().length).append("\r\n");
response.append("\r\n");
response.append(responseContent);
//6.给客户端回写数据
channel.write(ByteBuffer.wrap(response.toString().getBytes(StandardCharsets.UTF_8)));
channel.close();
}
iterator.remove(); //事件处理完毕,要记得清除
}
}
}
}
2.4 传统IO与NIO的对比
5.1 数据传输方式
IO是面向流的,NIO是面向缓冲区的。
Java IO面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。
Java NIO的缓冲导向方法略有不同。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。
5.2 阻塞与非阻塞
Java IO的各种流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。
Java NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。 线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。
2.5 举例说明IO与NIO
6.1 IO工作模型
老王干了多年程序员工作后,感觉心力交瘁,也没发财, 于是准备创业干点事情。于是拿出多年积蓄在某城市开了一家100桌大小的火锅店。因为多年的行业浸淫,老王坚信以客户为中心的思想不动摇。为了做到VIP一般的服务,老王招了50多名服务员,同时还在招了50多名兼职服务员,以备客人多时用。为什么招这么多呢?因为老王要求每桌标配一名服务员进行服务,如果50名正式员工不够用,就启用兼职员工。一个多月下来,老王的生意也很火爆,大家觉得服务也还不错。可是老王月底算账时,发现发完扣除房租,水电,食材,人工等乱七八糟的成本后,自己并没赚到大钱,当然,小钱还是赚到了的,但相对投入这么大成本来说,收益率实在不够理想。于是,老王陷入了沉思...
6.2 NIO工作模型
老王通过观察,回想自己火锅店的状况,发现了一个重要问题?就是大部分客人吃饭的时候并不是很需要有人在旁边服务,客人主要是点菜,加菜,加水,结账时需要,而目前的服务模式会出现一些服务员在门口闲着的状态,有的甚至在旁边玩起手机...虽然客人随叫随到,但是好像资源利用率并不怎么高。
于是老王第二天召集主要人员开会,说:我们的生意很火爆,有时顾客还需排队,我想改进一下一下店里的服务模式。第一呢,我想扩大一下规模,扩大一倍,把桌子加到200桌。第二呢,我想兼职员工还是不要了,毕竟呼来唤去的很麻烦。这时会议中一人说到,那我们岂不是还得招个一百来人?老王卖了下关子,咳了一声,道:那倒不用,我们还是保持我们现在的人员配置,但是呢,我们要改变服务模式,我们毕竟是平民火锅,但是提供的却是VIP式的服务,那我们还是回归平民模式吧,但我们尽量保证服务质量。那老王的方法是什么呢?
其实也不是什么很牛逼的方法,现在来说很普通,安排个专门接待的人员,来客人时给
他一份菜单并安排一个桌位,告诉他有什么需要请按桌上的呼叫按钮(现在可以直接微信扫一扫点单付款,更方便),并记下他的相关信息。然后收到呼叫就派出空闲的服务员过去服务,服务完了就回来领取新的任务,当然分配任务可通过对讲机完成。这样就避免了大量人力资源的浪费,至少看起来现在服务员比以前“充实”多了,服务员要是觉得太辛苦,我们可以加薪来提高积极性嘛。
就这样持续了几个月,老王火锅店盈利比之前翻了3翻,脸上的笑容也多了起来,对待员工也变得更加和蔼。关键是,老王在年终大会上做了个大胆的决定,去其他城市再开几家分店,并承诺全员加薪30%,大家欢呼雀跃!!!(都是我编的,tmd快编不下去了!)
本故事纯属虚构,如有雷同,那就是抄的!
此模式适用于服务时间短暂频繁的需求,对于复杂耗时,不建议使用,例如:你要开洗脚城按摩店那就行不通了!毕竟洗脚按摩需要人长时间服务的。
Netty
3.1什么是Netty
“Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.”
由于NIO通信的复杂性,推动了Netty的产生,Netty是一个异步、事件驱动(asynchronous event-driven)的用以开发高性能、高可靠性协议服务器和客户端的网络应用框架。
换句话说,Netty是一个NIO服务器-客户端框架,它在JAVA NIO的基础上进行封装并提供出一套便于用户开发的API,使用它可以快速简单地开发网络应用程序,比如协议服务器和客户端。
3.2 netty的基本组件
- Bootstrap
一个Netty应用通常由一个Bootstrap开始,它的主要作用是配置整个Netty程序,串联起各个组件。Bootstrap有两种类型,ServerBootstrap和Bootstrap,分别用于Server端和Client端
- Channel
代表一个Socket连接,或者其他和I/O相关的组件
- EventLoop
为Channel处理I/O操作,一个EventLoop可以为多个Channel服务,可理解为一个线程
- EventLoopGroup
一个EventLoopGroup包含多个EventLoop,可以将其理解为一个线程池
- Handler
用来处理各种事件,比如连接、数据接收、异常、数据转换、编解码等。按照输入输出来分,分为ChannelInboundHandler和ChannelOutboundHandler。前者对从客户端发往服务器的报文进行处理,一般用来执行编码、读数据、业务处理等;后者处理服务器发往客户端的数据,如编码
- ChannelPipeline
ChannelHandler的容器。每个Channel会绑定一个ChannelPipeline,用于处理该Channel上产生的事件
- ChannelFuture
Netty中所有的I/O操作都是异步的,所以不能立刻得知消息的处理结果。因此,我们需要通过ChannelFuture注册一个监听,当操作执行成功或失败时进行一些处理
3.3 Netty线程模型
3.4工作流程
当一个连接到达时,Netty会产生一个Channel,然后从EventLoopGroup中选出一个EventLoop将该Channel注册在其上,标明感兴趣的I/O事件,如:OP_READ。EventLoop对Channel的监听即为上面讲到的I/O多路复用技术(selector)
当某个Channel上有数据到来时,EventLoop会选出对应的Channel,读取数据,并且触发网络事件交由ChannelPipeline处理。ChannelPipeline中是一系列的Handler,数据在ChannelPipeline中流动,并经由一个个Handler处理,每个Handler处理完成后将数据交给下一个Handler处理
Netty为我们提供了一些ChannelHandlerAdapter,我们可以通过使用它们简化Handler的编写,从而可以将注意力集中在具体的业务逻辑上。
3.5 Netty编解码
由于网络传输只能传输字节流,因此在发送数据前需要把message类型的数据转化为bytes这就是编码的过程。相对应的,在接收到数据后,我们需要把bytes解析成message。这就是解码过程
3.6 Netty心跳
心跳就是Netty检查连接是否关闭的能力,它的作用就是不断的在制定时间内对channel进行监测,把那种长时间没有连接的channel进行关闭。
bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//对连接作心跳检测
pipeline.addLast(new IdleStateHandler(5, 5, 10));
pipeline.addLast("decoder",new StringDecoder());
pipeline.addLast("encoder",new StringEncoder());
pipeline.addLast(new HHandler());
}
});
HHandler extends SimpleChannelInboundHandler<String> 继承自这个类然后重写下面的trigger方法,实现心跳检查。
/**
* 通过心跳检查来检测长时间不工作的 channel 并且对它进行关闭
*/
@Override
public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
IdleStateEvent event=(IdleStateEvent)evt;
//如果是心跳检测事件
if(event.state()==IdleState.ALL_IDLE){
ChannelFuture writeAndFlush = ctx.writeAndFlush(" you will be close");
writeAndFlush.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture paramF) throws Exception {
ctx.channel().close();
}
});
}
}else{
super.userEventTriggered(ctx, evt);
}
}
服务端代码
package com.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* netty5服务端
*
*/
public class Server {
public static void main(String[] args) {
//服务类
ServerBootstrap bootstrap = new ServerBootstrap();
//boss和worker
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
//设置线程池
bootstrap.group(boss, worker);
//设置socket工厂、
bootstrap.channel(NioServerSocketChannel.class);
//设置管道工厂
bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new ServerHandler());
}
});
//netty3中对应设置如下
//bootstrap.setOption("backlog", 1024);
//bootstrap.setOption("tcpNoDelay", true);
//bootstrap.setOption("keepAlive", true);
//设置参数,TCP参数
bootstrap.option(ChannelOption.SO_BACKLOG, 2048);//serverSocketchannel的设置,链接缓冲池的大小
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//socketchannel的设置,维持链接的活跃,清除死链接
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);//socketchannel的设置,关闭延迟发送
//绑定端口
ChannelFuture future = bootstrap.bind(8121);
System.out.println("start");
//等待服务端关闭
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally{
//释放资源
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
package com.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* 服务端消息处理
*
*/
public class ServerHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg);
ctx.channel().writeAndFlush("hi");
ctx.writeAndFlush("hi");
}
/**
* 新客户端接入
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
}
/**
* 客户端断开
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive");
}
/**
* 异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
}
}
客户端代码
package com.client;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* netty5的客户端
*
*/
public class Client {
public static void main(String[] args) {
//服务类
Bootstrap bootstrap = new Bootstrap();
//worker
EventLoopGroup worker = new NioEventLoopGroup();
try {
//设置线程池
bootstrap.group(worker);
//设置socket工厂、
bootstrap.channel(NioSocketChannel.class);
//设置管道
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture connect = bootstrap.connect("127.0.0.1", 8121);
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
while(true){
System.out.println("请输入.....:");
String msg = bufferedReader.readLine();
connect.channel().writeAndFlush(msg);
}
} catch (Exception e) {
e.printStackTrace();
} finally{
worker.shutdownGracefully();
}
}
}