天天看點

限流算法固定視窗(計數器法)滑動窗密碼牌桶算法(谷歌的開源guava有實作)漏桶算法

關于限流算法,一般常見的有下面四種:

  1. 固定視窗(計數器法)
  2. 滑動視窗
  3. 令牌桶算法(谷歌的開源guava有實作)
  4. 漏桶算法

固定視窗(計數器法)

固定視窗是限流算法裡最簡單也是最容易實作的一種算法。假設我們規定接口A的qps是100, 即每分鐘的通路次數不能超過100。那麼我們可以這麼做:在一開始的時候,我們可以設定一個計數器counter,初始化為0, 過期時間為1秒,即1秒後計數器失效。每當一個請求過來的時候,counter值加1,判斷目前counter的值是否大于100,如果大于100則說明請求數過多,直接拒絕請求。如果請求counter計數器不存在,則重置計數器,開始新的一秒的接口限流。注意并發情況下通路計數器需要加鎖。

要點

1、有個定時器,定時把counter重置為0。

2、counter應該是原子類型,例如AtomicInteger。

缺點

限制粒度太低,存在臨界問題。

假設有一個惡意使用者,他在0:59時,瞬間發送了100個請求,并且1:00又瞬間發送了100個請求,那麼其實這個使用者在1秒内,瞬間發送了200個請求。可以瞬間超過我們的速率限制。使用者有可能通過算法的這個漏洞,瞬間壓垮我們的應用。解決這個問題的辦法就是提高限流的粒度,即滑動視窗算法。

滑動視窗

滑動時間視窗計數器算法思想:針對固定時間算法會在臨界點存在瞬間大流量沖擊的場景,滑動時間視窗計數器算法應運而生。它将時間視窗劃分為更小的時間片段,每過一個時間片段,時間視窗就會往右滑動一格,每個時間片段都有獨立的計數器。在計算整個時間視窗内的請求總數時會累加所有的時間片段内的計數器。時間視窗劃分的越細,那麼滑動視窗的滾動就越平滑,限流的統計就會越精确。

一般情況下,會根據片段個數建構循環數組,則每次滑動時,都是滑動到一個已經使用過的片段,則計數應該減去此片段的舊值。

限流算法固定視窗(計數器法)滑動窗密碼牌桶算法(谷歌的開源guava有實作)漏桶算法

 要點:

1、每個時間片段有個計數器。

2、有個總的計數器,是在視窗内的所有片段的計數器的總和。

3、如果采用循環數組,注意減去舊值。

令牌桶算法(谷歌的開源guava有實作)

系統按照恒定的時間間隔(通常是1/QPS)往桶裡加入Token,每個Token代表一次接口通路權限,如果桶已經滿了丢棄令牌。新請求來臨時,會請求從桶中拿走一個Token,如果沒有Token可拿了就阻塞或者拒絕服務請求。放入Token的時間間隔取決于限制的qps,假設接口的qps是100,則按照1/qps的速率放令牌,即每10ms放入一個令牌,令牌桶算法不存在瞬間的流量高峰,它能嚴格控制接口在qps内通路。

限流算法固定視窗(計數器法)滑動窗密碼牌桶算法(谷歌的開源guava有實作)漏桶算法

RateLimiter

public abstract class RateLimiter {
}
           

 Guava有兩種限流模式,一種為穩定模式(SmoothBursty:令牌生成速度恒定),一種為漸進模式(SmoothWarmingUp:令牌生成速度緩慢提升直到維持在一個穩定值) 兩種模式實作思路類似,主要差別在等待時間的計算上。

每次請求時可以請求多個令牌,對本任務沒有限制,但是會對下一個任務有限制,因為如果沒有請求到的令牌會先占着。

建立方法

//平滑
  public static RateLimiter create(double permitsPerSecond) {
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
  }
//平滑
  static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
  }
  //漸進
  public static RateLimiter create(double permitsPerSecond, Duration warmupPeriod) {
    return create(permitsPerSecond, toNanosSaturated(warmupPeriod), TimeUnit.NANOSECONDS);
  }
  //漸進
  public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
    checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod);
    return create(
        permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer());
  }
  //漸進
  @VisibleForTesting
  static RateLimiter create(
      double permitsPerSecond,
      long warmupPeriod,
      TimeUnit unit,
      double coldFactor,
      SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
  }
           

流量

private volatile @Nullable Object mutexDoNotUseDirectly;

//雙重check 延遲生成單例。生成一個信号量。因為沒有public 構造函數,是以要這樣。
  private Object mutex() {
    Object mutex = mutexDoNotUseDirectly;
    if (mutex == null) {
      synchronized (this) {
        mutex = mutexDoNotUseDirectly;
        if (mutex == null) {
          mutexDoNotUseDirectly = mutex = new Object();
        }
      }
    }
    return mutex;
  }

  public final void setRate(double permitsPerSecond) {
    checkArgument(
        permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
    synchronized (mutex()) {
      doSetRate(permitsPerSecond, stopwatch.readMicros());
    }
  }

  abstract void doSetRate(double permitsPerSecond, long nowMicros);


  public final double getRate() {
    synchronized (mutex()) {
      return doGetRate();
    }
  }

  abstract double doGetRate();
           

擷取令牌

/**
   * 擷取一個令牌,取不到就阻塞。傳回一個睡眠時間,機關是秒。
   * 如果沒有令牌可用,acquire會自己sleep,調用方得到傳回值之後不需要sleep。
   */
  
  public double acquire() {
    return acquire(1);
  }



   //從目前RateLimiter擷取給定數量的令牌,阻塞到擷取請求可以通過。
    //它會傳回一個以秒記的睡眠時間,如果有的話。
    //傳回值是可以忽略的,因為方法内部已經處理了等待時間,調用方不需要sleep
    @CanIgnoreReturnValue
  public double acquire(int permits) {
    //擷取指定數量的令牌,得到一個需要等待的微秒數
    long microsToWait = reserve(permits);
    //睡眠并且不響應interrupt
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    //轉換成秒,先乘以1.0變成浮點數,避免整數運算丢失精度
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
  }

//從目前RateLimiter中擷取給定數量的令牌,傳回目前時刻到令牌可用時刻的微秒數
  final long reserve(int permits) {
    checkPermits(permits);
    synchronized (mutex()) {
      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
  }

//預留令牌并傳回調用者必須等待的時間
final long reserveAndGetWaitLength(int permits, long nowMicros) {
    //傳回這些令牌最早可用的時刻
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    //最早可用時刻-目前時刻就是等待時間,如果小于0則取0
    return max(momentAvailable - nowMicros, 0);
}

  
  //
  public boolean tryAcquire(Duration timeout) {
    return tryAcquire(1, toNanosSaturated(timeout), TimeUnit.NANOSECONDS);
  }

  //
  @SuppressWarnings("GoodTime") // should accept a java.time.Duration
  public boolean tryAcquire(long timeout, TimeUnit unit) {
    return tryAcquire(1, timeout, unit);
  }

  //
  public boolean tryAcquire(int permits) {
    return tryAcquire(permits, 0, MICROSECONDS);
  }

  //
  public boolean tryAcquire() {
    return tryAcquire(1, 0, MICROSECONDS);
  }

  //
  public boolean tryAcquire(int permits, Duration timeout) {
    return tryAcquire(permits, toNanosSaturated(timeout), TimeUnit.NANOSECONDS);
  }

  //
  @SuppressWarnings("GoodTime") // should accept a java.time.Duration
  public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
    long timeoutMicros = max(unit.toMicros(timeout), 0);
    checkPermits(permits);
    long microsToWait;
    synchronized (mutex()) {
      long nowMicros = stopwatch.readMicros();
      if (!canAcquire(nowMicros, timeoutMicros)) {
        return false;
      } else {
        microsToWait = reserveAndGetWaitLength(permits, nowMicros);
      }
    }
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return true;
  }
  //
  private boolean canAcquire(long nowMicros, long timeoutMicros) {
    return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
  }
 
 
  abstract long queryEarliestAvailable(long nowMicros);
  abstract long reserveEarliestAvailable(int permits, long nowMicros);
 
           

SmoothRateLimiter

SmoothRateLimiter是RateLimiter的平滑限流的實作,它又有2個子類實作:SmoothWarmingUp和SmoothBursty。

限流器的基本提點是一個“穩定的速率”,即在正常條件下的最大速率。為達到這個目的,限流器會根據需要壓制到達的請求。通過計算,限流器會確定到達請求等待合理的時間,以此達到壓制的目的。

維持一個速率(通常被指定為QPS)最簡單的方法是記住上個被允許請求的最後時間戳,并保證在1/QPS時間内不執行請求。例如,QPS=5(每秒5個token),如果我們能夠確定自從上個請求後,在200ms内沒有請求被允許執行,那麼我們将獲得一個想要的速率。如果一個請求在上個請求被放行100ms後到達,那麼我們需要等待額外的100ms。在這個速率下,15個新的許可耗時3秒鐘(例如對于請求 acquire(15))。

很重要的一點,是能夠意識到限流器對過去隻有很淺的記憶。它隻會記住上一個請求。那如果限流器很久沒有被使用,然後一個請求突然到達并被立即允許怎麼辦?這可能會有兩種情形,一種是資源利用不充分,另一種則是導緻溢出,具體取決于沒有遵循預定速率的真實原因。

之前的利用不足意味多餘的資源可被擷取。限流器應該加速一段時間,以利用這些資源。速率适應網絡(帶寬)很重要,過去的利用不足被解釋為“幾乎為空的緩沖”,可以被快速填補。

另一方面,過去的利用不足也可能意味着“伺服器沒有準備好處理将來的請求”,例如,緩存失效,請求更有可能會觸發耗時的操作(一個更極端的例子是,當一個伺服器剛剛被引導,它更可能忙于自身的喚醒)。

即“過去的利用不足”被模組化為變量“storedPermits”。這個變量在沒有使用時為0,當有大量使用時,它可以增長到maxStoredPermits。是以,請求會被函數acquire(permits)觸發許可,提供以下兩種類型的許可:

  • stored permits(可擷取的已存許可)
  • fresh permits(新的的許可)

工作原理如下:

對于一個限流器,每秒産生一個令牌。不使用限流器時,我們都會給storedPermits加1。如果說我們有10sec不使用限流器(例如預計請求在時刻X到來,但在請求到來之前,我們在X+10。這也是上段所描述的點。)。是以storedPermits變為10(假設maxStoredPermits>=10)。在這時,一個人acquire(3)的請求到了。我們從已有的storedPermits拿出許可服務這個請求,并将許可數降至7.(這如何被解釋為壓制時間,将會在之後被詳細讨論。)這之後,假設馬上有一個acquire(10)的請求到達,我們用剩下所有的7個許可數來應對這個請求,還有3個許可數,我們需要通過重新整理限流器新提供。

我們也已經知道花費在3個新的許可上的時間:如果速率是1令牌/sec,那麼我們将花費3秒。但是使用7個已存許可又是什麼意思呢?正如上面所說,這裡沒有固定答案。如果我們主要興趣在應對資源利用不足上,我們想要存儲許可釋放比重新整理許可快。因為利用不足=尚有未被占用的資源。如果我們主要興趣點在應對溢出,那麼存儲的許可數應該釋放的比重新整理的慢。是以,我們想要一個(在每種情形都不同的)方法來解釋storePermits,以此壓制時間。storedPermitsToWaitTime(double storedPermits, double permitsToTake) 在其中扮演重要角色。底層的模型是一個持續變化的函數映射storedPermits(從0到maxStoredPermits)到1/rate(時間間隔)。storedPermits在衡量未使用時間上是必不可少的。我們使用未利用時間換取許可數(permits)。速率是permits/time,是以1/rate=time/permits.是以"1/rate"(time/permits)乘以permits等于給定時間。對于指定數量的請求許可來說,這個積分函數(storedPermitsToWaitTime()計算)與持續請求的最小時間間隔相關。

這裡有個storePermitsToWaitTime的例子。如果storedPermits=10,我們想要3個permits,我們從storedPermits中去擷取,減少他們到7個,并且計算壓制時間作為一個調用storedPermitsToWaitTime(storedPermits=10,permitsToTake=3),這将會評估這個函數積分從7到10.

使用積分保證acquire(3)效果等同于3次acquire(1),或一次acquire(2)+一次acquire(1)。因為積分在[7.0,10.0]等同于在[7.0,8.0],[8.0,9.0],[9.0,10.0]等等。無論這個函數是什麼。這使得我們可以正确處理不同權重(permits)的請求時,不論真正的函數是什麼。是以我們可以自由調整。(唯一的條件顯然是我們能夠計算出他的間隔時間)。

注意,對于這個函數,我們選擇水準線,高度為1/QPS,是以這個函數的影響是不存在的。對于storedPermits将會完全等同于重新整理一個新的(1/QPS是他的代價)。我們将會在之後使用這個小訣竅。

如果我們采用一個低于這條水準線的函數,這意味着我們減少了這個函數的區域,也就是時間。是以限流器就會在一段時間的利用不足後變快。另一方面,如果我們使用一個高于此水準線的函數,這就意味着代表時間的區域增大,是以storedPermits将會比重新整理一個新許可更耗時,相應地,限流器就會在一段時間的利用不足後變慢。

最後,考慮一個限流器以1permit/sec速率,目前未被使用,有一個acquire(100)的請求到來。等待100sec才開始執行任務将會是很愚蠢的行為。為什麼不作任何事情隻等待呢?一個更好的方法是立刻允許請求(正如它是acquire(1)的請求一樣),并且按需要延緩此後的需求。在這個版本,我們允許立刻開始執行任務,并且延緩100秒之後的請求,是以我們允許工作執行而不是讓它空閑等待,即讓後面的等,此次不等,即壓制時間。

這裡有很重要的因果關系。這意味着限流器不會記住最後請求的時刻,但它會記住下一個請求(預計)時間。這也使我們能夠立即知道(見tryAcquire(timeout))指定時間timeout是否足夠将我們帶到下一個排程的時間點,因為我們總維持那個。并且我們所指的“未被使用的限流器”也被這所定義:但我們觀察“下一個請求的期待到達時間”在過去,那麼(now-past)的時間差将被看作RateLimiter未被正式使用時間。這也是被我們解釋為storedPermits的時間。(我們用空閑的時間産生的許可數來增加storedPermits)。是以,如果速率=1許可/sec,并且請求在之前那個請求後一秒後準時到達,那麼storedPermits将永遠不會增加。我們隻會在當晚于預期一秒時間的到達,才會增加它。 

記錄 1秒内的微秒數/permitsPerSencond = 時間間隔interval,每一個interval可獲得一個令牌

屬性

//目前桶裡已有的令牌數。之前一個時間段沒用掉的令牌存着留着将來用。
    double storedPermits;

   //最大可儲存的令牌數,即令牌桶的容量
    double maxPermits;

    //兩個請求之間的靜态間隔,比如每秒限速5個請求,靜态間隔就是200ms
    double stableIntervalMicros;

    //下一個令牌可用的時間。
    //一旦給一個請求發令牌,這個時間就會被推後,請求數量越大,這個時間被推後的越遠。
    //這個時間也可能是過去的某個值。
    //以上面每秒5個請求為例,假設第一個請求在T時刻拿到令牌,則第二個能拿到令牌的時間T+200ms
    //但是這時請求可能沒有到來(比如說沒有使用者點選或者沒有任務),

    private long nextFreeTicketMicros = 0L;
           

建立

private SmoothRateLimiter(SleepingStopwatch stopwatch) {
    super(stopwatch);
  }
           

流量

@Override
  final void doSetRate(double permitsPerSecond, long nowMicros) {
    resync(nowMicros);
    //1秒除以每秒多少個令牌,即為靜态間隔
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    //設定流量
    doSetRate(permitsPerSecond, stableIntervalMicros);
  }

  abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros);

  @Override
  final double doGetRate() {
    return SECONDS.toMicros(1L) / stableIntervalMicros;
  }

  @Override
  final long queryEarliestAvailable(long nowMicros) {
    return nextFreeTicketMicros;
  }
           

令牌産生和消費

令牌産生由resync負責。

//根據目前時間更新桶裡的令牌數和下一個可用令牌的時間,
//需要注意:resync隻關注産生的邏輯,不關注消耗的邏輯,而且隻産生過去某時刻到現在的
 
// 因為是根據目前時間計算的過去那段時間應産生的令牌,是以本次計算完之後,
// 桶裡的令牌數不一定夠本次acquire的數量,那就隻能把nextFreeTicketMicros再推後了
// 但是本方法隻負責計算到現在
void resync(long nowMicros) {
    // nextFreeTicketMicros 小于目前時間,表示有令牌沒有使用,則不産生了。
    if (nowMicros > nextFreeTicketMicros) {
      //新令牌數=過去的時間/每個令牌的間隔,在SmoothBursty中coolDownIntervalMicros傳回的是間隔
      //coolDownIntervalMicros,用于目前時刻的間隔,子類可覆寫。
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      
      // 控制桶的容量,多餘的就丢掉,
      storedPermits = min(maxPermits, storedPermits + newPermits);
      
      // 對過去一段時間産生了令牌之後,下一個可用令牌時間就變成現在了
      // 再次明确:本方法隻關注産生邏輯,是以nextFreeTicketMicros回被更新到目前
      nextFreeTicketMicros = nowMicros;
    }

 }


  abstract double coolDownIntervalMicros();
           
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    //産生上次到現在令牌并更新時間。
    resync(nowMicros);
    
    //傳回的等待時間。
    long returnValue = nextFreeTicketMicros;
    

    //可消費的令牌數(請求數,目前數的最小值)。
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    
    //需要新産生的令牌數
    double freshPermits = requiredPermits - storedPermitsToSpend;
    
    //freshPermits * stableIntervalMicros 産生這幾個新令牌需要的等待時間
    //等待時長 =  新令牌等待時長 + storedPermitsToWaitTime 時長
    long waitMicros =
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);
    
    //下一次能取得令牌的時間,指派之前nextFreeTicketMicros就是目前時刻,加上等待時間就是下一個時刻。
    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    
    // 本次擷取令牌之後,更新桶裡最後剩餘的令牌(減去被消費掉的數量)
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
}
           

SmoothBursty

SmoothBursty允許Burst到一定的流量。

屬性

static final class SmoothBursty extends SmoothRateLimiter {
    //最長令牌儲存時間(秒)。
    final double maxBurstSeconds;

}
           

建立

SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
      super(stopwatch);
      this.maxBurstSeconds = maxBurstSeconds;
    }
           

流量

@Override
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
      double oldMaxPermits = this.maxPermits;
      maxPermits = maxBurstSeconds * permitsPerSecond;
      if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // 溢出判斷
        storedPermits = maxPermits;
      } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? 0.0 // initial state
                : storedPermits * maxPermits / oldMaxPermits;
      }
    }

    @Override
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      return 0L;
    }

    @Override
    double coolDownIntervalMicros() {
      return stableIntervalMicros;
    }
           

SmoothWarmingUp

 SmoothWarmingUp更平滑一些。有一個warmup period,為thresholdPermits到maxPermits的這段範圍

  • 從0->thresholdPermits,生成一個令牌的時間:stableIntervalMicros
  • 從thresholdPermits -> maxPermits ,生成一個令牌的時間:stableIntervalMicros + permits * slope;
* <pre>
   *          ^ throttling
   *          |
   *    cold  +                  /
   * interval |                 /.
   *          |                / .
   *          |               /  .   ← "warmup period" is the area of the trapezoid between
   *          |              /   .     thresholdPermits and maxPermits
   *          |             /    .
   *          |            /     .
   *          |           /      .
   *   stable +----------/  WARM .
   * interval |          .   UP  .
   *          |          . PERIOD.
   *          |          .       .
   *        0 +----------+-------+--------------→ storedPermits
   *          0 thresholdPermits maxPermits
   * </pre>
           

屬性

static final class SmoothWarmingUp extends SmoothRateLimiter {
//預熱期(毫秒)
    private final long warmupPeriodMicros;
//
    private double slope;
//  門檻值
    private double thresholdPermits;
//  冷啟動因子
    private double coldFactor;

  }
           

漏桶算法

漏桶算法就是我們常采用的隊列限流方法,通過在請求和實際的處理服務中間加一層隊列,限制高并發請求對系統造成的壓力,系統将按照自己的處理能力慢慢處理請求,這種限流方法的适用于接口處理實時性要求不高的業務場景,且可靠性高,漏桶算法實作也比較簡單。