1.1 消息發送流程
Kafka 的 Producer 發送消息采用的是異步發送的方式。在消息發送的過程中,涉及到了兩個線程——main 線程和 Sender 線程,以及一個線程共享變量——RecordAccumulator。
main 線程将消息發送給 RecordAccumulator,Sender 線程不斷從 RecordAccumulator 中拉取消息發送到 Kafka broker。

相關參數:
batch.size:隻有資料積累到 batch.size 之後,sender 才會發送資料。
linger.ms:如果資料遲遲未達到 batch.size,sender 等待 linger.time 之後就會發送資料。
1.2 異步發送 API
1)導入依賴
2)編寫代碼
需要用到的類:
KafkaProducer:需要建立一個生産者對象,用來發送資料
ProducerConfig:擷取所需的一系列配置參數
ProducerRecord:每條資料都要封裝成一個 ProducerRecord 對象
1.不帶回調函數的 API
View Code
2.帶回調函數的 API
回調函數會在 producer 收到 ack 時調用,為異步調用,該方法有兩個參數,分别是RecordMetadata 和 Exception,如果 Exception 為 null,說明消息發送成功,如果
Exception 不為 null,說明消息發送失敗。
注意:消息發送失敗會自動重試,不需要我們在回調函數中手動重試。
1.3 同步發送 API
同步發送的意思就是,一條消息發送之後,會阻塞目前線程,直至傳回 ack。
由于 send 方法傳回的是一個 Future 對象,根據 Futrue 對象的特點,我們也可以實作同步發送的效果,隻需在調用 Future 對象的 get 方發即可。
Consumer 消費資料時的可靠性是很容易保證的,因為資料在 Kafka 中是持久化的,故不用擔心資料丢失問題。
由于 consumer 在消費過程中可能會出現斷電當機等故障,consumer 恢複後,需要從故障前的位置的繼續消費,是以 consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢複後繼續消費。
是以 offset 的維護是 Consumer 消費資料是必須考慮的問題。
2.1 自動送出 offset
KafkaConsumer:需要建立一個消費者對象,用來消費資料
ConsumerConfig:擷取所需的一系列配置參數
ConsuemrRecord:每條資料都要封裝成一個 ConsumerRecord 對象
為了使我們能夠專注于自己的業務邏輯,Kafka 提供了自動送出 offset 的功能。自動送出 offset 的相關參數:
enable.auto.commit:是否開啟自動送出 offset 功能
auto.commit.interval.ms:自動送出 offset 的時間間隔
以下為自動送出 offset 的代碼:
2.2 手動送出 offset
雖然自動送出 offset 十分簡介便利,但由于其是基于時間送出的,開發人員難以把握offset 送出的時機。是以 Kafka 還提供了手動送出 offset 的 API。
手動送出 offset 的方法有兩種:分别是 commitSync(同步送出)和 commitAsync(異步送出)。兩者的相同點是,都會将本次 poll 的一批資料最高的偏移量送出;不同點是,
commitSync 阻塞目前線程,一直到送出成功,并且會自動失敗重試(由不可控因素導緻,也會出現送出失敗);而 commitAsync 則沒有失敗重試機制,故有可能送出失敗。
1)同步送出 offset
由于同步送出 offset 有失敗重試機制,故更加可靠,以下為同步送出 offset 的示例。
2)異步送出 offset
雖然同步送出 offset 更可靠一些,但是由于其會阻塞目前線程,直到送出成功。是以吞吐量會收到很大的影響。是以更多的情況下,會選用異步送出 offset 的方式。
以下為異步送出 offset 的示例:
3) 資料漏消費和重複消費分析
無論是同步送出還是異步送出 offset,都有可能會造成資料的漏消費或者重複消費。先送出 offset 後消費,有可能造成資料的漏消費;而先消費後送出 offset,有可能會造成資料
的重複消費。
2.3 自定義存儲 offset
Kafka 0.9 版本之前,offset 存儲在 zookeeper,0.9 版本及之後,預設将 offset 存儲在 Kafka的一個内置的 topic 中。除此之外,Kafka 還可以選擇自定義存儲 offset。
offset 的維護是相當繁瑣的,因為需要考慮到消費者的 Rebalace。
當有新的消費者加入消費者組、已有的消費者推出消費者組或者所訂閱的主題的分區發生變化,就會觸發到分區的重新配置設定,重新配置設定的過程叫做 Rebalance。
消費者發生 Rebalance 之後,每個消費者消費的分區就會發生變化。是以消費者要首先擷取到自己被重新配置設定到的分區,并且定位到每個分區最近送出的 offset 位置繼續消費。
要實作自定義存儲 offset,需要借助 ConsumerRebalanceListener,以下為示例代碼,其中送出和擷取 offset 的方法,需要根據所選的 offset 存儲系統自行實作。