作者:石臻臻, CSDN部落格之星Top5、Kafka Contributor 、nacos Contributor、華為雲 MVP ,騰訊雲TVP, 滴滴Kafka技術專家 、 KnowStreaming。
KnowStreaming 是滴滴開源的Kafka運維管控平台, 有興趣一起參與參與開發的同學,但是怕自己能力不夠的同學,可以聯系我,當你導師帶你參與開源! 。
1說明
- 從2.2.4版開始,您可以直接在注釋上指定Kafka使用者屬性,這些屬性将覆寫在使用者工廠中配置的具有相同名稱的所有屬性。您不能通過這種方式指定group.id和client.id屬性。他們将被忽略;
- 可以使用#{…}或屬性占位符(${…})在SpEL上配置注釋上的大多數屬性。 比如:
@KafkaListener(id = "consumer-id",topics = "SHI_TOPIC1",concurrency = "${listen.concurrency:3}",
clientIdPrefix = "myClientId")
屬性concurrency将會從容器中擷取listen.concurrency的值,如果不存在就預設用3
2@KafkaListener詳解
id 監聽器的id
①. 消費者線程命名規則
填寫:
2020-11-19 14:24:15 c.d.b.k.KafkaListeners 120 [INFO] 線程:Thread[consumer-id5-1-C-1,5,main]-groupId:BASE-DEMO consumer-id5 消費
沒有填寫ID:
2020-11-19 10:41:26 c.d.b.k.KafkaListeners 137 [INFO] 線程:Thread[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1,5,main] consumer-id7
②.在相同容器中的監聽器ID不能重複
否則會報錯
Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id
③.會覆寫消費者工廠的消費組GroupId
假如配置檔案屬性配置了消費組kafka.consumer.group-id=BASE-DEMO 正常情況它是該容器中的預設消費組 但是如果設定了 @KafkaListener(id = "consumer-id7", topics = {"SHI_TOPIC3"}) 那麼目前消費者的消費組就是consumer-id7 ;
當然如果你不想要他作為groupId的話 可以設定屬性idIsGroup = false;那麼還是會使用預設的GroupId;
④. 如果配置了屬性groupId,則其優先級最高
@KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3",groupId = "groupId-test")
例如上面代碼中最終這個消費者的消費組GroupId是 "groupId-test"
該id屬性(如果存在)将用作Kafka消費者group.id屬性,并覆寫消費者工廠中的已配置屬性(如果存在)您還可以groupId顯式設定或将其設定idIsGroup為false,以恢複使用使用者工廠的先前行為group.id。
groupId 消費組名
指定該消費組的消費組名; 關于消費組名的配置可以看看上面的 id 監聽器的id
如何擷取消費者 group.id
在監聽器中調用KafkaUtils.getConsumerGroupId()可以獲得目前的groupId; 可以在日志中列印出來; 可以知道是哪個用戶端消費的;
topics 指定要監聽哪些topic(與topicPattern、topicPartitions 三選一)
可以同時監聽多個 topics = {"SHI_TOPIC3","SHI_TOPIC4"}
topicPattern 比對Topic進行監聽(與topics、topicPartitions 三選一)
topicPartitions 顯式分區配置設定
可以為監聽器配置明确的主題和分區(以及可選的初始偏移量)
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
上面例子意思是 監聽topic1的0,1分區;監聽topic2的第0分區,并且第1分區從offset為100的開始消費;
errorHandler 異常處理
實作KafkaListenerErrorHandler; 然後做一些異常處理;
@Component
public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
return null;
}
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
//do someting
return null;
}
}
調用的時候 填寫beanName;例如errorHandler="kafkaDefaultListenerErrorHandler"
containerFactory 監聽器工廠
指定生成監聽器的工廠類;
例如我寫一個 批量消費的工廠類
/**
* 監聽器工廠 批量消費
* @return
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
//設定為批量消費,每個批次數量在Kafka配置參數中設定ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);
return factory;
}
使用containerFactory = "batchFactory"
clientIdPrefix 用戶端字首
會覆寫消費者工廠的kafka.consumer.client-id屬性; 最為字首後面接 -n n是數字
concurrency并發數
會覆寫消費者工廠中的concurrency ,這裡的并發數就是多線程消費; 比如說單機情況下,你設定了3; 相當于就是啟動了3個用戶端來配置設定消費分區;分布式情況 總線程數=concurrency*機器數量; 并不是設定越多越好,具體如何設定請看 屬性concurrency的作用及配置(RoundRobinAssignor 、RangeAssignor)
/**
* 監聽器工廠
* @return
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
factory.setConcurrency(6);
return factory;
}
@KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1)
雖然使用的工廠是concurrencyFactory(concurrency配置了6); 但是他最終生成的監聽器數量 是1;
properties 配置其他屬性
kafka中的屬性看org.apache.kafka.clients.consumer.ConsumerConfig ; 同名的都可以修改掉;
用法
@KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1"
, clientIdPrefix = "myClientId5",groupId = "groupId-test",
properties = {
"enable.auto.commit:false","max.poll.interval.ms:6000" },errorHandler="kafkaDefaultListenerErrorHandler")
3@KafkaListener使用
4KafkaListenerEndpointRegistry
@Autowired
private KafkaListenerEndpointRegistry registry;
//.... 擷取所有注冊的監聽器
registry.getAllListenerContainers();
設定入參驗證器
當您将Spring Boot與驗證啟動器一起使用時,将LocalValidatorFactoryBean自動配置:如下
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {
@Autowired
private LocalValidatorFactoryBean validator;
...
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(this.validator);
}
}
使用
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
...
}
@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
return (m, e) -> {
...
};
}