天天看點

網際網路架構(6):并發程式設計--Disruptor并發架構

6 Disruptor并發架構簡介

Martin Fowler在自己網站上寫了一篇LMAX架構的文章,在文章中他介紹了LMAX是一種新型零售金融交易平台,它能夠以很低的延遲産生大量交易。這個系統是建立在JVM平台上,其核心是一個業務邏輯處理器,

它能夠在一個線程裡每秒處理6百萬訂單

。業務邏輯處理器完全是運作在記憶體中,使用

事件源驅動方式

。業務邏輯處理器的核心是Disruptor。

Disruptor它是一個開源的并發架構,并獲得2011 Duke’s 程式架構創新獎,能夠在無鎖的情況下實作網絡的Queue并發操作。

Disruptor是一個高性能的異步處理架構,或者可以認為是最快的消息架構(輕量的JMS),也可以認為是一個觀察者模式的實作,或者事件監聽模式的實作。

目前我們使用disruptor已經更新到了3.x版本,比之前的2.x版本性能更加的優秀,提供更多的API使用方式。

下載下傳disruptor-3.3.2.jar引入我們的項目既可以開始disruptor之旅。

在使用之前,首先說明disruptor主要功能加以說明,你可以了解為他是一種高效的”生産者-消費者”模型。也就性能遠遠高于傳統的BlockingQueue容器。

官方學習網站:http://ifeve.com/disruptor-getting-started/

(1)使用Disruptor
  • 第一:建立一個Event類,用來承載資料,因為Disruptor是一個事件驅動的,是以再Disruptor中是以事件綁定資料進行傳遞的
  • 第二:建立一個工廠Event類,用于建立Event類執行個體對象
  • 第三:需要有一個監聽事件類,用于處理資料(Event類)
  • 第四:我們需要進行測試代碼編寫。執行個體化Disruptor執行個體,配置一系列參數。然後我們對Disruptor執行個體綁定監聽事件類,接受并處理資料。
  • 第五:在Disruptor中,真正存儲資料的核心叫做RingBuffer,我們通過Disruptor執行個體拿到它,然後把資料生産出來,把資料加入到RingBuffer的執行個體對象中即可。
    執行個體化一個Disruptor對象:
    //建立Disruptor
    //1 eventFactory 為
    //2 ringBufferSize為RingBuffer緩沖區大小,最好是2的指數倍 
    //3 線程池,進行Disruptor内部的資料接收處理調用
    //4 第四個參數ProducerType.SINGLE和ProducerType.MULTI,用來指定資料生成者有一個還是多個
    //5 第五個參數是一種政策:WaitStrategy
    /**
     * 建立Disruptor
     * @param eventFactory 工廠類對象,用于建立一個個的LongEvent, LongEvent是實際的消費資料,初始化啟動Disruptor的時候,Disruptor會調用該工廠方法建立一個個的消費資料執行個體存放到RingBuffer緩沖區裡面去,建立的對象個數為ringBufferSize指定的
     * @param ringBufferSize RingBuffer緩沖區大小
     * @param executor 線程池,Disruptor内部的對資料進行接收處理時調用
     * @param producerType 用來指定資料生成者有一個還是多個,有兩個可選值ProducerType.SINGLE和ProducerType.MULTI
     * @param waitStrategy 一種政策,用來均衡資料生産者和消費者之間的處理效率,預設提供了3個實作類
     */
    com.lmax.disruptor.dsl.Disruptor.Disruptor<V>(EventFactory<V> eventFactory, int ringBufferSize, Executor executor, ProducerType producerType, WaitStrategy waitStrategy)
    
    //BlockingWaitStrategy 是最低效的政策,但其對CPU的消耗最小并且在各種不同部署環境中能提供更加一緻的性能表現
    WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
    
    //SleepingWaitStrategy 的性能表現跟BlockingWaitStrategy差不多,對CPU的消耗也類似,但其對生産者線程的影響最小,适合用于異步日志類似的場景
    WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
    
    //YieldingWaitStrategy是可以被用在低延遲系統中的兩個政策之一,這種政策在減低系統延遲的同時也會增加CPU運算量。YieldingWaitStrategy政策會循環等待sequence增加到合适的值。循環中調用Thread.yield()允許其他準備好的線程執行。如果需要高性能而且事件消費者線程比邏輯核心少的時候,推薦使用YieldingWaitStrategy政策。例如:在開啟超線程的時候。
    WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
    
    //BusySpinWaitStrategy是性能最高的等待政策,同時也是對部署環境要求最高的政策。這個性能最好用在事件處理線程比實體核心數目還要小的時候。例如:在禁用超線程技術的時候。
    WaitStrategy BusySpin_WAIT = new BusySpinWaitStrategy();
    
    //連接配接消費事件方法,其中EventHandler的是為消費者消費消息的實作類
    disruptor.handleEventsWith(? extends EventHandler<V>);
    
    //通過執行個體化的Disruptor對象擷取到RingBuffer緩沖區,然後往緩沖區裡面添加資料并且釋出,消費者就可以消費這個資料了
    RingBuffer<V> ringBuffer = disruptor.getRingBuffer();//擷取資料緩沖區
    
    long sequence = ringBuffer.next();//從資料緩沖區中擷取下一個可用事件槽的Id
    
    V event = ringBuffer.get(sequence); //從事件槽中擷取一個資料對象(初始化的時候,槽就會生成對應的對象V放到RingBuffer裡面,就是eventFactory傳回的對象)
    
    event.setValue(bbf.getLong(0));//調用Event的方法,設定資料,注意Event完全由使用者實作
    
    ringBuffer.publish(sequence);//釋出事件,釋出的是RingBuffer的事件槽的Id,消費者也是根據這個Id去RingBuffer中擷取對應的事件資料的,另外ringBuffer.publish 方法必須包含在 finally 中以確定必須得到調用;如果某個請求的 sequence 未被送出,将會堵塞後續的釋出操作或者其它的 producer。
               
(2)Disruptor術語
  • RingBuffer:被看做Disruptor最主要的元件,然而從3.0開始RingBuffer僅僅負責存儲和更新再Disruptor中流通的資料。對一些特殊的使用場景能夠被使用者(使用其他資料結構)完全替代。
  • Sequence:Disruptor使用Sequence來表示一個特殊元件處理的序号。和Disruptor一樣,每一個消費者(EventProcessor)都維持着一個Sequence。大部分的并發代碼依賴這些Sequence值得運轉,是以Sequence支援多種目前為AtomicLong類的特性。
  • Sequencer:這是Disruptor真正的核心。實作了這個接口的兩種生産者(單生産者和多生産者)均實作了所有的并發算法,為了在生産者和消費者之間進行準确快速的資料傳遞。
  • SequenceBarrier:由Sequencer生成,并且包含了已經釋出的Sequence的引用,這些Sequence源于Sequencer和一些獨立的消費者的Sequence。它包含了決定是否有供消費者消費的Event的邏輯。用來權衡當消費者無法從RingBuffer裡面擷取事件時的處理政策。(例如:當生産者太慢,消費者太快,會導緻消費者擷取不到新的事件會根據該政策進行處理,預設會堵塞)
  • WaitStrategy:決定一個消費者将如何等待生産者将Event置入Disruptor的政策。用來權衡當生産者無法将新的事件放進RingBuffer時的處理政策。(例如:當生産者太快,消費者太慢,會導緻生成者擷取不到新的事件槽來插入新事件,則會根據該政策進行處理,預設會堵塞)
  • Event:從生産者到消費者過程中所處理的資料單元。Disruptor中沒有代碼表示Event,因為它完全是由使用者定義的。
  • EventProcessor:主要事件循環,處理Disruptor中的Event,并且擁有消費者的Sequence。它有一個實作類是BatchEventProcessor,包含了event loop有效的實作,并且将回調到一個EventHandler接口的實作對象。
  • EventHandler:由使用者實作并且代表了Disruptor中的一個消費者的接口。
  • Producer:由使用者實作,它調用RingBuffer來插入事件(Event),在Disruptor中沒有相應的實作代碼,由使用者實作。
  • WorkProcessor:確定每個sequence隻被一個processor消費,在同一個WorkPool中的處理多個WorkProcessor不會消費同樣的sequence。
  • WorkerPool:一個WorkProcessor池,其中WorkProcessor将消費Sequence,是以任務可以在實作WorkHandler接口的worker之間移交
  • LifecycleAware:當BatchEventProcessor啟動和停止時,于實作這個接口用于接收通知。
(3)了解RingBuffer
  • ringbuffer到底是什麼?

    答:嗯,正如名字所說的一樣,它是一個環(首尾相接的環),你可以把它用做在不同上下文(線程)間傳遞資料的buffer。

  • 基本來說,ringbuffer擁有一個序号,這個序号指向數組中下一個可用元素。

Disruptor說的是生産者和消費者的故事. 有一個數組.生産者往裡面扔芝麻.消費者從裡面撿芝麻. 但是扔芝麻和撿芝麻也要考慮速度的問題. 1 消費者撿的比扔的快 那麼消費者要停下來.生産者扔了新的芝麻,然後消費者繼續. 2 數組的長度是有限的,生産者到末尾的時候會再從數組的開始位置繼續.這個時候可能會追上消費者,消費者還沒從那個地方撿走芝麻,這個時候生産者要等待消費者撿走芝麻,然後繼續.

  • 随着你不停地填充這個buffer(可能也會有相應的讀取),這個序号會一直增長,直到繞過這個環。
  • 要找到數組中目前序号指向的元素,可以通過mod操作:

    Sequence mod Array.length = index in Array

    (取模操作)假如目前的Sequence為12,RingBuffer的長度為10,那麼下一個事件槽的ID就為(java的mod文法):

    12 % 10 = 2

    。很簡單吧。由于是取模操作,是以如果槽的個數是2的N次方那麼将更有利于基于二進制的計算機進行計算。
(4)RingBuffer的特點
  • 如果你看了維基百科裡面的關于環形buffer的詞條,你就會發現,我們的實作方式,與其最大的差別在于:沒有尾指針。我們隻維護了一個指向下一個可用位置的序号。這種實作是經過深思熟慮的—我們選擇用環形buffer的最初原因就是想要提供可靠的消息傳遞。
  • 我們實作的ring buffer和大家常用的隊列之間的差別是,我們不删除buffer中的資料,也就是說這些資料一直存放在buffer中,直到新的資料覆寫他們。這就是和維基百科版本相比,我們不需要尾指針的原因。ringbuffer本身并不控制是否需要重疊。
  • 因為它是數組,是以要比連結清單快,而且有一個容易預測的通路模式。
  • 這是對CPU緩存友好的,也就是說在硬體級别,數組中的元素是會被預加載的,是以在ringbuffer當中,cpu無需時不時去主存加載數組中的下一個元素。
  • 其次,你可以為數組預先配置設定記憶體,使得數組對象一直存在(除非程式終止)。這就意味着不需要花大量的時間用于垃圾回收。此外,不像連結清單那樣,需要為每一個添加到其上面的對象創造節點對象—對應的,當删除節點時,需要執行相應的記憶體清理操作。
(5)Disruptor應用

Disruptor實際上是對RingBuffer的封裝,是以我們也可以直接使用RingBuffer類

  • API提供的生産者接口

    EventTranslator<V>與EventTranslatorOneArg<V v, Object data>

    ,前者不能動态傳參,後者可以動态傳遞一個參數data,V為需要建立的資料對象,data為實際資料,實作

    translateTo(V v, long sequeue, Object data)

    方法,其中v就是下一個可用事件槽裡面的對象,data為傳進來的真實資料,調用

    ringBuffer.publishEvent(EventTranslatorOneArg translator, Object data);

    來釋出資料到RingBuffer中
    import java.nio.ByteBuffer;
    
    import com.lmax.disruptor.EventTranslatorOneArg;
    import com.lmax.disruptor.RingBuffer;
    
    /**
     * Disruptor 3.0提供了lambda式的API。這樣可以把一些複雜的操作放在Ring Buffer,
     * 是以在Disruptor3.0以後的版本最好使用Event Publisher或者Event Translator來釋出事件
     */
    public class LongEventProducerWithTranslator {
    
        //一個translator可以看做一個事件初始化器,publicEvent方法會調用它
        //填充Event
        private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
                    @Override
                    public void translateTo(LongEvent event, long sequeue, ByteBuffer buffer) {
                        event.setValue(buffer.getLong(0));
                    }
        };
    
        private final RingBuffer<LongEvent> ringBuffer;
    
        public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
    
        public void onData(ByteBuffer buffer){
            ringBuffer.publishEvent(TRANSLATOR, buffer);
        }
    
    }
               
  • API提供的消費者接口
    • WorkerPool :

      WorkerPool<Order>(RingBuffer<V> ringBuffer, SequenceBarrier sequenceBarrier, ExceptionHandler<? super V> exceptionHandler, WorkHandler<? super V>... workHandlers)

      其中

      RingBuffer

      為資料緩沖區,

      sequenceBarrier

      是消費者與生産者之間的協調政策,API預設提供了一個實作類ProcessingSequenceBarrier,可以通過

      RingBuffer.newBarrier(Sequence... sequencesToTrack);

      來擷取,

      exceptionHandler

      為異常處理函數,當handler發生異常時回調該函數,

      workHandlers

      為實作了EventHandler接口的消息業務處理類,可以有多個。

      WorkerPool啟動的方法是

      WorkerPool.start(Executor executor)

    • BatchEventProcessor :

      BatchEventProcessor<V>(RingBuffer extends DataProvider, SequenceBarrier sequenceBarrier, EventHandler<? super V> eventHandler)

      其中

      RingBuffer

      為資料緩沖區,

      sequenceBarrier

      是消費者與生産者之間的協調政策,API預設提供了一個實作類ProcessingSequenceBarrier,可以通過

      RingBuffer.newBarrier(Sequence... sequencesToTrack);

      來擷取,

      eventHandler

      為實作了EventHandler接口的消息業務處理類。

      BatchEventProcessor啟動的方法是

      Executor.submit(BatchEventProcessor batchEventProcessor)

**注意**SequenceBarrier是用來協調消費者和生成者之間效率的政策類,是以要想Barrier生效,必須要将消費者消費的Seuence傳遞給RingBuffer,然後由RingBuffer進行協調:

ringBuffer.addGatingSequences(BatchEventProcessor.getSequence()); 多消費者時使用BatchEventProcessor.getWorkerSequences()

(這兩個方法WorkerPool同樣适用)。這是在直接使用RingBuffer時需要進行的處理,如果通過Disruptor去進行調用,在調用handleEventsWith注冊消費者方法時會自動添加該處理。

-Trade.java

import java.util.concurrent.atomic.AtomicInteger;

    public class Trade {  

        private String id;//ID  
        private String name;
        private double price;//金額  
        private AtomicInteger count = new AtomicInteger(0);

        public String getId() {
            return id;
        }
        public void setId(String id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public double getPrice() {
            return price;
        }
        public void setPrice(double price) {
            this.price = price;
        }
        public AtomicInteger getCount() {
            return count;
        }
        public void setCount(AtomicInteger count) {
            this.count = count;
        } 


    }  
           
  • TradeHandler.java
    import java.util.UUID;
    
    import com.lmax.disruptor.EventHandler;
    import com.lmax.disruptor.WorkHandler;
    
    /**
     * 實作EventHandler是為了作為BatchEventProcessor的事件處理器,
     * 實作WorkHandler是為了作為WorkerPool的事件處理器
     * @author jliu10
     *
     */
    public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> {  
    
        @Override  
        public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
            this.onEvent(event);  
        }  
    
        @Override  
        public void onEvent(Trade event) throws Exception {  
            //這裡做具體的消費邏輯  
            event.setId(UUID.randomUUID().toString());//簡單生成下ID  
            System.out.println(event.getId());  
        }  
    }  
               
  • Main1.java
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    import com.lmax.disruptor.BatchEventProcessor;
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.SequenceBarrier;
    import com.lmax.disruptor.YieldingWaitStrategy;
    
    public class Main1 {  
    
        public static void main(String[] args) throws Exception {  
            int BUFFER_SIZE=1024;  
            int THREAD_NUMBERS=4;  
            /* 
             * createSingleProducer建立一個單生産者的RingBuffer, 
             * 第一個參數叫EventFactory,從名字上了解就是"事件工廠",其實它的職責就是産生資料填充RingBuffer的區塊。 
             * 第二個參數是RingBuffer的大小,它必須是2的指數倍 目的是為了将求模運算轉為&運算提高效率 
             * 第三個參數是RingBuffer的生産都在沒有可用區塊的時候(可能是消費者(或者說是事件處理器) 太慢了)的等待政策 
             */  
            final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {  
                @Override  
                public Trade newInstance() {  
                    return new Trade();  
                }  
            }, BUFFER_SIZE, new YieldingWaitStrategy());  
    
            //建立線程池  
            ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);  
    
            //建立SequenceBarrier 用來權衡消費者是否可以從ringbuffer裡面擷取事件
            SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
    
            //建立消息處理器  
            BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(  
                    ringBuffer, sequenceBarrier, new TradeHandler());  
    
            //這一步的目的就是把消費者的位置資訊引用注入到生産者    如果隻有一個消費者的情況可以省略 
            ringBuffer.addGatingSequences(transProcessor.getSequence());  
    
            //把消息處理器送出到線程池  
            executors.submit(transProcessor);  
    
            //如果存在多個消費者 那重複執行上面3行代碼 把TradeHandler換成其它消費者類  
    
            Future<?> future= executors.submit(new Callable<Void>() {  
                @Override  
                public Void call() throws Exception {  
                    long seq;  
                    for(int i=0;i<10;i++){  
                        seq = ringBuffer.next();//占個坑 --ringBuffer一個可用區塊  
                        ringBuffer.get(seq).setPrice(Math.random()*9999);//給這個區塊放入 資料 
                        ringBuffer.publish(seq);//釋出這個區塊的資料使handler(consumer)可見  
                    }  
                    return null;  
                }  
            }); 
    
            future.get();//等待生産者結束  
            Thread.sleep(1000);//等上1秒,等消費都處理完成  
            transProcessor.halt();//通知事件(或者說消息)處理器 可以結束了(并不是馬上結束!!!)  
            executors.shutdown();//終止線程  
        }  
    }  
               
  • Main2.java
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.IgnoreExceptionHandler;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.SequenceBarrier;
    import com.lmax.disruptor.WorkHandler;
    import com.lmax.disruptor.WorkerPool;
    
    public class Main2 {  
        public static void main(String[] args) throws InterruptedException {  
            int BUFFER_SIZE=1024;  
            int THREAD_NUMBERS=4;  
    
            EventFactory<Trade> eventFactory = new EventFactory<Trade>() {  
                public Trade newInstance() {  
                    return new Trade();  
                }  
            };  
    
            RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE);  
    
            SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
    
            ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBERS);  
    
            WorkHandler<Trade> handler = new TradeHandler();  
    
            WorkerPool<Trade> workerPool = new WorkerPool<Trade>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handler);  
    
            //這一步的目的就是把消費者的位置資訊引用注入到生産者    如果隻有一個消費者的情況可以省略 
            ringBuffer.addGatingSequences(workerPool.getWorkerSequences());  
    
            workerPool.start(executor);  
    
            //下面這個生産8個資料
            for(int i=0;i<8;i++){  
                long seq=ringBuffer.next();  
                ringBuffer.get(seq).setPrice(Math.random()*9999);  
                ringBuffer.publish(seq);  
            }  
    
            Thread.sleep(1000);  
            workerPool.halt();  
            executor.shutdown();  
        }  
    }  
               
  • Disruptor注冊消費者的方法是:

    Disruptor.handleEventsWith(final EventHandler<? super T>... handlers)

    Disruptor提供了一些複雜的并行運作方式。
    • 1、生産者A生成的資料同時被B,C兩個消費者消費,兩者都消費完成之後再由消費者D對兩者同時消費。(注意ABC以及下面提到的消息處理類必須要實作EventHandler接口)
      EventHandlerGroup<Trade> handlerGroup = 
              disruptor.handleEventsWith(A, B);
      //聲明在C1,C2完事之後執行JMS消息發送操作 也就是流程走到C3 
      handlerGroup.then(C);
                 
    • 2、生産者A生成的資料同時被B1,C2兩個消費者消費,而B消耗完畢之後由B2處理,C1處理完成之後由C2處理,B2與C2兩者都消費完成之後再由消費者D對兩者同時消費。其中B1與B2,C1與C2是并行執行的。
      disruptor.handleEventsWith(B1, C1);
      disruptor.after(B1).handleEventsWith(B2);
      disruptor.after(C1).handleEventsWith(C2);
      disruptor.after(B2, C2).handleEventsWith(h3);
                 
    • 3、生産者A生成的資料依次被A,B,C三個消費者消費
      disruptor.handleEventsWith(A).handleEventsWith(B).handleEventsWith(C);
                 

參考部落格 http://ifeve.com/disruptor-dsl/,該部落格中介紹的是Disruptor2.0的版本,在3.0中有一些方法的作用有變化,請參考http://ifeve.com/disruptor-wizard/

  • Main.java
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import com.lmax.disruptor.BusySpinWaitStrategy;
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.EventHandlerGroup;
    import com.lmax.disruptor.dsl.ProducerType;
    import com.test.sync13.generate1.Trade;
    
    public class Main {  
        public static void main(String[] args) throws InterruptedException {  
    
            long beginTime=System.currentTimeMillis();  
            int bufferSize=8;  
            ExecutorService executor=Executors.newFixedThreadPool(8);  
    
            Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {  
                @Override  
                public Trade newInstance() {  
                    return new Trade();  
                }  
            }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());  
    
            //菱形操作
            /**
            //使用disruptor建立消費者組C1,C2  
            EventHandlerGroup<Trade> handlerGroup = 
                    disruptor.handleEventsWith(new Handler1(), new Handler2());
            //聲明在C1,C2完事之後執行JMS消息發送操作 也就是流程走到C3 
            handlerGroup.then(new Handler3());
            */
    
            //順序操作
            /** */
            disruptor.handleEventsWith(new Handler1()).
                handleEventsWith(new Handler2()).
                handleEventsWith(new Handler3());
    
    
            //六邊形操作. 
            /** 
            Handler1 h1 = new Handler1();
            Handler2 h2 = new Handler2();
            Handler3 h3 = new Handler3();
            Handler4 h4 = new Handler4();
            Handler5 h5 = new Handler5();
            disruptor.handleEventsWith(h1, h2);
            disruptor.after(h1).handleEventsWith(h4);
            disruptor.after(h2).handleEventsWith(h5);
            disruptor.after(h4, h5).handleEventsWith(h3);
           */
    
    
    
            disruptor.start();//啟動  
            CountDownLatch latch=new CountDownLatch(1);  
            //生産者準備  
            executor.submit(new TradePublisher(latch, disruptor));
    
            latch.await();//等待生産者完事. 
    
            disruptor.shutdown();  
            executor.shutdown();  
            System.out.println("總耗時:"+(System.currentTimeMillis()-beginTime));  
        }  
    }  
               
  • Handler*.java
    import com.lmax.disruptor.EventHandler;
    import com.lmax.disruptor.WorkHandler;
    import com.test.sync13.generate1.Trade;
    
    public class Handler1 implements EventHandler<Trade>,WorkHandler<Trade> {  
    
        @Override  
        public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
            this.onEvent(event);  
        }  
    
        @Override  
        public void onEvent(Trade event) throws Exception {  
            System.out.println("handler1: set name");
            event.setName("h1");
            Thread.sleep(500);
        }  
    }  
               
  • TradePublisher.java
    import java.util.Random;
    import java.util.concurrent.CountDownLatch;
    
    import com.lmax.disruptor.EventTranslator;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.test.sync13.generate1.Trade;
    
    public class TradePublisher implements Runnable {  
    
        Disruptor<Trade> disruptor;  
        private CountDownLatch latch;  
    
        private static int LOOP=10;//模拟百萬次交易的發生  
    
        public TradePublisher(CountDownLatch latch,Disruptor<Trade> disruptor) {  
            this.disruptor=disruptor;  
            this.latch=latch;  
        }  
    
        @Override  
        public void run() {  
            TradeEventTranslator tradeTransloator = new TradeEventTranslator();  
            for(int i=0;i<LOOP;i++){  
                disruptor.publishEvent(tradeTransloator);  
            }  
            //采用CountDownLatch來保證10個線程能夠同時啟動
            latch.countDown();  
        }  
    
    }  
    
    class TradeEventTranslator implements EventTranslator<Trade>{  
    
        private Random random=new Random();  
    
        @Override  
        public void translateTo(Trade event, long sequence) {  
            this.generateTrade(event);  
        }  
    
        private Trade generateTrade(Trade trade){  
            trade.setPrice(random.nextDouble()*9999);  
            return trade;  
        }  
    
    }  
               
  • Trade.java
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Trade {  
    
        private String id;//ID  
        private String name;
        private double price;//金額  
        private AtomicInteger count = new AtomicInteger(0);
    
        public String getId() {
            return id;
        }
        public void setId(String id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public double getPrice() {
            return price;
        }
        public void setPrice(double price) {
            this.price = price;
        }
        public AtomicInteger getCount() {
            return count;
        }
        public void setCount(AtomicInteger count) {
            this.count = count;
        } 
    
    
    }  
               

繼續閱讀