天天看點

基于kafka的消息隊列的設計kafka的設計kafka的api消息隊列服務

kafka的設計

動機

建⽴一個⾼吞吐量、低延遲、分布式的消息系統。但從設計來看,它可能更像是⼀個資料庫的⽇志系統。

持久性

檔案系統其實也挺快!

⾸先,SSD硬碟的順序讀寫速度可以達到⼏百兆(普遍在300M-600M,有些好的硬碟讀取的速度甚⾄可以接近千兆)。 其次,作業系統也做了許多策略來優化硬碟的讀寫速度。

為什麼不使用記憶體來存儲資料呢?理由如下:

1. 現代作業系統為了提高随機讀硬碟的性能,會”膽大包天“地使⽤記憶體:将空的記憶體來作為硬碟的⻚緩存,是以,即使基于檔案系統,可能也會獲得不錯的性能。如果再在程序里維護⼀個資料記憶體,可能會導緻同一份資料在記憶體中存儲了兩份。

2. 此外,kafka是基于JVM開發的(使⽤了scala語言)。JVM的記憶體管理有兩個顯而易見的問題:

a) 存儲對象的時候,jvm還需要存儲對象頭等額外的資訊,經常會導緻需要使用接近資料⼤小兩倍的記憶體。

b) 當記憶體增⻓的時候,記憶體的垃圾回收将越來越慢

綜上所述,基于⽂件系統并依賴于頁緩存來設計一個系統,會⽐基于記憶體更優。有⼏個優勢:

a) 32G記憶體的機器,可以利用到的頁緩存可達到 28-30G,⽽且沒有GC的困擾

b) 重新開機服務時⾮常快,⽽純記憶體的系統,10G的資料在服務重新開機時至少需要10分鐘

c) 硬碟和記憶體之間的資料映射更為簡單了,因為作業系統已經幫你做了大部分的工作

性能

在很多系統中,由于某些下遊基礎服務很容易成為瓶頸,涉及到這些瓶頸服務的小變化也會導緻産生問題。

硬碟的順序讀寫性能,我們已經闡述過了,事實上,硬碟的順序讀寫性能是很有保證的,并不會成為系統瓶頸。除此之外,可能還存在着兩個隐藏的瓶頸:

1. 過多的⼩資料的零碎的IO操作

2. 昂貴的位元組複制:例如要将一段資料發送到⽹卡,要經過以下步驟:

1) 從硬碟讀取到核心空間(read系統調⽤用)

2) 從核心空間複制到⽤戶空間(read系統調用)

3) 從⽤戶空間⼜寫⼊核心空間(需要寫入socket buffer)

4) 從socket buffer複制到NIC buffer

如何解決以上兩個問題?

1. 使用“message set”的模型,⼀次性傳輸/寫入/讀取多條資料

2. 使用sendfile系統調⽤,将資料從⻚緩存(pagecache)直接拷⻉到NIC緩存 (NIC:Network Interface Card),省去中間的許多步驟

消費者的消費position消費者的消費記錄對⼀個消息系統的來說,也是性能關鍵之⼀。

按照傳統的消息系統(例如beanstalkd),消費者每成功消費一條消息後,會給伺服器器broker傳回⼀一個ACK,然後broker将對應的消息删除掉。這樣其實會有一些問題:

1) ACK丢失會導緻消息被處理多次

2) 性能會受影響,因為每條消息都需要維護多種狀态

kafka對消費的position有着更好的處理方式: ⾸先,我們知道,kafka對每個topic可以分成多個partition,每個partition限制了最多隻能有⼀個cosumer來處理(如果有多個consumer,那麼其餘的consumer将收不到資料),同時消費完的消息也不用删除,這就使得每個consumer在某個partition中的消費position是一個簡單的整數,⽽不用維護各種 狀态。此外,還有⼀個好處是,消費者可以⾃行調整它在某個partition的position,從⽽重複處理消息。

消息傳遞的語義

消息系統中消息傳遞的語義有三種級别:

1) At most once —— 最多⼀次:消息最多被傳遞一次,消息可能會丢失但從不會重複傳遞

2) At least once —— 最少⼀次:消息⾄少會被傳遞一次,消息不會丢失但可能會重複傳遞

3) Exactly once —— 剛好一次:消息不會丢失,并且僅僅會隻傳遞一次

首先,我們來看看對于生産者:如何保證⼀條消息正确發送到broker呢? 在kafka中,生産者釋出消息到broker後,有個“已送出(committed)”的概念:⼀旦消息已送出了,意味着存放該消息的partition的任一個broker(一個partition會複制到指定數量的broker中,以提⾼容災能力)存活着,那該消息就不會丢失。 ⽣産者釋出消息時,⼀個常見的問題是可能在釋出過程中遇到了網絡錯誤,⽣産者是⽆法判斷是在committed之前還是之後的,這就可能造成多次釋出。要解決這個問題,需要使得釋出消息的操作具備幂等性——具體的做法可以是在producer端⽣成⼀個唯⼀消息編号,當且僅當消息送出成功後,broker記錄唯⼀的消息編号,并向producer傳回一個ACK,producer收到ACK後才确認消息已釋出成功,否則重發消息。但在kafka0.8.2版本,這個釋出幂等性的功能還沒實作。 但很多實際應⽤場景中,并不需要上述的那麼嚴格的保證。在kafka中,可以配置一個⽣産者釋出消息後是否需要等待“消息已送出”。

其次,我們來分析對于消費者來說,消息傳遞的3種語義是如何處理的: 在kafka中,任一個partition的所有複制broker都有一個完全一樣的log來記錄某個consumer/consumer group在partition中的消費的位置。kafka可以通過配置實作At-most-once和At-least-once的語義,但Exactlty-once的語義則需要consumer的配合(使⽤更底層的consumer API)。

1) 如果消費者先讀取消息,然後将position記錄在log中,最後再處理理消息:那麼有可能在處理理消息時消費者crash掉了了,這就是 At-most-once的情形;

2) 如果消費者先讀取消息,然後處理消息,确認處理理成功後在記錄position:那麼這種情況下,當消息處理理失敗時,會重新讀取消息;也有可能在記錄position時發⽣了錯誤,這就導緻要重複處理理消息。這就At-least-once的情形;

3) ⽽需要達到Exactly-once的⽬标,單依賴kafka是⽆無法達成的,需要消費者⾃行存儲消費的position,這樣才可以保證消息不不會丢失并僅僅被處理了一次。

Replication複制

kafka把topic中每個partition的資料按照配置指定的數量複制到相應的broker中 (建立topic時有⼀個參數 --replication-factor),這樣達到了故障轉移的⽬的,使得即使有一台broker崩潰了也依然可以提供服務。

kafka中複制的單元是partition。在不發⽣故障的前提下,每個partition都有一個leader的節點和0個或多個的follower節點,這些節點的數量量通過參數--replication-factor來指定。leader節點負責處理所有的讀寫請求。一般的,partition的數量(包括不同的topic)會遠遠超過broker的數量,而partition的leader節點則均衡分布在每個broker上。例如有10個broker,20個topic,每個topic有5個partition,總共有100個partition,其中有20個leader的partition,這些partition都會均衡分布在10個broker中。follower中的日志⽂件跟leader中的會保持一緻,logs包含了partition的消息以及consumer的消費offset。follower從leader中同步的過程,就跟follower是一個普通的kafka consumer類似。

判斷⼀個partition節點是否還活着(alive),從兩個⽅面來判斷:

1) 節點需要維持與zookeeper的⼼跳會話

2) 節點中的消息同步不能落後leader太多(通過配置項來規定“落後太多”的具體含義)

如果partition節點滿足以上兩個條件,我們稱該節點是“in sync”的。leader節點跟蹤管理“in sync”的節點,當發現有某個節點崩潰、失去聯系、落後過多時, 則leader節點則将該節點從in sync節點清單中删除。失去聯系和落後過多的含義分别由配置項replica.lag.max.messages 和replica.lag.time.max.ms 來定義。 當一個節點重新工作并重新同步資料、趕上進度後,leader節點⼜重新将該節點加入in sync節點清單。

上⽂提到,kafka中的消息有一個“已送出(committed)”的概念,它的具體含義是:相應partition的所有的in sync節點都已經将該消息記錄到各⾃的log中(確定已寫入硬碟fsync)。隻有“已送出”的消息,consumer才能讀取到。隻要有⼀個in sync的節點存活,那麼所有已送出的消息就不會丢失。 當leader節點崩潰時,會⾃動從"in sync"節點列列表中選擇一個節點作為新的leader,是以,kafka中的partition可以容忍【f-1】個replica崩潰(f是partition的replication的數量)。

還有一個問題需要考慮的是,當⼀個partition的所有節點都崩潰的時候,kafka應該怎麼做呢?理理論上有兩個選擇

1) 等待某個in sync的節點恢複服務

2) 等待任一個節點(即使不是in sync)恢複服務,很明顯,第⼀種選擇可以保證資料的一緻性,但服務不可用的時間會增長;第二種選擇可以盡可能快地恢複服務,但資料的一緻性不能保證。在kafka 0.8.2,可以通過topic級的配置unclean.leader.election.enable來配置選擇第⼀或第⼆種, 預設是第⼆種。

基于kafka的消息隊列的設計kafka的設計kafka的api消息隊列服務

持久性和可用性的衡量

持久性指的是資料的一緻性,即保證資料盡可能不會丢失。 可⽤性指的是服務的可⽤性,即保證服務盡可能可提供使用。

當生産者向kafka釋出消息時,生産者可以通過生産者配置項request.required.acks來選擇需要等待多少個ACK後才認為寫請求已被完成。或者更明确的說,就是多少個broker(包括leader)“已送出”了消息并向leader傳回ACK。

a. request.required.acks=0:消息釋出到伺服器端即可認為已成功釋出,這是預設值

b. request.required.acks=1:消息釋出到伺服器端後,需要等待leader節點的ACK,以確定資料“已送出”。此時,隻有當leader節點崩潰并且尚未複制到别的follower的資料才可能丢失。

c. request.required.acks=-1:這個就厲害了,需要等到所有in-sync的節點都傳回ACK後,才認為消息釋出成功。但當in-sync的節點數量量隻有⼀個時,就會退 化到 b) 的情形。

此外,還有兩個配置也是⽤在持久性和可⽤性之間取舍的:

1) unclean.leader.election.enable:在所有的broker都崩潰的時候,是否允許⼀個不在in-sync列列表的節點恢複時被選作leader節點。此配置,更傾向于可⽤性。

2) min.insync.replicas:指定了in-sync節點的最⼩值,當in-sync節點數量不夠 時,partition則停⽌服務。此配置更傾向于持久性,而犧牲了可用性。

日志壓緊(Log Compaction)日志compaction不是指compress log⽂件,而是說對于一些重複更新的key,隻保留最新的值。

在kafka中,使⽤Log Compaction,可以保證在一個topic partition中,對于每個消息的key,總是保留最新的值。

kafka的api

生産者API(Producer API)kafka提供了了java版本的⽣産者API,使⽤者隻需把相關jar包引⼊自己的工程即可。

消費者API(Consumer API)

kafka的消費者API有三種:

1. ⾼度抽象的消費者API:

這些API高度封裝了一個consumer group連接配接kafka、順序擷取資料的過程,使用者很容易就能寫出一個多線程的消費者程式。但有些地方需要注意的是:

1) 如果消費者線程的數量比partition的數量還多,有些線程将永遠讀取不到資料;

2) 如果partition的數量比消費者線程的數量要多,有的線程将會從多個partition中讀取資料;

3) 如果⼀個消費者線程是從多個partition中讀取資料的,那麼讀取的資料的順序将無法保證:例如線程可能會從partition 10中讀取5條資料,然後從partition 11中讀取10條資料,然後⼜從partition 10中讀取5條資料,緊接着依然從partition 10中讀取5條資料。

4) 增加⼀條消費者線程/程序,會導緻kafka重新規劃負載均衡,進而會改變線程和partition之間的對應關系。

5) 當partition中沒有可用資料的時候,線程會阻塞。

2. 底層的消費者API: ⼀般來說,⾼度封裝好的消費者API可以滿足⼤部分的需求。但當你的應用有一些特殊需求時,例如需要重複讀取某條資料,重新開機consumer時需要從頭讀取消息等等,此時,需要使⽤底層的消費者API。

3. 這對Hadoop的消費者API: 這個有點意思:因為從kafka直接導資料到Hadoop是⼀一個很常見的場景,是以kafka還專門提供了相關的API,并已開源到github上了,可以參考。

消息隊列服務

為什麼需要消息服務

  • 解耦
  • ⼴播
  • 錯峰&流控

常用 Message Queue 對比

  • 特性 ActiveMQ RabbitMQ RocketMQ Kafka
    單機吞吐量 萬級,比 RocketMQ、Kafka 低一個數量級 同 ActiveMQ 10 萬級,支撐高吞吐 10 萬級,高吞吐,一般配合大資料類的系統來進行實時資料計算、日志采集等場景
    topic 數量對吞吐量的影響 topic 可以達到幾百/幾千的級别,吞吐量會有較小幅度的下降,這是 RocketMQ 的一大優勢,在同等機器下,可以支撐大量的 topic topic 從幾十到幾百個時候,吞吐量會大幅度下降,在同等機器下,Kafka 盡量保證 topic 數量不要過多,如果要支撐大規模的 topic,需要增加更多的機器資源
    時效性 ms 級 微秒級,這是 RabbitMQ 的一大特點,延遲最低 ms 級 延遲在 ms 級以内
    可用性 高,基于主從架構實作高可用 同 ActiveMQ 非常高,分布式架構 非常高,分布式,一個資料多個副本,少數機器當機,不會丢失資料,不會導緻不可用
    消息可靠性 有較低的機率丢失資料 基本不丢 經過參數優化配置,可以做到 0 丢失 同 RocketMQ
    功能支援 MQ 領域的功能極其完備 基于 erlang 開發,并發能力很強,性能極好,延時很低 MQ 功能較為完善,還是分布式的,擴充性好 功能較為簡單,主要支援簡單的 MQ 功能,在大資料領域的實時計算以及日志采集被大規模使用

消息隊列設計

  • 核心依賴kafka
  • 負載均衡依賴于partition
  • ⾼可用依賴于replication和重新選取leader機制
  • 可靠性傳輸:
    • 消費端:至少一次,需要業務端保證幂等性
    • kafka端: 可參考上文中持久性和副本相關配置

消息順序:

  1. 存入partition 的順序,由上遊服務控制。
  2. Partition 中的順序取出,由kafka保證。
  3. 收到消息的順序由ack機制保證。
  • 即MQ service發送消息之後,等待訂閱者發送确認,收到确認之後再對 kafka 進行commit,讀取下一條。

消息内容

  • event_name(事件名稱)
  • event_data(事件内容)
  • event_time(事件時間)

消息積壓思路:

  • 先修複 consumer 的問題,確定其恢複消費速度,然後将現有 consumer 都停掉。
  • 建立一個 topic,partition 是原來的 10 倍,臨時建立好原先 10 倍的 queue 數量。
  • 然後寫一個臨時的分發資料的 consumer 程式,這個程式部署上去消費積壓的資料,消費之後不做耗時的處理,直接均勻輪詢寫入臨時建立好的 10 倍數量的 queue。
  • 接着臨時征用 10 倍的機器來部署 consumer,每一批 consumer 消費一個臨時 queue 的資料。這種做法相當于是臨時将 queue 資源和 consumer 資源擴大 10 倍,以正常的 10 倍速度來消費資料。
  • 等快速消費完積壓資料之後,得恢複原先部署的架構,重新用原先的 consumer 機器來消費消息。

參考自: 

https://www.orchome.com/kafka/index