Java并發定時任務線程池--------定時任務ScheduledThreadPoolExecutor
我們了解的ThreadPoolExecutor是java的普通線程池,而ScheduledThreadPoolExecutor是java提供的定時任務線程池。今天就跟大家談一下我對定時線程池ScheduledThreadPoolExecutor的了解。
ScheduledThreadPoolExecutor的使用
常用
java.util.concurrent.ScheduledThreadPoolExecutor#schedule 定時任務
java.util.concurrent.ScheduledThreadPoolExecutor#scheduleAtFixedRate 固定速率連續執行
java.util.concurrent.ScheduledThreadPoolExecutor#scheduleWithFixedDelay非固定速率連續執行
java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue延遲隊列
源碼分析:

初始化ScheduledThreadPoolExecutor
排程核心構造器
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
scheduleAtFixedRate方法
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));//處理時間
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t); return t;
}
delayedExecute#ScheduledThreadPoolExecutor方法
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);//增加任務 子類實作了
//總結:add方法是通過DelayedWorkQueue(初始化時候指定的隊列) 延遲隊列實作 offer擷取對象的延遲
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) && //判斷是否已經停止
remove(task))
task.cancel(false);
else
ensurePrestart();
}
Offer#DelayedWorkQueue 二叉樹堆排序算法
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture e = (RunnableScheduledFuture)x; //内部類
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)//判斷是否擴容
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;//這個隊列是我們核心
setIndex(e, 0); //第一個直接設定索引和下标0
} else {
siftUp(i, e);//看這兒
}
if (queue[0] == e) {
leader = null;
available.signal();//喚醒 }
} finally {
lock.unlock();
}
return true;
}
siftUp#DelayedWorkQueue保證相同的
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
compareTo#ScheduledFutureTask
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time; //判斷time
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
我們在回到開始的地方,根據剛才我們跟代碼可以看到執行時間的順序已經配置設定好了,那如何確定work可以運作了?
確定有work執行
ensurePrestart#ThreadPoolExecutor
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
放到隊列 runwork take對象
take#DelayedWorkQueue
//調用start>run>runWorker->getTask>take方法
public RunnableScheduledFuture take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture first = queue[0];
if (first == null) //第一個有沒有 沒有等着
available.await();
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);//到時間了
if (delay <= 0)//到時間了
return finishPoll(first);
else if (leader != null)
available.await();//因為沒有執行線程初始化,是以等等什麼時候有了自己被他人喚醒
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);//各種condition的awaitNanos 帶時間的
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
finishPoll#ScheduledThreadPoolExecutor
private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
int s = --size;
RunnableScheduledFuture x = queue[s];//重新排列
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
差別scheduleAtFixedRate 和scheduleWithFixedDelay 有什麼差別嗎?
(構造方法中實作了ScheduledFutureTask)
run#ScheduledFutureTask
public void run() {
boolean periodic = isPeriodic(); //判斷是否是周期的
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();//點進去
reExecutePeriodic(outerTask);// 重置task 沒有異常捕捉
}
}
setNextRunTime#ScheduledFutureTask
long p = period;
if (p > 0) //看這個大于0 和小于0的差別
time += p; //假如延遲了這個時間早過了,+目前時候肯定還是過的。
else
time = triggerTime(-p); //取的目前的任務延遲
參數
task–這是被排程的任務。
delay–這是以毫秒為機關的延遲之前的任務執行。
period–這是在連續執行任務之間的毫秒的時間。
實戰:
實戰與工作中注意事項
有異常一定要捕獲,要不job不會執行了
單列模式(Double check lock)
雙重鎖定
釋出與逸出這塊以單列模式舉例,在高并發下如何實作一個線程安全的單列模式。
會涉及到這些知識點1、懶漢模式、2、餓漢模式、3、synchronized 4、volatile 5、枚舉、6重排序等
内部類
枚舉類
AbstractQueuedSynchronizer(AQS)同步器:
公平鎖、非公平鎖
NonfairSync
cpu 随機調用task
總體來說,ScheduedThreadPoolExecutor的重點是要了解下次執行時間的計算,以及優 先隊列的出隊、入隊和删除的過程,這兩個是了解ScheduedThreadPoolExecutor的關 鍵。