pulsar 實作了一個 RateLimiter 來限制 dispatch 的速率。
大體思路是:初始有 n 個令牌,當令牌被申請完了後,其他人就無法獲得令牌了,每隔一段時間 t 會清零已配置設定的令牌數。
是以,記住這 2 個參數即可。
通過一個測試用例,觀察 RateLimiter 的用法。
// org.apache.pulsar.common.util.RateLimiterTest#testMultipleAcquire
public void testMultipleAcquire() throws Exception {
// 每過 1000ms 重置令牌數
final long rateTimeMSec = 1000;
// 令牌總數為 100
final int permits = 100;
final int acquirePermist = 50;
RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS);
long start = System.currentTimeMillis();
for (int i = 0; i < permits / acquirePermist; i++) {
// 1 次擷取 50 個令牌,2 次申請完 100 個令牌
rate.acquire(acquirePermist);
}
long end = System.currentTimeMillis();
assertTrue((end - start) < rateTimeMSec);
// 時間還不到 1000ms,令牌沒有重置,則可用令牌仍為 0
assertEquals(rate.getAvailablePermits(), 0);
rate.close();
}
接下來看下實作:
org.apache.pulsar.common.util.RateLimiter
// 令牌總數
private long permits;
// 目前已配置設定的令牌數
private long acquiredPermits;
// 清理已配置設定令牌數的定時任務 1. 線程池 2. 定時任務 3. 間隔時間
private final ScheduledExecutorService executorService;
private ScheduledFuture<?> renewTask;
private long rateTime;
// 提供了一個接口用來修改令牌總數
private Supplier<Long> permitUpdater;
申請 acquirePermit 個令牌,自旋模式
public synchronized void acquire(long acquirePermit) throws InterruptedException {
checkArgument(!isClosed(), "Rate limiter is already shutdown");
checkArgument(acquirePermit <= this.permits,
"acquiring permits must be less or equal than initialized rate =" + this.permits);
// 如果還沒建立清理定時任務,則建立它
if (renewTask == null) {
renewTask = createTask();
}
boolean canAcquire = false;
do {
// 如果已配置設定令牌數小于總令牌數,可以配置設定
canAcquire = acquirePermit < 0 || acquiredPermits < this.permits;
if (!canAcquire) {
// 阻塞目前線程
wait();
} else {
// 增加已配置設定令牌數
acquiredPermits += acquirePermit;
}
} while (!canAcquire);
}
申請 acquirePermit 個令牌,快速失敗模式
public synchronized boolean tryAcquire(long acquirePermit) {
checkArgument(!isClosed(), "Rate limiter is already shutdown");
// lazy init and start task only once application start using it
if (renewTask == null) {
renewTask = createTask();
}
// acquired-permits can't be larger than the rate
if (acquirePermit > this.permits) {
acquiredPermits = this.permits;
return false;
}
// 這裡并沒有嚴格限制,無所謂,沒必要太精确
boolean canAcquire = acquirePermit < 0 || acquiredPermits < this.permits;
if (canAcquire) {
acquiredPermits += acquirePermit;
}
return canAcquire;
}
protected ScheduledFuture<?> createTask() {
return executorService.scheduleAtFixedRate(this::renew, this.rateTime, this.rateTime, this.timeUnit);
}
synchronized void renew() {
// 直接重置為 0
acquiredPermits = 0;
// 如果提供了這個對象,則用它的值來設定總令牌數
if (permitUpdater != null) {
long newPermitRate = permitUpdater.get();
if (newPermitRate > 0) {
setRate(newPermitRate);
}
}
// 喚醒所有等待目前對象的線程
notifyAll();
}