總體邏輯
入口
public enum EntryType {
/**
* Inbound traffic
*/
IN("IN"),
/**
* Outbound traffic
*/
OUT("OUT");
}
public static Entry entry(String name) throws BlockException {
return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
}
//CtSph
@Override
public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
StringResourceWrapper resource = new StringResourceWrapper(name, type);
return entry(resource, count, args);
}
entry()方法總會把參數包裝成一個資源,ResourceWrapper。
ResourceWrapper
資源包裝類,支援按資源名稱和方法2種資源。
public abstract class ResourceWrapper {
//資源名稱
protected final String name;
//資源實體類型
protected final EntryType entryType;
//資源類型
protected final int resourceType;
}
public final class ResourceTypeConstants {
public static final int COMMON = 0;
public static final int COMMON_WEB = 1;
public static final int COMMON_RPC = 2;
public static final int COMMON_API_GATEWAY = 3;
public static final int COMMON_DB_SQL = 4;
private ResourceTypeConstants() {}
}
public class StringResourceWrapper extends ResourceWrapper
{
}
public class MethodResourceWrapper extends ResourceWrapper {
private final transient Method method;
}
Entry
Entry封裝了目前調用的資訊。
CtEntry
SphResourceTypeSupport
提供建立一個被保護的資源的能力。
public interface SphResourceTypeSupport {
Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args)
throws BlockException;
Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
Object[] args) throws BlockException;
AsyncEntry asyncEntryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
Object[] args) throws BlockException;
}
Sph
Sph接口提供一些友善的方法建立entry。
public interface Sph extends SphResourceTypeSupport {
Entry entry(String name) throws BlockException;
Entry entry(Method method) throws BlockException;
Entry entry(Method method, int count) throws BlockException;
Entry entry(String name, int count) throws BlockException;
Entry entry(Method method, EntryType type) throws BlockException;
Entry entry(String name, EntryType type) throws BlockException;
Entry entry(Method method, EntryType type, int count) throws BlockException;
Entry entry(String name, EntryType type, int count) throws BlockException;
Entry entry(Method method, EntryType type, int count, Object... args) throws BlockException;
Entry entry(String name, EntryType type, int count, Object... args) throws BlockException;
AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException;
Entry entryWithPriority(String name, EntryType type, int count, boolean prioritized) throws BlockException;
Entry entryWithPriority(String name, EntryType type, int count, boolean prioritized, Object... args)
throws BlockException;
}
CtSph
CtSph是Sph的具體實作。内部會存儲每一種資源對應的ProcessorSlotChain。sentinel實作限流降級的原理,其核心就是一堆Slot組成的調用鍊。
public class CtSph implements Sph {
private static final Object[] OBJECTS0 = new Object[0];
//每一種資源,一個功能插槽鍊。
private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap
= new HashMap<ResourceWrapper, ProcessorSlotChain>();
private static final Object LOCK = new Object();
}
ContextUtil
ContextUtil,上下文Context主要用于參數的傳遞。Context采用ThreadLocal變量。
private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();
//存儲所有的EntranceNode,每個EntranceNode關聯一個context Name
private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();
初始化
初始化Map,構造預設context的EntranceNode,預設流量類型是入口流量。并把此Node加入ROOT的子集合中。
private static void initDefaultContext() {
String defaultContextName = Constants.CONTEXT_DEFAULT_NAME;
EntranceNode node = new EntranceNode(new StringResourceWrapper(defaultContextName, EntryType.IN), null);
Constants.ROOT.addChild(node);
contextNameNodeMap.put(defaultContextName, node);
}
構造Context
Context的構造由trueEnter()方法實作。trueEnter()主要做了以下工作:
- 根據
生成ContextName
,并加入緩存,每個entranceNode
對應一個入口節點ContextName
entranceNode
- 根據
和ContextName
初始化上下文對象,并将上下文對象設定到目前線程中entranceNode
- 所有entraceNode都作為ROOT的子節點。
protected static Context trueEnter(String name, String origin) {
// 先從ThreadLocal中嘗試擷取,擷取到則直接傳回
Context context = contextHolder.get();
if (context == null) {
Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
// 嘗試從緩存中擷取該上下文名稱對應的 入口節點
DefaultNode node = localCacheNameMap.get(name);
if (node == null) {
// 判斷緩存中入口節點數量是否大于2000
if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
try {
// 加鎖
LOCK.lock();
// 雙重檢查鎖
node = contextNameNodeMap.get(name);
if (node == null) {
// 判斷緩存中入口節點數量是否大于2000
if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
// 根據上下文名稱生成入口節點(entranceNode)
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
// 加入至全局根節點下
Constants.ROOT.addChild(node);
// 加入緩存中
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
newMap.putAll(contextNameNodeMap);
newMap.put(name, node);
contextNameNodeMap = newMap;
}
}
} finally {
LOCK.unlock();
}
}
}
// 初始化上下文對象
context = new Context(node, name);
context.setOrigin(origin);
// 設定到目前線程中
contextHolder.set(context);
}
return context;
}
entryWithPriority
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
Context context = ContextUtil.getContext();
//chain為空,是由某種情況引起的,表示context的數量超過門檻值。
if (context instanceof NullContext) {
//此處隻初始化entry,不做任何規則檢查。
return new CtEntry(resourceWrapper, null, context);
}
if (context == null) {
//預設context
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
// 全局開關如果關閉,則不做任何檢查。
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
//生成Slot鍊
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
//資源數量超過最大值(MAX_SLOT_CHAIN_SIZE),則不檢查。
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
// 開始執行Slot鍊 調用邏輯
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
//清除上下文
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// 除非Sentinel内部存在錯誤,否則不應發生這種情況。
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
構造Context
構造Context,通過内部ContextUtil實作--InternalContextUtil,調用trueEnter方法。
private final static class InternalContextUtil extends ContextUtil {
static Context internalEnter(String name) {
return trueEnter(name, "");
}
static Context internalEnter(String name, String origin) {
return trueEnter(name, origin);
}
}
查找SlotChain
lookProcessChain
方法為指定資源生成Slot鍊,如果緩存中沒有,則初始化一個。
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
// 根據資源嘗試從全局緩存中擷取
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
//雙重檢查鎖
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// 判斷資源數是否大于6000
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
// 初始化Slot鍊
chain = SlotChainProvider.newSlotChain();
//采用WriteOnCopy模式,因為寫的機會相對少。
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
初始化SlotChain
SlotChain初始化由SlotChainBuilder初始化,Sentinel預設僅實作了一個DefaultSlotChainBuilder 。
public final class SlotChainProvider {
//Builder
private static volatile SlotChainBuilder slotChainBuilder = null;
//方法非線程安全的,由外部控制lock
public static ProcessorSlotChain newSlotChain() {
// 判斷是否已經初始化過
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
// 通過SPI建立builder.
slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);
if (slotChainBuilder == null) {
// Should not go through here.
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "
+ slotChainBuilder.getClass().getCanonicalName());
}
return slotChainBuilder.build();
}
DefaultSlotChainBuilder
public class DefaultSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
//預設 Chain。
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
//加載預設 Slot。
List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
for (ProcessorSlot slot : sortedSlotList) {
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
continue;
}
chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
return chain;
}
}
預設Slot
預設8種具體Slot,根據@SpiOrder注解排序,數值越小,優先級越高。都繼承了AbstractLinkedProcessorSlot,用于構造連結清單元素。fireEntry傳遞下去,但是本節點不執行。
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
private AbstractLinkedProcessorSlot<?> next = null;
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
@Override
public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
if (next != null) {
next.exit(context, resourceWrapper, count, args);
}
}
}
-
負責收集資源的路徑,并将這些資源的調用路徑,以樹狀結構存儲起來,用于根據調用路徑來限流降級;(-10000)NodeSelectorSlot
-
則用于存儲資源的統計資訊以及調用者資訊,例如該資源的ClusterBuilderSlot
等等,這些資訊将用作為多元度限流,降級的依據;(-9000)RT, QPS, thread count
- LogSlot,記錄Blocking Exception。(-8000)
-
則用于記錄、統計不同緯度的StatisticSlot
名額監控資訊;(-7000)runtime
-
則根據配置的黑白名單和調用來源資訊,來做黑白名單控制;(-6000)AuthoritySlot
-
則通過系統的狀态,例如SystemSlot
等,來控制總的入口流量(-5000)load1
-
則用于根據預設的限流規則以及前面FlowSlot
統計的狀态,來進行流量控制;(-2000)slot
-
則通過統計資訊以及預設的規則,來做熔斷降級;(-1000)DegradeSlot