天天看點

【Java并發基礎】死鎖

前言

我們使用加鎖機制來保證線程安全,但是如果過度地使用加鎖,則可能會導緻死鎖。下面将介紹關于死鎖的相關知識以及我們在編寫程式時如何預防死鎖。

什麼是死鎖

學習作業系統時,給出死鎖的定義為兩個或兩個以上的線程在執行過程中,由于競争資源而造成的一種阻塞的現象,若無外力作用,它們都将無法推進下去。簡化一點說就是:一組互相競争資源的線程因為互相等待,導緻“永久”阻塞的現象。

下面我們通過一個轉賬例子來深入了解死鎖。

class Account {
    private int balance;
    // 轉賬
    void transfer(Account target, int amt){
        if (this.balance > amt) {
            this.balance -= amt;
            target.balance += amt;
        }
    } 
}
           

為了使以上轉賬方法transfer()不存在并發問題,很快地我們可以想使用Java的synchronized修飾transfer方法,于是代碼如下:

class Account {
    private int balance;
    // 轉賬
    synchronized void transfer(Account target, int amt){
        if (this.balance > amt) {
            this.balance -= amt;
            target.balance += amt;
        }
    } 
}
           

需要注意,這裡我們使用的内置鎖是

this

,這把鎖雖然可以保護我們自己的balance,卻不可以保護target的balance。使用我們上一篇介紹的鎖模型來描繪這個代碼就是下面這樣:(圖來自參考[1])

【Java并發基礎】死鎖

更具體來說,假設有 A、B、C 三個賬戶,餘額都是 200 元,我們用兩個線程分别執行兩個轉賬操作:賬戶 A 轉給賬戶 B 100 元,賬戶 B 轉給賬戶 C 100 元,最後我們期望的結果應該是賬戶 A 的餘額是 100 元,賬戶 B 的餘額是 200 元, 賬戶 C 的餘額是 300 元。

如果有兩個線程1和線程2,線程1 執行賬戶 A 轉賬戶 B 的操作,線程2執行賬戶 B 轉賬戶 C 的操作。這兩個線程分别運作在兩顆的CPU上,由于

this

這個鎖隻能保護自己的balance而不能保護别人的,線程 1 鎖定的是賬戶 A 的執行個體(A.this),而線程 2 鎖定的是賬戶 B 的執行個體(B.this),是以這兩個線程可以同時進入臨界區 transfer(),是以兩個線程沒有實作互斥。

出現可能的結果就為,兩個線程同時讀到賬戶B的餘額為200元,導緻最終賬戶 B 的餘額可能是 300(線程 1 後于線程 2 寫 B.balance,線程 2 寫的 B.balance 值被線程 1 覆寫),可能是 100(線程 1 先于線程 2 寫 B.balance,線程 1 寫的 B.balance 值被線程 2 覆寫)。在這種情況下,賬戶B的餘額就不會是 200。

并發轉賬示意圖(圖來自參考[1])

【Java并發基礎】死鎖

于是我們應該使用一個能夠覆寫所有保護資源的鎖,如果還記得我們上一篇講synchronized修飾靜态方法時預設的鎖對象的話,那這裡就很容易解決了。這個預設的鎖就是類的class對象。于是,我們就可以使用Account.class作為一個可以保護這個轉賬過程的鎖。

class Account {
    private int balance;
    // 轉賬
    void transfer(Account target, int amt){
        synchronized(Account.class) {
            if (this.balance > amt) {
                this.balance -= amt;
                target.balance += amt;
            }
        }
    } 
}
           

這個方案雖然不存在并發問題,但是所有賬戶的轉賬操作都是串行的。現實世界中,賬戶 A 轉賬戶 B、賬戶 C 轉賬戶 D 這兩個轉賬操作現實世界裡是可以并行的。較于實際情況來說,這個方案就顯得性能太差。

于是,我們盡量模仿現實世界的轉賬操作:

每個賬戶都有一個賬本,這些賬本都統一存放在檔案架上。當轉賬A給賬戶B轉賬時,櫃員會去拿A賬本和B賬本做登記,此時櫃員在拿賬本時會遇到三種情況:

  1. 檔案架上恰好有A賬本和B賬本,那就同時拿走;
  2. 如果檔案架上隻有A賬本和B賬本之一,那這個櫃員就先把檔案架上有的賬本拿到手,同時等着其他櫃員把另外一個賬本送回來;
  3. A賬本和B賬本都沒有,那這個櫃員就等着兩個賬本都被送回來

在程式設計實作中,我們可以使用兩把鎖來實作這個過程。在 transfer() 方法内部,我們首先嘗試鎖定轉出賬戶 this(先把A賬本拿到手),然後嘗試鎖定轉入賬戶 target(再把B賬本拿到手),隻有當兩者都成功時,才執行轉賬操作。

這個邏輯可以圖形化為下圖這個樣子,(圖來自參考[1]):

【Java并發基礎】死鎖

代碼如下:

class Account {
    private int balance;
    // 轉賬
    void transfer(Account target, int amt){
        // 鎖定轉出賬戶A
        synchronized(this) {              
            // 鎖定轉入賬戶B
            synchronized(target) {           
                if (this.balance > amt) {
                    this.balance -= amt;
                    target.balance += amt;
                }
            }
        }
    } 
}
           

經過這樣的優化後,賬戶 A 轉賬戶 B 和賬戶 C 轉賬戶 D 這兩個轉賬操作就可以并行了。

但是這樣卻會導緻死鎖。例如情況:櫃員張三做賬戶A轉賬戶B的轉賬操作,櫃員李四做賬戶B轉賬戶C的轉賬操作。他們兩個同時操作,于是就會出現下面這種情形:(圖來自參考[1])

【Java并發基礎】死鎖

他倆會一直等待對方将賬本放到檔案架上,造成一個一直僵持的局勢。

關于這種現象,我們還可以借助資源配置設定圖來可視化鎖的占用情況(資源配置設定圖是個有向圖,它可以描述資源和線程的狀态)。其中,資源用方形節點表示,線程用圓形節點表示;資源中的點指向線程的邊表示線程已經獲得該資源,線程指向資源的邊則表示線程請求資源,但尚未得到。(圖來自參考[1])

【Java并發基礎】死鎖

Java并發程式一旦死鎖,一般沒有特别好的方法,恢複應用程式的唯一方式就是中止并重新開機。是以,我們要盡量避免死鎖的發生,最好不要産生死鎖。要知道如何才能做到不要産生死鎖,我們首先要知道什麼條件會發生死鎖。

死鎖發生的四個必要條件

雖然程序在運作過程中,可能發生死鎖,但死鎖的發生也必須具備一定的條件,死鎖的發生必須具備以下四個必要條件:

  • 互斥,共享資源 X 和 Y 隻能被一個線程占用;
  • 占有且等待,線程 T1 已經取得共享資源 X,在等待共享資源 Y 的時候,不釋放共享資源 X;
  • 不可搶占,其他線程不能強行搶占線程 T1 占有的資源;
  • 循環等待,線程 T1 等待線程 T2 占有的資源,線程 T2 等待線程 T1 占有的資源,就是循環等待。

破壞死鎖發生的條件預防死鎖

隻有這四個條件都發生時才會出現死鎖,那麼反過來,也就是說隻要我們破壞其中一個,就可以成功預防死鎖的發生。

四個條件中我們不能破壞互斥,因為我們使用鎖目的就是保證資源被互斥通路,于是我們就對其他三個條件進行破壞:

  • 占用且等待:一次性申請所有的資源,這樣就不存在等待了。
  • 不可搶占,占用部分資源的線程進一步申請其他資源時,如果申請不到,可以主動釋放它占有的資源。
  • 循環等待,靠按序申請資源來預防。所謂按序申請,是指資源是有線性順序的,申請的時候可以先申請資源序号小的,再申請資源序号大的,這樣線性化申請後就不存在循環了。

下面我們使用這些方法去解決如上的死鎖問題。

破壞占用且等待條件

一次性申請完所有資源。我們設定一個管理者來管理賬本,櫃員同時申請需要的賬本,而管理者同時出他們需要的賬本。如果不能同時出借,則櫃員就需要等待。

【Java并發基礎】死鎖

“同時申請”:這個操作是一個臨界區,含有兩個操作,同時申請資源apply()和同時釋放資源free()。

class Allocator {
    private List<Object> als = new ArrayList<>();
    // 一次性申請所有資源
    synchronized boolean apply( Object from, Object to){
        if(als.contains(from) || als.contains(to)){    //from 或者 to賬戶被其他線程擁有
            return false;  
        } else {
            als.add(from);
            als.add(to);  
        }
        return true;
    }
    // 歸還資源
    synchronized void free(Object from, Object to){
        als.remove(from);
        als.remove(to);
    }
}

class Account {
    // actr 應該為單例,隻能由一個人來配置設定資源
    private Allocator actr;
    private int balance;
    // 轉賬
    void transfer(Account target, int amt){
        // 一次性申請轉出賬戶和轉入賬戶,直到成功
        while(!actr.apply(this, target))  //最好可以加個timeout避免一直循環
            ;
            try{
                // 鎖定轉出賬戶
                synchronized(this){ //存在客戶對自己賬戶的操作
                    // 鎖定轉入賬戶
                    synchronized(target){           
                        if (this.balance > amt){
                            this.balance -= amt;
                            target.balance += amt;
                        }
                    }
                }
            } finally {
                actr.free(this, target)    //釋放資源
            }
    }
}
           

破壞不可搶占條件

破壞不搶占要能夠主動釋放它占有的資源,但synchronized是做不到的。原因為synchronized申請不到資源時,線程直接進入了阻塞狀态,而線程進入了阻塞狀态也就沒有辦法釋放它占有的資源了。不過SDK中的java.util.concurrent提供了

Lock

解決這個問題。

支援定時的鎖

顯式使用

Lock

類中的定時

tryLock

功能來代替内置鎖機制,可以檢測死鎖和從死鎖中恢複過來。使用内置鎖的線程擷取不到鎖會被阻塞,而顯式鎖可以指定一個逾時時限(Timeout),在等待超過該時間後tryLock就會傳回一個失敗資訊,也會釋放其擁有的資源。

破壞循環等待條件

破壞這個條件,需要對資源進行排序,然後按序申請資源。我們假設每個賬戶都有不同的屬性 id,這個 id 可以作為排序字段,申請的時候,我們可以按照從小到大的順序來申請。

比如下面代碼中,①~⑤處的代碼對轉出賬戶(this)和轉入賬戶(target)排序,然後按照序号從小到大的順序鎖定賬戶。這樣就不存在“循環”等待了。

class Account {
    private int id;
    private int balance;
    // 轉賬
    void transfer(Account target, int amt){
        Account left = this            // ①
            Account right = target;    // ②
        if (this.id > target.id) {     // ③
            left = target;             // ④
            right = this;              // ⑤
        }                          
        // 鎖定序号小的賬戶
        synchronized(left){
            // 鎖定序号大的賬戶
            synchronized(right){ 
                if (this.balance > amt){
                    this.balance -= amt;
                    target.balance += amt;
                }
            }
        }
    } 
}
           

小結

記得學習作業系統時還有避免死鎖,其和預防死鎖的差別在于:預防死鎖是設法至少破壞産生死鎖的四個必要條件之一,嚴格地防止死鎖的出現,但是這也會使系統性能降低;而避免死鎖則不那麼嚴格的限制産生死鎖的必要條件的存在,因為即使死鎖的必要條件存在,也不一定發生死鎖,死鎖避免是在系統運作過程中注意避免死鎖的最終發生。避免死鎖的經典算法就是銀行家算法,這裡就不擴開介紹了。

還有一個避免出現死鎖的結論:如果所有線程以固定順序來獲得鎖,那麼在程式中就不會出現鎖順序死鎖問題。檢視參考[4]了解。

我們使用細粒度鎖鎖住多個資源時,要注意死鎖的産生。隻有先嗅到死鎖的味道,才有我們的施展之地。

參考:

[1]極客時間專欄王寶令《Java并發程式設計實戰》

[2]Brian Goetz.Tim Peierls. et al.Java并發程式設計實戰[M].北京:機械工業出版社,2016

[3]iywwuyifan.避免死鎖和預防思索的差別.https://blog.csdn.net/masterchiefcc/article/details/83303813

[4]AddoilDan.死鎖面試題(什麼是死鎖,産生死鎖的原因及必要條件).https://blog.csdn.net/hd12370/article/details/82814348

每天進步一點點,不要停止前進的腳步~