天天看點

星辰大海,Netty中的pipeline(一)

文章目錄

​​一、前言​​​​二、pipeline 初始化​​

  • ​​2.1 channel中的核心元件​​
  • ​​2.2 pipeline預設結構​​
  • ​​三、pipeline添加節點​​
  • ​​3.0 DefaultChannelPipeline類中的addLast()方法源碼解析:Netty添加節點的時候,從源碼層面區分一個 ChannelHandlerContext到底是in還是out​​
  • ​​3.1 checkMultiplicity()源碼解析:檢查是否有重複handler,先校驗​​
  • ​​3.2 newContext()源碼解析:建立節點​​
  • ​​3.2.1 前奏:filterName()處理名稱​​
  • ​​3.2.2 newContext()建立context​​
  • ​​3.2.3 InBound和OutBound繼承體系​​
  • ​​3.3 addLast0()方法源碼解析:尾添加節點,雙向連結清單尾插法添加四操作​​
  • ​​3.4 源碼解析:callHandlerAdded0()方法,回調使用者方法并設定添加完成狀态​​
  • ​​四、pipeline删除節點​​
  • ​​4.1 源碼解析:remove(getContextOrDie(handler))方法中的getContextOrDie(handler),使用循環周遊找到待删除的節點​​
  • ​​4.2 源碼解析:remove(getContextOrDie(handler))方法中的remove0(ctx)方法,調整雙向連結清單指針删除​​
  • ​​4.3 源碼解析:remove(getContextOrDie(handler))方法中的callHandlerRemoved0(ctx)方法,回調使用者函數​​
  • ​​五、面試金手指​​
  • ​​5.1 服務端初始化、新連接配接建立:從channel到pipeline​​
  • ​​5.2 pipeline初始化​​
  • ​​5.2.1 channel中的核心元件​​
  • ​​5.2.2 pipeline預設結構​​
  • ​​5.3 pipeline添加節點​​
  • ​​5.4 pipeline删除節點​​
  • ​​六、小結​​

一、前言

對于前面三篇 服務端初始化、netty中實作reactor原理、新連接配接建立小結:

通過前面的源碼系列文章中的netty reactor線程三部曲,我們已經知道,netty的reactor線程就像是一個發動機,驅動着整個netty架構的運作,而服務端初始化和新連接配接的建立正是發動機的導火線,将發動機點燃

netty在服務端初始化和新連接配接建立的過程中會建立相應的channel,而與channel的動作密切相關的是pipeline這個概念,pipeline像是可以看作是一條流水線,原始的原料(位元組流)進來,經過加工,最後輸出

本文,我将以新連接配接的建立為例分為以下幾個部分給你介紹netty中的pipeline是怎麼玩轉起來的

pipeline 初始化

pipeline 添加節點

pipeline 删除節點

Pipeline第一篇:核心是pipeline非循環雙連結清單的插入和删除;

Pipeline第二篇:UnSafe接口+HeadContext類+inBound事件傳播+TailContext類+outBound事件傳播+異常Exception傳播。

二、pipeline 初始化

在新連接配接的建立這篇文章中,我們已經知道了建立NioSocketChannel的時候會将netty的核心元件建立出來

2.1 channel中的核心元件

星辰大海,Netty中的pipeline(一)

小結:在【Netty源碼解析 003 新連接配接建立】中,對于服務端初始化和新連接配接建立,得到了channel的繼承體系,這裡對channel的成員變量解釋

一個channnel中有一個pipeline,底層功能由這個pipeline完成,如pipeline.read(),pipeline是一個非循環雙向連結清單,就是 newChannelPipeline(); new TailContext(this); new HeadContext(this);

NioSocketChannelConfig

ChannelId 就是newId,就是random

unsafe 就是 newUnsafe();

pipeline是其中的一員,在下面這段代碼中被建立

AbstractChannel

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}      
AbstractChannel   newChannelPipeline()源碼解析

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

接上面,DefaultChannelPipeline  對于pipeline中的tail和head的處理

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}      

pipeline中儲存了channel的引用,建立完pipeline之後,整個pipeline是這個樣子的

2.2 pipeline預設結構

星辰大海,Netty中的pipeline(一)

pipeline中的每個節點是一個ChannelHandlerContext類對象,每個context節點儲存了它包裹的執行器 ChannelHandler 執行操作所需要的上下文,其實就是pipeline,因為pipeline包含了channel的引用,可以拿到所有的context資訊。

預設情況下,一條pipeline會有兩個節點,head和tail,下一篇文章【Netty中的pipeline(二)】我們具體分析這兩個特殊的節點,今天我們重點放在pipeline。

Pipeline屬性結構如下

public class DefaultChannelPipeline implements ChannelPipeline {
    static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);   // 沒用,列印異常
    private static final String HEAD_NAME = generateName0(DefaultChannelPipeline.HeadContext.class);    // 一個常量,表示head的name
    private static final String TAIL_NAME = generateName0(DefaultChannelPipeline.TailContext.class);    // 一個常量,表示tail的name,下一篇文章講head和tail的時候使用
    private static final FastThreadLocal<Map<Class<?>, String>> nameCaches = new FastThreadLocal() {
        protected Map<Class<?>, String> initialValue() throws Exception {
            return new WeakHashMap();
        }
    };    //存放名稱name的緩存,使用弱引用,表示緩存,本文newContext()建立節點的使用filterName()節點名稱的時候使用
    final AbstractChannelHandlerContext head;  // pipeline中的head指針,下一篇文章講述HeadContext類的使用用到
    final AbstractChannelHandlerContext tail;  // pipeline中的tail指針,下一篇文章講述TailContext類的使用用到
    private final Channel channel;   // pipeline中包含channel引用
    private final ChannelFuture succeededFuture;
    private final VoidChannelPromise voidPromise;
    private final boolean touch = ResourceLeakDetector.isEnabled();
    private Map<EventExecutorGroup, EventExecutor> childExecutors;
    private Handle estimatorHandle;
    private boolean firstRegistration = true;
    private DefaultChannelPipeline.PendingHandlerCallback pendingHandlerCallbackHead;
    private boolean registered;
    protected DefaultChannelPipeline(Channel channel) {   // DefaultChannelPipeline構造函數中設定channel,初始化tail head,并設定好tail head
        this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
        this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
        this.voidPromise = new VoidChannelPromise(channel, true);
        this.tail = new DefaultChannelPipeline.TailContext(this);
        this.head = new DefaultChannelPipeline.HeadContext(this);
        this.head.next = this.tail;
        this.tail.prev = this.head;
    }      

對于DefaultChannelPipeline類:

1、nameCaches:存放名稱name的緩存,使用弱引用,表示緩存,本文newContext()建立節點的使用filterName()節點名稱的時候使用

2、head tail:下一篇文章才講到

3、DefaultChannelPipeline構造函數中,設定channel,初始化tail head,并設定好tail head,就是DefaultChannelPipeline在建立号執行個體pipeline的時候,channel head tail就設定好了

三、pipeline添加節點

下面是一段非常常見的用戶端代碼

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         // head  outBound   寫出
         p.addLast(new Spliter())   // TCP拆包  inBound   讀入
         p.addLast(new Decoder());   // 解碼   inBound  讀入
         p.addLast(new BusinessHandler())    // 業務處理器  inBound  讀入
         p.addLast(new Encoder());   // 編碼   outBound   寫出
         // tail  inBound  讀入
     }
});      

首先,用一個spliter将來源TCP資料包拆包,然後将拆出來的包進行decoder,傳入業務處理器BusinessHandler,業務處理完encoder,輸出

整個pipeline結構如下

星辰大海,Netty中的pipeline(一)

我用兩種顔色區分了一下pipeline中兩種不同類型的節點,

第一種類型的Handler是 ChannelInboundHandler,處理inBound事件,最典型的就是讀取資料流,加工處理;

第二種類型的Handler是 ChannelOutboundHandler, 處理outBound事件,比如當調用writeAndFlush()類方法時,就會經過該種類型的handler

不管是哪種類型的handler,其外層對象 ChannelHandlerContext 之間都是通過雙向連結清單連接配接,而區分一個 ChannelHandlerContext到底是in還是out,在添加節點的時候我們就可以看到netty是怎麼處理的

3.0 DefaultChannelPipeline類中的addLast()方法源碼解析:Netty添加節點的時候,從源碼層面區分一個 ChannelHandlerContext到底是in還是out

DefaultChannelPipeline類中的addLast()方法

@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    for (ChannelHandler h: handlers) {   // 對于handlers周遊,每一個item添加到pipeline裡面去
        addLast(executor, null, h);    
    }
    return this;
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {   // 保證線程安全
        // 1.檢查是否有重複handler   先對要添加的ChannelHandler類對象handler校驗
        checkMultiplicity(handler);   
        // 2.建立節點      使用name和handler建立AbstractChannelHandlerContext 節點
        newCtx = newContext(group, filterName(name, handler), handler);
        // 3.添加節點     将建立的AbstractChannelHandlerContext 節點插入
        addLast0(newCtx);
    }

    // 4.回調使用者方法
    callHandlerAdded0(handler);   // 添加完成,調用使用者自定義重寫的方法

    return this;
}      

這裡簡單地用synchronized方法是為了防止多線程并發操作pipeline底層的雙向連結清單

我們還是逐漸分析上面這段代碼

3.1 checkMultiplicity()源碼解析:檢查是否有重複handler,先校驗

在使用者代碼添加一條handler的時候,首先會檢視該handler有沒有添加過

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {     // handler是ChannelHandlerAdapter的執行個體對象,就強制類型轉換
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;  // handler是ChannelHandlerAdapter的執行個體對象,就強制類型轉換
        if (!h.isSharable() && h.added) {   // 這個實參handler不是共享,且已經被添加到pipeline裡面(added标志位為true)抛出異常
            throw new ChannelPipelineException(
                    h.getClass().getName() +
                    " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        h.added = true;    // 因為下一步就要添加了,是以這裡設定這個item handler的标志位added為true
    }
}      

netty使用一個成員變量added辨別一個channel是否已經添加,上面這段代碼很簡單,如果目前要添加的Handler是非共享的,并且已經添加過,那就抛出異常,否則,辨別該handler已經添加

小結:checkMultiplicity()源碼解析:檢查是否有重複handler,先校驗

如果目前要添加的Handler是非共享的,并且已經添加過,那就抛出異常,否則,辨別該handler已經添加

金手指1:

如果一個Handler如果是sharable的,就可以無限次被添加到pipeline中,我們用戶端代碼如果要讓一個Handler被共用,隻需要加一個@Sharable标注即可,如下

@Sharable

public class BusinessHandler {

}

金手指2:如果Handler是sharable的,一般就通過spring的注入的方式使用,不需要每次都new 一個

isSharable() 方法正是通過該Handler對應的類是否标注@Sharable來判斷的,且看isSharable() 方法的源碼

ChannelHandlerAdapter類

public boolean isSharable() {

Class<?> clazz = getClass(); //得到位元組碼對象

Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();

Boolean sharable = cache.get(clazz);

if (sharable == null) {

sharable = clazz.isAnnotationPresent(Sharable.class);

cache.put(clazz, sharable);

}

return sharable;

}

這裡也可以看到,netty為了性能優化到極緻,還使用了ThreadLocal來緩存Handler的狀态,高并發海量連接配接下,每次有新連接配接添加Handler都會建立調用此方法

3.2 newContext()源碼解析:建立節點

回到主流程,看建立上下文這段代碼

3.2.1 前奏:filterName()處理名稱

newCtx = newContext(group, filterName(name, handler), handler);

這裡我們需要先分析 filterName(name, handler) 這段代碼,這個函數用于給handler建立一個唯一性的名字

private String filterName(String name, ChannelHandler handler) {
    if (name == null) {
        return generateName(handler);
    }
    checkDuplicateName(name);
    return name;
}      

小結:源碼解析filterName(),給handler建立一個唯一性的名字

我們傳入的name為null,netty就給我們生成一個預設的name,否則,檢查是否有重名,檢查通過的話就傳回

netty建立預設name的規則為 簡單類名#0,下面我們來看些具體是怎麼實作的

DefaultChannelPipeline類

// nameCaches 意為名稱緩存,是FastThreadLocal類型,
// 建立FastThreadLocal類型對象,提供initialValue()方法的時候,這個initialValue()提供一個FastThreadLocal中的Map的具體類型,就是WeakHashMap,在Java四種引用類型裡面,這是弱引用,無論記憶體是否足夠,隻要 JVM 開始進行垃圾回收,那些被弱引用關聯的對象都會被回收,WeakHashMap 節約存儲空間,作為緩存一些不重要的資料,且看部落格【Java四種引用】
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
        new FastThreadLocal<Map<Class<?>, String>>() {
    @Override
    protected Map<Class<?>, String> initialValue() throws Exception {
        return new WeakHashMap<Class<?>, String>();
    }
};

// 上面的name==null,要使用generateName生成随機名稱,這裡使用到上面的nameCaches 
private String generateName(ChannelHandler handler) {
    // 先檢視緩存中是否有生成過預設name
    Map<Class<?>, String> cache = nameCaches.get();  // FastThreadLocal類型的nameCaches得到一個元素cache,類型為Map<Class<?>, String>,就是一個map,key為位元組碼,value為String,這個map類型的cache才是緩存本體
    Class<?> handlerType = handler.getClass();  // 獲得實參item  handler的位元組碼
    String name = cache.get(handlerType);    // cache根據key 位元組碼得到String name
    // 沒有生成過,就生成一個預設name,加入緩存 
    if (name == null) {   // 如果這個位元組碼的name==null
        name = generateName0(handlerType);  // 生成一個,放進去,參數是位元組碼對象
        cache.put(handlerType, name);  // 生成一個,放進去
    }

    // 生成完了或者name!=null,還要看預設name有沒有沖突
    if (context0(name) != null) {   // context0(name)不是null,進入,說明生成的name有沖突,要進入處理沖突,是null,生成的name沒有沖突,直接return name,結束方法
        String baseName = name.substring(0, name.length() - 1);  // 得到baseName就是生成的name的前面n-1位
        for (int i = 1;; i ++) {  // 死循環
            String newName = baseName + i;  // baseName + i,湊上第n位,不斷循環
            if (context0(newName) == null) {  // 直到找到,context0(name)是null,就是name=newName break; return name;
                name = newName;  
                break;
            }
        }
    }
    return name;   // 最後傳回生成的name
}      

最後一段,檢測沖突的,這樣修改,好了解一些(不沖突,直接return name;沖突,不斷湊上最後一位,直到不沖突,傳回return newName;)

// 生成完了或者name!=null,還要看預設name有沒有沖突
    if (context0(name) != null) {   // context0(name)不是null,進入,說明生成的name有沖突,要進入處理沖突,是null,生成的name沒有沖突,直接return name,結束方法
        String baseName = name.substring(0, name.length() - 1);  // 得到baseName就是生成的name的前面n-1位
        for (int i = 1;; i ++) {  // 死循環 i沒有上限
            String newName = baseName + i;  // baseName + i,湊上第n位,不斷循環
            if (context0(newName) == null) {  // 直到找到,context0(name)是null,表示目前這個newName不沖突了
               return newName; // 傳回不沖突的newName
            }
        }
    }
    return name;   // 傳回不沖突的name      

其實,上面代碼中,有兩個地方我們需要繼續深入

generateName0() 當name==null,生成name

context0(name) 判斷name是否沖突

netty使用一個 FastThreadLocal(後面的文章會細說)變量來緩存Handler的類和預設名稱的映射關系,在生成name的時候,首先檢視緩存中有沒有生成過預設name(簡單類名#0),如果沒有生成,就調用generateName0()生成預設name,然後加入緩存

接下來還需要檢查name是否和已有的name有沖突,調用context0(),查找pipeline裡面有沒有對應的context

private static String generateName0(Class<?> handlerType) {   // handlerType是位元組碼對象
    return StringUtil.simpleClassName(handlerType) + "#0";
}

public static String simpleClassName(Class<?> clazz) {  // handlerType是位元組碼對象
    String className = ((Class)ObjectUtil.checkNotNull(clazz, "clazz")).getName();  //得到單純的類名
    int lastDotIdx = className.lastIndexOf(46);  // 從後面數第46個
    return lastDotIdx > -1?className.substring(lastDotIdx + 1):className;  // 如果這個數字大于-1,就是倒數46個存在,傳回className 0~lastDotIdx+1
    如果這個數字小于等于-1,其實就是等于-1,就是說倒數第46個不存在,直接傳回className整個類名
}      
private AbstractChannelHandlerContext context0(String name) {
    AbstractChannelHandlerContext context = head.next;   // 得到head後繼節點,從head後繼節點開始循環周遊,是否name沖突,為什麼不是從head開始,因為head是不存儲元素的
    while (context != tail) {  // 隻要還沒有到尾巴,即使這裡是tail,tail也是不存放元素的,這裡隻是比較name
        if (context.name().equals(name)) {
            return context; // 沖突,傳回沖突的節點,其實這個傳回值沒什麼用,因為隻要判斷一下是否為null就好,甚至可以将傳回值修改為boolean類型
        }
        context = context.next;   // 指針移動
    } 
    return null;   // 不沖突
}      

context0()方法連結清單周遊每一個 ChannelHandlerContext,隻要發現某個context的名字與待添加的name相同,就傳回該context,最後抛出異常,可以看到,這個其實是一個線性搜尋的過程

如果context0(name) != null 成立,說明現有的context裡面已經有了一個預設name,那麼就從 簡單類名#1 往上一直找,直到找到一個唯一的name,比如簡單類名#3( for (int i = 1;; i ++) { // 死循環 i沒有上限)

如果使用者代碼在添加Handler的時候指定了一個name,那麼要做到事僅僅為檢查一下是否有重複

private void checkDuplicateName(String name) {
    if (context0(name) != null) {  // 重複,抛出異常
        throw new IllegalArgumentException("Duplicate handler name: " + name);
    }
}      

處理完name之後,就進入到建立context的過程,由前面的調用鍊得知,group為null,是以childExecutor(group)也傳回null

3.2.2 newContext()建立context

DefaultChannelPipeline類

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

// 底層調用childExecutor()方法
private EventExecutor childExecutor(EventExecutorGroup group) {
    if (group == null) {
        return null;
    }
    //..
}

DefaultChannelHandlerContext類構造方法

DefaultChannelHandlerContext(
        DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
    super(pipeline, executor, name, isInbound(handler), isOutbound(handler));   // 調用父類,pipeline executor name三個參數都是傳遞過來的,對于inbound和outbound,是使用handler作為實參調用方法得到的
    if (handler == null) {   // handler為null,抛出異常
        throw new NullPointerException("handler");
    }
    // handler不為null,就指派
    this.handler = handler;
}      

構造函數中,DefaultChannelHandlerContext将參數回傳到父類,儲存Handler的引用,進入到其父類

AbstractChannelHandlerContext類,DefaultChannelHandlerContext的父類

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");   // 設定name
    this.pipeline = pipeline;     // 設定pipeline
    this.executor = executor;    // 設定executor
    this.inbound = inbound;   // 設定inbound
    this.outbound = outbound;   // 設定outbound
}      

netty中用兩個字段來表示這個channelHandlerContext屬于inBound還是outBound,或者兩者都是,兩個boolean是通過下面兩個小函數來判斷(見上面一段代碼)

DefaultChannelHandlerContext

private static boolean isInbound(ChannelHandler handler) {
    return handler instanceof ChannelInboundHandler;    //  方法很簡單,是否是ChannelInboundHandler類及其子類對象,是傳回true,不是就傳回false
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;   //  方法很簡單,是否是ChannelOutboundHandler類及其子類對象,是傳回true,不是就傳回false
}      

3.2.3 InBound和OutBound繼承體系

通過instanceof關鍵字根據接口類型來判斷,是以,如果一個Handler實作了兩類接口,那麼他既是一個inBound類型的Handler,又是一個outBound類型的Handler,比如下面這個ChannelDuplexHandler類:

星辰大海,Netty中的pipeline(一)

繼承體系一句話小結:

ChannelInboundHandler和ChannelOutboundHandler兩個都是接口(一定不能執行個體化對象,是以上面instanceOf一定是它們的子類對象),都是ChannelHandler的子接口

ChannelInboundHandler有實作類ChannelInboundHandlerAdapter,這個不是重點,

重點是,ChannelDuplexHandler抽象類既是ChannelInboundHandler接口實作類,也是ChannelOutboundHandler接口實作類,

然後MessageToMessageCodec抽象類是ChannelDuplexHandler抽象類的是實作類,它可以同時完成解碼decode和編碼encode的操作。

常用的,将decode操作和encode操作合并到一起的codec,一般會繼承 MessageToMessageCodec,而MessageToMessageCodec就是繼承ChannelDuplexHandler,它可以同時完成解碼decode和編碼encode的工作,如下:

MessageToMessageCodec

public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler {

    protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out)
            throws Exception;

    protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out)
            throws Exception;
 }      

context 建立完了之後,接下來終于要将建立完畢的context加入到pipeline中去了

3.3 addLast0()方法源碼解析:尾添加節點,雙向連結清單尾插法添加四操作

為什麼addLast0()不是添加tail節點後面,而是添加到tail節點前面?

因為tail尾節點不存放資料,最後一個存放資料的節點就是tail節點的前驅節點。

注意:pipeline中的,head節點和tail節點都是不存放資料的

DefaultChannelPipeline類addLast0()方法

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev; // 1  新節點prev:尾節點前面那個
    newCtx.next = tail; // 2  新節點next:尾巴
    prev.next = newCtx; // 3  之前的尾節點前面那個next:新節點
    tail.prev = newCtx; // 4  尾節點next:新節點
}      

用下面這幅圖可見簡單的表示這段過程,說白了,其實就是一個雙向連結清單的插入操作

添加節點過程

星辰大海,Netty中的pipeline(一)

操作完畢,該context就加入到pipeline中

添加節點之後

星辰大海,Netty中的pipeline(一)

到這裡,pipeline添加節點的操作就完成了,你可以根據此思路掌握所有的addxxx()系列方法

3.4 源碼解析:callHandlerAdded0()方法,回調使用者方法并設定添加完成狀态

AbstractChannelHandlerContext

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    ctx.handler().handlerAdded(ctx);    // ctx.handler() 表示context得到handler,然後調用hander的handlerAdded()方法
    ctx.setAddComplete();
}      

到了第四步,pipeline中的新節點添加完成,于是便開始回調使用者代碼 ctx.handler().handlerAdded(ctx);,常見的使用者代碼如下

AbstractChannelHandlerContext

public class DemoHandler extends SimpleChannelInboundHandler<...> {
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // 節點被添加完畢之後回調到此
        // do something
    }
}      

接下來,設定該節點的狀态

AbstractChannelHandlerContext

final void setAddComplete() {   // 設定添加成功
    for (;;) {   // 死循環
        int oldState = handlerState;   // 記錄之前的oldState
        if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {     // 死循環,知道這個節點被删除或者被成功新增,才return結束循環,否則,不斷空轉
            return;
        }
    }
}      

用cas修改節點的狀态至:REMOVE_COMPLETE(說明該節點已經被移除) 或者 ADD_COMPLETE(該節點已經被添加)

四、pipeline删除節點

相關問題:介紹一下Netty中的Handler可插拔?

回答:netty有個最大的特性之一就是Handler可插拔,做到動态編織pipeline,

解釋:在首次建立連接配接的時候,需要通過進行權限認證,在認證通過之後,就可以将此context移除,下次pipeline在傳播事件的時候就就不會調用到權限認證處理器。

權限認證-删除Handler

下面是權限認證Handler最簡單的實作,第一個資料包傳來的是資料實體就是認證資訊,如果校驗通過,就删除此Handler,否則,直接關閉連接配接

// AuthHandler 是權限認證類
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
    // 在channelRead0 pipeline删除節點的時候
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
        if (verify(authDataPacket)) {
            ctx.pipeline().remove(this);  // context中的pipeline删除這個AuthHandler對象
        } else {
            ctx.close();   // 關閉這個context
        }
    }

    private boolean verify(ByteBuf byteBuf) {
        //...
    }
}      

重點就在 ctx.pipeline().remove(this) 這段代碼

@Override
public final ChannelPipeline remove(ChannelHandler handler) {
    remove(getContextOrDie(handler));  // remove() 移除 就是pipeline雙向連結清單移除
    return this;
}      

remove操作相比add簡單不少,分為三個步驟:

  1. 找到待删除的節點
  2. 調整雙向連結清單指針删除
  3. 回調使用者函數

4.1 源碼解析:remove(getContextOrDie(handler))方法中的getContextOrDie(handler),使用循環周遊找到待删除的節點

DefaultChannelPipeline

private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
    AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);    // 從這個實參handler得到context
    if (ctx == null) {   // context為null,抛出異常
        throw new NoSuchElementException(handler.getClass().getName());
    } else {  // context不為null,傳回context,這個else可以不需要
        return ctx;
    }
}

// context方法概要:将一個handler包裝成一個context,或者,使用handler作為實參來執行個體化一個context

@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    }

    AbstractChannelHandlerContext ctx = head.next;  // pipeline中,head不存放元素,直接從head.next開始
    for (;;) {   // 死循環

        if (ctx == null) {  // 直到目前ctx為null,就是pipeline沒有了,return null
            return null;
        }

        if (ctx.handler() == handler) {  // 當pipeline中的節點ctx的handler == 實參handler , 就是找到了,就傳回這個context 
            return ctx;
        }

        ctx = ctx.next;  // 不斷向後移動
    }
}      

這裡為了找到Handler對應的context,照樣是通過依次周遊雙向連結清單的方式,直到某一個context的Handler和目前Handler相同,便找到了該節點

4.2 源碼解析:remove(getContextOrDie(handler))方法中的remove0(ctx)方法,調整雙向連結清單指針删除

進入到remove(getContextOrDie(handler))方法

DefaultChannelPipeline

private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
    assert ctx != head && ctx != tail;

    synchronized (this) {
        // 2.調整雙向連結清單指針删除
        remove0(ctx);  // remove0删除
    }
    // 3.回調使用者函數
    callHandlerRemoved0(ctx);
    return ctx;
}

private static void remove0(AbstractChannelHandlerContext ctx) {
    AbstractChannelHandlerContext prev = ctx.prev;  
    AbstractChannelHandlerContext next = ctx.next;
    prev.next = next; // 1  
    next.prev = prev; // 2
}

這個remove0()方法可以簡化
private static void remove0(AbstractChannelHandlerContext ctx) { // 要删除的節點是ctx
    ctx.prev.next = ctx.next; // 1   修改ctx前驅節點的next指針指向
    ctx.next.prev = ctx.prev; // 2   修改ctx後繼節點的prev指針指向
}      

經曆的過程要比添加節點要簡單,可以用下面一幅圖來表示

删除節點過程

星辰大海,Netty中的pipeline(一)

删除節點之後,最後的結果為

星辰大海,Netty中的pipeline(一)

結合這兩幅圖,可以很清晰地了解權限驗證Handler的工作原理,另外,被删除的節點因為沒有對象引用到,果過段時間就會被gc自動回收

4.3 源碼解析:remove(getContextOrDie(handler))方法中的callHandlerRemoved0(ctx)方法,回調使用者函數

在callHandlerRemoved0(ctx)方法中,以這個被移除pipeline雙連結清單的ctx為參數,執行這個ctx的handler的handlerRemoved()方法,這個方法是使用者自定義方法,這個方法的參數是被移除pipeline雙連結清單的ctx
如果正常,就執行handlerRemoved()方法了
如果抛出異常,執行ctx.setRemoved()方法,僅僅設定這個類變量handlerState而已
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
    try {
        ctx.handler().handlerRemoved(ctx);
    } finally {
        ctx.setRemoved();
    }
}      

到了第三步,pipeline中的節點删除完成,于是便開始回調使用者代碼 ctx.handler().handlerRemoved(ctx);,常見的代碼如下

AbstractChannelHandlerContext

public class DemoHandler extends SimpleChannelInboundHandler<...> {
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // 節點被删除完畢之後回調到此,可做一些資源清理
        // do something
    }
}      

最後,将該節點的狀态設定為removed

final void setRemoved() {
    handlerState = REMOVE_COMPLETE;
}      

removexxx系列的其他方法族大同小異,你可以根據上面的思路展開其他的系列方法,這裡不再贅述

五、面試金手指

5.1 服務端初始化、新連接配接建立:從channel到pipeline

對于前面三篇 服務端初始化、netty中實作reactor原理、新連接配接建立小結:

通過前面的源碼系列文章中的netty reactor線程三部曲,我們已經知道,netty的reactor線程就像是一個發動機,驅動着整個netty架構的運作,

而服務端初始化和新連接配接的建立正是發動機的導火線,将發動機點燃

netty在服務端初始化和新連接配接建立的過程中會建立相應的channel,而與channel的動作密切相關的是pipeline這個概念,pipeline像是可以看作是一條流水線,原始的原料(位元組流)進來,經過加工,最後輸出

  1. 以新連接配接建立為例,新連接配接建立的過程中建立channel,而在建立channel的過程中建立了該channel對應的pipeline,建立完pipeline之後,自動給該pipeline添加了兩個節點(head tail),即ChannelHandlerContext,ChannelHandlerContext中有用pipeline和channel所有的上下文資訊。
  2. pipeline是雙向連結清單結構,添加和删除節點均隻需要調整連結清單結構
  3. pipeline中的每個節點包着具體的處理器ChannelHandler,節點根據ChannelHandler的類型是ChannelInboundHandler還是ChannelOutboundHandler來判斷該節點屬于in還是out或者兩者都是。

5.2 pipeline初始化

5.2.1 channel中的核心元件

星辰大海,Netty中的pipeline(一)

金手指:pipeline初始化總述

小結:在【Netty源碼解析 003 新連接配接建立】中,對于服務端初始化和新連接配接建立,得到了channel的繼承體系,這裡對channel的成員變量解釋

一個channnel中有一個pipeline,底層功能由這個pipeline完成,如pipeline.read(),pipeline是一個非循環雙向連結清單,就是 newChannelPipeline(); new TailContext(this); new HeadContext(this);

NioSocketChannelConfig

ChannelId 就是newId,就是random

unsafe 就是 newUnsafe();

5.2.2 pipeline預設結構

星辰大海,Netty中的pipeline(一)

pipeline中的每個節點是一個ChannelHandlerContext類對象,每個context節點儲存了它包裹的執行器 ChannelHandler 執行操作所需要的上下文,其實就是pipeline,因為pipeline包含了channel的引用,可以拿到所有的context資訊。

預設情況下,一條pipeline會有兩個節點,head和tail,下一篇文章【Netty中的pipeline(二)】我們具體分析這兩個特殊的節點,今天我們重點放在pipeline。

Pipeline屬性結構如下

public class DefaultChannelPipeline implements ChannelPipeline {

static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class); // 沒用,列印異常

private static final String HEAD_NAME = generateName0(DefaultChannelPipeline.HeadContext.class); // 一個常量,表示head的name

private static final String TAIL_NAME = generateName0(DefaultChannelPipeline.TailContext.class); // 一個常量,表示tail的name

private static final FastThreadLocal<Map<Class<?>, String>> nameCaches = new FastThreadLocal() {

protected Map<Class<?>, String> initialValue() throws Exception {

return new WeakHashMap();

}

}; //存放名稱name的緩存,使用弱引用,表示緩存

final AbstractChannelHandlerContext head; // pipeline中的head指針

final AbstractChannelHandlerContext tail; // pipeline中的tail指針

private final Channel channel; // pipeline中包含channel引用

private final ChannelFuture succeededFuture;

private final VoidChannelPromise voidPromise;

private final boolean touch = ResourceLeakDetector.isEnabled();

private Map<EventExecutorGroup, EventExecutor> childExecutors;

private Handle estimatorHandle;

private boolean firstRegistration = true;

private DefaultChannelPipeline.PendingHandlerCallback pendingHandlerCallbackHead;

private boolean registered;

對于DefaultChannelPipeline類:

1、nameCaches:存放名稱name的緩存,使用弱引用,表示緩存,本文newContext()建立節點的使用filterName()節點名稱的時候使用

2、head tail:下一篇文章才講到

3、DefaultChannelPipeline構造函數中,設定channel,初始化tail head,并設定好tail head,就是DefaultChannelPipeline在建立号執行個體pipeline的時候,channel head tail就設定好了

5.3 pipeline添加節點

synchronized (this) { // 保證線程安全

// 1.檢查是否有重複handler 先對要添加的ChannelHandler類對象handler校驗

checkMultiplicity(handler);

// 2.建立節點 使用name和handler建立AbstractChannelHandlerContext 節點

newCtx = newContext(group, filterName(name, handler), handler);

// 3.添加節點 将建立的AbstractChannelHandlerContext 節點插入

addLast0(newCtx);

}

// 4.回調使用者方法

callHandlerAdded0(handler); // 添加完成,調用使用者自定義重寫的方法

1、checkMultiplicity()源碼解析:檢查是否有重複handler,先校驗

如果目前要添加的Handler是非共享的,并且已經添加過,那就抛出異常,否則,辨別該handler已經添加

金手指1:

如果一個Handler如果是sharable的,就可以無限次被添加到pipeline中,我們用戶端代碼如果要讓一個Handler被共用,隻需要加一個@Sharable标注即可,如下

@Sharable

public class BusinessHandler {

}

金手指2:如果Handler是sharable的,一般就通過spring的注入的方式使用,不需要每次都new 一個

isSharable() 方法正是通過該Handler對應的類是否标注@Sharable來判斷的,且看isSharable() 方法的源碼

ChannelHandlerAdapter類

public boolean isSharable() {

Class<?> clazz = getClass(); //得到位元組碼對象

Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();

Boolean sharable = cache.get(clazz);

if (sharable == null) {

sharable = clazz.isAnnotationPresent(Sharable.class);

cache.put(clazz, sharable);

}

return sharable;

}

這裡也可以看到,netty為了性能優化到極緻,還使用了ThreadLocal來緩存Handler的狀态,高并發海量連接配接下,每次有新連接配接添加Handler都會建立調用此方法

2、newContext()源碼解析:建立節點

2.1 前奏:filterName()處理名稱

2.1 詳細:源碼解析filterName(),給handler建立一個唯一性的名字

我們傳入的name為null,netty就給我們生成一個預設的name,否則,檢查是否有重名,檢查通過的話就傳回

2.1 緩存 // nameCaches 意為名稱緩存,是FastThreadLocal類型,

// 建立FastThreadLocal類型對象,提供initialValue()方法的時候,這個initialValue()提供一個FastThreadLocal中的Map的具體類型,就是WeakHashMap,在Java四種引用類型裡面,這是弱引用,無論記憶體是否足夠,隻要 JVM 開始進行垃圾回收,那些被弱引用關聯的對象都會被回收,WeakHashMap 節約存儲空間,作為緩存一些不重要的資料,且看部落格【Java四種引用】

2.2 newContext()建立context

2.3 InBound和OutBound繼承體系

星辰大海,Netty中的pipeline(一)

繼承體系一句話小結:

ChannelInboundHandler和ChannelOutboundHandler兩個都是接口(一定不能執行個體化對象,是以上面instanceOf一定是它們的子類對象),都是ChannelHandler的子接口

ChannelInboundHandler有實作類ChannelInboundHandlerAdapter,這個不是重點,

重點是,ChannelDuplexHandler抽象類既是ChannelInboundHandler接口實作類,也是ChannelOutboundHandler接口實作類,

然後MessageToMessageCodec抽象類是ChannelDuplexHandler抽象類的是實作類,它可以同時完成解碼decode和編碼encode的操作。

3、addLast0()方法源碼解析:尾添加節點,雙向連結清單尾插法添加四操作

為什麼addLast0()不是添加tail節點後面,而是添加到tail節點前面?

因為tail尾節點不存放資料,最後一個存放資料的節點就是tail節點的前驅節點。

注意:pipeline中的,head節點和tail節點都是不存放資料的

4、源碼解析:callHandlerAdded0()方法,回調使用者方法并設定添加完成狀态

5.4 pipeline删除節點

相關問題:介紹一下Netty中的Handler可插拔?

回答:netty有個最大的特性之一就是Handler可插拔,做到動态編織pipeline,

解釋:在首次建立連接配接的時候,需要通過進行權限認證,在認證通過之後,就可以将此context移除,下次pipeline在傳播事件的時候就就不會調用到權限認證處理器。

相關問題:pipeline删除節點三步驟

1、源碼解析:remove(getContextOrDie(handler))方法中的getContextOrDie(handler),使用循環周遊找到待删除的節點

2、源碼解析:remove(getContextOrDie(handler))方法中的remove0(ctx)方法,調整雙向連結清單指針删除

3、源碼解析:remove(getContextOrDie(handler))方法中的callHandlerRemoved0(ctx)方法,回調使用者函數

六、小結

【Netty源碼解析004】Netty中的pipeline(一),完成了。