由上一篇《Spring Cloud Alibaba-Sentinel源码阅读(二)-流控的主流程》可知,Sentinel的流控主流程就是一条ProcessorSlot 处理链,调用 ProcessorSlotChain 的 entry 方法,就是依次调用这些ProcessorSlot 的方法。而流控相关的最重要的两个ProcessorSlot 就是StatisticSlot和FlowSlot。
一、StatisticSlot 收集实时消息
StatisticSlot#entry
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// 先依次执行后面的ProcessorSlot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
//请求通过,增加线程数
// Request passed, add thread count and pass count.
node.increaseThreadNum();
//请求通过,增加请求通过数
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setBlockError(e);
// 被限流,节点Block数加一
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
}
DefaultNode#addPassRequest
public void addPassRequest(int count) {
super.addPassRequest(count);
this.clusterNode.addPassRequest(count);
}
StatisticNode#addPassRequest
public void addPassRequest(int count) {
// 按照秒级维度统计
rollingCounterInSecond.addPass(count);
// 按照分钟维度统计
rollingCounterInMinute.addPass(count);
}
先来看下rollingCounterInSecond、rollingCounterInMinute的初始赋值:
private transient volatile Metric rollingCounterInSecond
= new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,IntervalProperty.INTERVAL);
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
秒级维度的第一个入参SampleCountProperty.SAMPLE_COUNT的值是2,IntervalProperty.INTERVAL的值是1000,代表的意思是1000ms(即1s)被划分成2个时间窗口。
ArrayMetric#addPass
public void addPass(int count) {
// 获取对应的滑动时间窗口
WindowWrap<MetricBucket> wrap = data.currentWindow();
// 在对应的滑动时间窗口上进行统计
wrap.value().addPass(count);
}
LeapArray#currentWindow(long)
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 计算当前时间会落在一个采集间隔 ( LeapArray ) 中哪一个时间窗口中
int idx = calculateTimeIdx(timeMillis);
// 计算当前时间戳所在的时间窗口的开始时间,即要计算出 WindowWrap 中 windowStart 的值,
// 其实就是要算出小于当前时间戳,并且是 windowLengthInMs 的整数倍最大的数字,Sentinel 给出是算法为 ( timeMillis - timeMillis % windowLengthInMs )。
// Calculate current bucket start time.
long windowStart = calculateWindowStart(timeMillis);
// 死循环查找当前的时间窗口,这里之所有需要循环,是因为可能多个线程都在获取当前时间窗口。
/*
* Get bucket item at given time from the array.
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
* (2) Bucket is up-to-date, then just return the bucket.
* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
*/
while (true) {
WindowWrap<T> old = array.get(idx);
// 如果按照上面计算出来的下标在array中没找到,则说明之前没有做流控,需要新建一个bucket
if (old == null) {
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* bucket is empty, so create new and update
*
* If the old bucket is absent, then we create a new bucket at {@code windowStart},
* then try to update circular array via a CAS operation. Only one thread can
* succeed to update, while other threads yield its time slice.
*/
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
// 使用CAS 机制来更新 LeapArray 数组中的 元素,因为同一时间,可能有多个线程都在获取当前时间窗口对象,
// 但该时间窗口对象还未创建,这里就是避免创建多个,导致统计数据被覆盖,如果用 CAS 更新成功的线程,则返回新建好的 WindowWrap ,
// 若CAS 设置不成功的线程继续执行循环
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* startTime of Bucket 3: 800, so it's up-to-date
*
* If current {@code windowStart} is equal to the start timestamp of old bucket,
* that means the time is within the bucket, so directly return the bucket.
*/
return old;
// 如果原先存在的窗口开始时间小于当前时间戳计算出来的开始时间,则表示 bucket 已被弃用。则需要将开始时间重置到新时间戳对应的开始时间戳。
} else if (windowStart > old.windowStart()) {
/*
* (old)
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* startTime of Bucket 2: 400, deprecated, should be reset
*
* If the start timestamp of old bucket is behind provided time, that means
* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
* Note that the reset and clean-up operations are hard to be atomic,
* so we need a update lock to guarantee the correctness of bucket update.
*
* The update lock is conditional (tiny scope) and will take effect only when
* bucket is deprecated, so in most cases it won't lead to performance loss.
*/
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
二、FlowSlot 流控
FlowSlot#entry
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// 判断是否触发流控规则
checkFlow(resourceWrapper, context, node, count, prioritized);
// 继续执行后续的ProcessorSlot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
FlowRuleChecker#checkFlow
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
// 获取Sentinel DashBoard 上配置的流控规则
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
// 依次执行流控规则
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
FlowRuleChecker#canPassCheck()
public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node,
int acquireCount, boolean prioritized) {
String limitApp = rule.getLimitApp();
// 如果限流规则没有配置针对来源,则直接默认通过,
// 该值在配置时,默认为 default,即对所有调用发起方都生效
if (limitApp == null) {
return true;
}
if (rule.isClusterMode()) { // @2
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
FlowRuleChecker#passLocalCheck
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
// 流控模式:首先根据流控模式(strategy)选择一个合适的 Node,如果为空,则直接返回 true,表示放行。
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
// 流控效果:调用 FlowRule 内部持有的流量控制器来判断是否符合流控规则,最终调用的是 TrafficShapingController canPass 方法。
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
上面的两个方法分别对应sentinel dashboard的流控模式、流控效果

默认模式(快速失败)DefaultController#canPass()
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 当前已消耗的令牌数量,即当前时间窗口内已创建的线程数量(FLOW_GRADE_THREAD) 或已通过的请求个数(FLOW_GRADE_QPS)
int curCount = avgUsedTokens(node);
// 如果 已消耗的令牌数+当前请求的令牌数(正常是1) > 总令牌数,则需要根据是否有优先级进行不同的处理;
// 否则直接返回true,表示通过。
if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
// 尝试抢占下一个滑动窗口的令牌,并返回该时间窗口所剩余的时间,如果获取失败,则返回 OccupyTimeoutProperty.getOccupyTimeout() 值,
// 该返回值的作用就是当前申请资源的线程将 sleep(阻塞)的时间。
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
// 如果 waitInMs 小于抢占的最大超时时间,则在下一个时间窗口中增加对应令牌数,并且线程将sleep
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
WarmUpController是基于漏桶算法,RateLimiterController是基于令牌桶算法