6 Disruptor并發架構簡介
Martin Fowler在自己網站上寫了一篇LMAX架構的文章,在文章中他介紹了LMAX是一種新型零售金融交易平台,它能夠以很低的延遲産生大量交易。這個系統是建立在JVM平台上,其核心是一個業務邏輯處理器,
它能夠在一個線程裡每秒處理6百萬訂單。業務邏輯處理器完全是運作在記憶體中,使用
事件源驅動方式。業務邏輯處理器的核心是Disruptor。
Disruptor它是一個開源的并發架構,并獲得2011 Duke’s 程式架構創新獎,能夠在無鎖的情況下實作網絡的Queue并發操作。
Disruptor是一個高性能的異步處理架構,或者可以認為是最快的消息架構(輕量的JMS),也可以認為是一個觀察者模式的實作,或者事件監聽模式的實作。
目前我們使用disruptor已經更新到了3.x版本,比之前的2.x版本性能更加的優秀,提供更多的API使用方式。
下載下傳disruptor-3.3.2.jar引入我們的項目既可以開始disruptor之旅。
在使用之前,首先說明disruptor主要功能加以說明,你可以了解為他是一種高效的”生産者-消費者”模型。也就性能遠遠高于傳統的BlockingQueue容器。
官方學習網站:http://ifeve.com/disruptor-getting-started/
(1)使用Disruptor
- 第一:建立一個Event類,用來承載資料,因為Disruptor是一個事件驅動的,是以再Disruptor中是以事件綁定資料進行傳遞的
- 第二:建立一個工廠Event類,用于建立Event類執行個體對象
- 第三:需要有一個監聽事件類,用于處理資料(Event類)
- 第四:我們需要進行測試代碼編寫。執行個體化Disruptor執行個體,配置一系列參數。然後我們對Disruptor執行個體綁定監聽事件類,接受并處理資料。
- 第五:在Disruptor中,真正存儲資料的核心叫做RingBuffer,我們通過Disruptor執行個體拿到它,然後把資料生産出來,把資料加入到RingBuffer的執行個體對象中即可。
執行個體化一個Disruptor對象: //建立Disruptor //1 eventFactory 為 //2 ringBufferSize為RingBuffer緩沖區大小,最好是2的指數倍 //3 線程池,進行Disruptor内部的資料接收處理調用 //4 第四個參數ProducerType.SINGLE和ProducerType.MULTI,用來指定資料生成者有一個還是多個 //5 第五個參數是一種政策:WaitStrategy /** * 建立Disruptor * @param eventFactory 工廠類對象,用于建立一個個的LongEvent, LongEvent是實際的消費資料,初始化啟動Disruptor的時候,Disruptor會調用該工廠方法建立一個個的消費資料執行個體存放到RingBuffer緩沖區裡面去,建立的對象個數為ringBufferSize指定的 * @param ringBufferSize RingBuffer緩沖區大小 * @param executor 線程池,Disruptor内部的對資料進行接收處理時調用 * @param producerType 用來指定資料生成者有一個還是多個,有兩個可選值ProducerType.SINGLE和ProducerType.MULTI * @param waitStrategy 一種政策,用來均衡資料生産者和消費者之間的處理效率,預設提供了3個實作類 */ com.lmax.disruptor.dsl.Disruptor.Disruptor<V>(EventFactory<V> eventFactory, int ringBufferSize, Executor executor, ProducerType producerType, WaitStrategy waitStrategy) //BlockingWaitStrategy 是最低效的政策,但其對CPU的消耗最小并且在各種不同部署環境中能提供更加一緻的性能表現 WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy(); //SleepingWaitStrategy 的性能表現跟BlockingWaitStrategy差不多,對CPU的消耗也類似,但其對生産者線程的影響最小,适合用于異步日志類似的場景 WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy(); //YieldingWaitStrategy是可以被用在低延遲系統中的兩個政策之一,這種政策在減低系統延遲的同時也會增加CPU運算量。YieldingWaitStrategy政策會循環等待sequence增加到合适的值。循環中調用Thread.yield()允許其他準備好的線程執行。如果需要高性能而且事件消費者線程比邏輯核心少的時候,推薦使用YieldingWaitStrategy政策。例如:在開啟超線程的時候。 WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy(); //BusySpinWaitStrategy是性能最高的等待政策,同時也是對部署環境要求最高的政策。這個性能最好用在事件處理線程比實體核心數目還要小的時候。例如:在禁用超線程技術的時候。 WaitStrategy BusySpin_WAIT = new BusySpinWaitStrategy(); //連接配接消費事件方法,其中EventHandler的是為消費者消費消息的實作類 disruptor.handleEventsWith(? extends EventHandler<V>); //通過執行個體化的Disruptor對象擷取到RingBuffer緩沖區,然後往緩沖區裡面添加資料并且釋出,消費者就可以消費這個資料了 RingBuffer<V> ringBuffer = disruptor.getRingBuffer();//擷取資料緩沖區 long sequence = ringBuffer.next();//從資料緩沖區中擷取下一個可用事件槽的Id V event = ringBuffer.get(sequence); //從事件槽中擷取一個資料對象(初始化的時候,槽就會生成對應的對象V放到RingBuffer裡面,就是eventFactory傳回的對象) event.setValue(bbf.getLong(0));//調用Event的方法,設定資料,注意Event完全由使用者實作 ringBuffer.publish(sequence);//釋出事件,釋出的是RingBuffer的事件槽的Id,消費者也是根據這個Id去RingBuffer中擷取對應的事件資料的,另外ringBuffer.publish 方法必須包含在 finally 中以確定必須得到調用;如果某個請求的 sequence 未被送出,将會堵塞後續的釋出操作或者其它的 producer。
(2)Disruptor術語
- RingBuffer:被看做Disruptor最主要的元件,然而從3.0開始RingBuffer僅僅負責存儲和更新再Disruptor中流通的資料。對一些特殊的使用場景能夠被使用者(使用其他資料結構)完全替代。
- Sequence:Disruptor使用Sequence來表示一個特殊元件處理的序号。和Disruptor一樣,每一個消費者(EventProcessor)都維持着一個Sequence。大部分的并發代碼依賴這些Sequence值得運轉,是以Sequence支援多種目前為AtomicLong類的特性。
- Sequencer:這是Disruptor真正的核心。實作了這個接口的兩種生産者(單生産者和多生産者)均實作了所有的并發算法,為了在生産者和消費者之間進行準确快速的資料傳遞。
- SequenceBarrier:由Sequencer生成,并且包含了已經釋出的Sequence的引用,這些Sequence源于Sequencer和一些獨立的消費者的Sequence。它包含了決定是否有供消費者消費的Event的邏輯。用來權衡當消費者無法從RingBuffer裡面擷取事件時的處理政策。(例如:當生産者太慢,消費者太快,會導緻消費者擷取不到新的事件會根據該政策進行處理,預設會堵塞)
- WaitStrategy:決定一個消費者将如何等待生産者将Event置入Disruptor的政策。用來權衡當生産者無法将新的事件放進RingBuffer時的處理政策。(例如:當生産者太快,消費者太慢,會導緻生成者擷取不到新的事件槽來插入新事件,則會根據該政策進行處理,預設會堵塞)
- Event:從生産者到消費者過程中所處理的資料單元。Disruptor中沒有代碼表示Event,因為它完全是由使用者定義的。
- EventProcessor:主要事件循環,處理Disruptor中的Event,并且擁有消費者的Sequence。它有一個實作類是BatchEventProcessor,包含了event loop有效的實作,并且将回調到一個EventHandler接口的實作對象。
- EventHandler:由使用者實作并且代表了Disruptor中的一個消費者的接口。
- Producer:由使用者實作,它調用RingBuffer來插入事件(Event),在Disruptor中沒有相應的實作代碼,由使用者實作。
- WorkProcessor:確定每個sequence隻被一個processor消費,在同一個WorkPool中的處理多個WorkProcessor不會消費同樣的sequence。
- WorkerPool:一個WorkProcessor池,其中WorkProcessor将消費Sequence,是以任務可以在實作WorkHandler接口的worker之間移交
- LifecycleAware:當BatchEventProcessor啟動和停止時,于實作這個接口用于接收通知。
(3)了解RingBuffer
-
ringbuffer到底是什麼?
答:嗯,正如名字所說的一樣,它是一個環(首尾相接的環),你可以把它用做在不同上下文(線程)間傳遞資料的buffer。
- 基本來說,ringbuffer擁有一個序号,這個序号指向數組中下一個可用元素。
Disruptor說的是生産者和消費者的故事. 有一個數組.生産者往裡面扔芝麻.消費者從裡面撿芝麻. 但是扔芝麻和撿芝麻也要考慮速度的問題. 1 消費者撿的比扔的快 那麼消費者要停下來.生産者扔了新的芝麻,然後消費者繼續. 2 數組的長度是有限的,生産者到末尾的時候會再從數組的開始位置繼續.這個時候可能會追上消費者,消費者還沒從那個地方撿走芝麻,這個時候生産者要等待消費者撿走芝麻,然後繼續.
- 随着你不停地填充這個buffer(可能也會有相應的讀取),這個序号會一直增長,直到繞過這個環。
- 要找到數組中目前序号指向的元素,可以通過mod操作:
(取模操作)假如目前的Sequence為12,RingBuffer的長度為10,那麼下一個事件槽的ID就為(java的mod文法):Sequence mod Array.length = index in Array
。很簡單吧。由于是取模操作,是以如果槽的個數是2的N次方那麼将更有利于基于二進制的計算機進行計算。12 % 10 = 2
(4)RingBuffer的特點
- 如果你看了維基百科裡面的關于環形buffer的詞條,你就會發現,我們的實作方式,與其最大的差別在于:沒有尾指針。我們隻維護了一個指向下一個可用位置的序号。這種實作是經過深思熟慮的—我們選擇用環形buffer的最初原因就是想要提供可靠的消息傳遞。
- 我們實作的ring buffer和大家常用的隊列之間的差別是,我們不删除buffer中的資料,也就是說這些資料一直存放在buffer中,直到新的資料覆寫他們。這就是和維基百科版本相比,我們不需要尾指針的原因。ringbuffer本身并不控制是否需要重疊。
- 因為它是數組,是以要比連結清單快,而且有一個容易預測的通路模式。
- 這是對CPU緩存友好的,也就是說在硬體級别,數組中的元素是會被預加載的,是以在ringbuffer當中,cpu無需時不時去主存加載數組中的下一個元素。
- 其次,你可以為數組預先配置設定記憶體,使得數組對象一直存在(除非程式終止)。這就意味着不需要花大量的時間用于垃圾回收。此外,不像連結清單那樣,需要為每一個添加到其上面的對象創造節點對象—對應的,當删除節點時,需要執行相應的記憶體清理操作。
(5)Disruptor應用
Disruptor實際上是對RingBuffer的封裝,是以我們也可以直接使用RingBuffer類
- API提供的生産者接口
,前者不能動态傳參,後者可以動态傳遞一個參數data,V為需要建立的資料對象,data為實際資料,實作EventTranslator<V>與EventTranslatorOneArg<V v, Object data>
方法,其中v就是下一個可用事件槽裡面的對象,data為傳進來的真實資料,調用translateTo(V v, long sequeue, Object data)
來釋出資料到RingBuffer中ringBuffer.publishEvent(EventTranslatorOneArg translator, Object data);
import java.nio.ByteBuffer; import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.RingBuffer; /** * Disruptor 3.0提供了lambda式的API。這樣可以把一些複雜的操作放在Ring Buffer, * 是以在Disruptor3.0以後的版本最好使用Event Publisher或者Event Translator來釋出事件 */ public class LongEventProducerWithTranslator { //一個translator可以看做一個事件初始化器,publicEvent方法會調用它 //填充Event private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() { @Override public void translateTo(LongEvent event, long sequeue, ByteBuffer buffer) { event.setValue(buffer.getLong(0)); } }; private final RingBuffer<LongEvent> ringBuffer; public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(ByteBuffer buffer){ ringBuffer.publishEvent(TRANSLATOR, buffer); } }
- API提供的消費者接口
- WorkerPool :
其中WorkerPool<Order>(RingBuffer<V> ringBuffer, SequenceBarrier sequenceBarrier, ExceptionHandler<? super V> exceptionHandler, WorkHandler<? super V>... workHandlers)
為資料緩沖區,RingBuffer
是消費者與生産者之間的協調政策,API預設提供了一個實作類ProcessingSequenceBarrier,可以通過sequenceBarrier
來擷取,RingBuffer.newBarrier(Sequence... sequencesToTrack);
為異常處理函數,當handler發生異常時回調該函數,exceptionHandler
workHandlers
為實作了EventHandler接口的消息業務處理類,可以有多個。
WorkerPool啟動的方法是
WorkerPool.start(Executor executor)
- BatchEventProcessor :
其中BatchEventProcessor<V>(RingBuffer extends DataProvider, SequenceBarrier sequenceBarrier, EventHandler<? super V> eventHandler)
為資料緩沖區,RingBuffer
是消費者與生産者之間的協調政策,API預設提供了一個實作類ProcessingSequenceBarrier,可以通過sequenceBarrier
來擷取,RingBuffer.newBarrier(Sequence... sequencesToTrack);
eventHandler
為實作了EventHandler接口的消息業務處理類。
BatchEventProcessor啟動的方法是
Executor.submit(BatchEventProcessor batchEventProcessor)
- WorkerPool :
**注意**SequenceBarrier是用來協調消費者和生成者之間效率的政策類,是以要想Barrier生效,必須要将消費者消費的Seuence傳遞給RingBuffer,然後由RingBuffer進行協調:
ringBuffer.addGatingSequences(BatchEventProcessor.getSequence()); 多消費者時使用BatchEventProcessor.getWorkerSequences()
(這兩個方法WorkerPool同樣适用)。這是在直接使用RingBuffer時需要進行的處理,如果通過Disruptor去進行調用,在調用handleEventsWith注冊消費者方法時會自動添加該處理。
-Trade.java
import java.util.concurrent.atomic.AtomicInteger;
public class Trade {
private String id;//ID
private String name;
private double price;//金額
private AtomicInteger count = new AtomicInteger(0);
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
public AtomicInteger getCount() {
return count;
}
public void setCount(AtomicInteger count) {
this.count = count;
}
}
- TradeHandler.java
import java.util.UUID; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler; /** * 實作EventHandler是為了作為BatchEventProcessor的事件處理器, * 實作WorkHandler是為了作為WorkerPool的事件處理器 * @author jliu10 * */ public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { this.onEvent(event); } @Override public void onEvent(Trade event) throws Exception { //這裡做具體的消費邏輯 event.setId(UUID.randomUUID().toString());//簡單生成下ID System.out.println(event.getId()); } }
- Main1.java
import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import com.lmax.disruptor.BatchEventProcessor; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.YieldingWaitStrategy; public class Main1 { public static void main(String[] args) throws Exception { int BUFFER_SIZE=1024; int THREAD_NUMBERS=4; /* * createSingleProducer建立一個單生産者的RingBuffer, * 第一個參數叫EventFactory,從名字上了解就是"事件工廠",其實它的職責就是産生資料填充RingBuffer的區塊。 * 第二個參數是RingBuffer的大小,它必須是2的指數倍 目的是為了将求模運算轉為&運算提高效率 * 第三個參數是RingBuffer的生産都在沒有可用區塊的時候(可能是消費者(或者說是事件處理器) 太慢了)的等待政策 */ final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() { @Override public Trade newInstance() { return new Trade(); } }, BUFFER_SIZE, new YieldingWaitStrategy()); //建立線程池 ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); //建立SequenceBarrier 用來權衡消費者是否可以從ringbuffer裡面擷取事件 SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); //建立消息處理器 BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>( ringBuffer, sequenceBarrier, new TradeHandler()); //這一步的目的就是把消費者的位置資訊引用注入到生産者 如果隻有一個消費者的情況可以省略 ringBuffer.addGatingSequences(transProcessor.getSequence()); //把消息處理器送出到線程池 executors.submit(transProcessor); //如果存在多個消費者 那重複執行上面3行代碼 把TradeHandler換成其它消費者類 Future<?> future= executors.submit(new Callable<Void>() { @Override public Void call() throws Exception { long seq; for(int i=0;i<10;i++){ seq = ringBuffer.next();//占個坑 --ringBuffer一個可用區塊 ringBuffer.get(seq).setPrice(Math.random()*9999);//給這個區塊放入 資料 ringBuffer.publish(seq);//釋出這個區塊的資料使handler(consumer)可見 } return null; } }); future.get();//等待生産者結束 Thread.sleep(1000);//等上1秒,等消費都處理完成 transProcessor.halt();//通知事件(或者說消息)處理器 可以結束了(并不是馬上結束!!!) executors.shutdown();//終止線程 } }
- Main2.java
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.IgnoreExceptionHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.WorkHandler; import com.lmax.disruptor.WorkerPool; public class Main2 { public static void main(String[] args) throws InterruptedException { int BUFFER_SIZE=1024; int THREAD_NUMBERS=4; EventFactory<Trade> eventFactory = new EventFactory<Trade>() { public Trade newInstance() { return new Trade(); } }; RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE); SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBERS); WorkHandler<Trade> handler = new TradeHandler(); WorkerPool<Trade> workerPool = new WorkerPool<Trade>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handler); //這一步的目的就是把消費者的位置資訊引用注入到生産者 如果隻有一個消費者的情況可以省略 ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); workerPool.start(executor); //下面這個生産8個資料 for(int i=0;i<8;i++){ long seq=ringBuffer.next(); ringBuffer.get(seq).setPrice(Math.random()*9999); ringBuffer.publish(seq); } Thread.sleep(1000); workerPool.halt(); executor.shutdown(); } }
- Disruptor注冊消費者的方法是:
Disruptor提供了一些複雜的并行運作方式。Disruptor.handleEventsWith(final EventHandler<? super T>... handlers)
- 1、生産者A生成的資料同時被B,C兩個消費者消費,兩者都消費完成之後再由消費者D對兩者同時消費。(注意ABC以及下面提到的消息處理類必須要實作EventHandler接口)
EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(A, B); //聲明在C1,C2完事之後執行JMS消息發送操作 也就是流程走到C3 handlerGroup.then(C);
- 2、生産者A生成的資料同時被B1,C2兩個消費者消費,而B消耗完畢之後由B2處理,C1處理完成之後由C2處理,B2與C2兩者都消費完成之後再由消費者D對兩者同時消費。其中B1與B2,C1與C2是并行執行的。
disruptor.handleEventsWith(B1, C1); disruptor.after(B1).handleEventsWith(B2); disruptor.after(C1).handleEventsWith(C2); disruptor.after(B2, C2).handleEventsWith(h3);
- 3、生産者A生成的資料依次被A,B,C三個消費者消費
disruptor.handleEventsWith(A).handleEventsWith(B).handleEventsWith(C);
- 1、生産者A生成的資料同時被B,C兩個消費者消費,兩者都消費完成之後再由消費者D對兩者同時消費。(注意ABC以及下面提到的消息處理類必須要實作EventHandler接口)
參考部落格 http://ifeve.com/disruptor-dsl/,該部落格中介紹的是Disruptor2.0的版本,在3.0中有一些方法的作用有變化,請參考http://ifeve.com/disruptor-wizard/
- Main.java
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.lmax.disruptor.BusySpinWaitStrategy; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.EventHandlerGroup; import com.lmax.disruptor.dsl.ProducerType; import com.test.sync13.generate1.Trade; public class Main { public static void main(String[] args) throws InterruptedException { long beginTime=System.currentTimeMillis(); int bufferSize=8; ExecutorService executor=Executors.newFixedThreadPool(8); Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() { @Override public Trade newInstance() { return new Trade(); } }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy()); //菱形操作 /** //使用disruptor建立消費者組C1,C2 EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(new Handler1(), new Handler2()); //聲明在C1,C2完事之後執行JMS消息發送操作 也就是流程走到C3 handlerGroup.then(new Handler3()); */ //順序操作 /** */ disruptor.handleEventsWith(new Handler1()). handleEventsWith(new Handler2()). handleEventsWith(new Handler3()); //六邊形操作. /** Handler1 h1 = new Handler1(); Handler2 h2 = new Handler2(); Handler3 h3 = new Handler3(); Handler4 h4 = new Handler4(); Handler5 h5 = new Handler5(); disruptor.handleEventsWith(h1, h2); disruptor.after(h1).handleEventsWith(h4); disruptor.after(h2).handleEventsWith(h5); disruptor.after(h4, h5).handleEventsWith(h3); */ disruptor.start();//啟動 CountDownLatch latch=new CountDownLatch(1); //生産者準備 executor.submit(new TradePublisher(latch, disruptor)); latch.await();//等待生産者完事. disruptor.shutdown(); executor.shutdown(); System.out.println("總耗時:"+(System.currentTimeMillis()-beginTime)); } }
- Handler*.java
import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler; import com.test.sync13.generate1.Trade; public class Handler1 implements EventHandler<Trade>,WorkHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { this.onEvent(event); } @Override public void onEvent(Trade event) throws Exception { System.out.println("handler1: set name"); event.setName("h1"); Thread.sleep(500); } }
- TradePublisher.java
import java.util.Random; import java.util.concurrent.CountDownLatch; import com.lmax.disruptor.EventTranslator; import com.lmax.disruptor.dsl.Disruptor; import com.test.sync13.generate1.Trade; public class TradePublisher implements Runnable { Disruptor<Trade> disruptor; private CountDownLatch latch; private static int LOOP=10;//模拟百萬次交易的發生 public TradePublisher(CountDownLatch latch,Disruptor<Trade> disruptor) { this.disruptor=disruptor; this.latch=latch; } @Override public void run() { TradeEventTranslator tradeTransloator = new TradeEventTranslator(); for(int i=0;i<LOOP;i++){ disruptor.publishEvent(tradeTransloator); } //采用CountDownLatch來保證10個線程能夠同時啟動 latch.countDown(); } } class TradeEventTranslator implements EventTranslator<Trade>{ private Random random=new Random(); @Override public void translateTo(Trade event, long sequence) { this.generateTrade(event); } private Trade generateTrade(Trade trade){ trade.setPrice(random.nextDouble()*9999); return trade; } }
- Trade.java
import java.util.concurrent.atomic.AtomicInteger; public class Trade { private String id;//ID private String name; private double price;//金額 private AtomicInteger count = new AtomicInteger(0); public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } public AtomicInteger getCount() { return count; } public void setCount(AtomicInteger count) { this.count = count; } }