天天看點

「spring-kafka」@KafkaListener詳解與使用

作者:石臻臻的雜貨鋪

作者:石臻臻, CSDN部落格之星Top5、Kafka Contributor 、nacos Contributor、華為雲 MVP ,騰訊雲TVP, 滴滴Kafka技術專家 、 KnowStreaming。

KnowStreaming 是滴滴開源的Kafka運維管控平台, 有興趣一起參與參與開發的同學,但是怕自己能力不夠的同學,可以聯系我,當你導師帶你參與開源! 。

1說明

  • 從2.2.4版開始,您可以直接在注釋上指定Kafka使用者屬性,這些屬性将覆寫在使用者工廠中配置的具有相同名稱的所有屬性。您不能通過這種方式指定group.id和client.id屬性。他們将被忽略;
  • 可以使用#{…}或屬性占位符(${…​})在SpEL上配置注釋上的大多數屬性。 比如:
@KafkaListener(id = "consumer-id",topics = "SHI_TOPIC1",concurrency = "${listen.concurrency:3}",
            clientIdPrefix = "myClientId")
           

屬性concurrency将會從容器中擷取listen.concurrency的值,如果不存在就預設用3

2@KafkaListener詳解

id 監聽器的id

①. 消費者線程命名規則

填寫:

2020-11-19 14:24:15 c.d.b.k.KafkaListeners 120 [INFO] 線程:Thread[consumer-id5-1-C-1,5,main]-groupId:BASE-DEMO consumer-id5 消費

沒有填寫ID:

2020-11-19 10:41:26 c.d.b.k.KafkaListeners 137 [INFO] 線程:Thread[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1,5,main] consumer-id7

②.在相同容器中的監聽器ID不能重複

否則會報錯

Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id
           

③.會覆寫消費者工廠的消費組GroupId

假如配置檔案屬性配置了消費組kafka.consumer.group-id=BASE-DEMO 正常情況它是該容器中的預設消費組 但是如果設定了 @KafkaListener(id = "consumer-id7", topics = {"SHI_TOPIC3"}) 那麼目前消費者的消費組就是consumer-id7 ;

當然如果你不想要他作為groupId的話 可以設定屬性idIsGroup = false;那麼還是會使用預設的GroupId;

④. 如果配置了屬性groupId,則其優先級最高

@KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3",groupId = "groupId-test")
           

例如上面代碼中最終這個消費者的消費組GroupId是 "groupId-test"

該id屬性(如果存在)将用作Kafka消費者group.id屬性,并覆寫消費者工廠中的已配置屬性(如果存在)您還可以groupId顯式設定或将其設定idIsGroup為false,以恢複使用使用者工廠的先前行為group.id。

groupId 消費組名

指定該消費組的消費組名; 關于消費組名的配置可以看看上面的 id 監聽器的id

如何擷取消費者 group.id

在監聽器中調用KafkaUtils.getConsumerGroupId()可以獲得目前的groupId; 可以在日志中列印出來; 可以知道是哪個用戶端消費的;

topics 指定要監聽哪些topic(與topicPattern、topicPartitions 三選一)

可以同時監聽多個 topics = {"SHI_TOPIC3","SHI_TOPIC4"}

topicPattern 比對Topic進行監聽(與topics、topicPartitions 三選一)

topicPartitions 顯式分區配置設定

可以為監聽器配置明确的主題和分區(以及可選的初始偏移量)
@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}
           

上面例子意思是 監聽topic1的0,1分區;監聽topic2的第0分區,并且第1分區從offset為100的開始消費;

errorHandler 異常處理

實作KafkaListenerErrorHandler; 然後做一些異常處理;

@Component
public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        return null;
    }

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
     //do someting
        return null;
    }
}
           

調用的時候 填寫beanName;例如errorHandler="kafkaDefaultListenerErrorHandler"

containerFactory 監聽器工廠

指定生成監聽器的工廠類;

例如我寫一個 批量消費的工廠類

/**
     * 監聽器工廠 批量消費
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        //設定為批量消費,每個批次數量在Kafka配置參數中設定ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        return factory;
    }
           

使用containerFactory = "batchFactory"

clientIdPrefix 用戶端字首

會覆寫消費者工廠的kafka.consumer.client-id屬性; 最為字首後面接 -n n是數字

concurrency并發數

會覆寫消費者工廠中的concurrency ,這裡的并發數就是多線程消費; 比如說單機情況下,你設定了3; 相當于就是啟動了3個用戶端來配置設定消費分區;分布式情況 總線程數=concurrency*機器數量; 并不是設定越多越好,具體如何設定請看 屬性concurrency的作用及配置(RoundRobinAssignor 、RangeAssignor)
/**
     * 監聽器工廠 
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        factory.setConcurrency(6);
        return factory;
    }
           
@KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1)
           

雖然使用的工廠是concurrencyFactory(concurrency配置了6); 但是他最終生成的監聽器數量 是1;

properties 配置其他屬性

kafka中的屬性看org.apache.kafka.clients.consumer.ConsumerConfig ; 同名的都可以修改掉;

用法

@KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1"
            , clientIdPrefix = "myClientId5",groupId = "groupId-test",
            properties = {
                    "enable.auto.commit:false","max.poll.interval.ms:6000" },errorHandler="kafkaDefaultListenerErrorHandler")
           

3@KafkaListener使用

4KafkaListenerEndpointRegistry

@Autowired
    private KafkaListenerEndpointRegistry registry;
       //.... 擷取所有注冊的監聽器
        registry.getAllListenerContainers();

           

設定入參驗證器

當您将Spring Boot與驗證啟動器一起使用時,将LocalValidatorFactoryBean自動配置:如下

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;
    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(this.validator);
    }
}
           

使用

@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
      containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
    ...
}

@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
    return (m, e) -> {
        ...
    };
}