天天看點

Java并發(四)BlockingQueue的使用

    wait()和notifyall()方法以一種非常低級的方式解決了任務互操作的問題,即每次互動時都需要握手。在許多情況下,你可以瞄準更高的抽象級别,使用同步隊列來解決任務協作的問題。同步隊列在任何時刻都隻允許一個任務插入或移除元素。在java.util.concurrent.blockingqueue接口中提供了這個隊列,這個接口有大量的标準實作。你通常可以使用linkedblockingqueue,它是一個無屆隊列,你還可以使用arrayblockingqueue,它具有固定的尺寸,是以你可以在它被阻塞之前,向其中放置有限數量的元素。

    如果消費者任務試圖從隊列中擷取對象,而該隊列此時為空,那麼這些隊列還可以挂起消費者任務,并且當有更多的元素可用時回複消費者任務。阻塞隊列可以解決非常大的問題,而其方式與wait()和notifyall()相比,則要簡單并可靠許多。

    考慮下面這個blockingqueue的示例,有一台機器具有三個任務:一個制作吐司,一個給吐司抹黃油,還有一個給吐司塗果醬。我們可以通過各個處理過程之間的blockingqueue來運作這個吐司制作程式:

<a href="http://my.oschina.net/itblog/blog/515735#">?</a>

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

<code>import</code> <code>java.util.random;</code>

<code>import</code> <code>java.util.concurrent.executorservice;</code>

<code>import</code> <code>java.util.concurrent.executors;</code>

<code>import</code> <code>java.util.concurrent.linkedblockingqueue;</code>

<code>import</code> <code>java.util.concurrent.timeunit;</code>

<code>class</code> <code>toast {</code>

<code>    </code><code>/**</code>

<code>     </code><code>* 吐司的狀态:</code>

<code>     </code><code>* dry: 烘幹的</code>

<code>     </code><code>* buttered: 塗了黃油的</code>

<code>     </code><code>* jammed: 塗了果醬的</code>

<code>     </code><code>* &lt;p&gt;吐司的狀态隻能由dry-&gt;buttered-&gt;jammed轉變</code>

<code>     </code><code>*/</code>

<code>    </code><code>public</code> <code>enum</code> <code>status {dry, buttered, jammed}</code>

<code>    </code><code>private</code> <code>status status = status.dry;</code><code>//預設狀态為dry</code>

<code>    </code><code>private</code> <code>final</code> <code>int</code> <code>id;</code>

<code>    </code><code>public</code> <code>toast(</code><code>int</code> <code>id) { </code><code>this</code><code>.id =  id;}</code>

<code>    </code><code>public</code> <code>void</code> <code>butter() {status = status.buttered;}</code>

<code>    </code><code>public</code> <code>void</code> <code>jam() {status = status.jammed;}</code>

<code>    </code><code>public</code> <code>status getstatus() {</code><code>return</code> <code>status;}</code>

<code>    </code><code>public</code> <code>int</code> <code>getid() {</code><code>return</code> <code>id;}</code>

<code>    </code><code>public</code> <code>string tostring() {</code>

<code>        </code><code>return</code> <code>"toast id: "</code> <code>+ id + </code><code>", status: "</code> <code>+ status;</code>

<code>    </code><code>}</code>

<code>}</code>

<code>@suppresswarnings</code><code>(</code><code>"serial"</code><code>)</code>

<code>class</code> <code>toastqueue </code><code>extends</code> <code>linkedblockingqueue&lt;toast&gt; {}</code>

<code>/**</code>

<code> </code><code>* 生産吐司的任務。</code>

<code> </code><code>*/</code>

<code>class</code> <code>toaster </code><code>implements</code> <code>runnable {</code>

<code>    </code><code>private</code> <code>toastqueue toastqueue;</code>

<code>    </code><code>private</code> <code>int</code> <code>count = </code><code>0</code><code>;</code>

<code>    </code><code>private</code> <code>random random = </code><code>new</code> <code>random(</code><code>47</code><code>);</code>

<code>    </code><code>public</code> <code>toaster(toastqueue queue) {</code>

<code>        </code><code>this</code><code>.toastqueue = queue;</code>

<code>    </code><code>@override</code>

<code>    </code><code>public</code> <code>void</code> <code>run() {</code>

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>while</code><code>(!thread.interrupted()) {</code>

<code>                </code><code>timeunit.milliseconds.sleep(</code><code>300</code> <code>+ random.nextint(</code><code>500</code><code>));</code>

<code>                </code><code>//生産一片吐司,這些吐司是有序的</code>

<code>                </code><code>toast toast = </code><code>new</code> <code>toast(count++);</code>

<code>                </code><code>system.out.println(toast);</code>

<code>                </code><code>//放到toastqueue中</code>

<code>                </code><code>toastqueue.put(toast);</code>

<code>            </code><code>}</code>

<code>        </code><code>} </code><code>catch</code> <code>(interruptedexception e) {</code>

<code>            </code><code>system.out.println(</code><code>"toaster interrupted."</code><code>);</code>

<code>        </code><code>}</code>

<code>        </code><code>system.out.println(</code><code>"toaster off."</code><code>);</code>

<code> </code><code>* 塗黃油的任務。</code>

<code>class</code> <code>butterer </code><code>implements</code> <code>runnable {</code>

<code>    </code><code>private</code> <code>toastqueue dryqueue;</code>

<code>    </code><code>private</code> <code>toastqueue butteredqueue;</code>

<code>    </code><code>public</code> <code>butterer(toastqueue dryqueue, toastqueue butteredqueue) {</code>

<code>        </code><code>this</code><code>.dryqueue = dryqueue;</code>

<code>        </code><code>this</code><code>.butteredqueue = butteredqueue;</code>

<code>    </code> 

<code>                </code><code>//在取得下一個吐司之前會一直阻塞</code>

<code>                </code><code>toast toast = dryqueue.take();</code>

<code>                </code><code>toast.butter();</code>

<code>                </code><code>butteredqueue.put(toast);</code>

<code>            </code><code>system.out.println(</code><code>"butterer interrupted."</code><code>);</code>

<code>        </code><code>system.out.println(</code><code>"butterer off."</code><code>);</code>

<code>        </code> 

<code> </code><code>* 塗果醬的任務。</code>

<code>class</code> <code>jammer </code><code>implements</code> <code>runnable {</code>

<code>    </code><code>private</code> <code>toastqueue finishedqueue;</code>

<code>    </code><code>public</code> <code>jammer(toastqueue butteredqueue, toastqueue finishedqueue) {</code>

<code>        </code><code>this</code><code>.finishedqueue = finishedqueue;</code>

<code>                </code><code>toast toast = butteredqueue.take();</code>

<code>                </code><code>toast.jam();</code>

<code>                </code><code>finishedqueue.put(toast);</code>

<code>            </code><code>system.out.println(</code><code>"jammer interrupted."</code><code>);</code>

<code>        </code><code>system.out.println(</code><code>"jammer off."</code><code>);</code>

<code> </code><code>* 吃吐司的人,消費者。</code>

<code>class</code> <code>eater </code><code>implements</code> <code>runnable {</code>

<code>    </code><code>public</code> <code>eater (toastqueue finishedqueue) {</code>

<code>                </code><code>toast toast = finishedqueue.take();</code>

<code>                </code><code>//驗證取得的吐司是有序的,而且狀态是jammed的</code>

<code>                </code><code>if</code> <code>(toast.getid() != count++ || </code>

<code>                        </code><code>toast.getstatus() != toast.status.jammed) {</code>

<code>                    </code><code>system.out.println(</code><code>"error -&gt; "</code> <code>+ toast);</code>

<code>                    </code><code>system.exit(-</code><code>1</code><code>);</code>

<code>                </code><code>} </code><code>else</code> <code>{</code>

<code>                    </code><code>//吃掉吐司</code>

<code>                    </code><code>system.out.println(toast + </code><code>"-&gt;eaten"</code><code>);</code>

<code>                </code><code>}</code>

<code>            </code><code>system.out.println(</code><code>"eater interrupted."</code><code>);</code>

<code>        </code><code>system.out.println(</code><code>"eater off."</code><code>);</code>

<code>public</code> <code>class</code> <code>toastomatic {</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) </code><code>throws</code> <code>exception {</code>

<code>        </code><code>toastqueue dryqueue = </code><code>new</code> <code>toastqueue();</code>

<code>        </code><code>toastqueue butteredqueue = </code><code>new</code> <code>toastqueue();</code>

<code>        </code><code>toastqueue finishedqueue = </code><code>new</code> <code>toastqueue();</code>

<code>        </code><code>executorservice exec = executors.newcachedthreadpool();</code>

<code>        </code><code>exec.execute(</code><code>new</code> <code>toaster(dryqueue));</code>

<code>        </code><code>exec.execute(</code><code>new</code> <code>butterer(dryqueue, butteredqueue));</code>

<code>        </code><code>exec.execute(</code><code>new</code> <code>jammer(butteredqueue, finishedqueue));</code>

<code>        </code><code>exec.execute(</code><code>new</code> <code>eater(finishedqueue));</code>

<code>        </code><code>timeunit.seconds.sleep(</code><code>5</code><code>);</code>

<code>        </code><code>exec.shutdownnow();</code>

執行結果(可能的結果):

<code>toast id: </code><code>0</code><code>, status: dry</code>

<code>toast id: </code><code>0</code><code>, status: buttered</code>

<code>toast id: </code><code>0</code><code>, status: jammed</code>

<code>toast id: </code><code>0</code><code>, status: jammed-&gt;eaten</code>

<code>toast id: </code><code>1</code><code>, status: dry</code>

<code>toast id: </code><code>1</code><code>, status: buttered</code>

<code>toast id: </code><code>1</code><code>, status: jammed</code>

<code>toast id: </code><code>1</code><code>, status: jammed-&gt;eaten</code>

<code>toast id: </code><code>2</code><code>, status: dry</code>

<code>toast id: </code><code>2</code><code>, status: buttered</code>

<code>toast id: </code><code>2</code><code>, status: jammed</code>

<code>toast id: </code><code>2</code><code>, status: jammed-&gt;eaten</code>

<code>toast id: </code><code>3</code><code>, status: dry</code>

<code>toast id: </code><code>3</code><code>, status: buttered</code>

<code>toast id: </code><code>3</code><code>, status: jammed</code>

<code>toast id: </code><code>3</code><code>, status: jammed-&gt;eaten</code>

<code>toast id: </code><code>4</code><code>, status: dry</code>

<code>toast id: </code><code>4</code><code>, status: buttered</code>

<code>toast id: </code><code>4</code><code>, status: jammed</code>

<code>toast id: </code><code>4</code><code>, status: jammed-&gt;eaten</code>

<code>toast id: </code><code>5</code><code>, status: dry</code>

<code>toast id: </code><code>5</code><code>, status: buttered</code>

<code>toast id: </code><code>5</code><code>, status: jammed</code>

<code>toast id: </code><code>5</code><code>, status: jammed-&gt;eaten</code>

<code>toast id: </code><code>6</code><code>, status: dry</code>

<code>toast id: </code><code>6</code><code>, status: buttered</code>

<code>toast id: </code><code>6</code><code>, status: jammed</code>

<code>toast id: </code><code>6</code><code>, status: jammed-&gt;eaten</code>

<code>toast id: </code><code>7</code><code>, status: dry</code>

<code>toast id: </code><code>7</code><code>, status: buttered</code>

<code>toast id: </code><code>7</code><code>, status: jammed</code>

<code>toast id: </code><code>7</code><code>, status: jammed-&gt;eaten</code>

<code>eater interrupted.</code>

<code>eater off.</code>

<code>butterer interrupted.</code>

<code>toaster interrupted.</code>

<code>toaster off.</code>

<code>jammer interrupted.</code>

<code>jammer off.</code>

<code>butterer off.</code>

    toast是一個使用enum值的優秀示例。注意,這個示例中沒有任何顯式的同步(即使用lock對象或者synchronized關鍵字的同步),因為同步已經由隊列和系統的設計隐式的管理了——每片toast在任何時刻都隻由一個任務在操作。因為隊列的阻塞,使得處理過程将被自動的挂起和恢複。你可以看到由blockingqueue産生的簡化十分明顯。在使用顯式的wait()和notifyall()時存在的類和類之間的耦合被消除了,因為每個類都隻和它的blockingqueue通信。