目錄
1.引言
1.1 類比synchronized
2.使用
2.1 同步
2.2 通信協同
3.實作
2.1 類圖
2.2 核心方法
2.2.1 構造
2.2.2 lock
2.3 AQS
2.3.1 字段分析
2.3.2 lock()方法
2.3.3 release(int)
1.引言
在Java多線程中,可以使用synchronized關鍵字來實作線程之間同步互斥,但在JDK1.5中新增加了ReentrantLock類也能達到同樣的效果,并且在擴充功能上也更加強大,比如具有嗅探鎖定、多路分支通知等功能,而且在使用上也比synchronized更加的靈活。
類ReentrantLock具有完全互斥排他的效果,即同一時間隻有一個線程在執行ReentrantLock.lock()方法後面的任務。雖然保證了執行個體變量的線程安全性,但效率卻是非常低下的
1.1 類比synchronized
synchronized | ReentrantLock | ||
---|---|---|---|
相同點 | 可重入 比如一個線程獲得了某個對象的鎖,此時這個對象鎖還沒有釋放,當其再次想要擷取這個對象的鎖的時候還是可以擷取的 | ||
不同點 | 實作 | JVM實作 | JDK API (java.util.concurrent包) |
使用 | 使用簡單 通信機制配合 synchronized與wait()和notify()/notifyAll() | 使用略複雜 需要 lock() 和 unlock() 方法配合 try/finally 語句塊來完成 通信配合 Condition await和signal | |
功能 | 功能簡單單一 不可中斷 僅支援非公平鎖 | 進階功能 等待可中斷; lock.lockInterruptibly()等待的線程可選擇放棄等待 可實作公平鎖;(公平鎖就是先等待的線程先獲得鎖,預設情況是非公平, 構造方法) 可實作選擇性通知(鎖可以綁定多個條件)ReentrantLock與Condition執行個體可以實作“選擇性通知” |
2.使用
2.1 同步
public static void main(String[] args) throws InterruptedException {
Lock lock = new ReentrantLock(); // 3個線程競争的鎖
// 啟動線程1
new Thread(()->{
try {
lock.lock();
Thread.sleep(20000); // 休眠20s後釋放鎖(sleep鎖不釋放)
lock.unlock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "thread1").start();
Thread.sleep(5000); // 主線程休眠5s
// 啟動線程2
new Thread(()->{
try {
lock.lock();
Thread.sleep(30000); // 休眠30s後釋放鎖(sleep鎖不釋放)
lock.unlock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "thread2").start();
Thread.sleep(5000); // 主線程休眠5s
// 啟動線程3
new Thread(()->{
try {
lock.lock();
Thread.sleep(30000); // 休眠30s後釋放鎖(sleep鎖不釋放)
lock.unlock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "thread3").start();
}
2.2 通信協同
配合try finally,可以搭配多個condition
public class ConditionUseCase {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException {
lock.lock();
try {
condition.await(); // 條件不滿足時等待
} finally {
lock.unlock();
}
}
public void conditionSignal() throws InterruptedException {
lock.lock();
try {
condition.signal(); // 條件滿足時喚醒
} finally {
lock.unlock();
}
}
}
3.實作
2.1 類圖
從類圖可以看出底層基于AQS實作,ReentrantLock的lock等方法,委托給其依賴sync的lock方法
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHLzkEVOhWNXFWbG12Yo50MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL4gjM4QzMwEjMxMzMwAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
2.2 核心方法
2.2.1 構造
預設無參構造使用非公平鎖實作,可以傳入參數選擇使用公平鎖。
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
2.2.2 lock
依賴sync的實作,常用非公平鎖的實作,如下圖Sync繼承自AQS(AbstractQueuedSynchronizer, 隊列同步器),先利用CAS(state, 0, 1)操作(類似于synchronized偏向鎖優化),下一節會詳細介紹AQS實作
2.3 AQS
AQS核心思想是,如果被請求的共享資源空閑,則将目前線程設定為有效的工作線程,并且将共享資源設定為鎖定狀态state。
如果被請求的共享資源被占用,那麼就需要一套線程阻塞等待以及被喚醒時鎖配置設定的機制,将暫時擷取不到鎖的線程加入到隊列中。
2.3.1 字段分析
- state 鎖狀态
- exclusiveOwnerThread 持有鎖的線程
- head、tail 連結清單 等待隊列
- 連結清單節點 等待狀态waitStatus
- 0,初始值,通過CAS被修改
- CANCELLED=1, 中斷取消,不再等待鎖了,大于0表示已經沒必要被通知了
- SIGNAL=-1, 信号,通知後繼節點,我釋放或者中斷放棄了
- CONDITION=-2, 目前線程等待在Condition上
- PROPAGATE=-3, 傳播 頭節點在共享鎖釋放時
- 前驅後繼節點
- 等待的線程
- 連結清單節點 等待狀态waitStatus
| 等待隊列節點類Node,等待隊列是“ CLH”(Craig,Landin和 Hagersten)鎖定隊列的變體,CLH鎖通常用于自旋鎖。 每個節點中的“狀态waitStatus”字段跟蹤線程是否應阻塞 節點的前驅節點釋放時會signal後面的節點,每個節點都監聽該信号signalAll(),單個等待線程signal() |
|
2.3.2 lock()方法
lock方法的時序如下:
// NonfairSync 利用CAS synchronized偏向鎖類似
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread()); // 對應于偏向鎖
else
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
lock | acquire |
| |
方法 | 解析 |
---|---|
1.addWaiter Node node = new Node(Thread.currentThread(), mode); compareAndSetTail(pred, node) | 目前線程建立新的節點,插入到隊尾 |
2.acquireQueued | 排隊擷取鎖, for循環自旋擷取,如果阻塞了,循環也暫停,如果不阻塞繼續擷取,擷取到了return 僅目前驅節點是哨兵頭,才去擷取鎖CAS 此處擷取不成功,由于鎖被線程1占有 shouldParkAfterFailedAcquire&&parkAndCheckInterrupt 設前驅為signal , 并parkAndCheckInterrupt阻塞目前線程(重量級) 此時線程的執行暫停了 |
3.selfInterrupt | 中斷目前線程 |
以上一節中3個線程競争鎖的代碼為例子,
線程1在lock時,鎖空閑,CAS可以成功,并将鎖owner設為線程1
線程2在lock時,鎖已經被線程1占有,acquire(1)搶占鎖,
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2.3.3 release(int)
ReentrantLock unlock委托給AQS的release(1)
public void unlock() {//ReentrantLock
sync.release(1);
}
public final boolean release(int arg) {//AQS
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
方法 | 解析 |
---|---|
1.線程1 tryRelease | 将exclusiveOwnerThread置空,并修改state (由于隻有一個線程能進入該代碼,無需并發控制) |
2.線程1 unparkSuccessor private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } | 找到第一個未中斷取消的中繼節點thread2,将其喚醒uppark |
3.線程2繼續執行acquireQueued的for循環 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } | 此時線程2 tryAcquire可以獲得鎖,競争鎖成功return |