天天看點

Kafka Provider(生産者)(二)API 樣例

kafka生産者的三種方式

(不同業務場景使用不同的生産方式)(後續在總結梳理)

三種發送方式

  • 1.發送并忘記 :

把消息發送到伺服器,但是并不關心它是否到達。大多數的情況下,消息會正常到達,因為kafka是高可用的,

而且生産者會自動嘗試重發。不過使用這種方式有時候也會丢失一些資料。

  • 2.同步發送

我們使用send()方法發送消息,它會傳回一個future對象,調用get()方法進行等待,就可以知道消息是否發送成功

  • 3.異步發送

我們調用send()方法,并制定一個回調函數,伺服器在傳回響應時候調用該函數。

在建立生産者的代碼層面,我做了一些很詳細的解釋,可以通過下面的代碼來得到實戰,我自己也有測試過,把consumer也貼在上面,這樣對于一些初學者也可以得到一個比較很好的測試效果

建立生産者:

package com.wy.concurrent.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * Created on 2019/7/18.
 * Title: Simple
 * Description:建立生産者API(三種方式,使用哪種方式要取決于使用業務場景)
 * Copyright: Copyright(c) 2019
 * Company:
 *
 * @author wy
 */
@Slf4j
public class KafkaProducerConfigTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        //設定生産者記憶體緩沖區的大小。
        props.put("buffer.memory", 33554432);
        //預設情況,是不會被壓縮,參數可以設定為snappy,gzip,lz4,它指定了消息被發送給broker之前使用哪一種壓縮算法進行壓縮。
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"gzip");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //建立生産者
        Producer<String, String> producer = new KafkaProducer<>(props);
        try {
           /* ProducerRecord的泛型表示,主要是key,value的對象
            使用生産者的的send()方法發送ProducerRecord對象。從生産者的架構圖裡可以看到,消息先被放到緩沖區,然後使用單獨的程式設計發送到伺服器端。
            send()方法會傳回一個包辦RecordMetadata的Future對象,不過因為我們常常忽略傳回值,是以無法知道消息是否發送成功。如果不關心發送結果,那麼可以使用
            這種發送方式。eg:記錄不太重要的日志,或者是實時的GPS資訊,而且在生産者發送的時候還是會存在異常情況出現的(序列化異常,緩沖區已滿,發送線程中斷等)*/
            //todo 1.發送并忘記
            // producer.send(new ProducerRecord<String, String>("my-topic", "hello world+"));


          /*  1.producer.send()方法先傳回一個Future對象,然後調用Future的get()方法等待kafka響應。如果伺服器傳回錯誤,get()方法會抛出異常。
            如果沒有錯誤,我們會得到一個RecordMetadata的對象,可以用它得到消息的偏移量。
            2.如果在發送資料之前或者在發送過程中發生了任何錯誤,比如broker傳回了一個不允許重發消息的異常或者已經超過了重發的次數,那麼就會抛出異常
            在catch中可以做異常的處理,這裡隻是把日志進行列印。*/
         /*   kafkaProducer一般會發生兩種錯誤。1:可重試錯誤。這類錯誤可以通過重發消息來解決。
                                            2.eg:連接配接錯誤,可以通過再次建立連接配接來解決。“無主(no leader)” 錯誤則可以通過重新為分區選舉首領來解決。
            kafkaProducer可以被配成自動重試,如果多次重試後仍無法解決問題,應用程式會收到一個重試異常,另一類錯誤無法通過重試解決,比如“消息太大異常”。
            對于這類錯誤,不會進行重試,會直接進行抛出異常。*/
            //todo 2.同步發送
            for (int i = 0; i < 10; i++) {
                RecordMetadata recordMetadata = producer.send(new ProducerRecord<String, String>("my-topic", "hello world+" + i)).get();
                System.out.println(recordMetadata.offset() + "----" + recordMetadata.partition() + "----" + recordMetadata.topic());
            }

            //todo 異步發送
          /*假設消息在應用程式和kafka叢集之間來回需要10ms。如果在發送完每個消息後都等待回應,那麼發送100個消息需要1秒。但如果隻發送消息而不等待回應,那麼
            發送100個消息的時間會是少很多。大多數時候,我們并不需要等待響應--盡管kafka會把主題目标、分區資訊、和消息的偏移量發送回來,但對于發送端的應用程式來說不是必須的
            不過我們遇到消息發送失敗時,是需要抛出異常,用來記錄錯誤日志,或者把消息寫進錯誤消息檔案中的,友善以後分析使用。*/
            //todo 為了在異步發送消息的同時能夠對異常情況進行處理,生産者提供了回調支援。
            //為了支援回調,需要實作Callback接口的onCompletion方法。
            //在發送消息出去的時候,要傳入一個回調對象用以實作回調方法
          /*  producer.send(new ProducerRecord<String, String>("my-topic", "hello world+"), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (null == exception) {
                        log.info("You can print some of the information you need without any exceptions");
                        log.info("這裡不出現異常可以列印自己需要的一些資訊");
                    } else {
                        log.error("Only the log is printed here, but in real project development you can optimize it according to the company's business scenario");
                        log.error("這裡隻進行日志的列印,但是在實際的項目開發中可以根據company的業務場景進行優化");
                        exception.printStackTrace();
                    }
                }
            });*/

        } catch (Exception e) {
            log.error("send fail Exception={}:", e);
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
           

consumer進行消費:

package com.wy.concurrent.kafka;


import com.wy.concurrent.utils.JsonUtil;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Properties;

  
/*Created on 2019/7/18.
          Title:Simple
          Description:建立消費者,進行消費
          有些是我的工具類,在項目運作的時候可以進行注釋掉即可
          Copyright:Copyright(c)2018
          Company:

@author wy*/

public class KafkaConsumerConfigTest {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group.test");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties);
        kafkaConsumer.subscribe(Arrays.asList("my-topic"));

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(JsonUtil.toJson(record.value()));
            }
        }


    }
}
           

周五同僚都走了,可是我不能閑着啊,總結梳理一下,如果能不對的地方請指出,一起交流下,共同提高!