delayqueue主要用于放置實作了delay接口的對象,其中的對象隻能在其時刻到期時才能從隊列中取走。這種隊列是有序的,即隊頭的延遲到期時間最短。如果沒有任何延遲到期,那麼久不會有任何頭元素,并且poll()将傳回null(正因為這樣,你不能将null放置到這種隊列中)
下面是一個示例,其中的delayed對象自身就是任務,而delayedtaskconsumer将最“緊急”的任務從隊列中取出來,然後運作它:
<a href="http://my.oschina.net/itblog/blog/516670#">?</a>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
<code>import</code> <code>java.util.arraylist;</code>
<code>import</code> <code>java.util.list;</code>
<code>import</code> <code>java.util.random;</code>
<code>import</code> <code>java.util.concurrent.delayqueue;</code>
<code>import</code> <code>java.util.concurrent.delayed;</code>
<code>import</code> <code>java.util.concurrent.executorservice;</code>
<code>import</code> <code>java.util.concurrent.executors;</code>
<code>import</code> <code>java.util.concurrent.timeunit;</code>
<code>import</code> <code>static</code> <code>java.util.concurrent.timeunit.*;</code>
<code>class</code> <code>delayedtask </code><code>implements</code> <code>runnable, delayed {</code>
<code> </code><code>private</code> <code>static</code> <code>int</code> <code>counter = </code><code>0</code><code>;</code>
<code> </code><code>protected</code> <code>static</code> <code>list<delayedtask> sequence = </code><code>new</code> <code>arraylist<>();</code>
<code> </code><code>private</code> <code>final</code> <code>int</code> <code>id = counter++;</code>
<code> </code><code>private</code> <code>final</code> <code>int</code> <code>delaytime;</code>
<code> </code><code>private</code> <code>final</code> <code>long</code> <code>triggertime;</code>
<code> </code><code>public</code> <code>delayedtask(</code><code>int</code> <code>delayinmillis) {</code>
<code> </code><code>delaytime = delayinmillis;</code>
<code> </code><code>triggertime = system.nanotime() + nanoseconds.convert(delaytime, milliseconds);</code>
<code> </code><code>sequence.add(</code><code>this</code><code>);</code>
<code> </code><code>}</code>
<code> </code>
<code> </code><code>@override</code>
<code> </code><code>public</code> <code>int</code> <code>compareto(delayed o) {</code>
<code> </code><code>delayedtask that = (delayedtask)o;</code>
<code> </code><code>if</code> <code>(triggertime < that.triggertime) </code><code>return</code> <code>-</code><code>1</code><code>;</code>
<code> </code><code>if</code> <code>(triggertime > that.triggertime) </code><code>return</code> <code>1</code><code>;</code>
<code> </code><code>return</code> <code>0</code><code>;</code>
<code> </code><code>/**</code>
<code> </code><code>* 剩餘的延遲時間</code>
<code> </code><code>*/</code>
<code> </code><code>public</code> <code>long</code> <code>getdelay(timeunit unit) {</code>
<code> </code><code>return</code> <code>unit.convert(triggertime - system.nanotime(), nanoseconds);</code>
<code> </code><code>public</code> <code>void</code> <code>run() {</code>
<code> </code><code>system.out.println(</code><code>this</code> <code>+ </code><code>" "</code><code>);</code>
<code> </code><code>public</code> <code>string tostring() {</code>
<code> </code><code>return</code> <code>string.format(</code><code>"[%1$-4d]"</code><code>, delaytime) + </code><code>" task "</code> <code>+ id;</code>
<code> </code><code>public</code> <code>static</code> <code>class</code> <code>endsentinel </code><code>extends</code> <code>delayedtask {</code>
<code> </code><code>private</code> <code>executorservice exec;</code>
<code> </code><code>public</code> <code>endsentinel(</code><code>int</code> <code>delay, executorservice exec) {</code>
<code> </code><code>super</code><code>(delay);</code>
<code> </code><code>this</code><code>.exec = exec;</code>
<code> </code><code>}</code>
<code> </code><code>@override</code>
<code> </code><code>public</code> <code>void</code> <code>run() {</code>
<code> </code><code>system.out.println(</code><code>this</code> <code>+ </code><code>" calling shutdownnow()"</code><code>);</code>
<code> </code><code>exec.shutdownnow();</code>
<code>}</code>
<code>class</code> <code>delayedtaskconsumer </code><code>implements</code> <code>runnable {</code>
<code> </code><code>private</code> <code>delayqueue<delayedtask> tasks;</code>
<code> </code><code>public</code> <code>delayedtaskconsumer(delayqueue<delayedtask> tasks) {</code>
<code> </code><code>this</code><code>.tasks = tasks;</code>
<code> </code><code>try</code> <code>{</code>
<code> </code><code>while</code><code>(!thread.interrupted()) {</code>
<code> </code><code>tasks.take().run();</code><code>//run tasks with current thread.</code>
<code> </code><code>}</code>
<code> </code><code>} </code><code>catch</code> <code>(interruptedexception e) {</code>
<code> </code><code>// todo: handle exception</code>
<code> </code><code>system.out.println(</code><code>"finished delaytedtaskconsumer."</code><code>);</code>
<code>public</code> <code>class</code> <code>delayqueuedemo {</code>
<code> </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) {</code>
<code> </code><code>int</code> <code>maxdelaytime = </code><code>5000</code><code>;</code><code>//milliseconds</code>
<code> </code><code>random random = </code><code>new</code> <code>random(</code><code>47</code><code>);</code>
<code> </code><code>executorservice exec = executors.newcachedthreadpool();</code>
<code> </code><code>delayqueue<delayedtask> queue = </code><code>new</code> <code>delayqueue<>();</code>
<code> </code><code>//填充10個休眠時間随機的任務</code>
<code> </code><code>for</code> <code>(</code><code>int</code> <code>i = </code><code>0</code><code>; i < </code><code>10</code><code>; i++) {</code>
<code> </code><code>queue.put(</code><code>new</code> <code>delayedtask(random.nextint(maxdelaytime)));</code>
<code> </code><code>//設定結束的時候。</code>
<code> </code><code>queue.add(</code><code>new</code> <code>delayedtask.endsentinel(maxdelaytime, exec));</code>
<code> </code><code>exec.execute(</code><code>new</code> <code>delayedtaskconsumer(queue));</code>
執行結果:
<code>[</code><code>200</code> <code>] task </code><code>7</code>
<code>[</code><code>429</code> <code>] task </code><code>5</code>
<code>[</code><code>555</code> <code>] task </code><code>1</code>
<code>[</code><code>961</code> <code>] task </code><code>4</code>
<code>[</code><code>1207</code><code>] task </code><code>9</code>
<code>[</code><code>1693</code><code>] task </code><code>2</code>
<code>[</code><code>1861</code><code>] task </code><code>3</code>
<code>[</code><code>4258</code><code>] task </code><code>0</code>
<code>[</code><code>4522</code><code>] task </code><code>8</code>
<code>[</code><code>4868</code><code>] task </code><code>6</code>
<code>[</code><code>5000</code><code>] task </code><code>10</code> <code>calling shutdownnow()</code>
<code>finished delaytedtaskconsumer.</code>
delayedtask包含一個稱為sequence的list<delayedtask>,它儲存了在任務被建立的順序,是以我們可以看到排序是按照實際發生的順序執行的(即到期時間短的先出隊列)。
delayed接口有一個方法名為getdelay(),它可以用來告知延遲到期還有多長時間,或者延遲在多長時間之前已經到期。這個方法将強制我們去使用timeunit類,因為這就是參數類型。這會産生一個非常友善的類,因為你可以很容易地轉換機關而無需做任何聲明。例如,delaytime的值是以毫秒為機關的,但是system.nanotime()産生的時間則是以納秒為機關的。你可以轉換delaytime的值,方法是聲明它的機關以及你希望以什麼機關來表示,就像下面這樣:
<code>nanoseconds.convert(delaytime, milliseconds);</code>
為了排序,delayed接口還繼承了comparable接口,是以必須實作compareto()方法,使其可以産生合理的比較。tostring()則提供了輸出格式化,而嵌套的endsentinel類提供了一種關閉所有事物的途徑,具體做法是将其放置為隊列的最後一個元素。
注意,因為delayedtaskconsumer自身是一個任務,是以它有自己的thread,它可以使用這個線程來運作從隊列中擷取的所有任務。由于任務是按照隊列優先級的順序來執行的,是以在本例中不需要啟動任何單獨的線程來運作delayedtask。