天天看點

Java定時任務線程池ScheduledThreadPoolExecutor

Java并發定時任務線程池--------定時任務ScheduledThreadPoolExecutor

我們了解的ThreadPoolExecutor是java的普通線程池,而ScheduledThreadPoolExecutor是java提供的定時任務線程池。今天就跟大家談一下我對定時線程池ScheduledThreadPoolExecutor的了解。

ScheduledThreadPoolExecutor的使用

常用
java.util.concurrent.ScheduledThreadPoolExecutor#schedule 定時任務
java.util.concurrent.ScheduledThreadPoolExecutor#scheduleAtFixedRate 固定速率連續執行
java.util.concurrent.ScheduledThreadPoolExecutor#scheduleWithFixedDelay非固定速率連續執行
java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue延遲隊列
           

源碼分析:

Java定時任務線程池ScheduledThreadPoolExecutor

初始化ScheduledThreadPoolExecutor

排程核心構造器

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
           

scheduleAtFixedRate方法

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));//處理時間
    RunnableScheduledFuture<Void> t = decorateTask(command, sft); 
    sft.outerTask = t;
    delayedExecute(t);    return t;
}
           

delayedExecute#ScheduledThreadPoolExecutor方法

if (isShutdown())
    reject(task);
else {
    super.getQueue().add(task);//增加任務 子類實作了
//總結:add方法是通過DelayedWorkQueue(初始化時候指定的隊列) 延遲隊列實作 offer擷取對象的延遲

    if (isShutdown() &&
        !canRunInCurrentRunState(task.isPeriodic()) && //判斷是否已經停止
        remove(task))
        task.cancel(false);
    else
        ensurePrestart();
}
           

Offer#DelayedWorkQueue 二叉樹堆排序算法

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture e = (RunnableScheduledFuture)x; //内部類
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        if (i >= queue.length)//判斷是否擴容
            grow();
        size = i + 1;
        if (i == 0) {
            queue[0] = e;//這個隊列是我們核心
            setIndex(e, 0); //第一個直接設定索引和下标0
        } else {
            siftUp(i, e);//看這兒 
        }
        if (queue[0] == e) {
            leader = null;
            available.signal();//喚醒 }
    } finally {
        lock.unlock();
    }
    return true;
}
           

siftUp#DelayedWorkQueue保證相同的

while (k > 0) {
    int parent = (k - 1) >>> 1; 
    RunnableScheduledFuture<?> e = queue[parent];
    if (key.compareTo(e) >= 0)
        break; 
    queue[k] = e;
    setIndex(e, k);
    k = parent;
}
queue[k] = key;
setIndex(key, k);
           

compareTo#ScheduledFutureTask

if (other == this) // compare zero if same object
    return 0;
if (other instanceof ScheduledFutureTask) {
    ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
    long diff = time - x.time; //判斷time
    if (diff < 0)
        return -1;
    else if (diff > 0)
        return 1;
    else if (sequenceNumber < x.sequenceNumber)
        return -1;
    else
        return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
           

我們在回到開始的地方,根據剛才我們跟代碼可以看到執行時間的順序已經配置設定好了,那如何確定work可以運作了?

確定有work執行

ensurePrestart#ThreadPoolExecutor

int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
    addWorker(null, true);
else if (wc == 0)
    addWorker(null, false);
           

放到隊列 runwork take對象

take#DelayedWorkQueue

//調用start>run>runWorker->getTask>take方法
public RunnableScheduledFuture take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture first = queue[0];
            if (first == null) //第一個有沒有 沒有等着
                available.await();
            else {
                long delay = first.getDelay(TimeUnit.NANOSECONDS);//到時間了
                if (delay <= 0)//到時間了
                    return finishPoll(first);
                else if (leader != null)
                    available.await();//因為沒有執行線程初始化,是以等等什麼時候有了自己被他人喚醒
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);//各種condition的awaitNanos 帶時間的
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}
           

finishPoll#ScheduledThreadPoolExecutor

private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
    int s = --size;
    RunnableScheduledFuture x = queue[s];//重新排列
    queue[s] = null;
    if (s != 0)
        siftDown(0, x);
    setIndex(f, -1);
    return f;
}
           

差別scheduleAtFixedRate 和scheduleWithFixedDelay 有什麼差別嗎?

(構造方法中實作了ScheduledFutureTask)

run#ScheduledFutureTask

public void run() {
    boolean periodic = isPeriodic(); //判斷是否是周期的
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();//點進去
        reExecutePeriodic(outerTask);// 重置task 沒有異常捕捉
    }
}
           

setNextRunTime#ScheduledFutureTask

long p = period;
if (p > 0)  //看這個大于0 和小于0的差別
    time += p;  //假如延遲了這個時間早過了,+目前時候肯定還是過的。
else
    time = triggerTime(-p); //取的目前的任務延遲
           

參數

task–這是被排程的任務。

delay–這是以毫秒為機關的延遲之前的任務執行。

period–這是在連續執行任務之間的毫秒的時間。

實戰:

實戰與工作中注意事項

有異常一定要捕獲,要不job不會執行了

單列模式(Double check lock)

雙重鎖定

釋出與逸出這塊以單列模式舉例,在高并發下如何實作一個線程安全的單列模式。

會涉及到這些知識點1、懶漢模式、2、餓漢模式、3、synchronized 4、volatile 5、枚舉、6重排序等

内部類

枚舉類

AbstractQueuedSynchronizer(AQS)同步器:

公平鎖、非公平鎖

NonfairSync

cpu 随機調用task

Java定時任務線程池ScheduledThreadPoolExecutor
Java定時任務線程池ScheduledThreadPoolExecutor

總體來說,ScheduedThreadPoolExecutor的重點是要了解下次執行時間的計算,以及優 先隊列的出隊、入隊和删除的過程,這兩個是了解ScheduedThreadPoolExecutor的關 鍵。