天天看点

限流算法固定窗口(计数器法)滑动窗口令牌桶算法(谷歌的开源guava有实现)漏桶算法

关于限流算法,一般常见的有下面四种:

  1. 固定窗口(计数器法)
  2. 滑动窗口
  3. 令牌桶算法(谷歌的开源guava有实现)
  4. 漏桶算法

固定窗口(计数器法)

固定窗口是限流算法里最简单也是最容易实现的一种算法。假设我们规定接口A的qps是100, 即每分钟的访问次数不能超过100。那么我们可以这么做:在一开始的时候,我们可以设置一个计数器counter,初始化为0, 过期时间为1秒,即1秒后计数器失效。每当一个请求过来的时候,counter值加1,判断当前counter的值是否大于100,如果大于100则说明请求数过多,直接拒绝请求。如果请求counter计数器不存在,则重置计数器,开始新的一秒的接口限流。注意并发情况下访问计数器需要加锁。

要点

1、有个定时器,定时把counter重置为0。

2、counter应该是原子类型,例如AtomicInteger。

缺点

限制粒度太低,存在临界问题。

假设有一个恶意用户,他在0:59时,瞬间发送了100个请求,并且1:00又瞬间发送了100个请求,那么其实这个用户在1秒内,瞬间发送了200个请求。可以瞬间超过我们的速率限制。用户有可能通过算法的这个漏洞,瞬间压垮我们的应用。解决这个问题的办法就是提高限流的粒度,即滑动窗口算法。

滑动窗口

滑动时间窗口计数器算法思想:针对固定时间算法会在临界点存在瞬间大流量冲击的场景,滑动时间窗口计数器算法应运而生。它将时间窗口划分为更小的时间片段,每过一个时间片段,时间窗口就会往右滑动一格,每个时间片段都有独立的计数器。在计算整个时间窗口内的请求总数时会累加所有的时间片段内的计数器。时间窗口划分的越细,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。

一般情况下,会根据片段个数构建循环数组,则每次滑动时,都是滑动到一个已经使用过的片段,则计数应该减去此片段的旧值。

限流算法固定窗口(计数器法)滑动窗口令牌桶算法(谷歌的开源guava有实现)漏桶算法

 要点:

1、每个时间片段有个计数器。

2、有个总的计数器,是在窗口内的所有片段的计数器的总和。

3、如果采用循环数组,注意减去旧值。

令牌桶算法(谷歌的开源guava有实现)

系统按照恒定的时间间隔(通常是1/QPS)往桶里加入Token,每个Token代表一次接口访问权限,如果桶已经满了丢弃令牌。新请求来临时,会请求从桶中拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务请求。放入Token的时间间隔取决于限制的qps,假设接口的qps是100,则按照1/qps的速率放令牌,即每10ms放入一个令牌,令牌桶算法不存在瞬间的流量高峰,它能严格控制接口在qps内访问。

限流算法固定窗口(计数器法)滑动窗口令牌桶算法(谷歌的开源guava有实现)漏桶算法

RateLimiter

public abstract class RateLimiter {
}
           

 Guava有两种限流模式,一种为稳定模式(SmoothBursty:令牌生成速度恒定),一种为渐进模式(SmoothWarmingUp:令牌生成速度缓慢提升直到维持在一个稳定值) 两种模式实现思路类似,主要区别在等待时间的计算上。

每次请求时可以请求多个令牌,对本任务没有限制,但是会对下一个任务有限制,因为如果没有请求到的令牌会先占着。

创建方法

//平滑
  public static RateLimiter create(double permitsPerSecond) {
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
  }
//平滑
  static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
  }
  //渐进
  public static RateLimiter create(double permitsPerSecond, Duration warmupPeriod) {
    return create(permitsPerSecond, toNanosSaturated(warmupPeriod), TimeUnit.NANOSECONDS);
  }
  //渐进
  public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
    checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod);
    return create(
        permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer());
  }
  //渐进
  @VisibleForTesting
  static RateLimiter create(
      double permitsPerSecond,
      long warmupPeriod,
      TimeUnit unit,
      double coldFactor,
      SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
  }
           

流量

private volatile @Nullable Object mutexDoNotUseDirectly;

//双重check 延迟生成单例。生成一个信号量。因为没有public 构造函数,所以要这样。
  private Object mutex() {
    Object mutex = mutexDoNotUseDirectly;
    if (mutex == null) {
      synchronized (this) {
        mutex = mutexDoNotUseDirectly;
        if (mutex == null) {
          mutexDoNotUseDirectly = mutex = new Object();
        }
      }
    }
    return mutex;
  }

  public final void setRate(double permitsPerSecond) {
    checkArgument(
        permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
    synchronized (mutex()) {
      doSetRate(permitsPerSecond, stopwatch.readMicros());
    }
  }

  abstract void doSetRate(double permitsPerSecond, long nowMicros);


  public final double getRate() {
    synchronized (mutex()) {
      return doGetRate();
    }
  }

  abstract double doGetRate();
           

获取令牌

/**
   * 获取一个令牌,取不到就阻塞。返回一个睡眠时间,单位是秒。
   * 如果没有令牌可用,acquire会自己sleep,调用方得到返回值之后不需要sleep。
   */
  
  public double acquire() {
    return acquire(1);
  }



   //从当前RateLimiter获取给定数量的令牌,阻塞到获取请求可以通过。
    //它会返回一个以秒记的睡眠时间,如果有的话。
    //返回值是可以忽略的,因为方法内部已经处理了等待时间,调用方不需要sleep
    @CanIgnoreReturnValue
  public double acquire(int permits) {
    //获取指定数量的令牌,得到一个需要等待的微秒数
    long microsToWait = reserve(permits);
    //睡眠并且不响应interrupt
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    //转换成秒,先乘以1.0变成浮点数,避免整数运算丢失精度
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
  }

//从当前RateLimiter中获取给定数量的令牌,返回当前时刻到令牌可用时刻的微秒数
  final long reserve(int permits) {
    checkPermits(permits);
    synchronized (mutex()) {
      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
  }

//预留令牌并返回调用者必须等待的时间
final long reserveAndGetWaitLength(int permits, long nowMicros) {
    //返回这些令牌最早可用的时刻
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    //最早可用时刻-当前时刻就是等待时间,如果小于0则取0
    return max(momentAvailable - nowMicros, 0);
}

  
  //
  public boolean tryAcquire(Duration timeout) {
    return tryAcquire(1, toNanosSaturated(timeout), TimeUnit.NANOSECONDS);
  }

  //
  @SuppressWarnings("GoodTime") // should accept a java.time.Duration
  public boolean tryAcquire(long timeout, TimeUnit unit) {
    return tryAcquire(1, timeout, unit);
  }

  //
  public boolean tryAcquire(int permits) {
    return tryAcquire(permits, 0, MICROSECONDS);
  }

  //
  public boolean tryAcquire() {
    return tryAcquire(1, 0, MICROSECONDS);
  }

  //
  public boolean tryAcquire(int permits, Duration timeout) {
    return tryAcquire(permits, toNanosSaturated(timeout), TimeUnit.NANOSECONDS);
  }

  //
  @SuppressWarnings("GoodTime") // should accept a java.time.Duration
  public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
    long timeoutMicros = max(unit.toMicros(timeout), 0);
    checkPermits(permits);
    long microsToWait;
    synchronized (mutex()) {
      long nowMicros = stopwatch.readMicros();
      if (!canAcquire(nowMicros, timeoutMicros)) {
        return false;
      } else {
        microsToWait = reserveAndGetWaitLength(permits, nowMicros);
      }
    }
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return true;
  }
  //
  private boolean canAcquire(long nowMicros, long timeoutMicros) {
    return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
  }
 
 
  abstract long queryEarliestAvailable(long nowMicros);
  abstract long reserveEarliestAvailable(int permits, long nowMicros);
 
           

SmoothRateLimiter

SmoothRateLimiter是RateLimiter的平滑限流的实现,它又有2个子类实现:SmoothWarmingUp和SmoothBursty。

限流器的基本提点是一个“稳定的速率”,即在正常条件下的最大速率。为达到这个目的,限流器会根据需要压制到达的请求。通过计算,限流器会确保到达请求等待合理的时间,以此达到压制的目的。

维持一个速率(通常被指定为QPS)最简单的方法是记住上个被允许请求的最后时间戳,并保证在1/QPS时间内不执行请求。例如,QPS=5(每秒5个token),如果我们能够确保自从上个请求后,在200ms内没有请求被允许执行,那么我们将获得一个想要的速率。如果一个请求在上个请求被放行100ms后到达,那么我们需要等待额外的100ms。在这个速率下,15个新的许可耗时3秒钟(例如对于请求 acquire(15))。

很重要的一点,是能够意识到限流器对过去只有很浅的记忆。它只会记住上一个请求。那如果限流器很久没有被使用,然后一个请求突然到达并被立即允许怎么办?这可能会有两种情形,一种是资源利用不充分,另一种则是导致溢出,具体取决于没有遵循预定速率的真实原因。

之前的利用不足意味多余的资源可被获取。限流器应该加速一段时间,以利用这些资源。速率适应网络(带宽)很重要,过去的利用不足被解释为“几乎为空的缓冲”,可以被快速填补。

另一方面,过去的利用不足也可能意味着“服务器没有准备好处理将来的请求”,例如,缓存失效,请求更有可能会触发耗时的操作(一个更极端的例子是,当一个服务器刚刚被引导,它更可能忙于自身的唤醒)。

即“过去的利用不足”被建模为变量“storedPermits”。这个变量在没有使用时为0,当有大量使用时,它可以增长到maxStoredPermits。所以,请求会被函数acquire(permits)触发许可,提供以下两种类型的许可:

  • stored permits(可获取的已存许可)
  • fresh permits(新的的许可)

工作原理如下:

对于一个限流器,每秒产生一个令牌。不使用限流器时,我们都会给storedPermits加1。如果说我们有10sec不使用限流器(例如预计请求在时刻X到来,但在请求到来之前,我们在X+10。这也是上段所描述的点。)。因此storedPermits变为10(假设maxStoredPermits>=10)。在这时,一个人acquire(3)的请求到了。我们从已有的storedPermits拿出许可服务这个请求,并将许可数降至7.(这如何被解释为压制时间,将会在之后被详细讨论。)这之后,假设马上有一个acquire(10)的请求到达,我们用剩下所有的7个许可数来应对这个请求,还有3个许可数,我们需要通过刷新限流器新提供。

我们也已经知道花费在3个新的许可上的时间:如果速率是1令牌/sec,那么我们将花费3秒。但是使用7个已存许可又是什么意思呢?正如上面所说,这里没有固定答案。如果我们主要兴趣在应对资源利用不足上,我们想要存储许可释放比刷新许可快。因为利用不足=尚有未被占用的资源。如果我们主要兴趣点在应对溢出,那么存储的许可数应该释放的比刷新的慢。因此,我们想要一个(在每种情形都不同的)方法来解释storePermits,以此压制时间。storedPermitsToWaitTime(double storedPermits, double permitsToTake) 在其中扮演重要角色。底层的模型是一个持续变化的函数映射storedPermits(从0到maxStoredPermits)到1/rate(时间间隔)。storedPermits在衡量未使用时间上是必不可少的。我们使用未利用时间换取许可数(permits)。速率是permits/time,因此1/rate=time/permits.因此"1/rate"(time/permits)乘以permits等于给定时间。对于指定数量的请求许可来说,这个积分函数(storedPermitsToWaitTime()计算)与持续请求的最小时间间隔相关。

这里有个storePermitsToWaitTime的例子。如果storedPermits=10,我们想要3个permits,我们从storedPermits中去获取,减少他们到7个,并且计算压制时间作为一个调用storedPermitsToWaitTime(storedPermits=10,permitsToTake=3),这将会评估这个函数积分从7到10.

使用积分保证acquire(3)效果等同于3次acquire(1),或一次acquire(2)+一次acquire(1)。因为积分在[7.0,10.0]等同于在[7.0,8.0],[8.0,9.0],[9.0,10.0]等等。无论这个函数是什么。这使得我们可以正确处理不同权重(permits)的请求时,不论真正的函数是什么。所以我们可以自由调整。(唯一的条件显然是我们能够计算出他的间隔时间)。

注意,对于这个函数,我们选择水平线,高度为1/QPS,因此这个函数的影响是不存在的。对于storedPermits将会完全等同于刷新一个新的(1/QPS是他的代价)。我们将会在之后使用这个小诀窍。

如果我们采用一个低于这条水平线的函数,这意味着我们减少了这个函数的区域,也就是时间。因此限流器就会在一段时间的利用不足后变快。另一方面,如果我们使用一个高于此水平线的函数,这就意味着代表时间的区域增大,因此storedPermits将会比刷新一个新许可更耗时,相应地,限流器就会在一段时间的利用不足后变慢。

最后,考虑一个限流器以1permit/sec速率,当前未被使用,有一个acquire(100)的请求到来。等待100sec才开始执行任务将会是很愚蠢的行为。为什么不作任何事情只等待呢?一个更好的方法是立刻允许请求(正如它是acquire(1)的请求一样),并且按需要延缓此后的需求。在这个版本,我们允许立刻开始执行任务,并且延缓100秒之后的请求,因此我们允许工作执行而不是让它空闲等待,即让后面的等,此次不等,即压制时间。

这里有很重要的因果关系。这意味着限流器不会记住最后请求的时刻,但它会记住下一个请求(预计)时间。这也使我们能够立即知道(见tryAcquire(timeout))指定时间timeout是否足够将我们带到下一个调度的时间点,因为我们总维持那个。并且我们所指的“未被使用的限流器”也被这所定义:但我们观察“下一个请求的期待到达时间”在过去,那么(now-past)的时间差将被看作RateLimiter未被正式使用时间。这也是被我们解释为storedPermits的时间。(我们用空闲的时间产生的许可数来增加storedPermits)。所以,如果速率=1许可/sec,并且请求在之前那个请求后一秒后准时到达,那么storedPermits将永远不会增加。我们只会在当晚于预期一秒时间的到达,才会增加它。 

记录 1秒内的微秒数/permitsPerSencond = 时间间隔interval,每一个interval可获得一个令牌

属性

//当前桶里已有的令牌数。之前一个时间段没用掉的令牌存着留着将来用。
    double storedPermits;

   //最大可保存的令牌数,即令牌桶的容量
    double maxPermits;

    //两个请求之间的静态间隔,比如每秒限速5个请求,静态间隔就是200ms
    double stableIntervalMicros;

    //下一个令牌可用的时间。
    //一旦给一个请求发令牌,这个时间就会被推后,请求数量越大,这个时间被推后的越远。
    //这个时间也可能是过去的某个值。
    //以上面每秒5个请求为例,假设第一个请求在T时刻拿到令牌,则第二个能拿到令牌的时间T+200ms
    //但是这时请求可能没有到来(比如说没有用户点击或者没有任务),

    private long nextFreeTicketMicros = 0L;
           

创建

private SmoothRateLimiter(SleepingStopwatch stopwatch) {
    super(stopwatch);
  }
           

流量

@Override
  final void doSetRate(double permitsPerSecond, long nowMicros) {
    resync(nowMicros);
    //1秒除以每秒多少个令牌,即为静态间隔
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    //设置流量
    doSetRate(permitsPerSecond, stableIntervalMicros);
  }

  abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros);

  @Override
  final double doGetRate() {
    return SECONDS.toMicros(1L) / stableIntervalMicros;
  }

  @Override
  final long queryEarliestAvailable(long nowMicros) {
    return nextFreeTicketMicros;
  }
           

令牌产生和消费

令牌产生由resync负责。

//根据当前时间更新桶里的令牌数和下一个可用令牌的时间,
//需要注意:resync只关注产生的逻辑,不关注消耗的逻辑,而且只产生过去某时刻到现在的
 
// 因为是根据当前时间计算的过去那段时间应产生的令牌,所以本次计算完之后,
// 桶里的令牌数不一定够本次acquire的数量,那就只能把nextFreeTicketMicros再推后了
// 但是本方法只负责计算到现在
void resync(long nowMicros) {
    // nextFreeTicketMicros 小于当前时间,表示有令牌没有使用,则不产生了。
    if (nowMicros > nextFreeTicketMicros) {
      //新令牌数=过去的时间/每个令牌的间隔,在SmoothBursty中coolDownIntervalMicros返回的是间隔
      //coolDownIntervalMicros,用于当前时刻的间隔,子类可覆盖。
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      
      // 控制桶的容量,多余的就丢掉,
      storedPermits = min(maxPermits, storedPermits + newPermits);
      
      // 对过去一段时间产生了令牌之后,下一个可用令牌时间就变成现在了
      // 再次明确:本方法只关注产生逻辑,所以nextFreeTicketMicros回被更新到当前
      nextFreeTicketMicros = nowMicros;
    }

 }


  abstract double coolDownIntervalMicros();
           
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    //产生上次到现在令牌并更新时间。
    resync(nowMicros);
    
    //返回的等待时间。
    long returnValue = nextFreeTicketMicros;
    

    //可消费的令牌数(请求数,当前数的最小值)。
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    
    //需要新产生的令牌数
    double freshPermits = requiredPermits - storedPermitsToSpend;
    
    //freshPermits * stableIntervalMicros 产生这几个新令牌需要的等待时间
    //等待时长 =  新令牌等待时长 + storedPermitsToWaitTime 时长
    long waitMicros =
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);
    
    //下一次能取得令牌的时间,赋值之前nextFreeTicketMicros就是当前时刻,加上等待时间就是下一个时刻。
    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    
    // 本次获取令牌之后,更新桶里最后剩余的令牌(减去被消费掉的数量)
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
}
           

SmoothBursty

SmoothBursty允许Burst到一定的流量。

属性

static final class SmoothBursty extends SmoothRateLimiter {
    //最长令牌保存时间(秒)。
    final double maxBurstSeconds;

}
           

创建

SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
      super(stopwatch);
      this.maxBurstSeconds = maxBurstSeconds;
    }
           

流量

@Override
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
      double oldMaxPermits = this.maxPermits;
      maxPermits = maxBurstSeconds * permitsPerSecond;
      if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // 溢出判断
        storedPermits = maxPermits;
      } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? 0.0 // initial state
                : storedPermits * maxPermits / oldMaxPermits;
      }
    }

    @Override
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      return 0L;
    }

    @Override
    double coolDownIntervalMicros() {
      return stableIntervalMicros;
    }
           

SmoothWarmingUp

 SmoothWarmingUp更平滑一些。有一个warmup period,为thresholdPermits到maxPermits的这段范围

  • 从0->thresholdPermits,生成一个令牌的时间:stableIntervalMicros
  • 从thresholdPermits -> maxPermits ,生成一个令牌的时间:stableIntervalMicros + permits * slope;
* <pre>
   *          ^ throttling
   *          |
   *    cold  +                  /
   * interval |                 /.
   *          |                / .
   *          |               /  .   ← "warmup period" is the area of the trapezoid between
   *          |              /   .     thresholdPermits and maxPermits
   *          |             /    .
   *          |            /     .
   *          |           /      .
   *   stable +----------/  WARM .
   * interval |          .   UP  .
   *          |          . PERIOD.
   *          |          .       .
   *        0 +----------+-------+--------------→ storedPermits
   *          0 thresholdPermits maxPermits
   * </pre>
           

属性

static final class SmoothWarmingUp extends SmoothRateLimiter {
//预热期(毫秒)
    private final long warmupPeriodMicros;
//
    private double slope;
//  阈值
    private double thresholdPermits;
//  冷启动因子
    private double coldFactor;

  }
           

漏桶算法

漏桶算法就是我们常采用的队列限流方法,通过在请求和实际的处理服务中间加一层队列,限制高并发请求对系统造成的压力,系统将按照自己的处理能力慢慢处理请求,这种限流方法的适用于接口处理实时性要求不高的业务场景,且可靠性高,漏桶算法实现也比较简单。