天天看點

Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考

Github

本文主要講解RabbitMQ的介紹和安裝,Spring Cloud Stream核心概念,Spring Cloud Alibaba RocketMQ學習,異步消息推送與消費

1 稽核業務的實作
Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考

Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考
  • com/javaedge/contentcenter/service/content/ShareService.java

    Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考

假設添加積分操作很耗時,我們的主要操作是稽核,而不關心積分,是以可以将其異步化

1.1 Spring實作異步的方法

◆ AsyncRestTemplate

  • 參考文檔

    Spring 的異步HTTP請求AsyncRestTemplate

◆ @ Async注解

◆ WebClient ( Spring 5.0引入 ,為取代AsyncRestTemplate)

◆ MQ

我們采用此法

2 引入MQ後的架構演進

Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考

3 MQ适用場景

  • 異步處理
  • 流量削峰填谷
  • 解耦微服務

4 MQ的選擇

流行的MQ那麼多,如何選擇?

  • Kafka、RabbitMQ、 RocketMQ、 ActiveMQ...
Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考
Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考

5 搭建RocketMQ

6 搭建RocketMQ控制台

Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考
  • 修改pom.xml版本
    Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考
  • 修改代碼
    Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考
Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考

7 Spring消息程式設計模型

  • 推薦Maven依賴版本分析插件
    Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考

7.1 編寫生産者

content-center

開始拿出三闆斧:

  • 引入依賴
    Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考
  • 添加注解

  • 寫配置
    Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考
  • 服務類添加模闆類
    Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考
Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考

7.2 編寫消費者

user-center

  • 依賴
    Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考
  • 配置
    Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考
  • com.javaedge.contentcenter.rocketmq.AddBonusTransactionListener

    Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考

小結

  • RocketMQ : RocketMQMessageListener
  • ActiveMQ/Artemis : JmsListener
  • RabbitMQ : RabbitListener
  • Kafka : KafkaListener

8 分布式事務

流程剖析、概念術語、

如何實作事務呢,我們知道Spring有事務注解,那麼直接就添加@Transaction注解吧!

Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考

可這樣是萬無一失了嗎?顯然不行,因為消息已經發出,沒法撤回了

那麼看看RocketMQ是怎麼解決分布式事務問題呢

8.1 實作分布式事務流程

Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考

業務流程圖

Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考
  1. 半消息,雖然被存儲到MQserver,但會被标記為暫時不能投遞,是以消費者不會接受到該消息
  2. 半消息發送成功,開始3
  3. 開始執行本地事務
  4. 生産者根據本地事務,發送二次确認請求

    MQServer如果從4中接收到的是

    • commit,就把消息置為可投遞,這樣消費者就可消費該消息了
    • rollback:将該消息删除
  5. MQServer未收到4中的二次确認消息,就會回查
  6. 生産者檢查本地事務的執行結果
  7. 根據本地事務執行結果,發送commit/rollback消息

總體來說,就是生産者把消息發送到MQ,但MQ隻是将其标記,不讓消費者消費

然後生産者就執行本地事務,執行完後就知道到底是該投遞還是丢棄該消息了!

這其實就是典型的二次确認

消費回查就是防止二次确認消息發送異常的容錯處理

8.2 關鍵概念

◆ 半消息( Half(Prepare) Message )

暫時無法消費的消息。生産者将消息發送到了MQ server ,但這個消息會被标記為"暫不能投遞"狀态,先存儲起來;消費者不會去消費這條消息。

并不是消息的狀态,隻是一種特殊的消息而已

◆ 消息回查(Message Status Check )

網絡斷開或生産者重新開機可能導緻丢失事務消息的第二次确認。當MQ Server發現消息長時間處于半消息狀态時,将向消息生産者發送請求,詢問該消息的最終狀态(送出或復原)。

8.3 事務消息三狀态

Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考

◆ Commit

送出事務消息,消費者可以消費此消息

◆ Rollback

復原事務消息, broker會删除該消息,消費者不能消費.

◆UNKNOWN

broker需要回查确認該消息的狀态

9 分布式事務 - 編碼實作

  • 在内容中心新增事務日志表

    rocketmq_transaction_log

    Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考

對照上一小節流程圖,開始code!

9.1/2 發半消息

Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考

改造接口

  • 将原先如下代碼删除
    Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考
  • 改為如下接方法
    Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考

9.3 執行本地事務

  • 建立

    rocketmq

    包,并在其中建立一個新類

    AddBonusTransactionListener

    Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考
  • 一定要在該類上加

    @RocketMQTransactionListener

    注解

    其中的txProducerGroup一定要對應哦

Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考
Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考

注意這裡的

Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考
  • msg參數即對應
    Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考
  • args參數即對應
    Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考
  • 回查本地事務執行結果,即通過查詢日志記錄表,該表在執行完本地事務後更新
    Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考
  • 這即可回查啦
    Spring Cloud Alibaba 實戰(八) - 稽核業務的分布式事務處理實作1 稽核業務的實作 2 引入MQ後的架構演進3 MQ适用場景4 MQ的選擇5 搭建RocketMQ6 搭建RocketMQ控制台7 Spring消息程式設計模型8 分布式事務9 分布式事務 - 編碼實作參考

參考