天天看點

Spring Cloud Gateway源碼解析-12-令牌桶限流(RequestRateLimiterGatewayFilterFactory)RequestRateLimiterGatewayFilterFactoryRedisRateLimiterrequest_rate_limiter.lua

SCG中預設使用了Redis來實作令牌桶限流,通過Java代碼調用lua腳本實作。

RequestRateLimiterGatewayFilterFactory

RequestRateLimiterGatewayFilterFactory

是SCG的限流GatewayFilter的工廠

public GatewayFilter apply(Config config) {
		KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver);
		RateLimiter<Object> limiter = getOrDefault(config.rateLimiter,
				defaultRateLimiter);
		boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
		HttpStatusHolder emptyKeyStatus = HttpStatusHolder
				.parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));

		return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY)
				.flatMap(key -> {
					if (EMPTY_KEY.equals(key)) {
						if (denyEmpty) {
							setResponseStatus(exchange, emptyKeyStatus);
							return exchange.getResponse().setComplete();
						}
						return chain.filter(exchange);
					}
					String routeId = config.getRouteId();
					if (routeId == null) {
						Route route = exchange
								.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
						routeId = route.getId();
					}
					//重點,調用具體的限流實作isAllowed方法判斷目前請求是否允許被執行,預設為RedisRateLimiter
					return limiter.isAllowed(routeId, key).flatMap(response -> {

						for (Map.Entry<String, String> header : response.getHeaders()
								.entrySet()) {
							exchange.getResponse().getHeaders().add(header.getKey(),
									header.getValue());
						}

						if (response.isAllowed()) {
							return chain.filter(exchange);
						}

						setResponseStatus(exchange, config.getStatusCode());
						return exchange.getResponse().setComplete();
					});
				});
	}
           

RedisRateLimiter

public Mono<Response> isAllowed(String routeId, String id) {
		if (!this.initialized.get()) {
			throw new IllegalStateException("RedisRateLimiter is not initialized");
		}

		Config routeConfig = loadConfiguration(routeId);

		/**
		 * 從字面意思了解為補充率,也就是令牌的補充率,但是SCG的注釋為每秒允許使用者的請求數
		 * 可以想象,一個桶往外流水,一個人用勺子取水,我們想控制這個人取水的速度,是不是可以通過控制往桶裡面加水的速度進而控制取水的速度
		 * 也就是通過流入可以控制流出,這裡就是這個意思
		 */
		// How many requests per second do you want a user to be allowed to do?
		int replenishRate = routeConfig.getReplenishRate();
		//令牌桶的容量
		// How much bursting do you want to allow?
		int burstCapacity = routeConfig.getBurstCapacity();
		//每次請求消耗的令牌數量
		// How many tokens are requested per request?
		int requestedTokens = routeConfig.getRequestedTokens();

		try {
			List<String> keys = getKeys(id);

			// The arguments to the LUA script. time() returns unixtime in seconds.
			List<String> scriptArgs = Arrays.asList(replenishRate + "",
					burstCapacity + "", Instant.now().getEpochSecond() + "",
					requestedTokens + "");
			// allowed, tokens_left = redis.eval(SCRIPT, keys, args)
			//調用lua腳本
			Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys,
					scriptArgs);
			// .log("redisratelimiter", Level.FINER);
			return flux.onErrorResume(throwable -> {
				if (log.isDebugEnabled()) {
					log.debug("Error calling rate limiter lua", throwable);
				}
				return Flux.just(Arrays.asList(1L, -1L));
			}).reduce(new ArrayList<Long>(), (longs, l) -> {
				//将lua腳本傳回的兩個值放入list中
				longs.addAll(l);
				return longs;
			}).map(results -> {
				//判斷lua腳本傳回的是否是1,如果是1,表示允許請求
				boolean allowed = results.get(0) == 1L;
				//擷取到lua腳本傳回的剩餘令牌數
				Long tokensLeft = results.get(1);
				//拼裝傳回Response
				Response response = new Response(allowed,
						getHeaders(routeConfig, tokensLeft));

				if (log.isDebugEnabled()) {
					log.debug("response: " + response);
				}
				return response;
			});
		}
		catch (Exception e) {
			//當Redis發生異常時的操作
			/*
			 * We don't want a hard dependency on Redis to allow traffic. Make sure to set
			 * an alert so you know if this is happening too much. Stripe's observed
			 * failure rate is 0.01%.
			 */
			log.error("Error determining if user allowed from redis", e);
		}
		//最後的兜底,當上邊異常時,都允許請求通過
		return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
	}
           

request_rate_limiter.lua

SCG的Redis lua腳本

local tokens_key = KEYS[1] --request_rate_limiter.{'id'}.tokens
local timestamp_key = KEYS[2] --request_rate_limiter.{'id'}.timestamp
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)

local rate = tonumber(ARGV[1]) -- 允許使用者每秒執行的請求數(填充令牌的速度) 20
local capacity = tonumber(ARGV[2]) -- 令牌桶的容量 50
--第一次請求1618841535
--第二次請求1618841537
local now = tonumber(ARGV[3]) -- 目前時間戳
local requested = tonumber(ARGV[4]) -- 每個請求消耗的令牌數 2

local fill_time = capacity/rate --容量/填充令牌的速度 ---  填充的次數
local ttl = math.floor(fill_time*2) -- key的過期時間

--redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])
--redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])
--redis.log(redis.LOG_WARNING, "now " .. ARGV[3])
--redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])
--redis.log(redis.LOG_WARNING, "filltime " .. fill_time)
--redis.log(redis.LOG_WARNING, "ttl " .. ttl)

--第一次請求 last_tokens = 50
--第二次請求 last_tokens = 48
local last_tokens = tonumber(redis.call("get", tokens_key)) --擷取最後一次請求後剩餘的令牌數量
if last_tokens == nil then  --如果剩餘的令牌為空,則初始化為容量
  last_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)

--第一次請求 last_refreshed = 0
--第二次請求 last_refreshed = 1618841535
local last_refreshed = tonumber(redis.call("get", timestamp_key)) -- 擷取最後一次請求的時間
if last_refreshed == nil then -- 如果最後一次請求的時間為空,則設定為0
  last_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)

--第一次請求 delta=(0 , 1618841535 - 0 ) = 1618841535
--第二次請求 delta = (0 , 1618841537 - 1618841535) = 2
local delta = math.max(0, now-last_refreshed) --目前時間減去最後重新整理時間 -----  兩次請求的時間差
--第一次請求 filled_tokens = math.min(50 , 50 + (1618841535 * 20)) = 50 --- 總容量
--第二次請求 filled_tokens = math.min(50 , 48 + (2 * 20)) = 50,這裡我們可以發現,當兩次請求的間隔大于1s時,都會将filled_tokens重新設定為桶的最大容量
    --當兩次請求的間隔小于1s時,math.min函數才會取","後邊的值,當然,小于1s時,delta就是0了,是以filled_tokens就是上次剩餘的令牌數量
local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) -- 計算
-- 第一次請求 50 >= 2 允許
local allowed = filled_tokens >= requested --當filled_tokens >= 一次請求消耗的令牌數時,則允許

local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
  --如果允許則扣除令牌
  new_tokens = filled_tokens - requested
  allowed_num = 1
end

--redis.log(redis.LOG_WARNING, "delta " .. delta)
--redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)
--redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)
--redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)

if ttl > 0 then
  --設定剩餘令牌數
  redis.call("setex", tokens_key, ttl, new_tokens)
  --設定本次請求時間戳
  redis.call("setex", timestamp_key, ttl, now)
end

-- return { allowed_num, new_tokens, capacity, filled_tokens, requested, new_tokens }
return { allowed_num, new_tokens }

           

繼續閱讀