天天看點

《Java 并發程式設計的藝術》

并發程式設計的挑戰

1.1 上下文切換

CPU通過時間片配置設定算法來循環執行任務,目前任務執行一個時間片後會切換到下一個

任務。但是,在切換前會儲存上一個任務的狀态,以便下次切換回這個任務時,可以再加載這

個任務的狀态。是以任務從儲存到再加載的過程就是一次上下文切換。

如何減少上下文切換:

  • 無鎖并發程式設計。多線程競争鎖時,會引起上下文切換,是以多線程處理資料時,可以用一

    些辦法來避免使用鎖,如将資料的ID按照Hash算法取模分段,不同的線程處理不同段的資料。

  • CAS算法。Java的Atomic包使用CAS算法來更新資料,而不需要加鎖。
  • 使用最少線程和使用協程。避免建立不需要的線程,比如任務很少,但是建立了很多線程來處理,這

    樣會造成大量線程都處于等待狀态。

1.2 死鎖

現在我們介紹避免死鎖的幾個常見方法。

  • 避免一個線程同時擷取多個鎖。
  • ·避免一個線程在鎖内同時占用多個資源,盡量保證每個鎖隻占用一個資源。
  • 嘗試使用定時鎖,使用lock.tryLock(timeout)來替代使用内部鎖機制。
  • 對于資料庫鎖,加鎖和解鎖必須在一個資料庫連接配接裡,否則會出現解鎖失敗的情況。

1.3 資源限制的挑戰

  • 資源限制是指在進行并發程式設計時,程式的執行速度受限于計算機硬體資源或軟體資源。
  • 在并發程式設計中,将代碼執行速度加快的原則是将代碼中串行執行的部分變成并發執行,但是如果将某段串行的代碼并發執行,因為受限于資源,仍然在串行執行,這時候程式不僅不會加快執行,反而會更慢,因為增加了上下文切換和資源排程的時間。
  • 對于硬體資源限制,可以考慮使用叢集并行執行程式。
  • 對于軟體資源限制,可以考慮使用資源池将資源複用。比如使用連接配接池将資料庫和Socket連接配接複用,或者在調用對方webservice接口擷取資料時,隻建立一個連接配接。

Java并發機制的底層實作原理

2.1 volatile的應用

  1. volatile的定義與實作原理

    下面來具體講解volatile的兩條實作原則。

    1)Lock字首指令會引起處理器緩存回寫到記憶體。

    2)一個處理器的緩存回寫到記憶體會導緻其他處理器的緩存無效。

2.2 synchronized的實作原理與應用

先來看下利用synchronized實作同步的基礎:Java中的每一個對象都可以作為鎖。具體表現

為以下3種形式。

·對于普通同步方法,鎖是目前執行個體對象。

·對于靜态同步方法,鎖是目前類的Class對象。

·對于同步方法塊,鎖是Synchonized括号裡配置的對象。

代碼塊同步是使用monitorenter和monitorexit指令實作的。

方法級的同步是隐式,即無需通過位元組碼指令來控制的,它實作在方法調用和傳回操作之中。JVM可以從方法常量池中的方法表結構(method_info Structure) 中的 ACC_SYNCHRONIZED 通路标志區分一個方法是否同步方法。當方法調用時,調用指令将會 檢查方法的ACC_SYNCHRONIZED 通路标志是否被設定,如果設定了,執行線程将先持有monitor(虛拟機規範中用的是管程一詞),然後再執行方法,最後再方法完成(無論是正常完成還是非正常完成)時釋放monitor。在方法執行期間,執行線程持有了monitor,其他任何線程都無法再獲得同一個monitor。如果一個同步方法執行期間抛出了異常,并且在方法内部無法處理此異常,那這個同步方法所持有的monitor将在異常抛到同步方法之外時自動釋放。

Java對象頭

《Java 并發程式設計的藝術》

在運作期間,Mark Word裡存儲的資料會随着鎖标志位的變化而變化。Mark Word可能變化為存儲以下4種資料,如表2-4所示。

《Java 并發程式設計的藝術》

鎖的更新與對比

在Java SE 1.6中,鎖一共有4種狀态,級别從低到高依次是:無鎖狀态、偏向鎖狀态、輕量級鎖狀态和重量級鎖狀态。

偏向鎖

當一個線程通路同步塊并擷取鎖時,會在對象頭和棧幀中的鎖記錄裡存儲鎖偏向的線程ID,以後該線程在進入和退出同步塊時不需要進行CAS操作來加鎖和解鎖,隻需簡單地測試一下對象頭的Mark

Word裡是否存儲着指向目前線程的偏向鎖。如果測試成功,表示線程已經獲得了鎖。如果測試失敗,則需要再測試一下Mark

Word中偏向鎖的辨別是否設定成1(表示目前是偏向鎖):如果沒有設定,則使用CAS競争鎖;如果設定了,則嘗試使用CAS将對象頭的偏向鎖指向目前線程。

輕量級鎖

輕量級鎖加鎖:

線程在執行同步塊之前,JVM會先在目前線程的棧桢中建立用于存儲鎖記錄的空間,并将對象頭中的Mark Word複制到鎖記錄中,官方稱為Displaced Mark Word。然後線程嘗試使用CAS将對象頭中的Mark Word替換為指向鎖記錄的指針。如果成功目前線程獲得鎖,如果失敗,表示其他線程競争鎖,目前線程便嘗試使用自旋來擷取鎖。

輕量級鎖解鎖:

輕量級解鎖時,會使用原子的CAS操作将Displaced Mark Word替換回到對象頭,如果成功,則表示沒有競争發生。如果失敗,表示目前鎖存在競争,鎖就會膨脹成重量級鎖。

鎖的優缺點對比

《Java 并發程式設計的藝術》

2.3 原子操作的實作原理

原子(atomic)本意是“不能被進一步分割的最小粒子”,而原子操作(atomic operation)意為“不可被中斷的一個或一系列操作”。在多處理器上實作原子操作就變得有點複雜。

術語定義

CPU術語定義

《Java 并發程式設計的藝術》

.處理器如何實作原子操作

  • 使用總線鎖保證原子性

    處理器使用總線鎖就是來解決這個問題的。所謂總線鎖就是使用處理器提供的一個LOCK#信号,當一個處理器在總線上輸出此信号時,其他處理器的請求将被阻塞住,那麼該處理器可以獨占共享記憶體。

  • 使用緩存鎖保證原子性

    所謂“緩存鎖定”是指記憶體區域如果被緩存在處理器的緩存行中,并且在Lock操作期間被鎖定,那麼當它執行鎖操作回寫到記憶體時,處理器不在總線上聲言LOCK#信号,而是修改内部的記憶體位址,并允許它的緩存一緻性機制來保證操作的原子性。

但是有兩種情況下處理器不會使用緩存鎖定。

第一種情況是:當操作的資料不能被緩存在處理器内部,或操作的資料跨多個緩存行(cache line)時,則處理器會調用總線鎖定。

第二種情況是:有些處理器不支援緩存鎖定。對于Intel 486和Pentium處理器,就算鎖定的記憶體區域在處理器的緩存行中也會調用總線鎖定。

Java如何實作原子操作

1.使用循環CAS實作原子操作

在Java中可以通過鎖和循環CAS的方式來實作原子操作。

public class Counter {
    private AtomicInteger atomicI = new AtomicInteger(0);
    private int i = 0;
    public static void main(String[] args) {
        final Counter cas = new Counter();
        List<Thread> ts = new ArrayList<Thread>(600);
        long start = System.currentTimeMillis();
        for (int j = 0; j < 100; j++) {
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < 10000; i++) {
                        cas.count();
                        cas.safeCount();
                    }
                }
            });
            ts.add(t);
        }
        for (Thread t : ts) {
            t.start();
        }
// 等待所有線程執行完成
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(cas.i);
        System.out.println(cas.atomicI.get());
        System.out.println(System.currentTimeMillis() - start);
    }
    /** * 使用CAS實作線程安全計數器 */
    private void safeCount() {
        for (;;) {
            int i = atomicI.get();
            boolean suc = atomicI.compareAndSet(i, ++i);
            if (suc) {
                break;
            }
        }
    }
    /**
     * 非線程安全計數器
     */
    private void count() {
        i++;
    }
}
           

從Java 1.5開始,JDK的并發包裡提供了一些類來支援原子操作,如AtomicBoolean(用原子方式更新的boolean值)、AtomicInteger(用原子方式更新的int值)和AtomicLong(用原子方式更新的long值)。這些原子包裝類還提供了有用的工具方法,比如以原子的方式将目前值自增1和自減1。

2.CAS實作原子操作的三大問題

  • ABA問題

    A→B→A 值已經被修改 ,但是實際檢測沒有變化。

    ABA問題的解決思路就是使用版本号。在變量前面追加上版本号,每次變量更新的時候把版本号加1,那麼A→B→A就會變成1A→2B→3A。

  • 循環時間長開銷大

    自旋CAS如果長時間不成功,會給CPU帶來非常大的執行開銷。

  • 隻能保證一個共享變量的原子操作

    當對一個共享變量執行操作時,我們可以使用循環CAS的方式來保證原子操作,但是對多個共享變量操作時,循環CAS就無法保證操作的原子性,這個時候就可以用鎖。

    還有一個取巧的辦法,就是把多個共享變量合并成一個共享變量來

    操作。比如,有兩個共享變量i=2,j=a,合并一下ij=2a,然後用CAS來操作ij。

  • 使用鎖機制實作原子操作

    鎖機制保證了隻有獲得鎖的線程才能夠操作鎖定的記憶體區域。JVM内部實作了很多種鎖機制,有偏向鎖、輕量級鎖和互斥鎖。

    有意思的是除了偏向鎖,JVM實作鎖的方式都用了循環CAS,即當一個線程想進入同步塊的時候使用循環CAS的方式來擷取鎖,當它退出同步塊的時候使用循環CAS釋放鎖。

Java記憶體模型

3.1 Java記憶體模型的基礎

3.1.1 并發程式設計模型的兩個關鍵問題

在并發程式設計中,需要處理兩個關鍵問題:線程之間如何通信及線程之間如何同步(這裡的線程是指并發執行的活動實體)。通信是指線程之間以何種機制來交換資訊。在指令式程式設計中,線程之間的通信機制有兩種:共享記憶體和消息傳遞。

在共享記憶體的并發模型裡,線程之間共享程式的公共狀态,通過寫-讀記憶體中的公共狀态進行隐式通信。在消息傳遞的并發模型裡,線程之間沒有公共狀态,線程之間必須通過發送消息來顯式進行通信。

同步是指程式中用于控制不同線程間操作發生相對順序的機制。在共享記憶體并發模型裡,同步是顯式進行的。程式員必須顯式指定某個方法或某段代碼需要線上程之間互斥執行。

在消息傳遞的并發模型裡,由于消息的發送必須在消息的接收之前,是以同步是隐式進行的。

3.1.2 Java記憶體模型的抽象結構

在Java中,所有執行個體域、靜态域和數組元素都存儲在堆記憶體中,堆記憶體線上程之間共享(本章用“共享變量”這個術語代指執行個體域,靜态域和數組元素)。

局部變量(Local Variables),方法定義參數(Formal Method Parameters)和異常處理器參數(Exception

Handler Parameters)不會線上程之間共享,它們不會有記憶體可見性問題,也不受記憶體模型的影響。

Java線程之間的通信由Java記憶體模型(本文簡稱為JMM)控制,JMM決定一個線程對共享變量的寫入何時對另一個線程可見。

Java記憶體模型的抽象結構示意圖:

《Java 并發程式設計的藝術》

從上圖來看,如果線程A與線程B之間要通信的話,必須要經曆下面2個步驟。

1)線程A把本地記憶體A中更新過的共享變量重新整理到主記憶體中去。

2)線程B到主記憶體中去讀取線程A之前已更新過的共享變量。

《Java 并發程式設計的藝術》

從整體來看,這兩個步驟實質上是線程A在向線程B發送消息,而且這個通信過程必須要經過主記憶體。JMM通過控制主記憶體與每個線程的本地記憶體之間的互動,來為Java程式員提供記憶體可見性保證。

3.1.3 從源代碼到指令序列的重排序

在執行程式時,為了提高性能,編譯器和處理器常常會對指令做重排序。重排序分3種類型。

1)編譯器優化的重排序。編譯器在不改變單線程程式語義的前提下,可以重新安排語句的執行順序。

2)指令級并行的重排序。現代處理器采用了指令級并行技術(Instruction-LevelParallelism,ILP)來将多條指令重疊執行。如果不存在資料依賴性,處理器可以改變語句對應機器指令的執行順序。

3)記憶體系統的重排序。由于處理器使用緩存和讀/寫緩沖區,這使得加載和存儲操作看上去可能是在亂序執行。

《Java 并發程式設計的藝術》
上述的1屬于編譯器重排序,2和3屬于處理器重排序。這些重排序可能會導緻多線程程式出現記憶體可見性問題。對于編譯器,JMM的編譯器重排序規則會禁止特定類型的編譯器重排序(不是所有的編譯器重排序都要禁止)。對于處理器重排序,JMM的處理器重排序規則會要求Java編譯器在生成指令序列時,插入特定類型的記憶體屏障(Memory Barriers,Intel稱之為Memory Fence)指令,通過記憶體屏障指令來禁止特定類型的處理器重排序。

JMM屬于語言級的記憶體模型,它確定在不同的編譯器和不同的處理器平台之上,通過禁止特定類型的編譯器重排序和處理器重排序,為程式員提供一緻的記憶體可見性保證。

3.1.4 并發程式設計模型的分類

雖然寫緩沖區有這麼多好處,但每個處理器上的寫緩沖區,僅僅對它所在的處理器可見。

這個特性會對記憶體操作的執行順序産生重要的影響:處理器對記憶體的讀/寫操作的執行順序,不一定與記憶體實際發生的讀/寫操作順序一緻!

為了保證記憶體可見性,Java編譯器在生成指令序列的适當位置會插入記憶體屏障指令來禁止特定類型的處理器重排序。JMM把記憶體屏障指令分為4類:

《Java 并發程式設計的藝術》

3.1.5 happens-before簡介

在JMM中,如果一個操作執行的結果需要對另一個操作可見,那麼這兩個操作之間必須要存在happens-before關系。這裡提到的兩個操作既可以是在一個線程之内,也可以是在不同線程之間。

  • 程式順序規則:一個線程中的每個操作,happens-before于該線程中的任意後續操作。

    兩個操作之間具有happens-before關系,并不意味着前一個操作必須要在後一個操作之前執行!happens-before僅僅要求前一個操作(執行的結果)對後一個操作可見。

  • 螢幕鎖規則:對一個鎖的解鎖,happens-before于随後對這個鎖的加鎖。
  • volatile變量規則:對一個volatile域的寫,happens-before于任意後續對這個volatile域的讀。

3.2 重排序

重排序是指編譯器和處理器為了優化程式性能而對指令序列進行重新排序的一種手段。

3.2.1 資料依賴性

如果兩個操作通路同一個變量,且這兩個操作中有一個為寫操作,此時這兩個操作之間就存在資料依賴性。

資料依賴類型表:

《Java 并發程式設計的藝術》

編譯器和處理器可能會對操作做重排序。編譯器和處理器在重排序時,會遵守資料依賴性,編譯器和處理器不會改變存在資料依賴關系的兩個操作的執行順序。

3.2.2 as-if-serial語義

as-if-serial語義的意思是:不管怎麼重排序(編譯器和處理器為了提高并行度),(單線程)程式的執行結果不能被改變。編譯器、runtime和處理器都必須遵守as-if-serial語義。

double pi = 3.14; // A
double r = 1.0; // B
double area = pi * r * r; // C
           

依賴關系是:

《Java 并發程式設計的藝術》

A和B之間沒有資料依賴關系,編譯器和處理器可以重排序A和B之間的執行順序。

A——B——C

B——A——C

as-if-serial語義把單線程程式保護了起來,遵守as-if-serial語義的編譯器、runtime和處理器共同為編寫單線程程式的程式員建立了一個幻覺:單線程程式是按程式的順序來執行的。as-if-serial語義使單線程程式員無需擔心重排序會幹擾他們,也無需擔心記憶體可見性問題。

3.2.3 程式順序規則

根據happens-before的程式順序規則,上面計算圓的面積的示例代碼存在3個happens-

before關系。

1)A happens-before B。

2)B happens-before C。

3)A happens-before C。

這裡的第3個happens-before關系,是根據happens-before的傳遞性推導出來的。這裡A happens-before B,但實際執行時B卻可以排在A之前執行(看上面的重排序後的執行順序)。如果A happens-before B,JMM并不要求A一定要在B之前執行。JMM僅僅要求前一個操作(執行的結果)對後一個操作可見,且前一個操作按順序排在第二個操作之前。這裡操作A的執行結果不需要對操作B可見;而且重排序操作A和操作B後的執行結果,與操作A和操作B按happens-before順序執行的結果一緻。在這種情況下,JMM會認為這種重排序并不非法(notillegal),JMM允許這種重排序。

3.2.4 重排序對多線程的影響

class ReorderExample {
    int a = 0;
    boolean flag = false;

    public void writer() {
        a = 1; // 1
        flag = true; // 2
    }

    public void reader() {
        if (flag) { // 3
            int i = a * a; // 4
        }
    }
}
           

操作1和操作2做了重排序。程式執行時,線程A首先寫标記變量flag,随後線程B讀這個變量。由于條件判斷為真,線程B将讀取變量a。此時,變量a還沒有被線程A寫入,在這裡多線程程式的語義被重排序破壞了!

《Java 并發程式設計的藝術》

操作3和操作4存在控制依賴關系。當代碼中存在控制依賴性時,會影響指令序列執行的并行度。為此,編譯器和處理器會采用猜測(Speculation)執行來克服控制相關性對并行度的影響。以處理器的猜測執行為例,執行線程B的處理器可以提前讀取并計算a*a,然後把計算結果臨時儲存到一個名為重排序緩沖(Reorder Buffer,ROB)的硬體緩存中。當操作3的條件判斷為真時,就把該計算結果寫入變量i中。

猜測執行實質上對操作3和4做了重排序。重排序在這裡破壞了多線程程式的語義!

《Java 并發程式設計的藝術》

3.3 順序一緻性

3.3.1 資料競争與順序一緻性

當程式未正确同步時,就可能會存在資料競争。Java記憶體模型規範對資料競争的定義如下。

在一個線程中寫一個變量,

在另一個線程讀同一個變量,

而且寫和讀沒有通過同步來排序。

JMM對正确同步的多線程程式的記憶體一緻性做了如下保證。

如果程式是正确同步的,程式的執行将具有順序一緻性(Sequentially Consistent)——即程式的執行結果與該程式在順序一緻性記憶體模型中的執行結果相同。

3.3.2 順序一緻性記憶體模型

順序一緻性記憶體模型有兩大特性。

1)一個線程中的所有操作必須按照程式的順序來執行。

2)(不管程式是否同步)所有線程都隻能看到一個單一的操作執行順序。在順序一緻性記憶體模型中,每個操作都必須原子執行且立刻對所有線程可見。

假設有兩個線程A和B并發執行。其中A線程有3個操作,它們在程式中的順序是:

A1→A2→A3。B線程也有3個操作,它們在程式中的順序是:B1→B2→B3。

3.3.3 同步程式的順序一緻性效果

class SynchronizedExample {
    int a = 0;
    boolean flag = false;

    public synchronized void writer() { // 擷取鎖
        a = 1;
        flag = true;
    } // 釋放鎖

    public synchronized void reader() { // 擷取鎖
        if (flag) {
            int i = a;

        } // 釋放鎖
    }
}
           

兩個記憶體模型中的執行時序對比圖:

《Java 并發程式設計的藝術》

3.3.4 未同步程式的執行特性

對于未同步或未正确同步的多線程程式,JMM隻提供最小安全性:線程執行時讀取到的值,要麼是之前某個線程寫入的值,要麼是預設值(0,Null,False),JMM保證線程讀操作讀取到的值不會無中生有(Out Of Thin Air)的冒出來。

未同步程式在兩個模型中的執行特性有如下幾個差異。

1)順序一緻性模型保證單線程内的操作會按程式的順序執行,而JMM不保證單線程内的操作會按程式的順序執行(比如上面正确同步的多線程程式在臨界區内的重排序)。

2)順序一緻性模型保證所有線程隻能看到一緻的操作執行順序,而JMM不保證所有線程能看到一緻的操作執行順序。

3)JMM不保證對64位的long型和double型變量的寫操作具有原子性,而順序一緻性模型保證對所有的記憶體讀/寫操作都具有原子性。

注意,在JSR-133之前的舊記憶體模型中,一個64位long/double型變量的讀/寫操作可以被拆分為兩個32位的讀/寫操作來執行。從JSR-133記憶體模型開始(即從JDK5開始),僅僅隻允許把一個64位long/double型變量的寫操作拆分為兩個32位的寫操作來執行,任意的讀操作在JSR-133中都必須具有原子性(即任意讀操作必須要在單個讀事務中執行)。

3.4 volatile的記憶體語義

3.4.1 volatile的特性

class VolatileFeaturesExample {
    volatile long vl = 0L; // 使用volatile聲明64位的long型變量
    public void set(long l) {
        vl = l; // 單個volatile變量的寫
    }
    public void getAndIncrement () {
        vl++; // 複合(多個)volatile變量的讀/寫
    }
    public long get() {
        return vl; // 單個volatile變量的讀
    }
}
           

假設有多個線程分别調用上面程式的3個方法,這個程式在語義上和下面程式等價。

class VolatileFeaturesExample {
    long vl = 0L; // 64位的long型普通變量
    public synchronized void set(long l) { // 對單個的普通變量的寫用同一個鎖同步
        vl = l;
    }
    public void getAndIncrement () { // 普通方法調用
        long temp = get(); // 調用已同步的讀方法
        temp += 1L; // 普通寫操作
        set(temp); // 調用已同步的寫方法
    }
    public synchronized long get() { // 對單個的普通變量的讀用同一個鎖同步
        return vl;
    }
}
           

鎖的happens-before規則保證釋放鎖和擷取鎖的兩個線程之間的記憶體可見性,這意味着對一個volatile變量的讀,總是能看到(任意線程)對這個volatile變量最後的寫入。

鎖的語義決定了臨界區代碼的執行具有原子性。這意味着,即使是64位的long型和double型變量,隻要它是volatile變量,對該變量的讀/寫就具有原子性。如果是多個volatile操作或類似于volatile++這種複合操作,這些操作整體上不具有原子性。

volatile變量自身具有下列特性。

  • 可見性。對一個volatile變量的讀,總是能看到(任意線程)對這個volatile變量最後的寫 入。
  • 原子性:對任意單個volatile變量的讀/寫具有原子性,但類似于volatile++這種複合操作不 具有原子性。

3.4.2 volatile寫-讀建立的happens-before關系

上面講的是volatile變量自身的特性,對程式員來說,volatile對線程的記憶體可見性的影響比volatile自身的特性更為重要,也更需要我們去關注。

從JSR-133開始(即從JDK5開始),volatile變量的寫-讀可以實作線程之間的通信。

從記憶體語義的角度來說,volatile的寫-讀與鎖的釋放-擷取有相同的記憶體效果:volatile寫和鎖的釋放有相同的記憶體語義;volatile讀與鎖的擷取有相同的記憶體語義。
class VolatileExample {
    int a = 0;
    volatile boolean flag = false;

    public void writer() {
        a = 1;              // 1
        flag = true;        // 2
    }

    public void reader() {
        if (flag) {         // 3
            int i = a;      // 4

        }
    }
}
           

happens-before關系:

《Java 并發程式設計的藝術》

3.4.3 volatile寫-讀的記憶體語義

volatile寫的記憶體語義如下。

當寫一個volatile變量時,JMM會把該線程對應的本地記憶體中的共享變量值重新整理到主記憶體。

線程A執行volatile寫後,共享變量的狀态示意圖。

《Java 并發程式設計的藝術》

volatile讀的記憶體語義如下。

當讀一個volatile變量時,JMM會把該線程對應的本地記憶體置為無效。線程接下來将從主記憶體中讀取共享變量。

線程B讀同一個volatile變量後,共享變量的狀态示意圖。

《Java 并發程式設計的藝術》

下面對volatile寫和volatile讀的記憶體語義做個總結。

  • 線程A寫一個volatile變量,實質上是線程A向接下來将要讀這個volatile變量的某個線程發出了(其對共享變量所做修改的)消息。
  • 線程B讀一個volatile變量,實質上是線程B接收了之前某個線程發出的(在寫這個volatile 變量之前對共享變量所做修改的)消息。
  • 線程A寫一個volatile變量,随後線程B讀這個volatile變量,這個過程實質上是線程A通過 主記憶體向線程B發送消息。

3.4.4 volatile記憶體語義的實作

為了實作volatile的記憶體語義,編譯器在生成位元組碼時,會在指令序列中插入記憶體屏障來禁止特定類型的處理器重排序。

volatile重排序規則表

《Java 并發程式設計的藝術》
  • ·當第二個操作是volatile寫時,不管第一個操作是什麼,都不能重排序。這個規則確定 volatile寫之前的操作不會被編譯器重排序到volatile寫之後。
  • 當第一個操作是volatile讀時,不管第二個操作是什麼,都不能重排序。這個規則確定volatile讀之後的操作不會被編譯器重排序到volatile讀之前。
  • 當第一個操作是volatile寫,第二個操作是volatile讀時,不能重排序。

3.4.5 JSR-133為什麼要增強volatile的記憶體語義

在JSR-133之前的舊Java記憶體模型中,雖然不允許volatile變量之間重排序,但舊的Java記憶體模型允許volatile變量與普通變量重排序。在舊的記憶體模型中,VolatileExample示例程式可能被重排序成下列時序來執行,如圖3-23所示。

《Java 并發程式設計的藝術》

其結果就是:讀線程B執行4時,不一定能看到寫線程A在執行1時對共享變量的修改。

為了提供一種比鎖更輕量級的線程之間通信的機制,JSR-133專家組決定增強volatile的記憶體語義:嚴格限制編譯器和處理器對volatile變量與普通變量的重排序,確定volatile的寫-讀和鎖的釋放-擷取具有相同的記憶體語義。

3.5 鎖的記憶體語義

3.5.1 鎖的釋放-擷取建立的happens-before關系

鎖是Java并發程式設計中最重要的同步機制。鎖除了讓臨界區互斥執行外,還可以讓釋放鎖的線程向擷取同一個鎖的線程發送消息。

class MonitorExample {
    int a = 0;

    public synchronized void writer() {    // 1
        a++;          // 2
    }            // 3

    public synchronized void reader() {   // 4
        int i = a;        // 5
    }            // 6
}
           

假設線程A執行writer()方法,随後線程B執行reader()方法。根據happens-before規則,這個過程包含的happens-before關系可以分為3類。

《Java 并發程式設計的藝術》

在上圖中,2 happens-before 5。是以,線程A在釋放鎖之前所有可見的共享變量,線上程B擷取同一個鎖之後,将立刻變得對B線程可見。

3.5.2 鎖的釋放和擷取的記憶體語義

class MonitorExample {
    int a = 0;

    public synchronized void writer() {    // 1
        a++;          // 2
    }            // 3

    public synchronized void reader() {   // 4
        int i = a;        // 5
    }            // 6
}
           

當線程釋放鎖時,JMM會把該線程對應的本地記憶體中的共享變量重新整理到主記憶體中。以上面的MonitorExample程式為例,A線程釋放鎖後,共享資料的狀态示意圖如圖所示。

《Java 并發程式設計的藝術》

鎖擷取的狀态示意圖:

《Java 并發程式設計的藝術》

下面對鎖釋放和鎖擷取的記憶體語義做個總結。

  • 線程A釋放一個鎖,實質上是線程A向接下來将要擷取這個鎖的某個線程發出了(線程A對共享變量所做修改的)消息。
  • 線程B擷取一個鎖,實質上是線程B接收了之前某個線程發出的(在釋放這個鎖之前對共享變量所做修改的)消息。
  • 線程A釋放鎖,随後線程B擷取這個鎖,這個過程實質上是線程A通過主記憶體向線程B發送消息。

3.5.3 鎖記憶體語義的實作

class ReentrantLockExample {
    int a = 0;
    ReentrantLock lock = new ReentrantLock();

    public void writer() {
        lock.lock();        // 擷取鎖
        try {
            a++;
        } finally {
            lock.unlock();  // 釋放鎖
        }
    }

    public void reader() {
        lock.lock();        // 擷取鎖
        try {
            int i = a;
        } finally {
            lock.unlock();  // 釋放鎖
        }
    }
}
           

CAS是如何同時具有volatile讀和volatile寫的記憶體語義的。

lock字首提供的記憶體屏障效果。

1)確定對記憶體的讀-改-寫操作原子執行。在Pentium及Pentium之前的處理器中,帶有lock前 綴的指令在執行期間會鎖住總線,使得其他處理器暫時無法通過總線通路記憶體。很顯然,這會 帶來昂貴的開銷。從Pentium 4、Intel Xeon及P6處理器開始,Intel使用緩存鎖定(Cache Locking) 來保證指令執行的原子性。緩存鎖定将大大降低lock字首指令的執行開銷。

2)禁止該指令,與之前和之後的讀和寫指令重排序。

3)把寫緩沖區中的所有資料重新整理到記憶體中。

上面的第2點和第3點所具有的記憶體屏障效果,足以同時實作volatile讀和volatile寫的記憶體 語義。

現在對公平鎖和非公平鎖的記憶體語義做個總結。

·公平鎖和非公平鎖釋放時,最後都要寫一個volatile變量state。

·公平鎖擷取時,首先會去讀volatile變量。

·非公平鎖擷取時,首先會用CAS更新volatile變量,這個操作同時具有volatile讀和volatile寫的記憶體語義。

從本文對ReentrantLock的分析可以看出,鎖釋放-擷取的記憶體語義的實作至少有下面兩種方式。

1)利用volatile變量的寫-讀所具有的記憶體語義。

2)利用CAS所附帶的volatile讀和volatile寫的記憶體語義。

3.5.4 concurrent包的實作

如果我們仔細分析concurrent包的源代碼實作,會發現一個通用化的實作模式。

首先,聲明共享變量為volatile。

然後,使用CAS的原子條件更新來實作線程之間的同步。

同時,配合以volatile的讀/寫和CAS所具有的volatile讀和寫的記憶體語義來實作線程之間的通信。

concurrent包的實作示意圖:

《Java 并發程式設計的藝術》

3.6 final域的記憶體語義

3.6.6 final語義在處理器中的實作

上面我們提到,寫final域的重排序規則會要求編譯器在final域的寫之後,構造函數return之前插入一個StoreStore障屏。讀final域的重排序規則要求編譯器在讀final域的操作前面插入一個LoadLoad屏障。

3.6.7 JSR-133為什麼要增強final的語義

在舊的Java記憶體模型中,一個最嚴重的缺陷就是線程可能看到final域的值會改變。

為了修補這個漏洞,JSR-133專家組增強了final的語義。通過為final域增加寫和讀重排序規則,可以為Java程式員提供初始化安全保證:隻要對象是正确構造的(被構造對象的引用在構造函數中沒有“逸出”),那麼不需要使用同步(指lock和volatile的使用)就可以保證任意線程都能看到這個final域在構造函數中被初始化之後的值。

3.7 happens-before

3.7.1 JMM的設計

double pi = 3.14;  // A
double r = 1.0;    // B
double area = pi * r * r; // C
           

上面計算圓的面積的示例代碼存在3個happens-before關系,如下。

·A happens-before B。

·B happens-before C。

·A happens-before C。

在3個happens-before關系中,2和3是必需的,但1是不必要的。是以,JMM把happens-before

要求禁止的重排序分為了下面兩類。

·會改變程式執行結果的重排序。

·不會改變程式執行結果的重排序。

JMM對這兩種不同性質的重排序,采取了不同的政策,如下。

·對于會改變程式執行結果的重排序,JMM要求編譯器和處理器必須禁止這種重排序。

·對于不會改變程式執行結果的重排序,JMM對編譯器和處理器不做要求(JMM允許這種

重排序)。

《Java 并發程式設計的藝術》

3.7.2 happens-before的定義

《JSR-133:Java Memory Model and Thread Specification》對happens-before關系的定義如下。

1)如果一個操作happens-before另一個操作,那麼第一個操作的執行結果将對第二個操作可見,而且第一個操作的執行順序排在第二個操作之前。

2)兩個操作之間存在happens-before關系,并不意味着Java平台的具體實作必須要按照happens-before關系指定的順序來執行。如果重排序之後的執行結果,與按happens-before關系來執行的結果一緻,那麼這種重排序并不非法(也就是說,JMM允許這種重排序)。

上面的1)是JMM對程式員的承諾。從程式員的角度來說,可以這樣了解happens-before關系:如果A happens-before B,那麼Java記憶體模型将向程式員保證——A操作的結果将對B可見,且A的執行順序排在B之前。注意,這隻是Java記憶體模型向程式員做出的保證!

上面的2)是JMM對編譯器和處理器重排序的限制原則。正如前面所言,JMM其實是在遵循一個基本原則:隻要不改變程式的執行結果(指的是單線程程式和正确同步的多線程程式),編譯器和處理器怎麼優化都行。JMM這麼做的原因是:程式員對于這兩個操作是否真的被重排序并不關心,程式員關心的是程式執行時的語義不能被改變(即執行結果不能被改變)。是以,happens-before關系本質上和as-if-serial語義是一回事。

3.7.3 happens-before規則

1)程式順序規則:一個線程中的每個操作,happens-before于該線程中的任意後續操作。

2)螢幕鎖規則:對一個鎖的解鎖,happens-before于随後對這個鎖的加鎖。

3)volatile變量規則:對一個volatile域的寫,happens-before于任意後續對這個volatile域的讀。

4)傳遞性:如果A happens-before B,且B happens-before C,那麼A happens-before C。

5)start()規則:如果線程A執行操作ThreadB.start()(啟動線程B),那麼A線程的

ThreadB.start()操作happens-before于線程B中的任意操作。

6)join()規則:如果線程A執行操作ThreadB.join()并成功傳回,那麼線程B中的任意操作happens-before于線程A從ThreadB.join()操作成功傳回。

3.8 雙重檢查鎖定與延遲初始化

public class UnsafeLazyInitialization {
    private static Instance instance;

    public static Instance getInstance() {
        if (instance == null) // 1:A線程執行
            instance = new Instance(); // 2:B線程執行
        return instance;
    }
}
           

下面是使用雙重檢查鎖定來實作延遲初始化的示例代碼

public class DoubleCheckedLocking {         // 1
    private static Instance instance;       // 2

    public static Instance getInstance() { // 3
        if (instance == null) {             // 4:第一次檢查
            synchronized (DoubleCheckedLocking.class) { // 5:加鎖
                if (instance == null)       // 6:第二次檢查
                    instance = new Instance(); // 7:問題的根源出在這裡
            }                               // 8
        }                                   // 9
        return instance;                    // 10
    }                                       // 11
}
           

線上程執行到第4行,代碼讀取到instance不為null時,instance引用的對象有可能還沒有完成初始化。

3.8.2 問題的根源

前面的雙重檢查鎖定示例代碼的第7行(instance=new Singleton();)建立了一個對象。這一行代碼可以分解為如下的3行僞代碼。

memory = allocate();  // 1:配置設定對象的記憶體空間
ctorInstance(memory); // 2:初始化對象
instance = memory;  // 3:設定instance指向剛配置設定的記憶體位址
           

上面3行僞代碼中的2和3之間,可能會被重排序。

memory = allocate();  // 1:配置設定對象的記憶體空間
instance = memory;  // 3:設定instance指向剛配置設定的記憶體位址
// 注意,此時對象還沒有被初始化!
ctorInstance(memory); // 2:初始化對象
           

多線程執行時序圖:

《Java 并發程式設計的藝術》

在知曉了問題發生的根源之後,我們可以想出兩個辦法來實作線程安全的延遲初始化。

1)不允許2和3重排序。

2)允許2和3重排序,但不允許其他線程“看到”這個重排序。

3.8.3 基于volatile的解決方案

public class SafeDoubleCheckedLocking {
    private volatile static Instance instance;

    public static Instance getInstance() {
        if (instance == null) {
            synchronized (SafeDoubleCheckedLocking.class) {
                if (instance == null)
                    instance = new Instance(); // instance為volatile,現在沒問題了
            }
        }
        return instance;
    }
}
           

當聲明對象的引用為volatile後,3.8.2節中的3行僞代碼中的2和3之間的重排序,在多線程

環境中将會被禁止。上面示例代碼将按如下的時序執行,如圖3-39所示。

《Java 并發程式設計的藝術》

這個方案本質上是通過禁止圖3-39中的2和3之間的重排序,來保證線程安全的延遲初始化。

3.8.4 基于類初始化的解決方案

JVM在類的初始化階段(即在Class被加載後,且被線程使用之前),會執行類的初始化。在執行類的初始化期間,JVM會去擷取一個鎖。這個鎖可以同步多個線程對同一個類的初始化。

基于這個特性,可以實作另一種線程安全的延遲初始化方案(這個方案被稱之為Initialization On Demand Holder idiom)。

public class InstanceFactory {
    private static class InstanceHolder {
        public static Instance instance = new Instance();
    }
    public static Instance getInstance() {
        return InstanceHolder.instance ;  // 這裡将導緻InstanceHolder類被初始化
    }
}
           
《Java 并發程式設計的藝術》

3.9 Java記憶體模型綜述

處理器記憶體模型的特征表

《Java 并發程式設計的藝術》

各種記憶體模型之間的關系

《Java 并發程式設計的藝術》

JMM的記憶體可見性保證

  • 單線程程式。單線程程式不會出現記憶體可見性問題。
  • 正确同步的多線程程式。正确同步的多線程程式的執行将具有順序一緻性(程式的執行結果與該程式在順序一緻性記憶體模型中的執行結果相同)。
  • 未同步/未正确同步的多線程程式。JMM為它們提供了最小安全性保障:線程執行時讀取到的值,要麼是之前某個線程寫入的值,要麼是預設值(0、null、false)。

Java并發程式設計基礎

4.1 線程簡介

4.1.1 什麼是線程

現代作業系統排程的最小單元是線程。也叫輕量級程序(Light

Weight Process),在一個程序裡可以建立多個線程,這些線程都擁有各自的計數器、堆棧和局部變量等屬性,并且能夠通路共享的記憶體變量。

4.1.4 線程的狀态

《Java 并發程式設計的藝術》

Java線程狀态變遷圖:

《Java 并發程式設計的藝術》

4.1.5 Daemon線程

Daemon線程是一種支援型線程,因為它主要被用作程式中背景排程以及支援性工作。這意味着,當一個Java虛拟機中不存在非Daemon線程的時候,Java虛拟機将會退出。可以通過調用Thread.setDaemon(true)将線程設定為Daemon線程。

public class Daemon {
    public static void main(String[] args) {
        Thread thread = new Thread(new DaemonRunner(), "DaemonRunner");
        thread.setDaemon(true);
        thread.start();
    }
    static class DaemonRunner implements Runnable {
        @Override
        public void run() {
            try {
                SleepUtils.second(10);
            } finally {
                System.out.println("DaemonThread finally run.");
            }
        }
    }
}
           

Daemon線程被用作完成支援性工作,但是在Java虛拟機退出時Daemon線程中的finally塊并不一定會執行。

4.2 啟動和終止線程

線程對象在初始化完成之後,調用start()方法就可以啟動這個線程。線程start()方法的含義是:目前線程(即parent線程)同步告知Java虛拟機,隻要線程規劃器空閑,應立即啟動調用start()方法的線程。

4.2.3 了解中斷

中斷可以了解為線程的一個辨別位屬性,它表示一個運作中的線程是否被其他線程進行了中斷操作。

中斷好比其他線程對該線程打了個招呼,其他線程通過調用該線程的interrupt()方法對其進行中斷操作。

線程通過檢查自身是否被中斷來進行響應,線程通過方法isInterrupted()來進行判斷是否被中斷,也可以調用靜态方法Thread.interrupted()對目前線程的中斷辨別位進行複位。如果該線程已經處于終結狀态,即使該線程被中斷過,在調用該線程對象的isInterrupted()時依舊會傳回false。

4.2.5 安全地終止線程

在4.2.3節中提到的中斷狀态是線程的一個辨別位,而中斷操作是一種簡便的線程間互動方式,而這種互動方式最适合用來取消或停止任務。除了中斷以外,還可以利用一個boolean變量來控制是否需要停止任務并終止該線程。

4.3 線程間通信

4.3.1 volatile和synchronized關鍵字

關鍵字volatile可以用來修飾字段(成員變量),就是告知程式任何對該變量的通路均需要從共享記憶體中擷取,而對它的改變必須同步重新整理回共享記憶體,它能保證所有線程對變量通路的可見性。

關鍵字synchronized可以修飾方法或者以同步塊的形式來進行使用,它主要確定多個線程在同一個時刻,隻能有一個線程處于方法或者同步塊中,它保證了線程對變量通路的可見性和排他性。

無論采用哪種方式,其本質是對一個對象的螢幕(monitor)進行擷取,而這個擷取過程是排他的,也就是同一時刻隻能有一個線程擷取到由synchronized所保護對象的螢幕。

圖4-2描述了對象、對象的螢幕、同步隊列和執行線程之間的關系。

《Java 并發程式設計的藝術》

4.3.2 等待/通知機制

等待/通知的相關方法

《Java 并發程式設計的藝術》

1)使用wait()、notify()和notifyAll()時需要先對調用對象加鎖。

2)調用wait()方法後,線程狀态由RUNNING變為WAITING,并将目前線程放置到對象的等待隊列。

3)notify()或notifyAll()方法調用後,等待線程依舊不會從wait()傳回,需要調用notify()或notifAll()的線程釋放鎖之後,等待線程才有機會從wait()傳回。

public class WaitNotify {
    static boolean flag = true;
    static Object lock = new Object();

    public static void main(String[] args) throws Exception {
        Thread waitThread = new Thread(new Wait(), "WaitThread");
        waitThread.start();
        TimeUnit.SECONDS.sleep(1);
        Thread notifyThread = new Thread(new Notify(), "NotifyThread");
        notifyThread.start();
    }

    static class Wait implements Runnable {
        public void run() {
// 加鎖,擁有lock的Monitor
            synchronized (lock) {
// 當條件不滿足時,繼續wait,同時釋放了lock的鎖
                while (flag) {
                    try {
                        System.out.println(Thread.currentThread() + " flag is true. wait @ "
                                + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                        lock.wait();
                    } catch (InterruptedException e) {
                    }
                }
// 條件滿足時,完成工作
                System.out.println(Thread.currentThread() + " flag is false. running @ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
            }
        }
    }

    static class Notify implements Runnable {
        public void run() {
// 加鎖,擁有lock的Monitor
            synchronized (lock) {
// 擷取lock的鎖,然後進行通知,通知時不會釋放lock的鎖,
// 直到目前線程釋放了lock後,WaitThread才能從wait方法中傳回
                System.out.println(Thread.currentThread() + " hold lock. notify @ " +
                        new SimpleDateFormat("HH:mm:ss").format(new Date()));
                lock.notifyAll();
                flag = false;
                SleepUtils.second(5);
            }
// 再次加鎖
            synchronized (lock) {
                System.out.println(Thread.currentThread() + " hold lock again. [email protected] "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                SleepUtils.second(5);
            }
        }
    }
}
           

4.3.5 Thread.join()的使用

如果一個線程A執行了thread.join()語句,其含義是:目前線程A等待thread線程終止之後才從thread.join()傳回。

public class JoinThread {
    public static void main(String[] args) throws Exception {
        Thread previous = Thread.currentThread();
        for (int i = 0; i < 10; i++) {
// 每個線程擁有前一個線程的引用,需要等待前一個線程終止,才能從等待中傳回
            Thread thread = new Thread(new Domino(previous), String.valueOf(i));
            thread.start();
            previous = thread;
        }
        TimeUnit.SECONDS.sleep(5);
        System.out.println(Thread.currentThread().getName() + " terminate.");
    }
    static class Domino implements Runnable {
        private Thread thread;
        public Domino(Thread thread) {
            this.thread = thread;
        }
        public void run() {
            try {
                thread.join();
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread().getName() + " terminate.");
        }
    }
}
           

4.3.6 ThreadLocal的使用

ThreadLocal,即線程變量,是一個以ThreadLocal對象為鍵、任意對象為值的存儲結構。這個結構被附帶線上程上,也就是說一個線程可以根據一個ThreadLocal對象查詢到綁定在這個線程上的一個值。

Java中的鎖

5.1 Lock接口

表5-1 Lock接口提供的synchronized關鍵字不具備的主要特性

《Java 并發程式設計的藝術》

5.2 隊列同步器

隊列同步器AbstractQueuedSynchronizer(以下簡稱同步器),是用來建構鎖或者其他同步組 件的基礎架構,它使用了一個int成員變量表示同步狀态,通過内置的FIFO隊列來完成資源獲 取線程的排隊工作,并發包的作者(Doug Lea)期望它能夠成為實作大部分同步需求的基礎。

5.2.1 隊列同步器的接口與示例

同步器的設計是基于模闆方法模式的,也就是說,使用者需要繼承同步器并重寫指定的 方法,随後将同步器組合在自定義同步元件的實作中,并調用同步器提供的模闆方法,而這些 模闆方法将會調用使用者重寫的方法。

重寫同步器指定的方法時,需要使用同步器提供的如下3個方法來通路或修改同步狀态。

·getState():擷取目前同步狀态。

·setState(int newState):設定目前同步狀态。

·compareAndSetState(int expect,int update):使用CAS設定目前狀态,該方法能夠保證狀态 設定的原子性。

5.3 重入鎖

重入鎖ReentrantLock,顧名思義,就是支援重進入的鎖,它表示該鎖能夠支援一個線程對資源的重複加鎖。除此之外,該鎖的還支援擷取鎖時的公平和非公平性選擇。

1.實作重進入

重進入是指任意線程在擷取到鎖之後能夠再次擷取該鎖而不會被鎖所阻塞,該特性的實 現需要解決以下兩個問題。

1)線程再次擷取鎖。鎖需要去識别擷取鎖的線程是否為目前占據鎖的線程,如果是,則再 次成功擷取。

2)鎖的最終釋放。線程重複n次擷取了鎖,随後在第n次釋放該鎖後,其他線程能夠擷取到 該鎖。鎖的最終釋放要求鎖對于擷取進行計數自增,計數表示目前鎖被重複擷取的次數,而鎖 被釋放時,計數自減,當計數等于0時表示鎖已經成功釋放。

ReentrantLock的nonfairTryAcquire方法

inal boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
        }
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
        return false;
    }
           

2.公平與非公平擷取鎖的差別

公平性與否是針對擷取鎖而言的,如果一個鎖是公平的,那麼鎖的擷取順序就應該符合請求的絕對時間順序,也就是FIFO。

5.4 讀寫鎖

之前提到鎖(如Mutex和ReentrantLock)基本都是排他鎖,這些鎖在同一時刻隻允許一個線 程進行通路,而讀寫鎖在同一時刻可以允許多個讀線程通路,但是在寫線程通路時,所有的讀 線程和其他寫線程均被阻塞。讀寫鎖維護了一對鎖,一個讀鎖和一個寫鎖,通過分離讀鎖和寫 鎖,使得并發性相比一般的排他鎖有了很大提升。

Java并發包提供讀寫鎖的實作是 ReentrantReadWriteLock。

ReentrantReadWriteLock的特性

《Java 并發程式設計的藝術》

5.4.1 讀寫鎖的接口與示例

public class Cache {
    static Map<String, Object> map = new HashMap<String, Object>();
    static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    static Lock r = rwl.readLock();
    static Lock w = rwl.writeLock();

    // 擷取一個key對應的value
    public static final Object get(String key) {
        r.lock();
        try {
            return map.get(key);
        } finally {
            r.unlock();
        }
    }

    // 設定key對應的value,并傳回舊的value
    public static final Object put(String key, Object value) {
        w.lock();
        try {
            return map.put(key, value);
        } finally {
            w.unlock();
        }
    }


    // 清空所有的内容
    public static final void clear() {
        w.lock();
        try {
            map.clear();
        } finally {
            w.unlock();
        }
    }
}

           

5.4.2 讀寫鎖的實作分析

接下來分析ReentrantReadWriteLock的實作,主要包括:讀寫狀态的設計、寫鎖的擷取與釋 放、讀鎖的擷取與釋放以及鎖降級(以下沒有特别說明讀寫鎖均可認為是 ReentrantReadWriteLock)。

5.6 Condition接口

任意一個Java對象,都擁有一組螢幕方法(定義在java.lang.Object上),主要包括wait()、 wait(long timeout)、notify()以及notifyAll()方法,這些方法與synchronized同步關鍵字配合,可以 實作等待/通知模式。Condition接口也提供了類似Object的螢幕方法,與Lock配合可以實作等 待/通知模式,但是這兩者在使用方式以及功能特性上還是有差别的。

5.6.1 Condition接口與示例

Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

    public void conditionWait() throws InterruptedException {
        lock.lock();
        try {
            condition.await();
        } finally {
            lock.unlock();
        }
    }

    public void conditionSignal() throws InterruptedException {
        lock.lock();
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
           

5.6.2 Condition的實作分析

ConditionObject是同步器AbstractQueuedSynchronizer的内部類,因為Condition的操作需要 擷取相關聯的鎖,是以作為同步器的内部類也較為合理。每個Condition對象都包含着一個隊 列(以下稱為等待隊列),該隊列是Condition對象實作等待/通知功能的關鍵。

下面将分析Condition的實作,主要包括:等待隊列、等待和通知,下面提到的Condition如 果不加說明均指的是ConditionObject。

Java并發容器和架構

6.1 ConcurrentHashMap的實作原理與使用

在并發程式設計中使用HashMap可能導緻程式死循環。而使用線程安全的HashTable效率又非常低下,基于以上兩個原因,便有了ConcurrentHashMap的登場機會。

ConcurrentHashMap的鎖分段技術可有效提升并發通路率

HashTable容器在競争激烈的并發環境下表現出效率低下的原因是所有通路HashTable的 線程都必須競争同一把鎖,假如容器裡有多把鎖,每一把鎖用于鎖容器其中一部分資料,那麼 當多線程通路容器裡不同資料段的資料時,線程間就不會存在鎖競争,進而可以有效提高并 發通路效率,這就是ConcurrentHashMap所使用的鎖分段技術。首先将資料分成一段一段地存 儲,然後給每一段資料配一把鎖,當一個線程占用鎖通路其中一個段資料的時候,其他段的數 據也能被其他線程通路。

6.2 ConcurrentLinkedQueue

ConcurrentLinkedQueue是一個基于連結節點的無界線程安全隊列,它采用先進先出的規 則對節點進行排序,當我們添加一個元素的時候,它會添加到隊列的尾部;當我們擷取一個元 素時,它會傳回隊列頭部的元素。它采用了“wait-free”算法(即CAS算法)來實作,該算法在 Michael&Scott算法上進行了一些修改。

6.3 Java中的阻塞隊列

6.3.2 Java裡的阻塞隊列

JDK 7提供了7個阻塞隊列,如下。

·ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列。 ·LinkedBlockingQueue:一個由連結清單結構組成的有界阻塞隊列。 ·PriorityBlockingQueue:一個支援優先級排序的無界阻塞隊列。

·DelayQueue:一個使用優先級隊列實作的無界阻塞隊列。 ·SynchronousQueue:一個不存儲元素的阻塞隊列。

·LinkedTransferQueue:一個由連結清單結構組成的無界阻塞隊列。 ·LinkedBlockingDeque:一個由連結清單結構組成的雙向阻塞隊列。

6.3.3 阻塞隊列的實作原理

6.4 Fork/Join架構

6.4.1 什麼是Fork/Join架構

Fork/Join架構是Java 7提供的一個用于并行執行任務的架構,是一個把大任務分割成若幹個小任務,最終彙總每個小任務結果後得到大任務結果的架構。

《Java 并發程式設計的藝術》

6.4.2 工作竊取算法

工作竊取(work-stealing)算法是指某個線程從其他隊列裡竊取任務來執行。

6.4.4 使用Fork/Join架構

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class CountTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 2;  // 門檻值 private int start;
    private int start;
    private int end;

    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
// 如果任務足夠小就計算任務
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
// 如果任務大于門檻值,就分裂成兩個子任務計算
            int middle = (start + end) / 2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end); // 執行子任務
            leftTask.fork();
            rightTask.fork();
// 等待子任務執行完,并得到其結果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
// 合并子任務
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
// 生成一個計算任務,負責計算1+2+3+4
        CountTask task = new CountTask(1, 100);
// 執行一個任務
        Future<Integer> result = forkJoinPool.submit(task);
        try {
            System.out.println(result.get());
        } catch (InterruptedException e) {
        } catch (ExecutionException e) {
        }
    }
}
           

Java中的13個原子操作類

7.1 原子更新基本類型類

使用原子的方式更新基本類型,Atomic包提供了以下3個類。

·AtomicBoolean:原子更新布爾類型。

·AtomicInteger:原子更新整型。

·AtomicLong:原子更新長整型。

public class AtomicIntegerTest {
    static AtomicInteger ai = new AtomicInteger(1);

    public static void main(String[] args) {
        System.out.println(ai.getAndIncrement());
        System.out.println(ai.get());
    }
}

當我們調用CountDownLatch的countDown方法時,N就會減1,CountDownLatch的await方法 會阻塞目前線程,直到N變成零。
           

Java中的并發工具類

8.1 等待多線程完成的CountDownLatch

CountDownLatch允許一個或多個線程等待其他線程完成操作。

public class CountDownLatchTest {
    static CountDownLatch c = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(1);
                c.countDown();
                System.out.println(2);
                c.countDown();
            }
        }).start();
        c.await();
        System.out.println("3");
    }
}
           

8.2 同步屏障CyclicBarrier

CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一 組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障才會 開門,所有被屏障攔截的線程才會繼續運作。

CyclicBarrier預設的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數 量,每個線程調用await方法告訴CyclicBarrier我已經到達了屏障,然後目前線程被阻塞。

public class CyclicBarrierTest {
    static
    CyclicBarrier c = new CyclicBarrier(2);

    public static void main(String[] args) {
        try {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        c.await();
                    } catch (Exception e) {
                    }
                    System.out.println(1);
                }
            }).start();
            c.await();
        } catch (Exception e) {
        }
        System.out.println(2);
    }
}
           

8.2.3 CyclicBarrier和CountDownLatch的差別

CountDownLatch的計數器隻能使用一次,而CyclicBarrier的計數器可以使用reset()方法重 置。是以CyclicBarrier能處理更為複雜的業務場景。例如,如果計算發生錯誤,可以重置計數 器,并讓線程重新執行一次。

8.3 控制并發線程數的Semaphore

Semaphore(信号量)是用來控制同時通路特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源。

Semaphore可以用于做流量控制,特别是公用資源有限的應用場景,比如資料庫連接配接。假 如有一個需求,要讀取幾萬個檔案的資料,因為都是IO密集型任務,我們可以啟動幾十個線程 并發地讀取,但是如果讀到記憶體後,還需要存儲到資料庫中,而資料庫的連接配接數隻有10個,這 時我們必須控制隻有10個線程同時擷取資料庫連接配接儲存資料,否則會報錯無法擷取資料庫連 接。這個時候,就可以使用Semaphore來做流量控制。

8.4 線程間交換資料的Exchanger

Exchanger(交換者)是一個用于線程間協作的工具類。Exchanger用于進行線程間的資料交 換。它提供一個同步點,在這個同步點,兩個線程可以交換彼此的資料。

public class ExchangerTest {
    private static final Exchanger<String> exgr = new Exchanger<String>();
    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String A = "銀行流水A";    // A錄入銀行流水資料
                    exgr.exchange(A);
                } catch (InterruptedException e) {
                }
            }
        });
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String B = "銀行流水B";    // B錄入銀行流水資料
                    String A = exgr.exchange("B");
                    System.out.println("A和B資料是否一緻:" + A.equals(B) + ",A錄入的是:" + A + ",B錄入是:" + B);

                } catch (InterruptedException e) {
                    threadPool.shutdown();
                }
            }
        });
    }
}
           

Java中的線程池

Java中的線程池是運用場景最多的并發架構,幾乎所有需要異步或并發執行任務的程式都可以使用線程池。在開發過程中,合理地使用線程池能夠帶來3個好處。

第一:降低資源消耗。通過重複利用已建立的線程降低線程建立和銷毀造成的消耗。

第二:提高響應速度。當任務到達時,任務可以不需要等到線程建立就能立即執行。

第三:提高線程的可管理性。線程是稀缺資源,如果無限制地建立,不僅會消耗系統資源, 還會降低系統的穩定性,使用線程池可以進行統一配置設定、調優和監控。但是,要做到合理利用 線程池,必須對其實作原理了如指掌。

9.1 線程池的實作原理

線程池的主要處理流程:

《Java 并發程式設計的藝術》

9.2 線程池的使用

9.2.1 線程池的建立

new ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
           

建立一個線程池時需要輸入幾個參數,如下。

1)corePoolSize(線程池的基本大小):當送出一個任務到線程池時,線程池會建立一個線程來執行任務,即使其他空閑的基本線程能夠執行新任務也會建立線程,等到需要執行的任 務數大于線程池基本大小時就不再建立。如果調用了線程池的prestartAllCoreThreads()方法, 線程池會提前建立并啟動所有基本線程。

2)runnableTaskQueue(任務隊列):用于儲存等待執行的任務的阻塞隊列。可以選擇以下幾 個阻塞隊列。

·ArrayBlockingQueue:是一個基于數組結構的有界阻塞隊列,此隊列按FIFO(先進先出)原 則對元素進行排序。

·LinkedBlockingQueue:一個基于連結清單結構的阻塞隊列,此隊列按FIFO排序元素,吞吐量通 常要高于ArrayBlockingQueue。靜态工廠方法Executors.newFixedThreadPool()使用了這個隊列。

·SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用 移除操作,否則插入操作一直處于阻塞狀态,吞吐量通常要高于Linked-BlockingQueue,靜态工 廠方法Executors.newCachedThreadPool使用了這個隊列。

·PriorityBlockingQueue:一個具有優先級的無限阻塞隊列。

3)maximumPoolSize(線程池最大數量):線程池允許建立的最大線程數。如果隊列滿了,并 且已建立的線程數小于最大線程數,則線程池會再建立新的線程執行任務。值得注意的是,如 果使用了無界的任務隊列這個參數就沒什麼效果。

4)ThreadFactory:用于設定建立線程的工廠,可以通過線程工廠給每個建立出來的線程設 置更有意義的名字。使用開源架構guava提供的ThreadFactoryBuilder可以快速給線程池裡的線 程設定有意義的名字,代碼如下。

5)RejectedExecutionHandler(飽和政策):當隊列和線程池都滿了,說明線程池處于飽和狀 态,那麼必須采取一種政策處理送出的新任務。這個政策預設情況下是AbortPolicy,表示無法 處理新任務時抛出異常。在JDK 1.5中Java線程池架構提供了以下4種政策。

·AbortPolicy:直接抛出異常。

·CallerRunsPolicy:隻用調用者所線上程來運作任務。

·DiscardOldestPolicy:丢棄隊列裡最近的一個任務,并執行目前任務。

·DiscardPolicy:不處理,丢棄掉。

當然,也可以根據應用場景需要來實作RejectedExecutionHandler接口自定義政策。如記錄 日志或持久化存儲不能處理的任務。

6)keepAliveTime(線程活動保持時間):線程池的工作線程空閑後,保持存活的時間。是以, 如果任務很多,并且每個任務執行的時間比較短,可以調大時間,提高線程的使用率。

7)TimeUnit(線程活動保持時間的機關):可選的機關有天(DAYS)、小時(HOURS)、分鐘 (MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微秒)。

9.2.2 向線程池送出任務

可以使用兩個方法向線程池送出任務,分别為execute()和submit()方法。

execute()方法用于送出不需要傳回值的任務,是以無法判斷任務是否被線程池執行成功。

submit()方法用于送出需要傳回值的任務。線程池會傳回一個future類型的對象,通過這個 future對象可以判斷任務是否執行成功。

9.2.3 關閉線程池

可以通過調用線程池的shutdown或shutdownNow方法來關閉線程池。

它們的原理是周遊線 程池中的工作線程,然後逐個調用線程的interrupt方法來中斷線程,是以無法響應中斷的任務 可能永遠無法終止。但是它們存在一定的差別,shutdownNow首先将線程池的狀态設定成 STOP,然後嘗試停止所有的正在執行或暫停任務的線程,并傳回等待執行任務的清單,而 shutdown隻是将線程池的狀态設定成SHUTDOWN狀态,然後中斷所有沒有正在執行任務的線程。

9.2.4 合理地配置線程池

要想合理地配置線程池,就必須首先分析任務特性,可以從以下幾個角度來分析。

·任務的性質:CPU密集型任務、IO密集型任務和混合型任務。

·任務的優先級:高、中和低。

·任務的執行時間:長、中和短。

·任務的依賴性:是否依賴其他系統資源,如資料庫連接配接。

9.2.5 線程池的監控

可以通過線程池提供的參數進行監控,在監控線程池的 時候可以使用以下屬性。

·taskCount:線程池需要執行的任務數量。

·completedTaskCount:線程池在運作過程中已完成的任務數量,小于或等于taskCount。

·largestPoolSize:線程池裡曾經建立過的最大線程數量。通過這個資料可以知道線程池是 否曾經滿過。如該數值等于線程池的最大大小,則表示線程池曾經滿過。

·getPoolSize:線程池的線程數量。如果線程池不銷毀的話,線程池裡的線程不會自動銷 毀,是以這個大小隻增不減。

·getActiveCount:擷取活動的線程數。

Executor架構

從JDK 5開始,把工作單元與執行機制分離開 來。工作單元包括Runnable和Callable,而執行機制由Executor架構提供。

10.1 Executor架構簡介

Executor架構主要由3大部分組成如下。

·任務。包括被執行任務需要實作的接口:Runnable接口或Callable接口。

·任務的執行。包括任務執行機制的核心接口Executor,以及繼承自Executor的 ExecutorService接口。Executor架構有兩個關鍵類實作了ExecutorService接口 (ThreadPoolExecutor和ScheduledThreadPoolExecutor)。

·異步計算的結果。包括接口Future和實作Future接口的FutureTask類。

10.2 ThreadPoolExecutor詳解

FixedThreadPool被稱為可重用固定線程數的線程池。下面是FixedThreadPool的源代碼實作。

public static ExecutorService newFixedThreadPool(int nThreads) {
       return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

           

SingleThreadExecutor是使用單個worker線程的Executor。下面是SingleThreadExecutor的源代碼實作。

public static ExecutorService newSingleThreadExecutor() {
       return new FinalizableDelegatedExecutorService
       (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
  }

           

CachedThreadPool是一個會根據需要建立新線程的線程池。下面是建立CachedThread-Pool的源代碼。

public static ExecutorService newCachedThreadPool() { 
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
           

10.4 FutureTask詳解

Future接口和實作Future接口的FutureTask類,代表異步計算的結果。

《Java 并發程式設計的藝術》

Java并發程式設計實踐