天天看點

Netty 學習(三)EventLoop一、前言二、背景知識三、Netty 中的 EventLoop四、總結附錄

目錄

  • 一、前言
  • 二、背景知識
    • 1.Java NIO
      • Selector
      • 通道事件
      • 通道與 Selector 綁定
      • NIO 服務端的模闆
    • 2. EventLoop
  • 三、Netty 中的 EventLoop
    • 1. 主從多線程模型
    • 2. 事件處理機制
    • 3. EventLoop 最佳實踐
  • 四、總結
  • 附錄
    • NIO 示例
      • NIO 服務端示例代碼
      • NIO 用戶端示例代碼

一、前言

要了解 Netty 的 Reactor 線程模型,需要先了解 NIO 和 EventLoop 等一些基礎知識,Netty 是對 NIO 的一些基礎 API 進行了封裝,而 EventLoop 的設計思想被運用在 Redis、Nginx、Node.js 等衆多高性能架構中。了解了這兩個基礎知識後,對 Netty 的 Reactor 模式就會有比較深刻的認識。

二、背景知識

1.Java NIO

Java NIO 之 Selector(選擇器)

要點:

Selector

一般稱為

選擇器

,當然你也可以翻譯為

多路複用器

。它是Java NIO核心元件中的一個,用于檢查一個或多個NIO Channel(通道)的狀态是否處于可讀、可寫。如此可以實作單線程管理多個channels,也就是可以管理多個網絡連結。

Netty 學習(三)EventLoop一、前言二、背景知識三、Netty 中的 EventLoop四、總結附錄

通道事件

  • NIO Channel(通道)在通訊過程中會通過狀态(事件)表明是否已經就緒,比如某個Channel成功連接配接到另一個伺服器稱為“ 連接配接就緒 ”,NIO 的事件用 SecletionKey的四個常量來表示:
Key 事件 Value
連接配接就緒 SelectionKey.OP_CONNECT 8
接收就緒 SelectionKey.OP_ACCEPT 16
讀就緒 SelectionKey.OP_READ 1
寫就緒 SelectionKey.OP_WRITE 4

通道與 Selector 綁定

一個 SelectionKey 對象表示了一個特定的通道對象和一個特定的選擇器對象之間的注冊關系。同時包括了監控的 IO 事件。

  • key.channel(); 傳回該 SelectionKey 對應的 channel。
  • key.interestOps(); 傳回代表需要 Selector 監控的 IO 操作的 bit mask。
  • key.interestOps(int ops); 設定新的監控事件 bit mask。
  • key.selector(); 傳回該 SelectionKey 對應的 Selector。
  • key.attachment(); 傳回 SelectionKey 的attachment,attachment 可以在注冊 channel 的時候指定。
  • key.readyOps(); 傳回一個 bit mask,代表在相應 channel 上可以進行的 IO 操作。

NIO 服務端的模闆

ServerSocketChannel ssc = ServerSocketChannel.open();
	ssc.socket().bind(new InetSocketAddress("localhost", 8080));
	ssc.configureBlocking(false);
	
	Selector selector = Selector.open();
	ssc.register(selector, SelectionKey.OP_ACCEPT);
	
	// 參考 Netty 源碼,采用反射方式将 Selector 的 selectedKeys 字段與本地變量關聯,這樣變量會随着 Selector 的 select 方法的執行發生變化。
    Set<SelectionKey> selectedKeys = new HashSet<>();
    Field selectedKeysField = SelectorImpl.class.getDeclaredField("selectedKeys");
    selectedKeysField.setAccessible(true);
    selectedKeysField.set(selector, selectedKeys);

	
	while(true) {
	    int readyNum = selector.select();
	    if (readyNum == 0) {
	        continue;
	    }
	
	    Iterator<SelectionKey> it = selectedKeys.iterator();
	
	    while(it.hasNext()) {
	        SelectionKey key = it.next();
	
	        if(key.isAcceptable()) {
	            // 接受連接配接
	        } else if (key.isReadable()) {
	            // 通道可讀
	        } else if (key.isWritable()) {
	            // 通道可寫
	        }
	
	        it.remove();
	    }
	}
           

2. EventLoop

什麼是 Event Loop?

要點:

Netty 學習(三)EventLoop一、前言二、背景知識三、Netty 中的 EventLoop四、總結附錄
Netty 學習(三)EventLoop一、前言二、背景知識三、Netty 中的 EventLoop四、總結附錄

主線程就讓 Event Loop 線程去處理耗時較長的任務,然後接着往後運作。等到耗時任務完成操作,Event Loop 線程再把結果傳回主線程。主線程就調用事先設定的回調函數,完成整個任務。

三、Netty 中的 EventLoop

1. 主從多線程模型

Netty 學習(三)EventLoop一、前言二、背景知識三、Netty 中的 EventLoop四、總結附錄
  1. 主從多線程模型由多個 Reactor 線程組成,每個 Reactor 線程

    都有獨立的 Selector 對象

  2. MainReactor 僅負責處理用戶端連接配接的 Accept 事件,連接配接建立成功後将新建立的連接配接對象注冊至 SubReactor。
  3. SubReactor 配置設定線程池中的 I/O 線程與連接配接綁定,它将負責連接配接生命周期内所有的 I/O 事件。

2. 事件處理機制

Netty 學習(三)EventLoop一、前言二、背景知識三、Netty 中的 EventLoop四、總結附錄

NioEventLoop 的事件處理機制采用的是無鎖串行化的設計思路。

  1. BossEventLoopGroup 負責監聽用戶端的 Accept 事件,當事件觸發時,将事件注冊至 WorkerEventLoopGroup 中的一個 NioEventLoop 上。
  2. WorkerEventLoopGroup 負責處理用戶端的 Read / Write 事件。資料準備完成時,會被傳遞到 ChannelPipeline 的第一個 ChannelHandler 中。資料處理完成後,将加工完成的資料再傳遞給下一個 ChannelHandler,整個過程是串行化執行,不會發生線程上下文切換的問題。

3. EventLoop 最佳實踐

  1. 網絡連接配接建立過程中三次握手、安全認證的過程會消耗不少時間。這裡建議采用 Boss 和 Worker 兩個 EventLoopGroup,有助于分擔 Reactor 線程的壓力。
  2. 如果業務邏輯執行時間較短,建議直接在 ChannelHandler 中執行。耗時較長的 ChannelHandler 可以考慮維護一個業務線程池,将編解碼後的資料封裝成 Task 進行異步處理,避免 ChannelHandler 阻塞而造成 EventLoop 不可用。
  3. 在設計業務架構的時候,需要明确業務分層和 Netty 分層之間的界限。不要一味地将業務邏輯都添加到 ChannelHandler 中。

四、總結

Netty Reactor 線程模型的核心處理引擎是 EventLoop, Netty EventLoop 的功能用處可以簡單的歸納總結:

  • MainReactor 線程: 處理用戶端請求接入。
  • SubReactor 線程: 資料讀取、I/O事件分發與執行。

附錄

NIO 示例

NIO 服務端示例代碼

class NioServer {
	    public static void main(String[] args) {
	        ServerSocketChannel ssc = ServerSocketChannel.open()
	        ssc.socket().bind(new InetSocketAddress("127.0.0.1", 8000))
	        ssc.configureBlocking(false)
	
	        Selector selector = Selector.open()
	
	        ssc.register(selector, SelectionKey.OP_ACCEPT)
	
	        def readBuf = ByteBuffer.allocate(1024)
	        def writeBuf = ByteBuffer.allocate(128)
	
	        writeBuf.put("received".getBytes())
	        writeBuf.flip()
	
	        for(;;){
	            int nReady = selector.select()
	            if(nReady == 0){
	                Thread.sleep(1000)
	                continue
	            }
	            def keys = selector.selectedKeys()
	            def it = keys.iterator()
	
	            while(it.hasNext()){
	                def key = it.next()
	                it.remove()
	
	                if(key.isAcceptable()){
	                    def channel = ssc.accept()
	                    channel.configureBlocking(false)
	                    channel.register(selector, SelectionKey.OP_READ)
	                }else if(key.isReadable()){
	                    def chanel = key.channel() as SocketChannel
	                    readBuf.clear()
	                    chanel.read(readBuf)
	                    readBuf.flip()
	                    byte[] a = new byte[readBuf.limit()]
	                    readBuf.get(a)
	                    println("received ${ new String(a)}")
	                    key.interestOps(SelectionKey.OP_WRITE)
	                }else if(key.isWritable()){
	                    writeBuf.rewind()
	                    def channel = key.channel() as SocketChannel
	                    channel.write(writeBuf)
	                    key.interestOps(SelectionKey.OP_READ)
	                }
	            }
	        }
	    }
	}
           

NIO 用戶端示例代碼

class NioClient {
	    static void main(String[] args) {
	        def channel = SocketChannel.open()
	        channel.connect(new InetSocketAddress("127.0.0.1", 8000))
	        def writeBuf = ByteBuffer.allocate(32)
	        def readBuf = ByteBuffer.allocate(32)
	
	        writeBuf.put("hello".getBytes())
	        writeBuf.flip()
	
	        while (true) {
	            writeBuf.rewind()
	            channel.write(writeBuf)
	
	            readBuf.clear()
	            channel.read(readBuf)
	            readBuf.flip()
	            byte[] a = new byte[readBuf.limit()]
	            readBuf.get(a)
	            println("received: ${new String(a)}")
	        }
	    }
	}