天天看點

Spring Cloud Alibaba-Sentinel源碼閱讀(三)-流控原理

  由上一篇《Spring Cloud Alibaba-Sentinel源碼閱讀(二)-流控的主流程》可知,Sentinel的流控主流程就是一條ProcessorSlot 處理鍊,調用 ProcessorSlotChain 的 entry 方法,就是依次調用這些ProcessorSlot 的方法。而流控相關的最重要的兩個ProcessorSlot 就是StatisticSlot和FlowSlot。

一、StatisticSlot 收集實時消息

StatisticSlot#entry

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
	try {
	
		// 先依次執行後面的ProcessorSlot
		fireEntry(context, resourceWrapper, node, count, prioritized, args);
		
		//請求通過,增加線程數
		// Request passed, add thread count and pass count.
		node.increaseThreadNum();
		//請求通過,增加請求通過數
		node.addPassRequest(count);

		if (context.getCurEntry().getOriginNode() != null) {
			// Add count for origin node.
			context.getCurEntry().getOriginNode().increaseThreadNum();
			context.getCurEntry().getOriginNode().addPassRequest(count);
		}

		if (resourceWrapper.getEntryType() == EntryType.IN) {
			// Add count for global inbound entry node for global statistics.
			Constants.ENTRY_NODE.increaseThreadNum();
			Constants.ENTRY_NODE.addPassRequest(count);
		}

		// Handle pass event with registered entry callback handlers.
		for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
			handler.onPass(context, resourceWrapper, node, count, args);
		}
	} catch (PriorityWaitException ex) {
		node.increaseThreadNum();
		if (context.getCurEntry().getOriginNode() != null) {
			// Add count for origin node.
			context.getCurEntry().getOriginNode().increaseThreadNum();
		}

		if (resourceWrapper.getEntryType() == EntryType.IN) {
			// Add count for global inbound entry node for global statistics.
			Constants.ENTRY_NODE.increaseThreadNum();
		}
		// Handle pass event with registered entry callback handlers.
		for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
			handler.onPass(context, resourceWrapper, node, count, args);
		}
	} catch (BlockException e) {
		
		// Blocked, set block exception to current entry.
		context.getCurEntry().setBlockError(e);

		// 被限流,節點Block數加一
		// Add block count.
		node.increaseBlockQps(count);
		if (context.getCurEntry().getOriginNode() != null) {
			context.getCurEntry().getOriginNode().increaseBlockQps(count);
		}

		if (resourceWrapper.getEntryType() == EntryType.IN) {
			// Add count for global inbound entry node for global statistics.
			Constants.ENTRY_NODE.increaseBlockQps(count);
		}

		// Handle block event with registered entry callback handlers.
		for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
			handler.onBlocked(e, context, resourceWrapper, node, count, args);
		}

		throw e;
	} catch (Throwable e) {
		// Unexpected internal error, set error to current entry.
		context.getCurEntry().setError(e);
		throw e;
	}
}
           

DefaultNode#addPassRequest

public void addPassRequest(int count) {
	super.addPassRequest(count);
	this.clusterNode.addPassRequest(count);
}
           

StatisticNode#addPassRequest

public void addPassRequest(int count) {
	// 按照秒級次元統計
	rollingCounterInSecond.addPass(count);
	// 按照分鐘次元統計
	rollingCounterInMinute.addPass(count);
}
           

先來看下rollingCounterInSecond、rollingCounterInMinute的初始指派:

private transient volatile Metric rollingCounterInSecond 
             = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,IntervalProperty.INTERVAL);
             
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
           

秒級次元的第一個入參SampleCountProperty.SAMPLE_COUNT的值是2,IntervalProperty.INTERVAL的值是1000,代表的意思是1000ms(即1s)被劃分成2個時間視窗。

ArrayMetric#addPass

public void addPass(int count) {
	// 擷取對應的滑動時間視窗
	WindowWrap<MetricBucket> wrap = data.currentWindow();
	// 在對應的滑動時間視窗上進行統計
	wrap.value().addPass(count);
}
           

LeapArray#currentWindow(long)

public WindowWrap<T> currentWindow(long timeMillis) {
	if (timeMillis < 0) {
		return null;
	}
	
	// 計算目前時間會落在一個采集間隔 ( LeapArray ) 中哪一個時間視窗中
	int idx = calculateTimeIdx(timeMillis);
	
	// 計算目前時間戳所在的時間視窗的開始時間,即要計算出 WindowWrap 中 windowStart 的值,
	// 其實就是要算出小于目前時間戳,并且是 windowLengthInMs 的整數倍最大的數字,Sentinel 給出是算法為 ( timeMillis - timeMillis % windowLengthInMs )。
	// Calculate current bucket start time.
	long windowStart = calculateWindowStart(timeMillis);
	
	// 死循環查找目前的時間視窗,這裡之所有需要循環,是因為可能多個線程都在擷取目前時間視窗。
	/*
	 * Get bucket item at given time from the array.
	 *
	 * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
	 * (2) Bucket is up-to-date, then just return the bucket.
	 * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
	 */
	while (true) {
		WindowWrap<T> old = array.get(idx);
		
		// 如果按照上面計算出來的下标在array中沒找到,則說明之前沒有做流控,需要建立一個bucket
		if (old == null) {
			/*
			 *     B0       B1      B2    NULL      B4
			 * ||_______|_______|_______|_______|_______||___
			 * 200     400     600     800     1000    1200  timestamp
			 *                             ^
			 *                          time=888
			 *            bucket is empty, so create new and update
			 *
			 * If the old bucket is absent, then we create a new bucket at {@code windowStart},
			 * then try to update circular array via a CAS operation. Only one thread can
			 * succeed to update, while other threads yield its time slice.
			 */
			WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
			
			// 使用CAS 機制來更新 LeapArray 數組中的 元素,因為同一時間,可能有多個線程都在擷取目前時間視窗對象,
			// 但該時間視窗對象還未建立,這裡就是避免建立多個,導緻統計資料被覆寫,如果用 CAS 更新成功的線程,則傳回建立好的 WindowWrap ,
			// 若CAS 設定不成功的線程繼續執行循環
			if (array.compareAndSet(idx, null, window)) {
				// Successfully updated, return the created bucket.
				return window;
			} else {
				// Contention failed, the thread will yield its time slice to wait for bucket available.
				Thread.yield();
			}
		} else if (windowStart == old.windowStart()) {
	
			/*
			 *     B0       B1      B2     B3      B4
			 * ||_______|_______|_______|_______|_______||___
			 * 200     400     600     800     1000    1200  timestamp
			 *                             ^
			 *                          time=888
			 *            startTime of Bucket 3: 800, so it's up-to-date
			 *
			 * If current {@code windowStart} is equal to the start timestamp of old bucket,
			 * that means the time is within the bucket, so directly return the bucket.
			 */
			return old;
			
		// 如果原先存在的視窗開始時間小于目前時間戳計算出來的開始時間,則表示 bucket 已被棄用。則需要将開始時間重置到新時間戳對應的開始時間戳。
		} else if (windowStart > old.windowStart()) {
		
			/*
			 *   (old)
			 *             B0       B1      B2    NULL      B4
			 * |_______||_______|_______|_______|_______|_______||___
			 * ...    1200     1400    1600    1800    2000    2200  timestamp
			 *                              ^
			 *                           time=1676
			 *          startTime of Bucket 2: 400, deprecated, should be reset
			 *
			 * If the start timestamp of old bucket is behind provided time, that means
			 * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
			 * Note that the reset and clean-up operations are hard to be atomic,
			 * so we need a update lock to guarantee the correctness of bucket update.
			 *
			 * The update lock is conditional (tiny scope) and will take effect only when
			 * bucket is deprecated, so in most cases it won't lead to performance loss.
			 */
			if (updateLock.tryLock()) {
				try {
					// Successfully get the update lock, now we reset the bucket.
					return resetWindowTo(old, windowStart);
				} finally {
					updateLock.unlock();
				}
			} else {
				// Contention failed, the thread will yield its time slice to wait for bucket available.
				Thread.yield();
			}
		} else if (windowStart < old.windowStart()) {
			// Should not go through here, as the provided time is already behind.
			return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
		}
	}
}
           

二、FlowSlot 流控

FlowSlot#entry

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
				  boolean prioritized, Object... args) throws Throwable {
	// 判斷是否觸發流控規則
	checkFlow(resourceWrapper, context, node, count, prioritized);
	// 繼續執行後續的ProcessorSlot
	fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
           

FlowRuleChecker#checkFlow

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
	if (ruleProvider == null || resource == null) {
		return;
	}
	
	// 擷取Sentinel DashBoard 上配置的流控規則
	Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
	if (rules != null) {
		for (FlowRule rule : rules) {
		
			// 依次執行流控規則
			if (!canPassCheck(rule, context, node, count, prioritized)) {
				throw new FlowException(rule.getLimitApp(), rule);
			}
		}
	}
}
           

FlowRuleChecker#canPassCheck()

public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, 
				int acquireCount, boolean prioritized) {
    String limitApp = rule.getLimitApp(); 
    // 如果限流規則沒有配置針對來源,則直接預設通過,
    // 該值在配置時,預設為 default,即對所有調用發起方都生效
    if (limitApp == null) {    
        return true;
    }
    if (rule.isClusterMode()) {  // @2
        return passClusterCheck(rule, context, node, acquireCount, prioritized);  
    }
    return passLocalCheck(rule, context, node, acquireCount, prioritized);     
}
           

FlowRuleChecker#passLocalCheck

private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
	// 流控模式:首先根據流控模式(strategy)選擇一個合适的 Node,如果為空,則直接傳回 true,表示放行。
	Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
	if (selectedNode == null) {
		return true;
	}
	// 流控效果:調用 FlowRule 内部持有的流量控制器來判斷是否符合流控規則,最終調用的是 TrafficShapingController canPass 方法。
	return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
           

上面的兩個方法分别對應sentinel dashboard的流控模式、流控效果

Spring Cloud Alibaba-Sentinel源碼閱讀(三)-流控原理

預設模式(快速失敗)DefaultController#canPass()

public boolean canPass(Node node, int acquireCount, boolean prioritized) {

	// 目前已消耗的令牌數量,即目前時間視窗内已建立的線程數量(FLOW_GRADE_THREAD) 或已認證的請求個數(FLOW_GRADE_QPS)
    int curCount = avgUsedTokens(node);     
	
	// 如果 已消耗的令牌數+目前請求的令牌數(正常是1) > 總令牌數,則需要根據是否有優先級進行不同的處理;
	// 否則直接傳回true,表示通過。
    if (curCount + acquireCount > count) {   
        if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {   
            long currentTime;
            long waitInMs;
            currentTime = TimeUtil.currentTimeMillis();
			
			// 嘗試搶占下一個滑動視窗的令牌,并傳回該時間視窗所剩餘的時間,如果擷取失敗,則傳回 OccupyTimeoutProperty.getOccupyTimeout() 值,
			// 該傳回值的作用就是目前申請資源的線程将 sleep(阻塞)的時間。
            waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);   
			
			// 如果 waitInMs 小于搶占的最大逾時時間,則在下一個時間視窗中增加對應令牌數,并且線程将sleep
            if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {             
                node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                node.addOccupiedPass(acquireCount);
                sleep(waitInMs);                                                                                  
                // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                throw new PriorityWaitException(waitInMs);  
            }
        }
        return false;     
    }
    return true;       
}
           

WarmUpController是基于漏桶算法,RateLimiterController是基于令牌桶算法

三、流控流程圖

Spring Cloud Alibaba-Sentinel源碼閱讀(三)-流控原理

繼續閱讀