天天看點

Kafka基礎(四):Kafka API

1.1 消息發送流程

  Kafka 的 Producer 發送消息采用的是異步發送的方式。在消息發送的過程中,涉及到了兩個線程——main 線程和 Sender 線程,以及一個線程共享變量——RecordAccumulator。

main 線程将消息發送給 RecordAccumulator,Sender 線程不斷從 RecordAccumulator 中拉取消息發送到 Kafka broker。

Kafka基礎(四):Kafka API

相關參數:

batch.size:隻有資料積累到 batch.size 之後,sender 才會發送資料。

linger.ms:如果資料遲遲未達到 batch.size,sender 等待 linger.time 之後就會發送資料。 

1.2 異步發送 API

1)導入依賴 

2)編寫代碼

需要用到的類:

KafkaProducer:需要建立一個生産者對象,用來發送資料

ProducerConfig:擷取所需的一系列配置參數

ProducerRecord:每條資料都要封裝成一個 ProducerRecord 對象

1.不帶回調函數的 API 

Kafka基礎(四):Kafka API
Kafka基礎(四):Kafka API

View Code

2.帶回調函數的 API

回調函數會在 producer 收到 ack 時調用,為異步調用,該方法有兩個參數,分别是RecordMetadata 和 Exception,如果 Exception 為 null,說明消息發送成功,如果

Exception 不為 null,說明消息發送失敗。

注意:消息發送失敗會自動重試,不需要我們在回調函數中手動重試。 

Kafka基礎(四):Kafka API
Kafka基礎(四):Kafka API

1.3 同步發送 API 

  同步發送的意思就是,一條消息發送之後,會阻塞目前線程,直至傳回 ack。

  由于 send 方法傳回的是一個 Future 對象,根據 Futrue 對象的特點,我們也可以實作同步發送的效果,隻需在調用 Future 對象的 get 方發即可。 

Kafka基礎(四):Kafka API
Kafka基礎(四):Kafka API

  Consumer 消費資料時的可靠性是很容易保證的,因為資料在 Kafka 中是持久化的,故不用擔心資料丢失問題。 

  由于 consumer 在消費過程中可能會出現斷電當機等故障,consumer 恢複後,需要從故障前的位置的繼續消費,是以 consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢複後繼續消費。 

是以 offset 的維護是 Consumer 消費資料是必須考慮的問題。

2.1 自動送出 offset 

KafkaConsumer:需要建立一個消費者對象,用來消費資料

ConsumerConfig:擷取所需的一系列配置參數

ConsuemrRecord:每條資料都要封裝成一個 ConsumerRecord 對象

  為了使我們能夠專注于自己的業務邏輯,Kafka 提供了自動送出 offset 的功能。自動送出 offset 的相關參數:

enable.auto.commit:是否開啟自動送出 offset 功能

auto.commit.interval.ms:自動送出 offset 的時間間隔

以下為自動送出 offset 的代碼:

Kafka基礎(四):Kafka API
Kafka基礎(四):Kafka API

2.2 手動送出 offset

  雖然自動送出 offset 十分簡介便利,但由于其是基于時間送出的,開發人員難以把握offset 送出的時機。是以 Kafka 還提供了手動送出 offset 的 API。 

  手動送出 offset 的方法有兩種:分别是 commitSync(同步送出)和 commitAsync(異步送出)。兩者的相同點是,都會将本次 poll 的一批資料最高的偏移量送出;不同點是,

commitSync 阻塞目前線程,一直到送出成功,并且會自動失敗重試(由不可控因素導緻,也會出現送出失敗);而 commitAsync 則沒有失敗重試機制,故有可能送出失敗。 

1)同步送出 offset

由于同步送出 offset 有失敗重試機制,故更加可靠,以下為同步送出 offset 的示例。 

Kafka基礎(四):Kafka API
Kafka基礎(四):Kafka API

2)異步送出 offset

雖然同步送出 offset 更可靠一些,但是由于其會阻塞目前線程,直到送出成功。是以吞吐量會收到很大的影響。是以更多的情況下,會選用異步送出 offset 的方式。

以下為異步送出 offset 的示例: 

Kafka基礎(四):Kafka API
Kafka基礎(四):Kafka API

3) 資料漏消費和重複消費分析

  無論是同步送出還是異步送出 offset,都有可能會造成資料的漏消費或者重複消費。先送出 offset 後消費,有可能造成資料的漏消費;而先消費後送出 offset,有可能會造成資料

的重複消費。

Kafka基礎(四):Kafka API

2.3 自定義存儲 offset 

  Kafka 0.9 版本之前,offset 存儲在 zookeeper,0.9 版本及之後,預設将 offset 存儲在 Kafka的一個内置的 topic 中。除此之外,Kafka 還可以選擇自定義存儲 offset。

  offset 的維護是相當繁瑣的,因為需要考慮到消費者的 Rebalace。 

  當有新的消費者加入消費者組、已有的消費者推出消費者組或者所訂閱的主題的分區發生變化,就會觸發到分區的重新配置設定,重新配置設定的過程叫做 Rebalance。 

  消費者發生 Rebalance 之後,每個消費者消費的分區就會發生變化。是以消費者要首先擷取到自己被重新配置設定到的分區,并且定位到每個分區最近送出的 offset 位置繼續消費。

  要實作自定義存儲 offset,需要借助 ConsumerRebalanceListener,以下為示例代碼,其中送出和擷取 offset 的方法,需要根據所選的 offset 存儲系統自行實作。

Kafka基礎(四):Kafka API
Kafka基礎(四):Kafka API