天天看點

Hystrix 線程池技術實作資源隔離

基于 Hystrix 線程池技術實作資源隔離

上一講提到,如果從 Nginx 開始,緩存都失效了,Nginx 會直接通過緩存服務調用商品服務擷取最新商品資料(我們基于電商項目做個讨論),有可能出現調用延時而把緩存服務資源耗盡的情況。這裡,我們就來說說,怎麼通過 Hystrix 線程池技術實作資源隔離。

資源隔離,就是說,你如果要把對某一個依賴服務的所有調用請求,全部隔離在同一份資源池内,不會去用其它資源了,這就叫資源隔離。哪怕對這個依賴服務,比如說商品服務,現在同時發起的調用量已經到了 1000,但是線程池内就 10 個線程,最多就隻會用這 10 個線程去執行,不會說,對商品服務的請求,因為接口調用延時,将 tomcat 内部所有的線程資源全部耗盡。

Hystrix 進行資源隔離,其實是提供了一個抽象,叫做 command。這也是 Hystrix 最最基本的資源隔離技術。

利用 HystrixCommand 擷取單條資料

我們通過将調用商品服務的操作封裝在 HystrixCommand 中,限定一個 key,比如下面的 GetProductInfoCommandGroup,在這裡我們可以簡單認為這是一個線程池,每次調用商品服務,就隻會用該線程池中的資源,不會再去用其它線程資源了。

public class GetProductInfoCommand extends HystrixCommand<ProductInfo> {

    private Long productId;

    public GetProductInfoCommand(Long productId) {

        super(HystrixCommandGroupKey.Factory.asKey("GetProductInfoCommandGroup"));

        this.productId = productId;

    }

    @Override

    protected ProductInfo run() {

        String url = "http://localhost:8081/getProductInfo?productId=" + productId;

        // 調用商品服務接口

        String response = HttpClientUtils.sendGetRequest(url);

        return JSONObject.parseObject(response, ProductInfo.class);

    }

}

我們在緩存服務接口中,根據 productId 建立 command 并執行,擷取到商品資料。

@RequestMapping("/getProductInfo")@ResponseBodypublic String getProductInfo(Long productId) {

    HystrixCommand<ProductInfo> getProductInfoCommand = new GetProductInfoCommand(productId);

    // 通過command執行,擷取最新商品資料

    ProductInfo productInfo = getProductInfoCommand.execute();

    System.out.println(productInfo);

    return "success";

}

上面執行的是 execute() 方法,其實是同步的。也可以對 command 調用 queue() 方法,它僅僅是将 command 放入線程池的一個等待隊列,就立即傳回,拿到一個 Future 對象,後面可以繼續做其它一些事情,然後過一段時間對 Future 調用 get() 方法擷取資料。這是異步的。

利用 HystrixObservableCommand 批量擷取資料

隻要是擷取商品資料,全部都綁定到同一個線程池裡面去,我們通過 HystrixObservableCommand 的一個線程去執行,而在這個線程裡面,批量把多個 productId 的 productInfo 拉回來。

public class GetProductInfosCommand extends HystrixObservableCommand<ProductInfo> {

    private String[] productIds;

    public GetProductInfosCommand(String[] productIds) {

        // 還是綁定在同一個線程池

        super(HystrixCommandGroupKey.Factory.asKey("GetProductInfoGroup"));

        this.productIds = productIds;

    }

    @Override

    protected Observable<ProductInfo> construct() {

        return Observable.unsafeCreate((Observable.OnSubscribe<ProductInfo>) subscriber -> {

            for (String productId : productIds) {

                // 批量擷取商品資料

                String url = "http://localhost:8081/getProductInfo?productId=" + productId;

                String response = HttpClientUtils.sendGetRequest(url);

                ProductInfo productInfo = JSONObject.parseObject(response, ProductInfo.class);

                subscriber.onNext(productInfo);

            }

            subscriber.onCompleted();

        }).subscribeOn(Schedulers.io());

    }

}

在緩存服務接口中,根據傳來的 id 清單,比如是以 , 分隔的 id 串,通過上面的 HystrixObservableCommand,執行 Hystrix 的一些 API 方法,擷取到所有商品資料。

public String getProductInfos(String productIds) {

    String[] productIdArray = productIds.split(",");

    HystrixObservableCommand<ProductInfo> getProductInfosCommand = new GetProductInfosCommand(productIdArray);

    Observable<ProductInfo> observable = getProductInfosCommand.observe();

    observable.subscribe(new Observer<ProductInfo>() {

        @Override

        public void onCompleted() {

            System.out.println("擷取完了所有的商品資料");

        }

        @Override

        public void onError(Throwable e) {

            e.printStackTrace();

        }

        @Override

        public void onNext(ProductInfo productInfo) {

            System.out.println(productInfo);

        }

    });

    return "success";

}

我們回過頭來,看看 Hystrix 線程池技術是如何實作資源隔離的。

Hystrix 線程池技術實作資源隔離

從 Nginx 開始,緩存都失效了,那麼 Nginx 通過緩存服務去調用商品服務。緩存服務預設的線程大小是 10 個,最多就隻有 10 個線程去調用商品服務的接口。即使商品服務接口故障了,最多就隻有 10 個線程會 hang 死在調用商品服務接口的路上,緩存服務的 tomcat 内其它的線程還是可以用來調用其它的服務,幹其它的事情。

繼續閱讀