天天看点

Sentinel 源码分析(一)总体逻辑 

总体逻辑

入口

public enum EntryType {
    /**
     * Inbound traffic
     */
    IN("IN"),
    /**
     * Outbound traffic
     */
    OUT("OUT");

}
    
public static Entry entry(String name) throws BlockException {
        return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
    }

//CtSph
    @Override
    public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
        StringResourceWrapper resource = new StringResourceWrapper(name, type);
        return entry(resource, count, args);
    }
           

entry()方法总会把参数包装成一个资源,ResourceWrapper。

ResourceWrapper 

资源包装类,支持按资源名称和方法2种资源。

public abstract class ResourceWrapper {
//资源名称
    protected final String name;
//资源实体类型
    protected final EntryType entryType;
//资源类型
    protected final int resourceType;

}

public final class ResourceTypeConstants {

    public static final int COMMON = 0;
    public static final int COMMON_WEB = 1;
    public static final int COMMON_RPC = 2;
    public static final int COMMON_API_GATEWAY = 3;
    public static final int COMMON_DB_SQL = 4;

    private ResourceTypeConstants() {}
}
           
public class StringResourceWrapper extends ResourceWrapper
{
}
           
public class MethodResourceWrapper extends ResourceWrapper {

    private final transient Method method;

}
           

Entry

Entry封装了当前调用的信息。

CtEntry

SphResourceTypeSupport

提供创建一个被保护的资源的能力。

public interface SphResourceTypeSupport {
    Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args)
        throws BlockException;

    Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
                        Object[] args) throws BlockException;
    AsyncEntry asyncEntryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
                                  Object[] args) throws BlockException;
}
           

 Sph 

Sph接口提供一些方便的方法创建entry。

public interface Sph extends SphResourceTypeSupport {
    Entry entry(String name) throws BlockException;
    Entry entry(Method method) throws BlockException;
    
    Entry entry(Method method, int count) throws BlockException;
    Entry entry(String name, int count) throws BlockException;
    
    Entry entry(Method method, EntryType type) throws BlockException;
    Entry entry(String name, EntryType type) throws BlockException;

    Entry entry(Method method, EntryType type, int count) throws BlockException;
    Entry entry(String name, EntryType type, int count) throws BlockException;

    Entry entry(Method method, EntryType type, int count, Object... args) throws BlockException;
    Entry entry(String name, EntryType type, int count, Object... args) throws BlockException;

    AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException;
    
    Entry entryWithPriority(String name, EntryType type, int count, boolean prioritized) throws BlockException;
    Entry entryWithPriority(String name, EntryType type, int count, boolean prioritized, Object... args)
        throws BlockException;
}
           

 CtSph

CtSph是Sph的具体实现。内部会存储每一种资源对应的ProcessorSlotChain。sentinel实现限流降级的原理,其核心就是一堆Slot组成的调用链。

public class CtSph implements Sph {

    private static final Object[] OBJECTS0 = new Object[0];
    //每一种资源,一个功能插槽链。
    private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap
        = new HashMap<ResourceWrapper, ProcessorSlotChain>();

    private static final Object LOCK = new Object();

}
           

ContextUtil

ContextUtil,上下文Context主要用于参数的传递。Context采用ThreadLocal变量。

private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();
//存储所有的EntranceNode,每个EntranceNode关联一个context Name
 private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();
           

 初始化

初始化Map,构造默认context的EntranceNode,默认流量类型是入口流量。并把此Node加入ROOT的子集合中。

private static void initDefaultContext() {
        String defaultContextName = Constants.CONTEXT_DEFAULT_NAME;
        EntranceNode node = new EntranceNode(new StringResourceWrapper(defaultContextName, EntryType.IN), null);
        Constants.ROOT.addChild(node);
        contextNameNodeMap.put(defaultContextName, node);
    }
           

构造Context

Context的构造由trueEnter()方法实现。trueEnter()主要做了以下工作:

  1. 根据

    ContextName

    生成

    entranceNode

    ,并加入缓存,每个

    ContextName

    对应一个入口节点

    entranceNode

  2. 根据

    ContextName

    entranceNode

    初始化上下文对象,并将上下文对象设置到当前线程中
  3. 所有entraceNode都作为ROOT的子节点。
protected static Context trueEnter(String name, String origin) {
	  // 先从ThreadLocal中尝试获取,获取到则直接返回
    Context context = contextHolder.get();
    if (context == null) {
        Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
        // 尝试从缓存中获取该上下文名称对应的 入口节点
        DefaultNode node = localCacheNameMap.get(name);
        if (node == null) {
      		 // 判断缓存中入口节点数量是否大于2000
            if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                setNullContext();
                return NULL_CONTEXT;
            } else {
                try {
                		// 加锁
                    LOCK.lock();
                    // 双重检查锁
                    node = contextNameNodeMap.get(name);
                    if (node == null) {
                    	 // 判断缓存中入口节点数量是否大于2000
                        if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                            setNullContext();
                            return NULL_CONTEXT;
                        } else {
                            // 根据上下文名称生成入口节点(entranceNode)
                            node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                            // 加入至全局根节点下
                            Constants.ROOT.addChild(node);
                            // 加入缓存中
                            Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                            newMap.putAll(contextNameNodeMap);
                            newMap.put(name, node);
                            contextNameNodeMap = newMap;
                        }
                    }
                } finally {
                    LOCK.unlock();
                }
            }
        }
        // 初始化上下文对象
        context = new Context(node, name);
        context.setOrigin(origin);
        // 设置到当前线程中
        contextHolder.set(context);
    }

    return context;
}
           

entryWithPriority

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        Context context = ContextUtil.getContext();

        //chain为空,是由某种情况引起的,表示context的数量超过阈值。
        if (context instanceof NullContext) {
            //此处只初始化entry,不做任何规则检查。
            return new CtEntry(resourceWrapper, null, context);
        }

        if (context == null) {
            //默认context
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }

        // 全局开关如果关闭,则不做任何检查。
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }

        //生成Slot链
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);


        //资源数量超过最大值(MAX_SLOT_CHAIN_SIZE),则不检查。
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }
        
        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
        // 开始执行Slot链 调用逻辑
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            //清除上下文
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // 除非Sentinel内部存在错误,否则不应发生这种情况。
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }
           

构造Context

构造Context,通过内部ContextUtil实现--InternalContextUtil,调用trueEnter方法。

private final static class InternalContextUtil extends ContextUtil {
        static Context internalEnter(String name) {
            return trueEnter(name, "");
        }

        static Context internalEnter(String name, String origin) {
            return trueEnter(name, origin);
        }
    }
           

查找SlotChain

lookProcessChain

方法为指定资源生成Slot链,如果缓存中没有,则初始化一个。

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    // 根据资源尝试从全局缓存中获取
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        //双重检查锁
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // 判断资源数是否大于6000
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }
                // 初始化Slot链
                chain = SlotChainProvider.newSlotChain();
                //采用WriteOnCopy模式,因为写的机会相对少。
                Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                    chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                
                chainMap = newMap;
            }
        }
    }
    return chain;
}
           

初始化SlotChain

SlotChain初始化由SlotChainBuilder初始化,Sentinel默认仅实现了一个DefaultSlotChainBuilder 。

public final class SlotChainProvider {

    //Builder
    private static volatile SlotChainBuilder slotChainBuilder = null;

    //方法非线程安全的,由外部控制lock
    public static ProcessorSlotChain newSlotChain() {
    // 判断是否已经初始化过
        if (slotChainBuilder != null) {
            return slotChainBuilder.build();
        }
        // 通过SPI创建builder.
        slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);

        if (slotChainBuilder == null) {
            // Should not go through here.
            RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
            slotChainBuilder = new DefaultSlotChainBuilder();
        } else {
            RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "
                + slotChainBuilder.getClass().getCanonicalName());
        }
        return slotChainBuilder.build();
    }
           

DefaultSlotChainBuilder 

public class DefaultSlotChainBuilder implements SlotChainBuilder {

    @Override
    public ProcessorSlotChain build() {
        //默认 Chain。
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();

        //加载默认 Slot。
        List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
        for (ProcessorSlot slot : sortedSlotList) {
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }

            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }

        return chain;
    }
}
           

默认Slot

默认8种具体Slot,根据@SpiOrder注解排序,数值越小,优先级越高。都继承了AbstractLinkedProcessorSlot,用于构造链表元素。fireEntry传递下去,但是本节点不执行。

public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {

    private AbstractLinkedProcessorSlot<?> next = null;

    @Override
    public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        if (next != null) {
            next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        }
    }

    @Override
    public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        if (next != null) {
            next.exit(context, resourceWrapper, count, args);
        }
    }
}
           
Sentinel 源码分析(一)总体逻辑 
  • NodeSelectorSlot

     负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;(-10000)
  • ClusterBuilderSlot

     则用于存储资源的统计信息以及调用者信息,例如该资源的 

    RT, QPS, thread count

     等等,这些信息将用作为多维度限流,降级的依据;(-9000)
  • LogSlot,记录Blocking Exception。(-8000)
  • StatisticSlot

     则用于记录、统计不同纬度的 

    runtime

     指标监控信息;(-7000)
  • AuthoritySlot

     则根据配置的黑白名单和调用来源信息,来做黑白名单控制;(-6000)
  • SystemSlot

     则通过系统的状态,例如 

    load1

     等,来控制总的入口流量(-5000)
  • FlowSlot

     则用于根据预设的限流规则以及前面 

    slot

     统计的状态,来进行流量控制;(-2000)
  • DegradeSlot

     则通过统计信息以及预设的规则,来做熔断降级;(-1000)
Sentinel 源码分析(一)总体逻辑