天天看点

Java并发新构件之DelayQueue

    delayqueue主要用于放置实现了delay接口的对象,其中的对象只能在其时刻到期时才能从队列中取走。这种队列是有序的,即队头的延迟到期时间最短。如果没有任何延迟到期,那么久不会有任何头元素,并且poll()将返回null(正因为这样,你不能将null放置到这种队列中)

    下面是一个示例,其中的delayed对象自身就是任务,而delayedtaskconsumer将最“紧急”的任务从队列中取出来,然后运行它:

<a href="http://my.oschina.net/itblog/blog/516670#">?</a>

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

<code>import</code> <code>java.util.arraylist;</code>

<code>import</code> <code>java.util.list;</code>

<code>import</code> <code>java.util.random;</code>

<code>import</code> <code>java.util.concurrent.delayqueue;</code>

<code>import</code> <code>java.util.concurrent.delayed;</code>

<code>import</code> <code>java.util.concurrent.executorservice;</code>

<code>import</code> <code>java.util.concurrent.executors;</code>

<code>import</code> <code>java.util.concurrent.timeunit;</code>

<code>import</code> <code>static</code> <code>java.util.concurrent.timeunit.*;</code>

<code>class</code> <code>delayedtask </code><code>implements</code> <code>runnable, delayed {</code>

<code>    </code><code>private</code> <code>static</code> <code>int</code> <code>counter = </code><code>0</code><code>;</code>

<code>    </code><code>protected</code> <code>static</code> <code>list&lt;delayedtask&gt; sequence = </code><code>new</code> <code>arraylist&lt;&gt;();</code>

<code>    </code><code>private</code> <code>final</code> <code>int</code> <code>id = counter++;</code>

<code>    </code><code>private</code> <code>final</code> <code>int</code> <code>delaytime;</code>

<code>    </code><code>private</code> <code>final</code> <code>long</code> <code>triggertime;</code>

<code>    </code><code>public</code> <code>delayedtask(</code><code>int</code> <code>delayinmillis) {</code>

<code>        </code><code>delaytime = delayinmillis;</code>

<code>        </code><code>triggertime = system.nanotime() + nanoseconds.convert(delaytime, milliseconds);</code>

<code>        </code><code>sequence.add(</code><code>this</code><code>);</code>

<code>    </code><code>}</code>

<code>    </code> 

<code>    </code><code>@override</code>

<code>    </code><code>public</code> <code>int</code> <code>compareto(delayed o) {</code>

<code>        </code><code>delayedtask that = (delayedtask)o;</code>

<code>        </code><code>if</code> <code>(triggertime &lt; that.triggertime) </code><code>return</code> <code>-</code><code>1</code><code>;</code>

<code>        </code><code>if</code> <code>(triggertime &gt; that.triggertime) </code><code>return</code> <code>1</code><code>;</code>

<code>        </code><code>return</code> <code>0</code><code>;</code>

<code>    </code><code>/**</code>

<code>     </code><code>* 剩余的延迟时间</code>

<code>     </code><code>*/</code>

<code>    </code><code>public</code> <code>long</code> <code>getdelay(timeunit unit) {</code>

<code>        </code><code>return</code> <code>unit.convert(triggertime - system.nanotime(), nanoseconds);</code>

<code>    </code><code>public</code> <code>void</code> <code>run() {</code>

<code>        </code><code>system.out.println(</code><code>this</code> <code>+ </code><code>" "</code><code>);</code>

<code>    </code><code>public</code> <code>string tostring() {</code>

<code>        </code><code>return</code> <code>string.format(</code><code>"[%1$-4d]"</code><code>, delaytime) + </code><code>" task "</code> <code>+ id;</code>

<code>    </code><code>public</code> <code>static</code> <code>class</code> <code>endsentinel </code><code>extends</code> <code>delayedtask {</code>

<code>        </code><code>private</code> <code>executorservice exec;</code>

<code>        </code><code>public</code> <code>endsentinel(</code><code>int</code> <code>delay, executorservice exec) {</code>

<code>            </code><code>super</code><code>(delay);</code>

<code>            </code><code>this</code><code>.exec = exec;</code>

<code>        </code><code>}</code>

<code>        </code><code>@override</code>

<code>        </code><code>public</code> <code>void</code> <code>run() {</code>

<code>            </code><code>system.out.println(</code><code>this</code> <code>+ </code><code>" calling shutdownnow()"</code><code>);</code>

<code>            </code><code>exec.shutdownnow();</code>

<code>}</code>

<code>class</code> <code>delayedtaskconsumer </code><code>implements</code> <code>runnable {</code>

<code>    </code><code>private</code> <code>delayqueue&lt;delayedtask&gt; tasks;</code>

<code>    </code><code>public</code> <code>delayedtaskconsumer(delayqueue&lt;delayedtask&gt; tasks) {</code>

<code>        </code><code>this</code><code>.tasks = tasks;</code>

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>while</code><code>(!thread.interrupted()) {</code>

<code>                </code><code>tasks.take().run();</code><code>//run tasks with current thread.</code>

<code>            </code><code>}</code>

<code>        </code><code>} </code><code>catch</code> <code>(interruptedexception e) {</code>

<code>            </code><code>// todo: handle exception</code>

<code>        </code><code>system.out.println(</code><code>"finished delaytedtaskconsumer."</code><code>);</code>

<code>public</code> <code>class</code> <code>delayqueuedemo {</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) {</code>

<code>        </code><code>int</code> <code>maxdelaytime = </code><code>5000</code><code>;</code><code>//milliseconds</code>

<code>        </code><code>random random = </code><code>new</code> <code>random(</code><code>47</code><code>);</code>

<code>        </code><code>executorservice exec = executors.newcachedthreadpool();</code>

<code>        </code><code>delayqueue&lt;delayedtask&gt; queue = </code><code>new</code> <code>delayqueue&lt;&gt;();</code>

<code>        </code><code>//填充10个休眠时间随机的任务</code>

<code>        </code><code>for</code> <code>(</code><code>int</code> <code>i = </code><code>0</code><code>; i &lt; </code><code>10</code><code>; i++) {</code>

<code>            </code><code>queue.put(</code><code>new</code> <code>delayedtask(random.nextint(maxdelaytime)));</code>

<code>        </code><code>//设置结束的时候。</code>

<code>        </code><code>queue.add(</code><code>new</code> <code>delayedtask.endsentinel(maxdelaytime, exec));</code>

<code>        </code><code>exec.execute(</code><code>new</code> <code>delayedtaskconsumer(queue));</code>

执行结果:

<code>[</code><code>200</code> <code>] task </code><code>7</code> 

<code>[</code><code>429</code> <code>] task </code><code>5</code> 

<code>[</code><code>555</code> <code>] task </code><code>1</code> 

<code>[</code><code>961</code> <code>] task </code><code>4</code> 

<code>[</code><code>1207</code><code>] task </code><code>9</code> 

<code>[</code><code>1693</code><code>] task </code><code>2</code> 

<code>[</code><code>1861</code><code>] task </code><code>3</code> 

<code>[</code><code>4258</code><code>] task </code><code>0</code> 

<code>[</code><code>4522</code><code>] task </code><code>8</code> 

<code>[</code><code>4868</code><code>] task </code><code>6</code> 

<code>[</code><code>5000</code><code>] task </code><code>10</code> <code>calling shutdownnow()</code>

<code>finished delaytedtaskconsumer.</code>

    delayedtask包含一个称为sequence的list&lt;delayedtask&gt;,它保存了在任务被创建的顺序,因此我们可以看到排序是按照实际发生的顺序执行的(即到期时间短的先出队列)。

    delayed接口有一个方法名为getdelay(),它可以用来告知延迟到期还有多长时间,或者延迟在多长时间之前已经到期。这个方法将强制我们去使用timeunit类,因为这就是参数类型。这会产生一个非常方便的类,因为你可以很容易地转换单位而无需做任何声明。例如,delaytime的值是以毫秒为单位的,但是system.nanotime()产生的时间则是以纳秒为单位的。你可以转换delaytime的值,方法是声明它的单位以及你希望以什么单位来表示,就像下面这样:

<code>nanoseconds.convert(delaytime, milliseconds);</code>

    为了排序,delayed接口还继承了comparable接口,因此必须实现compareto()方法,使其可以产生合理的比较。tostring()则提供了输出格式化,而嵌套的endsentinel类提供了一种关闭所有事物的途径,具体做法是将其放置为队列的最后一个元素。

    注意,因为delayedtaskconsumer自身是一个任务,所以它有自己的thread,它可以使用这个线程来运行从队列中获取的所有任务。由于任务是按照队列优先级的顺序来执行的,因此在本例中不需要启动任何单独的线程来运行delayedtask。