天天看点

Java并发(四)BlockingQueue的使用

    wait()和notifyall()方法以一种非常低级的方式解决了任务互操作的问题,即每次交互时都需要握手。在许多情况下,你可以瞄准更高的抽象级别,使用同步队列来解决任务协作的问题。同步队列在任何时刻都只允许一个任务插入或移除元素。在java.util.concurrent.blockingqueue接口中提供了这个队列,这个接口有大量的标准实现。你通常可以使用linkedblockingqueue,它是一个无届队列,你还可以使用arrayblockingqueue,它具有固定的尺寸,因此你可以在它被阻塞之前,向其中放置有限数量的元素。

    如果消费者任务试图从队列中获取对象,而该队列此时为空,那么这些队列还可以挂起消费者任务,并且当有更多的元素可用时回复消费者任务。阻塞队列可以解决非常大的问题,而其方式与wait()和notifyall()相比,则要简单并可靠许多。

    考虑下面这个blockingqueue的示例,有一台机器具有三个任务:一个制作吐司,一个给吐司抹黄油,还有一个给吐司涂果酱。我们可以通过各个处理过程之间的blockingqueue来运行这个吐司制作程序:

<a href="http://my.oschina.net/itblog/blog/515735#">?</a>

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

<code>import</code> <code>java.util.random;</code>

<code>import</code> <code>java.util.concurrent.executorservice;</code>

<code>import</code> <code>java.util.concurrent.executors;</code>

<code>import</code> <code>java.util.concurrent.linkedblockingqueue;</code>

<code>import</code> <code>java.util.concurrent.timeunit;</code>

<code>class</code> <code>toast {</code>

<code>    </code><code>/**</code>

<code>     </code><code>* 吐司的状态:</code>

<code>     </code><code>* dry: 烘干的</code>

<code>     </code><code>* buttered: 涂了黄油的</code>

<code>     </code><code>* jammed: 涂了果酱的</code>

<code>     </code><code>* &lt;p&gt;吐司的状态只能由dry-&gt;buttered-&gt;jammed转变</code>

<code>     </code><code>*/</code>

<code>    </code><code>public</code> <code>enum</code> <code>status {dry, buttered, jammed}</code>

<code>    </code><code>private</code> <code>status status = status.dry;</code><code>//默认状态为dry</code>

<code>    </code><code>private</code> <code>final</code> <code>int</code> <code>id;</code>

<code>    </code><code>public</code> <code>toast(</code><code>int</code> <code>id) { </code><code>this</code><code>.id =  id;}</code>

<code>    </code><code>public</code> <code>void</code> <code>butter() {status = status.buttered;}</code>

<code>    </code><code>public</code> <code>void</code> <code>jam() {status = status.jammed;}</code>

<code>    </code><code>public</code> <code>status getstatus() {</code><code>return</code> <code>status;}</code>

<code>    </code><code>public</code> <code>int</code> <code>getid() {</code><code>return</code> <code>id;}</code>

<code>    </code><code>public</code> <code>string tostring() {</code>

<code>        </code><code>return</code> <code>"toast id: "</code> <code>+ id + </code><code>", status: "</code> <code>+ status;</code>

<code>    </code><code>}</code>

<code>}</code>

<code>@suppresswarnings</code><code>(</code><code>"serial"</code><code>)</code>

<code>class</code> <code>toastqueue </code><code>extends</code> <code>linkedblockingqueue&lt;toast&gt; {}</code>

<code>/**</code>

<code> </code><code>* 生产吐司的任务。</code>

<code> </code><code>*/</code>

<code>class</code> <code>toaster </code><code>implements</code> <code>runnable {</code>

<code>    </code><code>private</code> <code>toastqueue toastqueue;</code>

<code>    </code><code>private</code> <code>int</code> <code>count = </code><code>0</code><code>;</code>

<code>    </code><code>private</code> <code>random random = </code><code>new</code> <code>random(</code><code>47</code><code>);</code>

<code>    </code><code>public</code> <code>toaster(toastqueue queue) {</code>

<code>        </code><code>this</code><code>.toastqueue = queue;</code>

<code>    </code><code>@override</code>

<code>    </code><code>public</code> <code>void</code> <code>run() {</code>

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>while</code><code>(!thread.interrupted()) {</code>

<code>                </code><code>timeunit.milliseconds.sleep(</code><code>300</code> <code>+ random.nextint(</code><code>500</code><code>));</code>

<code>                </code><code>//生产一片吐司,这些吐司是有序的</code>

<code>                </code><code>toast toast = </code><code>new</code> <code>toast(count++);</code>

<code>                </code><code>system.out.println(toast);</code>

<code>                </code><code>//放到toastqueue中</code>

<code>                </code><code>toastqueue.put(toast);</code>

<code>            </code><code>}</code>

<code>        </code><code>} </code><code>catch</code> <code>(interruptedexception e) {</code>

<code>            </code><code>system.out.println(</code><code>"toaster interrupted."</code><code>);</code>

<code>        </code><code>}</code>

<code>        </code><code>system.out.println(</code><code>"toaster off."</code><code>);</code>

<code> </code><code>* 涂黄油的任务。</code>

<code>class</code> <code>butterer </code><code>implements</code> <code>runnable {</code>

<code>    </code><code>private</code> <code>toastqueue dryqueue;</code>

<code>    </code><code>private</code> <code>toastqueue butteredqueue;</code>

<code>    </code><code>public</code> <code>butterer(toastqueue dryqueue, toastqueue butteredqueue) {</code>

<code>        </code><code>this</code><code>.dryqueue = dryqueue;</code>

<code>        </code><code>this</code><code>.butteredqueue = butteredqueue;</code>

<code>    </code> 

<code>                </code><code>//在取得下一个吐司之前会一直阻塞</code>

<code>                </code><code>toast toast = dryqueue.take();</code>

<code>                </code><code>toast.butter();</code>

<code>                </code><code>butteredqueue.put(toast);</code>

<code>            </code><code>system.out.println(</code><code>"butterer interrupted."</code><code>);</code>

<code>        </code><code>system.out.println(</code><code>"butterer off."</code><code>);</code>

<code>        </code> 

<code> </code><code>* 涂果酱的任务。</code>

<code>class</code> <code>jammer </code><code>implements</code> <code>runnable {</code>

<code>    </code><code>private</code> <code>toastqueue finishedqueue;</code>

<code>    </code><code>public</code> <code>jammer(toastqueue butteredqueue, toastqueue finishedqueue) {</code>

<code>        </code><code>this</code><code>.finishedqueue = finishedqueue;</code>

<code>                </code><code>toast toast = butteredqueue.take();</code>

<code>                </code><code>toast.jam();</code>

<code>                </code><code>finishedqueue.put(toast);</code>

<code>            </code><code>system.out.println(</code><code>"jammer interrupted."</code><code>);</code>

<code>        </code><code>system.out.println(</code><code>"jammer off."</code><code>);</code>

<code> </code><code>* 吃吐司的人,消费者。</code>

<code>class</code> <code>eater </code><code>implements</code> <code>runnable {</code>

<code>    </code><code>public</code> <code>eater (toastqueue finishedqueue) {</code>

<code>                </code><code>toast toast = finishedqueue.take();</code>

<code>                </code><code>//验证取得的吐司是有序的,而且状态是jammed的</code>

<code>                </code><code>if</code> <code>(toast.getid() != count++ || </code>

<code>                        </code><code>toast.getstatus() != toast.status.jammed) {</code>

<code>                    </code><code>system.out.println(</code><code>"error -&gt; "</code> <code>+ toast);</code>

<code>                    </code><code>system.exit(-</code><code>1</code><code>);</code>

<code>                </code><code>} </code><code>else</code> <code>{</code>

<code>                    </code><code>//吃掉吐司</code>

<code>                    </code><code>system.out.println(toast + </code><code>"-&gt;eaten"</code><code>);</code>

<code>                </code><code>}</code>

<code>            </code><code>system.out.println(</code><code>"eater interrupted."</code><code>);</code>

<code>        </code><code>system.out.println(</code><code>"eater off."</code><code>);</code>

<code>public</code> <code>class</code> <code>toastomatic {</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) </code><code>throws</code> <code>exception {</code>

<code>        </code><code>toastqueue dryqueue = </code><code>new</code> <code>toastqueue();</code>

<code>        </code><code>toastqueue butteredqueue = </code><code>new</code> <code>toastqueue();</code>

<code>        </code><code>toastqueue finishedqueue = </code><code>new</code> <code>toastqueue();</code>

<code>        </code><code>executorservice exec = executors.newcachedthreadpool();</code>

<code>        </code><code>exec.execute(</code><code>new</code> <code>toaster(dryqueue));</code>

<code>        </code><code>exec.execute(</code><code>new</code> <code>butterer(dryqueue, butteredqueue));</code>

<code>        </code><code>exec.execute(</code><code>new</code> <code>jammer(butteredqueue, finishedqueue));</code>

<code>        </code><code>exec.execute(</code><code>new</code> <code>eater(finishedqueue));</code>

<code>        </code><code>timeunit.seconds.sleep(</code><code>5</code><code>);</code>

<code>        </code><code>exec.shutdownnow();</code>

执行结果(可能的结果):

<code>toast id: </code><code>0</code><code>, status: dry</code>

<code>toast id: </code><code>0</code><code>, status: buttered</code>

<code>toast id: </code><code>0</code><code>, status: jammed</code>

<code>toast id: </code><code>0</code><code>, status: jammed-&gt;eaten</code>

<code>toast id: </code><code>1</code><code>, status: dry</code>

<code>toast id: </code><code>1</code><code>, status: buttered</code>

<code>toast id: </code><code>1</code><code>, status: jammed</code>

<code>toast id: </code><code>1</code><code>, status: jammed-&gt;eaten</code>

<code>toast id: </code><code>2</code><code>, status: dry</code>

<code>toast id: </code><code>2</code><code>, status: buttered</code>

<code>toast id: </code><code>2</code><code>, status: jammed</code>

<code>toast id: </code><code>2</code><code>, status: jammed-&gt;eaten</code>

<code>toast id: </code><code>3</code><code>, status: dry</code>

<code>toast id: </code><code>3</code><code>, status: buttered</code>

<code>toast id: </code><code>3</code><code>, status: jammed</code>

<code>toast id: </code><code>3</code><code>, status: jammed-&gt;eaten</code>

<code>toast id: </code><code>4</code><code>, status: dry</code>

<code>toast id: </code><code>4</code><code>, status: buttered</code>

<code>toast id: </code><code>4</code><code>, status: jammed</code>

<code>toast id: </code><code>4</code><code>, status: jammed-&gt;eaten</code>

<code>toast id: </code><code>5</code><code>, status: dry</code>

<code>toast id: </code><code>5</code><code>, status: buttered</code>

<code>toast id: </code><code>5</code><code>, status: jammed</code>

<code>toast id: </code><code>5</code><code>, status: jammed-&gt;eaten</code>

<code>toast id: </code><code>6</code><code>, status: dry</code>

<code>toast id: </code><code>6</code><code>, status: buttered</code>

<code>toast id: </code><code>6</code><code>, status: jammed</code>

<code>toast id: </code><code>6</code><code>, status: jammed-&gt;eaten</code>

<code>toast id: </code><code>7</code><code>, status: dry</code>

<code>toast id: </code><code>7</code><code>, status: buttered</code>

<code>toast id: </code><code>7</code><code>, status: jammed</code>

<code>toast id: </code><code>7</code><code>, status: jammed-&gt;eaten</code>

<code>eater interrupted.</code>

<code>eater off.</code>

<code>butterer interrupted.</code>

<code>toaster interrupted.</code>

<code>toaster off.</code>

<code>jammer interrupted.</code>

<code>jammer off.</code>

<code>butterer off.</code>

    toast是一个使用enum值的优秀示例。注意,这个示例中没有任何显式的同步(即使用lock对象或者synchronized关键字的同步),因为同步已经由队列和系统的设计隐式的管理了——每片toast在任何时刻都只由一个任务在操作。因为队列的阻塞,使得处理过程将被自动的挂起和恢复。你可以看到由blockingqueue产生的简化十分明显。在使用显式的wait()和notifyall()时存在的类和类之间的耦合被消除了,因为每个类都只和它的blockingqueue通信。