天天看點

Java并發程式設計架構Disruptor

Disruptor是什麼?

輪胎:RingBuffer

RingBuffer,環形緩沖區,在disruptor中扮演着非常重要的角色,了解RingBuffer的結構有利于我們了解disruptor為什麼這麼快、無鎖的實作方式、生産者/消費者模式的實作細節。如下圖所示:

<a href="http://s1.51cto.com/wyfs02/M01/8B/F8/wKioL1heKq_wpmu8AAA9zSBGoCQ743.png" target="_blank"></a>

數組

這個類似于輪胎的東西實際上就是一個數組,使用數組的好處當然是由于預加載的緣故使得通路比連結清單要快的多。

序号

RingBuffer中元素擁有序号的概念,并且序号是一直增長的,這怎麼了解?比如RingBuffer大小為10,那麼序号從0開始增長,當到9的時候,相當于轉了一圈,如果繼續增長的話,那麼将覆寫0号元素。也即是說通過 序号%SIZE 來定位元素,實作set/get操作。這裡也發現了和隊列不同的一個方式,就是不存在元素的删除操作,隻有覆寫而已,實際上RingBuffer的這種特别的環形存儲方式,使得不需要花費大量的時間用于記憶體清理/垃圾回收。

由于涉及到取模操作,為了CPU進行位運算更加高效,RingBuffer的大小應該是2的N次方。

無鎖的機制

在生産者/消費者模式下,disruptor号稱“無鎖并行架構”(要知道BlockingQueue是利用了Lock鎖機制來實作的),這是怎麼做到的呢?下面我們來具體分析下:

一個生産者 + 一個消費者

生産者維護一個生産指針P,消費者維護一個消費者指針C,當然P和C本質上就是序号。2者各操作各的,不需要鎖,僅僅需要注意的是生産者和消費者的速度問題,當然這個在disruptor内部已經為我們做了處理,就是判斷一下P和C之間不能超過一圈的大小。

一個生産者 + 多個消費者

多個消費者當然持有多個消費指針C1,C2,...,消費者依據C進行各自讀取資料,隻需要保證生産者的速度“協調”最慢的消費者的速度,就是那個不能超出一圈的概念。此時也不需要進行鎖定。

多個生産者 + N個消費者

很顯然,無論生産者有幾個,生産者指針P隻能存在一個,否則資料就亂套了。那麼多個生産者之間共享一個P指針,在disruptor中實際上是利用了CAS機制來保證多線程的資料安全,也沒有使用到鎖。

Disruptor初體驗:簡單的生産者和消費者

業務資料對象POJO(Event)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

<code>public</code> <code>class</code> <code>Order {</code>

<code>    </code><code>//訂單ID</code>

<code>    </code><code>private</code> <code>long</code> <code>id;</code>

<code>    </code><code>//訂單資訊</code>

<code>    </code><code>private</code> <code>String info;</code>

<code>    </code><code>//訂單價格</code>

<code>    </code><code>private</code> <code>double</code> <code>price;</code>

<code>    </code><code>public</code> <code>long</code> <code>getId() {</code>

<code>        </code><code>return</code> <code>id;</code>

<code>    </code><code>}</code>

<code>    </code><code>public</code> <code>void</code> <code>setId(</code><code>long</code> <code>id) {</code>

<code>        </code><code>this</code><code>.id = id;</code>

<code>    </code><code>public</code> <code>String getInfo() {</code>

<code>        </code><code>return</code> <code>info;</code>

<code>    </code><code>public</code> <code>void</code> <code>setInfo(String info) {</code>

<code>        </code><code>this</code><code>.info = info;</code>

<code>    </code><code>public</code> <code>double</code> <code>getPrice() {</code>

<code>        </code><code>return</code> <code>price;</code>

<code>    </code><code>public</code> <code>void</code> <code>setPrice(</code><code>double</code> <code>price) {</code>

<code>        </code><code>this</code><code>.price = price;</code>

<code>}</code>

業務資料工廠(Factory)

<code>public</code> <code>class</code> <code>OrderFactory </code><code>implements</code> <code>EventFactory{</code>

<code>    </code><code>@Override</code>

<code>    </code><code>public</code> <code>Object newInstance() {</code>

<code>        </code><code>System.out.println(</code><code>"OrderFactory.newInstance"</code><code>);</code>

<code>        </code><code>return</code> <code>new</code> <code>Order();</code>

事件處理器(Handler,即消費者處理邏輯)

<code>public</code> <code>class</code> <code>OrderHandler </code><code>implements</code> <code>EventHandler&lt;Order&gt;{</code>

<code>    </code><code>public</code> <code>void</code> <code>onEvent(Order order, </code><code>long</code> <code>l, </code><code>boolean</code> <code>b) </code><code>throws</code> <code>Exception {</code>

<code>        </code><code>System.out.println(Thread.currentThread().getName() + </code><code>" 消費者進行中:"</code> <code>+ l);</code>

<code>        </code><code>order.setInfo(</code><code>"info"</code> <code>+ order.getId());</code>

<code>        </code><code>order.setPrice(Math.random());</code>

Main

36

37

38

39

<code>public</code> <code>class</code> <code>Main {</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(String[] args) </code><code>throws</code> <code>InterruptedException {</code>

<code>        </code><code>//建立訂單工廠</code>

<code>        </code><code>OrderFactory orderFactory = </code><code>new</code> <code>OrderFactory();</code>

<code>        </code><code>//ringbuffer的大小</code>

<code>        </code><code>int</code> <code>RINGBUFFER_SIZE = </code><code>1024</code><code>;</code>

<code>        </code><code>//建立disruptor</code>

<code>        </code><code>Disruptor&lt;Order&gt; disruptor = </code><code>new</code> <code>Disruptor&lt;Order&gt;(orderFactory,RINGBUFFER_SIZE,Executors.defaultThreadFactory());</code>

<code>        </code><code>//設定事件處理器 即消費者</code>

<code>        </code><code>disruptor.handleEventsWith(</code><code>new</code> <code>OrderHandler());</code>

<code>        </code><code>disruptor.start();</code>

<code>        </code><code>RingBuffer&lt;Order&gt; ringBuffer = disruptor.getRingBuffer();</code>

<code>        </code><code>//-------------生産資料</code>

<code>        </code><code>for</code><code>(</code><code>int</code> <code>i = </code><code>0</code> <code>; i &lt; </code><code>3</code> <code>; i++){</code>

<code>            </code><code>long</code> <code>sequence = ringBuffer.next();</code>

<code>            </code><code>Order order = ringBuffer.get(sequence);</code>

<code>            </code><code>order.setId(i);</code>

<code>            </code><code>ringBuffer.publish(sequence);</code>

<code>            </code><code>System.out.println(Thread.currentThread().getName() + </code><code>" 生産者釋出一條資料:"</code> <code>+ sequence + </code><code>" 訂單ID:"</code> <code>+ i);</code>

<code>        </code><code>}</code>

<code>        </code><code>Thread.sleep(</code><code>1000</code><code>);</code>

<code>        </code><code>disruptor.shutdown();</code>

運作結果:

<a href="http://s2.51cto.com/wyfs02/M00/8B/FE/wKiom1heUkvDVrbbAABB1RkRt-k131.png" target="_blank"></a>

說明:

其實上面的結果已經很明顯的說明了,在初始階段構造Disruptor的時候,會調用工廠Factory去執行個體化RingBuffer中的Event資料對象。

另外在構造Disruptor的時候,在3.3.6之前使用的是API:

<a href="http://s4.51cto.com/wyfs02/M02/8B/FB/wKioL1heU5KD4yc1AABMgNq5GkY611.png" target="_blank"></a>

到了3.3.6這些API都不推薦使用了,即不再推薦傳入Executor這樣的線程池,而是推薦傳入ThreadFactory線程工廠。這樣的話,關閉disruptor就會自動關閉Executor線程池,而不需要像以前那樣必須在關閉disruptor的時候再關閉線程池了。

構造Disruptor時,需要注意ProducerType(SINGLE or MULTI 訓示是單個生産者還是多個生産者模式)、WaitStrategy(政策選擇,決定了消費者如何等待生産者)。

<a href="http://s5.51cto.com/wyfs02/M00/8B/FB/wKioL1heVOLj98krAABdMOQG3ug461.png" target="_blank"></a>

單獨使用RingBuffer:WorkerPool

如果場景比較簡單,我們完全可以不用建立Disruptor,而是僅僅使用RingBuffer功能。

<code>public</code> <code>static</code> <code>void</code> <code>main(String[] args) </code><code>throws</code> <code>InterruptedException {</code>

<code>    </code><code>ExecutorService executor = Executors.newFixedThreadPool(</code><code>3</code><code>);</code>

<code>    </code><code>RingBuffer&lt;Order&gt; ringBuffer = RingBuffer.create(ProducerType.SINGLE,</code><code>new</code> <code>OrderFactory(),</code><code>1024</code><code>,</code><code>new</code> <code>YieldingWaitStrategy());</code>

<code>    </code><code>WorkerPool&lt;Order&gt; workerPool = </code><code>new</code> <code>WorkerPool&lt;Order&gt;(ringBuffer,ringBuffer.newBarrier(),</code><code>new</code> <code>IgnoreExceptionHandler(),</code><code>new</code> <code>OrderHandler());</code>

<code>    </code><code>workerPool.start(executor);</code>

<code>    </code><code>//-------------生産資料</code>

<code>    </code><code>for</code><code>(</code><code>int</code> <code>i = </code><code>0</code> <code>; i &lt; </code><code>30</code> <code>; i++){</code>

<code>        </code><code>long</code> <code>sequence = ringBuffer.next();</code>

<code>        </code><code>Order order = ringBuffer.get(sequence);</code>

<code>        </code><code>order.setId(i);</code>

<code>        </code><code>ringBuffer.publish(sequence);</code>

<code>        </code><code>System.out.println(Thread.currentThread().getName() + </code><code>" 生産者釋出一條資料:"</code> <code>+ sequence + </code><code>" 訂單ID:"</code> <code>+ i);</code>

<code>    </code><code>Thread.sleep(</code><code>1000</code><code>);</code>

<code>    </code> 

<code>    </code><code>workerPool.halt();</code>

<code>    </code><code>executor.shutdown();</code>

實際上是利用WorkerPool輔助連接配接消費者。

一個生産者+多個消費者

<a href="http://s5.51cto.com/wyfs02/M01/8B/FC/wKioL1heZFugNx-wAABXx8iDTG8613.png" target="_blank"></a>

<code>    </code><code>//建立訂單工廠</code>

<code>    </code><code>OrderFactory orderFactory = </code><code>new</code> <code>OrderFactory();</code>

<code>    </code><code>//ringbuffer的大小</code>

<code>    </code><code>int</code> <code>RINGBUFFER_SIZE = </code><code>1024</code><code>;</code>

<code>    </code><code>//建立disruptor</code>

<code>    </code><code>Disruptor&lt;Order&gt; disruptor = </code><code>new</code> <code>Disruptor&lt;Order&gt;(orderFactory,RINGBUFFER_SIZE,Executors.defaultThreadFactory());</code>

<code>    </code><code>//設定事件處理器 即消費者</code>

<code>    </code><code>EventHandlerGroup&lt;Order&gt; eventHandlerGroup = disruptor.handleEventsWith(</code><code>new</code> <code>OrderHandler(),</code><code>new</code> <code>OrderHandler2());</code>

<code>    </code><code>eventHandlerGroup.then(</code><code>new</code> <code>OrderHandler3());</code>

<code>    </code><code>disruptor.start();</code>

<code>    </code><code>RingBuffer&lt;Order&gt; ringBuffer = disruptor.getRingBuffer();</code>

<code>    </code><code>for</code><code>(</code><code>int</code> <code>i = </code><code>0</code> <code>; i &lt; </code><code>3</code> <code>; i++){</code>

<code>    </code><code>disruptor.shutdown();</code>

<a href="http://s5.51cto.com/wyfs02/M01/8C/00/wKiom1heZJniEbQbAAA1jfoPs8I081.png" target="_blank"></a>

生産者生産了3條消息,一個消費者線程1消費了這3條資料,另一個消費者線程2也消費了這3條資料,2者是并行的,待消費者線程1和2完畢後,3條資料交給消費者線程3處理。

如果我們想順序的按照A-&gt;B-&gt;C呢?

<code>        </code><code>disruptor.handleEventsWith(</code><code>new</code> <code>Handler1()).</code>

<code>            </code><code>handleEventsWith(</code><code>new</code> <code>Handler2()).</code>

<code>            </code><code>handleEventsWith(</code><code>new</code> <code>Handler3());</code>

如果我們想六邊形操作呢?

<a href="http://s4.51cto.com/wyfs02/M01/8B/FC/wKioL1heZe3QoLB9AACQcTxdeJ4198.png" target="_blank"></a>

<code>        </code><code>Handler1 h1 = </code><code>new</code> <code>Handler1();</code>

<code>        </code><code>Handler2 h2 = </code><code>new</code> <code>Handler2();</code>

<code>        </code><code>Handler3 h3 = </code><code>new</code> <code>Handler3();</code>

<code>        </code><code>Handler4 h4 = </code><code>new</code> <code>Handler4();</code>

<code>        </code><code>Handler5 h5 = </code><code>new</code> <code>Handler5();</code>

<code>        </code><code>disruptor.handleEventsWith(h1, h2);</code>

<code>        </code><code>disruptor.after(h1).handleEventsWith(h4);</code>

<code>        </code><code>disruptor.after(h2).handleEventsWith(h5);</code>

<code>        </code><code>disruptor.after(h4, h5).handleEventsWith(h3);</code>

到這裡相信你對Disruptor已經有所了解了,那麼多個生産者多個消費者如何實作呢,其實和上面的代碼非常類似,無非是多個生産者都持有RingBuffer可以publish而已。

本文轉自zfz_linux_boy 51CTO部落格,原文連結:http://blog.51cto.com/zhangfengzhe/1885830,如需轉載請自行聯系原作者