Executors建立線程池的方法:
-
public static ExecutorService newFixedThreadPool(int nThreads)
建立固定數目線程的線程池。
-
public static ExecutorService newCachedThreadPool()
建立一個可緩存的線程池,調用execute 将重用以前構造的線程(如果線程可用)。如果現有線程沒有可用的,則建立一個新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。
-
public static ExecutorService newSingleThreadExecutor()
建立一個單線程化的Executor。
-
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
建立一個支援定時及周期性的任務執行的線程池,多數情況下可用來替代Timer類。
Executor executor = Executors.newFixedThreadPool();
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println("task over");
}
};
executor.execute(task);
executor = Executors.newScheduledThreadPool();
ScheduledExecutorService scheduler = (ScheduledExecutorService) executor;
scheduler.scheduleAtFixedRate(task, , , TimeUnit.SECONDS);
ExecutorService與生命周期
ExecutorService擴充了Executor并添加了一些生命周期管理的方法。一個Executor的生命周期有三種狀态,運作 ,關閉 ,終止 。Executor建立時處于運作狀态。當調用ExecutorService.shutdown()後,處于關閉狀态,isShutdown()方法傳回true。這時,不應該再想Executor中添加任務,所有已添加的任務執行完畢後,Executor處于終止狀态,isTerminated()傳回true。
如果Executor處于關閉狀态,往Executor送出任務會抛出unchecked exception RejectedExecutionException。
ExecutorService executorService = (ExecutorService) executor;
while (!executorService.isShutdown()) {
try {
executorService.execute(task);
} catch (RejectedExecutionException ignored) {
}
}
executorService.shutdown();
使用Callable,Future傳回結果
Future代表一個異步執行的操作,通過get()方法可以獲得操作的結果,如果異步操作還沒有完成,則,get()會使目前線程阻塞。FutureTask實作了Future和Runable。Callable代表一個有傳回值得操作。
Callable<Integer> func = new Callable<Integer>(){
public Integer call() throws Exception {
System.out.println("inside callable");
Thread.sleep();
return new Integer();
}
};
FutureTask<Integer> futureTask = new FutureTask<Integer>(func);
Thread newThread = new Thread(futureTask);
newThread.start();
try {
System.out.println("blocking here");
Integer result = futureTask.get();
System.out.println(result);
} catch (InterruptedException ignored) {
} catch (ExecutionException ignored) {
}
ExecutoreService提供了submit()方法,傳遞一個Callable,或Runnable,傳回Future。如果Executor背景線程池還沒有完成Callable的計算,這調用傳回Future對象的get()方法,會阻塞直到計算完成。
CompletionService
在剛在的例子中,getResult()方法的實作過程中,疊代了FutureTask的數組,如果任務還沒有完成則目前線程會阻塞,如果我們希望任意字任務完成後就把其結果加到result中,而不用依次等待每個任務完成,可以使CompletionService。生産者submit()執行的任務。使用者take()已完成的任務,并按照完成這些任務的順序處理它們的結果 。也就是調用CompletionService的take方法是,會傳回按完成順序放回任務的結果,CompletionService内部維護了一個阻塞隊列BlockingQueue,如果沒有任務完成,take()方法也會阻塞。修改剛才的例子使用CompletionService:
public class ConcurrentCalculator2 {
private ExecutorService exec;
private CompletionService<Long> completionService;
private int cpuCoreNumber;
// 内部類
class SumCalculator implements Callable<Long> {
......
}
public ConcurrentCalculator2() {
cpuCoreNumber = Runtime.getRuntime().availableProcessors();
exec = Executors.newFixedThreadPool(cpuCoreNumber);
completionService = new ExecutorCompletionService<Long>(exec);
}
public Long sum(final int[] numbers) {
// 根據CPU核心個數拆分任務,建立FutureTask并送出到Executor
for (int i = ; i < cpuCoreNumber; i++) {
int increment = numbers.length / cpuCoreNumber + ;
int start = increment * i;
int end = increment * i + increment;
if (end > numbers.length)
end = numbers.length;
SumCalculator subCalc = new SumCalculator(numbers, start, end);
if (!exec.isShutdown()) {
completionService.submit(subCalc);
}
}
return getResult();
}
/**
* 疊代每個隻任務,獲得部分和,相加傳回
*
* @return
*/
public Long getResult() {
Long result = l;
for (int i = ; i < cpuCoreNumber; i++) {
try {
Long subSum = completionService.take().get();
result += subSum;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
return result;
}
public void close() {
exec.shutdown();
}
}