天天看點

并發程式設計:線程的共享與協作

之前的博文,《Android程序和線程詳解》裡有一些關于線程的基礎知識。

1)什麼是并發?

指多個線程操作同一個資源,不是同時操作,而是交替操作,隻不過因為速度太快,看起來是同時執行(單核 /多核CPU均是如此,因為通常任務的數量遠遠多于CPU的核數,是以任務最終也是交替執行的)。

通過時間片輪轉機制RR(CPU時間片輪轉機制,cpu給每個程序配置設定一個“時間段”,這個時間就是這個程序允許運作的時間,如果當這個程序的時間片段結束/阻塞,作業系統就會把配置設定給這個程序的cpu剝奪,配置設定給另外一個程序。)排程實作并發。

好處:高并發程式設計可以充分利用cpu的資源;可以充分地加快使用者的響應時間;可以使我們的代碼子產品化、異步化。

壞處:線程之間會共享程序的資源,既然說是共享資源,就有可能存在沖突;在高并發程式設計中如果控制不好,還有可能會造成線程的死鎖(無限等待,唯有強制結束程序)。

2)建立新線程

Java程式中預設有兩個線程——main線程和GC線程;Android中預設有一個主線程,除此之外的線程都需要建立。

① 類Thread

② 接口Runnable(推薦使用這種,因為接口可以多實作)

③ 接口Callable:與Runnable的差別是,實作Runnabble接口裡的run方法是沒有傳回值的,而Callable是允許有傳回值的。

public class Test {
    private static class RunnableThread implements Runnable {
        @Override
        public void run() {
            System.out.println("實作Runnable方式建立線程");
            System.out.println("thread run...");
            System.out.println("thread end.");
        }
    }

    private static class CallableThread implements Callable<String> {
        @Override
        public String call() throws Exception {
            return "this is return result";
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {        
        System.out.println("main start...");
        RunnableThread runnableThread = new RunnableThread();
        //要啟動實作Runnablede的線程的話還需要把runnable的執行個體傳到Thread裡
        new Thread(runnableThread).start();

        CallableThread callableThread = new CallableThread();
        //由于new Thread隻接受Runnable類型的構造參數,是以要先把Callable包裝一下
        FutureTask<String> futureTask = new FutureTask<>(callableThread);
        new Thread(futureTask).start();
        //擷取傳回值,get方法是阻塞的
        System.out.println(futureTask.get());
        System.out.println("main end...");
    }
}
           

思考一個問題,上面的例子中使用線程執行的列印語句,和直接在

main()

方法執行的列印語句有差別嗎?

—— 除了可以肯定,

main start

會先列印外,

main end

列印在

thread run

之前、

thread end

之後或者之間,都無法确定。因為從

t

線程開始運作以後,兩個線程就開始同時運作了,并且由作業系統排程,程式本身無法确定線程的排程順序。要模拟并發執行的效果,我們可以線上程中調用

Thread.sleep()

,強迫目前線程暫停一段時間:

private static class RunnableThread implements Runnable {
        @Override
        public void run() {
            System.out.println("實作Runnable方式建立線程");
            try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {}
                System.out.println("thread end.");
            }
        }
    }
           

線程的優先級,可以對線程設定優先級,設定優先級的方法是:

Thread.setPriority(int n) // 1~10, 預設值5
           

優先級高的線程被作業系統排程的優先級較高,作業系統對高優先級線程可能排程更頻繁,但我們決不能通過設定優先級來確定高優先級的線程一定會先執行。

3)線程的狀态

并發程式設計:線程的共享與協作

線程共包括以下 5 種狀态:

1. 建立狀态(New): 線程對象被建立後,就進入了建立狀态。例如,Thread thread = new Thread()。

2. 就緒狀态(Runnable): 也被稱為“可執行狀态”。線程對象被建立後,其它線程調用了該對象的start()方法,進而來啟動該線程。例如,thread.start()。處于就緒狀态的線程,随時可能被CPU排程執行。

3. 運作狀态(Running): 線程擷取CPU權限進行執行。需要注意的是,線程隻能從就緒狀态進入到運作狀态。

4. 阻塞狀态(Blocked): 阻塞狀态是線程因為某種原因放棄CPU使用權,暫時停止運作。直到線程進入就緒狀态,才有機會轉到運作狀态。阻塞的情況分三種:

等待阻塞 -- 通過調用線程的wait()方法,讓線程等待某工作的完成。

同步阻塞 -- 線程在擷取synchronized同步鎖失敗(因為鎖被其它線程所占用),它會進入同步阻塞狀态。

其他阻塞 -- 通過調用線程的sleep()或join()或發出了I/O請求時,線程會進入到阻塞狀态。當sleep()狀态逾時、join()等待線程終止或者逾時、或者I/O處理完畢時,線程重新轉入就緒狀态。

5. 死亡狀态(Dead): 線程執行完了或者因異常退出了run()方法,該線程結束生命周期。

比如,一個線程需等待另一個線程直到其運作結束。如下

main

線程在啟動

t

線程後,可以通過

t.join()

等待

t

線程結束後再繼續運作:

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
            System.out.println("hello");
        });
        System.out.println("start");
        t.start();
        t.join();
        System.out.println("end");
    }
}
           

main

線程對線程對象

t

調用

join()

方法時,主線程将等待變量

t

表示的線程運作結束,即

join

就是指等待該線程結束,然後才繼續往下執行自身線程。是以,上述代碼列印順序可以肯定是

main

線程先列印

start

t

線程再列印

hello

main

線程最後再列印

end

如果

t

線程已經結束,對執行個體

t

調用

join()

會立刻傳回。此外,

join(long)

的重載方法也可以指定一個等待時間,超過等待時間後就不再繼續等待。

4)中斷線程

方法執行完自動終止 / 抛出異常,又沒有捕獲異常,此時線程自己中斷;如果需要中斷線程,有兩種處理方法:

第一種中斷線程的方法,調用Thread類内的方法,如下:

public void interrupt() {
        throw new RuntimeException("Stub!");
    }

    public static native boolean interrupted();

    public native boolean isInterrupted();
           

① interrupt():作用終止一個線程,但并不是強行關閉一個線程(java的線程是協作式的,不是強迫式的,調用一個線程的interrupt()方法并不會強制關閉一個線程,它就好比其他線程對要關閉的線程打了一聲招呼,告訴被關閉線程它要中斷了,但被關閉線程什麼時候關閉完全由它自身做主),線程調用該方法并不會立刻終止。它的目的是把線程中的“中斷标志位”置為true

② isInterrupted(),判定目前線程是否處于中斷狀态。通過這個方法判斷中斷标志位是否為true。

③ 靜态方法interrupted(), 也是判斷目前線程是否處于中斷狀态。當調用此方法時,它會把中斷标志位改為false。

需要注意的是,當線程中調用了wait(),join(),sleep()方法時,方法會抛出InterruptedException,這個時候線程的中斷标志會被複位成為false,是以這個時候我們應該在catch裡面再調用一次interrupt(),再次中斷一次。

public class HasInterrputException {
    private static final String TAG = "HasInterrputException";
    private static class UseThread extends Thread {
        public UseThread(String name) {
            super(name);
        }
        @Override
        public void run() {
            String threadName = Thread.currentThread().getName();
            while (!isInterrupted()) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Log.d(TAG, "run 00: " + threadName + " catch interrput flag is " + isInterrupted());
                    interrupt();
                    e.printStackTrace();
                }
                Log.d(TAG, "run 11: " + threadName);
                System.out.println(threadName);
            }
            Log.d(TAG, "run 22: " + threadName + " interrput flag is " + isInterrupted());
        }
    }
    public static void main(String[] args) throws InterruptedException {
        Thread endThread = new UseThread("HasInterrputEx");
        endThread.start();
        Thread.sleep(20);
        endThread.interrupt();

        UseRunnable useRunnable = new UseRunnable();
        Thread endThread = new Thread(useRunnable, "endThread");
        endThread.start();
        Thread.sleep(1);
        endThread.interrupt();
    }

    private static class UseRunnable implements Runnable {
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                Log.d(TAG, "run 33: " + "Thread " + Thread.currentThread().getName() + " is running.");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Log.d(TAG, "run 44: " + e.getMessage());
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
           

第二種中斷線程的方法,設定标記位。我們通常會用一個

running

标志位來辨別線程是否應該繼續運作,在外部線程中,通過把

HelloThread.running

置為

false

,就可以讓線程結束:

public class Main {
    public static void main(String[] args)  throws InterruptedException {
        HelloThread t = new HelloThread();
        t.start();
        Thread.sleep(1);
        t.running = false; // 标志位置為false
    }
}

class HelloThread extends Thread {
    public volatile boolean running = true;
    public void run() {
        int n = 0;
        while (running) {
            n ++;
            System.out.println(n + " hello!");
        }
        System.out.println("end!");
    }
}
           

4.1)volatile

關鍵字

注意到

HelloThread

的标志位

boolean running

是一個線程間共享的變量。線程間共享變量需要使用

volatile

關鍵字标記,確定每個線程都能讀取到更新後的變量值。

為什麼要對線程間共享的變量用關鍵字

volatile

聲明?這涉及到Java的記憶體模型。在Java虛拟機中,變量的值儲存在主記憶體中,但是,當線程通路變量時,它會先擷取一個副本,并儲存在自己的工作記憶體中。如果線程修改了變量的值,虛拟機會在某個時刻把修改後的值回寫到主記憶體,但是,這個時間是不确定的!

這會導緻如果一個線程更新了某個變量,另一個線程讀取的值可能還是更新前的。例如,主記憶體的變量

a = true

,線程1執行

a = false

時,它在此刻僅僅是把變量

a

的副本變成了

false

,主記憶體的變量

a

還是

true

,在JVM把修改後的

a

回寫到主記憶體之前,其他線程讀取到的

a

的值仍然是

true

,這就造成了多線程之間共享的變量不一緻。

是以,

volatile

關鍵字的目的是告訴虛拟機:

  • 每次通路變量時,總是擷取主記憶體的最新值;
  • 每次修改變量後,立刻回寫到主記憶體。

volatile

關鍵字解決的是可見性問題:當一個線程修改了某個共享變量的值,其他線程能夠立刻看到修改後的值(适用于一個線程寫,多個線程讀這種場景)。

如果我們去掉

volatile

關鍵字,運作上述程式,發現效果和帶

volatile

差不多,這是因為在x86的架構下,JVM回寫主記憶體的速度非常快,但是,換成ARM的架構,就會有顯著的延遲。

5)守護線程

Java程式入口就是由JVM啟動

main

線程,

main

線程又可以啟動其他線程。當所有線程都運作結束時,JVM退出,程序結束。

如果有一個線程沒有退出,JVM程序就不會退出。是以,必須保證所有線程都能及時結束。

守護線程通過調用接口實作設定,

setDaemon(boolean on)

,參數boolean類型,true則是守護線程,false則不是守護線程;

public static void main(String[] arg0) {
        System.out.println("main start=====");
        Thread thread1 = new Thread("守護線程"){
            @Override
            public void run() {
                int i = 0;
                while (i <= 4){
                    i++;
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName()+":"+i);
                }
                super.run();
            }
        };


        Thread thread2 = new Thread("使用者線程"){
            @Override
            public void run() {
                int i = 0;
                while (i <= 2){
                    i++;
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName()+":"+i);
                }
                super.run();
            }
        };
        
        //setDaemon, 不設定則預設false
        thread1.setDaemon(true);//設定thread1為守護線程
        thread2.setDaemon(false);//設定thread2為普通線程

        thread1.start();
        thread2.start();

        System.out.println("main end==");
    }
           
main start=====
main end==
使用者線程:1
守護線程:1
守護線程:2
使用者線程:2
守護線程:3
使用者線程:3
           
  1. 主線程,main執行結束後,普通線程可以繼續執行直至執行完畢;
  2. 使用者線程執行完畢後,守護線程立刻結束;

守護線程是指為其他線程服務的線程。在JVM中,所有非守護線程都執行完畢後,無論有沒有守護線程,虛拟機都會自動退出。是以,JVM退出時,不必關心守護線程是否已結束。在守護線程中,編寫代碼要注意:守護線程不能持有任何需要關閉的資源,例如打開檔案等,因為虛拟機退出時,守護線程沒有任何機會來關閉檔案,這會導緻資料丢失。

6)線程同步-

synchronized

當多個線程同時運作時,線程的排程由作業系統決定,程式本身無法決定。是以,任何一個線程都有可能在任何指令處被作業系統暫停,然後在某個時間段後繼續執行。

這個時候,有個單線程模型下不存在的問題就來了:如果多個線程同時讀寫共享變量,會出現資料不一緻的問題。

public class Main {
    public static void main(String[] args) throws Exception {
        var add = new AddThread();
        var dec = new DecThread();
        add.start();
        dec.start();
        add.join();
        dec.join();
        System.out.println(Counter.count);
    }
}

class Counter {
    public static int count = 0;
}

class AddThread extends Thread {
    public void run() {
        for (int i=0; i<10000; i++) { Counter.count += 1; }
    }
}

class DecThread extends Thread {
    public void run() {
        for (int i=0; i<10000; i++) { Counter.count -= 1; }
    }
}
           

兩個線程同時對一個

int

變量進行操作,一個加10000次,一個減10000次,最後結果應該是0,但是,每次運作,結果實際上都是不一樣的。

這是因為對變量進行讀取和寫入時,結果要正确,必須保證是原子操作。原子操作是指不能被中斷的一個或一系列操作。

多線程模型下,要保證邏輯正确,對共享變量進行讀寫時,必須保證一組指令以原子方式執行:即某一個線程執行時,其他線程必須等待:

┌───────┐     ┌───────┐
│Thread1│     │Thread2│
└───┬───┘     └───┬───┘
    │             │
    │-- lock --   │
    │ILOAD (100)  │
    │IADD         │
    │ISTORE (101) │
    │-- unlock -- │
    │             │-- lock --
    │             │ILOAD (101)
    │             │IADD
    │             │ISTORE (102)
    │             │-- unlock --
    ▼             ▼
           

通過加鎖和解鎖的操作,就能保證一份連續指令總是在一個線程執行期間,不會有其他線程會進入此指令區間。即使在執行期線程被作業系統中斷執行,其他線程也會因為無法獲得鎖導緻無法進入此指令區間。隻有執行線程将鎖釋放後,其他線程才有機會獲得鎖并執行。這種加鎖和解鎖之間的代碼塊我們稱之為臨界區(Critical Section),任何時候臨界區最多隻有一個線程能執行。

可見,保證一段代碼的原子性就是通過加鎖和解鎖實作的。Java程式使用

synchronized

關鍵字對一個對象進行加鎖,

synchronized

保證了代碼塊在任意時刻最多隻有一個線程能執行。我們把上面的代碼用

synchronized

改寫如下:

public class Main {
    public static void main(String[] args) throws Exception {
        var add = new AddThread();
        var dec = new DecThread();
        add.start();
        dec.start();
        add.join();
        dec.join();
        System.out.println(Counter.count);
    }
}

class Counter {
    public static final Object lock = new Object();
    public static int count = 0;
}

class AddThread extends Thread {
    public void run() {
        for (int i=0; i<10000; i++) {
            synchronized(Counter.lock) {
                Counter.count += 1;
            }
        }
    }
}

class DecThread extends Thread {
    public void run() {
        for (int i=0; i<10000; i++) {
            synchronized(Counter.lock) {
                Counter.count -= 1;
            }
        }
    }
}
           
synchronized(Counter.lock) { // 擷取鎖
    ...
} // 釋放鎖
           

它表示用

Counter.lock

執行個體作為鎖,兩個線程在執行各自的

synchronized(Counter.lock) { ... }

代碼塊時,必須先獲得鎖,才能進入代碼塊進行。執行結束後,在

synchronized

語句塊結束會自動釋放鎖。這樣一來,對

Counter.count

變量進行讀寫就不可能同時進行。上述代碼無論運作多少次,最終結果都是0。

使用

synchronized

解決了多線程同步通路共享變量的正确性問題。但是,它的缺點是帶來了性能下降。因為

synchronized

代碼塊無法并發執行。此外,加鎖和解鎖需要消耗一定的時間,是以,

synchronized

會降低程式的執行效率。

我們來概括一下如何使用

synchronized

  1. 找出修改共享變量的線程代碼塊;
  2. 選擇一個共享執行個體作為鎖;
  3. 使用

    synchronized(lockObject) { ... }

在使用

synchronized

的時候,不必擔心抛出異常。因為無論是否有異常,都會在

synchronized

結束處正确釋放鎖。

小結

多線程同時讀寫共享變量時,會造成邏輯錯誤,是以需要通過

synchronized

同步;

同步的本質就是給指定對象加鎖,加鎖後才能繼續執行後續代碼;

注意加鎖對象必須是同一個執行個體;

7)同步方法

Java程式依靠

synchronized

對線程進行同步,使用

synchronized

的時候,鎖住的是哪個對象非常重要。

讓線程自己選擇鎖對象往往會使得代碼邏輯混亂,也不利于封裝。更好的方法是把

synchronized

邏輯封裝起來。例如,我們編寫一個計數器如下:

public class Counter {
    private int count = 0;

    public void add(int n) {
        synchronized(this) {
            count += n;
        }
    }

    public void dec(int n) {
        synchronized(this) {
            count -= n;
        }
    }

    public int get() {
        return count;
    }
}
           

這樣一來,線程調用

add()

dec()

方法時,它不必關心同步邏輯,因為

synchronized

代碼塊在

add()

dec()

方法内部。并且,我們注意到,

synchronized

鎖住的對象是

this

,即目前執行個體,這又使得建立多個

Counter

執行個體的時候,它們之間互不影響,可以并發執行:

var c1 = Counter();
var c2 = Counter();

// 對c1進行操作的線程:
new Thread(() -> {
    c1.add();
}).start();

// 對c2進行操作的線程:
new Thread(() -> {
    c2.add();
}).start();
           

現在,對于

Counter

類,多線程可以正确調用。

如果一個類被設計為允許多線程正确通路,我們就說這個類就是“線程安全”的(thread-safe),上面的

Counter

類就是線程安全的。Java标準庫的

java.lang.StringBuffer

也是線程安全的。

還有一些不變類,例如

String

Integer

LocalDate

,它們的所有成員變量都是

final

,多線程同時通路時隻能讀不能寫,這些不變類也是線程安全的。

最後,類似

Math

這些隻提供靜态方法,沒有成員變量的類,也是線程安全的。

除了上述幾種少數情況,大部分類,例如

ArrayList

,都是非線程安全的類,我們不能在多線程中修改它們。但是,如果所有線程都隻讀取,不寫入,那麼

ArrayList

是可以安全地線上程間共享的。

 沒有特殊說明時,一個類預設是非線程安全的。

當我們鎖住的是

this

執行個體時,實際上可以用

synchronized

修飾這個方法。下面兩種寫法是等價的:

public void add(int n) {
    synchronized(this) { // 鎖住this
        count += n;
    } // 解鎖
}

public synchronized void add(int n) { // 鎖住this
    count += n;
} // 解鎖
           

是以,用

synchronized

修飾的方法就是同步方法,它表示整個方法都必須用

this

執行個體加鎖。

我們再思考一下,如果對一個靜态方法添加

synchronized

修飾符,它鎖住的是哪個對象?

public synchronized static void test(int n) {
    ...
}

public class Counter {
    public static void test(int n) {
        synchronized(Counter.class) {
            ...
        }
    }
}
           

對于

static

方法,是沒有

this

執行個體的,因為

static

方法是針對類而不是執行個體。但是我們注意到任何一個類都有一個由JVM自動建立的

Class

執行個體,是以,對

static

方法添加

synchronized

,鎖住的是該類的

Class

執行個體。上面兩種寫法也是等效的。

7)死鎖

JVM允許同一個線程重複擷取同一個鎖,這種能被同一個線程反複擷取的鎖,就叫做可重入鎖。一個線程可以擷取一個鎖後,再繼續擷取另一個鎖。例如:

public void add(int m) {
    synchronized(lockA) { // 獲得lockA的鎖
        this.value += m;
        synchronized(lockB) { // 獲得lockB的鎖
            this.another += m;
        } // 釋放lockB的鎖
    } // 釋放lockA的鎖
}

public void dec(int m) {
    synchronized(lockB) { // 獲得lockB的鎖
        this.another -= m;
        synchronized(lockA) { // 獲得lockA的鎖
            this.value -= m;
        } // 釋放lockA的鎖
    } // 釋放lockB的鎖
}
           

在擷取多個鎖的時候,不同線程擷取多個不同對象的鎖可能導緻死鎖。對于上述代碼,線程1和線程2如果分别執行

add()

dec()

方法時:

  • 線程1:進入

    add()

    ,獲得

    lockA

  • 線程2:進入

    dec()

    ,獲得

    lockB

随後:

  • 線程1:準備獲得

    lockB

    ,失敗,等待中;
  • 線程2:準備獲得

    lockA

    ,失敗,等待中。

此時,兩個線程各自持有不同的鎖,然後各自試圖擷取對方手裡的鎖,造成了雙方無限等待下去,這就是死鎖。

死鎖發生後,沒有任何機制能解除死鎖,隻能強制結束JVM程序。

是以,在編寫多線程應用時,要特别注意防止死鎖。因為死鎖一旦形成,就隻能強制結束程序。

那麼我們應該如何避免死鎖呢?答案是:線程擷取鎖的順序要一緻。即嚴格按照先擷取

lockA

,再擷取

lockB

的順序,改寫

dec()

方法如下:

public void dec(int m) {
    synchronized(lockA) { // 獲得lockA的鎖
        this.value -= m;
        synchronized(lockB) { // 獲得lockB的鎖
            this.another -= m;
        } // 釋放lockB的鎖
    } // 釋放lockA的鎖
}
           

8)使用wait和notify

在Java程式中,

synchronized

解決了多線程競争的問題。例如,對于一個任務管理器,多個線程同時往隊列中添加任務,可以用

synchronized

加鎖:

class TaskQueue {
    Queue<String> queue = new LinkedList<>();

    public synchronized void addTask(String s) {
        this.queue.add(s);
    }
}
           

但是

synchronized

并沒有解決多線程協調的問題。

仍然以上面的

TaskQueue

為例,我們再編寫一個

getTask()

方法取出隊列的第一個任務:

class TaskQueue {
    Queue<String> queue = new LinkedList<>();

    public synchronized void addTask(String s) {
        this.queue.add(s);
    }

    public synchronized String getTask() {
        while (queue.isEmpty()) {
        }
        return queue.remove();
    }
}
           

上述代碼看上去沒有問題:

getTask()

内部先判斷隊列是否為空,如果為空,就循環等待,直到另一個線程往隊列中放入了一個任務,

while()

循環退出,就可以傳回隊列的元素了。

但實際上

while()

循環永遠不會退出。因為線程在執行

while()

循環時,已經在

getTask()

入口擷取了

this

鎖,其他線程根本無法調用

addTask()

,因為

addTask()

執行條件也是擷取

this

鎖。

是以,執行上述代碼,線程會在

getTask()

中因為死循環而100%占用CPU資源。

如果深入思考一下,我們想要的執行效果是:

  • 線程1可以調用

    addTask()

    不斷往隊列中添加任務;
  • 線程2可以調用

    getTask()

    從隊列中擷取任務。如果隊列為空,則

    getTask()

    應該等待,直到隊列中至少有一個任務時再傳回。

是以,多線程協調運作的原則就是:當條件不滿足時,線程進入等待狀态;當條件滿足時,線程被喚醒,繼續執行任務。

對于上述

TaskQueue

,我們先改造

getTask()

方法,在條件不滿足時,線程進入等待狀态:

public synchronized String getTask() {
    while (queue.isEmpty()) {
        this.wait();
    }
    return queue.remove();
}
           

當一個線程執行到

getTask()

方法内部的

while

循環時,它必定已經擷取到了

this

鎖,此時,線程執行

while

條件判斷,如果條件成立(隊列為空),線程将執行

this.wait()

,進入等待狀态。

這裡的關鍵是:

wait()

方法必須在目前擷取的鎖對象上調用,這裡擷取的是

this

鎖,是以調用

this.wait()

調用

wait()

方法後,線程進入等待狀态,

wait()

方法不會傳回,直到将來某個時刻,線程從等待狀态被其他線程喚醒後,

wait()

方法才會傳回,然後,繼續執行下一條語句。

當一個線程在

this.wait()

等待時,它就會釋放

this

鎖,進而使得其他線程能夠在

addTask()

方法獲得

this

鎖。

現在我們面臨第二個問題:如何讓等待的線程被重新喚醒,然後從

wait()

方法傳回?答案是在相同的鎖對象上調用

notify()

方法。我們修改

addTask()

如下:

public synchronized void addTask(String s) {
    this.queue.add(s);
    this.notify(); // 喚醒在this鎖等待的線程
}
           

注意到在往隊列中添加了任務後,線程立刻對

this

鎖對象調用

notify()

方法,這個方法會喚醒一個正在

this

鎖等待的線程(就是在

getTask()

中位于

this.wait()

的線程),進而使得等待線程從

this.wait()

方法傳回。

完整的例子:

public class Main {
    public static void main(String[] args) throws InterruptedException {
        var q = new TaskQueue();
        var ts = new ArrayList<Thread>();
        for (int i=0; i<5; i++) {
            var t = new Thread() {
                public void run() {
                    // 執行task:
                    while (true) {
                        try {
                            String s = q.getTask();
                            System.out.println("execute task: " + s);
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                }
            };
            t.start();
            ts.add(t);
        }
        var add = new Thread(() -> {
            for (int i=0; i<10; i++) {
                // 放入task:
                String s = "t-" + Math.random();
                System.out.println("add task: " + s);
                q.addTask(s);
                try { Thread.sleep(100); } catch(InterruptedException e) {}
            }
        });
        add.start();
        add.join();
        Thread.sleep(100);
        for (var t : ts) {
            t.interrupt();
        }
    }
}

class TaskQueue {
    Queue<String> queue = new LinkedList<>();

    public synchronized void addTask(String s) {
        this.queue.add(s);
        this.notifyAll();
    }

    public synchronized String getTask() throws InterruptedException {
        while (queue.isEmpty()) {
            this.wait();
        }
        return queue.remove();
    }
}
           

這個例子中,我們重點關注

addTask()

方法,内部調用了

this.notifyAll()

而不是

this.notify()

,使用

notifyAll()

将喚醒所有目前正在

this

鎖等待的線程,而

notify()

隻會喚醒其中一個(具體哪個依賴作業系統,有一定的随機性)。這是因為可能有多個線程正在

getTask()

方法内部的

wait()

中等待,使用

notifyAll()

将一次性全部喚醒。通常來說,

notifyAll()

更安全。有些時候,如果我們的代碼邏輯考慮不周,用

notify()

會導緻隻喚醒了一個線程,而其他線程可能永遠等待下去醒不過來了。

但是,注意到

wait()

方法傳回時需要重新獲得

this

鎖。假設目前有3個線程被喚醒,喚醒後,首先要等待執行

addTask()

的線程結束此方法後,才能釋放

this

鎖,随後,這3個線程中隻能有一個擷取到

this

鎖,剩下兩個将繼續等待。

9)使用ThreadLocal

對于多任務,Java标準庫提供的線程池可以友善地執行這些任務,同時複用線程。Web應用程式就是典型的多任務應用,每個使用者請求頁面時,我們都會建立一個任務,類似:

public void process(User user) {
    checkPermission();
    doWork();
    saveStatus();
    sendResponse();
}
           

然後,通過線程池去執行這些任務。

觀察

process()

方法,它内部需要調用若幹其他方法,同時,我們遇到一個問題:如何在一個線程内傳遞狀态?

process()

方法需要傳遞的狀态就是

User

執行個體。簡單地傳入

User

就可以了?

public void process(User user) {
    checkPermission(user);
    doWork(user);
    saveStatus(user);
    sendResponse(user);
}
           

但是往往一個方法又會調用其他很多方法,這樣會導緻

User

傳遞到所有地方:

void doWork(User user) {
    queryStatus(user);
    checkStatus();
    setNewStatus(user);
    log();
}
           

這種在一個線程中,橫跨若幹方法調用,需要傳遞的對象,我們通常稱之為上下文(Context),它是一種狀态,可以是使用者身份、任務資訊等。

給每個方法增加一個context參數非常麻煩,而且有些時候,如果調用鍊有無法修改源碼的第三方庫,

User

對象就傳不進去了。

Java标準庫提供了一個特殊的

ThreadLocal

,它可以在一個線程中傳遞同一個對象。

ThreadLocal

執行個體通常總是以靜态字段初始化如下:

static ThreadLocal<User> threadLocalUser = new ThreadLocal<>();
           
void processUser(user) {
    try {
        threadLocalUser.set(user);
        step1();
        step2();
    } finally {
        threadLocalUser.remove();
    }
}
           

通過設定一個

User

執行個體關聯到

ThreadLocal

中,在移除之前,所有方法都可以随時擷取到該

User

執行個體:

void step1() {
    User u = threadLocalUser.get();
    log();
    printUser();
}

void log() {
    User u = threadLocalUser.get();
    println(u.name);
}

void step2() {
    User u = threadLocalUser.get();
    checkUser(u.id);
}
           

注意到普通的方法調用一定是同一個線程執行的,是以,

step1()

step2()

以及

log()

方法内,

threadLocalUser.get()

擷取的

User

對象是同一個執行個體。實際上,可以把

ThreadLocal

看成一個全局

Map<Thread, Object>

:每個線程擷取

ThreadLocal

變量時,總是使用

Thread

自身作為key。

ThreadLocal

相當于給每個線程都開辟了一個獨立的存儲空間,各個線程的

ThreadLocal

關聯的執行個體互不幹擾。

最後,特别注意

ThreadLocal

一定要在

finally

中清除:

try {
    threadLocalUser.set(user);
    ...
} finally {
    threadLocalUser.remove();
}
           

這是因為目前線程執行完相關代碼後,很可能會被重新放入線程池中,如果

ThreadLocal

沒有被清除,該線程執行其他代碼時,會把上一次的狀态帶進去。

參考文章:

《Java多線程看這一篇就足夠了》

《Java同步塊》

《線程基礎、線程之間的共享和協作》

《并發程式設計(一):線程基礎、線程之間的共享與協作》

《Java守護線程的了解和使用場景》

《廖雪峰的官方網站-多線程》

繼續閱讀