天天看點

多線程高并發系列之ReentrantLock鎖的實作原理:AQS

本來這篇想寫并發容器的,但是我看了下api文檔,沒啥深度,沒啥可寫的,照着api文檔的描述敲一遍代碼也差不多就懂了。是以也就決定這一片記一下這兩天學習的AQS原理。

大家不要一看到原理兩字,心裡就有點退堂鼓,以為很難的感覺。難道那不是人寫的代碼嗎,他能寫出來,你就寫不出來嗎,也就是一個人的邏輯,明白了他的邏輯不也就懂了代碼原理了嘛。

初入部落格,有許多描述不清楚、語句不通順的地方。請提出指正!!!

開始

synchronized鎖的實作,它是jvm關鍵字,直接轉換成位元組碼指令。 ReentrantLock是一個普通的代碼類,是java代碼通過邏輯來實作鎖的效果,也正因為是java代碼實作的,是以在靈活度上要比sync鎖強。 在講解是,我會以非公平鎖示例來描述他發生的過程,了解了非公平鎖,公平鎖也自然就了解了。我将以不同的情況來描述Rreentrantlock當時要做的操作。

1. 初始狀态下,第一個線程通路:

public static void main(String[] args) {
		Lock rtLock =new ReentrantLock();
		try {
			rtLock.lock();
			TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}finally {
			rtLock.unlock();
		}
	}
           

斷點運作這段代碼,此時的狀态:

多線程高并發系列之ReentrantLock鎖的實作原理:AQS

可以看到rtLock中有4個字段:state、head、tail、exclusiveOwnerThread,

state表示的是鎖的狀态:0表示沒有鎖,0<state,則表示有鎖或鎖重入。

head和tail:表示的隊列中的節點,Rtlock之是以能實作公平鎖,就是因為隊列的設計,下面的情節會說。

exclusiveOwnerThread:目前持有鎖的線程.

這段是擷取鎖的源碼,通過CAS算法去擷取鎖,我們這一小節是初始狀态的前提下,是以CAS傳回值肯定為true,然後設定目前持有鎖的線程為目前線程。

final void lock() {
	if (compareAndSetState(0, 1))
		setExclusiveOwnerThread(Thread.currentThread());
	else
		acquire(1);
}
           

2. RtLock的重入鎖 繼第一步,我們在main方法中,稍加改動

多線程高并發系列之ReentrantLock鎖的實作原理:AQS

看到了state變成了2,看源碼是怎麼操作的:

final void lock() {
	if (compareAndSetState(0, 1))
		setExclusiveOwnerThread(Thread.currentThread());
	else
		acquire(1);
}
           

還是這段源碼,這次compareAndSetState的CAS傳回結果肯定為false,是以執行else中的方法,跟進去:

public final void acquire(int arg) {
		//tryAcquire擷取鎖,若tryAcquire擷取鎖失敗,則進入
		//acquireQueued方法
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
//先跟進去tryAcquire方法
protected final boolean tryAcquire(int acquires) {
		return nonfairTryAcquire(acquires);
	}
final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();//目前線程
            int c = getState();//此時鎖的狀态
            //鎖重入,則c肯定不為0,執行else if邏輯
			if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
			/**
			*我們在上面說明了,有個表示目前持有鎖的線程的字段,
			getExclusiveOwnerThread為得到目前持有鎖的線程,
			*判斷目前線程與此時持有鎖的線程比較是否相等,相等則表示為
			同一個線程,則state加1,傳回true
			*别忘了我們這一小節的主題:重入鎖,是以,此小節到此為止。
			别着急,一步一步來。
			*/
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
           

3. 此時處于上鎖狀态時,又新來一線程擷取鎖

多線程高并發系列之ReentrantLock鎖的實作原理:AQS

waitStatus在AQS中有5個狀态:

/** waitStatus value to indicate successor's thread needs unparking */
	翻譯:waitStatus value to 訓示後續線程需要斷開連接配接
        static final int SIGNAL    = -1;
	/** waitStatus value to indicate thread has cancelled */
	翻譯:waitStatus value to 訓示線程已取消
        static final int CANCELLED =  1;
	/** waitStatus value to indicate thread is waiting on condition */
	翻譯:waitStatus value to 訓示線程正在等待條件
        static final int CONDITION = -2;
	/** waitStatus value to indicate the next acquireShared should
         * unconditionally propagate*/
	翻譯:waitStatus value to 訓示下一個acquireShared應無條件傳播
        static final int PROPAGATE = -3;
為0時,則以上都不是。
           
  1. 我們看到head(隊列頭部)的thread為空、waitstatus為-1以及next指向了下一個節點,thread為空則說明head是AQS建立的一個空節點,隻不過,為啥要這麼做呢?—程式之是以建立一個空的head節點,就是為了能準确的确定目前節點是否為有效隊列中的第一個節點。
  2. tail(隊列尾部)的waitstatus為0,上一個節點為head,下一個節點為空,thread為新來的線程。

    記住這張圖的資訊,然後我們開始看源碼:

    繼第2步,tryAcquire擷取鎖方法肯定傳回false,然後“!false”為true,則執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg),

    嵌套了一個方法addWaiter,我們進去:

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);//建立一個獨占鎖的node節點,Node.EXCLUSIVE表示獨占鎖
		//隊列中的尾部節點,由于這個線程是新來的,隊列還未建立,是以tail現在肯定為空。
        Node pred = tail;
        //pred為空,則跳過
		if (pred != null) {
            node.prev = pred;//原來隊列中的尾部節點,現在變成新節點的上一個節點
            if (compareAndSetTail(pred, node)) {//通過cas算法,加入隊列中
                pred.next = node;//原來尾部節點的下一個節點為目前節點
                return node;
            }
        }
        enq(node);//加入隊列,進入enq
        return node;
    }

	private Node enq(final Node node) {
        for (;;) {//相當于while(true),最好的寫法是for(;;)
            //第一次循環,tail為空,則執行if方法體,第2次循環不為空,
            //則執行else方法體,
			//這裡說的第一次循環tail為空,是基于這個新來的線程是第二
			//個競争鎖的線程,
			//如果第3個線程來了,第一個線程還未釋放鎖,那這個tail肯定
			//不為空。是以不要混淆
			Node t = tail;
            if (t == null) {
           		 //建立一個節點,通過cas算法放入head中。
                if (compareAndSetHead(new Node()))
                	//将頭部又指派給尾部,說明該隊列是個循環清單,然
                	//後進行第2次循環
                    tail = head;
            } else {
			//這一步就是給tail重新指派,将新線程的資訊指派給tail,然
			//後循環結束
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
           

結束,我們看到addWaiter方法就是做了隊列的建立,以及将新來的線程加入到建立的隊列中,一個特點就是head節點是空的,有效的線程從隊列中的第2個元素開始。然後,我們看acquireQueued方法:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
           		 //現在隊列中雖然有兩個節點,但是現在“真正有效”的節點
           		 //隻有tail節點,則p為head節點
                final Node p = node.predecessor();
                //現在第一個線程還未釋放鎖,則p==head為true,
                //tryAcquire(arg)為false,跳過第一個if,執行第二個if
				if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

	 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
		//剛才看到了圖檔上head的waitStatus為-1,是以傳回true
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
	private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
           

看到源碼,我們得到的資訊是,去目前節點,上一個節點的waitStatus枚舉值,由枚舉值決定傳回值,然後,parkAndCheckInterrupt方法,是直接将目前線程中斷,那麼這個if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())幹的就是:由目前節點的上一個節點的狀态來決定目前節點是否需要被中斷。在這裡我想說明一下:我看到了很多部落格中說for(;;)是在自旋。确實是自旋,但是與jvm中對鎖進行優化時進行的自旋鎖有鎖不同,jvm中的自旋鎖,是為了防止阻塞導緻的性能下降而引入的,而這裡描述的自旋,僅僅隻是自旋,并沒有關于性能的優化設計,在這裡,如果需要阻塞,就會阻塞。我也因為當時了解錯誤,找了好久的資料。

對于CLH隊列,隻是記錄了線程的id,在需要喚醒的時候,根據這個id喚醒它,否則根據這個id找到這個線程後讓他中斷。

4. 看到了AQS的隊列操作,再來看看解鎖吧

解鎖很簡單,我不會在這裡再那麼多話,請自行分析代碼

public void unlock() {
        sync.release(1);
    }
	public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
	protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
	//喚醒head節點下一個節點
	private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
           

這是解鎖,沒啥好說的,可以說一下的就是在解鎖之後,程式又去喚醒了head節點的下一個節點。節點在擷取鎖之後,将目前節點移出隊列,就是if (p == head && tryAcquire(arg))中的邏輯。

**在這裡,被喚醒的節點,從被中斷的位置開始執行,也就是再次開始執行循環for(;;)體,則if (p == head && tryAcquire(arg))有可能成功(因為是非公平鎖,是以有可能成功),通過自旋。**這裡的自旋與sync鎖的優化自旋不同,sync的自旋的設計是不想線程阻塞耗費性能,這裡的自旋隻是單純的循環擷取鎖而已。

總結:當鎖已經被占用的時候,非公平鎖,會首先去嘗試擷取鎖,如果擷取鎖失敗,則乖乖的去排隊。CLH隊列,遵循FIFO規則,也就是,隻要你入隊了,那麼你隻能被按照公平鎖的方式執行。而公平鎖與非公平鎖在代碼上的不同就是在擷取鎖的時候,先去檢視隊列中是否有node節點,如果有,那就乖乖排隊去,沒有就直接擷取鎖。

另外,aqs又可以實作條件隊列來操作對應的線程,詳見部落格

這隻是reentrantlock中的很小的一部分,但是基本的流程邏輯也差不多就這樣了,其他的方法多是對一些邏輯方法的強化等。

如果有時間真的可以好好讀一讀。

CLH隊列:是Craig,Landin and Hagersten三個人發明的,是以叫CLH隊列