天天看點

AQS你真的學會了,深入源碼分析

簡介

AQS:AbstractQueuedSynchronizer,是JUC下lock包的一個類,能夠多線程通路共享資源的同步器架構。

首先介紹AQS中一些重要的參數:

state:同步器的狀态,

exclusiveOwnerThread:互斥鎖持有的線程

CLH隊列:這是一個雙向連結清單隊列

head:同步等待期的頭部

tail:同步等待器的尾部

在AQS中線程通路資源有兩種方式:

使用獨占鎖:顧名思義每次通路隻能有一個線程持有鎖,ReentrantLock就是以獨占方式實作的互斥鎖。

使用共享鎖:共享鎖就是允許多個線程痛死獲得鎖,并發通路資源。比如:ReadWriteLock

ReentrantLock可重入鎖:

線程在ReentrantLock的lock方法後,擷取到鎖沒有釋放鎖,當目前線程再次調用lock方法時,并不會阻塞,隻會增加重入的次數。

首先我們通過一段代碼逐漸分析AQS的源碼:

public class AQS {
    private ReentrantLock lock = new ReentrantLock();
    public void lock1(){
        lock.lock();
        System.out.println(Thread.currentThread().getName() +"獲得了鎖");
        lock2();
        lock.unlock();
    }

    public void lock2(){
        lock.lock();
        System.out.println(Thread.currentThread().getName() +"獲得了鎖");
        lock.unlock();
    }
    public static void main(String[] args) {
        AQS t = new AQS();
        Thread t1 = new Thread(()->t.lock1(),"t1");
        t1.start();
    }
}

           
AQS你真的學會了,深入源碼分析

從代碼的執行結果來看,t1線程在調用lock1方法後并沒有釋放鎖就調用了,lock2方法,但是并沒有出現阻塞現象。

AQS源碼分析:

我們根據上述代碼逐漸分析:

//預設的無參構造器
  public ReentrantLock() {
        sync = new NonfairSync();
    }
//有參構造器
public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
           

首先看一下ReentrantLock類的構造方法,可以看出關鍵的類sync(抽象類Sync也是ReentrantLock類中的屬性),這也就引出了公平鎖和非公平鎖的概念

//抽象類Sync繼承了AbstractQueuedSynchronizer(也就是常說的AQS)
abstract static class Sync extends AbstractQueuedSynchronizer 
           
AQS你真的學會了,深入源碼分析

此處我們可以先熟悉一下ReentrantLock和AbstractQueuedSynchronizer的相關的結構:

跟着上述demo,我們繼續調試Lock()方法:

public void lock() {
    sync.lock();
  }
 //非公平
 final void lock() {
	 if (compareAndSetState(0, 1))
	        setExclusiveOwnerThread(Thread.currentThread());
	    else
	        acquire(1);
	}
//公平
final void lock() {
    acquire(1);
    }
           

我們通過前面可以知道,ReentrantLock的空構造方法是構造一個非公平鎖的Sync實體類:

public ReentrantLock() {
        sync = new NonfairSync();
    }
           

是以上面代碼的lock()方法,執行的是NonfairSync類中的lock()方法。

這個就是我們常說的CAS,

CAS相關源碼:

protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
  //擷取state在記憶體中的偏移量
  stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state")); 
           

這個方法底層調用的是,unsafe類下的compareAndSwapInt()方法,參數中stateOffset是state變量在記憶體中的偏移量,通過這個偏移量可以擷取到state的值,unsafe類中的方法都具有原子性,保證了線程安全,compareAndSwapInt()方法,如果state==expect,那麼就将state更新為update的值,并傳回true;一開始state的值為0,傳入的expect為0,update為1,此時state等于expect,則把state指派為1,并傳回true。

在非公平的lock()方法中,首先會嘗試使用CAS方式加鎖,如果成功了就使用

setExclusiveOwnerThread(Thread.currentThread())
//AbstractOwnableSynchronizer類下的
protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    } 
           

方法,将目前線程賦給exclusiveOwnerThread,這也是非公平鎖的含義,每個線程加鎖時,會先嘗試加鎖,如果成功就賦給exclusiveOwnerThread,也就不用放在CLH隊列中進行排隊。

CLH同步隊列:是一個FIFO雙向隊列,AQS主要是通過其實作公平鎖。線程通過AQS擷取鎖失敗就會将線程封裝成一個Node節點,插入隊列尾部,當有線程釋放鎖時,就會嘗試将鎖頭的next節點占用鎖。

AQS你真的學會了,深入源碼分析
//首先tryAcquire擷取鎖,如果擷取到,!tryAcquire()為false,後面不用執行,直接執行業務代碼
//如果為擷取到鎖,添加一個新的節點,插入到CLH隊列尾部
public final void acquire(int arg) {
 if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
	return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
   final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
//*****核心:主要功能是完成頭節點嘗試擷取鎖資源,以及其他節點被阻塞
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            	//擷取目前節點的前一個節點
                final Node p = node.predecessor();
                //隻有P節點是頭節點才可以擷取鎖資源,如果擷取到鎖資源也就不需要阻塞
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //p節點不是第一個節點,或者p節點嘗試加鎖失敗,
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
                // 阻塞目前節點,等待被unpark()方法喚醒   
				/**private final boolean parkAndCheckInterrupt() {
				        LockSupport.park(this);
				        return Thread.interrupted();
				    }
				**/
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
//在CLH的隊列結尾處添加一個獨占尾節點
private Node addWaiter(Node mode) {
		//把目前線程構造一個新的節點
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        //判斷目前尾節點是否為null,不為null說明隊列中有節點
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
            	//尾插的方式插入目前節點
                pred.next = node;
                return node;
            }
        }
        //如果對了為空,将隊列初始化後插入目前節點
        enq(node);
        return node;
    }
//這是為了確定節點插入隊列的尾部,防止CAS失敗操作,也會保證節點插入隊列
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            //尾節點為空,也就是隊列為空
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
            //此時CLH隊列不為空,尾插
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
           

公平鎖和非公平鎖的差別就是在tryAcquire()方法上,多了一個hasQueuedPredecessors()方法。這個方法主要是尋找CLH隊列中有沒有,比目前節點等待擷取鎖的時間更長。

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
           

繼續閱讀