天天看點

比AtomicLong還高效的LongAdder 源碼解析

接觸到atomiclong的原因是在看guava的loadingcache相關代碼時,關于loadingcache,其實思路也非常簡單清晰:用模闆模式解決了緩存不命中時擷取資料的邏輯,這個思路我早前也正好在項目中使用到。

言歸正傳,為什麼說longadder引起了我的注意,原因有二:

1. 作者是doug lea ,地位實在舉足輕重。

2. 他說這個比atomiclong高效。

我們知道,atomiclong已經是非常好的解決方案了,涉及并發的地方都是使用cas操作,在硬體層次上去做 compare and set操作。效率非常高。

是以,我決定研究下,為什麼longadder比atomiclong高效。

首先,看longadder的繼承樹:

比AtomicLong還高效的LongAdder 源碼解析

繼承自striped64,這個類包裝了一些很重要的内部類和操作。稍候會看到。

<b>正式開始前,強調下,我們知道,atomiclong的實作方式是内部有個value 變量,當多線程并發自增,自減時,均通過cas 指令從機器指令級别操作保證并發的原子性。</b>

比AtomicLong還高效的LongAdder 源碼解析

怪不得可以和atomiclong作比較,連api都這麼像。我們随便挑一個api入手分析,這個api通了,其他api都大同小異,是以,我選擇了add這個方法。事實上,其他api也都依賴這個方法。

比AtomicLong還高效的LongAdder 源碼解析

longadder中包含了一個cell 數組,cell是striped64的一個内部類,顧名思義,cell 代表了一個最小單元,這個單元有什麼用,稍候會說道。先看定義:

比AtomicLong還高效的LongAdder 源碼解析

cell内部有一個非常重要的value變量,并且提供了一個cas更新其值的方法。

回到add方法:

比AtomicLong還高效的LongAdder 源碼解析

這裡,我有個疑問,atomiclong已經使用cas指令,非常高效了(比起各種鎖),longadder如果還是用cas指令更新值,怎麼可能比atomiclong高效了? 何況内部還這麼多判斷!!!

這是我開始時最大的疑問,是以,我猜想,難道有比cas指令更高效的方式出現了? 帶着這個疑問,繼續。

第一if 判斷,第一次調用的時候cells數組肯定為null,是以,進入casbase方法:

比AtomicLong還高效的LongAdder 源碼解析

原子更新base沒啥好說的,如果更新成功,本地調用開始傳回,否則進入分支内部。

什麼時候會更新失敗? 沒錯,并發的時候,好戲開始了,atomiclong的處理方式是死循環嘗試更新,直到成功才傳回,而longadder則是進入這個分支。

分支内部,通過一個threadlocal變量threadhashcode 擷取一個hashcode對象,該hashcode對象依然是striped64類的内部類,看定義:

比AtomicLong還高效的LongAdder 源碼解析

有個code變量,儲存了一個非0的随機數随機值。

比AtomicLong還高效的LongAdder 源碼解析

拿到該線程相關的hashcode對象後,擷取它的code變量,as[(n-1)h] 這句話相當于對h取模,隻不過比起取摸,因為是 與 的運算是以效率更高。

計算出一個在cells 數組中當先線程的hashcode對應的 索引位置,并将該位置的cell 對象拿出來更新cas 更新它的value值。

當然,如果as 為null 并且更新失敗,才會進入retryupdate方法。

看到這裡我想應該有很多人明白為什麼longadder會比atomiclong更高效了,沒錯,唯一會制約atomiclong高效的原因是高并發,高并發意味着cas的失敗幾率更高, 重試次數更多,越多線程重試,cas失敗幾率又越高,變成惡性循環,atomiclong效率降低。 那怎麼解決? longadder給了我們一個非常容易想到的解決方案: 減少并發,将單一value的更新壓力分擔到多個value中去,降低單個value的 “熱度”,分段更新!!! 這樣,線程數再多也會分擔到多個value上去更新,隻需要增加value就可以降低 value的 “熱度” atomiclong中的 惡性循環不就解決了嗎? cells 就是這個 “段” cell中的value 就是存放更新值的, 這樣,當我需要總數時,把cells 中的value都累加一下不就可以了麼!!

當然,聰明之處遠遠不僅僅這裡,在看看add方法中的代碼,casbase方法可不可以不要,直接分段更新,上來就計算 索引位置,然後更新value?

答案是不好,不是不行,因為,casbase操作等價于atomiclong中的cas操作,要知道,longadder這樣的處理方式是有壞處的,分段操作必然帶來空間上的浪費,可以空間換時間,但是,能不換就不換,看空間時間都節約~! 是以,casbase操作保證了在低并發時,不會立即進入分支做分段更新操作,因為低并發時,casbase操作基本都會成功,隻有并發高到一定程度了,才會進入分支,是以,doug lead對該類的說明是: 低并發時longadder和atomiclong性能差不多,高并發時longadder更高效!

比AtomicLong還高效的LongAdder 源碼解析

但是,doung lea 還是沒這麼簡單,聰明之處還沒有結束……

如此,retryupdate中做了什麼事,也基本略知一二了,因為cell中的value都更新失敗(說明該索引到這個cell的線程也很多,并發也很高時) 或者cells數組為空時才會調用retryupdate,

是以,retryupdate裡面應該會做兩件事:

1. 擴容,将cells數組擴大,降低每個cell的并發量,同樣,這也意味着cells數組的rehash動作。

2. 給空的cells變量賦一個新的cell數組。

是不是這樣呢? 繼續看代碼:

代碼比較長,變成文本看看,為了友善大家看if else 分支,對應的 { } 我用相同的顔色标注出來

可以看到,這個時候doug lea才願意使用死循環保證更新成功~!

<code>01</code>

<code>final</code> <code>void</code> <code>retryupdate(</code><code>long</code> <code>x, hashcode hc,</code><code>boolean</code> <code>wasuncontended) {</code>

<code>02</code>

<code></code><code>int</code> <code>h = hc.code;</code>

<code>03</code>

<code></code><code>boolean</code> <code>collide =</code><code>false</code><code>;</code><code>// true if last slot nonempty</code>

<code>04</code>

<code></code><code>for</code> <code>(;;) {</code>

<code>05</code>

<code></code><code>cell[] as; cell a;</code><code>int</code> <code>n;</code><code>long</code> <code>v;</code>

<code>06</code>

<code></code><code>if</code> <code>((as = cells) !=</code><code>null</code> <code>&amp;amp;&amp;amp; (n = as.length) &amp;gt;</code><code>0</code><code>) {</code><code>// 分支1</code>

<code>07</code>

<code></code><code>if</code> <code>((a = as[(n -</code><code>1</code><code>) &amp;amp; h]) ==</code><code>null</code><code>) {</code>

<code>08</code>

<code></code><code>if</code> <code>(busy ==</code><code>0</code><code>) {</code><code>// try to attach new cell</code>

<code>09</code>

<code></code><code>cell r =</code><code>new</code> <code>cell(x);</code><code>// optimistically create</code>

<code>10</code>

<code></code><code>if</code> <code>(busy ==</code><code>0</code> <code>&amp;amp;&amp;amp; casbusy()) {</code>

<code>11</code>

<code></code><code>boolean</code> <code>created =</code><code>false</code><code>;</code>

<code>12</code>

<code></code><code>try</code> <code>{</code><code>// recheck under lock</code>

<code>13</code>

<code></code><code>cell[] rs;</code><code>int</code> <code>m, j;</code>

<code>14</code>

<code></code><code>if</code> <code>((rs = cells) !=</code><code>null</code> <code>&amp;amp;&amp;amp;</code>

<code>15</code>

<code></code><code>(m = rs.length) &amp;gt;</code><code>0</code> <code>&amp;amp;&amp;amp;</code>

<code>16</code>

<code></code><code>rs[j = (m -</code><code>1</code><code>) &amp;amp; h] ==</code><code>null</code><code>) {</code>

<code>17</code>

<code></code><code>rs[j] = r;</code>

<code>18</code>

<code></code><code>created =</code><code>true</code><code>;</code>

<code>19</code>

<code></code><code>}</code>

<code>20</code>

<code></code><code>}</code><code>finally</code> <code>{</code>

<code>21</code>

<code></code><code>busy =</code><code>0</code><code>;</code>

<code>22</code>

<code>23</code>

<code></code><code>if</code> <code>(created)</code>

<code>24</code>

<code></code><code>break</code><code>;</code>

<code>25</code>

<code></code><code>continue</code><code>;</code><code>// slot is now non-empty</code>

<code>26</code>

<code>27</code>

<code>28</code>

<code></code><code>collide =</code><code>false</code><code>;</code>

<code>29</code>

<code>30</code>

<code></code><code>else</code> <code>if</code> <code>(!wasuncontended)</code><code>// cas already known to fail</code>

<code>31</code>

<code></code><code>wasuncontended =</code><code>true</code><code>;</code><code>// continue after rehash</code>

<code>32</code>

<code></code><code>else</code> <code>if</code> <code>(a.cas(v = a.value, fn(v, x)))</code>

<code>33</code>

<code>34</code>

<code></code><code>else</code> <code>if</code> <code>(n &amp;gt;= ncpu || cells != as)</code>

<code>35</code>

<code></code><code>collide =</code><code>false</code><code>;</code><code>// at max size or stale</code>

<code>36</code>

<code></code><code>else</code> <code>if</code> <code>(!collide)</code>

<code>37</code>

<code></code><code>collide =</code><code>true</code><code>;</code>

<code>38</code>

<code></code><code>else</code> <code>if</code> <code>(busy ==</code><code>0</code> <code>&amp;amp;&amp;amp; casbusy()) {</code>

<code>39</code>

<code></code><code>try</code> <code>{</code>

<code>40</code>

<code></code><code>if</code> <code>(cells == as) {</code><code>// expand table unless stale</code>

<code>41</code>

<code></code><code>cell[] rs =</code><code>new</code> <code>cell[n &amp;lt;&amp;lt;</code><code>1</code><code>];</code>

<code>42</code>

<code></code><code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i &amp;lt; n; ++i)</code>

<code>43</code>

<code></code><code>rs[i] = as[i];</code>

<code>44</code>

<code></code><code>cells = rs;</code>

<code>45</code>

<code>46</code>

<code>47</code>

<code>48</code>

<code>49</code>

<code>50</code>

<code></code><code>continue</code><code>;</code><code>// retry with expanded table</code>

<code>51</code>

<code>52</code>

<code></code><code>h ^= h &amp;lt;&amp;lt;</code><code>13</code><code>;</code><code>// rehash h ^= h &amp;gt;&amp;gt;&amp;gt; 17;</code>

<code>53</code>

<code></code><code>h ^= h &amp;lt;&amp;lt;</code><code>5</code><code>;</code>

<code>54</code>

<code>55</code>

<code></code><code>else</code> <code>if</code> <code>(busy ==</code><code>0</code> <code>&amp;amp;&amp;amp; cells == as &amp;amp;&amp;amp; casbusy()) {</code><code>//分支2</code>

<code>56</code>

<code></code><code>boolean</code> <code>init =</code><code>false</code><code>;</code>

<code>57</code>

<code></code><code>try</code> <code>{</code><code>// initialize table</code>

<code>58</code>

<code></code><code>if</code> <code>(cells == as) {</code>

<code>59</code>

<code></code><code>cell[] rs =</code><code>new</code> <code>cell[</code><code>2</code><code>];</code>

<code>60</code>

<code></code><code>rs[h &amp;amp;</code><code>1</code><code>] =</code><code>new</code> <code>cell(x);</code>

<code>61</code>

<code>62</code>

<code></code><code>init =</code><code>true</code><code>;</code>

<code>63</code>

<code>64</code>

<code>65</code>

<code>66</code>

<code>67</code>

<code></code><code>if</code> <code>(init)</code>

<code>68</code>

<code>69</code>

<code>70</code>

<code></code><code>else</code> <code>if</code> <code>(casbase(v = base, fn(v, x)))</code>

<code>71</code>

<code></code><code>break</code><code>;</code><code>// fall back on using base</code>

<code>72</code>

<code>73</code>

<code></code><code>hc.code = h;</code><code>// record index for next time</code>

<code>74</code>

分支2中,為cells為空的情況,需要new 一個cell數組。

分支1分支中,略複雜一點點:

注意,幾個分支中都提到了busy這個方法,這個可以了解為一個cas實作的鎖,隻有在需要更新cells數組的時候才會更新該值為1,如果更新失敗,則說明目前有線程在更新cells數組,目前線程需要等待。重試。

回到分支1中,這裡首先判斷目前cells數組中的索引位置的cell元素是否為空,如果為空,則添加一個cell到數組中。

否則更新 标示沖突的标志位wasuncontended 為 true ,重試。

否則,再次更新cell中的value,如果失敗,重試。

。。。。。。。一系列的判斷後,如果還是失敗,下下下策,rehash,直接将cells數組擴容一倍,并更新目前線程的hash值,保證下次更新能盡可能成功。

可以看到,longadder确實用了很多心思減少并發量,并且,每一步都是在”沒有更好的辦法“的時候才會選擇更大開銷的操作,進而盡可能的用最最簡單的辦法去完成操作。追求簡單,但是絕對不粗暴。

————————————————分割線——————————————————————-

昨天和左耳朵耗子簡單讨論了下,發現左耳朵耗子對讀者思維的引導還是非常不錯的,在第一次發現這個類後,對裡面的實作又提出了更多的問題,引導大家思考,值得學習。

我們 發現的問題有這麼幾個:

1. jdk 1.7中是不是有這個類?

我确認後,結果如下: jdk-7u51 版本上還沒有 但是jdk-8u20版本上已經有了。代碼基本一樣 ,增加了對double類型的支援和删除了一些備援的代碼。有興趣的同學可以去下載下傳下jdk 1.8看看

2. base有沒有參與彙總?

base在調用intvalue等方法的時候是會彙總的:

比AtomicLong還高效的LongAdder 源碼解析

3. base的順序可不可以調換?

左耳朵耗子,提出了這麼一個問題: 在add方法中,如果cells不會為空後,casbase方法一直都沒有用了?

是以,我想可不可以調換add方法中的判斷順序,比如,先做casbase的判斷,結果是 不調換可能更好,調換後每次都要cas一下,在高并發時,失敗幾率非常高,并且是惡性循環,比起一次判斷,後者的開銷明顯小很多,還沒有副作用。是以,不調換可能會更好。

4. atomiclong可不可以廢掉?