天天看點

【Java并發工具類】Lock和Condition

前言

Java SDK并發包通過

Lock

Condition

兩個接口來實作管程,其中Lock用于解決互斥問題,Condition用于解決同步問題。我們需要知道,Java語言本身使用

synchronized

實作了管程的,那麼為什麼還在SDK中提供另外一種實作呢?欲知為何請看下文。

下面将先闡述再造管程的理由,然後詳細介紹Lock和Condition,最後再看實作同步機制時是選擇synchronized還是SDK中的管程。

再造管程的理由

Java本就從語言層面實作了管程,然而後面又在SDK中再次現實,這隻能說明語言層面的實作的管程有所不足。要說談synchronized的不足,我們就要要回顧一下破壞死鎖的不可搶占問題:

破壞不可搶占條件,需要線程在擷取不到鎖的情況下主動釋放它擁有的資源。當我們使用synchronized的時候,線程是沒有辦法主動釋放它占有的資源的。因為,synchronized在申請不到資源時,會使線程直接進入阻塞狀态,而線程進入了阻塞狀态就不能主動釋放占有的資源。

是以,有沒有一種辦法可以使得線程處于阻塞狀态時也能夠響應中斷主動釋放資源或者擷取不到資源的時候不阻塞呢?答案是有的,使用SDK中的管程。

SDK中管程的實作

java.util.concurrent

中的

Lock

接口,提供了如下三種設計思想都可以解決死鎖的不可搶占條件:

  1. 能夠響應中斷

    線程處于阻塞狀态時可以接收中斷信号。我們便可以給阻塞的線程發送中斷信号,喚醒線程,線程便有機會釋放它曾經擁有的鎖。這樣便可破壞不可搶占條件。

  2. 支援逾時

    如果線程在一段時間之内沒有擷取到鎖,不是進入阻塞狀态,而是傳回一個錯誤,那這個線程也有機會釋放曾經持有的鎖。這樣也能破壞不可搶占條件。

  3. 非阻塞地擷取鎖

    如果嘗試擷取鎖失敗,并不進入阻塞狀态,而是直接傳回,那這個線程也有機會釋放曾經持有的鎖。這樣也可以破壞不可搶占條件。

這三種方案就可全面彌補synchronized的問題。也就是再造管程的原因。這三種思想展現在Lock接口的API上,便是如下三個方法:

// 支援中斷的 API
void lockInterruptibly() throws InterruptedException;

// 支援逾時的 API
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

// 支援非阻塞擷取鎖的 API
boolean tryLock();
           

下面我們便繼續介紹Lock。

Lock和ReentrantLock

Lock接口中定義了一組抽象的加鎖操作:

public interface Lock{
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition(); // 關聯Condition對象使用
}
           

與synchronized内置加鎖不同,Lock提供的是無條件的、可輪詢的、定時的以及可中斷的鎖擷取操作,所有加鎖和解鎖都是顯式的。在Lock的實作中必須要提供與内置鎖相同的記憶體可見性語義,但是加鎖語義、排程算法、順序保證以及性能等方面可以不同。

ReentrantLock

實作了Lock接口,并提供了與synchronized相同的互斥性和記憶體可見性。在擷取ReentrantLock時,有着進入同步代碼塊相同的記憶體語義,在釋放ReentrantLock時,同樣有着與退出同步代碼塊相同的記憶體語義。見名知義,ReentrantLock還提供了同synchronized一樣的可重入加鎖的語義。

👉 擴充:可重入函數

可重入函數怎麼了解呢?指的是線程可以重複調用?顯然不是,所謂可重入函數,指的是多個線程可以同時調用該函數,每個線程都能得到正确結果;同時在一個線程内支援線程切換,無論被切換多少次,結果都是正确的。多線程可以同時執行,還支援線程切換,這意味着什麼呢?線程安全。是以,可重入函數是線程安全的。

Lock的标準使用形式

Lock l = ...; //使用ReentrantLock實作類 Lock l = new ReentrantLock();
l.lock();
try {
    // access the resource protected by this lock
} finally {
    l.unlock();
}
           

Lock的使用形式比synchronized要複雜一些,所有的加鎖和解鎖的操作都是顯式的。解鎖操作必須在

finally

塊中,否則,如果在被保護的代碼塊中抛出了異常,那麼這個鎖将永遠無法釋放。當使用加鎖時,還必須考慮在

try

塊中抛出異常的情況,如果可能使對象處于不一緻狀态,那麼就需要

try-catch

或者

try-finally

塊。

輪詢鎖與定時鎖

可定時與可輪詢的鎖擷取方式是由

tryLock

方法實作的。與無條件擷取鎖的模式比較,它具有更完善的錯誤恢複機制。

使用輪詢鎖解決動态順序死鎖問題

如果不能獲得所有需要的鎖,那麼可以使用可定時的或者可輪詢的鎖擷取方式,它會釋放已經獲得的鎖,然後嘗試重新擷取所有鎖。下面一個例子(來自參考[2]),給出了使用輪詢鎖來解決轉賬時動态順序死鎖問題:使用

tryLock

來擷取兩個賬戶的鎖,如果不能同時獲得,那麼就回退并重新嘗試。程式中還在休眠時間中做了随機處理,進而降低發生活鎖的可能性。

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

public class DeadlockAvoidance {
    private static Random rnd = new Random();
    public boolean transferMoney(Account fromAcct,
                                 Account toAcct,
                                 DollarAmount amount,
                                 long timeout,
                                 TimeUnit unit)
        throws InsufficientFundsException, InterruptedException {
        long fixedDelay = getFixedDelayComponentNanos(timeout, unit);
        long randMod = getRandomDelayModulusNanos(timeout, unit);
        long stopTime = System.nanoTime() + unit.toNanos(timeout);

        while (true) {
            // 使用tryLock()擷取鎖,如果擷取了鎖,則傳回 true;否則傳回 false.
            if (fromAcct.lock.tryLock()) { 
                try {
                    if (toAcct.lock.tryLock()) { // 使用tryLock()擷取鎖
                        try {
                            if (fromAcct.getBalance().compareTo(amount) < 0)
                                throw new InsufficientFundsException();
                            else {
                                fromAcct.debit(amount);
                                toAcct.credit(amount);
                                return true;
                            }
                        } finally {
                            toAcct.lock.unlock();
                        }
                    }
                } finally {
                    fromAcct.lock.unlock();
                }
            }
            if (System.nanoTime() < stopTime) // 擷取鎖的時間大于給定時間,則傳回失敗
                return false;
            NANOSECONDS.sleep(fixedDelay + rnd.nextLong() % randMod); // 休眠時間加入随機成分
        }
    }

    private static final int DELAY_FIXED = 1;
    private static final int DELAY_RANDOM = 2;

    static long getFixedDelayComponentNanos(long timeout, TimeUnit unit) {
        return DELAY_FIXED;
    }

    static long getRandomDelayModulusNanos(long timeout, TimeUnit unit) {
        return DELAY_RANDOM;
    }

    static class DollarAmount implements Comparable<DollarAmount> {
        public int compareTo(DollarAmount other) {
            return 0;
        }
        DollarAmount(int dollars) {
        }
    }

    class Account {
        public Lock lock;
        void debit(DollarAmount d) {
        }
        void credit(DollarAmount d) {
        }
        DollarAmount getBalance() {
            return null;
        }
    }
    class InsufficientFundsException extends Exception {
    }
}
           

定時鎖

在實作具有時間限制操作時,定時鎖将非常有用。當在帶有時間限制的操作中調用了一個阻塞方法時,它能根據剩餘時間來提供一個時限。如果操作不能在指定的時間内給出結果,那麼它就會使程式提前結束。當使用内置鎖時,在開始請求鎖後,這個操作将無法取消,是以内置鎖很難實作帶有時間限制的操作。

可中斷的鎖擷取操作

可中斷的鎖擷取操作适用在可取消的操作中擷取鎖。内置鎖是不能響應中斷的。

lockInterruptibly()

方法能夠在擷取鎖的同時保持對中斷的響應,因為它包含在Lock中,是以無須建立其他類型的不可中斷阻塞機制。

非塊結構的加鎖

在内置鎖中,鎖的擷取和釋放都是基于代碼塊的,且是自動擷取和釋放鎖。雖然這樣避免了編碼的複雜性,但是卻不太靈活。例如,某些周遊并發通路的資料結果的算法要求使用連鎖式加鎖"hand-over-hand" 或鎖耦合 "chain locking":擷取節點 A 的鎖,然後再擷取節點 B 的鎖,然後釋放 A 并擷取 C,然後釋放 B 并擷取 D,依此類推。

Lock

接口的實作允許鎖在不同的作用範圍内擷取和釋放,并允許以任何順序擷取和釋放多個鎖,進而支援使用這種技術。

公平性

ReentrantLock

的構造函數中提供了兩種公平性選擇:建立一個非公平的鎖(預設)或者一個公平的鎖。

public ReentrantLock(){}
// fair參數代表的是鎖的公平政策,如果傳入true就表示需要構造一個公平鎖,否則就是構造一個非公平鎖。
public ReentrantLock(boolean fair) {}    
           

在公平的鎖上,線程将按照它們送出請求的順序來獲得鎖;在非公平的鎖上,則允許“插隊”:當一個線程請求非公平鎖時,如果在送出請求的同時該鎖的狀态變為可用,那麼這個線程将跳過隊列中所有的等待線程并獲得這個鎖(來得早不如來得巧)。

關于請求線程是否進入隊列排隊等待鎖:在公平的鎖中,如果有一個線程持有這個鎖或者有其他線程正在隊列中等待這個鎖,那麼新送出請求的線程将被放入隊列中等待(FIFO原則);在非公平鎖中,隻有當鎖被某個線程持有時,新送出請求的線程才會被放入隊列中(如上面所述的來得巧就會直接獲得鎖)。

在現實生活中我們往往期望事事公平,但是為什麼在并發的鎖上卻存在不公平鎖?其實想想也簡單,恢複挂起的一個線程到這個線程到這個線程真正開始執行之前是存在延遲的,如果在此期間有一個線程剛好達到并且在被喚醒的線程真正執行之前又剛好可以利用完資源,那麼這種充分利用資源的精神恰恰是可取的。

公平性将由于在挂起線程和恢複線程時産生開銷而極大地降低性能,于是,大多數情況下,非公平鎖的性能要高于公平鎖的性能。

隻有當持有鎖的時間相對較長,或者請求鎖的平均時間間隔較長,那麼就應該使用公平鎖。與預設的ReentrantLock一樣,内置加鎖并不會提供确定的公平性保證。

synchronized和ReentrantLock之間的抉擇

ReentrantLock在加鎖和記憶體上提供的語義都與内置鎖相同,除此之外,它還提供我們上述的定時的鎖等待、可中斷的鎖等待、公平性以及實作非塊結構的加鎖。ReentrantLock在性能上也優于内置鎖(在Java 5.0中遠遠勝出,在Java 6.0中略有勝出,synchronized在Java 6.0中做了優化),但是是否就意味着開發都使用ReentrantLock替代synchronized呢?

synchronized與ReentrantLock相比,還是具有很大優勢。例如為開發人員所熟悉、簡潔緊湊。加鎖和釋放鎖都是自動進行的,而顯式鎖需要手動在finally中進行,如果忘記将引發嚴重後果。在現有很多程式中使用的時内置鎖,貿然混合入顯式鎖也會讓人困惑,也容易引起錯誤。

是以,在一般情況下使用内置鎖,僅當内置鎖不能滿足需求時,才可以考慮使用ReentrantLock。未來synchronized被提升優化的可能也會很大,因為synchronized作為JVM的内置屬性,可以便于一些代碼優化,如對線程封閉的鎖對象的鎖消除優化,通過增加鎖的粒度來消除内置鎖的同步。

Condition對象

Lock可以看作是一種廣義的内置鎖,Condition則可以看作是一種廣義的内置條件隊列。我們前面介紹管程時說過,每個内置鎖隻能有一個相關聯的條件隊列(條件變量等待隊列)。 一個Condition和一個Lock關聯在一起,就像一個條件隊列和一個内置鎖關聯一樣。建立一個Condition,可以在關聯的Lock上調用

Lock.newCondition()

方法 。Condition比内置條件隊列提供了更豐富的功能:在每個鎖上加鎖存在多個等待、條件等待是可中斷的或不可中斷的、基于限時的等待,以及公平的或非公平的隊列操作。

每個Lock可以擁有任意數量的Condition對象。Condition對象繼承了相關的Lock對象的公平性,對于公平的鎖,線程會依照FIFO順序從

Condition.await

中釋放。

Condition的接口如下:

public interface Condition{
	void await() throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    void awaitUninterruptibly();
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}
           

注意,在Condition對象中,與(等待—喚醒機制中介紹的)内置鎖中

wait

notify

notifyAll

方法相對應的是

await

signal

signaAll

方法。因為Condition也繼承了Object,是以它也包含了wait、notify和notifyAll方法,在使用時一定要使用正确的版本。

使用Lock和Condition實作有界緩存(代碼來自參考[2])。使用兩個Condition,分别為notFull和notEmpty,用于表示“非滿”與“非空”兩個條件謂詞(使某個操作成為狀态依賴操作的前提,對下面的take方法來說,它的條件謂詞就是“緩存不空”,take方法在執行前必須首先測試該條件謂詞)。當緩存為空時,take将阻塞并等待notEmpty,此時put想notEmpty發送信号,可以解除任何在take中阻塞的線程。

public class ConditionBoundedBuffer <T> {
    protected final Lock lock = new ReentrantLock();
    // 條件謂詞: notFull (count < items.length)
    private final Condition notFull = lock.newCondition();
    // 條件謂詞: notEmpty (count > 0)
    private final Condition notEmpty = lock.newCondition();
    private static final int BUFFER_SIZE = 100;
    @GuardedBy("lock") private final T[] items = (T[]) new Object[BUFFER_SIZE];
    @GuardedBy("lock") private int tail, head, count;

    // 阻塞并直到: notFull
    public void put(T x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await(); // 阻塞目前線程在notFull的條件隊列上
            items[tail] = x;
            if (++tail == items.length)
                tail = 0;
            ++count;
            notEmpty.signal(); // 喚醒notEmpty條件隊列上的一個線程
        } finally {
            lock.unlock();
        }
    }

    // 阻塞并直到: notEmpty
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await(); // 阻塞目前線程在notEmpty的條件隊列上
            T x = items[head];
            items[head] = null;
            if (++head == items.length)
                head = 0;
            --count;
            notFull.signal(); // 喚醒notFull條件隊列上的一個線程
            return x;
        } finally {
            lock.unlock();
        }
    }
}
           

通過将兩個條件謂詞分開并放到兩個等待線程集中,Condition使其更容易滿足單次通知的需求。signal将比signalAll更高效,它能極大地減少在每次緩存操作中發生的上下文切換與鎖請求的次數。因為如果使用内置鎖來實作,所有被阻塞的線程都将在一個隊列上等待。

小結

在開發并發程式時,是使用原生的synchronized還是

java.util.concurrent.*

下的顯式鎖Lock工具類,它們各有優劣還需要根據具體要求進行選擇。以上僅是學習筆記的整合,若有不明不白之處,還望各位看官指出,先在此謝過。

參考:

[1]極客時間專欄王寶令《Java并發程式設計實戰》

[2]Brian Goetz.Tim Peierls. et al.Java并發程式設計實戰[M].北京:機械工業出版社,2016

每天進步一點點,不要停止前進的腳步~