天天看點

Kafka面試18連環炮:30+圖帶您看透Kafka

作者:JAVA後端架構
Kafka面試18連環炮:30+圖帶您看透Kafka

有網友提到RabbitMQ受限于開發語言,比較難以一探究竟,而RocketMQ對于Java開發人員來說更加觸手可得。在發表了RocketMQ的文章之後,有幾個網友回報可否出一篇Kafka的文章,是以我就寫了這篇文章。那麼話不多說,我們開始吧。

本文将帶您了解以下問題:

  • Kafka是如何存儲和檢索消息的?(log檔案,index索引檔案,timeindex索引檔案)
  • Kafka是如何基于offset查找消息的?
  • Kafka有哪些日志清理政策?什麼場景下會用到?
  • ISR是幹嘛的?
  • Kafka總控制器是幹嘛的?如何選舉出來的?
  • Topic的最優Leader副本是如何選舉出來的?
  • 什麼時候會觸發消費的Rebalace?Kafka中有哪些Rebalance政策?
  • Rebalance是如何工作的?
  • Kafka是如何保證資料可靠性的?
  • Kafka是如何保證資料一緻性的?
  • 消費者是如何送出offset的?
  • 有哪些消費曆史消息的方法?
  • Kafka為啥性能這麼高?
  • Kafka如何避免重複消費?
  • Kafka如何處理消息堆積?
  • 如何保證消息順序性?
  • Kafka如何實作消息傳遞保障?
  • Kafka有哪些關鍵的生産者和消費者參數?

本文主要内容:

Kafka面試18連環炮:30+圖帶您看透Kafka

Kafka是一個分布式實時事件流平台,主要提供了關鍵功能:

  • 釋出和訂閱事件流,事件記錄被存儲起來,是以消費應用程式可以提取他們需要的資訊,并跟蹤曆史儲存的所有消息;
  • 支援高吞吐量;
  • 可以彈性和透明的擴容,無需停機;
  • 将事件流存儲在磁盤上,并在分布式叢集中實作多副本存儲,以實作容錯,支援配置事件記錄資料存儲的時長;
  • 基于Zookeeper的同步控制器,以保持主題、分區和中繼資料的高可用(不過在2.8版本之後,可以使用基于 Kafka Raft 的 Quorm 控制器取代基于Zookeeper的控制器)。

如果對Kafka不是很了解,看到上面功能清單,大家可能會比較茫然,不過沒關系,接下來的文章保證給大家徹底講明白,一看就懂,看不懂就當我沒說。

下面看看Kafka的整體架構以及關鍵元件。

1. Kafka整體架構

Kafka整體架構圖如下:

Kafka面試18連環炮:30+圖帶您看透Kafka

Kafka基本概念:

  • Broker:Kafka以Broker叢集的方式運作,一個Kafka節點就是一個Broker。理論上可以跨越多個資料中心。Broker負責資料複制,管理主題、分區、消費偏移量等。如果要跨越多個資料中心,資料中心之間的網絡延遲需要非常低,因為Kafka Broker之間以及Broker和Zookeeper伺服器之間有大量的通信。
  • 在上圖中,Kafka叢集中包含3個Broker。
  • Topic:即主題,與RocketMQ的Topic類似,使用Topic對消息進行分類,Kafka接收到的每條消息都會放入到一個Topic中。
  • Topic代表釋出和消費記錄的端點。生産者向主題釋出消息,消費者訂閱主題進行消費消息;
  • 每條記錄有一個鍵,一個值,一個時間戳和一些中繼資料組成;
  • 在未指定分區的情況下釋出消息時,将使用鍵的散列選擇分區。
  • Producer:消息生産者,負責向Broker發送消息;
  • Consumer:消息消費者,從負責Broker讀取并消費消息;
  • ConsumerGroup:消費分組,對于同一個主題,可以被多個消費分組分别消費,每個消費分組有自己的消費偏移量,互不影響;
  • Partition:分區,對Topic的資料進行分布式存儲的最小機關。

再次說明下關鍵點:Kafka每個分區的消息存在在CommitLog檔案中,每個Consumer各自維護各自對CommitLog的消費進度(offset),可以從頭到尾消費消息,也可以指定offset來重複消費消息,或者跳過某些消息。

接下來的文章會詳細講解這些特性。

2. Kafka存儲架構

在介紹RocketMQ的時候,高并發異步解耦利器:RocketMQ究竟強在哪裡?這篇文章中,我們介紹了RocketMQ的存儲架構,由于RocketMQ是基于Kafka改造而來的,是以Rocket與Kafa的存儲架構很相似。這裡對比下:

  • RocketMQ是把Topic分片存儲到各個Broker節點中,然後在把Broker節點中的Topic繼續分片為若幹等分的ConsumeQueue,進而提高消息的吞吐量。ConsumeQueue是作為負載均衡資源配置設定的基本單元;
  • 類似的,Kafka的Topic以Partition為機關,分片存儲到各個Broker節點中,一個Broker節點可以存儲多個Partition,Partition是作為Kafka負載均衡資源配置設定的基本單元。

還沒有深入了解RocketMQ的朋友,可以看看高并發異步解耦利器:RocketMQ究竟強在哪裡?更多圖解系列文章,歡迎關注我的關注公衆号:Java爛豬皮。

2.1 Kafka分區檔案存儲方式

Kafka的Partition類似于RocketMQ的ConsumeQueue。随便檢視某一個Topic Partition下的檔案:

Kafka面試18連環炮:30+圖帶您看透Kafka

我們重點看看index, log, timeindex這三個檔案。

log檔案有點像RocketMQ的commitlog檔案,但是Kafka是以分區為次元進行存儲的,RocketMQ存儲的則是整個Broker的所有消息。

每個Partition分區下面是由多個Segment(段)組成的,Segment是邏輯概念,實際上會對應到上面的三個檔案:

Kafka面試18連環炮:30+圖帶您看透Kafka
  • log:資料檔案,存儲實際的消息資料;
  • index:索引檔案,存儲消息資料的索引;
  • timeindex:索引檔案,提供時間次元的檢索。

Segment檔案的命名規則:Partition的第一個Segment檔案從0開始,後續每生成一個新的Segment檔案的時候,檔案名以目前Partition的最大offset為基準,檔案名長度為64位long類型。

Segment生成相關配置:

  • log.segment.bytes: 每個segment的大小,達到這個大小後會建立一個新的segment,預設是1G;
  • log.segment.ms: 配置每隔多少毫秒産生一個新的segment,預設是7天。

2.2 log資料檔案

log檔案存儲實際的消息資料,可以通過參數log.segment.bytes指定一個log檔案大小,log檔案的消息是順序寫的。

2.3 index索引檔案

index:是一個稀疏索引,預設的,Kafka每接收4k(可通過log.index.interval.bytes參數配置)就記錄目前一條消息的offset和消息在log日志中的實際位置到index索引檔案。也就是說,Kafka是采用稀疏索引來實作資訊檢索的,如下圖,Kafka會把offset為3,7,10的消息的offset以及在log檔案中的實際位置存入index檔案中:

Kafka面試18連環炮:30+圖帶您看透Kafka

我們可以通過以下指令檢視index檔案的内容:

Kafka面試18連環炮:30+圖帶您看透Kafka
log.index.interval.bytes:索引條目區間密度,預設4k,每接收4k就記錄目前一條消息的offset。增加索引條目的區間密度會影響索引檔案的區間密度和查詢效率。

2.3.1 KAFKA是如何基于OFFSET查找消息的 ?

當我們要根據offset在log檔案中查找消息的時候,首先會根據offset定位到具體的Segment,然後去查找Segment中的index檔案,通過二分查找快速定位到offset的存儲範圍在log檔案中的起始位址;當拿到起始位址之後,從log檔案的起始位址開始順序查找,直到找到比對的offset的消息:

Kafka面試18連環炮:30+圖帶您看透Kafka

index相關配置:

  • log.index.interval.bytes:索引間隔,即每接收多少資料會記錄一個索引,預設為4k;

2.4 timeindex索引檔案

存儲消息時,除了會維護index索引檔案,也會維護timeindex索引檔案,timeindex同樣是稀疏索引,timeindex索引檔案存儲消息發送的時間點以及offset。

2.4.1 KAFKA是如何基于時間查找消息的?

要通過時間戳a查找消息:

  • 首先會根據時間戳a基于時間戳索引定位到具體的Segment,定位方法:
  • 将時間戳a與每個Segment的timeindex中最大時間戳對比,找到最大時間戳不小于時間戳a的記錄,如果找到了,則繼續按以下步驟在這個Segment中查找消息;
  • 使用二分法查找timeindex檔案,找到不大于時間戳a的最大索引項,進而擷取到該索引項存儲的offset;
  • 使用offset二分查找index檔案,找到不大于offset的最大索引項的log檔案實體位置p;
  • 在log檔案中定位到實體位置p,開始查找不小于時間戳a的消息。

如下圖,要基于時間戳1636773676499查找消息,先定位到具體的Segment,然後按以下步驟查找:

Kafka面試18連環炮:30+圖帶您看透Kafka
  • 在timeindex中查找時間戳不大于1636773676499的最大記錄,最終找到1636773676498,對應的offset為7;
  • 在index中查找offset不大于7的最大索引項的log檔案實體位置,這裡即為offset=7的索引的log檔案實體位址p;
  • 到log檔案中定位到實體位址p,開始查找時間戳不小于1636773676499的記錄,找到第一條,就是我們要找的消息。

2.5 Kafka的日志清理政策是怎樣的?

Kafka的日志清理政策cleanup.policy有兩種:Delete政策和Compact政策。

2.5.1 DELETE政策

預設的的政策,當Segment的不活躍時間大于設定的時間的時候,就删除對應的Segment。具體配置參數:

  • retention.bytes:總的segment的大小限制,超過這個值之後,會删除舊的segment。預設為-1,表示無大小限制;
  • retention.ms:Segment最後一次寫入日志記錄的時間與目前時間的時間差,如果超過配置的值,則删除這個Segment。預設是168h,即7天;
  • log.retention.check.interval.ms:檢查是否有可删除日志的間隔時間,預設是300s,5分鐘;
  • file.delete.delay.ms:删除延遲時間,在真正删除檔案之前,繼續保留檔案的時間,預設為1分鐘。

2.5.1.1 如果日志增長很慢,delete政策下如何配置才能觸發檔案清理?

在delete政策下,我們如果要日志保留3天,可以這樣設定:

1retention.ms: 259200000 # 3天           

但是如果日志檔案增長很慢,3天之後,日志檔案大小還沒有達到retention.bytes的值,那麼就不會生成新的Segment檔案,仍然用的是同一個Segment檔案,是以不能直接删除Segment檔案。

如果想要真正達到清理3天之前的日志的效果,就需要優化一下配置了,可以添加設定:

1segment.ms: 86400000 # 24小時           

這樣,每隔24小時,隻要有新資料進來,就會産生新的Segment,進而可以觸發retention.ms的三天清除政策了。

總結:對于寫速度很慢的Topic,為了優化存儲,需要控制:segment.ms < retention.ms。

2.5.2 COMPACT政策

在這種模式下,日志不會被删除,但會被去重清理。這種模式下要求每個日志記錄都必須有key,kafka按照一定的時機清理Segment中的key:對于同一個key,隻保留最新的那個key。

每個Partition的日志,以Segment為機關,會被分為兩部分,已清理和未清理的部分。未清理的部分又可以分為可清理和不可清理。

對于compact清理政策,Segment可清理部分的清理思路是這樣的:

Kafka面試18連環炮:30+圖帶您看透Kafka

Kafka根據key來去重合并,對于可清理的部分,每個key保留一個最新的值。如果清理後的Segment太小,Kafka會按照一定的政策合并這些Segment,避免Segment過于碎片化。

更多圖解系列文章,歡迎關注我的關注公衆号:Java爛豬皮。

2.5.2.1 什麼情況下會用到compact政策政策?

比如,當我們按照一定的邏輯計算到每個使用者的粉絲數,并且每幾分鐘就更新一次,把使用者的粉絲數都存到Kafka中,任何需要使用者粉絲數的業務都可以從Kafka擷取資料。

此時就不能使用delete政策了,因為資料不能删,但是每次重複計算之後,使用者粉絲數都會多一份資料,我們隻是需要最新的那一個粉絲數,為此,可以把使用者id作為key,通過使用compact政策,把重複的曆史使用者粉絲數給清理掉。

更多關于compact測量隊配置參數:

  • min.cleanable.dirty.ratio:可以進行compact的髒資料的比例;
  • dirtyRatio = dirtyBytes / (cleanBytes + dirtyBytes),其中dirtyBytes表示可被清理部分的日志大小,cleanBytes表示已清理部分的日志大小。預設值是0.5,即髒資料達到了總資料的50%才進行清理,這樣配置可以減少清理次數,提高清理的成本效益,如果需要更及時的清理政策,可用調低該值;
  • min.compaction.lag.ms:設定一條消息投遞到Kafka後,多久時間内不會被compact。預設是0,表示不會根據消息投遞的時間來決定消息是否應該被compacted。這個配置可用于支援擷取一定時間内的曆史快照的業務場景。

對于日志增值很慢的topic,同樣需要配合segment.ms配置來配合清理日志。

看到這裡,是不是對Kafka的存儲原理有了比較深入的了解了呢?想看更多中間件的相關文章,歡迎關注我的部落格IT宅(itzhai.com)或者Java架構雜談公衆号。

3. 叢集

由于我的測試伺服器記憶體比較小,我們先來配置Kafka啟動記憶體,大家根據自己的實際情況進行配置:

1.修改bin目錄下的 zookeeper-server-start.sh,将初始堆的大小(-Xms)設定小一些

1export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"           
  1. 修改bin目錄下的kafka-server-start.sh檔案,将初始堆的大小(-Xms)設定小一些
1 export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"           

注意:listeners中的IP位址要配置為伺服器的IP位址

listeners=PLAINTEXT://192.168.1.101:9093

然後啟動三個Kafka執行個體,部署好之後,可以通過Zookeeper檢視叢集節點數:

Kafka面試18連環炮:30+圖帶您看透Kafka

即,目前的叢集情況如下:

Kafka面試18連環炮:30+圖帶您看透Kafka

3.1 建立叢集

每個Topic可以配置為多個分區,每個分區可以有多個副本,副本稱為Replica,在副本集合中會存在一個Leader副本,Leader負責所有的讀寫請求,其餘副本隻負責從Leader同步備份資料。

3.1.1 ✨建立TOPIC

接下來建立一個Topic,副本數設定為3,分區數設定為2:

Kafka面試18連環炮:30+圖帶您看透Kafka

3.1.2 ✨檢視所有TOPIC

Kafka面試18連環炮:30+圖帶您看透Kafka

3.1.3 ✨檢視TOPIC分區詳情

檢視剛剛建立的 itzhai-com-topic-1 Topic的資訊:

Kafka面試18連環炮:30+圖帶您看透Kafka

Topic資訊屬性說明:

  • Topic:主題名稱
  • TopicId:主題id
  • PartitionCount:主題分區數量
  • ReplicationFactor:副本數量
  • Configs:主題詳細配置,每一行配置表示一個分區資訊
  • Topic:主題名稱
  • Partition:分區号
  • Leader:分區的Leader副本,負責目前分區的所有讀寫請求
  • Replicas:存放分區備份的節點
  • Isr(In-Sync Replica):這個集合是Replicas的一個子集,列出目前還存活着,并且已經同步備份了該分區的節點

此時,伺服器狀态如下圖所示:

Kafka面試18連環炮:30+圖帶您看透Kafka

3.1.3.1 ISR是幹嘛的?

Isr(In-Sync Replica):是Replicas的一個子集,列出目前還存活着,并且已經同步備份了該分區的節點。

Isr中包括Leader副本,以及與Leader副本保持同步的Follower副本。

3.2 重新選主

副本會均勻配置設定到多個Broker節點上,當Leader節點挂了之後,會從副本集中選出一個新的副本作為Leader繼續對外提供服務。

下面我們來測試一下,我們把Broker-1給停掉(找到servier.properties中broker.id=1的程序):

1ps aux | grep server-1.properties
2kill 28329           

再次檢視Topic狀态:

Kafka面試18連環炮:30+圖帶您看透Kafka

發現Partition 1的Leader已經從Broker-1切換到了Broker-2,Broker-1已經從Isr副本集合中移除了。伺服器狀态如下所示:

Kafka面試18連環炮:30+圖帶您看透Kafka

3.3 總控制器

3.3.1 總控制器是幹嘛的?

我們再來看一下Kafka的叢集架構圖:

Kafka面試18連環炮:30+圖帶您看透Kafka

在Broker叢集中,會選舉出一個Controller總控制器。總控制器主要負責:

  • 監聽叢集資訊變更:
    • 監聽叢集變更:為Zookeeper的/brokers/ids節點添加BrokerChangeListener,用于處理Broker節點增減變更;
  • 監聽Topic變更:
    • 為Zookeeper的/brokers/topics節點添加TopicChangeListener,用于處理Topic增減變更;
    • 為Zookeeper的/admin/delete_topics節點添加TopicDeletionListener,用于處理删除Topic的事件;
    • 為Zookeeper的/brokers/topics/[topic]節點添加PartitionModificationsListener,用于監聽Topic分區配置設定變更;
  • 選舉Partition分區Leader:分區的Leader副本當機之後,Controller負責為該分區選舉一個新的Leader副本;
  • 更新叢集中繼資料資訊:感覺到分區的ISR集合有變更之後,Controller通知所有的Broker更新其中繼資料資訊。

Zookeeper中存儲了Kafka叢集資訊,可以從Zookeeper中檢視到目前總控制器是哪個Broker:

Kafka面試18連環炮:30+圖帶您看透Kafka

可以發現,id為0的Broker是目前的總控制器。

3.3.2 總控制器是如何選出來的?

首次選舉Controller:叢集啟動的時候,每個Broker節點都會嘗在Zookeeper中建立臨時節點/controller,最終隻會有一個節點能夠建立成功,這個節點就會作為Controller總控制器。

重新選舉Controller:當Controller所在的Broker發生故障之後,Zookeeper中的/controller臨時節點會被删除,/broker/ids中對應的Broker節點資訊業會被删除。其他Broker節點監聽這兩個Zookeeper節點,當監聽到/controller臨時節點消失了,就會嘗試往Zookeeper建立該節點,寫成功的那個Broker将會成為新的Controller。

3.3.3 TOPIC的最優LEADER副本是如何選舉出來的?

一般的,在分布式系統中,Leader的選舉算法很多,如Zab、Raft、Viewstamped Replication等。Kafka使用的Leader選舉算法更像是微軟的PacificA算法。

Controller負責為Topic選取Leader副本:Controller從ISR清單中選擇第一個分區作為Leader,因為ISR第一個分區可能是同步資料最多的副本,可以盡可能保證資料不丢失。

重新選舉Leader副本:當Controller監聽到/brokers/ids中的Broker節點消失的時候,會重新執行Leader選舉流程。

相關參數:

  • unclean.leader.election.enable:true表示當ISR清單所有副本都挂了之後,可以在ISR以外的副本選取Leader副本。進而可以提高可用性,但是可能會導緻丢失更多的資料;false表示隻能從ISR中選擇Leader副本。

3.3.3.1 Kafka的Topic Leader選舉機制有啥優勢?

與一般的少數服從多數選舉算法不同,Kafka通過使用ISR來實作選舉的,ISR的數量不需要超過副本數量的一半,進而使得在可靠性和吞吐量上面取得平衡,一般我們設定為一個大于1的值。

3.4 Rebalance機制

Rebalance機制是Kafka消費機制的核心。

當消費組消費者數量發生變化、或者消費組消費主題數量變化、主題分區數量變化等的時候,Kafka會重新配置設定消費者和分區的關系,也就是做一次Rebalance。

Kafka保證一個Topic分區隻會配給一個組内的消費者,而一個消費者可以消費多個分區。

關于Rebalance的具體原理,找到了一篇講的比較好的文章,可以參考:Apache Kafka Rebalance Protocol, or the magic behind your streams applications[2]

更多圖解系列文章,歡迎關注我的關注公衆号:Java爛豬皮。

3.4.1 什麼時候會觸發REBALANCE機制?

當發生以下情況時,會觸發Rebalance機制:

  1. 消費者的數量發生變化:
Kafka面試18連環炮:30+圖帶您看透Kafka
  1. 主題分區的數量發生變化:
Kafka面試18連環炮:30+圖帶您看透Kafka
  1. 消費組訂閱的Topic數量發生了變化:
Kafka面試18連環炮:30+圖帶您看透Kafka

3.4.2 KAFKA中有哪些REBALANCE的政策?

Rebalance政策主要有三種:Range、RoundRobin、StickyAssignor(粘性配置設定器)。在聲明消費者的時候可以指定:

1props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());           

屬于同一組的所有消費者必須聲明一個共同的政策。如果消費者嘗試加入配置設定配置與其他組成員不一緻的組,會引發如下異常:

1org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member’s supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.           

3.4.2.1 RangeAssignor[3]

範圍配置設定器,這是預設的政策。

範圍配置設定器是在每個主題基礎上工作的,對于每個主題,按照數字順序排列可用分區,使用組協調器配置設定的member_id按字典順序排列消費者。然後将分區數除于消費者總數,以确定配置設定給每個消費者的分區數,如果沒有均勻劃分,那麼前幾個消費者将有一個額外的分區。

如下例子:

Kafka面試18連環炮:30+圖帶您看透Kafka

假設有兩個消費者C0和C1,兩個主題T0和T1,每個主題有3個分區,使用範圍配置設定器最終配置設定結果:

  • C0 = {T0P0, T0P1, T1P0, T1P1}
  • C1 = {T0P2, T1P2}

RangeAssignor有何缺點?

我們假設消費組中的消費者數量多于主題的分區數量,則會出現以下情況:

Kafka面試18連環炮:30+圖帶您看透Kafka

可以發現C3消費者并沒有在消費任何分區的消息,并沒有盡可能地使用到所有的消費者。

3.4.2.2 RoundRobinAssignor[4]

循環配置設定器,讓消費組中的所有消費者平均配置設定可用分區。同樣的,首先按照順序排列可用分區和消費者。循環配置設定器把所有主題的所有可用分區輪訓地配置設定給訂閱它們的消費者。

假設所有消費者執行個體訂閱的主題都相同,則分區将均勻分布:

Kafka面試18連環炮:30+圖帶您看透Kafka

T0和T1的分區:T0P0, T0P1, T0P2, T1P0, T1P1, T1P2将以此輪訓的配置設定給C0和C1。

可以發現,RoundRobin盡可能地使用到了所有的消費者,把分區更均勻的配置設定給消費者。

上一節的例子使用RoundRobin政策,結果如下圖所示:

Kafka面試18連環炮:30+圖帶您看透Kafka

可以發現C2也被利用起來了。

RoundRobin有何缺點?

雖然RoundRobin盡可能的利用所有的消費者,但是一旦消費者數量發生變化觸發Rebalance時,會導緻更多的分區重配置設定。

3.4.2.3 StickyAssignor[5]

粘性配置設定政策,與RoundRobin類似,但是在Rebalance時,會遵循以下原則:

  • 分區盡可能保證分布均勻;
  • 分區配置設定盡可能保持不變更;

優先保證分布均勻。

使用StickyAssignor政策的情況下,假如C1挂了,那麼隻需要把原本C1的T0P1分區配置設定給C2即可:

Kafka面試18連環炮:30+圖帶您看透Kafka

3.4.3 叢集消費REBALANCE機制是如何工作的?[2]

Kafka的Rebalance流程會經曆以下幾個階段:

3.4.3.1 選擇組協調器階段

Kafka會為每個消費組選擇一個Broker來作為組協調器,組協調器負責監控消費組裡所有消費者的心跳,判斷機器是否下線,以及開啟消費者Rebalance。

消費組中的每個消費者在啟動的時候都會向Kafka叢集中的某個節點發送FindCoordinator請求來查找對應的組協調器GroupCoordinator,并與之建立網絡連接配接。

如何選擇組協調器?

Kafka會選擇消費分組正在使用的consumer_offsets分區對應的Broker作為ConsumerGroup的Coordinator。

消費分組寫消息的consumer_offsets分區号:

hash(ConsumerGroupId) % __consumer_offsets 主題的分區數。

3.4.3.2 消費者加入消費組階段

成功找到消費組對應的GroupCoordinator之後,就進入加入消費組階段。

此時消費者向GroupCoordinator發送JoinGroup請求,申請加入消費組,此時會啟動Rebalance協定。

Kafka面試18連環炮:30+圖帶您看透Kafka

Join Group 包含了一些消費者用戶端配置資訊,如session.timeout.ms和max.poll.interval.ms等,組協調器使用這些屬性進行消費者下線狀态判斷。另外,請求中包含了成員支援的用戶端協定清單,以及用于執行用戶端協定的中繼資料。

GroupCoordinator會從Consumer Group中選擇第一個加入消費組的消費者作為組長(Leader),并把消費組的情況發送給這個組長,組長負責在本地制定分區方案。

Kafka面試18連環炮:30+圖帶您看透Kafka

3.4.3.3 同步與執行分區方案階段

消費組的組長制定好分區方案後,給GroupCoordinator發送SyncGroup請求,并附加上制定好的分區作業,非組長則簡單的發送一個空請求。:

Kafka面試18連環炮:30+圖帶您看透Kafka

然後GroupCoordinator把分區方案響應給組裡的所有消費者。最終消費者連接配接指定的分區,并進行消息消費:

Kafka面試18連環炮:30+圖帶您看透Kafka

每個消費者定期向組協調器發送心跳請求,以保持會話狀态(相關配置:heartbeat.interval.ms)。如果此時正在進行Rebalance操作,組協調器會響應告知消費者需要重新加入組。

當叢集節點比較多的時候,Rebalance可能會花費比較多的時間,導緻消耗Broker伺服器的資源,影響消費性能,為此,盡量選擇在系統負載比較低的時候進行Rebalance。

注意:通過assign指定消費分區的情況下,Kafka不會進行Rebalance:

1consumer.assign(Collections.singletonList(new TopicPartition("itzhai-test-topic", 0)));

4. 消息的投遞

消費的時候,會從叢集的Leader節點進行讀寫請求。

4.1 生産消息如何投遞?

生産者将消息發送到Topic的某一個分區中,一般通過round-robin做簡單的負載均衡,也可以通過自定義分區器根據消息中的某一個關鍵字來做分區,後者使用更廣泛。

關于自定義分區器的例子,參考:Custom Partitioner in Kafka: Let’s Take a Quick Tour!. Retrieved from https://dzone.com/articles/custom-partitioner-in-kafka-lets-take-quick-tour[9]

我們可以直接使用Kafka提供的bash腳本來嘗試發送消息,接下來示範下。

4.1.1 ✨發送消息例子

Kafka面試18連環炮:30+圖帶您看透Kafka

4.2 生産者相關參數

生産者最重要的三個參數[1]是:

  • acks
  • compression
  • batch size

4.2.1 投遞ACK: ACKS

消息投遞的持久化機制。有如下幾個配置政策:

acks=0

acks=0:不需要ACK,生産者消息發出之後,不需要等待Broker的确認回複,就可以繼續發送下一條消息。

Kafka面試18連環炮:30+圖帶您看透Kafka

優點:性能高;

缺點:容易丢消息;

使用場景:适合對性能要求比較高但是對資料可靠性要求比較低的場景,如寫日志。

acks=1

acks=1: 生産者發出消息後,需要Leader副本的ACK,Leader将資料持久化到本地log之後,就确認回複,而不需要等待Follower副本寫入成功。這也是預設的配置值。

Kafka面試18連環炮:30+圖帶您看透Kafka

優點:比acks=0可靠,確定消息寫入到了Leader;

缺點:Leader挂掉的時候,Follower沒有成功備份資料,那麼消息會丢失;

使用場景:對消息可靠性有一定要求,但是不是很高,消息丢失之後通過專門的補償機制去保證資料的完整性,并且對性能要求高的場景。如訂單狀态更新消息,假如消息丢失了,還有定時任務去輪訓補償。

acks=all 或者 -1

acks=all或者-1: 需要等到min.insync.replicas個副本(包括Leader副本)都成功寫入消息,才進行确認回複。Leader挂了,觸發選舉機制,選舉政策是優先選舉同步成功的Follower節點為新的Leader。

假設min.insync.replicas=2,則有如下同步過程:

Kafka面試18連環炮:30+圖帶您看透Kafka

優點:消息的可靠性可以得到更大程度的保證;

缺點:性能更低;

使用場景:對消息可靠性要求很高的場景,不允許丢失消息,如金融業務。

min.insync.replicas

acks=all或者-1時,min.insync.replicas參數設定必須成功寫入日志的最小副本個數,如果達不到這個數量,那麼生産者将引發異常:NotEnoughReplicas 或者NotEnoughReplicasAfterAppend。

通過配合min.insync.replicas 和 acks 一起使用,你可以擁有更大的持久性保證。

如果要保證更高持久化可靠性,一般的,如果Topic的副本因子為3 ,那麼一般将 min.insync.replicas 設定為 2,acks設定為all,如果大多數副本沒有收到寫入,這将確定生産者引發異常。

4.2.2 重試: RETRIES和RETRY.BACKOFF.MS

retries用于配置重試次數,配置為大于0,則在發送失敗、網絡異常等場景下回觸發重新發送。重試可能會導緻消息的重複投遞,需要消費端做好消費幂等處理。

支援重試可能影響消息的順序性,比如:

Kafka面試18連環炮:30+圖帶您看透Kafka

a b兩個批次發送到Broker的單個分區中,a批次第一次發送失敗了,但是b批次發送成功了,導緻Broker先接收到b批次,然後重試發送a批次,最終導緻Broker分區中的a b批次消息順序改變了。

如果需要確定這種情況的順序性,請配置max.in.flight.requests.per.connection參數的值為1。

如果重試次數用完之前,就到達到了重試逾時時間(達到了delivery.timeout.ms配置的值),那麼将不繼續進行重試。一般的,使用者更願意使用delivery.timeout.ms來控制重試行為。

retry.backoff.ms參數則是用于控制重試間隔。

4.2.3 分批發送: BUFFER.MEMORY, BATCH.SIZE和LINGER.MS

設定分批發送每個批次的大小。

Kafka為了提高發送消息,将生産者請求傳輸的所有記錄組合成一個一個的批次進行分批發送,這類似于TCP中的Nagle算法。

這個暫存消息的發送緩沖區大小是通過buffer.memory參數進行設定的。

分批批次大小是通過batch.size參數進行設定的。

一旦擷取到batch.size大小的批次之後,就立刻發送出去。

linger.ms參數控制最多每間隔多久發送一個批次,如果在linger.ms間隔内就擷取到了完整的批次,那麼就會立刻發送出去。如果等到linger.ms時間,還沒有收集到完整的一批資料,那麼也會強制發送出去。

linger.ms預設值為0,表示消息會立即被發送出去,發送效率相對較低。

Kafka面試18連環炮:30+圖帶您看透Kafka

如果消息生産的速度太慢,為了避免消息一直發送不出去,注意留一下linger.ms配置的發送間隔,可以适當縮小發送間隔。更多圖解系列文章,歡迎關注我的關注公衆号:Java爛豬皮。

4.3 Kafka是如何保證資料的可靠性?

4.3.1 生産端

在Kafka 0.8.0之前,是沒有副本的概念的,資料可能會丢失,隻能存儲一些不重要的資料。

從0.l8.0banb開始引入了分區副本,每個分區可以配置幾個副本。Kafka的分區多副本機制是可靠性保證的核心。

為了保證可靠性,我們可以使用同步發送,根據不同的場景,配置合理的acks參數值。

為了嚴格保證可靠性,以下是需要的配置:

  • 生産者:acks=all,并且使用同步阻塞的方式發送消息;
  • Topic:replication.factor >= 3,min.insync.replicas >= 2;
  • Broker:unclean.leader.election.enable=false,確定ISR集合中沒有可用的線上副本時,不會去選舉ISR之外的副本作為新的Leader。

unclean.leader.election.enable設定為true,意味着允許選舉非ISR集合的副本作為新的Leader,即使配置了acks=all,新選舉出來的Leader也可能消息是落後的。

如下圖,原本ISR中有三個副本,某個時間之後,Follower1脫離了ISR,并且落後Leader比較多:

此時ISR中的副本都下線了,unclean.leader.election.enable=true,那麼,會把Follower1選舉為新的 Leader:

此時新的Leader副本開始接收消息,假如原來的Leader此後又恢複了,稱為了新的Follower副本,那麼會開始嘗試從新的Leader副本同步消息,此時這個新的 Follower副本的LEO比新的Leader還要大,最終會把這個新的Follower副本的日志進行截斷,保持與心Leader一緻,最終導緻資料丢失:

4.3.2 消費端

對于消費端,為了避免丢失未處理完的消息,需要設定為手動送出。

4.4 Kafka是如何保證資料的一緻性?

保證資料一緻性,也就是無論是對于老的Leader,還是新選舉出來的Leader,消費者都需要讀到一樣的資料。

為了支援以上特性,Kakfa引入了HW(High Watermark)高水位的概念。ISR中每個副本最後的那個日志偏移量稱為LEO(Log End Offset),HW的取值為ISR集合中最小的LEO,消費者隻能消費到HW對應的日志。有點抽象?IT宅來給大家上圖,一看就懂:

Kafka面試18連環炮:30+圖帶您看透Kafka

如上圖,ISR中有三個副本,Replica 0為Leader,副本0的消息3和消息4都沒有完全同步給其他副本,是以HW在消息2處,消費者隻能消費到消息2以及之前的消息。

通過引入HW,就避免讓消費者消費到還沒有完全同步到ISR中所有副本的消息,避免由于切換Leader導緻能夠讀取到的消息變少了,進而導緻資料不一緻問題。

為了避免部分副本寫入速度太慢,導緻影響消費者消費消息的及時性,可以配置參數replica.lag.time.max.ms參數,指定副本在複制消息時可被允許的最大延遲時間。如果超過這個時間副本還沒有同步好消息,那麼副本就會被剔出ISR集合。

HW是用于控制消費行為的,即使acks設定為0,超過HW的消息也是不能被消費者消費端。

講完了消息的投遞,我們接下來講講消息的消費。更多圖解系列文章,歡迎關注我的關注公衆号:Java爛豬皮。

5. 消息的消費

5.1 叢集消費與廣播消費5.1.1 KAFKA中的叢集消費(單點傳播消費)

Kafka面試18連環炮:30+圖帶您看透Kafka

如上圖,每個ConsumerGroup裡面的消費者是一個叢集,同一個ConsumerGroup的消費者共同消費Topic的消息,同一個Topic的一條消息隻能被同一個ConsumerGroup的某一個Consumer消費,不能被重複消費,如果C0消費了一條消息,那麼C1和C2就不會再消費這條消息了。要實作叢集消費,隻要把所有Consumer放到同一個ConsumerGroup中就可以了。

✨ 叢集消費例子

Kafka面試18連環炮:30+圖帶您看透Kafka

我們啟動了一個消費者,通過group.id參數指定消費分組arthinking來消費消息了,進而達到了叢集消費的效果。

5.1.2 KAFKA中的廣播消費

同一個Topic的一條消息可以被多個ConsumerGoup重複消費。如果要實作廣播消費,隻需要把Consumer放到不同的ConsumerGroup中就可以了。

✨ 廣播消費例子

為了實作廣播消費效果,我們繼續啟用新的消費組消費即可:

Kafka面試18連環炮:30+圖帶您看透Kafka

5.2 Kafka的消費進度如何次元

5.2.1 消費進度相關指令

我們現在來看一下消費組的消費進度:

✨ 檢視消費組消費進度

Kafka面試18連環炮:30+圖帶您看透Kafka

可以發現,在arthinking消費分組中,P0和P1分區都正在被同一個消費者消費,這裡可以看到詳細的消費進度。

我們列出所有的Topic,發現有一個消費主題__consumer_offsets,這個主題是用來維護消費進度的。

✨列出所有Topic

Kafka面試18連環炮:30+圖帶您看透Kafka

我們看看__consumer_offsets這個Topic的詳情:

Kafka面試18連環炮:30+圖帶您看透Kafka

可以發現,這個Topic有50個Partition,副本數為1。topic配置的清理政策是compact,即總是保留最新的key。

5.2.3 __CONSUMER_OFFSETS

__consumer_offsets這個Topic就是用于維護消費組的消費進度的。__consumer_offsets中儲存的也是普通的Kafka消息,主要保留三類消息消息:

  • Consumer group組中繼資料消息,如groupId,組成員狀态,成員配置資訊等;這類消息在Group Rebalance的時候寫入;
  • Consumer group位移消息,存儲消費組的消費進度;這類消息在送出消費進度的時候寫入;
  • Tombstone消息或Delete mark消息。每當Consumer Group下已經沒有任何激活的成員并且所有位移資料都被删除時,Kafka就會将該Group狀态設定為Dead,并發送一條tombstone消息,表明要徹底删除這個Group的資訊。這類消息在Kafka背景線程掃描并删除過期位移或者__consumer_offsets分區副本重配置設定的時候寫入。

這裡我們主要關注的就是Consumer group的位移消息。該消息的key的格式是:groupId + topic + partition分區号,即,每個topic的每個分區,針對不同的消費分組,都會存儲一個消費進度。value是消費偏移量offset。

__consumer_offset Topic相關配置參數:

  • offsets.topic.num.partitions:分區數量,預設為50;
  • offsets.topic.replication.factor:副本因子,預設為1。

推薦副本因子設定成>1,以提供資料存儲的可靠性。

5.3 消費者是如何送出offset的?

5.3.1 自動送出 ENABLE.AUTO.COMMIT

通過enable.auto.commit參數,可以控制是否自動送出offset,預設為true。

如果設定為false,則消費完成之後,記得手動送出ack,否則,每次重新開機消費者之後,會繼續從未送出的位置繼續重複消費消息。

auto.commit.interval.ms配置自動送出的時間間隔。

更多圖解系列文章,歡迎關注我的關注公衆号:Java爛豬皮。

自動送出會有什麼問題?

假設設定的自動送出時間間隔為1秒,取出一批資料之後,需要5秒才能消費完,但是還沒消費完,程式就挂了。導緻這批未被消費部分的資料再也沒有機會被消費到了,即消息錯過消費。

假設取出的一批資料為10條,假設成功處理了兩條消息,還沒有觸發自動送出offset,消費程式就挂了,下次重新開機消費程式之後,會導緻這兩條消息再次被消費到,即消息重複消費。

5.3.2 同步送出&異步送出

如果設定為手動送出,需要調用送出的API。在kafka-clients的API中,kafka為我們提供了同步送出和異步送出的API。

同步送出:

1consumer.commitSync();
           

異步送出:

1consumer.commitAsync(new OffsetCommitCallback() {
2    @Override
3    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
4        if (exception != null) {
5            log.error("送出消費進度異常,offsets:{}", offsets, exception);
6        }
7    }
8});
           

5.4 有哪些消費曆史消息的方法?

5.4.1 指定分區消費

指定消費0分區:

1String TOPIC_NAME = "itzhai-com-test1";
2// 指定0分區
3consumer.assign(Collections.singletonList(new TopicPartition(TOPIC_NAME, 0)));
           

5.4.2 消息回溯消費

指定0分區,從頭消費:

1String TOPIC_NAME = "itzhai-com-test1";
2consumer.assign(Collections.singletonList(new TopicPartition(TOPIC_NAME, 0)));
3// 從頭消費
4consumer.seekToBeginning(Collections.singletonList(new TopicPartition(TOPIC_NAME, 0)));
           

5.4.3 指定OFFSET消費

1String TOPIC_NAME = "itzhai-com-test1";
2consumer.assign(Collections.singletonList(new TopicPartition(TOPIC_NAME, 0)));
3// 指定偏移量
4consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);、
           

5.4.4 指定時間點消費

從指定時間點往後找到第一條消息的偏移量,開始消費。最終都是調用指定offset進行消費。

相關例子:

1// 消費8小時前的消息
 2long beginConsumeTime = System.currentTimeMillis() - 1000 * 60 * 60 * 8;
 3
 4for (PartitionInfo partitionInfo : partitionInfos) {
 5    seekMap.put(new TopicPartition(TOPIC_NAME, partitionInfo.partition()), beginConsumeTime);
 6}
 7consumer.assign(seekMap.keySet());
 8
 9Map<TopicPartition, OffsetAndTimestamp> foundOffsets = consumer.offsetsForTimes(seekMap);
10
11for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : foundOffsets.entrySet()) {
12    TopicPartition key = entry.getKey();
13    OffsetAndTimestamp value = entry.getValue();
14    if (key == null || value == null) {
15        continue;
16    }
17    consumer.seek(key, value.offset());
18}
19
20while (true) {
21    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
22    for (ConsumerRecord<String, String> record : records) {
23        System.out.println("消費消息: partition={}, offset={}, key={}, value={}");
24        log.info("消費消息: partition={}, offset={}, key={}, value={}",
25                record.partition(), record.offset(), record.key(), record.value());
26    }
27}
           

5.4.4.1 基于時間點消費底層是如何實作的?

由上面的例子可以發現,基于時間的消費,也是先找到對應時間的消息offset,最終都是基于offset去消費的。

5.5 消費者相關參數

5.5.1 消費送出: ENABLE.AUTO.COMMIT

參考 5.3.1 自動送出 enable.auto.commit

5.5.2 最大拉取消息數: MAX.POLL.INTERVAL.MS

每次poll拉取的最大消息數,根據消費處理速度進行配置。如果消費者消費速度很快,則可以設定的大點。

5.5.3 消費者線上判斷: HEARTBEAT.INTERVAL.MS 和 SESSION.TIMEOUT.MS

heartbeat.interval.ms參數配置消費者給Broker發送心跳的間隔時間。當Broker進行Rebalance的時候,接收到了消費者的心跳,将把Rebalance方案響應給Consumer。

session.timeout.msBroker等待消費者發送心跳的最大時間,如果超過了這個時間,消費者就會被判斷為出問題,會被踢出消費組,導緻該消費者占用的Partition被重新配置設定給其他消費者。

5.5.4 最大POLL時間間隔: MAX.POLL.INTERVAL.MS

如果兩次poll時間超過這個間隔,Broker就會認為這個消費者消費太慢了,會把消費者剔除消費組,讓出分區,并把分區配置設定給其他的消費者進行消費。

5.5.4.1 為什麼生産的消費者突然就不消費消息了?

如果消費者每次啟動了,消費若幹條消息就不再消費消息了,而生産者是有不斷生産消息的,就需要确認消費者是否被T掉了,可能是兩次poll的時間超過了max.poll.interval.ms配置的值。為了解決這個問題,可以:

  • 增加max.poll.interval.ms配置的時間,建議不要配置的太大,不然就沒辦法基于這個參數判斷消費者的消費能力了,導緻沒法把分區重配置設定給消費能力更好的消費者;
  • 減小max.poll.interval.ms,即每次poll拉取的消息數降低點,避免消費時間過長;
  • 檢查消費者消費性能是否有瓶頸,根據實際情況進行優化。
  • 更多圖解系列文章,歡迎關注我的關注公衆号:Java爛豬皮。

5.5.5 新消費組是否從頭消費: AUTO.OFFSET.RESET

auto.offset.reset用于配置新的消費組的消費行為,配置新的消費組是否從頭開始消費分區,還是隻消費增量的消息,可選配置:

  • latest:隻消費增量的消息,即消費消費組啟動後分區接收到的消息,預設為該配置;
  • earliest:從頭開始消費分區消息,後續會根據offset記錄消費進度,消費增量的消息。

6. Kafka常見問題

6.1 Kafka為啥性能這麼高?

大家知道RocketMQ是基于Kafka改造而來的,是以Kafka的高性能原因與RocketMQ類似,以下是Kafka高性能的原因:

  • 磁盤順序讀寫:Kafka寫消息都是直接追加到檔案末尾的,不會有随機寫的情況,另外,不會随機删除日志,隻會按照删除政策删除一整段的曆史消息。
  • 與RocketMQ不同的是,kafka不會像RocketMQ那樣預配置設定一個很大的檔案來存儲消息,Kafka的順序寫可以了解為分段順序寫的,一般一台伺服器隻部署Kafka就更接近與完全順序;
  • 批量讀寫資料,以及壓縮傳輸:
  • Rocket發送消息底層是分批發送的,提高了傳輸和存儲的效率;
  • 資料零拷貝技術:通過mmap記憶體映射,以及sendfile,減少了資料拷貝次數,提高了資料發送效率。

6.2 Kafka支援延時隊列嗎?

很遺憾,Kafka中并沒有像RocketMQ中提供的那種延時隊列功能,不過可以參考RocketMQ自己實作一個延時隊列。RocketMQ不正是基于Kafka演變而來的麼。

參考做法:按延時時間分為不同的延時等級,分别建立對應的延時主題:delay_1s, delay_10s, delay_30s…通過定時任務輪訓這些主題,根據消息的建立時間,對比判斷主題的隊列是否到期,如果到期了,就把消息轉發給具體的業務處理的topic進行處理,由于排在前面的消息肯定時候最早到期的,是以可以很快的找到所有要處理的消息,處理完畢。

6.3 Kafka支援事務消息嗎?

Kafka中有事務的概念,但是并不支援類似RocketMQ中的分布式事務消息,Kafka中的事務隻是用于保證發送多條消息時候,同時成功或者失敗。

有時候,我們在做完一次業務處理之後,需要發多條不同的消息給不同的消費方,這個時候要確定消息同時發送成功的話,就可以使用Kafka的事務了[6]:

1Properties producerProps = new Properties();
 2producerProps.put("enable.idempotence", "true");
 3producerProps.put("transactional.id", "prod-1");
 4KafkaProducer<String, String> producer = new KafkaProducer(producerProps);
 5try {
 6    // 開啟事務
 7    producer.beginTransaction();
 8    producer.send(new ProducerRecord<>("itzhai‐topic", "123", "itzhai.com"));
 9    producer.send(new ProducerRecord<>("arthinking‐topic", "123", "Java架構雜談"));
10    producer.send(new ProducerRecord<>("itread‐topic", "123", "abc"));
11    // 送出事務
12    producer.commitTransaction();
13} catch (Exception e) {
14    // 復原事務
15    producer.abortTransaction();
16} finally {
17    producer.close();
18}
           

6.4 Kafka如何避免重複消費?

避免重複消費,是任何消息隊列中間件都不可避免遇到的問題,我麼接下來說下在Kafka中導緻重複消費的原因和解決方法。

6.4.1 生産端

如果生産端配置了重試機制,那麼在網絡不穩定,或者發送逾時的情況下,就會嘗試重新發送,這可能會導緻Broker接收到重複的消息。

6.4.2 消費端

當消費端設定為自動送出Offset的時候,可能在消費一批資料過程中,還沒來得及送出,服務就挂了,下次重新開機消費者,就會導緻重複消費該批消息。

為了避免重複消費,在消費端,需要做好幂等處理。

6.5 消息堆積如何處理?

産生消息堆積的原因,不外乎兩種:

  • 消費端程式有bug,或者資料有問題,導緻一直消費失敗,消息一直得不到正确處理進而導緻消息堆積;
  • 消費者的消費性能太差,或者消費消息的時間太長了,導緻消息堆積着來不及消費。

針對第一種情況,為了避免消息隊列,可以把這種消息單獨放到死信隊列中做特殊處理。由于Kafka中并沒有提供類似RocketMQ的那種死信隊列[7],是以需要專門準備一個這樣的主題充當死信隊列。進入死信隊列的消息需要進行分析并處理掉消費不成功的問題。

更進一步的,也可以參考RocketMQ,先把消費失敗的消息放到一個專門負責重試的重試隊列中,執行多次重試可以通過建立多個主題來完成,如果重試隊列還是消費失敗,則把消息放入死信隊列。具體做法可以參考此文:Building Reliable Reprocessing and Dead Letter Queues with Apache Kafka. Retrieved from https://eng.uber.com/reliable-reprocessing/[8]

Kafka面試18連環炮:30+圖帶您看透Kafka

針對第二種情況,由于分區數量是固定的,即使增加消費者,也沒辦法加快消費速度。為了快速修複問題,可以修改消費者程式,把消息快速轉發到另一個新的主題中,并給這個主題設定很多分區,最後啟動對應數量的消費者進行消費:

Kafka面試18連環炮:30+圖帶您看透Kafka

6.6 消息順序性如何保證?

為了保證消息消費的順序性,最簡單的做法就是:

  • 發送端設定同步發送,避免異步發送導緻亂序;
  • 消費端消息統一發到同一個分區,通過一個消費者去消費消息。

但是這樣會導緻消息處理的效率很低,拖慢系統的吞吐量。

為了提高性能,需要考慮其他的思路。RocketMQ中,提供給了MessageQueueSelector接口,可以把具有相同辨別(如訂單号)的消息統一發到同一個消息隊列中,參考IT宅上一篇文章(高并發異步解耦利器:RocketMQ究竟強在哪裡?)。我們也可以考慮類似的思路:按照消息的某種辨別,把相同辨別的消息投遞到同一個分區,進而保證同同一個辨別的消息在分區中是順序消費的。

更多圖解系列文章,歡迎關注我的關注公衆号:Java爛豬皮。

6.7 消息如何回溯消費?

在某些場景下,如消費程式有問題時,修複了消費程式之後,想要重新消費之前已經消費過的消息,就需要用到回溯消息的功能更了。回溯消息支援指定offset消費,也支援指定時間點消費,參考5.4 有哪些消費曆史消息的方法。

6.8 如何實作消息傳遞保障?

對于消息中間件,可以提供多種傳遞保障:

  • 最多一次,消息可能會丢失,但絕對不會重發;
  • 至少一次,消息不會丢失,但有可能會導緻重發;
  • 正好一次,每個消息傳遞一次且僅一次。

在Kafka可以通過acks參數值控制傳遞保障行為:

  • 最多一次:acks=0
  • 至少一次:acks=all 或者 -1
  • 正好一次:acks=all 或者 -1,消費端加上消費幂等性保證。當然,也可以使用Kafka的幂等性投遞來實作。

Kafka中的幂等性投遞消息是如何實作的?

相關參數:enable.idempotence

當設定為“true”時,生産者将確定隻會投遞一條消息到Broker中。如果為“false”,則生産者則可能會由于網絡等問題導緻重試投遞,導緻重複消息。請注意,啟用幂等性要求 max.in.flight.requests.per.connection 小于或等于 5(保留任何允許值的消息排序),retries大于 0,并且acks必須為“all”。

實作原理:Kafka每次發送消息的時候,會給消息生成PID和Sequence Number,一并發送給Broker,Broker根據PID和Sequence Number判斷生産者發送過來的消息是否相同,隻有不相同的才會接收并存儲起來。

References

[1]: Kafka 3.0 Documentation. Retrieved from https://kafka.apache.org/documentation/#configuration

[2]: Apache Kafka Rebalance Protocol, or the magic behind your streams applications. Retrieved from https://medium.com/streamthoughts/apache-kafka-rebalance-protocol-or-the-magic-behind-your-streams-applications-e94baf68e4f2

[3]: Class RangeAssignor. Retrieved from https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html

[4]: Class RoundRobinAssignor. Retrieved from https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html

[5]: Class StickyAssignor. Retrieved from https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html

[6]: Exactly Once Processing in Kafka with Java. Retrieved from https://www.baeldung.com/kafka-exactly-once

[7]: Kafka Connect 101: Error Handling and Dead Letter Queues. Retrieved from https://www.youtube.com/watch?v=KJUlnmEjbTY

[8]: Building Reliable Reprocessing and Dead Letter Queues with Apache Kafka. Retrieved from https://eng.uber.com/reliable-reprocessing/

[9]: Custom Partitioner in Kafka: Let’s Take a Quick Tour!. Retrieved from https://dzone.com/articles/custom-partitioner-in-kafka-lets-take-quick-tour

為幫助開發者們提升面試技能、有機會入職BATJ等大廠公司,特别制作了這個專輯——這一次整體放出。

大緻内容包括了: Java 集合、JVM、多線程、并發程式設計、設計模式、Spring全家桶、Java、MyBatis、ZooKeeper、Dubbo、Elasticsearch、Memcached、MongoDB、Redis、MySQL、RabbitMQ、Kafka、Linux、Netty、Tomcat等大廠面試題等、等技術棧!

Kafka面試18連環炮:30+圖帶您看透Kafka

歡迎大家關注公衆号【Java爛豬皮】,回複【666】,擷取以上最新Java後端架構VIP學習資料以及視訊學習教程,然後一起學習,一文在手,面試我有。

每一個專欄都是大家非常關心,和非常有價值的話題,如果我的文章對你有所幫助,還請幫忙點贊、好評、轉發一下,你的支援會激勵我輸出更高品質的文章,非常感謝!

Kafka面試18連環炮:30+圖帶您看透Kafka

作者:arthinking

出處:Kafka必知必會18問:30+圖帶您看透Kafka | HeapDump性能社群