天天看點

如何使用Disruptor(二)如何從Ringbuffer讀取(轉)

ConsumerBarrier與消費者

這裡我要稍微反過來介紹,因為總的來說讀取資料這一過程比寫資料要容易了解。假設通過一些“魔法”已經把資料寫入到 Ring Buffer了,怎樣從 Ring Buffer讀出這些資料呢?

如何使用Disruptor(二)如何從Ringbuffer讀取(轉)

(好,我開始後悔使用Paint/Gimp 了。盡管這是個購買繪圖闆的好借口,如果我繼續寫下去的話… UML界的權威們大概也在詛咒我的名字了。)

消費者(Consumer)是一個想從Ring Buffer裡讀取資料的線程,它可以通路ConsumerBarrier對象——這個對象由RingBuffer建立并且代表消費者與RingBuffer進行互動。就像Ring Buffer顯然需要一個序号才能找到下一個可用節點一樣,消費者也需要知道它将要處理的序号——每個消費者都需要找到下一個它要通路的序号。在上面的例子中,消費者處理完了Ring Buffer裡序号8之前(包括8)的所有資料,那麼它期待通路的下一個序号是9。

消費者可以調用ConsumerBarrier對象的waitFor()方法,傳遞它所需要的下一個序号.

1

final

long

availableSeq = consumerBarrier.waitFor(nextSequence);

ConsumerBarrier傳回RingBuffer的最大可通路序号——在上面的例子中是12。ConsumerBarrier有一個WaitStrategy方法來決定它如何等待這個序号,我現在不會去描述它的細節,代碼的注釋裡已經概括了每一種WaitStrategy的優點和缺點 。

接下來怎麼做?

接下來,消費者會一直原地停留,等待更多資料被寫入Ring Buffer。并且,一旦資料寫入後消費者會收到通知——節點9,10,11和12 已寫入。現在序号12到了,消費者可以讓ConsumerBarrier去拿這些序号節點裡的資料了。

如何使用Disruptor(二)如何從Ringbuffer讀取(轉)

拿到了資料後,消費者(Consumer)會更新自己的辨別(cursor)。

你應該已經感覺得到,這樣做是怎樣有助于平緩延遲的峰值了——以前需要逐個節點地詢問“我可以拿下一個資料嗎?現在可以了麼?現在呢?”,消費者(Consumer)現在隻需要簡單的說“當你拿到的數字比我這個要大的時候請告訴我”,函數傳回值會告訴它有多少個新的節點可以讀取資料了。因為這些新的節點的确已經寫入了資料(Ring Buffer本身的序号已經更新),而且消費者對這些節點的唯一操作是讀而不是寫,是以通路不用加鎖。這太好了,不僅代碼實作起來可以更加安全和簡單,而且不用加鎖使得速度更快。

另一個好處是——你可以用多個消費者(Consumer)去讀同一個RingBuffer ,不需要加鎖,也不需要用另外的隊列來協調不同的線程(消費者)。這樣你可以在Disruptor的協調下實作真正的并發資料處理。

BatchConsumer代碼是一個消費者的例子。如果你實作了BatchHandler, 你可以用BatchConsumer來完成上面我提到的複雜工作。它很容易對付那些需要成批處理的節點(例如上文中要處理的9-12節點)而不用單獨地去讀取每一個節點。

繼續閱讀