天天看點

pulsar 實作的一種 RateLimiter

pulsar 實作了一個 RateLimiter 來限制 dispatch 的速率。

大體思路是:初始有 n 個令牌,當令牌被申請完了後,其他人就無法獲得令牌了,每隔一段時間 t 會清零已配置設定的令牌數。

是以,記住這 2 個參數即可。

通過一個測試用例,觀察 RateLimiter 的用法。

// org.apache.pulsar.common.util.RateLimiterTest#testMultipleAcquire
public void testMultipleAcquire() throws Exception {
    // 每過 1000ms 重置令牌數
    final long rateTimeMSec = 1000;
    // 令牌總數為 100
    final int permits = 100;
    final int acquirePermist = 50;
    RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS);
    long start = System.currentTimeMillis();
    for (int i = 0; i < permits / acquirePermist; i++) {
        // 1 次擷取 50 個令牌,2 次申請完 100 個令牌
        rate.acquire(acquirePermist);
    }
    long end = System.currentTimeMillis();
    assertTrue((end - start) < rateTimeMSec);
    // 時間還不到 1000ms,令牌沒有重置,則可用令牌仍為 0
    assertEquals(rate.getAvailablePermits(), 0);
    rate.close();
}      

接下來看下實作:

org.apache.pulsar.common.util.RateLimiter
  // 令牌總數
  private long permits;
  // 目前已配置設定的令牌數
  private long acquiredPermits;
  
  // 清理已配置設定令牌數的定時任務 1. 線程池 2. 定時任務 3. 間隔時間
  private final ScheduledExecutorService executorService;
  private ScheduledFuture<?> renewTask;
  private long rateTime;
  // 提供了一個接口用來修改令牌總數
  private Supplier<Long> permitUpdater;      

申請 acquirePermit 個令牌,自旋模式

public synchronized void acquire(long acquirePermit) throws InterruptedException {
    checkArgument(!isClosed(), "Rate limiter is already shutdown");
    checkArgument(acquirePermit <= this.permits,
            "acquiring permits must be less or equal than initialized rate =" + this.permits);

    // 如果還沒建立清理定時任務,則建立它
    if (renewTask == null) {
        renewTask = createTask();
    }

    boolean canAcquire = false;
    do {
        // 如果已配置設定令牌數小于總令牌數,可以配置設定
        canAcquire = acquirePermit < 0 || acquiredPermits < this.permits;
        if (!canAcquire) {
            // 阻塞目前線程
            wait();
        } else {
            // 增加已配置設定令牌數
            acquiredPermits += acquirePermit;
        }
    } while (!canAcquire);
}      

申請 acquirePermit 個令牌,快速失敗模式

public synchronized boolean tryAcquire(long acquirePermit) {
    checkArgument(!isClosed(), "Rate limiter is already shutdown");
    // lazy init and start task only once application start using it
    if (renewTask == null) {
        renewTask = createTask();
    }

    // acquired-permits can't be larger than the rate
    if (acquirePermit > this.permits) {
        acquiredPermits = this.permits;
        return false;
    }
    // 這裡并沒有嚴格限制,無所謂,沒必要太精确
    boolean canAcquire = acquirePermit < 0 || acquiredPermits < this.permits;
    if (canAcquire) {
        acquiredPermits += acquirePermit;
    }
    return canAcquire;
}      
protected ScheduledFuture<?> createTask() {
    return executorService.scheduleAtFixedRate(this::renew, this.rateTime, this.rateTime, this.timeUnit);
}      
synchronized void renew() {
    // 直接重置為 0
    acquiredPermits = 0;
    // 如果提供了這個對象,則用它的值來設定總令牌數
    if (permitUpdater != null) {
        long newPermitRate = permitUpdater.get();
        if (newPermitRate > 0) {
            setRate(newPermitRate);
        }
    }
    // 喚醒所有等待目前對象的線程
    notifyAll();
}