天天看點

【第十九篇】商城系統-分布式事務解決方案

【第十九篇】商城系統-分布式事務解決方案

分布式事務

一、為什麼需要使用分布式事務

  我們在分布式環境下一個業務可能會涉及到多個子產品之間的調用,為了保證操作的原子性,分布式事務是最好的解決方案。

【第十九篇】商城系統-分布式事務解決方案

二、本地事務

  在系統介紹分布式事務之前,我們還是很有必要回顧下本地事務。在一個服務中生效的事務我們稱為本地事務。

1.事務的特性

  事務的概念:事務是邏輯上一組操作,組成這組操作各個邏輯單元,要麼一起成功,要麼一起失敗。

事務的四個特性(ACID):

  1. 原子性(atomicity):“原子”的本意是“不可再分”,事務的原子性表現為一個事務中涉及到的多個操作在邏輯上缺一不可。事務的原子性要求事務中的所有操作要麼都執行,要麼都不執行。
  2. 一緻性(consistency):​

    ​一緻​

    ​指的是資料的一緻,具體是指:所有資料都處于滿足業務規則的一緻性狀态。一緻性原則要求:一個事務中不管涉及到多少個操作,都必須保證事務執行之前資料是正确的,事務執行之後資料仍然是正确的。如果一個事務在執行的過程中,其中某一個或某幾個操作失敗了,則必須将其他所有操作撤銷,将資料恢複到事務執行之前的狀态,這就是復原。
  3. 隔離性(isolation):在應用程式實際運作過程中,事務往往是并發執行的,是以很有可能有許多事務同時處理相同的資料,是以每個事務都應該與其他事務隔離開來,防止資料損壞。隔離性原則要求多個事務在并發執行過程中不會互相幹擾。
  4. 持久性(durability):持久性原則要求事務執行完成後,對資料的修改永久的儲存下來,不會因各種系統錯誤或其他意外情況而受到影響。通常情況下,事務對資料的修改應該被寫入到持久化存儲器中。

2.事務的隔離級别

事務并發引起一些讀的問題:

  • 髒讀 一個事務可以讀取另一個事務未送出的資料
  • 不可重複讀 一個事務可以讀取另一個事務已送出的資料 單條記錄前後不比對
  • 虛讀(幻讀) 一個事務可以讀取另一個事務已送出的資料 讀取的資料前後多了點或者少了點

并發寫:使用mysql預設的鎖機制(獨占鎖)

解決讀問題:設定事務隔離級别

  • read uncommitted(0)
  • read committed(2)
  • repeatable read(4)
  • Serializable(8)

隔離級别越高,性能越低。

【第十九篇】商城系統-分布式事務解決方案

一般情況下:髒讀是不可允許的,不可重複讀和幻讀是可以被适當允許的。

3.事務的傳播屬性

Spring中的7個事務傳播行為:

事務行為 說明
PROPAGATION_REQUIRED 支援目前事務,假設目前沒有事務。就建立一個事務
PROPAGATION_SUPPORTS 支援目前事務,假設目前沒有事務,就以非事務方式運作
PROPAGATION_MANDATORY 支援目前事務,假設目前沒有事務,就抛出異常
PROPAGATION_REQUIRES_NEW 建立事務,假設目前存在事務。把目前事務挂起
PROPAGATION_NOT_SUPPORTED 以非事務方式運作操作。假設目前存在事務,就把目前事務挂起
PROPAGATION_NEVER 以非事務方式運作,假設目前存在事務,則抛出異常
PROPAGATION_NESTED 如果目前存在事務,則在嵌套事務内執行。如果目前沒有事務,則執行與PROPAGATION_REQUIRED類似的操作。

舉例說明

ServiceA

ServiceA {   
     void methodA() {
         ServiceB.methodB();
     }
}      

ServiceB

ServiceB { 
     void methodB() {
     }  
}      
1.PROPAGATION_REQUIRED

  假如目前正要運作的事務不在另外一個事務裡,那麼就起一個新的事務 比方說,ServiceB.methodB的事務級别定義PROPAGATION_REQUIRED, 那麼因為執行ServiceA.methodA的時候,ServiceA.methodA已經起了事務。這時調用ServiceB.methodB,ServiceB.methodB看到自己已經執行在ServiceA.methodA的事務内部。就不再起新的事務。而假如ServiceA.methodA執行的時候發現自己沒有在事務中,他就會為自己配置設定一個事務。這樣,在ServiceA.methodA或者在ServiceB.methodB内的不論什麼地方出現異常。事務都會被復原。即使ServiceB.methodB的事務已經被送出,可是ServiceA.methodA在接下來fail要復原,ServiceB.methodB也要復原

【第十九篇】商城系統-分布式事務解決方案
2.PROPAGATION_SUPPORTS

  假設目前在事務中。即以事務的形式執行。假設目前不在一個事務中,那麼就以非事務的形式執行

3.PROPAGATION_MANDATORY

  必須在一個事務中執行。也就是說,他僅僅能被一個父事務調用。否則,他就要抛出異常

4.PROPAGATION_REQUIRES_NEW

  這個就比較繞口了。 比方我們設計ServiceA.methodA的事務級别為PROPAGATION_REQUIRED,ServiceB.methodB的事務級别為PROPAGATION_REQUIRES_NEW。那麼當運作到ServiceB.methodB的時候,ServiceA.methodA所在的事務就會挂起。ServiceB.methodB會起一個新的事務。等待ServiceB.methodB的事務完畢以後,他才繼續運作。

他與PROPAGATION_REQUIRED 的事務差别在于事務的復原程度了。由于ServiceB.methodB是新起一個事務,那麼就是存在兩個不同的事務。假設ServiceB.methodB已經送出,那麼ServiceA.methodA失敗復原。ServiceB.methodB是不會復原的。假設ServiceB.methodB失敗復原,假設他抛出的異常被ServiceA.methodA捕獲,ServiceA.methodA事務仍然可能送出。

【第十九篇】商城系統-分布式事務解決方案
5.PROPAGATION_NOT_SUPPORTED

  目前不支援事務。比方ServiceA.methodA的事務級别是PROPAGATION_REQUIRED 。而ServiceB.methodB的事務級别是PROPAGATION_NOT_SUPPORTED ,那麼當執行到ServiceB.methodB時。ServiceA.methodA的事務挂起。而他以非事務的狀态執行完,再繼續ServiceA.methodA的事務。

6.PROPAGATION_NEVER

  不能在事務中執行。

如果ServiceA.methodA的事務級别是PROPAGATION_REQUIRED。 而ServiceB.methodB的事務級别是PROPAGATION_NEVER ,那麼ServiceB.methodB就要抛出異常了。

7.PROPAGATION_NESTED

  如果目前存在事務,則在嵌套事務内執行。如果目前沒有事務,則執行與PROPAGATION_REQUIRED類似的操作。

@Transactional(propagation=Propagation.REQUIRED)

如果有事務, 那麼加入事務, 沒有的話建立一個(預設情況下)

@Transactional(propagation=Propagation.NOT_SUPPORTED)

容器不為這個方法開啟事務

@Transactional(propagation=Propagation.REQUIRES_NEW)

不管是否存在事務,都建立一個新的事務,原來的挂起,新的執行完畢,繼續執行老的事務

@Transactional(propagation=Propagation.MANDATORY)

必須在一個已有的事務中執行,否則抛出異常

@Transactional(propagation=Propagation.NEVER)

必須在一個沒有的事務中執行,否則抛出異常(與Propagation.MANDATORY相反)

@Transactional(propagation=Propagation.SUPPORTS)

如果其他bean調用這個方法,在其他bean中聲明事務,那就用事務.如果其他bean沒有聲明事務,那就不用事務.

4.SpringBoot事務代理對象

  在SpringBoot中如果一個對象中有多個事務方法互相調用,那麼事務傳播會失效,主要原因是目前對象直接調用了自身對象的方法,繞過了代理對象的處理,造成了事務傳播的失效。那麼對應的解決方案是 ​

​spring-boot-stater-aop​

​ 來顯示的擷取代理對象來調用

引入相關的依賴

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
            <version>2.4.12</version>
        </dependency>      

添加aspectj的注解

【第十九篇】商城系統-分布式事務解決方案

然後在需要調用的位置通過 ​

​AopContext​

​擷取目前的代理對象

/**
     * 在service中調用自身的其他事務方法的時候,事務的傳播行為會失效
     *   因為會繞過代理對象的處理
     *
     */
    @Transactional // 事務A
    public void a(){
        OrderServiceImpl o = (OrderServiceImpl) AopContext.currentProxy();
        o.b(); // 事務A
        o.c(); // 事務C
        int a = 10/0;
    }

    @Transactional(propagation = Propagation.REQUIRED)
    public void b(){

    }

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void c(){

    }      

三、分布式事務

1.分布式事務基礎

CAP定理

分布式存儲系統的CAP原理(分布式系統的三個名額):

  1. Consistency(一緻性):在分布式系統中的所有資料備份,在同一時刻是否同樣的值。

    對于資料分布在不同節點上的資料來說,如果在某個節點更新了資料,那麼在其他節點如果都能讀取到這個最新的資料,那麼就稱為強一緻,如果有某個節點沒有讀取到,那就是分布式不一緻。

  2. Availability(可用性):在叢集中一部分節點故障後,叢集整體是否還能響應用戶端的讀寫請求。(要求資料需要備份)
  3. Partition tolerance(分區容忍性):大多數分布式系統都分布在多個子網絡。每個子網絡就叫做一個區(partition)。分區容錯的意思是,區間通信可能失敗。

  CAP理論就是說在分布式存儲系統中,最多隻能實作上面的兩點。而由于目前的網絡硬體肯定會出現延遲丢包等問題,是以分區容忍性是我們無法避免的。是以我們隻能在一緻性和可用性之間進行權衡,沒有系統能同時保證這三點。要麼選擇CP、要麼選擇AP。

【第十九篇】商城系統-分布式事務解決方案

BASE定理

  BASE是對CAP中一緻性和可用性權衡的結果,其來源于對大規模網際網路系統分布式實踐的結論,是基于CAP定理逐漸演化而來的,其核心思想是即使無法做到強一緻性(Strong consistency),但每個應用都可以根據自身的業務特點,采用适當的方式來使系統達到最終一緻性(Eventual consistency)。接下來看看BASE中的三要素:

  1. Basically Available(基本可用)

    基本可用是指分布式系統在出現故障的時候,允許損失部分可用性,即保證核心可用。

    電商大促時,為了應對通路量激增,部分使用者可能會被引導到降級頁面,服務層也可能隻提供降級服務。這就是損失部分可用性的展現。

  2. Soft state(軟狀态)

    軟狀态是指允許系統存在中間狀态,而該中間狀态不會影響系統整體可用性。分布式存儲中一般一份資料至少會有三個副本,允許不同節點間副本同步的延時就是軟狀态的展現。mysql replication的異步複制也是一種展現。

  3. Eventually consistent(最終一緻性)

    最終一緻性是指系統中的所有資料副本經過一定時間後,最終能夠達到一緻的狀态。弱一緻性和強一緻性相反,最終一緻性是弱一緻性的一種特殊情況。

BASE模型是傳統ACID模型的反面,不同于ACID,BASE強調犧牲高一緻性,進而獲得可用性,資料允許在一段時間内的不一緻,隻要保證最終一緻就可以了。

2.分布式事務的解決方案

​​https://www.processon.com/view/link/62a1ddce0791293ad1a552c0​​

  分布式事務是企業內建中的一個技術難點,也是每一個分布式系統架構中都會涉及到的一個東西,特别是在微服務架構中,幾乎可以說是無法避免。

主流的解決方案如下:

  1. 基于XA協定的兩階段送出(2PC)
  2. 柔性事務-TCC事務
  3. 柔性事務-最終一緻性

2.1 兩階段送出(2PC)

  2PC即兩階段送出協定,是将整個事務流程分為兩個階段,準備階段(Prepare phase)、送出階段(commit phase),2是指兩個階段,P是指準備階段,C是指送出階段。

【第十九篇】商城系統-分布式事務解決方案

第一階段:事務協調器要求每個涉及到事務的資料庫預送出(precommit)此操作,并反映是否可以送出.

第二階段:事務協調器要求每個資料庫送出資料。

其中,如果有任何一個資料庫否決此次送出,那麼所有資料庫都會被要求復原它們在此事務中的那部分資訊。

目前主流資料庫均支援2PC【2 Phase Commit】

XA 是一個兩階段送出協定,又叫做 XA Transactions。

MySQL從5.5版本開始支援,SQL Server 2005 開始支援,Oracle 7 開始支援。

  總的來說,XA協定比較簡單,而且一旦商業資料庫實作了XA協定,使用分布式事務的成本也比較低。但是,XA也有緻命的缺點,那就是性能不理想,特别是在交易下單鍊路,往往并發量很高,XA無法滿足高并發場景。

  1. 兩階段送出涉及多次節點間的網絡通信,通信時間太長!
  2. 事務時間相對于變長了,鎖定的資源的時間也變長了,造成資源等待時間也增加好多。
  3. XA目前在商業資料庫支援的比較理想,在mysql資料庫中支援的不太理想,mysql的XA實作,沒有記錄prepare階段日志,主備切換會導緻主庫與備庫資料不一緻。許多nosql也沒有支援XA,這讓XA的應用場景變得非常狹隘。

2.2 TCC補償式事務

TCC 是一種程式設計式分布式事務解決方案。

TCC 其實就是采用的補償機制,其核心思想是:針對每個操作,都要注冊一個與其對應的确認和補償(撤銷)操作。TCC模式要求從服務提供三個接口:Try、Confirm、Cancel。

  • Try:主要是對業務系統做檢測及資源預留
  • Confirm:真正執行業務,不作任何業務檢查;隻使用Try階段預留的業務資源;Confirm操作滿足幂等性。
  • Cancel:釋放Try階段預留的業務資源;Cancel操作滿足幂等性。

整個TCC業務分成兩個階段完成:

【第十九篇】商城系統-分布式事務解決方案

第一階段:主業務服務分别調用所有從業務的try操作,并在活動管理器中登記所有從業務服務。當所有從業務服務的try操作都調用成功或者某個從業務服務的try操作失敗,進入第二階段。

第二階段:活動管理器根據第一階段的執行結果來執行confirm或cancel操作。如果第一階段所有try操作都成功,則活動管理器調用所有從業務活動的confirm操作。否則調用所有從業務服務的cancel操作。

舉個例子,假如 Bob 要向 Smith 轉賬100元,思路大概是:

我們有一個本地方法,裡面依次調用

  1. 首先在 Try 階段,要先檢查Bob的錢是否充足,并把這100元鎖住,Smith賬戶也當機起來。
  2. 在 Confirm 階段,執行遠端調用的轉賬的操作,轉賬成功進行解凍。
  3. 如果第2步執行成功,那麼轉賬成功,如果第二步執行失敗,則調用遠端當機接口對應的解凍方法 (Cancel)。

缺點:

  • Canfirm和Cancel的幂等性很難保證。
  • 這種方式缺點比較多,通常在複雜場景下是不推薦使用的,除非是非常簡單的場景,非常容易提供復原Cancel,而且依賴的服務也非常少的情況。
  • 這種實作方式會造成代碼量龐大,耦合性高。而且非常有局限性,因為有很多的業務是無法很簡單的實作復原的,如果串行的服務很多,復原的成本實在太高。

不少大公司裡,其實都是自己研發 TCC 分布式事務架構的,專門在公司内部使用。國内開源出去的:ByteTCC,TCC-transaction,Himly。

2.3 消息事務+最終一緻性

  基于消息中間件的兩階段送出往往用在高并發場景下,将一個分布式事務拆成一個消息事務(A系統的本地操作+發消息)+B系統的本地操作,其中B系統的操作由消息驅動,隻要消息事務成功,那麼A操作一定成功,消息也一定發出來了,這時候B會收到消息去執行本地操作,如果本地操作失敗,消息會重投,直到B操作成功,這樣就變相地實作了A與B的分布式事務。

【第十九篇】商城系統-分布式事務解決方案

雖然上面的方案能夠完成A和B的操作,但是A和B并不是嚴格一緻的,而是最終一緻的,我們在這裡犧牲了一緻性,換來了性能的大幅度提升。當然,這種玩法也是有風險的,如果B一直執行不成功,那麼一緻性會被破壞,具體要不要玩,還是得看業務能夠承擔多少風險。

适用于高并發最終一緻

低并發基本一緻:二階段送出

高并發強一緻:沒有解決方案

3.seata

分布式事務解決方案seata

官網:https://seata.io/zh-cn/docs/overview/what-is-seata.html

GitHub:https://github.com/seata/seata

3.1 seata服務的安裝

  我們可以先導入seata的依賴,根據依賴的版本來下載下傳對應的seata安裝檔案

【第十九篇】商城系統-分布式事務解決方案

​​https://github.com/seata/seata/releases​​

下載下傳後解壓縮然後進入conf檔案夾,然後通過registery.conf檔案可以更新配置中心和注冊中心的資訊。

【第十九篇】商城系統-分布式事務解決方案

然後進入 bin 目錄 通過 seata-server.bat檔案來啟動服務

【第十九篇】商城系統-分布式事務解決方案

然後進入nacos注冊中心可以看到對應的服務,表示OK

【第十九篇】商城系統-分布式事務解決方案

3.2 項目內建Seata

  接下來看看如何在商城項目中來內建Seata,首先是file.conf檔案

網絡傳輸配置:

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  #thread factory for netty
  thread-factory {
    boss-thread-prefix = "NettyBoss"
    worker-thread-prefix = "NettyServerNIOWorker"
    server-executor-thread-prefix = "NettyServerBizHandler"
    share-boss-worker = false
    client-selector-thread-prefix = "NettyClientSelector"
    client-selector-thread-size = 1
    client-worker-thread-prefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    boss-thread-size = 1
    #auto default pin or 8
    worker-thread-size = 8
  }
}      

事務日志存儲配置:該部配置設定置僅在seata-server中使用,如果選擇db請配合seata.sql使用

## transaction log store, only used in seata-server
store {
  ## store mode: file、db
  mode = "file"

  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
  }

  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
    datasource = "dbcp"
    ## mysql/oracle/h2/oceanbase etc.
    db-type = "mysql"
    driver-class-name = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://127.0.0.1:3306/seata"
    user = "mysql"
    password = "******"
  }
}      

*目前微服務在seata伺服器中注冊的資訊配置:

service {
  # 事務分組,預設:${spring.applicaiton.name}-fescar-service-group,可以随便寫
  vgroup_mapping.${spring.application.name}-fescar-service-group = "default"
  # 僅支援單節點,不要配置多位址,這裡的default要和事務分組的值一緻
  default.grouplist = "127.0.0.1:8091" #seata-server伺服器位址,預設是8091
  # 降級,目前不支援
  enableDegrade = false
  # 禁用全局事務
  disableGlobalTransaction = false
}      

用戶端相關工作的機制

client {
  rm {
    async.commit.buffer.limit = 10000
    lock {
      retry.internal = 10
      retry.times = 30
      retry.policy.branch-rollback-on-conflict = true
    }
    report.retry.count = 5
    table.meta.check.enable = false
    report.success.enable = true
  }
  tm {
    commit.retry.count = 5
    rollback.retry.count = 5
  }
  undo {
    data.validation = true
    log.serialization = "jackson"
    log.table = "undo_log"
  }
  log {
    exceptionRate = 100
  }
  support {
    # auto proxy the DataSource bean
    spring.datasource.autoproxy = false
  }
}      

首先我們需要把registry.conf和file.conf兩個配置檔案拷貝到對應的項目的屬性檔案目錄中

【第十九篇】商城系統-分布式事務解決方案

然後我們在屬性檔案中定義 tx-service-group 資訊

【第十九篇】商城系統-分布式事務解決方案

然後在file.conf中添加service屬性,然後關聯剛剛設定的tx-service-group資訊。注意1.1版本後屬性更新為了駝峰命名法:

【第十九篇】商城系統-分布式事務解決方案

3.3 案例示範

  我們在下訂單的操作中除了已有的生成訂單和訂單項已經鎖定庫存操作外我們還顯示的增加的了一個會有積分調整的服務。[1/0],這樣一來如果鎖定庫存成功,但是會員積分調整失敗,被分布式事務管理的邏輯中,鎖庫存的操作會復原。

【第十九篇】商城系統-分布式事務解決方案
【第十九篇】商城系統-分布式事務解決方案

4.取消訂單

  取消訂單出現的情況:

  • 下訂單後超過30分鐘沒有支付,需要觸發關單操作
  • 支付失敗,同樣的需要關單

實作方式:定時任務和消息中間件,定時任務對系統的性能肯定是有影響

【第十九篇】商城系統-分布式事務解決方案
【第十九篇】商城系統-分布式事務解決方案
【第十九篇】商城系統-分布式事務解決方案

RocketMQ:https://github.com/apache/rocketmq/tree/master/docs/cn

【第十九篇】商城系統-分布式事務解決方案

RocketMQ:Docker安裝

安裝NameServer

docker pull rocketmqinc/rocketmq      

建立存儲目錄

mkdir -p /mydata/docker/rocketmq/data/namesrv/logs /mydata/docker/rocketmq/data/namesrv/store      

然後安裝

docker run -d --restart=always --name rmqnamesrv --privileged=true -p 9876:9876  -v /docker/rocketmq/data/namesrv/logs:/root/logs -v /docker/rocketmq/data/namesrv/store:/root/store -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq sh mqnamesrv
      

相關參數說明

【第十九篇】商城系統-分布式事務解決方案

安裝Broker

border配置:建立 broker.conf 配置檔案 vi /mydata/rocketmq/conf/broker.conf ,配置如下:

# 所屬叢集名稱,如果節點較多可以配置多個
brokerClusterName = DefaultCluster 
#broker名稱,master和slave使用相同的名稱,表明他們的主從關系 
brokerName = broker-a 
#0表示Master,大于0表示不同的
slave brokerId = 0 
#表示幾點做消息删除動作,預設是淩晨4點 
deleteWhen = 04 
#在磁盤上保留消息的時長,機關是小時 
fileReservedTime = 48 
#有三個值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和異步表示Master和Slave之間同步資料的機 制;
brokerRole = ASYNC_MASTER 
#刷盤政策,取值為:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盤和異步刷盤;SYNC_FLUSH消息寫入磁盤後 才傳回成功狀态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH 
# 設定broker節點所在伺服器的ip位址 
brokerIP1 = 192.168.100.130 
#剩餘磁盤比例 
diskMaxUsedSpaceRatio=99
      

安裝:

docker run -d --restart=always --name rmqbroker --link rmqnamesrv:namesrv -p 10911:10911 -p 10909:10909 --privileged=true -v /docker/rocketmq/data/broker/logs:/root/logs -v /docker/rocketmq/data/broker/store:/root/store -v /docker/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
      

相關參數說明:

【第十九篇】商城系統-分布式事務解決方案

安裝控制台:

拉取鏡像

docker pull pangliang/rocketmq-console-ng
      

控制台安裝:

docker run -d --restart=always --name rmqadmin -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.100.130:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 pangliang/rocketmq-console-ng
      

通路測試:http://192.168.100.100:8080

【第十九篇】商城系統-分布式事務解決方案

5.釋放庫存

  需要釋放庫存的情況:

  • 下訂單後手動取消訂單擷取逾時支付訂單
  • 支付成功釋放庫存,更新庫存

SpringBoot整合RocketMQ

需要添加對應的依賴

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.1</version>
        </dependency>      

需要添加對應的配置資訊

【第十九篇】商城系統-分布式事務解決方案

然後定義對應的消息生産者

@Component
public class OrderMsgProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public  void sendOrderMessage(String orderSN){

        rocketMQTemplate.syncSend(OrderConstant.ROCKETMQ_ORDER_TOPIC, MessageBuilder.withPayload(orderSN).build(),5000,4);
    }
}      

然後就是延遲消息的消費者,我們得通過對應的監聽器來處理

@RocketMQMessageListener(topic = OrderConstant.ROCKETMQ_ORDER_TOPIC,consumerGroup = "${rocketmq.consumer.group}")
@Component
public class OrderMsgConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("收到的消息:" + s);
    }
}      

繼續閱讀