天天看點

請求合并哪家強

将相似或重複請求在上遊系統中合并後發往下遊系統,可以大大降低下遊系統的負載,提升系統整體吞吐率。文章介紹了 hystrix collapser、ConcurrentHashMultiset、自實作BatchCollapser 三種請求合并技術,并通過其具體實作對比各自适用的場景。

前言

工作中,我們常見的請求模型都是請求-應答式,即一次請求中,服務給請求配置設定一個獨立的線程,一塊獨立的記憶體空間,所有的操作都是獨立的,包括資源和系統運算。我們也知道,在請求中處理一次系統 I/O 的消耗是非常大的,如果有非常多的請求都進行同一類 I/O 操作,那麼是否可以将這些 I/O 操作都合并到一起,進行一次 I/O 操作,是否可以大大降低下遊資源伺服器的負擔呢?

最近我工作之餘的大部分時間都花在這個問題的探究上了,對比了幾個現有類庫,為了解決一個小問題把 hystrix javanica 的代碼翻了一遍,也根據自己工作中遇到的業務需求實作了一個簡單的合并類,收獲還是挺大的。可能這個需求有點偏門,在網上搜尋結果并不多,也沒有綜合一點的資料,索性自己總結分享一下,希望能幫到後來遇到這種問題的小夥伴。

文章歡迎轉載,請尊重作者勞動成果,帶上原文連結:https://www.cnblogs.com/zhenbianshu/p/9382420.html

Hystrix Collapser

hystrix

開源的請求合并類庫(知名的)好像也隻有 Netflix 公司開源的

Hystrix

了, hystrix 專注于保持 WEB 伺服器在高并發環境下的系統穩定,我們常用它的熔斷器(Circuit Breaker) 來實作服務的服務隔離和災時降級,有了它,可以使整個系統不至于被某一個接口的高并發洪流沖塌,即使接口挂了也可以将服務降級,傳回一個人性化的響應。請求合并作為一個保障下遊服務穩定的利器,在 hystrix 内實作也并不意外。

我們在使用 hystrix 時,常用它的 javanica 子產品,以注解的方式編寫 hystrix 代碼,使代碼更簡潔而且對業務代碼侵入更低。是以在項目中我們一般至少需要引用

hystrix-core

hystrix-javanica

兩個包。

另外,hystrix 的實作都是通過 AOP,我們要還要在項目 xml 裡顯式配置 HystrixAspect 的 bean 來啟用它。

<aop:aspectj-autoproxy/>
<bean id="hystrixAspect" class="com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect" />           

collapser

hystrix collapser 是 hystrix 内的請求合并器,它有自定義 BatchMethod 和 注解兩種實作方式,自定義 BatchMethod 網上有各種教程,實作起來很複雜,需要手寫大量代碼,而注解方式隻需要添加兩行注解即可,但配置方式我在官方文檔上也沒找見,中文方面本文應該是獨一份兒了。

其實作需要注意的是:

  • 我們在需要合并的方法上添加

    @HystrixCollapser

    注解,在定義好的合并方法上添加

    @HystrixCommand

    注解;
  • single 方法隻能傳入一個參數,多參數情況下需要自己包裝一個參數類,而 batch 方法需要

    java.util.List<SingleParam>

  • single 方法傳回

    java.util.concurrent.Future<SingleReturn>

    , batch 方法傳回

    java.util.List<SingleReturn>

    ,且要保證傳回的結果數量和傳入的參數數量一緻。

下面是一個簡單的示例:

public class HystrixCollapserSample {

    @HystrixCollapser(batchMethod = "batch")
    public Future<Boolean> single(String input) {
        return null; // single方法不會被執行到
    }

    public List<Boolean> batch(List<String> inputs) {
        return inputs.stream().map(it -> Boolean.TRUE).collect(Collectors.toList());
    }
}           

源碼實作

為了解決 hystrix collapser 的配置問題看了下 hystrix javanica 的源碼,這裡簡單總結一下 hystrix 請求合并器的具體實作,源碼的詳細解析在我的筆記:Hystrix collasper 源碼解析。

  1. 在 spring-boot 内注冊切面類的 bean,裡面包含 @HystrixCollapser 注解切面;
  2. 在方法執行時檢測到方法被

    HystrixCollapser

    注解後,spring 調用

    methodsAnnotatedWithHystrixCommand

    方法來執行 hystrix 代理;
  3. hystrix 擷取一個

    collapser

    執行個體(在目前 scope 内檢測不到即建立);
  4. hystrix 将目前請求的參數送出給 collapser, 由 collapser 存儲在一個 concurrentHashMap (RequestArgumentType -> CollapsedRequest)内,此方法會建立一個 Observable 對象,并傳回一個 觀察此對象的 Future 給業務線程;
  5. collpser 在建立時會建立一個 timer 線程,定時消費存儲的請求,timer 會将多個請求構造成一個合并後的請求,調用 batch 執行後将結果順序映射到輸出參數,并通知 Future 任務已完成。

需要注意,由于需要等待 timer 執行真正的請求操作,collapser 會導緻所有的請求的 cost 都會增加約 timerInterval/2 ms;

配置

hystrix collapser 的配置需要在

@HystrixCollapser

注解上使用,主要包括兩個部分,專有配置和 hystrixCommand 通用配置;

專有配置包括:

  • collapserKey,這個可以不用配置,hystrix 會預設使用目前方法名;
  • batchMethod,配置 batch 方法名,我們一般會将 single 方法和 batch 方法定義在同一個類内,直接填方法名即可;
  • scope,最坑的配置項,也是逼我讀源碼的元兇,

    com.netflix.hystrix.HystrixCollapser.Scope

    枚舉類,有

    REQUEST, GLOBAL

    兩種選項,在 scope 為 REQUEST 時,hystrix 會為每個請求都建立一個 collapser, 此時你會發現 batch 方法執行時,傳入的請求數總為1。而且 REQUEST 項還是預設項,不明白這樣請求合并還有什麼意義;
  • collapserProperties, 在此選項内我們可以配置 hystrixCommand 的通用配置;

通用配置包括:

  • maxRequestsInBatch, 構造批量請求時,使用的單個請求的最大數量;
  • timerDelayInMilliseconds, 此選項配置 collapser 的 timer 線程多久會合并一次請求;
  • requestCache.enabled, 配置送出請求時是否緩存;

一個完整的配置如下:

@HystrixCollapser(
            batchMethod = "batch",
            collapserKey = "single",
            scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
            collapserProperties = {
                    @HystrixProperty(name = "maxRequestsInBatch", value = "100"),
                    @HystrixProperty(name = "timerDelayInMilliseconds", value = "1000"),
                    @HystrixProperty(name = "requestCache.enabled", value = "true")
            })           

BatchCollapser

設計

由于業務需求,我們并不太關心被合并請求的傳回值,而且覺得 hystrix 保持那麼多的 Future 并沒有必要,于是自己實作了一個簡單的請求合并器,業務線程簡單地将請求放到一個容器裡,請求數累積到一定量或延遲了一定的時間,就取出容器内的資料統一發送給下遊系統。

設計思想跟 hystrix 類似,合并器有一個字段作為存儲請求的容器,且設定一個 timer 線程定時消費容器内的請求,業務線程将請求參數送出到合并 器的容器内。不同之處在于,業務線程将請求送出給容器後立即同步傳回成功,不必管請求的消費結果,這樣便實作了時間次元上的合并觸發。

另外,我還添加了另外一個次元的觸發條件,每次将請求參數添加到容器後都會檢驗一下容器内請求的數量,如果數量達到一定的門檻值,将在業務線程内合并執行一次。

由于有兩個次元會觸發合并,就不可避免會遇到線程安全問題。為了保證容器内的請求不會被多個線程重複消費或都漏掉,我需要一個容器能滿足以下條件:

  • 是一種 Collection,類似于 ArrayList 或 Queue,可以存重複元素且有順序;
  • 在多線程環境中能安全地将裡面的資料全取出來進行消費,而不用自己實作鎖。

java.util.concurrent 包内的

LinkedBlockingDeque

剛好符合要求,首先它實作了 BlockingDeque 接口,多線程環境下的存取操作是安全的;此外,它還提供

drainTo(Collection<? super E> c, int maxElements)

方法,可以将容器内 maxElements 個元素安全地取出來,放到 Collection c 中。

實作

以下是具體的代碼實作:

public class BatchCollapser<E> implements InitializingBean {
     private static final Logger logger = LoggerFactory.getLogger(BatchCollapser.class);
     private static volatile Map<Class, BatchCollapser> instance = Maps.newConcurrentMap();
     private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1);

     private volatile LinkedBlockingDeque<E> batchContainer = new LinkedBlockingDeque<>();
     private Handler<List<E>, Boolean> cleaner;
     private long interval;
     private int threshHold;

     private BatchCollapser(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) {
         this.cleaner = cleaner;
         this.threshHold = threshHold;
         this.interval = interval;
     }

     @Override
     public void afterPropertiesSet() throws Exception {
         SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> {
             try {
                 this.clean();
             } catch (Exception e) {
                 logger.error("clean container exception", e);
             }
         }, 0, interval, TimeUnit.MILLISECONDS);
     }

     public void submit(E event) {
         batchContainer.add(event);
         if (batchContainer.size() >= threshHold) {
             clean();
         }
     }

     private void clean() {
         List<E> transferList = Lists.newArrayListWithExpectedSize(threshHold);
         batchContainer.drainTo(transferList, 100);
         if (CollectionUtils.isEmpty(transferList)) {
             return;
         }

         try {
             cleaner.handle(transferList);
         } catch (Exception e) {
             logger.error("batch execute error, transferList:{}", transferList, e);
         }
     }

     public static <E> BatchCollapser getInstance(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) {
         Class jobClass = cleaner.getClass();
         if (instance.get(jobClass) == null) {
             synchronized (BatchCollapser.class) {
                 if (instance.get(jobClass) == null) {
                     instance.put(jobClass, new BatchCollapser<>(cleaner, threshHold, interval));
                 }
             }
         }

         return instance.get(jobClass);
     }
 }           

以下代碼内需要注意的點:

  • 由于合并器的全局性需求,需要将合并器實作為一個單例,另外為了提升它的通用性,内部使用使用 concurrentHashMap 和 double check 實作了一個簡單的單例工廠。
  • 為了區分不同用途的合并器,工廠需要傳入一個實作了

    Handler

    的執行個體,通過執行個體的 class 來對請求進行分組存儲。
  • 由于 java.util.Timer 的阻塞特性,一個 Timer 線程在阻塞時不會啟動另一個同樣的 Timer 線程,是以使用

    ScheduledExecutorService

    定時啟動 Timer 線程。

ConcurrentHashMultiset

上面介紹的請求合并都是将多個請求一次發送,下遊伺服器處理時本質上還是多個請求,最好的請求合并是在記憶體中進行,将請求結果簡單合并成一個發送給下遊伺服器。如我們經常會遇到的需求:元素分值累加或資料統計,就可以先在記憶體中将某一項的分值或資料累加起來,定時請求資料庫儲存。

Guava 内就提供了這麼一種資料結構:

ConcurrentHashMultiset

,它不同于普通的 set 結構存儲相同元素時直接覆寫原有元素,而是給每個元素保持一個計數 count, 插入重複時元素的 count 值加1。而且它在添加和删除時并不加鎖也能保證線程安全,具體實作是通過一個 while(true) 循環嘗試操作,直到操作夠所需要的數量。

ConcurrentHashMultiset 這種排重計數的特性,非常适合資料統計這種元素在短時間内重複率很高的場景,經過排重後的數量計算,可以大大降低下遊伺服器的壓力,即使重複率不高,能用少量的記憶體空間換取系統可用性的提高,也是很劃算的。

使用 ConcurrentHashMultiset 進行請求合并與使用普通容器在整體結構上并無太大差異,具體類似于:

if (ConcurrentHashMultiset.isEmpty()) {
            return;
        }

        List<Request> transferList = Lists.newArrayList();
        ConcurrentHashMultiset.elementSet().forEach(request -> {
            int count = ConcurrentHashMultiset.count(request);
            if (count <= 0) {
                return;
            }

            transferList.add(count == 1 ? request : new Request(request.getIncrement() * count));
            ConcurrentHashMultiset.remove(request, count);
        });           

小結

最後總結一下各個技術适用的場景:

  • hystrix collapser: 需要每個請求的結果,并且不在意每個請求的 cost 會增加;
  • BatchCollapser: 不在意請求的結果,需要請求合并能在時間和數量兩個次元上觸發;
  • ConcurrentHashMultiset:請求重複率很高的統計類場景;

另外,如果選擇自己來實作的話,完全可以将 BatchCollapser 和 ConcurrentHashMultiset 結合一下,在BatchCollapser 裡使用 ConcurrentHashMultiset 作為容器,這樣就可以結合兩者的優勢了。

關于本文有什麼問題可以在下面留言交流,如果您覺得本文對您有幫助,可以點選下面的 

推薦

 支援一下我,部落格一直在更新,歡迎 

關注