多線程除了我們前面講的競争,其實還有協作。就像我們人一樣,不但要競争,也要學會合作,這樣才能進步。這篇文章我們就講講多線程協作的基本機制wait/notify。同時使用多線程實作生産者/消費者模式。
1.協作的場景
多線程協作的場景有很多,比如:
- 經典的生産者/消費者模式:生産者消費者通過共享隊列實作協作,生産者往隊列中放資料,消費者向隊列中取資料,當隊列滿了的時候,生産者就不能再放了,當隊列空了的時候,消費者就不能取了。
- 同時開始:比如百米賽跑,所有的運動員必須等待裁判吹哨子之後才能開始跑。在一些程式,尤其是模拟仿真程式,要求多個線程同時開始。
- 等待結束:主從協作模式也是一種比較常見的協作模式,主線程将任務分成若幹小任務,為每個小任務建立一個線程,主線程必須等待所有子線程都運作結束後才能繼續。
- 集合點:比如班級去春遊,在集合點必須等到所有同學都來齊了,才能去下一個旅遊點,不能抛下任何一個同學,這是不負責任的。反映在程式中,比如并行疊代計算,每個線程負責一部分計算,然後在集合點等待其他線程完成,一起傳遞資料。
2.wait/notify方法的介紹
wait/notify方法是類Object中的方法,而Object又是所有類的父類,這樣就使得所有的對象都可以調用這兩個方法。
主要有兩個wait方法
public final void wait() throws InterruptedException
public final void wait(long timeout) throws InterruptedException
不帶參數或者參數為0,表示無限期等待。
同時這裡說一下wait和sleep的差別:wait會釋放對象鎖,而且不能主動喚醒,需要其他線程去喚醒它。而sleep不會釋放對象鎖,且到了設定的時間就會主動喚醒。
前面的文章中有說過,每個對象都有一個鎖和一個等待隊列,當一個線程嘗試去擷取對象的鎖失敗時,就會加入該對象的等待隊列。其實對象還有另一個隊列,叫條件隊列,專門用于線程間的協作。
當一個線程調用wait方法後就會把目前線程加入條件隊列中并阻塞,等待一個線程去把它喚醒。喚醒使用notify方法,主要有下面形式:
public final void notify()
public final void notifyAll()
notify方法是在條件隊列中随機選擇一個線程,将其從條件隊列中移除并喚醒。notifyAll方法,顧名思義,就是把條件隊列中的線程都喚醒。
wait的具體過程如下:
1.把目前線程放入條件隊列,釋放對象鎖,線程狀态變為WAITING或TIMED_WAITING。
2.等待時間到或者其他線程調用notify/notifyAll方法喚醒,從條件隊列中移除,但要重新競争鎖。
3.如果獲得了對象鎖,則線程狀态變為RUNNABLE,并從wait方法調用中傳回。否則進入等待隊列,線程狀态變為BLOCKED,隻有獲得鎖之後才從wait方法調用中傳回。
說明:線程被喚醒不一定就能立刻獲得鎖。
ps:wait/notify方法隻能在synchronized塊中被調用。如果調用wait/notify方法時,目前線程沒有獲得對象鎖,則會抛出異常。
3.生産者/消費者模式
生産者/消費者模式中,協作的共享變量是隊列,隊列滿了,則生産者就wait,隊列空了,則消費者就wait。
/**
* 生産者消費者的共享隊列
*/
public class MyBlockingQueue<E> {
private int length;
private Queue<E> queue;
public MyBlockingQueue(int length) {
this.length = length;
queue = new ArrayDeque<E>(length);
}
public synchronized void put(E e) throws InterruptedException {
while(queue.size() == length) {
wait();
}
queue.add(e);
notifyAll();
}
public synchronized E get() throws InterruptedException {
while(queue.isEmpty()) {
wait();
}
E e = queue.poll();
notifyAll();
return e;
}
}
該類是生産者消費者的共享隊列,有兩個方法,分别是生産者放資料的put方法,以及消費者取資料的get方法。都加了synchronized進行修飾。兩個方法中都使用了wait方法,等待的條件不一樣,但會加入相同的等待條件隊列,是以這裡要使用notifyAll方法,因為notify隻能喚醒一個線程,如果喚醒的是同類線程那就完蛋了。
隻能有一個條件等待隊列,這是wait/notify機制的局限性。
/**
* 生産者線程
*/
public class Producer extends Thread {
MyBlockingQueue<String> queue;
public Producer(MyBlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
int num = ;
try {
while(true) {
String value = String.valueOf(num);
queue.put(value);
System.out.println("producer put " + value);
num++;
Thread.sleep((int)(Math.random() * ));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 消費者線程
*/
public class Consumer extends Thread {
MyBlockingQueue<String> queue;
public Consumer(MyBlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while(true) {
String value = queue.get();
System.out.println("Consumer get " + value);
Thread.sleep((int)(Math.random() * ));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
搞一個主程式運作
public class ProducerAndConsumer {
public static void main(String[] args) {
MyBlockingQueue<String> queue = new MyBlockingQueue<>();
new Producer(queue).start();
new Consumer(queue).start();
}
}
運作後會交替的列印生産者線程和消費者線程的存取資訊。
這裡我們使用了ArrayDeque,Java提供了專門的阻塞隊列實作
- 接口BlockingQueue和BlockingDeque
- 基于數組的實作類ArrayBlockingQueue
- 基于連結清單的實作類LinkedBlockingQueue和LinkedBlockingDeque
- 基于堆的實作類PriorityBlockingQueue
在實際開發中,應優先使用這些類。
往期文章
并發基礎知識之線程的基本概念
并發基礎知識之synchronized關鍵字