1.引言
Java I/O系統是建立在資料流概念之上的,而在UNIX/Linux中有一個類似的概念,就是管道,它具有将一個程式的輸出當作另一個程式的輸入的能力。在Java中,可以使用管道流進行線程之間的通信,輸入流和輸出流必須相連接配接,這樣的通信有别于一般的Shared Data通信,其不需要一個共享的資料空間。
2.相關類及其關系
1)位元組流:
分為管道輸出流(PipedOutputStream)和管道輸入流(PipedInputStream),利用 java.io.PipedOutputStream和java.io.PipedInputStream可以實作線程之間的二進制資訊傳輸。如果要進行管道輸出,則必須把輸出流連在輸入流上。 java.io.PipedOutputStream是java.io.OutputStream的直接子類,而java.io. PipedInputStream是java.io.InputStream的直接子類。PipedOutputStream和 PipedInputStream往往成對出現、配合使用。舉例說明:
TestPipe.Java
import java.io.IOException;
public class TestPipe {
public static void main(String[] args) {
Send s = new Send();
Receive r = new Receive();
try {
s.getPos().connect(r.getPis()); // 連接配接管道
} catch (IOException e) {
e.printStackTrace();
}
new Thread(s).start(); // 啟動線程
new Thread(r).start(); // 啟動線程
}
}
Receive.java
import java.io.IOException;
import java.io.PipedInputStream;
class Receive implements Runnable { // 實作Runnable接口
private PipedInputStream pis = null;
public Receive() {
this.pis = new PipedInputStream(); // 執行個體化輸入流
}
public void run() {
byte b[] = new byte[1024];
int len = 0;
try {
len = this.pis.read(b); // 接收資料
} catch (IOException e) {
e.printStackTrace();
}
try {
this.pis.close();
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("接收的内容為:" + new String(b, 0, len));
}
public PipedInputStream getPis() {
return pis;
}
}
Send.java
import java.io.IOException;
import java.io.PipedOutputStream;
class Send implements Runnable {
// 實作Runnable接口
private PipedOutputStream pos = null; // 管道輸出流
public Send() {
this.pos = new PipedOutputStream();// 執行個體化輸出流
}
public void run() {
String str = "Hello World!!!";
try {
this.pos.write(str.getBytes()); // 輸出資訊
} catch (IOException e) {
e.printStackTrace();
}
try {
this.pos.close(); // 關閉輸出流
} catch (IOException e) {
e.printStackTrace();
}
}
public PipedOutputStream getPos() { // 通過線程類得到輸出流
return pos;
}
}
我們可以看到使用管道流,通過connect方法進行連接配接,實作了Send線程和Receive線程之間的通信。
注意:
PipedInputStream中實際是用了一個1024位元組固定大小的循環緩沖區。寫入PipedOutputStream的資料實際上儲存到對應的 PipedInputStream的内部緩沖區。從PipedInputStream執行讀操作時,讀取的資料實際上來自這個内部緩沖區。如果對應的 PipedInputStream輸入緩沖區已滿,任何企圖寫入PipedOutputStream的線程都将被阻塞。而且這個寫操作線程将一直阻塞,直至出現讀取PipedInputStream的操作從緩沖區删除資料。這也就是說往PipedOutputStream寫資料的線程Send若是和從PipedInputStream讀資料的線程Receive是同一個線程的話,那麼一旦Send線程發送資料過多(大于1024位元組),它就會被阻塞,這就直接導緻接受資料的線程阻塞而無法工作(因為是同一個線程嘛),那麼這就是一個典型的死鎖現象,這也就是為什麼javadoc中關于這兩個類的使用時告訴大家要在多線程環境下使用的原因了。

應用:過濾器模式
使用這個模式的典型例子是Unix的shell指令。這個模式的好處在于過濾器無需知道它與何種東西進行連接配接,并且這可以實作并行,而且系統的可擴充性可以根據添加删除或者改變Filter進行增強。
在這舉一個不斷計算平均值的例子,producer作為前端的資料源,不斷産生随機數,通過pipe進入filter進行資料處理,然後通過第二個pipe就行後端處理。
import java.util.*;
import java.io.*;
public class PipeTest
{
public static void main(String args[]) {
try {
PipedOutputStream pout1 = new PipedOutputStream();
PipedInputStream pin1 = new PipedInputStream(pout1);
PipedOutputStream pout2 = new PipedOutputStream();
PipedInputStream pin2 = new PipedInputStream(pout2);
Producer prod = new Producer(pout1);
Filter filt = new Filter(pin1, pout2);
Consumer cons = new Consumer(pin2);
prod.start();
filt.start();
cons.start();
} catch (IOException e) {
}
}
}
// 前端:該類的作用是産生随機數,并将其放到管道1的輸出流中
class Producer extends Thread {
private DataOutputStream out;// DataOutputStream是用于寫入一些基本類型資料的類,此類的執行個體用于生成僞随機數流
private Random rand = new Random();
public Producer(OutputStream os) {
out = new DataOutputStream(os);
}
public void run() {
while (true) {
try {
double num = rand.nextDouble();
// 将double值直接寫入流
out.writeDouble(num);
System.out.println("寫入流中的值是 :" + num);
out.flush();
sleep(Math.abs(rand.nextInt()%10));//随機休眠一段時間
} catch (Exception e) {
System.out.println("Error: " + e);
}
}
}
}
// 過濾器,起資料處理作用,讀取管道1中輸入流的内容,并将其放到管道2的輸出流中
class Filter extends Thread {
private DataInputStream in;
private DataOutputStream out;
private double total = 0;
private int count = 0;
public Filter(InputStream is, OutputStream os) {
in = new DataInputStream(is);
out = new DataOutputStream(os);
}
public void run() {
for (;;) {
try {
double x = in.readDouble(); // 讀取流中的資料
total += x;
count++;
if (count != 0) {
double d = total / count;
out.writeDouble(d); // 将得到的資料平均值寫入流
}
} catch (IOException e) {
System.out.println("Error: " + e);
}
}
}
}
// 後端:讀取管道2輸入流的内容
class Consumer extends Thread {
private double old_avg = 0;
private DataInputStream in;
public Consumer(InputStream is) {
in = new DataInputStream(is);
}
public void run() {
for (;;) {
try {
double avg = in.readDouble();
if (Math.abs(avg - old_avg) > 0.01) {
System.out.println("現在的平均值是: " + avg);
System.out.println();
old_avg = avg;
}
} catch (IOException e) {
System.out.println("Error: " + e);
}
}
}
}
2)字元流
Java利用 java.io.PipedWriter和java.io.PipedReader線上程之間傳輸字元資訊。與 java.io.PipedOutputStream和java.io.PipedInputStream類似,java.io.PipedWriter 是java.io.Writer的直接子類,java.io.PipedReader是java.io.Reader的直接子類。PipedWriter擁有一個允許指定輸入管道字元流的構造方法,而PipedReader擁有一個允許指定輸出管道字元流的構造方法。進而使得PipedWriter和PipedReader往往成對出現、配合使用。
以典型KWIC系統為例,下邊的代碼示範了如何使用字元流并且使用了過濾器模式:ReadLineThread --Pipe1 --> ShiftThread -- Pipe2 --> SortLinesThread
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.StringTokenizer;
public class KwicPipe {
public static void main(String[] args) {
try {
//get the input and output path
String src = args[0];
String dest = args[1];
//(writeToShiftThread => readFromShiftThread) = Pipe1
PipedReader readFromShiftThread = new PipedReader();
PipedWriter writeToShiftThread = new PipedWriter(readFromShiftThread);
//(writeToSortLinesThread => readFromSortLinesThread) = Pipe2
PipedReader readFromSortLinesThread = new PipedReader();
PipedWriter writeToSortLinesThread = new PipedWriter(readFromSortLinesThread);
//ReadLineThread --Pipe1 --> ShiftThread -- Pipe2 --> SortLinesThread
ReadLineThread R1 = new ReadLineThread(writeToShiftThread,src);
ShiftThread R2 = new ShiftThread(readFromShiftThread,writeToSortLinesThread);
SortLinesThread R3 = new SortLinesThread(readFromSortLinesThread,dest);
//Start the three processing thread
R1.start();
R2.start();
R3.start();
}
catch (IOException e) {
System.out.println("NO I/O");
}
}
}
// read the content of kwici.dat and send the lines to another thread
class ReadLineThread extends Thread {
PipedWriter PipeIn;
String InputFilename= null;
ReadLineThread(PipedWriter PlaceInPipe, String InputFilename) {
PipeIn = PlaceInPipe;
this.InputFilename = InputFilename;
}
private BufferedReader fileopen(String InputFilename) {
BufferedReader input_file = null;
try {
input_file = new BufferedReader(new FileReader(InputFilename));
} catch (IOException e) {
System.err.println(("File not open" + e.toString()));
System.exit(1);
}
return input_file;
}
public void run() {
try {
String Input;
BufferedReader TheInput = fileopen(InputFilename);
while ( (Input = TheInput.readLine()) != null) {
System.out.println(Input);
PipeIn.write(Input + "/n"); // Read from the file and then write to the pipe1
}
}
catch (FileNotFoundException e) {
System.out.println("NO FILE ");
}
catch (IOException e) {
System.out.println("NO I/O");
}
}
}
// read the lines from ReadLineThread and shift them. Send all the shifted lines to SortLinesThread
class ShiftThread extends Thread {
PipedReader PipeOut;
PipedWriter PipeIn;
ShiftThread(PipedReader ReadFromPipe, PipedWriter WriteToPipe) {
PipeOut = ReadFromPipe;
PipeIn = WriteToPipe;
}
public void run() {
char[] cbuf = new char[80];
int i, j;
StringBuffer linebuff = new StringBuffer();
try {
// read from ReadLineThread
i = PipeOut.read(cbuf, 0, 80);
while (i != -1) {
for (j = 0; j < i; j++) {
//if new line
if (cbuf[j]=='/n'){
// When reach the end of line,shift it
shiftline(linebuff.toString());
// empty the buffer
linebuff.delete(0, linebuff.length());
}
else {
linebuff.append(cbuf[j]);
}
}
i = PipeOut.read(cbuf, 0, 80); //get next buffer's worth
}
}
catch (FileNotFoundException e) {
System.out.println("NO FILE ");
}
catch (IOException e) {
System.out.println("NO I/O or end of stream (ShiftThread terminated)");
}
}
private void shiftline( String line )
{
String onetoken = new String ();
StringTokenizer tokens =
new StringTokenizer( line );
ArrayList Tokens = new ArrayList ();
int count = tokens.countTokens();
for ( int i = 0; i < count; i++)
{
onetoken = tokens.nextToken();
if (!((onetoken.compareTo( "a" ) == 0) && (onetoken.compareTo( "an" ) == 0) && (onetoken.compareTo( "and" ) == 0) && (onetoken.compareTo( "the" ) == 0)))
{
Tokens.add(onetoken);
}
}
for ( int tokencount = 0; tokencount < count; tokencount++ )
{
StringBuffer linebuffer = new StringBuffer ();
int index = tokencount;
for ( int i = 0; i< count; i++ )
{
if (index >= count)
index = 0;
linebuffer.append ( Tokens.get(index) );
linebuffer.append (" ");
index++;
} //for i
line = linebuffer.toString();
// send the line to the SortLinesThread
try {
PipeIn.write(line+ "/n");
} catch (IOException e) {
e.printStackTrace();
}
} // for token count
return;
}
}
class SortLinesThread extends Thread {
PipedReader PipeOut;
String OutputFilename;
ArrayList KwicList = new ArrayList();
SortLinesThread(PipedReader ReadFromPipe, String OutputFilename) {
PipeOut = ReadFromPipe;
this.OutputFilename = OutputFilename;
}
public void run() {
char[] cbuf = new char[80];
int i, j;
StringBuffer linebuff = new StringBuffer();
try {
// read from ShiftLineThread
i = PipeOut.read(cbuf, 0, 80);
while (i != -1) { // I don't know we're using that (The method Read blocks until at least one character of input is available.)
for (j = 0; j < i; j++) {
//if new line
if (cbuf[j]=='/n'){
// add it to the ArrayList
KwicList.add(linebuff.toString());
// adn empty the buffer
linebuff.delete(0, linebuff.length());
}
else {
//append the character to the line
linebuff.append(cbuf[j]);
}
}
i = PipeOut.read(cbuf, 0, 80); //get next buffer's worth
}
}
catch (FileNotFoundException e) {
System.out.println("NO FILE ");
}
catch (IOException e) {
System.out.println("NO I/O or end of stream (SortLinesThread terminated)");
}
// when the reading is finished, sort the ArrayList and diplay
Collections.sort(KwicList);//sort when added
displaylist ( KwicList );//Standard Output
//Export to file
try {
export(KwicList, OutputFilename);
} catch (Exception e) {
System.out.println("Error Output File ");
}
}
private void displaylist (ArrayList KwicList )
{
System.out.println ("/nList : " );
for ( int count = 0; count < KwicList.size(); count++ )
System.out.println (KwicList.get (count) );
}
private void export(ArrayList List, String oufFilename) throws Exception{
BufferedWriter writer = null;
try {
writer = new BufferedWriter(new FileWriter(oufFilename));
} catch (FileNotFoundException e) {
System.err.println(("File not open" + e.toString()));
System.exit(1);
}
for (int count = 0; count < List.size(); count++) {
writer.write(List.get(count));
writer.write("/r/n");
}
writer.flush();
writer.close();
System.out.println("Processed Finished");
}
}