原文:https://juejin.im/entry/5bd491c85188255ac2629bef?utm_source=coffeephp.com
在分布式領域,我們難免會遇到并發量突增,對後端服務造成高壓力,嚴重甚至會導緻系統當機。為避免這種問題,我們通常會為接口添加限流、降級、熔斷等能力,進而使接口更為健壯。Java領域常見的開源元件有Netflix的hystrix,阿裡系開源的sentinel等,都是蠻不錯的限流熔斷架構。
今天我們就基于Redis元件的特性,實作一個分布式限流元件,名字就定為shield-ratelimiter。
原理
首先解釋下為何采用Redis作為限流元件的核心。
通俗地講,假設一個使用者(用IP判斷)每秒通路某服務接口的次數不能超過10次,那麼我們可以在Redis中建立一個鍵,并設定鍵的過期時間為60秒。
當一個使用者對此服務接口發起一次通路就把鍵值加1,在機關時間(此處為1s)内當鍵值增加到10的時候,就禁止通路服務接口。PS:在某種場景中添加通路時間間隔還是很有必要的。我們本次不考慮間隔時間,隻關注機關時間内的通路次數。
需求
原理已經講過了,說下需求。
- 基于Redis的incr及過期機制開發
- 調用友善,聲明式
- Spring支援
基于上述需求,我們決定基于注解方式進行核心功能開發,基于Spring-boot-starter作為基礎環境,進而能夠很好的适配Spring環境。
另外,在本次開發中,我們不通過簡單的調用Redis的java類庫API實作對Redis的incr操作。
原因在于,我們要保證整個限流的操作是原子性的,如果用Java代碼去做操作及判斷,會有并發問題。這裡我決定采用Lua腳本進行核心邏輯的定義。
為何使用Lua
在正式開發前,我簡單介紹下對Redis的操作中,為何推薦使用Lua腳本。
- 減少網絡開銷: 不使用 Lua 的代碼需要向 Redis 發送多次請求, 而腳本隻需一次即可, 減少網絡傳輸;
- 原子操作: Redis 将整個腳本作為一個原子執行, 無需擔心并發, 也就無需事務;
- 複用: 腳本會永久儲存 Redis 中, 其他用戶端可繼續使用.
Redis添加了對Lua的支援,能夠很好的滿足原子性、事務性的支援,讓我們免去了很多的異常邏輯處理。對于Lua的文法不是本文的主要内容,感興趣的可以自行查找資料。
正式開發
到這裡,我們正式開始手寫限流元件的程序。
1. 工程定義
項目基于maven建構,主要依賴Spring-boot-starter,我們主要在springboot上進行開發,是以自定義的開發包可以直接依賴下面這個坐标,友善進行包管理。版本号自行選擇穩定版。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>1.4.2.RELEASE</version>
</dependency>
2. Redis整合
由于我們是基于Redis進行的限流操作,是以需要整合Redis的類庫,上面已經講到,我們是基于Springboot進行的開發,是以這裡可以直接整合RedisTemplate。
2.1 坐标引入
這裡我們引入spring-boot-starter-redis的依賴。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
<version>1.4.2.RELEASE</version>
</dependency>
2.2 注入CacheManager及RedisTemplate
建立一個Redis的配置類,命名為RedisCacheConfig,使用javaconfig形式注入CacheManager及RedisTemplate。為了操作友善,我們采用了Jackson進行序列化。代碼如下
@Configuration
@EnableCaching
public class RedisCacheConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisCacheConfig.class);
@Bean
public CacheManager cacheManager(RedisTemplate<?, ?> redisTemplate) {
CacheManager cacheManager = new RedisCacheManager(redisTemplate);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Springboot Redis cacheManager 加載完成");
}
return cacheManager;
}
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
//使用Jackson2JsonRedisSerializer來序列化和反序列化redis的value值(預設使用JDK的序列化方式)
Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
serializer.setObjectMapper(mapper);
template.setValueSerializer(serializer);
//使用StringRedisSerializer來序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
template.afterPropertiesSet();
LOGGER.info("Springboot RedisTemplate 加載完成");
return template;
}
}
注意 要使用 @Configuration 标注此類為一個配置類,當然你可以使用 @Component, 但是不推薦,原因在于 @Component 注解雖然也可以當作配置類,但是并不會為其生成CGLIB代理Class,而使用@Configuration,CGLIB會為其生成代理類,進行性能的提升。
2.3 調用方application.propertie需要增加Redis配置
我們的包開發完畢之後,調用方的application.properties需要進行相關配置如下:
#單機模式redis
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.pool.maxActive=8
spring.redis.pool.maxWait=-1
spring.redis.pool.maxIdle=8
spring.redis.pool.minIdle=0
spring.redis.timeout=10000
spring.redis.password=
如果有密碼的話,配置password即可。
這裡為單機配置,如果需要支援哨兵叢集,則配置如下,Java代碼不需要改動,隻需要變動配置即可。注意 兩種配置不能共存!
#哨兵叢集模式
# database name
spring.redis.database=0
# server password 密碼,如果沒有設定可不配
spring.redis.password=
# pool settings ...池配置
spring.redis.pool.max-idle=8
spring.redis.pool.min-idle=0
spring.redis.pool.max-active=8
spring.redis.pool.max-wait=-1
# name of Redis server 哨兵監聽的Redis server的名稱
spring.redis.sentinel.master=mymaster
# comma-separated list of host:port pairs 哨兵的配置清單
spring.redis.sentinel.nodes=127.0.0.1:26379,127.0.0.1:26479,127.0.0.1:26579
3. 定義注解
為了調用友善,我們定義一個名為RateLimiter 的注解,内容如下
/**
* @author snowalker
* @version 1.0
* @date 2018/10/27 1:25
* @className RateLimiter
* @desc 限流注解
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RateLimiter {
/**
* 限流key
* @return
*/
String key() default "rate:limiter";
/**
* 機關時間限制通過請求數
* @return
*/
long limit() default 10;
/**
* 過期時間,機關秒
* @return
*/
long expire() default 1;
}
該注解明确隻用于方法,主要有三個屬性。
- key–表示限流子產品名,指定該值用于區分不同應用,不同場景,推薦格式為:應用名:子產品名:ip:接口名:方法名
- limit–表示機關時間允許通過的請求數
- expire–incr的值的過期時間,業務中表示限流的機關時間。
4. 解析注解
定義好注解後,需要開發注解使用的切面,這裡我們直接使用aspectj進行切面的開發。先看代碼@Aspect @Component public class RateLimterHandler { private static final Logger LOGGER = LoggerFactory.getLogger(RateLimterHandler.class); @Autowired RedisTemplate redisTemplate; private DefaultRedisScript<Long> getRedisScript; @PostConstruct public void init() { getRedisScript = new DefaultRedisScript<>(); getRedisScript.setResultType(Long.class); getRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("rateLimter.lua"))); LOGGER.info("RateLimterHandler[分布式限流處理器]腳本加載完成"); }
這裡是注入了RedisTemplate,使用其API進行Lua腳本的調用。
init() 方法在應用啟動時會初始化DefaultRedisScript,并加載Lua腳本,友善進行調用。
PS: Lua腳本放置在classpath下,通過ClassPathResource進行加載。
@Pointcut("@annotation(com.snowalker.shield.ratelimiter.core.annotation.RateLimiter)")
public void rateLimiter() {}
這裡我們定義了一個切點,表示隻要注解了 @RateLimiter 的方法,均可以觸發限流操作。
@Around("@annotation(rateLimiter)")
public Object around(ProceedingJoinPoint proceedingJoinPoint, RateLimiter rateLimiter) throws Throwable {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("RateLimterHandler[分布式限流處理器]開始執行限流操作");
}
Signature signature = proceedingJoinPoint.getSignature();
if (!(signature instanceof MethodSignature)) {
throw new IllegalArgumentException("the Annotation @RateLimter must used on method!");
}
/**
* 擷取注解參數
*/
// 限流子產品key
String limitKey = rateLimiter.key();
Preconditions.checkNotNull(limitKey);
// 限流門檻值
long limitTimes = rateLimiter.limit();
// 限流逾時時間
long expireTime = rateLimiter.expire();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("RateLimterHandler[分布式限流處理器]參數值為-limitTimes={},limitTimeout={}", limitTimes, expireTime);
}
/**
* 執行Lua腳本
*/
List<String> keyList = new ArrayList();
// 設定key值為注解中的值
keyList.add(limitKey);
/**
* 調用腳本并執行
*/
Long result = (Long) redisTemplate.execute(getRedisScript, keyList, expireTime, limitTimes);
if (result == 0) {
String msg = "由于超過機關時間=" + expireTime + "-允許的請求次數=" + limitTimes + "[觸發限流]";
LOGGER.debug(msg);
return "false";
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("RateLimterHandler[分布式限流處理器]限流執行結果-result={},請求[正常]響應", result);
}
return proceedingJoinPoint.proceed();
}
}
這段代碼的邏輯為,擷取 @RateLimiter 注解配置的屬性:key、limit、expire,并通過 redisTemplate.execute(RedisScript script, List keys, Object… args) 方法傳遞給Lua腳本進行限流相關操作,邏輯很清晰。
這裡我們定義如果腳本傳回狀态為0則為觸發限流,1表示正常請求。
5. Lua腳本
這裡是我們整個限流操作的核心,通過執行一個Lua腳本進行限流的操作。腳本内容如下
--擷取KEY
local key1 = KEYS[1]
local val = redis.call('incr', key1)
local ttl = redis.call('ttl', key1)
--擷取ARGV内的參數并列印
local expire = ARGV[1]
local times = ARGV[2]
redis.log(redis.LOG_DEBUG,tostring(times))
redis.log(redis.LOG_DEBUG,tostring(expire))
redis.log(redis.LOG_NOTICE, "incr "..key1.." "..val);
if val == 1 then
redis.call('expire', key1, tonumber(expire))
else
if ttl == -1 then
redis.call('expire', key1, tonumber(expire))
end
end
if val > tonumber(times) then
return 0
end
return 1
邏輯很通俗,我簡單介紹下。
- 首先腳本擷取Java代碼中傳遞而來的要限流的子產品的key,不同的子產品key值一定不能相同,否則會覆寫!
- redis.call(‘incr’, key1)對傳入的key做incr操作,如果key首次生成,設定逾時時間ARGV[1];(初始值為1)
- ttl是為防止某些key在未設定逾時時間并長時間已經存在的情況下做的保護的判斷;
- 每次請求都會做+1操作,當限流的值val大于我們注解的門檻值,則傳回0表示已經超過請求限制,觸發限流。否則為正常請求。
當過期後,又是新的一輪循環,整個過程是一個原子性的操作,能夠保證機關時間不會超過我們預設的請求門檻值。
到這裡我們便可以在項目中進行測試。
測試
demo位址
這裡我貼一下核心代碼,我們定義一個接口,并注解 @RateLimiter(key = “ratedemo:1.0.0”, limit = 5, expire = 100) 表示子產品ratedemo:sendPayment:1.0.0
在100s内允許通過5個請求,這裡的參數設定是為了友善看結果。實際中,我們通常會設定1s内允許通過的次數。
@Controller
public class TestController {
private static final Logger LOGGER = LoggerFactory.getLogger(TestController.class);
@ResponseBody
@RequestMapping("ratelimiter")
@RateLimiter(key = "ratedemo:1.0.0", limit = 5, expire = 100)
public String sendPayment(HttpServletRequest request) throws Exception {
return "正常請求";
}
}
我們通過RestClient請求接口,日志傳回如下:
2018-10-28 00:00:00.602 DEBUG 17364 --- [nio-8888-exec-1] c.s.s.r.core.handler.RateLimterHandler :
RateLimterHandler[分布式限流處理器]開始執行限流操作
2018-10-28 00:00:00.688 DEBUG 17364 --- [nio-8888-exec-1] c.s.s.r.core.handler.RateLimterHandler :
RateLimterHandler[分布式限流處理器]限流執行結果-result=1,請求[正常]響應
2018-10-28 00:00:00.860 DEBUG 17364 --- [nio-8888-exec-3] c.s.s.r.core.handler.RateLimterHandler :
RateLimterHandler[分布式限流處理器]開始執行限流操作
2018-10-28 00:00:01.183 DEBUG 17364 --- [nio-8888-exec-4] c.s.s.r.core.handler.RateLimterHandler :
RateLimterHandler[分布式限流處理器]開始執行限流操作
2018-10-28 00:00:01.520 DEBUG 17364 --- [nio-8888-exec-3] c.s.s.r.core.handler.RateLimterHandler :
RateLimterHandler[分布式限流處理器]限流執行結果-result=1,請求[正常]響應
2018-10-28 00:00:01.521 DEBUG 17364 --- [nio-8888-exec-4] c.s.s.r.core.handler.RateLimterHandler :
RateLimterHandler[分布式限流處理器]限流執行結果-result=1,請求[正常]響應
2018-10-28 00:00:01.557 DEBUG 17364 --- [nio-8888-exec-5] c.s.s.r.core.handler.RateLimterHandler :
RateLimterHandler[分布式限流處理器]開始執行限流操作
2018-10-28 00:00:01.558 DEBUG 17364 --- [nio-8888-exec-5] c.s.s.r.core.handler.RateLimterHandler :
RateLimterHandler[分布式限流處理器]限流執行結果-result=1,請求[正常]響應
2018-10-28 00:00:01.774 DEBUG 17364 --- [nio-8888-exec-7] c.s.s.r.core.handler.RateLimterHandler :
RateLimterHandler[分布式限流處理器]開始執行限流操作
2018-10-28 00:00:02.111 DEBUG 17364 --- [nio-8888-exec-8] c.s.s.r.core.handler.RateLimterHandler :
RateLimterHandler[分布式限流處理器]開始
2018-10-28 00:00:02.169 DEBUG 17364 --- [nio-8888-exec-7] c.s.s.r.core.handler.RateLimterHandler :
RateLimterHandler[分布式限流處理器]限流執行結果-result=1,請求[正常]響應
2018-10-28 00:00:02.169 DEBUG 17364 --- [nio-8888-exec-8] c.s.s.r.core.handler.RateLimterHandler :
由于超過機關時間=100-允許的請求次數=5[觸發限流]
2018-10-28 00:00:02.276 DEBUG 17364 --- [io-8888-exec-10] c.s.s.r.core.handler.RateLimterHandler :
RateLimterHandler[分布式限流處理器]開始執行限流操作
2018-10-28 00:00:02.276 DEBUG 17364 --- [io-8888-exec-10] c.s.s.r.core.handler.RateLimterHandler :
RateLimterHandler[分布式限流處理器]參數值為-limitTimes=5,limitTimeout=100
2018-10-28 00:00:02.278 DEBUG 17364 --- [io-8888-exec-10] c.s.s.r.core.handler.RateLimterHandler :
由于超過機關時間=100-允許的請求次數=5[觸發限流]
2018-10-28 00:00:02.445 DEBUG 17364 --- [nio-8888-exec-2] c.s.s.r.core.handler.RateLimterHandler :
RateLimterHandler[分布式限流處理器]開始執行限流操作
2018-10-28 00:00:02.445 DEBUG 17364 --- [nio-8888-exec-2] c.s.s.r.core.handler.RateLimterHandler :
RateLimterHandler[分布式限流處理器]參數值為-limitTimes=5,limitTimeout=100
2018-10-28 00:00:02.446 DEBUG 17364 --- [nio-8888-exec-2] c.s.s.r.core.handler.RateLimterHandler :
由于超過機關時間=100-允許的請求次數=5[觸發限流]
2018-10-28 00:00:02.628 DEBUG 17364 --- [nio-8888-exec-4] c.s.s.r.core.handler.RateLimterHandler :
RateLimterHandler[分布式限流處理器]開始執行限流操作
2018-10-28 00:00:02.628 DEBUG 17364 --- [nio-8888-exec-4] c.s.s.r.core.handler.RateLimterHandler :
RateLimterHandler[分布式限流處理器]參數值為-limitTimes=5,limitTimeout=100
2018-10-28 00:00:02.629 DEBUG 17364 --- [nio-8888-exec-4] c.s.s.r.core.handler.RateLimterHandler :
由于超過機關時間=100-允許的請求次數=5[觸發限流]
根據日志能夠看到,正常請求5次後,傳回限流觸發,說明我們的邏輯生效,對前端而言也是可以看到false标記,表明我們的Lua腳本限流邏輯是正确的,這裡具體傳回什麼标記需要調用方進行明确的定義。
總結
我們通過Redis的incr及expire功能特性,開發定義了一套基于注解的分布式限流操作,核心邏輯基于Lua保證了原子性。達到了很好的限流的目的,生産上,可以基于該特點進行定制自己的限流元件,當然你可以參考本文的代碼,相信你寫的一定比我的demo更好!
代碼詳細位址:https://github.com/shmll/shield-ratelimter
看完打開支付寶掃一掃領個紅包吧!
