天天看點

美團一面:如何實作一個100W ops 生産者消費者程式?

作者:架構師尼恩

▌說在前面:

在40歲老架構師 尼恩的讀者社群(50+)中,最近有小夥伴拿到了一線網際網路企業如極兔、有贊、希音、百度、網易的面試資格,遇到一幾個很重要的面試題:

如何設計一個100W級 ops 生産者、消費者程式?

與之類似的、其他小夥伴遇到過的問題還有:

手寫一個 生産者、消費者程式?

設計一個 高性能的 生産者、消費者程式?

這裡尼恩給大家做一下系統化、體系化的線程池梳理,使得大家可以充分展示一下大家雄厚的 “技術肌肉”,讓面試官愛到 “不能自已、口水直流”。

也一并把這個題目以及參考答案,收入咱們的 《尼恩Java面試寶典》V61版本,供後面的小夥伴參考,提升大家的 3高 架構、設計、開發水準。

注:本文以 PDF 持續更新,最新尼恩 架構筆記、面試題 的PDF檔案,請到《技術自由圈》公衆号擷取

▌最佳答案至少要包括以下5個版本:

  • 版本1:不安全的生産者-消費者模式版本
  • 版本2:使用 内置鎖實作的 生産者-消費者模式版本
  • 版本3:使用信号量實作(Semaphore)
  • 版本4:使用Blockingqueue 實作
  • 版本5:無鎖實作生産者-消費者模式版本

▌什麼是生産者-消費者模式:

▌首先,來看什麼是生産者-消費者問題?

生産者-消費者問題(Producer-Consumer Problem)也稱有限緩沖問題(Bounded-Buffer Problem),是一個多線程同步問題的經典案例。

生産者-消費者問題描述了兩個通路共享緩沖區的線程,即生産者線程和消費者線程,在實際運作時會發生的問題。生産者線程的主要功能是生成一定量的資料放到緩沖區中,然後重複此過程。消費者線程的主要功能是從緩沖區提取(或消耗)資料。

生産者―消費者問題關鍵是:

1)保證生産者不會在緩沖區滿時加入資料,消費者也不會在緩沖區中為空時消耗資料。

2)保證在生産者加入過程、消費者消耗過程中,不會産生錯誤的資料和行為。

生産者-消費者問題不僅僅是一個多線程同步問題的經典案例,而且業内已經将解決該問題的方案,抽象成為了一種設計模式——“生産者-消費者”模式。

▌現在,來看看什麼是生産者-消費者模式?

生産者-消費者模式是一個經典的多線程設計模式,它為多線程間的協作提供了良好的解決方案。

在生産者-消費者模式中,通常由兩類線程,即生産者線程(若幹個)和消費者線程(若幹個)。生産者線程向資料緩沖區(DataBuffer)加入資料,消費者線程則從DataBuffer消耗資料。生産者和消費者、記憶體緩沖區之間的關系結構圖如下:

美團一面:如何實作一個100W ops 生産者消費者程式?

生産者-消費者模式中,有4個關鍵點:

(1)生産者與生産者之間、消費者與消費者之間,對資料緩沖區的操作是并發進行的。

(2)資料緩沖區是有容量上限的。資料緩沖區滿後,生産者不能再加入資料;DataBuffer空時,消費者不能再取出資料。

(3)資料緩沖區是線程安全的。在并發操作資料區的過程中,不能出現資料不一緻情況;或者在多個線程并發更改共享資料後,不會造成出現髒資料的情況。

(4)生産者或者消費者線程在空閑時,需要盡可能阻塞而不是執行無效的空操作,盡量節約CPU資源。

▌面試題:如何實作一個100W級ops 生産者、消費者程式?

尼恩提示,遇到這樣的面試題,我們可以從基礎的版本開始,一步一步進行性能優化。

  • 版本1:不安全的生産者-消費者模式版本
  • 版本2:使用 内置鎖實作的 生産者-消費者模式版本 順便說說,鎖的代價
  • 版本3:使用 内置鎖實作的 生産者-消費者模式版本

▌版本1:不安全的生産者-消費者模式版本

根據生産者―消費者模式的結構圖和描述先來實作一個非線程安全版本,包含了:

  • 資料緩沖區(DataBuffer)類、
  • 生産者(Producer)類、
  • 消費者(Consumer)類。

首先定義其資料緩沖區類,具體的代碼如下:

//共享資料區,類定義
class NotSafeDataBuffer<T> {
    public static final int MAX_AMOUNT = 10;
    //儲存具體資料元素
    private List<T> dataList = new LinkedList<>();

    //儲存元素數量
    private AtomicInteger amount = new AtomicInteger(0);

    /**
     * 向資料區增加一個元素
     */
    public void add(T element) throws Exception {
        if (amount.get() > MAX_AMOUNT) {
            Print.tcfo("隊列已經滿了!");
            return;
        }
        dataList.add(element);
        Print.tcfo(element + "");
        amount.incrementAndGet();

        //如果資料不一緻,抛出異常
        if (amount.get() != dataList.size()) {
            throw new Exception(amount + "!=" + dataList.size());
        }
    }

    /**
     * 從資料區取出一個元素
     */
    public T fetch() throws Exception {
        if (amount.get() <= 0) {
            Print.tcfo("隊列已經空了!");
            return null;
        }
        T element = dataList.remove(0);
        Print.tcfo(element + "");
        amount.decrementAndGet();
        //如果資料不一緻,抛出異常
        if (amount.get() != dataList.size()) {
            throw new Exception(amount + "!=" + dataList.size());
        }
        return element;
    }
}           

上面的代碼:

  • 在add()執行個體方法中,加入元素之前首先會對amount是否達到上限進行判斷,如果資料區滿了,則不能加入資料;
  • 在fetch()執行個體方法中,消耗元素前首先會對amount是否大于零進行判斷,如果資料區空了,就不能取出資料。

生産者-消費者模式中,資料緩沖區(DataBuffer)類以及相應的生産、消費動作(Action)是可變的,生産者類、消費者類的執行邏輯是不同的,

那本着“分離變與不變”的軟體設計基本原則,可以将生産者類、消費者類與具體的生産、消費Action解耦,

進而使得生産者類、消費者類的代碼在後續可以複用,生産者、消費者邏輯與對應Action解耦後的類結構圖如下:

美團一面:如何實作一個100W ops 生産者消費者程式?

通用Producer類組合了一個Callable類型的成員action執行個體,代表了生産資料所需要執行的實際動作,需要在構造Producer執行個體時傳入。

通用生産者類的代碼具體如下:

/**
 * 生産者任務的定義
 * Created by 尼恩@瘋狂創客圈.  源碼來自 《Java高并發核心程式設計 卷2 加強版》
 */
public class Producer implements Runnable {
    //生産的時間間隔,産一次等待的時間,預設為200ms
    public static final int PRODUCE_GAP = 200;

    //總次數
    // 注意:
    // 不是單個的次數
    // 是所有生産者的總的生産次數
    static final AtomicInteger TURN = new AtomicInteger(0);

    //生産者對象編号
    static final AtomicInteger PRODUCER_NO = new AtomicInteger(1);

    //生産者名稱
    String name = null;

    //生産的動作
    Callable action = null;

    int gap = PRODUCE_GAP;

    public Producer(Callable action, int gap) {
        this.action = action;
        this.gap = gap;
        if (this.gap <= 0) {
            this.gap = PRODUCE_GAP;
        }
        name = "生産者-" + PRODUCER_NO.incrementAndGet();

    }

    public Producer(Callable action) {
        this.action = action;
        this.gap = PRODUCE_GAP;
        name = "生産者-" + PRODUCER_NO.incrementAndGet();

    }

    @Override
    public void run() {
        while (true) {

            try {
                //執行生産動作
                Object out = action.call();
                //輸出生産的結果
                if (null != out) {
                    Print.tcfo("第" + TURN.get() + "輪生産:" + out);
                }
                //每一輪生産之後,稍微等待一下
                sleepMilliSeconds(gap);

                //增加生産輪次
                TURN.incrementAndGet();

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}           

通用Consumer類也組合了一個Callable類型的成員action執行個體,代表了消費者所需要執行的實際消耗動作,需要在構造Consumer執行個體時傳入。

通用Consumer類的代碼具體如下:

/**
 * 消費者任務的定義
 * Created by 尼恩@瘋狂創客圈.   源碼來自 《Java高并發核心程式設計 卷2 加強版》
 */
public class Consumer implements Runnable {

    //消費的時間間隔,預設等待100毫秒
    public static final int CONSUME_GAP = 100;


    //消費總次數
    // 注意:
    // 不是單個消費者的次數
    // 是所有消費者的總的消費次數
    static final AtomicInteger TURN = new AtomicInteger(0);

    //消費者對象編号
    static final AtomicInteger CONSUMER_NO = new AtomicInteger(1);

    //消費者名稱
    String name;

    //消費的動作
    Callable action = null;

    //消費一次等待的時間,預設為1000ms
    int gap = CONSUME_GAP;

    public Consumer(Callable action, int gap) {
        this.action = action;
        this.gap = gap;
        name = "消費者-" + CONSUMER_NO.incrementAndGet();

    }

    public Consumer(Callable action) {
        this.action = action;
        this.gap = gap;
        this.gap = CONSUME_GAP;
        name = "消費者-" + CONSUMER_NO.incrementAndGet();
    }

    @Override
    public void run() {
        while (true) {
            //增加消費次數
            TURN.incrementAndGet();
            try {
                //執行消費動作
                Object out = action.call();
                if (null != out) {
                    Print.tcfo("第" + TURN.get() + "輪消費:" + out);
                }
                //每一輪消費之後,稍微等待一下
                sleepMilliSeconds(gap);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}           

在完成了資料緩沖區類的定義、生産者類定義、消費者類的定義之後,

接下來定義一下資料緩沖區執行個體、生産動作和消費動作,具體的代碼如下:

// Created by 尼恩@瘋狂創客圈.   源碼來自 《Java高并發核心程式設計 卷2 加強版》

public class NotSafePetStore {
    //共享資料區,執行個體對象
    private static NotSafeDataBuffer<IGoods> notSafeDataBuffer = new NotSafeDataBuffer();

    //生産者執行的動作
    static Callable<IGoods> produceAction = () ->
    {
        //首先生成一個随機的商品
        IGoods goods = Goods.produceOne();
        //将商品加上共享資料區
        try {
            notSafeDataBuffer.add(goods);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return goods;
    };
    //消費者執行的動作
    static Callable<IGoods> consumerAction = () ->
    {
        // 從PetStore擷取商品
        IGoods goods = null;
        try {
            goods = notSafeDataBuffer.fetch();

        } catch (Exception e) {
            e.printStackTrace();
        }
        return goods;
    };
}           

利用以上NotSafePetStore類所定義的三個靜态成員,可以快速組裝出一個簡單的生産者-消費者模式的Java實作版本,具體的代碼如下:

// Created by 尼恩@瘋狂創客圈.   源碼來自 《Java高并發核心程式設計 卷2 加強版》

public static void main(String[] args) throws InterruptedException {
    System.setErr(System.out);

    // 同時并發執行的線程數
    final int THREAD_TOTAL = 20;
    //線程池,用于多線程模拟測試
    ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_TOTAL);
    for (int i = 0; i < 5; i++) {
        //生産者線程每生産一個商品,間隔500ms
        threadPool.submit(new Producer(produceAction, 500));
        //消費者線程每消費一個商品,間隔1500ms
        threadPool.submit(new Consumer(consumerAction, 1500));
    }
}           

在NotSafePetStore的main()方法中,利用for循環向線程池送出了5個生産者線程和5個消費者執行個體。

每個生産者執行個體生産一個商品間隔500毫秒;消費者執行個體每消費一個商品間隔1500毫秒;

也就是說,生産的速度大于消費的速度。

執行結果如下:

美團一面:如何實作一個100W ops 生産者消費者程式?

從以上異常可以看出,在向資料緩沖區進行元素的增加或者提取時,多個線程在并發執行對amount、dataList兩個成員操作時次序已經混亂,導緻了資料不一緻和線程安全問題。

▌版本2:使用 内置鎖實作的 生産者-消費者模式版本

前面說了,為了實作 線程安全的生産者-消費者模式實作,可以使用鎖來實作。

  • 比較低級的辦法是用内置鎖,也就是synchronized+wait+notify方式解決
  • 更加進階一點的版本,使用顯示鎖 比如信号量(Semaphore)、Blockingqueue
  • 當然,終極版本是無鎖程式設計 disruptor 的方式來解決

當然,咱們這個小節,僅僅關注有鎖版本,後面的小節,再無鎖程式設計。

咱們就挨個來看下。先看 内置鎖實作的 生産者-消費者模式版本

▌synchronized+wait()+notify() 的實作方式

使用synchronized解決生産者和消費者模式,首先我們需要找出臨界區資源和臨界區代碼塊。

首先,我們來看下什麼是臨界區資源。臨界區資源表示一種可以被多個線程使用的公共資源或共享資料,但是每一次隻能有一個線程使用它。一旦臨界區資源被占用,想使用該資源的其他線程則必須等待。在并發情況下,臨界區資源是受保護的對象。

接下來,我們再來看下什麼是臨界區代碼塊。臨界區代碼段(Critical Section)是每個線程中通路臨界資源的那段代碼,多個線程必須互斥地對臨界區資源進行通路。線程進入臨界區代碼段之前,必須在進入區申請資源,申請成功之後進行臨界區代碼段,執行完成之後釋放資源。臨界區代碼段的進入和退出如下:

美團一面:如何實作一個100W ops 生産者消費者程式?

最後,我們來看下竟态條件(Race Conditions)可能是由于在通路臨界區代碼段時沒有互斥地通路而導緻的特殊情況。

如果多個線程在臨界區代碼段的并發執行結果可能因為代碼的執行順序不同而出現不同的結果,我們就說這時在臨界區出現了競态條件問題。

那咱們回過頭來看生産者-消費者模式, 這個模式中, 生産者和消費者都需要操作DataBuffer(資料緩沖區)中,可以知道,臨界區代碼段在DataBuffer(資料緩沖區)中。

在資料緩沖區中,主要是資料進行操作, 那麼 由兩個臨界區資源,分别是amount和dataList。

由生産者-消費者模式的關鍵點我們可知, 生産者與生産者之間、消費者與消費者之間,對資料緩沖區的操作是并發進行的。

那麼添加資料和消耗資料是臨界區代碼,即其add()和fetch()兩個方法。

那麼建立建一個安全的資料緩存區類SafeDataBuffer類,在其add()和fetch()兩個執行個體方法的public聲明後面加上synchronized關鍵字即可。那線程安全的SafeDataBuffer類代碼如下:

// Created by 尼恩@瘋狂創客圈.   源碼來自 《Java高并發核心程式設計 卷2 加強版》

//共享資料區,類定義
class SafeDataBuffer<T> {
    public static final int MAX_AMOUNT = 10;
    private List<T> dataList = new LinkedList<>();

    //儲存數量
    private AtomicInteger amount = new AtomicInteger(0);

    /**
     * 向資料區增加一個元素
     */
    public synchronized void add(T element) throws Exception {
        if (amount.get() > MAX_AMOUNT) {
            Print.tcfo("隊列已經滿了!");
            return;
        }
        dataList.add(element);
        Print.tcfo(element + "");
        amount.incrementAndGet();

        //如果資料不一緻,抛出異常
        if (amount.get() != dataList.size()) {
            throw new Exception(amount + "!=" + dataList.size());
        }
    }

    /**
     * 從資料區取出一個元素
     */
    public synchronized T fetch() throws Exception {
        if (amount.get() <= 0) {
            Print.tcfo("隊列已經空了!");
            return null;
        }
        T element = dataList.remove(0);
        Print.tcfo(element + "");
        amount.decrementAndGet();
        //如果資料不一緻,抛出異常
        if (amount.get() != dataList.size()) {
            throw new Exception(amount + "!=" + dataList.size());
        }
        return element;
    }
}           

由于其他的代碼沒有發生變化,我們執行看下結果:

美團一面:如何實作一個100W ops 生産者消費者程式?

運作這個線程安全的生産者-消費者模式的實作版本,

等待一段時間,之前出現的amount數量和dataList的長度不相等的受檢異常沒有再抛出;

另外,之前出現的資料不一緻情況以及線程安全問題也被完全解除。

目前的SafeDataBuffer類中,還存在一個性能的問題:消費者每一輪消費,不管資料區是否為空,都需要進行資料區的詢問和判斷。

循環的代碼如下:

// Created by 尼恩@瘋狂創客圈.   源碼來自 《Java高并發核心程式設計 卷2 加強版》

/**
 * 從資料區取出一個元素
 */
public synchronized T fetch() throws Exception {
    if (amount.get() <= 0) {
        Print.tcfo("隊列已經空了!");
        return null;
    }
    ....
}           

當資料區空時(amount <= 0),消費者無法取出資料,但是仍然做一個無用的資料區詢問工作,白白耗費了CPU的時間片

對于生産者來說,也存在類似的無效輪詢問題。

當資料區滿時,生産者無法加入資料,這時候生産者執行add(T element)方法也白白耗費了CPU的時間片。// Created by 尼恩@瘋狂創客圈. 源碼來自 《Java高并發核心程式設計 卷2 加強版》 /** * 向資料區增加一個元素 */ public synchronized void add(T element) throws Exception { if (amount.get() > MAX_AMOUNT) { Print.tcfo("隊列已經滿了!"); return; } .... }

在生産者或者消費者空閑時節約CPU時間片,免去巨大的CPU資源浪費的方法是使用“等待-通知”方式進行生産者與消費者之間的線程通信。

具體實作:

(1)在資料區滿(amount.get() > MAX_AMOUNT)時,可以讓生産者等待,等到下次資料區中可以加入資料時,給生産者發通知,讓生産者喚醒。

(2)在資料區空(amount <= 0)時,可以讓消費者等待,等到下次資料區中可以取出資料時,消費者才能被喚醒。

(3)可以在消費者取出一個資料後,由消費者去喚醒等待的生産者。

(4)可以在生産者加入一個資料後,由生産者去喚醒等待的消費者。

Java語言中“等待-通知”方式的線程間的通信使用對象的wait()、notify()兩類方法來實作。

每個Java對象都有wait()、notify()兩類執行個體方法,并且wait()、notify()方法和對象的螢幕是緊密相關的。

Java對象中的wait()、notify()兩類方法就如同信号開關,用來進行等待方和通知方之間的互動。

對象的wait()方法的主要作用是讓目前線程阻塞并等待被喚醒。wait()方法與對象螢幕緊密相關,使用wait()方法時也一定需要放在同步塊中。

wait()方法的調用方法如下:

// Created by 尼恩@瘋狂創客圈.   源碼來自 《Java高并發核心程式設計 卷2 加強版》

synchronized(locko)
{
    //同步保護的代碼塊
    locko.wait();
        ...
}           

對象的notify()方法的主要作用是喚醒在等待的線程。notify()方法與對象螢幕緊密相關,使用notify()方法時也需要放在同步塊中。notify()方法的調用方法如下:

// Created by 尼恩@瘋狂創客圈.   源碼來自 《Java高并發核心程式設計 卷2 加強版》

synchronized(locko)
{
    //同步保護的代碼塊
    locko.notify();
        ...
}           

為了避免空輪詢導緻CPU時間片浪費,提高生産者-消費者實作版本的性能,接下來示範使用“等待-通知”的方式在生産者與消費者之間進行線程間通信。

使用“等待-通知”機制通信的生産者-消費者實作版本定義三個同步對象,具體如下:

(1)LOCK_OBJECT:用于臨界區同步,臨界區資源為資料緩沖區的dataList變量和amount 變量。

(2)NOT_FULL:用于資料緩沖區的未滿條件等待和通知。生産者在添加元素前,需要判斷資料區是否已滿,如果是,生産者進入NOT_FULL的同步區去等待被通知,隻要消費者消耗一個元素,資料區就是未滿的,進入NOT_FULL的同步區發送通知。

(3)NOT_EMPTY:用于資料緩沖區的非空條件等待和通知。消費者在消耗元素前需要判斷資料區是否已空,如果是,消費者進入NOT_EMPTY的同步區等待被通知,隻要生産者添加一個元素,資料區就是非空的,生産者會進入NOT_EMPTY的同步區發送通知。

具體代碼如下:

// Created by 尼恩@瘋狂創客圈.   源碼來自 《Java高并發核心程式設計 卷2 加強版》

public class CommunicatePetStore {

    public static final int MAX_AMOUNT = 10; //資料區長度

    //共享資料區,類定義
    static class DateBuffer<T> {
        //儲存資料
        private List<T> dataList = new LinkedList<>();
        //儲存數量
        private volatile int amount = 0;

        private final Object LOCK_OBJECT = new Object();
        private final Object NOT_FULL = new Object();
        private final Object NOT_EMPTY = new Object();

        // 向資料區增加一個元素
        public void add(T element) throws Exception {
            synchronized (NOT_FULL) {
                while (amount >= MAX_AMOUNT) {
                    Print.tcfo("隊列已經滿了!");
                    //等待未滿通知
                    NOT_FULL.wait();
                }
            }
            synchronized (LOCK_OBJECT) {

                if (amount < MAX_AMOUNT) { // 加上雙重檢查,模拟雙檢鎖在單例模式中應用
                    dataList.add(element);
                    amount++;
                }
            }
            synchronized (NOT_EMPTY) {
                //發送未空通知
                NOT_EMPTY.notify();
            }
        }

        /**
         * 從資料區取出一個商品
         */
        public T fetch() throws Exception {
            synchronized (NOT_EMPTY) {
                while (amount <= 0) {
                    Print.tcfo("隊列已經空了!");
                    //等待未空通知
                    NOT_EMPTY.wait();
                }
            }

            T element = null;
            synchronized (LOCK_OBJECT) {
                if (amount > 0) {  // 加上雙重檢查,模拟雙檢鎖在單例模式中應用
                    element = dataList.remove(0);
                    amount--;
                }
            }

            synchronized (NOT_FULL) {
                //發送未滿通知
                NOT_FULL.notify();
            }
            return element;
        }
    }
}           

那以上就是使用synchronized+wait+notify實作的線程安全的生産者-消費者模式。

雖然線程安全問題順利解決,但是以上的解決方式使用了SafeDataBuffer的執行個體的對象鎖作為同步鎖,這樣一來,所有的生産、消費動作在執行過程中都需要搶占同一個同步鎖,最終的結果是所有的生産、消費動作都被串行化了。

而且在鎖競争激烈的情況下,synchronized鎖會膨脹更新為重量級鎖,嚴重的影響的程式的性能。

尼恩提示:

synchronized鎖的膨脹底層原理,非常重要, 這部分内容可以閱讀 《Java高并發核心程式設計 卷2 加強版》。

美團一面:如何實作一個100W ops 生産者消費者程式?

這裡不做贅述。

▌版本3:使用信号量實作(Semaphore)

為了實作 線程安全的生産者-消費者模式實作,可以使用鎖來實作。

  • 比較低級的辦法是用内置鎖,也就是synchronized+wait+notify方式解決
  • 更加進階一點的版本,使用顯示鎖 比如信号量(Semaphore)、Blockingqueue
  • 當然,終極版本是無鎖程式設計 disruptor 的方式來解決,

當然,咱們這個小節,僅僅關注有鎖版本,後面的小節,再無鎖程式設計。

咱們就挨個來看下。那接下我們看下顯示鎖 的 核心成員之一 信号量(semaphore)實作線程安全的生産者-消費者模式。

▌什麼是信号量?

信号量是Dijkstra在1965年提出的一種方法,它使用一個整型變量來累計喚醒次數,供以後使用。

在他的建議中引入了一個新的變量類型,稱作信号量(semaphore)。

一個信号量的取值可以為0(表示沒有儲存下來的喚醒操作)或者正值(表示有一個或多個喚醒操作)。

Dijkstra建議設立兩種操作:down和up(分别為一般化後的sleep和wakeup)。

對一個信号量執行down操作,則是檢查其值是否大于0。若該值大于0,則将其減1(即用掉一個儲存的喚醒信号)并繼續;若該值為0,則程序将睡眠,而且此時down操作并未結束。

原子操作:所謂原子操作,是指一組相關聯的操作要麼都不間斷地執行,要麼不執行。

檢查數值、修改變量值以及可能發生的睡眠操作,均作為一個單一的、不可分割的原子操作完成。保證一旦一個信号量操作開始,則在該操作完成或阻塞之前,其他程序均不允許通路該信号量。

這種原子性對于解決同步問題和避免競争條件是絕對必要的。

up操作對信号量的值增1。

如果一個或多個程序在該信号量上睡眠,無法完成一個先前的down操作,則由系統選擇其中的一個(如随機挑選)并允許該程序完成它的down操作。

于是,對一個有程序在其上睡眠的信号量執行一次up操作後,該信号量的值仍舊是0,但在其上睡眠的程序卻少了一個。信号量的值增加1和喚醒一個程序同樣也是不可分割的,不會有某個程序因執行up而阻塞,正如前面的模型中不會有程序因執行wakeup而阻塞一樣。

Dijkstra論文中的信号量含義

在Dijkstra原來的論文中,他分别使用名稱P和V而不是down和up,

荷蘭語中,Proberen的意思是嘗試,Verhogen的含義是增加或升高。

從實體上說明信号量的P、V操作的含義。

P(S)表示申請一個資源,S.value>0表示有資源可用,其值為資源的數目;S.value=0表示無資源可用;S.value<0, 則|S.value|表示S等待隊列中的程序個數。

V(S)表示釋放一個資源,信号量的初值應該大于等于0。P操作相當于“等待一個信号”,而V操作相當于“發送一個信号”,在實作同步過程中,V操作相當于發送一個信号說合作者已經完成了某項任務,在實作互斥過程中,V操作相當于發送一個信号說臨界資源可用了。

實際上,在實作互斥時,P、V操作相當于申請資源和釋放資源。

Dijkstra的解決方案使用了三個信号量:

一個稱為full,用來記錄充滿緩沖槽數目,

一個稱為empty,記錄空的緩沖槽總數;

一個稱為mutex,用來確定生産者和消費者不會同時通路緩沖區。

full的初值為0,empty的初值為緩沖區中槽的數目,mutex的初值為1。供兩個或多個程序使用的信号量,其初值為1,保證同時隻有一個程序可以進入臨界區,稱作二進制信号量。如果每個程序在進入臨界區前都執行down操作,并在剛剛退出時執行一個up操作,就能夠實作互斥。

信号量的另一種用途是用于實作同步,信号量full和empty用來保證某種事件的順序發生或不發生。在本例中,它們保證當緩沖區滿的時候生産者停止運作,以及當緩沖區空的時候消費者停止運作。

對于無界緩沖區的生産者—消費者問題,兩個程序共享一個不限大小的公共緩沖區。

由于是無界緩沖區(倉庫是無界限制的),即生産者不用關心倉庫是否滿,隻管往裡面生産東西,但是消費者還是要關心倉庫是否空。是以生産者不會因得不到緩沖區而被阻塞,不需要對空緩沖區進行管理,可以去掉在有界緩沖區中用來管理空緩沖區的信号量及其PV操作。

JUC中的信号量 Semaphore

在JUC中的信号量 Semaphore屬于共享鎖。Semaphore可以用來控制在同一時刻通路共享資源的線程數量,通過協調各個線程以保證共享資源的合理使用。Semaphore維護了一組虛拟許可,其數量可以通過構造器的參數指定。線程在通路共享資源前必須使用Semaphore的acquire()方法獲得許可,如果許可數量為0,該線程就一直阻塞。線程通路完成資源後,必須使用Semaphore的release()方法釋放許可。更形象的說法是:Semaphore是一個是許可管理器。

JUC包中Semaphore類的主要方法大緻如下:

Semaphore類的主要方法大緻如下:

(1) Semaphore(permits):構造一個Semaphore執行個體,初始化其管理的許可數量為permits參數值。

(2) Semaphore(permits,fair):構造一個Semaphore執行個體,初始化其管理的許可數量為permits參數值,以及是否以公平模式(fair參數是否為true)進行許可的發放。

Semaphore和ReentrantLock類似,Semaphore發放許可時有兩種模式:公平模式和非公平模式,預設情況下使用非公平模式。

(3) availablePermits():擷取Semaphore對象可用的許可數量。

(4) acquire():目前線程嘗試擷取Semaphore對象的一個許可。此過程是阻塞的,線程會一直等待Semaphore發放一個許可,直到發生以下任意一件事:

  • 目前線程擷取了一個可用的許可。
  • 目前線程被中斷,就會抛出InterruptedException異常,并停止等待,繼續往下執行。

(5) acquire(permits) :目前線程嘗試阻塞地擷取permits個許可。此過程是阻塞的,線程會一直等待Semaphore發放permits個許可。如果沒有足夠的許可而目前線程被中斷,就會抛出InterruptedException異常并終止阻塞。

(6) acquireUninterruptibly():目前線程嘗試阻塞地擷取一個許可,阻塞的過程不可中斷,直到成功擷取一個許可。

(7) acquireUninterruptibly(permits):目前線程嘗試阻塞地擷取permits個許可,阻塞的過程不可中斷,直到成功擷取permits個許可。

(8) tryAcquire():目前線程嘗試擷取一個許可。此過程是非阻塞的,它隻是進行一次嘗試,會立即傳回。如果目前線程成功擷取了一個許可,就傳回true;如果目前線程沒有獲得許可,就傳回false

(9) tryAcquire(permits):目前線程嘗試擷取permits個許可。此過程是非阻塞的,它隻是進行一次嘗試,會立即傳回。如果目前線程成功擷取了permits個許可,就傳回true;如果目前線程沒有獲得permits個許可,就傳回false。

(10) tryAcquire(timeout,TimeUnit):限時擷取一個許可。此過程是阻塞的,會一直等待許可,直到發生以下任意一件事:

  • 目前線程擷取了一個許可,則會停止等待,繼續執行,并傳回true。
  • 目前線程等待timeout後逾時,則會停止等待,繼續執行,并傳回false。
  • 目前線程在timeout時間内被中斷,則會抛出InterruptedException異常,并停止等待,繼續執行。

(11) tryAcquire(permits,timeout,TimeUnit):與tryAcquire(timeout,TimeUnit)方法在邏輯上基本相同,不同之處在于:在擷取許可的數量上不同,此方法用于擷取permits個許可。

(12) release():目前線程釋放一個可用的許可。

(13) release(permits):目前線程釋放permits個可用的許可。

(14) drainPermits():目前線程獲得剩餘的所有可用許可。

(15) hasQueuedThreads():判斷目前Semaphore對象上是否存在正在等待許可的線程。

(16) getQueueLength():擷取目前Semaphore對象上正在等待許可的線程數量。

▌使用Semaphore實作的生産者-消費者模式

那接下來我們就看下使用Semaphore實作的生産者-消費者模式的代碼,主要是針對臨界區資源和臨界區代碼進行修改,具體修改如下:

// Created by 尼恩@瘋狂創客圈.   源碼來自 《Java高并發核心程式設計 卷2 加強版》

public class SemaphorePetStore {
    public static final int MAX_AMOUNT = 10; //資料區長度

    //共享資料區,類定義
    static class DateBuffer<T> {
        //儲存資料
        private LinkedBlockingDeque<T> dataList = new LinkedBlockingDeque<>(MAX_AMOUNT);

        //儲存數量
        private volatile int amount = 0;
        // 每次處理的次數
        private static final int times = 100;

        //信号量辨別
        private static AtomicInteger signal = new AtomicInteger(0);


        // 向資料區增加一個元素
        public void add(T element) throws Exception {
            while (amount < times) {
                if (signal.get() >= 0 && dataList.size() == 0) {
                    synchronized (signal) {
                        //生産者: P操作 -1
                        Print.fo("生産者: P操作 -1 ");
                        signal.incrementAndGet();
                        Print.fo("生産者: 生産,放入一個對象");
                        dataList.add(element);
                        amount++;
                        //生産者: P操作 -1
                        Print.fo("生産者: V操作 +1");
                        signal.decrementAndGet();
                        Print.fo("生産者: 通知消費者,生産者阻塞");
                        signal.notifyAll();
                        // 阻塞
                        signal.wait();
                        ;
                    }
                } else {
                    Thread.sleep(10);
                }
            }
        }

        /**
         * 從資料區取出一個商品
         */
        public T fetch() throws Exception {
            T element = null;
            while (amount < times) {
                if (signal.get() <= 0 && dataList.size() > 0) {
                    synchronized (signal) {
                        //消費者: P操作 -1
                        Print.fo("消費者: P操作 -1 ");
                        signal.decrementAndGet();
                        Print.fo("消費者: 消費,取出一個對象");
                        element = dataList.take();
                        amount--;
                        //生産者: P操作 -1
                        Print.fo("消費者: V操作 +1");
                        signal.incrementAndGet();
                        Print.fo("消費者: 通知生産者,消費者阻塞");
                        signal.notifyAll();
                        // 阻塞
                        signal.wait();
                        ;
                    }
                } else {
                    Thread.sleep(10);
                }
            }
            return element;
        }
    }
}           

由于其他代碼未做更改,小夥伴可參考前面的線程不安全的生産者類、消費者類以及組裝生産者-消費者模式的實作。

部分執行結果如下:

美團一面:如何實作一個100W ops 生産者消費者程式?

▌版本4:使用 Blockingqueue 實作

回顧前面: 為了實作 線程安全的生産者-消費者模式實作,可以使用鎖來實作。

  • 比較低級的辦法是用内置鎖,也就是synchronized+wait+notify方式解決
  • 更加進階一點的版本,使用顯示鎖 比如信号量(Semaphore)、Blockingqueue
  • 當然,終極版本是無鎖程式設計 disruptor 的方式來解決,

當然,咱們這個小節,僅僅關注有鎖版本,後面的小節,再無鎖程式設計。

咱們就挨個來看下。那接下我們看下基于 顯示鎖 實作的核心結構 Blockingqueue實作線程安全的生産者-消費者模式。

在多線程環境中,通過BlockingQueue(阻塞隊列)可以很容易地實作多線程之間資料共享和通信。

阻塞隊列與普通隊列(ArrayDeque等)之間的最大不同點在于阻塞隊列提供了阻塞式的添加和删除方法。

(1)阻塞添加

所謂的阻塞添加是指當阻塞隊列元素已滿時,隊列會阻塞添加元素的線程,直隊列元素不滿時,才重新喚醒線程執行元素添加操作。

(2)阻塞删除

阻塞删除是指在隊列元素為空時,删除隊列元素的線程将被阻塞,直到隊列不為空時,才重新喚醒删除線程再執行删除操作。

BlockingQueue的實作類有ArrayBlockingQueue、DelayQueue、LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,具體如下:

美團一面:如何實作一個100W ops 生産者消費者程式?

ArrayBlockingQueue是一個常用的阻塞隊列,是基于數組實作的,其内部使用一個定長數組存儲元素。除了一個定長數組外,ArrayBlockingQueue内部還儲存着兩個整型變量,分别辨別隊列的頭部和尾部在數組中的位置。ArrayBlockingQueue的添加和删除操作都是共用同一個鎖對象,由此意味着添加和删除無法并行運作,這一點不同于LinkedBlockingQueue。ArrayBlockingQueue完全可以将添加和删除的鎖分離,進而添加和删除操作完全并行。Doug Lea之是以沒有這樣去做,是因為ArrayBlockingQueue的資料寫入和擷取操作已經足夠輕巧。

LinkedBlockingQueue是基于連結清單的阻塞隊列,其内部也維持着一個資料緩沖隊列(該隊列由一個連結清單構成)。LinkedBlockingQueue對于添加和删除元素分别采用了獨立的鎖來控制資料同步,這也意味着在高并發的情況下生産者和消費者可以并行地操作隊列中的資料,以此來提高整個隊列的并發性能。

DelayQueue中的元素隻有當其指定的延遲時間到了,才能從隊列中擷取到該元素。DelayQueue是一個沒有大小限制的隊列,是以往隊列中添加資料的操作(生産者)永遠不會被阻塞,而隻有擷取資料的操作(消費者)才會被阻塞。DelayQueue使用場景較少,但是相當巧妙,常見的例子比如使用一個DelayQueue來管理一個逾時未響應的連接配接隊列。

基于優先級的阻塞隊列和DelayQueue類似,PriorityBlockingQueue并不會阻塞資料生産者,而隻會在沒有可消費的資料時,阻塞資料的消費者。在使用的時候要特别注意,生産者生産資料的速度絕對不能快于消費者消費資料的速度,否則時間一長,會最終耗盡所有的可用堆記憶體空間。

相對于有緩沖的阻塞隊列(如LinkedBlockingQueue)來說,SynchronousQueue少了中間緩沖區(如倉庫)的環節。如果有倉庫,生産者直接把商品批發給倉庫,不需要關心倉庫最終會将這些商品發給哪些消費者,由于倉庫可以中轉部分商品,總體來說有倉庫進行生産和消費的吞吐量高一些。

反過來說,又因為倉庫的引入,使得商品從生産者到消費者中間增加了額外的交易環節,單個商品的及時響應性能可能會降低,是以對單個消息的響應要求高的場景可以使用SynchronousQueue。聲明一個SynchronousQueue有兩種不同的方式:公平模式和非公平模式。公平模式的SynchronousQueue會采用公平鎖,并配合一個FIFO隊列來阻塞多餘的生産者和消費者,進而體系整體的公平政策。非公平模式(預設情況)的SynchronousQueue采用非公平鎖,同時配合一個LIFO堆棧(TransferStack内部執行個體)來管理多餘的生産者和消費者。對于後一種模式,如果生産者和消費者的處理速度有差距,則很容易出現線程饑渴的情況,即可能出現某些生産者或者消費者的資料永遠都得不到處理。

了解完阻塞隊列的基本方法、主要類型之後,下面通過ArrayBlockingQueue隊列實作一個生産者-消費者的案例。

具體的代碼在前面的生産者和消費者實作基礎上進行疊代——Consumer(消費者)和Producer(生産者)通過ArrayBlockingQueue隊列擷取和添加元素。其中,消費者調用了take()方法擷取元素,當隊列沒有元素就阻塞;生産者調用put()方法添加元素,當隊列滿時就阻塞。通過這種方式便實作生産者-消費者模式,比直接使用等待喚醒機制或者Condition條件隊列更加簡單。基于ArrayBlockingQueue的生産者和消費者實作版本具體的UML類圖如下

美團一面:如何實作一個100W ops 生産者消費者程式?

出于“分離變與不變”的原則,此版本的Producer(生産者)、Consumer(消費者)等的邏輯不用變化,直接複用前面原的代碼即可。此版本DataBuffer(共享資料區)需要變化,使用一個ArrayBlockingQueue用于緩存資料,具體的代碼如下:

// Created by 尼恩@瘋狂創客圈.   源碼來自 《Java高并發核心程式設計 卷2 加強版》

public class ArrayBlockingQueuePetStore {

    public static final int MAX_AMOUNT = 10; //資料區長度


    //共享資料區,類定義
    static class DateBuffer<T> {
        //儲存資料
        private ArrayBlockingQueue<T> dataList = new ArrayBlockingQueue<>(MAX_AMOUNT);


        // 向資料區增加一個元素
        public void add(T element) throws Exception {
            dataList.put(element);
        }

        /**
         * 從資料區取出一個商品
         */
        public T fetch() throws Exception {
            return dataList.take();
        }
    }
}           

運作程式,部分執行結果如下:

美團一面:如何實作一個100W ops 生産者消費者程式?

▌鎖的代價

鎖提供了互斥,并能夠確定變化能夠以一個确定的順序讓其它的線程看見。

鎖其實是很昂貴的,因為他們在競争的時候需要進行仲裁。這個仲裁會涉及到作業系統的上下文切換,作業系統會挂起所有在等待這把鎖的線程,直到鎖持有者釋放該鎖。

上下文切換期間,執行線程會喪失對作業系統的控制,導緻執行線程的執行上下文丢失之前緩存的資料和指令集,這會給現代處理器帶來嚴重的性能損耗。

當然效率更高的使用者态鎖是另一種選擇,但使用者鎖隻有在沒有競争的時候才真正會帶來益處。

注:因為使用者态的鎖往往是通過自旋鎖來實作(或者帶休眠的自旋鎖),而自旋在競争激烈的時候開銷是很大的(一直在消耗CPU資源)。

網上有小夥伴為了進行效果驗證,寫了一個很簡單程式,就是調用一個循環5億次遞增操作的函數。

這個java函數在單線程,2.4G Intel Westmere EP的CPU上隻需要300ms。

一旦引入鎖,即使沒有發生競争,程式的執行時間也會發生顯著的增加。

循環5億次遞增操作 實驗結果如下:

Method Time (ms)
Single thread 300
Single thread with lock 10,000
Two threads with lock 224,000
Single thread with CAS 5,700
Two threads with CAS 30,000
Single thread with volatile write 4,700

▌CAS的代價

無鎖程式設計 場景中, 線程之間的協調 主要使用 CAS的機制。 但是 從上面的實驗看到, CAS也是有代價的。

為啥呢?

CAS依賴于處理器的支援,當然大部分現代處理器都支援。

CAS相對于鎖是非常高效的,因為它不需要涉及核心上下文切換進行仲裁。

但cas并不是免費的,處理器需要對CPU指令pipeline加鎖以確定原子性,

并且cas隻保證原子性,不保證可見性,是以cas一般和 volatile記憶體屏障一起使用,以確定對其他線程的可見性。

尼恩備注:cas+ volatile記憶體屏障 的底層原理,非常重要。

如果大家對 cas+ volatile記憶體屏障 的知識不清楚, 請細緻閱讀尼恩 《Java高并發核心程式設計 卷2 》, 這本書做了非常詳細的介紹。

▌版本5:無鎖實作生産者-消費者模式版本

回顧前面: 為了實作 線程安全的生産者-消費者模式實作,可以使用鎖來實作。

  • 比較低級的辦法是用内置鎖,也就是synchronized+wait+notify方式解決
  • 更加進階一點的版本,使用顯示鎖 比如信号量(Semaphore)、Blockingqueue
  • 當然,終極版本是無鎖程式設計 disruptor 的方式來解決,

咱們就挨個來看下無鎖版本。

為了提升性能,需要使用 CAS實作生産者、消費者。

從實操的角度來說,CAS的一個問題就是太複雜了,本來用鎖進行并發程式設計就已經很頭疼了,用CAS來實作複雜邏輯就更頭痛了。

但有一個好消息是,目前有一個現成的Disruptor架構,它已經幫助我們實作了這一個功能。

▌Disruptor架構的簡單介紹

Disruptor架構有着1000W級ops性能,非常複雜的底層原理,光介紹清楚這個架構,尼恩的 《Disruptor 紅寶書》 PDF 電子書就有100多頁。

具體的Disruptor架構底層原理,請參見尼恩的 《Disruptor 紅寶書》 PDF ,和《100Wqps日志平台實操》視訊。

這裡僅僅對這個 架構進行簡單介紹。

Disruptor架構是由LMAX公司開發的一款高效的無鎖記憶體隊列。

它使用無鎖的方式實作了一個環形隊列(RingBuffer),非常适合實作生産者-消費者模式,比如事件和消息的釋出。

目前,包括Apache Storm、Camel、Log4j 2在内的很多知名項目都應用了Disruptor以擷取高性能。

Disruptor架構别出心裁地使用了環形隊列來代替普通線形隊列,這個環形隊列内部實作為一個普通的數組。

對于一般的隊列,勢必要提供隊列同步head和尾部tail兩個指針,用于出隊和入隊,這樣無疑增加了線程協作的複雜度。但如果隊列是環形的,則隻需要對外提供一個目前位置cursor,利用這個指針既可以進行入隊操作,也可以進行出隊操作。由于環形隊列的緣故,隊列的總大小必須事先指定,不能動态擴充。為了能夠快速從一個序列(sequence)對應到數組的實際位置(每次有元素入隊,序列就加1),Disruptor架構要求我們必須将數組的大小設定為2的整數次方。這樣通過sequence &(queueSize-1)就能立即定位到實際的元素位置index,這比取餘(%)操作快得多。

如果queueSize是2的整數次幂,則這個數字的二進制表示必然是10、100、1000、10000等形式。是以,queueSize-1的二進制則是一個全1的數字。是以它可以将sequence限定在queueSize-1範圍内,并且不會有任何一位是浪費的。

RingBuffer的結構如下:

美團一面:如何實作一個100W ops 生産者消費者程式?

其實質隻是一個普通的數組,隻是當放置資料填充滿隊列(即到達2^n-1位置)之後,再填充資料,就會從0開始,覆寫之前的資料,于是就相當于一個環。

RingBuffer的指針(Sequence)屬于一個volatile變量,同時也是我們能夠不用鎖操作就能實作Disruptor的原因之一,而且通過緩存行補充,避免僞共享問題。 該所謂指針是通過一直自增的方式來擷取下一個可寫或者可讀資料,該資料是Long類型,不用擔心會爆掉。有人計算過: long的範圍最大可以達到9223372036854775807,一年365 * 24 * 60 * 60 = 31536000秒,每秒産生1W條資料,也可以使用292年。

Disruptor 不像傳統的隊列,分為一個隊頭指針和一個隊尾指針,而是隻有一個角标(上面的seq),

在Disruptor中生産者分為單生産者和多生産者,在枚舉類ProducerType中定義單生産(SINGLE)和多生産(MULTI)。而消費者并沒有區分。

單生産者情況下,就是普通的生産者向RingBuffer中放置資料,消費者擷取最大可消費的位置,并進行消費。單生産者線程寫資料的流程比較簡單,具體如下:

(1)申請寫入m個元素;

(2)若是有m個元素可以入,則傳回最大的序列号。這兒主要判斷是否會覆寫未讀的元素;

(3)若是傳回的正确,則生産者開始寫入元素。

美團一面:如何實作一個100W ops 生産者消費者程式?

采用多生産者時,會遇到“如何防止多個線程重複寫同一個元素”的問題。Disruptor的解決方法是,每個線程擷取不同的一段數組空間進行操作。這個通過CAS很容易達到。隻需要在配置設定元素的時候,通過CAS判斷一下這段空間是否已經配置設定出去即可。

但是又會碰到新問題:如何防止讀取的時候,讀到還未寫的元素。那麼Disruptor引入了一個跟RingBuffer同樣大小的Buffer,稱為AvailableBuffer。當某個位置寫入成功的時候,便把availble Buffer相應的位置置位,标記為寫入成功。讀取的時候,會周遊available Buffer,來判斷元素是否已經就緒。多生産者流程如下:

(1)申請寫入m個元素;

(2)若是有m個元素可以寫入,則傳回最大的序列号。每個生産者會被配置設定一段獨享的空間;

(3)生産者寫入元素,寫入元素的同時設定available Buffer裡面相應的位置,以标記自己哪些位置是已經寫入成功的。

美團一面:如何實作一個100W ops 生産者消費者程式?

那麼生産者和消費者模式在RingBuffer上的情況如下

美團一面:如何實作一個100W ops 生産者消費者程式?

生産者向緩沖區中寫入資料,而消費者從中讀取資料。生産者寫入資料時,使用CAS操作,消費者讀取資料時,為了防止多個消費者處理同一個資料,也使用CAS操作進行資料保護。

▌ArrayBlockingQueue 和 Disruptor 的性能PK

參考文獻中,有小夥伴選取了Doug Lea的ArrayBlockingQueue的實作作為參考目标進行測試,ArrayBlockingQueue是所有有界隊列中性能最好的,測試是按照阻塞的方式進行的。

美團一面:如何實作一個100W ops 生産者消費者程式?
美團一面:如何實作一個100W ops 生産者消費者程式?
美團一面:如何實作一個100W ops 生産者消費者程式?
美團一面:如何實作一個100W ops 生産者消費者程式?
美團一面:如何實作一個100W ops 生産者消費者程式?

下表展示了總共處理5億條消息時每秒吞吐量的性能測試結果,

測試環境為:沒有HT的1.6.0_25 64-bit Sun JVM, Windows 7, Intel Core i7 860 @ 2.8 GHz ,以及Intel Core i7-2720QM, Ubuntu 11.04。

我們取了最好的前三條結果,這個結果使用于任何JVM運作環境,表中顯示的結果并不是我們發現最好的結果。

Nehalem 2.8Ghz – Windows 7 SP1 64-bit Sandy Bridge 2.2Ghz – Linux 2.6.38 64-bit
ABQ Disruptor ABQ Disruptor
Unicast: 1P – 1C 5,339,256 25,998,336 4,057,453 22,381,378
Pipeline: 1P – 3C 2,128,918 16,806,157 2,006,903 15,857,913
Sequencer: 3P – 1C 5,539,531 13,403,268 2,056,118 14,540,519
Multicast: 1P – 3C 1,077,384 9,377,871 260,733 10,860,121
Diamond: 1P – 3C 2,113,941 16,143,613 2,082,725 15,295,197

無論在linux 環境在是在windows 環境, 無論 是多個生産者還是單個生産者, Disruptor 的性能穩穩的都在 1000W ops 以上。

▌基于Disruptor的實作100W級ops+ 生産者和消費者設計

基于Disruptor的高性能生産者和消費者模式的類圖如下:

美團一面:如何實作一個100W ops 生産者消費者程式?

MsgEven 是存放資料對象的載體,具體代碼如下:

public class MsgEven {

    private IGoods goods;

    public IGoods getGoods() {
        return goods;
    }

    public void setGoods(IGoods goods) {
        this.goods = goods;
    }
}           

消費者的作用是讀取資料進行處理。

這裡,資料的讀取已經由Disruptor架構進行封裝了,onEvent()方法為架構的回調方法。

是以,隻需要簡單地進行資料處理即可。

具體代碼如下:

public class Consumer  implements EventHandler<MsgEven> {

    //消費的時間間隔,預設等待100毫秒
    public static final int CONSUME_GAP = 100;


    //消費者對象編号
    static final AtomicInteger CONSUMER_NO = new AtomicInteger(1);

    //消費者名稱
    String name;


    public Consumer() {
        name = "消費者-" + CONSUMER_NO.incrementAndGet();
    }


    @Override
    public void onEvent(MsgEven msgEven, long sequence, boolean endOfBatch)  {
  
        Print.tcfo("消費者中:"+sequence+"商品資訊:"+msgEven.getGoods());
    }
}           

需要一個産生MsgEven 對象的工廠類GoodsFactory。

它會在Disruptor架構系統初始化時,構造所有的緩沖區中的對象執行個體,具體代碼如下:

public class GoodsFactory implements EventFactory<MsgEven> {
    @Override
    public MsgEven newInstance() {
        return new MsgEven();
    }
}           

生産者需要一個RingBuffer的引用,也就是環形緩沖區。

它有一個重要的方法add()将産生的資料推入緩沖區。方法add()接收一個IGood對象。

add()方法的功能就是将傳入的IGood對象中的資料提取出來,并裝載到環形緩沖區中。

具體代碼如下:

public class Produer {

    //生産者對象編号
    static final AtomicInteger PRODUCER_NO = new AtomicInteger(1);

    //生産者名稱
    String name = null;



    private  final RingBuffer<MsgEven> ringBuffer ;

    public Produer(RingBuffer<MsgEven> ringBuffer) {
        name = "生産者-" + PRODUCER_NO.incrementAndGet();
        this.ringBuffer = ringBuffer;
    }

    public  void add(IGoods goods){
        // 1.ringBuffer 事件隊列 下一個槽
        long sequence = ringBuffer.next();
        try {
            //2.取出空的事件隊列
            MsgEven    msgEven= ringBuffer.get(sequence);
            msgEven.setGoods(goods);
            //3.擷取事件隊列傳遞的資料
            Print.cfo("生産者名稱:"+name+",生産商品:"+goods.toString());
        }finally {
            //4.釋出事件
            ringBuffer.publish(sequence);
        }
    }
}           

我們的生産者、消費者和資料都已經準備就緒,隻差一個統籌規劃的主函數将所有的内容整合起來。具體代碼如下:

public class DisruptorPetStore {
    public static void main(String[] args) throws InterruptedException {
        // 1.建立工廠
        GoodsFactory dateBufferFactory= new GoodsFactory();
        //2.建立ringBuffer 大小,大小一定要是2的N次方
        int bufferSize=1024*1024;

        //3.建立Disruptor
        Disruptor<MsgEven>  disruptor = new Disruptor<MsgEven>(dateBufferFactory,bufferSize, Executors.defaultThreadFactory(),ProducerType.MULTI,new BlockingWaitStrategy());
        //4.設定事件處理器 即消費者
        disruptor.handleEventsWith(new Consumer());
        // 5.啟動
        disruptor.start();

        // 6.建立RingBuffer容器
        RingBuffer<MsgEven> ringBuffer= disruptor.getRingBuffer();

        //7.建立生産者
        Produer produer = new Produer(ringBuffer);

        for (int l=0;true;l++){
            IGoods goods = Goods.produceOne();
            produer.add(goods);
            Thread.sleep(100);

        }
    }
}           

部分執行結果如下:

美團一面:如何實作一個100W ops 生産者消費者程式?

學習生産者-消費者模式學習的思想, 消息隊列、緩存中也有生産者-消費者模式的思想。

▌尼恩總結:

生産者、消費者,是一道高頻的面試題,非常高頻,也非常考驗水準。

如果按照上面的套路去作答, 無論是美團,還是華為,或者其他的大廠面試官,都會對你 獻上膝蓋。

如果面試過程中, 遇到什麼問題,可以來 《技術自由圈》 社群交流。

▌作者介紹:

本文1作: 唐歡,資深架構師, 《Java 高并發核心程式設計 加強版 》作者之1 。

本文2作: 尼恩,40歲資深老架構師, 《Java 高并發核心程式設計 加強版 卷1、卷2、卷3》創世作者, 著名部落客 。 《K8S學習聖經》《Docker學習聖經》等11個PDF 聖經的作者。

美團一面:如何實作一個100W ops 生産者消費者程式?

▌相關的面試題:

最後,尼恩再給大家來幾道相關的面試題

▌聊聊:如何寫代碼來解決生産者消費者問題?

在現實中你解決的許多線程問題都屬于生産者消費者模型,就是一個線程生産任務供其它線程進行消費,你必須知道怎麼進行線程間通信來解決這個問題。

比較低級的辦法是用wait和notify來解決這個問題,比較贊的辦法是用Semaphore 或者 BlockingQueue來實作生産者消費者模型。

▌聊聊:什麼是竟态條件?

在大多數實際的多線程應用中,兩個或兩個以上的線程需要共享對同一資料的存取。

如果i線程存取相同的對象,并且每一個線程都調用了一個修改該對象狀态的方法,将會發生什麼呢?

可以想象,線程彼此踩了對方的腳。

根據線程通路資料的次序,可能會産生訛誤的對象。這樣的情況通常稱為競争條件。

▌聊聊:Java中Semaphore是什麼?

Java中的Semaphore是一種新的同步類,它是一個計數信号。

從概念上講,從概念上講,信号量維護了一個許可集合。

如有必要,在許可可用前會阻塞每一個 acquire(),然後再擷取該許可。

每個 release()添加一個許可,進而可能釋放一個正在阻塞的擷取者。

但是,不使用實際的許可對象,Semaphore隻對可用許可的号碼進行計數,并采取相應的行動。

信号量常常用于多線程的代碼中,比如資料庫連接配接池。

▌聊聊:java中wait()的核心原理是什麼?

1)當線程調用了locko(某個同步鎖對象)的wait()方法後,JVM會将目前線程加入locko螢幕的WaitSet(等待集),等待被其他線程喚醒。

2)目前線程會釋放locko對象螢幕的Owner權利,讓其他線程可以搶奪locko對象的螢幕。

3)讓目前線程等待,其狀态變成WAITING。

▌聊聊:java中的notify()的核心原理是什麼?

1)當線程調用了locko(某個同步鎖對象)的notify()方法後,JVM會喚醒locko螢幕WaitSet中的第一個等待線程。

2)當線程調用了locko的notifyAll()方法後,JVM會喚醒locko螢幕WaitSet中的所有等待線程。

3)等待線程被喚醒後,會從螢幕的WaitSet移動到EntryList,線程具備了排隊搶奪螢幕Owner權利的資格,其狀态從WAITING變成BLOCKED。

4)EntryList中的線程搶奪到螢幕Owner權利之後,線程的狀态從BLOCKED變成Runnable,具備重新執行的資格。

▌聊聊:談一下synchronized的作用

synchronized 關鍵字主要用來解決的是多線程同步問題,其可以保證在被其修飾的代碼任意時 刻隻有一個線程執行。

視情況而定,(主動)說出它的用法及底層實作原理(使用的是 moniterenter 和 moniterexit指令…)。

▌聊聊:同時通路synchronized的靜态和非靜态方法,能保證線程安全嗎?

不能,兩者的鎖對象不一樣。

前者是類鎖(XXX.class), 後者是this

▌聊聊:同時通路synchronized方法和非同步方法,能保證線程安全嗎?

不能,因為synchronized隻會對被修飾的方法起作用。

▌聊聊:兩個線程同時通路兩個對象的非靜态同步方法能保證線程安全嗎?

不能,每個對象都擁有一把鎖。兩個對象相當于有兩把鎖,導緻鎖對象不一緻。(PS:如果 是類鎖,則所有對象共用一把鎖)

▌聊聊:若synchronized方法抛出異常,會導緻死鎖嗎?

JVM會自動釋放鎖,不會導緻死鎖問題。

▌聊聊:若synchronized的鎖對象能為空嗎?會出現什麼情況?

鎖對象不能為空,否則抛出NPE(NullPointerException)

▌聊聊:synchronized的繼承性問題

重寫父類的synchronized的方法,主要分為兩種情況:

(1)子類的方法沒有被synchronized修飾:

synchronized的不具備繼承性。是以子類方法是線程不安全的。

(2)子類的方法被synchronized修飾

兩個鎖對象其實是一把鎖,而且是子類對象作為鎖。

這也證明了: synchronized的鎖是可重入鎖。否則将出現死鎖問題。

▌聊聊:如何在兩個線程間共享資料?

可以通過共享對象來實作這個目的,或者是使用像阻塞隊列這樣并發的資料結構。

▌聊聊:Java中notify 和 notifyAll有什麼差別?

因為多線程可以等待單監控鎖,Java API 的設計人員提供了一些方法當等待 條件改變的時候通知它們,但是這些方法沒有完全實作。notify()方法不能喚醒某個具體的線程,是以隻有一個線程在等待的時候它才有用武之地。而notifyAll()喚醒所有線程并允許他們争奪鎖確定了至少有一個線程能繼續運作。

▌聊聊:為什麼wait, notify 和 notifyAll這些方法不在thread類裡面?

一個很明顯的原因是JAVA提供的鎖是對象級的而不是線程級的,每個對象都有鎖,通過線程獲得。如果線程需要等待某些鎖那麼調用對象中的wait()方法就有意義了。如果wait()方法定義在Thread類中,線程正在等待的是哪個鎖就不明顯了。簡單的說,由于wait,notify和notifyAll都是鎖級别的操作,是以把他們定義在Object類中因為鎖屬于對象。

▌聊聊:為什麼wait和notify方法要在同步塊中調用?

當一個線程需要調用對象的wait()方法的時候,這個線程必須擁有該對象的鎖,接着它就會釋放這個對象鎖并進入等待狀态直到其他線程調用這個對象上的notify()方法。同樣的,當一個線程需要調用對象的notify()方法時,它會釋放這個對象的鎖,以便其他在等待的線程就可以得到這個對象鎖。由于所有的這些方法都需要線程持有對象的鎖,這樣就隻能通過同步來實作,是以他們隻能在同步方法或者同步塊中被調用。如果你不這麼做,代碼會抛出IllegalMonitorStateException異常。

▌什麼是 Disruptor?它有哪些特點?

Disruptor 是一個高性能的無鎖并發架構,用于解決在多線程場景下的資料共享和通信問題。它采用了環形緩沖區(RingBuffer)來存儲事件,并通過控制序列(Sequence)來實作線程間的協作。Disruptor 的特點包括:高性能、低延遲、可擴充、無鎖化等。

▌Disruptor 如何實作無鎖并發?它的核心原理是什麼?

Disruptor 實作無鎖并發的核心原理是使用了 CAS(Compare And Swap)算法以及 volatile 變量保證資料的原子性和可見性。此外,Disruptor 還使用了基于序列的技術來實作線程間的協作。

▌Disruptor 的優缺點是什麼?

Disruptor 的優點包括高性能、低延遲、可擴充、無鎖化等。 缺點包括學習成本較高、應用場景相對局限等。

▌Disruptor 中的 RingBuffer 是什麼?有哪些作用?

RingBuffer 是 Disruptor 中的核心元件之一,它是一個環形的緩沖區,用于存儲事件。RingBuffer 的作用包括存儲資料、實作高并發以及提供線程間的協作。

▌Disruptor 中的 Sequence 是什麼?有哪些作用?

Sequence 是 Disruptor 中的另一個核心元件,它是一個單調遞增的序列,用于控制事件的釋出和消費。Sequence 的作用包括控制事件的釋出和消費、實作線程間的協作等。

▌Disruptor 中的 EventProcessor 是什麼?有哪些作用?

EventProcessor 是 Disruptor 中的一個概念,它主要負責處理事件,并将事件從 RingBuffer 中取出來進行處理。EventProcessor 的作用包括實作事件的處理、實作線程間的協作等。

▌Disruptor 中的 WaitStrategy 是什麼?有哪些類型?

WaitStrategy 是 Disruptor 中用于實作線程間協作的政策之一,它主要決定了消費者在沒有可用事件時如何等待。Disruptor 提供了多種 WaitStrategy,包括 BlockingWaitStrategy、BusySpinWaitStrategy、LiteBlockingWaitStrategy 等。

▌Disruptor 的多生産者模型和多消費者模型分别是什麼?如何實作?

Disruptor 支援多生産者和多消費者模型。多生産者模型指的是多個生産者往 RingBuffer 中寫入資料,多消費者模型指的是多個消費者從 RingBuffer 中讀取資料。實作多生産者和多消費者模型需要使用不同的序列來控制并發操作。

▌Disruptor 如何保證資料的順序性?

Disruptor 通過使用控制序列(Sequence)來實作資料的順序性。在生産者往 RingBuffer 中寫入資料時,會更新生産者的序列;在消費者從 RingBuffer 中讀取資料時,會更新消費者的序列。通過對序列的控制,可以保證事件的順序性。

▌Disruptor 如何保證資料的可見性?

Disruptor 使用 volatile 變量和 CAS 算法來保證資料的可見性。

在生産者往 RingBuffer 中寫入資料時,會使用 volatile 變量來確定資料的可見性;在消費者從 RingBuffer 中讀取資料時,會使用 CAS 算法來保證對序列的原子性和可見性,進而保證資料的可見性。

▌Disruptor 的實作原理是什麼?

Disruptor 實作原理主要包括以下幾個方面:

  • 使用環形緩沖區(RingBuffer)存儲事件
  • 使用控制序列(Sequence)控制事件的釋出和消費
  • 使用多生産者和多消費者模型提高并發性能
  • 使用無鎖算法保證線程安全等。

▌Disruptor 與傳統的線程池有什麼差別?

Disruptor 和傳統的線程池相比,具有更高的并發性能和更低的延遲。

這是因為 Disruptor 使用了無鎖算法和基于序列的技術來實作資料共享和通信,避免了線程間的互斥和同步操作,進而提高了并發性能,并且由于沒有線程切換的開銷,也可以降低延遲。

▌Disruptor 如何處理異常?

Disruptor 中的異常處理需要由應用程式自己實作。

一般來說,可以在 EventProcessor 中捕獲異常,并進行相應的處理。如果不進行處理,異常可能會導緻整個系統崩潰。

▌聊聊:Disruptor 适用于哪些場景?

Disruptor 适用于需要高性能、低延遲、大規模并發、對資料順序有要求等場景,例如高頻交易系統、大規模資料處理系統、實時消息系統等。

▌聊聊:Disruptor 如何進行性能測試?

Disruptor 的性能測試可以使用 JMH(Java Microbenchmark Harness)架構進行。

常見的性能測試包括吞吐量測試、延遲測試、競争測試等。

▌題外話:JMH 是什麼?

JMH:即(Java Microbenchmark Harness),它是由 Java 官方團隊開發的一款用于 Java 微基準測試工具。

基準測試:是指通過設計科學的測試方法、測試工具和測試系統,實作對一類測試對象的某項性能名額進行定量的和可對比的測試。

比如魯大師、安兔兔,都是按一定的基準或者在特定條件下去測試某一對象的的性能,比如顯示卡、IO、CPU 之類的。

JMH 官網:http://openjdk.java.net/projects/code-tools/jmh

▌JMH 和 JMeter 的不同?

JMeter 更多的是對 rest api 進行壓測,而 JMH 關注的粒度更細,它更多的是發現某塊性能槽點代碼,然後對優化方案進行基準測試對比。比如 json 序列化方案對比,bean copy 方案對比等。

▌聊聊:Disruptor 如何保證線程安全?

Disruptor 使用了無鎖算法和基于序列的技術來保證線程安全。

具體而言,它使用 CAS 算法和 volatile 變量來保證資料的原子性和可見性,使用控制序列來實作線程間的協作,避免了線程間的互斥和同步操作,進而保證線程安全。

▌聊聊:Disruptor 中的 Sequence Barrier 是什麼?有哪些作用?

Sequence Barrier 是 Disruptor 中用于實作線程間協作的元件之一,它主要負責控制消費者的讀取進度,并阻塞消費者直到事件可用。

Sequence Barrier 的作用包括實作線程間的協作、控制消費者的讀取進度、提高并發性能等。

▌聊聊:Disruptor 中的 BatchEventProcessor 是什麼?有哪些作用?

BatchEventProcessor 是 Disruptor 中的一個概念,它是一個高性能的事件處理器,可以批量地處理事件。

BatchEventProcessor 的作用包括實作事件的處理、實作線程間的協作等。

▌聊聊:Disruptor 如何解決資料競争問題?

Disruptor 使用無鎖算法和基于序列的技術來避免資料競争問題。

具體而言,它使用 CAS 算法和 volatile 變量來保證資料的原子性和可見性,使用控制序列來實作線程間的協作,進而避免了線程間的互斥和同步操作,避免了資料競争問題。

▌聊聊:Disruptor 中的 EventTranslator 是什麼?有哪些作用?

EventTranslator 是 Disruptor 中實作事件釋出的一種方式,它可以将使用者定義的資料轉換為 Disruptor 需要的事件格式,并釋出到 RingBuffer 中。

EventTranslator 的作用包括簡化事件釋出的流程、提高代碼的可讀性等,同時還可以提高性能,因為它可以避免在生産者線程和 RingBuffer 之間進行對象複制的操作。

通過 EventTranslator,使用者可以自定義事件,并将其釋出到 RingBuffer 中,讓消費者線程進行處理。

▌聊聊:Disruptor 如何處理高并發場景下的競争問題?

Disruptor 通過使用 RingBuffer 的多個序号來實作消費者和生産者之間的解耦。

RingBuffer 的每個序号都對應着一個槽位,生産者将事件釋出到槽位中,并更新序号。

消費者根據各自的需要擷取槽位中的事件,并更新對應的序号。

這種方式避免了生産者和消費者之間的競争,進而提高了系統性能。

▌聊聊:Disruptor 中的 TimeoutBlockingWaitStrategy 是什麼?有哪些作用?

TimeoutBlockingWaitStrategy 是一種等待政策,它會在一定的時間内進行等待,并且在逾時後抛出異常。

這種等待政策可以在需要限制等待時間的場景中使用,例如在生産者線程向 RingBuffer 釋出事件時,限制等待時間可以保證生産者不會一直阻塞在 RingBuffer 上,進而避免系統出現死鎖等問題。

▌聊聊:Disruptor 如何實作有界隊列?

Disruptor 使用 RingBuffer 來實作有界隊列,RingBuffer 的容量就是隊列的大小。

當 RingBuffer 的可用空間被填滿後,新的事件将無法繼續釋出,進而實作了對隊列大小的控制。

▌聊聊:Disruptor 在實際項目中的應用有哪些經驗?

Disruptor 主要用于解決高性能的并發程式設計問題,例如在金融領域的交易系統、廣告系統中的實時資料處理等場景。

在實際應用中,需要根據具體的業務需求來選擇相應的政策和配置參數。

▌聊聊:Disruptor 的性能瓶頸在哪裡?如何避免?

Disruptor 的性能瓶頸主要在于 RingBuffer 的讀寫操作和事件處理器的邏輯處理。

為了避免性能瓶頸,可以使用合适的等待政策、線程池大小和事件處理器數量等政策,并且盡可能避免在事件處理器中進行複雜的計算或者 IO 操作。

▌聊聊:Disruptor 和 Kafka 中的消息隊列相比有哪些異同?

Disruptor 和 Kafka 都是高性能的消息隊列,但是它們的設計目标和應用場景有所不同。

Disruptor 的設計目标是提供一個非常快速、可預測的記憶體消息傳遞機制,用于實作高性能的并發程式設計。

而 Kafka 的設計目标是建立一個高可靠、可擴充、持久化的分布式消息隊列,用于實作大規模資料的流式處理。

▌聊聊:Disruptor 是否适用于分布式系統?

Disruptor 是一種本地記憶體消息傳遞機制,不适用于分布式系統。

如果需要在分布式環境中使用 Disruptor,可以考慮使用類似于 Kafka 的分布式消息隊列來代替。

▌聊聊:Disruptor 是否适用于對資料一緻性要求較高的場景?

Disruptor 是一種記憶體模型,在單個 JVM 中提供了高效且可預測的消息傳遞機制,是以适用于對資料一緻性要求較高的場景。

但是,如果需要實作跨程序或者跨機器的高一緻性需求,需要考慮使用分布式鎖等更為複雜的機制。

▌聊聊:Disruptor 是否适用于記憶體受限的場景?

Disruptor 在記憶體使用上比較高效,但如果在記憶體受限的場景下,需要根據具體情況進行評估。

Disruptor 的記憶體消耗主要來自于 RingBuffer 和事件對象本身,可以通過減小 RingBuffer 大小和優化事件對象等方式來降低記憶體消耗。

同時,也可以考慮使用壓縮算法等方式來減小事件對象的大小。是以,在記憶體受限的場景下,需要結合具體的業務需求和系統限制來進行評估。

▌聊聊:Disruptor 是否适用于長時間阻塞的場景?

Disruptor 不适用于長時間阻塞的場景。

Disruptor 的設計初衷是為了解決高并發、低延遲的場景,即在處理速度和吞吐量方面具有優勢。

在 Disruptor 中,生産者和消費者之間通過環形緩沖區進行資料交換,但如果某個消費者在處理某個事件時出現了長時間阻塞,則會影響其他消費者的處理效率,降低整個系統的性能。

如果需要處理長時間阻塞的任務,可以考慮使用線程池等方式來解決。

同時,也可以對 Disruptor 進行擴充,實作類似于逾時機制的功能,當某個消費者的處理時間超過一定門檻值時,自動放棄該事件的處理,避免長時間阻塞對整個系統的影響。

▌參考文獻:

https://www.cnblogs.com/daoqidelv/p/7043696.html

清華大學出版社《Java高并發核心程式設計 卷2 加強版》

《隊列之王 Disruptor 紅寶書》

▌技術自由的實作路徑 PDF擷取:

▌實作你的架構自由:

  • 《吃透8圖1模闆,人人可以做架構》PDF
  • 《10Wqps評論中台,如何架構?B站是這麼做的!!!》PDF
  • 《阿裡二面:千萬級、億級資料,如何性能優化? 教科書級 答案來了》PDF
  • 《峰值21WQps、億級DAU,小遊戲《羊了個羊》是怎麼架構的?》PDF
  • 《100億級訂單怎麼排程,來一個大廠的極品方案》PDF
  • 《2個大廠 100億級 超大流量 紅包 架構方案》PDF

… 更多架構文章,正在添加中

▌實作你的 響應式 自由:

  • 《響應式聖經:10W字,實作Spring響應式程式設計自由》PDF
  • 這是老版本 《Flux、Mono、Reactor 實戰(史上最全)》PDF

▌實作你的 spring cloud 自由:

  • 《Spring cloud Alibaba 學習聖經》 PDF
  • 《分庫分表 Sharding-JDBC 底層原理、核心實戰(史上最全)》PDF
  • 《一文搞定:SpringBoot、SLF4j、Log4j、Logback、Netty之間混亂關系(史上最全)》PDF

▌實作你的 linux 自由:

  • 《Linux指令大全:2W多字,一次實作Linux自由》PDF

▌實作你的 網絡 自由:

  • 《TCP協定詳解 (史上最全)》PDF
  • 《網絡三張表:ARP表, MAC表, 路由表,實作你的網絡自由!!》PDF

▌實作你的 分布式鎖 自由:

  • 《Redis分布式鎖(圖解 - 秒懂 - 史上最全)》PDF
  • 《Zookeeper 分布式鎖 - 圖解 - 秒懂》PDF

▌實作你的 王者元件 自由:

  • 《隊列之王: Disruptor 原理、架構、源碼 一文穿透》PDF
  • 《緩存之王:Caffeine 源碼、架構、原理(史上最全,10W字 超級長文)》PDF
  • 《緩存之王:Caffeine 的使用(史上最全)》PDF
  • 《Java Agent 探針、位元組碼增強 ByteBuddy(史上最全)》PDF

▌實作你的 面試題 自由:

4000頁《尼恩Java面試寶典》PDF 40個專題

....

注:以上尼恩 架構筆記、面試題 的PDF檔案,請到《技術自由圈》公衆号擷取

還需要啥自由,可以告訴尼恩。 尼恩幫你實作.......

繼續閱讀