SCG(Spring Cloud Gateway)預設使用Redis來實作令牌桶限流:由Java代碼調用Lua腳本完成限流判斷。
RequestRateLimiterGatewayFilterFactory
RequestRateLimiterGatewayFilterFactory 是SCG中限流GatewayFilter的工廠類。
/**
 * Builds the rate-limiting {@code GatewayFilter} for one route configuration.
 *
 * <p>Values missing from {@code config} fall back to this factory's defaults.
 * For each exchange the filter resolves a key, then:
 * <ul>
 * <li>empty key: either completes the response with the configured
 * "empty key" status (when empty keys are denied) or lets the request through;</li>
 * <li>otherwise: asks the rate limiter whether the request is allowed, copies
 * the limiter's headers onto the response, and either continues the chain or
 * completes the response with {@code config.getStatusCode()}.</li>
 * </ul>
 *
 * @param config per-route filter configuration
 * @return the filter performing the check described above
 */
public GatewayFilter apply(Config config) {
    KeyResolver keyResolver = getOrDefault(config.keyResolver, defaultKeyResolver);
    RateLimiter<Object> rateLimiter = getOrDefault(config.rateLimiter,
            defaultRateLimiter);
    boolean rejectEmptyKey = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
    HttpStatusHolder statusForEmptyKey = HttpStatusHolder
            .parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));

    return (exchange, chain) -> keyResolver.resolve(exchange)
            .defaultIfEmpty(EMPTY_KEY)
            .flatMap(key -> {
                if (!EMPTY_KEY.equals(key)) {
                    // Prefer the route id from the config; otherwise read it from
                    // the route stored on the exchange.
                    String configuredRouteId = config.getRouteId();
                    String routeId;
                    if (configuredRouteId != null) {
                        routeId = configuredRouteId;
                    }
                    else {
                        Route currentRoute = exchange
                                .getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
                        routeId = currentRoute.getId();
                    }

                    // Key step: delegate to the concrete limiter's isAllowed to
                    // decide whether this request may run (RedisRateLimiter by
                    // default, per the original note).
                    return rateLimiter.isAllowed(routeId, key).flatMap(verdict -> {
                        // Copy every header reported by the limiter to the response.
                        verdict.getHeaders().forEach((name, value) -> exchange
                                .getResponse().getHeaders().add(name, value));

                        if (!verdict.isAllowed()) {
                            setResponseStatus(exchange, config.getStatusCode());
                            return exchange.getResponse().setComplete();
                        }
                        return chain.filter(exchange);
                    });
                }

                // No key could be resolved for this exchange.
                if (!rejectEmptyKey) {
                    return chain.filter(exchange);
                }
                setResponseStatus(exchange, statusForEmptyKey);
                return exchange.getResponse().setComplete();
            });
}
RedisRateLimiter
/**
 * Asks the Redis-backed token bucket whether a request may proceed.
 *
 * <p>The route's configuration supplies the replenish rate, burst capacity and
 * tokens-per-request, which are passed to the Lua script together with the
 * current epoch second. The script's two results (allowed flag, tokens left)
 * are turned into a {@code Response}.
 *
 * <p>Failure handling is deliberately permissive: if the script call errors,
 * or an exception is thrown while preparing the call, the request is allowed
 * and the remaining-token value is reported as {@code -1}.
 *
 * @param routeId id of the route whose configuration is loaded
 * @param id key identifying the caller being limited
 * @return a Mono emitting the limiter's decision
 * @throws IllegalStateException if this limiter has not been initialized
 */
public Mono<Response> isAllowed(String routeId, String id) {
    if (!this.initialized.get()) {
        throw new IllegalStateException("RedisRateLimiter is not initialized");
    }

    Config routeConfig = loadConfiguration(routeId);

    /*
     * Literally a "replenish rate", i.e. how fast tokens are added to the
     * bucket, while the upstream comment describes it as the number of
     * requests a user may make per second. Both views agree: picture someone
     * scooping water out of a bucket - limiting how fast water is poured in
     * limits how fast it can be taken out. Controlling inflow controls outflow.
     */
    // How many requests per second do you want a user to be allowed to do?
    int replenishRate = routeConfig.getReplenishRate();

    // Capacity of the token bucket.
    // How much bursting do you want to allow?
    int burstCapacity = routeConfig.getBurstCapacity();

    // Number of tokens consumed by each request.
    // How many tokens are requested per request?
    int requestedTokens = routeConfig.getRequestedTokens();

    try {
        List<String> keys = getKeys(id);

        // The arguments to the LUA script. time() returns unixtime in seconds.
        List<String> scriptArgs = Arrays.asList(replenishRate + "",
                burstCapacity + "", Instant.now().getEpochSecond() + "",
                requestedTokens + "");
        // allowed, tokens_left = redis.eval(SCRIPT, keys, args)
        // Invoke the Lua script.
        Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys,
                scriptArgs);
        // .log("redisratelimiter", Level.FINER);
        return flux.onErrorResume(throwable -> {
            if (log.isDebugEnabled()) {
                log.debug("Error calling rate limiter lua", throwable);
            }
            // Script failure: behave as "allowed" with tokens left = -1.
            return Flux.just(Arrays.asList(1L, -1L));
        // reduceWith builds a fresh accumulator per subscription. A seed passed
        // to reduce(...) is created once and shared, so a re-subscription
        // would append to a list that already holds an earlier run's results.
        }).<List<Long>>reduceWith(ArrayList::new, (longs, l) -> {
            // Collect the values returned by the Lua script into one list.
            longs.addAll(l);
            return longs;
        }).map(results -> {
            // The script returns 1 as its first value when the request is allowed.
            boolean allowed = results.get(0) == 1L;
            // Second value: number of tokens left in the bucket.
            Long tokensLeft = results.get(1);

            // Assemble the Response handed back to the filter.
            Response response = new Response(allowed,
                    getHeaders(routeConfig, tokensLeft));

            if (log.isDebugEnabled()) {
                log.debug("response: " + response);
            }
            return response;
        });
    }
    catch (Exception e) {
        // Reached when preparing or issuing the Redis call throws synchronously.
        /*
         * We don't want a hard dependency on Redis to allow traffic. Make sure to set
         * an alert so you know if this is happening too much. Stripe's observed
         * failure rate is 0.01%.
         */
        log.error("Error determining if user allowed from redis", e);
    }
    // Final fallback: after an exception above, let the request through.
    return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
}
request_rate_limiter.lua
SCG的Redis lua腳本
-- Token-bucket rate limiter script; relies on the KEYS / ARGV tables and
-- redis.call provided by the Redis scripting environment.
--
-- KEYS[1]  key storing the tokens left after the previous request
--          (e.g. request_rate_limiter.{id}.tokens)
-- KEYS[2]  key storing the timestamp of the previous request
--          (e.g. request_rate_limiter.{id}.timestamp)
-- ARGV[1]  refill rate: tokens added per unit of time (example: 20)
-- ARGV[2]  bucket capacity (example: 50)
-- ARGV[3]  current timestamp supplied by the caller (example: 1618841535)
-- ARGV[4]  tokens consumed by one request (example: 2)
--
-- Returns { allowed, tokens_left } where allowed is 1 or 0.
--
-- Worked example with the values above:
--   1st call, now = 1618841535: no stored state, so tokens = 50, last = 0;
--     elapsed is huge, the refill is capped at 50; 50 >= 2 -> allowed, 48 left.
--   2nd call, now = 1618841537: tokens = 48, elapsed = 2,
--     min(50, 48 + 2 * 20) = 50; allowed, 48 left.
local bucket_key = KEYS[1]
local stamp_key = KEYS[2]

local refill_rate = tonumber(ARGV[1])
local bucket_size = tonumber(ARGV[2])
local current_time = tonumber(ARGV[3])
local cost = tonumber(ARGV[4])

-- Time needed to refill an empty bucket; both keys are kept for twice
-- that long (rounded down) before they expire.
local expiry = math.floor((bucket_size / refill_rate) * 2)

-- Tokens left after the previous request; a missing key means a full bucket.
local stored_tokens = tonumber(redis.call("get", bucket_key)) or bucket_size

-- Timestamp of the previous request; a missing key is treated as time 0.
local stored_time = tonumber(redis.call("get", stamp_key)) or 0

-- Time since the previous request, never negative.
local elapsed = math.max(0, current_time - stored_time)

-- Refill by elapsed * rate, capped at the bucket size. When elapsed is 0
-- (e.g. two calls carrying the same timestamp) nothing is added and the
-- bucket simply holds what was left last time.
local available = math.min(bucket_size, stored_tokens + (elapsed * refill_rate))

-- Grant the request only if the bucket holds enough tokens, and deduct
-- the cost only when it is granted.
local granted = 0
local remaining = available
if available >= cost then
  granted = 1
  remaining = available - cost
end

-- Persist the new state only when the computed expiry is positive.
if expiry > 0 then
  -- tokens left after this request
  redis.call("setex", bucket_key, expiry, remaining)
  -- timestamp of this request
  redis.call("setex", stamp_key, expiry, current_time)
end

return { granted, remaining }