天天看點

多線程:線程安全的實作方法。互斥同步非阻塞同步無同步方案

互斥同步

互斥同步(Mutual Exclusion&Synchronization)是常見的一種并發正确性保障手段。同步是指多個線程并發通路資料時,保證共享資料在同一個時刻隻被一個線程(或者是一些,使用信号量的時候)使用。而互斥是實作同步的一種手段,臨界區(Critical Section)、互斥量(Mutex)和信号量(Semaphore)都是主要的互斥實作方式。是以,在這4個字裡面,互斥是因,同步是果;互斥是方法,同步是目的。

在Java中,最基本的互斥同步手段就是synchronized關鍵字,synchronized關鍵字經過編譯之後,會在同步塊的前後分别形成monitorenter和monitorexit這兩個位元組碼指令,這兩個位元組碼都需要一個reference類型的參數來指明要鎖定和解鎖的對象。如果Java程式中的synchronized明确指定了對象參數,那就是這個對象的reference;如果沒有明确指定,那就根據synchronized修飾的是執行個體方法還是類方法,去取對應的對象執行個體來作為鎖對象。

根據虛拟機規範的要求,在執行monitorenter指令時,首先要嘗試擷取對象的鎖。如果這個對象沒被鎖定,或者目前線程已經擁有了那個對象的鎖,把鎖的計數器加1,相應的,在執行monitorexit指令時會将鎖計數器減1,當計數器加0時,鎖就被釋放。如果擷取對象鎖失敗,那目前線程就要阻塞等待,直到對象鎖被另外一個線程釋放為止。

在虛拟機規範對monitorenter和monitorexit的行為描述中,有兩點是需要特别注意的。首先,synchronized同步塊對同一條線程來說是可重入的,不會出現自己把自己鎖死的問題。其次,同步塊在已進入的線程執行完之前,會阻塞後面其他線程的進入。Java的線程是映射到作業系統的原生線程之上的,如果要阻塞或喚醒一個線程,都需要作業系統來幫忙完成,這就需要從使用者态轉換到核心态中,是以狀态轉換需要耗費很多的處理器時間。對于代碼簡單的同步塊(如被synchronized修飾的getter()或setter()方法),狀态轉換消耗的時間有可能比使用者代碼執行的時間還要長。是以synchronized是java語言中的一個重量級(Heavyweight)操作,有經驗的程式員都會在确定必要的情況下才使用這種操作。而虛拟機本身也會進行一些優化,譬如在通知作業系統阻塞線程之前加入一段自旋等待過程,避免頻繁的切入到核心态之中。

除了synchronized之外,我們還可以使用java.util.concurrent(下文稱J.U.C)包中的重入鎖(ReentrantLock)來實作同步,在基本用法上,ReentrantLock與synchronized很相似,他們都具備一樣的線程重入特性,隻是代碼寫法上有點差別,一個表現為API層面的互斥鎖(lock()和unlock()方法配合try/finally語句塊來完成),另一個表現為原生文法層面的互斥鎖。不過,相對于synchronized,ReentrantLock增加了一些進階功能,主要有以下3項:等待可中斷、可實作公平鎖,以及鎖可以綁定多個條件。

等待可中斷是指當持有鎖的線程長期不釋放鎖的時候,正在等待的線程可以選擇放棄等待,改為處理其他事情,可中斷特性對處理時間非常長的同步塊很有幫助。

公平鎖是指多個線程在等待同一個鎖時,必須按照申請鎖的時間順序來依次獲得鎖;而非公平鎖則不保證這一點,在鎖被釋放時,任何一個等待鎖的線程都有機會獲得鎖。synchronized中的鎖是非公平的,ReentrantLock預設情況下也是非公平的,但可以通過帶有布爾值的構造函數要求使用公平鎖。

鎖綁定多個條件是指一個ReentrantLock對象可以同時綁定多個Condition對象,而在synchronized中,鎖對象的wait()和notify()或notifyAll()方法可以實作一個隐含的條件,如果要和多于一個的條件關聯的時候,就不得不額外的添加一個鎖,而ReentrantLock則無需這樣做,隻需要多次調用newCondition()方法即可。

如果需要使用上述功能,選用ReentrantLock是一個很好的選擇,那如果是基于性能考慮呢?關于synchronized和ReentrantLock的性能問題,Brian Goetz對這兩種鎖在JDK 1.5與單核處理器,以及JDK 1.5與雙Xeon處理器環境下做了一組吞吐量對比的實驗,實驗結果如下圖所示。

多線程:線程安全的實作方法。互斥同步非阻塞同步無同步方案
多線程:線程安全的實作方法。互斥同步非阻塞同步無同步方案

從上面兩個圖可以看出,多線程環境下synchronized的吞吐量下降的非常嚴重,而ReentrantLock則能基本保持在同一個比較穩定的水準上。與其說ReentrantLock性能好,還不如說synchronized還有非常大的優化餘地。後續的技術發展也證明這一點,JDK 1.6中加入了很多針對鎖的優化措施,JDK 1.6釋出之後,人們就發現synchronized與ReentrantLock的性能基本上是完全持平了。是以,如果讀者的程式是使用JDK 1.6或以上部署的話,性能因素不再是選擇ReentrantLock的理由了,虛拟機在未來的性能改進中肯定也會更加偏向于原生的synchronized,是以還是提倡在synchronized能實作需求的情況下,優先考慮使用synchronized來進行同步。

非阻塞同步

互斥同步最主要的問題就是進行線程阻塞和喚醒所帶來的性能問題,是以這種同步也稱為阻塞同步(Blocking Synchronization)。從處理問題的方式上說,互斥同步屬于一種悲觀的并發政策,總是認為隻要不去做正确的同步措施(例如加鎖),那就肯定會出現問題,無論共享資料是否真的會出現競争,他都要進行加鎖(這裡讨論的是概念模型,實際上虛拟機會優化掉很大一部分不必要的加鎖)、使用者态核心轉換、維護鎖計時器和檢查是否有被阻塞的線程需要喚醒等操作。随着硬體指令集的發展,我們有了另外一個選擇:基于沖突檢測的樂觀并發政策,通俗的說,就是先進行操作,如果沒有其他線程争用共享資料,那操作就成功了;如果共享資料有争用,産生了沖突,那就再采取其他的補償措施(最常見的補償措施就是不斷的重試,直到成功為止),這種樂觀的并發政策的許多實作都不需要把線程挂起,是以這種同步操作稱為非阻塞同步(Non-Blocking Synchronization)。

為什麼這裡說使用樂觀并發政策需要“硬體指令集的發展”才能進行呢?因為我們需要操作和沖突檢測這兩個步驟具備原子性,靠什麼來保證呢?如果這裡再使用互斥同步來保證就失去意義了,是以我們隻能靠硬體來完成這件事情,硬體保證一個從語義上看起來需要多次操作的行為隻通過一條處理器指令就能完成,這類指令常用的有:

  • 測試并設定(Test-and-Set)。
  • 擷取并增加(Fetch-and-Increment)。
  • 交換(Swap)。
  • 比較并交換(Compare-and-Swap,下文稱CAS)。
  • 加載連結/條件存儲(Load-Linked/Store-Conditional,下文稱LL/SC)。

其中,前面的3條是20世紀就已經存在于大多數指令集之中的處理器指令,後面的兩條是現代處理器新增的,而且這兩條指令的目的和功能是類似的。在IA64、x86指令集中有cmpxchg指令完成CAS功能,在sparc-TSO也有casa指令實作,而在ARM和PowerPC架構下,則需要使用一堆ldrex/strex指令來完成LL/SC的功能。

CAS指令需要有3個操作數,分别是記憶體位置(在Java中可以簡單了解為變量的記憶體位址,用V表示)、舊的預期值(用A表示)和新值(用B表示)。CAS指令執行時,當且僅當V符合舊預期值A時,處理器用新值B更新V的值,否則他就不執行更新,但是無論是否更新了V的值,都會傳回V的舊值,上述的處理過程是一個原子操作。

在JDK 1.5之後,Java程式中才可以使用CAS操作,該操作由sun.misc.Unsafe類裡面的compareAndSwapInt()和compareAndSwapLong()等幾個方法包裝提供,虛拟機在内部對這些方法做了特殊處理,即時編譯出來的結果就是一條平台相關的處理器CAS指令,沒有方法調用的過程,或者可以認為是無條件内聯進去了。

由于Unsafe類不是提供使用者程式調用的類(Unsafe.getUnsafe()的代碼中限制了隻有啟動類加載器(Bootstrap ClassLoader)加載的Class才能通路他),是以,如果不采用反射手段,我們隻能通過其他的Java API來簡介使用它,如J.U.C包裡面的整數原子類,其中的compareAndSet()和getAndIncrement()等方法都使用了Unsafe類的CAS操作。

public class VolatileTest {

    public static volatile int race = 0;

    public static void increase() {
        race++;
    }

    private static final int THREADS_COUNT = 20;

    public static void main(String[] args) {
        Thread[] threads = new Thread[THREADS_COUNT];
        for (int i = 0; i < THREADS_COUNT; i++) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < 10000; i++) {
                        increase();
                    }
                }
            });
            threads[i].start();
        }

        // 等待所有累加線程都結束
        while (Thread.activeCount() > 1)
            Thread.yield();

        System.out.println(race);
    }
}
           

我們不妨拿上面一段代碼來看看如何使用CAS操作來避免阻塞同步。我們曾經通過這段20個線程自增10000次的代碼來證明volatile變量不具備原子性,那麼如何才能讓他具備原子性呢?把“race++”操作或increase()方法用同步塊包裹起來當然是一個方法,但是如果改成下面的代碼,那效率将會提高許多。

public class AtomicTest {

    public static AtomicInteger race = new AtomicInteger(0);

    public static void increase() {
        race.incrementAndGet();
    }

    private static final int THREADS_COUNT = 20;

    public static void main(String[] args) throws Exception {
        Thread[] threads = new Thread[THREADS_COUNT];
        for (int i = 0; i < THREADS_COUNT; i++) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < 10000; i++) {
                        increase();
                    }
                }
            });
            threads[i].start();
        }

        while (Thread.activeCount() > 1)
            Thread.yield();

        System.out.println(race);
    }
}
           

運作結果如下:

200000

使用AtomicInteger代替int後,程式輸出了正确的結果,一切都要歸功于incrementAndGet()方法的原子性。他的實作其實非常簡單,如下所示。

/**
     * Atomically increment by one the current value.
     * @return the updated value
     */
    public final int incrementAndGet() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }
           

incrementAndGet()方法在一個無限循環中,不斷嘗試将一個比目前值大1的新值賦給自己。如果失敗了,那說明在執行“擷取-設定”操作的時候值已經有了修改,于是再次循環進行下一次操作,直到設定成功為止。

盡管CAS看起來很美,但顯然這種操作無法涵蓋互斥同步的所有使用場景,并且CAS從語義上來說并不是完美的,存在這樣的一個邏輯漏洞:如果一個變量V初次讀取的時候是A值,并且在準備指派的時候檢查到他仍然為A值,那我們就能說他的值沒有被其他線程改變過了嗎?如果在這段期間他的值曾經被改成了B,後來又被改回為A,那CAS操作就會誤認為他從來沒有被改變過,這個漏洞稱為CAS操作的“ABA”問題。J.U.C包為了解決這個問題,提供了一個帶有标記的原子引用類(AtomicStampedReference),他可以通過控制變量值的版本來保證CAS的正确性。不過目前來說這個類比較“雞肋”,大部分情況下ABA問題不會影響程式并發的準确性,如果需要解決ABA問題,改用傳統的互斥同步可能會比原子類更高效。

無同步方案

要保證線程安全,并不是一定就要進行同步,兩者沒有因果關系。同步隻是保證共享資料争用時的正确性的手段,如果一個方法本來就不涉及共享資料,那他自然就無需任何同步措施去保證正确性,是以會有一些代碼天生就是線程安全的,這裡簡單的介紹其中兩類。

可重入代碼(Reentrant Code)

這種代碼也叫做純代碼(Pure Code),可以在代碼執行的任何時刻中斷他,轉而去執行另外一段代碼(包括遞歸調用他本身),而在控制權傳回後,原來的程式不會出現任何錯誤。相對線程安全來說,可重入性是更基本的特性,他可以保證線程安全,即所有的可重入的代碼都是線程安全的,但是并非所有的線程安全的代碼都是可重入的。

可重入代碼有一些共同的特征,例如不依賴存儲在堆上的資料和公用的系統資源、用到的狀态量都由參數中傳入、不調用非可重入的方法等。我們可以通過一個簡單的原則來判斷代碼是否具備可重入性:如果一個方法,他的傳回結果是可以預測的,隻要輸入了相同的資料,就都能傳回相同的結果,那他就滿足可重入性的要求,當然也就是線程安全的。

線程本地存儲(Thread Local Storage)

如果一段代碼中所需要的資料必須與其他代碼共享,那就看看這些共享資料的代碼是否能保證在同一個線程中執行?如果能保證,我們就可以把共享資料的可見範圍限制在同一個線程之内,這樣,無需同步也能保證線程之間不出現資料争用的問題。

符合這種特點的應用并不少見,大部分使用消費隊列的架構模式(如“生産者-消費者”模式)都會将産品的消費過程盡量在一個線程中消費完,其中最重要的一個應用執行個體就是經典Web互動模型中的“一個請求對應一個伺服器線程”(Thread-per-Request)的處理方式,這種處理方式的廣泛應用使得很多Web服務端應用都可以使用線程本地存儲來解決線程安全問題。

Java語言中,如果一個變量要被多線程通路,可以使用volatile關鍵字聲明他為“易變的”;如果一個變量要被某個線程獨享,Java中就沒有類似C++中_declspec(thread)這樣的關鍵字,不過還是可以通過java.lang.ThreadLocal類來實作線程本地存儲的功能。每一個線程的Thread對象中都有一個ThreadLocalMap對象,這個對象存儲了一組以ThreadLocal.threadLocalHashCode為鍵,以本地線程變量為值的K-V值對,ThreadLocal對象就是目前線程的THreadLocalMap的通路入口,每一個ThreadLocal對象都包含了一個獨一無二的threadLocalHashCode值,使用這個值就可以線上程K-V值對中找回對應的本地線程變量。