priorityblockingqueue是一个很基础的优先级队列,它在priorityqueue的基础上提供了可阻塞的读取操作。它是无限制的,就是说向queue里面增加元素可能会失败(导致ourofmemoryerror)。下面是一个示例,其中在优先级队列中的对象是按照优先级顺序依次出队列的:
<a href="http://my.oschina.net/itblog/blog/518304#">?</a>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
<code>import</code> <code>java.util.arraylist;</code>
<code>import</code> <code>java.util.list;</code>
<code>import</code> <code>java.util.queue;</code>
<code>import</code> <code>java.util.random;</code>
<code>import</code> <code>java.util.concurrent.executorservice;</code>
<code>import</code> <code>java.util.concurrent.executors;</code>
<code>import</code> <code>java.util.concurrent.priorityblockingqueue;</code>
<code>import</code> <code>java.util.concurrent.timeunit;</code>
<code>class</code> <code>prioritizedtask </code><code>implements</code> <code>runnable, comparable<prioritizedtask> {</code>
<code> </code><code>private</code> <code>static</code> <code>int</code> <code>counter = </code><code>1</code><code>;</code>
<code> </code><code>private</code> <code>final</code> <code>int</code> <code>priority;</code>
<code> </code><code>private</code> <code>random random = </code><code>new</code> <code>random(</code><code>47</code><code>);</code>
<code> </code><code>private</code> <code>final</code> <code>int</code> <code>id = counter++;</code><code>//这个id不是static的,因此</code>
<code> </code><code>protected</code> <code>static</code> <code>list<prioritizedtask> sequence = </code><code>new</code> <code>arraylist<>();</code>
<code> </code><code>public</code> <code>prioritizedtask(</code><code>int</code> <code>priority) {</code>
<code> </code><code>this</code><code>.priority = priority;</code>
<code> </code><code>sequence.add(</code><code>this</code><code>);</code>
<code> </code><code>}</code>
<code> </code><code>@override</code>
<code> </code><code>public</code> <code>int</code> <code>compareto(prioritizedtask o) {</code>
<code> </code><code>int</code> <code>val = </code><code>this</code><code>.priority - o.priority;</code>
<code> </code><code>//higher value, higher priority</code>
<code> </code><code>return</code> <code>val < </code><code>0</code> <code>? </code><code>1</code> <code>: (val > </code><code>0</code> <code>? -</code><code>1</code> <code>: </code><code>0</code><code>);</code>
<code> </code><code>public</code> <code>void</code> <code>run() {</code>
<code> </code><code>try</code> <code>{</code>
<code> </code><code>timeunit.milliseconds.sleep(random.nextint(</code><code>250</code><code>));</code>
<code> </code><code>} </code><code>catch</code> <code>(interruptedexception e) {</code>
<code> </code><code>}</code>
<code> </code><code>system.out.println(</code><code>this</code><code>);</code>
<code> </code><code>public</code> <code>string tostring() {</code>
<code> </code><code>return</code> <code>string.format(</code><code>"p=[%1$-3d]"</code><code>, priority) + </code><code>", id="</code> <code>+ id;</code>
<code> </code><code>public</code> <code>static</code> <code>class</code> <code>endflagtask </code><code>extends</code> <code>prioritizedtask {</code>
<code> </code><code>private</code> <code>executorservice exec;</code>
<code> </code><code>public</code> <code>endflagtask(executorservice executorservice) {</code>
<code> </code><code>super</code><code>(-</code><code>1</code><code>);</code><code>//最低的优先级</code>
<code> </code><code>exec = executorservice;</code>
<code> </code><code>@override</code>
<code> </code><code>public</code> <code>void</code> <code>run() {</code>
<code> </code><code>system.out.println(</code><code>this</code> <code>+ </code><code>" calling shutdownnow()"</code><code>);</code>
<code> </code><code>exec.shutdownnow();</code>
<code>}</code>
<code>class</code> <code>prioritizedtaskproducer </code><code>implements</code> <code>runnable {</code>
<code> </code><code>private</code> <code>queue<runnable> queue;</code>
<code> </code><code>private</code> <code>executorservice exec;</code>
<code> </code><code>public</code> <code>prioritizedtaskproducer(queue<runnable> queue, executorservice exec) {</code>
<code> </code><code>this</code><code>.queue = queue;</code>
<code> </code><code>this</code><code>.exec = exec;</code>
<code> </code><code>//慢慢的添加高优先级的任务</code>
<code> </code><code>for</code> <code>(</code><code>int</code> <code>i = </code><code>0</code><code>; i < </code><code>6</code><code>; i++) {</code>
<code> </code><code>timeunit.milliseconds.sleep(</code><code>250</code><code>);</code>
<code> </code><code>queue.add(</code><code>new</code> <code>prioritizedtask(</code><code>9</code><code>)); </code><code>//6个优先级10</code>
<code> </code><code>}</code>
<code> </code><code>//先创建2个p=0的任务</code>
<code> </code><code>queue.add(</code><code>new</code> <code>prioritizedtask(</code><code>0</code><code>));</code>
<code> </code><code>//添加低优先级的任务</code>
<code> </code><code>for</code> <code>(</code><code>int</code> <code>i = </code><code>0</code><code>; i < </code><code>6</code><code>; i++) {</code><code>// 优先级0-5</code>
<code> </code><code>queue.add(</code><code>new</code> <code>prioritizedtask(i));</code>
<code> </code><code>//添加一个结束标志的任务</code>
<code> </code><code>queue.add(</code><code>new</code> <code>prioritizedtask.endflagtask(exec));</code>
<code> </code>
<code> </code><code>// todo: handle exception</code>
<code> </code><code>system.out.println(</code><code>"finished prioritizedtaskproducer."</code><code>);</code>
<code>class</code> <code>prioritizedtaskconsumer </code><code>implements</code> <code>runnable {</code>
<code> </code><code>private</code> <code>priorityblockingqueue<runnable> queue;</code>
<code> </code><code>public</code> <code>prioritizedtaskconsumer(priorityblockingqueue<runnable> queue) {</code>
<code> </code><code>//不停的从queue里面取任务,直到exec停止。</code>
<code> </code><code>while</code><code>(!thread.interrupted()) {</code>
<code> </code><code>//使用当前线程来跑这些任务</code>
<code> </code><code>queue.take().run();</code>
<code> </code><code>system.out.println(</code><code>"finished prioritizedtaskconsumer."</code><code>);</code>
<code>public</code> <code>final</code> <code>class</code> <code>priorityblockingqueuedemo {</code>
<code> </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) {</code>
<code> </code><code>executorservice exec = executors.newcachedthreadpool();</code>
<code> </code><code>priorityblockingqueue<runnable> queue = </code><code>new</code> <code>priorityblockingqueue<>();</code>
<code> </code><code>exec.execute(</code><code>new</code> <code>prioritizedtaskproducer(queue, exec));</code>
<code> </code><code>exec.execute(</code><code>new</code> <code>prioritizedtaskconsumer(queue));</code>
执行结果:
<code>p=[</code><code>9</code> <code>], id=</code><code>1</code>
<code>p=[</code><code>9</code> <code>], id=</code><code>2</code>
<code>p=[</code><code>9</code> <code>], id=</code><code>3</code>
<code>p=[</code><code>9</code> <code>], id=</code><code>4</code>
<code>p=[</code><code>9</code> <code>], id=</code><code>5</code>
<code>finished prioritizedtaskproducer.</code>
<code>p=[</code><code>9</code> <code>], id=</code><code>6</code>
<code>p=[</code><code>5</code> <code>], id=</code><code>14</code>
<code>p=[</code><code>4</code> <code>], id=</code><code>13</code>
<code>p=[</code><code>3</code> <code>], id=</code><code>12</code>
<code>p=[</code><code>2</code> <code>], id=</code><code>11</code>
<code>p=[</code><code>1</code> <code>], id=</code><code>10</code>
<code>p=[</code><code>0</code> <code>], id=</code><code>7</code>
<code>p=[</code><code>0</code> <code>], id=</code><code>9</code>
<code>p=[</code><code>0</code> <code>], id=</code><code>8</code>
<code>p=[-</code><code>1</code> <code>], id=</code><code>15</code> <code>calling shutdownnow()</code>
<code>finished prioritizedtaskconsumer.</code>
prioritizedtask对象的创建序列被记录在sequencelist中,用于和实际的顺序比较。run()方法将休眠一小段随机的时间,然后打印对象信息,而endflagtask提供了停止executorservice的功能,要确保它是队列中的最后一个对象,因此给它设置了最低的优先级(-1,优先级值越大,优先级越高)。
prioritizedtaskproducer和prioritizedtaskconsumer通过priorityblockingqueue彼此链接。因为这种队列的阻塞特性提供了所有必须的同步,所以你应该注意到了,这里不需要任何显式的同步——不必考虑当你从这种队列中读取时,其中是否有元素,因为这个队列在没有元素时,将直接阻塞读取者。
从执行结果中可以看到,最先出队列的是priority为9的6个task,因为这几个任务先创建。
这句话的打印表示生产者已经将所有的任务放到队列中了,由于将任务放到queue中和从queue中提取任务并执行时两个不同的任务(即producer和consumer),因此producer先输出“finished prioritizedtaskproducer.”。输出这句话的时候,前面只有5个p=9的任务出列了,因此队列中还有1个p=9的任务没出列,同时还有后续放入各种任务。由于queue中的任务里面,优先级p最高的是p=9的,因此第6个p=9的任务先出队列。剩下的任务按照p的大小依次出列。
任务的id属性表示了它们的创建顺序,因为id是自增的,每创建一个任务,id就增加。因此从
可以很明显的看出:p=5的任务,它的id最大,所以是最后创建的。从我们的代码中也可以看出来,p=5的任务的确是最后创建的。
还有一点可以看出,当p相同的时候,出queue的顺序是不确定的,例如:
另外,在使用此类的时候需要注意: