天天看點

消息隊列消息丢失和消息重複發送的處理政策

分布式事務

什麼是分布式事務

我們的伺服器從單機發展到擁有多台機器的分布式系統,各個系統之前需要借助于網絡進行通信,原有單機中相對可靠的方法調用以及程序間通信方式已經沒有辦法使用,同時網絡環境也是不穩定的,造成了我們多個機器之間的資料同步問題,這就是典型的分布式事務問題。

在分布式事務中事務的參與者、支援事務的伺服器、資源伺服器以及事務管理器分别位于不同的分布式系統的不同節點之上。分布式事務就是要保證不同節點之間的資料一緻性。

常見的分布式事務解決方案

1、2PC(二階段送出)方案 - 強一緻性

2、3PC(三階段送出)方案

3、TCC (Try-Confirm-Cancel)事務 - 最終一緻性

4、Saga事務 - 最終一緻性

5、本地消息表 - 最終一緻性

6、MQ事務 - 最終一緻性

這裡重點關注下使用消息隊列實作分布式的一緻性,上面幾種的分布式設計方案的具體細節可參見文章最後的引用連結

基于 MQ 實作的分布式事務

本地消息表-最終一緻性

消息的生産方,除了維護自己的業務邏輯之外,同時需要維護一個消息表。這個消息表裡面記錄的就是需要同步到别的服務的資訊,當然這個消息表,每個消息都有一個狀态值,來辨別這個消息有沒有被成功處理。

發送放的業務邏輯以及消息表中資料的插入将在一個事務中完成,這樣避免了​

​業務處理成功 + 事務消息發送失敗​

​​,或​

​業務處理失敗 + 事務消息發送成功​

​,這個問題。

消息隊列消息丢失和消息重複發送的處理政策

圖檔

舉個栗子:

我們假定目前有兩個服務,訂單服務,購物車服務,使用者在購物車中對幾個商品進行合并下單,之後需要清空購物車中剛剛已經下單的商品資訊。

1、消息的生産方也就是訂單服務,完成了自己的邏輯(對商品進行下單操作)然後把這個消息通過 mq 發送到需要進行資料同步的其他服務中,也就是我們栗子中的購物車服務。

2、其他服務(購物車服務)會監聽這個隊列;

1、如果收到這個消息,并且資料同步執行成功了,當然這也是一個本地事務,就通過 mq 回複消息的生産方(訂單服務)消息已經處理了,然後生産方就能辨別本次事務已經結束。如果是一個業務上的錯誤,就回複消息的生産方,需要進行資料復原了。

2、很久沒收到這個消息,這種情況是不會發生的,消息的發送方會有一個定時的任務,會定時重試發送消息表中還沒有處理的消息;

3、消息的生産方(訂單服務)如果收到消息回執;

1、成功的話就修改本次消息已經處理完,也就是本次分布式事務的同步已經完成;

2、如果消息的結果是執行失敗,同時在本地復原本次事務,辨別消息已經處理完成;

3、如果消息丢失,也就是回執消息沒有收到,這種情況也不太會發生,消息的發送方(訂單服務)會有一個定時的任務,定時重試發送消息表中還沒有處理的消息,下遊的服務需要做幂等,可能會收到多次重複的消息,如果一個回複消息生産方中的某個回執資訊丢失了,後面持續收到生産方的 mq 消息,然後再次回複消息的生産方回執資訊,這樣總能保證發送者能成功收到回執,消息的生産方在接收回執消息的時候也要做到幂等性。

這裡有兩個很重要的操作:

1、伺服器處理消息需要是幂等的,消息的生産方和接收方都需要做到幂等性;

2、發送放需要添加一個定時器來周遊重推未處理的消息,避免消息丢失,造成的事務執行斷裂。

該方案的優缺點

優點:

1、在設計層面上實作了消息資料的可靠性,不依賴消息中間件,弱化了對 mq 特性的依賴。

2、簡單,易于實作。

缺點:

主要是需要和業務資料綁定到一起,耦合性比較高,使用相同的資料庫,會占用業務資料庫的一些資源。

MQ事務-最終一緻性

下面分析下幾種消息隊列對事務的支援

RocketMQ中如何處理事務

RocketMQ 中的事務,它解決的問題是,確定執行本地事務和發消息這兩個操作,要麼都成功,要麼都失敗。并且,RocketMQ 增加了一個事務反查的機制,來盡量提高事務執行的成功率和資料一緻性。

消息隊列消息丢失和消息重複發送的處理政策

圖檔

主要是兩個方面,正常的事務送出和事務消息補償

正常的事務送出

1、發送消息(half消息),這個 half 消息和普通消息的差別,在事務送出 之前,對于消費者來說,這個消息是不可見的。

2、​

​MQ SERVER​

​寫入資訊,并且傳回響應的結果;

3、根據​

​MQ SERVER​

​​響應的結果,決定是否執行本地事務,如果​

​MQ SERVER​

​寫入資訊成功執行本地事務,否則不執行;

4、根據本地事務執行的狀态,決定是否對事務進行 Commit 或者 Rollback。​

​MQ SERVER​

​收到 Commit,之後就會投遞該消息到下遊的訂閱服務,下遊的訂閱服務就能進行資料同步,如果是 Rollback 則該消息就會被丢失;

如果​

​MQ SERVER​

​沒有收到 Commit 或者 Rollback 的消息,這種情況就需要進行補償流程了

補償流程

1、​

​MQ SERVER​

​如果沒有收到來自消息發送方的 Commit 或者 Rollback 消息,就會向消息發送端也就是我們的伺服器發起一次查詢,查詢目前消息的狀态;

2、消息發送方收到對應的查詢請求,查詢事務的狀态,然後把狀态重新推送給​

​MQ SERVER​

​​,​

​MQ SERVER​

​就能之後後續的流程了。

相比于本地消息表來處理分布式事務,MQ 事務是把原本應該在本地消息表中處理的邏輯放到了 MQ 中來完成。

Kafka中如何處理事務

Kafka 中的事務解決問題,確定在一個事務中發送的多條資訊,要麼都成功,要麼都失敗。也就是保證對多個分區寫入操作的原子性。

通過配合 Kafka 的幂等機制來實作 Kafka 的 ​

​Exactly Once​

​​,滿足了​

​讀取-處理-寫入​

​這種模式的應用程式。當然 Kafka 中的事務主要也是來處理這種模式的。

什麼是​

​讀取-處理-寫入​

​模式呢?

栗如:在流計算中,用 Kafka 作為資料源,并且将計算結果儲存到 Kafka 這種場景下,資料從 Kafka 的某個主題中消費,在計算叢集中計算,再把計算結果儲存在 Kafka 的其他主題中。這個過程中,要保證每條消息隻被處理一次,這樣才能保證最終結果的成功。Kafka 事務的原子性就保證了,讀取和寫入的原子性,兩者要不一起成功,要不就一起失敗復原。

這裡來分析下 Kafka 的事務是如何實作的

它的實作原理和 RocketMQ 的事務是差不多的,都是基于兩階段送出來實作的,在實作上可能更麻煩

先來介紹下事務協調者,為了解決分布式事務問題,Kafka 引入了事務協調者這個角色,負責在服務端協調整個事務。這個協調者并不是一個獨立的程序,而是 Broker 程序的一部分,協調者和分區一樣通過選舉來保證自身的可用性。

Kafka 叢集中也有一個特殊的用于記錄事務日志的主題,裡面記錄的都是事務的日志。同時會有多個協調者的存在,每個協調者負責管理和使用事務日志中的幾個分區。這樣能夠并行的執行事務,提高性能。

下面看下具體的流程

  • 1、首先在開啟事務的時候,生産者會給協調者發送一個開啟事務的請求,協調者在事務日志中記錄下事務ID;
  • 2、然後生産者開始發送事務消息給協調者,不過需要先發送消息告知協調者在哪個主題和分區,之後就正常的發送事務消息,這些事務消息不像 RocketMQ 會儲存在特殊的隊列中,Kafka 未送出的事務消息和普通的消息一樣,隻是在消費的時候依賴用戶端進行過濾。
  • 3、消息發送完成,生産者根據自己的執行的狀态對協調者進行事務的送出或者復原;

事務的送出

1、協調者設定事務的狀态為PrepareCommit,寫入到事務日志中;

2、協調者在每個分區中寫入事務結束的辨別,然後用戶端就能把之前過濾的未送出的事務消息放行給消費端進行消費了;

事務的復原

1、協調者設定事務的狀态為PrepareAbort,寫入到事務日志中;

2、協調者在每個分區中寫入事務復原的辨別,然後之前未送出的事務消息就能被丢棄了;

這裡引用一下【消息隊列高手課中的圖檔】

消息隊列消息丢失和消息重複發送的處理政策

圖檔

RabbitMQ中的事務

RabbitMQ 中事務解決的問題是確定生産者的消息到達​

​MQ SERVER​

​,這和其他 MQ 事務還是有點差别的,這裡也不展開讨論了。

消息防丢失

先來分析下一條消息在 MQ 中流轉所經曆的階段。

消息隊列消息丢失和消息重複發送的處理政策

圖檔

生産階段:生産者産生消息,通過網絡發送到 Broker 端。

存儲階段:Broker 拿到消息,需要進行落盤,如果是叢集版的 MQ 還需要同步資料到其他節點。

消費階段:消費者在 Broker 端拉資料,通過網絡傳輸到達消費者端。

生産階段防止消息丢失

發生網絡丢包、網絡故障等這些會導緻消息的丢失

RabbitMQ 中的防丢失措施
  • 1、對于可以感覺的錯誤,我們捕獲錯誤,然後重新投遞;
  • 2、通過 RabbitMQ 中的事務解決,RabbitMQ 中的事務解決的就是生産階段消息丢失的問題;

在生産者發送消息之前,通過​

​channel.txSelect​

​​開啟一個事務,接着發送消息, 如果消息投遞 server 失敗,進行事務復原​

​channel.txRollback​

​​,然後重新發送, 如果 server 收到消息,就送出事務​

​channel.txCommit​

不過使用事務性能不好,這是同步操作,一條消息發送之後會使發送端阻塞,以等待​

​RabbitMQ Server​

​的回應,之後才能繼續發送下一條消息,生産者生産消息的吞吐量和性能都會大大降低。

  • 3、使用發送确認機制。

使用确認機制,生産者将信道設定成 confirm 确認模式,一旦信道進入 confirm 模式,所有在該信道上面釋出的消息都會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有比對的隊列之後,RabbitMQ 就會發送一個确認(Basic.Ack)給生産者(包含消息的唯一 deliveryTag 和 multiple 參數),這就使得生産者知曉消息已經正确到達了目的地了。

multiple 為 true 表示的是批量的消息确認,為 true 的時候,表示小于等于傳回的 deliveryTag 的消息 id 都已經确認了,為 false 表示的是消息 id 為傳回的 deliveryTag 的消息,已經确認了。

消息隊列消息丢失和消息重複發送的處理政策

圖檔

确認機制有三種類型

1、同步确認

2、批量确認

3、異步确認

同步模式的效率很低,因為每一條消息度都需要等待确認好之後,才能處理下一條;

批量确認模式相比同步模式效率是很高,不過有個緻命的缺陷,一旦回複确認失敗,目前确認批次的消息會全部重新發送,導緻消息重複發送;

異步模式就是個很好的選擇了,不會有同步模式的阻塞問題,同時效率也很高,是個不錯的選擇。

Kafka 中的防丢失措施

Kafaka 中引入了一個 broker。broker 會對生産者和消費者進行消息的确認,生産者發送消息到 broker,如果沒有收到 broker 的确認就可以選擇繼續發送。

隻要 Producer 收到了 Broker 的确認響應,就可以保證消息在生産階段不會丢失。有些消息隊列在長時間沒收到發送确認響應後,會自動重試,如果重試再失敗,就會以傳回值或者異常的方式告知使用者。

隻要正确處理 Broker 的确認響應,就可以避免消息的丢失。

RocketMQ 中的防丢失措施
  • 使用 SYNC 的發送消息方式,等待 broker 處理結果

RocketMQ 提供了3種發送消息方式,分别是:

同步發送:Producer 向 broker 發送消息,阻塞目前線程等待 broker 響應 發送結果。

異步發送:Producer 首先建構一個向 broker 發送消息的任務,把該任務送出給線程池,等執行完該任務時,回調使用者自定義的回調函數,執行處理結果。

Oneway發送:Oneway 方式隻負責發送請求,不等待應答,Producer 隻負責把請求發出去,而不處理響應結果。

  • 使用事務,RocketMQ 中的事務,它解決的問題是,確定執行本地事務和發消息這兩個操作,要麼都成功,要麼都失敗。

存儲階段

在存儲階段正常情況下,隻要 Broker 在正常運作,就不會出現丢失消息的問題,但是如果 Broker 出現了故障,比如程序死掉了或者伺服器當機了,還是可能會丢失消息的。

RabbitMQ 中的防丢失措施

防止在存儲階段消息額丢失,可以做持久化,防止異常情況(重新開機,關閉,當機)。。。

RabbitMQ 持久化中有三部分:

  • 交換器的持久化

交換器的持久化,是通過在聲明隊列時将 durable 參數置為 true 實作的,不設定持久化的話,交換器的資訊将會丢失。

  • 隊列持久化

隊列的持久化,是通過在聲明隊列時将 durable 參數置為 true 實作的,隊列的持久化能保證其本身的中繼資料不會因異常情況而丢失,但是并不能保證内部所存儲的消息不會丢失。

  • 消息的持久化

消息的持久化,在投遞時指定 ​

​delivery_mode=2​

​(1是非持久化),消息的持久化,需要配合隊列的持久,隻設定消息的持久化,重新開機之後隊列消失,繼而消息也會丢失。是以如果隻設定消息持久化而不設定隊列的持久化意義不大。

對于持久化,如果所有的消息都設定持久化,會影響寫入的性能,是以可以選擇對可靠性要求比較高的消息進行持久化處理。

不過消息持久化并不能百分之百避免消息的丢失

比如資料在落盤的過程中當機了,消息還沒及時同步到記憶體中,這也是會丢資料的,這種問題可以通過引入鏡像隊列來解決。

鏡像隊列的作用:引入鏡像隊列,可已将隊列鏡像到叢集中的其他 Broker 節點之上,如果叢集中的一個節點失效了,隊列能夠自動切換到鏡像中的另一個節點上來保證服務的可用性。(更細節的這裡不展開讨論了)

Kafka 中的防丢失措施

作業系統本身有一層緩存,叫做 Page Cache,當往磁盤檔案寫入的時候,系統會先将資料流寫入緩存中。

Kafka 收到消息後也會先存儲在也緩存中(Page Cache)中,之後由作業系統根據自己的政策進行刷盤或者通過 fsync 指令強制刷盤。如果系統挂掉,在 PageCache 中的資料就會丢失。也就是對應的 Broker 中的資料就會丢失了。

圖檔

處理思路

1、控制競選分區 leader 的 Broker。如果一個 Broker 落後原先的 Leader 太多,那麼它一旦成為新的 Leader,必然會造成消息的丢失。

2、控制消息能夠被寫入到多個副本中才能送出,這樣避免上面的問題1。

RocketMQ 中的防丢失措施

1、将刷盤方式改成同步刷盤;

2、對于多個節點的 Broker,需要将 Broker 叢集配置成:至少将消息發送到 2 個以上的節點,再給用戶端回複發送确認響應。這樣當某個 Broker 當機時,其他的 Broker 可以替代當機的 Broker,也不會發生消息丢失。

消費階段

消費階段就很簡單了,如果在網絡傳輸中丢失,這個消息之後還會持續的推送給消費者,在消費階段我們隻需要控制在業務邏輯處理完成之後再去進行消費确認就行了。

總結:對于消息的丢失,也可以借助于本地消息表的思路,消息産生的時候進行消息的落盤,長時間未處理的消息,使用定時重推到隊列中。

消息重複發送

消息在 MQ 中的傳遞,大緻可以歸類為下面三種:

1、At most once: 至多一次。消息在傳遞時,最多會被送達一次。是不安全的,可能會丢資料。

2、At least once: 至少一次。消息在傳遞時,至少會被送達一次。也就是說,不允許丢消息,但是允許有少量重複消息出現。

3、Exactly once:恰好一次。消息在傳遞時,隻會被送達一次,不允許丢失也不允許重複,這個是最高的等級。

大部分消息隊列滿足的都是​

​At least once​

​,也就是可以允許重複的消息出現。

我們消費者需要滿足幂等性,通常有下面幾種處理方案

1、利用資料庫的唯一性

根據業務情況,標明業務中能夠判定唯一的值作為資料庫的唯一鍵,建立一個流水表,然後執行業務操作和流水表資料的插入放在同一事務中,如果流水表資料已經存在,那麼就執行失敗,借此保證幂等性。也可先查詢流水表的資料,沒有資料然後執行業務,插入流水表資料。不過需要注意,資料庫讀寫延遲的情況。

2、資料庫的更新增加前置條件

3、給消息帶上唯一ID