天天看點

【java基礎】并發閑談三:CAS與AQS

寫在最前,本人也隻是個大三的學生,如果你發現任何我寫的不對的,請在評論中指出。

預設版本 JDK8

CAS

  鎖是分兩種的,一種是悲觀鎖,另一種是樂觀鎖。

  

悲觀鎖

就是常說的鎖,像 Lock、synchronized 這種。悲觀鎖總是認為要通路的共享資源是要發生沖突的,是以每次都會在通路的時候加上鎖,以此保證每次通路共享資源的時候隻有一個線程。

  

樂觀鎖

是純粹的樂天派,它是認為要通路的共享資源是不會發生沖突的,無需加鎖也無需等待,自然能夠避免死鎖。而一旦多個線程發生沖突,樂觀鎖通常是使用一種稱為CAS的技術來保證線程執行的安全性。

CAS概念了解

  CAS全拼是比較并交換(Compare And Swap)。在CAS中,有這樣三個值:

  • V:要更新的變量 var
  • E:期望的值 expected
  • N:新值 new

整體過程如下: 首先判斷 V 是否等于 E,如果等于,将 V 的值設定為 N;如果不等,說明其他線程更新了該值,則目前線程放棄更新,什麼都不做。是以這裡的 E 本質是一個舊值。

可以舉例來說明:

  1. 假設一個線程想要将共享變量 i=5 更新為6
  2. 情況1是我們首先去判斷i此時是等于5的,說明沒有被其他線程所更改,那麼就會把值設定為6,此次 CAS 成功,i的值等于6
  3. 情況2是發生i的值不是期望的5,而是其他什麼值,那麼就會判定此次的 CAS 失敗,然後什麼也不做,i 的值保持原狀。

  你可能會疑惑,那假如你判斷 i 等于5之後,正要修改,然後被其他線程更新了值怎麼辦?

我剛開始也有這種疑惑,實際上 CAS 操作是原子性的,它畢竟是 native 方法,是 JVM 底層或 C++ 實作的,它是一種系統原語,是一條 CPU 原子指令,從 CPU 層面保證了它的原子性。

CAS實作技術——Unsafe

  

Unsafe 是 sun.misc 包下的,

它裡面一些 native 是關于 CAS 的。Unsafe 中對 CAS 的實作是 C++ 寫的,它的具體實作和作業系統、CPU 都有關系。

  用到 CAS 技術的可以參照一下各種原子類,比如:

AtomicInteger、AtomicBoolean、AtomicLong等等......

  這裡就用 AtomicInteger 類的 getAndAdd( int delta ) 來說明:

/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the previous value
*/
// 可以看到 AtomicInteger 類實際上也是調用 Unsafe 類裡的 getAndAddInt 方法來實作功能的
public final int getAndAdd(int delta) {
   return unsafe.getAndAddInt(this, valueOffset, delta);
}

@HotSpotIntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
    	// 擷取舊值的操作
        v = getIntVolatile(o, offset);
    } while (!weakCompareAndSetInt(o, offset, v, v + delta));
    return v;
}
           

  我們來一步步解析這段源碼。首先,對象o是this,也就是一個AtomicInteger對象。然後offset是一個常量VALUE ➡

private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value")。

  因為CAS是無鎖的,它允許操作失敗,是以一般都是搭配while循環來進行的,這裡采用的是do while循環,是保證循環體内的代碼至少執行一次的。它這裡聲明了一個變量v, 如果你執行失敗那就傳回原值v, 如果執行成功那麼就是 v + delta。

這裡的 weakCompareAndSetInt 方法我并不是太清晰,是以采用百度上的說法:

簡單來說,weakCompareAndSet操作僅保留了volatile自身變量的特性,而除去了happens-before規則帶來的記憶體語義。也就是說,weakCompareAndSet無法保證處理操作目标的volatile變量外的其他變量的執行順序( 編譯器和處理器為了優化程式性能而對指令序列進行重新排序 ),同時也無法保證這些變量的可見性。這在一定程度上可以提高性能。

CAS不得不注意的問題

  1. ABA的問題: CAS是無法注意到: **假設一個共享變量從A到B又從B到A的變化的,**此時可以采用的操作就是加上版本号或者時間戳(AtomicStampedReference類可以解決)。
  2. 循環時間長: 因為CAS基本多與自旋配合,那麼長時間不成功,會導緻CPU資源浪費。推薦JVM支援處理器提供的pause指令。
  3. 隻能保證一個共享變量是原子操作的(AtomicReference類可以解決)。

AQS

  AQS是AbstractQueuedSynchronizer的簡稱,翻譯過來就是

抽象隊列同步器

,它也可以拆分來了解:

  • 抽象: 提供了一種定義的方式,但是不包括具體的實作。
  • 隊列:使用了基于隊列的方式去實作。
  • 同步:實作了同步的功能。

  AQS是一系列并發流程控制類的基石,比如:

CountDownLatch / CyclicBarrier / Semaphore / ReentrantLock ReentrantReadWriteLock 等等。。。

同步器設計思路

  在具體分析之前,需要了解兩種同步的方式,

獨占模式和共享模式:

  • 獨占模式: 指的是目前資源僅供一個線程通路(ReentrantLock )
  • 共享模式: 指的是同時可以被多個線程所擷取, 具體的資源的個數可以通過參數指定(CountDownLatch 、CyclicBarrier )。

  以下是AQS的設計方案:

  1. 設定一個擁有可見性的變量

    int state = 0

    ,使用這個變量表示被擷取的資源的數量
  2. 線程在擷取資源之前需要檢查

    state

    的狀态,如果為0,就設定為1,表示擷取資源成功,如果不為0,則表示目前資源已經被占用,此時線程要阻塞并等待其他線程釋放資源。
  3. 為了能使得資源釋放後找到那些被阻塞的線程,需要把這些線程放到一個FIFO的隊列内。
  4. 當占用資源的線程釋放資源後,就可以從隊列裡取出一個正在阻塞的線程并喚醒執行。

AQS的資料成員:

在AQS中被定義的成員變量:

  1. state

    作用就是第一條。
state是被volatile修飾的變量, 保證了自身的可見性。 并且對它的操作都是
原子性的,state的通路方式有三種:
- getState()
- setState()
- compareAndSetState()
其中 compareAndSetState() 是借用了Unsafe的compareAndSwapInt() 方法實作的。	
           
  1. head

    tail

    ,定義了上面第四條中的FIFO隊列,head和tail分别指向隊列的頭部和尾部,并且這是一個雙向隊列。
  2. 定義

    Node

    ,隊列中的每個元素都是

    Node

    節點(相當重要)
static final class Node{
	  //标記一個結點(對應的線程)在共享模式下等待
      static final Node SHARED = new Node();
      // 标記一個結點(對應的線程)在獨占模式下等待
      static final Node EXCLUSIVE = null; 
		
	  // waitStatus的值,表示該結點(對應的線程)已被取消
      static final int CANCELLED = 1; 
      //waitStatus的值,表示後繼結點(對應的線程)需要被喚醒
      static final int SIGNAL = -1;
      //waitStatus的值,表示該結點(對應的線程)在等待某一條件
      static final int CONDITION = -2;
      /*waitStatus的值,表示有資源可用,新head結點需要繼續喚醒後繼結點(共享模式下,多線程并發釋放資源,而head喚醒其後繼結點後,需要把多出來的資源留給後面的結點;設定新的head結點時,會繼續喚醒其後繼結點)*/
      static final int PROPAGATE = -3;
      
       // 等待狀态,取值範圍,-3,-2,-1,0,1
      volatile int waitStatus;
      volatile Node prev; // 前驅結點
      volatile Node next; // 後繼結點
      volatile Thread thread; // 結點對應的線程
      Node nextWaiter; // 等待隊列裡下一個等待條件的結點

      //成員方法忽略,可以參考具體的源碼
}
           

AQS主要方法

  AQS的設計時基于模闆方法模式的,它有一些方法必須要子類去實作的,主要有:

  • isHeldExclusively():該線程是否正在獨占資源。隻有用到condition才需要去實作它。
  • tryAcquire(int):獨占方式。嘗試擷取資源,成功則傳回true,失敗則傳回false。
  • tryRelease(int):獨占方式。嘗試釋放資源,成功則傳回true,失敗則傳回false。
  • tryAcquireShared(int):共享方式。嘗試擷取資源。負數表示失敗;0表示成功,但是沒有剩餘資源;正數表示成功,且有剩餘資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待節點傳回true,否則傳回false。

  這些方法都是

protected

方法,但是它們并沒有在AQS具體實作,而是直接抛出異常(這裡不适用抽象方法的目的是:避免強迫子類中把所有的抽象方法都實作一遍,減少無用功,這樣子類隻需要關注自己關心的抽象方法即可)。

AQS擷取資源

  擷取資源的入口是

acquire(int arg)方法。

arg是要擷取資源的個數,在獨占模式下始終為1,可以浏覽一下這個函數的代碼:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
           

  首先調用tryAcquire(arg)嘗試去擷取資源。前面提到了這個方法是在子類具體實作的。

如果擷取資源失敗,就通過addWaiter(Node.EXCLUSIVE)方法把這個線程插入到等待隊列中。其中傳入的參數代表要插入的Node是獨占式的。這個方法的具體實作:

// 為目前線程和給定模式建立和劃分節點
 private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 找到尾節點
    Node pred = tail;
    // 如果尾節點不為空
    if (pred != null) {
    	// 将目前節點的前置節點置為尾節點
        node.prev = pred;
        // 使用CAS嘗試  如果成功就傳回
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //  如果等待隊列為空或者上述CAS失敗,再自旋CAS插入
    enq(node);
    return node;
}

// final 不變性
// 自旋CAS将節點插入隊列,必要時初始化
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 初始化
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
           
上面的兩個函數好了解,就是在隊列的尾部插入新的Node節點,但是需要注意的是會出現多個線程同時争奪資源的情況,是以肯定會出現多個線程同時插入節點的操作,在這裡是通過CAS自旋的方式保證了操作的線程安全性。

  現在可以去看看

acquireQueued

方法,該方法就是把處于等待隊列的結點從頭到尾一個一個擷取:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 自旋
        for (;;) {
            final Node p = node.predecessor();
            // 如果node的前驅結點p是head,表示node是第二個結點,就可以嘗試去擷取資源了
            if (p == head && tryAcquire(arg)) {
                // 拿到資源後,将head指向該結點。
                // 是以head所指的結點,就是目前擷取到資源的那個結點或null。
                setHead(node); 
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果自己可以休息了,就進入waiting狀态,直到被unpark()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
           

AQS釋放資源

釋放資源相對于擷取資源來說,會簡單許多。在AQS中自會有一小段實作。源碼如下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    // 如果狀态是負數,嘗試把它設定為0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 得到頭結點的後繼結點head.next
    Node s = node.next;
    // 如果這個後繼結點為空或者狀态大于0
    // 通過前面的定義我們知道,大于0隻有一種可能,就是這個結點已被取消
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 等待隊列中所有還有用的結點,都向前移動
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果後繼結點不為空,
    if (s != null)
        LockSupport.unpark(s.thread);
}
           

繼續閱讀