天天看點

Java 常用限流算法解析

前言

限流作為高并發場景下抵擋流量洪峰,保護後端服務不被沖垮的一種有效手段,比如大家熟知的限流元件guawa,springcloud中的Hystrix,以及springcloud-alibaba生态中的Sentinel,甚至是基于網關的限流,比如在nginx中配置限流政策,在gateway中配置限流政策等

限流無處不在,既然限流的作用如此強大,那麼其底層的實作原理如何呢,說到底,限流的核心是由一系列不同的算法完成,本篇将通過執行個體來說明下常用的幾種限流算法的用法和原理

1、計數器算法

計數器算法限流是采用簡單的計數操作,到一段時間之後自動清零,通俗來說,就是系統允許的最大流量是固定的,每過來一個請求分發一個數量的資源處理請求,一旦某個時間段,這批用于處理請求的資源達到了最大值,後續再過來的請求就直接沒法處理了

為了模拟計數器的效果,這裡我們使用Java中的Semaphore,對Semaphore有過了解的同學應該直到,這個元件可以搭配線程一起使用,可以對并發線程進行處理,

可以了解Semaphore就是一個令牌發放的人員,所有過來的請求都必須從Semaphore中拿到一個資源(線程),拿到資源的請求才能被處理

/**
 * 計數器限流算法
 */
public class Counter {

    public static void main(String[] args) {
        final Semaphore semaphore = new Semaphore(3);
        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
        service.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                //模拟每隔3秒鐘,将計數器清零,在此期間過來的請求可以擷取到了其中一個計數器的資源後,處理自己的請求業務
                semaphore.release(3);
                System.out.println();
            }
        }, 3000, 4000, TimeUnit.MILLISECONDS);

        //模拟源源不斷的請求
        while (true) {
            try {
                semaphore.acquire();
                System.out.println("擷取到計數器,開始處理自己的業務");
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("finished ~");
        }
    }

}
           

本段代碼的思路就是,模拟使用一個定時排程的線程池,每隔幾秒鐘重置Semaphore的計數器初始值,Semaphore這裡模拟限流中的令牌發放員,持有一定數量的令牌,源源不斷的請求過來之後,拿到了令牌的請求就能進行後續處理,否則被丢棄(或者放到隊列等待處理)

運作下這段代碼,看下效果

Java 常用限流算法解析

可以發現,由于請求那裡使用了while(true)模拟源源不斷的請求,但實際上Semaphore手頭隻有3個,不管過來多少,3個計數器發完了,隻能等待下一個輪回

計數器算法優缺點總結

  • 實作非常簡單
  • 控制力度太粗,假如1s内限制3次,假如有3次在前100ms内已經用完,那麼後面的900ms将隻能處于阻 塞狀态,其他的請求就進不來了,造成伺服器資源浪費
  • 實際應用場景較少

2、漏桶算法

可以按照下圖了解漏桶算法,如圖所示,有一個桶,桶裡面源源不斷的有一定數量的水滴,以某種速度漏出去,漏出去的水滴可以了解為一個個請求的令牌

漏桶算法是将請求緩存在桶中,使得服務能夠勻速處理。超出桶容量的請求将部分丢棄,漏桶算法主要用于保護内部業務處理時,能夠穩定有節奏的運作,但缺點是無法根據流量的波動進行動态調整(擴縮容),可以想象現實中銀行的服務視窗辦理業務的情況

Java 常用限流算法解析

編碼實作思路

  • 既然請求到來之後,是被勻速處理的,可以考慮使用阻塞隊列,而且阻塞隊列的容量是固定的
  • 服務端處理請求的能力是有限的,比如每秒隻能處理一個請求,那麼過來的請求可能有一部分被丢棄而無法處理

下面看具體的代碼,參考注釋進行了解

/**
 * 漏桶算法模拟
 */
public class LoseBarrel {

    public static void main(String[] args) {
        //相當于是一個容量固定大小的桶,用于存放源源不斷過來的請求
        final LinkedBlockingQueue<String> que = new LinkedBlockingQueue(3);
        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
        //模拟請求的處理,背景每2秒從桶中取一個請求進行處理
        service.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                String v = que.poll();
                System.out.println("目前處理:" + v);
            }
        }, 2000, 2000, TimeUnit.MILLISECONDS);

        //無數個請求不斷湧過來,i可以了解為請求的編号
        int taskNum=0;

        while (true){
            taskNum++;
            try {
                System.out.println("put:任務"+taskNum);
                //等待1s如果進不了桶,就溢出丢棄
                que.offer("任務" + taskNum,1000,TimeUnit.MILLISECONDS);
            }catch (Exception e){
                e.printStackTrace();
            }
        }

    }
}
           
Java 常用限流算法解析

從運作代碼的效果來看,由于請求的速度非常快,是不間斷的過來的,但是過來的請求要先放入隊列,等待排程器從隊列中取出請求進行處理,事實上,處理請求的能力比請求的速度要慢,是以有一部分的請求将會被丢棄,正如圖中,處理了1,2,3任務之後,中間跳過了4和5,直接從6開始處理

漏桶算法優缺點小結

  • 可以有效抵擋外部的請求洪峰,保護内部服務不會過載
  • 内部服務勻速執行,但無法應對流量洪峰,無法做到彈性處理突發任務
  • 任務逾時溢出時被丢棄,粒度較粗,事實上可能更需要緩存隊列輔助保持一段時間

關于漏桶算法的實際使用,在Nginx中可以通過相關的配置實作限流,由于Nginx屬于網關層,在某些場景下,利用Nginx配置限流政策具有較好的效果,具體配置可以參考:Nginx限流

Java 常用限流算法解析

3、令牌桶算法

令牌桶算法可認為是漏桶算法的一種更新,它不但可以将流量做一步限制,還能解決漏桶中無法彈性伸縮處理請求的問題,即請求突然的洪峰和低谷。在現實中,類似服務大廳的門口設定門禁卡發放。發放是勻速的,請求較少時,令牌可以緩存起 來,供流量爆發時一次性批量擷取使用。而内部服務視窗不再設限

令牌桶算法可以參考下圖進行了解

Java 常用限流算法解析

從這個圖中可以抽取幾點關鍵資訊

  • 有一個不斷産生令牌的工廠,用于生産令牌
  • 生成的令牌放到一個令牌桶中,令牌桶容量有限,超出容量的令牌将會被丢棄
  • 後續過來的請求,需要先從令牌中擷取令牌,隻有那些擷取到令牌的請求才能被處理

從這不難發現,令牌桶的好處就是,可以暫存令牌,假如某段時間,密集性的請求湧過來,如果令牌桶中的令牌比較充足,就可以及時應對這段洪峰的請求處理,而不是像漏桶那樣直接丢棄

實作思路

1、一個可以存儲請求的容器

2、恒速産生令牌

3、模拟正常的速度的請求和洪峰的請求

了解了原理之後,直接上代碼,參考注釋進行了解

/**
 * 令牌桶算法模拟
 */
public class TokenBarrel {

    public static void main(String[] args) throws Exception {
        //模拟令牌桶,裡面可以存儲3個請求
        final Semaphore semaphore = new Semaphore(3);
        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
        //模拟每隔2秒中,向令牌桶中放一個令牌
        service.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                if (semaphore.availablePermits() < 3) {
                    semaphore.release();
                }
                System.out.println("令牌數:" + semaphore.availablePermits());
            }
        }, 1000, 1000, TimeUnit.MILLISECONDS);

        Thread.sleep(50);

        //模拟洪峰5個請求,前3個迅速響應,後兩個由于産生令牌的速度是固定的,隻能排隊等待
        for (int i = 0; i < 5; i++) {
            semaphore.acquire();
            System.out.println("洪峰請求:" + i);
        }

        //模拟日常請求,2s一個
        for (int i = 0; i < 3; i++) {
            Thread.sleep(1200);
            semaphore.acquire();
            System.out.println("日常請求:" + i);
            Thread.sleep(1200);
        }
        //再次洪峰請求
        for (int i = 0; i < 5; i++) {
            semaphore.acquire();
            System.out.println("洪峰請求:" + i);
        }

        //檢查令牌桶的數量
        for (int i = 0; i < 5; i++) {
            Thread.sleep(2000);
            System.out.println("令牌剩餘:" + semaphore.availablePermits());
        }
    }
}

           
Java 常用限流算法解析

下面來分析下執行結果

  • 洪峰0-2迅速被執行,說明桶中暫存了3個令牌,有效應對了洪峰
  • 洪峰3,4被間隔性執行,得到了有效的限流
  • 日常請求被勻速執行,間隔均勻(這裡設定了日常請求業務處理需要1.2秒,令牌産生的速度能夠滿足)
  • 第二波洪峰來臨,和第一次一樣
  • 所有請求過去後,令牌最終被均勻頒發,積累到3個後不再繼續增加

令牌桶由于其優秀的特性,使用的場景較多,比如springcloud中的gateway限流,就有着實際的運用場景,具體使用可以參考小編之前的文章:gateway限流使用

4、滑動視窗算法

滑動視窗可了解為更加細分的計數器算法。前面了解到計數器算法比較粗暴,比如限定了1分鐘内的通路次數。

而滑動視窗限流是将1分鐘拆分成多 個段,不但要求整個1分鐘内請求數小于上限,而且要求每個細分的片段内請求數也要小于上限值。相當于将原來的計數周期 做了多個片段拆分,更為精細。

Java 常用限流算法解析

以上圖為例進行說明,其核心思想如下所述

  1. 将一個大的時間段拆分為多個小的時間段,比如計數器算法中限定1分鐘,那麼認為1分鐘為一個大的滑動時間視窗
  2. 對于這個1分鐘的時間視窗來說,該算法要求1分鐘内請求數不超過100個,即1分鐘内的請求總數不超過100個
  3. 為了更好的限制細分視窗流量,比如限定1秒或者5秒内的請求次數也不得超過一個數值,比如50個,那麼就可以将1分鐘劃分為12個段,每個段為5秒中,這5秒中邏輯上為一個整體,進行流量數的限制
  4. 由于時間視窗是不斷往前推進的,是以每個視窗當作一個整體向前推進

有了理論的基礎之後,我們很容易想到,可以利用linkedList結合map去實作

Java 常用限流算法解析

關鍵實作思路

  • 使用一個linkedlist用于儲存每個具體的小分段的時間
  • 每個小分段内,以目前時間為key,目前這一秒的時間内請求次數為value進行記錄
  • 需要一個定時任務排程器,不斷的将時間視窗以秒為機關,向前推進
  • 需要2個方法,一個是判斷每個時間點的請求是否達到給定的上限,另一個判斷這段時間内的總的請求數是否達到上限

下面來看具體的代碼,結合注釋進行了解

/**
 * 滑動時間視窗算法
 */
public class Windows {

    //整個視窗的流量上限,超出會被限流
    final int totalMax = 5;

    //每片的流量上限,超出同樣會被拒絕,可設定不同的值,可以和整個視窗限流數不一樣,但是不能大于
    final int sliceMax = 5;

    //分多少片,這裡指代每個小的分段
    final int slice = 3;

    //視窗,分3段,每段1s,也就是總長度3s
    final LinkedList<Long> linkedList = new LinkedList<>();

    //計數器,每片一個key,可以使用HashMap,這裡為控制保持有序性和可讀性,采用TreeMap
    Map<Long, AtomicInteger> map = new TreeMap<>();

    //心跳,每1s跳動1次,滑動視窗向前滑動一步,實際業務中可能需要手動控制滑動視窗的時機。
    ScheduledExecutorService service = Executors.newScheduledThreadPool(1);

    //擷取key值,這裡即是時間戳(秒)
    private Long getKey() {
        return System.currentTimeMillis() / 1000;
    }

    //滑動時間視窗初始化
    public Windows() {
        //初始化視窗,目前時間指向的是最末端,前兩片其實是過去的2s
        Long key = getKey();
        for (int i = 0; i < slice; i++) {
            linkedList.addFirst(key - i);
            map.put(key - i, new AtomicInteger(0));
        }

        //啟動心跳任務,視窗根據時間,自動向前滑動,每秒1步
        service.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                //隊尾添加最新的片
                Long key = getKey();
                linkedList.addLast(key);
                map.put(key, new AtomicInteger());
                //将最老的片移除
                map.remove(linkedList.getFirst());
                linkedList.removeFirst();
                System.out.println("step:" + key + ":" + map);
            }
        }, 1000, 1000, TimeUnit.MILLISECONDS);
    }

    //檢查目前時間所在的片是否達到上限
    public boolean checkCurrentSlice() {
        long key = getKey();
        AtomicInteger integer = map.get(key);
        if (integer != null) {
            return integer.get() < sliceMax;
        }
        //預設允許通路
        return true;
    }

    //檢查整個視窗所有片的計數之和是否達到上限
    public boolean checkAllCount() {
        return map.values().stream().mapToInt(value -> value.get()).sum() < totalMax;
    }

    //請求來臨進行處理
    public void req() {
        Long key = getKey();
        //如果時間視窗未到達目前時間片,稍微等待一下
        // 其實是一個保護措施,放置心跳對滑動視窗的推動滞後于目前請求
        while (linkedList.getLast() < key) {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //如果未達到上限,傳回ok,計數器加1
        // 如果總的時間視窗或者分段視窗數量任意一項達到上限,拒絕,以達到限流的目的
        // 這裡是直接拒絕。也将請求放入緩沖隊列暫存
        if (checkCurrentSlice() && checkAllCount()) {
            map.get(key).incrementAndGet();
            System.out.println(key + " = get ok:" + map);
        } else {
            System.out.println(key + " = is rejected:" + map);
        }
    }

    public static void main(String[] args) throws Exception {
        Windows window = new Windows();
        //10個随機的請求,之間有200ms間隔。會造成總數達到上限而被限流
        for (int i = 0; i < 10; i++) {
            Thread.sleep(200);
            window.req();
        }
        Thread.sleep(3000);
        System.out.println();
        System.out.println();
        //模拟突發請求,單個時間分片計數器達到上限被限流
        for (int i = 0; i < 10; i++) {
            window.req();
        }
    }

}

           

繼續閱讀