天天看點

并發程式設計--AbstractQueuedSynchronizer介紹和原理分析

AbstractQueuedSynchronizer是并發程式設計包中最重要的類,是并發程式設計包的實作基層。簡單來說,AbstractQueuedSynchronizer提供了一個基于FIFO的隊列,用于存儲處于阻塞狀态的線程;提供了一個volatile修改的state變量,用于作為鎖擷取的辨別位,對于FIFO隊列和state的操作都是通過Unsafe這個類來實作CAS原子操作。

AQS的功能可以分為兩類:獨占功能和共享功能,它的所有子類中,要麼實作了并使用了它獨占鎖功能的API,要麼使用了共享鎖的功能,而不會同時使用兩套API,即便是它最有名的子類讀寫鎖ReentrantReadWriteLock也是通過兩個内部類:讀鎖和寫鎖,分别實作兩套API來實作的。

一、FIFO隊列:

//阻塞線程節點
    static final class Node {
        
        static final Node SHARED = new Node();  //共享鎖

        static final Node EXCLUSIVE = null;  //獨占鎖

        static final int CANCELLED =  1;  //用于表示目前線程被取消

        static final int SIGNAL    = -1;  //表示目前線程處于阻塞狀态

        static final int CONDITION = -2;  //表示目前節點在等待Condition
 
        static final int PROPAGATE = -3;  //表示目前場景下後續的acquireShared能夠得以執行

        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;

       
        volatile Thread thread;

        Node nextWaiter;
    }      

當線程沒有競争到鎖資源時就會被阻塞并放到FIFO隊列中,當鎖資源釋放後再從FIFO由阻塞狀态變為可執行狀态來擷取鎖資源并執行。當線程擷取鎖資源時有公平鎖和非公平鎖可以參考

​​并發程式設計--公平鎖和非公平鎖​​;此外對于鎖的分類還有獨占鎖和共享鎖,簡單來說獨占鎖state就是目前鎖隻能有一個線程擷取,其他線程處于阻塞狀态,共享鎖是多個線程可以同時持有目前鎖狀态state,隻要符合某種要求所有線程可以并發執行,不用進行阻塞等待。

AQS FIFO隊列模型:

并發程式設計--AbstractQueuedSynchronizer介紹和原理分析

二、鎖狀态:

AbstractQueuedSynchronizer提供了一個變量state,用來作為目前鎖是否可以擷取的辨別,對于所有線程來說,目前state的值對于他們來說都是可見的,每個線程持有的state值都是相同的。

private volatile int state;      

三、擷取鎖

1、成功擷取鎖

我們介紹一下一個線程是如何擷取鎖的,當一個獨占鎖被擷取之後,其他線程是該如何進行操作的。我們用ReentrantLock的接口lock來看一下鎖是如何擷取的。

lock接口:

final void lock() {
    //但擷取鎖時,會将state設定為1,如果是單個線程多次擷取鎖則state++
            acquire(1);
        }      

(1)tryAcquire是嘗試擷取鎖,如果擷取鎖則繼續執行

(2)如果tryAcquire傳回false,則調用acquireQueued将目前線程阻塞并添加的FIFO隊列中

//tryAcquire是嘗試擷取鎖,如果擷取鎖則繼續執行
//如果tryAcquire傳回false,則調用acquireQueued将目前線程阻塞并添加的FIFO隊列中
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }      

我們看看tryAcquire嘗試擷取鎖的機制,我們介紹一下公平鎖的擷取吧

(1)首先會擷取lock中的state值,如果state等于0表示目前鎖可擷取,接下來hasQueuedPredecessors判斷目前鎖是否在FIFO隊列中,當FIFO隊列為空時,目前線程可以擷取鎖,不然根據公平規則,需要讓FIFO中的線程優先擷取鎖,當可以擷取鎖是調用 compareAndSetState(0, acquires)将state值通過cas操作設定為acquires,目前線程擷取了鎖,state值為acquires值,其他線程來擷取鎖時state肯定就不為0了。

(2)當state值不為0時,首先要判斷一下目前擷取鎖的線程和申請鎖的線程是否是一個線程,如果時一個線程state++,目前線程擷取了多次鎖

(3)當以上條件都不滿足時,表示線程無法擷取鎖,傳回false。

protected final boolean tryAcquire(int acquires) {
            //擷取目前線程
            final Thread current = Thread.currentThread();
            //擷取鎖辨別state
            int c = getState();
            //如果c等于0則表示目前線程可以擷取鎖
            if (c == 0) {
                //hasQueuedPredecessors判斷目前鎖是否在FIFO隊列中,當FIFO隊列為空時,目前線程可以擷取鎖,不然根據公平規則,需要讓FIFO中的線程優先擷取鎖
                //當可以擷取鎖是調用 compareAndSetState(0, acquires)将state值通過cas操作設定為acquires
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    //将獨占線程設定為目前線程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //即使目前擷取不到鎖,如果擷取鎖的線程是目前線程則state++,線程擷取多個鎖
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            //擷取鎖失敗
            return false;
        }
    }      

總結:鎖的擷取是根據鎖狀态值state和判斷目前擷取鎖的線程是否是申請鎖的線程來判斷的

2、進入FIFO線程隊列

當擷取鎖失敗時傳回false,則執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)操作。

首先調用addWaiter(Node.EXCLUSIVE),将線程添加到FIFO隊列中,并阻塞。

private Node addWaiter(Node mode) {
        //建立目前線程節點Node
        Node node = new Node(Thread.currentThread(), mode);
        
        Node pred = tail;
        //當尾節點不為空時
        if (pred != null) {
            node.prev = pred;
            //cas原子操作将目前線程添加到FIFO隊列中
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //當尾節點為空時,初始化head和tail節點
        enq(node);
        //傳回node
        return node;
    }      

接下來調用acquireQueued将線程進行阻塞

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //擷取node節點的前一個節點
                final Node p = node.predecessor();
                //再嘗試擷取鎖
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //shouldParkAfterFailedAcquire 用來設定node的waitstatus狀态為SIGNAL,值為-1,表示目前節點的後繼節點包含的線程需要運作,也就是unpark;
                //parkAndCheckInterrupt() 操作是将目前線程park,阻塞,線程阻塞在這個地方
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }      

線程阻塞的操作是在parkAndCheckInterrupt中将目前線程阻塞。

private final boolean parkAndCheckInterrupt() {
        //阻塞目前線程
        LockSupport.park(this);
        return Thread.interrupted();
    }      

總結:以上操作完成了線程的進FIFO隊列及目前線程的阻塞。

3、釋放鎖資源

鎖的釋放還是比較簡單的,簡單的來說就是将鎖辨別位state進行減一操作,然後将阻塞線程喚起讓他們重新競争

public void unlock() {
        sync.release(1);
    }      

release中進行state恢複值并喚起阻塞線程。

public final boolean release(int arg) {
        //tryRelease釋放鎖,将state進行減值,并設定目前可執行線程為null
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                //喚起阻塞線程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }      

tryRelease中對state進行修改,如果state的值為0的話将目前執行線程設定為null。

protected final boolean tryRelease(int releases) {
      //修改state值
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
        //将目前執行線程設定為空
                setExclusiveOwnerThread(null);
            }
      //修改state值
            setState(c);
            return free;
        }      

接下來的操作是調用unparkSuccessor将阻塞線程喚起

private void unparkSuccessor(Node node) {
        
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //喚起目前線程
        if (s != null)
            LockSupport.unpark(s.thread);
    }      

繼續閱讀