天天看點

并發工具類:AQS有哪些作用?(二)

并發工具類:AQS有哪些作用?(二)

AQS封裝的加鎖和解鎖方法

AQS提供了獨占鎖和共享鎖兩種加鎖方式,每種方式都有響應中斷和不響應中斷的差別,是以AQS的鎖可以分為如下四類

  1. 不響應中斷的獨占鎖(acquire)
  2. 響應中斷的獨占鎖(acquireInterruptibly)
  3. 不響應中斷的共享鎖(acquireShared)
  4. 響應中斷的共享鎖(acquireSharedInterruptibly)

而釋放鎖的方式隻有兩種

  1. 獨占鎖的釋放(release)
  2. 共享鎖的釋放(releaseShared)

獨占鎖的加鎖

我們前面在寫入隊的邏輯的時候,直接将Thread放到Queue中就完事了。看看Doug Lea是如何寫的

AQS隊列,當隊列為空時,第一次生成兩個節點,第一個節點代表目前占有鎖的線程,第二個節點為搶鎖失敗的節點。不為空的時候,每次生成一個節點放入隊尾。

當把線程放入隊列中時,後續應該做哪些操作呢?

如果讓你寫是不是直接放入隊列中就完事了?但Doug Lea是這樣做的

  1. 如果目前線程是隊列中的第二個節點則再嘗試搶一下鎖(不是第二個節點就不用搶來,輪不到),這樣避免了頻繁的阻塞和喚醒線程,提高了效率
  2. 上鬧鐘,讓上一個線程來喚醒自己(後續會說到,即更改上一個節點的waitStatus)
  3. 阻塞

另外還得注意一下出隊的邏輯,當A線程釋放鎖,喚醒隊列中的B線程,A線程會從隊列中删除。那出隊這個事情由誰來做?是由被喚醒的線程來做,即B線程

從加鎖的模版方法開始分析

// 調用ReentrantLock.FairSync#lock方法其實就是調用acquire(1);
public final void acquire(int arg) {
  if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//擷取到鎖傳回false,否則傳回true
    selfInterrupt();//目前線程将自己中斷
}      
  1. 先嘗試擷取,如果擷取到直接退出,否則進入2
  2. 擷取鎖失敗,以獨占模式将線程包裝成Node放到隊列中
  3. 如果放入的節點是隊列的第二個節點,則再嘗試擷取鎖,因為此時鎖有可能釋放類,不是第二個節點就不用嘗試了,因為輪不到。如果擷取到鎖則将目前節點設為head節點,退出,否則進入4
  4. 設定好鬧鐘後将自己阻塞
  5. 線程被喚醒,重新競争鎖,擷取鎖成功,繼續執行。如果線程發生過中斷,則最後重置中斷标志位位true,即執行selfInterrupt()方法

從代碼層面詳細分析一波,走起

tryAcquire是讓子類實作的

protected boolean tryAcquire(int arg) {
  throw new UnsupportedOperationException();
}      

這裡通過抛出異常來告訴子類要重寫這個方法,為什麼不将這個方法定義為abstract方法呢?因為AQS有2種功能,獨占和共享,如果用abstract修飾,則子類需要同時實作兩種功能的方法,對子類不友好

  1. 當隊列不為空,嘗試将新節點通過CAS的方式設定為尾節點,如果成功,傳回附加着目前線程的節點
  2. 當隊列為空,或者新節點通過CAS的方式設定為尾節點失敗,進入enq方法
private Node addWaiter(Node mode) {
  Node node = new Node(Thread.currentThread(), mode);
  Node pred = tail;
  if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
      pred.next = node;
      return node;
    }
  }
  enq(node);
  return node;
}      
  1. 當隊列不為空,一直CAS,直到把新節點放入隊尾
  2. 當隊列為空,先往對列中放入一個節點,在把傳入的節點CAS為尾節點

前面已經說過了哈,AQS隊列為空時,第一次會放入2個節點

private Node enq(final Node node) {
  for (;;) {
    Node t = tail;
    // 隊列為空,進行初始化,
    if (t == null) {
      if (compareAndSetHead(new Node()))
        tail = head;
    } else {
      node.prev = t;
      if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
      }
    }
  }
}      

放入隊列後還要幹什麼?

  1. 如果是第二個節點再嘗試擷取一波鎖,因為此時有可能鎖已經釋放了,其他節點就不用了,因為還輪不到
  2. 上鬧鐘,讓别的線程喚醒自己
  3. 阻塞自己
// 自旋擷取鎖,直到擷取鎖成功,或者異常退出
// 但是并不是busy acquire,因為當擷取失敗後會被挂起,由前驅節點釋放鎖時将其喚醒
// 同時由于喚醒的時候可能有其他線程競争,是以還需要進行嘗試擷取鎖,展現的非公平鎖的精髓。
final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      // 擷取前繼節點
      final Node p = node.predecessor();
      // node節點的前繼節點是head節點,嘗試擷取鎖,如果成功說明head節點已經釋放鎖了
      // 将node設為head開始運作(head中不包含thread)
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        // 将第一個節點出隊
        p.next = null; // help GC
        failed = false;
        return interrupted;
      }
      // 擷取鎖失敗後是否可以挂起
      // 如果可以挂起,則阻塞目前線程(擷取鎖失敗的節點)
      if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}      

根據前繼節點的狀态,是否可以阻塞目前擷取鎖失敗的節點

一般情況會經曆如下2個過程

  1. 預設情況下上一個節點的waitStatus=0,是以會進入compareAndSetWaitStatus方法,通過cas将上一個節點的waitStatus設定為SIGNAL,然後return false
  2. shouldParkAfterFailedAcquire方法外面是一個死循環,當再次進入這個方法時,如果上一步cas成功,則會走第一個if,return true。接着執行parkAndCheckInterrupt,線程會阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  int ws = pred.waitStatus;
  // 前繼節點釋放時會unpark後繼節點,可以挂起
  if (ws == Node.SIGNAL)
    return true;
  if (ws > 0) {
    //将CANCELLED狀态的線程清理出隊列
    // 後面會提到為什麼會有CANCELLED的節點
    do {
      node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
  } else {
    // 将前繼節點的狀态設定為SIGNAL,代表釋放鎖時需要喚醒後面的線程
    // cas更新可能失敗,是以不能直接傳回true
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  }
  return false;
}      

shouldParkAfterFailedAcquire表示上好鬧鐘了,可以阻塞線程了。後續當線程被喚醒的時候會從return語句出繼續執行,然後進入acquireQueued方法的死循環,重新搶鎖。至此,加鎖結束。

// 挂起線程,傳回是否被中斷過
private final boolean parkAndCheckInterrupt() {
  // 阻塞線程
  LockSupport.park(this);
  // 傳回目前線程是否被調用過Thread#interrupt方法
  return Thread.interrupted();
}      

最後用一個流程圖來解釋不響應中斷的獨占鎖

并發工具類:AQS有哪些作用?(二)

入隊過程中有異常該怎麼辦?

這部分内容比較難,看着吃力的小夥伴跳過即可,對主流程影響不大

可以看到上面調用acquireQueued方法發生異常的時候,會調用cancelAcquire方法,我們就詳細分析一下這個cancelAcquire方法有哪些作用?

哪些地方執行發生異常會執行cancelAcquire?

可以看到調用cancelAcquire方法的有如下幾個部分

并發工具類:AQS有哪些作用?(二)

分析這些方法的調用,發現基本就是如下2個地方會發生異常

  1. 嘗試擷取鎖的方法如tryAcquire,這些一般是交給子類來實作的
  2. 當線程是被調用Thread#interrupt方法喚醒,如果要響應中斷,會抛出InterruptedException
并發工具類:AQS有哪些作用?(二)
并發工具類:AQS有哪些作用?(二)
//處理異常退出的node
private void cancelAcquire(Node node) {
  if (node == null)
    return;

  // 設定該節點不再關聯任何線程
  node.thread = null;

  // 跳過CANCELLED節點,找到一個有效的前繼節點
  Node pred = node.prev;
  while (pred.waitStatus > 0)
    node.prev = pred = pred.prev;

  // 擷取過濾後的有效節點的後繼節點
  Node predNext = pred.next;

  // 設定狀态為取消
  node.waitStatus = Node.CANCELLED;

  // case 1
  if (node == tail && compareAndSetTail(node, pred)) {
    compareAndSetNext(pred, predNext, null);
  } else {
    // case 2
    int ws;
    if (pred != head &&
      ((ws = pred.waitStatus) == Node.SIGNAL ||
       (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
      pred.thread != null) {
      Node next = node.next;
      if (next != null && next.waitStatus <= 0)
        compareAndSetNext(pred, predNext, next);
    } else {
      // case3
      unparkSuccessor(node);
    }

    node.next = node; // help GC
  }
}      

将node出隊有如下三種情況

  1. 目前節點是tail
  2. 目前節點不是head的後繼節點,也不是tail
  3. 目前節點是head的後繼節點

目前節點是tail

compareAndSetTail,将tail指向pred

compareAndSetNext,将pred的next指向null,也就是把目前節點移出隊列

并發工具類:AQS有哪些作用?(二)

目前節點不是head的後繼節點,也不是tail

并發工具類:AQS有哪些作用?(二)

這裡将node的前繼節點的next指向了node的後繼節點,即compareAndSetNext(pred, predNext, next),注意pred和node節點中間有可能有CANCELLED的節點,怕亂就沒畫出來

目前節點是head的後繼節點

沒有對隊列進行操作,隻是進行head後繼節點的喚醒操作(unparkSuccessor方法,後面會分析這個方法),因為此時他是head的後繼節點,還是有可能擷取到鎖的,是以喚醒它嘗試擷取一波鎖,當再次調用到shouldParkAfterFailedAcquire(判斷是否應該阻塞的方法時)會把CANCELLED狀态的節點從隊列中删除

獨占鎖的解鎖

獨占鎖是釋放其實就是利用cas将state-1,當state=0表示鎖被釋放,需要将阻塞隊列中的線程喚醒

public final boolean release(int arg) {
  // 是否需要釋放鎖,傳回ture則需要釋放鎖
  if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
      // 喚醒阻塞的線程
      unparkSuccessor(h);
    return true;
  }
  return false;
}      

tryRelease即具體的解鎖邏輯,需要子類自己去實作

喚醒同步隊列中的線程,可以看到前面加了判斷h != null && h.waitStatus != 0

h = null,說明同步同步隊列中沒有資料,則不需要喚醒

h = null && waitStatus = 0,同步隊列是有了,但是沒有線程給自己上鬧鐘,不用喚醒

h != null && waitStatus < 0,說明頭節點被人上了鬧鐘,自己需要喚醒阻塞的線程

h != null && waitStatus > 0,頭節點因為發生異常被設定為取消,但還是得喚醒線程

private void unparkSuccessor(Node node) {

  int ws = node.waitStatus;
  if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);

  // 頭結點的下一個節點
  Node s = node.next;
  // 為空或者被取消
  if (s == null || s.waitStatus > 0) {
    s = null;
    // 從隊列尾部向前周遊找到最前面的一個waitStatus<=0的節點
    for (Node t = tail; t != null && t != node; t = t.prev)
      if (t.waitStatus <= 0)
        s = t;
  }
  if (s != null)
    // 喚醒節點,但并不表示它持有鎖,要從阻塞的地方開始運作
    LockSupport.unpark(s.thread);
}      

為什麼要從後向前找第一個非CANCELLED的節點呢?

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            // 線程在這裡挂起了
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}      

這其實和入隊的邏輯有關系,假如Node1在圖示位置挂起了,Node1後面又陸續增加了Node2和Node3,如果此時從前向後周遊會導緻元素丢失,不能正确喚醒線程

并發工具類:AQS有哪些作用?(二)

分析一下獨占鎖響應中斷和不響應中斷的差別

我們之前說過獨占鎖可以響應中斷,也可以不響應中斷,調用的方法如下?

  1. 不響應中斷的獨占鎖(acquire)
  2. 響應中斷的獨占鎖(acquireInterruptibly)

是以我們隻需要看這2個方法的差別在哪裡就可以,我下面隻列出有差別的部分哈。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}      
public final void acquireInterruptibly(int arg)
         throws InterruptedException {
     // 判斷線程是否被中斷
     if (Thread.interrupted())
         throw new InterruptedException();
     if (!tryAcquire(arg))
         doAcquireInterruptibly(arg);
 }      

acquire在嘗試擷取鎖的時候完全不管線程有沒有被中斷,而acquireInterruptibly在嘗試擷取鎖之前會判斷線程是否被中斷,如果被中斷,則直接抛出異常。

tryAcquire方法一樣,是以我們隻需要對比acquireQueued方法和doAcquireInterruptibly方法的差別即可

并發工具類:AQS有哪些作用?(二)

執行acquireQueued方法當線程發生中斷時,隻是将interrupted設定為true,并且調用selfInterrupt方法将中斷标志位設定為true

并發工具類:AQS有哪些作用?(二)

而執行doAcquireInterruptibly方法,當線程發生中斷時,直接抛出異常。

最後看一下parkAndCheckInterrupt方法,這個方法中判斷線程是否中斷的邏輯特别巧!

private final boolean parkAndCheckInterrupt() {
  LockSupport.park(this);
  return Thread.interrupted();
}      

Thread類提供了如下2個方法來判斷線程是否是中斷狀态

  1. isInterrupted
  2. interrupted

這裡為什麼用interrupted而不是isInterrupted的呢?(這部分内容比較難,看着吃力的小夥伴跳過即可,對主流程影響不大)

示範一下這2個方法的差別

@Test
public void testInterrupt() throws InterruptedException {
    Thread thread = new Thread(() -> {
        while (true) {}
    });
    thread.start();
    TimeUnit.MICROSECONDS.sleep(100);
    thread.interrupt();
    // true
    System.out.println(thread.isInterrupted());
    // true
    System.out.println(thread.isInterrupted());
    // true
    System.out.println(thread.isInterrupted());
}      
@Test
public void testInterrupt2() {
    Thread.currentThread().interrupt();
    // true
    System.out.println(Thread.interrupted());
    // false
    System.out.println(Thread.interrupted());
    // false
    System.out.println(Thread.interrupted());
}      

isInterrupted和interrupted的方法差別如下

Thread#isInterrupted:測試線程是否是中斷狀态,執行後不更改狀态标志

Thread#interrupted:測試線程是否是中斷狀态,執行後将中斷标志更改為false

接着再寫2個例子

public static void main(String[] args) {
  LockSupport.park();
  // end被一直阻塞沒有輸出
  System.out.println("end");
}      
public static void main(String[] args) {
  Thread.currentThread().interrupt();
  LockSupport.park();
  // 輸出end
  System.out.println("end");
}      

可以看到當線程被中斷時,調用park()方法并不會被阻塞

public static void main(String[] args) {
  Thread.currentThread().interrupt();
  LockSupport.park();
  // 傳回中斷狀态,并且清除中斷狀态
  Thread.interrupted();
  // 輸出start
  System.out.println("start");
  LockSupport.park();
  // end被阻塞,沒有輸出
  System.out.println("end");
}      

到這我們就能了解為什麼要進行中斷的複位了

  • 如果目前線程是非中斷狀态,則在執行park時被阻塞,傳回中斷狀态false
  • 如果目前線程是中斷狀态,則park方法不起作用,傳回中斷狀态true,interrupted将中斷複位,變為false
  • 再次執行循環的時候,前一步已經線上程的中斷狀态進行了複位,則再次調用park方法時會阻塞

是以這裡要對中斷進行複位,是為了不讓循環一直執行,讓目前線程進入阻塞狀态,如果不進行複位,前一個線程在擷取鎖之後執行了很耗時的操作,那目前線程豈不是要一直執行死循環,造成CPU使用率飙升?

獨占鎖的擷取和釋放我們已經搞清楚了,來分析共享鎖的擷取和釋放

共享鎖的加鎖

加鎖成功則執行業務邏輯,否則入隊并阻塞

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}      

入隊的邏輯和獨占鎖的入隊邏輯差不多

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}      

在獲鎖成功後,除了設定頭節點,還多了一步操作,就是喚醒同步隊列中的所有線程

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}      

共享鎖的解鎖

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}      

喚醒同步隊列中的所有線程

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}      

獨占鎖和共享鎖的差別

  1. 獨占鎖:一個鎖能被多個線程擷取,加鎖失敗阻塞放入同步隊列,加鎖成功執行業務邏輯,解鎖喚醒同步隊列中的一個線程
  2. 共享鎖:一個鎖隻能被一個線程擷取,加鎖失敗阻塞放入同步隊列,加鎖成功會喚醒同步隊列中的所有線程,解鎖也會喚醒同步隊列中的所有線程

參考部落格