PipedInputStream與PipedOutputStream需要配套使用,用于同一個程序之間不同線程的通信。
首先先看一下PipedOutputStream源碼
//内部持有一個PipedInputStream的引用,通過connect()将兩者綁定在一起,當然通過PipedInputStream.connect也可以進行綁定,效果都是一樣的,後面可以看到。
private PipedInputStream sink;
public PipedOutputStream(PipedInputStream snk) throws IOException {
connect(snk);
}
public PipedOutputStream() {
}
public synchronized void connect(PipedInputStream snk) throws IOException {
if (snk == null) {
throw new NullPointerException();
} else if (sink != null || snk.connected) {
throw new IOException("Already connected");
}
sink = snk;
snk.in = -;//緩存下标
snk.out = ;//未讀下标
snk.connected = true;
}
//觀察write()函數可以發現,寫入的資料通過PipedInputStream.receive()傳到了PipedInputStream内部,自身并沒有進行任何資料的緩存。
public void write(int b) throws IOException {
if (sink == null) {
throw new IOException("Pipe not connected");
}
sink.receive(b);
}
public void write(byte b[], int off, int len) throws IOException {
if (sink == null) {
throw new IOException("Pipe not connected");
} else if (b == null) {
throw new NullPointerException();
} else if ((off < ) || (off > b.length) || (len < ) ||
((off + len) > b.length) || ((off + len) < )) {
throw new IndexOutOfBoundsException();
} else if (len == ) {
return;
}
sink.receive(b, off, len);
}
//重新整理函數也隻是讀線程激活。
public synchronized void flush() throws IOException {
if (sink != null) {
synchronized (sink) {
sink.notifyAll();
}
}
}
//關閉流也是通過持有輸入流引用進行,receivedLast()函數主要是激活所有讀線程開始讀取。
public void close() throws IOException {
if (sink != null) {
sink.receivedLast();
}
}
PipedOutputStream的源碼比較簡單,重點還是要分析一下PipedInputStream源碼
boolean closedByWriter = false;//輸出流是否關閉
volatile boolean closedByReader = false;//輸入流是否關閉
boolean connected = false;//是否關聯
Thread readSide;//目前讀線程
Thread writeSide;//目前寫線程
//緩存區大小
private static final int DEFAULT_PIPE_SIZE = ;
protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;
//緩存數組
protected byte buffer[];
//存儲下标
protected int in = -;
//讀取下标
protected int out = ;
//當in == out表示緩存區所有資料都已讀
//構造函數可以設定緩存區大小
public PipedInputStream(PipedOutputStream src) throws IOException {
this(src, DEFAULT_PIPE_SIZE);
}
public PipedInputStream(PipedOutputStream src, int pipeSize)
throws IOException {
initPipe(pipeSize);
connect(src);
}
public PipedInputStream() {
initPipe(DEFAULT_PIPE_SIZE);
}
public PipedInputStream(int pipeSize) {
initPipe(pipeSize);
}
private void initPipe(int pipeSize) {
if (pipeSize <= ) {
throw new IllegalArgumentException("Pipe Size <= 0");
}
buffer = new byte[pipeSize];
}
//connect函數可以進行輸入/輸出流的綁定
//最後調用的還是PipedOutputStream.connect()
public void connect(PipedOutputStream src) throws IOException {
src.connect(this);
}
//當輸出流輸出資料時被調用,資料都過該函數存儲到輸入流中
//這兩個函數隻會被PipedOutputStream執行個體通過PipedInputStream引用進行調用。
protected synchronized void receive(int b) throws IOException {
checkStateForReceive();//檢查狀态,是否綁定啊是否被關閉啊
writeSide = Thread.currentThread();
if (in == out)//沒有資料可以讀取,需要進行線程阻塞等待資料
awaitSpace();
if (in < ) {
in = ;
out = ;
}
buffer[in++] = (byte)(b & xFF);
if (in >= buffer.length) {//循環數組
in = ;
}
}
synchronized void receive(byte b[], int off, int len) throws IOException {
checkStateForReceive();
writeSide = Thread.currentThread();
int bytesToTransfer = len;
//循環寫入,直到結束
while (bytesToTransfer > ) {
if (in == out)//等待資料寫入
awaitSpace();
int nextTransferAmount = ;
//這一段用于計算可存儲長度
if (out < in) {//讀取比寫入慢
nextTransferAmount = buffer.length - in;
} else if (in < out) {
if (in == -) {//整個數組可存
in = out = ;
nextTransferAmount = buffer.length - in;
} else {//buffer[in]~buffer[out]之間的資料是讀過的,buffer[out]-buffer[buffer.length-1]和buffer[0]~buffer[in]的資料是未讀的
nextTransferAmount = out - in;
}
}
//取最小
if (nextTransferAmount > bytesToTransfer)
nextTransferAmount = bytesToTransfer;
assert(nextTransferAmount > );
System.arraycopy(b, off, buffer, in, nextTransferAmount);
bytesToTransfer -= nextTransferAmount;
off += nextTransferAmount;
in += nextTransferAmount;
if (in >= buffer.length) {
in = ;
}
}
}
private void checkStateForReceive() throws IOException {
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByWriter || closedByReader) {
throw new IOException("Pipe closed");
} else if (readSide != null && !readSide.isAlive()) {
throw new IOException("Read end dead");
}
}
private void awaitSpace() throws IOException {
while (in == out) {
checkStateForReceive();
/* full: kick any waiting readers */
notifyAll();
try {
wait();
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
}
synchronized void receivedLast() {
closedByWriter = true;
notifyAll();
}
public synchronized int read() throws IOException {
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByReader) {
throw new IOException("Pipe closed");
} else if (writeSide != null && !writeSide.isAlive()
&& !closedByWriter && (in < )) {
throw new IOException("Write end dead");
}
readSide = Thread.currentThread();
int trials = ;
while (in < ) {
if (closedByWriter) {
/* closed by writer, return EOF */
return -;
}
if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < )) {
throw new IOException("Pipe broken");
}
/* might be a writer waiting */
notifyAll();
try {
wait();
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
int ret = buffer[out++] & ;
if (out >= buffer.length) {
out = ;
}
if (in == out) {
/* now empty */
in = -;
}
return ret;
}
public synchronized int read(byte b[], int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if (off < || len < || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == ) {
return ;
}
/* possibly wait on the first character */
int c = read();
if (c < ) {
return -;
}
b[off] = (byte) c;
int rlen = ;
while ((in >= ) && (len > )) {
int available;
if (in > out) {
available = Math.min((buffer.length - out), (in - out));
} else {
available = buffer.length - out;
}
// A byte is read beforehand outside the loop
if (available > (len - )) {
available = len - ;
}
System.arraycopy(buffer, out, b, off + rlen, available);
out += available;
rlen += available;
len -= available;
if (out >= buffer.length) {
out = ;
}
if (in == out) {
/* now empty */
in = -;
}
}
return rlen;
}
總結
- 通過PipedOutputStream輸出的資料直接寫入到PipedInputStream的緩存數組中了。
- 當寫線程寫入資料時,發現沒有空間寫入時,激活所有讀線程讀取資料并阻塞自身線程,直到有空間寫入或被讀線程所通知。
- 當讀線程讀取資料,發現沒有資料可讀時,通知所有寫線程寫入資料并阻塞自身線程,直到有資料可讀或被寫線程通知。