wait()和notifyall()方法以一種非常低級的方式解決了任務互操作的問題,即每次互動時都需要握手。在許多情況下,你可以瞄準更高的抽象級别,使用同步隊列來解決任務協作的問題。同步隊列在任何時刻都隻允許一個任務插入或移除元素。在java.util.concurrent.blockingqueue接口中提供了這個隊列,這個接口有大量的标準實作。你通常可以使用linkedblockingqueue,它是一個無屆隊列,你還可以使用arrayblockingqueue,它具有固定的尺寸,是以你可以在它被阻塞之前,向其中放置有限數量的元素。
如果消費者任務試圖從隊列中擷取對象,而該隊列此時為空,那麼這些隊列還可以挂起消費者任務,并且當有更多的元素可用時回複消費者任務。阻塞隊列可以解決非常大的問題,而其方式與wait()和notifyall()相比,則要簡單并可靠許多。
考慮下面這個blockingqueue的示例,有一台機器具有三個任務:一個制作吐司,一個給吐司抹黃油,還有一個給吐司塗果醬。我們可以通過各個處理過程之間的blockingqueue來運作這個吐司制作程式:
<a href="http://my.oschina.net/itblog/blog/515735#">?</a>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
<code>import</code> <code>java.util.random;</code>
<code>import</code> <code>java.util.concurrent.executorservice;</code>
<code>import</code> <code>java.util.concurrent.executors;</code>
<code>import</code> <code>java.util.concurrent.linkedblockingqueue;</code>
<code>import</code> <code>java.util.concurrent.timeunit;</code>
<code>class</code> <code>toast {</code>
<code> </code><code>/**</code>
<code> </code><code>* 吐司的狀态:</code>
<code> </code><code>* dry: 烘幹的</code>
<code> </code><code>* buttered: 塗了黃油的</code>
<code> </code><code>* jammed: 塗了果醬的</code>
<code> </code><code>* <p>吐司的狀态隻能由dry->buttered->jammed轉變</code>
<code> </code><code>*/</code>
<code> </code><code>public</code> <code>enum</code> <code>status {dry, buttered, jammed}</code>
<code> </code><code>private</code> <code>status status = status.dry;</code><code>//預設狀态為dry</code>
<code> </code><code>private</code> <code>final</code> <code>int</code> <code>id;</code>
<code> </code><code>public</code> <code>toast(</code><code>int</code> <code>id) { </code><code>this</code><code>.id = id;}</code>
<code> </code><code>public</code> <code>void</code> <code>butter() {status = status.buttered;}</code>
<code> </code><code>public</code> <code>void</code> <code>jam() {status = status.jammed;}</code>
<code> </code><code>public</code> <code>status getstatus() {</code><code>return</code> <code>status;}</code>
<code> </code><code>public</code> <code>int</code> <code>getid() {</code><code>return</code> <code>id;}</code>
<code> </code><code>public</code> <code>string tostring() {</code>
<code> </code><code>return</code> <code>"toast id: "</code> <code>+ id + </code><code>", status: "</code> <code>+ status;</code>
<code> </code><code>}</code>
<code>}</code>
<code>@suppresswarnings</code><code>(</code><code>"serial"</code><code>)</code>
<code>class</code> <code>toastqueue </code><code>extends</code> <code>linkedblockingqueue<toast> {}</code>
<code>/**</code>
<code> </code><code>* 生産吐司的任務。</code>
<code> </code><code>*/</code>
<code>class</code> <code>toaster </code><code>implements</code> <code>runnable {</code>
<code> </code><code>private</code> <code>toastqueue toastqueue;</code>
<code> </code><code>private</code> <code>int</code> <code>count = </code><code>0</code><code>;</code>
<code> </code><code>private</code> <code>random random = </code><code>new</code> <code>random(</code><code>47</code><code>);</code>
<code> </code><code>public</code> <code>toaster(toastqueue queue) {</code>
<code> </code><code>this</code><code>.toastqueue = queue;</code>
<code> </code><code>@override</code>
<code> </code><code>public</code> <code>void</code> <code>run() {</code>
<code> </code><code>try</code> <code>{</code>
<code> </code><code>while</code><code>(!thread.interrupted()) {</code>
<code> </code><code>timeunit.milliseconds.sleep(</code><code>300</code> <code>+ random.nextint(</code><code>500</code><code>));</code>
<code> </code><code>//生産一片吐司,這些吐司是有序的</code>
<code> </code><code>toast toast = </code><code>new</code> <code>toast(count++);</code>
<code> </code><code>system.out.println(toast);</code>
<code> </code><code>//放到toastqueue中</code>
<code> </code><code>toastqueue.put(toast);</code>
<code> </code><code>}</code>
<code> </code><code>} </code><code>catch</code> <code>(interruptedexception e) {</code>
<code> </code><code>system.out.println(</code><code>"toaster interrupted."</code><code>);</code>
<code> </code><code>}</code>
<code> </code><code>system.out.println(</code><code>"toaster off."</code><code>);</code>
<code> </code><code>* 塗黃油的任務。</code>
<code>class</code> <code>butterer </code><code>implements</code> <code>runnable {</code>
<code> </code><code>private</code> <code>toastqueue dryqueue;</code>
<code> </code><code>private</code> <code>toastqueue butteredqueue;</code>
<code> </code><code>public</code> <code>butterer(toastqueue dryqueue, toastqueue butteredqueue) {</code>
<code> </code><code>this</code><code>.dryqueue = dryqueue;</code>
<code> </code><code>this</code><code>.butteredqueue = butteredqueue;</code>
<code> </code>
<code> </code><code>//在取得下一個吐司之前會一直阻塞</code>
<code> </code><code>toast toast = dryqueue.take();</code>
<code> </code><code>toast.butter();</code>
<code> </code><code>butteredqueue.put(toast);</code>
<code> </code><code>system.out.println(</code><code>"butterer interrupted."</code><code>);</code>
<code> </code><code>system.out.println(</code><code>"butterer off."</code><code>);</code>
<code> </code>
<code> </code><code>* 塗果醬的任務。</code>
<code>class</code> <code>jammer </code><code>implements</code> <code>runnable {</code>
<code> </code><code>private</code> <code>toastqueue finishedqueue;</code>
<code> </code><code>public</code> <code>jammer(toastqueue butteredqueue, toastqueue finishedqueue) {</code>
<code> </code><code>this</code><code>.finishedqueue = finishedqueue;</code>
<code> </code><code>toast toast = butteredqueue.take();</code>
<code> </code><code>toast.jam();</code>
<code> </code><code>finishedqueue.put(toast);</code>
<code> </code><code>system.out.println(</code><code>"jammer interrupted."</code><code>);</code>
<code> </code><code>system.out.println(</code><code>"jammer off."</code><code>);</code>
<code> </code><code>* 吃吐司的人,消費者。</code>
<code>class</code> <code>eater </code><code>implements</code> <code>runnable {</code>
<code> </code><code>public</code> <code>eater (toastqueue finishedqueue) {</code>
<code> </code><code>toast toast = finishedqueue.take();</code>
<code> </code><code>//驗證取得的吐司是有序的,而且狀态是jammed的</code>
<code> </code><code>if</code> <code>(toast.getid() != count++ || </code>
<code> </code><code>toast.getstatus() != toast.status.jammed) {</code>
<code> </code><code>system.out.println(</code><code>"error -> "</code> <code>+ toast);</code>
<code> </code><code>system.exit(-</code><code>1</code><code>);</code>
<code> </code><code>} </code><code>else</code> <code>{</code>
<code> </code><code>//吃掉吐司</code>
<code> </code><code>system.out.println(toast + </code><code>"->eaten"</code><code>);</code>
<code> </code><code>}</code>
<code> </code><code>system.out.println(</code><code>"eater interrupted."</code><code>);</code>
<code> </code><code>system.out.println(</code><code>"eater off."</code><code>);</code>
<code>public</code> <code>class</code> <code>toastomatic {</code>
<code> </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) </code><code>throws</code> <code>exception {</code>
<code> </code><code>toastqueue dryqueue = </code><code>new</code> <code>toastqueue();</code>
<code> </code><code>toastqueue butteredqueue = </code><code>new</code> <code>toastqueue();</code>
<code> </code><code>toastqueue finishedqueue = </code><code>new</code> <code>toastqueue();</code>
<code> </code><code>executorservice exec = executors.newcachedthreadpool();</code>
<code> </code><code>exec.execute(</code><code>new</code> <code>toaster(dryqueue));</code>
<code> </code><code>exec.execute(</code><code>new</code> <code>butterer(dryqueue, butteredqueue));</code>
<code> </code><code>exec.execute(</code><code>new</code> <code>jammer(butteredqueue, finishedqueue));</code>
<code> </code><code>exec.execute(</code><code>new</code> <code>eater(finishedqueue));</code>
<code> </code><code>timeunit.seconds.sleep(</code><code>5</code><code>);</code>
<code> </code><code>exec.shutdownnow();</code>
執行結果(可能的結果):
<code>toast id: </code><code>0</code><code>, status: dry</code>
<code>toast id: </code><code>0</code><code>, status: buttered</code>
<code>toast id: </code><code>0</code><code>, status: jammed</code>
<code>toast id: </code><code>0</code><code>, status: jammed->eaten</code>
<code>toast id: </code><code>1</code><code>, status: dry</code>
<code>toast id: </code><code>1</code><code>, status: buttered</code>
<code>toast id: </code><code>1</code><code>, status: jammed</code>
<code>toast id: </code><code>1</code><code>, status: jammed->eaten</code>
<code>toast id: </code><code>2</code><code>, status: dry</code>
<code>toast id: </code><code>2</code><code>, status: buttered</code>
<code>toast id: </code><code>2</code><code>, status: jammed</code>
<code>toast id: </code><code>2</code><code>, status: jammed->eaten</code>
<code>toast id: </code><code>3</code><code>, status: dry</code>
<code>toast id: </code><code>3</code><code>, status: buttered</code>
<code>toast id: </code><code>3</code><code>, status: jammed</code>
<code>toast id: </code><code>3</code><code>, status: jammed->eaten</code>
<code>toast id: </code><code>4</code><code>, status: dry</code>
<code>toast id: </code><code>4</code><code>, status: buttered</code>
<code>toast id: </code><code>4</code><code>, status: jammed</code>
<code>toast id: </code><code>4</code><code>, status: jammed->eaten</code>
<code>toast id: </code><code>5</code><code>, status: dry</code>
<code>toast id: </code><code>5</code><code>, status: buttered</code>
<code>toast id: </code><code>5</code><code>, status: jammed</code>
<code>toast id: </code><code>5</code><code>, status: jammed->eaten</code>
<code>toast id: </code><code>6</code><code>, status: dry</code>
<code>toast id: </code><code>6</code><code>, status: buttered</code>
<code>toast id: </code><code>6</code><code>, status: jammed</code>
<code>toast id: </code><code>6</code><code>, status: jammed->eaten</code>
<code>toast id: </code><code>7</code><code>, status: dry</code>
<code>toast id: </code><code>7</code><code>, status: buttered</code>
<code>toast id: </code><code>7</code><code>, status: jammed</code>
<code>toast id: </code><code>7</code><code>, status: jammed->eaten</code>
<code>eater interrupted.</code>
<code>eater off.</code>
<code>butterer interrupted.</code>
<code>toaster interrupted.</code>
<code>toaster off.</code>
<code>jammer interrupted.</code>
<code>jammer off.</code>
<code>butterer off.</code>
toast是一個使用enum值的優秀示例。注意,這個示例中沒有任何顯式的同步(即使用lock對象或者synchronized關鍵字的同步),因為同步已經由隊列和系統的設計隐式的管理了——每片toast在任何時刻都隻由一個任務在操作。因為隊列的阻塞,使得處理過程将被自動的挂起和恢複。你可以看到由blockingqueue産生的簡化十分明顯。在使用顯式的wait()和notifyall()時存在的類和類之間的耦合被消除了,因為每個類都隻和它的blockingqueue通信。