天天看點

并發程式設計系列——4Reentrantlock核心原理分析

CSDN位址:https://blog.csdn.net/Eclipse_2019/article/details/127230357

學習目标

  1. 鎖的類别有哪些
  2. reentrantlock與synchronized的差別
  3. 設計一把鎖要考慮啥
  4. 如何使用reentrantlock
  5. reentrantlock的lock流程
  6. reentrantlock的unlock流程
  7. AQS的了解

第1章 鎖分類

1、樂觀鎖和悲觀鎖

  • 樂觀鎖:采用樂觀的思想處理資料,在每次讀取資料時都認為别人不會修改該資料,是以不會上鎖,但在更新時會判斷在此期間别人有沒有更新該資料,通常采用在寫時先讀出目前版本号然後加鎖的方法。具體過程為:比較目前版本号與上一次的版本号,如果版本号一緻,則更新,如果版本号不一緻,則重複進行讀、比較、寫操作。(CAS)
  • 悲觀鎖:采用悲觀思想處理資料,在每次讀取資料時都認為别人會修改資料,是以每次在讀寫資料時都會上鎖,這樣别人想讀寫這個資料時就會阻塞、等待直到拿到鎖。(Synchronized)

2、公平鎖和非公平鎖

  • 公平鎖:指在配置設定鎖前檢查是否有線程在排隊等待擷取該鎖,優先将鎖配置設定給排隊時間最長的線程。
  • 非公平鎖:指在配置設定鎖時不考慮線程排隊等待的情況,直接嘗試擷取鎖,在擷取不到鎖時再排到隊尾等待。

3、共享鎖和獨占鎖

  • 獨占鎖:也叫互斥鎖,每次隻允許一個線程持有該鎖,ReentrantLock為獨占鎖的實作。
  • 共享鎖:允許多個線程同時擷取該鎖,并發通路共享資源。ReentrantReadWriteLock中的讀鎖為共享鎖的實作。

4、重量級鎖和輕量級鎖

5、可重入鎖和不可重入鎖

  • 可重入鎖:也叫作遞歸鎖,指在同一線程中,在外層函數擷取到該鎖之後,内層的遞歸函數仍然可以繼續擷取該鎖。在Java環境下,ReentrantLock和synchronized都是可重入鎖。
  • 不可重入鎖:每次都要重新搶占鎖資源。StampedLock是jdk1.8新增的重入鎖

6、自旋鎖:說白了就是一個循環

7、分段鎖:說白了就是對每個區間進行上鎖

8、如何進行鎖優化

  • 減少鎖持有的時間 減少鎖持有的時間指隻在有線程安全要求的程式上加鎖來盡量減少同步代碼塊對鎖的持有時間。
  • 減小鎖粒度 減小鎖粒度指将單個耗時較多的鎖操作拆分為多個耗時較少的鎖操作來增加鎖的并行度,減少同一個鎖上的競争。在減少鎖的競争後,偏向鎖、輕量級鎖的使用率才會提高。減小鎖粒度最典型的案例就是ConcurrentHashMap中的分段鎖。
  • 鎖分離 鎖分離指根據不同的應用場景将鎖的功能進行分離,以應對不同的變化,最常見的鎖分離思想就是讀寫鎖(ReadWriteLock),它根據鎖的功能将鎖分離成讀鎖和寫鎖,這樣讀讀不互斥,讀寫互斥,寫寫互斥,既保證了線程的安全性,又提高了性能。 操作分離思想可以進一步延伸為隻要操作互不影響,就可以進一步拆分,比如LinkedBlockingQueue從頭部取出資料,并從尾部加入資料。
  • 鎖粗化 鎖粗化指為了保障性能,會要求盡可能将鎖的操作細化以減少線程持有鎖的時間,但是如果鎖分得太細,将會導緻系統頻繁擷取鎖和釋放鎖,反而影響性能的提升。在這種情況下,建議将關聯性強的鎖操作集中起來處理,以提高系統整體的效率。
  • 鎖消除 在開發中經常會出現在不需要使用鎖的情況下誤用了鎖操作而引起性能下降,這多數是因為程式編碼不規範引起的。這時,我們需要檢查并消除這些不必要的鎖來提高系統的性能。

第2章 reentrantlock與synchronized的差別

1、底層實作層面

synchronized 是JVM層面的鎖,是Java關鍵字

reentrantlock是JUC下面的一個類,是java實作的

2、是否可手動釋放

synchronized不需要手動釋放,reentrantlock需要手動釋放

3、是否可以中斷

synchronized不可中斷,reentrantlock可以

4、效率層面

reentrantlock效率比synchronized要高

5、鎖的層面

reentrantlock的對象就是鎖本身,synchronized鎖的是别的對象;

reentrantlock可以

6、公平和非公平層面

reentrantlock可以是公平也可以是非公平,通過構造函數指定,synchronized是非公平

第3章 如何設計一把鎖

先來看看我這代碼裡面實作了一把鎖

public class RepeatLock {
    boolean isLocked = false;
    Thread lockedBy = null;
    int lockedCount = 0;
    public synchronized void lock()
            throws InterruptedException{
        Thread thread = Thread.currentThread();
        while(isLocked && lockedBy != thread){
            wait();
        }
        isLocked = true;
        lockedCount++;
        lockedBy = thread;
    }
    public synchronized void unlock(){
        if(Thread.currentThread() == this.lockedBy){
            lockedCount--;
            if(lockedCount == 0){
                isLocked = false;
                notify();
            }
        }
    }
}
 
public class Count {
    // 不可重入鎖
    UnRepeatLock lock = new UnRepeatLock();
    public void print() {
        System.out.println("擷取lock");
        lock.lock();
        System.out.println("調用doAdd之前");
        doAdd();
        System.out.println("調用doAdd之後");
        lock.unlock();
        System.out.println("釋放lock");
    }
    public void doAdd() {
        System.out.println("再次擷取lock");
        lock.lock();
        //do something
        lock.unlock();
        System.out.println("再次釋放lock");
    }
    public static void main(String[] args) {
        Count cc = new Count();
        new Thread(()->{cc.print();}).start();
    }
}           

實作鎖的互斥特性

  • 鎖的互斥特性->要有共享資源->标記(0無鎖,1有鎖)
  • 沒有搶占到鎖的線程?(等待->喚醒)
  • 等待的線程怎麼存儲?->資料結構去存儲一系列等待中的線程,FIFO
  • 公平和非公平(能否插隊)
  • 重入的特性(識别是否是同一個線程)

第4章 源碼解讀

本文隻介紹非公平鎖的實作
public class AtomicDemo {
    private static int count = 0;
    static Lock lock = new ReentrantLock();
    public static void inc() {
        lock.lock();
        try {
            Thread.sleep(1);
            count++;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
 
    public static void dec() {
        lock.lock();
        try {
            Thread.sleep(1);
            count--;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
 
    public static void main(String[] args) throws InterruptedException {
        for(int i=0;i<1000;i++){
            new Thread(()->{AtomicDemo.inc();}).start();
        }
        Thread.sleep(4000);
        System.out.println(count);
    }
}           
并發程式設計系列——4Reentrantlock核心原理分析

4.1 lock

final void lock() {
    //搶占互斥資源
    //if(state==0){
        //然後再去修改狀态
    //}
    //上面這種方式肯定不行,因為可能同時有多個線程進入到if語句,這就意味着多個線程搶占到同一把鎖
    //不管目前隊列是否有人排隊,所有鎖都可以去搶
    if (compareAndSetState(0, 1))//通過CAS去搶(樂觀鎖)
        //儲存目前線程
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}           

1、線程A嘗試去拿到鎖,這個時候state=0,能夠拿到鎖,把state改為1,調用setExclusiveOwnerThread方法,指派exclusiveOwnerThread=線程A

2、線程B進來,嘗試去拿鎖,拿不到鎖,進入acquire(1)方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}           

主要有3個方法,

  • tryAcquire
  • addWaiter
  • acquireQueued

我們先看第一個tryAcquire(arg),最終進入非公平鎖實作類

protected final boolean tryAcquire(int acquires) {
      return nonfairTryAcquire(acquires);
  }           
final boolean nonfairTryAcquire(int acquires) {
    //得到目前線程
    final Thread current = Thread.currentThread();
    int c = getState();
    //如果是0,說明線程A已經釋放鎖,現在我們假如的是A還沒有釋放,是以不為0,如果已經執行完,,按照剛才的邏輯把state改為1,調用setExclusiveOwnerThread		方法指派exclusiveOwnerThread=線程B
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果目前線程是已經儲存的exclusiveOwnerThread,表明同一個線程重入,隻需要加重入次數
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);//這裡不需要CAS是因為目前線程已經獲得鎖了
        //為什麼要統計累加次數,是為了比對lock和unlock成對出現
        return true;
    }
    return false;
}           

按照我們的假設,線程B都不滿足,nonfairTryAcquire傳回false,繼續執行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,我們先看addWaiter

//mode傳入參數為null
private Node addWaiter(Node mode) {
    //node的構造函數,指派node的nextWaiter與thread,nextWaiter=null thread=ThreadB  this.nextWaiter = mode;this.thread = thread;
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    //tail預設為null,連結清單的尾結點
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}           

pred=tail為null ,進入enq方法

private Node enq(final Node node) {
    //跟while(1)的差別,while從反彙編來看,代碼更長,時間更久,沒啥差別
    for (;;) {
        //tail為null
        Node t = tail;  
        if (t == null) { // Must initialize
            //嘗試設定head=new node
            if (compareAndSetHead(new Node()))
                //将tail設定為head, tail  head都為new    Node()
                //再進入循環,t=new Node().進入else
                tail = head;
        } else {
            //t=new node  ,node為剛才ThreadB的node
            node.prev = t;
            //嘗試把ThreadB的node指派給tail
            if (compareAndSetTail(t, node)) {
                //指派成功,headnode的後節點指向B的node,tail=B線程節點
                t.next = node;
                //傳回head節點,跳出方法
                return t;
            }
        }
    }
}           

到這,形成了一個連結清單node,并且傳回tail節點

并發程式設計系列——4Reentrantlock核心原理分析

繼續調用

//node為ThreadB的node,也就是tail節點,arg=1
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //拿到ThreadB Node的前指針節點
            final Node p = node.predecessor();
            //如果P==head 滿足,并且嘗試去拿鎖,A在占着,拿不到鎖,走下一個if
            if (p == head && tryAcquire(arg)) {
                //将Node設定為head節點,并且前後指針空指向,變成下圖
                setHead(node);
                //把之前的head的對象的後節點執行為空,讓它進行GC回收
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //A沒有釋放,進入該邏輯,shouldParkAfterFailedAcquire為false,不執行,進入下次循環
            //B還是拿不到鎖,進入shouldParkAfterFailedAcquire
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}           

進入shouldParkAfterFailedAcquire方法,第一次進入把head節點的waitStatus改成SIGNAL,第二次進入傳回true

//pred為ThreadB的Node的前節點,即head節點
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // pred.waitStatus預設為0,進入else流程
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        //把head的waitStatus從0改成SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}           

第二次進入shouldParkAfterFailedAcquire後傳回true,調用parkAndCheckInterrupt方法

private final boolean parkAndCheckInterrupt() {    
    LockSupport.park(this); //挂機目前線程    
    return Thread.interrupted();
}           

同理,假如A沒執行完,B挂起,再來一個線程C,最後會得到B、C都挂起

并發程式設計系列——4Reentrantlock核心原理分析

4.2 unlock

//參數為1
public final boolean release(int arg) {    	
    //釋放鎖成功        
    if (tryRelease(arg)) {            
        Node h = head;            
        if (h != null && h.waitStatus != 0)                
            unparkSuccessor(h);            
        return true;        
    }        
    return false;    
}           

//嘗試去釋放鎖

//releases傳入為1
protected final boolean tryRelease(int releases) {    
    //這一步state為1,因為之前A占着,得到c=0    
    int c = getState() - releases;    
    //判斷是不是目前鎖是A占着    
    if (Thread.currentThread() != getExclusiveOwnerThread())        
        throw new IllegalMonitorStateException();    
    boolean free = false;    
    if (c == 0) {        
        free = true;		
        //釋放鎖,将exclusiveOwnerThread設定為null        
        setExclusiveOwnerThread(null);    
    }    
    //将state設定為0    
    setState(c);    
    return free;
}           

釋放鎖成功,之下如下邏輯

Node h = head;   
//拿到head的node節點//首節點不為空,并且waitStatus為SIGNAL -1
if (h != null && h.waitStatus != 0)    
    //進入該邏輯    
    unparkSuccessor(h);
return true;           
//為一個new的node首節點
private void unparkSuccessor(Node node) {    
    //ws為-1    
    int ws = node.waitStatus;    
    if (ws < 0)        
        //進入該邏輯,将head節點的waitStatus更改為0        
        compareAndSetWaitStatus(node, ws, 0);        
    
    //找到B線程的node節點    
    Node s = node.next;    
    //B線程的node節點不為空,waitStatus為-1不進入    
    if (s == null || s.waitStatus > 0) {        
        s = null;        
        //如果B節點失效,或者已經取消或者并發還沒有指派,從後往前找,找到第一個需要執行的node,t==null或者t==目前head節點,才跳出        
        //為什麼?enq方法裡面,if沒有鎖 線程A通過CAS進入if語句塊之後,發生上下文切換,此時線程B同樣執行了該方法,并且執行完畢。然後線程C調用了unparkSuccessor方法。        
        //假如是從頭到尾的周遊形式,線程A的next指針此時還是null!也就是說,會出現後續節點被漏掉的情況。        
        for (Node t = tail; t != null && t != node; t = t.prev)            
            if (t.waitStatus <= 0)                
                s = t;    
    }    
    //得到我需要要unpark的線程,這裡為ThreadB    
    if (s != null)        
        //解鎖threadB        
        LockSupport.unpark(s.thread);
}           

當A線程unlock釋放鎖後,線程B的線程被喚醒,從park的時候繼續開始執行,阻塞在acquireQueued方法

final boolean acquireQueued(final Node node, int arg) {    
    boolean failed = true;    
    try {        
        boolean interrupted = false;        
        for (;;) {            
            //threadA解鎖後,繼續執行,拿到head節點            
            final Node p = node.predecessor();            
            //p==head節點,并且threadB去嘗試加鎖,這個時候A已經釋放鎖,進入該邏輯            
            if (p == head && tryAcquire(arg)) {                
                setHead(node); 
                //将B節點設定為head節點,裡面邏輯同時把node的前指針以及thread設為null                
                p.next = null; // help GC  //将p的next執行去除                
                failed = false;                
                return interrupted;            
            }            
            if (shouldParkAfterFailedAcquire(p, node) &&               
                parkAndCheckInterrupt())   
                //當線程A解鎖後,ThreaB從這裡開始運作,繼續for循環                
                interrupted = true;        
        }    
    } finally {        
        if (failed)            
            cancelAcquire(node);    
    }
}           

繼續執行for循環,進入第一個if後,連結清單的節點變成如下

并發程式設計系列——4Reentrantlock核心原理分析

第5章 相關問題

5.1 為什麼是非公平鎖

因為我任意一個新的線程都會在之前加鎖的線程釋放鎖的時候搶占到鎖,即在unparkSuccessor方法中,A釋放鎖,但是還沒有調用LockSupport.unpark(s.thread)方法的時候,有新的線程去調用了lock,并且加鎖成功,是以,非公平就展現出來了,我後來的新線程可以在我之前的線程搶先獲得鎖

那麼,公平鎖又是怎麼實作的,我們知道預設是非公平鎖,那麼我們看下怎麼做到公平

5.2 公平鎖怎麼實作

公平鎖,拿到鎖後,直接調用

public final void acquire(int arg) {    
    if (!tryAcquire(arg) &&        
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        
        selfInterrupt();
}           

然後調用tryAcquire方法

protected final boolean tryAcquire(int acquires) {    
    //得到目前線程    
    final Thread current = Thread.currentThread();    
    int c = getState();    
    //能搶占到鎖,假如A釋放了,我們新進來一個線程去搶占鎖    
    if (c == 0) {        
        if (!hasQueuedPredecessors() &&            
            compareAndSetState(0, acquires)) {            
            setExclusiveOwnerThread(current);            
            return true;        
        }    
    }    
    else if (current == getExclusiveOwnerThread()) {        
        int nextc = c + acquires;        
        if (nextc < 0)            
            throw new Error("Maximum lock count exceeded");        
        setState(nextc);        
        return true;    
    }    
    return false;
}           

我們發現公平鎖與非公平鎖多了一個hasQueuedPredecessors方法判斷,隻有當hasQueuedPredecessors傳回false的時候,我們才會去嘗試加鎖

public final boolean hasQueuedPredecessors() {    
    // The correctness of this depends on head being initialized    
    // before tail and on head.next being accurate if the current    
    // thread is first in queue.    
    Node t = tail; //首先讀取tail,為了避免tail不為空,head為空的情況,初始化邏輯    
    //if (compareAndSetHead(new Node()))    //tail = head;    
    Node h = head;    
    Node s;    
    return h != t &&        
        ((s = h.next) == null || s.thread != Thread.currentThread());
}           

我們來分析下,

h != t 就是我的隊列裡面不止一個node節點,就是有node節點在等待,那麼傳回true,接下來去判斷&&後面的邏輯,如何後面的邏輯也為true,就不讓搶占,但是如果後面的為false,就能搶占

  1. 如果h==t,有2種情況,第一種2個都為null,說明連結清單還沒有初始化,沒有待執行線程,第二種都不為空,但是相等 說明連結清單已經隻有一個線程在執行并且執行完了沒有情況,新來的線程可以去拿到

((s = h.next) == null || s.thread != Thread.currentThread()); 這裡因為第一個條件為true了,是以這個條件必須為false才能夠去搶占鎖,那麼s = h.next) == null和s.thread != Thread.currentThread()必須都為false

  1. (s = h.next) == null表示有一個線程已經在搶占了,但是還沒有完全傳入連結表,不可搶占
  2. s.thread != Thread.currentThread() 下一個要執行的node 的thread不是目前的線程,不可搶占CLH(Craig, Landin, and Hagersten locks):

以上就能夠保證,我要搶占鎖,必須按照順序來,FIFO,也就保證了線程的公平性

5.3 狀态

SIGNAL -1 必須後續需要去unpark

CANCELLED 1 取消狀态

CONDITION -2 等待條件狀态,在等待隊列中

PROPAGATE -3 狀态需要向後傳播

5.4 interrupted的作用

除了unpark外,還有interrupt可以中斷,該作用就是當interrupt中斷時,我能捕捉到,傳遞我的線程是否有被interrupt中斷過,如果有中斷過,并且拿到鎖,執行selfInterrupt,告訴外面我這個線程被interrupt中斷過

5.5 park和unpark

1)不會抛中斷異常

2)使用更加靈活(相對于wait和notify,它們必須在加鎖的代碼塊内),可以在任何地方用

3)可以指定鎖和解鎖哪個線程

下文預告

  1. HashMap的底層資料結構
  2. HashMap、CHM、HashTable三者差別
  3. HashMap的工作流程

繼續閱讀