-
業務背景介紹
對于web應用的限流,光看标題,似乎過于抽象,難以了解,那我們還是以具體的某一個應用場景來引入這個話題吧。
在日常生活中,我們肯定收到過不少不少這樣的短信,“雙11約嗎?,千款….”,“您有幸獲得唱讀卡,趕快戳連結…”。這種類型的短信是屬于推廣性質的短信。為什麼我要說這個呢?聽我慢慢道來。
一般而言,對于推廣營銷類短信,它們針對某一群體(譬如注冊會員)進行定點推送,有時這個群體的成員量比較大,譬如京東的會員,可以達到千萬級别。是以相應的,發送推廣短信的量也會增大。然而,要完成這些短信發送,我們是需要調用服務商的接口來完成的。倘若一次發送的量在200萬條,而我們的服務商接口每秒能處理的短信發送量有限,隻能達到200條每秒。那麼這個時候就會産生問題了,我們如何能控制好程式發送短信時的速度昵?于是限流這個功能就得加上了
-
生産環境背景
1、服務商接口所能提供的服務上限是400條/s
2、業務方調用短信發送接口的速度未知,QPS可能達到800/s,1200/s,或者更高
3、當服務商接口通路頻率超過400/s時,超過的量将拒絕服務,多出的資訊将會丢失
4、線上為多節點布置,但調用的是同一個服務商接口
-
需求分析
1、鑒于業務方對短信發送接口的調用頻率未知,而服務商的接口服務有上限,為保證服務的可用性,業務層需要對接口調用方的流量進行限制—–接口限流
-
需求設計
方案一、在提供給業務方的Controller層進行控制。
1、使用guava提供工具庫裡的RateLimiter類(内部采用令牌捅算法實作)進行限流
<!--核心代碼片段-->
private RateLimiter rateLimiter = RateLimiter.create(400);//400表示每秒允許處理的量是400
if(rateLimiter.tryAcquire()) {
//短信發送邏輯可以在此處
}
2、使用Java自帶delayqueue的延遲隊列實作(編碼過程相對麻煩,此處省略代碼)
3、使用Redis實作,存儲兩個key,一個用于計時,一個用于計數。請求每調用一次,計數器增加1,若在計時器時間内計數器未超過門檻值,則可以處理任務
if(!cacheDao.hasKey(API_WEB_TIME_KEY)) {
cacheDao.putToValue(API_WEB_TIME_KEY,0,(long)1, TimeUnit.SECONDS);
}
if(cacheDao.hasKey(API_WEB_TIME_KEY)&&cacheDao.incrBy(API_WEB_COUNTER_KEY,(long)1) > (long)400) {
LOGGER.info("調用頻率過快");
}
//短信發送邏輯
方案二、在短信發送至服務商時做限流處理
方案三、同時使用方案一和方案二
-
可行性分析
最快捷且有效的方式是使用RateLimiter實作,但是這很容易踩到一個坑,單節點模式下,使用RateLimiter進行限流一點問題都沒有。但是…線上是分布式系統,布署了多個節點,而且多個節點最終調用的是同一個短信服務商接口。雖然我們對單個節點能做到将QPS限制在400/s,但是多節點條件下,如果每個節點均是400/s,那麼到服務商那邊的總請求就是節點數x400/s,于是限流效果失效。使用該方案對單節點的門檻值控制是難以适應分布式環境的,至少目前我還沒想到更為合适的方式。
對于第二種,使用delayqueue方式。其實主要存在兩個問題,1:短信系統本身就用了一層消息隊列,有用kafka,或者rabitmq,如果再加一層延遲隊列,從設計上來說是不太合适的。2:實作delayqueue的過程相對較麻煩,耗時可能比較長,而且達不到精準限流的效果
對于第三種,使用redis進行限流,其很好地解決了分布式環境下多執行個體所導緻的并發問題。因為使用redis設定的計時器和計數器均是全局唯一的,不管多少個節點,它們使用的都是同樣的計時器和計數器,是以可以做到非常精準的流控。同時,這種方案編碼并不複雜,可能需要的代碼不超過10行。
-
實施方案
根據可行性分析可知,整個系統采取redis限流處理是成本最低且最高效的。
具體實作
1、在Controller層設定兩個全局key,一個用于計數,另一個用于計時
private static final String API_WEB_TIME_KEY = "time_key";
private static final String API_WEB_COUNTER_KEY = "counter_key";
2、對時間key的存在與否進行判斷,并對計數器是否超過門檻值進行判斷
if(!cacheDao.hasKey(API_WEB_TIME_KEY)) {
cacheDao.putToValue(API_WEB_TIME_KEY,0,(long)1, TimeUnit.SECONDS);
cacheDao.putToValue(API_WEB_COUNTER_KEY,0,(long)2, TimeUnit.SECONDS);//時間到就重新初始化為
}
if(cacheDao.hasKey(API_WEB_TIME_KEY)&&cacheDao.incrBy(API_WEB_COUNTER_KEY,(long)1) > (long)400) {
LOGGER.info("調用頻率過快");
}
//短信發送邏輯
實施結果
可以達到非常精準的流控,截圖會在後續的過程中貼出來。歡迎有疑問的小夥伴們在評論區提出問題,我看到後盡量抽時間回答的
http://blog.csdn.net/Justnow_/article/details/53055299
一、場景描述
很多做服務接口的人或多或少的遇到這樣的場景,由于業務應用系統的負載能力有限,為了防止非預期的請求對系統壓力過大而拖垮業務應用系統。
也就是面對大流量時,如何進行流量控制?
服務接口的流量控制政策:分流、降級、限流等。本文讨論下限流政策,雖然降低了服務接口的通路頻率和并發量,卻換取服務接口和業務應用系統的高可用。
實際場景中常用的限流政策:
- Nginx前端限流
按照一定的規則如帳号、IP、系統調用邏輯等在Nginx層面做限流
- 業務應用系統限流
1、用戶端限流
2、服務端限流
- 資料庫限流
紅線區,力保資料庫
二、常用的限流算法
常用的限流算法由:樓桶算法和令牌桶算法。本文不具體的詳細說明兩種算法的原理,原理會在接下來的文章中做說明。
1、漏桶算法
漏桶(Leaky Bucket)算法思路很簡單,水(請求)先進入到漏桶裡,漏桶以一定的速度出水(接口有響應速率),當水流入速度過大會直接溢出(通路頻率超過接口響應速率),然後就拒絕請求,可以看出漏桶算法能強行限制資料的傳輸速率.示意圖如下:

可見這裡有兩個變量,一個是桶的大小,支援流量突發增多時可以存多少的水(burst),另一個是水桶漏洞的大小(rate)。
因為漏桶的漏出速率是固定的參數,是以,即使網絡中不存在資源沖突(沒有發生擁塞),漏桶算法也不能使流突發(burst)到端口速率.是以,漏桶算法對于存在突發特性的流量來說缺乏效率.
2、令牌桶算法
令牌桶算法(Token Bucket)和 Leaky Bucket 效果一樣但方向相反的算法,更加容易了解.随着時間流逝,系統會按恒定1/QPS時間間隔(如果QPS=100,則間隔是10ms)往桶裡加入Token(想象和漏洞漏水相反,有個水龍頭在不斷的加水),如果桶已經滿了就不再加了.新請求來臨時,會各自拿走一個Token,如果沒有Token可拿了就阻塞或者拒絕服務.
令牌桶的另外一個好處是可以友善的改變速度. 一旦需要提高速率,則按需提高放入桶中的令牌的速率. 一般會定時(比如100毫秒)往桶中增加一定數量的令牌, 有些變種算法則實時的計算應該增加的令牌的數量.
三、基于Redis功能的實作
簡陋的設計思路:假設一個使用者(用IP判斷)每分鐘通路某一個服務接口的次數不能超過10次,那麼我們可以在Redis中建立一個鍵,并此時我們就設定鍵的過期時間為60秒,每一個使用者對此服務接口的通路就把鍵值加1,在60秒内當鍵值增加到10的時候,就禁止通路服務接口。在某種場景中添加通路時間間隔還是很有必要的。
1)使用Redis的incr指令,将計數器作為Lua腳本
1 local current
2 current = redis.call("incr",KEYS[1])
3 if tonumber(current) == 1 then
4 redis.call("expire",KEYS[1],1)
5 end
Lua腳本在Redis中運作,保證了incr和expire兩個操作的原子性。
2)使用Reids的清單結構代替incr指令
1 FUNCTION LIMIT_API_CALL(ip)
2 current = LLEN(ip)
3 IF current > 10 THEN
4 ERROR "too many requests per second"
5 ELSE
6 IF EXISTS(ip) == FALSE
7 MULTI
8 RPUSH(ip,ip)
9 EXPIRE(ip,1)
10 EXEC
11 ELSE
12 RPUSHX(ip,ip)
13 END
14 PERFORM_API_CALL()
15 END
Rate Limit使用Redis的清單作為容器,LLEN用于對通路次數的檢查,一個事物中包含了RPUSH和EXPIRE兩個指令,用于在第一次執行計數是建立清單并設定過期時間,
RPUSHX在後續的計數操作中進行增加操作。
四、基于令牌桶算法的實作
令牌桶算法可以很好的支撐突然額流量的變化即滿令牌桶數的峰值。
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.base.Preconditions;
import com.netease.datastream.util.framework.LifeCycle;
20 public class TokenBucket implements LifeCycle {
// 預設桶大小個數 即最大瞬間流量是64M
private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;
// 一個桶的機關是1位元組
private int everyTokenSize = 1;
// 瞬間最大流量
private int maxFlowRate;
// 平均流量
private int avgFlowRate;
// 隊列來緩存桶數量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * 1024 * 64
private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(DEFAULT_BUCKET_SIZE);
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private volatile boolean isStart = false;
private ReentrantLock lock = new ReentrantLock(true);
private static final byte A_CHAR = 'a';
public TokenBucket() {
}
public TokenBucket(int maxFlowRate, int avgFlowRate) {
this.maxFlowRate = maxFlowRate;
this.avgFlowRate = avgFlowRate;
}
public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
this.everyTokenSize = everyTokenSize;
this.maxFlowRate = maxFlowRate;
this.avgFlowRate = avgFlowRate;
}
public void addTokens(Integer tokenNum) {
// 若是桶已經滿了,就不再家如新的令牌
for (int i = 0; i < tokenNum; i++) {
tokenQueue.offer(Byte.valueOf(A_CHAR));
}
}
public TokenBucket build() {
start();
return this;
}
/**
* 擷取足夠的令牌個數
*
* @return
*/
public boolean getTokens(byte[] dataSize) {
Preconditions.checkNotNull(dataSize);
Preconditions.checkArgument(isStart, "please invoke start method first !");
int needTokenNum = dataSize.length / everyTokenSize + 1;// 傳輸内容大小對應的桶個數
final ReentrantLock lock = this.lock;
lock.lock();
try {
boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足夠的桶數量
if (!result) {
return false;
}
int tokenCount = 0;
for (int i = 0; i < needTokenNum; i++) {
Byte poll = tokenQueue.poll();
if (poll != null) {
tokenCount++;
}
}
return tokenCount == needTokenNum;
} finally {
lock.unlock();
}
}
@Override
public void start() {
// 初始化桶隊列大小
if (maxFlowRate != 0) {
tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
}
// 初始化令牌生産者
TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS);
isStart = true;
}
@Override
public void stop() {
isStart = false;
scheduledExecutorService.shutdown();
}
@Override
public boolean isStarted() {
return isStart;
}
class TokenProducer implements Runnable {
private int avgFlowRate;
private TokenBucket tokenBucket;
public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
this.avgFlowRate = avgFlowRate;
this.tokenBucket = tokenBucket;
}
@Override
public void run() {
tokenBucket.addTokens(avgFlowRate);
}
}
public static TokenBucket newBuilder() {
return new TokenBucket();
}
public TokenBucket everyTokenSize(int everyTokenSize) {
this.everyTokenSize = everyTokenSize;
return this;
}
public TokenBucket maxFlowRate(int maxFlowRate) {
this.maxFlowRate = maxFlowRate;
return this;
}
public TokenBucket avgFlowRate(int avgFlowRate) {
this.avgFlowRate = avgFlowRate;
return this;
}
private String stringCopy(String data, int copyNum) {
StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);
for (int i = 0; i < copyNum; i++) {
sbuilder.append(data);
}
return sbuilder.toString();
}
public static void main(String[] args) throws IOException, InterruptedException {
tokenTest();
}
private static void arrayTest() {
ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(10);
tokenQueue.offer(1);
tokenQueue.offer(1);
tokenQueue.offer(1);
System.out.println(tokenQueue.size());
System.out.println(tokenQueue.remainingCapacity());
}
private static void tokenTest() throws InterruptedException, IOException {
TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build();
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("/tmp/ds_test")));
String data = "xxxx";// 四個位元組
for (int i = 1; i <= 1000; i++) {
Random random = new Random();
int i1 = random.nextInt(100);
boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes());
TimeUnit.MILLISECONDS.sleep(100);
if (tokens) {
bufferedWriter.write("token pass --- index:" + i1);
System.out.println("token pass --- index:" + i1);
} else {
bufferedWriter.write("token rejuect --- index" + i1);
System.out.println("token rejuect --- index" + i1);
}
bufferedWriter.newLine();
bufferedWriter.flush();
}
bufferedWriter.close();
}
}
參考:
http://xiaobaoqiu.github.io/blog/2015/07/02/ratelimiter/
http://redisdoc.com/string/incr.html
http://www.cnblogs.com/zhengyun_ustc/archive/2012/11/17/topic1.html
http://www.cnblogs.com/exceptioneye/p/4783904.html
在開發高并發系統時有三把利器用來保護系統:緩存、降級和限流。緩存的目的是提升系統通路速度和增大系統能處理的容量,可謂是抗高并發流量的銀彈;而降級是當服務出問題或者影響到核心流程的性能則需要暫時屏蔽掉,待高峰或者問題解決後再打開;而有些場景并不能用緩存和降級來解決,比如稀缺資源(秒殺、搶購)、寫服務(如評論、下單)、頻繁的複雜查詢(評論的最後幾頁),是以需有一種手段來限制這些場景的并發/請求量,即限流。
限流的目的是通過對并發通路/請求進行限速或者一個時間視窗内的的請求進行限速來保護系統,一旦達到限制速率則可以拒絕服務(定向到錯誤頁或告知資源沒有了)、排隊或等待(比如秒殺、評論、下單)、降級(傳回兜底資料或預設資料,如商品詳情頁庫存預設有貨)。
一般開發高并發系統常見的限流有:限制總并發數(比如資料庫連接配接池、線程池)、限制瞬時并發數(如nginx的limit_conn子產品,用來限制瞬時并發連接配接數)、限制時間視窗内的平均速率(如Guava的RateLimiter、nginx的limit_req子產品,限制每秒的平均速率);其他還有如限制遠端接口調用速率、限制MQ的消費速率。另外還可以根據網絡連接配接數、網絡流量、CPU或記憶體負載等來限流。
先有緩存這個銀彈,後有限流來應對618、雙十一高并發流量,在處理高并發問題上可以說是如虎添翼,不用擔心瞬間流量導緻系統挂掉或雪崩,最終做到有損服務而不是不服務;限流需要評估好,不可亂用,否則會正常流量出現一些奇怪的問題而導緻使用者抱怨。
在實際應用時也不要太糾結算法問題,因為一些限流算法實作是一樣的隻是描述不一樣;具體使用哪種限流技術還是要根據實際場景來選擇,不要一味去找最佳模式,白貓黑貓能解決問題的就是好貓。
因在實際工作中遇到過許多人來問如何進行限流,是以本文會詳細介紹各種限流手段。那麼接下來我們從限流算法、應用級限流、分布式限流、接入層限流來詳細學習下限流技術手段。
限流算法
常見的限流算法有:令牌桶、漏桶。計數器也可以進行粗暴限流實作。
令牌桶算法
令牌桶算法是一個存放固定容量令牌的桶,按照固定速率往桶裡添加令牌。令牌桶算法的描述如下:
- 假設限制2r/s,則按照500毫秒的固定速率往桶中添加令牌;
- 桶中最多存放b個令牌,當桶滿時,新添加的令牌被丢棄或拒絕;
- 當一個n個位元組大小的資料包到達,将從桶中删除n個令牌,接着資料包被發送到網絡上;
- 如果桶中的令牌不足n個,則不會删除令牌,且該資料包将被限流(要麼丢棄,要麼緩沖區等待)。
漏桶算法
漏桶作為計量工具(The Leaky Bucket Algorithm as a Meter)時,可以用于流量整形(Traffic Shaping)和流量控制(TrafficPolicing),漏桶算法的描述如下:
- 一個固定容量的漏桶,按照常量固定速率流出水滴;
- 如果桶是空的,則不需流出水滴;
- 可以以任意速率流入水滴到漏桶;
- 如果流入水滴超出了桶的容量,則流入的水滴溢出了(被丢棄),而漏桶容量是不變的。
令牌桶和漏桶對比:
- 令牌桶是按照固定速率往桶中添加令牌,請求是否被處理需要看桶中令牌是否足夠,當令牌數減為零時則拒絕新的請求;
- 漏桶則是按照常量固定速率流出請求,流入請求速率任意,當流入的請求數累積到漏桶容量時,則新流入的請求被拒絕;
- 令牌桶限制的是平均流入速率(允許突發請求,隻要有令牌就可以處理,支援一次拿3個令牌,4個令牌),并允許一定程度突發流量;
- 漏桶限制的是常量流出速率(即流出速率是一個固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2),進而平滑突發流入速率;
- 令牌桶允許一定程度的突發,而漏桶主要目的是平滑流入速率;
- 兩個算法實作可以一樣,但是方向是相反的,對于相同的參數得到的限流效果是一樣的。
另外有時候我們還使用計數器來進行限流,主要用來限制總并發數,比如資料庫連接配接池、線程池、秒殺的并發數;隻要全局總請求數或者一定時間段的總請求數設定的閥值則進行限流,是簡單粗暴的總數量限流,而不是平均速率限流。
到此基本的算法就介紹完了,接下來我們首先看看應用級限流。
應用級限流
限流總并發/連接配接/請求數
對于一個應用系統來說一定會有極限并發/請求數,即總有一個TPS/QPS閥值,如果超了閥值則系統就會不響應使用者請求或響應的非常慢,是以我們最好進行過載保護,防止大量請求湧入擊垮系統。
如果你使用過Tomcat,其Connector 其中一種配置有如下幾個參數:
acceptCount:如果Tomcat的線程都忙于響應,新來的連接配接會進入隊列排隊,如果超出排隊大小,則拒絕連接配接;
maxConnections: 瞬時最大連接配接數,超出的會排隊等待;
maxThreads:Tomcat能啟動用來處理請求的最大線程數,如果請求處理量一直遠遠大于最大線程數則可能會僵死。
詳細的配置請參考官方文檔。另外如Mysql(如max_connections)、Redis(如tcp-backlog)都會有類似的限制連接配接數的配置。
限流總資源數
如果有的資源是稀缺資源(如資料庫連接配接、線程),而且可能有多個系統都會去使用它,那麼需要限制應用;可以使用池化技術來限制總資源數:連接配接池、線程池。比如配置設定給每個應用的資料庫連接配接是100,那麼本應用最多可以使用100個資源,超出了可以等待或者抛異常。
限流某個接口的總并發/請求數
如果接口可能會有突發通路情況,但又擔心通路量太大造成崩潰,如搶購業務;這個時候就需要限制這個接口的總并發/請求數總請求數了;因為粒度比較細,可以為每個接口都設定相應的閥值。可以使用Java中的AtomicLong進行限流:
=================================
try {
if(atomic.incrementAndGet() > 限流數) {
//拒絕請求
}
//處理請求
} finally {
atomic.decrementAndGet();
}
适合對業務無損的服務或者需要過載保護的服務進行限流,如搶購業務,超出了大小要麼讓使用者排隊,要麼告訴使用者沒貨了,對使用者來說是可以接受的。而一些開放平台也會限制使用者調用某個接口的試用請求量,也可以用這種計數器方式實作。這種方式也是簡單粗暴的限流,沒有平滑處理,需要根據實際情況選擇使用;
限流某個接口的時間窗請求數
即一個時間視窗内的請求數,如想限制某個接口/服務每秒/每分鐘/每天的請求數/調用量。如一些基礎服務會被很多其他系統調用,比如商品詳情頁服務會調用基礎商品服務調用,但是怕因為更新量比較大将基礎服務打挂,這時我們要對每秒/每分鐘的調用量進行限速;一種實作方式如下所示:
LoadingCache<Long, AtomicLong> counter =
CacheBuilder.newBuilder()
.expireAfterWrite(2, TimeUnit.SECONDS)
.build(new CacheLoader<Long, AtomicLong>() {
@Override
public AtomicLong load(Long seconds) throws Exception {
return new AtomicLong(0);
}
});
long limit = 1000;
while(true) {
//得到目前秒
long currentSeconds = System.currentTimeMillis() / 1000;
if(counter.get(currentSeconds).incrementAndGet() > limit) {
System.out.println("限流了:" + currentSeconds);
continue;
}
//業務處理
}
我們使用Guava的Cache來存儲計數器,過期時間設定為2秒(保證1秒内的計數器是有的),然後我們擷取目前時間戳然後取秒數來作為KEY進行計數統計和限流,這種方式也是簡單粗暴,剛才說的場景夠用了。
平滑限流某個接口的請求數
之前的限流方式都不能很好地應對突發請求,即瞬間請求可能都被允許進而導緻一些問題;是以在一些場景中需要對突發請求進行整形,整形為平均速率請求處理(比如5r/s,則每隔200毫秒處理一個請求,平滑了速率)。這個時候有兩種算法滿足我們的場景:令牌桶和漏桶算法。Guava架構提供了令牌桶算法實作,可直接拿來使用。
Guava RateLimiter提供了令牌桶算法實作:平滑突發限流(SmoothBursty)和平滑預熱限流(SmoothWarmingUp)實作。
SmoothBursty
RateLimiter limiter = RateLimiter.create(5);
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
将得到類似如下的輸出:
0.0
0.198239
0.196083
0.200609
0.199599
0.19961
1、RateLimiter.create(5) 表示桶容量為5且每秒新增5個令牌,即每隔200毫秒新增一個令牌;
2、limiter.acquire()表示消費一個令牌,如果目前桶中有足夠令牌則成功(傳回值為0),如果桶中沒有令牌則暫停一段時間,比如發令牌間隔是200毫秒,則等待200毫秒後再去消費令牌(如上測試用例傳回的為0.198239,差不多等待了200毫秒桶中才有令牌可用),這種實作将突發請求速率平均為了固定請求速率。
再看一個突發示例:
RateLimiter limiter = RateLimiter.create(5);
System.out.println(limiter.acquire(5));
System.out.println(limiter.acquire(1));
System.out.println(limiter.acquire(1))
将得到類似如下的輸出:
0.0
0.98745
0.183553
0.199909
limiter.acquire(5)表示桶的容量為5且每秒新增5個令牌,令牌桶算法允許一定程度的突發,是以可以一次性消費5個令牌,但接下來的limiter.acquire(1)将等待差不多1秒桶中才能有令牌,且接下來的請求也整形為固定速率了。
System.out.println(limiter.acquire(10));
1.997428
0.192273
0.200616
同上邊的例子類似,第一秒突發了10個請求,令牌桶算法也允許了這種突發(允許消費未來的令牌),但接下來的limiter.acquire(1)将等待差不多2秒桶中才能有令牌,且接下來的請求也整形為固定速率了。
接下來再看一個突發的例子:
RateLimiter limiter = RateLimiter.create(2);
System.out.println(limiter.acquire());
Thread.sleep(2000L);
0.499876
0.495799
1、建立了一個桶容量為2且每秒新增2個令牌;
2、首先調用limiter.acquire()消費一個令牌,此時令牌桶可以滿足(傳回值為0);
3、然後線程暫停2秒,接下來的兩個limiter.acquire()都能消費到令牌,第三個limiter.acquire()也同樣消費到了令牌,到第四個時就需要等待500毫秒了。
此處可以看到我們設定的桶容量為2(即允許的突發量),這是因為SmoothBursty中有一個參數:最大突發秒數(maxBurstSeconds)預設值是1s,突發量/桶容量=速率*maxBurstSeconds,是以本示例桶容量/突發量為2,例子中前兩個是消費了之前積攢的突發量,而第三個開始就是正常計算的了。令牌桶算法允許将一段時間内沒有消費的令牌暫存到令牌桶中,留待未來使用,并允許未來請求的這種突發。
SmoothBursty通過平均速率和最後一次新增令牌的時間計算出下次新增令牌的時間的,另外需要一個桶暫存一段時間内沒有使用的令牌(即可以突發的令牌數)。另外RateLimiter還提供了tryAcquire方法來進行無阻塞或可逾時的令牌消費。
因為SmoothBursty允許一定程度的突發,會有人擔心如果允許這種突發,假設突然間來了很大的流量,那麼系統很可能扛不住這種突發。是以需要一種平滑速率的限流工具,進而系統冷啟動後慢慢的趨于平均固定速率(即剛開始速率小一些,然後慢慢趨于我們設定的固定速率)。Guava也提供了SmoothWarmingUp來實作這種需求,其可以認為是漏桶算法,但是在某些特殊場景又不太一樣。
SmoothWarmingUp建立方式:RateLimiter.create(doublepermitsPerSecond, long warmupPeriod, TimeUnit unit)
permitsPerSecond表示每秒新增的令牌數,warmupPeriod表示在從冷啟動速率過渡到平均速率的時間間隔。
示例如下:
RateLimiter limiter = RateLimiter.create(5, 1000, TimeUnit.MILLISECONDS);
for(int i = 1; i < 5;i++) {
System.out.println(limiter.acquire());
}
Thread.sleep(1000L);
0.51767
0.357814
0.219992
0.199984
0.360826
0.220166
0.199723
0.199555
速率是梯形上升速率的,也就是說冷啟動時會以一個比較大的速率慢慢到平均速率;然後趨于平均速率(梯形下降到平均速率)。可以通過調節warmupPeriod參數實作一開始就是平滑固定速率。
到此應用級限流的一些方法就介紹完了。假設将應用部署到多台機器,應用級限流方式隻是單應用内的請求限流,不能進行全局限流。是以我們需要分布式限流和接入層限流來解決這個問題。
分布式限流
分布式限流最關鍵的是要将限流服務做成原子化,而解決方案可以使使用redis+lua或者nginx+lua技術進行實作,通過這兩種技術可以實作的高并發和高性能。
首先我們來使用redis+lua實作時間窗内某個接口的請求數限流,實作了該功能後可以改造為限流總并發/請求數和限制總資源數。Lua本身就是一種程式設計語言,也可以使用它實作複雜的令牌桶或漏桶算法。
redis+lua實作中的lua腳本:
local key = KEYS[1] --限流KEY(一秒一個)
local limit = tonumber(ARGV[1]) --限流大小
local current = tonumber(redis.call("INCRBY", key, "1")) --請求數+1
if current > limit then --如果超出限流大小
return 0
elseif current == 1 then --隻有第一次通路需要設定2秒的過期時間
redis.call("expire", key,"2")
end
return 1
如上操作因是在一個lua腳本中,又因Redis是單線程模型,是以是線程安全的。如上方式有一個缺點就是當達到限流大小後還是會遞增的,可以改造成如下方式實作:
local current = tonumber(redis.call('get', key) or "0")
if current + 1 > limit then --如果超出限流大小
else --請求數+1,并設定2秒過期
redis.call("INCRBY", key,"1")
return 1
如下是Java中判斷是否需要限流的代碼:
public static boolean acquire() throws Exception {
String luaScript = Files.toString(new File("limit.lua"), Charset.defaultCharset());
Jedis jedis = new Jedis("192.168.147.52", 6379);
String key = "ip:" + System.currentTimeMillis()/ 1000; //此處将目前時間戳取秒數
Stringlimit = "3"; //限流大小
return (Long)jedis.eval(luaScript,Lists.newArrayList(key), Lists.newArrayList(limit)) == 1;
因為Redis的限制(Lua中有寫操作不能使用帶随機性質的讀操作,如TIME)不能在Redis Lua中使用TIME擷取時間戳,是以隻好從應用擷取然後傳入,在某些極端情況下(機器時鐘不準的情況下),限流會存在一些小問題。
使用Nginx+Lua實作的Lua腳本:
local locks = require "resty.lock"
local function acquire()
local lock =locks:new("locks")
local elapsed, err =lock:lock("limit_key") --互斥鎖
local limit_counter =ngx.shared.limit_counter --計數器
local key = "ip:" ..os.time()
local limit = 5 --限流大小
local current =limit_counter:get(key)
if current ~= nil and current + 1> limit then --如果超出限流大小
lock:unlock()
return 0
end
if current == nil then
limit_counter:set(key, 1, 1) --第一次需要設定過期時間,設定key的值為1,過期時間為1秒
else
limit_counter:incr(key, 1) --第二次開始加1即可
lock:unlock()
return 1
ngx.print(acquire())
實作中我們需要使用lua-resty-lock互斥鎖子產品來解決原子性問題(在實際工程中使用時請考慮擷取鎖的逾時問題),并使用ngx.shared.DICT共享字典來實作計數器。如果需要限流則傳回0,否則傳回1。使用時需要先定義兩個共享字典(分别用來存放鎖和計數器資料):
http {
……
lua_shared_dict locks 10m;
lua_shared_dict limit_counter 10m;
有人會糾結如果應用并發量非常大那麼redis或者nginx是不是能抗得住;不過這個問題要從多方面考慮:你的流量是不是真的有這麼大,是不是可以通過一緻性哈希将分布式限流進行分片,是不是可以當并發量太大降級為應用級限流;對策非常多,可以根據實際情況調節;像在京東使用Redis+Lua來限流搶購流量,一般流量是沒有問題的。
對于分布式限流目前遇到的場景是業務上的限流,而不是流量入口的限流;流量入口限流應該在接入層完成,而接入層筆者一般使用Nginx。
參考資料
https://en.wikipedia.org/wiki/Token_bucket
https://en.wikipedia.org/wiki/Leaky_bucket
http://redis.io/commands/incr
http://nginx.org/en/docs/http/ngx_http_limit_req_module.html
http://nginx.org/en/docs/http/ngx_http_limit_conn_module.html
https://github.com/openresty/lua-resty-limit-traffic
http://nginx.org/en/docs/http/ngx_http_core_module.html#limit_rate
http://www.blogjava.net/stevenjohn/archive/2016/06/14/430882.html
http://www.mincoder.com/article/2943.shtml
<!--核心代碼片段-->
private RateLimiter rateLimiter = RateLimiter.create(400);//400表示每秒允許處理的量是400
if(rateLimiter.tryAcquire()) {
//短信發送邏輯可以在此處
}
if(!cacheDao.hasKey(API_WEB_TIME_KEY)) {
cacheDao.putToValue(API_WEB_TIME_KEY,0,(long)1, TimeUnit.SECONDS);
}
if(cacheDao.hasKey(API_WEB_TIME_KEY)&&cacheDao.incrBy(API_WEB_COUNTER_KEY,(long)1) > (long)400) {
LOGGER.info("調用頻率過快");
}
//短信發送邏輯
private static final String API_WEB_TIME_KEY = "time_key";
private static final String API_WEB_COUNTER_KEY = "counter_key";
if(!cacheDao.hasKey(API_WEB_TIME_KEY)) {
cacheDao.putToValue(API_WEB_TIME_KEY,0,(long)1, TimeUnit.SECONDS);
cacheDao.putToValue(API_WEB_COUNTER_KEY,0,(long)2, TimeUnit.SECONDS);//時間到就重新初始化為
}
if(cacheDao.hasKey(API_WEB_TIME_KEY)&&cacheDao.incrBy(API_WEB_COUNTER_KEY,(long)1) > (long)400) {
LOGGER.info("調用頻率過快");
}
//短信發送邏輯

1 local current
2 current = redis.call("incr",KEYS[1])
3 if tonumber(current) == 1 then
4 redis.call("expire",KEYS[1],1)
5 end
1 FUNCTION LIMIT_API_CALL(ip)
2 current = LLEN(ip)
3 IF current > 10 THEN
4 ERROR "too many requests per second"
5 ELSE
6 IF EXISTS(ip) == FALSE
7 MULTI
8 RPUSH(ip,ip)
9 EXPIRE(ip,1)
10 EXEC
11 ELSE
12 RPUSHX(ip,ip)
13 END
14 PERFORM_API_CALL()
15 END
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.base.Preconditions;
import com.netease.datastream.util.framework.LifeCycle;
20 public class TokenBucket implements LifeCycle {
// 預設桶大小個數 即最大瞬間流量是64M
private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;
// 一個桶的機關是1位元組
private int everyTokenSize = 1;
// 瞬間最大流量
private int maxFlowRate;
// 平均流量
private int avgFlowRate;
// 隊列來緩存桶數量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * 1024 * 64
private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(DEFAULT_BUCKET_SIZE);
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private volatile boolean isStart = false;
private ReentrantLock lock = new ReentrantLock(true);
private static final byte A_CHAR = 'a';
public TokenBucket() {
}
public TokenBucket(int maxFlowRate, int avgFlowRate) {
this.maxFlowRate = maxFlowRate;
this.avgFlowRate = avgFlowRate;
}
public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
this.everyTokenSize = everyTokenSize;
this.maxFlowRate = maxFlowRate;
this.avgFlowRate = avgFlowRate;
}
public void addTokens(Integer tokenNum) {
// 若是桶已經滿了,就不再家如新的令牌
for (int i = 0; i < tokenNum; i++) {
tokenQueue.offer(Byte.valueOf(A_CHAR));
}
}
public TokenBucket build() {
start();
return this;
}
/**
* 擷取足夠的令牌個數
*
* @return
*/
public boolean getTokens(byte[] dataSize) {
Preconditions.checkNotNull(dataSize);
Preconditions.checkArgument(isStart, "please invoke start method first !");
int needTokenNum = dataSize.length / everyTokenSize + 1;// 傳輸内容大小對應的桶個數
final ReentrantLock lock = this.lock;
lock.lock();
try {
boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足夠的桶數量
if (!result) {
return false;
}
int tokenCount = 0;
for (int i = 0; i < needTokenNum; i++) {
Byte poll = tokenQueue.poll();
if (poll != null) {
tokenCount++;
}
}
return tokenCount == needTokenNum;
} finally {
lock.unlock();
}
}
@Override
public void start() {
// 初始化桶隊列大小
if (maxFlowRate != 0) {
tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
}
// 初始化令牌生産者
TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS);
isStart = true;
}
@Override
public void stop() {
isStart = false;
scheduledExecutorService.shutdown();
}
@Override
public boolean isStarted() {
return isStart;
}
class TokenProducer implements Runnable {
private int avgFlowRate;
private TokenBucket tokenBucket;
public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
this.avgFlowRate = avgFlowRate;
this.tokenBucket = tokenBucket;
}
@Override
public void run() {
tokenBucket.addTokens(avgFlowRate);
}
}
public static TokenBucket newBuilder() {
return new TokenBucket();
}
public TokenBucket everyTokenSize(int everyTokenSize) {
this.everyTokenSize = everyTokenSize;
return this;
}
public TokenBucket maxFlowRate(int maxFlowRate) {
this.maxFlowRate = maxFlowRate;
return this;
}
public TokenBucket avgFlowRate(int avgFlowRate) {
this.avgFlowRate = avgFlowRate;
return this;
}
private String stringCopy(String data, int copyNum) {
StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);
for (int i = 0; i < copyNum; i++) {
sbuilder.append(data);
}
return sbuilder.toString();
}
public static void main(String[] args) throws IOException, InterruptedException {
tokenTest();
}
private static void arrayTest() {
ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(10);
tokenQueue.offer(1);
tokenQueue.offer(1);
tokenQueue.offer(1);
System.out.println(tokenQueue.size());
System.out.println(tokenQueue.remainingCapacity());
}
private static void tokenTest() throws InterruptedException, IOException {
TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build();
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("/tmp/ds_test")));
String data = "xxxx";// 四個位元組
for (int i = 1; i <= 1000; i++) {
Random random = new Random();
int i1 = random.nextInt(100);
boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes());
TimeUnit.MILLISECONDS.sleep(100);
if (tokens) {
bufferedWriter.write("token pass --- index:" + i1);
System.out.println("token pass --- index:" + i1);
} else {
bufferedWriter.write("token rejuect --- index" + i1);
System.out.println("token rejuect --- index" + i1);
}
bufferedWriter.newLine();
bufferedWriter.flush();
}
bufferedWriter.close();
}
}
try {
if(atomic.incrementAndGet() > 限流數) {
//拒絕請求
}
//處理請求
} finally {
atomic.decrementAndGet();
}
LoadingCache<Long, AtomicLong> counter =
CacheBuilder.newBuilder()
.expireAfterWrite(2, TimeUnit.SECONDS)
.build(new CacheLoader<Long, AtomicLong>() {
@Override
public AtomicLong load(Long seconds) throws Exception {
return new AtomicLong(0);
}
});
long limit = 1000;
while(true) {
//得到目前秒
long currentSeconds = System.currentTimeMillis() / 1000;
if(counter.get(currentSeconds).incrementAndGet() > limit) {
System.out.println("限流了:" + currentSeconds);
continue;
}
//業務處理
}
RateLimiter limiter = RateLimiter.create(5);
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());