天天看點

Java服務限流算法

一,概述

限流其實就是對服務的請求做一下QPS的控制,對于有些免登入的接口需要做一下通路的限制,不能無限制的去請求接口,不然的話會給伺服器造成很大的壓力,而且我們也希望一些接口做一下控制,控制請求量,這樣我們就可以做一個plugin對服務做限流操作,超出限流就傳回請求失敗,保證系統的穩定運作。主要概念就是門檻值以及拒絕政策,實際中需要用到限流的的比如,驗證碼,白名單,當然也有容器的限流,比如nginx就是比較常用的,可以做一下簡單的處理。

二,限流算法類型

幾種算法的使用,一些基礎代碼如下

限流代碼基礎類

@RequestLimiter

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Order(Ordered.HIGHEST_PRECEDENCE)
public @interface RequestLimiter {
    /**
     * 限流類型 ,具體見枚舉類 RequestLimitType
     */
    RequestLimitType type() default RequestLimitType.TOKEN;

    /**
     * 限流通路數
     */
    int limitCount() default 100;

    /**
     * 限流時間段
     */
    long time() default 60;

    /**
     * 限流時間段 時間機關
     */
    TimeUnit unit() default TimeUnit.SECONDS;

    /**
     * 漏出或者生成令牌時間間隔,機關 毫秒  (當type為TOKEN、LEAKY_BUCKET時生效)
     */
    long period() default 1000;

    /**
     * 每次生成令牌數或者漏出水滴數  (當type為TOKEN、LEAKY_BUCKET時生效)
     */
    int limitPeriodCount() default 10;

}           

LimitKeyConstant

public class LimitKeyConstant {
    /**
     * 令牌桶鍵名
     */
    public static final String QPS_TOKEN = "request:limit:qps:tokenBucket:";

    /**
     * 漏桶鍵名
     */
    public static final String QPS_LEAKY_BUCKET = "request:limit:qps:leakyBucket:";

    /**
     * 固定視窗鍵名
     */
    public static final String QPS_FIXED_WINDOW = "request:limit:qps:fixedWindow:";

    /**
     * 滑動視窗鍵名
     */
    public static final String QPS_SLIDE_WINDOW = "request:limit:qps:slideWindow:";
}           

RequestLimitType

public enum RequestLimitType {
    /**
     * 令牌算法
     */
    TOKEN(1, "令牌算法"),
    /**
     * 漏桶算法
     */
    LEAKY_BUCKET(2, "漏桶算法"),

    /**
     * 固定視窗
     */
    FIXED_WINDOW(3, "固定視窗"),
    /**
     * 滑動視窗
     */
    SLIDE_WINDOW(4, "滑動視窗");

    private Integer type;
    private String desc;

    RequestLimitType(Integer type, String desc) {
        this.type = type;
        this.desc = desc;
    }

    public Integer getType() {
        return type;
    }

    public String getDesc() {
        return desc;
    }
}           

RequestLimitAspect

@Slf4j
@Aspect
@Component
public class RequestLimitAspect {
    @Autowired
    private RequestLimitFactory factory;


    /**
     * 切入點
     */
    @Pointcut(value = "@annotation(com.common.limit.annotation.RequestLimiter)")
    public void requestLimit() {
        // 切入點方法
    }

    /**
     * 前置切點
     *
     * @param joinPoint 切入點
     */
    @Before("requestLimit()")
    public void doBefore(JoinPoint joinPoint) {
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = (HttpServletRequest) requestAttributes.resolveReference(RequestAttributes.REFERENCE_REQUEST);
        Signature signature = joinPoint.getSignature();
        MethodSignature methodSignature = (MethodSignature) signature;
        Method targetMethod = methodSignature.getMethod();
        RequestLimiter limiter = targetMethod.getAnnotation(RequestLimiter.class);
        RequestLimitService service = factory.build(limiter.type());
        if (service != null) {
            RequestLimitParam param = new RequestLimitParam();
            param.setLimiter(limiter);
            param.setKey(signature.getName());
            if (service.checkRequestLimit(param)) {
                throw new LimitException("請求過于頻繁,請稍後再重試!");
            }
        }
    }
}           

RequestLimitFactory

@Slf4j
@Component
public class RequestLimitFactory implements ApplicationContextAware {
    private static final Map<RequestLimitType, RequestLimitService> MAP = new ConcurrentHashMap<>();

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        try {
            applicationContext.getBeansOfType(RequestLimitService.class).values().forEach(service -> MAP.put(service.getType(), service));
        } catch (Exception e) {
            log.error("初始化限流政策異常", e);
        }
    }

    /**
     * 建構service
     *
     * @param type 限流類型
     * @return 限流操作類
     **/
    public RequestLimitService build(RequestLimitType type) {
        return MAP.get(type);
    }
}           

RequestLimitService

public interface RequestLimitService {
    /**
     * 檢測是否限流
     *
     * @param param 限流參數
     * @return 是否被限流 true: 被限流  false: 未限流,放行
     */
    boolean checkRequestLimit(RequestLimitParam param);

    /**
     * 擷取目前限流類型
     *
     * @return 限流類型
     */
    RequestLimitType getType();

    /**
     * 擷取帶注解方法清單
     *
     * @param resourcePatternResolver 資源查詢
     * @param limitType               注解類型
     * @param scanPackage             掃描包路徑
     * @return 帶注解方法清單
     */
    default List<RequestLimitParam> getTokenLimitList(ResourcePatternResolver resourcePatternResolver, RequestLimitType limitType,
                                                      String scanPackage) {
        try {
            List<RequestLimitParam> list = new ArrayList<>();
            Resource[] resources = resourcePatternResolver.getResources(ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX + scanPackage +
                    "/**/*.class");
            MetadataReaderFactory metaReader = new CachingMetadataReaderFactory();
            for (Resource resource : resources) {
                MetadataReader reader = metaReader.getMetadataReader(resource);
                AnnotationMetadata annotationMetadata = reader.getAnnotationMetadata();

                Set<MethodMetadata> annotatedMethods = annotationMetadata.getAnnotatedMethods(RequestLimiter.class.getCanonicalName());
                annotatedMethods.forEach(methodMetadata -> {
                    RequestLimiter limiter = methodMetadata.getAnnotations().get(RequestLimiter.class).synthesize();
                    if (!limitType.equals(limiter.type())) {
                        return;
                    }
                    RequestLimitParam param = new RequestLimitParam();
                    param.setKey(methodMetadata.getMethodName());
                    param.setLimiter(limiter);
                    list.add(param);
                });
            }
            return list;
        } catch (IOException e) {
            return Collections.emptyList();
        }
    }
}           

固定時間視窗算法

圖解

Java服務限流算法

介紹

其實就是原子計數法,就是在固定時間内,允許請求量是多少,每次請求就在計數器上加1,設定計數器的過期時間,當計數器的門檻值達到限流配置的數時候,就執行拒絕政策,超過了時間,計數器就會重新歸0。

比如上圖中,會限制在每秒限制請求數為2,就是在每秒的時間會限制請求為2,但是會出現極端的情況,比如在前一個時間段中的前500ms和後500ms,請求數都是2,這樣就會看到在這一秒内是有4個請求的,這就是會出現請求的問題,當然這也是最簡單的限流算法。

代碼

@Slf4j
@Service
public class FixedWindowRateLimitServiceImpl implements RequestLimitService {

    @Autowired
    private RedisConnectionFactory factory;

    @Override
    public boolean checkRequestLimit(RequestLimitParam param) {
        String key = LimitKeyConstant.QPS_FIXED_WINDOW + param.getKey();
        RequestLimiter limiter = param.getLimiter();
        RedisAtomicInteger atomicCount = new RedisAtomicInteger(key, factory);
        int count = atomicCount.getAndIncrement();
        if (count == 0) {
            atomicCount.expire(limiter.time(), limiter.unit());
        }
        log.info("FixedWindowRateLimitServiceImpl time:{} unit:{} allow visit {} ", limiter.time(), limiter.unit(), limiter.limitCount());
        // 檢測是否到達限流值
        if (count >= limiter.limitCount()) {
            log.info("FixedWindowRateLimitServiceImpl limit controller key:{},time:{},name:{} to visit :{}", key, limiter.time(),
                    limiter.unit().name(), limiter.limitCount());
            return true;
        } else {
            return false;
        }
    }

    @Override
    public RequestLimitType getType() {
        return RequestLimitType.FIXED_WINDOW;
    }
}           

滑動時間視窗算法

圖解

Java服務限流算法

介紹

滑動時間視窗算法,其實就是對固定視窗的改進,知道了固定時間視窗會出現極端的情況,那滑動就在下一個臨界的時候,進行處理時間,其實就是在某一段時間進行處理時間。

比如上圖中每 500ms 滑動一次視窗,可以發現視窗滑動的間隔越短,時間視窗的臨界突變問題發生的機率也就越小,不過隻要有時間視窗的存在,還是有可能發生時間視窗的臨界突變問題。

這個是記錄下所有的請求時間點,新請求先判斷最近指定時間範圍内的請求數量是否超過指定門檻值,來确定是否達到限流,雖然沒有時間視窗突變的問題,限流比較準确,但是要記錄下每次請求的時間點,是以占用的記憶體較多。

代碼

@Slf4j
@Service
public class SlideWindowRateLimitServiceImpl implements RequestLimitService {
    @Autowired
    private RedisService redisService;

    @Override
    public boolean checkRequestLimit(RequestLimitParam param) {
        String key = LimitKeyConstant.QPS_SLIDE_WINDOW + param.getKey();
        RequestLimiter limiter = param.getLimiter();
        long current = System.currentTimeMillis();
        long duringTime = limiter.unit().toMillis(limiter.time());
        Long count = redisService.setCount(key, current - duringTime, current);
        // 清除有效期外的資料
        redisService.setRemoveRangeByScore(key, 0, current - duringTime - 1f);

        log.info("SlideWindowRateLimitServiceImpl time:{} unit:{} allow visit {}", limiter.time(), limiter.unit(), limiter.limitCount());
        // 檢測是否到達限流值
        if (count != null && count >= limiter.limitCount()) {
            log.info("SlideWindowRateLimitServiceImpl limit controller key:{},time:{},name:{} to visit :{}", key, limiter.time(),
                    limiter.unit().name(), limiter.limitCount());
            return true;
        } else {
            redisService.setAdd(key, UUID.randomUUID().toString(), current);
            return false;
        }
    }

    @Override
    public RequestLimitType getType() {
        return RequestLimitType.SLIDE_WINDOW;
    }
}           

漏桶算法

圖解

Java服務限流算法

介紹

此算法就是定義一個桶的容量,然後每次的請求過來都放在桶裡面,一直等到桶滿了以後就會執行拒絕政策,然後在桶不滿的情況下,會按照固定的速率去執行請求,其實就是按照固定流速去執行請求,保證機關時間内的執行請求量是固定的。

漏桶就是按照某一個請求的穩定的速度處理發來的請求數量,可以很好地保證系統的穩定運作,隻能平穩處理請求,這也是他的一個缺點,不能處理面對突然來的高的請求量,會導緻請求一直處于哎隊列等待中,不能面對高并發下的請求處理,比較保守的處理邏輯

代碼

@Slf4j
@Service
public class LeakyBucketRateLimitServiceImpl implements RequestLimitService {
    @Autowired
    private ResourcePatternResolver resourcePatternResolver;

    @Autowired
    private RedisService redisService;

    @Resource(name = Constants.THREAD_POOL_TASK_BEAN_NAME)
    private ThreadPoolTaskScheduler executor;

    @Value("${limit.scan.package}")
    private String scanPackage;

    @Override
    public boolean checkRequestLimit(RequestLimitParam requestLimitParam) {
        String key = LimitKeyConstant.QPS_LEAKY_BUCKET + requestLimitParam.getKey();
        Long size = redisService.listSize(key);
        if (size != null && size >= requestLimitParam.getLimiter().limitCount()) {
            log.info("LeakyBucketRateLimitServiceImpl limit key:{}", requestLimitParam.getKey());
            return true;
        } else {
            log.info("LeakyBucketRateLimitServiceImpl not full,limit key:{} ,current size:{},total size:{}", requestLimitParam.getKey(),
                    size, requestLimitParam.getLimiter().limitCount());
            redisService.listLeftPush(key, UUID.randomUUID().toString());
            return false;
        }
    }

    /**
     * 定數流出令牌
     */
    @PostConstruct
    public void init() {
        List<RequestLimitParam> list = this.getTokenLimitList(resourcePatternResolver, RequestLimitType.LEAKY_BUCKET, scanPackage);
        if (list.isEmpty()) {
            log.info("LeakyBucketRateLimitServiceImpl annotation is empty,end current task pool");
            return;
        }
        list.forEach(requestLimitDTO -> {
            executor.scheduleAtFixedRate(() -> {
                String key = LimitKeyConstant.QPS_LEAKY_BUCKET + requestLimitDTO.getKey();
                //截取List在start和end之間的元素處key清單
                redisService.listTrim(key, requestLimitDTO.getLimiter().limitPeriodCount(), -1);
                log.info("LeakyBucketRateLimitServiceImpl limit key:{},limitPeriodCount:{}", key,
                        requestLimitDTO.getLimiter().limitPeriodCount());
            }, requestLimitDTO.getLimiter().period());
        });
    }

    @Override
    public RequestLimitType getType() {
        return RequestLimitType.LEAKY_BUCKET;
    }
}           

令牌算法

圖解

Java服務限流算法

介紹

此算法也是對于漏桶的算法的改進,這個邏輯是桶裡面有一個門檻值,按照一定的速率進行在桶裡面存放令牌,直到令牌滿了,就不在新增令牌,然後請求每次來就去桶中擷取令牌,擷取到了,就進行處理,沒有令牌則執行拒絕政策

這個算法其實原理類似于生産者,消費者的模型,生産者按照一定的速度生成令牌,消費者可以消費資料,相對來說,這個是比較好用的

代碼

@Slf4j
@Service
public class TokenBucketRateLimitServiceImpl implements RequestLimitService {
    @Autowired
    private ResourcePatternResolver resourcePatternResolver;

    @Autowired
    private RedisService redisService;

    @Resource(name = Constants.THREAD_POOL_TASK_BEAN_NAME)
    private ThreadPoolTaskScheduler executor;

    @Value("${limit.scan.package}")
    private String scanPackage;


    @Override
    public boolean checkRequestLimit(RequestLimitParam param) {
        Object pop = redisService.listRightPop(LimitKeyConstant.QPS_TOKEN + param.getKey());
        RequestLimiter limiter = param.getLimiter();
        log.info("TokenBucketRateLimitServiceImpl limit period {} ms create {} total token,max token num is:{}", limiter.period(),
                limiter.limitPeriodCount(), limiter.limitCount());
        if (pop == null) {
            log.info("TokenBucketRateLimitServiceImpl limit is empty key:{}", param.getKey());
            return true;
        } else {
            return false;
        }
    }

    @PostConstruct
    public void init() {
        // 掃描出所有使用了自定義注解并且限流類型為令牌算法的方法資訊
        List<RequestLimitParam> list = this.getTokenLimitList(resourcePatternResolver, RequestLimitType.TOKEN, scanPackage);
        if (list.isEmpty()) {
            log.info("TokenBucketRateLimitServiceImpl annotation is empty,end current task pool");
            return;
        }
        // 每個接口方法更具注解配置資訊送出定時任務,生成令牌進令牌桶
        list.forEach(limit -> executor.scheduleAtFixedRate(() -> {
            String key = LimitKeyConstant.QPS_TOKEN + limit.getKey();
            Long tokenSize = redisService.listSize(key);
            int size = tokenSize == null ? 0 : tokenSize.intValue();
            if (size >= limit.getLimiter().limitCount()) {
                return;
            }
            // 判斷添加令牌數量
            int addSize = Math.min(limit.getLimiter().limitPeriodCount(), limit.getLimiter().limitCount() - size);
            List<String> addList = new ArrayList<>(addSize);
            for (int index = 0; index < addSize; index++) {
                addList.add(UUID.randomUUID().toString());
            }
            redisService.listLeftPushAll(key, addList);
        }, limit.getLimiter().period()));
    }

    @Override
    public RequestLimitType getType() {
        return RequestLimitType.TOKEN;
    }
}           

三,總結

其實這幾種算法,不能說哪一個是最好的,隻能說是要的業務邏輯是什麼樣的,選擇合适的限流算法來滿足自己的業務實作,沒有最優,隻有最合适。

繼續閱讀