天天看點

通過ThreadPoolExecutor 進行多線程程式設計

最近搞了個項目,涉及到多線程程式設計,同時呢,有涉及線程需要傳回結果的功能。

首先,介紹一下ThreadPoolExecutor

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,  long keepAliveTime, TimeUnit unit,  BlockingQueue<Runnable> workQueue,  RejectedExecutionHandler handler)   corePoolSize: 線程池維護線程的最少數量  maximumPoolSize:線程池維護線程的最大數量  keepAliveTime: 線程池維護線程所允許的空閑時間  unit: 線程池維護線程所允許的空閑時間的機關  workQueue: 線程池所使用的緩沖隊列  handler: 線程池對拒絕任務的處理政策 

handler有四個選擇:  ThreadPoolExecutor.AbortPolicy()  抛出java.util.concurrent.RejectedExecutionException異常  ThreadPoolExecutor.CallerRunsPolicy()  重試添加目前的任務,他會自動重複調用execute()方法  ThreadPoolExecutor.DiscardOldestPolicy()  抛棄舊的任務  ThreadPoolExecutor.DiscardPolicy()  抛棄目前的任務 

// 構造一個線程池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), new ThreadPoolExecutor.DiscardOldestPolicy());

在這段程式中,main()方法相當于一個殘忍的上司,他派發出許多任務,丢給一個叫 threadPool的任勞任怨的小組來做。  這個小組裡面隊員至少有兩個,如果他們兩個忙不過來,任務就被放到任務清單裡面。  如果積壓的任務過多,多到任務清單都裝不下(超過3個)的時候,就雇傭新的隊員來幫忙。但是基于成本的考慮,不能雇傭太多的隊員,至多隻能雇傭 4個。  如果四個隊員都在忙時,再有新的任務,這個小組就處理不了了,任務就會被通過一種政策來處理,我們的處理方式是不停的派發,直到接受這個任務為止(更殘忍!呵呵)。  因為隊員工作是需要成本的,如果工作很閑,閑到 3SECONDS都沒有新的任務了,那麼有的隊員就會被解雇了,但是,為了小組的正常運轉,即使工作再閑,小組的隊員也不能少于兩個。 

以上是參考别人的部落格,感覺寫的真的不錯,尤其是這個例子,通俗易懂。

下面是兩種線程的需求:

1、線程需要有傳回值 實作callable接口的 call方法

思路: 可以在call裡寫循環,建立線程類的時候用List、Map等集合來傳參 class ShortenUrl implements Callable<List<Map<String, String>>>{ @Override public List<Map<String, String>> call() throws Exception { return backList; } }

調用時: private static final int size = 1000 ;

private final static ExecutorService pool = ThreadPoolExecutorUtil. getInstance ().getThreadPoolExecutorService();

CompletionService<List<Map<String, String>>> urlThreadService = new ExecutorCompletionService<>( pool );

使用這個urlThreadService 進行建立線程 urlThreadService.submit( new ShortenUrl(threadUrlRecordList, url_long_Map, url_key_Map, thread_message_Map, messageMap_Map, batch_id));

使用這個方式判斷線程是否執行完畢 for ( int i = 0 ; i< urlThreadCount; ++i){ try { List<Map<String, String>> back = urlThreadService.take().get(); if ( null != back && back.size() > 0 ){ fromUrlThreadList.addAll(back); } } catch (InterruptedException | ExecutionException e) { logger .error( "線程等待異常:{}" , e); } }

2、線程不需要傳回值 實作Runnable接口的 run方法 class MySendSmsThread implements Runnable{ @Override public void run(){

finally { doneSignal .countDown(); }

} } final CountDownLatch doneSignal = new CountDownLatch(threadCount); try { doneSignal.await(); logger .info( " \n 線程執行完畢 jobSms:{}" , jobSms.getId()); } catch (InterruptedException e) { logger .error( "線程等待異常:{}" , e); } 可以使用doneSignal 進行線程是否執行的判斷

int total = sendDataMap.size(); int threadCount = ( int ) (total / size + ((total % size != 0 ) ? 1 : 0 )); 使用這種辦法設定線程數量

for ( int j = 0 ; j< threadCount; ++j){ int startIndex = j * size ; int toIndex = ((j + 1 ) * size ) > sendSmsList.size() ? sendSmsList.size() : (j + 1 ) * size ; List<NewSendSms> threadRecordList = sendSmsList.subList(startIndex, toIndex); ThreadPoolExecutorUtil. getInstance ().getThreadPoolExecutorService() .execute( new GetStatusThread(batchNo, threadRecordList, doneSignal)); 調用 } 使用這種辦法獲得每個線程中的List

最後,貼一下線程池的工具類,這是組内大佬寫的,給大家做個參考。 這個 LinkedBlockingQueue,有興趣的話可以去查查為什麼使用這個隊列作為實作方式

public class ThreadPoolExecutorUtil {
    public static final int NTHREADS = 200;// 預設線程池個數

    private ThreadPoolExecutorUtil() {
    }

    private volatile static ThreadPoolExecutorUtil INSTANCE;
    private static final LinkedBlockingQueue<Runnable> QUEUE = new LinkedBlockingQueue<>(5000);// 後續應加入隊列長度監聽機制
    private ExecutorService executorService = new ThreadPoolExecutor(NTHREADS, NTHREADS, 0L, TimeUnit.MILLISECONDS,
            QUEUE);

    public static ThreadPoolExecutorUtil getInstance() {
        if (null == INSTANCE) {
            synchronized (ThreadPoolExecutorUtil.class) {
                if (null == INSTANCE) {
                    INSTANCE = new ThreadPoolExecutorUtil();
                }
            }
        }
        return INSTANCE;
    }

    public ExecutorService getThreadPoolExecutorService() {
        if (null == executorService) {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(NTHREADS, NTHREADS, 0L, TimeUnit.MILLISECONDS, QUEUE);
            /**
             * 重寫Rejected政策,預設抛出TaskRejectedException異常,然後不執行
             * 當pool已經達到maxsize的時候 不在新線程中執行任務,而是有調用者所在的線程來執行
             */
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            return executor;
        }
        return executorService;
    }

    @PreDestroy
    public void destroy() {
        executorService.shutdown();
    }
}
           

剛剛接觸,如有錯誤還請各位大牛指出