priorityblockingqueue是一個很基礎的優先級隊列,它在priorityqueue的基礎上提供了可阻塞的讀取操作。它是無限制的,就是說向queue裡面增加元素可能會失敗(導緻ourofmemoryerror)。下面是一個示例,其中在優先級隊列中的對象是按照優先級順序依次出隊列的:
<a href="http://my.oschina.net/itblog/blog/518304#">?</a>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
<code>import</code> <code>java.util.arraylist;</code>
<code>import</code> <code>java.util.list;</code>
<code>import</code> <code>java.util.queue;</code>
<code>import</code> <code>java.util.random;</code>
<code>import</code> <code>java.util.concurrent.executorservice;</code>
<code>import</code> <code>java.util.concurrent.executors;</code>
<code>import</code> <code>java.util.concurrent.priorityblockingqueue;</code>
<code>import</code> <code>java.util.concurrent.timeunit;</code>
<code>class</code> <code>prioritizedtask </code><code>implements</code> <code>runnable, comparable<prioritizedtask> {</code>
<code> </code><code>private</code> <code>static</code> <code>int</code> <code>counter = </code><code>1</code><code>;</code>
<code> </code><code>private</code> <code>final</code> <code>int</code> <code>priority;</code>
<code> </code><code>private</code> <code>random random = </code><code>new</code> <code>random(</code><code>47</code><code>);</code>
<code> </code><code>private</code> <code>final</code> <code>int</code> <code>id = counter++;</code><code>//這個id不是static的,是以</code>
<code> </code><code>protected</code> <code>static</code> <code>list<prioritizedtask> sequence = </code><code>new</code> <code>arraylist<>();</code>
<code> </code><code>public</code> <code>prioritizedtask(</code><code>int</code> <code>priority) {</code>
<code> </code><code>this</code><code>.priority = priority;</code>
<code> </code><code>sequence.add(</code><code>this</code><code>);</code>
<code> </code><code>}</code>
<code> </code><code>@override</code>
<code> </code><code>public</code> <code>int</code> <code>compareto(prioritizedtask o) {</code>
<code> </code><code>int</code> <code>val = </code><code>this</code><code>.priority - o.priority;</code>
<code> </code><code>//higher value, higher priority</code>
<code> </code><code>return</code> <code>val < </code><code>0</code> <code>? </code><code>1</code> <code>: (val > </code><code>0</code> <code>? -</code><code>1</code> <code>: </code><code>0</code><code>);</code>
<code> </code><code>public</code> <code>void</code> <code>run() {</code>
<code> </code><code>try</code> <code>{</code>
<code> </code><code>timeunit.milliseconds.sleep(random.nextint(</code><code>250</code><code>));</code>
<code> </code><code>} </code><code>catch</code> <code>(interruptedexception e) {</code>
<code> </code><code>}</code>
<code> </code><code>system.out.println(</code><code>this</code><code>);</code>
<code> </code><code>public</code> <code>string tostring() {</code>
<code> </code><code>return</code> <code>string.format(</code><code>"p=[%1$-3d]"</code><code>, priority) + </code><code>", id="</code> <code>+ id;</code>
<code> </code><code>public</code> <code>static</code> <code>class</code> <code>endflagtask </code><code>extends</code> <code>prioritizedtask {</code>
<code> </code><code>private</code> <code>executorservice exec;</code>
<code> </code><code>public</code> <code>endflagtask(executorservice executorservice) {</code>
<code> </code><code>super</code><code>(-</code><code>1</code><code>);</code><code>//最低的優先級</code>
<code> </code><code>exec = executorservice;</code>
<code> </code><code>@override</code>
<code> </code><code>public</code> <code>void</code> <code>run() {</code>
<code> </code><code>system.out.println(</code><code>this</code> <code>+ </code><code>" calling shutdownnow()"</code><code>);</code>
<code> </code><code>exec.shutdownnow();</code>
<code>}</code>
<code>class</code> <code>prioritizedtaskproducer </code><code>implements</code> <code>runnable {</code>
<code> </code><code>private</code> <code>queue<runnable> queue;</code>
<code> </code><code>private</code> <code>executorservice exec;</code>
<code> </code><code>public</code> <code>prioritizedtaskproducer(queue<runnable> queue, executorservice exec) {</code>
<code> </code><code>this</code><code>.queue = queue;</code>
<code> </code><code>this</code><code>.exec = exec;</code>
<code> </code><code>//慢慢的添加高優先級的任務</code>
<code> </code><code>for</code> <code>(</code><code>int</code> <code>i = </code><code>0</code><code>; i < </code><code>6</code><code>; i++) {</code>
<code> </code><code>timeunit.milliseconds.sleep(</code><code>250</code><code>);</code>
<code> </code><code>queue.add(</code><code>new</code> <code>prioritizedtask(</code><code>9</code><code>)); </code><code>//6個優先級10</code>
<code> </code><code>}</code>
<code> </code><code>//先建立2個p=0的任務</code>
<code> </code><code>queue.add(</code><code>new</code> <code>prioritizedtask(</code><code>0</code><code>));</code>
<code> </code><code>//添加低優先級的任務</code>
<code> </code><code>for</code> <code>(</code><code>int</code> <code>i = </code><code>0</code><code>; i < </code><code>6</code><code>; i++) {</code><code>// 優先級0-5</code>
<code> </code><code>queue.add(</code><code>new</code> <code>prioritizedtask(i));</code>
<code> </code><code>//添加一個結束标志的任務</code>
<code> </code><code>queue.add(</code><code>new</code> <code>prioritizedtask.endflagtask(exec));</code>
<code> </code>
<code> </code><code>// todo: handle exception</code>
<code> </code><code>system.out.println(</code><code>"finished prioritizedtaskproducer."</code><code>);</code>
<code>class</code> <code>prioritizedtaskconsumer </code><code>implements</code> <code>runnable {</code>
<code> </code><code>private</code> <code>priorityblockingqueue<runnable> queue;</code>
<code> </code><code>public</code> <code>prioritizedtaskconsumer(priorityblockingqueue<runnable> queue) {</code>
<code> </code><code>//不停的從queue裡面取任務,直到exec停止。</code>
<code> </code><code>while</code><code>(!thread.interrupted()) {</code>
<code> </code><code>//使用目前線程來跑這些任務</code>
<code> </code><code>queue.take().run();</code>
<code> </code><code>system.out.println(</code><code>"finished prioritizedtaskconsumer."</code><code>);</code>
<code>public</code> <code>final</code> <code>class</code> <code>priorityblockingqueuedemo {</code>
<code> </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) {</code>
<code> </code><code>executorservice exec = executors.newcachedthreadpool();</code>
<code> </code><code>priorityblockingqueue<runnable> queue = </code><code>new</code> <code>priorityblockingqueue<>();</code>
<code> </code><code>exec.execute(</code><code>new</code> <code>prioritizedtaskproducer(queue, exec));</code>
<code> </code><code>exec.execute(</code><code>new</code> <code>prioritizedtaskconsumer(queue));</code>
執行結果:
<code>p=[</code><code>9</code> <code>], id=</code><code>1</code>
<code>p=[</code><code>9</code> <code>], id=</code><code>2</code>
<code>p=[</code><code>9</code> <code>], id=</code><code>3</code>
<code>p=[</code><code>9</code> <code>], id=</code><code>4</code>
<code>p=[</code><code>9</code> <code>], id=</code><code>5</code>
<code>finished prioritizedtaskproducer.</code>
<code>p=[</code><code>9</code> <code>], id=</code><code>6</code>
<code>p=[</code><code>5</code> <code>], id=</code><code>14</code>
<code>p=[</code><code>4</code> <code>], id=</code><code>13</code>
<code>p=[</code><code>3</code> <code>], id=</code><code>12</code>
<code>p=[</code><code>2</code> <code>], id=</code><code>11</code>
<code>p=[</code><code>1</code> <code>], id=</code><code>10</code>
<code>p=[</code><code>0</code> <code>], id=</code><code>7</code>
<code>p=[</code><code>0</code> <code>], id=</code><code>9</code>
<code>p=[</code><code>0</code> <code>], id=</code><code>8</code>
<code>p=[-</code><code>1</code> <code>], id=</code><code>15</code> <code>calling shutdownnow()</code>
<code>finished prioritizedtaskconsumer.</code>
prioritizedtask對象的建立序列被記錄在sequencelist中,用于和實際的順序比較。run()方法将休眠一小段随機的時間,然後列印對象資訊,而endflagtask提供了停止executorservice的功能,要確定它是隊列中的最後一個對象,是以給它設定了最低的優先級(-1,優先級值越大,優先級越高)。
prioritizedtaskproducer和prioritizedtaskconsumer通過priorityblockingqueue彼此連結。因為這種隊列的阻塞特性提供了所有必須的同步,是以你應該注意到了,這裡不需要任何顯式的同步——不必考慮當你從這種隊列中讀取時,其中是否有元素,因為這個隊列在沒有元素時,将直接阻塞讀取者。
從執行結果中可以看到,最先出隊列的是priority為9的6個task,因為這幾個任務先建立。
這句話的列印表示生産者已經将所有的任務放到隊列中了,由于将任務放到queue中和從queue中提取任務并執行時兩個不同的任務(即producer和consumer),是以producer先輸出“finished prioritizedtaskproducer.”。輸出這句話的時候,前面隻有5個p=9的任務出列了,是以隊列中還有1個p=9的任務沒出列,同時還有後續放入各種任務。由于queue中的任務裡面,優先級p最高的是p=9的,是以第6個p=9的任務先出隊列。剩下的任務按照p的大小依次出列。
任務的id屬性表示了它們的建立順序,因為id是自增的,每建立一個任務,id就增加。是以從
可以很明顯的看出:p=5的任務,它的id最大,是以是最後建立的。從我們的代碼中也可以看出來,p=5的任務的确是最後建立的。
還有一點可以看出,當p相同的時候,出queue的順序是不确定的,例如:
另外,在使用此類的時候需要注意: