在實際業務開發中如何降低接口響應時間,即如何提高程式的并行計算能力。
本文主要包含如下内容:
1、順序執行
2、線程池+Future
3、使用Java8的CompletableFuture
4、使用Guava的ListenableFuture
1、順序執行
直接上代碼:
package com.c306.test;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class ParallelTest {
/**
* 測試方法
*
* @return
*/
private int testMethod() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}
/**
* 順序執行
*/
private void test01() {
long start = System.currentTimeMillis();
List<Integer> list = new ArrayList<>(5);
list.add(this.testMethod());
list.add(this.testMethod());
list.add(this.testMethod());
list.add(this.testMethod());
list.add(this.testMethod());
log.info("costs: {}ms", System.currentTimeMillis() - start);
}
@Test
public void testSequentialExec() {
this.test01();
this.test01();
this.test01();
}
}
多次順序執行耗時結果:
2020-04-19 16:33:43,310+0800 INFO [main] com.qx.test.ParallelTest - costs: 5046ms
2020-04-19 16:33:48,329+0800 INFO [main] com.qx.test.ParallelTest - costs: 5003ms
2020-04-19 16:33:53,332+0800 INFO [main] com.qx.test.ParallelTest - costs: 5003ms
2、線程池+Future
順序執行确實很慢,是以我們需要并行執行,即同時調用testMethod()這5個方法。每個方法單獨開啟一個線程異步去執行,全部執行完成輸出結果。
注意:這樣執行有個問題是,每次調用都需要建立5個線程,線程的建立和銷毀都是需要開銷的,是以我們使用池化技術
直接上代碼:
package com.c306.test;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
@Slf4j
public class ParallelTest {
/**
* IO密集型建議:2*CPU,因為IO密集型線程不是一直在運作,是以可以配置多一點;
* CPU密集型建議:因為一直在使用CPU,是以要保證線程數不能太多,可以CPU數+1;
*/
private ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
/**
* 測試方法
*
* @return
*/
private int testMethod() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}
/**
* 線程池+Future
*/
private void test02() {
long start = System.currentTimeMillis();
Future<Integer> test1 = executor.submit(this::testMethod);
Future<Integer> test2 = executor.submit(this::testMethod);
Future<Integer> test3 = executor.submit(this::testMethod);
Future<Integer> test4 = executor.submit(this::testMethod);
Future<Integer> test5 = executor.submit(this::testMethod);
List<Future<Integer>> futureList = Arrays.asList(test1, test2, test3, test4, test5);
List<Integer> list = futureList.stream().map(future -> {
try {
return future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return null;
}).collect(Collectors.toList());
log.info("costs: {}ms", System.currentTimeMillis() - start);
}
@Test
public void testFutureExec() {
this.test02();
this.test02();
this.test02();
}
}
多次并行執行耗時結果:
2020-04-19 16:53:38,983+0800 INFO [main] com.qx.test.ParallelTest - costs: 1208ms
2020-04-19 16:53:40,002+0800 INFO [main] com.qx.test.ParallelTest - costs: 1000ms
2020-04-19 16:53:41,002+0800 INFO [main] com.qx.test.ParallelTest - costs: 1000ms
效果很明顯,直接相當于一個方法的調用耗時,這種通過線程池+Future并行計算的方式,直接可以把接口性能提高上去了。
3、使用Java8的CompletableFuture
Future是java.util.concurrent并發包中的接口類,用來表示一個線程異步執行後的結果,核心方法包含:
Future.get() 阻塞調用線程,直到計算結果傳回
Future.isDone() 判斷線程是否執行完畢
Future.cancel() 取消目前線程的執行
Future.get()是阻塞調用的,想要拿到線程執行的結果,必須是Future.get()阻塞或者while(Future.isDone())輪詢方式調用。這種方式叫“主動拉(pull)”,現在流行響應式程式設計,即“主動推(push)”的方式,當線程執行完了,你告訴我就可以了。
直接上代碼:
package com.c306.test;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
@Slf4j
public class ParallelTest {
/**
* IO密集型建議:2*CPU,因為IO密集型線程不是一直在運作,是以可以配置多一點;
* CPU密集型建議:因為一直在使用CPU,是以要保證線程數不能太多,可以CPU數+1;
*/
private ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
/**
* 測試方法
*
* @return
*/
private int testMethod() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}
/**
* 使用Java8的CompletableFuture
*/
private void test03() {
long start = System.currentTimeMillis();
List<CompletableFuture<Integer>> futureList = new ArrayList<>(5);
futureList.add(CompletableFuture.supplyAsync(this::testMethod, executor));
futureList.add(CompletableFuture.supplyAsync(this::testMethod, executor));
futureList.add(CompletableFuture.supplyAsync(this::testMethod, executor));
futureList.add(CompletableFuture.supplyAsync(this::testMethod, executor));
futureList.add(CompletableFuture.supplyAsync(this::testMethod, executor));
CompletableFuture<Void> allFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[5]));
CompletableFuture<List<Integer>> resultList = allFuture.thenApplyAsync(value ->
futureList.stream().map(CompletableFuture::join).collect(Collectors.toList()),
executor
);
List<Integer> list = resultList.join();
log.info("costs: {}ms", System.currentTimeMillis() - start);
}
@Test
public void testCompletableFutureExec() {
this.test03();
this.test03();
this.test03();
}
}
可以看到實作方式和Future并沒有什麼不同,但是CompletableFuture提供了很多友善的方法,比如代碼中的allOf,thenApplyAsync,可以将多個CompletableFuture組合成一個CompletableFuture,再調用join方法阻塞拿到結果
多次并行執行耗時結果:
2020-04-19 17:08:17,800+0800 INFO [main] com.qx.test.ParallelTest - costs: 1212ms
2020-04-19 17:08:18,844+0800 INFO [main] com.qx.test.ParallelTest - costs: 1011ms
2020-04-19 17:08:19,847+0800 INFO [main] com.qx.test.ParallelTest - costs: 1002ms
CompletableFuture類中有很多方法可以供大家使用,不像Future隻要那麼幾個方法可以使用,這也是Java自有庫對Future的一個增強。這裡隻是簡單展示了CompletableFuture的一種用法,實際開發中需要根據不同的場景去選擇使用不同的方法。
4、使用Guava的ListenableFuture
谷歌開源的Guava中的ListenableFuture接口對java自帶的Future接口做了進一步擴充和封裝,并且提供了靜态工具類Futures。
直接上代碼:
package com.qx.test;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
@Slf4j
public class ParallelTest {
/**
* IO密集型建議:2*CPU,因為IO密集型線程不是一直在運作,是以可以配置多一點;
* CPU密集型建議:因為一直在使用CPU,是以要保證線程數不能太多,可以CPU數+1;
*/
private ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
/**
* 測試方法
*
* @return
*/
private int testMethod() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}
/**
* 使用Guava的ListenableFuture
*
* @throws ExecutionException
* @throws InterruptedException
*/
private void test04() throws ExecutionException, InterruptedException {
// guava需要包裝一下線程池
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executor);
long start = System.currentTimeMillis();
ListenableFuture<Integer> test1 = listeningExecutorService.submit(this::testMethod);
ListenableFuture<Integer> test2 = listeningExecutorService.submit(this::testMethod);
ListenableFuture<Integer> test3 = listeningExecutorService.submit(this::testMethod);
ListenableFuture<Integer> test4 = listeningExecutorService.submit(this::testMethod);
ListenableFuture<Integer> test5 = listeningExecutorService.submit(this::testMethod);
ListenableFuture<List<Integer>> listListenableFuture = Futures.allAsList(test1, test2, test3, test4, test5);
List<Integer> list = listListenableFuture.get();
log.info("costs: {}ms", System.currentTimeMillis() - start);
}
@Test
public void testListenableFutureExec() throws ExecutionException, InterruptedException {
this.test04();
this.test04();
this.test04();
}
}
多次并行執行耗時結果:
2020-04-19 17:21:11,919+0800 INFO [main] com.qx.test.ParallelTest - costs: 1203ms
2020-04-19 17:21:12,944+0800 INFO [main] com.qx.test.ParallelTest - costs: 1002ms
2020-04-19 17:21:13,945+0800 INFO [main] com.qx.test.ParallelTest - costs: 1001ms
總結:以上就是如何讓接口并行計算的三種實作方式,屬于日常開發中比較常用的代碼優化技巧。這裡沒有做過多的說明和比較,需要大家查閱更多的相關源碼和資料。