天天看点

java timertask源码_Java里的Timer源码分析

简介:

Java Timer class is thread safe and multiple threads can share a single Timer object without need for external synchronization. Timer class uses java.util.TaskQueue to add tasks at given regular interval and at any time there can be only one thread running the TimerTask, for example if you are creating a Timer to run every 10 seconds but single thread execution takes 20 seconds, then Timer object will keep adding tasks to the queue and as soon as one thread is finished, it will notify the queue and another thread will start executing.

Java Timer class uses Object wait and notify methods to schedule the tasks.

要点:

1. Timer中的schedule功能提供了一个添加任务定时执行的功能,他支持多线程。这个功能的实现概括来讲就是用一个优先队列,建立任务(Task),把任务按实行时间加到优先队列里。睡若干秒直到第一个任务。

2. Timer用一个TaskQueue,就是一个优先队列,可以把任务(Task) 按实行时间加到优先队列里。执行的时候,只有一个线程可以进行操作。提到了一个例子,就是如果一个任务间隔10s,但是执行花20s,还是得等执行完以后,新的任务才能添加。(这时加入队列的时间会为负数,也就是说立即实行)

3.运用java多线程里的wait和notify。

4. 对于设计添加任务定时执行的功能的系统设计,一个好的切入点就是先从单线程/多线程开始,拓展到多机器。任务定时的单线程版本最简单流行的实现就是Java里Timer中的schedule功能。所以正好对应这个题目。

源码分析

1. Timer

1)变量

Timer class 的结构很清晰,只有两个变量。一个TaskQueue,一个TimerThread。TimerThread是一个有TaskQueue实例的线程。

1

7 private final TaskQueue queue = newTaskQueue();8

9

12 private final TimerThread thread = new TimerThread(queue);

2) constructor

4种构造方法分别对应timer的线程是否是守护线程,线程是否有名字。

* @param name the name of the associated thread

* @param isDaemon true if the associated thread should run as a daemon

1 publicTimer() {2 this("Timer-" +serialNumber());3 }4

5 public Timer(booleanisDaemon) {6 this("Timer-" +serialNumber(), isDaemon);7 }8

9 publicTimer(String name) {10 thread.setName(name);11 thread.start();12 }1314 public Timer(String name, booleanisDaemon) {

15 thread.setName(name);16 thread.setDaemon(isDaemon);17 thread.start();18 }

3)schedule方法:

一共有6种public的schedule方法,第1,2种为执行一次。剩下4种为定期执行,其中根据对延期的不用处理,分为间隔优先(fix-delay) / 时间优先(fix-rate)。

这6种schedule方法都call同一个private的sched方法 - sched(TimerTask task, long time, long period),区别是执行一次的period变量设为0,间隔优先设为period的负数,时间优先设为period本身。这样不用多传一个执行类型的变量了。

1 // 1. 执行一次2

5 public void schedule(TimerTask task, longdelay) {6 if (delay < 0)7 throw new IllegalArgumentException("Negative delay.");8 sched(task, System.currentTimeMillis()+delay, 0);9 }10

11

15 public voidschedule(TimerTask task, Date time) {16 sched(task, time.getTime(), 0);17 }18 // 2. 定时执行 - fixed-delay 策略 适用于要求间隔尽量一致,而不是必须某时间执行的需求19

29 public void schedule(TimerTask task, long delay, long period) {

30 if (delay < 0)

31 throw new IllegalArgumentException("Negative delay.");

32 if (period <= 0)

33 throw new IllegalArgumentException("Non-positive period.");

34 sched(task, System.currentTimeMillis()+delay, -period);

35 }

36

37 public void schedule(TimerTask task, Date firstTime, long period) {38 if (period <= 0)39 throw new IllegalArgumentException("Non-positive period.");40 sched(task, firstTime.getTime(), -period);41 }42 3. 定时执行 - fixed-rate 策略 适用于必须某时间执行的需求

43 56 public void scheduleAtFixedRate(TimerTask task, long delay, long period) {

57 if (delay < 0)

58 throw new IllegalArgumentException("Negative delay.");

59 if (period <= 0)

60 throw new IllegalArgumentException("Non-positive period.");

61 sched(task, System.currentTimeMillis()+delay, period);

62 }

63

64 public void scheduleAtFixedRate(TimerTask task, Date firstTime,

65 long period) {

66 if (period <= 0)

67 throw new IllegalArgumentException("Non-positive period.");

68 sched(task, firstTime.getTime(), period);

69 }

再来看call的这个方法,基本上就是把可执行的task放入优先队列,过程中分别对优先队列和task加锁。

1

9 private void sched(TimerTask task, long time, longperiod) {10 if (time < 0)11 throw new IllegalArgumentException("Illegal execution time.");12

13 //Constrain value of period sufficiently to prevent numeric14 //overflow while still being effectively infinitely large.

15 if (Math.abs(period) > (Long.MAX_VALUE >> 1))16 period >>= 1;17

18 synchronized(queue) {19 if (!thread.newTasksMayBeScheduled)20 throw new IllegalStateException("Timer already cancelled.");21

22 synchronized(task.lock) {23 if (task.state !=TimerTask.VIRGIN)24 throw newIllegalStateException(25 "Task already scheduled or cancelled");26 task.nextExecutionTime =time;27 task.period =period;28 task.state =TimerTask.SCHEDULED; // 加入队列,状态改为SCHEDULED29 }30

31 queue.add(task);32 if (queue.getMin() ==task)33 queue.notify(); // 如果新放入的task的执行时间是最近的,唤醒优先队列。因为要更新等待时间。34 }35 }

1

4 voidadd(TimerTask task) {5 //Grow backing store if necessary

6 if (size + 1 ==queue.length)7 queue = Arrays.copyOf(queue, 2*queue.length);8

9 queue[++size] =task;10 fixUp(size);11 }

2. TimerThread

TimerThread类的变量和constructor 主要就是优先队列和一个有无任务的flag。

这里有一个点是TaskQueue这个变量在TimerThread, Timer中是共享的。这样Timer有TimerThread,TaskQueue的reference,TimerThread 有TaskQueue 的reference,可以让Timer进行garbage-collection。

1

8 boolean newTasksMayBeScheduled = true;9

10

16 privateTaskQueue queue;17

18 TimerThread(TaskQueue queue) {19 this.queue =queue;20 }

TimerThread里的run 方法,一个直到优先队列为空且newTaskMayBeScheduled为否跳出的无限循环,一开始一直等待直到有元素加入,如果优先队列有元素,等待若干秒直到第一个任务的执行时间,执行任务,如果是多次执行的任务,计算下个执行时间加入队列。

1 public voidrun() {2 try{3 mainLoop();4 } finally{5 //Someone killed this Thread, behave as if Timer cancelled

6 synchronized(queue) {7 newTasksMayBeScheduled = false;8 queue.clear(); //Eliminate obsolete references

9 }10 }11 }

1 private voidmainLoop() {2 while (true) {3 try{4 TimerTask task;5 booleantaskFired;6 synchronized(queue) {7 //Wait for queue to become non-empty

8 while (queue.isEmpty() &&newTasksMayBeScheduled)9 queue.wait();10 if(queue.isEmpty())11 break; //Queue is empty and will forever remain; die12

13 //Queue nonempty; look at first evt and do the right thing

14 longcurrentTime, executionTime;15 task =queue.getMin();16 synchronized(task.lock) {17 if (task.state ==TimerTask.CANCELLED) {18 queue.removeMin();19 continue; //No action required, poll queue again

20 }21 currentTime =System.currentTimeMillis();22 executionTime =task.nextExecutionTime;23 if (taskFired = (executionTime<=currentTime)) {//最近的执行时间小于当前时间,set taskFired为true

24 if (task.period == 0) { //一次执行 Non-repeating, remove

25 queue.removeMin();26 task.state =TimerTask.EXECUTED;27 } else { //Repeating task, reschedule 多次执行28 //period小于0: 下次执行时间基于当前时间。period大于0: 下次执行时间基于这次执行时间。

29 queue.rescheduleMin(30 task.period<0 ? currentTime -task.period31 : executionTime +task.period);32 }33 }34 }35 if (!taskFired) //Task hasn't yet fired; wait

36 queue.wait(executionTime -currentTime);37 }38 if (taskFired) //Task fired; run it, holding no locks

39 task.run();40 } catch(InterruptedException e) {41 }42 }43 }

3. TaskQueue

TaskQueue 就是一个TimerTask作为内容,nextExecutionTime为排序依据的priorityQueue(在注解中叫做balanced binary heap)。他的实现是一个1为base的,初始大小为128的TimerTask 数组。

1

9 private TimerTask[] queue = new TimerTask[128];10

11

15 private int size = 0;

接下来就是standard的heap里的方法。

1

5 void rescheduleMin(longnewTime) { // reset队列顶的重复任务的执行时间,再放到相应的位置6 queue[1].nextExecutionTime =newTime;7 fixDown(1);8 }9

10

19 private void fixUp(intk) { // addTask 方法:把新的task加入arr最后,然后以size为参数call这个fixUp方法,这样新加入的task按时间排到相应的位置。20 while (k > 1) {21 int j = k >> 1;22 if (queue[j].nextExecutionTime <=queue[k].nextExecutionTime)23 break;24 TimerTask tmp = queue[j];

queue[j] = queue[k];

queue[k] =tmp;25 k =j;26 }27 }28

29

39 private void fixDown(intk) {40 intj;41 while ((j = k << 1) <= size && j > 0) {42 if (j < size && queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)44 j++; //j indexes smallest kid

45 if (queue[k].nextExecutionTime <=queue[j].nextExecutionTime)46 break;47 TimerTask tmp = queue[j]; // 与小的儿子交换

queue[j] = queue[k];

queue[k] =tmp;48 k =j;49 }50 }51

52

56 voidheapify() { // purge方法调用heapify重新排序。57 for (int i = size/2; i >= 1; i--)58 fixDown(i);59 }

4. Timer中其他方法:

Timer类中,还包括方法如cancel,purge。

1

8 public voidcancel() {9 synchronized(queue) {10 thread.newTasksMayBeScheduled = false;11 queue.clear();12 queue.notify(); //In case queue was already empty.

13 }14 }15

16

30 public intpurge() {31 int result = 0;32

33 synchronized(queue) {34 for (int i = queue.size(); i > 0; i--) {35 if (queue.get(i).state ==TimerTask.CANCELLED) {36 queue.quickRemove(i);37 result++;38 }39 }40

41 if (result != 0)42 queue.heapify();43 }44

45 returnresult;46 }

5. TimerTask 类

java中TimerTask是一个implements Runnable的抽象类,所以需要自己建一个concrete class。

1 public abstract class TimerTask implementsRunnable {2

5 final Object lock = newObject();6

7

10 int state =VIRGIN;11

12

15 static final int VIRGIN = 0;16

17

21 static final int SCHEDULED = 1;22

23

27 static final int EXECUTED = 2;28

29

32 static final int CANCELLED = 3;33

34

39 longnextExecutionTime;40

41

46 long period = 0;47

48

51 protectedTimerTask() {52 }53

54

57 public abstract voidrun();58

59

66 public booleancancel() {67 synchronized(lock) {68 boolean result = (state == SCHEDULED); //如果EXECUTED或CANCELLED,result = false;

69 state =CANCELLED;70 returnresult;71 }72 }73

74 /**

75 * Returns the scheduled execution time of the most recent76 * actual execution of this task. (If this method is invoked77 * while task execution is in progress, the return value is the scheduled78 * execution time of the ongoing task execution.)79 *80 *

This method is typically invoked from within a task's run method, to81 * determine whether the current execution of the task is sufficiently82 * timely to warrant performing the scheduled activity:83 *

{@code      

84 * public void run() {85 * if (System.currentTimeMillis() - scheduledExecutionTime() >=86 * MAX_TARDINESS)87 * return; // Too late; skip this execution.88 * // Perform the task89 * }90 * }

91 * This method is typically not used in conjunction with92 * fixed-delay execution repeating tasks, as their scheduled93 * execution times are allowed to drift over time, and so are not terribly94 * significant.95 *96 *@returnthe time at which the most recent execution of this task was97 * scheduled to occur, in the format returned by Date.getTime().98 * The return value is undefined if the task has yet to commence99 * its first execution.100 *@seeDate#getTime()101 */

102 public longscheduledExecutionTime() { // 通过这个方法,在task的concrete class 里可以得到这个task下次运行时间是什么。103 synchronized(lock) {104 return (period < 0 ? nextExecutionTime +period105 : nextExecutionTime -period);106 }107 }108 }

5. 例子

1 public classTimerTest {2 class MyTimerTask extendsTimerTask {3 @Override4 public voidrun() {5 if (System.currentTimeMillis() - scheduledExecutionTime() >= 500) {6 return; //Too late; skip this execution.

7 }8 //Perform the task

9 System.out.println("This is my timer task." + newDate());10 }11 }12

13 public static voidmain(String[] args) {14 TimerTest tt = newTimerTest();15 tt.testTimer();16 }17

18 public voidtestTimer() {19 Timer timer = newTimer();20 MyTimerTask myTimerTask = newMyTimerTask();21

22 timer.schedule(myTimerTask, 1000, 1000); //delay 1 秒后,每秒运行一次

23 try{24 Thread.sleep(3000); //主线程睡3秒,

25 } catch(InterruptedException e) {26

27 }28 timer.cancel(); //主线程醒来,结束timer

29 }30 }

大体的diagram是这样的:

java timertask源码_Java里的Timer源码分析

reference:

https://www.journaldev.com/1050/java-timer-timertask-example