天天看點

Redis之限流限流使用redis簡單限流漏鬥限流

限流

當系統的處理能力有限時,如何阻止計劃外的請求繼續對系統施壓,是一個需要重視的問題,避免超出負載的流量影響系統的穩定運作,這就需要用到限流算法,

除了控制流量,限流還有一個目的是控制使用者行為,避免垃圾請求,比如在論壇上,使用者的發帖、回複、點贊等行為都要嚴格受控,一般短時間内使用者的請求将會收到一定次數的限制,超過這個限制将拒絕或做其它處理。

使用redis簡單限流

接下來我們使用redis實作一個在指定時間内隻能做固定次數的操作,超出這個次數,就做出拒絕處理,

我們可以考慮使用zset這個資料結構,使用score存儲每次操作的時間戳,value根據業務情況來,保證value唯一性即可,

随後每次我們使用滑動時間視窗(定寬),每次都保留着這個視窗内的資料,其餘的都trim掉,此舉也可避免一定的記憶體空間的浪費,如果使用者是冷使用者,滑動時間視窗内的行為是空記錄,那麼該zset就可以從記憶體中移除,不再占用空間,

使用zset結構記錄使用者的行為曆史,每一個行為都會作為zset中的一個key儲存下來,同一個使用者的同一種行為用一個zset記錄,

Redis之限流限流使用redis簡單限流漏鬥限流

下方使用zset的滑動視窗代碼實作,我們首先在zset中添加使用者本次行為,然後删除了本次時間視窗外的資料,

最後擷取本次視窗内的資料總數,如果總數沒有超出限制,我們傳回true,超出限制了傳回false,下面main方法中設定的時間視窗是60s,在60s內最多5次請求,

Redis之限流限流使用redis簡單限流漏鬥限流

根據結果我們可以看到,循環20次,隻有前5次得到了true,允許請求通路,後續的結果全是false,被拒絕通路。

Redis之限流限流使用redis簡單限流漏鬥限流

因為這幾個連續的操作都是針對同一個key的,使用pipeline可以顯著提高redis存取效率。

不過這種方案存在着自己的不足,因為要記錄時間視窗内所有的行為記錄,如果這個量很大,比如60s内操作不能超過100萬次,這時候是不适合用這種方式的,會消耗大量的存儲空間。

漏鬥限流

漏鬥限流是最常用的限流方法之一,這個算法的靈感來自于漏鬥(funnel)的結構,如下圖所示,漏鬥的容量是有限的,

如果将漏嘴堵住,然後一直灌水,它會變滿,直到再也裝不下,如果将漏嘴放開,水會從下方流出,流走一部分後,又可以繼續灌水,

如果漏嘴的速率大于灌水的速率,那麼漏洞永遠裝不滿,如果漏嘴速率小于灌水的速率,一旦漏鬥滿了,灌水就需要暫停并等待漏鬥騰出一部分的空間,方可繼續灌水,

是以,漏鬥的剩餘空間就代表着目前行為可以持續進行的數量,漏嘴的流水速率代表着系統允許該行為的最大頻率。

Redis之限流限流使用redis簡單限流漏鬥限流

java實作

package com.redis.cell;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class FunnelRateLimiter {

    static class Funnel {

        // 漏鬥容量
        int capacity;
        // 漏嘴流水速率
        float leakingRate;
        // 漏鬥剩餘空間
        int leftQuota;
        // 上一次漏水時間
        long leakingTs;

        public Funnel(int capacity, float leakingRate) {
            this.capacity = capacity;
            this.leakingRate = leakingRate;
            this.leftQuota = capacity;
            this.leakingTs = System.currentTimeMillis();
        }

        synchronized void makeSpace() {
            // 目前時間
            long nowTs = System.currentTimeMillis();
            // 距離上一次漏水過去了多長時間
            long deltaTs = nowTs - leakingTs;
            // 過去的時間流了多少水
            int deltaQuota = (int) (deltaTs * leakingRate);
            // 小于0說明時間過長,超出最大值了,重新初始化容量
            if (deltaQuota < 0) {
                this.leakingRate = capacity;
                this.leakingTs = nowTs;
                return;
            }
            // 流出的還沒到1,就不往下走了
            if (deltaQuota < 1) {
                return;
            }
            // 剩餘空間大于0,但是和流出的相加超出最大值後,也重新初始化容量
            if (this.leftQuota > 0 && (this.leftQuota + deltaQuota) < 0) {
                this.leakingRate = capacity;
                this.leakingTs = nowTs;
                return;
            }
            // 将剩餘容量和流出的累加
            this.leftQuota += deltaQuota;
            // 記錄本次流水時間
            this.leakingTs = nowTs;
            // 超出最大值,則用最大值
            if (this.leftQuota > this.capacity) {
                this.leftQuota = this.capacity;
            }
        }

        synchronized boolean watering(int quota) {
            // 計算流水和剩餘容量
            makeSpace();
            // 剩餘容量大于等于本次的用量,允許操作
            if (this.leftQuota >= quota) {
                this.leftQuota -= quota;
                return true;
            }
            return false;
        }

    }

    private static Map<String, Funnel> funnelMap = new ConcurrentHashMap<>();

    public static boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) {
        String key = String.format("%s:%s", userId, actionKey);
        Funnel funnel = funnelMap.get(key);
        if (funnel == null) {
            funnel = new Funnel(capacity, leakingRate);
            funnelMap.put(key, funnel);
        }
        return funnel.watering(1);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 20; i++) {
            System.out.println(isActionAllowed("mn", "reply", 5, 1));
        }
    }

}           

Funnel對象的makeSpace方法是漏鬥算法的核心,每次灌水前都會調用該方法觸發漏水,給漏鬥騰出空間,能騰出多少空間取決過去了多久,以及流水的速率,

Funnel對象占據的空間大小不再和行為的頻率成正比,它的空間占用是一個常量。

上述代碼的運作結果如下圖,速率是1毫秒流走一個量,最大容量是5,可以看到下面的輸出,輸出5個true後,後續的輸出都是false,直到過去1毫秒後,才繼續輸出true。

Redis之限流限流使用redis簡單限流漏鬥限流

上面的代碼是使用Java代碼實作的,實作分布式則需要使用redis來處理,我們可以使用redis的hash結構,

将Funnel對象的字段存儲到hash中去,灌水的時候将hash中的字段取出運算,最後再放回去,但是這就出現了原子性的問題,因為這幾個步驟的過程非原子性,意味着需要進行加鎖,加鎖還有加鎖失敗重試的可能,降低了性能,影響使用者體驗,

不過這些操作可以使用lua腳本來做,但是在redis4.0中為我們提供了一個Redis-Cell的子產品,這個子產品使用了漏鬥算法實作了限流,并提供原子的限流指令給我們用,這使得用redis實作限流就很簡單了。

RedisCell

該子產品需要額外安裝插件,我們使用docker進行安裝運作:

docker pull carto/redis-cell
docker run -d -p 6379:6379 --name redisCell 69846d418101           

該子產品隻有1個指令,cl.throttle,它的參數和傳回值都比較多,下面看看使用說明

Redis之限流限流使用redis簡單限流漏鬥限流

上面指令的意思是,允許使用者點贊的行為頻率為60s内共30次操作,漏鬥的初始容量為15,即開始連續可以點15次的贊,後續将受到漏水速率的影響,

是以上面的指令中 user1:zan 是使用者的id+行為(key),15是容量(capacity),30是共多少次操作(operations),60則是多長時間(seconds),最後個即每次操作消耗的容量(quota),可以不填,預設也是1。

傳回的結果解釋如下

127.0.0.1:6379> cl.throttle user1:zan 15 30 60 1

1) (integer) 0   # 0 表示允許,1表示拒絕

2) (integer) 16  # 漏鬥容量

3) (integer) 15  # 漏洞剩餘空間

4) (integer) -1  # 被拒絕後,還有多少秒漏洞會有空間(多少秒後可以重試)

5) (integer) 2   # 多長時間後,漏鬥将完全空出來(機關秒)
           

執行限流指令時,如果被拒絕了,就需要丢棄或重試,cl.throttle 指令将多久後重試的時間都算好了,直接取傳回結果的第四個值進行sleep等待後重試即可,如果不想阻塞線程,也可以用異步定時任務重試。

Java代碼運作結果如下:

Redis之限流限流使用redis簡單限流漏鬥限流

本文用到的代碼都在:

https://github.com/qiaomengnan16/redis-demo/tree/main/redis-cell