通過i/o線上程間進行通信通常很有用。提供線程功能的類庫以“管道”的形式對線程間的 i/o 提供了支援。它們在java i/o 類庫中的對應物就是pipedwriter(允許任務向管道寫)和pipedreader(允許不同的任務從同一個管道中讀取)。這個模型可以看做是“生産者-消費者”問題的變體,這裡的管道就是一個封裝好的解決方案。管道基本上是一個阻塞隊列, 存在于多個引入blockingqueue之前的java版本中。
下面是一個簡單的例子,兩個任務使用一個管道進行通信:
<a href="http://my.oschina.net/itblog/blog/515822#">?</a>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
<code>import</code> <code>java.io.ioexception;</code>
<code>import</code> <code>java.io.pipedreader;</code>
<code>import</code> <code>java.io.pipedwriter;</code>
<code>import</code> <code>java.util.random;</code>
<code>import</code> <code>java.util.concurrent.executorservice;</code>
<code>import</code> <code>java.util.concurrent.executors;</code>
<code>import</code> <code>java.util.concurrent.timeunit;</code>
<code>/**</code>
<code> </code><code>* 發送端</code>
<code> </code><code>*/</code>
<code>class</code> <code>sender </code><code>implements</code> <code>runnable {</code>
<code> </code><code>private</code> <code>random rand = </code><code>new</code> <code>random(</code><code>47</code><code>);</code>
<code> </code><code>private</code> <code>pipedwriter writer = </code><code>new</code> <code>pipedwriter();</code>
<code> </code><code>public</code> <code>pipedwriter getwriter() { </code><code>return</code> <code>writer; }</code>
<code> </code><code>@override</code>
<code> </code><code>public</code> <code>void</code> <code>run() {</code>
<code> </code><code>try</code> <code>{</code>
<code> </code><code>while</code><code>(</code><code>true</code><code>) {</code>
<code> </code><code>for</code> <code>(</code><code>char</code> <code>c = </code><code>'a'</code><code>; c < </code><code>'z'</code><code>; c++) {</code>
<code> </code><code>writer.write(c);</code>
<code> </code><code>timeunit.milliseconds.sleep(rand.nextint(</code><code>500</code><code>));</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
<code> </code><code>} </code><code>catch</code> <code>(ioexception e) {</code>
<code> </code><code>system.out.println(e + </code><code>" sender write exception"</code><code>);</code>
<code> </code><code>} </code><code>catch</code> <code>(interruptedexception e) {</code>
<code> </code><code>system.out.println(e + </code><code>" sender sleep interrupted"</code><code>);</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
<code>}</code>
<code> </code><code>* 接收端</code>
<code>class</code> <code>receiver </code><code>implements</code> <code>runnable {</code>
<code> </code><code>private</code> <code>pipedreader reader;</code>
<code> </code><code>public</code> <code>receiver(sender sender) </code><code>throws</code> <code>ioexception {</code>
<code> </code><code>reader = </code><code>new</code> <code>pipedreader(sender.getwriter());</code>
<code> </code><code>int</code> <code>count = </code><code>0</code><code>;</code>
<code> </code><code>//在讀取到内容之前,會一直阻塞</code>
<code> </code><code>char</code> <code>s = (</code><code>char</code><code>)reader.read();</code>
<code> </code><code>system.out.print(</code><code>"read: "</code> <code>+ s + </code><code>", "</code><code>);</code>
<code> </code><code>if</code> <code>(++count % </code><code>5</code> <code>== </code><code>0</code><code>) {</code>
<code> </code><code>system.out.println();</code>
<code> </code><code>system.out.println(e + </code><code>" receiver read exception."</code><code>);</code>
<code>public</code> <code>class</code> <code>pipedio {</code>
<code> </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) </code><code>throws</code> <code>exception {</code>
<code> </code><code>sender sender = </code><code>new</code> <code>sender();</code>
<code> </code><code>receiver receiver = </code><code>new</code> <code>receiver(sender);</code>
<code> </code><code>executorservice exec = executors.newcachedthreadpool();</code>
<code> </code><code>exec.execute(sender);</code>
<code> </code><code>exec.execute(receiver);</code>
<code> </code><code>timeunit.seconds.sleep(</code><code>5</code><code>);</code>
<code> </code><code>exec.shutdownnow();</code>
執行結果(可能的結果):
<code>read: a, read: b, read: c, read: d, read: e, </code>
<code>read: f, read: g, read: h, read: i, read: j, </code>
<code>read: k, read: l, read: m, read: n, read: o, </code>
<code>read: p, read: q, read: r, read: s, read: t, </code>
<code>read: u, java.io.interruptedioexception receiver read exception.</code>
<code>java.lang.interruptedexception: sleep interrupted sender sleep interrupted</code>
sender和receiver代表了需要互相通信的兩個任務。sender建立了一個pipedwriter,它是一個單獨的對象;但是對于receiver,pipedreader的建立必須在構造器中與一個pipedwriter相關聯。就是說,pipedreader與pipedwriter的構造可以通過如下兩種方式:
<code>//方式一:先構造pipedreader,再通過它構造pipedwriter。</code>
<code>pipedreader reader = </code><code>new</code> <code>pipedreader();</code>
<code>pipedwriter writer = </code><code>new</code> <code>pipedwriter(reader);</code>
<code>//方式二:先構造pipedwriter,再通過它構造pipedreader。</code>
<code>pipedwriter writer2 = </code><code>new</code> <code>pipedwriter();</code>
<code>pipedreader reader2 = </code><code>new</code> <code>pipedreader(writer2);</code>
sender把資料放進writer,然後休眠一段時間(随機數)。然而,receiver沒有sleep()和wait。但當它調用read()時,如果沒有更多的資料,管道将自動阻塞。
注意sender和receiver是在main()中啟動的,即對象構造徹底完畢之後。如果你啟動了一個沒有構造完畢的對象,在不同的平台上管道可能會産生不一緻的行為(注意,blockingqueue使用起來更加健壯而容易)。
在shutdownnow()被調用時,可以看到pipedreader與普通i/o之間最重要的差異——pipedreader是可以中斷的。如果你将reader.read()替換為system.in.read(),那麼interrupt()将不能打斷read()調用。