天天看點

并發程式設計專題(五)抽象隊列同步器AQS應用Lock詳解

1、前言

Java并發程式設計核心在于java.util.concurrent包, 而juc當中的大多數同步器實作都是圍繞着共同的基礎行為,比如等待隊列、條件隊列、獨占擷取、共享擷取等,而這個行為的抽象就是基于AbstractQueuedSynchronizer簡稱AQS,AQS定義了一套多線程通路共享資源的同步器架構,是一個依賴狀态(state)的同步器.

AQS具備的特性

  • 阻塞等待隊列
  • 共享/獨占
  • 公平/非公平
  • 可重入
  • 允許中斷

juc包中的同步器的實作例如Latch,Barrier,BlockingQuene等,都是基于AQS架構實作

1、AQS内部維護屬性 volatile int state (32位)

  • state表示資源的可用狀态

2、state三種通路方式

getState()、setState()、compareAndSetState()

3、AQS定義兩種資源共享方式

  • Exclusive-獨占,隻有一個線程能執行,如ReentrantLock
  • Share-共享,多個線程可以同時執行,如Semaphore/CountDownLatch

4、AQS定義兩種隊列

  • 同步等待隊列
  • 條件等待隊列

不同的自定義同步器争用共享資源的方式也不同。自定義同步器在實作時隻需要實作共享資源state的擷取與釋放方式即可,至于具體線程等待隊列的維護(如擷取資源失敗入隊/喚醒出隊等),AQS已經在頂層實作好了。自定義同步器實作時主要實作以下幾種方法:

  • isHeldExclusively():該線程是否正在獨占資源。隻有用到condition才需要去實作它。
  • tryAcquire(int):獨占方式。嘗試擷取資源,成功則傳回true,失敗則傳回false。
  • tryRelease(int):獨占方式。嘗試釋放資源,成功則傳回true,失敗則傳回false。
  • tryAcquireShared(int):共享方式。嘗試擷取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點傳回true,否則傳回false。

說一下AQS中定義的兩種隊列

同步等待隊列:隻有一個線程搶占到鎖運作,其他線程在依次排隊等待

AQS當中的同步等待隊列也稱CLH隊列,CLH隊列是Craig、Landin、Hagersten三人發明的一種基于雙向連結清單資料結構的隊列,是FIFO先入先出線程等待隊列,Java中的CLH隊列是原CLH隊列的一個變種,線程由原自旋機制改為阻塞機制。

并發程式設計專題(五)抽象隊列同步器AQS應用Lock詳解

(紅色:表示線程在排隊等待 )

條件等待隊列:Condition是一個多線程間協調通信的工具類,使得某個,或者某些線程一起等待某個條件(Condition),隻有當該條件具備時,這些等待線程才會被喚醒,進而重新争奪鎖

并發程式設計專題(五)抽象隊列同步器AQS應用Lock詳解

(紅色:表示線程在排隊等待 )

2、AQS源碼分析

打開AQS源碼,可以看到一個靜态内部類Node,不管是等待隊列還是同步隊列,都是基于Node實作的

1、Node内部定義的常量

  • 表示節點的模式SHARED/EXCLUSIVE
  • 表示節點在隊列中變換的狀态值
  • 定義兩種隊列
//标記節點是共享模式
static final Node SHARED = new Node();


//标記節點是獨占模式      
static final Node EXCLUSIVE = null;


//在同步隊列中等待的線程等待逾時或者被中斷,需要從同步隊列中取消等待
static final int CANCELLED =  1;


//後繼節點的線程處于等待狀态,而目前的節點如果釋放了同步狀态或者被取消,
//将會通知後繼節點,使後繼節點的線程得以運作。
static final int SIGNAL    = -1;


//節點在等待隊列中,節點的線程等待在Condition上,
//當其他線程對Condition調用了signal()方法後,
//該節點會從等待隊列中轉移到同步隊列中,加入到同步狀态的擷取中      
static final int CONDITION = -2;


// 表示下一次共享式同步狀态擷取将會被無條件地傳播下去
static final int PROPAGATE = -3;


//标記目前節點的信号量狀态 (1,0,-1,-2,-3)5種狀态
//使用CAS更改狀态,volatile保證線程可見性,高并發場景下,
//即被一個線程修改後,狀态會立馬讓其他線程可見
volatile int waitStatus;


// 前驅節點,目前節點加入到同步隊列中被設定
volatile Node prev;


// 後繼結點
volatile Node next;


//節點同步狀态的線程
volatile Thread thread;
// 節點同步狀态的線程
Node nextWaiter;


// 傳回下一節點是否是共享
final boolean isShared() {
    return nextWaiter == SHARED;
}


// 傳回前驅節點
final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}
//空節點,用于标記共享模式
Node() {    // Used to establish initial head or SHARED marker
}


//用于同步隊列CLH
Node(Thread thread, Node mode) {     // Used by addWaiter
    this.nextWaiter = mode;
    this.thread = thread;
}   


//用于條件隊列
Node(Thread thread, int waitStatus) { // Used by Condition
     this.waitStatus = waitStatus;
     this.thread = thread;
}

           

看完這些變量,我麼可以大緻判斷每個節點(線程)在隊列中的狀态變換表示

2、同步等待隊列的一些核心方法

  • tryAcquire() :嘗試擷取資源,成功就傳回,就是一個嘗試加鎖方法
  • addWaiter() :将目前線程加到等待隊列的隊尾
  • acquireQueued():讓線程阻塞在等待隊列裡面擷取資源,直到擷取到資源才傳回,如果在等待的過程被中斷,就傳回false,否則傳回true。

2.1、tryAcquire():就是一個嘗試加鎖方法

這個方法AQS隻定義了一個接口,都交給我們具體的實作類(ReentrantLock...)來實作。

2.2、addWaiter():将目前線程加到等待隊列隊尾

private Node addWaiter(Node mode) {
    // 1. 将目前線程建構成Node類型
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // 2. 1目前尾節點是否為null?
    if (pred != null) {
        // 2.2 将目前節點尾插入的方式
        node.prev = pred;
        // 2.3 通過CAS将節點插入同步隊列的尾部
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果存在競争 搶占失敗,就通過自旋
    enq(node);
    return node;
}           

enq():搶占失敗,通過自旋加入到隊列尾部

private Node enq(final Node node) {
 // 自旋
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            //隊列為空需要初始化,建立空的頭節點
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            //set尾部節點
            if (compareAndSetTail(t, node)) {//目前節點置為尾部
                t.next = node; //前驅節點的next指針指向目前節點
                return t;
            }
        }
    }
}

           

2.3、acquireQueued():讓線程阻塞在等待隊列裡面擷取資源,直到擷取到資源才傳回

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
       boolean interrupted = false;
        //死循環
       for (;;) {
           //找到目前結點的前驅結點
           final Node p = node.predecessor();
           //如果前驅結點是頭結點,才tryAcquire,其他結點是沒有機會tryAcquire的。
           if (p == head && tryAcquire(arg)) {
               //擷取同步狀态成功,将目前結點設定為頭結點。
               setHead(node);
                // help GC
               p.next = null;
               failed = false;
               return interrupted;
           }
        //如果前驅節點不是Head,通過shouldParkAfterFailedAcquire
        // 判斷是否應該阻塞
        // 前驅節點信号量為-1,目前線程可以安全被parkAndCheckInterrupt用來阻塞線程 
           if (shouldParkAfterFailedAcquire(p, node) &&
                   parkAndCheckInterrupt())
               interrupted = true;
        }
    } finally {
        if (failed)
        // 終結掉正在擷取鎖的線程
            cancelAcquire(node);
    }
}

           

shouldParkAfterFailedAcquire():判斷目前節點能否被安全挂起park

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
   // 若前驅結點的狀态是SIGNAL,意味着目前結點可以被安全地park


         return true;
    if (ws > 0) {
     // 前驅節點狀态如果被取消狀态,将被移除出隊列
         do {
            node.prev = pred = pred.prev;
         } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
    //将其設定為SIGNAL狀态,然後目前結點才可以可以被安全地park
     // 目前驅節點waitStatus為 0 or PROPAGATE狀态時     
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}           

parkAndCheckInterrupt():阻塞目前節點,傳回目前Thread的中斷狀态

// LockSupport.park 
// 底層實作邏輯調用系統核心功能 pthread_mutex_lock 阻塞線程
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);//阻塞
    return Thread.interrupted();
}           

3、AQS主要實作源碼解析

ReentrantLock:同一時間隻有一個線程擷取到鎖

lock():

公平鎖:

  1. 調用tryAcquire()搶占鎖,成功設定目前線程為獨占模式(還有可重入變量)
  2. 搶占失敗,調用addWaiter(),acquireQueued()将目前節點加入到等待隊列等待擷取資源
final void lock() {
    acquire(1);
}
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 得到同步資源的狀态
    int c = getState();
    // 沒有被占用
    if (c == 0) {
        // 如果目前節點沒有前驅節點 
        // 且CAS成功 設定目前線程獨占資源 
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 判斷是目前線程
    else if (current == getExclusiveOwnerThread()) {
        // 可重入鎖 state加一
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

           

非公平鎖:

CAS操作搶占鎖,搶占成功,設定目前線程

final void lock() {
    if (compareAndSetState(0, 1))
         setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
    }           

unlock():

調用tryRelease()釋放鎖

protected final boolean tryRelease(int releases) {
// 可重入次數減一
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        // 設定搶占線程為null,
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}           

總結:公平和非公平鎖的差別就在于,公平鎖會根據自己的前驅節點的狀态,隻有前驅節點為空,才會去搶占鎖(也就是依次搶占),就是說根據加入到等待隊列中的順序搶占鎖,非公平鎖模式下,目前線程和等待隊列中的線程一起搶占鎖,各憑本事

繼續閱讀