天天看點

java-AQS簡介一、什麼是AQS二、AQS架構三、源碼LockSupport四、注意點

一、什麼是AQS

AQS,在java.util.concurrent.locks包中,AbstractQueuedSynchronizer這個類是并發包中的核心,了解其他類之前,需要先弄清楚AQS。

AQS就是一個同步器,要做的事情就相當于一個鎖,是以就會有兩個動作:一個是擷取,一個是釋放。擷取釋放的時候該有一個東西來記住他是被用還是沒被用,這個東西就是一個狀态。如果鎖被擷取了,也就是被用了,還有很多其他的要來擷取鎖,總不能給全部拒絕了,這時候就需要他們排隊,這裡就需要一個隊列。

AQS的核心思想是:通過一個volatile修飾的int屬性

state

代表同步狀态,例如0是無鎖狀态,1是上鎖狀态。多線程競争資源時,通過CAS的方式來修改state,例如從0修改為1,修改成功的線程即為資源競争成功的線程,将其設為

exclusiveOwnerThread

,也稱【工作線程】,資源競争失敗的線程會被放入一個

FIFO

的隊列中并挂起休眠,當

exclusiveOwnerThread

線程釋放資源後,會從隊列中喚醒線程繼續工作,循環往複。

二、AQS架構

1、基礎架構圖

java-AQS簡介一、什麼是AQS二、AQS架構三、源碼LockSupport四、注意點

架構還是比較簡單的,除了實作Serializable接口外,就隻繼承了

AbstractOwnableSynchronizer

父類。

AbstractOwnableSynchronizer

父類中維護了一個

exclusiveOwnerThread

屬性,是用來記錄目前同步器資源的獨占線程的,沒有其他東西。

2、AQS的CLH鎖隊列

CLH其實就是一個FIFO的隊列,隻不過稍微做了點改進,AQS有一個内部類

Node

,AQS會将競争鎖失敗的線程封裝成一個Node節點,然後由這些NODE組成了一個連結清單隊列

/**
      +------+  prev +-----+       +-----+
 head |      | <---- |     | <---- |     |  tail
      +------+       +-----+       +-----+
/**
static final class Node {
    //作為共享模式
    static final Node SHARED = new Node();
    //作為獨占模式
    static final Node EXCLUSIVE = null;
    //等待狀态:表示節點中線程是已被取消的
    static final int CANCELLED =  1;
    //等待狀态:表示目前節點的後繼節點的線程需要被喚醒
    static final int SIGNAL    = -1;
    //等待狀态:表示線程正在等待條件
    static final int CONDITION = -2;
    //等待狀态:表示下一個共享模式的節點應該無條件的傳播下去
    static final int PROPAGATE = -3;
    //等待狀态,初始化為0,剩下的狀态就是上面列出的
    volatile int waitStatus;
    //目前節點的前驅節點
    volatile Node prev;
    //後繼節點
    volatile Node next;
    //目前節點的線程
    volatile Thread thread;
    //
    Node nextWaiter;
    //是否是共享節點
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    //目前節點的前驅節點
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
           

 waitStatus是以0為臨界值的,大于0代表節點無效,例如AQS在喚醒隊列中的節點時,waitStatus大于0的節點會被跳過

AQS内部還維護了int類型的state變量,代表同步器的狀态。例如,在ReentrantLock中,state就代表鎖的重入次數,每lock一次,state就+1,每unlock一次,state就-1,當state等于0時,代表沒有上鎖。

AQS内部還維護了head和tail屬性,用來指向FIFO隊列中的頭尾節點,被head指向的節點,總是工作線程。線程在擷取到鎖後,是不會出隊的。隻有當head釋放鎖,并将其後繼節點喚醒并設為head後,才會出隊。

3、共享鎖和互斥鎖

QS的CLH隊列鎖中,每個節點代表着一個需要擷取鎖的線程,該node中有兩個常量SHARED共享模式,EXCLUSIVE獨占模式。

/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
           

共享模式允許多個線程可以擷取同一個鎖,獨占模式則一個鎖隻能被一個線程持有,其他線程必須要等待。

三、源碼

1、AQS源碼

//阻塞隊列的隊列頭
private transient volatile Node head;
//隊列尾
private transient volatile Node tail;
//同步狀态,這就是上面提到的需要原子操作的狀态
private volatile int state;
//傳回目前同步器的狀态
protected final int getState() {
    return state;
}
//設定同步器的狀态
protected final void setState(int newState) {
    state = newState;
}
//原子的設定目前同步器的狀态
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
//
static final long spinForTimeoutThreshold = 1000L;
           

2、獨占模式的擷取

acquire,獨占,忽略中斷

//獨占模式的擷取方法,會忽略中斷
//tryAcquire方法會被至少調用一次,由子類實作
//如果tryAcquire不能成功,目前線程就會進入隊列排隊
public final void acquire(int arg) {
    //首先調用tryAcquire嘗試擷取
    //擷取不成功,就使用acquireQueued使線程進入等待隊列
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
           

tryAcquire方法:

//由子類來實作
//嘗試在獨占模式下擷取,會查詢該對象的狀态是否允許在獨占模式下擷取
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
           

使用指定的模式建立一個節點,添加到AQS連結清單隊列中:

private Node addWaiter(Node mode) {
    //目前線程,指定的mode,共享或者獨占
    Node node = new Node(Thread.currentThread(), mode);
    //先嘗試使用直接添加進隊列
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //使用添加節點的方法
    enq(node);
    return node;
}
           

向隊列中插入節點:

//會插入節點到對列中
private Node enq(final Node node) {
    for (;;) {
        //尾節點
        Node t = tail;
        //需要執行個體化一個隊列
        if (t == null) { // Must initialize
            //使用cas建立頭節點
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
           

tryAcquire沒有擷取到,就會先使用addWaiter添加進隊列,然後使用acquireQueued從隊列擷取,如果這時候擷取成功,則替換目前節點為隊列頭,然後傳回:

//獨占模式處理正在排隊等待的線程。
//自旋,直至擷取成功才傳回
final boolean acquireQueued(final Node node, int arg) {
    //目前擷取是否失敗
    boolean failed = true;
    try {
        //擷取是否被中斷
        boolean interrupted = false;
        for (;;) {
            //擷取目前節點的前驅節點
            final Node p = node.predecessor();
            
            //head節點要麼是剛才初始化的節點
            //要麼就是成功擷取鎖的節點
            //如果目前節點的前驅節點是head,目前節點就應該去嘗試擷取鎖了
            //目前節點的前驅節點是頭節點,就嘗試擷取
            if (p == head && tryAcquire(arg)) {
                //擷取成功的話,就把目前節點設定為頭節點
                setHead(node);
                //之前的head節點的next引用設為null
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //檢視目前節點是否應該被park
            //如果應該,就park目前線程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        //失敗了,取消目前線程
        if (failed)
            cancelAcquire(node);
    }
}
           

設定頭節點,隻能被擷取方法調用:

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
           

shouldParkAfterFailedAcquire方法,檢視是否應該被park:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //前驅節點中儲存的等待狀态
    int ws = pred.waitStatus;
    //等待狀态是signal,也就是目前節點在等着被喚醒
    //此時目前節點應該park
    if (ws == Node.SIGNAL)
        return true;
        
    //等待狀态大于0表示前驅節點已經取消
    //會向前找到一個非取消狀态的節點
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
       //将前驅節點的waitStatus設定為signal,表示目前需要被park
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
           

看下parkAndCheckInterrupt方法:

//挂起目前線程,并傳回目前中斷狀态
private final boolean parkAndCheckInterrupt() {
    //挂起目前線程
    LockSupport.park(this);
    return Thread.interrupted();
}
           

cancelAcquire取消目前節點:

private void cancelAcquire(Node node) {
    //節點不存在
    if (node == null)
        return;
    //節點的線程引用設為null
    node.thread = null;

    //前驅節點
    Node pred = node.prev;
    //大于0表示前驅節點被取消
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    //前驅節點的下一個是需要移除的節點
    Node predNext = pred.next;

    //設定節點狀态為取消
    node.waitStatus = Node.CANCELLED;

    //如果是尾節點,直接取消,将前一個節點設定為尾節點
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {//不是尾節點,說明有後繼節點,将前驅節點的next紙箱後繼節點
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}
           

acquireInterruptibly 獨占,可中斷

跟獨占忽略中斷類似,不再解釋。

tryAcquireNanos,獨占,可逾時,可中斷

跟上面類似,但是在doAcquireNanos中會擷取目前時間,并擷取LockSupport.parkNanos之後的時間在做逾時時間的重新計算,到了逾時時間,就傳回false。

3、獨占模式的釋放

release,獨占,忽略中斷

public final boolean release(int arg) {
    //嘗試釋放,修改狀态
    if (tryRelease(arg)) {
        //成功釋放
        //head代表初始化的節點,或者是目前占有鎖的節點
        //需要unpark後繼節點
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
           

unparkSuccessor:

private void unparkSuccessor(Node node) {
    //頭節點中儲存的waitStatus
    int ws = node.waitStatus;
    //重置頭節點狀态為0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    //後繼節點
    Node s = node.next;
    //後繼節點為null或者已經取消
    if (s == null || s.waitStatus > 0) {
        s = null;
        //從最後往前找有效的節點
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //unpark
    if (s != null)
        LockSupport.unpark(s.thread);
}
           

共享模式的擷取

acquireShared,共享,忽略中斷

acquireSharedInterruptibly,共享,可中斷

tryAcquireSharedNanos,共享,可設定逾時,可中斷

共享模式的釋放

releaseShared

共享模式的和獨占模式基本差不多,和獨占式的acquireQueued方法差別就是在擷取成功的節點後會繼續unpark後繼節點,将共享狀态向後傳播。

LockSupport

用來建立鎖和其他同步類的基本線程阻塞原語。每個使用LockSupport的線程都會與一個許可關聯,如果該許可可用并且可在程序中使用,則調用park()将會立即傳回,否則可能阻塞。如果許可不可用,可調用unpark使其可用。

許可不可重入,隻能調用一次park()方法,否則會一直阻塞。

park()和unpark()作用分别是阻塞線程和解除阻塞線程,且park和unpark不會遇到suspend和resume可能引發的死鎖問題。

park,如果許可可用,使用該許可,并且該調用立即傳回;否則為線程排程禁用目前線程,并在發生以下三種情況之一之前,使其處于休眠狀态:

* 其他某個線程将目前線程作為目标調用unpark

* 其他某個線程中斷目前線程

* 該調用不合邏輯的傳回

unpark,如果給定的線程尚不可用,則使其可用。如果線程在park上受阻塞,則它将解除其阻塞狀态。否則,保證下一次調用park不受阻塞。如果給定線程尚未啟動,則無法保證此操作有任何效果。

四、注意點

1.工作線程什麼時候出隊?

FIFO

隊列中的一個節點競争到資源時,它并不是就馬上出隊了,而是将

head

指向自己。節點釋放鎖後依然不會主動出隊,而是等待下一個節點競争鎖成功後修改

head

的指向,将前任head踢出去。

2.AQS喚醒隊列的規則是什麼?

head

指向的節點成功釋放資源後,首先會判斷目前節點的

waitStatus

是否等于0,如果等于0就不會去喚醒後繼節點了,這也就是為什麼新的節點入隊休眠的前提是必須将前驅節點的

waitStatus

改為

SIGNAL(-1)

的原因,如果不改,後繼節點将不會被喚醒,就會導緻死鎖。

AQS首先會喚醒目前節點的直接後繼節點

next

,如果next為null,有兩種情況:

  1. 确實沒有後繼節點了,之前next指向的節點可能由于逾時等原因退出競争了。
  2. 存在後繼節點,隻是由于多線程的原因,後繼節點還沒來得及将目前節點的

    next

    指向它。

第一種情況好辦,後繼節點為null,不喚醒就是了。

第二種情況就需要從tail向head尋找了,找到了有效節點再喚醒。

如果存在直接後繼節點,但是節點的

waitStatus

大于0,AQS也是會選擇跳過它的。前面已經說過,

waitStatus

大于0的節點代表無效節點,如

CANCELLED(1)

是已經取消競争的節點。如果直接後繼節點是無效節點的話,AQS會從tail開始向head周遊,直到找到有效節點,再将其喚醒。

總結:存在直接後繼節點且節點有效,則優先喚醒後繼節點。否則,從tail向head周遊,直到找到有效節點再喚醒。

3.喚醒節點為什麼要從尾巴開始?

這是因為,新節點入隊時,需要執行三個步驟:

  1. 目前節點的prev指向前任tail
  2. CAS将tail指向目前節點
  3. 前任tail的next指向目前節點

這三個操作AQS并沒有做同步處理,如果在執行步驟2後CPU時間片到期了,此時的節點指向是這樣的:

java-AQS簡介一、什麼是AQS二、AQS架構三、源碼LockSupport四、注意點

前驅節點的next還沒有指派,如果從頭向尾找,就可能會存在漏喚醒的問題。

而prev的指派先于tail的CAS操作之前執行,是以從尾向頭找,就可以避免這個問題。