天天看點

管道流 java_【Java學習筆記】管道流

1.引言

Java I/O系統是建立在資料流概念之上的,而在UNIX/Linux中有一個類似的概念,就是管道,它具有将一個程式的輸出當作另一個程式的輸入的能力。在Java中,可以使用管道流進行線程之間的通信,輸入流和輸出流必須相連接配接,這樣的通信有别于一般的Shared Data通信,其不需要一個共享的資料空間。

2.相關類及其關系

1)位元組流:

分為管道輸出流(PipedOutputStream)和管道輸入流(PipedInputStream),利用 java.io.PipedOutputStream和java.io.PipedInputStream可以實作線程之間的二進制資訊傳輸。如果要進行管道輸出,則必須把輸出流連在輸入流上。 java.io.PipedOutputStream是java.io.OutputStream的直接子類,而java.io. PipedInputStream是java.io.InputStream的直接子類。PipedOutputStream和 PipedInputStream往往成對出現、配合使用。舉例說明:

TestPipe.Java

import java.io.IOException;

public class TestPipe {

public static void main(String[] args) {

Send s = new Send();

Receive r = new Receive();

try {

s.getPos().connect(r.getPis()); // 連接配接管道

} catch (IOException e) {

e.printStackTrace();

}

new Thread(s).start(); // 啟動線程

new Thread(r).start(); // 啟動線程

}

}

Receive.java

import java.io.IOException;

import java.io.PipedInputStream;

class Receive implements Runnable { // 實作Runnable接口

private PipedInputStream pis = null;

public Receive() {

this.pis = new PipedInputStream(); // 執行個體化輸入流

}

public void run() {

byte b[] = new byte[1024];

int len = 0;

try {

len = this.pis.read(b); // 接收資料

} catch (IOException e) {

e.printStackTrace();

}

try {

this.pis.close();

} catch (IOException e) {

e.printStackTrace();

}

System.out.println("接收的内容為:" + new String(b, 0, len));

}

public PipedInputStream getPis() {

return pis;

}

}

Send.java

import java.io.IOException;

import java.io.PipedOutputStream;

class Send implements Runnable {

// 實作Runnable接口

private PipedOutputStream pos = null; // 管道輸出流

public Send() {

this.pos = new PipedOutputStream();// 執行個體化輸出流

}

public void run() {

String str = "Hello World!!!";

try {

this.pos.write(str.getBytes()); // 輸出資訊

} catch (IOException e) {

e.printStackTrace();

}

try {

this.pos.close(); // 關閉輸出流

} catch (IOException e) {

e.printStackTrace();

}

}

public PipedOutputStream getPos() { // 通過線程類得到輸出流

return pos;

}

}

我們可以看到使用管道流,通過connect方法進行連接配接,實作了Send線程和Receive線程之間的通信。

注意:

PipedInputStream中實際是用了一個1024位元組固定大小的循環緩沖區。寫入PipedOutputStream的資料實際上儲存到對應的 PipedInputStream的内部緩沖區。從PipedInputStream執行讀操作時,讀取的資料實際上來自這個内部緩沖區。如果對應的 PipedInputStream輸入緩沖區已滿,任何企圖寫入PipedOutputStream的線程都将被阻塞。而且這個寫操作線程将一直阻塞,直至出現讀取PipedInputStream的操作從緩沖區删除資料。這也就是說往PipedOutputStream寫資料的線程Send若是和從PipedInputStream讀資料的線程Receive是同一個線程的話,那麼一旦Send線程發送資料過多(大于1024位元組),它就會被阻塞,這就直接導緻接受資料的線程阻塞而無法工作(因為是同一個線程嘛),那麼這就是一個典型的死鎖現象,這也就是為什麼javadoc中關于這兩個類的使用時告訴大家要在多線程環境下使用的原因了。

管道流 java_【Java學習筆記】管道流

應用:過濾器模式

管道流 java_【Java學習筆記】管道流

使用這個模式的典型例子是Unix的shell指令。這個模式的好處在于過濾器無需知道它與何種東西進行連接配接,并且這可以實作并行,而且系統的可擴充性可以根據添加删除或者改變Filter進行增強。

在這舉一個不斷計算平均值的例子,producer作為前端的資料源,不斷産生随機數,通過pipe進入filter進行資料處理,然後通過第二個pipe就行後端處理。

import java.util.*;

import java.io.*;

public class PipeTest

{

public static void main(String args[]) {

try {

PipedOutputStream pout1 = new PipedOutputStream();

PipedInputStream pin1 = new PipedInputStream(pout1);

PipedOutputStream pout2 = new PipedOutputStream();

PipedInputStream pin2 = new PipedInputStream(pout2);

Producer prod = new Producer(pout1);

Filter filt = new Filter(pin1, pout2);

Consumer cons = new Consumer(pin2);

prod.start();

filt.start();

cons.start();

} catch (IOException e) {

}

}

}

// 前端:該類的作用是産生随機數,并将其放到管道1的輸出流中

class Producer extends Thread {

private DataOutputStream out;// DataOutputStream是用于寫入一些基本類型資料的類,此類的執行個體用于生成僞随機數流

private Random rand = new Random();

public Producer(OutputStream os) {

out = new DataOutputStream(os);

}

public void run() {

while (true) {

try {

double num = rand.nextDouble();

// 将double值直接寫入流

out.writeDouble(num);

System.out.println("寫入流中的值是 :" + num);

out.flush();

sleep(Math.abs(rand.nextInt()%10));//随機休眠一段時間

} catch (Exception e) {

System.out.println("Error:   " + e);

}

}

}

}

// 過濾器,起資料處理作用,讀取管道1中輸入流的内容,并将其放到管道2的輸出流中

class Filter extends Thread {

private DataInputStream in;

private DataOutputStream out;

private double total = 0;

private int count = 0;

public Filter(InputStream is, OutputStream os) {

in = new DataInputStream(is);

out = new DataOutputStream(os);

}

public void run() {

for (;;) {

try {

double x = in.readDouble(); // 讀取流中的資料

total += x;

count++;

if (count != 0) {

double d = total / count;

out.writeDouble(d); // 将得到的資料平均值寫入流

}

} catch (IOException e) {

System.out.println("Error:   " + e);

}

}

}

}

// 後端:讀取管道2輸入流的内容

class Consumer extends Thread {

private double old_avg = 0;

private DataInputStream in;

public Consumer(InputStream is) {

in = new DataInputStream(is);

}

public void run() {

for (;;) {

try {

double avg = in.readDouble();

if (Math.abs(avg - old_avg) > 0.01) {

System.out.println("現在的平均值是:   " + avg);

System.out.println();

old_avg = avg;

}

} catch (IOException e) {

System.out.println("Error:   " + e);

}

}

}

}

2)字元流

Java利用 java.io.PipedWriter和java.io.PipedReader線上程之間傳輸字元資訊。與 java.io.PipedOutputStream和java.io.PipedInputStream類似,java.io.PipedWriter 是java.io.Writer的直接子類,java.io.PipedReader是java.io.Reader的直接子類。PipedWriter擁有一個允許指定輸入管道字元流的構造方法,而PipedReader擁有一個允許指定輸出管道字元流的構造方法。進而使得PipedWriter和PipedReader往往成對出現、配合使用。

以典型KWIC系統為例,下邊的代碼示範了如何使用字元流并且使用了過濾器模式:ReadLineThread --Pipe1 --> ShiftThread -- Pipe2 --> SortLinesThread

import java.io.BufferedReader;

import java.io.BufferedWriter;

import java.io.FileNotFoundException;

import java.io.FileReader;

import java.io.FileWriter;

import java.io.IOException;

import java.io.PipedReader;

import java.io.PipedWriter;

import java.util.ArrayList;

import java.util.Collections;

import java.util.StringTokenizer;

public class KwicPipe {

public static void main(String[] args) {

try {

//get the input and output path

String src = args[0];

String dest = args[1];

//(writeToShiftThread => readFromShiftThread) = Pipe1

PipedReader readFromShiftThread = new PipedReader();

PipedWriter writeToShiftThread = new PipedWriter(readFromShiftThread);

//(writeToSortLinesThread => readFromSortLinesThread) = Pipe2

PipedReader readFromSortLinesThread = new PipedReader();

PipedWriter writeToSortLinesThread = new PipedWriter(readFromSortLinesThread);

//ReadLineThread --Pipe1 --> ShiftThread -- Pipe2 --> SortLinesThread

ReadLineThread R1 = new ReadLineThread(writeToShiftThread,src);

ShiftThread R2 = new ShiftThread(readFromShiftThread,writeToSortLinesThread);

SortLinesThread R3 = new SortLinesThread(readFromSortLinesThread,dest);

//Start the three processing thread

R1.start();

R2.start();

R3.start();

}

catch (IOException e) {

System.out.println("NO I/O");

}

}

}

// read the content of kwici.dat and send the lines to another thread

class ReadLineThread extends Thread {

PipedWriter PipeIn;

String InputFilename= null;

ReadLineThread(PipedWriter PlaceInPipe, String InputFilename) {

PipeIn = PlaceInPipe;

this.InputFilename = InputFilename;

}

private BufferedReader fileopen(String InputFilename) {

BufferedReader input_file = null;

try {

input_file = new BufferedReader(new FileReader(InputFilename));

} catch (IOException e) {

System.err.println(("File not open" + e.toString()));

System.exit(1);

}

return input_file;

}

public void run() {

try {

String Input;

BufferedReader TheInput = fileopen(InputFilename);

while ( (Input = TheInput.readLine()) != null) {

System.out.println(Input);

PipeIn.write(Input + "/n"); // Read from the file and then write to the pipe1

}

}

catch (FileNotFoundException e) {

System.out.println("NO FILE ");

}

catch (IOException e) {

System.out.println("NO I/O");

}

}

}

// read the lines from ReadLineThread and shift them. Send all the shifted lines to SortLinesThread

class ShiftThread extends Thread {

PipedReader PipeOut;

PipedWriter PipeIn;

ShiftThread(PipedReader ReadFromPipe, PipedWriter WriteToPipe) {

PipeOut = ReadFromPipe;

PipeIn = WriteToPipe;

}

public void run() {

char[] cbuf = new char[80];

int i, j;

StringBuffer linebuff = new StringBuffer();

try {

// read from ReadLineThread

i = PipeOut.read(cbuf, 0, 80);

while (i != -1) {

for (j = 0; j < i; j++) {

//if new line

if (cbuf[j]=='/n'){

// When reach the end of line,shift it

shiftline(linebuff.toString());

// empty the buffer

linebuff.delete(0, linebuff.length());

}

else {

linebuff.append(cbuf[j]);

}

}

i = PipeOut.read(cbuf, 0, 80); //get next buffer's worth

}

}

catch (FileNotFoundException e) {

System.out.println("NO FILE ");

}

catch (IOException e) {

System.out.println("NO I/O or end of stream (ShiftThread terminated)");

}

}

private void shiftline( String line )

{

String onetoken = new String ();

StringTokenizer tokens =

new StringTokenizer( line );

ArrayList Tokens = new ArrayList ();

int count = tokens.countTokens();

for ( int i = 0; i < count; i++)

{

onetoken = tokens.nextToken();

if (!((onetoken.compareTo( "a" ) == 0) && (onetoken.compareTo( "an" ) == 0) && (onetoken.compareTo( "and" ) == 0) && (onetoken.compareTo( "the" ) == 0)))

{

Tokens.add(onetoken);

}

}

for ( int tokencount = 0; tokencount < count; tokencount++ )

{

StringBuffer linebuffer = new StringBuffer ();

int index = tokencount;

for ( int i = 0; i< count; i++ )

{

if (index >= count)

index = 0;

linebuffer.append ( Tokens.get(index)  );

linebuffer.append (" ");

index++;

}  //for i

line = linebuffer.toString();

// send the line to the SortLinesThread

try {

PipeIn.write(line+ "/n");

} catch (IOException e) {

e.printStackTrace();

}

}  // for token count

return;

}

}

class SortLinesThread extends Thread {

PipedReader PipeOut;

String OutputFilename;

ArrayList  KwicList = new ArrayList();

SortLinesThread(PipedReader ReadFromPipe, String OutputFilename) {

PipeOut = ReadFromPipe;

this.OutputFilename = OutputFilename;

}

public void run() {

char[] cbuf = new char[80];

int i, j;

StringBuffer linebuff = new StringBuffer();

try {

// read from ShiftLineThread

i = PipeOut.read(cbuf, 0, 80);

while (i != -1) { // I don't know we're using that (The method Read blocks until at least one character of input is available.)

for (j = 0; j < i; j++) {

//if new line

if (cbuf[j]=='/n'){

// add it to the ArrayList

KwicList.add(linebuff.toString());

// adn empty the buffer

linebuff.delete(0, linebuff.length());

}

else {

//append the character to the line

linebuff.append(cbuf[j]);

}

}

i = PipeOut.read(cbuf, 0, 80); //get next buffer's worth

}

}

catch (FileNotFoundException e) {

System.out.println("NO FILE ");

}

catch (IOException e) {

System.out.println("NO I/O or end of stream (SortLinesThread terminated)");

}

// when the reading is finished, sort the ArrayList and diplay

Collections.sort(KwicList);//sort when added

displaylist ( KwicList );//Standard Output

//Export to file

try {

export(KwicList, OutputFilename);

} catch (Exception e) {

System.out.println("Error Output File ");

}

}

private void displaylist (ArrayList KwicList )

{

System.out.println ("/nList : " );

for ( int count = 0; count < KwicList.size(); count++ )

System.out.println (KwicList.get (count) );

}

private void export(ArrayList List, String oufFilename) throws Exception{

BufferedWriter writer = null;

try {

writer = new BufferedWriter(new FileWriter(oufFilename));

} catch (FileNotFoundException e) {

System.err.println(("File not open" + e.toString()));

System.exit(1);

}

for (int count = 0; count < List.size(); count++) {

writer.write(List.get(count));

writer.write("/r/n");

}

writer.flush();

writer.close();

System.out.println("Processed Finished");

}

}