天天看點

AbstractQueuedSynchronizer的介紹和原理分析(轉)簡介 API說明 示例 實作分析 一個例子

提供了一個基于FIFO隊列,可以用于建構鎖或者其他相關同步裝置的基礎架構。該同步器(以下簡稱同步器)利用了一個int來表示狀态,期望它能夠成為實作大部分同步需求的基礎。使用的方法是繼承,子類通過繼承同步器并需要實作它的方法來管理其狀态,管理的方式就是通過類似acquire和release的方式來操縱狀态。然而多線程環境中對狀态的操縱必須確定原子性,是以子類對于狀态的把握,需要使用這個同步器提供的以下三個方法對狀态進行操作:

java.util.concurrent.locks.AbstractQueuedSynchronizer.getState()

java.util.concurrent.locks.AbstractQueuedSynchronizer.setState(int)

java.util.concurrent.locks.AbstractQueuedSynchronizer.compareAndSetState(int, int)

子類推薦被定義為自定義同步裝置的内部類,同步器自身沒有實作任何同步接口,它僅僅是定義了若幹acquire之類的方法來供使用。該同步器即可以作為排他模式也可以作為共享模式,當它被定義為一個排他模式時,其他線程對其的擷取就被阻止,而共享模式對于多個線程擷取都可以成功。

同步器是實作鎖的關鍵,利用同步器将鎖的語義實作,然後在鎖的實作中聚合同步器。可以這樣了解:鎖的API是面向使用者的,它定義了與鎖互動的公共行為,而每個鎖需要完成特定的操作也是透過這些行為來完成的(比如:可以允許兩個線程進行加鎖,排除兩個以上的線程),但是實作是依托給同步器來完成;同步器面向的是線程通路和資源控制,它定義了線程對資源是否能夠擷取以及線程的排隊等操作。鎖和同步器很好的隔離了二者所需要關注的領域,嚴格意義上講,同步器可以适用于除了鎖以外的其他同步設施上(包括鎖)。

同步器的開始提到了其實作依賴于一個FIFO隊列,那麼隊列中的元素Node就是儲存着線程引用和線程狀态的容器,每個線程對同步器的通路,都可以看做是隊列中的一個節點。Node的主要包含以下成員變量:

<code>1</code>

<code>Node {</code>

<code>2</code>

<code>    </code><code>int</code> <code>waitStatus;</code>

<code>3</code>

<code>    </code><code>Node prev;</code>

<code>4</code>

<code>    </code><code>Node next;</code>

<code>5</code>

<code>    </code><code>Node nextWaiter;</code>

<code>6</code>

<code>    </code><code>Thread thread;</code>

<code>7</code>

<code>}</code>

以上五個成員變量主要負責儲存該節點的線程引用,同步等待隊列(以下簡稱sync隊列)的前驅和後繼節點,同時也包括了同步狀态。

屬性名稱

描述

int waitStatus

表示節點的狀态。其中包含的狀态有:

CANCELLED,值為1,表示目前的線程被取消;

SIGNAL,值為-1,表示目前節點的後繼節點包含的線程需要運作,也就是unpark;

CONDITION,值為-2,表示目前節點在等待condition,也就是在condition隊列中;

PROPAGATE,值為-3,表示目前場景下後續的acquireShared能夠得以執行;

值為0,表示目前節點在sync隊列中,等待着擷取鎖。

Node prev

前驅節點,比如目前節點被取消,那就需要前驅節點和後繼節點來完成連接配接。

Node next

後繼節點。

Node nextWaiter

存儲condition隊列中的後繼節點。

Thread thread

入隊列時的目前線程。

節點成為sync隊列和condition隊列建構的基礎,在同步器中就包含了sync隊列。同步器擁有三個成員變量:sync隊列的頭結點head、sync隊列的尾節點tail和狀态state。對于鎖的擷取,請求形成節點,将其挂載在尾部,而鎖資源的轉移(釋放再擷取)是從頭部開始向後進行。對于同步器維護的狀态state,多個線程對其的擷取将會産生一個鍊式的結構。

AbstractQueuedSynchronizer的介紹和原理分析(轉)簡介 API說明 示例 實作分析 一個例子

實作自定義同步器時,需要使用同步器提供的getState()、setState()和compareAndSetState()方法來操縱狀态的變遷。

方法名稱

protected boolean tryAcquire(int arg)

排它的擷取這個狀态。這個方法的實作需要查詢目前狀态是否允許擷取,然後再進行擷取(使用compareAndSetState來做)狀态。

protected boolean tryRelease(int arg) 

釋放狀态。

protected int tryAcquireShared(int arg)

共享的模式下擷取狀态。

protected boolean tryReleaseShared(int arg)

共享的模式下釋放狀态。

protected boolean isHeldExclusively()

在排它模式下,狀态是否被占用。

實作這些方法必須是非阻塞而且是線程安全的,推薦使用該同步器的父類java.util.concurrent.locks.AbstractOwnableSynchronizer來設定目前的線程。

開始提到同步器内部基于一個FIFO隊列,對于一個獨占鎖的擷取和釋放有以下僞碼可以表示。

擷取一個排他鎖。

<code>01</code>

<code>while</code><code>(擷取鎖) {</code>

<code>02</code>

<code>    </code><code>if</code> <code>(擷取到) {</code>

<code>03</code>

<code>        </code><code>退出</code><code>while</code><code>循環</code>

<code>04</code>

<code>    </code><code>} </code><code>else</code> <code>{</code>

<code>05</code>

<code>        </code><code>if</code><code>(目前線程沒有入隊列) {</code>

<code>06</code>

<code>            </code><code>那麼入隊列</code>

<code>07</code>

<code>        </code><code>}</code>

<code>08</code>

<code>        </code><code>阻塞目前線程</code>

<code>09</code>

<code>    </code><code>}</code>

<code>10</code>

釋放一個排他鎖。

<code>if</code> <code>(釋放成功) {</code>

<code>    </code><code>删除頭結點</code>

<code>    </code><code>激活原頭結點的後繼節點</code>

下面通過一個排它鎖的例子來深入了解一下同步器的工作原理,而隻有掌握同步器的工作原理才能夠更加深入了解其他的并發元件。

排他鎖的實作,一次隻能一個線程擷取到鎖。

<code>class</code> <code>Mutex </code><code>implements</code> <code>Lock, java.io.Serializable {</code>

<code>   </code><code>// 内部類,自定義同步器</code>

<code>   </code><code>private</code> <code>static</code> <code>class</code> <code>Sync </code><code>extends</code> <code>AbstractQueuedSynchronizer {</code>

<code>     </code><code>// 是否處于占用狀态</code>

<code>     </code><code>protected</code> <code>boolean</code> <code>isHeldExclusively() {</code>

<code>       </code><code>return</code> <code>getState() == </code><code>1</code><code>;</code>

<code>     </code><code>}</code>

<code>     </code><code>// 當狀态為0的時候擷取鎖</code>

<code>     </code><code>public</code> <code>boolean</code> <code>tryAcquire(</code><code>int</code> <code>acquires) {</code>

<code>       </code><code>assert</code> <code>acquires == </code><code>1</code><code>; </code><code>// Otherwise unused</code>

<code>11</code>

<code>       </code><code>if</code> <code>(compareAndSetState(</code><code>0</code><code>, </code><code>1</code><code>)) {</code>

<code>12</code>

<code>         </code><code>setExclusiveOwnerThread(Thread.currentThread());</code>

<code>13</code>

<code>         </code><code>return</code> <code>true</code><code>;</code>

<code>14</code>

<code>       </code><code>}</code>

<code>15</code>

<code>       </code><code>return</code> <code>false</code><code>;</code>

<code>16</code>

<code>17</code>

<code>     </code><code>// 釋放鎖,将狀态設定為0</code>

<code>18</code>

<code>     </code><code>protected</code> <code>boolean</code> <code>tryRelease(</code><code>int</code> <code>releases) {</code>

<code>19</code>

<code>       </code><code>assert</code> <code>releases == </code><code>1</code><code>; </code><code>// Otherwise unused</code>

<code>20</code>

<code>       </code><code>if</code> <code>(getState() == </code><code>0</code><code>) </code><code>throw</code> <code>new</code> <code>IllegalMonitorStateException();</code>

<code>21</code>

<code>       </code><code>setExclusiveOwnerThread(</code><code>null</code><code>);</code>

<code>22</code>

<code>       </code><code>setState(</code><code>0</code><code>);</code>

<code>23</code>

<code>       </code><code>return</code> <code>true</code><code>;</code>

<code>24</code>

<code>25</code>

<code>     </code><code>// 傳回一個Condition,每個condition都包含了一個condition隊列</code>

<code>26</code>

<code>     </code><code>Condition newCondition() { </code><code>return</code> <code>new</code> <code>ConditionObject(); }</code>

<code>27</code>

<code>   </code><code>}</code>

<code>28</code>

<code>   </code><code>// 僅需要将操作代理到Sync上即可</code>

<code>29</code>

<code>   </code><code>private</code> <code>final</code> <code>Sync sync = </code><code>new</code> <code>Sync();</code>

<code>30</code>

<code>   </code><code>public</code> <code>void</code> <code>lock()                { sync.acquire(</code><code>1</code><code>); }</code>

<code>31</code>

<code>   </code><code>public</code> <code>boolean</code> <code>tryLock()          { </code><code>return</code> <code>sync.tryAcquire(</code><code>1</code><code>); }</code>

<code>32</code>

<code>   </code><code>public</code> <code>void</code> <code>unlock()              { sync.release(</code><code>1</code><code>); }</code>

<code>33</code>

<code>   </code><code>public</code> <code>Condition newCondition()   { </code><code>return</code> <code>sync.newCondition(); }</code>

<code>34</code>

<code>   </code><code>public</code> <code>boolean</code> <code>isLocked()         { </code><code>return</code> <code>sync.isHeldExclusively(); }</code>

<code>35</code>

<code>   </code><code>public</code> <code>boolean</code> <code>hasQueuedThreads() { </code><code>return</code> <code>sync.hasQueuedThreads(); }</code>

<code>36</code>

<code>   </code><code>public</code> <code>void</code> <code>lockInterruptibly() </code><code>throws</code> <code>InterruptedException {</code>

<code>37</code>

<code>     </code><code>sync.acquireInterruptibly(</code><code>1</code><code>);</code>

<code>38</code>

<code>39</code>

<code>   </code><code>public</code> <code>boolean</code> <code>tryLock(</code><code>long</code> <code>timeout, TimeUnit unit)</code>

<code>40</code>

<code>       </code><code>throws</code> <code>InterruptedException {</code>

<code>41</code>

<code>     </code><code>return</code> <code>sync.tryAcquireNanos(</code><code>1</code><code>, unit.toNanos(timeout));</code>

<code>42</code>

<code>43</code>

<code> </code><code>}</code>

可以看到Mutex将Lock接口均代理給了同步器的實作。

使用方将Mutex構造出來之後,調用lock擷取鎖,調用unlock進行解鎖。下面以Mutex為例子,詳細分析以下同步器的實作邏輯。

該方法以排他的方式擷取鎖,對中斷不敏感,完成synchronized語義。

<code>public</code> <code>final</code> <code>void</code> <code>acquire(</code><code>int</code> <code>arg) {</code>

<code>        </code><code>if</code> <code>(!tryAcquire(arg) &amp;amp;&amp;amp;</code>

<code>            </code><code>acquireQueued(addWaiter(Node.EXCLUSIVE), arg))</code>

<code>            </code><code>selfInterrupt();</code>

上述邏輯主要包括:

1. 嘗試擷取(調用tryAcquire更改狀态,需要保證原子性);

在tryAcquire方法中使用了同步器提供的對state操作的方法,利用compareAndSet保證隻有一個線程能夠對狀态進行成功修改,而沒有成功修改的線程将進入sync隊列排隊。

2. 如果擷取不到,将目前線程構造成節點Node并加入sync隊列;

進入隊列的每個線程都是一個節點Node,進而形成了一個雙向隊列,類似CLH隊列,這樣做的目的是線程間的通信會被限制在較小規模(也就是兩個節點左右)。

3. 再次嘗試擷取,如果沒有擷取到那麼将目前線程從線程排程器上摘下,進入等待狀态。

使用LockSupport将目前線程unpark,關于LockSupport後續會詳細介紹。

<code>private</code> <code>Node addWaiter(Node mode) {</code>

<code>    </code><code>Node node = </code><code>new</code> <code>Node(Thread.currentThread(), mode);</code>

<code>    </code><code>// 快速嘗試在尾部添加</code>

<code>    </code><code>Node pred = tail;</code>

<code>    </code><code>if</code> <code>(pred != </code><code>null</code><code>) {</code>

<code>        </code><code>node.prev = pred;</code>

<code>        </code><code>if</code> <code>(compareAndSetTail(pred, node)) {</code>

<code>            </code><code>pred.next = node;</code>

<code>            </code><code>return</code> <code>node;</code>

<code>    </code><code>enq(node);</code>

<code>    </code><code>return</code> <code>node;</code>

<code>private</code> <code>Node enq(</code><code>final</code> <code>Node node) {</code>

<code>    </code><code>for</code> <code>(;;) {</code>

<code>        </code><code>Node t = tail;</code>

<code>        </code><code>if</code> <code>(t == </code><code>null</code><code>) { </code><code>// Must initialize</code>

<code>            </code><code>if</code> <code>(compareAndSetHead(</code><code>new</code> <code>Node()))</code>

<code>                </code><code>tail = head;</code>

<code>        </code><code>} </code><code>else</code> <code>{</code>

<code>            </code><code>node.prev = t;</code>

<code>            </code><code>if</code> <code>(compareAndSetTail(t, node)) {</code>

<code>            </code><code>t.next = node;</code>

<code>            </code><code>return</code> <code>t;</code>

1. 使用目前線程構造Node;

對于一個節點需要做的是将當節點前驅節點指向尾節點(current.prev = tail),尾節點指向它(tail = current),原有的尾節點的後繼節點指向它(t.next = current)而這些操作要求是原子的。上面的操作是利用尾節點的設定來保證的,也就是compareAndSetTail來完成的。

2. 先行嘗試在隊尾添加;

如果尾節點已經有了,然後做如下操作:

(1)配置設定引用T指向尾節點;

(2)将節點的前驅節點更新為尾節點(current.prev = tail);

(3)如果尾節點是T,那麼将當尾節點設定為該節點(tail = current,原子更新);

(4)T的後繼節點指向目前節點(T.next = current)。

注意第3點是要求原子的。

這樣可以以最短路徑O(1)的效果來完成線程入隊,是最大化減少開銷的一種方式。

3. 如果隊尾添加失敗或者是第一個入隊的節點。

如果是第1個節點,也就是sync隊列沒有初始化,那麼會進入到enq這個方法,進入的線程可能有多個,或者說在addWaiter中沒有成功入隊的線程都将進入enq這個方法。

可以看到enq的邏輯是確定進入的Node都會有機會順序的添加到sync隊列中,而加入的步驟如下:

(1)如果尾節點為空,那麼原子化的配置設定一個頭節點,并将尾節點指向頭節點,這一步是初始化;

(2)然後是重複在addWaiter中做的工作,但是在一個while(true)的循環中,直到目前節點入隊為止。

進入sync隊列之後,接下來就是要進行鎖的擷取,或者說是通路控制了,隻有一個線程能夠在同一時刻繼續的運作,而其他的進入等待狀态。而每個線程都是一個獨立的個體,它們自省的觀察,當條件滿足的時候(自己的前驅是頭結點并且原子性的擷取了狀态),那麼這個線程能夠繼續運作。

<code>final</code> <code>boolean</code> <code>acquireQueued(</code><code>final</code> <code>Node node, </code><code>int</code> <code>arg) {</code>

<code>    </code><code>boolean</code> <code>failed = </code><code>true</code><code>;</code>

<code>    </code><code>try</code> <code>{</code>

<code>        </code><code>boolean</code> <code>interrupted = </code><code>false</code><code>;</code>

<code>        </code><code>for</code> <code>(;;) {</code>

<code>            </code><code>final</code> <code>Node p = node.predecessor();</code>

<code>            </code><code>if</code> <code>(p == head &amp;amp;&amp;amp; tryAcquire(arg)) {</code>

<code>                </code><code>setHead(node);</code>

<code>                </code><code>p.next = </code><code>null</code><code>; </code><code>// help GC</code>

<code>                </code><code>failed = </code><code>false</code><code>;</code>

<code>                </code><code>return</code> <code>interrupted;</code>

<code>            </code><code>}</code>

<code>            </code><code>if</code> <code>(shouldParkAfterFailedAcquire(p, node) &amp;amp;&amp;amp;</code>

<code>                    </code><code>parkAndCheckInterrupt())</code>

<code>                </code><code>interrupted = </code><code>true</code><code>;</code>

<code>                </code><code>}</code>

<code>    </code><code>} </code><code>finally</code> <code>{</code>

<code>        </code><code>if</code> <code>(failed)</code>

<code>            </code><code>cancelAcquire(node);</code>

1. 擷取目前節點的前驅節點;

需要擷取目前節點的前驅節點,而頭結點所對應的含義是目前站有鎖且正在運作。

2. 目前驅節點是頭結點并且能夠擷取狀态,代表該目前節點占有鎖;

如果滿足上述條件,那麼代表能夠占有鎖,根據節點對鎖占有的含義,設定頭結點為目前節點。

3. 否則進入等待狀态。

如果沒有輪到目前節點運作,那麼将目前線程從線程排程器上摘下,也就是進入等待狀态。

這裡針對acquire做一下總結:

1. 狀态的維護;

需要在鎖定時,需要維護一個狀态(int類型),而對狀态的操作是原子和非阻塞的,通過同步器提供的對狀态通路的方法對狀态進行操縱,并且利用compareAndSet來確定原子性的修改。

2. 狀态的擷取;

一旦成功的修改了狀态,目前線程或者說節點,就被設定為頭節點。

3. sync隊列的維護。

在擷取資源未果的過程中條件不符合的情況下(不該自己,前驅節點不是頭節點或者沒有擷取到資源)進入睡眠狀态,停止線程排程器對目前節點線程的排程。

這時引入的一個釋放的問題,也就是說使睡眠中的Node或者說線程獲得通知的關鍵,就是前驅節點的通知,而這一個過程就是釋放,釋放會通知它的後繼節點從睡眠中傳回準備運作。

下面的流程圖基本描述了一次acquire所需要經曆的過程:

AbstractQueuedSynchronizer的介紹和原理分析(轉)簡介 API說明 示例 實作分析 一個例子

如上圖所示,其中的判定退出隊列的條件,判定條件是否滿足和休眠目前線程就是完成了自旋spin的過程。

在unlock方法的實作中,使用了同步器的release方法。相對于在之前的acquire方法中可以得出調用acquire,保證能夠擷取到鎖(成功擷取狀态),而release則表示将狀态設定回去,也就是将資源釋放,或者說将鎖釋放。

<code>public</code> <code>final</code> <code>boolean</code> <code>release(</code><code>int</code> <code>arg) {</code>

<code>    </code><code>if</code> <code>(tryRelease(arg)) {</code>

<code>        </code><code>Node h = head;</code>

<code>        </code><code>if</code> <code>(h != </code><code>null</code> <code>&amp;amp;&amp;amp; h.waitStatus != </code><code>0</code><code>)</code>

<code>            </code><code>unparkSuccessor(h);</code>

<code>        </code><code>return</code> <code>true</code><code>;</code>

<code>8</code>

<code>    </code><code>return</code> <code>false</code><code>;</code>

<code>9</code>

1. 嘗試釋放狀态;

tryRelease能夠保證原子化的将狀态設定回去,當然需要使用compareAndSet來保證。如果釋放狀态成功過之後,将會進入後繼節點的喚醒過程。

2. 喚醒目前節點的後繼節點所包含的線程。

通過LockSupport的unpark方法将休眠中的線程喚醒,讓其繼續acquire狀态。

<code>private</code> <code>void</code> <code>unparkSuccessor(Node node) {</code>

<code>    </code><code>// 将狀态設定為同步狀态</code>

<code>    </code><code>int</code> <code>ws = node.waitStatus;</code>

<code>    </code><code>if</code> <code>(ws &amp;lt; </code><code>0</code><code>)      compareAndSetWaitStatus(node, ws, </code><code>0</code><code>);   </code><code>// 擷取目前節點的後繼節點,如果滿足狀态,那麼進行喚醒操作  // 如果沒有滿足狀态,從尾部開始找尋符合要求的節點并将其喚醒     Node s = node.next;     if (s == null || s.waitStatus &amp;gt; 0) {</code>

<code>        </code><code>s = </code><code>null</code><code>;</code>

<code>        </code><code>for</code> <code>(Node t = tail; t != </code><code>null</code> <code>&amp;amp;&amp;amp; t != node; t = t.prev)</code>

<code>            </code><code>if</code> <code>(t.waitStatus &amp;lt;= </code><code>0</code><code>)</code>

<code>                </code><code>s = t;</code>

<code>    </code><code>if</code> <code>(s != </code><code>null</code><code>)</code>

<code>        </code><code>LockSupport.unpark(s.thread);</code>

上述邏輯主要包括,該方法取出了目前節點的next引用,然後對其線程(Node)進行了喚醒,這時就隻有一個或合理個數的線程被喚醒,被喚醒的線程繼續進行對資源的擷取與争奪。

回顧整個資源的擷取和釋放過程:

在擷取時,維護了一個sync隊列,每個節點都是一個線程在進行自旋,而依據就是自己是否是首節點的後繼并且能夠擷取資源;

在釋放時,僅僅需要将資源還回去,然後通知一下後繼節點并将其喚醒。

這裡需要注意,隊列的維護(首節點的更換)是依靠消費者(擷取時)來完成的,也就是說在滿足了自旋退出的條件時的一刻,這個節點就會被設定成為首節點。

tryAcquire是自定義同步器需要實作的方法,也就是自定義同步器非阻塞原子化的擷取狀态,如果鎖該方法一般用于Lock的tryLock實作中,這個特性是synchronized無法提供的。

該方法提供擷取狀态能力,當然在無法擷取狀态的情況下會進入sync隊列進行排隊,這類似acquire,但是和acquire不同的地方在于它能夠在外界對目前線程進行中斷的時候提前結束擷取狀态的操作,換句話說,就是在類似synchronized擷取鎖時,外界能夠對目前線程進行中斷,并且擷取鎖的這個操作能夠響應中斷并提前傳回。一個線程處于synchronized塊中或者進行同步I/O操作時,對該線程進行中斷操作,這時該線程的中斷辨別位被設定為true,但是線程依舊繼續運作。

如果在擷取一個通過網絡互動實作的鎖時,這個鎖資源突然進行了銷毀,那麼使用acquireInterruptibly的擷取方式就能夠讓該時刻嘗試擷取鎖的線程提前傳回。而同步器的這個特性被實作Lock接口中的lockInterruptibly方法。根據Lock的語義,在被中斷時,lockInterruptibly将會抛出InterruptedException來告知使用者。

<code>public</code> <code>final</code> <code>void</code> <code>acquireInterruptibly(</code><code>int</code> <code>arg)</code>

<code>    </code><code>throws</code> <code>InterruptedException {</code>

<code>    </code><code>if</code> <code>(Thread.interrupted())</code>

<code>        </code><code>throw</code> <code>new</code> <code>InterruptedException();</code>

<code>    </code><code>if</code> <code>(!tryAcquire(arg))</code>

<code>        </code><code>doAcquireInterruptibly(arg);</code>

<code>private</code> <code>void</code> <code>doAcquireInterruptibly(</code><code>int</code> <code>arg)</code>

<code>    </code><code>final</code> <code>Node node = addWaiter(Node.EXCLUSIVE);</code>

<code>                </code><code>return</code><code>;</code>

<code>            </code><code>// 檢測中斷标志位</code>

<code>            </code><code>parkAndCheckInterrupt())</code>

<code>                </code><code>throw</code> <code>new</code> <code>InterruptedException();</code>

1. 檢測目前線程是否被中斷;

判斷目前線程的中斷标志位,如果已經被中斷了,那麼直接抛出異常并将中斷标志位設定為false。

2. 嘗試擷取狀态;

調用tryAcquire擷取狀态,如果順利會擷取成功并傳回。

3. 構造節點并加入sync隊列;

擷取狀态失敗後,将目前線程引用構造為節點并加入到sync隊列中。退出隊列的方式在沒有中斷的場景下和acquireQueued類似,當頭結點是自己的前驅節點并且能夠擷取到狀态時,即可以運作,當然要将本節點設定為頭結點,表示正在運作。

4. 中斷檢測。

在每次被喚醒時,進行中斷檢測,如果發現目前線程被中斷,那麼抛出InterruptedException并退出循環。

該方法提供了具備有逾時功能的擷取狀态的調用,如果在指定的nanosTimeout内沒有擷取到狀态,那麼傳回false,反之傳回true。可以将該方法看做acquireInterruptibly的更新版,也就是在判斷是否被中斷的基礎上增加了逾時控制。

針對逾時控制這部分的實作,主要需要計算出睡眠的delta,也就是間隔值。間隔可以表示為nanosTimeout = 原有nanosTimeout – now(目前時間)+ lastTime(睡眠之前記錄的時間)。如果nanosTimeout大于0,那麼還需要使目前線程睡眠,反之則傳回false。

<code>private</code> <code>boolean</code> <code>doAcquireNanos(</code><code>int</code> <code>arg, </code><code>long</code> <code>nanosTimeout)</code>

<code>throws</code> <code>InterruptedException {</code>

<code>    </code><code>long</code> <code>lastTime = System.nanoTime();</code>

<code>                </code><code>return</code> <code>true</code><code>;</code>

<code>            </code><code>if</code> <code>(nanosTimeout &amp;lt;= </code><code>0</code><code>)               </code><code>return</code> <code>false</code><code>;           </code><code>if</code><code>(shouldParkAfterFailedAcquire(p, node) &amp;amp;&amp;amp; nanosTimeout &amp;gt; spinForTimeoutThreshold)</code>

<code>            </code><code>LockSupport.parkNanos(</code><code>this</code><code>, nanosTimeout);</code>

<code>            </code><code>long</code> <code>now = System.nanoTime();</code>

<code>            </code><code>//計算時間,目前時間減去睡眠之前的時間得到睡眠的時間,然後被</code>

<code>            </code><code>//原有逾時時間減去,得到了還應該睡眠的時間</code>

<code>            </code><code>nanosTimeout -= now - lastTime;</code>

<code>            </code><code>lastTime = now;</code>

<code>            </code><code>if</code> <code>(Thread.interrupted())</code>

1. 加入sync隊列;

将目前線程構造成為節點Node加入到sync隊列中。

2. 條件滿足直接傳回;

退出條件判斷,如果前驅節點是頭結點并且成功擷取到狀态,那麼設定自己為頭結點并退出,傳回true,也就是在指定的nanosTimeout之前擷取了鎖。

3. 擷取狀态失敗休眠一段時間;

通過LockSupport.unpark來指定目前線程休眠一段時間。

4. 計算再次休眠的時間;

喚醒後的線程,計算仍需要休眠的時間,該時間表示為nanosTimeout = 原有nanosTimeout – now(目前時間)+ lastTime(睡眠之前記錄的時間)。其中now – lastTime表示這次睡眠所持續的時間。

5. 休眠時間的判定。

喚醒後的線程,計算仍需要休眠的時間,并無阻塞的嘗試再擷取狀态,如果失敗後檢視其nanosTimeout是否大于0,如果小于0,那麼傳回完全逾時,沒有擷取到鎖。 如果nanosTimeout小于等于1000L納秒,則進入快速的自旋過程。那麼快速自旋會造成處理器資源緊張嗎?結果是不會,經過測算,開銷看起來很小,幾乎微乎其微。Doug Lea應該測算了線上程排程器上的切換造成的額外開銷,是以在短時1000納秒内就讓目前線程進入快速自旋狀态,如果這時再休眠相反會讓nanosTimeout的擷取時間變得更加不精确。

上述過程可以如下圖所示:

AbstractQueuedSynchronizer的介紹和原理分析(轉)簡介 API說明 示例 實作分析 一個例子

上述這個圖中可以了解為在類似擷取狀态需要排隊的基礎上增加了一個逾時控制的邏輯。每次逾時的時間就是目前逾時剩餘的時間減去睡眠的時間,而在這個逾時時間的基礎上進行了判斷,如果大于0那麼繼續睡眠(等待),可以看出這個逾時版本的擷取狀态隻是一個近似逾時的擷取狀态,是以任何含有逾時的調用基本結果就是近似于給定逾時。

調用該方法能夠以共享模式擷取狀态,共享模式和之前的獨占模式有所差別。以檔案的檢視為例,如果一個程式在對其進行讀取操作,那麼這一時刻,對這個檔案的寫操作就被阻塞,相反,這一時刻另一個程式對其進行同樣的讀操作是可以進行的。如果一個程式在對其進行寫操作,那麼所有的讀與寫操作在這一時刻就被阻塞,直到這個程式完成寫操作。

以讀寫場景為例,描述共享和獨占的通路模式,如下圖所示:

AbstractQueuedSynchronizer的介紹和原理分析(轉)簡介 API說明 示例 實作分析 一個例子

上圖中,紅色代表被阻塞,綠色代表可以通過。

<code>public</code> <code>final</code> <code>void</code> <code>acquireShared(</code><code>int</code> <code>arg) {</code>

<code>    </code><code>if</code> <code>(tryAcquireShared(arg) &amp;lt; </code><code>0</code><code>)   doAcquireShared(arg); } </code><code>private</code> <code>void</code><code>doAcquireShared(</code><code>int</code> <code>arg) {     </code><code>final</code> <code>Node node = addWaiter(Node.SHARED);   </code><code>boolean</code> <code>failed = </code><code>true</code><code>;  </code><code>try</code> <code>{       </code><code>boolean</code> <code>interrupted = </code><code>false</code><code>;        </code><code>for</code> <code>(;;) {          </code><code>final</code> <code>Node p = node.predecessor();          </code><code>if</code> <code>(p == head) {                </code><code>int</code> <code>r = tryAcquireShared(arg);              </code><code>if</code> <code>(r &amp;gt;= </code><code>0</code><code>) {</code>

<code>                    </code><code>setHeadAndPropagate(node, r);</code>

<code>                    </code><code>p.next = </code><code>null</code><code>; </code><code>// help GC</code>

<code>                    </code><code>if</code> <code>(interrupted)</code>

<code>                        </code><code>selfInterrupt();</code>

<code>                    </code><code>failed = </code><code>false</code><code>;</code>

<code>                    </code><code>return</code><code>;</code>

<code>parkAndCheckInterrupt())</code>

<code>            </code><code>interrupted = </code><code>true</code><code>;</code>

1. 嘗試擷取共享狀态;

調用tryAcquireShared來擷取共享狀态,該方法是非阻塞的,如果擷取成功則立刻傳回,也就表示擷取共享鎖成功。

2. 擷取失敗進入sync隊列;

在擷取共享狀态失敗後,目前時刻有可能是獨占鎖被其他線程所把持,那麼将目前線程構造成為節點(共享模式)加入到sync隊列中。

3. 循環内判斷退出隊列條件;

如果目前節點的前驅節點是頭結點并且擷取共享狀态成功,這裡和獨占鎖acquire的退出隊列條件類似。

4. 擷取共享狀态成功;

在退出隊列的條件上,和獨占鎖之間的主要差別在于擷取共享狀态成功之後的行為,而如果共享狀态擷取成功之後會判斷後繼節點是否是共享模式,如果是共享模式,那麼就直接對其進行喚醒操作,也就是同時激發多個線程并發的運作。

5. 擷取共享狀态失敗。

通過使用LockSupport将目前線程從線程排程器上摘下,進入休眠狀态。

對于上述邏輯中,節點之間的通知過程如下圖所示:

AbstractQueuedSynchronizer的介紹和原理分析(轉)簡介 API說明 示例 實作分析 一個例子

上圖中,綠色表示共享節點,它們之間的通知和喚醒操作是在前驅節點擷取狀态時就進行的,紅色表示獨占節點,它的被喚醒必須取決于前驅節點的釋放,也就是release操作,可以看出來圖中的獨占節點如果要運作,必須等待前面的共享節點均釋放了狀态才可以。而獨占節點如果擷取了狀态,那麼後續的獨占式擷取和共享式擷取均被阻塞。

調用該方法釋放共享狀态,每次擷取共享狀态acquireShared都會操作狀态,同樣在共享鎖釋放的時候,也需要将狀态釋放。比如說,一個限定一定數量通路的同步工具,每次擷取都是共享的,但是如果超過了一定的數量,将會阻塞後續的擷取操作,隻有當之前擷取的消費者将狀态釋放才可以使阻塞的擷取操作得以運作。

<code>public</code> <code>final</code> <code>boolean</code> <code>releaseShared(</code><code>int</code> <code>arg) {</code>

<code>    </code><code>if</code> <code>(tryReleaseShared(arg)) {</code>

<code>        </code><code>doReleaseShared();</code>

上述邏輯主要就是調用同步器的tryReleaseShared方法來釋放狀态,并同時在doReleaseShared方法中喚醒其後繼節點。

在上述對同步器AbstractQueuedSynchronizer進行了實作層面的分析之後,我們通過一個例子來加深對同步器的了解:

設計一個同步工具,該工具在同一時刻,隻能有兩個線程能夠并行通路,超過限制的其他線程進入阻塞狀态。

對于這個需求,可以利用同步器完成一個這樣的設定,定義一個初始狀态,為2,一個線程進行擷取那麼減1,一個線程釋放那麼加1,狀态正确的範圍在[0,1,2]三個之間,當在0時,代表再有新的線程對資源進行擷取時隻能進入阻塞狀态(注意在任何時候進行狀态變更的時候均需要以CAS作為原子性保障)。由于資源的數量多于1個,同時可以有兩個線程占有資源,是以需要實作tryAcquireShared和tryReleaseShared方法,這裡謝謝luoyuyou和同僚小明指正,已經修改了實作。

<code>public</code> <code>class</code> <code>TwinsLock </code><code>implements</code> <code>Lock {</code>

<code>    </code><code>private</code> <code>final</code> <code>Sync  sync    = </code><code>new</code> <code>Sync(</code><code>2</code><code>);</code>

<code>    </code><code>private</code> <code>static</code> <code>final</code> <code>class</code> <code>Sync </code><code>extends</code> <code>AbstractQueuedSynchronizer {</code>

<code>        </code><code>private</code> <code>static</code> <code>final</code> <code>long</code>   <code>serialVersionUID    = -7889272986162341211L;</code>

<code>        </code><code>Sync(</code><code>int</code> <code>count) {</code>

<code>            </code><code>if</code> <code>(count &lt;= </code><code>0</code><code>) {</code>

<code>                </code><code>throw</code> <code>new</code> <code>IllegalArgumentException(</code><code>"count must large than zero."</code><code>);</code>

<code>            </code><code>setState(count);</code>

<code>        </code><code>public</code> <code>int</code> <code>tryAcquireShared(</code><code>int</code> <code>reduceCount) {</code>

<code>            </code><code>for</code> <code>(;;) {</code>

<code>                </code><code>int</code> <code>current = getState();</code>

<code>                </code><code>int</code> <code>newCount = current - reduceCount;</code>

<code>                </code><code>if</code> <code>(newCount &lt; </code><code>0</code> <code>|| compareAndSetState(current, newCount)) {</code>

<code>                    </code><code>return</code> <code>newCount;</code>

<code>        </code><code>public</code> <code>boolean</code> <code>tryReleaseShared(</code><code>int</code> <code>returnCount) {</code>

<code>                </code><code>int</code> <code>newCount = current + returnCount;</code>

<code>                </code><code>if</code> <code>(compareAndSetState(current, newCount)) {</code>

<code>                    </code><code>return</code> <code>true</code><code>;</code>

<code>    </code><code>public</code> <code>void</code> <code>lock() {</code>

<code>        </code><code>sync.acquireShared(</code><code>1</code><code>);</code>

<code>    </code><code>public</code> <code>void</code> <code>lockInterruptibly() </code><code>throws</code> <code>InterruptedException {</code>

<code>        </code><code>sync.acquireSharedInterruptibly(</code><code>1</code><code>);</code>

<code>    </code><code>public</code> <code>boolean</code> <code>tryLock() {</code>

<code>44</code>

<code>        </code><code>return</code> <code>sync.tryAcquireShared(</code><code>1</code><code>) &gt;= </code><code>0</code><code>;</code>

<code>45</code>

<code>46</code>

<code>47</code>

<code>    </code><code>public</code> <code>boolean</code> <code>tryLock(</code><code>long</code> <code>time, TimeUnit unit) </code><code>throws</code> <code>InterruptedException {</code>

<code>48</code>

<code>        </code><code>return</code> <code>sync.tryAcquireSharedNanos(</code><code>1</code><code>, unit.toNanos(time));</code>

<code>49</code>

<code>50</code>

<code>51</code>

<code>    </code><code>public</code> <code>void</code> <code>unlock() {</code>

<code>52</code>

<code>        </code><code>sync.releaseShared(</code><code>1</code><code>);</code>

<code>53</code>

<code>54</code>

<code>55</code>

<code>    </code><code>@Override</code>

<code>56</code>

<code>    </code><code>public</code> <code>Condition newCondition() {</code>

<code>57</code>

<code>        </code><code>return</code> <code>null</code><code>;</code>

<code>58</code>

<code>59</code>

這裡我們編寫一個測試來驗證TwinsLock是否能夠正常工作并達到預期。

<code>public</code> <code>class</code> <code>TwinsLockTest {</code>

<code>    </code><code>@Test</code>

<code>    </code><code>public</code> <code>void</code> <code>test() {</code>

<code>        </code><code>final</code> <code>Lock lock = </code><code>new</code> <code>TwinsLock();</code>

<code>        </code><code>class</code> <code>Worker </code><code>extends</code> <code>Thread {</code>

<code>            </code><code>public</code> <code>void</code> <code>run() {</code>

<code>                </code><code>while</code> <code>(</code><code>true</code><code>) {</code>

<code>                    </code><code>lock.lock();</code>

<code>                    </code><code>try</code> <code>{</code>

<code>                        </code><code>Thread.sleep(1000L);</code>

<code>                </code><code>System.out.println(Thread.currentThread());</code>

<code>                    </code><code>} </code><code>catch</code> <code>(Exception ex) {</code>

<code>                    </code><code>} </code><code>finally</code> <code>{</code>

<code>                        </code><code>lock.unlock();</code>

<code>                    </code><code>}</code>

<code>        </code><code>for</code> <code>(</code><code>int</code> <code>i = </code><code>0</code><code>; i &amp;lt; </code><code>10</code><code>; i++) {</code>

<code>            </code><code>Worker w = </code><code>new</code> <code>Worker();</code>

<code>            </code><code>w.start();</code>

<code>        </code><code>new</code> <code>Thread() {</code>

<code>                        </code><code>Thread.sleep(200L);</code>

<code>                        </code><code>System.out.println();</code>

<code>        </code><code>}.start();</code>

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>Thread.sleep(20000L);</code>

<code>        </code><code>} </code><code>catch</code> <code>(InterruptedException e) {</code>

<code>            </code><code>e.printStackTrace();</code>

上述測試用例的邏輯主要包括:

​1. 列印線程

Worker在兩次睡眠之間列印自身線程,如果一個時刻隻能有兩個線程同時通路,那麼列印出來的内容将是成對出現。

​2. 分隔線程

不停的列印換行,能讓Worker的輸出看起來更加直覺。

該測試的結果是在一個時刻,僅有兩個線程能夠獲得到鎖,并完成列印,而表象就是列印的内容成對出現。

http://ifeve.com/introduce-abstractqueuedsynchronizer/

http://blog.csdn.net/ksgt00016758/article/details/38751429