簡介
AQS:AbstractQueuedSynchronizer,是JUC下lock包的一個類,能夠多線程通路共享資源的同步器架構。
首先介紹AQS中一些重要的參數:
state:同步器的狀态,
exclusiveOwnerThread:互斥鎖持有的線程
CLH隊列:這是一個雙向連結清單隊列
head:同步等待期的頭部
tail:同步等待器的尾部
在AQS中線程通路資源有兩種方式:
使用獨占鎖:顧名思義每次通路隻能有一個線程持有鎖,ReentrantLock就是以獨占方式實作的互斥鎖。
使用共享鎖:共享鎖就是允許多個線程痛死獲得鎖,并發通路資源。比如:ReadWriteLock
ReentrantLock可重入鎖:
線程在ReentrantLock的lock方法後,擷取到鎖沒有釋放鎖,當目前線程再次調用lock方法時,并不會阻塞,隻會增加重入的次數。
首先我們通過一段代碼逐漸分析AQS的源碼:
public class AQS {
private ReentrantLock lock = new ReentrantLock();
public void lock1(){
lock.lock();
System.out.println(Thread.currentThread().getName() +"獲得了鎖");
lock2();
lock.unlock();
}
public void lock2(){
lock.lock();
System.out.println(Thread.currentThread().getName() +"獲得了鎖");
lock.unlock();
}
public static void main(String[] args) {
AQS t = new AQS();
Thread t1 = new Thread(()->t.lock1(),"t1");
t1.start();
}
}
從代碼的執行結果來看,t1線程在調用lock1方法後并沒有釋放鎖就調用了,lock2方法,但是并沒有出現阻塞現象。
AQS源碼分析:
我們根據上述代碼逐漸分析:
//預設的無參構造器
public ReentrantLock() {
sync = new NonfairSync();
}
//有參構造器
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
首先看一下ReentrantLock類的構造方法,可以看出關鍵的類sync(抽象類Sync也是ReentrantLock類中的屬性),這也就引出了公平鎖和非公平鎖的概念
//抽象類Sync繼承了AbstractQueuedSynchronizer(也就是常說的AQS)
abstract static class Sync extends AbstractQueuedSynchronizer
此處我們可以先熟悉一下ReentrantLock和AbstractQueuedSynchronizer的相關的結構:
跟着上述demo,我們繼續調試Lock()方法:
public void lock() {
sync.lock();
}
//非公平
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//公平
final void lock() {
acquire(1);
}
我們通過前面可以知道,ReentrantLock的空構造方法是構造一個非公平鎖的Sync實體類:
public ReentrantLock() {
sync = new NonfairSync();
}
是以上面代碼的lock()方法,執行的是NonfairSync類中的lock()方法。
這個就是我們常說的CAS,
CAS相關源碼:
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
//擷取state在記憶體中的偏移量
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
這個方法底層調用的是,unsafe類下的compareAndSwapInt()方法,參數中stateOffset是state變量在記憶體中的偏移量,通過這個偏移量可以擷取到state的值,unsafe類中的方法都具有原子性,保證了線程安全,compareAndSwapInt()方法,如果state==expect,那麼就将state更新為update的值,并傳回true;一開始state的值為0,傳入的expect為0,update為1,此時state等于expect,則把state指派為1,并傳回true。
在非公平的lock()方法中,首先會嘗試使用CAS方式加鎖,如果成功了就使用
setExclusiveOwnerThread(Thread.currentThread())
//AbstractOwnableSynchronizer類下的
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
方法,将目前線程賦給exclusiveOwnerThread,這也是非公平鎖的含義,每個線程加鎖時,會先嘗試加鎖,如果成功就賦給exclusiveOwnerThread,也就不用放在CLH隊列中進行排隊。
CLH同步隊列:是一個FIFO雙向隊列,AQS主要是通過其實作公平鎖。線程通過AQS擷取鎖失敗就會将線程封裝成一個Node節點,插入隊列尾部,當有線程釋放鎖時,就會嘗試将鎖頭的next節點占用鎖。
//首先tryAcquire擷取鎖,如果擷取到,!tryAcquire()為false,後面不用執行,直接執行業務代碼
//如果為擷取到鎖,添加一個新的節點,插入到CLH隊列尾部
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//*****核心:主要功能是完成頭節點嘗試擷取鎖資源,以及其他節點被阻塞
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//擷取目前節點的前一個節點
final Node p = node.predecessor();
//隻有P節點是頭節點才可以擷取鎖資源,如果擷取到鎖資源也就不需要阻塞
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//p節點不是第一個節點,或者p節點嘗試加鎖失敗,
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
// 阻塞目前節點,等待被unpark()方法喚醒
/**private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
**/
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//在CLH的隊列結尾處添加一個獨占尾節點
private Node addWaiter(Node mode) {
//把目前線程構造一個新的節點
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//判斷目前尾節點是否為null,不為null說明隊列中有節點
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
//尾插的方式插入目前節點
pred.next = node;
return node;
}
}
//如果對了為空,将隊列初始化後插入目前節點
enq(node);
return node;
}
//這是為了確定節點插入隊列的尾部,防止CAS失敗操作,也會保證節點插入隊列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//尾節點為空,也就是隊列為空
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//此時CLH隊列不為空,尾插
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
公平鎖和非公平鎖的差別就是在tryAcquire()方法上,多了一個hasQueuedPredecessors()方法。這個方法主要是尋找CLH隊列中有沒有,比目前節點等待擷取鎖的時間更長。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}