天天看點

Java并發——ScheduledExecutorService

ScheduledExecutorService

類 Scheduledexecutor Service的主要作用就是可以将定時任務與線程池功能結合使用。

Java并發——ScheduledExecutorService

ScheduledThreadPoolExecutor使用Callable延遲運作

public class MyCallableAA implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            System.out.println("callA begin " + Thread.currentThread().getName() + " " + System.currentTimeMillis());
            Thread.sleep(3000);
            System.out.println("callA end " + Thread.currentThread().getName() + " " + System.currentTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "returnA";
    }
}



public class MyCallableBB implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("callB begin " + Thread.currentThread().getName() + " " + System.currentTimeMillis());
        System.out.println("callB end " + Thread.currentThread().getName() + " " + System.currentTimeMillis());
        return "returnB";
    }
}


public class MyRun1 {
    public static void main(String[] args) {
        try {
            List<Callable> callables = new ArrayList<>();
            callables.add(new MyCallableAA());
            callables.add(new MyCallableBB());
            //調用方法newSingleThreadScheduledExecutor(),取得一個單任務的計劃任務執行池
            ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
            ScheduledFuture<String> futureA = executorService.schedule(callables.get(0), 4L, TimeUnit.SECONDS);
            ScheduledFuture<String> futureB = executorService.schedule(callables.get(1), 4L, TimeUnit.SECONDS);

            System.out.println("     X=" + System.currentTimeMillis());
            System.out.println("傳回值A:" + futureA.get());
            System.out.println("傳回值B:" + futureB.get());
            System.out.println("     Y=" + System.currentTimeMillis());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
           

從X到Y的運作時間為7秒,阻塞點是get()方法。

從運作結果中可以發現:

public <V> Scheduled Future<V> schedule(Callable<v> callable, long delay, Time Unit unit)

方法中的第2個參數在多個任務中同時消耗時間,并不是一個任務執行完畢後再等待4秒繼續執行的效果。由于第1個任務從計劃任務到運作結束需要用時7秒,那麼第2個任務其實是想在第4秒被執行,由于是單任務的計劃任務池,是以第2個任務的執行時間被延後3秒。

Java并發——ScheduledExecutorService

在此實驗中使用工廠類 Executors的

newSingleThreadScheduledExecutor

方法來建立

ScheduledExecutorService對象,但傳回的真正對象卻是

Scheduledthreadpoolexecutor

,因為

Scheduled Thread PoolExecutor實作了ScheduledexecutorService接口。

Java并發——ScheduledExecutorService

ScheduledThreadPoolExecutor使用Runnable延遲運作

public class MyRunnableAA implements Runnable {
    @Override
    public void run() {
        try {
            System.out.println("runnableA begin " + Thread.currentThread().getName() + " " + System.currentTimeMillis());
            Thread.sleep(3000);
            System.out.println("runnableA end " + Thread.currentThread().getName() + " " + System.currentTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public class MyRunnableBB implements Runnable {
    @Override
    public void run() {
        System.out.println("runnableB begin " + Thread.currentThread().getName() + " " + System.currentTimeMillis());
        System.out.println("runnableB end " + Thread.currentThread().getName() + " " + System.currentTimeMillis());
    }
}

public class MyRun2 {
    public static void main(String[] args) {
        List<Runnable> list = new ArrayList<>();
        list.add(new MyRunnableAA());
        list.add(new MyRunnableBB());
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

        System.out.println("     X=" + System.currentTimeMillis());
        executorService.schedule(list.get(0), 0L, TimeUnit.SECONDS);
        executorService.schedule(list.get(1), 1L, TimeUnit.SECONDS);
        System.out.println("     Y=" + System.currentTimeMillis());
    }
}
           
Java并發——ScheduledExecutorService

延遲運作并取得傳回值

public class MyRun3 {
    public static void main(String[] args) {
        Callable<String> callable = new Callable<String>() {
            public String call() throws Exception {
                System.out.println("a call run =" + System.currentTimeMillis());
                return "returnA";
            }
        };

        try {
            List<Callable> list = new ArrayList<>();
            list.add(callable);
            ScheduledExecutorService exector = Executors.newSingleThreadScheduledExecutor();
            System.out.println("     X=" + System.currentTimeMillis());
            ScheduledFuture future = exector.schedule(list.get(0), 4L, TimeUnit.SECONDS);
            System.out.println(future.get() + "    A=" + System.currentTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
           
Java并發——ScheduledExecutorService

使用scheduleAtFixedRate()方法實作周期性執行

  • command:執行線程
  • initialDelay:初始化延時
  • period:兩次開始執行最小間隔時間
  • unit:計時機關

執行任務時間大于>period預定的周期時間,也就是産生了逾時的效果:

public class MyRun4 {
    public static void main(String[] args) {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("   begin=" + System.currentTimeMillis() +
                            " ThreadName=" + Thread.currentThread().getName());
                    Thread.sleep(4000);
                    System.out.println("   end=" + System.currentTimeMillis() +
                            " ThreadName=" + Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        System.out.println("  X=" + System.currentTimeMillis());
        executor.scheduleAtFixedRate(runnable, 1, 2, TimeUnit.SECONDS);
        System.out.println("  Y=" + System.currentTimeMillis());
    }
}
           

任務執行時間(4)大于周期時間(2),是以産生逾時效果,每次執行任務不再間隔2秒。

Java并發——ScheduledExecutorService

執行任務時間<period的時間:

public class MyRun5 {
    public static void main(String[] args) {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("  begin=" + System.currentTimeMillis() +
                        " ThreadName=" + Thread.currentThread().getName());
                System.out.println("  end=" + System.currentTimeMillis() +
                        " ThreadName=" + Thread.currentThread().getName());
            }
        };

        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        System.out.println("  X=" + System.currentTimeMillis());
        executor.scheduleAtFixedRate(runnable, 1, 2, TimeUnit.SECONDS);
        System.out.println("  Y=" + System.currentTimeMillis());
    }
}
           

每次間隔2秒執行。

Java并發——ScheduledExecutorService

注意:

scheduleAtFixedRatel方法傳回的 Scheduled

Future對象無法獲得傳回值,也就是

scheduleAtFixedRate()方法不具有獲得傳回值的功能,而 schedule方法卻可以獲得傳回值。

是以當使用 scheduleAtFixedRate方法實作重複運作任務的效果時,需要結合自定義 Runnable接口的實作類,不要使用FutureTask類,因為FutureTask類并不能實作重複運作的效果。

使用scheduleWithFixedDelay()方法實作周期性執行

方法 schedule With FixedDelay(的主要作用是設定多個任務之間固定的運作時間間隔。

執行任務時間大于> period定的時間

public class MyRun6 {
    public static void main(String[] args) {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("  begin=" + System.currentTimeMillis() +
                            " ThreadName=" + Thread.currentThread().getName());
                    Thread.sleep(4000);
                    System.out.println("  end=" + System.currentTimeMillis() +
                            " ThreadName=" + Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        System.out.println("  X=" + System.currentTimeMillis());
        executor.scheduleWithFixedDelay(runnable, 1, 2, TimeUnit.SECONDS);
        System.out.println("  Y=" + System.currentTimeMillis());
    }
}
           

固定每隔2秒執行一次。

Java并發——ScheduledExecutorService

執行任務時間<period:

public class MyRun6 {
    public static void main(String[] args) {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("  begin=" + System.currentTimeMillis() +
                        " ThreadName=" + Thread.currentThread().getName());
                System.out.println("  end=" + System.currentTimeMillis() +
                        " ThreadName=" + Thread.currentThread().getName());
            }
        };

        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        System.out.println("  X=" + System.currentTimeMillis());
        executor.scheduleWithFixedDelay(runnable, 1, 2, TimeUnit.SECONDS);
        System.out.println("  Y=" + System.currentTimeMillis());
    }
}
           

固定每隔2秒執行一次。

Java并發——ScheduledExecutorService

getQueue()與remove()方法

方法 getQueue()的作用是取得隊列中的任務,而這些任務是未來将要運作的,正在運作的任務不在此隊列中。使用 scheduleAtFixedRateO和 scheduleWithFixedDelay()兩個方法實作周期性執行任務時,未來欲執行的任務都是放入此隊列中。

public class MyRun7 {
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);

        Runnable_A r1 = new Runnable_A("A");
        Runnable_A r2 = new Runnable_A("B");
        Runnable_A r3 = new Runnable_A("C");
        Runnable_A r4 = new Runnable_A("D");
        Runnable_A r5 = new Runnable_A("E");

        System.out.println(r1.hashCode());
        System.out.println(r2.hashCode());
        System.out.println(r3.hashCode());
        System.out.println(r4.hashCode());
        System.out.println(r5.hashCode());

        executor.scheduleAtFixedRate(r1, 10, 2, TimeUnit.SECONDS);
        executor.scheduleAtFixedRate(r2, 10, 2, TimeUnit.SECONDS);
        executor.scheduleAtFixedRate(r3, 10, 2, TimeUnit.SECONDS);
        executor.scheduleAtFixedRate(r4, 10, 2, TimeUnit.SECONDS);
        executor.scheduleAtFixedRate(r5, 10, 2, TimeUnit.SECONDS);

        System.out.println("");

        BlockingQueue<Runnable> queue = executor.getQueue();
        Iterator<Runnable> it = queue.iterator();
        while (it.hasNext()) {
            Runnable next = it.next();
            System.out.println("隊列中的:" + next);
        }
    }
}

class Runnable_A implements Runnable {
    private String username;

    public Runnable_A(String username) {
        this.username = username;
    }

    public String getUsername() {
        return username;
    }

    @Override
    public void run() {
        System.out.println("run! username=" + Thread.currentThread().getName());
    }
}
           
Java并發——ScheduledExecutorService
public class MyRun7 {
    public static void main(String[] args) throws InterruptedException {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);

        Runnable_A r1 = new Runnable_A("A");
        Runnable_A r2 = new Runnable_A("B");

        ScheduledFuture<?> future1 = executor.scheduleAtFixedRate(r1, 0, 2, TimeUnit.SECONDS);
        Thread.sleep(1000);
        ScheduledFuture<?> future2 = executor.scheduleAtFixedRate(r2, 10, 2, TimeUnit.SECONDS);
        Thread.sleep(5000);
        System.out.println(executor.remove((Runnable) future2));
        System.out.println("");

        BlockingQueue<Runnable> queue = executor.getQueue();
        Iterator<Runnable> it = queue.iterator();
        while (it.hasNext()) {
            Runnable next = it.next();
            System.out.println("隊列中的:" + next);
        }
    }
}

class Runnable_A implements Runnable {
    private String username;

    public Runnable_A(String username) {
        this.username = username;
    }

    public String getUsername() {
        return username;
    }

    @Override
    public void run() {
        System.out.println("run! username=" + Thread.currentThread().getName());
    }
}
           
Java并發——ScheduledExecutorService

setExecuteExistingDelayedTasksAfterShtudownPolicy()

方法

setExecuteExisting DelayedTasksAfterShutdownPolicy()

的作用是當對 ScheduledThreadPoolExecutor執行了shutdown方法時,任務是否繼續運作,預設值是true,也就是

當調用了 shutdown方法時任務還是繼續運作,

當使用 set ExecuteExistingDelayedTasksAfterShutdownPolicy(alse)時任務不再運作。

public class MyRun8 {
    public static void main(String[] args) {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                    System.out.println("任務執行!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        executor.schedule(runnable, 3, TimeUnit.SECONDS);
        executor.shutdown();
        System.out.println("main end");
    }
}
           
Java并發——ScheduledExecutorService

注意:

方法setExecuteExistingDelayedTasksAfterShutdownPolicy可以與 schedule()和 shutdown()方法聯合使用,但

setExecuteExistingDelayedTasksAfterShutdownPolicy()方法不能與 scheduleatFixedRate()和 scheduleWithFixedDelay()方法聯合使用。

那麼如果想實作 shutdown關閉線程池後,池中的任務還會繼續重複運作,則要将 scheduleatFixedRate和 scheduleWithFixedDelayo方法與 setContinueExistingPeriodicTasksAfterShutdownPolicy()方法聯合使用。

setContinueExistingPeriodicTasksAfterShutdownPolicy()

方法 setContinueExistingPeriodicTasksAfterShutdownPolicy()傳入tue的作用是當使用 scheduleAtFixedRate()方法或 scheduleWithFixedDelay()方法時,如果調用 ScheduledThreadPoolExecutor對象的 shutdown()方法,任務還會繼續運作,傳入false時任務不運作,程序銷毀。

public class MyRun8 {
    public static void main(String[] args) {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                    System.out.println("任務執行!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        executor.scheduleWithFixedDelay(runnable, 0,3, TimeUnit.SECONDS);
        executor.shutdown();
        System.out.println("main end");
    }
}
           
Java并發——ScheduledExecutorService

cancel(boolean)與setRemoveOnCancelPolicy()

  • 方法 cancel(boolean)的作用設定是否取消任務。
  • 方法 setRemove On CancelPolicy (boolean)的作用設定是否将取消後的任務從隊列中清除。
public class MyRun9 {
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);
        Runnable runnable1 = new Runnable_B("A");
        ScheduledFuture<?> future = executor.schedule(runnable1, 1, TimeUnit.SECONDS);
        System.out.println(future.cancel(true));
        System.out.println("");
        BlockingQueue<Runnable> queue = executor.getQueue();
        Iterator<Runnable> it = queue.iterator();
        while (it.hasNext()) {
            Runnable next = it.next();
            System.out.println("隊列中的:" + next);
        }
        System.out.println("main end");
    }
}

class Runnable_B implements Runnable {

    private String username;

    public Runnable_B(String username) {
        this.username = username;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    @Override
    public void run() {
        try {
            while (true) {
                if (Thread.currentThread().isInterrupted() == true) {
                    throw new InterruptedException();
                }
                System.out.println("run! username=" + username + " " + Thread.currentThread().getName());
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println("中斷了!");
        }
    }
}
           

删除成功,但是還保留在隊列中。

Java并發——ScheduledExecutorService
Java并發——ScheduledExecutorService

删除成功,從隊列中剔除。

Java并發——ScheduledExecutorService