天天看點

Sentinel 源碼分析(一)總體邏輯 

總體邏輯

入口

public enum EntryType {
    /**
     * Inbound traffic
     */
    IN("IN"),
    /**
     * Outbound traffic
     */
    OUT("OUT");

}
    
public static Entry entry(String name) throws BlockException {
        return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
    }

//CtSph
    @Override
    public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
        StringResourceWrapper resource = new StringResourceWrapper(name, type);
        return entry(resource, count, args);
    }
           

entry()方法總會把參數包裝成一個資源,ResourceWrapper。

ResourceWrapper 

資源包裝類,支援按資源名稱和方法2種資源。

public abstract class ResourceWrapper {
//資源名稱
    protected final String name;
//資源實體類型
    protected final EntryType entryType;
//資源類型
    protected final int resourceType;

}

public final class ResourceTypeConstants {

    public static final int COMMON = 0;
    public static final int COMMON_WEB = 1;
    public static final int COMMON_RPC = 2;
    public static final int COMMON_API_GATEWAY = 3;
    public static final int COMMON_DB_SQL = 4;

    private ResourceTypeConstants() {}
}
           
public class StringResourceWrapper extends ResourceWrapper
{
}
           
public class MethodResourceWrapper extends ResourceWrapper {

    private final transient Method method;

}
           

Entry

Entry封裝了目前調用的資訊。

CtEntry

SphResourceTypeSupport

提供建立一個被保護的資源的能力。

public interface SphResourceTypeSupport {
    Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args)
        throws BlockException;

    Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
                        Object[] args) throws BlockException;
    AsyncEntry asyncEntryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
                                  Object[] args) throws BlockException;
}
           

 Sph 

Sph接口提供一些友善的方法建立entry。

public interface Sph extends SphResourceTypeSupport {
    Entry entry(String name) throws BlockException;
    Entry entry(Method method) throws BlockException;
    
    Entry entry(Method method, int count) throws BlockException;
    Entry entry(String name, int count) throws BlockException;
    
    Entry entry(Method method, EntryType type) throws BlockException;
    Entry entry(String name, EntryType type) throws BlockException;

    Entry entry(Method method, EntryType type, int count) throws BlockException;
    Entry entry(String name, EntryType type, int count) throws BlockException;

    Entry entry(Method method, EntryType type, int count, Object... args) throws BlockException;
    Entry entry(String name, EntryType type, int count, Object... args) throws BlockException;

    AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException;
    
    Entry entryWithPriority(String name, EntryType type, int count, boolean prioritized) throws BlockException;
    Entry entryWithPriority(String name, EntryType type, int count, boolean prioritized, Object... args)
        throws BlockException;
}
           

 CtSph

CtSph是Sph的具體實作。内部會存儲每一種資源對應的ProcessorSlotChain。sentinel實作限流降級的原理,其核心就是一堆Slot組成的調用鍊。

public class CtSph implements Sph {

    private static final Object[] OBJECTS0 = new Object[0];
    //每一種資源,一個功能插槽鍊。
    private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap
        = new HashMap<ResourceWrapper, ProcessorSlotChain>();

    private static final Object LOCK = new Object();

}
           

ContextUtil

ContextUtil,上下文Context主要用于參數的傳遞。Context采用ThreadLocal變量。

private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();
//存儲所有的EntranceNode,每個EntranceNode關聯一個context Name
 private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();
           

 初始化

初始化Map,構造預設context的EntranceNode,預設流量類型是入口流量。并把此Node加入ROOT的子集合中。

private static void initDefaultContext() {
        String defaultContextName = Constants.CONTEXT_DEFAULT_NAME;
        EntranceNode node = new EntranceNode(new StringResourceWrapper(defaultContextName, EntryType.IN), null);
        Constants.ROOT.addChild(node);
        contextNameNodeMap.put(defaultContextName, node);
    }
           

構造Context

Context的構造由trueEnter()方法實作。trueEnter()主要做了以下工作:

  1. 根據

    ContextName

    生成

    entranceNode

    ,并加入緩存,每個

    ContextName

    對應一個入口節點

    entranceNode

  2. 根據

    ContextName

    entranceNode

    初始化上下文對象,并将上下文對象設定到目前線程中
  3. 所有entraceNode都作為ROOT的子節點。
protected static Context trueEnter(String name, String origin) {
	  // 先從ThreadLocal中嘗試擷取,擷取到則直接傳回
    Context context = contextHolder.get();
    if (context == null) {
        Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
        // 嘗試從緩存中擷取該上下文名稱對應的 入口節點
        DefaultNode node = localCacheNameMap.get(name);
        if (node == null) {
      		 // 判斷緩存中入口節點數量是否大于2000
            if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                setNullContext();
                return NULL_CONTEXT;
            } else {
                try {
                		// 加鎖
                    LOCK.lock();
                    // 雙重檢查鎖
                    node = contextNameNodeMap.get(name);
                    if (node == null) {
                    	 // 判斷緩存中入口節點數量是否大于2000
                        if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                            setNullContext();
                            return NULL_CONTEXT;
                        } else {
                            // 根據上下文名稱生成入口節點(entranceNode)
                            node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                            // 加入至全局根節點下
                            Constants.ROOT.addChild(node);
                            // 加入緩存中
                            Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                            newMap.putAll(contextNameNodeMap);
                            newMap.put(name, node);
                            contextNameNodeMap = newMap;
                        }
                    }
                } finally {
                    LOCK.unlock();
                }
            }
        }
        // 初始化上下文對象
        context = new Context(node, name);
        context.setOrigin(origin);
        // 設定到目前線程中
        contextHolder.set(context);
    }

    return context;
}
           

entryWithPriority

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        Context context = ContextUtil.getContext();

        //chain為空,是由某種情況引起的,表示context的數量超過門檻值。
        if (context instanceof NullContext) {
            //此處隻初始化entry,不做任何規則檢查。
            return new CtEntry(resourceWrapper, null, context);
        }

        if (context == null) {
            //預設context
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }

        // 全局開關如果關閉,則不做任何檢查。
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }

        //生成Slot鍊
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);


        //資源數量超過最大值(MAX_SLOT_CHAIN_SIZE),則不檢查。
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }
        
        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
        // 開始執行Slot鍊 調用邏輯
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            //清除上下文
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // 除非Sentinel内部存在錯誤,否則不應發生這種情況。
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }
           

構造Context

構造Context,通過内部ContextUtil實作--InternalContextUtil,調用trueEnter方法。

private final static class InternalContextUtil extends ContextUtil {
        static Context internalEnter(String name) {
            return trueEnter(name, "");
        }

        static Context internalEnter(String name, String origin) {
            return trueEnter(name, origin);
        }
    }
           

查找SlotChain

lookProcessChain

方法為指定資源生成Slot鍊,如果緩存中沒有,則初始化一個。

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    // 根據資源嘗試從全局緩存中擷取
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        //雙重檢查鎖
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // 判斷資源數是否大于6000
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }
                // 初始化Slot鍊
                chain = SlotChainProvider.newSlotChain();
                //采用WriteOnCopy模式,因為寫的機會相對少。
                Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                    chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                
                chainMap = newMap;
            }
        }
    }
    return chain;
}
           

初始化SlotChain

SlotChain初始化由SlotChainBuilder初始化,Sentinel預設僅實作了一個DefaultSlotChainBuilder 。

public final class SlotChainProvider {

    //Builder
    private static volatile SlotChainBuilder slotChainBuilder = null;

    //方法非線程安全的,由外部控制lock
    public static ProcessorSlotChain newSlotChain() {
    // 判斷是否已經初始化過
        if (slotChainBuilder != null) {
            return slotChainBuilder.build();
        }
        // 通過SPI建立builder.
        slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);

        if (slotChainBuilder == null) {
            // Should not go through here.
            RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
            slotChainBuilder = new DefaultSlotChainBuilder();
        } else {
            RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "
                + slotChainBuilder.getClass().getCanonicalName());
        }
        return slotChainBuilder.build();
    }
           

DefaultSlotChainBuilder 

public class DefaultSlotChainBuilder implements SlotChainBuilder {

    @Override
    public ProcessorSlotChain build() {
        //預設 Chain。
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();

        //加載預設 Slot。
        List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
        for (ProcessorSlot slot : sortedSlotList) {
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }

            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }

        return chain;
    }
}
           

預設Slot

預設8種具體Slot,根據@SpiOrder注解排序,數值越小,優先級越高。都繼承了AbstractLinkedProcessorSlot,用于構造連結清單元素。fireEntry傳遞下去,但是本節點不執行。

public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {

    private AbstractLinkedProcessorSlot<?> next = null;

    @Override
    public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        if (next != null) {
            next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        }
    }

    @Override
    public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        if (next != null) {
            next.exit(context, resourceWrapper, count, args);
        }
    }
}
           
Sentinel 源碼分析(一)總體邏輯 
  • NodeSelectorSlot

     負責收集資源的路徑,并将這些資源的調用路徑,以樹狀結構存儲起來,用于根據調用路徑來限流降級;(-10000)
  • ClusterBuilderSlot

     則用于存儲資源的統計資訊以及調用者資訊,例如該資源的 

    RT, QPS, thread count

     等等,這些資訊将用作為多元度限流,降級的依據;(-9000)
  • LogSlot,記錄Blocking Exception。(-8000)
  • StatisticSlot

     則用于記錄、統計不同緯度的 

    runtime

     名額監控資訊;(-7000)
  • AuthoritySlot

     則根據配置的黑白名單和調用來源資訊,來做黑白名單控制;(-6000)
  • SystemSlot

     則通過系統的狀态,例如 

    load1

     等,來控制總的入口流量(-5000)
  • FlowSlot

     則用于根據預設的限流規則以及前面 

    slot

     統計的狀态,來進行流量控制;(-2000)
  • DegradeSlot

     則通過統計資訊以及預設的規則,來做熔斷降級;(-1000)
Sentinel 源碼分析(一)總體邏輯