天天看點

死磕多線程(8)-ReentrantLock,Condition詳解

ReentrantLock(Lock中使用頻率最高的類)-可重入鎖

内建鎖隐式支援重入性,synchronized通過擷取自增,釋放自減的方式實作重入。

一、重入性實作原理

重入性鎖的特點:線程擷取鎖的時候,如果已經擷取鎖的線程是目前線程直接再次擷取

由于鎖會被擷取N次,是以鎖隻有被釋放N次之後才算真正釋放成功

如何實作可重入?

以非公平鎖為例,判斷目前線程能否獲得鎖為例,核心方法為nonfairTryAcquire:

final boolean nonfairTryAcquire(int acquires) {
    //拿到目前線程
final Thread current = Thread.currentThread();
//擷取目前同步狀态
int c = getState();
// 1.如果該鎖未被任何線程占有,目前線程使用CAS嘗試擷取同步狀态
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 2.此時同步狀态不為0,表示已經有線程擷取到了同步狀态
//判斷持有線程是否為目前線程
else if (current == getExclusiveOwnerThread()) {
// 3.若是目前線程,同步狀态再次+1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//将再次+1後的狀态寫回記憶體
setState(nextc);
return true;
}
return false;
}

           

鎖的釋放:

protected final boolean tryRelease(int releases) {
// 1.同步狀态-1
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 2.隻有當同步狀态為0時,鎖成功釋放,傳回false
free = true;
setExclusiveOwnerThread(null);
}
// 3.鎖未被完全釋放,傳回false
setState(c);
return free;
}

           

二、公平鎖與非公平鎖

公平鎖:鎖的擷取順序一定滿足時間上的絕對順序,等待時間最長的線程一定最先擷取到鎖。

非公平鎖:等待時間最長的線程不一定先擷取到鎖

ReentrantLock預設使用非公平鎖:

public ReentrantLock() {
sync = new NonfairSync();
}

           

可傳入一個boolean值,true時為公平鎖,false時為非公平鎖:

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

           

1.非公平鎖nonFairSync:

final void lock() {
    //不在隊列中的線程可能會直接擷取到鎖
    if(compareAndSetState(0,1))
    setExclusiveOwnerThread(Thread.currentThread());
    else
    acquire(1);
}

           
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//-------------------------------------------------
// 1.如果該鎖未被任何線程占有,該鎖能被目前線程擷取
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
//------------------------------------------------
}
// 2.若被占有,檢查占有線程是否是目前線程
else if (current == getExclusiveOwnerThread()) {
// 3.再次擷取,計數+1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

           

2.公平鎖FairSync:

final void lock(){
    //少了一次CAS過程
    acquire(1);
}

           
protected final boolean tryAcquire(int acquires) {
            if (c == 0) {
            	// 增加了!hasQueuedPredecessors()
            	// 當同步隊列中存在非空節點,目前線程直接封裝為Node節點排隊
                if (!hasQueuedPredecessors() &&
                //hasQueuedPredecessors() :實作公平鎖,先檢測隊列中是否有非空節點,沒有CAS競争
                  compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
        }   

           

3.公平鎖與非公平鎖對比:

公平鎖保證每次擷取鎖均為同步隊列的第一個節點,保證了請求資源時間上的絕對順序,但是效率較低,需要頻繁的進行上下文切換

非公平鎖會降低性能開銷,在恢複一個被挂起的線程與該線程真正運作之間存在着嚴重的延遲。而且,非公平鎖能更充分的利用cpu的時間片,盡量的減少cpu空閑的狀态時間。但是可能導緻其他線程永遠無法擷取到鎖,造成線程"饑餓"現象。

通常來講,沒有特定的公平性要求盡量選擇非公平鎖(ReentrantLock預設選擇)

三、ReentrantReadWriteLock(可重入讀寫鎖)

讀寫者模型:

讀寫鎖允許同一時刻被多個讀線程通路,

但是在寫線程通路時,所有的讀線程以及其他寫線程均會被阻塞。

1.寫鎖詳解-獨占鎖

寫鎖的擷取-tryAcquire(int acquires)

protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            Thread current = Thread.currentThread();
            // 擷取讀寫鎖狀态
            int c = getState();
            // 擷取獨占式鎖狀态-即寫鎖狀态
            int w = exclusiveCount(c);//目前寫鎖的擷取次數
            if (c != 0) {
                // 表示目前有讀線程拿到讀鎖,寫線程無法擷取同步狀态
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                // 寫鎖的可重入次數已達最大值
                if (w + exclusiveCount(acquires) > MAX_COUNT)//左移16位-1(乘2的10次方)
                    throw new Error("Maximum lock count exceeded");
                // 寫鎖可重入
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            // 此時讀寫狀态為0,寫鎖可以正常擷取到同步狀态
            // 将目前線程置為隻有寫鎖線程
            setExclusiveOwnerThread(current);
            return true;
        }


           

寫鎖的釋放通過重寫AQS的tryRelease方法,源碼為:

protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 同步狀态減去寫狀态
int nextc = getState() - releases;
// 目前寫狀态是否為0,為0則釋放寫鎖
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
// 不為0則更新同步狀态
setState(nextc);
return free;
}

           

2.讀鎖-共享鎖(一般與獨占鎖搭配使用實作讀寫者模型)

讀鎖擷取:

隻要目前沒有寫線程擷取到寫鎖并且讀鎖的擷取次數不超過最大值,讀鎖就能擷取成功。

讀鎖的釋放:減到0徹底将鎖釋放

讀寫鎖的應用場景:緩存的實作

寫鎖的降級:寫鎖可以降級為讀鎖,但是讀鎖不能更新為寫鎖。

注意:讀鎖 != 無鎖:(寫鎖時所有讀線程也停止,讀鎖可以限制個數)

3.如何區分讀狀态與寫狀态:

同步狀态的高16位表示讀鎖擷取次數;同步狀态低16位表示寫鎖擷取次數。

死磕多線程(8)-ReentrantLock,Condition詳解

四、Condition的await與signal 等待/通知機制

Object的wait與notify是與内建鎖(對象螢幕)搭配使用,完成線程的等待與通知機制。Condition的await、signal是與Lock體系配合實作線程的等待與通知(Java語言層面實作,具有更高的控制與擴充性。)

1.Condition有以下三個獨有特性(内建鎖不具備)

I.Condition await支援不響應中斷,而Object提供wait不支援。(Lock支援響應中斷)

II.Condition支援多個等待隊列,而Object wait隻有一個等待隊列。

III.Condition支援設定截止時間,而Object wait隻支援設定逾時時間。

2.等待方法:await()

I.void await() throws InterruptedException(同wait());目前線程進入等待狀态,如果在等待狀态中被中斷會抛出被中斷異常;

II.void awaitUninterruptibly();特性1,等待過程中不響應中斷

III.boolean await(long time, TimeUnit unit) throws InterruptedException;在I的基礎上增加了逾時等待功能,可以自定義時間機關

IV. boolean awaitUntil(Date deadline) throws InterruptedException;特性3,支援設定截止時間

await實作原理:

public final void await() throws InterruptedException {
	        // 判斷中斷
            if (Thread.interrupted())
                throw new InterruptedException();
============// 将目前線程封裝為Node入等待隊列===============================
            Node node = addConditionWaiter();
            // .釋放目前線程所占用的lock,釋放後會喚醒同步隊列中的下一個節點
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 
            while (!isOnSyncQueue(node)) {
            	// 當線程不在同步隊列後将其阻塞,置為WAIT狀态
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 在同步隊列中排隊擷取鎖
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

           

如何将目前線程插入等待隊列 addConditionWaiter()

private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
            	// 清空所有等待隊列中狀态不為Condition的節點
                unlinkCancelledWaiters();
                // 将最新的尾節點指派
                t = lastWaiter;
            }
            // 将目前線程包裝為Node節點且狀态為Condition
            Node node = 
            new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            // 尾插入等待隊列
            lastWaiter = node;
            return node;
        }

           

将線程包裝為Node節點尾插入等待隊列後,線程釋放鎖過程fullyRelease()

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
        	// 擷取目前同步狀态
            int savedState = getState();
            // 調用AQS的釋放同步狀态方法release()
            if (release(savedState)) {
                failed = false;
                return savedState;//釋放成功狀态為0
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
        	// 若在釋放過程中出現異常,将目前節點取消
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

           

線程如何能從await()方法中退出?

while (!isOnSyncQueue(node)) {
				// 阻塞在此處
                LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
               break;
     }
I.在等待時被中斷,通過break退出循環
II.被喚醒後置入同步隊列,退出循環。

           

調用condition.await方法的線程必須是已經獲得了lock,也就是目前線程是同步隊列中的頭結點。調用該方法後會使得目前線程所封裝的Node尾插入到等待隊列中

  1. signal(),signalAll()

    将等待隊列中等待時間最長的節點移動到同步隊列中

public final void signal() {
    //判斷目前節點是否拿到鎖
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 拿到目前等待隊列的頭結點
            Node first = firstWaiter;
            if (first != null)
            	// 喚醒頭結點
                doSignal(first);
        }


           
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 将頭結點從等待隊列中移除
first.nextWaiter = null;
// transferForSignal方法對頭結點做真正的處理
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}      

           
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
// 首先将節點狀态更新為0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 将節點使用enq方法尾插到同步隊列中
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

           

同步隊列中的節點置入等待隊列:同步隊列的頭節點作為等待隊列的最後一個節點喚醒:喚醒等待隊列的頭節點,放入同步隊列最後一個節點

signalAll()方法:

将等待隊列中的每一個節點都移入到同步隊列中,即“通知”目前調用condition.await()方法的每一個線程

4.Condition等待隊列

Condition隊列與AQS中的同步隊列共享節點(Node)資料結構,帶有頭尾指針的單向隊列

死磕多線程(8)-ReentrantLock,Condition詳解

每當調用lock.newCondition()就會在綁定的lock鎖上新增一個等待隊列。(特性2)

死磕多線程(8)-ReentrantLock,Condition詳解