天天看點

LMAX Disruptor——一個高性能、低延遲且簡單的架構

LMAX Disruptor——一個高性能、低延遲且簡單的架構

在這種情況下,隻要生産者(p1)将元素放到ring buffer上,消費者c1和c2就可以并行處理這些元素。但是消費者c3必須一直等到c1和c2處理完之後,才可以處理。在現實世界中的對應的案例就像:在處理實際的業務邏輯(c3)之前,需要校驗資料(c1),以及将資料寫入磁盤(c2)。

用原生的disruptor文法來建立這些消費者的話代碼如下:

<a href="http://ifeve.com/disruptor-dsl/#viewsource">檢視源代碼</a>

<code>01</code>

<code>executor executor = executors.newcachedthreadpool();</code>

<code>02</code>

<code>batchhandler handler1 =</code><code>new</code> <code>mybatchhandler1();</code>

<code>03</code>

<code>batchhandler handler2 =</code><code>new</code> <code>mybatchhandler2();</code>

<code>04</code>

<code>batchhandler handler3 =</code><code>new</code> <code>mybatchhandler3()</code>

<code>05</code>

<code>ringbuffer ringbuffer =</code><code>new</code> <code>ringbuffer(entry_factory, ring_buffer_size);</code>

<code>06</code>

<code>consumerbarrier consumerbarrier1 = ringbuffer.createconsumerbarrier();</code>

<code>07</code>

<code>batchconsumer consumer1 =</code><code>new</code> <code>batchconsumer(consumerbarrier1, handler1);</code>

<code>08</code>

<code>batchconsumer consumer2 =</code><code>new</code> <code>batchconsumer(consumerbarrier1, handler2);</code>

<code>09</code>

<code>consumerbarrier consumerbarrier2 =</code>

<code>10</code>

<code>ringbuffer.createconsumerbarrier(consumer1, consumer2);</code>

<code>11</code>

<code>batchconsumer consumer3 =</code><code>new</code> <code>batchconsumer(consumerbarrier2, handler3);</code>

<code>12</code>

<code>executor.execute(consumer1);</code>

<code>13</code>

<code>executor.execute(consumer2);</code>

<code>14</code>

<code>executor.execute(consumer3);</code>

<code>15</code>

<code>producerbarrier producerbarrier =</code>

<code>16</code>

<code>ringbuffer.createproducerbarrier(consumer3);</code>

在以上這段代碼中,我們不得不建立那些個handler(就是那些個mybatchhandler執行個體),外加消費者屏障,batchconsumer執行個體,然後在他們各自的線程中處理這些消費者。dsl能幫我們完成很多建立工作,最終的結果如下:

<code>1</code>

<code>2</code>

<code>3</code>

<code>4</code>

<code>batchhandler handler3 =</code><code>new</code> <code>mybatchhandler3();</code>

<code>5</code>

<code>disruptorwizard dw =</code><code>new</code> <code>disruptorwizard(entry_factory,</code>

<code>6</code>

<code>    </code><code>ring_buffer_size, executor);</code>

<code>7</code>

<code>dw.consumewith(handler1, handler2).then(handler3);</code>

<code>8</code>

<code>producerbarrier producerbarrier = dw.createproducerbarrier();</code>

我們甚至可以在一個更複雜的六邊形模式中建構一個并行消費者鍊:

LMAX Disruptor——一個高性能、低延遲且簡單的架構

<code>dw.consumewith(handler1a, handler2a);</code>

<code>dw.after(handler1a).consumewith(handler1b);</code>

<code>dw.after(handler2a).consumewith(handler2b);</code>

<code>dw.after(handler1b, handler2b).consumewith(handler3);</code>