天天看點

全網最全Sentinel資源名額資料統計實作源碼解析

作者:一個即将退役的碼農

節點選擇器:NodeSelectorSlot

NodeSelectorSlot 負責為資源的首次通路建立 DefaultNode,以及維護 Context.curNode 和調用樹。NodeSelectorSlot 被放在 ProcessorSlotChain 連結清單的第一個位置,這是因為後續的 ProcessorSlot 都需要依賴這個 ProcessorSlot。NodeSelectorSlot 源碼如下:

public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
    // Context 的 name -> 資源的 DefaultNode
    private volatile Map<String, DefaultNode> map = new HashMap<>(10);
    // 入口方法
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
        // 使用 Context 的名稱作為 key 緩存資源的 DefaultNode
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    // 為資源建立 DefaultNode
                    node = new DefaultNode(resourceWrapper, null);
                    // 替換 map
                    HashMap<String, DefaultNode> cacheMap = new HashMap<>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    // 綁定調用樹
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }
            }
        }
        // 替換 Context 的 curNode 為目前 DefaultNode
        context.setCurNode(node);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    // 出口方法什麼也不做
    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}

           

如源碼所示,map 字段是一個非靜态字段,意味着每個 NodeSelectorSlot 都有一個 map。由于一個資源對應一個 ProcessorSlotChain,而一個 ProcessorSlotChain 隻建立一個 NodeSelectorSlot,并且 map 緩存 DefaultNode 使用的 key 并非資源 ID,而是 Context.name,是以 map 的作用是緩存針對同一資源為不同調用鍊路入口建立的 DefaultNode。

在 entry 方法中,首先根據 Context.name 從 map 擷取目前調用鍊路入口的資源 DefaultNode,如果資源第一次被通路,也就是資源的 ProcessorSlotChain 第一次被建立,那麼這個 map 是空的,就會加鎖為資源建立 DefaultNode,如果資源不是首次被通路,但卻首次作為目前調用鍊路(Context)的入口資源,也需要加鎖為資源建立一個 DefaultNode。可見,Sentinel 會為同一資源 ID 建立多少個 DefaultNode 取決于有多少個調用鍊使用其作為入口資源,直白點就是同一資源存在多少個 DefaultNode 取決于 Context.name 有多少種不同取值,這就是為什麼說一個資源可能有多個 DefaultNode 的原因。

為什麼這麼設計呢?

舉個例子,對同一支付接口,我們需要使用 Spring MVC 暴露給前端通路,同時也可能會使用 Dubbo 暴露給其它内部服務調用。Sentinel 的 Web MVC 擴充卡在調用鍊路入口建立名為“sentinel_spring_web_context”的 Context,與 Sentinel 的 Dubbo 擴充卡調用 ContextUtil#enter 方法建立的 Context 名稱不同。針對這種情況,我們可以實作隻限制 Spring MVC 進來的流量,也就是限制前端發起接口調用的 QPS、并行占用的線程數等。

NodeSelectorSlot#entry 方法最難以了解的就是實作綁定調用樹這行代碼:

((DefaultNode) context.getLastNode()).addChild(node);

           

這行代碼分兩種情況分析更容易了解,我們就以 Sentinel 提供的 Demo 為例進行分析。

一般情況

Sentinel 的 sentinel-demo 子產品下提供了多種使用場景的 Demo,我們選擇 sentinel-demo-spring-webmvc 這個 Demo 為例,該 Demo 下有一個 hello 接口,其代碼如下。

@RestController
public class WebMvcTestController {

    @GetMapping("/hello")
    public String apiHello() throws BlockException {
        doBusiness();
        return "Hello!";
    }
}

           

我們不需要添加任何規則,隻是為了調試 Sentinel 的源碼。将 demo 啟動起來後,在浏覽器通路“/hello”接口,在 NodeSelectorSlot#entry 方法的綁定調用樹這一行代碼下斷點,觀察此時 Context 的字段資訊。正常情況下我們可以看到如下圖所示的結果。

全網最全Sentinel資源名額資料統計實作源碼解析

從上圖中可以看出,此時的 Context.entranceNode 的子節點為空(childList 的大小為 0),并且目前 CtEntry 父、子節點都是 Null(curEntry 字段)。當綁定調用樹這一行代碼執行完成後,Context 的字段資訊如下圖所示:

全網最全Sentinel資源名額資料統計實作源碼解析

從上圖可以看出,NodeSelectorSlot 為目前資源建立的 DefaultNode 被添加到了 Context.entranceNode 的子節點。entranceNode 類型為 EntranceNode,在調用 ContextUtil#enter 方法時建立,在第一次建立名為“sentinel_spring_web_context”的 Context 時建立,相同名稱的 Context 都使用同一個 EntranceNode。并且該 EntranceNode 在建立時會被添加到 Constant.ROOT。

此時,Constant.ROOT、Context.entranceNode、目前通路資源的 DefaultNode 構造成的調用樹如下:

ROOT (machine-root)
                /
      EntranceNode (context name: sentinel_spring_web_context)
             /
DefaultNode (resource name: GET:/hello)

           

如果我們現在再通路 Demo 的其他接口,例如通路“/err”接口,那麼生成的調用樹就會變成如下:

ROOT (machine-root)
                            /
      EntranceNode (context name: sentinel_spring_web_context)
                    /                                \
DefaultNode (resource name: GET:/hello)     DefaultNode (resource name: GET:/err) 

           

Context.entranceNode 将會存儲 Web 項目的所有資源(接口)的 DefaultNode。

存在多次 SphU#entry 的情況

比如我們在一個服務中添加了 Sentinel 的 Web MVC 适配子產品的依賴,也添加了 Sentinel 的 OpenFeign 适配子產品的依賴,并且我們使用 OpenFeign 調用内部其他服務的接口,那麼就會存在一次調用鍊路上出現多次調用 SphU#entry 方法的情況。

首先 webmvc 擴充卡在接收用戶端請求時會調用一次 SphU#entry,在處理用戶端請求時可能需要使用 OpenFeign 調用其它服務的接口,那麼在發起接口調用時,Sentinel 的 OpenFeign 擴充卡也會調用一次 SphU#entry。

現在我們将 Demo 的 hello 接口修改一下,将 hello 接口調用的 doBusiness 方法也作為資源使用 Sentinel 保護起來,改造後的 hello 接口代碼如下:

@RestController
public class WebMvcTestController {

    @GetMapping("/hello")
    public String apiHello() throws BlockException {
        ContextUtil.enter("my_context");
        Entry entry = null;
        try {
            entry = SphU.entry("POST:http://wujiuye.com/hello2", EntryType.OUT);
            // ==== 這裡是被包裝的代碼 =====
            doBusiness();
            return "Hello!";
            // ==== end ===============
        } catch (Exception e) {
            if (!(e instanceof BlockException)) {
                Tracer.trace(e);
            }
            throw e;
        } finally {
            if (entry != null) {
                entry.exit(1);
            }
            ContextUtil.exit();
        }
    }
}

           

我們可将 doBusiness 方法看成是遠端調用,例如調用第三方的接口,接口名稱為“http://wujiuye.com/hello2”,使用 POST 方式調用,那麼我們可以使用“POST:http://wujiuye.com/hello2”作為資源名稱,并将流量類型設定為 OUT 類型。上下文名稱取名為"my_context"。

現在啟動 demo,使用浏覽器通路“/hello”接口。當代碼執行到 apiHello 方法時,在 NodeSelectorSlot#entry 方法的綁定調用樹這一行代碼下斷點。當綁定調用樹這行代碼執行完成後,Context 的字段資訊如下圖所示。

全網最全Sentinel資源名額資料統計實作源碼解析

如圖所示,Sentinel 并沒有建立名稱為 my_context 的 Context,還是使用應用接收到請求時建立名為“sentinel_spring_web_context”的 Context,是以處理浏覽器發送過來的請求的“GET:/hello”資源是本次調用鍊路的入口資源,Sentinel 在調用鍊路入口處建立 Context 之後不再建立新的 Context。

由于之前并沒有為名稱為“POST:http://wujiuye.com/hello2”的資源建立 ProcessorSlotChain,是以 SphU#entry 會為該資源建立一個 ProcessorSlotChain,也就會為該 ProcessorSlotChain 建立一個 NodeSelectorSlot。在執行到 NodeSelectorSlot#entry 方法時,就會為該資源建立一個 DefaultNode,而将該資源的 DefaultNode 綁定到節點樹後,該資源的 DefaultNode 就會成為“GET:/hello”資源的 DefaultNode 的子節點,調用樹如下。

ROOT (machine-root)
                    /
    EntranceNode (name: sentinel_spring_web_context)
                 /                       \
          DefaultNode (GET:/hello)   .........
               /
         DefaultNode  (POST:/hello2)

           

此時,目前調用鍊路上也已經存在兩個 CtEntry,這兩個 CtEntry 構造一個雙向連結清單,如下圖所示。

全網最全Sentinel資源名額資料統計實作源碼解析

雖然存在兩個 CtEntry,但此時 Context.curEntry 指向第二個 CtEntry,第二個 CtEntry 在 apiHello 方法中調用 SphU#entry 方法時建立,當執行完 doBusiness 方法後,調用目前 CtEntry#exit 方法,由該 CtEntry 将 Context.curEntry 還原為該 CtEntry 的父 CtEntry。這有點像入棧和出棧操作,例如棧幀在 Java 虛拟機棧的入棧和出棧,調用方法時方法的棧幀入棧,方法執行完成棧幀出棧。

NodeSelectorSlot#entry 方法我們還有一行代碼沒有分析,就是将目前建立的 DefaultNode 設定為 Context 的目前節點,代碼如下:

// 替換 Context.curNode 為目前 DefaultNode
context.setCurNode(node);

           

替換 Context.curNode 為目前資源 DefaultNode 這行代碼就是将目前建立的 DefaultNode 指派給目前 CtEntry.curNode。對着上圖了解就是,将資源“GET:/hello”的 DefaultNode 指派給第一個 CtEntry.curNode,将資源“POST:http://wujiuye.com/hello2”的 DefaultNode 指派給第二個 CtEntry.curNode。

要了解 Sentinel 構造 CtEntry 雙向連結清單的目的,首先我們需要了解調用 Context#getCurNode 方法擷取目前資源的 DefaultNode 可以做什麼。

Tracer#tracer 方法用于記錄異常。以異常名額資料統計為例,在發生非 Block 異常時,Tracer#tracer 需要從 Context 擷取目前資源的 DefaultNode,通知 DefaultNode 記錄異常,同時 DefaultNode 也會通知 ClusterNode 記錄記錄,如下代碼所示。

public class DefaultNode extends StatisticNode {
  ......
  @Override
    public void increaseExceptionQps(int count) {
        super.increaseExceptionQps(count);
        this.clusterNode.increaseExceptionQps(count);
    }
}

           

這個例子雖然簡單,但也足以說明 Sentinel 構造 CtEntry 雙向連結清單的目的。

ClusterNode 構造器:ClusterBuilderSlot

ClusterNode 出現的背景

在一個資源的 ProcessorSlotChain 中,NodeSelectorSlot 負責為資源建立 DefaultNode,這個 DefaultNode 僅限同名的 Context 使用。是以一個資源可能會存在多個 DefaultNode,那麼想要擷取一個資源的總的 QPS 就必須要周遊這些 DefaultNode。為了性能考慮,Sentinel 會為每個資源建立一個全局唯一的 ClusterNode,用于統計資源的全局并行占用線程數、QPS、異常總數等名額資料。

ClusterBuilderSlot

與 NodeSelectorSlot 的職責相似,ClusterBuilderSlot 的職責是為資源建立全局唯一的 ClusterNode,僅在資源第一次被通路時建立。ClusterBuilderSlot 還會将 ClusterNode 指派給 DefaultNode.clusterNode,由 DefaultNode 持有 ClusterNode,負責管理 ClusterNode 的名額資料統計。這點也是 ClusterBuilderSlot 在 ProcessorSlotChain 連結清單中必須排在 NodeSelectorSlot 之後的原因,即必須先有 DefaultNode,才能将 ClusterNode 交給 DefaultNode 管理。

ClusterBuilderSlot 的源碼比較多,本篇隻分析其實作 ProcessorSlot 接口的 entry 和 exit 方法。ClusterBuilderSlot 删減後的源碼如下。

public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    // 資源 -> ClusterNode
    private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
    private static final Object lock = new Object();

    // 非靜态,一個資源對應一個 ProcessorSlotChain,是以一個資源共用一個 ClusterNode
    private volatile ClusterNode clusterNode = null;

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args)
            throws Throwable {
        if (clusterNode == null) {
            synchronized (lock) {
                if (clusterNode == null) {
                    // 建立 ClusterNode
                    clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                    // 添加到緩存
                    HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                    newMap.putAll(clusterNodeMap);
                    newMap.put(node.getId(), clusterNode);
                    clusterNodeMap = newMap;
                }
            }
        }
        // node 為 NodeSelectorSlot 傳遞過來的 DefaultNode
        node.setClusterNode(clusterNode);
        // 如果 origin 不為空,則為遠端建立一個 StatisticNode
        if (!"".equals(context.getOrigin())) {
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        }
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}

           

ClusterBuilderSlot 使用一個 Map 緩存資源的 ClusterNode,并且用一個非靜态的字段維護目前資源的 ClusterNode。因為一個資源隻會建立一個 ProcessorSlotChain,意味着 ClusterBuilderSlot 也隻會建立一個,那麼讓 ClusterBuilderSlot 持有該資源的 ClusterNode 就可以省去每次都從 Map 中擷取的步驟,這當然也是 Sentinel 為性能做出的努力。

ClusterBuilderSlot#entry 方法的 node 參數由前一個 ProcessorSlot 傳遞過來,也就是 NodeSelectorSlot 傳遞過來的 DefaultNode。ClusterBuilderSlot 将 ClusterNode 指派給 DefaultNode.clusterNode,那麼後續的 ProcessorSlot 就能從 node 參數中取得 ClusterNode。DefaultNode 與 ClusterNode 的關系如下圖所示。

全網最全Sentinel資源名額資料統計實作源碼解析

ClusterNode 有一個 Map 類型的字段用來緩存 origin 與 StatisticNode 的映射,代碼如下:

public class ClusterNode extends StatisticNode {
    private final String name;
    private final int resourceType;
    private Map<String, StatisticNode> originCountMap = new HashMap<>();
}

           

如果上遊服務在調用目前服務的接口傳遞 origin 字段過來,例如可在 http 請求頭添加“S-user”參數,或者 Dubbo rpc 調用在請求參數清單加上“application”參數,那麼 ClusterBuilderSlot 就會為 ClusterNode 建立一個 StatisticNode,用來統計目前資源被遠端服務調用的名額資料。

例如,當 origin 表示來源應用的名稱時,對應的 StatisticNode 統計的就是針對該調用來源的名額資料,可用來檢視哪個服務通路這個接口最頻繁,由此可實作按調用來源限流。

ClusterNode#getOrCreateOriginNode 方法源碼如下:

public Node getOrCreateOriginNode(String origin) {
        StatisticNode statisticNode = originCountMap.get(origin);
        if (statisticNode == null) {
            try {
                lock.lock();
                statisticNode = originCountMap.get(origin);
                if (statisticNode == null) {
                    statisticNode = new StatisticNode();
                    // 這幾行代碼在 Sentinel 中随處可見
                    HashMap<String, StatisticNode> newMap = new HashMap<>(originCountMap.size() + 1);
                    newMap.putAll(originCountMap);
                    newMap.put(origin, statisticNode);
                    originCountMap = newMap;
                }
            } finally {
                lock.unlock();
            }
        }
        return statisticNode;
    }

           

為了便于使用,ClusterBuilderSlot 會将調用來源(origin)的 StatisticNode 指派給 Context.curEntry.originNode,後續的 ProcessorSlot 可調用 Context#getCurEntry#getOriginNode 方法擷取該 StatisticNode。這裡我們可以得出一個結論,如果我們自定義的 ProcessorSlot 需要用到調用來源的 StatisticNode,那麼在建構 ProcessorSlotChain 時,我們必須要将這個自定義 ProcessorSlot 放在 ClusterBuilderSlot 之後。

資源名額資料統計:StatisticSlot

StatisticSlot 才是實作資源各項名額資料統計的 ProcessorSlot,它與 NodeSelectorSlot、ClusterBuilderSlot 組成了資源名額資料統計流水線,分工明确。

首先 NodeSelectorSlot 為資源建立 DefaultNode,将 DefaultNode 向下傳遞,ClusterBuilderSlot 負責給資源的 DefaultNode 加工,添加 ClusterNode 這個零部件,再将 DefaultNode 向下傳遞給 StatisticSlot,如下圖所示:

全網最全Sentinel資源名額資料統計實作源碼解析

StatisticSlot 在統計名額資料之前會先調用後續的 ProcessorSlot,根據後續 ProcessorSlot 判斷是否需要拒絕該請求的結果決定記錄哪些名額資料,這也是為什麼 Sentinel 設計的責任鍊需要由前一個 ProcessorSlot 在 entry 或者 exit 方法中調用 fireEntry 或者 fireExit 完成調用下一個 ProcessorSlot 的 entry 或 exit 方法,而不是使用 for 循環周遊調用 ProcessorSlot 的原因。每個 ProcessorSlot 都有權決定是先等後續的 ProcessorSlot 執行完成再做自己的事情,還是先完成自己的事情再讓後續 ProcessorSlot 執行,與流水線有所差別。

StatisticSlot 源碼架構如下:

public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // Do some checking.
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
           // .....
        } catch (PriorityWaitException ex) {
            // .....
        } catch (BlockException e) {
            // ....
            throw e;
        } catch (Throwable e) {
            // .....
            throw e;
        }
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        DefaultNode node = (DefaultNode)context.getCurNode();
        // ....
        fireExit(context, resourceWrapper, count);
    }
}

           
  • entry:先調用 fireEntry 方法完成調用後續的 ProcessorSlot#entry 方法,根據後續的 ProcessorSlot 是否抛出 BlockException 決定記錄哪些名額資料,并将資源并行占用的線程數加 1。
  • exit:若無任何異常,則記錄響應成功、請求執行耗時,将資源并行占用的線程數減 1。

entry 方法

第一種情況:當後續的 ProcessorSlot 未抛出任何異常時,表示不需要拒絕該請求,放行目前請求。

當請求可正常通過時,需要将目前資源并行占用的線程數增加 1、目前時間視窗被放行的請求總數加 1,代碼如下:

// Request passed, add thread count and pass count.
            node.increaseThreadNum();
            node.addPassRequest(count);

           

如果調用來源不為空,也将調用來源的 StatisticNode 的目前并行占用線程數加 1、目前時間視窗被放行的請求數加 1,代碼如下:

if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }

           

如果流量類型為 IN,則将資源全局唯一的 ClusterNode 的并行占用線程數、目前時間視窗被放行的請求數都增加 1,代碼如下:

if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

           

回調所有 ProcessorSlotEntryCallback#onPass 方法,代碼如下:

// Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }

           

可調用 StatisticSlotCallbackRegistry#addEntryCallback 靜态方法注冊 ProcessorSlotEntryCallback,ProcessorSlotEntryCallback 接口的定義如下:

public interface ProcessorSlotEntryCallback<T> {
    void onPass(Context context, ResourceWrapper resourceWrapper, T param, int count, Object... args) throws Exception;
    void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper, T param, int count, Object... args);
}

           
  • onPass:該方法在請求被放行時被回調執行。
  • onBlocked:該方法在請求被拒絕時被回調執行。

第二種情況:捕獲到類型為 PriorityWaitException 的異常。

這是特殊情況,在需要對請求限流時,隻有使用預設流量效果控制器才可能會抛出 PriorityWaitException 異常,這部分内容将在分析 FlowSlot 的實作源碼時再作分析。

當捕獲到 PriorityWaitException 異常時,說明目前請求已經被休眠了一會了,但請求還是允許通過的,隻是不需要為 DefaultNode 記錄這個請求的名額資料了,隻自增目前資源并行占用的線程數,同時,DefaultNode 也會為 ClusterNode 自增并行占用的線程數。最後也會回調所有 ProcessorSlotEntryCallback#onPass 方法。這部分源碼如下。

node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }

           

第三種情況:捕獲到 BlockException 異常,BlockException 異常隻在需要拒絕請求時抛出。

當捕獲到 BlockException 異常時,将異常記錄到調用鍊路上下文的目前 Entry(StatisticSlot 的 exit 方法會用到),然後調用 DefaultNode#increaseBlockQps 方法記錄目前請求被拒絕,将目前時間視窗的 block qps 這項名額資料的值加 1。如果調用來源不為空,讓調用來源的 StatisticsNode 也記錄目前請求被拒絕;如果流量類型為 IN,則讓用于統計所有資源名額資料的 ClusterNode 也記錄目前請求被拒絕。這部分的源碼如下:

// Blocked, set block exception to current entry.
            context.getCurEntry().setError(e);

            // Add block count.
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;

           

StatisticSlot 捕獲 BlockException 異常隻是為了收集被拒絕的請求,BlockException 異常還是會往上抛出。抛出異常的目的是為了攔住請求,讓入口處能夠執行到 catch 代碼塊完成請求被拒絕後的服務降級處理。

第四種情況:捕獲到其它異常。

其它異常并非指業務異常,因為此時業務代碼還未執行,而業務代碼抛出的異常是通過調用 Tracer#trace 方法記錄的。

當捕獲到非 BlockException 異常時,除 PriorityWaitException 異常外,其它類型的異常都同樣處理。讓 DefaultNode 記錄目前請求異常,将目前時間視窗的 exception qps 這項名額資料的值加 1。調用來源的 StatisticsNode、用于統計所有資源名額資料的 ClusterNode 也記錄下這個異常。這部分源碼如下:

// Unexpected error, set error to current entry.
            context.getCurEntry().setError(e);

            // This should not happen.
            node.increaseExceptionQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseExceptionQps(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseExceptionQps(count);
            }
            throw e;

           

exit 方法

exit 方法被調用時,要麼請求被拒絕,要麼請求被放行并且已經執行完成,是以 exit 方法需要知道目前請求是否正常執行完成,這正是 StatisticSlot 在捕獲異常時将異常記錄到目前 Entry 的原因,exit 方法中通過 Context 可擷取到目前 CtEntry,從目前 CtEntry 可擷取 entry 方法中寫入的異常。

exit 方法源碼如下(有删減):

@Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        DefaultNode node = (DefaultNode)context.getCurNode();
        if (context.getCurEntry().getError() == null) {
            // 計算耗時
            long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
            // 記錄執行耗時與成功總數
            node.addRtAndSuccess(rt, count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
            }
            // 自減目前資源占用的線程數
            node.decreaseThreadNum();
            // origin 不為空
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().decreaseThreadNum();
            }
            // 流量類型為 in 時
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
                Constants.ENTRY_NODE.decreaseThreadNum();
            }
        }
        // Handle exit event with registered exit callback handlers.
        Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
        for (ProcessorSlotExitCallback handler : exitCallbacks) {
            handler.onExit(context, resourceWrapper, count, args);
        }
        fireExit(context, resourceWrapper, count);
    }

           

exit 方法中通過 Context 可擷取目前資源的 DefaultNode,如果 entry 方法中未出現異常,那麼說明請求是正常完成的,在請求正常完成情況下需要記錄請求的執行耗時以及響應是否成功,可将目前時間減去調用鍊路上目前 Entry 的建立時間作為請求的執行耗時。

資源名額資料的記錄過程

ClusterNode 才是一個資源全局的名額資料統計節點,但我們并未在 StatisticSlot#entry 方法與 exit 方法中看到其被使用。因為 ClusterNode 被 ClusterBuilderSlot 交給了 DefaultNode 掌管,在 DefaultNode 的相關名額資料收集方法被調用時,ClusterNode 的對應方法也會被調用,如下代碼所示:

public class DefaultNode extends StatisticNode {
   ......
    private ClusterNode clusterNode;

    @Override
    public void addPassRequest(int count) {
        super.addPassRequest(count);
        this.clusterNode.addPassRequest(count);
    }
}

           

記錄某項名額資料指的是:針對目前請求,記錄目前請求的某項名額資料,例如請求被放行、請求被拒絕、請求的執行耗時等。

假設目前請求被成功處理,StatisticSlot 會調用 DefaultNode#addRtAndSuccess 方法記錄請求處理成功、并且記錄處理請求的耗時,DefaultNode 先調用父類的 addRtAndSuccess 方法,然後 DefaultNode 會調用 ClusterNode#addRtAndSuccess 方法。ClusterNode 與 DefaultNode 都是 StatisticNode 的子類,StatisticNode#addRtAndSuccess 方法源碼如下:

@Override
    public void addRtAndSuccess(long rt, int successCount) {
        // 秒級滑動視窗
        rollingCounterInSecond.addSuccess(successCount);
        rollingCounterInSecond.addRT(rt);
        // 分鐘級的滑動視窗
        rollingCounterInMinute.addSuccess(successCount);
        rollingCounterInMinute.addRT(rt);
    }

           

rollingCounterInSecond 是一個秒級的滑動視窗,rollingCounterInMinute 是一個分鐘級的滑動視窗,類型為 ArrayMetric。分鐘級的滑動視窗一共有 60 個 MetricBucket,每個 MetricBucket 都被 WindowWrap 包裝,每個 MetricBucket 統計一秒鐘内的各項名額資料,如下圖所示:

全網最全Sentinel資源名額資料統計實作源碼解析

當調用 rollingCounterInMinute#addSuccess 方法時,由 ArrayMetric 根據目前時間戳擷取目前時間視窗的 MetricBucket,再調用 MetricBucket#addSuccess 方法将 success 這項名額的值加上方法參數傳遞進來的值(一般是 1)。MetricBucket 使用 LongAdder 記錄各項名額資料的值。

Sentinel 在 MetricEvent 枚舉類中定義了 Sentinel 會收集哪些名額資料,MetricEvent 枚舉類的源碼如下:

public enum MetricEvent {
    PASS,
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,
    OCCUPIED_PASS
}

           
  • pass 名額:請求被放行的總數
  • block:請求被拒絕的總數
  • exception:請求處理異常的總數
  • success:請求被處理成功的總數
  • rt:被處理成功的請求的總耗時
  • occupied_pass:預通過總數(前一個時間視窗使用了目前時間視窗的 passQps)

其它的名額資料都可通過以上這些名額資料計算得出,例如,平均耗時可根據總耗時除以成功總數計算得出。

資源名額資料統計總結

  • 一個調用鍊路上隻會建立一個 Context,在調用鍊路的入口建立(一個調用鍊路上第一個被 Sentinel 保護的資源)。
  • 一個 Context 名稱隻建立一個 EntranceNode,也是在調用鍊路的入口建立,調用 Context#enter 方法時建立。
  • 與方法調用的入棧出棧一樣,一個線程上調用多少次 SphU#entry 方法就會建立多少個 CtEntry,前一個 CtEntry 作為目前 CtEntry 的父節點,目前 CtEntry 作為前一個 CtEntry 的子節點,構成一個雙向連結清單。Context.curEntry 儲存的是目前的 CtEntry,在調用目前的 CtEntry#exit 方法時,由目前 CtEntry 将 Context.curEntry 還原為目前 CtEntry 的父節點 CtEntry。
  • 一個調用鍊路上,如果多次調用 SphU#entry 方法傳入的資源名稱都相同,那麼隻會建立一個 DefaultNode,如果資源名稱不同,會為每個資源名稱建立一個 DefaultNode,目前 DefaultNode 會作為調用鍊路上的前一個 DefaultNode 的子節點。
  • 一個資源有且隻有一個 ProcessorSlotChain,一個資源有且隻有一個 ClusterNode。
  • 一個 ClusterNode 負責統計一個資源的全局名額資料。
  • StatisticSlot 負責記錄請求是否被放行、請求是否被拒絕、請求是否處理異常、處理請求的耗時等名額資料,在 StatisticSlot 調用 DefaultNode 用于記錄某項名額資料的方法時,DefaultNode 也會調用 ClusterNode 的相對應方法,完成兩份名額資料的收集。
  • DefaultNode 統計目前資源的各項名額資料的次元是同一個 Context(名稱相同),而 ClusterNode 統計目前資源各項名額資料的次元是全局。

繼續閱讀