天天看點

JAVA重試機制

作者:書香小炒肉

一.為什麼要引入重試機制

在分布式系統中,日常開發維護中,我們經常涉及到調用外部接口或者通過RPC去通路其他的業務系統。在這個過程中經常會碰到這樣的問題:被調用的第三方或者外部接口的穩定性存在問題,經常會出現請求不通或者逾時等現象,造成這種現象的原因可能是由于網絡波動或者是系統更新維護導緻短暫的不可用。這些可能會導緻我們自身的系統處于一種不穩定的狀态,不僅會在測試跟産品那經常被抱怨系統地實作問題,還會影響我們自身業務的正常進行。

諸如此類的場景:發送消息失敗、調用遠端服務失敗、争搶鎖失敗等;

重試機制可以保護系統減少因網絡波動、依賴服務短暫性不可用所帶來的影響,讓系統能更穩定運作的一種保護機制。是以在有些必須的業務中重試機制實用又有效。

二.重試機制需要具備的特點

  • 無侵入:盡量不改動目前的業務邏輯,對代碼最好做到無侵入,在需要進行重試的地方,可以簡單地實作
  • 可配置:重試次數、重試間隔時間、是否為異步重試等,可以根據業務的不同進行配置
  • 通用性:可以無改動的或者很少改動的支援絕大部分的場景,拿來即用

三.重試機制需要考慮的問題

當然,在引入重試機制的時候我們需要進行如下幾個問題的考慮:

  • 重試幾次比較合适?
  • 每次重試的間隔設定多少合适?
  • 如果所有的重試機會用完之後還是不成功應該怎麼辦?

1.重試幾次比較合适

通常來說我們單次重試所面臨的的結果是有很大的不确定性的,那麼到底需要進行多少次重試才是最合理的呢?這個就需要根據具體的業務具體進行分析了,看業務的重要性以及異常報警的等級劃分處理及時度。但是一般來說3次重試就基本可以滿足大多數的業務需求了,當然這個也需要結合重試間隔進行一起考量。為什麼說3次就基本可以說足夠了呢,因為如果系統處在長時間不可用的狀态下,我們重試多少次都是沒有實際意義的,反而徒增系統的壓力。

2.每次重試的間隔設定多少合适

如果重試間隔設定得太小,可能會造成這樣的情況:被調用的系統還沒來得及恢複我們就已經發起了調用,那麼得到的結果肯定還是失敗,這樣相當于快速調用并失敗了N次,沒有實際意義;

如果重試間隔設定得太大,可能會造成這樣的情況:犧牲掉了不少資料的時效性;

是以,重試間隔的設定要根據被調用系統的平均恢複時間來去正确的估量,通常來說這個平均恢複時間如果沒有完備的大資料分析系統是很難統計到,是以一般這個值就需要根據經驗(一般的經驗值3-5min)來進行設定并根據實際情況去不斷地修正。

3.重試機會用完之後還是失敗應該怎麼辦

當設定的重試次數全部用完之後系統仍然傳回失敗,此時此處業務相當于中斷,這時候就需要采用一定的補償措施,以保證系統流程的正常進行,保證資料的準确性時效性。

  • 增加錯誤報警機制:将重試之後依然無法成功地接口詳細資訊記錄到報警日志并進行郵件、電話、飛書等方式的提醒,這樣我們可以及時的感覺到應用發生了不可自恢複的調用異常,采取人工幹預的方式進行修複;
  • 增加手動重試的入口:增加手動重試的開關按鈕,這樣在異常發生時可以友善進行手動重試;在手動進行重試時,需要考慮接口的幂等性,否則容易造成資料的混亂。

四.重試機制的解決思路

1.Spring-Retry

spring-retry是spring自身提供的一種重試機制,可以幫助我們以标準的方式處理特定操作的重試。在spring-retry中,所有配置都是基于簡單注釋的。

  • spring-retry怎麼使用具體過程不做過多贅述

/**

* value:抛出指定異常才會重試

* include:和value一樣,預設為空,當exclude也為空時,預設所有異常

* exclude:指定不處理得異常

* maxAttempts:最大重試次數,預設3次

* backoff:重試等待政策,

* 預設使用@Backoff,@Backoff的value預設為1000L,我們設定為2000; 以毫秒為機關的延遲(預設 1000)

* multiplier(指定延遲倍數)預設為0,表示固定暫停1秒後進行重試,如果把multiplier設定為1.5,則第一次重試為2秒,第二次為3秒,第三次為4.5秒。

* Spring-Retry還提供了@Recover注解,用于@Retryable重試失敗後處理方法。

* 如果不需要回調方法,可以直接不寫回調方法,那麼實作的效果是,重試次數完了後,如果還是沒成功沒符合業務判斷,就抛出異常。

* 可以看到傳參裡面寫的是 Exception e,這個是作為回調的接頭暗号(重試次數用完了,還是失敗,我們抛出這個Exception e通知觸發這個回調方法)。

* 注意事項:

* 方法的傳回值必須與@Retryable方法一緻

* 方法的第一個參數,必須是Throwable類型的,建議是與@Retryable配置的異常一緻,其他的參數,需要哪個參數,寫進去就可以了(@Recover方法中有的)

* 該回調方法與重試方法寫在同一個實作類裡面

*

* 由于是基于AOP實作,是以不支援類裡自調用方法

* 如果重試失敗需要給@Recover注解的方法做後續處理,那這個重試的方法不能有傳回值,隻能是void

* 方法内不能使用try catch,隻能往外抛異常

* @Recover注解來開啟重試失敗後調用的方法(注意,需跟重處理方法在同一個類中),此注解注釋的方法參數一定要是@Retryable抛出的異常,否則無法識别,可以在該方法中進行日志處理。

*/

  • spring-retry的缺陷

spring-retry工具雖然是基于注解的并且很優雅實作重試,但是存在幾個不友好的設計:

  • spring-retry重試的實體限制為Throwable子類,說明重試針對的是可捕捉的功能異常為設計前提的,但是我們希望依賴某個資料對象實體作為重試實體,但spring-retry架構必須輕質轉換為Throwable子類。
  • spring-retry重試的斷言對象使用的是doWithRetry的Exception異常執行個體,不符合正常内部斷言的傳回設計
  • spring-retry提倡以注解的方式對方法進行重試,重試的邏輯是同步的,當抛出相關的異常之後進行重試邏輯。如果你要以傳回值的某個狀态來判定是否需要進行重試,可能隻能通過自己判斷傳回值然後顯示抛出異常來實作了。
  • spring-retry重試機制中:隻讀操作可以進行重試,幂等寫操作可以進行重試,但是非幂等寫操作不能進行重試,這種情況下的重試可能會導緻寫入髒資料,或者産生重複資料
  • spring-retry中@Recover注解在使用時無法指定方法,如果說一個類中有多個重試方法,處理起來就會很麻煩,可能需要特定的辨別符來進行區分
  • spring-retry注意事項
  • 使用了@Retryable的方法不能在本類被調用,不然重試機制不會生效。也就是說要标記為@Service,然後在其他類中使用@Autowired注入或者@Bean去執行個體才能生效
  • 要觸發@Recover方法,那麼在@Retryable方法上不能有傳回值,隻能是void才能生效
  • 使用了@Retryable的方法裡面不能使用try...catch包裹,要在方法上抛出異常,不然不會觸發
  • 在重試期間這個方法是同步的,如果使用類似Spring Cloud這種架構的熔斷機制時,可以結合重試機制來重試後傳回結果
  • spring-retry不隻能注入方式去實作,還可以通過API的方式實作,類似熔斷處理的機制就基于API方式實作會比較寬松

2.Guava-Retrying

  • 關于guava-retrying
  • guava-retrying是Google Guava庫的一個擴充包,包含了多種重試政策,而且擴充起來非常的容易。
  • 使用guava-retrying你可以自定義來執行重試,同時也可以監控每次重試的結果和行為。
  • guava-retry工具與spring retry類似,都是通過定義重試者角色來包裝正常邏輯重試,但是Guava retry有更優的政策定義,在支援重試次數和重試頻度控制基礎上,能夠相容支援多個異常或者自定義實體對象的重試源定義,讓重試功能有更多的靈活性。相比于spring-retry:

--可以設定任何任務單次執行的時間限制,如果逾時則抛出異常;

--可以設定重試監聽器,用來執行額外的處理工作

--可以設定任務阻塞政策,即可設定目前重試完成,下次重試開始前的這段時間做什麼事情

--可以通過停止重試政策和等待政策結合使用來設定更加靈活的政策,比如指數等待時長并最多10次調用,随機等待時長并且永不停止等待

  • Guava Retryer也是線程安全的,入口調用邏輯采用的是 java.util.concurrent.Callable 的 call() 方法
  • 關鍵點介紹
  • RetryerBuilder是一個factory建立者,可以定制設定重試源且可以支援多個重試源,可以配置重試次數或重試逾時時間,以及可以配置等待時間間隔,建立重試者Retryer執行個體。
  • RetryerBuilder的重試源支援Exception異常對象 和自定義斷言對象,通過retryIfException 和retryIfResult設定,同時支援多個且能相容。
  • retryIfException,抛出runtime異常、checked異常時都會重試,但是抛出error不會重試。
  • retryIfRuntimeException隻會在抛runtime異常的時候才重試,checked異常和error都不重試。
  • retryIfExceptionOfType允許我們隻在發生特定異常的時候才重試,比如NullPointerException和IllegalStateException`都屬于runtime異常,也包括自定義的error

3.Spring-Aop

自己造輪子:使用AOP來為目标設定切面,即可在目标調用的前後添加一些額外的邏輯。

  • 首先建立一個注解,注解裡一般有2個參數:retryCount:最大重試次數;retryInterval:重試間隔;當然也可以設定更多的參數,比如指定重試的異常、重試延遲倍數等。
  • 其實spring-retry、guava-retrying也是基于切面的方式來實作的
  • 在需要進行重試的方法上加上注解
  • 編寫AOP切面

4.利用MQ進行消息重試

以kafka為例:

  1. kafka消費者采用手動異步ack确認機制,如果消費失敗,消息會重新進行消費,可以設定重試的次數,這樣就可以對要消費的消息進行簡單的重試。
  2. 流行的解決方案:設定重試主題(retry-topics)

基本的過程如下:

  • 消費者嘗試消費業務主題中(business-topic)的一條消息
  • 如果未能正确的消費該消息,則消費者将消息釋出到第一個重試主題中(retry-topic-1),然後送出消息的偏移量,以便繼續處理下一條消息。
  • 訂閱重試主題(retry-topic-1)的消費者,他的處理邏輯與業務主題的消費者邏輯相同。該消費者在進行消費嘗試的時候引入了短暫的延遲。如果這個消費者也無法消費該消息,則會将消息釋出到另一個重試主題中(retry-topic-2),并送出該消息的偏移量。
  • 這一過程不斷的進行,增加了一些重試主題(retry-topic-n)和重試消費者,同時每個重試的延遲會越來越長(用作退避政策)。最終在消費者無法處理某條消息後,會将該消息釋出到一個死信隊列(dead letter queue,DLQ)中,運維團隊将在該隊列中消息進行手動分類處理。

重試主題(retry-topics)帶來的問題以及思考

  • 重試主題看起來似乎是非常的合理,但他存在一些弊端,在實際場景中有些場景不大适用,需要進行進一步的改造。
  • 首先他忽略了不同類型的錯誤,我們暫且可分為可自動恢複的錯誤、不可自動恢複的錯誤兩種類型。可恢複性的錯誤也就是說不管重試了多少次,最終這些錯誤是可以得到解決的。舉個例子來說:一個入庫的操作,如果說消息A是因為資料庫重新開機或者當機而造成消費失敗,那麼随之而來的消息B、消息C...都将失敗。那麼這種可恢複性的錯誤直接交給消費者自身來進行重試,直至問題解決,不停的加入到重試主題中,反而會增加不必要的開銷。
  • 不可恢複的錯誤指的是無論我們重試了多少次都将失敗的錯誤。這時候加入到重試主題中可以幫助受到困擾的消息消費者繼續前進。而錯誤的消息會在重試主題中進行重試直至進入死信隊列,或者此處已經确定了是不可恢複的錯誤,直接将該消息發送到死信隊列。、
  • 另外使用這種方案的時候還需要考慮有些消息需要順序消費的問題如何解決,或者是否能接受一些資料不一緻性。

5.其他方案

  • 手動重試:在catch代碼塊中進行重試邏輯代碼。需要對代碼進行大量的侵入式修改,不優雅,可維護性差,且大部分時候還是失敗,意義不大,不建議。
  • 代理模式:不修改業務邏輯代碼,直接在業務代碼外邊再包一層重試邏輯。這樣重試邏輯全部由代理類來完成,以後想修改重試邏輯可以直接修改代理類,分工明确。但代理模式顯然過于麻煩,而且重試的邏輯一般都是大同小異,無非就是重試的次數和間隔時間不一樣而已,如果每個需要進行重試的業務類都包裝一層代理類,顯然代碼備援量太大,不夠優雅。
  • JDK動态代理:上面說代理類的方式代碼備援量太大,不夠優雅,那麼自然而然就想到了使用JDK動态代理來實作。動态代理将重試的邏輯放在一塊,顯然比代理類的方式更加友善優雅。

但是:如果被代理的類沒有其他的依賴類,直接建立不成問題;如果被代理的類依賴了其他被spring容器 管理的類,則這種方式就會抛出異常,因為沒有把被代理的執行個體注入到建立的代理執行個體中。這種情況 下,就比較複雜了,需要從Spring容器中擷取已經裝配好的,需要被代理的執行個體,然後為其建立代 理類執行個體,并交給Spring容器來管理,這樣就不用每次都重新建立新的代理類執行個體了。

同時還要考慮容器中的bean類型是Singleton還是Prototype,如果是Singleton則像上面這樣進行 操作,如果是Prototype則每次都建立代理類對象。

另外,這裡使用的是JDK動态代理,是以就存在一個天然的缺陷,如果想要被代理的類,沒有實作 任何接口,那麼就無法為其建立代理對象,這種方式就行不通了。

  • CGLib動态代理:解決了JDK動态代理帶來的缺陷,但是仍然存在問題:對原來的邏輯進行了侵入式的修改,在每個·被代理執行個體被調用的地方都需要進行調整。
  • 總的來說,這幾種方式都不是比較完善的方式,不建議使用。已經有了比較好的輪子,也不建議自己再造輪子,自己造的輪子也不見得完善。

6.一個消息重試的解決方案

  1. 案例背景

服務端Svr、資料采集中間件Svr、車端Svr進行指令的互動

  • 服務端svr發送指令消息至kafka(business-topic)
  • 資料采集中間件svr消費kafka消息(business-topic),然後發送給車端硬體svr
  • 車端svr消費後會将結果是成功失敗傳回給資料采集中間件服務svr
  • 資料采集中間件再将結果發送至kafka(result-business-topic)
  • 服務端svr消費kafka消息(result-business-topic),得到本次指令發送的最終結果

2.重試需求

  • 對于kafka消費者端的消息重試采用kafka自身的重試機制,或者是采用上文介紹的重試主題的方式
  • 本需求需要讨論解決的是:在向kafka發送消息以及消費消息都成功的情況下,但是由車端傳回的結果為失敗時,這種情況下同樣也需要進行重試,即根據傳回的狀态值進行業務端的重試。

3.解決方案

采用redis進行指令、指令索引、遠端啟動指令、指令重試次數的記錄;采用naocs配置指令重試的最大次數; 采用xxl-job進行設定指令重試的間隔時間;

  • 服務端下發消息時,進行緩存記錄,采用redis的String資料結構:

指令緩存(String類型):key:dispatch:vehicle:retry:instruction:instructionId(指令id) value:json字元串(此次下發的指令json)

指令的重試次數(String類型):key:dispatch:vehicle:retry:count:instructionId value:0(預設為0次)

【遠端啟動】指令緩存(Set類型):key:dispatch:vehicle:retry:remotestart value:instructionId

注:此兩處緩存在消息傳回成功後或者是重試N次之後進行删除;緩存的預設次數采用nacos進行配置,重試的執行間隔時間采用xxl-job進行設定,可以是每分鐘一次或者是間隔的倍數等形式。

  • 消息執行傳回結果為失敗時,記錄緩存的指令索引,采用redis的Set資料結構:

消息指令索引集合(Set類型):key:dispatch:vehicle:retry:failmsgindex value:instructionId

  • 使用xxl-job進行定時任務的調用。

擷取緩存指令索引的集合,判斷該set集合是否為空,如果為空,則【沒有需要進行重試的指令】;如果set集合不為空,則周遊集合中的每一個值。

由于是分布式系統,此處需要采用分布式鎖,我們在此采用redission架構來進行加鎖。至此還需對具體的業務進行判斷:有一個特殊的指令為【遠端啟動】指令,由于此指令在下發還未建立任務,是以無法與任務進行關聯,是以如果是這個指令就直接進行重試,如果不是則需要對任務的狀态進行判斷,我們隻對正在執行的任務進行重試,已經完成、暫停的、廢棄的任務不進行指令的重試

  • 根據需要進行重試的指令擷取已經進行重試的次數,第一次肯定是0次
  • 判斷重試的次數是否大于nacos中設定的重試次數N(比如n=3)
  • 如果已經大于等于3(實際最終隻可能等于設定的重試次數),則不再進行重試,同時删除在redis中做的那幾個相關的緩存
  • 如果小于3:首先根據指令索引擷取到需要進行重試的消息,将該消息發送至kafka,同時重試次數緩存+1,并将此處記錄寫入到記錄檔中【正在進行第N次重試,狀态為執行中】。
// 核心代碼:
 	@XxlJob("doRetryHandle")
    public void doRetryHandle() {
        log.info("...[消息重試]正在進行消息重試...");
        // 擷取執行結果為失敗的消息索引
        Set<String> msgSet = commonHandle.getRedisFailMsgIndex();
        log.info("...[消息重試]需要進行消息重試的條數為:{},消息索引為:{}", msgSet.size(), msgSet);
        if (EmptyUtil.isEmpty(msgSet)) {
            log.info("[沒有需要進行重試的指令]:doRetryHandle:[{}]", JSON.toJSONString(msgSet));
        } else {
            msgSet.forEach(instructionId -> {
                RLock lock = redissonClient.getLock(DispatchRedisConstants.DISPATCH_MSG_RETRY_LOCK + instructionId);
                log.info("...[消息重試]擷取到鎖{}", DispatchRedisConstants.DISPATCH_MSG_RETRY_LOCK + instructionId);
                try {
                    lock.lock();
                    //ADD:前置條件,隻對執行中的任務進行重試;若下發指令時,驗證任務為【廢棄/完成/暫停=非執行中】狀态,則删除該指令重試的緩存,同時更新該指令狀态為失敗
                    // todo:問題:1.遠端啟動指令時,尚未進行建立任務關聯 2.需要做指令跟任務關聯的緩存(同時注意删除時機)
                    // 判斷指令是否為遠端啟動的指令,如果是直接進行重試,如果不是則進行判斷任務的狀态
                    if (commonHandle.getRedisRetryRemoteStart(instructionId)) {
                        doRetryExcute(instructionId);
                    } else {
                        String dispatchId = commonHandle.getRedisInstrctionIdDispatchId(instructionId);
                        log.info("...[消息重試]根據指令id:{}擷取到該指令所屬的任務id:{}", instructionId, dispatchId);
                        if (EmptyUtil.isNotEmpty(dispatchId)) {
                            // 擷取任務的目前狀态
                            DispatchTask dispatchTask = dispatchTaskRepository.getById(dispatchId);
                            log.info("...[消息重試]消息為:{},任務狀态為{}", JSONObject.toJSONString(dispatchTask), dispatchTask.getTaskState());
                            if (EmptyUtil.isNotEmpty(dispatchTask)) {
                                //隻針對執行狀态的任務進行重試
                                if (dispatchTask.getTaskState().equals(EnumTaskState.EXECUTING.getCode())) {
                                    log.info("...[消息重試]正在進行消息重試,任務狀态為:{}", EnumTaskState.EXECUTING.getDesc());
                                    //執行重試
                                    doRetryExcute(instructionId);
                                } else {
                                    // 非執行狀态:則删除該指令重試的緩存,同時更新該指令狀态為失敗
                                    log.info("...[消息重試]任務狀态為:{},不是執行中的任務不進行重試", dispatchTask.getTaskState());
                                    commonHandle.doRetryFailExcute(instructionId, OperateExecuteResultEnum.FAIL.getCode());
                                }
                            } else {
                                log.info("...[消息重試]查詢不到任務,不進行重試", dispatchTask.getTaskState());
                                // 查詢不到任務
                                commonHandle.doRetryFailExcute(instructionId, OperateExecuteResultEnum.FAIL.getCode());
                            }
                        }
                    }
                } catch (Exception e) {
                    log.error("doRetryHandle消息重試異常:{}", e.getMessage());
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            });
        }
    }

    /**
     * 執行重試邏輯
     */
    private void doRetryExcute(String instructionId) {
        // 根據指令索引擷取指令已經進行的重試次數
        String stringCount = commonHandle.getRedisRetryCount(instructionId);
        if (EmptyUtil.isNotEmpty(stringCount)) {
            Integer retryCount = Integer.valueOf(stringCount);
            if (retryCount >= RETRY_COUNT) {
                log.info("...[消息重試]doRetryExcute:重試次數為{},不再進行重試", retryCount);
                // 删除緩存:指令索引緩存、指令緩存、指令重試次數緩存
                commonHandle.deleteRetryCaches(instructionId);
                return;
            } else {
                // 根據指令索引擷取需要進行重試的消息
                String messageJson = commonHandle.getRedisInstrction(instructionId);
                if (EmptyUtil.isNotEmpty(messageJson)) {
                    log.info("...[消息重試]doRetryExcute:需要進行重試的消息為{}", messageJson);
                    // 發送重試消息
                    kafkaSendMessage.retrySendMessage(messageJson);
                    // 重試次數+1
                    commonHandle.addRedisRetryCount(instructionId, String.valueOf(retryCount + 1));
                    // 寫日志檔案(暫時:正在進行第N次重試,**秒後進行第N+1次重試)
                    IntructionLogUpdateResultRequestDTO intructionLogUpdateResultRequestDTO = new IntructionLogUpdateResultRequestDTO()
                            .setOperateId(Long.parseLong(instructionId))
                            .setExecuteResult(OperateExecuteResultEnum.RERTY.getCode())
                            .setLogContext("進行第" + (retryCount + 1) + "次重試");
                    dispatchTaskOperateLogRepository.ModifyResultByInstructionId(intructionLogUpdateResultRequestDTO);
                }
            }
        }
    }           

總結:沒有更好的方案,隻有最合适的方案,一定要根據自己的業務進行選擇并不斷的進行優化。

歡迎關注公衆号:

JAVA重試機制