天天看點

(五)PipedInputStream與PipedOutputStream

PipedInputStream與PipedOutputStream需要配套使用,用于同一個程序之間不同線程的通信。

首先先看一下PipedOutputStream源碼

//内部持有一個PipedInputStream的引用,通過connect()将兩者綁定在一起,當然通過PipedInputStream.connect也可以進行綁定,效果都是一樣的,後面可以看到。
private PipedInputStream sink;

 public PipedOutputStream(PipedInputStream snk)  throws IOException {
        connect(snk);
    }

      public PipedOutputStream() {
    }

       public synchronized void connect(PipedInputStream snk) throws IOException {
        if (snk == null) {
            throw new NullPointerException();
        } else if (sink != null || snk.connected) {
            throw new IOException("Already connected");
        }
        sink = snk;
        snk.in = -;//緩存下标
        snk.out = ;//未讀下标
        snk.connected = true;
    }
           
//觀察write()函數可以發現,寫入的資料通過PipedInputStream.receive()傳到了PipedInputStream内部,自身并沒有進行任何資料的緩存。
 public void write(int b)  throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        }
        sink.receive(b);
    }

       public void write(byte b[], int off, int len) throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        } else if (b == null) {
            throw new NullPointerException();
        } else if ((off < ) || (off > b.length) || (len < ) ||
                   ((off + len) > b.length) || ((off + len) < )) {
            throw new IndexOutOfBoundsException();
        } else if (len == ) {
            return;
        }
        sink.receive(b, off, len);
    }
           
//重新整理函數也隻是讀線程激活。
 public synchronized void flush() throws IOException {
        if (sink != null) {
            synchronized (sink) {
                sink.notifyAll();
            }
        }
    }
   //關閉流也是通過持有輸入流引用進行,receivedLast()函數主要是激活所有讀線程開始讀取。
    public void close()  throws IOException {
        if (sink != null) {
            sink.receivedLast();
        }
    }
           

PipedOutputStream的源碼比較簡單,重點還是要分析一下PipedInputStream源碼

boolean closedByWriter = false;//輸出流是否關閉
    volatile boolean closedByReader = false;//輸入流是否關閉
    boolean connected = false;//是否關聯

    Thread readSide;//目前讀線程
    Thread writeSide;//目前寫線程
    //緩存區大小
    private static final int DEFAULT_PIPE_SIZE = ;
    protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;

    //緩存數組
    protected byte buffer[];
    //存儲下标
    protected int in = -;
    //讀取下标
    protected int out = ;
    //當in == out表示緩存區所有資料都已讀
           
//構造函數可以設定緩存區大小

 public PipedInputStream(PipedOutputStream src) throws IOException {
        this(src, DEFAULT_PIPE_SIZE);
    }   
    public PipedInputStream(PipedOutputStream src, int pipeSize)
            throws IOException {
         initPipe(pipeSize);
         connect(src);
    }

    public PipedInputStream() {
        initPipe(DEFAULT_PIPE_SIZE);
    }

    public PipedInputStream(int pipeSize) {
        initPipe(pipeSize);
    }

    private void initPipe(int pipeSize) {
         if (pipeSize <= ) {
            throw new IllegalArgumentException("Pipe Size <= 0");
         }
         buffer = new byte[pipeSize];
    }

   //connect函數可以進行輸入/輸出流的綁定
   //最後調用的還是PipedOutputStream.connect()
    public void connect(PipedOutputStream src) throws IOException {
        src.connect(this);
    }
           
//當輸出流輸出資料時被調用,資料都過該函數存儲到輸入流中
//這兩個函數隻會被PipedOutputStream執行個體通過PipedInputStream引用進行調用。
 protected synchronized void receive(int b) throws IOException {
        checkStateForReceive();//檢查狀态,是否綁定啊是否被關閉啊
        writeSide = Thread.currentThread();
        if (in == out)//沒有資料可以讀取,需要進行線程阻塞等待資料
            awaitSpace();
        if (in < ) {
            in = ;
            out = ;
        }
        buffer[in++] = (byte)(b & xFF);
        if (in >= buffer.length) {//循環數組
            in = ;
        }
    }


    synchronized void receive(byte b[], int off, int len)  throws IOException {
        checkStateForReceive();
        writeSide = Thread.currentThread();
        int bytesToTransfer = len;
        //循環寫入,直到結束
        while (bytesToTransfer > ) {
            if (in == out)//等待資料寫入
                awaitSpace();
            int nextTransferAmount = ;
            //這一段用于計算可存儲長度
            if (out < in) {//讀取比寫入慢
                nextTransferAmount = buffer.length - in;
            } else if (in < out) {
                if (in == -) {//整個數組可存
                    in = out = ;
                    nextTransferAmount = buffer.length - in;
                } else {//buffer[in]~buffer[out]之間的資料是讀過的,buffer[out]-buffer[buffer.length-1]和buffer[0]~buffer[in]的資料是未讀的
                    nextTransferAmount = out - in;
                }
            }
            //取最小
            if (nextTransferAmount > bytesToTransfer)
                nextTransferAmount = bytesToTransfer;

            assert(nextTransferAmount > );
            System.arraycopy(b, off, buffer, in, nextTransferAmount);
            bytesToTransfer -= nextTransferAmount;
            off += nextTransferAmount;
            in += nextTransferAmount;
            if (in >= buffer.length) {
                in = ;
            }
        }
    }
           
private void checkStateForReceive() throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByWriter || closedByReader) {
            throw new IOException("Pipe closed");
        } else if (readSide != null && !readSide.isAlive()) {
            throw new IOException("Read end dead");
        }
    }

    private void awaitSpace() throws IOException {
        while (in == out) {
            checkStateForReceive();

            /* full: kick any waiting readers */
            notifyAll();
            try {
                wait();
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
    }

    synchronized void receivedLast() {
        closedByWriter = true;
        notifyAll();
    }
           
public synchronized int read()  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
            throw new IOException("Pipe closed");
        } else if (writeSide != null && !writeSide.isAlive()
                   && !closedByWriter && (in < )) {
            throw new IOException("Write end dead");
        }

        readSide = Thread.currentThread();
        int trials = ;
        while (in < ) {
            if (closedByWriter) {
                /* closed by writer, return EOF */
                return -;
            }
            if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < )) {
                throw new IOException("Pipe broken");
            }
            /* might be a writer waiting */
            notifyAll();
            try {
                wait();
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
        int ret = buffer[out++] & ;
        if (out >= buffer.length) {
            out = ;
        }
        if (in == out) {
            /* now empty */
            in = -;
        }

        return ret;
    }


    public synchronized int read(byte b[], int off, int len)  throws IOException {
        if (b == null) {
            throw new NullPointerException();
        } else if (off <  || len <  || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        } else if (len == ) {
            return ;
        }

        /* possibly wait on the first character */
        int c = read();
        if (c < ) {
            return -;
        }
        b[off] = (byte) c;
        int rlen = ;
        while ((in >= ) && (len > )) {

            int available;

            if (in > out) {
                available = Math.min((buffer.length - out), (in - out));
            } else {
                available = buffer.length - out;
            }

            // A byte is read beforehand outside the loop
            if (available > (len - )) {
                available = len - ;
            }
            System.arraycopy(buffer, out, b, off + rlen, available);
            out += available;
            rlen += available;
            len -= available;

            if (out >= buffer.length) {
                out = ;
            }
            if (in == out) {
                /* now empty */
                in = -;
            }
        }
        return rlen;
    }
           

總結

  1. 通過PipedOutputStream輸出的資料直接寫入到PipedInputStream的緩存數組中了。
  2. 當寫線程寫入資料時,發現沒有空間寫入時,激活所有讀線程讀取資料并阻塞自身線程,直到有空間寫入或被讀線程所通知。
  3. 當讀線程讀取資料,發現沒有資料可讀時,通知所有寫線程寫入資料并阻塞自身線程,直到有資料可讀或被寫線程通知。