天天看點

Java并發新構件之PriorityBlockingQueue

    priorityblockingqueue是一個很基礎的優先級隊列,它在priorityqueue的基礎上提供了可阻塞的讀取操作。它是無限制的,就是說向queue裡面增加元素可能會失敗(導緻ourofmemoryerror)。下面是一個示例,其中在優先級隊列中的對象是按照優先級順序依次出隊列的:

<a href="http://my.oschina.net/itblog/blog/518304#">?</a>

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

<code>import</code> <code>java.util.arraylist;</code>

<code>import</code> <code>java.util.list;</code>

<code>import</code> <code>java.util.queue;</code>

<code>import</code> <code>java.util.random;</code>

<code>import</code> <code>java.util.concurrent.executorservice;</code>

<code>import</code> <code>java.util.concurrent.executors;</code>

<code>import</code> <code>java.util.concurrent.priorityblockingqueue;</code>

<code>import</code> <code>java.util.concurrent.timeunit;</code>

<code>class</code> <code>prioritizedtask </code><code>implements</code> <code>runnable, comparable&lt;prioritizedtask&gt; {</code>

<code>    </code><code>private</code> <code>static</code> <code>int</code> <code>counter = </code><code>1</code><code>;</code>

<code>    </code><code>private</code> <code>final</code> <code>int</code> <code>priority;</code>

<code>    </code><code>private</code> <code>random random = </code><code>new</code> <code>random(</code><code>47</code><code>);</code>

<code>    </code><code>private</code> <code>final</code> <code>int</code> <code>id = counter++;</code><code>//這個id不是static的,是以</code>

<code>    </code><code>protected</code> <code>static</code> <code>list&lt;prioritizedtask&gt; sequence = </code><code>new</code> <code>arraylist&lt;&gt;();</code>

<code>    </code><code>public</code> <code>prioritizedtask(</code><code>int</code> <code>priority) {</code>

<code>        </code><code>this</code><code>.priority = priority;</code>

<code>        </code><code>sequence.add(</code><code>this</code><code>);</code>

<code>    </code><code>}</code>

<code>    </code><code>@override</code>

<code>    </code><code>public</code> <code>int</code> <code>compareto(prioritizedtask o) {</code>

<code>        </code><code>int</code> <code>val = </code><code>this</code><code>.priority - o.priority;</code>

<code>        </code><code>//higher value, higher priority</code>

<code>        </code><code>return</code> <code>val &lt; </code><code>0</code> <code>? </code><code>1</code> <code>: (val &gt; </code><code>0</code> <code>? -</code><code>1</code> <code>: </code><code>0</code><code>);</code>

<code>    </code><code>public</code> <code>void</code> <code>run() {</code>

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>timeunit.milliseconds.sleep(random.nextint(</code><code>250</code><code>));</code>

<code>        </code><code>} </code><code>catch</code> <code>(interruptedexception e) {</code>

<code>        </code><code>}</code>

<code>        </code><code>system.out.println(</code><code>this</code><code>);</code>

<code>    </code><code>public</code> <code>string tostring() {</code>

<code>        </code><code>return</code> <code>string.format(</code><code>"p=[%1$-3d]"</code><code>, priority) + </code><code>", id="</code> <code>+ id;</code>

<code>    </code><code>public</code> <code>static</code> <code>class</code> <code>endflagtask </code><code>extends</code> <code>prioritizedtask {</code>

<code>        </code><code>private</code> <code>executorservice exec;</code>

<code>        </code><code>public</code> <code>endflagtask(executorservice executorservice) {</code>

<code>            </code><code>super</code><code>(-</code><code>1</code><code>);</code><code>//最低的優先級</code>

<code>            </code><code>exec = executorservice;</code>

<code>        </code><code>@override</code>

<code>        </code><code>public</code> <code>void</code> <code>run() {</code>

<code>            </code><code>system.out.println(</code><code>this</code> <code>+ </code><code>" calling shutdownnow()"</code><code>);</code>

<code>            </code><code>exec.shutdownnow();</code>

<code>}</code>

<code>class</code> <code>prioritizedtaskproducer </code><code>implements</code> <code>runnable {</code>

<code>    </code><code>private</code> <code>queue&lt;runnable&gt; queue;</code>

<code>    </code><code>private</code> <code>executorservice exec;</code>

<code>    </code><code>public</code> <code>prioritizedtaskproducer(queue&lt;runnable&gt; queue, executorservice exec) {</code>

<code>        </code><code>this</code><code>.queue = queue;</code>

<code>        </code><code>this</code><code>.exec = exec;</code>

<code>            </code><code>//慢慢的添加高優先級的任務</code>

<code>            </code><code>for</code> <code>(</code><code>int</code> <code>i = </code><code>0</code><code>; i &lt; </code><code>6</code><code>; i++) {</code>

<code>                </code><code>timeunit.milliseconds.sleep(</code><code>250</code><code>);</code>

<code>                </code><code>queue.add(</code><code>new</code> <code>prioritizedtask(</code><code>9</code><code>)); </code><code>//6個優先級10</code>

<code>            </code><code>}</code>

<code>            </code><code>//先建立2個p=0的任務</code>

<code>            </code><code>queue.add(</code><code>new</code> <code>prioritizedtask(</code><code>0</code><code>));</code>

<code>            </code><code>//添加低優先級的任務</code>

<code>            </code><code>for</code> <code>(</code><code>int</code> <code>i = </code><code>0</code><code>; i &lt; </code><code>6</code><code>; i++) {</code><code>// 優先級0-5</code>

<code>                </code><code>queue.add(</code><code>new</code> <code>prioritizedtask(i));</code>

<code>            </code><code>//添加一個結束标志的任務</code>

<code>            </code><code>queue.add(</code><code>new</code> <code>prioritizedtask.endflagtask(exec));</code>

<code>            </code> 

<code>            </code><code>// todo: handle exception</code>

<code>        </code><code>system.out.println(</code><code>"finished prioritizedtaskproducer."</code><code>);</code>

<code>class</code> <code>prioritizedtaskconsumer </code><code>implements</code> <code>runnable {</code>

<code>    </code><code>private</code> <code>priorityblockingqueue&lt;runnable&gt; queue;</code>

<code>    </code><code>public</code> <code>prioritizedtaskconsumer(priorityblockingqueue&lt;runnable&gt; queue) {</code>

<code>            </code><code>//不停的從queue裡面取任務,直到exec停止。</code>

<code>            </code><code>while</code><code>(!thread.interrupted()) {</code>

<code>                </code><code>//使用目前線程來跑這些任務</code>

<code>                </code><code>queue.take().run();</code>

<code>        </code><code>system.out.println(</code><code>"finished prioritizedtaskconsumer."</code><code>);</code>

<code>public</code> <code>final</code> <code>class</code> <code>priorityblockingqueuedemo {</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) {</code>

<code>        </code><code>executorservice exec = executors.newcachedthreadpool();</code>

<code>        </code><code>priorityblockingqueue&lt;runnable&gt; queue = </code><code>new</code> <code>priorityblockingqueue&lt;&gt;();</code>

<code>        </code><code>exec.execute(</code><code>new</code> <code>prioritizedtaskproducer(queue, exec));</code>

<code>        </code><code>exec.execute(</code><code>new</code> <code>prioritizedtaskconsumer(queue));</code>

執行結果:

<code>p=[</code><code>9</code>  <code>], id=</code><code>1</code>

<code>p=[</code><code>9</code>  <code>], id=</code><code>2</code>

<code>p=[</code><code>9</code>  <code>], id=</code><code>3</code>

<code>p=[</code><code>9</code>  <code>], id=</code><code>4</code>

<code>p=[</code><code>9</code>  <code>], id=</code><code>5</code>

<code>finished prioritizedtaskproducer.</code>

<code>p=[</code><code>9</code>  <code>], id=</code><code>6</code>

<code>p=[</code><code>5</code>  <code>], id=</code><code>14</code>

<code>p=[</code><code>4</code>  <code>], id=</code><code>13</code>

<code>p=[</code><code>3</code>  <code>], id=</code><code>12</code>

<code>p=[</code><code>2</code>  <code>], id=</code><code>11</code>

<code>p=[</code><code>1</code>  <code>], id=</code><code>10</code>

<code>p=[</code><code>0</code>  <code>], id=</code><code>7</code>

<code>p=[</code><code>0</code>  <code>], id=</code><code>9</code>

<code>p=[</code><code>0</code>  <code>], id=</code><code>8</code>

<code>p=[-</code><code>1</code> <code>], id=</code><code>15</code> <code>calling shutdownnow()</code>

<code>finished prioritizedtaskconsumer.</code>

    prioritizedtask對象的建立序列被記錄在sequencelist中,用于和實際的順序比較。run()方法将休眠一小段随機的時間,然後列印對象資訊,而endflagtask提供了停止executorservice的功能,要確定它是隊列中的最後一個對象,是以給它設定了最低的優先級(-1,優先級值越大,優先級越高)。

    prioritizedtaskproducer和prioritizedtaskconsumer通過priorityblockingqueue彼此連結。因為這種隊列的阻塞特性提供了所有必須的同步,是以你應該注意到了,這裡不需要任何顯式的同步——不必考慮當你從這種隊列中讀取時,其中是否有元素,因為這個隊列在沒有元素時,将直接阻塞讀取者。

    從執行結果中可以看到,最先出隊列的是priority為9的6個task,因為這幾個任務先建立。

    這句話的列印表示生産者已經将所有的任務放到隊列中了,由于将任務放到queue中和從queue中提取任務并執行時兩個不同的任務(即producer和consumer),是以producer先輸出“finished prioritizedtaskproducer.”。輸出這句話的時候,前面隻有5個p=9的任務出列了,是以隊列中還有1個p=9的任務沒出列,同時還有後續放入各種任務。由于queue中的任務裡面,優先級p最高的是p=9的,是以第6個p=9的任務先出隊列。剩下的任務按照p的大小依次出列。

    任務的id屬性表示了它們的建立順序,因為id是自增的,每建立一個任務,id就增加。是以從

    可以很明顯的看出:p=5的任務,它的id最大,是以是最後建立的。從我們的代碼中也可以看出來,p=5的任務的确是最後建立的。

    還有一點可以看出,當p相同的時候,出queue的順序是不确定的,例如:

    另外,在使用此類的時候需要注意: