天天看點

Netty 架構學習 —— ChannelHandler 與 ChannelPipeline

ChannelHandler

1. Channel 生命周期

Channel 的生命周期狀态如下:

狀态 描述
ChannelUnregistered Channel 已經被建立,但還未注冊到 EventLoop
ChannelRegistered Channel 已經被注冊到 EventLoop
ChannelActive Channel 處于活動狀态(已經連接配接到它的遠端節點),可以接收和發送資料
ChannelInactive Channel 沒有連接配接到遠端節點

Channel 的生命周期按上表從上到下所示,當狀态發生改變時,将會生成對應的事件。這些事件将會被轉發到 ChannelPipeline 中的 ChannelHandler,随後其可以做出響應

2. ChannelHandler 生命周期

下表列出了 ChannelHandler 定義的生命周期操作,在 ChannelHandler 被添加到 ChannelPipeline 或者被從 ChannelPipeline 移除時會調用這些操作,這些方法中的每一個都接受一個 ChannelHandlerContext 參數

類型 描述
handlerAdded 當把 ChannelHandler 添加到 ChannelPipeline 中時被調用
handlerRemoved 當從 ChannelPipeline 中移除 ChannelHandler 時被調用
exceptionCaught 當處理過程中在 ChannelPipeline 中有錯誤産生時被調用

3. ChannelInboundHandler 接口

ChannelInboundHandler 是 ChannelHandler 接口的子接口,處理入站資料以及各種狀态變化。下表列出 ChannelInboundHandler 的生命周期方法,這些方法将會在資料被接收時或者與其對應的 Channel 狀态發生改變時被調用

類型 描述
channelRegistered 當 Channel 已經注冊到它的 EventLoop 并且能夠處理 IO 時被調用
channelUnregistered 當 Channel 從它的 EventLoop 登出并且無法處理任何 IO 時被調用
channelActive 當 Channel 處于活動狀态時被調用,Channel 已經連接配接/綁定并且就緒
channelInactive 當 Channel 離開活動狀态并且不再連接配接它的遠端節點時被調用
channelReadComplete 當 Channel 上的一個讀操作完成時被調用
channelRead 當從 Channel 讀取資料時被調用
ChannelWritabilityChanged 當 Channel 的可寫狀态發生改變時被調用
useEventTriggered 當 ChannelInboundHandler.fireUserEventTriggered() 方法被調用時被調用,因為一個 POJO 流經 ChannelPipeline

當某個 ChannelInboundHandler 的實作重寫 channelRead() 方法時,它将負責顯式地釋放與池化 ByteBuf 執行個體相關的記憶體。Netty 為此提供了一個實用方法 ReferenceCountUtil.release()

@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ReferenceCountUtil.release(msg);
    }
}
           

這種方式可能會很煩瑣,另一種更加簡單的方式是使用 SimpleChannelInboundHandler

@Sharable
public class SimpleDiscardHandler extends SimpleChannelInboundHandlerAdapter<Object> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) {
        // 不需要顯式進行資源釋放
    }
}
           

由于 SimpleChannelInboundHandler 會自動釋放資源,是以你不應該存儲任何指向消息的引用,因為這些引用都将會失效

4. ChannelOutboundHandler 接口

出站操作和資料由 ChannelOutboundHandler 處理,它的方法将被 Channel、ChannelPipeline 以及 ChannelHandlerContext 調用

ChannelOutboundHandler 的一個強大的功能是可以按需推遲操作或者事件,使得可以通過一些複雜的方法來處理請求,例如,如果到遠端節點的寫入被暫停了,那麼你可以推遲沖刷操作并在稍後繼續

下标顯示了所有由 ChannelOutboundHandler 本身所定義的方法

類型 描述
bind(ChannelHandlerContext, SocketAddress, ChannelPromise) 當請求将 Channel 綁定到本地位址時被調用
connect(ChannelHandlerContext, SocketAddress, ChannelPromise) 當請求将 Channel 連接配接到遠端節點時被調用
disconnect(ChannelHandlerContext, ChannelPromise) 當請求将 Channel 從遠端節點斷開時被調用
close(ChannelHandlerContext, ChannelPromise) 當請求關閉 Channel 時被調用
deregister(ChannelHandlerContext, ChannelPromise) 當請求将 Channel 從它的 EventLoop 登出時被調用
read(ChannelHandlerContext) 當請求從 Channel 讀取更多的資料時被調用
flush(ChannelHandlerContext) 當請求通過 Channel 将入隊資料沖刷到遠端節點時被調用
write(ChannelHandlerContext, Object, ChannelPromise) 當請求通過 Channel 将資料寫到遠端節點時被調用

ChannelOutboundHandler 中的大部分方法都需要一個 ChannelPromise 參數,以便在操作完成時得到通知。CHannelPromise 是 ChannelFuture 的一個子類,定義了一些可寫方法

5. ChannelHandler 擴充卡

可以使用 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 類作為自己的 Channel 的起始點,這兩個擴充卡分别提供了 ChannelInboundHandler 和 CHannelOutboundHandler 的基本實作,并擴充抽象類 ChannelHandlerAdapter

ChannelHandlerAdapter 還提供了實用方法 isSharable(),如果其對應的實作被标記為 Sharable,那麼這個方法将傳回 true,表示它可以被添加到多個 ChannelPipeline 中

如果想在自己的 ChannelHandler 中使用這些擴充卡類,隻需要簡單地擴充它們,并重寫自定義方法

ChannelPipeline 接口

可以把 ChannelPipeline 了解成一個攔截流經 Channel 的入站和出站事件的 ChannelHandler 執行個體鍊,每一個新建立的 Channel 都将會被配置設定一個新的 ChannelPipeline,Channel 不能附加到另一個 ChannelPipeline,也不能從目前分離。根據事件的起源,事件将會被 ChannelInboundHandler 或者 ChannelOutboundHandler 處理,随後,通過調用 ChannelHandlerContext 實作,它将被轉發給同一超類型的下一個 ChannelHandler

一個典型的同時具有入站和出站 ChannelHandler 的 ChannelPipeline 布局如圖所示

Netty 架構學習 —— ChannelHandler 與 ChannelPipeline

在 ChannelPipeline 傳播事件時,它會測試 ChannelPipeline 中的下一個 ChannelHandler 的類型是否和事件的運動方向相比對,如果不比對,ChannelPipeline 将跳過該 ChannelHandler 并前進到下一個,直到找到和該事件所期望的方向相比對

1. 修改 ChannelPipeline

通過調用 ChannelPipeline 上的相關方法,ChannelHandler 可以添加、删除或者替換其他的 ChannelHandler,當然也包括它自己

下表列出了由 ChannelHandler 修改 ChannelPipeline 的相關方法

名稱 描述

addFirst

addBefore

addAfter

addLast

将一個 ChannelHandler 添加到 ChannelPipeline
remove 将一個 ChannelHandler 從 ChannelPipeline 中移除
replace 将 ChannelPipeline 中的一個 ChannelHandler 替換為另一個 ChannelHandler

ChannelPipeline 的用于通路 ChannelHandler 的操作

名稱 描述
get 通過類型或者名稱傳回 ChannelHandler
context 傳回和 ChannelHandler 綁定的 ChannelHandlerContext
names 傳回 ChannelPipeline 中所有 ChannelHandler 的名稱

2. 觸發事件

ChannelPipeline 的 API 公開了用于調用入站和出站操作的附加方法

ChannelPipeline 的入站操作如表

方法名稱 描述
fireChannelRegistered 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的 channelRegistered(ChannelHandlerContext) 方法
fireChannelUnregistered 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的 channelUnregistered(ChannelHandlerContext) 方法
fireChannelActive 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的 channelActive(ChannelHandlerContext) 方法
fireChannelInactive 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的 channelInactive(ChannelHandlerContext) 方法
fireExceptionCaught 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的 exceptionCaught(ChannelHandlerContext, Throwable) 方法
fireUserEventTriggered 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的 userEventTriggered(ChannelHandlerContext, Object) 方法
fireChannelRead 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的 channelRead(ChannelHandlerContext) 方法
fireChannelReadComplete 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的 channelReadComplete(ChannelHandlerContext) 方法
fireChannelWritabilityChanged 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的 channelWritabilityChanged(ChannelHandlerContext) 方法

ChannelPipeline 的出站操作如表

方法名稱 描述
bind 将 Channel 綁定到一個本地位址,這将調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 bind(ChannelHandlerContext, SocketAddress, ChannelPromise) 方法
connect 将 Channel 綁定到一個遠端位址,這将調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 connect(ChannelHandlerContext, SocketAddress, ChannelPromise) 方法
disconnect 将 Channel 斷開連接配接,這将調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 disconnect(ChannelHandlerContext, ChannelPromise) 方法
close 将 Channel 關閉,這将調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 close(ChannelHandlerContext, ChannelPromise) 方法
deregister 将 Channel 從它先前所配置設定的 EventExecutor(即 EventLoop)中登出,這将調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 deregister(ChannelHandlerContext, ChannelPromise) 方法
flush 沖刷 Channel 所有挂起的寫入,這将調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 flush(ChannelHandlerContext) 方法
write 将消息寫入 Channel,這将調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 write(ChannelHandlerContext, Object, ChannelPromise) 方法
writeAndFlush 這是一個先調用 write() 方法再接着調用 flush() 方法的便利方法
read 請求從 Channel 中讀取更多的資料,這将調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 read(ChannelHandlerContext) 方法

總結一下:

  • ChannelPipeline 儲存了與 Channel 相關聯的 ChannelHandler
  • ChannelPipeline 可以根據需要,動态修改 ChannelHandler
  • ChannelPipeline 有豐富的 API 用以響應入站和出站事件

ChannelContext 接口

ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之間的關聯,每當有 ChannelHandler 添加到 ChannelPipeline 中時,都會建立 ChannelHandlerContext。ChannelHandlerContext 的主要功能是管理它所關聯的 ChannelHandler 和在同一個 ChannelPipeline 中的其他 ChannelHandler 之間的互動

ChannelHandlerContext 有很多方法,其中一些方法也存在于 Channel 和 ChannelPipeline 上,不同的是,如果調用 Channel 或者 ChannelHandlerPipeline 上的這些方法,它們将沿着整個 ChannelPipeline 進行傳播。而調用位于 ChannelHandlerContext 上的相同方法,則将從目前所關聯的 ChannelHandler 開始,并且隻會傳播給位于該 ChannelPipeline 中的下一個能處理該事件的 ChannelHandler

1. 使用 ChannelHandlerContext

Channel、ChannelPipeline、ChannelHandler 以及 ChannelHandlerContext 之間的關系如圖所示

Netty 架構學習 —— ChannelHandler 與 ChannelPipeline

通過 ChannelHandler 或 ChannelPipeline 操作,事件将傳播整個 ChannelPipeline,但在 ChannelHandler 的級别上,事件從上一個 ChannelHandler 到下一個 ChannelHandler 的移動是由 ChannelHandlerContext 上的調用完成的

Netty 架構學習 —— ChannelHandler 與 ChannelPipeline

為什麼想要從 ChannelPipeline 中的某個特定點開始傳播事件?

  • 減少将事件傳經對它不感興趣的 ChannelHandler 所帶來的開銷
  • 避免将事件傳經那些可能會對它感興趣的 ChannelHandler

要想調用從某個特定的 ChannelHandler 開始的處理過程,必須擷取該 ChannelHandler 之前的 ChannelHandler 所關聯的 ChannelHandlerContext,這個 ChannelHandlerContext 将調用和它所關聯的 ChannelHandler 之後的 ChannelHandler

我們還可以通過調用 ChannelHandlerContext 上的 pipeline() 方法來獲得 ChannelPipeline 的引用,進而得以在運作時操作 ChannelHandler

異常處理

1. 處理入站異常

如果在處理入站事件的過程中有異常抛出,那麼它将從它在 ChannelInboundHandler 裡被觸發的那一點開始流經 ChannelPipeline。要想處理這種類型的入站異常,你需要在你的 ChannelInboundHandler 實作重寫以下方法,否則預設将異常轉發給下一個 ChannelHandler:

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
           

因為異常會按照入站方向繼續流動,是以實作了處理異常邏輯的 ChannelInboundHandler 通常位于 ChannelPipeline 的最後,確定異常總能被處理

2. 處理出站異常

用于處理出站操作的正常完成以及異常的選項,都基于以下通知機制:

  • 每個出站操作都将傳回一個 ChannelFuture,注冊到 ChannelFuture 的 ChannelFutureListener 将在操作完成時通知結果
  • 幾乎所有的 ChannelOutboundHandler 上的方法都會傳入一個 ChannelPromise 執行個體,作為 ChannelFuture 的子類,ChannelPromise 也可以被配置設定用于異步通知的監聽器

添加 ChannelFutureListener 隻需要調用 ChannelFuture執行個體上的 addListener(ChannelFutureListener) 方法,并且有兩種方式可以做到,第一種是調用出站操作(如 write 方法)所傳回的 ChannelFuture 上的 addListener() 方法;第二種是 ChannelFutureListener 添加到作為參數傳遞的 ChannelOutboundHandler 的方法的 ChannelPromise

是以,如果你的 ChannelOutboundHandler 抛出異常,Netty 本身會通知任何已經注冊到對應 ChannelPromise 的注冊器