天天看點

java7 fork join_Java 7 Fork/Join 架構使用

最近有個業務是批量導出cognos報表, 由于未開發此功能, 人工導出需要大量的時間消耗, 奔着珍惜時間的使命寫了一個導出工具類, 至此在導出的過程中用到了并發請求數, 比如:每秒并發10次,20次等. 工作中使用的Java8并發文法, 在此之前先介紹一下Java7 Fork/Join的架構使用方式.

之前使用的此架構是一個查詢SQL時, 當時一個SQL有28個子語句通過left join 拼接而成, 查詢速度為20s,經常卡死, 最後寫成并發,28個子語句,分成3批次,每次10個SQL,并行查詢,最後通過Java算法拼接成List,從20S變為1.4S左右,性能大大提升.

下面就開始今天的内容:

簡介

從JDK1.7開始,Java提供Fork/Join架構用于并行執行任務,它的思想就是講一個大任務分割成若幹小任務,最終彙總每個小任務的結果得到這個大任務的結果。

這種思想和MapReduce很像(input --> split --> map --> reduce --> output)

主要有兩步:

第一、任務切分;

第二、結果合并

剛剛我介紹的SQL其實就是這樣的原理.

API 介紹

ForkJoinPool 池子

ForkJoinPool的優勢在于,可以充分利用多cpu,多核cpu的優勢,把一個任務拆分成多個“小任務”,把多個“小任務”放到多個處理器核心上并行執行;當多個“小任務”執行完成之後,再将這些執行結果合并起來即可。

ForkJoinTask 任務

ForkJoinTask代表運作在ForkJoinPool中的任務。

主要方法:

fork() 在目前線程運作的線程池中安排一個異步執行。簡單的了解就是再建立一個子任務。

join() 當任務完成的時候傳回計算結果。

invoke() 開始執行任務,如果必要,等待計算完成。

子類:

RecursiveAction 一個遞歸無結果的ForkJoinTask(沒有傳回值)

RecursiveTask 一個遞歸有結果的ForkJoinTask(有傳回值)

例子

private static final ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,

new ArrayBlockingQueue(100),

new ThreadFactoryBuilder().setNameFormat("Reports-%d").setDaemon(true).build(),

new ThreadPoolExecutor.AbortPolicy());

這裡先建立了一個多線程任務,意思為:

這裡核心線程數5

最大線程數5

blockingQueue 最大size 100, 解釋: workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運作過程産生重大影響,一般來說,這裡的阻塞隊列有以下幾種選擇: ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue; ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊政策與BlockingQueue有關。

reject政策 java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy,意思是由調用線程處理該任務

另外的政策

ThreadPoolExecutor.AbortPolicy:丢棄任務并抛出RejectedExecutionException異常。

ThreadPoolExecutor.DiscardPolicy:也是丢棄任務,但是不抛出異常。

ThreadPoolExecutor.DiscardOldestPolicy:丢棄隊列最前面的任務,然後重新嘗試執行任務(重複此過程)

方法調用,模拟一下并發執行SQL拼接表的過程

public Page getRepairTaskReport() {

Listids = Lists.newArrayList("1","2","3","4","5","6","7","8","9");

//多線程查詢列資料

Set setIds = new HashSet<>();

for (String id : ids) {

setIds.add(id);

}

//整理要查詢的列,以後可以做成由前端指定查詢哪些column

List columns = new ArrayList<>();

columns.add("ID1相關");

columns.add("ID2相關");

...

columns.add("ID9相關");

Map>> futureMap = new HashMap<>();

Map>> columnCallableMap = getColumnCallableMap(appId, staff, columns, params, queryConditionVo, departIdSet);

for (Map.Entry>> entry : columnCallableMap.entrySet()) {

futureMap.put(entry.getKey(), executorService.submit(entry.getValue()));

}

//合并報表

for (RepairTaskReportColumn column : columns) {

try {

//列資料

List columnDataList = futureMap.get(column).get();

for (Object[] objects : columnDataList) {

String departId = objects[0].toString();

String columnData = objects[1].toString();

//比對行資料

for (String reportVo : ids) {

if (departId.equals(reportVo.getDepartId())) {

switch (column) {

case "ID1相關":

// 并行傳回ID1相關資料

break;

case "ID2相關":

// 并行傳回ID2相關資料

break;

...

//

}

}

}

}

} catch (Exception e) {

e.printStackTrace();

}

}

return "最終結果";

}

private Map>> getColumnCallableMap(String ID, List columns, Set departIdSet) {

Map>> columnMap = new HashMap<>();

for (T column : columns) {

switch (column) {

case "ID1相關SQL查詢":

columnMap.put("ID1相關", getSQL(ID,T,departIdSet));

break;

case "ID2相關SQL查詢":

columnMap.put("ID2相關", getSQL(ID,T,departIdSet));

break;

....

default:

break;

}

}

return columnMap;

}

private Callable> getSQL(final String ID, final T column, final Set departIdSet) {

return new Callable>() {

@Override

public List call() throws Exception {

//todo sql query

//傳回格式:object[0]為departId,object[1]為需要的資料

String sql = "select * from table xxxxx";

List result = dao.getResult(sql);

return result;

}

};

}

以上代碼為僞代碼,實作的邏輯其實很簡單.

大緻邏輯如下

我有一條SQL,為N個left join 拼接而成,那麼我現在就是吧N個left 拆分成N個小SQL,并發執行,那麼執行時間縮短為N倍, 然後通過N個SQL查詢出的結果,通過相同的屬性 再次拼接成業務正确的資料

就是這樣的一個圖:

java7 fork join_Java 7 Fork/Join 架構使用

image.png

java7 fork join_Java 7 Fork/Join 架構使用

image.png

哈哈, 大緻就是這樣,通過并發執行任務,人工點選的8八小時縮短為10分鐘! 是不是很秀呢

下次講解JDK8中并發執行的例子,更為簡潔

歡迎小夥伴們留言哦