天天看點

線程池(ThreadPoolExecutor)處理異步任務

轉載于: http://blog.csdn.net/u010687392/article/details/49850803

1.前言

我們在開發時候或多或少都會用到線程,而通常建立線程有兩種方式:

  1. 繼承Thread類
  2. 實作Runnable接口

這兩種方式雖然都可以建立線程,但是是有差別的:

  • 主要差別在于在多線程通路同一資源的情況下,用Runnable接口建立的線程可以處理同一資源,而用Thread類建立的線程則各自獨立處理,各自擁有自己的資源,這樣不利于資源共享。
  • 用Thread類建立是繼承的方式,而用Runnable接口建立是實作接口的方式,更靈活多樣性。

是以,在Java中大多數多線程程式都是通過實作Runnable來完成的,而對于Android來說也不例外。

具體實作代碼如下:

new Thread(new Runnable() {
    @Override
    public void run() {
        //TODO.
    }
}).start();
           

上述代碼建立了一個線程并執行,它在任務結束後GC會自動回收該線程,這種處理線上程并發不多的程式中确實不錯,但是當程式有很多地方需要開啟大量線程來處理任務,那麼如果還是用上述的方式去建立線程處理的話,那麼将導緻系統的性能表現的非常糟糕,更别說在記憶體有限的移動裝置上,主要的影響如下:

  • 線程的建立和銷毀都需要時間,當有大量的線程建立和銷毀時,那麼這些時間的消耗則比較明顯,将導緻性能上的缺失
  • 大量的線程建立、執行和銷毀是非常耗cpu和記憶體的,這樣将直接影響系統的吞吐量,導緻性能急劇下降,如果記憶體資源占用的比較多,還很可能造成OOM
  • 大量的線程的建立和銷毀很容易導緻GC頻繁的執行,進而發生記憶體抖動現象,而發生了記憶體抖動,對于移動端來說,最大的影響就是造成界面卡頓

而針對上述所描述的問題,解決的辦法歸根到底就是:重用已有的線程,進而減少線程的建立。

如此就需要使用到線程池(ExecutorService),線程池的基本作用就是進行線程的複用。

使用線程池管理線程的優點:

  1. 線程的建立和銷毀由線程池維護,一個線程在完成任務後并不會立即銷毀,而是由後續的任務複用這個線程,進而減少線程的建立和銷毀,節約系統的開銷
  2. 線程池旨線上程的複用,這就可以節約我們用以往的方式建立線程和銷毀所消耗的時間,減少線程頻繁排程的開銷,進而節約系統資源,提高系統吞吐量
  3. 在執行大量異步任務時提高了性能
  4. Java内置的一套ExecutorService線程池相關的api,可以更友善的控制線程的最大并發數、線程的定時任務、單線程的順序執行等

2.ExecutorService簡介

ExecutorService:它是一個接口,其實如果要從真正意義上來說,它可以叫做線程池的服務,因為它提供了衆多接口api來控制線程池中的線程,而真正意義上的線程池就是:ThreadPoolExecutor,它實作了ExecutorService接口,并封裝了一系列的api使得它具有線程池的特性,其中包括工作隊列、核心線程數、最大線程數等。

ThreadPoolExecutor:我們要建立一個線程池隻需要new ThreadPoolExecutor(…);就可以建立一個線程池,而如果這樣建立線程池的話,我們需要配置一堆東西,非常麻煩:

public ThreadPoolExecutor(int corePoolSize,                             //核心線程池數量
                              int maximumPoolSize,                      //最大線程池大小
                              long keepAliveTime,                       //線程池中超過                                         corePoolSize數目的空閑線程最大存活時間;可以allowCoreThreadTimeOut(true)使得核心線程有效時間
                              TimeUnit unit,                            //keepAliveTime時間機關
                              BlockingQueue<Runnable> workQueue,        //阻塞任務隊列
                              ThreadFactory threadFactory,              //建立線程工廠
                              RejectedExecutionHandler handler) {...}   //當送出任務數超過maxmumPoolSize+workQueue之和時,任務會交給RejectedExecutionHandler來處理
           

官方一般不推薦此種使用方法,而是推薦使用Executors的工廠方法來建立線程池,Executors類是官方提供的一個工廠類,它裡面封裝好了衆多功能不一樣的線程池,進而使得我們建立線程池非常的簡便,主要提供了如下五種功能不一樣的線程池:

  • newFixedThreadPool(int nThreads) :該方法傳回一個固定線程數量的線程池,該線程池中的線程數量始終不變,即不會再建立新的線程,也不會銷毀已經建立好的線程,自始自終都是那幾個固定的線程在工作,是以該線程池可以控制線程的最大并發數。

    eg:假如有一個新任務送出時,線程池中如果有空閑的線程則立即使用空閑線程來處理任務,如果沒有,則會把這個新任務存在一個任務隊列中,一旦有線程空閑了,則按FIFO方式處理任務隊列中的任務。

    *FixedThreadPool是一個典型且優秀的線程池,它具有線程池提高程式效率和節省建立線程時所耗的開銷的優點。但是,線上程池空閑時,即線程池中沒有可運作任務時,它不會釋放工作線程,還會占用一定的系統資源。

  • newCachedThreadPool() :該方法傳回一個可以根據實際情況調整線程池中線程的數量的線程池。即該線程池中的線程數量不确定,是根據實際情況動态調整的。

    eg:線程池中的線程都有一個“保持活動時間”的參數,通過配置它,如果線程池中的空閑線程的空閑時間超過該“儲存活動時間(keepAliveTime)”則立刻停止該線程,而該線程池預設的“保持活動時間”為60s。是以線程池中線程不會越集越多

    *在使用CachedThreadPool時,一定要注意控制任務的數量,否則,由于大量線程同時運作,很有會造成系統癱瘓。

  • newSingleThreadExecutor() :該方法傳回一個隻有一個線程的線程池,即每次隻能執行一個線程任務,多餘的任務會儲存到一個任務隊列中,等待這一個線程空閑,當這個線程空閑了再按FIFO方式順序執行任務隊列中的任務。

    *單工作線程最大的特點是可保證順序地執行各個任務,并且在任意給定的時間不會有多個線程是活動的。

  • newScheduledThreadPool(int corePoolSize):該方法傳回一個可以控制線程池内線程定時或周期性執行某任務的線程池。
  • newSingleThreadScheduledExecutor() : 該方法傳回一個可以控制線程池内線程定時或周期性執行某任務的線程池。隻不過和上面的差別是該線程池大小為1,而上面的可以指定線程池的大小。

ThreadPoolExecutor的構造方法中,workQueue這個任務隊列卻要再次說明一下,它是一個BlockingQueue對象,而泛型則限定它是用來存放Runnable對象的,不同的線程池它的任務隊列實作肯定是不一樣的,是以,保證不同線程池有着不同的功能的核心就是這個workQueue的實作了,細心的會發現在剛剛的用來建立線程池的工廠方法中,針對不同的線程池傳入的workQueue也不一樣:

1、newFixedThreadPool()—>LinkedBlockingQueue        //無界的隊列
2、newSingleThreadExecutor()—>LinkedBlockingQueue 
3、newCachedThreadPool()—>SynchronousQueue          //直接送出的隊列 
4、newScheduledThreadPool()—>DelayedWorkQueue       //等待隊列
5、newSingleThreadScheduledExecutor()—>DelayedWorkQueue
           

**幾種線程池使用場合:

  • 對于需要保證所有送出的任務都要被執行的情況,使用FixedThreadPool
  • 如果限定隻能使用一個線程進行任務處理,使用SingleThreadExecutor
  • 如果希望送出的任務盡快配置設定線程執行,使用CachedThreadPool
  • 如果業務上允許任務執行失敗,或者任務執行過程可能出現執行時間過長進而影響其他業務的應用場景,可以通過使用限定線程數量的線程池以及限定長度的隊列進行容錯處理。

3.線程池ThreadPoolExecutor的使用

(1).Java内置提供的五種常用的線程池

使用線程池,其中涉及到一個極其重要的方法,即:

建立一個固定線程數量的線程池,示例為:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool();
    for (int i = ; i <= ; i++) {
        final int index = i;
        fixedThreadPool.execute(new Runnable() {
             @Override
             public void run() {
                 String threadName = Thread.currentThread().getName();
                 Log.v("zy", "線程:"+threadName+",正在執行第" + index + "個任務");
                 try {
                        Thread.sleep();
                 } catch (InterruptedException e) {
                        e.printStackTrace();
                 }
             }
         });
     }
           

建立一個可以定時或者周期性執行任務的線程池,示例為:

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool();
        //延遲2秒後執行該任務
        scheduledThreadPool.schedule(new Runnable() {
            @Override
            public void run() {

            }
        }, , TimeUnit.SECONDS);
        //延遲1秒後,每隔2秒執行一次該任務
        scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {

            }
        }, , , TimeUnit.SECONDS);
           

(2).自定義線程池ThreadPoolExecutor

實作一個功能是按任務的優先級來處理的線程池

  • 首先我們建立一個基于PriorityBlockingQueue實作的線程池,為了測試友善,我這裡把核心線程數量設定為3,如下:
ExecutorService priorityThreadPool = new ThreadPoolExecutor(,,,TimeUnit.SECONDS,new PriorityBlockingQueue<Runnable>());
           
  • 然後建立一個實作Runnable接口的類,并向外提供一個抽象方法供我們實作自定義功能,并實作Comparable接口,實作這個接口主要就是進行優先級的比較,代碼如下:
public abstract class PriorityRunnable implements Runnable, Comparable<PriorityRunnable> {
    private int priority;

    public PriorityRunnable(int priority) {
        if (priority < )
            throw new IllegalArgumentException();
        this.priority = priority;
    }

    @Override
    public int compareTo(PriorityRunnable another) {
        int my = this.getPriority();
        int other = another.getPriority();
        return my < other ?  : my > other ? - : ;
    }

    @Override
    public void run() {
        doSth();
    }

    public abstract void doSth();

    public int getPriority() {
        return priority;
    }
}
           
  • 使用我們自己的PriorityRunnable送出任務,整體代碼如下:
ExecutorService priorityThreadPool = new ThreadPoolExecutor(, , L, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());
        for (int i = ; i <= ; i++) {
            final int priority = i;
            priorityThreadPool.execute(new PriorityRunnable(priority) {
                @Override
                public void doSth() {
                    String threadName = Thread.currentThread().getName();
                    Log.v("zxy", "線程:" + threadName + ",正在執行優先級為:" + priority + "的任務");
                    try {
                        Thread.sleep();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
           

可以推斷執行結果,由于核心線程數設定為3,剛開始時,系統有3個空閑線程,是以無須使用任務隊列,而是直接運作前三個任務,而後面再送出任務時由于目前沒有空閑線程是以加入任務隊列中進行等待,此時,由于我們的任務隊列實作是由PriorityBlockingQueue實作的,是以進行等待的任務會經過優先級判斷,優先級高的放在隊列前面先處理。

優先級線程池的優點:建立一個優先級線程池非常有用,它可以線上程池中線程數量不足或系統資源緊張時,優先處理我們想要先處理的任務,而優先級低的則放到後面再處理,這極大改善了系統預設線程池以FIFO方式處理任務的不靈活

(3).擴充線程池ThreadPoolExecutor

除了内置的功能外,ThreadPoolExecutor也向外提供了三個接口供我們自己擴充滿足我們需求的線程池,這三個接口分别是:

beforeExecute() - 任務執行前執行的方法 
afterExecute() -任務執行結束後執行的方法 
terminated() -線程池關閉後執行的方法
           

這三個方法在ThreadPoolExecutor内部都沒有實作.

前面兩個方法我們可以在ThreadPoolExecutor内部的runWorker()方法中找到,而runWorker()是ThreadPoolExecutor的内部類Worker實作的方法,Worker它實作了Runnable接口,也正是線程池内處理任務的工作線程,而Worker.runWorker()方法則是處理我們所送出的任務的方法,它會同時被多個線程通路,是以我們看runWorker()方法的實作,由于涉及到多個線程的異步調用,必然是需要使用鎖來處理,而這裡使用的是Lock來實作的.

是以,我們要擴充線程池,隻需要重寫這三個方法,并實作我們自己的功能即可,這三個方法分别都會在任務執行前調用、任務執行完成後調用、線程池關閉後調用。

public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        String threadName = t.getName();
        Log.v("zxy", "線程:" + threadName + "準備執行任務!");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        String threadName = Thread.currentThread().getName();
        Log.v("zxy", "線程:" + threadName + "任務執行結束!");
    }

    @Override
    protected void terminated() {
        super.terminated();
        Log.v("zxy", "線程池結束!");
    }
}
           

具有暫時功能的線程池:

public class PausableThreadPoolExecutor extends ThreadPoolExecutor {
    private boolean isPaused;
    private ReentrantLock pauseLock = new ReentrantLock();
    private Condition unpaused = pauseLock.newCondition();

    public PausableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        pauseLock.lock();
        try {
            while (isPaused) unpaused.await();
        } catch (InterruptedException ie) {
            t.interrupt();
        } finally {
            pauseLock.unlock();
        }

    }

    public void pause() {
        pauseLock.lock();
        try {
            isPaused = true;
        } finally {
            pauseLock.unlock();
        }
    }

    public void resume() {
        pauseLock.lock();
        try {
            isPaused = false;
            unpaused.signalAll();
        } finally {
            pauseLock.unlock();
        }
    }
}
           

然後結合上面的優先級線程池的實作,建立具有暫停功能的優先級線程池:

PausableThreadPoolExecutor pausableThreadPoolExecutor = new PausableThreadPoolExecutor(, , L, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());
        for (int i = ; i <= ; i++) {
            final int priority = i;
            pausableThreadPoolExecutor.execute(new PriorityRunnable(priority) {
                @Override
                public void doSth() {
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            textView.setText(priority + "");
                        }
                    });
                    try {
                        Thread.sleep();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
           

(4).優化線程池ThreadPoolExecutor

雖說線程池極大改善了系統的性能,不過建立線程池也是需要資源的,是以線程池内線程數量的大小也會影響系統的性能,大了反而浪費資源,小了反而影響系統的吞吐量,是以我們建立線程池需要把握一個度才能合理的發揮它的優點,通常來說我們要考慮的因素有CPU的數量、記憶體的大小、并發請求的數量等因素,按需調整。

通常核心線程數可以設為CPU數量+1,而最大線程數可以設為CPU的數量*2+1。

擷取CPU數量的方法為:

shutdown()和shutdownNow()的差別:

1、shutdown()方法在終止前允許執行以前送出的任務。

2、shutdownNow()方法則是阻止正在任務隊列中等待任務的啟動并試圖停止目前正在執行的任務。

(5).優化線程池ThreadPoolExecutor

AsyncTask内部實作其實就是Thread+Handler。其中Handler是為了處理線程之間的通信,而這個Thread是線程池,AsyncTask内部實作了兩個線程池,分别是:串行線程池和固定線程數量的線程池。而這個固定線程數量則是通過CPU的數量決定的。

在預設情況下,我們大都通過AsyncTask::execute()來執行任務的,而execute()内部則是調用executeOnExecutor(sDefaultExecutor, params)方法執行的,第一個參數就是指定處理該任務的線程池,而預設情況下AsyncTask是傳入串行線程池(在這裡不講版本的變化),也就是任務隻能單個的按順序執行。

而我們要是想讓AsyncTask并行的處理任務,大家都知道調AsyncTask::executeOnExecutor(sDefaultExecutor, params)方法傳入這個參數即可:AsyncTask.THREAD_POOL_EXECUTOR。

而這個參數的意義在于為任務指定了一個固定線程數量的線程池去處理,進而達到了并行處理的功能,我們可以在源碼中看到AsyncTask.THREAD_POOL_EXECUTOR這個參數就是一個固定線程數量的線程池:

public static final Executor THREAD_POOL_EXECUTOR
            = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
                    TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
           

繼續閱讀