天天看點

多線程-1.0-AQS重生之路AQS重生之路

AQS重生之路

​ AbstractQueuedSynchronizer抽象同步隊列簡稱AQS,它是實作同步器的基礎元件,并發包中鎖的底層就是使用AQS實作的。AQS是一個FIFO的雙向隊列,其内部通過節點head和tail記錄隊首和隊尾元素,隊列元素的類型為Node。

多線程-1.0-AQS重生之路AQS重生之路

1.1 Node分析

static final class Node {
  
  	//标記該線程是擷取共享資源時被阻塞挂起後放入AQS隊列
    static final Node SHARED = new Node();
	//标記線程是擷取獨占資源時被挂起後放入AQS隊列的
    static final Node EXCLUSIVE = null;

    static final int CANCELLED =  1;

    static final int PROPAGATE = -3;

	//記錄目前線程等待狀态,可以為
  	//CANCELLED(線程被取消了)
  	//SIGNAL(線程需要被喚醒)
  	//CONDITION(線程在條件隊列裡面等待)
  	//PROPAGATE(釋放共享資源時需要通知其他節點)
    volatile int waitStatus;

    volatile Node prev;
    volatile Node next;
  
	//存放進入AQS隊列裡面的線程
    volatile Thread thread;

    Node nextWaiter;
}
/**
	頭尾節點
*/
private transient volatile Node head;
private transient volatile Node tail;


private volatile int state;
           

上述state變量擴充:對于ReentrantLock的實作來說,state可以用來表示目前線程擷取鎖的可重入次數;對于讀寫鎖ReentrantReadWriteLock來說,state的高16位表示讀狀态,也就是擷取該讀鎖的次數,低16位表示擷取到寫鎖的線程的可重入次數;對于semaphore來說,state用來表示目前可用信号的個數;對于CountDownlatch來說,state用來表示計數器目前的值。

1.2 ConditionObject

**字段:**ConditionObject内部類隻有兩個變量分别存儲條件隊列的頭、尾元素。

方法:

多線程-1.0-AQS重生之路AQS重生之路

1.3 變量

多線程-1.0-AQS重生之路AQS重生之路

方法操作

​ 對于AQS來說,線程同步的關鍵是對狀态值state進行操作。根據state是否屬于一個線程,操作state的方式分為獨占方式和共享方式。

獨占方式

​ 注意:阻塞的方式進行資源擷取,try相關的方法都需要子類去實作。
  • void acquire(intarg)
    //會首先使用tryAcquire方法嘗試擷取資源,具體是設定狀态變量state的值,成功則直接傳回
    //失敗則将目前線程封裝為類型為Node.EXCLUSIVE的Node節點後插入到AQS阻塞隊列的尾部,并調用LockSupport.park(this)方法挂起自己。
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
               
  • void acquireInterruptibly(int arg)
  • boolean release(int arg)
    1.嘗試使用tryRelease操作釋放資源,這裡是設定狀态變量state的值,然後調用LockSupport.unpark(thread)方法激活AQS隊列裡面被阻塞的一個線程
    
    public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = 
                  ;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
               

共享方式

​ 當多個線程去請求資源時通過CAS方式競争擷取資源,當一個線程擷取到了資源後,另外一個線程再次去擷取時如果目前資源還能滿足它的需要,則目前線程隻需要使用CAS方式進行擷取即可。比如Semaphore信号量,當一個線程通過acquire()方法擷取信号量時,會首先看目前信号量個數是否滿足需要,不滿足則把目前線程放入阻塞隊列,如果滿足則通過自旋CAS擷取信号量。
  • void acquireShared(int arg)
    會首先使用tryAcquireShared嘗試擷取資源,具體是設定狀态變量state的值,成功則直接傳回,失敗則将目前線程封裝為類型為Node.SHARED的Node節點後插入到AQS阻塞隊列的尾部,并使用LockSupport.park(this)方法挂起自己。 
    
    public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
    }
               
  • void acquireSharedInterruptibly(int arg)
  • boolean releaseShared(int arg)

1.4 維護AQS隊列

入隊操作

簡述:當一個線程擷取鎖失敗後該線程會被轉換為Node節點,然後就會使用enq(final Node node)方法将該節點插入到AQS的阻塞隊列。

1.當隊列為空,首先設定一個head的哨兵節點,并讓尾部也指向head節點
2.第二次循環,将目前節點連結到尾部,并将tail指向尾節點
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
           

1.5 AQS—條件變量的支援

​ 主要就是通過

Lock.newCondition()

來擷取條件變量,然後通過

condition.await()

condition.signal()

來控制線程通信。

案例如下

package demo;

import java.util.HashMap;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;


public class Test {

    private static ReentrantLock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();
    public static void main(String[] args) throws InterruptedException {

       new Thread(Test::await).start();

       new Thread(Test::signal).start();

    }

    private static void await(){
        lock.lock();
        try {
            System.out.println("begin wait");
            condition.await();
            System.out.println("end wait");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    private static void signal(){
        lock.lock();
        try {
            System.out.println("begin signal");
            condition.signal();
            System.out.println("end signal");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
           

a.源碼分析

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
   					//建立新的node節點,并插入到條件隊列末尾
            Node node = addConditionWaiter();
   					//釋放目前線程擷取的鎖
            int savedState = fullyRelease(node);
            int interruptMode = 0;
   					//調用park()方法阻塞挂起目前線程
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
           ...
}
           
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
      	//将條件隊列頭元素移動到AQS隊列
        doSignal(first);
}
           

b.總體流程

​ 1.當多個線程同時調用lock.lock()方法擷取鎖時,隻有一個線程擷取到了鎖,其他線程會被轉換為Node節點插入到lock鎖對應的AQS阻塞隊列裡面,并做自旋CAS嘗試擷取鎖。

​ 2.如果擷取到鎖的線程又調用了對應的條件變量的await()方法,則該線程會釋放擷取到的鎖,并被轉換為Node節點插入到條件變量對應的條件隊列裡面。

​ 3.這時候因為調用lock.lock()方法被阻塞到AQS隊列裡面的一個線程會擷取到被釋放的鎖,如果該線程也調用了條件變量的await()方法則該線程也會被放入條件變量的條件隊列裡面。

​ 4.當另外一個線程調用條件變量的signal()或者signalAll()方法時,會把條件隊列裡面的一個或者全部Node節點移動到AQS的阻塞隊列裡面,等待時機擷取鎖。

​ 最後使用一個圖總結如下:一個鎖對應一個AQS阻塞隊列,對應多個條件變量,每個條件變量有自己的一個條件隊列。

多線程-1.0-AQS重生之路AQS重生之路