天天看點

Netty系列3-BIO、AIO、NIO

用戶端和服務端通信本質上就是服務端監聽端口,用戶端發起連接配接請求,通過三次握手連接配接,如果連接配接成功建立,雙方就可以通過套接字socket進行通信。

根據通信實作方式的不同又分為BIO、AIO、NIO三種。

1.BIO

BIO是同步阻塞模型。通常由一個Acceptor線程監聽用戶端的連接配接,接收到連接配接請求後為每個用戶端都建立一個新線程進行處理,最後将響應通過輸出流傳回給用戶端,線程銷毀。

BIO最大的缺點是并發通路量增加後,服務端的線程個數和用戶端并發通路數呈1:1的關系,随着線程數量快速膨脹,系統性能将急劇下降,當線程達到一定數量,系統當機。

為了改進這個問題,提出了對線程使用線程池進行管理,這種通常被稱為僞異步I/O模型,并沒有解決問題。當大量高并發的時候,尤其是大量長連接配接或者讀取資料較慢的時候,線程數量還是急劇增加。使用固定數量線程池,可以解決線程增加的問題,但會導緻大量使用者線程等待,系統有瓶頸,不友好。

服務端核心代:

server = new ServerSocket(8080);
            while(true){
                Socket socket= server.accept();
                //當有新的用戶端接入時,投入線程池
                executorService.execute(new BioServerHandler(socket));
            }
           
public class BioServerHandler implements Runnable{
    private Socket socket;
    public BioServerHandler(Socket socket) {
        this.socket = socket;
    }
    public void run() {
        try(
            BufferedReader in = new BufferedReader(
                new InputStreamReader(socket.getInputStream()));
            PrintWriter out = new PrintWriter(socket.getOutputStream(),
                    true)){
            String message;
            String result;
            while((message = in.readLine())!=null){
                result = response(message);
                out.println(result);
            }
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            if(socket != null){
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                socket = null;
            }
        }
    }

}
           

用戶端核心代碼:

Socket socket =  new Socket(DEFAULT_SERVER_IP,DEFAULT_PORT);
    pw = new PrintWriter(socket.getOutputStream());
  	 pw.println(“hello”);
      pw.flush();
           

2.AIO

AIO是異步非阻塞通信模型,是java在1.7後提供的。本質上就是通過回調函數,直接上代碼。

服務端核心代碼:

public class AioServerHandler implements Runnable {

    /*異步通信通道*/
    public AsynchronousServerSocketChannel channel;

    public AioServerHandler(int port) {
        try {
            //建立服務端通道
            channel = AsynchronousServerSocketChannel.open();
            //綁定端口
            channel.bind(new InetSocketAddress(8080));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void run() {
    	//accept函數有兩個參數,第一個是傳遞給回調函數的附件,第二個就是回調函數,回調函數需要實作 				      	     //CompletionHandler<AsynchronousSocketChannel, ? super AioServerHandler>
        //CompletionHandler内部有兩個方法,completed函數在連接配接成功後調用,failed在失敗時候調用,兩個方法的參數			   //都有兩個,第一個代表IO操作完成後傳回的結果,第二個參數就是我們的附件
        channel.accept(this,new AioAcceptHandler());
        try {
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
           
public class AioAcceptHandler
        implements CompletionHandler<AsynchronousSocketChannel,
        AioServerHandler> {
    @Override
    //第一個參數是代碼連接配接成功後傳回的結果,第二個就是我們傳遞的附件
    public void completed(AsynchronousSocketChannel channel,
                          AioServerHandler serverHandler) {
        //重新注冊監聽,讓别的用戶端也可以連接配接
        serverHandler.channel.accept(serverHandler,this);
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        //這個是讀事件發生的時候,又有另外一個回調,讀事件的回調函數不貼了
        channel.read(readBuffer,readBuffer,
                new AioReadHandler(channel));

    }

    @Override
    //第一個參數是連接配接失敗傳回的結果,這裡是個異常
    //第二個參數是我們傳遞的附件
    public void failed(Throwable exc, AioServerHandler serverHandler) {
        exc.printStackTrace();
        serverHandler.latch.countDown();
    }
           

用戶端核心代碼:

public class AioClientHandler
        implements CompletionHandler<Void,AioClientHandler>,Runnable {

    private AsynchronousSocketChannel clientChannel;
    private String host;
    private int port;

    public AioClientHandler(String host, int port) {
        this.host = host;
        this.port = port;
        try {
            //建立用戶端通道
            clientChannel = AsynchronousSocketChannel.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }



    @Override
    public void run() {
      //這裡的連接配接成功後回調函數就是自己
        clientChannel.connect(new InetSocketAddress(host,port),
                        null,this);
        try {
            clientChannel.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //連接配接成功回調方法
    @Override
    public void completed(Void result, AioClientHandler attachment) {
    }

    //連接配接失敗回調方法
    @Override
    public void failed(Throwable exc, AioClientHandler attachment) {
   
    }

    }

           

3.NIO

3.1 Reactor模式

說起NIO不得不先說下Reactor模式。

(1)Reactor模式定義

Reactor模式是事件驅動模型,有一個或多個并發輸入源,一個Service Handler和多個Request Handlers,Service Handler同步的将輸入的請求(Event)以多路複用的方式,并且根據Event類型分發給相應的Request Handler。

(2)Reactor模式元素

  • EventHandler:事件處理器
  • Handle:作業系統中的句柄,是對資源在作業系統層面上的一種抽象,它可以是打開的檔案、一個連接配接等。在網絡程式設計中一般指Socket Handle,即一個網絡連接配接(在Java NIO中的Channel)。這個Channel注冊到Synchronous Event Demultiplexer中,以監聽Handle中發生的事件,可以是CONNECT、READ、WRITE、CLOSE等事件
  • InitiationDispatcher:事件處理排程器,用來管理EventHandler,将接收到網絡請求分發給相應的處理器去異步處理
  • Demultiplexer:阻塞等待一系列的Handle中的事件到來,在Java NIO中用Selector來封裝。
    Netty系列3-BIO、AIO、NIO

(3)Reactor模式請求處理流程

  • 初始化InitiationDispatcher,并初始化一個Map,用來放Handle和EventHandler的映射。
  • 注冊EventHandler到InitiationDispatcher中,把EventHandler和對應Handle放入到map中
  • 調用InitiationDispatcher的handle_events()方法以啟動Event Loop。在Event Loop中,調用Synchronous Event Demultiplexer的select()方法阻塞等待Event發生。
  • 當Event發生後,select()方法傳回,InitiationDispatcher根據傳回的Handle找到對應的EventHandler,并回調該EventHandler的handle_events()方法,在handle_events()方法中還可以向InitiationDispatcher中注冊新的Eventhandler

3.2 Reactor模式的java實作

(1)單線程Reactor模式

這裡的單線程指的是服務端的Reactor是單線程

  • 伺服器端的Reactor使用Selector來實作多路複用,并且啟動事件循環。服務端會注冊一個Acceptor事件處理器到Reactor中,這樣Reactor會監聽用戶端向伺服器端發起的連接配接請求事件(ACCEPT事件)。
  • 用戶端向伺服器發起一個連接配接請求,Reactor監聽到了該ACCEPT事件并将該事件派發給對應的Acceptor處理器。Acceptor處理器通過accept()方法得到連接配接(SocketChannel),然後将該連接配接所關注的READ事件以及對應的事件處理器注冊到Reactor中,這樣Reactor就會監聽該連接配接的READ事件。
  • 當Reactor監聽到有讀或者寫事件發生時,将相關的事件派發給對應的處理器進行處理。
  • 處理完所有就緒的感興趣的I/O事件後,Reactor線程會再次執行select()阻塞等待新的事件就緒并将其分派給對應處理器進行處理。
    Netty系列3-BIO、AIO、NIO

    Reactor的單線程主要是針對于I/O操作而言,也就是accept()、read()、write()以及connect()操作都在一個線程上完成。

    但這裡單線程Reactor模式中,不僅I/O操作在該Reactor線程上,連非I/O的業務操作也在該線程上,這會大大延遲I/O請求的響應。

    是以出現了第二種模式單線程Reactor,工作者線程池。

(2)單線程Reactor,工作者線程池

與單線程Reactor模式不同的是,添加了一個工作者線程池,并将非I/O操作從Reactor線程中移出轉交給工作者線程池。這樣可以提高Reactor線程的I/O響應,不至于因為一些耗時的業務邏輯而延遲對後面I/O請求的處理。

線程池的優勢:

  • 重用現線程,減少線程建立和銷毀過程的開銷。
  • 當請求到達時,不會由于等待建立線程,提高了響應性。
  • 合理調整線程池的大小,可以建立足夠多的線程使處理器保持忙碌狀态,還可防止過多線程耗盡記憶體。

    缺點:

    I/O操作還是一個Reactor來完成,對于高負載、大并發或大資料量的應用場景有性能瓶頸:

  • 一個NIO線程同時處理成百上千的鍊路,性能上無法支撐
  • 當NIO線程負載過重時處理速度将變慢,這會導緻大量用戶端連接配接逾時,逾時之後往往會進行重發,這更加重了NIO線程的負載,最終會導緻大量消息積壓和處理逾時
    Netty系列3-BIO、AIO、NIO
    這就出現了第三種模式:多Reactor線程模式

(3)多Reactor線程模式

多Reactor線程模式中有一個mainReactor,多個subReactor。Reactor線程池中的每一Reactor線程都會有自己的Selector、線程和分發的事件循環邏輯。mainReactor線程主要負責接收用戶端的連接配接請求,然後将接收到的SocketChannel傳遞給subReactor,由subReactor來完成和用戶端的通信。

流程:

  • 注冊一個Acceptor事件處理器到mainReactor中,啟動mainReactor的事件循環。
  • 用戶端向伺服器端發起一個連接配接請求,mainReactor監聽到了該ACCEPT事件并将該事件派發給Acceptor處理器
  • Acceptor處理器通過accept()方法得到與這個用戶端對應的連接配接(SocketChannel),然後将這個SocketChannel傳遞給subReactor線程池
  • subReactor線程池配置設定一個subReactor線程給這個SocketChannel,将SocketChannel關注的READ事件或者以及對應的事件處理器注冊到subReactor線程
  • 當有I/O事件就緒時,相關的subReactor就将事件派發給響應的處理器。這裡subReactor線程隻負責完成I/O的read()操作,在讀取到資料後将業務邏輯的處理放入到線程池中完成,若完成業務邏輯後需要傳回資料給用戶端,則相關的I/O的write操作還是會被送出回subReactor線程

    多Reactor線程模式将接受用戶端的連接配接請求和與該用戶端的通信分在了兩個Reactor線程來完成。mainReactor完成接收用戶端連接配接請求的操作,将建立好的連接配接轉交給subReactor線程,subReactor線程完成與用戶端的通信。這裡所有的I/O操作(accept()、read()、write()、connect())還是在Reactor線程(mainReactor線程 或 subReactor線程)中完成的。Thread Pool(線程池)僅用來處理非I/O操作的邏輯。

    優點:

    多Reactor線程模式在大量并發請求的情況下,将大量連接配接分發給多個subReactor線程,在多核的作業系統中這能大大提升應用的負載和吞吐量。

    同時不會因為read()資料耗時而導緻後面的用戶端連接配接請求得不到即時處理。

    Netty服務端使用了多Reactor線程模式

    Netty系列3-BIO、AIO、NIO
    上述分析完後你會發現reactor模式與觀察者模式有點像。不過,觀察者模式與單個事件源關聯,而反應器模式則與多個事件源關聯 。當一個主體發生改變時,所有依屬體都得到通知。

3.3 Selector、Channels、SelectionKey

(1)Selector

Selector也就是NIO中的選擇器,用做事件訂閱和Channel管理。

應用程式向Selector注冊需要它關注的Channel,以及具每一個Channel感興趣的IO事件。

(2)Channels

通道,應用程式和作業系統通信的管道。應用程式可以通過通道讀寫資料。所有在Selector注冊的通道必須繼承SelectableChannel類

  • ServerSocketChannel:伺服器程式的監聽通道,隻能通過這個通道向作業系統注冊支援多路複用IO的端口監聽。可以支援UDP和TCP
  • ScoketChannel:TCPSocket套接字的監聽通道,一個Socket套接字對應了一個用戶端
  • DatagramChannel:UDP資料封包的監聽通道。

    (3)SelectionKey

    SelectionKey是NIO中的操作類型。一共四種操作類型:OP_READ(讀)、OP_WRITE(寫)、OP_CONNECT(請求連接配接)、OP_ACCEPT(接受連接配接)。

  • ServerSocketChannel:可以注冊OP_ACCEPT
  • 伺服器SocketChannel:OP_READ、OP_WRITE
  • 用戶端SocketChannel:OP_READ、OP_WRITE、OP_CONNECT

每個操作類型就緒條件:

  • OP_READ: 當作業系統讀緩沖區有資料可讀時就緒。
  • OP_WRITE:當作業系統寫緩沖區有空閑空間時就緒。一般情況下寫緩沖區都有空閑空間,小塊資料無需注冊,直接寫入即可,否則該條件不斷就緒浪費CPU。但如果是寫密集型的任務,有可能寫滿緩存,這時需要注冊,并且寫完後取消注冊。
  • OP_CONNECT: 請求連接配接成功後就緒。
  • OP_ACCEPT 當收到用戶端連接配接請求時就緒。