天天看點

閉關修煉(十三)Netty入門阻塞IO與非阻塞IO的差別NIO用戶端與服務端Netty快速入門Netty案例實作

文章目錄

  • 阻塞IO與非阻塞IO的差別
    • 同步、異步與阻塞、非阻塞
    • 阻塞與非阻塞
      • 什麼是同步阻塞IO
      • 什麼是僞異步阻塞IO
      • 什麼是同步非阻塞IO
        • NIO伺服器的缺點
      • 什麼是異步非阻塞IO
  • NIO用戶端與服務端
    • 選擇器key
    • NIO服務端
    • NIO用戶端
  • Netty快速入門
    • 什麼是netty?
    • netty的應用場景
    • 為什麼會有netty
  • Netty案例實作
    • 導入jar
    • Netty服務端
    • Netty用戶端
    • channelClosed
    • channelDisconnected

阻塞IO與非阻塞IO的差別

同步、異步與阻塞、非阻塞

NIO有兩個重要的組合:通道和緩沖區,通道是傳輸資料,緩沖區來存放資料。

IO流又稱為BIO,Blocking IO,同步阻塞IO,

NIO(1.7) 同步、非阻塞IO

AIO(1.7後)異步、非阻塞IO

阻塞與非阻塞

什麼是同步阻塞IO

原本的IO網絡程式設計,伺服器端沒有收到用戶端任何資料的時候就一直等待。serverSocket.accept()

這種就成為伺服器的同步阻塞IO

什麼是僞異步阻塞IO

如何解決阻塞問題?使用多線程(線程池),這種稱為僞異步阻塞IO,請求來了交給多線程去處理,僞異步阻塞沒有真正解決阻塞IO的核心問題,在接收用戶端資料仍然是在阻塞狀态,隻不過是接收到資料後放到線程去處理。

多線程–僞異步

什麼是同步非阻塞IO

NIO出現同步非阻塞IO,NIO的真正用途在網絡通信。

NIO中有一個東西叫選擇器,用戶端不會直接與伺服器端建立通道,而是和選擇器建立通道,用戶端可以注冊多條通道到選擇器,任何資料在選擇器中已經準備好了,選擇器會通知給伺服器端,服務端采用循環去監聽選擇器中的事件KEY(輪詢監聽),這種就稱為同步非阻塞IO

選擇器:管理通道,在伺服器建立選擇器(選擇器隻有一個,不可能在用戶端建立)。

NIO伺服器的缺點

需要輪詢監聽選擇器,如果選擇器為空輪詢時,會一直循環最終導緻CPU100%

什麼是異步非阻塞IO

AIO,不需要啟動額外的IO線程,被動回調,

NIO用戶端與服務端

選擇器key

OP_CONNECT:可連接配接

OP_ACCEPT:可接受連接配接,伺服器通道注冊到選擇器使用這個

OP_READ :可讀

OP_WRITE:可寫

NIO服務端

寫起來有些複雜

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

public class NIOServer {
    static int PORT = 8080;

    public static void main(String[] args) throws IOException {
        System.out.println("伺服器端開啟");
        // 建立伺服器端通道
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 設定成異步
        serverChannel.configureBlocking(false);
        // 綁定連接配接
        serverChannel.bind(new InetSocketAddress(PORT));
        // 擷取選擇器
        Selector selector = Selector.open();
        // 将通道注冊到選擇器,并且監聽已接收到的資料
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        // 輪詢擷取已準備就緒的事件
        while (selector.select() > 0) {
            // 擷取目前選擇器有注冊已監聽到的事件
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                //擷取準備就緒事件
                SelectionKey skey = it.next();
                // 判斷事件準備就緒
                if (skey.isAcceptable()) {
                    // 若可接受連接配接,擷取客戶連接配接
                    SocketChannel socketChannel = serverChannel.accept();
                    // 設定為異步模式
                    socketChannel.configureBlocking(false);
                    // 将該通道注冊到伺服器上
                    socketChannel.register(selector, SelectionKey.OP_READ);

                } else if (skey.isReadable()) {
                    // 若就緒,擷取就緒狀态的通道
                    SocketChannel socketChannel = (SocketChannel) skey.channel();
                    // 讀取資料
                    ByteBuffer buff = ByteBuffer.allocate(NIOClient.BUFFER_SIZE);
                    int len = 0;
                    while ((len = socketChannel.read(buff)) > 0) {
                        buff.flip();
                        System.out.println(new String(buff.array(), 0, len));
                        buff.clear();
                    }
                }
                it.remove();
            }
        }
    }
}
           

NIO用戶端

public class NIOClient {
    static String REMOTE_ADDRESS = "127.0.0.1";
    static int BUFFER_SIZE = 1024;

    public static void main(String[] args) throws IOException {
        System.out.println("用戶端啟動");
        // 建立Socket通道
        SocketChannel sChannel = SocketChannel.open(new InetSocketAddress(REMOTE_ADDRESS, NIOServer.PORT));
        // 切換為異步非阻塞 (1.7
        sChannel.configureBlocking(false);
        // 指定緩沖區
        ByteBuffer buff = ByteBuffer.allocate(BUFFER_SIZE);
        // 存放數
        buff.put(new Date().toString().getBytes());
        // 切換讀取模式
        buff.flip();
        // 通過通道傳輸
        sChannel.write(buff);
        // 收尾
        buff.clear();
        sChannel.close();

    }
}
           

Netty快速入門

什麼是netty?

netty是一個基于java NIO類庫的異步通訊架構,他的架構特點是:異步非阻塞,基于事件驅動,高性能,高可靠,和高可定制性。

netty的應用場景

分布式架構中dubbo、zookeeper、RocketMQ底層RPC通訊使用的就是netty

遊戲開發中,底層使用netty進行通訊

為什麼會有netty

解決傳統IO非阻塞問題

對NIO進行封裝,簡化代碼

采用事件驅動

容錯機制,可連接配接重試

Netty案例實作

導入jar

<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty</artifactId>
            <version>3.3.0.Final</version>
        </dependency>
           

Netty服務端

AIO異步非阻塞特性,也就是說接收用戶端資料時,建立單獨的線程進行處理,互不影響

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


class ServerHandler extends SimpleChannelHandler {
    // 通道關閉時觸發
    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelClosed(ctx, e);
        System.out.println("channelClosed");

    }

    // 必須建立連接配接,關閉通道時進行觸發
    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelDisconnected(ctx, e);
        System.out.println("channelDisconnected");

    }

    // 接收端出現異常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        super.exceptionCaught(ctx, e);
        System.out.println("exceptionCaught");
    }

    // 接收用戶端資料,建立單獨的線程進行處理,互不影響
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        super.messageReceived(ctx, e);
        System.out.println("messageReceived");
        System.out.println("伺服器端擷取用戶端發來參數:" + e.getMessage());
        // 伺服器回複資訊
        ctx.getChannel().write("你好~");
    }
}

public class NettyServer {
    static int PORT = 8080;
    public static void main(String[] args) {
        // 1.建立服務對象
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // 2.建立2個線程池 一個監聽端口号,一個nio監聽
        ExecutorService boos = Executors.newCachedThreadPool();
        ExecutorService wook = Executors.newCachedThreadPool();
        // 3.将線程池放入到工程
        serverBootstrap.setFactory(new NioServerSocketChannelFactory(boos, wook));
        // 4.設定管道工程
        serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            // 設定管道
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                // 傳輸資料的時候直接為string類型
                pipeline.addLast("decoder", new StringDecoder());
                pipeline.addLast("encoder", new StringEncoder());
                // 設定事件監聽類
                pipeline.addLast("serverHandler", new ServerHandler());
                return pipeline;
            }
        });
        // 5.綁定端口号
        serverBootstrap.bind(new InetSocketAddress(PORT));
        System.out.println("NettyServer啟動");
        while (true){
            Thread.sleep(1000);
            System.out.println("每隔1秒列印");
        }
    }
}

           

Netty用戶端

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ClientHandler extends SimpleChannelHandler{
    // 通道關閉時觸發
    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelClosed(ctx, e);
        System.out.println("channelClosed");

    }

    // 必須建立連接配接了,關閉通道時進行觸發
    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelDisconnected(ctx, e);
        System.out.println("channelDisconnected");

    }

    // 接收端出現異常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        super.exceptionCaught(ctx, e);
        System.out.println("exceptionCaught");
    }

    // 接收用戶端資料,建立單獨的線程進行處理,互不影響
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        super.messageReceived(ctx, e);
        System.out.println("messageReceived");
        System.out.println("伺服器端向用戶端回複内容:" + e.getMessage());
    }
}
public class NettyClient {
    static String REMOTE_ADDRESS = "127.0.0.1";

    public static void main(String[] args) {
        // 1.建立用戶端對象
        ClientBootstrap clientBootstrap = new ClientBootstrap();
        // 2.建立2個線程池 一個監聽端口号,一個nio監聽
        ExecutorService boos = Executors.newCachedThreadPool();
        ExecutorService wook = Executors.newCachedThreadPool();
        // 3.将線程池放入到工程
        clientBootstrap.setFactory(new NioClientSocketChannelFactory(boos, wook));
        // 4.設定管道工程
        clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            // 設定管道
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                // 傳輸資料的時候直接為string類型
                pipeline.addLast("decoder", new StringDecoder());
                pipeline.addLast("encoder", new StringEncoder());
                // 設定事件監聽類
                pipeline.addLast("clientHandler", new ClientHandler());
                return pipeline;
            }
        });
        // 5.連接配接伺服器
        ChannelFuture connect = clientBootstrap.connect(new InetSocketAddress(REMOTE_ADDRESS,
                NettyServer.PORT));
        // 擷取通道
        Channel channel = connect.getChannel();
        System.out.println("用戶端start");
        Scanner scanner = new Scanner(System.in);
        while (true){
            System.out.println("請輸入内容");
            channel.write(scanner.next());
        }
    }


}

           

channelClosed

隻要是關閉連接配接,就調用channelClosed方法

channelDisconnected

調用這個channelDisconnected方法前提是已經建立了連接配接,然後再關閉連接配接,如果還沒建立連接配接關閉是不會調用這個方法的