天天看點

2018年第52周-Kafka知識點Kafka資料分區和消費者的關系Kafka的資料offset讀取流程Kafka内部如何保證順序,結合外部組建如何保證消費者的順序持久化

預祝2019年元旦節快樂!

2018年最後一周,分享些Kafka的知識點。

Kafka資料分區和消費者的關系

分區(partition)

topic是邏輯概念,分區(partition)是實體概念,對于使用者來說是透明的。producer隻需要關心消息網哪個topic發送,而consumer之關系自己訂閱哪個topic,不需要關心每條消息存于整個叢集的哪個Broker。

為了性能考慮,如果topic的消息都放在一個Broker,這個Broker必然稱為瓶頸,而且無法做到水準擴充。是以topic内的資料分布到整個叢集就是個自然而然的設計了。分區的引入就是解決水準擴充問題的一個解決方案。

Kafka盡量将所有的分區均勻配置設定到整個叢集上。基本算法如下:

  1. 将所有存貨的N個Broker和待配置設定的分區排序。
  2. 将第i個分區配置設定到第(i mon n)個Broker上,這個分區的第一個副本(Replica)存在于這個配置設定的Broker上,并且會作為分區的優先副本。
  3. 将第i個分區的第j個副本配置設定到第(i+j)mod n 個Broker上。

但實際情況Kafka的算法是上述基礎上再加些,看Kafka的函數assignReplicasToBrokers。變了兩點:

  1. 限制了副本因子(replication factor)不得大于Broker的個數。因為當j大于n時,就會存在一個Broker有兩個副本,這沒意義且浪費。
  2. 起始位置不是第0個Broker,是第rand.nextInt(Brokers.size)個。那是因為(i mon n)造成的問題就是,永遠都會有0,未必有n。是以必須加上随機數,也就是第i個分區配置設定到第(i mon n)+rand.nextInt(Brokers.size)個Broker上。
  3. 如果考慮多機架問題,那麼Broker順序就未必是0,1,2,3,4,5。而是如果0,1,2是機架A,3,4,5是機架B。則Broker的順序為0,3,1,4,2,5。錯開,每個機架依次選一次。是以當副本因子為3時,保證每一個分區在各個機架都至少有一個副本。

分區中副本一般都是 Leader,其餘的都是 Follow 副本。生産者消費者都固定在 Leader進行生産和消費。

分區與生産者

 負載均衡(Load balancing)  

生産者直接發送資料到Broker,不需要任何的中間路由層,而接受的Broker是該分區的Leader。

為了幫助生産者實作這一點,所有Broker都可以回答關于哪些是可用伺服器的中繼資料的請求,以及在任何給定的時間内,某個主題的分區的Leader是否允許生産者适當地發送它的請求。

用戶端可以控制往哪個分區生産消息。這可以随機地進行,實作一種随機的負載平衡,或者可以通過一些語義分區函數來實作負載平衡。

Kafka提供了語義分區的接口,允許使用者指定一個分區的key,并使用這個key來做hash到一個分區(如果需要的話,也是可以複寫這分區功能的)。例如,我們選擇user的id作為可用,則是以該使用者的資訊都會發送到同樣的分區。

異步發送(Asynchronus send)

批處理是效率的主要驅動因素之一,為了能夠批處理,Kafka的生産者會嘗試在記憶體中積累資料,然後在一起在一個請求中以大批量的形式發送出去。批處理這個可以設定按固定的消息數量或按特定的延遲(64k或10ms)。這允許累積更多位元組的發送出去,這樣隻是在伺服器上做少量的大IO操作。這種緩沖是可配置的,這樣提供了一種機制來以額外的延遲來提高吞吐量。具體的配置)和生産者的api可以在這文檔中找到。

分區與消費者

消費者的工作方式是,向分區的Leader發送“fetch”請求。在每個請求中消費者指定日志的偏移量(position),然後接受回一大塊從偏移量開始的日志。是以,消費者對偏移量有重要的控制權,如果需要,可以重置偏移量來重新消費資料。

Push和pull

我們首先考慮的一個問題是,消費者應該是從Broker拉取消息,還是應該是Broker把消息推送給消費者。在這方面,Kafka遵循了一種更傳統的設計,大多數消息隊列系統也會用的,那就是資料是從生産者push到Broker,消費者是從Broker拉取資料。一些日志集中系統,如Scribe和Apache Flume,遵循一個非常不同的,基于推送的路徑,将資料被推到下遊。這兩種方法都由利弊,在基于推送的系統,由于是Broker得控制資料傳輸的速率,不同消費者可能要不同的速率。然而消費者一般的目的都是讓消費者自己能夠以最大的速度進行消費,但在基于push的系統,當消費速率低于生産效率時,消費者就不知道該怎麼辦好了(本質上就是一種拒絕服務攻擊(DOS))。一個基于pull的系統就擁有很好的熟悉,消費者可以簡單的調控速率。

基于pull的系統的另一個優點是,它可以對發送給消費者的資料進行聚合的批處理。基于推送的系統必須選擇立即發送請求或積累更多資料,然後在不知道下遊使用者是否能夠立即處理它的情況下發送它。

基于pull的系統的缺點是,如果Broker沒資料,則消費者可能會不停的輪訓。為了避免這一點,我們在pull請求上提供了參數,允許消費者在“長輪訓”中阻塞,直到資料達到(并且可以選擇等待,直到一定數量的自己可以,確定傳輸的大小)。

消費者的Position(Consumer Position)

令人驚訝的是,跟蹤消息是否使用了,是消息隊列系統的關鍵性能點之一。

很多消息隊列系統在Broker中儲存了關于什麼消息是被消費了的中繼資料。也就是說,當消息隊列給消費者時,Broker要麼立即記錄資訊到本地,要麼就是等待消費者的确認。這是一個相當直覺的選擇,而且對于一台機器伺服器來說,很清楚地知道這些消息的狀态。由于許多消息隊列系統中用于存儲的資料結構都很糟糕,是以這(記錄消息狀态)也是一個實用的選擇——因為Broker知道什麼是已經被消費的,是以可以立即删除它,保持資料的大小。

讓Broker和消費者就已經消費的東西達成一緻,這可不是小問題。如果一條消息發送到網絡上,Broker就把它置為已消費,但消費者可能處理這條消息失敗了(或許是消費者挂了,也或許是請求逾時等),這條消息就會丢失了。為了解決這個問題,很多消息隊列系統增加了确認機制。當消息被發送時,是被标志為已發送,而不是已消費;這是Broker等待消費者發來特定的确認資訊,則将消息置為已消費。這個政策雖然解決了消息丢失的問題,但卻帶來了新的問題。第一,如果消費者在發送确認資訊之前,在處理完消息之後,消費者挂了,則會導緻此消息會被處理兩次。第二個問題是關于性能,Broker必須儲存每個消息的不同狀态(首先先鎖住消息以緻于不會讓它發送第二次,其次标志位已消費進而可以删除它)。還有些棘手的問題要處理。如消息被發送出去,但其确認資訊一直沒傳回。

Kafka處理則不一樣。我們的主題被分為一個有序分區的集合,且每個分區在任何給定的時間内隻會被訂閱它的消費者組中的一個消費者給使用。這意味着每個分區中的消費者的position僅僅是一個整數,這是下一次消費時,消息的偏移量。這使狀态(記錄是否被消費)非常小,每個分區隻有一個數字。這個狀态可以被定期檢查。這樣确認一條消息是否被消費的成本就很低。

這樣還附加了一個好處。消費者可以重置其最先的position進而重新消費資料。這雖然違反了隊列的公共契約,但它卻變成關鍵功能給許多消費者。例如,如果消費者代碼有一個bug,并且在一些消息被消費後才被發現,那麼當bug被修複後,消費者就可以重新使用這些消息。

消費組

每群消費者都會被标志有消費組名。有消費組這個概念,Kafka就可以實作類似與工作隊列(Worke Queues)模式和釋出/訂閱(Publish/Subscribe)。

如果消費者都在同一個消費組,則消息則會負載均衡的配置設定每個消費者,一條消息不會配置設定個兩個及以上的消費者。

如果消費者不在同一個組,則消息會被廣播到每一個消費組中。

Kafka的資料offset讀取流程

每個消息在分區中都是被配置設定一個有序的ID數字,而這數字,我們稱之為偏移量(offset)。在一個分區上,offset唯一辨別一個消息。

由每個消費者維護offset。

在Kafka檔案存儲中,同一個topic下有多個不同分區,每個分區為一個目錄,分區命名規則為topic名稱+有序序号,第一個分區序号從0開始,序号最大值為分區數量減1。

partition實體上由多個大小相等的segment組成。segment由2大部分組成,分别為index file和data file,此2個檔案一一對應,成對出現,字尾".index"和“.log”分别表示為segment索引檔案、資料檔案.

segment檔案命名規則:partion全局的第一個segment從0開始,後續每個segment檔案名為上一個segment檔案最後一條消息的offset值。數值最大為64位long大小,19位數字字元長度,沒有數字用0填充。

00000000000000000.index
00000000000000000.log
00000000000368769.index
00000000000368769.log
00000000000737337.index
00000000000737337.log
00000000001105814.index
00000000001105814.log
           

index file的結構:

1,0
3,497
6,1407
8,1686
....
N,position
           

index file結構是兩個數字兩個數字一組,N,position。N用于查找相對于目前檔案名的offset值的N個消息。如00000000000368769.index的3,497,則為368769+3=第368772個消息。而position 497是指data file的偏移量497。

data file由許多message組成,message實體結構如下:

8 byte offset
4 byte message size
4 byte CRC32
1 byte "magic"
1 byte "attributes"
4 byte key length
K byte key
4 byte payload length
value bytes payload
           

這樣的結構,配合index file,很快就可以知道某條消息的大小。

 在partition中如何通過offset查找message

例如讀取offset=368776的message,需要通過下面2個步驟查找。

第一步查找segment file

上述為例,其中00000000000000000000.index表示最開始的檔案,起始偏移量(offset)為0.第二個檔案00000000000000368769.index的消息量起始偏移量為368770 = 368769 + 1.同樣,第三個檔案00000000000000737337.index的起始偏移量為737338=737337 + 1,其他後續檔案依次類推,以起始偏移量命名并排序這些檔案,隻要根據offset 二分查找檔案清單,就可以快速定位到具體檔案。

當offset=368776時定位到00000000000000368769.index|log

第二步通過segment file查找message

通過第一步定位到segment file,當offset=368776時,依次定位到00000000000000368769.index的中繼資料實體位置和00000000000000368769.log的實體偏移位址,然後再通過00000000000000368769.log順序查找直到offset=368776為止。

從上述圖3可知這樣做的優點,segment index file采取稀疏索引存儲方式,它減少索引檔案大小,通過mmap可以直接記憶體操作,稀疏索引為資料檔案的每個對應message設定一個中繼資料指針,它比稠密索引節省了更多的存儲空間,但查找起來需要消耗更多的時間。

Kafka高效檔案存儲設計特點

  • Kafka把topic中一個parition大檔案分成多個小檔案段,通過多個小檔案段,就容易定期清除或删除已經消費完檔案,減少磁盤占用。
  • 通過索引資訊可以快速定位message和确定response的最大大小。
  • 通過index中繼資料全部映射到memory,可以避免segment file的IO磁盤操作。
  • 通過索引檔案稀疏存儲,可以大幅降低index檔案中繼資料占用空間大小。

注: 稀疏索引類似于帶一級索引的跳表,但是一級索引是數組可以使用二分法查找。

注:mmap()函數是Linux的檔案空間映射函數,用來将檔案或裝置空間映射到記憶體中,可以通過映射後的記憶體空間存取來獲得與存取檔案一緻的控制方式,不必再使用read(),write()函數。

mmap和正常檔案操作的差別

回顧一下正常檔案系統操作(調用read/fread等類函數)中,函數的調用過程:

  1. 程序發起讀檔案請求。
  2. 核心通過查找程序檔案符表,定位到核心已打開檔案集上的檔案資訊,進而找到此檔案的inode。
  3. inode在address_space上查找要請求的檔案頁是否已經緩存在頁緩存中。如果存在,則直接傳回這片檔案頁的内容。
  4. 如果不存在,則通過inode定位到檔案磁盤位址,将資料從磁盤複制到頁緩存。之後再次發起讀頁面過程,進而将頁緩存中的資料發給使用者程序。

總結來說,正常檔案操作為了提高讀寫效率和保護磁盤,使用了頁緩存機制。這樣造成讀檔案時需要先将檔案頁從磁盤拷貝到頁緩存中,由于頁緩存處在核心空間,不能被使用者程序直接尋址,是以還需要将頁緩存中資料頁再次拷貝到記憶體對應的使用者空間中。這樣,通過了兩次資料拷貝過程,才能完成程序對檔案内容的擷取任務。寫操作也是一樣,待寫入的buffer在核心空間不能直接通路,必須要先拷貝至核心空間對應的主存,再寫回磁盤中(延遲寫回),也是需要兩次資料拷貝。

而使用mmap操作檔案中,建立新的虛拟記憶體區域和建立檔案磁盤位址和虛拟記憶體區域映射這兩步,沒有任何檔案拷貝操作。而之後通路資料時發現記憶體中并無資料而發起的缺頁異常過程,可以通過已經建立好的映射關系,隻使用一次資料拷貝,就從磁盤中将資料傳入記憶體的使用者空間中,供程序使用。

總而言之,正常檔案操作需要從磁盤到頁緩存再到使用者主存的兩次資料拷貝。而mmap操控檔案,隻需要從磁盤到使用者主存的一次資料拷貝過程。說白了,mmap的關鍵點是實作了使用者空間和核心空間的資料直接互動而省去了空間不同資料不通的繁瑣過程。是以mmap效率更高。

函數原型

void *mmap(void *start, size_t length, int prot, int flags, int fd, off_t offset);
           

也就是可以将大資料的檔案,局部映射到記憶體中,在記憶體中進行此部分檔案的操作。對此記憶體操作,都不涉及到核心空間到使用者空間之間互動。直接操作記憶體,記憶體直接寫入(讀取)檔案。就隻有一次IO。 如果是普通檔案操作,則需要檔案複制到核心,再由核心複制到使用者空間,使用者空間才能操作。進而達到零拷貝。

換句話說,但凡是需要用磁盤空間代替記憶體的時候,mmap都可以發揮其功效。

Kafka内部如何保證順序,結合外部組建如何保證消費者的順序

Kafka中每個分區都是有序,由于Kafka的消息是不可變的,是以都是追加的形式有序的往上加消息。這個結構體叫 結構化送出日志(a structured commit log)。

首先就要考慮是否真的需要所有消息在隊列都得有序。一般情況,不止一般,而是很大一部分,是可以無序的。就跟分布式一樣。有很多業務,看起來是同步的,靜下來慢慢思考,就會發現很多東西是可以異步執行的。

如果實在有這樣保證順序的需要,保證生産者需将有序地送出給一個分區,首先是生産者不能送出錯順序。其次,消費者組就不能擁有兩個或以上消費者執行個體了。連兩個或以上的消費者組也不能有。

持久化

Kafka會根據保留時間這參數,持久化所有已經收到的消息。雖然可以設定保留時間這參數,但是Kafka優秀的性能,添加删除都是常量級的性能,是以理論上,資料儲存很長時間也不成問題。

參考:

http://kafka.apache.org/

https://www.zhihu.com/questio...

https://blog.csdn.net/yangyut...

http://www.cnblogs.com/huxiao...

https://www.cnblogs.com/ITtan...

稀疏索引:https://blog.csdn.net/qq_2223...

跳表:https://www.jianshu.com/p/dc2...

繼續閱讀