天天看點

Netty 系列四(ChannelHandler 和 ChannelPipeline).

一、概念

    先來整體的介紹一下這篇博文要介紹的幾個概念(Channel、ChannelHandler、ChannelPipeline、ChannelHandlerContext、ChannelPromise):

Channel:Netty 中傳入或傳出資料的載體;

ChannelHandler:Netty 中處理入站和出站資料的應用程式邏輯的容器;

ChannelPipeline:ChannelHandler鍊 的容器;

ChannelHandlerContext:代表了 ChannelHandler 和 ChannelPipeline 之間的關聯,每當有ChannelHandler 添加到 ChannelPipeline 中時,都會建立 ChannelHandlerContext;

ChannelPromise:ChannelPromise是ChannelFuture的一個子類,其定義了一些可寫的方法,如setSuccess()和setFailure(), 進而使ChannelFuture不可變。

    我們來舉一個例子描述這些概念之間的邏輯關系:服務端接收到用戶端的連接配接請求,建立一個Channel同用戶端進行綁定,新建立的 Channel 會都将會被配置設定一個新的ChannelPipeline(這項關聯是永久性的,Channel 既不會附加另外一個ChannelPipeline,也不能分離目前的)。而 ChannelPipeline 作為 ChannelHandler鍊 的容器,當Channel 生命周期中狀态發生改變時,将會生成對應的事件,這些事件将會被 ChannelPipeline 中 ChannelHandler 所響應,響應方法的參數一般都有一個 ChannelHandlerContext ,一個 ChannelHandler 對應一個 ChannelHandlerContext,ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之間的關聯,這項關聯也是永遠不會改變的。

Netty 系列四(ChannelHandler 和 ChannelPipeline).

二、ChannelHandler

    Netty提供了大量預定義的可以開箱即用的ChannelHandler實作,包括用于各種協定的ChannelHandler。是以,我們在自定義ChannelHandler實作用于處理我們的程式邏輯時,隻需要繼承Netty 的一些預設實作即可,主要有兩種:

1、繼承 ChannelHandlerAdapter (在4.0 中 處理入站事件繼承 ChannelInboundHandlerAdapter,處理出站事件繼承 ChannelOutboundHandlerAdapter ;在5.0 推薦直接繼承 ChannelHandlerAdapter)

2、繼承 SimpleChannelInboundHandler

    這兩種方式有什麼差別呢?  當我們處理 入站資料 和 出站資料時,都需要確定沒有任何的資源洩露。在入站方向,繼承 SimpleChannelInboundHandler 的實作類會在消息被處理之後自動處理消息,而繼承 ChannelHandlerAdapter 的實作類需要手動的釋放消息(ReferenceCountUtil.release(msg));在出站方向,不管繼承的是哪一種的實作類,當你處理了 write() 操作并丢棄了一個消息,那麼你就應該釋放它,不僅如此,還要通知 ChannelPromise。否則可能會出現 ChannelFutureListener 收不到某個消息已經被處理了的通知的情況。

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ReferenceCountUtil.release(msg);
    //ChannelPromise 是ChannelFuture的一個子類,設定成 true 通知 ChannelFutureListener 消息已經被處理了
    //當一個 Promise 被完成之後,其對應的 Future 的值便不能再進行任何修改了
    promise.setSuccess();
}      

    tips:總之,如果一個消息被消費或者丢棄了, 并且沒有傳遞給 ChannelPipeline 中的下一個ChannelOutboundHandler, 那麼使用者就有責任調用 ReferenceCountUtil.release()。

    來看看 ChannelHandler 的 API:

isSharable :如果其對應的實作被标注為 Sharable, 那麼這個方法将傳回 true, 表示它可以被添加到多個 ChannelPipeline中

-- ChannelHandler 生命周期方法 --

handlerAdded :當把 ChannelHandler 添加到 ChannelPipeline 中時被調用

handlerRemoved :當從 ChannelPipeline 中移除 ChannelHandler 時被調用

exceptionCaught : 當處理過程中在 ChannelPipeline 中有錯誤産生時被調用

-- 處理入站資料以及各種狀态變化 --

channelRegistered : 當 Channel 已經注冊到它的 EventLoop 并且能夠處理 I/O 時被調用

channelUnregistered : 當 Channel 從它的 EventLoop 登出并且無法處理任何 I/O 時被調用

channelActive : 當 Channel 處于活動狀态時被調用;Channel 已經連接配接/綁定并且已經就緒

channelInactive : 當 Channel 離開活動狀态并且不再連接配接它的遠端節點時被調用

channelReadComplete : 當Channel上的一個讀操作完成時被調用

channelRead : 當從 Channel 讀取資料時被調用

ChannelWritabilityChanged :當 Channel 的可寫狀态發生改變時被調用。

userEventTriggered : 當 ChannelnboundHandler.fireUserEventTriggered()方法被調用時被調用,因為一個 POJO 被傳經了 ChannelPipeline

-- 處理出站資料并且允許攔截所有的操作 --

bind : 當請求将 Channel 綁定到本地位址時被調用

connect : 當請求将 Channel 連接配接到遠端節點時被調用

disconnect : 當請求将 Channel 從遠端節點斷開時被調用

close : 當請求關閉 Channel 時被調用

deregister(5.0中被廢棄) : 當請求将 Channel 從它的 EventLoop 登出時被調用

read : 當請求從 Channel 讀取更多的資料時被調用

flush : 當請求通過 Channel 将入隊資料沖刷到遠端節點時被調用

write :當請求通過 Channel 将資料寫到遠端節點時被調用

三、ChannelPipeline

    ChannelPipeline 是一個攔截流經 Channel 的入站和出站事件的ChannelHandler 執行個體鍊,它和 ChannelHandler 之間的互動組成了應用程式資料和事件處理邏輯的核心,而它們之間的關聯互動就是通過 ChannelHandlerContext。

    如果一個入站事件被觸發,它将被從 ChannelPipeline 的頭部開始一直被傳播到 Channel Pipeline 的尾端。如圖,Netty 總是将 ChannelPipeline 的入站口作為頭部,而将出站口作為尾端,如圖,第一個被入站事件看到的 ChannelHandler 将是1,而第一個被出站事件看到的是 ChannelHandler 将是 5。稍微總結下這句拗口的話,入站事件順序執行(1—>2—>3—>4—>5)、出站事件逆序執行(5—>4—>3—>2—>1)。

Netty 系列四(ChannelHandler 和 ChannelPipeline).

    既然 ChannelPipeline 是 ChannelHandler鍊 的容器,讓我們來看看ChannelPipeline 是如何管理 ChannelHandler的吧!

addFirst : 将一個 ChannelHandler 添加到 ChannelPipeline 最開始位置中

addBefore :将一個 ChannelHandler 添加到 ChannelPipeline 某個ChannelHandler前

addAfter:将一個 ChannelHandler 添加到 ChannelPipeline 某個ChannelHandler後

addLast : 将一個 ChannelHandler 添加到 ChannelPipeline 最末尾位置

remove :将一個 ChannelHandler 從 ChannelPipeline 中移除

replace :将 ChannelPipeline 中的一個 ChannelHandler 替換為另一個 ChannelHandler

get :通過類型或者名稱傳回 ChannelHandler

context :傳回和 ChannelHandler 綁定的 ChannelHandlerContext

names :傳回 ChannelPipeline 中所有 ChannelHandler 的名稱

    ChannelPipeline 的API 用于調用入站操作的附加方法:

fireChannelRegistered: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的 channelRegistered(ChannelHandlerContext)方法

fireChannelUnregistered: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的channelUnregistered(ChannelHandlerContext)方法

fireChannelActive: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的channelActive(ChannelHandlerContext)方法

fireChannelInactive: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的channelInactive(ChannelHandlerContext)方法

fireExceptionCaught: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的exceptionCaught(ChannelHandlerContext, Throwable)方法

fireUserEventTriggered: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的userEventTriggered(ChannelHandlerContext, Object)方法

fireChannelRead: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的channelRead(ChannelHandlerContext, Object msg)方法

fireChannelReadComplete: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的channelReadComplete(ChannelHandlerContext)方法

fireChannelWritabilityChanged: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的channelWritabilityChanged(ChannelHandlerContext)方法

    ChannelPipeline 的API 用于調用出站操作的附加方法:

bind: 将 Channel 綁定到一個本地位址,這将調用 ChannelPipeline 中的下一個ChannelOutboundHandler 的 bind方法

connect: 将 Channel 連接配接到一個遠端位址,這将調用 ChannelPipeline 中的下一個ChannelOutboundHandler 的 connect方法

disconnect: 将 Channel 斷開連接配接。這将調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 disconnect方法

close: 将 Channel 關閉。這将調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 close方法

deregister(5.0中被廢棄): 将 Channel 從它先前所配置設定的 EventExecutor(即 EventLoop)中登出。這将調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 deregister方法

flush: 沖刷Channel所有挂起的寫入。這将調用ChannelPipeline中的下一個ChannelOutboundHandler 的 flush方法

write: 将消息寫入 Channel。這将調用 ChannelPipeline 中的下一個 ChannelOutboundHandler的write方法。注意:這并不會将消息寫入底層的 Socket,而隻會将它放入隊列中。要将它寫入 Socket,需要調用 flush()或者 writeAndFlush()方法

writeAndFlush: 這是一個先調用 write()方法再接着調用 flush()方法的便利方法

read: 請求從 Channel 中讀取更多的資料。這将調用 ChannelPipeline 中的下一個ChannelOutboundHandler 的 read(ChannelHandlerContext)方法

四、ChannelHandlerContext

    ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之間的關聯。

    ChannelHandlerContext 有很多的方法,其中一些方法也存在于 Channel 和 ChannelPipeline 本身上,但是有一點重要的不同。如果調用 Channel 或者 ChannelPipeline 上的這些方法,它們将沿着整個 ChannelPipeline 進行傳播。而調用位于 ChannelHandlerContext上的相同方法,則将從目前所關聯的 ChannelHandler 開始,并且隻會傳播給位于該ChannelPipeline 中的下一個能夠處理該事件的 ChannelHandler。是以,盡量使用 ChannelHandlerContext 的同名方法來處理邏輯,因為它将産生更短的事件流, 應該盡可能地利用這個特性來獲得最大的性能。

五、異常處理

    入站異常處理:

1、ChannelHandler.exceptionCaught()的預設實作是簡單地将目前異常轉發給ChannelPipeline 中的下一個 ChannelHandler;

2、如果異常到達了 ChannelPipeline 的尾端,它将會被記錄為未被處理;

3、要想定義自定義的處理邏輯,你需要重寫 exceptionCaught()方法。一般将這個Channelhandler放在 ChannelPipeline 的最後,確定所有的入站異常都總會被處理。

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    ctx.channel().close();
}      

    出站異常處理:

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rock!", CharsetUtil.UTF_8));
    //出站異常處理
    channelFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                future.cause().printStackTrace();
                future.channel().close();
            }
        }
    });
}      

六、寄語

    有時候也會迷茫,身邊的人理論基礎差一些的的,代碼不一樣敲的好好的?而我花大量時間細細的去研究這麼理論真的值得嗎?仔細想想,人生很多事情本來就是徒勞無功的啊,沒必要急功近利,欲速則不達。要堅信,一切的付出總會在人生的某個時刻回報在我們身上。