JUC:ReentrantLock
關鍵詞
- 互斥鎖:ReentrantLock
- 公平鎖和非公平鎖:ReentrantLock(CAS+AQS隊列)
- 可重入鎖:ReentrantLock(state變量+CAS操作)
- 響應中斷(一個線程擷取不到鎖,不會一直等下去)(tryLock方法,傳入時間參數,表示等待指定的時間)
一、概述
Concurrent 包中和互斥鎖(ReentrantLock)相關類之間的繼承層次
Lock是一個接口,其定義如下:
public interface Lock {
void lock(); //不能被中斷
void lockInterruptibly() throws InterruptedException; //可以被中斷
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
常用的方法是lock()/unlock()。lock()不能被中斷,對應的lockInterruptibly()可以被中斷。
1.2 ReentrantLock(可重入鎖)本身沒有代碼邏輯,實作都在其内部類Sync中:
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
// ...
}
1.3 鎖的公平性vs.非公平性
Sync是一個抽象類,它有兩個子類FairSync與NonfairSync,分别對應公平鎖和非公平鎖。從下面的ReentrantLock構造方法可以看出,會傳入一個布爾類型的變量fair指定鎖是公平的還是非公平的,預設為非公平的。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
什麼叫公平鎖和非公平鎖呢?先舉個現實生活中的例子,一個人去火車站售票視窗買票,發現現場有人排隊,于是他排在隊伍末尾,遵循先到者優先服務的規則,這叫公平;如果他去了不排隊,直接沖到視窗買票,這叫作不公平。
對應到鎖的例子,一個新的線程來了之後,看到有很多線程在排隊,自己排到隊伍末尾,這叫公平;線程來了之後直接去搶鎖,這叫作不公平。預設設定的是非公平鎖,其實是為了提高效率,減少線程切換。
1.4 鎖實作的基本原理
Sync的父類AbstractQueuedSynchronizer經常被稱作隊列同步器(AQS),這個類非常重要,該類的父類是AbstractOwnableSynchronizer。
此處的鎖具備synchronized功能,即可以阻塞一個線程。為了實作一把具有阻塞或喚醒功能的鎖,需要幾個核心要素:
- 需要一個state變量,标記該鎖的狀态。state變量至少有兩個值:0、1。對state變量的操作,使用CAS保證線程安全。
- 需要記錄目前是哪個線程持有鎖。
- 需要底層支援對一個線程進行阻塞或喚醒操作。
- 需要有一個隊列維護所有阻塞的線程。這個隊列也必須是線程安全的無鎖隊列,也需要使用CAS。
針對要素1和2,在上面兩個類中有對應的展現:
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
// ...
private transient Thread exclusiveOwnerThread; // 記錄持有鎖的線程
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
private volatile int state; // 記錄鎖的狀态,通過CAS修改state的值。
// ...
}
state取值不僅可以是0、1,還可以大于1,就是為了支援鎖的可重入性。例如,同樣一個線程,調用5次lock,state會變成5;然後調用5次unlock,state減為0。
- 當state=0時,沒有線程持有鎖,exclusiveOwnerThread=null;
- 當state=1時,有一個線程持有鎖,exclusiveOwnerThread=該線程;
- 當state > 1時,說明該線程重入了該鎖
對于要素3,Unsafe類提供了阻塞或喚醒線程的一對操作原語,也就是park/unpark。
public native void unpark(Object thread);
public native void park(boolean isAbsolute, long time);
有一個LockSupport的工具類,對這一對原語做了簡單封裝:
public class LockSupport {
// ...
private static final Unsafe U = Unsafe.getUnsafe();
public static void park() {
U.park(false, 0L);
}
public static void unpark(Thread thread) {
if (thread != null)
U.unpark(thread);
}
}
在目前線程中調用park(),該線程就會被阻塞;在另外一個線程中,調用unpark(Thread thread),傳入一個被阻塞的線程,就可以喚醒阻塞在park()地方的線程。
- unpark(Thread thread),它實作了一個線程對另外一個線程的“精準喚醒”。
- notify也隻是喚醒某一個線程,但無法指定具體喚醒哪個線程。
針對要素4,在AQS(AbstractQueuedSynchronizer)中利用雙向連結清單和CAS實作了一個阻塞隊列。如下所示:
public abstract class AbstractQueuedSynchronizer {
// ...
static final class Node {
volatile Thread thread; // 每個Node對應一個被阻塞的線程
volatile Node prev;
volatile Node next;
// ...
}
private transient volatile Node head;
private transient volatile Node tail;
// ...
}
阻塞隊列是整個AQS核心中的核心。如下圖所示,head指向雙向連結清單頭部,tail指向雙向連結清單尾部。
- 入隊就是把新的Node加到tail後面,然後對tail進行CAS操作;
- 出隊就是對head進行CAS操作,把head向後移一個位置。
初始的時候,head=tail=NULL;然後,在往隊列中加入阻塞的線程時,會建立一個空的Node,讓head和tail都指向這個空Node;之後,在後面加入被阻塞的線程對象。是以,當head=tail的時候,說明隊列為空。
1.5 公平與非公平的lock()實作差異
下面分析基于AQS,ReentrantLock在公平性和非公平性上的實作差異。
非公平鎖:直接使用目前線程擷取鎖,不排隊
公平鎖:判斷目前隊列中沒有等待線程,後使用目前線程擷取鎖,需要排隊
1.6 阻塞隊列與喚醒機制
下面進入鎖的最為關鍵的部分,即acquireQueued(…)方法内部一探究竟。
先說addWaiter(…)方法,就是為目前線程生成一個Node,然後把Node 放入雙向連結清單的尾部 。要注意的是,這隻是把Thread對象放入了一個隊列中而已,線程本身并未阻塞。
建立節點,嘗試将節點追加到隊列尾部。擷取tail節點,将tail節點的next設定為目前節點。
如果tail不存在,就初始化隊列。
在addWaiter(…)方法把Thread對象加入阻塞隊列之後的工作就要靠acquireQueued(…)方法完成。
線程一旦進入acquireQueued(…)就會被無限期阻塞,即使有其他線程調用interrupt()方法也不能将其喚醒,除非有其他線程釋放了鎖,并且該線程拿到了鎖,才會從accquireQueued(…)傳回。
進入acquireQueued(…),該線程被阻塞。在該方法傳回的一刻,就是拿到鎖的那一刻,也就是被喚醒的那一刻,此時會删除隊列的第一個元素(head指針前移1個節點)。
首先,acquireQueued(…)方法有一個傳回值,表示什麼意思呢?雖然該方法不會中斷響應,但它會記錄被阻塞期間有沒有其他線程向它發送過中斷信号。如果有,則該方法會傳回true;否則,傳回false。
基于這個傳回值,才有了下面的代碼:
當 acquireQueued(…)傳回 true 時,會調用 selfInterrupt(),自己給自己發送中斷信号,也就是自己把自己的中斷标志位設為true。之是以要這麼做,是因為自己在阻塞期間,收到其他線程中斷信号沒有及時響應,現在要進行補償。這樣一來,如果該線程在lock代碼塊内部有調用sleep()之類的阻塞方法,就可以抛出異常,響應該中斷信号。
阻塞就發生在下面這個方法中:
線程調用 park()方法,自己把自己阻塞起來,直到被其他線程喚醒,該方法傳回。
park()方法傳回有兩種情況。
- 其他線程調用了unpark(Thread t)。
- 其他線程調用了t.interrupt()。這裡要注意的是,lock()不能響應中斷,但LockSupport.park()會響應中斷。
也正因為LockSupport.park()可能被中斷喚醒,acquireQueued(…)方法才寫了一個for死循環。喚醒之後,如果發現自己排在隊列頭部,就去拿鎖;如果拿不到鎖,則再次自己阻塞自己。不斷重複此過程,直到拿到鎖。
被喚醒之後,通過Thread.interrupted()來判斷是否被中斷喚醒。如果是情況1,會傳回false;如果是情況2,則傳回true。
1.7 unlock()實作分析
說完了lock,下面分析unlock的實作。unlock不區分公平還是非公平。
上圖中,目前線程要釋放鎖,先調用tryRelease(arg)方法,如果傳回true,則取出head,讓head擷取鎖。
對于tryRelease方法:
首先計算目前線程釋放鎖後的state值。
如果目前線程不是排他線程,則抛異常,因為隻有擷取鎖的線程才可以進行釋放鎖的操作。
此時設定state,沒有使用CAS,因為是單線程操作。
再看unparkSuccessor方法:
release()裡面做了兩件事:tryRelease(…)方法釋放鎖;unparkSuccessor(…)方法喚醒隊列中的後繼者。
1.8 lockInterruptibly()實作分析
上面的 lock 不能被中斷,這裡的 lockInterruptibly()可以被中斷:
這裡的 acquireInterruptibly(…)也是 AQS 的模闆方法,裡面的 tryAcquire(…)分别被 FairSync和NonfairSync實作。
主要看doAcquireInterruptibly(…)方法:
當parkAndCheckInterrupt()傳回true的時候,說明有其他線程發送中斷信号,直接抛出InterruptedException,跳出for循環,整個方法傳回。
1.9 tryLock()實作分析
tryLock()實作基于調用非公平鎖的tryAcquire(…),對state進行CAS操作,如果操作成功就拿到鎖;如果操作不成功則直接傳回false,也不阻塞。
二、公平鎖、非公平鎖、響應中斷
- ReentrantLock主要利用CAS+AQS隊列來實作。它支援公平鎖和非公平鎖,兩者的實作類似。
- ReentrantLock是獨占鎖,加鎖和解鎖的過程需要手動進行,不易操作,但非常靈活。
- ReentrantLock可重入,加鎖和解鎖需要手動進行,且次數需一樣,否則其他線程無法獲得鎖。
- ReentrantLock可以相應中斷。
- ReentrantLock還可以實作公平鎖機制。在鎖上等待時間最長的線程将獲得鎖的使用權。通俗的了解就是誰排隊時間最長誰先執行擷取鎖。
2.1 鎖的基本使用
分别lock和unlock
public class BaseDemo {
private static final Lock lock = new ReentrantLock();
public static void main(String[] args) {
new Thread(()->run(),"線程1").start();
new Thread(()->run(),"線程2").start();
}
public static void run(){
try {
lock.lock();
System.out.println(Thread.currentThread().getName()+"擷取鎖");
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName()+"鎖釋放");
lock.unlock();
}
}
}
2.2 公平鎖使用
public class FairSyncDemo {
/**
* 參數傳遞true,表示使用公平鎖
*/
private static final Lock lock = new ReentrantLock(true);
public static void main(String[] args) {
new Thread(() -> run(), "線程1").start();
new Thread(() -> run(), "線程2").start();
new Thread(() -> run(), "線程3").start();
new Thread(() -> run(), "線程4").start();
}
public static void run() {
for (int i=0 ; i<2 ; i++) {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + "擷取鎖");
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
} finally {
//System.out.println(Thread.currentThread().getName() + "鎖釋放");
lock.unlock();
}
System.out.println("-----------------");
}
}
}
2.3 非公平鎖使用
public class NoFairSyncDemo {
/**
* 參數傳遞false,表示使用公平鎖,預設值就是false
*/
private static final Lock lock = new ReentrantLock(false);
public static void main(String[] args) {
new Thread(() -> run(), "線程1").start();
new Thread(() -> run(), "線程2").start();
new Thread(() -> run(), "線程3").start();
new Thread(() -> run(), "線程4").start();
}
public static void run() {
for (int i=0 ; i<2 ; i++) {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + "擷取鎖");
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
e.printStackTrace();
} finally {
//System.out.println(Thread.currentThread().getName() + "鎖釋放");
lock.unlock();
}
System.out.println("-----------------");
}
}
}
2.4 響應中斷
/**
* 響應中斷就是一個線程擷取不到鎖,不會傻傻的一直等下去,
* ReentrantLock會給予一個中斷回應
*/
public class InterruptDemo {
static Lock lock1 = new ReentrantLock();
static Lock lock2 = new ReentrantLock();
public static void main(String[] args) {
Thread thread1 = new Thread(new MyThread(lock1,lock2));
Thread thread2 = new Thread(new MyThread(lock1,lock2));
thread1.start();
thread2.start();
thread1.interrupt();//第一個線程中斷
}
static class MyThread implements Runnable{
Lock innerLock1;
Lock innerLock2;
public MyThread(Lock innerLock1, Lock innerLock2) {
this.innerLock1 = innerLock1;
this.innerLock2 = innerLock2;
}
@Override
public void run() {
try {
innerLock1.lockInterruptibly();
TimeUnit.MICROSECONDS.sleep(55);
innerLock2.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
innerLock1.unlock();
innerLock2.unlock();
System.out.println(Thread.currentThread().getName()+"線程結束!");
}
}
}
}
定義了兩個鎖lock1和lock2
然後使用兩個線程thread1和thread2構造死鎖場景
正常情況下,這兩個線程互相等待擷取資源而處于死循環狀态。但是我們此時innerLock1中斷,另外一個線程就可以擷取資源,正常地執行了
2.5 限時等待
通過tryLock方法來實作,可以選擇傳入時間參數,表示等待指定的時間,無參則表示立即傳回鎖申請的結果:true表示擷取鎖成功,false表示擷取鎖失敗。我們可以将這種方法用來解決死鎖問題。
public class InterruptWaitTimeDemo {
static Lock lock1 = new ReentrantLock();
static Lock lock2 = new ReentrantLock();
public static void main(String[] args) {
Thread thread1 = new Thread(new InterruptDemo.MyThread(lock1,lock2));
Thread thread2 = new Thread(new InterruptDemo.MyThread(lock1,lock2));
thread1.start();
thread2.start();
thread1.interrupt();//第一個線程中斷
}
static class MyThread implements Runnable{
Lock innerLock1;
Lock innerLock2;
public MyThread(Lock innerLock1, Lock innerLock2) {
this.innerLock1 = innerLock1;
this.innerLock2 = innerLock2;
}
@Override
public void run() {
try {
if(!innerLock1.tryLock()){
TimeUnit.MILLISECONDS.sleep(150);
}
if(!innerLock2.tryLock()){
TimeUnit.MILLISECONDS.sleep(150);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
innerLock1.unlock();
innerLock2.unlock();
System.out.println(Thread.currentThread().getName()+"線程結束!");
}
}
}
}