天天看點

Sentinel黑白名單限流與熱點參數限流源碼分享

作者:一個即将退役的碼農

黑白名單限流

黑白名單過濾是使用最為廣泛的一種過濾規則,例如,用于實作接口安全的 IP 黑白名單規則過濾,用于防騷擾的短信、來電攔截黑白名單過濾。是以 Sentinel 中的黑白名單限流并不難了解,如果配置了黑名單,且請求來源存在黑名單中,則攔截(拒絕)請求,如果配置了白名單,且請求來源存在白名單中則放行。Sentinel 不支援一個黑白名單規則同時配置黑名單和白名單,是以不存優先級的問題。

黑白名單過濾功能更像是一種授權機制,它簡單的将權限分為有權限和無權限兩種情況,如果支援沖突,可使用優先級政策解決沖突問題。Sentinel 把黑白名作為授權政策,實作黑白名單限流即實作授權限流。Sentinel 在命名上也是使用 Authority,而非 BlackWhiteList。

一些關鍵類說明:

  • AuthoritySlot:實作黑白名稱授權功能的切入點(ProcessorSlot)
  • AuthorityRule:授權規則類
  • AuthorityRuleChecker:授權檢測類
  • AuthorityRuleManager:授權規則管理者,提供 loadRuls API
  • AuthorityException:授權檢測異常,繼承 BlockException

AuthorityRule

授權規則(AuthorityRule)是 Sentinel 中最易于了解的一種規則,AuthorityRule 的配置項如下:

public class AuthorityRule extends AbstractRule {
    private int strategy = RuleConstant.AUTHORITY_WHITE;
}

           
  • resource:資源名稱,從父類繼承而來。
  • limitApp:限制的來源名稱,在 AuthorityRule 中可配置多個,使用‘,’号分隔。
  • strategy:限流政策,白名單:AUTHORITY_WHITE,黑名單:AUTHORITY_BLACK。

當 strategy 配置為 AUTHORITY_WHITE 時,limitApp 即為白名單;當 strategy 配置為 AUTHORITY_BLACK 時,limitApp 即為黑明單。例如:

AuthorityRule rule = new AuthorityRule();
// 資源名稱
rule.setResource("GET:/hello");
// 白名單政策
rule.setStrategy(RuleConstant.AUTHORITY_WHITE);
// 白名單
rule.setLimitApp("serviceA,serviceC");
AuthorityRuleManager.loadRules(Collections.singletonList(rule));

           

上述規則用于限制資源 "GET:/hello" 隻允許服務 A 和服務 C 通路。

AuthoritySlot

在使用預設的 SlotChainBuilder 情況下,AuthoritySlot 被放在 SystemSlot、FlowSlot、DegradeSlot 的前面,其優先級更高。

原因之一是授權限流不需要使用統計的名額資料,另一個原因則是提升性能,在未授權的情況下沒必要判斷是否需要熔斷、系統負載能否接住這個請求、QPS 是否過高等,這與使用者授權功能是一樣的道理,未登陸無需判斷是否有權限通路某個資源。

AuthoritySlot 的實作源碼如下:

public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
        throws Throwable {
        checkBlackWhiteAuthority(resourceWrapper, context);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }

    void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
        // (1)
        Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
        if (authorityRules == null) {
            return;
        }
        // (2)
        Set<AuthorityRule> rules = authorityRules.get(resource.getName());
        if (rules == null) {
            return;
        }
        // (3)
        for (AuthorityRule rule : rules) {
            if (!AuthorityRuleChecker.passCheck(rule, context)) {
                throw new AuthorityException(context.getOrigin(), rule);
            }
        }
    }
}

           
  • 從 AuthorityRuleManager 擷取目前配置的所有授權規則;
  • 擷取為目前資源配置的所有授權規則;
  • 周遊授權規則,調用 AuthorityRuleChecker#passCheck 方法判斷是否拒絕目前請求,是則抛出 AuthorityException 異常。

AuthorityRuleChecker

AuthorityRuleChecker 負責實作黑白名單的過濾邏輯,其 passCheck 方法源碼如下:

static boolean passCheck(AuthorityRule rule, Context context) {
        // 擷取來源
        String requester = context.getOrigin();
        // 來源為空,或者來源等于規則配置的 limitApp 則攔截請求
        if (StringUtil.isEmpty(requester) || StringUtil.isEmpty(rule.getLimitApp())) {
            return true;
        }
        // 字元串查找,這一步起到快速過濾的作用,提升性能
        int pos = rule.getLimitApp().indexOf(requester);
        boolean contain = pos > -1;
        // 存在才精确比對
        if (contain) {
            boolean exactlyMatch = false;
            // 分隔數組
            String[] appArray = rule.getLimitApp().split(",");
            for (String app : appArray) {
                if (requester.equals(app)) {
                    // 标志設定為 true
                    exactlyMatch = true;
                    break;
                }
            }
            contain = exactlyMatch;
        }
        // 政策
        int strategy = rule.getStrategy();
        // 如果是黑名單,且來源存在規則配置的黑名單中
        if (strategy == RuleConstant.AUTHORITY_BLACK && contain) {
            return false;
        }
        // 如果是白名單,且來源不存在規則配置的白名單中
        if (strategy == RuleConstant.AUTHORITY_WHITE && !contain) {
            return false;
        }
        return true;
    }

           

整個方法都比較簡單,首先是從目前 Context 擷取調用來源的名稱,隻有在調用來源不為空且規則配置了黑名單或者白名單的情況下,才會走黑白名單的過濾邏輯,這也說明,要實作黑白名單限流的前提是,每個服務消費端在發起請求時都必須要攜帶自身服務的名稱,這取決于 Sentinel 主流架構擴充卡;其次,Sentinel 通過使用 indexOf 先簡單比對一次黑名單或白名單,再切割黑名單或白名單數組實作精确比對,這有助于提升性能;如果目前請求來源存在名單中,則根據政策判斷這份名稱是黑名單還是白名單,再決定是否需要拒絕請求。

熱點參數限流

熱點參數限流并非在 Sentinel 的 core 子產品中實作的,但也是非常實用的一種限流方式。并且,Sentinel 支援 API Gateway 網關限流也是基于參數限流實作的,了解熱點參數限流的實作原理,也有助于我們更好地了解網關限流。

參數限流,即根據方法調用傳遞的參數實作限流,又或者說是根據接口的請求參數限流;熱點參數限流,即針對通路頻繁的參數限流。

例如,都是調用一個下單接口,但購買的商品不同,比如主播帶貨的商品下單流量較大,而一般商品購買量很少,同時因為商品數量有限,不太可能每個下單請求都能購買成功,如果能實作根據用戶端請求傳遞的商品 ID 實作限流,将流量控制在商品的庫存總量左右,并且使用 QPS 限流等兜底,這種有針對性的限流将接口通過的有效流量最大化。

熱點參數限流功能在 Sentinel 源碼的擴充功能子產品為 sentinel-extension,子子產品為 sentinel-parameter-flow-control。

基于滑動視窗的熱點參數名額資料統計

熱點參數限流使用的名額資料不再是 core 子產品中統計的名額資料,而是重新實作了一套名額資料統計功能,依舊是基于滑動視窗。

  • ParamMapBucket:實作參數名額資料統計的 Bucket,用于統計某個參數對應不同取值的被限流總數、被放行的總數。
  • HotParameterLeapArray:實作滑動視窗,持有 WindowWrap 數組,WindowWrap 包裝 ParamMapBucket。

與 core 子產品的 MetricBucket 實作不同,MetricBucket 隻統計每個名額的數值,而 ParamMapBucket 需要統計每個名額、參數的每種取值的數值,MetricBucket 更像是 Redis 中的 String 結構,而 ParamMapBucket 更像 Redis 中的 Hash 結構。

ParamMapBucket 的源碼如下:

public class ParamMapBucket {

    // 數組類型為 CacheMap<Object, AtomicInteger>
    private final CacheMap<Object, AtomicInteger>[] data;

    public ParamMapBucket() {
        this(DEFAULT_MAX_CAPACITY);
    }

    public ParamMapBucket(int capacity) {
        RollingParamEvent[] events = RollingParamEvent.values();
        // 根據需要統計的名額資料建立數組
        this.data = new CacheMap[events.length];
        // RollingParamEvent 可取值為 REQUEST_PASSED、REQUEST_BLOCKED
        for (RollingParamEvent event : events) {
            data[event.ordinal()] = new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(capacity);
        }
    }

}

           
  • data:數組元素類型為 CacheMap<Object, AtomicInteger>,下标為 0 存儲的是統計請求通過的名額資料,下标為 1 統計的是請求被拒絕的名額資料。
  • CacheMap<Object, AtomicInteger>:key 為參數的取值,例如商品的 ID,value 才是名額數值。

HotParameterLeapArray 繼承 LeapArray,即實作滑動視窗。ParamMapBucket 不存儲視窗時間資訊,視窗時間資訊依然由 WindowWrap 存儲,HotParameterLeapArray 使用 WindowWrap 包裝 ParamMapBucket。

筆者也是看了 HotParameterLeapArray 之後才明白為什麼 Sentienl 将滑動視窗抽象為 LeapArray,這為擴充實作收集自定義名額資料的滑動視窗提供了支援。

HotParameterLeapArray 的提供的幾個 API 如下:

public class HotParameterLeapArray extends LeapArray<ParamMapBucket> {
   //.....
    public void addValue(RollingParamEvent event, int count, Object value) {
        // ....
    }

    public Map<Object, Double> getTopValues(RollingParamEvent event, int number) {
       // .....
    }

    public long getRollingSum(RollingParamEvent event, Object value) {
        // .....
    }

    public double getRollingAvg(RollingParamEvent event, Object value) {
        // ....
    }
}

           
  • addValue:添加參數的名額數值,例如,給 REQUEST_PASSED 名額且參數取值為 4343433 的名額數值加上 count,假設這個滑動視窗是用于統計商品 ID 參數的,4343433 表示商品 ID,count 為 1,調用該方法表示給商品 ID 為 4343433 的請求通過總數加 1。
  • getTopValues:擷取熱點參數的 QPS,即擷取某個名額排名前 number 的參數取值與名額資料。例如,擷取 REQUEST_PASSED 名額排名前 10 的 QPS,方法傳回值類型為 Map,key 為參數的取值,value 為 QPS。
  • getRollingSum:計算某個名額、參數的某個取值的總請求數。例如,擷取 REQUEST_PASSED 且商品 ID 為 4343433 的請求總數。
  • getRollingAvg:擷取某個名額、參數的某個取值的平均 QPS。例如,擷取 REQUEST_PASSED 且商品 ID 為 4343433 的平均 QPS。

可見,如果是分鐘級的滑動視窗,一分内參數的取值越多,其占用的記憶體就越多。

參數限流中的 Node

兩個需要重點關注的類:

  • ParameterMetric:用于實作類似 ClusterNode 的統計功能。
  • ParameterMetricStorage:用于實作類似 EntranceNode 功能,管理和存儲每個資源對應的 ParameterMetric。

ParameterMetric 有三個靜态字段,源碼如下:

public class ParameterMetric {

    private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTimeCounters = new HashMap<>();
    private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTokenCounter = new HashMap<>();
    private final Map<Integer, CacheMap<Object, AtomicInteger>> threadCountMap = new HashMap<>();

}

           
  • ruleTimeCounters:用于實作勻速流量控制效果,key 為參數限流規則(ParamFlowRule),值為參數不同取值對應的上次生産令牌的時間。
  • ruleTokenCounter:用于實作勻速流量控制效果,key 為參數限流規則(ParamFlowRule),值為參數不同取值對應的目前令牌桶中的令牌數。
  • threadCountMap:key 為參數索引,值為參數不同取值對應的目前并行占用的線程總數。

ParameterMetricStorage 使用 ConcurrentHashMap 緩存每個資源對應的 ParameterMetric,隻會為配置了參數限流規則的資源建立 ParameterMetric。其部份源碼如下所示:

public final class ParameterMetricStorage {
    private static final Map<String, ParameterMetric> metricsMap = new ConcurrentHashMap<>();
    private static final Object LOCK = new Object();

    public static void initParamMetricsFor(ResourceWrapper resourceWrapper,ParamFlowRule rule) {
        if (resourceWrapper == null || resourceWrapper.getName() == null) {
            return;
        }
        String resourceName = resourceWrapper.getName();
        ParameterMetric metric;
        // 雙重檢測,線程安全,為資源建立全局唯一的 ParameterMetric
        if ((metric = metricsMap.get(resourceName)) == null) {
            synchronized (LOCK) {
                if ((metric = metricsMap.get(resourceName)) == null) {
                    metric = new ParameterMetric();
                    metricsMap.put(resourceWrapper.getName(), metric);
                }
            }
        }
        // 初始化 ParameterMetric
        metric.initialize(rule);
    }
}

           

initParamMetricsFor 方法用于為資源建立 ParameterMetric 并初始化,該方法在資源被通路時由 ParamFlowSlot 調用,并且該方法隻在為資源配置了參數限流規則的情況下被調用。

熱點參數限流功能的實作

sentinel-parameter-flow-control 子產品通過 Java SPI 注冊自定義的 SlotChainBuilder,即注冊 HotParamSlotChainBuilder,将 ParamFlowSlot 放置在 StatisticSlot 的後面,這個 ParamFlowSlot 就是實作熱點參數限流功能的切入點。

public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
            return;
        }
        checkFlow(resourceWrapper, count, args);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}

           

既然是參數限流,那麼肯定是需要能夠拿到參數了,而 ProcessorSlot#entry 方法的最後一個參數就是請求傳遞過來的參數,通過 SphU#entry 方法一層層往下傳遞。例如:

@GetMapping("/hello")
    public String apiHello(String name) throws BlockException {
        ContextUtil.enter("my_context");
        Entry entry = null;
        try {
            entry = SphU.entry("GET:/hello", EntryType.IN,1,name);
            doBusiness();
            return "Hello!";
        } catch (Exception e) {
            if (!(e instanceof BlockException)) {
                Tracer.trace(e);
            }
            throw e;
        } finally {
            if (entry != null) {
                entry.exit(1);
            }
            ContextUtil.exit();
        }
    }

           

當 SphU#entry 調用到 ParamFlowSlot#entry 方法時,ParamFlowSlot 調用 checkFlow 方法判斷是否需要限流。checkFlow 方法的實作如下:

void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
        //(1)
        if (args == null) {
            return;
        }
        if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
            return;
        }
        List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
        //(2)
        for (ParamFlowRule rule : rules) {
            applyRealParamIdx(rule, args.length);

            // Initialize the parameter metrics.
            ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);

            if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
                String triggeredParam = "";
                if (args.length > rule.getParamIdx()) {
                    Object value = args[rule.getParamIdx()];
                    triggeredParam = String.valueOf(value);
                }
                throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
            }
        }
    }

           
  • checkFlow 方法的最後一個參數是請求參數,也就是調用 SphU#entry 方法傳遞進來的參數。
  • checkFlow 方法首先調用 ParamFlowRuleManager 的 API 判斷目前資源有沒有配置參數限流規則,如果有,則擷取為目前資源配置的所有參數限流規則。
  • 周遊參數限流規則,調用 ParameterMetricStorage#initParamMetricsFor 方法判斷是否需要為目前資源初始化建立 ParameterMetric,然後調用 ParamFlowChecker#passCheck 方法判斷目前請求是否可以放行,如果需要拒絕請求,則抛出 ParamFlowException 異常。

在閱讀 ParamFlowChecker#passCheck 方法的源碼之前,我們需要先了解參數限流規則的配置,了解每個配置項的作用。

參數限流規則 ParamFlowRule 的源碼如下(有删減):

public class ParamFlowRule extends AbstractRule {
    private int grade = RuleConstant.FLOW_GRADE_QPS;
    private double count;
    private Integer paramIdx;
    private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
    private int maxQueueingTimeMs = 0;
    private long durationInSec = 1;
    private int burstCount = 0;
}

           
  • grade:限流規則的門檻值類型,支援的類型同 FlowRule。
  • count:門檻值,同 FlowRule。
  • paramIdx:參數索引,ParamFlowChecker 根據限流規則的參數索引擷取參數的值,下标從 0 開始,例如方法 public String apiHello(String name),該方法隻有一個參數,索引為 0 對應 name 參數。
  • controlBehavior:流量控制效果,同 FlowRule,但隻支援快速失敗和勻速排隊。
  • maxQueueingTimeMs:實作勻速排隊流量控制效果的虛拟隊列最大等待時間,超過該值的請求被抛棄,同 FlowRule;
  • durationInSec:統計名額資料的視窗時間大小,機關為秒。
  • burstCount:支援的突發流量總數。

假設需要針對資源“GET:/hello”的 name 參數限流,當 name 取值為“jackson”時限流 QPS 門檻值為 5,則配置如下:

ParamFlowRule rule = new ParamFlowRule();
// 資源為/hello
rule.setResource("GET:/hello");
// 索引 0 對應的參數為 name
rule.setParamIdx(0); 
// qps 限流
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// 門檻值為 5
rule.setCount(5);
ParamFlowRuleManager.loadRules(Collections.singletonList(rule));

           

以此為例,我們分析 ParamFlowChecker#passCheck 方法源碼,passCheck 傳回 true 表示放行,傳回 false 表示拒絕。

ParamFlowChecker#passCheck 方法源碼如下:

public static boolean passCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
                             Object... args) {
        if (args == null) {
            return true;
        }
        // 判斷參數索引是否合法
        int paramIdx = rule.getParamIdx();
        if (args.length <= paramIdx) {
            return true;
        }
        // 擷取參數值,如果值為空則允許通過
        Object value = args[paramIdx];
        if (value == null) {
            return true;
        }
        // 叢集限流
        if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
            return passClusterCheck(resourceWrapper, rule, count, value);
        }
        // 單機限流
        return passLocalCheck(resourceWrapper, rule, count, value);
    }

           
  • 如果參數為空、或者參數的總數小于等于規則配置的參數索引值、或者參數索引對應的參數的值為空,則放行請求;
  • 如果是叢集限流模式,則調用 passClusterCheck 方法,否則調用 passLocalCheck 方法。

我們先不讨論叢集限流情況,僅看單機本地限流情況。passLocalCheck 方法的源碼如下:

private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
                                          Object value) {
        try {
            // 基本資料類型
            if (Collection.class.isAssignableFrom(value.getClass())) {
                for (Object param : ((Collection)value)) {
                    if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
                        return false;
                    }
                }
            }
            // 數組類
            else if (value.getClass().isArray()) {
                int length = Array.getLength(value);
                for (int i = 0; i < length; i++) {
                    Object param = Array.get(value, i);
                    if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
                        return false;
                    }
                }
            }
            // 引用類型
            else {
                return passSingleValueCheck(resourceWrapper, rule, count, value);
            }
        } catch (Throwable e) {
        }
        return true;
    }

           

由于參數可能是基本資料類型,也可能是數組類型,或者引用類型,是以 passLocalCheck 方法分三種情況處理。我們隻讨論其中一種情況,其它情況的實作類似。

以資源“GET:/hello”為例,其方法 apiHello 的 name 參數為 String 類型,是以會調用 passSingleValueCheck 方法,該方法源碼如下:

static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,Object value) {
       //(1) 
       if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
            if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {
                return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value);
            } else {
                return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value);
            }
        } 
        // (2)
        else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {
            Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
            long threadCount = getParameterMetric(resourceWrapper).getThreadCount(rule.getParamIdx(), value);
            if (exclusionItems.contains(value)) {
                int itemThreshold = rule.getParsedHotItems().get(value);
                return ++threadCount <= itemThreshold;
            }
            long threshold = (long)rule.getCount();
            return ++threadCount <= threshold;
        }

        return true;
    }

           
  • 當規則配置的門檻值類型為 QPS 時,根據流控效果調用 passThrottleLocalCheck 或 passDefaultLocalCheck 方法;
  • 當規則配置的門檻值類型為 THREAD 時,擷取目前資源的 ParameterMetric,進而取得目前資源、目前參數的值對應的并行占用的線程總數,如果并行占用的線程總數+1 大于限流門檻值則限流,否則放行。

你可能好奇,并行占用線程總數是在哪裡自增和自減的呢?

這是由 ParamFlowStatisticEntryCallback 與 ParamFlowStatisticExitCallback 這兩個 Callback 實作的,分别在 StatisticSlot 的 entry 方法和 exit 方法中被回調執行,這是我們前面分析 StatisticSlot 源碼時故意遺漏的細節。

快速失敗(直接拒絕)與勻速排隊

1. 快速失敗

快速失敗基于令牌桶算法實作。passDefaultLocalCheck 方法控制每個時間視窗隻生産一次令牌,将令牌放入令牌桶,每個請求都從令牌桶中取走令牌,當令牌足夠時放行,當令牌不足時直接拒絕。ParameterMetric#tokenCounters 用作令牌桶,timeCounters 存儲最近一次生産令牌的時間。

passDefaultLocalCheck 方法源碼如下:

static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
                                         Object value) {
        //(1)
        ParameterMetric metric = getParameterMetric(resourceWrapper);
        CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
        CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);
        if (tokenCounters == null || timeCounters == null) {
            return true;
        }
        // (2)
        Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
        long tokenCount = (long)rule.getCount();
        if (exclusionItems.contains(value)) {
            tokenCount = rule.getParsedHotItems().get(value);
        }
        if (tokenCount == 0) {
            return false;
        }
        // (3)
        long maxCount = tokenCount + rule.getBurstCount();
        if (acquireCount > maxCount) {
            return false;
        }
        while (true) {
            // (4)
            long currentTime = TimeUtil.currentTimeMillis();
            AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
            if (lastAddTokenTime == null) {
                tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
                return true;
            }

            //(5)
            long passTime = currentTime - lastAddTokenTime.get();
            if (passTime > rule.getDurationInSec() * 1000) {
                // 確定非 NULL
                AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
                if (oldQps == null) {
                    lastAddTokenTime.set(currentTime);
                    return true;
                } else {
                     //(6)
                    long restQps = oldQps.get();
                    // 計算需要新增的令牌數,根據時間間隔、限流門檻值、視窗時間計算
                    long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
                    // 計算新的令牌總數,并立即使用(扣減 acquireCount 個令牌)
                    long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
                        : (restQps + toAddCount - acquireCount);

                    if (newQps < 0) {
                        return false;
                    }
                    if (oldQps.compareAndSet(restQps, newQps)) {
                        lastAddTokenTime.set(currentTime);
                        return true;
                    }
                    Thread.yield();
                }
            } else {
                // (7)
                AtomicLong oldQps = tokenCounters.get(value);
                if (oldQps != null) {
                    long oldQpsValue = oldQps.get();
                    // 令牌是否足夠
                    if (oldQpsValue - acquireCount >= 0) {
                        // 從令牌桶中取走令牌
                        if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
                            return true;
                        }
                    } else {
                        return false;
                    }
                }
                Thread.yield();
            }
        }
    }

           
  1. 根據資源擷取 ParameterMetric,從 ParameterMetric 擷取目前限流規則的令牌桶和最近一次生産令牌的時間,時間精确到毫秒。
  2. 計算限流門檻值,即令牌桶最大存放的令牌總數(tokenCount)。
  3. 重新計算限流門檻值,将目前限流門檻值加上允許突增流量的數量。
  4. 擷取目前時間,如果目前參數值未生産過令牌,則初始化生産令牌,并立即使用(maxCount - acquireCount)。
  5. 擷取目前時間與上次生産令牌的時間間隔,如果間隔時間大于一個視窗時間見(6),否則見(7)。
  6. 計算需要生産的令牌總數,并與目前桶中剩餘的令牌數相加得到新的令牌總數,如果新的令牌總數大于限流門檻值,則使用限流門檻值作為新的令牌總數,并且生産完成立即使用(maxCount - acquireCount),最後更新最近一次生産令牌的時間。
  7. 從令牌桶中擷取令牌,如果擷取成功(oldQpsValue - acquireCount >= 0),則放行目前請求,否則拒絕目前請求。

2. 勻速排隊

與 RateLimiterController 實作原理一樣,passThrottleLocalCheck 方法讓請求在虛拟隊列中排隊,控制請求通過的時間間隔,該時間間隔通過門檻值與視窗時間大小計算出來,如果目前請求計算出來的排隊等待時間大于限流規則指定的 maxQueueingTimeMs,則拒絕目前請求。

passThrottleLocalCheck 方法源碼如下:

static boolean passThrottleLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,Object value) {
        //(1)
        ParameterMetric metric = getParameterMetric(resourceWrapper);
        CacheMap<Object, AtomicLong> timeRecorderMap = metric == null ? null : metric.getRuleTimeCounter(rule);
        if (timeRecorderMap == null) {
            return true;
        }
        // (2)
        Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
        long tokenCount = (long)rule.getCount();
        if (exclusionItems.contains(value)) {
            tokenCount = rule.getParsedHotItems().get(value);
        }
        if (tokenCount == 0) {
            return false;
        }
        //(3)
        long costTime = Math.round(1.0 * 1000 * acquireCount * rule.getDurationInSec() / tokenCount);
        while (true) {
            //(4)
            long currentTime = TimeUtil.currentTimeMillis();
            AtomicLong timeRecorder = timeRecorderMap.putIfAbsent(value, new AtomicLong(currentTime));
            if (timeRecorder == null) {
                return true;
            }
            long lastPassTime = timeRecorder.get();
            // 計算目前請求的期望通過時間,最近一次請求的期望通過時間 + 請求通過的時間間隔
            long expectedTime = lastPassTime + costTime;
            //(5)
            if (expectedTime <= currentTime 
                 || expectedTime - currentTime < rule.getMaxQueueingTimeMs()) {
                AtomicLong lastPastTimeRef = timeRecorderMap.get(value);
                if (lastPastTimeRef.compareAndSet(lastPassTime, currentTime)) {
                    long waitTime = expectedTime - currentTime;
                    if (waitTime > 0) {
                        lastPastTimeRef.set(expectedTime);
                        try {
                            TimeUnit.MILLISECONDS.sleep(waitTime);
                        } catch (InterruptedException e) {
                            RecordLog.warn("passThrottleLocalCheck: wait interrupted", e);
                        }
                    }
                    return true;
                } else {
                    Thread.yield();
                }
            } else {
                return false;
            }
        }
    }

           
  1. 當流控效果選擇勻速限流時,ParameterMetric 的 ruleTimeCounters 不再是記錄上次生産令牌的時間,而是記錄最後一個請求的期望通過時間。
  2. 計算限流門檻值,不支援突增流量。
  3. 計算請求通過的時間間隔,例如,當 acquireCount 等于 1、限流門檻值配置為 200 且視窗時間大小為 1 秒時,計算出來的 costTime 等于 5ms,即每 5ms 隻允許通過一個請求。
  4. 計算目前請求的期望通過時間,值為最近一次請求的期望通過時間 + 請求通過的時間間隔,最近一次請求的期望通過時間也就是虛拟隊列中隊列尾部的那個請求的期望通過時間。
  5. 如果期望通過時間與目前時間間隔大于規則配置的允許隊列最大等待時間(maxQueueingTimeMs),則拒絕目前請求,否則将目前請求“放入”虛拟隊列等待,計算出目前請求需要等待的時間,讓目前線程休眠指定時長之後再放行該請求。

總結

黑白名單限流的實作相對簡單,熱點參數限流的實作相對複雜。熱點參數限流自己實作了一個滑動視窗用于收集名額資料,但該滑動視窗并未被使用,而是使用 ParameterMetric 與 ParameterMetricStorage,這應該是出于性能的考慮。熱點參數限流對性能的影響和對記憶體的占用與參數的取值有多少種可能成正比,限流參數的取值可能性越多,占用的記憶體就越大,對性能的影響也就越大,在使用熱點參數限流功能時,一定要考慮參數的取值。

例如,根據商品 ID 限流,如果有十萬個商品下單,那麼 CacheMap 将會存在十萬個 key-value,并且不會被移除,随着程序運作的時長而增長。如果限流門檻值類型選擇為 THREAD 則不會存在這個問題,因為在 ParamFlowStatisticExitCallback 方法會調用 ParameterMetric#decreaseThreadCount 方法扣減參數值占用的線程數,當線程數為零時,會将目前參數值對應的 key-value 從 CacheMap 中移除。

繼續閱讀