天天看點

WebClient-Reactor風格的異步調用

作者:技術聯盟總壇
WebClient-Reactor風格的異步調用

前言

正如《Java異步程式設計實戰》一書中所述,Spring5中引入了與Web Servlet技術棧并行存在的,架構層面支援異步處理的Webflux技術棧。

利用Spring WebFlux可以實作服務端異步執行,另外其也提供了WebClient用來執行HTTP請求,用來實作用戶端異步調用。

WebClient 提供了Reactor的功能性、流暢的API,支援多種異步邏輯處理的聲明式編排,另外它是完全非阻塞的,并且支援流式傳輸。

WebClient的使用

首先我們建立一個簡單的web伺服器,用來作為webclient的服務端,概要代碼如下:

public class Server {
    public static void main(String[] arg) throws IOException {
        // 建立 http 伺服器, 綁定本地 8080 端口
        HttpServer httpServer = HttpServer.create(new InetSocketAddress(8080), 0);


        // 建立上下文監聽, "/" 表示比對所有 URI 請求
        httpServer.createContext("/", new HttpHandler() {
            @Override
            public void handle(HttpExchange httpExchange) throws IOException {
                // 響應内容
                byte[] respContents = "Hello World".getBytes("UTF-8");
                // 設定響應頭
                httpExchange.getResponseHeaders().add("Content-Type", "text/html; charset=UTF-8");
                httpExchange.sendResponseHeaders(200, respContents.length);
                // 模拟服務端執行耗時
                try {
                    Thread.sleep(10000);
                } catch (Exception e) {


                }
                // 設定響應内容
                httpExchange.getResponseBody().write(respContents);
                httpExchange.close();
            }
        });
        httpServer.start();
    }
}           
  • 如上代碼我們啟動了一個Web服務端,服務端内部實作休眠10s用來模拟服務端執行邏輯,最後http 響應body内寫回Hello World作為用戶端響應結果。

下面我們來看如何使用Webclient來做用戶端:

public class App {
    private static WebClient client;


    public static void init() {
        //建立webclient執行個體
        client = WebClient.builder(). build();
    }


    public static void main(String[] args) throws KsMerchantApiException, IOException {


        //1.初始化webclient
        init();


        //2.反應式調用,傳回reactor流對象
        Mono<String> resp = client
                .method(HttpMethod.GET)
                .uri("http://127.0.0.1:8080")
                .retrieve()
                .bodyToMono(String.class);


        //3.訂閱流對象
        resp.onErrorMap(throwable -> {
            System.out.println("onErrorMap:" + throwable.getLocalizedMessage());
            return throwable;
        }).subscribe(s -> System.out.println("result:" + Thread.currentThread().getName() + " " + s));


        //4.調用線程列印
        System.out.println("main is over");
}           
  • 如上代碼1我們建立了一個WebClient對象
  • 代碼2我們使用client發起了一個get請求,并且使用bodyToMono(String.class)傳回反應式流對象。
  • 代碼3 我們訂閱流對象,并列印響應結果。
  • 代碼4列印main執行結束了。

首先運作服務端,然後運作用戶端,會發現用戶端如下輸出:

main is over
result:reactor-http-nio-1 Hello World

           

下面我們來分析下調用邏輯:

  • 用戶端調用線程,執行代碼2發起調用後會馬上傳回一個Mono對象,不會阻塞等待服務端寫回響應結果;調用線程繼續向下運作執行代碼3訂閱流結果,該過程不會阻塞調用線程,調用線程馬上傳回;最後調用線程執行代碼2列印日志,最終調用線程執行結束。
  • 服務端結束到請求後,會休眠10s,休眠結束後,把結果寫回用戶端,用戶端IO線程接受到服務端響應結果後,回調代碼3設定的訂閱回調函數,輸出響應結果。

可知用戶端調用線程在發起請求時沒被阻塞,響應結果回來後也沒阻塞,而是使用的IO線程來處理響應結果。可知webclient實作了嚴格意義上的異步調用。

WebClient的線程模型

WebClient-Reactor風格的異步調用
  • 如上當調用線程使用webclient發起請求後,内部會先建立一個Mono響應對象,然後切換到IO線程具體發起網絡請求。
  • 調用線程擷取到Mono對象後,一般會訂閱,也就是設定一個Consumer用來具體處理服務端響應結果。
  • 服務端接受請求後,進行處理,最後把結果寫回用戶端,用戶端接受響應後,使用IO線程把結果設定到Mono對象,進而觸發設定的Consumer回調函數的執行。

注:WebClient預設内部使用Netty實作http用戶端調用,這裡IO線程其實是netty的IO線程,而netty用戶端的IO線程内是不建議做耗時操作的,因為IO線程是用來輪訓注冊到select上的channel的資料的,如果阻塞了,那麼其他channel的讀寫請求就會得不到及時處理。是以如果consumer内邏輯比較耗時,建議從IO線程切換到其他線程來做。

那麼如何切換那?可以使用publishOn把IO線程切換到自定義線程池進行處理:

resp.publishOn(Schedulers.elastic())//切換到Schedulers.elastic()對應的線程池進行處理
        .onErrorMap(throwable -> {
            System.out.println("onErrorMap:" + throwable.getLocalizedMessage());
            return throwable;
        }).subscribe(s -> System.out.println("result:" + Thread.currentThread().getName() + " " + s));

           

Reactor中Schedulers提供了幾種内置實作:

  • Schedulers.elastic():線程池中的線程是可以複用的,按需建立與空閑回收,該排程器适用于 I/O 密集型任務。
  • Schedulers.parallel():含有固定個數的線程池,該排程器适用于計算密集型任務。
  • Schedulers.single():單一線程來執行任務
  • Schedulers.immediate():立刻使用調用線程來執行。
  • Schedulers.fromExecutor():使用已有的Executor轉換為Scheduler來執行任務。

總結

異步非阻塞是未來,而結合反應式架構Reactor或Rxjava可以實作Reactor風格的異步程式設計,實作聲明式程式設計模式,可以讓我們編寫的異步代碼,可讀性大大提高。

繼續閱讀