天天看點

Slipped Conditions

所謂slipped conditions,就是說, 從一個線程檢查某一特定條件到該線程操作此條件期間,這個條件已經被其它線程改變,導緻第一個線程在該條件上執行了錯誤的操作。這裡有一個簡單的例子:

<code>01</code>

<code>public</code> <code>class</code> <code>lock {</code>

<code>02</code>

<code>    </code><code>private</code> <code>boolean</code> <code>islocked =</code><code>true</code><code>;</code>

<code>03</code>

<code>04</code>

<code>    </code><code>public</code> <code>void</code> <code>lock(){</code>

<code>05</code>

<code>      </code><code>synchronized</code><code>(</code><code>this</code><code>){</code>

<code>06</code>

<code>        </code><code>while</code><code>(islocked){</code>

<code>07</code>

<code>          </code><code>try</code><code>{</code>

<code>08</code>

<code>            </code><code>this</code><code>.wait();</code>

<code>09</code>

<code>          </code><code>}</code><code>catch</code><code>(interruptedexception e){</code>

<code>10</code>

<code>            </code><code>//do nothing, keep waiting</code>

<code>11</code>

<code>          </code><code>}</code>

<code>12</code>

<code>        </code><code>}</code>

<code>13</code>

<code>      </code><code>}</code>

<code>14</code>

<code>15</code>

<code>16</code>

<code>        </code><code>islocked =</code><code>true</code><code>;</code>

<code>17</code>

<code>18</code>

<code>    </code><code>}</code>

<code>19</code>

<code>20</code>

<code>    </code><code>public</code> <code>synchronized</code> <code>void</code> <code>unlock(){</code>

<code>21</code>

<code>      </code><code>islocked =</code><code>false</code><code>;</code>

<code>22</code>

<code>      </code><code>this</code><code>.notify();</code>

<code>23</code>

<code>24</code>

<code>}</code>

我們可以看到,lock()方法包含了兩個同步塊。第一個同步塊執行wait操作直到islocked變為false才退出,第二個同步塊将islocked置為true,以此來鎖住這個lock執行個體避免其它線程通過lock()方法。

我們可以設想一下,假如在某個時刻islocked為false, 這個時候,有兩個線程同時通路lock方法。如果第一個線程先進入第一個同步塊,這個時候它會發現islocked為false,若此時允許第二個線程執行,它也進入第一個同步塊,同樣發現islocked是false。現在兩個線程都檢查了這個條件為false,然後它們都會繼續進入第二個同步塊中并設定islocked為true。

這個場景就是slipped conditions的例子,兩個線程檢查同一個條件, 然後退出同步塊,是以在這兩個線程改變條件之前,就允許其它線程來檢查這個條件。換句話說,條件被某個線程檢查到該條件被此線程改變期間,這個條件已經被其它線程改變過了。

為避免slipped conditions,條件的檢查與設定必須是原子的,也就是說,在第一個線程檢查和設定條件期間,不會有其它線程檢查這個條件。

解決上面問題的方法很簡單,隻是簡單的把islocked = true這行代碼移到第一個同步塊中,放在while循環後面即可:

現在檢查和設定islocked條件是在同一個同步塊中原子地執行了。

也許你會說,我才不可能寫這麼挫的代碼,還覺得slipped conditions是個相當理論的問題。但是第一個簡單的例子隻是用來更好的展示slipped conditions。

<code>//fair lock implementation with nested monitor lockout problem</code>

<code>public</code> <code>class</code> <code>fairlock {</code>

<code>  </code><code>private</code> <code>boolean</code> <code>islocked =</code><code>false</code><code>;</code>

<code>  </code><code>private</code> <code>thread lockingthread =</code><code>null</code><code>;</code>

<code>  </code><code>private</code> <code>list waitingthreads =</code>

<code>            </code><code>new</code> <code>arraylist();</code>

<code>  </code><code>public</code> <code>void</code> <code>lock()</code><code>throws</code> <code>interruptedexception{</code>

<code>    </code><code>queueobject queueobject =</code><code>new</code> <code>queueobject();</code>

<code>    </code><code>synchronized</code><code>(</code><code>this</code><code>){</code>

<code>      </code><code>waitingthreads.add(queueobject);</code>

<code>      </code><code>while</code><code>(islocked || waitingthreads.get(</code><code>0</code><code>) != queueobject){</code>

<code>        </code><code>synchronized</code><code>(queueobject){</code>

<code>            </code><code>queueobject.wait();</code>

<code>            </code><code>waitingthreads.remove(queueobject);</code>

<code>            </code><code>throw</code> <code>e;</code>

<code>25</code>

<code>      </code><code>waitingthreads.remove(queueobject);</code>

<code>26</code>

<code>      </code><code>islocked =</code><code>true</code><code>;</code>

<code>27</code>

<code>      </code><code>lockingthread = thread.currentthread();</code>

<code>28</code>

<code>29</code>

<code>  </code><code>}</code>

<code>30</code>

<code>31</code>

<code>  </code><code>public</code> <code>synchronized</code> <code>void</code> <code>unlock(){</code>

<code>32</code>

<code>    </code><code>if</code><code>(</code><code>this</code><code>.lockingthread != thread.currentthread()){</code>

<code>33</code>

<code>      </code><code>throw</code> <code>new</code> <code>illegalmonitorstateexception(</code>

<code>34</code>

<code>        </code><code>"calling thread has not locked this lock"</code><code>);</code>

<code>35</code>

<code>36</code>

<code>    </code><code>islocked      =</code><code>false</code><code>;</code>

<code>37</code>

<code>    </code><code>lockingthread =</code><code>null</code><code>;</code>

<code>38</code>

<code>    </code><code>if</code><code>(waitingthreads.size() &gt;</code><code>0</code><code>){</code>

<code>39</code>

<code>      </code><code>queueobject queueobject = waitingthread.get(</code><code>0</code><code>);</code>

<code>40</code>

<code>      </code><code>synchronized</code><code>(queueobject){</code>

<code>41</code>

<code>        </code><code>queueobject.notify();</code>

<code>42</code>

<code>43</code>

<code>44</code>

<code>45</code>

<code>1</code>

<code>public</code> <code>class</code> <code>queueobject {}</code>

我們可以看到synchronized(queueobject)及其中的queueobject.wait()調用是嵌在synchronized(this)塊裡面的,這會導緻嵌套管程鎖死問題。為避免這個問題,我們必須将synchronized(queueobject)塊移出synchronized(this)塊。移出來之後的代碼可能是這樣的:

<code>//fair lock implementation with slipped conditions problem</code>

<code>  </code><code>private</code> <code>thread lockingthread  =</code><code>null</code><code>;</code>

<code>    </code><code>boolean</code> <code>mustwait =</code><code>true</code><code>;</code>

<code>    </code><code>while</code><code>(mustwait){</code>

<code>        </code><code>mustwait = islocked || waitingthreads.get(</code><code>0</code><code>) != queueobject;</code>

<code>        </code><code>if</code><code>(mustwait){</code>

注意:因為我隻改動了lock()方法,這裡隻展現了lock方法。

現在lock()方法包含了3個同步塊。

第一個,synchronized(this)塊通過mustwait = islocked || waitingthreads.get(0) != queueobject檢查内部變量的值。

第二個,synchronized(queueobject)塊檢查線程是否需要等待。也有可能其它線程在這個時候已經解鎖了,但我們暫時不考慮這個問題。我們就假設這個鎖處在解鎖狀态,是以線程會立馬退出synchronized(queueobject)塊。

第三個,synchronized(this)塊隻會在mustwait為false的時候執行。它将islocked重新設回true,然後離開lock()方法。

設想一下,在鎖處于解鎖狀态時,如果有兩個線程同時調用lock()方法會發生什麼。首先,線程1會檢查到islocked為false,然後線程2同樣檢查到islocked為false。接着,它們都不會等待,都會去設定islocked為true。這就是slipped conditions的一個最好的例子。

要解決上面例子中的slipped conditions問題,最後一個synchronized(this)塊中的代碼必須向上移到第一個同步塊中。為适應這種變動,代碼需要做點小改動。下面是改動過的代碼:

<code>//fair lock implementation without nested monitor lockout problem,</code>

<code>//but with missed signals problem.</code>

<code>        </code><code>if</code><code>(!mustwait){</code>

<code>          </code><code>waitingthreads.remove(queueobject);</code>

<code>          </code><code>islocked =</code><code>true</code><code>;</code>

<code>          </code><code>lockingthread = thread.currentthread();</code>

<code>          </code><code>return</code><code>;</code>

<code>      </code><code>}    </code>

我們可以看到對局部變量mustwait的檢查與指派是在同一個同步塊中完成的。還可以看到,即使在synchronized(this)塊外面檢查了mustwait,在while(mustwait)子句中,mustwait變量從來沒有在synchronized(this)同步塊外被指派。當一個線程檢查到mustwait是false的時候,它将自動設定内部的條件(islocked),是以其它線程再來檢查這個條件的時候,它們就會發現這個條件的值現在為true了。

synchronized(this)塊中的<code>return;</code>語句不是必須的。這隻是個小小的優化。如果一個線程肯定不會等待(即mustwait為false),那麼就沒必要讓它進入到synchronized(queueobject)同步塊中和執行if(mustwait)子句了。

細心的讀者可能會注意到上面的公平鎖實作仍然有可能丢失信号。設想一下,當該fairlock執行個體處于鎖定狀态時,有個線程來調用lock()方法。執行完第一個 synchronized(this)塊後,mustwait變量的值為true。再設想一下調用lock()的線程是通過搶占式的,擁有鎖的那個線程那個線程此時調用了unlock()方法,但是看下之前的unlock()的實作你會發現,它調用了queueobject.notify()。但是,因為lock()中的線程還沒有來得及調用queueobject.wait(),是以queueobject.notify()調用也就沒有作用了,信号就丢失掉了。如果調用lock()的線程在另一個線程調用queueobject.notify()之後調用queueobject.wait(),這個線程會一直阻塞到其它線程調用unlock方法為止,但這永遠也不會發生。