線程間的通信
等待、通知機制
什麼是等待通知機制
在單線程程式設計中,要執行的操作需要滿足一定的條件才能執行,可以把這個條件放在if語句塊中
在多線程程式設計中,可能A線程的條件沒有滿足隻是暫時的,稍後其它的線程B可能會更新條件使得A線程的條件得到滿足,可以将A線程暫停,直到它的條件得到滿足後再将A線程喚醒,它的僞代碼:

object類中的wait()方法可以使執行目前代碼的線程等待,暫停執行,直到接到通知或被中斷為止
注意:
- wait()方法,隻能在同步代碼塊中由鎖對象調用
- 調用wait()方法,目前線程會釋放鎖
其僞代碼為:
object類的notify()可以喚醒線程,該方法也必須在同步代碼塊中由鎖對象調用,沒有使用鎖對象調用,wait、notify會抛非法的螢幕異常,如果有多個等待的線程,nofity方法隻能喚醒其中的一個,同步代碼塊中調用nofity方法後并不會立即釋放鎖對象,需要等待目前同步代碼塊執行完後才會釋放鎖對象,一般将nofity方法放在同步代碼塊的最後
僞代碼為:
wait方法的測試
package cxf.manythread;
/**
* @author feifei
* @Date 2021/8/20 21:19:56
* @Version 1.0
*/
public class WaitTest {
public static void main(String[] args) {
String text="cxf";
String zz="zz";
System.out.println("before synchronized");
synchronized (text){
System.out.println("start synchronized");
try {
// zz.wait();//java.lang.IllegalMonitorStateException
text.wait();//調用wait()方法後,目前線程就會等待,釋放鎖對象,目前線程需要被喚醒,如果沒有喚醒就會一直等待
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("the method after wait()");
}
System.out.println("after the synchronized");
}
}
程式進入無期限的等待
notify方法的測試
package cxf.manythread;
/**
* @author feifei
* @Date 2021/8/20 21:43:12
* @Version 1.0
*/
public class NotifyTest {
public static final Integer LOCK=1;
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
synchronized (LOCK){
try {
System.out.println("線程一開始等待了");
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程一被喚醒了,并且執行了");
}
}
}).start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(new Runnable() {
@Override
public void run() {
synchronized (LOCK){
System.out.println("線程二開始喚醒線程一");
LOCK.notify();
}
}
}).start();
}
}
notify不會立即釋放鎖對象
鎖對象在調用完notify方法後并不會立即釋放鎖對象,而是在該notify所在的同步代碼塊執行結束後才會釋放鎖對象
測試代碼:
package cxf.manythread;
import java.util.ArrayList;
import java.util.List;
/**
* @author feifei
* @Date 2021/8/20 21:43:12
* @Version 1.0
*/
public class NotifyTest {
public static void main(String[] args) {
List<Integer> list=new ArrayList<>();
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
synchronized (list){
if(list.size()!=5){
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("等待的線程蘇醒了");
}
}
});
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (list){
for (int i = 0; i < 10; i++) {
list.add(i);
if(i==5){
list.notify();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第"+(i+1)+"個元素被添加了");
}
}
}
});
thread.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
thread1.start();
}
}
這個例子相對還是比較直白的,先讓thread執行了。同時釋放鎖處于等待狀态,thread1開始執行,但執行到i==4時調用了鎖對象的喚醒方法,此時可以看到運作結果,可以看出此時并沒有釋放鎖 而是一直将同步代碼塊執行結束後才釋放鎖,thread列印出 等待的線程蘇醒了在控制台
interrupt()方法會中斷wait()
當線程處于wait()等待狀态時,調用線程對象的interrupt()方法會中斷線程的等待狀态,并産生interruptedException異常
測試代碼如下
package cxf.manythread;
/**
* @author feifei
* @Date 2021/8/20 22:27:24
* @Version 1.0
*/
public class Test {
public static final Object obj=new Object();
public static void main(String[] args) {
Thread thread=new Thread(new Runnable() {
@Override
public void run() {
synchronized (obj){
try {
obj.wait();
} catch (Exception e) {
System.out.println("等待解除");
}
}
}
});
thread.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
thread.interrupt();
}
}
notify和nofityall
nofity一次隻能喚醒一個線程,如果有多個等待的線程,隻能随機喚醒其中的一個,想要喚醒所有等待線程,需要調用nofityall
測試代碼如下:
package cxf.manythread;
/**
* @author feifei
* @Date 2021/8/21 09:35:17
* @Version 1.0
*/
public class Test06 {
public static void main(String[] args) {
Object obj=new Object();
for (int i = 0; i <5 ; i++) {
new MyThreadTest(obj).start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (obj){
obj.notify();
// obj.notifyAll();
}
}
}
class MyThreadTest extends Thread{
private Object obj;
public MyThreadTest(Object obj){
this.obj=obj;
}
@Override
public void run() {
synchronized (obj){
System.out.println(Thread.currentThread().getName()+"開始睡覺啦");
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"被喚醒了");
}
}
}
同步代碼塊中調用obj.nofity();的話運作結果如下,nofity隻能喚醒某一個處于wait狀态的線程,其他的即使此時鎖對象處于空閑也不能運作,需要等待喚醒,且此代碼中無法被喚醒程式進入阻塞狀态。
調用nofityall()的結果,喚醒所有等待的線程,競争鎖直到所有方法執行結束
wait(long)方法的使用
wait(long)帶有long類型參數的wait()等待,如果在參數指定的時間内沒有被喚醒,逾時後會自動喚醒
package cxf.manythread;
/**
* @author feifei
* @Date 2021/8/21 09:35:17
* @Version 1.0
*/
public class Test06 {
public static void main(String[] args) {
String lock="lock";
new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock){
System.out.println("開始等待");
try {
lock.wait(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("1秒時間到了自動喚醒");
}
}
}).start();
}
}
時間到了會自動喚醒該線程,需要重新獲得鎖
通知過早
先看以下代碼:
package cxf.manythread;
/**
* @author feifei
* @Date 2021/8/21 09:35:17
* @Version 1.0
*/
public class Test06 {
public static void main(String[] args) {
String lock="lock";
Thread thread1=new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock) {
System.out.println("wait start");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("wait end");
}
}
});
Thread thread2=new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock){
System.out.println("notify start");
lock.notify();
System.out.println("notify end");
}
}
});
// thread2.start();
// thread1.start();
}
}
實際上,調用線程的start方法,就是高速線程排程器,目前線程準備就緒,線程排程器在什麼時候調用這個線程不确定,即調用start開啟線程的順序,并不一定就是線程實際開啟的順序
在這段代碼中先開啟線程一的執行在開啟線程二的執行,大多數情況下,線程一先等待,線程二再把線程一喚醒
如果是線程二先執行,通知過早,則線程一就陷入無限等待的情況
這種情況下如過提前通知的情況下,就沒有必要等待了,可以用個信号量判斷下就好啦,代碼如下:
package cxf.manythread;
/**
* @author feifei
* @Date 2021/8/21 09:35:17
* @Version 1.0
*/
public class Test06 {
public static boolean flag=true;
public static void main(String[] args) {
String lock="lock";
Thread thread1=new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock) {
if (flag) {
System.out.println("wait start");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("wait end");
}
}
}
});
Thread thread2=new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock){
System.out.println("notify start");
lock.notify();
flag=false;
System.out.println("notify end");
}
}
});
thread2.start();
thread1.start();
}
}
提前通知的話,就讓線程一不再等待了,一定要要把新标量的判斷指派在同步代碼塊中,要保證資料的可見性,否則
wait等待條件發生了變化
如下代碼:
package cxf.manythread;
import java.util.ArrayList;
/**
* @author feifei
* @Date 2021/8/21 09:35:17
* @Version 1.0
*/
public class Test06 {
public static ArrayList<Integer> list=new ArrayList<>();
public static void main(String[] args) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
getNum();
}
});
Thread thread1=new Thread(new Runnable() {
@Override
public void run() {
addNum();
}
});
thread.start();
thread1.start();
}
public static void getNum(){
synchronized (list){
if(list.size()==0){
try {
System.out.println("begin wait");
list.wait();
System.out.println("end wait");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
list.remove(0);
System.out.println(Thread.currentThread().getName()+"取出了一個資料");
}
public static void addNum(){
synchronized (list){
list.add((int)(Math.random()*10));
list.notify();
System.out.println("添加資料完成了");
}
}
}
代碼大緻上就是一個線程往list裡添加資料,一個取出資料,兩個線程因為wait方法和notify的調用的時機。
運作結果也是正常的,但是如果在同樣添加一個取的線程呢,此時就涉及到了wait等待的條件發生了變化,代碼同樣的隻是添加了一個取資料的線程
此時抛出了角标越界的異常,這個異常的原因也比較容易分析,第二個線程在等待的時候,等待的條件發生了變化,即使被喚醒了,線程一已經将list中唯一的資料取出,是以要對程式進行更改将從list取出數組的代碼進行更改如下:
public static void getNum(){
synchronized (list){
while (list.size()==0){
try {
System.out.println("begin wait");
list.wait();
System.out.println("end wait");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.remove(0);
System.out.println(Thread.currentThread().getName()+"取出了一個資料");
}
}
生産者消費者模式
在java中,負責産生資料的子產品是生産者,負責使用資料的子產品是消費者,生産者消費者解決資料的平衡問題,即先有資料然後才能使用,消費者需要等待
1.操作值
- 模拟多消費者多生産者的情況
package cxf.manythread.test; import java.util.UUID; /** * @author feifei * @Date 2021/8/21 14:34:03 * @Version 1.0 */ public class Value { private String value=""; public static void main(String[] args) { Value val=new Value(); Thread thread4=new SetVal(val); Thread thread5=new SetVal(val); Thread thread6=new SetVal(val); Thread thread1=new GetVal(val); Thread thread2=new GetVal(val); Thread thread3=new GetVal(val); thread1.start(); thread2.start(); thread3.start(); thread4.start(); thread5.start(); thread6.start(); } public synchronized void set(){ synchronized (this){ while(!"".equals(value)){ try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } value=UUID.randomUUID().toString(); System.out.println("将值設定為"+value); this.notify(); } } public synchronized void get(){ synchronized (this){ while("".equals(value)){ try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("得到值為"+value); value=""; this.notify(); } } } class GetVal extends Thread{ private Value val; public GetVal(Value val){ this.val=val; } @Override public void run() { while (true){ val.get(); } } } class SetVal extends Thread{ private Value val; public SetVal(Value val){ this.val=val; } @Override public void run() { while (true){ val.set(); } } }
運作結果如下:
這裡造成線程假死的原因,在多生産者多消費者的模型下notify喚醒的不一定是不同類型的,如果喚醒的是同類型的即有可能會使線程陷入死鎖,大家都處于等待狀态,此時應該使用notifyall(),即使是同類型的得到鎖對象,不滿足條件仍然會回到等待,釋放鎖,直到對應的不同類型的線程得到鎖,線程繼續運作
代碼如下及運作結果:
package cxf.manythread.test;
import java.util.UUID;
/**
* @author feifei
* @Date 2021/8/21 14:34:03
* @Version 1.0
*/
public class Value {
private String value="";
public static void main(String[] args) {
Value val=new Value();
Thread thread4=new SetVal(val);
Thread thread5=new SetVal(val);
Thread thread6=new SetVal(val);
Thread thread1=new GetVal(val);
Thread thread2=new GetVal(val);
Thread thread3=new GetVal(val);
thread1.start();
thread2.start();
thread3.start();
thread4.start();
thread5.start();
thread6.start();
}
public synchronized void set(){
synchronized (this){
while(!"".equals(value)){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
value=UUID.randomUUID().toString();
System.out.println("将值設定為"+value);
this.notifyAll();
}
}
public synchronized void get(){
synchronized (this){
while("".equals(value)){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("得到值為"+value);
value="";
this.notifyAll();
}
}
}
class GetVal extends Thread{
private Value val;
public GetVal(Value val){
this.val=val;
}
@Override
public void run() {
while (true){
val.get();
}
}
}
class SetVal extends Thread{
private Value val;
public SetVal(Value val){
this.val=val;
}
@Override
public void run() {
while (true){
val.set();
}
}
}
2.操作棧
了解到了上邊的死鎖情況,我們再去實作一個多壓棧多彈棧的操作應該就不是很難了,我這裡用的是linkedlist模拟的棧的操作
package cxf.manythread.test;
import java.util.LinkedList;
/**
* @author feifei
* @Date 2021/8/21 14:34:12
* @Version 1.0
*/
public class Test {
private static LinkedList<Integer> linkedList=new LinkedList<>();
private static final Integer SIZE=5;
public static void main(String[] args) {
Test test=new Test();
Thread thread1=new POP(test);
Thread thread2=new POP(test);
Thread thread3=new POP(test);
Thread thread4=new Push(test);
Thread thread5=new Push(test);
Thread thread6=new Push(test);
thread1.start();
thread2.start();
thread3.start();
thread4.start();
thread5.start();
thread6.start();
}
public synchronized void push(){
while(linkedList.size()>=SIZE){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
linkedList.push((int)(Math.random()*10));
System.out.println("添加了資料,目前有"+linkedList.size()+"個資料");
this.notifyAll();
}
public synchronized void pop(){
while(linkedList.size()==0){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
linkedList.pop();
System.out.println("彈出了資料,目前有"+linkedList.size()+"個資料");
this.notifyAll();
}
}
class POP extends Thread{
private Test test;
public POP(Test test){
this.test=test;
}
@Override
public void run() {
while(true){
test.pop();
}
}
}
class Push extends Thread{
private Test test;
public Push(Test test){
this.test=test;
}
@Override
public void run() {
while(true){
test.push();
}
}
}
通過管道實作線程間的通信
在java.io包中的pipeStream管道流用于線上程之間傳送資料,一個線程發送資料到輸出管道,另外一個線程從輸入管道中讀取資料。相關得類包括:pipeinputstream和pipeoutputstream,pipereader和pipedwriter
代碼示例:
package cxf.manythread.test;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
/**
* @author feifei
* @Date 2021/8/21 16:26:35
* @Version 1.0
*/
public class PipeStreamTest {
public static void main(String[] args) {
PipedInputStream in=new PipedInputStream();
PipedOutputStream out=new PipedOutputStream();
try {
in.connect(out);
} catch (IOException e) {
e.printStackTrace();
}
Thread thread =new Thread(new Runnable() {
@Override
public void run() {
writeDate(out);
}
});
Thread thread1=new Thread(new Runnable() {
@Override
public void run() {
readDate(in);
}
});
thread1.setName("cxf");
thread1.start();
thread.start();
}
public static void writeDate(PipedOutputStream out){
try {
for (int i = 0; i <100 ; i++) {
String date=i+"";
out.write(date.getBytes());
}
}catch (Exception e){
e.printStackTrace();
}finally {
try {
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void readDate(PipedInputStream in){
try {
byte[] bytes=new byte[1024];
int len;
while((len=in.read(bytes))!=-1){
System.out.println(Thread.currentThread().getName()+new String(bytes,0,len));
}
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}