天天看點

從synchronize到CSA和AQS

目錄

  • ​導論​
  • ​​悲觀鎖和樂觀鎖​​
  • ​​公平鎖和非公平鎖​​
  • ​​可重入鎖和不可重入鎖​​
  • ​Synchronized 關鍵字​
  • ​實作原理​
  • ​​Java 對象頭​​
  • ​​Monitor​​
  • ​​JVM 對 synchronized 的處理​​
  • ​JVM 對 synchronized 的優化​
  • ​​自旋鎖與自适應自旋​​
  • ​​鎖消除​​
  • ​​鎖粗化​​
  • ​​輕量級鎖​​
  • ​​偏向鎖​​
  • ​CAS​
  • ​​操作模型​​
  • ​​重試機制(循環 CAS)​​
  • ​​底層實作​​
  • ​​ABA問題?​​
  • ​​可重入鎖 ReentrantLock​​
  • ​AQS​
  • ​​AQS中Node常量含義​​
  • ​​Condition隊列​​
  • ​​獨占模式下的AQS​​
  • ​​請求鎖​​
  • ​​建立 Node 節點并加傳入連結表​​
  • ​​挂起等待​​
  • ​​釋放鎖​​
  • ​​公平鎖如何實作​​
  • ​可重入讀寫鎖 ReentrantReadWriteLock​
  • ​​讀寫鎖機制​​
  • ​​實作原理​​

導論

Java 中的并發鎖大緻分為隐式鎖和顯式鎖兩種。

隐式鎖就是我們最常使用的 synchronized 關鍵字,顯式鎖主要包含兩個接口:Lock 和 ReadWriteLock,主要實作類分别為 ReentrantLock 和 ReentrantReadWriteLock,

這兩個類都是基于 AQS(AbstractQueuedSynchronizer) 實作的。還有的地方将 CAS 也稱為一種鎖,在包括 AQS 在内的很多并發相關類中,CAS 都扮演了很重要的角色。

就是一些名詞的概念,知道他們是什麼中文意思,知道了解意思之後在學習代碼的實作就比較輕松了。

這個文章也算帶着看了 AbstractQueuedSynchronizer 和 ReentrantReadWriteLock 和 ReentrantLock 的源碼。

悲觀鎖和樂觀鎖

悲觀鎖和獨占鎖是一個意思,

它假設一定會發生沖突,是以擷取到鎖之後會阻塞其他等待線程。

這麼做的好處是簡單安全,但是挂起線程和恢複線程都需要轉入核心态進行,這樣做會帶來很大的性能開銷。

悲觀鎖的代表是 synchronized。

然而在真實環境中,大部分時候都不會産生沖突。悲觀鎖會造成很大的浪費。

而樂觀鎖不一樣,它假設不會産生沖突,先去嘗試執行某項操作,失敗了再進行其他處理(一般都是不斷循環重試)。

這種鎖不會阻塞其他的線程,也不涉及上下文切換,性能開銷小。代表實作是 CAS。

公平鎖和非公平鎖

公平鎖是指各個線程在加鎖前先檢查有無排隊的線程,按排隊順序去獲得鎖。

非公平鎖是指線程加鎖前不考慮排隊問題,直接嘗試擷取鎖,擷取不到再去隊尾排隊。

值得注意的是,在 AQS 的實作中,一旦線程進入排隊隊列,即使是非公平鎖,線程也得乖乖排隊。

可重入鎖和不可重入鎖

如果一個線程已經擷取到了一個鎖,那麼它可以通路被這個鎖鎖住的所有代碼塊。

不可重入鎖與之相反。

Synchronized 關鍵字

Synchronized 是一種獨占鎖。

在修飾靜态方法時,鎖的是類,如 Object.class。

修飾非靜态方法時,鎖的是對象,即 this。修飾方法塊時,鎖的是括号裡的對象。

每個對象有一個鎖和一個等待隊列,鎖隻能被一個線程持有,其他需要鎖的線程需要阻塞等待。

鎖被釋放後,對象會從隊列中取出一個并喚醒,喚醒哪個線程是不确定的,不保證公平性。

實作原理

synchronized 是基于 Java 對象頭和 Monitor 機制來實作的。

Java 對象頭

一個對象在記憶體中包含三部分:對象頭,執行個體資料和對齊填充。

其中 Java 對象頭包含兩部分:

  • Class Metadata Address (類型指針)。存儲類的中繼資料的指針。虛拟機通過這個指針找到它是哪個類的執行個體。
  • Mark Word(标記字段)。存出一些對象自身運作時的資料。包括哈希碼,GC 分代年齡,鎖狀态标志等。

Monitor

Mark Word 有一個字段指向 monitor 對象。

monitor 中記錄了鎖的持有線程,等待的線程隊列等資訊。

前面說的每個對象都有一個鎖和一個等待隊列,就是在這裡實作的。 monitor 對象由 C++ 實作。其中有三個關鍵字段:

  • _owner 記錄目前持有鎖的線程
  • _EntryList 是一個隊列,記錄所有阻塞等待鎖的線程
  • _WaitSet 也是一個隊列,記錄調用 wait() 方法并還未被通知的線程。
從synchronize到CSA和AQS

Monitor的操作機制如下:

  1. 多個線程競争鎖時,會先進入 EntryList 隊列。競争成功的線程被标記為 Owner。其他線程繼續在此隊列中阻塞等待。
  2. 如果 Owner 線程調用 wait() 方法,則其釋放對象鎖并進入 WaitSet 中等待被喚醒。Owner 被置空,EntryList 中的線程再次競争鎖。
  3. 如果 Owner 線程執行完了,便會釋放鎖,Owner 被置空,EntryList 中的線程再次競争鎖。

JVM 對 synchronized 的處理

上面了解了 monitor 的機制,那虛拟機是如何将 synchronized 和 monitor 關聯起來的呢?

分兩種情況:

  • 如果同步的是代碼塊,編譯時會直接在同步代碼塊前加上 monitorenter 指令,代碼塊後加上 monitorexit 指令。這稱為顯示同步。
  • 如果同步的是方法,虛拟機會為方法設定 ACC_SYNCHRONIZED 标志。調用的時候 JVM 根據這個标志判斷是否是同步方法。

JVM 對 synchronized 的優化

synchronized 是重量級鎖,由于消耗太大,虛拟機對其做了一些優化。

自旋鎖與自适應自旋

在許多應用中,鎖定狀态隻會持續很短的時間,為了這麼一點時間去挂起恢複線程,不值得。

我們可以讓等待線程執行一定次數的循環,在循環中去擷取鎖。這項技術稱為自旋鎖,它可以節省系統切換線程的消耗,但仍然要占用處理器。

在 JDK1.4.2 中,自選的次數可以通過參數來控制。 JDK 1.6又引入了自适應的自旋鎖,不再通過次數來限制,而是由前一次在同一個鎖上的自旋時間及鎖的擁有者的狀态來決定。

鎖消除

虛拟機在運作時,如果發現一段被鎖住的代碼中不可能存在共享資料,就會将這個鎖清除。

鎖粗化

當虛拟機檢測到有一串零碎的操作都對同一個對象加鎖時,會把鎖擴充到整個操作序列外部。如 StringBuffer 的 append 操作。

輕量級鎖

對絕大部分的鎖來說,在整個同步周期内都不存在競争。如果沒有競争,輕量級鎖可以使用 CAS 操作避免使用互斥量的開銷。

偏向鎖

偏向鎖的核心思想是,如果一個線程獲得了鎖,那麼鎖就進入偏向模式,當這個線程再次請求鎖時,無需再做任何同步操作,即可擷取鎖。

CAS

操作模型

CAS 是 compare and swap 的簡寫,即比較并交換。

它是指一種操作機制,而不是某個具體的類或方法。

在 Java 平台上對這種操作進行了包裝。在 Unsafe 類中,調用代碼如下:

unsafe.compareAndSwapInt(this, valueOffset, expect, update);
      

它需要三個參數,分别是記憶體位置 V,舊的預期值 A 和新的值 B。

操作時,先從記憶體位置讀取到值,然後和預期值A比較。如果相等,則将此記憶體位置的值改為新值 B,傳回 true。如果不相等,說明和其他線程沖突了,則不做任何改變,傳回 false。

這種機制在不阻塞其他線程的情況下避免了并發沖突,比獨占鎖的性能高很多。 CAS 在 Java 的原子類和并發包中有大量使用。

重試機制(循環 CAS)

第一,CAS 本身并未實作失敗後的處理機制,它隻負責傳回成功或失敗的布爾值,後續由調用者自行處理。隻不過我們最常用的處理方式是重試而已。

第二,這句話很容易了解錯,被了解成重新比較并交換。實際上失敗的時候,原值已經被修改,如果不更改期望值,再怎麼比較都會失敗。而新值同樣需要修改。

是以正确的方法是,使用一個死循環進行 CAS 操作,成功了就結束循環傳回,失敗了就重新從記憶體讀取值和計算新值,再調用 CAS。看下 AtomicInteger 的源碼就什麼都懂了:

​​CAS-Java基礎​​

public final int incrementAndGet () {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
    }
}
      

底層實作

CAS 主要分三步,讀取-比較-修改。

其中比較是在檢測是否有沖突,如果檢測到沒有沖突後,其他線程還能修改這個值,那麼 CAS 還是無法保證正确性。是以最關鍵的是要保證比較-修改這兩步操作的原子性。

CAS 底層是靠調用 CPU 指令集的 cmpxchg 完成的,它是 x86 和 Intel 架構中的 compare and exchange 指令。在多核的情況下,這個指令也不能保證原子性,需要在前面加上 lock 指令。

lock 指令可以保證一個 CPU 核心在操作期間獨占一片記憶體區域。那麼 這又是如何實作的呢?

在處理器中,一般有兩種方式來實作上述效果:總線鎖和緩存鎖。

在多核處理器的結構中,CPU 核心并不能直接通路記憶體,而是統一通過一條總線通路。

總線鎖就是鎖住這條總線,使其他核心無法通路記憶體。這種方式代價太大了,會導緻其他核心停止工作。

而緩存鎖并不鎖定總線,隻是鎖定某部分記憶體區域。當一個 CPU 核心将記憶體區域的資料讀取到自己的緩存區後,它會鎖定緩存對應的記憶體區域。鎖住期間,其他核心無法操作這塊記憶體區域。

CAS 就是通過這種方式實作比較和交換操作的原子性的。

值得注意的是, CAS 隻是保證了操作的原子性,并不保證變量的可見性,是以變量需要加上 volatile 關鍵字。

ABA問題?

比如是有兩個線程A,B ,一個變量:蘋果

A線程期望拿到一個蘋果

B線程一進來把蘋果改成了梨子,但是在最後結束的時候又把梨子換成了蘋果

A線程在此期間是不知情的,以為自己拿到的蘋果還是原來的那一個,其實已經被換過了

如何解決ABA問題?

原子引用-解決ABA問題

AtomicStampedReference

類似于樂觀鎖,比較時會去對比版本号,确認變量是否被換過了。

可重入鎖 ReentrantLock

ReentrantLock 使用代碼實作了和 synchronized 一樣的語義,包括可重入,保證記憶體可見性和解決競态條件問題等。相比 synchronized,它還有如下好處:

  • 支援以非阻塞方式擷取鎖
  • 可以響應中斷
  • 可以限時
  • 支援了公平鎖和非公平鎖

基本用法如下:

public class Counter {
private final Lock lock = new ReentrantLock();
private volatile int count;

public void incr() {
lock.lock();
try {
count++;
        } finally {
lock.unlock();
        }
    }

public int getCount() {
return count;
    }
}
      

ReentrantLock 内部有兩個内部類,分别是 FairSync 和 NoFairSync,對應公平鎖和非公平鎖。他們都繼承自 Sync。Sync 又繼承自AQS。

AQS

AQS全稱​

​AbstractQueuedSynchronizer​

​,即抽象的隊列同步器,是一種用來建構鎖和同步器的架構。

AQS 中有兩個重要的成員:

  • 成員變量 state。用于表示鎖現在的狀态,用 volatile 修飾,保證記憶體一緻性。

    同時所用對 state 的操作都是使用 CAS 進行的。

    state 為0表示沒有任何線程持有這個鎖,線程持有該鎖後将 state 加1,釋放時減1。

    多次持有釋放則多次加減。

  • 還有一個雙向連結清單,連結清單除了頭結點外,每一個節點都記錄了線程的資訊,代表一個等待線程。這是一個 FIFO 的連結清單。
從synchronize到CSA和AQS

下面以 ReentrantLock 非公平鎖的代碼看看 AQS 的原理。

AQS中Node常量含義

CANCELLED

waitStatus值為1時表示該線程節點已釋放(逾時、中斷),已取消的節點不會再阻塞。

SIGNAL

waitStatus為-1時表示該線程的後續線程需要阻塞,即隻要前置節點釋放鎖,就會通知辨別為 SIGNAL 狀态的後續節點的線程

CONDITION

waitStatus為-2時,表示該線程在condition隊列中阻塞(Condition有使用)

PROPAGATE

waitStatus為-3時,表示該線程以及後續線程進行無條件傳播(CountDownLatch中有使用)共享模式下, PROPAGATE 狀态的線程處于可運作狀态

Condition隊列

除了同步隊列之外,AQS中還存在Condition隊列,這是一個單向隊列。

調用ConditionObject.await()方法,能夠将目前線程封裝成Node加入到Condition隊列的末尾,

然後将擷取的同步狀态釋放(即修改同步狀态的值,喚醒在同步隊列中的線程)。

Condition隊列也是FIFO。調用ConditionObject.signal()方法,能夠喚醒firstWaiter節點,将其添加到同步隊列末尾。

獨占模式下的AQS

所謂獨占模式,即隻允許一個線程擷取同步狀态,當這個線程還沒有釋放同步狀态時,其他線程是擷取不了的,隻能加入到同步隊列,進行等待。

很明顯,我們可以将state的初始值設為0,表示空閑。

當一個線程擷取到同步狀态時,利用CAS操作讓state加1,表示非空閑,那麼其他線程就隻能等待了。

釋放同步狀态時,不需要CAS操作,因為獨占模式下隻有一個線程能擷取到同步狀态。

ReentrantLock、CyclicBarrier正是基于此設計的。

例如,ReentrantLock,state初始化為0,表示未鎖定狀态。

A線程lock()時,會調用tryAcquire()獨占該鎖并将state+1

獨占模式下的AQS是不響應中斷的,指的是加入到同步隊列中的線程,如果因為中斷而被喚醒的話,

不會立即傳回,并且抛出InterruptedException。

而是再次去判斷其前驅節點是否為head節點,決定是否争搶同步狀态。

如果其前驅節點不是head節點或者争搶同步狀态失敗,那麼再次挂起。

從synchronize到CSA和AQS

請求鎖

請求鎖時有三種可能:

  1. 如果沒有線程持有鎖,則請求成功,目前線程直接擷取到鎖。
  2. 如果目前線程已經持有鎖,則使用 CAS 将 state 值加1,表示自己再次申請了鎖,釋放鎖時減1。這就是可重入性的實作。
  3. 如果由其他線程持有鎖,那麼将自己添加進等待隊列。
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock.  Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
//沒有線程持有鎖時,直接擷取鎖,對應情況1
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
        }

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
        }
    }


/**以獨占模式擷取,忽略中斷。通過至少調用一次 {@link tryAcquire} 實作,成功傳回。
否則線程會排隊,可能會反複阻塞和解除阻塞,調用 {@link tryAcquire} 直到成功。
該方法可用于實作方法{@link Locklock}。 
@param arg 擷取參數。這個值被傳送到 {@link tryAcquire},但不會被解釋,可以代表任何你喜歡的東西。*/

public final void acquire(int arg) {
if (!tryAcquire(arg) &&//在此方法中會判斷目前持有線程是否等于自己,對應情況2
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//将自己加入隊列中,對應情況3
selfInterrupt();
    }
      

建立 Node 節點并加傳入連結表

如果沒競争到鎖,這時候就要進入等待隊列。

隊列是預設有一個 head 節點的,并且不包含線程資訊。

上面情況3中,addWaiter 會建立一個 Node,并添加到連結清單的末尾,Node 中持有目前線程的引用。同時還有一個成員變量 waitStatus,表示線程的等待狀态,初始值為0。我們還需要關注兩個值:

  • CANCELLED,cancelled,值為1,表示取消狀态,就是說我不要這個鎖了,請你把我移出去。
  • SINGAL,singal,值為-1,表示下一個節點正在挂起等待,注意是下一個節點,不是目前節點。

同時,加到連結清單末尾的操作使用了 CAS+死循環的模式,很有代表性,拿出來看一看:

/**

AbstractQueuedSynchronizer

為目前線程和給定模式建立和排隊節點。 
@param mode Node.EXCLUSIVE 表示獨占,Node.SHARED 表示共享 @return 新節點
*/

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
            }
        }
enq(node);
return node;
    }
      

可以看到,在死循環裡調用了 CAS 的方法。

如果多個線程同時調用該方法,那麼每次循環都隻有一個線程執行成功,其他線程進入下一次循環,重新調用。

N個線程就會循環N次。這樣就在無鎖的模式下實作了并發模型。

挂起等待

  • 如果此節點的上一個節點是頭部節點,則再次嘗試擷取鎖,擷取到了就移除并傳回。擷取不到就進入下一步;
  • 判斷前一個節點的 waitStatus,如果是 SINGAL,則傳回 true,并調用 LockSupport.park() 将線程挂起;
  • 如果是 CANCELLED,則将前一個節點移除;
  • 如果是其他值,則将前一個節點的 waitStatus 标記為 SINGAL,進入下一次循環。

可以看到,一個線程最多有兩次機會,還競争不到就去挂起等待。

/**
以獨占不間斷模式擷取已在隊列中的線程。
由條件等待方法以及擷取使用。 
@param node 節點 
@param arg 擷取參數 
@return {@code true} 如果在等待時中斷
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
                }
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
            }
        } finally {
if (failed)
cancelAcquire(node);
        }
    }

      

釋放鎖

  • 調用 tryRelease,此方法由子類實作。實作非常簡單,如果目前線程是持有鎖的線程,就将 state 減1。減完後如果 state 大于0,表示目前線程仍然持有鎖,傳回 false。如果等于0,表示已經沒有線程持有鎖,傳回 true,進入下一步;
  • 如果頭部節點的 waitStatus 不等于0,則調用LockSupport.unpark()喚醒其下一個節點。頭部節點的下一個節點就是等待隊列中的第一個線程,這反映了 AQS 先進先出的特點。另外,即使是非公平鎖,進入隊列之後,還是得按順序來。
public final boolean release(int arg) {
    if (tryRelease(arg)) { //将 state 減1
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) { 
        s = null;
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    if (s != null) //喚醒第一個等待的線程
        LockSupport.unpark(s.thread);
}
      

公平鎖如何實作

上面分析的是非公平鎖,那公平鎖呢?很簡單,在競争鎖之前判斷一下等待隊列中有沒有線程在等待就行了。

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && //判斷等待隊列是否有節點
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
        }
    }
    ......
return false;
}

      

可重入讀寫鎖 ReentrantReadWriteLock

讀寫鎖機制

了解 ReentrantLock 和 AQS 之後,再來了解讀寫鎖就很簡單了。

讀寫鎖有一個讀鎖和一個寫鎖,分别對應讀操作和鎖操作。

鎖的特性如下:

  • 隻有一個線程可以擷取到寫鎖。在擷取寫鎖時,隻有沒有任何線程持有任何鎖才能擷取成功;
  • 如果有線程正持有寫鎖,其他任何線程都擷取不到任何鎖;
  • 沒有線程持有寫鎖時,可以有多個線程擷取到讀鎖。

讀寫鎖雖然有兩個鎖,但實際上隻有一個等待隊列。

  • 擷取寫鎖時,要保證沒有任何線程持有鎖;
  • 寫鎖釋放後,會喚醒隊列第一個線程,可能是讀鎖和寫鎖;
  • 擷取讀鎖時,先判斷寫鎖有沒有被持有,沒有就可以擷取成功;
  • 擷取讀鎖成功後,會将隊列中等待讀鎖的線程挨個喚醒,知道遇到等待寫鎖的線程位置;
  • 釋放讀鎖時,要檢查讀鎖數,如果為0,則喚醒隊列中的下一個線程,否則不進行操作。