天天看點

Storm-源碼分析- Disruptor在storm中的使用

disruptor為了更便于使用, 在2.0做了比較大的調整, 比較突出的是更換了幾乎所有的概念名

老版本,

Storm-源碼分析- Disruptor在storm中的使用

新版本,

Storm-源碼分析- Disruptor在storm中的使用

從左到右的變化如下,

1. producer –> publisher 

2. producerbarrier被integrate到ringbuffer裡面, 叫做publishport, 提供publish接口 

3. entry –> event 

4, cursor封裝成sequence, 其實sequence就是将cursor+pading封裝一下 

5. consumer –> eventprocesser 

6. consumerbarrier 變為dependencybarrier, 或sequencebarrier

并且對于publisher和eventprocesser, 存在claimstrategy和waitstrategy 

對于publisher的claimstrategy, 由于publisher需要先claim到sequencer才能publish: singlethreadedclaimstrategy, multithreadedclaimstrategy, 應該是對于singlethread不需要使用cas更為高效 

對于eventprocesser的waitstrategy, 當取不到資料的時候采用什麼樣的政策進行等待: blockingwaitstrategy, busyspinwaitstrategy, sleepingwaitstrategy, yieldingwaitstrategy 

blocking就是同步加鎖, busyspin就是忙等耗cpu, 都比較低效 

yielding就是調用thread.yeild(), 把線程的從可執行狀态調整成就緒裝, 意思我先息下, 你們忙你們先來, 就是把cpu讓給其他的線程, 但是yeild并不保證過多久線程被執行, 如果沒有其他線程, 可能會被立即執行 

而sleep, 會強制線程休眠指定時間, 然後再重新排程

首先聲明一組變量, 部分會在構造函數中被初始化 

最重要的結構就是ringbuffer, 這是個模闆類, 這裡從objecteventfactory()的實作也可以看出來, 初始化的時候在ringbuffer的每個entry上都建立一個mutableobject對象

mutableobject的實作很簡單, 這是封裝了object o, 為什麼要做這層封裝? 

為了避免java gc, 對于ringbuffer一旦初始化好, 上面的所有的mutableobject都不會被釋放, 你隻是去對object o, set不同的值

publish過程, 可見目前producerbarrier已經被內建到ringbuffer裡面, 是以直接調用_buffer的接口 

首先調用next, claim序号 

取出序号上的mutableobject, 并将輸入obj set 

最後, publish目前序号, 表示consumer可以讀取 

當consumer沒有start時, 會将obj cache在_cache中, 而不會放到ringbuffer中 (我沒有想明白why? 為何要使用低效的連結清單queue來cache, 而不直接放到ringbuffer裡面)

consume的過程, 這裡實作的時batch consume, 即給定cursor, 會一直consume到該cursor為止

_consumer代表目前已經被consume的序号, 是以從_consumer.get() + 1開始讀 

取出mutableobject中的o, 并将mutableobject 清空 

根據o的情況, 3種情況, 

    1. 如果是flush_cache對象, 将cache中的event讀出調用handler.onevent 

    2. 如果是interrupt對象, 觸發interruptedexception 

    3. 正常情況, 直接調用handler.onevent處理該o, curr == cursor判斷表示batch是否結束, 當讀到cursor的時候結束

最終将_consumer置為cursor, 表示已經讀到cursor位置

建立disruptorqueue, 選用multithreadedclaimstrategy和blockingwaitstrategy

并封裝一系列java接口 

最重要的工作是, 啟動consume-loop 

這裡ret是closeover了一個間隔為0的不停執行(consume-batch-when-available queue handler) 的線程, 而consumebatchwhenavailable的實作就是不停的sleep并調用consumebatchtocursor

并且通過consumer-started!通知其他線程consumer已經start

看看async-loop實作什麼功能?  傳回reify實作的record, 其中closeover了thread  這個thread主要就是死循環的執行傳入的afn, 并且以afn的傳回值作為執行間隔 主要功能, 異步的loop, 開啟新的線程來執行loop, 而不是在目前主線程, 并且提供了sleep設定
Storm-源碼分析- Disruptor在storm中的使用

 本文章摘自部落格園,原文釋出日期:2013-07-10