天天看點

高并發場景下優化加鎖方式:線程等待與通知機制

摘要:很多時候,我們在并發程式設計中,涉及到加鎖操作時,對代碼塊的加鎖操作真的合理嗎?還有沒有需要優化的地方呢?

本文分享自華為雲社群《​​【高并發】講講高并發場景下如何優化加鎖方式?​​》,作者: 冰 河 。

互斥條件、不可剝奪條件、請求與保持條件、循環等待條件,這是産生死鎖時的四個必要條件,隻有四個條件同時具備時才能發生死鎖。其中,我們在阻止請求與保持條件時,采用了一次性申請所有的資源的方式。例如在我們完成轉賬操作的過程中,我們一次性申請賬戶A和賬戶B,兩個賬戶都申請成功後,再執行轉賬的操作。其中,在我們實作的轉賬方法中,使用了死循環來循環擷取資源,直到同時擷取到賬戶A和賬戶B為止,核心代碼如下所示。

//一次申請轉出賬戶和轉入賬戶,直到成功
while(!requester.applyResources(this, target)){
    //循環體為空
    ;
}      

如果ResourcesRequester類的applyResources()方法執行的時間非常短,并且程式并發帶來的沖突不大,程式循環幾次到幾十次就可以同時擷取到轉出賬戶和轉入賬戶,這種方案就是可行的。

但是,如果ResourcesRequester類的applyResources()方法執行的時間比較長,或者說,程式并發帶來的沖突比較大,此時,可能需要循環成千上萬次才能同時擷取到轉出賬戶和轉入賬戶。這樣就太消耗CPU資源了,此時,這種方案就是不可行的了。

那麼,有沒有什麼方式對這種方案進行優化呢?

問題分析

既然使用死循環一直擷取資源這種方案存在問題,那我們換位思考一下。當線程執行時,發現條件不滿足,是不是可以讓線程進入等待狀态?當條件滿足的時候,通知等待的線程重新執行?

也就是說,如果線程需要的條件不滿足,我們就讓線程進入等待狀态;如果線程需要的條件滿足時,我們再通知等待的線程重新執行。這樣,就能夠避免程式進行循環等待進而消耗CPU的問題。

那麼,問題又來了!當條件不滿足時,如何實作讓線程等待?當條件滿足時,又如何喚醒線程呢?

不錯,這是個問題!不過這個問題解決起來也非常簡單。簡單的說,就是使用線程的等待與通知機制。

線程的等待與通知機制

我們可以使用線程的等待與通知機制來優化阻止請求與保持條件時,循環擷取賬戶資源的問題。具體的等待與通知機制如下所示。

執行的線程首先擷取互斥鎖,如果線程繼續執行時,需要的條件不滿足,則釋放互斥鎖,并進入等待狀态;當線程繼續執行需要的條件滿足時,就通知等待的線程,重新擷取互斥鎖。

那麼,說了這麼多,Java支援這種線程的等待與通知機制嗎?其實,這個問題問的就有點廢話了,Java這麼優秀(牛逼)的語言肯定支援啊,而且實作起來也比較簡單。

用Java實作線程的等待與通知機制

實作方式

其實,使用Java語言實作線程的等待與通知機制有多種方式,這裡我就簡單的列舉一種方式,其他的方式大家可以自行思考和實作,有不懂的地方也可以問我!

在Java語言中,實作線程的等待與通知機制,一種簡單的方式就是使用synchronized并結合wait()、notify()和notifyAll()方法來實作。

實作原理

我們使用synchronized加鎖時,隻允許一個線程進入synchronized保護的代碼塊,也就是臨界區。如果一個線程進入了臨界區,則其他的線程會進入阻塞隊列裡等待,這個阻塞隊列和synchronized互斥鎖是一對一的關系,也就是說,一把互斥鎖對應着一個獨立的阻塞隊列。

在并發程式設計中,如果一個線程獲得了synchronized互斥鎖,但是不滿足繼續向下執行的條件,則需要進入等待狀态。此時,可以使用Java中的wait()方法來實作。當調用wait()方法後,目前線程就會被阻塞,并且會進入一個等待隊列中進行等待,這個由于調用wait()方法而進入的等待隊列也是互斥鎖的等待隊列。而且,線程在進入等待隊列的同時,會釋放自身獲得的互斥鎖,這樣,其他線程就有機會獲得互斥鎖,進而進入臨界區了。整個過程可以表示成下圖所示。

高并發場景下優化加鎖方式:線程等待與通知機制

當線程執行的條件滿足時,可以使用Java提供的notify()和notifyAll()方法來通知互斥鎖等待隊列中的線程,我們可以使用下圖來簡單的表示這個過程。

高并發場景下優化加鎖方式:線程等待與通知機制

這裡,需要注意如下事項:

(1)使用notify()和notifyAll()方法通知線程時,調用notify()和notifyAll()方法時,滿足線程的執行條件,但是當線程真正執行的時候,條件可能已經不再滿足了,可能有其他線程已經進入臨界區執行。

(2)被通知的線程繼續執行時,需要先擷取互斥鎖,因為在調用wait()方法等待時已經釋放了互斥鎖。

(3)wait()、notify()和notifyAll()方法操作的隊列是互斥鎖的等待隊列,如果synchronized鎖定的是this對象,則一定要使用this.wait()、this.notify()和this.notifyAll()方法;如果synchronized鎖定的是target對象,則一定要使用target.wait()、target.notify()和target.notifyAll()方法。

(4)wait()、notify()和notifyAll()方法調用的前提是已經擷取了相應的互斥鎖,也就是說,wait()、notify()和notifyAll()方法都是在synchronized方法中或代碼塊中調用的。如果在synchronized方法外或代碼塊外調用了三個方法,或者鎖定的對象是this,使用target對象調用三個方法的話,JVM會抛出java.lang.IllegalMonitorStateException異常。

具體實作

實作邏輯

在實作之前,我們還需要考慮以下幾個問題:

  • 選擇哪個互斥鎖

在之前的程式中,我們在TansferAccount類中,存在一個ResourcesRequester 類的單例對象,是以,我們是可以使用this作為互斥鎖的。這一點大家需要重點了解。

  • 線程執行轉賬操作的條件

轉出賬戶和轉入賬戶都沒有被配置設定過。

  • 線程什麼時候進入等待狀态

線程繼續執行需要的條件不滿足的時候,進入等待狀态。

  • 什麼時候通知等待的線程執行

當存線上程釋放賬戶的資源時,通知等待的線程繼續執行。

綜上,我們可以得出以下核心代碼。

while(不滿足條件){
    wait();
}      

那麼,問題來了!為何是在while循環中調用wait()方法呢?因為當wait()方法傳回時,有可能線程執行的條件已經改變,也就是說,之前條件是滿足的,但是現在已經不滿足了,是以要重新檢驗條件是否滿足。

實作代碼

我們優化後的ResourcesRequester類的代碼如下所示。

public class ResourcesRequester{
    //存放申請資源的集合
    private List<Object> resources = new ArrayList<Object>();
    //一次申請所有的資源
    public synchronized void applyResources(Object source, Object target){
        while(resources.contains(source) || resources.contains(target)){
            try{
                wait();
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        resources.add(source);
        resources.add(targer);
    }
 
    //釋放資源
    public synchronized void releaseResources(Object source, Object target){
        resources.remove(source);
        resources.remove(target);
        notifyAll();
    }
}      

生成ResourcesRequester單例對象的Holder類ResourcesRequesterHolder的代碼如下所示。

public class ResourcesRequesterHolder{
    private ResourcesRequesterHolder(){}
 
    public static ResourcesRequester getInstance(){
        return Singleton.INSTANCE.getInstance();
    }
    private enum Singleton{
        INSTANCE;
        private ResourcesRequester singleton;
        Singleton(){
            singleton = new ResourcesRequester();
        }
        public ResourcesRequester getInstance(){
            return singleton;
        }
    }
}      

執行轉賬操作的類的代碼如下所示。

public class TansferAccount{
    //賬戶的餘額
    private Integer balance;
    //ResourcesRequester類的單例對象
    private ResourcesRequester requester;
 
    public TansferAccount(Integer balance){
        this.balance = balance;
        this.requester = ResourcesRequesterHolder.getInstance();
    }
    //轉賬操作
    public void transfer(TansferAccount target, Integer transferMoney){
        //一次申請轉出賬戶和轉入賬戶,直到成功
        requester.applyResources(this, target))
        try{
            //對轉出賬戶加鎖
            synchronized(this){
                //對轉入賬戶加鎖
                synchronized(target){
                    if(this.balance >= transferMoney){
                        this.balance -= transferMoney;
                        target.balance += transferMoney;
                    }   
                }
            }
        }finally{
            //最後釋放賬戶資源
            requester.releaseResources(this, target);
        }
    }
}      

可以看到,我們在程式中通知處于等待狀态的線程時,使用的是notifyAll()方法而不是notify()方法。那notify()方法和notifyAll()方法兩者有什麼差別呢?

notify()和notifyAll()的差別

  • notify()方法

随機通知等待隊列中的一個線程。

  • notifyAll()方法

通知等待隊列中的所有線程。

繼續閱讀