一、背景
實際業務開發中,為了避免逾時或受限於對方接口的限制,通常需要對支援批量的接口分批傳入資料進行調用。
比如List參數的size可能為 幾十個甚至上百個,但是假如對方dubbo接口比較慢,傳入50個以上會逾時,那麼可以每次傳入20個,分批執行。
通常很多人會寫 for 循環或者 while 循環,非常不優雅,無法複用,而且容易出錯。
下面結合 Java8 的 Stream ,Function ,Consumer 等特性實作分批調用的工具類封裝和自測。
並給出 CompletableFuture 的異步改進方案。
二、實作
工具類:
package com.chujianyun.common.java8.function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.collections4.CollectionUtils;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;
/**
 * Helpers for invoking a batch-capable operation over a large list in fixed-size chunks.
 *
 * <p>Every method splits the input with Guava's {@code Lists.partition} and invokes the
 * supplied callback once per chunk, in list order, on a sequential stream.
 *
 * @author 明明如月
 */
public class ExecuteUtil {

    /**
     * Runs {@code consumer} once for every chunk of {@code dataList}.
     *
     * @param dataList items to process; {@code null} or empty makes this a no-op
     * @param size     maximum number of items per chunk, must be greater than 0
     * @param consumer action invoked with each chunk
     * @param <T>      item type
     * @throws IllegalArgumentException if {@code dataList} is non-empty and {@code size <= 0}
     */
    public static <T> void partitionRun(List<T> dataList, int size, Consumer<List<T>> consumer) {
        if (CollectionUtils.isEmpty(dataList)) {
            return;
        }
        Preconditions.checkArgument(size > 0, "size must be greater than 0");
        Lists.partition(dataList, size).forEach(consumer);
    }

    /**
     * Calls {@code function} once per chunk and concatenates the returned lists in chunk order.
     *
     * @param dataList items to process; {@code null} or empty yields an empty list
     * @param size     maximum number of items per chunk, must be greater than 0
     * @param function call made with each chunk; a {@code null} result is skipped
     * @param <T>      item type
     * @param <V>      result element type
     * @return a new mutable list holding every element returned by {@code function}
     * @throws IllegalArgumentException if {@code dataList} is non-empty and {@code size <= 0}
     */
    public static <T, V> List<V> partitionCall2List(List<T> dataList, int size, Function<List<T>, List<V>> function) {
        if (CollectionUtils.isEmpty(dataList)) {
            return new ArrayList<>(0);
        }
        Preconditions.checkArgument(size > 0, "size must be greater than 0");
        // Mutable reduction belongs in collect(): reduce() requires an identity value that the
        // accumulator never mutates, which a shared ArrayList cannot satisfy.
        return Lists.partition(dataList, size)
                .stream()
                .map(function)
                .filter(Objects::nonNull)
                .<List<V>>collect(ArrayList::new, List::addAll, List::addAll);
    }

    /**
     * Calls {@code function} once per chunk and merges the returned maps in chunk order.
     * When two chunks return the same key, the value from the later chunk wins.
     *
     * @param dataList items to process; {@code null} or empty yields an empty map
     * @param size     maximum number of items per chunk, must be greater than 0
     * @param function call made with each chunk; a {@code null} result is skipped
     * @param <T>      item type, also the key type of the result
     * @param <V>      value type of the result
     * @return a new mutable map holding every entry returned by {@code function}
     * @throws IllegalArgumentException if {@code dataList} is non-empty and {@code size <= 0}
     */
    public static <T, V> Map<T, V> partitionCall2Map(List<T> dataList, int size, Function<List<T>, Map<T, V>> function) {
        if (CollectionUtils.isEmpty(dataList)) {
            return new HashMap<>(0);
        }
        Preconditions.checkArgument(size > 0, "size must be greater than 0");
        // Same reasoning as partitionCall2List: accumulate into a fresh container via collect().
        return Lists.partition(dataList, size)
                .stream()
                .map(function)
                .filter(Objects::nonNull)
                .<Map<T, V>>collect(HashMap::new, Map::putAll, Map::putAll);
    }
}
待調用的服務(模擬)
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Placeholder for the batch-capable service that the examples call.
 * All three methods are empty stubs; the unit tests replace them with mocks.
 */
public class SomeManager {

    /**
     * Stub for a batch operation without a result. Does nothing.
     *
     * @param id   unused by the stub
     * @param data unused by the stub
     */
    public void aRun(Long id, List<String> data) {
    }

    /**
     * Stub for a batch operation returning a list.
     *
     * @param id   unused by the stub
     * @param data unused by the stub
     * @return a new empty mutable list
     */
    public List<Integer> aListMethod(Long id, List<String> data) {
        return new ArrayList<>(0);
    }

    /**
     * Stub for a batch operation returning a map.
     *
     * @param id   unused by the stub
     * @param data unused by the stub
     * @return a new empty mutable map
     */
    public Map<String, Integer> aMapMethod(Long id, List<String> data) {
        return new HashMap<>(0);
    }
}
單元測試:
import org.apache.commons.lang3.RandomUtils;
import org.jeasy.random.EasyRandom;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.internal.verification.Times;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
@RunWith(PowerMockRunner.class)
public class ExecuteUtilTest {
private EasyRandom easyRandom = new EasyRandom();
@Mock
private SomeManager someManager;
// 測試資料
private List<String> mockDataList;
private int total = 30;
@Before
public void init() {
// 構造30條資料
mockDataList = easyRandom.objects(String.class, 30).collect(Collectors.toList());
}
@Test
public void test_a_run_partition() {
// mock aRun
PowerMockito.doNothing().when(someManager).aRun(anyLong(), any());
// 每批 10 個
ExecuteUtil.partitionRun(mockDataList, 10, (eachList) -> someManager.aRun(1L, eachList));
//驗證執行了 3 次
Mockito.verify(someManager, new Times(3)).aRun(anyLong(), any());
}
@Test
public void test_call_return_list_partition() {
// mock 每次調用傳回條數(注意每次調用都是這2個)
int eachReturnSize = 2;
PowerMockito
.doReturn(easyRandom.objects(String.class, eachReturnSize).collect(Collectors.toList()))
.when(someManager)
.aListMethod(anyLong(), any());
// 分批執行
int size = 4;
List<Integer> resultList = ExecuteUtil.partitionCall2List(mockDataList, size, (eachList) -> someManager.aListMethod(2L, eachList));
//驗證執行次數
int invocations = 8;
Mockito.verify(someManager, new Times(invocations)).aListMethod(anyLong(), any());
// 正好幾輪
int turns;
if (total % size == 0) {
turns = total / size;
} else {
turns = total / size + 1;
}
Assert.assertEquals(turns * eachReturnSize, resultList.size());
}
@Test
public void test_call_return_map_partition() {
// mock 每次調用傳回條數
// 注意:
// 如果僅調用doReturn一次,那麼每次傳回都是key相同的Map,
// 如果需要不覆寫,則doReturn次數和 invocations 相同)
int eachReturnSize = 3;
PowerMockito
.doReturn(mockMap(eachReturnSize))
.doReturn(mockMap(eachReturnSize))
.when(someManager).aMapMethod(anyLong(), any());
// 每批
int size = 16;
Map<String, Integer> resultMap = ExecuteUtil.partitionCall2Map(mockDataList, size, (eachList) -> someManager.aMapMethod(2L, eachList));
//驗證執行次數
int invocations = 2;
Mockito.verify(someManager, new Times(invocations)).aMapMethod(anyLong(), any());
// 正好幾輪
int turns;
if (total % size == 0) {
turns = total / size;
} else {
turns = total / size + 1;
}
Assert.assertEquals(turns * eachReturnSize, resultMap.size());
}
private Map<String, Integer> mockMap(int size) {
Map<String, Integer> result = new HashMap<>(size);
for (int i = 0; i < size; i++) {
// 極力保證key不重複
result.put(easyRandom.nextObject(String.class) + RandomUtils.nextInt(), easyRandom.nextInt());
}
return result;
}
注意:
1 判空
.filter(Objects::nonNull)
這裡非常重要,避免某一次調用傳回 null 而導致空指針異常。
2 實際使用時可以結合apollo配置, 靈活設定每批執行的數量,如果逾時隨時調整
3 用到的類庫
集合工具類: commons-collections4、guava (可以不用)
這裡將 list 劃分為子 list 也可以使用 stream 的 skip、limit 特性自己去做,集合判空也可以不借助 CollectionUtils。
構造資料:easy-random
單元測試架構: Junit4 、 powermockito、mockito
4 大家可以加一些更強大的功能,如允許設定每次調用的時間間隔、並行或並發調用等。
三、改進
以上面的List接口為例,將其改為異步版本:
/**
 * Asynchronous variant of the chunked list call: submits one task per chunk, waits for all
 * of them, then concatenates the results in chunk order.
 *
 * @param dataList        items to process; {@code null} or empty yields an empty list
 * @param size            maximum number of items per chunk, must be greater than 0
 * @param executorService executor that runs the per-chunk calls; when {@code null} the
 *                        default executor of {@code CompletableFuture.supplyAsync} is used
 * @param function        call made with each chunk; {@code null} or empty results are skipped
 * @param <T>             item type
 * @param <V>             result element type
 * @return a new mutable list with the elements of every chunk result, in chunk order
 * @throws IllegalArgumentException if {@code dataList} is non-empty and {@code size <= 0}
 * @throws RuntimeException         wrapping the failure if any task fails or the wait is
 *                                  interrupted (the interrupt flag is restored first)
 */
public static <T, V> List<V> partitionCall2ListAsync(List<T> dataList,
                                                     int size,
                                                     ExecutorService executorService,
                                                     Function<List<T>, List<V>> function) {
    if (CollectionUtils.isEmpty(dataList)) {
        return new ArrayList<>(0);
    }
    Preconditions.checkArgument(size > 0, "size must be greater than 0");

    // Submit every chunk up front so the calls can overlap.
    List<CompletableFuture<List<V>>> completableFutures = Lists.partition(dataList, size)
            .stream()
            .map(eachList -> {
                if (executorService == null) {
                    return CompletableFuture.supplyAsync(() -> function.apply(eachList));
                } else {
                    return CompletableFuture.supplyAsync(() -> function.apply(eachList), executorService);
                }
            })
            .collect(Collectors.toList());

    CompletableFuture<Void> allFinished = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]));
    try {
        allFinished.get();
    } catch (InterruptedException e) {
        // Keep the interrupt visible to the caller instead of silently clearing it.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

    // All futures are complete here, so join() does not block. Flattening in list order keeps
    // the output aligned with the input order and copies each element once.
    return completableFutures.stream()
            .map(CompletableFuture::join)
            .filter(CollectionUtils::isNotEmpty)
            .flatMap(List::stream)
            .collect(Collectors.toCollection(ArrayList::new));
}
測試代碼:
// Test data
// Input list handed to the partition call; populated in init().
private List<String> mockDataList;
// Number of random strings generated into mockDataList.
private int total = 300;
// Counts someCall invocations; re-created at the start of the async test.
private AtomicInteger atomicInteger;
/** Regenerates the shared input list before each test. */
@Before
public void init() {
    // Draw `total` random strings from easy-random and materialize them as a list.
    this.mockDataList = easyRandom
            .objects(String.class, total)
            .collect(Collectors.toList());
}
/**
 * Verifies the async partition call: every element is processed, the callback runs once
 * per chunk, and the results come back in input order.
 */
@Test
public void test_call_return_list_partition_async() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    try {
        atomicInteger = new AtomicInteger(0);
        Stopwatch stopwatch = Stopwatch.createStarted();

        // Run in chunks.
        int size = 2;
        List<Integer> resultList = ExecuteUtil.partitionCall2ListAsync(mockDataList, size, executorService, (eachList) -> someCall(2L, eachList));

        Stopwatch stop = stopwatch.stop();
        log.info("執行時間: {} 秒", stop.elapsed(TimeUnit.SECONDS));
        Assert.assertEquals(total, resultList.size());

        // Expected number of chunks: ceil(total / size).
        int turns;
        if (total % size == 0) {
            turns = total / size;
        } else {
            turns = total / size + 1;
        }
        log.info("共調用了{}次", turns);
        Assert.assertEquals(turns, atomicInteger.get());

        // Results must line up with the input order.
        for (int i = 0; i < mockDataList.size(); i++) {
            Assert.assertEquals((Integer) mockDataList.get(i).length(), resultList.get(i));
        }
    } finally {
        // The fixed pool uses non-daemon threads; release them even when an assertion fails.
        executorService.shutdown();
    }
}
* 模拟一次調用
*/
private List<Integer> someCall(Long id, List<String> strList) {
log.info("目前-->{},strList.size:{}", atomicInteger.incrementAndGet(), strList.size());
try {
TimeUnit.SECONDS.sleep(2L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return strList.stream()
.map(String::length)
.collect(Collectors.toList());
}
通過異步可以盡可能快地拿到執行結果。
四、總結
1 要靈活運用Java 8 的 特性簡化代碼
2 要注意代碼的封裝來使代碼更加優雅,複用性更強
3 要善用構造單元測試資料的框架(如 java-faker 和 easy-random)來提高構造資料的效率
4 要了解性能改進的常見思路:合并請求、并發、并行、緩存等。