天天看點

用了這麼久的RabbitMQ異步程式設計竟然都是錯的!(上)1 适用場景2 異步處理之坑

優秀的項目都由同步、異步和定時任務三種處理模式相輔相成。其中當屬異步程式設計充滿坑點。

1 适用場景

1.1 服務于主流程的分支流程

在注冊流程中,資料寫DB是主流程,但注冊後給使用者發優惠券或歡迎短信的操作是分支流程,時效性不強,可異步處理。

1.2 使用者無需實時看到結果的流程

比如,下單後的配貨、送貨流程完全可以進行異步處理,每個階段處理完成後,再給使用者發推送或短信讓使用者知曉即可。

1.3 MQ

任務的緩沖的分發,流量削峰、服務解耦和消息廣播。

當然了異步處理不僅僅是通過 MQ 來實作,還有其他方式

  • 比如開新線程執行,傳回 Future
  • 還有各種異步架構,比如 Vertx,它是通過 callback 的方式實作

2 異步處理之坑

異步處理流程的可靠性問題、消息發送模式的區分問題、大量死信消息堵塞隊列的問題,為友善操作,本文MQ選型RabbitMQ。

2.1 異步處理需要消息補償閉環

RabbitMQ雖可将消息落地磁盤,即使MQ異常消息資料也不會丢失,但異步流程在消息發送、傳輸、處理等環節,都可能發生消息丢失。MQ都無法確定百分百可用,業務設計都需考慮不可用時異步流程将如何繼續。

是以,對于異步處理流程,必須考慮補償或建立主備雙活流程。

2.1.1 案例

使用者注冊後異步發送歡迎消息。

  • 使用者注冊落DB為同步流程
  • 會員服務收到消息後發送歡迎消息為異步流程
  • 用了這麼久的RabbitMQ異步程式設計竟然都是錯的!(上)1 适用場景2 異步處理之坑
  • 藍線

    MQ異步處理(主線),消息可能丢失(虛線代表異步調用)

  • 綠線

    補償Job定期消息補償(備線),以補償主線丢失的消息

  • 考慮到極端的MQ中間件失效的情況

    要求備線的處理吞吐能力達到主線性能

代碼示例

  • UserController 注冊+發送異步消息。注冊方法,一次性注冊10個使用者,使用者注冊消息不能發送出去的機率為50%。
  • 用了這麼久的RabbitMQ異步程式設計竟然都是錯的!(上)1 适用場景2 異步處理之坑
  • MemberService 會員服務監聽使用者注冊成功的消息,并發送歡迎短信。使用ConcurrentHashMap存放那些發過短信的使用者ID實作幂等,避免相同的使用者補償時重複發短信
  • 用了這麼久的RabbitMQ異步程式設計竟然都是錯的!(上)1 适用場景2 異步處理之坑
  • 對于MQ消費程式,處理邏輯務必考慮去重(支援幂等)因為:
  • MQ消息可能會因為中間件本身配置錯誤、穩定性等原因出現重複
  • 自動補償重複,比如本例,同一條消息可能既走MQ也走補償,肯定會出現重複,而且考慮到高内聚,補償Job本身不會做去重處理
  • 人工補償重複。出現消息堆積時,異步處理流程必然會延遲。如果我們提供了通過背景進行補償的功能,那麼在處理遇到延遲的時候,很可能會先進行人工補償,過了一段時間後處理程式又收到消息了,重複處理。我之前就遇到過一次由MQ故障引發的事故,MQ中堆積了幾十萬條發放資金的消息,導緻業務無法及時處理,營運以為程式出錯了就先通過背景進行了人工處理,結果MQ系統恢複後消息又被重複處理了一次,造成大量資金重複發放。

小結

異步處理的時候需要考慮消息重複的可能性,處理邏輯需要實作幂等,防止重複處理。

接着定義補償Job即備線操作。

在CompensationJob中定義一個@Scheduled定時任務,5秒做一次補償操作,因為Job并不知道哪些使用者注冊的消息可能丢失,是以是全量補償,補償邏輯:每5秒補償一次,按順序一次補償5個使用者,下一次補償操作從上一次補償的最後一個使用者ID開始;對于補償任務我們送出到線程池進行“異步”處理,提高處理能力。

用了這麼久的RabbitMQ異步程式設計竟然都是錯的!(上)1 适用場景2 異步處理之坑

為實作高内聚,主線和備線處理消息,最好使用同一方法。本案例的MemberService監聽到MQ消息和CompensationJob補償,調用的都是welcome。z這裡的補償邏輯簡單,實際生産代碼應該做到:

考慮配置補償的頻次、每次處理數量,以及補償線程池大小等參數為合适的值,以滿足補償的吞吐量

考慮備線補償資料進行适當延遲

比如,對注冊時間在30秒之前的使用者再進行補償,以友善和主線MQ實時流程錯開,避免沖突。

諸如目前補償到哪個使用者的offset資料,需要落地資料庫。

補償Job本身需要高可用,可以使用類似XXLJob或ElasticJob等任務系統。

運作程式,執行注冊方法注冊10個使用者,輸出如下:

[17:01:16.570] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28  ] - sent mq user 1
[17:01:16.571] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28  ] - sent mq user 5
[17:01:16.572] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28  ] - sent mq user 7
[17:01:16.573] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28  ] - sent mq user 8
[17:01:16.594] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18  ] - receive mq user 1
[17:01:18.597] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 1
[17:01:18.601] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18  ] - receive mq user 5
[17:01:20.603] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 5
[17:01:20.604] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18  ] - receive mq user 7
[17:01:22.605] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 7
[17:01:22.606] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18  ] - receive mq user 8
[17:01:24.611] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 8
[17:01:25.498] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29  ] - 開始從使用者ID 0 補償
[17:01:27.510] [compensation-threadpool-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 2
[17:01:27.510] [compensation-threadpool-3] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 4
[17:01:27.511] [compensation-threadpool-2] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 3
[17:01:30.496] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29  ] - 開始從使用者ID 5 補償
[17:01:32.500] [compensation-threadpool-6] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 6
[17:01:32.500] [compensation-threadpool-9] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 9
[17:01:35.496] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29  ] - 開始從使用者ID 9 補償
[17:01:37.501] [compensation-threadpool-0] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 10
[17:01:40.495] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29  ] - 開始從使用者ID 10 補償      

可見

  • 共10個使用者,MQ發送成功的使用者有四個:1、5、7、8
  • 補償任務第一次運作,補償了使用者2、3、4,第二次運作補償了使用者6、9,第三次運作補充了使用者10
  • 針對消息的補償閉環處理的最高标準是,能夠達到補償全量資料的吞吐量。即若補償備線足夠完善,即使直接把MQ停機,雖然會略微影響處理的及時性,但至少確定流程都能正常執行。

實際開發要考慮異步流程丢消息或進行中斷場景。

異步流程需有備線以補償,比如這裡的全量補償方式,即便異步流程徹底失效,通過補償也能讓業務繼續進行。