天天看點

JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷

JUC:ReentrantLock

關鍵詞

  • 互斥鎖:ReentrantLock
  • 公平鎖和非公平鎖:ReentrantLock(CAS+AQS隊列)
  • 可重入鎖:ReentrantLock(state變量+CAS操作)
  • 響應中斷(一個線程擷取不到鎖,不會一直等下去)(tryLock方法,傳入時間參數,表示等待指定的時間)

一、概述

Concurrent 包中和互斥鎖(ReentrantLock)相關類之間的繼承層次

JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷

Lock是一個接口,其定義如下:

public interface Lock {
  void lock();  //不能被中斷
  void lockInterruptibly() throws InterruptedException;  //可以被中斷
  boolean tryLock();
  boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  void unlock();
  Condition newCondition();
}
           

常用的方法是lock()/unlock()。lock()不能被中斷,對應的lockInterruptibly()可以被中斷。

1.2 ReentrantLock(可重入鎖)本身沒有代碼邏輯,實作都在其内部類Sync中:

public class ReentrantLock implements Lock, java.io.Serializable {
  private final Sync sync;
 
  public void lock() {
    sync.acquire(1);
 }
 
  public void unlock() {
    sync.release(1);
 }
  // ...
}
           

1.3 鎖的公平性vs.非公平性

Sync是一個抽象類,它有兩個子類FairSync與NonfairSync,分别對應公平鎖和非公平鎖。從下面的ReentrantLock構造方法可以看出,會傳入一個布爾類型的變量fair指定鎖是公平的還是非公平的,預設為非公平的。

public ReentrantLock() {
	sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
	sync = fair ? new FairSync() : new NonfairSync();
}
           

什麼叫公平鎖和非公平鎖呢?先舉個現實生活中的例子,一個人去火車站售票視窗買票,發現現場有人排隊,于是他排在隊伍末尾,遵循先到者優先服務的規則,這叫公平;如果他去了不排隊,直接沖到視窗買票,這叫作不公平。

對應到鎖的例子,一個新的線程來了之後,看到有很多線程在排隊,自己排到隊伍末尾,這叫公平;線程來了之後直接去搶鎖,這叫作不公平。預設設定的是非公平鎖,其實是為了提高效率,減少線程切換。

1.4 鎖實作的基本原理

Sync的父類AbstractQueuedSynchronizer經常被稱作隊列同步器(AQS),這個類非常重要,該類的父類是AbstractOwnableSynchronizer。

此處的鎖具備synchronized功能,即可以阻塞一個線程。為了實作一把具有阻塞或喚醒功能的鎖,需要幾個核心要素:

  1. 需要一個state變量,标記該鎖的狀态。state變量至少有兩個值:0、1。對state變量的操作,使用CAS保證線程安全。
  2. 需要記錄目前是哪個線程持有鎖。
  3. 需要底層支援對一個線程進行阻塞或喚醒操作。
  4. 需要有一個隊列維護所有阻塞的線程。這個隊列也必須是線程安全的無鎖隊列,也需要使用CAS。

針對要素1和2,在上面兩個類中有對應的展現:

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
  // ...
  private transient Thread exclusiveOwnerThread;  // 記錄持有鎖的線程
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
 
  private volatile int state; // 記錄鎖的狀态,通過CAS修改state的值。
  // ...
 
}
           

state取值不僅可以是0、1,還可以大于1,就是為了支援鎖的可重入性。例如,同樣一個線程,調用5次lock,state會變成5;然後調用5次unlock,state減為0。

  • 當state=0時,沒有線程持有鎖,exclusiveOwnerThread=null;
  • 當state=1時,有一個線程持有鎖,exclusiveOwnerThread=該線程;
  • 當state > 1時,說明該線程重入了該鎖

對于要素3,Unsafe類提供了阻塞或喚醒線程的一對操作原語,也就是park/unpark。

public native void unpark(Object thread);

public native void park(boolean isAbsolute, long time);
           

有一個LockSupport的工具類,對這一對原語做了簡單封裝:

public class LockSupport {
  // ...
 
  private static final Unsafe U = Unsafe.getUnsafe();
 
  public static void park() {
    U.park(false, 0L);
 }
 
  public static void unpark(Thread thread) {
    if (thread != null)
      U.unpark(thread);
 }
}
           

在目前線程中調用park(),該線程就會被阻塞;在另外一個線程中,調用unpark(Thread thread),傳入一個被阻塞的線程,就可以喚醒阻塞在park()地方的線程。

  • unpark(Thread thread),它實作了一個線程對另外一個線程的“精準喚醒”。
  • notify也隻是喚醒某一個線程,但無法指定具體喚醒哪個線程。

針對要素4,在AQS(AbstractQueuedSynchronizer)中利用雙向連結清單和CAS實作了一個阻塞隊列。如下所示:

public abstract class AbstractQueuedSynchronizer {
  // ...
  static final class Node {
    volatile Thread thread; // 每個Node對應一個被阻塞的線程
    volatile Node prev;
    volatile Node next;
    // ...
 }
 
  private transient volatile Node head;
  private transient volatile Node tail;
  // ...
}
           

阻塞隊列是整個AQS核心中的核心。如下圖所示,head指向雙向連結清單頭部,tail指向雙向連結清單尾部。

  • 入隊就是把新的Node加到tail後面,然後對tail進行CAS操作;
  • 出隊就是對head進行CAS操作,把head向後移一個位置。
JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷

初始的時候,head=tail=NULL;然後,在往隊列中加入阻塞的線程時,會建立一個空的Node,讓head和tail都指向這個空Node;之後,在後面加入被阻塞的線程對象。是以,當head=tail的時候,說明隊列為空。

1.5 公平與非公平的lock()實作差異

下面分析基于AQS,ReentrantLock在公平性和非公平性上的實作差異。

JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷

非公平鎖:直接使用目前線程擷取鎖,不排隊

JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷
JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷

公平鎖:判斷目前隊列中沒有等待線程,後使用目前線程擷取鎖,需要排隊

JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷

1.6 阻塞隊列與喚醒機制

下面進入鎖的最為關鍵的部分,即acquireQueued(…)方法内部一探究竟。

JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷

先說addWaiter(…)方法,就是為目前線程生成一個Node,然後把Node 放入雙向連結清單的尾部 。要注意的是,這隻是把Thread對象放入了一個隊列中而已,線程本身并未阻塞。

JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷

建立節點,嘗試将節點追加到隊列尾部。擷取tail節點,将tail節點的next設定為目前節點。

如果tail不存在,就初始化隊列。

在addWaiter(…)方法把Thread對象加入阻塞隊列之後的工作就要靠acquireQueued(…)方法完成。

線程一旦進入acquireQueued(…)就會被無限期阻塞,即使有其他線程調用interrupt()方法也不能将其喚醒,除非有其他線程釋放了鎖,并且該線程拿到了鎖,才會從accquireQueued(…)傳回。

進入acquireQueued(…),該線程被阻塞。在該方法傳回的一刻,就是拿到鎖的那一刻,也就是被喚醒的那一刻,此時會删除隊列的第一個元素(head指針前移1個節點)。

JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷

首先,acquireQueued(…)方法有一個傳回值,表示什麼意思呢?雖然該方法不會中斷響應,但它會記錄被阻塞期間有沒有其他線程向它發送過中斷信号。如果有,則該方法會傳回true;否則,傳回false。

基于這個傳回值,才有了下面的代碼:

JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷
JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷

當 acquireQueued(…)傳回 true 時,會調用 selfInterrupt(),自己給自己發送中斷信号,也就是自己把自己的中斷标志位設為true。之是以要這麼做,是因為自己在阻塞期間,收到其他線程中斷信号沒有及時響應,現在要進行補償。這樣一來,如果該線程在lock代碼塊内部有調用sleep()之類的阻塞方法,就可以抛出異常,響應該中斷信号。

阻塞就發生在下面這個方法中:

JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷

線程調用 park()方法,自己把自己阻塞起來,直到被其他線程喚醒,該方法傳回。

park()方法傳回有兩種情況。

  1. 其他線程調用了unpark(Thread t)。
  2. 其他線程調用了t.interrupt()。這裡要注意的是,lock()不能響應中斷,但LockSupport.park()會響應中斷。

也正因為LockSupport.park()可能被中斷喚醒,acquireQueued(…)方法才寫了一個for死循環。喚醒之後,如果發現自己排在隊列頭部,就去拿鎖;如果拿不到鎖,則再次自己阻塞自己。不斷重複此過程,直到拿到鎖。

被喚醒之後,通過Thread.interrupted()來判斷是否被中斷喚醒。如果是情況1,會傳回false;如果是情況2,則傳回true。

1.7 unlock()實作分析

說完了lock,下面分析unlock的實作。unlock不區分公平還是非公平。

JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷
JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷

上圖中,目前線程要釋放鎖,先調用tryRelease(arg)方法,如果傳回true,則取出head,讓head擷取鎖。

對于tryRelease方法:

JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷

首先計算目前線程釋放鎖後的state值。

如果目前線程不是排他線程,則抛異常,因為隻有擷取鎖的線程才可以進行釋放鎖的操作。

此時設定state,沒有使用CAS,因為是單線程操作。

再看unparkSuccessor方法:

JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷
JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷

release()裡面做了兩件事:tryRelease(…)方法釋放鎖;unparkSuccessor(…)方法喚醒隊列中的後繼者。

1.8 lockInterruptibly()實作分析

上面的 lock 不能被中斷,這裡的 lockInterruptibly()可以被中斷:

JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷
JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷

這裡的 acquireInterruptibly(…)也是 AQS 的模闆方法,裡面的 tryAcquire(…)分别被 FairSync和NonfairSync實作。

主要看doAcquireInterruptibly(…)方法:

JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷

當parkAndCheckInterrupt()傳回true的時候,說明有其他線程發送中斷信号,直接抛出InterruptedException,跳出for循環,整個方法傳回。

1.9 tryLock()實作分析

JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷

tryLock()實作基于調用非公平鎖的tryAcquire(…),對state進行CAS操作,如果操作成功就拿到鎖;如果操作不成功則直接傳回false,也不阻塞。

二、公平鎖、非公平鎖、響應中斷

  • ReentrantLock主要利用CAS+AQS隊列來實作。它支援公平鎖和非公平鎖,兩者的實作類似。
  • ReentrantLock是獨占鎖,加鎖和解鎖的過程需要手動進行,不易操作,但非常靈活。
  • ReentrantLock可重入,加鎖和解鎖需要手動進行,且次數需一樣,否則其他線程無法獲得鎖。
  • ReentrantLock可以相應中斷。
  • ReentrantLock還可以實作公平鎖機制。在鎖上等待時間最長的線程将獲得鎖的使用權。通俗的了解就是誰排隊時間最長誰先執行擷取鎖。

2.1 鎖的基本使用

分别lock和unlock

public class BaseDemo {

    private static final Lock lock = new ReentrantLock();

    public static void main(String[] args) {
        new Thread(()->run(),"線程1").start();
        new Thread(()->run(),"線程2").start();
    }

    public static void run(){
        try {
            lock.lock();
            System.out.println(Thread.currentThread().getName()+"擷取鎖");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println(Thread.currentThread().getName()+"鎖釋放");
            lock.unlock();
        }
    }
}
           
JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷

2.2 公平鎖使用

public class FairSyncDemo {

    /**
     * 參數傳遞true,表示使用公平鎖
     */
    private static final Lock lock = new ReentrantLock(true);

    public static void main(String[] args) {
        new Thread(() -> run(), "線程1").start();
        new Thread(() -> run(), "線程2").start();
        new Thread(() -> run(), "線程3").start();
        new Thread(() -> run(), "線程4").start();
    }


    public static void run() {
        for (int i=0 ; i<2 ; i++) {
            try {
                lock.lock();
                System.out.println(Thread.currentThread().getName() + "擷取鎖");
                TimeUnit.SECONDS.sleep(3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //System.out.println(Thread.currentThread().getName() + "鎖釋放");
                lock.unlock();
            }
            System.out.println("-----------------");
        }
    }
}
           
JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷

2.3 非公平鎖使用

public class NoFairSyncDemo {

    /**
     * 參數傳遞false,表示使用公平鎖,預設值就是false
     */
    private static final Lock lock = new ReentrantLock(false);

    public static void main(String[] args) {
        new Thread(() -> run(), "線程1").start();
        new Thread(() -> run(), "線程2").start();
        new Thread(() -> run(), "線程3").start();
        new Thread(() -> run(), "線程4").start();
    }

    public static void run() {
        for (int i=0 ; i<2 ; i++) {
            try {
                lock.lock();
                System.out.println(Thread.currentThread().getName() + "擷取鎖");
                TimeUnit.SECONDS.sleep(2);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //System.out.println(Thread.currentThread().getName() + "鎖釋放");
                lock.unlock();
            }
            System.out.println("-----------------");
        }
    }

}
           
JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷

2.4 響應中斷

/**
 * 響應中斷就是一個線程擷取不到鎖,不會傻傻的一直等下去,
 * ReentrantLock會給予一個中斷回應
 */
public class InterruptDemo {

    static Lock lock1 = new ReentrantLock();
    static Lock lock2 = new ReentrantLock();

    public static void main(String[] args) {
        Thread thread1 = new Thread(new MyThread(lock1,lock2));
        Thread thread2 = new Thread(new MyThread(lock1,lock2));
        thread1.start();
        thread2.start();
        thread1.interrupt();//第一個線程中斷
    }

    static class MyThread implements Runnable{
        Lock innerLock1;
        Lock innerLock2;

        public MyThread(Lock innerLock1, Lock innerLock2) {
            this.innerLock1 = innerLock1;
            this.innerLock2 = innerLock2;
        }

        @Override
        public void run() {

            try {
                innerLock1.lockInterruptibly();
                TimeUnit.MICROSECONDS.sleep(55);
                innerLock2.lockInterruptibly();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                innerLock1.unlock();
                innerLock2.unlock();
                System.out.println(Thread.currentThread().getName()+"線程結束!");
            }
        }

    }

}
           

定義了兩個鎖lock1和lock2

然後使用兩個線程thread1和thread2構造死鎖場景

正常情況下,這兩個線程互相等待擷取資源而處于死循環狀态。但是我們此時innerLock1中斷,另外一個線程就可以擷取資源,正常地執行了

JUC:ReentrantLock互斥鎖關鍵詞一、概述二、公平鎖、非公平鎖、響應中斷

2.5 限時等待

通過tryLock方法來實作,可以選擇傳入時間參數,表示等待指定的時間,無參則表示立即傳回鎖申請的結果:true表示擷取鎖成功,false表示擷取鎖失敗。我們可以将這種方法用來解決死鎖問題。

public class InterruptWaitTimeDemo {

    static Lock lock1 = new ReentrantLock();
    static Lock lock2 = new ReentrantLock();

    public static void main(String[] args) {
        Thread thread1 = new Thread(new InterruptDemo.MyThread(lock1,lock2));
        Thread thread2 = new Thread(new InterruptDemo.MyThread(lock1,lock2));
        thread1.start();
        thread2.start();
        thread1.interrupt();//第一個線程中斷
    }

    static class MyThread implements Runnable{
        Lock innerLock1;
        Lock innerLock2;

        public MyThread(Lock innerLock1, Lock innerLock2) {
            this.innerLock1 = innerLock1;
            this.innerLock2 = innerLock2;
        }

        @Override
        public void run() {

            try {
                if(!innerLock1.tryLock()){
                    TimeUnit.MILLISECONDS.sleep(150);
                }
                if(!innerLock2.tryLock()){
                    TimeUnit.MILLISECONDS.sleep(150);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                innerLock1.unlock();
                innerLock2.unlock();
                System.out.println(Thread.currentThread().getName()+"線程結束!");
            }
        }

    }

}
           

繼續閱讀