系統能夠每秒處理1000個任務,每一個event至少有2個屬性:
clientid-我們希望每一秒有多個任務是在同一個用戶端下處理的(譯者:不同的clientid對應不同的clientprojection,即對應不同的一系列操作)
uuid-全局唯一的
消費一個任務要花費10毫秒,為這樣的流設計一個消費者:
能夠實時的處理任務
和同一個用戶端有關的任務應該被有序地處理,例如你不能對擁有同一個clientid的任務序列使用并行處理
如果10秒内出現了重複的uuid,丢棄它。假設10秒後不會重複
到目前為止我們提出了線程池和共享緩存結合的設計,而在這篇文章中我們會使用rxjava進行實作。開始之前,我從沒有提到eventstream是如何實作的,僅僅是給出了api:
<code>1</code>
<code>interface</code> <code>eventstream {</code>
<code>2</code>
<code>3</code>
<code> </code><code>void</code> <code>consume(eventconsumer consumer);</code>
<code>4</code>
<code>5</code>
<code>}</code>
事實上為了能夠進行測試,我建立了一個rxjava流,它所有的行為都符合設計要求:
<code>01</code>
<code>@slf4j</code>
<code>02</code>
<code>class</code> <code>eventstream {</code>
<code>03</code>
<code>04</code>
<code> </code><code>void</code> <code>consume(eventconsumer consumer) {</code>
<code>05</code>
<code> </code><code>observe()</code>
<code>06</code>
<code> </code><code>.subscribe(</code>
<code>07</code>
<code> </code><code>consumer::consume,</code>
<code>08</code>
<code> </code><code>e -> log.error(</code><code>"error emitting event"</code><code>, e)</code>
<code>09</code>
<code> </code><code>);</code>
<code>10</code>
<code> </code><code>}</code>
<code>11</code>
<code>12</code>
<code> </code><code>observable<event> observe() {</code>
<code>13</code>
<code> </code><code>return</code> <code>observable</code>
<code>14</code>
<code> </code><code>.interval(</code><code>1</code><code>, timeunit.milliseconds)</code>
<code>15</code>
<code> </code><code>.delay(x -> observable.timer(randomutils.nextint(</code><code>0</code><code>, 1_000), timeunit.microseconds))</code>
<code>16</code>
<code> </code><code>.map(x -> </code><code>new</code> <code>event(randomutils.nextint(1_000, 1_100), uuid.randomuuid()))</code>
<code>17</code>
<code> </code><code>.flatmap(</code><code>this</code><code>::occasionallyduplicate, </code><code>100</code><code>)</code>
<code>18</code>
<code> </code><code>.observeon(schedulers.io());</code>
<code>19</code>
<code>20</code>
<code>21</code>
<code> </code><code>private</code> <code>observable<event> occasionallyduplicate(event x) {</code>
<code>22</code>
<code> </code><code>final</code> <code>observable<event> event = observable.just(x);</code>
<code>23</code>
<code> </code><code>if</code> <code>(math.random() >= </code><code>0.01</code><code>) {</code>
<code>24</code>
<code> </code><code>return</code> <code>event;</code>
<code>25</code>
<code> </code><code>}</code>
<code>26</code>
<code> </code><code>final</code> <code>observable<event> duplicated =</code>
<code>27</code>
<code> </code><code>event.delay(randomutils.nextint(</code><code>10</code><code>, 5_000), timeunit.milliseconds);</code>
<code>28</code>
<code> </code><code>return</code> <code>event.concatwith(duplicated);</code>
<code>29</code>
<code>30</code>
<code>31</code>
雖然你們沒必要明白這個流模拟器是怎麼工作的,但它的工作過程相當有趣。首先我們使用interval()産生一個每毫秒輸出一個long型值(0,1,2)的穩定流(這是因為設計要求系統每秒能處理1000個event)。然後我們使用delay()對每個event進行0到1000微秒内的随機延遲,在這之後events出現的時機就變得不可預測,就更符合真實情況。最終我們使用map()将每個long型值映射到一個随機的event上,每個event都包含一個1000到1100(inclusive-exclusive)之間的clientid。
最後一點就有趣了,我們想模拟随機的重複事件。為了做到這點,我們使用flatmap()将每個event映射到自身(99%情況下)。然而在剩下的1%情況中,我們将event傳回兩次,第二次出現的時間延遲了10ms到5s。實際應用時,重複的event與第一次出現的event之間會相隔幾百個其他的event,這就使得流的行為更加符合真實情況。
有兩種方法可以與eventstream進行互動-基于回調的consume()和基于流的observer()。我們可以利用observable快速地建立處理管道,這種方法的功能和part1中的非常的像但更加簡單。
首先利用rxjava實作最初的方案非常簡短:
<code>eventstream es = </code><code>new</code> <code>eventstream();</code>
<code>eventconsumer clientprojection = </code><code>new</code> <code>clientprojection(</code>
<code> </code><code>new</code> <code>projectionmetrics(</code>
<code> </code><code>new</code> <code>metricregistry()));</code>
<code>es.observe()</code>
<code> </code><code>.subscribe(</code>
<code> </code><code>clientprojection::consume,</code>
<code> </code><code>e -> log.error(</code><code>"fatal error"</code><code>, e)</code>
<code>eventconsumer clientprojection = </code><code>new</code> <code>failonconcurrentmodification(</code>
<code> </code><code>new</code> <code>clientprojection(</code>
<code> </code><code>new</code> <code>projectionmetrics(</code>
<code> </code><code>new</code> <code>metricregistry())));</code>
<code> </code><code>.flatmap(e -> clientprojection.consume(e, schedulers.io()))</code>
<code> </code><code>.window(</code><code>1</code><code>, timeunit.seconds)</code>
<code> </code><code>.flatmap(observable::count)</code>
<code> </code><code>c -> log.info(</code><code>"processed {} events/s"</code><code>, c),</code>
eventconsumer中添加了一個輔助方法,它能夠利用提供的scheduler異步地處理event:
<code>@functionalinterface</code>
<code>interface</code> <code>eventconsumer {</code>
<code> </code><code>event consume(event event);</code>
<code> </code><code>default</code> <code>observable<event> consume(event event, scheduler scheduler) {</code>
<code> </code><code>.fromcallable(() -> </code><code>this</code><code>.consume(event))</code>
<code> </code><code>.subscribeon(scheduler);</code>
使用flatmap()在一個獨立的scheduler.io()中處理event,這樣每一個消費過程都是異步調用的。這次event的處理已經符合實時性的要求了,但還有一個更大的問題。我使用failonconcurrentmodification對clientprojection進行包裝是有原因的。events的處理都是彼此獨立的,是以對于同一個clientid有可能會并發地處理兩個event,這樣并不好。幸運的是比起使用線程來說,用rxjava解決這個問題要更加簡單:
<code> </code><code>.groupby(event::getclientid)</code>
<code> </code><code>.flatmap(byclient -> byclient</code>
<code> </code><code>.observeon(schedulers.io())</code>
<code> </code><code>.map(clientprojection::consume))</code>
上面的代碼改動的地方隻有一點點。首先我們依據clientid對event進行分組,将單一的observable流分割成多個流,每個名為byclient的子流都代表着擁有相同clientid的event。現在如果我們對子流進行映射,我們能夠确定有相同clientid的event是絕不會并發地被處理的。輸出流是惰性的,是以我們必須對流調用subscribe。與其對每一個event單獨地調用subscribe,我們選擇将每一秒内處理的event收集起來并對其計數。這樣一來每秒我們接收到的就是一個integer類型的event,它代表着每秒内我們處理的event數量。
現在我們必須除去重複的uuid,最簡單也是最笨的做法就是利用全局狀态。我們能夠簡單地利用filter()在cache中查找重複的event:
<code>final</code> <code>cache<uuid, uuid> seenuuids = cachebuilder.newbuilder()</code>
<code> </code><code>.expireafterwrite(</code><code>10</code><code>, timeunit.seconds)</code>
<code> </code><code>.build();</code>
<code> </code><code>.filter(e -> seenuuids.getifpresent(e.getuuid()) == </code><code>null</code><code>)</code>
<code> </code><code>.doonnext(e -> seenuuids.put(e.getuuid(), e.getuuid()))</code>
如果你想要監控上面代碼的效果可以簡單的加入一個度量器:
<code>meter duplicates = metricregistry.meter(</code><code>"duplicates"</code><code>);</code>
<code> </code><code>.filter(e -> {</code>
<code> </code><code>if</code> <code>(seenuuids.getifpresent(e.getuuid()) != </code><code>null</code><code>) {</code>
<code> </code><code>duplicates.mark();</code>
<code> </code><code>return</code> <code>false</code><code>;</code>
<code> </code><code>} </code><code>else</code> <code>{</code>
<code> </code><code>return</code> <code>true</code><code>;</code>
<code> </code><code>}</code>
<code> </code><code>})</code>
在操作符内部通路全局的、尤其是可變的狀态時是非常危險的,并且這樣會破壞rxjava唯一的目的-簡單并發。雖然我們使用的是guava中線程安全的cache,但在很多情況下你很容易會忘記這個全局共享的可變狀态是可以被多個線程通路的,如果你發現你在操作符鍊中修改外部的一些變量的話,那就要非常小心了。
rxjava 1.x有一個distinct()運算函數,它大概可以做如下的工作:
<code> </code><code>.distinct(event::getuuid)</code>
不幸的是distinct()會在内部将所有的uuid都存儲在一個不斷增長的hashset裡面,但我們隻關心10s内的重複事件。通過複制粘貼distinctoperator的實作,我創造了distinctevent操作符,它利用了guava的cache僅僅隻存儲10s内的uuid。我故意将event寫死在這個操作符内而不是将它寫成一般性的就是為了讓代碼更易懂:
<code>class</code> <code>distinctevent </code><code>implements</code> <code>observable.operator<event, event> {</code>
<code> </code><code>private</code> <code>final</code> <code>duration duration;</code>
<code> </code><code>distinctevent(duration duration) {</code>
<code> </code><code>this</code><code>.duration = duration;</code>
<code> </code><code>@override</code>
<code> </code><code>public</code> <code>subscriber<? </code><code>super</code> <code>event> call(subscriber<? </code><code>super</code> <code>event> child) {</code>
<code> </code><code>return</code> <code>new</code> <code>subscriber<event>(child) {</code>
<code> </code><code>final</code> <code>map<uuid, boolean> keymemory = cachebuilder.newbuilder()</code>
<code> </code><code>.expireafterwrite(duration.tomillis(), timeunit.milliseconds)</code>
<code> </code><code>.<uuid, boolean>build().asmap();</code>
<code> </code><code>@override</code>
<code> </code><code>public</code> <code>void</code> <code>onnext(event event) {</code>
<code> </code><code>if</code> <code>(keymemory.put(event.getuuid(), </code><code>true</code><code>) == </code><code>null</code><code>) {</code>
<code> </code><code>child.onnext(event);</code>
<code> </code><code>} </code><code>else</code> <code>{</code>
<code> </code><code>request(</code><code>1</code><code>);</code>
<code> </code><code>}</code>
<code> </code><code>public</code> <code>void</code> <code>onerror(throwable e) {</code>
<code> </code><code>child.onerror(e);</code>
<code> </code><code>public</code> <code>void</code> <code>oncompleted() {</code>
<code> </code><code>child.oncompleted();</code>
<code>32</code>
<code>33</code>
<code>34</code>
<code> </code><code>};</code>
<code>35</code>
<code>36</code>
自定義的操作符使用起來非常簡單,實作如下:
<code> </code><code>.lift(</code><code>new</code> <code>distinctevent(duration.ofseconds(</code><code>10</code><code>)))</code>
<code> </code><code>.map(clientprojection::consume)</code>
<code> </code><code>)</code>
事實上如果我們跳過每秒的logging實作可以變得更加簡單:
<code> </code><code>e -> {},</code>
這個方案比之前的基于線程池和裝飾者模式的要更加簡短,其中唯一麻煩的部分就是在自定義的操作符中當存儲了太多的uuid之後會造成記憶體洩漏,幸好rxjava 2能解決這個問題。
distinct()允許使用自定義的collection而不必使用内置的hashset(感覺2.x中可以使用自定義的資料結構後,1.x中的distinctevent就完全沒必要了)。不管你是否相信,依賴倒置不僅僅隻出現在spring架構或者java ee中。當一個庫允許你提供它内部資料結構的自定義實作時,這就已經是依賴反轉。首先我創造了一個輔助方法,它能夠建立set,set由map提供依賴,而map則由cache提供依賴。這就像委托一樣!
<code>private</code> <code>set<uuid> recentuuids() {</code>
<code> </code><code>return</code> <code>collections.newsetfrommap(</code>
<code> </code><code>cachebuilder.newbuilder()</code>
<code> </code><code>.expireafterwrite(</code><code>10</code><code>, timeunit.seconds)</code>
<code> </code><code>.<uuid, boolean>build()</code>
<code>6</code>
<code> </code><code>.asmap()</code>
<code>7</code>
<code> </code><code>);</code>
<code>8</code>
有了這個方法之後,我們就能利用以下的代碼實作整個任務:
<code> </code><code>.distinct(event::getuuid, </code><code>this</code><code>::recentuuids)</code>
這段代碼是如此的優雅、簡單、清晰!它的大緻流程如下:
observe一個event流
消除重複的uuid
依據clientid對event分組
對每一個client有序地處理event
希望你能喜歡這些方案,并能将之運用到你的日常生活中去。
<a href="http://www.nurkiewicz.com/2016/10/small-scale-stream-processing-kata-part.html">small scale stream processing kata. part 1: thread pools</a>
small scale stream processing kata. part 2: rxjava 1.x/2.x