天天看點

Java多線程 之 ThreadPoolExecutor(九)

最近在工作中遇到了ThreadPoolExecutor的使用,而且是由于它的配置不當導緻了線上問題。下面對其進行簡單介紹。

先看看ThreadPoolExecutor常用的構造方法:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
           

其中:

(1)corePoolSize:線程池中核心線程的個數

(2)maximumPoolSize:線程池中最大的線程個數

(3)long keepAliveTime,TimeUnit unit:線程池中超過corePoolSize的那些線程,如果空閑,能夠活線上程池中的最長時間

(4)workQueue:工作隊列(感覺叫等待隊列比較好),一般使用LinkedBlockingQueue。

(5)handler:處理政策

private static ThreadPoolExecutor threadPoolExecutor =
        new ThreadPoolExecutor(, , L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            new ThreadPoolExecutor.CallerRunsPolicy());
           

threadPoolExecutor.execute(runnableTask);

ThreadPoolExecutor是通過execute方法将任務添加到線程池中。在java中要注意任務與線程的差別。感覺有點像程式和程序的差別。任務是一個靜态的概念,隻是用來表示要做的事情,而線程是一個動态的概念,線程是用來驅動任務的,也就是說任務要附着線上程上去執行。

下面詳細說明下這5個參數的含義,大體經過以下過程:

a.當線程池中的線程數量少于corePoolSize時,當新的任務被execute送出到線程池時,不管線程池中是否有空閑線程,即使有空閑線程也要建立新線程。

b.當線程池中的線程數到達corePoolSize的上限時,當新的任務被execute送出到線程池時,這個新送出的任務會被添加到等待隊列中。

c.當等待隊列已經滿了,當新的任務被execute送出到線程池時,會建立新的線程,直到到達maximumPoolSize。

d.如果maximumPoolSize也滿了,當新的任務被execute送出到線程池時,就會使用後面的處理政策handler。

先定義一個概念,拒絕任務。

所謂拒絕任務是指,當線程池的等待隊列已滿,并且已經到達maximumPoolSize時,被execute添加進來的任務。

常見的處理政策有下面4種:

(1). CallerRunsPolicy :這個政策重試添加目前的任務,他會自動重複調用 execute() 方法,直到成功。也就是說,這個政策會一直嘗試着将這個新到來的任務添加到線程池中,直到添加成功。

(2). AbortPolicy :對拒絕任務抛棄處理,并且抛出異常。

(3). DiscardPolicy :對拒絕任務直接無聲抛棄,沒有異常資訊。

(4). DiscardOldestPolicy :對拒絕任務不抛棄,而是抛棄隊列裡面等待最久的一個線程,然後把拒絕任務加到隊列。

在項目中最好使用CallerRunsPolicy ,否則其他政策都有抛棄的可能。就拿發送push來說,有些使用者可能收不到push,尤其是一些上司收不到,這會很慘。這次線上問題的原因就是,原來代碼使用的DiscardOldestPolicy 政策。

使用線程池的好處:

線程的建立與消亡會浪費很多時間,使用線程池可以減少這種時間消耗。

使用java提供的Executor還可以實作任務的送出與執行解耦。

線上問題的工程有一個生産線程池,一個消費線程池。生産線程池中的線程在執行過程中隻是進行的簡單調用,而消費線程池中的線程在執行過程中卻要RPC調用,RPC調用相對于簡單調用會消耗大量時間,是以消費線程池要設定的大一些。

線程池中線程的數量與CPU的核心數有很大關系。檢視伺服器CPU的核心數可以使用cat /proc/cpuinfo來檢視。

線程池中線程的數量與等待隊列的大小關系:等待隊列比較消耗記憶體,線程消耗CPU,這個要衡量好。

下面有一個例子:

package org.fan.learn.thread.share;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestRejectedPolicy {
    public static void main(String[] args) throws InterruptedException {
//        ThreadPoolExecutor pool = new ThreadPoolExecutor(, , , TimeUnit.SECONDS,
//                new ArrayBlockingQueue<Runnable>());//設定線程池隻啟動一個線程 阻塞隊列一個元素
//        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(, , , TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new ThreadPoolExecutor.DiscardOldestPolicy());
        //設定政策為擠掉最舊的
        for (int i = ; i < 10; i++) {
            final int  j = i;
            pool.submit(new Runnable() {
                public void run() {
                    System.out.println("線程: "+j + " " + Thread.currentThread().getName()+"  開始執行");
                    try {
//                        Thread.sleep(L);
                        TimeUnit.SECONDS.sleep();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println("線程: "+j + " " + Thread.currentThread().getName()+"  執行完畢");
                }
            });
        }
        Thread.sleep(L);
        pool.shutdown();
        System.out.println("關閉後線程終止了嗎?" + pool.isTerminated());
    }
}
           

執行結果如下:

線程: 0 pool-1-thread-1 開始執行

線程: 0 pool-1-thread-1 執行完畢

線程: 9 pool-1-thread-1 開始執行

線程: 9 pool-1-thread-1 執行完畢

關閉後線程終止了嗎?false

這裡故意讓線程開始執行之後sleep 1秒,這樣新的線程被添加到線程池時都會講等待隊列中的舊的線程抛棄,導緻隻有第一個和最後一個能夠執行。