ChannelHandler
netty中的ChannelHandler用于處理Channel對應的事件,每一個ChannelHandler都會和一個channel綁定,ChannelHandler體系如下:

ChannelHandler接口裡面隻定義了三個生命周期方法:
void handlerAdded(ChannelHandlerContext var1) throws Exception;
void handlerRemoved(ChannelHandlerContext var1) throws Exception;
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
子接口ChannelInboundHandler和ChannelOutboundHandler對ChannelHandler進行了擴充,但netty架構提供了ChannelInboundHandlerAdapter、ChannelOutboundHandlerAdapter和ChannelDuplexHandler三個适配類,實際使用時繼承這些适配類即可。
此外還有簡化類SimpleChannelInboundHandler,繼承SimpleChannelInboundHandler類後,會在接收到資料後⾃動release掉資料占⽤的Bytebuffer資源,并且繼承該類需要指定資料格式。⽽繼承ChannelInboundHandlerAdapter則不會⾃動釋放Bytebuffer資源,需要⼿動調⽤ReferenceCountUtil.release()等⽅法進⾏釋放,并且繼承該類不需要指定資料格式。
實際編寫server端時,需要繼承ChannelInboundHandlerAdapter,防⽌資料未處理完就⾃動釋放了。此外server端可能有多個用戶端連接配接,并且每⼀個用戶端請求的資料格式都不⼀緻,相比之下ChannelInboundHandlerAdapter更靈活。
用戶端根據情況可以繼承SimpleChannelInboundHandler類。好處是直接指定好傳輸的資料格式,就不需要再進⾏格式的轉換了。
ChannelHandler中的三個生命周期方法分别對應如下場景:
目前ChannelHander加入ChannelHandlerContext中;
目前從ChannelHandlerContext中移除;
ChannelHandler回調方法出現異常時被回調.
ChannelInboundHandler和ChannelOutboundHandler
差別主要在于ChannelInboundHandler的channelRead和channelReadComplete回調和ChannelOutboundHandler的write和flush回調上,ChannelInboundHandler的channelRead回調負責執行入棧資料的decode邏輯,ChannelOutboundHandler的write負責執行出站資料的encode工作。
ChannelInboundHandler
ChannelInboundHandler定義了如下回調方法:
void channelRegistered(ChannelHandlerContext var1) throws Exception;
void channelUnregistered(ChannelHandlerContext var1) throws Exception;
void channelActive(ChannelHandlerContext var1) throws Exception;
void channelInactive(ChannelHandlerContext var1) throws Exception;
void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;
void channelReadComplete(ChannelHandlerContext var1) throws Exception;
void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
其回調時機為:
channelRegistered 目前channel注冊到EventLoop;
channelUnregistered 目前channel從EventLoop取消注冊;
channelActive 目前channel激活的時候;
channelInactive 目前channel失活的時候;
channelRead 目前channel從遠端讀取到資料;
channelReadComplete channel read消費完讀取的資料的時候被觸發;
userEventTriggered 使用者事件觸發的時候;
channelWritabilityChanged channel的寫狀态變化的時候觸發。
ChannelHandlerContext作為參數,在每個回調事件處理完成之後,使用ChannelHandlerContext的fireChannelXXX方法來傳遞給pipeline中下一個ChannelHandler,netty的codec子產品和業務處理代碼分離就用到了這個鍊路處理。
ChannelOutboundHandler
ChannelOutboundHandler定義了如下回調方法:
void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;
void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;
void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
void read(ChannelHandlerContext var1) throws Exception;
void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;
void flush(ChannelHandlerContext var1) throws Exception;
回調方法觸發時機:
bind bind操作執行前觸發;
connect connect 操作執行前觸發;
disconnect disconnect 操作執行前觸發;
close close操作執行前觸發;
deregister deregister操作執行前觸發;
read read操作執行前觸發;
write write操作執行前觸發;
flush flush操作執行前觸發;
對于ChannelPromise這個參數,可以調用它的addListener注冊監聽,當回調方法所對應的操作完成後,會觸發這個監聽下面的代碼。
同樣添加監聽器的還有ChannelFuture,而ChannelFuture也是ChannelPromise的父接口:
public interface ChannelPromise extends ChannelFuture, Promise<Void> {
...
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> var1);
...
}
例如:
ctx.writeAndFlush(toFullHttpResponse()).addListener(ChannelFutureListener.CLOSE);
ChannelFutureListener.CLOSE這個監聽器就會在writeAndFlush完成之後被調用來關閉channel:
ChannelFutureListener CLOSE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
future.channel().close();
}
};
ChannelHandlerContext
當ChannelHandler加入到ChannelPipeline的時候,會建立一個對應的ChannelHandlerContext并綁定,ChannelPipeline實際維護的是和ChannelHandlerContext的關系,例如在DefaultChannelPipeline:
public class DefaultChannelPipeline implements ChannelPipeline {
...
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
}
DefaultChannelPipeline會儲存第一個ChannelHandlerContext以及最後一個ChannelHandlerContext的引用。
而AbstractChannelHandlerContext中維護了next和prev指針:
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
...
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
}
這樣ChannelHandlerContext之間形成了雙向連結清單。
ChannelPipeline
在Channel建立的時候,會同時建立ChannelPipeline:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
...
protected AbstractChannel(Channel parent) {
this.parent = parent;
this.id = this.newId();
this.unsafe = this.newUnsafe();
this.pipeline = this.newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
}
在ChannelPipeline中也會持有Channel的引用,ChannelPipeline會維護一個ChannelHandlerContext的雙向連結清單,連結清單的頭尾有預設實作:
public class DefaultChannelPipeline implements ChannelPipeline {
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
private final Channel channel;
protected DefaultChannelPipeline(Channel channel) {
this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
this.voidPromise = new VoidChannelPromise(channel, true);
this.tail = new DefaultChannelPipeline.TailContext(this);
this.head = new DefaultChannelPipeline.HeadContext(this);
this.head.next = this.tail;
this.tail.prev = this.head;
}
}
我們添加的自定義ChannelHandler會插入到head和tail之間,以addLast為例:
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
return this.addLast((EventExecutorGroup)null, name, handler);
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
...
this.addLast0(newCtx);
...
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = this.tail.prev;
newCtx.prev = prev;
newCtx.next = this.tail;
prev.next = newCtx;
this.tail.prev = newCtx;
}
如果是ChannelInboundHandler的回調,根據插入的順序從head向tail進行鍊式調用,ChannelOutboundHandler則相反:
值得注意的是,整條鍊路的調用需要通過Channel接口直接觸發,如果使用ChannelContextHandler的接口方法間接觸發,鍊路會從該ChannelContextHandler對應的ChannelHandler開始,而不是從頭或尾開始。
ChannelPipeline入口在NioEventLoop的processSelectedKey():
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
...
try {
...
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {...}
}
當為OP_READ事件時,調用unsafe.read():
@Override
public final void read() {
final ChannelPipeline pipeline = pipeline();
...
try {
do {
...
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
....
} while (allocHandle.continueReading());
} catch (Throwable t) {...}
}
pipeline.fireChannelRead(byteBuf)是入棧入口,實際上是pipeline中ChannelHandlerContext的head節點進行fireChannelRead的同語義操作。
ChannelPipeline相關元素之間的關系如下:
1. 每個Channel都會綁定且隻綁定一個ChannelPipeline,ChannelPipeline中也會持有Channel的引用;
2. ChannelPipeline持有ChannelHandlerContext鍊路;
3. 每個ChannelHandlerContext對應一個ChannelHandler;
4. ChannelHandlerContext同時也會持有ChannelPipeline引用,也就間接持有Channel引用;
5. ChannelHandler鍊路會根據Handler的類型,分為InBound和OutBound兩條鍊路,inbound處理入棧資料,outbound處理出棧資料。