总体逻辑
入口
public enum EntryType {
/**
* Inbound traffic
*/
IN("IN"),
/**
* Outbound traffic
*/
OUT("OUT");
}
public static Entry entry(String name) throws BlockException {
return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
}
//CtSph
@Override
public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
StringResourceWrapper resource = new StringResourceWrapper(name, type);
return entry(resource, count, args);
}
entry()方法总会把参数包装成一个资源,ResourceWrapper。
ResourceWrapper
资源包装类,支持按资源名称和方法2种资源。
public abstract class ResourceWrapper {
//资源名称
protected final String name;
//资源实体类型
protected final EntryType entryType;
//资源类型
protected final int resourceType;
}
public final class ResourceTypeConstants {
public static final int COMMON = 0;
public static final int COMMON_WEB = 1;
public static final int COMMON_RPC = 2;
public static final int COMMON_API_GATEWAY = 3;
public static final int COMMON_DB_SQL = 4;
private ResourceTypeConstants() {}
}
public class StringResourceWrapper extends ResourceWrapper
{
}
public class MethodResourceWrapper extends ResourceWrapper {
private final transient Method method;
}
Entry
Entry封装了当前调用的信息。
CtEntry
SphResourceTypeSupport
提供创建一个被保护的资源的能力。
public interface SphResourceTypeSupport {
Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args)
throws BlockException;
Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
Object[] args) throws BlockException;
AsyncEntry asyncEntryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
Object[] args) throws BlockException;
}
Sph
Sph接口提供一些方便的方法创建entry。
public interface Sph extends SphResourceTypeSupport {
Entry entry(String name) throws BlockException;
Entry entry(Method method) throws BlockException;
Entry entry(Method method, int count) throws BlockException;
Entry entry(String name, int count) throws BlockException;
Entry entry(Method method, EntryType type) throws BlockException;
Entry entry(String name, EntryType type) throws BlockException;
Entry entry(Method method, EntryType type, int count) throws BlockException;
Entry entry(String name, EntryType type, int count) throws BlockException;
Entry entry(Method method, EntryType type, int count, Object... args) throws BlockException;
Entry entry(String name, EntryType type, int count, Object... args) throws BlockException;
AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException;
Entry entryWithPriority(String name, EntryType type, int count, boolean prioritized) throws BlockException;
Entry entryWithPriority(String name, EntryType type, int count, boolean prioritized, Object... args)
throws BlockException;
}
CtSph
CtSph是Sph的具体实现。内部会存储每一种资源对应的ProcessorSlotChain。sentinel实现限流降级的原理,其核心就是一堆Slot组成的调用链。
public class CtSph implements Sph {
private static final Object[] OBJECTS0 = new Object[0];
//每一种资源,一个功能插槽链。
private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap
= new HashMap<ResourceWrapper, ProcessorSlotChain>();
private static final Object LOCK = new Object();
}
ContextUtil
ContextUtil,上下文Context主要用于参数的传递。Context采用ThreadLocal变量。
private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();
//存储所有的EntranceNode,每个EntranceNode关联一个context Name
private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();
初始化
初始化Map,构造默认context的EntranceNode,默认流量类型是入口流量。并把此Node加入ROOT的子集合中。
private static void initDefaultContext() {
String defaultContextName = Constants.CONTEXT_DEFAULT_NAME;
EntranceNode node = new EntranceNode(new StringResourceWrapper(defaultContextName, EntryType.IN), null);
Constants.ROOT.addChild(node);
contextNameNodeMap.put(defaultContextName, node);
}
构造Context
Context的构造由trueEnter()方法实现。trueEnter()主要做了以下工作:
- 根据
生成ContextName
,并加入缓存,每个entranceNode
对应一个入口节点ContextName
entranceNode
- 根据
和ContextName
初始化上下文对象,并将上下文对象设置到当前线程中entranceNode
- 所有entraceNode都作为ROOT的子节点。
protected static Context trueEnter(String name, String origin) {
// 先从ThreadLocal中尝试获取,获取到则直接返回
Context context = contextHolder.get();
if (context == null) {
Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
// 尝试从缓存中获取该上下文名称对应的 入口节点
DefaultNode node = localCacheNameMap.get(name);
if (node == null) {
// 判断缓存中入口节点数量是否大于2000
if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
try {
// 加锁
LOCK.lock();
// 双重检查锁
node = contextNameNodeMap.get(name);
if (node == null) {
// 判断缓存中入口节点数量是否大于2000
if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
// 根据上下文名称生成入口节点(entranceNode)
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
// 加入至全局根节点下
Constants.ROOT.addChild(node);
// 加入缓存中
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
newMap.putAll(contextNameNodeMap);
newMap.put(name, node);
contextNameNodeMap = newMap;
}
}
} finally {
LOCK.unlock();
}
}
}
// 初始化上下文对象
context = new Context(node, name);
context.setOrigin(origin);
// 设置到当前线程中
contextHolder.set(context);
}
return context;
}
entryWithPriority
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
Context context = ContextUtil.getContext();
//chain为空,是由某种情况引起的,表示context的数量超过阈值。
if (context instanceof NullContext) {
//此处只初始化entry,不做任何规则检查。
return new CtEntry(resourceWrapper, null, context);
}
if (context == null) {
//默认context
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
// 全局开关如果关闭,则不做任何检查。
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
//生成Slot链
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
//资源数量超过最大值(MAX_SLOT_CHAIN_SIZE),则不检查。
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
// 开始执行Slot链 调用逻辑
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
//清除上下文
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// 除非Sentinel内部存在错误,否则不应发生这种情况。
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
构造Context
构造Context,通过内部ContextUtil实现--InternalContextUtil,调用trueEnter方法。
private final static class InternalContextUtil extends ContextUtil {
static Context internalEnter(String name) {
return trueEnter(name, "");
}
static Context internalEnter(String name, String origin) {
return trueEnter(name, origin);
}
}
查找SlotChain
lookProcessChain
方法为指定资源生成Slot链,如果缓存中没有,则初始化一个。
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
// 根据资源尝试从全局缓存中获取
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
//双重检查锁
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// 判断资源数是否大于6000
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
// 初始化Slot链
chain = SlotChainProvider.newSlotChain();
//采用WriteOnCopy模式,因为写的机会相对少。
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
初始化SlotChain
SlotChain初始化由SlotChainBuilder初始化,Sentinel默认仅实现了一个DefaultSlotChainBuilder 。
public final class SlotChainProvider {
//Builder
private static volatile SlotChainBuilder slotChainBuilder = null;
//方法非线程安全的,由外部控制lock
public static ProcessorSlotChain newSlotChain() {
// 判断是否已经初始化过
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
// 通过SPI创建builder.
slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);
if (slotChainBuilder == null) {
// Should not go through here.
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "
+ slotChainBuilder.getClass().getCanonicalName());
}
return slotChainBuilder.build();
}
DefaultSlotChainBuilder
public class DefaultSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
//默认 Chain。
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
//加载默认 Slot。
List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
for (ProcessorSlot slot : sortedSlotList) {
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
continue;
}
chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
return chain;
}
}
默认Slot
默认8种具体Slot,根据@SpiOrder注解排序,数值越小,优先级越高。都继承了AbstractLinkedProcessorSlot,用于构造链表元素。fireEntry传递下去,但是本节点不执行。
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
private AbstractLinkedProcessorSlot<?> next = null;
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
@Override
public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
if (next != null) {
next.exit(context, resourceWrapper, count, args);
}
}
}
-
负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;(-10000)NodeSelectorSlot
-
则用于存储资源的统计信息以及调用者信息,例如该资源的ClusterBuilderSlot
等等,这些信息将用作为多维度限流,降级的依据;(-9000)RT, QPS, thread count
- LogSlot,记录Blocking Exception。(-8000)
-
则用于记录、统计不同纬度的StatisticSlot
指标监控信息;(-7000)runtime
-
则根据配置的黑白名单和调用来源信息,来做黑白名单控制;(-6000)AuthoritySlot
-
则通过系统的状态,例如SystemSlot
等,来控制总的入口流量(-5000)load1
-
则用于根据预设的限流规则以及前面FlowSlot
统计的状态,来进行流量控制;(-2000)slot
-
则通过统计信息以及预设的规则,来做熔断降级;(-1000)DegradeSlot