天天看點

Java并發 -- AQS(AbstractQueuedSynchronizer)介紹内部狀态實作

介紹

在Java中,除了synchronized關鍵字具備鎖的語義,還有juc包中的Lock接口。在觀察Lock接口的多個不同實作後,不難發現,其内部鎖的語義的實作,基本都是仰仗着AbstractQueuedSynchronizer,簡稱AQS。

内部狀态

Node

既然要實作鎖的語義,則必須處理擷取競争鎖失敗,線程等待的情況。在AQS線程擷取鎖失敗,就會構造一個Node接口,放入内部維護的同步隊列中。

字段

  • int waitStatus:等待狀态。有以下幾個值。
    • 1:目前線程被取消了
    • -1:辨別目前節點的後繼節點中的線程目前是被阻塞的,是以目前節點在出隊時應該喚醒後繼節點
    • -2:目前節點在某個Condition的等待隊列中
    • -3:僅頭節點可能設定為這個辨別,代表doReleaseShared方法應該傳播下去
    • 0:差別以上的狀态值
  • Node prev:前驅節點
  • Node next:後繼節點
  • nextWaiter:
  • Thread thread:目前節點裡面的線程

state

AQS内部有個int類型的變量state。需要注意:state是volatile變量。在AQS中,鎖的擷取和釋放就表現在state值的變化。

實作

既然AQS的主要作用是用來輔助各種鎖的實作類的。那它自研必然也會有相關的鎖的申請和釋放的方法。而鎖又分為共享鎖和排它鎖,是以,AQS内部也有4個對應的方法:

排它鎖:

  • acquire(int arg):排它鎖的申請
  • release(int arg):排它鎖的釋放

共享鎖:

  • acquireShared(int arg):共享鎖的申請
  • releaseShared(int arg):共享鎖的釋放

下面逐個分析每個方法。

排它鎖的申請:acquire(int arg)方法

acquire(int arg)是一個模闆方法,嘗試擷取鎖的方法tryAcquire()是由子類負責實作。

public final void acquire(int arg) {
  //1.
  if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}
           

1.嘗試擷取鎖:tryAcquire(int arg)方法

這是一個抽象方法。子類的實作一般都是以CAS的方式去嘗試改變state的值。如果CAS設定值成功,則為成功擷取到了鎖,方法直接退出;如果沒有擷取到鎖,則會執行後面的方法

2.如果擷取鎖失敗,則建立一個節點:addWaiter(Node node)方法

對于擷取排它鎖而言,這裡構造Node時,傳入的waiter是Node.EXCLUSIVE(其實值為null,僅僅是标記是排它鎖)

private Node addWaiter(Node mode) {
  //建立一個Node,waiter是Node.EXCLUSIVE(其實就是null)
  Node node = new Node(Thread.currentThread(), mode);
  Node pred = tail;
  if (pred != null) {
    //把原來的尾節點設定為這次構造的Node的前驅節點
    node.prev = pred;
    //以cas的方式将尾節點設定為目前節點。如果設定成功,則傳回,方法結束
    if (compareAndSetTail(pred, node)) {
      pred.next = node;
      return node;
    }
  }
  //如果前面cas設定尾結點失敗,則用循環CAS的方式将Node加入到隊列中
  enq(node);
  return node;
}
           

3.自旋,嘗試擷取鎖:acquireQueued()方法

final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      //擷取目前節點的前驅節點
      final Node p = node.predecessor();
      //如果前驅節點為頭節點,則嘗試擷取鎖
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return interrupted;
      }
      //如果擷取鎖失敗
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}
           

3.1.擷取鎖失敗後,判斷是否要挂起目前node裡的線程

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  int ws = pred.waitStatus;
  if (ws == Node.SIGNAL)
    //SIGNAL狀态表示目前Node的後繼節點需要被notify,是以如果是這個狀态,則可以阻塞node
    return true;
  if (ws > 0) {
    //waitStatus>0的值隻有一個 = 1,表示前驅節點以及被取消,則将目前節點的前驅節點設定為更前面的一個非取消節點
    do {
      node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
  } else {
    /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  }
  return false;
}
           

3.2. 如果需要阻塞目前線程,則阻塞,并傳回線程是否被中斷

排它鎖的釋放:release(int arg)方法

release()也是個模闆方法,嘗試釋放鎖的tryRelease()方法也需要子類自己實作

public final boolean release(int arg) {
	//嘗試釋放鎖
  if (tryRelease(arg)) {
    //釋放成功
    Node h = head;
    if (h != null && h.waitStatus != 0)
      //喚醒後繼節點
      unparkSuccessor(h);
    return true;
  }
  //釋放失敗
  return false;
}
           

共享鎖的擷取:acquireShared(int arg)方法

public final void acquireShared(int arg) {
  //tryAcquireShared方法由子類負責重寫
  if (tryAcquireShared(arg) < 0)
    //擷取失敗
    doAcquireShared(arg);
}
           

asd

private void doAcquireShared(int arg) {
  //構造節點,waiter是Node.SHARED,并以CAS的方式加入到同步隊列尾部
  final Node node = addWaiter(Node.SHARED);
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      //擷取前驅節點
      final Node p = node.predecessor();
      //前驅節點是頭結點
      if (p == head) {
        //嘗試擷取共享鎖
        int r = tryAcquireShared(arg);
        //擷取共享鎖成功
        if (r >= 0) {
         	
          setHeadAndPropagate(node, r);
          p.next = null; // help GC
          if (interrupted)
            selfInterrupt();
          failed = false;
          return;
        }
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}
           

共享鎖的釋放:releaseShared(int arg)

public final boolean releaseShared(int arg) {
  if (tryReleaseShared(arg)) {
    doReleaseShared();
    return true;
  }
  return false;
}
           

主要邏輯在doReleaseShared()方法:

private void doReleaseShared() {
  for (;;) {
    Node h = head;
    if (h != null && h != tail) {
      int ws = h.waitStatus;
      if (ws == Node.SIGNAL) {
        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
          continue;
        //喚醒後繼節點
        unparkSuccessor(h);
      } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//如果waitStatus=0且cas設定h的狀态失敗在繼續下一次循環
        continue;                
    }
    if (h == head)                   // loop if head changed
      break;
  }
}
           

繼續閱讀