天天看點

Java 多線程程式設計之九:使用 Executors 和 ThreadPoolExecutor 實作的 Java 線程池的例子

        線程池用來管理工作線程的數量,它持有一個等待被執行的線程的隊列。

        java.util.concurrent.Executors 提供了 java.util.concurrent.Executor 接口實作來建立 Java 裡的線程池。我們寫一個簡單的程式來解釋一下它的工作機制。

        首先我們需要有一個 Runnable 類。

        WorkerThread.java

package com.journaldev.threadpool;
 
public class WorkerThread implements Runnable {
     
    private String command;
     
    public WorkerThread(String s){
        this.command=s;
    }
 
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+" Start. Command = "+command);
        processCommand();
        System.out.println(Thread.currentThread().getName()+" End.");
    }
 
    private void processCommand() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
 
    @Override
    public String toString(){
        return this.command;
    }
}
           

        這裡是我們使用 Executors 架構建立了一個固定的線程池的測試程式。

        SimpleThreadPool.java

package com.journaldev.threadpool;
 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class SimpleThreadPool {
 
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            Runnable worker = new WorkerThread("" + i);
            executor.execute(worker);
          }
        executor.shutdown(); // This will make the executor accept no new threads and finish all existing threads in the queue
        while (!executor.isTerminated()) { // Wait until all threads are finish,and also you can use "executor.awaitTermination();" to wait
        }
        System.out.println("Finished all threads");
    }
 
}
           

        程式中我們建立了固定大小為五個工作線程的線程池。然後配置設定給線程池十個工作,因為線程池大小為五,它将啟動五個工作線程先處理五個工作,其他的工作則處于等待狀态,一旦有工作完成,空閑下來工作線程就會撿取等待隊列裡的其他工作進行執行。

        這裡是以上程式的輸出。

pool-1-thread-2 Start. Command = 1

pool-1-thread-4 Start. Command = 3

pool-1-thread-1 Start. Command = 0

pool-1-thread-3 Start. Command = 2

pool-1-thread-5 Start. Command = 4

pool-1-thread-4 End.

pool-1-thread-5 End.

pool-1-thread-1 End.

pool-1-thread-3 End.

pool-1-thread-3 Start. Command = 8

pool-1-thread-2 End.

pool-1-thread-2 Start. Command = 9

pool-1-thread-1 Start. Command = 7

pool-1-thread-5 Start. Command = 6

pool-1-thread-4 Start. Command = 5

pool-1-thread-2 End.

pool-1-thread-4 End.

pool-1-thread-3 End.

pool-1-thread-5 End.

pool-1-thread-1 End.

Finished all threads

        輸出表明線程池中至始至終隻有五個名為 "pool-1-thread-1" 到 "pool-1-thread-5" 的五個線程,這五個線程不随着工作的完成而消亡,會一直存在,并負責執行配置設定給線程池的任務,直到線程池消亡。

        Executors 類提供了使用了 ThreadPoolExecutor 的簡單的 ExecutorService 實作,但是 ThreadPoolExecutor 提供的功能遠不止于此。我們可以在建立 ThreadPoolExecutor 執行個體時指定活動線程的數量,我們也可以限制線程池的大小并且建立我們自己的 RejectedExecutionHandler 實作來處理不能适應工作隊列的工作。

        這裡是我們自定義的 RejectedExecutionHandler 接口的實作。

        RejectedExecutionHandlerImpl.java

package com.journaldev.threadpool;
 
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
 
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
 
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println(r.toString() + " is rejected");
    }
 
}
           

        ThreadPoolExecutor 提供了一些方法,我們可以使用這些方法來查詢 executor 的目前狀态,線程池大小,活動線程數量以及任務數量。是以我是用來一個監控線程在特定的時間間隔内列印 executor 資訊。

        MyMonitorThread.java

package com.journaldev.threadpool;
 
import java.util.concurrent.ThreadPoolExecutor;
 
public class MyMonitorThread implements Runnable
{
    private ThreadPoolExecutor executor;
     
    private int seconds;
     
    private boolean run=true;
 
    public MyMonitorThread(ThreadPoolExecutor executor, int delay)
    {
        this.executor = executor;
        this.seconds=delay;
    }
     
    public void shutdown(){
        this.run=false;
    }
 
    @Override
    public void run()
    {
        while(run){
                System.out.println(
                    String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s",
                        this.executor.getPoolSize(),
                        this.executor.getCorePoolSize(),
                        this.executor.getActiveCount(),
                        this.executor.getCompletedTaskCount(),
                        this.executor.getTaskCount(),
                        this.executor.isShutdown(),
                        this.executor.isTerminated()));
                try {
                    Thread.sleep(seconds*1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
        }
             
    }
}
           

        這裡是使用 ThreadPoolExecutor 的線程池實作例子。

        WorkerPool.java

package com.journaldev.threadpool;
 
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
public class WorkerPool {
 
    public static void main(String args[]) throws InterruptedException{
        //RejectedExecutionHandler implementation
        RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
        //Get the ThreadFactory implementation to use
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        //creating the ThreadPoolExecutor
        ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler);
        //start the monitoring thread
        MyMonitorThread monitor = new MyMonitorThread(executorPool, 3);
        Thread monitorThread = new Thread(monitor);
        monitorThread.start();
        //submit work to the thread pool
        for(int i=0; i<10; i++){
            executorPool.execute(new WorkerThread("cmd"+i));
        }
         
        Thread.sleep(30000);
        //shut down the pool
        executorPool.shutdown();
        //shut down the monitor thread
        Thread.sleep(5000);
        monitor.shutdown();
         
    }
}
           

        注意在初始化 ThreadPoolExecutor 時,我們保持初始池大小為 2,最大池大小為 4 而工作隊列大小為 2。是以如果已經有四個正在執行的任務而此時配置設定來更多任務的話,工作隊列将僅僅保留他們(新任務)中的兩個,其他的将會被 RejectedExecutionHandlerImpl 處理。

        上面程式的輸出可以證明以上觀點。

pool-1-thread-1 Start. Command = cmd0

pool-1-thread-4 Start. Command = cmd5

cmd6 is rejected

pool-1-thread-3 Start. Command = cmd4

pool-1-thread-2 Start. Command = cmd1

cmd7 is rejected

cmd8 is rejected

cmd9 is rejected

[monitor] [0/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false

[monitor] [4/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false

pool-1-thread-4 End.

pool-1-thread-1 End.

pool-1-thread-2 End.

pool-1-thread-3 End.

pool-1-thread-1 Start. Command = cmd3

pool-1-thread-4 Start. Command = cmd2

[monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false

[monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false

pool-1-thread-1 End.

pool-1-thread-4 End.

[monitor] [4/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false

[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false

[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false

[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false

[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false

[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false

[monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true

[monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true

        注意 executor 的活動任務、完成任務以及所有完成任務,這些數量上的變化。我們可以調用 shutdown() 方法來結束所有送出的任務并終止線程池。

原文連結: http://www.journaldev.com/1069/java-thread-pool-example-using-executors-and-threadpoolexecutor

繼續閱讀