一、概念
先來整體的介紹一下這篇博文要介紹的幾個概念(Channel、ChannelHandler、ChannelPipeline、ChannelHandlerContext、ChannelPromise):
Channel:Netty 中傳入或傳出資料的載體;
ChannelHandler:Netty 中處理入站和出站資料的應用程式邏輯的容器;
ChannelPipeline:ChannelHandler鍊 的容器;
ChannelHandlerContext:代表了 ChannelHandler 和 ChannelPipeline 之間的關聯,每當有ChannelHandler 添加到 ChannelPipeline 中時,都會建立 ChannelHandlerContext;
ChannelPromise:ChannelPromise是ChannelFuture的一個子類,其定義了一些可寫的方法,如setSuccess()和setFailure(), 進而使ChannelFuture不可變。
我們來舉一個例子描述這些概念之間的邏輯關系:服務端接收到用戶端的連接配接請求,建立一個Channel同用戶端進行綁定,新建立的 Channel 會都将會被配置設定一個新的ChannelPipeline(這項關聯是永久性的,Channel 既不會附加另外一個ChannelPipeline,也不能分離目前的)。而 ChannelPipeline 作為 ChannelHandler鍊 的容器,當Channel 生命周期中狀态發生改變時,将會生成對應的事件,這些事件将會被 ChannelPipeline 中 ChannelHandler 所響應,響應方法的參數一般都有一個 ChannelHandlerContext ,一個 ChannelHandler 對應一個 ChannelHandlerContext,ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之間的關聯,這項關聯也是永遠不會改變的。

二、ChannelHandler
Netty提供了大量預定義的可以開箱即用的ChannelHandler實作,包括用于各種協定的ChannelHandler。是以,我們在自定義ChannelHandler實作用于處理我們的程式邏輯時,隻需要繼承Netty 的一些預設實作即可,主要有兩種:
1、繼承 ChannelHandlerAdapter (在4.0 中 處理入站事件繼承 ChannelInboundHandlerAdapter,處理出站事件繼承 ChannelOutboundHandlerAdapter ;在5.0 推薦直接繼承 ChannelHandlerAdapter)
2、繼承 SimpleChannelInboundHandler
這兩種方式有什麼差別呢? 當我們處理 入站資料 和 出站資料時,都需要確定沒有任何的資源洩露。在入站方向,繼承 SimpleChannelInboundHandler 的實作類會在消息被處理之後自動處理消息,而繼承 ChannelHandlerAdapter 的實作類需要手動的釋放消息(ReferenceCountUtil.release(msg));在出站方向,不管繼承的是哪一種的實作類,當你處理了 write() 操作并丢棄了一個消息,那麼你就應該釋放它,不僅如此,還要通知 ChannelPromise。否則可能會出現 ChannelFutureListener 收不到某個消息已經被處理了的通知的情況。
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ReferenceCountUtil.release(msg);
//ChannelPromise 是ChannelFuture的一個子類,設定成 true 通知 ChannelFutureListener 消息已經被處理了
//當一個 Promise 被完成之後,其對應的 Future 的值便不能再進行任何修改了
promise.setSuccess();
}
tips:總之,如果一個消息被消費或者丢棄了, 并且沒有傳遞給 ChannelPipeline 中的下一個ChannelOutboundHandler, 那麼使用者就有責任調用 ReferenceCountUtil.release()。
來看看 ChannelHandler 的 API:
isSharable :如果其對應的實作被标注為 Sharable, 那麼這個方法将傳回 true, 表示它可以被添加到多個 ChannelPipeline中
-- ChannelHandler 生命周期方法 --
handlerAdded :當把 ChannelHandler 添加到 ChannelPipeline 中時被調用
handlerRemoved :當從 ChannelPipeline 中移除 ChannelHandler 時被調用
exceptionCaught : 當處理過程中在 ChannelPipeline 中有錯誤産生時被調用
-- 處理入站資料以及各種狀态變化 --
channelRegistered : 當 Channel 已經注冊到它的 EventLoop 并且能夠處理 I/O 時被調用
channelUnregistered : 當 Channel 從它的 EventLoop 登出并且無法處理任何 I/O 時被調用
channelActive : 當 Channel 處于活動狀态時被調用;Channel 已經連接配接/綁定并且已經就緒
channelInactive : 當 Channel 離開活動狀态并且不再連接配接它的遠端節點時被調用
channelReadComplete : 當Channel上的一個讀操作完成時被調用
channelRead : 當從 Channel 讀取資料時被調用
ChannelWritabilityChanged :當 Channel 的可寫狀态發生改變時被調用。
userEventTriggered : 當 ChannelnboundHandler.fireUserEventTriggered()方法被調用時被調用,因為一個 POJO 被傳經了 ChannelPipeline
-- 處理出站資料并且允許攔截所有的操作 --
bind : 當請求将 Channel 綁定到本地位址時被調用
connect : 當請求将 Channel 連接配接到遠端節點時被調用
disconnect : 當請求将 Channel 從遠端節點斷開時被調用
close : 當請求關閉 Channel 時被調用
deregister(5.0中被廢棄) : 當請求将 Channel 從它的 EventLoop 登出時被調用
read : 當請求從 Channel 讀取更多的資料時被調用
flush : 當請求通過 Channel 将入隊資料沖刷到遠端節點時被調用
write :當請求通過 Channel 将資料寫到遠端節點時被調用
三、ChannelPipeline
ChannelPipeline 是一個攔截流經 Channel 的入站和出站事件的ChannelHandler 執行個體鍊,它和 ChannelHandler 之間的互動組成了應用程式資料和事件處理邏輯的核心,而它們之間的關聯互動就是通過 ChannelHandlerContext。
如果一個入站事件被觸發,它将被從 ChannelPipeline 的頭部開始一直被傳播到 Channel Pipeline 的尾端。如圖,Netty 總是将 ChannelPipeline 的入站口作為頭部,而将出站口作為尾端,如圖,第一個被入站事件看到的 ChannelHandler 将是1,而第一個被出站事件看到的是 ChannelHandler 将是 5。稍微總結下這句拗口的話,入站事件順序執行(1—>2—>3—>4—>5)、出站事件逆序執行(5—>4—>3—>2—>1)。
既然 ChannelPipeline 是 ChannelHandler鍊 的容器,讓我們來看看ChannelPipeline 是如何管理 ChannelHandler的吧!
addFirst : 将一個 ChannelHandler 添加到 ChannelPipeline 最開始位置中
addBefore :将一個 ChannelHandler 添加到 ChannelPipeline 某個ChannelHandler前
addAfter:将一個 ChannelHandler 添加到 ChannelPipeline 某個ChannelHandler後
addLast : 将一個 ChannelHandler 添加到 ChannelPipeline 最末尾位置
remove :将一個 ChannelHandler 從 ChannelPipeline 中移除
replace :将 ChannelPipeline 中的一個 ChannelHandler 替換為另一個 ChannelHandler
get :通過類型或者名稱傳回 ChannelHandler
context :傳回和 ChannelHandler 綁定的 ChannelHandlerContext
names :傳回 ChannelPipeline 中所有 ChannelHandler 的名稱
ChannelPipeline 的API 用于調用入站操作的附加方法:
fireChannelRegistered: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的 channelRegistered(ChannelHandlerContext)方法
fireChannelUnregistered: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的channelUnregistered(ChannelHandlerContext)方法
fireChannelActive: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的channelActive(ChannelHandlerContext)方法
fireChannelInactive: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的channelInactive(ChannelHandlerContext)方法
fireExceptionCaught: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的exceptionCaught(ChannelHandlerContext, Throwable)方法
fireUserEventTriggered: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的userEventTriggered(ChannelHandlerContext, Object)方法
fireChannelRead: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的channelRead(ChannelHandlerContext, Object msg)方法
fireChannelReadComplete: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的channelReadComplete(ChannelHandlerContext)方法
fireChannelWritabilityChanged: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的channelWritabilityChanged(ChannelHandlerContext)方法
ChannelPipeline 的API 用于調用出站操作的附加方法:
bind: 将 Channel 綁定到一個本地位址,這将調用 ChannelPipeline 中的下一個ChannelOutboundHandler 的 bind方法
connect: 将 Channel 連接配接到一個遠端位址,這将調用 ChannelPipeline 中的下一個ChannelOutboundHandler 的 connect方法
disconnect: 将 Channel 斷開連接配接。這将調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 disconnect方法
close: 将 Channel 關閉。這将調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 close方法
deregister(5.0中被廢棄): 将 Channel 從它先前所配置設定的 EventExecutor(即 EventLoop)中登出。這将調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 deregister方法
flush: 沖刷Channel所有挂起的寫入。這将調用ChannelPipeline中的下一個ChannelOutboundHandler 的 flush方法
write: 将消息寫入 Channel。這将調用 ChannelPipeline 中的下一個 ChannelOutboundHandler的write方法。注意:這并不會将消息寫入底層的 Socket,而隻會将它放入隊列中。要将它寫入 Socket,需要調用 flush()或者 writeAndFlush()方法
writeAndFlush: 這是一個先調用 write()方法再接着調用 flush()方法的便利方法
read: 請求從 Channel 中讀取更多的資料。這将調用 ChannelPipeline 中的下一個ChannelOutboundHandler 的 read(ChannelHandlerContext)方法
四、ChannelHandlerContext
ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之間的關聯。
ChannelHandlerContext 有很多的方法,其中一些方法也存在于 Channel 和 ChannelPipeline 本身上,但是有一點重要的不同。如果調用 Channel 或者 ChannelPipeline 上的這些方法,它們将沿着整個 ChannelPipeline 進行傳播。而調用位于 ChannelHandlerContext上的相同方法,則将從目前所關聯的 ChannelHandler 開始,并且隻會傳播給位于該ChannelPipeline 中的下一個能夠處理該事件的 ChannelHandler。是以,盡量使用 ChannelHandlerContext 的同名方法來處理邏輯,因為它将産生更短的事件流, 應該盡可能地利用這個特性來獲得最大的性能。
五、異常處理
入站異常處理:
1、ChannelHandler.exceptionCaught()的預設實作是簡單地将目前異常轉發給ChannelPipeline 中的下一個 ChannelHandler;
2、如果異常到達了 ChannelPipeline 的尾端,它将會被記錄為未被處理;
3、要想定義自定義的處理邏輯,你需要重寫 exceptionCaught()方法。一般将這個Channelhandler放在 ChannelPipeline 的最後,確定所有的入站異常都總會被處理。
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.channel().close();
}
出站異常處理:
@Override
public void channelActive(ChannelHandlerContext ctx) {
ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rock!", CharsetUtil.UTF_8));
//出站異常處理
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
future.cause().printStackTrace();
future.channel().close();
}
}
});
}
六、寄語
有時候也會迷茫,身邊的人理論基礎差一些的的,代碼不一樣敲的好好的?而我花大量時間細細的去研究這麼理論真的值得嗎?仔細想想,人生很多事情本來就是徒勞無功的啊,沒必要急功近利,欲速則不達。要堅信,一切的付出總會在人生的某個時刻回報在我們身上。