天天看点

【J2SE】BIO,NIO,AIO,NettyjavaIO编程.

javaIO编程.

前言

         同步/异步:

        同步和异步关注的是消息通信机制,通常用来形容方法的调用,同步调用时需等待方法返回后才能继续运行;异步调用时,调用者可以继续后续的运行,异步方法通常会在另外一个线程中真实地执行,如果调用者需要返回结果,异步方法执行完成时,会通知调用者。(去饭店吃饭/订外卖)

       阻塞/非阻塞:

       阻塞和非阻塞关注的是对资源的需求,通常用来形容多线程之间的互相影响,比如一个线程占用了一个临界区资源(同时只能被一个线程使用),那么其他需要这个资源的线程则处于等待或者阻塞状态。(食堂打饭排队/自助食堂)

  1.        BIO是同步,阻塞的IO
  2.        NIO是同步,非阻塞的IO
  3.        AIO是异步,  非阻塞的IO

       随着java版本的不断升级与迭代,java的IO模型开始得到改变。从原始的BIO,到1.4以后发布的NIO,再到对NIO进行的改进AIO分别对IO模型做了优化,那么性能就自然不用多说了,肯定是依次得到了提升的。

BIO

 1.1 线程模型

【J2SE】BIO,NIO,AIO,NettyjavaIO编程.
package com.atChina.chat.demo04;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;


public class MyHttp {

    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(9999);
        while (true) {
            Socket client = server.accept();
            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(client.getOutputStream()));
            byte[] bts = new byte[20480];
            int len = client.getInputStream().read(bts);
            String requestInfo = new String(bts, 0, len).trim();
            System.out.println(requestInfo);

            // 响应体
            StringBuilder responseContent = new StringBuilder();
            responseContent.append("<html><head><title>你好</title></head><body>hello world!!<body></html>");

            // 响应头
            StringBuilder response = new StringBuilder();
            // HTTP协议版本, 状态代码, 描述
            response.append("HTTP/1.1 200 OK\r\n");
            response.append("Content-type:text/html;charset=utf-8\r\n");
            response.append("Content-Length:").append(responseContent.toString().getBytes().length).append("\r\n");
            response.append("\r\n");
            response.append(responseContent);

            bw.write(response.toString());
            bw.flush();
            bw.close();
        }
    }


}
           
package com.atChina.chat.demo04;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;


public class MyHttpBio {

    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(9999);
        while (true) {
            Socket client = server.accept();

            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(client.getOutputStream()));
                        byte[] bts = new byte[20480];
                        int len = client.getInputStream().read(bts);
                        String requestInfo = new String(bts, 0, len).trim();
                        System.out.println(requestInfo);

                        // 响应体
                        StringBuilder responseContent = new StringBuilder();
                        responseContent.append("<html><head><title>你好</title></head><body>hello world!!<body></html>");

                        // 响应头
                        StringBuilder response = new StringBuilder();
                        // HTTP协议版本, 状态代码, 描述
                        response.append("HTTP/1.1 200 OK\r\n");
                        response.append("Content-type:text/html;charset=utf-8\r\n");
                        response.append("Content-Length:").append(responseContent.toString().getBytes().length).append("\r\n");
                        response.append("\r\n");
                        response.append(responseContent);

                        bw.write(response.toString());
                        bw.flush();
                        bw.close();
                    }catch (Exception exception){

                    }
                }
            }).start();
        }
    }
}
           

   BIO 的缺点server.accept() 组塞等待连接。client.getInputStream().read(bts); 在客户端没有数据输入的时候会阻塞等待客户端的传入数据,同步阻塞导致并发能力低下,不能处理高并发的场景。

NIO

2.1 什么是NIO

  1.      1.在 JDK 1. 4 中 新 加入 了 NIO( New Input/ Output) 类, 引入了一种基于通道和缓冲区的 I/O 方式,它可以使用 Native 函数库直接分配堆外内存,然后通过一个存储在 Java 堆DirectByteBuffer 对象作为这块内存的引用进行操作,避免了在 Java 堆和 Native 堆中来回复制数据。
  2.      2.NIO 是一种同步非阻塞的 IO 模型,所以也可以叫NON-BLOCKING IO。同步是指线程不断轮询 IO 事件是否就绪,非阻塞是指线程在等待 IO 的时候,可以同时做其他任务。同步的核心就Selector,Selector 代替了线程本身轮询 IO 事件,避免了阻塞同时减少了不必要的线程消耗;非阻塞的核心就是通道和缓冲区,当 IO 事件就绪时,可以通过写到缓冲区,保证 IO 的成功,而无需线程阻塞式地等待。

2.2 NIO的特点

  1.    1. 由一个专门的线程来处理所有的 IO 事件,并负责分发。
  2.    2. 事件驱动机制:事件到的时候触发,而不是同步的去监视事件。
  3.    3. 线程通讯:线程之间通过 wait,notify 等方式通讯。保证每次上下文切换都是有意义的。减少无谓的线程切换(其实单线程也是能够处理,但为了效率,还是建议多线程辅助)。

2.3 NIO的工作模型

public class MyHttpNio1 {


    public static void main(String[] args) throws IOException, InterruptedException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(9999));
        serverSocketChannel.configureBlocking(false);
        List<SocketChannel> socketChannelList=new ArrayList<>();

        while (true) {
            SocketChannel socketChannel = serverSocketChannel.accept();
            if (socketChannel != null) {
                socketChannel.configureBlocking(false);
                socketChannelList.add(socketChannel);
                continue;
            }
            Iterator<SocketChannel> iterator = socketChannelList.iterator();
            while(iterator.hasNext()){
                SocketChannel channel = iterator.next();
                //5.获取客户端传递过来的数据,并且把数据放在byteBuffer这个缓冲区中
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                //返回值:
                //正数:表示本次读到的有效字节的个数
                //0  :表示本次没有读到有效字节
                //-1 :表示读到了末尾
                int read = channel.read(byteBuffer);
                System.out.println("客户端消息:" +
                        new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8));
                // 响应体
                StringBuilder responseContent = new StringBuilder();
                responseContent.append("<html><head><title>你好</title></head><body>hello world!!<body></html>");
                // 响应头
                StringBuilder response = new StringBuilder();
                // HTTP协议版本, 状态代码, 描述
                response.append("HTTP/1.1 200 OK\r\n");
                response.append("Content-type:text/html;charset=utf-8\r\n");
                response.append("Content-Length:").append(responseContent.toString().getBytes().length).append("\r\n");
                response.append("\r\n");
                response.append(responseContent);
                //6.给客户端回写数据
                channel.write(ByteBuffer.wrap(response.toString().getBytes(StandardCharsets.UTF_8)));
                //7.释放资源
                channel.close();
                iterator.remove();
            }
        }
    }
}
           
【J2SE】BIO,NIO,AIO,NettyjavaIO编程.
public class MyHttpNio2 {

    public static void main(String[] args) throws IOException, InterruptedException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(9999));
        serverSocketChannel.configureBlocking(false);
        Selector selector = Selector.open();
        serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while(iterator.hasNext()){
                SelectionKey key = iterator.next();
                if(key.isAcceptable()) {  //处理连接事件
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);  //设置为非阻塞
                    System.out.println("client:" + socketChannel.getLocalAddress() + " is connect");
                    socketChannel.register(selector, SelectionKey.OP_READ); //注册客户端读取事件到selector
                } else if (key.isReadable()) {  //处理读取事件
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    SocketChannel channel = (SocketChannel) key.channel();
                    channel.read(byteBuffer);
                    System.out.println("client:" + channel.getLocalAddress() + " send " + new String(byteBuffer.array()));
                    channel.register(selector, SelectionKey.OP_WRITE); //注册客户端读取事件到selector
                } else if(key.isWritable()){
                    SocketChannel channel = (SocketChannel)key.channel();
                    //5.获取客户端传递过来的数据,并且把数据放在byteBuffer这个缓冲区中
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    //返回值:
                    //正数:表示本次读到的有效字节的个数
                    //0  :表示本次没有读到有效字节
                    //-1 :表示读到了末尾
                    int read = channel.read(byteBuffer);
                    System.out.println("客户端消息:" +
                            new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8));
                    // 响应体
                    StringBuilder responseContent = new StringBuilder();
                    responseContent.append("<html><head><title>你好</title></head><body>hello world!!<body></html>");
                    // 响应头
                    StringBuilder response = new StringBuilder();
                    // HTTP协议版本, 状态代码, 描述
                    response.append("HTTP/1.1 200 OK\r\n");
                    response.append("Content-type:text/html;charset=utf-8\r\n");
                    response.append("Content-Length:").append(responseContent.toString().getBytes().length).append("\r\n");
                    response.append("\r\n");
                    response.append(responseContent);
                    //6.给客户端回写数据
                    channel.write(ByteBuffer.wrap(response.toString().getBytes(StandardCharsets.UTF_8)));
                    channel.close();
                }
                iterator.remove();  //事件处理完毕,要记得清除
            }
        }
    }
}
           

2.4 传统IO与NIO的对比

5.1 数据传输方式

IO是面向流的,NIO是面向缓冲区的。

Java IO面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。

Java NIO的缓冲导向方法略有不同。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。

5.2 阻塞与非阻塞

Java IO的各种流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。

Java NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。 线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。

2.5 举例说明IO与NIO

6.1 IO工作模型

老王干了多年程序员工作后,感觉心力交瘁,也没发财, 于是准备创业干点事情。于是拿出多年积蓄在某城市开了一家100桌大小的火锅店。因为多年的行业浸淫,老王坚信以客户为中心的思想不动摇。为了做到VIP一般的服务,老王招了50多名服务员,同时还在招了50多名兼职服务员,以备客人多时用。为什么招这么多呢?因为老王要求每桌标配一名服务员进行服务,如果50名正式员工不够用,就启用兼职员工。一个多月下来,老王的生意也很火爆,大家觉得服务也还不错。可是老王月底算账时,发现发完扣除房租,水电,食材,人工等乱七八糟的成本后,自己并没赚到大钱,当然,小钱还是赚到了的,但相对投入这么大成本来说,收益率实在不够理想。于是,老王陷入了沉思...

6.2 NIO工作模型

老王通过观察,回想自己火锅店的状况,发现了一个重要问题?就是大部分客人吃饭的时候并不是很需要有人在旁边服务,客人主要是点菜,加菜,加水,结账时需要,而目前的服务模式会出现一些服务员在门口闲着的状态,有的甚至在旁边玩起手机...虽然客人随叫随到,但是好像资源利用率并不怎么高。

于是老王第二天召集主要人员开会,说:我们的生意很火爆,有时顾客还需排队,我想改进一下一下店里的服务模式。第一呢,我想扩大一下规模,扩大一倍,把桌子加到200桌。第二呢,我想兼职员工还是不要了,毕竟呼来唤去的很麻烦。这时会议中一人说到,那我们岂不是还得招个一百来人?老王卖了下关子,咳了一声,道:那倒不用,我们还是保持我们现在的人员配置,但是呢,我们要改变服务模式,我们毕竟是平民火锅,但是提供的却是VIP式的服务,那我们还是回归平民模式吧,但我们尽量保证服务质量。那老王的方法是什么呢?

其实也不是什么很牛逼的方法,现在来说很普通,安排个专门接待的人员,来客人时给

他一份菜单并安排一个桌位,告诉他有什么需要请按桌上的呼叫按钮(现在可以直接微信扫一扫点单付款,更方便),并记下他的相关信息。然后收到呼叫就派出空闲的服务员过去服务,服务完了就回来领取新的任务,当然分配任务可通过对讲机完成。这样就避免了大量人力资源的浪费,至少看起来现在服务员比以前“充实”多了,服务员要是觉得太辛苦,我们可以加薪来提高积极性嘛。

     就这样持续了几个月,老王火锅店盈利比之前翻了3翻,脸上的笑容也多了起来,对待员工也变得更加和蔼。关键是,老王在年终大会上做了个大胆的决定,去其他城市再开几家分店,并承诺全员加薪30%,大家欢呼雀跃!!!(都是我编的,tmd快编不下去了!)

       本故事纯属虚构,如有雷同,那就是抄的!

       此模式适用于服务时间短暂频繁的需求,对于复杂耗时,不建议使用,例如:你要开洗脚城按摩店那就行不通了!毕竟洗脚按摩需要人长时间服务的。

 Netty

3.1什么是Netty

“Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.”

由于NIO通信的复杂性,推动了Netty的产生,Netty是一个异步、事件驱动(asynchronous event-driven)的用以开发高性能、高可靠性协议服务器和客户端的网络应用框架。

换句话说,Netty是一个NIO服务器-客户端框架,它在JAVA NIO的基础上进行封装并提供出一套便于用户开发的API,使用它可以快速简单地开发网络应用程序,比如协议服务器和客户端。

3.2 netty的基本组件

  1. Bootstrap

一个Netty应用通常由一个Bootstrap开始,它的主要作用是配置整个Netty程序,串联起各个组件。Bootstrap有两种类型,ServerBootstrap和Bootstrap,分别用于Server端和Client端

  1. Channel

代表一个Socket连接,或者其他和I/O相关的组件

  1. EventLoop

为Channel处理I/O操作,一个EventLoop可以为多个Channel服务,可理解为一个线程

  1. EventLoopGroup

一个EventLoopGroup包含多个EventLoop,可以将其理解为一个线程池

  1. Handler

用来处理各种事件,比如连接、数据接收、异常、数据转换、编解码等。按照输入输出来分,分为ChannelInboundHandler和ChannelOutboundHandler。前者对从客户端发往服务器的报文进行处理,一般用来执行编码、读数据、业务处理等;后者处理服务器发往客户端的数据,如编码

  1. ChannelPipeline

ChannelHandler的容器。每个Channel会绑定一个ChannelPipeline,用于处理该Channel上产生的事件

  1. ChannelFuture

Netty中所有的I/O操作都是异步的,所以不能立刻得知消息的处理结果。因此,我们需要通过ChannelFuture注册一个监听,当操作执行成功或失败时进行一些处理

3.3 Netty线程模型

【J2SE】BIO,NIO,AIO,NettyjavaIO编程.

 3.4工作流程

【J2SE】BIO,NIO,AIO,NettyjavaIO编程.

        当一个连接到达时,Netty会产生一个Channel,然后从EventLoopGroup中选出一个EventLoop将该Channel注册在其上,标明感兴趣的I/O事件,如:OP_READ。EventLoop对Channel的监听即为上面讲到的I/O多路复用技术(selector)

        当某个Channel上有数据到来时,EventLoop会选出对应的Channel,读取数据,并且触发网络事件交由ChannelPipeline处理。ChannelPipeline中是一系列的Handler,数据在ChannelPipeline中流动,并经由一个个Handler处理,每个Handler处理完成后将数据交给下一个Handler处理

        Netty为我们提供了一些ChannelHandlerAdapter,我们可以通过使用它们简化Handler的编写,从而可以将注意力集中在具体的业务逻辑上。

3.5 Netty编解码

由于网络传输只能传输字节流,因此在发送数据前需要把message类型的数据转化为bytes这就是编码的过程。相对应的,在接收到数据后,我们需要把bytes解析成message。这就是解码过程

3.6  Netty心跳

    心跳就是Netty检查连接是否关闭的能力,它的作用就是不断的在制定时间内对channel进行监测,把那种长时间没有连接的channel进行关闭。

bootstrap.childHandler(new ChannelInitializer<Channel>() {
		
					@Override
					protected void initChannel(Channel ch) throws Exception {
					   ChannelPipeline pipeline = ch.pipeline();
					   //对连接作心跳检测
					   pipeline.addLast(new IdleStateHandler(5, 5, 10));
					   pipeline.addLast("decoder",new StringDecoder());
					   pipeline.addLast("encoder",new StringEncoder());
					   pipeline.addLast(new HHandler());
					}
				});
           

 HHandler extends SimpleChannelInboundHandler<String>  继承自这个类然后重写下面的trigger方法,实现心跳检查。

/**
     * 通过心跳检查来检测长时间不工作的 channel 并且对它进行关闭
     */
    @Override
	public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
		if(evt instanceof IdleStateEvent){
			IdleStateEvent event=(IdleStateEvent)evt;
			//如果是心跳检测事件
			if(event.state()==IdleState.ALL_IDLE){
				ChannelFuture writeAndFlush = ctx.writeAndFlush(" you will be close");
				writeAndFlush.addListener(new ChannelFutureListener() {
					public void operationComplete(ChannelFuture paramF) throws Exception {
						ctx.channel().close();
					}
				});
			}
		}else{
			super.userEventTriggered(ctx, evt);
		}
	}
           

服务端代码

package com.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * netty5服务端
 *
 */
public class Server {

	public static void main(String[] args) {
		//服务类
		ServerBootstrap bootstrap = new ServerBootstrap();
		
		//boss和worker
		EventLoopGroup boss = new NioEventLoopGroup();
		EventLoopGroup worker = new NioEventLoopGroup();
		
		try {
			//设置线程池
			bootstrap.group(boss, worker);
			
			//设置socket工厂、
			bootstrap.channel(NioServerSocketChannel.class);
			
			//设置管道工厂
			bootstrap.childHandler(new ChannelInitializer<Channel>() {

				@Override
				protected void initChannel(Channel ch) throws Exception {
					ch.pipeline().addLast(new StringDecoder());
					ch.pipeline().addLast(new StringEncoder());
					ch.pipeline().addLast(new ServerHandler());
				}
			});
			
			//netty3中对应设置如下
			//bootstrap.setOption("backlog", 1024);
			//bootstrap.setOption("tcpNoDelay", true);
			//bootstrap.setOption("keepAlive", true);
			//设置参数,TCP参数
			bootstrap.option(ChannelOption.SO_BACKLOG, 2048);//serverSocketchannel的设置,链接缓冲池的大小
			bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//socketchannel的设置,维持链接的活跃,清除死链接
			bootstrap.childOption(ChannelOption.TCP_NODELAY, true);//socketchannel的设置,关闭延迟发送
			
			//绑定端口
			ChannelFuture future = bootstrap.bind(8121);
			
			System.out.println("start");
			
			//等待服务端关闭
			future.channel().closeFuture().sync();
			
		} catch (Exception e) {
			e.printStackTrace();
		} finally{
			//释放资源
			boss.shutdownGracefully();
			worker.shutdownGracefully();
		}
	}
}
           
package com.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * 服务端消息处理
 *
 */
public class ServerHandler extends SimpleChannelInboundHandler<String> {

	@Override
	protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {

		System.out.println(msg);
		
		ctx.channel().writeAndFlush("hi");
		ctx.writeAndFlush("hi");
	}

	/**
	 * 新客户端接入
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("channelActive");
	}

	/**
	 * 客户端断开
	 */
	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("channelInactive");
	}

	/**
	 * 异常
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
	}
	
	
}
           

客户端代码

package com.client;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * netty5的客户端
 *
 */
public class Client {

	public static void main(String[] args) {
		//服务类
		Bootstrap bootstrap = new Bootstrap();
		
		//worker
		EventLoopGroup worker = new NioEventLoopGroup();
		
		try {
			//设置线程池
			bootstrap.group(worker);
			
			//设置socket工厂、
			bootstrap.channel(NioSocketChannel.class);
			
			//设置管道
			bootstrap.handler(new ChannelInitializer<Channel>() {

				@Override
				protected void initChannel(Channel ch) throws Exception {
					ch.pipeline().addLast(new StringDecoder());
					ch.pipeline().addLast(new StringEncoder());
					ch.pipeline().addLast(new ClientHandler());
				}
			});
			
			ChannelFuture connect = bootstrap.connect("127.0.0.1", 8121);
			
			BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
			while(true){
				System.out.println("请输入.....:");
				String msg = bufferedReader.readLine();
				connect.channel().writeAndFlush(msg);
			}
			
		} catch (Exception e) {
			 e.printStackTrace();
		} finally{
			worker.shutdownGracefully();
		}
	}
}