天天看點

多線程與高并發四之 AQS源碼解析(CAS+volatile)

volatile :   可見性:緩存一緻性協定 MSI             禁止指令重排:JMM,記憶體屏障

synchronized  和  ReentrantLock 的差別

synchronized  系統自動上鎖,

ReentrantLock 需要手工上鎖,ReentrantLock 可以出現各種 condition (狀況)。

例如  tryLock 嘗試加鎖,lockInterruptibly()上鎖的時候異常了、getQueueLength()擷取隊列長度、hasQueuedThreads() 是否有線程在隊列裡,某一個線程是否在隊列裡  等等

實作:

ReentrantLock  CAS的實作。 synchronized  四種鎖的實作,

無鎖狀态、偏向鎖狀态、輕量級鎖狀态、重量級鎖狀态  ( 見 多線程與高并發程式設計一 )

AQS  AbstractQueuedSynchronizer  類,volatile state 狀态可重入鎖
           

AQS 底層就是CAS+ volatile 

volatile state 用來記錄鎖的狀态,為0則沒有上鎖,否則上鎖了。

ReentrantLock.lock() 會調用 acquire(1);  
AbstractQueuedSynchronizer.acquire(1) 方法裡面的if 會調用 addWaiter(Node.EXCLUSIVE) 
  static final Node EXCLUSIVE = null; 就是說 Node.EXCLUSIVE 這個值永遠是空的
           
private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
           

首先,tail 為空, 傳進來的mode為空,是以pred為空,直接進入 enq(node) 方法 

Node node = new Node(Thread.currentThread(), mode);  node 資訊裡面的線程為目前線程,mode 為空!!!
           

enq:

for循環   t = tail , t 為空,  compareAndSetHead(new Node()) , 方法如下

private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
           

這裡調用的是unsafe的compare方法,這個是系統底層的,不做深入研究,大家了解下下面的概念原理就好

compareAndSwapObject方法中的第一個參數和第二個參數,用于确定待操作對象在記憶體中的具體位置的,然後取出值和第三個參數進行比較,如果相等,則将記憶體中的值更新為第四個參數的值,同時傳回true,表明原子更新操作完畢。反之則不更新記憶體中的值,同時傳回false,表明原子操作失敗

上面的方法基本上就是比較 head == null ,由于剛開始都是null的,很明顯為true,将 new Node 指派給 head。這時候,這個node.thread 已經是目前線程了。tail = head

接着進行for循環,這時候tail已經不為空了,進入else,繼續将 tail 設定為目前node ,next 指向目前node。

這裡添加第一個完成。

第二個線程來的時候,addWaiter 方法, pred != null 為true,進行 compareAndSetTail 将尾部設為第二個線程,next 指向第二個線程。

基本原理如下圖

多線程與高并發四之 AQS源碼解析(CAS+volatile)

圖可能有點錯誤,走下代碼邏輯吧

線程 A , B ,C



線程A
private Node addWaiter(Node mode) {
//Node node = new Node(A, null);
        Node node = new Node(Thread.currentThread(), mode);
        //Node pred = null  因為剛進來 tail = null;
        Node pred = tail;

        // A 不執行這個判斷
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }

        enq(node);
        return node;
    }

A進入enq  node = A
private Node enq(final Node node) {
        for (;;) {
           //第一次  tail = null  ,  第二次 tail = 空 node
            Node t = tail; 
            if (t == null) {
            // 第一次進入這裡
              //head = 空node
                if (compareAndSetHead(new Node()))
                //tail = 空node
                    tail = head;
            } else {

                //第二次進入   //A的prev = 空 node
                node.prev = t; 
                // 将  tail 設定為 A(node)
                if (compareAndSetTail(t, node)) {
                    //空node 的next 設定為 Anode
                    t.next = node;
                    //傳回一個空node
                    return t;
                }
            }
        }

     到這裡   tail = A;不是null,head 為 空node,t.next = Anode; A.prev=空node

線程B
private Node addWaiter(Node mode) {
        //Node node = new Node(B, null);
        Node node = new Node(Thread.currentThread(), mode);
        //Node pred = A; 
        Node pred = tail;

        // tail = A ,走這裡
        if (pred != null) {
            // B.prev = A
            node.prev = pred;
            //  tail = B
            if (compareAndSetTail(pred, node)) {
                // A.next = B
                pred.next = node;
                //傳回 B
                return node;
            }
        }
        enq(node);
        return node;
    }

到這裡 tail = B;head 為 空node,A.next = Bnode; B.prev=A;

 線程C
private Node addWaiter(Node mode) {
        //Node node = new Node(C, null);
        Node node = new Node(Thread.currentThread(), mode);
        //Node pred = B; 
        Node pred = tail;

        // tail = B ,走這裡
        if (pred != null) {
            // C.prev =B
            node.prev = pred;
            //  tail = C
            if (compareAndSetTail(pred, node)) {
                // B.next = C
                pred.next = node;
                //傳回 C
                return node;
            }
        }
        enq(node);
        return node;
    }

最終     tail = C;head 為 空node,A.next = Bnode; B.prev=A;B.next = Cnode; C.prev=B;
           

最後發現 整個 head 一直都隻是一個聲明的 空 node

ReentrantLock.unlock()
           
public void unlock() {
        sync.release(1);
    }

public final boolean release(int arg) {
        if (tryRelease(arg)) { //嘗試釋放
            Node h = head; //成功釋放,取出 head 裡面标志的線程
            if (h != null && h.waitStatus != 0) // 有等待的線程并且等待狀态不為0
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases; // 擷取到目前state的值 - 1
        if (Thread.currentThread() != getExclusiveOwnerThread()) //判斷釋放鎖的線程是否是目前持有鎖的線程
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) { // 如果state - 1 的值為0,則需要釋放,如果不為0,說明還有該線程還有重入鎖
            free = true;
            setExclusiveOwnerThread(null); //修改鎖的擁有者為null
        }
        setState(c); //修改state的值
        return free;
    }

    /***
     * 喚醒
     * @param node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread); // 喚醒排隊中的下一個(node.next)線程
    }
           

unlock 方法源碼基本如上。

先嘗試釋放目前鎖,判斷有沒有重入鎖,沒有則進行釋放。釋放之後,喚醒下一個排隊中的線程。

那麼 AQS中的tail 是什麼時候釋放的呢,就是連結清單中的元素是什麼時候被銷毀的呢

在 加鎖的時候  addWaiter 外層有個 acquireQueued 方法

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())  // 休眠
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);  //釋放 自己的node
        }
    }
           

VarHandle  jdk1.9之後支援

例如: Object o = new Object();  在記憶體中聲明了一塊記憶體,o 指向這個記憶體。然後有一個 VarHandle 也可以指向這個 o。

多線程與高并發四之 AQS源碼解析(CAS+volatile)

相當于記憶體是一個保險櫃。x 是這個保險櫃的所有者。 x 告訴你這個保險櫃的具體位址、櫃号、密碼。然後你通過這些資訊也可以找到這個保險櫃。

多線程與高并發四之 AQS源碼解析(CAS+volatile)

以及,進行一些原子性操作。即打開保險櫃之後,你可以修改裡面内容,并且是安全的。