前言
限流作為高并發場景下抵擋流量洪峰,保護後端服務不被沖垮的一種有效手段,比如大家熟知的限流元件guawa,springcloud中的Hystrix,以及springcloud-alibaba生态中的Sentinel,甚至是基于網關的限流,比如在nginx中配置限流政策,在gateway中配置限流政策等
限流無處不在,既然限流的作用如此強大,那麼其底層的實作原理如何呢,說到底,限流的核心是由一系列不同的算法完成,本篇将通過執行個體來說明下常用的幾種限流算法的用法和原理
1、計數器算法
計數器算法限流是采用簡單的計數操作,到一段時間之後自動清零,通俗來說,就是系統允許的最大流量是固定的,每過來一個請求分發一個數量的資源處理請求,一旦某個時間段,這批用于處理請求的資源達到了最大值,後續再過來的請求就直接沒法處理了
為了模拟計數器的效果,這裡我們使用Java中的Semaphore,對Semaphore有過了解的同學應該直到,這個元件可以搭配線程一起使用,可以對并發線程進行處理,
可以了解Semaphore就是一個令牌發放的人員,所有過來的請求都必須從Semaphore中拿到一個資源(線程),拿到資源的請求才能被處理
/**
* 計數器限流算法
*/
public class Counter {
public static void main(String[] args) {
final Semaphore semaphore = new Semaphore(3);
ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
service.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//模拟每隔3秒鐘,将計數器清零,在此期間過來的請求可以擷取到了其中一個計數器的資源後,處理自己的請求業務
semaphore.release(3);
System.out.println();
}
}, 3000, 4000, TimeUnit.MILLISECONDS);
//模拟源源不斷的請求
while (true) {
try {
semaphore.acquire();
System.out.println("擷取到計數器,開始處理自己的業務");
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("finished ~");
}
}
}
本段代碼的思路就是,模拟使用一個定時排程的線程池,每隔幾秒鐘重置Semaphore的計數器初始值,Semaphore這裡模拟限流中的令牌發放員,持有一定數量的令牌,源源不斷的請求過來之後,拿到了令牌的請求就能進行後續處理,否則被丢棄(或者放到隊列等待處理)
運作下這段代碼,看下效果

可以發現,由于請求那裡使用了while(true)模拟源源不斷的請求,但實際上Semaphore手頭隻有3個,不管過來多少,3個計數器發完了,隻能等待下一個輪回
計數器算法優缺點總結
- 實作非常簡單
- 控制力度太粗,假如1s内限制3次,假如有3次在前100ms内已經用完,那麼後面的900ms将隻能處于阻 塞狀态,其他的請求就進不來了,造成伺服器資源浪費
- 實際應用場景較少
2、漏桶算法
可以按照下圖了解漏桶算法,如圖所示,有一個桶,桶裡面源源不斷的有一定數量的水滴,以某種速度漏出去,漏出去的水滴可以了解為一個個請求的令牌
漏桶算法是将請求緩存在桶中,使得服務能夠勻速處理。超出桶容量的請求将部分丢棄,漏桶算法主要用于保護内部業務處理時,能夠穩定有節奏的運作,但缺點是無法根據流量的波動進行動态調整(擴縮容),可以想象現實中銀行的服務視窗辦理業務的情況
編碼實作思路
- 既然請求到來之後,是被勻速處理的,可以考慮使用阻塞隊列,而且阻塞隊列的容量是固定的
- 服務端處理請求的能力是有限的,比如每秒隻能處理一個請求,那麼過來的請求可能有一部分被丢棄而無法處理
下面看具體的代碼,參考注釋進行了解
/**
* 漏桶算法模拟
*/
public class LoseBarrel {
public static void main(String[] args) {
//相當于是一個容量固定大小的桶,用于存放源源不斷過來的請求
final LinkedBlockingQueue<String> que = new LinkedBlockingQueue(3);
ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
//模拟請求的處理,背景每2秒從桶中取一個請求進行處理
service.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
String v = que.poll();
System.out.println("目前處理:" + v);
}
}, 2000, 2000, TimeUnit.MILLISECONDS);
//無數個請求不斷湧過來,i可以了解為請求的編号
int taskNum=0;
while (true){
taskNum++;
try {
System.out.println("put:任務"+taskNum);
//等待1s如果進不了桶,就溢出丢棄
que.offer("任務" + taskNum,1000,TimeUnit.MILLISECONDS);
}catch (Exception e){
e.printStackTrace();
}
}
}
}
從運作代碼的效果來看,由于請求的速度非常快,是不間斷的過來的,但是過來的請求要先放入隊列,等待排程器從隊列中取出請求進行處理,事實上,處理請求的能力比請求的速度要慢,是以有一部分的請求将會被丢棄,正如圖中,處理了1,2,3任務之後,中間跳過了4和5,直接從6開始處理
漏桶算法優缺點小結
- 可以有效抵擋外部的請求洪峰,保護内部服務不會過載
- 内部服務勻速執行,但無法應對流量洪峰,無法做到彈性處理突發任務
- 任務逾時溢出時被丢棄,粒度較粗,事實上可能更需要緩存隊列輔助保持一段時間
關于漏桶算法的實際使用,在Nginx中可以通過相關的配置實作限流,由于Nginx屬于網關層,在某些場景下,利用Nginx配置限流政策具有較好的效果,具體配置可以參考:Nginx限流
3、令牌桶算法
令牌桶算法可認為是漏桶算法的一種更新,它不但可以将流量做一步限制,還能解決漏桶中無法彈性伸縮處理請求的問題,即請求突然的洪峰和低谷。在現實中,類似服務大廳的門口設定門禁卡發放。發放是勻速的,請求較少時,令牌可以緩存起 來,供流量爆發時一次性批量擷取使用。而内部服務視窗不再設限
令牌桶算法可以參考下圖進行了解
從這個圖中可以抽取幾點關鍵資訊
- 有一個不斷産生令牌的工廠,用于生産令牌
- 生成的令牌放到一個令牌桶中,令牌桶容量有限,超出容量的令牌将會被丢棄
- 後續過來的請求,需要先從令牌中擷取令牌,隻有那些擷取到令牌的請求才能被處理
從這不難發現,令牌桶的好處就是,可以暫存令牌,假如某段時間,密集性的請求湧過來,如果令牌桶中的令牌比較充足,就可以及時應對這段洪峰的請求處理,而不是像漏桶那樣直接丢棄
實作思路
1、一個可以存儲請求的容器
2、恒速産生令牌
3、模拟正常的速度的請求和洪峰的請求
了解了原理之後,直接上代碼,參考注釋進行了解
/**
* 令牌桶算法模拟
*/
public class TokenBarrel {
public static void main(String[] args) throws Exception {
//模拟令牌桶,裡面可以存儲3個請求
final Semaphore semaphore = new Semaphore(3);
ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
//模拟每隔2秒中,向令牌桶中放一個令牌
service.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if (semaphore.availablePermits() < 3) {
semaphore.release();
}
System.out.println("令牌數:" + semaphore.availablePermits());
}
}, 1000, 1000, TimeUnit.MILLISECONDS);
Thread.sleep(50);
//模拟洪峰5個請求,前3個迅速響應,後兩個由于産生令牌的速度是固定的,隻能排隊等待
for (int i = 0; i < 5; i++) {
semaphore.acquire();
System.out.println("洪峰請求:" + i);
}
//模拟日常請求,2s一個
for (int i = 0; i < 3; i++) {
Thread.sleep(1200);
semaphore.acquire();
System.out.println("日常請求:" + i);
Thread.sleep(1200);
}
//再次洪峰請求
for (int i = 0; i < 5; i++) {
semaphore.acquire();
System.out.println("洪峰請求:" + i);
}
//檢查令牌桶的數量
for (int i = 0; i < 5; i++) {
Thread.sleep(2000);
System.out.println("令牌剩餘:" + semaphore.availablePermits());
}
}
}
下面來分析下執行結果
- 洪峰0-2迅速被執行,說明桶中暫存了3個令牌,有效應對了洪峰
- 洪峰3,4被間隔性執行,得到了有效的限流
- 日常請求被勻速執行,間隔均勻(這裡設定了日常請求業務處理需要1.2秒,令牌産生的速度能夠滿足)
- 第二波洪峰來臨,和第一次一樣
- 所有請求過去後,令牌最終被均勻頒發,積累到3個後不再繼續增加
令牌桶由于其優秀的特性,使用的場景較多,比如springcloud中的gateway限流,就有着實際的運用場景,具體使用可以參考小編之前的文章:gateway限流使用
4、滑動視窗算法
滑動視窗可了解為更加細分的計數器算法。前面了解到計數器算法比較粗暴,比如限定了1分鐘内的通路次數。
而滑動視窗限流是将1分鐘拆分成多 個段,不但要求整個1分鐘内請求數小于上限,而且要求每個細分的片段内請求數也要小于上限值。相當于将原來的計數周期 做了多個片段拆分,更為精細。
以上圖為例進行說明,其核心思想如下所述
- 将一個大的時間段拆分為多個小的時間段,比如計數器算法中限定1分鐘,那麼認為1分鐘為一個大的滑動時間視窗
- 對于這個1分鐘的時間視窗來說,該算法要求1分鐘内請求數不超過100個,即1分鐘内的請求總數不超過100個
- 為了更好的限制細分視窗流量,比如限定1秒或者5秒内的請求次數也不得超過一個數值,比如50個,那麼就可以将1分鐘劃分為12個段,每個段為5秒中,這5秒中邏輯上為一個整體,進行流量數的限制
- 由于時間視窗是不斷往前推進的,是以每個視窗當作一個整體向前推進
有了理論的基礎之後,我們很容易想到,可以利用linkedList結合map去實作
關鍵實作思路
- 使用一個linkedlist用于儲存每個具體的小分段的時間
- 每個小分段内,以目前時間為key,目前這一秒的時間内請求次數為value進行記錄
- 需要一個定時任務排程器,不斷的将時間視窗以秒為機關,向前推進
- 需要2個方法,一個是判斷每個時間點的請求是否達到給定的上限,另一個判斷這段時間内的總的請求數是否達到上限
下面來看具體的代碼,結合注釋進行了解
/**
* 滑動時間視窗算法
*/
public class Windows {
//整個視窗的流量上限,超出會被限流
final int totalMax = 5;
//每片的流量上限,超出同樣會被拒絕,可設定不同的值,可以和整個視窗限流數不一樣,但是不能大于
final int sliceMax = 5;
//分多少片,這裡指代每個小的分段
final int slice = 3;
//視窗,分3段,每段1s,也就是總長度3s
final LinkedList<Long> linkedList = new LinkedList<>();
//計數器,每片一個key,可以使用HashMap,這裡為控制保持有序性和可讀性,采用TreeMap
Map<Long, AtomicInteger> map = new TreeMap<>();
//心跳,每1s跳動1次,滑動視窗向前滑動一步,實際業務中可能需要手動控制滑動視窗的時機。
ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
//擷取key值,這裡即是時間戳(秒)
private Long getKey() {
return System.currentTimeMillis() / 1000;
}
//滑動時間視窗初始化
public Windows() {
//初始化視窗,目前時間指向的是最末端,前兩片其實是過去的2s
Long key = getKey();
for (int i = 0; i < slice; i++) {
linkedList.addFirst(key - i);
map.put(key - i, new AtomicInteger(0));
}
//啟動心跳任務,視窗根據時間,自動向前滑動,每秒1步
service.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//隊尾添加最新的片
Long key = getKey();
linkedList.addLast(key);
map.put(key, new AtomicInteger());
//将最老的片移除
map.remove(linkedList.getFirst());
linkedList.removeFirst();
System.out.println("step:" + key + ":" + map);
}
}, 1000, 1000, TimeUnit.MILLISECONDS);
}
//檢查目前時間所在的片是否達到上限
public boolean checkCurrentSlice() {
long key = getKey();
AtomicInteger integer = map.get(key);
if (integer != null) {
return integer.get() < sliceMax;
}
//預設允許通路
return true;
}
//檢查整個視窗所有片的計數之和是否達到上限
public boolean checkAllCount() {
return map.values().stream().mapToInt(value -> value.get()).sum() < totalMax;
}
//請求來臨進行處理
public void req() {
Long key = getKey();
//如果時間視窗未到達目前時間片,稍微等待一下
// 其實是一個保護措施,放置心跳對滑動視窗的推動滞後于目前請求
while (linkedList.getLast() < key) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//如果未達到上限,傳回ok,計數器加1
// 如果總的時間視窗或者分段視窗數量任意一項達到上限,拒絕,以達到限流的目的
// 這裡是直接拒絕。也将請求放入緩沖隊列暫存
if (checkCurrentSlice() && checkAllCount()) {
map.get(key).incrementAndGet();
System.out.println(key + " = get ok:" + map);
} else {
System.out.println(key + " = is rejected:" + map);
}
}
public static void main(String[] args) throws Exception {
Windows window = new Windows();
//10個随機的請求,之間有200ms間隔。會造成總數達到上限而被限流
for (int i = 0; i < 10; i++) {
Thread.sleep(200);
window.req();
}
Thread.sleep(3000);
System.out.println();
System.out.println();
//模拟突發請求,單個時間分片計數器達到上限被限流
for (int i = 0; i < 10; i++) {
window.req();
}
}
}