5、管道流
管道流是用来在多个线程之间进行信息传递的Java流,包括字节管道读取流PipedInputStream和字节管道写入流PipedOutputStream、字符管道读取流PipedReader和字符管道写入流PipedWriter。其中读取流是读取者/消费者/接收者,写入流是写入者/生产者/发送者。
需要注意的是:
- 管道流仅用于多个线程之间传递信息,若用在同一个线程中可能会造成死锁。
- 管道流的输入输出是成对的,一个输出流只能对应一个输入流,使用构造函数或者connect函数进行连接。
- 一对管道流包含一个缓冲区,其默认值为1024个字节,若要改变缓冲区大小,可以使用带有参数的构造函数。
- 管道的读写操作是互相阻塞的,当缓冲区为空时,读操作阻塞;当缓冲区满时,写操作阻塞。
- 管道依附于线程,因此若线程结束,则虽然管道流对象还在,仍然会报错“read dead end”。
- 管道流的读取方法与普通流不同,只有输出流正确close时,输出流才能读到-1值。
PipedReader/PipedWriter和PipedInputStream/PipedOutputStream源码相似,思路相同,所以以下以PipedInputStream/PipedOutputStream源码为例。
5.1 PipedOutputStream源码分析
由于connect方法在PipedOutputStream中,所以从PipedOutputStream开始看。
java.io.PipedOutputStream继承了基类OutputStream。包含PipedInputStream实例sink,在构造函数中可以与传入管道输入流进行连接。connect方法所做的是将传入管道输入流传给参数sink,并且初始化一些参数和状态。由于管道输入输出流是一一对应的,在进行连接前,connect方法会进行判断,若双方任何一个已有连接则抛出异常。
/*对应的管道输入流*/
private PipedInputStream sink;
/*构造函数:连接传入的管道输入流*/
public PipedOutputStream(PipedInputStream snk) throws IOException {
connect(snk);
}
/*空参数构造函数:未进行连接*/
public PipedOutputStream() {
}
/*connect方法*/
public synchronized void connect(PipedInputStream snk) throws IOException {
if (snk == null) {
throw new NullPointerException();
} else if (sink != null || snk.connected) {
throw new IOException("Already connected");
}
sink = snk;
snk.in = -1;
snk.out = 0;
snk.connected = true;
}
PipedOutputStream是生产者/写入者,将数据写到“管道”中,由对应的PipedInputStream来读取,不过缓冲区在PipedInputStream之中,上面connect时初始化的也是对应PipedInputStream中的参数,PipedOutputStream实例在写入时,调用的是对应的消费者/读取者来receive数据。
public void write(int b) throws IOException {
if (sink == null) {
throw new IOException("Pipe not connected");
}
sink.receive(b);
}
public void write(byte b[], int off, int len) throws IOException {
if (sink == null) {
throw new IOException("Pipe not connected");
} else if (b == null) {
throw new NullPointerException();
} else if ((off < 0) || (off > b.length) || (len < 0) ||
((off + len) > b.length) || ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return;
}
sink.receive(b, off, len);
}
其close()方法也只是调用对应PipedInputStream 的receivedLast方法来实现。
public void close() throws IOException {
if (sink != null) {
sink.receivedLast();
}
}
PipedInputStream中,该方法将closeByWriter置为true并且唤醒所有等待线程,将所有数据写入PipedInputStream的缓冲区。
synchronized void receivedLast() {
closedByWriter = true;
notifyAll();
}
而PipedInputStream的close()方法则是将closedByReader置为True。而closeByWriter和closedByReader两个变量在PipedInputStream的receive以及read方法中有不同的作用,在closeByWriter和closedByReader任一为True的时候都不能再调用receive方法进行写入,而在closeByWriter为True,而closedByReader为False时,若缓冲区仍有数据未读取,则可以继续读取。
public void close() throws IOException {
closedByReader = true;
synchronized (this) {
in = -1;
}
}
5.2 PipedInputStream 源码分析
从PipedOutputStream的源码可以看出来,PipedOutputStream做的只是连接对应的PipedInputStream 实例并在写入时调用对应的receive方法,管道流具体的实现还是主要在PipedInputStream 之中。
java.io.PipedInputStream继承了基类InputStream,其主要参数包含以下几个部分:
- 负责连接与colse的参数
- 读取线程与写入线程
- “管道”,即缓冲区相关参数
/*负责连接与close的参数*/
boolean closedByWriter = false;
volatile boolean closedByReader = false;
boolean connected = false;
/*读取线程与写入线程*/
Thread readSide;
Thread writeSide;
/*“管道”,即缓冲区相关参数*/
private static final int DEFAULT_PIPE_SIZE = 1024;
protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;
protected byte buffer[];//“管道”
protected int in = -1;//指向下一个“写入”的位置
protected int out = 0;//指向下一个“读取”的位置
而构造方法所做的两件事情分别是:1、为“管道”的缓冲区分配空间。2、连接对应的PipedOutputStream。四个构造方法的区别在于,当传入了缓冲区大小则按照自定义大小分配空间,没有缓冲区大小参数则使用默认大小,当传入PipedOutputStream参数则进行连接,反之则暂时不进行连接。
/*构造方法1:使用默认“管道”大小,并连接传入的PipedOutputStream*/
public PipedInputStream(PipedOutputStream src) throws IOException {
this(src, DEFAULT_PIPE_SIZE);
}
/*构造方法2:自定义“管道”大小,并连接传入的PipedOutputStream*/
public PipedInputStream(PipedOutputStream src, int pipeSize)
throws IOException {
initPipe(pipeSize);
connect(src);
}
/*构造方法3:使用默认“管道”大小,并未进行连接*/
public PipedInputStream() {
initPipe(DEFAULT_PIPE_SIZE);
}
/*构造方法4:自定义“管道”大小,并未进行连接*/
public PipedInputStream(int pipeSize) {
initPipe(pipeSize);
}
/*为“管道”按照大小分配空间*/
private void initPipe(int pipeSize) {
if (pipeSize <= 0) {
throw new IllegalArgumentException("Pipe Size <= 0");
}
buffer = new byte[pipeSize];
}
/*PipedInputStream.connect()调用传入PipedOutputStream的connect方法*/
public void connect(PipedOutputStream src) throws IOException {
src.connect(this);
}
主要问题在于receive系列方法及read系列方法。
PipedInputStream的receive方法,在功能上是实现了“写入”的功能的,将传入的数据写入到“管道”之中。
首先涉及两个方法checkStateForReceive和awaitSpace。
checkStateForReceive方法所做的是确认这对管道流可用:1、写入者和读取者是否已连接 2、是否关闭 3、读取线程是否有效。
private void checkStateForReceive() throws IOException {
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByWriter || closedByReader) {
throw new IOException("Pipe closed");
} else if (readSide != null && !readSide.isAlive()) {
throw new IOException("Read end dead");
}
}
awaitApace则是在“管道”缓冲区已满的时候,阻塞数据写入。ps:由于这个缓冲区使用时可以看做一个循环队列,缓冲区已满判断条件是in==out,而判断缓冲区为空的条件是in=-1(read的时候缓冲区为空会将in置为-1)。
private void awaitSpace() throws IOException {
while (in == out) {
checkStateForReceive();
/* full: kick any waiting readers */
notifyAll();
try {
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
}
两个receive方法中
receive(int b)
方法比较简单,在判断了管道流可用以及缓冲区未满之后写入,只是在写入到了缓冲区队尾(且缓冲区未满)的时候会跳到队头继续写入。
而
receive(byte b[], int off, int len)
方法在写入字节数组的时候会复杂些。有关“管道”缓冲区,数据有以下三种情况:
- 缓冲区为空:in=-1,out=0,初始时以及在读取的时候发现缓冲区为空会将in置为-1
- 缓冲区有数据但是未满:in<out或者out<in
- 缓冲区已满:int==out
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLi0zaHRGcWdUYuVzVa9GczoVdG1mWfVGc5RHLwkzX39GZhh2csATMflHLwEzX4xSZz91ZsADMx8FdsYkRGZkRG9lcvx2bjxSa2EWNhJTW1AlUxEFeVRUUfRHelRHL2EzXlpXazxyayFWbyVGdhd3LcV2Zh1Wa9M3clN2byBXLzN3btg3PnVGcq5COiVWM0QWZxM2YwUGOilTNiZDZhJjYjVWN4IWNxITMk9CXyAzLchDMxIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjL3M3Lc9CX6MHc0RHaiojIsJye.jpeg)
receive(byte b[], int off, int len)
所做的是bytesToTransfer保留还有多少字节未写入,nextTransferAmount保存下一个可写入空间的大小,写入后,若bytesToTransfer仍大于0,则继续循环,判断缓冲区情况,尝试寻找下一个可写入空间直至全部写入。
synchronized void receive(byte b[], int off, int len) throws IOException {
checkStateForReceive();
writeSide = Thread.currentThread();
int bytesToTransfer = len;
while (bytesToTransfer > 0) {
if (in == out)/*缓冲区已满:阻塞*/
awaitSpace();
int nextTransferAmount = 0;
/*判断下一个可写入的连续空间的大小*/
if (out < in) {/*当out<in,下一个可写入的空间是in到队尾*/
nextTransferAmount = buffer.length - in;
} else if (in < out) {
if (in == -1) {/*当缓冲区为空,下一个可写入的空间是0到队尾*/
in = out = 0;
nextTransferAmount = buffer.length - in;
} else {/*当in<out,下一个可写入的空间是in-->out的空间*/
nextTransferAmount = out - in;
}
}
if (nextTransferAmount > bytesToTransfer)/*如果空间足够写入则写入全部*/
nextTransferAmount = bytesToTransfer;
assert(nextTransferAmount > 0);
System.arraycopy(b, off, buffer, in, nextTransferAmount);
bytesToTransfer -= nextTransferAmount;/*如果空间不足够写入,则减去已写入的部分,进入下一个循环找下一个可写入的空间*/
off += nextTransferAmount;
in += nextTransferAmount;
if (in >= buffer.length) {
in = 0;
}
}
}
read方法包含
read()
和
read(byte b[], int off, int len)
。
read()
方法在判断了是否有关联PipedOutputStream,读取流是否关闭和写入流是否完全关闭之后开始尝试读取数据,有以下情况:
- 缓冲区为空,则先尝试唤醒全部等待线程并等待,等待对应的写入线程是否有未完成的写入。若有则等待写入后读取,若无,则尝试2次之后抛出异常并退出。
- 缓冲区不为空,则直接读取数据,更新参数
public synchronized int read() throws IOException {
/*判断是否有关联PipedOutputStream,读取流是否关闭和写入流是否完全关闭*/
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByReader) {
throw new IOException("Pipe closed");
} else if (writeSide != null && !writeSide.isAlive()
&& !closedByWriter && (in < 0)) {
throw new IOException("Write end dead");
}
readSide = Thread.currentThread();
int trials = 2;
/*如果缓冲区为空*/
while (in < 0) {
/*缓冲区为空,并且写入流已经关闭则结束并返回-1*/
if (closedByWriter) {
/* closed by writer, return EOF */
return -1;
}
/*如果写入线程不再活动,并且已经尝试等待2次后仍无数据则抛出异常*/
if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
throw new IOException("Pipe broken");
}
/*可能仍有写入线程在等待的,read方法尝试唤醒全部线程并等待,尝试2次后退出并抛出异常*/
notifyAll();
try {
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
/*若缓冲区不为空,读取数据,并更新in和out*/
int ret = buffer[out++] & 0xFF;
if (out >= buffer.length) {
out = 0;
}
if (in == out) {
/* now empty */
in = -1;
}
return ret;
}
public synchronized int read(byte b[], int off, int len) throws IOException {
/*判断传入参数的合理性*/
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
/* possibly wait on the first character */
/*尝试调用read()读取第一个字节*/
int c = read();
if (c < 0) {
return -1;
}
b[off] = (byte) c;
int rlen = 1;
/*当缓冲区不为空,并且读取的字节数仍不够时则继续读取*/
while ((in >= 0) && (len > 1)) {
/*获取下一次可读取连续空间的字节数*/
int available;
if (in > out) {
available = Math.min((buffer.length - out), (in - out));
} else {
available = buffer.length - out;
}
/*如果这次可读连续空间的字节数已经够了,则只读取len-1个字节*/
if (available > (len - 1)) {
available = len - 1;
}
/*读取数据,并更新参数*/
System.arraycopy(buffer, out, b, off + rlen, available);
out += available;
rlen += available;
len -= available;
if (out >= buffer.length) {
out = 0;
}
if (in == out) {
/* now empty */
in = -1;
}
}
return rlen;
}
5.3 PipedOutputStream和PipedInputStream 应用代码
public void Test(){
try{
PipedInputStream pis = new PipedInputStream();
PipedOutputStream pos = new PipedOutputStream(pis);
Wtr wtr = new Wtr(pos);
Rdr rdr = new Rdr(pis);
wtr.start();
rdr.start();
} catch (IOException e) {
e.printStackTrace();
}
}
class Wtr extends Thread{
private PipedOutputStream writer;
public Wtr(PipedOutputStream pos){
writer = pos;
}
@Override
public void run(){
String s = "好好学习,天天向上";
byte[] buf = s.getBytes();
System.out.println("Send "+buf.length+" Bytes : "+s);
try {
writer.write(buf,0,buf.length);
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("Send done");
}
}
class Rdr extends Thread{
private PipedInputStream reader;
public Rdr(PipedInputStream pis){
reader = pis;
}
@Override
public void run(){
ByteArrayOutputStream bis = new ByteArrayOutputStream();
byte[] buf = new byte[1024];
int len = 0;
try {
len = reader.read(buf,0,1024);
bis.write(buf,0,len);
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("read "+len+"Bytes : "+bis.toString());
}
}
5.4 PipedReader和PipedWriter 应用代码
public void Test(){
try{
PipedReader prd = new PipedReader();
PipedWriter pwr = new PipedWriter(prd);
Wtr wtr = new Wtr(pwr);
Rdr rdr = new Rdr(prd);
wtr.start();
rdr.start();
} catch (IOException e) {
e.printStackTrace();
}
}
class Wtr extends Thread{
private PipedWriter writer;
public Wtr(PipedWriter pwr){
writer = pwr;
}
@Override
public void run(){
String s = "好好学习,天天向上";
char[] chr = s.toCharArray();
System.out.println("Send "+chr.length+" Bytes : "+s);
try {
writer.write(chr,0,chr.length);
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("Send done");
}
}
class Rdr extends Thread{
private PipedReader reader;
public Rdr(PipedReader prd){
reader = prd;
}
@Override
public void run(){
char[] chr = new char[1024];
int len = 0;
try {
len = reader.read(chr,0,1024);
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("read "+len+"Bytes : "+new String(chr));
}
}
当你深入了解,你就会发现世界如此广袤,而你对世界的了解则是如此浅薄,请永远保持谦卑的态度。