消費者 c3 也許是你的業務邏輯。消費者 c1 可能在備份接收到的資料,而消費者 c2 可能在準備資料或者别的東西。
(為什麼單詞 queue 裡必須有這麼多 “e” 呢?這是我在畫這些圖時遇到的最麻煩的詞)。
你也許從這裡看到了問題的端倪:一條消息從 p1 傳輸到 c3 要完整的穿過四個隊列,每個隊列在消息進入隊列和取出隊列時都會産生消耗成本。
這張圖看起來更複雜。不過所有的參與者都隻依賴 ring buffer 作為一個單獨的聯結點,而且所有的互動都是基于 barrier 對象與檢查依賴的目标序号來實作的。
中描述過的單生産者模型。有趣的是,生産者并不需要關心所有的消費者。它隻關心消費者 c3,如果消費者 c3 處理完了 ring buffer
的某一個節點,那麼另外兩個消費者肯定也處理完了。是以,隻要 c3 的位置向前移動,ring buffer 的後續節點就會空閑出來。
管理消費者的依賴關系需要兩個 consumerbarrier 對象。第一個僅僅與 ring buffer 互動,c1 和 c2
消費者向它申請下一個可通路節點。第二個 consumerbarrier 隻知道消費者 c1 和
c2,它傳回兩個消費者通路過的消息序号中較小的那個。
hmmm。我想需要一個例子。
我們從這個故事發生到一半的時候來看:生産者 p1 已經在 ring buffer 裡寫到序号 22 了,消費者 c1
已經通路和處理完了序号 21 之前的所有資料。消費者 c2 處理到了序号 18。消費者 c3,就是依賴其他消費者的那個,才處理到序号 15。
生産者 p1 不能繼續向 ringbuffer 寫入資料了,因為序号 15 占據了我們想要寫入序号 23 的資料節點 (slot)。
(抱歉,我真的試過用其他顔色來代替紅色和綠色,但是别的都更容易混淆。)
第一個 consumerbarrier(cb1)告訴 c1 和 c2 消費者可以去通路序号 22 前面的所有資料,這是 ring
buffer 中的最大序号。第二個 consumerbarrier (cb2) 不但會檢查 ringbuffer
的序号,也會檢查另外兩個消費者的序号并且傳回它們之間的最小值。是以,三号消費者被告知可以通路 ring buffer 裡序号 18 前面的資料。
注意這些消費者還是直接從 ring buffer 拿資料節點——并不是由 c1 和 c2 消費者把資料節點從 ring buffer
裡取出再傳遞給 c3 消費者的。作為替代的是,由第二個 consumerbarrier 告訴 c3 消費者,在 ringbuffer
裡的哪些節點可以安全的處理。
這産生了一個問題——如果任何資料都來自于 ring buffer,那麼 c3 消費者如何讀到前面兩個消費者處理完成的資料呢?如果 c3
消費者關心的隻是先前的消費者是否已經完成它們的工作(例如,把資料複制到别的地方),那麼這一切都沒有問題—— c3
消費者知道工作已完成就放心了。但是,如果 c3 消費者需要通路先前的消費者的處理結果,它又從哪裡去擷取呢?
秘密在于把處理結果寫入 ring buffer 資料節點 (entry) 本身。這樣,當 c3 消費者從 ring buffer 取出節點時,它已經填充好了 c3 消費者工作需要的所有資訊。這裡 真正 重要的地方是節點 (entry) 對象的每一個字段應該隻允許一個消費者寫入。這可以避免産生并發寫入沖突 (write-contention) 減慢了整個處理過程。
有兩個字段:fizz 和 buzz。如果消費者是 fizz consumer, 它隻寫入字段 fizz。如果是 buzz consumer,
它隻寫入字段 buzz。第三個消費者 fizzbuzz,它隻去讀這兩個字段但是不會做寫入,因為讀沒問題,不會引起争用。
這一切看起來都要比隊列實作更複雜。是的,它涉及到更多的内部協調。但是這些細節對于消費者和生産者是隐藏的,它們隻和 barrier 對象互動。訣竅在消費者結構裡。上文例子中提到的菱形結構可以用下面的方法建立:
<code>01</code>
<code>consumerbarrier consumerbarrier1 =</code>
<code>02</code>
<code> </code><code>ringbuffer.createconsumerbarrier();</code>
<code>03</code>
<code>batchconsumer consumer1 =</code>
<code>04</code>
<code> </code><code>new</code> <code>batchconsumer(consumerbarrier1, handler1);</code>
<code>05</code>
<code>batchconsumer consumer2 =</code>
<code>06</code>
<code> </code><code>new</code> <code>batchconsumer(consumerbarrier1, handler2);</code>
<code>07</code>
<code>consumerbarrier consumerbarrier2 =</code>
<code>08</code>
<code> </code><code>ringbuffer.createconsumerbarrier(consumer1, consumer2);</code>
<code>09</code>
<code>batchconsumer consumer3 =</code>
<code>10</code>
<code> </code><code>new</code> <code>batchconsumer(consumerbarrier2, handler3);</code>
<code>11</code>
<code>producerbarrier producerbarrier =</code>
<code>12</code>
<code> </code><code>ringbuffer.createproducerbarrier(consumer3);</code>
現在你知道了——如何關聯 disruptor 與互相依賴(等待)的多個消費者。關鍵點是:
使用多個 consumerbarrier 來管理消費者之間的依賴(等待)關系。
使用 producerbarrier 監視結構圖中最後一個消費者。
隻允許一個消費者更新資料節點 (entry) 的每一個獨立字段。