天天看點

kafka消費者那些事兒

作者:JAVA旭陽

前言

消息的消費一般有兩種模式,推模式和拉模式。推模式是服務端主動将消息推送給消費者,而拉模式是消費者主動向服務端發起請求來拉取消息。kakfa采用的是拉模式,這樣可以很好的控制消費速率。那麼kafka消費的具體工作流程是什麼樣的呢?

消費者消費規則

kafka是以消費者組進行消費,一個消費者組,由多個consumer組成,他們和topic的消費規則如下:

kafka消費者那些事兒
  • topic的一個分區隻能被消費組中的一個消費者消費。
  • 消費者組中的一個消費者可以消費topic一個或者多個分區。

通過這種分組、分區的消費方式,可以提高消費者的吞吐量,同時也能夠實作消息的釋出/訂閱模式和點對點兩種模式。

消費者整體工作流程

消費者消費總體分為兩個步驟,第一步是制定消費的方案,就是這個組下哪個消費者消費哪個分區,第二個是建立網絡連接配接,擷取消息資料。

一、制定消費方案

kafka消費者那些事兒
  1. 消費者consumerA,consumerB, consumerC向kafka叢集中的協調器coordinator發送JoinGroup的請求。coordinator主要是用來輔助實作消費者組的初始化和分區的配置設定。
  • coordinator老大節點選擇 = groupid的hashcode值 % 50( __consumer_offsets内置主題位移的分區數量)例如: groupid的hashcode值 為1,1% 50 = 1,那麼__consumer_offsets 主題的1号分區,在哪個broker上,就選擇這個節點的coordinator作為這個消費者組的老大。消費者組下的所有的消費者送出offset的時候就往這個分區去送出offset。
  1. 選出一個 consumer作為消費中的leader,比如上圖中的consumerB。
  2. 消費者leader制定出消費方案,比如誰來消費哪個分區等,有Range分區政策、RoundRobin分區政策等。
  3. 把消費方案告訴給coordinator
  4. 最後coordinator就把消費方案下發給各個consumer, 圖中隻畫了一條線,實際上是會下發到各個consumer。

二、消費者消費細節

現在已經初始化消費者組資訊,知道哪個消費者消費哪個分區,接着我們來看看消費者細節。

kafka消費者那些事兒
  1. 消費者建立一個網絡連接配接用戶端ConsumerNetworkClient, 發送消費請求,可以進行如下配置:
  • fetch.min.bytes: 每批次最小抓取大小,預設1位元組
  • fetch.max.bytes: 每批次最大抓取大小,預設50M
  • fetch.max.wait.ms:最大逾時時間,預設500ms
  1. 發送請求到kafka叢集
  2. 擷取資料成功,會将資料儲存到completedFetches隊列中
  3. 消費者從隊列中抓取資料,根據配置max.poll.records一次拉取資料傳回消息的最大條數,預設500條。
  4. 擷取到資料後,經過反序列化器、攔截器後,得到最終的消息。
  5. 最後一步是送出儲存消費的位移offset,也就是這個消費者消費到什麼位置了,這樣下次重新開機也可以繼續從這個位置開始消費,關于offset的管理後面詳細介紹。

消費者分區政策

前面簡單提到了消費者組初始化的時候會對分區進行配置設定,那麼具體的配置設定政策是什麼呢,也就是哪個消費者消費哪個分區資料?

kafka有四種主流的分區配置設定政策: Range、RoundRobin、Sticky、CooperativeSticky。可以通過配置參數partition.assignment.strategy,修改分區的配置設定政策。預設政策是Range + CooperativeSticky。Kafka可以同時使用多個分區配置設定政策。

  1. Range 分區政策
kafka消費者那些事兒
  • Range分區 是對每個 topic 而言的。對同一個 topic 裡面的分區按照序号進行排序,并對消費者按照字母順序進行排序。
  • 通過 partitions數/consumer數 來決定每個消費者應該消費幾個分區。如果除不盡,那麼前面幾個消費者将會多消費 1 個分區。

如上圖所示:有 7 個分區,3 個消費者,排序後的分區将會是0,1,2,3,4,5,6;消費者排序完之後将會是C0,C1,C2。7/3 = 2 餘 1 ,除不盡,那麼 消費者 C0 便會多消費 1 個分區。 8/3=2餘2,除不盡,那麼C0和C1分别多消費一個。

這種方式容易造成資料傾斜!如果有 N 多個 topic,那麼針對每個 topic,消費者 C0都将多消費 1 個分區,topic越多,C0消費的分區會比其他消費者明顯多消費 N 個分區。

  1. RoundRobin 分區政策

RoundRobin 針對叢集中所有topic而言,RoundRobin 輪詢分區政策,是把所有的 partition 和所有的consumer 都列出來,然後按照 hashcode 進行排序,最後通過輪詢算法來配置設定 partition 給到各個消費者。

kafka消費者那些事兒
  1. Sticky 和Cooperative Sticky分區政策

Sticky是粘性的意思,它是從 0.11.x 版本開始引入這種配置設定政策,首先會盡量均衡的放置分區到消費者上面,在出現同一消費者組内消費者出現問題的時候,在rebalance會盡量保持原有配置設定的分區不變化,這樣可以節省開銷。

Cooperative Sticky和Sticky類似,但是它會将原來的一次大規模rebalance操作,拆分成了多次小規模的rebalance,直至最終平衡完成,是以體驗上會更好。

關于什麼是rebalance繼續往下看你就知道了。

消費者再均衡

上面也提到了rebalance,也就是再均衡。當kafka發生下面的情況會進行在均衡,也就是重新給消費者配置設定分區:

  • 有新的消費者加入消費組。 
  • 有消費者當機下線,消費者并不一定需要真正下線,例如遇到長時間的 GC 、網絡延遲導緻消費者長時間未向Group Coordinator發送心跳等情況時,GroupCoordinator 會認為消費者己下線。 
  • 有消費者主動退出消費組。
  • 消費組所對應的Group Coorinator節點發生了變更。 
  • 消費組内所訂閱的任一主題或者主題的分區數量發生變化。

消費者位移offset管理

消費者需要儲存目前消費到分區的什麼位置了,這樣哪怕消費者故障,重新開機後也能繼續消費,這就是消費者的維護offset管理。

一、消費者位移offset存儲位置

消費者位移offset存儲在哪呢?

  • kafka0.9版本之前,consumer預設将offset儲存在Zookeeper中
  • 從0.9版本開始,consumer預設将offset儲存在Kafka一個内置的topic中,該topic為__consumer_offsets,這樣可以大量減少和zookeeper的互動。
  • __consumer_offsets 主題裡面采用 key 和 value 的方式存儲資料。key 是 group.id+topic+分區号,value 就是目前 offset 的值。

如何檢視__consumer_offsets主題内容?

  • 在配置檔案 config/consumer.properties 中添加配置 exclude.internal.topics=false,預設是 true,表示不能消費系統主題。為了檢視該系統主題資料,是以該參數修改為 false。
  • 檢視消費者消費主題__consumer_offsets。
bin/kafka-console-consumer.sh --topic 
__consumer_offsets --bootstrap-server hadoop102:9092 --
consumer.config config/consumer.properties --formatter 
"kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageForm
atter" --from-beginning
## topic1 1号分區
[offset,topic1,1]::OffsetAndMetadata(offset=7, 
leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, 
expireTimestamp=None)
## topic1 0号分區
[offset,topic1,0]::OffsetAndMetadata(offset=8, 
leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, 
expireTimestamp=None)           

二、消費者位移offset送出儲存模式

消費者是如何送出儲存位移offset呢?

  1. 自動送出

為了使我們能夠專注于自己的業務邏輯,kafka預設提供了自動送出offset的功能。這個由消費者用戶端參數 enable.auto.commit 配置, 預設值為 true 。當然這個預設的自動送出不是每消費一條消息就送出一次,而是定期送出,這個定期的周期時間由用戶端參數 auto.commit.interval.ms 配置,預設值為 5 秒。

kafka消費者那些事兒
  • 消費者每隔 5 秒會将拉取到的每個分區中最大的消息位移進行送出。
  • 自動位移送出 的動作是在 poll() 方法的邏輯裡完成的,在每次真正向服務端發起拉取請求之前會檢查是否可以進行位移送出,如果可以,那麼就會送出上一次輪詢的位移。

自動送出會帶來什麼問題?

自動送出消費位移的方式非常簡便,但會帶來是重複消費的問題。

kafka消費者那些事兒

假設剛剛送出完一次消費位移,然後拉取一批消息進行消費,在下一次自動送出消費位移之前,消費者崩潰了,那麼又得從上一次位移送出的地方重新開始消費,這樣便發生了重複消費的現象。

我們可以通過減小位移送出的時間間隔來減小重複消息的視窗大小,但這樣 并不能避免重複消費的發送,而且也會使位移送出更加頻繁。

  1. 手動送出

很多時候并不是說拉取到消息就算消費完成,而是需要将消息寫入資料庫、寫入本地緩存,或者是更 加複雜的業務處理。在這些場景下,所有的業務處理完成才能認為消息被成功消費。手動的送出方式可以讓開發人員根據程式的邏輯在合适的地方進行位移送出。

// 是否自動送出 offset
 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);           

手動送出可以細分為同步送出和異步送出,對應于 KafkaConsumer 中的 commitSync()和 commitAsync()兩種類型的方法。

  • 同步送出方式

同步送出會阻塞目前線程,一直到送出成功,并且會自動失敗重試(由不可控因素導緻,也會出現送出失敗),它必須等待offset送出完畢,再去消費下一批資料。

// 同步送出 offset
consumer.commitSync();           
  • 異步送出方式

異步送出則沒有失敗重試機制,故有可能送出失敗。它發送完送出offset請求後,就開始消費下一批資料了。

// 異步送出 offset
consumer.commitAsync();           

那麼手動送出會帶來什麼問題呢?可能會出現"漏消息"的情況。

kafka消費者那些事兒

設定offset為手動送出,當offset被送出時,資料還在記憶體中未落盤,此時剛好消費者線程被kill掉,那麼offset已經送出,但是資料未處理,導緻這部分記憶體中的資料丢失。

我們可以通過消費者事物來解決這樣的問題。

其實無論是手動送出還是自動送出,都有可能出現消息重複和是漏消息,與我們的程式設計模型有關,需要我們開發的時候根據消息的重要程度來選擇合适的消費方案。

消費者API

一個正常的消費邏輯需要具備以下幾個步驟:

(1)配置消費者用戶端參數及建立相應的消費者執行個體;

(2)訂閱主題;

(3)拉取消息并消費;

(4)送出消費位移 offset;

(5)關閉消費者執行個體。

public class MyConsumer { 
    public static void main(String[] args) { 
        Properties props = new Properties(); 
        // 定義 kakfa 服務的位址,不需要将所有 broker 指定上 
        props.put("bootstrap.servers", "doitedu01:9092"); 
        // 制定 consumer group 
        props.put("group.id", "g1"); 
        // 是否自動送出 offset 
        props.put("enable.auto.commit", "true"); 
        // 自動送出 offset 的時間間隔 
        props.put("auto.commit.interval.ms", "1000");
        // key 的反序列化類 
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
        // value 的反序列化類
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
        // 如果沒有消費偏移量記錄,則自動重設為起始 offset:latest, earliest, none 
        props.put("auto.offset.reset","earliest");
    
    	// 定義 consumer 
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
        // 消費者訂閱的 topic, 可同時訂閱多個 
        consumer.subscribe(Arrays.asList("first", "test","test1"));
        while (true) { 
            // 讀取資料,讀取逾時時間為 100ms 
            ConsumerRecords<String, String> records = consumer.poll(100); 
            for (ConsumerRecord<String, String> record : records) 
            	System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); 
        } 
	} 
}

                                    
  1. 訂閱主題
  • 指定集合方式訂閱主題
consumer.subscribe(Arrays.asList(topicl )); 
consumer subscribe(Arrays.asList(topic2))           
  • 正則方式訂閱主題

如果消費者采用的是正規表達式的方式(subscribe(Pattern))訂閱, 在之後的過程中,如果 有人又建立了新的主題,并且主題名字與正表達式相比對,那麼這個消費者就可以消費到 新添加的主題中的消息。

consumer.subscribe(Pattern.compile ("topic.*" ));           
  • 訂閱主題指定分區

消費者不僅可以通過 KafkaConsumer.subscribe() 方法訂閱主題,還可直接訂閱某些主題的指定分區。

consumer.assign(Arrays.asList(new TopicPartition ("tpc_1" , 0),new TopicPartition(“tpc_2”,1))) ;           
  1. 取消訂閱

通過unsubscribe()方法采取消主題的訂閱。

consumer.unsubscribe();            
  1. poll()拉取消息

kafka 中的消息消費是一個不斷輪詢的過程,消費者所要做的就是重複地調用 poll() 方法, poll() 方法傳回的是所訂閱的主題(分區)上的一組消息。

對于 poll () 方法而言,如果某些分區中沒有可供消費的消息,那麼此分區對應的消息拉取的結果就為空。

public ConsumerRecords<K, V> poll(final Duration timeout)           

逾時時間參數 timeout ,用來控制 poll() 方法的阻塞時間,在消費者的緩沖區裡沒有可用資料時會發生阻塞。

  1. 指定位移消費

有些時候,我們需要一種更細粒度的掌控,可以讓我們從特定的位移處開始拉取消息,而 KafkaConsumer 中的 seek( 方法正好提供了這個功能,讓我們可以追前消費或回溯消費。

public void seek(TopicPartiton partition,long offset)           

消費者重要參數

最後我們總結一下消費者中重要的參數配置。

參數名稱 描述
bootstrap.servers 向 Kafka 叢集建立初始連接配接用到的 host/port 清單。

key.deserializer 和

value.deserializer

指定接收消息的 key 和 value 的反序列化類型。一定要寫全

類名。

group.id 标記消費者所屬的消費者組。
enable.auto.commit 預設值為 true,消費者會自動周期性地向伺服器送出偏移量。
auto.commit.interval.ms 如果設定了 enable.auto.commit 的值為 true, 則該值定義了消費者偏移量向 Kafka 送出的頻率,預設 5s。
auto.offset.reset 當 Kafka 中沒有初始偏移量或目前偏移量在伺服器中不存在(如,資料被删除了),該如何處理? earliest:自動重置偏移量到最早的偏移量。 latest:預設,自動重置偏移量為最新的偏移量。 none:如果消費組原來的(previous)偏移量不存在,則向消費者抛異常。 anything:向消費者抛異常。
offsets.topic.num.partitions __consumer_offsets 的分區數,預設是 50 個分區。
heartbeat.interval.ms Kafka 消費者和 coordinator 之間的心跳時間,預設 3s。該條目的值必須小于 session.timeout.ms ,也不應該高于session.timeout.ms 的 1/3。
session.timeout.ms Kafka 消費者和 coordinator 之間連接配接逾時時間,預設 45s。超過該值,該消費者被移除,消費者組執行再平衡。
max.poll.interval.ms 消費者處理消息的最大時長,預設是 5 分鐘。超過該值,該消費者被移除,消費者組執行再平衡。
fetch.min.bytes 預設 1 個位元組。消費者擷取伺服器端一批消息最小的位元組數。
fetch.max.wait.ms 預設 500ms。如果沒有從伺服器端擷取到一批資料的最小位元組數。該時間到,仍然會傳回資料。
fetch.max.bytes 預設 Default: 52428800(50 m)。消費者擷取伺服器端一批消息最大的位元組數。如果伺服器端一批次的資料大于該值(50m)仍然可以拉取回來這批資料,是以,這不是一個絕對最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影響。
max.poll.records 一次 poll 拉取資料傳回消息的最大條數,預設是 500 條。

總結

kafka消費是很重要的一個環節,本文總結kafka消費者的一些重要機制,包括消費者的整個流程,消費的分區政策,消費的再平衡以及消費的位移管理。在明白這些機制以後,簡單講解了如何使用消費者consumer的API以及消費者中重要的參數。