本篇部落客帶來的是Kafka的Producer API操作。
1. 消息發送流程
2. 無回調參數的API
3. 帶回調函數的API
4. 同步發送API / 隻是比異步多了一個.get()
Kafka的Producer發送消息采用的是<code>異步發送</code>的方式。在消息發送的過程中,涉及到了<code>兩個線程——main線程和Sender線程</code>,以及<code>一個線程共享變量——RecordAccumulator</code>。main線程将消息發送給RecordAccumulator,Sender線程不斷從RecordAccumulator中拉取消息發送到Kafka broker。
下圖為KafkaProducer發送消息流程![]()
Kafka快速入門系列(9) | Kafka的Producer API操作 相關參數: batch.size:隻有資料積累到batch.size之後,sender才會發送資料。 linger.ms:如果資料遲遲未達到batch.size,sender等待linger.time之後就會發送資料。![]()
Kafka快速入門系列(9) | Kafka的Producer API操作
1. 導入依賴
2. 編寫代碼
需要用到的類:
KafkaProducer:需要建立一個生産者對象,用來發送資料
ProducerConfig:擷取所需的一系列配置參數
ProducerRecord:每條資料都要封裝成一個ProducerRecord對象
3. 完整代碼
4. 群起zookeeper和kafka
5. 啟動一個consumer控制台
6. 跑代碼進行測試
為什麼是無序的,這是因為我們隻能保證分區内有序。其他無序。
回調函數會在producer收到ack時調用,為異步調用,該方法有兩個參數,分别是RecordMetadata和Exception,如果Exception為null,說明消息發送成功,如果Exception不為null,說明消息發送失敗。
<code>注意:消息發送失敗會自動重試,不需要我們在回調函數中手動重試</code>。
1. 代碼
2. 結果
3. 大緻流程圖
同步發送的意思就是,一條消息發送之後,會阻塞目前線程,直至傳回ack。
由于send方法傳回的是一個Future對象,根據Futrue對象的特點,我們也可以實作同步發送的效果,隻需在調用Future對象的get方發即可。
1. 代碼:
本次的分享就到這裡了,
看
完
就
贊
,
養
成
習
慣
!
\color{#FF0000}{看完就贊,養成習慣!!!}
看完就贊,養成習慣!!!^ _ ^ ❤️ ❤️ ❤️
碼字不易,大家的支援就是我堅持下去的動力。點贊後不要忘了關注我哦!