天天看點

線程池 ThreadPool 概念、應用執行個體

轉自: http://www.2cto.com/kf/201312/267018.html

ThreadPool 先看成員變量Executor mExecutor。

線程池的基本思想還是一種對象池的思想,開辟一塊記憶體空間,裡面存放了衆多(未死亡)的線程,池中線程執行排程由池管理器來處理。當有線程任務時,從池中取一個,執行完成後線程對象歸池,這樣可以避免反複建立線程對象所帶來的性能開銷,節省了系統的資源。

用線程池來管理的好處是,可以保證系統穩定運作,适用與有大量線程,高工作量的情景下使用,假如要展示1000張圖檔如果建立1000個線程去加載,系統肯定會死掉。用線程池就可以避免這個問題,可以用5個線程輪流執行,5個一組,執行完的線程不直接回收而是等待下次執行,這樣對系統的開銷就可以減小不少。

=======================Executor的譯文部分==========================

Executor是Java工具類,執行送出給它的Runnable任務。該接口提供了一種基于任務運作機制的任務送出方法,包括線程使用詳細資訊,時序等等。Executor通常用于替代建立多線程。例如:你可能會使用以下方式來代替建立線程集合中的線程new Thread(new(RunnableTask())).start()。

Executor executor = anExecutor;

 executor.execute(new RunnableTask1());

 executor.execute(new RunnableTask2());

 ...

盡管如此,Executor接口沒有明确要求執行過程是異步的。舉個最簡單的例子,一個Executor可以在調用者的線程中運作送出的任務。

class DirectExecutor implements Executor {

    public void execute(Runnable r) {

        r.run();

    }

}

更典型的是,任務也可以運作在其他的線程而不是調用者線程。以下代碼就是在Executor中生成新的線程。

class ThreadPerTaskExecutor implements Executor {

    public void execute(Runnable r) {

        new Thread(r).start();

    }

}

很多Executor的實作按照任務的實作方式和時間來分類,下面的代碼将送出的任務序列化給第二個Executor,闡述了一個組合的Executor。

class SerialExecutor implements Executor {

   final Queue tasks = new ArrayDeque();

   final Executor executor;

   Runnable active;

   SerialExecutor(Executor executor) {

     this.executor = executor;

   public synchronized void execute(final Runnable r) {

     tasks.offer(new Runnable() {

       public void run() {

         try {

           r.run();

         } finally {

           scheduleNext();

         }

       }

     });

     if (active == null) {

       scheduleNext();

     }

   }

   protected synchronized void scheduleNext() {

     if ((active = tasks.poll()) != null) {

       executor.execute(active);

     }

   }

 }

}

以上代碼簡答講就是執行一個 SerialExecutor時,先執行Runnable的run(),然後再從Tasks任務堆棧中找到目前激活的任務并執行。

在這個package包中實作的Executor實作了ExecutorService,它是個擴充接口。

而threadPoolExecutor類提供了一個擴充的線程池實作。Executors類給這些Executors提供了友善的工程方法。

記憶體一緻性效果:在送出一個Runnable對象給Executor執行之前,線程中的行為可能先在另一個線程中發生。

=======================Executor的譯文部分==========================

Java裡面線程池的頂級接口是Executor,但是嚴格意義上講Executor并不是一個線程池,而隻是一個執行線程的工具。真正的線程池接口是ExecutorService。

根據線程池的執行政策,Executor的execute()可能在新線程中執行,或者線上程池中的某個線程中執行,也可能是在調用者線程中執行。ExecutorService在Executor的基礎上增加了兩個核心方法:

1、Future submit(Runnable task)

2、 Future submit(Callable task)

差異點:這兩個方法都可以向線程池送出任務,差別在于Runnable執行完run()有傳回值,而Callable執行完call()後有傳回值。

共同點:submit都傳回Future對象,Future對象可以阻塞線程直到運作完畢,也可以取消任務執行和檢測任務是否執行完畢。

在executors類裡面提供了一些靜态工廠,生成一些常用的線程池:

1、newSingleThreadExecutor:建立一個單線程的線程池。這個線程池隻有一個線程在工作,也就是相當于單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那麼會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的送出順序執行。

2、newFixedThreadPool:建立固定大小的線程池。每次送出一個任務就建立一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那麼線程池會補充一個新線程。

3、newCachedThreadPool:建立一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,那麼就會回收部分空閑(60秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴于作業系統(或者說JVM)能夠建立的最大線程大小。

4、newScheduledThreadPool:建立一個大小無限的線程池。此線程池支援定時以及周期性執行任務的需求。

5、newSingleThreadExecutor:建立一個單線程的線程池。此線程池支援定時以及周期性執行任務的需求。

下面再介紹下ThreadPoolExecutor函數,以便對線程池有進一步認識:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

long keepAliveTime, TimeUnit unit,

BlockingQueue workQueue,

RejectedExecutionHandler handler);

corePoolSize: 線程池維護線程的最少數量

maximumPoolSize:線程池維護線程的最大數量

keepAliveTime: 線程池維護線程所允許的空閑時間

unit: 線程池維護線程所允許的空閑時間的機關

workQueue: 線程池所使用的緩沖隊列

handler: 線程池對拒絕任務的處理政策

當一個任務通過execute(Runnable)方法欲添加到線程池時:

1、如果此時線程池中的數量小于corePoolSize,即使線程池中的線程都處于空閑狀态,也要建立新的線程來處理被添加的任務。

2、如果此時線程池中的數量等于 corePoolSize,但是緩沖隊列 workQueue未滿,那麼任務被放入緩沖隊列。

3、如果此時線程池中的數量大于corePoolSize,緩沖隊列workQueue滿,并且線程池中的數量小于maximumPoolSize,建新的線程來處理被添加的任務。

4、如果此時線程池中的數量大于corePoolSize,緩沖隊列workQueue滿,并且線程池中的數量等于maximumPoolSize,那麼通過 handler所指定的政策來處理此任務。

也就是說,處理任務的優先級為:

核心線程corePoolSize、任務隊列workQueue、最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。

簡單的例子:

ThreadPoolTestMain.java

package threadpool.test;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

public class ThreadPoolTestMain {

    private static final int CORE_POOL_SIZE = 2;

    private static final int MAX_POOL_SIZE = 4;

    private static final int KEEP_ACTIVE_TIME = 3;

    private static final int TASK_NUM = 10;

    private static final int PRODUCE_SLEEP_TIME = 10;

    static public void main(String[] args) {

    // 構造一個線程池  

        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, 

                            MAX_POOL_SIZE, 

                            KEEP_ACTIVE_TIME, 

                            TimeUnit.SECONDS, 

                            new ArrayBlockingQueue<runnable>(3),  

                            new ThreadPoolExecutor.DiscardOldestPolicy());

        for (int i = 1; i < TASK_NUM; i++) {

            String name = "Task" + i;

            try {

                System.out.println("ThreadPoolTestMain: put a task: " + name);

                threadPool.execute(new ThreadPoolTask(name));

                Thread.sleep(20);

            } catch (Exception err) {

                err.printStackTrace();

            }

        }

    }

}

ThreadPoolTask.java

package threadpool.test;

public class ThreadPoolTask implements Runnable {

    private String mTaskName;

    private static int CONSUME_SLEEP_TIME = 2000;

    public ThreadPoolTask(String name) {

        mTaskName = name;

    }

    @Override

    public void run() {

        // TODO Auto-generated method stub

        System.out.println(Thread.currentThread().getName());

        System.out.println("ThreadPoolTask :" + mTaskName);

        try {

            Thread.sleep(CONSUME_SLEEP_TIME);

        } catch (Exception err) {  

            err.printStackTrace();

        }

    }

}</runnable>