天天看點

Java并發新構件之DelayQueue

    delayqueue主要用于放置實作了delay接口的對象,其中的對象隻能在其時刻到期時才能從隊列中取走。這種隊列是有序的,即隊頭的延遲到期時間最短。如果沒有任何延遲到期,那麼久不會有任何頭元素,并且poll()将傳回null(正因為這樣,你不能将null放置到這種隊列中)

    下面是一個示例,其中的delayed對象自身就是任務,而delayedtaskconsumer将最“緊急”的任務從隊列中取出來,然後運作它:

<a href="http://my.oschina.net/itblog/blog/516670#">?</a>

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

<code>import</code> <code>java.util.arraylist;</code>

<code>import</code> <code>java.util.list;</code>

<code>import</code> <code>java.util.random;</code>

<code>import</code> <code>java.util.concurrent.delayqueue;</code>

<code>import</code> <code>java.util.concurrent.delayed;</code>

<code>import</code> <code>java.util.concurrent.executorservice;</code>

<code>import</code> <code>java.util.concurrent.executors;</code>

<code>import</code> <code>java.util.concurrent.timeunit;</code>

<code>import</code> <code>static</code> <code>java.util.concurrent.timeunit.*;</code>

<code>class</code> <code>delayedtask </code><code>implements</code> <code>runnable, delayed {</code>

<code>    </code><code>private</code> <code>static</code> <code>int</code> <code>counter = </code><code>0</code><code>;</code>

<code>    </code><code>protected</code> <code>static</code> <code>list&lt;delayedtask&gt; sequence = </code><code>new</code> <code>arraylist&lt;&gt;();</code>

<code>    </code><code>private</code> <code>final</code> <code>int</code> <code>id = counter++;</code>

<code>    </code><code>private</code> <code>final</code> <code>int</code> <code>delaytime;</code>

<code>    </code><code>private</code> <code>final</code> <code>long</code> <code>triggertime;</code>

<code>    </code><code>public</code> <code>delayedtask(</code><code>int</code> <code>delayinmillis) {</code>

<code>        </code><code>delaytime = delayinmillis;</code>

<code>        </code><code>triggertime = system.nanotime() + nanoseconds.convert(delaytime, milliseconds);</code>

<code>        </code><code>sequence.add(</code><code>this</code><code>);</code>

<code>    </code><code>}</code>

<code>    </code> 

<code>    </code><code>@override</code>

<code>    </code><code>public</code> <code>int</code> <code>compareto(delayed o) {</code>

<code>        </code><code>delayedtask that = (delayedtask)o;</code>

<code>        </code><code>if</code> <code>(triggertime &lt; that.triggertime) </code><code>return</code> <code>-</code><code>1</code><code>;</code>

<code>        </code><code>if</code> <code>(triggertime &gt; that.triggertime) </code><code>return</code> <code>1</code><code>;</code>

<code>        </code><code>return</code> <code>0</code><code>;</code>

<code>    </code><code>/**</code>

<code>     </code><code>* 剩餘的延遲時間</code>

<code>     </code><code>*/</code>

<code>    </code><code>public</code> <code>long</code> <code>getdelay(timeunit unit) {</code>

<code>        </code><code>return</code> <code>unit.convert(triggertime - system.nanotime(), nanoseconds);</code>

<code>    </code><code>public</code> <code>void</code> <code>run() {</code>

<code>        </code><code>system.out.println(</code><code>this</code> <code>+ </code><code>" "</code><code>);</code>

<code>    </code><code>public</code> <code>string tostring() {</code>

<code>        </code><code>return</code> <code>string.format(</code><code>"[%1$-4d]"</code><code>, delaytime) + </code><code>" task "</code> <code>+ id;</code>

<code>    </code><code>public</code> <code>static</code> <code>class</code> <code>endsentinel </code><code>extends</code> <code>delayedtask {</code>

<code>        </code><code>private</code> <code>executorservice exec;</code>

<code>        </code><code>public</code> <code>endsentinel(</code><code>int</code> <code>delay, executorservice exec) {</code>

<code>            </code><code>super</code><code>(delay);</code>

<code>            </code><code>this</code><code>.exec = exec;</code>

<code>        </code><code>}</code>

<code>        </code><code>@override</code>

<code>        </code><code>public</code> <code>void</code> <code>run() {</code>

<code>            </code><code>system.out.println(</code><code>this</code> <code>+ </code><code>" calling shutdownnow()"</code><code>);</code>

<code>            </code><code>exec.shutdownnow();</code>

<code>}</code>

<code>class</code> <code>delayedtaskconsumer </code><code>implements</code> <code>runnable {</code>

<code>    </code><code>private</code> <code>delayqueue&lt;delayedtask&gt; tasks;</code>

<code>    </code><code>public</code> <code>delayedtaskconsumer(delayqueue&lt;delayedtask&gt; tasks) {</code>

<code>        </code><code>this</code><code>.tasks = tasks;</code>

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>while</code><code>(!thread.interrupted()) {</code>

<code>                </code><code>tasks.take().run();</code><code>//run tasks with current thread.</code>

<code>            </code><code>}</code>

<code>        </code><code>} </code><code>catch</code> <code>(interruptedexception e) {</code>

<code>            </code><code>// todo: handle exception</code>

<code>        </code><code>system.out.println(</code><code>"finished delaytedtaskconsumer."</code><code>);</code>

<code>public</code> <code>class</code> <code>delayqueuedemo {</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) {</code>

<code>        </code><code>int</code> <code>maxdelaytime = </code><code>5000</code><code>;</code><code>//milliseconds</code>

<code>        </code><code>random random = </code><code>new</code> <code>random(</code><code>47</code><code>);</code>

<code>        </code><code>executorservice exec = executors.newcachedthreadpool();</code>

<code>        </code><code>delayqueue&lt;delayedtask&gt; queue = </code><code>new</code> <code>delayqueue&lt;&gt;();</code>

<code>        </code><code>//填充10個休眠時間随機的任務</code>

<code>        </code><code>for</code> <code>(</code><code>int</code> <code>i = </code><code>0</code><code>; i &lt; </code><code>10</code><code>; i++) {</code>

<code>            </code><code>queue.put(</code><code>new</code> <code>delayedtask(random.nextint(maxdelaytime)));</code>

<code>        </code><code>//設定結束的時候。</code>

<code>        </code><code>queue.add(</code><code>new</code> <code>delayedtask.endsentinel(maxdelaytime, exec));</code>

<code>        </code><code>exec.execute(</code><code>new</code> <code>delayedtaskconsumer(queue));</code>

執行結果:

<code>[</code><code>200</code> <code>] task </code><code>7</code> 

<code>[</code><code>429</code> <code>] task </code><code>5</code> 

<code>[</code><code>555</code> <code>] task </code><code>1</code> 

<code>[</code><code>961</code> <code>] task </code><code>4</code> 

<code>[</code><code>1207</code><code>] task </code><code>9</code> 

<code>[</code><code>1693</code><code>] task </code><code>2</code> 

<code>[</code><code>1861</code><code>] task </code><code>3</code> 

<code>[</code><code>4258</code><code>] task </code><code>0</code> 

<code>[</code><code>4522</code><code>] task </code><code>8</code> 

<code>[</code><code>4868</code><code>] task </code><code>6</code> 

<code>[</code><code>5000</code><code>] task </code><code>10</code> <code>calling shutdownnow()</code>

<code>finished delaytedtaskconsumer.</code>

    delayedtask包含一個稱為sequence的list&lt;delayedtask&gt;,它儲存了在任務被建立的順序,是以我們可以看到排序是按照實際發生的順序執行的(即到期時間短的先出隊列)。

    delayed接口有一個方法名為getdelay(),它可以用來告知延遲到期還有多長時間,或者延遲在多長時間之前已經到期。這個方法将強制我們去使用timeunit類,因為這就是參數類型。這會産生一個非常友善的類,因為你可以很容易地轉換機關而無需做任何聲明。例如,delaytime的值是以毫秒為機關的,但是system.nanotime()産生的時間則是以納秒為機關的。你可以轉換delaytime的值,方法是聲明它的機關以及你希望以什麼機關來表示,就像下面這樣:

<code>nanoseconds.convert(delaytime, milliseconds);</code>

    為了排序,delayed接口還繼承了comparable接口,是以必須實作compareto()方法,使其可以産生合理的比較。tostring()則提供了輸出格式化,而嵌套的endsentinel類提供了一種關閉所有事物的途徑,具體做法是将其放置為隊列的最後一個元素。

    注意,因為delayedtaskconsumer自身是一個任務,是以它有自己的thread,它可以使用這個線程來運作從隊列中擷取的所有任務。由于任務是按照隊列優先級的順序來執行的,是以在本例中不需要啟動任何單獨的線程來運作delayedtask。