前言
正如《Java異步程式設計實戰》一書中所述,Spring5中引入了與Web Servlet技術棧并行存在的,架構層面支援異步處理的Webflux技術棧。
利用Spring WebFlux可以實作服務端異步執行,另外其也提供了WebClient用來執行HTTP請求,用來實作用戶端異步調用。
WebClient 提供了Reactor的功能性、流暢的API,支援多種異步邏輯處理的聲明式編排,另外它是完全非阻塞的,并且支援流式傳輸。
WebClient的使用
首先我們建立一個簡單的web伺服器,用來作為webclient的服務端,概要代碼如下:
public class Server {
public static void main(String[] arg) throws IOException {
// 建立 http 伺服器, 綁定本地 8080 端口
HttpServer httpServer = HttpServer.create(new InetSocketAddress(8080), 0);
// 建立上下文監聽, "/" 表示比對所有 URI 請求
httpServer.createContext("/", new HttpHandler() {
@Override
public void handle(HttpExchange httpExchange) throws IOException {
// 響應内容
byte[] respContents = "Hello World".getBytes("UTF-8");
// 設定響應頭
httpExchange.getResponseHeaders().add("Content-Type", "text/html; charset=UTF-8");
httpExchange.sendResponseHeaders(200, respContents.length);
// 模拟服務端執行耗時
try {
Thread.sleep(10000);
} catch (Exception e) {
}
// 設定響應内容
httpExchange.getResponseBody().write(respContents);
httpExchange.close();
}
});
httpServer.start();
}
}
- 如上代碼我們啟動了一個Web服務端,服務端内部實作休眠10s用來模拟服務端執行邏輯,最後http 響應body内寫回Hello World作為用戶端響應結果。
下面我們來看如何使用Webclient來做用戶端:
public class App {
private static WebClient client;
public static void init() {
//建立webclient執行個體
client = WebClient.builder(). build();
}
public static void main(String[] args) throws KsMerchantApiException, IOException {
//1.初始化webclient
init();
//2.反應式調用,傳回reactor流對象
Mono<String> resp = client
.method(HttpMethod.GET)
.uri("http://127.0.0.1:8080")
.retrieve()
.bodyToMono(String.class);
//3.訂閱流對象
resp.onErrorMap(throwable -> {
System.out.println("onErrorMap:" + throwable.getLocalizedMessage());
return throwable;
}).subscribe(s -> System.out.println("result:" + Thread.currentThread().getName() + " " + s));
//4.調用線程列印
System.out.println("main is over");
}
- 如上代碼1我們建立了一個WebClient對象
- 代碼2我們使用client發起了一個get請求,并且使用bodyToMono(String.class)傳回反應式流對象。
- 代碼3 我們訂閱流對象,并列印響應結果。
- 代碼4列印main執行結束了。
首先運作服務端,然後運作用戶端,會發現用戶端如下輸出:
main is over
result:reactor-http-nio-1 Hello World
下面我們來分析下調用邏輯:
- 用戶端調用線程,執行代碼2發起調用後會馬上傳回一個Mono對象,不會阻塞等待服務端寫回響應結果;調用線程繼續向下運作執行代碼3訂閱流結果,該過程不會阻塞調用線程,調用線程馬上傳回;最後調用線程執行代碼2列印日志,最終調用線程執行結束。
- 服務端結束到請求後,會休眠10s,休眠結束後,把結果寫回用戶端,用戶端IO線程接受到服務端響應結果後,回調代碼3設定的訂閱回調函數,輸出響應結果。
可知用戶端調用線程在發起請求時沒被阻塞,響應結果回來後也沒阻塞,而是使用的IO線程來處理響應結果。可知webclient實作了嚴格意義上的異步調用。
WebClient的線程模型
- 如上當調用線程使用webclient發起請求後,内部會先建立一個Mono響應對象,然後切換到IO線程具體發起網絡請求。
- 調用線程擷取到Mono對象後,一般會訂閱,也就是設定一個Consumer用來具體處理服務端響應結果。
- 服務端接受請求後,進行處理,最後把結果寫回用戶端,用戶端接受響應後,使用IO線程把結果設定到Mono對象,進而觸發設定的Consumer回調函數的執行。
注:WebClient預設内部使用Netty實作http用戶端調用,這裡IO線程其實是netty的IO線程,而netty用戶端的IO線程内是不建議做耗時操作的,因為IO線程是用來輪訓注冊到select上的channel的資料的,如果阻塞了,那麼其他channel的讀寫請求就會得不到及時處理。是以如果consumer内邏輯比較耗時,建議從IO線程切換到其他線程來做。
那麼如何切換那?可以使用publishOn把IO線程切換到自定義線程池進行處理:
resp.publishOn(Schedulers.elastic())//切換到Schedulers.elastic()對應的線程池進行處理
.onErrorMap(throwable -> {
System.out.println("onErrorMap:" + throwable.getLocalizedMessage());
return throwable;
}).subscribe(s -> System.out.println("result:" + Thread.currentThread().getName() + " " + s));
Reactor中Schedulers提供了幾種内置實作:
- Schedulers.elastic():線程池中的線程是可以複用的,按需建立與空閑回收,該排程器适用于 I/O 密集型任務。
- Schedulers.parallel():含有固定個數的線程池,該排程器适用于計算密集型任務。
- Schedulers.single():單一線程來執行任務
- Schedulers.immediate():立刻使用調用線程來執行。
- Schedulers.fromExecutor():使用已有的Executor轉換為Scheduler來執行任務。
總結
異步非阻塞是未來,而結合反應式架構Reactor或Rxjava可以實作Reactor風格的異步程式設計,實作聲明式程式設計模式,可以讓我們編寫的異步代碼,可讀性大大提高。