天天看點

kafka

  雖然一直在使用kafka,但是還沒有系統的整理過kafka的原理,學習每個架構都要掌握其應用、原理和設計理念,這樣才能舉一反三,學為所用。今天就整理一些kafka相關的知識,一是為了加強記憶,二是通過整理再次回顧一下作者的設計思想。

kafka的定位:消息中間件、分布式實時流處理平台:(1)結合hadoop和hbase離線資料庫做資料內建(2)實時流計算。

這裡先說下kafka幾個重要參數:

              (1)heartbeat.interval.ms——consumer每隔多長時間向coordinator發送心跳

              (2)session.timeout.ms——consumer最大檢測失效時間,如果在配置時間内coordinator沒有收到該consumer的心跳,

       則将該consumer從group中移除,進而觸發消費組對消費分區的重新配置設定(rebalance)

              (3)max.poll.records——一次poll的最大消息條數

              (4)max.poll.interval.ms——兩次poll之間的最大間隔時間,如果兩次poll超過配置時間,coordinator會将将該consumer從group中移除,

       進而觸發消費組對消費分區的重新配置設定(rebalance)

1、kafka整體設計

                            

kafka

   借用官網一張圖來說明各個角色的定義:

  producer:producer即生産者,消息的産生者,是消息的入口。

  broker:broker是kafka執行個體,每個伺服器上有一個或多個kafka的執行個體。每個kafka叢集内的broker都有一個唯一的編号。

  topic:消息的主題,可以了解為消息的分類,kafka的資料就儲存在topic。在每個broker上都可以建立多個topic。

  partition:topic的分區,每個topic可以有多個分區,分區的作用是做負載,提高kafka的吞吐量。同一個topic在不同的分區的資料是不重複的,partition的表現形式就是一個一個的檔案夾。檔案夾中包括消息檔案,還有兩個索引檔案:一個以消息在磁盤的偏移量(offset)為索引,一個以消息發送時間為索引。

  segment:也叫段,是對partition的進一步細化,根據檔案大小将分區進行拆分,一個為了進一步提高對同一個分區的讀寫能力,二是為了友善根據索引檔案進行查找。

                                                         

kafka

   replication:每一個分區都有多個副本,副本的作用是做備胎。當主分區(leader)故障的時候會選擇一個備胎(follower)上位,成為leader。在kafka中預設副本的最大數量是10個,且副本的數量不能大于broker的數量,follower和leader絕對是在不同的機器,同一機器對同一個分區也隻可能存放一個副本(包括自己)。隻有leader分區可以支援讀寫,也就是生成者隻能向leader分區寫入,消費者隻能從leader分區讀取,其他副本隻是備份(高可用),這樣設計避免了複雜的主從同步導緻的讀寫不一緻問題。

  message:消息存儲在log檔案中,包含消息體、消息大小、offset、壓縮類型等。

    offset:一個占8byte的有序id号,唯一辨別消息的位置。

    消息大小:消息大小占4byte,辨別消息的大小。

    消息體:被壓縮過的具體消息内容。

  consumer:消費者,即消息的消費方,是消息的出口。

  consumer group:可以将多個消費組組成一個消費者組,同一個分區的資料隻能被消費者組中的某一個消費者消費。同一個消費者組的消費者可以消費同一個topic的不同分區的資料,這也是為了提高kafka的吞吐量。(舉個例子:一個消費者組中3個消費者,一個topic中3個分區,則3個消費者各自消費一個分區,如果該組中4個消費者,則有一個消費者不會被配置設定任何分區,如果該組中2個消費者,則有一個消費者會消費兩個分區,這麼設計也是為了避免争搶産生的并發問題)。消費者的設計主要有兩個作用:一是一個分區隻能被同一個組中的某一個消費者消費,避免競争;二是以消費者組次元來記錄分區被消費的位置,避免重複消費。

  zookeeper:kafka叢集依賴zookeeper來儲存叢集的的元資訊,來保證系統的可用性。

2、生産者

  kafka将發送的消息先存儲在緩沖區(其實就是一個concurrenthashmap)中,然後批量發送,由兩個參數控制:(1)消息達到n條發送(2)如果沒達到n條但是達到指定時間發送。

1、發送流程

      

kafka

(1)攔截器,實作producerinterceptor,可以針對消息發送的生命周期實作特定邏輯

(2)累加器就是一個concurrenthashmap(也有叫頁緩存),用來在記憶體中緩存消息。

2、如何确認消息發送成功

       leader分區向producer傳回ack即确認消息發送成功,可以設定ack屬性。

(1)pros.put(“ack”,0),ack=0,不需傳回ack

(2)pros.put(“ack”,1),ack=1,隻要leader落盤就傳回ack

(3)pros.put(“ack”,-1),ack=-1或all,leader和所有有效follower全部落盤才傳回ack,設定ack時要注意重試次數會不會導緻消息重複問題,pros.put(“retries”,n)

       什麼是有效副本呢?

(1)isr:in-sync replica set(有效副本集合)

(2)replica.lag.time.max.ms(超過該時間沒有向leader同步資料,則會被移除isr,等恢複了再加入到isr)

3、消息存儲

    

kafka

  一個topic中各個leader分區均勻地分布在叢集中的各個broker上,這樣做的目的:

  (1)均衡broker的請求壓力,因為leader分區承擔讀寫任務,follower副本隻同步leader的資料。

       (2)防止leader分區過度集中在某個broker上,一旦該broker出現故障,會出現多個leader分區選舉行為。

       具體算法:所有broker組成brokerlist(同一個叢集中的每個broker有一個唯一編号),第一個leader分區從brokerlist随機選一個broker,第二個leader分區選擇順序下一個broker,以此類推,follower副本是随機分布的。例如:3分區,2副本,3broker

kafka

   part0-leader随機選擇了broker2,則part1-leader會放在broker3上,part2-leader會輪回放在broker1上,其他副本會随機配置設定。

 分區的拆分:

  為防止一個log檔案過大,導緻索引查找和讀取較慢,分區會拆分成segment(段),拆分規則:

    (1)按時間拆分(預設1周):log.roll.hour/ms

    (2)按大小拆分(預設1g):log.segment.bytes

    (3)按索引大小拆分:log.index.size.max.bytes

      

kafka

  偏移量就是消息在log檔案中的具體位置,偏移量索引采用的稀疏索引,如:

       稀疏度設定:log.index.interval.bytes(預設4kb),稀疏度越小索引越密集,檢索越快,但是會消耗更多存儲空間。

       時間索引中的消息時間可以使用producer發送的時間或是消息的落盤時間。

消息清理政策:

       開關:log.cleaner.enable=true

       政策:log.cleanup.policy=delete(預設)/compact——壓縮

    compact的做法:産出重複的key,隻保留key對應的最新的值。

       周期:log.retention.check.interval.ms=300000(預設5分鐘),定時任務每5分鐘檢查一次,清理過期消息。

  清理條件:

         (1)基于時間:預設168小時(一周),即一周前的日志為過期日志

         (2)基于檔案大小:預設不限制大小

 4、高可用之leader副本選舉

  早期kafka使用zk進行leader選舉(使用zk的watch機制和臨時有序節點實作)。

       後期放棄的原因:當分區和副本較多時,會在zk上建立大量的節點,一旦某個broker挂掉,會觸發大量的watch事件,産生驚群效應。

1、broker controller(控制器)

       分區副本的選舉由broker controller(也是一個broker)處理,那麼叢集中哪個broker可以成為controller呢?這就涉及到controller的選舉:

       叢集中的broker在啟動時會到zk上建立臨時節點/controller,建立成功的則會成為controller(一般第一個啟動的broker會成為controller)。其他的broker建立節點失敗,會注冊watch事件,一旦目前controller出現故障就會觸發所有broker注冊的watch事件,其他節點又會争搶着建立/controller節點,建立成功的則成為新的controller。

       叢集中每選舉一次控制器,就會通過zookeeper建立一個controller epoch,每一個選舉都會建立一個更大,包含最新資訊的epoch,如果有broker收到比這個epoch舊的資料,就會忽略它們,kafka也通過這個epoch來防止叢集産生“腦裂”。

2、分區副本的選舉

       isr:同步正常的副本集合

       osr:同步異常的副本集合

kafka

   隻有isr集合中副本可以參與選舉,如果isr為空,此時如果“允許非正常副本參與選舉”開關打開(預設false),也會從osr中進行選擇,但可能會造成資料丢失。

選舉算法的選擇:

  paxos——>由paxos演變而來的raft,zab(zk使用),這類算法概括一下:先到先得,少數服從多數。

       kafka沒有使用類似raft這種算法,原因:這種算法依賴于節點間的通信,一旦節點間不能正常通信,可能發生腦裂,出現多個leader副本。

       了解kafka選舉算法要先了解幾個概念:

         leo(log end offset):下一條等待寫入的消息的偏移量offset(最新的offset+1)

         hw(high watermark):isr中最小的leo

         hw的作用:消費者消費offset小于hw的消息(確定主從消息一緻,但是可能會丢消息)。

kafka

    使用hw限制消費區間的原因:為了防止leader副本挂掉,消息被超前消費。如果不限制,如果leader故障,消費者已經消費了offset=7的消息,但是其他的replica還沒來得及同步offset=7的消息,當replica0或replica1成為新的leader就會出現消息被超前消費的問題,這也是kafka保證主從同步一緻的一種手段。

主從如何保持同步:

       (1)follower節點向leader節點發送一個fetch請求,leader向follower發送資料

       (2)follower接收到資料響應後,依次寫入消息并更新自己的leo

       (3)leader更新hw(isr中最小的leo)

故障處理:

  (1)follower故障:

        follower故障恢複後,開始從leader同步消息,此時它首先會截掉本地記錄的hw之後的消息,避免消息不一緻。比如replica0故障恢複後,會截掉offset=5 和6的消息,再從5開始從leader同步,等追上之後又會加入到isr集合中。

  (2)leader故障:

           選取isr中順位第一的follower成為新leader(類似于微軟的pacifica算法)

5、消費者

  kafka消息拉取:隻支援pull方式,不支援push方式(因為kafka的定位就是處理大量資料)。kafka提供了一個簡潔的poll方法來拉取消息,但是其實作了協作、分區重平衡、心跳、資料拉取等功能。

消費者如何記錄目前消費位置:

  消息在.log檔案中都是順序存儲的,每個消息都有一個偏移量(offset),每個消費者組消費到目前分區的哪個位置都會記錄下來,避免消費者重新開機消息又從頭開始消費。消費者組在各個分區的消費位置會被存儲在一個特定的topic中(_consumer_offsets_*),該topic預設50個分區,消費者組與分區位置具體放在哪個這50個分區中的哪個分區呢?是将topic名稱進行hash後對50取模,是以如果将消費者組換個名稱,則整個組将重新消費。

  消費組可以選擇從目前位置或從頭開始消費消息

  位移送出commit:更新消費的offset

       由消費端enable.auto,commit屬性設定:

    true:調用poll方法後每個5秒(有auto.commit.interval.ms指定)送出一次位移

    false:手動送出,consumer.commitasync()異步送出,consumer.commitsync()同步送出。

      同步送出:發起送出時調用方會阻塞,如果伺服器傳回送出失敗會重試,直到成功或抛異常(适合消息量不太大,且對重複消費限制較嚴格的場景)

      異步送出:調用方不會阻塞,送出失敗不會重試,但可以通過callback判斷本次送出成功/異常(适合消息量大,允許重複消費的場景)

  指定位移消費seek,可以追蹤之前的消費或回溯消費

消費的分區:

  1、分區政策

  (1)rangeassignor配置設定政策(預設配置設定政策)

    rangeassignor政策的原理是按照消費者總數和分區總數進行整除運算來獲得一個跨度,然後将分區按照跨度進行平均配置設定,以保證分區盡可能均勻地配置設定給所有的消費者。對于每一個topic,rangeassignor政策會将消費組内所有訂閱這個topic的消費者按照名稱的字典序排序,然後為每個消費者劃分固定的分區範圍,如果不夠平均配置設定,那麼字典序靠前的消費者會被多配置設定一個分區。

    假設有10個分區,3個消費者,排完序的分區将會是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消費者線程排完序将 會是c1-0, c2-0, c3-0。然後将partitions的個數除于消費者線程的總數來決定每個消費者線程消費幾個 分區。如果除不盡,那麼前面幾個消費者線程将會多消費一個分區。在我們的例子裡面,我們有10個分 區,3個消費者線程,10 / 3 = 3,而且除不盡,那麼消費者線程 c1-0 将會多消費一個分區的結果看起來是這樣的:

          c1-0 将消費 0, 1, 2, 3 分區

          c2-0 将消費 4, 5, 6 分區

          c3-0 将消費 7, 8, 9 分區  

     這個分區也有一個缺點,如果topic特别多,那麼第一個消費者可能會多消費很多個分區,壓力會增加。

  (2)roundrobinassignor(輪詢分區)

    輪詢分區政策是把所有partition和所有consumer線程都列出來,然後按照hashcode進行排序。最後通 過輪詢算法配置設定partition給消費線程。如果所有consumer執行個體的訂閱是相同的,那麼partition會均勻 分布。

    如果同一個消費組内所有的消費者的訂閱資訊都是相同的,那麼roundrobinassignor政策的分區配置設定會是均勻的。舉例,假設消費組中有2個消費者c0和c1,都訂閱了主題t0和t1,并且每個主題都有3個分區,那麼所訂閱的所有分區可以辨別為:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最終的配置設定結果為:

          ​<code>​消費者c0:t0p0、t0p2、t1p1​</code>​

​<code>​          消費者c1:t0p1、t1p0、t1p2​</code>​

    如果同一個消費組内的消費者所訂閱的資訊是不相同的,那麼在執行分區配置設定的時候就不是完全的輪詢配置設定,有可能會導緻分區配置設定的不均勻。如果某個消費者沒有訂閱消費組内的某個topic,那麼在配置設定分區的時候此消費者将配置設定不到這個topic的任何分區。

    比如消費組内有3個消費者c0、c1和c2,它們共訂閱了3個主題:t0、t1、t2,這3個主題分别有1、2、3個分區,即整個消費組訂閱了t0p0、t1p0、t1p1、t2p0、t2p1、t2p2這6個分區。具體而言,消費者c0訂閱的是主題t0,消費者c1訂閱的是主題t0和t1,消費者c2訂閱的是主題t0、t1和t2,那麼最終的配置設定結果為:

          ​<code>​消費者c0:t0p0​</code>​

​<code>​          消費者c1:t1p0​</code>​

​<code>​          消費者c2:t1p1、t2p0、t2p1、t2p2​</code>​

    是以當消費組内的消費者訂閱不同主題時,輪詢配置設定不是很完美。

  (3)strickyassignor 配置設定政策(粘滞政策)

    它主要有兩個目的:

      分區的配置設定盡可能的均勻

      分區的配置設定盡可能和上次配置設定保持相同

      當兩者發生沖突時, 第一個目标優先于第二個目标。它的實作比前兩種政策都更加複雜,但是配置設定結果更加優異。

  2、分區再平衡(rebalance)

  (1)由誰來執行 rebalance 操作?由誰來管理消費端 consumer 的 group ?

    kafka 提供了一個 角色:coordinator。coordinator 來完成對消費端 group 的管理。當 consumer group 的第一個 consumer啟動的時候,它會去和 kafka server 去确定到底誰是它們組的 coordinator。之後該 group 組内的所有成員都會和該 coordinator 進行通信。

  (2)consumer group 如何确定自己的 coordinator是誰?

    當消費者向 kafka 叢集中的任意一個 broker 發送一個 groupcoordinatorrequest 請求,kafka server 服務端會傳回一個目前負載最小的 broker 節點的 id,并将該 id 所對應的的 broker 節點設定為目前 consumer group 的 coordinator。

  (3)rebalance過程

    整個rebalance的過程分為兩個步驟:①joingroup過程 和 ②synchronizing group state 階段。

  第一步:确定 coordinator

  第二步:joingroup過程,表示消費者加入到consumer group中的過程。

    當确定了 coordinator 之後,所有的 consumer 都會向 coordinator 發送一個 joingroup 請求(隻要啟動,所有消費者都會發送該請求)。此時 coordinator 會從 consumer group 中選取一個 consumer 擔任 leader 角色,并把組成員資訊和訂閱的消息發送給所有的消費組

kafka

  第三步:synchronizing group state 階段 

    完成分區配置設定之後,就進入了 synchronizing group state階段。該階段主要完成 leader 将消費者對應的 partition 配置設定方案同步給consumer group 中的所有 consumer。每一個消費者,都會向 coordinator 發送一個 syncgrouprequest 請求。請求内容:包含group_id、member_id、generation_id 。在 leader 層面,還會有一個 member_assignment 内容。

    每個消費者,還會向 coordinator 發送 syncgroup 請求,不過隻有 leader 節點會發送配置設定方案,其他消費者也會發送配置設定方案,不過發送内容都是空,隻是打打醬油而已。當 leader 把方案發給 coordinator 以後,coordinator 會把結果設定到 syncgroupresponse 中。這樣所有成員都知道自己應該消費哪個分區。

kafka

    kafka 每個用戶端,在收到分發政策 syncgroupresponse 後,會根據傳回結果去執行。consumer group 的分區配置設定方案是在用戶端執行的。kafka 将這個權利下放給用戶端主要是因為這樣做可以有更好的靈活性。

6、kafka的有點

(1)高吞吐、低延遲

       從底層資料處理來看kafka能夠快速處理海量資料的原因:

       首先了解一個硬體相關的概念,dma,它是協處理器,主要是代理cpu處理資料傳輸,來減少cpu在資料i/o上的等待時間。

       來看一個kafka consumer消費資料的過程: 

          file.read(filedesc, buf, len);

          socket.send(socket, buf, len);

  在這個過程中,資料一共發生了四次傳輸的過程。其中兩次是 dma 的傳輸,另外兩次,則是通過 cpu 控制的傳輸。

    第一次傳輸,是從硬碟上,讀到作業系統核心的緩沖區裡。這個傳輸是通過 dma 搬運的。

    第二次傳輸,需要從核心緩沖區裡面的資料,複制到我們應用配置設定的記憶體裡面。這個傳輸是通過 cpu 搬運的。

    第三次傳輸,要從我們應用的記憶體裡面,再寫到作業系統的 socket 的緩沖區裡面去。這個傳輸,還是由 cpu 搬運的。

    最後一次傳輸,需要再從 socket 的緩沖區裡面,寫到網卡的緩沖區裡面去。這個傳輸又是通過 dma 搬運的。

       kafka通過調用java的nio庫,将4次傳輸減少到2次,具體過程如下:

    第一次,是通過 dma,從硬碟直接讀到作業系統核心的讀緩沖區裡面。

    第二次,則是根據 socket 的描述符資訊,直接從讀緩沖區裡面,寫入到網卡的緩沖區裡面。

   傳統io:

        

kafka

  kafkaio:

kafka

(2)、高伸縮性

       分區partition機制,通過增加分區可以進行橫向擴充。

(3)、持久性、可靠性

       副本機制,每個分區都會有副本,leader分區挂了可以使用副本(follower分區)保證資料不丢失。

(4)、容錯性

       副本的選舉功能,leader分區挂了,follower分區會重新選舉出leader。

(5)、高并發

  (1)頁緩存,消息先儲存在kafka系統記憶體中,滿足條件再一次性發送到broker

  (2)磁盤順序寫入

  (3)零拷貝(也就是第一點說的資料傳輸)