天天看點

java線程池 消息隊列_淺談java.util.concurrent包中的線程池和消息隊列

1.java并發包介紹

JDK5.0(JDK1.5更名後)以後的版本引入進階并發特性,大多數的特性在java.util.concurrent包中,是專門用于多線程程式設計的,充分利用了現代多處理器和多核心系統的功能以編寫大規模并發應用程式。主要包括原子量、并發集合、同步器、可重入鎖,并對線程池的構造提供了強力的支援

2.線程池

java.util.concurrent.Executors提供了一個 java.util.concurrent.Executor接口的實作用于建立線程池

多線程技術主要解決處理器單元内多個線程執行的問題,它可以顯著減少處理器單元的閑置時間,增加處理器單元的吞吐能力。

假設伺服器完成一項任務所需時間為:T1 建立線程時間,T2 線上程中執行任務的時間,T3 銷毀線程時間。如果T1 + T3 遠大于 T2,則可以采用線程池,以提高伺服器性能,減少建立和銷毀線程所需消耗的時間。

一個線程池由以下四個基本部分組成:

線程池管理器(ThreadPool):用于建立并管理線程池,包括 建立線程池,銷毀線程池,添加新任務;

工作線程(PoolWorker):線程池中線程,在沒有任務時處于等待狀态,可以循環的執行任務;

任務接口(Task):每個任務必須實作的接口,以供工作線程排程任務的執行,它主要規定了任務的入口,任務執行完後的收尾工作,任務的執行狀态等;

任務隊列(taskQueue):用于存放沒有處理的任務。提供一種緩沖機制。

線程池技術正是關心如何縮短或調整T1,T3時間進而提高伺服器程式性能的技術。它把T1,T3分别安排在伺服器程式的啟動和結束的時間段或者一些空閑的時間段,這樣在伺服器程式處理客戶請求時,免去了線程建立和銷毀的開銷。

線程池不僅調整T1,T3産生的時間段,而且它還顯著減少了建立線程的數目,看一個例子:

假設一個伺服器一天要處理100000個請求,并且每個請求需要一個單獨的線程完成。線上程池中,線程數一般是固定的,

一般線程池大小是遠小于100000。是以利用線程池的伺服器程式不會為了建立100000而在處理請求時浪費時間,進而提高效率。

線程池的五種建立方式

Single Thread Executor:隻有一個線程的線程池,是以所送出的任務是順序執行,Executors.newSingleThreadExecutor();

Cached Thread Pool:線程池裡有很多線程需同時進行,舊的可用線程将被新的任務觸發進而重新執行,如果線程超過60秒内沒有執行,那麼将被終止并從池中删除Executors.newCachedThreadPool();

Fixed Thread Pool:擁有固定線程數的線程池,如果沒有任務執行,那麼線程會一直等待,Executors.newFixedThreadPool(10);在構造函數中的參數10是線程池的大小,你可以随意設定,也可以和cpu的數量保持一緻,擷取cpu的數量int cpuNums = Runtime.getRuntime().availableProcessors();

Scheduled Thread Pool:用來排程即将執行的任務的線程池Executors.newScheduledThreadPool();

Sing Thread Scheduled Pool:隻有一個線程,用來排程任務在指定時間執行Executors.newSingleThreadScheduledExecutor();

3.線程池的使用

以下用Fixed Thread Pool作為示範,提供一個使用參考

LogNumVo

package com.ithzk.threadpool;

public class LogNumVo {

private static final long serialVersionUID = -5541722936350755569L;

private Integer dataNum;

private Integer successNum;

private Integer waitNum;

public Integer getDataNum() {

return dataNum;

}

public void setDataNum(Integer dataNum) {

this.dataNum = dataNum;

}

public Integer getSuccessNum() {

return successNum;

}

public void setSuccessNum(Integer successNum) {

this.successNum = successNum;

}

public Integer getWaitNum() {

return waitNum;

}

public void setWaitNum(Integer waitNum) {

this.waitNum = waitNum;

}

}

DealObject

package com.ithzk.threadpool;

public class DealObject {

private Integer identifyId;

private String data;

public DealObject(Integer identifyId, String data) {

this.identifyId = identifyId;

this.data = data;

}

public DealObject() {

}

public Integer getIdentifyId() {

return identifyId;

}

public void setIdentifyId(Integer identifyId) {

this.identifyId = identifyId;

}

public String getData() {

return data;

}

public void setData(String data) {

this.data = data;

}

}

AbstractCalculateThread

package com.ithzk.threadpool;

import java.util.Collection;

import java.util.concurrent.Callable;

import java.util.concurrent.CountDownLatch;

public class AbstractCalculateThread implements Callable {

protected Collection insertList;

protected CountDownLatch countd;

protected String threadCode;

protected String batchNumber;

public Collection getInsertList() {

return insertList;

}

public void setInsertList(Collection insertList) {

this.insertList = insertList;

}

public CountDownLatch getCountd() {

return countd;

}

public void setCountd(CountDownLatch countd) {

this.countd = countd;

}

public String getThreadCode() {

return threadCode;

}

public void setThreadCode(String threadCode) {

this.threadCode = threadCode;

}

public String getBatchNumber() {

return batchNumber;

}

public void setBatchNumber(String batchNumber) {

this.batchNumber = batchNumber;

}

public AbstractCalculateThread() {

super();

}

public AbstractCalculateThread(Collection insertList, CountDownLatch countd, String threadCode,String batchNumber) {

super();

this.insertList = insertList;

this.countd = countd;

this.threadCode = threadCode;

this.batchNumber = batchNumber;

}

public String call() throws Exception {

return null;

}

}

CalculateDealThread

package com.ithzk.threadpool;

import java.util.Collection;

import java.util.concurrent.CountDownLatch;

public class CalculateDealThread extends AbstractCalculateThread {

private ExecutorPool executorPool = SpringContextUtil.getBean(ExecutorPool.class);

@Override

public String call() throws Exception {

try {

System.out.println("========開始跑線程【"+threadCode+"】");

return executorPool.syncBatchDealObject(insertList,batchNumber);

} catch (Exception e) {

e.printStackTrace();

System.out.println("========開始跑線程【"+threadCode+"】:"+e.getMessage());

}finally {

countd.countDown();

}

return null;

}

public CalculateDealThread() {

super();

}

public CalculateDealThread(Collection insertList, CountDownLatch countd, String threadCode,String batchNumber) {

super(insertList, countd, threadCode, batchNumber);

}

}

ExecutorPool

package com.ithzk.threadpool;

import java.util.*;

import java.util.concurrent.*;

public class ExecutorPool {

private static final int ARRAY_COUNT = 50000;

private static final int MULTI_THREAD_STARTCOUNT = 10000;

private static final int BATCH_DEAL_SIZE = 100;

public static final int THREAD_POOL_NUM=10;

public static void main(String[] args){

testExecutorPool();

}

public static void testExecutorPool(){

ArrayList dealObjects = new ArrayList();

for (int i = 0;i

DealObject dealObject = new DealObject(i,"data_"+i);

dealObjects.add(dealObject);

System.out.println("Data add success current:"+i);

}

int size = dealObjects.size();

int successNum = 0;

int waitNum = 0;

System.out.println("需要處理的資料資料量為:"+size);

// 判斷資料是否大于10000 如果大于則開啟線程池 跑資料

if (size > MULTI_THREAD_STARTCOUNT) {

try {

System.out.println("===================dataNum > 1000 | Multiple Thread Run=======================");

// 每次新增處理的條數

int batchInsertSize = BATCH_DEAL_SIZE;

// 定義儲存的線程池

ExecutorService executorInsert = Executors.newFixedThreadPool(THREAD_POOL_NUM);

// 定義儲存過程中傳回的線程執行傳回參數

List> futureListIsert = new ArrayList>();

// 線程 修改list

List> listDealObjects = new ArrayList>();

List> listLiveSyncLogInsert = pointDateClassify(dealObjects, batchInsertSize, listDealObjects);

if (null != listLiveSyncLogInsert && !listDealObjects.isEmpty()) {

System.out.println("===================切割後的大小:"+listLiveSyncLogInsert.size()+"=======================");

//配合使用CountDownLatch為了保證在執行完所有子程式之後再執行主程式

CountDownLatch countd = new CountDownLatch(listLiveSyncLogInsert.size());

for (int j = 0; j < listLiveSyncLogInsert.size(); j++) {

Map insert = listLiveSyncLogInsert.get(j);

Future future = executorInsert.submit(new CalculateDealThread(insert.values(), countd,"executor_pool_test_thread", null));

futureListIsert.add(future);

}

}

// 等待線程執行完成

executorInsert.shutdown();

for (Future future : futureListIsert) {

String json = future.get();

if (null != json && !"".equals(json)) {

将傳回的json格式資料轉換為實體類 進行業務記錄

LogNumVo logNumVo = JSON.toJavaObject(JSON.parseObject(json),LogNumVo.class);

successNum += logNumVo.getSuccessNum();

waitNum += logNumVo.getWaitNum();

}

}

} catch (InterruptedException e) {

e.printStackTrace();

} catch (ExecutionException e) {

e.printStackTrace();

}

}

}

@SuppressWarnings("all")

public static List> pointDateClassify(List lPostUploadIntegralList,int batchInsertSize, List> listJSONObjectUpdate) {

List> listLiveSyncLogInsert = new Vector>();

// 新增資料list

List integralListInsert = lPostUploadIntegralList;

System.out.println("============integralListInsert.size()=====:" + integralListInsert.size());

// 拆分資料(拆成多個List)

int inserti = 0;

if (integralListInsert != null && integralListInsert.size() > 0) {

ConcurrentHashMap integralListIns = null;

for (int l = 0; l < integralListInsert.size(); l++) {

if (integralListIns == null) {

integralListIns = new ConcurrentHashMap();

}

integralListIns.put(integralListInsert.get(l).getIdentifyId(), integralListInsert.get(l));

inserti++;

if ((inserti % batchInsertSize) == 0) {

listLiveSyncLogInsert.add(integralListIns);

integralListIns = null;

} else {

// 最後100條或不足100條資料

if ((l + 1) == integralListInsert.size()) {

listLiveSyncLogInsert.add(integralListIns);

}

}

}

}

System.out.println("=============listPostUploadInsert.size()====:" + listLiveSyncLogInsert.size());

return listLiveSyncLogInsert;

}

public String syncBatchDealObject(Collection insertList,String batchNumber) {

int successNum = 0, waitNum = 0;

Date currentDate = new Date(System.currentTimeMillis());

for (DealObject dealObject : insertList) {

try {

int icount = syncDealObject(dealObject,currentDate);

if(icount > 0){

successNum ++;

}else {

waitNum ++;

}

} catch (Exception e) {

e.printStackTrace();

++waitNum;

}

}

LogNumVo logNum = new LogNumVo();

logNum.setDataNum(0);

logNum.setSuccessNum(successNum);

logNum.setWaitNum(waitNum);

// 将記錄實體類轉為json格式回報給線程池

return JSON.toJSONString(logNum);

}

private int syncDealObject(DealObject dealObject,Date currentDate){

int successNum = 0;

//業務處理邏輯

if(null != dealObject.getData()){

successNum++;

}

return successNum;

}

}

4.BlockingQueue

BlockingQueue也是java.util.concurrent下的主要用來控制線程同步的工具。主要的方法是:put、take一對阻塞存取;add、poll一對非阻塞存取。

插入:

add(anObject)

把anObject加到BlockingQueue裡,如果BlockingQueue可以容納,則傳回true,否則抛出異常

offer(anObject)

把anObject加到BlockingQueue裡,如果BlockingQueue可以容納,則傳回true,否則傳回false.

put(anObject)

把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則調用此方法的線程被阻塞直到BlockingQueue裡面有空間再繼續.

讀取:

poll(time)

取走BlockingQueue裡排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時傳回null

take()

取走BlockingQueue裡排在首位的對象,若BlockingQueue為空,阻斷進入等待狀态直到Blocking有新的對象被加入為止

其他:

int remainingCapacity()

傳回理想情況下(沒有記憶體和資源限制)此隊列可接受并且不會被阻塞的附加元素數量。

該數量總是等于此隊列的初始容量,小于隊列的目前 size(傳回隊列剩餘的容量)。

注意,不能總是通過檢查 remainingcapacity 來斷定試圖插入一個元素是否成功,因為可能是另一個線程将插入或移除某個元

素。

boolean remove(Object o)

從隊列移除元素,如果存在,即移除一個或者更多,隊列改變了傳回true

public boolean contains(Object o)

檢視隊列是否存在這個元素,存在傳回true

int drainTo(Collection super E> c)

傳入的集合中的元素,如果在隊列中存在,那麼将隊列中的元素移動到集合中

int drainTo(Collection super E> c, int maxElements)

和上面方法的差別在于,制定了移動的數量

以下是一個BlockQueue的基本使用參考:

Producer

package com.ithzk.BlockingQueueTest;

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable{

BlockingQueue blockingQueue;

public Producer(BlockingQueue blockingQueue) {

this.blockingQueue = blockingQueue;

}

@Override

public void run() {

try {

String threadIdentify = "A Producer,生産線程"+Thread.currentThread().getName();

blockingQueue.put(threadIdentify);

System.out.println("Produce success! Thread:"+Thread.currentThread().getName());

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

Consumer

package com.ithzk.BlockingQueueTest;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable{

BlockingQueue blockingQueue;

public Consumer(BlockingQueue blockingQueue) {

this.blockingQueue = blockingQueue;

}

@Override

public void run() {

try {

String consumer = Thread.currentThread().getName();

System.out.println("Current Consumer Thread:"+consumer);

//如果隊列為空會阻塞目前線程

String take = blockingQueue.take();

System.out.println(consumer + " consumer get a product:"+take);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

BlockTest

package com.ithzk.BlockingQueueTest;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.LinkedBlockingQueue;

public class BlockTest {

public static void main(String[] args) throws InterruptedException {

// 不設定的話,LinkedBlockingQueue預設大小為Integer.MAX_VALUE

// BlockingQueue blockingQueue = new LinkedBlockingQueue();

// BlockingQueue blockingQueue = new ArrayBlockingQueue(2);

BlockingQueue blockingQueue = new LinkedBlockingQueue(2);

Consumer consumer = new Consumer(blockingQueue);

Producer producer = new Producer(blockingQueue);

for (int i = 0; i < 3; i++) {

new Thread(producer, "Producer" + (i + 1)).start();

}

for (int i = 0; i < 5; i++) {

new Thread(consumer, "Consumer" + (i + 1)).start();

}

Thread.sleep(5000);

new Thread(producer, "Producer" + (5)).start();

}

}

BlockingQueue有四個具體的實作類,常用的兩種實作類為:

ArrayBlockingQueue:一個由數組支援的有界阻塞隊列,規定大小的BlockingQueue,其構造函數必須帶一個int參數來指明其大小.其所含的對象是以FIFO(先入先出)順序排序的。

LinkedBlockingQueue:大小不定的BlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制。

若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的對象是以FIFO(先入先出)順序排序的。

LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的話,預設最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在隊列滿的時候會阻塞直到有隊列成員被消費,take方法在隊列空的時候會阻塞,直到有隊列成員被放進來。

LinkedBlockingQueue和ArrayBlockingQueue差別

LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背後所用的資料結構不一樣,導緻LinkedBlockingQueue的資料吞吐量要大于ArrayBlockingQueue,但線上程數量很大時其性能的可預見性低于ArrayBlockingQueue.

以上就是本文的全部内容,希望對大家的學習有所幫助,也希望大家多多支援腳本之家。