一、ReentrantLock 類
1.1 什麼是reentrantlock
java.util.concurrent.lock 中的 Lock 架構是鎖定的一個抽象,它允許把鎖定的實作作為 Java 類,而不是作為語言的特性來實作。這就為 Lock 的多種實作留下了空間,各種實作可能有不同的排程算法、性能特性或者鎖定語義。 ReentrantLock 類實作了 Lock ,它擁有與 synchronized 相同的并發性和記憶體語義,但是添加了類似鎖投票、定時鎖等候和可中斷鎖等候的一些特性。此外,它還提供了在激烈争用情況下更佳的性能。(換句話說,當許多線程都想通路共享資源時,JVM 可以花更少的時候來排程線程,把更多時間用在執行線程上。)
reentrant 鎖意味着什麼呢?簡單來說,它有一個與鎖相關的擷取計數器,如果擁有鎖的某個線程再次得到鎖,那麼擷取計數器就加1,然後鎖需要被釋放兩次才能獲得真正釋放。這模仿了 synchronized 的語義;如果線程進入由線程已經擁有的監控器保護的 synchronized 塊,就允許線程繼續進行,當線程退出第二個(或者後續) synchronized 塊的時候,不釋放鎖,隻有線程退出它進入的監控器保護的第一個 synchronized 塊時,才釋放鎖。
1.2 ReentrantLock與synchronized的比較
相同:ReentrantLock提供了synchronized類似的功能和記憶體語義。
不同:
(1)ReentrantLock功能性方面更全面,比如時間鎖等候,可中斷鎖等候,鎖投票等,是以更有擴充性。在多個條件變量和高度競争鎖的地方,用ReentrantLock更合适,ReentrantLock還提供了Condition,對線程的等待和喚醒等操作更加靈活,一個ReentrantLock可以有多個Condition執行個體,是以更有擴充性。
(2)ReentrantLock 的性能比synchronized會好點。
(3)ReentrantLock提供了可輪詢的鎖請求,他可以嘗試的去取得鎖,如果取得成功則繼續處理,取得不成功,可以等下次運作的時候處理,是以不容易産生死鎖,而synchronized則一旦進入鎖請求要麼成功,要麼一直阻塞,是以更容易産生死鎖。
1.3 ReentrantLock擴充的功能
1.3.1 實作可輪詢的鎖請求
在内部鎖中,死鎖是緻命的——唯一的恢複方法是重新啟動程式,唯一的預防方法是在建構程式時不要出錯。而可輪詢的鎖擷取模式具有更完善的錯誤恢複機制,可以規避死鎖的發生。
如果你不能獲得所有需要的鎖,那麼使用可輪詢的擷取方式使你能夠重新拿到控制權,它會釋放你已經獲得的這些鎖,然後再重新嘗試。可輪詢的鎖擷取模式,由tryLock()方法實作。此方法僅在調用時鎖為空閑狀态才擷取該鎖。如果鎖可用,則擷取鎖,并立即傳回值true。如果鎖不可用,則此方法将立即傳回值false。此方法的典型使用語句如下:
Lock lock = ...;
if (lock.tryLock()) {
try {
// manipulate protected state
} finally {
lock.unlock();
}
} else {
// perform alternative actions
}
1.3.2 實作可定時的鎖請求
當使用内部鎖時,一旦開始請求,鎖就不能停止了,是以内部鎖給實作具有時限的活動帶來了風險。為了解決這一問題,可以使用定時鎖。當具有時限的活
動調用了阻塞方法,定時鎖能夠在時間預算内設定相應的逾時。如果活動在期待的時間内沒能獲得結果,定時鎖能使程式提前傳回。可定時的鎖擷取模式,由tryLock(long, TimeUnit)方法實作。
1.3.3 實作可中斷的鎖擷取請求
可中斷的鎖擷取操作允許在可取消的活動中使用。lockInterruptibly()方法能夠使你獲得鎖的時候響應中斷。
1.4 ReentrantLock不好與需要注意的地方
(1) lock 必須在 finally 塊中釋放。否則,如果受保護的代碼将抛出異常,鎖就有可能永遠得不到釋放!這一點差別看起來可能沒什麼,但是實際上,它極為重要。忘記在 finally 塊中釋放鎖,可能會在程式中留下一個定時炸彈,當有一天炸彈爆炸時,您要花費很大力氣才有找到源頭在哪。而使用同步,JVM 将確定鎖會獲得自動釋放 (2) 當 JVM 用 synchronized 管理鎖定請求和釋放時,JVM 在生成線程轉儲時能夠包括鎖定資訊。這些對調試非常有價值,因為它們能辨別死鎖或者其他異常行為的來源。 Lock 類隻是普通的類,JVM 不知道具體哪個線程擁有 Lock 對象。
二、條件變量Condition
條件變量很大一個程度上是為了解決Object.wait/notify/notifyAll難以使用的問題。
條件(也稱為條件隊列 或條件變量)為線程提供了一個含義,以便在某個狀态條件現在可能為 true 的另一個線程通知它之前,一直挂起該線程(即讓其“等待”)。因為通路此共享狀态資訊發生在不同的線程中,是以它必須受保護,是以要将某種形式的鎖與該條件相關聯。等待提供一個條件的主要屬性是:以原子方式 釋放相關的鎖,并挂起目前線程,就像
Object.wait
做的那樣。
上述API說明表明條件變量需要與鎖綁定,而且多個Condition需要綁定到同一鎖上。前面的Lock中提到,擷取一個條件變量的方法是Lock.newCondition()。
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
以上是Condition接口定義的方法,await*對應于Object.wait,signal對應于Object.notify,signalAll對應于Object.notifyAll。特别說明的是Condition的接口改變名稱就是為了避免與Object中的wait/notify/notifyAll的語義和使用上混淆,因為Condition同樣有wait/notify/notifyAll方法。
每一個Lock可以有任意資料的Condition對象,Condition是與Lock綁定的,是以就有Lock的公平性特性:如果是公平鎖,線程為按照FIFO的順序從Condition.await中釋放,如果是非公平鎖,那麼後續的鎖競争就不保證FIFO順序了。
一個使用Condition實作生産者消費者的模型例子如下。
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ProductQueue<T> { private final T[] items; private final Lock lock = new ReentrantLock(); private Condition notFull = lock.newCondition(); private Condition notEmpty = lock.newCondition(); // private int head, tail, count; public ProductQueue(int maxSize) { items = (T[]) new Object[maxSize]; } public ProductQueue() { this(10); } public void put(T t) throws InterruptedException { lock.lock(); try { while (count == getCapacity()) { notFull.await(); } items[tail] = t; if (++tail == getCapacity()) { tail = 0; } ++count; notEmpty.signalAll(); } finally { lock.unlock(); } } public T take() throws InterruptedException { lock.lock(); try { while (count == 0) { notEmpty.await(); } T ret = items[head]; items[head] = null;//GC // if (++head == getCapacity()) { head = 0; } --count; notFull.signalAll(); return ret; } finally { lock.unlock(); } } public int getCapacity() { return items.length; } public int size() { lock.lock(); try { return count; } finally { lock.unlock(); } } }
在這個例子中消費take()需要 隊列不為空,如果為空就挂起(await()),直到收到notEmpty的信号;生産put()需要隊列不滿,如果滿了就挂起(await()),直到收到notFull的信号。
可能有人會問題,如果一個線程lock()對象後被挂起還沒有unlock,那麼另外一個線程就拿不到鎖了(lock()操作會挂起),那麼就無法通知(notify)前一個線程,這樣豈不是“死鎖”了?
2.1 await* 操作
上一節中說過多次ReentrantLock是獨占鎖,一個線程拿到鎖後如果不釋放,那麼另外一個線程肯定是拿不到鎖,是以在lock.lock()和lock.unlock()之間可能有一次釋放鎖的操作(同樣也必然還有一次擷取鎖的操作)。我們再回頭看代碼,不管take()還是put(),在進入lock.lock()後唯一可能釋放鎖的操作就是await()了。也就是說await()操作實際上就是釋放鎖,然後挂起線程,一旦條件滿足就被喚醒,再次擷取鎖!
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
上面是await()的代碼片段。上一節中說過,AQS在擷取鎖的時候需要有一個CHL的FIFO隊列,是以對于一個Condition.await()而言,如果釋放了鎖,要想再一次擷取鎖那麼就需要進入隊列,等待被通知擷取鎖。完整的await()操作是安裝如下步驟進行的:
-
- 将目前線程加入Condition鎖隊列。特别說明的是,這裡不同于AQS的隊列,這裡進入的是Condition的FIFO隊列。後面會具體談到此結構。進行2。
- 釋放鎖。這裡可以看到将鎖釋放了,否則别的線程就無法拿到鎖而發生死鎖。進行3。
- 自旋(while)挂起,直到被喚醒或者逾時或者CACELLED等。進行4。
- 擷取鎖(acquireQueued)。并将自己從Condition的FIFO隊列中釋放,表明自己不再需要鎖(我已經拿到鎖了)。
這裡再回頭介紹Condition的資料結構。我們知道一個Condition可以在多個地方被await*(),那麼就需要一個FIFO的結構将這些Condition串聯起來,然後根據需要喚醒一個或者多個(通常是所有)。是以在Condition内部就需要一個FIFO的隊列。
private transient Node firstWaiter; private transient Node lastWaiter;
上面的兩個節點就是描述一個FIFO的隊列。我們再結合前面提到的節點(Node)資料結構。我們就發現Node.nextWaiter就派上用場了!nextWaiter就是将一系列的Condition.await*串聯起來組成一個FIFO的隊列。
2.2 signal/signalAll 操作
await*()清楚了,現在再來看signal/signalAll就容易多了。按照signal/signalAll的需求,就是要将Condition.await*()中FIFO隊列中第一個Node喚醒(或者全部Node)喚醒。盡管所有Node可能都被喚醒,但是要知道的是仍然隻有一個線程能夠拿到鎖,其它沒有拿到鎖的線程仍然需要自旋等待,就上上面提到的第4步(acquireQueued)。
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
上面的代碼很容易看出來,signal就是喚醒Condition隊列中的第一個非CANCELLED節點線程,而signalAll就是喚醒所有非CANCELLED節點線程。當然了遇到CANCELLED線程就需要将其從FIFO隊列中剔除。
final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int c = p.waitStatus; if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
上面就是喚醒一個await*()線程的過程,根據前面的小節介紹的,如果要unpark線程,并使線程拿到鎖,那麼就需要線程節點進入AQS的隊列。是以可以看到在LockSupport.unpark之前調用了enq(node)操作,将目前節點加入到AQS隊列。
參考:
《深入淺出 Java Concurrency》—鎖機制(一)Lock與ReentrantLock
http://blog.csdn.net/fg2006/article/details/6397894
Java多線程基礎總結七:ReentrantLock(2)
http://www.bianceng.cn/Programming/Java/201206/34155_2.htm
再談重入鎖--ReentrantLock
http://tenyears.iteye.com/blog/48750
深入淺出 Java Concurrency (9): 鎖機制 part 4
http://www.blogjava.net/xylz/archive/2010/07/08/325540.html