天天看點

Spring Cloud限流詳解(内含源碼)

原文: http://www.itmuch.com/spring-cloud-sum/spring-cloud-ratelimit/

轉載聲明:本部落格由周立創作,采用CC BY 3.0 CN許可協定。可自由轉載、引用,但需署名作者且注明文章出處。如轉載至微信公衆号,請在文末添加作者公衆号二維碼。

在高并發的應用中,限流往往是一個繞不開的話題。本文詳細探讨在Spring Cloud中如何實作限流。

Zuul

上實作限流是個不錯的選擇,隻需要編寫一個過濾器就可以了,關鍵在于如何實作限流的算法。常見的限流算法有漏桶算法以及令牌桶算法。這個可參考 https://www.cnblogs.com/LBSer/p/4083131.html ,寫得通俗易懂,你值得擁有,我就不拽文了。

Google Guava

為我們提供了限流工具類

RateLimiter

,于是乎,我們可以撸代碼了。

代碼示例

@Component
public class RateLimitZuulFilter extends ZuulFilter {

    private final RateLimiter rateLimiter = RateLimiter.create(1000.0);

    @Override
    public String filterType() {
        return FilterConstants.PRE_TYPE;
    }

    @Override
    public int filterOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }

    @Override
    public boolean shouldFilter() {
        // 這裡可以考慮弄個限流開啟的開關,開啟限流傳回true,關閉限流傳回false,你懂的。
        return true;
    }

    @Override
    public Object run() {
        try {
            RequestContext currentContext = RequestContext.getCurrentContext();
            HttpServletResponse response = currentContext.getResponse();
            if (!rateLimiter.tryAcquire()) {
                HttpStatus httpStatus = HttpStatus.TOO_MANY_REQUESTS;

                response.setContentType(MediaType.TEXT_PLAIN_VALUE);
                response.setStatus(httpStatus.value());
                response.getWriter().append(httpStatus.getReasonPhrase());

                currentContext.setSendZuulResponse(false);

                throw new ZuulException(
                        httpStatus.getReasonPhrase(),
                        httpStatus.value(),
                        httpStatus.getReasonPhrase()
                );
            }
        } catch (Exception e) {
            ReflectionUtils.rethrowRuntimeException(e);
        }
        return null;
    }
}
           

如上,我們編寫了一個

pre

類型的過濾器。對Zuul過濾器有疑問的可參考我的部落格:

在過濾器中,我們使用

Guava RateLimiter

實作限流,如果已經達到最大流量,就抛異常。

分布式場景下的限流

以上單節點Zuul下的限流,但在生産中,我們往往會有多個Zuul執行個體。對于這種場景如何限流呢?我們可以借助Redis實作限流。

使用redis實作,存儲兩個key,一個用于計時,一個用于計數。請求每調用一次,計數器增加1,若在計時器時間内計數器未超過門檻值,則可以處理任務

if(!cacheDao.hasKey(TIME_KEY)) {
    cacheDao.putToValue(TIME_KEY, 0, 1, TimeUnit.SECONDS);
}       
if(cacheDao.hasKey(TIME_KEY) && cacheDao.incrBy(COUNTER_KEY, 1) > 400) {
    // 抛個異常什麼的
}
           

實作微服務級别的限流

一些場景下,我們可能還需要實作微服務粒度的限流。此時可以有兩種方案:

方式一:在微服務本身實作限流。

和在Zuul上實作限流類似,隻需編寫一個過濾器或者攔截器即可,比較簡單,不作贅述。個人不太喜歡這種方式,因為每個微服務都得編碼,感覺成本很高啊。

加班那麼多,作為程式猿的我們,應該學會偷懶,這樣才可能有時間孝順父母、抱老婆、逗兒子、遛狗養鳥、聊天打屁、追求人生信仰。好了不扯淡了,看方法二吧。

方法二:在Zuul上實作微服務粒度的限流。

在講解之前,我們不妨模拟兩個路由規則,兩種路由規則分别代表Zuul的兩種路由方式。

zuul:
  routes:
    microservice-provider-user: /user/**
    user2:
      url: http://localhost:8000/
      path: /user2/**
           

如配置所示,在這裡,我們定義了兩個路由規則,

microservice-provider-user

以及

user2

,其中

microservice-provider-user

這個路由規則使用到Ribbon + Hystrix,走的是

RibbonRoutingFilter

;而

user2

這個路由用不上Ribbon也用不上Hystrix,走的是

SipleRoutingFilter

。如果你搞不清楚這點,請參閱我的部落格:

搞清楚這點之後,我們就可以撸代碼了:

@Component
public class RateLimitZuulFilter extends ZuulFilter {

    private Map<String, RateLimiter> map = Maps.newConcurrentMap();

    @Override
    public String filterType() {
        return FilterConstants.PRE_TYPE;
    }

    @Override
    public int filterOrder() {
        // 這邊的order一定要大于org.springframework.cloud.netflix.zuul.filters.pre.PreDecorationFilter的order
        // 也就是要大于5
        // 否則,RequestContext.getCurrentContext()裡拿不到serviceId等資料。
        return Ordered.LOWEST_PRECEDENCE;
    }

    @Override
    public boolean shouldFilter() {
        // 這裡可以考慮弄個限流開啟的開關,開啟限流傳回true,關閉限流傳回false,你懂的。
        return true;
    }

    @Override
    public Object run() {
        try {
            RequestContext context = RequestContext.getCurrentContext();
            HttpServletResponse response = context.getResponse();

            String key = null;
            // 對于service格式的路由,走RibbonRoutingFilter
            String serviceId = (String) context.get(SERVICE_ID_KEY);
            if (serviceId != null) {
                key = serviceId;
                map.putIfAbsent(serviceId, RateLimiter.create(1000.0));
            }
            // 如果壓根不走RibbonRoutingFilter,則認為是URL格式的路由
            else {
                // 對于URL格式的路由,走SimpleHostRoutingFilter
                URL routeHost = context.getRouteHost();
                if (routeHost != null) {
                    String url = routeHost.toString();
                    key = url;
                    map.putIfAbsent(url, RateLimiter.create(2000.0));
                }
            }
            RateLimiter rateLimiter = map.get(key);
            if (!rateLimiter.tryAcquire()) {
                HttpStatus httpStatus = HttpStatus.TOO_MANY_REQUESTS;

                response.setContentType(MediaType.TEXT_PLAIN_VALUE);
                response.setStatus(httpStatus.value());
                response.getWriter().append(httpStatus.getReasonPhrase());

                context.setSendZuulResponse(false);

                throw new ZuulException(
                        httpStatus.getReasonPhrase(),
                        httpStatus.value(),
                        httpStatus.getReasonPhrase()
                );
            }
        } catch (Exception e) {
            ReflectionUtils.rethrowRuntimeException(e);
        }
        return null;
    }
}
           

簡單講解一下這段代碼:

對于

microservice-provider-user

這個路由,我們可以用

context.get(SERVICE_ID_KEY);

擷取到serviceId,擷取出來就是

microservice-provider-user

而對于

user2

這個路由,我們使用

context.get(SERVICE_ID_KEY);

獲得是null,但是呢,可以用

context.getRouteHost()

獲得路由到的位址,擷取出來就是

http://localhost:8000/

。接下來的事情,你們懂的。

改進與提升

實際項目中,除以上實作的限流方式,還可能會:

一、在上文的基礎上,增加配置項,控制每個路由的限流名額,并實作動态重新整理,進而實作更加靈活的管理

二、基于CPU、記憶體、資料庫等壓力限流(感謝平安常浩智)提出。。

下面,筆者借助Spring Boot Actuator提供的

Metrics

能力進行實作基于記憶體壓力的限流——當可用記憶體低于某個門檻值就開啟限流,否則不開啟限流。

@Component
public class RateLimitZuulFilter extends ZuulFilter {
    @Autowired
    private SystemPublicMetrics systemPublicMetrics;
    @Override
    public boolean shouldFilter() {
        // 這裡可以考慮弄個限流開啟的開關,開啟限流傳回true,關閉限流傳回false,你懂的。
        Collection<Metric<?>> metrics = systemPublicMetrics.metrics();
        Optional<Metric<?>> freeMemoryMetric = metrics.stream()
                .filter(t -> "mem.free".equals(t.getName()))
                .findFirst();
        // 如果不存在這個名額,穩妥起見,傳回true,開啟限流
        if (!freeMemoryMetric.isPresent()) {
            return true;
        }
        long freeMemory = freeMemoryMetric.get()
                .getValue()
                .longValue();
        // 如果可用記憶體小于1000000KB,開啟流控
        return freeMemory < 1000000L;
    }
    // 省略其他方法
}
           

三、實作不同次元的限流,例如:

  • 對請求的目标URL進行限流(例如:某個URL每分鐘隻允許調用多少次)
  • 對用戶端的通路IP進行限流(例如:某個IP每分鐘隻允許請求多少次)
  • 對某些特定使用者或者使用者組進行限流(例如:非VIP使用者限制每分鐘隻允許調用100次某個API等)
  • 多元度混合的限流。此時,就需要實作一些限流規則的編排機制。與、或、非等關系。

參考文檔