天天看點

為什麼使用消息隊列-kafka第二集

1、使用同步的通訊方式來解決多個服務之間的通訊

為什麼使用消息隊列-kafka第二集

2、使用異步的通訊方式

為什麼使用消息隊列-kafka第二集

Ø 消息(Message)是指應用于應用之間傳送的資料,消息的類型包括文本字元串、JSON、XML、内嵌對象等等...

Ø 所謂消息中間件/消息隊列(Message Queue Middleware,簡稱MQ)是利用高效可靠的消息傳遞機制進行資料交流,同時可以基于資料通信來進行分布式系統的繼承,消息中間件一般有兩種傳遞模式:點對點(Point-to-Point)模式和釋出/訂閱(Pub/Sub)模式,點對點模式是基于隊列的,消息生産者發送消息到隊列,消息消費者從隊列中接收消息,隊列的存在使得消息的異步傳輸成為了可能,釋出訂閱模式定義了如何向一個内容節點釋出和訂閱内容,這個内容節點叫topic,這種模式可以滿足消費者釋出一個消息,多個消費者同時消費同一資訊的需求。

Ø 一般來說,消息隊列是一種異步的服務間通信方式,是分布式系統中重要的元件,主要解決應用耦合,異步消息,流量削鋒等問題,實作高性能,高可用,可伸縮和最終一緻性架構。使用較多的消息隊列有ActiveMQ、RocketMQ、RabbitMQ、Kafka等。

一、多線程和消息隊列的差別?

① 多線程是防止系統的阻塞(優先響應使用者,背景任務執行)

② 消息隊列是提高系統處理業務的效率(異步處理加快程式執行速度)

二、消息隊列和多線程的選擇

可靠性要求高時選擇消息隊列:消息隊列和多線程兩者并不沖突,多線程可以作為隊列的生産者和消費者。使用外部的消息隊列時,第一是可以提高應用的穩定性,當程式fail後,已經寫入外部消息隊列的資料依舊是儲存的,如果使用兩步commit的隊列的話,可以更加提高這個項目。

不着急知道結果,盡量使用消息隊列,保證伺服器的壓力減小,因為多線程對cpu的消耗大一點:用線程的話,會占用主伺服器資源, 消息隊列的話, 可以放到其他機器上運作, 讓主伺服器盡量多的服務其他請求。我個人認為, 如果使用者不急着知道結果的操作, 用消息隊列, 否則再考慮用不用線程。

需要解耦的時候用消息隊列:解耦更充分,架構更合理。多線程是在程式設計語言層面解決問題,消息隊列是在架構層面解決問題。我認為架構層面解決問題是“覺悟比較高的方式“,理想情況下應該限制語言層面濫用多線程,能不用就不用。

如果容易出現線程安全問題的業務或者批量操作時,也盡量使用消息隊列:批量發送郵件時,資料量龐大,如果使用多線程對系統不安全。

三、消息隊列和線程池的比較

1) 兩者内部都使用了隊列,如阻塞隊列、優先級隊列;

2) 使用線程池時應用伺服器既充當生産者又充當消費者,也是消息隊列中間件的實作者,使用消息隊列時中間件、生産者、消費者可以部署在不同的應用機器上(當然也可以部署在一台伺服器上但很少有人這麼用);

3) 出于第2點線程池更适合非分布式的系統,但在分布式架構下消息隊列明顯是更突出優勢;

4) 使用消息隊列會帶來額外的網絡開銷;

5) 消息隊列的耦合性更低,可擴充性更好,适用于弱一緻性的場景,如對log日志的解耦;

6) 消息隊列自動實作消息的持久化,中間已經實作了大量功能,如消息轉發、消息拒絕、消息重試,以及對消息的一些監控,例如消息的消費狀态、消息的消費速率等,使用線程池如果需要很多功能還要自己去實作,例如需要執行狀态需要列印隊列數量、計算消息消費速度;

7) 在不同系統間的服務調用(調用協定也可能不一緻)線程池很難實作或開銷很大,這時候消息隊列可以屏蔽不同機器或不同協定的問題;

8) 使用消息隊列會提升系統的複雜度,網絡抖動怎麼辦?最大隊列長度怎麼設定?逾時時間又設定多少?Qos又設定為多少?消費者多少個比較合适?Channel cache size又該設定為多少?業務線可能都是用同一個Mq,你占資源太多,或者設計不當可能會導緻整個Mq故障

為什麼使用消息隊列-kafka第二集

四、消息隊列的主要作用

① 解耦

如果采用推送的方式,A 系統通過接口調用發送資料到 B、C、D 三個系統,A 系統的維護成本就非常的高,而且 A 系統要時時刻刻考慮B、C、D 四個系統如果出現故障該怎麼辦?使用消息隊列就可以解決這個問題。A 系統隻負責生産資料,不需要考慮消息被哪個系統來消費。

② 異步

A 系統需要發送個請求給 B 系統處理,由于 B 系統需要查詢資料庫花費時間較長,以至于 A 系統要等待 B 系統處理完畢後再發送下個請求,造成 A 系統資源浪費。使用消息隊列後,A 系統生産完消息後直接丢進消息隊列,不用等待 B 系統的結果,直接繼續去幹自己的事情了。

③ 削峰

A 系統調用 B 系統處理資料,每天 0 點到 12 點,A 系統風平浪靜,每秒并發請求數量就 100 個。結果每次一到 12 點 ~ 13 點,每秒并發請求數量突然會暴增到 1 萬條。但是 B 系統最大的處理能力就隻能是每秒鐘處理 1000 個請求,這樣系統很容易就會崩掉。這種情況可以引入消息隊列,把請求資料先存入消息隊列中,消費系統再根據自己的消費能力拉取消費。

五、消息隊列的局限性

首先,消息隊列會降低系統的可用性,因為引入消息隊列,就等于引入多一種外部依賴,多一種外部依賴,挂掉的可能性就多一點;

其次,消息隊列會使得系統複雜度提高,比如:需要保證消息沒有被重複消費、處理消息丢失的情況、保證消息傳遞的順序性等等問題;

最後,使用消息隊列,要考慮一緻性問題:A 系統處理完了直接傳回成功了,但問題是:要是 B、C、D 三個系統那裡,B 和 D 兩個系統寫庫成功了,結果 C 系統寫庫失敗了,就造成資料不一緻了。

六、kafka消息隊列的高可用

Kafka 使用的是 partition和 replica 模式來保證高可用

① Partition:

partition(分區)是kafka的一個核心概念,kafka将1個topic分成了一個或多個分區,每個分區在實體上對應一個目錄,分區目錄下存儲的是該分區的日志段(segment),包括日志的資料檔案和兩個索引檔案。然後每個分區又對應一個或多個副本,由一個ISR清單來維護。 注意:分區數可以大于節點數,但是副本數不能大于節點數,因為副本需要分不到不同的節點上,才能達到備份的目的。

② Replica:

Kafka 是有主題概念的,而每個主題又進一步劃分成若幹個分區。副本的概念實際上是在分區層級下定義的,每個分區配置有若幹個副本。

所謂副本(Replica),本質就是一個隻能追加寫消息的送出日志。根據 Kafka 副本機制的定義,同一個分區下的所有副本儲存有相同的消息序列,這些副本分散儲存在不同的 Broker 上,進而能夠對抗部分 Broker 當機帶來的資料不可用。

在實際生産環境中,每台 Broker 都可能儲存有各個主題下不同分區的不同副本,是以,單個 Broker 上存有成百上千個副本的現象是非常正常的。接下來我們來看一張圖,它展示的是一個有 3 台 Broker 的 Kafka 叢集上的副本分布情況。

從這張圖中,我們可以看到,主題 1 分區 0 的 3 個副本分散在 3 台 Broker 上,其他主題分區的副本也都散落在不同的 Broker 上,進而實作資料備援。

為什麼使用消息隊列-kafka第二集

七、如何保證消息不被重複消費、幂等性

可以在寫資料時,先根據主鍵查一下這條資料是否存在,如果已經存在則 update;

資料庫的唯一鍵限制也可以保證不會重複插入多條,因為重複插入多條隻會報錯,不會導緻資料庫中出現髒資料;如果是寫 Redis,就沒有問題,因為 set 操作是天然幂等性的

八、保證消息不丢失

在生産階段,你需要捕獲消息發送的錯誤,并重發消息。在存儲階段,你可以通過配置刷盤和複制相關的參數,讓消息寫入到多個副本的磁盤上,來確定消息不會因為某個 Broker 當機或者磁盤損壞而丢失。在消費階段,你需要在處理完全部消費業務邏輯之後,再發送消費确認。

在 Producer 端,我們給每個發出的消息附加一個連續遞增的序号,然後在 Consumer 端來檢查這個序号的連續性。如果沒有消息丢失,Consumer 收到消息的序号必然是連續遞增的,或者說收到的消息,其中的序号必然是上一條消息的序号 +1。如果檢測到序号不連續,那就是丢消息了。還可以通過缺失的序号來确定丢失的是哪條消息,友善進一步排查原因。大多數消息隊列的用戶端都支援攔截器機制,你可以利用這個攔截器機制,在 Producer 發送消息之前的攔截器中将序号注入到消息中,在 Consumer 收到消息的攔截器中檢測序号的連續性,這樣實作的好處是消息檢測的代碼不會侵入到你的業務代碼中,待你的系統穩定後,也友善将這部分檢測的邏輯關閉或者删除。

九、處理消息積壓

能導緻積壓突然增加,最粗粒度的原因,隻有兩種:要麼是發送變快了,要麼是消費變慢了。大部分消息隊列都内置了監控的功能,隻要通過監控資料,很容易确定是哪種原因。如果是機關時間發送的消息增多,比如說是趕上大促或者搶購,短時間内不太可能優化消費端的代碼來提升消費性能,唯一的方法是通過擴容消費端的執行個體數來提升總體的消費能力。如果短時間内沒有足夠的伺服器資源進行擴容,沒辦法的辦法是,将系統降級,通過關閉一些不重要的業務,減少發送方發送的資料量,最低限度讓系統還能正常運轉,服務一些重要業務。還有一種不太常見的情況,你通過監控發現,無論是發送消息的速度還是消費消息的速度和原來都沒什麼變化,這時候你需要檢查一下你的消費端,是不是消費失敗導緻的一條消息反複消費這種情況比較多,這種情況也會拖慢整個系統的消費速度。如果監控到消費變慢了,你需要檢查你的消費執行個體,分析一下是什麼原因導緻消費變慢。優先檢查一下日志是否有大量的消費錯誤,如果沒有錯誤的話,可以通過列印堆棧資訊,看一下你的消費線程是不是卡在什麼地方不動了,比如觸發了死鎖或者卡在等待某些資源上了。

十、消息過期

我們可以采取一個方案,就是批量重導。就是大量積壓的時候,直接丢棄資料了,然後等過了高峰期以後開始寫程式,将丢失的那批資料一點一點的查出來,然後重新灌入 MQ 裡面去,把丢的資料給補回來。