kafka生産者的三種方式
(不同業務場景使用不同的生産方式)(後續在總結梳理)
三種發送方式
- 1.發送并忘記 :
把消息發送到伺服器,但是并不關心它是否到達。大多數的情況下,消息會正常到達,因為kafka是高可用的,
而且生産者會自動嘗試重發。不過使用這種方式有時候也會丢失一些資料。
- 2.同步發送
我們使用send()方法發送消息,它會傳回一個future對象,調用get()方法進行等待,就可以知道消息是否發送成功
- 3.異步發送
我們調用send()方法,并制定一個回調函數,伺服器在傳回響應時候調用該函數。
在建立生産者的代碼層面,我做了一些很詳細的解釋,可以通過下面的代碼來得到實戰,我自己也有測試過,把consumer也貼在上面,這樣對于一些初學者也可以得到一個比較很好的測試效果
建立生産者:
package com.wy.concurrent.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
/**
* Created on 2019/7/18.
* Title: Simple
* Description:建立生産者API(三種方式,使用哪種方式要取決于使用業務場景)
* Copyright: Copyright(c) 2019
* Company:
*
* @author wy
*/
@Slf4j
public class KafkaProducerConfigTest {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
//設定生産者記憶體緩沖區的大小。
props.put("buffer.memory", 33554432);
//預設情況,是不會被壓縮,參數可以設定為snappy,gzip,lz4,它指定了消息被發送給broker之前使用哪一種壓縮算法進行壓縮。
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"gzip");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//建立生産者
Producer<String, String> producer = new KafkaProducer<>(props);
try {
/* ProducerRecord的泛型表示,主要是key,value的對象
使用生産者的的send()方法發送ProducerRecord對象。從生産者的架構圖裡可以看到,消息先被放到緩沖區,然後使用單獨的程式設計發送到伺服器端。
send()方法會傳回一個包辦RecordMetadata的Future對象,不過因為我們常常忽略傳回值,是以無法知道消息是否發送成功。如果不關心發送結果,那麼可以使用
這種發送方式。eg:記錄不太重要的日志,或者是實時的GPS資訊,而且在生産者發送的時候還是會存在異常情況出現的(序列化異常,緩沖區已滿,發送線程中斷等)*/
//todo 1.發送并忘記
// producer.send(new ProducerRecord<String, String>("my-topic", "hello world+"));
/* 1.producer.send()方法先傳回一個Future對象,然後調用Future的get()方法等待kafka響應。如果伺服器傳回錯誤,get()方法會抛出異常。
如果沒有錯誤,我們會得到一個RecordMetadata的對象,可以用它得到消息的偏移量。
2.如果在發送資料之前或者在發送過程中發生了任何錯誤,比如broker傳回了一個不允許重發消息的異常或者已經超過了重發的次數,那麼就會抛出異常
在catch中可以做異常的處理,這裡隻是把日志進行列印。*/
/* kafkaProducer一般會發生兩種錯誤。1:可重試錯誤。這類錯誤可以通過重發消息來解決。
2.eg:連接配接錯誤,可以通過再次建立連接配接來解決。“無主(no leader)” 錯誤則可以通過重新為分區選舉首領來解決。
kafkaProducer可以被配成自動重試,如果多次重試後仍無法解決問題,應用程式會收到一個重試異常,另一類錯誤無法通過重試解決,比如“消息太大異常”。
對于這類錯誤,不會進行重試,會直接進行抛出異常。*/
//todo 2.同步發送
for (int i = 0; i < 10; i++) {
RecordMetadata recordMetadata = producer.send(new ProducerRecord<String, String>("my-topic", "hello world+" + i)).get();
System.out.println(recordMetadata.offset() + "----" + recordMetadata.partition() + "----" + recordMetadata.topic());
}
//todo 異步發送
/*假設消息在應用程式和kafka叢集之間來回需要10ms。如果在發送完每個消息後都等待回應,那麼發送100個消息需要1秒。但如果隻發送消息而不等待回應,那麼
發送100個消息的時間會是少很多。大多數時候,我們并不需要等待響應--盡管kafka會把主題目标、分區資訊、和消息的偏移量發送回來,但對于發送端的應用程式來說不是必須的
不過我們遇到消息發送失敗時,是需要抛出異常,用來記錄錯誤日志,或者把消息寫進錯誤消息檔案中的,友善以後分析使用。*/
//todo 為了在異步發送消息的同時能夠對異常情況進行處理,生産者提供了回調支援。
//為了支援回調,需要實作Callback接口的onCompletion方法。
//在發送消息出去的時候,要傳入一個回調對象用以實作回調方法
/* producer.send(new ProducerRecord<String, String>("my-topic", "hello world+"), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (null == exception) {
log.info("You can print some of the information you need without any exceptions");
log.info("這裡不出現異常可以列印自己需要的一些資訊");
} else {
log.error("Only the log is printed here, but in real project development you can optimize it according to the company's business scenario");
log.error("這裡隻進行日志的列印,但是在實際的項目開發中可以根據company的業務場景進行優化");
exception.printStackTrace();
}
}
});*/
} catch (Exception e) {
log.error("send fail Exception={}:", e);
e.printStackTrace();
} finally {
producer.close();
}
}
}
consumer進行消費:
package com.wy.concurrent.kafka;
import com.wy.concurrent.utils.JsonUtil;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Properties;
/*Created on 2019/7/18.
Title:Simple
Description:建立消費者,進行消費
有些是我的工具類,在項目運作的時候可以進行注釋掉即可
Copyright:Copyright(c)2018
Company:
@author wy*/
public class KafkaConsumerConfigTest {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group.test");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties);
kafkaConsumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(JsonUtil.toJson(record.value()));
}
}
}
}
周五同僚都走了,可是我不能閑着啊,總結梳理一下,如果能不對的地方請指出,一起交流下,共同提高!