天天看點

Java并發(五)任務間使用管道進行通信

    通過i/o線上程間進行通信通常很有用。提供線程功能的類庫以“管道”的形式對線程間的 i/o 提供了支援。它們在java i/o 類庫中的對應物就是pipedwriter(允許任務向管道寫)和pipedreader(允許不同的任務從同一個管道中讀取)。這個模型可以看做是“生産者-消費者”問題的變體,這裡的管道就是一個封裝好的解決方案。管道基本上是一個阻塞隊列, 存在于多個引入blockingqueue之前的java版本中。

    下面是一個簡單的例子,兩個任務使用一個管道進行通信:

<a href="http://my.oschina.net/itblog/blog/515822#">?</a>

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

<code>import</code> <code>java.io.ioexception;</code>

<code>import</code> <code>java.io.pipedreader;</code>

<code>import</code> <code>java.io.pipedwriter;</code>

<code>import</code> <code>java.util.random;</code>

<code>import</code> <code>java.util.concurrent.executorservice;</code>

<code>import</code> <code>java.util.concurrent.executors;</code>

<code>import</code> <code>java.util.concurrent.timeunit;</code>

<code>/**</code>

<code> </code><code>* 發送端</code>

<code> </code><code>*/</code>

<code>class</code> <code>sender </code><code>implements</code> <code>runnable {</code>

<code>    </code><code>private</code> <code>random rand = </code><code>new</code> <code>random(</code><code>47</code><code>);</code>

<code>    </code><code>private</code> <code>pipedwriter writer = </code><code>new</code> <code>pipedwriter();</code>

<code>    </code><code>public</code> <code>pipedwriter getwriter() { </code><code>return</code> <code>writer; }</code>

<code>    </code><code>@override</code>

<code>    </code><code>public</code> <code>void</code> <code>run() {</code>

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>while</code><code>(</code><code>true</code><code>) {</code>

<code>                </code><code>for</code> <code>(</code><code>char</code> <code>c = </code><code>'a'</code><code>; c &lt; </code><code>'z'</code><code>; c++) {</code>

<code>                    </code><code>writer.write(c);</code>

<code>                    </code><code>timeunit.milliseconds.sleep(rand.nextint(</code><code>500</code><code>));</code>

<code>                </code><code>}</code>

<code>            </code><code>}</code>

<code>        </code><code>} </code><code>catch</code> <code>(ioexception e) {</code>

<code>            </code><code>system.out.println(e + </code><code>" sender write exception"</code><code>);</code>

<code>        </code><code>} </code><code>catch</code> <code>(interruptedexception e) {</code>

<code>            </code><code>system.out.println(e + </code><code>" sender sleep interrupted"</code><code>);</code>

<code>        </code><code>}</code>

<code>    </code><code>}</code>

<code>}</code>

<code> </code><code>* 接收端</code>

<code>class</code> <code>receiver </code><code>implements</code> <code>runnable {</code>

<code>    </code><code>private</code> <code>pipedreader reader;</code>

<code>    </code><code>public</code> <code>receiver(sender sender) </code><code>throws</code> <code>ioexception {</code>

<code>        </code><code>reader = </code><code>new</code> <code>pipedreader(sender.getwriter());</code>

<code>        </code><code>int</code> <code>count = </code><code>0</code><code>;</code>

<code>                </code><code>//在讀取到内容之前,會一直阻塞</code>

<code>                </code><code>char</code> <code>s = (</code><code>char</code><code>)reader.read();</code>

<code>                </code><code>system.out.print(</code><code>"read: "</code> <code>+ s + </code><code>", "</code><code>);</code>

<code>                </code><code>if</code> <code>(++count % </code><code>5</code> <code>== </code><code>0</code><code>) {</code>

<code>                    </code><code>system.out.println();</code>

<code>            </code><code>system.out.println(e + </code><code>" receiver read exception."</code><code>);</code>

<code>public</code> <code>class</code> <code>pipedio {</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) </code><code>throws</code> <code>exception {</code>

<code>        </code><code>sender sender = </code><code>new</code> <code>sender();</code>

<code>        </code><code>receiver receiver = </code><code>new</code> <code>receiver(sender);</code>

<code>        </code><code>executorservice exec = executors.newcachedthreadpool();</code>

<code>        </code><code>exec.execute(sender);</code>

<code>        </code><code>exec.execute(receiver);</code>

<code>        </code><code>timeunit.seconds.sleep(</code><code>5</code><code>);</code>

<code>        </code><code>exec.shutdownnow();</code>

執行結果(可能的結果):

<code>read: a, read: b, read: c, read: d, read: e, </code>

<code>read: f, read: g, read: h, read: i, read: j, </code>

<code>read: k, read: l, read: m, read: n, read: o, </code>

<code>read: p, read: q, read: r, read: s, read: t, </code>

<code>read: u, java.io.interruptedioexception receiver read exception.</code>

<code>java.lang.interruptedexception: sleep interrupted sender sleep interrupted</code>

    sender和receiver代表了需要互相通信的兩個任務。sender建立了一個pipedwriter,它是一個單獨的對象;但是對于receiver,pipedreader的建立必須在構造器中與一個pipedwriter相關聯。就是說,pipedreader與pipedwriter的構造可以通過如下兩種方式:

<code>//方式一:先構造pipedreader,再通過它構造pipedwriter。</code>

<code>pipedreader reader = </code><code>new</code> <code>pipedreader();</code>

<code>pipedwriter writer = </code><code>new</code> <code>pipedwriter(reader);</code>

<code>//方式二:先構造pipedwriter,再通過它構造pipedreader。</code>

<code>pipedwriter writer2 = </code><code>new</code> <code>pipedwriter();</code>

<code>pipedreader reader2 = </code><code>new</code> <code>pipedreader(writer2);</code>

    sender把資料放進writer,然後休眠一段時間(随機數)。然而,receiver沒有sleep()和wait。但當它調用read()時,如果沒有更多的資料,管道将自動阻塞。

    注意sender和receiver是在main()中啟動的,即對象構造徹底完畢之後。如果你啟動了一個沒有構造完畢的對象,在不同的平台上管道可能會産生不一緻的行為(注意,blockingqueue使用起來更加健壯而容易)。

    在shutdownnow()被調用時,可以看到pipedreader與普通i/o之間最重要的差異——pipedreader是可以中斷的。如果你将reader.read()替換為system.in.read(),那麼interrupt()将不能打斷read()調用。