天天看点

ReentrantLock(重入锁)以及公平性简介实现分析

dedicate to molly.

<b>reentrantlock</b>的实现不仅可以替代隐式的<b>synchronized</b>关键字,而且能够提供超过关键字本身的多种功能。

这里提到一个锁获取的公平性问题,如果在绝对时间上,先对锁进行获取的请求一定被先满足,那么这个锁是公平的,反之,是不公平的,也就是说等待时间最长的线程最有机会获取锁,也可以说锁的获取是有序的。reentrantlock这个锁提供了一个构造函数,能够控制这个锁是否是公平的。

而锁的名字也是说明了这个锁具备了重复进入的可能,也就是说能够让当前线程多次的进行对锁的获取操作,这样的最大次数限制是integer.max_value,约21亿次左右。

事实上公平的锁机制往往没有非公平的效率高,因为公平的获取锁没有考虑到操作系统对线程的调度因素,这样造成jvm对于等待中的线程调度次序和操作系统对线程的调度之间的不匹配。对于锁的快速且重复的获取过程中,连续获取的概率是非常高的,而公平锁会压制这种情况,虽然公平性得以保障,但是响应比却下降了,但是并不是任何场景都是以tps作为唯一指标的,因为公平锁能够减少“饥饿”发生的概率,等待越久的请求越是能够得到优先满足。

在reentrantlock中,对于公平和非公平的定义是通过对同步器abstractqueuedsynchronizer的扩展加以实现的,也就是在tryacquire的实现上做了语义的控制。

<code>01</code>

<code>final</code> <code>boolean</code> <code>nonfairtryacquire(</code><code>int</code> <code>acquires) {</code>

<code>02</code>

<code>    </code><code>final</code> <code>thread current = thread.currentthread();</code>

<code>03</code>

<code>    </code><code>int</code> <code>c = getstate();</code>

<code>04</code>

<code>    </code><code>if</code> <code>(c ==</code><code>0</code><code>) {</code>

<code>05</code>

<code>        </code><code>if</code> <code>(compareandsetstate(</code><code>0</code><code>, acquires)) {</code>

<code>06</code>

<code>            </code><code>setexclusiveownerthread(current);</code>

<code>07</code>

<code>            </code><code>return</code> <code>true</code><code>;</code>

<code>08</code>

<code>        </code><code>}</code>

<code>09</code>

<code>    </code><code>}</code><code>else</code> <code>if</code> <code>(current == getexclusiveownerthread()) {</code>

<code>10</code>

<code>        </code><code>int</code> <code>nextc = c + acquires;</code>

<code>11</code>

<code>                </code><code>if</code> <code>(nextc &lt;</code><code>0</code><code>)</code><code>// overflow</code>

<code>12</code>

<code>            </code><code>throw</code> <code>new</code> <code>error(</code><code>"maximum lock count exceeded"</code><code>);</code>

<code>13</code>

<code>        </code><code>setstate(nextc);</code>

<code>14</code>

<code>        </code><code>return</code> <code>true</code><code>;</code>

<code>15</code>

<code>    </code><code>}</code>

<code>16</code>

<code>    </code><code>return</code> <code>false</code><code>;</code>

<code>17</code>

<code>}</code>

上述逻辑主要包括:

如果当前状态为初始状态,那么尝试设置状态;

如果状态设置成功后就返回;

如果状态被设置,且获取锁的线程又是当前线程的时候,进行状态的自增;

如果未设置成功状态且当前线程不是获取锁的线程,那么返回失败。

<code>protected</code> <code>final</code> <code>boolean</code> <code>tryacquire(</code><code>int</code> <code>acquires) {</code>

<code>        </code><code>if</code> <code>(!hasqueuedpredecessors() &amp;&amp; compareandsetstate(</code><code>0</code><code>, acquires)) {</code>

<code>        </code><code>if</code> <code>(nextc &lt;</code><code>0</code><code>)</code>

上述逻辑相比较非公平的获取,仅加入了当前线程(node)之前是否有前置节点在等待的判断。hasqueuedpredecessors()方法命名有些歧义,其实应该是currentthreadhasqueuedpredecessors()更为妥帖一些,也就是说当前面没有人排在该节点(node)前面时候队且能够设置成功状态,才能够获取锁。

<code>protected</code> <code>final</code> <code>boolean</code> <code>tryrelease(</code><code>int</code> <code>releases) {</code>

<code>    </code><code>int</code> <code>c = getstate() - releases;</code>

<code>    </code><code>if</code> <code>(thread.currentthread() != getexclusiveownerthread())</code>

<code>        </code><code>throw</code> <code>new</code> <code>illegalmonitorstateexception();</code>

<code>    </code><code>boolean</code> <code>free =</code><code>false</code><code>;</code>

<code>        </code><code>free =</code><code>true</code><code>;</code>

<code>        </code><code>setexclusiveownerthread(</code><code>null</code><code>);</code>

<code>    </code><code>setstate(c);</code>

<code>    </code><code>return</code> <code>free;</code>

上述逻辑主要主要计算了释放状态后的值,如果为0则完全释放,返回true,反之仅是设置状态,返回false。

下面将主要的笔墨放在公平性和非公平性上,首先看一下二者测试的对比:

测试用例如下:

<code>public</code> <code>class</code> <code>reentrantlocktest {</code>

<code>    </code><code>private</code> <code>static</code> <code>lock fairlock =</code><code>new</code> <code>reentrantlock(</code><code>true</code><code>);</code>

<code>    </code><code>private</code> <code>static</code> <code>lock unfairlock =</code><code>new</code> <code>reentrantlock();</code>

<code>    </code><code>@test</code>

<code>    </code><code>public</code> <code>void</code> <code>fair() {</code>

<code>        </code><code>system.out.println(</code><code>"fair version"</code><code>);</code>

<code>        </code><code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i &lt;</code><code>5</code><code>; i++) {</code>

<code>            </code><code>thread thread =</code><code>new</code> <code>thread(</code><code>new</code> <code>job(fairlock));</code>

<code>            </code><code>thread.setname(</code><code>""</code> <code>+ i);</code>

<code>            </code><code>thread.start();</code>

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>thread.sleep(</code><code>5000</code><code>);</code>

<code>        </code><code>}</code><code>catch</code> <code>(interruptedexception e) {</code>

<code>            </code><code>e.printstacktrace();</code>

<code>18</code>

<code>19</code>

<code>20</code>

<code>21</code>

<code>22</code>

<code>    </code><code>public</code> <code>void</code> <code>unfair() {</code>

<code>23</code>

<code>        </code><code>system.out.println(</code><code>"unfair version"</code><code>);</code>

<code>24</code>

<code>25</code>

<code>            </code><code>thread thread =</code><code>new</code> <code>thread(</code><code>new</code> <code>job(unfairlock));</code>

<code>26</code>

<code>27</code>

<code>28</code>

<code>29</code>

<code>30</code>

<code>31</code>

<code>32</code>

<code>33</code>

<code>34</code>

<code>35</code>

<code>36</code>

<code>37</code>

<code>    </code><code>private</code> <code>static</code> <code>class</code> <code>job</code><code>implements</code> <code>runnable {</code>

<code>38</code>

<code>        </code><code>private</code> <code>lock lock;</code>

<code>39</code>

<code>        </code><code>public</code> <code>job(lock lock) {</code>

<code>40</code>

<code>            </code><code>this</code><code>.lock = lock;</code>

<code>41</code>

<code>42</code>

<code>43</code>

<code>        </code><code>@override</code>

<code>44</code>

<code>        </code><code>public</code> <code>void</code> <code>run() {</code>

<code>45</code>

<code>            </code><code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i &lt;</code><code>5</code><code>; i++) {</code>

<code>46</code>

<code>                </code><code>lock.lock();</code>

<code>47</code>

<code>                </code><code>try</code> <code>{</code>

<code>48</code>

<code>                    </code><code>system.out.println(</code><code>"lock by:"</code>

<code>49</code>

<code>                            </code><code>+ thread.currentthread().getname());</code>

<code>50</code>

<code>                </code><code>}</code><code>finally</code> <code>{</code>

<code>51</code>

<code>                    </code><code>lock.unlock();</code>

<code>52</code>

<code>                </code><code>}</code>

<code>53</code>

<code>            </code><code>}</code>

<code>54</code>

<code>55</code>

<code>56</code>

调用非公平的测试方法,返回结果(部分):

unfair version

lock by:0

lock by:2

lock by:1

调用公平的测试方法,返回结果:

fair version

lock by:3

lock by:4

仔细观察返回的结果(其中每个数字代表一个线程),非公平的结果一个线程连续获取锁的情况非常多,而公平的结果连续获取的情况基本没有。那么在一个线程获取了锁的那一刻,究竟锁的公平性会导致锁有什么样的处理逻辑呢?

通过之前的同步器(abstractqueuedsynchronizer)的介绍,在锁上是存在一个等待队列,sync队列,我们通过复写reentrantlock的获取当前锁的sync队列,输出在reentrantlock被获取时刻,当前的sync队列的状态。

修改测试如下:

<code>    </code><code>private</code> <code>static</code> <code>lock fairlock =</code><code>new</code> <code>reentrantlock2(</code><code>true</code><code>);</code>

<code>    </code><code>private</code> <code>static</code> <code>lock unfairlock =</code><code>new</code> <code>reentrantlock2();</code>

<code>            </code><code>thread thread =</code><code>new</code> <code>thread(</code><code>new</code> <code>job(fairlock)) {</code>

<code>                </code><code>public</code> <code>string tostring() {</code>

<code>                    </code><code>return</code> <code>getname();</code>

<code>            </code><code>};</code>

<code>        </code><code>// sleep 5000ms</code>

<code>            </code><code>thread thread =</code><code>new</code> <code>thread(</code><code>new</code> <code>job(unfairlock)) {</code>

<code>                            </code><code>+ thread.currentthread().getname() +</code><code>" and "</code>

<code>                            </code><code>+ ((reentrantlock2) lock).getqueuedthreads()</code>

<code>                            </code><code>+</code><code>" waits."</code><code>);</code>

<code>57</code>

<code>    </code><code>private</code> <code>static</code> <code>class</code> <code>reentrantlock2</code><code>extends</code> <code>reentrantlock {</code>

<code>58</code>

<code>        </code><code>// constructor override</code>

<code>59</code>

<code>60</code>

<code>        </code><code>private</code> <code>static</code> <code>final</code> <code>long</code> <code>serialversionuid = 1773716895097002072l;</code>

<code>61</code>

<code>62</code>

<code>        </code><code>public</code> <code>collection&lt;thread&gt; getqueuedthreads() {</code>

<code>63</code>

<code>            </code><code>return</code> <code>super</code><code>.getqueuedthreads();</code>

<code>64</code>

<code>65</code>

<code>66</code>

上述逻辑主要是通过构造reentrantlock2用来输出在sync队列中的线程内容,而且每个线程的tostring方法被重写,这样当一个线程获取到锁时,sync队列里的内容也就可以得知了,运行结果如下:

调用非公平方法,返回结果:

lock by:0 and [] waits.

lock by:3 and [2, 1] waits.

lock by:3 and [4, 2, 1] waits.

lock by:3 and [0, 4, 2, 1] waits.

lock by:1 and [0, 4, 2] waits.

调用公平方法,返回结果:

lock by:1 and [0, 4, 3, 2] waits.

lock by:2 and [1, 0, 4, 3] waits.

lock by:3 and [2, 1, 0, 4] waits.

lock by:4 and [3, 2, 1, 0] waits.

lock by:0 and [4, 3, 2, 1] waits.

可以明显看出,在非公平获取的过程中,“插队”现象非常严重,后续获取锁的线程根本不顾及sync队列中等待的线程,而是能获取就获取。反观公平获取的过程,锁的获取就类似线性化的,每次都由sync队列中等待最长的线程(链表的第一个,sync队列是由尾部结点添加,当前输出的sync队列是逆序输出)获取锁。一个 hasqueuedpredecessors方法能够获得公平性的特性,这点实际上是由abstractqueuedsynchronizer来完成的,看一下acquire方法:

<code>1</code>

<code>public</code> <code>final</code> <code>void</code> <code>acquire(</code><code>int</code> <code>arg) {</code>

<code>2</code>

<code>    </code><code>if</code> <code>(!tryacquire(arg) &amp;&amp; acquirequeued(addwaiter(node.exclusive), arg))</code>

<code>3</code>

<code>        </code><code>selfinterrupt();</code>

<code>4</code>

可以看到,如果获取状态和在sync队列中排队是短路的判断,也就是说如果tryacquire成功,那么是不会进入sync队列的,可以通过下图来深刻的认识公平性和abstractqueuedsynchronizer的获取过程。

非公平的,或者说默认的获取方式如下图所示:

ReentrantLock(重入锁)以及公平性简介实现分析

对于状态的获取,可以快速的通过tryacquire的成功,也就是黄色的fast路线,也可以由于tryacquire的失败,构造节点,进入sync队列中排序后再次获取。因此可以理解为fast就是一个快速通道,当例子中的线程释放锁之后,快速的通过fast通道再次获取锁,就算当前sync队列中有排队等待的线程也会被忽略。这种模式,可以保证进入和退出锁的吞吐量,但是sync队列中过早排队的线程会一直处于阻塞状态,造成“饥饿”场景。

而公平性锁,就是在tryacquire的调用中顾及当前sync队列中的等待节点(废弃了fast通道),也就是任意请求都需要按照sync队列中既有的顺序进行,先到先得。这样很好的确保了公平性,但是可以从结果中看到,吞吐量就没有非公平的锁高了。