天天看点

星辰大海,Netty中的pipeline(一)

文章目录

​​一、前言​​​​二、pipeline 初始化​​

  • ​​2.1 channel中的核心组件​​
  • ​​2.2 pipeline默认结构​​
  • ​​三、pipeline添加节点​​
  • ​​3.0 DefaultChannelPipeline类中的addLast()方法源码解析:Netty添加节点的时候,从源码层面区分一个 ChannelHandlerContext到底是in还是out​​
  • ​​3.1 checkMultiplicity()源码解析:检查是否有重复handler,先校验​​
  • ​​3.2 newContext()源码解析:创建节点​​
  • ​​3.2.1 前奏:filterName()处理名称​​
  • ​​3.2.2 newContext()创建context​​
  • ​​3.2.3 InBound和OutBound继承体系​​
  • ​​3.3 addLast0()方法源码解析:尾添加节点,双向链表尾插法添加四操作​​
  • ​​3.4 源码解析:callHandlerAdded0()方法,回调用户方法并设置添加完成状态​​
  • ​​四、pipeline删除节点​​
  • ​​4.1 源码解析:remove(getContextOrDie(handler))方法中的getContextOrDie(handler),使用循环遍历找到待删除的节点​​
  • ​​4.2 源码解析:remove(getContextOrDie(handler))方法中的remove0(ctx)方法,调整双向链表指针删除​​
  • ​​4.3 源码解析:remove(getContextOrDie(handler))方法中的callHandlerRemoved0(ctx)方法,回调用户函数​​
  • ​​五、面试金手指​​
  • ​​5.1 服务端初始化、新连接建立:从channel到pipeline​​
  • ​​5.2 pipeline初始化​​
  • ​​5.2.1 channel中的核心组件​​
  • ​​5.2.2 pipeline默认结构​​
  • ​​5.3 pipeline添加节点​​
  • ​​5.4 pipeline删除节点​​
  • ​​六、小结​​

一、前言

对于前面三篇 服务端初始化、netty中实现reactor原理、新连接建立小结:

通过前面的源码系列文章中的netty reactor线程三部曲,我们已经知道,netty的reactor线程就像是一个发动机,驱动着整个netty框架的运行,而服务端初始化和新连接的建立正是发动机的导火线,将发动机点燃

netty在服务端初始化和新连接建立的过程中会建立相应的channel,而与channel的动作密切相关的是pipeline这个概念,pipeline像是可以看作是一条流水线,原始的原料(字节流)进来,经过加工,最后输出

本文,我将以新连接的建立为例分为以下几个部分给你介绍netty中的pipeline是怎么玩转起来的

pipeline 初始化

pipeline 添加节点

pipeline 删除节点

Pipeline第一篇:核心是pipeline非循环双链表的插入和删除;

Pipeline第二篇:UnSafe接口+HeadContext类+inBound事件传播+TailContext类+outBound事件传播+异常Exception传播。

二、pipeline 初始化

在新连接的建立这篇文章中,我们已经知道了创建NioSocketChannel的时候会将netty的核心组件创建出来

2.1 channel中的核心组件

星辰大海,Netty中的pipeline(一)

小结:在【Netty源码解析 003 新连接建立】中,对于服务端初始化和新连接建立,得到了channel的继承体系,这里对channel的成员变量解释

一个channnel中有一个pipeline,底层功能由这个pipeline完成,如pipeline.read(),pipeline是一个非循环双向链表,就是 newChannelPipeline(); new TailContext(this); new HeadContext(this);

NioSocketChannelConfig

ChannelId 就是newId,就是random

unsafe 就是 newUnsafe();

pipeline是其中的一员,在下面这段代码中被创建

AbstractChannel

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}      
AbstractChannel   newChannelPipeline()源码解析

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

接上面,DefaultChannelPipeline  对于pipeline中的tail和head的处理

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}      

pipeline中保存了channel的引用,创建完pipeline之后,整个pipeline是这个样子的

2.2 pipeline默认结构

星辰大海,Netty中的pipeline(一)

pipeline中的每个节点是一个ChannelHandlerContext类对象,每个context节点保存了它包裹的执行器 ChannelHandler 执行操作所需要的上下文,其实就是pipeline,因为pipeline包含了channel的引用,可以拿到所有的context信息。

默认情况下,一条pipeline会有两个节点,head和tail,下一篇文章【Netty中的pipeline(二)】我们具体分析这两个特殊的节点,今天我们重点放在pipeline。

Pipeline属性结构如下

public class DefaultChannelPipeline implements ChannelPipeline {
    static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);   // 没用,打印异常
    private static final String HEAD_NAME = generateName0(DefaultChannelPipeline.HeadContext.class);    // 一个常量,表示head的name
    private static final String TAIL_NAME = generateName0(DefaultChannelPipeline.TailContext.class);    // 一个常量,表示tail的name,下一篇文章讲head和tail的时候使用
    private static final FastThreadLocal<Map<Class<?>, String>> nameCaches = new FastThreadLocal() {
        protected Map<Class<?>, String> initialValue() throws Exception {
            return new WeakHashMap();
        }
    };    //存放名称name的缓存,使用弱引用,表示缓存,本文newContext()创建节点的使用filterName()节点名称的时候使用
    final AbstractChannelHandlerContext head;  // pipeline中的head指针,下一篇文章讲述HeadContext类的使用用到
    final AbstractChannelHandlerContext tail;  // pipeline中的tail指针,下一篇文章讲述TailContext类的使用用到
    private final Channel channel;   // pipeline中包含channel引用
    private final ChannelFuture succeededFuture;
    private final VoidChannelPromise voidPromise;
    private final boolean touch = ResourceLeakDetector.isEnabled();
    private Map<EventExecutorGroup, EventExecutor> childExecutors;
    private Handle estimatorHandle;
    private boolean firstRegistration = true;
    private DefaultChannelPipeline.PendingHandlerCallback pendingHandlerCallbackHead;
    private boolean registered;
    protected DefaultChannelPipeline(Channel channel) {   // DefaultChannelPipeline构造函数中设置channel,初始化tail head,并设置好tail head
        this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
        this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
        this.voidPromise = new VoidChannelPromise(channel, true);
        this.tail = new DefaultChannelPipeline.TailContext(this);
        this.head = new DefaultChannelPipeline.HeadContext(this);
        this.head.next = this.tail;
        this.tail.prev = this.head;
    }      

对于DefaultChannelPipeline类:

1、nameCaches:存放名称name的缓存,使用弱引用,表示缓存,本文newContext()创建节点的使用filterName()节点名称的时候使用

2、head tail:下一篇文章才讲到

3、DefaultChannelPipeline构造函数中,设置channel,初始化tail head,并设置好tail head,就是DefaultChannelPipeline在新建号实例pipeline的时候,channel head tail就设置好了

三、pipeline添加节点

下面是一段非常常见的客户端代码

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         // head  outBound   写出
         p.addLast(new Spliter())   // TCP拆包  inBound   读入
         p.addLast(new Decoder());   // 解码   inBound  读入
         p.addLast(new BusinessHandler())    // 业务处理器  inBound  读入
         p.addLast(new Encoder());   // 编码   outBound   写出
         // tail  inBound  读入
     }
});      

首先,用一个spliter将来源TCP数据包拆包,然后将拆出来的包进行decoder,传入业务处理器BusinessHandler,业务处理完encoder,输出

整个pipeline结构如下

星辰大海,Netty中的pipeline(一)

我用两种颜色区分了一下pipeline中两种不同类型的节点,

第一种类型的Handler是 ChannelInboundHandler,处理inBound事件,最典型的就是读取数据流,加工处理;

第二种类型的Handler是 ChannelOutboundHandler, 处理outBound事件,比如当调用writeAndFlush()类方法时,就会经过该种类型的handler

不管是哪种类型的handler,其外层对象 ChannelHandlerContext 之间都是通过双向链表连接,而区分一个 ChannelHandlerContext到底是in还是out,在添加节点的时候我们就可以看到netty是怎么处理的

3.0 DefaultChannelPipeline类中的addLast()方法源码解析:Netty添加节点的时候,从源码层面区分一个 ChannelHandlerContext到底是in还是out

DefaultChannelPipeline类中的addLast()方法

@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    for (ChannelHandler h: handlers) {   // 对于handlers遍历,每一个item添加到pipeline里面去
        addLast(executor, null, h);    
    }
    return this;
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {   // 保证线程安全
        // 1.检查是否有重复handler   先对要添加的ChannelHandler类对象handler校验
        checkMultiplicity(handler);   
        // 2.创建节点      使用name和handler新建AbstractChannelHandlerContext 节点
        newCtx = newContext(group, filterName(name, handler), handler);
        // 3.添加节点     将新建的AbstractChannelHandlerContext 节点插入
        addLast0(newCtx);
    }

    // 4.回调用户方法
    callHandlerAdded0(handler);   // 添加完成,调用用户自定义重写的方法

    return this;
}      

这里简单地用synchronized方法是为了防止多线程并发操作pipeline底层的双向链表

我们还是逐步分析上面这段代码

3.1 checkMultiplicity()源码解析:检查是否有重复handler,先校验

在用户代码添加一条handler的时候,首先会查看该handler有没有添加过

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {     // handler是ChannelHandlerAdapter的实例对象,就强制类型转换
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;  // handler是ChannelHandlerAdapter的实例对象,就强制类型转换
        if (!h.isSharable() && h.added) {   // 这个实参handler不是共享,且已经被添加到pipeline里面(added标志位为true)抛出异常
            throw new ChannelPipelineException(
                    h.getClass().getName() +
                    " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        h.added = true;    // 因为下一步就要添加了,所以这里设置这个item handler的标志位added为true
    }
}      

netty使用一个成员变量added标识一个channel是否已经添加,上面这段代码很简单,如果当前要添加的Handler是非共享的,并且已经添加过,那就抛出异常,否则,标识该handler已经添加

小结:checkMultiplicity()源码解析:检查是否有重复handler,先校验

如果当前要添加的Handler是非共享的,并且已经添加过,那就抛出异常,否则,标识该handler已经添加

金手指1:

如果一个Handler如果是sharable的,就可以无限次被添加到pipeline中,我们客户端代码如果要让一个Handler被共用,只需要加一个@Sharable标注即可,如下

@Sharable

public class BusinessHandler {

}

金手指2:如果Handler是sharable的,一般就通过spring的注入的方式使用,不需要每次都new 一个

isSharable() 方法正是通过该Handler对应的类是否标注@Sharable来判断的,且看isSharable() 方法的源码

ChannelHandlerAdapter类

public boolean isSharable() {

Class<?> clazz = getClass(); //得到字节码对象

Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();

Boolean sharable = cache.get(clazz);

if (sharable == null) {

sharable = clazz.isAnnotationPresent(Sharable.class);

cache.put(clazz, sharable);

}

return sharable;

}

这里也可以看到,netty为了性能优化到极致,还使用了ThreadLocal来缓存Handler的状态,高并发海量连接下,每次有新连接添加Handler都会创建调用此方法

3.2 newContext()源码解析:创建节点

回到主流程,看创建上下文这段代码

3.2.1 前奏:filterName()处理名称

newCtx = newContext(group, filterName(name, handler), handler);

这里我们需要先分析 filterName(name, handler) 这段代码,这个函数用于给handler创建一个唯一性的名字

private String filterName(String name, ChannelHandler handler) {
    if (name == null) {
        return generateName(handler);
    }
    checkDuplicateName(name);
    return name;
}      

小结:源码解析filterName(),给handler创建一个唯一性的名字

我们传入的name为null,netty就给我们生成一个默认的name,否则,检查是否有重名,检查通过的话就返回

netty创建默认name的规则为 简单类名#0,下面我们来看些具体是怎么实现的

DefaultChannelPipeline类

// nameCaches 意为名称缓存,是FastThreadLocal类型,
// 新建FastThreadLocal类型对象,提供initialValue()方法的时候,这个initialValue()提供一个FastThreadLocal中的Map的具体类型,就是WeakHashMap,在Java四种引用类型里面,这是弱引用,无论内存是否足够,只要 JVM 开始进行垃圾回收,那些被弱引用关联的对象都会被回收,WeakHashMap 节约存储空间,作为缓存一些不重要的数据,且看博客【Java四种引用】
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
        new FastThreadLocal<Map<Class<?>, String>>() {
    @Override
    protected Map<Class<?>, String> initialValue() throws Exception {
        return new WeakHashMap<Class<?>, String>();
    }
};

// 上面的name==null,要使用generateName生成随机名称,这里使用到上面的nameCaches 
private String generateName(ChannelHandler handler) {
    // 先查看缓存中是否有生成过默认name
    Map<Class<?>, String> cache = nameCaches.get();  // FastThreadLocal类型的nameCaches得到一个元素cache,类型为Map<Class<?>, String>,就是一个map,key为字节码,value为String,这个map类型的cache才是缓存本体
    Class<?> handlerType = handler.getClass();  // 获得实参item  handler的字节码
    String name = cache.get(handlerType);    // cache根据key 字节码得到String name
    // 没有生成过,就生成一个默认name,加入缓存 
    if (name == null) {   // 如果这个字节码的name==null
        name = generateName0(handlerType);  // 生成一个,放进去,参数是字节码对象
        cache.put(handlerType, name);  // 生成一个,放进去
    }

    // 生成完了或者name!=null,还要看默认name有没有冲突
    if (context0(name) != null) {   // context0(name)不是null,进入,说明生成的name有冲突,要进入处理冲突,是null,生成的name没有冲突,直接return name,结束方法
        String baseName = name.substring(0, name.length() - 1);  // 得到baseName就是生成的name的前面n-1位
        for (int i = 1;; i ++) {  // 死循环
            String newName = baseName + i;  // baseName + i,凑上第n位,不断循环
            if (context0(newName) == null) {  // 直到找到,context0(name)是null,就是name=newName break; return name;
                name = newName;  
                break;
            }
        }
    }
    return name;   // 最后返回生成的name
}      

最后一段,检测冲突的,这样修改,好理解一些(不冲突,直接return name;冲突,不断凑上最后一位,直到不冲突,返回return newName;)

// 生成完了或者name!=null,还要看默认name有没有冲突
    if (context0(name) != null) {   // context0(name)不是null,进入,说明生成的name有冲突,要进入处理冲突,是null,生成的name没有冲突,直接return name,结束方法
        String baseName = name.substring(0, name.length() - 1);  // 得到baseName就是生成的name的前面n-1位
        for (int i = 1;; i ++) {  // 死循环 i没有上限
            String newName = baseName + i;  // baseName + i,凑上第n位,不断循环
            if (context0(newName) == null) {  // 直到找到,context0(name)是null,表示当前这个newName不冲突了
               return newName; // 返回不冲突的newName
            }
        }
    }
    return name;   // 返回不冲突的name      

其实,上面代码中,有两个地方我们需要继续深入

generateName0() 当name==null,生成name

context0(name) 判断name是否冲突

netty使用一个 FastThreadLocal(后面的文章会细说)变量来缓存Handler的类和默认名称的映射关系,在生成name的时候,首先查看缓存中有没有生成过默认name(简单类名#0),如果没有生成,就调用generateName0()生成默认name,然后加入缓存

接下来还需要检查name是否和已有的name有冲突,调用context0(),查找pipeline里面有没有对应的context

private static String generateName0(Class<?> handlerType) {   // handlerType是字节码对象
    return StringUtil.simpleClassName(handlerType) + "#0";
}

public static String simpleClassName(Class<?> clazz) {  // handlerType是字节码对象
    String className = ((Class)ObjectUtil.checkNotNull(clazz, "clazz")).getName();  //得到单纯的类名
    int lastDotIdx = className.lastIndexOf(46);  // 从后面数第46个
    return lastDotIdx > -1?className.substring(lastDotIdx + 1):className;  // 如果这个数字大于-1,就是倒数46个存在,返回className 0~lastDotIdx+1
    如果这个数字小于等于-1,其实就是等于-1,就是说倒数第46个不存在,直接返回className整个类名
}      
private AbstractChannelHandlerContext context0(String name) {
    AbstractChannelHandlerContext context = head.next;   // 得到head后继节点,从head后继节点开始循环遍历,是否name冲突,为什么不是从head开始,因为head是不存储元素的
    while (context != tail) {  // 只要还没有到尾巴,即使这里是tail,tail也是不存放元素的,这里只是比较name
        if (context.name().equals(name)) {
            return context; // 冲突,返回冲突的节点,其实这个返回值没什么用,因为只要判断一下是否为null就好,甚至可以将返回值修改为boolean类型
        }
        context = context.next;   // 指针移动
    } 
    return null;   // 不冲突
}      

context0()方法链表遍历每一个 ChannelHandlerContext,只要发现某个context的名字与待添加的name相同,就返回该context,最后抛出异常,可以看到,这个其实是一个线性搜索的过程

如果context0(name) != null 成立,说明现有的context里面已经有了一个默认name,那么就从 简单类名#1 往上一直找,直到找到一个唯一的name,比如简单类名#3( for (int i = 1;; i ++) { // 死循环 i没有上限)

如果用户代码在添加Handler的时候指定了一个name,那么要做到事仅仅为检查一下是否有重复

private void checkDuplicateName(String name) {
    if (context0(name) != null) {  // 重复,抛出异常
        throw new IllegalArgumentException("Duplicate handler name: " + name);
    }
}      

处理完name之后,就进入到创建context的过程,由前面的调用链得知,group为null,因此childExecutor(group)也返回null

3.2.2 newContext()创建context

DefaultChannelPipeline类

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

// 底层调用childExecutor()方法
private EventExecutor childExecutor(EventExecutorGroup group) {
    if (group == null) {
        return null;
    }
    //..
}

DefaultChannelHandlerContext类构造方法

DefaultChannelHandlerContext(
        DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
    super(pipeline, executor, name, isInbound(handler), isOutbound(handler));   // 调用父类,pipeline executor name三个参数都是传递过来的,对于inbound和outbound,是使用handler作为实参调用方法得到的
    if (handler == null) {   // handler为null,抛出异常
        throw new NullPointerException("handler");
    }
    // handler不为null,就赋值
    this.handler = handler;
}      

构造函数中,DefaultChannelHandlerContext将参数回传到父类,保存Handler的引用,进入到其父类

AbstractChannelHandlerContext类,DefaultChannelHandlerContext的父类

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");   // 设置name
    this.pipeline = pipeline;     // 设置pipeline
    this.executor = executor;    // 设置executor
    this.inbound = inbound;   // 设置inbound
    this.outbound = outbound;   // 设置outbound
}      

netty中用两个字段来表示这个channelHandlerContext属于inBound还是outBound,或者两者都是,两个boolean是通过下面两个小函数来判断(见上面一段代码)

DefaultChannelHandlerContext

private static boolean isInbound(ChannelHandler handler) {
    return handler instanceof ChannelInboundHandler;    //  方法很简单,是否是ChannelInboundHandler类及其子类对象,是返回true,不是就返回false
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;   //  方法很简单,是否是ChannelOutboundHandler类及其子类对象,是返回true,不是就返回false
}      

3.2.3 InBound和OutBound继承体系

通过instanceof关键字根据接口类型来判断,因此,如果一个Handler实现了两类接口,那么他既是一个inBound类型的Handler,又是一个outBound类型的Handler,比如下面这个ChannelDuplexHandler类:

星辰大海,Netty中的pipeline(一)

继承体系一句话小结:

ChannelInboundHandler和ChannelOutboundHandler两个都是接口(一定不能实例化对象,所以上面instanceOf一定是它们的子类对象),都是ChannelHandler的子接口

ChannelInboundHandler有实现类ChannelInboundHandlerAdapter,这个不是重点,

重点是,ChannelDuplexHandler抽象类既是ChannelInboundHandler接口实现类,也是ChannelOutboundHandler接口实现类,

然后MessageToMessageCodec抽象类是ChannelDuplexHandler抽象类的是实现类,它可以同时完成解码decode和编码encode的操作。

常用的,将decode操作和encode操作合并到一起的codec,一般会继承 MessageToMessageCodec,而MessageToMessageCodec就是继承ChannelDuplexHandler,它可以同时完成解码decode和编码encode的工作,如下:

MessageToMessageCodec

public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler {

    protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out)
            throws Exception;

    protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out)
            throws Exception;
 }      

context 创建完了之后,接下来终于要将创建完毕的context加入到pipeline中去了

3.3 addLast0()方法源码解析:尾添加节点,双向链表尾插法添加四操作

为什么addLast0()不是添加tail节点后面,而是添加到tail节点前面?

因为tail尾节点不存放数据,最后一个存放数据的节点就是tail节点的前驱节点。

注意:pipeline中的,head节点和tail节点都是不存放数据的

DefaultChannelPipeline类addLast0()方法

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev; // 1  新节点prev:尾节点前面那个
    newCtx.next = tail; // 2  新节点next:尾巴
    prev.next = newCtx; // 3  之前的尾节点前面那个next:新节点
    tail.prev = newCtx; // 4  尾节点next:新节点
}      

用下面这幅图可见简单的表示这段过程,说白了,其实就是一个双向链表的插入操作

添加节点过程

星辰大海,Netty中的pipeline(一)

操作完毕,该context就加入到pipeline中

添加节点之后

星辰大海,Netty中的pipeline(一)

到这里,pipeline添加节点的操作就完成了,你可以根据此思路掌握所有的addxxx()系列方法

3.4 源码解析:callHandlerAdded0()方法,回调用户方法并设置添加完成状态

AbstractChannelHandlerContext

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    ctx.handler().handlerAdded(ctx);    // ctx.handler() 表示context得到handler,然后调用hander的handlerAdded()方法
    ctx.setAddComplete();
}      

到了第四步,pipeline中的新节点添加完成,于是便开始回调用户代码 ctx.handler().handlerAdded(ctx);,常见的用户代码如下

AbstractChannelHandlerContext

public class DemoHandler extends SimpleChannelInboundHandler<...> {
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // 节点被添加完毕之后回调到此
        // do something
    }
}      

接下来,设置该节点的状态

AbstractChannelHandlerContext

final void setAddComplete() {   // 设置添加成功
    for (;;) {   // 死循环
        int oldState = handlerState;   // 记录之前的oldState
        if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {     // 死循环,知道这个节点被删除或者被成功新增,才return结束循环,否则,不断空转
            return;
        }
    }
}      

用cas修改节点的状态至:REMOVE_COMPLETE(说明该节点已经被移除) 或者 ADD_COMPLETE(该节点已经被添加)

四、pipeline删除节点

相关问题:介绍一下Netty中的Handler可插拔?

回答:netty有个最大的特性之一就是Handler可插拔,做到动态编织pipeline,

解释:在首次建立连接的时候,需要通过进行权限认证,在认证通过之后,就可以将此context移除,下次pipeline在传播事件的时候就就不会调用到权限认证处理器。

权限认证-删除Handler

下面是权限认证Handler最简单的实现,第一个数据包传来的是数据实体就是认证信息,如果校验通过,就删除此Handler,否则,直接关闭连接

// AuthHandler 是权限认证类
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
    // 在channelRead0 pipeline删除节点的时候
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
        if (verify(authDataPacket)) {
            ctx.pipeline().remove(this);  // context中的pipeline删除这个AuthHandler对象
        } else {
            ctx.close();   // 关闭这个context
        }
    }

    private boolean verify(ByteBuf byteBuf) {
        //...
    }
}      

重点就在 ctx.pipeline().remove(this) 这段代码

@Override
public final ChannelPipeline remove(ChannelHandler handler) {
    remove(getContextOrDie(handler));  // remove() 移除 就是pipeline双向链表移除
    return this;
}      

remove操作相比add简单不少,分为三个步骤:

  1. 找到待删除的节点
  2. 调整双向链表指针删除
  3. 回调用户函数

4.1 源码解析:remove(getContextOrDie(handler))方法中的getContextOrDie(handler),使用循环遍历找到待删除的节点

DefaultChannelPipeline

private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
    AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);    // 从这个实参handler得到context
    if (ctx == null) {   // context为null,抛出异常
        throw new NoSuchElementException(handler.getClass().getName());
    } else {  // context不为null,返回context,这个else可以不需要
        return ctx;
    }
}

// context方法概要:将一个handler包装成一个context,或者,使用handler作为实参来实例化一个context

@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    }

    AbstractChannelHandlerContext ctx = head.next;  // pipeline中,head不存放元素,直接从head.next开始
    for (;;) {   // 死循环

        if (ctx == null) {  // 直到当前ctx为null,就是pipeline没有了,return null
            return null;
        }

        if (ctx.handler() == handler) {  // 当pipeline中的节点ctx的handler == 实参handler , 就是找到了,就返回这个context 
            return ctx;
        }

        ctx = ctx.next;  // 不断向后移动
    }
}      

这里为了找到Handler对应的context,照样是通过依次遍历双向链表的方式,直到某一个context的Handler和当前Handler相同,便找到了该节点

4.2 源码解析:remove(getContextOrDie(handler))方法中的remove0(ctx)方法,调整双向链表指针删除

进入到remove(getContextOrDie(handler))方法

DefaultChannelPipeline

private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
    assert ctx != head && ctx != tail;

    synchronized (this) {
        // 2.调整双向链表指针删除
        remove0(ctx);  // remove0删除
    }
    // 3.回调用户函数
    callHandlerRemoved0(ctx);
    return ctx;
}

private static void remove0(AbstractChannelHandlerContext ctx) {
    AbstractChannelHandlerContext prev = ctx.prev;  
    AbstractChannelHandlerContext next = ctx.next;
    prev.next = next; // 1  
    next.prev = prev; // 2
}

这个remove0()方法可以简化
private static void remove0(AbstractChannelHandlerContext ctx) { // 要删除的节点是ctx
    ctx.prev.next = ctx.next; // 1   修改ctx前驱节点的next指针指向
    ctx.next.prev = ctx.prev; // 2   修改ctx后继节点的prev指针指向
}      

经历的过程要比添加节点要简单,可以用下面一幅图来表示

删除节点过程

星辰大海,Netty中的pipeline(一)

删除节点之后,最后的结果为

星辰大海,Netty中的pipeline(一)

结合这两幅图,可以很清晰地了解权限验证Handler的工作原理,另外,被删除的节点因为没有对象引用到,果过段时间就会被gc自动回收

4.3 源码解析:remove(getContextOrDie(handler))方法中的callHandlerRemoved0(ctx)方法,回调用户函数

在callHandlerRemoved0(ctx)方法中,以这个被移除pipeline双链表的ctx为参数,执行这个ctx的handler的handlerRemoved()方法,这个方法是用户自定义方法,这个方法的参数是被移除pipeline双链表的ctx
如果正常,就执行handlerRemoved()方法了
如果抛出异常,执行ctx.setRemoved()方法,仅仅设置这个类变量handlerState而已
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
    try {
        ctx.handler().handlerRemoved(ctx);
    } finally {
        ctx.setRemoved();
    }
}      

到了第三步,pipeline中的节点删除完成,于是便开始回调用户代码 ctx.handler().handlerRemoved(ctx);,常见的代码如下

AbstractChannelHandlerContext

public class DemoHandler extends SimpleChannelInboundHandler<...> {
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // 节点被删除完毕之后回调到此,可做一些资源清理
        // do something
    }
}      

最后,将该节点的状态设置为removed

final void setRemoved() {
    handlerState = REMOVE_COMPLETE;
}      

removexxx系列的其他方法族大同小异,你可以根据上面的思路展开其他的系列方法,这里不再赘述

五、面试金手指

5.1 服务端初始化、新连接建立:从channel到pipeline

对于前面三篇 服务端初始化、netty中实现reactor原理、新连接建立小结:

通过前面的源码系列文章中的netty reactor线程三部曲,我们已经知道,netty的reactor线程就像是一个发动机,驱动着整个netty框架的运行,

而服务端初始化和新连接的建立正是发动机的导火线,将发动机点燃

netty在服务端初始化和新连接建立的过程中会建立相应的channel,而与channel的动作密切相关的是pipeline这个概念,pipeline像是可以看作是一条流水线,原始的原料(字节流)进来,经过加工,最后输出

  1. 以新连接创建为例,新连接创建的过程中创建channel,而在创建channel的过程中创建了该channel对应的pipeline,创建完pipeline之后,自动给该pipeline添加了两个节点(head tail),即ChannelHandlerContext,ChannelHandlerContext中有用pipeline和channel所有的上下文信息。
  2. pipeline是双向链表结构,添加和删除节点均只需要调整链表结构
  3. pipeline中的每个节点包着具体的处理器ChannelHandler,节点根据ChannelHandler的类型是ChannelInboundHandler还是ChannelOutboundHandler来判断该节点属于in还是out或者两者都是。

5.2 pipeline初始化

5.2.1 channel中的核心组件

星辰大海,Netty中的pipeline(一)

金手指:pipeline初始化总述

小结:在【Netty源码解析 003 新连接建立】中,对于服务端初始化和新连接建立,得到了channel的继承体系,这里对channel的成员变量解释

一个channnel中有一个pipeline,底层功能由这个pipeline完成,如pipeline.read(),pipeline是一个非循环双向链表,就是 newChannelPipeline(); new TailContext(this); new HeadContext(this);

NioSocketChannelConfig

ChannelId 就是newId,就是random

unsafe 就是 newUnsafe();

5.2.2 pipeline默认结构

星辰大海,Netty中的pipeline(一)

pipeline中的每个节点是一个ChannelHandlerContext类对象,每个context节点保存了它包裹的执行器 ChannelHandler 执行操作所需要的上下文,其实就是pipeline,因为pipeline包含了channel的引用,可以拿到所有的context信息。

默认情况下,一条pipeline会有两个节点,head和tail,下一篇文章【Netty中的pipeline(二)】我们具体分析这两个特殊的节点,今天我们重点放在pipeline。

Pipeline属性结构如下

public class DefaultChannelPipeline implements ChannelPipeline {

static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class); // 没用,打印异常

private static final String HEAD_NAME = generateName0(DefaultChannelPipeline.HeadContext.class); // 一个常量,表示head的name

private static final String TAIL_NAME = generateName0(DefaultChannelPipeline.TailContext.class); // 一个常量,表示tail的name

private static final FastThreadLocal<Map<Class<?>, String>> nameCaches = new FastThreadLocal() {

protected Map<Class<?>, String> initialValue() throws Exception {

return new WeakHashMap();

}

}; //存放名称name的缓存,使用弱引用,表示缓存

final AbstractChannelHandlerContext head; // pipeline中的head指针

final AbstractChannelHandlerContext tail; // pipeline中的tail指针

private final Channel channel; // pipeline中包含channel引用

private final ChannelFuture succeededFuture;

private final VoidChannelPromise voidPromise;

private final boolean touch = ResourceLeakDetector.isEnabled();

private Map<EventExecutorGroup, EventExecutor> childExecutors;

private Handle estimatorHandle;

private boolean firstRegistration = true;

private DefaultChannelPipeline.PendingHandlerCallback pendingHandlerCallbackHead;

private boolean registered;

对于DefaultChannelPipeline类:

1、nameCaches:存放名称name的缓存,使用弱引用,表示缓存,本文newContext()创建节点的使用filterName()节点名称的时候使用

2、head tail:下一篇文章才讲到

3、DefaultChannelPipeline构造函数中,设置channel,初始化tail head,并设置好tail head,就是DefaultChannelPipeline在新建号实例pipeline的时候,channel head tail就设置好了

5.3 pipeline添加节点

synchronized (this) { // 保证线程安全

// 1.检查是否有重复handler 先对要添加的ChannelHandler类对象handler校验

checkMultiplicity(handler);

// 2.创建节点 使用name和handler新建AbstractChannelHandlerContext 节点

newCtx = newContext(group, filterName(name, handler), handler);

// 3.添加节点 将新建的AbstractChannelHandlerContext 节点插入

addLast0(newCtx);

}

// 4.回调用户方法

callHandlerAdded0(handler); // 添加完成,调用用户自定义重写的方法

1、checkMultiplicity()源码解析:检查是否有重复handler,先校验

如果当前要添加的Handler是非共享的,并且已经添加过,那就抛出异常,否则,标识该handler已经添加

金手指1:

如果一个Handler如果是sharable的,就可以无限次被添加到pipeline中,我们客户端代码如果要让一个Handler被共用,只需要加一个@Sharable标注即可,如下

@Sharable

public class BusinessHandler {

}

金手指2:如果Handler是sharable的,一般就通过spring的注入的方式使用,不需要每次都new 一个

isSharable() 方法正是通过该Handler对应的类是否标注@Sharable来判断的,且看isSharable() 方法的源码

ChannelHandlerAdapter类

public boolean isSharable() {

Class<?> clazz = getClass(); //得到字节码对象

Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();

Boolean sharable = cache.get(clazz);

if (sharable == null) {

sharable = clazz.isAnnotationPresent(Sharable.class);

cache.put(clazz, sharable);

}

return sharable;

}

这里也可以看到,netty为了性能优化到极致,还使用了ThreadLocal来缓存Handler的状态,高并发海量连接下,每次有新连接添加Handler都会创建调用此方法

2、newContext()源码解析:创建节点

2.1 前奏:filterName()处理名称

2.1 详细:源码解析filterName(),给handler创建一个唯一性的名字

我们传入的name为null,netty就给我们生成一个默认的name,否则,检查是否有重名,检查通过的话就返回

2.1 缓存 // nameCaches 意为名称缓存,是FastThreadLocal类型,

// 新建FastThreadLocal类型对象,提供initialValue()方法的时候,这个initialValue()提供一个FastThreadLocal中的Map的具体类型,就是WeakHashMap,在Java四种引用类型里面,这是弱引用,无论内存是否足够,只要 JVM 开始进行垃圾回收,那些被弱引用关联的对象都会被回收,WeakHashMap 节约存储空间,作为缓存一些不重要的数据,且看博客【Java四种引用】

2.2 newContext()创建context

2.3 InBound和OutBound继承体系

星辰大海,Netty中的pipeline(一)

继承体系一句话小结:

ChannelInboundHandler和ChannelOutboundHandler两个都是接口(一定不能实例化对象,所以上面instanceOf一定是它们的子类对象),都是ChannelHandler的子接口

ChannelInboundHandler有实现类ChannelInboundHandlerAdapter,这个不是重点,

重点是,ChannelDuplexHandler抽象类既是ChannelInboundHandler接口实现类,也是ChannelOutboundHandler接口实现类,

然后MessageToMessageCodec抽象类是ChannelDuplexHandler抽象类的是实现类,它可以同时完成解码decode和编码encode的操作。

3、addLast0()方法源码解析:尾添加节点,双向链表尾插法添加四操作

为什么addLast0()不是添加tail节点后面,而是添加到tail节点前面?

因为tail尾节点不存放数据,最后一个存放数据的节点就是tail节点的前驱节点。

注意:pipeline中的,head节点和tail节点都是不存放数据的

4、源码解析:callHandlerAdded0()方法,回调用户方法并设置添加完成状态

5.4 pipeline删除节点

相关问题:介绍一下Netty中的Handler可插拔?

回答:netty有个最大的特性之一就是Handler可插拔,做到动态编织pipeline,

解释:在首次建立连接的时候,需要通过进行权限认证,在认证通过之后,就可以将此context移除,下次pipeline在传播事件的时候就就不会调用到权限认证处理器。

相关问题:pipeline删除节点三步骤

1、源码解析:remove(getContextOrDie(handler))方法中的getContextOrDie(handler),使用循环遍历找到待删除的节点

2、源码解析:remove(getContextOrDie(handler))方法中的remove0(ctx)方法,调整双向链表指针删除

3、源码解析:remove(getContextOrDie(handler))方法中的callHandlerRemoved0(ctx)方法,回调用户函数

六、小结

【Netty源码解析004】Netty中的pipeline(一),完成了。