天天看點

java中ReentrantLock類的詳細介紹(詳解)

部落客如果看到請聯系小白,小白記不清位址了

簡介

ReentrantLock是一個可重入且獨占式的鎖,它具有與使用synchronized螢幕鎖相同的基本行為和語義,但與synchronized關鍵字相比,它更靈活、更強大,增加了輪詢、逾時、中斷等進階功能。ReentrantLock,顧名思義,它是支援可重入鎖的鎖,是一種遞歸無阻塞的同步機制。除此之外,該鎖還支援擷取鎖時的公平和非公平選擇。

ReentrantLock的類圖如下:

java中ReentrantLock類的詳細介紹(詳解)

ReentrantLock的内部類Sync繼承了AQS,分為公平鎖FairSync和非公平鎖NonfairSync。如果在絕對時間上,先對鎖進行擷取的請求你一定先被滿足,那麼這個鎖是公平的,反之,是不公平的。公平鎖的擷取,也就是等待時間最長的線程最優先擷取鎖,也可以說鎖擷取是順序的。ReentrantLock的公平與否,可以通過它的構造函數來決定。

事實上,公平鎖往往沒有非公平鎖的效率高,但是,并不是任何場景都是以TPS作為唯一名額,公平鎖能夠減少“饑餓”發生的機率,等待越久的請求越能夠得到優先滿足。

下面我們着重分析ReentrantLock是如何實作重進入和公平性擷取鎖的特性,并通過測試來驗證公平性對性能的影響。

實作重進入

重進入是指任意線程在擷取到鎖之後能夠再次擷取該鎖而不會被鎖阻塞,該特性的首先需要解決以下兩個問題:

線程再次擷取鎖:所需要去識别擷取鎖的線程是否為目前占據鎖的線程,如果是,則再次擷取成功;

鎖的最終釋放:線程重複n次擷取了鎖,随後在第n次釋放該鎖後,其它線程能夠擷取到該鎖。鎖的最終釋放要求鎖對于擷取進行計數自增,計數表示目前線程被重複擷取的次數,而被釋放時,計數自減,當計數為0時表示鎖已經成功釋放。

ReentrantLock是通過自定義同步器來實作鎖的擷取與釋放,我們以非公平鎖(預設)實作為例,對鎖的擷取和釋放進行詳解。

擷取鎖

ReentrantLock的預設構造函數為:

public ReentrantLock() {
    sync = new NonfairSync();
}
           

即内部同步元件為非公平鎖,擷取鎖的代碼為:

public void lock() {
    sync.lock();
}
           

通過簡介中的類圖可以看到,Sync類是ReentrantLock自定義的同步元件,它是ReentrantLock裡面的一個内部類,它繼承自AQS,它有兩個子類:公平鎖FairSync和非公平鎖NonfairSync。ReentrantLock的擷取與釋放鎖操作都是委托給該同步元件來實作的。下面我們來看一看非公平鎖的lock()方法:

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
           

該程式首先會通過compareAndSetState(int, int)方法來嘗試修改同步狀态,如果修改成功則表示擷取到了鎖,然後調用setExclusiveOwnerThread(Thread)方法來設定擷取到鎖的線程,該方法是繼承自AbstractOwnableSynchronizer類,AQS繼承自AOS類,它的主要作用就是記錄擷取到獨占鎖的線程,AOS類的定義很簡單:

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
    private static final long serialVersionUID = 3737899427754241961L;
 
    protected AbstractOwnableSynchronizer() { }
 
    // The current owner of exclusive mode synchronization.
    private transient Thread exclusiveOwnerThread;
 
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
 
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}
           

如果同步狀态修改失敗,則表示沒有擷取到鎖,需要調用acquire(int)方法,該方法定義在AQS中,如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
tryAcquire(int)是子類需要重寫的方法,在非公平鎖中的實作如下:

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    // 擷取目前線程
    final Thread current = Thread.currentThread();
    // 擷取同步狀态
    int c = getState();
    // 同步狀态為0,表示沒有線程擷取鎖
    if (c == 0) {
        // 嘗試修改同步狀态
        if (compareAndSetState(0, acquires)) {
            // 同步狀态修改成功,擷取到鎖
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 同步狀态不為0,表示已經有線程擷取了鎖,判斷擷取鎖的線程是否為目前線程
    else if (current == getExclusiveOwnerThread()) {
        // 擷取鎖的線程是目前線程
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // 擷取鎖的線程不是目前線程
    return false;
}
           

nonfairTryAcquire(int)方法首先判斷同步狀态是否為0,如果是0,則表示該鎖還沒有被線程持有,然後通過CAS操作擷取同步狀态,如果修改成功,傳回true。如果同步狀态不為0,則表示該鎖已經被線程持有,需要判斷目前線程是否為擷取鎖的線程,如果是則擷取鎖,成功傳回true。成功擷取鎖的線程再次擷取該鎖,隻是增加了同步狀态的值,這也就實作了可重入鎖。

釋放鎖

成功擷取鎖的線程在完成業務邏輯之後,需要調用unlock()來釋放鎖:

public void unlock() {
    sync.release(1);
}
unlock()調用NonfairSync類的release(int)方法釋放鎖,release(int)方法是定義在AQS中的方法:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
tryRelease(int)是子類需要實作的方法:

protected final boolean tryRelease(int releases) {
    // 計算新的狀态值
    int c = getState() - releases;
    // 判斷目前線程是否是持有鎖的線程,如果不是的話,抛出異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 新的狀态值是否為0,若為0,則表示該鎖已經完全釋放了,其他線程可以擷取同步狀态了
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 更新狀态值
    setState(c);
    return free;
}
           

如果該鎖被擷取n次,那麼前(n-1)次tryRelease(int)方法必須傳回false,隻有同步狀态完全釋放了,才能傳回true。可以看到,該方法将同步狀态是否為0作為最終釋放的條件,當狀态為0時,将占有線程設為null,并傳回true,表示釋放成功。

公平鎖與非公平鎖

公平性與否是針對鎖擷取順序而言的,如果一個鎖是公平的,那麼鎖的擷取順序就應該符合FIFO原則。我們在前面介紹了非公平鎖NonfairSync調用的nonfairTryAcquire(int)方法,在該方法中,隻要通過CAS操作修改同步狀态成功,則目前線程就擷取到了鎖,而公平鎖則不同,公平鎖FairSync的tryAcquire(int)方法如下所示:

protected final boolean tryAcquire(int acquires) {
    // 擷取目前線程
    final Thread current = Thread.currentThread();
    // 擷取同步狀态
    int c = getState();
    // 同步狀态為0,表示沒有線程擷取鎖
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 同步狀态不為0,表示已經有線程擷取了鎖,判斷擷取鎖的線程是否為目前線程
    else if (current == getExclusiveOwnerThread()) {
        // 擷取鎖的線程是目前線程
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
該方法與nonfairTryAcquire(int)方法比較,唯一不同的位置為判斷條件多了hasQueuedPredecessors()方法,該方法定義如下:

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    // 同步隊列尾節點
    Node t = tail; // Read fields in reverse initialization order
    // 同步隊列頭節點
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
           

該方法主要是對同步隊列中目前節點是否有前驅節點進行判斷,如果該方法傳回true,則表示有線程比目前線程更早地請求擷取鎖,是以需要等待前驅線程擷取并釋放鎖之後才能繼續擷取鎖。

下面我們編寫一個測試程式來觀察公平鎖和非公平鎖在擷取鎖時的差別:

public class FairAndUnfairTest {
    private static CountDownLatch start;
    
    private static class MyReentrantLock extends ReentrantLock {
        public MyReentrantLock(boolean fair) {
            super(fair);
        }
 
        public Collection<Thread> getQueuedThreads() {
            List<Thread> arrayList = new ArrayList<Thread>(super.getQueuedThreads());
            Collections.reverse(arrayList);
            return arrayList;
        }
    }
    
    private static class Worker extends Thread {
        private Lock lock;
 
        public Worker(Lock lock) {
            this.lock = lock;
        }
 
        @Override
        public void run() {
            try {
                start.await();
            } catch (InterruptedException e) {
            	e.printStackTrace();
            }
            // 連續兩次列印目前的Thread和等待隊列中的Thread
            for (int i = 0; i < 2; i++) {
                lock.lock();
                try {
                    System.out.println("Lock by [" + getName() + "], Waiting by " + ((MyReentrantLock) lock).getQueuedThreads());
                } finally {
                    lock.unlock();
                }
            }
        }
 
        public String toString() {
            return getName();
        }
    }
 
    public static void main(String[] args) {
		Lock fairLock = new MyReentrantLock(true);
		Lock unfairLock = new MyReentrantLock(false);
		
		testLock(fairLock);
//		testLock(unfairLock);
	}
 
    private static void testLock(Lock lock) {
        start = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            Thread thread = new Worker(lock);
            thread.setName("" + i);
            thread.start();
        }
        start.countDown();
    }
}
testLock(fairLock)運作結果(不唯一):
Lock by [0], Waiting by [4, 1, 2, 3]
Lock by [4], Waiting by [1, 2, 3, 0]
Lock by [1], Waiting by [2, 3, 0, 4]
Lock by [2], Waiting by [3, 0, 4, 1]
Lock by [3], Waiting by [0, 4, 1, 2]
Lock by [0], Waiting by [4, 1, 2, 3]
Lock by [4], Waiting by [1, 2, 3]
Lock by [1], Waiting by [2, 3]
Lock by [2], Waiting by [3]
Lock by [3], Waiting by []
testLock(unfairLock)運作結果(不唯一):

Lock by [0], Waiting by [1]
Lock by [0], Waiting by [1, 2, 3, 4]
Lock by [1], Waiting by [2, 3, 4]
Lock by [1], Waiting by [2, 3, 4]
Lock by [2], Waiting by [3, 4]
Lock by [2], Waiting by [3, 4]
Lock by [3], Waiting by [4]
Lock by [3], Waiting by [4]
Lock by [4], Waiting by []
Lock by [4], Waiting by []
           

從上述結果可以看到,公平鎖每次都是隊列中的第一個節點擷取到鎖,而非公平鎖出現了一個線程連續擷取鎖的情況。

為什麼會出現連續擷取鎖的情況呢?因為在nonfairTryAcquire(int)方法中,每當一個線程請求鎖時,隻要擷取了同步狀态就成功擷取了鎖。在此前提下,剛剛釋放鎖的線程再次擷取到同步狀态的幾率很大,而其他線程隻能在同步隊列中等待。

非公平鎖有可能使線程饑餓,那為什麼還要将它設定為預設模式呢?我們再次觀察上面的運作結果,如果把每次不同線程擷取到鎖定義為1次切換,公平鎖在測試中進行了10次切換,而非公平鎖隻有5次切換,這說明非公平鎖的開銷更小。我們下面再進行一個測試(還是用上面的程式,不過使用了10個線程,每個線程擷取2000次鎖,程式運作環境為Centos7.3 E5-2682 2.50GHz 單核 2GB),通過vmstat統計測試程式上下文切換次數,運作結果如下所示:

公平鎖

java中ReentrantLock類的詳細介紹(詳解)

程式運作總耗時為5308毫秒

非公平鎖

java中ReentrantLock類的詳細介紹(詳解)

程式運作總耗時為3176毫秒

從結果中可以看到,公平鎖與非公平鎖相比,耗時更多,線程上下文切換次數更多。可以看出,公平鎖保證了鎖的擷取按照FIFO原則,而代價則是進行大量的線程切換。非公平鎖雖然可能導緻線程饑餓,但卻有極少的線程切換,保證了其更大的吞吐量。