
Java Thread Pools: ThreadPoolExecutor

ThreadPoolExecutor

Let's start with the API documentation for ThreadPoolExecutor, which covers a lot of ground: http://docs.oracle.com/javase/8/docs/api/
public class ThreadPoolExecutor
extends AbstractExecutorService

An ExecutorService that executes each submitted task using one of possibly several pooled threads, normally configured using Executors factory methods.
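// In other words: an ExecutorService that runs each submitted task on one of several pooled threads. It is normally configured through the Executors factory methods.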

Thread pools address two different problems: they usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks. Each ThreadPoolExecutor also maintains some basic statistics, such as the number of completed tasks.
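// Thread pools address two separate problems. First, they usually improve performance when running large numbers of asynchronous tasks, because the per-task invocation overhead is lower. Second, they give you a way to bound and manage the resources (threads included) consumed while executing a batch of tasks. Each ThreadPoolExecutor also keeps basic statistics, such as the number of completed tasks.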


To be useful across a wide range of contexts, this class provides many adjustable parameters and extensibility hooks. However, programmers are urged to use the more convenient Executors factory methods Executors.newCachedThreadPool() (unbounded thread pool, with automatic thread reclamation), Executors.newFixedThreadPool(int) (fixed size thread pool) and Executors.newSingleThreadExecutor() (single background thread), that preconfigure settings for the most common usage scenarios. Otherwise, use the following guide when manually configuring and tuning this class:
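// To be useful in a wide range of contexts, this class exposes many adjustable parameters and extensibility hooks. Programmers are nevertheless encouraged to use the Executors factory methods, which preconfigure the most common scenarios. Otherwise, follow the guide below when configuring and tuning the class by hand.
//1. Executors.newCachedThreadPool() (unbounded thread pool with automatic thread reclamation)
//2. Executors.newFixedThreadPool(int) (fixed-size thread pool)
//3. Executors.newSingleThreadExecutor() (single background thread)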
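As a rough illustration of what the factory methods preconfigure, the sketch below reads back the core and maximum sizes of a fixed and a cached pool. The class name FactoryDemo and its helper methods are hypothetical, and the cast to ThreadPoolExecutor relies on how OpenJDK implements these two factories rather than on anything the Javadoc promises.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class FactoryDemo {
    // Reports "corePoolSize/maximumPoolSize" for a pool, then shuts it down.
    // Assumption: the factory returned a ThreadPoolExecutor (true on OpenJDK).
    private static String bounds(ExecutorService service) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
        String result = pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize();
        pool.shutdown();
        return result;
    }

    static String fixedBounds(int threads) {
        return bounds(Executors.newFixedThreadPool(threads));
    }

    static String cachedBounds() {
        return bounds(Executors.newCachedThreadPool());
    }

    public static void main(String[] args) {
        System.out.println("fixed(4): " + fixedBounds(4));
        System.out.println("cached:   " + cachedBounds());
    }
}
```

A fixed pool reports equal core and maximum sizes, while a cached pool reports zero core threads and a maximum of Integer.MAX_VALUE, which matches the "unbounded, with automatic reclamation" description above.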

Core and maximum pool sizes
// That is, the core and maximum number of threads
A ThreadPoolExecutor will automatically adjust the pool size (see getPoolSize()) according to the bounds set by corePoolSize (see getCorePoolSize()) and maximumPoolSize (see getMaximumPoolSize()). When a new task is submitted in method execute(Runnable), and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using setCorePoolSize(int) and setMaximumPoolSize(int).
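// A ThreadPoolExecutor adjusts the number of threads in the pool according to corePoolSize and maximumPoolSize. When a new task is submitted through execute(Runnable) and fewer than corePoolSize threads are running, a new thread is created to handle it, even if other worker threads are idle. If more than corePoolSize but fewer than maximumPoolSize threads are running, a new thread is created only when the queue is full. Setting corePoolSize equal to maximumPoolSize gives a fixed-size pool. Setting maximumPoolSize to an effectively unbounded value such as Integer.MAX_VALUE lets the pool accommodate any number of concurrent tasks. Both sizes are usually set in the constructor, but they can also be changed at run time with setCorePoolSize(int) and setMaximumPoolSize(int).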
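The sizing rule can be observed directly. The following sketch (the class PoolGrowthDemo and its sizes are illustrative assumptions) submits three blocking tasks to a pool with corePoolSize 1, maximumPoolSize 2 and a queue of capacity 1, and records getPoolSize() after each submission.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {
    // Returns the pool size observed after each of three submissions.
    static int[] poolSizesAfterEachSubmit() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        // Each task blocks until released, so no worker becomes idle early.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] sizes = new int[3];
        for (int i = 0; i < 3; i++) {
            pool.execute(blocker);
            sizes[i] = pool.getPoolSize();
        }
        release.countDown(); // let the tasks finish
        pool.shutdown();
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(poolSizesAfterEachSubmit()));
    }
}
```

The first task starts a core thread, the second waits in the queue, and only the third, arriving when the queue is full, causes a second thread to be created.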

On-demand construction
By default, even core threads are initially created and started only when new tasks arrive, but this can be overridden dynamically using method prestartCoreThread() or prestartAllCoreThreads(). You probably want to prestart threads if you construct the pool with a non-empty queue.
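// By default, even core threads are created and started only when new tasks arrive. You can change this with prestartCoreThread() or prestartAllCoreThreads(), which is useful when the pool is constructed with a non-empty queue.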
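A minimal sketch of on-demand construction versus prestarting, assuming a pool with three core threads (the class name PrestartDemo is hypothetical):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PrestartDemo {
    // Returns {pool size before prestart, threads started, pool size after}.
    static int[] prestart() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        int before = pool.getPoolSize();              // no threads exist yet
        int started = pool.prestartAllCoreThreads();  // starts all core threads
        int after = pool.getPoolSize();
        pool.shutdown();
        return new int[] {before, started, after};
    }

    public static void main(String[] args) {
        int[] r = prestart();
        System.out.println("before=" + r[0] + " started=" + r[1] + " after=" + r[2]);
    }
}
```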

Creating new threads
New threads are created using a ThreadFactory. If not otherwise specified, a Executors.defaultThreadFactory() is used, that creates threads to all be in the same ThreadGroup and with the same NORM_PRIORITY priority and non-daemon status. By supplying a different ThreadFactory, you can alter the thread's name, thread group, priority, daemon status, etc. If a ThreadFactory fails to create a thread when asked by returning null from newThread, the executor will continue, but might not be able to execute any tasks. Threads should possess the "modifyThread" RuntimePermission. If worker threads or other threads using the pool do not possess this permission, service may be degraded: configuration changes may not take effect in a timely manner, and a shutdown pool may remain in a state in which termination is possible but not completed.
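// New threads are created by a ThreadFactory. Unless you specify otherwise, Executors.defaultThreadFactory() is used, which puts all threads in the same ThreadGroup with NORM_PRIORITY priority and non-daemon status. Supplying your own ThreadFactory lets you change the thread name, thread group, priority, daemon status, and so on. If the factory fails to create a thread (newThread returns null), the executor carries on but may be unable to run any tasks. Threads should hold the "modifyThread" RuntimePermission. If worker threads or other threads using the pool lack it, service may degrade: configuration changes may not take effect promptly, and a shut-down pool may linger in a state where termination is possible but never completes.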
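For example, a custom ThreadFactory might give pool threads recognizable names and daemon status. This is only a sketch: NamedThreadFactory, ThreadFactoryDemo and the "worker" prefix are made-up names, not part of the JDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory that names threads "<prefix>-<n>" and marks them as daemons.
class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(true);
        return t;
    }
}

class ThreadFactoryDemo {
    // Runs one task and returns the name of the thread that executed it.
    static String workerName() {
        ExecutorService pool = Executors.newFixedThreadPool(1, new NamedThreadFactory("worker"));
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return pool.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerName());
    }
}
```

Named threads make thread dumps and log lines much easier to attribute to a particular pool.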


Keep-alive times
If the pool currently has more than corePoolSize threads, excess threads will be terminated if they have been idle for more than the keepAliveTime (see getKeepAliveTime(TimeUnit)). This provides a means of reducing resource consumption when the pool is not being actively used. If the pool becomes more active later, new threads will be constructed. This parameter can also be changed dynamically using method setKeepAliveTime(long, TimeUnit). Using a value of Long.MAX_VALUE TimeUnit.NANOSECONDS effectively disables idle threads from ever terminating prior to shut down. By default, the keep-alive policy applies only when there are more than corePoolSize threads. But method allowCoreThreadTimeOut(boolean) can be used to apply this time-out policy to core threads as well, so long as the keepAliveTime value is non-zero.
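// If the pool has more than corePoolSize threads, the excess threads are terminated once they have been idle for longer than keepAliveTime. This reduces resource consumption while the pool is lightly used. If the pool becomes busier later, new threads are created again. The value can be changed at run time with setKeepAliveTime(long, TimeUnit), and a value of Long.MAX_VALUE TimeUnit.NANOSECONDS effectively prevents idle threads from ever terminating before shutdown. By default the keep-alive policy applies only when there are more than corePoolSize threads.
// allowCoreThreadTimeOut(boolean) extends the same time-out policy to core threads, provided keepAliveTime is non-zero.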
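The effect of allowCoreThreadTimeOut can be sketched as follows. The class KeepAliveDemo, the 100 ms keep-alive and the 5 second polling deadline are assumptions chosen to keep the example short.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Returns true if both idle core threads were reclaimed.
    static boolean coreThreadsTimeOut() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true); // needs a non-zero keepAliveTime
        pool.prestartAllCoreThreads();     // two idle core threads now exist
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        try {
            // Poll until the idle threads time out or the deadline passes.
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(50);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean reclaimed = pool.getPoolSize() == 0;
        pool.shutdown();
        return reclaimed;
    }

    public static void main(String[] args) {
        System.out.println("core threads reclaimed: " + coreThreadsTimeOut());
    }
}
```

Without the allowCoreThreadTimeOut(true) call, the two prestarted threads would stay alive until the pool is shut down.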


Queuing
Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:
-If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
-If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
-If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
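// Any BlockingQueue can be used to transfer and hold submitted tasks. How the queue is used depends on the pool size:
// If fewer than corePoolSize threads are running, the executor always prefers adding a new thread to queuing. If corePoolSize or more threads are running, the executor always prefers queuing the task to adding a new thread.
// If the task cannot be queued, a new thread is created, unless that would exceed maximumPoolSize, in which case the task is rejected.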

There are three general strategies for queuing:
// There are three general queuing strategies:
Direct handoffs. A good default choice for a work queue is a SynchronousQueue that hands off tasks to threads without otherwise holding them. Here, an attempt to queue a task will fail if no threads are immediately available to run it, so a new thread will be constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.
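// Direct handoffs. A SynchronousQueue is a good default choice: it hands tasks straight to threads without holding them. An attempt to queue a task fails if no thread is immediately available to run it, so a new thread is created. This avoids lockups when handling sets of requests that may depend on one another. Direct handoffs generally need an unbounded maximumPoolSize to avoid rejecting newly submitted tasks, which in turn means the number of threads can grow without bound when tasks keep arriving, on average, faster than they can be processed.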

Unbounded queues. Using an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) will cause new tasks to wait in the queue when all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.) This may be appropriate when each task is completely independent of others, so tasks cannot affect each others execution; for example, in a web page server. While this style of queuing can be useful in smoothing out transient bursts of requests, it admits the possibility of unbounded work queue growth when commands continue to arrive on average faster than they can be processed.
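// Unbounded queues. With an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity), new tasks wait in the queue whenever all corePoolSize threads are busy. No more than corePoolSize threads are ever created, so maximumPoolSize has no effect. This suits tasks that are completely independent of one another and therefore cannot affect each other's execution, as in a web page server. This style of queuing smooths out transient bursts of requests, but the queue itself can grow without bound when tasks keep arriving, on average, faster than they can be processed.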

Bounded queues. A bounded queue (for example, an ArrayBlockingQueue) helps prevent resource exhaustion when used with finite maximumPoolSizes, but can be more difficult to tune and control. Queue sizes and maximum pool sizes may be traded off for each other: Using large queues and small pools minimizes CPU usage, OS resources, and context-switching overhead, but can lead to artificially low throughput. If tasks frequently block (for example if they are I/O bound), a system may be able to schedule time for more threads than you otherwise allow. Use of small queues generally requires larger pool sizes, which keeps CPUs busier but may encounter unacceptable scheduling overhead, which also decreases throughput.
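// Bounded queues. A bounded queue (for example an ArrayBlockingQueue) helps prevent resource exhaustion when combined with a finite maximumPoolSize, but it is harder to tune and control. Queue size and maximum pool size trade off against each other. Large queues with small pools minimize CPU usage, OS resources, and context-switching overhead, but can lead to artificially low throughput: if tasks block frequently (for example, because they are I/O bound), the system could have scheduled more threads than you allow. Small queues generally require larger pools, which keep the CPUs busier but may incur unacceptable scheduling overhead, which also lowers throughput.
// In short: the queue size and maximumPoolSize have to be balanced against each other, because an imbalance in either direction reduces throughput.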
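To make the difference between the strategies concrete, the sketch below sends the same three blocking tasks to two pools that differ only in their queue. QueueStrategyDemo and its sizes (corePoolSize 1, maximumPoolSize 3) are illustrative assumptions.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueStrategyDemo {
    // Submits three blocking tasks and returns {pool size, queue size}.
    private static int[] submitThree(BlockingQueue<Runnable> queue) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < 3; i++) {
            pool.execute(blocker);
        }
        int[] result = {pool.getPoolSize(), pool.getQueue().size()};
        release.countDown();
        pool.shutdown();
        return result;
    }

    // Direct handoff: nothing can be queued, so every task gets its own thread.
    static int[] directHandoff() {
        return submitThree(new SynchronousQueue<Runnable>());
    }

    // Unbounded queue: only the core thread is created, the rest wait in the queue.
    static int[] unboundedQueue() {
        return submitThree(new LinkedBlockingQueue<Runnable>());
    }

    public static void main(String[] args) {
        int[] d = directHandoff();
        int[] u = unboundedQueue();
        System.out.println("SynchronousQueue:    threads=" + d[0] + " queued=" + d[1]);
        System.out.println("LinkedBlockingQueue: threads=" + u[0] + " queued=" + u[1]);
    }
}
```

With the SynchronousQueue the pool grows to three threads and holds nothing in the queue. With the unbounded LinkedBlockingQueue it stays at one thread and two tasks wait, so maximumPoolSize is never reached.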

Rejected tasks
New tasks submitted in method execute(Runnable) will be rejected when the Executor has been shut down, and also when the Executor uses finite bounds for both maximum threads and work queue capacity, and is saturated. In either case, the execute method invokes the RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor) method of its RejectedExecutionHandler. 
Four predefined handler policies are provided:
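// New tasks submitted through execute(Runnable) are rejected once the Executor has been shut down, and also when the Executor uses finite bounds for both the maximum number of threads and the work queue capacity and both are saturated. In either case, execute calls RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor). Four predefined handler policies are provided: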

-In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime RejectedExecutionException upon rejection.
// ThreadPoolExecutor.AbortPolicy (the default): throws a RejectedExecutionException when a task is rejected.

-In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted.
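// ThreadPoolExecutor.CallerRunsPolicy: the thread that called execute runs the task itself. This acts as a simple feedback mechanism that slows down the rate at which new tasks are submitted.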

-In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped.
// ThreadPoolExecutor.DiscardPolicy: a task that cannot be executed is silently dropped.

-In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.)
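// ThreadPoolExecutor.DiscardOldestPolicy: if the executor has not been shut down, the task at the head of the work queue is dropped and execution is retried (the retry can fail again, in which case the process repeats).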

It is possible to define and use other kinds of RejectedExecutionHandler classes. Doing so requires some care especially when policies are designed to work only under particular capacity or queuing policies.
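// You can also define your own RejectedExecutionHandler. Take care when doing so, especially if the policy is designed to work only with a particular capacity or queuing strategy.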
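The predefined policies can be compared by saturating a small pool and then submitting one more task. In this sketch (RejectionDemo and its outcome strings are hypothetical), the pool has one thread and a one-slot queue, so the third submission is always rejected and handed to the policy.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class RejectionDemo {
    // Saturates the pool, submits a third task, and reports what happened to it.
    private static String submitWhenSaturated(RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker); // occupies the only worker thread
        pool.execute(blocker); // fills the only queue slot
        Thread caller = Thread.currentThread();
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            pool.execute(() -> ranOn.set(Thread.currentThread()));
            return ranOn.get() == caller ? "ran on caller" : "dropped";
        } catch (RejectedExecutionException e) {
            return "rejected";
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    static String abortOutcome() {
        return submitWhenSaturated(new ThreadPoolExecutor.AbortPolicy());
    }

    static String callerRunsOutcome() {
        return submitWhenSaturated(new ThreadPoolExecutor.CallerRunsPolicy());
    }

    static String discardOutcome() {
        return submitWhenSaturated(new ThreadPoolExecutor.DiscardPolicy());
    }

    public static void main(String[] args) {
        System.out.println("AbortPolicy:      " + abortOutcome());
        System.out.println("CallerRunsPolicy: " + callerRunsOutcome());
        System.out.println("DiscardPolicy:    " + discardOutcome());
    }
}
```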

Hook methods
This class provides protected overridable beforeExecute(Thread, Runnable) and afterExecute(Runnable, Throwable) methods that are called before and after execution of each task. These can be used to manipulate the execution environment; for example, reinitializing ThreadLocals, gathering statistics, or adding log entries. Additionally, method terminated() can be overridden to perform any special processing that needs to be done once the Executor has fully terminated.
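// The class provides protected, overridable beforeExecute(Thread, Runnable) and afterExecute(Runnable, Throwable) methods, which are called before and after each task runs. Use them to manipulate the execution environment, for example to reinitialize ThreadLocals, gather statistics, or add log entries. terminated() can also be overridden to run any special processing once the Executor has fully terminated.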

If hook or callback methods throw exceptions, internal worker threads may in turn fail and abruptly terminate.
// If hook or callback methods throw exceptions, internal worker threads may in turn fail and terminate abruptly.
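As one possible use of the hooks, the sketch below gathers simple statistics by counting how many tasks were started and finished. CountingExecutor and HookDemo are hypothetical names introduced for this example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass that counts tasks through the two execution hooks.
class CountingExecutor extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();

    CountingExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        finished.incrementAndGet();
    }
}

class HookDemo {
    // Runs `count` empty tasks and returns {started, finished}.
    static int[] runTasks(int count) {
        CountingExecutor pool = new CountingExecutor(2);
        for (int i = 0; i < count; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {pool.started.get(), pool.finished.get()};
    }

    public static void main(String[] args) {
        int[] counts = runTasks(5);
        System.out.println("started=" + counts[0] + " finished=" + counts[1]);
    }
}
```

The hooks run on the worker threads, so shared counters need to be thread-safe, which is why AtomicInteger is used here.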

Queue maintenance
// Maintaining the work queue
Method getQueue() allows access to the work queue for purposes of monitoring and debugging. Use of this method for any other purpose is strongly discouraged. Two supplied methods, remove(Runnable) and purge() are available to assist in storage reclamation when large numbers of queued tasks become cancelled.
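// getQueue() gives access to the work queue for monitoring and debugging. Using it for any other purpose is strongly discouraged. When large numbers of queued tasks are cancelled, remove(Runnable) and purge() help reclaim the storage.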
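The following sketch shows why purge() exists: cancelling a Future does not by itself remove the task from the work queue. PurgeDemo is a hypothetical class, and getQueue() is used here only for monitoring, as the documentation recommends.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PurgeDemo {
    // Returns the queue size before and after purge() for three cancelled tasks.
    static int[] queueSizeBeforeAndAfterPurge() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        // Keep the single worker busy so that later tasks stay queued.
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        List<Future<?>> queued = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            queued.add(pool.submit(() -> { }));
        }
        for (Future<?> f : queued) {
            f.cancel(false); // marks the task cancelled but leaves it in the queue
        }
        int before = pool.getQueue().size();
        pool.purge();        // removes cancelled Future tasks from the queue
        int after = pool.getQueue().size();
        release.countDown();
        pool.shutdown();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] r = queueSizeBeforeAndAfterPurge();
        System.out.println("queued before purge=" + r[0] + ", after purge=" + r[1]);
    }
}
```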

Finalization
// Automatic shutdown of unreferenced pools
A pool that is no longer referenced in a program AND has no remaining threads will be shutdown automatically. If you would like to ensure that unreferenced pools are reclaimed even if users forget to call shutdown(), then you must arrange that unused threads eventually die, by setting appropriate keep-alive times, using a lower bound of zero core threads and/or setting allowCoreThreadTimeOut(boolean).
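// A pool that is no longer referenced in the program and has no remaining threads is shut down automatically. To make sure unreferenced pools are reclaimed even if users forget to call shutdown(), arrange for unused threads to die eventually: set an appropriate keep-alive time, and use a lower bound of zero core threads and/or call allowCoreThreadTimeOut(boolean).

Extension example
Most extensions of this class override one or more of the protected hook methods. For example, here is a subclass that adds a simple pause/resume feature: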

 class PausableThreadPoolExecutor extends ThreadPoolExecutor {
   private boolean isPaused;
   private ReentrantLock pauseLock = new ReentrantLock();
   private Condition unpaused = pauseLock.newCondition();

   public PausableThreadPoolExecutor(...) { super(...); }

   protected void beforeExecute(Thread t, Runnable r) {
     super.beforeExecute(t, r);
     pauseLock.lock();
     try {
       while (isPaused) unpaused.await();
     } catch (InterruptedException ie) {
       t.interrupt();
     } finally {
       pauseLock.unlock();
     }
   }

   public void pause() {
     pauseLock.lock();
     try {
       isPaused = true;
     } finally {
       pauseLock.unlock();
     }
   }

   public void resume() {
     pauseLock.lock();
     try {
       isPaused = false;
       unpaused.signalAll();
     } finally {
       pauseLock.unlock();
     }
   }
 }
           
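The translation of the API documentation above is rough, but its key points can be summarized as follows:
  1. ThreadPoolExecutor defines four constructors, and every pool is ultimately built through them, including the pools returned by the ThreadPoolExecutor-based Executors factory methods. The other three constructors delegate to the one below, whose parameters are explained next.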
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
Creates a new ThreadPoolExecutor with the given initial parameters.
           
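  • corePoolSize: the size of the core pool. A newly created pool contains no threads. Threads are created only as tasks are submitted, unless you prestart them with prestartCoreThread() or prestartAllCoreThreads(). Once corePoolSize threads are running, further tasks are placed in the queue to wait.
  • maximumPoolSize: the maximum number of threads allowed in the pool.
  • keepAliveTime: how long a thread may stay idle before it is terminated. By default this applies only while the pool has more than corePoolSize threads. After allowCoreThreadTimeOut(true) it applies to core threads as well.
  • unit: the TimeUnit of keepAliveTime, one of seven values: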

    DAYS

    HOURS

    MICROSECONDS

    MILLISECONDS

    MINUTES

    NANOSECONDS

    SECONDS

  • workQueue: the work queue that holds tasks waiting to run. Common choices:

    ArrayBlockingQueue;

    LinkedBlockingQueue;

    SynchronousQueue;

  • threadFactory: the factory used to create new threads.
  • handler: the policy applied to rejected tasks:

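    ThreadPoolExecutor.AbortPolicy: rejects the task and throws a RejectedExecutionException.

    ThreadPoolExecutor.DiscardPolicy: also drops the task, but without throwing an exception.

    ThreadPoolExecutor.DiscardOldestPolicy: drops the task at the head of the queue, then retries the submission (repeating if it fails again).

    ThreadPoolExecutor.CallerRunsPolicy: runs the task on the calling thread.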
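Putting the seven parameters together, a fully specified constructor call might look like the sketch below. The concrete values (2 core threads, 4 maximum, 60 seconds, a queue of 10) and the class name ConstructorDemo are arbitrary choices for illustration, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ConstructorDemo {
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                         // corePoolSize
                4,                                         // maximumPoolSize
                60L,                                       // keepAliveTime
                TimeUnit.SECONDS,                          // unit
                new ArrayBlockingQueue<Runnable>(10),      // workQueue (bounded)
                Executors.defaultThreadFactory(),          // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()  // handler
        );
    }

    // Reads the configuration back through the pool's getters.
    static String describe() {
        ThreadPoolExecutor pool = newPool();
        String description = "core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize()
                + " keepAlive=" + pool.getKeepAliveTime(TimeUnit.SECONDS) + "s"
                + " queueCapacity=" + pool.getQueue().remainingCapacity()
                + " handler=" + pool.getRejectedExecutionHandler().getClass().getSimpleName();
        pool.shutdown();
        return description;
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```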

Next, the shutdown methods. Consider the following code:

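import java.util.concurrent.*;
import java.util.List;

public class Test2 {
    public static void main(String[] args) throws Exception {
        // The numeric literals were missing from the original listing. The values
        // used here are assumptions, chosen to match the reported result of 5.
        ExecutorService myExecutorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            myExecutorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        // shutdownNow() interrupts running tasks: restore the flag and exit.
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        System.out.println("start shutdown");
        // Interrupts the 5 running tasks and returns the 5 tasks still waiting in the queue.
        List<Runnable> runnables = myExecutorService.shutdownNow();
        System.out.println(runnables.size()); // 5

        // myExecutorService.shutdown();
    }
}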
           

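Summary:

1. shutdown() does not stop the pool immediately. It stops accepting new tasks and terminates only after every previously submitted task, running or queued, has finished.

2. shutdownNow() stops the pool right away: it attempts to interrupt the running tasks, clears the task queue, and returns the list of tasks that never started.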
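The two methods are often combined: request an orderly shutdown first, and fall back to shutdownNow() only if the pool does not terminate in time. The sketch below follows the pattern suggested in the ExecutorService documentation. ShutdownDemo and the 5 second timeout are assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownDemo {
    // Returns true if the pool terminated within the given timeouts.
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks, let submitted ones finish
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                pool.shutdownNow(); // interrupt whatever is still running
                return pool.awaitTermination(timeout, unit);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    static boolean demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> { });
        }
        return shutdownAndAwait(pool, 5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        System.out.println("terminated: " + demo());
    }
}
```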

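Now for submitting tasks. There are two methods, execute and submit, and they differ as follows:

1. execute returns nothing.

2. submit returns a Future, which carries the task's result when given a Callable and null when given a Runnable.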

import java.util.concurrent.*;

public class Test3 {
    public static void main(String[] args) throws Exception {
        // The numeric literals were missing from the original listing.
        // The values used here are assumptions.
        ExecutorService myExecutorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            Future<String> future = myExecutorService.submit(new Callable<String>() {

                @Override
                public String call() throws InterruptedException {
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName());
                    return Thread.currentThread().getName() + " callback";
                }
            });
            // get() blocks until the task finishes, so the tasks run one at a time here.
            System.out.println(future.get());

        }
        System.out.println("start shutdown");
        myExecutorService.shutdown();

    }
}
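One more difference worth sketching is how the Future returned by submit reports results and failures. In the hypothetical SubmitDemo below, a Runnable's Future yields null, and an exception thrown by a Callable resurfaces from get() wrapped in an ExecutionException.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitDemo {
    // Returns "<value from Runnable future>/<message of the Callable's exception>".
    static String outcomes() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> fromRunnable = pool.submit(() -> { });
            Object value = fromRunnable.get(); // a Runnable has no result

            Callable<Integer> failing = () -> {
                throw new IllegalStateException("boom");
            };
            String failure;
            try {
                pool.submit(failing).get();
                failure = "none";
            } catch (ExecutionException e) {
                // The task's own exception is available as the cause.
                failure = e.getCause().getMessage();
            }
            return value + "/" + failure;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcomes());
    }
}
```

If nobody calls get() on the Future, a failure inside a submitted task goes unnoticed, whereas the same exception thrown from a task passed to execute reaches the worker thread's uncaught exception handler.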
           
