天天看點

線程池核心之ScheduledThreadPoolExecutor

線程池核心之ScheduledThreadPoolExecutor

從線程池的UML類圖可以看到,ScheduledThreadPoolExecutor是ThreadPoolExecutor的子類,是以它擁有ThreadPoolExecutor的所有功能,同時它又實作了ScheduledExecutorService接口,是以,這個接口的實作又賦予它新的功能,那就是定時任務的功能。

ScheduledExecutorService

/**
     * Creates and executes a one-shot action that becomes enabled
     * after the given delay.
     *
     * @param command the task to execute
     * @param delay the time from now to delay execution
     * @param unit the time unit of the delay parameter
     * @return a ScheduledFuture representing pending completion of
     *         the task and whose {@code get()} method will return
     *         {@code null} upon completion
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if command is null
     */
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    /**
     * Creates and executes a ScheduledFuture that becomes enabled after the
     * given delay.
     *
     * @param callable the function to execute
     * @param delay the time from now to delay execution
     * @param unit the time unit of the delay parameter
     * @param <V> the type of the callable's result
     * @return a ScheduledFuture that can be used to extract result or cancel
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if callable is null
     */
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    /**
     * Creates and executes a periodic action that becomes enabled first
     * after the given initial delay, and subsequently with the given
     * period; that is executions will commence after
     * {@code initialDelay} then {@code initialDelay+period}, then
     * {@code initialDelay + 2 * period}, and so on.
     * If any execution of the task
     * encounters an exception, subsequent executions are suppressed.
     * Otherwise, the task will only terminate via cancellation or
     * termination of the executor.  If any execution of this task
     * takes longer than its period, then subsequent executions
     * may start late, but will not concurrently execute.
     *
     * @param command the task to execute
     * @param initialDelay the time to delay first execution
     * @param period the period between successive executions
     * @param unit the time unit of the initialDelay and period parameters
     * @return a ScheduledFuture representing pending completion of
     *         the task, and whose {@code get()} method will throw an
     *         exception upon cancellation
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if command is null
     * @throws IllegalArgumentException if period less than or equal to zero
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    /**
     * Creates and executes a periodic action that becomes enabled first
     * after the given initial delay, and subsequently with the
     * given delay between the termination of one execution and the
     * commencement of the next.  If any execution of the task
     * encounters an exception, subsequent executions are suppressed.
     * Otherwise, the task will only terminate via cancellation or
     * termination of the executor.
     *
     * @param command the task to execute
     * @param initialDelay the time to delay first execution
     * @param delay the delay between the termination of one
     * execution and the commencement of the next
     * @param unit the time unit of the initialDelay and delay parameters
     * @return a ScheduledFuture representing pending completion of
     *         the task, and whose {@code get()} method will throw an
     *         exception upon cancellation
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if command is null
     * @throws IllegalArgumentException if delay less than or equal to zero
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
           

接口ScheduledExecutorService有四個方法:

1、ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit),延遲執行一個線程

2、ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit),延遲執行一個有結果回調的線程

3、ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,long period,TimeUnit unit),定時執行一個線程,下一次開始的時間是從上一個任務開始時間算起+period

4、ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit),定時執行一個任務,下一次開始的時間從上一個任務執行結束的時間+delay的時間算起

scheduleAtFixedRate

public class MyThreadPool {
    static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);

    public static void main(String[] args) {
        System.out.println("StartTime" + (formatDate(System.currentTimeMillis())));
        scheduledThreadPoolExecutor.scheduleAtFixedRate(new Thread(new MyThread()), 0, 10, TimeUnit.SECONDS);
    }

    private static class MyThread implements Runnable {

        @Override
        public void run() {
            System.out.println("run:" + (formatDate(System.currentTimeMillis())));
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("sleep:" + (formatDate(System.currentTimeMillis())));
        }
    }

    public static String formatDate(long timeLong) {
        SimpleDateFormat sdf = new SimpleDateFormat("mm-ss");
        return sdf.format(timeLong);
    }
}
           

運作結果

1、任務執行完成延遲2秒

線程池核心之ScheduledThreadPoolExecutor

2、任務執行完成延遲11秒,大于period時間

線程池核心之ScheduledThreadPoolExecutor

run是任務開始執行的時間,sleep是為了示範耗時的任務操作,由結果可以得出一個結論:scheduleAtFixedRate下次開始的時間是以上個任務開始時間算起的,并且要等到上個任務執行結束,下個任務才開始執行。

scheduleWithFixedDelay

public class MyThreadPool {
    static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);

    public static void main(String[] args) {
        System.out.println("StartTime" + (formatDate(System.currentTimeMillis())));
        scheduledThreadPoolExecutor.scheduleWithFixedDelay(new Thread(new MyThread()), 0, 10, TimeUnit.SECONDS);
    }

    private static class MyThread implements Runnable {

        @Override
        public void run() {
            System.out.println("run:" + (formatDate(System.currentTimeMillis())));
            try {
                Thread.sleep(11000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("sleep:" + (formatDate(System.currentTimeMillis())));
        }
    }

    public static String formatDate(long timeLong) {
        SimpleDateFormat sdf = new SimpleDateFormat("mm-ss");
        return sdf.format(timeLong);
    }
}
           

任務延遲2秒:

線程池核心之ScheduledThreadPoolExecutor

任務延遲11秒:

線程池核心之ScheduledThreadPoolExecutor

run是任務開始執行的時間,sleep是為了示範耗時的任務操作,由結果可以得出一個結論:scheduleWithFixedDelay下次開始的時間是以上個任務完成時間算起的。

ScheduledThreadPoolExecutor的運作機制

線程池核心之ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor的執行示意圖(本文基于JDK8)如上圖。

DelayedWorkQueue是一個無界隊列,是以ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中是沒有任何意義的(設定maximumPoolSize的大小是沒有任何效果的)

ScheduledThreadPoolExecutor的執行主要包含兩部分:

1)當調用ScheduledThreadPoolExecutor的scheduledAtFixedRate()或者scheduledWithFixedDelay()方法時,會向ScheduledThreadPoolExecutor的DelayWorkQueue中的數組中添加一個RunnableScheduledFuture。

2)線程池中的線程從數組中擷取一個RunnableScheduledFuture,然後執行任務。

源碼求證

scheduleAtFixedRate
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);    @1
        return t;
    }
           

@1 延遲執行

delayedExecute(t)
private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task); @2
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }
           

@2 在隊列中加入該任務,而該隊列就是DelayedWorkQueue,是以,進入DelayedWorkQueue的add() 方法

public boolean add(Runnable e) {
            return offer(e); @3
        }
           

@3 最終調用的是隊列的offer() 方法

offer(Runnable x)
public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow();                                 @4
                size = i + 1;
                if (i == 0) {                               @5
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);                       @6
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }
           

@4 如果此時的資料大小大于或等于目前隊列的大小,則進行擴容操作

@5 如果此時的資料大小為0,則直接放在隊首

@6 如果此時資料大小不為0,則進行小頂堆調整

siftUp(int k, RunnableScheduledFuture<?> key)
private void siftUp(int k, RunnableScheduledFuture<?> key) {
            while (k > 0) {
                int parent = (k - 1) >>> 1;                     @7
                RunnableScheduledFuture<?> e = queue[parent];   @8
                if (key.compareTo(e) >= 0)                      @9
                    break;
                queue[k] = e;
                setIndex(e, k);
                k = parent;
            }
            queue[k] = key;
            setIndex(key, k);
        }
           

@7 擷取父類的下标(在二叉樹表示數組的時候,父子節點的下标關系如下:parentNo=(childNo-1)/2)

@8 擷取到父類節點

@9 進行子節點和父節點的大小比較

compareTo(Delayed other)
public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;                  @10
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)  @11
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }
           

@[email protected] 先比較時間,時間小的排在前面(時間早的任務将先被執行)。如果兩個ScheduledFutureTask的time相同,就比較sequenceNumber,sequenceNumber小的排在前面。

以上内容從源碼了解了在任務隊列中添加一個任務,接下來将從源碼了解線程池從任務隊列中擷取一個任務并執行任務的流程

take()

public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);               @1
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }
           

@1 忽略其它,直接進入核心代碼,當延遲時間小于會等于0的時候,從任務隊列的隊首取出一個任務

finishPoll(first)

private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
            int s = --size;
            RunnableScheduledFuture<?> x = queue[s];
            queue[s] = null;
            if (s != 0)
                siftDown(0, x);
            setIndex(f, -1);
            return f;
        }
           

到此,就分析完了ScheduledThreadPoolExecutor。

繼續閱讀