天天看點

Kafka快速入門系列(9) | Kafka的Producer API操作

  本篇部落客帶來的是Kafka的Producer API操作。

1. 消息發送流程

2. 無回調參數的API

3. 帶回調函數的API

4. 同步發送API / 隻是比異步多了一個.get()

  Kafka的Producer發送消息采用的是<code>異步發送</code>的方式。在消息發送的過程中,涉及到了<code>兩個線程——main線程和Sender線程</code>,以及<code>一個線程共享變量——RecordAccumulator</code>。main線程将消息發送給RecordAccumulator,Sender線程不斷從RecordAccumulator中拉取消息發送到Kafka broker。

下圖為KafkaProducer發送消息流程
Kafka快速入門系列(9) | Kafka的Producer API操作
Kafka快速入門系列(9) | Kafka的Producer API操作
相關參數: batch.size:隻有資料積累到batch.size之後,sender才會發送資料。 linger.ms:如果資料遲遲未達到batch.size,sender等待linger.time之後就會發送資料。

1. 導入依賴

2. 編寫代碼

需要用到的類:

KafkaProducer:需要建立一個生産者對象,用來發送資料

ProducerConfig:擷取所需的一系列配置參數

ProducerRecord:每條資料都要封裝成一個ProducerRecord對象

3. 完整代碼

4. 群起zookeeper和kafka

Kafka快速入門系列(9) | Kafka的Producer API操作

5. 啟動一個consumer控制台

6. 跑代碼進行測試

Kafka快速入門系列(9) | Kafka的Producer API操作
為什麼是無序的,這是因為我們隻能保證分區内有序。其他無序。

  回調函數會在producer收到ack時調用,為異步調用,該方法有兩個參數,分别是RecordMetadata和Exception,如果Exception為null,說明消息發送成功,如果Exception不為null,說明消息發送失敗。

  <code>注意:消息發送失敗會自動重試,不需要我們在回調函數中手動重試</code>。

1. 代碼

2. 結果

Kafka快速入門系列(9) | Kafka的Producer API操作

3. 大緻流程圖

Kafka快速入門系列(9) | Kafka的Producer API操作

  同步發送的意思就是,一條消息發送之後,會阻塞目前線程,直至傳回ack。

  由于send方法傳回的是一個Future對象,根據Futrue對象的特點,我們也可以實作同步發送的效果,隻需在調用Future對象的get方發即可。

1. 代碼:

Kafka快速入門系列(9) | Kafka的Producer API操作

  本次的分享就到這裡了,

Kafka快速入門系列(9) | Kafka的Producer API操作

  

\color{#FF0000}{看完就贊,養成習慣!!!}

看完就贊,養成習慣!!!^ _ ^ ❤️ ❤️ ❤️

  碼字不易,大家的支援就是我堅持下去的動力。點贊後不要忘了關注我哦!