簡介:
Java Timer class is thread safe and multiple threads can share a single Timer object without need for external synchronization. Timer class uses java.util.TaskQueue to add tasks at given regular interval and at any time there can be only one thread running the TimerTask, for example if you are creating a Timer to run every 10 seconds but single thread execution takes 20 seconds, then Timer object will keep adding tasks to the queue and as soon as one thread is finished, it will notify the queue and another thread will start executing.
Java Timer class uses Object wait and notify methods to schedule the tasks.
要點:
1. Timer中的schedule功能提供了一個添加任務定時執行的功能,他支援多線程。這個功能的實作概括來講就是用一個優先隊列,建立任務(Task),把任務按實行時間加到優先隊列裡。睡若幹秒直到第一個任務。
2. Timer用一個TaskQueue,就是一個優先隊列,可以把任務(Task) 按實行時間加到優先隊列裡。執行的時候,隻有一個線程可以進行操作。提到了一個例子,就是如果一個任務間隔10s,但是執行花20s,還是得等執行完以後,新的任務才能添加。(這時加入隊列的時間會為負數,也就是說立即實行)
3.運用java多線程裡的wait和notify。
4. 對于設計添加任務定時執行的功能的系統設計,一個好的切入點就是先從單線程/多線程開始,拓展到多機器。任務定時的單線程版本最簡單流行的實作就是Java裡Timer中的schedule功能。是以正好對應這個題目。
源碼分析
1. Timer
1)變量
Timer class 的結構很清晰,隻有兩個變量。一個TaskQueue,一個TimerThread。TimerThread是一個有TaskQueue執行個體的線程。
1
7 private final TaskQueue queue = newTaskQueue();8
9
12 private final TimerThread thread = new TimerThread(queue);
2) constructor
4種構造方法分别對應timer的線程是否是守護線程,線程是否有名字。
* @param name the name of the associated thread
* @param isDaemon true if the associated thread should run as a daemon
1 publicTimer() {2 this("Timer-" +serialNumber());3 }4
5 public Timer(booleanisDaemon) {6 this("Timer-" +serialNumber(), isDaemon);7 }8
9 publicTimer(String name) {10 thread.setName(name);11 thread.start();12 }1314 public Timer(String name, booleanisDaemon) {
15 thread.setName(name);16 thread.setDaemon(isDaemon);17 thread.start();18 }
3)schedule方法:
一共有6種public的schedule方法,第1,2種為執行一次。剩下4種為定期執行,其中根據對延期的不用處理,分為間隔優先(fix-delay) / 時間優先(fix-rate)。
這6種schedule方法都call同一個private的sched方法 - sched(TimerTask task, long time, long period),差別是執行一次的period變量設為0,間隔優先設為period的負數,時間優先設為period本身。這樣不用多傳一個執行類型的變量了。
1 // 1. 執行一次2
5 public void schedule(TimerTask task, longdelay) {6 if (delay < 0)7 throw new IllegalArgumentException("Negative delay.");8 sched(task, System.currentTimeMillis()+delay, 0);9 }10
11
15 public voidschedule(TimerTask task, Date time) {16 sched(task, time.getTime(), 0);17 }18 // 2. 定時執行 - fixed-delay 政策 适用于要求間隔盡量一緻,而不是必須某時間執行的需求19
29 public void schedule(TimerTask task, long delay, long period) {
30 if (delay < 0)
31 throw new IllegalArgumentException("Negative delay.");
32 if (period <= 0)
33 throw new IllegalArgumentException("Non-positive period.");
34 sched(task, System.currentTimeMillis()+delay, -period);
35 }
36
37 public void schedule(TimerTask task, Date firstTime, long period) {38 if (period <= 0)39 throw new IllegalArgumentException("Non-positive period.");40 sched(task, firstTime.getTime(), -period);41 }42 3. 定時執行 - fixed-rate 政策 适用于必須某時間執行的需求
43 56 public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
57 if (delay < 0)
58 throw new IllegalArgumentException("Negative delay.");
59 if (period <= 0)
60 throw new IllegalArgumentException("Non-positive period.");
61 sched(task, System.currentTimeMillis()+delay, period);
62 }
63
64 public void scheduleAtFixedRate(TimerTask task, Date firstTime,
65 long period) {
66 if (period <= 0)
67 throw new IllegalArgumentException("Non-positive period.");
68 sched(task, firstTime.getTime(), period);
69 }
再來看call的這個方法,基本上就是把可執行的task放入優先隊列,過程中分别對優先隊列和task加鎖。
1
9 private void sched(TimerTask task, long time, longperiod) {10 if (time < 0)11 throw new IllegalArgumentException("Illegal execution time.");12
13 //Constrain value of period sufficiently to prevent numeric14 //overflow while still being effectively infinitely large.
15 if (Math.abs(period) > (Long.MAX_VALUE >> 1))16 period >>= 1;17
18 synchronized(queue) {19 if (!thread.newTasksMayBeScheduled)20 throw new IllegalStateException("Timer already cancelled.");21
22 synchronized(task.lock) {23 if (task.state !=TimerTask.VIRGIN)24 throw newIllegalStateException(25 "Task already scheduled or cancelled");26 task.nextExecutionTime =time;27 task.period =period;28 task.state =TimerTask.SCHEDULED; // 加入隊列,狀态改為SCHEDULED29 }30
31 queue.add(task);32 if (queue.getMin() ==task)33 queue.notify(); // 如果新放入的task的執行時間是最近的,喚醒優先隊列。因為要更新等待時間。34 }35 }
1
4 voidadd(TimerTask task) {5 //Grow backing store if necessary
6 if (size + 1 ==queue.length)7 queue = Arrays.copyOf(queue, 2*queue.length);8
9 queue[++size] =task;10 fixUp(size);11 }
2. TimerThread
TimerThread類的變量和constructor 主要就是優先隊列和一個有無任務的flag。
這裡有一個點是TaskQueue這個變量在TimerThread, Timer中是共享的。這樣Timer有TimerThread,TaskQueue的reference,TimerThread 有TaskQueue 的reference,可以讓Timer進行garbage-collection。
1
8 boolean newTasksMayBeScheduled = true;9
10
16 privateTaskQueue queue;17
18 TimerThread(TaskQueue queue) {19 this.queue =queue;20 }
TimerThread裡的run 方法,一個直到優先隊列為空且newTaskMayBeScheduled為否跳出的無限循環,一開始一直等待直到有元素加入,如果優先隊列有元素,等待若幹秒直到第一個任務的執行時間,執行任務,如果是多次執行的任務,計算下個執行時間加入隊列。
1 public voidrun() {2 try{3 mainLoop();4 } finally{5 //Someone killed this Thread, behave as if Timer cancelled
6 synchronized(queue) {7 newTasksMayBeScheduled = false;8 queue.clear(); //Eliminate obsolete references
9 }10 }11 }
1 private voidmainLoop() {2 while (true) {3 try{4 TimerTask task;5 booleantaskFired;6 synchronized(queue) {7 //Wait for queue to become non-empty
8 while (queue.isEmpty() &&newTasksMayBeScheduled)9 queue.wait();10 if(queue.isEmpty())11 break; //Queue is empty and will forever remain; die12
13 //Queue nonempty; look at first evt and do the right thing
14 longcurrentTime, executionTime;15 task =queue.getMin();16 synchronized(task.lock) {17 if (task.state ==TimerTask.CANCELLED) {18 queue.removeMin();19 continue; //No action required, poll queue again
20 }21 currentTime =System.currentTimeMillis();22 executionTime =task.nextExecutionTime;23 if (taskFired = (executionTime<=currentTime)) {//最近的執行時間小于目前時間,set taskFired為true
24 if (task.period == 0) { //一次執行 Non-repeating, remove
25 queue.removeMin();26 task.state =TimerTask.EXECUTED;27 } else { //Repeating task, reschedule 多次執行28 //period小于0: 下次執行時間基于目前時間。period大于0: 下次執行時間基于這次執行時間。
29 queue.rescheduleMin(30 task.period<0 ? currentTime -task.period31 : executionTime +task.period);32 }33 }34 }35 if (!taskFired) //Task hasn't yet fired; wait
36 queue.wait(executionTime -currentTime);37 }38 if (taskFired) //Task fired; run it, holding no locks
39 task.run();40 } catch(InterruptedException e) {41 }42 }43 }
3. TaskQueue
TaskQueue 就是一個TimerTask作為内容,nextExecutionTime為排序依據的priorityQueue(在注解中叫做balanced binary heap)。他的實作是一個1為base的,初始大小為128的TimerTask 數組。
1
9 private TimerTask[] queue = new TimerTask[128];10
11
15 private int size = 0;
接下來就是standard的heap裡的方法。
1
5 void rescheduleMin(longnewTime) { // reset隊列頂的重複任務的執行時間,再放到相應的位置6 queue[1].nextExecutionTime =newTime;7 fixDown(1);8 }9
10
19 private void fixUp(intk) { // addTask 方法:把新的task加入arr最後,然後以size為參數call這個fixUp方法,這樣新加入的task按時間排到相應的位置。20 while (k > 1) {21 int j = k >> 1;22 if (queue[j].nextExecutionTime <=queue[k].nextExecutionTime)23 break;24 TimerTask tmp = queue[j];
queue[j] = queue[k];
queue[k] =tmp;25 k =j;26 }27 }28
29
39 private void fixDown(intk) {40 intj;41 while ((j = k << 1) <= size && j > 0) {42 if (j < size && queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)44 j++; //j indexes smallest kid
45 if (queue[k].nextExecutionTime <=queue[j].nextExecutionTime)46 break;47 TimerTask tmp = queue[j]; // 與小的兒子交換
queue[j] = queue[k];
queue[k] =tmp;48 k =j;49 }50 }51
52
56 voidheapify() { // purge方法調用heapify重新排序。57 for (int i = size/2; i >= 1; i--)58 fixDown(i);59 }
4. Timer中其他方法:
Timer類中,還包括方法如cancel,purge。
1
8 public voidcancel() {9 synchronized(queue) {10 thread.newTasksMayBeScheduled = false;11 queue.clear();12 queue.notify(); //In case queue was already empty.
13 }14 }15
16
30 public intpurge() {31 int result = 0;32
33 synchronized(queue) {34 for (int i = queue.size(); i > 0; i--) {35 if (queue.get(i).state ==TimerTask.CANCELLED) {36 queue.quickRemove(i);37 result++;38 }39 }40
41 if (result != 0)42 queue.heapify();43 }44
45 returnresult;46 }
5. TimerTask 類
java中TimerTask是一個implements Runnable的抽象類,是以需要自己建一個concrete class。
1 public abstract class TimerTask implementsRunnable {2
5 final Object lock = newObject();6
7
10 int state =VIRGIN;11
12
15 static final int VIRGIN = 0;16
17
21 static final int SCHEDULED = 1;22
23
27 static final int EXECUTED = 2;28
29
32 static final int CANCELLED = 3;33
34
39 longnextExecutionTime;40
41
46 long period = 0;47
48
51 protectedTimerTask() {52 }53
54
57 public abstract voidrun();58
59
66 public booleancancel() {67 synchronized(lock) {68 boolean result = (state == SCHEDULED); //如果EXECUTED或CANCELLED,result = false;
69 state =CANCELLED;70 returnresult;71 }72 }73
74 /**
75 * Returns the scheduled execution time of the most recent76 * actual execution of this task. (If this method is invoked77 * while task execution is in progress, the return value is the scheduled78 * execution time of the ongoing task execution.)79 *80 *
This method is typically invoked from within a task's run method, to81 * determine whether the current execution of the task is sufficiently82 * timely to warrant performing the scheduled activity:83 *
{@code
84 * public void run() {85 * if (System.currentTimeMillis() - scheduledExecutionTime() >=86 * MAX_TARDINESS)87 * return; // Too late; skip this execution.88 * // Perform the task89 * }90 * }
91 * This method is typically not used in conjunction with92 * fixed-delay execution repeating tasks, as their scheduled93 * execution times are allowed to drift over time, and so are not terribly94 * significant.95 *96 *@returnthe time at which the most recent execution of this task was97 * scheduled to occur, in the format returned by Date.getTime().98 * The return value is undefined if the task has yet to commence99 * its first execution.100 *@seeDate#getTime()101 */
102 public longscheduledExecutionTime() { // 通過這個方法,在task的concrete class 裡可以得到這個task下次運作時間是什麼。103 synchronized(lock) {104 return (period < 0 ? nextExecutionTime +period105 : nextExecutionTime -period);106 }107 }108 }
5. 例子
1 public classTimerTest {2 class MyTimerTask extendsTimerTask {3 @Override4 public voidrun() {5 if (System.currentTimeMillis() - scheduledExecutionTime() >= 500) {6 return; //Too late; skip this execution.
7 }8 //Perform the task
9 System.out.println("This is my timer task." + newDate());10 }11 }12
13 public static voidmain(String[] args) {14 TimerTest tt = newTimerTest();15 tt.testTimer();16 }17
18 public voidtestTimer() {19 Timer timer = newTimer();20 MyTimerTask myTimerTask = newMyTimerTask();21
22 timer.schedule(myTimerTask, 1000, 1000); //delay 1 秒後,每秒運作一次
23 try{24 Thread.sleep(3000); //主線程睡3秒,
25 } catch(InterruptedException e) {26
27 }28 timer.cancel(); //主線程醒來,結束timer
29 }30 }
大體的diagram是這樣的:

reference:
https://www.journaldev.com/1050/java-timer-timertask-example