天天看點

并發新特性—Executor 架構與線程池

并發新特性—Executor 架構與線程池

Executor 架構簡介

在 Java 5 之後,并發程式設計引入了一堆新的啟動、排程和管理線程的API。Executor 架構便是 Java 5 中引入的,其内部使用了線程池機制,它在 java.util.cocurrent 包下,通過該架構來控制線程的啟動、執行和關閉,可以簡化并發程式設計的操作。是以,在 Java 5之後,通過 Executor 來啟動線程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用線程池實作,節約開銷)外,還有關鍵的一點:有助于避免 this 逃逸問題——如果我們在構造器中啟動一個線程,因為另一個任務可能會在構造器結束之前開始執行,此時可能會通路到初始化了一半的對象用 Executor 在構造器中。

Executor 架構包括:線程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable 等。

Executor 接口中之定義了一個方法 execute(Runnable command),該方法接收一個 Runable 執行個體,它用來執行一個任務,任務即一個實作了 Runnable 接口的類。ExecutorService 接口繼承自 Executor 接口,它提供了更豐富的實作多線程的方法,比如,ExecutorService 提供了關閉自己的方法,以及可為跟蹤一個或多個異步任務執行狀況而生成 Future 的方法。 可以調用 ExecutorService 的 shutdown()方法來平滑地關閉 ExecutorService,調用該方法後,将導緻 ExecutorService 停止接受任何新的任務且等待已經送出的任務執行完成(已經送出的任務會分兩類:一類是已經在執行的,另一類是還沒有開始執行的),當所有已經送出的任務執行完畢後将會關閉 ExecutorService。是以我們一般用該接口來實作和管理多線程。

ExecutorService 的生命周期包括三種狀态:運作、關閉、終止。建立後便進入運作狀态,當調用了 shutdown()方法時,便進入關閉狀态,此時意味着 ExecutorService 不再接受新的任務,但它還在執行已經送出了的任務,當素有已經送出了的任務執行完後,便到達終止狀态。如果不調用 shutdown()方法,ExecutorService 會一直處在運作狀态,不斷接收新的任務,執行新的任務,伺服器端一般不需要關閉它,保持一直運作即可。

Executors 提供了一系列工廠方法用于創先線程池,傳回的線程池都實作了 ExecutorService 接口。

public static ExecutorService newFixedThreadPool(int nThreads)

建立固定數目線程的線程池。

public static ExecutorService newCachedThreadPool()

建立一個可緩存的線程池,調用execute将重用以前構造的線程(如果線程可用)。如果現有線程沒有可用的,則建立一個新線 程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。

public static ExecutorService newSingleThreadExecutor()

建立一個單線程化的Executor。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

建立一個支援定時及周期性的任務執行的線程池,多數情況下可用來替代Timer類。

這四種方法都是用的 Executors 中的 ThreadFactory 建立的線程,下面就以上四個方法做個比較:

newCachedThreadPool()

  • 緩存型池子,先檢視池中有沒有以前建立的線程,如果有,就 reuse 如果沒有,就建一個新的線程加入池中
  • 緩存型池子通常用于執行一些生存期很短的異步型任務是以在一些面向連接配接的 daemon 型 SERVER 中用得不多。但對于生存期短的異步任務,它是 Executor 的首選。
  • 能 reuse 的線程,必須是 timeout IDLE 内的池中線程,預設 timeout 是 60s,超過這個 IDLE 時長,線程執行個體将被終止及移出池。

注意,放入 CachedThreadPool 的線程不必擔心其結束,超過 TIMEOUT 不活動,其會自動被終止。

newFixedThreadPool(int)

  • newFixedThreadPool 與 cacheThreadPool 差不多,也是能 reuse 就用,但不能随時建新的線程。
  • 其獨特之處:任意時間點,最多隻能有固定數目的活動線程存在,此時如果有新的線程要建立,隻能放在另外的隊列中等待,直到目前的線程中某個線程終止直接被移出池子。
  • 和 cacheThreadPool 不同,FixedThreadPool 沒有 IDLE 機制(可能也有,但既然文檔沒提,肯定非常長,類似依賴上層的 TCP 或 UDP IDLE 機制之類的),是以 FixedThreadPool 多數針對一些很穩定很固定的正規并發線程,多用于伺服器。
  • 從方法的源代碼看,cache池和fixed 池調用的是同一個底層 池,隻不過參數不同:
  • fixed 池線程數固定,并且是0秒IDLE(無IDLE)。
  • cache 池線程數支援 0-Integer.MAX_VALUE(顯然完全沒考慮主機的資源承受能力),60 秒 IDLE 。

newScheduledThreadPool(int)

  • 排程型線程池
  • 這個池子裡的線程可以按 schedule 依次 delay 執行,或周期執行

SingleThreadExecutor()

  • 單例線程,任意時間池中隻能有一個線程
  • 用的是和 cache 池和 fixed 池相同的底層池,但線程數目是 1-1,0 秒 IDLE(無 IDLE)

一般來說,CachedTheadPool 在程式執行過程中通常會建立與所需數量相同的線程,然後在它回收舊線程時停止建立新線程,是以它是合理的 Executor 的首選,隻有當這種方式會引發問題時(比如需要大量長時間面向連接配接的線程時),才需要考慮用 FixedThreadPool。(該段話摘自《Thinking in Java》第四版)

Executor 執行 Runnable 任務

通過 Executors 的以上四個靜态工廠方法獲得 ExecutorService 執行個體,而後調用該執行個體的 execute(Runnable command)方法即可。一旦 Runnable 任務傳遞到 execute()方法,該方法便會自動在一個線程上執行。下面是 Executor 執行 Runnable 任務的示例代碼:

import java.util.concurrent.ExecutorService;   
import java.util.concurrent.Executors;   
public class TestCachedThreadPool{

public static void main(String[] args){

ExecutorService executorService = Executors.newCachedThreadPool();

//      ExecutorService executorService = Executors.newFixedThreadPool(5);

//      ExecutorService executorService = Executors.newSingleThreadExecutor();

for (int i = 0; i < 5; i++){

executorService.execute(new TestRunnable());

System.out.println("************* a" + i + " *************");

}

executorService.shutdown();

}

}

class TestRunnable implements Runnable{

public void run(){

System.out.println(Thread.currentThread().getName() + "線程被調用了。");

}

}        

執行後的結果如下:

從結果中可以看出,pool-1-thread-1 和 pool-1-thread-2 均被調用了兩次,這是随機的,execute 會首先線上程池中選擇一個已有空閑線程來執行任務,如果線程池中沒有空閑線程,它便會建立一個新的線程來執行任務。

Executor 執行 Callable 任務

在 Java 5 之後,任務分兩類:一類是實作了 Runnable 接口的類,一類是實作了 Callable 接口的類。兩者都可以被 ExecutorService 執行,但是 Runnable 任務沒有傳回值,而 Callable 任務有傳回值。并且 Callable 的 call()方法隻能通過 ExecutorService 的 submit(Callable task) 方法來執行,并且傳回一個 Future,是表示任務等待完成的 Future。

Callable 接口類似于 Runnable,兩者都是為那些其執行個體可能被另一個線程執行的類設計的。但是 Runnable 不會傳回結果,并且無法抛出經過檢查的異常而 Callable 又傳回結果,而且當擷取傳回結果時可能會抛出異常。Callable 中的 call()方法類似 Runnable 的 run()方法,差別同樣是有傳回值,後者沒有。

當将一個 Callable 的對象傳遞給 ExecutorService 的 submit 方法,則該 call 方法自動在一個線程上執行,并且會傳回執行結果 Future 對象。同樣,将 Runnable 的對象傳遞給 ExecutorService 的 submit 方法,則該 run 方法自動在一個線程上執行,并且會傳回執行結果 Future 對象,但是在該 Future 對象上調用 get 方法,将傳回 null。

下面給出一個 Executor 執行 Callable 任務的示例代碼:

import java.util.ArrayList;   
import java.util.List;   
import java.util.concurrent.*;   
public class CallableDemo{

public static void main(String[] args){

ExecutorService executorService = Executors.newCachedThreadPool();

List<Future<String>> resultList = new ArrayList<Future<String>>();

    //建立10個任務并執行   
    for (int i = 0; i < 10; i++){   
        //使用ExecutorService執行Callable類型的任務,并将結果儲存在future變量中   
        Future<String> future = executorService.submit(new TaskWithResult(i));   
        //将任務執行結果存儲到List中   
        resultList.add(future);   
    }   

    //周遊任務的結果   
    for (Future<String> fs : resultList){   
            try{   
                while(!fs.isDone);//Future傳回如果沒有完成,則一直循環等待,直到Future傳回完成  
                System.out.println(fs.get());     //列印各個線程(任務)執行的結果   
            }catch(InterruptedException e){   
                e.printStackTrace();   
            }catch(ExecutionException e){   
                e.printStackTrace();   
            }finally{   
                //啟動一次順序關閉,執行以前送出的任務,但不接受新任務  
                executorService.shutdown();   
            }   
    }   
}   

}
class TaskWithResult implements Callable<String>{

private int id;

public TaskWithResult(int id){   
    this.id = id;   
}   

/**  
 * 任務的具體過程,一旦任務傳給ExecutorService的submit方法, 
 * 則該方法自動在一個線程上執行 
 */   
public String call() throws Exception {  
    System.out.println("call()方法被自動調用!!!    " + Thread.currentThread().getName());   
    //該傳回結果将被Future的get方法得到  
    return "call()方法被自動調用,任務傳回的結果是:" + id + "    " + Thread.currentThread().getName();   
}   

}        

執行結果如下:

從結果中可以同樣可以看出,submit 也是首先選擇空閑線程來執行任務,如果沒有,才會建立新的線程來執行任務。另外,需要注意:如果 Future 的傳回尚未完成,則 get()方法會阻塞等待,直到 Future 完成傳回,可以通過調用 isDone()方法判斷 Future 是否完成了傳回。

自定義線程池

自定義線程池,可以用 ThreadPoolExecutor 類建立,它有多個構造方法來建立線程池,用該類很容易實作自定義的線程池,這裡先貼上示例程式:

import java.util.concurrent.ArrayBlockingQueue;   
import java.util.concurrent.BlockingQueue;   
import java.util.concurrent.ThreadPoolExecutor;   
import java.util.concurrent.TimeUnit;   
public class ThreadPoolTest{

public static void main(String[] args){

//建立等待隊列

BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);

//建立線程池,池中儲存的線程數為3,允許的最大線程數為5

ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue);

//建立七個任務

Runnable t1 = new MyThread();

Runnable t2 = new MyThread();

Runnable t3 = new MyThread();

Runnable t4 = new MyThread();

Runnable t5 = new MyThread();

Runnable t6 = new MyThread();

Runnable t7 = new MyThread();

//每個任務會在一個線程上執行

pool.execute(t1);

pool.execute(t2);

pool.execute(t3);

pool.execute(t4);

pool.execute(t5);

pool.execute(t6);

pool.execute(t7);

//關閉線程池

pool.shutdown();

}

}

class MyThread implements Runnable{

@Override

public void run(){

System.out.println(Thread.currentThread().getName() + "正在執行。。。");

try{

Thread.sleep(100);

}catch(InterruptedException e){

e.printStackTrace();

}

}

}        

運作結果如下:

從結果中可以看出,七個任務是線上程池的三個線程上執行的。這裡簡要說明下用到的 ThreadPoolExecuror 類的構造方法中各個參數的含義。

public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long         keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue)      
  • corePoolSize:線程池中所儲存的核心線程數,包括空閑線程。
  • maximumPoolSize:池中允許的最大線程數。
  • keepAliveTime:線程池中的空閑線程所能持續的最長時間。
  • unit:持續時間的機關。
  • workQueue:任務執行前儲存任務的隊列,僅儲存由 execute 方法送出的 Runnable 任務。

根據 ThreadPoolExecutor 源碼前面大段的注釋,我們可以看出,當試圖通過 excute 方法講一個 Runnable 任務添加到線程池中時,按照如下順序來處理:

  1. 如果線程池中的線程數量少于 corePoolSize,即使線程池中有空閑線程,也會建立一個新的線程來執行新添加的任務;
  2. 如果線程池中的線程數量大于等于 corePoolSize,但緩沖隊列 workQueue 未滿,則将新添加的任務放到 workQueue 中,按照 FIFO 的原則依次等待執行(線程池中有線程空閑出來後依次将緩沖隊列中的任務傳遞給空閑的線程執行);
  3. 如果線程池中的線程數量大于等于 corePoolSize,且緩沖隊列 workQueue 已滿,但線程池中的線程數量小于 maximumPoolSize,則會建立新的線程來處理被添加的任務;
  4. 如果線程池中的線程數量等于了 maximumPoolSize,有 4 種才處理方式(該構造方法調用了含有 5 個參數的構造方法,并将最後一個構造方法為 RejectedExecutionHandler 類型,它在處理線程溢出時有 4 種方式,這裡不再細說,要了解的,自己可以閱讀下源碼)。

總結起來,也即是說,當有新的任務要處理時,先看線程池中的線程數量是否大于 corePoolSize,再看緩沖隊列 workQueue 是否滿,最後看線程池中的線程數量是否大于 maximumPoolSize。

另外,當線程池中的線程數量大于 corePoolSize 時,如果裡面有線程的空閑時間超過了 keepAliveTime,就将其移除線程池,這樣,可以動态地調整線程池中線程的數量。

我們大緻來看下 Executors 的源碼,newCachedThreadPool 的不帶 RejectedExecutionHandler 參數(即第五個參數,線程數量超過 maximumPoolSize 時,指定處理方式)的構造方法如下:

public static ExecutorService newCachedThreadPool() {  
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
                                  60L, TimeUnit.SECONDS,  
                                  new SynchronousQueue<Runnable>());  
}      

它将 corePoolSize 設定為 0,而将 maximumPoolSize 設定為了 Integer 的最大值,線程空閑超過 60 秒,将會從線程池中移除。由于核心線程數為 0,是以每次添加任務,都會先從線程池中找空閑線程,如果沒有就會建立一個線程(SynchronousQueue決定的,後面會說)來執行新的任務,并将該線程加入到線程池中,而最大允許的線程數為 Integer 的最大值,是以這個線程池理論上可以不斷擴大。

再來看 newFixedThreadPool 的不帶 RejectedExecutionHandler 參數的構造方法,如下:

public static ExecutorService newFixedThreadPool(int nThreads) {  
    return new ThreadPoolExecutor(nThreads, nThreads,  
                                  0L, TimeUnit.MILLISECONDS,  
                                  new LinkedBlockingQueue<Runnable>());  
}        

它将 corePoolSize 和 maximumPoolSize 都設定為了 nThreads,這樣便實作了線程池的大小的固定,不會動态地擴大,另外,keepAliveTime 設定為了 0,也就是說線程隻要空閑下來,就會被移除線程池,敢于 LinkedBlockingQueue 下面會說。

幾種排隊的政策

  • 直接送出。緩沖隊列采用 SynchronousQueue,它将任務直接交給線程處理而不保持它們。如果不存在可用于立即運作任務的線程(即線程池中的線程都在工作),則試圖把任務加入緩沖隊列将會失敗,是以會構造一個新的線程來處理新添加的任務,并将其加入到線程池中。直接送出通常要求無界 maximumPoolSizes(Integer.MAX_VALUE) 以避免拒絕新送出的任務。newCachedThreadPool 采用的便是這種政策。
  • 無界隊列。使用無界隊列(典型的便是采用預定義容量的 LinkedBlockingQueue,理論上是該緩沖隊列可以對無限多的任務排隊)将導緻在所有 corePoolSize 線程都工作的情況下将新任務加入到緩沖隊列中。這樣,建立的線程就不會超過 corePoolSize,也是以,maximumPoolSize 的值也就無效了。當每個任務完全獨立于其他任務,即任務執行互不影響時,适合于使用無界隊列。newFixedThreadPool采用的便是這種政策。
  • 有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(一般緩沖隊列使用 ArrayBlockingQueue,并制定隊列的最大長度)有助于防止資源耗盡,但是可能較難調整和控制,隊列大小和最大池大小需要互相折衷,需要設定合理的參數。

繼續閱讀