在這種情況下,隻要生産者(p1)将元素放到ring buffer上,消費者c1和c2就可以并行處理這些元素。但是消費者c3必須一直等到c1和c2處理完之後,才可以處理。在現實世界中的對應的案例就像:在處理實際的業務邏輯(c3)之前,需要校驗資料(c1),以及将資料寫入磁盤(c2)。
用原生的disruptor文法來建立這些消費者的話代碼如下:
<a href="http://ifeve.com/disruptor-dsl/#viewsource">檢視源代碼</a>
<code>01</code>
<code>executor executor = executors.newcachedthreadpool();</code>
<code>02</code>
<code>batchhandler handler1 =</code><code>new</code> <code>mybatchhandler1();</code>
<code>03</code>
<code>batchhandler handler2 =</code><code>new</code> <code>mybatchhandler2();</code>
<code>04</code>
<code>batchhandler handler3 =</code><code>new</code> <code>mybatchhandler3()</code>
<code>05</code>
<code>ringbuffer ringbuffer =</code><code>new</code> <code>ringbuffer(entry_factory, ring_buffer_size);</code>
<code>06</code>
<code>consumerbarrier consumerbarrier1 = ringbuffer.createconsumerbarrier();</code>
<code>07</code>
<code>batchconsumer consumer1 =</code><code>new</code> <code>batchconsumer(consumerbarrier1, handler1);</code>
<code>08</code>
<code>batchconsumer consumer2 =</code><code>new</code> <code>batchconsumer(consumerbarrier1, handler2);</code>
<code>09</code>
<code>consumerbarrier consumerbarrier2 =</code>
<code>10</code>
<code>ringbuffer.createconsumerbarrier(consumer1, consumer2);</code>
<code>11</code>
<code>batchconsumer consumer3 =</code><code>new</code> <code>batchconsumer(consumerbarrier2, handler3);</code>
<code>12</code>
<code>executor.execute(consumer1);</code>
<code>13</code>
<code>executor.execute(consumer2);</code>
<code>14</code>
<code>executor.execute(consumer3);</code>
<code>15</code>
<code>producerbarrier producerbarrier =</code>
<code>16</code>
<code>ringbuffer.createproducerbarrier(consumer3);</code>
在以上這段代碼中,我們不得不建立那些個handler(就是那些個mybatchhandler執行個體),外加消費者屏障,batchconsumer執行個體,然後在他們各自的線程中處理這些消費者。dsl能幫我們完成很多建立工作,最終的結果如下:
<code>1</code>
<code>2</code>
<code>3</code>
<code>4</code>
<code>batchhandler handler3 =</code><code>new</code> <code>mybatchhandler3();</code>
<code>5</code>
<code>disruptorwizard dw =</code><code>new</code> <code>disruptorwizard(entry_factory,</code>
<code>6</code>
<code> </code><code>ring_buffer_size, executor);</code>
<code>7</code>
<code>dw.consumewith(handler1, handler2).then(handler3);</code>
<code>8</code>
<code>producerbarrier producerbarrier = dw.createproducerbarrier();</code>
我們甚至可以在一個更複雜的六邊形模式中建構一個并行消費者鍊:
<code>dw.consumewith(handler1a, handler2a);</code>
<code>dw.after(handler1a).consumewith(handler1b);</code>
<code>dw.after(handler2a).consumewith(handler2b);</code>
<code>dw.after(handler1b, handler2b).consumewith(handler3);</code>