天天看點

AQS底層原理實作AQS底層原理

AQS底層原理

概念

AQS是一個同步隊列,全名為AbstractQueuedSynchronizer,它是一個同步工具,是Lock用來實作線程同步的核心元件。

AQS的功能分為兩種:獨占和共享

獨占鎖:每次隻能有一個線程擷取鎖,比如ReentrantLock是以獨占的方式實作互斥

共享鎖:允許多個線程同時擷取鎖,并發的通路共享資源,比如ReentrantReadWriteLock

内部實作

AQS隊列内部維護的是一個FIFO的雙向連結清單。每個Node由線程封裝,當線程争搶鎖失敗後會封裝成Node加入

AQS隊列中去。當擷取鎖的線程釋放鎖以後,會從隊列中喚醒一個阻塞的節點

Node的組成

static final class Node {
        //共享模式下的等待标記
        static final Node SHARED = new Node();
        //獨占模式下的等待标記
        static final Node EXCLUSIVE = null;
        //線程的等待狀态,表示線程已經被取消
        static final int CANCELLED =  1;
        //線程的等待狀态,表示後繼線程需要被喚醒
        static final int SIGNAL    = -1;
        //線程的等待狀态,表示線程在Condition上
        static final int CONDITION = -2;
        //表示下一個acquireShared需要無條件傳播
        static final int PROPAGATE = -3;
        
        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;

        //目前節點的線程,初始化後使用,在使用後失效
        volatile Thread thread;

        /**
         * 連結到等在等待條件上的下一個節點,或特殊的值SHARED,因為條件隊列隻有在獨占模式時才能被通路,
         * 是以我們隻需要一個簡單的連接配接隊列在等待的時候儲存節點,然後把它們轉移到隊列中重新擷取
         * 因為條件隻能是獨占性的,我們通過使用特殊的值來表示共享模式
         */
        Node nextWaiter;
        
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
        /**
         * 傳回目前節點的前驅節點,如果為空,直接抛出空指針異常
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // 用來建立初始化的head 或 SHARED的标記
        }
		 //将線程構造成一個節點,添加隊列
        Node(Thread thread, Node mode) {
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { //指定線程和節點狀态的構造方法 // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
           

釋放鎖以及添加線程對于隊列的變化

添加節點的場景

AQS底層原理實作AQS底層原理

添加節點會涉及到兩個變化

1:新的線程封裝為Node節點追加到同步隊列中,設定prev節點,以及修改目前節點的前置節點的next節點指向自己(連結清單插入操作)

2:通過CAS将tail重新指向新的尾部節點

head節點表示擷取鎖成功的節點,當頭結點在釋放同步狀态時,會喚醒後繼節點,如果後繼節點獲得鎖成功,會把自己設定為頭結點,節點變化過程如下。

這個過程也是涉及到兩個變化

1:修改head為下一個擷取鎖的節點

2:新的擷取鎖的節點,将pre的指針指向null

設定head節點不需要使用CAS,因為設定head節點是由擷取鎖的線程來完成的,而同步鎖隻能由一個線程獲得,是以不需要CAS保證

AQS底層原理實作AQS底層原理

ReentrantLock源碼分析

ReentrantLock的時序圖

調用ReentrantLock中的lock()方法,源碼調用過程如下

AQS底層原理實作AQS底層原理

ReentrantLock.lock()擷取鎖的入口

public void lock() {
        sync.lock();
    }
           

sync是一個抽象的靜态内部類,繼承了AQS來實作重入鎖的邏輯,AQS是一個同步隊列,能夠實作線程的阻塞以及喚醒,但是并不具備業務功能,是以在不同場景中,會繼承AQS來實作對應場景的功能,Sync有兩個具體的實作類,分别是:

NofairSync:表示可以存在搶占鎖的功能,允許搶占。

FairSync:表示所有線程嚴格按照FIFO來擷取鎖

NofairSync

以非公平鎖為例,看看lock的具體實作

1:CAS成功,就表示成功擷取到了鎖

2:CAS失敗,調用acquire(1)進行鎖競争

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
           

CAS的實作原理

//CAS改變狀态
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
           

通過cas樂觀鎖的方式進行比較并替換,這段代碼意思是,如果目前記憶體的state和預期值expect一樣,則替換為

update,更新成功傳回true,否則傳回false,這個操作是原子性的

state是AQS中的一個屬性,對于重入鎖的實作來說,表示有一個同步狀态,有兩個含義表示:

1:當state=0時,表示無鎖狀态

2:當state>0時,表示已經有線程擷取了鎖,也就是state=1,因為ReentrantLock允許重入,是以在同一線程多次擷取同步鎖時,state會遞增。

Unsafe 可認為是 Java 中留下的後門,提供了一些低層次操作,如直接記憶體通路、線程的挂起和恢複、CAS、線程同步、記憶體屏障

AQS.acquire

如果CAS操作未能成功,說明state已經不為0,此時繼續acquire(1)操作

1:通過tryAcquire 嘗試擷取獨占鎖,如果成功則傳回true,失敗傳回false

2:如果tryAcquire 失敗,則會通過addWaiter方法将目前線程封裝為Node添加到AQS隊列的尾部

3:acquireQueued,将Node作為參數,通過自旋去嘗試擷取鎖

if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    //中斷
    selfInterrupt();
           

NonfairSync.tryAcquire

嘗試擷取鎖,如果成功傳回 true,不成功傳回 false,它是重寫 AQS 類中的 tryAcquire 方法

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
           

ReentrantLock.nonfairTryAcquire

1:擷取目前線程,判斷鎖的狀态

2:如果state=0表示目前是無鎖狀态,通過cas更新state狀态的值

3:目前線程是可重入的,則增加重入次數

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //如果是0的話,則嘗試去原子擷取這個鎖
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //比較與目前線程是否是一個線程,如果是的話
    else if (current == getExclusiveOwnerThread()) {
        //增加狀态變量的值,是以是可重入操作
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
           

AQS.addWaiter

當tryAcquire方法擷取鎖失敗後,則會先調用addWaiter将目前線程封裝為Node。形參mode表示目前節點的狀

态,傳遞的參數是Node.EXCLUSIVE,表示獨占狀态。意味着重入鎖用到了AQS的獨占功能

1:将目前線程封裝為Node

2:目前連結清單中的tail節點是否為空,如果不為空,則通過CAS操作把目前線程的node添加到AQS隊列

3:如果為空或CAS失敗,調用enq将節點添加到AQS隊列

private Node addWaiter(Node mode) {
    //将目前線程封裝為Node
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    //pred不為空的情況下,說明隊列中存在節點
    if (pred != null) {
        node.prev = pred;//将目前線程的Node的prev指向tail
        //通過 cas 把 node加入到 AQS 隊列,也就是設定為 tail
        if (compareAndSetTail(pred, node)) {
            //設定成功以後,把原 tail 節點的 next指向目前 node
            pred.next = node;
            return node;
        }
    }
    //tail=null或者compareAndSetTail(pred, node)=false,把 node 添加到同步隊列
    enq(node);
    return node;
}
           

enq

private Node enq(final Node node) {
    //自璇操作,入隊
    for (;;) {
        //尾部
        Node t = tail;
        //尾部為空
        if (t == null) { // Must initialize
            //建立頭部節點,設定初始化節點
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
           

圖解分析:

假設有三個線程來争搶鎖

AQS底層原理實作AQS底層原理

AQS.acquireQueued

通過addWaiter方法把線程添加到連結清單後,會接着把Node作為參數傳遞給acquireQueued方法,去競争鎖

1:擷取目前節點的prev節點

2:如果prev節點為head節點,那麼它就有資格去争搶鎖,調用tryAcquire搶占鎖

3:搶占鎖成功以後,把擷取鎖的節點設定為head,并且移除原來初始化的head節點

4:如果擷取鎖失敗,則根據waitStatus決定是否需要挂起線程

5:最後,通過cance|Acquire取消獲得鎖的操作

//通過自旋方式擷取同步狀态
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;//預設線程沒有被中斷過
        for (;;) {
            final Node p = node.predecessor();//擷取該節點的前驅節點p
            if (p == head && tryAcquire(arg)) {  // 如果p是頭節點并且能擷取到同步狀态
                setHead(node);                   // 把目前節點設為頭節點
                p.next = null;                  // 把p的next設為null,便于GC
                failed = false;                 // 标志--表示成功擷取同步狀态,預設是true,表示失敗
                return interrupted;             // 傳回該線程在擷取到同步狀态的過程中有沒有被中斷過
            }
            //ThreadA 可能還沒釋放鎖,使得 ThreadB 在執行 tryAcquire 時會傳回 false
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //并且傳回目前線程在等待過程中有沒有中斷過。
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
           

shouldParkAfterFailedAcquire

如果線程A的鎖還沒有釋放的情況下,B和C來争搶鎖肯定是會失敗的,那麼失敗之後,調用

shouldParkAfterFailedAcquire方法

Node中有五種狀态,分别是CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、預設狀态(0)

CANCELLED:在同步隊列中等待的線程逾時或者被打斷,需要從同步隊列中取消該Node的結點,其結點的

waitStatus為CANCELLED,即結束狀态,進入該狀态後的結點将不會再變化

SIGNAL:隻要前置節點釋放鎖,就會通知辨別為SIGNAL狀态的後續節點的線程

CONDITION:線程的等待狀态,表示線程在Condition上

PROPAGATE:共享模式下,PROPAGATE 狀态的線程處于可運作狀态 0:初始狀态,

這個方法主要作用是,通過Node的狀态判斷,A線程競争鎖失敗以後是否應該被挂起

1:如果線程A的pred節點狀态為SIGNAL,那就表示可以放心挂起目前線程

2:通過循環掃描連結清單把CANCELLED狀态的節點移除

3:修改pred節點的狀态為SIGNAL,傳回false

傳回 false 時,也就是不需要挂起,傳回 true,則需要調用 parkAndCheckInterrupt挂起目前線程

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;//前置節點的waitStatus
        if (ws == Node.SIGNAL) //如果前置節點為SIGNAL,意味着隻需要等待其他前置節點的線程被釋放
            return true; //可以放心挂起
        if (ws > 0) { //ws 大于 0,意味着 prev 節點取消了排隊,直接移除這個節點就行
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);//這裡采用循環,從雙向清單中移除 CANCELLED 的節點
            pred.next = node;
        } else {//利用 cas 設定 prev 節點的狀态為 SIGNAL(-1)
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
           

parkAndCheckInterrupt

使用 LockSupport.park 挂起目前線程WATING 狀态

Thread.interrupted,傳回目前線程是否被其他線程觸發過中斷請求,也就是thread.interrupt(); 如果有觸發過中

斷請求,那麼這個方法會傳回目前的中斷辨別true,并且對中斷辨別進行複位辨別已經響應過了中斷請求。如果返

回 true,意味着在 acquire 方法中會執行 selfInterrupt()。

private final boolean parkAndCheckInterrupt() {
  LockSupport.park(this);
  return Thread.interrupted();
}
           

圖解分析

通過 acquireQueued 方法來競争鎖,如果 ThreadA 還在執行中沒有釋放鎖的話,意味着 ThreadB 和 ThreadC

隻能挂起了。

AQS底層原理實作AQS底層原理

鎖的釋放流程

ReentrantLock.unlock

public final boolean release(int arg) {
    if (tryRelease(arg)) { //釋放鎖成功
        Node h = head; //擷取到AQS中的head節點
        if (h != null && h.waitStatus != 0) //如果不為空,并且狀态不為0,則喚醒後續節點
            unparkSuccessor(h);
        return true;
    }
    return false;
}
           

ReentrantLock.tryRelease

這個方法是一個設定鎖的操作,通過将state狀态減掉傳入的參數值(1),如果結果狀态為0,就将它的排它鎖的

Owner設定為null

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
           

unparkSuccessor

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;//獲得頭結點的狀态
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;//擷取頭結點的下一個節點
    if (s == null || s.waitStatus > 0) {
        //如果下一個節點為 null 或者 status>0 表示 cancelled 狀态.
        //通過從尾部節點開始掃描,找到距離 head 最近的一個waitStatus<=0 的節點
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //next 節點不為空,直接喚醒這個線程即可
    if (s != null)
        LockSupport.unpark(s.thread);
}
           

為什麼釋放節點時候是從tail掃描的

再回到 enq那個方法。8-11行來看一個新的節點是如何加入到連結清單中的

1:将新的節點的prev指向tail

2:通過cas将tail設定為新的節點

3:t.next = node;

private Node enq(final Node node) {
  for (;;) {
    Node t = tail;
    if (t == null) { // Must initialize
      if (compareAndSetHead(new Node()))
        tail = head;
    } else {
      node.prev = t;
      if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
      }
    }
  }
}
           
AQS底層原理實作AQS底層原理

在 cas 操作之後,t.next=node 操作之前。存在其他線程調用 unlock 方法從 head開始往後周遊,由于

t.next=node 還沒執行意味着連結清單的關系還沒有建立完整。就會導緻周遊到 t 節點的時候被中斷。是以從後往前遍

曆,一定不會存在這個問題。

圖解分析

通過鎖的釋放,原本的結構就發生了一些變化。head 節點的 waitStatus 變成了 0,ThreadB 被喚醒

AQS底層原理實作AQS底層原理

原本挂起的線程繼續執行

通過ReentrantLock.unlock,原本挂起的線程被喚醒後繼續執行原來被挂起的線程是在 acquireQueued 方法中,

是以被喚醒以後繼續從這個方法開始執行

具體見上面的AQS.acquireQueued ,關鍵代碼如下:

if (p == head && tryAcquire(arg)) {
    setHead(node);//設定線程B為頭結點
    p.next = null; // help GC
    failed = false;//把原 head 節點的 next 節點指向為 null
    return interrupted;
}
           

圖解分析

  1. 設定新 head 節點的 prev=null
  2. 設定原 head 節點的 next 節點為 null
AQS底層原理實作AQS底層原理

公平鎖和非公平鎖的差別

鎖的公平性是相對于擷取鎖的順序而言的,如果是一個公平鎖,那麼鎖的擷取順序就應該符合請求的絕對時間順

序,也就是 FIFO。 在上面分析的例子來說,隻要CAS 設定同步狀态成功,則表示目前線程擷取了鎖,而公平鎖則

不一樣,差異點有兩個

lock

final void lock() {
  acquire(1);
}
           

非公平鎖在擷取鎖的時候,會先通過 CAS 進行搶占,而公平鎖則不會

FairSync.tryAcquire

protected final boolean tryAcquire(int acquires) {
    //取到目前線程
    final Thread current = Thread.currentThread();
    //擷取目前的state(上鎖次數)
    int c = getState();
    //如果當時沒有鎖
    if (c == 0) {
        //則去隊列,取隊列中的并嘗試擷取鎖
        //通過CAS修改狀态,
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            //如果成功,則把獨占鎖設定為目前這個線程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
}
           

這個方法與 nonfairTryAcquire(int acquires)比較,不同的地方在于判斷條件多了hasQueuedPredecessors()方

法,也就是加入了[同步隊列中目前節點是否有前驅節點]的判斷,如果該方法傳回 true,則表示有線程比目前線程

更早地請求擷取鎖,是以需要等待前驅線程擷取并釋放鎖之後才能繼續擷取鎖。