
Building a Large-Scale Distributed Service (22): How to Elegantly Integrate Multiple Kafka Data Sources in Spring Boot

Series Index

Building a Large-Scale Distributed Service (22): How to Elegantly Integrate Multiple Kafka Data Sources in Spring Boot

Contents

  • Series Index
  • Preface
      • I. Key Points
      • II. Development Environment
      • III. Creating the Project
      • IV. Modifying the Project
      • V. Testing
      • VI. Summary

Preface

In day-to-day development, we often need to consume topics that do not all live in the same Kafka cluster, which means configuring multiple data sources. How can we do this elegantly?

I. Key Points

Following on from the previous article, where we integrated Kafka into the project, this article shows how to elegantly integrate multiple Kafka data sources. (Complete series table of contents)

  • Integrating multiple Kafka data sources in Spring Boot
  • Consuming multiple topics with separate consumer groups
  • Configuring a containerFactory for @KafkaListener

II. Development Environment

  • JDK 1.8
  • Maven 3.6.2
  • Spring Boot 2.4.3
  • Kafka 2.12-2.3
  • IDEA 2020

III. Creating the Project

1. Use these earlier articles to bootstrap the project quickly:

《Building a Large-Scale Distributed Service (12): Installing Kafka and ZooKeeper in a Docker Development Environment》

《Building a Large-Scale Distributed Service (13): Integrating Kafka with Spring Boot》

《Building a Large-Scale Distributed Service (18): A Custom Maven Project Archetype》

2. Create the Ticket project:

mvn archetype:generate  -DgroupId="com.mmc.lesson" -DartifactId="ticket" -Dversion=1.0-SNAPSHOT -Dpackage="com.mmc.lesson" -DarchetypeArtifactId=member-archetype  -DarchetypeGroupId=com.mmc.lesson -DarchetypeVersion=1.0.0-SNAPSHOT -B
           

IV. Modifying the Project

1. Create KafkaPropertiesConfiguration.java to bind the two Kafka configurations.

import java.util.HashMap;
import java.util.Map;

import lombok.Data;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Consumer;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaPropertiesConfiguration {

    /**
     * Kafka properties for the "one" cluster.
     */
    @Bean("oneKafkaProperties")
    @ConfigurationProperties("spring.one.kafka")
    public CustomKafkaProperties oneKafkaProperties() {
        return new CustomKafkaProperties();
    }

    /**
     * Kafka properties for the "two" cluster.
     */
    @Bean("twoKafkaProperties")
    @ConfigurationProperties("spring.two.kafka")
    public CustomKafkaProperties twoKafkaProperties() {
        return new CustomKafkaProperties();
    }

    @Data
    public static class CustomKafkaProperties {

        private final Consumer consumer = new Consumer();

        /**
         * Create an initial map of consumer properties from the state of this instance.
         * <p>
         * This allows you to add additional properties, if necessary, and override the
         * default kafkaConsumerFactory bean.
         *
         * @return the consumer properties initialized with the customizations defined on this
         *         instance
         */
        public Map<String, Object> buildConsumerProperties() {
            return new HashMap<>(this.consumer.buildProperties());
        }
    }
}

           
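To make the binding concrete: the real key translation is done by Spring Boot's KafkaProperties.Consumer.buildProperties(), but the self-contained sketch below (illustrative only, not Spring's implementation) shows the shape of the map that ends up in DefaultKafkaConsumerFactory — relaxed-bound fields such as `spring.one.kafka.consumer.max-poll-records` become raw Kafka keys such as `max.poll.records`.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of how bound consumer fields become raw Kafka config keys.
class ConsumerPropsSketch {

    private String bootstrapServers = "127.0.0.1:9092";
    private String autoOffsetReset = "latest";
    private int maxPollRecords = 50;

    public Map<String, Object> buildConsumerProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("auto.offset.reset", autoOffsetReset);
        props.put("max.poll.records", maxPollRecords);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new ConsumerPropsSketch().buildConsumerProperties();
        System.out.println(props.get("max.poll.records")); // prints: 50
    }
}
```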

2. Create KafkaConsumerConfiguration.java to define the two consumer factories.

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
// Also import CustomKafkaProperties from the KafkaPropertiesConfiguration above.

@Configuration
@AutoConfigureAfter(KafkaPropertiesConfiguration.class)
public class KafkaConsumerConfiguration {

    @Value("${spring.kafka.listener.type:batch}")
    private String listenerType;

    /**
     * Listener container factory for the "one" cluster.
     */
    @Bean("oneContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<Object, Object> oneContainerFactory(
            @Qualifier("oneKafkaProperties") CustomKafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> container =
                new ConcurrentKafkaListenerContainerFactory<>();
        container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));
        // Concurrency should be less than or equal to the topic's partition count.
        // container.setConcurrency(5);
        // Enable batch listening when the listener type is "batch".
        container.setBatchListener("batch".equalsIgnoreCase(listenerType));
        return container;
    }

    /**
     * Listener container factory for the "two" cluster.
     */
    @Bean("twoContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<Object, Object> twoContainerFactory(
            @Qualifier("twoKafkaProperties") CustomKafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> container =
                new ConcurrentKafkaListenerContainerFactory<>();
        container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));
        // Concurrency should be less than or equal to the topic's partition count.
        // container.setConcurrency(5);
        // Enable batch listening when the listener type is "batch".
        container.setBatchListener("batch".equalsIgnoreCase(listenerType));
        return container;
    }
}
           
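The factories above enable batch listening only when `spring.kafka.listener.type` is `batch` (the dev config in this article uses `single`). A minimal sketch of switching to batch mode, using the property prefixes this project binds:

```properties
# Switch both container factories to batch mode.
spring.kafka.listener.type=batch
# In batch mode it usually makes sense to cap how many records one poll returns.
spring.one.kafka.consumer.max-poll-records=50
spring.two.kafka.consumer.max-poll-records=50
```

With batch listening enabled, the corresponding @KafkaListener methods must accept a List&lt;ConsumerRecord&lt;String, String&gt;&gt; instead of a single ConsumerRecord.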

3. Create KafkaReceiver.java to consume messages from the two topics.

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

@Service
@Slf4j
public class KafkaReceiver {

    @KafkaListener(id = "kafka-one-demo",
            topics = Const.KAFKA_ONE_DEMO_TOPIC,
            groupId = "oneGroup",
            containerFactory = "oneContainerFactory")
    public void receiveOne(ConsumerRecord<String, String> record) {
        if (null == record || !StringUtils.hasText(record.value())) {
            log.warn("KafkaReceiver record is null or record.value is empty.");
            return;
        }
        String reqJson = record.value();
        log.info("one KafkaReceiver {}", reqJson);
    }

    @KafkaListener(id = "kafka-two-demo",
            topics = Const.KAFKA_TWO_DEMO_TOPIC,
            groupId = "twoGroup",
            containerFactory = "twoContainerFactory")
    public void receiveTwo(ConsumerRecord<String, String> record) {
        if (null == record || !StringUtils.hasText(record.value())) {
            log.warn("KafkaReceiver record is null or record.value is empty.");
            return;
        }
        String reqJson = record.value();
        log.info("two KafkaReceiver {}", reqJson);
    }
}

4. Add the dual-Kafka configuration to application-dev.properties; other environments work the same way (the config can also be hosted in Apollo).

#################### KAFKA ####################

## Consumer configuration.
spring.kafka.listener.type=single
spring.kafka.listener.missing-topics-fatal=false

## The prefixes must match the @ConfigurationProperties bindings:
## "spring.one.kafka" and "spring.two.kafka".
spring.one.kafka.consumer.bootstrap-servers=127.0.0.1:9092
spring.one.kafka.consumer.auto-offset-reset=latest
spring.one.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.one.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.one.kafka.consumer.max-poll-records=50

spring.two.kafka.consumer.bootstrap-servers=127.0.0.1:9092
spring.two.kafka.consumer.auto-offset-reset=latest
spring.two.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.two.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.two.kafka.consumer.max-poll-records=50

## Producer configuration used by the unit test.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.buffer-memory=524288
spring.kafka.producer.batch-size=65536
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092

V. Testing

1. Update and run the unit test.

import java.io.IOException;

import javax.annotation.Resource;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension;

@Slf4j
@ActiveProfiles("dev")
@ExtendWith(SpringExtension.class)
@SpringBootTest
class KafkaSenderTest {

    @Resource
    private KafkaSender kafkaSender;

    @Test
    void sendMessage() throws IOException {
        String json = "hello";
        kafkaSender.sendMessage(Const.KAFKA_ONE_DEMO_TOPIC, json);
        kafkaSender.sendMessage(Const.KAFKA_TWO_DEMO_TOPIC, json);

        // Block so the listeners have time to consume the messages.
        System.in.read();
    }
}

2. The test passes, and both listeners receive the message:

[2021-08-19 17:09:02.203] [kafka-two-demo-0-C-1] [INFO] [o.s.kafka.listener.KafkaMessageListenerContainer:292] - twoGroup: partitions assigned: [kafka-single-demo-topic-0]
[2021-08-19 17:09:02.203] [kafka-one-demo-0-C-1] [INFO] [o.s.kafka.listener.KafkaMessageListenerContainer:292] - oneGroup: partitions assigned: [kafka-single-demo-topic-0]
[2021-08-19 17:09:09.721] [kafka-two-demo-0-C-1] [INFO] [com.mmc.lesson.kafka.KafkaReceiver:?] - two KafkaReceiver hello
[2021-08-19 17:09:12.736] [kafka-one-demo-0-C-1] [INFO] [com.mmc.lesson.kafka.KafkaReceiver:?] - one KafkaReceiver hello

           

VI. Summary

With that, we have elegantly integrated multiple Kafka consumer data sources. As an exercise, try configuring multiple producers as well. Next up: 《Building a Large-Scale Distributed Service (23): Spring Boot Extensions — A Custom Logging Component》.
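For the producer exercise, here is one possible sketch that mirrors the consumer-side pattern. The bean names and the buildProducerProperties() method are assumptions for illustration — CustomKafkaProperties in this article only exposes consumer properties, so you would first extend it with a Producer section (and matching spring.one.kafka.producer.* entries) before this compiles.

```java
// Sketch only: two producer factories and two KafkaTemplates, one per cluster.
// Assumes CustomKafkaProperties has been extended with a buildProducerProperties()
// method analogous to buildConsumerProperties().
@Configuration
public class KafkaProducerConfiguration {

    @Bean("oneKafkaTemplate")
    public KafkaTemplate<String, String> oneKafkaTemplate(
            @Qualifier("oneKafkaProperties") CustomKafkaProperties kafkaProperties) {
        return new KafkaTemplate<>(
                new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties()));
    }

    @Bean("twoKafkaTemplate")
    public KafkaTemplate<String, String> twoKafkaTemplate(
            @Qualifier("twoKafkaProperties") CustomKafkaProperties kafkaProperties) {
        return new KafkaTemplate<>(
                new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties()));
    }
}
```

A sender would then inject the template it needs with @Qualifier("oneKafkaTemplate") or @Qualifier("twoKafkaTemplate"), just as the listeners select their containerFactory.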


