天天看點

并發基礎知識之線程間的協作

多線程除了我們前面講的競争,其實還有協作。就像我們人一樣,不但要競争,也要學會合作,這樣才能進步。這篇文章我們就講講多線程協作的基本機制wait/notify。同時使用多線程實作生産者/消費者模式。

1.協作的場景

多線程協作的場景有很多,比如:

  • 經典的生産者/消費者模式:生産者消費者通過共享隊列實作協作,生産者往隊列中放資料,消費者向隊列中取資料,當隊列滿了的時候,生産者就不能再放了,當隊列空了的時候,消費者就不能取了。
  • 同時開始:比如百米賽跑,所有的運動員必須等待裁判吹哨子之後才能開始跑。在一些程式,尤其是模拟仿真程式,要求多個線程同時開始。
  • 等待結束:主從協作模式也是一種比較常見的協作模式,主線程将任務分成若幹小任務,為每個小任務建立一個線程,主線程必須等待所有子線程都運作結束後才能繼續。
  • 集合點:比如班級去春遊,在集合點必須等到所有同學都來齊了,才能去下一個旅遊點,不能抛下任何一個同學,這是不負責任的。反映在程式中,比如并行疊代計算,每個線程負責一部分計算,然後在集合點等待其他線程完成,一起傳遞資料。

2.wait/notify方法的介紹

wait/notify方法是類Object中的方法,而Object又是所有類的父類,這樣就使得所有的對象都可以調用這兩個方法。

主要有兩個wait方法

public final void wait() throws InterruptedException
public final void wait(long timeout) throws InterruptedException
           

不帶參數或者參數為0,表示無限期等待。

同時這裡說一下wait和sleep的差別:wait會釋放對象鎖,而且不能主動喚醒,需要其他線程去喚醒它。而sleep不會釋放對象鎖,且到了設定的時間就會主動喚醒。

前面的文章中有說過,每個對象都有一個鎖和一個等待隊列,當一個線程嘗試去擷取對象的鎖失敗時,就會加入該對象的等待隊列。其實對象還有另一個隊列,叫條件隊列,專門用于線程間的協作。

當一個線程調用wait方法後就會把目前線程加入條件隊列中并阻塞,等待一個線程去把它喚醒。喚醒使用notify方法,主要有下面形式:

public final void notify()
public final void notifyAll()
           

notify方法是在條件隊列中随機選擇一個線程,将其從條件隊列中移除并喚醒。notifyAll方法,顧名思義,就是把條件隊列中的線程都喚醒。

wait的具體過程如下:

1.把目前線程放入條件隊列,釋放對象鎖,線程狀态變為WAITING或TIMED_WAITING。

2.等待時間到或者其他線程調用notify/notifyAll方法喚醒,從條件隊列中移除,但要重新競争鎖。

3.如果獲得了對象鎖,則線程狀态變為RUNNABLE,并從wait方法調用中傳回。否則進入等待隊列,線程狀态變為BLOCKED,隻有獲得鎖之後才從wait方法調用中傳回。

說明:線程被喚醒不一定就能立刻獲得鎖。

ps:wait/notify方法隻能在synchronized塊中被調用。如果調用wait/notify方法時,目前線程沒有獲得對象鎖,則會抛出異常。

3.生産者/消費者模式

生産者/消費者模式中,協作的共享變量是隊列,隊列滿了,則生産者就wait,隊列空了,則消費者就wait。

/**
 * 生産者消費者的共享隊列
 */
public class MyBlockingQueue<E> {
    private int length;
    private Queue<E> queue;
    public MyBlockingQueue(int length) {
        this.length = length;
        queue = new ArrayDeque<E>(length);
    }

    public synchronized void put(E e) throws InterruptedException {
        while(queue.size() == length) {
            wait();
        }
        queue.add(e);
        notifyAll();
    }

    public synchronized E get() throws InterruptedException {
        while(queue.isEmpty()) {
            wait();
        }
        E e = queue.poll();
        notifyAll();
        return e;
    }
}
           

該類是生産者消費者的共享隊列,有兩個方法,分别是生産者放資料的put方法,以及消費者取資料的get方法。都加了synchronized進行修飾。兩個方法中都使用了wait方法,等待的條件不一樣,但會加入相同的等待條件隊列,是以這裡要使用notifyAll方法,因為notify隻能喚醒一個線程,如果喚醒的是同類線程那就完蛋了。

隻能有一個條件等待隊列,這是wait/notify機制的局限性。

/**
 * 生産者線程
 */
public class Producer extends Thread {
    MyBlockingQueue<String> queue;
    public Producer(MyBlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        int num = ;
        try {
            while(true) {
                String value = String.valueOf(num);
                queue.put(value);
                System.out.println("producer put " + value);
                num++;
                Thread.sleep((int)(Math.random() * ));
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}
           
/**
 * 消費者線程
 */
public class Consumer extends Thread {
    MyBlockingQueue<String> queue;
    public Consumer(MyBlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while(true) {
                String value = queue.get();
                System.out.println("Consumer get " + value);
                Thread.sleep((int)(Math.random() * ));
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}
           

搞一個主程式運作

public class ProducerAndConsumer {
    public static void main(String[] args) {
        MyBlockingQueue<String> queue = new MyBlockingQueue<>();
        new Producer(queue).start();
        new Consumer(queue).start();
    }
}
           

運作後會交替的列印生産者線程和消費者線程的存取資訊。

這裡我們使用了ArrayDeque,Java提供了專門的阻塞隊列實作

  • 接口BlockingQueue和BlockingDeque
  • 基于數組的實作類ArrayBlockingQueue
  • 基于連結清單的實作類LinkedBlockingQueue和LinkedBlockingDeque
  • 基于堆的實作類PriorityBlockingQueue

在實際開發中,應優先使用這些類。

往期文章

并發基礎知識之線程的基本概念

并發基礎知識之synchronized關鍵字

繼續閱讀