最近搞了個項目,涉及到多線程程式設計,同時呢,有涉及線程需要傳回結果的功能。
首先,介紹一下ThreadPoolExecutor
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) corePoolSize: 線程池維護線程的最少數量 maximumPoolSize:線程池維護線程的最大數量 keepAliveTime: 線程池維護線程所允許的空閑時間 unit: 線程池維護線程所允許的空閑時間的機關 workQueue: 線程池所使用的緩沖隊列 handler: 線程池對拒絕任務的處理政策
handler有四個選擇: ThreadPoolExecutor.AbortPolicy() 抛出java.util.concurrent.RejectedExecutionException異常 ThreadPoolExecutor.CallerRunsPolicy() 重試添加目前的任務,他會自動重複調用execute()方法 ThreadPoolExecutor.DiscardOldestPolicy() 抛棄舊的任務 ThreadPoolExecutor.DiscardPolicy() 抛棄目前的任務
// 構造一個線程池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), new ThreadPoolExecutor.DiscardOldestPolicy());
在這段程式中,main()方法相當于一個殘忍的上司,他派發出許多任務,丢給一個叫 threadPool的任勞任怨的小組來做。 這個小組裡面隊員至少有兩個,如果他們兩個忙不過來,任務就被放到任務清單裡面。 如果積壓的任務過多,多到任務清單都裝不下(超過3個)的時候,就雇傭新的隊員來幫忙。但是基于成本的考慮,不能雇傭太多的隊員,至多隻能雇傭 4個。 如果四個隊員都在忙時,再有新的任務,這個小組就處理不了了,任務就會被通過一種政策來處理,我們的處理方式是不停的派發,直到接受這個任務為止(更殘忍!呵呵)。 因為隊員工作是需要成本的,如果工作很閑,閑到 3SECONDS都沒有新的任務了,那麼有的隊員就會被解雇了,但是,為了小組的正常運轉,即使工作再閑,小組的隊員也不能少于兩個。
以上是參考别人的部落格,感覺寫的真的不錯,尤其是這個例子,通俗易懂。
下面是兩種線程的需求:
1、線程需要有傳回值 實作callable接口的 call方法
思路: 可以在call裡寫循環,建立線程類的時候用List、Map等集合來傳參 class ShortenUrl implements Callable<List<Map<String, String>>>{ @Override public List<Map<String, String>> call() throws Exception { return backList; } }
調用時: private static final int size = 1000 ;
private final static ExecutorService pool = ThreadPoolExecutorUtil. getInstance ().getThreadPoolExecutorService();
CompletionService<List<Map<String, String>>> urlThreadService = new ExecutorCompletionService<>( pool );
使用這個urlThreadService 進行建立線程 urlThreadService.submit( new ShortenUrl(threadUrlRecordList, url_long_Map, url_key_Map, thread_message_Map, messageMap_Map, batch_id));
使用這個方式判斷線程是否執行完畢 for ( int i = 0 ; i< urlThreadCount; ++i){ try { List<Map<String, String>> back = urlThreadService.take().get(); if ( null != back && back.size() > 0 ){ fromUrlThreadList.addAll(back); } } catch (InterruptedException | ExecutionException e) { logger .error( "線程等待異常:{}" , e); } }
2、線程不需要傳回值 實作Runnable接口的 run方法 class MySendSmsThread implements Runnable{ @Override public void run(){
finally { doneSignal .countDown(); }
} } final CountDownLatch doneSignal = new CountDownLatch(threadCount); try { doneSignal.await(); logger .info( " \n 線程執行完畢 jobSms:{}" , jobSms.getId()); } catch (InterruptedException e) { logger .error( "線程等待異常:{}" , e); } 可以使用doneSignal 進行線程是否執行的判斷
int total = sendDataMap.size(); int threadCount = ( int ) (total / size + ((total % size != 0 ) ? 1 : 0 )); 使用這種辦法設定線程數量
for ( int j = 0 ; j< threadCount; ++j){ int startIndex = j * size ; int toIndex = ((j + 1 ) * size ) > sendSmsList.size() ? sendSmsList.size() : (j + 1 ) * size ; List<NewSendSms> threadRecordList = sendSmsList.subList(startIndex, toIndex); ThreadPoolExecutorUtil. getInstance ().getThreadPoolExecutorService() .execute( new GetStatusThread(batchNo, threadRecordList, doneSignal)); 調用 } 使用這種辦法獲得每個線程中的List
最後,貼一下線程池的工具類,這是組内大佬寫的,給大家做個參考。 這個 LinkedBlockingQueue,有興趣的話可以去查查為什麼使用這個隊列作為實作方式
public class ThreadPoolExecutorUtil {
public static final int NTHREADS = 200;// 預設線程池個數
private ThreadPoolExecutorUtil() {
}
private volatile static ThreadPoolExecutorUtil INSTANCE;
private static final LinkedBlockingQueue<Runnable> QUEUE = new LinkedBlockingQueue<>(5000);// 後續應加入隊列長度監聽機制
private ExecutorService executorService = new ThreadPoolExecutor(NTHREADS, NTHREADS, 0L, TimeUnit.MILLISECONDS,
QUEUE);
public static ThreadPoolExecutorUtil getInstance() {
if (null == INSTANCE) {
synchronized (ThreadPoolExecutorUtil.class) {
if (null == INSTANCE) {
INSTANCE = new ThreadPoolExecutorUtil();
}
}
}
return INSTANCE;
}
public ExecutorService getThreadPoolExecutorService() {
if (null == executorService) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(NTHREADS, NTHREADS, 0L, TimeUnit.MILLISECONDS, QUEUE);
/**
* 重寫Rejected政策,預設抛出TaskRejectedException異常,然後不執行
* 當pool已經達到maxsize的時候 不在新線程中執行任務,而是有調用者所在的線程來執行
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
return executorService;
}
@PreDestroy
public void destroy() {
executorService.shutdown();
}
}
剛剛接觸,如有錯誤還請各位大牛指出