天天看點

Disruptor入門

翻譯自disruptor git庫教程   英文位址

可以通過maven或者下載下傳jar來安裝disruptor。隻要把對應的jar放在java classpath就可以了。

我們從一個簡單的例子開始學習disruptor:生産者傳遞一個long類型的值給消費者,而消費者消費這個資料的方式僅僅是把它列印出來。首先聲明一個event來包含需要傳遞的資料:

由于需要讓disruptor為我們建立事件,我們同時還聲明了一個eventfactory來執行個體化event對象。

我們還需要一個事件消費者,也就是一個事件處理器。這個事件處理器簡單地把事件中存儲的資料列印到終端:

事件都會有一個生成事件的源,這個例子中假設事件是由于磁盤io或者network讀取資料的時候觸發的,事件源使用一個bytebuffer來模拟它接受到的資料,也就是說,事件源會在io讀取到一部分資料的時候觸發事件(觸發事件不是自動的,程式員需要在讀取到資料的時候自己觸發事件并釋出):

很明顯的是:當用一個簡單隊列來釋出事件的時候會牽涉更多的細節,這是因為事件對象還需要預先建立。釋出事件最少需要兩步:擷取下一個事件槽并釋出事件(釋出事件的時候要使用try/finnally保證事件一定會被釋出)。如果我們使用ringbuffer.next()擷取一個事件槽,那麼一定要釋出對應的事件。如果不能釋出事件,那麼就會引起disruptor狀态的混亂。尤其是在多個事件生産者的情況下會導緻事件消費者失速,進而不得不重新開機應用才能會恢複。

disruptor 3.0提供了lambda式的api。這樣可以把一些複雜的操作放在ring buffer,是以在disruptor3.0以後的版本最好使用event publisher或者event translator來釋出事件。

上面寫法的另一個好處是,translator可以分離出來并且更加容易單元測試。disruptor提供了不同的接口(eventtranslator, eventtranslatoronearg, eventtranslatortwoarg, 等等)去産生一個translator對象。很明顯,translator中方法的參數是通過ringbuffer來傳遞的。

最後一步就是把所有的代碼組合起來完成一個完整的事件處理系統。disruptor在這方面做了簡化,使用了dsl風格的代碼(其實就是按照直覺的寫法,不太能算得上真正的dsl)。雖然dsl的寫法比較簡單,但是并沒有提供所有的選項。如果依靠dsl已經可以處理大部分情況了。

disruptor在自己的接口裡面添加了對于java 8 lambda的支援。大部分disruptor中的接口都符合functional interface的要求(也就是在接口中僅僅有一個方法)。是以在disruptor中,可以廣泛使用lambda來代替自定義類。

在上面的代碼中,有很多自定義類型可以被省略了。還有注意的是:publishevent方法中僅調用傳遞給它的參數,并不是直接調用對應的對象。如果把這段代碼換成下面的代碼:

這段代碼中有一個捕獲參數的lambda,意味着在lambda表達式生成的内部類中會生成一個對象來存儲這個捕獲的bb對象。這會增加不必要的gc。是以在需要較低gc水準的情況下最好把所有的參數都通過publishevent傳遞。

由于在java 8中方法引用也是一個lambda,是以還可以把上面的代碼改成下面的代碼:

上面的代碼已經可以處理大多數的情況了,但是在有的時候還是會需要根據不同的軟體或者硬體來調整選項以獲得更高的性能。基本的選項有兩個:單或者多生産者模式和可選的等待政策。

在并發系統中提高性能最好的方式之一就是單一寫者原則,對disruptor也是适用的。如果在你的代碼中僅僅有一個事件生産者,那麼可以設定為單一生産者模式來提高系統的性能。

為了證明,下面的資料是從mac air i7上面測試的結果:

disruptor預設的等待政策是blockingwaitstrategy。這個政策的内部适用一個鎖和條件變量來控制線程的執行和等待(java基本的同步方法)。blockingwaitstrategy是最慢的等待政策,但也是cpu使用率最低和最穩定的選項。然而,可以根據不同的部署環境調整選項以提高性能。

和blockingwaitstrategy一樣,spleepingwaitstrategy的cpu使用率也比較低。它的方式是循環等待并且在循環中間調用locksupport.parknanos(1)來睡眠,(在linux系統上面睡眠時間60µs).然而,它的優點在于生産線程隻需要計數,而不執行任何指令。并且沒有條件變量的消耗。但是,事件對象從生産者到消費者傳遞的延遲變大了。sleepingwaitstrategy最好用在不需要低延遲,而且事件釋出對于生産者的影響比較小的情況下。比如異步日志功能。

yieldingwaitstrategy是可以被用在低延遲系統中的兩個政策之一,這種政策在減低系統延遲的同時也會增加cpu運算量。yieldingwaitstrategy政策會循環等待sequence增加到合适的值。循環中調用thread.yield()允許其他準備好的線程執行。如果需要高性能而且事件消費者線程比邏輯核心少的時候,推薦使用yieldingwaitstrategy政策。例如:在開啟超線程的時候。

busyspinwaitstrategy是性能最高的等待政策,同時也是對部署環境要求最高的政策。這個性能最好用在事件處理線程比實體核心數目還要小的時候。例如:在禁用超線程技術的時候。