文章目錄
一、前言二、pipeline 初始化
-
- 2.1 channel中的核心元件
- 2.2 pipeline預設結構
- 三、pipeline添加節點
-
- 3.0 DefaultChannelPipeline類中的addLast()方法源碼解析:Netty添加節點的時候,從源碼層面區分一個 ChannelHandlerContext到底是in還是out
- 3.1 checkMultiplicity()源碼解析:檢查是否有重複handler,先校驗
- 3.2 newContext()源碼解析:建立節點
-
- 3.2.1 前奏:filterName()處理名稱
- 3.2.2 newContext()建立context
- 3.2.3 InBound和OutBound繼承體系
- 3.3 addLast0()方法源碼解析:尾添加節點,雙向連結清單尾插法添加四操作
- 3.4 源碼解析:callHandlerAdded0()方法,回調使用者方法并設定添加完成狀态
- 四、pipeline删除節點
-
- 4.1 源碼解析:remove(getContextOrDie(handler))方法中的getContextOrDie(handler),使用循環周遊找到待删除的節點
- 4.2 源碼解析:remove(getContextOrDie(handler))方法中的remove0(ctx)方法,調整雙向連結清單指針删除
- 4.3 源碼解析:remove(getContextOrDie(handler))方法中的callHandlerRemoved0(ctx)方法,回調使用者函數
- 五、面試金手指
-
- 5.1 服務端初始化、新連接配接建立:從channel到pipeline
- 5.2 pipeline初始化
-
- 5.2.1 channel中的核心元件
- 5.2.2 pipeline預設結構
- 5.3 pipeline添加節點
- 5.4 pipeline删除節點
- 六、小結
一、前言
對于前面三篇 服務端初始化、netty中實作reactor原理、新連接配接建立小結:
通過前面的源碼系列文章中的netty reactor線程三部曲,我們已經知道,netty的reactor線程就像是一個發動機,驅動着整個netty架構的運作,而服務端初始化和新連接配接的建立正是發動機的導火線,将發動機點燃
netty在服務端初始化和新連接配接建立的過程中會建立相應的channel,而與channel的動作密切相關的是pipeline這個概念,pipeline像是可以看作是一條流水線,原始的原料(位元組流)進來,經過加工,最後輸出
本文,我将以新連接配接的建立為例分為以下幾個部分給你介紹netty中的pipeline是怎麼玩轉起來的
pipeline 初始化
pipeline 添加節點
pipeline 删除節點
Pipeline第一篇:核心是pipeline非循環雙連結清單的插入和删除;
Pipeline第二篇:UnSafe接口+HeadContext類+inBound事件傳播+TailContext類+outBound事件傳播+異常Exception傳播。
二、pipeline 初始化
在新連接配接的建立這篇文章中,我們已經知道了建立NioSocketChannel的時候會将netty的核心元件建立出來
2.1 channel中的核心元件
小結:在【Netty源碼解析 003 新連接配接建立】中,對于服務端初始化和新連接配接建立,得到了channel的繼承體系,這裡對channel的成員變量解釋
一個channnel中有一個pipeline,底層功能由這個pipeline完成,如pipeline.read(),pipeline是一個非循環雙向連結清單,就是 newChannelPipeline(); new TailContext(this); new HeadContext(this);
NioSocketChannelConfig
ChannelId 就是newId,就是random
unsafe 就是 newUnsafe();
pipeline是其中的一員,在下面這段代碼中被建立
AbstractChannel
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
AbstractChannel newChannelPipeline()源碼解析
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
接上面,DefaultChannelPipeline 對于pipeline中的tail和head的處理
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
pipeline中儲存了channel的引用,建立完pipeline之後,整個pipeline是這個樣子的
2.2 pipeline預設結構
pipeline中的每個節點是一個ChannelHandlerContext類對象,每個context節點儲存了它包裹的執行器 ChannelHandler 執行操作所需要的上下文,其實就是pipeline,因為pipeline包含了channel的引用,可以拿到所有的context資訊。
預設情況下,一條pipeline會有兩個節點,head和tail,下一篇文章【Netty中的pipeline(二)】我們具體分析這兩個特殊的節點,今天我們重點放在pipeline。
Pipeline屬性結構如下
public class DefaultChannelPipeline implements ChannelPipeline {
static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class); // 沒用,列印異常
private static final String HEAD_NAME = generateName0(DefaultChannelPipeline.HeadContext.class); // 一個常量,表示head的name
private static final String TAIL_NAME = generateName0(DefaultChannelPipeline.TailContext.class); // 一個常量,表示tail的name,下一篇文章講head和tail的時候使用
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches = new FastThreadLocal() {
protected Map<Class<?>, String> initialValue() throws Exception {
return new WeakHashMap();
}
}; //存放名稱name的緩存,使用弱引用,表示緩存,本文newContext()建立節點的使用filterName()節點名稱的時候使用
final AbstractChannelHandlerContext head; // pipeline中的head指針,下一篇文章講述HeadContext類的使用用到
final AbstractChannelHandlerContext tail; // pipeline中的tail指針,下一篇文章講述TailContext類的使用用到
private final Channel channel; // pipeline中包含channel引用
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;
private final boolean touch = ResourceLeakDetector.isEnabled();
private Map<EventExecutorGroup, EventExecutor> childExecutors;
private Handle estimatorHandle;
private boolean firstRegistration = true;
private DefaultChannelPipeline.PendingHandlerCallback pendingHandlerCallbackHead;
private boolean registered;
protected DefaultChannelPipeline(Channel channel) { // DefaultChannelPipeline構造函數中設定channel,初始化tail head,并設定好tail head
this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
this.voidPromise = new VoidChannelPromise(channel, true);
this.tail = new DefaultChannelPipeline.TailContext(this);
this.head = new DefaultChannelPipeline.HeadContext(this);
this.head.next = this.tail;
this.tail.prev = this.head;
}
對于DefaultChannelPipeline類:
1、nameCaches:存放名稱name的緩存,使用弱引用,表示緩存,本文newContext()建立節點的使用filterName()節點名稱的時候使用
2、head tail:下一篇文章才講到
3、DefaultChannelPipeline構造函數中,設定channel,初始化tail head,并設定好tail head,就是DefaultChannelPipeline在建立号執行個體pipeline的時候,channel head tail就設定好了
三、pipeline添加節點
下面是一段非常常見的用戶端代碼
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// head outBound 寫出
p.addLast(new Spliter()) // TCP拆包 inBound 讀入
p.addLast(new Decoder()); // 解碼 inBound 讀入
p.addLast(new BusinessHandler()) // 業務處理器 inBound 讀入
p.addLast(new Encoder()); // 編碼 outBound 寫出
// tail inBound 讀入
}
});
首先,用一個spliter将來源TCP資料包拆包,然後将拆出來的包進行decoder,傳入業務處理器BusinessHandler,業務處理完encoder,輸出
整個pipeline結構如下
我用兩種顔色區分了一下pipeline中兩種不同類型的節點,
第一種類型的Handler是 ChannelInboundHandler,處理inBound事件,最典型的就是讀取資料流,加工處理;
第二種類型的Handler是 ChannelOutboundHandler, 處理outBound事件,比如當調用writeAndFlush()類方法時,就會經過該種類型的handler
不管是哪種類型的handler,其外層對象 ChannelHandlerContext 之間都是通過雙向連結清單連接配接,而區分一個 ChannelHandlerContext到底是in還是out,在添加節點的時候我們就可以看到netty是怎麼處理的
3.0 DefaultChannelPipeline類中的addLast()方法源碼解析:Netty添加節點的時候,從源碼層面區分一個 ChannelHandlerContext到底是in還是out
DefaultChannelPipeline類中的addLast()方法
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
for (ChannelHandler h: handlers) { // 對于handlers周遊,每一個item添加到pipeline裡面去
addLast(executor, null, h);
}
return this;
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) { // 保證線程安全
// 1.檢查是否有重複handler 先對要添加的ChannelHandler類對象handler校驗
checkMultiplicity(handler);
// 2.建立節點 使用name和handler建立AbstractChannelHandlerContext 節點
newCtx = newContext(group, filterName(name, handler), handler);
// 3.添加節點 将建立的AbstractChannelHandlerContext 節點插入
addLast0(newCtx);
}
// 4.回調使用者方法
callHandlerAdded0(handler); // 添加完成,調用使用者自定義重寫的方法
return this;
}
這裡簡單地用synchronized方法是為了防止多線程并發操作pipeline底層的雙向連結清單
我們還是逐漸分析上面這段代碼
3.1 checkMultiplicity()源碼解析:檢查是否有重複handler,先校驗
在使用者代碼添加一條handler的時候,首先會檢視該handler有沒有添加過
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) { // handler是ChannelHandlerAdapter的執行個體對象,就強制類型轉換
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler; // handler是ChannelHandlerAdapter的執行個體對象,就強制類型轉換
if (!h.isSharable() && h.added) { // 這個實參handler不是共享,且已經被添加到pipeline裡面(added标志位為true)抛出異常
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true; // 因為下一步就要添加了,是以這裡設定這個item handler的标志位added為true
}
}
netty使用一個成員變量added辨別一個channel是否已經添加,上面這段代碼很簡單,如果目前要添加的Handler是非共享的,并且已經添加過,那就抛出異常,否則,辨別該handler已經添加
小結:checkMultiplicity()源碼解析:檢查是否有重複handler,先校驗
如果目前要添加的Handler是非共享的,并且已經添加過,那就抛出異常,否則,辨別該handler已經添加
金手指1:
如果一個Handler如果是sharable的,就可以無限次被添加到pipeline中,我們用戶端代碼如果要讓一個Handler被共用,隻需要加一個@Sharable标注即可,如下
@Sharable
public class BusinessHandler {
}
金手指2:如果Handler是sharable的,一般就通過spring的注入的方式使用,不需要每次都new 一個
isSharable() 方法正是通過該Handler對應的類是否标注@Sharable來判斷的,且看isSharable() 方法的源碼
ChannelHandlerAdapter類
public boolean isSharable() {
Class<?> clazz = getClass(); //得到位元組碼對象
Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = cache.get(clazz);
if (sharable == null) {
sharable = clazz.isAnnotationPresent(Sharable.class);
cache.put(clazz, sharable);
}
return sharable;
}
這裡也可以看到,netty為了性能優化到極緻,還使用了ThreadLocal來緩存Handler的狀态,高并發海量連接配接下,每次有新連接配接添加Handler都會建立調用此方法
3.2 newContext()源碼解析:建立節點
回到主流程,看建立上下文這段代碼
3.2.1 前奏:filterName()處理名稱
newCtx = newContext(group, filterName(name, handler), handler);
這裡我們需要先分析 filterName(name, handler) 這段代碼,這個函數用于給handler建立一個唯一性的名字
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
return generateName(handler);
}
checkDuplicateName(name);
return name;
}
小結:源碼解析filterName(),給handler建立一個唯一性的名字
我們傳入的name為null,netty就給我們生成一個預設的name,否則,檢查是否有重名,檢查通過的話就傳回
netty建立預設name的規則為 簡單類名#0,下面我們來看些具體是怎麼實作的
DefaultChannelPipeline類
// nameCaches 意為名稱緩存,是FastThreadLocal類型,
// 建立FastThreadLocal類型對象,提供initialValue()方法的時候,這個initialValue()提供一個FastThreadLocal中的Map的具體類型,就是WeakHashMap,在Java四種引用類型裡面,這是弱引用,無論記憶體是否足夠,隻要 JVM 開始進行垃圾回收,那些被弱引用關聯的對象都會被回收,WeakHashMap 節約存儲空間,作為緩存一些不重要的資料,且看部落格【Java四種引用】
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
new FastThreadLocal<Map<Class<?>, String>>() {
@Override
protected Map<Class<?>, String> initialValue() throws Exception {
return new WeakHashMap<Class<?>, String>();
}
};
// 上面的name==null,要使用generateName生成随機名稱,這裡使用到上面的nameCaches
private String generateName(ChannelHandler handler) {
// 先檢視緩存中是否有生成過預設name
Map<Class<?>, String> cache = nameCaches.get(); // FastThreadLocal類型的nameCaches得到一個元素cache,類型為Map<Class<?>, String>,就是一個map,key為位元組碼,value為String,這個map類型的cache才是緩存本體
Class<?> handlerType = handler.getClass(); // 獲得實參item handler的位元組碼
String name = cache.get(handlerType); // cache根據key 位元組碼得到String name
// 沒有生成過,就生成一個預設name,加入緩存
if (name == null) { // 如果這個位元組碼的name==null
name = generateName0(handlerType); // 生成一個,放進去,參數是位元組碼對象
cache.put(handlerType, name); // 生成一個,放進去
}
// 生成完了或者name!=null,還要看預設name有沒有沖突
if (context0(name) != null) { // context0(name)不是null,進入,說明生成的name有沖突,要進入處理沖突,是null,生成的name沒有沖突,直接return name,結束方法
String baseName = name.substring(0, name.length() - 1); // 得到baseName就是生成的name的前面n-1位
for (int i = 1;; i ++) { // 死循環
String newName = baseName + i; // baseName + i,湊上第n位,不斷循環
if (context0(newName) == null) { // 直到找到,context0(name)是null,就是name=newName break; return name;
name = newName;
break;
}
}
}
return name; // 最後傳回生成的name
}
最後一段,檢測沖突的,這樣修改,好了解一些(不沖突,直接return name;沖突,不斷湊上最後一位,直到不沖突,傳回return newName;)
// 生成完了或者name!=null,還要看預設name有沒有沖突
if (context0(name) != null) { // context0(name)不是null,進入,說明生成的name有沖突,要進入處理沖突,是null,生成的name沒有沖突,直接return name,結束方法
String baseName = name.substring(0, name.length() - 1); // 得到baseName就是生成的name的前面n-1位
for (int i = 1;; i ++) { // 死循環 i沒有上限
String newName = baseName + i; // baseName + i,湊上第n位,不斷循環
if (context0(newName) == null) { // 直到找到,context0(name)是null,表示目前這個newName不沖突了
return newName; // 傳回不沖突的newName
}
}
}
return name; // 傳回不沖突的name
其實,上面代碼中,有兩個地方我們需要繼續深入
generateName0() 當name==null,生成name
context0(name) 判斷name是否沖突
netty使用一個 FastThreadLocal(後面的文章會細說)變量來緩存Handler的類和預設名稱的映射關系,在生成name的時候,首先檢視緩存中有沒有生成過預設name(簡單類名#0),如果沒有生成,就調用generateName0()生成預設name,然後加入緩存
接下來還需要檢查name是否和已有的name有沖突,調用context0(),查找pipeline裡面有沒有對應的context
private static String generateName0(Class<?> handlerType) { // handlerType是位元組碼對象
return StringUtil.simpleClassName(handlerType) + "#0";
}
public static String simpleClassName(Class<?> clazz) { // handlerType是位元組碼對象
String className = ((Class)ObjectUtil.checkNotNull(clazz, "clazz")).getName(); //得到單純的類名
int lastDotIdx = className.lastIndexOf(46); // 從後面數第46個
return lastDotIdx > -1?className.substring(lastDotIdx + 1):className; // 如果這個數字大于-1,就是倒數46個存在,傳回className 0~lastDotIdx+1
如果這個數字小于等于-1,其實就是等于-1,就是說倒數第46個不存在,直接傳回className整個類名
}
private AbstractChannelHandlerContext context0(String name) {
AbstractChannelHandlerContext context = head.next; // 得到head後繼節點,從head後繼節點開始循環周遊,是否name沖突,為什麼不是從head開始,因為head是不存儲元素的
while (context != tail) { // 隻要還沒有到尾巴,即使這裡是tail,tail也是不存放元素的,這裡隻是比較name
if (context.name().equals(name)) {
return context; // 沖突,傳回沖突的節點,其實這個傳回值沒什麼用,因為隻要判斷一下是否為null就好,甚至可以将傳回值修改為boolean類型
}
context = context.next; // 指針移動
}
return null; // 不沖突
}
context0()方法連結清單周遊每一個 ChannelHandlerContext,隻要發現某個context的名字與待添加的name相同,就傳回該context,最後抛出異常,可以看到,這個其實是一個線性搜尋的過程
如果context0(name) != null 成立,說明現有的context裡面已經有了一個預設name,那麼就從 簡單類名#1 往上一直找,直到找到一個唯一的name,比如簡單類名#3( for (int i = 1;; i ++) { // 死循環 i沒有上限)
如果使用者代碼在添加Handler的時候指定了一個name,那麼要做到事僅僅為檢查一下是否有重複
private void checkDuplicateName(String name) {
if (context0(name) != null) { // 重複,抛出異常
throw new IllegalArgumentException("Duplicate handler name: " + name);
}
}
處理完name之後,就進入到建立context的過程,由前面的調用鍊得知,group為null,是以childExecutor(group)也傳回null
3.2.2 newContext()建立context
DefaultChannelPipeline類
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
// 底層調用childExecutor()方法
private EventExecutor childExecutor(EventExecutorGroup group) {
if (group == null) {
return null;
}
//..
}
DefaultChannelHandlerContext類構造方法
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler)); // 調用父類,pipeline executor name三個參數都是傳遞過來的,對于inbound和outbound,是使用handler作為實參調用方法得到的
if (handler == null) { // handler為null,抛出異常
throw new NullPointerException("handler");
}
// handler不為null,就指派
this.handler = handler;
}
構造函數中,DefaultChannelHandlerContext将參數回傳到父類,儲存Handler的引用,進入到其父類
AbstractChannelHandlerContext類,DefaultChannelHandlerContext的父類
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name"); // 設定name
this.pipeline = pipeline; // 設定pipeline
this.executor = executor; // 設定executor
this.inbound = inbound; // 設定inbound
this.outbound = outbound; // 設定outbound
}
netty中用兩個字段來表示這個channelHandlerContext屬于inBound還是outBound,或者兩者都是,兩個boolean是通過下面兩個小函數來判斷(見上面一段代碼)
DefaultChannelHandlerContext
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler; // 方法很簡單,是否是ChannelInboundHandler類及其子類對象,是傳回true,不是就傳回false
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler; // 方法很簡單,是否是ChannelOutboundHandler類及其子類對象,是傳回true,不是就傳回false
}
3.2.3 InBound和OutBound繼承體系
通過instanceof關鍵字根據接口類型來判斷,是以,如果一個Handler實作了兩類接口,那麼他既是一個inBound類型的Handler,又是一個outBound類型的Handler,比如下面這個ChannelDuplexHandler類:
繼承體系一句話小結:
ChannelInboundHandler和ChannelOutboundHandler兩個都是接口(一定不能執行個體化對象,是以上面instanceOf一定是它們的子類對象),都是ChannelHandler的子接口
ChannelInboundHandler有實作類ChannelInboundHandlerAdapter,這個不是重點,
重點是,ChannelDuplexHandler抽象類既是ChannelInboundHandler接口實作類,也是ChannelOutboundHandler接口實作類,
然後MessageToMessageCodec抽象類是ChannelDuplexHandler抽象類的是實作類,它可以同時完成解碼decode和編碼encode的操作。
常用的,将decode操作和encode操作合并到一起的codec,一般會繼承 MessageToMessageCodec,而MessageToMessageCodec就是繼承ChannelDuplexHandler,它可以同時完成解碼decode和編碼encode的工作,如下:
MessageToMessageCodec
public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler {
protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out)
throws Exception;
protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out)
throws Exception;
}
context 建立完了之後,接下來終于要将建立完畢的context加入到pipeline中去了
3.3 addLast0()方法源碼解析:尾添加節點,雙向連結清單尾插法添加四操作
為什麼addLast0()不是添加tail節點後面,而是添加到tail節點前面?
因為tail尾節點不存放資料,最後一個存放資料的節點就是tail節點的前驅節點。
注意:pipeline中的,head節點和tail節點都是不存放資料的
DefaultChannelPipeline類addLast0()方法
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev; // 1 新節點prev:尾節點前面那個
newCtx.next = tail; // 2 新節點next:尾巴
prev.next = newCtx; // 3 之前的尾節點前面那個next:新節點
tail.prev = newCtx; // 4 尾節點next:新節點
}
用下面這幅圖可見簡單的表示這段過程,說白了,其實就是一個雙向連結清單的插入操作
添加節點過程
操作完畢,該context就加入到pipeline中
添加節點之後
到這裡,pipeline添加節點的操作就完成了,你可以根據此思路掌握所有的addxxx()系列方法
3.4 源碼解析:callHandlerAdded0()方法,回調使用者方法并設定添加完成狀态
AbstractChannelHandlerContext
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
ctx.handler().handlerAdded(ctx); // ctx.handler() 表示context得到handler,然後調用hander的handlerAdded()方法
ctx.setAddComplete();
}
到了第四步,pipeline中的新節點添加完成,于是便開始回調使用者代碼 ctx.handler().handlerAdded(ctx);,常見的使用者代碼如下
AbstractChannelHandlerContext
public class DemoHandler extends SimpleChannelInboundHandler<...> {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 節點被添加完畢之後回調到此
// do something
}
}
接下來,設定該節點的狀态
AbstractChannelHandlerContext
final void setAddComplete() { // 設定添加成功
for (;;) { // 死循環
int oldState = handlerState; // 記錄之前的oldState
if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) { // 死循環,知道這個節點被删除或者被成功新增,才return結束循環,否則,不斷空轉
return;
}
}
}
用cas修改節點的狀态至:REMOVE_COMPLETE(說明該節點已經被移除) 或者 ADD_COMPLETE(該節點已經被添加)
四、pipeline删除節點
相關問題:介紹一下Netty中的Handler可插拔?
回答:netty有個最大的特性之一就是Handler可插拔,做到動态編織pipeline,
解釋:在首次建立連接配接的時候,需要通過進行權限認證,在認證通過之後,就可以将此context移除,下次pipeline在傳播事件的時候就就不會調用到權限認證處理器。
權限認證-删除Handler
下面是權限認證Handler最簡單的實作,第一個資料包傳來的是資料實體就是認證資訊,如果校驗通過,就删除此Handler,否則,直接關閉連接配接
// AuthHandler 是權限認證類
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
// 在channelRead0 pipeline删除節點的時候
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
if (verify(authDataPacket)) {
ctx.pipeline().remove(this); // context中的pipeline删除這個AuthHandler對象
} else {
ctx.close(); // 關閉這個context
}
}
private boolean verify(ByteBuf byteBuf) {
//...
}
}
重點就在 ctx.pipeline().remove(this) 這段代碼
@Override
public final ChannelPipeline remove(ChannelHandler handler) {
remove(getContextOrDie(handler)); // remove() 移除 就是pipeline雙向連結清單移除
return this;
}
remove操作相比add簡單不少,分為三個步驟:
-
- 找到待删除的節點
- 調整雙向連結清單指針删除
- 回調使用者函數
4.1 源碼解析:remove(getContextOrDie(handler))方法中的getContextOrDie(handler),使用循環周遊找到待删除的節點
DefaultChannelPipeline
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler); // 從這個實參handler得到context
if (ctx == null) { // context為null,抛出異常
throw new NoSuchElementException(handler.getClass().getName());
} else { // context不為null,傳回context,這個else可以不需要
return ctx;
}
}
// context方法概要:将一個handler包裝成一個context,或者,使用handler作為實參來執行個體化一個context
@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
AbstractChannelHandlerContext ctx = head.next; // pipeline中,head不存放元素,直接從head.next開始
for (;;) { // 死循環
if (ctx == null) { // 直到目前ctx為null,就是pipeline沒有了,return null
return null;
}
if (ctx.handler() == handler) { // 當pipeline中的節點ctx的handler == 實參handler , 就是找到了,就傳回這個context
return ctx;
}
ctx = ctx.next; // 不斷向後移動
}
}
這裡為了找到Handler對應的context,照樣是通過依次周遊雙向連結清單的方式,直到某一個context的Handler和目前Handler相同,便找到了該節點
4.2 源碼解析:remove(getContextOrDie(handler))方法中的remove0(ctx)方法,調整雙向連結清單指針删除
進入到remove(getContextOrDie(handler))方法
DefaultChannelPipeline
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
assert ctx != head && ctx != tail;
synchronized (this) {
// 2.調整雙向連結清單指針删除
remove0(ctx); // remove0删除
}
// 3.回調使用者函數
callHandlerRemoved0(ctx);
return ctx;
}
private static void remove0(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next; // 1
next.prev = prev; // 2
}
這個remove0()方法可以簡化
private static void remove0(AbstractChannelHandlerContext ctx) { // 要删除的節點是ctx
ctx.prev.next = ctx.next; // 1 修改ctx前驅節點的next指針指向
ctx.next.prev = ctx.prev; // 2 修改ctx後繼節點的prev指針指向
}
經曆的過程要比添加節點要簡單,可以用下面一幅圖來表示
删除節點過程
删除節點之後,最後的結果為
結合這兩幅圖,可以很清晰地了解權限驗證Handler的工作原理,另外,被删除的節點因為沒有對象引用到,果過段時間就會被gc自動回收
4.3 源碼解析:remove(getContextOrDie(handler))方法中的callHandlerRemoved0(ctx)方法,回調使用者函數
在callHandlerRemoved0(ctx)方法中,以這個被移除pipeline雙連結清單的ctx為參數,執行這個ctx的handler的handlerRemoved()方法,這個方法是使用者自定義方法,這個方法的參數是被移除pipeline雙連結清單的ctx
如果正常,就執行handlerRemoved()方法了
如果抛出異常,執行ctx.setRemoved()方法,僅僅設定這個類變量handlerState而已
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
try {
ctx.handler().handlerRemoved(ctx);
} finally {
ctx.setRemoved();
}
}
到了第三步,pipeline中的節點删除完成,于是便開始回調使用者代碼 ctx.handler().handlerRemoved(ctx);,常見的代碼如下
AbstractChannelHandlerContext
public class DemoHandler extends SimpleChannelInboundHandler<...> {
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 節點被删除完畢之後回調到此,可做一些資源清理
// do something
}
}
最後,将該節點的狀态設定為removed
final void setRemoved() {
handlerState = REMOVE_COMPLETE;
}
removexxx系列的其他方法族大同小異,你可以根據上面的思路展開其他的系列方法,這裡不再贅述
五、面試金手指
5.1 服務端初始化、新連接配接建立:從channel到pipeline
對于前面三篇 服務端初始化、netty中實作reactor原理、新連接配接建立小結:
通過前面的源碼系列文章中的netty reactor線程三部曲,我們已經知道,netty的reactor線程就像是一個發動機,驅動着整個netty架構的運作,
而服務端初始化和新連接配接的建立正是發動機的導火線,将發動機點燃
netty在服務端初始化和新連接配接建立的過程中會建立相應的channel,而與channel的動作密切相關的是pipeline這個概念,pipeline像是可以看作是一條流水線,原始的原料(位元組流)進來,經過加工,最後輸出
-
- 以新連接配接建立為例,新連接配接建立的過程中建立channel,而在建立channel的過程中建立了該channel對應的pipeline,建立完pipeline之後,自動給該pipeline添加了兩個節點(head tail),即ChannelHandlerContext,ChannelHandlerContext中有用pipeline和channel所有的上下文資訊。
- pipeline是雙向連結清單結構,添加和删除節點均隻需要調整連結清單結構
- pipeline中的每個節點包着具體的處理器ChannelHandler,節點根據ChannelHandler的類型是ChannelInboundHandler還是ChannelOutboundHandler來判斷該節點屬于in還是out或者兩者都是。
5.2 pipeline初始化
5.2.1 channel中的核心元件
金手指:pipeline初始化總述
小結:在【Netty源碼解析 003 新連接配接建立】中,對于服務端初始化和新連接配接建立,得到了channel的繼承體系,這裡對channel的成員變量解釋
一個channnel中有一個pipeline,底層功能由這個pipeline完成,如pipeline.read(),pipeline是一個非循環雙向連結清單,就是 newChannelPipeline(); new TailContext(this); new HeadContext(this);
NioSocketChannelConfig
ChannelId 就是newId,就是random
unsafe 就是 newUnsafe();
5.2.2 pipeline預設結構
pipeline中的每個節點是一個ChannelHandlerContext類對象,每個context節點儲存了它包裹的執行器 ChannelHandler 執行操作所需要的上下文,其實就是pipeline,因為pipeline包含了channel的引用,可以拿到所有的context資訊。
預設情況下,一條pipeline會有兩個節點,head和tail,下一篇文章【Netty中的pipeline(二)】我們具體分析這兩個特殊的節點,今天我們重點放在pipeline。
Pipeline屬性結構如下
public class DefaultChannelPipeline implements ChannelPipeline {
static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class); // 沒用,列印異常
private static final String HEAD_NAME = generateName0(DefaultChannelPipeline.HeadContext.class); // 一個常量,表示head的name
private static final String TAIL_NAME = generateName0(DefaultChannelPipeline.TailContext.class); // 一個常量,表示tail的name
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches = new FastThreadLocal() {
protected Map<Class<?>, String> initialValue() throws Exception {
return new WeakHashMap();
}
}; //存放名稱name的緩存,使用弱引用,表示緩存
final AbstractChannelHandlerContext head; // pipeline中的head指針
final AbstractChannelHandlerContext tail; // pipeline中的tail指針
private final Channel channel; // pipeline中包含channel引用
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;
private final boolean touch = ResourceLeakDetector.isEnabled();
private Map<EventExecutorGroup, EventExecutor> childExecutors;
private Handle estimatorHandle;
private boolean firstRegistration = true;
private DefaultChannelPipeline.PendingHandlerCallback pendingHandlerCallbackHead;
private boolean registered;
對于DefaultChannelPipeline類:
1、nameCaches:存放名稱name的緩存,使用弱引用,表示緩存,本文newContext()建立節點的使用filterName()節點名稱的時候使用
2、head tail:下一篇文章才講到
3、DefaultChannelPipeline構造函數中,設定channel,初始化tail head,并設定好tail head,就是DefaultChannelPipeline在建立号執行個體pipeline的時候,channel head tail就設定好了
5.3 pipeline添加節點
synchronized (this) { // 保證線程安全
// 1.檢查是否有重複handler 先對要添加的ChannelHandler類對象handler校驗
checkMultiplicity(handler);
// 2.建立節點 使用name和handler建立AbstractChannelHandlerContext 節點
newCtx = newContext(group, filterName(name, handler), handler);
// 3.添加節點 将建立的AbstractChannelHandlerContext 節點插入
addLast0(newCtx);
}
// 4.回調使用者方法
callHandlerAdded0(handler); // 添加完成,調用使用者自定義重寫的方法
1、checkMultiplicity()源碼解析:檢查是否有重複handler,先校驗
如果目前要添加的Handler是非共享的,并且已經添加過,那就抛出異常,否則,辨別該handler已經添加
金手指1:
如果一個Handler如果是sharable的,就可以無限次被添加到pipeline中,我們用戶端代碼如果要讓一個Handler被共用,隻需要加一個@Sharable标注即可,如下
@Sharable
public class BusinessHandler {
}
金手指2:如果Handler是sharable的,一般就通過spring的注入的方式使用,不需要每次都new 一個
isSharable() 方法正是通過該Handler對應的類是否标注@Sharable來判斷的,且看isSharable() 方法的源碼
ChannelHandlerAdapter類
public boolean isSharable() {
Class<?> clazz = getClass(); //得到位元組碼對象
Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = cache.get(clazz);
if (sharable == null) {
sharable = clazz.isAnnotationPresent(Sharable.class);
cache.put(clazz, sharable);
}
return sharable;
}
這裡也可以看到,netty為了性能優化到極緻,還使用了ThreadLocal來緩存Handler的狀态,高并發海量連接配接下,每次有新連接配接添加Handler都會建立調用此方法
2、newContext()源碼解析:建立節點
2.1 前奏:filterName()處理名稱
2.1 詳細:源碼解析filterName(),給handler建立一個唯一性的名字
我們傳入的name為null,netty就給我們生成一個預設的name,否則,檢查是否有重名,檢查通過的話就傳回
2.1 緩存 // nameCaches 意為名稱緩存,是FastThreadLocal類型,
// 建立FastThreadLocal類型對象,提供initialValue()方法的時候,這個initialValue()提供一個FastThreadLocal中的Map的具體類型,就是WeakHashMap,在Java四種引用類型裡面,這是弱引用,無論記憶體是否足夠,隻要 JVM 開始進行垃圾回收,那些被弱引用關聯的對象都會被回收,WeakHashMap 節約存儲空間,作為緩存一些不重要的資料,且看部落格【Java四種引用】
2.2 newContext()建立context
2.3 InBound和OutBound繼承體系
繼承體系一句話小結:
ChannelInboundHandler和ChannelOutboundHandler兩個都是接口(一定不能執行個體化對象,是以上面instanceOf一定是它們的子類對象),都是ChannelHandler的子接口
ChannelInboundHandler有實作類ChannelInboundHandlerAdapter,這個不是重點,
重點是,ChannelDuplexHandler抽象類既是ChannelInboundHandler接口實作類,也是ChannelOutboundHandler接口實作類,
然後MessageToMessageCodec抽象類是ChannelDuplexHandler抽象類的是實作類,它可以同時完成解碼decode和編碼encode的操作。
3、addLast0()方法源碼解析:尾添加節點,雙向連結清單尾插法添加四操作
為什麼addLast0()不是添加tail節點後面,而是添加到tail節點前面?
因為tail尾節點不存放資料,最後一個存放資料的節點就是tail節點的前驅節點。
注意:pipeline中的,head節點和tail節點都是不存放資料的
4、源碼解析:callHandlerAdded0()方法,回調使用者方法并設定添加完成狀态
5.4 pipeline删除節點
相關問題:介紹一下Netty中的Handler可插拔?
回答:netty有個最大的特性之一就是Handler可插拔,做到動态編織pipeline,
解釋:在首次建立連接配接的時候,需要通過進行權限認證,在認證通過之後,就可以将此context移除,下次pipeline在傳播事件的時候就就不會調用到權限認證處理器。
相關問題:pipeline删除節點三步驟
1、源碼解析:remove(getContextOrDie(handler))方法中的getContextOrDie(handler),使用循環周遊找到待删除的節點
2、源碼解析:remove(getContextOrDie(handler))方法中的remove0(ctx)方法,調整雙向連結清單指針删除
3、源碼解析:remove(getContextOrDie(handler))方法中的callHandlerRemoved0(ctx)方法,回調使用者函數
六、小結
【Netty源碼解析004】Netty中的pipeline(一),完成了。