Netty的ChannelPipeline和ChannelHandler機制類似于Servlet和Filter過濾器,這類攔截器實際上是職責鍊模式的一種變形,主要是為了友善事件的攔截和使用者業務邏輯的定制。Netty的Channel過濾器實作原理與Servlet Filter機制一緻,它将Channel的資料管道抽象為ChannelPipeline,消息在ChannelPipeline中流動和傳遞。ChannelPipeline持有I/O事件攔截器ChannelHandler的連結清單,由ChannelHandler對I/O事件進行攔截和處理,可以友善地通過新增和删除ChannelHandler來實作不同的業務邏輯定制,不需要對已有的ChannelHandler進行修改,能夠實作對修改封閉和對擴充的支援。
ChannelPipeline是ChannelHandler的容器,它負責ChannelHandler的管理和事件攔截與排程。
一個消息被ChannelPipeline的ChannelHandler鍊攔截和處理的全過程:
(1)底層的SocketChannel read()方法讀取ByteBuf,觸發ChannelRead事件,由I/O線程NioEventLoop調用ChannelPipeline的fireChannelRead(Object msg)方法,将消息(ByteBuf)傳輸到ChannelPipeline中;
(2)消息依次被HeadHandler、ChannelHandler1、ChannelHandler2……TailHandler攔截和處理,在這個過程中,任何ChannelHandler都可以中斷目前的流程,結束消息的傳遞;
(3)調用ChannelHandlerContext的write方法發送消息,消息從TailHandler開始,途經ChannelHandlerN……ChannelHandler1、HeadHandler,最終被添加到消息發送緩沖區中等待重新整理和發送,在此過程中也可以中斷消息的傳遞,例如當編碼失敗時,就需要中斷流程,構造異常的Future傳回。
Netty中的事件分為inbound事件和outbound事件。inbound事件通常由I/O線程觸發,例如TCP鍊路建立事件、鍊路關閉事件、讀事件、異常通知事件等。
觸發inbound事件的方法如下。
(1)ChannelHandlerContext.fireChannelRegistered():Channel注冊事件;
(2)ChannelHandlerContext.fireChannelActive():TCP鍊路建立成功,Channel激活事件;
(3)ChannelHandlerContext.fireChannelRead(Object):讀事件;
(4)ChannelHandlerContext.fireChannelReadComplete():讀操作完成通知事件;
(5)ChannelHandlerContext.fireExceptionCaught(Throwable):異常通知事件;
(6)ChannelHandlerContext.fireUserEventTriggered(Object):使用者自定義事件;
(7)ChannelHandlerContext.fireChannelWritabilityChanged():Channel的可寫狀态變化通知事件;
(8)ChannelHandlerContext.fireChannelInactive():TCP連接配接關閉,鍊路不可用通知事件。
Outbound事件通常是由使用者主動發起的網絡I/O操作,例如使用者發起的連接配接操作、綁定操作、消息發送等操作,它對應圖17-1的右半部分。
觸發outbound事件的方法如下:
(1)ChannelHandlerContext.bind(SocketAddress, ChannelPromise):綁定本地位址事件;
(2)ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise):連接配接服務端事件;
(3)ChannelHandlerContext.write(Object, ChannelPromise):發送事件;
(4)ChannelHandlerContext.flush():重新整理事件;
(5)ChannelHandlerContext.read():讀事件;
(6)ChannelHandlerContext.disconnect(ChannelPromise):斷開連接配接事件;
(7)ChannelHandlerContext.close(ChannelPromise):關閉目前Channel事件。
ChannelPipeline通過ChannelHandler接口來實作事件的攔截和處理,由于ChannelHandler中的事件種類繁多,不同的ChannelHandler可能隻需要關心其中的某一個或者幾個事件,是以,通常ChannelHandler隻需要繼承ChannelHandlerAdapter類覆寫自己關心的方法即可。
例如,下面的例子展示了攔截Channel Active事件,列印TCP鍊路建立成功日志,代碼如下:
事實上,使用者不需要自己建立pipeline,因為使用ServerBootstrap或者Bootstrap啟動服務端或者用戶端時,Netty會為每個Channel連接配接建立一個獨立的pipeline。對于使用者而言,隻需要将自定義的攔截器加入到pipeline中即可。
對于類似編解碼這樣的ChannelHandler,它存在先後順序,例如MessageToMessageDecoder,在它之前往往需要有ByteToMessageDecoder将ByteBuf解碼為對象,然後對對象做二次解碼得到最終的POJO對象。
ChannelPipeline支援運作态動态的添加或者删除ChannelHandler,在某些場景下這個特性非常實用。例如當業務高峰期需要對系統做擁塞保護時,就可以根據目前的系統時間進行判斷,如果處于業務高峰期,則動态地将系統擁塞保護ChannelHandler添加到目前的ChannelPipeline中,當高峰期過去之後,就可以動态删除擁塞保護ChannelHandler了。
ChannelPipeline是線程安全的,這意味着N個業務線程可以并發地操作ChannelPipeline而不存在多線程并發問題。但是,ChannelHandler卻不是線程安全的,這意味着盡管ChannelPipeline是線程安全的,但是使用者仍然需要自己保證ChannelHandler的線程安全。
ChannelPipeline的代碼相對比較簡單,它實際上是一個ChannelHandler的容器,内部維護了一個ChannelHandler的連結清單和疊代器,可以友善地實作ChannelHandler查找、添加、替換和删除。
ChannelPipeline是ChannelHandler的管理容器,負責ChannelHandler的查詢、添加、替換和删除,它與Map等容器的實作非常類似。
由于ChannelPipeline支援運作期動态修改,在調用類似addBefore(ChannelHandlerInvoker invoker, String baseName, final String name, ChannelHandler handler)方法時,存在兩種潛在的多線程并發通路場景。
I/O線程和使用者業務線程的并發通路;
使用者多個線程之間的并發通路。
為了保證ChannelPipeline的線程安全性,需要通過線程安全容器或者鎖來保證并發通路的安全,此處Netty直接使用了synchronized關鍵字,保證同步塊内的所有操作的原子性。首先根據baseName擷取它對應的DefaultChannelHandlerContext,ChannelPipeline維護了ChannelHandler名和ChannelHandlerContext執行個體的映射關系。 新增的ChannelHandler名進行重複性校驗,如果已經有同名的ChannelHandler存在,則不允許覆寫,抛出IllegalArgumentException("Duplicate handler name: " + name)異常。校驗通過之後,使用新增的ChannelHandler等參數構造一個新的DefaultChannelHandlerContext執行個體。将新建立的DefaultChannelHandlerContext添加到目前的pipeline中(首先需要對添加的ChannelHandlerContext做重複性校驗,如果ChannelHandler不是可以在多個ChannelPipeline中共享的,且已經被添加到ChannelPipeline中,則抛出ChannelPipelineException異常。),加入成功之後,緩存ChannelHandlerContext,發送新增ChannelHandlerContext通知消息。

當發生某個I/O事件的時候,例如鍊路建立、鍊路關閉、讀取操作完成等,都會産生一個事件,事件在pipeline中得到傳播和處理,它是事件處理的總入口。由于網絡I/O相關的事件有限,是以Netty對這些事件進行了統一抽象,Netty自身和使用者的ChannelHandler會對感興趣的事件進行攔截和處理。
pipeline中以fireXXX命名的方法都是從IO線程流向使用者業務Handler的inbound事件,它們的實作因功能而異,但是處理步驟類似,總結如下。
(1)調用HeadHandler對應的fireXXX方法;
(2)執行事件相關的邏輯操作。
以fireChannelActive方法為例,調用head.fireChannelActive()之後,判斷目前的Channel配置是否自動讀取,如果為真則調用Channel的read方法
由使用者線程或者代碼發起的I/O操作被稱為outbound事件,事實上inbound和outbound是Netty自身根據事件在pipeline中的流向抽象出來的術語,在其他NIO架構中并沒有這個概念。
Pipeline本身并不直接進行I/O操作,最終都是由Unsafe和Channel來實作真正的I/O操作的。Pipeline負責将I/O事件通過HeadHandler進行排程和傳播,最終調用Unsafe的I/O方法進行I/O操作。最終由TailHandler調用Unsafe的connect方法發起真正的連接配接,pipeline僅僅負責事件的排程。