面試官:談一談java中基于AQS的并發鎖原理
我:java中的AQS是指AbstractQueuedSynchronizer類,java中并發鎖控制邏輯都是基于這個類來實作的。
面試官:能說一下你用過的基于AQS的并發類有哪些嗎?
我:首先是基于AQS在内部類實作了公平鎖和非公平鎖,具體有3個類:ReentrantLock、ReentrantReadWriteLock、Semaphore,UML類圖如下:

面試官:談一下AQS是怎麼擷取鎖的?
我:首先,AbstractQueuedSynchronizer是一個基于FIFO的隊列實作的并發控制,隊列中的元素通過操作共享資源state來擷取和釋放鎖,state是一個volatile修飾的int類型變量。我以ReentrantLock中獨占鎖為例,如果有一個線程來擷取鎖,這時如果隊列中沒有元素,那就把這個線程加入隊列,同時線程申請的數量加入到state變量。如果隊列中已經有元素,這個線程入隊尾,之後線程中斷等待隊列前一個元素釋放鎖後喚醒。
下面的流程是一個擷取鎖的流程,如果下面的流程傳回false,則把目前線程加入到等待隊列。
面試官:讀過這部分的源代碼嗎,能不能講一下?
我:看一下流程中的源碼
final void lock() {
acquire(1);//擷取1個資源
}
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//擷取鎖失敗,入隊
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && //是否有等待時間更長的元素
compareAndSetState(0, acquires)) {//自旋鎖
setExclusiveOwnerThread(current);//設定目前線程為獨占線程
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//目前隊列中是否有等待時間更長的元素
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
下面的代碼是加入到等待隊列的過程
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {//前置節點是head并且擷取成功
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {//注意:如果前置節點不通知,繼續往前查找,找到一個可以通知的節點作為目前節點的node節點
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {//把前置節點的狀态設定為通知狀态-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
//把目前節點加入到隊尾
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { //隊列為空,初始化隊列,head=tail=node
if (compareAndSetHead(new Node()))
tail = head;
} else {//隊列非空,放到隊尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
這兒要注意:隊列上的元素是Node對象,node中定義了waitStatus變量,有4個狀态,
static final int CANCELLED = 1;//目前線程已經取消鎖等待
static final int SIGNAL = -1;//下一個節點需要被喚醒
static final int CONDITION = -2;//目前線程正在等待condition,這個狀态需要跟condition配合使用
static final int PROPAGATE = -3;//釋放鎖的時候使用,這個狀态隻給頭節點,并且要不斷傳播下去
面試官:再說一下釋放鎖的過程?
我:鎖的釋放過程比較簡單,還是以ReentrantLock為例。首先嘗試釋放鎖(state變量中減去1),把目前鎖的擁有者置空,通知隊列中下一個節點。整個流程入下:
面試官:這部分的源代碼能不能講一下?
我:以ReentrantLock為例,源碼入下:
public final boolean release(int arg) {
if (tryRelease(arg)) {//嘗試釋放鎖
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//通知連結清單中下個等待節點
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())//鎖的目前擁有者不是目前線程,抛出異常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);//設定目前鎖擁有者為空
}
setState(c);
return free;
}
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
private void unparkSuccessor(Node node) {
//把目前節點的等待狀态置為0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//如果隊列中下一個節點為空或者下一個節點等待狀态是取消狀态(1),則從隊尾開始查找,知道找到一個非空并且等待狀态小于0的節點
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)//為什麼從隊尾開始查找,這樣周遊整個隊列啊?但是如果目前節點的下一個是null,那就始終找不到下下個節點了,必須從隊尾找。
if (t.waitStatus <= 0)
s = t;
}
if (s != null)//喚醒等待線程
LockSupport.unpark(s.thread);
}
面試官:上面你講的是獨占鎖,那AQS中的共享鎖怎麼用的?
我:首先共享鎖是指多個線程可以同時使用這個鎖,AQS中的使用是隻要不超過共享鎖允許的總數,都可以擷取到。在擷取讀鎖時,首先嘗試擷取共享鎖,如果擷取失敗,入隊後等待擷取。以ReentrantReadWriteLock為例,擷取共享鎖流程入下:
從上面這個流程可以看到,如果擷取不到鎖,就會進入fullTryAcquireShared,這個方式是在死循環中不斷嘗試擷取到鎖,直到成功。
面試官:這部分的源代碼能介紹一下嗎?
我:源碼入下
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)//先嘗試擷取,擷取失敗,則進入隊列等待擷取
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)//其他線程已經擷取到排它鎖
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() && //是否需要阻塞,在公平鎖中,如果隊列中已經有元素了那就傳回true;在非公平鎖中,隊列中第二個元素不是共享節點,傳回true
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {//cas失敗
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {//下面的方法前面都講過了
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//共享模式入隊
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {//擷取成功
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && //失敗後等待前置節點通知
parkAndCheckInterrupt()) //睡眠後等待喚醒
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);//不再擷取
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);//自己成功頭結點進而喚醒
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();//喚醒下一個節點
}
}
面試官:好的,共享鎖的釋放流程是怎樣的?
我:跟共享鎖的擷取流程一樣,先嘗試釋放(state變量中減去1),成功後喚醒隊列中下一個等待線程
這部分代碼如下:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {//目前線程是第一個擷取到鎖的線程
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;//釋放一個count數減1
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))//CAS修改state值
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//目前節點waiteStatus置為0
continue; // loop to recheck cases
unparkSuccessor(h);//喚醒下一個節點
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//頭節點waiteStatus置為-3
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
面試官:前面你提到了公平鎖和非公平鎖,這2個有什麼差別呢?
我:一般情況下,公平鎖是指目前線程在釋放鎖的時候,會通知一個等待隊列中等待時間最長的線程,而非公平鎖,目前線程釋放鎖的時候,會随機通知一個線程。非公平鎖代碼如下:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
在ReentrantLock中,公平鎖和非公平鎖的不同就是非公平鎖會直接嘗試compareAndSetState(0, 1),失敗後才走擷取鎖流程。而公平鎖直接走擷取鎖流程。
在ReentrantReadWriteLock中,公平鎖隻要隊列中有其他線程占用鎖,讀寫鎖就需要阻塞,而非公平鎖中寫鎖不阻塞,讀鎖隻有隊列中第一個排隊等待線程使用獨占鎖時才阻塞,代碼如下
static final class NonfairSync extends Sync {
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();//隊列中第一個等待線程以獨占方式等待鎖
}
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
static final class FairSync extends Sync {
final boolean writerShouldBlock() {
return hasQueuedPredecessors();//隊列中有元素正在使用鎖或者第一個等待的線程不是目前線程
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
面試官:好的,再談一談CountDownLatch的使用和原理?
我:上面的原理了解了之後,CountDownLatch的使用就非常簡單了。示例代碼如下:
public class Test {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);//将state置為3
for (int i = 0; i < 3; i++){
LocalThreadPool.run(new WorkerThread(latch));
}
System.out.println("latch test begin");
latch.await();//擷取到共享鎖
System.out.println("latch test end");
}
static class WorkerThread implements Runnable {
private CountDownLatch latch;
public WorkerThread(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " finished");
latch.countDown();//将state值減1,最後一個線程将state減為0後,釋放上面await方法擷取到的共享鎖。
}
}
}
上面輸出結果如下:
latch test begin
pool-1-thread-3 finished
pool-1-thread-2 finished
pool-1-thread-1 finished
latch test end
初始化的時候,批量将state值設定為線程數量,然後通過await擷取共享鎖。每個線程執行完成後調用釋放鎖的方法将state減1,最後一個線程将state減為0後傳回true,這時CountDownLatch就會釋放掉共享鎖。看下面源代碼
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//state減為0才會執行這個邏輯
doReleaseShared();//釋放await()方法擷取到的共享鎖
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;//減為0時傳回true
}
}
面試官:那CountDownLatch在await方法中加入參數是怎麼實作等待逾時的呢?
我:這個就是在一個死循環中不斷擷取鎖,直到逾時,這個是AQS本身就支援的邏輯,代碼如下:
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;//設定申請鎖失敗時間
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) {//逾時,取消擷取鎖
cancelAcquire(node);
return false;
}
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
面試官:那CountDownLatch設定等待逾時時間,有什麼好處呢?
我:這個主要作用是設定主線程等待時間,以免長期阻塞主線程。從上面源代碼看出這個并不影響任務線程的執行,不過如果等待任務執行線程執行完成後再做一些日志或者通知,就會失敗,因為逾時後直接就會調用這些日志或通知,不一定真的所有任務都完成了。
比如下面的代碼:
public class Test {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++){
LocalThreadPool.run(new WorkerThread(latch));
}
System.out.println("latch test begin");
latch.await(1, TimeUnit.SECONDS);
System.out.println("latch test end");
}
static class WorkerThread implements Runnable {
private CountDownLatch latch;
public WorkerThread(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " finished");
latch.countDown();
}
}
}
輸出結果是
latch test begin
latch test end
pool-1-thread-2 finished
pool-1-thread-1 finished
pool-1-thread-3 finished
面試官:再聊一聊Semaphore的使用?
我:Semaphore的使用也是基于AQS的,它更像一個限流器,初始化是給state指派,每次執行任務時Semaphore擷取共享鎖并且将state值減1。如果state值小于0則入隊等待。任務執行完成後,Semaphore釋放鎖,首先state值加1,如果state小于0,則通知隊列中下一個等待線程。我們可以用Semaphore實作一個資料庫連接配接池,代碼如下:
class ConnectionPool {
final List<Connection> pool = new ArrayList<>(10);//這兒沒有考慮線程安全
final Semaphore sem = new Semaphore(10);//将state設定為10
//構造函數初始化連接配接池
ConnectionPool(int size) {
for (int i = 0; i < size; i++) {
pool.add(new Connection() {
//省略實作代碼
});
}
}
//執行任務過程
void doExecution() throws InterruptedException {
Connection connection = null;
sem.acquire();//擷取可中斷共享鎖,如果state小于1,則進入鎖等待隊列
try {
connection = pool.remove(0);//從連接配接池取連接配接
connection.commit();//執行事務
} catch (SQLException e) {
e.printStackTrace();
} finally {
pool.add(connection);//用完後歸還連接配接池
sem.release();//釋放鎖,将state值加1,如果state小于0,去等待隊列喚醒下一個等待線程
}
}
}
面試官:再聊一聊Condition的使用?
我:Condition類的作用是可以實作wait-notify機制,比synchronized靈活,可以在一個Lock上建立多個Condition,線程選擇注冊不同的Condition進項排程。假設我們現在有一個需求,2個線程輪流列印出1到10的數字,這個時候我們可以用信号量來做,代碼如下:
public class ConditionTest {
private ReentrantLock lock = new ReentrantLock();
public Condition conditionA = lock.newCondition();
public Condition conditionB = lock.newCondition();
private int count = 0;
public String printThreadA() throws InterruptedException {
while (count < 10){
lock.lock();
System.out.println(Thread.currentThread().getName() + ":" + (++count));
conditionB.signal();//列印後喚醒B線程列印
conditionA.await();//列印後等待B線程喚醒
lock.unlock();
}
return null;
}
public String printThreadB() throws InterruptedException {
while (count < 10){
lock.lock();
System.out.println(Thread.currentThread().getName() + ":" + (++count));
conditionA.signal();//列印後喚醒A線程列印
conditionB.await();//列印後等待A線程喚醒
lock.unlock();
}
return null;
}
public void printTenNumber(){
LocalThreadPool.call(() -> printThreadA());
LocalThreadPool.call(() -> printThreadB());
}
public static void main(String[] args){
ConditionTest conditionTest = new ConditionTest();
conditionTest.printTenNumber();
}
}
上面signal()方法具體實作為将目前線程從condition等待隊列轉入鎖等待隊列隊尾等待前一個節點喚醒,await()方法的具體實作為在condition等待隊列隊尾新加一個等待者,釋放鎖并且喚醒下一個等待線程。是以,condition的await和signal本質上是目前線程在condition等待隊列和鎖等待隊列直接的轉移。
代碼實作如下:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
//waitStatus置為初始化
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
//放入鎖等待隊列隊尾并且把前置節點置為SIGNAL,以通知目前線程
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();//等待隊列隊尾新加一個等待者
int savedState = fullyRelease(node);//釋放鎖并且喚醒下一個等待線程
//删除部分代碼
}
面試官:最後一個問題,如果我們自己實作一個AQS的類,需要實作哪些方法呢?
我:AQS類中以下5個方法沒有實作,都是throw new UnsupportedOperationException(),這些方法留給自定義子類來實作。
protected boolean tryAcquire(int arg):獨占方式擷取鎖
protected boolean tryRelease(int arg):獨占方式釋放鎖
protected int tryAcquireShared(int arg):共享方式擷取鎖
protected boolean tryReleaseShared(int arg):共享方式釋放鎖
isHeldExclusively():判斷目前線程是否正在獨占鎖
面試官:恭喜你,通過了。