天天看點

Java多線程之Executor架構

Executors建立線程池的方法:

  • public static ExecutorService newFixedThreadPool(int nThreads)

    建立固定數目線程的線程池。

  • public static ExecutorService newCachedThreadPool()

    建立一個可緩存的線程池,調用execute 将重用以前構造的線程(如果線程可用)。如果現有線程沒有可用的,則建立一個新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。

  • public static ExecutorService newSingleThreadExecutor()

    建立一個單線程化的Executor。

  • public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

    建立一個支援定時及周期性的任務執行的線程池,多數情況下可用來替代Timer類。

Executor executor = Executors.newFixedThreadPool();  
Runnable task = new Runnable() {  
    @Override  
    public void run() {  
        System.out.println("task over");  
    }  
};  
executor.execute(task);  

executor = Executors.newScheduledThreadPool();  
ScheduledExecutorService scheduler = (ScheduledExecutorService) executor;  
scheduler.scheduleAtFixedRate(task, , , TimeUnit.SECONDS);  
           

ExecutorService與生命周期

ExecutorService擴充了Executor并添加了一些生命周期管理的方法。一個Executor的生命周期有三種狀态,運作 ,關閉 ,終止 。Executor建立時處于運作狀态。當調用ExecutorService.shutdown()後,處于關閉狀态,isShutdown()方法傳回true。這時,不應該再想Executor中添加任務,所有已添加的任務執行完畢後,Executor處于終止狀态,isTerminated()傳回true。

如果Executor處于關閉狀态,往Executor送出任務會抛出unchecked exception RejectedExecutionException。

ExecutorService executorService = (ExecutorService) executor;  
while (!executorService.isShutdown()) {  
    try {  
        executorService.execute(task);  
    } catch (RejectedExecutionException ignored) {  

    }  
}  
executorService.shutdown();  
           

使用Callable,Future傳回結果

Future代表一個異步執行的操作,通過get()方法可以獲得操作的結果,如果異步操作還沒有完成,則,get()會使目前線程阻塞。FutureTask實作了Future和Runable。Callable代表一個有傳回值得操作。

Callable<Integer> func = new Callable<Integer>(){  
    public Integer call() throws Exception {  
        System.out.println("inside callable");  
        Thread.sleep();  
        return new Integer();  
    }         
};        
FutureTask<Integer> futureTask  = new FutureTask<Integer>(func);  
Thread newThread = new Thread(futureTask);  
newThread.start();  

try {  
    System.out.println("blocking here");  
    Integer result = futureTask.get();  
    System.out.println(result);  
} catch (InterruptedException ignored) {  
} catch (ExecutionException ignored) {  
}  
           

ExecutoreService提供了submit()方法,傳遞一個Callable,或Runnable,傳回Future。如果Executor背景線程池還沒有完成Callable的計算,這調用傳回Future對象的get()方法,會阻塞直到計算完成。

CompletionService

在剛在的例子中,getResult()方法的實作過程中,疊代了FutureTask的數組,如果任務還沒有完成則目前線程會阻塞,如果我們希望任意字任務完成後就把其結果加到result中,而不用依次等待每個任務完成,可以使CompletionService。生産者submit()執行的任務。使用者take()已完成的任務,并按照完成這些任務的順序處理它們的結果 。也就是調用CompletionService的take方法是,會傳回按完成順序放回任務的結果,CompletionService内部維護了一個阻塞隊列BlockingQueue,如果沒有任務完成,take()方法也會阻塞。修改剛才的例子使用CompletionService:

public class ConcurrentCalculator2 {  

    private ExecutorService exec;  
    private CompletionService<Long> completionService;  


    private int cpuCoreNumber;  

    // 内部類  
    class SumCalculator implements Callable<Long> {  
        ......  
    }  

    public ConcurrentCalculator2() {  
        cpuCoreNumber = Runtime.getRuntime().availableProcessors();  
        exec = Executors.newFixedThreadPool(cpuCoreNumber);  
        completionService = new ExecutorCompletionService<Long>(exec);  


    }  

    public Long sum(final int[] numbers) {  
        // 根據CPU核心個數拆分任務,建立FutureTask并送出到Executor  
        for (int i = ; i < cpuCoreNumber; i++) {  
            int increment = numbers.length / cpuCoreNumber + ;  
            int start = increment * i;  
            int end = increment * i + increment;  
            if (end > numbers.length)  
                end = numbers.length;  
            SumCalculator subCalc = new SumCalculator(numbers, start, end);   
            if (!exec.isShutdown()) {  
                completionService.submit(subCalc);  


            }  

        }  
        return getResult();  
    }  

    /** 
     * 疊代每個隻任務,獲得部分和,相加傳回 
     *  
     * @return 
     */  
    public Long getResult() {  
        Long result = l;  
        for (int i = ; i < cpuCoreNumber; i++) {              
            try {  
                Long subSum = completionService.take().get();  
                result += subSum;             
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            } catch (ExecutionException e) {  
                e.printStackTrace();  
            }  
        }  
        return result;  
    }  

    public void close() {  
        exec.shutdown();  
    }  
}