天天看點

Java多線程詳解(六)------ 線程間通信(等待通知及管道流)

文章目錄

      • 1 等待/通知機制
        • 1.1 不使用wait/notify機制實作線程間通信
        • 1.2 等待wait/通知notify機制的實作
        • 1.3 方法wait()鎖釋放與notify()鎖不釋放
        • 1.4 方法wait(long)的使用
      • 2 生産者/消費者模式實作
      • 3 通過管道進行線程間通信
        • 3.1 位元組流
        • 3.2 字元流

1 等待/通知機制

A的執行需要依賴于B的一些條件,A進入等待狀态

wait

,B持續執行,當達到A期望的條件時,通知A繼續執行任務

notify

。這就是線程間的通信。

1.1 不使用wait/notify機制實作線程間通信

使用

while

語句輪詢機制來檢測某一個條件,這樣做有缺陷。如果輪詢的時間間隔很小

while(true)

,非常浪費

CPU

資源;如果輪詢的時間間隔很大,有可能取不到想要的結果。

1.2 等待wait/通知notify機制的實作

方法

wait()

的作用是使目前執行代碼的線程進行等待,該方法用來将目前線程置入預執行隊列中,并且在

wait()

所在的代碼行處停止執行,直到接到通知或被中斷為止。在調用

wait()

之前,線程必須獲得該對象的對象級别鎖,即隻能在同步方法或同步塊中調用

wait()

方法。調用

wait()

方法後,目前線程釋放鎖。

方法

notify()

也要在同步方法或同步塊中調用,如果沒有獲得鎖的情況下調用

wait()

notify()

,會抛出

IllegalMonitorStateException

。該方法用來通知那些等待該對象的對象鎖的其它線程,若有多個線程等待,則由線程規劃器随機挑選出其中一個呈

wait

狀态的線程,對其發出通知

notify

,并使它擷取該對象的對象鎖。

在執行完

notify()

方法後,目前線程不會馬上釋放該對象鎖,呈

wait()

狀态的線程也并不能馬上擷取該對象鎖,要等到執行

notify()

方法的線程将程式執行完,也就是退出

synchronized

代碼塊後,目前線程才會釋放鎖。

wait

使線程停止運作,而notify使停止的線程繼續運作

Java多線程詳解(六)------ 線程間通信(等待通知及管道流)
  • yield()

    方法的作用是放棄目前的CPU資源,将它讓給其它的任務去占用CPU執行時間。但放棄的時間不确定,有可能剛放棄,馬上又獲得CPU時間片。
  • 新建立一個新的線程對象後,再調用它的

    start()

    方法,系統會為此線程配置設定

    CPU

    資源,使其處于

    Runnable

    (可運作)狀态,這是一個準備運作的階段。如果線程搶占到

    CPU

    資源,此線程就處于

    Running

    (運作)狀态。

Blocked

是阻塞的意思,出現阻塞的情況大體分為如下5中:

  1. 線程調用了

    sleep()

    方法,主動放棄占用的處理器資源;
  2. 線程調用了阻塞式

    IO

    方法,在該方法傳回前,該線程被阻塞;
  3. 線程試圖獲得一個同步螢幕,但該同步螢幕正被其他線程所持有;
  4. 線程等待某個通知;
  5. 程式調用了

    suspend

    方法将該線程挂起。此方法容易導緻死鎖,盡量避免使用該方法。

每個鎖對象都有兩個隊列,一個是就緒隊列,一個是阻塞隊列。就緒隊列存儲了将要獲得鎖的線程,阻塞隊列存儲了被阻塞的線程。一個線程被喚醒後,才會進入就緒隊列,等待

CPU

的排程;反之,一個線程被

wait

後,就會進入阻塞隊列,等待下一次被喚醒。

1.3 方法wait()鎖釋放與notify()鎖不釋放

方法

wait()

被執行後,鎖被自動釋放,但執行完

notify()

方法,鎖卻不自動釋放,必須執行完

notify()

方法所在的同步

synchronized

代碼塊後才釋放鎖。

notify()

一次隻會随機通知一個線程進行喚醒,使用

notifyAll()

方法喚醒所有線程。

sleep()

方法不釋放鎖。

  1. 執行完同步代碼塊就會釋放對象的鎖;
  2. 在執行同步代碼塊的過程中,遇到異常而導緻線程終止,鎖也會被釋放。
  3. 在執行同步代碼塊的過程中,執行了鎖所屬對象的

    wait()

    方法,這個線程會釋放對象鎖,而此線程對象會進入線程等待池中,等待被喚醒。

1.4 方法wait(long)的使用

帶一個參數的

wait(long)

方法的功能時等待某一時間内是否有線程對鎖進行喚醒,如果超過這個時間則自動喚醒,也可以提前進行喚醒。

通知過早:

notify

先執行,然後再觸發

wait

,則會導緻

wait

所在的線程不會收到通知,其後續代碼就不會執行了。

2 生産者/消費者模式實作

public class ValueObject {
    public static String value = "";
}
public class P {
    private String lock;

    public P() {
    }

    public P(String lock) {
        this.lock = lock;
    }

    public void setValue() {
        try {
            synchronized (lock) {
                if (!ValueObject.value.equals("")) {
                    lock.wait();
                }
                String value = System.currentTimeMillis() + "_" + System.nanoTime();
                System.out.println("set的值" + value);
                ValueObject.value = value;
                lock.notify();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class C {
    private String lock;
    public C() {
    }
    public C(String lock) {
        this.lock = lock;
    }
    public void getValue() {
        try {
            synchronized (lock) {
                if (ValueObject.value.equals("")) {
                    lock.wait();
                }
                System.out.println("get的值是" + ValueObject.value);
                ValueObject.value = "";
                lock.notify();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class ThreadP extends Thread {
    private P p;
    public ThreadP() {
    }
    public ThreadP(P p) {
        this.p = p;
    }
    @Override
    public void run() {
        while (true) {
            p.setValue();
        }
    }
}
public class ThreadC extends Thread {
    private C c;
    public ThreadC() {
    }
    public ThreadC(C c) {
        this.c = c;
    }
    @Override
    public void run() {
        while (true) {
            c.getValue();
        }
    }
}
public class Run {
    public static void main(String[] args) {
        String lock = new String("");
        P p = new P(lock);
        C c = new C(lock);
        ThreadP pThread = new ThreadP(p);
        ThreadC cThread = new ThreadC(c);
        pThread.start();
        cThread.start();
    }
}
           

生産者執行代碼發現内容不為空,則等待,釋放鎖,線程對象

while(true)

一直判斷;消費者拿到鎖,消費掉内容。

3 通過管道進行線程間通信

管道流(

pipeStream

)是一種特殊的流,用于在不同線程間直接傳送資料。一個線程發送資料到輸出管道,另一個線程從輸入管道中讀資料。

JDK

提供了4個類來使線程間可以進行通信:

  • PipedInputStream

    PipedOutputStream

  • PipedReader

    PipedWriter

3.1 位元組流

位元組流寫的是位元組資料。

public class ReadData {
    public void readMethod(PipedInputStream input) {
        try {
            System.out.println("read :");
            byte[] byteArray = new byte[20];
            // 阻塞式IO
            int readLength = input.read(byteArray);
            while (readLength != -1) {
                String newData = new String(byteArray, 0, readLength);
                System.out.print(newData);
                readLength = input.read(byteArray);
            }
            System.out.println();
            input.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class WriteData {
    public void writeMethod(PipedOutputStream out) {
        try {
            System.out.println("write :");
            for (int i = 0; i < 300; i++) {
                String outData = "" + (i + 1);
                out.write(outData.getBytes());
                System.out.print(outData);
            }
            System.out.println();
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class ThreadRead extends Thread {
    private ReadData read;
    private PipedInputStream input;

    public ThreadRead(ReadData read, PipedInputStream input) {
        this.read = read;
        this.input = input;
    }

    @Override
    public void run() {
        read.readMethod(input);
    }
}
public class ThreadWrite extends Thread {
    private WriteData write;
    private PipedOutputStream out;

    public ThreadWrite(WriteData write, PipedOutputStream out) {
        this.write = write;
        this.out = out;
    }

    @Override
    public void run() {
        write.writeMethod(out);
    }
}
public class Run {
    public static void main(String[] args) {
        try {
            WriteData writeData = new WriteData();
            ReadData readData = new ReadData();

            PipedInputStream inputStream = new PipedInputStream();
            PipedOutputStream outputStream = new PipedOutputStream();

            // inputStream.connect(outputStream);
            outputStream.connect(inputStream);

            ThreadRead threadRead = new ThreadRead(readData, inputStream);
            threadRead.start();

            Thread.sleep(2000);

            ThreadWrite threadWrite = new ThreadWrite(writeData, outputStream);
            threadWrite.start();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
           

使用

inputStream.connect(outputStream);

outputStream.connect(inputStream);

的作用使兩個

Stream

之間産生通信連結,這樣才可以将資料進行輸出與輸入。

在上面的代碼中,首先是讀取線程

new ThreadRead(inputStream)

啟動,由于此刻沒有資料被寫入,是以線程阻塞在

int readLength = input.read(byteArray);

代碼處,直到有資料被寫入,才繼續向下運作。

3.2 字元流

字元流寫的是字元及字元串。

PipedReader

PipedWriter

,使用方式上通位元組流。