天天看點

用生活的故事總結一下線程通信

線程之間通信,就像人與人之間通信一樣重要。

我覺得了解一個問題,就要建立在我們已有的認知上,就會容易很多。

我将列舉幾個生活的例子,然後再對比的線程之間通信的例子。

# # 線程通信 就像是人和人通信

  先用直白的話來講,線程通信的目的,就和人之間的通信的目的一樣。為了交換資訊,為了通知消息。

# #   場景一,約好一起做

  比方說,你周末想約隔壁的王阿姨,一起去買菜。不要問為什麼是和王阿姨,因為一個人去買菜有點孤獨。這就要以資訊的方式先通知到王阿姨。比方你想 約王阿姨十點去買菜,你可以給王阿姨發一個短信,可以發一個微信,可以打一個電話,還可以在門口等王阿姨回家的時候約一下她。

  那麼線程為什麼要通信呢?仔細想一下,是不是有這些場景需要通知到對方?比方說,線程A要約線程B一起去買菜,A線程已經出門了(執行了一些操作),要等等B線程來了再一起操作。

  這個通信的方式就有很多個已經實作好的。

 # #   場景一,方案一:Thread 的 join方法。

join()方法是Thread類的一個執行個體方法。它的作用是讓目前線程陷入“等待”狀态,等join的這個線程執行完成後,再繼續執行目前線程。

有時候,主線程建立并啟動了子線程,如果子線程中需要進行大量的耗時運算,主線程往往将早于子線程結束之前結束。

如果主線程想等待子線程執行完畢後,獲得子線程中的處理完的某個資料,就要用到join方法了。

示例代碼:

/**
 * @author angus
 * @create 2020-04-18 12:38
 */
public class ObjThread {
  public static void main(String[] args) throws InterruptedException {
    Object obj = new Object();
    AtomicReference<String> temp = new AtomicReference<>();
    Thread thread = new Thread(()->{
      System.out.println("開始:線程A");
      try {
        Thread.sleep(3);
        System.out.println("休息完成");
        temp.set("10");
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    });
    thread.start();
    //thread.join();
    System.out.println("main線程要執行");
    System.out.println(temp.get());
    
  }


}      

  輸出的結果,如果不加 join,那麼main線程就會有 

注意join()方法有兩個重載方法,一個是join(long), 一個是join(long, int)。

實際上,通過源碼你會發現,join()方法及其重載方法底層都是利用了wait(long)這個方法。

對于join(long, int),通過檢視源碼(JDK 1.8)發現,底層并沒有精确到納秒,而是對第二個參數做了簡單的判斷和處理。

   關于join 的還可以參考我的另外一篇文章

 # #   場景一,方案二:CountDownLatch

  先來解讀一下CountDownLatch這個類名字的意義。CountDown代表計數遞減,Latch是“門闩”的意思。也有人把它稱為“屏障”。而CountDownLatch這個類的作用也很貼合這個名字的意義,假設某個線程在執行任務之前,需要等待其它線程完成一些前置任務,必須等所有的前置任務都完成,才能開始執行本線程的任務。

CountDownLatch的方法也很簡單,如下:

// 構造方法:
public CountDownLatch(int count)

public void await() // 等待
public boolean await(long timeout, TimeUnit unit) // 逾時等待
public void countDown() // count - 1
public long getCount() // 擷取目前還有多少count      

我們知道,玩遊戲的時候,在遊戲真正開始之前,一般會等待一些前置任務完成,比如“加載地圖資料”,“加載人物模型”,“加載背景音樂”等等。隻有當所有的東西都加載完成後,玩家才能真正進入遊戲。下面我們就來模拟一下這個demo。

public class CountDownLatchDemo {
    // 定義前置任務線程
    static class PreTaskThread implements Runnable {

        private String task;
        private CountDownLatch countDownLatch;

        public PreTaskThread(String task, CountDownLatch countDownLatch) {
            this.task = task;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                Random random = new Random();
                Thread.sleep(random.nextInt(1000));
                System.out.println(task + " - 任務完成");
                countDownLatch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        // 假設有三個子產品需要加載
        CountDownLatch countDownLatch = new CountDownLatch(3);

        // 主任務
        new Thread(() -> {
            try {
                System.out.println("等待資料加載...");
                System.out.println(String.format("還有%d個前置任務", countDownLatch.getCount()));
                countDownLatch.await();
                System.out.println("資料加載完成,正式開始遊戲!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // 前置任務
        new Thread(new PreTaskThread("加載地圖資料", countDownLatch)).start();
        new Thread(new PreTaskThread("加載人物模型", countDownLatch)).start();
        new Thread(new PreTaskThread("加載背景音樂", countDownLatch)).start();
    }
}      

輸出:

等待資料加載...

還有3個前置任務

加載人物模型 - 任務完成

加載背景音樂 - 任務完成

加載地圖資料 - 任務完成

資料加載完成,正式開始遊戲!

 # #   場景一,方案三:對象鎖

利對象的 wait方法,和notify方法實作互相的通信 ,來看一下例子

/**
 * @author angus
 * @create 2020-04-18 12:38
 */
public class ObjThread {
  public static void main(String[] args) throws InterruptedException {
    Object obj = new Object();
    Thread thread = new Thread(()->{
      synchronized (obj){
        System.out.println("開始:線程A");
        try {
          obj.wait();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        System.out.println("線程A:我醒了");
      }

    });
    thread.start();

    Thread thread2 = new Thread(()->{
      synchronized (obj){
        System.out.println("開始:線程B");
        obj.notify();

      }

    });
    thread2.start();

    //System.out.println("main線程,喚醒線程A");

  }


}      

  運作結果是:

開始:線程A

開始:線程B

線程A:我醒了

# #  場景二:為了合了解決資源競争的問題

  比方說,商場就剩了最後一份韭菜,王阿姨特别想要。你也特别想要。

  我們經常會遇到這樣的情況,線程A想要改一個資料,線程B也要改一個資料,這個資料恰好是同一個。比方說扣票的場景。線程A想要扣一張票,線程B也想扣一張票。而事實上恰好隻剩下最後一張了,隻能有一個人扣。否則都扣的話。就變成 -1,不符合實際情況,以為到時候得有一個人沒座位。

# # # 場景二,方案一,使用鎖

  在我們的程式中,解決競争問題,最常用的就是鎖。鎖也有很多種,其中一個重量級的鎖就是 synchronized, 這個是使用到了對象鎖。同時隻有 一個線程能夠拿到這個對象鎖,這個鎖一旦被線程拿到,其他的線程再想通路,就必須要等待了。

  看個例子:

public class ObjectLock {
    private static Object lock = new Object();

    static class ThreadA implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                for (int i = 0; i < 100; i++) {
                    System.out.println("Thread A " + i);
                }
            }
        }
    }

    static class ThreadB implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                for (int i = 0; i < 100; i++) {
                    System.out.println("Thread B " + i);
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new Thread(new ThreadA()).start();
        Thread.sleep(10);
        new Thread(new ThreadB()).start();
    }
}      

    除了synchronized 這個重量級鎖,還有其他的鎖。這裡不做過多的介紹。隻要明白,鎖也是線程之間通信的一種方式。并且這個場景往往是,一塊資源同時隻能被一個占用的情況。

# # # 場景二,方案二,使用信号量 Semaphore

  其中資源競争問題,往往有很多多個資源的問題。比方說,停車場有兩百個車位,那麼同時可以停下兩百輛車。而信号量則是一個比較适合的解決方案。  

 JDK提供了一個類似于“信号量”功能的類​

​Semaphore​

​​。但本文不是要介紹這個類,而是介紹一種基于​

​volatile​

​關鍵字的自己實作的信号量通信。

Semaphore往往用于資源有限的場景中,去限制線程的數量。舉個例子,我想限制同時隻能有3個線程在工作:

public class SemaphoreDemo {
    static class MyThread implements Runnable {

        private int value;
        private Semaphore semaphore;

        public MyThread(int value, Semaphore semaphore) {
            this.value = value;
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire(); // 擷取permit
                System.out.println(String.format("目前線程是%d, 還剩%d個資源,還有%d個線程在等待",
                        value, semaphore.availablePermits(), semaphore.getQueueLength()));
                // 睡眠随機時間,打亂釋放順序
                Random random =new Random();
                Thread.sleep(random.nextInt(1000));
                semaphore.release(); // 釋放permit
                System.out.println(String.format("線程%d釋放了資源", value));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 10; i++) {
            new Thread(new MyThread(i, semaphore)).start();
        }
    }
}      

輸出:

目前線程是1, 還剩2個資源,還有0個線程在等待

目前線程是0, 還剩1個資源,還有0個線程在等待

目前線程是6, 還剩0個資源,還有0個線程在等待

線程6釋放了資源

目前線程是2, 還剩0個資源,還有6個線程在等待

線程2釋放了資源

目前線程是4, 還剩0個資源,還有5個線程在等待

線程0釋放了資源

目前線程是7, 還剩0個資源,還有4個線程在等待

線程1釋放了資源

目前線程是8, 還剩0個資源,還有3個線程在等待

線程7釋放了資源

目前線程是5, 還剩0個資源,還有2個線程在等待

線程4釋放了資源

目前線程是3, 還剩0個資源,還有1個線程在等待

線程8釋放了資源

目前線程是9, 還剩0個資源,還有0個線程在等待

線程9釋放了資源

線程5釋放了資源

線程3釋放了資源

可以看到,在這次運作中,最開始是1, 0, 6這三個線程獲得了資源,而其它線程進入了等待隊列。然後當某個線程釋放資源後,就會有等待隊列中的線程獲得資源。

當然,Semaphore預設的acquire方法是會讓線程進入等待隊列,且會抛出中斷異常。但它還有一些方法可以忽略中斷或不進入阻塞隊列:

// 忽略中斷
public void acquireUninterruptibly()
public void acquireUninterruptibly(int permits)

// 不進入等待隊列,底層使用CAS
public boolean tryAcquire
public boolean tryAcquire(int permits)
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException
public boolean tryAcquire(long timeout, TimeUnit unit)      

# # 場景三:傳遞消息 

  比方說,你想告訴王阿姨,她很美。王阿姨也想告訴你,你很帥。

  再舉一個不恰當的例子。王阿姨要告訴你一個資訊,今天她家裡就她自己,你可以來跟她一起看電影了。

  注意邏輯,A線程把消息告訴了B線程,B線程才能接着根據這個消息去做其他的事。或者說,隻有B線程隻有拿到了A線程的消息以後才能接着去做其他事。

# # # 場景三 方案一 :Exchanger類

  這個類是已經實作好了的,具體看怎麼用:Exchanger類用于兩個線程交換資料。它支援泛型,也就是說你可以在兩個線程之間傳送任何資料。先來一個案例看看如何使用,比如兩個線程之間想要傳送字元串:

public class ExchangerDemo {
    public static void main(String[] args) throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();

        new Thread(() -> {
            try {
                System.out.println("這是線程A,得到了另一個線程的資料:"
                        + exchanger.exchange("這是來自線程A的資料"));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        System.out.println("這個時候線程A是阻塞的,在等待線程B的資料");
        Thread.sleep(1000);

        new Thread(() -> {
            try {
                System.out.println("這是線程B,得到了另一個線程的資料:"
                        + exchanger.exchange("這是來自線程B的資料"));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}      

   請注意看,線程A 調用了 exchanger.exchange 以後,線程A 接着就進入了阻塞狀态。隻有另外一個線程B 或者 C 同樣調用一下這個方法,A才會接着執行,同時,兩個線程會彼此交換資訊。線程A傳給線程B 一份資訊,然後進入阻塞狀态,然後B接受了消息,并傳回了一個消息。A收到了以後,A繼續執行,B也執行。

# # # 場景三 方案二:管道

為了能和王阿姨偷偷的通信,還可以有獨特的方式。比方說和王阿姨約定好,把寫好的信放在了公園的擦長凳下邊。每次。彼此都去長等下拿信。

管道是基于“管道流”的通信方式。JDK提供了​

​PipedWriter​

​​、 ​

​PipedReader​

​​、 ​

​PipedOutputStream​

​​、 ​

​PipedInputStream​

​。其中,前面兩個是基于字元的,後面兩個是基于位元組流的。

這裡的示例代碼使用的是基于字元的:

public class Pipe {
    static class ReaderThread implements Runnable {
        private PipedReader reader;

        public ReaderThread(PipedReader reader) {
            this.reader = reader;
        }

        @Override
        public void run() {
            System.out.println("this is reader");
            int receive = 0;
            try {
                while ((receive = reader.read()) != -1) {
                    System.out.print((char)receive);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    static class WriterThread implements Runnable {

        private PipedWriter writer;

        public WriterThread(PipedWriter writer) {
            this.writer = writer;
        }

        @Override
        public void run() {
            System.out.println("this is writer");
            int receive = 0;
            try {
                writer.write("test");
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        PipedWriter writer = new PipedWriter();
        PipedReader reader = new PipedReader();
        writer.connect(reader); // 這裡注意一定要連接配接,才能通信

        new Thread(new ReaderThread(reader)).start();
        Thread.sleep(1000);
        new Thread(new WriterThread(writer)).start();
    }
}

// 輸出:
this is reader
this is writer
test      

我們通過線程的構造函數,傳入了​

​PipedWrite​

​​和​

​PipedReader​

​對象。可以簡單分析一下這個示例代碼的執行流程:

  1. 線程ReaderThread開始執行,
  2. 線程ReaderThread使用管道reader.read()進入”阻塞“,
  3. 線程WriterThread開始執行,
  4. 線程WriterThread用writer.write("test")往管道寫入字元串,
  5. 線程WriterThread使用writer.close()結束管道寫入,并執行完畢,
  6. 線程ReaderThread接受到管道輸出的字元串并列印,
  7. 線程ReaderThread執行完畢。

管道通信的應用場景:

這個很好了解。使用管道多半與I/O流相關。當我們一個線程需要先另一個線程發送一個資訊(比如字元串)或者檔案等等時,就需要使用管道通信了。

# # # 場景三 方案三:ThreadLocal

  嚴格意義上來說:這種也不叫線程之間通信。 比方說 隔壁王叔叔想要瞞着王阿姨存點私房錢,王阿姨也想存放一點私房錢,王叔叔要用作和我喝酒,王阿姨好拿私房錢用來和我一起看電影。

  這樣的場景,就是我們往往會遇到不同的線程,在一個變量裡邊存放的是自己特有的值。比方說A在這個變量裡邊存放的是 A,B線程存放的是B,A和B存放在通一個地方,A看到的是A,B看到的是B,A不能看到B,B不能看到是A。

ThreadLocal是一個本地線程副本變量工具類。内部是一個弱引用的Map來維護。這裡不詳細介紹它的原理,而是隻是介紹它的使用,以後有獨立章節來介紹ThreadLocal類的原理。

有些朋友稱ThreadLocal為線程本地變量或線程本地存儲。嚴格來說,ThreadLocal類并不屬于多線程間的通信,而是讓每個線程有自己”獨立“的變量,線程之間互不影響。它為每個線程都建立一個副本,每個線程可以通路自己内部的副本變量。

ThreadLocal類最常用的就是set方法和get方法。示例代碼:

public class ThreadLocalDemo {
    static class ThreadA implements Runnable {
        private ThreadLocal<String> threadLocal;

        public ThreadA(ThreadLocal<String> threadLocal) {
            this.threadLocal = threadLocal;
        }

        @Override
        public void run() {
            threadLocal.set("A");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("ThreadA輸出:" + threadLocal.get());
        }

        static class ThreadB implements Runnable {
            private ThreadLocal<String> threadLocal;

            public ThreadB(ThreadLocal<String> threadLocal) {
                this.threadLocal = threadLocal;
            }

            @Override
            public void run() {
                threadLocal.set("B");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("ThreadB輸出:" + threadLocal.get());
            }
        }

        public static void main(String[] args) {
            ThreadLocal<String> threadLocal = new ThreadLocal<>();
            new Thread(new ThreadA(threadLocal)).start();
            new Thread(new ThreadB(threadLocal)).start();
        }
    }
}

// 輸出:
ThreadA輸出:A
ThreadB輸出:B      

可以看到,雖然兩個線程使用的同一個ThreadLocal執行個體(通過構造方法傳入),但是它們各自可以存取自己目前線程的一個值。

那ThreadLocal有什麼作用呢?如果隻是單純的想要線程隔離,在每個線程中聲明一個私有變量就好了呀,為什麼要使用ThreadLocal?

如果開發者希望将類的某個靜态變量(user ID或者transaction ID)與線程狀态關聯,則可以考慮使用ThreadLocal。

最常見的ThreadLocal使用場景為用來解決資料庫連接配接、Session管理等。資料庫連接配接和Session管理涉及多個複雜對象的初始化和關閉。如果在每個線程中聲明一些私有變量來進行操作,那這個線程就變得不那麼“輕量”了,需要頻繁的建立和關閉連接配接。