天天看點

ReentrantLock(重入鎖)以及公平性簡介實作分析

dedicate to molly.

<b>reentrantlock</b>的實作不僅可以替代隐式的<b>synchronized</b>關鍵字,而且能夠提供超過關鍵字本身的多種功能。

這裡提到一個鎖擷取的公平性問題,如果在絕對時間上,先對鎖進行擷取的請求一定被先滿足,那麼這個鎖是公平的,反之,是不公平的,也就是說等待時間最長的線程最有機會擷取鎖,也可以說鎖的擷取是有序的。reentrantlock這個鎖提供了一個構造函數,能夠控制這個鎖是否是公平的。

而鎖的名字也是說明了這個鎖具備了重複進入的可能,也就是說能夠讓目前線程多次的進行對鎖的擷取操作,這樣的最大次數限制是integer.max_value,約21億次左右。

事實上公平的鎖機制往往沒有非公平的效率高,因為公平的擷取鎖沒有考慮到作業系統對線程的排程因素,這樣造成jvm對于等待中的線程排程次序和作業系統對線程的排程之間的不比對。對于鎖的快速且重複的擷取過程中,連續擷取的機率是非常高的,而公平鎖會壓制這種情況,雖然公平性得以保障,但是響應比卻下降了,但是并不是任何場景都是以tps作為唯一名額的,因為公平鎖能夠減少“饑餓”發生的機率,等待越久的請求越是能夠得到優先滿足。

在reentrantlock中,對于公平和非公平的定義是通過對同步器abstractqueuedsynchronizer的擴充加以實作的,也就是在tryacquire的實作上做了語義的控制。

<code>01</code>

<code>final</code> <code>boolean</code> <code>nonfairtryacquire(</code><code>int</code> <code>acquires) {</code>

<code>02</code>

<code>    </code><code>final</code> <code>thread current = thread.currentthread();</code>

<code>03</code>

<code>    </code><code>int</code> <code>c = getstate();</code>

<code>04</code>

<code>    </code><code>if</code> <code>(c ==</code><code>0</code><code>) {</code>

<code>05</code>

<code>        </code><code>if</code> <code>(compareandsetstate(</code><code>0</code><code>, acquires)) {</code>

<code>06</code>

<code>            </code><code>setexclusiveownerthread(current);</code>

<code>07</code>

<code>            </code><code>return</code> <code>true</code><code>;</code>

<code>08</code>

<code>        </code><code>}</code>

<code>09</code>

<code>    </code><code>}</code><code>else</code> <code>if</code> <code>(current == getexclusiveownerthread()) {</code>

<code>10</code>

<code>        </code><code>int</code> <code>nextc = c + acquires;</code>

<code>11</code>

<code>                </code><code>if</code> <code>(nextc &lt;</code><code>0</code><code>)</code><code>// overflow</code>

<code>12</code>

<code>            </code><code>throw</code> <code>new</code> <code>error(</code><code>"maximum lock count exceeded"</code><code>);</code>

<code>13</code>

<code>        </code><code>setstate(nextc);</code>

<code>14</code>

<code>        </code><code>return</code> <code>true</code><code>;</code>

<code>15</code>

<code>    </code><code>}</code>

<code>16</code>

<code>    </code><code>return</code> <code>false</code><code>;</code>

<code>17</code>

<code>}</code>

上述邏輯主要包括:

如果目前狀态為初始狀态,那麼嘗試設定狀态;

如果狀态設定成功後就傳回;

如果狀态被設定,且擷取鎖的線程又是目前線程的時候,進行狀态的自增;

如果未設定成功狀态且目前線程不是擷取鎖的線程,那麼傳回失敗。

<code>protected</code> <code>final</code> <code>boolean</code> <code>tryacquire(</code><code>int</code> <code>acquires) {</code>

<code>        </code><code>if</code> <code>(!hasqueuedpredecessors() &amp;&amp; compareandsetstate(</code><code>0</code><code>, acquires)) {</code>

<code>        </code><code>if</code> <code>(nextc &lt;</code><code>0</code><code>)</code>

上述邏輯相比較非公平的擷取,僅加入了目前線程(node)之前是否有前置節點在等待的判斷。hasqueuedpredecessors()方法命名有些歧義,其實應該是currentthreadhasqueuedpredecessors()更為妥帖一些,也就是說目前面沒有人排在該節點(node)前面時候隊且能夠設定成功狀态,才能夠擷取鎖。

<code>protected</code> <code>final</code> <code>boolean</code> <code>tryrelease(</code><code>int</code> <code>releases) {</code>

<code>    </code><code>int</code> <code>c = getstate() - releases;</code>

<code>    </code><code>if</code> <code>(thread.currentthread() != getexclusiveownerthread())</code>

<code>        </code><code>throw</code> <code>new</code> <code>illegalmonitorstateexception();</code>

<code>    </code><code>boolean</code> <code>free =</code><code>false</code><code>;</code>

<code>        </code><code>free =</code><code>true</code><code>;</code>

<code>        </code><code>setexclusiveownerthread(</code><code>null</code><code>);</code>

<code>    </code><code>setstate(c);</code>

<code>    </code><code>return</code> <code>free;</code>

上述邏輯主要主要計算了釋放狀态後的值,如果為0則完全釋放,傳回true,反之僅是設定狀态,傳回false。

下面将主要的筆墨放在公平性和非公平性上,首先看一下二者測試的對比:

測試用例如下:

<code>public</code> <code>class</code> <code>reentrantlocktest {</code>

<code>    </code><code>private</code> <code>static</code> <code>lock fairlock =</code><code>new</code> <code>reentrantlock(</code><code>true</code><code>);</code>

<code>    </code><code>private</code> <code>static</code> <code>lock unfairlock =</code><code>new</code> <code>reentrantlock();</code>

<code>    </code><code>@test</code>

<code>    </code><code>public</code> <code>void</code> <code>fair() {</code>

<code>        </code><code>system.out.println(</code><code>"fair version"</code><code>);</code>

<code>        </code><code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i &lt;</code><code>5</code><code>; i++) {</code>

<code>            </code><code>thread thread =</code><code>new</code> <code>thread(</code><code>new</code> <code>job(fairlock));</code>

<code>            </code><code>thread.setname(</code><code>""</code> <code>+ i);</code>

<code>            </code><code>thread.start();</code>

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>thread.sleep(</code><code>5000</code><code>);</code>

<code>        </code><code>}</code><code>catch</code> <code>(interruptedexception e) {</code>

<code>            </code><code>e.printstacktrace();</code>

<code>18</code>

<code>19</code>

<code>20</code>

<code>21</code>

<code>22</code>

<code>    </code><code>public</code> <code>void</code> <code>unfair() {</code>

<code>23</code>

<code>        </code><code>system.out.println(</code><code>"unfair version"</code><code>);</code>

<code>24</code>

<code>25</code>

<code>            </code><code>thread thread =</code><code>new</code> <code>thread(</code><code>new</code> <code>job(unfairlock));</code>

<code>26</code>

<code>27</code>

<code>28</code>

<code>29</code>

<code>30</code>

<code>31</code>

<code>32</code>

<code>33</code>

<code>34</code>

<code>35</code>

<code>36</code>

<code>37</code>

<code>    </code><code>private</code> <code>static</code> <code>class</code> <code>job</code><code>implements</code> <code>runnable {</code>

<code>38</code>

<code>        </code><code>private</code> <code>lock lock;</code>

<code>39</code>

<code>        </code><code>public</code> <code>job(lock lock) {</code>

<code>40</code>

<code>            </code><code>this</code><code>.lock = lock;</code>

<code>41</code>

<code>42</code>

<code>43</code>

<code>        </code><code>@override</code>

<code>44</code>

<code>        </code><code>public</code> <code>void</code> <code>run() {</code>

<code>45</code>

<code>            </code><code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i &lt;</code><code>5</code><code>; i++) {</code>

<code>46</code>

<code>                </code><code>lock.lock();</code>

<code>47</code>

<code>                </code><code>try</code> <code>{</code>

<code>48</code>

<code>                    </code><code>system.out.println(</code><code>"lock by:"</code>

<code>49</code>

<code>                            </code><code>+ thread.currentthread().getname());</code>

<code>50</code>

<code>                </code><code>}</code><code>finally</code> <code>{</code>

<code>51</code>

<code>                    </code><code>lock.unlock();</code>

<code>52</code>

<code>                </code><code>}</code>

<code>53</code>

<code>            </code><code>}</code>

<code>54</code>

<code>55</code>

<code>56</code>

調用非公平的測試方法,傳回結果(部分):

unfair version

lock by:0

lock by:2

lock by:1

調用公平的測試方法,傳回結果:

fair version

lock by:3

lock by:4

仔細觀察傳回的結果(其中每個數字代表一個線程),非公平的結果一個線程連續擷取鎖的情況非常多,而公平的結果連續擷取的情況基本沒有。那麼在一個線程擷取了鎖的那一刻,究竟鎖的公平性會導緻鎖有什麼樣的處理邏輯呢?

通過之前的同步器(abstractqueuedsynchronizer)的介紹,在鎖上是存在一個等待隊列,sync隊列,我們通過複寫reentrantlock的擷取目前鎖的sync隊列,輸出在reentrantlock被擷取時刻,目前的sync隊列的狀态。

修改測試如下:

<code>    </code><code>private</code> <code>static</code> <code>lock fairlock =</code><code>new</code> <code>reentrantlock2(</code><code>true</code><code>);</code>

<code>    </code><code>private</code> <code>static</code> <code>lock unfairlock =</code><code>new</code> <code>reentrantlock2();</code>

<code>            </code><code>thread thread =</code><code>new</code> <code>thread(</code><code>new</code> <code>job(fairlock)) {</code>

<code>                </code><code>public</code> <code>string tostring() {</code>

<code>                    </code><code>return</code> <code>getname();</code>

<code>            </code><code>};</code>

<code>        </code><code>// sleep 5000ms</code>

<code>            </code><code>thread thread =</code><code>new</code> <code>thread(</code><code>new</code> <code>job(unfairlock)) {</code>

<code>                            </code><code>+ thread.currentthread().getname() +</code><code>" and "</code>

<code>                            </code><code>+ ((reentrantlock2) lock).getqueuedthreads()</code>

<code>                            </code><code>+</code><code>" waits."</code><code>);</code>

<code>57</code>

<code>    </code><code>private</code> <code>static</code> <code>class</code> <code>reentrantlock2</code><code>extends</code> <code>reentrantlock {</code>

<code>58</code>

<code>        </code><code>// constructor override</code>

<code>59</code>

<code>60</code>

<code>        </code><code>private</code> <code>static</code> <code>final</code> <code>long</code> <code>serialversionuid = 1773716895097002072l;</code>

<code>61</code>

<code>62</code>

<code>        </code><code>public</code> <code>collection&lt;thread&gt; getqueuedthreads() {</code>

<code>63</code>

<code>            </code><code>return</code> <code>super</code><code>.getqueuedthreads();</code>

<code>64</code>

<code>65</code>

<code>66</code>

上述邏輯主要是通過構造reentrantlock2用來輸出在sync隊列中的線程内容,而且每個線程的tostring方法被重寫,這樣當一個線程擷取到鎖時,sync隊列裡的内容也就可以得知了,運作結果如下:

調用非公平方法,傳回結果:

lock by:0 and [] waits.

lock by:3 and [2, 1] waits.

lock by:3 and [4, 2, 1] waits.

lock by:3 and [0, 4, 2, 1] waits.

lock by:1 and [0, 4, 2] waits.

調用公平方法,傳回結果:

lock by:1 and [0, 4, 3, 2] waits.

lock by:2 and [1, 0, 4, 3] waits.

lock by:3 and [2, 1, 0, 4] waits.

lock by:4 and [3, 2, 1, 0] waits.

lock by:0 and [4, 3, 2, 1] waits.

可以明顯看出,在非公平擷取的過程中,“插隊”現象非常嚴重,後續擷取鎖的線程根本不顧及sync隊列中等待的線程,而是能擷取就擷取。反觀公平擷取的過程,鎖的擷取就類似線性化的,每次都由sync隊列中等待最長的線程(連結清單的第一個,sync隊列是由尾部結點添加,目前輸出的sync隊列是逆序輸出)擷取鎖。一個 hasqueuedpredecessors方法能夠獲得公平性的特性,這點實際上是由abstractqueuedsynchronizer來完成的,看一下acquire方法:

<code>1</code>

<code>public</code> <code>final</code> <code>void</code> <code>acquire(</code><code>int</code> <code>arg) {</code>

<code>2</code>

<code>    </code><code>if</code> <code>(!tryacquire(arg) &amp;&amp; acquirequeued(addwaiter(node.exclusive), arg))</code>

<code>3</code>

<code>        </code><code>selfinterrupt();</code>

<code>4</code>

可以看到,如果擷取狀态和在sync隊列中排隊是短路的判斷,也就是說如果tryacquire成功,那麼是不會進入sync隊列的,可以通過下圖來深刻的認識公平性和abstractqueuedsynchronizer的擷取過程。

非公平的,或者說預設的擷取方式如下圖所示:

ReentrantLock(重入鎖)以及公平性簡介實作分析

對于狀态的擷取,可以快速的通過tryacquire的成功,也就是黃色的fast路線,也可以由于tryacquire的失敗,構造節點,進入sync隊列中排序後再次擷取。是以可以了解為fast就是一個快速通道,當例子中的線程釋放鎖之後,快速的通過fast通道再次擷取鎖,就算目前sync隊列中有排隊等待的線程也會被忽略。這種模式,可以保證進入和退出鎖的吞吐量,但是sync隊列中過早排隊的線程會一直處于阻塞狀态,造成“饑餓”場景。

而公平性鎖,就是在tryacquire的調用中顧及目前sync隊列中的等待節點(廢棄了fast通道),也就是任意請求都需要按照sync隊列中既有的順序進行,先到先得。這樣很好的確定了公平性,但是可以從結果中看到,吞吐量就沒有非公平的鎖高了。