轉自: http://www.2cto.com/kf/201312/267018.html
ThreadPool 先看成員變量Executor mExecutor。
線程池的基本思想還是一種對象池的思想,開辟一塊記憶體空間,裡面存放了衆多(未死亡)的線程,池中線程執行排程由池管理器來處理。當有線程任務時,從池中取一個,執行完成後線程對象歸池,這樣可以避免反複建立線程對象所帶來的性能開銷,節省了系統的資源。
用線程池來管理的好處是,可以保證系統穩定運作,适用與有大量線程,高工作量的情景下使用,假如要展示1000張圖檔如果建立1000個線程去加載,系統肯定會死掉。用線程池就可以避免這個問題,可以用5個線程輪流執行,5個一組,執行完的線程不直接回收而是等待下次執行,這樣對系統的開銷就可以減小不少。
=======================Executor的譯文部分==========================
Executor是Java工具類,執行送出給它的Runnable任務。該接口提供了一種基于任務運作機制的任務送出方法,包括線程使用詳細資訊,時序等等。Executor通常用于替代建立多線程。例如:你可能會使用以下方式來代替建立線程集合中的線程new Thread(new(RunnableTask())).start()。
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...
盡管如此,Executor接口沒有明确要求執行過程是異步的。舉個最簡單的例子,一個Executor可以在調用者的線程中運作送出的任務。
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}
更典型的是,任務也可以運作在其他的線程而不是調用者線程。以下代碼就是在Executor中生成新的線程。
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}
很多Executor的實作按照任務的實作方式和時間來分類,下面的代碼将送出的任務序列化給第二個Executor,闡述了一個組合的Executor。
class SerialExecutor implements Executor {
final Queue tasks = new ArrayDeque();
final Executor executor;
Runnable active;
SerialExecutor(Executor executor) {
this.executor = executor;
public synchronized void execute(final Runnable r) {
tasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (active == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}
}
以上代碼簡答講就是執行一個 SerialExecutor時,先執行Runnable的run(),然後再從Tasks任務堆棧中找到目前激活的任務并執行。
在這個package包中實作的Executor實作了ExecutorService,它是個擴充接口。
而threadPoolExecutor類提供了一個擴充的線程池實作。Executors類給這些Executors提供了友善的工程方法。
記憶體一緻性效果:在送出一個Runnable對象給Executor執行之前,線程中的行為可能先在另一個線程中發生。
=======================Executor的譯文部分==========================
Java裡面線程池的頂級接口是Executor,但是嚴格意義上講Executor并不是一個線程池,而隻是一個執行線程的工具。真正的線程池接口是ExecutorService。
根據線程池的執行政策,Executor的execute()可能在新線程中執行,或者線上程池中的某個線程中執行,也可能是在調用者線程中執行。ExecutorService在Executor的基礎上增加了兩個核心方法:
1、Future submit(Runnable task)
2、 Future submit(Callable task)
差異點:這兩個方法都可以向線程池送出任務,差別在于Runnable執行完run()有傳回值,而Callable執行完call()後有傳回值。
共同點:submit都傳回Future對象,Future對象可以阻塞線程直到運作完畢,也可以取消任務執行和檢測任務是否執行完畢。
在executors類裡面提供了一些靜态工廠,生成一些常用的線程池:
1、newSingleThreadExecutor:建立一個單線程的線程池。這個線程池隻有一個線程在工作,也就是相當于單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那麼會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的送出順序執行。
2、newFixedThreadPool:建立固定大小的線程池。每次送出一個任務就建立一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那麼線程池會補充一個新線程。
3、newCachedThreadPool:建立一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,那麼就會回收部分空閑(60秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴于作業系統(或者說JVM)能夠建立的最大線程大小。
4、newScheduledThreadPool:建立一個大小無限的線程池。此線程池支援定時以及周期性執行任務的需求。
5、newSingleThreadExecutor:建立一個單線程的線程池。此線程池支援定時以及周期性執行任務的需求。
下面再介紹下ThreadPoolExecutor函數,以便對線程池有進一步認識:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue workQueue,
RejectedExecutionHandler handler);
corePoolSize: 線程池維護線程的最少數量
maximumPoolSize:線程池維護線程的最大數量
keepAliveTime: 線程池維護線程所允許的空閑時間
unit: 線程池維護線程所允許的空閑時間的機關
workQueue: 線程池所使用的緩沖隊列
handler: 線程池對拒絕任務的處理政策
當一個任務通過execute(Runnable)方法欲添加到線程池時:
1、如果此時線程池中的數量小于corePoolSize,即使線程池中的線程都處于空閑狀态,也要建立新的線程來處理被添加的任務。
2、如果此時線程池中的數量等于 corePoolSize,但是緩沖隊列 workQueue未滿,那麼任務被放入緩沖隊列。
3、如果此時線程池中的數量大于corePoolSize,緩沖隊列workQueue滿,并且線程池中的數量小于maximumPoolSize,建新的線程來處理被添加的任務。
4、如果此時線程池中的數量大于corePoolSize,緩沖隊列workQueue滿,并且線程池中的數量等于maximumPoolSize,那麼通過 handler所指定的政策來處理此任務。
也就是說,處理任務的優先級為:
核心線程corePoolSize、任務隊列workQueue、最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。
簡單的例子:
ThreadPoolTestMain.java
package threadpool.test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolTestMain {
private static final int CORE_POOL_SIZE = 2;
private static final int MAX_POOL_SIZE = 4;
private static final int KEEP_ACTIVE_TIME = 3;
private static final int TASK_NUM = 10;
private static final int PRODUCE_SLEEP_TIME = 10;
static public void main(String[] args) {
// 構造一個線程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ACTIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<runnable>(3),
new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i = 1; i < TASK_NUM; i++) {
String name = "Task" + i;
try {
System.out.println("ThreadPoolTestMain: put a task: " + name);
threadPool.execute(new ThreadPoolTask(name));
Thread.sleep(20);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
ThreadPoolTask.java
package threadpool.test;
public class ThreadPoolTask implements Runnable {
private String mTaskName;
private static int CONSUME_SLEEP_TIME = 2000;
public ThreadPoolTask(String name) {
mTaskName = name;
}
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println(Thread.currentThread().getName());
System.out.println("ThreadPoolTask :" + mTaskName);
try {
Thread.sleep(CONSUME_SLEEP_TIME);
} catch (Exception err) {
err.printStackTrace();
}
}
}</runnable>