天天看点

Java并发新构件之PriorityBlockingQueue

    priorityblockingqueue是一个很基础的优先级队列,它在priorityqueue的基础上提供了可阻塞的读取操作。它是无限制的,就是说向queue里面增加元素可能会失败(导致ourofmemoryerror)。下面是一个示例,其中在优先级队列中的对象是按照优先级顺序依次出队列的:

<a href="http://my.oschina.net/itblog/blog/518304#">?</a>

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

<code>import</code> <code>java.util.arraylist;</code>

<code>import</code> <code>java.util.list;</code>

<code>import</code> <code>java.util.queue;</code>

<code>import</code> <code>java.util.random;</code>

<code>import</code> <code>java.util.concurrent.executorservice;</code>

<code>import</code> <code>java.util.concurrent.executors;</code>

<code>import</code> <code>java.util.concurrent.priorityblockingqueue;</code>

<code>import</code> <code>java.util.concurrent.timeunit;</code>

<code>class</code> <code>prioritizedtask </code><code>implements</code> <code>runnable, comparable&lt;prioritizedtask&gt; {</code>

<code>    </code><code>private</code> <code>static</code> <code>int</code> <code>counter = </code><code>1</code><code>;</code>

<code>    </code><code>private</code> <code>final</code> <code>int</code> <code>priority;</code>

<code>    </code><code>private</code> <code>random random = </code><code>new</code> <code>random(</code><code>47</code><code>);</code>

<code>    </code><code>private</code> <code>final</code> <code>int</code> <code>id = counter++;</code><code>//这个id不是static的,因此</code>

<code>    </code><code>protected</code> <code>static</code> <code>list&lt;prioritizedtask&gt; sequence = </code><code>new</code> <code>arraylist&lt;&gt;();</code>

<code>    </code><code>public</code> <code>prioritizedtask(</code><code>int</code> <code>priority) {</code>

<code>        </code><code>this</code><code>.priority = priority;</code>

<code>        </code><code>sequence.add(</code><code>this</code><code>);</code>

<code>    </code><code>}</code>

<code>    </code><code>@override</code>

<code>    </code><code>public</code> <code>int</code> <code>compareto(prioritizedtask o) {</code>

<code>        </code><code>int</code> <code>val = </code><code>this</code><code>.priority - o.priority;</code>

<code>        </code><code>//higher value, higher priority</code>

<code>        </code><code>return</code> <code>val &lt; </code><code>0</code> <code>? </code><code>1</code> <code>: (val &gt; </code><code>0</code> <code>? -</code><code>1</code> <code>: </code><code>0</code><code>);</code>

<code>    </code><code>public</code> <code>void</code> <code>run() {</code>

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>timeunit.milliseconds.sleep(random.nextint(</code><code>250</code><code>));</code>

<code>        </code><code>} </code><code>catch</code> <code>(interruptedexception e) {</code>

<code>        </code><code>}</code>

<code>        </code><code>system.out.println(</code><code>this</code><code>);</code>

<code>    </code><code>public</code> <code>string tostring() {</code>

<code>        </code><code>return</code> <code>string.format(</code><code>"p=[%1$-3d]"</code><code>, priority) + </code><code>", id="</code> <code>+ id;</code>

<code>    </code><code>public</code> <code>static</code> <code>class</code> <code>endflagtask </code><code>extends</code> <code>prioritizedtask {</code>

<code>        </code><code>private</code> <code>executorservice exec;</code>

<code>        </code><code>public</code> <code>endflagtask(executorservice executorservice) {</code>

<code>            </code><code>super</code><code>(-</code><code>1</code><code>);</code><code>//最低的优先级</code>

<code>            </code><code>exec = executorservice;</code>

<code>        </code><code>@override</code>

<code>        </code><code>public</code> <code>void</code> <code>run() {</code>

<code>            </code><code>system.out.println(</code><code>this</code> <code>+ </code><code>" calling shutdownnow()"</code><code>);</code>

<code>            </code><code>exec.shutdownnow();</code>

<code>}</code>

<code>class</code> <code>prioritizedtaskproducer </code><code>implements</code> <code>runnable {</code>

<code>    </code><code>private</code> <code>queue&lt;runnable&gt; queue;</code>

<code>    </code><code>private</code> <code>executorservice exec;</code>

<code>    </code><code>public</code> <code>prioritizedtaskproducer(queue&lt;runnable&gt; queue, executorservice exec) {</code>

<code>        </code><code>this</code><code>.queue = queue;</code>

<code>        </code><code>this</code><code>.exec = exec;</code>

<code>            </code><code>//慢慢的添加高优先级的任务</code>

<code>            </code><code>for</code> <code>(</code><code>int</code> <code>i = </code><code>0</code><code>; i &lt; </code><code>6</code><code>; i++) {</code>

<code>                </code><code>timeunit.milliseconds.sleep(</code><code>250</code><code>);</code>

<code>                </code><code>queue.add(</code><code>new</code> <code>prioritizedtask(</code><code>9</code><code>)); </code><code>//6个优先级10</code>

<code>            </code><code>}</code>

<code>            </code><code>//先创建2个p=0的任务</code>

<code>            </code><code>queue.add(</code><code>new</code> <code>prioritizedtask(</code><code>0</code><code>));</code>

<code>            </code><code>//添加低优先级的任务</code>

<code>            </code><code>for</code> <code>(</code><code>int</code> <code>i = </code><code>0</code><code>; i &lt; </code><code>6</code><code>; i++) {</code><code>// 优先级0-5</code>

<code>                </code><code>queue.add(</code><code>new</code> <code>prioritizedtask(i));</code>

<code>            </code><code>//添加一个结束标志的任务</code>

<code>            </code><code>queue.add(</code><code>new</code> <code>prioritizedtask.endflagtask(exec));</code>

<code>            </code> 

<code>            </code><code>// todo: handle exception</code>

<code>        </code><code>system.out.println(</code><code>"finished prioritizedtaskproducer."</code><code>);</code>

<code>class</code> <code>prioritizedtaskconsumer </code><code>implements</code> <code>runnable {</code>

<code>    </code><code>private</code> <code>priorityblockingqueue&lt;runnable&gt; queue;</code>

<code>    </code><code>public</code> <code>prioritizedtaskconsumer(priorityblockingqueue&lt;runnable&gt; queue) {</code>

<code>            </code><code>//不停的从queue里面取任务,直到exec停止。</code>

<code>            </code><code>while</code><code>(!thread.interrupted()) {</code>

<code>                </code><code>//使用当前线程来跑这些任务</code>

<code>                </code><code>queue.take().run();</code>

<code>        </code><code>system.out.println(</code><code>"finished prioritizedtaskconsumer."</code><code>);</code>

<code>public</code> <code>final</code> <code>class</code> <code>priorityblockingqueuedemo {</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) {</code>

<code>        </code><code>executorservice exec = executors.newcachedthreadpool();</code>

<code>        </code><code>priorityblockingqueue&lt;runnable&gt; queue = </code><code>new</code> <code>priorityblockingqueue&lt;&gt;();</code>

<code>        </code><code>exec.execute(</code><code>new</code> <code>prioritizedtaskproducer(queue, exec));</code>

<code>        </code><code>exec.execute(</code><code>new</code> <code>prioritizedtaskconsumer(queue));</code>

执行结果:

<code>p=[</code><code>9</code>  <code>], id=</code><code>1</code>

<code>p=[</code><code>9</code>  <code>], id=</code><code>2</code>

<code>p=[</code><code>9</code>  <code>], id=</code><code>3</code>

<code>p=[</code><code>9</code>  <code>], id=</code><code>4</code>

<code>p=[</code><code>9</code>  <code>], id=</code><code>5</code>

<code>finished prioritizedtaskproducer.</code>

<code>p=[</code><code>9</code>  <code>], id=</code><code>6</code>

<code>p=[</code><code>5</code>  <code>], id=</code><code>14</code>

<code>p=[</code><code>4</code>  <code>], id=</code><code>13</code>

<code>p=[</code><code>3</code>  <code>], id=</code><code>12</code>

<code>p=[</code><code>2</code>  <code>], id=</code><code>11</code>

<code>p=[</code><code>1</code>  <code>], id=</code><code>10</code>

<code>p=[</code><code>0</code>  <code>], id=</code><code>7</code>

<code>p=[</code><code>0</code>  <code>], id=</code><code>9</code>

<code>p=[</code><code>0</code>  <code>], id=</code><code>8</code>

<code>p=[-</code><code>1</code> <code>], id=</code><code>15</code> <code>calling shutdownnow()</code>

<code>finished prioritizedtaskconsumer.</code>

    prioritizedtask对象的创建序列被记录在sequencelist中,用于和实际的顺序比较。run()方法将休眠一小段随机的时间,然后打印对象信息,而endflagtask提供了停止executorservice的功能,要确保它是队列中的最后一个对象,因此给它设置了最低的优先级(-1,优先级值越大,优先级越高)。

    prioritizedtaskproducer和prioritizedtaskconsumer通过priorityblockingqueue彼此链接。因为这种队列的阻塞特性提供了所有必须的同步,所以你应该注意到了,这里不需要任何显式的同步——不必考虑当你从这种队列中读取时,其中是否有元素,因为这个队列在没有元素时,将直接阻塞读取者。

    从执行结果中可以看到,最先出队列的是priority为9的6个task,因为这几个任务先创建。

    这句话的打印表示生产者已经将所有的任务放到队列中了,由于将任务放到queue中和从queue中提取任务并执行时两个不同的任务(即producer和consumer),因此producer先输出“finished prioritizedtaskproducer.”。输出这句话的时候,前面只有5个p=9的任务出列了,因此队列中还有1个p=9的任务没出列,同时还有后续放入各种任务。由于queue中的任务里面,优先级p最高的是p=9的,因此第6个p=9的任务先出队列。剩下的任务按照p的大小依次出列。

    任务的id属性表示了它们的创建顺序,因为id是自增的,每创建一个任务,id就增加。因此从

    可以很明显的看出:p=5的任务,它的id最大,所以是最后创建的。从我们的代码中也可以看出来,p=5的任务的确是最后创建的。

    还有一点可以看出,当p相同的时候,出queue的顺序是不确定的,例如:

    另外,在使用此类的时候需要注意: