天天看點

線程間的通信線程間的通信

線程間的通信

等待、通知機制

什麼是等待通知機制

在單線程程式設計中,要執行的操作需要滿足一定的條件才能執行,可以把這個條件放在if語句塊中

在多線程程式設計中,可能A線程的條件沒有滿足隻是暫時的,稍後其它的線程B可能會更新條件使得A線程的條件得到滿足,可以将A線程暫停,直到它的條件得到滿足後再将A線程喚醒,它的僞代碼:

線程間的通信線程間的通信

object類中的wait()方法可以使執行目前代碼的線程等待,暫停執行,直到接到通知或被中斷為止

注意:

  1. wait()方法,隻能在同步代碼塊中由鎖對象調用
  2. 調用wait()方法,目前線程會釋放鎖

其僞代碼為:

線程間的通信線程間的通信

object類的notify()可以喚醒線程,該方法也必須在同步代碼塊中由鎖對象調用,沒有使用鎖對象調用,wait、notify會抛非法的螢幕異常,如果有多個等待的線程,nofity方法隻能喚醒其中的一個,同步代碼塊中調用nofity方法後并不會立即釋放鎖對象,需要等待目前同步代碼塊執行完後才會釋放鎖對象,一般将nofity方法放在同步代碼塊的最後

僞代碼為:

線程間的通信線程間的通信

wait方法的測試

package cxf.manythread;

/**
 * @author feifei
 * @Date 2021/8/20  21:19:56
 * @Version 1.0
 */
public class WaitTest {
    public static void main(String[] args) {
        String text="cxf";
        String zz="zz";
        System.out.println("before synchronized");
        synchronized (text){
            System.out.println("start synchronized");
            try {
//                zz.wait();//java.lang.IllegalMonitorStateException
                text.wait();//調用wait()方法後,目前線程就會等待,釋放鎖對象,目前線程需要被喚醒,如果沒有喚醒就會一直等待
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("the method after wait()");
        }
        System.out.println("after the synchronized");
    }
}
           
線程間的通信線程間的通信

程式進入無期限的等待

notify方法的測試

package cxf.manythread;

/**
 * @author feifei
 * @Date 2021/8/20  21:43:12
 * @Version 1.0
 */
public class NotifyTest {
    public static  final  Integer LOCK=1;
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (LOCK){
                    try {
                        System.out.println("線程一開始等待了");
                        LOCK.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("線程一被喚醒了,并且執行了");
                }
            }
        }).start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (LOCK){
                    System.out.println("線程二開始喚醒線程一");
                    LOCK.notify();
                }
            }
        }).start();
    }
}
           
線程間的通信線程間的通信

notify不會立即釋放鎖對象

鎖對象在調用完notify方法後并不會立即釋放鎖對象,而是在該notify所在的同步代碼塊執行結束後才會釋放鎖對象

測試代碼:

package cxf.manythread;

import java.util.ArrayList;
import java.util.List;

/**
 * @author feifei
 * @Date 2021/8/20  21:43:12
 * @Version 1.0
 */
public class NotifyTest {
    public static void main(String[] args) {
        List<Integer> list=new ArrayList<>();
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (list){
                    if(list.size()!=5){
                        try {
                            list.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("等待的線程蘇醒了");
                }
            }
        });
        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (list){
                    for (int i = 0; i < 10; i++) {
                        list.add(i);
                        if(i==5){
                            list.notify();
                        }
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("第"+(i+1)+"個元素被添加了");
                    }
                }
            }
        });

        thread.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        thread1.start();
    }
}
           
線程間的通信線程間的通信

這個例子相對還是比較直白的,先讓thread執行了。同時釋放鎖處于等待狀态,thread1開始執行,但執行到i==4時調用了鎖對象的喚醒方法,此時可以看到運作結果,可以看出此時并沒有釋放鎖 而是一直将同步代碼塊執行結束後才釋放鎖,thread列印出 等待的線程蘇醒了在控制台

interrupt()方法會中斷wait()

當線程處于wait()等待狀态時,調用線程對象的interrupt()方法會中斷線程的等待狀态,并産生interruptedException異常

測試代碼如下

package cxf.manythread;

/**
 * @author feifei
 * @Date 2021/8/20  22:27:24
 * @Version 1.0
 */
public class Test {
    public static  final  Object obj=new Object();
    public static void main(String[] args) {
        Thread thread=new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (obj){
                    try {
                        obj.wait();
                    } catch (Exception e) {
                        System.out.println("等待解除");
                    }
                }
            }
        });
        thread.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        thread.interrupt();
    }
}
           
線程間的通信線程間的通信

notify和nofityall

nofity一次隻能喚醒一個線程,如果有多個等待的線程,隻能随機喚醒其中的一個,想要喚醒所有等待線程,需要調用nofityall

測試代碼如下:

package cxf.manythread;
/**
 * @author feifei
 * @Date 2021/8/21  09:35:17
 * @Version 1.0
 */
public class Test06 {
    public static void main(String[] args) {
        Object obj=new Object();
        for (int i = 0; i <5 ; i++) {
            new MyThreadTest(obj).start();
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        synchronized (obj){
            obj.notify();
//            obj.notifyAll();
        }
    }
}
class MyThreadTest extends  Thread{
    private Object obj;
    public MyThreadTest(Object obj){
        this.obj=obj;
    }

    @Override
    public void run() {
       synchronized (obj){
           System.out.println(Thread.currentThread().getName()+"開始睡覺啦");
           try {
               obj.wait();
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           System.out.println(Thread.currentThread().getName()+"被喚醒了");

       }
    }
}
           

同步代碼塊中調用obj.nofity();的話運作結果如下,nofity隻能喚醒某一個處于wait狀态的線程,其他的即使此時鎖對象處于空閑也不能運作,需要等待喚醒,且此代碼中無法被喚醒程式進入阻塞狀态。

線程間的通信線程間的通信

調用nofityall()的結果,喚醒所有等待的線程,競争鎖直到所有方法執行結束

線程間的通信線程間的通信

wait(long)方法的使用

wait(long)帶有long類型參數的wait()等待,如果在參數指定的時間内沒有被喚醒,逾時後會自動喚醒

package cxf.manythread;
/**
 * @author feifei
 * @Date 2021/8/21  09:35:17
 * @Version 1.0
 */
public class Test06 {
    public static void main(String[] args) {
        String lock="lock";
        new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (lock){
                    System.out.println("開始等待");
                    try {
                        lock.wait(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("1秒時間到了自動喚醒");
                }
            }
        }).start();
    }
}
           
線程間的通信線程間的通信

時間到了會自動喚醒該線程,需要重新獲得鎖

通知過早

先看以下代碼:

package cxf.manythread;
/**
 * @author feifei
 * @Date 2021/8/21  09:35:17
 * @Version 1.0
 */
public class Test06 {
    public static void main(String[] args) {
        String lock="lock";
        Thread thread1=new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (lock) {
                    System.out.println("wait start");
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("wait end");
                }
            }
        });
        Thread thread2=new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (lock){
                    System.out.println("notify start");
                    lock.notify();
                    System.out.println("notify end");
                }
            }
        });
//        thread2.start();
//        thread1.start();

    }
}
           

實際上,調用線程的start方法,就是高速線程排程器,目前線程準備就緒,線程排程器在什麼時候調用這個線程不确定,即調用start開啟線程的順序,并不一定就是線程實際開啟的順序

在這段代碼中先開啟線程一的執行在開啟線程二的執行,大多數情況下,線程一先等待,線程二再把線程一喚醒

線程間的通信線程間的通信

如果是線程二先執行,通知過早,則線程一就陷入無限等待的情況

線程間的通信線程間的通信

這種情況下如過提前通知的情況下,就沒有必要等待了,可以用個信号量判斷下就好啦,代碼如下:

package cxf.manythread;
/**
 * @author feifei
 * @Date 2021/8/21  09:35:17
 * @Version 1.0
 */
public class Test06 {
    public static  boolean flag=true;
    public static void main(String[] args) {
        String lock="lock";
        Thread thread1=new Thread(new Runnable() {
            @Override
            public void run() {

                    synchronized (lock) {
                        if (flag) {
                            System.out.println("wait start");
                            try {
                                lock.wait();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            System.out.println("wait end");
                        }
                    }

            }
        });
        Thread thread2=new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (lock){
                    System.out.println("notify start");
                    lock.notify();
                    flag=false;
                    System.out.println("notify end");
                }
            }
        });
        thread2.start();
        thread1.start();



    }
}
           
線程間的通信線程間的通信

提前通知的話,就讓線程一不再等待了,一定要要把新标量的判斷指派在同步代碼塊中,要保證資料的可見性,否則

wait等待條件發生了變化

如下代碼:

package cxf.manythread;
import java.util.ArrayList;

/**
 * @author feifei
 * @Date 2021/8/21  09:35:17
 * @Version 1.0
 */
public class Test06 {
    public static ArrayList<Integer> list=new ArrayList<>();
    public static void main(String[] args) {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                getNum();
            }
        });
        Thread thread1=new Thread(new Runnable() {
            @Override
            public void run() {
                addNum();
            }
        });
        thread.start();
        thread1.start();

    }
    public static  void getNum(){
        synchronized (list){
            if(list.size()==0){
                try {
                    System.out.println("begin wait");
                    list.wait();
                    System.out.println("end wait");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        list.remove(0);
        System.out.println(Thread.currentThread().getName()+"取出了一個資料");
    }
    public static  void addNum(){
        synchronized (list){
            list.add((int)(Math.random()*10));
            list.notify();
            System.out.println("添加資料完成了");
        }
    }
}
           

代碼大緻上就是一個線程往list裡添加資料,一個取出資料,兩個線程因為wait方法和notify的調用的時機。

線程間的通信線程間的通信

運作結果也是正常的,但是如果在同樣添加一個取的線程呢,此時就涉及到了wait等待的條件發生了變化,代碼同樣的隻是添加了一個取資料的線程

線程間的通信線程間的通信

此時抛出了角标越界的異常,這個異常的原因也比較容易分析,第二個線程在等待的時候,等待的條件發生了變化,即使被喚醒了,線程一已經将list中唯一的資料取出,是以要對程式進行更改将從list取出數組的代碼進行更改如下:

public static  void getNum(){
    synchronized (list){
        while (list.size()==0){
            try {
                System.out.println("begin wait");
                list.wait();
                System.out.println("end wait");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        list.remove(0);
        System.out.println(Thread.currentThread().getName()+"取出了一個資料");
    }

}
           

生産者消費者模式

在java中,負責産生資料的子產品是生産者,負責使用資料的子產品是消費者,生産者消費者解決資料的平衡問題,即先有資料然後才能使用,消費者需要等待

1.操作值

  • 模拟多消費者多生産者的情況
    package cxf.manythread.test;
    
    import java.util.UUID;
    
    /**
     * @author feifei
     * @Date 2021/8/21  14:34:03
     * @Version 1.0
     */
    public class Value {
        private String value="";
        public static void main(String[] args) {
            Value val=new Value();
            Thread thread4=new SetVal(val);
            Thread thread5=new SetVal(val);
            Thread thread6=new SetVal(val);
            Thread thread1=new GetVal(val);
            Thread thread2=new GetVal(val);
            Thread thread3=new GetVal(val);
            thread1.start();
            thread2.start();
            thread3.start();
            thread4.start();
            thread5.start();
            thread6.start();
        }
        public synchronized  void  set(){
            synchronized (this){
                while(!"".equals(value)){
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                value=UUID.randomUUID().toString();
                System.out.println("将值設定為"+value);
                this.notify();
            }
        }
        public synchronized  void get(){
            synchronized (this){
                while("".equals(value)){
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("得到值為"+value);
                value="";
                this.notify();
            }
        }
    }
    class GetVal extends  Thread{
        private  Value val;
        public GetVal(Value val){
            this.val=val;
        }
        @Override
        public void run() {
            while (true){
                val.get();
            }
        }
    }
    class SetVal extends  Thread{
        private  Value val;
        public SetVal(Value val){
            this.val=val;
        }
        @Override
        public void run() {
            while (true){
                val.set();
            }
        }
    }
               

運作結果如下:

線程間的通信線程間的通信

這裡造成線程假死的原因,在多生産者多消費者的模型下notify喚醒的不一定是不同類型的,如果喚醒的是同類型的即有可能會使線程陷入死鎖,大家都處于等待狀态,此時應該使用notifyall(),即使是同類型的得到鎖對象,不滿足條件仍然會回到等待,釋放鎖,直到對應的不同類型的線程得到鎖,線程繼續運作

代碼如下及運作結果:

package cxf.manythread.test;
import java.util.UUID;
/**
 * @author feifei
 * @Date 2021/8/21  14:34:03
 * @Version 1.0
 */
public class Value {
    private String value="";
    public static void main(String[] args) {
        Value val=new Value();
        Thread thread4=new SetVal(val);
        Thread thread5=new SetVal(val);
        Thread thread6=new SetVal(val);
        Thread thread1=new GetVal(val);
        Thread thread2=new GetVal(val);
        Thread thread3=new GetVal(val);
        thread1.start();
        thread2.start();
        thread3.start();
        thread4.start();
        thread5.start();
        thread6.start();
    }
    public synchronized  void  set(){
        synchronized (this){
            while(!"".equals(value)){
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            value=UUID.randomUUID().toString();
            System.out.println("将值設定為"+value);
            this.notifyAll();
        }
    }
    public synchronized  void get(){
        synchronized (this){
            while("".equals(value)){
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("得到值為"+value);
            value="";
            this.notifyAll();
        }
    }
}
class GetVal extends  Thread{
    private  Value val;
    public GetVal(Value val){
        this.val=val;
    }
    @Override
    public void run() {
        while (true){
            val.get();
        }
    }
}
class SetVal extends  Thread{
    private  Value val;
    public SetVal(Value val){
        this.val=val;
    }
    @Override
    public void run() {
        while (true){
            val.set();
        }
    }
}
           
線程間的通信線程間的通信

2.操作棧

了解到了上邊的死鎖情況,我們再去實作一個多壓棧多彈棧的操作應該就不是很難了,我這裡用的是linkedlist模拟的棧的操作

package cxf.manythread.test;
import java.util.LinkedList;

/**
 * @author feifei
 * @Date 2021/8/21  14:34:12
 * @Version 1.0
 */
public class Test {
    private static LinkedList<Integer> linkedList=new LinkedList<>();
    private static  final Integer SIZE=5;
    public static void main(String[] args) {
            Test test=new Test();
            Thread thread1=new POP(test);
            Thread thread2=new POP(test);
            Thread thread3=new POP(test);
            Thread thread4=new Push(test);
            Thread thread5=new Push(test);
            Thread thread6=new Push(test);

            thread1.start();
            thread2.start();
            thread3.start();
            thread4.start();
            thread5.start();
            thread6.start();
    }
    public synchronized void push(){
        while(linkedList.size()>=SIZE){
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        linkedList.push((int)(Math.random()*10));
        System.out.println("添加了資料,目前有"+linkedList.size()+"個資料");
        this.notifyAll();
    }
    public synchronized  void pop(){
        while(linkedList.size()==0){
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        linkedList.pop();
        System.out.println("彈出了資料,目前有"+linkedList.size()+"個資料");
        this.notifyAll();
    }
}
class POP extends  Thread{
    private Test test;
    public POP(Test test){
        this.test=test;
    }

    @Override
    public void run() {
        while(true){
            test.pop();
        }
    }
}
class Push extends  Thread{
    private Test test;
    public Push(Test test){
        this.test=test;
    }

    @Override
    public void run() {
        while(true){
            test.push();
        }
    }
}
           
線程間的通信線程間的通信

通過管道實作線程間的通信

在java.io包中的pipeStream管道流用于線上程之間傳送資料,一個線程發送資料到輸出管道,另外一個線程從輸入管道中讀取資料。相關得類包括:pipeinputstream和pipeoutputstream,pipereader和pipedwriter

代碼示例:

package cxf.manythread.test;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

/**
 * @author feifei
 * @Date 2021/8/21  16:26:35
 * @Version 1.0
 */
public class PipeStreamTest {
    public static void main(String[] args) {
          PipedInputStream in=new PipedInputStream();
          PipedOutputStream out=new PipedOutputStream();
        try {
            in.connect(out);
        } catch (IOException e) {
            e.printStackTrace();
        }
        Thread thread =new Thread(new Runnable() {
            @Override
            public void run() {
                writeDate(out);
            }
        });

        Thread thread1=new Thread(new Runnable() {
            @Override
            public void run() {
                readDate(in);
            }
        });
        thread1.setName("cxf");
        thread1.start();
        thread.start();
    }
    public  static  void  writeDate(PipedOutputStream out){
        try {
            for (int i = 0; i <100 ; i++) {
                String date=i+"";
                out.write(date.getBytes());
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            try {
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    public static  void readDate(PipedInputStream in){
        try {
            byte[] bytes=new byte[1024];
            int len;
            while((len=in.read(bytes))!=-1){
                System.out.println(Thread.currentThread().getName()+new String(bytes,0,len));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            try {
                in.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
           
線程間的通信線程間的通信

繼續閱讀