天天看點

《一起學sentinel》六、Slot的子類及實作之FlowSlot和DegradeSlot

一、概述

在 Sentinel 裡面,所有的資源都對應一個資源名稱(

resourceName

),每次資源調用都會建立一個

Entry

對象。Entry 可以通過對主流架構的适配自動建立,也可以通過注解的方式或調用

SphU

API 顯式建立。Entry 建立的時候,同時也會建立一系列功能插槽(slot chain),這些插槽有不同的職責,例如:

  • NodeSelectorSlot

    負責收集資源的路徑,并将這些資源的調用路徑,以樹狀結構存儲起來,用于根據調用路徑來限流降級;
  • ClusterBuilderSlot

    則用于存儲資源的統計資訊以及調用者資訊,例如該資源的 RT, QPS, thread count 等等,這些資訊将用作為多元度限流,降級的依據;
  • LogSlot

    則用于記錄用于記錄塊異常,為故障排除提供具體的日志
  • StatisticSlot

    則用于記錄、統計不同緯度的 runtime 名額監控資訊;
  • AuthoritySlot

    則根據配置的黑白名單和調用來源資訊,來做黑白名單控制;
  • SystemSlot

    則通過系統的狀态,例如 load1 等,來控制總的入口流量;
  • FlowSlot

    則用于根據預設的限流規則以及前面 slot 統計的狀态,來進行流量控制;
  • DegradeSlot

    則通過統計資訊以及預設的規則,來做熔斷降級;

下面是關系結構圖

《一起學sentinel》六、Slot的子類及實作之FlowSlot和DegradeSlot

二、FlowSlot分析

1.FlowSlot介紹

官方文檔是這樣描述FlowSlot的:

流量控制(flow control),其原理是監控應用流量的 QPS 或并發線程數等名額,當達到指定的門檻值時對流量進行控制,以避免被瞬時的流量高峰沖垮,進而保障應用的高可用性。

FlowSlot

會根據預設的規則,結合前面

NodeSelectorSlot

ClusterBuilderSlot

StatisticSlot

統計出來的實時資訊進行流量控制。

限流的直接表現是在執行

Entry nodeA = SphU.entry(resourceName)

的時候抛出

FlowException

異常。

FlowException

BlockException

的子類,您可以捕捉

BlockException

來自定義被限流之後的處理邏輯。

同一個資源可以建立多條限流規則。

FlowSlot

會對該資源的所有限流規則依次周遊,直到有規則觸發限流或者所有規則周遊完畢。

一條限流規則主要由下面幾個因素組成,我們可以組合這些元素來實作不同的限流效果:

  • resource

    :資源名,即限流規則的作用對象
  • count

    : 限流門檻值
  • grade

    : 限流門檻值類型(QPS 或并發線程數)
  • limitApp

    : 流控針對的調用來源,若為

    default

    則不區分調用來源
  • strategy

    : 調用關系限流政策
  • controlBehavior

    : 流量控制效果(直接拒絕、Warm Up、勻速排隊)
    這個是預設行為,超出的請求會被拒絕。并抛出FlowException。
    - ```java
    服務升溫Warmup ({@code RuleConstant.CONTROL_BEHAVIOR_WARM_UP})
    如果系統的負載已經低了一段時間,和大量的請求到來時,系統可能無法處理所有這些請求。
    但是,如果我們穩定地增加傳入請求,系統就會升溫,最終能夠處理所有的請求。
    此預熱期可通過在流規則中設定字段{@code warmupperiods}來配置。           

    這個政策嚴格控制請求之間的間隔。

    換句話說,它允許請求以穩定、統一的速率通過。

    該政策是leaky bucket的實作。

    它用于以穩定的速率處理請求,經常用于突發流量(例如消息處理)。

    我們可以根據以下指令擷取到樣例圖
               
    //指令:

curl

http://localhost:8719/tree

//樣例圖:

idx id thread pass blocked success total aRt 1m-pass 1m-block 1m-all e

2 abc647 0 460 46 46 1 27 630 276 897 0

其中:

- `thread`: 代表目前處理該資源的**并發**數;
- `pass`: 代表一**秒**内到來到的**請求**;
- `blocked`: 代表一**秒**内被流量**控制**的請求數量;
- `success`: 代表一**秒**内**成功**處理完的請求;
- `total`: 代表到一**秒**内到來的請求以及被**阻止**的請求**總和**;
- RT: 代表一秒内該資源的**平均響應時間**;
- `1m-pass`: 則是一**分鐘**内到來的**請求**;
- `1m-block`: 則是一**分鐘**内被**阻止**的請求;
- `1m-all`: 則是一**分鐘**内到來的請求和**被阻止**的請求的**總和**;
- `(e)exception`: 則是一秒内業務本身**異常**的**總和**。

#### 2.源碼解讀
           

@Override

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {

checkFlow(resourceWrapper, context, node, count, prioritized);

fireEntry(context, resourceWrapper, node, count, prioritized, args);           

}

public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {

fireExit(context, resourceWrapper, count, args);           
1.在`entry`階段,執行了一個校驗方法.
           

void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {

checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);           
2.我們可以看到這裡有一個多出來的關鍵參數“`ruleProvider`”,我們看看這個多出來參數的實作。
           

private final Function> ruleProvider = new Function>() {

@Override
public Collection<FlowRule> apply(String resource) {
    // Flow rule map should not be null.
    Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
    return flowRules.get(resource);
}           

};

3.首先會在全局的`FlowRuleManager`中擷取全局的`FlowRuleMap`,然後根據我們的唯一判斷準則“`resource`”擷取對應的`FlowRuleList`。

4.接下來我們看看`checkFlow`方法。
           

public void checkFlow(Function> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {

if (ruleProvider == null || resource == null) {
    return;
}
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
    for (FlowRule rule : rules) {
        if (!canPassCheck(rule, context, node, count, prioritized)) {
            throw new FlowException(rule.getLimitApp(), rule);
        }
    }
}           
5.首先對`resource`和`ruleList`進行了判斷,如果為空着直接跳過校驗。接着取出`ruleList`,分别判斷每一個判斷是否滿足,如果不滿足, `throw new FlowException(rule.getLimitApp(), rule)`;

 
           

public boolean canPassCheck(/@NonNull/ FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {

String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}

if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}

return passLocalCheck(rule, context, node, acquireCount, prioritized);
}           
6.我們進入内層方法可以發現,這裡區分了叢集模式和本地模式,就算選擇了叢集模式後續代碼中也會重新進行叢集模式的校驗,如果校驗失敗則會降級退回到本地模式。
           

static Node selectNodeByRequesterAndStrategy(/@NonNull/ FlowRule rule, Context context, DefaultNode node) {

// The limit app should not be empty.
String limitApp = rule.getLimitApp();
int strategy = rule.getStrategy();
String origin = context.getOrigin();

if (limitApp.equals(origin) && filterOrigin(origin)) {
    if (strategy == RuleConstant.STRATEGY_DIRECT) {
        // Matches limit origin, return origin statistic node.
        return context.getOriginNode();
    }

    return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
    if (strategy == RuleConstant.STRATEGY_DIRECT) {
        // Return the cluster node.
        return node.getClusterNode();
    }

    return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
           && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
    if (strategy == RuleConstant.STRATEGY_DIRECT) {
        return context.getOriginNode();
    }

    return selectReferenceNode(rule, context, node);
}

return null;           
7.現在我們終于進入到了`strategy`的邏輯,這裡主要邏輯是判斷在不同的**limitApp**下,如指定類型,叢集,其他以及**STRATEGY_DIRECT**流程,如果全部比對失敗後會進入到`selectReferenceNode`,這裡包含了**STRATEGY_RELATE**流程以及**STRATEGY_CHAIN**流程。

接下來則到了最後一塊:controlBehavior

接下來就是根據資料與流量控制規則進行判斷,是否通過。

![image.png](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/6119fd35ce3a4d3c9538a16bc7da6ff2~tplv-k3u1fbpfcp-zoom-1.image)

8.這個控制器有四個實作類,對應了`flow`的最後一個關鍵因子“**controlBehavior**”

* `DefaultController`預設的節流控制器(立即拒絕政策`Immediately`)。
* `RateLimiterController`穩定勻速的令牌桶方式(勻速排隊`Uniform` `Rate`)
* `WarmUpController`(服務升溫`Warmup`)
* `WarmUpRateLimiterController`(服務升溫+令牌桶`Warmup+RateLimiter`)

作為`sentinel` 的核心限流控制器,就和我們使用的方式一樣,預先設定了大量的對應具體資源的規則,規則會在初始化時被注冊為一個`map`,這裡我們可以看到`sentinel`使用了`CopyOnWrite`的思想去操作`flow`的`map`。進行`grade`、`strategy`、`controlBehavior`的多元度組合限流後,完整的實作了限流的功能。





## 三、DegradeSlot分析

#### 1.DegradeSlot介紹

官方文檔是這樣描述`DegradeSlot`的:緻力于斷路器。個人認為這個是`sentinel`比起一般的網關,最具差異的地方,既有豐富的限流,又提供了熔斷的能力。

#### 2.源碼解讀
           

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {

performChecking(context, resourceWrapper);

fireEntry(context, resourceWrapper, node, count, prioritized, args);           

public void exit(Context context, ResourceWrapper r, int count, Object... args) {

Entry curEntry = context.getCurEntry();
if (curEntry.getBlockError() != null) {
    fireExit(context, r, count, args);
    return;
}
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
    fireExit(context, r, count, args);
    return;
}

if (curEntry.getBlockError() == null) {
    // passed request
    for (CircuitBreaker circuitBreaker : circuitBreakers) {
        circuitBreaker.onRequestComplete(context);
    }
}

fireExit(context, r, count, args);           
1.我們可以看到,這裡的entry裡面調用了私有的performChecking方法,和flow一樣,我們先看看CircuitBreaker的對象結構。
           

public interface CircuitBreaker {

/**
   *擷取相關的斷路規則。
 */
DegradeRule getRule();

/**
 * 僅當調用時該調用可用時,才擷取該調用的權限。
 *
 * @param context 目前調用的上下文
 * @return true 如果獲得了權限,則使用return false
 */
boolean tryPass(Context context);

/**
 * 擷取斷路器的通過狀态。
 */
State currentState();

/**
 * 用上下文記錄一個已完成的請求,并對斷路器進行狀态轉換
 */
void onRequestComplete(Context context);

/**
 * Circuit breaker state.
 */
enum State {
    /**
     * 在{@code OPEN}狀态下,所有請求都将被拒絕,直到下一個恢複時間點。
     */
    OPEN,
    /**
     *在{@code HALF_OPEN}狀态下,斷路器允許“探測”調用。
      *如果調用異常,根據政策(例如,它是緩慢的),斷路器
     *将重新轉換為{@code OPEN}狀态,等待下一個恢複時間點;
     *否則,該資源将被視為“恢複”和斷路器
     *将停止切斷請求并轉換為{@code CLOSED}狀态。
     */
    HALF_OPEN,
    /**
     *在{@code CLOSED}狀态中,允許所有請求。當目前路徑成本超過門檻值時,
     *斷路器将轉換為{@code OPEN}狀态。
     */
    CLOSED
}           
整段都在維護這個CircuitBreaker的state,我們可以看到這裡不再是resource,而是context,這裡的狀态決定了請求是否能夠通過,如果在這個最低成的slot就被攔截并拒絕,那麼可以了解為是不需要再次限流的(雖然會統計數量)
           

void performChecking(Context context, ResourceWrapper r) throws BlockException {

List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
    return;
}
for (CircuitBreaker cb : circuitBreakers) {
    if (!cb.tryPass(context)) {
        throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
    }
}           
2.performChecking主要的邏輯就是從目前DegradeRuleManager中擷取resource對應的熔斷規則,如果需要進行熔斷則`throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule())`;





## 四、小結

本期我們講述了Slot的子類`FlowSlot`和`DegradeSlot`的基本實作原理。

現在建立我們的知識樹



#### 執行個體化DefaultNode和ClusterNode,建立結構樹

------

建立上下文時,首先會在`NodeSelectorSlot`中判斷是否有`DefaultNode`。

如果沒有則新增一個基于`resource`的`DefaultNode`,然後執行下一個`slot`。

下一個`slot`是`ClusterBuilderSlot`,`ClusterBuilderSlot`會判斷是否有對應的`ClusterNode`,如果沒有則新增一個基于resource的`ClusterNode`并繼續下一個流程(`slot`)。

總結來說,這個兩個`slot`奠定了一個基于`resource`進行全局控制的基調。



#### 進行資訊收集

------

`LogSlot`在`DefaultNode`和`ClusterNode`初始化後,作為業務執行個體子產品的分界點,收集全局異常并處理。

`StatisticSlot`作為全局統計的執行個體,依托于`ClusterNode`,将全局的`RT`, `QPS`, `thread` `count` 等等資訊存放在`clusterNodeMap`裡面。



#### 進行權限校驗及系統級限流

---

在樹結構和資訊收集的slot建立完畢後,開始業務邏輯的實作,首先實作的就是AuthoritySlot的黑白名單能力,依托sentinel的resource的定義,我們很簡單就可以拿到關于resource的authorityRules,将對應的rules取出後,以此進行黑、白名單判斷,也可以了解為一種權限級别的限流措施。

SystemSlot則是全統計的全局限流,從調用點origins級别的配置中讀取了配置好的限流措施,在下一個slot實作前完成了所有的判斷,如qps,線程數,成功通路數,RT,CPU狀态。如果出現異常,則throws BlockException,交給之前的slot去處理相應邏輯。到這裡,一個基礎的限流架構已經基本實作。



#### 進行限流和熔斷

---

當所有的配置項已經配置完畢,權限級别和系統級别的限流做完,現在輪到了最後的兩個slot。

flowslot和DegradeSlot分别對應了我們配置的限流flow和配置的熔斷機制。

到這裡,一個成熟的分布式網關已經完成,我們的sentinel的完整功能已經叙述完畢。