天天看點

J.U.C的核心基礎内容之AQS源碼詳解,建議收藏哦!!!場景模拟需求分析什麼是重入鎖?AQS類圖源碼分析

AQS源碼詳解

  好了,我們來開始今天的内容,首先我們來看下

AQS

是什麼,全稱是

AbstractQueuedSynchronizer

翻譯過來就是【抽象隊列同步】對吧。通過名字我們也能看出這是個抽象類

J.U.C的核心基礎内容之AQS源碼詳解,建議收藏哦!!!場景模拟需求分析什麼是重入鎖?AQS類圖源碼分析

而且裡面定義了很多的方法

J.U.C的核心基礎内容之AQS源碼詳解,建議收藏哦!!!場景模拟需求分析什麼是重入鎖?AQS類圖源碼分析

  裡面這麼多方法,咱們當然不是一個個去翻。裡面還有很多的抽象方法,咱們還得找它的實作多麻煩對不對。是以我們換個方式來探索。

  我們先來看下這樣一個場景

J.U.C的核心基礎内容之AQS源碼詳解,建議收藏哦!!!場景模拟需求分析什麼是重入鎖?AQS類圖源碼分析

  在這裡我們有一個能被多個線程共享操作的資源,在這個場景中應該能看出我們的資料是不安全的,因為我們并不能保證我們的操作是原子操作對吧。基于這個場景我們通過代碼來看看效果

package com.example.demo;

public class AtomicDemo {

    // 共享變量
    private static int count = 0;

    // 操作共享變量的方法
    public static void incr(){
        // 為了示範效果  休眠一下子
        try {
            Thread.sleep(1);
            count ++;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 1000 ; i++) {
            new Thread(()->AtomicDemo.incr()).start();
        }

        Thread.sleep(4000);
        System.out.println("result:" + count);
    }

}
      

  通過執行發現,執行的結果是一個不确定的值,但總是會小于等于1000,至于原因,是因為incr() 方法不是一個原子操作。為什麼不是原子操作這個咱們今天就不深究此處了.

迎合今天的主題,我們通過Lock來解決

package com.example.demo;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class AtomicDemo {

    // 共享變量
    private static int count = 0;

    private static Lock lock = new ReentrantLock();

    // 操作共享變量的方法
    public static void incr(){
        // 為了示範效果  休眠一下子
        try {
            lock.lock();
            Thread.sleep(1);
            count ++;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 1000 ; i++) {
            new Thread(()->AtomicDemo.incr()).start();
        }

        Thread.sleep(4000);
        System.out.println("result:" + count);
    }

}      

  然後我們運作發現結果都是 1000了,這也就是1000個線程都去操作這個 count 變量,結果符合我們的預期了。那lock到底是怎麼實作的呢?

  我們先來分析分析

J.U.C的核心基礎内容之AQS源碼詳解,建議收藏哦!!!場景模拟需求分析什麼是重入鎖?AQS類圖源碼分析

這樣的圖檔看着比較複雜,咱們簡化下。

J.U.C的核心基礎内容之AQS源碼詳解,建議收藏哦!!!場景模拟需求分析什麼是重入鎖?AQS類圖源碼分析

  我們自己假設下,如果要你去設計這樣的方法,你應該要怎麼設計,他們需要實作哪些功能,

  首先是lock方法,它是不是要滿足這幾個功能。

J.U.C的核心基礎内容之AQS源碼詳解,建議收藏哦!!!場景模拟需求分析什麼是重入鎖?AQS類圖源碼分析

需求清楚了,那我們怎麼設計呢?

第一個互斥怎麼做,也就是多個線程隻有一個線程能搶占到資源,這個時候我們可以這樣設定

// 給一個共享資源
Int state = 0 ; // 0表示資源沒有被占用,可以搶占
if(state == 0 ){
   // 表示可以擷取鎖
}else{
   // 表示鎖被搶占 需要阻塞等待
}      
J.U.C的核心基礎内容之AQS源碼詳解,建議收藏哦!!!場景模拟需求分析什麼是重入鎖?AQS類圖源碼分析

然後就是沒有搶占到鎖的線程的存儲,我們可以通過一個隊列,利用FIFO來實作存儲。

最後就是線程的阻塞和喚醒。大家說說有哪些阻塞線程的方式呀?

  1. wait/notify: 不合适,不能喚醒指定的線程
  2. Sleep:休眠,類似于定時器
  3. Condition:可以喚醒特定線程
  4. LockSupport:

    LockSupport.park():阻塞目前線程

    LockSupport.unpark(Thread t):喚醒特定線程

結合今天的主題,我們選擇LockSupport來實作阻塞和喚醒。

J.U.C的核心基礎内容之AQS源碼詳解,建議收藏哦!!!場景模拟需求分析什麼是重入鎖?AQS類圖源碼分析

  好了,到這兒我們已經猜想到了Lock中的實作邏輯,但是在探究源碼之前我們還有個概念需要先和大家講下,因為這個是我們源碼中會接觸到的一個,先講了,看的時候就比較輕松了對吧。

  我們先來看看重入鎖的場景代碼

package com.example.demo;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class AtomicDemo {

    // 共享變量
    private static int count = 0;

    private static Lock lock = new ReentrantLock();

    // 操作共享變量的方法
    public static void incr(){
        // 為了示範效果  休眠一下子
        try {
            lock.lock();
            Thread.sleep(1);
            count ++;
            // 調用了另外一個方法。
            decr();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public static void decr(){
        try {
            // 重入鎖
            lock.lock();
            count--;
        }catch(Exception e){

        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 1000 ; i++) {
            new Thread(()->AtomicDemo.incr()).start();
        }

        Thread.sleep(4000);
        System.out.println("result:" + count);
    }

}      

  首先大家考慮這段代碼會死鎖嗎? 大家給我個回複,我看看大家的了解的怎麼樣

好了,有說會死鎖的,有說不會,其實這兒是不會死鎖的,而且結果就是0.為什麼呢?

  這個其實是鎖的一個嵌套,因為這兩把鎖都是同一個 線程對象,我們講共享變量的設計是

  當state=0;線程可以搶占到資源 state =1; 如果進去嵌套通路 共享資源,這時 state = 2 如果有多個嵌套 state會一直累加,釋放資源的時候, state–,直到所有重入的鎖都釋放掉 state=0,那麼其他線程才能繼續搶占資源,說白了重入鎖的設計目的就是為了防止

死鎖

J.U.C的核心基礎内容之AQS源碼詳解,建議收藏哦!!!場景模拟需求分析什麼是重入鎖?AQS類圖源碼分析

  通過類圖我們可以發現右車的業務應用其實内在都有相識的設計,這裡我們隻需要搞清楚其中的一個,其他的你自己應該就可以看懂~,好了我們就具體結合前面的案例代碼,以ReentrantLock為例來介紹AQS的代碼實作。

  在看源碼之前先回顧下這個圖,帶着問題去看,會更輕松

J.U.C的核心基礎内容之AQS源碼詳解,建議收藏哦!!!場景模拟需求分析什麼是重入鎖?AQS類圖源碼分析

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}      

這個方法邏輯比較簡單,if條件成立說明 搶占鎖成功并設定 目前線程為獨占鎖

else 表示搶占失敗,acquire(1) 方法我們後面具體介紹

compareAndSetState(0, 1):用到了CAS 是一個原子操作方法,底層是UnSafe.作用就是設定 共享操作的 state 由0到1. 如果state的值是0就修改為1

setExclusiveOwnerThread:代碼很簡單,進去看一眼即可

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
      
  1. tryAcquire()嘗試直接去擷取資源,如果成功則直接傳回(這裡展現了非公平鎖,每個線程擷取鎖時會嘗試直接搶占加塞一次,而CLH隊列中可能還有别的線程在等待);
  2. addWaiter()将該線程加入等待隊列的尾部,并标記為獨占模式;
  3. acquireQueued()使線程阻塞在等待隊列中擷取資源,一直擷取到資源後才傳回。如果在整個等待過程中被中斷過,則傳回true,否則傳回false。如果線程在等待過程中被中斷過,它是不響應的。隻是擷取資源後才再進行自我中斷selfInterrupt(),将中斷補上。

  當然這裡代碼的作用我是提前研究過的,對于大家肯定不是很清楚,我們繼續裡面去看,最後大家可以回到這兒再論證。

  再次嘗試搶占鎖

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}      
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
//再次嘗試搶占鎖
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
// 重入鎖的情況
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
// false 表示搶占失敗
    return false;
}      

  将阻塞的線程添加到雙向連結清單的結尾

private Node addWaiter(Node mode) {
    //以給定模式構造結點。mode有兩種:EXCLUSIVE(獨占)和SHARED(共享)
    Node node = new Node(Thread.currentThread(), mode);

    //嘗試快速方式直接放到隊尾。
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }

    //上一步失敗則通過enq入隊。
    enq(node);
    return node;
}      

private Node enq(final Node node) {
    //CAS"自旋",直到成功加入隊尾
    for (;;) {
        Node t = tail;
        if (t == null) { // 隊列為空,建立一個空的标志結點作為head結點,并将tail也指向它。
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {//正常流程,放入隊尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}      

第一個if語句

J.U.C的核心基礎内容之AQS源碼詳解,建議收藏哦!!!場景模拟需求分析什麼是重入鎖?AQS類圖源碼分析

else語句

J.U.C的核心基礎内容之AQS源碼詳解,建議收藏哦!!!場景模拟需求分析什麼是重入鎖?AQS類圖源碼分析

線程3進來會執行如下代碼

J.U.C的核心基礎内容之AQS源碼詳解,建議收藏哦!!!場景模拟需求分析什麼是重入鎖?AQS類圖源碼分析

那麼效果圖

J.U.C的核心基礎内容之AQS源碼詳解,建議收藏哦!!!場景模拟需求分析什麼是重入鎖?AQS類圖源碼分析

  OK,通過tryAcquire()和addWaiter(),該線程擷取資源失敗,已經被放入等待隊列尾部了。聰明的你立刻應該能想到該線程下一部該幹什麼了吧:進入等待狀态休息,直到其他線程徹底釋放資源後喚醒自己,自己再拿到資源,然後就可以去幹自己想幹的事了。沒錯,就是這樣!是不是跟醫院排隊拿号有點相似~~acquireQueued()就是幹這件事:在等待隊列中排隊拿号(中間沒其它事幹可以休息),直到拿到号後再傳回。這個函數非常關鍵,還是上源碼吧:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;//标記是否成功拿到資源
    try {
        boolean interrupted = false;//标記等待過程中是否被中斷過

        //又是一個“自旋”!
        for (;;) {
            final Node p = node.predecessor();//拿到前驅
            //如果前驅是head,即該結點已成老二,那麼便有資格去嘗試擷取資源(可能是老大釋放完資源喚醒自己的,當然也可能被interrupt了)。
            if (p == head && tryAcquire(arg)) {
                setHead(node);//拿到資源後,将head指向該結點。是以head所指的标杆結點,就是目前擷取到資源的那個結點或null。
                p.next = null; // setHead中node.prev已置為null,此處再将head.next置為null,就是為了友善GC回收以前的head結點。也就意味着之前拿完資源的結點出隊了!
                failed = false; // 成功擷取資源
                return interrupted;//傳回等待過程中是否被中斷過
            }

            //如果自己可以休息了,就通過park()進入waiting狀态,直到被unpark()。如果不可中斷的情況下被中斷了,那麼會從park()中醒過來,發現拿不到資源,進而繼續進入park()等待。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;//如果等待過程中被中斷過,哪怕隻有那麼一次,就将interrupted标記為true
        }
    } finally {
        if (failed) // 如果等待過程中沒有成功擷取資源(如timeout,或者可中斷的情況下被中斷了),那麼取消結點在隊列中的等待。
            cancelAcquire(node);
    }
}      

  到這裡了,我們先不急着總結acquireQueued()的函數流程,先看看shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()具體幹些什麼。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;//拿到前驅的狀态
    if (ws == Node.SIGNAL)
        //如果已經告訴前驅拿完号後通知自己一下,那就可以安心休息了
        return true;
    if (ws > 0) {
        /*
         * 如果前驅放棄了,那就一直往前找,直到找到最近一個正常等待的狀态,并排在它的後邊。
         * 注意:那些放棄的結點,由于被自己“加塞”到它們前邊,它們相當于形成一個無引用鍊,稍後就會被保安大叔趕走了(GC回收)!
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
         //如果前驅正常,那就把前驅的狀态設定成SIGNAL,告訴它拿完号後通知自己一下。有可能失敗,人家說不定剛剛釋放完呢!
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}      

  整個流程中,如果前驅結點的狀态不是SIGNAL,那麼自己就不能安心去休息,需要去找個安心的休息點,同時可以再嘗試下看有沒有機會輪到自己拿号。

  如果線程找好安全休息點後,那就可以安心去休息了。此方法就是讓線程去休息,真正進入等待狀态。

private final boolean parkAndCheckInterrupt() {
     LockSupport.park(this);//調用park()使線程進入waiting狀态
     return Thread.interrupted();//如果被喚醒,檢視自己是不是被中斷的。
 }      

好了,我們可以小結下了。

看了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt(),現在讓我們再回到acquireQueued(),總結下該函數的具體流程:

  1. 結點進入隊尾後,檢查狀态,找到安全休息點;
  2. 調用park()進入waiting狀态,等待unpark()或interrupt()喚醒自己;
  3. 被喚醒後,看自己是不是有資格能拿到号。如果拿到,head指向目前結點,并傳回從入隊到拿到号的整個過程中是否被中斷過;如果沒拿到,繼續流程1。

最後我們再回到前面的acquire方法來總結下

public final void acquire(int arg) {
     if (!tryAcquire(arg) &&
         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
         selfInterrupt();
 }      

總結下它的流程吧

  1. 調用自定義同步器的tryAcquire()嘗試直接去擷取資源,如果成功則直接傳回;
  2. 沒成功,則addWaiter()将該線程加入等待隊列的尾部,并标記為獨占模式;
  3. acquireQueued()使線程在等待隊列中休息,有機會時(輪到自己,會被unpark())會去嘗試擷取資源。擷取到資源後才傳回。如果在整個等待過程中被中斷過,則傳回true,否則傳回false。
  4. 如果線程在等待過程中被中斷過,它是不響應的。隻是擷取資源後才再進行自我中斷selfInterrupt(),将中斷補上。
J.U.C的核心基礎内容之AQS源碼詳解,建議收藏哦!!!場景模拟需求分析什麼是重入鎖?AQS類圖源碼分析

  好了,lock方法看完後,我們再來看下unlock方法

  它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列裡的其他線程來擷取資源。這也正是unlock()的語義,當然不僅僅隻限于unlock()

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;//找到頭結點
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//喚醒等待隊列裡的下一個線程
        return true;
    }
    return false;
}      

  此方法嘗試去釋放指定量的資源。下面是tryRelease()的源碼:

public final boolean release(int arg) {
        if (tryRelease(arg)) {//這裡是先嘗試釋放一下資源,一般都可以釋放成功,除了多次重入但隻釋放一次的情況。
            Node h = head;
            //這裡判斷的是 阻塞隊列是否還存在和head節點是否是tail節點,因為之前說過,隊列的尾節點的waitStatus是為0的
            if (h != null && h.waitStatus != 0)
                //到這裡就說明head節點已經釋放成功啦,就先去叫醒後面的直接節點去搶資源吧
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
      
private void unparkSuccessor(Node node) {
    //這裡,node一般為目前線程所在的結點。
    int ws = node.waitStatus;
    if (ws < 0)//置零目前線程所在的結點狀态,允許失敗。
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;//找到下一個需要喚醒的結點s
    if (s == null || s.waitStatus > 0) {//如果為空或已取消
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev) // 從後向前找。
            if (t.waitStatus <= 0)//從這裡可以看出,<=0的結點,都是還有效的結點。
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//喚醒
}      

  這個函數并不複雜。一句話概括:用unpark()喚醒等待隊列中最前邊的那個未放棄線程,這裡我們也用s來表示吧。此時,再和acquireQueued()聯系起來,s被喚醒後,進入if (p == head && tryAcquire(arg))的判斷(即使p!=head也沒關系,它會再進入shouldParkAfterFailedAcquire()尋找一個安全點。這裡既然s已經是等待隊列中最前邊的那個未放棄線程了,那麼通過shouldParkAfterFailedAcquire()的調整,s也必然會跑到head的next結點,下一次自旋p==head就成立啦),然後s把自己設定成head标杆結點,表示自己已經擷取到資源了,acquire()也傳回了

  好了,到這我們就因為把源碼看完了,再回頭來看下這張圖

J.U.C的核心基礎内容之AQS源碼詳解,建議收藏哦!!!場景模拟需求分析什麼是重入鎖?AQS類圖源碼分析

  是不是就清楚了AQS到底是怎麼實作的我們上面的猜想的了吧。那麼對應的下課後讓你自己去看

J.U.C的核心基礎内容之AQS源碼詳解,建議收藏哦!!!場景模拟需求分析什麼是重入鎖?AQS類圖源碼分析

  這幾個的源碼,你是不是就應該能看懂了,好了本文就介紹到此,本文對你有幫助的歡迎關注點贊,謝謝【歡迎加群交流哦:463257262】

繼續閱讀