通过i/o在线程间进行通信通常很有用。提供线程功能的类库以“管道”的形式对线程间的 i/o 提供了支持。它们在java i/o 类库中的对应物就是pipedwriter(允许任务向管道写)和pipedreader(允许不同的任务从同一个管道中读取)。这个模型可以看做是“生产者-消费者”问题的变体,这里的管道就是一个封装好的解决方案。管道基本上是一个阻塞队列, 存在于多个引入blockingqueue之前的java版本中。
下面是一个简单的例子,两个任务使用一个管道进行通信:
<a href="http://my.oschina.net/itblog/blog/515822#">?</a>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
<code>import</code> <code>java.io.ioexception;</code>
<code>import</code> <code>java.io.pipedreader;</code>
<code>import</code> <code>java.io.pipedwriter;</code>
<code>import</code> <code>java.util.random;</code>
<code>import</code> <code>java.util.concurrent.executorservice;</code>
<code>import</code> <code>java.util.concurrent.executors;</code>
<code>import</code> <code>java.util.concurrent.timeunit;</code>
<code>/**</code>
<code> </code><code>* 发送端</code>
<code> </code><code>*/</code>
<code>class</code> <code>sender </code><code>implements</code> <code>runnable {</code>
<code> </code><code>private</code> <code>random rand = </code><code>new</code> <code>random(</code><code>47</code><code>);</code>
<code> </code><code>private</code> <code>pipedwriter writer = </code><code>new</code> <code>pipedwriter();</code>
<code> </code><code>public</code> <code>pipedwriter getwriter() { </code><code>return</code> <code>writer; }</code>
<code> </code><code>@override</code>
<code> </code><code>public</code> <code>void</code> <code>run() {</code>
<code> </code><code>try</code> <code>{</code>
<code> </code><code>while</code><code>(</code><code>true</code><code>) {</code>
<code> </code><code>for</code> <code>(</code><code>char</code> <code>c = </code><code>'a'</code><code>; c < </code><code>'z'</code><code>; c++) {</code>
<code> </code><code>writer.write(c);</code>
<code> </code><code>timeunit.milliseconds.sleep(rand.nextint(</code><code>500</code><code>));</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
<code> </code><code>} </code><code>catch</code> <code>(ioexception e) {</code>
<code> </code><code>system.out.println(e + </code><code>" sender write exception"</code><code>);</code>
<code> </code><code>} </code><code>catch</code> <code>(interruptedexception e) {</code>
<code> </code><code>system.out.println(e + </code><code>" sender sleep interrupted"</code><code>);</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
<code>}</code>
<code> </code><code>* 接收端</code>
<code>class</code> <code>receiver </code><code>implements</code> <code>runnable {</code>
<code> </code><code>private</code> <code>pipedreader reader;</code>
<code> </code><code>public</code> <code>receiver(sender sender) </code><code>throws</code> <code>ioexception {</code>
<code> </code><code>reader = </code><code>new</code> <code>pipedreader(sender.getwriter());</code>
<code> </code><code>int</code> <code>count = </code><code>0</code><code>;</code>
<code> </code><code>//在读取到内容之前,会一直阻塞</code>
<code> </code><code>char</code> <code>s = (</code><code>char</code><code>)reader.read();</code>
<code> </code><code>system.out.print(</code><code>"read: "</code> <code>+ s + </code><code>", "</code><code>);</code>
<code> </code><code>if</code> <code>(++count % </code><code>5</code> <code>== </code><code>0</code><code>) {</code>
<code> </code><code>system.out.println();</code>
<code> </code><code>system.out.println(e + </code><code>" receiver read exception."</code><code>);</code>
<code>public</code> <code>class</code> <code>pipedio {</code>
<code> </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) </code><code>throws</code> <code>exception {</code>
<code> </code><code>sender sender = </code><code>new</code> <code>sender();</code>
<code> </code><code>receiver receiver = </code><code>new</code> <code>receiver(sender);</code>
<code> </code><code>executorservice exec = executors.newcachedthreadpool();</code>
<code> </code><code>exec.execute(sender);</code>
<code> </code><code>exec.execute(receiver);</code>
<code> </code><code>timeunit.seconds.sleep(</code><code>5</code><code>);</code>
<code> </code><code>exec.shutdownnow();</code>
执行结果(可能的结果):
<code>read: a, read: b, read: c, read: d, read: e, </code>
<code>read: f, read: g, read: h, read: i, read: j, </code>
<code>read: k, read: l, read: m, read: n, read: o, </code>
<code>read: p, read: q, read: r, read: s, read: t, </code>
<code>read: u, java.io.interruptedioexception receiver read exception.</code>
<code>java.lang.interruptedexception: sleep interrupted sender sleep interrupted</code>
sender和receiver代表了需要互相通信的两个任务。sender创建了一个pipedwriter,它是一个单独的对象;但是对于receiver,pipedreader的建立必须在构造器中与一个pipedwriter相关联。就是说,pipedreader与pipedwriter的构造可以通过如下两种方式:
<code>//方式一:先构造pipedreader,再通过它构造pipedwriter。</code>
<code>pipedreader reader = </code><code>new</code> <code>pipedreader();</code>
<code>pipedwriter writer = </code><code>new</code> <code>pipedwriter(reader);</code>
<code>//方式二:先构造pipedwriter,再通过它构造pipedreader。</code>
<code>pipedwriter writer2 = </code><code>new</code> <code>pipedwriter();</code>
<code>pipedreader reader2 = </code><code>new</code> <code>pipedreader(writer2);</code>
sender把数据放进writer,然后休眠一段时间(随机数)。然而,receiver没有sleep()和wait。但当它调用read()时,如果没有更多的数据,管道将自动阻塞。
注意sender和receiver是在main()中启动的,即对象构造彻底完毕之后。如果你启动了一个没有构造完毕的对象,在不同的平台上管道可能会产生不一致的行为(注意,blockingqueue使用起来更加健壮而容易)。
在shutdownnow()被调用时,可以看到pipedreader与普通i/o之间最重要的差异——pipedreader是可以中断的。如果你将reader.read()替换为system.in.read(),那么interrupt()将不能打断read()调用。