本文是對《【硬剛大資料之學習路線篇】從零到大資料專家的學習指南(全面更新版)》的Kafka部分補充。
1.1 消息發送流程
Kafka 的 Producer 發送消息采用的是異步發送的方式。在消息發送的過程中,涉及到了兩個線程——main 線程和 Sender 線程,以及一個線程共享變量——RecordAccumulator。
main 線程将消息發送給 RecordAccumulator,Sender 線程不斷從 RecordAccumulator 中拉取消息發送到 Kafka broker。

相關參數:
batch.size:隻有資料積累到 batch.size 之後,sender 才會發送資料。
linger.ms:如果資料遲遲未達到 batch.size,sender 等待 linger.time 之後就會發送資料。
1.2 異步發送 API
1)導入依賴
2)添加log4j配置檔案
3)編寫代碼
需要用到的類:
KafkaProducer:需要建立一個生産者對象,用來發送資料
ProducerConfig:擷取所需的一系列配置參數
ProducerRecord:每條資料都要封裝成一個 ProducerRecord 對象
1.不帶回調函數的 API
2.帶回調函數的 API
回調函數會在 producer 收到 ack 時調用,為異步調用,該方法有兩個參數,分别RecordMetadata 和Exception,如果 Exception 為 null,說明消息發送成功,如果Exception 不為 null,說明消息發送失敗。
注意:消息發送失敗會自動重試,不需要我們在回調函數中手動重試。
1.3 分區器
1) 預設的分區器 DefaultPartitioner
2) 自定義分區器
1.4 同步發送API
同步發送的意思就是,一條消息發送之後,會阻塞目前線程,直至傳回ack。
由于send方法傳回的是一個Future對象,根據Futrue對象的特點,我們也可以實作同步發送的效果,隻需在調用Future對象的get方發即可。
Consumer 消費資料時的可靠性是很容易保證的,因為資料在 Kafka 中是持久化的,故不用擔心資料丢失問題。
由于 consumer 在消費過程中可能會出現斷電當機等故障,consumer 恢複後,需要從故障前的位置的繼續消費,是以 consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢複後繼續消費。
是以 offset 的維護是 Consumer 消費資料是必須考慮的問題。
2.1 自動送出 offset
2)編寫代碼
KafkaConsumer:需要建立一個消費者對象,用來消費資料
ConsumerConfig:擷取所需的一系列配置參數
ConsuemrRecord:每條資料都要封裝成一個 ConsumerRecord 對象
為了使我們能夠專注于自己的業務邏輯,Kafka 提供了自動送出 offset 的功能。自動送出 offset 的相關參數:
enable.auto.commit:是否開啟自動送出 offset 功能
auto.commit.interval.ms:自動送出 offset 的時間間隔
以下為自動送出 offset 的代碼:
2.2 重置Offset
auto.offset.rest = earliest | latest | none |
2.3 手動送出 offset
雖然自動送出 offset 十分簡介便利,但由于其是基于時間送出的,開發人員難以把握offset 送出的時機。是以 Kafka 還提供了手動送出 offset 的 API。
手動送出 offset 的方法有兩種:分别是 commitSync(同步送出)和 commitAsync(異步送出)。兩者的相同點是,都會将本次 poll 的一批資料最高的偏移量送出;不同點是,commitSync 阻塞目前線程,一直到送出成功,并且會自動失敗重試(由不可控因素導緻,也會出現送出失敗);而 commitAsync 則沒有失敗重試機制,故有可能送出失敗。
1)同步送出 offset
由于同步送出 offset 有失敗重試機制,故更加可靠,以下為同步送出 offset 的示例。
2)異步送出 offset
雖然同步送出 offset 更可靠一些,但是由于其會阻塞目前線程,直到送出成功。是以吞吐量會收到很大的影響。是以更多的情況下,會選用異步送出 offset 的方式。
以下為異步送出 offset 的示例:
3) 資料漏消費和重複消費分析
無論是同步送出還是異步送出 offset,都有可能會造成資料的漏消費或者重複消費。先送出 offset 後消費,有可能造成資料的漏消費;而先消費後送出 offset,有可能會造成資料
的重複消費。
2.3 自定義存儲 offset
Kafka 0.9 版本之前,offset 存儲在 zookeeper,0.9 版本及之後,預設将 offset 存儲在 Kafka的一個内置的 topic 中。除此之外,Kafka 還可以選擇自定義存儲 offset。
offset 的維護是相當繁瑣的,因為需要考慮到消費者的 Rebalace。
當有新的消費者加入消費者組、已有的消費者推出消費者組或者所訂閱的主題的分區發生變化,就會觸發到分區的重新配置設定,重新配置設定的過程叫做 Rebalance。
消費者發生 Rebalance 之後,每個消費者消費的分區就會發生變化。是以消費者要首先擷取到自己被重新配置設定到的分區,并且定位到每個分區最近送出的 offset 位置繼續消費。
要實作自定義存儲 offset,需要借助 ConsumerRebalanceListener,以下為示例代碼,其中送出和擷取 offset 的方法,需要根據所選的 offset 存儲系統自行實作。